1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.replication;
19
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;

import java.io.Closeable;
import java.io.IOException;
import java.util.List;
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.RSRpcServices;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;

import com.google.protobuf.ServiceException;
66
67 @Category(LargeTests.class)
68 public class TestMasterReplication {
69
70 private static final Log LOG = LogFactory.getLog(TestReplicationBase.class);
71
72 private Configuration baseConfiguration;
73
74 private HBaseTestingUtility[] utilities;
75 private Configuration[] configurations;
76 private MiniZooKeeperCluster miniZK;
77
78 private static final long SLEEP_TIME = 500;
79 private static final int NB_RETRIES = 10;
80
81 private static final TableName tableName = TableName.valueOf("test");
82 private static final byte[] famName = Bytes.toBytes("f");
83 private static final byte[] row = Bytes.toBytes("row");
84 private static final byte[] row1 = Bytes.toBytes("row1");
85 private static final byte[] row2 = Bytes.toBytes("row2");
86 private static final byte[] row3 = Bytes.toBytes("row3");
87 private static final byte[] row4 = Bytes.toBytes("row4");
88 private static final byte[] noRepfamName = Bytes.toBytes("norep");
89
90 private static final byte[] count = Bytes.toBytes("count");
91 private static final byte[] put = Bytes.toBytes("put");
92 private static final byte[] delete = Bytes.toBytes("delete");
93
94 private HTableDescriptor table;
95
96 @Before
97 public void setUp() throws Exception {
98 baseConfiguration = HBaseConfiguration.create();
99
100
101 baseConfiguration.setInt("hbase.regionserver.hlog.blocksize", 1024 * 20);
102 baseConfiguration.setInt("replication.source.size.capacity", 1024);
103 baseConfiguration.setLong("replication.source.sleepforretries", 100);
104 baseConfiguration.setInt("hbase.regionserver.maxlogs", 10);
105 baseConfiguration.setLong("hbase.master.logcleaner.ttl", 10);
106 baseConfiguration.setBoolean(HConstants.REPLICATION_ENABLE_KEY,
107 HConstants.REPLICATION_ENABLE_DEFAULT);
108 baseConfiguration.setBoolean("dfs.support.append", true);
109 baseConfiguration.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100);
110 baseConfiguration.setStrings(
111 CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY,
112 CoprocessorCounter.class.getName());
113
114 table = new HTableDescriptor(tableName);
115 HColumnDescriptor fam = new HColumnDescriptor(famName);
116 fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
117 table.addFamily(fam);
118 fam = new HColumnDescriptor(noRepfamName);
119 table.addFamily(fam);
120 }
121
122
123
124
125
126
127
128 @Test(timeout = 300000)
129 public void testCyclicReplication1() throws Exception {
130 LOG.info("testSimplePutDelete");
131 int numClusters = 2;
132 Table[] htables = null;
133 try {
134 startMiniClusters(numClusters);
135 createTableOnClusters(table);
136
137 htables = getHTablesOnClusters(tableName);
138
139
140 addPeer("1", 0, 1);
141 addPeer("1", 1, 0);
142
143 int[] expectedCounts = new int[] { 2, 2 };
144
145
146
147 putAndWait(row, famName, htables[0], htables[1]);
148 putAndWait(row1, famName, htables[1], htables[0]);
149 validateCounts(htables, put, expectedCounts);
150
151 deleteAndWait(row, htables[0], htables[1]);
152 deleteAndWait(row1, htables[1], htables[0]);
153 validateCounts(htables, delete, expectedCounts);
154 } finally {
155 close(htables);
156 shutDownMiniClusters();
157 }
158 }
159
160
161
162
163
164
165
166
167
168 @Test(timeout = 300000)
169 public void testCyclicReplication2() throws Exception {
170 LOG.info("testCyclicReplication1");
171 int numClusters = 3;
172 Table[] htables = null;
173 try {
174 startMiniClusters(numClusters);
175 createTableOnClusters(table);
176
177
178 addPeer("1", 0, 1);
179 addPeer("1", 1, 2);
180 addPeer("1", 2, 0);
181
182 htables = getHTablesOnClusters(tableName);
183
184
185 putAndWait(row, famName, htables[0], htables[2]);
186 putAndWait(row1, famName, htables[1], htables[0]);
187 putAndWait(row2, famName, htables[2], htables[1]);
188
189 deleteAndWait(row, htables[0], htables[2]);
190 deleteAndWait(row1, htables[1], htables[0]);
191 deleteAndWait(row2, htables[2], htables[1]);
192
193 int[] expectedCounts = new int[] { 3, 3, 3 };
194 validateCounts(htables, put, expectedCounts);
195 validateCounts(htables, delete, expectedCounts);
196
197
198 disablePeer("1", 2);
199
200
201 putAndWait(row3, famName, htables[0], htables[1]);
202
203 htables[1].put(new Put(row4).add(famName, row4, row4));
204
205 enablePeer("1", 2);
206
207
208
209 wait(row4, htables[0], true);
210 } finally {
211 close(htables);
212 shutDownMiniClusters();
213 }
214 }
215
216
217
218
219 @Test(timeout = 300000)
220 public void testCyclicReplication3() throws Exception {
221 LOG.info("testCyclicReplication2");
222 int numClusters = 3;
223 Table[] htables = null;
224 try {
225 startMiniClusters(numClusters);
226 createTableOnClusters(table);
227
228
229 addPeer("1", 0, 1);
230 addPeer("1", 1, 2);
231 addPeer("1", 2, 1);
232
233 htables = getHTablesOnClusters(tableName);
234
235
236 putAndWait(row, famName, htables[0], htables[2]);
237 putAndWait(row1, famName, htables[1], htables[2]);
238 putAndWait(row2, famName, htables[2], htables[1]);
239
240 deleteAndWait(row, htables[0], htables[2]);
241 deleteAndWait(row1, htables[1], htables[2]);
242 deleteAndWait(row2, htables[2], htables[1]);
243
244 int[] expectedCounts = new int[] { 1, 3, 3 };
245 validateCounts(htables, put, expectedCounts);
246 validateCounts(htables, delete, expectedCounts);
247 } finally {
248 close(htables);
249 shutDownMiniClusters();
250 }
251 }
252
253
254
255
256
257 @Test(timeout = 180000, expected = ServiceException.class)
258 public void testReplicateWALEntryWhenReplicationIsDisabled() throws Exception {
259 LOG.info("testSimplePutDelete");
260 baseConfiguration.setBoolean(HConstants.REPLICATION_ENABLE_KEY, false);
261 Table[] htables = null;
262 try {
263 startMiniClusters(1);
264 createTableOnClusters(table);
265 htables = getHTablesOnClusters(tableName);
266
267 HRegionServer rs = utilities[0].getRSForFirstRegionInTable(tableName);
268 RSRpcServices rsrpc = new RSRpcServices(rs);
269 rsrpc.replicateWALEntry(null, null);
270 } finally {
271 close(htables);
272 shutDownMiniClusters();
273 }
274 }
275
276 @After
277 public void tearDown() throws IOException {
278 configurations = null;
279 utilities = null;
280 }
281
282 @SuppressWarnings("resource")
283 private void startMiniClusters(int numClusters) throws Exception {
284 Random random = new Random();
285 utilities = new HBaseTestingUtility[numClusters];
286 configurations = new Configuration[numClusters];
287 for (int i = 0; i < numClusters; i++) {
288 Configuration conf = new Configuration(baseConfiguration);
289 conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/" + i + random.nextInt());
290 HBaseTestingUtility utility = new HBaseTestingUtility(conf);
291 if (i == 0) {
292 utility.startMiniZKCluster();
293 miniZK = utility.getZkCluster();
294 } else {
295 utility.setZkCluster(miniZK);
296 }
297 utility.startMiniCluster();
298 utilities[i] = utility;
299 configurations[i] = conf;
300 new ZooKeeperWatcher(conf, "cluster" + i, null, true);
301 }
302 }
303
304 private void shutDownMiniClusters() throws Exception {
305 int numClusters = utilities.length;
306 for (int i = numClusters - 1; i >= 0; i--) {
307 if (utilities[i] != null) {
308 utilities[i].shutdownMiniCluster();
309 }
310 }
311 miniZK.shutdown();
312 }
313
314 private void createTableOnClusters(HTableDescriptor table) throws Exception {
315 int numClusters = configurations.length;
316 for (int i = 0; i < numClusters; i++) {
317 Admin hbaseAdmin = null;
318 try {
319 hbaseAdmin = new HBaseAdmin(configurations[i]);
320 hbaseAdmin.createTable(table);
321 } finally {
322 close(hbaseAdmin);
323 }
324 }
325 }
326
327 private void addPeer(String id, int masterClusterNumber,
328 int slaveClusterNumber) throws Exception {
329 ReplicationAdmin replicationAdmin = null;
330 try {
331 replicationAdmin = new ReplicationAdmin(
332 configurations[masterClusterNumber]);
333 replicationAdmin.addPeer(id,
334 utilities[slaveClusterNumber].getClusterKey());
335 } finally {
336 close(replicationAdmin);
337 }
338 }
339
340 private void disablePeer(String id, int masterClusterNumber) throws Exception {
341 ReplicationAdmin replicationAdmin = null;
342 try {
343 replicationAdmin = new ReplicationAdmin(
344 configurations[masterClusterNumber]);
345 replicationAdmin.disablePeer(id);
346 } finally {
347 close(replicationAdmin);
348 }
349 }
350
351 private void enablePeer(String id, int masterClusterNumber) throws Exception {
352 ReplicationAdmin replicationAdmin = null;
353 try {
354 replicationAdmin = new ReplicationAdmin(
355 configurations[masterClusterNumber]);
356 replicationAdmin.enablePeer(id);
357 } finally {
358 close(replicationAdmin);
359 }
360 }
361
362 private void close(Closeable... closeables) {
363 try {
364 if (closeables != null) {
365 for (Closeable closeable : closeables) {
366 closeable.close();
367 }
368 }
369 } catch (Exception e) {
370 LOG.warn("Exception occured while closing the object:", e);
371 }
372 }
373
374 @SuppressWarnings("resource")
375 private Table[] getHTablesOnClusters(TableName tableName) throws Exception {
376 int numClusters = utilities.length;
377 Table[] htables = new Table[numClusters];
378 for (int i = 0; i < numClusters; i++) {
379 Table htable = new HTable(configurations[i], tableName);
380 htable.setWriteBufferSize(1024);
381 htables[i] = htable;
382 }
383 return htables;
384 }
385
386 private void validateCounts(Table[] htables, byte[] type,
387 int[] expectedCounts) throws IOException {
388 for (int i = 0; i < htables.length; i++) {
389 assertEquals(Bytes.toString(type) + " were replicated back ",
390 expectedCounts[i], getCount(htables[i], type));
391 }
392 }
393
394 private int getCount(Table t, byte[] type) throws IOException {
395 Get test = new Get(row);
396 test.setAttribute("count", new byte[] {});
397 Result res = t.get(test);
398 return Bytes.toInt(res.getValue(count, type));
399 }
400
401 private void deleteAndWait(byte[] row, Table source, Table target)
402 throws Exception {
403 Delete del = new Delete(row);
404 source.delete(del);
405 wait(row, target, true);
406 }
407
408 private void putAndWait(byte[] row, byte[] fam, Table source, Table target)
409 throws Exception {
410 Put put = new Put(row);
411 put.add(fam, row, row);
412 source.put(put);
413 wait(row, target, false);
414 }
415
416 private void wait(byte[] row, Table target, boolean isDeleted)
417 throws Exception {
418 Get get = new Get(row);
419 for (int i = 0; i < NB_RETRIES; i++) {
420 if (i == NB_RETRIES - 1) {
421 fail("Waited too much time for replication. Row:" + Bytes.toString(row)
422 + ". IsDeleteReplication:" + isDeleted);
423 }
424 Result res = target.get(get);
425 boolean sleep = isDeleted ? res.size() > 0 : res.size() == 0;
426 if (sleep) {
427 LOG.info("Waiting for more time for replication. Row:"
428 + Bytes.toString(row) + ". IsDeleteReplication:" + isDeleted);
429 Thread.sleep(SLEEP_TIME);
430 } else {
431 if (!isDeleted) {
432 assertArrayEquals(res.value(), row);
433 }
434 LOG.info("Obtained row:"
435 + Bytes.toString(row) + ". IsDeleteReplication:" + isDeleted);
436 break;
437 }
438 }
439 }
440
441
442
443
444
445 public static class CoprocessorCounter extends BaseRegionObserver {
446 private int nCount = 0;
447 private int nDelete = 0;
448
449 @Override
450 public void prePut(final ObserverContext<RegionCoprocessorEnvironment> e, final Put put,
451 final WALEdit edit, final Durability durability) throws IOException {
452 nCount++;
453 }
454
455 @Override
456 public void postDelete(final ObserverContext<RegionCoprocessorEnvironment> c,
457 final Delete delete, final WALEdit edit, final Durability durability) throws IOException {
458 nDelete++;
459 }
460
461 @Override
462 public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> c,
463 final Get get, final List<Cell> result) throws IOException {
464 if (get.getAttribute("count") != null) {
465 result.clear();
466
467 result.add(new KeyValue(count, count, delete, Bytes.toBytes(nDelete)));
468 result.add(new KeyValue(count, count, put, Bytes.toBytes(nCount)));
469 c.bypass();
470 }
471 }
472 }
473
474 }