1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.master;
20
21 import static org.apache.hadoop.hbase.SplitLogCounters.resetCounters;
22 import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_heartbeat;
23 import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_node_create_queued;
24 import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_orphan_task_acquired;
25 import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_rescan;
26 import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_rescan_deleted;
27 import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_resubmit;
28 import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_resubmit_dead_server_task;
29 import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_resubmit_failed;
30 import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_resubmit_force;
31 import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_resubmit_threshold_reached;
32 import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_resubmit_unassigned;
33 import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_task_deleted;
34 import static org.junit.Assert.assertEquals;
35 import static org.junit.Assert.assertFalse;
36 import static org.junit.Assert.assertTrue;
37
38 import java.io.IOException;
39 import java.util.List;
40 import java.util.Map;
41 import java.util.UUID;
42 import java.util.concurrent.atomic.AtomicLong;
43
44 import org.apache.commons.logging.Log;
45 import org.apache.commons.logging.LogFactory;
46 import org.apache.hadoop.conf.Configuration;
47 import org.apache.hadoop.fs.FileSystem;
48 import org.apache.hadoop.fs.Path;
49 import org.apache.hadoop.hbase.ChoreService;
50 import org.apache.hadoop.hbase.CoordinatedStateManager;
51 import org.apache.hadoop.hbase.CoordinatedStateManagerFactory;
52 import org.apache.hadoop.hbase.HBaseTestingUtility;
53 import org.apache.hadoop.hbase.HConstants;
54 import org.apache.hadoop.hbase.HRegionInfo;
55 import org.apache.hadoop.hbase.testclassification.MediumTests;
56 import org.apache.hadoop.hbase.Server;
57 import org.apache.hadoop.hbase.ServerName;
58 import org.apache.hadoop.hbase.SplitLogCounters;
59 import org.apache.hadoop.hbase.SplitLogTask;
60 import org.apache.hadoop.hbase.Stoppable;
61 import org.apache.hadoop.hbase.Waiter;
62 import org.apache.hadoop.hbase.client.ClusterConnection;
63 import org.apache.hadoop.hbase.coordination.ZKSplitLogManagerCoordination;
64 import org.apache.hadoop.hbase.master.SplitLogManager.Task;
65 import org.apache.hadoop.hbase.master.SplitLogManager.TaskBatch;
66 import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
67 import org.apache.hadoop.hbase.regionserver.TestMasterAddressTracker.NodeCreationListener;
68 import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
69 import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
70 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
71 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
72 import org.apache.log4j.Level;
73 import org.apache.log4j.Logger;
74 import org.apache.zookeeper.CreateMode;
75 import org.apache.zookeeper.KeeperException;
76 import org.apache.zookeeper.ZooDefs.Ids;
77 import org.junit.After;
78 import org.junit.Assert;
79 import org.junit.Before;
80 import org.junit.Ignore;
81 import org.junit.Test;
82 import org.junit.experimental.categories.Category;
83 import org.mockito.Mockito;
84
85 @Category(MediumTests.class)
86 public class TestSplitLogManager {
private static final Log LOG = LogFactory.getLog(TestSplitLogManager.class);

/** Server name passed to every SplitLogManager constructed by these tests. */
private final ServerName DUMMY_MASTER = ServerName.valueOf("dummy-master,1,1");
/** Mocked server manager; returned by the mocked master once setup() has stubbed it. */
private final ServerManager sm = Mockito.mock(ServerManager.class);
/** Mocked master services passed to every SplitLogManager constructed by these tests. */
private final MasterServices master = Mockito.mock(MasterServices.class);

static {
  // Run everything under org.apache.hadoop.hbase at DEBUG while these tests execute.
  Logger.getLogger("org.apache.hadoop.hbase").setLevel(Level.DEBUG);
}

/** ZooKeeper watcher created fresh in setup() for each test. */
private ZooKeeperWatcher zkw;
/** Stub server wrapping zkw and conf; created in setup(). */
private DummyServer ds;
/**
 * State behind {@code stopper}. Volatile because the stopper is handed to the
 * SplitLogManager, which may consult it from threads other than the test thread
 * (NOTE(review): confirm which threads poll it), while teardown() flips it from the
 * test thread.
 */
private static volatile boolean stopped = false;
/** Manager under test; stopped in teardown() when a test assigned it. */
private SplitLogManager slm;
/** Configuration obtained from TEST_UTIL in setup(). */
private Configuration conf;
/** Base wait budget in milliseconds used by the tests; computed in setup(). */
private int to;
/** Recovery mode derived from the configuration in setup(). */
private RecoveryMode mode;

/** Testing utility that owns the mini ZooKeeper cluster; re-created by every setup(). */
private static HBaseTestingUtility TEST_UTIL;
105
106 class DummyServer implements Server {
107 private ZooKeeperWatcher zkw;
108 private Configuration conf;
109 private CoordinatedStateManager cm;
110
111 public DummyServer(ZooKeeperWatcher zkw, Configuration conf) {
112 this.zkw = zkw;
113 this.conf = conf;
114 cm = CoordinatedStateManagerFactory.getCoordinatedStateManager(conf);
115 cm.initialize(this);
116 }
117
118 @Override
119 public void abort(String why, Throwable e) {
120 }
121
122 @Override
123 public boolean isAborted() {
124 return false;
125 }
126
127 @Override
128 public void stop(String why) {
129 }
130
131 @Override
132 public boolean isStopped() {
133 return false;
134 }
135
136 @Override
137 public Configuration getConfiguration() {
138 return conf;
139 }
140
141 @Override
142 public ZooKeeperWatcher getZooKeeper() {
143 return zkw;
144 }
145
146 @Override
147 public ServerName getServerName() {
148 return null;
149 }
150
151 @Override
152 public CoordinatedStateManager getCoordinatedStateManager() {
153 return cm;
154 }
155
156 @Override
157 public ClusterConnection getConnection() {
158 return null;
159 }
160
161 @Override
162 public MetaTableLocator getMetaTableLocator() {
163 return null;
164 }
165
166 @Override
167 public ChoreService getChoreService() {
168 return null;
169 }
170 }
171
172 static Stoppable stopper = new Stoppable() {
173 @Override
174 public void stop(String why) {
175 stopped = true;
176 }
177
178 @Override
179 public boolean isStopped() {
180 return stopped;
181 }
182 };
183
184 @Before
185 public void setup() throws Exception {
186 TEST_UTIL = new HBaseTestingUtility();
187 TEST_UTIL.startMiniZKCluster();
188 conf = TEST_UTIL.getConfiguration();
189
190 zkw =
191 new ZooKeeperWatcher(conf, "split-log-manager-tests" + UUID.randomUUID().toString(), null);
192 ds = new DummyServer(zkw, conf);
193
194 ZKUtil.deleteChildrenRecursively(zkw, zkw.baseZNode);
195 ZKUtil.createAndFailSilent(zkw, zkw.baseZNode);
196 assertTrue(ZKUtil.checkExists(zkw, zkw.baseZNode) != -1);
197 LOG.debug(zkw.baseZNode + " created");
198 ZKUtil.createAndFailSilent(zkw, zkw.splitLogZNode);
199 assertTrue(ZKUtil.checkExists(zkw, zkw.splitLogZNode) != -1);
200 LOG.debug(zkw.splitLogZNode + " created");
201
202 stopped = false;
203 resetCounters();
204
205
206
207 Mockito.when(sm.isServerOnline(Mockito.any(ServerName.class))).thenReturn(true);
208 Mockito.when(master.getServerManager()).thenReturn(sm);
209
210 to = 12000;
211 conf.setInt(HConstants.HBASE_SPLITLOG_MANAGER_TIMEOUT, to);
212 conf.setInt("hbase.splitlog.manager.unassigned.timeout", 2 * to);
213
214 conf.setInt("hbase.splitlog.manager.timeoutmonitor.period", 100);
215 to = to + 4 * 100;
216
217 this.mode =
218 (conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false) ? RecoveryMode.LOG_REPLAY
219 : RecoveryMode.LOG_SPLITTING);
220 }
221
222 @After
223 public void teardown() throws IOException, KeeperException {
224 stopper.stop("");
225 if (slm != null) slm.stop();
226 TEST_UTIL.shutdownMiniZKCluster();
227 }
228
229 private interface Expr {
230 long eval();
231 }
232
233 private void waitForCounter(final AtomicLong ctr, long oldval, long newval, long timems)
234 throws Exception {
235 Expr e = new Expr() {
236 @Override
237 public long eval() {
238 return ctr.get();
239 }
240 };
241 waitForCounter(e, oldval, newval, timems);
242 return;
243 }
244
245 private void waitForCounter(final Expr e, final long oldval, long newval, long timems)
246 throws Exception {
247
248 TEST_UTIL.waitFor(timems, 10, new Waiter.Predicate<Exception>() {
249 @Override
250 public boolean evaluate() throws Exception {
251 return (e.eval() != oldval);
252 }
253 });
254
255 assertEquals(newval, e.eval());
256 }
257
258 private String submitTaskAndWait(TaskBatch batch, String name) throws KeeperException,
259 InterruptedException {
260 String tasknode = ZKSplitLog.getEncodedNodeName(zkw, name);
261 NodeCreationListener listener = new NodeCreationListener(zkw, tasknode);
262 zkw.registerListener(listener);
263 ZKUtil.watchAndCheckExists(zkw, tasknode);
264
265 slm.enqueueSplitTask(name, batch);
266 assertEquals(1, batch.installed);
267 assertTrue(slm.findOrCreateOrphanTask(tasknode).batch == batch);
268 assertEquals(1L, tot_mgr_node_create_queued.get());
269
270 LOG.debug("waiting for task node creation");
271 listener.waitForCreation();
272 LOG.debug("task created");
273 return tasknode;
274 }
275
276
277
278
279
280 @Test (timeout=180000)
281 public void testTaskCreation() throws Exception {
282
283 LOG.info("TestTaskCreation - test the creation of a task in zk");
284 slm = new SplitLogManager(ds, conf, stopper, master, DUMMY_MASTER);
285 TaskBatch batch = new TaskBatch();
286
287 String tasknode = submitTaskAndWait(batch, "foo/1");
288
289 byte[] data = ZKUtil.getData(zkw, tasknode);
290 SplitLogTask slt = SplitLogTask.parseFrom(data);
291 LOG.info("Task node created " + slt.toString());
292 assertTrue(slt.isUnassigned(DUMMY_MASTER));
293 }
294
295 @Test (timeout=180000)
296 public void testOrphanTaskAcquisition() throws Exception {
297 LOG.info("TestOrphanTaskAcquisition");
298
299 String tasknode = ZKSplitLog.getEncodedNodeName(zkw, "orphan/test/slash");
300 SplitLogTask slt = new SplitLogTask.Owned(DUMMY_MASTER, this.mode);
301 zkw.getRecoverableZooKeeper().create(tasknode, slt.toByteArray(), Ids.OPEN_ACL_UNSAFE,
302 CreateMode.PERSISTENT);
303
304 slm = new SplitLogManager(ds, conf, stopper, master, DUMMY_MASTER);
305 waitForCounter(tot_mgr_orphan_task_acquired, 0, 1, to/2);
306 Task task = slm.findOrCreateOrphanTask(tasknode);
307 assertTrue(task.isOrphan());
308 waitForCounter(tot_mgr_heartbeat, 0, 1, to/2);
309 assertFalse(task.isUnassigned());
310 long curt = System.currentTimeMillis();
311 assertTrue((task.last_update <= curt) &&
312 (task.last_update > (curt - 1000)));
313 LOG.info("waiting for manager to resubmit the orphan task");
314 waitForCounter(tot_mgr_resubmit, 0, 1, to + to/2);
315 assertTrue(task.isUnassigned());
316 waitForCounter(tot_mgr_rescan, 0, 1, to + to/2);
317 }
318
319 @Test (timeout=180000)
320 public void testUnassignedOrphan() throws Exception {
321 LOG.info("TestUnassignedOrphan - an unassigned task is resubmitted at" +
322 " startup");
323 String tasknode = ZKSplitLog.getEncodedNodeName(zkw, "orphan/test/slash");
324
325 SplitLogTask slt = new SplitLogTask.Unassigned(DUMMY_MASTER, this.mode);
326 zkw.getRecoverableZooKeeper().create(tasknode, slt.toByteArray(), Ids.OPEN_ACL_UNSAFE,
327 CreateMode.PERSISTENT);
328 int version = ZKUtil.checkExists(zkw, tasknode);
329
330 slm = new SplitLogManager(ds, conf, stopper, master, DUMMY_MASTER);
331 waitForCounter(tot_mgr_orphan_task_acquired, 0, 1, to/2);
332 Task task = slm.findOrCreateOrphanTask(tasknode);
333 assertTrue(task.isOrphan());
334 assertTrue(task.isUnassigned());
335
336 waitForCounter(tot_mgr_rescan, 0, 1, to/2);
337 Task task2 = slm.findOrCreateOrphanTask(tasknode);
338 assertTrue(task == task2);
339 LOG.debug("task = " + task);
340 assertEquals(1L, tot_mgr_resubmit.get());
341 assertEquals(1, task.incarnation.get());
342 assertEquals(0, task.unforcedResubmits.get());
343 assertTrue(task.isOrphan());
344 assertTrue(task.isUnassigned());
345 assertTrue(ZKUtil.checkExists(zkw, tasknode) > version);
346 }
347
348 @Test (timeout=180000)
349 public void testMultipleResubmits() throws Exception {
350 LOG.info("TestMultipleResbmits - no indefinite resubmissions");
351 conf.setInt("hbase.splitlog.max.resubmit", 2);
352 slm = new SplitLogManager(ds, conf, stopper, master, DUMMY_MASTER);
353 TaskBatch batch = new TaskBatch();
354
355 String tasknode = submitTaskAndWait(batch, "foo/1");
356 int version = ZKUtil.checkExists(zkw, tasknode);
357 final ServerName worker1 = ServerName.valueOf("worker1,1,1");
358 final ServerName worker2 = ServerName.valueOf("worker2,1,1");
359 final ServerName worker3 = ServerName.valueOf("worker3,1,1");
360 SplitLogTask slt = new SplitLogTask.Owned(worker1, this.mode);
361 ZKUtil.setData(zkw, tasknode, slt.toByteArray());
362 waitForCounter(tot_mgr_heartbeat, 0, 1, to/2);
363 waitForCounter(tot_mgr_resubmit, 0, 1, to + to/2);
364 int version1 = ZKUtil.checkExists(zkw, tasknode);
365 assertTrue(version1 > version);
366 slt = new SplitLogTask.Owned(worker2, this.mode);
367 ZKUtil.setData(zkw, tasknode, slt.toByteArray());
368 waitForCounter(tot_mgr_heartbeat, 1, 2, to/2);
369 waitForCounter(tot_mgr_resubmit, 1, 2, to + to/2);
370 int version2 = ZKUtil.checkExists(zkw, tasknode);
371 assertTrue(version2 > version1);
372 slt = new SplitLogTask.Owned(worker3, this.mode);
373 ZKUtil.setData(zkw, tasknode, slt.toByteArray());
374 waitForCounter(tot_mgr_heartbeat, 2, 3, to/2);
375 waitForCounter(tot_mgr_resubmit_threshold_reached, 0, 1, to + to/2);
376 Thread.sleep(to + to/2);
377 assertEquals(2L, tot_mgr_resubmit.get() - tot_mgr_resubmit_force.get());
378 }
379
380 @Test (timeout=180000)
381 public void testRescanCleanup() throws Exception {
382 LOG.info("TestRescanCleanup - ensure RESCAN nodes are cleaned up");
383
384 slm = new SplitLogManager(ds, conf, stopper, master, DUMMY_MASTER);
385 TaskBatch batch = new TaskBatch();
386
387 String tasknode = submitTaskAndWait(batch, "foo/1");
388 int version = ZKUtil.checkExists(zkw, tasknode);
389 final ServerName worker1 = ServerName.valueOf("worker1,1,1");
390 SplitLogTask slt = new SplitLogTask.Owned(worker1, this.mode);
391 ZKUtil.setData(zkw, tasknode, slt.toByteArray());
392 waitForCounter(tot_mgr_heartbeat, 0, 1, to/2);
393 waitForCounter(new Expr() {
394 @Override
395 public long eval() {
396 return (tot_mgr_resubmit.get() + tot_mgr_resubmit_failed.get());
397 }
398 }, 0, 1, 5*60000);
399 Assert.assertEquals("Could not run test. Lost ZK connection?", 0, tot_mgr_resubmit_failed.get());
400 int version1 = ZKUtil.checkExists(zkw, tasknode);
401 assertTrue(version1 > version);
402 byte[] taskstate = ZKUtil.getData(zkw, tasknode);
403 slt = SplitLogTask.parseFrom(taskstate);
404 assertTrue(slt.isUnassigned(DUMMY_MASTER));
405
406 waitForCounter(tot_mgr_rescan_deleted, 0, 1, to/2);
407 }
408
409 @Test (timeout=180000)
410 public void testTaskDone() throws Exception {
411 LOG.info("TestTaskDone - cleanup task node once in DONE state");
412
413 slm = new SplitLogManager(ds, conf, stopper, master, DUMMY_MASTER);
414 TaskBatch batch = new TaskBatch();
415 String tasknode = submitTaskAndWait(batch, "foo/1");
416 final ServerName worker1 = ServerName.valueOf("worker1,1,1");
417 SplitLogTask slt = new SplitLogTask.Done(worker1, this.mode);
418 ZKUtil.setData(zkw, tasknode, slt.toByteArray());
419 synchronized (batch) {
420 while (batch.installed != batch.done) {
421 batch.wait();
422 }
423 }
424 waitForCounter(tot_mgr_task_deleted, 0, 1, to/2);
425 assertTrue(ZKUtil.checkExists(zkw, tasknode) == -1);
426 }
427
428 @Test (timeout=180000)
429 public void testTaskErr() throws Exception {
430 LOG.info("TestTaskErr - cleanup task node once in ERR state");
431
432 conf.setInt("hbase.splitlog.max.resubmit", 0);
433 slm = new SplitLogManager(ds, conf, stopper, master, DUMMY_MASTER);
434 TaskBatch batch = new TaskBatch();
435
436 String tasknode = submitTaskAndWait(batch, "foo/1");
437 final ServerName worker1 = ServerName.valueOf("worker1,1,1");
438 SplitLogTask slt = new SplitLogTask.Err(worker1, this.mode);
439 ZKUtil.setData(zkw, tasknode, slt.toByteArray());
440
441 synchronized (batch) {
442 while (batch.installed != batch.error) {
443 batch.wait();
444 }
445 }
446 waitForCounter(tot_mgr_task_deleted, 0, 1, to/2);
447 assertTrue(ZKUtil.checkExists(zkw, tasknode) == -1);
448 conf.setInt("hbase.splitlog.max.resubmit", ZKSplitLogManagerCoordination.DEFAULT_MAX_RESUBMIT);
449 }
450
451 @Test (timeout=180000)
452 public void testTaskResigned() throws Exception {
453 LOG.info("TestTaskResigned - resubmit task node once in RESIGNED state");
454 assertEquals(tot_mgr_resubmit.get(), 0);
455 slm = new SplitLogManager(ds, conf, stopper, master, DUMMY_MASTER);
456 assertEquals(tot_mgr_resubmit.get(), 0);
457 TaskBatch batch = new TaskBatch();
458 String tasknode = submitTaskAndWait(batch, "foo/1");
459 assertEquals(tot_mgr_resubmit.get(), 0);
460 final ServerName worker1 = ServerName.valueOf("worker1,1,1");
461 assertEquals(tot_mgr_resubmit.get(), 0);
462 SplitLogTask slt = new SplitLogTask.Resigned(worker1, this.mode);
463 assertEquals(tot_mgr_resubmit.get(), 0);
464 ZKUtil.setData(zkw, tasknode, slt.toByteArray());
465 int version = ZKUtil.checkExists(zkw, tasknode);
466
467 if (tot_mgr_resubmit.get() == 0) {
468 waitForCounter(tot_mgr_resubmit, 0, 1, to/2);
469 }
470 assertEquals(tot_mgr_resubmit.get(), 1);
471
472 byte[] taskstate = ZKUtil.getData(zkw, tasknode);
473 slt = SplitLogTask.parseFrom(taskstate);
474 assertTrue(slt.isUnassigned(DUMMY_MASTER));
475 }
476
477 @Test (timeout=180000)
478 public void testUnassignedTimeout() throws Exception {
479 LOG.info("TestUnassignedTimeout - iff all tasks are unassigned then" +
480 " resubmit");
481
482
483 String tasknode1 = ZKSplitLog.getEncodedNodeName(zkw, "orphan/1");
484 final ServerName worker1 = ServerName.valueOf("worker1,1,1");
485 SplitLogTask slt = new SplitLogTask.Owned(worker1, this.mode);
486 zkw.getRecoverableZooKeeper().create(tasknode1, slt.toByteArray(), Ids.OPEN_ACL_UNSAFE,
487 CreateMode.PERSISTENT);
488
489 slm = new SplitLogManager(ds, conf, stopper, master, DUMMY_MASTER);
490 waitForCounter(tot_mgr_orphan_task_acquired, 0, 1, to/2);
491
492
493 TaskBatch batch = new TaskBatch();
494 submitTaskAndWait(batch, "foo/1");
495
496
497 for (int i = 0; i < (3 * to)/100; i++) {
498 Thread.sleep(100);
499 final ServerName worker2 = ServerName.valueOf("worker1,1,1");
500 slt = new SplitLogTask.Owned(worker2, this.mode);
501 ZKUtil.setData(zkw, tasknode1, slt.toByteArray());
502 }
503
504
505
506 LOG.info("waiting for manager to resubmit the orphan task");
507 waitForCounter(tot_mgr_resubmit, 0, 1, to + to/2);
508
509
510 waitForCounter(tot_mgr_resubmit_unassigned, 0, 1, 2 * to + to/2);
511 }
512
513 @Test (timeout=180000)
514 public void testDeadWorker() throws Exception {
515 LOG.info("testDeadWorker");
516
517 conf.setLong("hbase.splitlog.max.resubmit", 0);
518 slm = new SplitLogManager(ds, conf, stopper, master, DUMMY_MASTER);
519 TaskBatch batch = new TaskBatch();
520
521 String tasknode = submitTaskAndWait(batch, "foo/1");
522 int version = ZKUtil.checkExists(zkw, tasknode);
523 final ServerName worker1 = ServerName.valueOf("worker1,1,1");
524 SplitLogTask slt = new SplitLogTask.Owned(worker1, this.mode);
525 ZKUtil.setData(zkw, tasknode, slt.toByteArray());
526 if (tot_mgr_heartbeat.get() == 0) waitForCounter(tot_mgr_heartbeat, 0, 1, to/2);
527 slm.handleDeadWorker(worker1);
528 if (tot_mgr_resubmit.get() == 0) waitForCounter(tot_mgr_resubmit, 0, 1, to+to/2);
529 if (tot_mgr_resubmit_dead_server_task.get() == 0) {
530 waitForCounter(tot_mgr_resubmit_dead_server_task, 0, 1, to + to/2);
531 }
532
533 int version1 = ZKUtil.checkExists(zkw, tasknode);
534 assertTrue(version1 > version);
535 byte[] taskstate = ZKUtil.getData(zkw, tasknode);
536 slt = SplitLogTask.parseFrom(taskstate);
537 assertTrue(slt.isUnassigned(DUMMY_MASTER));
538 return;
539 }
540
541 @Test (timeout=180000)
542 public void testWorkerCrash() throws Exception {
543 slm = new SplitLogManager(ds, conf, stopper, master, DUMMY_MASTER);
544 TaskBatch batch = new TaskBatch();
545
546 String tasknode = submitTaskAndWait(batch, "foo/1");
547 final ServerName worker1 = ServerName.valueOf("worker1,1,1");
548
549 SplitLogTask slt = new SplitLogTask.Owned(worker1, this.mode);
550 ZKUtil.setData(zkw, tasknode, slt.toByteArray());
551 if (tot_mgr_heartbeat.get() == 0) waitForCounter(tot_mgr_heartbeat, 0, 1, to/2);
552
553
554 Assert.assertEquals(0, tot_mgr_resubmit.get());
555
556
557 Mockito.when(sm.isServerOnline(worker1)).thenReturn(false);
558
559 Thread.sleep(1300);
560
561
562 Assert.assertEquals(1, tot_mgr_resubmit.get());
563 }
564
565 @Test (timeout=180000)
566 public void testEmptyLogDir() throws Exception {
567 LOG.info("testEmptyLogDir");
568 slm = new SplitLogManager(ds, conf, stopper, master, DUMMY_MASTER);
569 FileSystem fs = TEST_UTIL.getTestFileSystem();
570 Path emptyLogDirPath = new Path(fs.getWorkingDirectory(),
571 UUID.randomUUID().toString());
572 fs.mkdirs(emptyLogDirPath);
573 slm.splitLogDistributed(emptyLogDirPath);
574 assertFalse(fs.exists(emptyLogDirPath));
575 }
576
577 @Test (timeout = 60000)
578 public void testLogFilesAreArchived() throws Exception {
579 LOG.info("testLogFilesAreArchived");
580 final SplitLogManager slm = new SplitLogManager(ds, conf, stopper, master, DUMMY_MASTER);
581 FileSystem fs = TEST_UTIL.getTestFileSystem();
582 Path dir = TEST_UTIL.getDataTestDirOnTestFS("testLogFilesAreArchived");
583 conf.set(HConstants.HBASE_DIR, dir.toString());
584 Path logDirPath = new Path(dir, UUID.randomUUID().toString());
585 fs.mkdirs(logDirPath);
586
587 String logFile = ServerName.valueOf("foo", 1, 1).toString();
588 fs.create(new Path(logDirPath, logFile)).close();
589
590
591 new Thread() {
592 @Override
593 public void run() {
594 boolean done = false;
595 while (!done) {
596 for (Map.Entry<String, Task> entry : slm.getTasks().entrySet()) {
597 final ServerName worker1 = ServerName.valueOf("worker1,1,1");
598 SplitLogTask slt = new SplitLogTask.Done(worker1, RecoveryMode.LOG_SPLITTING);
599 boolean encounteredZKException = false;
600 try {
601 ZKUtil.setData(zkw, entry.getKey(), slt.toByteArray());
602 } catch (KeeperException e) {
603 LOG.warn(e);
604 encounteredZKException = true;
605 }
606 if (!encounteredZKException) {
607 done = true;
608 }
609 }
610 }
611 };
612 }.start();
613
614 slm.splitLogDistributed(logDirPath);
615
616 assertFalse(fs.exists(logDirPath));
617 }
618
619
620
621
622
623
624 @Test(timeout = 300000)
625 public void testRecoveryRegionRemovedFromZK() throws Exception {
626 LOG.info("testRecoveryRegionRemovedFromZK");
627 conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false);
628 String nodePath =
629 ZKUtil.joinZNode(zkw.recoveringRegionsZNode,
630 HRegionInfo.FIRST_META_REGIONINFO.getEncodedName());
631 ZKUtil.createSetData(zkw, nodePath, ZKUtil.positionToByteArray(0L));
632
633 slm = new SplitLogManager(ds, conf, stopper, master, DUMMY_MASTER);
634 slm.removeStaleRecoveringRegions(null);
635
636 List<String> recoveringRegions =
637 zkw.getRecoverableZooKeeper().getChildren(zkw.recoveringRegionsZNode, false);
638
639 assertTrue("Recovery regions isn't cleaned", recoveringRegions.isEmpty());
640 }
641
642 @Ignore("DLR is broken by HBASE-12751") @Test(timeout=60000)
643 public void testGetPreviousRecoveryMode() throws Exception {
644 LOG.info("testGetPreviousRecoveryMode");
645 SplitLogCounters.resetCounters();
646
647
648 conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true);
649
650 zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, "testRecovery"),
651 new SplitLogTask.Unassigned(
652 ServerName.valueOf("mgr,1,1"), RecoveryMode.LOG_SPLITTING).toByteArray(),
653 Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
654
655 slm = new SplitLogManager(ds, conf, stopper, master, DUMMY_MASTER);
656 LOG.info("Mode1=" + slm.getRecoveryMode());
657 assertTrue(slm.isLogSplitting());
658 zkw.getRecoverableZooKeeper().delete(ZKSplitLog.getEncodedNodeName(zkw, "testRecovery"), -1);
659 LOG.info("Mode2=" + slm.getRecoveryMode());
660 slm.setRecoveryMode(false);
661 LOG.info("Mode3=" + slm.getRecoveryMode());
662 assertTrue("Mode4=" + slm.getRecoveryMode(), slm.isLogReplaying());
663 }
664 }