1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase.client;
21
22
23 import org.apache.commons.logging.Log;
24 import org.apache.commons.logging.LogFactory;
25 import org.apache.hadoop.hbase.classification.InterfaceAudience;
26 import org.apache.hadoop.conf.Configuration;
27 import org.apache.hadoop.hbase.Cell;
28 import org.apache.hadoop.hbase.CellUtil;
29 import org.apache.hadoop.hbase.HConstants;
30 import org.apache.hadoop.hbase.TableName;
31 import org.apache.hadoop.hbase.client.ClientSmallScanner.SmallScannerCallable;
32 import org.apache.hadoop.hbase.client.ClientSmallScanner.SmallScannerCallableFactory;
33 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
34 import org.apache.hadoop.hbase.util.Bytes;
35
36 import com.google.common.annotations.VisibleForTesting;
37
38 import java.io.IOException;
39 import java.util.concurrent.ExecutorService;
40
41
42
43
44
45
46
47
48
49 @InterfaceAudience.Private
50 public class ClientSmallReversedScanner extends ReversedClientScanner {
51 private static final Log LOG = LogFactory.getLog(ClientSmallReversedScanner.class);
52 private ScannerCallableWithReplicas smallScanCallable = null;
53 private SmallScannerCallableFactory callableFactory;
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78 public ClientSmallReversedScanner(final Configuration conf, final Scan scan,
79 final TableName tableName, ClusterConnection connection, RpcRetryingCallerFactory rpcFactory,
80 RpcControllerFactory controllerFactory, ExecutorService pool, int primaryOperationTimeout)
81 throws IOException {
82 this(conf, scan, tableName, connection, rpcFactory, controllerFactory, pool,
83 primaryOperationTimeout, new SmallScannerCallableFactory());
84 }
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111 @VisibleForTesting
112 ClientSmallReversedScanner(final Configuration conf, final Scan scan, final TableName tableName,
113 ClusterConnection connection, RpcRetryingCallerFactory rpcFactory,
114 RpcControllerFactory controllerFactory, ExecutorService pool, int primaryOperationTimeout,
115 SmallScannerCallableFactory callableFactory) throws IOException {
116 super(conf, scan, tableName, connection, rpcFactory, controllerFactory, pool,
117 primaryOperationTimeout);
118 this.callableFactory = callableFactory;
119 }
120
121
122
123
124
125
126
127
128
129
130
131
132
133 private boolean nextScanner(int nbRows, final boolean done,
134 boolean currentRegionDone) throws IOException {
135
136 byte[] localStartKey;
137 int cacheNum = nbRows;
138 boolean regionChanged = true;
139
140 if (this.currentRegion != null && currentRegionDone) {
141 byte[] startKey = this.currentRegion.getStartKey();
142 if (startKey == null
143 || Bytes.equals(startKey, HConstants.EMPTY_BYTE_ARRAY)
144 || checkScanStopRow(startKey) || done) {
145 close();
146 if (LOG.isDebugEnabled()) {
147 LOG.debug("Finished with small scan at " + this.currentRegion);
148 }
149 return false;
150 }
151
152 localStartKey = createClosestRowBefore(startKey);
153 if (LOG.isDebugEnabled()) {
154 LOG.debug("Finished with region " + this.currentRegion);
155 }
156 } else if (this.lastResult != null) {
157 regionChanged = false;
158 localStartKey = createClosestRowBefore(lastResult.getRow());
159 } else {
160 localStartKey = this.scan.getStartRow();
161 }
162
163 if (LOG.isTraceEnabled()) {
164 LOG.trace("Advancing internal small scanner to startKey at '"
165 + Bytes.toStringBinary(localStartKey) + "'");
166 }
167
168 smallScanCallable = callableFactory.getCallable(getConnection(), getTable(), scan,
169 getScanMetrics(), localStartKey, cacheNum, rpcControllerFactory, getPool(),
170 getPrimaryOperationTimeout(), getRetries(), getScannerTimeout(), getConf(), caller);
171
172 if (this.scanMetrics != null && regionChanged) {
173 this.scanMetrics.countOfRegions.incrementAndGet();
174 }
175 return true;
176 }
177
178 @Override
179 public Result next() throws IOException {
180
181
182 if (cache.size() == 0 && this.closed) {
183 return null;
184 }
185 if (cache.size() == 0) {
186 loadCache();
187 }
188
189 if (cache.size() > 0) {
190 return cache.poll();
191 }
192
193
194 writeScanMetrics();
195 return null;
196 }
197
198 @Override
199 protected void loadCache() throws IOException {
200 Result[] values = null;
201 long remainingResultSize = maxScannerResultSize;
202 int countdown = this.caching;
203 boolean currentRegionDone = false;
204
205 while (remainingResultSize > 0 && countdown > 0
206 && nextScanner(countdown, values == null, currentRegionDone)) {
207
208
209
210
211
212 values = this.caller.callWithoutRetries(smallScanCallable, scannerTimeout);
213 this.currentRegion = smallScanCallable.getHRegionInfo();
214 long currentTime = System.currentTimeMillis();
215 if (this.scanMetrics != null) {
216 this.scanMetrics.sumOfMillisSecBetweenNexts.addAndGet(currentTime
217 - lastNext);
218 }
219 lastNext = currentTime;
220 if (values != null && values.length > 0) {
221 for (int i = 0; i < values.length; i++) {
222 Result rs = values[i];
223 cache.add(rs);
224
225 for (Cell cell : rs.rawCells()) {
226 remainingResultSize -= CellUtil.estimatedHeapSizeOf(cell);
227 }
228 countdown--;
229 this.lastResult = rs;
230 }
231 }
232 if (smallScanCallable.hasMoreResultsContext()) {
233 currentRegionDone = !smallScanCallable.getServerHasMoreResults();
234 } else {
235 currentRegionDone = countdown > 0;
236 }
237 }
238 }
239
240 @Override
241 protected void initializeScannerInConstruction() throws IOException {
242
243
244 }
245
246 @Override
247 public void close() {
248 if (!scanMetricsPublished) writeScanMetrics();
249 closed = true;
250 }
251
252 @VisibleForTesting
253 protected void setScannerCallableFactory(SmallScannerCallableFactory callableFactory) {
254 this.callableFactory = callableFactory;
255 }
256 }