View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.master.procedure;
20  
21  import java.io.IOException;
22  import java.io.InputStream;
23  import java.io.OutputStream;
24  import java.util.Arrays;
25  import java.util.ArrayList;
26  import java.util.HashSet;
27  import java.util.Map;
28  import java.util.concurrent.ConcurrentHashMap;
29  import java.util.concurrent.atomic.AtomicBoolean;
30  import java.util.concurrent.atomic.AtomicInteger;
31  
32  import org.apache.commons.logging.Log;
33  import org.apache.commons.logging.LogFactory;
34  import org.apache.hadoop.conf.Configuration;
35  import org.apache.hadoop.hbase.HBaseConfiguration;
36  import org.apache.hadoop.hbase.TableName;
37  import org.apache.hadoop.hbase.master.TableLockManager;
38  import org.apache.hadoop.hbase.procedure2.Procedure;
39  import org.apache.hadoop.hbase.testclassification.MasterTests;
40  import org.apache.hadoop.hbase.testclassification.SmallTests;
41  import org.junit.After;
42  import org.junit.Before;
43  import org.junit.Test;
44  import org.junit.experimental.categories.Category;
45  
46  import static org.junit.Assert.assertEquals;
47  import static org.junit.Assert.assertFalse;
48  import static org.junit.Assert.assertTrue;
49  import static org.junit.Assert.fail;
50  
51  @Category({MasterTests.class, SmallTests.class})
52  public class TestMasterProcedureScheduler {
53    private static final Log LOG = LogFactory.getLog(TestMasterProcedureScheduler.class);
54  
55    private MasterProcedureScheduler queue;
56    private Configuration conf;
57  
58    @Before
59    public void setUp() throws IOException {
60      conf = HBaseConfiguration.create();
61      queue = new MasterProcedureScheduler(conf, new TableLockManager.NullTableLockManager());
62    }
63  
64    @After
65    public void tearDown() throws IOException {
66      assertEquals(0, queue.size());
67    }
68  
69    @Test
70    public void testConcurrentCreateDelete() throws Exception {
71      final MasterProcedureScheduler procQueue = queue;
72      final TableName table = TableName.valueOf("testtb");
73      final AtomicBoolean running = new AtomicBoolean(true);
74      final AtomicBoolean failure = new AtomicBoolean(false);
75      Thread createThread = new Thread() {
76        @Override
77        public void run() {
78          try {
79            TestTableProcedure proc = new TestTableProcedure(1, table,
80                TableProcedureInterface.TableOperationType.CREATE);
81            while (running.get() && !failure.get()) {
82              if (procQueue.tryAcquireTableExclusiveLock(proc, table)) {
83                procQueue.releaseTableExclusiveLock(proc, table);
84              }
85            }
86          } catch (Throwable e) {
87            LOG.error("create failed", e);
88            failure.set(true);
89          }
90        }
91      };
92  
93      Thread deleteThread = new Thread() {
94        @Override
95        public void run() {
96          try {
97            TestTableProcedure proc = new TestTableProcedure(2, table,
98                TableProcedureInterface.TableOperationType.DELETE);
99            while (running.get() && !failure.get()) {
100             if (procQueue.tryAcquireTableExclusiveLock(proc, table)) {
101               procQueue.releaseTableExclusiveLock(proc, table);
102             }
103             procQueue.markTableAsDeleted(table);
104           }
105         } catch (Throwable e) {
106           LOG.error("delete failed", e);
107           failure.set(true);
108         }
109       }
110     };
111 
112     createThread.start();
113     deleteThread.start();
114     for (int i = 0; i < 100 && running.get() && !failure.get(); ++i) {
115       Thread.sleep(100);
116     }
117     running.set(false);
118     createThread.join();
119     deleteThread.join();
120     assertEquals(false, failure.get());
121   }
122 
123   /**
124    * Verify simple create/insert/fetch/delete of the table queue.
125    */
126   @Test
127   public void testSimpleTableOpsQueues() throws Exception {
128     final int NUM_TABLES = 10;
129     final int NUM_ITEMS = 10;
130 
131     int count = 0;
132     for (int i = 1; i <= NUM_TABLES; ++i) {
133       TableName tableName = TableName.valueOf(String.format("test-%04d", i));
134       // insert items
135       for (int j = 1; j <= NUM_ITEMS; ++j) {
136         queue.addBack(new TestTableProcedure(i * 1000 + j, tableName,
137           TableProcedureInterface.TableOperationType.EDIT));
138         assertEquals(++count, queue.size());
139       }
140     }
141     assertEquals(NUM_TABLES * NUM_ITEMS, queue.size());
142 
143     for (int j = 1; j <= NUM_ITEMS; ++j) {
144       for (int i = 1; i <= NUM_TABLES; ++i) {
145         Procedure proc = queue.poll();
146         assertTrue(proc != null);
147         TableName tableName = ((TestTableProcedure)proc).getTableName();
148         queue.tryAcquireTableExclusiveLock(proc, tableName);
149         queue.releaseTableExclusiveLock(proc, tableName);
150         queue.completionCleanup(proc);
151         assertEquals(--count, queue.size());
152         assertEquals(i * 1000 + j, proc.getProcId());
153       }
154     }
155     assertEquals(0, queue.size());
156 
157     for (int i = 1; i <= NUM_TABLES; ++i) {
158       TableName tableName = TableName.valueOf(String.format("test-%04d", i));
159       // complete the table deletion
160       assertTrue(queue.markTableAsDeleted(tableName));
161     }
162   }
163 
164   /**
165    * Check that the table queue is not deletable until every procedure
166    * in-progress is completed (this is a special case for write-locks).
167    */
168   @Test
169   public void testCreateDeleteTableOperationsWithWriteLock() throws Exception {
170     TableName tableName = TableName.valueOf("testtb");
171 
172     queue.addBack(new TestTableProcedure(1, tableName,
173           TableProcedureInterface.TableOperationType.EDIT));
174 
175     // table can't be deleted because one item is in the queue
176     assertFalse(queue.markTableAsDeleted(tableName));
177 
178     // fetch item and take a lock
179     Procedure proc = queue.poll();
180     assertEquals(1, proc.getProcId());
181     // take the xlock
182     assertTrue(queue.tryAcquireTableExclusiveLock(proc, tableName));
183     // table can't be deleted because we have the lock
184     assertEquals(0, queue.size());
185     assertFalse(queue.markTableAsDeleted(tableName));
186     // release the xlock
187     queue.releaseTableExclusiveLock(proc, tableName);
188     // complete the table deletion
189     assertTrue(queue.markTableAsDeleted(tableName));
190   }
191 
192   /**
193    * Check that the table queue is not deletable until every procedure
194    * in-progress is completed (this is a special case for read-locks).
195    */
196   @Test
197   public void testCreateDeleteTableOperationsWithReadLock() throws Exception {
198     final TableName tableName = TableName.valueOf("testtb");
199     final int nitems = 2;
200 
201     for (int i = 1; i <= nitems; ++i) {
202       queue.addBack(new TestTableProcedure(i, tableName,
203             TableProcedureInterface.TableOperationType.READ));
204     }
205 
206     // table can't be deleted because one item is in the queue
207     assertFalse(queue.markTableAsDeleted(tableName));
208 
209     Procedure[] procs = new Procedure[nitems];
210     for (int i = 0; i < nitems; ++i) {
211       // fetch item and take a lock
212       Procedure proc = procs[i] = queue.poll();
213       assertEquals(i + 1, proc.getProcId());
214       // take the rlock
215       assertTrue(queue.tryAcquireTableSharedLock(proc, tableName));
216       // table can't be deleted because we have locks and/or items in the queue
217       assertFalse(queue.markTableAsDeleted(tableName));
218     }
219 
220     for (int i = 0; i < nitems; ++i) {
221       // table can't be deleted because we have locks
222       assertFalse(queue.markTableAsDeleted(tableName));
223       // release the rlock
224       queue.releaseTableSharedLock(procs[i], tableName);
225     }
226 
227     // there are no items and no lock in the queeu
228     assertEquals(0, queue.size());
229     // complete the table deletion
230     assertTrue(queue.markTableAsDeleted(tableName));
231   }
232 
233   /**
234    * Verify the correct logic of RWLocks on the queue
235    */
236   @Test
237   public void testVerifyRwLocks() throws Exception {
238     TableName tableName = TableName.valueOf("testtb");
239     queue.addBack(new TestTableProcedure(1, tableName,
240           TableProcedureInterface.TableOperationType.EDIT));
241     queue.addBack(new TestTableProcedure(2, tableName,
242           TableProcedureInterface.TableOperationType.READ));
243     queue.addBack(new TestTableProcedure(3, tableName,
244           TableProcedureInterface.TableOperationType.EDIT));
245     queue.addBack(new TestTableProcedure(4, tableName,
246           TableProcedureInterface.TableOperationType.READ));
247     queue.addBack(new TestTableProcedure(5, tableName,
248           TableProcedureInterface.TableOperationType.READ));
249 
250     // Fetch the 1st item and take the write lock
251     Procedure proc = queue.poll();
252     assertEquals(1, proc.getProcId());
253     assertEquals(true, queue.tryAcquireTableExclusiveLock(proc, tableName));
254 
255     // Fetch the 2nd item and verify that the lock can't be acquired
256     assertEquals(null, queue.poll(0));
257 
258     // Release the write lock and acquire the read lock
259     queue.releaseTableExclusiveLock(proc, tableName);
260 
261     // Fetch the 2nd item and take the read lock
262     Procedure rdProc = queue.poll();
263     assertEquals(2, rdProc.getProcId());
264     assertEquals(true, queue.tryAcquireTableSharedLock(rdProc, tableName));
265 
266     // Fetch the 3rd item and verify that the lock can't be acquired
267     Procedure wrProc = queue.poll();
268     assertEquals(3, wrProc.getProcId());
269     assertEquals(false, queue.tryAcquireTableExclusiveLock(wrProc, tableName));
270 
271     // release the rdlock of item 2 and take the wrlock for the 3d item
272     queue.releaseTableSharedLock(rdProc, tableName);
273     assertEquals(true, queue.tryAcquireTableExclusiveLock(wrProc, tableName));
274 
275     // Fetch 4th item and verify that the lock can't be acquired
276     assertEquals(null, queue.poll(0));
277 
278     // Release the write lock and acquire the read lock
279     queue.releaseTableExclusiveLock(wrProc, tableName);
280 
281     // Fetch the 4th item and take the read lock
282     rdProc = queue.poll();
283     assertEquals(4, rdProc.getProcId());
284     assertEquals(true, queue.tryAcquireTableSharedLock(rdProc, tableName));
285 
286     // Fetch the 4th item and take the read lock
287     Procedure rdProc2 = queue.poll();
288     assertEquals(5, rdProc2.getProcId());
289     assertEquals(true, queue.tryAcquireTableSharedLock(rdProc2, tableName));
290 
291     // Release 4th and 5th read-lock
292     queue.releaseTableSharedLock(rdProc, tableName);
293     queue.releaseTableSharedLock(rdProc2, tableName);
294 
295     // remove table queue
296     assertEquals(0, queue.size());
297     assertTrue("queue should be deleted", queue.markTableAsDeleted(tableName));
298   }
299 
300   /**
301    * Verify that "write" operations for a single table are serialized,
302    * but different tables can be executed in parallel.
303    */
304   @Test(timeout=90000)
305   public void testConcurrentWriteOps() throws Exception {
306     final TestTableProcSet procSet = new TestTableProcSet(queue);
307 
308     final int NUM_ITEMS = 10;
309     final int NUM_TABLES = 4;
310     final AtomicInteger opsCount = new AtomicInteger(0);
311     for (int i = 0; i < NUM_TABLES; ++i) {
312       TableName tableName = TableName.valueOf(String.format("testtb-%04d", i));
313       for (int j = 1; j < NUM_ITEMS; ++j) {
314         procSet.addBack(new TestTableProcedure(i * 100 + j, tableName,
315           TableProcedureInterface.TableOperationType.EDIT));
316         opsCount.incrementAndGet();
317       }
318     }
319     assertEquals(opsCount.get(), queue.size());
320 
321     final Thread[] threads = new Thread[NUM_TABLES * 2];
322     final HashSet<TableName> concurrentTables = new HashSet<TableName>();
323     final ArrayList<String> failures = new ArrayList<String>();
324     final AtomicInteger concurrentCount = new AtomicInteger(0);
325     for (int i = 0; i < threads.length; ++i) {
326       threads[i] = new Thread() {
327         @Override
328         public void run() {
329           while (opsCount.get() > 0) {
330             try {
331               Procedure proc = procSet.acquire();
332               if (proc == null) {
333                 queue.signalAll();
334                 if (opsCount.get() > 0) {
335                   continue;
336                 }
337                 break;
338               }
339 
340               TableName tableId = procSet.getTableName(proc);
341               synchronized (concurrentTables) {
342                 assertTrue("unexpected concurrency on " + tableId, concurrentTables.add(tableId));
343               }
344               assertTrue(opsCount.decrementAndGet() >= 0);
345               try {
346                 long procId = proc.getProcId();
347                 int concurrent = concurrentCount.incrementAndGet();
348                 assertTrue("inc-concurrent="+ concurrent +" 1 <= concurrent <= "+ NUM_TABLES,
349                   concurrent >= 1 && concurrent <= NUM_TABLES);
350                 LOG.debug("[S] tableId="+ tableId +" procId="+ procId +" concurrent="+ concurrent);
351                 Thread.sleep(2000);
352                 concurrent = concurrentCount.decrementAndGet();
353                 LOG.debug("[E] tableId="+ tableId +" procId="+ procId +" concurrent="+ concurrent);
354                 assertTrue("dec-concurrent=" + concurrent, concurrent < NUM_TABLES);
355               } finally {
356                 synchronized (concurrentTables) {
357                   assertTrue(concurrentTables.remove(tableId));
358                 }
359                 procSet.release(proc);
360               }
361             } catch (Throwable e) {
362               LOG.error("Failed " + e.getMessage(), e);
363               synchronized (failures) {
364                 failures.add(e.getMessage());
365               }
366             } finally {
367               queue.signalAll();
368             }
369           }
370         }
371       };
372       threads[i].start();
373     }
374     for (int i = 0; i < threads.length; ++i) {
375       threads[i].join();
376     }
377     assertTrue(failures.toString(), failures.isEmpty());
378     assertEquals(0, opsCount.get());
379     assertEquals(0, queue.size());
380 
381     for (int i = 1; i <= NUM_TABLES; ++i) {
382       TableName table = TableName.valueOf(String.format("testtb-%04d", i));
383       assertTrue("queue should be deleted, table=" + table, queue.markTableAsDeleted(table));
384     }
385   }
386 
387   public static class TestTableProcSet {
388     private final MasterProcedureScheduler queue;
389 
390     public TestTableProcSet(final MasterProcedureScheduler queue) {
391       this.queue = queue;
392     }
393 
394     public void addBack(Procedure proc) {
395       queue.addBack(proc);
396     }
397 
398     public void addFront(Procedure proc) {
399       queue.addFront(proc);
400     }
401 
402     public Procedure acquire() {
403       Procedure proc = null;
404       boolean avail = false;
405       while (!avail) {
406         proc = queue.poll();
407         if (proc == null) break;
408         switch (getTableOperationType(proc)) {
409           case CREATE:
410           case DELETE:
411           case EDIT:
412             avail = queue.tryAcquireTableExclusiveLock(proc, getTableName(proc));
413             break;
414           case READ:
415             avail = queue.tryAcquireTableSharedLock(proc, getTableName(proc));
416             break;
417           default:
418             throw new UnsupportedOperationException();
419         }
420         if (!avail) {
421           addFront(proc);
422           LOG.debug("yield procId=" + proc);
423         }
424       }
425       return proc;
426     }
427 
428     public void release(Procedure proc) {
429       switch (getTableOperationType(proc)) {
430         case CREATE:
431         case DELETE:
432         case EDIT:
433           queue.releaseTableExclusiveLock(proc, getTableName(proc));
434           break;
435         case READ:
436           queue.releaseTableSharedLock(proc, getTableName(proc));
437           break;
438       }
439     }
440 
441     public TableName getTableName(Procedure proc) {
442       return ((TableProcedureInterface)proc).getTableName();
443     }
444 
445     public TableProcedureInterface.TableOperationType getTableOperationType(Procedure proc) {
446       return ((TableProcedureInterface)proc).getTableOperationType();
447     }
448   }
449 
450   public static class TestTableProcedure extends Procedure<Void>
451       implements TableProcedureInterface {
452     private final TableOperationType opType;
453     private final TableName tableName;
454 
455     public TestTableProcedure() {
456       throw new UnsupportedOperationException("recovery should not be triggered here");
457     }
458 
459     public TestTableProcedure(long procId, TableName tableName, TableOperationType opType) {
460       this.tableName = tableName;
461       this.opType = opType;
462       setProcId(procId);
463     }
464 
465     @Override
466     public TableName getTableName() {
467       return tableName;
468     }
469 
470     @Override
471     public TableOperationType getTableOperationType() {
472       return opType;
473     }
474 
475     @Override
476     protected Procedure[] execute(Void env) {
477       return null;
478     }
479 
480     @Override
481     protected void rollback(Void env) {
482       throw new UnsupportedOperationException();
483     }
484 
485     @Override
486     protected boolean abort(Void env) {
487       throw new UnsupportedOperationException();
488     }
489 
490     @Override
491     protected void serializeStateData(final OutputStream stream) throws IOException {}
492 
493     @Override
494     protected void deserializeStateData(final InputStream stream) throws IOException {}
495   }
496 }