1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.replication;
20
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.ClusterId;
import org.apache.hadoop.hbase.CoordinatedStateManager;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
50
51
52
53
54
55
56
57
58 @Category(MediumTests.class)
59 public class TestReplicationTrackerZKImpl {
60
61 private static final Log LOG = LogFactory.getLog(TestReplicationTrackerZKImpl.class);
62
63 private static Configuration conf;
64 private static HBaseTestingUtility utility;
65
66
67 private ZooKeeperWatcher zkw;
68 private ReplicationPeers rp;
69 private ReplicationTracker rt;
70 private AtomicInteger rsRemovedCount;
71 private String rsRemovedData;
72 private AtomicInteger plChangedCount;
73 private List<String> plChangedData;
74 private AtomicInteger peerRemovedCount;
75 private String peerRemovedData;
76
77 @BeforeClass
78 public static void setUpBeforeClass() throws Exception {
79 utility = new HBaseTestingUtility();
80 utility.startMiniZKCluster();
81 conf = utility.getConfiguration();
82 ZooKeeperWatcher zk = HBaseTestingUtility.getZooKeeperWatcher(utility);
83 ZKUtil.createWithParents(zk, zk.rsZNode);
84 }
85
86 @Before
87 public void setUp() throws Exception {
88 zkw = HBaseTestingUtility.getZooKeeperWatcher(utility);
89 String fakeRs1 = ZKUtil.joinZNode(zkw.rsZNode, "hostname1.example.org:1234");
90 try {
91 ZKClusterId.setClusterId(zkw, new ClusterId());
92 rp = ReplicationFactory.getReplicationPeers(zkw, conf, zkw);
93 rp.init();
94 rt = ReplicationFactory.getReplicationTracker(zkw, rp, conf, zkw, new DummyServer(fakeRs1));
95 } catch (Exception e) {
96 fail("Exception during test setup: " + e);
97 }
98 rsRemovedCount = new AtomicInteger(0);
99 rsRemovedData = "";
100 plChangedCount = new AtomicInteger(0);
101 plChangedData = new ArrayList<String>();
102 peerRemovedCount = new AtomicInteger(0);
103 peerRemovedData = "";
104 }
105
106 @AfterClass
107 public static void tearDownAfterClass() throws Exception {
108 utility.shutdownMiniZKCluster();
109 }
110
111 @Test
112 public void testGetListOfRegionServers() throws Exception {
113
114 assertEquals(0, rt.getListOfRegionServers().size());
115
116
117 ZKUtil.createWithParents(zkw, ZKUtil.joinZNode(zkw.rsZNode, "hostname1.example.org:1234"));
118 assertEquals(1, rt.getListOfRegionServers().size());
119
120
121 ZKUtil.createWithParents(zkw, ZKUtil.joinZNode(zkw.rsZNode, "hostname2.example.org:1234"));
122 assertEquals(2, rt.getListOfRegionServers().size());
123
124
125 ZKUtil.deleteNode(zkw, ZKUtil.joinZNode(zkw.rsZNode, "hostname2.example.org:1234"));
126 assertEquals(1, rt.getListOfRegionServers().size());
127
128
129 ZKUtil.deleteNode(zkw, ZKUtil.joinZNode(zkw.rsZNode, "hostname1.example.org:1234"));
130 assertEquals(0, rt.getListOfRegionServers().size());
131 }
132
133 @Test(timeout = 30000)
134 public void testRegionServerRemovedEvent() throws Exception {
135 ZKUtil.createAndWatch(zkw, ZKUtil.joinZNode(zkw.rsZNode, "hostname2.example.org:1234"),
136 HConstants.EMPTY_BYTE_ARRAY);
137 rt.registerListener(new DummyReplicationListener());
138
139 ZKUtil.deleteNode(zkw, ZKUtil.joinZNode(zkw.rsZNode, "hostname2.example.org:1234"));
140
141 while (rsRemovedCount.get() < 1) {
142 Thread.sleep(5);
143 }
144 assertEquals("hostname2.example.org:1234", rsRemovedData);
145 }
146
147 @Test(timeout = 30000)
148 public void testPeerRemovedEvent() throws Exception {
149 rp.addPeer("5", new ReplicationPeerConfig().setClusterKey(utility.getClusterKey()), null);
150 rt.registerListener(new DummyReplicationListener());
151 rp.removePeer("5");
152
153 while (peerRemovedCount.get() < 1) {
154 Thread.sleep(5);
155 }
156 assertEquals("5", peerRemovedData);
157 }
158
159 @Test(timeout = 30000)
160 public void testPeerListChangedEvent() throws Exception {
161
162 rp.addPeer("5", new ReplicationPeerConfig().setClusterKey(utility.getClusterKey()), null);
163 zkw.getRecoverableZooKeeper().getZooKeeper().getChildren("/hbase/replication/peers/5", true);
164 rt.registerListener(new DummyReplicationListener());
165 rp.disablePeer("5");
166 int tmp = plChangedCount.get();
167 LOG.info("Peer count=" + tmp);
168 ZKUtil.deleteNode(zkw, "/hbase/replication/peers/5/peer-state");
169
170 while (plChangedCount.get() <= tmp) {
171 Thread.sleep(100);
172 LOG.info("Peer count=" + tmp);
173 }
174 assertEquals(1, plChangedData.size());
175 assertTrue(plChangedData.contains("5"));
176
177
178
179 rp.removePeer("5");
180 }
181
182 @Test(timeout = 30000)
183 public void testPeerNameControl() throws Exception {
184 int exists = 0;
185 int hyphen = 0;
186 rp.addPeer("6", new ReplicationPeerConfig().setClusterKey(utility.getClusterKey()), null);
187
188 try{
189 rp.addPeer("6", new ReplicationPeerConfig().setClusterKey(utility.getClusterKey()), null);
190 }catch(IllegalArgumentException e){
191 exists++;
192 }
193
194 try{
195 rp.addPeer("6-ec2", new ReplicationPeerConfig().setClusterKey(utility.getClusterKey()), null);
196 }catch(IllegalArgumentException e){
197 hyphen++;
198 }
199 assertEquals(1, exists);
200 assertEquals(1, hyphen);
201
202
203 rp.removePeer("6");
204 }
205
206 private class DummyReplicationListener implements ReplicationListener {
207
208 @Override
209 public void regionServerRemoved(String regionServer) {
210 rsRemovedData = regionServer;
211 rsRemovedCount.getAndIncrement();
212 LOG.debug("Received regionServerRemoved event: " + regionServer);
213 }
214
215 @Override
216 public void peerRemoved(String peerId) {
217 peerRemovedData = peerId;
218 peerRemovedCount.getAndIncrement();
219 LOG.debug("Received peerRemoved event: " + peerId);
220 }
221
222 @Override
223 public void peerListChanged(List<String> peerIds) {
224 plChangedData.clear();
225 plChangedData.addAll(peerIds);
226 int count = plChangedCount.getAndIncrement();
227 LOG.debug("Received peerListChanged event " + count);
228 }
229 }
230
231 private class DummyServer implements Server {
232 private String serverName;
233 private boolean isAborted = false;
234 private boolean isStopped = false;
235
236 public DummyServer(String serverName) {
237 this.serverName = serverName;
238 }
239
240 @Override
241 public Configuration getConfiguration() {
242 return conf;
243 }
244
245 @Override
246 public ZooKeeperWatcher getZooKeeper() {
247 return zkw;
248 }
249
250 @Override
251 public CoordinatedStateManager getCoordinatedStateManager() {
252 return null;
253 }
254
255 @Override
256 public ClusterConnection getConnection() {
257 return null;
258 }
259
260 @Override
261 public MetaTableLocator getMetaTableLocator() {
262 return null;
263 }
264
265 @Override
266 public ServerName getServerName() {
267 return ServerName.valueOf(this.serverName);
268 }
269
270 @Override
271 public void abort(String why, Throwable e) {
272 LOG.info("Aborting " + serverName);
273 this.isAborted = true;
274 }
275
276 @Override
277 public boolean isAborted() {
278 return this.isAborted;
279 }
280
281 @Override
282 public void stop(String why) {
283 this.isStopped = true;
284 }
285
286 @Override
287 public boolean isStopped() {
288 return this.isStopped;
289 }
290
291 @Override
292 public ChoreService getChoreService() {
293 return null;
294 }
295 }
296 }