1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.regionserver.compactions;
19
20 import static org.junit.Assert.assertEquals;
21 import static org.junit.Assert.assertTrue;
22
23 import java.io.IOException;
24 import java.util.List;
25 import java.util.Random;
26
27 import org.apache.commons.logging.Log;
28 import org.apache.commons.logging.LogFactory;
29 import org.apache.hadoop.conf.Configuration;
30 import org.apache.hadoop.hbase.HBaseTestingUtility;
31 import org.apache.hadoop.hbase.HColumnDescriptor;
32 import org.apache.hadoop.hbase.HTableDescriptor;
33 import org.apache.hadoop.hbase.MiniHBaseCluster;
34 import org.apache.hadoop.hbase.TableName;
35 import org.apache.hadoop.hbase.client.Connection;
36 import org.apache.hadoop.hbase.client.ConnectionFactory;
37 import org.apache.hadoop.hbase.client.HBaseAdmin;
38 import org.apache.hadoop.hbase.client.HTable;
39 import org.apache.hadoop.hbase.client.Put;
40 import org.apache.hadoop.hbase.client.Table;
41 import org.apache.hadoop.hbase.regionserver.DefaultStoreEngine;
42 import org.apache.hadoop.hbase.regionserver.HRegion;
43 import org.apache.hadoop.hbase.regionserver.HRegionServer;
44 import org.apache.hadoop.hbase.regionserver.HStore;
45 import org.apache.hadoop.hbase.regionserver.Region;
46 import org.apache.hadoop.hbase.regionserver.Store;
47 import org.apache.hadoop.hbase.regionserver.StoreEngine;
48 import org.apache.hadoop.hbase.regionserver.StripeStoreConfig;
49 import org.apache.hadoop.hbase.regionserver.StripeStoreEngine;
50 import org.apache.hadoop.hbase.testclassification.MediumTests;
51 import org.apache.hadoop.hbase.util.Bytes;
52 import org.apache.hadoop.hbase.util.JVMClusterUtil;
53 import org.junit.Test;
54 import org.junit.experimental.categories.Category;
55
56 @Category(MediumTests.class)
57 public class TestCompactionWithThroughputController {
58
59 private static final Log LOG = LogFactory.getLog(TestCompactionWithThroughputController.class);
60
61 private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
62
63 private static final double EPSILON = 1E-6;
64
65 private final TableName tableName = TableName.valueOf(getClass().getSimpleName());
66
67 private final byte[] family = Bytes.toBytes("f");
68
69 private final byte[] qualifier = Bytes.toBytes("q");
70
71 private Store getStoreWithName(TableName tableName) {
72 MiniHBaseCluster cluster = TEST_UTIL.getMiniHBaseCluster();
73 List<JVMClusterUtil.RegionServerThread> rsts = cluster.getRegionServerThreads();
74 for (int i = 0; i < cluster.getRegionServerThreads().size(); i++) {
75 HRegionServer hrs = rsts.get(i).getRegionServer();
76 for (Region region : hrs.getOnlineRegions(tableName)) {
77 return region.getStores().iterator().next();
78 }
79 }
80 return null;
81 }
82
83 private Store prepareData() throws IOException {
84 HBaseAdmin admin = TEST_UTIL.getHBaseAdmin();
85 if (admin.tableExists(tableName)) {
86 admin.disableTable(tableName);
87 admin.deleteTable(tableName);
88 }
89 HTable table = TEST_UTIL.createTable(tableName, family);
90 Random rand = new Random();
91 for (int i = 0; i < 10; i++) {
92 for (int j = 0; j < 10; j++) {
93 byte[] value = new byte[128 * 1024];
94 rand.nextBytes(value);
95 table.put(new Put(Bytes.toBytes(i * 10 + j)).add(family, qualifier, value));
96 }
97 admin.flush(tableName);
98 }
99 return getStoreWithName(tableName);
100 }
101
102 private long testCompactionWithThroughputLimit() throws Exception {
103 long throughputLimit = 1024L * 1024;
104 Configuration conf = TEST_UTIL.getConfiguration();
105 conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY, DefaultStoreEngine.class.getName());
106 conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MIN_KEY, 100);
107 conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MAX_KEY, 200);
108 conf.setInt(HStore.BLOCKING_STOREFILES_KEY, 10000);
109 conf.setLong(
110 PressureAwareCompactionThroughputController.HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_HIGHER_BOUND,
111 throughputLimit);
112 conf.setLong(
113 PressureAwareCompactionThroughputController.HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_LOWER_BOUND,
114 throughputLimit);
115 conf.set(CompactionThroughputControllerFactory.HBASE_THROUGHPUT_CONTROLLER_KEY,
116 PressureAwareCompactionThroughputController.class.getName());
117 TEST_UTIL.startMiniCluster(1);
118 try {
119 Store store = prepareData();
120 assertEquals(10, store.getStorefilesCount());
121 long startTime = System.currentTimeMillis();
122 TEST_UTIL.getHBaseAdmin().majorCompact(tableName);
123 while (store.getStorefilesCount() != 1) {
124 Thread.sleep(20);
125 }
126 long duration = System.currentTimeMillis() - startTime;
127 double throughput = (double) store.getStorefilesSize() / duration * 1000;
128
129
130 assertTrue(throughput < throughputLimit * 1.2);
131 assertTrue(throughput > throughputLimit * 0.8);
132 return System.currentTimeMillis() - startTime;
133 } finally {
134 TEST_UTIL.shutdownMiniCluster();
135 }
136 }
137
138 private long testCompactionWithoutThroughputLimit() throws Exception {
139 Configuration conf = TEST_UTIL.getConfiguration();
140 conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY, DefaultStoreEngine.class.getName());
141 conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MIN_KEY, 100);
142 conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MAX_KEY, 200);
143 conf.setInt(HStore.BLOCKING_STOREFILES_KEY, 10000);
144 conf.set(CompactionThroughputControllerFactory.HBASE_THROUGHPUT_CONTROLLER_KEY,
145 NoLimitCompactionThroughputController.class.getName());
146 TEST_UTIL.startMiniCluster(1);
147 try {
148 Store store = prepareData();
149 assertEquals(10, store.getStorefilesCount());
150 long startTime = System.currentTimeMillis();
151 TEST_UTIL.getHBaseAdmin().majorCompact(tableName);
152 while (store.getStorefilesCount() != 1) {
153 Thread.sleep(20);
154 }
155 return System.currentTimeMillis() - startTime;
156 } finally {
157 TEST_UTIL.shutdownMiniCluster();
158 }
159 }
160
161 @Test
162 public void testCompaction() throws Exception {
163 long limitTime = testCompactionWithThroughputLimit();
164 long noLimitTime = testCompactionWithoutThroughputLimit();
165 LOG.info("With 1M/s limit, compaction use " + limitTime + "ms; without limit, compaction use "
166 + noLimitTime + "ms");
167
168
169 assertTrue(limitTime > noLimitTime * 2);
170 }
171
172
173
174
175 @Test
176 public void testThroughputTuning() throws Exception {
177 Configuration conf = TEST_UTIL.getConfiguration();
178 conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY, DefaultStoreEngine.class.getName());
179 conf.setLong(
180 PressureAwareCompactionThroughputController.HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_HIGHER_BOUND,
181 20L * 1024 * 1024);
182 conf.setLong(
183 PressureAwareCompactionThroughputController.HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_LOWER_BOUND,
184 10L * 1024 * 1024);
185 conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MIN_KEY, 4);
186 conf.setInt(HStore.BLOCKING_STOREFILES_KEY, 6);
187 conf.set(CompactionThroughputControllerFactory.HBASE_THROUGHPUT_CONTROLLER_KEY,
188 PressureAwareCompactionThroughputController.class.getName());
189 conf.setInt(
190 PressureAwareCompactionThroughputController.HBASE_HSTORE_COMPACTION_THROUGHPUT_TUNE_PERIOD,
191 1000);
192 TEST_UTIL.startMiniCluster(1);
193 Connection conn = ConnectionFactory.createConnection(conf);
194 try {
195 HTableDescriptor htd = new HTableDescriptor(tableName);
196 htd.addFamily(new HColumnDescriptor(family));
197 htd.setCompactionEnabled(false);
198 TEST_UTIL.getHBaseAdmin().createTable(htd);
199 TEST_UTIL.waitTableAvailable(tableName);
200 HRegionServer regionServer = TEST_UTIL.getRSForFirstRegionInTable(tableName);
201 PressureAwareCompactionThroughputController throughputController =
202 (PressureAwareCompactionThroughputController) regionServer.compactSplitThread
203 .getCompactionThroughputController();
204 assertEquals(10L * 1024 * 1024, throughputController.maxThroughput, EPSILON);
205 Table table = conn.getTable(tableName);
206 for (int i = 0; i < 5; i++) {
207 table.put(new Put(Bytes.toBytes(i)).add(family, qualifier, new byte[0]));
208 TEST_UTIL.flush(tableName);
209 }
210 Thread.sleep(2000);
211 assertEquals(15L * 1024 * 1024, throughputController.maxThroughput, EPSILON);
212
213 table.put(new Put(Bytes.toBytes(5)).add(family, qualifier, new byte[0]));
214 TEST_UTIL.flush(tableName);
215 Thread.sleep(2000);
216 assertEquals(20L * 1024 * 1024, throughputController.maxThroughput, EPSILON);
217
218 table.put(new Put(Bytes.toBytes(6)).add(family, qualifier, new byte[0]));
219 TEST_UTIL.flush(tableName);
220 Thread.sleep(2000);
221 assertEquals(Double.MAX_VALUE, throughputController.maxThroughput, EPSILON);
222
223 conf.set(CompactionThroughputControllerFactory.HBASE_THROUGHPUT_CONTROLLER_KEY,
224 NoLimitCompactionThroughputController.class.getName());
225 regionServer.compactSplitThread.onConfigurationChange(conf);
226 assertTrue(throughputController.isStopped());
227 assertTrue(regionServer.compactSplitThread.getCompactionThroughputController() instanceof NoLimitCompactionThroughputController);
228 } finally {
229 conn.close();
230 TEST_UTIL.shutdownMiniCluster();
231 }
232 }
233
234
235
236
237 @Test
238 public void testGetCompactionPressureForStripedStore() throws Exception {
239 Configuration conf = TEST_UTIL.getConfiguration();
240 conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY, StripeStoreEngine.class.getName());
241 conf.setBoolean(StripeStoreConfig.FLUSH_TO_L0_KEY, false);
242 conf.setInt(StripeStoreConfig.INITIAL_STRIPE_COUNT_KEY, 2);
243 conf.setInt(StripeStoreConfig.MIN_FILES_KEY, 4);
244 conf.setInt(HStore.BLOCKING_STOREFILES_KEY, 12);
245 TEST_UTIL.startMiniCluster(1);
246 Connection conn = ConnectionFactory.createConnection(conf);
247 try {
248 HTableDescriptor htd = new HTableDescriptor(tableName);
249 htd.addFamily(new HColumnDescriptor(family));
250 htd.setCompactionEnabled(false);
251 TEST_UTIL.getHBaseAdmin().createTable(htd);
252 TEST_UTIL.waitTableAvailable(tableName);
253 HStore store = (HStore) getStoreWithName(tableName);
254 assertEquals(0, store.getStorefilesCount());
255 assertEquals(0.0, store.getCompactionPressure(), EPSILON);
256 Table table = conn.getTable(tableName);
257 for (int i = 0; i < 4; i++) {
258 table.put(new Put(Bytes.toBytes(i)).add(family, qualifier, new byte[0]));
259 table.put(new Put(Bytes.toBytes(100 + i)).add(family, qualifier, new byte[0]));
260 TEST_UTIL.flush(tableName);
261 }
262 assertEquals(8, store.getStorefilesCount());
263 assertEquals(0.0, store.getCompactionPressure(), EPSILON);
264
265 table.put(new Put(Bytes.toBytes(4)).add(family, qualifier, new byte[0]));
266 table.put(new Put(Bytes.toBytes(104)).add(family, qualifier, new byte[0]));
267 TEST_UTIL.flush(tableName);
268 assertEquals(10, store.getStorefilesCount());
269 assertEquals(0.5, store.getCompactionPressure(), EPSILON);
270
271 table.put(new Put(Bytes.toBytes(5)).add(family, qualifier, new byte[0]));
272 table.put(new Put(Bytes.toBytes(105)).add(family, qualifier, new byte[0]));
273 TEST_UTIL.flush(tableName);
274 assertEquals(12, store.getStorefilesCount());
275 assertEquals(1.0, store.getCompactionPressure(), EPSILON);
276
277 table.put(new Put(Bytes.toBytes(6)).add(family, qualifier, new byte[0]));
278 table.put(new Put(Bytes.toBytes(106)).add(family, qualifier, new byte[0]));
279 TEST_UTIL.flush(tableName);
280 assertEquals(14, store.getStorefilesCount());
281 assertEquals(2.0, store.getCompactionPressure(), EPSILON);
282 } finally {
283 conn.close();
284 TEST_UTIL.shutdownMiniCluster();
285 }
286 }
287 }