View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.regionserver;
19  import static org.apache.hadoop.hbase.HBaseTestingUtility.fam1;
20  import static org.apache.hadoop.hbase.HBaseTestingUtility.fam2;
21  import static org.junit.Assert.assertEquals;
22  import static org.junit.Assert.assertNull;
23  import static org.junit.Assert.assertTrue;
24  import static org.junit.Assert.fail;
25  
26  import java.io.IOException;
27  import java.util.ArrayList;
28  import java.util.Arrays;
29  import java.util.List;
30  import java.util.Random;
31  import java.util.concurrent.CountDownLatch;
32  import java.util.concurrent.atomic.AtomicInteger;
33  import java.util.concurrent.atomic.AtomicLong;
34  
35  import org.apache.commons.logging.Log;
36  import org.apache.commons.logging.LogFactory;
37  import org.apache.hadoop.conf.Configuration;
38  import org.apache.hadoop.fs.FileSystem;
39  import org.apache.hadoop.fs.Path;
40  import org.apache.hadoop.hbase.Cell;
41  import org.apache.hadoop.hbase.CellUtil;
42  import org.apache.hadoop.hbase.HBaseTestingUtility;
43  import org.apache.hadoop.hbase.HColumnDescriptor;
44  import org.apache.hadoop.hbase.HConstants;
45  import org.apache.hadoop.hbase.HRegionInfo;
46  import org.apache.hadoop.hbase.HTableDescriptor;
47  import org.apache.hadoop.hbase.MultithreadedTestUtil;
48  import org.apache.hadoop.hbase.MultithreadedTestUtil.TestContext;
49  import org.apache.hadoop.hbase.MultithreadedTestUtil.TestThread;
50  import org.apache.hadoop.hbase.TableName;
51  import org.apache.hadoop.hbase.client.Append;
52  import org.apache.hadoop.hbase.client.Delete;
53  import org.apache.hadoop.hbase.client.Durability;
54  import org.apache.hadoop.hbase.client.Get;
55  import org.apache.hadoop.hbase.client.Increment;
56  import org.apache.hadoop.hbase.client.Mutation;
57  import org.apache.hadoop.hbase.client.Put;
58  import org.apache.hadoop.hbase.client.Result;
59  import org.apache.hadoop.hbase.client.RowMutations;
60  import org.apache.hadoop.hbase.client.Scan;
61  import org.apache.hadoop.hbase.filter.BinaryComparator;
62  import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
63  import org.apache.hadoop.hbase.io.HeapSize;
64  import org.apache.hadoop.hbase.io.hfile.BlockCache;
65  import org.apache.hadoop.hbase.testclassification.MediumTests;
66  import org.apache.hadoop.hbase.util.Bytes;
67  import org.apache.hadoop.hbase.wal.WAL;
68  import org.junit.After;
69  import org.junit.Before;
70  import org.junit.Rule;
71  import org.junit.Test;
72  import org.junit.experimental.categories.Category;
73  import org.junit.rules.TestName;
74  
75  /**
76   * Testing of HRegion.incrementColumnValue, HRegion.increment,
77   * and HRegion.append
78   */
79  @Category(MediumTests.class) // Starts 100 threads
80  public class TestAtomicOperation {
81    private static final Log LOG = LogFactory.getLog(TestAtomicOperation.class);
82    @Rule public TestName name = new TestName();
83  
84    HRegion region = null;
85    private HBaseTestingUtility TEST_UTIL = HBaseTestingUtility.createLocalHTU();
86  
87    // Test names
88    static  byte[] tableName;
89    static final byte[] qual1 = Bytes.toBytes("qual1");
90    static final byte[] qual2 = Bytes.toBytes("qual2");
91    static final byte[] qual3 = Bytes.toBytes("qual3");
92    static final byte[] value1 = Bytes.toBytes("value1");
93    static final byte[] value2 = Bytes.toBytes("value2");
94    static final byte [] row = Bytes.toBytes("rowA");
95    static final byte [] row2 = Bytes.toBytes("rowB");
96  
97    @Before
98    public void setup() {
99      tableName = Bytes.toBytes(name.getMethodName());
100   }
101 
102   @After
103   public void teardown() throws IOException {
104     if (region != null) {
105       BlockCache bc = region.getStores().get(0).getCacheConfig().getBlockCache();
106       ((HRegion)region).close();
107       WAL wal = ((HRegion)region).getWAL();
108       if (wal != null) wal.close();
109       if (bc != null) bc.shutdown();
110       region = null;
111     }
112   }
113   //////////////////////////////////////////////////////////////////////////////
114   // New tests that don't spin up a mini cluster but rather just test the
115   // individual code pieces in the HRegion.
116   //////////////////////////////////////////////////////////////////////////////
117 
118   /**
119    * Test basic append operation.
120    * More tests in
121    * @see org.apache.hadoop.hbase.client.TestFromClientSide#testAppend()
122    */
123   @Test
124   public void testAppend() throws IOException {
125     initHRegion(tableName, name.getMethodName(), fam1);
126     String v1 = "Ultimate Answer to the Ultimate Question of Life,"+
127     " The Universe, and Everything";
128     String v2 = " is... 42.";
129     Append a = new Append(row);
130     a.setReturnResults(false);
131     a.add(fam1, qual1, Bytes.toBytes(v1));
132     a.add(fam1, qual2, Bytes.toBytes(v2));
133     assertNull(region.append(a, HConstants.NO_NONCE, HConstants.NO_NONCE));
134     a = new Append(row);
135     a.add(fam1, qual1, Bytes.toBytes(v2));
136     a.add(fam1, qual2, Bytes.toBytes(v1));
137     Result result = region.append(a, HConstants.NO_NONCE, HConstants.NO_NONCE);
138     assertEquals(0, Bytes.compareTo(Bytes.toBytes(v1+v2), result.getValue(fam1, qual1)));
139     assertEquals(0, Bytes.compareTo(Bytes.toBytes(v2+v1), result.getValue(fam1, qual2)));
140   }
141 
142   /**
143    * Test multi-threaded increments.
144    */
145   @Test
146   public void testIncrementMultiThreads() throws IOException {
147     LOG.info("Starting test testIncrementMultiThreads");
148     // run a with mixed column families (1 and 3 versions)
149     initHRegion(tableName, name.getMethodName(), new int[] {1,3}, fam1, fam2);
150 
151     // Create 100 threads, each will increment by its own quantity
152     int numThreads = 100;
153     int incrementsPerThread = 1000;
154     Incrementer[] all = new Incrementer[numThreads];
155     int expectedTotal = 0;
156     // create all threads
157     for (int i = 0; i < numThreads; i++) {
158       all[i] = new Incrementer(region, i, i, incrementsPerThread);
159       expectedTotal += (i * incrementsPerThread);
160     }
161 
162     // run all threads
163     for (int i = 0; i < numThreads; i++) {
164       all[i].start();
165     }
166 
167     // wait for all threads to finish
168     for (int i = 0; i < numThreads; i++) {
169       try {
170         all[i].join();
171       } catch (InterruptedException e) {
172         LOG.info("Ignored", e);
173       }
174     }
175     assertICV(row, fam1, qual1, expectedTotal);
176     assertICV(row, fam1, qual2, expectedTotal*2);
177     assertICV(row, fam2, qual3, expectedTotal*3);
178     LOG.info("testIncrementMultiThreads successfully verified that total is " + expectedTotal);
179   }
180 
181 
182   private void assertICV(byte [] row,
183                          byte [] familiy,
184                          byte[] qualifier,
185                          long amount) throws IOException {
186     // run a get and see?
187     Get get = new Get(row);
188     get.addColumn(familiy, qualifier);
189     Result result = region.get(get);
190     assertEquals(1, result.size());
191 
192     Cell kv = result.rawCells()[0];
193     long r = Bytes.toLong(CellUtil.cloneValue(kv));
194     assertEquals(amount, r);
195   }
196 
197   private void initHRegion (byte [] tableName, String callingMethod,
198       byte[] ... families)
199     throws IOException {
200     initHRegion(tableName, callingMethod, null, families);
201   }
202 
203   private void initHRegion (byte [] tableName, String callingMethod, int [] maxVersions,
204     byte[] ... families)
205   throws IOException {
206     HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(tableName));
207     int i=0;
208     for(byte [] family : families) {
209       HColumnDescriptor hcd = new HColumnDescriptor(family);
210       hcd.setMaxVersions(maxVersions != null ? maxVersions[i++] : 1);
211       htd.addFamily(hcd);
212     }
213     HRegionInfo info = new HRegionInfo(htd.getTableName(), null, null, false);
214     region = TEST_UTIL.createLocalHRegion(info, htd);
215   }
216 
217   /**
218    * A thread that makes a few increment calls
219    */
220   public static class Incrementer extends Thread {
221 
222     private final Region region;
223     private final int numIncrements;
224     private final int amount;
225 
226 
227     public Incrementer(Region region,
228         int threadNumber, int amount, int numIncrements) {
229       super("incrementer." + threadNumber);
230       this.region = region;
231       this.numIncrements = numIncrements;
232       this.amount = amount;
233       setDaemon(true);
234     }
235 
236     @Override
237     public void run() {
238       for (int i = 0; i < numIncrements; i++) {
239         try {
240           Increment inc = new Increment(row);
241           inc.addColumn(fam1, qual1, amount);
242           inc.addColumn(fam1, qual2, amount*2);
243           inc.addColumn(fam2, qual3, amount*3);
244           inc.setDurability(Durability.ASYNC_WAL);
245           region.increment(inc, HConstants.NO_NONCE, HConstants.NO_NONCE);
246 
247           // verify: Make sure we only see completed increments
248           Get g = new Get(row);
249           Result result = region.get(g);
250           if (result != null) {
251             assertTrue(result.getValue(fam1, qual1) != null);
252             assertTrue(result.getValue(fam1, qual2) != null);
253             assertEquals(Bytes.toLong(result.getValue(fam1, qual1))*2,
254               Bytes.toLong(result.getValue(fam1, qual2)));
255             assertTrue(result.getValue(fam2, qual3) != null);
256             assertEquals(Bytes.toLong(result.getValue(fam1, qual1))*3,
257               Bytes.toLong(result.getValue(fam2, qual3)));
258           }
259         } catch (IOException e) {
260           e.printStackTrace();
261         }
262       }
263     }
264   }
265 
266   @Test
267   public void testAppendMultiThreads() throws IOException {
268     LOG.info("Starting test testAppendMultiThreads");
269     // run a with mixed column families (1 and 3 versions)
270     initHRegion(tableName, name.getMethodName(), new int[] {1,3}, fam1, fam2);
271 
272     int numThreads = 100;
273     int opsPerThread = 100;
274     AtomicOperation[] all = new AtomicOperation[numThreads];
275     final byte[] val = new byte[]{1};
276 
277     AtomicInteger failures = new AtomicInteger(0);
278     // create all threads
279     for (int i = 0; i < numThreads; i++) {
280       all[i] = new AtomicOperation(region, opsPerThread, null, failures) {
281         @Override
282         public void run() {
283           for (int i=0; i<numOps; i++) {
284             try {
285               Append a = new Append(row);
286               a.add(fam1, qual1, val);
287               a.add(fam1, qual2, val);
288               a.add(fam2, qual3, val);
289               a.setDurability(Durability.ASYNC_WAL);
290               region.append(a, HConstants.NO_NONCE, HConstants.NO_NONCE);
291 
292               Get g = new Get(row);
293               Result result = region.get(g);
294               assertEquals(result.getValue(fam1, qual1).length,
295                   result.getValue(fam1, qual2).length);
296               assertEquals(result.getValue(fam1, qual1).length,
297                   result.getValue(fam2, qual3).length);
298             } catch (IOException e) {
299               e.printStackTrace();
300               failures.incrementAndGet();
301               fail();
302             }
303           }
304         }
305       };
306     }
307 
308     // run all threads
309     for (int i = 0; i < numThreads; i++) {
310       all[i].start();
311     }
312 
313     // wait for all threads to finish
314     for (int i = 0; i < numThreads; i++) {
315       try {
316         all[i].join();
317       } catch (InterruptedException e) {
318       }
319     }
320     assertEquals(0, failures.get());
321     Get g = new Get(row);
322     Result result = region.get(g);
323     assertEquals(result.getValue(fam1, qual1).length, 10000);
324     assertEquals(result.getValue(fam1, qual2).length, 10000);
325     assertEquals(result.getValue(fam2, qual3).length, 10000);
326   }
327   /**
328    * Test multi-threaded row mutations.
329    */
330   @Test
331   public void testRowMutationMultiThreads() throws IOException {
332     LOG.info("Starting test testRowMutationMultiThreads");
333     initHRegion(tableName, name.getMethodName(), fam1);
334 
335     // create 10 threads, each will alternate between adding and
336     // removing a column
337     int numThreads = 10;
338     int opsPerThread = 250;
339     AtomicOperation[] all = new AtomicOperation[numThreads];
340 
341     AtomicLong timeStamps = new AtomicLong(0);
342     AtomicInteger failures = new AtomicInteger(0);
343     // create all threads
344     for (int i = 0; i < numThreads; i++) {
345       all[i] = new AtomicOperation(region, opsPerThread, timeStamps, failures) {
346         @Override
347         public void run() {
348           boolean op = true;
349           for (int i=0; i<numOps; i++) {
350             try {
351               // throw in some flushes
352               if (i%10==0) {
353                 synchronized(region) {
354                   LOG.debug("flushing");
355                   region.flush(true);
356                   if (i%100==0) {
357                     region.compact(false);
358                   }
359                 }
360               }
361               long ts = timeStamps.incrementAndGet();
362               RowMutations rm = new RowMutations(row);
363               if (op) {
364                 Put p = new Put(row, ts);
365                 p.add(fam1, qual1, value1);
366                 p.setDurability(Durability.ASYNC_WAL);
367                 rm.add(p);
368                 Delete d = new Delete(row);
369                 d.deleteColumns(fam1, qual2, ts);
370                 d.setDurability(Durability.ASYNC_WAL);
371                 rm.add(d);
372               } else {
373                 Delete d = new Delete(row);
374                 d.deleteColumns(fam1, qual1, ts);
375                 d.setDurability(Durability.ASYNC_WAL);
376                 rm.add(d);
377                 Put p = new Put(row, ts);
378                 p.add(fam1, qual2, value2);
379                 p.setDurability(Durability.ASYNC_WAL);
380                 rm.add(p);
381               }
382               region.mutateRow(rm);
383               op ^= true;
384               // check: should always see exactly one column
385               Get g = new Get(row);
386               Result r = region.get(g);
387               if (r.size() != 1) {
388                 LOG.debug(r);
389                 failures.incrementAndGet();
390                 fail();
391               }
392             } catch (IOException e) {
393               e.printStackTrace();
394               failures.incrementAndGet();
395               fail();
396             }
397           }
398         }
399       };
400     }
401 
402     // run all threads
403     for (int i = 0; i < numThreads; i++) {
404       all[i].start();
405     }
406 
407     // wait for all threads to finish
408     for (int i = 0; i < numThreads; i++) {
409       try {
410         all[i].join();
411       } catch (InterruptedException e) {
412       }
413     }
414     assertEquals(0, failures.get());
415   }
416 
417 
418   /**
419    * Test multi-threaded region mutations.
420    */
421   @Test
422   public void testMultiRowMutationMultiThreads() throws IOException {
423 
424     LOG.info("Starting test testMultiRowMutationMultiThreads");
425     initHRegion(tableName, name.getMethodName(), fam1);
426 
427     // create 10 threads, each will alternate between adding and
428     // removing a column
429     int numThreads = 10;
430     int opsPerThread = 250;
431     AtomicOperation[] all = new AtomicOperation[numThreads];
432 
433     AtomicLong timeStamps = new AtomicLong(0);
434     AtomicInteger failures = new AtomicInteger(0);
435     final List<byte[]> rowsToLock = Arrays.asList(row, row2);
436     // create all threads
437     for (int i = 0; i < numThreads; i++) {
438       all[i] = new AtomicOperation(region, opsPerThread, timeStamps, failures) {
439         @Override
440         public void run() {
441           boolean op = true;
442           for (int i=0; i<numOps; i++) {
443             try {
444               // throw in some flushes
445               if (i%10==0) {
446                 synchronized(region) {
447                   LOG.debug("flushing");
448                   region.flush(true);
449                   if (i%100==0) {
450                     region.compact(false);
451                   }
452                 }
453               }
454               long ts = timeStamps.incrementAndGet();
455               List<Mutation> mrm = new ArrayList<Mutation>();
456               if (op) {
457                 Put p = new Put(row2, ts);
458                 p.add(fam1, qual1, value1);
459                 p.setDurability(Durability.ASYNC_WAL);
460                 mrm.add(p);
461                 Delete d = new Delete(row);
462                 d.deleteColumns(fam1, qual1, ts);
463                 d.setDurability(Durability.ASYNC_WAL);
464                 mrm.add(d);
465               } else {
466                 Delete d = new Delete(row2);
467                 d.deleteColumns(fam1, qual1, ts);
468                 d.setDurability(Durability.ASYNC_WAL);
469                 mrm.add(d);
470                 Put p = new Put(row, ts);
471                 p.setDurability(Durability.ASYNC_WAL);
472                 p.add(fam1, qual1, value2);
473                 mrm.add(p);
474               }
475               region.mutateRowsWithLocks(mrm, rowsToLock, HConstants.NO_NONCE, HConstants.NO_NONCE);
476               op ^= true;
477               // check: should always see exactly one column
478               Scan s = new Scan(row);
479               RegionScanner rs = region.getScanner(s);
480               List<Cell> r = new ArrayList<Cell>();
481               while (rs.next(r))
482                 ;
483               rs.close();
484               if (r.size() != 1) {
485                 LOG.debug(r);
486                 failures.incrementAndGet();
487                 fail();
488               }
489             } catch (IOException e) {
490               e.printStackTrace();
491               failures.incrementAndGet();
492               fail();
493             }
494           }
495         }
496       };
497     }
498 
499     // run all threads
500     for (int i = 0; i < numThreads; i++) {
501       all[i].start();
502     }
503 
504     // wait for all threads to finish
505     for (int i = 0; i < numThreads; i++) {
506       try {
507         all[i].join();
508       } catch (InterruptedException e) {
509       }
510     }
511     assertEquals(0, failures.get());
512   }
513 
514   public static class AtomicOperation extends Thread {
515     protected final HRegion region;
516     protected final int numOps;
517     protected final AtomicLong timeStamps;
518     protected final AtomicInteger failures;
519     protected final Random r = new Random();
520 
521     public AtomicOperation(HRegion region, int numOps, AtomicLong timeStamps,
522         AtomicInteger failures) {
523       this.region = region;
524       this.numOps = numOps;
525       this.timeStamps = timeStamps;
526       this.failures = failures;
527     }
528   }
529 
530   private static CountDownLatch latch = new CountDownLatch(1);
531   private enum TestStep {
532     INIT,                  // initial put of 10 to set value of the cell
533     PUT_STARTED,           // began doing a put of 50 to cell
534     PUT_COMPLETED,         // put complete (released RowLock, but may not have advanced MVCC).
535     CHECKANDPUT_STARTED,   // began checkAndPut: if 10 -> 11
536     CHECKANDPUT_COMPLETED  // completed checkAndPut
537     // NOTE: at the end of these steps, the value of the cell should be 50, not 11!
538   }
539   private static volatile TestStep testStep = TestStep.INIT;
540   private final String family = "f1";
541 
542   /**
543    * Test written as a verifier for HBASE-7051, CheckAndPut should properly read
544    * MVCC.
545    *
546    * Moved into TestAtomicOperation from its original location, TestHBase7051
547    */
548   @Test
549   public void testPutAndCheckAndPutInParallel() throws Exception {
550 
551     final String tableName = "testPutAndCheckAndPut";
552     Configuration conf = TEST_UTIL.getConfiguration();
553     conf.setClass(HConstants.REGION_IMPL, MockHRegion.class, HeapSize.class);
554     final MockHRegion region = (MockHRegion) TEST_UTIL.createLocalHRegion(Bytes.toBytes(tableName),
555         null, null, tableName, conf, false, Durability.SYNC_WAL, null, Bytes.toBytes(family));
556 
557     Put[] puts = new Put[1];
558     Put put = new Put(Bytes.toBytes("r1"));
559     put.add(Bytes.toBytes(family), Bytes.toBytes("q1"), Bytes.toBytes("10"));
560     puts[0] = put;
561 
562     region.batchMutate(puts, HConstants.NO_NONCE, HConstants.NO_NONCE);
563     MultithreadedTestUtil.TestContext ctx =
564       new MultithreadedTestUtil.TestContext(conf);
565     ctx.addThread(new PutThread(ctx, region));
566     ctx.addThread(new CheckAndPutThread(ctx, region));
567     ctx.startThreads();
568     while (testStep != TestStep.CHECKANDPUT_COMPLETED) {
569       Thread.sleep(100);
570     }
571     ctx.stop();
572     Scan s = new Scan();
573     RegionScanner scanner = region.getScanner(s);
574     List<Cell> results = new ArrayList<Cell>();
575     ScannerContext scannerContext = ScannerContext.newBuilder().setBatchLimit(2).build();
576     scanner.next(results, scannerContext);
577     for (Cell keyValue : results) {
578       assertEquals("50",Bytes.toString(CellUtil.cloneValue(keyValue)));
579     }
580   }
581 
582   private class PutThread extends TestThread {
583     private HRegion region;
584     PutThread(TestContext ctx, HRegion region) {
585       super(ctx);
586       this.region = region;
587     }
588 
589     public void doWork() throws Exception {
590       Put[] puts = new Put[1];
591       Put put = new Put(Bytes.toBytes("r1"));
592       put.add(Bytes.toBytes(family), Bytes.toBytes("q1"), Bytes.toBytes("50"));
593       puts[0] = put;
594       testStep = TestStep.PUT_STARTED;
595       region.batchMutate(puts, HConstants.NO_NONCE, HConstants.NO_NONCE);
596     }
597   }
598 
599   private class CheckAndPutThread extends TestThread {
600     private HRegion region;
601     CheckAndPutThread(TestContext ctx, HRegion region) {
602       super(ctx);
603       this.region = region;
604    }
605 
606     public void doWork() throws Exception {
607       Put[] puts = new Put[1];
608       Put put = new Put(Bytes.toBytes("r1"));
609       put.add(Bytes.toBytes(family), Bytes.toBytes("q1"), Bytes.toBytes("11"));
610       puts[0] = put;
611       while (testStep != TestStep.PUT_COMPLETED) {
612         Thread.sleep(100);
613       }
614       testStep = TestStep.CHECKANDPUT_STARTED;
615       region.checkAndMutate(Bytes.toBytes("r1"), Bytes.toBytes(family), Bytes.toBytes("q1"),
616         CompareOp.EQUAL, new BinaryComparator(Bytes.toBytes("10")), put, true);
617       testStep = TestStep.CHECKANDPUT_COMPLETED;
618     }
619   }
620 
621   public static class MockHRegion extends HRegion {
622 
623     public MockHRegion(Path tableDir, WAL log, FileSystem fs, Configuration conf,
624         final HRegionInfo regionInfo, final HTableDescriptor htd, RegionServerServices rsServices) {
625       super(tableDir, log, fs, conf, regionInfo, htd, rsServices);
626     }
627 
628     @Override
629     public RowLock getRowLock(final byte[] row, boolean readLock) throws IOException {
630       if (testStep == TestStep.CHECKANDPUT_STARTED) {
631         latch.countDown();
632       }
633       return new WrappedRowLock(super.getRowLock(row, readLock));
634     }
635 
636     public class WrappedRowLock implements RowLock {
637 
638       private final RowLock rowLock;
639 
640       private WrappedRowLock(RowLock rowLock) {
641         this.rowLock = rowLock;
642       }
643 
644 
645       @Override
646       public void release() {
647         if (testStep == TestStep.INIT) {
648           this.rowLock.release();
649           return;
650         }
651 
652         if (testStep == TestStep.PUT_STARTED) {
653           try {
654             testStep = TestStep.PUT_COMPLETED;
655             this.rowLock.release();
656             // put has been written to the memstore and the row lock has been released, but the
657             // MVCC has not been advanced.  Prior to fixing HBASE-7051, the following order of
658             // operations would cause the non-atomicity to show up:
659             // 1) Put releases row lock (where we are now)
660             // 2) CheckAndPut grabs row lock and reads the value prior to the put (10)
661             //    because the MVCC has not advanced
662             // 3) Put advances MVCC
663             // So, in order to recreate this order, we wait for the checkAndPut to grab the rowLock
664             // (see below), and then wait some more to give the checkAndPut time to read the old
665             // value.
666             latch.await();
667             Thread.sleep(1000);
668           } catch (InterruptedException e) {
669             Thread.currentThread().interrupt();
670           }
671         }
672         else if (testStep == TestStep.CHECKANDPUT_STARTED) {
673           this.rowLock.release();
674         }
675       }
676     }
677   }
678 }