/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.compactions;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.List;
import java.util.Random;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.regionserver.DefaultStoreEngine;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreEngine;
import org.apache.hadoop.hbase.regionserver.StripeStoreConfig;
import org.apache.hadoop.hbase.regionserver.StripeStoreEngine;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.junit.Test;
import org.junit.experimental.categories.Category;

@Category(MediumTests.class)
public class TestCompactionWithThroughputController {

  private static final Log LOG = LogFactory.getLog(TestCompactionWithThroughputController.class);

  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();

  private static final double EPSILON = 1E-6;

  private final TableName tableName = TableName.valueOf(getClass().getSimpleName());

  private final byte[] family = Bytes.toBytes("f");

  private final byte[] qualifier = Bytes.toBytes("q");

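  /**
   * Returns the first store of the first online region found for the given table, or
   * {@code null} if no region server is hosting it.
   */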
  private Store getStoreWithName(TableName tableName) {
    MiniHBaseCluster cluster = TEST_UTIL.getMiniHBaseCluster();
    List<JVMClusterUtil.RegionServerThread> rsts = cluster.getRegionServerThreads();
    for (JVMClusterUtil.RegionServerThread rst : rsts) {
      HRegionServer hrs = rst.getRegionServer();
      for (Region region : hrs.getOnlineRegions(tableName)) {
        return region.getStores().iterator().next();
      }
    }
    return null;
  }

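  /**
   * Recreates the table and loads it with 100 rows of random 128KB values, flushing after
   * every 10 puts, so the store ends up with 10 storefiles of roughly 1.28MB each.
   */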
  private Store prepareData() throws IOException {
    HBaseAdmin admin = TEST_UTIL.getHBaseAdmin();
    if (admin.tableExists(tableName)) {
      admin.disableTable(tableName);
      admin.deleteTable(tableName);
    }
    HTable table = TEST_UTIL.createTable(tableName, family);
    Random rand = new Random();
    for (int i = 0; i < 10; i++) {
      for (int j = 0; j < 10; j++) {
        byte[] value = new byte[128 * 1024];
        rand.nextBytes(value);
        table.put(new Put(Bytes.toBytes(i * 10 + j)).add(family, qualifier, value));
      }
      admin.flush(tableName);
    }
    return getStoreWithName(tableName);
  }

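  /**
   * Runs a major compaction with both throughput bounds pinned to 1MB/s, so the limit stays
   * fixed regardless of compaction pressure, and returns how long the compaction took.
   * Compaction min/max file counts and the blocking storefile count are set high enough that
   * the flushes in {@link #prepareData()} cannot trigger a compaction on their own.
   */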
  private long testCompactionWithThroughputLimit() throws Exception {
    long throughputLimit = 1024L * 1024;
    Configuration conf = TEST_UTIL.getConfiguration();
    conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY, DefaultStoreEngine.class.getName());
    conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MIN_KEY, 100);
    conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MAX_KEY, 200);
    conf.setInt(HStore.BLOCKING_STOREFILES_KEY, 10000);
    conf.setLong(
      PressureAwareCompactionThroughputController.HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_HIGHER_BOUND,
      throughputLimit);
    conf.setLong(
      PressureAwareCompactionThroughputController.HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_LOWER_BOUND,
      throughputLimit);
    conf.set(CompactionThroughputControllerFactory.HBASE_THROUGHPUT_CONTROLLER_KEY,
      PressureAwareCompactionThroughputController.class.getName());
    TEST_UTIL.startMiniCluster(1);
    try {
      Store store = prepareData();
      assertEquals(10, store.getStorefilesCount());
      long startTime = System.currentTimeMillis();
      TEST_UTIL.getHBaseAdmin().majorCompact(tableName);
      while (store.getStorefilesCount() != 1) {
        Thread.sleep(20);
      }
      long duration = System.currentTimeMillis() - startTime;
      double throughput = (double) store.getStorefilesSize() / duration * 1000;
      // confirm that the speed limit works properly (not too fast, and also not too slow);
      // 20% is the max acceptable error rate.
      assertTrue(throughput < throughputLimit * 1.2);
      assertTrue(throughput > throughputLimit * 0.8);
      return duration;
    } finally {
      TEST_UTIL.shutdownMiniCluster();
    }
  }

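  /**
   * Same workload as above, but using {@link NoLimitCompactionThroughputController} to get an
   * unthrottled baseline compaction time for comparison.
   */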
  private long testCompactionWithoutThroughputLimit() throws Exception {
    Configuration conf = TEST_UTIL.getConfiguration();
    conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY, DefaultStoreEngine.class.getName());
    conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MIN_KEY, 100);
    conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MAX_KEY, 200);
    conf.setInt(HStore.BLOCKING_STOREFILES_KEY, 10000);
    conf.set(CompactionThroughputControllerFactory.HBASE_THROUGHPUT_CONTROLLER_KEY,
      NoLimitCompactionThroughputController.class.getName());
    TEST_UTIL.startMiniCluster(1);
    try {
      Store store = prepareData();
      assertEquals(10, store.getStorefilesCount());
      long startTime = System.currentTimeMillis();
      TEST_UTIL.getHBaseAdmin().majorCompact(tableName);
      while (store.getStorefilesCount() != 1) {
        Thread.sleep(20);
      }
      return System.currentTimeMillis() - startTime;
    } finally {
      TEST_UTIL.shutdownMiniCluster();
    }
  }

  @Test
  public void testCompaction() throws Exception {
    long limitTime = testCompactionWithThroughputLimit();
    long noLimitTime = testCompactionWithoutThroughputLimit();
    LOG.info("With 1M/s limit, compaction took " + limitTime
        + "ms; without limit, compaction took " + noLimitTime + "ms");
    // usually the throughput of an unthrottled compaction is at least about 40MB/sec, so this
    // is a very weak assertion.
    assertTrue(limitTime > noLimitTime * 2);
  }

  /**
   * Test the tuning task of {@link PressureAwareCompactionThroughputController}
   */
  @Test
  public void testThroughputTuning() throws Exception {
    Configuration conf = TEST_UTIL.getConfiguration();
    conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY, DefaultStoreEngine.class.getName());
    conf.setLong(
      PressureAwareCompactionThroughputController.HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_HIGHER_BOUND,
      20L * 1024 * 1024);
    conf.setLong(
      PressureAwareCompactionThroughputController.HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_LOWER_BOUND,
      10L * 1024 * 1024);
    conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MIN_KEY, 4);
    conf.setInt(HStore.BLOCKING_STOREFILES_KEY, 6);
    conf.set(CompactionThroughputControllerFactory.HBASE_THROUGHPUT_CONTROLLER_KEY,
      PressureAwareCompactionThroughputController.class.getName());
    conf.setInt(
      PressureAwareCompactionThroughputController.HBASE_HSTORE_COMPACTION_THROUGHPUT_TUNE_PERIOD,
      1000);
    TEST_UTIL.startMiniCluster(1);
    Connection conn = ConnectionFactory.createConnection(conf);
    try {
      HTableDescriptor htd = new HTableDescriptor(tableName);
      htd.addFamily(new HColumnDescriptor(family));
      htd.setCompactionEnabled(false);
      TEST_UTIL.getHBaseAdmin().createTable(htd);
      TEST_UTIL.waitTableAvailable(tableName);
      HRegionServer regionServer = TEST_UTIL.getRSForFirstRegionInTable(tableName);
      PressureAwareCompactionThroughputController throughputController =
          (PressureAwareCompactionThroughputController) regionServer.compactSplitThread
              .getCompactionThroughputController();
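      // With no storefiles there is no compaction pressure, so the tuned limit should start at
      // the configured lower bound of 10MB/s. The tune period is set to 1s above, so each 2s
      // sleep below gives the tuner a chance to pick up the new pressure. The controller is
      // expected to set maxThroughput = lower + (higher - lower) * pressure, dropping the
      // limit entirely once pressure goes above 1.0.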
      assertEquals(10L * 1024 * 1024, throughputController.maxThroughput, EPSILON);
      Table table = conn.getTable(tableName);
      for (int i = 0; i < 5; i++) {
        table.put(new Put(Bytes.toBytes(i)).add(family, qualifier, new byte[0]));
        TEST_UTIL.flush(tableName);
      }
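      // 5 storefiles with min files 4 and blocking 6: pressure = (5 - 4) / (6 - 4) = 0.5, so
      // the expected limit is 10MB/s + 0.5 * (20MB/s - 10MB/s) = 15MB/s.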
      Thread.sleep(2000);
      assertEquals(15L * 1024 * 1024, throughputController.maxThroughput, EPSILON);

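      // A 6th storefile raises the pressure to (6 - 4) / (6 - 4) = 1.0, which should map to
      // the higher bound of 20MB/s.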
      table.put(new Put(Bytes.toBytes(5)).add(family, qualifier, new byte[0]));
      TEST_UTIL.flush(tableName);
      Thread.sleep(2000);
      assertEquals(20L * 1024 * 1024, throughputController.maxThroughput, EPSILON);

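      // A 7th storefile pushes the pressure above 1.0 (past the blocking file count), so the
      // controller should remove the limit entirely.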
      table.put(new Put(Bytes.toBytes(6)).add(family, qualifier, new byte[0]));
      TEST_UTIL.flush(tableName);
      Thread.sleep(2000);
      assertEquals(Double.MAX_VALUE, throughputController.maxThroughput, EPSILON);

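      // Switching the configured controller class and notifying the compact/split thread
      // should stop the old controller and install a NoLimit one in its place.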
      conf.set(CompactionThroughputControllerFactory.HBASE_THROUGHPUT_CONTROLLER_KEY,
        NoLimitCompactionThroughputController.class.getName());
      regionServer.compactSplitThread.onConfigurationChange(conf);
      assertTrue(throughputController.isStopped());
      assertTrue(regionServer.compactSplitThread
          .getCompactionThroughputController() instanceof NoLimitCompactionThroughputController);
    } finally {
      conn.close();
      TEST_UTIL.shutdownMiniCluster();
    }
  }

  /**
   * Test the logic for calculating compaction pressure for a striped store.
   */
  @Test
  public void testGetCompactionPressureForStripedStore() throws Exception {
    Configuration conf = TEST_UTIL.getConfiguration();
    conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY, StripeStoreEngine.class.getName());
    conf.setBoolean(StripeStoreConfig.FLUSH_TO_L0_KEY, false);
    conf.setInt(StripeStoreConfig.INITIAL_STRIPE_COUNT_KEY, 2);
    conf.setInt(StripeStoreConfig.MIN_FILES_KEY, 4);
    conf.setInt(HStore.BLOCKING_STOREFILES_KEY, 12);
    TEST_UTIL.startMiniCluster(1);
    Connection conn = ConnectionFactory.createConnection(conf);
    try {
      HTableDescriptor htd = new HTableDescriptor(tableName);
      htd.addFamily(new HColumnDescriptor(family));
      htd.setCompactionEnabled(false);
      TEST_UTIL.getHBaseAdmin().createTable(htd);
      TEST_UTIL.waitTableAvailable(tableName);
      HStore store = (HStore) getStoreWithName(tableName);
      assertEquals(0, store.getStorefilesCount());
      assertEquals(0.0, store.getCompactionPressure(), EPSILON);
      Table table = conn.getTable(tableName);
      for (int i = 0; i < 4; i++) {
        table.put(new Put(Bytes.toBytes(i)).add(family, qualifier, new byte[0]));
        table.put(new Put(Bytes.toBytes(100 + i)).add(family, qualifier, new byte[0]));
        TEST_UTIL.flush(tableName);
      }
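      // With FLUSH_TO_L0 disabled and two initial stripes, each flush is expected to write one
      // file per stripe (rows i and 100 + i fall in different stripes). Four files per stripe
      // equals the stripe minimum, so there is no compaction pressure yet.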
      assertEquals(8, store.getStorefilesCount());
      assertEquals(0.0, store.getCompactionPressure(), EPSILON);

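      // 5 files per stripe, with a per-stripe blocking count of 12 / 2 = 6:
      // pressure = (5 - 4) / (6 - 4) = 0.5.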
      table.put(new Put(Bytes.toBytes(4)).add(family, qualifier, new byte[0]));
      table.put(new Put(Bytes.toBytes(104)).add(family, qualifier, new byte[0]));
      TEST_UTIL.flush(tableName);
      assertEquals(10, store.getStorefilesCount());
      assertEquals(0.5, store.getCompactionPressure(), EPSILON);

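      // 6 files per stripe reaches the per-stripe blocking count, so the pressure should be
      // (6 - 4) / (6 - 4) = 1.0.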
      table.put(new Put(Bytes.toBytes(5)).add(family, qualifier, new byte[0]));
      table.put(new Put(Bytes.toBytes(105)).add(family, qualifier, new byte[0]));
      TEST_UTIL.flush(tableName);
      assertEquals(12, store.getStorefilesCount());
      assertEquals(1.0, store.getCompactionPressure(), EPSILON);

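      // Once the total file count exceeds the blocking count (14 > 12), the striped store is
      // expected to report a flat pressure of 2.0 instead of extrapolating per stripe.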
      table.put(new Put(Bytes.toBytes(6)).add(family, qualifier, new byte[0]));
      table.put(new Put(Bytes.toBytes(106)).add(family, qualifier, new byte[0]));
      TEST_UTIL.flush(tableName);
      assertEquals(14, store.getStorefilesCount());
      assertEquals(2.0, store.getCompactionPressure(), EPSILON);
    } finally {
      conn.close();
      TEST_UTIL.shutdownMiniCluster();
    }
  }
}