1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase.replication.regionserver;
21
22 import java.io.IOException;
23 import java.util.ArrayList;
24 import java.util.Collections;
25 import java.util.HashMap;
26 import java.util.HashSet;
27 import java.util.Iterator;
28 import java.util.List;
29 import java.util.Map;
30 import java.util.Random;
31 import java.util.Set;
32 import java.util.SortedMap;
33 import java.util.SortedSet;
34 import java.util.TreeSet;
35 import java.util.UUID;
36 import java.util.concurrent.ConcurrentHashMap;
37 import java.util.concurrent.CopyOnWriteArrayList;
38 import java.util.concurrent.LinkedBlockingQueue;
39 import java.util.concurrent.RejectedExecutionException;
40 import java.util.concurrent.ThreadPoolExecutor;
41 import java.util.concurrent.TimeUnit;
42
43 import org.apache.commons.logging.Log;
44 import org.apache.commons.logging.LogFactory;
45 import org.apache.hadoop.conf.Configuration;
46 import org.apache.hadoop.fs.FileSystem;
47 import org.apache.hadoop.fs.Path;
48 import org.apache.hadoop.hbase.Server;
49 import org.apache.hadoop.hbase.TableDescriptors;
50 import org.apache.hadoop.hbase.classification.InterfaceAudience;
51 import org.apache.hadoop.hbase.regionserver.HRegionServer;
52 import org.apache.hadoop.hbase.regionserver.RegionServerCoprocessorHost;
53 import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
54 import org.apache.hadoop.hbase.replication.ReplicationException;
55 import org.apache.hadoop.hbase.replication.ReplicationListener;
56 import org.apache.hadoop.hbase.replication.ReplicationPeer;
57 import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
58 import org.apache.hadoop.hbase.replication.ReplicationPeers;
59 import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
60 import org.apache.hadoop.hbase.replication.ReplicationQueues;
61 import org.apache.hadoop.hbase.replication.ReplicationTracker;
62 import org.apache.hadoop.hbase.wal.DefaultWALProvider;
63
64 import com.google.common.util.concurrent.ThreadFactoryBuilder;
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82 @InterfaceAudience.Private
83 public class ReplicationSourceManager implements ReplicationListener {
84 private static final Log LOG =
85 LogFactory.getLog(ReplicationSourceManager.class);
86
87 private final List<ReplicationSourceInterface> sources;
88
89 private final List<ReplicationSourceInterface> oldsources;
90 private final ReplicationQueues replicationQueues;
91 private final ReplicationTracker replicationTracker;
92 private final ReplicationPeers replicationPeers;
93
94 private final UUID clusterId;
95
96 private final Server server;
97
98
99 private final Map<String, Map<String, SortedSet<String>>> walsById;
100
101 private final Map<String, Map<String, SortedSet<String>>> walsByIdRecoveredQueues;
102 private final Configuration conf;
103 private final FileSystem fs;
104
105 private Set<Path> latestPaths;
106
107 private final Path logDir;
108
109 private final Path oldLogDir;
110
111 private final long sleepBeforeFailover;
112
113 private final ThreadPoolExecutor executor;
114
115 private final Random rand;
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130 public ReplicationSourceManager(final ReplicationQueues replicationQueues,
131 final ReplicationPeers replicationPeers, final ReplicationTracker replicationTracker,
132 final Configuration conf, final Server server, final FileSystem fs, final Path logDir,
133 final Path oldLogDir, final UUID clusterId) {
134
135
136 this.sources = new CopyOnWriteArrayList<ReplicationSourceInterface>();
137 this.replicationQueues = replicationQueues;
138 this.replicationPeers = replicationPeers;
139 this.replicationTracker = replicationTracker;
140 this.server = server;
141 this.walsById = new HashMap<String, Map<String, SortedSet<String>>>();
142 this.walsByIdRecoveredQueues = new ConcurrentHashMap<String, Map<String, SortedSet<String>>>();
143 this.oldsources = new CopyOnWriteArrayList<ReplicationSourceInterface>();
144 this.conf = conf;
145 this.fs = fs;
146 this.logDir = logDir;
147 this.oldLogDir = oldLogDir;
148 this.sleepBeforeFailover =
149 conf.getLong("replication.sleep.before.failover", 30000);
150 this.clusterId = clusterId;
151 this.replicationTracker.registerListener(this);
152 this.replicationPeers.getAllPeerIds();
153
154
155 int nbWorkers = conf.getInt("replication.executor.workers", 1);
156
157
158 this.executor = new ThreadPoolExecutor(nbWorkers, nbWorkers,
159 100, TimeUnit.MILLISECONDS,
160 new LinkedBlockingQueue<Runnable>());
161 ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
162 tfb.setNameFormat("ReplicationExecutor-%d");
163 tfb.setDaemon(true);
164 this.executor.setThreadFactory(tfb.build());
165 this.rand = new Random();
166 this.latestPaths = Collections.synchronizedSet(new HashSet<Path>());
167 }
168
169
170
171
172
173
174
175
176
177
178
179
180 public void logPositionAndCleanOldLogs(Path log, String id, long position,
181 boolean queueRecovered, boolean holdLogInZK) {
182 String fileName = log.getName();
183 this.replicationQueues.setLogPosition(id, fileName, position);
184 if (holdLogInZK) {
185 return;
186 }
187 cleanOldLogs(fileName, id, queueRecovered);
188 }
189
190
191
192
193
194
195
196
197 public void cleanOldLogs(String key, String id, boolean queueRecovered) {
198 String logPrefix = DefaultWALProvider.getWALPrefixFromWALName(key);
199 if (queueRecovered) {
200 SortedSet<String> wals = walsByIdRecoveredQueues.get(id).get(logPrefix);
201 if (wals != null && !wals.first().equals(key)) {
202 cleanOldLogs(wals, key, id);
203 }
204 } else {
205 synchronized (this.walsById) {
206 SortedSet<String> wals = walsById.get(id).get(logPrefix);
207 if (wals != null && !wals.first().equals(key)) {
208 cleanOldLogs(wals, key, id);
209 }
210 }
211 }
212 }
213
214 private void cleanOldLogs(SortedSet<String> wals, String key, String id) {
215 SortedSet<String> walSet = wals.headSet(key);
216 LOG.debug("Removing " + walSet.size() + " logs in the list: " + walSet);
217 for (String wal : walSet) {
218 this.replicationQueues.removeLog(id, wal);
219 }
220 walSet.clear();
221 }
222
223
224
225
226
227 protected void init() throws IOException, ReplicationException {
228 for (String id : this.replicationPeers.getPeerIds()) {
229 addSource(id);
230 }
231 List<String> currentReplicators = this.replicationQueues.getListOfReplicators();
232 if (currentReplicators == null || currentReplicators.size() == 0) {
233 return;
234 }
235 List<String> otherRegionServers = replicationTracker.getListOfRegionServers();
236 LOG.info("Current list of replicators: " + currentReplicators + " other RSs: "
237 + otherRegionServers);
238
239
240 for (String rs : currentReplicators) {
241 if (!otherRegionServers.contains(rs)) {
242 transferQueues(rs);
243 }
244 }
245 }
246
247
248
249
250
251
252
253
254 protected ReplicationSourceInterface addSource(String id) throws IOException,
255 ReplicationException {
256 ReplicationPeerConfig peerConfig = replicationPeers.getReplicationPeerConfig(id);
257 ReplicationPeer peer = replicationPeers.getPeer(id);
258 ReplicationSourceInterface src =
259 getReplicationSource(this.conf, this.fs, this, this.replicationQueues,
260 this.replicationPeers, server, id, this.clusterId, peerConfig, peer);
261 synchronized (this.walsById) {
262 this.sources.add(src);
263 Map<String, SortedSet<String>> walsByGroup = new HashMap<String, SortedSet<String>>();
264 this.walsById.put(id, walsByGroup);
265
266 synchronized (latestPaths) {
267 if (this.latestPaths.size() > 0) {
268 for (Path logPath : latestPaths) {
269 String name = logPath.getName();
270 String walPrefix = DefaultWALProvider.getWALPrefixFromWALName(name);
271 SortedSet<String> logs = new TreeSet<String>();
272 logs.add(name);
273 walsByGroup.put(walPrefix, logs);
274 try {
275 this.replicationQueues.addLog(id, name);
276 } catch (ReplicationException e) {
277 String message =
278 "Cannot add log to queue when creating a new source, queueId=" + id
279 + ", filename=" + name;
280 server.stop(message);
281 throw e;
282 }
283 src.enqueueLog(logPath);
284 }
285 }
286 }
287 }
288 src.startup();
289 return src;
290 }
291
292
293
294
295
296 public void deleteSource(String peerId, boolean closeConnection) {
297 this.replicationQueues.removeQueue(peerId);
298 if (closeConnection) {
299 this.replicationPeers.peerRemoved(peerId);
300 }
301 }
302
303
304
305
306 public void join() {
307 this.executor.shutdown();
308 if (this.sources.size() == 0) {
309 this.replicationQueues.removeAllQueues();
310 }
311 for (ReplicationSourceInterface source : this.sources) {
312 source.terminate("Region server is closing");
313 }
314 }
315
316
317
318
319
320 protected Map<String, Map<String, SortedSet<String>>> getWALs() {
321 return Collections.unmodifiableMap(walsById);
322 }
323
324
325
326
327
328 protected Map<String, Map<String, SortedSet<String>>> getWalsByIdRecoveredQueues() {
329 return Collections.unmodifiableMap(walsByIdRecoveredQueues);
330 }
331
332
333
334
335
336 public List<ReplicationSourceInterface> getSources() {
337 return this.sources;
338 }
339
340
341
342
343
344 public List<ReplicationSourceInterface> getOldSources() {
345 return this.oldsources;
346 }
347
348 void preLogRoll(Path newLog) throws IOException {
349 recordLog(newLog);
350 String logName = newLog.getName();
351 String logPrefix = DefaultWALProvider.getWALPrefixFromWALName(logName);
352 synchronized (latestPaths) {
353 Iterator<Path> iterator = latestPaths.iterator();
354 while (iterator.hasNext()) {
355 Path path = iterator.next();
356 if (path.getName().contains(logPrefix)) {
357 iterator.remove();
358 break;
359 }
360 }
361 this.latestPaths.add(newLog);
362 }
363 }
364
365
366
367
368
369
370
371 private void recordLog(Path logPath) throws IOException {
372 String logName = logPath.getName();
373 String logPrefix = DefaultWALProvider.getWALPrefixFromWALName(logName);
374
375 synchronized (replicationPeers) {
376
377 for (String id : replicationPeers.getPeerIds()) {
378 try {
379 this.replicationQueues.addLog(id, logName);
380 } catch (ReplicationException e) {
381 throw new IOException("Cannot add log to replication queue"
382 + " when creating a new source, queueId=" + id + ", filename=" + logName, e);
383 }
384 }
385 }
386
387 synchronized (walsById) {
388 for (Map.Entry<String, Map<String, SortedSet<String>>> entry : this.walsById.entrySet()) {
389 String peerId = entry.getKey();
390 Map<String, SortedSet<String>> walsByPrefix = entry.getValue();
391 boolean existingPrefix = false;
392 for (Map.Entry<String, SortedSet<String>> walsEntry : walsByPrefix.entrySet()) {
393 SortedSet<String> wals = walsEntry.getValue();
394 if (this.sources.isEmpty()) {
395
396
397 wals.clear();
398 }
399 if (logPrefix.equals(walsEntry.getKey())) {
400 wals.add(logName);
401 existingPrefix = true;
402 }
403 }
404 if (!existingPrefix) {
405
406 LOG.debug("Start tracking logs for wal group " + logPrefix + " for peer " + peerId);
407 SortedSet<String> wals = new TreeSet<String>();
408 wals.add(logName);
409 walsByPrefix.put(logPrefix, wals);
410 }
411 }
412 }
413 }
414
415 void postLogRoll(Path newLog) throws IOException {
416
417 for (ReplicationSourceInterface source : this.sources) {
418 source.enqueueLog(newLog);
419 }
420 }
421
422
423
424
425
426
427
428
429
430
431
432 protected ReplicationSourceInterface getReplicationSource(final Configuration conf,
433 final FileSystem fs, final ReplicationSourceManager manager,
434 final ReplicationQueues replicationQueues, final ReplicationPeers replicationPeers,
435 final Server server, final String peerId, final UUID clusterId,
436 final ReplicationPeerConfig peerConfig, final ReplicationPeer replicationPeer)
437 throws IOException {
438 RegionServerCoprocessorHost rsServerHost = null;
439 TableDescriptors tableDescriptors = null;
440 if (server instanceof HRegionServer) {
441 rsServerHost = ((HRegionServer) server).getRegionServerCoprocessorHost();
442 tableDescriptors = ((HRegionServer) server).getTableDescriptors();
443 }
444 ReplicationSourceInterface src;
445 try {
446 @SuppressWarnings("rawtypes")
447 Class c = Class.forName(conf.get("replication.replicationsource.implementation",
448 ReplicationSource.class.getCanonicalName()));
449 src = (ReplicationSourceInterface) c.newInstance();
450 } catch (Exception e) {
451 LOG.warn("Passed replication source implementation throws errors, " +
452 "defaulting to ReplicationSource", e);
453 src = new ReplicationSource();
454 }
455
456 ReplicationEndpoint replicationEndpoint = null;
457 try {
458 String replicationEndpointImpl = peerConfig.getReplicationEndpointImpl();
459 if (replicationEndpointImpl == null) {
460
461 replicationEndpointImpl = HBaseInterClusterReplicationEndpoint.class.getName();
462 }
463 @SuppressWarnings("rawtypes")
464 Class c = Class.forName(replicationEndpointImpl);
465 replicationEndpoint = (ReplicationEndpoint) c.newInstance();
466 if(rsServerHost != null) {
467 ReplicationEndpoint newReplicationEndPoint = rsServerHost
468 .postCreateReplicationEndPoint(replicationEndpoint);
469 if(newReplicationEndPoint != null) {
470
471 replicationEndpoint = newReplicationEndPoint;
472 }
473 }
474 } catch (Exception e) {
475 LOG.warn("Passed replication endpoint implementation throws errors"
476 + " while initializing ReplicationSource for peer: " + peerId, e);
477 throw new IOException(e);
478 }
479
480 MetricsSource metrics = new MetricsSource(peerId);
481
482 src.init(conf, fs, manager, replicationQueues, replicationPeers, server, peerId,
483 clusterId, replicationEndpoint, metrics);
484
485
486 replicationEndpoint.init(new ReplicationEndpoint.Context(replicationPeer.getConfiguration(),
487 fs, peerConfig, peerId, clusterId, replicationPeer, metrics, tableDescriptors));
488
489 return src;
490 }
491
492
493
494
495
496
497
498
499
500 private void transferQueues(String rsZnode) {
501 NodeFailoverWorker transfer =
502 new NodeFailoverWorker(rsZnode, this.replicationQueues, this.replicationPeers,
503 this.clusterId);
504 try {
505 this.executor.execute(transfer);
506 } catch (RejectedExecutionException ex) {
507 LOG.info("Cancelling the transfer of " + rsZnode + " because of " + ex.getMessage());
508 }
509 }
510
511
512
513
514
515 public void closeRecoveredQueue(ReplicationSourceInterface src) {
516 LOG.info("Done with the recovered queue " + src.getPeerClusterZnode());
517 this.oldsources.remove(src);
518 deleteSource(src.getPeerClusterZnode(), false);
519 this.walsByIdRecoveredQueues.remove(src.getPeerClusterZnode());
520 }
521
522
523
524
525
526
527 public void removePeer(String id) {
528 LOG.info("Closing the following queue " + id + ", currently have "
529 + sources.size() + " and another "
530 + oldsources.size() + " that were recovered");
531 String terminateMessage = "Replication stream was removed by a user";
532 List<ReplicationSourceInterface> srcToRemove = new ArrayList<ReplicationSourceInterface>();
533 List<ReplicationSourceInterface> oldSourcesToDelete =
534 new ArrayList<ReplicationSourceInterface>();
535
536 for (ReplicationSourceInterface src : oldsources) {
537 if (id.equals(src.getPeerClusterId())) {
538 oldSourcesToDelete.add(src);
539 }
540 }
541 for (ReplicationSourceInterface src : oldSourcesToDelete) {
542 src.terminate(terminateMessage);
543 closeRecoveredQueue((src));
544 }
545 LOG.info("Number of deleted recovered sources for " + id + ": "
546 + oldSourcesToDelete.size());
547
548 synchronized (this.replicationPeers) {
549
550 for (ReplicationSourceInterface src : this.sources) {
551 if (id.equals(src.getPeerClusterId())) {
552 srcToRemove.add(src);
553 }
554 }
555 if (srcToRemove.size() == 0) {
556 LOG.error("The queue we wanted to close is missing " + id);
557 return;
558 }
559 for (ReplicationSourceInterface toRemove : srcToRemove) {
560 toRemove.terminate(terminateMessage);
561 this.sources.remove(toRemove);
562 }
563 deleteSource(id, true);
564 }
565 }
566
567 @Override
568 public void regionServerRemoved(String regionserver) {
569 transferQueues(regionserver);
570 }
571
572 @Override
573 public void peerRemoved(String peerId) {
574 removePeer(peerId);
575 }
576
577 @Override
578 public void peerListChanged(List<String> peerIds) {
579 for (String id : peerIds) {
580 try {
581 boolean added = this.replicationPeers.peerAdded(id);
582 if (added) {
583 addSource(id);
584 }
585 } catch (Exception e) {
586 LOG.error("Error while adding a new peer", e);
587 }
588 }
589 }
590
591
592
593
594
595 class NodeFailoverWorker extends Thread {
596
597 private String rsZnode;
598 private final ReplicationQueues rq;
599 private final ReplicationPeers rp;
600 private final UUID clusterId;
601
602
603
604
605
606 public NodeFailoverWorker(String rsZnode, final ReplicationQueues replicationQueues,
607 final ReplicationPeers replicationPeers, final UUID clusterId) {
608 super("Failover-for-"+rsZnode);
609 this.rsZnode = rsZnode;
610 this.rq = replicationQueues;
611 this.rp = replicationPeers;
612 this.clusterId = clusterId;
613 }
614
615 @Override
616 public void run() {
617 if (this.rq.isThisOurZnode(rsZnode)) {
618 return;
619 }
620
621
622 try {
623 Thread.sleep(sleepBeforeFailover + (long) (rand.nextFloat() * sleepBeforeFailover));
624 } catch (InterruptedException e) {
625 LOG.warn("Interrupted while waiting before transferring a queue.");
626 Thread.currentThread().interrupt();
627 }
628
629 if (server.isStopped()) {
630 LOG.info("Not transferring queue since we are shutting down");
631 return;
632 }
633 SortedMap<String, SortedSet<String>> newQueues = null;
634
635 newQueues = this.rq.claimQueues(rsZnode);
636
637
638 if (newQueues.isEmpty()) {
639
640
641 return;
642 }
643
644 for (Map.Entry<String, SortedSet<String>> entry : newQueues.entrySet()) {
645 String peerId = entry.getKey();
646 SortedSet<String> walsSet = entry.getValue();
647 try {
648
649 ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(peerId);
650 String actualPeerId = replicationQueueInfo.getPeerId();
651 ReplicationPeer peer = replicationPeers.getPeer(actualPeerId);
652 ReplicationPeerConfig peerConfig = null;
653 try {
654 peerConfig = replicationPeers.getReplicationPeerConfig(actualPeerId);
655 } catch (ReplicationException ex) {
656 LOG.warn("Received exception while getting replication peer config, skipping replay"
657 + ex);
658 }
659 if (peer == null || peerConfig == null) {
660 LOG.warn("Skipping failover for peer:" + actualPeerId + " of node" + rsZnode);
661 continue;
662 }
663
664 Map<String, SortedSet<String>> walsByGroup = new HashMap<String, SortedSet<String>>();
665 walsByIdRecoveredQueues.put(peerId, walsByGroup);
666 for (String wal : walsSet) {
667 String walPrefix = DefaultWALProvider.getWALPrefixFromWALName(wal);
668 SortedSet<String> wals = walsByGroup.get(walPrefix);
669 if (wals == null) {
670 wals = new TreeSet<String>();
671 walsByGroup.put(walPrefix, wals);
672 }
673 wals.add(wal);
674 }
675
676
677 ReplicationSourceInterface src =
678 getReplicationSource(conf, fs, ReplicationSourceManager.this, this.rq, this.rp,
679 server, peerId, this.clusterId, peerConfig, peer);
680 if (!this.rp.getPeerIds().contains((src.getPeerClusterId()))) {
681 src.terminate("Recovered queue doesn't belong to any current peer");
682 break;
683 }
684 oldsources.add(src);
685 for (String wal : walsSet) {
686 src.enqueueLog(new Path(oldLogDir, wal));
687 }
688 src.startup();
689 } catch (IOException e) {
690
691 LOG.error("Failed creating a source", e);
692 }
693 }
694 }
695 }
696
697
698
699
700
701 public Path getOldLogDir() {
702 return this.oldLogDir;
703 }
704
705
706
707
708
709 public Path getLogDir() {
710 return this.logDir;
711 }
712
713
714
715
716
717 public FileSystem getFs() {
718 return this.fs;
719 }
720
721
722
723
724 public String getStats() {
725 StringBuffer stats = new StringBuffer();
726 for (ReplicationSourceInterface source : sources) {
727 stats.append("Normal source for cluster " + source.getPeerClusterId() + ": ");
728 stats.append(source.getStats() + "\n");
729 }
730 for (ReplicationSourceInterface oldSource : oldsources) {
731 stats.append("Recovered source for cluster/machine(s) " + oldSource.getPeerClusterId()+": ");
732 stats.append(oldSource.getStats()+ "\n");
733 }
734 return stats.toString();
735 }
736 }