1   /*
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.io.hfile;
20  
21  import static org.apache.hadoop.hbase.io.compress.Compression.Algorithm.GZ;
22  import static org.apache.hadoop.hbase.io.compress.Compression.Algorithm.NONE;
23  import static org.junit.Assert.*;
24  
25  import java.io.ByteArrayOutputStream;
26  import java.io.DataOutputStream;
27  import java.io.IOException;
28  import java.io.OutputStream;
29  import java.nio.ByteBuffer;
30  import java.util.ArrayList;
31  import java.util.Collection;
32  import java.util.Collections;
33  import java.util.HashMap;
34  import java.util.List;
35  import java.util.Map;
36  import java.util.Random;
37  import java.util.concurrent.Callable;
38  import java.util.concurrent.ExecutionException;
39  import java.util.concurrent.Executor;
40  import java.util.concurrent.ExecutorCompletionService;
41  import java.util.concurrent.Executors;
42  import java.util.concurrent.Future;
43  
44  import org.apache.commons.logging.Log;
45  import org.apache.commons.logging.LogFactory;
46  import org.apache.hadoop.fs.FSDataInputStream;
47  import org.apache.hadoop.fs.FSDataOutputStream;
48  import org.apache.hadoop.fs.FileSystem;
49  import org.apache.hadoop.fs.Path;
50  import org.apache.hadoop.hbase.HBaseTestingUtility;
51  import org.apache.hadoop.hbase.HConstants;
52  import org.apache.hadoop.hbase.KeyValue;
53  import org.apache.hadoop.hbase.testclassification.MediumTests;
54  import org.apache.hadoop.hbase.Tag;
55  import org.apache.hadoop.hbase.fs.HFileSystem;
56  import org.apache.hadoop.hbase.io.compress.Compression;
57  import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
58  import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
59  import org.apache.hadoop.hbase.util.Bytes;
60  import org.apache.hadoop.hbase.util.ChecksumType;
61  import org.apache.hadoop.hbase.util.ClassSize;
62  import org.apache.hadoop.io.WritableUtils;
63  import org.apache.hadoop.io.compress.Compressor;
64  import org.junit.Before;
65  import org.junit.Test;
66  import org.junit.experimental.categories.Category;
67  import org.junit.runner.RunWith;
68  import org.junit.runners.Parameterized;
69  import org.junit.runners.Parameterized.Parameters;
70  import org.mockito.Mockito;
71  
72  @Category(MediumTests.class)
73  @RunWith(Parameterized.class)
74  public class TestHFileBlock {
75    // change this value to enable more detailed logging
76    private static final boolean detailedLogging = false;
77    private static final boolean[] BOOLEAN_VALUES = new boolean[] { false, true };
78  
79    private static final Log LOG = LogFactory.getLog(TestHFileBlock.class);
80  
81    static final Compression.Algorithm[] COMPRESSION_ALGORITHMS = { NONE, GZ };
82  
83    private static final int NUM_TEST_BLOCKS = 1000;
84    private static final int NUM_READER_THREADS = 26;
85  
86    // Used to generate KeyValues
87    private static int NUM_KEYVALUES = 50;
88    private static int FIELD_LENGTH = 10;
89    private static float CHANCE_TO_REPEAT = 0.6f;
90  
91    private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
92    private FileSystem fs;
93  
94    private final boolean includesMemstoreTS;
95    private final boolean includesTag;
96    public TestHFileBlock(boolean includesMemstoreTS, boolean includesTag) {
97      this.includesMemstoreTS = includesMemstoreTS;
98      this.includesTag = includesTag;
99    }
100 
101   @Parameters
102   public static Collection<Object[]> parameters() {
103     return HBaseTestingUtility.MEMSTORETS_TAGS_PARAMETRIZED;
104   }
105 
106   @Before
107   public void setUp() throws IOException {
108     fs = HFileSystem.get(TEST_UTIL.getConfiguration());
109   }
110 
111   static void writeTestBlockContents(DataOutputStream dos) throws IOException {
112     // This compresses really well.
113     for (int i = 0; i < 1000; ++i)
114       dos.writeInt(i / 100);
115   }
116 
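      /**
       * Writes NUM_KEYVALUES pseudo-randomly generated KeyValues to the given block writer in
       * sorted order; fields are repeated with probability CHANCE_TO_REPEAT so the data
       * compresses well. Returns the total uncompressed size of the written cells, including
       * the memstore timestamp vints when includesMemstoreTS is set.
       */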
117   static int writeTestKeyValues(HFileBlock.Writer hbw, int seed, boolean includesMemstoreTS,
118       boolean useTag) throws IOException {
119     List<KeyValue> keyValues = new ArrayList<KeyValue>();
120     Random randomizer = new Random(42L + seed); // fixed seed so the generated data is reproducible
121 
122     // generate keyValues
123     for (int i = 0; i < NUM_KEYVALUES; ++i) {
124       byte[] row;
125       long timestamp;
126       byte[] family;
127       byte[] qualifier;
128       byte[] value;
129 
130       // generate a new value or repeat an existing one so the data compresses well
131       if (0 < i && randomizer.nextFloat() < CHANCE_TO_REPEAT) {
132         row = keyValues.get(randomizer.nextInt(keyValues.size())).getRow();
133       } else {
134         row = new byte[FIELD_LENGTH];
135         randomizer.nextBytes(row);
136       }
137       if (0 == i) {
138         family = new byte[FIELD_LENGTH];
139         randomizer.nextBytes(family);
140       } else {
141         family = keyValues.get(0).getFamily();
142       }
143       if (0 < i && randomizer.nextFloat() < CHANCE_TO_REPEAT) {
144         qualifier = keyValues.get(
145             randomizer.nextInt(keyValues.size())).getQualifier();
146       } else {
147         qualifier = new byte[FIELD_LENGTH];
148         randomizer.nextBytes(qualifier);
149       }
150       if (0 < i && randomizer.nextFloat() < CHANCE_TO_REPEAT) {
151         value = keyValues.get(randomizer.nextInt(keyValues.size())).getValue();
152       } else {
153         value = new byte[FIELD_LENGTH];
154         randomizer.nextBytes(value);
155       }
156       if (0 < i && randomizer.nextFloat() < CHANCE_TO_REPEAT) {
157         timestamp = keyValues.get(
158             randomizer.nextInt(keyValues.size())).getTimestamp();
159       } else {
160         timestamp = randomizer.nextLong();
161       }
162       if (!useTag) {
163         keyValues.add(new KeyValue(row, family, qualifier, timestamp, value));
164       } else {
165         keyValues.add(new KeyValue(row, family, qualifier, timestamp, value, new Tag[] { new Tag(
166             (byte) 1, Bytes.toBytes("myTagVal")) }));
167       }
168     }
169 
170     // sort it and write to stream
171     int totalSize = 0;
172     Collections.sort(keyValues, KeyValue.COMPARATOR);
173 
174     for (KeyValue kv : keyValues) {
175       totalSize += kv.getLength();
176       if (includesMemstoreTS) {
177         long memstoreTS = randomizer.nextLong();
178         kv.setSequenceId(memstoreTS);
179         totalSize += WritableUtils.getVIntSize(memstoreTS);
180       }
181       hbw.write(kv);
182     }
183     return totalSize;
184   }
185 
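      /**
       * Writes the META block magic and the standard test block contents through a compression
       * stream for the given algorithm and returns the resulting byte array.
       */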
186   public byte[] createTestV1Block(Compression.Algorithm algo)
187       throws IOException {
188     Compressor compressor = algo.getCompressor();
189     ByteArrayOutputStream baos = new ByteArrayOutputStream();
190     OutputStream os = algo.createCompressionStream(baos, compressor, 0);
191     DataOutputStream dos = new DataOutputStream(os);
192     BlockType.META.write(dos); // Let's make this a meta block.
193     writeTestBlockContents(dos);
194     dos.flush();
195     algo.returnCompressor(compressor);
196     return baos.toByteArray();
197   }
198 
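      /**
       * Writes a single version-2 DATA block (1000 ints) with the given compression algorithm
       * into a fresh HFileBlock.Writer and returns the writer for further inspection.
       */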
199   static HFileBlock.Writer createTestV2Block(Compression.Algorithm algo,
200       boolean includesMemstoreTS, boolean includesTag) throws IOException {
201     final BlockType blockType = BlockType.DATA;
202     HFileContext meta = new HFileContextBuilder()
203                         .withCompression(algo)
204                         .withIncludesMvcc(includesMemstoreTS)
205                         .withIncludesTags(includesTag)
206                         .withBytesPerCheckSum(HFile.DEFAULT_BYTES_PER_CHECKSUM)
207                         .build();
208     HFileBlock.Writer hbw = new HFileBlock.Writer(null, meta);
209     DataOutputStream dos = hbw.startWriting(blockType);
210     writeTestBlockContents(dos);
211     dos.flush();
212     hbw.ensureBlockReady();
213     assertEquals(1000 * 4, hbw.getUncompressedSizeWithoutHeader());
214     hbw.release();
215     return hbw;
216   }
217 
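      /**
       * Creates a test v2 block and returns its header and on-disk data as a printable string.
       * When the block has the expected length, the byte at the gzip OS-field offset is forced
       * to 3 (Unix) so gzip output is stable across platforms.
       */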
218   public String createTestBlockStr(Compression.Algorithm algo,
219       int correctLength, boolean useTag) throws IOException {
220     HFileBlock.Writer hbw = createTestV2Block(algo, includesMemstoreTS, useTag);
221     byte[] testV2Block = hbw.getHeaderAndDataForTest();
222     int osOffset = HConstants.HFILEBLOCK_HEADER_SIZE + 9;
223     if (testV2Block.length == correctLength) {
224       // Force-set the "OS" field of the gzip header to 3 (Unix) to avoid
225       // variations across operating systems.
226       // See http://www.gzip.org/zlib/rfc-gzip.html for gzip format.
227       // We only make this change when the compressed block length matches.
228       // Otherwise, there are obviously other inconsistencies.
229       testV2Block[osOffset] = 3;
230     }
231     return Bytes.toStringBinary(testV2Block);
232   }
233 
234   @Test
235   public void testNoCompression() throws IOException {
236     CacheConfig cacheConf = Mockito.mock(CacheConfig.class);
237     Mockito.when(cacheConf.isBlockCacheEnabled()).thenReturn(false);
238 
239     HFileBlock block =
240       createTestV2Block(NONE, includesMemstoreTS, false).getBlockForCaching(cacheConf);
241     assertEquals(4000, block.getUncompressedSizeWithoutHeader());
242     assertEquals(4004, block.getOnDiskSizeWithoutHeader());
243     assertTrue(block.isUnpacked());
244   }
245 
246   @Test
247   public void testGzipCompression() throws IOException {
248     final String correctTestBlockStr =
249         "DATABLK*\\x00\\x00\\x00>\\x00\\x00\\x0F\\xA0\\xFF\\xFF\\xFF\\xFF"
250             + "\\xFF\\xFF\\xFF\\xFF"
251             + "\\x0" + ChecksumType.getDefaultChecksumType().getCode()
252             + "\\x00\\x00@\\x00\\x00\\x00\\x00["
253             // gzip-compressed block: http://www.gzip.org/zlib/rfc-gzip.html
254             + "\\x1F\\x8B"  // gzip magic signature
255             + "\\x08"  // Compression method: 8 = "deflate"
256             + "\\x00"  // Flags
257             + "\\x00\\x00\\x00\\x00"  // mtime
258             + "\\x00"  // XFL (extra flags)
259             // OS (0 = FAT filesystems, 3 = Unix). However, this field
260             // sometimes gets set to 0 on Linux and Mac, so we reset it to 3.
261             // This appears to be a difference caused by the availability
262             // (and use) of the native GZ codec.
263             + "\\x03"
264             + "\\xED\\xC3\\xC1\\x11\\x00 \\x08\\xC00DD\\xDD\\x7Fa"
265             + "\\xD6\\xE8\\xA3\\xB9K\\x84`\\x96Q\\xD3\\xA8\\xDB\\xA8e\\xD4c"
266             + "\\xD46\\xEA5\\xEA3\\xEA7\\xE7\\x00LI\\x5Cs\\xA0\\x0F\\x00\\x00"
267             + "\\x00\\x00\\x00\\x00"; //  4 byte checksum (ignored)
268     final int correctGzipBlockLength = 95;
269     final String testBlockStr = createTestBlockStr(GZ, correctGzipBlockLength, false);
270     // We ignore the block checksum because createTestBlockStr can change the
271     // gzip header after the block is produced
272     assertEquals(correctTestBlockStr.substring(0, correctGzipBlockLength - 4),
273       testBlockStr.substring(0, correctGzipBlockLength - 4));
274   }
275 
276   @Test
277   public void testReaderV2() throws IOException {
278     testReaderV2Internals();
279   }
280 
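      /**
       * For each compression algorithm and pread mode, writes two DATA blocks, reads the first
       * one back and checks its sizes, then (for GZ) re-reads it with an explicit on-disk size
       * and verifies the exception raised when a wrong on-disk size is supplied.
       */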
281   protected void testReaderV2Internals() throws IOException {
282     if (includesTag) {
283       TEST_UTIL.getConfiguration().setInt("hfile.format.version", 3);
284     }
285     for (Compression.Algorithm algo : COMPRESSION_ALGORITHMS) {
286       for (boolean pread : new boolean[] { false, true }) {
287         LOG.info("testReaderV2: Compression algorithm: " + algo +
288             ", pread=" + pread);
289         Path path = new Path(TEST_UTIL.getDataTestDir(), "blocks_v2_"
290             + algo);
291         FSDataOutputStream os = fs.create(path);
292         HFileContext meta = new HFileContextBuilder()
293                            .withCompression(algo)
294                            .withIncludesMvcc(includesMemstoreTS)
295                            .withIncludesTags(includesTag)
296                            .withBytesPerCheckSum(HFile.DEFAULT_BYTES_PER_CHECKSUM)
297                            .build();
298         HFileBlock.Writer hbw = new HFileBlock.Writer(null,
299            meta);
300         long totalSize = 0;
301         for (int blockId = 0; blockId < 2; ++blockId) {
302           DataOutputStream dos = hbw.startWriting(BlockType.DATA);
303           for (int i = 0; i < 1234; ++i)
304             dos.writeInt(i);
305           hbw.writeHeaderAndData(os);
306           totalSize += hbw.getOnDiskSizeWithHeader();
307         }
308         os.close();
309 
310         FSDataInputStream is = fs.open(path);
311         meta = new HFileContextBuilder()
312             .withHBaseCheckSum(true)
313             .withIncludesMvcc(includesMemstoreTS)
314             .withIncludesTags(includesTag)
315             .withCompression(algo).build();
316         HFileBlock.FSReader hbr = new HFileBlock.FSReaderImpl(is, totalSize, meta);
317         HFileBlock b = hbr.readBlockData(0, -1, -1, pread);
318         is.close();
319         assertEquals(0, HFile.getChecksumFailuresCount());
320 
321         b.sanityCheck();
322         assertEquals(4936, b.getUncompressedSizeWithoutHeader());
323         assertEquals(algo == GZ ? 2173 : 4936,
324                      b.getOnDiskSizeWithoutHeader() - b.totalChecksumBytes());
325         HFileBlock expected = b;
326 
327         if (algo == GZ) {
328           is = fs.open(path);
329           hbr = new HFileBlock.FSReaderImpl(is, totalSize, meta);
330           b = hbr.readBlockData(0, 2173 + HConstants.HFILEBLOCK_HEADER_SIZE +
331                                 b.totalChecksumBytes(), -1, pread);
332           assertEquals(expected, b);
333           int wrongCompressedSize = 2172;
334           try {
335             b = hbr.readBlockData(0, wrongCompressedSize
336                 + HConstants.HFILEBLOCK_HEADER_SIZE, -1, pread);
337             fail("Exception expected");
338           } catch (IOException ex) {
339             String expectedPrefix = "On-disk size without header provided is "
340                 + wrongCompressedSize + ", but block header contains "
341                 + b.getOnDiskSizeWithoutHeader() + ".";
342             assertTrue("Invalid exception message: '" + ex.getMessage()
343                 + "'.\nMessage is expected to start with: '" + expectedPrefix
344                 + "'", ex.getMessage().startsWith(expectedPrefix));
345           }
346           is.close();
347         }
348       }
349     }
350   }
351 
352   /**
353    * Test encoding/decoding data blocks.
354    * @throws IOException if there is a bug or a problem with temporary files.
355    */
356   @Test
357   public void testDataBlockEncoding() throws IOException {
358     testInternals();
359   }
360 
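      /**
       * For every compression algorithm, pread mode and data block encoding, writes encoded
       * blocks, reads them back, and verifies the encoded bytes, packed/unpacked heap sizes,
       * unpacking, and the serialization round-trip.
       */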
361   private void testInternals() throws IOException {
362     final int numBlocks = 5;
363     if (includesTag) {
364       TEST_UTIL.getConfiguration().setInt("hfile.format.version", 3);
365     }
366     for (Compression.Algorithm algo : COMPRESSION_ALGORITHMS) {
367       for (boolean pread : new boolean[] { false, true }) {
368         for (DataBlockEncoding encoding : DataBlockEncoding.values()) {
369           Path path = new Path(TEST_UTIL.getDataTestDir(), "blocks_v2_"
370               + algo + "_" + encoding.toString());
371           FSDataOutputStream os = fs.create(path);
372           HFileDataBlockEncoder dataBlockEncoder = (encoding != DataBlockEncoding.NONE) ?
373               new HFileDataBlockEncoderImpl(encoding) : NoOpDataBlockEncoder.INSTANCE;
374           HFileContext meta = new HFileContextBuilder()
375                               .withCompression(algo)
376                               .withIncludesMvcc(includesMemstoreTS)
377                               .withIncludesTags(includesTag)
378                               .withBytesPerCheckSum(HFile.DEFAULT_BYTES_PER_CHECKSUM)
379                               .build();
380           HFileBlock.Writer hbw = new HFileBlock.Writer(dataBlockEncoder, meta);
381           long totalSize = 0;
382           final List<Integer> encodedSizes = new ArrayList<Integer>();
383           final List<ByteBuffer> encodedBlocks = new ArrayList<ByteBuffer>();
384           for (int blockId = 0; blockId < numBlocks; ++blockId) {
385             hbw.startWriting(BlockType.DATA);
386             writeTestKeyValues(hbw, blockId, includesMemstoreTS, includesTag);
387             hbw.writeHeaderAndData(os);
388             int headerLen = HConstants.HFILEBLOCK_HEADER_SIZE;
389             byte[] encodedResultWithHeader = hbw.getUncompressedBufferWithHeader().array();
390             final int encodedSize = encodedResultWithHeader.length - headerLen;
391             if (encoding != DataBlockEncoding.NONE) {
392               // We need to account for the two-byte encoding algorithm ID that
393             // comes after the block header but before the encoded KVs.
394               headerLen += DataBlockEncoding.ID_SIZE;
395             }
396             byte[] encodedDataSection =
397                 new byte[encodedResultWithHeader.length - headerLen];
398             System.arraycopy(encodedResultWithHeader, headerLen,
399                 encodedDataSection, 0, encodedDataSection.length);
400             final ByteBuffer encodedBuf =
401                 ByteBuffer.wrap(encodedDataSection);
402             encodedSizes.add(encodedSize);
403             encodedBlocks.add(encodedBuf);
404             totalSize += hbw.getOnDiskSizeWithHeader();
405           }
406           os.close();
407 
408           FSDataInputStream is = fs.open(path);
409           meta = new HFileContextBuilder()
410                 .withHBaseCheckSum(true)
411                 .withCompression(algo)
412                 .withIncludesMvcc(includesMemstoreTS)
413                 .withIncludesTags(includesTag)
414                 .build();
415           HFileBlock.FSReaderImpl hbr = new HFileBlock.FSReaderImpl(is, totalSize, meta);
416           hbr.setDataBlockEncoder(dataBlockEncoder);
417           hbr.setIncludesMemstoreTS(includesMemstoreTS);
418           HFileBlock blockFromHFile, blockUnpacked;
419           int pos = 0;
420           for (int blockId = 0; blockId < numBlocks; ++blockId) {
421             blockFromHFile = hbr.readBlockData(pos, -1, -1, pread);
422             assertEquals(0, HFile.getChecksumFailuresCount());
423             blockFromHFile.sanityCheck();
424             pos += blockFromHFile.getOnDiskSizeWithHeader();
425             assertEquals((int) encodedSizes.get(blockId),
426               blockFromHFile.getUncompressedSizeWithoutHeader());
427             assertEquals(meta.isCompressedOrEncrypted(), !blockFromHFile.isUnpacked());
428             long packedHeapsize = blockFromHFile.heapSize();
429             blockUnpacked = blockFromHFile.unpack(meta, hbr);
430             assertTrue(blockUnpacked.isUnpacked());
431             if (meta.isCompressedOrEncrypted()) {
432             LOG.info("packedHeapsize=" + packedHeapsize + ", unpackedHeapsize="
433               + blockUnpacked.heapSize());
434               assertFalse(packedHeapsize == blockUnpacked.heapSize());
435               assertTrue("Packed heapSize should be < unpacked heapSize",
436                 packedHeapsize < blockUnpacked.heapSize());
437             }
438             ByteBuffer actualBuffer = blockUnpacked.getBufferWithoutHeader();
439             if (encoding != DataBlockEncoding.NONE) {
440               // We expect a two-byte big-endian encoding id.
441               assertEquals(
442                 "Unexpected first byte with " + buildMessageDetails(algo, encoding, pread),
443                 Long.toHexString(0), Long.toHexString(actualBuffer.get(0)));
444               assertEquals(
445                 "Unexpected second byte with " + buildMessageDetails(algo, encoding, pread),
446                 Long.toHexString(encoding.getId()), Long.toHexString(actualBuffer.get(1)));
447               actualBuffer.position(2);
448               actualBuffer = actualBuffer.slice();
449             }
450 
451             ByteBuffer expectedBuffer = encodedBlocks.get(blockId);
452             expectedBuffer.rewind();
453 
454             // test if content matches, produce nice message
455             assertBuffersEqual(expectedBuffer, actualBuffer, algo, encoding, pread);
456 
457             // test serialized blocks
458             for (boolean reuseBuffer : new boolean[] { false, true }) {
459               ByteBuffer serialized = ByteBuffer.allocate(blockFromHFile.getSerializedLength());
460               blockFromHFile.serialize(serialized);
461               HFileBlock deserialized =
462                 (HFileBlock) blockFromHFile.getDeserializer().deserialize(serialized, reuseBuffer);
463               assertEquals(
464                 "Serialization did not preserve block state. reuseBuffer=" + reuseBuffer,
465                 blockFromHFile, deserialized);
466               // intentional reference comparison
467               if (blockFromHFile != blockUnpacked) {
468               assertEquals("Deserialized block cannot be unpacked correctly.",
469                   blockUnpacked, deserialized.unpack(meta, hbr));
470               }
471             }
472           }
473           is.close();
474         }
475       }
476     }
477   }
478 
479   static String buildMessageDetails(Algorithm compression, DataBlockEncoding encoding,
480       boolean pread) {
481     return String.format("compression %s, encoding %s, pread %s", compression, encoding, pread);
482   }
483 
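      /**
       * Asserts that the two buffers have identical content; on mismatch, fails with the length
       * of the common prefix and a printable dump of the bytes that follow it in each buffer.
       */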
484   static void assertBuffersEqual(ByteBuffer expectedBuffer,
485       ByteBuffer actualBuffer, Compression.Algorithm compression,
486       DataBlockEncoding encoding, boolean pread) {
487     if (!actualBuffer.equals(expectedBuffer)) {
488       int prefix = 0;
489       int minLimit = Math.min(expectedBuffer.limit(), actualBuffer.limit());
490       while (prefix < minLimit &&
491           expectedBuffer.get(prefix) == actualBuffer.get(prefix)) {
492         prefix++;
493       }
494 
495       fail(String.format(
496           "Content mismatch for %s, commonPrefix %d, expected %s, got %s",
497           buildMessageDetails(compression, encoding, pread), prefix,
498           nextBytesToStr(expectedBuffer, prefix),
499           nextBytesToStr(actualBuffer, prefix)));
500     }
501   }
502 
503   /**
504    * Convert the next few bytes in the given buffer at the given position to
505    * a string. Used for error messages.
506    */
507   private static String nextBytesToStr(ByteBuffer buf, int pos) {
508     int maxBytes = buf.limit() - pos;
509     int numBytes = Math.min(16, maxBytes);
510     return Bytes.toStringBinary(buf.array(), buf.arrayOffset() + pos,
511         numBytes) + (numBytes < maxBytes ? "..." : "");
512   }
513 
514   @Test
515   public void testPreviousOffset() throws IOException {
516     testPreviousOffsetInternals();
517   }
518 
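      /**
       * Writes NUM_TEST_BLOCKS blocks of random types and reads them back sequentially, checking
       * each block's offset, previous-block offset (per block type), type, and, in the
       * cache-on-write case, its unpacked contents against what was written.
       */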
519   protected void testPreviousOffsetInternals() throws IOException {
520     // TODO: parameterize these nested loops.
521     for (Compression.Algorithm algo : COMPRESSION_ALGORITHMS) {
522       for (boolean pread : BOOLEAN_VALUES) {
523         for (boolean cacheOnWrite : BOOLEAN_VALUES) {
524           Random rand = defaultRandom();
525           LOG.info("testPreviousOffset:Compression algorithm: " + algo +
526                    ", pread=" + pread +
527                    ", cacheOnWrite=" + cacheOnWrite);
528           Path path = new Path(TEST_UTIL.getDataTestDir(), "prev_offset");
529           List<Long> expectedOffsets = new ArrayList<Long>();
530           List<Long> expectedPrevOffsets = new ArrayList<Long>();
531           List<BlockType> expectedTypes = new ArrayList<BlockType>();
532           List<ByteBuffer> expectedContents = cacheOnWrite
533               ? new ArrayList<ByteBuffer>() : null;
534           long totalSize = writeBlocks(rand, algo, path, expectedOffsets,
535               expectedPrevOffsets, expectedTypes, expectedContents);
536 
537           FSDataInputStream is = fs.open(path);
538           HFileContext meta = new HFileContextBuilder()
539                               .withHBaseCheckSum(true)
540                               .withIncludesMvcc(includesMemstoreTS)
541                               .withIncludesTags(includesTag)
542                               .withCompression(algo).build();
543           HFileBlock.FSReader hbr = new HFileBlock.FSReaderImpl(is, totalSize, meta);
544           long curOffset = 0;
545           for (int i = 0; i < NUM_TEST_BLOCKS; ++i) {
546             if (!pread) {
547               assertEquals(is.getPos(), curOffset + (i == 0 ? 0 :
548                   HConstants.HFILEBLOCK_HEADER_SIZE));
549             }
550 
551             assertEquals(expectedOffsets.get(i).longValue(), curOffset);
552             if (detailedLogging) {
553               LOG.info("Reading block #" + i + " at offset " + curOffset);
554             }
555             HFileBlock b = hbr.readBlockData(curOffset, -1, -1, pread);
556             if (detailedLogging) {
557               LOG.info("Block #" + i + ": " + b);
558             }
559             assertEquals("Invalid block #" + i + "'s type:",
560                 expectedTypes.get(i), b.getBlockType());
561             assertEquals("Invalid previous block offset for block " + i
562                 + " of " + "type " + b.getBlockType() + ":",
563                 (long) expectedPrevOffsets.get(i), b.getPrevBlockOffset());
564             b.sanityCheck();
565             assertEquals(curOffset, b.getOffset());
566 
567             // Now re-load this block knowing the on-disk size. This tests a
568             // different branch in the loader.
569             HFileBlock b2 = hbr.readBlockData(curOffset,
570                 b.getOnDiskSizeWithHeader(), -1, pread);
571             b2.sanityCheck();
572 
573             assertEquals(b.getBlockType(), b2.getBlockType());
574             assertEquals(b.getOnDiskSizeWithoutHeader(),
575                 b2.getOnDiskSizeWithoutHeader());
576             assertEquals(b.getOnDiskSizeWithHeader(),
577                 b2.getOnDiskSizeWithHeader());
578             assertEquals(b.getUncompressedSizeWithoutHeader(),
579                 b2.getUncompressedSizeWithoutHeader());
580             assertEquals(b.getPrevBlockOffset(), b2.getPrevBlockOffset());
581             assertEquals(curOffset, b2.getOffset());
582             assertEquals(b.getBytesPerChecksum(), b2.getBytesPerChecksum());
583             assertEquals(b.getOnDiskDataSizeWithHeader(),
584                          b2.getOnDiskDataSizeWithHeader());
585             assertEquals(0, HFile.getChecksumFailuresCount());
586 
587             curOffset += b.getOnDiskSizeWithHeader();
588 
589             if (cacheOnWrite) {
590               // NOTE: cache-on-write testing doesn't actually involve a BlockCache. It simply
591               // verifies that the unpacked value read back off disk matches the unpacked value
592               // generated before writing to disk.
593               b = b.unpack(meta, hbr);
594               // b's buffer has header + data + checksum while
595               // expectedContents have header + data only
596               ByteBuffer bufRead = b.getBufferWithHeader();
597               ByteBuffer bufExpected = expectedContents.get(i);
598               boolean bytesAreCorrect = Bytes.compareTo(bufRead.array(),
599                   bufRead.arrayOffset(),
600                   bufRead.limit() - b.totalChecksumBytes(),
601                   bufExpected.array(), bufExpected.arrayOffset(),
602                   bufExpected.limit()) == 0;
603               String wrongBytesMsg = "";
604 
605               if (!bytesAreCorrect) {
606               // Optimization: only construct an error message if we are
607               // actually going to need it.
608                 wrongBytesMsg = "Expected bytes in block #" + i + " (algo="
609                     + algo + ", pread=" + pread
610                     + ", cacheOnWrite=" + cacheOnWrite + "):\n";
611                 wrongBytesMsg += Bytes.toStringBinary(bufExpected.array(),
612                   bufExpected.arrayOffset(), Math.min(32 + 10, bufExpected.limit()))
613                     + ", actual:\n"
614                     + Bytes.toStringBinary(bufRead.array(),
615                   bufRead.arrayOffset(), Math.min(32 + 10, bufRead.limit()));
616                 if (detailedLogging) {
617                   LOG.warn("expected header" +
618                            HFileBlock.toStringHeader(bufExpected) +
619                            "\nfound    header" +
620                            HFileBlock.toStringHeader(bufRead));
621                   LOG.warn("bufread offset " + bufRead.arrayOffset() +
622                            " limit " + bufRead.limit() +
623                            " expected offset " + bufExpected.arrayOffset() +
624                            " limit " + bufExpected.limit());
625                   LOG.warn(wrongBytesMsg);
626                 }
627               }
628               assertTrue(wrongBytesMsg, bytesAreCorrect);
629             }
630           }
631 
632           assertEquals(curOffset, fs.getFileStatus(path).getLen());
633           is.close();
634         }
635       }
636     }
637   }
638 
639   private Random defaultRandom() {
640     return new Random(189237);
641   }
642 
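      /**
       * Callable that reads randomly chosen blocks at known offsets for about ten seconds,
       * alternating between positional and seek+read access and between passing and omitting
       * the on-disk size, and verifies each block's type, size and offset.
       */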
643   private class BlockReaderThread implements Callable<Boolean> {
644     private final String clientId;
645     private final HFileBlock.FSReader hbr;
646     private final List<Long> offsets;
647     private final List<BlockType> types;
648     private final long fileSize;
649 
650     public BlockReaderThread(String clientId,
651         HFileBlock.FSReader hbr, List<Long> offsets, List<BlockType> types,
652         long fileSize) {
653       this.clientId = clientId;
654       this.offsets = offsets;
655       this.hbr = hbr;
656       this.types = types;
657       this.fileSize = fileSize;
658     }
659 
660     @Override
661     public Boolean call() throws Exception {
662       Random rand = new Random(clientId.hashCode());
663       long endTime = System.currentTimeMillis() + 10000;
664       int numBlocksRead = 0;
665       int numPositionalRead = 0;
666       int numWithOnDiskSize = 0;
667       while (System.currentTimeMillis() < endTime) {
668         int blockId = rand.nextInt(NUM_TEST_BLOCKS);
669         long offset = offsets.get(blockId);
670         boolean pread = rand.nextBoolean();
671         boolean withOnDiskSize = rand.nextBoolean();
672         long expectedSize =
673           (blockId == NUM_TEST_BLOCKS - 1 ? fileSize
674               : offsets.get(blockId + 1)) - offset;
675 
676         HFileBlock b;
677         try {
678           long onDiskSizeArg = withOnDiskSize ? expectedSize : -1;
679           b = hbr.readBlockData(offset, onDiskSizeArg, -1, pread);
680         } catch (IOException ex) {
681           LOG.error("Error in client " + clientId + " trying to read block at "
682               + offset + ", pread=" + pread + ", withOnDiskSize=" +
683               withOnDiskSize, ex);
684           return false;
685         }
686 
687         assertEquals(types.get(blockId), b.getBlockType());
688         assertEquals(expectedSize, b.getOnDiskSizeWithHeader());
689         assertEquals(offset, b.getOffset());
690 
691         ++numBlocksRead;
692         if (pread)
693           ++numPositionalRead;
694         if (withOnDiskSize)
695           ++numWithOnDiskSize;
696       }
697       LOG.info("Client " + clientId + " successfully read " + numBlocksRead +
698         " blocks (with pread: " + numPositionalRead + ", with onDiskSize " +
699         "specified: " + numWithOnDiskSize + ")");
700 
701       return true;
702     }
703 
704   }
705 
706   @Test
707   public void testConcurrentReading() throws Exception {
708     testConcurrentReadingInternals();
709   }
710 
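      /**
       * Writes a file of random blocks and has NUM_READER_THREADS BlockReaderThreads read it
       * concurrently through a shared reader, failing if any reader reports a problem.
       */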
711   protected void testConcurrentReadingInternals() throws IOException,
712       InterruptedException, ExecutionException {
713     for (Compression.Algorithm compressAlgo : COMPRESSION_ALGORITHMS) {
714       Path path =
715           new Path(TEST_UTIL.getDataTestDir(), "concurrent_reading");
716       Random rand = defaultRandom();
717       List<Long> offsets = new ArrayList<Long>();
718       List<BlockType> types = new ArrayList<BlockType>();
719       writeBlocks(rand, compressAlgo, path, offsets, null, types, null);
720       FSDataInputStream is = fs.open(path);
721       long fileSize = fs.getFileStatus(path).getLen();
722       HFileContext meta = new HFileContextBuilder()
723                           .withHBaseCheckSum(true)
724                           .withIncludesMvcc(includesMemstoreTS)
725                           .withIncludesTags(includesTag)
726                           .withCompression(compressAlgo)
727                           .build();
728       HFileBlock.FSReader hbr = new HFileBlock.FSReaderImpl(is, fileSize, meta);
729 
730       Executor exec = Executors.newFixedThreadPool(NUM_READER_THREADS);
731       ExecutorCompletionService<Boolean> ecs =
732           new ExecutorCompletionService<Boolean>(exec);
733 
734       for (int i = 0; i < NUM_READER_THREADS; ++i) {
735         ecs.submit(new BlockReaderThread("reader_" + (char) ('A' + i), hbr,
736             offsets, types, fileSize));
737       }
738 
739       for (int i = 0; i < NUM_READER_THREADS; ++i) {
740         Future<Boolean> result = ecs.take();
741         assertTrue(result.get());
742         if (detailedLogging) {
743           LOG.info(String.valueOf(i + 1)
744             + " reader threads finished successfully (algo=" + compressAlgo
745             + ")");
746         }
747       }
748 
749       is.close();
750     }
751   }
752 
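      /**
       * Writes NUM_TEST_BLOCKS blocks of random type and size to the given path, recording their
       * offsets, previous offsets per block type, block types and, when expectedContents is
       * non-null, their uncompressed contents. Returns the total on-disk size written.
       */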
753   private long writeBlocks(Random rand, Compression.Algorithm compressAlgo,
754       Path path, List<Long> expectedOffsets, List<Long> expectedPrevOffsets,
755       List<BlockType> expectedTypes, List<ByteBuffer> expectedContents
756   ) throws IOException {
757     boolean cacheOnWrite = expectedContents != null;
758     FSDataOutputStream os = fs.create(path);
759     HFileContext meta = new HFileContextBuilder()
760                         .withHBaseCheckSum(true)
761                         .withIncludesMvcc(includesMemstoreTS)
762                         .withIncludesTags(includesTag)
763                         .withCompression(compressAlgo)
764                         .withBytesPerCheckSum(HFile.DEFAULT_BYTES_PER_CHECKSUM)
765                         .build();
766     HFileBlock.Writer hbw = new HFileBlock.Writer(null, meta);
767     Map<BlockType, Long> prevOffsetByType = new HashMap<BlockType, Long>();
768     long totalSize = 0;
769     for (int i = 0; i < NUM_TEST_BLOCKS; ++i) {
770       long pos = os.getPos();
771       int blockTypeOrdinal = rand.nextInt(BlockType.values().length);
772       if (blockTypeOrdinal == BlockType.ENCODED_DATA.ordinal()) {
773         blockTypeOrdinal = BlockType.DATA.ordinal();
774       }
775       BlockType bt = BlockType.values()[blockTypeOrdinal];
776       DataOutputStream dos = hbw.startWriting(bt);
777       int size = rand.nextInt(500);
778       for (int j = 0; j < size; ++j) {
779         // This might compress well.
780         dos.writeShort(i + 1);
781         dos.writeInt(j + 1);
782       }
783 
784       if (expectedOffsets != null)
785         expectedOffsets.add(os.getPos());
786 
787       if (expectedPrevOffsets != null) {
788         Long prevOffset = prevOffsetByType.get(bt);
789         expectedPrevOffsets.add(prevOffset != null ? prevOffset : -1);
790         prevOffsetByType.put(bt, os.getPos());
791       }
792 
793       expectedTypes.add(bt);
794 
795       hbw.writeHeaderAndData(os);
796       totalSize += hbw.getOnDiskSizeWithHeader();
797 
798       if (cacheOnWrite)
799         expectedContents.add(hbw.getUncompressedBufferWithHeader());
800 
801       if (detailedLogging) {
802         LOG.info("Written block #" + i + " of type " + bt
803             + ", uncompressed size " + hbw.getUncompressedSizeWithoutHeader()
804             + ", packed size " + hbw.getOnDiskSizeWithoutHeader()
805             + " at offset " + pos);
806       }
807     }
808     os.close();
809     LOG.info("Created a temporary file at " + path + ", "
810         + fs.getFileStatus(path).getLen() + " bytes, compression=" +
811         compressAlgo);
812     return totalSize;
813   }
814 
815   @Test
816   public void testBlockHeapSize() {
817     testBlockHeapSizeInternals();
818   }
819 
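      /**
       * Checks HFileBlock.BYTE_BUFFER_HEAP_SIZE for 32- and 64-bit JVMs, then verifies that
       * HFileBlock.heapSize() matches the estimate computed from the HFileBlock instance, its
       * backing ByteBuffer and its HFileContext.
       */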
820   protected void testBlockHeapSizeInternals() {
821     if (ClassSize.is32BitJVM()) {
822       assertEquals(64, HFileBlock.BYTE_BUFFER_HEAP_SIZE);
823     } else {
824       assertEquals(80, HFileBlock.BYTE_BUFFER_HEAP_SIZE);
825     }
826 
827     for (int size : new int[] { 100, 256, 12345 }) {
828       byte[] byteArr = new byte[HConstants.HFILEBLOCK_HEADER_SIZE + size];
829       ByteBuffer buf = ByteBuffer.wrap(byteArr, 0, size);
830       HFileContext meta = new HFileContextBuilder()
831                           .withIncludesMvcc(includesMemstoreTS)
832                           .withIncludesTags(includesTag)
833                           .withHBaseCheckSum(false)
834                           .withCompression(Algorithm.NONE)
835                           .withBytesPerCheckSum(HFile.DEFAULT_BYTES_PER_CHECKSUM)
836                           .withChecksumType(ChecksumType.NULL).build();
837       HFileBlock block = new HFileBlock(BlockType.DATA, size, size, -1, buf,
838           HFileBlock.FILL_HEADER, -1, 
839           0, meta);
840       long byteBufferExpectedSize =
841           ClassSize.align(ClassSize.estimateBase(buf.getClass(), true)
842               + HConstants.HFILEBLOCK_HEADER_SIZE + size);
843       long hfileMetaSize =  ClassSize.align(ClassSize.estimateBase(HFileContext.class, true));
844       long hfileBlockExpectedSize =
845           ClassSize.align(ClassSize.estimateBase(HFileBlock.class, true));
846       long expected = hfileBlockExpectedSize + byteBufferExpectedSize + hfileMetaSize;
847       assertEquals("Block data size: " + size + ", byte buffer expected " +
848           "size: " + byteBufferExpectedSize + ", HFileBlock class expected " +
849           "size: " + hfileBlockExpectedSize + ";", expected,
850           block.heapSize());
851     }
852   }
853 }