1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase.regionserver;
21
22 import java.io.IOException;
23 import java.io.InterruptedIOException;
24 import java.util.ArrayList;
25 import java.util.List;
26 import java.util.NavigableSet;
27 import java.util.concurrent.CountDownLatch;
28 import java.util.concurrent.locks.ReentrantLock;
29
30 import org.apache.commons.logging.Log;
31 import org.apache.commons.logging.LogFactory;
32 import org.apache.hadoop.hbase.Cell;
33 import org.apache.hadoop.hbase.CellUtil;
34 import org.apache.hadoop.hbase.DoNotRetryIOException;
35 import org.apache.hadoop.hbase.HConstants;
36 import org.apache.hadoop.hbase.KeyValue;
37 import org.apache.hadoop.hbase.KeyValue.KVComparator;
38 import org.apache.hadoop.hbase.KeyValueUtil;
39 import org.apache.hadoop.hbase.classification.InterfaceAudience;
40 import org.apache.hadoop.hbase.client.IsolationLevel;
41 import org.apache.hadoop.hbase.client.Scan;
42 import org.apache.hadoop.hbase.executor.ExecutorService;
43 import org.apache.hadoop.hbase.filter.Filter;
44 import org.apache.hadoop.hbase.regionserver.ScanQueryMatcher.MatchCode;
45 import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope;
46 import org.apache.hadoop.hbase.regionserver.ScannerContext.NextState;
47 import org.apache.hadoop.hbase.regionserver.handler.ParallelSeekHandler;
48 import org.apache.hadoop.hbase.util.Bytes;
49 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
50
51 import com.google.common.annotations.VisibleForTesting;
52
53
54
55
56
57 @InterfaceAudience.Private
58 public class StoreScanner extends NonReversedNonLazyKeyValueScanner
59 implements KeyValueScanner, InternalScanner, ChangedReadersObserver {
60 private static final Log LOG = LogFactory.getLog(StoreScanner.class);
61
62 protected final Store store;
63 protected ScanQueryMatcher matcher;
64 protected KeyValueHeap heap;
65 protected boolean cacheBlocks;
66
67 protected long countPerRow = 0;
68 protected int storeLimit = -1;
69 protected int storeOffset = 0;
70
71
72
73 protected boolean closing = false;
74 protected final boolean get;
75 protected final boolean explicitColumnQuery;
76 protected final boolean useRowColBloom;
77
78
79
80 protected boolean parallelSeekEnabled = false;
81 protected ExecutorService executor;
82 protected final Scan scan;
83 protected final NavigableSet<byte[]> columns;
84 protected final long oldestUnexpiredTS;
85 protected final long now;
86 protected final int minVersions;
87 protected final long maxRowSize;
88 protected final long cellsPerHeartbeatCheck;
89
90
91
92
93
94 private long kvsScanned = 0;
95 private Cell prevCell = null;
96
97
98 static final boolean LAZY_SEEK_ENABLED_BY_DEFAULT = true;
99 public static final String STORESCANNER_PARALLEL_SEEK_ENABLE =
100 "hbase.storescanner.parallel.seek.enable";
101
102
103 protected static boolean lazySeekEnabledGlobally =
104 LAZY_SEEK_ENABLED_BY_DEFAULT;
105
106
107
108
109
110
111 public static final String HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK =
112 "hbase.cells.scanned.per.heartbeat.check";
113
114
115
116
117 public static final long DEFAULT_HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK = 10000;
118
119
120 protected Cell lastTop = null;
121
122
123 private boolean scanUsePread = false;
124 protected ReentrantLock lock = new ReentrantLock();
125
126 private final long readPt;
127
128
129 enum StoreScannerCompactionRace {
130 BEFORE_SEEK,
131 AFTER_SEEK,
132 COMPACT_COMPLETE
133 }
134
135
136 protected StoreScanner(Store store, Scan scan, final ScanInfo scanInfo,
137 final NavigableSet<byte[]> columns, long readPt, boolean cacheBlocks) {
138 this.readPt = readPt;
139 this.store = store;
140 this.cacheBlocks = cacheBlocks;
141 get = scan.isGetScan();
142 int numCol = columns == null ? 0 : columns.size();
143 explicitColumnQuery = numCol > 0;
144 this.scan = scan;
145 this.columns = columns;
146 this.now = EnvironmentEdgeManager.currentTime();
147 this.oldestUnexpiredTS = now - scanInfo.getTtl();
148 this.minVersions = scanInfo.getMinVersions();
149
150
151
152
153
154 this.useRowColBloom = numCol > 1 || (!get && numCol == 1);
155
156 this.maxRowSize = scanInfo.getTableMaxRowSize();
157 this.scanUsePread = scan.isSmall()? true: scanInfo.isUsePread();
158 this.cellsPerHeartbeatCheck = scanInfo.getCellsPerTimeoutCheck();
159
160 if (this.store != null && this.store.getStorefilesCount() > 1) {
161 RegionServerServices rsService = ((HStore)store).getHRegion().getRegionServerServices();
162 if (rsService != null && scanInfo.isParallelSeekEnabled()) {
163 this.parallelSeekEnabled = true;
164 this.executor = rsService.getExecutorService();
165 }
166 }
167 }
168
169
170
171
172
173
174
175
176
177
178 public StoreScanner(Store store, ScanInfo scanInfo, Scan scan, final NavigableSet<byte[]> columns,
179 long readPt)
180 throws IOException {
181 this(store, scan, scanInfo, columns, readPt, scan.getCacheBlocks());
182 if (columns != null && scan.isRaw()) {
183 throw new DoNotRetryIOException("Cannot specify any column for a raw scan");
184 }
185 matcher = new ScanQueryMatcher(scan, scanInfo, columns,
186 ScanType.USER_SCAN, Long.MAX_VALUE, HConstants.LATEST_TIMESTAMP,
187 oldestUnexpiredTS, now, store.getCoprocessorHost());
188
189 this.store.addChangedReaderObserver(this);
190
191
192 List<KeyValueScanner> scanners = getScannersNoCompaction();
193
194
195
196
197
198 seekScanners(scanners, matcher.getStartKey(), explicitColumnQuery
199 && lazySeekEnabledGlobally, parallelSeekEnabled);
200
201
202 this.storeLimit = scan.getMaxResultsPerColumnFamily();
203
204
205 this.storeOffset = scan.getRowOffsetPerColumnFamily();
206
207
208 resetKVHeap(scanners, store.getComparator());
209 }
210
211
212
213
214
215
216
217
218
219
220
221 public StoreScanner(Store store, ScanInfo scanInfo, Scan scan,
222 List<? extends KeyValueScanner> scanners, ScanType scanType,
223 long smallestReadPoint, long earliestPutTs) throws IOException {
224 this(store, scanInfo, scan, scanners, scanType, smallestReadPoint, earliestPutTs, null, null);
225 }
226
227
228
229
230
231
232
233
234
235
236
237
238 public StoreScanner(Store store, ScanInfo scanInfo, Scan scan,
239 List<? extends KeyValueScanner> scanners, long smallestReadPoint, long earliestPutTs,
240 byte[] dropDeletesFromRow, byte[] dropDeletesToRow) throws IOException {
241 this(store, scanInfo, scan, scanners, ScanType.COMPACT_RETAIN_DELETES, smallestReadPoint,
242 earliestPutTs, dropDeletesFromRow, dropDeletesToRow);
243 }
244
245 private StoreScanner(Store store, ScanInfo scanInfo, Scan scan,
246 List<? extends KeyValueScanner> scanners, ScanType scanType, long smallestReadPoint,
247 long earliestPutTs, byte[] dropDeletesFromRow, byte[] dropDeletesToRow) throws IOException {
248 this(store, scan, scanInfo, null,
249 ((HStore)store).getHRegion().getReadpoint(IsolationLevel.READ_COMMITTED), false);
250 if (dropDeletesFromRow == null) {
251 matcher = new ScanQueryMatcher(scan, scanInfo, null, scanType, smallestReadPoint,
252 earliestPutTs, oldestUnexpiredTS, now, store.getCoprocessorHost());
253 } else {
254 matcher = new ScanQueryMatcher(scan, scanInfo, null, smallestReadPoint, earliestPutTs,
255 oldestUnexpiredTS, now, dropDeletesFromRow, dropDeletesToRow, store.getCoprocessorHost());
256 }
257
258
259 scanners = selectScannersFrom(scanners);
260
261
262 seekScanners(scanners, matcher.getStartKey(), false, parallelSeekEnabled);
263
264
265 resetKVHeap(scanners, store.getComparator());
266 }
267
268 @VisibleForTesting
269 StoreScanner(final Scan scan, ScanInfo scanInfo,
270 ScanType scanType, final NavigableSet<byte[]> columns,
271 final List<KeyValueScanner> scanners) throws IOException {
272 this(scan, scanInfo, scanType, columns, scanners,
273 HConstants.LATEST_TIMESTAMP,
274
275 0);
276 }
277
278 @VisibleForTesting
279 StoreScanner(final Scan scan, ScanInfo scanInfo,
280 ScanType scanType, final NavigableSet<byte[]> columns,
281 final List<KeyValueScanner> scanners, long earliestPutTs)
282 throws IOException {
283 this(scan, scanInfo, scanType, columns, scanners, earliestPutTs,
284
285 0);
286 }
287
288 private StoreScanner(final Scan scan, ScanInfo scanInfo,
289 ScanType scanType, final NavigableSet<byte[]> columns,
290 final List<KeyValueScanner> scanners, long earliestPutTs, long readPt)
291 throws IOException {
292 this(null, scan, scanInfo, columns, readPt, scan.getCacheBlocks());
293 this.matcher = new ScanQueryMatcher(scan, scanInfo, columns, scanType,
294 Long.MAX_VALUE, earliestPutTs, oldestUnexpiredTS, now, null);
295
296
297 if (this.store != null) {
298 this.store.addChangedReaderObserver(this);
299 }
300
301 seekScanners(scanners, matcher.getStartKey(), false, parallelSeekEnabled);
302 resetKVHeap(scanners, scanInfo.getComparator());
303 }
304
305
306
307
308
309 protected List<KeyValueScanner> getScannersNoCompaction() throws IOException {
310 final boolean isCompaction = false;
311 boolean usePread = get || scanUsePread;
312 return selectScannersFrom(store.getScanners(cacheBlocks, get, usePread,
313 isCompaction, matcher, scan.getStartRow(), scan.getStopRow(), this.readPt));
314 }
315
316
317
318
319
320
321
322
323
324 protected void seekScanners(List<? extends KeyValueScanner> scanners,
325 Cell seekKey, boolean isLazy, boolean isParallelSeek)
326 throws IOException {
327
328
329
330
331 if (isLazy) {
332 for (KeyValueScanner scanner : scanners) {
333 scanner.requestSeek(seekKey, false, true);
334 }
335 } else {
336 if (!isParallelSeek) {
337 long totalScannersSoughtBytes = 0;
338 for (KeyValueScanner scanner : scanners) {
339 if (totalScannersSoughtBytes >= maxRowSize) {
340 throw new RowTooBigException("Max row size allowed: " + maxRowSize
341 + ", but row is bigger than that");
342 }
343 scanner.seek(seekKey);
344 Cell c = scanner.peek();
345 if (c != null) {
346 totalScannersSoughtBytes += CellUtil.estimatedSerializedSizeOf(c);
347 }
348 }
349 } else {
350 parallelSeek(scanners, seekKey);
351 }
352 }
353 }
354
355 protected void resetKVHeap(List<? extends KeyValueScanner> scanners,
356 KVComparator comparator) throws IOException {
357
358 heap = new KeyValueHeap(scanners, comparator);
359 }
360
361
362
363
364
365 protected List<KeyValueScanner> selectScannersFrom(
366 final List<? extends KeyValueScanner> allScanners) {
367 boolean memOnly;
368 boolean filesOnly;
369 if (scan instanceof InternalScan) {
370 InternalScan iscan = (InternalScan)scan;
371 memOnly = iscan.isCheckOnlyMemStore();
372 filesOnly = iscan.isCheckOnlyStoreFiles();
373 } else {
374 memOnly = false;
375 filesOnly = false;
376 }
377
378 List<KeyValueScanner> scanners =
379 new ArrayList<KeyValueScanner>(allScanners.size());
380
381
382
383 long expiredTimestampCutoff = minVersions == 0 ? oldestUnexpiredTS :
384 Long.MIN_VALUE;
385
386
387 for (KeyValueScanner kvs : allScanners) {
388 boolean isFile = kvs.isFileScanner();
389 if ((!isFile && filesOnly) || (isFile && memOnly)) {
390 continue;
391 }
392
393 if (kvs.shouldUseScanner(scan, store, expiredTimestampCutoff)) {
394 scanners.add(kvs);
395 }
396 }
397 return scanners;
398 }
399
400 @Override
401 public Cell peek() {
402 lock.lock();
403 try {
404 if (this.heap == null) {
405 return this.lastTop;
406 }
407 return this.heap.peek();
408 } finally {
409 lock.unlock();
410 }
411 }
412
413 @Override
414 public KeyValue next() {
415
416 throw new RuntimeException("Never call StoreScanner.next()");
417 }
418
419 @Override
420 public void close() {
421 lock.lock();
422 try {
423 if (this.closing) return;
424 this.closing = true;
425
426 if (this.store != null)
427 this.store.deleteChangedReaderObserver(this);
428 if (this.heap != null)
429 this.heap.close();
430 this.heap = null;
431 this.lastTop = null;
432 } finally {
433 lock.unlock();
434 }
435 }
436
437 @Override
438 public boolean seek(Cell key) throws IOException {
439 lock.lock();
440 try {
441
442 checkReseek();
443 return this.heap.seek(key);
444 } finally {
445 lock.unlock();
446 }
447 }
448
449 @Override
450 public boolean next(List<Cell> outResult) throws IOException {
451 return next(outResult, NoLimitScannerContext.getInstance());
452 }
453
454
455
456
457
458
459
460 @Override
461 public boolean next(List<Cell> outResult, ScannerContext scannerContext) throws IOException {
462 lock.lock();
463
464 try {
465 if (scannerContext == null) {
466 throw new IllegalArgumentException("Scanner context cannot be null");
467 }
468 if (checkReseek()) {
469 return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
470 }
471
472
473
474 if (this.heap == null) {
475 close();
476 return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
477 }
478
479 Cell cell = this.heap.peek();
480 if (cell == null) {
481 close();
482 return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
483 }
484
485
486
487 byte[] row = cell.getRowArray();
488 int offset = cell.getRowOffset();
489 short length = cell.getRowLength();
490
491
492
493
494 if (!scannerContext.hasAnyLimit(LimitScope.BETWEEN_CELLS) || matcher.row == null) {
495 this.countPerRow = 0;
496 matcher.setRow(row, offset, length);
497 }
498
499
500 if (!scannerContext.getKeepProgress()) scannerContext.clearProgress();
501
502
503 KeyValue.KVComparator comparator =
504 store != null ? store.getComparator() : null;
505
506 int count = 0;
507 long totalBytesRead = 0;
508
509 LOOP: do {
510
511 if ((kvsScanned % cellsPerHeartbeatCheck == 0)) {
512 scannerContext.updateTimeProgress();
513 if (scannerContext.checkTimeLimit(LimitScope.BETWEEN_CELLS)) {
514 return scannerContext.setScannerState(NextState.TIME_LIMIT_REACHED).hasMoreValues();
515 }
516 }
517
518 if (prevCell != cell) ++kvsScanned;
519 checkScanOrder(prevCell, cell, comparator);
520 prevCell = cell;
521
522 ScanQueryMatcher.MatchCode qcode = matcher.match(cell);
523 qcode = optimize(qcode, cell);
524 switch(qcode) {
525 case INCLUDE:
526 case INCLUDE_AND_SEEK_NEXT_ROW:
527 case INCLUDE_AND_SEEK_NEXT_COL:
528
529 Filter f = matcher.getFilter();
530 if (f != null) {
531
532 cell = f.transformCell(cell);
533 }
534
535 this.countPerRow++;
536 if (storeLimit > -1 &&
537 this.countPerRow > (storeLimit + storeOffset)) {
538
539 if (!matcher.moreRowsMayExistAfter(cell)) {
540 return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
541 }
542
543
544
545 matcher.row = null;
546 seekToNextRow(cell);
547 break LOOP;
548 }
549
550
551
552 if (this.countPerRow > storeOffset) {
553 outResult.add(cell);
554
555
556 count++;
557 totalBytesRead += CellUtil.estimatedSerializedSizeOf(cell);
558
559
560 scannerContext.incrementSizeProgress(CellUtil.estimatedHeapSizeOfWithoutTags(cell));
561 scannerContext.incrementBatchProgress(1);
562
563 if (totalBytesRead > maxRowSize) {
564 throw new RowTooBigException("Max row size allowed: " + maxRowSize
565 + ", but the row is bigger than that.");
566 }
567 }
568
569 if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_ROW) {
570 if (!matcher.moreRowsMayExistAfter(cell)) {
571 return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
572 }
573
574
575
576 matcher.row = null;
577 seekToNextRow(cell);
578 } else if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_COL) {
579 seekAsDirection(matcher.getKeyForNextColumn(cell));
580 } else {
581 this.heap.next();
582 }
583
584 if (scannerContext.checkBatchLimit(LimitScope.BETWEEN_CELLS)) {
585 break LOOP;
586 }
587 if (scannerContext.checkSizeLimit(LimitScope.BETWEEN_CELLS)) {
588 break LOOP;
589 }
590 continue;
591
592 case DONE:
593
594
595
596 matcher.row = null;
597 return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
598
599 case DONE_SCAN:
600 close();
601 return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
602
603 case SEEK_NEXT_ROW:
604
605
606 if (!matcher.moreRowsMayExistAfter(cell)) {
607 return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
608 }
609
610
611
612 matcher.row = null;
613 seekToNextRow(cell);
614 break;
615
616 case SEEK_NEXT_COL:
617 seekAsDirection(matcher.getKeyForNextColumn(cell));
618 break;
619
620 case SKIP:
621 this.heap.next();
622 break;
623
624 case SEEK_NEXT_USING_HINT:
625
626 Cell nextKV = matcher.getNextKeyHint(cell);
627 if (nextKV != null) {
628 seekAsDirection(nextKV);
629 } else {
630 heap.next();
631 }
632 break;
633
634 default:
635 throw new RuntimeException("UNEXPECTED");
636 }
637 } while((cell = this.heap.peek()) != null);
638
639 if (count > 0) {
640 return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
641 }
642
643
644 close();
645 return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
646 } finally {
647 lock.unlock();
648 }
649 }
650
651
652
653
654
655 private ScanQueryMatcher.MatchCode optimize(ScanQueryMatcher.MatchCode qcode, Cell cell) {
656 switch(qcode) {
657 case INCLUDE_AND_SEEK_NEXT_COL:
658 case SEEK_NEXT_COL:
659 {
660 Cell nextIndexedKey = getNextIndexedKey();
661 if (nextIndexedKey != null && nextIndexedKey != KeyValueScanner.NO_NEXT_INDEXED_KEY
662 && matcher.compareKeyForNextColumn(nextIndexedKey, cell) >= 0) {
663 return qcode == MatchCode.SEEK_NEXT_COL ? MatchCode.SKIP : MatchCode.INCLUDE;
664 }
665 break;
666 }
667 case INCLUDE_AND_SEEK_NEXT_ROW:
668 case SEEK_NEXT_ROW:
669 {
670 Cell nextIndexedKey = getNextIndexedKey();
671 if (nextIndexedKey != null && nextIndexedKey != KeyValueScanner.NO_NEXT_INDEXED_KEY
672 && matcher.compareKeyForNextRow(nextIndexedKey, cell) >= 0) {
673 return qcode == MatchCode.SEEK_NEXT_ROW ? MatchCode.SKIP : MatchCode.INCLUDE;
674 }
675 break;
676 }
677 default:
678 break;
679 }
680 return qcode;
681 }
682
683
684 @Override
685 public void updateReaders() throws IOException {
686 lock.lock();
687 try {
688 if (this.closing) return;
689
690
691
692
693
694
695 if (this.heap == null) return;
696
697
698 this.lastTop = this.peek();
699
700
701
702
703 this.heap.close();
704 this.heap = null;
705
706
707 } finally {
708 lock.unlock();
709 }
710 }
711
712
713
714
715
716
717 protected boolean checkReseek() throws IOException {
718 if (this.heap == null && this.lastTop != null) {
719 resetScannerStack(this.lastTop);
720 if (this.heap.peek() == null
721 || store.getComparator().compareRows(this.lastTop, this.heap.peek()) != 0) {
722 LOG.debug("Storescanner.peek() is changed where before = "
723 + this.lastTop.toString() + ",and after = " + this.heap.peek());
724 this.lastTop = null;
725 return true;
726 }
727 this.lastTop = null;
728 }
729
730 return false;
731 }
732
733 protected void resetScannerStack(Cell lastTopKey) throws IOException {
734 if (heap != null) {
735 throw new RuntimeException("StoreScanner.reseek run on an existing heap!");
736 }
737
738
739
740
741 List<KeyValueScanner> scanners = getScannersNoCompaction();
742
743
744 seekScanners(scanners, lastTopKey, false, parallelSeekEnabled);
745
746
747 resetKVHeap(scanners, store.getComparator());
748
749
750
751
752 Cell kv = heap.peek();
753 if (kv == null) {
754 kv = lastTopKey;
755 }
756 byte[] row = kv.getRowArray();
757 int offset = kv.getRowOffset();
758 short length = kv.getRowLength();
759 if ((matcher.row == null) || !Bytes.equals(row, offset, length, matcher.row,
760 matcher.rowOffset, matcher.rowLength)) {
761 this.countPerRow = 0;
762 matcher.reset();
763 matcher.setRow(row, offset, length);
764 }
765 }
766
767
768
769
770
771
772
773
774 protected void checkScanOrder(Cell prevKV, Cell kv,
775 KeyValue.KVComparator comparator) throws IOException {
776
777 assert prevKV == null || comparator == null
778 || comparator.compare(prevKV, kv) <= 0 : "Key " + prevKV
779 + " followed by a " + "smaller key " + kv + " in cf " + store;
780 }
781
782 protected boolean seekToNextRow(Cell kv) throws IOException {
783 return reseek(KeyValueUtil.createLastOnRow(kv));
784 }
785
786
787
788
789
790
791
792 protected boolean seekAsDirection(Cell kv)
793 throws IOException {
794 return reseek(kv);
795 }
796
797 @Override
798 public boolean reseek(Cell kv) throws IOException {
799 lock.lock();
800 try {
801
802
803
804 checkReseek();
805 if (explicitColumnQuery && lazySeekEnabledGlobally) {
806 return heap.requestSeek(kv, true, useRowColBloom);
807 }
808 return heap.reseek(kv);
809 } finally {
810 lock.unlock();
811 }
812 }
813
814 @Override
815 public long getSequenceID() {
816 return 0;
817 }
818
819
820
821
822
823
824
825 private void parallelSeek(final List<? extends KeyValueScanner>
826 scanners, final Cell kv) throws IOException {
827 if (scanners.isEmpty()) return;
828 int storeFileScannerCount = scanners.size();
829 CountDownLatch latch = new CountDownLatch(storeFileScannerCount);
830 List<ParallelSeekHandler> handlers =
831 new ArrayList<ParallelSeekHandler>(storeFileScannerCount);
832 for (KeyValueScanner scanner : scanners) {
833 if (scanner instanceof StoreFileScanner) {
834 ParallelSeekHandler seekHandler = new ParallelSeekHandler(scanner, kv,
835 this.readPt, latch);
836 executor.submit(seekHandler);
837 handlers.add(seekHandler);
838 } else {
839 scanner.seek(kv);
840 latch.countDown();
841 }
842 }
843
844 try {
845 latch.await();
846 } catch (InterruptedException ie) {
847 throw (InterruptedIOException)new InterruptedIOException().initCause(ie);
848 }
849
850 for (ParallelSeekHandler handler : handlers) {
851 if (handler.getErr() != null) {
852 throw new IOException(handler.getErr());
853 }
854 }
855 }
856
857
858
859
860
861 List<KeyValueScanner> getAllScannersForTesting() {
862 List<KeyValueScanner> allScanners = new ArrayList<KeyValueScanner>();
863 KeyValueScanner current = heap.getCurrentForTesting();
864 if (current != null)
865 allScanners.add(current);
866 for (KeyValueScanner scanner : heap.getHeap())
867 allScanners.add(scanner);
868 return allScanners;
869 }
870
871 static void enableLazySeekGlobally(boolean enable) {
872 lazySeekEnabledGlobally = enable;
873 }
874
875
876
877
878 public long getEstimatedNumberOfKvsScanned() {
879 return this.kvsScanned;
880 }
881
882 @Override
883 public Cell getNextIndexedKey() {
884 return this.heap.getNextIndexedKey();
885 }
886 }
887