1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.regionserver;
19
20 import org.apache.commons.logging.Log;
21 import org.apache.commons.logging.LogFactory;
22 import org.apache.hadoop.conf.Configuration;
23 import org.apache.hadoop.hbase.*;
24 import org.apache.hadoop.hbase.client.*;
25 import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputControllerFactory;
26 import org.apache.hadoop.hbase.regionserver.compactions.NoLimitCompactionThroughputController;
27 import org.apache.hadoop.hbase.testclassification.MediumTests;
28 import org.apache.hadoop.hbase.util.Bytes;
29 import org.junit.Assert;
30 import org.junit.Test;
31 import org.junit.experimental.categories.Category;
32
33 import static org.junit.Assert.assertEquals;
34 import static org.junit.Assert.assertTrue;
35
36 @Category(MediumTests.class)
37 public class TestCompactSplitThread {
38 private static final Log LOG = LogFactory.getLog(TestCompactSplitThread.class);
39 private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
40 private final TableName tableName = TableName.valueOf(getClass().getSimpleName());
41 private final byte[] family = Bytes.toBytes("f");
42
43 @Test
44 public void testThreadPoolSizeTuning() throws Exception {
45 Configuration conf = TEST_UTIL.getConfiguration();
46 conf.setInt(CompactSplitThread.LARGE_COMPACTION_THREADS, 3);
47 conf.setInt(CompactSplitThread.SMALL_COMPACTION_THREADS, 4);
48 conf.setInt(CompactSplitThread.SPLIT_THREADS, 5);
49 conf.setInt(CompactSplitThread.MERGE_THREADS, 6);
50 TEST_UTIL.startMiniCluster(1);
51 Connection conn = ConnectionFactory.createConnection(conf);
52 try {
53 HTableDescriptor htd = new HTableDescriptor(tableName);
54 htd.addFamily(new HColumnDescriptor(family));
55 htd.setCompactionEnabled(false);
56 TEST_UTIL.getHBaseAdmin().createTable(htd);
57 TEST_UTIL.waitTableAvailable(tableName);
58 HRegionServer regionServer = TEST_UTIL.getRSForFirstRegionInTable(tableName);
59
60
61 assertEquals(3, regionServer.compactSplitThread.getLargeCompactionThreadNum());
62 assertEquals(4, regionServer.compactSplitThread.getSmallCompactionThreadNum());
63 assertEquals(5, regionServer.compactSplitThread.getSplitThreadNum());
64 assertEquals(6, regionServer.compactSplitThread.getMergeThreadNum());
65
66
67 conf.setInt(CompactSplitThread.LARGE_COMPACTION_THREADS, 4);
68 conf.setInt(CompactSplitThread.SMALL_COMPACTION_THREADS, 5);
69 conf.setInt(CompactSplitThread.SPLIT_THREADS, 6);
70 conf.setInt(CompactSplitThread.MERGE_THREADS, 7);
71 try {
72 regionServer.compactSplitThread.onConfigurationChange(conf);
73 } catch (IllegalArgumentException iae) {
74 Assert.fail("Update bigger configuration failed!");
75 }
76
77
78 assertEquals(4, regionServer.compactSplitThread.getLargeCompactionThreadNum());
79 assertEquals(5, regionServer.compactSplitThread.getSmallCompactionThreadNum());
80 assertEquals(6, regionServer.compactSplitThread.getSplitThreadNum());
81 assertEquals(7, regionServer.compactSplitThread.getMergeThreadNum());
82
83
84 conf.setInt(CompactSplitThread.LARGE_COMPACTION_THREADS, 2);
85 conf.setInt(CompactSplitThread.SMALL_COMPACTION_THREADS, 3);
86 conf.setInt(CompactSplitThread.SPLIT_THREADS, 4);
87 conf.setInt(CompactSplitThread.MERGE_THREADS, 5);
88 try {
89 regionServer.compactSplitThread.onConfigurationChange(conf);
90 } catch (IllegalArgumentException iae) {
91 Assert.fail("Update smaller configuration failed!");
92 }
93
94
95 assertEquals(2, regionServer.compactSplitThread.getLargeCompactionThreadNum());
96 assertEquals(3, regionServer.compactSplitThread.getSmallCompactionThreadNum());
97 assertEquals(4, regionServer.compactSplitThread.getSplitThreadNum());
98 assertEquals(5, regionServer.compactSplitThread.getMergeThreadNum());
99 } finally {
100 conn.close();
101 TEST_UTIL.shutdownMiniCluster();
102 }
103 }
104 }