1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase.client;
21
22 import static org.junit.Assert.assertEquals;
23 import static org.junit.Assert.assertTrue;
24
25 import org.apache.commons.logging.Log;
26 import org.apache.commons.logging.LogFactory;
27 import org.apache.hadoop.hbase.HBaseTestingUtility;
28 import org.apache.hadoop.hbase.HRegionLocation;
29 import org.apache.hadoop.hbase.MiniHBaseCluster;
30 import org.apache.hadoop.hbase.ServerName;
31 import org.apache.hadoop.hbase.TableName;
32 import org.apache.hadoop.hbase.regionserver.HRegionServer;
33 import org.apache.hadoop.hbase.testclassification.ClientTests;
34 import org.apache.hadoop.hbase.testclassification.LargeTests;
35 import org.apache.hadoop.hbase.util.Bytes;
36 import org.apache.hadoop.hbase.util.Pair;
37 import org.junit.AfterClass;
38 import org.junit.BeforeClass;
39 import org.junit.Test;
40 import org.junit.experimental.categories.Category;
41
42 import static org.junit.Assert.assertEquals;
43 import static org.junit.Assert.assertNotNull;
44 import static org.junit.Assert.assertTrue;
45
46 @Category({ LargeTests.class, ClientTests.class })
47 public class TestHTableMultiplexerFlushCache {
48 private static final Log LOG = LogFactory.getLog(TestHTableMultiplexerFlushCache.class);
49 private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
50 private static byte[] FAMILY = Bytes.toBytes("testFamily");
51 private static byte[] QUALIFIER1 = Bytes.toBytes("testQualifier_1");
52 private static byte[] QUALIFIER2 = Bytes.toBytes("testQualifier_2");
53 private static byte[] VALUE1 = Bytes.toBytes("testValue1");
54 private static byte[] VALUE2 = Bytes.toBytes("testValue2");
55 private static int SLAVES = 3;
56 private static int PER_REGIONSERVER_QUEUE_SIZE = 100000;
57
58
59
60
61 @BeforeClass
62 public static void setUpBeforeClass() throws Exception {
63 TEST_UTIL.startMiniCluster(SLAVES);
64 }
65
66
67
68
69 @AfterClass
70 public static void tearDownAfterClass() throws Exception {
71 TEST_UTIL.shutdownMiniCluster();
72 }
73
74 private static void checkExistence(HTable htable, byte[] row, byte[] family, byte[] quality,
75 byte[] value) throws Exception {
76
77 Result r;
78 Get get = new Get(row);
79 get.addColumn(family, quality);
80 int nbTry = 0;
81 do {
82 assertTrue("Fail to get from " + htable.getName() + " after " + nbTry + " tries", nbTry < 50);
83 nbTry++;
84 Thread.sleep(100);
85 r = htable.get(get);
86 } while (r == null || r.getValue(family, quality) == null);
87 assertEquals("value", Bytes.toStringBinary(value),
88 Bytes.toStringBinary(r.getValue(family, quality)));
89 }
90
91 @Test
92 public void testOnRegionChange() throws Exception {
93 TableName TABLE = TableName.valueOf("testOnRegionChange");
94 final int NUM_REGIONS = 10;
95 HTable htable = TEST_UTIL.createTable(TABLE, new byte[][] { FAMILY }, 3,
96 Bytes.toBytes("aaaaa"), Bytes.toBytes("zzzzz"), NUM_REGIONS);
97
98 HTableMultiplexer multiplexer = new HTableMultiplexer(TEST_UTIL.getConfiguration(),
99 PER_REGIONSERVER_QUEUE_SIZE);
100
101 byte[][] startRows = htable.getStartKeys();
102 byte[] row = startRows[1];
103 assertTrue("2nd region should not start with empty row", row != null && row.length > 0);
104
105 Put put = new Put(row).add(FAMILY, QUALIFIER1, VALUE1);
106 assertTrue("multiplexer.put returns", multiplexer.put(TABLE, put));
107
108 checkExistence(htable, row, FAMILY, QUALIFIER1, VALUE1);
109
110
111 HRegionLocation loc = htable.getRegionLocation(row);
112 MiniHBaseCluster hbaseCluster = TEST_UTIL.getHBaseCluster();
113 hbaseCluster.stopRegionServer(loc.getServerName());
114 TEST_UTIL.waitUntilAllRegionsAssigned(TABLE);
115
116
117 put = new Put(row).add(FAMILY, QUALIFIER2, VALUE2);
118 assertTrue("multiplexer.put returns", multiplexer.put(TABLE, put));
119
120 checkExistence(htable, row, FAMILY, QUALIFIER2, VALUE2);
121 }
122
123 @Test
124 public void testOnRegionMove() throws Exception {
125
126
127
128
129 TableName TABLE = TableName.valueOf("testOnRegionMove");
130 final int NUM_REGIONS = 10;
131 HTable htable = TEST_UTIL.createTable(TABLE, new byte[][] { FAMILY }, 3,
132 Bytes.toBytes("aaaaa"), Bytes.toBytes("zzzzz"), NUM_REGIONS);
133
134 HTableMultiplexer multiplexer = new HTableMultiplexer(TEST_UTIL.getConfiguration(),
135 PER_REGIONSERVER_QUEUE_SIZE);
136
137 final RegionLocator regionLocator = TEST_UTIL.getConnection().getRegionLocator(TABLE);
138 Pair<byte[][],byte[][]> startEndRows = regionLocator.getStartEndKeys();
139 byte[] row = startEndRows.getFirst()[1];
140 assertTrue("2nd region should not start with empty row", row != null && row.length > 0);
141
142 Put put = new Put(row).addColumn(FAMILY, QUALIFIER1, VALUE1);
143 assertTrue("multiplexer.put returns", multiplexer.put(TABLE, put));
144
145 checkExistence(htable, row, FAMILY, QUALIFIER1, VALUE1);
146
147 final HRegionLocation loc = regionLocator.getRegionLocation(row);
148 final MiniHBaseCluster hbaseCluster = TEST_UTIL.getHBaseCluster();
149
150 final ServerName originalServer = loc.getServerName();
151 ServerName newServer = null;
152
153 for (int i = 0; i < SLAVES; i++) {
154 HRegionServer rs = hbaseCluster.getRegionServer(0);
155 if (!rs.getServerName().equals(originalServer.getServerName())) {
156 newServer = rs.getServerName();
157 break;
158 }
159 }
160 assertNotNull("Did not find a new RegionServer to use", newServer);
161
162
163 LOG.info("Moving " + loc.getRegionInfo().getEncodedName() + " from " + originalServer
164 + " to " + newServer);
165 TEST_UTIL.getHBaseAdmin().move(loc.getRegionInfo().getEncodedNameAsBytes(),
166 Bytes.toBytes(newServer.getServerName()));
167
168 TEST_UTIL.waitUntilAllRegionsAssigned(TABLE);
169
170
171 put = new Put(row).addColumn(FAMILY, QUALIFIER2, VALUE2);
172 assertTrue("multiplexer.put returns", multiplexer.put(TABLE, put));
173
174
175 checkExistence(htable, row, FAMILY, QUALIFIER2, VALUE2);
176 }
177 }