1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.client;
19  
20  import java.io.IOException;
21  import java.io.InterruptedIOException;
22  import java.util.ArrayList;
23  import java.util.Arrays;
24  import java.util.LinkedList;
25  import java.util.List;
26  import java.util.concurrent.ExecutorService;
27  
28  import org.apache.commons.logging.Log;
29  import org.apache.commons.logging.LogFactory;
30  import org.apache.hadoop.hbase.KeyValue.MetaComparator;
31  import org.apache.hadoop.hbase.classification.InterfaceAudience;
32  import org.apache.hadoop.conf.Configuration;
33  import org.apache.hadoop.hbase.Cell;
34  import org.apache.hadoop.hbase.CellComparator;
35  import org.apache.hadoop.hbase.CellUtil;
36  import org.apache.hadoop.hbase.DoNotRetryIOException;
37  import org.apache.hadoop.hbase.HBaseConfiguration;
38  import org.apache.hadoop.hbase.HConstants;
39  import org.apache.hadoop.hbase.HRegionInfo;
40  import org.apache.hadoop.hbase.NotServingRegionException;
41  import org.apache.hadoop.hbase.TableName;
42  import org.apache.hadoop.hbase.UnknownScannerException;
43  import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
44  import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
45  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
46  import org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos;
47  import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
48  import org.apache.hadoop.hbase.util.Bytes;
49  
50  import com.google.common.annotations.VisibleForTesting;
51  
52  /**
53   * Implements the scanner interface for the HBase client.
54   * If there are multiple regions in a table, this scanner will iterate
55   * through them all.
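 * <p>A minimal usage sketch. This class is {@code InterfaceAudience.Private}; applications
 * normally obtain a scanner through {@code Table#getScanner(Scan)} rather than constructing this
 * class directly (the connection and table name below are illustrative assumptions):
 * <pre>{@code
 * try (Table table = connection.getTable(TableName.valueOf("exampleTable"));
 *      ResultScanner scanner = table.getScanner(new Scan())) {
 *   for (Result result : scanner) {
 *     // each Result holds the cells of one row; rows come back across all regions of the table
 *   }
 * }
 * }</pre>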
56   */
57  @InterfaceAudience.Private
58  public class ClientScanner extends AbstractClientScanner {
59      private static final Log LOG = LogFactory.getLog(ClientScanner.class);
60      // A byte array in which all elements are the max byte; it is used to
61      // construct the closest front row
62      static byte[] MAX_BYTE_ARRAY = Bytes.createMaxByteArray(9);
63      protected Scan scan;
64      protected boolean closed = false;
65      // Current region scanner is against.  Gets cleared if current region goes
66      // wonky: e.g. if it splits on us.
67      protected HRegionInfo currentRegion = null;
68      protected ScannerCallableWithReplicas callable = null;
69      protected final LinkedList<Result> cache = new LinkedList<Result>();
70      /**
71       * A list of partial results that have been returned from the server. This list should only
72       * contain results if this scanner does not have enough partial results to form the complete
73       * result.
74       */
75      protected final LinkedList<Result> partialResults = new LinkedList<Result>();
76      /**
77       * The row for which we are accumulating partial Results (i.e. the row of the Results stored
78       * inside partialResults). Changes to partialResultsRow and partialResults are kept in sync
79       * via the methods {@link #addToPartialResults(Result)} and {@link #clearPartialResults()}
80       */
81      protected byte[] partialResultsRow = null;
82      /**
83       * The last cell added to the cache from a row that is not yet complete
84       */
85      protected Cell lastCellLoadedToCache = null;
86      protected final int caching;
87      protected long lastNext;
88      // Keep lastResult returned successfully in case we have to reset scanner.
89      protected Result lastResult = null;
90      protected final long maxScannerResultSize;
91      private final ClusterConnection connection;
92      private final TableName tableName;
93      protected final int scannerTimeout;
94      protected boolean scanMetricsPublished = false;
95      protected RpcRetryingCaller<Result []> caller;
96      protected RpcControllerFactory rpcControllerFactory;
97      protected Configuration conf;
98      //The timeout on the primary. Applicable if there are multiple replicas for a region.
99      //In that case, we will wait only this long on the primary before going
100     //to the replicas and trying the same scan. Note that the retries will still happen
101     //on each replica and the first successful result will be taken. A timeout of 0 is
102     //disallowed.
103     protected final int primaryOperationTimeout;
104     private int retries;
105     protected final ExecutorService pool;
106     private static MetaComparator metaComparator = new MetaComparator();
107 
108   /**
109    * Create a new ClientScanner for the specified table. Note that the passed {@link Scan}'s start
110    * row may be changed.
111    * @param conf The {@link Configuration} to use.
112    * @param scan {@link Scan} to use in this scanner
113    * @param tableName The table that we wish to scan
114    * @param connection Connection identifying the cluster
115    * @throws IOException
116    */
117   public ClientScanner(final Configuration conf, final Scan scan, final TableName tableName,
118       ClusterConnection connection, RpcRetryingCallerFactory rpcFactory,
119       RpcControllerFactory controllerFactory, ExecutorService pool, int primaryOperationTimeout)
120       throws IOException {
121       if (LOG.isTraceEnabled()) {
122         LOG.trace("Scan table=" + tableName
123             + ", startRow=" + Bytes.toStringBinary(scan.getStartRow()));
124       }
125       this.scan = scan;
126       this.tableName = tableName;
127       this.lastNext = System.currentTimeMillis();
128       this.connection = connection;
129       this.pool = pool;
130       this.primaryOperationTimeout = primaryOperationTimeout;
131       this.retries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
132           HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
133       if (scan.getMaxResultSize() > 0) {
134         this.maxScannerResultSize = scan.getMaxResultSize();
135       } else {
136         this.maxScannerResultSize = conf.getLong(
137           HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY,
138           HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE);
139       }
140       this.scannerTimeout = HBaseConfiguration.getInt(conf,
141         HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,
142         HConstants.HBASE_REGIONSERVER_LEASE_PERIOD_KEY,
143         HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD);
144 
145       // check if application wants to collect scan metrics
146       initScanMetrics(scan);
147 
148       // Use the caching from the Scan.  If not set, use the default cache setting for this table.
149       if (this.scan.getCaching() > 0) {
150         this.caching = this.scan.getCaching();
151       } else {
152         this.caching = conf.getInt(
153             HConstants.HBASE_CLIENT_SCANNER_CACHING,
154             HConstants.DEFAULT_HBASE_CLIENT_SCANNER_CACHING);
155       }
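      // For example (illustrative number): with caching of 100, each scanner RPC asks the region
      // server for up to 100 rows at a time, subject to the maxScannerResultSize byte limit above.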
156 
157       this.caller = rpcFactory.<Result[]> newCaller();
158       this.rpcControllerFactory = controllerFactory;
159 
160       this.conf = conf;
161       initializeScannerInConstruction();
162     }
163 
164     protected void initializeScannerInConstruction() throws IOException{
165       // initialize the scanner
166       nextScanner(this.caching, false);
167     }
168 
169     protected ClusterConnection getConnection() {
170       return this.connection;
171     }
172 
173     /**
174      * @return Table name
175      * @deprecated As of release 0.96
176      *             (<a href="https://issues.apache.org/jira/browse/HBASE-9508">HBASE-9508</a>).
177      *             This will be removed in HBase 2.0.0. Use {@link #getTable()}.
178      */
179     @Deprecated
180     protected byte [] getTableName() {
181       return this.tableName.getName();
182     }
183 
184     protected TableName getTable() {
185       return this.tableName;
186     }
187 
188     protected int getRetries() {
189       return this.retries;
190     }
191 
192     protected int getScannerTimeout() {
193       return this.scannerTimeout;
194     }
195 
196     protected Configuration getConf() {
197       return this.conf;
198     }
199 
200     protected Scan getScan() {
201       return scan;
202     }
203 
204     protected ExecutorService getPool() {
205       return pool;
206     }
207 
208     protected int getPrimaryOperationTimeout() {
209       return primaryOperationTimeout;
210     }
211 
212     protected int getCaching() {
213       return caching;
214     }
215 
216     protected long getTimestamp() {
217       return lastNext;
218     }
219 
220     @VisibleForTesting
221     protected long getMaxResultSize() {
222       return maxScannerResultSize;
223     }
224 
225     // Returns true if the passed region endKey is at or beyond the scan's stop row, i.e. we should not scan past the current region.
226     protected boolean checkScanStopRow(final byte [] endKey) {
227       if (this.scan.getStopRow().length > 0) {
228         // there is a stop row, check to see if we are past it.
229         byte [] stopRow = scan.getStopRow();
230         int cmp = Bytes.compareTo(stopRow, 0, stopRow.length,
231           endKey, 0, endKey.length);
232         if (cmp <= 0) {
233           // stopRow <= endKey (endKey is equal to or larger than stopRow)
234           // This is a stop.
235           return true;
236         }
237       }
238       return false; //unlikely.
239     }
240 
241     private boolean possiblyNextScanner(int nbRows, final boolean done) throws IOException {
242       // If we have just switched replica, don't go to the next scanner yet. Rather, try
243       // the scanner operations on the new replica, from the right point in the scan
244       // Note that when we switched to a different replica we left it at a point
245       // where we just did the "openScanner" with the appropriate startrow
246       if (callable != null && callable.switchedToADifferentReplica()) return true;
247       return nextScanner(nbRows, done);
248     }
249 
250     /*
251      * Gets a scanner for the next region.  If this.currentRegion != null, then
252      * we will move to the endrow of this.currentRegion.  Else we will get
253      * scanner at the scan.getStartRow().  We will go no further, just tidy
254      * up outstanding scanners, if <code>currentRegion != null</code> and
255      * <code>done</code> is true.
256      * @param nbRows
257      * @param done Server-side says we're done scanning.
258      */
259   protected boolean nextScanner(int nbRows, final boolean done)
260     throws IOException {
261       // Close the previous scanner if it's open
262       if (this.callable != null) {
263         this.callable.setClose();
264         call(callable, caller, scannerTimeout);
265         this.callable = null;
266       }
267 
268       // Where to start the next scanner
269       byte [] localStartKey;
270 
271       // if we're at end of table, close and return false to stop iterating
272       if (this.currentRegion != null) {
273         byte [] endKey = this.currentRegion.getEndKey();
274         if (endKey == null ||
275             Bytes.equals(endKey, HConstants.EMPTY_BYTE_ARRAY) ||
276             checkScanStopRow(endKey) ||
277             done) {
278           close();
279           if (LOG.isTraceEnabled()) {
280             LOG.trace("Finished " + this.currentRegion);
281           }
282           return false;
283         }
284         localStartKey = endKey;
285         if (LOG.isTraceEnabled()) {
286           LOG.trace("Finished " + this.currentRegion);
287         }
288       } else {
289         localStartKey = this.scan.getStartRow();
290       }
291 
292       if (LOG.isDebugEnabled() && this.currentRegion != null) {
293         // Only worth logging if NOT first region in scan.
294         LOG.debug("Advancing internal scanner to startKey at '" +
295           Bytes.toStringBinary(localStartKey) + "'");
296       }
297       try {
298         callable = getScannerCallable(localStartKey, nbRows);
299         // Open a scanner on the region server starting at the
300         // beginning of the region
301         call(callable, caller, scannerTimeout);
302         this.currentRegion = callable.getHRegionInfo();
303         if (this.scanMetrics != null) {
304           this.scanMetrics.countOfRegions.incrementAndGet();
305         }
306       } catch (IOException e) {
307         close();
308         throw e;
309       }
310       return true;
311     }
312 
313   @VisibleForTesting
314   boolean isAnyRPCcancelled() {
315     return callable.isAnyRPCcancelled();
316   }
317 
318   Result[] call(ScannerCallableWithReplicas callable,
319       RpcRetryingCaller<Result[]> caller, int scannerTimeout)
320       throws IOException, RuntimeException {
321     if (Thread.interrupted()) {
322       throw new InterruptedIOException();
323     }
324     // callWithoutRetries is at this layer. Within the ScannerCallableWithReplicas,
325     // we do a callWithRetries
326     return caller.callWithoutRetries(callable, scannerTimeout);
327   }
328 
329     @InterfaceAudience.Private
330     protected ScannerCallableWithReplicas getScannerCallable(byte [] localStartKey,
331         int nbRows) {
332       scan.setStartRow(localStartKey);
333       ScannerCallable s =
334           new ScannerCallable(getConnection(), getTable(), scan, this.scanMetrics,
335               this.rpcControllerFactory);
336       s.setCaching(nbRows);
337       ScannerCallableWithReplicas sr = new ScannerCallableWithReplicas(tableName, getConnection(),
338        s, pool, primaryOperationTimeout, scan,
339        retries, scannerTimeout, caching, conf, caller);
340       return sr;
341     }
342 
343     /**
344      * Publish the scan metrics. For now, we use scan.setAttribute to pass the metrics back to the
345      * application or TableInputFormat. Later, we could push it to other systems. We don't use
346      * the metrics framework because it doesn't support multiple instances of the same metric on the same
347      * machine; for scan/map reduce scenarios, we will have multiple scans running at the same time.
348      *
349      * By default, scan metrics are disabled; if the application wants to collect them, this
350      * behavior can be turned on by calling {@link Scan#setScanMetricsEnabled(boolean)}.
351      * 
352      * <p>This invocation clears the scan metrics. Metrics are aggregated in the Scan instance.
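     *
     * <p>A minimal sketch of how an application might read the published metrics back off the
     * {@link Scan} after the scan has completed; the deserialization via ProtobufUtil is an
     * assumption about the helper available on the client side of this version:
     * <pre>{@code
     * scan.setScanMetricsEnabled(true);
     * // ... run the scan to completion ...
     * byte[] serialized = scan.getAttribute(Scan.SCAN_ATTRIBUTES_METRICS_DATA);
     * if (serialized != null) {
     *   ScanMetrics scanMetrics = ProtobufUtil.toScanMetrics(serialized);
     *   long regionsScanned = scanMetrics.countOfRegions.get();
     * }
     * }</pre>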
353      */
354     protected void writeScanMetrics() {
355       if (this.scanMetrics == null || scanMetricsPublished) {
356         return;
357       }
358       MapReduceProtos.ScanMetrics pScanMetrics = ProtobufUtil.toScanMetrics(scanMetrics);
359       scan.setAttribute(Scan.SCAN_ATTRIBUTES_METRICS_DATA, pScanMetrics.toByteArray());
360       scanMetricsPublished = true;
361     }
362 
363     @Override
364     public Result next() throws IOException {
365       // If the scanner is closed and there's nothing left in the cache, next is a no-op.
366       if (cache.size() == 0 && this.closed) {
367         return null;
368       }
369       if (cache.size() == 0) {
370         loadCache();
371       }
372 
373       if (cache.size() > 0) {
374         return cache.poll();
375       }
376 
377       // if we exhausted this scanner before calling close, write out the scan metrics
378       writeScanMetrics();
379       return null;
380     }
381 
382   @VisibleForTesting
383   public int getCacheSize() {
384     return cache != null ? cache.size() : 0;
385   }
386 
387   /**
388    * Contact the servers to load more {@link Result}s in the cache.
389    */
390   protected void loadCache() throws IOException {
391     Result[] values = null;
392     long remainingResultSize = maxScannerResultSize;
393     int countdown = this.caching;
394     // We need to reset it if it's a new callable that was created with a countdown in nextScanner
395     callable.setCaching(this.caching);
396     // This flag lets us retry the scan once after an OutOfOrderScannerNextException; if a
397     // second one is seen before the next successful call, we give up and rethrow.
398     boolean retryAfterOutOfOrderException = true;
399     // We don't expect that the server will have more results for us if
400     // it doesn't tell us otherwise. We rely on the size or count of results
401     boolean serverHasMoreResults = false;
402     boolean allResultsSkipped = false;
403     do {
404       allResultsSkipped = false;
405       try {
406         // Server returns a null values array if scanning is to stop. Else,
407         // returns an empty array if scanning is to go on and we've just
408         // exhausted the current region.
409         values = call(callable, caller, scannerTimeout);
410         // When the replica switch happens, we need to do certain operations again.
411         // The callable will openScanner with the right startkey but we need to pick up
412         // from there. Bypass the rest of the loop and let the catch-up happen in the beginning
413         // of the loop as it happens for the cases where we see exceptions.
414         // Since only openScanner would have happened, values would be null
415         if (values == null && callable.switchedToADifferentReplica()) {
416           // Any accumulated partial results are no longer valid since the callable will
417           // openScanner with the correct startkey and we must pick up from there
418           clearPartialResults();
419           this.currentRegion = callable.getHRegionInfo();
420           continue;
421         }
422         retryAfterOutOfOrderException = true;
423       } catch (DoNotRetryIOException | NeedUnmanagedConnectionException e) {
424         // An exception was thrown which makes any partial results that we were collecting
425         // invalid. The scanner will need to be reset to the beginning of a row.
426         clearPartialResults();
427         // DNRIOEs are thrown to make us break out of retries. Some types of DNRIOEs want us
428         // to reset the scanner and come back in again.
429         if (e instanceof UnknownScannerException) {
430           long timeout = lastNext + scannerTimeout;
431           // If we are over the timeout, throw this exception to the client wrapped in
432           // a ScannerTimeoutException. Else, it's because the region moved and we used the old
433           // id against the new region server; reset the scanner.
434           if (timeout < System.currentTimeMillis()) {
435             LOG.info("For hints related to the following exception, please try taking a look at: "
436                     + "https://hbase.apache.org/book.html#trouble.client.scantimeout");
437             long elapsed = System.currentTimeMillis() - lastNext;
438             ScannerTimeoutException ex =
439                 new ScannerTimeoutException(elapsed + "ms passed since the last invocation, "
440                     + "timeout is currently set to " + scannerTimeout);
441             ex.initCause(e);
442             throw ex;
443           }
444         } else {
445           // If the exception is anything but those listed below, throw it back to the client; else set up
446           // the scanner and retry.
447           Throwable cause = e.getCause();
448           if ((cause != null && cause instanceof NotServingRegionException) ||
449               (cause != null && cause instanceof RegionServerStoppedException) ||
450               e instanceof OutOfOrderScannerNextException) {
451             // Pass. It is easier writing the if condition as a list of what is allowed rather than
452             // as a list of what is not allowed... so if we are in here, it means we do not throw.
453           } else {
454             throw e;
455           }
456         }
457         // Else, it's a signal from the depths of ScannerCallable that we need to reset the scanner.
458         if (this.lastResult != null) {
459           // The region has moved. We need to open a brand new scanner at the new location.
460           // Reset the startRow to the row we've seen last so that the new scanner starts at
461           // the correct row. Otherwise we may see previously returned rows again.
462           // (ScannerCallable by now has "relocated" the correct region)
463           if (!this.lastResult.isPartial() && scan.getBatch() < 0 ) {
464             if (scan.isReversed()) {
465               scan.setStartRow(createClosestRowBefore(lastResult.getRow()));
466             } else {
467               scan.setStartRow(Bytes.add(lastResult.getRow(), new byte[1]));
468             }
469           } else {
470             // we need to rescan this row because we only loaded a partial row before
471             scan.setStartRow(lastResult.getRow());
472           }
473         }
474         if (e instanceof OutOfOrderScannerNextException) {
475           if (retryAfterOutOfOrderException) {
476             retryAfterOutOfOrderException = false;
477           } else {
478             // TODO: Why wrap this in a DNRIOE when it already is a DNRIOE?
479             throw new DoNotRetryIOException("Failed after retry of " +
480                 "OutOfOrderScannerNextException: was there a rpc timeout?", e);
481           }
482         }
483         // Clear region.
484         this.currentRegion = null;
485         // Set this to null so we don't try to do an rpc and close on the remote server when
486         // the exception we got was UnknownScanner or the server is going down.
487         callable = null;
488         // This continue will take us to while at end of loop where we will set up new scanner.
489         continue;
490       }
491       long currentTime = System.currentTimeMillis();
492       if (this.scanMetrics != null) {
493         this.scanMetrics.sumOfMillisSecBetweenNexts.addAndGet(currentTime - lastNext);
494       }
495       lastNext = currentTime;
496       // Groom the array of Results that we received back from the server before adding those
497       // Results to the scanner's cache. If partial results are not allowed to be seen by the
498       // caller, all bookkeeping will be performed within this method.
499       List<Result> resultsToAddToCache =
500           getResultsToAddToCache(values, callable.isHeartbeatMessage());
501       if (!resultsToAddToCache.isEmpty()) {
502         for (Result rs : resultsToAddToCache) {
503           rs = filterLoadedCell(rs);
504           if (rs == null) {
505             continue;
506           }
507           cache.add(rs);
508           for (Cell cell : rs.rawCells()) {
509             remainingResultSize -= CellUtil.estimatedHeapSizeOf(cell);
510           }
511           countdown--;
512           this.lastResult = rs;
513           if (this.lastResult.isPartial() || scan.getBatch() > 0 ) {
514             updateLastCellLoadedToCache(this.lastResult);
515           } else {
516             this.lastCellLoadedToCache = null;
517           }
518         }
519         if (cache.isEmpty()) {
520           // all results have been seen before; we need to scan more.
521           allResultsSkipped = true;
522           continue;
523         }
524       }
525       if (callable.isHeartbeatMessage()) {
526         if (cache.size() > 0) {
527           // Caller of this method just wants a Result. If we see a heartbeat message, it means
528           // processing of the scan is taking a long time server side. Rather than continue to
529           // loop until a limit (e.g. size or caching) is reached, break out early to avoid causing
530           // unnecessary delays to the caller
531           if (LOG.isTraceEnabled()) {
532             LOG.trace("Heartbeat message received and cache contains Results."
533                     + " Breaking out of scan loop");
534           }
535           break;
536         }
537         continue;
538       }
539 
540       // We expect that the server won't have more results for us when we exhaust
541       // the size (bytes or count) of the results returned. If the server *does* inform us that
542       // there are more results, we want to avoid possiblyNextScanner(...). Only when we actually
543       // get results is the moreResults context valid.
544       if (null != values && values.length > 0 && callable.hasMoreResultsContext()) {
545         // Only adhere to more server results when we don't have any partialResults
546         // as it keeps the outer loop logic the same.
547         serverHasMoreResults = callable.getServerHasMoreResults() && partialResults.isEmpty();
548       }
549       // Values == null means server-side filter has determined we must STOP
550       // !partialResults.isEmpty() means that we are still accumulating partial Results for a
551       // row. We should not change scanners before we receive all the partial Results for that
552       // row.
553     } while (allResultsSkipped || (callable != null && callable.isHeartbeatMessage())
554         || (doneWithRegion(remainingResultSize, countdown, serverHasMoreResults)
555         && (!partialResults.isEmpty() || possiblyNextScanner(countdown, values == null))));
556   }
557 
558   /**
559    * @param remainingResultSize the number of result bytes that may still be added to the cache
560    * @param remainingRows the number of rows that may still be added to the cache
561    * @param regionHasMoreResults whether the server indicated the current region has more results
562    * @return true when the current region has been exhausted. When the current region has been
563    *         exhausted, the region must be changed before scanning can continue
564    */
565   private boolean doneWithRegion(long remainingResultSize, int remainingRows,
566       boolean regionHasMoreResults) {
567     return remainingResultSize > 0 && remainingRows > 0 && !regionHasMoreResults;
568   }
569 
570   /**
571    * This method ensures all of our book keeping regarding partial results is kept up to date. This
572    * method should be called once we know that the results we received back from the RPC request do
573    * not contain errors. We return a list of results that should be added to the cache. In general,
574    * this list will contain all NON-partial results from the input array (unless the client has
575    * specified that they are okay with receiving partial results)
576    * @param resultsFromServer The array of {@link Result}s returned from the server
577    * @param heartbeatMessage Flag indicating whether or not the response received from the server
578    *          represented a complete response, or a heartbeat message that was sent to keep the
579    *          client-server connection alive
580    * @return the list of results that should be added to the cache.
581    * @throws IOException
582    */
583   protected List<Result>
584       getResultsToAddToCache(Result[] resultsFromServer, boolean heartbeatMessage)
585           throws IOException {
586     int resultSize = resultsFromServer != null ? resultsFromServer.length : 0;
587     List<Result> resultsToAddToCache = new ArrayList<Result>(resultSize);
588 
589     final boolean isBatchSet = scan != null && scan.getBatch() > 0;
590     final boolean allowPartials = scan != null && scan.getAllowPartialResults();
591 
592     // If the caller has indicated in their scan that they are okay with seeing partial results,
593     // then simply add all results to the list. Note that since scan batching also returns results
594     // for a row in pieces we treat batch being set as equivalent to allowing partials. The
595     // implication of treating batching as equivalent to partial results is that it is possible
596     // the caller will receive a result back where the number of cells in the result is less than
597     // the batch size even though it may not be the last group of cells for that row.
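    // For example (illustrative numbers): with Scan#setBatch(3), a row containing 7 cells may be
    // returned as three Results of 3, 3, and 1 cells, and only the last of those is the final
    // group of cells for that row.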
598     if (allowPartials || isBatchSet) {
599       addResultsToList(resultsToAddToCache, resultsFromServer, 0,
600           (null == resultsFromServer ? 0 : resultsFromServer.length));
601       return resultsToAddToCache;
602     }
603 
604     // If no results were returned it indicates that either we have all the partial results
605     // necessary to construct the complete result or the server had to send a heartbeat message
606     // to the client to keep the client-server connection alive
607     if (resultsFromServer == null || resultsFromServer.length == 0) {
608       // If this response was an empty heartbeat message, then we have not exhausted the region
609       // and thus there may be more partials server side that still need to be added to the partial
610       // list before we form the complete Result
611       if (!partialResults.isEmpty() && !heartbeatMessage) {
612         resultsToAddToCache.add(Result.createCompleteResult(partialResults));
613         clearPartialResults();
614       }
615 
616       return resultsToAddToCache;
617     }
618 
619     // In every RPC response there should be at most a single partial result. Furthermore, if
620     // there is a partial result, it is guaranteed to be in the last position of the array.
621     Result last = resultsFromServer[resultsFromServer.length - 1];
622     Result partial = last.isPartial() ? last : null;
623 
624     if (LOG.isTraceEnabled()) {
625       StringBuilder sb = new StringBuilder();
626       sb.append("number results from RPC: ").append(resultsFromServer.length).append(",");
627       sb.append("partial != null: ").append(partial != null).append(",");
628       sb.append("number of partials so far: ").append(partialResults.size());
629       LOG.trace(sb.toString());
630     }
631 
632     // There are three possible cases that can occur while handling partial results
633     //
634     // 1. (partial != null && partialResults.isEmpty())
635     // This is the first partial result that we have received. It should be added to
636     // the list of partialResults and await the next RPC request at which point another
637     // portion of the complete result will be received
638     //
639     // 2. !partialResults.isEmpty()
640     // Since our partialResults list is not empty it means that we have been accumulating partial
641     // Results for a particular row. We cannot form the complete/whole Result for that row until
642     // all partials for the row have been received. Thus we loop through all of the Results
643     // returned from the server and determine whether or not all partial Results for the row have
644     // been received. We know that we have received all of the partial Results for the row when:
645     // i) We notice a row change in the Results
646     // ii) We see a Result for the partial row that is NOT marked as a partial Result
647     //
648     // 3. (partial == null && partialResults.isEmpty())
649     // Business as usual. We are not accumulating partial results and there wasn't a partial result
650     // in the RPC response. This means that all of the results we received from the server are
651     // complete and can be added directly to the cache
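    //
    // Illustrative example (hypothetical rows A and B): suppose row A is split across three RPCs.
    // RPC 1 returns [A-part1]: case 1, so A-part1 is buffered in partialResults.
    // RPC 2 returns [A-part2]: case 2, A-part2 is appended to partialResults; no complete row yet.
    // RPC 3 returns [A-part3 (not partial), B]: case 2 again, A-part3 completes row A, so the
    // merged Result for A goes to the cache, and B, being a complete Result, is added after it.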
652     if (partial != null && partialResults.isEmpty()) {
653       addToPartialResults(partial);
654 
655       // Exclude the last result, it's a partial
656       addResultsToList(resultsToAddToCache, resultsFromServer, 0, resultsFromServer.length - 1);
657     } else if (!partialResults.isEmpty()) {
658       for (int i = 0; i < resultsFromServer.length; i++) {
659         Result result = resultsFromServer[i];
660 
661         // This result is from the same row as the partial Results. Add it to the list of partials
662         // and check if it was the last partial Result for that row
663         if (Bytes.equals(partialResultsRow, result.getRow())) {
664           addToPartialResults(result);
665 
666           // If the result is not a partial, it is a signal to us that it is the last Result we
667           // need to form the complete Result client-side
668           if (!result.isPartial()) {
669             resultsToAddToCache.add(Result.createCompleteResult(partialResults));
670             clearPartialResults();
671           }
672         } else {
673           // The row of this result differs from the row of the partial results we have received so
674           // far. If our list of partials isn't empty, this is a signal to form the complete Result
675           // since the row has now changed
676           if (!partialResults.isEmpty()) {
677             resultsToAddToCache.add(Result.createCompleteResult(partialResults));
678             clearPartialResults();
679           }
680 
681           // It's possible that in one response from the server we receive the final partial for
682           // one row and receive a partial for a different row. Thus, make sure that all Results
683           // are added to the proper list
684           if (result.isPartial()) {
685             addToPartialResults(result);
686           } else {
687             resultsToAddToCache.add(result);
688           }
689         }
690       }
691     } else { // partial == null && partialResults.isEmpty() -- business as usual
692       addResultsToList(resultsToAddToCache, resultsFromServer, 0, resultsFromServer.length);
693     }
694 
695     return resultsToAddToCache;
696   }
697 
698   /**
699    * A convenience method for adding a Result to our list of partials. This method ensures that only
700    * Results that belong to the same row as the other partials can be added to the list.
701    * @param result The result that we want to add to our list of partial Results
702    * @throws IOException
703    */
704   private void addToPartialResults(final Result result) throws IOException {
705     final byte[] row = result.getRow();
706     if (partialResultsRow != null && !Bytes.equals(row, partialResultsRow)) {
707       throw new IOException("Partial result row does not match. All partial results must come "
708           + "from the same row. partialResultsRow: " + Bytes.toString(partialResultsRow) + "row: "
709           + Bytes.toString(row));
710     }
711     partialResultsRow = row;
712     partialResults.add(result);
713   }
714 
715   /**
716    * Convenience method for clearing the list of partials and resetting the partialResultsRow.
717    */
718   private void clearPartialResults() {
719     partialResults.clear();
720     partialResultsRow = null;
721   }
722 
723   /**
724    * Helper method for adding results between the indices [start, end) to the outputList
725    * @param outputList the list that results will be added to
726    * @param inputArray the array that results are taken from
727    * @param start beginning index (inclusive)
728    * @param end ending index (exclusive)
729    */
730   private void addResultsToList(List<Result> outputList, Result[] inputArray, int start, int end) {
731     if (inputArray == null || start < 0 || end > inputArray.length) return;
732 
733     for (int i = start; i < end; i++) {
734       outputList.add(inputArray[i]);
735     }
736   }
737 
738     @Override
739     public void close() {
740       if (!scanMetricsPublished) writeScanMetrics();
741       if (callable != null) {
742         callable.setClose();
743         try {
744           call(callable, caller, scannerTimeout);
745         } catch (UnknownScannerException e) {
746            // We used to catch this error, interpret, and rethrow. However, we
747            // have since decided that it's not nice for a scanner's close to
748            // throw exceptions. Chances are it was just due to lease time out.
749         } catch (IOException e) {
750            /* An exception other than UnknownScanner is unexpected. */
751            LOG.warn("scanner failed to close. Exception follows: " + e);
752         }
753         callable = null;
754       }
755       closed = true;
756     }
757 
758   /**
759    * Create the closest row before the specified row
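   * <p>A couple of illustrative inputs and outputs (byte values shown in hex; MAX_BYTE_ARRAY is
   * nine 0xFF bytes):
   * <pre>
   * createClosestRowBefore({0x01, 0x02}) = {0x01, 0x01} followed by nine 0xFF bytes
   * createClosestRowBefore({0x01, 0x00}) = {0x01}
   * </pre>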
760    * @param row the row to seek before; must not be null
761    * @return a new byte array which is the closest front row of the specified one
762    */
763   protected static byte[] createClosestRowBefore(byte[] row) {
764     if (row == null) {
765       throw new IllegalArgumentException("The passed row is null");
766     }
767     if (Bytes.equals(row, HConstants.EMPTY_BYTE_ARRAY)) {
768       return MAX_BYTE_ARRAY;
769     }
770     if (row[row.length - 1] == 0) {
771       return Arrays.copyOf(row, row.length - 1);
772     } else {
773       byte[] closestFrontRow = Arrays.copyOf(row, row.length);
774       closestFrontRow[row.length - 1] = (byte) ((closestFrontRow[row.length - 1] & 0xff) - 1);
775       closestFrontRow = Bytes.add(closestFrontRow, MAX_BYTE_ARRAY);
776       return closestFrontRow;
777     }
778   }
779 
780   @Override
781   public boolean renewLease() {
782     if (callable != null) {
783       // do not return any rows, do not advance the scanner
784       callable.setRenew(true);
785       try {
786         this.caller.callWithoutRetries(callable, this.scannerTimeout);
787       } catch (Exception e) {
788         return false;
789       } finally {
790         callable.setRenew(false);
791       }
792       return true;
793     }
794     return false;
795   }
796 
797   protected void updateLastCellLoadedToCache(Result result) {
798     if (result.rawCells().length == 0) {
799       return;
800     }
801     this.lastCellLoadedToCache = result.rawCells()[result.rawCells().length - 1];
802   }
803 
804   /**
805    * Compare two Cells considering reversed scanner.
806    * ReversedScanner only reverses rows, not columns.
807    */
808   private int compare(Cell a, Cell b) {
809     int r = 0;
810     if (currentRegion != null && currentRegion.isMetaRegion()) {
811       r = metaComparator.compareRows(a, b);
812     } else {
813       r = CellComparator.compareRows(a, b);
814     }
815     if (r != 0) {
816       return this.scan.isReversed() ? -r : r;
817     }
818     return CellComparator.compareWithoutRow(a, b);
819   }
820 
821   private Result filterLoadedCell(Result result) {
822     // We only filter results when the last result was partial, so lastCellLoadedToCache and
823     // result should have the same row key. However, if 1) we read some cells, 1.1) the row is
824     // deleted at the same time, 2) the region moves, and then 3) we read more cells, then
825     // lastCellLoadedToCache and result will not be on the same row.
826     if (lastCellLoadedToCache == null || result.rawCells().length == 0) {
827       return result;
828     }
829     if (compare(this.lastCellLoadedToCache, result.rawCells()[0]) < 0) {
830       // The first cell of this result is larger than the last cell loaded to the cache.
831       // If the user does not allow partial results, this must be true.
832       return result;
833     }
834     if (compare(this.lastCellLoadedToCache, result.rawCells()[result.rawCells().length - 1]) >= 0) {
835       // The last cell of this result is not larger than the last cell loaded to the cache; skip all.
836       return null;
837     }
838 
839     // The first cell must not be in the filtered result, so we start at the second.
840     int index = 1;
841     while (index < result.rawCells().length) {
842       if (compare(this.lastCellLoadedToCache, result.rawCells()[index]) < 0) {
843         break;
844       }
845       index++;
846     }
847     Cell[] list = Arrays.copyOfRange(result.rawCells(), index, result.rawCells().length);
848     return Result.create(list, result.getExists(), result.isStale(), result.isPartial());
849   }
850 }