/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import static org.apache.hadoop.hbase.HBaseTestingUtility.START_KEY;
import static org.apache.hadoop.hbase.HBaseTestingUtility.START_KEY_BYTES;
import static org.apache.hadoop.hbase.HBaseTestingUtility.fam1;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseTestCase;
import org.apache.hadoop.hbase.HBaseTestCase.HRegionIncommon;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoderImpl;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.regionserver.compactions.RatioBasedCompactionPolicy;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.WAL;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;

/**
 * Test major compactions
 */
@Category(MediumTests.class)
public class TestMajorCompaction {
  @Rule public TestName name = new TestName();
  private static final Log LOG = LogFactory.getLog(TestMajorCompaction.class.getName());
  private static final HBaseTestingUtility UTIL = HBaseTestingUtility.createLocalHTU();
  protected Configuration conf = UTIL.getConfiguration();

  private Region r = null;
  private HTableDescriptor htd = null;
  private static final byte [] COLUMN_FAMILY = fam1;
  private final byte [] STARTROW = Bytes.toBytes(START_KEY);
  private static final byte [] COLUMN_FAMILY_TEXT = COLUMN_FAMILY;
  private int compactionThreshold;
  private byte[] secondRowBytes, thirdRowBytes;
  private static final long MAX_FILES_TO_COMPACT = 10;

  /** constructor */
  public TestMajorCompaction() {
    super();

    // Set cache flush size to 1MB
    conf.setInt(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 1024*1024);
    conf.setInt(HConstants.HREGION_MEMSTORE_BLOCK_MULTIPLIER, 100);
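    // Number of store files at which a store becomes eligible for compaction; the
    // tests below create one more file than this so that a compaction can be requested.
    // Falls back to 3 if hbase.hstore.compactionThreshold is not set in the test conf.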
    compactionThreshold = conf.getInt("hbase.hstore.compactionThreshold", 3);

    secondRowBytes = START_KEY_BYTES.clone();
    // Increment the least significant character so we get to next row.
    secondRowBytes[START_KEY_BYTES.length - 1]++;
    thirdRowBytes = START_KEY_BYTES.clone();
    thirdRowBytes[START_KEY_BYTES.length - 1] += 2;
  }

  @Before
  public void setUp() throws Exception {
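    // Each test method gets its own table descriptor (named after the test) and a
    // fresh local region; tearDown() closes the region and its WAL afterwards.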
    this.htd = UTIL.createTableDescriptor(name.getMethodName());
    this.r = UTIL.createLocalHRegion(htd, null, null);
  }

  @After
  public void tearDown() throws Exception {
    WAL wal = ((HRegion)r).getWAL();
    ((HRegion)r).close();
    wal.close();
  }

  /**
   * Test that on a major compaction, if all cells are expired or deleted, then
   * we'll end up with no product.  Make sure a scanner over the region returns
   * the right answer in this case - and that it just basically works.
   * @throws IOException
   */
  @Test
  public void testMajorCompactingToNoOutput() throws IOException {
    createStoreFile(r);
    for (int i = 0; i < compactionThreshold; i++) {
      createStoreFile(r);
    }
    // Now delete everything.
    InternalScanner s = r.getScanner(new Scan());
    do {
      List<Cell> results = new ArrayList<Cell>();
      boolean result = s.next(results);
      r.delete(new Delete(CellUtil.cloneRow(results.get(0))));
      if (!result) break;
    } while(true);
    s.close();
    // Flush
    r.flush(true);
    // Major compact.
    r.compact(true);
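    // A major compaction rewrites the store from scratch and drops both the delete
    // markers and the cells they shadow, so re-scanning the region should find nothing.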
    s = r.getScanner(new Scan());
    int counter = 0;
    do {
      List<Cell> results = new ArrayList<Cell>();
      boolean result = s.next(results);
      if (!result) break;
      counter++;
    } while(true);
    s.close();
    assertEquals(0, counter);
  }

  /**
   * Run major compaction, flushing the memstore along the way.
   * Assert deletes get cleaned up.
   * @throws Exception
   */
  @Test
  public void testMajorCompaction() throws Exception {
    majorCompaction();
  }

  @Test
  public void testDataBlockEncodingInCacheOnly() throws Exception {
    majorCompactionWithDataBlockEncoding(true);
  }

  @Test
  public void testDataBlockEncodingEverywhere() throws Exception {
    majorCompactionWithDataBlockEncoding(false);
  }

  public void majorCompactionWithDataBlockEncoding(boolean inCacheOnly)
      throws Exception {
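    // Remember each store's original block encoder, then swap in a test encoder for
    // the duration of the compaction: NONE on disk for the in-cache-only variant,
    // PREFIX otherwise. The original encoders are restored once majorCompaction()
    // has run.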
    Map<Store, HFileDataBlockEncoder> replaceBlockCache =
        new HashMap<Store, HFileDataBlockEncoder>();
    for (Store store : r.getStores()) {
      HFileDataBlockEncoder blockEncoder = store.getDataBlockEncoder();
      replaceBlockCache.put(store, blockEncoder);
      final DataBlockEncoding inCache = DataBlockEncoding.PREFIX;
      final DataBlockEncoding onDisk = inCacheOnly ? DataBlockEncoding.NONE : inCache;
      ((HStore) store).setDataBlockEncoderInTest(new HFileDataBlockEncoderImpl(onDisk));
    }

    majorCompaction();

    // restore settings
    for (Entry<Store, HFileDataBlockEncoder> entry : replaceBlockCache.entrySet()) {
      ((HStore) entry.getKey()).setDataBlockEncoderInTest(entry.getValue());
    }
  }

  private void majorCompaction() throws Exception {
    createStoreFile(r);
    for (int i = 0; i < compactionThreshold; i++) {
      createStoreFile(r);
    }
    // Add more content.
    HBaseTestCase.addContent(new HRegionIncommon(r), Bytes.toString(COLUMN_FAMILY));

    // Now there are about 5 versions of each column.
    // By default only 3 (MAXVERSIONS) versions are allowed per column.
    //
    // Assert == 3 when we ask for versions.
    Result result = r.get(new Get(STARTROW).addFamily(COLUMN_FAMILY_TEXT).setMaxVersions(100));
    assertEquals(compactionThreshold, result.size());

    // No compaction has run yet, so CompactionProgress should still be null on every store.
    for (Store store : r.getStores()) {
      assertNull(store.getCompactionProgress());
    }

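    // Flush the extra content and force a major compaction: everything written so far
    // gets rewritten into a single store file per store, and the store records its
    // compaction progress while doing so.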
    r.flush(true);
    r.compact(true);

    // see if CompactionProgress has done its thing on at least one store
    int storeCount = 0;
    for (Store store : r.getStores()) {
      CompactionProgress progress = store.getCompactionProgress();
      if (progress != null) {
        ++storeCount;
        assertTrue(progress.currentCompactedKVs > 0);
        assertTrue(progress.totalCompactingKVs > 0);
      }
    }
    assertTrue(storeCount > 0);

    // look at the second row
    // Increment the least significant character so we get to next row.
    byte [] secondRowBytes = START_KEY_BYTES.clone();
    secondRowBytes[START_KEY_BYTES.length - 1]++;

    // Always 3 versions if that is what max versions is.
    result = r.get(new Get(secondRowBytes).addFamily(COLUMN_FAMILY_TEXT).setMaxVersions(100));
    LOG.debug("Row " + Bytes.toStringBinary(secondRowBytes) + " after initial compaction: "
        + result);
    assertEquals("Invalid number of versions of row "
        + Bytes.toStringBinary(secondRowBytes) + ".", compactionThreshold,
        result.size());

    // Now add a delete for the second row to the memstore and then flush it. That
    // will put us over the compaction threshold of 3 store files. Compacting these
    // store files should result in a compacted store file that has no references to
    // the deleted row.
    LOG.debug("Adding deletes to memstore and flushing");
    Delete delete = new Delete(secondRowBytes, System.currentTimeMillis());
    delete.addFamily(COLUMN_FAMILY);
    r.delete(delete);

    // Assert deleted.
    result = r.get(new Get(secondRowBytes).addFamily(COLUMN_FAMILY_TEXT).setMaxVersions(100));
    assertTrue("Second row should have been deleted", result.isEmpty());

    r.flush(true);

    result = r.get(new Get(secondRowBytes).addFamily(COLUMN_FAMILY_TEXT).setMaxVersions(100));
    assertTrue("Second row should have been deleted", result.isEmpty());

    // Add a bit of data and flush.  Start adding at 'bbb'.
    createSmallerStoreFile(this.r);
    r.flush(true);
    // Assert that the second row is still deleted.
    result = r.get(new Get(secondRowBytes).addFamily(COLUMN_FAMILY_TEXT).setMaxVersions(100));
    assertTrue("Second row should still be deleted", result.isEmpty());

    // Force major compaction.
    r.compact(true);
    assertEquals(1, r.getStore(COLUMN_FAMILY_TEXT).getStorefiles().size());

    result = r.get(new Get(secondRowBytes).addFamily(COLUMN_FAMILY_TEXT).setMaxVersions(100));
    assertTrue("Second row should still be deleted", result.isEmpty());

    // Make sure the store files do have some 'aaa' keys in them -- exactly 3.
    // Also, that compacted store files do not have any secondRowBytes because
    // they were deleted.
    verifyCounts(3, 0);

    // Multiple versions allowed for an entry, so the delete isn't enough
    // Lower TTL and expire to ensure that all our entries have been wiped
    final int ttl = 1000;
    for (Store hstore : r.getStores()) {
      HStore store = ((HStore) hstore);
      ScanInfo old = store.getScanInfo();
      ScanInfo si = new ScanInfo(old.getConfiguration(), old.getFamily(),
          old.getMinVersions(), old.getMaxVersions(), ttl,
          old.getKeepDeletedCells(), 0, old.getComparator());
      store.setScanInfo(si);
    }
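    // Wait out the one-second TTL so every remaining cell is expired, then major
    // compact again; the expired cells are dropped and no store file content survives.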
    Thread.sleep(1000);

    r.compact(true);
    int count = count();
    assertEquals("Should not see anything after TTL has expired", 0, count);
  }

  @Test
  public void testTimeBasedMajorCompaction() throws Exception {
    // create 2 storefiles and force a major compaction to reset the time
    int delay = 10 * 1000; // 10 sec
    float jitterPct = 0.20f; // 20%
    conf.setLong(HConstants.MAJOR_COMPACTION_PERIOD, delay);
    conf.setFloat("hbase.hregion.majorcompaction.jitter", jitterPct);

    HStore s = ((HStore) r.getStore(COLUMN_FAMILY));
    s.storeEngine.getCompactionPolicy().setConf(conf);
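    // Re-apply the shortened major compaction period and jitter to the store's
    // compaction policy so getNextMajorCompactTime() below reflects the test settings.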
    try {
      createStoreFile(r);
      createStoreFile(r);
      r.compact(true);

      // add one more file & verify that a regular compaction won't work
      createStoreFile(r);
      r.compact(false);
      assertEquals(2, s.getStorefilesCount());

      // ensure that major compaction time is deterministic
      RatioBasedCompactionPolicy c =
          (RatioBasedCompactionPolicy) s.storeEngine.getCompactionPolicy();
      Collection<StoreFile> storeFiles = s.getStorefiles();
      long mcTime = c.getNextMajorCompactTime(storeFiles);
      for (int i = 0; i < 10; ++i) {
        assertEquals(mcTime, c.getNextMajorCompactTime(storeFiles));
      }

      // ensure that the major compaction time is within the variance
      long jitter = Math.round(delay * jitterPct);
      assertTrue(delay - jitter <= mcTime && mcTime <= delay + jitter);

      // wait until the time-based compaction interval
      Thread.sleep(mcTime);

      // trigger a compaction request and ensure that it's upgraded to major
      r.compact(false);
      assertEquals(1, s.getStorefilesCount());
    } finally {
      // reset the timed compaction settings
      conf.setLong(HConstants.MAJOR_COMPACTION_PERIOD, 1000*60*60*24);
      conf.setFloat("hbase.hregion.majorcompaction.jitter", 0.20F);
      // run a major to reset the cache
      createStoreFile(r);
      r.compact(true);
      assertEquals(1, s.getStorefilesCount());
    }
  }

  private void verifyCounts(int countRow1, int countRow2) throws Exception {
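    // Read the store files directly (no block caching, no pread) and count how many
    // cells belong to the first row (STARTROW) versus the deleted second row.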
    int count1 = 0;
    int count2 = 0;
    for (StoreFile f: r.getStore(COLUMN_FAMILY_TEXT).getStorefiles()) {
      HFileScanner scanner = f.getReader().getScanner(false, false);
      scanner.seekTo();
      do {
        byte [] row = CellUtil.cloneRow(scanner.getKeyValue());
        if (Bytes.equals(row, STARTROW)) {
          count1++;
        } else if (Bytes.equals(row, secondRowBytes)) {
          count2++;
        }
      } while(scanner.next());
    }
    assertEquals(countRow1, count1);
    assertEquals(countRow2, count2);
  }

  private int count() throws IOException {
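    // Count every cell across all store files of the default family by scanning the
    // underlying HFiles directly; empty files are skipped via the seekTo() check.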
    int count = 0;
    for (StoreFile f: r.getStore(COLUMN_FAMILY_TEXT).getStorefiles()) {
      HFileScanner scanner = f.getReader().getScanner(false, false);
      if (!scanner.seekTo()) {
        continue;
      }
      do {
        count++;
      } while(scanner.next());
    }
    return count;
  }

  private void createStoreFile(final Region region) throws IOException {
    createStoreFile(region, Bytes.toString(COLUMN_FAMILY));
  }

  private void createStoreFile(final Region region, String family) throws IOException {
    HRegionIncommon loader = new HRegionIncommon(region);
    HBaseTestCase.addContent(loader, family);
    loader.flushcache();
  }

  private void createSmallerStoreFile(final Region region) throws IOException {
    HRegionIncommon loader = new HRegionIncommon(region);
    HBaseTestCase.addContent(loader, Bytes.toString(COLUMN_FAMILY), Bytes.toBytes("bbb"), null);
    loader.flushcache();
  }

  /**
   * Test for HBASE-5920 - Test that a system-requested (non-user) major compaction is
   * downgraded when there are too many store files.
   */
  @Test
  public void testNonUserMajorCompactionRequest() throws Exception {
    Store store = r.getStore(COLUMN_FAMILY);
    createStoreFile(r);
    for (int i = 0; i < MAX_FILES_TO_COMPACT + 1; i++) {
      createStoreFile(r);
    }
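    // Mark the store as wanting a major compaction, then request one at system
    // priority. With more files eligible than a single compaction may include, the
    // selection is expected to be downgraded to a minor (non-major) compaction.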
    store.triggerMajorCompaction();

    CompactionRequest request = store.requestCompaction(Store.NO_PRIORITY, null).getRequest();
    assertNotNull("Expected to receive a compaction request", request);
    assertFalse(
      "System-requested major compaction should not occur if there are too many store files",
      request.isMajor());
  }

  /**
   * Test for HBASE-5920 - Test that a user-requested major compaction always occurs,
   * even when there are too many store files.
   */
  @Test
  public void testUserMajorCompactionRequest() throws IOException {
    Store store = r.getStore(COLUMN_FAMILY);
    createStoreFile(r);
    for (int i = 0; i < MAX_FILES_TO_COMPACT + 1; i++) {
      createStoreFile(r);
    }
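    // Same setup as above, but the request is made at user priority, so the major
    // compaction must be kept even though the file count exceeds the per-compaction limit.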
    store.triggerMajorCompaction();
    CompactionRequest request = store.requestCompaction(Store.PRIORITY_USER, null).getRequest();
    assertNotNull("Expected to receive a compaction request", request);
    assertTrue(
      "User-requested major compaction should always occur, even if there are too many store files",
      request.isMajor());
  }

  /**
   * Test that on a major compaction, if all cells are expired or deleted, then we'll end up
   * with no product. Make sure a scanner over the region returns the right answer in this
   * case - and that it just basically works. Same as testMajorCompactingToNoOutput, but
   * using a reverse scan.
   * @throws IOException
   */
  @Test
  public void testMajorCompactingToNoOutputWithReverseScan() throws IOException {
    createStoreFile(r);
    for (int i = 0; i < compactionThreshold; i++) {
      createStoreFile(r);
    }
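    // Delete every row as before, but walk the region backwards this time so the
    // reversed scanner path is exercised as well.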
    // Now delete everything.
    Scan scan = new Scan();
    scan.setReversed(true);
    InternalScanner s = r.getScanner(scan);
    do {
      List<Cell> results = new ArrayList<Cell>();
      boolean result = s.next(results);
      assertFalse(results.isEmpty());
      r.delete(new Delete(CellUtil.cloneRow(results.get(0))));
      if (!result) break;
    } while (true);
    s.close();
    // Flush
    r.flush(true);
    // Major compact.
    r.compact(true);
    scan = new Scan();
    scan.setReversed(true);
    s = r.getScanner(scan);
    int counter = 0;
    do {
      List<Cell> results = new ArrayList<Cell>();
      boolean result = s.next(results);
      if (!result) break;
      counter++;
    } while (true);
    s.close();
    assertEquals(0, counter);
  }
}