View Javadoc

1   /*
2    * Copyright The Apache Software Foundation
3    *
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS,
16   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17   * See the License for the specific language governing permissions and
18   * limitations under the License.
19   */
20  package org.apache.hadoop.hbase.master;
21  
22  import static org.junit.Assert.assertEquals;
23  import static org.junit.Assert.assertFalse;
24  import static org.junit.Assert.assertTrue;
25  import static org.junit.Assert.fail;
26  
27  import java.io.IOException;
28  import java.util.List;
29  import java.util.Random;
30  import java.util.concurrent.Callable;
31  import java.util.concurrent.CountDownLatch;
32  import java.util.concurrent.ExecutionException;
33  import java.util.concurrent.ExecutorService;
34  import java.util.concurrent.Executors;
35  import java.util.concurrent.Future;
36  
37  import org.apache.commons.logging.Log;
38  import org.apache.commons.logging.LogFactory;
39  import org.apache.hadoop.conf.Configuration;
40  import org.apache.hadoop.hbase.ChoreService;
41  import org.apache.hadoop.hbase.HBaseTestingUtility;
42  import org.apache.hadoop.hbase.HColumnDescriptor;
43  import org.apache.hadoop.hbase.HRegionInfo;
44  import org.apache.hadoop.hbase.HTableDescriptor;
45  import org.apache.hadoop.hbase.testclassification.LargeTests;
46  import org.apache.hadoop.hbase.NotServingRegionException;
47  import org.apache.hadoop.hbase.ScheduledChore;
48  import org.apache.hadoop.hbase.ServerName;
49  import org.apache.hadoop.hbase.TableName;
50  import org.apache.hadoop.hbase.TableNotDisabledException;
51  import org.apache.hadoop.hbase.Waiter;
52  import org.apache.hadoop.hbase.client.Admin;
53  import org.apache.hadoop.hbase.coprocessor.BaseMasterObserver;
54  import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment;
55  import org.apache.hadoop.hbase.coprocessor.ObserverContext;
56  import org.apache.hadoop.hbase.exceptions.LockTimeoutException;
57  import org.apache.hadoop.hbase.regionserver.HRegion;
58  import org.apache.hadoop.hbase.util.Bytes;
59  import org.apache.hadoop.hbase.util.LoadTestTool;
60  import org.apache.hadoop.hbase.util.StoppableImplementation;
61  import org.apache.hadoop.hbase.util.Threads;
62  import org.apache.hadoop.hbase.zookeeper.ZKUtil;
63  import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
64  import org.junit.After;
65  import org.junit.Test;
66  import org.junit.experimental.categories.Category;
67  
68  /**
69   * Tests the default table lock manager
70   */
71  @Category(LargeTests.class)
72  public class TestTableLockManager {
73  
74    private static final Log LOG =
75      LogFactory.getLog(TestTableLockManager.class);
76  
77    private static final TableName TABLE_NAME =
78        TableName.valueOf("TestTableLevelLocks");
79  
80    private static final byte[] FAMILY = Bytes.toBytes("f1");
81  
82    private static final byte[] NEW_FAMILY = Bytes.toBytes("f2");
83  
84    private final HBaseTestingUtility TEST_UTIL =
85      new HBaseTestingUtility();
86  
87    private static final CountDownLatch deleteColumn = new CountDownLatch(1);
88    private static final CountDownLatch addColumn = new CountDownLatch(1);
89  
90    public void prepareMiniCluster() throws Exception {
91      TEST_UTIL.getConfiguration().setBoolean("hbase.online.schema.update.enable", true);
92      TEST_UTIL.startMiniCluster(2);
93      TEST_UTIL.createTable(TABLE_NAME, FAMILY);
94    }
95  
96    public void prepareMiniZkCluster() throws Exception {
97      TEST_UTIL.startMiniZKCluster(1);
98    }
99  
100   @After
101   public void tearDown() throws Exception {
102     TEST_UTIL.shutdownMiniCluster();
103   }
104 
105   public static class TestLockTimeoutExceptionMasterObserver extends BaseMasterObserver {
106     @Override
107     public void preDeleteColumnHandler(ObserverContext<MasterCoprocessorEnvironment> ctx,
108         TableName tableName, byte[] c) throws IOException {
109       deleteColumn.countDown();
110     }
111     @Override
112     public void postDeleteColumnHandler(ObserverContext<MasterCoprocessorEnvironment> ctx,
113         TableName tableName, byte[] c) throws IOException {
114       Threads.sleep(10000);
115     }
116 
117     @Override
118     public void preAddColumnHandler(ObserverContext<MasterCoprocessorEnvironment> ctx,
119         TableName tableName, HColumnDescriptor column) throws IOException {
120       fail("Add column should have timeouted out for acquiring the table lock");
121     }
122   }
123 
124   @Test(timeout = 600000)
125   public void testAlterAndDisable() throws Exception {
126     prepareMiniCluster();
127     // Send a request to alter a table, then sleep during
128     // the alteration phase. In the mean time, from another
129     // thread, send a request to disable, and then delete a table.
130 
131     HMaster master = TEST_UTIL.getHBaseCluster().getMaster();
132     master.getMasterCoprocessorHost().load(TestAlterAndDisableMasterObserver.class,
133         0, TEST_UTIL.getConfiguration());
134 
135     ExecutorService executor = Executors.newFixedThreadPool(2);
136     Future<Object> alterTableFuture = executor.submit(new Callable<Object>() {
137       @Override
138       public Object call() throws Exception {
139         Admin admin = TEST_UTIL.getHBaseAdmin();
140         admin.addColumn(TABLE_NAME, new HColumnDescriptor(NEW_FAMILY));
141         LOG.info("Added new column family");
142         HTableDescriptor tableDesc = admin.getTableDescriptor(TABLE_NAME);
143         assertTrue(tableDesc.getFamiliesKeys().contains(NEW_FAMILY));
144         return null;
145       }
146     });
147     Future<Object> disableTableFuture = executor.submit(new Callable<Object>() {
148       @Override
149       public Object call() throws Exception {
150         Admin admin = TEST_UTIL.getHBaseAdmin();
151         admin.disableTable(TABLE_NAME);
152         assertTrue(admin.isTableDisabled(TABLE_NAME));
153         admin.deleteTable(TABLE_NAME);
154         assertFalse(admin.tableExists(TABLE_NAME));
155         return null;
156       }
157     });
158 
159     try {
160       disableTableFuture.get();
161       alterTableFuture.get();
162     } catch (ExecutionException e) {
163       if (e.getCause() instanceof AssertionError) {
164         throw (AssertionError) e.getCause();
165       }
166       throw e;
167     }
168   }
169 
170   public static class TestAlterAndDisableMasterObserver extends BaseMasterObserver {
171     @Override
172     public void preAddColumnHandler(ObserverContext<MasterCoprocessorEnvironment> ctx,
173         TableName tableName, HColumnDescriptor column) throws IOException {
174       LOG.debug("addColumn called");
175       addColumn.countDown();
176     }
177 
178     @Override
179     public void postAddColumnHandler(ObserverContext<MasterCoprocessorEnvironment> ctx,
180         TableName tableName, HColumnDescriptor column) throws IOException {
181       Threads.sleep(6000);
182       try {
183         ctx.getEnvironment().getMasterServices().checkTableModifiable(tableName);
184       } catch(TableNotDisabledException expected) {
185         //pass
186         return;
187       } catch(IOException ex) {
188       }
189       fail("was expecting the table to be enabled");
190     }
191 
192     @Override
193     public void preDisableTable(ObserverContext<MasterCoprocessorEnvironment> ctx,
194                                 TableName tableName) throws IOException {
195       try {
196         LOG.debug("Waiting for addColumn to be processed first");
197         //wait for addColumn to be processed first
198         addColumn.await();
199         LOG.debug("addColumn started, we can continue");
200       } catch (InterruptedException ex) {
201         LOG.warn("Sleep interrupted while waiting for addColumn countdown");
202       }
203     }
204 
205     @Override
206     public void postDisableTableHandler(ObserverContext<MasterCoprocessorEnvironment> ctx,
207                                         TableName tableName) throws IOException {
208       Threads.sleep(3000);
209     }
210   }
211 
212   @Test(timeout = 600000)
213   public void testDelete() throws Exception {
214     prepareMiniCluster();
215 
216     Admin admin = TEST_UTIL.getHBaseAdmin();
217     admin.disableTable(TABLE_NAME);
218     admin.deleteTable(TABLE_NAME);
219 
220     //ensure that znode for the table node has been deleted
221     final ZooKeeperWatcher zkWatcher = TEST_UTIL.getZooKeeperWatcher();
222     final String znode = ZKUtil.joinZNode(zkWatcher.tableLockZNode, TABLE_NAME.getNameAsString());
223 
224     TEST_UTIL.waitFor(5000, new Waiter.Predicate<Exception>() {
225       @Override
226       public boolean evaluate() throws Exception {
227         int ver = ZKUtil.checkExists(zkWatcher, znode);
228         return ver < 0;
229       }
230     });
231     int ver = ZKUtil.checkExists(zkWatcher,
232       ZKUtil.joinZNode(zkWatcher.tableLockZNode, TABLE_NAME.getNameAsString()));
233     assertTrue("Unexpected znode version " + ver, ver < 0);
234 
235   }
236 
237 
238   @Test(timeout = 600000)
239   public void testReapAllTableLocks() throws Exception {
240     prepareMiniZkCluster();
241     ServerName serverName = ServerName.valueOf("localhost:10000", 0);
242     final TableLockManager lockManager = TableLockManager.createTableLockManager(
243         TEST_UTIL.getConfiguration(), TEST_UTIL.getZooKeeperWatcher(), serverName);
244 
245     String tables[] = {"table1", "table2", "table3", "table4"};
246     ExecutorService executor = Executors.newFixedThreadPool(6);
247 
248     final CountDownLatch writeLocksObtained = new CountDownLatch(4);
249     final CountDownLatch writeLocksAttempted = new CountDownLatch(10);
250     //TODO: read lock tables
251 
252     //6 threads will be stuck waiting for the table lock
253     for (int i = 0; i < tables.length; i++) {
254       final String table = tables[i];
255       for (int j = 0; j < i+1; j++) { //i+1 write locks attempted for table[i]
256         executor.submit(new Callable<Void>() {
257           @Override
258           public Void call() throws Exception {
259             writeLocksAttempted.countDown();
260             lockManager.writeLock(TableName.valueOf(table),
261                 "testReapAllTableLocks").acquire();
262             writeLocksObtained.countDown();
263             return null;
264           }
265         });
266       }
267     }
268 
269     writeLocksObtained.await();
270     writeLocksAttempted.await();
271 
272     //now reap all table locks
273     lockManager.reapWriteLocks();
274 
275     TEST_UTIL.getConfiguration().setInt(TableLockManager.TABLE_WRITE_LOCK_TIMEOUT_MS, 0);
276     TableLockManager zeroTimeoutLockManager = TableLockManager.createTableLockManager(
277           TEST_UTIL.getConfiguration(), TEST_UTIL.getZooKeeperWatcher(), serverName);
278 
279     //should not throw table lock timeout exception
280     zeroTimeoutLockManager.writeLock(
281         TableName.valueOf(tables[tables.length - 1]),
282         "zero timeout")
283       .acquire();
284 
285     executor.shutdownNow();
286   }
287 
288   @Test(timeout = 600000)
289   public void testTableReadLock() throws Exception {
290     // test plan: write some data to the table. Continuously alter the table and
291     // force splits
292     // concurrently until we have 5 regions. verify the data just in case.
293     // Every region should contain the same table descriptor
294     // This is not an exact test
295     prepareMiniCluster();
296     LoadTestTool loadTool = new LoadTestTool();
297     loadTool.setConf(TEST_UTIL.getConfiguration());
298     int numKeys = 10000;
299     final TableName tableName = TableName.valueOf("testTableReadLock");
300     final Admin admin = TEST_UTIL.getHBaseAdmin();
301     final HTableDescriptor desc = new HTableDescriptor(tableName);
302     final byte[] family = Bytes.toBytes("test_cf");
303     desc.addFamily(new HColumnDescriptor(family));
304     admin.createTable(desc); // create with one region
305 
306     // write some data, not much
307     int ret = loadTool.run(new String[] { "-tn", tableName.getNameAsString(), "-write",
308         String.format("%d:%d:%d", 1, 10, 10), "-num_keys", String.valueOf(numKeys), "-skip_init" });
309     if (0 != ret) {
310       String errorMsg = "Load failed with error code " + ret;
311       LOG.error(errorMsg);
312       fail(errorMsg);
313     }
314 
315     int familyValues = admin.getTableDescriptor(tableName).getFamily(family).getValues().size();
316     StoppableImplementation stopper = new StoppableImplementation();
317     final ChoreService choreService = new ChoreService("TEST_SERVER_NAME");
318 
319     //alter table every 10 sec
320     ScheduledChore alterThread = new ScheduledChore("Alter Chore", stopper, 10000) {
321       @Override
322       protected void chore() {
323         Random random = new Random();
324         try {
325           HTableDescriptor htd = admin.getTableDescriptor(tableName);
326           String val = String.valueOf(random.nextInt());
327           htd.getFamily(family).setValue(val, val);
328           desc.getFamily(family).setValue(val, val); // save it for later
329                                                      // control
330           admin.modifyTable(tableName, htd);
331         } catch (Exception ex) {
332           LOG.warn("Caught exception", ex);
333           fail(ex.getMessage());
334         }
335       }
336     };
337 
338     //split table every 5 sec
339     ScheduledChore splitThread = new ScheduledChore("Split thread", stopper, 5000) {
340       @Override
341       public void chore() {
342         try {
343           HRegion region = TEST_UTIL.getSplittableRegion(tableName, -1);
344           if (region != null) {
345             byte[] regionName = region.getRegionInfo().getRegionName();
346             admin.flushRegion(regionName);
347             admin.compactRegion(regionName);
348             admin.splitRegion(regionName);
349           } else {
350             LOG.warn("Could not find suitable region for the table.  Possibly the " +
351               "region got closed and the attempts got over before " +
352               "the region could have got reassigned.");
353           }
354         } catch (NotServingRegionException nsre) {
355           // the region may be in transition
356           LOG.warn("Caught exception", nsre);
357         } catch (Exception ex) {
358           LOG.warn("Caught exception", ex);
359           fail(ex.getMessage());
360         }
361       }
362     };
363 
364     choreService.scheduleChore(alterThread);
365     choreService.scheduleChore(splitThread);
366     TEST_UTIL.waitTableEnabled(tableName);
367     while (true) {
368       List<HRegionInfo> regions = admin.getTableRegions(tableName);
369       LOG.info(String.format("Table #regions: %d regions: %s:", regions.size(), regions));
370       assertEquals(admin.getTableDescriptor(tableName), desc);
371       for (HRegion region : TEST_UTIL.getMiniHBaseCluster().getRegions(tableName)) {
372         assertEquals(desc, region.getTableDesc());
373       }
374       if (regions.size() >= 5) {
375         break;
376       }
377       Threads.sleep(1000);
378     }
379     stopper.stop("test finished");
380 
381     int newFamilyValues = admin.getTableDescriptor(tableName).getFamily(family).getValues().size();
382     LOG.info(String.format("Altered the table %d times", newFamilyValues - familyValues));
383     assertTrue(newFamilyValues > familyValues); // at least one alter went
384                                                 // through
385 
386     ret = loadTool.run(new String[] { "-tn", tableName.getNameAsString(), "-read", "100:10",
387         "-num_keys", String.valueOf(numKeys), "-skip_init" });
388     if (0 != ret) {
389       String errorMsg = "Verify failed with error code " + ret;
390       LOG.error(errorMsg);
391       fail(errorMsg);
392     }
393 
394     admin.close();
395     choreService.shutdown();
396   }
397 
398 }