1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.client;
20
21 import static org.apache.hadoop.hbase.client.ClientScanner.createClosestRowBefore;
22
23 import java.io.IOException;
24 import java.io.InterruptedIOException;
25 import java.util.HashSet;
26 import java.util.Set;
27 import java.util.concurrent.Callable;
28 import java.util.concurrent.CancellationException;
29 import java.util.concurrent.ExecutionException;
30 import java.util.concurrent.ExecutorService;
31 import java.util.concurrent.Future;
32 import java.util.concurrent.TimeUnit;
33 import java.util.concurrent.TimeoutException;
34 import java.util.concurrent.atomic.AtomicBoolean;
35
36 import org.apache.commons.logging.Log;
37 import org.apache.commons.logging.LogFactory;
38 import org.apache.hadoop.hbase.classification.InterfaceAudience;
39 import org.apache.hadoop.conf.Configuration;
40 import org.apache.hadoop.hbase.HRegionInfo;
41 import org.apache.hadoop.hbase.RegionLocations;
42 import org.apache.hadoop.hbase.TableName;
43 import org.apache.hadoop.hbase.util.Bytes;
44 import org.apache.hadoop.hbase.util.Pair;
45
46 import com.google.common.annotations.VisibleForTesting;
47
48
49
50
51
52
53
54
55
56
57
58
59
60 @InterfaceAudience.Private
61 class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
62 private static final Log LOG = LogFactory.getLog(ScannerCallableWithReplicas.class);
63 volatile ScannerCallable currentScannerCallable;
64 AtomicBoolean replicaSwitched = new AtomicBoolean(false);
65 final ClusterConnection cConnection;
66 protected final ExecutorService pool;
67 protected final int timeBeforeReplicas;
68 private final Scan scan;
69 private final int retries;
70 private Result lastResult;
71 private final RpcRetryingCaller<Result[]> caller;
72 private final TableName tableName;
73 private Configuration conf;
74 private int scannerTimeout;
75 private Set<ScannerCallable> outstandingCallables = new HashSet<ScannerCallable>();
76 private boolean someRPCcancelled = false;
77
78 public ScannerCallableWithReplicas(TableName tableName, ClusterConnection cConnection,
79 ScannerCallable baseCallable, ExecutorService pool, int timeBeforeReplicas, Scan scan,
80 int retries, int scannerTimeout, int caching, Configuration conf,
81 RpcRetryingCaller<Result []> caller) {
82 this.currentScannerCallable = baseCallable;
83 this.cConnection = cConnection;
84 this.pool = pool;
85 if (timeBeforeReplicas < 0) {
86 throw new IllegalArgumentException("Invalid value of operation timeout on the primary");
87 }
88 this.timeBeforeReplicas = timeBeforeReplicas;
89 this.scan = scan;
90 this.retries = retries;
91 this.tableName = tableName;
92 this.conf = conf;
93 this.scannerTimeout = scannerTimeout;
94 this.caller = caller;
95 }
96
97 public void setClose() {
98 currentScannerCallable.setClose();
99 }
100
101 public void setRenew(boolean val) {
102 currentScannerCallable.setRenew(val);
103 }
104
105 public void setCaching(int caching) {
106 currentScannerCallable.setCaching(caching);
107 }
108
109 public int getCaching() {
110 return currentScannerCallable.getCaching();
111 }
112
113 public HRegionInfo getHRegionInfo() {
114 return currentScannerCallable.getHRegionInfo();
115 }
116
117 public boolean getServerHasMoreResults() {
118 return currentScannerCallable.getServerHasMoreResults();
119 }
120
121 public void setServerHasMoreResults(boolean serverHasMoreResults) {
122 currentScannerCallable.setServerHasMoreResults(serverHasMoreResults);
123 }
124
125 public boolean hasMoreResultsContext() {
126 return currentScannerCallable.hasMoreResultsContext();
127 }
128
129 public void setHasMoreResultsContext(boolean serverHasMoreResultsContext) {
130 currentScannerCallable.setHasMoreResultsContext(serverHasMoreResultsContext);
131 }
132
133 @Override
134 public Result [] call(int timeout) throws IOException {
135
136
137
138
139 if (currentScannerCallable != null && currentScannerCallable.closed) {
140
141
142 if (LOG.isTraceEnabled()) {
143 LOG.trace("Closing scanner id=" + currentScannerCallable.scannerId);
144 }
145 Result[] r = currentScannerCallable.call(timeout);
146 currentScannerCallable = null;
147 return r;
148 }
149
150
151
152
153
154
155
156 RegionLocations rl = RpcRetryingCallerWithReadReplicas.getRegionLocations(true,
157 RegionReplicaUtil.DEFAULT_REPLICA_ID, cConnection, tableName,
158 currentScannerCallable.getRow());
159
160
161
162 ResultBoundedCompletionService<Pair<Result[], ScannerCallable>> cs =
163 new ResultBoundedCompletionService<Pair<Result[], ScannerCallable>>(
164 RpcRetryingCallerFactory.instantiate(ScannerCallableWithReplicas.this.conf), pool,
165 rl.size() * 5);
166
167 AtomicBoolean done = new AtomicBoolean(false);
168 replicaSwitched.set(false);
169
170 addCallsForCurrentReplica(cs, rl);
171
172 try {
173
174 Future<Pair<Result[], ScannerCallable>> f = cs.poll(timeBeforeReplicas,
175 TimeUnit.MICROSECONDS);
176 if (f != null) {
177 Pair<Result[], ScannerCallable> r = f.get(timeout, TimeUnit.MILLISECONDS);
178 if (r != null && r.getSecond() != null) {
179 updateCurrentlyServingReplica(r.getSecond(), r.getFirst(), done, pool);
180 }
181 return r == null ? null : r.getFirst();
182 }
183 } catch (ExecutionException e) {
184 RpcRetryingCallerWithReadReplicas.throwEnrichedException(e, retries);
185 } catch (CancellationException e) {
186 throw new InterruptedIOException(e.getMessage());
187 } catch (InterruptedException e) {
188 throw new InterruptedIOException(e.getMessage());
189 } catch (TimeoutException e) {
190 throw new InterruptedIOException(e.getMessage());
191 }
192
193
194
195 addCallsForOtherReplicas(cs, rl, 0, rl.size() - 1);
196
197 try {
198 Future<Pair<Result[], ScannerCallable>> f = cs.poll(timeout, TimeUnit.MILLISECONDS);
199 if (f != null) {
200 Pair<Result[], ScannerCallable> r = f.get(timeout, TimeUnit.MILLISECONDS);
201 if (r != null && r.getSecond() != null) {
202 updateCurrentlyServingReplica(r.getSecond(), r.getFirst(), done, pool);
203 }
204 return r == null ? null : r.getFirst();
205 }
206 } catch (ExecutionException e) {
207 RpcRetryingCallerWithReadReplicas.throwEnrichedException(e, retries);
208 } catch (CancellationException e) {
209 throw new InterruptedIOException(e.getMessage());
210 } catch (InterruptedException e) {
211 throw new InterruptedIOException(e.getMessage());
212 } catch (TimeoutException e) {
213 throw new InterruptedIOException(e.getMessage());
214 } finally {
215
216
217 cs.cancelAll();
218 }
219 return null;
220 }
221
222 private void updateCurrentlyServingReplica(ScannerCallable scanner, Result[] result,
223 AtomicBoolean done, ExecutorService pool) {
224 if (done.compareAndSet(false, true)) {
225 if (currentScannerCallable != scanner) replicaSwitched.set(true);
226 currentScannerCallable = scanner;
227
228 if (result != null && result.length != 0) this.lastResult = result[result.length - 1];
229 if (LOG.isTraceEnabled()) {
230 LOG.trace("Setting current scanner as id=" + currentScannerCallable.scannerId +
231 " associated with replica=" + currentScannerCallable.getHRegionInfo().getReplicaId());
232 }
233
234 outstandingCallables.remove(scanner);
235 for (ScannerCallable s : outstandingCallables) {
236 if (LOG.isTraceEnabled()) {
237 LOG.trace("Closing scanner id=" + s.scannerId +
238 ", replica=" + s.getHRegionInfo().getRegionId() +
239 " because slow and replica=" +
240 this.currentScannerCallable.getHRegionInfo().getReplicaId() + " succeeded");
241 }
242
243
244
245 s.setClose();
246 final RetryingRPC r = new RetryingRPC(s);
247 pool.submit(new Callable<Void>(){
248 @Override
249 public Void call() throws Exception {
250 r.call(scannerTimeout);
251 return null;
252 }
253 });
254 }
255
256 outstandingCallables.clear();
257 }
258 }
259
260
261
262
263
264
265 public boolean switchedToADifferentReplica() {
266 return replicaSwitched.get();
267 }
268
269
270
271
272
273
274
275 public boolean isHeartbeatMessage() {
276 return currentScannerCallable != null && currentScannerCallable.isHeartbeatMessage();
277 }
278
279 private void addCallsForCurrentReplica(
280 ResultBoundedCompletionService<Pair<Result[], ScannerCallable>> cs, RegionLocations rl) {
281 RetryingRPC retryingOnReplica = new RetryingRPC(currentScannerCallable);
282 outstandingCallables.add(currentScannerCallable);
283 cs.submit(retryingOnReplica, scannerTimeout, currentScannerCallable.id);
284 }
285
286 private void addCallsForOtherReplicas(
287 ResultBoundedCompletionService<Pair<Result[], ScannerCallable>> cs, RegionLocations rl,
288 int min, int max) {
289 if (scan.getConsistency() == Consistency.STRONG) {
290 return;
291 }
292 for (int id = min; id <= max; id++) {
293 if (currentScannerCallable.id == id) {
294 continue;
295 }
296 ScannerCallable s = currentScannerCallable.getScannerCallableForReplica(id);
297 setStartRowForReplicaCallable(s);
298 outstandingCallables.add(s);
299 RetryingRPC retryingOnReplica = new RetryingRPC(s);
300 cs.submit(retryingOnReplica, scannerTimeout, id);
301 }
302 }
303
304
305
306
307
308 private void setStartRowForReplicaCallable(ScannerCallable callable) {
309 if (this.lastResult == null || callable == null) return;
310
311 if (this.lastResult.isPartial()) {
312
313
314
315
316 callable.getScan().setStartRow(this.lastResult.getRow());
317 } else {
318
319
320
321 if (callable.getScan().isReversed()) {
322 callable.getScan().setStartRow(createClosestRowBefore(this.lastResult.getRow()));
323 } else {
324 callable.getScan().setStartRow(Bytes.add(this.lastResult.getRow(), new byte[1]));
325 }
326 }
327 }
328
329 @VisibleForTesting
330 boolean isAnyRPCcancelled() {
331 return someRPCcancelled;
332 }
333
334 class RetryingRPC implements RetryingCallable<Pair<Result[], ScannerCallable>>, Cancellable {
335 final ScannerCallable callable;
336 RpcRetryingCaller<Result[]> caller;
337 private volatile boolean cancelled = false;
338
339 RetryingRPC(ScannerCallable callable) {
340 this.callable = callable;
341
342
343
344
345
346 this.caller = ScannerCallableWithReplicas.this.caller;
347 if (scan.getConsistency() == Consistency.TIMELINE) {
348 this.caller = RpcRetryingCallerFactory.instantiate(ScannerCallableWithReplicas.this.conf)
349 .<Result[]>newCaller();
350 }
351 }
352
353 @Override
354 public Pair<Result[], ScannerCallable> call(int callTimeout) throws IOException {
355
356
357 if (cancelled) {
358 return null;
359 }
360 Result[] res = this.caller.callWithoutRetries(this.callable, callTimeout);
361 return new Pair<Result[], ScannerCallable>(res, this.callable);
362 }
363
364 @Override
365 public void prepare(boolean reload) throws IOException {
366 if (cancelled) return;
367
368 if (Thread.interrupted()) {
369 throw new InterruptedIOException();
370 }
371
372 callable.prepare(reload);
373 }
374
375 @Override
376 public void throwable(Throwable t, boolean retrying) {
377 callable.throwable(t, retrying);
378 }
379
380 @Override
381 public String getExceptionMessageAdditionalDetail() {
382 return callable.getExceptionMessageAdditionalDetail();
383 }
384
385 @Override
386 public long sleep(long pause, int tries) {
387 return callable.sleep(pause, tries);
388 }
389
390 @Override
391 public void cancel() {
392 cancelled = true;
393 caller.cancel();
394 if (callable.getController() != null) {
395 callable.getController().startCancel();
396 }
397 someRPCcancelled = true;
398 }
399
400 @Override
401 public boolean isCancelled() {
402 return cancelled;
403 }
404 }
405
406 @Override
407 public void prepare(boolean reload) throws IOException {
408 }
409
410 @Override
411 public void throwable(Throwable t, boolean retrying) {
412 currentScannerCallable.throwable(t, retrying);
413 }
414
415 @Override
416 public String getExceptionMessageAdditionalDetail() {
417 return currentScannerCallable.getExceptionMessageAdditionalDetail();
418 }
419
420 @Override
421 public long sleep(long pause, int tries) {
422 return currentScannerCallable.sleep(pause, tries);
423 }
424 }