View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.procedure2;
20  
21  import java.io.IOException;
22  import java.io.InputStream;
23  import java.io.OutputStream;
24  import java.util.ArrayList;
25  import java.util.List;
26  import java.util.concurrent.TimeoutException;
27  import java.util.concurrent.atomic.AtomicBoolean;
28  import java.util.concurrent.atomic.AtomicLong;
29  
30  import org.apache.commons.logging.Log;
31  import org.apache.commons.logging.LogFactory;
32  import org.apache.hadoop.fs.FileSystem;
33  import org.apache.hadoop.fs.Path;
34  import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
35  import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
36  import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureState;
37  import org.apache.hadoop.hbase.testclassification.SmallTests;
38  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
39  
40  import org.junit.After;
41  import org.junit.Before;
42  import org.junit.Assert;
43  import org.junit.Test;
44  import org.junit.experimental.categories.Category;
45  
46  import static org.junit.Assert.assertEquals;
47  import static org.junit.Assert.assertFalse;
48  import static org.junit.Assert.assertTrue;
49  import static org.junit.Assert.fail;
50  
51  @Category(SmallTests.class)
52  public class TestYieldProcedures {
53    private static final Log LOG = LogFactory.getLog(TestYieldProcedures.class);
54  
55    private static final int PROCEDURE_EXECUTOR_SLOTS = 1;
56    private static final Procedure NULL_PROC = null;
57  
58    private ProcedureExecutor<TestProcEnv> procExecutor;
59    private TestRunQueue procRunnables;
60    private ProcedureStore procStore;
61  
62    private HBaseCommonTestingUtility htu;
63    private FileSystem fs;
64    private Path testDir;
65    private Path logDir;
66  
67    @Before
68    public void setUp() throws IOException {
69      htu = new HBaseCommonTestingUtility();
70      testDir = htu.getDataTestDir();
71      fs = testDir.getFileSystem(htu.getConfiguration());
72      assertTrue(testDir.depth() > 1);
73  
74      logDir = new Path(testDir, "proc-logs");
75      procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), fs, logDir);
76      procRunnables = new TestRunQueue();
77      procExecutor = new ProcedureExecutor(htu.getConfiguration(), new TestProcEnv(),
78          procStore, procRunnables);
79      procStore.start(PROCEDURE_EXECUTOR_SLOTS);
80      procExecutor.start(PROCEDURE_EXECUTOR_SLOTS, true);
81    }
82  
83    @After
84    public void tearDown() throws IOException {
85      procExecutor.stop();
86      procStore.stop(false);
87      fs.delete(logDir, true);
88    }
89  
90    @Test
91    public void testYieldEachExecutionStep() throws Exception {
92      final int NUM_STATES = 3;
93  
94      TestStateMachineProcedure[] procs = new TestStateMachineProcedure[3];
95      for (int i = 0; i < procs.length; ++i) {
96        procs[i] = new TestStateMachineProcedure(true, false);
97        procExecutor.submitProcedure(procs[i]);
98      }
99      ProcedureTestingUtility.waitNoProcedureRunning(procExecutor);
100 
101     for (int i = 0; i < procs.length; ++i) {
102       assertEquals(NUM_STATES * 2, procs[i].getExecutionInfo().size());
103 
104       // verify execution
105       int index = 0;
106       for (int execStep = 0; execStep < NUM_STATES; ++execStep) {
107         TestStateMachineProcedure.ExecutionInfo info = procs[i].getExecutionInfo().get(index++);
108         assertEquals(false, info.isRollback());
109         assertEquals(execStep, info.getStep().ordinal());
110       }
111 
112       // verify rollback
113       for (int execStep = NUM_STATES - 1; execStep >= 0; --execStep) {
114         TestStateMachineProcedure.ExecutionInfo info = procs[i].getExecutionInfo().get(index++);
115         assertEquals(true, info.isRollback());
116         assertEquals(execStep, info.getStep().ordinal());
117       }
118     }
119 
120     // check runnable queue stats
121     assertEquals(0, procRunnables.size());
122     assertEquals(0, procRunnables.addFrontCalls);
123     assertEquals(18, procRunnables.addBackCalls);
124     assertEquals(15, procRunnables.yieldCalls);
125     assertEquals(19, procRunnables.pollCalls);
126     assertEquals(3, procRunnables.completionCalls);
127   }
128 
129   @Test
130   public void testYieldOnInterrupt() throws Exception {
131     final int NUM_STATES = 3;
132     int count = 0;
133 
134     TestStateMachineProcedure proc = new TestStateMachineProcedure(true, true);
135     ProcedureTestingUtility.submitAndWait(procExecutor, proc);
136 
137     // test execute (we execute steps twice, one has the IE the other completes)
138     assertEquals(NUM_STATES * 4, proc.getExecutionInfo().size());
139     for (int i = 0; i < NUM_STATES; ++i) {
140       TestStateMachineProcedure.ExecutionInfo info = proc.getExecutionInfo().get(count++);
141       assertEquals(false, info.isRollback());
142       assertEquals(i, info.getStep().ordinal());
143 
144       info = proc.getExecutionInfo().get(count++);
145       assertEquals(false, info.isRollback());
146       assertEquals(i, info.getStep().ordinal());
147     }
148 
149     // test rollback (we execute steps twice, one has the IE the other completes)
150     for (int i = NUM_STATES - 1; i >= 0; --i) {
151       TestStateMachineProcedure.ExecutionInfo info = proc.getExecutionInfo().get(count++);
152       assertEquals(true, info.isRollback());
153       assertEquals(i, info.getStep().ordinal());
154 
155       info = proc.getExecutionInfo().get(count++);
156       assertEquals(true, info.isRollback());
157       assertEquals(i, info.getStep().ordinal());
158     }
159 
160     // check runnable queue stats
161     assertEquals(0, procRunnables.size());
162     assertEquals(0, procRunnables.addFrontCalls);
163     assertEquals(12, procRunnables.addBackCalls);
164     assertEquals(11, procRunnables.yieldCalls);
165     assertEquals(13, procRunnables.pollCalls);
166     assertEquals(1, procRunnables.completionCalls);
167   }
168 
169   @Test
170   public void testYieldException() {
171     TestYieldProcedure proc = new TestYieldProcedure();
172     ProcedureTestingUtility.submitAndWait(procExecutor, proc);
173     assertEquals(6, proc.step);
174 
175     // check runnable queue stats
176     assertEquals(0, procRunnables.size());
177     assertEquals(0, procRunnables.addFrontCalls);
178     assertEquals(6, procRunnables.addBackCalls);
179     assertEquals(5, procRunnables.yieldCalls);
180     assertEquals(7, procRunnables.pollCalls);
181     assertEquals(1, procRunnables.completionCalls);
182   }
183 
184   private static class TestProcEnv {
185     public final AtomicLong timestamp = new AtomicLong(0);
186 
187     public long nextTimestamp() {
188       return timestamp.incrementAndGet();
189     }
190   }
191 
192   public static class TestStateMachineProcedure
193       extends StateMachineProcedure<TestProcEnv, TestStateMachineProcedure.State> {
194     enum State { STATE_1, STATE_2, STATE_3 }
195 
196     public class ExecutionInfo {
197       private final boolean rollback;
198       private final long timestamp;
199       private final State step;
200 
201       public ExecutionInfo(long timestamp, State step, boolean isRollback) {
202         this.timestamp = timestamp;
203         this.step = step;
204         this.rollback = isRollback;
205       }
206 
207       public State getStep() { return step; }
208       public long getTimestamp() { return timestamp; }
209       public boolean isRollback() { return rollback; }
210     }
211 
212     private final ArrayList<ExecutionInfo> executionInfo = new ArrayList<ExecutionInfo>();
213     private final AtomicBoolean aborted = new AtomicBoolean(false);
214     private final boolean throwInterruptOnceOnEachStep;
215     private final boolean abortOnFinalStep;
216 
217     public TestStateMachineProcedure() {
218       this(false, false);
219     }
220 
221     public TestStateMachineProcedure(boolean abortOnFinalStep,
222         boolean throwInterruptOnceOnEachStep) {
223       this.abortOnFinalStep = abortOnFinalStep;
224       this.throwInterruptOnceOnEachStep = throwInterruptOnceOnEachStep;
225     }
226 
227     public ArrayList<ExecutionInfo> getExecutionInfo() {
228       return executionInfo;
229     }
230 
231     @Override
232     protected StateMachineProcedure.Flow executeFromState(TestProcEnv env, State state)
233         throws InterruptedException {
234       final long ts = env.nextTimestamp();
235       LOG.info(getProcId() + " execute step " + state + " ts=" + ts);
236       executionInfo.add(new ExecutionInfo(ts, state, false));
237       Thread.sleep(150);
238 
239       if (throwInterruptOnceOnEachStep && ((executionInfo.size() - 1) % 2) == 0) {
240         LOG.debug("THROW INTERRUPT");
241         throw new InterruptedException("test interrupt");
242       }
243 
244       switch (state) {
245         case STATE_1:
246           setNextState(State.STATE_2);
247           break;
248         case STATE_2:
249           setNextState(State.STATE_3);
250           break;
251         case STATE_3:
252           if (abortOnFinalStep) {
253             setFailure("test", new IOException("Requested abort on final step"));
254           }
255           return Flow.NO_MORE_STATE;
256         default:
257           throw new UnsupportedOperationException();
258       }
259       return Flow.HAS_MORE_STATE;
260     }
261 
262     @Override
263     protected void rollbackState(TestProcEnv env, final State state)
264         throws InterruptedException {
265       final long ts = env.nextTimestamp();
266       LOG.debug(getProcId() + " rollback state " + state + " ts=" + ts);
267       executionInfo.add(new ExecutionInfo(ts, state, true));
268       Thread.sleep(150);
269 
270       if (throwInterruptOnceOnEachStep && ((executionInfo.size() - 1) % 2) == 0) {
271         LOG.debug("THROW INTERRUPT");
272         throw new InterruptedException("test interrupt");
273       }
274 
275       switch (state) {
276         case STATE_1:
277           break;
278         case STATE_2:
279           break;
280         case STATE_3:
281           break;
282         default:
283           throw new UnsupportedOperationException();
284       }
285     }
286 
287     @Override
288     protected State getState(final int stateId) {
289       return State.values()[stateId];
290     }
291 
292     @Override
293     protected int getStateId(final State state) {
294       return state.ordinal();
295     }
296 
297     @Override
298     protected State getInitialState() {
299       return State.STATE_1;
300     }
301 
302     @Override
303     protected boolean isYieldBeforeExecuteFromState(TestProcEnv env, State state) {
304       return true;
305     }
306 
307     @Override
308     protected boolean abort(TestProcEnv env) {
309       aborted.set(true);
310       return true;
311     }
312   }
313 
314   public static class TestYieldProcedure extends Procedure<TestProcEnv> {
315     private int step = 0;
316 
317     public TestYieldProcedure() {
318     }
319 
320     @Override
321     protected Procedure[] execute(final TestProcEnv env) throws ProcedureYieldException {
322       LOG.info("execute step " + step);
323       if (step++ < 5) {
324         throw new ProcedureYieldException();
325       }
326       return null;
327     }
328 
329     @Override
330     protected void rollback(TestProcEnv env) {
331     }
332 
333     @Override
334     protected boolean abort(TestProcEnv env) {
335       return false;
336     }
337 
338     @Override
339     protected boolean isYieldAfterExecutionStep(final TestProcEnv env) {
340       return true;
341     }
342 
343     @Override
344     protected void serializeStateData(final OutputStream stream) throws IOException {
345     }
346 
347     @Override
348     protected void deserializeStateData(final InputStream stream) throws IOException {
349     }
350   }
351 
352   private static class TestRunQueue extends ProcedureSimpleRunQueue {
353     private int completionCalls;
354     private int addFrontCalls;
355     private int addBackCalls;
356     private int yieldCalls;
357     private int pollCalls;
358 
359     public TestRunQueue() {}
360 
361     public void addFront(final Procedure proc) {
362         addFrontCalls++;
363         super.addFront(proc);
364       }
365 
366       @Override
367       public void addBack(final Procedure proc) {
368         addBackCalls++;
369         super.addBack(proc);
370       }
371 
372       @Override
373       public void yield(final Procedure proc) {
374         yieldCalls++;
375         super.yield(proc);
376       }
377 
378       @Override
379       public Procedure poll() {
380         pollCalls++;
381         return super.poll();
382       }
383 
384       @Override
385       public void completionCleanup(Procedure proc) {
386         completionCalls++;
387       }
388   }
389 }