1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.procedure2;
20
21 import java.io.IOException;
22 import java.io.InputStream;
23 import java.io.OutputStream;
24 import java.util.ArrayList;
25 import java.util.concurrent.atomic.AtomicLong;
26
27 import org.apache.commons.logging.Log;
28 import org.apache.commons.logging.LogFactory;
29 import org.apache.hadoop.fs.FileSystem;
30 import org.apache.hadoop.fs.Path;
31 import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
32 import org.apache.hadoop.hbase.io.util.StreamUtils;
33 import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
34 import org.apache.hadoop.hbase.testclassification.LargeTests;
35
36 import org.junit.After;
37 import org.junit.Before;
38 import org.junit.Test;
39 import org.junit.experimental.categories.Category;
40
41 import static org.junit.Assert.assertEquals;
42 import static org.junit.Assert.assertTrue;
43 import static org.junit.Assert.fail;
44
45 @Category(LargeTests.class)
46 public class TestProcedureReplayOrder {
47 private static final Log LOG = LogFactory.getLog(TestProcedureReplayOrder.class);
48
49 private static final int NUM_THREADS = 16;
50
51 private ProcedureExecutor<Void> procExecutor;
52 private TestProcedureEnv procEnv;
53 private ProcedureStore procStore;
54
55 private HBaseCommonTestingUtility htu;
56 private FileSystem fs;
57 private Path testDir;
58 private Path logDir;
59
60 @Before
61 public void setUp() throws IOException {
62 htu = new HBaseCommonTestingUtility();
63 htu.getConfiguration().setInt("hbase.procedure.store.wal.sync.wait.msec", 25);
64
65 testDir = htu.getDataTestDir();
66 fs = testDir.getFileSystem(htu.getConfiguration());
67 assertTrue(testDir.depth() > 1);
68
69 logDir = new Path(testDir, "proc-logs");
70 procEnv = new TestProcedureEnv();
71 procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), fs, logDir);
72 procExecutor = new ProcedureExecutor(htu.getConfiguration(), procEnv, procStore);
73 procStore.start(NUM_THREADS);
74 procExecutor.start(1, true);
75 }
76
77 @After
78 public void tearDown() throws IOException {
79 procExecutor.stop();
80 procStore.stop(false);
81 fs.delete(logDir, true);
82 }
83
84 @Test(timeout=90000)
85 public void testSingleStepReplayOrder() throws Exception {
86 final int NUM_PROC_XTHREAD = 32;
87 final int NUM_PROCS = NUM_THREADS * NUM_PROC_XTHREAD;
88
89
90 submitProcedures(NUM_THREADS, NUM_PROC_XTHREAD, TestSingleStepProcedure.class);
91
92 while (procEnv.getExecId() < NUM_PROCS) {
93 Thread.sleep(100);
94 }
95
96
97 ProcedureTestingUtility.restart(procExecutor);
98
99
100
101 ProcedureTestingUtility.waitNoProcedureRunning(procExecutor);
102 procEnv.assertSortedExecList(NUM_PROCS);
103 }
104
105 @Test(timeout=90000)
106 public void testMultiStepReplayOrder() throws Exception {
107 final int NUM_PROC_XTHREAD = 24;
108 final int NUM_PROCS = NUM_THREADS * (NUM_PROC_XTHREAD * 2);
109
110
111 submitProcedures(NUM_THREADS, NUM_PROC_XTHREAD, TestTwoStepProcedure.class);
112
113 while (procEnv.getExecId() < NUM_PROCS) {
114 Thread.sleep(100);
115 }
116
117
118 ProcedureTestingUtility.restart(procExecutor);
119
120
121
122 ProcedureTestingUtility.waitNoProcedureRunning(procExecutor);
123 procEnv.assertSortedExecList(NUM_PROCS);
124 }
125
126 private void submitProcedures(final int nthreads, final int nprocPerThread,
127 final Class<?> procClazz) throws Exception {
128 Thread[] submitThreads = new Thread[nthreads];
129 for (int i = 0; i < submitThreads.length; ++i) {
130 submitThreads[i] = new Thread() {
131 @Override
132 public void run() {
133 for (int i = 0; i < nprocPerThread; ++i) {
134 try {
135 procExecutor.submitProcedure((Procedure)procClazz.newInstance());
136 } catch (InstantiationException|IllegalAccessException e) {
137 LOG.error("unable to instantiate the procedure", e);
138 fail("failure during the proc.newInstance(): " + e.getMessage());
139 }
140 }
141 }
142 };
143 }
144
145 for (int i = 0; i < submitThreads.length; ++i) {
146 submitThreads[i].start();
147 }
148
149 for (int i = 0; i < submitThreads.length; ++i) {
150 submitThreads[i].join();
151 }
152 }
153
154 private static class TestProcedureEnv {
155 private ArrayList<TestProcedure> execList = new ArrayList<TestProcedure>();
156 private AtomicLong execTimestamp = new AtomicLong(0);
157
158 public long getExecId() {
159 return execTimestamp.get();
160 }
161
162 public long nextExecId() {
163 return execTimestamp.incrementAndGet();
164 }
165
166 public void addToExecList(final TestProcedure proc) {
167 execList.add(proc);
168 }
169
170 public void assertSortedExecList(int numProcs) {
171 assertEquals(numProcs, execList.size());
172 LOG.debug("EXEC LIST: " + execList);
173 for (int i = 0; i < execList.size() - 1; ++i) {
174 TestProcedure a = execList.get(i);
175 TestProcedure b = execList.get(i + 1);
176 assertTrue("exec list not sorted: " + a + " < " + b, a.getExecId() > b.getExecId());
177 }
178 }
179 }
180
181 public static abstract class TestProcedure extends Procedure<TestProcedureEnv> {
182 protected long execId = 0;
183 protected int step = 0;
184
185 public long getExecId() {
186 return execId;
187 }
188
189 @Override
190 protected void rollback(TestProcedureEnv env) { }
191
192 @Override
193 protected boolean abort(TestProcedureEnv env) { return true; }
194
195 @Override
196 protected void serializeStateData(final OutputStream stream) throws IOException {
197 StreamUtils.writeLong(stream, execId);
198 }
199
200 @Override
201 protected void deserializeStateData(final InputStream stream) throws IOException {
202 execId = StreamUtils.readLong(stream);
203 step = 2;
204 }
205 }
206
207 public static class TestSingleStepProcedure extends TestProcedure {
208 public TestSingleStepProcedure() { }
209
210 @Override
211 protected Procedure[] execute(TestProcedureEnv env) throws ProcedureYieldException {
212 LOG.trace("execute procedure step=" + step + ": " + this);
213 if (step == 0) {
214 step = 1;
215 execId = env.nextExecId();
216 return new Procedure[] { this };
217 } else if (step == 2) {
218 env.addToExecList(this);
219 return null;
220 }
221 throw new ProcedureYieldException();
222 }
223
224 @Override
225 public String toString() {
226 return "SingleStep(procId=" + getProcId() + " execId=" + execId + ")";
227 }
228 }
229
230 public static class TestTwoStepProcedure extends TestProcedure {
231 public TestTwoStepProcedure() { }
232
233 @Override
234 protected Procedure[] execute(TestProcedureEnv env) throws ProcedureYieldException {
235 LOG.trace("execute procedure step=" + step + ": " + this);
236 if (step == 0) {
237 step = 1;
238 execId = env.nextExecId();
239 return new Procedure[] { new TestSingleStepProcedure() };
240 } else if (step == 2) {
241 env.addToExecList(this);
242 return null;
243 }
244 throw new ProcedureYieldException();
245 }
246
247 @Override
248 public String toString() {
249 return "TwoStep(procId=" + getProcId() + " execId=" + execId + ")";
250 }
251 }
252 }