1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase.client;
21
22
23 import org.apache.commons.logging.Log;
24 import org.apache.commons.logging.LogFactory;
25 import org.apache.hadoop.conf.Configuration;
26 import org.apache.hadoop.hbase.CallQueueTooBigException;
27 import org.apache.hadoop.hbase.Cell;
28 import org.apache.hadoop.hbase.RegionLocations;
29 import org.apache.hadoop.hbase.TableName;
30 import org.apache.hadoop.hbase.HConstants;
31 import org.apache.hadoop.hbase.HRegionInfo;
32 import org.apache.hadoop.hbase.HRegionLocation;
33 import org.apache.hadoop.hbase.testclassification.MediumTests;
34 import org.apache.hadoop.hbase.ServerName;
35 import org.apache.hadoop.hbase.client.AsyncProcess.AsyncRequestFuture;
36 import org.apache.hadoop.hbase.client.coprocessor.Batch;
37 import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
38 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
39 import org.apache.hadoop.hbase.util.Bytes;
40 import org.apache.hadoop.hbase.util.Threads;
41 import org.junit.Assert;
42 import org.junit.BeforeClass;
43 import org.junit.Rule;
44 import org.junit.Test;
45 import org.junit.experimental.categories.Category;
46 import org.junit.rules.Timeout;
47 import org.mockito.Mockito;
48
49 import java.io.IOException;
50 import java.io.InterruptedIOException;
51 import java.util.ArrayList;
52 import java.util.Arrays;
53 import java.util.HashMap;
54 import java.util.List;
55 import java.util.Map;
56 import java.util.Set;
57 import java.util.TreeSet;
58 import java.util.concurrent.BlockingQueue;
59 import java.util.concurrent.ExecutorService;
60 import java.util.concurrent.Future;
61 import java.util.concurrent.LinkedBlockingQueue;
62 import java.util.concurrent.RejectedExecutionException;
63 import java.util.concurrent.SynchronousQueue;
64 import java.util.concurrent.ThreadFactory;
65 import java.util.concurrent.ThreadPoolExecutor;
66 import java.util.concurrent.TimeUnit;
67 import java.util.concurrent.atomic.AtomicBoolean;
68 import java.util.concurrent.atomic.AtomicInteger;
69 import java.util.concurrent.atomic.AtomicLong;
70
71 @Category(MediumTests.class)
72 public class TestAsyncProcess {
73 private final static Log LOG = LogFactory.getLog(TestAsyncProcess.class);
74 private static final TableName DUMMY_TABLE =
75 TableName.valueOf("DUMMY_TABLE");
76 private static final byte[] DUMMY_BYTES_1 = "DUMMY_BYTES_1".getBytes();
77 private static final byte[] DUMMY_BYTES_2 = "DUMMY_BYTES_2".getBytes();
78 private static final byte[] DUMMY_BYTES_3 = "DUMMY_BYTES_3".getBytes();
79 private static final byte[] FAILS = "FAILS".getBytes();
80 private static final Configuration conf = new Configuration();
81
82 private static ServerName sn = ServerName.valueOf("s1:1,1");
83 private static ServerName sn2 = ServerName.valueOf("s2:2,2");
84 private static ServerName sn3 = ServerName.valueOf("s3:3,3");
85 private static HRegionInfo hri1 =
86 new HRegionInfo(DUMMY_TABLE, DUMMY_BYTES_1, DUMMY_BYTES_2, false, 1);
87 private static HRegionInfo hri2 =
88 new HRegionInfo(DUMMY_TABLE, DUMMY_BYTES_2, HConstants.EMPTY_END_ROW, false, 2);
89 private static HRegionInfo hri3 =
90 new HRegionInfo(DUMMY_TABLE, DUMMY_BYTES_3, HConstants.EMPTY_END_ROW, false, 3);
91 private static HRegionLocation loc1 = new HRegionLocation(hri1, sn);
92 private static HRegionLocation loc2 = new HRegionLocation(hri2, sn);
93 private static HRegionLocation loc3 = new HRegionLocation(hri3, sn2);
94
95
96 private static HRegionInfo hri1r1 = RegionReplicaUtil.getRegionInfoForReplica(hri1, 1),
97 hri1r2 = RegionReplicaUtil.getRegionInfoForReplica(hri1, 2);
98 private static HRegionInfo hri2r1 = RegionReplicaUtil.getRegionInfoForReplica(hri2, 1);
99 private static RegionLocations hrls1 = new RegionLocations(new HRegionLocation(hri1, sn),
100 new HRegionLocation(hri1r1, sn2), new HRegionLocation(hri1r2, sn3));
101 private static RegionLocations hrls2 = new RegionLocations(new HRegionLocation(hri2, sn2),
102 new HRegionLocation(hri2r1, sn3));
103 private static RegionLocations hrls3 = new RegionLocations(new HRegionLocation(hri3, sn3), null);
104
105 private static final String success = "success";
106 private static Exception failure = new Exception("failure");
107
108 private static int NB_RETRIES = 3;
109
110 @BeforeClass
111 public static void beforeClass(){
112 conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, NB_RETRIES);
113 }
114
115 static class CountingThreadFactory implements ThreadFactory {
116 final AtomicInteger nbThreads;
117 ThreadFactory realFactory = Threads.newDaemonThreadFactory("test-TestAsyncProcess");
118 @Override
119 public Thread newThread(Runnable r) {
120 nbThreads.incrementAndGet();
121 return realFactory.newThread(r);
122 }
123
124 CountingThreadFactory(AtomicInteger nbThreads){
125 this.nbThreads = nbThreads;
126 }
127 }
128
129 static class MyAsyncProcess extends AsyncProcess {
130 final AtomicInteger nbMultiResponse = new AtomicInteger();
131 final AtomicInteger nbActions = new AtomicInteger();
132 public List<AsyncRequestFuture> allReqs = new ArrayList<AsyncRequestFuture>();
133 public AtomicInteger callsCt = new AtomicInteger();
134
135 @Override
136 protected <Res> AsyncRequestFutureImpl<Res> createAsyncRequestFuture(TableName tableName,
137 List<Action<Row>> actions, long nonceGroup, ExecutorService pool,
138 Batch.Callback<Res> callback, Object[] results, boolean needResults) {
139
140 AsyncRequestFutureImpl<Res> r = super.createAsyncRequestFuture(
141 DUMMY_TABLE, actions, nonceGroup, pool, callback, results, needResults);
142 allReqs.add(r);
143 callsCt.incrementAndGet();
144 return r;
145 }
146
147 public MyAsyncProcess(ClusterConnection hc, Configuration conf) {
148 this(hc, conf, new AtomicInteger());
149 }
150
151 public MyAsyncProcess(ClusterConnection hc, Configuration conf, AtomicInteger nbThreads) {
152 super(hc, conf, new ThreadPoolExecutor(1, 20, 60, TimeUnit.SECONDS,
153 new SynchronousQueue<Runnable>(), new CountingThreadFactory(nbThreads)),
154 new RpcRetryingCallerFactory(conf), false, new RpcControllerFactory(conf));
155 }
156
157 public MyAsyncProcess(
158 ClusterConnection hc, Configuration conf, boolean useGlobalErrors) {
159 super(hc, conf, new ThreadPoolExecutor(1, 20, 60, TimeUnit.SECONDS,
160 new SynchronousQueue<Runnable>(), new CountingThreadFactory(new AtomicInteger())),
161 new RpcRetryingCallerFactory(conf), useGlobalErrors, new RpcControllerFactory(conf));
162 }
163
164 public MyAsyncProcess(ClusterConnection hc, Configuration conf, boolean useGlobalErrors,
165 @SuppressWarnings("unused") boolean dummy) {
166 super(hc, conf, new ThreadPoolExecutor(1, 20, 60, TimeUnit.SECONDS,
167 new SynchronousQueue<Runnable>(), new CountingThreadFactory(new AtomicInteger())) {
168 @Override
169 public void execute(Runnable command) {
170 throw new RejectedExecutionException("test under failure");
171 }
172 },
173 new RpcRetryingCallerFactory(conf), useGlobalErrors, new RpcControllerFactory(conf));
174 }
175
176 @Override
177 public <Res> AsyncRequestFuture submit(TableName tableName, List<? extends Row> rows,
178 boolean atLeastOne, Callback<Res> callback, boolean needResults)
179 throws InterruptedIOException {
180
181 return super.submit(DUMMY_TABLE, rows, atLeastOne, callback, true);
182 }
183
184 @Override
185 protected RpcRetryingCaller<MultiResponse> createCaller(MultiServerCallable<Row> callable) {
186 callsCt.incrementAndGet();
187 final MultiResponse mr = createMultiResponse(
188 callable.getMulti(), nbMultiResponse, nbActions, new ResponseGenerator() {
189 @Override
190 public void addResponse(MultiResponse mr, byte[] regionName, Action<Row> a) {
191 if (Arrays.equals(FAILS, a.getAction().getRow())) {
192 mr.add(regionName, a.getOriginalIndex(), failure);
193 } else {
194 mr.add(regionName, a.getOriginalIndex(), success);
195 }
196 }
197 });
198
199 return new RpcRetryingCaller<MultiResponse>(100, 10, 9) {
200 @Override
201 public MultiResponse callWithoutRetries(RetryingCallable<MultiResponse> callable,
202 int callTimeout)
203 throws IOException, RuntimeException {
204 try {
205
206
207 Thread.sleep(1000);
208 } catch (InterruptedException e) {
209
210 }
211 return mr;
212 }
213 };
214 }
215 }
216
217 static class CallerWithFailure extends RpcRetryingCaller<MultiResponse>{
218
219 private final IOException e;
220
221 public CallerWithFailure(IOException e) {
222 super(100, 100, 9);
223 this.e = e;
224 }
225
226 @Override
227 public MultiResponse callWithoutRetries(RetryingCallable<MultiResponse> callable, int callTimeout)
228 throws IOException, RuntimeException {
229 throw e;
230 }
231 }
232
233
234 static class AsyncProcessWithFailure extends MyAsyncProcess {
235
236 private final IOException ioe;
237
238 public AsyncProcessWithFailure(ClusterConnection hc, Configuration conf, IOException ioe) {
239 super(hc, conf, true);
240 this.ioe = ioe;
241 serverTrackerTimeout = 1;
242 }
243
244 @Override
245 protected RpcRetryingCaller<MultiResponse> createCaller(MultiServerCallable<Row> callable) {
246 callsCt.incrementAndGet();
247 return new CallerWithFailure(ioe);
248 }
249 }
250
251 class MyAsyncProcessWithReplicas extends MyAsyncProcess {
252 private Set<byte[]> failures = new TreeSet<byte[]>(new Bytes.ByteArrayComparator());
253 private long primarySleepMs = 0, replicaSleepMs = 0;
254 private Map<ServerName, Long> customPrimarySleepMs = new HashMap<ServerName, Long>();
255 private final AtomicLong replicaCalls = new AtomicLong(0);
256
257 public void addFailures(HRegionInfo... hris) {
258 for (HRegionInfo hri : hris) {
259 failures.add(hri.getRegionName());
260 }
261 }
262
263 public long getReplicaCallCount() {
264 return replicaCalls.get();
265 }
266
267 public void setPrimaryCallDelay(ServerName server, long primaryMs) {
268 customPrimarySleepMs.put(server, primaryMs);
269 }
270
271 public MyAsyncProcessWithReplicas(ClusterConnection hc, Configuration conf) {
272 super(hc, conf);
273 }
274
275 public void setCallDelays(long primaryMs, long replicaMs) {
276 this.primarySleepMs = primaryMs;
277 this.replicaSleepMs = replicaMs;
278 }
279
280 @Override
281 protected RpcRetryingCaller<MultiResponse> createCaller(
282 MultiServerCallable<Row> callable) {
283 final MultiResponse mr = createMultiResponse(
284 callable.getMulti(), nbMultiResponse, nbActions, new ResponseGenerator() {
285 @Override
286 public void addResponse(MultiResponse mr, byte[] regionName, Action<Row> a) {
287 if (failures.contains(regionName)) {
288 mr.add(regionName, a.getOriginalIndex(), failure);
289 } else {
290 boolean isStale = !RegionReplicaUtil.isDefaultReplica(a.getReplicaId());
291 mr.add(regionName, a.getOriginalIndex(),
292 Result.create(new Cell[0], null, isStale));
293 }
294 }
295 });
296
297 final boolean isDefault = RegionReplicaUtil.isDefaultReplica(
298 callable.getMulti().actions.values().iterator().next().iterator().next().getReplicaId());
299 final ServerName server = ((MultiServerCallable<?>)callable).getServerName();
300 String debugMsg = "Call to " + server + ", primary=" + isDefault + " with "
301 + callable.getMulti().actions.size() + " entries: ";
302 for (byte[] region : callable.getMulti().actions.keySet()) {
303 debugMsg += "[" + Bytes.toStringBinary(region) + "], ";
304 }
305 LOG.debug(debugMsg);
306 if (!isDefault) {
307 replicaCalls.incrementAndGet();
308 }
309
310 return new RpcRetryingCaller<MultiResponse>(100, 10, 9) {
311 @Override
312 public MultiResponse callWithoutRetries(RetryingCallable<MultiResponse> callable, int callTimeout)
313 throws IOException, RuntimeException {
314 long sleep = -1;
315 if (isDefault) {
316 Long customSleep = customPrimarySleepMs.get(server);
317 sleep = (customSleep == null ? primarySleepMs : customSleep.longValue());
318 } else {
319 sleep = replicaSleepMs;
320 }
321 if (sleep != 0) {
322 try {
323 Thread.sleep(sleep);
324 } catch (InterruptedException e) {
325 }
326 }
327 return mr;
328 }
329 };
330 }
331 }
332
333 static MultiResponse createMultiResponse(final MultiAction<Row> multi,
334 AtomicInteger nbMultiResponse, AtomicInteger nbActions, ResponseGenerator gen) {
335 final MultiResponse mr = new MultiResponse();
336 nbMultiResponse.incrementAndGet();
337 for (Map.Entry<byte[], List<Action<Row>>> entry : multi.actions.entrySet()) {
338 byte[] regionName = entry.getKey();
339 for (Action<Row> a : entry.getValue()) {
340 nbActions.incrementAndGet();
341 gen.addResponse(mr, regionName, a);
342 }
343 }
344 return mr;
345 }
346
347 private static interface ResponseGenerator {
348 void addResponse(final MultiResponse mr, byte[] regionName, Action<Row> a);
349 }
350
351
352
353
354 static class MyConnectionImpl extends ConnectionManager.HConnectionImplementation {
355 final AtomicInteger nbThreads = new AtomicInteger(0);
356
357
358 protected MyConnectionImpl(Configuration conf) {
359 super(conf);
360 }
361
362 @Override
363 public RegionLocations locateRegion(TableName tableName,
364 byte[] row, boolean useCache, boolean retry, int replicaId) throws IOException {
365 return new RegionLocations(loc1);
366 }
367
368 @Override
369 public boolean hasCellBlockSupport() {
370 return false;
371 }
372 }
373
374
375
376
377 static class MyConnectionImpl2 extends MyConnectionImpl {
378 List<HRegionLocation> hrl;
379 final boolean usedRegions[];
380
381 protected MyConnectionImpl2(List<HRegionLocation> hrl) {
382 super(conf);
383 this.hrl = hrl;
384 this.usedRegions = new boolean[hrl.size()];
385 }
386
387 @Override
388 public RegionLocations locateRegion(TableName tableName,
389 byte[] row, boolean useCache, boolean retry, int replicaId) throws IOException {
390 int i = 0;
391 for (HRegionLocation hr : hrl){
392 if (Arrays.equals(row, hr.getRegionInfo().getStartKey())) {
393 usedRegions[i] = true;
394 return new RegionLocations(hr);
395 }
396 i++;
397 }
398 return null;
399 }
400
401 }
402
403 @Rule
404 public Timeout timeout = Timeout.millis(10000);
405
406 @Test
407 public void testSubmit() throws Exception {
408 ClusterConnection hc = createHConnection();
409 AsyncProcess ap = new MyAsyncProcess(hc, conf);
410
411 List<Put> puts = new ArrayList<Put>();
412 puts.add(createPut(1, true));
413
414 ap.submit(DUMMY_TABLE, puts, false, null, false);
415 Assert.assertTrue(puts.isEmpty());
416 }
417
418 @Test
419 public void testSubmitWithCB() throws Exception {
420 ClusterConnection hc = createHConnection();
421 final AtomicInteger updateCalled = new AtomicInteger(0);
422 Batch.Callback<Object> cb = new Batch.Callback<Object>() {
423 @Override
424 public void update(byte[] region, byte[] row, Object result) {
425 updateCalled.incrementAndGet();
426 }
427 };
428 AsyncProcess ap = new MyAsyncProcess(hc, conf);
429
430 List<Put> puts = new ArrayList<Put>();
431 puts.add(createPut(1, true));
432
433 final AsyncRequestFuture ars = ap.submit(DUMMY_TABLE, puts, false, cb, false);
434 Assert.assertTrue(puts.isEmpty());
435 ars.waitUntilDone();
436 Assert.assertEquals(updateCalled.get(), 1);
437 }
438
439 @Test
440 public void testSubmitBusyRegion() throws Exception {
441 ClusterConnection hc = createHConnection();
442 AsyncProcess ap = new MyAsyncProcess(hc, conf);
443
444 List<Put> puts = new ArrayList<Put>();
445 puts.add(createPut(1, true));
446
447 ap.incTaskCounters(Arrays.asList(hri1.getRegionName()), sn);
448 ap.submit(DUMMY_TABLE, puts, false, null, false);
449 Assert.assertEquals(puts.size(), 1);
450
451 ap.decTaskCounters(Arrays.asList(hri1.getRegionName()), sn);
452 ap.submit(DUMMY_TABLE, puts, false, null, false);
453 Assert.assertEquals(0, puts.size());
454 }
455
456
457 @Test
458 public void testSubmitBusyRegionServer() throws Exception {
459 ClusterConnection hc = createHConnection();
460 AsyncProcess ap = new MyAsyncProcess(hc, conf);
461
462 ap.taskCounterPerServer.put(sn2, new AtomicInteger(ap.maxConcurrentTasksPerServer));
463
464 List<Put> puts = new ArrayList<Put>();
465 puts.add(createPut(1, true));
466 puts.add(createPut(3, true));
467 puts.add(createPut(1, true));
468 puts.add(createPut(2, true));
469
470 ap.submit(DUMMY_TABLE, puts, false, null, false);
471 Assert.assertEquals(" puts=" + puts, 1, puts.size());
472
473 ap.taskCounterPerServer.put(sn2, new AtomicInteger(ap.maxConcurrentTasksPerServer - 1));
474 ap.submit(DUMMY_TABLE, puts, false, null, false);
475 Assert.assertTrue(puts.isEmpty());
476 }
477
478 @Test
479 public void testFail() throws Exception {
480 MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), conf, false);
481
482 List<Put> puts = new ArrayList<Put>();
483 Put p = createPut(1, false);
484 puts.add(p);
485
486 AsyncRequestFuture ars = ap.submit(DUMMY_TABLE, puts, false, null, true);
487 Assert.assertEquals(0, puts.size());
488 ars.waitUntilDone();
489 verifyResult(ars, false);
490 Assert.assertEquals(NB_RETRIES + 1, ap.callsCt.get());
491
492 Assert.assertEquals(1, ars.getErrors().exceptions.size());
493 Assert.assertTrue("was: " + ars.getErrors().exceptions.get(0),
494 failure.equals(ars.getErrors().exceptions.get(0)));
495 Assert.assertTrue("was: " + ars.getErrors().exceptions.get(0),
496 failure.equals(ars.getErrors().exceptions.get(0)));
497
498 Assert.assertEquals(1, ars.getFailedOperations().size());
499 Assert.assertTrue("was: " + ars.getFailedOperations().get(0),
500 p.equals(ars.getFailedOperations().get(0)));
501 }
502
503
504 @Test
505 public void testSubmitTrue() throws IOException {
506 final AsyncProcess ap = new MyAsyncProcess(createHConnection(), conf, false);
507 ap.tasksInProgress.incrementAndGet();
508 final AtomicInteger ai = new AtomicInteger(1);
509 ap.taskCounterPerRegion.put(hri1.getRegionName(), ai);
510
511 final AtomicBoolean checkPoint = new AtomicBoolean(false);
512 final AtomicBoolean checkPoint2 = new AtomicBoolean(false);
513
514 Thread t = new Thread(){
515 @Override
516 public void run(){
517 Threads.sleep(1000);
518 Assert.assertFalse(checkPoint.get());
519 ai.decrementAndGet();
520 ap.tasksInProgress.decrementAndGet();
521 checkPoint2.set(true);
522 }
523 };
524
525 List<Put> puts = new ArrayList<Put>();
526 Put p = createPut(1, true);
527 puts.add(p);
528
529 ap.submit(DUMMY_TABLE, puts, false, null, false);
530 Assert.assertFalse(puts.isEmpty());
531
532 t.start();
533
534 ap.submit(DUMMY_TABLE, puts, true, null, false);
535 Assert.assertTrue(puts.isEmpty());
536
537 checkPoint.set(true);
538 while (!checkPoint2.get()){
539 Threads.sleep(1);
540 }
541 }
542
543 @Test
544 public void testFailAndSuccess() throws Exception {
545 MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), conf, false);
546
547 List<Put> puts = new ArrayList<Put>();
548 puts.add(createPut(1, false));
549 puts.add(createPut(1, true));
550 puts.add(createPut(1, true));
551
552 AsyncRequestFuture ars = ap.submit(DUMMY_TABLE, puts, false, null, true);
553 Assert.assertTrue(puts.isEmpty());
554 ars.waitUntilDone();
555 verifyResult(ars, false, true, true);
556 Assert.assertEquals(NB_RETRIES + 1, ap.callsCt.get());
557 ap.callsCt.set(0);
558 Assert.assertEquals(1, ars.getErrors().actions.size());
559
560 puts.add(createPut(1, true));
561
562 ap.waitUntilDone();
563 ars = ap.submit(DUMMY_TABLE, puts, false, null, true);
564 Assert.assertEquals(0, puts.size());
565 ars.waitUntilDone();
566 Assert.assertEquals(2, ap.callsCt.get());
567 verifyResult(ars, true);
568 }
569
570 @Test
571 public void testFlush() throws Exception {
572 MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), conf, false);
573
574 List<Put> puts = new ArrayList<Put>();
575 puts.add(createPut(1, false));
576 puts.add(createPut(1, true));
577 puts.add(createPut(1, true));
578
579 AsyncRequestFuture ars = ap.submit(DUMMY_TABLE, puts, false, null, true);
580 ars.waitUntilDone();
581 verifyResult(ars, false, true, true);
582 Assert.assertEquals(NB_RETRIES + 1, ap.callsCt.get());
583
584 Assert.assertEquals(1, ars.getFailedOperations().size());
585 }
586
587 @Test
588 public void testMaxTask() throws Exception {
589 final AsyncProcess ap = new MyAsyncProcess(createHConnection(), conf, false);
590
591 for (int i = 0; i < 1000; i++) {
592 ap.incTaskCounters(Arrays.asList("dummy".getBytes()), sn);
593 }
594
595 final Thread myThread = Thread.currentThread();
596
597 Thread t = new Thread() {
598 @Override
599 public void run() {
600 Threads.sleep(2000);
601 myThread.interrupt();
602 }
603 };
604
605 List<Put> puts = new ArrayList<Put>();
606 puts.add(createPut(1, true));
607
608 t.start();
609
610 try {
611 ap.submit(DUMMY_TABLE, puts, false, null, false);
612 Assert.fail("We should have been interrupted.");
613 } catch (InterruptedIOException expected) {
614 }
615
616 final long sleepTime = 2000;
617
618 Thread t2 = new Thread() {
619 @Override
620 public void run() {
621 Threads.sleep(sleepTime);
622 while (ap.tasksInProgress.get() > 0) {
623 ap.decTaskCounters(Arrays.asList("dummy".getBytes()), sn);
624 }
625 }
626 };
627 t2.start();
628
629 long start = System.currentTimeMillis();
630 ap.submit(DUMMY_TABLE, new ArrayList<Row>(), false, null, false);
631 long end = System.currentTimeMillis();
632
633
634 Assert.assertTrue(start + 100L + sleepTime > end);
635 }
636
637 private static ClusterConnection createHConnection() throws IOException {
638 ClusterConnection hc = createHConnectionCommon();
639 setMockLocation(hc, DUMMY_BYTES_1, new RegionLocations(loc1));
640 setMockLocation(hc, DUMMY_BYTES_2, new RegionLocations(loc2));
641 setMockLocation(hc, DUMMY_BYTES_3, new RegionLocations(loc3));
642 setMockLocation(hc, FAILS, new RegionLocations(loc2));
643 return hc;
644 }
645
646 private static ClusterConnection createHConnectionWithReplicas() throws IOException {
647 ClusterConnection hc = createHConnectionCommon();
648 setMockLocation(hc, DUMMY_BYTES_1, hrls1);
649 setMockLocation(hc, DUMMY_BYTES_2, hrls2);
650 setMockLocation(hc, DUMMY_BYTES_3, hrls3);
651 return hc;
652 }
653
654 private static void setMockLocation(ClusterConnection hc, byte[] row,
655 RegionLocations result) throws IOException {
656 Mockito.when(hc.locateRegion(Mockito.eq(DUMMY_TABLE), Mockito.eq(row),
657 Mockito.anyBoolean(), Mockito.anyBoolean(), Mockito.anyInt())).thenReturn(result);
658 }
659
660 private static ClusterConnection createHConnectionCommon() {
661 ClusterConnection hc = Mockito.mock(ClusterConnection.class);
662 NonceGenerator ng = Mockito.mock(NonceGenerator.class);
663 Mockito.when(ng.getNonceGroup()).thenReturn(HConstants.NO_NONCE);
664 Mockito.when(hc.getNonceGenerator()).thenReturn(ng);
665 Mockito.when(hc.getConfiguration()).thenReturn(conf);
666 return hc;
667 }
668
669 @Test
670 public void testHTablePutSuccess() throws Exception {
671 BufferedMutatorImpl ht = Mockito.mock(BufferedMutatorImpl.class);
672 ht.ap = new MyAsyncProcess(createHConnection(), conf, true);
673
674 Put put = createPut(1, true);
675
676 Assert.assertEquals(0, ht.getWriteBufferSize());
677 ht.mutate(put);
678 Assert.assertEquals(0, ht.getWriteBufferSize());
679 }
680
681 private void doHTableFailedPut(boolean bufferOn) throws Exception {
682 ClusterConnection conn = createHConnection();
683 HTable ht = new HTable(conn, new BufferedMutatorParams(DUMMY_TABLE));
684 MyAsyncProcess ap = new MyAsyncProcess(conn, conf, true);
685 ht.mutator.ap = ap;
686 if (bufferOn) {
687 ht.setWriteBufferSize(1024L * 1024L);
688 } else {
689 ht.setWriteBufferSize(0L);
690 }
691
692 Put put = createPut(1, false);
693
694 Assert.assertEquals(0L, ht.mutator.currentWriteBufferSize.get());
695 try {
696 ht.put(put);
697 if (bufferOn) {
698 ht.flushCommits();
699 }
700 Assert.fail();
701 } catch (RetriesExhaustedException expected) {
702 }
703 Assert.assertEquals(0L, ht.mutator.currentWriteBufferSize.get());
704
705 AsyncRequestFuture ars = null;
706 for (AsyncRequestFuture someReqs : ap.allReqs) {
707 if (someReqs.getResults().length == 0) continue;
708 Assert.assertTrue(ars == null);
709 ars = someReqs;
710 }
711 Assert.assertTrue(ars != null);
712 verifyResult(ars, false);
713
714
715 ht.close();
716 }
717
718 @Test
719 public void testHTableFailedPutWithBuffer() throws Exception {
720 doHTableFailedPut(true);
721 }
722
723 @Test
724 public void testHTableFailedPutWithoutBuffer() throws Exception {
725 doHTableFailedPut(false);
726 }
727
728 @Test
729 public void testHTableFailedPutAndNewPut() throws Exception {
730 ClusterConnection conn = createHConnection();
731 BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, null, null,
732 new BufferedMutatorParams(DUMMY_TABLE).writeBufferSize(0));
733 MyAsyncProcess ap = new MyAsyncProcess(conn, conf, true);
734 mutator.ap = ap;
735
736 Put p = createPut(1, false);
737 mutator.mutate(p);
738
739 ap.waitUntilDone();
740
741
742
743
744
745
746 p = createPut(1, true);
747 Assert.assertEquals(0, mutator.getWriteBuffer().size());
748 try {
749 mutator.mutate(p);
750 Assert.fail();
751 } catch (RetriesExhaustedException expected) {
752 }
753 Assert.assertEquals("the put should not been inserted.", 0, mutator.getWriteBuffer().size());
754 }
755
756 @Test
757 public void testBatch() throws IOException, InterruptedException {
758 ClusterConnection conn = new MyConnectionImpl(conf);
759 HTable ht = new HTable(conn, new BufferedMutatorParams(DUMMY_TABLE));
760 ht.multiAp = new MyAsyncProcess(conn, conf, false);
761
762 List<Put> puts = new ArrayList<Put>();
763 puts.add(createPut(1, true));
764 puts.add(createPut(1, true));
765 puts.add(createPut(1, true));
766 puts.add(createPut(1, true));
767 puts.add(createPut(1, false));
768 puts.add(createPut(1, true));
769 puts.add(createPut(1, false));
770
771 Object[] res = new Object[puts.size()];
772 try {
773 ht.processBatch(puts, res);
774 Assert.fail();
775 } catch (RetriesExhaustedException expected) {
776 }
777
778 Assert.assertEquals(res[0], success);
779 Assert.assertEquals(res[1], success);
780 Assert.assertEquals(res[2], success);
781 Assert.assertEquals(res[3], success);
782 Assert.assertEquals(res[4], failure);
783 Assert.assertEquals(res[5], success);
784 Assert.assertEquals(res[6], failure);
785 }
786
787 @Test
788 public void testErrorsServers() throws IOException {
789 Configuration configuration = new Configuration(conf);
790 ClusterConnection conn = new MyConnectionImpl(configuration);
791 BufferedMutatorImpl mutator =
792 new BufferedMutatorImpl(conn, null, null, new BufferedMutatorParams(DUMMY_TABLE));
793 configuration.setBoolean(ConnectionManager.RETRIES_BY_SERVER_KEY, true);
794
795 MyAsyncProcess ap = new MyAsyncProcess(conn, configuration, true);
796 mutator.ap = ap;
797
798 Assert.assertNotNull(mutator.ap.createServerErrorTracker());
799 Assert.assertTrue(mutator.ap.serverTrackerTimeout > 200);
800 mutator.ap.serverTrackerTimeout = 1;
801
802 Put p = createPut(1, false);
803 mutator.mutate(p);
804
805 try {
806 mutator.flush();
807 Assert.fail();
808 } catch (RetriesExhaustedWithDetailsException expected) {
809 }
810
811 Assert.assertEquals(NB_RETRIES + 1, ap.callsCt.get());
812 }
813
814 @Test
815 public void testGlobalErrors() throws IOException {
816 ClusterConnection conn = new MyConnectionImpl(conf);
817 BufferedMutatorImpl mutator = (BufferedMutatorImpl) conn.getBufferedMutator(DUMMY_TABLE);
818 AsyncProcessWithFailure ap = new AsyncProcessWithFailure(conn, conf, new IOException("test"));
819 mutator.ap = ap;
820
821 Assert.assertNotNull(mutator.ap.createServerErrorTracker());
822
823 Put p = createPut(1, true);
824 mutator.mutate(p);
825
826 try {
827 mutator.flush();
828 Assert.fail();
829 } catch (RetriesExhaustedWithDetailsException expected) {
830 }
831
832 Assert.assertEquals(NB_RETRIES + 1, ap.callsCt.get());
833 }
834
835
836 @Test
837 public void testCallQueueTooLarge() throws IOException {
838 ClusterConnection conn = new MyConnectionImpl(conf);
839 BufferedMutatorImpl mutator = (BufferedMutatorImpl) conn.getBufferedMutator(DUMMY_TABLE);
840 AsyncProcessWithFailure ap = new AsyncProcessWithFailure(conn, conf, new CallQueueTooBigException());
841 mutator.ap = ap;
842
843 Assert.assertNotNull(mutator.ap.createServerErrorTracker());
844
845 Put p = createPut(1, true);
846 mutator.mutate(p);
847
848 try {
849 mutator.flush();
850 Assert.fail();
851 } catch (RetriesExhaustedWithDetailsException expected) {
852 }
853
854 Assert.assertEquals(NB_RETRIES + 1, ap.callsCt.get());
855 }
856
857
858
859
860 @Test
861 public void testThreadCreation() throws Exception {
862 final int NB_REGS = 100;
863 List<HRegionLocation> hrls = new ArrayList<HRegionLocation>(NB_REGS);
864 List<Get> gets = new ArrayList<Get>(NB_REGS);
865 for (int i = 0; i < NB_REGS; i++) {
866 HRegionInfo hri = new HRegionInfo(
867 DUMMY_TABLE, Bytes.toBytes(i * 10L), Bytes.toBytes(i * 10L + 9L), false, i);
868 HRegionLocation hrl = new HRegionLocation(hri, i % 2 == 0 ? sn : sn2);
869 hrls.add(hrl);
870
871 Get get = new Get(Bytes.toBytes(i * 10L));
872 gets.add(get);
873 }
874
875 MyConnectionImpl2 con = new MyConnectionImpl2(hrls);
876 HTable ht = new HTable(con, new BufferedMutatorParams(DUMMY_TABLE));
877 MyAsyncProcess ap = new MyAsyncProcess(con, conf, con.nbThreads);
878 ht.multiAp = ap;
879
880 ht.batch(gets, new Object[gets.size()]);
881
882 Assert.assertEquals(ap.nbActions.get(), NB_REGS);
883 Assert.assertEquals("1 multi response per server", 2, ap.nbMultiResponse.get());
884 Assert.assertEquals("1 thread per server", 2, con.nbThreads.get());
885
886 int nbReg = 0;
887 for (int i =0; i<NB_REGS; i++){
888 if (con.usedRegions[i]) nbReg++;
889 }
890 Assert.assertEquals("nbReg=" + nbReg, nbReg, NB_REGS);
891 }
892
893 @Test
894 public void testReplicaReplicaSuccess() throws Exception {
895
896
897 MyAsyncProcessWithReplicas ap = createReplicaAp(10, 1000, 0);
898 List<Get> rows = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2, DUMMY_BYTES_3);
899 AsyncRequestFuture ars = ap.submitAll(DUMMY_TABLE, rows, null, new Object[3]);
900 verifyReplicaResult(ars, RR.TRUE, RR.TRUE, RR.FALSE);
901 Assert.assertEquals(2, ap.getReplicaCallCount());
902 }
903
904 @Test
905 public void testReplicaPrimarySuccessWoReplicaCalls() throws Exception {
906
907 MyAsyncProcessWithReplicas ap = createReplicaAp(1000, 10, 0);
908 List<Get> rows = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2, DUMMY_BYTES_3);
909 AsyncRequestFuture ars = ap.submitAll(DUMMY_TABLE, rows, null, new Object[3]);
910 verifyReplicaResult(ars, RR.FALSE, RR.FALSE, RR.FALSE);
911 Assert.assertEquals(0, ap.getReplicaCallCount());
912 }
913
914 @Test
915 public void testReplicaParallelCallsSucceed() throws Exception {
916
917 MyAsyncProcessWithReplicas ap = createReplicaAp(0, 0, 0);
918 List<Get> rows = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2);
919 AsyncRequestFuture ars = ap.submitAll(DUMMY_TABLE, rows, null, new Object[2]);
920 verifyReplicaResult(ars, RR.DONT_CARE, RR.DONT_CARE);
921 long replicaCalls = ap.getReplicaCallCount();
922 Assert.assertTrue(replicaCalls >= 0);
923 Assert.assertTrue(replicaCalls <= 2);
924 }
925
926 @Test
927 public void testReplicaPartialReplicaCall() throws Exception {
928
929
930
931 MyAsyncProcessWithReplicas ap = createReplicaAp(1000, 0, 0);
932 ap.setPrimaryCallDelay(sn2, 2000);
933 List<Get> rows = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2);
934 AsyncRequestFuture ars = ap.submitAll(DUMMY_TABLE, rows, null, new Object[2]);
935 verifyReplicaResult(ars, RR.FALSE, RR.TRUE);
936 Assert.assertEquals(1, ap.getReplicaCallCount());
937 }
938
939 @Test
940 public void testReplicaMainFailsBeforeReplicaCalls() throws Exception {
941
942
943
944 MyAsyncProcessWithReplicas ap = createReplicaAp(1000, 0, 0, 1);
945 ap.addFailures(hri1, hri2);
946 List<Get> rows = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2);
947 AsyncRequestFuture ars = ap.submitAll(DUMMY_TABLE, rows, null, new Object[2]);
948 verifyReplicaResult(ars, RR.FAILED, RR.FAILED);
949 Assert.assertEquals(0, ap.getReplicaCallCount());
950 }
951
952 @Test
953 public void testReplicaReplicaSuccessWithParallelFailures() throws Exception {
954
955
956 MyAsyncProcessWithReplicas ap = createReplicaAp(0, 1000, 1000, 1);
957 ap.addFailures(hri1, hri1r2, hri2);
958 List<Get> rows = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2);
959 AsyncRequestFuture ars = ap.submitAll(DUMMY_TABLE, rows, null, new Object[2]);
960 verifyReplicaResult(ars, RR.TRUE, RR.TRUE);
961 Assert.assertEquals(2, ap.getReplicaCallCount());
962 }
963
964 @Test
965 public void testReplicaAllCallsFailForOneRegion() throws Exception {
966
967
968 MyAsyncProcessWithReplicas ap = createReplicaAp(500, 1000, 0, 1);
969 ap.addFailures(hri1, hri1r1, hri1r2, hri2r1);
970 List<Get> rows = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2);
971 AsyncRequestFuture ars = ap.submitAll(DUMMY_TABLE, rows, null, new Object[2]);
972 verifyReplicaResult(ars, RR.FAILED, RR.FALSE);
973
974 Assert.assertEquals(3, ars.getErrors().getNumExceptions());
975 for (int i = 0; i < ars.getErrors().getNumExceptions(); ++i) {
976 Assert.assertArrayEquals(DUMMY_BYTES_1, ars.getErrors().getRow(i).getRow());
977 }
978 }
979
980 private MyAsyncProcessWithReplicas createReplicaAp(
981 int replicaAfterMs, int primaryMs, int replicaMs) throws Exception {
982 return createReplicaAp(replicaAfterMs, primaryMs, replicaMs, -1);
983 }
984
985 private MyAsyncProcessWithReplicas createReplicaAp(
986 int replicaAfterMs, int primaryMs, int replicaMs, int retries) throws Exception {
987
988
989 Configuration conf = new Configuration();
990 ClusterConnection conn = createHConnectionWithReplicas();
991 conf.setInt(AsyncProcess.PRIMARY_CALL_TIMEOUT_KEY, replicaAfterMs * 1000);
992 if (retries > 0) {
993 conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, retries);
994 }
995 MyAsyncProcessWithReplicas ap = new MyAsyncProcessWithReplicas(conn, conf);
996 ap.setCallDelays(primaryMs, replicaMs);
997 return ap;
998 }
999
1000 private static List<Get> makeTimelineGets(byte[]... rows) {
1001 List<Get> result = new ArrayList<Get>();
1002 for (byte[] row : rows) {
1003 Get get = new Get(row);
1004 get.setConsistency(Consistency.TIMELINE);
1005 result.add(get);
1006 }
1007 return result;
1008 }
1009
1010 private void verifyResult(AsyncRequestFuture ars, boolean... expected) throws Exception {
1011 Object[] actual = ars.getResults();
1012 Assert.assertEquals(expected.length, actual.length);
1013 for (int i = 0; i < expected.length; ++i) {
1014 Assert.assertEquals(expected[i], !(actual[i] instanceof Throwable));
1015 }
1016 }
1017
1018
1019 private enum RR {
1020 TRUE,
1021 FALSE,
1022 DONT_CARE,
1023 FAILED
1024 }
1025
1026 private void verifyReplicaResult(AsyncRequestFuture ars, RR... expecteds) throws Exception {
1027 Object[] actuals = ars.getResults();
1028 Assert.assertEquals(expecteds.length, actuals.length);
1029 for (int i = 0; i < expecteds.length; ++i) {
1030 Object actual = actuals[i];
1031 RR expected = expecteds[i];
1032 Assert.assertEquals(actual.toString(), expected == RR.FAILED, actual instanceof Throwable);
1033 if (expected != RR.FAILED && expected != RR.DONT_CARE) {
1034 Assert.assertEquals(expected == RR.TRUE, ((Result)actual).isStale());
1035 }
1036 }
1037 }
1038
1039
1040
1041
1042
1043
1044 private Put createPut(int regCnt, boolean success) {
1045 Put p;
1046 if (!success) {
1047 p = new Put(FAILS);
1048 } else switch (regCnt){
1049 case 1 :
1050 p = new Put(DUMMY_BYTES_1);
1051 break;
1052 case 2:
1053 p = new Put(DUMMY_BYTES_2);
1054 break;
1055 case 3:
1056 p = new Put(DUMMY_BYTES_3);
1057 break;
1058 default:
1059 throw new IllegalArgumentException("unknown " + regCnt);
1060 }
1061
1062 p.add(DUMMY_BYTES_1, DUMMY_BYTES_1, DUMMY_BYTES_1);
1063
1064 return p;
1065 }
1066
1067 static class MyThreadPoolExecutor extends ThreadPoolExecutor {
1068 public MyThreadPoolExecutor(int coreThreads, int maxThreads, long keepAliveTime,
1069 TimeUnit timeunit, BlockingQueue<Runnable> blockingqueue) {
1070 super(coreThreads, maxThreads, keepAliveTime, timeunit, blockingqueue);
1071 }
1072
1073 @Override
1074 public Future submit(Runnable runnable) {
1075 throw new OutOfMemoryError("OutOfMemory error thrown by means");
1076 }
1077 }
1078
1079 static class AsyncProcessForThrowableCheck extends AsyncProcess {
1080 public AsyncProcessForThrowableCheck(ClusterConnection hc, Configuration conf,
1081 ExecutorService pool) {
1082 super(hc, conf, pool, new RpcRetryingCallerFactory(conf), false, new RpcControllerFactory(
1083 conf));
1084 }
1085 }
1086
1087 @Test
1088 public void testUncheckedException() throws Exception {
1089
1090 ClusterConnection hc = createHConnection();
1091 MyThreadPoolExecutor myPool =
1092 new MyThreadPoolExecutor(1, 20, 60, TimeUnit.SECONDS,
1093 new LinkedBlockingQueue<Runnable>(200));
1094 AsyncProcess ap = new AsyncProcessForThrowableCheck(hc, conf, myPool);
1095
1096 List<Put> puts = new ArrayList<Put>();
1097 puts.add(createPut(1, true));
1098
1099 ap.submit(DUMMY_TABLE, puts, false, null, false);
1100 Assert.assertTrue(puts.isEmpty());
1101 }
1102
1103 }