1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.replication;
20
21 import org.apache.commons.logging.Log;
22 import org.apache.commons.logging.LogFactory;
23 import org.apache.hadoop.conf.Configuration;
24 import org.apache.hadoop.hbase.HBaseConfiguration;
25 import org.apache.hadoop.hbase.HBaseTestingUtility;
26 import org.apache.hadoop.hbase.HColumnDescriptor;
27 import org.apache.hadoop.hbase.HConstants;
28 import org.apache.hadoop.hbase.HTableDescriptor;
29 import org.apache.hadoop.hbase.TableName;
30 import org.apache.hadoop.hbase.client.Admin;
31 import org.apache.hadoop.hbase.client.Connection;
32 import org.apache.hadoop.hbase.client.ConnectionFactory;
33 import org.apache.hadoop.hbase.client.Table;
34 import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
35 import org.apache.hadoop.hbase.util.Bytes;
36 import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
37 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
38 import org.junit.AfterClass;
39 import org.junit.BeforeClass;
40
41
42
43
44
45
46
47 public class TestReplicationBase {
48
49
50
51
52
53 private static final Log LOG = LogFactory.getLog(TestReplicationBase.class);
54
55 protected static Configuration conf1 = HBaseConfiguration.create();
56 protected static Configuration conf2;
57 protected static Configuration CONF_WITH_LOCALFS;
58
59 protected static ZooKeeperWatcher zkw1;
60 protected static ZooKeeperWatcher zkw2;
61
62 protected static ReplicationAdmin admin;
63
64 protected static Table htable1;
65 protected static Table htable2;
66
67 protected static HBaseTestingUtility utility1;
68 protected static HBaseTestingUtility utility2;
69 protected static final int NB_ROWS_IN_BATCH = 100;
70 protected static final int NB_ROWS_IN_BIG_BATCH =
71 NB_ROWS_IN_BATCH * 10;
72 protected static final long SLEEP_TIME = 500;
73 protected static final int NB_RETRIES = 10;
74
75 protected static final TableName tableName = TableName.valueOf("test");
76 protected static final byte[] famName = Bytes.toBytes("f");
77 protected static final byte[] row = Bytes.toBytes("row");
78 protected static final byte[] noRepfamName = Bytes.toBytes("norep");
79
80
81
82
83 @BeforeClass
84 public static void setUpBeforeClass() throws Exception {
85 conf1.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/1");
86
87
88
89
90 conf1.setInt("replication.source.size.capacity", 102400);
91 conf1.setLong("replication.source.sleepforretries", 100);
92 conf1.setInt("hbase.regionserver.maxlogs", 10);
93 conf1.setLong("hbase.master.logcleaner.ttl", 10);
94 conf1.setInt("zookeeper.recovery.retry", 1);
95 conf1.setInt("zookeeper.recovery.retry.intervalmill", 10);
96 conf1.setBoolean(HConstants.REPLICATION_ENABLE_KEY, HConstants.REPLICATION_ENABLE_DEFAULT);
97 conf1.setBoolean("dfs.support.append", true);
98 conf1.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100);
99 conf1.setInt("replication.stats.thread.period.seconds", 5);
100 conf1.setBoolean("hbase.tests.use.shortcircuit.reads", false);
101 conf1.setLong("replication.sleep.before.failover", 2000);
102 conf1.setInt("replication.source.maxretriesmultiplier", 10);
103 conf1.setFloat("replication.source.ratio", 1.0f);
104
105 utility1 = new HBaseTestingUtility(conf1);
106 utility1.startMiniZKCluster();
107 MiniZooKeeperCluster miniZK = utility1.getZkCluster();
108
109
110 conf1 = utility1.getConfiguration();
111 zkw1 = new ZooKeeperWatcher(conf1, "cluster1", null, true);
112 admin = new ReplicationAdmin(conf1);
113 LOG.info("Setup first Zk");
114
115
116 conf2 = HBaseConfiguration.create(conf1);
117 conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2");
118 conf2.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 6);
119 conf2.setBoolean(HConstants.REPLICATION_ENABLE_KEY, HConstants.REPLICATION_ENABLE_DEFAULT);
120 conf2.setBoolean("dfs.support.append", true);
121 conf2.setBoolean("hbase.tests.use.shortcircuit.reads", false);
122
123 utility2 = new HBaseTestingUtility(conf2);
124 utility2.setZkCluster(miniZK);
125 zkw2 = new ZooKeeperWatcher(conf2, "cluster2", null, true);
126
127 admin.addPeer("2", utility2.getClusterKey());
128
129 LOG.info("Setup second Zk");
130 CONF_WITH_LOCALFS = HBaseConfiguration.create(conf1);
131 utility1.startMiniCluster(2);
132
133
134 utility2.startMiniCluster(4);
135
136 HTableDescriptor table = new HTableDescriptor(tableName);
137 HColumnDescriptor fam = new HColumnDescriptor(famName);
138 fam.setMaxVersions(100);
139 fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
140 table.addFamily(fam);
141 fam = new HColumnDescriptor(noRepfamName);
142 table.addFamily(fam);
143 Connection connection1 = ConnectionFactory.createConnection(conf1);
144 Connection connection2 = ConnectionFactory.createConnection(conf2);
145 try (Admin admin1 = connection1.getAdmin()) {
146 admin1.createTable(table, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);
147 }
148 try (Admin admin2 = connection2.getAdmin()) {
149 admin2.createTable(table, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);
150 }
151 utility1.waitUntilAllRegionsAssigned(tableName);
152 utility2.waitUntilAllRegionsAssigned(tableName);
153 htable1 = connection1.getTable(tableName);
154 htable1.setWriteBufferSize(1024);
155 htable2 = connection2.getTable(tableName);
156 }
157
158
159
160
161 @AfterClass
162 public static void tearDownAfterClass() throws Exception {
163 htable2.close();
164 htable1.close();
165 admin.close();
166 utility2.shutdownMiniCluster();
167 utility1.shutdownMiniCluster();
168 }
169 }