1   /*
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.io.hfile;
20  
21  import java.io.ByteArrayOutputStream;
22  import java.io.DataInput;
23  import java.io.DataInputStream;
24  import java.io.DataOutput;
25  import java.io.DataOutputStream;
26  import java.io.IOException;
27  import java.nio.ByteBuffer;
28  import java.util.ArrayList;
29  import java.util.Collections;
30  import java.util.List;
31  import java.util.concurrent.atomic.AtomicReference;
32  
33  import org.apache.commons.logging.Log;
34  import org.apache.commons.logging.LogFactory;
35  import org.apache.hadoop.conf.Configuration;
36  import org.apache.hadoop.fs.FSDataOutputStream;
37  import org.apache.hadoop.hbase.Cell;
38  import org.apache.hadoop.hbase.KeyValue;
39  import org.apache.hadoop.hbase.KeyValue.KVComparator;
40  import org.apache.hadoop.hbase.KeyValueUtil;
41  import org.apache.hadoop.hbase.classification.InterfaceAudience;
42  import org.apache.hadoop.hbase.io.HeapSize;
43  import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
44  import org.apache.hadoop.hbase.io.hfile.HFile.CachingBlockReader;
45  import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
46  import org.apache.hadoop.hbase.util.ByteBufferUtils;
47  import org.apache.hadoop.hbase.util.Bytes;
48  import org.apache.hadoop.hbase.util.ClassSize;
49  import org.apache.hadoop.io.WritableUtils;
50  import org.apache.hadoop.util.StringUtils;
51  
52  /**
53   * Provides functionality to write ({@link BlockIndexWriter}) and read
54   * ({@link BlockIndexReader})
55   * single-level and multi-level block indexes.
56   *
57   * Examples of how to use the block index writer can be found in
58   * {@link org.apache.hadoop.hbase.util.CompoundBloomFilterWriter} and
59   *  {@link HFileWriterV2}. Examples of how to use the reader can be
60   *  found in {@link HFileReaderV2} and
61   *  {@link org.apache.hadoop.hbase.io.hfile.TestHFileBlockIndex}.
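 *
 * <p>A minimal writer-side sketch (the surrounding objects such as
 * <code>hfileBlockWriter</code> and <code>fsOut</code> are illustrative, not
 * defined in this file):
 * <pre>
 * BlockIndexWriter biw = new BlockIndexWriter(hfileBlockWriter, cacheConf, name);
 * // For every data block written to the file:
 * biw.addEntry(firstKeyOfBlock, blockOffset, blockOnDiskSize);
 * // When closing the file:
 * long rootIndexOffset = biw.writeIndexBlocks(fsOut);
 * </pre>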
62   */
63  @InterfaceAudience.Private
64  public class HFileBlockIndex {
65  
66    private static final Log LOG = LogFactory.getLog(HFileBlockIndex.class);
67  
68    static final int DEFAULT_MAX_CHUNK_SIZE = 128 * 1024;
69  
70    /**
71   * The maximum size guideline for index blocks (leaf, intermediate, and
72     * root). If not specified, <code>DEFAULT_MAX_CHUNK_SIZE</code> is used.
73     */
74    public static final String MAX_CHUNK_SIZE_KEY = "hfile.index.block.max.size";
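
  // Hypothetical tuning example: conf.setInt(MAX_CHUNK_SIZE_KEY, 64 * 1024)
  // lowers the guideline (read back via getMaxChunkSize(Configuration) at the
  // bottom of this class), yielding smaller but potentially deeper indexes.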
75  
76    /**
77     * The number of bytes stored in each "secondary index" entry in addition to
78     * key bytes in the non-root index block format. The first long is the file
79     * offset of the deeper-level block the entry points to, and the int that
80     * follows is that block's on-disk size without including header.
81     */
82    static final int SECONDARY_INDEX_ENTRY_OVERHEAD = Bytes.SIZEOF_INT
83        + Bytes.SIZEOF_LONG;
84  
85    /**
86     * Error message when trying to use inline block API in single-level mode.
87     */
88    private static final String INLINE_BLOCKS_NOT_ALLOWED =
89        "Inline blocks are not allowed in the single-level-only mode";
90  
91    /**
92     * The size of a meta-data record used for finding the mid-key in a
93     * multi-level index. Consists of the middle leaf-level index block offset
94     * (long), its on-disk size without header included (int), and the mid-key
95     * entry's zero-based index in that leaf index block.
96     */
97    private static final int MID_KEY_METADATA_SIZE = Bytes.SIZEOF_LONG +
98        2 * Bytes.SIZEOF_INT;
99  
100   /**
101    * The reader will always hold the root level index in the memory. Index
102    * blocks at all other levels will be cached in the LRU cache in practice,
103    * although this API does not enforce that.
104    *
105    * All non-root (leaf and intermediate) index blocks contain what we call a
106    * "secondary index": an array of offsets to the entries within the block.
107    * This allows us to do binary search for the entry corresponding to the
108    * given key without having to deserialize the block.
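   *
   * <p>Typical read-side flow (a sketch; an enclosing HFile reader normally
   * drives these calls):
   * <pre>
   * BlockIndexReader idx = new BlockIndexReader(comparator, searchTreeLevel,
   *     cachingBlockReader);
   * idx.readMultiLevelIndexRoot(rootIndexBlock, numRootEntries);
   * HFileBlock data = idx.seekToDataBlock(key, null, cacheBlocks, pread,
   *     isCompaction, expectedEncoding);
   * </pre>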
109    */
110   public static class BlockIndexReader implements HeapSize {
111     /** Needed when doing lookups on blocks. */
112     private final KVComparator comparator;
113 
114     // Root-level data.
115     private byte[][] blockKeys;
116     private long[] blockOffsets;
117     private int[] blockDataSizes;
118     private int rootCount = 0;
119 
120     // Mid-key metadata.
121     private long midLeafBlockOffset = -1;
122     private int midLeafBlockOnDiskSize = -1;
123     private int midKeyEntry = -1;
124 
125     /** Pre-computed mid-key */
126     private AtomicReference<byte[]> midKey = new AtomicReference<byte[]>();
127 
128     /**
129      * The number of levels in the block index tree. One if there is only root
130      * level, two for root and leaf levels, etc.
131      */
132     private int searchTreeLevel;
133 
134     /** A way to read {@link HFile} blocks at a given offset */
135     private CachingBlockReader cachingBlockReader;
136 
137     public BlockIndexReader(final KVComparator c, final int treeLevel,
138         final CachingBlockReader cachingBlockReader) {
139       this(c, treeLevel);
140       this.cachingBlockReader = cachingBlockReader;
141     }
142 
143     public BlockIndexReader(final KVComparator c, final int treeLevel)
144     {
145       comparator = c;
146       searchTreeLevel = treeLevel;
147     }
148 
149     /**
150      * @return true if the block index is empty.
151      */
152     public boolean isEmpty() {
153       return blockKeys.length == 0;
154     }
155 
156     /**
157      * Verifies that the block index is non-empty and throws an
158      * {@link IllegalStateException} otherwise.
159      */
160     public void ensureNonEmpty() {
161       if (blockKeys.length == 0) {
162         throw new IllegalStateException("Block index is empty or not loaded");
163       }
164     }
165 
166     /**
167      * Return the data block which contains this key. This function will only
168      * be called when the HFile version is larger than 1.
169      *
170      * @param key the key we are looking for
171      * @param currentBlock the current block, to avoid re-reading the same block
172      * @param cacheBlocks whether the blocks that are read should be cached
173      * @param pread whether to use positional ("pread") reads
174      * @param isCompaction whether the read is part of a compaction
175      * @param expectedDataBlockEncoding the data block encoding the caller is
176      *          expecting the data block to be in, or null to not perform this
177      *          check and return the block irrespective of the encoding
178      * @return the data block containing the given key, or null if not found
179      * @throws IOException
180      */
181     public HFileBlock seekToDataBlock(final Cell key, HFileBlock currentBlock, boolean cacheBlocks,
182         boolean pread, boolean isCompaction, DataBlockEncoding expectedDataBlockEncoding)
183         throws IOException {
184       BlockWithScanInfo blockWithScanInfo = loadDataBlockWithScanInfo(key, currentBlock,
185           cacheBlocks,
186           pread, isCompaction, expectedDataBlockEncoding);
187       if (blockWithScanInfo == null) {
188         return null;
189       } else {
190         return blockWithScanInfo.getHFileBlock();
191       }
192     }
193 
194     /**
195      * Return the BlockWithScanInfo which contains the DataBlock with other scan
196      * info such as nextIndexedKey. This function will only be called when the
197      * HFile version is larger than 1.
198      * 
199      * @param key
200      *          the key we are looking for
201      * @param currentBlock
202      *          the current block, to avoid re-reading the same block
203      * @param cacheBlocks whether the blocks that are read should be cached
204      * @param pread whether to use positional ("pread") reads
205      * @param isCompaction whether the read is part of a compaction
206      * @param expectedDataBlockEncoding the data block encoding the caller is
207      *          expecting the data block to be in, or null to not perform this
208      *          check and return the block irrespective of the encoding.
209      * @return the BlockWithScanInfo which contains the DataBlock with other
210      *         scan info such as nextIndexedKey.
211      * @throws IOException
212      */
213     public BlockWithScanInfo loadDataBlockWithScanInfo(Cell key, HFileBlock currentBlock,
214         boolean cacheBlocks,
215         boolean pread, boolean isCompaction, DataBlockEncoding expectedDataBlockEncoding)
216         throws IOException {
217       int rootLevelIndex = rootBlockContainingKey(key);
218       if (rootLevelIndex < 0 || rootLevelIndex >= blockOffsets.length) {
219         return null;
220       }
221 
222       // the next indexed key
223       Cell nextIndexedKey = null;
224 
225       // Read the next-level (intermediate or leaf) index block.
226       long currentOffset = blockOffsets[rootLevelIndex];
227       int currentOnDiskSize = blockDataSizes[rootLevelIndex];
228 
229       if (rootLevelIndex < blockKeys.length - 1) {
230         nextIndexedKey = new KeyValue.KeyOnlyKeyValue(blockKeys[rootLevelIndex + 1]);
231       } else {
232         nextIndexedKey = KeyValueScanner.NO_NEXT_INDEXED_KEY;
233       }
234 
235       int lookupLevel = 1; // How many levels deep we are in our lookup.
236       int index = -1;
237 
238       HFileBlock block;
239       while (true) {
240 
241         if (currentBlock != null && currentBlock.getOffset() == currentOffset)
242         {
243           // Avoid reading the same block again, even with caching turned off.
244           // This is crucial for compaction-type workload which might have
245           // caching turned off. This is like a one-block cache inside the
246           // scanner.
247           block = currentBlock;
248         } else {
249           // Call HFile's caching block reader API. We always cache index
250           // blocks, otherwise we might get terrible performance.
251           boolean shouldCache = cacheBlocks || (lookupLevel < searchTreeLevel);
252           BlockType expectedBlockType;
253           if (lookupLevel < searchTreeLevel - 1) {
254             expectedBlockType = BlockType.INTERMEDIATE_INDEX;
255           } else if (lookupLevel == searchTreeLevel - 1) {
256             expectedBlockType = BlockType.LEAF_INDEX;
257           } else {
258             // this also accounts for ENCODED_DATA
259             expectedBlockType = BlockType.DATA;
260           }
261           block = cachingBlockReader.readBlock(currentOffset,
262               currentOnDiskSize, shouldCache, pread, isCompaction, true,
263               expectedBlockType, expectedDataBlockEncoding);
264         }
265 
266         if (block == null) {
267           throw new IOException("Failed to read block at offset " +
268               currentOffset + ", onDiskSize=" + currentOnDiskSize);
269         }
270 
271         // Found a data block, break the loop and check our level in the tree.
272         if (block.getBlockType().isData()) {
273           break;
274         }
275 
276         // Not a data block. This must be a leaf-level or intermediate-level
277         // index block. We don't allow going deeper than searchTreeLevel.
278         if (++lookupLevel > searchTreeLevel) {
279           throw new IOException("Search Tree Level overflow: lookupLevel=" +
280               lookupLevel + ", searchTreeLevel=" + searchTreeLevel);
281         }
282 
283         // Locate the entry corresponding to the given key in the non-root
284         // (leaf or intermediate-level) index block.
285         ByteBuffer buffer = block.getBufferWithoutHeader();
286         index = locateNonRootIndexEntry(buffer, key, comparator);
287         if (index == -1) {
288           // TODO: this conversion should go away; for now, materialize the
289           // key as a KeyValue so we can format the error message.
290           KeyValue kv = KeyValueUtil.ensureKeyValue(key);
291           throw new IOException("The key "
292               + Bytes.toStringBinary(kv.getKey(), kv.getKeyOffset(), kv.getKeyLength())
293               + " is before the first key of the non-root index block "
294               + block);
295         }
296 
297         currentOffset = buffer.getLong();
298         currentOnDiskSize = buffer.getInt();
299 
300         // Only update next indexed key if there is a next indexed key in the current level
301         byte[] tmpNextIndexedKey = getNonRootIndexedKey(buffer, index + 1);
302         if (tmpNextIndexedKey != null) {
303           nextIndexedKey = new KeyValue.KeyOnlyKeyValue(tmpNextIndexedKey);
304         }
305       }
306 
307       if (lookupLevel != searchTreeLevel) {
308         throw new IOException("Reached a data block at level " + lookupLevel +
309             " but the number of levels is " + searchTreeLevel);
310       }
311 
312       // set the next indexed key for the current block.
313       BlockWithScanInfo blockWithScanInfo = new BlockWithScanInfo(block, nextIndexedKey);
314       return blockWithScanInfo;
315     }
316 
317     /**
318      * An approximation to the {@link HFile}'s mid-key. Operates on block
319      * boundaries, and does not go inside blocks. In other words, returns the
320      * first key of the middle block of the file.
321      *
322      * @return the first key of the middle block
323      */
324     public byte[] midkey() throws IOException {
325       if (rootCount == 0)
326         throw new IOException("HFile empty");
327 
328       byte[] targetMidKey = this.midKey.get();
329       if (targetMidKey != null) {
330         return targetMidKey;
331       }
332 
333       if (midLeafBlockOffset >= 0) {
334         if (cachingBlockReader == null) {
335           throw new IOException("Have to read the middle leaf block but " +
336               "no block reader available");
337         }
338 
339         // Caching, using pread, assuming this is not a compaction.
340         HFileBlock midLeafBlock = cachingBlockReader.readBlock(
341             midLeafBlockOffset, midLeafBlockOnDiskSize, true, true, false, true,
342             BlockType.LEAF_INDEX, null);
343 
344         ByteBuffer b = midLeafBlock.getBufferWithoutHeader();
345         int numDataBlocks = b.getInt();
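        // The leaf index block layout mirrors getNonRootIndexedKey(): an int
        // entry count, a secondary index of (numDataBlocks + 1) ints, then the
        // (offset, onDiskSize, key) entries themselves.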
346         int keyRelOffset = b.getInt(Bytes.SIZEOF_INT * (midKeyEntry + 1));
347         int keyLen = b.getInt(Bytes.SIZEOF_INT * (midKeyEntry + 2)) -
348             keyRelOffset;
349         int keyOffset = Bytes.SIZEOF_INT * (numDataBlocks + 2) + keyRelOffset
350             + SECONDARY_INDEX_ENTRY_OVERHEAD;
351         targetMidKey = ByteBufferUtils.toBytes(b, keyOffset, keyLen);
352       } else {
353         // The middle of the root-level index.
354         targetMidKey = blockKeys[rootCount / 2];
355       }
356 
357       this.midKey.set(targetMidKey);
358       return targetMidKey;
359     }
360 
361     /**
362      * @param i from 0 to {@link #getRootBlockCount()} - 1
363      */
364     public byte[] getRootBlockKey(int i) {
365       return blockKeys[i];
366     }
367 
368     /**
369      * @param i from 0 to {@link #getRootBlockCount()} - 1
370      */
371     public long getRootBlockOffset(int i) {
372       return blockOffsets[i];
373     }
374 
375     /**
376      * @param i zero-based index of a root-level block
377      * @return the on-disk size of the root-level block for version 2, or the
378      *         uncompressed size for version 1
379      */
380     public int getRootBlockDataSize(int i) {
381       return blockDataSizes[i];
382     }
383 
384     /**
385      * @return the number of root-level blocks in this block index
386      */
387     public int getRootBlockCount() {
388       return rootCount;
389     }
390 
391     /**
392      * Finds the root-level index block containing the given key.
393      *
394      * @param key
395      *          Key to find
396      * @return Index of the root-level block containing <code>key</code>
397      *         (between 0 and the number of blocks - 1) or -1 if this file
398      *         does not contain the requested key.
399      */
400     public int rootBlockContainingKey(final byte[] key, int offset, int length) {
401       int pos = Bytes.binarySearch(blockKeys, key, offset, length, comparator);
402       // pos is between -(blockKeys.length + 1) and blockKeys.length - 1, see
403       // binarySearch's javadoc.
404 
405       if (pos >= 0) {
406         // This means this is an exact match with an element of blockKeys.
407         assert pos < blockKeys.length;
408         return pos;
409       }
410 
411       // Otherwise, pos = -(i + 1), where blockKeys[i - 1] < key < blockKeys[i],
412       // and i is in [0, blockKeys.length]. We are returning j = i - 1 such that
413       // blockKeys[j] <= key < blockKeys[j + 1]. In particular, j = -1 if
414       // key < blockKeys[0], meaning the file does not contain the given key.
415 
416       int i = -pos - 1;
417       assert 0 <= i && i <= blockKeys.length;
418       return i - 1;
419     }
420 
421     /**
422      * Finds the root-level index block containing the given key.
423      *
424      * @param key
425      *          Key to find
426      */
427     public int rootBlockContainingKey(final Cell key) {
428       int pos = Bytes.binarySearch(blockKeys, key, comparator);
429       // pos is between -(blockKeys.length + 1) and blockKeys.length - 1, see
430       // binarySearch's javadoc.
431 
432       if (pos >= 0) {
433         // This means this is an exact match with an element of blockKeys.
434         assert pos < blockKeys.length;
435         return pos;
436       }
437 
438       // Otherwise, pos = -(i + 1), where blockKeys[i - 1] < key < blockKeys[i],
439       // and i is in [0, blockKeys.length]. We are returning j = i - 1 such that
440       // blockKeys[j] <= key < blockKeys[j + 1]. In particular, j = -1 if
441       // key < blockKeys[0], meaning the file does not contain the given key.
442 
443       int i = -pos - 1;
444       assert 0 <= i && i <= blockKeys.length;
445       return i - 1;
446     }
447 
448     /**
449      * Adds a new entry to the root block index. Only used when reading.
450      *
451      * @param key Last key in the block
452      * @param offset file offset where the block is stored
453      * @param dataSize the on-disk (version 2) or uncompressed (version 1) data size
454      */
455     private void add(final byte[] key, final long offset, final int dataSize) {
456       blockOffsets[rootCount] = offset;
457       blockKeys[rootCount] = key;
458       blockDataSizes[rootCount] = dataSize;
459       rootCount++;
460     }
461 
462     /**
463      * The indexed key at the ith position in the nonRootIndex. The position starts at 0.
464      * @param nonRootIndex the non-root index block buffer
465      * @param i the ith position
466      * @return the indexed key at the ith position, or null if i is out of range
467      */
468     private byte[] getNonRootIndexedKey(ByteBuffer nonRootIndex, int i) {
469       int numEntries = nonRootIndex.getInt(0);
470       if (i < 0 || i >= numEntries) {
471         return null;
472       }
473 
474       // Entries start after the number of entries and the secondary index.
475       // The secondary index takes numEntries + 1 ints.
476       int entriesOffset = Bytes.SIZEOF_INT * (numEntries + 2);
477       // Target key's offset relative to the end of secondary index
478       int targetKeyRelOffset = nonRootIndex.getInt(
479           Bytes.SIZEOF_INT * (i + 1));
480 
481       // The offset of the target key in the blockIndex buffer
482       int targetKeyOffset = entriesOffset     // Skip secondary index
483           + targetKeyRelOffset               // Skip all entries until mid
484           + SECONDARY_INDEX_ENTRY_OVERHEAD;  // Skip offset and on-disk-size
485 
486       // We subtract the two consecutive secondary index elements, which
487       // gives us the size of the whole (offset, onDiskSize, key) tuple. We
488       // then need to subtract the overhead of offset and onDiskSize.
489       int targetKeyLength = nonRootIndex.getInt(Bytes.SIZEOF_INT * (i + 2)) -
490         targetKeyRelOffset - SECONDARY_INDEX_ENTRY_OVERHEAD;
491 
492       return ByteBufferUtils.toBytes(nonRootIndex, targetKeyOffset, targetKeyLength);
493     }
494 
495     /**
496      * Performs a binary search over a non-root level index block. Utilizes the
497      * secondary index, which records the offsets of (offset, onDiskSize,
498      * firstKey) tuples of all entries.
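     *
     * <p>Worked example (illustrative): for keys [b, d, f], searching for "c"
     * returns 0 (keys[0] &lt;= c &lt; keys[1]), while searching for "a"
     * returns -1 because "a" is before the first key.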
499      * 
500      * @param key
501      *          the key we are searching for
503      * @param nonRootIndex
504      *          the non-root index block buffer, starting with the secondary
505      *          index. The position is ignored.
506      * @return the index i in [0, numEntries - 1] such that keys[i] <= key <
507      *         keys[i + 1], if keys is the array of all keys being searched, or
508      *         -1 otherwise
509      * @throws IOException
510      */
511     static int binarySearchNonRootIndex(Cell key, ByteBuffer nonRootIndex,
512         KVComparator comparator) {
513 
514       int numEntries = nonRootIndex.getInt(0);
515       int low = 0;
516       int high = numEntries - 1;
517       int mid = 0;
518 
519       // Entries start after the number of entries and the secondary index.
520       // The secondary index takes numEntries + 1 ints.
521       int entriesOffset = Bytes.SIZEOF_INT * (numEntries + 2);
522 
523       // If we imagine that keys[-1] = -Infinity and
524       // keys[numEntries] = Infinity, then we are maintaining an invariant that
525       // keys[low - 1] < key < keys[high + 1] while narrowing down the range.
526       KeyValue.KeyOnlyKeyValue nonRootIndexKV = new KeyValue.KeyOnlyKeyValue();
527       while (low <= high) {
528         mid = (low + high) >>> 1;
529 
530         // Midkey's offset relative to the end of secondary index
531         int midKeyRelOffset = nonRootIndex.getInt(
532             Bytes.SIZEOF_INT * (mid + 1));
533 
534         // The offset of the middle key in the blockIndex buffer
535         int midKeyOffset = entriesOffset       // Skip secondary index
536             + midKeyRelOffset                  // Skip all entries until mid
537             + SECONDARY_INDEX_ENTRY_OVERHEAD;  // Skip offset and on-disk-size
538 
539         // We subtract the two consecutive secondary index elements, which
540         // gives us the size of the whole (offset, onDiskSize, key) tuple. We
541         // then need to subtract the overhead of offset and onDiskSize.
542         int midLength = nonRootIndex.getInt(Bytes.SIZEOF_INT * (mid + 2)) -
543             midKeyRelOffset - SECONDARY_INDEX_ENTRY_OVERHEAD;
544 
545         // we have to compare in this order, because the comparator order
546         // has special logic when the 'left side' is a special key.
547         // TODO make KeyOnlyKeyValue to be Buffer backed and avoid array() call. This has to be
548         // done after HBASE-12224 & HBASE-12282
549         nonRootIndexKV.setKey(nonRootIndex.array(),
550             nonRootIndex.arrayOffset() + midKeyOffset, midLength);
551         int cmp = comparator.compareOnlyKeyPortion(key, nonRootIndexKV);
552 
553         // key lives above the midpoint
554         if (cmp > 0)
555           low = mid + 1; // Maintain the invariant that keys[low - 1] < key
556         // key lives below the midpoint
557         else if (cmp < 0)
558           high = mid - 1; // Maintain the invariant that key < keys[high + 1]
559         else
560           return mid; // exact match
561       }
562 
563       // As per our invariant, keys[low - 1] < key < keys[high + 1], meaning
564       // that low - 1 < high + 1 and (low - high) <= 1. As per the loop break
565       // condition, low >= high + 1. Therefore, low = high + 1.
566 
567       if (low != high + 1) {
568         throw new IllegalStateException("Binary search broken: low=" + low
569             + " instead of " + (high + 1));
570       }
571 
572       // OK, our invariant says that keys[low - 1] < key < keys[low]. We need to
573       // return i such that keys[i] <= key < keys[i + 1]. Therefore i = low - 1.
574       int i = low - 1;
575 
576       // Some extra validation on the result.
577       if (i < -1 || i >= numEntries) {
578         throw new IllegalStateException("Binary search broken: result is " +
579             i + " but expected to be between -1 and (numEntries - 1) = " +
580             (numEntries - 1));
581       }
582 
583       return i;
584     }
585 
586     /**
587      * Search for one key using the secondary index in a non-root block. In case
588      * of success, positions the provided buffer at the entry of interest, where
589      * the file offset and the on-disk-size can be read.
590      *
591      * @param nonRootBlock
592      *          a non-root block without header. Initial position does not
593      *          matter.
594      * @param key
595      *          the key to search for
596      * @return the index position where the given key was found, or -1 if the
597      *         given key is before the first key.
598      *
599      */
600     static int locateNonRootIndexEntry(ByteBuffer nonRootBlock, Cell key,
601         KVComparator comparator) {
602       int entryIndex = binarySearchNonRootIndex(key, nonRootBlock, comparator);
603 
604       if (entryIndex != -1) {
605         int numEntries = nonRootBlock.getInt(0);
606 
607         // The end of secondary index and the beginning of entries themselves.
608         int entriesOffset = Bytes.SIZEOF_INT * (numEntries + 2);
609 
610         // The offset of the entry we are interested in relative to the end of
611         // the secondary index.
612         int entryRelOffset = nonRootBlock.getInt(Bytes.SIZEOF_INT * (1 + entryIndex));
613 
614         nonRootBlock.position(entriesOffset + entryRelOffset);
615       }
616 
617       return entryIndex;
618     }
619 
620     /**
621      * Read in the root-level index from the given input stream. Must match
622      * what was written into the root level by
623      * {@link BlockIndexWriter#writeIndexBlocks(FSDataOutputStream)} at the
624      * offset that function returned.
625      *
626      * @param in the buffered input stream or wrapped byte input stream
627      * @param numEntries the number of root-level index entries
628      * @throws IOException
629      */
630     public void readRootIndex(DataInput in, final int numEntries)
631         throws IOException {
632       blockOffsets = new long[numEntries];
633       blockKeys = new byte[numEntries][];
634       blockDataSizes = new int[numEntries];
635 
636       // If index size is zero, no index was written.
637       if (numEntries > 0) {
638         for (int i = 0; i < numEntries; ++i) {
639           long offset = in.readLong();
640           int dataSize = in.readInt();
641           byte[] key = Bytes.readByteArray(in);
642           add(key, offset, dataSize);
643         }
644       }
645     }
646     
647     /**
648      * Read in the root-level index from the given input stream. Must match
649      * what was written into the root level by
650      * {@link BlockIndexWriter#writeIndexBlocks(FSDataOutputStream)} at the
651      * offset that function returned.
652      *
653      * @param blk the HFile block
654      * @param numEntries the number of root-level index entries
655      * @return the input stream, positioned just past the root-level index
656      * @throws IOException
657      */
658     public DataInputStream readRootIndex(HFileBlock blk, final int numEntries) throws IOException {
659       DataInputStream in = blk.getByteStream();
660       readRootIndex(in, numEntries);
661       return in;
662     }
663 
664     /**
665      * Read the root-level metadata of a multi-level block index. Based on
666      * {@link #readRootIndex(DataInput, int)}, but also reads metadata
667      * necessary to compute the mid-key in a multi-level index.
668      *
669      * @param blk the HFile block
670      * @param numEntries the number of root-level index entries
671      * @throws IOException
672      */
673     public void readMultiLevelIndexRoot(HFileBlock blk,
674         final int numEntries) throws IOException {
675       DataInputStream in = readRootIndex(blk, numEntries);
676       // After reading the root index, the checksum bytes have to be
677       // subtracted from what remains to know if mid-key metadata exists.
678       int checkSumBytes = blk.totalChecksumBytes();
679       if ((in.available() - checkSumBytes) < MID_KEY_METADATA_SIZE) {
680         // No mid-key metadata available.
681         return;
682       }
683       midLeafBlockOffset = in.readLong();
684       midLeafBlockOnDiskSize = in.readInt();
685       midKeyEntry = in.readInt();
686     }
687 
688     @Override
689     public String toString() {
690       StringBuilder sb = new StringBuilder();
691       sb.append("size=").append(rootCount).append("\n");
692       for (int i = 0; i < rootCount; i++) {
693         sb.append("key=").append(KeyValue.keyToString(blockKeys[i]))
694             .append("\n  offset=").append(blockOffsets[i])
695             .append(", dataSize=").append(blockDataSizes[i]).append("\n");
696       }
697       return sb.toString();
698     }
699 
700     @Override
701     public long heapSize() {
702       long heapSize = ClassSize.align(6 * ClassSize.REFERENCE +
703           2 * Bytes.SIZEOF_INT + ClassSize.OBJECT);
704 
705       // Mid-key metadata.
706       heapSize += MID_KEY_METADATA_SIZE;
707 
708       // Calculating the size of blockKeys
709       if (blockKeys != null) {
710         // Adding array + references overhead
711         heapSize += ClassSize.align(ClassSize.ARRAY + blockKeys.length
712             * ClassSize.REFERENCE);
713 
714         // Adding bytes
715         for (byte[] key : blockKeys) {
716           heapSize += ClassSize.align(ClassSize.ARRAY + key.length);
717         }
718       }
719 
720       if (blockOffsets != null) {
721         heapSize += ClassSize.align(ClassSize.ARRAY + blockOffsets.length
722             * Bytes.SIZEOF_LONG);
723       }
724 
725       if (blockDataSizes != null) {
726         heapSize += ClassSize.align(ClassSize.ARRAY + blockDataSizes.length
727             * Bytes.SIZEOF_INT);
728       }
729 
730       return ClassSize.align(heapSize);
731     }
732 
733   }
734 
735   /**
736    * Writes the block index into the output stream. Generates the tree from the
737    * bottom up. The leaf level is written to disk as a sequence of inline
738    * blocks, if it is larger than a certain number of bytes. If the leaf level
739    * is not large enough, we write all entries to the root level instead.
740    *
741    * After all leaf blocks have been written, we end up with an index
742    * referencing the resulting leaf index blocks. If that index is larger than
743    * the allowed root index size, the writer will break it up into
744    * reasonable-size intermediate-level index block chunks, write those chunks
745    * out, and create another index referencing those chunks. This will be
746    * repeated until the remaining index is small enough to become the root
747    * index. However, in most practical cases we will only have leaf-level
748    * blocks and the root index, or just the root index.
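   *
   * <p>Inline-block protocol (a sketch of how an enclosing writer typically
   * drives this class; the stream handling shown is illustrative):
   * <pre>
   * if (biw.shouldWriteBlock(closing)) {
   *   DataOutput dos = hfileBlockWriter.startWriting(biw.getInlineBlockType());
   *   biw.writeInlineBlock(dos);
   *   hfileBlockWriter.writeHeaderAndData(fsOut);
   *   biw.blockWritten(blockOffset, onDiskSize, uncompressedSize);
   * }
   * </pre>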
749    */
750   public static class BlockIndexWriter implements InlineBlockWriter {
751     /**
752      * While the index is being written, this represents the current block
753      * index referencing all leaf blocks, with one exception. If the file is
754      * being closed and there are not enough blocks to complete even a single
755      * leaf block, no leaf blocks get written and this contains the entire
756      * block index. After all levels of the index were written by
757      * {@link #writeIndexBlocks(FSDataOutputStream)}, this contains the final
758      * root-level index.
759      */
760     private BlockIndexChunk rootChunk = new BlockIndexChunk();
761 
762     /**
763      * Current leaf-level chunk. New entries referencing data blocks get added
764      * to this chunk until it grows large enough to be written to disk.
765      */
766     private BlockIndexChunk curInlineChunk = new BlockIndexChunk();
767 
768     /**
769      * The number of block index levels. This is one if there is only root
770    * level (even empty), two if there is a leaf level and a root level, and is
771      * higher if there are intermediate levels. This is only final after
772      * {@link #writeIndexBlocks(FSDataOutputStream)} has been called. The
773      * initial value accounts for the root level, and will be increased to two
774      * as soon as we find out there is a leaf-level in
775      * {@link #blockWritten(long, int, int)}.
776      */
777     private int numLevels = 1;
778 
779     private HFileBlock.Writer blockWriter;
780     private byte[] firstKey = null;
781 
782     /**
783      * The total number of leaf-level entries, i.e. entries referenced by
784      * leaf-level blocks. For the data block index this is equal to the number
785      * of data blocks.
786      */
787     private long totalNumEntries;
788 
789     /** Total compressed size of all index blocks. */
790     private long totalBlockOnDiskSize;
791 
792     /** Total uncompressed size of all index blocks. */
793     private long totalBlockUncompressedSize;
794 
795     /** The maximum size guideline of all multi-level index blocks. */
796     private int maxChunkSize;
797 
798     /** Whether we require this block index to always be single-level. */
799     private boolean singleLevelOnly;
800 
801     /** CacheConfig, or null if cache-on-write is disabled */
802     private CacheConfig cacheConf;
803 
804     /** Name to use for computing cache keys */
805     private String nameForCaching;
806 
807     /** Creates a single-level block index writer */
808     public BlockIndexWriter() {
809       this(null, null, null);
810       singleLevelOnly = true;
811     }
812 
813     /**
814      * Creates a multi-level block index writer.
815      *
816      * @param blockWriter the block writer to use to write index blocks
817      * @param cacheConf used to determine when and how a block should be cached-on-write.
818      */
819     public BlockIndexWriter(HFileBlock.Writer blockWriter,
820         CacheConfig cacheConf, String nameForCaching) {
821       if ((cacheConf == null) != (nameForCaching == null)) {
822         throw new IllegalArgumentException("Block cache and file name for " +
823             "caching must be both specified or both null");
824       }
825 
826       this.blockWriter = blockWriter;
827       this.cacheConf = cacheConf;
828       this.nameForCaching = nameForCaching;
829       this.maxChunkSize = HFileBlockIndex.DEFAULT_MAX_CHUNK_SIZE;
830     }
831 
832     public void setMaxChunkSize(int maxChunkSize) {
833       if (maxChunkSize <= 0) {
834         throw new IllegalArgumentException("Invalid maximum index block size");
835       }
836       this.maxChunkSize = maxChunkSize;
837     }
838 
839     /**
840      * Writes the root level and intermediate levels of the block index into
841      * the output stream, generating the tree from bottom up. Assumes that the
842      * leaf level has been inline-written to the disk if there is enough data
843      * for more than one leaf block. We iterate by breaking the current level
844      * of the block index, starting with the index of all leaf-level blocks,
845    * into chunks small enough to be written to disk, and generating its parent
846      * level, until we end up with a level small enough to become the root
847      * level.
848      *
849      * If the leaf level is not large enough, there is no inline block index
850      * anymore, so we only write that level of block index to disk as the root
851      * level.
852      *
853      * @param out FSDataOutputStream
854      * @return position at which we entered the root-level index.
855      * @throws IOException
856      */
857     public long writeIndexBlocks(FSDataOutputStream out) throws IOException {
858       if (curInlineChunk != null && curInlineChunk.getNumEntries() != 0) {
859         throw new IOException("Trying to write a multi-level block index, " +
860             "but there are " + curInlineChunk.getNumEntries() + " entries in the " +
861             "last inline chunk.");
862       }
863 
864       // We need to get mid-key metadata before we create intermediate
865       // indexes and overwrite the root chunk.
866       byte[] midKeyMetadata = numLevels > 1 ? rootChunk.getMidKeyMetadata()
867           : null;
868 
869       if (curInlineChunk != null) {
870         while (rootChunk.getRootSize() > maxChunkSize) {
871           rootChunk = writeIntermediateLevel(out, rootChunk);
872           numLevels += 1;
873         }
874       }
875 
876       // write the root level
877       long rootLevelIndexPos = out.getPos();
878 
879       {
880         DataOutput blockStream =
881             blockWriter.startWriting(BlockType.ROOT_INDEX);
882         rootChunk.writeRoot(blockStream);
883         if (midKeyMetadata != null)
884           blockStream.write(midKeyMetadata);
885         blockWriter.writeHeaderAndData(out);
886       }
887 
888       // Add root index block size
889       totalBlockOnDiskSize += blockWriter.getOnDiskSizeWithoutHeader();
890       totalBlockUncompressedSize +=
891           blockWriter.getUncompressedSizeWithoutHeader();
892 
893       if (LOG.isTraceEnabled()) {
894         LOG.trace("Wrote a " + numLevels + "-level index with root level at pos "
895           + rootLevelIndexPos + ", " + rootChunk.getNumEntries()
896           + " root-level entries, " + totalNumEntries + " total entries, "
897           + StringUtils.humanReadableInt(this.totalBlockOnDiskSize) +
898           " on-disk size, "
899           + StringUtils.humanReadableInt(totalBlockUncompressedSize) +
900           " total uncompressed size.");
901       }
902       return rootLevelIndexPos;
903     }
904 
905     /**
906      * Writes the block index data as a single level only. Does not do any
907      * block framing.
908      *
909      * @param out the buffered output stream to write the index to. Typically a
910      *          stream writing into an {@link HFile} block.
911      * @param description a short description of the index being written. Used
912      *          in a log message.
913      * @throws IOException
914      */
915     public void writeSingleLevelIndex(DataOutput out, String description)
916         throws IOException {
917       expectNumLevels(1);
918 
919       if (!singleLevelOnly)
920         throw new IOException("Single-level mode is turned off");
921 
922       if (rootChunk.getNumEntries() > 0)
923         throw new IOException("Root-level entries already added in " +
924             "single-level mode");
925 
926       rootChunk = curInlineChunk;
927       curInlineChunk = new BlockIndexChunk();
928 
929       if (LOG.isTraceEnabled()) {
930         LOG.trace("Wrote a single-level " + description + " index with "
931           + rootChunk.getNumEntries() + " entries, " + rootChunk.getRootSize()
932           + " bytes");
933       }
934       rootChunk.writeRoot(out);
935     }
936 
937     /**
938      * Split the current level of the block index into intermediate index
939      * blocks of permitted size and write those blocks to disk. Return the next
940      * level of the block index referencing those intermediate-level blocks.
941      *
942      * @param out
943      * @param currentLevel the current level of the block index, such as the
944      *          chunk referencing all leaf-level index blocks
945      * @return the parent level block index, which becomes the root index after
946      *         a few (usually zero) iterations
947      * @throws IOException
948      */
949     private BlockIndexChunk writeIntermediateLevel(FSDataOutputStream out,
950         BlockIndexChunk currentLevel) throws IOException {
951       // Entries referencing intermediate-level blocks we are about to create.
952       BlockIndexChunk parent = new BlockIndexChunk();
953 
954       // The current intermediate-level block index chunk.
955       BlockIndexChunk curChunk = new BlockIndexChunk();
956 
957       for (int i = 0; i < currentLevel.getNumEntries(); ++i) {
958         curChunk.add(currentLevel.getBlockKey(i),
959             currentLevel.getBlockOffset(i), currentLevel.getOnDiskDataSize(i));
960 
961         if (curChunk.getRootSize() >= maxChunkSize)
962           writeIntermediateBlock(out, parent, curChunk);
963       }
964 
965       if (curChunk.getNumEntries() > 0) {
966         writeIntermediateBlock(out, parent, curChunk);
967       }
968 
969       return parent;
970     }
971 
972     private void writeIntermediateBlock(FSDataOutputStream out,
973         BlockIndexChunk parent, BlockIndexChunk curChunk) throws IOException {
974       long beginOffset = out.getPos();
975       DataOutputStream dos = blockWriter.startWriting(
976           BlockType.INTERMEDIATE_INDEX);
977       curChunk.writeNonRoot(dos);
978       byte[] curFirstKey = curChunk.getBlockKey(0);
979       blockWriter.writeHeaderAndData(out);
980 
981       if (cacheConf != null) {
982         HFileBlock blockForCaching = blockWriter.getBlockForCaching(cacheConf);
983         cacheConf.getBlockCache().cacheBlock(new BlockCacheKey(nameForCaching,
984           beginOffset), blockForCaching);
985       }
986 
987       // Add intermediate index block size
988       totalBlockOnDiskSize += blockWriter.getOnDiskSizeWithoutHeader();
989       totalBlockUncompressedSize +=
990           blockWriter.getUncompressedSizeWithoutHeader();
991 
992       // OFFSET is the beginning offset of the chunk of block index entries.
993       // SIZE is the total byte size of the chunk of block index entries
994       // + the secondary index size
995       // FIRST_KEY is the first key in the chunk of block index
996       // entries.
997       parent.add(curFirstKey, beginOffset,
998           blockWriter.getOnDiskSizeWithHeader());
999 
1000       // clear current block index chunk
1001       curChunk.clear();
1002       curFirstKey = null;
1003     }
1004 
1005     /**
1006      * @return how many block index entries there are in the root level
1007      */
1008     public final int getNumRootEntries() {
1009       return rootChunk.getNumEntries();
1010     }
1011 
1012     /**
1013      * @return the number of levels in this block index.
1014      */
1015     public int getNumLevels() {
1016       return numLevels;
1017     }
1018 
1019     private void expectNumLevels(int expectedNumLevels) {
1020       if (numLevels != expectedNumLevels) {
1021         throw new IllegalStateException("Number of block index levels is "
1022             + numLevels + " but is expected to be " + expectedNumLevels);
1023       }
1024     }
1025 
1026     /**
1027      * Whether there is an inline block ready to be written. In general, we
1028      * write a leaf-level index block as an inline block as soon as its size
1029      * as serialized in the non-root format reaches a certain threshold.
1030      */
1031     @Override
1032     public boolean shouldWriteBlock(boolean closing) {
1033       if (singleLevelOnly) {
1034         throw new UnsupportedOperationException(INLINE_BLOCKS_NOT_ALLOWED);
1035       }
1036 
1037       if (curInlineChunk == null) {
1038         throw new IllegalStateException("curInlineChunk is null; has shouldWriteBlock been " +
1039             "called with closing=true and then called again?");
1040       }
1041 
1042       if (curInlineChunk.getNumEntries() == 0) {
1043         return false;
1044       }
1045 
1046       // We do have some entries in the current inline chunk.
1047       if (closing) {
1048         if (rootChunk.getNumEntries() == 0) {
1049           // We did not add any leaf-level blocks yet. Instead of creating a
1050           // leaf level with one block, move these entries to the root level.
1051 
1052           expectNumLevels(1);
1053           rootChunk = curInlineChunk;
1054           curInlineChunk = null;  // Disallow adding any more index entries.
1055           return false;
1056         }
1057 
1058         return true;
1059       } else {
1060         return curInlineChunk.getNonRootSize() >= maxChunkSize;
1061       }
1062     }
1063 
1064     /**
1065      * Write out the current inline index block. Inline blocks are non-root
1066      * blocks, so the non-root index format is used.
1067      *
1068      * @param out
1069      */
1070     @Override
1071     public void writeInlineBlock(DataOutput out) throws IOException {
1072       if (singleLevelOnly)
1073         throw new UnsupportedOperationException(INLINE_BLOCKS_NOT_ALLOWED);
1074 
1075       // Write the inline block index to the output stream in the non-root
1076       // index block format.
1077       curInlineChunk.writeNonRoot(out);
1078 
1079       // Save the first key of the inline block so that we can add it to the
1080       // parent-level index.
1081       firstKey = curInlineChunk.getBlockKey(0);
1082 
1083       // Start a new inline index block
1084       curInlineChunk.clear();
1085     }
1086 
1087     /**
1088      * Called after an inline block has been written so that we can add an
1089      * entry referring to that block to the parent-level index.
1090      */
1091     @Override
1092     public void blockWritten(long offset, int onDiskSize, int uncompressedSize) {
1093       // Add leaf index block size
1094       totalBlockOnDiskSize += onDiskSize;
1095       totalBlockUncompressedSize += uncompressedSize;
1096 
1097       if (singleLevelOnly)
1098         throw new UnsupportedOperationException(INLINE_BLOCKS_NOT_ALLOWED);
1099 
1100       if (firstKey == null) {
1101         throw new IllegalStateException("Trying to add second-level index " +
1102             "entry with offset=" + offset + " and onDiskSize=" + onDiskSize +
1103             " but the first key was not set in writeInlineBlock");
1104       }
1105 
1106       if (rootChunk.getNumEntries() == 0) {
1107         // We are writing the first leaf block, so increase index level.
1108         expectNumLevels(1);
1109         numLevels = 2;
1110       }
1111 
1112       // Add another entry to the second-level index. Include the number of
1113       // entries in all previous leaf-level chunks for mid-key calculation.
1114       rootChunk.add(firstKey, offset, onDiskSize, totalNumEntries);
1115       firstKey = null;
1116     }
1117 
1118     @Override
1119     public BlockType getInlineBlockType() {
1120       return BlockType.LEAF_INDEX;
1121     }
1122 
1123     /**
1124      * Add one index entry to the current leaf-level block. When the leaf-level
1125      * block gets large enough, it will be flushed to disk as an inline block.
1126      *
1127      * @param firstKey the first key of the data block
1128      * @param blockOffset the offset of the data block
1129      * @param blockDataSize the on-disk size of the data block ({@link HFile}
1130      *          format version 2), or the uncompressed size of the data block (
1131      *          {@link HFile} format version 1).
1132      */
1133     public void addEntry(byte[] firstKey, long blockOffset, int blockDataSize) {
1134       curInlineChunk.add(firstKey, blockOffset, blockDataSize);
1135       ++totalNumEntries;
1136     }
1137 
1138     /**
1139      * @throws IOException if we happened to write a multi-level index.
1140      */
1141     public void ensureSingleLevel() throws IOException {
1142       if (numLevels > 1) {
1143         throw new IOException("Wrote a " + numLevels + "-level index with " +
1144             rootChunk.getNumEntries() + " root-level entries, but " +
1145             "this is expected to be a single-level block index.");
1146       }
1147     }
1148 
1149     /**
1150      * @return true if we are using cache-on-write. This is configured by the
1151      *         caller of the constructor by either passing a valid block cache
1152      *         or null.
1153      */
1154     @Override
1155     public boolean getCacheOnWrite() {
1156       return cacheConf != null && cacheConf.shouldCacheIndexesOnWrite();
1157     }
1158 
1159     /**
1160      * The total uncompressed size of the root index block, intermediate-level
1161      * index blocks, and leaf-level index blocks.
1162      *
1163      * @return the total uncompressed size of all index blocks
1164      */
1165     public long getTotalUncompressedSize() {
1166       return totalBlockUncompressedSize;
1167     }
1168 
1169   }
1170 
1171   /**
1172    * A single chunk of the block index in the process of writing. The data in
1173    * this chunk can become a leaf-level, intermediate-level, or root index
1174    * block.
1175    */
1176   static class BlockIndexChunk {
1177 
1178     /** First keys of the key range corresponding to each index entry. */
1179     private final List<byte[]> blockKeys = new ArrayList<byte[]>();
1180 
1181     /** Block offset in backing stream. */
1182     private final List<Long> blockOffsets = new ArrayList<Long>();
1183 
1184     /** On-disk data sizes of lower-level data or index blocks. */
1185     private final List<Integer> onDiskDataSizes = new ArrayList<Integer>();
1186 
1187     /**
1188      * The cumulative number of sub-entries, i.e. entries referenced by
1189      * deeper-level blocks. numSubEntriesAt[i] is the number of sub-entries in the
1190      * blocks corresponding to this chunk's entries #0 through #i inclusively.
1191      */
1192     private final List<Long> numSubEntriesAt = new ArrayList<Long>();
1193 
1194     /**
1195      * The offset of the next entry to be added, relative to the end of the
1196      * "secondary index" in the "non-root" format representation of this index
1197      * chunk. This is the next value to be added to the secondary index.
1198      */
1199     private int curTotalNonRootEntrySize = 0;
1200 
1201     /**
1202      * The accumulated size of this chunk if stored in the root index format.
1203      */
1204     private int curTotalRootSize = 0;
1205 
1206     /**
1207      * The "secondary index" used for binary search over variable-length
1208      * records in a "non-root" format block. These offsets are relative to the
1209      * end of this secondary index.
1210      */
1211     private final List<Integer> secondaryIndexOffsetMarks =
1212         new ArrayList<Integer>();
1213 
1214     /**
1215      * Adds a new entry to this block index chunk.
1216      *
1217      * @param firstKey the first key in the block pointed to by this entry
1218      * @param blockOffset the offset of the next-level block pointed to by this
1219      *          entry
1220      * @param onDiskDataSize the on-disk data size of the block pointed to by this
1221      *          entry, including header size
1222      * @param curTotalNumSubEntries if this chunk is the root index chunk under
1223      *          construction, this specifies the current total number of
1224      *          sub-entries in all leaf-level chunks, including the one
1225      *          corresponding to the second-level entry being added.
1226      */
1227     void add(byte[] firstKey, long blockOffset, int onDiskDataSize,
1228         long curTotalNumSubEntries) {
1229       // Record the offset for the secondary index
1230       secondaryIndexOffsetMarks.add(curTotalNonRootEntrySize);
1231       curTotalNonRootEntrySize += SECONDARY_INDEX_ENTRY_OVERHEAD
1232           + firstKey.length;
1233 
1234       curTotalRootSize += Bytes.SIZEOF_LONG + Bytes.SIZEOF_INT
1235           + WritableUtils.getVIntSize(firstKey.length) + firstKey.length;
1236 
1237       blockKeys.add(firstKey);
1238       blockOffsets.add(blockOffset);
1239       onDiskDataSizes.add(onDiskDataSize);
1240 
1241       if (curTotalNumSubEntries != -1) {
1242         numSubEntriesAt.add(curTotalNumSubEntries);
1243 
1244         // Make sure the parallel arrays are in sync.
1245         if (numSubEntriesAt.size() != blockKeys.size()) {
1246           throw new IllegalStateException("Only have key/value count " +
1247               "stats for " + numSubEntriesAt.size() + " block index " +
1248               "entries out of " + blockKeys.size());
1249         }
1250       }
1251     }
1252 
1253     /**
1254      * The same as {@link #add(byte[], long, int, long)} but does not take the
1255      * sub-entry count into account. Used for single-level indexes.
1256      *
1257      * @see #add(byte[], long, int, long)
1258      */
1259     public void add(byte[] firstKey, long blockOffset, int onDiskDataSize) {
1260       add(firstKey, blockOffset, onDiskDataSize, -1);
1261     }
1262 
1263     public void clear() {
1264       blockKeys.clear();
1265       blockOffsets.clear();
1266       onDiskDataSizes.clear();
1267       secondaryIndexOffsetMarks.clear();
1268       numSubEntriesAt.clear();
1269       curTotalNonRootEntrySize = 0;
1270       curTotalRootSize = 0;
1271     }
1272 
1273     /**
1274      * Finds the entry corresponding to the deeper-level index block containing
1275      * the given deeper-level entry (a "sub-entry"), assuming a global 0-based
1276      * ordering of sub-entries.
1277      *
1278      * <p>
1279      * <i> Implementation note. </i> We are looking for i such that
1280      * numSubEntriesAt[i - 1] <= k < numSubEntriesAt[i], because a deeper-level
1281      * block #i (0-based) contains sub-entries # numSubEntriesAt[i - 1]'th
1282      * through numSubEntriesAt[i] - 1, assuming a global 0-based ordering of
1283      * sub-entries. i is by definition the insertion point of k in
1284      * numSubEntriesAt.
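     *
     * <p>Worked example (illustrative values): if numSubEntriesAt = [3, 7, 10],
     * then k = 5 falls in entry #1 (sub-entries 3..6) and k = 7 falls in
     * entry #2 (sub-entries 7..9).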
1285      *
1286      * @param k sub-entry index, from 0 to the total number of sub-entries - 1
1287      * @return the 0-based index of the entry corresponding to the given
1288      *         sub-entry
1289      */
1290     public int getEntryBySubEntry(long k) {
1291       // We define mid-key as the key corresponding to k'th sub-entry
1292       // (0-based).
1293 
1294       int i = Collections.binarySearch(numSubEntriesAt, k);
1295 
1296       // Exact match: numSubEntriesAt[i] = k. This means chunks #0 through
1297       // #i contain exactly k sub-entries, and the sub-entry #k (0-based)
1298       // is in the (i + 1)'th chunk.
1299       if (i >= 0)
1300         return i + 1;
1301 
1302       // Inexact match. Return the insertion point.
1303       return -i - 1;
1304     }
1305 
1306     /**
1307      * Used when writing the root block index of a multi-level block index.
1308      * Serializes additional information that allows the mid-key to be
1309      * identified efficiently.
1310      *
1311      * @return a few serialized fields for finding the mid-key
1312      * @throws IOException if metadata for computing the mid-key could not be created
1313      */
1314     public byte[] getMidKeyMetadata() throws IOException {
1315       ByteArrayOutputStream baos = new ByteArrayOutputStream(
1316           MID_KEY_METADATA_SIZE);
1317       DataOutputStream baosDos = new DataOutputStream(baos);
1318       long totalNumSubEntries = numSubEntriesAt.get(blockKeys.size() - 1);
1319       if (totalNumSubEntries == 0) {
1320         throw new IOException("No leaf-level entries, mid-key unavailable");
1321       }
1322       long midKeySubEntry = (totalNumSubEntries - 1) / 2;
1323       int midKeyEntry = getEntryBySubEntry(midKeySubEntry);
1324 
1325       baosDos.writeLong(blockOffsets.get(midKeyEntry));
1326       baosDos.writeInt(onDiskDataSizes.get(midKeyEntry));
1327 
1328       long numSubEntriesBefore = midKeyEntry > 0
1329           ? numSubEntriesAt.get(midKeyEntry - 1) : 0;
1330       long subEntryWithinEntry = midKeySubEntry - numSubEntriesBefore;
1331       if (subEntryWithinEntry < 0 || subEntryWithinEntry > Integer.MAX_VALUE)
1332       {
1333         throw new IOException("Could not identify mid-key index within the "
1334             + "leaf-level block containing mid-key: out of range ("
1335             + subEntryWithinEntry + ", numSubEntriesBefore="
1336             + numSubEntriesBefore + ", midKeySubEntry=" + midKeySubEntry
1337             + ")");
1338       }
1339 
1340       baosDos.writeInt((int) subEntryWithinEntry);
1341 
1342       if (baosDos.size() != MID_KEY_METADATA_SIZE) {
1343         throw new IOException("Could not write mid-key metadata: size=" +
1344             baosDos.size() + ", correct size: " + MID_KEY_METADATA_SIZE);
1345       }
1346 
1347       // Close just to be good citizens, although this has no effect.
1348       baos.close();
1349 
1350       return baos.toByteArray();
1351     }
1352 
1353     /**
1354      * Writes the block index chunk in the non-root index block format. This
1355      * format contains the number of entries, an index of integer offsets
1356      * for quick binary search on variable-length records, and tuples of
1357      * block offset, on-disk block size, and the first key for each entry.
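     *
     * <p>On-disk layout sketch (matches the writes below):
     * <pre>
     * int32                     numEntries
     * int32 x (numEntries + 1)  secondary index: entry offsets relative to its end
     * per entry:                int64 blockOffset, int32 onDiskSize, key bytes
     * </pre>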
1358      *
1359      * @param out
1360      * @throws IOException
1361      */
1362     void writeNonRoot(DataOutput out) throws IOException {
1363       // The number of entries in the block.
1364       out.writeInt(blockKeys.size());
1365 
1366       if (secondaryIndexOffsetMarks.size() != blockKeys.size()) {
1367         throw new IOException("Corrupted block index chunk writer: " +
1368             blockKeys.size() + " entries but " +
1369             secondaryIndexOffsetMarks.size() + " secondary index items");
1370       }
1371 
1372       // For each entry, write a "secondary index" of relative offsets to the
1373       // entries from the end of the secondary index. This works, because at
1374       // read time we read the number of entries and know where the secondary
1375       // index ends.
1376       for (int currentSecondaryIndex : secondaryIndexOffsetMarks)
1377         out.writeInt(currentSecondaryIndex);
1378 
1379       // We include one additional element in the secondary index to calculate the
1380       // size of each entry more easily by subtracting secondary index elements.
1381       out.writeInt(curTotalNonRootEntrySize);
1382 
1383       for (int i = 0; i < blockKeys.size(); ++i) {
1384         out.writeLong(blockOffsets.get(i));
1385         out.writeInt(onDiskDataSizes.get(i));
1386         out.write(blockKeys.get(i));
1387       }
1388     }
1389 
1390     /**
1391      * @return the size of this chunk if stored in the non-root index block
1392      *         format
1393      */
1394     int getNonRootSize() {
1395       return Bytes.SIZEOF_INT                          // Number of entries
1396           + Bytes.SIZEOF_INT * (blockKeys.size() + 1)  // Secondary index
1397           + curTotalNonRootEntrySize;                  // All entries
1398     }
1399 
1400     /**
1401      * Writes this chunk into the given output stream in the root block index
1402      * format. This format is similar to the {@link HFile} version 1 block
1403      * index format, except that we store on-disk size of the block instead of
1404      * its uncompressed size.
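     *
     * <p>Root entry layout (sketch, matching the writes below): int64
     * blockOffset, int32 onDiskSize, then the key as written by
     * {@link Bytes#writeByteArray} (vint length followed by the key bytes).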
1405      *
1406      * @param out the data output stream to write the block index to. Typically
1407      *          a stream writing into an {@link HFile} block.
1408      * @throws IOException
1409      */
1410     void writeRoot(DataOutput out) throws IOException {
1411       for (int i = 0; i < blockKeys.size(); ++i) {
1412         out.writeLong(blockOffsets.get(i));
1413         out.writeInt(onDiskDataSizes.get(i));
1414         Bytes.writeByteArray(out, blockKeys.get(i));
1415       }
1416     }
1417 
1418     /**
1419      * @return the size of this chunk if stored in the root index block format
1420      */
1421     int getRootSize() {
1422       return curTotalRootSize;
1423     }
1424 
1425     /**
1426      * @return the number of entries in this block index chunk
1427      */
1428     public int getNumEntries() {
1429       return blockKeys.size();
1430     }
1431 
1432     public byte[] getBlockKey(int i) {
1433       return blockKeys.get(i);
1434     }
1435 
1436     public long getBlockOffset(int i) {
1437       return blockOffsets.get(i);
1438     }
1439 
1440     public int getOnDiskDataSize(int i) {
1441       return onDiskDataSizes.get(i);
1442     }
1443 
1444     public long getCumulativeNumKV(int i) {
1445       if (i < 0)
1446         return 0;
1447       return numSubEntriesAt.get(i);
1448     }
1449 
1450   }
1451 
1452   public static int getMaxChunkSize(Configuration conf) {
1453     return conf.getInt(MAX_CHUNK_SIZE_KEY, DEFAULT_MAX_CHUNK_SIZE);
1454   }
1455 }