1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase.client;
21
22 import java.io.IOException;
23 import java.io.InterruptedIOException;
24 import java.util.ArrayList;
25 import java.util.Collection;
26 import java.util.Collections;
27 import java.util.Date;
28 import java.util.HashMap;
29 import java.util.Iterator;
30 import java.util.List;
31 import java.util.Map;
32 import java.util.Set;
33 import java.util.concurrent.ConcurrentHashMap;
34 import java.util.concurrent.ConcurrentMap;
35 import java.util.concurrent.ConcurrentSkipListMap;
36 import java.util.concurrent.ExecutorService;
37 import java.util.concurrent.RejectedExecutionException;
38 import java.util.concurrent.TimeUnit;
39 import java.util.concurrent.atomic.AtomicInteger;
40 import java.util.concurrent.atomic.AtomicLong;
41
42 import org.apache.commons.logging.Log;
43 import org.apache.commons.logging.LogFactory;
44 import org.apache.hadoop.hbase.RetryImmediatelyException;
45 import org.apache.hadoop.hbase.classification.InterfaceAudience;
46 import org.apache.hadoop.conf.Configuration;
47 import org.apache.hadoop.hbase.DoNotRetryIOException;
48 import org.apache.hadoop.hbase.HConstants;
49 import org.apache.hadoop.hbase.HRegionInfo;
50 import org.apache.hadoop.hbase.HRegionLocation;
51 import org.apache.hadoop.hbase.RegionLocations;
52 import org.apache.hadoop.hbase.ServerName;
53 import org.apache.hadoop.hbase.TableName;
54 import org.apache.hadoop.hbase.client.backoff.ServerStatistics;
55 import org.apache.hadoop.hbase.client.coprocessor.Batch;
56 import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil;
57 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
58 import org.apache.hadoop.hbase.util.Bytes;
59 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
60 import org.apache.htrace.Trace;
61
62 import com.google.common.annotations.VisibleForTesting;
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99 @InterfaceAudience.Private
100 class AsyncProcess {
101 private static final Log LOG = LogFactory.getLog(AsyncProcess.class);
102 protected static final AtomicLong COUNTER = new AtomicLong();
103
104 public static final String PRIMARY_CALL_TIMEOUT_KEY = "hbase.client.primaryCallTimeout.multiget";
105
106
107
108
109
110
111
112 public static final String START_LOG_ERRORS_AFTER_COUNT_KEY =
113 "hbase.client.start.log.errors.counter";
114 public static final int DEFAULT_START_LOG_ERRORS_AFTER_COUNT = 9;
115
116
117
118
119
120
121
122 public static interface AsyncRequestFuture {
123 public boolean hasError();
124 public RetriesExhaustedWithDetailsException getErrors();
125 public List<? extends Row> getFailedOperations();
126 public Object[] getResults() throws InterruptedIOException;
127
128 public void waitUntilDone() throws InterruptedIOException;
129 }
130
131
132
133
134 private static final AsyncRequestFuture NO_REQS_RESULT = new AsyncRequestFuture() {
135
136 final Object[] result = new Object[0];
137
138 @Override
139 public boolean hasError() {
140 return false;
141 }
142
143 @Override
144 public RetriesExhaustedWithDetailsException getErrors() {
145 return null;
146 }
147
148 @Override
149 public List<? extends Row> getFailedOperations() {
150 return null;
151 }
152
153 @Override
154 public Object[] getResults() {
155 return result;
156 }
157
158 @Override
159 public void waitUntilDone() throws InterruptedIOException {
160 }
161 };
162
163
164
165
166
167 private static class ReplicaResultState {
168 public ReplicaResultState(int callCount) {
169 this.callCount = callCount;
170 }
171
172
173 int callCount;
174
175
176 BatchErrors replicaErrors = null;
177
178 @Override
179 public String toString() {
180 return "[call count " + callCount + "; errors " + replicaErrors + "]";
181 }
182 }
183
184
185
186 protected final long id;
187
188 protected final ClusterConnection connection;
189 protected final RpcRetryingCallerFactory rpcCallerFactory;
190 protected final RpcControllerFactory rpcFactory;
191 protected final BatchErrors globalErrors;
192 protected final ExecutorService pool;
193
194 protected final AtomicLong tasksInProgress = new AtomicLong(0);
195 protected final ConcurrentMap<byte[], AtomicInteger> taskCounterPerRegion =
196 new ConcurrentSkipListMap<byte[], AtomicInteger>(Bytes.BYTES_COMPARATOR);
197 protected final ConcurrentMap<ServerName, AtomicInteger> taskCounterPerServer =
198 new ConcurrentHashMap<ServerName, AtomicInteger>();
199
200
201 private final int startLogErrorsCnt;
202
203
204
205
206 protected final int maxTotalConcurrentTasks;
207
208
209
210
211
212
213
214 protected final int maxConcurrentTasksPerRegion;
215
216
217
218
219 protected final int maxConcurrentTasksPerServer;
220 protected final long pause;
221 protected int numTries;
222 protected int serverTrackerTimeout;
223 protected int timeout;
224 protected long primaryCallTimeoutMicroseconds;
225
226
227 protected static class BatchErrors {
228 private final List<Throwable> throwables = new ArrayList<Throwable>();
229 private final List<Row> actions = new ArrayList<Row>();
230 private final List<String> addresses = new ArrayList<String>();
231
232 public synchronized void add(Throwable ex, Row row, ServerName serverName) {
233 if (row == null){
234 throw new IllegalArgumentException("row cannot be null. location=" + serverName);
235 }
236
237 throwables.add(ex);
238 actions.add(row);
239 addresses.add(serverName != null ? serverName.toString() : "null");
240 }
241
242 public boolean hasErrors() {
243 return !throwables.isEmpty();
244 }
245
246 private synchronized RetriesExhaustedWithDetailsException makeException() {
247 return new RetriesExhaustedWithDetailsException(
248 new ArrayList<Throwable>(throwables),
249 new ArrayList<Row>(actions), new ArrayList<String>(addresses));
250 }
251
252 public synchronized void clear() {
253 throwables.clear();
254 actions.clear();
255 addresses.clear();
256 }
257
258 public synchronized void merge(BatchErrors other) {
259 throwables.addAll(other.throwables);
260 actions.addAll(other.actions);
261 addresses.addAll(other.addresses);
262 }
263 }
264
/**
 * Creates an AsyncProcess bound to a connection and configuration.
 *
 * @param hc connection used for region lookup and nonce generation; must not be null
 * @param conf source of retry, timeout and concurrency settings
 * @param pool default executor; may be null if every request supplies its own
 * @param rpcCaller factory for RPC callers
 * @param useGlobalErrors if true, all requests share one {@link BatchErrors}
 * @param rpcFactory RPC controller factory
 * @throws IllegalArgumentException if {@code hc} is null or a concurrency limit is not positive
 */
public AsyncProcess(ClusterConnection hc, Configuration conf, ExecutorService pool,
    RpcRetryingCallerFactory rpcCaller, boolean useGlobalErrors, RpcControllerFactory rpcFactory) {
  if (hc == null) {
    throw new IllegalArgumentException("HConnection cannot be null.");
  }

  this.connection = hc;
  this.pool = pool;
  this.globalErrors = useGlobalErrors ? new BatchErrors() : null;

  this.id = COUNTER.incrementAndGet();

  this.pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE,
      HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
  this.numTries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
      HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
  this.timeout = conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
      HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
  // Read as a long to match the field; getLong accepts every value getInt accepted.
  this.primaryCallTimeoutMicroseconds = conf.getLong(PRIMARY_CALL_TIMEOUT_KEY, 10000);

  this.maxTotalConcurrentTasks = conf.getInt(HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS,
      HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS);
  this.maxConcurrentTasksPerServer = conf.getInt(HConstants.HBASE_CLIENT_MAX_PERSERVER_TASKS,
      HConstants.DEFAULT_HBASE_CLIENT_MAX_PERSERVER_TASKS);
  this.maxConcurrentTasksPerRegion = conf.getInt(HConstants.HBASE_CLIENT_MAX_PERREGION_TASKS,
      HConstants.DEFAULT_HBASE_CLIENT_MAX_PERREGION_TASKS);

  this.startLogErrorsCnt =
      conf.getInt(START_LOG_ERRORS_AFTER_COUNT_KEY, DEFAULT_START_LOG_ERRORS_AFTER_COUNT);

  if (this.maxTotalConcurrentTasks <= 0) {
    throw new IllegalArgumentException("maxTotalConcurrentTasks=" + maxTotalConcurrentTasks);
  }
  if (this.maxConcurrentTasksPerServer <= 0) {
    throw new IllegalArgumentException("maxConcurrentTasksPerServer=" +
        maxConcurrentTasksPerServer);
  }
  if (this.maxConcurrentTasksPerRegion <= 0) {
    throw new IllegalArgumentException("maxConcurrentTasksPerRegion=" +
        maxConcurrentTasksPerRegion);
  }

  // Total time spent pausing across all attempts. It is accumulated in a long and clamped:
  // `int += long` narrows silently, so a large pause or retry count used to wrap negative.
  long trackerTimeout = 0;
  for (int i = 0; i < this.numTries; ++i) {
    trackerTimeout += ConnectionUtils.getPauseTime(this.pause, i);
    if (trackerTimeout >= Integer.MAX_VALUE) {
      trackerTimeout = Integer.MAX_VALUE;
      break;
    }
  }
  this.serverTrackerTimeout = (int) trackerTimeout;

  this.rpcCallerFactory = rpcCaller;
  this.rpcFactory = rpcFactory;
}
322
323
324
325
326
327 private ExecutorService getPool(ExecutorService pool) {
328 if (pool != null) {
329 return pool;
330 }
331 if (this.pool != null) {
332 return this.pool;
333 }
334 throw new RuntimeException("Neither AsyncProcess nor request have ExecutorService");
335 }
336
337
338
339
340
341 public <CResult> AsyncRequestFuture submit(TableName tableName, List<? extends Row> rows,
342 boolean atLeastOne, Batch.Callback<CResult> callback, boolean needResults)
343 throws InterruptedIOException {
344 return submit(null, tableName, rows, atLeastOne, callback, needResults);
345 }
346
347
348
349
350
351
352
353
354
355
356
357
358
359 public <CResult> AsyncRequestFuture submit(ExecutorService pool, TableName tableName,
360 List<? extends Row> rows, boolean atLeastOne, Batch.Callback<CResult> callback,
361 boolean needResults) throws InterruptedIOException {
362 if (rows.isEmpty()) {
363 return NO_REQS_RESULT;
364 }
365
366 Map<ServerName, MultiAction<Row>> actionsByServer =
367 new HashMap<ServerName, MultiAction<Row>>();
368 List<Action<Row>> retainedActions = new ArrayList<Action<Row>>(rows.size());
369
370 NonceGenerator ng = this.connection.getNonceGenerator();
371 long nonceGroup = ng.getNonceGroup();
372
373
374 List<Exception> locationErrors = null;
375 List<Integer> locationErrorRows = null;
376 do {
377
378 waitForMaximumCurrentTasks(maxTotalConcurrentTasks - 1);
379
380
381
382 Map<HRegionInfo, Boolean> regionIncluded = new HashMap<HRegionInfo, Boolean>();
383 Map<ServerName, Boolean> serverIncluded = new HashMap<ServerName, Boolean>();
384
385 int posInList = -1;
386 Iterator<? extends Row> it = rows.iterator();
387 while (it.hasNext()) {
388 Row r = it.next();
389 HRegionLocation loc;
390 try {
391 if (r == null) {
392 throw new IllegalArgumentException("#" + id + ", row cannot be null");
393 }
394
395 RegionLocations locs = connection.locateRegion(
396 tableName, r.getRow(), true, true, RegionReplicaUtil.DEFAULT_REPLICA_ID);
397 if (locs == null || locs.isEmpty() || locs.getDefaultRegionLocation() == null) {
398 throw new IOException("#" + id + ", no location found, aborting submit for"
399 + " tableName=" + tableName + " rowkey=" + Bytes.toStringBinary(r.getRow()));
400 }
401 loc = locs.getDefaultRegionLocation();
402 } catch (IOException ex) {
403 locationErrors = new ArrayList<Exception>();
404 locationErrorRows = new ArrayList<Integer>();
405 LOG.error("Failed to get region location ", ex);
406
407
408 retainedActions.add(new Action<Row>(r, ++posInList));
409 locationErrors.add(ex);
410 locationErrorRows.add(posInList);
411 it.remove();
412 break;
413 }
414
415 if (canTakeOperation(loc, regionIncluded, serverIncluded)) {
416 Action<Row> action = new Action<Row>(r, ++posInList);
417 setNonce(ng, r, action);
418 retainedActions.add(action);
419
420 byte[] regionName = loc.getRegionInfo().getRegionName();
421 addAction(loc.getServerName(), regionName, action, actionsByServer, nonceGroup);
422 it.remove();
423 }
424 }
425 } while (retainedActions.isEmpty() && atLeastOne && (locationErrors == null));
426
427 if (retainedActions.isEmpty()) return NO_REQS_RESULT;
428
429 return submitMultiActions(tableName, retainedActions, nonceGroup, callback, null, needResults,
430 locationErrors, locationErrorRows, actionsByServer, pool);
431 }
432
433 <CResult> AsyncRequestFuture submitMultiActions(TableName tableName,
434 List<Action<Row>> retainedActions, long nonceGroup, Batch.Callback<CResult> callback,
435 Object[] results, boolean needResults, List<Exception> locationErrors,
436 List<Integer> locationErrorRows, Map<ServerName, MultiAction<Row>> actionsByServer,
437 ExecutorService pool) {
438 AsyncRequestFutureImpl<CResult> ars = createAsyncRequestFuture(
439 tableName, retainedActions, nonceGroup, pool, callback, results, needResults);
440
441 if (locationErrors != null) {
442 for (int i = 0; i < locationErrors.size(); ++i) {
443 int originalIndex = locationErrorRows.get(i);
444 Row row = retainedActions.get(originalIndex).getAction();
445 ars.manageError(originalIndex, row,
446 Retry.NO_LOCATION_PROBLEM, locationErrors.get(i), null);
447 }
448 }
449 ars.sendMultiAction(actionsByServer, 1, null, false);
450 return ars;
451 }
452
453
454
455
456
457
458
459
460
461 private static void addAction(ServerName server, byte[] regionName, Action<Row> action,
462 Map<ServerName, MultiAction<Row>> actionsByServer, long nonceGroup) {
463 MultiAction<Row> multiAction = actionsByServer.get(server);
464 if (multiAction == null) {
465 multiAction = new MultiAction<Row>();
466 actionsByServer.put(server, multiAction);
467 }
468 if (action.hasNonce() && !multiAction.hasNonceGroup()) {
469 multiAction.setNonceGroup(nonceGroup);
470 }
471
472 multiAction.add(regionName, action);
473 }
474
475
476
477
478
479
480
481
482
483 protected boolean canTakeOperation(HRegionLocation loc,
484 Map<HRegionInfo, Boolean> regionsIncluded,
485 Map<ServerName, Boolean> serversIncluded) {
486 HRegionInfo regionInfo = loc.getRegionInfo();
487 Boolean regionPrevious = regionsIncluded.get(regionInfo);
488
489 if (regionPrevious != null) {
490
491 return regionPrevious;
492 }
493
494 Boolean serverPrevious = serversIncluded.get(loc.getServerName());
495 if (Boolean.FALSE.equals(serverPrevious)) {
496
497 regionsIncluded.put(regionInfo, Boolean.FALSE);
498 return false;
499 }
500
501 AtomicInteger regionCnt = taskCounterPerRegion.get(loc.getRegionInfo().getRegionName());
502 if (regionCnt != null && regionCnt.get() >= maxConcurrentTasksPerRegion) {
503
504 regionsIncluded.put(regionInfo, Boolean.FALSE);
505 return false;
506 }
507
508 if (serverPrevious == null) {
509
510 int newServers = 0;
511 for (Map.Entry<ServerName, Boolean> kv : serversIncluded.entrySet()) {
512 if (kv.getValue()) {
513 newServers++;
514 }
515 }
516
517
518 boolean ok = (newServers + tasksInProgress.get()) < maxTotalConcurrentTasks;
519
520 if (ok) {
521
522 AtomicInteger serverCnt = taskCounterPerServer.get(loc.getServerName());
523 ok = (serverCnt == null || serverCnt.get() < maxConcurrentTasksPerServer);
524 }
525
526 if (!ok) {
527 regionsIncluded.put(regionInfo, Boolean.FALSE);
528 serversIncluded.put(loc.getServerName(), Boolean.FALSE);
529 return false;
530 }
531
532 serversIncluded.put(loc.getServerName(), Boolean.TRUE);
533 } else {
534 assert serverPrevious.equals(Boolean.TRUE);
535 }
536
537 regionsIncluded.put(regionInfo, Boolean.TRUE);
538
539 return true;
540 }
541
542
543
544
545
546 public <CResult> AsyncRequestFuture submitAll(TableName tableName,
547 List<? extends Row> rows, Batch.Callback<CResult> callback, Object[] results) {
548 return submitAll(null, tableName, rows, callback, results);
549 }
550
551
552
553
554
555
556
557
558
559
560
561 public <CResult> AsyncRequestFuture submitAll(ExecutorService pool, TableName tableName,
562 List<? extends Row> rows, Batch.Callback<CResult> callback, Object[] results) {
563 List<Action<Row>> actions = new ArrayList<Action<Row>>(rows.size());
564
565
566 int posInList = -1;
567 NonceGenerator ng = this.connection.getNonceGenerator();
568 for (Row r : rows) {
569 posInList++;
570 if (r instanceof Put) {
571 Put put = (Put) r;
572 if (put.isEmpty()) {
573 throw new IllegalArgumentException("No columns to insert for #" + (posInList+1)+ " item");
574 }
575 }
576 Action<Row> action = new Action<Row>(r, posInList);
577 setNonce(ng, r, action);
578 actions.add(action);
579 }
580 AsyncRequestFutureImpl<CResult> ars = createAsyncRequestFuture(
581 tableName, actions, ng.getNonceGroup(), getPool(pool), callback, results, results != null);
582 ars.groupAndSendMultiAction(actions, 1);
583 return ars;
584 }
585
586 private static void setNonce(NonceGenerator ng, Row r, Action<Row> action) {
587 if (!(r instanceof Append) && !(r instanceof Increment)) return;
588 action.setNonce(ng.newNonce());
589 }
590
591
592
593
594
595
596
597
598
599
600 protected class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture {
601
602
603
604
605
606
607
608
609 private final class ReplicaCallIssuingRunnable implements Runnable {
610 private final long startTime;
611 private final List<Action<Row>> initialActions;
612
613 public ReplicaCallIssuingRunnable(List<Action<Row>> initialActions, long startTime) {
614 this.initialActions = initialActions;
615 this.startTime = startTime;
616 }
617
618 @Override
619 public void run() {
620 boolean done = false;
621 if (primaryCallTimeoutMicroseconds > 0) {
622 try {
623 done = waitUntilDone(startTime * 1000L + primaryCallTimeoutMicroseconds);
624 } catch (InterruptedException ex) {
625 LOG.error("Replica thread was interrupted - no replica calls: " + ex.getMessage());
626 return;
627 }
628 }
629 if (done) return;
630 Map<ServerName, MultiAction<Row>> actionsByServer =
631 new HashMap<ServerName, MultiAction<Row>>();
632 List<Action<Row>> unknownLocActions = new ArrayList<Action<Row>>();
633 if (replicaGetIndices == null) {
634 for (int i = 0; i < results.length; ++i) {
635 addReplicaActions(i, actionsByServer, unknownLocActions);
636 }
637 } else {
638 for (int replicaGetIndice : replicaGetIndices) {
639 addReplicaActions(replicaGetIndice, actionsByServer, unknownLocActions);
640 }
641 }
642 if (!actionsByServer.isEmpty()) {
643 sendMultiAction(actionsByServer, 1, null, unknownLocActions.isEmpty());
644 }
645 if (!unknownLocActions.isEmpty()) {
646 actionsByServer = new HashMap<ServerName, MultiAction<Row>>();
647 for (Action<Row> action : unknownLocActions) {
648 addReplicaActionsAgain(action, actionsByServer);
649 }
650
651 if (!actionsByServer.isEmpty()) {
652 sendMultiAction(actionsByServer, 1, null, true);
653 }
654 }
655 }
656
657
658
659
660
661
662 private void addReplicaActions(int index, Map<ServerName, MultiAction<Row>> actionsByServer,
663 List<Action<Row>> unknownReplicaActions) {
664 if (results[index] != null) return;
665 Action<Row> action = initialActions.get(index);
666 RegionLocations loc = findAllLocationsOrFail(action, true);
667 if (loc == null) return;
668 HRegionLocation[] locs = loc.getRegionLocations();
669 if (locs.length == 1) {
670 LOG.warn("No replicas found for " + action.getAction());
671 return;
672 }
673 synchronized (replicaResultLock) {
674
675
676
677 if (results[index] != null) return;
678
679
680 results[index] = new ReplicaResultState(locs.length);
681 }
682 for (int i = 1; i < locs.length; ++i) {
683 Action<Row> replicaAction = new Action<Row>(action, i);
684 if (locs[i] != null) {
685 addAction(locs[i].getServerName(), locs[i].getRegionInfo().getRegionName(),
686 replicaAction, actionsByServer, nonceGroup);
687 } else {
688 unknownReplicaActions.add(replicaAction);
689 }
690 }
691 }
692
693 private void addReplicaActionsAgain(
694 Action<Row> action, Map<ServerName, MultiAction<Row>> actionsByServer) {
695 if (action.getReplicaId() == RegionReplicaUtil.DEFAULT_REPLICA_ID) {
696 throw new AssertionError("Cannot have default replica here");
697 }
698 HRegionLocation loc = getReplicaLocationOrFail(action);
699 if (loc == null) return;
700 addAction(loc.getServerName(), loc.getRegionInfo().getRegionName(),
701 action, actionsByServer, nonceGroup);
702 }
703 }
704
705
706
707
708
709 private final class SingleServerRequestRunnable implements Runnable {
710 private final MultiAction<Row> multiAction;
711 private final int numAttempt;
712 private final ServerName server;
713 private final Set<MultiServerCallable<Row>> callsInProgress;
714
715 private SingleServerRequestRunnable(
716 MultiAction<Row> multiAction, int numAttempt, ServerName server,
717 Set<MultiServerCallable<Row>> callsInProgress) {
718 this.multiAction = multiAction;
719 this.numAttempt = numAttempt;
720 this.server = server;
721 this.callsInProgress = callsInProgress;
722 }
723
724 @Override
725 public void run() {
726 MultiResponse res;
727 MultiServerCallable<Row> callable = null;
728 try {
729 callable = createCallable(server, tableName, multiAction);
730 try {
731 RpcRetryingCaller<MultiResponse> caller = createCaller(callable);
732 if (callsInProgress != null) callsInProgress.add(callable);
733 res = caller.callWithoutRetries(callable, timeout);
734
735 if (res == null) {
736
737 return;
738 }
739
740 } catch (IOException e) {
741
742
743 receiveGlobalFailure(multiAction, server, numAttempt, e);
744 return;
745 } catch (Throwable t) {
746
747 LOG.error("#" + id + ", Caught throwable while calling. This is unexpected." +
748 " Retrying. Server is " + server + ", tableName=" + tableName, t);
749 receiveGlobalFailure(multiAction, server, numAttempt, t);
750 return;
751 }
752
753
754 receiveMultiAction(multiAction, server, res, numAttempt);
755 } catch (Throwable t) {
756
757 LOG.error("Internal AsyncProcess #" + id + " error for "
758 + tableName + " processing for " + server, t);
759 throw new RuntimeException(t);
760 } finally {
761 decTaskCounters(multiAction.getRegions(), server);
762 if (callsInProgress != null && callable != null) {
763 callsInProgress.remove(callable);
764 }
765 }
766 }
767 }
768
769 private final Batch.Callback<CResult> callback;
770 private final BatchErrors errors;
771 private final ConnectionManager.ServerErrorTracker errorsByServer;
772 private final ExecutorService pool;
773 private final Set<MultiServerCallable<Row>> callsInProgress;
774
775
776 private final TableName tableName;
777 private final AtomicLong actionsInProgress = new AtomicLong(-1);
778
779
780
781
782
783 private final Object replicaResultLock = new Object();
784
785
786
787
788
789
790
791
792
793 private final Object[] results;
794
795
796
797 private final int[] replicaGetIndices;
798 private final boolean hasAnyReplicaGets;
799 private final long nonceGroup;
800
801 public AsyncRequestFutureImpl(TableName tableName, List<Action<Row>> actions, long nonceGroup,
802 ExecutorService pool, boolean needResults, Object[] results,
803 Batch.Callback<CResult> callback) {
804 this.pool = pool;
805 this.callback = callback;
806 this.nonceGroup = nonceGroup;
807 this.tableName = tableName;
808 this.actionsInProgress.set(actions.size());
809 if (results != null) {
810 assert needResults;
811 if (results.length != actions.size()) {
812 throw new AssertionError("results.length");
813 }
814 this.results = results;
815 for (int i = 0; i != this.results.length; ++i) {
816 results[i] = null;
817 }
818 } else {
819 this.results = needResults ? new Object[actions.size()] : null;
820 }
821 List<Integer> replicaGetIndices = null;
822 boolean hasAnyReplicaGets = false;
823 if (needResults) {
824
825
826
827
828
829 boolean hasAnyNonReplicaReqs = false;
830 int posInList = 0;
831 for (Action<Row> action : actions) {
832 boolean isReplicaGet = isReplicaGet(action.getAction());
833 if (isReplicaGet) {
834 hasAnyReplicaGets = true;
835 if (hasAnyNonReplicaReqs) {
836 if (replicaGetIndices == null) {
837 replicaGetIndices = new ArrayList<Integer>(actions.size() - 1);
838 }
839 replicaGetIndices.add(posInList);
840 }
841 } else if (!hasAnyNonReplicaReqs) {
842
843 hasAnyNonReplicaReqs = true;
844 if (posInList > 0) {
845
846
847 replicaGetIndices = new ArrayList<Integer>(actions.size() - 1);
848 for (int i = 0; i < posInList; ++i) {
849 replicaGetIndices.add(i);
850 }
851 }
852 }
853 ++posInList;
854 }
855 }
856 this.hasAnyReplicaGets = hasAnyReplicaGets;
857 if (replicaGetIndices != null) {
858 this.replicaGetIndices = new int[replicaGetIndices.size()];
859 int i = 0;
860 for (Integer el : replicaGetIndices) {
861 this.replicaGetIndices[i++] = el;
862 }
863 } else {
864 this.replicaGetIndices = null;
865 }
866 this.callsInProgress = !hasAnyReplicaGets ? null :
867 Collections.newSetFromMap(new ConcurrentHashMap<MultiServerCallable<Row>, Boolean>());
868
869 this.errorsByServer = createServerErrorTracker();
870 this.errors = (globalErrors != null) ? globalErrors : new BatchErrors();
871 }
872
873 public Set<MultiServerCallable<Row>> getCallsInProgress() {
874 return callsInProgress;
875 }
876
877
878
879
880
881
882
883 private void groupAndSendMultiAction(List<Action<Row>> currentActions, int numAttempt) {
884 Map<ServerName, MultiAction<Row>> actionsByServer =
885 new HashMap<ServerName, MultiAction<Row>>();
886
887 boolean isReplica = false;
888 List<Action<Row>> unknownReplicaActions = null;
889 for (Action<Row> action : currentActions) {
890 RegionLocations locs = findAllLocationsOrFail(action, true);
891 if (locs == null) continue;
892 boolean isReplicaAction = !RegionReplicaUtil.isDefaultReplica(action.getReplicaId());
893 if (isReplica && !isReplicaAction) {
894
895 throw new AssertionError("Replica and non-replica actions in the same retry");
896 }
897 isReplica = isReplicaAction;
898 HRegionLocation loc = locs.getRegionLocation(action.getReplicaId());
899 if (loc == null || loc.getServerName() == null) {
900 if (isReplica) {
901 if (unknownReplicaActions == null) {
902 unknownReplicaActions = new ArrayList<Action<Row>>();
903 }
904 unknownReplicaActions.add(action);
905 } else {
906
907 manageLocationError(action, null);
908 }
909 } else {
910 byte[] regionName = loc.getRegionInfo().getRegionName();
911 addAction(loc.getServerName(), regionName, action, actionsByServer, nonceGroup);
912 }
913 }
914 boolean doStartReplica = (numAttempt == 1 && !isReplica && hasAnyReplicaGets);
915 boolean hasUnknown = unknownReplicaActions != null && !unknownReplicaActions.isEmpty();
916
917 if (!actionsByServer.isEmpty()) {
918
919 sendMultiAction(actionsByServer, numAttempt, (doStartReplica && !hasUnknown)
920 ? currentActions : null, numAttempt > 1 && !hasUnknown);
921 }
922
923 if (hasUnknown) {
924 actionsByServer = new HashMap<ServerName, MultiAction<Row>>();
925 for (Action<Row> action : unknownReplicaActions) {
926 HRegionLocation loc = getReplicaLocationOrFail(action);
927 if (loc == null) continue;
928 byte[] regionName = loc.getRegionInfo().getRegionName();
929 addAction(loc.getServerName(), regionName, action, actionsByServer, nonceGroup);
930 }
931 if (!actionsByServer.isEmpty()) {
932 sendMultiAction(
933 actionsByServer, numAttempt, doStartReplica ? currentActions : null, true);
934 }
935 }
936 }
937
938 private HRegionLocation getReplicaLocationOrFail(Action<Row> action) {
939
940
941 int replicaId = action.getReplicaId();
942 RegionLocations locs = findAllLocationsOrFail(action, true);
943 if (locs == null) return null;
944 HRegionLocation loc = locs.getRegionLocation(replicaId);
945 if (loc == null || loc.getServerName() == null) {
946 locs = findAllLocationsOrFail(action, false);
947 if (locs == null) return null;
948 loc = locs.getRegionLocation(replicaId);
949 }
950 if (loc == null || loc.getServerName() == null) {
951 manageLocationError(action, null);
952 return null;
953 }
954 return loc;
955 }
956
957 private void manageLocationError(Action<Row> action, Exception ex) {
958 String msg = "Cannot get replica " + action.getReplicaId()
959 + " location for " + action.getAction();
960 LOG.error(msg);
961 if (ex == null) {
962 ex = new IOException(msg);
963 }
964 manageError(action.getOriginalIndex(), action.getAction(),
965 Retry.NO_LOCATION_PROBLEM, ex, null);
966 }
967
968 private RegionLocations findAllLocationsOrFail(Action<Row> action, boolean useCache) {
969 if (action.getAction() == null) throw new IllegalArgumentException("#" + id +
970 ", row cannot be null");
971 RegionLocations loc = null;
972 try {
973 loc = connection.locateRegion(
974 tableName, action.getAction().getRow(), useCache, true, action.getReplicaId());
975 } catch (IOException ex) {
976 manageLocationError(action, ex);
977 }
978 return loc;
979 }
980
981
982
983
984
985
986
987
988
989 private void sendMultiAction(Map<ServerName, MultiAction<Row>> actionsByServer,
990 int numAttempt, List<Action<Row>> actionsForReplicaThread, boolean reuseThread) {
991
992
993 int actionsRemaining = actionsByServer.size();
994
995 for (Map.Entry<ServerName, MultiAction<Row>> e : actionsByServer.entrySet()) {
996 ServerName server = e.getKey();
997 MultiAction<Row> multiAction = e.getValue();
998 incTaskCounters(multiAction.getRegions(), server);
999 Collection<? extends Runnable> runnables = getNewMultiActionRunnable(server, multiAction,
1000 numAttempt);
1001
1002
1003 if (runnables.size() > actionsRemaining) {
1004 actionsRemaining = runnables.size();
1005 }
1006
1007
1008 for (Runnable runnable : runnables) {
1009 if ((--actionsRemaining == 0) && reuseThread) {
1010 runnable.run();
1011 } else {
1012 try {
1013 pool.submit(runnable);
1014 } catch (Throwable t) {
1015 if (t instanceof RejectedExecutionException) {
1016
1017
1018 LOG.warn("#" + id + ", the task was rejected by the pool. This is unexpected." +
1019 " Server is " + server.getServerName(), t);
1020 } else {
1021
1022 LOG.warn("Caught unexpected exception/error: ", t);
1023 }
1024 decTaskCounters(multiAction.getRegions(), server);
1025
1026
1027 receiveGlobalFailure(multiAction, server, numAttempt, t);
1028 }
1029 }
1030 }
1031 }
1032
1033 if (actionsForReplicaThread != null) {
1034 startWaitingForReplicaCalls(actionsForReplicaThread);
1035 }
1036 }
1037
1038 private Collection<? extends Runnable> getNewMultiActionRunnable(ServerName server,
1039 MultiAction<Row> multiAction,
1040 int numAttempt) {
1041
1042 if (AsyncProcess.this.connection.getStatisticsTracker() == null) {
1043 if (connection.getConnectionMetrics() != null) {
1044 connection.getConnectionMetrics().incrNormalRunners();
1045 }
1046 return Collections.singletonList(Trace.wrap("AsyncProcess.sendMultiAction",
1047 new SingleServerRequestRunnable(multiAction, numAttempt, server, callsInProgress)));
1048 }
1049
1050
1051 Map<Long, DelayingRunner> actions = new HashMap<Long, DelayingRunner>(multiAction
1052 .size());
1053
1054
1055 for (Map.Entry<byte[], List<Action<Row>>> e : multiAction.actions.entrySet()) {
1056 Long backoff = getBackoff(server, e.getKey());
1057 DelayingRunner runner = actions.get(backoff);
1058 if (runner == null) {
1059 actions.put(backoff, new DelayingRunner(backoff, e));
1060 } else {
1061 runner.add(e);
1062 }
1063 }
1064
1065 List<Runnable> toReturn = new ArrayList<Runnable>(actions.size());
1066 for (DelayingRunner runner : actions.values()) {
1067 String traceText = "AsyncProcess.sendMultiAction";
1068 Runnable runnable =
1069 new SingleServerRequestRunnable(runner.getActions(), numAttempt, server,
1070 callsInProgress);
1071
1072 if (runner.getSleepTime() > 0) {
1073 runner.setRunner(runnable);
1074 traceText = "AsyncProcess.clientBackoff.sendMultiAction";
1075 runnable = runner;
1076 if (connection.getConnectionMetrics() != null) {
1077 connection.getConnectionMetrics().incrDelayRunners();
1078 connection.getConnectionMetrics().updateDelayInterval(runner.getSleepTime());
1079 }
1080 } else {
1081 if (connection.getConnectionMetrics() != null) {
1082 connection.getConnectionMetrics().incrNormalRunners();
1083 }
1084 }
1085 runnable = Trace.wrap(traceText, runnable);
1086 toReturn.add(runnable);
1087
1088 }
1089 return toReturn;
1090 }
1091
1092
1093
1094
1095
1096
1097
1098 private Long getBackoff(ServerName server, byte[] regionName) {
1099 ServerStatisticTracker tracker = AsyncProcess.this.connection.getStatisticsTracker();
1100 ServerStatistics stats = tracker.getStats(server);
1101 return AsyncProcess.this.connection.getBackoffPolicy()
1102 .getBackoffTime(server, regionName, stats);
1103 }
1104
1105
1106
1107
1108 private void startWaitingForReplicaCalls(List<Action<Row>> actionsForReplicaThread) {
1109 long startTime = EnvironmentEdgeManager.currentTime();
1110 ReplicaCallIssuingRunnable replicaRunnable = new ReplicaCallIssuingRunnable(
1111 actionsForReplicaThread, startTime);
1112 if (primaryCallTimeoutMicroseconds == 0) {
1113
1114 replicaRunnable.run();
1115 } else {
1116
1117
1118 try {
1119 pool.submit(replicaRunnable);
1120 } catch (RejectedExecutionException ree) {
1121 LOG.warn("#" + id + ", replica task was rejected by the pool - no replica calls", ree);
1122 }
1123 }
1124 }
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136 public Retry manageError(int originalIndex, Row row, Retry canRetry,
1137 Throwable throwable, ServerName server) {
1138 if (canRetry == Retry.YES
1139 && throwable != null && (throwable instanceof DoNotRetryIOException ||
1140 throwable instanceof NeedUnmanagedConnectionException)) {
1141 canRetry = Retry.NO_NOT_RETRIABLE;
1142 }
1143
1144 if (canRetry != Retry.YES) {
1145
1146 setError(originalIndex, row, throwable, server);
1147 } else if (isActionComplete(originalIndex, row)) {
1148 canRetry = Retry.NO_OTHER_SUCCEEDED;
1149 }
1150 return canRetry;
1151 }
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161 private void receiveGlobalFailure(
1162 MultiAction<Row> rsActions, ServerName server, int numAttempt, Throwable t) {
1163 errorsByServer.reportServerError(server);
1164 Retry canRetry = errorsByServer.canRetryMore(numAttempt)
1165 ? Retry.YES : Retry.NO_RETRIES_EXHAUSTED;
1166
1167 if (tableName == null) {
1168
1169 connection.clearCaches(server);
1170 }
1171 int failed = 0, stopped = 0;
1172 List<Action<Row>> toReplay = new ArrayList<Action<Row>>();
1173 for (Map.Entry<byte[], List<Action<Row>>> e : rsActions.actions.entrySet()) {
1174 byte[] regionName = e.getKey();
1175 byte[] row = e.getValue().iterator().next().getAction().getRow();
1176
1177
1178 if (tableName != null) {
1179 connection.updateCachedLocations(tableName, regionName, row,
1180 ClientExceptionsUtil.isMetaClearingException(t) ? null : t, server);
1181 }
1182 for (Action<Row> action : e.getValue()) {
1183 Retry retry = manageError(
1184 action.getOriginalIndex(), action.getAction(), canRetry, t, server);
1185 if (retry == Retry.YES) {
1186 toReplay.add(action);
1187 } else if (retry == Retry.NO_OTHER_SUCCEEDED) {
1188 ++stopped;
1189 } else {
1190 ++failed;
1191 }
1192 }
1193 }
1194
1195 if (toReplay.isEmpty()) {
1196 logNoResubmit(server, numAttempt, rsActions.size(), t, failed, stopped);
1197 } else {
1198 resubmit(server, toReplay, numAttempt, rsActions.size(), t);
1199 }
1200 }
1201
1202
1203
1204
1205
1206 private void resubmit(ServerName oldServer, List<Action<Row>> toReplay,
1207 int numAttempt, int failureCount, Throwable throwable) {
1208
1209
1210
1211
1212
1213
1214
1215
1216 boolean retryImmediately = throwable instanceof RetryImmediatelyException;
1217 int nextAttemptNumber = retryImmediately ? numAttempt : numAttempt + 1;
1218 long backOffTime = retryImmediately ? 0 :
1219 errorsByServer.calculateBackoffTime(oldServer, pause);
1220 if (numAttempt > startLogErrorsCnt) {
1221
1222
1223 LOG.info(createLog(numAttempt, failureCount, toReplay.size(),
1224 oldServer, throwable, backOffTime, true, null, -1, -1));
1225 }
1226
1227 try {
1228 if (backOffTime > 0) {
1229 Thread.sleep(backOffTime);
1230 }
1231 } catch (InterruptedException e) {
1232 LOG.warn("#" + id + ", not sent: " + toReplay.size() + " operations, " + oldServer, e);
1233 Thread.currentThread().interrupt();
1234 return;
1235 }
1236
1237 groupAndSendMultiAction(toReplay, nextAttemptNumber);
1238 }
1239
1240 private void logNoResubmit(ServerName oldServer, int numAttempt,
1241 int failureCount, Throwable throwable, int failed, int stopped) {
1242 if (failureCount != 0 || numAttempt > startLogErrorsCnt + 1) {
1243 String timeStr = new Date(errorsByServer.getStartTrackingTime()).toString();
1244 String logMessage = createLog(numAttempt, failureCount, 0, oldServer,
1245 throwable, -1, false, timeStr, failed, stopped);
1246 if (failed != 0) {
1247
1248 LOG.warn(logMessage);
1249 } else {
1250 LOG.info(logMessage);
1251 }
1252 }
1253 }
1254
1255
1256
1257
1258
1259
1260
1261
1262
/**
 * Processes the response to a multi-action sent to {@code server}.
 * <p>
 * The method works in two passes.
 * <ol>
 * <li>Per-action results. A null or {@link Throwable} result is treated as a failure and
 * classified via {@code manageError}. Any other result updates metrics and statistics (when
 * configured), is passed to the user callback, and is stored via {@code setResult}.</li>
 * <li>Per-region exceptions in {@code responses.getExceptions()}. Every action of the region is
 * treated as failed with that exception.</li>
 * </ol>
 * Finally, actions classified as retriable are resubmitted; otherwise the outcome is logged.
 *
 * @param multiAction the actions that were sent, grouped by region name
 * @param server the server that answered
 * @param responses the server's response; must not be null
 * @param numAttempt the attempt number of this call
 */
private void receiveMultiAction(MultiAction<Row> multiAction,
    ServerName server, MultiResponse responses, int numAttempt) {
  assert responses != null;

  // Accumulated across both passes below.
  List<Action<Row>> toReplay = new ArrayList<Action<Row>>();
  // Last failure seen; used for the final resubmit/log message.
  Throwable throwable = null;
  int failureCount = 0;
  boolean canRetry = true;

  // failed = final failures, stopped = not retried because another replica already succeeded.
  int failed = 0, stopped = 0;
  for (Map.Entry<byte[], List<Action<Row>>> regionEntry : multiAction.actions.entrySet()) {
    byte[] regionName = regionEntry.getKey();
    Map<Integer, Object> regionResults = responses.getResults().get(regionName);
    if (regionResults == null) {
      if (!responses.getExceptions().containsKey(regionName)) {
        LOG.error("Server sent us neither results nor exceptions for "
            + Bytes.toStringBinary(regionName));
        // Synthesize a region-level exception so the second pass fails these actions.
        responses.getExceptions().put(regionName, new RuntimeException("Invalid response"));
      }
      continue;
    }
    // Update the location cache at most once per region.
    boolean regionFailureRegistered = false;
    for (Action<Row> sentAction : regionEntry.getValue()) {
      Object result = regionResults.get(sentAction.getOriginalIndex());

      if (result == null || result instanceof Throwable) {
        Row row = sentAction.getAction();
        throwable = ClientExceptionsUtil.findException(result);

        if (!regionFailureRegistered) {
          regionFailureRegistered = true;
          connection.updateCachedLocations(
              tableName, regionName, row.getRow(), result, server);
        }
        // Report the server error (and re-evaluate the retry budget) only on the first
        // failure of this response.
        if (failureCount == 0) {
          errorsByServer.reportServerError(server);

          canRetry = errorsByServer.canRetryMore(numAttempt);
        }
        ++failureCount;
        // NOTE(review): manageError receives the raw result here, not the unwrapped
        // `throwable` computed above; confirm this is intended.
        Retry retry = manageError(sentAction.getOriginalIndex(), row,
            canRetry ? Retry.YES : Retry.NO_RETRIES_EXHAUSTED, (Throwable)result, server);
        if (retry == Retry.YES) {
          toReplay.add(sentAction);
        } else if (retry == Retry.NO_OTHER_SUCCEEDED) {
          ++stopped;
        } else {
          ++failed;
        }
      } else {

        if (AsyncProcess.this.connection.getConnectionMetrics() != null) {
          AsyncProcess.this.connection.getConnectionMetrics().
              updateServerStats(server, regionName, result);
        }

        // When a statistics tracker is configured, the result is replaced by whatever
        // ResultStatsUtil.updateStats returns.
        if (AsyncProcess.this.connection.getStatisticsTracker() != null) {
          result = ResultStatsUtil.updateStats(result,
              AsyncProcess.this.connection.getStatisticsTracker(), server, regionName);
        }

        if (callback != null) {
          try {
            // Unchecked cast: assumes the server result matches the callback's result type.
            this.callback.update(regionName, sentAction.getAction().getRow(), (CResult)result);
          } catch (Throwable t) {
            // A failing user callback must not break result processing.
            LOG.error("User callback threw an exception for "
                + Bytes.toStringBinary(regionName) + ", ignoring", t);
          }
        }
        setResult(sentAction, result);
      }
    }
  }

  // Second pass: region-level exceptions, including any synthesized in the first pass.
  for (Map.Entry<byte[], Throwable> throwableEntry : responses.getExceptions().entrySet()) {
    throwable = throwableEntry.getValue();
    byte[] region = throwableEntry.getKey();
    List<Action<Row>> actions = multiAction.actions.get(region);
    if (actions == null || actions.isEmpty()) {
      throw new IllegalStateException("Wrong response for the region: " +
          HRegionInfo.encodeRegionName(region));
    }

    if (failureCount == 0) {
      errorsByServer.reportServerError(server);
      canRetry = errorsByServer.canRetryMore(numAttempt);
    }
    if (null == tableName && ClientExceptionsUtil.isMetaClearingException(throwable)) {
      // No table to scope the update to: drop everything cached for the server.
      connection.clearCaches(server);
    } else {
      connection.updateCachedLocations(
          tableName, region, actions.get(0).getAction().getRow(), throwable, server);
    }
    failureCount += actions.size();

    for (Action<Row> action : actions) {
      Row row = action.getAction();
      Retry retry = manageError(action.getOriginalIndex(), row,
          canRetry ? Retry.YES : Retry.NO_RETRIES_EXHAUSTED, throwable, server);
      if (retry == Retry.YES) {
        toReplay.add(action);
      } else if (retry == Retry.NO_OTHER_SUCCEEDED) {
        ++stopped;
      } else {
        ++failed;
      }
    }
  }

  if (toReplay.isEmpty()) {
    logNoResubmit(server, numAttempt, failureCount, throwable, failed, stopped);
  } else {
    resubmit(server, toReplay, numAttempt, failureCount, throwable);
  }
}
1394
1395 private String createLog(int numAttempt, int failureCount, int replaySize, ServerName sn,
1396 Throwable error, long backOffTime, boolean willRetry, String startTime,
1397 int failed, int stopped) {
1398 StringBuilder sb = new StringBuilder();
1399 sb.append("#").append(id).append(", table=").append(tableName).append(", ")
1400 .append("attempt=").append(numAttempt)
1401 .append("/").append(numTries).append(" ");
1402
1403 if (failureCount > 0 || error != null){
1404 sb.append("failed=").append(failureCount).append("ops").append(", last exception: ").
1405 append(error == null ? "null" : error);
1406 } else {
1407 sb.append("succeeded");
1408 }
1409
1410 sb.append(" on ").append(sn).append(", tracking started ").append(startTime);
1411
1412 if (willRetry) {
1413 sb.append(", retrying after=").append(backOffTime).append("ms").
1414 append(", replay=").append(replaySize).append("ops");
1415 } else if (failureCount > 0) {
1416 if (stopped > 0) {
1417 sb.append("; not retrying ").append(stopped).append(" due to success from other replica");
1418 }
1419 if (failed > 0) {
1420 sb.append("; not retrying ").append(failed).append(" - final failure");
1421 }
1422
1423 }
1424
1425 return sb.toString();
1426 }
1427
1428
1429
1430
1431
1432
/**
 * Records a successful result for an action and decrements the in-progress action counter once
 * the action is done.
 * <p>
 * If no results array is kept, only the counter is decremented. Otherwise
 * {@code trySetResultSimple} is tried first; a null return from it means nothing more needs to
 * be done here. A non-null return is the replica state for the slot. In that case the first
 * result to arrive zeroes {@code callCount} under the state's monitor, replaces the state with
 * the result under {@code replicaResultLock}, and decrements the counter. Later results see
 * {@code callCount == 0} and are dropped.
 *
 * @param action the action that succeeded
 * @param result its result; must not be null
 * @throws RuntimeException if {@code result} is null
 */
private void setResult(Action<Row> action, Object result) {
  if (result == null) {
    throw new RuntimeException("Result cannot be null");
  }
  ReplicaResultState state = null;
  // Results from a non-default replica are flagged as stale.
  boolean isStale = !RegionReplicaUtil.isDefaultReplica(action.getReplicaId());
  int index = action.getOriginalIndex();
  if (results == null) {
    decActionCounter(index);
    return;
  } else if ((state = trySetResultSimple(
      index, action.getAction(), false, result, null, isStale)) == null) {
    return;
  }
  assert state != null;

  // Claim the slot: only the first caller to see a non-zero callCount proceeds.
  synchronized (state) {
    if (state.callCount == 0) {
      return;
    }
    state.callCount = 0;
  }
  synchronized (replicaResultLock) {
    if (results[index] != state) {
      throw new AssertionError("We set the callCount but someone else replaced the result");
    }
    results[index] = result;
  }

  decActionCounter(index);
}
1468
1469
1470
1471
1472
1473
1474
1475
/**
 * Records a final failure for an action and decrements the in-progress action counter once the
 * action is done.
 * <p>
 * If no results array is kept, the error goes straight into {@code errors}. Otherwise
 * {@code trySetResultSimple} is tried first; a null return from it means nothing more needs to
 * be done here. For a replica state slot, the monitor on the state decides where the error goes:
 * <ul>
 * <li>{@code callCount == 0}: the action is already done and the error is dropped.</li>
 * <li>{@code callCount == 1}: this is the last outstanding call. The error goes to
 * {@code errors} together with any buffered replica errors, the error becomes the slot's final
 * value, and the counter is decremented.</li>
 * <li>{@code callCount > 1}: the error is buffered in {@code state.replicaErrors} while other
 * calls are still outstanding.</li>
 * </ul>
 *
 * @param index the action's index in the original request
 * @param row the action
 * @param throwable the failure
 * @param server the server the action was sent to
 */
private void setError(int index, Row row, Throwable throwable, ServerName server) {
  ReplicaResultState state = null;
  if (results == null) {
    // Not tracking per-action results: record the error directly and mark the action done.
    errors.add(throwable, row, server);
    decActionCounter(index);
    return;
  } else if ((state = trySetResultSimple(
      index, row, true, throwable, server, false)) == null) {
    return;
  }
  assert state != null;
  BatchErrors target = null;
  boolean isActionDone = false;
  synchronized (state) {
    switch (state.callCount) {
      case 0: return;
      case 1: {
        target = errors;
        isActionDone = true;
        break;
      }
      default: {
        assert state.callCount > 1;
        if (state.replicaErrors == null) {
          state.replicaErrors = new BatchErrors();
        }
        target = state.replicaErrors;
        break;
      }
    }
    --state.callCount;
  }
  target.add(throwable, row, server);
  if (isActionDone) {
    // Surface errors buffered from the other replica calls alongside this final one.
    if (state.replicaErrors != null) {
      errors.merge(state.replicaErrors);
    }

    synchronized (replicaResultLock) {
      if (results[index] != state) {
        throw new AssertionError("We set the callCount but someone else replaced the result");
      }
      results[index] = throwable;
    }
    decActionCounter(index);
  }
}
1526
1527
1528
1529
1530
1531
1532
1533
1534 private boolean isActionComplete(int index, Row row) {
1535 if (!isReplicaGet(row)) return false;
1536 Object resObj = results[index];
1537 return (resObj != null) && (!(resObj instanceof ReplicaResultState)
1538 || ((ReplicaResultState)resObj).callCount == 0);
1539 }
1540
1541
1542
1543
1544
/**
 * Fast path for storing a result or error into {@code results[index]}.
 * <p>
 * Non-replica actions: the value is written unconditionally and the action is marked done.
 * Replica reads: under {@code replicaResultLock`-style locking (the {@code replicaResultLock}
 * monitor), the value is written only if the slot is empty.
 * <p>
 * Errors are added to {@code errors} unless the slot held a {@link ReplicaResultState}. Note
 * that this includes the case where the slot already held another final value.
 *
 * @param index the action's index in the original request
 * @param row the action
 * @param isError whether {@code result} is a {@link Throwable} to record
 * @param result the value to store
 * @param server the server involved (used for error recording)
 * @param isFromReplica whether the value came from a non-default replica
 * @return null if the slot was empty (value stored, counter decremented) or already held a final
 *     value (nothing stored, counter untouched); otherwise the existing replica state, which the
 *     caller must resolve
 * @throws AssertionError if a replica value arrives for a slot that has no replica calls in flight
 */
private ReplicaResultState trySetResultSimple(int index, Row row, boolean isError,
    Object result, ServerName server, boolean isFromReplica) {
  // Previous slot content; stays null on the non-replica path.
  Object resObj = null;
  if (!isReplicaGet(row)) {
    if (isFromReplica) {
      throw new AssertionError("Unexpected stale result for " + row);
    }
    results[index] = result;
  } else {
    synchronized (replicaResultLock) {
      if ((resObj = results[index]) == null) {
        if (isFromReplica) {
          throw new AssertionError("Unexpected stale result for " + row);
        }
        results[index] = result;
      }
    }
  }

  ReplicaResultState rrs =
      (resObj instanceof ReplicaResultState) ? (ReplicaResultState)resObj : null;
  if (rrs == null && isError) {
    // Slot was empty or already final: record the error right away.
    errors.add((Throwable)result, row, server);
  }

  if (resObj == null) {
    // The value was stored above, so this action is done.
    decActionCounter(index);
    return null;
  }
  return rrs;
}
1578
1579 private void decActionCounter(int index) {
1580 long actionsRemaining = actionsInProgress.decrementAndGet();
1581 if (actionsRemaining < 0) {
1582 String error = buildDetailedErrorMsg("Incorrect actions in progress", index);
1583 throw new AssertionError(error);
1584 } else if (actionsRemaining == 0) {
1585 synchronized (actionsInProgress) {
1586 actionsInProgress.notifyAll();
1587 }
1588 }
1589 }
1590
1591 private String buildDetailedErrorMsg(String string, int index) {
1592 StringBuilder error = new StringBuilder(string);
1593 error.append("; called for ").
1594 append(index).
1595 append(", actionsInProgress ").
1596 append(actionsInProgress.get()).
1597 append("; replica gets: ");
1598 if (replicaGetIndices != null) {
1599 for (int i = 0; i < replicaGetIndices.length; ++i) {
1600 error.append(replicaGetIndices[i]).append(", ");
1601 }
1602 } else {
1603 error.append(hasAnyReplicaGets ? "all" : "none");
1604 }
1605 error.append("; results ");
1606 if (results != null) {
1607 for (int i = 0; i < results.length; ++i) {
1608 Object o = results[i];
1609 error.append(((o == null) ? "null" : o.toString())).append(", ");
1610 }
1611 }
1612 return error.toString();
1613 }
1614
1615 @Override
1616 public void waitUntilDone() throws InterruptedIOException {
1617 try {
1618 waitUntilDone(Long.MAX_VALUE);
1619 } catch (InterruptedException iex) {
1620 throw new InterruptedIOException(iex.getMessage());
1621 } finally {
1622 if (callsInProgress != null) {
1623 for (MultiServerCallable<Row> clb : callsInProgress) {
1624 clb.cancel();
1625 }
1626 }
1627 }
1628 }
1629
1630 private boolean waitUntilDone(long cutoff) throws InterruptedException {
1631 boolean hasWait = cutoff != Long.MAX_VALUE;
1632 long lastLog = EnvironmentEdgeManager.currentTime();
1633 long currentInProgress;
1634 while (0 != (currentInProgress = actionsInProgress.get())) {
1635 long now = EnvironmentEdgeManager.currentTime();
1636 if (hasWait && (now * 1000L) > cutoff) {
1637 return false;
1638 }
1639 if (!hasWait) {
1640 if (now > lastLog + 10000) {
1641 lastLog = now;
1642 LOG.info("#" + id + ", waiting for " + currentInProgress + " actions to finish");
1643 }
1644 }
1645 synchronized (actionsInProgress) {
1646 if (actionsInProgress.get() == 0) break;
1647 if (!hasWait) {
1648 actionsInProgress.wait(100);
1649 } else {
1650 long waitMicroSecond = Math.min(100000L, (cutoff - now * 1000L));
1651 TimeUnit.MICROSECONDS.timedWait(actionsInProgress, waitMicroSecond);
1652 }
1653 }
1654 }
1655 return true;
1656 }
1657
1658 @Override
1659 public boolean hasError() {
1660 return errors.hasErrors();
1661 }
1662
1663 @Override
1664 public List<? extends Row> getFailedOperations() {
1665 return errors.actions;
1666 }
1667
1668 @Override
1669 public RetriesExhaustedWithDetailsException getErrors() {
1670 return errors.makeException();
1671 }
1672
1673 @Override
1674 public Object[] getResults() throws InterruptedIOException {
1675 waitUntilDone();
1676 return results;
1677 }
1678 }
1679
1680 @VisibleForTesting
1681
1682 protected <CResult> AsyncRequestFutureImpl<CResult> createAsyncRequestFuture(
1683 TableName tableName, List<Action<Row>> actions, long nonceGroup, ExecutorService pool,
1684 Batch.Callback<CResult> callback, Object[] results, boolean needResults) {
1685 return new AsyncRequestFutureImpl<CResult>(
1686 tableName, actions, nonceGroup, getPool(pool), needResults, results, callback);
1687 }
1688
1689
1690
1691
1692 @VisibleForTesting
1693 protected MultiServerCallable<Row> createCallable(final ServerName server,
1694 TableName tableName, final MultiAction<Row> multi) {
1695 return new MultiServerCallable<Row>(connection, tableName, server, this.rpcFactory, multi);
1696 }
1697
1698
1699
1700
1701 @VisibleForTesting
1702 protected RpcRetryingCaller<MultiResponse> createCaller(MultiServerCallable<Row> callable) {
1703 return rpcCallerFactory.<MultiResponse> newCaller();
1704 }
1705
1706 @VisibleForTesting
1707
1708 void waitUntilDone() throws InterruptedIOException {
1709 waitForMaximumCurrentTasks(0);
1710 }
1711
1712
1713 private void waitForMaximumCurrentTasks(int max) throws InterruptedIOException {
1714 long lastLog = EnvironmentEdgeManager.currentTime();
1715 long currentInProgress, oldInProgress = Long.MAX_VALUE;
1716 while ((currentInProgress = this.tasksInProgress.get()) > max) {
1717 if (oldInProgress != currentInProgress) {
1718 long now = EnvironmentEdgeManager.currentTime();
1719 if (now > lastLog + 10000) {
1720 lastLog = now;
1721 LOG.info("#" + id + ", waiting for some tasks to finish. Expected max="
1722 + max + ", tasksInProgress=" + currentInProgress);
1723 }
1724 }
1725 oldInProgress = currentInProgress;
1726 try {
1727 synchronized (this.tasksInProgress) {
1728 if (tasksInProgress.get() != oldInProgress) break;
1729 this.tasksInProgress.wait(100);
1730 }
1731 } catch (InterruptedException e) {
1732 throw new InterruptedIOException("#" + id + ", interrupted." +
1733 " currentNumberOfTask=" + currentInProgress);
1734 }
1735 }
1736 }
1737
1738
1739
1740
1741
1742
1743 public boolean hasError() {
1744 return globalErrors.hasErrors();
1745 }
1746
1747
1748
1749
1750
1751
1752
1753
1754
1755
1756 public RetriesExhaustedWithDetailsException waitForAllPreviousOpsAndReset(
1757 List<Row> failedRows) throws InterruptedIOException {
1758 waitForMaximumCurrentTasks(0);
1759 if (!globalErrors.hasErrors()) {
1760 return null;
1761 }
1762 if (failedRows != null) {
1763 failedRows.addAll(globalErrors.actions);
1764 }
1765 RetriesExhaustedWithDetailsException result = globalErrors.makeException();
1766 globalErrors.clear();
1767 return result;
1768 }
1769
1770
1771
1772
1773 protected void incTaskCounters(Collection<byte[]> regions, ServerName sn) {
1774 tasksInProgress.incrementAndGet();
1775
1776 AtomicInteger serverCnt = taskCounterPerServer.get(sn);
1777 if (serverCnt == null) {
1778 taskCounterPerServer.putIfAbsent(sn, new AtomicInteger());
1779 serverCnt = taskCounterPerServer.get(sn);
1780 }
1781 serverCnt.incrementAndGet();
1782
1783 for (byte[] regBytes : regions) {
1784 AtomicInteger regionCnt = taskCounterPerRegion.get(regBytes);
1785 if (regionCnt == null) {
1786 regionCnt = new AtomicInteger();
1787 AtomicInteger oldCnt = taskCounterPerRegion.putIfAbsent(regBytes, regionCnt);
1788 if (oldCnt != null) {
1789 regionCnt = oldCnt;
1790 }
1791 }
1792 regionCnt.incrementAndGet();
1793 }
1794 }
1795
1796
1797
1798
1799 protected void decTaskCounters(Collection<byte[]> regions, ServerName sn) {
1800 for (byte[] regBytes : regions) {
1801 AtomicInteger regionCnt = taskCounterPerRegion.get(regBytes);
1802 regionCnt.decrementAndGet();
1803 }
1804
1805 taskCounterPerServer.get(sn).decrementAndGet();
1806 tasksInProgress.decrementAndGet();
1807 synchronized (tasksInProgress) {
1808 tasksInProgress.notifyAll();
1809 }
1810 }
1811
1812
1813
1814
1815
1816
1817
1818
1819 protected ConnectionManager.ServerErrorTracker createServerErrorTracker() {
1820 return new ConnectionManager.ServerErrorTracker(
1821 this.serverTrackerTimeout, this.numTries);
1822 }
1823
1824 private static boolean isReplicaGet(Row row) {
1825 return (row instanceof Get) && (((Get)row).getConsistency() == Consistency.TIMELINE);
1826 }
1827
1828
1829
1830
/**
 * Verdict on whether a failed action will be resubmitted.
 */
private enum Retry {
  /** The action will be resubmitted. */
  YES,
  /**
   * Not retried because of a location problem.
   * NOTE(review): not produced in this part of the file; meaning inferred from the name, so
   * confirm.
   */
  NO_LOCATION_PROBLEM,
  /** Not retried because the failure is non-retriable (e.g. a DoNotRetryIOException). */
  NO_NOT_RETRIABLE,
  /** Not retried because the per-server error tracker reports no retries left. */
  NO_RETRIES_EXHAUSTED,
  /** Not retried because the action already completed, via another replica. */
  NO_OTHER_SUCCEEDED
}
1838 }