1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.regionserver;
19 import static org.apache.hadoop.hbase.HBaseTestingUtility.fam1;
20 import static org.apache.hadoop.hbase.HBaseTestingUtility.fam2;
21 import static org.junit.Assert.assertEquals;
22 import static org.junit.Assert.assertNull;
23 import static org.junit.Assert.assertTrue;
24 import static org.junit.Assert.fail;
25
26 import java.io.IOException;
27 import java.util.ArrayList;
28 import java.util.Arrays;
29 import java.util.List;
30 import java.util.Random;
31 import java.util.concurrent.CountDownLatch;
32 import java.util.concurrent.atomic.AtomicInteger;
33 import java.util.concurrent.atomic.AtomicLong;
34
35 import org.apache.commons.logging.Log;
36 import org.apache.commons.logging.LogFactory;
37 import org.apache.hadoop.conf.Configuration;
38 import org.apache.hadoop.fs.FileSystem;
39 import org.apache.hadoop.fs.Path;
40 import org.apache.hadoop.hbase.Cell;
41 import org.apache.hadoop.hbase.CellUtil;
42 import org.apache.hadoop.hbase.HBaseTestingUtility;
43 import org.apache.hadoop.hbase.HColumnDescriptor;
44 import org.apache.hadoop.hbase.HConstants;
45 import org.apache.hadoop.hbase.HRegionInfo;
46 import org.apache.hadoop.hbase.HTableDescriptor;
47 import org.apache.hadoop.hbase.MultithreadedTestUtil;
48 import org.apache.hadoop.hbase.MultithreadedTestUtil.TestContext;
49 import org.apache.hadoop.hbase.MultithreadedTestUtil.TestThread;
50 import org.apache.hadoop.hbase.TableName;
51 import org.apache.hadoop.hbase.client.Append;
52 import org.apache.hadoop.hbase.client.Delete;
53 import org.apache.hadoop.hbase.client.Durability;
54 import org.apache.hadoop.hbase.client.Get;
55 import org.apache.hadoop.hbase.client.Increment;
56 import org.apache.hadoop.hbase.client.Mutation;
57 import org.apache.hadoop.hbase.client.Put;
58 import org.apache.hadoop.hbase.client.Result;
59 import org.apache.hadoop.hbase.client.RowMutations;
60 import org.apache.hadoop.hbase.client.Scan;
61 import org.apache.hadoop.hbase.filter.BinaryComparator;
62 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
63 import org.apache.hadoop.hbase.io.HeapSize;
64 import org.apache.hadoop.hbase.io.hfile.BlockCache;
65 import org.apache.hadoop.hbase.testclassification.MediumTests;
66 import org.apache.hadoop.hbase.util.Bytes;
67 import org.apache.hadoop.hbase.wal.WAL;
68 import org.junit.After;
69 import org.junit.Before;
70 import org.junit.Rule;
71 import org.junit.Test;
72 import org.junit.experimental.categories.Category;
73 import org.junit.rules.TestName;
74
75
76
77
78
79 @Category(MediumTests.class)
80 public class TestAtomicOperation {
81 private static final Log LOG = LogFactory.getLog(TestAtomicOperation.class);
82 @Rule public TestName name = new TestName();
83
84 HRegion region = null;
85 private HBaseTestingUtility TEST_UTIL = HBaseTestingUtility.createLocalHTU();
86
87
88 static byte[] tableName;
89 static final byte[] qual1 = Bytes.toBytes("qual1");
90 static final byte[] qual2 = Bytes.toBytes("qual2");
91 static final byte[] qual3 = Bytes.toBytes("qual3");
92 static final byte[] value1 = Bytes.toBytes("value1");
93 static final byte[] value2 = Bytes.toBytes("value2");
94 static final byte [] row = Bytes.toBytes("rowA");
95 static final byte [] row2 = Bytes.toBytes("rowB");
96
97 @Before
98 public void setup() {
99 tableName = Bytes.toBytes(name.getMethodName());
100 }
101
102 @After
103 public void teardown() throws IOException {
104 if (region != null) {
105 BlockCache bc = region.getStores().get(0).getCacheConfig().getBlockCache();
106 ((HRegion)region).close();
107 WAL wal = ((HRegion)region).getWAL();
108 if (wal != null) wal.close();
109 if (bc != null) bc.shutdown();
110 region = null;
111 }
112 }
113
114
115
116
117
118
119
120
121
122
123 @Test
124 public void testAppend() throws IOException {
125 initHRegion(tableName, name.getMethodName(), fam1);
126 String v1 = "Ultimate Answer to the Ultimate Question of Life,"+
127 " The Universe, and Everything";
128 String v2 = " is... 42.";
129 Append a = new Append(row);
130 a.setReturnResults(false);
131 a.add(fam1, qual1, Bytes.toBytes(v1));
132 a.add(fam1, qual2, Bytes.toBytes(v2));
133 assertNull(region.append(a, HConstants.NO_NONCE, HConstants.NO_NONCE));
134 a = new Append(row);
135 a.add(fam1, qual1, Bytes.toBytes(v2));
136 a.add(fam1, qual2, Bytes.toBytes(v1));
137 Result result = region.append(a, HConstants.NO_NONCE, HConstants.NO_NONCE);
138 assertEquals(0, Bytes.compareTo(Bytes.toBytes(v1+v2), result.getValue(fam1, qual1)));
139 assertEquals(0, Bytes.compareTo(Bytes.toBytes(v2+v1), result.getValue(fam1, qual2)));
140 }
141
142
143
144
145 @Test
146 public void testIncrementMultiThreads() throws IOException {
147 LOG.info("Starting test testIncrementMultiThreads");
148
149 initHRegion(tableName, name.getMethodName(), new int[] {1,3}, fam1, fam2);
150
151
152 int numThreads = 100;
153 int incrementsPerThread = 1000;
154 Incrementer[] all = new Incrementer[numThreads];
155 int expectedTotal = 0;
156
157 for (int i = 0; i < numThreads; i++) {
158 all[i] = new Incrementer(region, i, i, incrementsPerThread);
159 expectedTotal += (i * incrementsPerThread);
160 }
161
162
163 for (int i = 0; i < numThreads; i++) {
164 all[i].start();
165 }
166
167
168 for (int i = 0; i < numThreads; i++) {
169 try {
170 all[i].join();
171 } catch (InterruptedException e) {
172 LOG.info("Ignored", e);
173 }
174 }
175 assertICV(row, fam1, qual1, expectedTotal);
176 assertICV(row, fam1, qual2, expectedTotal*2);
177 assertICV(row, fam2, qual3, expectedTotal*3);
178 LOG.info("testIncrementMultiThreads successfully verified that total is " + expectedTotal);
179 }
180
181
182 private void assertICV(byte [] row,
183 byte [] familiy,
184 byte[] qualifier,
185 long amount) throws IOException {
186
187 Get get = new Get(row);
188 get.addColumn(familiy, qualifier);
189 Result result = region.get(get);
190 assertEquals(1, result.size());
191
192 Cell kv = result.rawCells()[0];
193 long r = Bytes.toLong(CellUtil.cloneValue(kv));
194 assertEquals(amount, r);
195 }
196
197 private void initHRegion (byte [] tableName, String callingMethod,
198 byte[] ... families)
199 throws IOException {
200 initHRegion(tableName, callingMethod, null, families);
201 }
202
203 private void initHRegion (byte [] tableName, String callingMethod, int [] maxVersions,
204 byte[] ... families)
205 throws IOException {
206 HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(tableName));
207 int i=0;
208 for(byte [] family : families) {
209 HColumnDescriptor hcd = new HColumnDescriptor(family);
210 hcd.setMaxVersions(maxVersions != null ? maxVersions[i++] : 1);
211 htd.addFamily(hcd);
212 }
213 HRegionInfo info = new HRegionInfo(htd.getTableName(), null, null, false);
214 region = TEST_UTIL.createLocalHRegion(info, htd);
215 }
216
217
218
219
220 public static class Incrementer extends Thread {
221
222 private final Region region;
223 private final int numIncrements;
224 private final int amount;
225
226
227 public Incrementer(Region region,
228 int threadNumber, int amount, int numIncrements) {
229 super("incrementer." + threadNumber);
230 this.region = region;
231 this.numIncrements = numIncrements;
232 this.amount = amount;
233 setDaemon(true);
234 }
235
236 @Override
237 public void run() {
238 for (int i = 0; i < numIncrements; i++) {
239 try {
240 Increment inc = new Increment(row);
241 inc.addColumn(fam1, qual1, amount);
242 inc.addColumn(fam1, qual2, amount*2);
243 inc.addColumn(fam2, qual3, amount*3);
244 inc.setDurability(Durability.ASYNC_WAL);
245 region.increment(inc, HConstants.NO_NONCE, HConstants.NO_NONCE);
246
247
248 Get g = new Get(row);
249 Result result = region.get(g);
250 if (result != null) {
251 assertTrue(result.getValue(fam1, qual1) != null);
252 assertTrue(result.getValue(fam1, qual2) != null);
253 assertEquals(Bytes.toLong(result.getValue(fam1, qual1))*2,
254 Bytes.toLong(result.getValue(fam1, qual2)));
255 assertTrue(result.getValue(fam2, qual3) != null);
256 assertEquals(Bytes.toLong(result.getValue(fam1, qual1))*3,
257 Bytes.toLong(result.getValue(fam2, qual3)));
258 }
259 } catch (IOException e) {
260 e.printStackTrace();
261 }
262 }
263 }
264 }
265
266 @Test
267 public void testAppendMultiThreads() throws IOException {
268 LOG.info("Starting test testAppendMultiThreads");
269
270 initHRegion(tableName, name.getMethodName(), new int[] {1,3}, fam1, fam2);
271
272 int numThreads = 100;
273 int opsPerThread = 100;
274 AtomicOperation[] all = new AtomicOperation[numThreads];
275 final byte[] val = new byte[]{1};
276
277 AtomicInteger failures = new AtomicInteger(0);
278
279 for (int i = 0; i < numThreads; i++) {
280 all[i] = new AtomicOperation(region, opsPerThread, null, failures) {
281 @Override
282 public void run() {
283 for (int i=0; i<numOps; i++) {
284 try {
285 Append a = new Append(row);
286 a.add(fam1, qual1, val);
287 a.add(fam1, qual2, val);
288 a.add(fam2, qual3, val);
289 a.setDurability(Durability.ASYNC_WAL);
290 region.append(a, HConstants.NO_NONCE, HConstants.NO_NONCE);
291
292 Get g = new Get(row);
293 Result result = region.get(g);
294 assertEquals(result.getValue(fam1, qual1).length,
295 result.getValue(fam1, qual2).length);
296 assertEquals(result.getValue(fam1, qual1).length,
297 result.getValue(fam2, qual3).length);
298 } catch (IOException e) {
299 e.printStackTrace();
300 failures.incrementAndGet();
301 fail();
302 }
303 }
304 }
305 };
306 }
307
308
309 for (int i = 0; i < numThreads; i++) {
310 all[i].start();
311 }
312
313
314 for (int i = 0; i < numThreads; i++) {
315 try {
316 all[i].join();
317 } catch (InterruptedException e) {
318 }
319 }
320 assertEquals(0, failures.get());
321 Get g = new Get(row);
322 Result result = region.get(g);
323 assertEquals(result.getValue(fam1, qual1).length, 10000);
324 assertEquals(result.getValue(fam1, qual2).length, 10000);
325 assertEquals(result.getValue(fam2, qual3).length, 10000);
326 }
327
328
329
330 @Test
331 public void testRowMutationMultiThreads() throws IOException {
332 LOG.info("Starting test testRowMutationMultiThreads");
333 initHRegion(tableName, name.getMethodName(), fam1);
334
335
336
337 int numThreads = 10;
338 int opsPerThread = 250;
339 AtomicOperation[] all = new AtomicOperation[numThreads];
340
341 AtomicLong timeStamps = new AtomicLong(0);
342 AtomicInteger failures = new AtomicInteger(0);
343
344 for (int i = 0; i < numThreads; i++) {
345 all[i] = new AtomicOperation(region, opsPerThread, timeStamps, failures) {
346 @Override
347 public void run() {
348 boolean op = true;
349 for (int i=0; i<numOps; i++) {
350 try {
351
352 if (i%10==0) {
353 synchronized(region) {
354 LOG.debug("flushing");
355 region.flush(true);
356 if (i%100==0) {
357 region.compact(false);
358 }
359 }
360 }
361 long ts = timeStamps.incrementAndGet();
362 RowMutations rm = new RowMutations(row);
363 if (op) {
364 Put p = new Put(row, ts);
365 p.add(fam1, qual1, value1);
366 p.setDurability(Durability.ASYNC_WAL);
367 rm.add(p);
368 Delete d = new Delete(row);
369 d.deleteColumns(fam1, qual2, ts);
370 d.setDurability(Durability.ASYNC_WAL);
371 rm.add(d);
372 } else {
373 Delete d = new Delete(row);
374 d.deleteColumns(fam1, qual1, ts);
375 d.setDurability(Durability.ASYNC_WAL);
376 rm.add(d);
377 Put p = new Put(row, ts);
378 p.add(fam1, qual2, value2);
379 p.setDurability(Durability.ASYNC_WAL);
380 rm.add(p);
381 }
382 region.mutateRow(rm);
383 op ^= true;
384
385 Get g = new Get(row);
386 Result r = region.get(g);
387 if (r.size() != 1) {
388 LOG.debug(r);
389 failures.incrementAndGet();
390 fail();
391 }
392 } catch (IOException e) {
393 e.printStackTrace();
394 failures.incrementAndGet();
395 fail();
396 }
397 }
398 }
399 };
400 }
401
402
403 for (int i = 0; i < numThreads; i++) {
404 all[i].start();
405 }
406
407
408 for (int i = 0; i < numThreads; i++) {
409 try {
410 all[i].join();
411 } catch (InterruptedException e) {
412 }
413 }
414 assertEquals(0, failures.get());
415 }
416
417
418
419
420
421 @Test
422 public void testMultiRowMutationMultiThreads() throws IOException {
423
424 LOG.info("Starting test testMultiRowMutationMultiThreads");
425 initHRegion(tableName, name.getMethodName(), fam1);
426
427
428
429 int numThreads = 10;
430 int opsPerThread = 250;
431 AtomicOperation[] all = new AtomicOperation[numThreads];
432
433 AtomicLong timeStamps = new AtomicLong(0);
434 AtomicInteger failures = new AtomicInteger(0);
435 final List<byte[]> rowsToLock = Arrays.asList(row, row2);
436
437 for (int i = 0; i < numThreads; i++) {
438 all[i] = new AtomicOperation(region, opsPerThread, timeStamps, failures) {
439 @Override
440 public void run() {
441 boolean op = true;
442 for (int i=0; i<numOps; i++) {
443 try {
444
445 if (i%10==0) {
446 synchronized(region) {
447 LOG.debug("flushing");
448 region.flush(true);
449 if (i%100==0) {
450 region.compact(false);
451 }
452 }
453 }
454 long ts = timeStamps.incrementAndGet();
455 List<Mutation> mrm = new ArrayList<Mutation>();
456 if (op) {
457 Put p = new Put(row2, ts);
458 p.add(fam1, qual1, value1);
459 p.setDurability(Durability.ASYNC_WAL);
460 mrm.add(p);
461 Delete d = new Delete(row);
462 d.deleteColumns(fam1, qual1, ts);
463 d.setDurability(Durability.ASYNC_WAL);
464 mrm.add(d);
465 } else {
466 Delete d = new Delete(row2);
467 d.deleteColumns(fam1, qual1, ts);
468 d.setDurability(Durability.ASYNC_WAL);
469 mrm.add(d);
470 Put p = new Put(row, ts);
471 p.setDurability(Durability.ASYNC_WAL);
472 p.add(fam1, qual1, value2);
473 mrm.add(p);
474 }
475 region.mutateRowsWithLocks(mrm, rowsToLock, HConstants.NO_NONCE, HConstants.NO_NONCE);
476 op ^= true;
477
478 Scan s = new Scan(row);
479 RegionScanner rs = region.getScanner(s);
480 List<Cell> r = new ArrayList<Cell>();
481 while (rs.next(r))
482 ;
483 rs.close();
484 if (r.size() != 1) {
485 LOG.debug(r);
486 failures.incrementAndGet();
487 fail();
488 }
489 } catch (IOException e) {
490 e.printStackTrace();
491 failures.incrementAndGet();
492 fail();
493 }
494 }
495 }
496 };
497 }
498
499
500 for (int i = 0; i < numThreads; i++) {
501 all[i].start();
502 }
503
504
505 for (int i = 0; i < numThreads; i++) {
506 try {
507 all[i].join();
508 } catch (InterruptedException e) {
509 }
510 }
511 assertEquals(0, failures.get());
512 }
513
514 public static class AtomicOperation extends Thread {
515 protected final HRegion region;
516 protected final int numOps;
517 protected final AtomicLong timeStamps;
518 protected final AtomicInteger failures;
519 protected final Random r = new Random();
520
521 public AtomicOperation(HRegion region, int numOps, AtomicLong timeStamps,
522 AtomicInteger failures) {
523 this.region = region;
524 this.numOps = numOps;
525 this.timeStamps = timeStamps;
526 this.failures = failures;
527 }
528 }
529
530 private static CountDownLatch latch = new CountDownLatch(1);
531 private enum TestStep {
532 INIT,
533 PUT_STARTED,
534 PUT_COMPLETED,
535 CHECKANDPUT_STARTED,
536 CHECKANDPUT_COMPLETED
537
538 }
539 private static volatile TestStep testStep = TestStep.INIT;
540 private final String family = "f1";
541
542
543
544
545
546
547
548 @Test
549 public void testPutAndCheckAndPutInParallel() throws Exception {
550
551 final String tableName = "testPutAndCheckAndPut";
552 Configuration conf = TEST_UTIL.getConfiguration();
553 conf.setClass(HConstants.REGION_IMPL, MockHRegion.class, HeapSize.class);
554 final MockHRegion region = (MockHRegion) TEST_UTIL.createLocalHRegion(Bytes.toBytes(tableName),
555 null, null, tableName, conf, false, Durability.SYNC_WAL, null, Bytes.toBytes(family));
556
557 Put[] puts = new Put[1];
558 Put put = new Put(Bytes.toBytes("r1"));
559 put.add(Bytes.toBytes(family), Bytes.toBytes("q1"), Bytes.toBytes("10"));
560 puts[0] = put;
561
562 region.batchMutate(puts, HConstants.NO_NONCE, HConstants.NO_NONCE);
563 MultithreadedTestUtil.TestContext ctx =
564 new MultithreadedTestUtil.TestContext(conf);
565 ctx.addThread(new PutThread(ctx, region));
566 ctx.addThread(new CheckAndPutThread(ctx, region));
567 ctx.startThreads();
568 while (testStep != TestStep.CHECKANDPUT_COMPLETED) {
569 Thread.sleep(100);
570 }
571 ctx.stop();
572 Scan s = new Scan();
573 RegionScanner scanner = region.getScanner(s);
574 List<Cell> results = new ArrayList<Cell>();
575 ScannerContext scannerContext = ScannerContext.newBuilder().setBatchLimit(2).build();
576 scanner.next(results, scannerContext);
577 for (Cell keyValue : results) {
578 assertEquals("50",Bytes.toString(CellUtil.cloneValue(keyValue)));
579 }
580 }
581
582 private class PutThread extends TestThread {
583 private HRegion region;
584 PutThread(TestContext ctx, HRegion region) {
585 super(ctx);
586 this.region = region;
587 }
588
589 public void doWork() throws Exception {
590 Put[] puts = new Put[1];
591 Put put = new Put(Bytes.toBytes("r1"));
592 put.add(Bytes.toBytes(family), Bytes.toBytes("q1"), Bytes.toBytes("50"));
593 puts[0] = put;
594 testStep = TestStep.PUT_STARTED;
595 region.batchMutate(puts, HConstants.NO_NONCE, HConstants.NO_NONCE);
596 }
597 }
598
599 private class CheckAndPutThread extends TestThread {
600 private HRegion region;
601 CheckAndPutThread(TestContext ctx, HRegion region) {
602 super(ctx);
603 this.region = region;
604 }
605
606 public void doWork() throws Exception {
607 Put[] puts = new Put[1];
608 Put put = new Put(Bytes.toBytes("r1"));
609 put.add(Bytes.toBytes(family), Bytes.toBytes("q1"), Bytes.toBytes("11"));
610 puts[0] = put;
611 while (testStep != TestStep.PUT_COMPLETED) {
612 Thread.sleep(100);
613 }
614 testStep = TestStep.CHECKANDPUT_STARTED;
615 region.checkAndMutate(Bytes.toBytes("r1"), Bytes.toBytes(family), Bytes.toBytes("q1"),
616 CompareOp.EQUAL, new BinaryComparator(Bytes.toBytes("10")), put, true);
617 testStep = TestStep.CHECKANDPUT_COMPLETED;
618 }
619 }
620
621 public static class MockHRegion extends HRegion {
622
623 public MockHRegion(Path tableDir, WAL log, FileSystem fs, Configuration conf,
624 final HRegionInfo regionInfo, final HTableDescriptor htd, RegionServerServices rsServices) {
625 super(tableDir, log, fs, conf, regionInfo, htd, rsServices);
626 }
627
628 @Override
629 public RowLock getRowLock(final byte[] row, boolean readLock) throws IOException {
630 if (testStep == TestStep.CHECKANDPUT_STARTED) {
631 latch.countDown();
632 }
633 return new WrappedRowLock(super.getRowLock(row, readLock));
634 }
635
636 public class WrappedRowLock implements RowLock {
637
638 private final RowLock rowLock;
639
640 private WrappedRowLock(RowLock rowLock) {
641 this.rowLock = rowLock;
642 }
643
644
645 @Override
646 public void release() {
647 if (testStep == TestStep.INIT) {
648 this.rowLock.release();
649 return;
650 }
651
652 if (testStep == TestStep.PUT_STARTED) {
653 try {
654 testStep = TestStep.PUT_COMPLETED;
655 this.rowLock.release();
656
657
658
659
660
661
662
663
664
665
666 latch.await();
667 Thread.sleep(1000);
668 } catch (InterruptedException e) {
669 Thread.currentThread().interrupt();
670 }
671 }
672 else if (testStep == TestStep.CHECKANDPUT_STARTED) {
673 this.rowLock.release();
674 }
675 }
676 }
677 }
678 }