1   /*
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.io.hfile;
20  
21  import static org.apache.hadoop.hbase.io.compress.Compression.Algorithm.GZ;
22  import static org.apache.hadoop.hbase.io.compress.Compression.Algorithm.NONE;
23  import static org.junit.Assert.*;
24  
25  import java.io.ByteArrayOutputStream;
26  import java.io.DataOutputStream;
27  import java.io.IOException;
28  import java.io.OutputStream;
29  import java.nio.ByteBuffer;
30  import java.util.ArrayList;
31  import java.util.Collection;
32  import java.util.List;
33  
34  import org.apache.commons.logging.Log;
35  import org.apache.commons.logging.LogFactory;
36  import org.apache.hadoop.fs.FSDataInputStream;
37  import org.apache.hadoop.fs.FSDataOutputStream;
38  import org.apache.hadoop.fs.Path;
39  import org.apache.hadoop.hbase.Cell;
40  import org.apache.hadoop.hbase.HBaseTestingUtility;
41  import org.apache.hadoop.hbase.HConstants;
42  import org.apache.hadoop.hbase.KeyValue;
43  import org.apache.hadoop.hbase.KeyValueUtil;
44  import org.apache.hadoop.hbase.testclassification.SmallTests;
45  import org.apache.hadoop.hbase.fs.HFileSystem;
46  import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
47  import org.apache.hadoop.hbase.io.compress.Compression;
48  import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
49  import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultEncodingContext;
50  import org.apache.hadoop.hbase.io.encoding.HFileBlockEncodingContext;
51  import org.apache.hadoop.hbase.io.hfile.HFileBlock.BlockWritable;
52  import org.apache.hadoop.hbase.util.Bytes;
53  import org.apache.hadoop.hbase.util.ChecksumType;
54  import org.apache.hadoop.io.WritableUtils;
55  import org.apache.hadoop.io.compress.Compressor;
56  import org.junit.Before;
57  import org.junit.Test;
58  import org.junit.experimental.categories.Category;
59  import org.junit.runner.RunWith;
60  import org.junit.runners.Parameterized;
61  import org.junit.runners.Parameterized.Parameters;
62  
63  import com.google.common.base.Preconditions;
64  
65  /**
66   * This class contains unit tests verifying that older versions of
67   * HFiles (written without HBase-level checksums) remain readable by current readers.
68   */
69  @Category(SmallTests.class)
70  @RunWith(Parameterized.class)
71  public class TestHFileBlockCompatibility {
72  
73    private static final Log LOG = LogFactory.getLog(TestHFileBlockCompatibility.class);
74    private static final Compression.Algorithm[] COMPRESSION_ALGORITHMS = {
75        NONE, GZ };
76  
77    private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
78    private HFileSystem fs;
79  
80    private final boolean includesMemstoreTS;
81    private final boolean includesTag;
82  
83    public TestHFileBlockCompatibility(boolean includesMemstoreTS, boolean includesTag) {
84      this.includesMemstoreTS = includesMemstoreTS;
85      this.includesTag = includesTag;
86    }
87  
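      // Parameters are (includesMemstoreTS, includesTag) pairs supplied by
      // HBaseTestingUtility, so the tests run both with and without memstore
      // timestamps and with and without tags.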
88    @Parameters
89    public static Collection<Object[]> parameters() {
90      return HBaseTestingUtility.MEMSTORETS_TAGS_PARAMETRIZED;
91    }
92  
93    @Before
94    public void setUp() throws IOException {
95      fs = (HFileSystem)HFileSystem.get(TEST_UTIL.getConfiguration());
96    }
97  
98    public byte[] createTestV1Block(Compression.Algorithm algo)
99        throws IOException {
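        // In the old "version 1" format a block is essentially just the 8-byte
        // block magic followed by the block contents, compressed as a whole;
        // there is no per-block header, which is why only the magic and the
        // test contents are written here.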
100     Compressor compressor = algo.getCompressor();
101     ByteArrayOutputStream baos = new ByteArrayOutputStream();
102     OutputStream os = algo.createCompressionStream(baos, compressor, 0);
103     DataOutputStream dos = new DataOutputStream(os);
104     BlockType.META.write(dos); // Let's make this a meta block.
105     TestHFileBlock.writeTestBlockContents(dos);
106     dos.flush();
107     algo.returnCompressor(compressor);
108     return baos.toByteArray();
109   }
110 
111   private Writer createTestV2Block(Compression.Algorithm algo)
112       throws IOException {
113     final BlockType blockType = BlockType.DATA;
114     Writer hbw = new Writer(algo, null,
115         includesMemstoreTS, includesTag);
116     DataOutputStream dos = hbw.startWriting(blockType);
117     TestHFileBlock.writeTestBlockContents(dos);
118     // make sure the block is ready by calling hbw.getHeaderAndData()
119     hbw.getHeaderAndData();
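        // TestHFileBlock.writeTestBlockContents writes 1000 ints, i.e. 4000 bytes.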
120     assertEquals(1000 * 4, hbw.getUncompressedSizeWithoutHeader());
121     hbw.releaseCompressor();
122     return hbw;
123   }
124 
125   private String createTestBlockStr(Compression.Algorithm algo,
126       int correctLength) throws IOException {
127     Writer hbw = createTestV2Block(algo);
128     byte[] testV2Block = hbw.getHeaderAndData();
129     int osOffset = HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM + 9;
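        // The gzip OS byte sits 9 bytes into the gzip stream (2 magic bytes +
        // 1 CM + 1 FLG + 4 MTIME + 1 XFL), which itself follows the 24-byte
        // minor-version-0 block header.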
130     if (testV2Block.length == correctLength) {
131       // Force-set the "OS" field of the gzip header to 3 (Unix) to avoid
132       // variations across operating systems.
133       // See http://www.gzip.org/zlib/rfc-gzip.html for gzip format.
134       testV2Block[osOffset] = 3;
135     }
136     return Bytes.toStringBinary(testV2Block);
137   }
138 
139   @Test
140   public void testNoCompression() throws IOException {
141     assertEquals(4000, createTestV2Block(NONE).getBlockForCaching().
142         getUncompressedSizeWithoutHeader());
143   }
144 
145   @Test
146   public void testGzipCompression() throws IOException {
147     final String correctTestBlockStr =
148         "DATABLK*\\x00\\x00\\x00:\\x00\\x00\\x0F\\xA0\\xFF\\xFF\\xFF\\xFF"
149             + "\\xFF\\xFF\\xFF\\xFF"
150             // gzip-compressed block: http://www.gzip.org/zlib/rfc-gzip.html
151             + "\\x1F\\x8B"  // gzip magic signature
152             + "\\x08"  // Compression method: 8 = "deflate"
153             + "\\x00"  // Flags
154             + "\\x00\\x00\\x00\\x00"  // mtime
155             + "\\x00"  // XFL (extra flags)
156             // OS (0 = FAT filesystems, 3 = Unix). However, this field
157             // sometimes gets set to 0 on Linux and Mac, so we reset it to 3.
158             + "\\x03"
159             + "\\xED\\xC3\\xC1\\x11\\x00 \\x08\\xC00DD\\xDD\\x7Fa"
160             + "\\xD6\\xE8\\xA3\\xB9K\\x84`\\x96Q\\xD3\\xA8\\xDB\\xA8e\\xD4c"
161             + "\\xD46\\xEA5\\xEA3\\xEA7\\xE7\\x00LI\\x5Cs\\xA0\\x0F\\x00\\x00";
162     final int correctGzipBlockLength = 82;
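        // Decoding the expected header above: on-disk size without header = 0x3A = 58,
        // uncompressed size without header = 0x0FA0 = 4000 (1000 ints), prevOffset = -1;
        // 58 bytes of gzip output plus the 24-byte header gives the expected length of 82.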
163 
164     String returnedStr = createTestBlockStr(GZ, correctGzipBlockLength);
165     assertEquals(correctTestBlockStr, returnedStr);
166   }
167 
168   @Test
169   public void testReaderV2() throws IOException {
170     if (includesTag) {
171       TEST_UTIL.getConfiguration().setInt("hfile.format.version", 3);
172     }
173     for (Compression.Algorithm algo : COMPRESSION_ALGORITHMS) {
174       for (boolean pread : new boolean[] { false, true }) {
175         LOG.info("testReaderV2: Compression algorithm: " + algo +
176             ", pread=" + pread);
177         Path path = new Path(TEST_UTIL.getDataTestDir(), "blocks_v2_"
178             + algo);
179         FSDataOutputStream os = fs.create(path);
180         Writer hbw = new Writer(algo, null,
181             includesMemstoreTS, includesTag);
182         long totalSize = 0;
183         for (int blockId = 0; blockId < 2; ++blockId) {
184           DataOutputStream dos = hbw.startWriting(BlockType.DATA);
185           for (int i = 0; i < 1234; ++i)
186             dos.writeInt(i);
187           hbw.writeHeaderAndData(os);
188           totalSize += hbw.getOnDiskSizeWithHeader();
189         }
190         os.close();
191 
192         FSDataInputStream is = fs.open(path);
193         HFileContext meta = new HFileContextBuilder()
194                            .withHBaseCheckSum(false)
195                            .withIncludesMvcc(includesMemstoreTS)
196                            .withIncludesTags(includesTag)
197                            .withCompression(algo)
198                            .build();
199         HFileBlock.FSReader hbr =
200           new HFileBlock.FSReaderImpl(new FSDataInputStreamWrapper(is), totalSize, fs, path, meta);
201         HFileBlock b = hbr.readBlockData(0, -1, -1, pread);
202         is.close();
203 
204         b.sanityCheck();
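            // Each block holds 1234 ints, so 1234 * 4 = 4936 uncompressed bytes.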
205         assertEquals(4936, b.getUncompressedSizeWithoutHeader());
206         assertEquals(algo == GZ ? 2173 : 4936,
207                      b.getOnDiskSizeWithoutHeader() - b.totalChecksumBytes());
208         HFileBlock expected = b;
209 
210         if (algo == GZ) {
211           is = fs.open(path);
212           hbr = new HFileBlock.FSReaderImpl(new FSDataInputStreamWrapper(is), totalSize, fs, path,
213               meta);
214           b = hbr.readBlockData(0, 2173 + HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM +
215                                 b.totalChecksumBytes(), -1, pread);
216           assertEquals(expected, b);
217           int wrongCompressedSize = 2172;
218           try {
219             b = hbr.readBlockData(0, wrongCompressedSize
220                 + HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM, -1, pread);
221             fail("Exception expected");
222           } catch (IOException ex) {
223             String expectedPrefix = "On-disk size without header provided is "
224                 + wrongCompressedSize + ", but block header contains "
225                 + b.getOnDiskSizeWithoutHeader() + ".";
226             assertTrue("Invalid exception message: '" + ex.getMessage()
227                 + "'.\nMessage is expected to start with: '" + expectedPrefix
228                 + "'", ex.getMessage().startsWith(expectedPrefix));
229           }
230           is.close();
231         }
232       }
233     }
234   }
235 
236   /**
237    * Test encoding/decoding data blocks.
238    * @throws IOException if a bug occurs or there is a problem with temporary files.
239    */
240   @Test
241   public void testDataBlockEncoding() throws IOException {
242     if (includesTag) {
243       TEST_UTIL.getConfiguration().setInt("hfile.format.version", 3);
244     }
245     final int numBlocks = 5;
246     for (Compression.Algorithm algo : COMPRESSION_ALGORITHMS) {
247       for (boolean pread : new boolean[] { false, true }) {
248         for (DataBlockEncoding encoding : DataBlockEncoding.values()) {
249           LOG.info("testDataBlockEncoding algo " + algo +
250                    " pread = " + pread +
251                    " encoding " + encoding);
252           Path path = new Path(TEST_UTIL.getDataTestDir(), "blocks_v2_"
253               + algo + "_" + encoding.toString());
254           FSDataOutputStream os = fs.create(path);
255           HFileDataBlockEncoder dataBlockEncoder = (encoding != DataBlockEncoding.NONE) ?
256               new HFileDataBlockEncoderImpl(encoding) : NoOpDataBlockEncoder.INSTANCE;
257           TestHFileBlockCompatibility.Writer hbw =
258               new TestHFileBlockCompatibility.Writer(algo,
259                   dataBlockEncoder, includesMemstoreTS, includesTag);
260           long totalSize = 0;
261           final List<Integer> encodedSizes = new ArrayList<Integer>();
262           final List<ByteBuffer> encodedBlocks = new ArrayList<ByteBuffer>();
263           for (int blockId = 0; blockId < numBlocks; ++blockId) {
264             hbw.startWriting(BlockType.DATA);
265             TestHFileBlock.writeTestKeyValues(hbw, blockId, pread, includesTag);
266             hbw.writeHeaderAndData(os);
267             int headerLen = HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM;
268             byte[] encodedResultWithHeader = hbw.getUncompressedDataWithHeader();
269             final int encodedSize = encodedResultWithHeader.length - headerLen;
270             if (encoding != DataBlockEncoding.NONE) {
271               // We need to account for the two-byte encoding algorithm ID that
272               // comes after the 24-byte block header but before encoded KVs.
273               headerLen += DataBlockEncoding.ID_SIZE;
274             }
275             byte[] encodedDataSection =
276                 new byte[encodedResultWithHeader.length - headerLen];
277             System.arraycopy(encodedResultWithHeader, headerLen,
278                 encodedDataSection, 0, encodedDataSection.length);
279             final ByteBuffer encodedBuf =
280                 ByteBuffer.wrap(encodedDataSection);
281             encodedSizes.add(encodedSize);
282             encodedBlocks.add(encodedBuf);
283             totalSize += hbw.getOnDiskSizeWithHeader();
284           }
285           os.close();
286 
287           FSDataInputStream is = fs.open(path);
288           HFileContext meta = new HFileContextBuilder()
289                               .withHBaseCheckSum(false)
290                               .withIncludesMvcc(includesMemstoreTS)
291                               .withIncludesTags(includesTag)
292                               .withCompression(algo)
293                               .build();
294           HFileBlock.FSReaderImpl hbr = new HFileBlock.FSReaderImpl(new FSDataInputStreamWrapper(is),
295               totalSize, fs, path, meta);
296           hbr.setDataBlockEncoder(dataBlockEncoder);
297           hbr.setIncludesMemstoreTS(includesMemstoreTS);
298 
299           HFileBlock b;
300           int pos = 0;
301           for (int blockId = 0; blockId < numBlocks; ++blockId) {
302             b = hbr.readBlockData(pos, -1, -1, pread);
303             b.sanityCheck();
304             if (meta.isCompressedOrEncrypted()) {
305               assertFalse(b.isUnpacked());
306               b = b.unpack(meta, hbr);
307             }
308             pos += b.getOnDiskSizeWithHeader();
309 
310             assertEquals((int) encodedSizes.get(blockId),
311                 b.getUncompressedSizeWithoutHeader());
312             ByteBuffer actualBuffer = b.getBufferWithoutHeader();
313             if (encoding != DataBlockEncoding.NONE) {
314               // We expect a two-byte big-endian encoding id.
315               assertEquals(0, actualBuffer.get(0));
316               assertEquals(encoding.getId(), actualBuffer.get(1));
317               actualBuffer.position(2);
318               actualBuffer = actualBuffer.slice();
319             }
320 
321             ByteBuffer expectedBuffer = encodedBlocks.get(blockId);
322             expectedBuffer.rewind();
323 
324             // test if content matches, produce nice message
325             TestHFileBlock.assertBuffersEqual(expectedBuffer, actualBuffer,
326               algo, encoding, pread);
327           }
328           is.close();
329         }
330       }
331     }
332   }
333   /**
334    * This is the version of the HFileBlock.Writer that is used to
335    * create V2 blocks with minor version 0. These blocks do not
336    * have hbase-level checksums. The code is here to test
337    * backward compatibility. Although this class extends HFileBlock.Writer,
338    * it duplicates the writing logic rather than reusing it, because we never
339    * want to change the code in this class while the code in HFileBlock.Writer
340    * will continually evolve.
341    */
342   public static final class Writer extends HFileBlock.Writer {
343 
344     // These constants are as they were in minorVersion 0.
345     private static final int HEADER_SIZE = HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM;
346     private static final boolean DONT_FILL_HEADER = HFileBlock.DONT_FILL_HEADER;
347     private static final byte[] DUMMY_HEADER =
348       HFileBlock.DUMMY_HEADER_NO_CHECKSUM;
349 
350     private enum State {
351       INIT,
352       WRITING,
353       BLOCK_READY
354     };
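        // Usage protocol: startWriting() moves the writer into WRITING; data is
        // then written through the returned stream or via write(Cell); calling
        // writeHeaderAndData() or getHeaderAndData() finishes the block and moves
        // the writer into BLOCK_READY until startWriting() is called again.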
355 
356     /** Writer state. Used to ensure the correct usage protocol. */
357     private State state = State.INIT;
358 
359     /** Compression algorithm for all blocks this instance writes. */
360     private final Compression.Algorithm compressAlgo;
361 
362     /** Data block encoder used for data blocks */
363     private final HFileDataBlockEncoder dataBlockEncoder;
364 
365     private HFileBlockEncodingContext dataBlockEncodingCtx;
366     /** block encoding context for non-data blocks */
367     private HFileBlockDefaultEncodingContext defaultBlockEncodingCtx;
368 
369     /**
370      * The stream we use to accumulate data in uncompressed format for each
371      * block. We reset this stream at the end of each block and reuse it. The
372      * header is written as the first {@link #HEADER_SIZE} bytes into this
373      * stream.
374      */
375     private ByteArrayOutputStream baosInMemory;
376 
377     /** Compressor, which is also reused between consecutive blocks. */
378     private Compressor compressor;
379 
380     /**
381      * Current block type. Set in {@link #startWriting(BlockType)}. Could be
382     * changed in {@link #finishBlock()} from {@link BlockType#DATA}
383      * to {@link BlockType#ENCODED_DATA}.
384      */
385     private BlockType blockType;
386 
387     /**
388      * A stream that we write uncompressed bytes to; the bytes accumulate in
389      * {@link #baosInMemory} and are compressed later in {@link #finishBlock()} if needed.
390      */
391     private DataOutputStream userDataStream;
392 
393     /**
394      * Bytes to be written to the file system, including the header. Compressed
395      * if compression is turned on.
396      */
397     private byte[] onDiskBytesWithHeader;
398 
399     /**
400      * Valid in the BLOCK_READY state. Contains the header and the uncompressed (but
401      * potentially encoded, if this is a data block) bytes, so the length is
402      * {@link #getUncompressedSizeWithoutHeader()} + {@link #HEADER_SIZE}.
403      */
404     private byte[] uncompressedBytesWithHeader;
405 
406     /**
407      * Current block's start offset in the {@link HFile}. Set in
408      * {@link #writeHeaderAndData(FSDataOutputStream)}.
409      */
410     private long startOffset;
411 
412     /**
413      * Offset of previous block by block type. Updated when the next block is
414      * started.
415      */
416     private long[] prevOffsetByType;
417 
418     /** The offset of the previous block of the same type */
419     private long prevOffset;
420 
421     private int unencodedDataSizeWritten;
422 
423     public Writer(Compression.Algorithm compressionAlgorithm,
424         HFileDataBlockEncoder dataBlockEncoder, boolean includesMemstoreTS, boolean includesTag) {
425       this(dataBlockEncoder, new HFileContextBuilder().withHBaseCheckSum(false)
426           .withIncludesMvcc(includesMemstoreTS).withIncludesTags(includesTag)
427           .withCompression(compressionAlgorithm).build());
428     }
429 
430     public Writer(HFileDataBlockEncoder dataBlockEncoder, HFileContext meta) {
431       super(dataBlockEncoder, meta);
432       compressAlgo = meta.getCompression() == null ? NONE : meta.getCompression();
433       this.dataBlockEncoder = dataBlockEncoder != null ? dataBlockEncoder
434           : NoOpDataBlockEncoder.INSTANCE;
435       defaultBlockEncodingCtx = new HFileBlockDefaultEncodingContext(null, DUMMY_HEADER, meta);
436       dataBlockEncodingCtx = this.dataBlockEncoder.newDataBlockEncodingContext(DUMMY_HEADER, meta);
437       baosInMemory = new ByteArrayOutputStream();
438 
439       prevOffsetByType = new long[BlockType.values().length];
440       for (int i = 0; i < prevOffsetByType.length; ++i)
441         prevOffsetByType[i] = -1;
442     }
443 
444     /**
445      * Starts writing into the block. The previous block's data is discarded.
446      *
447      * @return the stream the user can write their data into
448      * @throws IOException
449      */
450     public DataOutputStream startWriting(BlockType newBlockType)
451         throws IOException {
452       if (state == State.BLOCK_READY && startOffset != -1) {
453         // We had a previous block that was written to a stream at a specific
454         // offset. Save that offset as the last offset of a block of that type.
455         prevOffsetByType[blockType.getId()] = startOffset;
456       }
457 
458       startOffset = -1;
459       blockType = newBlockType;
460 
461       baosInMemory.reset();
462       baosInMemory.write(DUMMY_HEADER);
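          // Reserve HEADER_SIZE placeholder bytes; the real header is written
          // over them by putHeader() once the block sizes are known.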
463 
464       state = State.WRITING;
465 
466       // We will compress it later in finishBlock()
467       userDataStream = new DataOutputStream(baosInMemory);
468       if (newBlockType == BlockType.DATA) {
469         this.dataBlockEncoder.startBlockEncoding(dataBlockEncodingCtx, userDataStream);
470       }
471       this.unencodedDataSizeWritten = 0;
472       return userDataStream;
473     }
474 
475     @Override
476     public void write(Cell c) throws IOException {
477       KeyValue kv = KeyValueUtil.ensureKeyValue(c);
478       expectState(State.WRITING);
479       this.dataBlockEncoder.encode(kv, dataBlockEncodingCtx, this.userDataStream);
480       this.unencodedDataSizeWritten += kv.getLength();
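          // When MVCC is included, the memstore timestamp is appended after each
          // KeyValue as a variable-length long, so its encoded size is counted too.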
481       if (dataBlockEncodingCtx.getHFileContext().isIncludesMvcc()) {
482         this.unencodedDataSizeWritten += WritableUtils.getVIntSize(kv.getMvccVersion());
483       }
484     }
485 
486     /**
487      * Returns the stream for the user to write to. The block writer takes care
488      * of handling compression and buffering for caching on write. Can only be
489      * called in the "writing" state.
490      *
491      * @return the data output stream for the user to write to
492      */
493     DataOutputStream getUserDataStream() {
494       expectState(State.WRITING);
495       return userDataStream;
496     }
497 
498     /**
499      * Transitions the block writer from the "writing" state to the "block
500      * ready" state.  Does nothing if a block is already finished.
501      */
502     void ensureBlockReady() throws IOException {
503       Preconditions.checkState(state != State.INIT,
504           "Unexpected state: " + state);
505 
506       if (state == State.BLOCK_READY)
507         return;
508 
509       // This will set state to BLOCK_READY.
510       finishBlock();
511     }
512 
513     /**
514      * An internal method that flushes the compressing stream (if using
515      * compression), serializes the header, and takes care of the separate
516      * uncompressed stream for caching on write, if applicable. Sets block
517      * write state to "block ready".
518      */
519     void finishBlock() throws IOException {
520       if (blockType == BlockType.DATA) {
521         this.dataBlockEncoder.endBlockEncoding(dataBlockEncodingCtx, userDataStream,
522             baosInMemory.toByteArray(), blockType);
523         blockType = dataBlockEncodingCtx.getBlockType();
524       }
525       userDataStream.flush();
526       // This does an array copy, so it is safe to cache this byte array.
527       uncompressedBytesWithHeader = baosInMemory.toByteArray();
528       prevOffset = prevOffsetByType[blockType.getId()];
529 
530       // We need to set state before we can package the block up for
531       // cache-on-write. In a way, the block is ready, but not yet encoded or
532       // compressed.
533       state = State.BLOCK_READY;
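          // DATA (and ENCODED_DATA) blocks go through the data block encoding
          // context; all other block types use the default context, which applies
          // no data block encoding.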
534       if (blockType == BlockType.DATA || blockType == BlockType.ENCODED_DATA) {
535         onDiskBytesWithHeader = dataBlockEncodingCtx
536             .compressAndEncrypt(uncompressedBytesWithHeader);
537       } else {
538         onDiskBytesWithHeader = defaultBlockEncodingCtx
539             .compressAndEncrypt(uncompressedBytesWithHeader);
540       }
541 
542       // put the header for on disk bytes
543       putHeader(onDiskBytesWithHeader, 0,
544           onDiskBytesWithHeader.length,
545           uncompressedBytesWithHeader.length);
546       //set the header for the uncompressed bytes (for cache-on-write)
547       putHeader(uncompressedBytesWithHeader, 0,
548           onDiskBytesWithHeader.length,
549         uncompressedBytesWithHeader.length);
550     }
551 
552     /**
553      * Put the header into the given byte array at the given offset.
554      * @param onDiskSize size of the block on disk
555      * @param uncompressedSize size of the block after decompression (but
556      *          before optional data block decoding)
557      */
558     private void putHeader(byte[] dest, int offset, int onDiskSize,
559         int uncompressedSize) {
560       offset = blockType.put(dest, offset);
561       offset = Bytes.putInt(dest, offset, onDiskSize - HEADER_SIZE);
562       offset = Bytes.putInt(dest, offset, uncompressedSize - HEADER_SIZE);
563       Bytes.putLong(dest, offset, prevOffset);
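          // For reference, the minor-version-0 header written here is 24 bytes:
          //   8 bytes block magic, 4 bytes onDiskSizeWithoutHeader,
          //   4 bytes uncompressedSizeWithoutHeader, 8 bytes prevOffset.
          // Checksummed headers (minor version 1 and later) additionally append
          // the checksum type, bytesPerChecksum and onDiskDataSizeWithHeader.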
564     }
565 
566     /**
567      * Similar to {@link #writeHeaderAndData(FSDataOutputStream)}, but records
568      * the offset of this block so that it can be referenced in the next block
569      * of the same type.
570      *
571      * @param out
572      * @throws IOException
573      */
574     public void writeHeaderAndData(FSDataOutputStream out) throws IOException {
575       long offset = out.getPos();
576       if (startOffset != -1 && offset != startOffset) {
577         throw new IOException("A " + blockType + " block written to a "
578             + "stream twice, first at offset " + startOffset + ", then at "
579             + offset);
580       }
581       startOffset = offset;
582 
583       writeHeaderAndData((DataOutputStream) out);
584     }
585 
586     /**
587      * Writes the header and the compressed data of this block (or uncompressed
588      * data when not using compression) into the given stream. Can be called in
589      * the "writing" state or in the "block ready" state. If called in the
590      * "writing" state, transitions the writer to the "block ready" state.
591      *
592      * @param out the output stream to write the block to
593      * @throws IOException
594      */
595     private void writeHeaderAndData(DataOutputStream out) throws IOException {
596       ensureBlockReady();
597       out.write(onDiskBytesWithHeader);
598     }
599 
600     /**
601      * Returns the header and the compressed data (or uncompressed data when not
602      * using compression) as a byte array. Can be called in the "writing" state
603      * or in the "block ready" state. If called in the "writing" state,
604      * transitions the writer to the "block ready" state.
605      *
606      * @return header and data as they would be stored on disk in a byte array
607      * @throws IOException
608      */
609     public byte[] getHeaderAndData() throws IOException {
610       ensureBlockReady();
611       return onDiskBytesWithHeader;
612     }
613 
614     /**
615      * Releases the compressor this writer uses to compress blocks into the
616      * compressor pool. Needs to be called before the writer is discarded.
617      */
618     public void releaseCompressor() {
619       if (compressor != null) {
620         compressAlgo.returnCompressor(compressor);
621         compressor = null;
622       }
623     }
624 
625     /**
626      * Returns the on-disk size of the data portion of the block. This is the
627      * compressed size if compression is enabled. Can only be called in the
628      * "block ready" state. Header is not compressed, and its size is not
629      * included in the return value.
630      *
631      * @return the on-disk size of the block, not including the header.
632      */
633     public int getOnDiskSizeWithoutHeader() {
634       expectState(State.BLOCK_READY);
635       return onDiskBytesWithHeader.length - HEADER_SIZE;
636     }
637 
638     /**
639      * Returns the on-disk size of the block. Can only be called in the
640      * "block ready" state.
641      *
642      * @return the on-disk size of the block ready to be written, including the
643      *         header size
644      */
645     public int getOnDiskSizeWithHeader() {
646       expectState(State.BLOCK_READY);
647       return onDiskBytesWithHeader.length;
648     }
649 
650     /**
651      * The uncompressed size of the block data. Does not include header size.
652      */
653     public int getUncompressedSizeWithoutHeader() {
654       expectState(State.BLOCK_READY);
655       return uncompressedBytesWithHeader.length - HEADER_SIZE;
656     }
657 
658     /**
659      * The uncompressed size of the block data, including header size.
660      */
661     public int getUncompressedSizeWithHeader() {
662       expectState(State.BLOCK_READY);
663       return uncompressedBytesWithHeader.length;
664     }
665 
666     /** @return true if a block is being written  */
667     public boolean isWriting() {
668       return state == State.WRITING;
669     }
670 
671     /**
672      * Returns the number of bytes written into the current block so far, or
673      * zero if not writing the block at the moment. Note that this will return
674      * zero in the "block ready" state as well.
675      *
676      * @return the number of bytes written
677      */
678     public int blockSizeWritten() {
679       if (state != State.WRITING)
680         return 0;
681       return this.unencodedDataSizeWritten;
682     }
683 
684     /**
685      * Returns the header followed by the uncompressed data, even if using
686      * compression. This is needed for storing uncompressed blocks in the block
687      * cache. Can be called in the "writing" state or the "block ready" state.
688      *
689      * @return uncompressed block bytes for caching on write
690      */
691     private byte[] getUncompressedDataWithHeader() {
692       expectState(State.BLOCK_READY);
693 
694       return uncompressedBytesWithHeader;
695     }
696 
697     private void expectState(State expectedState) {
698       if (state != expectedState) {
699         throw new IllegalStateException("Expected state: " + expectedState +
700             ", actual state: " + state);
701       }
702     }
703 
704     /**
705      * Similar to {@link #getUncompressedDataWithHeader()} but returns a byte
706      * buffer.
707      *
708      * @return uncompressed block for caching on write in the form of a buffer
709      */
710     public ByteBuffer getUncompressedBufferWithHeader() {
711       byte[] b = getUncompressedDataWithHeader();
712       return ByteBuffer.wrap(b, 0, b.length);
713     }
714 
715     /**
716      * Takes the given {@link BlockWritable} instance, creates a new block of
717      * its appropriate type, writes the writable into this block, and flushes
718      * the block into the output stream. The writer is instructed not to buffer
719      * uncompressed bytes for cache-on-write.
720      *
721      * @param bw the block-writable object to write as a block
722      * @param out the file system output stream
723      * @throws IOException
724      */
725     public void writeBlock(BlockWritable bw, FSDataOutputStream out)
726         throws IOException {
727       bw.writeToBlock(startWriting(bw.getBlockType()));
728       writeHeaderAndData(out);
729     }
730 
731     /**
732      * Creates a new HFileBlock.
733      */
734     public HFileBlock getBlockForCaching() {
735       HFileContext meta = new HFileContextBuilder()
736              .withHBaseCheckSum(false)
737              .withChecksumType(ChecksumType.NULL)
738              .withBytesPerCheckSum(0)
739              .build();
740       return new HFileBlock(blockType, getOnDiskSizeWithoutHeader(),
741           getUncompressedSizeWithoutHeader(), prevOffset,
742           getUncompressedBufferWithHeader(), DONT_FILL_HEADER, startOffset,
743           getOnDiskSizeWithoutHeader(), meta);
744     }
745   }
746 
747 }
748