/**
 * Copyright The Apache Software Foundation
 *
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with this
 * work for additional information regarding copyright ownership. The ASF
 * licenses this file to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */
package org.apache.hadoop.hbase.io.encoding;

import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ConcurrentSkipListSet;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.codec.prefixtree.PrefixTreeCodec;
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoder.EncodedSeeker;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CollectionBackedScanner;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;

/**
 * Tests scanning and seeking over data blocks written with the PrefixTree
 * encoding, parameterized to run both with and without cell tags.
 */
@RunWith(Parameterized.class)
@Category(SmallTests.class)
public class TestPrefixTreeEncoding {
  private static final Log LOG = LogFactory.getLog(TestPrefixTreeEncoding.class);
  private static final String CF = "EncodingTestCF";
  private static final byte[] CF_BYTES = Bytes.toBytes(CF);
  private static final int NUM_ROWS_PER_BATCH = 50;
  private static final int NUM_COLS_PER_ROW = 20;

  private int numBatchesWritten = 0;
  private ConcurrentSkipListSet<Cell> kvset =
      new ConcurrentSkipListSet<Cell>(KeyValue.COMPARATOR);

  // When true, row numbers are zero-padded so that lexicographic order
  // matches numeric order, which the seek-before assertions rely on.
  private static boolean formatRowNum = false;

  @Parameters
  public static Collection<Object[]> parameters() {
    List<Object[]> paramList = new ArrayList<Object[]>();
    paramList.add(new Object[] { false });
    paramList.add(new Object[] { true });
    return paramList;
  }

  private final boolean includesTag;

  public TestPrefixTreeEncoding(boolean includesTag) {
    this.includesTag = includesTag;
  }

  @Before
  public void setUp() throws Exception {
    kvset.clear();
    formatRowNum = false;
  }

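  // Note on the shared pattern below: each test generates a batch of KeyValues,
  // encodes them into an in-memory block via PrefixTreeCodec, then wraps the
  // encoded bytes (skipping the DataBlockEncoding ID prefix) in a ByteBuffer
  // and reads them back through an EncodedSeeker.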
  @Test
  public void testSeekBeforeWithFixedData() throws Exception {
    formatRowNum = true;
    PrefixTreeCodec encoder = new PrefixTreeCodec();
    int batchId = numBatchesWritten++;
    HFileContext meta = new HFileContextBuilder()
                        .withHBaseCheckSum(false)
                        .withIncludesMvcc(false)
                        .withIncludesTags(includesTag)
                        .withCompression(Algorithm.NONE).build();
    HFileBlockEncodingContext blkEncodingCtx = new HFileBlockDefaultEncodingContext(
        DataBlockEncoding.PREFIX_TREE, new byte[0], meta);
    ByteArrayOutputStream baosInMemory = new ByteArrayOutputStream();
    DataOutputStream userDataStream = new DataOutputStream(baosInMemory);
    generateFixedTestData(kvset, batchId, false, includesTag, encoder, blkEncodingCtx,
        userDataStream);
    EncodedSeeker seeker = encoder.createSeeker(KeyValue.COMPARATOR,
        encoder.newDataBlockDecodingContext(meta));
    byte[] onDiskBytes = baosInMemory.toByteArray();
    // Skip the block-encoding ID header; the seeker expects only the encoded payload.
    ByteBuffer readBuffer = ByteBuffer.wrap(onDiskBytes, DataBlockEncoding.ID_SIZE,
        onDiskBytes.length - DataBlockEncoding.ID_SIZE);
    seeker.setCurrentBuffer(readBuffer);

    // Seek before the first keyvalue: nothing precedes it, so the seeker yields null.
    KeyValue seekKey = KeyValueUtil.createFirstDeleteFamilyOnRow(getRowKey(batchId, 0), CF_BYTES);
    seeker.seekToKeyInBlock(
        new KeyValue.KeyOnlyKeyValue(seekKey.getBuffer(), seekKey.getKeyOffset(), seekKey
            .getKeyLength()), true);
    assertNull(seeker.getKeyValue());

    // Seek before a middle keyvalue: the seeker lands on the preceding row.
    seekKey = KeyValueUtil.createFirstDeleteFamilyOnRow(getRowKey(batchId, NUM_ROWS_PER_BATCH / 3),
        CF_BYTES);
    seeker.seekToKeyInBlock(
        new KeyValue.KeyOnlyKeyValue(seekKey.getBuffer(), seekKey.getKeyOffset(), seekKey
            .getKeyLength()), true);
    assertNotNull(seeker.getKeyValue());
    assertArrayEquals(getRowKey(batchId, NUM_ROWS_PER_BATCH / 3 - 1), seeker.getKeyValue().getRow());

    // Seek before a key past the end of the block: the seeker lands on the last row.
    seekKey = KeyValueUtil.createFirstDeleteFamilyOnRow(Bytes.toBytes("zzzz"), CF_BYTES);
    seeker.seekToKeyInBlock(
        new KeyValue.KeyOnlyKeyValue(seekKey.getBuffer(), seekKey.getKeyOffset(), seekKey
            .getKeyLength()), true);
    assertNotNull(seeker.getKeyValue());
    assertArrayEquals(getRowKey(batchId, NUM_ROWS_PER_BATCH - 1), seeker.getKeyValue().getRow());
  }

  @Test
  public void testScanWithRandomData() throws Exception {
    PrefixTreeCodec encoder = new PrefixTreeCodec();
    ByteArrayOutputStream baosInMemory = new ByteArrayOutputStream();
    DataOutputStream userDataStream = new DataOutputStream(baosInMemory);
    HFileContext meta = new HFileContextBuilder()
                        .withHBaseCheckSum(false)
                        .withIncludesMvcc(false)
                        .withIncludesTags(includesTag)
                        .withCompression(Algorithm.NONE)
                        .build();
    HFileBlockEncodingContext blkEncodingCtx = new HFileBlockDefaultEncodingContext(
        DataBlockEncoding.PREFIX_TREE, new byte[0], meta);
    generateRandomTestData(kvset, numBatchesWritten++, includesTag, encoder, blkEncodingCtx,
        userDataStream);
    EncodedSeeker seeker = encoder.createSeeker(KeyValue.COMPARATOR,
        encoder.newDataBlockDecodingContext(meta));
    byte[] onDiskBytes = baosInMemory.toByteArray();
    ByteBuffer readBuffer = ByteBuffer.wrap(onDiskBytes, DataBlockEncoding.ID_SIZE,
        onDiskBytes.length - DataBlockEncoding.ID_SIZE);
    seeker.setCurrentBuffer(readBuffer);
    // Walk the whole block, checking that cells come back in comparator order
    // and that tags survive the round trip exactly when they were written.
    Cell previousKV = null;
    do {
      Cell currentKV = seeker.getKeyValue();
      System.out.println(currentKV);
      if (previousKV != null && KeyValue.COMPARATOR.compare(currentKV, previousKV) < 0) {
        dumpInputKVSet();
        fail("Current kv " + currentKV + " is smaller than previous keyvalue " + previousKV);
      }
      if (!includesTag) {
        assertFalse(currentKV.getTagsLength() > 0);
      } else {
        assertTrue(currentKV.getTagsLength() > 0);
      }
      previousKV = currentKV;
    } while (seeker.next());
  }

  @Test
  public void testSeekWithRandomData() throws Exception {
    PrefixTreeCodec encoder = new PrefixTreeCodec();
    ByteArrayOutputStream baosInMemory = new ByteArrayOutputStream();
    DataOutputStream userDataStream = new DataOutputStream(baosInMemory);
    int batchId = numBatchesWritten++;
    HFileContext meta = new HFileContextBuilder()
                        .withHBaseCheckSum(false)
                        .withIncludesMvcc(false)
                        .withIncludesTags(includesTag)
                        .withCompression(Algorithm.NONE)
                        .build();
    HFileBlockEncodingContext blkEncodingCtx = new HFileBlockDefaultEncodingContext(
        DataBlockEncoding.PREFIX_TREE, new byte[0], meta);
    generateRandomTestData(kvset, batchId, includesTag, encoder, blkEncodingCtx, userDataStream);
    EncodedSeeker seeker = encoder.createSeeker(KeyValue.COMPARATOR,
        encoder.newDataBlockDecodingContext(meta));
    byte[] onDiskBytes = baosInMemory.toByteArray();
    ByteBuffer readBuffer = ByteBuffer.wrap(onDiskBytes, DataBlockEncoding.ID_SIZE,
        onDiskBytes.length - DataBlockEncoding.ID_SIZE);
    verifySeeking(seeker, readBuffer, batchId);
  }

  @Test
  public void testSeekWithFixedData() throws Exception {
    PrefixTreeCodec encoder = new PrefixTreeCodec();
    int batchId = numBatchesWritten++;
    HFileContext meta = new HFileContextBuilder()
                        .withHBaseCheckSum(false)
                        .withIncludesMvcc(false)
                        .withIncludesTags(includesTag)
                        .withCompression(Algorithm.NONE)
                        .build();
    HFileBlockEncodingContext blkEncodingCtx = new HFileBlockDefaultEncodingContext(
        DataBlockEncoding.PREFIX_TREE, new byte[0], meta);
    ByteArrayOutputStream baosInMemory = new ByteArrayOutputStream();
    DataOutputStream userDataStream = new DataOutputStream(baosInMemory);
    generateFixedTestData(kvset, batchId, includesTag, encoder, blkEncodingCtx, userDataStream);
    EncodedSeeker seeker = encoder.createSeeker(KeyValue.COMPARATOR,
        encoder.newDataBlockDecodingContext(meta));
    byte[] onDiskBytes = baosInMemory.toByteArray();
    ByteBuffer readBuffer = ByteBuffer.wrap(onDiskBytes, DataBlockEncoding.ID_SIZE,
        onDiskBytes.length - DataBlockEncoding.ID_SIZE);
    verifySeeking(seeker, readBuffer, batchId);
  }

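  // Cross-check: for every row key, the encoded seeker must agree with a
  // CollectionBackedScanner over the same in-memory kvset, both on whether a
  // following cell exists and on which cell it is.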
  private void verifySeeking(EncodedSeeker encodeSeeker,
      ByteBuffer encodedData, int batchId) {
    for (int i = 0; i < NUM_ROWS_PER_BATCH; ++i) {
      encodeSeeker.setCurrentBuffer(encodedData);
      KeyValue firstOnRow = KeyValueUtil.createFirstOnRow(getRowKey(batchId, i));
      encodeSeeker.seekToKeyInBlock(
          new KeyValue.KeyOnlyKeyValue(firstOnRow.getBuffer(), firstOnRow.getKeyOffset(),
              firstOnRow.getKeyLength()), false);
      boolean hasMoreOfEncodeScanner = encodeSeeker.next();
      CollectionBackedScanner collectionScanner = new CollectionBackedScanner(
          this.kvset);
      boolean hasMoreOfCollectionScanner = collectionScanner.seek(firstOnRow);
      if (hasMoreOfEncodeScanner != hasMoreOfCollectionScanner) {
        dumpInputKVSet();
        fail("Encoded seeker and reference scanner disagree after seeking " + firstOnRow);
      }
      if (hasMoreOfEncodeScanner) {
        if (KeyValue.COMPARATOR.compare(encodeSeeker.getKeyValue(),
            collectionScanner.peek()) != 0) {
          dumpInputKVSet();
          fail("Expected " + collectionScanner.peek() + " actual "
              + encodeSeeker.getKeyValue() + ", after seeking " + firstOnRow);
        }
      }
    }
  }

  private void dumpInputKVSet() {
    LOG.info("Dumping input keyvalue set in error case:");
    for (Cell kv : kvset) {
      LOG.info(kv);
    }
  }

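  // The generators below both populate the shared kvset (the expected view)
  // and write the same cells through the encoder into userDataStream (the
  // actual, encoded view) that the tests read back.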
  private static void generateFixedTestData(ConcurrentSkipListSet<Cell> kvset, int batchId,
      boolean useTags, PrefixTreeCodec encoder, HFileBlockEncodingContext blkEncodingCtx,
      DataOutputStream userDataStream) throws Exception {
    generateFixedTestData(kvset, batchId, true, useTags, encoder, blkEncodingCtx, userDataStream);
  }

  private static void generateFixedTestData(ConcurrentSkipListSet<Cell> kvset,
      int batchId, boolean partial, boolean useTags, PrefixTreeCodec encoder,
      HFileBlockEncodingContext blkEncodingCtx, DataOutputStream userDataStream) throws Exception {
    for (int i = 0; i < NUM_ROWS_PER_BATCH; ++i) {
      // In partial mode, skip every other run of ten rows to leave gaps for seeking.
      if (partial && i / 10 % 2 == 1) {
        continue;
      }
      for (int j = 0; j < NUM_COLS_PER_ROW; ++j) {
        if (!useTags) {
          KeyValue kv = new KeyValue(getRowKey(batchId, i), CF_BYTES, getQualifier(j), getValue(
              batchId, i, j));
          kvset.add(kv);
        } else {
          KeyValue kv = new KeyValue(getRowKey(batchId, i), CF_BYTES, getQualifier(j), 0L,
              getValue(batchId, i, j), new Tag[] { new Tag((byte) 1, "metaValue1") });
          kvset.add(kv);
        }
      }
    }
    encoder.startBlockEncoding(blkEncodingCtx, userDataStream);
    for (Cell kv : kvset) {
      encoder.encode(kv, blkEncodingCtx, userDataStream);
    }
    encoder.endBlockEncoding(blkEncodingCtx, userDataStream, null);
  }

  private static void generateRandomTestData(ConcurrentSkipListSet<Cell> kvset,
      int batchId, boolean useTags, PrefixTreeCodec encoder,
      HFileBlockEncodingContext blkEncodingCtx, DataOutputStream userDataStream) throws Exception {
    Random random = new Random();
    for (int i = 0; i < NUM_ROWS_PER_BATCH; ++i) {
      // Randomly drop about half the rows, and half the columns within each
      // kept row, to vary the shape of the encoded data.
      if (random.nextInt(100) < 50) {
        continue;
      }
      for (int j = 0; j < NUM_COLS_PER_ROW; ++j) {
        if (random.nextInt(100) < 50) {
          continue;
        }
        if (!useTags) {
          KeyValue kv = new KeyValue(getRowKey(batchId, i), CF_BYTES, getQualifier(j), getValue(
              batchId, i, j));
          kvset.add(kv);
        } else {
          KeyValue kv = new KeyValue(getRowKey(batchId, i), CF_BYTES, getQualifier(j), 0L,
              getValue(batchId, i, j), new Tag[] { new Tag((byte) 1, "metaValue1") });
          kvset.add(kv);
        }
      }
    }
    encoder.startBlockEncoding(blkEncodingCtx, userDataStream);
    for (Cell kv : kvset) {
      encoder.encode(kv, blkEncodingCtx, userDataStream);
    }
    encoder.endBlockEncoding(blkEncodingCtx, userDataStream, null);
  }

  private static byte[] getRowKey(int batchId, int i) {
    return Bytes
        .toBytes("batch" + batchId + "_row" + (formatRowNum ? String.format("%04d", i) : i));
  }

  private static byte[] getQualifier(int j) {
    return Bytes.toBytes("colfdfafhfhsdfhsdfh" + j);
  }

  private static byte[] getValue(int batchId, int i, int j) {
    return Bytes.toBytes("value_for_" + Bytes.toString(getRowKey(batchId, i)) + "_col" + j);
  }

}