1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.procedure2;
20
21 import java.io.IOException;
22 import java.io.InputStream;
23 import java.io.OutputStream;
24
25 import org.apache.commons.logging.Log;
26 import org.apache.commons.logging.LogFactory;
27 import org.apache.hadoop.conf.Configuration;
28 import org.apache.hadoop.fs.FileSystem;
29 import org.apache.hadoop.fs.Path;
30 import org.apache.hadoop.hbase.HConstants;
31 import org.apache.hadoop.hbase.ProcedureInfo;
32 import org.apache.hadoop.hbase.util.Threads;
33 import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException;
34 import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
35 import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
36 import org.apache.hadoop.hbase.procedure2.store.NoopProcedureStore;
37 import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
38 import org.apache.hadoop.hbase.protobuf.generated.ErrorHandlingProtos.ForeignExceptionMessage;
39
40 import static org.junit.Assert.assertEquals;
41 import static org.junit.Assert.assertFalse;
42 import static org.junit.Assert.assertTrue;
43
44 public class ProcedureTestingUtility {
45 private static final Log LOG = LogFactory.getLog(ProcedureTestingUtility.class);
46
47 private ProcedureTestingUtility() {
48 }
49
50 public static ProcedureStore createStore(final Configuration conf, final FileSystem fs,
51 final Path baseDir) throws IOException {
52 return createWalStore(conf, fs, baseDir);
53 }
54
55 public static WALProcedureStore createWalStore(final Configuration conf, final FileSystem fs,
56 final Path logDir) throws IOException {
57 return new WALProcedureStore(conf, fs, logDir, new WALProcedureStore.LeaseRecovery() {
58 @Override
59 public void recoverFileLease(FileSystem fs, Path path) throws IOException {
60
61 }
62 });
63 }
64
65 public static <TEnv> void restart(ProcedureExecutor<TEnv> procExecutor)
66 throws Exception {
67 restart(procExecutor, null, true);
68 }
69
70 public static <TEnv> void restart(ProcedureExecutor<TEnv> procExecutor,
71 Runnable beforeStartAction, boolean failOnCorrupted) throws Exception {
72 ProcedureStore procStore = procExecutor.getStore();
73 int storeThreads = procExecutor.getNumThreads();
74 int execThreads = procExecutor.getNumThreads();
75
76 procExecutor.stop();
77 procExecutor.join();
78 procStore.stop(false);
79
80 if (beforeStartAction != null) {
81 beforeStartAction.run();
82 }
83
84 procStore.start(storeThreads);
85 procExecutor.start(execThreads, failOnCorrupted);
86 }
87
88 public static <TEnv> void setKillBeforeStoreUpdate(ProcedureExecutor<TEnv> procExecutor,
89 boolean value) {
90 if (procExecutor.testing == null) {
91 procExecutor.testing = new ProcedureExecutor.Testing();
92 }
93 procExecutor.testing.killBeforeStoreUpdate = value;
94 LOG.warn("Set Kill before store update to: " + procExecutor.testing.killBeforeStoreUpdate);
95 }
96
97 public static <TEnv> void setToggleKillBeforeStoreUpdate(ProcedureExecutor<TEnv> procExecutor,
98 boolean value) {
99 if (procExecutor.testing == null) {
100 procExecutor.testing = new ProcedureExecutor.Testing();
101 }
102 procExecutor.testing.toggleKillBeforeStoreUpdate = value;
103 }
104
105 public static <TEnv> void toggleKillBeforeStoreUpdate(ProcedureExecutor<TEnv> procExecutor) {
106 if (procExecutor.testing == null) {
107 procExecutor.testing = new ProcedureExecutor.Testing();
108 }
109 procExecutor.testing.killBeforeStoreUpdate = !procExecutor.testing.killBeforeStoreUpdate;
110 LOG.warn("Set Kill before store update to: " + procExecutor.testing.killBeforeStoreUpdate);
111 }
112
113 public static <TEnv> void setKillAndToggleBeforeStoreUpdate(ProcedureExecutor<TEnv> procExecutor,
114 boolean value) {
115 ProcedureTestingUtility.setKillBeforeStoreUpdate(procExecutor, value);
116 ProcedureTestingUtility.setToggleKillBeforeStoreUpdate(procExecutor, value);
117 }
118
119 public static <TEnv> long submitAndWait(Configuration conf, TEnv env, Procedure<TEnv> proc)
120 throws IOException {
121 NoopProcedureStore procStore = new NoopProcedureStore();
122 ProcedureExecutor<TEnv> procExecutor = new ProcedureExecutor<TEnv>(conf, env, procStore);
123 procStore.start(1);
124 procExecutor.start(1, false);
125 try {
126 return submitAndWait(procExecutor, proc, HConstants.NO_NONCE, HConstants.NO_NONCE);
127 } finally {
128 procStore.stop(false);
129 procExecutor.stop();
130 }
131 }
132
133 public static <TEnv> long submitAndWait(ProcedureExecutor<TEnv> procExecutor, Procedure proc) {
134 return submitAndWait(procExecutor, proc, HConstants.NO_NONCE, HConstants.NO_NONCE);
135 }
136
137 public static <TEnv> long submitAndWait(ProcedureExecutor<TEnv> procExecutor, Procedure proc,
138 final long nonceGroup,
139 final long nonce) {
140 long procId = procExecutor.submitProcedure(proc, nonceGroup, nonce);
141 waitProcedure(procExecutor, procId);
142 return procId;
143 }
144
145 public static <TEnv> void waitProcedure(ProcedureExecutor<TEnv> procExecutor, long procId) {
146 while (!procExecutor.isFinished(procId) && procExecutor.isRunning()) {
147 Threads.sleepWithoutInterrupt(250);
148 }
149 }
150
151 public static <TEnv> void waitNoProcedureRunning(ProcedureExecutor<TEnv> procExecutor) {
152 int stableRuns = 0;
153 while (stableRuns < 10) {
154 if (procExecutor.getActiveExecutorCount() > 0 || procExecutor.getRunnableSet().size() > 0) {
155 stableRuns = 0;
156 Threads.sleepWithoutInterrupt(100);
157 } else {
158 stableRuns++;
159 Threads.sleepWithoutInterrupt(25);
160 }
161 }
162 }
163
164 public static <TEnv> void assertProcNotYetCompleted(ProcedureExecutor<TEnv> procExecutor,
165 long procId) {
166 assertFalse("expected a running proc", procExecutor.isFinished(procId));
167 assertEquals(null, procExecutor.getResult(procId));
168 }
169
170 public static <TEnv> void assertProcNotFailed(ProcedureExecutor<TEnv> procExecutor,
171 long procId) {
172 ProcedureInfo result = procExecutor.getResult(procId);
173 assertTrue("expected procedure result", result != null);
174 assertProcNotFailed(result);
175 }
176
177 public static void assertProcNotFailed(final ProcedureInfo result) {
178 ForeignExceptionMessage exception = result.getForeignExceptionMessage();
179 String msg = exception != null ? result.getExceptionFullMessage() : "no exception found";
180 assertFalse(msg, result.isFailed());
181 }
182
183 public static void assertIsAbortException(final ProcedureInfo result) {
184 assertEquals(true, result.isFailed());
185 LOG.info(result.getExceptionFullMessage());
186 Throwable cause = getExceptionCause(result);
187 assertTrue("expected abort exception, got " + cause,
188 cause instanceof ProcedureAbortedException);
189 }
190
191 public static void assertIsTimeoutException(final ProcedureInfo result) {
192 assertEquals(true, result.isFailed());
193 LOG.info(result.getExceptionFullMessage());
194 Throwable cause = getExceptionCause(result);
195 assertTrue("expected TimeoutIOException, got " + cause, cause instanceof TimeoutIOException);
196 }
197
198 public static void assertIsIllegalArgumentException(final ProcedureInfo result) {
199 assertEquals(true, result.isFailed());
200 LOG.info(result.getExceptionFullMessage());
201 Throwable cause = ProcedureTestingUtility.getExceptionCause(result);
202 assertTrue("expected IllegalArgumentIOException, got " + cause,
203 cause instanceof IllegalArgumentIOException);
204 }
205
206 public static Throwable getExceptionCause(final ProcedureInfo procInfo) {
207 assert procInfo.getForeignExceptionMessage() != null;
208 return RemoteProcedureException.fromProto(procInfo.getForeignExceptionMessage()).getCause();
209 }
210
211 public static class TestProcedure extends Procedure<Void> {
212 public TestProcedure() {}
213
214 public TestProcedure(long procId) {
215 this(procId, 0);
216 }
217
218 public TestProcedure(long procId, long parentId) {
219 setProcId(procId);
220 if (parentId > 0) {
221 setParentProcId(parentId);
222 }
223 }
224
225 public void addStackId(final int index) {
226 addStackIndex(index);
227 }
228
229 @Override
230 protected Procedure[] execute(Void env) { return null; }
231
232 @Override
233 protected void rollback(Void env) { }
234
235 @Override
236 protected boolean abort(Void env) { return false; }
237
238 @Override
239 protected void serializeStateData(final OutputStream stream) throws IOException { }
240
241 @Override
242 protected void deserializeStateData(final InputStream stream) throws IOException { }
243 }
244 }