1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.master.procedure;
20
21 import java.io.IOException;
22 import java.io.InputStream;
23 import java.io.OutputStream;
24 import java.util.Arrays;
25 import java.util.ArrayList;
26 import java.util.HashSet;
27 import java.util.Map;
28 import java.util.concurrent.ConcurrentHashMap;
29 import java.util.concurrent.atomic.AtomicBoolean;
30 import java.util.concurrent.atomic.AtomicInteger;
31
32 import org.apache.commons.logging.Log;
33 import org.apache.commons.logging.LogFactory;
34 import org.apache.hadoop.conf.Configuration;
35 import org.apache.hadoop.hbase.HBaseConfiguration;
36 import org.apache.hadoop.hbase.TableName;
37 import org.apache.hadoop.hbase.master.TableLockManager;
38 import org.apache.hadoop.hbase.procedure2.Procedure;
39 import org.apache.hadoop.hbase.testclassification.MasterTests;
40 import org.apache.hadoop.hbase.testclassification.SmallTests;
41 import org.junit.After;
42 import org.junit.Before;
43 import org.junit.Test;
44 import org.junit.experimental.categories.Category;
45
46 import static org.junit.Assert.assertEquals;
47 import static org.junit.Assert.assertFalse;
48 import static org.junit.Assert.assertTrue;
49 import static org.junit.Assert.fail;
50
51 @Category({MasterTests.class, SmallTests.class})
52 public class TestMasterProcedureScheduler {
53 private static final Log LOG = LogFactory.getLog(TestMasterProcedureScheduler.class);
54
55 private MasterProcedureScheduler queue;
56 private Configuration conf;
57
58 @Before
59 public void setUp() throws IOException {
60 conf = HBaseConfiguration.create();
61 queue = new MasterProcedureScheduler(conf, new TableLockManager.NullTableLockManager());
62 }
63
64 @After
65 public void tearDown() throws IOException {
66 assertEquals(0, queue.size());
67 }
68
69 @Test
70 public void testConcurrentCreateDelete() throws Exception {
71 final MasterProcedureScheduler procQueue = queue;
72 final TableName table = TableName.valueOf("testtb");
73 final AtomicBoolean running = new AtomicBoolean(true);
74 final AtomicBoolean failure = new AtomicBoolean(false);
75 Thread createThread = new Thread() {
76 @Override
77 public void run() {
78 try {
79 TestTableProcedure proc = new TestTableProcedure(1, table,
80 TableProcedureInterface.TableOperationType.CREATE);
81 while (running.get() && !failure.get()) {
82 if (procQueue.tryAcquireTableExclusiveLock(proc, table)) {
83 procQueue.releaseTableExclusiveLock(proc, table);
84 }
85 }
86 } catch (Throwable e) {
87 LOG.error("create failed", e);
88 failure.set(true);
89 }
90 }
91 };
92
93 Thread deleteThread = new Thread() {
94 @Override
95 public void run() {
96 try {
97 TestTableProcedure proc = new TestTableProcedure(2, table,
98 TableProcedureInterface.TableOperationType.DELETE);
99 while (running.get() && !failure.get()) {
100 if (procQueue.tryAcquireTableExclusiveLock(proc, table)) {
101 procQueue.releaseTableExclusiveLock(proc, table);
102 }
103 procQueue.markTableAsDeleted(table);
104 }
105 } catch (Throwable e) {
106 LOG.error("delete failed", e);
107 failure.set(true);
108 }
109 }
110 };
111
112 createThread.start();
113 deleteThread.start();
114 for (int i = 0; i < 100 && running.get() && !failure.get(); ++i) {
115 Thread.sleep(100);
116 }
117 running.set(false);
118 createThread.join();
119 deleteThread.join();
120 assertEquals(false, failure.get());
121 }
122
123
124
125
126 @Test
127 public void testSimpleTableOpsQueues() throws Exception {
128 final int NUM_TABLES = 10;
129 final int NUM_ITEMS = 10;
130
131 int count = 0;
132 for (int i = 1; i <= NUM_TABLES; ++i) {
133 TableName tableName = TableName.valueOf(String.format("test-%04d", i));
134
135 for (int j = 1; j <= NUM_ITEMS; ++j) {
136 queue.addBack(new TestTableProcedure(i * 1000 + j, tableName,
137 TableProcedureInterface.TableOperationType.EDIT));
138 assertEquals(++count, queue.size());
139 }
140 }
141 assertEquals(NUM_TABLES * NUM_ITEMS, queue.size());
142
143 for (int j = 1; j <= NUM_ITEMS; ++j) {
144 for (int i = 1; i <= NUM_TABLES; ++i) {
145 Procedure proc = queue.poll();
146 assertTrue(proc != null);
147 TableName tableName = ((TestTableProcedure)proc).getTableName();
148 queue.tryAcquireTableExclusiveLock(proc, tableName);
149 queue.releaseTableExclusiveLock(proc, tableName);
150 queue.completionCleanup(proc);
151 assertEquals(--count, queue.size());
152 assertEquals(i * 1000 + j, proc.getProcId());
153 }
154 }
155 assertEquals(0, queue.size());
156
157 for (int i = 1; i <= NUM_TABLES; ++i) {
158 TableName tableName = TableName.valueOf(String.format("test-%04d", i));
159
160 assertTrue(queue.markTableAsDeleted(tableName));
161 }
162 }
163
164
165
166
167
168 @Test
169 public void testCreateDeleteTableOperationsWithWriteLock() throws Exception {
170 TableName tableName = TableName.valueOf("testtb");
171
172 queue.addBack(new TestTableProcedure(1, tableName,
173 TableProcedureInterface.TableOperationType.EDIT));
174
175
176 assertFalse(queue.markTableAsDeleted(tableName));
177
178
179 Procedure proc = queue.poll();
180 assertEquals(1, proc.getProcId());
181
182 assertTrue(queue.tryAcquireTableExclusiveLock(proc, tableName));
183
184 assertEquals(0, queue.size());
185 assertFalse(queue.markTableAsDeleted(tableName));
186
187 queue.releaseTableExclusiveLock(proc, tableName);
188
189 assertTrue(queue.markTableAsDeleted(tableName));
190 }
191
192
193
194
195
196 @Test
197 public void testCreateDeleteTableOperationsWithReadLock() throws Exception {
198 final TableName tableName = TableName.valueOf("testtb");
199 final int nitems = 2;
200
201 for (int i = 1; i <= nitems; ++i) {
202 queue.addBack(new TestTableProcedure(i, tableName,
203 TableProcedureInterface.TableOperationType.READ));
204 }
205
206
207 assertFalse(queue.markTableAsDeleted(tableName));
208
209 Procedure[] procs = new Procedure[nitems];
210 for (int i = 0; i < nitems; ++i) {
211
212 Procedure proc = procs[i] = queue.poll();
213 assertEquals(i + 1, proc.getProcId());
214
215 assertTrue(queue.tryAcquireTableSharedLock(proc, tableName));
216
217 assertFalse(queue.markTableAsDeleted(tableName));
218 }
219
220 for (int i = 0; i < nitems; ++i) {
221
222 assertFalse(queue.markTableAsDeleted(tableName));
223
224 queue.releaseTableSharedLock(procs[i], tableName);
225 }
226
227
228 assertEquals(0, queue.size());
229
230 assertTrue(queue.markTableAsDeleted(tableName));
231 }
232
233
234
235
236 @Test
237 public void testVerifyRwLocks() throws Exception {
238 TableName tableName = TableName.valueOf("testtb");
239 queue.addBack(new TestTableProcedure(1, tableName,
240 TableProcedureInterface.TableOperationType.EDIT));
241 queue.addBack(new TestTableProcedure(2, tableName,
242 TableProcedureInterface.TableOperationType.READ));
243 queue.addBack(new TestTableProcedure(3, tableName,
244 TableProcedureInterface.TableOperationType.EDIT));
245 queue.addBack(new TestTableProcedure(4, tableName,
246 TableProcedureInterface.TableOperationType.READ));
247 queue.addBack(new TestTableProcedure(5, tableName,
248 TableProcedureInterface.TableOperationType.READ));
249
250
251 Procedure proc = queue.poll();
252 assertEquals(1, proc.getProcId());
253 assertEquals(true, queue.tryAcquireTableExclusiveLock(proc, tableName));
254
255
256 assertEquals(null, queue.poll(0));
257
258
259 queue.releaseTableExclusiveLock(proc, tableName);
260
261
262 Procedure rdProc = queue.poll();
263 assertEquals(2, rdProc.getProcId());
264 assertEquals(true, queue.tryAcquireTableSharedLock(rdProc, tableName));
265
266
267 Procedure wrProc = queue.poll();
268 assertEquals(3, wrProc.getProcId());
269 assertEquals(false, queue.tryAcquireTableExclusiveLock(wrProc, tableName));
270
271
272 queue.releaseTableSharedLock(rdProc, tableName);
273 assertEquals(true, queue.tryAcquireTableExclusiveLock(wrProc, tableName));
274
275
276 assertEquals(null, queue.poll(0));
277
278
279 queue.releaseTableExclusiveLock(wrProc, tableName);
280
281
282 rdProc = queue.poll();
283 assertEquals(4, rdProc.getProcId());
284 assertEquals(true, queue.tryAcquireTableSharedLock(rdProc, tableName));
285
286
287 Procedure rdProc2 = queue.poll();
288 assertEquals(5, rdProc2.getProcId());
289 assertEquals(true, queue.tryAcquireTableSharedLock(rdProc2, tableName));
290
291
292 queue.releaseTableSharedLock(rdProc, tableName);
293 queue.releaseTableSharedLock(rdProc2, tableName);
294
295
296 assertEquals(0, queue.size());
297 assertTrue("queue should be deleted", queue.markTableAsDeleted(tableName));
298 }
299
300
301
302
303
304 @Test(timeout=90000)
305 public void testConcurrentWriteOps() throws Exception {
306 final TestTableProcSet procSet = new TestTableProcSet(queue);
307
308 final int NUM_ITEMS = 10;
309 final int NUM_TABLES = 4;
310 final AtomicInteger opsCount = new AtomicInteger(0);
311 for (int i = 0; i < NUM_TABLES; ++i) {
312 TableName tableName = TableName.valueOf(String.format("testtb-%04d", i));
313 for (int j = 1; j < NUM_ITEMS; ++j) {
314 procSet.addBack(new TestTableProcedure(i * 100 + j, tableName,
315 TableProcedureInterface.TableOperationType.EDIT));
316 opsCount.incrementAndGet();
317 }
318 }
319 assertEquals(opsCount.get(), queue.size());
320
321 final Thread[] threads = new Thread[NUM_TABLES * 2];
322 final HashSet<TableName> concurrentTables = new HashSet<TableName>();
323 final ArrayList<String> failures = new ArrayList<String>();
324 final AtomicInteger concurrentCount = new AtomicInteger(0);
325 for (int i = 0; i < threads.length; ++i) {
326 threads[i] = new Thread() {
327 @Override
328 public void run() {
329 while (opsCount.get() > 0) {
330 try {
331 Procedure proc = procSet.acquire();
332 if (proc == null) {
333 queue.signalAll();
334 if (opsCount.get() > 0) {
335 continue;
336 }
337 break;
338 }
339
340 TableName tableId = procSet.getTableName(proc);
341 synchronized (concurrentTables) {
342 assertTrue("unexpected concurrency on " + tableId, concurrentTables.add(tableId));
343 }
344 assertTrue(opsCount.decrementAndGet() >= 0);
345 try {
346 long procId = proc.getProcId();
347 int concurrent = concurrentCount.incrementAndGet();
348 assertTrue("inc-concurrent="+ concurrent +" 1 <= concurrent <= "+ NUM_TABLES,
349 concurrent >= 1 && concurrent <= NUM_TABLES);
350 LOG.debug("[S] tableId="+ tableId +" procId="+ procId +" concurrent="+ concurrent);
351 Thread.sleep(2000);
352 concurrent = concurrentCount.decrementAndGet();
353 LOG.debug("[E] tableId="+ tableId +" procId="+ procId +" concurrent="+ concurrent);
354 assertTrue("dec-concurrent=" + concurrent, concurrent < NUM_TABLES);
355 } finally {
356 synchronized (concurrentTables) {
357 assertTrue(concurrentTables.remove(tableId));
358 }
359 procSet.release(proc);
360 }
361 } catch (Throwable e) {
362 LOG.error("Failed " + e.getMessage(), e);
363 synchronized (failures) {
364 failures.add(e.getMessage());
365 }
366 } finally {
367 queue.signalAll();
368 }
369 }
370 }
371 };
372 threads[i].start();
373 }
374 for (int i = 0; i < threads.length; ++i) {
375 threads[i].join();
376 }
377 assertTrue(failures.toString(), failures.isEmpty());
378 assertEquals(0, opsCount.get());
379 assertEquals(0, queue.size());
380
381 for (int i = 1; i <= NUM_TABLES; ++i) {
382 TableName table = TableName.valueOf(String.format("testtb-%04d", i));
383 assertTrue("queue should be deleted, table=" + table, queue.markTableAsDeleted(table));
384 }
385 }
386
387 public static class TestTableProcSet {
388 private final MasterProcedureScheduler queue;
389
390 public TestTableProcSet(final MasterProcedureScheduler queue) {
391 this.queue = queue;
392 }
393
394 public void addBack(Procedure proc) {
395 queue.addBack(proc);
396 }
397
398 public void addFront(Procedure proc) {
399 queue.addFront(proc);
400 }
401
402 public Procedure acquire() {
403 Procedure proc = null;
404 boolean avail = false;
405 while (!avail) {
406 proc = queue.poll();
407 if (proc == null) break;
408 switch (getTableOperationType(proc)) {
409 case CREATE:
410 case DELETE:
411 case EDIT:
412 avail = queue.tryAcquireTableExclusiveLock(proc, getTableName(proc));
413 break;
414 case READ:
415 avail = queue.tryAcquireTableSharedLock(proc, getTableName(proc));
416 break;
417 default:
418 throw new UnsupportedOperationException();
419 }
420 if (!avail) {
421 addFront(proc);
422 LOG.debug("yield procId=" + proc);
423 }
424 }
425 return proc;
426 }
427
428 public void release(Procedure proc) {
429 switch (getTableOperationType(proc)) {
430 case CREATE:
431 case DELETE:
432 case EDIT:
433 queue.releaseTableExclusiveLock(proc, getTableName(proc));
434 break;
435 case READ:
436 queue.releaseTableSharedLock(proc, getTableName(proc));
437 break;
438 }
439 }
440
441 public TableName getTableName(Procedure proc) {
442 return ((TableProcedureInterface)proc).getTableName();
443 }
444
445 public TableProcedureInterface.TableOperationType getTableOperationType(Procedure proc) {
446 return ((TableProcedureInterface)proc).getTableOperationType();
447 }
448 }
449
450 public static class TestTableProcedure extends Procedure<Void>
451 implements TableProcedureInterface {
452 private final TableOperationType opType;
453 private final TableName tableName;
454
455 public TestTableProcedure() {
456 throw new UnsupportedOperationException("recovery should not be triggered here");
457 }
458
459 public TestTableProcedure(long procId, TableName tableName, TableOperationType opType) {
460 this.tableName = tableName;
461 this.opType = opType;
462 setProcId(procId);
463 }
464
465 @Override
466 public TableName getTableName() {
467 return tableName;
468 }
469
470 @Override
471 public TableOperationType getTableOperationType() {
472 return opType;
473 }
474
475 @Override
476 protected Procedure[] execute(Void env) {
477 return null;
478 }
479
480 @Override
481 protected void rollback(Void env) {
482 throw new UnsupportedOperationException();
483 }
484
485 @Override
486 protected boolean abort(Void env) {
487 throw new UnsupportedOperationException();
488 }
489
490 @Override
491 protected void serializeStateData(final OutputStream stream) throws IOException {}
492
493 @Override
494 protected void deserializeStateData(final InputStream stream) throws IOException {}
495 }
496 }