1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.replication;
20
21 import java.io.IOException;
22 import java.util.ArrayList;
23 import java.util.Collections;
24 import java.util.List;
25 import java.util.UUID;
26
27 import org.apache.commons.logging.Log;
28 import org.apache.commons.logging.LogFactory;
29 import org.apache.hadoop.hbase.classification.InterfaceAudience;
30 import org.apache.hadoop.hbase.Abortable;
31 import org.apache.hadoop.hbase.ServerName;
32 import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
33 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
34 import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
35 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
36 import org.apache.zookeeper.KeeperException;
37 import org.apache.zookeeper.KeeperException.AuthFailedException;
38 import org.apache.zookeeper.KeeperException.ConnectionLossException;
39 import org.apache.zookeeper.KeeperException.SessionExpiredException;
40
41
42
43
44
45 @InterfaceAudience.Private
46 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="MT_CORRECTNESS",
47 justification="Thinks zkw needs to be synchronized access but should be fine as is.")
48 public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint
49 implements Abortable {
50
51 private static final Log LOG = LogFactory.getLog(HBaseReplicationEndpoint.class);
52
53 private ZooKeeperWatcher zkw = null;
54
55 private List<ServerName> regionServers = new ArrayList<ServerName>(0);
56 private long lastRegionServerUpdate;
57
58 protected void disconnect() {
59 if (zkw != null) {
60 zkw.close();
61 }
62 }
63
64
65
66
67
68 protected void reconnect(KeeperException ke) {
69 if (ke instanceof ConnectionLossException || ke instanceof SessionExpiredException
70 || ke instanceof AuthFailedException) {
71 String clusterKey = ctx.getPeerConfig().getClusterKey();
72 LOG.warn("Lost the ZooKeeper connection for peer " + clusterKey, ke);
73 try {
74 reloadZkWatcher();
75 } catch (IOException io) {
76 LOG.warn("Creation of ZookeeperWatcher failed for peer " + clusterKey, io);
77 }
78 }
79 }
80
81 @Override
82 protected void doStart() {
83 try {
84 reloadZkWatcher();
85 notifyStarted();
86 } catch (IOException e) {
87 notifyFailed(e);
88 }
89 }
90
91 @Override
92 protected void doStop() {
93 disconnect();
94 notifyStopped();
95 }
96
97 @Override
98
99
100
101
102 public synchronized UUID getPeerUUID() {
103 UUID peerUUID = null;
104 try {
105 peerUUID = ZKClusterId.getUUIDForCluster(zkw);
106 } catch (KeeperException ke) {
107 reconnect(ke);
108 }
109 return peerUUID;
110 }
111
112
113
114
115
116 protected ZooKeeperWatcher getZkw() {
117 return zkw;
118 }
119
120
121
122
123
124 void reloadZkWatcher() throws IOException {
125 if (zkw != null) zkw.close();
126 zkw = new ZooKeeperWatcher(ctx.getConfiguration(),
127 "connection to cluster: " + ctx.getPeerId(), this);
128 getZkw().registerListener(new PeerRegionServerListener(this));
129 }
130
131 @Override
132 public void abort(String why, Throwable e) {
133 LOG.fatal("The HBaseReplicationEndpoint corresponding to peer " + ctx.getPeerId()
134 + " was aborted for the following reason(s):" + why, e);
135 }
136
137 @Override
138 public boolean isAborted() {
139
140 return false;
141 }
142
143
144
145
146
147
148 protected static List<ServerName> fetchSlavesAddresses(ZooKeeperWatcher zkw)
149 throws KeeperException {
150 List<String> children = ZKUtil.listChildrenAndWatchForNewChildren(zkw, zkw.rsZNode);
151 if (children == null) {
152 return Collections.emptyList();
153 }
154 List<ServerName> addresses = new ArrayList<ServerName>(children.size());
155 for (String child : children) {
156 addresses.add(ServerName.parseServerName(child));
157 }
158 return addresses;
159 }
160
161
162
163
164
165
166
167
168
169
170 public synchronized List<ServerName> getRegionServers() {
171 try {
172 setRegionServers(fetchSlavesAddresses(this.getZkw()));
173 } catch (KeeperException ke) {
174 if (LOG.isDebugEnabled()) {
175 LOG.debug("Fetch slaves addresses failed", ke);
176 }
177 reconnect(ke);
178 }
179 return regionServers;
180 }
181
182
183
184
185
186 public synchronized void setRegionServers(List<ServerName> regionServers) {
187 this.regionServers = regionServers;
188 lastRegionServerUpdate = System.currentTimeMillis();
189 }
190
191
192
193
194
195
196 public long getLastRegionServerUpdate() {
197 return lastRegionServerUpdate;
198 }
199
200
201
202
203 public static class PeerRegionServerListener extends ZooKeeperListener {
204
205 private final HBaseReplicationEndpoint replicationEndpoint;
206 private final String regionServerListNode;
207
208 public PeerRegionServerListener(HBaseReplicationEndpoint replicationPeer) {
209 super(replicationPeer.getZkw());
210 this.replicationEndpoint = replicationPeer;
211 this.regionServerListNode = replicationEndpoint.getZkw().rsZNode;
212 }
213
214 @Override
215 public synchronized void nodeChildrenChanged(String path) {
216 if (path.equals(regionServerListNode)) {
217 try {
218 LOG.info("Detected change to peer region servers, fetching updated list");
219 replicationEndpoint.setRegionServers(fetchSlavesAddresses(replicationEndpoint.getZkw()));
220 } catch (KeeperException e) {
221 LOG.fatal("Error reading slave addresses", e);
222 }
223 }
224 }
225 }
226 }