1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.regionserver;
20
21 import static org.hamcrest.CoreMatchers.is;
22 import static org.hamcrest.CoreMatchers.not;
23 import static org.junit.Assert.assertEquals;
24 import static org.junit.Assert.assertThat;
25 import static org.junit.Assert.assertTrue;
26 import static org.mockito.Mockito.mock;
27 import static org.mockito.Mockito.when;
28
29 import java.util.List;
30 import java.util.concurrent.atomic.AtomicLong;
31
32 import org.apache.commons.logging.Log;
33 import org.apache.commons.logging.LogFactory;
34 import org.apache.hadoop.conf.Configuration;
35 import org.apache.hadoop.hbase.ChoreService;
36 import org.apache.hadoop.hbase.CoordinatedStateManager;
37 import org.apache.hadoop.hbase.CoordinatedStateManagerFactory;
38 import org.apache.hadoop.hbase.HBaseConfiguration;
39 import org.apache.hadoop.hbase.HBaseTestingUtility;
40 import org.apache.hadoop.hbase.HConstants;
41 import org.apache.hadoop.hbase.testclassification.MediumTests;
42 import org.apache.hadoop.hbase.Server;
43 import org.apache.hadoop.hbase.ServerName;
44 import org.apache.hadoop.hbase.SplitLogCounters;
45 import org.apache.hadoop.hbase.SplitLogTask;
46 import org.apache.hadoop.hbase.Waiter;
47 import org.apache.hadoop.hbase.client.ClusterConnection;
48 import org.apache.hadoop.hbase.executor.ExecutorService;
49 import org.apache.hadoop.hbase.executor.ExecutorType;
50 import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
51 import org.apache.hadoop.hbase.util.CancelableProgressable;
52 import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
53 import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
54 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
55 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
56 import org.apache.log4j.Level;
57 import org.apache.log4j.Logger;
58 import org.apache.zookeeper.CreateMode;
59 import org.apache.zookeeper.ZooDefs.Ids;
60 import org.junit.After;
61 import org.junit.Before;
62 import org.junit.Test;
63 import org.junit.experimental.categories.Category;
64
65 @Category(MediumTests.class)
66 public class TestSplitLogWorker {
private static final Log LOG = LogFactory.getLog(TestSplitLogWorker.class);

/** Timeout, in milliseconds, used for the counter waits and for joining the worker thread. */
private static final int WAIT_TIME = 15000;

// Switch HBase loggers to DEBUG; this runs before TEST_UTIL below is constructed.
static {
  Logger hbaseLogger = Logger.getLogger("org.apache.hadoop.hbase");
  hbaseLogger.setLevel(Level.DEBUG);
}

private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();

/** Server name the tests put into the split-log task znodes they create as "manager". */
private final ServerName MANAGER = ServerName.valueOf("manager,1,1");

// Fixtures assigned in setup().
private DummyServer ds;
private ZooKeeperWatcher zkw;
private ExecutorService executorService;
private RecoveryMode mode;

// Only assigned by testRescan(); the other tests use a local worker.
private SplitLogWorker slw;
80
81 class DummyServer implements Server {
82 private ZooKeeperWatcher zkw;
83 private Configuration conf;
84 private CoordinatedStateManager cm;
85
86 public DummyServer(ZooKeeperWatcher zkw, Configuration conf) {
87 this.zkw = zkw;
88 this.conf = conf;
89 cm = CoordinatedStateManagerFactory.getCoordinatedStateManager(conf);
90 cm.initialize(this);
91 }
92
93 @Override
94 public void abort(String why, Throwable e) {
95 }
96
97 @Override
98 public boolean isAborted() {
99 return false;
100 }
101
102 @Override
103 public void stop(String why) {
104 }
105
106 @Override
107 public boolean isStopped() {
108 return false;
109 }
110
111 @Override
112 public Configuration getConfiguration() {
113 return conf;
114 }
115
116 @Override
117 public ZooKeeperWatcher getZooKeeper() {
118 return zkw;
119 }
120
121 @Override
122 public ServerName getServerName() {
123 return null;
124 }
125
126 @Override
127 public CoordinatedStateManager getCoordinatedStateManager() {
128 return cm;
129 }
130
131 @Override
132 public ClusterConnection getConnection() {
133 return null;
134 }
135
136 @Override
137 public MetaTableLocator getMetaTableLocator() {
138 return null;
139 }
140
141 @Override
142 public ChoreService getChoreService() {
143 return null;
144 }
145 }
146
147 private void waitForCounter(AtomicLong ctr, long oldval, long newval, long timems)
148 throws Exception {
149 assertTrue("ctr=" + ctr.get() + ", oldval=" + oldval + ", newval=" + newval,
150 waitForCounterBoolean(ctr, oldval, newval, timems));
151 }
152
153 private boolean waitForCounterBoolean(final AtomicLong ctr, final long oldval, long newval,
154 long timems) throws Exception {
155
156 return waitForCounterBoolean(ctr, oldval, newval, timems, true);
157 }
158
159 private boolean waitForCounterBoolean(final AtomicLong ctr, final long oldval, final long newval,
160 long timems, boolean failIfTimeout) throws Exception {
161
162 long timeWaited = TEST_UTIL.waitFor(timems, 10, failIfTimeout,
163 new Waiter.Predicate<Exception>() {
164 @Override
165 public boolean evaluate() throws Exception {
166 return (ctr.get() >= newval);
167 }
168 });
169
170 if( timeWaited > 0) {
171
172 assertEquals(newval, ctr.get());
173 }
174 return true;
175 }
176
177 @Before
178 public void setup() throws Exception {
179 TEST_UTIL.startMiniZKCluster();
180 Configuration conf = TEST_UTIL.getConfiguration();
181 zkw = new ZooKeeperWatcher(TEST_UTIL.getConfiguration(),
182 "split-log-worker-tests", null);
183 ds = new DummyServer(zkw, conf);
184 ZKUtil.deleteChildrenRecursively(zkw, zkw.baseZNode);
185 ZKUtil.createAndFailSilent(zkw, zkw.baseZNode);
186 assertThat(ZKUtil.checkExists(zkw, zkw.baseZNode), not (is(-1)));
187 LOG.debug(zkw.baseZNode + " created");
188 ZKUtil.createAndFailSilent(zkw, zkw.splitLogZNode);
189 assertThat(ZKUtil.checkExists(zkw, zkw.splitLogZNode), not (is(-1)));
190
191 LOG.debug(zkw.splitLogZNode + " created");
192 ZKUtil.createAndFailSilent(zkw, zkw.rsZNode);
193 assertThat(ZKUtil.checkExists(zkw, zkw.rsZNode), not (is(-1)));
194
195 SplitLogCounters.resetCounters();
196 executorService = new ExecutorService("TestSplitLogWorker");
197 executorService.startExecutorService(ExecutorType.RS_LOG_REPLAY_OPS, 10);
198 this.mode = (conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false) ?
199 RecoveryMode.LOG_REPLAY : RecoveryMode.LOG_SPLITTING);
200 }
201
202 @After
203 public void teardown() throws Exception {
204 if (executorService != null) {
205 executorService.shutdown();
206 }
207 TEST_UTIL.shutdownMiniZKCluster();
208 }
209
210 SplitLogWorker.TaskExecutor neverEndingTask =
211 new SplitLogWorker.TaskExecutor() {
212
213 @Override
214 public Status exec(String name, RecoveryMode mode, CancelableProgressable p) {
215 while (true) {
216 try {
217 Thread.sleep(1000);
218 } catch (InterruptedException e) {
219 return Status.PREEMPTED;
220 }
221 if (!p.progress()) {
222 return Status.PREEMPTED;
223 }
224 }
225 }
226
227 };
228
229 @Test(timeout=60000)
230 public void testAcquireTaskAtStartup() throws Exception {
231 LOG.info("testAcquireTaskAtStartup");
232 SplitLogCounters.resetCounters();
233 final String TATAS = "tatas";
234 final ServerName RS = ServerName.valueOf("rs,1,1");
235 RegionServerServices mockedRS = getRegionServer(RS);
236 zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, TATAS),
237 new SplitLogTask.Unassigned(ServerName.valueOf("mgr,1,1"), this.mode).toByteArray(),
238 Ids.OPEN_ACL_UNSAFE,
239 CreateMode.PERSISTENT);
240
241 SplitLogWorker slw =
242 new SplitLogWorker(ds, TEST_UTIL.getConfiguration(), mockedRS, neverEndingTask);
243 slw.start();
244 try {
245 waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, 1, WAIT_TIME);
246 byte [] bytes = ZKUtil.getData(zkw, ZKSplitLog.getEncodedNodeName(zkw, TATAS));
247 SplitLogTask slt = SplitLogTask.parseFrom(bytes);
248 assertTrue(slt.isOwned(RS));
249 } finally {
250 stopSplitLogWorker(slw);
251 }
252 }
253
254 private void stopSplitLogWorker(final SplitLogWorker slw)
255 throws InterruptedException {
256 if (slw != null) {
257 slw.stop();
258 slw.worker.join(WAIT_TIME);
259 if (slw.worker.isAlive()) {
260 assertTrue(("Could not stop the worker thread slw=" + slw) == null);
261 }
262 }
263 }
264
265 @Test(timeout=60000)
266 public void testRaceForTask() throws Exception {
267 LOG.info("testRaceForTask");
268 SplitLogCounters.resetCounters();
269 final String TRFT = "trft";
270 final ServerName SVR1 = ServerName.valueOf("svr1,1,1");
271 final ServerName SVR2 = ServerName.valueOf("svr2,1,1");
272 zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, TRFT),
273 new SplitLogTask.Unassigned(MANAGER, this.mode).toByteArray(),
274 Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
275 RegionServerServices mockedRS1 = getRegionServer(SVR1);
276 RegionServerServices mockedRS2 = getRegionServer(SVR2);
277 SplitLogWorker slw1 =
278 new SplitLogWorker(ds, TEST_UTIL.getConfiguration(), mockedRS1, neverEndingTask);
279 SplitLogWorker slw2 =
280 new SplitLogWorker(ds, TEST_UTIL.getConfiguration(), mockedRS2, neverEndingTask);
281 slw1.start();
282 slw2.start();
283 try {
284 waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, 1, WAIT_TIME);
285
286
287 assertTrue(waitForCounterBoolean(SplitLogCounters.tot_wkr_failed_to_grab_task_owned, 0, 1,
288 WAIT_TIME, false) ||
289 SplitLogCounters.tot_wkr_failed_to_grab_task_lost_race.get() == 1);
290 byte [] bytes = ZKUtil.getData(zkw, ZKSplitLog.getEncodedNodeName(zkw, TRFT));
291 SplitLogTask slt = SplitLogTask.parseFrom(bytes);
292 assertTrue(slt.isOwned(SVR1) || slt.isOwned(SVR2));
293 } finally {
294 stopSplitLogWorker(slw1);
295 stopSplitLogWorker(slw2);
296 }
297 }
298
299 @Test(timeout=60000)
300 public void testPreemptTask() throws Exception {
301 LOG.info("testPreemptTask");
302 SplitLogCounters.resetCounters();
303 final ServerName SRV = ServerName.valueOf("tpt_svr,1,1");
304 final String PATH = ZKSplitLog.getEncodedNodeName(zkw, "tpt_task");
305 RegionServerServices mockedRS = getRegionServer(SRV);
306 SplitLogWorker slw =
307 new SplitLogWorker(ds, TEST_UTIL.getConfiguration(), mockedRS, neverEndingTask);
308 slw.start();
309 try {
310 Thread.yield();
311 Thread.sleep(1000);
312 waitForCounter(SplitLogCounters.tot_wkr_task_grabing, 0, 1, WAIT_TIME);
313
314
315 zkw.getRecoverableZooKeeper().create(PATH,
316 new SplitLogTask.Unassigned(MANAGER, this.mode).toByteArray(),
317 Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
318
319 waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, 1, WAIT_TIME);
320 assertEquals(1, slw.getTaskReadySeq());
321 byte [] bytes = ZKUtil.getData(zkw, PATH);
322 SplitLogTask slt = SplitLogTask.parseFrom(bytes);
323 assertTrue(slt.isOwned(SRV));
324 slt = new SplitLogTask.Owned(MANAGER, this.mode);
325 ZKUtil.setData(zkw, PATH, slt.toByteArray());
326 waitForCounter(SplitLogCounters.tot_wkr_preempt_task, 0, 1, WAIT_TIME);
327 } finally {
328 stopSplitLogWorker(slw);
329 }
330 }
331
332 @Test(timeout=60000)
333 public void testMultipleTasks() throws Exception {
334 LOG.info("testMultipleTasks");
335 SplitLogCounters.resetCounters();
336 final ServerName SRV = ServerName.valueOf("tmt_svr,1,1");
337 final String PATH1 = ZKSplitLog.getEncodedNodeName(zkw, "tmt_task");
338 RegionServerServices mockedRS = getRegionServer(SRV);
339 SplitLogWorker slw =
340 new SplitLogWorker(ds, TEST_UTIL.getConfiguration(), mockedRS, neverEndingTask);
341 slw.start();
342 try {
343 Thread.yield();
344 Thread.sleep(100);
345 waitForCounter(SplitLogCounters.tot_wkr_task_grabing, 0, 1, WAIT_TIME);
346
347 SplitLogTask unassignedManager =
348 new SplitLogTask.Unassigned(MANAGER, this.mode);
349 zkw.getRecoverableZooKeeper().create(PATH1, unassignedManager.toByteArray(),
350 Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
351
352 waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, 1, WAIT_TIME);
353
354
355
356 final String PATH2 = ZKSplitLog.getEncodedNodeName(zkw, "tmt_task_2");
357 zkw.getRecoverableZooKeeper().create(PATH2, unassignedManager.toByteArray(),
358 Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
359
360
361 final ServerName anotherWorker = ServerName.valueOf("another-worker,1,1");
362 SplitLogTask slt = new SplitLogTask.Owned(anotherWorker, this.mode);
363 ZKUtil.setData(zkw, PATH1, slt.toByteArray());
364 waitForCounter(SplitLogCounters.tot_wkr_preempt_task, 0, 1, WAIT_TIME);
365
366 waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 1, 2, WAIT_TIME);
367 assertEquals(2, slw.getTaskReadySeq());
368 byte [] bytes = ZKUtil.getData(zkw, PATH2);
369 slt = SplitLogTask.parseFrom(bytes);
370 assertTrue(slt.isOwned(SRV));
371 } finally {
372 stopSplitLogWorker(slw);
373 }
374 }
375
376 @Test(timeout=60000)
377 public void testRescan() throws Exception {
378 LOG.info("testRescan");
379 SplitLogCounters.resetCounters();
380 final ServerName SRV = ServerName.valueOf("svr,1,1");
381 RegionServerServices mockedRS = getRegionServer(SRV);
382 slw = new SplitLogWorker(ds, TEST_UTIL.getConfiguration(), mockedRS, neverEndingTask);
383 slw.start();
384 Thread.yield();
385 Thread.sleep(100);
386
387 String task = ZKSplitLog.getEncodedNodeName(zkw, "task");
388 SplitLogTask slt = new SplitLogTask.Unassigned(MANAGER, this.mode);
389 zkw.getRecoverableZooKeeper().create(task,slt.toByteArray(), Ids.OPEN_ACL_UNSAFE,
390 CreateMode.PERSISTENT);
391
392 waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, 1, WAIT_TIME);
393
394
395
396 ZKUtil.setData(zkw, task, slt.toByteArray());
397 waitForCounter(SplitLogCounters.tot_wkr_preempt_task, 0, 1, WAIT_TIME);
398
399
400 String rescan = ZKSplitLog.getEncodedNodeName(zkw, "RESCAN");
401 rescan = zkw.getRecoverableZooKeeper().create(rescan, slt.toByteArray(), Ids.OPEN_ACL_UNSAFE,
402 CreateMode.PERSISTENT_SEQUENTIAL);
403
404 waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 1, 2, WAIT_TIME);
405
406
407
408 ZKUtil.setData(zkw, task, slt.toByteArray());
409 waitForCounter(SplitLogCounters.tot_wkr_preempt_task, 1, 2, WAIT_TIME);
410 waitForCounter(SplitLogCounters.tot_wkr_task_acquired_rescan, 0, 1, WAIT_TIME);
411
412 List<String> nodes = ZKUtil.listChildrenNoWatch(zkw, zkw.splitLogZNode);
413 LOG.debug(nodes);
414 int num = 0;
415 for (String node : nodes) {
416 num++;
417 if (node.startsWith("RESCAN")) {
418 String name = ZKSplitLog.getEncodedNodeName(zkw, node);
419 String fn = ZKSplitLog.getFileName(name);
420 byte [] data = ZKUtil.getData(zkw, ZKUtil.joinZNode(zkw.splitLogZNode, fn));
421 slt = SplitLogTask.parseFrom(data);
422 assertTrue(slt.toString(), slt.isDone(SRV));
423 }
424 }
425 assertEquals(2, num);
426 }
427
428 @Test(timeout=60000)
429 public void testAcquireMultiTasks() throws Exception {
430 LOG.info("testAcquireMultiTasks");
431 SplitLogCounters.resetCounters();
432 final String TATAS = "tatas";
433 final ServerName RS = ServerName.valueOf("rs,1,1");
434 final int maxTasks = 3;
435 Configuration testConf = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
436 testConf.setInt("hbase.regionserver.wal.max.splitters", maxTasks);
437 RegionServerServices mockedRS = getRegionServer(RS);
438 for (int i = 0; i < maxTasks; i++) {
439 zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, TATAS + i),
440 new SplitLogTask.Unassigned(ServerName.valueOf("mgr,1,1"), this.mode).toByteArray(),
441 Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
442 }
443
444 SplitLogWorker slw = new SplitLogWorker(ds, testConf, mockedRS, neverEndingTask);
445 slw.start();
446 try {
447 waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, maxTasks, WAIT_TIME);
448 for (int i = 0; i < maxTasks; i++) {
449 byte[] bytes = ZKUtil.getData(zkw, ZKSplitLog.getEncodedNodeName(zkw, TATAS + i));
450 SplitLogTask slt = SplitLogTask.parseFrom(bytes);
451 assertTrue(slt.isOwned(RS));
452 }
453 } finally {
454 stopSplitLogWorker(slw);
455 }
456 }
457
458
459
460
461
462
463 @Test(timeout=60000)
464 public void testAcquireMultiTasksByAvgTasksPerRS() throws Exception {
465 LOG.info("testAcquireMultiTasks");
466 SplitLogCounters.resetCounters();
467 final String TATAS = "tatas";
468 final ServerName RS = ServerName.valueOf("rs,1,1");
469 final ServerName RS2 = ServerName.valueOf("rs,1,2");
470 final int maxTasks = 3;
471 Configuration testConf = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
472 testConf.setInt("hbase.regionserver.wal.max.splitters", maxTasks);
473 RegionServerServices mockedRS = getRegionServer(RS);
474
475
476 String rsPath = ZKUtil.joinZNode(zkw.rsZNode, RS.getServerName());
477 zkw.getRecoverableZooKeeper().create(rsPath, null, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
478 rsPath = ZKUtil.joinZNode(zkw.rsZNode, RS2.getServerName());
479 zkw.getRecoverableZooKeeper().create(rsPath, null, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
480
481 for (int i = 0; i < maxTasks; i++) {
482 zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, TATAS + i),
483 new SplitLogTask.Unassigned(ServerName.valueOf("mgr,1,1"), this.mode).toByteArray(),
484 Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
485 }
486
487 SplitLogWorker slw = new SplitLogWorker(ds, testConf, mockedRS, neverEndingTask);
488 slw.start();
489 try {
490 int acquiredTasks = 0;
491 waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, 2, WAIT_TIME);
492 for (int i = 0; i < maxTasks; i++) {
493 byte[] bytes = ZKUtil.getData(zkw, ZKSplitLog.getEncodedNodeName(zkw, TATAS + i));
494 SplitLogTask slt = SplitLogTask.parseFrom(bytes);
495 if (slt.isOwned(RS)) {
496 acquiredTasks++;
497 }
498 }
499 assertEquals(2, acquiredTasks);
500 } finally {
501 stopSplitLogWorker(slw);
502 }
503 }
504
505
506
507
508
509
510 private RegionServerServices getRegionServer(ServerName name) {
511
512 RegionServerServices mockedServer = mock(RegionServerServices.class);
513 when(mockedServer.getConfiguration()).thenReturn(TEST_UTIL.getConfiguration());
514 when(mockedServer.getServerName()).thenReturn(name);
515 when(mockedServer.getZooKeeper()).thenReturn(zkw);
516 when(mockedServer.isStopped()).thenReturn(false);
517 when(mockedServer.getExecutorService()).thenReturn(executorService);
518
519 return mockedServer;
520 }
521
522 }