View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.procedure2;
20  
21  import java.io.IOException;
22  import java.util.ArrayList;
23  import java.util.List;
24  
25  import org.apache.commons.logging.Log;
26  import org.apache.commons.logging.LogFactory;
27  import org.apache.hadoop.fs.FileSystem;
28  import org.apache.hadoop.fs.Path;
29  import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
30  import org.apache.hadoop.hbase.ProcedureInfo;
31  import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
32  import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureState;
33  import org.apache.hadoop.hbase.testclassification.SmallTests;
34  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
35  
36  import org.junit.After;
37  import org.junit.Before;
38  import org.junit.Test;
39  import org.junit.experimental.categories.Category;
40  
41  import static org.junit.Assert.assertEquals;
42  import static org.junit.Assert.assertTrue;
43  
44  @Category(SmallTests.class)
45  public class TestProcedureExecution {
46    private static final Log LOG = LogFactory.getLog(TestProcedureExecution.class);
47  
48    private static final int PROCEDURE_EXECUTOR_SLOTS = 1;
49    private static final Procedure NULL_PROC = null;
50  
51    private ProcedureExecutor<Void> procExecutor;
52    private ProcedureStore procStore;
53  
54    private HBaseCommonTestingUtility htu;
55    private FileSystem fs;
56    private Path testDir;
57    private Path logDir;
58  
59    @Before
60    public void setUp() throws IOException {
61      htu = new HBaseCommonTestingUtility();
62      testDir = htu.getDataTestDir();
63      fs = testDir.getFileSystem(htu.getConfiguration());
64      assertTrue(testDir.depth() > 1);
65  
66      logDir = new Path(testDir, "proc-logs");
67      procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), fs, logDir);
68      procExecutor = new ProcedureExecutor(htu.getConfiguration(), null, procStore);
69      procStore.start(PROCEDURE_EXECUTOR_SLOTS);
70      procExecutor.start(PROCEDURE_EXECUTOR_SLOTS, true);
71    }
72  
73    @After
74    public void tearDown() throws IOException {
75      procExecutor.stop();
76      procStore.stop(false);
77      fs.delete(logDir, true);
78    }
79  
80    private static class TestProcedureException extends IOException {
81      public TestProcedureException(String msg) { super(msg); }
82    }
83  
84    public static class TestSequentialProcedure extends SequentialProcedure<Void> {
85      private final Procedure[] subProcs;
86      private final List<String> state;
87      private final Exception failure;
88      private final String name;
89  
90      public TestSequentialProcedure() {
91        throw new UnsupportedOperationException("recovery should not be triggered here");
92      }
93  
94      public TestSequentialProcedure(String name, List<String> state, Procedure... subProcs) {
95        this.state = state;
96        this.subProcs = subProcs;
97        this.name = name;
98        this.failure = null;
99      }
100 
101     public TestSequentialProcedure(String name, List<String> state, Exception failure) {
102       this.state = state;
103       this.subProcs = null;
104       this.name = name;
105       this.failure = failure;
106     }
107 
108     @Override
109     protected Procedure[] execute(Void env) {
110       state.add(name + "-execute");
111       if (failure != null) {
112         setFailure(new RemoteProcedureException(name + "-failure", failure));
113         return null;
114       }
115       return subProcs;
116     }
117 
118     @Override
119     protected void rollback(Void env) {
120       state.add(name + "-rollback");
121     }
122 
123     @Override
124     protected boolean abort(Void env) {
125       state.add(name + "-abort");
126       return true;
127     }
128   }
129 
130   @Test(timeout=30000)
131   public void testBadSubprocList() {
132     List<String> state = new ArrayList<String>();
133     Procedure subProc2 = new TestSequentialProcedure("subProc2", state);
134     Procedure subProc1 = new TestSequentialProcedure("subProc1", state, subProc2, NULL_PROC);
135     Procedure rootProc = new TestSequentialProcedure("rootProc", state, subProc1);
136     long rootId = ProcedureTestingUtility.submitAndWait(procExecutor, rootProc);
137 
138     // subProc1 has a "null" subprocedure which is catched as InvalidArgument
139     // failed state with 2 execute and 2 rollback
140     LOG.info(state);
141     ProcedureInfo result = procExecutor.getResult(rootId);
142     assertTrue(state.toString(), result.isFailed());
143     ProcedureTestingUtility.assertIsIllegalArgumentException(result);
144 
145     assertEquals(state.toString(), 4, state.size());
146     assertEquals("rootProc-execute", state.get(0));
147     assertEquals("subProc1-execute", state.get(1));
148     assertEquals("subProc1-rollback", state.get(2));
149     assertEquals("rootProc-rollback", state.get(3));
150   }
151 
152   @Test(timeout=30000)
153   public void testSingleSequentialProc() {
154     List<String> state = new ArrayList<String>();
155     Procedure subProc2 = new TestSequentialProcedure("subProc2", state);
156     Procedure subProc1 = new TestSequentialProcedure("subProc1", state, subProc2);
157     Procedure rootProc = new TestSequentialProcedure("rootProc", state, subProc1);
158     long rootId = ProcedureTestingUtility.submitAndWait(procExecutor, rootProc);
159 
160     // successful state, with 3 execute
161     LOG.info(state);
162     ProcedureInfo result = procExecutor.getResult(rootId);
163     ProcedureTestingUtility.assertProcNotFailed(result);
164     assertEquals(state.toString(), 3, state.size());
165   }
166 
167   @Test(timeout=30000)
168   public void testSingleSequentialProcRollback() {
169     List<String> state = new ArrayList<String>();
170     Procedure subProc2 = new TestSequentialProcedure("subProc2", state,
171                                                      new TestProcedureException("fail test"));
172     Procedure subProc1 = new TestSequentialProcedure("subProc1", state, subProc2);
173     Procedure rootProc = new TestSequentialProcedure("rootProc", state, subProc1);
174     long rootId = ProcedureTestingUtility.submitAndWait(procExecutor, rootProc);
175 
176     // the 3rd proc fail, rollback after 2 successful execution
177     LOG.info(state);
178     ProcedureInfo result = procExecutor.getResult(rootId);
179     assertTrue(state.toString(), result.isFailed());
180     LOG.info(result.getExceptionFullMessage());
181     Throwable cause = ProcedureTestingUtility.getExceptionCause(result);
182     assertTrue("expected TestProcedureException, got " + cause,
183       cause instanceof TestProcedureException);
184 
185     assertEquals(state.toString(), 6, state.size());
186     assertEquals("rootProc-execute", state.get(0));
187     assertEquals("subProc1-execute", state.get(1));
188     assertEquals("subProc2-execute", state.get(2));
189     assertEquals("subProc2-rollback", state.get(3));
190     assertEquals("subProc1-rollback", state.get(4));
191     assertEquals("rootProc-rollback", state.get(5));
192   }
193 
194   public static class TestFaultyRollback extends SequentialProcedure<Void> {
195     private int retries = 0;
196 
197     public TestFaultyRollback() { }
198 
199     @Override
200     protected Procedure[] execute(Void env) {
201       setFailure("faulty-rollback-test", new TestProcedureException("test faulty rollback"));
202       return null;
203     }
204 
205     @Override
206     protected void rollback(Void env) throws IOException {
207       if (++retries < 3) {
208         LOG.info("inject rollback failure " + retries);
209         throw new IOException("injected failure number " + retries);
210       }
211       LOG.info("execute non faulty rollback step retries=" + retries);
212     }
213 
214     @Override
215     protected boolean abort(Void env) { return false; }
216   }
217 
218   @Test(timeout=30000)
219   public void testRollbackRetriableFailure() {
220     long procId = ProcedureTestingUtility.submitAndWait(procExecutor, new TestFaultyRollback());
221 
222     ProcedureInfo result = procExecutor.getResult(procId);
223     assertTrue("expected a failure", result.isFailed());
224     LOG.info(result.getExceptionFullMessage());
225     Throwable cause = ProcedureTestingUtility.getExceptionCause(result);
226     assertTrue("expected TestProcedureException, got " + cause,
227       cause instanceof TestProcedureException);
228   }
229 
230   public static class TestWaitingProcedure extends SequentialProcedure<Void> {
231     private final List<String> state;
232     private final boolean hasChild;
233     private final String name;
234 
235     public TestWaitingProcedure() {
236       throw new UnsupportedOperationException("recovery should not be triggered here");
237     }
238 
239     public TestWaitingProcedure(String name, List<String> state, boolean hasChild) {
240       this.hasChild = hasChild;
241       this.state = state;
242       this.name = name;
243     }
244 
245     @Override
246     protected Procedure[] execute(Void env) {
247       state.add(name + "-execute");
248       setState(ProcedureState.WAITING_TIMEOUT);
249       return hasChild ? new Procedure[] { new TestWaitChild(name, state) } : null;
250     }
251 
252     @Override
253     protected void rollback(Void env) {
254       state.add(name + "-rollback");
255     }
256 
257     @Override
258     protected boolean abort(Void env) {
259       state.add(name + "-abort");
260       return true;
261     }
262 
263     public static class TestWaitChild extends SequentialProcedure<Void> {
264       private final List<String> state;
265       private final String name;
266 
267       public TestWaitChild() {
268         throw new UnsupportedOperationException("recovery should not be triggered here");
269       }
270 
271       public TestWaitChild(String name, List<String> state) {
272         this.name = name;
273         this.state = state;
274       }
275 
276       @Override
277       protected Procedure[] execute(Void env) {
278         state.add(name + "-child-execute");
279         return null;
280       }
281 
282       @Override
283       protected void rollback(Void env) {
284         state.add(name + "-child-rollback");
285       }
286 
287       @Override
288       protected boolean abort(Void env) {
289         state.add(name + "-child-abort");
290         return true;
291       }
292     }
293   }
294 
295   @Test(timeout=30000)
296   public void testAbortTimeout() {
297     final int PROC_TIMEOUT_MSEC = 2500;
298     List<String> state = new ArrayList<String>();
299     Procedure proc = new TestWaitingProcedure("wproc", state, false);
300     proc.setTimeout(PROC_TIMEOUT_MSEC);
301     long startTime = EnvironmentEdgeManager.currentTime();
302     long rootId = ProcedureTestingUtility.submitAndWait(procExecutor, proc);
303     long execTime = EnvironmentEdgeManager.currentTime() - startTime;
304     LOG.info(state);
305     assertTrue("we didn't wait enough execTime=" + execTime, execTime >= PROC_TIMEOUT_MSEC);
306     ProcedureInfo result = procExecutor.getResult(rootId);
307     assertTrue(state.toString(), result.isFailed());
308     ProcedureTestingUtility.assertIsTimeoutException(result);
309     assertEquals(state.toString(), 2, state.size());
310     assertEquals("wproc-execute", state.get(0));
311     assertEquals("wproc-rollback", state.get(1));
312   }
313 
314   @Test(timeout=30000)
315   public void testAbortTimeoutWithChildren() {
316     List<String> state = new ArrayList<String>();
317     Procedure proc = new TestWaitingProcedure("wproc", state, true);
318     proc.setTimeout(2500);
319     long rootId = ProcedureTestingUtility.submitAndWait(procExecutor, proc);
320     LOG.info(state);
321     ProcedureInfo result = procExecutor.getResult(rootId);
322     assertTrue(state.toString(), result.isFailed());
323     ProcedureTestingUtility.assertIsTimeoutException(result);
324     assertEquals(state.toString(), 4, state.size());
325     assertEquals("wproc-execute", state.get(0));
326     assertEquals("wproc-child-execute", state.get(1));
327     assertEquals("wproc-child-rollback", state.get(2));
328     assertEquals("wproc-rollback", state.get(3));
329   }
330 }