1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.regionserver;
20
21 import static org.junit.Assert.*;
22
23 import java.io.IOException;
24 import java.util.Arrays;
25 import java.util.Collection;
26 import java.util.concurrent.atomic.AtomicBoolean;
27 import java.util.concurrent.atomic.AtomicInteger;
28 import java.util.concurrent.atomic.AtomicReference;
29
30 import org.apache.commons.logging.Log;
31 import org.apache.commons.logging.LogFactory;
32 import org.apache.commons.logging.impl.Log4JLogger;
33 import org.apache.hadoop.conf.Configuration;
34 import org.apache.hadoop.hbase.HBaseTestingUtility;
35 import org.apache.hadoop.hbase.HConstants;
36 import org.apache.hadoop.hbase.HTableDescriptor;
37 import org.apache.hadoop.hbase.TableName;
38 import org.apache.hadoop.hbase.Waiter.Predicate;
39 import org.apache.hadoop.hbase.client.Admin;
40 import org.apache.hadoop.hbase.client.Connection;
41 import org.apache.hadoop.hbase.client.ConnectionFactory;
42 import org.apache.hadoop.hbase.client.Consistency;
43 import org.apache.hadoop.hbase.client.Get;
44 import org.apache.hadoop.hbase.client.RpcRetryingCaller;
45 import org.apache.hadoop.hbase.client.Table;
46 import org.apache.hadoop.hbase.replication.regionserver.TestRegionReplicaReplicationEndpoint;
47 import org.apache.hadoop.hbase.testclassification.LargeTests;
48 import org.apache.hadoop.hbase.util.Bytes;
49 import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
50 import org.apache.hadoop.hbase.util.Threads;
51 import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
52 import org.apache.log4j.Level;
53 import org.junit.After;
54 import org.junit.Before;
55 import org.junit.Rule;
56 import org.junit.Test;
57 import org.junit.experimental.categories.Category;
58 import org.junit.rules.TestName;
59 import org.junit.runner.RunWith;
60 import org.junit.runners.Parameterized;
61 import org.junit.runners.Parameterized.Parameters;
62
63
64
65
66 @RunWith(Parameterized.class)
67 @Category(LargeTests.class)
68 public class TestRegionReplicaFailover {
69
/** Logger for this test; bound to this class so its output is attributed correctly. */
private static final Log LOG = LogFactory.getLog(TestRegionReplicaFailover.class);

static {
  // Raise the client retrying-caller logger to ALL so every message it emits shows up in the
  // test output.
  ((Log4JLogger)RpcRetryingCaller.LOG).getLogger().setLevel(Level.ALL);
}

/** Testing utility shared by every test in this class. */
private static final HBaseTestingUtility HTU = new HBaseTestingUtility();

/** Value handed to {@code HTU.startMiniCluster} in {@link #before()}. */
private static final int NB_SERVERS = 3;

protected final byte[][] families = new byte[][] {HBaseTestingUtility.fam1,
    HBaseTestingUtility.fam2, HBaseTestingUtility.fam3};
/** Column family the tests load rows into and verify rows from. */
protected final byte[] fam = HBaseTestingUtility.fam1;
protected final byte[] qual1 = Bytes.toBytes("qual1");
protected final byte[] value1 = Bytes.toBytes("value1");
/** Row key used for the single-row replica read. */
protected final byte[] row = Bytes.toBytes("rowA");
protected final byte[] row2 = Bytes.toBytes("rowB");

/** Exposes the running test method's name; {@link #before()} derives the table name from it. */
@Rule public TestName name = new TestName();

/** Descriptor of the table under test; assigned in {@link #before()}. */
private HTableDescriptor htd;
91
92
93
94
95 @Parameters
96 public static Collection<Object[]> getParameters() {
97 Object[][] params =
98 new Boolean[][] {
99 return Arrays.asList(params);
100 }
101
102 @Parameterized.Parameter(0)
103 public boolean distributedLogReplay;
104
105 @Before
106 public void before() throws Exception {
107 Configuration conf = HTU.getConfiguration();
108
109 conf.setInt(HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT, 10);
110 conf.setBoolean(HConstants.REPLICATION_ENABLE_KEY, true);
111 conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_CONF_KEY, true);
112 conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_WAIT_FOR_PRIMARY_FLUSH_CONF_KEY, true);
113 conf.setInt("replication.stats.thread.period.seconds", 5);
114 conf.setBoolean("hbase.tests.use.shortcircuit.reads", false);
115 conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, distributedLogReplay);
116
117 HTU.startMiniCluster(NB_SERVERS);
118 htd = HTU.createTableDescriptor(
119 name.getMethodName().substring(0, name.getMethodName().length()-3));
120 htd.setRegionReplication(3);
121 HTU.getHBaseAdmin().createTable(htd);
122 }
123
124 @After
125 public void after() throws Exception {
126 HTU.deleteTableIfAny(htd.getTableName());
127 HTU.shutdownMiniCluster();
128 }
129
130
131
132
133
134 @Test(timeout = 60000)
135 public void testSecondaryRegionWithEmptyRegion() throws IOException {
136
137
138 try (Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
139 Table table = connection.getTable(htd.getTableName())) {
140
141 Get get = new Get(row);
142 get.setConsistency(Consistency.TIMELINE);
143 get.setReplicaId(1);
144 table.get(get);
145 }
146 }
147
148
149
150
151
152
153 @Test(timeout = 60000)
154 public void testSecondaryRegionWithNonEmptyRegion() throws IOException {
155
156
157 try (Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
158 Table table = connection.getTable(htd.getTableName())) {
159
160 HTU.loadNumericRows(table, fam, 0, 1000);
161
162 HTU.getHBaseAdmin().disableTable(htd.getTableName());
163 HTU.getHBaseAdmin().enableTable(htd.getTableName());
164
165 HTU.verifyNumericRows(table, fam, 0, 1000, 1);
166 }
167 }
168
169
170
171
172 @Test (timeout = 120000)
173 public void testPrimaryRegionKill() throws Exception {
174 try (Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
175 Table table = connection.getTable(htd.getTableName())) {
176
177 HTU.loadNumericRows(table, fam, 0, 1000);
178
179
180 verifyNumericRowsWithTimeout(table, fam, 0, 1000, 1, 30000);
181 verifyNumericRowsWithTimeout(table, fam, 0, 1000, 2, 30000);
182
183
184
185
186 boolean aborted = false;
187 for (RegionServerThread rs : HTU.getMiniHBaseCluster().getRegionServerThreads()) {
188 for (Region r : rs.getRegionServer().getOnlineRegions(htd.getTableName())) {
189 if (r.getRegionInfo().getReplicaId() == 0) {
190 LOG.info("Aborting region server hosting primary region replica");
191 rs.getRegionServer().abort("for test");
192 aborted = true;
193 }
194 }
195 }
196 assertTrue(aborted);
197
198
199 verifyNumericRowsWithTimeout(table, fam, 0, 1000, 0, 30000);
200 verifyNumericRowsWithTimeout(table, fam, 0, 1000, 1, 30000);
201 verifyNumericRowsWithTimeout(table, fam, 0, 1000, 2, 30000);
202 }
203
204
205 HTU.getMiniHBaseCluster().startRegionServer();
206 }
207
208
209
210 private void verifyNumericRowsWithTimeout(final Table table, final byte[] f, final int startRow,
211 final int endRow, final int replicaId, final long timeout) throws Exception {
212 try {
213 HTU.waitFor(timeout, new Predicate<Exception>() {
214 @Override
215 public boolean evaluate() throws Exception {
216 try {
217 HTU.verifyNumericRows(table, f, startRow, endRow, replicaId);
218 return true;
219 } catch (AssertionError ae) {
220 return false;
221 }
222 }
223 });
224 } catch (Throwable t) {
225
226 HTU.verifyNumericRows(table, f, startRow, endRow, replicaId);
227 }
228 }
229
230
231
232
233
234 @Test (timeout = 120000)
235 public void testSecondaryRegionKill() throws Exception {
236 try (Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
237 Table table = connection.getTable(htd.getTableName())) {
238 HTU.loadNumericRows(table, fam, 0, 1000);
239
240
241 verifyNumericRowsWithTimeout(table, fam, 0, 1000, 1, 30000);
242 verifyNumericRowsWithTimeout(table, fam, 0, 1000, 2, 30000);
243
244
245
246
247 boolean aborted = false;
248 for (RegionServerThread rs : HTU.getMiniHBaseCluster().getRegionServerThreads()) {
249 for (Region r : rs.getRegionServer().getOnlineRegions(htd.getTableName())) {
250 if (r.getRegionInfo().getReplicaId() == 1) {
251 LOG.info("Aborting region server hosting secondary region replica");
252 rs.getRegionServer().abort("for test");
253 aborted = true;
254 }
255 }
256 }
257 assertTrue(aborted);
258
259 Threads.sleep(5000);
260
261 HTU.verifyNumericRows(table, fam, 0, 1000, 1);
262 HTU.verifyNumericRows(table, fam, 0, 1000, 2);
263 }
264
265
266 HTU.getMiniHBaseCluster().startRegionServer();
267 }
268
269
270
271
272
273
274 @Test (timeout = 120000)
275 public void testSecondaryRegionKillWhilePrimaryIsAcceptingWrites() throws Exception {
276 try (Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
277 Table table = connection.getTable(htd.getTableName());
278 Admin admin = connection.getAdmin()) {
279
280 HTU.loadNumericRows(table, fam, 0, 1000);
281 admin.flush(table.getName());
282 HTU.loadNumericRows(table, fam, 1000, 2000);
283
284 final AtomicReference<Throwable> ex = new AtomicReference<Throwable>(null);
285 final AtomicBoolean done = new AtomicBoolean(false);
286 final AtomicInteger key = new AtomicInteger(2000);
287
288 Thread loader = new Thread() {
289 @Override
290 public void run() {
291 while (!done.get()) {
292 try {
293 HTU.loadNumericRows(table, fam, key.get(), key.get()+1000);
294 key.addAndGet(1000);
295 } catch (Throwable e) {
296 ex.compareAndSet(null, e);
297 }
298 }
299 }
300 };
301 loader.start();
302
303 Thread aborter = new Thread() {
304 @Override
305 public void run() {
306 try {
307 boolean aborted = false;
308 for (RegionServerThread rs : HTU.getMiniHBaseCluster().getRegionServerThreads()) {
309 for (Region r : rs.getRegionServer().getOnlineRegions(htd.getTableName())) {
310 if (r.getRegionInfo().getReplicaId() == 1) {
311 LOG.info("Aborting region server hosting secondary region replica");
312 rs.getRegionServer().abort("for test");
313 aborted = true;
314 }
315 }
316 }
317 assertTrue(aborted);
318 } catch (Throwable e) {
319 ex.compareAndSet(null, e);
320 }
321 };
322 };
323
324 aborter.start();
325 aborter.join();
326 done.set(true);
327 loader.join();
328
329 assertNull(ex.get());
330
331 assertTrue(key.get() > 1000);
332 LOG.info("Loaded up to key :" + key.get());
333 verifyNumericRowsWithTimeout(table, fam, 0, key.get(), 0, 30000);
334 verifyNumericRowsWithTimeout(table, fam, 0, key.get(), 1, 30000);
335 verifyNumericRowsWithTimeout(table, fam, 0, key.get(), 2, 30000);
336 }
337
338
339 HTU.getMiniHBaseCluster().startRegionServer();
340 }
341
342
343
344
345
346 @Test (timeout = 120000)
347 public void testLotsOfRegionReplicas() throws IOException {
348 int numRegions = NB_SERVERS * 20;
349 int regionReplication = 10;
350 String tableName = htd.getTableName().getNameAsString() + "2";
351 htd = HTU.createTableDescriptor(tableName);
352 htd.setRegionReplication(regionReplication);
353
354
355 byte[] startKey = Bytes.toBytes("aaa");
356 byte[] endKey = Bytes.toBytes("zzz");
357 byte[][] splits = HTU.getRegionSplitStartKeys(startKey, endKey, numRegions);
358 HTU.getHBaseAdmin().createTable(htd, startKey, endKey, numRegions);
359
360 try (Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
361 Table table = connection.getTable(htd.getTableName())) {
362
363 for (int i = 1; i < splits.length; i++) {
364 for (int j = 0; j < regionReplication; j++) {
365 Get get = new Get(splits[i]);
366 get.setConsistency(Consistency.TIMELINE);
367 get.setReplicaId(j);
368 table.get(get);
369 }
370 }
371 }
372
373 HTU.deleteTableIfAny(TableName.valueOf(tableName));
374 }
375 }