1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase.master;
21
22 import static org.junit.Assert.assertEquals;
23 import static org.junit.Assert.assertFalse;
24 import static org.junit.Assert.assertTrue;
25 import static org.junit.Assert.fail;
26
27 import java.io.IOException;
28 import java.util.List;
29 import java.util.Random;
30 import java.util.concurrent.Callable;
31 import java.util.concurrent.CountDownLatch;
32 import java.util.concurrent.ExecutionException;
33 import java.util.concurrent.ExecutorService;
34 import java.util.concurrent.Executors;
35 import java.util.concurrent.Future;
36
37 import org.apache.commons.logging.Log;
38 import org.apache.commons.logging.LogFactory;
39 import org.apache.hadoop.conf.Configuration;
40 import org.apache.hadoop.hbase.ChoreService;
41 import org.apache.hadoop.hbase.HBaseTestingUtility;
42 import org.apache.hadoop.hbase.HColumnDescriptor;
43 import org.apache.hadoop.hbase.HRegionInfo;
44 import org.apache.hadoop.hbase.HTableDescriptor;
45 import org.apache.hadoop.hbase.testclassification.LargeTests;
46 import org.apache.hadoop.hbase.NotServingRegionException;
47 import org.apache.hadoop.hbase.ScheduledChore;
48 import org.apache.hadoop.hbase.ServerName;
49 import org.apache.hadoop.hbase.TableName;
50 import org.apache.hadoop.hbase.TableNotDisabledException;
51 import org.apache.hadoop.hbase.Waiter;
52 import org.apache.hadoop.hbase.client.Admin;
53 import org.apache.hadoop.hbase.coprocessor.BaseMasterObserver;
54 import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment;
55 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
56 import org.apache.hadoop.hbase.exceptions.LockTimeoutException;
57 import org.apache.hadoop.hbase.regionserver.HRegion;
58 import org.apache.hadoop.hbase.util.Bytes;
59 import org.apache.hadoop.hbase.util.LoadTestTool;
60 import org.apache.hadoop.hbase.util.StoppableImplementation;
61 import org.apache.hadoop.hbase.util.Threads;
62 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
63 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
64 import org.junit.After;
65 import org.junit.Test;
66 import org.junit.experimental.categories.Category;
67
68
69
70
71 @Category(LargeTests.class)
72 public class TestTableLockManager {
73
74 private static final Log LOG =
75 LogFactory.getLog(TestTableLockManager.class);
76
77 private static final TableName TABLE_NAME =
78 TableName.valueOf("TestTableLevelLocks");
79
80 private static final byte[] FAMILY = Bytes.toBytes("f1");
81
82 private static final byte[] NEW_FAMILY = Bytes.toBytes("f2");
83
84 private final HBaseTestingUtility TEST_UTIL =
85 new HBaseTestingUtility();
86
87 private static final CountDownLatch deleteColumn = new CountDownLatch(1);
88 private static final CountDownLatch addColumn = new CountDownLatch(1);
89
90 public void prepareMiniCluster() throws Exception {
91 TEST_UTIL.getConfiguration().setBoolean("hbase.online.schema.update.enable", true);
92 TEST_UTIL.startMiniCluster(2);
93 TEST_UTIL.createTable(TABLE_NAME, FAMILY);
94 }
95
96 public void prepareMiniZkCluster() throws Exception {
97 TEST_UTIL.startMiniZKCluster(1);
98 }
99
100 @After
101 public void tearDown() throws Exception {
102 TEST_UTIL.shutdownMiniCluster();
103 }
104
105 public static class TestLockTimeoutExceptionMasterObserver extends BaseMasterObserver {
106 @Override
107 public void preDeleteColumnHandler(ObserverContext<MasterCoprocessorEnvironment> ctx,
108 TableName tableName, byte[] c) throws IOException {
109 deleteColumn.countDown();
110 }
111 @Override
112 public void postDeleteColumnHandler(ObserverContext<MasterCoprocessorEnvironment> ctx,
113 TableName tableName, byte[] c) throws IOException {
114 Threads.sleep(10000);
115 }
116
117 @Override
118 public void preAddColumnHandler(ObserverContext<MasterCoprocessorEnvironment> ctx,
119 TableName tableName, HColumnDescriptor column) throws IOException {
120 fail("Add column should have timeouted out for acquiring the table lock");
121 }
122 }
123
124 @Test(timeout = 600000)
125 public void testAlterAndDisable() throws Exception {
126 prepareMiniCluster();
127
128
129
130
131 HMaster master = TEST_UTIL.getHBaseCluster().getMaster();
132 master.getMasterCoprocessorHost().load(TestAlterAndDisableMasterObserver.class,
133 0, TEST_UTIL.getConfiguration());
134
135 ExecutorService executor = Executors.newFixedThreadPool(2);
136 Future<Object> alterTableFuture = executor.submit(new Callable<Object>() {
137 @Override
138 public Object call() throws Exception {
139 Admin admin = TEST_UTIL.getHBaseAdmin();
140 admin.addColumn(TABLE_NAME, new HColumnDescriptor(NEW_FAMILY));
141 LOG.info("Added new column family");
142 HTableDescriptor tableDesc = admin.getTableDescriptor(TABLE_NAME);
143 assertTrue(tableDesc.getFamiliesKeys().contains(NEW_FAMILY));
144 return null;
145 }
146 });
147 Future<Object> disableTableFuture = executor.submit(new Callable<Object>() {
148 @Override
149 public Object call() throws Exception {
150 Admin admin = TEST_UTIL.getHBaseAdmin();
151 admin.disableTable(TABLE_NAME);
152 assertTrue(admin.isTableDisabled(TABLE_NAME));
153 admin.deleteTable(TABLE_NAME);
154 assertFalse(admin.tableExists(TABLE_NAME));
155 return null;
156 }
157 });
158
159 try {
160 disableTableFuture.get();
161 alterTableFuture.get();
162 } catch (ExecutionException e) {
163 if (e.getCause() instanceof AssertionError) {
164 throw (AssertionError) e.getCause();
165 }
166 throw e;
167 }
168 }
169
170 public static class TestAlterAndDisableMasterObserver extends BaseMasterObserver {
171 @Override
172 public void preAddColumnHandler(ObserverContext<MasterCoprocessorEnvironment> ctx,
173 TableName tableName, HColumnDescriptor column) throws IOException {
174 LOG.debug("addColumn called");
175 addColumn.countDown();
176 }
177
178 @Override
179 public void postAddColumnHandler(ObserverContext<MasterCoprocessorEnvironment> ctx,
180 TableName tableName, HColumnDescriptor column) throws IOException {
181 Threads.sleep(6000);
182 try {
183 ctx.getEnvironment().getMasterServices().checkTableModifiable(tableName);
184 } catch(TableNotDisabledException expected) {
185
186 return;
187 } catch(IOException ex) {
188 }
189 fail("was expecting the table to be enabled");
190 }
191
192 @Override
193 public void preDisableTable(ObserverContext<MasterCoprocessorEnvironment> ctx,
194 TableName tableName) throws IOException {
195 try {
196 LOG.debug("Waiting for addColumn to be processed first");
197
198 addColumn.await();
199 LOG.debug("addColumn started, we can continue");
200 } catch (InterruptedException ex) {
201 LOG.warn("Sleep interrupted while waiting for addColumn countdown");
202 }
203 }
204
205 @Override
206 public void postDisableTableHandler(ObserverContext<MasterCoprocessorEnvironment> ctx,
207 TableName tableName) throws IOException {
208 Threads.sleep(3000);
209 }
210 }
211
212 @Test(timeout = 600000)
213 public void testDelete() throws Exception {
214 prepareMiniCluster();
215
216 Admin admin = TEST_UTIL.getHBaseAdmin();
217 admin.disableTable(TABLE_NAME);
218 admin.deleteTable(TABLE_NAME);
219
220
221 final ZooKeeperWatcher zkWatcher = TEST_UTIL.getZooKeeperWatcher();
222 final String znode = ZKUtil.joinZNode(zkWatcher.tableLockZNode, TABLE_NAME.getNameAsString());
223
224 TEST_UTIL.waitFor(5000, new Waiter.Predicate<Exception>() {
225 @Override
226 public boolean evaluate() throws Exception {
227 int ver = ZKUtil.checkExists(zkWatcher, znode);
228 return ver < 0;
229 }
230 });
231 int ver = ZKUtil.checkExists(zkWatcher,
232 ZKUtil.joinZNode(zkWatcher.tableLockZNode, TABLE_NAME.getNameAsString()));
233 assertTrue("Unexpected znode version " + ver, ver < 0);
234
235 }
236
237
238 @Test(timeout = 600000)
239 public void testReapAllTableLocks() throws Exception {
240 prepareMiniZkCluster();
241 ServerName serverName = ServerName.valueOf("localhost:10000", 0);
242 final TableLockManager lockManager = TableLockManager.createTableLockManager(
243 TEST_UTIL.getConfiguration(), TEST_UTIL.getZooKeeperWatcher(), serverName);
244
245 String tables[] = {"table1", "table2", "table3", "table4"};
246 ExecutorService executor = Executors.newFixedThreadPool(6);
247
248 final CountDownLatch writeLocksObtained = new CountDownLatch(4);
249 final CountDownLatch writeLocksAttempted = new CountDownLatch(10);
250
251
252
253 for (int i = 0; i < tables.length; i++) {
254 final String table = tables[i];
255 for (int j = 0; j < i+1; j++) {
256 executor.submit(new Callable<Void>() {
257 @Override
258 public Void call() throws Exception {
259 writeLocksAttempted.countDown();
260 lockManager.writeLock(TableName.valueOf(table),
261 "testReapAllTableLocks").acquire();
262 writeLocksObtained.countDown();
263 return null;
264 }
265 });
266 }
267 }
268
269 writeLocksObtained.await();
270 writeLocksAttempted.await();
271
272
273 lockManager.reapWriteLocks();
274
275 TEST_UTIL.getConfiguration().setInt(TableLockManager.TABLE_WRITE_LOCK_TIMEOUT_MS, 0);
276 TableLockManager zeroTimeoutLockManager = TableLockManager.createTableLockManager(
277 TEST_UTIL.getConfiguration(), TEST_UTIL.getZooKeeperWatcher(), serverName);
278
279
280 zeroTimeoutLockManager.writeLock(
281 TableName.valueOf(tables[tables.length - 1]),
282 "zero timeout")
283 .acquire();
284
285 executor.shutdownNow();
286 }
287
288 @Test(timeout = 600000)
289 public void testTableReadLock() throws Exception {
290
291
292
293
294
295 prepareMiniCluster();
296 LoadTestTool loadTool = new LoadTestTool();
297 loadTool.setConf(TEST_UTIL.getConfiguration());
298 int numKeys = 10000;
299 final TableName tableName = TableName.valueOf("testTableReadLock");
300 final Admin admin = TEST_UTIL.getHBaseAdmin();
301 final HTableDescriptor desc = new HTableDescriptor(tableName);
302 final byte[] family = Bytes.toBytes("test_cf");
303 desc.addFamily(new HColumnDescriptor(family));
304 admin.createTable(desc);
305
306
307 int ret = loadTool.run(new String[] { "-tn", tableName.getNameAsString(), "-write",
308 String.format("%d:%d:%d", 1, 10, 10), "-num_keys", String.valueOf(numKeys), "-skip_init" });
309 if (0 != ret) {
310 String errorMsg = "Load failed with error code " + ret;
311 LOG.error(errorMsg);
312 fail(errorMsg);
313 }
314
315 int familyValues = admin.getTableDescriptor(tableName).getFamily(family).getValues().size();
316 StoppableImplementation stopper = new StoppableImplementation();
317 final ChoreService choreService = new ChoreService("TEST_SERVER_NAME");
318
319
320 ScheduledChore alterThread = new ScheduledChore("Alter Chore", stopper, 10000) {
321 @Override
322 protected void chore() {
323 Random random = new Random();
324 try {
325 HTableDescriptor htd = admin.getTableDescriptor(tableName);
326 String val = String.valueOf(random.nextInt());
327 htd.getFamily(family).setValue(val, val);
328 desc.getFamily(family).setValue(val, val);
329
330 admin.modifyTable(tableName, htd);
331 } catch (Exception ex) {
332 LOG.warn("Caught exception", ex);
333 fail(ex.getMessage());
334 }
335 }
336 };
337
338
339 ScheduledChore splitThread = new ScheduledChore("Split thread", stopper, 5000) {
340 @Override
341 public void chore() {
342 try {
343 HRegion region = TEST_UTIL.getSplittableRegion(tableName, -1);
344 if (region != null) {
345 byte[] regionName = region.getRegionInfo().getRegionName();
346 admin.flushRegion(regionName);
347 admin.compactRegion(regionName);
348 admin.splitRegion(regionName);
349 } else {
350 LOG.warn("Could not find suitable region for the table. Possibly the " +
351 "region got closed and the attempts got over before " +
352 "the region could have got reassigned.");
353 }
354 } catch (NotServingRegionException nsre) {
355
356 LOG.warn("Caught exception", nsre);
357 } catch (Exception ex) {
358 LOG.warn("Caught exception", ex);
359 fail(ex.getMessage());
360 }
361 }
362 };
363
364 choreService.scheduleChore(alterThread);
365 choreService.scheduleChore(splitThread);
366 TEST_UTIL.waitTableEnabled(tableName);
367 while (true) {
368 List<HRegionInfo> regions = admin.getTableRegions(tableName);
369 LOG.info(String.format("Table #regions: %d regions: %s:", regions.size(), regions));
370 assertEquals(admin.getTableDescriptor(tableName), desc);
371 for (HRegion region : TEST_UTIL.getMiniHBaseCluster().getRegions(tableName)) {
372 assertEquals(desc, region.getTableDesc());
373 }
374 if (regions.size() >= 5) {
375 break;
376 }
377 Threads.sleep(1000);
378 }
379 stopper.stop("test finished");
380
381 int newFamilyValues = admin.getTableDescriptor(tableName).getFamily(family).getValues().size();
382 LOG.info(String.format("Altered the table %d times", newFamilyValues - familyValues));
383 assertTrue(newFamilyValues > familyValues);
384
385
386 ret = loadTool.run(new String[] { "-tn", tableName.getNameAsString(), "-read", "100:10",
387 "-num_keys", String.valueOf(numKeys), "-skip_init" });
388 if (0 != ret) {
389 String errorMsg = "Verify failed with error code " + ret;
390 LOG.error(errorMsg);
391 fail(errorMsg);
392 }
393
394 admin.close();
395 choreService.shutdown();
396 }
397
398 }