1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase;
20  
21  import java.io.IOException;
22  import java.util.ArrayList;
23  import java.util.List;
24  import java.util.Set;
25  import java.util.concurrent.ConcurrentHashMap;
26  import java.util.concurrent.atomic.AtomicBoolean;
27  
28  import org.apache.commons.lang.RandomStringUtils;
29  import org.apache.commons.lang.math.RandomUtils;
30  import org.apache.commons.logging.Log;
31  import org.apache.commons.logging.LogFactory;
32  import org.apache.hadoop.conf.Configuration;
33  import org.apache.hadoop.hbase.client.*;
34  import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
35  import org.apache.hadoop.hbase.testclassification.IntegrationTests;
36  import org.apache.hadoop.hbase.util.Bytes;
37  import org.apache.hadoop.hbase.util.HBaseFsck;
38  import org.apache.hadoop.hbase.util.Threads;
39  import org.apache.hadoop.hbase.util.hbck.HbckTestingUtil;
40  import org.apache.hadoop.util.ToolRunner;
41  import org.junit.Assert;
42  import org.junit.Test;
43  import org.junit.experimental.categories.Category;
44  
45  /**
46   *
47   * Integration test that verifies Procedure V2. <br/><br/>
48   *
49   * DDL operations should either roll forward to completion or roll back cleanly when the
50   * active master is killed by ChaosMonkey (default monkey factory: masterKilling).<br/><br/>
51   *
52   * Multiple Worker threads are started to randomly perform the following Actions in a loop:<br/>
53   * Actions generating and populating tables:
54   * <ul>
55   *     <li>CreateTableAction</li>
56   *     <li>DisableTableAction</li>
57   *     <li>EnableTableAction</li>
58   *     <li>DeleteTableAction</li>
59   *     <li>AddRowAction</li>
60   * </ul>
61   * Actions performing DDL operations:
62   * <ul>
63   *     <li>AddColumnFamilyAction</li>
64   *     <li>AlterFamilyVersionsAction</li>
65   *     <li>AlterFamilyEncodingAction</li>
66   *     <li>DeleteColumnFamilyAction</li>
67   * </ul>
68   * <br/>
69   *
70   * The threads run for a configurable period of time (default 20 minutes) and are then stopped.
71   * Verification is performed against the following checkpoints:
72   * <ol>
73   *     <li>No Actions throw Exceptions.</li>
74   *     <li>No inconsistencies are detected in hbck.</li>
75   * </ol>
76   *
77   * <p>
78   * This test should be run by the hbase user since it invokes hbck at the end
79   * </p><p>
80   * Usage:
81   *  hbase org.apache.hadoop.hbase.IntegrationTestDDLMasterFailover
82   *    -Dhbase.IntegrationTestDDLMasterFailover.runtime=1200000
83   *    -Dhbase.IntegrationTestDDLMasterFailover.numThreads=20
84   *    -Dhbase.IntegrationTestDDLMasterFailover.numRegions=50 --monkey masterKilling
85   */
86  
87  @Category(IntegrationTests.class)
88  public class IntegrationTestDDLMasterFailover extends IntegrationTestBase {
89  
90    private static final Log LOG = LogFactory.getLog(IntegrationTestDDLMasterFailover.class);
91  
92    private static final int SERVER_COUNT = 1; // number of slaves for the smallest cluster
93  
94    protected static final long DEFAULT_RUN_TIME = 20 * 60 * 1000;
95  
96    protected static final int DEFAULT_NUM_THREADS = 20;
97  
98    protected static final int DEFAULT_NUM_REGIONS = 50; // number of regions in pre-split tables
99  
100   protected HBaseCluster cluster;
101 
102   protected Connection connection;
103 
104   /**
105    * A soft limit on how long we should run
106    */
107   protected static final String RUN_TIME_KEY = "hbase.%s.runtime";
108   protected static final String NUM_THREADS_KEY = "hbase.%s.numThreads";
109   protected static final String NUM_REGIONS_KEY = "hbase.%s.numRegions";
110 
111   protected AtomicBoolean running = new AtomicBoolean(true);
112 
113   protected AtomicBoolean create_table = new AtomicBoolean(true);
114 
115   protected int numThreads, numRegions;
116 
117   ConcurrentHashMap<TableName, HTableDescriptor> enabledTables =
118       new ConcurrentHashMap<TableName, HTableDescriptor>();
119 
120   ConcurrentHashMap<TableName, HTableDescriptor> disabledTables =
121       new ConcurrentHashMap<TableName, HTableDescriptor>();
122 
123   ConcurrentHashMap<TableName, HTableDescriptor> deletedTables =
124       new ConcurrentHashMap<TableName, HTableDescriptor>();
125 
126   @Override
127   public void setUpCluster() throws Exception {
128     util = getTestingUtil(getConf());
129     LOG.debug("Initializing/checking cluster has " + SERVER_COUNT + " servers");
130     util.initializeCluster(getMinServerCount());
131     LOG.debug("Done initializing/checking cluster");
132     cluster = util.getHBaseClusterInterface();
133   }
134 
135   @Override
136   public void cleanUpCluster() throws Exception {
137     Admin admin = util.getHBaseAdmin();
138     admin.disableTables("ittable-\\d+");
139     admin.deleteTables("ittable-\\d+");
140     Connection connection = getConnection();
141     connection.close();
142     super.cleanUpCluster();
143   }
144 
145   protected int getMinServerCount() {
146     return SERVER_COUNT;
147   }
148 
149   protected synchronized void setConnection(Connection connection){
150     this.connection = connection;
151   }
152 
153   protected synchronized Connection getConnection(){
154     if (this.connection == null) {
155       try {
156         Connection connection = ConnectionFactory.createConnection(getConf());
157         setConnection(connection);
158       } catch (IOException e) {
159         LOG.fatal("Failed to establish connection.", e);
160       }
161     }
162     return connection;
163   }
164 
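  /**
   * Verifies that every table tracked by this test is in the expected state on the cluster:
   * tables in enabledTables are enabled, tables in disabledTables are disabled, and tables in
   * deletedTables no longer exist.
   */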
165   protected void verifyTables() throws IOException {
166     Connection connection = getConnection();
167     Admin admin = connection.getAdmin();
168     // iterating concurrent map
169     for (TableName tableName : enabledTables.keySet()){
170       Assert.assertTrue("Table: " + tableName + " in enabledTables is not enabled",
171           admin.isTableEnabled(tableName));
172     }
173     for (TableName tableName : disabledTables.keySet()){
174       Assert.assertTrue("Table: " + tableName + " in disabledTables is not disabled",
175           admin.isTableDisabled(tableName));
176     }
177     for (TableName tableName : deletedTables.keySet()){
178       Assert.assertFalse("Table: " + tableName + " in deletedTables is not deleted",
179           admin.tableExists(tableName));
180     }
181     admin.close();
182   }
183 
184   @Test
185   public void testAsUnitTest() throws Exception {
186     runTest();
187   }
188 
189   @Override
190   public int runTestFromCommandLine() throws Exception {
191     int ret = runTest();
192     return ret;
193   }
194 
195   private abstract class MasterAction {
196     Connection connection = getConnection();
197 
198     abstract void perform() throws IOException;
199   }
200 
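  /**
   * Base class for Actions that operate on a randomly selected table. selectTable() removes the
   * chosen table from the given map so that concurrent workers do not act on the same table; an
   * Action that completes successfully puts the refreshed descriptor back into the proper map.
   */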
201   private abstract class TableAction extends MasterAction {
202     // TableAction has implemented selectTable() shared by multiple table Actions
203     protected HTableDescriptor selectTable(ConcurrentHashMap<TableName, HTableDescriptor> tableMap)
204     {
205       // randomly select table from tableMap
206       if (tableMap.isEmpty()){
207         return null;
208       }
209       // synchronization to prevent removal from multiple threads
210       synchronized (tableMap){
211         ArrayList<TableName> tableList = new ArrayList<TableName>(tableMap.keySet());
212         TableName randomKey = tableList.get(RandomUtils.nextInt(tableList.size()));
213         HTableDescriptor randomHtd = tableMap.get(randomKey);
214         // remove from tableMap
215         tableMap.remove(randomKey);
216         return randomHtd;
217       }
218     }
219   }
220 
221   private class CreateTableAction extends TableAction {
222 
223     @Override
224     void perform() throws IOException {
225       Admin admin = connection.getAdmin();
226       try {
227         HTableDescriptor htd = createTableDesc();
228         TableName tableName = htd.getTableName();
229         if (admin.tableExists(tableName)) {
230           return;
231         }
232         String numRegionKey = String.format(NUM_REGIONS_KEY, IntegrationTestDDLMasterFailover.class.getSimpleName());
233         numRegions = getConf().getInt(numRegionKey, DEFAULT_NUM_REGIONS);
234         byte[] startKey = Bytes.toBytes("row-0000000000");
235         byte[] endKey = Bytes.toBytes("row-" + Integer.MAX_VALUE);
236         LOG.info("Creating table:" + htd);
237         admin.createTable(htd, startKey, endKey, numRegions);
238         Assert.assertTrue("Table: " + htd + " was not created", admin.tableExists(tableName));
239         HTableDescriptor freshTableDesc = admin.getTableDescriptor(tableName);
240         enabledTables.put(tableName, freshTableDesc);
241         LOG.info("Created table:" + freshTableDesc);
242       } catch (Exception e){
243         LOG.warn("Caught exception in action: " + this.getClass());
244         // TODO workaround
245         // when master failover happens during CREATE_TABLE, the client RPC retry may get a
246         // TableExistsException; ignore it for now until there is a better resolution
247         if (e instanceof TableExistsException) {
248           LOG.warn("Caught TableExistsException in action: " + this.getClass(), e);
249         } else {
250           throw e;
251         }
252       } finally {
253         admin.close();
254       }
255       verifyTables();
256     }
257 
258     private HTableDescriptor createTableDesc() {
259       String tableName = "ittable-" + String.format("%010d",
260         RandomUtils.nextInt(Integer.MAX_VALUE));
261       String familyName = "cf-" + Math.abs(RandomUtils.nextInt());
262       HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(tableName));
263       // add random column family
264       htd.addFamily(new HColumnDescriptor(familyName));
265       return htd;
266     }
267   }
268 
269   private class DisableTableAction extends TableAction {
270 
271     @Override
272     void perform() throws IOException {
273 
274       HTableDescriptor selected = selectTable(enabledTables);
275       if (selected == null) {
276         return;
277       }
278 
279       Admin admin = connection.getAdmin();
280       try {
281         TableName tableName = selected.getTableName();
282         LOG.info("Disabling table :" + selected);
283         admin.disableTable(tableName);
284         Assert.assertTrue("Table: " + selected + " was not disabled",
285             admin.isTableDisabled(tableName));
286         HTableDescriptor freshTableDesc = admin.getTableDescriptor(tableName);
287         disabledTables.put(tableName, freshTableDesc);
288         LOG.info("Disabled table :" + freshTableDesc);
289       } catch (Exception e){
290         LOG.warn("Caught exception in action: " + this.getClass());
291         // TODO workaround
292         // relax the check for TableNotDisabledException/TableNotEnabledException thrown by sync
293         // operations
294         // 1) when enable/disable starts, the table state is changed to ENABLING/DISABLING (ZK node
295         // in 1.x), which will be further changed to ENABLED/DISABLED once the operation completes
296         // 2) if master failover happens in the middle of the enable/disable operation, the new
297         // master will try to recover the tables in ENABLING/DISABLING state, as programmed in
298         // AssignmentManager#recoverTableInEnablingState() and
299         // AssignmentManager#recoverTableInDisablingState()
300         // 3) after the new master initialization completes, the procedure tries to re-do the
301         // enable/disable operation, which was already done. Ignore those exceptions before change
302         // of behaviors of AssignmentManager in presence of PV2
303         if (e instanceof TableNotEnabledException) {
304           LOG.warn("Caught TableNotEnabledException in action: " + this.getClass());
305           e.printStackTrace();
306         } else {
307           throw e;
308         }
309       } finally {
310         admin.close();
311       }
312       verifyTables();
313     }
314   }
315 
316   private class EnableTableAction extends TableAction {
317 
318     @Override
319     void perform() throws IOException {
320 
321       HTableDescriptor selected = selectTable(disabledTables);
322       if (selected == null) {
323         return;
324       }
325 
326       Admin admin = connection.getAdmin();
327       try {
328         TableName tableName = selected.getTableName();
329         LOG.info("Enabling table :" + selected);
330         admin.enableTable(tableName);
331         Assert.assertTrue("Table: " + selected + " was not enabled",
332             admin.isTableEnabled(tableName));
333         HTableDescriptor freshTableDesc = admin.getTableDescriptor(tableName);
334         enabledTables.put(tableName, freshTableDesc);
335         LOG.info("Enabled table :" + freshTableDesc);
336       } catch (Exception e){
337         LOG.warn("Caught exception in action: " + this.getClass());
338         // TODO workaround
339         // relax the check for TableNotDisabledException/TableNotEnabledException thrown by sync
340         // operations 1) when enable/disable starts, the table state is changed to
341         // ENABLING/DISABLING (ZK node in 1.x), which will be further changed to ENABLED/DISABLED
342         // once the operation completes 2) if master failover happens in the middle of the
343         // enable/disable operation, the new master will try to recover the tables in
344         // ENABLING/DISABLING state, as programmed in
345         // AssignmentManager#recoverTableInEnablingState() and
346         // AssignmentManager#recoverTableInDisablingState()
347         // 3) after the new master initialization completes, the procedure tries to re-do the
348         // enable/disable operation, which was already done. Ignore those exceptions before
349         // change of behaviors of AssignmentManager in presence of PV2
350         if (e instanceof TableNotDisabledException) {
351           LOG.warn("Caught TableNotDisabledException in action: " + this.getClass());
352           e.printStackTrace();
353         } else {
354           throw e;
355         }
356       } finally {
357         admin.close();
358       }
359       verifyTables();
360     }
361   }
362 
363   private class DeleteTableAction extends TableAction {
364 
365     @Override
366     void perform() throws IOException {
367 
368       HTableDescriptor selected = selectTable(disabledTables);
369       if (selected == null) {
370         return;
371       }
372 
373       Admin admin = connection.getAdmin();
374       try {
375         TableName tableName = selected.getTableName();
376         LOG.info("Deleting table :" + selected);
377         admin.deleteTable(tableName);
378         Assert.assertFalse("Table: " + selected + " was not deleted",
379                 admin.tableExists(tableName));
380         deletedTables.put(tableName, selected);
381         LOG.info("Deleted table :" + selected);
382       } catch (Exception e){
383         LOG.warn("Caught exception in action: " + this.getClass());
384         // TODO workaround
385         // when master failover happens during DELETE_TABLE, the client RPC retry may get a
386         // TableNotFoundException; ignore it for now until there is a better resolution
387         if (e instanceof TableNotFoundException) {
388           LOG.warn("Caught TableNotFoundException in action: " + this.getClass());
389           e.printStackTrace();
390         } else {
391           throw e;
392         }
393       } finally {
394         admin.close();
395       }
396       verifyTables();
397     }
398   }
399 
400 
401   private abstract class ColumnAction extends TableAction{
402     // ColumnAction has implemented selectFamily() shared by multiple family Actions
403     protected HColumnDescriptor selectFamily(HTableDescriptor htd) {
404       if (htd == null) {
405         return null;
406       }
407       HColumnDescriptor[] families = htd.getColumnFamilies();
408       if (families.length == 0){
409         LOG.info("No column families in table: " + htd);
410         return null;
411       }
412       HColumnDescriptor randomCfd = families[RandomUtils.nextInt(families.length)];
413       return randomCfd;
414     }
415   }
416 
417   private class AddColumnFamilyAction extends ColumnAction {
418 
419     @Override
420     void perform() throws IOException {
421       HTableDescriptor selected = selectTable(disabledTables);
422       if (selected == null) {
423         return;
424       }
425 
426       Admin admin = connection.getAdmin();
427       try {
428         HColumnDescriptor cfd = createFamilyDesc();
429         if (selected.hasFamily(cfd.getName())){
430           LOG.info(cfd.getNameAsString() + " already exists in table "
431               + selected.getTableName());
432           return;
433         }
434         TableName tableName = selected.getTableName();
435         LOG.info("Adding column family: " + cfd + " to table: " + tableName);
436         admin.addColumn(tableName, cfd);
437         // assertion
438         HTableDescriptor freshTableDesc = admin.getTableDescriptor(tableName);
439         Assert.assertTrue("Column family: " + cfd + " was not added",
440             freshTableDesc.hasFamily(cfd.getName()));
441         LOG.info("Added column family: " + cfd + " to table: " + tableName);
442         disabledTables.put(tableName, freshTableDesc);
443       } catch (Exception e){
444         LOG.warn("Caught exception in action: " + this.getClass());
445         // TODO HBASE-13415
446         // relax the check for InvalidFamilyOperationException thrown by async operations until
447         // HBASE-13415 is complete: when failover happens, multiple proc ids may be created for
448         // the same request; once one procedure succeeds, the others complain that the family
449         // already exists
450         if (e instanceof InvalidFamilyOperationException) {
451           LOG.warn("Caught InvalidFamilyOperationException in action: " + this.getClass());
452           e.printStackTrace();
453         } else {
454           throw e;
455         }
456       } finally {
457         admin.close();
458       }
459       verifyTables();
460     }
461 
462     private HColumnDescriptor createFamilyDesc() {
463       String familyName = "cf-" + String.format("%010d", RandomUtils.nextInt(Integer.MAX_VALUE));
464       HColumnDescriptor cfd = new HColumnDescriptor(familyName);
465       return cfd;
466     }
467   }
468 
469   private class AlterFamilyVersionsAction extends ColumnAction {
470 
471     @Override
472     void perform() throws IOException {
473       HTableDescriptor selected = selectTable(disabledTables);
474       if (selected == null) {
475         return;
476       }
477       HColumnDescriptor columnDesc = selectFamily(selected);
478       if (columnDesc == null){
479         return;
480       }
481 
482       Admin admin = connection.getAdmin();
483       int versions = RandomUtils.nextInt(10) + 3;
484       try {
485         TableName tableName = selected.getTableName();
486         LOG.info("Altering versions of column family: " + columnDesc + " to: " + versions +
487             " in table: " + tableName);
488         columnDesc.setMinVersions(versions);
489         columnDesc.setMaxVersions(versions);
490         admin.modifyTable(tableName, selected);
491         // assertion
492         HTableDescriptor freshTableDesc = admin.getTableDescriptor(tableName);
493         HColumnDescriptor freshColumnDesc = freshTableDesc.getFamily(columnDesc.getName());
494         Assert.assertEquals("Column family: " + columnDesc + " was not altered",
495             freshColumnDesc.getMaxVersions(), versions);
496         Assert.assertEquals("Column family: " + freshColumnDesc + " was not altered",
497             freshColumnDesc.getMinVersions(), versions);
498         LOG.info("Altered versions of column family: " + columnDesc + " to: " + versions +
499             " in table: " + tableName);
500         disabledTables.put(tableName, freshTableDesc);
501       } catch (Exception e) {
502         LOG.warn("Caught exception in action: " + this.getClass());
503         throw e;
504       } finally {
505         admin.close();
506       }
507       verifyTables();
508     }
509   }
510 
511   private class AlterFamilyEncodingAction extends ColumnAction {
512 
513     @Override
514     void perform() throws IOException {
515       HTableDescriptor selected = selectTable(disabledTables);
516       if (selected == null) {
517         return;
518       }
519       HColumnDescriptor columnDesc = selectFamily(selected);
520       if (columnDesc == null){
521         return;
522       }
523 
524       Admin admin = connection.getAdmin();
525       try {
526         TableName tableName = selected.getTableName();
527         // possible DataBlockEncoding ids: 0=NONE, 2=PREFIX, 3=DIFF, 4=FAST_DIFF, 6=PREFIX_TREE
528         int[] possibleIds = {0, 2, 3, 4, 6};
529         short id = (short) possibleIds[RandomUtils.nextInt(possibleIds.length)];
530         LOG.info("Altering encoding of column family: " + columnDesc + " to: " + id +
531             " in table: " + tableName);
532         columnDesc.setDataBlockEncoding(DataBlockEncoding.getEncodingById(id));
533         admin.modifyTable(tableName, selected);
534         // assertion
535         HTableDescriptor freshTableDesc = admin.getTableDescriptor(tableName);
536         HColumnDescriptor freshColumnDesc = freshTableDesc.getFamily(columnDesc.getName());
537         Assert.assertEquals("Encoding of column family: " + columnDesc + " was not altered",
538             freshColumnDesc.getDataBlockEncoding().getId(), id);
539         LOG.info("Altered encoding of column family: " + freshColumnDesc + " to: " + id +
540             " in table: " + tableName);
541         disabledTables.put(tableName, freshTableDesc);
542       } catch (Exception e) {
543         LOG.warn("Caught exception in action: " + this.getClass());
544         throw e;
545       } finally {
546         admin.close();
547       }
548       verifyTables();
549     }
550   }
551 
552   private class DeleteColumnFamilyAction extends ColumnAction {
553 
554     @Override
555     void perform() throws IOException {
556       HTableDescriptor selected = selectTable(disabledTables);
557       HColumnDescriptor cfd = selectFamily(selected);
558       if (selected == null || cfd == null) {
559         return;
560       }
561 
562       Admin admin = connection.getAdmin();
563       try {
564         if (selected.getColumnFamilies().length < 2) {
565           LOG.info("No enough column families to delete in table " + selected.getTableName());
566           return;
567         }
568         TableName tableName = selected.getTableName();
569         LOG.info("Deleting column family: " + cfd + " from table: " + tableName);
570         admin.deleteColumn(tableName, cfd.getName());
571         // assertion
572         HTableDescriptor freshTableDesc = admin.getTableDescriptor(tableName);
573         Assert.assertFalse("Column family: " + cfd + " was not added",
574             freshTableDesc.hasFamily(cfd.getName()));
575         LOG.info("Deleted column family: " + cfd + " from table: " + tableName);
576         disabledTables.put(tableName, freshTableDesc);
577       } catch (Exception e) {
578         LOG.warn("Caught exception in action: " + this.getClass());
579         // TODO HBASE-13415
580         // relax the check for InvalidFamilyOperationException thrown by async operations until
581         // HBASE-13415 is complete: when failover happens, multiple proc ids may be created for
582         // the same request; once one procedure succeeds, the others complain that the family
583         // does not exist
584         if (e instanceof InvalidFamilyOperationException) {
585           LOG.warn("Caught InvalidFamilyOperationException in action: " + this.getClass());
586           e.printStackTrace();
587         } else {
588           throw e;
589         }
590       } finally {
591         admin.close();
592       }
593       verifyTables();
594     }
595   }
596 
597   private class AddRowAction extends ColumnAction {
598     // populate tables
599     @Override
600     void perform() throws IOException {
601       HTableDescriptor selected = selectTable(enabledTables);
602       if (selected == null) {
603         return;
604       }
605 
606       Admin admin = connection.getAdmin();
607       TableName tableName = selected.getTableName();
608       try (Table table = connection.getTable(tableName)){
609         ArrayList<HRegionInfo> regionInfos = new ArrayList<HRegionInfo>(admin.getTableRegions(
610             selected.getTableName()));
611         int numRegions = regionInfos.size();
612         // average number of rows to be added per action to each region
613         int averageRows = 1;
614         int numRows = averageRows * numRegions;
615         LOG.info("Adding " + numRows + " rows to table: " + selected);
616         for (int i = 0; i < numRows; i++){
617           // use nextInt(Integer.MAX_VALUE) so the generated row numbers are non-negative
618           byte[] rowKey = Bytes.toBytes(
619               "row-" + String.format("%010d", RandomUtils.nextInt(Integer.MAX_VALUE)));
620           HColumnDescriptor cfd = selectFamily(selected);
621           if (cfd == null){
622             return;
623           }
624           byte[] family = cfd.getName();
625           byte[] qualifier = Bytes.toBytes("col-" + RandomUtils.nextInt(Integer.MAX_VALUE) % 10);
626           byte[] value = Bytes.toBytes("val-" + RandomStringUtils.randomAlphanumeric(10));
627           Put put = new Put(rowKey);
628           put.addColumn(family, qualifier, value);
629           table.put(put);
630         }
631         HTableDescriptor freshTableDesc = admin.getTableDescriptor(tableName);
632         enabledTables.put(tableName, freshTableDesc);
633         LOG.info("Added " + numRows + " rows to table: " + selected);
634       } catch (Exception e) {
635         LOG.warn("Caught exception in action: " + this.getClass());
636         throw e;
637       } finally {
638         admin.close();
639       }
640       verifyTables();
641     }
642   }
643 
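  /** The set of Actions a Worker thread can randomly select and perform. */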
644   private enum ACTION {
645     CREATE_TABLE,
646     DISABLE_TABLE,
647     ENABLE_TABLE,
648     DELETE_TABLE,
649     ADD_COLUMNFAMILY,
650     DELETE_COLUMNFAMILY,
651     ALTER_FAMILYVERSIONS,
652     ALTER_FAMILYENCODING,
653     ADD_ROW
654   }
655 
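  /**
   * Worker thread that repeatedly performs a randomly selected ACTION until {@code running} is
   * cleared. The first exception encountered stops the thread and is saved so that
   * checkException() can fail the test with the offending Action.
   */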
656   private class Worker extends Thread {
657 
658     private Exception savedException;
659 
660     private ACTION action;
661 
662     @Override
663     public void run() {
664       while (running.get()) {
665         // select random action
666         ACTION selectedAction = ACTION.values()[RandomUtils.nextInt(ACTION.values().length)];
667         this.action = selectedAction;
668         LOG.info("Performing Action: " + selectedAction);
669 
670         try {
671           switch (selectedAction) {
672           case CREATE_TABLE:
673             // stop creating new tables in the later stage of the test to avoid too many empty
674             // tables
675             if (create_table.get()) {
676               new CreateTableAction().perform();
677             }
678             break;
679           case ADD_ROW:
680             new AddRowAction().perform();
681             break;
682           case DISABLE_TABLE:
683             new DisableTableAction().perform();
684             break;
685           case ENABLE_TABLE:
686             new EnableTableAction().perform();
687             break;
688           case DELETE_TABLE:
689             // reduce probability of deleting table to 20%
690             if (RandomUtils.nextInt(100) < 20) {
691               new DeleteTableAction().perform();
692             }
693             break;
694           case ADD_COLUMNFAMILY:
695             new AddColumnFamilyAction().perform();
696             break;
697           case DELETE_COLUMNFAMILY:
698             // reduce probability of deleting column family to 20%
699             if (RandomUtils.nextInt(100) < 20) {
700               new DeleteColumnFamilyAction().perform();
701             }
702             break;
703           case ALTER_FAMILYVERSIONS:
704             new AlterFamilyVersionsAction().perform();
705             break;
706           case ALTER_FAMILYENCODING:
707             new AlterFamilyEncodingAction().perform();
708             break;
709           }
710         } catch (Exception ex) {
711           this.savedException = ex;
712           return;
713         }
714       }
715       LOG.info(this.getName() + " stopped");
716     }
717 
718     public Exception getSavedException(){
719       return this.savedException;
720     }
721 
722     public ACTION getAction(){
723       return this.action;
724     }
725   }
726 
727   private void checkException(List<Worker> workers){
728     if (workers == null || workers.isEmpty())
729       return;
730     for (Worker worker : workers){
731       Exception e = worker.getSavedException();
732       if (e != null) {
733         LOG.error("Found exception in thread: " + worker.getName());
734         e.printStackTrace();
735       }
736       Assert.assertNull("Action failed: " + worker.getAction() + " in thread: "
737           + worker.getName(), e);
738     }
739   }
740 
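  /**
   * Launches numThreads Worker threads and lets them run for the configured runtime, halting new
   * table creation halfway through to avoid accumulating too many empty tables. After the workers
   * are stopped, table states are verified and hbck is run to check for inconsistencies.
   */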
741   private int runTest() throws Exception {
742     LOG.info("Starting the test");
743 
744     String runtimeKey = String.format(RUN_TIME_KEY, this.getClass().getSimpleName());
745     long runtime = util.getConfiguration().getLong(runtimeKey, DEFAULT_RUN_TIME);
746 
747     String numThreadKey = String.format(NUM_THREADS_KEY, this.getClass().getSimpleName());
748     numThreads = util.getConfiguration().getInt(numThreadKey, DEFAULT_NUM_THREADS);
749 
750     ArrayList<Worker> workers = new ArrayList<>();
751     for (int i = 0; i < numThreads; i++) {
752       checkException(workers);
753       Worker worker = new Worker();
754       LOG.info("Launching worker thread " + worker.getName());
755       workers.add(worker);
756       worker.start();
757     }
758 
759     Threads.sleep(runtime / 2);
760     LOG.info("Stopping creating new tables");
761     create_table.set(false);
762     Threads.sleep(runtime / 2);
763     LOG.info("Runtime is up");
764     running.set(false);
765 
766     checkException(workers);
767 
768     for (Worker worker : workers) {
769       worker.join();
770     }
771     LOG.info("All Worker threads stopped");
772 
773     // verify
774     LOG.info("Verify actions of all threads succeeded");
775     checkException(workers);
776     LOG.info("Verify states of all tables");
777     verifyTables();
778 
779     // RUN HBCK
780 
781     HBaseFsck hbck = null;
782     try {
783       LOG.info("Running hbck");
784       hbck = HbckTestingUtil.doFsck(util.getConfiguration(), false);
785       HbckTestingUtil.assertNoErrors(hbck);
786       LOG.info("Finished hbck");
787     } finally {
788       if (hbck != null) {
789         hbck.close();
790       }
791     }
792     return 0;
793   }
794 
795   @Override
796   public TableName getTablename() {
797     return null; // This test is not intended to run with stock Chaos Monkey
798   }
799 
800   @Override
801   protected Set<String> getColumnFamilies() {
802     return null; // This test is not intended to run with stock Chaos Monkey
803   }
804 
805   public static void main(String[] args) throws Exception {
806     Configuration conf = HBaseConfiguration.create();
807     IntegrationTestingUtility.setUseDistributedCluster(conf);
808     IntegrationTestDDLMasterFailover masterFailover = new IntegrationTestDDLMasterFailover();
809     Connection connection = null;
810     int ret = 1;
811     try {
812       // Initialize connection once, then pass to Actions
813       LOG.debug("Setting up connection ...");
814       connection = ConnectionFactory.createConnection(conf);
815       masterFailover.setConnection(connection);
816       ret = ToolRunner.run(conf, masterFailover, args);
817     } catch (IOException e){
818       LOG.fatal("Failed to establish connection. Aborting test ...", e);
819     } finally {
820       connection = masterFailover.getConnection();
821       if (connection != null){
822         connection.close();
823       }
824       System.exit(ret);
825     }
826   }
827 }