1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.procedure2;
20
21 import java.io.IOException;
22 import java.io.InputStream;
23 import java.io.OutputStream;
24 import java.util.concurrent.atomic.AtomicBoolean;
25 import java.util.concurrent.CountDownLatch;
26
27 import org.apache.commons.logging.Log;
28 import org.apache.commons.logging.LogFactory;
29 import org.apache.hadoop.fs.FileSystem;
30 import org.apache.hadoop.fs.FileStatus;
31 import org.apache.hadoop.fs.Path;
32 import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
33 import org.apache.hadoop.hbase.ProcedureInfo;
34 import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
35 import org.apache.hadoop.hbase.testclassification.SmallTests;
36 import org.apache.hadoop.hbase.util.Bytes;
37 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
38 import org.apache.hadoop.hbase.util.Threads;
39 import org.junit.After;
40 import org.junit.Before;
41 import org.junit.Test;
42 import org.junit.experimental.categories.Category;
43
44 import static org.junit.Assert.assertEquals;
45 import static org.junit.Assert.assertFalse;
46 import static org.junit.Assert.assertTrue;
47
48 @Category(SmallTests.class)
49 public class TestProcedureRecovery {
50 private static final Log LOG = LogFactory.getLog(TestProcedureRecovery.class);
51
52 private static final int PROCEDURE_EXECUTOR_SLOTS = 1;
53
54 private static TestProcEnv procEnv;
55 private static ProcedureExecutor<TestProcEnv> procExecutor;
56 private static ProcedureStore procStore;
57 private static int procSleepInterval;
58
59 private HBaseCommonTestingUtility htu;
60 private FileSystem fs;
61 private Path testDir;
62 private Path logDir;
63
64 @Before
65 public void setUp() throws IOException {
66 htu = new HBaseCommonTestingUtility();
67 testDir = htu.getDataTestDir();
68 fs = testDir.getFileSystem(htu.getConfiguration());
69 assertTrue(testDir.depth() > 1);
70
71 logDir = new Path(testDir, "proc-logs");
72 procEnv = new TestProcEnv();
73 procStore = ProcedureTestingUtility.createStore(htu.getConfiguration(), fs, logDir);
74 procExecutor = new ProcedureExecutor(htu.getConfiguration(), procEnv, procStore);
75 procExecutor.testing = new ProcedureExecutor.Testing();
76 procStore.start(PROCEDURE_EXECUTOR_SLOTS);
77 procExecutor.start(PROCEDURE_EXECUTOR_SLOTS, true);
78 procSleepInterval = 0;
79 }
80
81 @After
82 public void tearDown() throws IOException {
83 procExecutor.stop();
84 procStore.stop(false);
85 fs.delete(logDir, true);
86 }
87
88 private void restart() throws Exception {
89 dumpLogDirState();
90 ProcedureTestingUtility.restart(procExecutor);
91 dumpLogDirState();
92 }
93
94 public static class TestSingleStepProcedure extends SequentialProcedure<TestProcEnv> {
95 private int step = 0;
96
97 public TestSingleStepProcedure() { }
98
99 @Override
100 protected Procedure[] execute(TestProcEnv env) throws InterruptedException {
101 env.waitOnLatch();
102 LOG.debug("execute procedure " + this + " step=" + step);
103 step++;
104 setResult(Bytes.toBytes(step));
105 return null;
106 }
107
108 @Override
109 protected void rollback(TestProcEnv env) { }
110
111 @Override
112 protected boolean abort(TestProcEnv env) { return true; }
113 }
114
115 public static class BaseTestStepProcedure extends SequentialProcedure<TestProcEnv> {
116 private AtomicBoolean abort = new AtomicBoolean(false);
117 private int step = 0;
118
119 @Override
120 protected Procedure[] execute(TestProcEnv env) throws InterruptedException {
121 env.waitOnLatch();
122 LOG.debug("execute procedure " + this + " step=" + step);
123 ProcedureTestingUtility.toggleKillBeforeStoreUpdate(procExecutor);
124 step++;
125 Threads.sleepWithoutInterrupt(procSleepInterval);
126 if (isAborted()) {
127 setFailure(new RemoteProcedureException(getClass().getName(),
128 new ProcedureAbortedException(
129 "got an abort at " + getClass().getName() + " step=" + step)));
130 return null;
131 }
132 return null;
133 }
134
135 @Override
136 protected void rollback(TestProcEnv env) {
137 LOG.debug("rollback procedure " + this + " step=" + step);
138 ProcedureTestingUtility.toggleKillBeforeStoreUpdate(procExecutor);
139 step++;
140 }
141
142 @Override
143 protected boolean abort(TestProcEnv env) {
144 abort.set(true);
145 return true;
146 }
147
148 private boolean isAborted() {
149 boolean aborted = abort.get();
150 BaseTestStepProcedure proc = this;
151 while (proc.hasParent() && !aborted) {
152 proc = (BaseTestStepProcedure)procExecutor.getProcedure(proc.getParentProcId());
153 aborted = proc.isAborted();
154 }
155 return aborted;
156 }
157 }
158
159 public static class TestMultiStepProcedure extends BaseTestStepProcedure {
160 public TestMultiStepProcedure() { }
161
162 @Override
163 public Procedure[] execute(TestProcEnv env) throws InterruptedException {
164 super.execute(env);
165 return isFailed() ? null : new Procedure[] { new Step1Procedure() };
166 }
167
168 public static class Step1Procedure extends BaseTestStepProcedure {
169 public Step1Procedure() { }
170
171 @Override
172 protected Procedure[] execute(TestProcEnv env) throws InterruptedException {
173 super.execute(env);
174 return isFailed() ? null : new Procedure[] { new Step2Procedure() };
175 }
176 }
177
178 public static class Step2Procedure extends BaseTestStepProcedure {
179 public Step2Procedure() { }
180 }
181 }
182
183 @Test
184 public void testNoopLoad() throws Exception {
185 restart();
186 }
187
188 @Test(timeout=30000)
189 public void testSingleStepProcRecovery() throws Exception {
190 Procedure proc = new TestSingleStepProcedure();
191 procExecutor.testing.killBeforeStoreUpdate = true;
192 long procId = ProcedureTestingUtility.submitAndWait(procExecutor, proc);
193 assertFalse(procExecutor.isRunning());
194 procExecutor.testing.killBeforeStoreUpdate = false;
195
196
197 long restartTs = EnvironmentEdgeManager.currentTime();
198 restart();
199 waitProcedure(procId);
200 ProcedureInfo result = procExecutor.getResult(procId);
201 assertTrue(result.getLastUpdate() > restartTs);
202 ProcedureTestingUtility.assertProcNotFailed(result);
203 assertEquals(1, Bytes.toInt(result.getResult()));
204 long resultTs = result.getLastUpdate();
205
206
207 restart();
208 result = procExecutor.getResult(procId);
209 ProcedureTestingUtility.assertProcNotFailed(result);
210 assertEquals(resultTs, result.getLastUpdate());
211 assertEquals(1, Bytes.toInt(result.getResult()));
212 }
213
214 @Test(timeout=30000)
215 public void testMultiStepProcRecovery() throws Exception {
216
217 Procedure proc = new TestMultiStepProcedure();
218 long procId = ProcedureTestingUtility.submitAndWait(procExecutor, proc);
219 assertFalse(procExecutor.isRunning());
220
221
222 restart();
223 waitProcedure(procId);
224 ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
225 assertFalse(procExecutor.isRunning());
226
227
228 restart();
229 waitProcedure(procId);
230 ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
231 assertFalse(procExecutor.isRunning());
232
233
234 restart();
235 waitProcedure(procId);
236 assertTrue(procExecutor.isRunning());
237
238
239 ProcedureInfo result = procExecutor.getResult(procId);
240 ProcedureTestingUtility.assertProcNotFailed(result);
241 }
242
243 @Test(timeout=30000)
244 public void testMultiStepRollbackRecovery() throws Exception {
245
246 Procedure proc = new TestMultiStepProcedure();
247 long procId = ProcedureTestingUtility.submitAndWait(procExecutor, proc);
248 assertFalse(procExecutor.isRunning());
249
250
251 restart();
252 waitProcedure(procId);
253 ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
254 assertFalse(procExecutor.isRunning());
255
256
257 restart();
258 waitProcedure(procId);
259 ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
260 assertFalse(procExecutor.isRunning());
261
262
263 procSleepInterval = 2500;
264 restart();
265 assertTrue(procExecutor.abort(procId));
266 waitProcedure(procId);
267 assertFalse(procExecutor.isRunning());
268
269
270 restart();
271 waitProcedure(procId);
272 ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
273 assertFalse(procExecutor.isRunning());
274
275
276 restart();
277 waitProcedure(procId);
278 ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
279 assertFalse(procExecutor.isRunning());
280
281
282 restart();
283 waitProcedure(procId);
284
285
286 ProcedureInfo result = procExecutor.getResult(procId);
287 ProcedureTestingUtility.assertIsAbortException(result);
288 }
289
290 @Test(timeout=30000)
291 public void testCompletedProcWithSameNonce() throws Exception {
292 final long nonceGroup = 123;
293 final long nonce = 2222;
294 Procedure proc = new TestSingleStepProcedure();
295
296 long procId = ProcedureTestingUtility.submitAndWait(procExecutor, proc, nonceGroup, nonce);
297
298
299 restart();
300 waitProcedure(procId);
301
302 Procedure proc2 = new TestSingleStepProcedure();
303
304 long procId2 = ProcedureTestingUtility.submitAndWait(procExecutor, proc2, nonceGroup, nonce);
305 assertTrue(procId == procId2);
306
307 ProcedureInfo result = procExecutor.getResult(procId2);
308 ProcedureTestingUtility.assertProcNotFailed(result);
309 }
310
311 @Test(timeout=30000)
312 public void testRunningProcWithSameNonce() throws Exception {
313 final long nonceGroup = 456;
314 final long nonce = 33333;
315 Procedure proc = new TestSingleStepProcedure();
316 long procId = ProcedureTestingUtility.submitAndWait(procExecutor, proc, nonceGroup, nonce);
317
318
319 CountDownLatch latch = new CountDownLatch(1);
320 procEnv.setWaitLatch(latch);
321 restart();
322
323 Procedure proc2 = new TestSingleStepProcedure();
324 long procId2 = procExecutor.submitProcedure(proc2, nonceGroup, nonce);
325 latch.countDown();
326 procEnv.setWaitLatch(null);
327
328
329 assertTrue(procId == procId2);
330 }
331
332
333 public static class TestStateMachineProcedure
334 extends StateMachineProcedure<TestProcEnv, TestStateMachineProcedure.State> {
335 enum State { STATE_1, STATE_2, STATE_3, DONE }
336
337 public TestStateMachineProcedure() {}
338
339 private AtomicBoolean aborted = new AtomicBoolean(false);
340 private int iResult = 0;
341
342 @Override
343 protected StateMachineProcedure.Flow executeFromState(TestProcEnv env, State state) {
344 switch (state) {
345 case STATE_1:
346 LOG.info("execute step 1 " + this);
347 setNextState(State.STATE_2);
348 iResult += 3;
349 break;
350 case STATE_2:
351 LOG.info("execute step 2 " + this);
352 setNextState(State.STATE_3);
353 iResult += 5;
354 break;
355 case STATE_3:
356 LOG.info("execute step 3 " + this);
357 Threads.sleepWithoutInterrupt(procSleepInterval);
358 if (aborted.get()) {
359 LOG.info("aborted step 3 " + this);
360 setAbortFailure("test", "aborted");
361 break;
362 }
363 setNextState(State.DONE);
364 iResult += 7;
365 setResult(Bytes.toBytes(iResult));
366 return Flow.NO_MORE_STATE;
367 default:
368 throw new UnsupportedOperationException();
369 }
370 return Flow.HAS_MORE_STATE;
371 }
372
373 @Override
374 protected void rollbackState(TestProcEnv env, final State state) {
375 switch (state) {
376 case STATE_1:
377 LOG.info("rollback step 1 " + this);
378 break;
379 case STATE_2:
380 LOG.info("rollback step 2 " + this);
381 break;
382 case STATE_3:
383 LOG.info("rollback step 3 " + this);
384 break;
385 default:
386 throw new UnsupportedOperationException();
387 }
388 }
389
390 @Override
391 protected State getState(final int stateId) {
392 return State.values()[stateId];
393 }
394
395 @Override
396 protected int getStateId(final State state) {
397 return state.ordinal();
398 }
399
400 @Override
401 protected State getInitialState() {
402 return State.STATE_1;
403 }
404
405 @Override
406 protected boolean abort(TestProcEnv env) {
407 aborted.set(true);
408 return true;
409 }
410
411 @Override
412 protected void serializeStateData(final OutputStream stream) throws IOException {
413 super.serializeStateData(stream);
414 stream.write(Bytes.toBytes(iResult));
415 }
416
417 @Override
418 protected void deserializeStateData(final InputStream stream) throws IOException {
419 super.deserializeStateData(stream);
420 byte[] data = new byte[4];
421 stream.read(data);
422 iResult = Bytes.toInt(data);
423 }
424 }
425
426 @Test(timeout=30000)
427 public void testStateMachineRecovery() throws Exception {
428 ProcedureTestingUtility.setToggleKillBeforeStoreUpdate(procExecutor, true);
429 ProcedureTestingUtility.setKillBeforeStoreUpdate(procExecutor, true);
430
431
432 Procedure proc = new TestStateMachineProcedure();
433 long procId = ProcedureTestingUtility.submitAndWait(procExecutor, proc);
434 assertFalse(procExecutor.isRunning());
435
436
437 restart();
438 waitProcedure(procId);
439 ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
440 assertFalse(procExecutor.isRunning());
441
442
443 restart();
444 waitProcedure(procId);
445 ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
446 assertFalse(procExecutor.isRunning());
447
448
449 restart();
450 waitProcedure(procId);
451 assertTrue(procExecutor.isRunning());
452
453
454 ProcedureInfo result = procExecutor.getResult(procId);
455 ProcedureTestingUtility.assertProcNotFailed(result);
456 assertEquals(15, Bytes.toInt(result.getResult()));
457 }
458
459 @Test(timeout=30000)
460 public void testStateMachineRollbackRecovery() throws Exception {
461 ProcedureTestingUtility.setToggleKillBeforeStoreUpdate(procExecutor, true);
462 ProcedureTestingUtility.setKillBeforeStoreUpdate(procExecutor, true);
463
464
465 Procedure proc = new TestStateMachineProcedure();
466 long procId = ProcedureTestingUtility.submitAndWait(procExecutor, proc);
467 ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
468 assertFalse(procExecutor.isRunning());
469
470
471 restart();
472 waitProcedure(procId);
473 ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
474 assertFalse(procExecutor.isRunning());
475
476
477 restart();
478 waitProcedure(procId);
479 ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
480 assertFalse(procExecutor.isRunning());
481
482
483 procSleepInterval = 2500;
484 restart();
485 assertTrue(procExecutor.abort(procId));
486 waitProcedure(procId);
487 ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
488 assertFalse(procExecutor.isRunning());
489
490
491 restart();
492 waitProcedure(procId);
493 assertFalse(procExecutor.isRunning());
494 ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
495
496
497 restart();
498 waitProcedure(procId);
499 assertFalse(procExecutor.isRunning());
500 ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
501
502
503 restart();
504 waitProcedure(procId);
505 assertTrue(procExecutor.isRunning());
506
507
508 ProcedureInfo result = procExecutor.getResult(procId);
509 ProcedureTestingUtility.assertIsAbortException(result);
510 }
511
512 private void waitProcedure(final long procId) {
513 ProcedureTestingUtility.waitProcedure(procExecutor, procId);
514 dumpLogDirState();
515 }
516
517 private void dumpLogDirState() {
518 try {
519 FileStatus[] files = fs.listStatus(logDir);
520 if (files != null && files.length > 0) {
521 for (FileStatus file: files) {
522 assertTrue(file.toString(), file.isFile());
523 LOG.debug("log file " + file.getPath() + " size=" + file.getLen());
524 }
525 } else {
526 LOG.debug("no files under: " + logDir);
527 }
528 } catch (IOException e) {
529 LOG.warn("Unable to dump " + logDir, e);
530 }
531 }
532
533 private static class TestProcEnv {
534 private CountDownLatch latch = null;
535
536
537
538
539 public void setWaitLatch(CountDownLatch latch) {
540 this.latch = latch;
541 }
542
543 public void waitOnLatch() throws InterruptedException {
544 if (latch != null) {
545 latch.await();
546 }
547 }
548 }
549 }