View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements. See the NOTICE file distributed with this
4    * work for additional information regarding copyright ownership. The ASF
5    * licenses this file to you under the Apache License, Version 2.0 (the
6    * "License"); you may not use this file except in compliance with the License.
7    * You may obtain a copy of the License at
8    *
9    * http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14   * License for the specific language governing permissions and limitations
15   * under the License.
16   */
17  package org.apache.hadoop.hbase.io.encoding;
18  
19  import static org.junit.Assert.assertEquals;
20  import static org.junit.Assert.fail;
21  
22  import java.io.ByteArrayInputStream;
23  import java.io.ByteArrayOutputStream;
24  import java.io.DataInputStream;
25  import java.io.DataOutputStream;
26  import java.io.IOException;
27  import java.nio.ByteBuffer;
28  import java.util.ArrayList;
29  import java.util.Collection;
30  import java.util.List;
31  import java.util.Random;
32  
33  import org.apache.commons.logging.Log;
34  import org.apache.commons.logging.LogFactory;
35  import org.apache.hadoop.hbase.CategoryBasedTimeout;
36  import org.apache.hadoop.hbase.HBaseTestingUtility;
37  import org.apache.hadoop.hbase.HConstants;
38  import org.apache.hadoop.hbase.KeyValue;
39  import org.apache.hadoop.hbase.KeyValue.Type;
40  import org.apache.hadoop.hbase.KeyValueUtil;
41  import org.apache.hadoop.hbase.Tag;
42  import org.apache.hadoop.hbase.io.compress.Compression;
43  import org.apache.hadoop.hbase.io.hfile.HFileBlock.Writer.BufferGrabbingByteArrayOutputStream;
44  import org.apache.hadoop.hbase.io.hfile.HFileContext;
45  import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
46  import org.apache.hadoop.hbase.testclassification.LargeTests;
47  import org.apache.hadoop.hbase.util.Bytes;
48  import org.apache.hadoop.hbase.util.test.RedundantKVGenerator;
49  import org.junit.Rule;
50  import org.junit.Test;
51  import org.junit.experimental.categories.Category;
52  import org.junit.rules.TestRule;
53  import org.junit.runner.RunWith;
54  import org.junit.runners.Parameterized;
55  import org.junit.runners.Parameterized.Parameters;
56  
/**
 * Test all of the data block encoding algorithms for correctness. Most of the
 * tests in this class generate data that exercises different branches in the code.
 */
61  @Category(LargeTests.class)
62  @RunWith(Parameterized.class)
63  public class TestDataBlockEncoders {
64  
65    private static final Log LOG = LogFactory.getLog(TestDataBlockEncoders.class);
66  
67    @Rule public final TestRule timeout = CategoryBasedTimeout.builder().
68        withTimeout(this.getClass()).withLookingForStuckThread(true).build();
69  
70    private static int NUMBER_OF_KV = 10000;
71    private static int NUM_RANDOM_SEEKS = 1000;
72  
73    private static int ENCODED_DATA_OFFSET = HConstants.HFILEBLOCK_HEADER_SIZE
74        + DataBlockEncoding.ID_SIZE;
75  
76    private RedundantKVGenerator generator = new RedundantKVGenerator();
77    private Random randomizer = new Random(42l);
78  
79    private final boolean includesMemstoreTS;
80    private final boolean includesTags;
81  
82    @Parameters
83    public static Collection<Object[]> parameters() {
84      return HBaseTestingUtility.MEMSTORETS_TAGS_PARAMETRIZED;
85    }
86    public TestDataBlockEncoders(boolean includesMemstoreTS, boolean includesTag) {
87      this.includesMemstoreTS = includesMemstoreTS;
88      this.includesTags = includesTag;
89    }
90    
91    private HFileBlockEncodingContext getEncodingContext(Compression.Algorithm algo,
92        DataBlockEncoding encoding) {
93      DataBlockEncoder encoder = encoding.getEncoder();
94      HFileContext meta = new HFileContextBuilder()
95                          .withHBaseCheckSum(false)
96                          .withIncludesMvcc(includesMemstoreTS)
97                          .withIncludesTags(includesTags)
98                          .withCompression(algo).build();
99      if (encoder != null) {
100       return encoder.newDataBlockEncodingContext(encoding,
101           HConstants.HFILEBLOCK_DUMMY_HEADER, meta);
102     } else {
103       return new HFileBlockDefaultEncodingContext(encoding,
104           HConstants.HFILEBLOCK_DUMMY_HEADER, meta);
105     }
106   }
107 
108   /**
109    * Test data block encoding of empty KeyValue.
110    * 
111    * @throws IOException
112    *           On test failure.
113    */
114   @Test
115   public void testEmptyKeyValues() throws IOException {
116     List<KeyValue> kvList = new ArrayList<KeyValue>();
117     byte[] row = new byte[0];
118     byte[] family = new byte[0];
119     byte[] qualifier = new byte[0];
120     byte[] value = new byte[0];
121     if (!includesTags) {
122       kvList.add(new KeyValue(row, family, qualifier, 0l, value));
123       kvList.add(new KeyValue(row, family, qualifier, 0l, value));
124     } else {
125       byte[] metaValue1 = Bytes.toBytes("metaValue1");
126       byte[] metaValue2 = Bytes.toBytes("metaValue2");
127       kvList.add(new KeyValue(row, family, qualifier, 0l, value, new Tag[] { new Tag((byte) 1,
128           metaValue1) }));
129       kvList.add(new KeyValue(row, family, qualifier, 0l, value, new Tag[] { new Tag((byte) 1,
130           metaValue2) }));
131     }
132     testEncodersOnDataset(kvList, includesMemstoreTS, includesTags);
133   }
134 
135   /**
136    * Test KeyValues with negative timestamp.
137    * 
138    * @throws IOException
139    *           On test failure.
140    */
141   @Test
142   public void testNegativeTimestamps() throws IOException {
143     List<KeyValue> kvList = new ArrayList<KeyValue>();
144     byte[] row = new byte[0];
145     byte[] family = new byte[0];
146     byte[] qualifier = new byte[0];
147     byte[] value = new byte[0];
148     if (includesTags) {
149       byte[] metaValue1 = Bytes.toBytes("metaValue1");
150       byte[] metaValue2 = Bytes.toBytes("metaValue2");
151       kvList.add(new KeyValue(row, family, qualifier, 0l, value, new Tag[] { new Tag((byte) 1,
152           metaValue1) }));
153       kvList.add(new KeyValue(row, family, qualifier, 0l, value, new Tag[] { new Tag((byte) 1,
154           metaValue2) }));
155     } else {
156       kvList.add(new KeyValue(row, family, qualifier, -1l, Type.Put, value));
157       kvList.add(new KeyValue(row, family, qualifier, -2l, Type.Put, value));
158     }
159     testEncodersOnDataset(kvList, includesMemstoreTS, includesTags);
160   }
161 
162 
163   /**
164    * Test whether compression -> decompression gives the consistent results on
165    * pseudorandom sample.
166    * @throws IOException On test failure.
167    */
168   @Test
169   public void testExecutionOnSample() throws IOException {
170     List<KeyValue> kvList = generator.generateTestKeyValues(NUMBER_OF_KV, includesTags);
171     testEncodersOnDataset(kvList, includesMemstoreTS, includesTags);
172   }
173 
174   /**
175    * Test seeking while file is encoded.
176    */
177   @Test
178   public void testSeekingOnSample() throws IOException {
179     List<KeyValue> sampleKv = generator.generateTestKeyValues(NUMBER_OF_KV, includesTags);
180 
181     // create all seekers
182     List<DataBlockEncoder.EncodedSeeker> encodedSeekers = 
183         new ArrayList<DataBlockEncoder.EncodedSeeker>();
184     for (DataBlockEncoding encoding : DataBlockEncoding.values()) {
185       LOG.info("Encoding: " + encoding);
186       // Off heap block data support not added for PREFIX_TREE DBE yet.
187       // TODO remove this once support is added. HBASE-12298
188       if (encoding == DataBlockEncoding.PREFIX_TREE) continue;
189       DataBlockEncoder encoder = encoding.getEncoder();
190       if (encoder == null) {
191         continue;
192       }
193       LOG.info("Encoder: " + encoder);
194       ByteBuffer encodedBuffer = encodeKeyValues(encoding, sampleKv,
195           getEncodingContext(Compression.Algorithm.NONE, encoding));
196       HFileContext meta = new HFileContextBuilder()
197                           .withHBaseCheckSum(false)
198                           .withIncludesMvcc(includesMemstoreTS)
199                           .withIncludesTags(includesTags)
200                           .withCompression(Compression.Algorithm.NONE)
201                           .build();
202       DataBlockEncoder.EncodedSeeker seeker = encoder.createSeeker(KeyValue.COMPARATOR,
203           encoder.newDataBlockDecodingContext(meta));
204       seeker.setCurrentBuffer(encodedBuffer);
205       encodedSeekers.add(seeker);
206     }
207     LOG.info("Testing it!");
208     // test it!
209     // try a few random seeks
210     for (boolean seekBefore : new boolean[] { false, true }) {
211       for (int i = 0; i < NUM_RANDOM_SEEKS; ++i) {
212         int keyValueId;
213         if (!seekBefore) {
214           keyValueId = randomizer.nextInt(sampleKv.size());
215         } else {
216           keyValueId = randomizer.nextInt(sampleKv.size() - 1) + 1;
217         }
218 
219         KeyValue keyValue = sampleKv.get(keyValueId);
220         checkSeekingConsistency(encodedSeekers, seekBefore, keyValue);
221       }
222     }
223 
224     // check edge cases
225     LOG.info("Checking edge cases");
226     checkSeekingConsistency(encodedSeekers, false, sampleKv.get(0));
227     for (boolean seekBefore : new boolean[] { false, true }) {
228       checkSeekingConsistency(encodedSeekers, seekBefore, sampleKv.get(sampleKv.size() - 1));
229       KeyValue midKv = sampleKv.get(sampleKv.size() / 2);
230       KeyValue lastMidKv =KeyValueUtil.createLastOnRowCol(midKv);
231       checkSeekingConsistency(encodedSeekers, seekBefore, lastMidKv);
232     }
233     LOG.info("Done");
234   }
235 
236   static ByteBuffer encodeKeyValues(DataBlockEncoding encoding, List<KeyValue> kvs,
237       HFileBlockEncodingContext encodingContext) throws IOException {
238     DataBlockEncoder encoder = encoding.getEncoder();
239     ByteArrayOutputStream baos = new ByteArrayOutputStream();
240     baos.write(HConstants.HFILEBLOCK_DUMMY_HEADER);
241     DataOutputStream dos = new DataOutputStream(baos);
242     encoder.startBlockEncoding(encodingContext, dos);
243     for (KeyValue kv : kvs) {
244       encoder.encode(kv, encodingContext, dos);
245     }
246     BufferGrabbingByteArrayOutputStream stream = new BufferGrabbingByteArrayOutputStream();
247     baos.writeTo(stream);
248     encoder.endBlockEncoding(encodingContext, dos, stream.getBuffer());
249     byte[] encodedData = new byte[baos.size() - ENCODED_DATA_OFFSET];
250     System.arraycopy(baos.toByteArray(), ENCODED_DATA_OFFSET, encodedData, 0, encodedData.length);
251     return ByteBuffer.wrap(encodedData);
252   }
253 
254   @Test
255   public void testNextOnSample() throws IOException {
256     List<KeyValue> sampleKv = generator.generateTestKeyValues(NUMBER_OF_KV, includesTags);
257 
258     for (DataBlockEncoding encoding : DataBlockEncoding.values()) {
259       if (encoding.getEncoder() == null) {
260         continue;
261       }
262       DataBlockEncoder encoder = encoding.getEncoder();
263       ByteBuffer encodedBuffer = encodeKeyValues(encoding, sampleKv,
264           getEncodingContext(Compression.Algorithm.NONE, encoding));
265       HFileContext meta = new HFileContextBuilder()
266                           .withHBaseCheckSum(false)
267                           .withIncludesMvcc(includesMemstoreTS)
268                           .withIncludesTags(includesTags)
269                           .withCompression(Compression.Algorithm.NONE)
270                           .build();
271       DataBlockEncoder.EncodedSeeker seeker = encoder.createSeeker(KeyValue.COMPARATOR,
272           encoder.newDataBlockDecodingContext(meta));
273       seeker.setCurrentBuffer(encodedBuffer);
274       int i = 0;
275       do {
276         KeyValue expectedKeyValue = sampleKv.get(i);
277         KeyValue keyValue = KeyValueUtil.copyToNewKeyValue(seeker.getKeyValue());
278         if (0 != Bytes.compareTo(keyValue.getBuffer(), keyValue.getOffset(), keyValue.getLength(),
279             expectedKeyValue.getBuffer(), expectedKeyValue.getOffset(),
280             expectedKeyValue.getLength())) {
281 
282           int commonPrefix = 0;
283           byte[] left = keyValue.getBuffer();
284           byte[] right = expectedKeyValue.getBuffer();
285           int leftOff = keyValue.getOffset();
286           int rightOff = expectedKeyValue.getOffset();
287           int length = Math.min(keyValue.getLength(), expectedKeyValue.getLength());
288           while (commonPrefix < length
289               && left[commonPrefix + leftOff] == right[commonPrefix + rightOff]) {
290             commonPrefix++;
291           }
292 
293           fail(String.format("next() produces wrong results "
294               + "encoder: %s i: %d commonPrefix: %d" + "\n expected %s\n actual      %s", encoder
295               .toString(), i, commonPrefix, Bytes.toStringBinary(expectedKeyValue.getBuffer(),
296               expectedKeyValue.getOffset(), expectedKeyValue.getLength()), Bytes
297               .toStringBinary(keyValue.getBuffer())));
298         }
299         i++;
300       } while (seeker.next());
301     }
302   }
303 
304   /**
305    * Test whether the decompression of first key is implemented correctly.
306    * @throws IOException
307    */
308   @Test
309   public void testFirstKeyInBlockOnSample() throws IOException {
310     List<KeyValue> sampleKv = generator.generateTestKeyValues(NUMBER_OF_KV, includesTags);
311 
312     for (DataBlockEncoding encoding : DataBlockEncoding.values()) {
313       if (encoding.getEncoder() == null) {
314         continue;
315       }
316       DataBlockEncoder encoder = encoding.getEncoder();
317       ByteBuffer encodedBuffer = encodeKeyValues(encoding, sampleKv,
318           getEncodingContext(Compression.Algorithm.NONE, encoding));
319       ByteBuffer keyBuffer = encoder.getFirstKeyInBlock(encodedBuffer);
320       KeyValue firstKv = sampleKv.get(0);
321       if (0 != Bytes.compareTo(keyBuffer.array(), keyBuffer.arrayOffset(), keyBuffer.limit(),
322           firstKv.getBuffer(), firstKv.getKeyOffset(), firstKv.getKeyLength())) {
323 
324         int commonPrefix = 0;
325         int length = Math.min(keyBuffer.limit(), firstKv.getKeyLength());
326         while (commonPrefix < length
327             && keyBuffer.array()[keyBuffer.arrayOffset() + commonPrefix] == firstKv.getBuffer()[firstKv
328                 .getKeyOffset() + commonPrefix]) {
329           commonPrefix++;
330         }
331         fail(String.format("Bug in '%s' commonPrefix %d", encoder.toString(), commonPrefix));
332       }
333     }
334   }
335   
336   private void checkSeekingConsistency(List<DataBlockEncoder.EncodedSeeker> encodedSeekers,
337       boolean seekBefore, KeyValue keyValue) {
338     ByteBuffer expectedKeyValue = null;
339     ByteBuffer expectedKey = null;
340     ByteBuffer expectedValue = null;
341     for (DataBlockEncoder.EncodedSeeker seeker : encodedSeekers) {
342       seeker.seekToKeyInBlock(keyValue, seekBefore);
343       seeker.rewind();
344 
345       ByteBuffer actualKeyValue = KeyValueUtil.copyKeyToNewByteBuffer(seeker.getKeyValue());
346       ByteBuffer actualKey = seeker.getKeyDeepCopy();
347       ByteBuffer actualValue = seeker.getValueShallowCopy();
348 
349       if (expectedKeyValue != null) {
350         assertEquals(expectedKeyValue, actualKeyValue);
351       } else {
352         expectedKeyValue = actualKeyValue;
353       }
354 
355       if (expectedKey != null) {
356         assertEquals(expectedKey, actualKey);
357       } else {
358         expectedKey = actualKey;
359       }
360 
361       if (expectedValue != null) {
362         assertEquals(expectedValue, actualValue);
363       } else {
364         expectedValue = actualValue;
365       }
366     }
367   }
368 
369   private void testEncodersOnDataset(List<KeyValue> kvList, boolean includesMemstoreTS,
370       boolean includesTags) throws IOException {
371     ByteBuffer unencodedDataBuf = RedundantKVGenerator.convertKvToByteBuffer(kvList,
372         includesMemstoreTS);
373     HFileContext fileContext = new HFileContextBuilder().withIncludesMvcc(includesMemstoreTS)
374         .withIncludesTags(includesTags).build();
375     for (DataBlockEncoding encoding : DataBlockEncoding.values()) {
376       DataBlockEncoder encoder = encoding.getEncoder();
377       if (encoder == null) {
378         continue;
379       }
380       HFileBlockEncodingContext encodingContext = new HFileBlockDefaultEncodingContext(encoding,
381           HConstants.HFILEBLOCK_DUMMY_HEADER, fileContext);
382 
383       ByteArrayOutputStream baos = new ByteArrayOutputStream();
384       baos.write(HConstants.HFILEBLOCK_DUMMY_HEADER);
385       DataOutputStream dos = new DataOutputStream(baos);
386       encoder.startBlockEncoding(encodingContext, dos);
387       for (KeyValue kv : kvList) {
388         encoder.encode(kv, encodingContext, dos);
389       }
390       BufferGrabbingByteArrayOutputStream stream = new BufferGrabbingByteArrayOutputStream();
391       baos.writeTo(stream);
392       encoder.endBlockEncoding(encodingContext, dos, stream.getBuffer());
393       byte[] encodedData = baos.toByteArray();
394 
395       testAlgorithm(encodedData, unencodedDataBuf, encoder);
396     }
397   }
398   
399   @Test
400   public void testZeroByte() throws IOException {
401     List<KeyValue> kvList = new ArrayList<KeyValue>();
402     byte[] row = Bytes.toBytes("abcd");
403     byte[] family = new byte[] { 'f' };
404     byte[] qualifier0 = new byte[] { 'b' };
405     byte[] qualifier1 = new byte[] { 'c' };
406     byte[] value0 = new byte[] { 'd' };
407     byte[] value1 = new byte[] { 0x00 };
408     if (includesTags) {
409       kvList.add(new KeyValue(row, family, qualifier0, 0, value0, new Tag[] { new Tag((byte) 1,
410           "value1") }));
411       kvList.add(new KeyValue(row, family, qualifier1, 0, value1, new Tag[] { new Tag((byte) 1,
412           "value1") }));
413     } else {
414       kvList.add(new KeyValue(row, family, qualifier0, 0, Type.Put, value0));
415       kvList.add(new KeyValue(row, family, qualifier1, 0, Type.Put, value1));
416     }
417     testEncodersOnDataset(kvList, includesMemstoreTS, includesTags);
418   }
419 
420   private void testAlgorithm(byte[] encodedData, ByteBuffer unencodedDataBuf,
421       DataBlockEncoder encoder) throws IOException {
422     // decode
423     ByteArrayInputStream bais = new ByteArrayInputStream(encodedData, ENCODED_DATA_OFFSET,
424         encodedData.length - ENCODED_DATA_OFFSET);
425     DataInputStream dis = new DataInputStream(bais);
426     ByteBuffer actualDataset;
427     HFileContext meta = new HFileContextBuilder().withHBaseCheckSum(false)
428         .withIncludesMvcc(includesMemstoreTS).withIncludesTags(includesTags)
429         .withCompression(Compression.Algorithm.NONE).build();
430     actualDataset = encoder.decodeKeyValues(dis, encoder.newDataBlockDecodingContext(meta));
431     actualDataset.rewind();
432 
433     // this is because in case of prefix tree the decoded stream will not have
434     // the
435     // mvcc in it.
436     assertEquals("Encoding -> decoding gives different results for " + encoder,
437         Bytes.toStringBinary(unencodedDataBuf), Bytes.toStringBinary(actualDataset));
438   }
439 }