View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.procedure;
19  
20  import static org.junit.Assert.assertEquals;
21  import static org.mockito.Matchers.any;
22  import static org.mockito.Matchers.anyListOf;
23  import static org.mockito.Matchers.eq;
24  import static org.mockito.Mockito.atMost;
25  import static org.mockito.Mockito.never;
26  import static org.mockito.Mockito.spy;
27  import static org.mockito.Mockito.when;
28  
29  import java.io.IOException;
30  import java.util.ArrayList;
31  import java.util.Arrays;
32  import java.util.List;
33  import java.util.concurrent.CountDownLatch;
34  import java.util.concurrent.ThreadPoolExecutor;
35  import java.util.concurrent.atomic.AtomicInteger;
36  
37  import org.apache.commons.logging.Log;
38  import org.apache.commons.logging.LogFactory;
39  import org.apache.hadoop.hbase.Abortable;
40  import org.apache.hadoop.hbase.HBaseTestingUtility;
41  import org.apache.hadoop.hbase.testclassification.MediumTests;
42  import org.apache.hadoop.hbase.errorhandling.ForeignException;
43  import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
44  import org.apache.hadoop.hbase.errorhandling.TimeoutException;
45  import org.apache.hadoop.hbase.procedure.Subprocedure.SubprocedureImpl;
46  import org.apache.hadoop.hbase.util.Pair;
47  import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
48  import org.junit.AfterClass;
49  import org.junit.BeforeClass;
50  import org.junit.Test;
51  import org.junit.experimental.categories.Category;
52  import org.mockito.Mockito;
53  import org.mockito.internal.matchers.ArrayEquals;
54  import org.mockito.invocation.InvocationOnMock;
55  import org.mockito.stubbing.Answer;
56  import org.mockito.verification.VerificationMode;
57  
58  import com.google.common.collect.Lists;
59  
60  /**
61   * Cluster-wide testing of a distributed three-phase commit using a 'real' zookeeper cluster
62   */
63  @Category(MediumTests.class)
64  public class TestZKProcedure {
65  
66    private static final Log LOG = LogFactory.getLog(TestZKProcedure.class);
67    private static HBaseTestingUtility UTIL = new HBaseTestingUtility();
68    private static final String COORDINATOR_NODE_NAME = "coordinator";
69    private static final long KEEP_ALIVE = 100; // seconds
70    private static final int POOL_SIZE = 1;
71    private static final long TIMEOUT = 10000; // when debugging make this larger for debugging
72    private static final long WAKE_FREQUENCY = 500;
73    private static final String opName = "op";
74    private static final byte[] data = new byte[] { 1, 2 }; // TODO what is this used for?
75    private static final VerificationMode once = Mockito.times(1);
76  
77    @BeforeClass
78    public static void setupTest() throws Exception {
79      UTIL.startMiniZKCluster();
80    }
81  
82    @AfterClass
83    public static void cleanupTest() throws Exception {
84      UTIL.shutdownMiniZKCluster();
85    }
86  
87    private static ZooKeeperWatcher newZooKeeperWatcher() throws IOException {
88      return new ZooKeeperWatcher(UTIL.getConfiguration(), "testing utility", new Abortable() {
89        @Override
90        public void abort(String why, Throwable e) {
91          throw new RuntimeException(
92              "Unexpected abort in distributed three phase commit test:" + why, e);
93        }
94  
95        @Override
96        public boolean isAborted() {
97          return false;
98        }
99      });
100   }
101 
102   @Test
103   public void testEmptyMemberSet() throws Exception {
104     runCommit();
105   }
106 
107   @Test
108   public void testSingleMember() throws Exception {
109     runCommit("one");
110   }
111 
112   @Test
113   public void testMultipleMembers() throws Exception {
114     runCommit("one", "two", "three", "four" );
115   }
116 
117   private void runCommit(String... members) throws Exception {
118     // make sure we just have an empty list
119     if (members == null) {
120       members = new String[0];
121     }
122     List<String> expected = Arrays.asList(members);
123 
124     // setup the constants
125     ZooKeeperWatcher coordZkw = newZooKeeperWatcher();
126     String opDescription = "coordination test - " + members.length + " cohort members";
127 
128     // start running the controller
129     ZKProcedureCoordinatorRpcs coordinatorComms = new ZKProcedureCoordinatorRpcs(
130         coordZkw, opDescription, COORDINATOR_NODE_NAME);
131     ThreadPoolExecutor pool = ProcedureCoordinator.defaultPool(COORDINATOR_NODE_NAME, POOL_SIZE, KEEP_ALIVE);
132     ProcedureCoordinator coordinator = new ProcedureCoordinator(coordinatorComms, pool) {
133       @Override
134       public Procedure createProcedure(ForeignExceptionDispatcher fed, String procName, byte[] procArgs,
135           List<String> expectedMembers) {
136         return Mockito.spy(super.createProcedure(fed, procName, procArgs, expectedMembers));
137       }
138     };
139 
140     // build and start members
141     // NOTE: There is a single subprocedure builder for all members here.
142     SubprocedureFactory subprocFactory = Mockito.mock(SubprocedureFactory.class);
143     List<Pair<ProcedureMember, ZKProcedureMemberRpcs>> procMembers = new ArrayList<Pair<ProcedureMember, ZKProcedureMemberRpcs>>(
144         members.length);
145     // start each member
146     for (String member : members) {
147       ZooKeeperWatcher watcher = newZooKeeperWatcher();
148       ZKProcedureMemberRpcs comms = new ZKProcedureMemberRpcs(watcher, opDescription);
149       ThreadPoolExecutor pool2 = ProcedureMember.defaultPool(member, 1, KEEP_ALIVE);
150       ProcedureMember procMember = new ProcedureMember(comms, pool2, subprocFactory);
151       procMembers.add(new Pair<ProcedureMember, ZKProcedureMemberRpcs>(procMember, comms));
152       comms.start(member, procMember);
153     }
154 
155     // setup mock member subprocedures
156     final List<Subprocedure> subprocs = new ArrayList<Subprocedure>();
157     for (int i = 0; i < procMembers.size(); i++) {
158       ForeignExceptionDispatcher cohortMonitor = new ForeignExceptionDispatcher();
159       Subprocedure commit = Mockito
160       .spy(new SubprocedureImpl(procMembers.get(i).getFirst(), opName, cohortMonitor,
161           WAKE_FREQUENCY, TIMEOUT));
162       subprocs.add(commit);
163     }
164 
165     // link subprocedure to buildNewOperation invocation.
166     final AtomicInteger i = new AtomicInteger(0); // NOTE: would be racy if not an AtomicInteger
167     Mockito.when(subprocFactory.buildSubprocedure(Mockito.eq(opName),
168         (byte[]) Mockito.argThat(new ArrayEquals(data)))).thenAnswer(
169       new Answer<Subprocedure>() {
170         @Override
171         public Subprocedure answer(InvocationOnMock invocation) throws Throwable {
172           int index = i.getAndIncrement();
173           LOG.debug("Task size:" + subprocs.size() + ", getting:" + index);
174           Subprocedure commit = subprocs.get(index);
175           return commit;
176         }
177       });
178 
179     // setup spying on the coordinator
180 //    Procedure proc = Mockito.spy(procBuilder.createProcedure(coordinator, opName, data, expected));
181 //    Mockito.when(procBuilder.build(coordinator, opName, data, expected)).thenReturn(proc);
182 
183     // start running the operation
184     Procedure task = coordinator.startProcedure(new ForeignExceptionDispatcher(), opName, data, expected);
185 //    assertEquals("Didn't mock coordinator task", proc, task);
186 
187     // verify all things ran as expected
188 //    waitAndVerifyProc(proc, once, once, never(), once, false);
189     waitAndVerifyProc(task, once, once, never(), once, false);
190     verifyCohortSuccessful(expected, subprocFactory, subprocs, once, once, never(), once, false);
191 
192     // close all the things
193     closeAll(coordinator, coordinatorComms, procMembers);
194   }
195 
196   /**
197    * Test a distributed commit with multiple cohort members, where one of the cohort members has a
198    * timeout exception during the prepare stage.
199    */
200   @Test
201   public void testMultiCohortWithMemberTimeoutDuringPrepare() throws Exception {
202     String opDescription = "error injection coordination";
203     String[] cohortMembers = new String[] { "one", "two", "three" };
204     List<String> expected = Lists.newArrayList(cohortMembers);
205     // error constants
206     final int memberErrorIndex = 2;
207     final CountDownLatch coordinatorReceivedErrorLatch = new CountDownLatch(1);
208 
209     // start running the coordinator and its controller
210     ZooKeeperWatcher coordinatorWatcher = newZooKeeperWatcher();
211     ZKProcedureCoordinatorRpcs coordinatorController = new ZKProcedureCoordinatorRpcs(
212         coordinatorWatcher, opDescription, COORDINATOR_NODE_NAME);
213     ThreadPoolExecutor pool = ProcedureCoordinator.defaultPool(COORDINATOR_NODE_NAME, POOL_SIZE, KEEP_ALIVE);
214     ProcedureCoordinator coordinator = spy(new ProcedureCoordinator(coordinatorController, pool));
215 
216     // start a member for each node
217     SubprocedureFactory subprocFactory = Mockito.mock(SubprocedureFactory.class);
218     List<Pair<ProcedureMember, ZKProcedureMemberRpcs>> members = new ArrayList<Pair<ProcedureMember, ZKProcedureMemberRpcs>>(
219         expected.size());
220     for (String member : expected) {
221       ZooKeeperWatcher watcher = newZooKeeperWatcher();
222       ZKProcedureMemberRpcs controller = new ZKProcedureMemberRpcs(watcher, opDescription);
223       ThreadPoolExecutor pool2 = ProcedureMember.defaultPool(member, 1, KEEP_ALIVE);
224       ProcedureMember mem = new ProcedureMember(controller, pool2, subprocFactory);
225       members.add(new Pair<ProcedureMember, ZKProcedureMemberRpcs>(mem, controller));
226       controller.start(member, mem);
227     }
228 
229     // setup mock subprocedures
230     final List<Subprocedure> cohortTasks = new ArrayList<Subprocedure>();
231     final int[] elem = new int[1];
232     for (int i = 0; i < members.size(); i++) {
233       ForeignExceptionDispatcher cohortMonitor = new ForeignExceptionDispatcher();
234       final ProcedureMember comms = members.get(i).getFirst();
235       Subprocedure commit = Mockito
236       .spy(new SubprocedureImpl(comms, opName, cohortMonitor, WAKE_FREQUENCY, TIMEOUT));
237       // This nasty bit has one of the impls throw a TimeoutException
238       Mockito.doAnswer(new Answer<Void>() {
239         @Override
240         public Void answer(InvocationOnMock invocation) throws Throwable {
241           int index = elem[0];
242           if (index == memberErrorIndex) {
243             LOG.debug("Sending error to coordinator");
244             ForeignException remoteCause = new ForeignException("TIMER",
245                 new TimeoutException("subprocTimeout" , 1, 2, 0));
246             Subprocedure r = ((Subprocedure) invocation.getMock());
247             LOG.error("Remote commit failure, not propagating error:" + remoteCause);
248             comms.receiveAbortProcedure(r.getName(), remoteCause);
249             assertEquals(r.isComplete(), true);
250             // don't complete the error phase until the coordinator has gotten the error
251             // notification (which ensures that we never progress past prepare)
252             try {
253               Procedure.waitForLatch(coordinatorReceivedErrorLatch, new ForeignExceptionDispatcher(),
254                   WAKE_FREQUENCY, "coordinator received error");
255             } catch (InterruptedException e) {
256               LOG.debug("Wait for latch interrupted, done:" + (coordinatorReceivedErrorLatch.getCount() == 0));
257               // reset the interrupt status on the thread
258               Thread.currentThread().interrupt();
259             }
260           }
261           elem[0] = ++index;
262           return null;
263         }
264       }).when(commit).acquireBarrier();
265       cohortTasks.add(commit);
266     }
267 
268     // pass out a task per member
269     final AtomicInteger taskIndex = new AtomicInteger();
270     Mockito.when(
271       subprocFactory.buildSubprocedure(Mockito.eq(opName),
272         (byte[]) Mockito.argThat(new ArrayEquals(data)))).thenAnswer(
273       new Answer<Subprocedure>() {
274         @Override
275         public Subprocedure answer(InvocationOnMock invocation) throws Throwable {
276           int index = taskIndex.getAndIncrement();
277           Subprocedure commit = cohortTasks.get(index);
278           return commit;
279         }
280       });
281 
282     // setup spying on the coordinator
283     ForeignExceptionDispatcher coordinatorTaskErrorMonitor = Mockito
284         .spy(new ForeignExceptionDispatcher());
285     Procedure coordinatorTask = Mockito.spy(new Procedure(coordinator,
286         coordinatorTaskErrorMonitor, WAKE_FREQUENCY, TIMEOUT,
287         opName, data, expected));
288     when(coordinator.createProcedure(any(ForeignExceptionDispatcher.class), eq(opName), eq(data), anyListOf(String.class)))
289       .thenReturn(coordinatorTask);
290     // count down the error latch when we get the remote error
291     Mockito.doAnswer(new Answer<Void>() {
292       @Override
293       public Void answer(InvocationOnMock invocation) throws Throwable {
294         // pass on the error to the master
295         invocation.callRealMethod();
296         // then count down the got error latch
297         coordinatorReceivedErrorLatch.countDown();
298         return null;
299       }
300     }).when(coordinatorTask).receive(Mockito.any(ForeignException.class));
301 
302     // ----------------------------
303     // start running the operation
304     // ----------------------------
305 
306     Procedure task = coordinator.startProcedure(coordinatorTaskErrorMonitor, opName, data, expected);
307     assertEquals("Didn't mock coordinator task", coordinatorTask, task);
308 
309     // wait for the task to complete
310     try {
311       task.waitForCompleted();
312     } catch (ForeignException fe) {
313       // this may get caught or may not
314     }
315 
316     // -------------
317     // verification
318     // -------------
319 
320     // always expect prepared, never committed, and possible to have cleanup and finish (racy since
321     // error case)
322     waitAndVerifyProc(coordinatorTask, once, never(), once, atMost(1), true);
323     verifyCohortSuccessful(expected, subprocFactory, cohortTasks, once, never(), once,
324       once, true);
325 
326     // close all the open things
327     closeAll(coordinator, coordinatorController, members);
328   }
329 
330   /**
331    * Wait for the coordinator task to complete, and verify all the mocks
332    * @param task to wait on
333    * @throws Exception on unexpected failure
334    */
335   private void waitAndVerifyProc(Procedure proc, VerificationMode prepare,
336       VerificationMode commit, VerificationMode cleanup, VerificationMode finish, boolean opHasError)
337       throws Exception {
338     boolean caughtError = false;
339     try {
340       proc.waitForCompleted();
341     } catch (ForeignException fe) {
342       caughtError = true;
343     }
344     // make sure that the task called all the expected phases
345     Mockito.verify(proc, prepare).sendGlobalBarrierStart();
346     Mockito.verify(proc, commit).sendGlobalBarrierReached();
347     Mockito.verify(proc, finish).sendGlobalBarrierComplete();
348     assertEquals("Operation error state was unexpected", opHasError, proc.getErrorMonitor()
349         .hasException());
350     assertEquals("Operation error state was unexpected", opHasError, caughtError);
351 
352   }
353 
354   /**
355    * Wait for the coordinator task to complete, and verify all the mocks
356    * @param task to wait on
357    * @throws Exception on unexpected failure
358    */
359   private void waitAndVerifySubproc(Subprocedure op, VerificationMode prepare,
360       VerificationMode commit, VerificationMode cleanup, VerificationMode finish, boolean opHasError)
361       throws Exception {
362     boolean caughtError = false;
363     try {
364       op.waitForLocallyCompleted();
365     } catch (ForeignException fe) {
366       caughtError = true;
367     }
368     // make sure that the task called all the expected phases
369     Mockito.verify(op, prepare).acquireBarrier();
370     Mockito.verify(op, commit).insideBarrier();
371     // We cannot guarantee that cleanup has run so we don't check it.
372 
373     assertEquals("Operation error state was unexpected", opHasError, op.getErrorCheckable()
374         .hasException());
375     assertEquals("Operation error state was unexpected", opHasError, caughtError);
376 
377   }
378 
379   private void verifyCohortSuccessful(List<String> cohortNames,
380       SubprocedureFactory subprocFactory, Iterable<Subprocedure> cohortTasks,
381       VerificationMode prepare, VerificationMode commit, VerificationMode cleanup,
382       VerificationMode finish, boolean opHasError) throws Exception {
383 
384     // make sure we build the correct number of cohort members
385     Mockito.verify(subprocFactory, Mockito.times(cohortNames.size())).buildSubprocedure(
386       Mockito.eq(opName), (byte[]) Mockito.argThat(new ArrayEquals(data)));
387     // verify that we ran each of the operations cleanly
388     int j = 0;
389     for (Subprocedure op : cohortTasks) {
390       LOG.debug("Checking mock:" + (j++));
391       waitAndVerifySubproc(op, prepare, commit, cleanup, finish, opHasError);
392     }
393   }
394 
395   private void closeAll(
396       ProcedureCoordinator coordinator,
397       ZKProcedureCoordinatorRpcs coordinatorController,
398       List<Pair<ProcedureMember, ZKProcedureMemberRpcs>> cohort)
399       throws IOException {
400     // make sure we close all the resources
401     for (Pair<ProcedureMember, ZKProcedureMemberRpcs> member : cohort) {
402       member.getFirst().close();
403       member.getSecond().close();
404     }
405     coordinator.close();
406     coordinatorController.close();
407   }
408 }