1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.io.hfile;
19
20 import java.io.DataInput;
21 import java.io.IOException;
22 import java.nio.ByteBuffer;
23 import java.util.ArrayList;
24 import java.util.List;
25
26 import org.apache.commons.logging.Log;
27 import org.apache.commons.logging.LogFactory;
28 import org.apache.hadoop.conf.Configuration;
29 import org.apache.hadoop.fs.Path;
30 import org.apache.hadoop.hbase.Cell;
31 import org.apache.hadoop.hbase.CellUtil;
32 import org.apache.hadoop.hbase.HConstants;
33 import org.apache.hadoop.hbase.KeyValue;
34 import org.apache.hadoop.hbase.KeyValue.KVComparator;
35 import org.apache.hadoop.hbase.NoTagsKeyValue;
36 import org.apache.hadoop.hbase.classification.InterfaceAudience;
37 import org.apache.hadoop.hbase.fs.HFileSystem;
38 import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
39 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoder;
40 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
41 import org.apache.hadoop.hbase.io.encoding.HFileBlockDecodingContext;
42 import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo;
43 import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
44 import org.apache.hadoop.hbase.util.Bytes;
45 import org.apache.hadoop.hbase.util.IdLock;
46 import org.apache.hadoop.io.WritableUtils;
47 import org.apache.htrace.Trace;
48 import org.apache.htrace.TraceScope;
49
50 import com.google.common.annotations.VisibleForTesting;
51
52
53
54
55 @InterfaceAudience.Private
56 public class HFileReaderV2 extends AbstractHFileReader {
57
private static final Log LOG = LogFactory.getLog(HFileReaderV2.class);

/**
 * Minor versions at or above this value use HBase-level block checksums (see
 * createHFileContext, which enables HBase checksums for such files).
 */
public static final int MINOR_VERSION_WITH_CHECKSUM = 1;

/** Minor version below {@link #MINOR_VERSION_WITH_CHECKSUM}: no HBase-level block checksums. */
public static final int MINOR_VERSION_NO_CHECKSUM = 0;

/**
 * NOTE(review): not referenced in this part of the file; by its name this is the minor
 * version from which the trailer is protobuf-serialized — confirm against FixedFileTrailer.
 */
public static final int PBUF_TRAILER_MINOR_VERSION = 2;

/**
 * Number of bytes taken by the two int length fields (key length, then value length) that
 * precede every cell in a non-encoded data block.
 */
public final static int KEY_VALUE_LEN_SIZE = 2 * Bytes.SIZEOF_INT;

/**
 * Whether each cell is followed by a variable-length memstore timestamp (mvcc). Set in the
 * constructor from the file-info key/value format version.
 */
private boolean includesMemstoreTS = false;
/**
 * Whether stored memstore timestamps must actually be decoded. Only set to true when the file
 * includes them and its recorded maximum memstore timestamp is greater than zero.
 */
protected boolean decodeMemstoreTS = false;

/** @return true if cells in this file carry a serialized memstore timestamp */
protected boolean shouldIncludeMemstoreTS() {
  return includesMemstoreTS;
}
80
81
/** Filesystem-level block reader used for reads that do not come from the block cache. */
private HFileBlock.FSReader fsBlockReader;

/**
 * Per-offset lock used by readBlock on a cache miss (when the cache configuration asks for
 * locking). The cache lookup is retried while holding the entry for that offset, and the entry
 * is released in readBlock's finally block. Presumably this keeps concurrent readers of the same
 * block from all loading it from the filesystem — confirm against IdLock semantics.
 */
private IdLock offsetLock = new IdLock();

/**
 * Blocks of the load-on-open section that follow the root indexes and file info. They are
 * collected by the constructor and retained here.
 */
private List<HFileBlock> loadOnOpenBlocks = new ArrayList<HFileBlock>();

/** Lowest supported minor version (presumably checked by validateMinorVersion — confirm). */
static final int MIN_MINOR_VERSION = 0;

/** Highest supported minor version (presumably checked by validateMinorVersion — confirm). */
static final int MAX_MINOR_VERSION = 3;

/**
 * Minor version from which ScannerV2.blockSeek returns HConstants.INDEX_KEY_MAGIC when the
 * sought key sorts before the first cell of a block. By the name, index keys in such files may
 * be "faked" (not an actual cell key) — TODO confirm.
 */
static final int MINOR_VERSION_WITH_FAKED_KEY = 3;

/**
 * File-level context (mvcc flag, compression, checksum usage) created in the constructor. It is
 * shared with the filesystem block reader and the scanners.
 */
HFileContext hfileContext;
110
111
112
113
114
115
116
117
118
119
120
121
122
/**
 * Opens a version 2 HFile reader. Steps:
 * <ul>
 * <li>check the trailer's major/minor version;</li>
 * <li>build the file context and the filesystem block reader;</li>
 * <li>read the load-on-open section in order: data index root, meta index root, file info, then
 * all remaining blocks;</li>
 * <li>optionally schedule a background prefetch of the data section.</li>
 * </ul>
 *
 * @param path path of the HFile
 * @param trailer the file's fixed trailer
 * @param fsdis input stream wrapper for the file
 * @param size file length in bytes (passed to the superclass; the fileSize field is used below)
 * @param cacheConf block cache configuration
 * @param hfs filesystem the file lives on
 * @param conf configuration
 * @throws IOException if the trailer version is unsupported or the load-on-open section cannot
 *     be read
 */
public HFileReaderV2(final Path path, final FixedFileTrailer trailer,
    final FSDataInputStreamWrapper fsdis, final long size, final CacheConfig cacheConf,
    final HFileSystem hfs, final Configuration conf) throws IOException {
  super(path, trailer, size, cacheConf, hfs, conf);
  this.conf = conf;
  // Reject files whose trailer version this reader cannot handle.
  trailer.expectMajorVersion(getMajorVersion());
  validateMinorVersion(path, trailer.getMinorVersion());
  // NOTE: includesMemstoreTS still has its default value here; it is read from file info below.
  this.hfileContext = createHFileContext(fsdis, fileSize, hfs, path, trailer);
  HFileBlock.FSReaderImpl fsBlockReaderV2 =
      new HFileBlock.FSReaderImpl(fsdis, fileSize, hfs, path, hfileContext);
  this.fsBlockReader = fsBlockReaderV2;

  // Data index keys use the comparator recorded in the trailer. The meta index is looked up by
  // meta block name (see getMetaBlock) and uses the raw comparator with a single level.
  comparator = trailer.createComparator();
  dataBlockIndexReader = new HFileBlockIndex.BlockIndexReader(comparator,
      trailer.getNumDataIndexLevels(), this);
  metaBlockIndexReader = new HFileBlockIndex.BlockIndexReader(
      KeyValue.RAW_COMPARATOR, 1);

  // Iterate over the load-on-open section: from its start offset up to the trailer.
  HFileBlock.BlockIterator blockIter = fsBlockReaderV2.blockRange(
      trailer.getLoadOnOpenDataOffset(),
      fileSize - trailer.getTrailerSize());

  // The order below matters: data index root first, then meta index root, then file info.
  dataBlockIndexReader.readMultiLevelIndexRoot(
      blockIter.nextBlockWithBlockType(BlockType.ROOT_INDEX),
      trailer.getDataIndexCount());

  metaBlockIndexReader.readRootIndex(
      blockIter.nextBlockWithBlockType(BlockType.ROOT_INDEX),
      trailer.getMetaIndexCount());

  fileInfo = new FileInfo();
  fileInfo.read(blockIter.nextBlockWithBlockType(BlockType.FILE_INFO).getByteStream());
  // Files without a creation timestamp get 0.
  byte[] creationTimeBytes = fileInfo.get(FileInfo.CREATE_TIME_TS);
  this.hfileContext.setFileCreateTime(creationTimeBytes == null? 0:
      Bytes.toLong(creationTimeBytes));
  lastKey = fileInfo.get(FileInfo.LASTKEY);
  avgKeyLen = Bytes.toInt(fileInfo.get(FileInfo.AVG_KEY_LEN));
  avgValueLen = Bytes.toInt(fileInfo.get(FileInfo.AVG_VALUE_LEN));
  // Cells carry a memstore timestamp only when the writer used the "with memstore" format.
  byte [] keyValueFormatVersion =
      fileInfo.get(HFileWriterV2.KEY_VALUE_VERSION);
  includesMemstoreTS = keyValueFormatVersion != null &&
      Bytes.toInt(keyValueFormatVersion) ==
          HFileWriterV2.KEY_VALUE_VER_WITH_MEMSTORE;
  fsBlockReaderV2.setIncludesMemstoreTS(includesMemstoreTS);
  if (includesMemstoreTS) {
    // If the max stored timestamp is 0, every stored timestamp is 0 and decoding can be skipped.
    decodeMemstoreTS = Bytes.toLong(fileInfo.get(HFileWriterV2.MAX_MEMSTORE_TS_KEY)) > 0;
  }

  // Data block encoding is recorded in file info.
  dataBlockEncoder = HFileDataBlockEncoderImpl.createFromFileInfo(fileInfo);
  fsBlockReaderV2.setDataBlockEncoder(dataBlockEncoder);

  // Keep whatever load-on-open blocks remain after file info.
  HFileBlock b;
  while ((b = blockIter.nextBlock()) != null) {
    loadOnOpenBlocks.add(b);
  }

  // Optionally read (and thereby cache) every block before the load-on-open section in the
  // background.
  if (cacheConf.shouldPrefetchOnOpen()) {
    PrefetchExecutor.request(path, new Runnable() {
      public void run() {
        long offset = 0;
        long end = 0;
        try {
          end = getTrailer().getLoadOnOpenDataOffset();
          HFileBlock prevBlock = null;
          if (LOG.isTraceEnabled()) {
            LOG.trace("File=" + path.toString() + ", offset=" + offset + ", end=" + end);
          }
          while (offset < end) {
            // Stop early if cancelled. Note: Thread.interrupted() also clears the interrupt flag.
            if (Thread.interrupted()) {
              break;
            }
            // Pass the size hint recorded by the previous block; -1 (unknown) for the first one.
            long onDiskSize = -1;
            if (prevBlock != null) {
              onDiskSize = prevBlock.getNextBlockOnDiskSizeWithHeader();
            }
            // cacheBlock=true, pread=false, isCompaction=false, updateCacheMetrics=false.
            HFileBlock block = readBlock(offset, onDiskSize, true, false, false, false,
                null, null);
            prevBlock = block;
            offset += block.getOnDiskSizeWithHeader();
          }
        } catch (IOException e) {
          // Prefetch is best-effort: I/O failures are only logged at TRACE.
          if (LOG.isTraceEnabled()) {
            LOG.trace("File=" + path.toString() + ", offset=" + offset + ", end=" + end, e);
          }
        } catch (Exception e) {
          // Anything else is unexpected and is logged at WARN.
          LOG.warn("File=" + path.toString() + ", offset=" + offset + ", end=" + end, e);
        } finally {
          PrefetchExecutor.complete(path);
        }
      }
    });
  }
}
228
229 protected HFileContext createHFileContext(FSDataInputStreamWrapper fsdis, long fileSize,
230 HFileSystem hfs, Path path, FixedFileTrailer trailer) throws IOException {
231 return new HFileContextBuilder()
232 .withIncludesMvcc(this.includesMemstoreTS)
233 .withCompression(this.compressAlgo)
234 .withHBaseCheckSum(trailer.getMinorVersion() >= MINOR_VERSION_WITH_CHECKSUM)
235 .build();
236 }
237
238
239
240
241
242
243
244
245
246
247
248
249
250 @Override
251 public HFileScanner getScanner(boolean cacheBlocks, final boolean pread,
252 final boolean isCompaction) {
253 if (dataBlockEncoder.useEncodedScanner()) {
254 return new EncodedScannerV2(this, cacheBlocks, pread, isCompaction,
255 hfileContext);
256 }
257
258 return new ScannerV2(this, cacheBlocks, pread, isCompaction);
259 }
260
261
262
263
264
265 private HFileBlock getCachedBlock(BlockCacheKey cacheKey, boolean cacheBlock, boolean useLock,
266 boolean isCompaction, boolean updateCacheMetrics, BlockType expectedBlockType,
267 DataBlockEncoding expectedDataBlockEncoding) throws IOException {
268
269 if (cacheConf.isBlockCacheEnabled()) {
270 BlockCache cache = cacheConf.getBlockCache();
271 HFileBlock cachedBlock = (HFileBlock) cache.getBlock(cacheKey, cacheBlock, useLock,
272 updateCacheMetrics);
273 if (cachedBlock != null) {
274 if (cacheConf.shouldCacheCompressed(cachedBlock.getBlockType().getCategory())) {
275 cachedBlock = cachedBlock.unpack(hfileContext, fsBlockReader);
276 }
277 validateBlockType(cachedBlock, expectedBlockType);
278
279 if (expectedDataBlockEncoding == null) {
280 return cachedBlock;
281 }
282 DataBlockEncoding actualDataBlockEncoding =
283 cachedBlock.getDataBlockEncoding();
284
285
286
287 if (cachedBlock.getBlockType().isData() &&
288 !actualDataBlockEncoding.equals(expectedDataBlockEncoding)) {
289
290
291
292
293
294
295
296
297
298
299 if (!expectedDataBlockEncoding.equals(DataBlockEncoding.NONE) &&
300 !actualDataBlockEncoding.equals(DataBlockEncoding.NONE)) {
301
302
303
304
305
306
307 LOG.info("Evicting cached block with key " + cacheKey +
308 " because of a data block encoding mismatch" +
309 "; expected: " + expectedDataBlockEncoding +
310 ", actual: " + actualDataBlockEncoding);
311 cache.evictBlock(cacheKey);
312 }
313 return null;
314 }
315 return cachedBlock;
316 }
317 }
318 return null;
319 }
320
321
322
323
324
325
326 @Override
327 public ByteBuffer getMetaBlock(String metaBlockName, boolean cacheBlock)
328 throws IOException {
329 if (trailer.getMetaIndexCount() == 0) {
330 return null;
331 }
332 if (metaBlockIndexReader == null) {
333 throw new IOException("Meta index not loaded");
334 }
335
336 byte[] mbname = Bytes.toBytes(metaBlockName);
337 int block = metaBlockIndexReader.rootBlockContainingKey(mbname,
338 0, mbname.length);
339 if (block == -1)
340 return null;
341 long blockSize = metaBlockIndexReader.getRootBlockDataSize(block);
342
343
344
345
346 synchronized (metaBlockIndexReader.getRootBlockKey(block)) {
347
348 long metaBlockOffset = metaBlockIndexReader.getRootBlockOffset(block);
349 BlockCacheKey cacheKey = new BlockCacheKey(name, metaBlockOffset,
350 this.isPrimaryReplicaReader());
351
352 cacheBlock &= cacheConf.shouldCacheDataOnRead();
353 if (cacheConf.isBlockCacheEnabled()) {
354 HFileBlock cachedBlock = getCachedBlock(cacheKey, cacheBlock, false, true, true,
355 BlockType.META, null);
356 if (cachedBlock != null) {
357 assert cachedBlock.isUnpacked() : "Packed block leak.";
358
359
360 return cachedBlock.getBufferWithoutHeader();
361 }
362
363 }
364
365 HFileBlock metaBlock = fsBlockReader.readBlockData(metaBlockOffset,
366 blockSize, -1, true).unpack(hfileContext, fsBlockReader);
367
368
369 if (cacheBlock) {
370 cacheConf.getBlockCache().cacheBlock(cacheKey, metaBlock,
371 cacheConf.isInMemory(), this.cacheConf.isCacheDataInL1());
372 }
373
374 return metaBlock.getBufferWithoutHeader();
375 }
376 }
377
/**
 * Reads the block at {@code dataBlockOffset}. The block cache is tried first when the cache
 * configuration allows it for {@code expectedBlockType}; otherwise the block is read from the
 * filesystem.
 *
 * <p>On a first cache miss with caching requested, if the cache configuration asks for it, the
 * lookup is retried while holding the per-offset lock in {@code offsetLock}. That lock stays held
 * through the filesystem read and cache insert and is released in the finally block. Presumably
 * this stops concurrent readers of the same block from all loading it (confirm against IdLock).
 *
 * @param dataBlockOffset offset of the block; must be in [0, load-on-open offset)
 * @param onDiskBlockSize on-disk size including header, or -1 if unknown
 * @param cacheBlock whether to add the block to the block cache after a filesystem read
 * @param pread passed through to the filesystem block reader
 * @param isCompaction passed through to the cache lookup
 * @param updateCacheMetrics whether to update cache metrics and the data block read counter
 * @param expectedBlockType expected block type, or null to skip the check
 * @param expectedDataBlockEncoding expected data block encoding, or null to skip that check in
 *     the cache lookup
 * @return the unpacked block
 * @throws IOException if the block index is not loaded, the offset is out of range, the block
 *     type is unexpected, a cached data block has an encoding different from this reader's, or
 *     the read fails
 */
@Override
public HFileBlock readBlock(long dataBlockOffset, long onDiskBlockSize,
    final boolean cacheBlock, boolean pread, final boolean isCompaction,
    boolean updateCacheMetrics, BlockType expectedBlockType,
    DataBlockEncoding expectedDataBlockEncoding)
    throws IOException {
  if (dataBlockIndexReader == null) {
    throw new IOException("Block index not loaded");
  }
  // Blocks at or beyond the load-on-open offset are not readable through this path.
  long trailerOffset = trailer.getLoadOnOpenDataOffset();
  if (dataBlockOffset < 0 || dataBlockOffset >= trailerOffset) {
    throw new IOException("Requested block is out of range: " + dataBlockOffset +
        ", lastDataBlockOffset: " + trailer.getLastDataBlockOffset() +
        ", trailer.getLoadOnOpenDataOffset: " + trailerOffset);
  }

  BlockCacheKey cacheKey = new BlockCacheKey(name, dataBlockOffset,this.isPrimaryReplicaReader());
  boolean useLock = false;
  IdLock.Entry lockEntry = null;
  TraceScope traceScope = Trace.startSpan("HFileReaderV2.readBlock");
  try {
    // Runs at most twice: an unlocked cache check, then optionally a locked one.
    while (true) {
      if (cacheConf.shouldReadBlockFromCache(expectedBlockType)) {
        if (useLock) {
          lockEntry = offsetLock.getLockEntry(dataBlockOffset);
        }

        HFileBlock cachedBlock = getCachedBlock(cacheKey, cacheBlock, useLock, isCompaction,
            updateCacheMetrics, expectedBlockType, expectedDataBlockEncoding);
        if (cachedBlock != null) {
          if (Trace.isTracing()) {
            traceScope.getSpan().addTimelineAnnotation("blockCacheHit");
          }
          assert cachedBlock.isUnpacked() : "Packed block leak.";
          if (cachedBlock.getBlockType().isData()) {
            if (updateCacheMetrics) {
              HFile.dataBlockReadCnt.incrementAndGet();
            }

            // A cached data block must use the encoding this reader is configured with.
            if (cachedBlock.getDataBlockEncoding() != dataBlockEncoder.getDataBlockEncoding()) {
              throw new IOException("Cached block under key " + cacheKey + " "
                  + "has wrong encoding: " + cachedBlock.getDataBlockEncoding() + " (expected: "
                  + dataBlockEncoder.getDataBlockEncoding() + ")");
            }
          }

          return cachedBlock;
        }
        // First miss: retry the lookup under the per-offset lock if configured to do so.
        if (!useLock && cacheBlock && cacheConf.shouldLockOnCacheMiss(expectedBlockType)) {
          useLock = true;
          continue;
        }
      }

      if (Trace.isTracing()) {
        traceScope.getSpan().addTimelineAnnotation("blockCacheMiss");
      }

      // Cache miss: read from the filesystem; the type is validated before unpacking.
      HFileBlock hfileBlock = fsBlockReader.readBlockData(dataBlockOffset, onDiskBlockSize, -1,
          pread);
      validateBlockType(hfileBlock, expectedBlockType);
      HFileBlock unpacked = hfileBlock.unpack(hfileContext, fsBlockReader);
      BlockType.BlockCategory category = hfileBlock.getBlockType().getCategory();

      // The cache receives either the packed or the unpacked form, per config; callers always
      // get the unpacked block.
      if (cacheBlock && cacheConf.shouldCacheBlockOnRead(category)) {
        cacheConf.getBlockCache().cacheBlock(cacheKey,
            cacheConf.shouldCacheCompressed(category) ? hfileBlock : unpacked,
            cacheConf.isInMemory(), this.cacheConf.isCacheDataInL1());
      }

      if (updateCacheMetrics && hfileBlock.getBlockType().isData()) {
        HFile.dataBlockReadCnt.incrementAndGet();
      }

      return unpacked;
    }
  } finally {
    traceScope.close();
    if (lockEntry != null) {
      offsetLock.releaseLockEntry(lockEntry);
    }
  }
}
471
472 @Override
473 public boolean hasMVCCInfo() {
474 return includesMemstoreTS && decodeMemstoreTS;
475 }
476
477
478
479
480
481
482
483
484
485
486 private void validateBlockType(HFileBlock block,
487 BlockType expectedBlockType) throws IOException {
488 if (expectedBlockType == null) {
489 return;
490 }
491 BlockType actualBlockType = block.getBlockType();
492 if (expectedBlockType.isData() && actualBlockType.isData()) {
493
494
495 return;
496 }
497 if (actualBlockType != expectedBlockType) {
498 throw new IOException("Expected block type " + expectedBlockType + ", " +
499 "but got " + actualBlockType + ": " + block);
500 }
501 }
502
503
504
505
506
507
508 @Override
509 public byte[] getLastKey() {
510 return dataBlockIndexReader.isEmpty() ? null : lastKey;
511 }
512
513
514
515
516
517
518 @Override
519 public byte[] midkey() throws IOException {
520 return dataBlockIndexReader.midkey();
521 }
522
523 @Override
524 public void close() throws IOException {
525 close(cacheConf.shouldEvictOnClose());
526 }
527
528 public void close(boolean evictOnClose) throws IOException {
529 PrefetchExecutor.cancel(path);
530 if (evictOnClose && cacheConf.isBlockCacheEnabled()) {
531 int numEvicted = cacheConf.getBlockCache().evictBlocksByHfileName(name);
532 if (LOG.isTraceEnabled()) {
533 LOG.trace("On close, file=" + name + " evicted=" + numEvicted
534 + " block(s)");
535 }
536 }
537 fsBlockReader.closeStreams();
538 }
539
540 public DataBlockEncoding getEffectiveEncodingInCache(boolean isCompaction) {
541 return dataBlockEncoder.getEffectiveEncodingInCache(isCompaction);
542 }
543
544
545 @Override
546 HFileBlock.FSReader getUncachedBlockReader() {
547 return fsBlockReader;
548 }
549
550
551 protected abstract static class AbstractScannerV2
552 extends AbstractHFileReader.Scanner {
553 protected HFileBlock block;
554
555 @Override
556 public Cell getNextIndexedKey() {
557 return nextIndexedKey;
558 }
559
560
561
562
563
564
565
566 protected Cell nextIndexedKey;
567
568 public AbstractScannerV2(HFileReaderV2 r, boolean cacheBlocks,
569 final boolean pread, final boolean isCompaction) {
570 super(r, cacheBlocks, pread, isCompaction);
571 }
572
573 protected abstract ByteBuffer getFirstKeyInBlock(HFileBlock curBlock);
574
575 protected abstract int loadBlockAndSeekToKey(HFileBlock seekToBlock, Cell nextIndexedKey,
576 boolean rewind, Cell key, boolean seekBefore) throws IOException;
577
578 @Override
579 public int seekTo(byte[] key, int offset, int length) throws IOException {
580
581
582 return seekTo(new KeyValue.KeyOnlyKeyValue(key, offset, length));
583 }
584
585 @Override
586 public int reseekTo(byte[] key, int offset, int length) throws IOException {
587 return reseekTo(new KeyValue.KeyOnlyKeyValue(key, offset, length));
588 }
589
590 @Override
591 public int seekTo(Cell key) throws IOException {
592 return seekTo(key, true);
593 }
594
595 @Override
596 public int reseekTo(Cell key) throws IOException {
597 int compared;
598 if (isSeeked()) {
599 compared = compareKey(reader.getComparator(), key);
600 if (compared < 1) {
601
602
603 return compared;
604 } else {
605
606 if (this.nextIndexedKey != null &&
607 (this.nextIndexedKey == KeyValueScanner.NO_NEXT_INDEXED_KEY || reader
608 .getComparator()
609 .compareOnlyKeyPortion(key, nextIndexedKey) < 0)) {
610
611
612
613
614
615
616 return loadBlockAndSeekToKey(this.block, nextIndexedKey, false, key, false);
617 }
618 }
619 }
620
621
622 return seekTo(key, false);
623 }
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640 public int seekTo(Cell key, boolean rewind) throws IOException {
641 HFileBlockIndex.BlockIndexReader indexReader = reader.getDataBlockIndexReader();
642 BlockWithScanInfo blockWithScanInfo = indexReader.loadDataBlockWithScanInfo(key, block,
643 cacheBlocks, pread, isCompaction, getEffectiveDataBlockEncoding());
644 if (blockWithScanInfo == null || blockWithScanInfo.getHFileBlock() == null) {
645
646 return -1;
647 }
648 return loadBlockAndSeekToKey(blockWithScanInfo.getHFileBlock(),
649 blockWithScanInfo.getNextIndexedKey(), rewind, key, false);
650 }
651
652 @Override
653 public boolean seekBefore(byte[] key, int offset, int length) throws IOException {
654 return seekBefore(new KeyValue.KeyOnlyKeyValue(key, offset, length));
655 }
656
657 @Override
658 public boolean seekBefore(Cell key) throws IOException {
659 HFileBlock seekToBlock = reader.getDataBlockIndexReader().seekToDataBlock(key, block,
660 cacheBlocks, pread, isCompaction,
661 ((HFileReaderV2) reader).getEffectiveEncodingInCache(isCompaction));
662 if (seekToBlock == null) {
663 return false;
664 }
665 ByteBuffer firstKey = getFirstKeyInBlock(seekToBlock);
666
667 if (reader.getComparator()
668 .compareOnlyKeyPortion(
669 new KeyValue.KeyOnlyKeyValue(firstKey.array(), firstKey.arrayOffset(),
670 firstKey.limit()), key) >= 0) {
671 long previousBlockOffset = seekToBlock.getPrevBlockOffset();
672
673 if (previousBlockOffset == -1) {
674
675 return false;
676 }
677
678
679
680
681
682
683 int prevBlockSize = -1;
684 seekToBlock = reader.readBlock(previousBlockOffset,
685 prevBlockSize, cacheBlocks,
686 pread, isCompaction, true, BlockType.DATA, getEffectiveDataBlockEncoding());
687
688
689 }
690 Cell firstKeyInCurrentBlock = new KeyValue.KeyOnlyKeyValue(Bytes.getBytes(firstKey));
691 loadBlockAndSeekToKey(seekToBlock, firstKeyInCurrentBlock, true, key, true);
692 return true;
693 }
694
695
696
697
698
699
700
701
702 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NP_NULL_ON_SOME_PATH",
703 justification="Yeah, unnecessary null check; could do w/ clean up")
704 protected HFileBlock readNextDataBlock() throws IOException {
705 long lastDataBlockOffset = reader.getTrailer().getLastDataBlockOffset();
706 if (block == null)
707 return null;
708
709 HFileBlock curBlock = block;
710
711 do {
712 if (curBlock.getOffset() >= lastDataBlockOffset) {
713 return null;
714 }
715
716 if (curBlock.getOffset() < 0) {
717 throw new IOException("Invalid block file offset: " + block);
718 }
719
720
721
722 curBlock = reader.readBlock(curBlock.getOffset()
723 + curBlock.getOnDiskSizeWithHeader(),
724 curBlock.getNextBlockOnDiskSizeWithHeader(), cacheBlocks, pread,
725 isCompaction, true, null, getEffectiveDataBlockEncoding());
726 } while (!curBlock.getBlockType().isData());
727
728 return curBlock;
729 }
730
731 public DataBlockEncoding getEffectiveDataBlockEncoding() {
732 return ((HFileReaderV2)reader).getEffectiveEncodingInCache(isCompaction);
733 }
734
735
736
737
738
739
740
741
742 public abstract int compareKey(KVComparator comparator, byte[] key, int offset,
743 int length);
744
745 public abstract int compareKey(KVComparator comparator, Cell kv);
746 }
747
748
749
750
/**
 * Scanner over non-encoded data blocks. Each cell in a block is laid out as:
 * <ul>
 * <li>key length (int);</li>
 * <li>value length (int);</li>
 * <li>key bytes;</li>
 * <li>value bytes;</li>
 * <li>when the file includes memstore timestamps, a variable-length memstore timestamp.</li>
 * </ul>
 * The scanner's position is the position of {@code blockBuffer}, which points at the start of
 * the current cell.
 */
protected static class ScannerV2 extends AbstractScannerV2 {
  private HFileReaderV2 reader;

  public ScannerV2(HFileReaderV2 r, boolean cacheBlocks,
      final boolean pread, final boolean isCompaction) {
    super(r, cacheBlocks, pread, isCompaction);
    this.reader = r;
  }

  /** @return the current cell (without tags), or null if the scanner is not seeked */
  @Override
  public Cell getKeyValue() {
    if (!isSeeked())
      return null;

    return formNoTagsKeyValue();
  }

  /**
   * Builds a NoTagsKeyValue over the current cell's bytes in the block's backing array. The
   * memstore timestamp is excluded from the length. When the file includes timestamps, the
   * current one becomes the cell's sequence id.
   */
  protected Cell formNoTagsKeyValue() {
    NoTagsKeyValue ret = new NoTagsKeyValue(blockBuffer.array(), blockBuffer.arrayOffset()
        + blockBuffer.position(), getCellBufSize());
    if (this.reader.shouldIncludeMemstoreTS()) {
      ret.setSequenceId(currMemstoreTS);
    }
    return ret;
  }

  /** @return length-fields + key + value size of the current cell (no memstore timestamp) */
  protected int getCellBufSize() {
    return KEY_VALUE_LEN_SIZE + currKeyLen + currValueLen;
  }

  /** @return a slice sharing the block's backing array that covers the current key */
  @Override
  public ByteBuffer getKey() {
    assertSeeked();
    return ByteBuffer.wrap(
        blockBuffer.array(),
        blockBuffer.arrayOffset() + blockBuffer.position()
            + KEY_VALUE_LEN_SIZE, currKeyLen).slice();
  }

  /** Compares the given flat key with the current key (given key on the left). */
  @Override
  public int compareKey(KVComparator comparator, byte[] key, int offset, int length) {
    return comparator.compareFlatKey(key, offset, length, blockBuffer.array(),
        blockBuffer.arrayOffset() + blockBuffer.position() + KEY_VALUE_LEN_SIZE, currKeyLen);
  }

  /** @return a slice sharing the block's backing array that covers the current value */
  @Override
  public ByteBuffer getValue() {
    assertSeeked();
    return ByteBuffer.wrap(
        blockBuffer.array(),
        blockBuffer.arrayOffset() + blockBuffer.position()
            + KEY_VALUE_LEN_SIZE + currKeyLen, currValueLen).slice();
  }

  /** Drops the current block and clears the per-cell state, leaving the scanner unseeked. */
  protected void setNonSeekedState() {
    block = null;
    blockBuffer = null;
    currKeyLen = 0;
    currValueLen = 0;
    currMemstoreTS = 0;
    currMemstoreTSLen = 0;
  }

  /**
   * Moves the buffer position past the current cell. If the computed position is invalid
   * (Buffer.position throws IllegalArgumentException), logs diagnostic state and rethrows.
   */
  private void positionThisBlockBuffer() {
    try {
      blockBuffer.position(getNextCellStartPosition());
    } catch (IllegalArgumentException e) {
      LOG.error("Current pos = " + blockBuffer.position()
          + "; currKeyLen = " + currKeyLen + "; currValLen = "
          + currValueLen + "; block limit = " + blockBuffer.limit()
          + "; HFile name = " + reader.getName()
          + "; currBlock currBlockOffset = " + block.getOffset());
      throw e;
    }
  }

  /**
   * Moves to the next data block when the current one is exhausted.
   *
   * @return false (and un-seeks) if the current block is the last data block
   */
  private boolean positionForNextBlock() throws IOException {
    long lastDataBlockOffset = reader.getTrailer().getLastDataBlockOffset();
    if (block.getOffset() >= lastDataBlockOffset) {
      setNonSeekedState();
      return false;
    }
    return isNextBlock();
  }

  /**
   * Reads the next data block and makes it current.
   *
   * @return false (and un-seeks) if there is no next data block
   */
  private boolean isNextBlock() throws IOException {
    HFileBlock nextBlock = readNextDataBlock();
    if (nextBlock == null) {
      setNonSeekedState();
      return false;
    }
    updateCurrBlock(nextBlock);
    return true;
  }

  /**
   * Completes an advance once the buffer is positioned past the previous cell. It either loads
   * the next block (buffer exhausted) or reads the lengths of the cell now at the position.
   */
  private final boolean _next() throws IOException {
    if (blockBuffer.remaining() <= 0) {
      return positionForNextBlock();
    }
    readKeyValueLen();
    return true;
  }

  /**
   * Advances to the next cell, crossing into the next data block if needed.
   *
   * @return false once the scanner runs off the last data block
   */
  @Override
  public boolean next() throws IOException {
    assertSeeked();
    positionThisBlockBuffer();
    return _next();
  }

  /** @return buffer position right after the current cell, including its memstore timestamp */
  protected int getNextCellStartPosition() {
    return blockBuffer.position() + KEY_VALUE_LEN_SIZE + currKeyLen + currValueLen
        + currMemstoreTSLen;
  }

  /**
   * Positions the scanner at the first cell of the file. If the first data block is already
   * current, it is simply rewound.
   *
   * @return false if there is no reader or the file has no entries
   */
  @Override
  public boolean seekTo() throws IOException {
    if (reader == null) {
      return false;
    }

    if (reader.getTrailer().getEntryCount() == 0) {
      return false;
    }

    long firstDataBlockOffset =
        reader.getTrailer().getFirstDataBlockOffset();
    if (block != null && block.getOffset() == firstDataBlockOffset) {
      blockBuffer.rewind();
      readKeyValueLen();
      return true;
    }

    block = reader.readBlock(firstDataBlockOffset, -1, cacheBlocks, pread,
        isCompaction, true, BlockType.DATA, getEffectiveDataBlockEncoding());
    if (block.getOffset() < 0) {
      throw new IOException("Invalid block offset: " + block.getOffset());
    }
    updateCurrBlock(block);
    return true;
  }

  /**
   * Switches to {@code seekToBlock} if it is not the current block; otherwise rewinds when
   * {@code rewind} is set. Then records {@code nextIndexedKey} and runs {@link #blockSeek}.
   * nextIndexedKey is set after updateCurrBlock, because that method clears it.
   */
  @Override
  protected int loadBlockAndSeekToKey(HFileBlock seekToBlock, Cell nextIndexedKey,
      boolean rewind, Cell key, boolean seekBefore) throws IOException {
    if (block == null || block.getOffset() != seekToBlock.getOffset()) {
      updateCurrBlock(seekToBlock);
    } else if (rewind) {
      blockBuffer.rewind();
    }

    this.nextIndexedKey = nextIndexedKey;
    return blockSeek(key, seekBefore);
  }

  /**
   * Makes {@code newBlock} current: reads the lengths of its first cell, counts the fetch, and
   * clears {@link #nextIndexedKey}.
   *
   * @throws IllegalStateException if the block is not of type DATA
   */
  protected void updateCurrBlock(HFileBlock newBlock) {
    block = newBlock;

    if (block.getBlockType() != BlockType.DATA) {
      throw new IllegalStateException("ScannerV2 works only on data " +
          "blocks, got " + block.getBlockType() + "; " +
          "fileName=" + reader.name + ", " +
          "dataBlockEncoder=" + reader.dataBlockEncoder + ", " +
          "isCompaction=" + isCompaction);
    }

    blockBuffer = block.getBufferWithoutHeader();
    readKeyValueLen();
    blockFetches.incrementAndGet();

    this.nextIndexedKey = null;
  }

  /** @return true if {@code v} is not a plausible length (negative or beyond the block limit) */
  protected final boolean checkLen(final int v) {
    return v < 0 || v > this.blockBuffer.limit();
  }

  /** @throws IllegalStateException if the current key or value length is implausible */
  protected final void checkKeyValueLen() {
    if (checkLen(this.currKeyLen) || checkLen(this.currValueLen)) {
      throw new IllegalStateException("Invalid currKeyLen " + this.currKeyLen +
          " or currValueLen " + this.currValueLen + ". Block offset: " + block.getOffset() +
          ", block length: " + this.blockBuffer.limit() + ", position: " +
          this.blockBuffer.position() + " (without header).");
    }
  }

  /**
   * Reads the key and value lengths of the cell at the current position, validates them, and
   * then reads the memstore timestamp that follows the value. The position is not moved.
   */
  protected void readKeyValueLen() {
    // Absolute index into the backing array of the current cell.
    int p = blockBuffer.position() + blockBuffer.arrayOffset();
    // Both length ints are read at once as one long: key length in the upper 32 bits, value
    // length in the lower 32 bits. The int cast of the XOR keeps only the low word; this
    // assumes MASK_FOR_LOWER_INT_IN_LONG has its lower 32 bits clear — TODO confirm.
    long ll = Bytes.toLong(blockBuffer.array(), p);
    this.currKeyLen = (int)(ll >> Integer.SIZE);
    this.currValueLen = (int)(Bytes.MASK_FOR_LOWER_INT_IN_LONG ^ ll);
    checkKeyValueLen();
    // Skip the two length ints, the key and the value to reach the memstore timestamp.
    p += (Bytes.SIZEOF_LONG + currKeyLen + currValueLen);
    readMvccVersion(p);
  }

  /**
   * Sets the current memstore timestamp from the absolute array index {@code position}. This
   * is a no-op when the file has no timestamps. When they need no decoding, the value is 0 with
   * a 1-byte length.
   */
  protected void readMvccVersion(final int position) {
    if (!this.reader.shouldIncludeMemstoreTS()) return;
    if (!this.reader.decodeMemstoreTS) {
      currMemstoreTS = 0;
      currMemstoreTSLen = 1;
      return;
    }
    _readMvccVersion(position);
  }

  /**
   * Decodes a Hadoop WritableUtils-style variable-length long at the absolute array index
   * {@code position}. Sets currMemstoreTS and currMemstoreTSLen (encoded size in bytes).
   */
  private void _readMvccVersion(final int position) {
    byte firstByte = blockBuffer.array()[position];
    int len = WritableUtils.decodeVIntSize(firstByte);
    if (len == 1) {
      // Single-byte encoding: the byte itself is the value.
      this.currMemstoreTS = firstByte;
    } else {
      // Multi-byte encoding: len - 1 big-endian payload bytes follow the first byte.
      long i = 0;
      for (int idx = 0; idx < len - 1; idx++) {
        byte b = blockBuffer.array()[position + 1 + idx];
        i = i << 8;
        i = i | (b & 0xFF);
      }
      currMemstoreTS = (WritableUtils.isNegativeVInt(firstByte) ? ~i : i);
    }
    this.currMemstoreTSLen = len;
  }

  /**
   * Reads a memstore timestamp at the buffer's current position.
   * NOTE(review): this reads at the current position, not after the current cell's value —
   * confirm that callers position the buffer on the timestamp first.
   */
  protected void readMvccVersion() {
    readMvccVersion(blockBuffer.arrayOffset() + blockBuffer.position());
  }

  /**
   * Linearly scans the current block from the buffer's position for {@code key}.
   *
   * @param key the key to find
   * @param seekBefore if true, an exact match positions on the preceding cell instead
   * @return 0 if positioned exactly on {@code key};
   *     {@link HConstants#INDEX_KEY_MAGIC} if {@code key} sorts before the block's first cell
   *     (buffer position 0) and the file's minor version is at least
   *     {@link #MINOR_VERSION_WITH_FAKED_KEY};
   *     otherwise 1, with the scanner on the last cell before {@code key} (the first cell when
   *     {@code key} precedes it, the last cell when it sorts after every cell)
   * @throws IllegalStateException if seekBefore matches the first cell in the block
   */
  protected int blockSeek(Cell key, boolean seekBefore) {
    int klen, vlen;
    long memstoreTS = 0;
    int memstoreTSLen = 0;
    // Size of the previously visited cell; -1 while still on the first visited cell.
    int lastKeyValueSize = -1;
    KeyValue.KeyOnlyKeyValue keyOnlykv = new KeyValue.KeyOnlyKeyValue();
    do {
      // Peek the two lengths without moving the position.
      blockBuffer.mark();
      klen = blockBuffer.getInt();
      vlen = blockBuffer.getInt();
      blockBuffer.reset();
      if (this.reader.shouldIncludeMemstoreTS()) {
        if (this.reader.decodeMemstoreTS) {
          int memstoreTSOffset = blockBuffer.arrayOffset() + blockBuffer.position()
              + KEY_VALUE_LEN_SIZE + klen + vlen;
          memstoreTS = Bytes.readAsVLong(blockBuffer.array(), memstoreTSOffset);
          memstoreTSLen = WritableUtils.getVIntSize(memstoreTS);
        } else {
          memstoreTS = 0;
          memstoreTSLen = 1;
        }
      }

      int keyOffset = blockBuffer.arrayOffset() + blockBuffer.position() + KEY_VALUE_LEN_SIZE;
      keyOnlykv.setKey(blockBuffer.array(), keyOffset, klen);
      // comp < 0: sought key sorts before this cell; comp > 0: after it.
      int comp = reader.getComparator().compareOnlyKeyPortion(key, keyOnlykv);

      if (comp == 0) {
        if (seekBefore) {
          if (lastKeyValueSize < 0) {
            throw new IllegalStateException("blockSeek with seekBefore "
                + "at the first key of the block: key="
                + CellUtil.getCellKeyAsString(key)
                + ", blockOffset=" + block.getOffset() + ", onDiskSize="
                + block.getOnDiskSizeWithHeader());
          }
          // Step back to the previous cell.
          blockBuffer.position(blockBuffer.position() - lastKeyValueSize);
          readKeyValueLen();
          return 1;
        }
        currKeyLen = klen;
        currValueLen = vlen;
        if (this.reader.shouldIncludeMemstoreTS()) {
          currMemstoreTS = memstoreTS;
          currMemstoreTSLen = memstoreTSLen;
        }
        return 0;
      } else if (comp < 0) {
        // Overshot: back up to the previous cell if there was one.
        if (lastKeyValueSize > 0)
          blockBuffer.position(blockBuffer.position() - lastKeyValueSize);
        readKeyValueLen();
        if (lastKeyValueSize == -1 && blockBuffer.position() == 0
            && this.reader.trailer.getMinorVersion() >= MINOR_VERSION_WITH_FAKED_KEY) {
          return HConstants.INDEX_KEY_MAGIC;
        }
        return 1;
      }

      // Sought key is after this cell: advance to the next one.
      lastKeyValueSize = klen + vlen + memstoreTSLen + KEY_VALUE_LEN_SIZE;
      blockBuffer.position(blockBuffer.position() + lastKeyValueSize);
    } while (blockBuffer.remaining() > 0);

    // Every cell sorted before the key: back up to the last cell of the block.
    blockBuffer.position(blockBuffer.position() - lastKeyValueSize);
    readKeyValueLen();
    return 1;
  }

  /**
   * Returns a buffer of length klen positioned on the first key of {@code curBlock}.
   * NOTE(review): this rewinds and reads from the buffer returned by getBufferWithoutHeader;
   * whether that affects the block's own buffer depends on that method — confirm.
   */
  @Override
  protected ByteBuffer getFirstKeyInBlock(HFileBlock curBlock) {
    ByteBuffer buffer = curBlock.getBufferWithoutHeader();
    buffer.rewind();
    int klen = buffer.getInt();
    buffer.getInt(); // skip the value length
    ByteBuffer keyBuff = buffer.slice();
    keyBuff.limit(klen);
    keyBuff.rewind();
    return keyBuff;
  }

  /** @return the current key rendered with Bytes.toStringBinary */
  @Override
  public String getKeyString() {
    return Bytes.toStringBinary(blockBuffer.array(),
        blockBuffer.arrayOffset() + blockBuffer.position()
            + KEY_VALUE_LEN_SIZE, currKeyLen);
  }

  /** @return the current value decoded with Bytes.toString */
  @Override
  public String getValueString() {
    return Bytes.toString(blockBuffer.array(), blockBuffer.arrayOffset()
        + blockBuffer.position() + KEY_VALUE_LEN_SIZE + currKeyLen,
        currValueLen);
  }

  /** Compares {@code key} with the current key (given key on the left). */
  @Override
  public int compareKey(KVComparator comparator, Cell key) {
    return comparator.compareOnlyKeyPortion(
        key,
        new KeyValue.KeyOnlyKeyValue(blockBuffer.array(), blockBuffer.arrayOffset()
            + blockBuffer.position() + KEY_VALUE_LEN_SIZE, currKeyLen));
  }
}
1168
1169
1170
1171
1172 protected static class EncodedScannerV2 extends AbstractScannerV2 {
/** Decoding context created by {@link #dataBlockEncoder} from the file's HFileContext. */
private final HFileBlockDecodingContext decodingCtx;
/** Encoder-provided seeker that iterates and seeks cells within the current encoded block. */
private final DataBlockEncoder.EncodedSeeker seeker;
/** Encoder for the data block encoding configured on the reader. */
private final DataBlockEncoder dataBlockEncoder;
/** File-level context supplied by the reader. */
protected final HFileContext meta;
1177
1178 public EncodedScannerV2(HFileReaderV2 reader, boolean cacheBlocks,
1179 boolean pread, boolean isCompaction, HFileContext meta) {
1180 super(reader, cacheBlocks, pread, isCompaction);
1181 DataBlockEncoding encoding = reader.dataBlockEncoder.getDataBlockEncoding();
1182 dataBlockEncoder = encoding.getEncoder();
1183 decodingCtx = dataBlockEncoder.newDataBlockDecodingContext(meta);
1184 seeker = dataBlockEncoder.createSeeker(
1185 reader.getComparator(), decodingCtx);
1186 this.meta = meta;
1187 }
1188
1189 @Override
1190 public boolean isSeeked(){
1191 return this.block != null;
1192 }
1193
1194
1195
1196
1197
1198
1199
1200
1201 private void updateCurrentBlock(HFileBlock newBlock) throws CorruptHFileException {
1202 block = newBlock;
1203
1204
1205 if (block.getBlockType() != BlockType.ENCODED_DATA) {
1206 throw new IllegalStateException(
1207 "EncodedScanner works only on encoded data blocks");
1208 }
1209 short dataBlockEncoderId = block.getDataBlockEncodingId();
1210 if (!DataBlockEncoding.isCorrectEncoder(dataBlockEncoder, dataBlockEncoderId)) {
1211 String encoderCls = dataBlockEncoder.getClass().getName();
1212 throw new CorruptHFileException("Encoder " + encoderCls
1213 + " doesn't support data block encoding "
1214 + DataBlockEncoding.getNameFromId(dataBlockEncoderId));
1215 }
1216
1217 seeker.setCurrentBuffer(getEncodedBuffer(newBlock));
1218 blockFetches.incrementAndGet();
1219
1220
1221 this.nextIndexedKey = null;
1222 }
1223
1224 private ByteBuffer getEncodedBuffer(HFileBlock newBlock) {
1225 ByteBuffer origBlock = newBlock.getBufferReadOnly();
1226 ByteBuffer encodedBlock = ByteBuffer.wrap(origBlock.array(),
1227 origBlock.arrayOffset() + newBlock.headerSize() +
1228 DataBlockEncoding.ID_SIZE,
1229 newBlock.getUncompressedSizeWithoutHeader() -
1230 DataBlockEncoding.ID_SIZE).slice();
1231 return encodedBlock;
1232 }
1233
1234 @Override
1235 public boolean seekTo() throws IOException {
1236 if (reader == null) {
1237 return false;
1238 }
1239
1240 if (reader.getTrailer().getEntryCount() == 0) {
1241
1242 return false;
1243 }
1244
1245 long firstDataBlockOffset =
1246 reader.getTrailer().getFirstDataBlockOffset();
1247 if (block != null && block.getOffset() == firstDataBlockOffset) {
1248 seeker.rewind();
1249 return true;
1250 }
1251
1252 block = reader.readBlock(firstDataBlockOffset, -1, cacheBlocks, pread,
1253 isCompaction, true, BlockType.DATA, getEffectiveDataBlockEncoding());
1254 if (block.getOffset() < 0) {
1255 throw new IOException("Invalid block offset: " + block.getOffset());
1256 }
1257 updateCurrentBlock(block);
1258 return true;
1259 }
1260
1261 @Override
1262 public boolean next() throws IOException {
1263 boolean isValid = seeker.next();
1264 if (!isValid) {
1265 block = readNextDataBlock();
1266 isValid = block != null;
1267 if (isValid) {
1268 updateCurrentBlock(block);
1269 }
1270 }
1271 return isValid;
1272 }
1273
1274 @Override
1275 public ByteBuffer getKey() {
1276 assertValidSeek();
1277 return seeker.getKeyDeepCopy();
1278 }
1279
1280 @Override
1281 public int compareKey(KVComparator comparator, byte[] key, int offset, int length) {
1282 return seeker.compareKey(comparator, key, offset, length);
1283 }
1284
1285 @Override
1286 public ByteBuffer getValue() {
1287 assertValidSeek();
1288 return seeker.getValueShallowCopy();
1289 }
1290
1291 @Override
1292 public Cell getKeyValue() {
1293 if (block == null) {
1294 return null;
1295 }
1296 return seeker.getKeyValue();
1297 }
1298
1299 @Override
1300 public String getKeyString() {
1301 ByteBuffer keyBuffer = getKey();
1302 return Bytes.toStringBinary(keyBuffer.array(),
1303 keyBuffer.arrayOffset(), keyBuffer.limit());
1304 }
1305
1306 @Override
1307 public String getValueString() {
1308 ByteBuffer valueBuffer = getValue();
1309 return Bytes.toStringBinary(valueBuffer.array(),
1310 valueBuffer.arrayOffset(), valueBuffer.limit());
1311 }
1312
1313 private void assertValidSeek() {
1314 if (block == null) {
1315 throw new NotSeekedException();
1316 }
1317 }
1318
1319 @Override
1320 protected ByteBuffer getFirstKeyInBlock(HFileBlock curBlock) {
1321 return dataBlockEncoder.getFirstKeyInBlock(getEncodedBuffer(curBlock));
1322 }
1323
1324 @Override
1325 protected int loadBlockAndSeekToKey(HFileBlock seekToBlock, Cell nextIndexedKey,
1326 boolean rewind, Cell key, boolean seekBefore) throws IOException {
1327 if (block == null || block.getOffset() != seekToBlock.getOffset()) {
1328 updateCurrentBlock(seekToBlock);
1329 } else if (rewind) {
1330 seeker.rewind();
1331 }
1332 this.nextIndexedKey = nextIndexedKey;
1333 return seeker.seekToKeyInBlock(key, seekBefore);
1334 }
1335
1336 @Override
1337 public int compareKey(KVComparator comparator, Cell key) {
1338 return seeker.compareKey(comparator, key);
1339 }
1340 }
1341
1342
1343
1344
1345
1346 @Override
1347 public DataInput getGeneralBloomFilterMetadata() throws IOException {
1348 return this.getBloomFilterMetadata(BlockType.GENERAL_BLOOM_META);
1349 }
1350
1351 @Override
1352 public DataInput getDeleteBloomFilterMetadata() throws IOException {
1353 return this.getBloomFilterMetadata(BlockType.DELETE_FAMILY_BLOOM_META);
1354 }
1355
1356 private DataInput getBloomFilterMetadata(BlockType blockType)
1357 throws IOException {
1358 if (blockType != BlockType.GENERAL_BLOOM_META &&
1359 blockType != BlockType.DELETE_FAMILY_BLOOM_META) {
1360 throw new RuntimeException("Block Type: " + blockType.toString() +
1361 " is not supported") ;
1362 }
1363
1364 for (HFileBlock b : loadOnOpenBlocks)
1365 if (b.getBlockType() == blockType)
1366 return b.getByteStream();
1367 return null;
1368 }
1369
1370 @Override
1371 public boolean isFileInfoLoaded() {
1372 return true;
1373 }
1374
1375
1376
1377
1378
1379 private void validateMinorVersion(Path path, int minorVersion) {
1380 if (minorVersion < MIN_MINOR_VERSION ||
1381 minorVersion > MAX_MINOR_VERSION) {
1382 String msg = "Minor version for path " + path +
1383 " is expected to be between " +
1384 MIN_MINOR_VERSION + " and " + MAX_MINOR_VERSION +
1385 " but is found to be " + minorVersion;
1386 LOG.error(msg);
1387 throw new RuntimeException(msg);
1388 }
1389 }
1390
1391 @Override
1392 public int getMajorVersion() {
1393 return 2;
1394 }
1395
1396 @Override
1397 public HFileContext getFileContext() {
1398 return hfileContext;
1399 }
1400
1401
1402
1403
1404
1405 @VisibleForTesting
1406 boolean prefetchComplete() {
1407 return PrefetchExecutor.isCompleted(path);
1408 }
1409 }