/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import java.io.EOFException;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.wal.DefaultWALProvider;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.replication.ChainWALEntryFilter;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.ReplicationQueues;
import org.apache.hadoop.hbase.replication.SystemTableWALEntryFilter;
import org.apache.hadoop.hbase.replication.WALEntryFilter;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.LeaseNotRecoveredException;
import org.apache.hadoop.hbase.util.Threads;

import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.Service;
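
/**
 * Handles the source side of a replication stream: it tails this region server's WALs, reads
 * the edits, filters them, and hands them to a ReplicationEndpoint for shipping to a single
 * peer cluster. One worker thread is spawned per WAL group so that groups can be replicated
 * in parallel while edits within a group stay ordered.
 */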
@InterfaceAudience.Private
public class ReplicationSource extends Thread
    implements ReplicationSourceInterface {

  private static final Log LOG = LogFactory.getLog(ReplicationSource.class);

  // Queues of WALs to process, one queue per WAL group (keyed by the WAL name prefix)
  private Map<String, PriorityBlockingQueue<Path>> queues =
      new HashMap<String, PriorityBlockingQueue<Path>>();
  // Initial capacity of each per-group queue
  private int queueSizePerGroup;
  private ReplicationQueues replicationQueues;
  private ReplicationPeers replicationPeers;

  private Configuration conf;
  private ReplicationQueueInfo replicationQueueInfo;
  // id of the peer cluster this source replicates to
  private String peerId;
  // The manager of all sources to which we report our progress
  private ReplicationSourceManager manager;
  // Should we stop everything?
  private Stoppable stopper;
  // How long should we sleep for each retry
  private long sleepForRetries;
  // Max size in bytes of entries we can read before shipping them
  private long replicationQueueSizeCapacity;
  // Max number of entries we can read before shipping them
  private int replicationQueueNbCapacity;
  private FileSystem fs;
  // id of this cluster
  private UUID clusterId;
  // id of the peer cluster
  private UUID peerClusterId;
  // total number of edits we replicated
  private AtomicLong totalReplicatedEdits = new AtomicLong(0);
  // total number of operations we replicated
  private AtomicLong totalReplicatedOperations = new AtomicLong(0);
  // the znode of the replication queue we are handling
  private String peerClusterZnode;
  // Maximum number of retries before taking bold actions
  private int maxRetriesMultiplier;
  // Indicates whether this particular source is running
  private volatile boolean sourceRunning = false;
  // Metrics for this source
  private MetricsSource metrics;
  // WARN threshold for the number of queued logs in a group, defaults to 2
  private int logQueueWarnThreshold;
  // ReplicationEndpoint which will handle the actual replication
  private ReplicationEndpoint replicationEndpoint;
  // A filter (or a chain of filters) for the WAL entries
  private WALEntryFilter walEntryFilter;
  // throttler to limit the replication bandwidth used per peer
  private ReplicationThrottler throttler;
  // total number of WALs queued across all WAL groups
  private AtomicInteger logQueueSize = new AtomicInteger(0);
  // one worker thread per WAL group
  private ConcurrentHashMap<String, ReplicationSourceWorkerThread> workerThreads =
      new ConcurrentHashMap<String, ReplicationSourceWorkerThread>();
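
  /**
   * Instantiation method used by region servers to set up this replication source.
   *
   * @param conf configuration to use
   * @param fs file system to use
   * @param manager replication manager to report our progress to
   * @param replicationQueues interface to the replication queue storage
   * @param replicationPeers interface to the replication peer tracking
   * @param stopper used to check whether the region server is shutting down
   * @param peerClusterZnode the name of our znode (the peer id, plus dead region server
   *          suffixes for recovered queues)
   * @param clusterId unique UUID of this cluster
   * @param replicationEndpoint the replication endpoint implementation
   * @param metrics metrics for this replication source
   * @throws IOException
   */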
  @Override
  public void init(final Configuration conf, final FileSystem fs,
      final ReplicationSourceManager manager, final ReplicationQueues replicationQueues,
      final ReplicationPeers replicationPeers, final Stoppable stopper,
      final String peerClusterZnode, final UUID clusterId, ReplicationEndpoint replicationEndpoint,
      final MetricsSource metrics) throws IOException {
    this.stopper = stopper;
    this.conf = HBaseConfiguration.create(conf);
    decorateConf();
    this.replicationQueueSizeCapacity =
        this.conf.getLong("replication.source.size.capacity", 1024 * 1024 * 64);
    this.replicationQueueNbCapacity =
        this.conf.getInt("replication.source.nb.capacity", 25000);
    this.sleepForRetries =
        this.conf.getLong("replication.source.sleepforretries", 1000);
    this.maxRetriesMultiplier =
        this.conf.getInt("replication.source.maxretriesmultiplier", 300);
    this.queueSizePerGroup = this.conf.getInt("hbase.regionserver.maxlogs", 32);
    long bandwidth = this.conf.getLong("replication.source.per.peer.node.bandwidth", 0);
    this.throttler = new ReplicationThrottler((double) bandwidth / 10.0);
    this.replicationQueues = replicationQueues;
    this.replicationPeers = replicationPeers;
    this.manager = manager;
    this.fs = fs;
    this.metrics = metrics;
    this.clusterId = clusterId;

    this.peerClusterZnode = peerClusterZnode;
    this.replicationQueueInfo = new ReplicationQueueInfo(peerClusterZnode);

    this.peerId = this.replicationQueueInfo.getPeerId();
    this.logQueueWarnThreshold = this.conf.getInt("replication.source.log.queue.warn", 2);
    this.replicationEndpoint = replicationEndpoint;
  }

  private void decorateConf() {
    String replicationCodec = this.conf.get(HConstants.REPLICATION_CODEC_CONF_KEY);
    if (StringUtils.isNotEmpty(replicationCodec)) {
      this.conf.set(HConstants.RPC_CODEC_CONF_KEY, replicationCodec);
    }
  }
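
  /**
   * Add a WAL to the queue of the WAL group it belongs to, creating (and, if the source is
   * already running, starting) a worker thread for that group on first sight.
   *
   * @param log path of the WAL to enqueue
   */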
  @Override
  public void enqueueLog(Path log) {
    String logPrefix = DefaultWALProvider.getWALPrefixFromWALName(log.getName());
    PriorityBlockingQueue<Path> queue = queues.get(logPrefix);
    if (queue == null) {
      queue = new PriorityBlockingQueue<Path>(queueSizePerGroup, new LogsComparator());
      queues.put(logPrefix, queue);
      if (this.sourceRunning) {
        // A new WAL group was observed after the source started; spin up a worker for it.
        // putIfAbsent guards against a race with the workers started in run().
        final ReplicationSourceWorkerThread worker =
            new ReplicationSourceWorkerThread(logPrefix, queue, replicationQueueInfo, this);
        ReplicationSourceWorkerThread extant = workerThreads.putIfAbsent(logPrefix, worker);
        if (extant != null) {
          LOG.debug("Someone has beat us to start a worker thread for wal group " + logPrefix);
        } else {
          LOG.debug("Starting up worker for wal group " + logPrefix);
          worker.startup();
        }
      }
    }
    queue.put(log);
    int queueSize = logQueueSize.incrementAndGet();
    this.metrics.setSizeOfLogQueue(queueSize);
    // Warn on every newly enqueued WAL once the group's queue exceeds the threshold
    if (queue.size() > this.logQueueWarnThreshold) {
      LOG.warn("WAL group " + logPrefix + " queue size: " + queueSize
          + " exceeds value of replication.source.log.queue.warn: " + logQueueWarnThreshold);
    }
  }

  private void uninitialize() {
    LOG.debug("Source exiting " + this.peerId);
    metrics.clear();
    if (replicationEndpoint.state() == Service.State.STARTING
        || replicationEndpoint.state() == Service.State.RUNNING) {
      replicationEndpoint.stopAndWait();
    }
  }

  @Override
  public void run() {
    // mark we are running now
    this.sourceRunning = true;
    try {
      // start the endpoint, connect to the cluster
      Service.State state = replicationEndpoint.start().get();
      if (state != Service.State.RUNNING) {
        LOG.warn("ReplicationEndpoint was not started. Exiting");
        uninitialize();
        return;
      }
    } catch (Exception ex) {
      LOG.warn("Error starting ReplicationEndpoint, exiting", ex);
      throw new RuntimeException(ex);
    }

    // get the WALEntryFilter from the ReplicationEndpoint and add it to the default filters
    ArrayList<WALEntryFilter> filters = Lists.newArrayList(
        (WALEntryFilter) new SystemTableWALEntryFilter());
    WALEntryFilter filterFromEndpoint = this.replicationEndpoint.getWALEntryfilter();
    if (filterFromEndpoint != null) {
      filters.add(filterFromEndpoint);
    }
    this.walEntryFilter = new ChainWALEntryFilter(filters);

    int sleepMultiplier = 1;
    // resolving the peer cluster id may block, so do it here, in our own thread
    while (this.isSourceActive() && this.peerClusterId == null) {
      this.peerClusterId = replicationEndpoint.getPeerUUID();
      if (this.isSourceActive() && this.peerClusterId == null) {
        if (sleepForRetries("Cannot contact the peer's zk ensemble", sleepMultiplier)) {
          sleepMultiplier++;
        }
      }
    }

    // Sanity check: a misconfigured peer may resolve to this very cluster, which would make us
    // replicate to ourselves unless the endpoint explicitly allows it.
    if (clusterId.equals(peerClusterId) && !replicationEndpoint.canReplicateToSameCluster()) {
      this.terminate("ClusterId " + clusterId + " is replicating to itself: peerClusterId "
          + peerClusterId + " which is not allowed by ReplicationEndpoint:"
          + replicationEndpoint.getClass().getName(), null, false);
    }
    LOG.info("Replicating " + clusterId + " -> " + peerClusterId);
    // start a worker for every WAL group known at this point
    for (Map.Entry<String, PriorityBlockingQueue<Path>> entry : queues.entrySet()) {
      String walGroupId = entry.getKey();
      PriorityBlockingQueue<Path> queue = entry.getValue();
      final ReplicationSourceWorkerThread worker =
          new ReplicationSourceWorkerThread(walGroupId, queue, replicationQueueInfo, this);
      ReplicationSourceWorkerThread extant = workerThreads.putIfAbsent(walGroupId, worker);
      if (extant != null) {
        LOG.debug("Someone has beat us to start a worker thread for wal group " + walGroupId);
      } else {
        LOG.debug("Starting up worker for wal group " + walGroupId);
        worker.startup();
      }
    }
  }
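
  /**
   * Do the sleeping logic
   * @param msg why we sleep
   * @param sleepMultiplier by how many times the default sleeping time is augmented
   * @return true if sleepMultiplier is still less than maxRetriesMultiplier
   */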
  protected boolean sleepForRetries(String msg, int sleepMultiplier) {
    try {
      if (LOG.isTraceEnabled()) {
        LOG.trace(msg + ", sleeping " + sleepForRetries + " times " + sleepMultiplier);
      }
      Thread.sleep(this.sleepForRetries * sleepMultiplier);
    } catch (InterruptedException e) {
      LOG.debug("Interrupted while sleeping between retries");
      Thread.currentThread().interrupt();
    }
    return sleepMultiplier < maxRetriesMultiplier;
  }
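
  /**
   * Check whether the peer is enabled or not.
   *
   * @return true if the peer is enabled, otherwise false
   */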
  protected boolean isPeerEnabled() {
    return this.replicationPeers.getStatusOfPeer(this.peerId);
  }

  @Override
  public void startup() {
    String n = Thread.currentThread().getName();
    Thread.UncaughtExceptionHandler handler = new Thread.UncaughtExceptionHandler() {
      @Override
      public void uncaughtException(final Thread t, final Throwable e) {
        LOG.error("Unexpected exception in ReplicationSource", e);
      }
    };
    Threads
        .setDaemonThreadRunning(this, n + ".replicationSource," + this.peerClusterZnode, handler);
  }

  @Override
  public void terminate(String reason) {
    terminate(reason, null);
  }

  @Override
  public void terminate(String reason, Exception cause) {
    terminate(reason, cause, true);
  }

  public void terminate(String reason, Exception cause, boolean join) {
    if (cause == null) {
      LOG.info("Closing source "
          + this.peerClusterZnode + " because: " + reason);
    } else {
      LOG.error("Closing source " + this.peerClusterZnode
          + " because an error occurred: " + reason, cause);
    }
    this.sourceRunning = false;
    Collection<ReplicationSourceWorkerThread> workers = workerThreads.values();
    for (ReplicationSourceWorkerThread worker : workers) {
      worker.setWorkerRunning(false);
      worker.interrupt();
    }
    ListenableFuture<Service.State> future = null;
    if (this.replicationEndpoint != null) {
      future = this.replicationEndpoint.stop();
    }
    if (join) {
      for (ReplicationSourceWorkerThread worker : workers) {
        Threads.shutdown(worker, this.sleepForRetries);
        LOG.info("ReplicationSourceWorker " + worker.getName() + " terminated");
      }
      if (future != null) {
        try {
          future.get();
        } catch (Exception e) {
          LOG.warn("Got exception:" + e);
        }
      }
    }
  }

  @Override
  public String getPeerClusterZnode() {
    return this.peerClusterZnode;
  }

  @Override
  public String getPeerClusterId() {
    return this.peerId;
  }

  @Override
  public Path getCurrentPath() {
    for (ReplicationSourceWorkerThread worker : workerThreads.values()) {
      if (worker.getCurrentPath() != null) return worker.getCurrentPath();
    }
    return null;
  }

  private boolean isSourceActive() {
    return !this.stopper.isStopped() && this.sourceRunning;
  }
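
  /**
   * Comparator used to order WALs by their start time, i.e. the numeric timestamp suffix of
   * the WAL file name.
   */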
  public static class LogsComparator implements Comparator<Path> {

    @Override
    public int compare(Path o1, Path o2) {
      return Long.valueOf(getTS(o1)).compareTo(getTS(o2));
    }

    /**
     * Split a path to get its start time: the numeric suffix after the last dot in the WAL
     * file name.
     * @param p path to split
     * @return start time
     */
    private static long getTS(Path p) {
      int tsIndex = p.getName().lastIndexOf('.') + 1;
      return Long.parseLong(p.getName().substring(tsIndex));
    }
  }

  @Override
  public String getStats() {
    StringBuilder sb = new StringBuilder();
    sb.append("Total replicated edits: ").append(totalReplicatedEdits)
        .append(", current progress: \n");
    for (Map.Entry<String, ReplicationSourceWorkerThread> entry : workerThreads.entrySet()) {
      String walGroupId = entry.getKey();
      ReplicationSourceWorkerThread worker = entry.getValue();
      long position = worker.getCurrentPosition();
      Path currentPath = worker.getCurrentPath();
      sb.append("walGroup [").append(walGroupId).append("]: ");
      if (currentPath != null) {
        sb.append("currently replicating from: ").append(currentPath).append(" at position: ")
            .append(position).append("\n");
      } else {
        sb.append("no replication ongoing, waiting for new log");
      }
    }
    return sb.toString();
  }
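
  /**
   * Get the metrics of this replication source.
   * @return the source metrics
   */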
  public MetricsSource getSourceMetrics() {
    return this.metrics;
  }

  /**
   * Worker that tails the WALs of a single WAL group and ships the edits it reads to the
   * replication endpoint. One worker is started per WAL group handled by this source.
   */
  public class ReplicationSourceWorkerThread extends Thread {
    // Source that this worker belongs to
    private ReplicationSource source;
    // Id of the WAL group this worker handles
    private String walGroupId;
    // Queue of WALs to process for this group
    private PriorityBlockingQueue<Path> queue;
    private ReplicationQueueInfo replicationQueueInfo;
    // Our reader for the current log
    private WAL.Reader reader;
    // Last position in the log that we sent to ZooKeeper
    private long lastLoggedPosition = -1;
    // Path of the current log
    private volatile Path currentPath;
    // Handle on the WAL reader helper
    private ReplicationWALReaderManager repLogReader;
    // Current number of operations that we need to replicate
    private int currentNbOperations = 0;
    // Current size of the data we need to replicate
    private int currentSize = 0;
    // Indicates whether this particular worker is running
    private boolean workerRunning = true;

    public ReplicationSourceWorkerThread(String walGroupId, PriorityBlockingQueue<Path> queue,
        ReplicationQueueInfo replicationQueueInfo, ReplicationSource source) {
      this.walGroupId = walGroupId;
      this.queue = queue;
      this.replicationQueueInfo = replicationQueueInfo;
      this.repLogReader = new ReplicationWALReaderManager(fs, conf);
      this.source = source;
    }

    @Override
    public void run() {
      // If this is a recovered queue, it is already full and the first log
      // normally has a stored position (unless the RS failed between two logs)
      if (this.replicationQueueInfo.isQueueRecovered()) {
        try {
          this.repLogReader.setPosition(replicationQueues.getLogPosition(peerClusterZnode,
              this.queue.peek().getName()));
          if (LOG.isTraceEnabled()) {
            LOG.trace("Recovered queue started with log " + this.queue.peek() + " at position "
                + this.repLogReader.getPosition());
          }
        } catch (ReplicationException e) {
          terminate("Couldn't get the position of this recovered queue " + peerClusterZnode, e);
        }
      }
      // Loop until we close down
      while (isWorkerActive()) {
        int sleepMultiplier = 1;
        // Sleep until replication is enabled again
        if (!isPeerEnabled()) {
          if (sleepForRetries("Replication is disabled", sleepMultiplier)) {
            sleepMultiplier++;
          }
          continue;
        }
        Path oldPath = getCurrentPath(); // oldPath is null the first time around and after a
                                         // finished file has been cleared by processEndOfFile
        // Get a new path
        boolean hasCurrentPath = getNextPath();
        if (getCurrentPath() != null && oldPath == null) {
          sleepMultiplier = 1; // reset the sleepMultiplier on a path change
        }
        if (!hasCurrentPath) {
          if (sleepForRetries("No log to process", sleepMultiplier)) {
            sleepMultiplier++;
          }
          continue;
        }
        boolean currentWALisBeingWrittenTo = false;
        // For non-recovered queues, take a snapshot now of whether the current WAL file
        // (this.currentPath) is still being written to. New WAL paths are enqueued only after
        // the previous WAL is closed, so the presence of another element in the queue means
        // the current file was closed; an empty queue means the file is still in use. Taking
        // the snapshot here protects us against races where a new file is enqueued while the
        // current file is being processed.
        if (!this.replicationQueueInfo.isQueueRecovered() && queue.size() == 0) {
          currentWALisBeingWrittenTo = true;
        }
        // Open a reader on it
        if (!openReader(sleepMultiplier)) {
          // Reset the sleep multiplier, else it'd be reused for the next file
          sleepMultiplier = 1;
          continue;
        }

        // If we got a null reader but didn't throw, sleep and retry
        if (this.reader == null) {
          if (sleepForRetries("Unable to open a reader", sleepMultiplier)) {
            sleepMultiplier++;
          }
          continue;
        }

        boolean gotIOE = false;
        currentNbOperations = 0;
        List<WAL.Entry> entries = new ArrayList<WAL.Entry>(1);
        currentSize = 0;
        try {
          if (readAllEntriesToReplicateOrNextFile(currentWALisBeingWrittenTo, entries)) {
            continue;
          }
        } catch (IOException ioe) {
          LOG.warn(peerClusterZnode + " Got: ", ioe);
          gotIOE = true;
          if (ioe.getCause() instanceof EOFException) {
            boolean considerDumping = false;
            if (this.replicationQueueInfo.isQueueRecovered()) {
              try {
                FileStatus stat = fs.getFileStatus(this.currentPath);
                if (stat.getLen() == 0) {
                  LOG.warn(peerClusterZnode + " Got EOF and the file was empty");
                }
                considerDumping = true;
              } catch (IOException e) {
                LOG.warn(peerClusterZnode + " Got while getting file size: ", e);
              }
            }

            if (considerDumping &&
                sleepMultiplier == maxRetriesMultiplier &&
                processEndOfFile()) {
              continue;
            }
          }
        } finally {
          try {
            this.reader = null;
            this.repLogReader.closeReader();
          } catch (IOException e) {
            gotIOE = true;
            LOG.warn("Unable to finalize the tailing of a file", e);
          }
        }

        // If we didn't get anything to replicate, or if we hit an IOE,
        // wait a bit and retry.
        // But if we need to stop, don't bother sleeping.
        if (isWorkerActive() && (gotIOE || entries.isEmpty())) {
          if (this.lastLoggedPosition != this.repLogReader.getPosition()) {
            manager.logPositionAndCleanOldLogs(this.currentPath,
                peerClusterZnode, this.repLogReader.getPosition(),
                this.replicationQueueInfo.isQueueRecovered(), currentWALisBeingWrittenTo);
            this.lastLoggedPosition = this.repLogReader.getPosition();
          }
          // Reset the sleep multiplier if nothing is actually wrong
          if (!gotIOE) {
            sleepMultiplier = 1;
            // if there was nothing to ship and it's not an error,
            // set "ageOfLastShippedOp" to <now> to indicate that we're current
            metrics.setAgeOfLastShippedOp(EnvironmentEdgeManager.currentTime(), walGroupId);
          }
          if (sleepForRetries("Nothing to replicate", sleepMultiplier)) {
            sleepMultiplier++;
          }
          continue;
        }
        sleepMultiplier = 1;
        shipEdits(currentWALisBeingWrittenTo, entries);
      }
      if (replicationQueueInfo.isQueueRecovered()) {
        // synchronize so that exactly one last worker cleans up the recovered queue
        synchronized (workerThreads) {
          Threads.sleep(100); // give the other workers a short while to fully exit
          boolean allOtherTaskDone = true;
          for (ReplicationSourceWorkerThread worker : workerThreads.values()) {
            if (!worker.equals(this) && worker.isAlive()) {
              allOtherTaskDone = false;
              break;
            }
          }
          if (allOtherTaskDone) {
            manager.closeRecoveredQueue(this.source);
            LOG.info("Finished recovering queue " + peerClusterZnode
                + " with the following stats: " + getStats());
          }
        }
      }
    }
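
    /**
     * Read all the entries from the current log file and retain those that need to be
     * replicated; otherwise process the end of the current file.
     * @param currentWALisBeingWrittenTo whether the current WAL is still being written to
     * @param entries resulting entries to be replicated
     * @return true if we got nothing and went to the next file, false if we got entries
     * @throws IOException
     */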
    protected boolean readAllEntriesToReplicateOrNextFile(boolean currentWALisBeingWrittenTo,
        List<WAL.Entry> entries) throws IOException {
      long seenEntries = 0;
      if (LOG.isTraceEnabled()) {
        LOG.trace("Seeking in " + this.currentPath + " at position "
            + this.repLogReader.getPosition());
      }
      this.repLogReader.seek();
      long positionBeforeRead = this.repLogReader.getPosition();
      WAL.Entry entry = this.repLogReader.readNextAndSetPosition();
      while (entry != null) {
        metrics.incrLogEditsRead();
        seenEntries++;

        // don't replicate if the log entries have already been consumed by the peer cluster
        if (replicationEndpoint.canReplicateToSameCluster()
            || !entry.getKey().getClusterIds().contains(peerClusterId)) {
          // Remove all cells that should not be replicated
          entry = walEntryFilter.filter(entry);
          WALEdit edit = null;
          WALKey logKey = null;
          if (entry != null) {
            edit = entry.getEdit();
            logKey = entry.getKey();
          }

          if (edit != null && edit.size() != 0) {
            // Mark that the current cluster has the change
            logKey.addClusterId(clusterId);
            currentNbOperations += countDistinctRowKeys(edit);
            entries.add(entry);
            currentSize += entry.getEdit().heapSize();
          } else {
            metrics.incrLogEditsFiltered();
          }
        }
        // Stop if the batch has too many entries or is too big
        if (currentSize >= replicationQueueSizeCapacity
            || entries.size() >= replicationQueueNbCapacity) {
          break;
        }
        try {
          entry = this.repLogReader.readNextAndSetPosition();
        } catch (IOException ie) {
          LOG.debug("Break on IOE: " + ie.getMessage());
          break;
        }
      }
      metrics.incrLogReadInBytes(this.repLogReader.getPosition() - positionBeforeRead);
      if (currentWALisBeingWrittenTo) {
        return false;
      }
      // If we didn't get anything and the queue has an object, it means we
      // hit the end of the file for sure
      return seenEntries == 0 && processEndOfFile();
    }
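
    /**
     * Poll for the next path.
     * @return true if a path was obtained, false if not
     */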
    protected boolean getNextPath() {
      try {
        if (this.currentPath == null) {
          this.currentPath = queue.poll(sleepForRetries, TimeUnit.MILLISECONDS);
          if (this.currentPath != null) {
            // Only decrement the queue size metric when a log was actually dequeued;
            // a timed-out poll must not shrink the counter.
            int queueSize = logQueueSize.decrementAndGet();
            metrics.setSizeOfLogQueue(queueSize);
            // Use peerClusterZnode rather than peerId: for a recovered queue the peerId is only
            // a parsed value, while the znode identifies the exact queue being processed.
            manager.cleanOldLogs(this.currentPath.getName(), peerClusterZnode,
                this.replicationQueueInfo.isQueueRecovered());
            if (LOG.isTraceEnabled()) {
              LOG.trace("New log: " + this.currentPath);
            }
          }
        }
      } catch (InterruptedException e) {
        LOG.warn("Interrupted while reading edits", e);
      }
      return this.currentPath != null;
    }
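
    /**
     * Open a reader on the current path.
     *
     * @param sleepMultiplier by how many times the default sleeping time is augmented
     * @return true if we should continue with that file, false if we are over with it
     */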
    protected boolean openReader(int sleepMultiplier) {
      try {
        try {
          if (LOG.isTraceEnabled()) {
            LOG.trace("Opening log " + this.currentPath);
          }
          this.reader = repLogReader.openReader(this.currentPath);
        } catch (FileNotFoundException fnfe) {
          if (this.replicationQueueInfo.isQueueRecovered()) {
            // We didn't find the log in its usual place; look whether it still exists in the
            // dead region servers' WAL folders (there could be a chain of failures to look at)
            List<String> deadRegionServers = this.replicationQueueInfo.getDeadRegionServers();
            LOG.info("NB dead servers : " + deadRegionServers.size());
            final Path rootDir = FSUtils.getRootDir(conf);
            for (String curDeadServerName : deadRegionServers) {
              final Path deadRsDirectory = new Path(rootDir,
                  DefaultWALProvider.getWALDirectoryName(curDeadServerName));
              Path[] locs = new Path[] {
                  new Path(deadRsDirectory, currentPath.getName()),
                  new Path(deadRsDirectory.suffix(DefaultWALProvider.SPLITTING_EXT),
                      currentPath.getName()),
              };
              for (Path possibleLogLocation : locs) {
                LOG.info("Possible location " + possibleLogLocation.toUri().toString());
                if (manager.getFs().exists(possibleLogLocation)) {
                  // We found the right new location
                  LOG.info("Log " + this.currentPath + " still exists at " +
                      possibleLogLocation);
                  // Returning here leaves the reader null, so the caller will sleep and retry
                  return true;
                }
              }
            }
            // In a disaster/recovery scenario the master may have gone down before moving the
            // WALs to the archive; look through every region server's WAL folder for a match.
            if (stopper instanceof ReplicationSyncUp.DummyServer) {
              // ReplicationSyncUp points manager.getLogDir() at the root WAL area rather than
              // at a single region server's WAL directory, so scan each server's folder.
              FileStatus[] rss = fs.listStatus(manager.getLogDir());
              for (FileStatus rs : rss) {
                Path p = rs.getPath();
                FileStatus[] logs = fs.listStatus(p);
                for (FileStatus log : logs) {
                  // Use the log's own path rather than appending to p, which would otherwise
                  // accumulate path components across iterations of this inner loop.
                  Path logPath = log.getPath();
                  if (logPath.getName().equals(currentPath.getName())) {
                    currentPath = logPath;
                    LOG.info("Log " + currentPath.getName() + " found at " + currentPath);
                    // Open the log at the new location
                    this.openReader(sleepMultiplier);
                    return true;
                  }
                }
              }
            }

            // The log could not be found in any of the locations checked above: it may have
            // been moved by the master between checks, or the recovered queue may point at a
            // log that was deleted long ago. Throwing here sends us to the outer IOException
            // handling below.
            throw new IOException("File from recovered queue is " +
                "nowhere to be found", fnfe);
          } else {
            // If the log was archived, continue reading from there
            Path archivedLogLocation =
                new Path(manager.getOldLogDir(), currentPath.getName());
            if (manager.getFs().exists(archivedLogLocation)) {
              currentPath = archivedLogLocation;
              LOG.info("Log " + this.currentPath + " was moved to " +
                  archivedLogLocation);
              // Open the log at the new location
              this.openReader(sleepMultiplier);
            }
            // If the log is in neither place, the reader stays null and the caller will retry
          }
        }
      } catch (LeaseNotRecoveredException lnre) {
        // The WAL was not properly closed; try to recover its lease and retry
        LOG.warn(peerClusterZnode + " Try to recover the WAL lease " + currentPath, lnre);
        recoverLease(conf, currentPath);
        this.reader = null;
      } catch (IOException ioe) {
        if (ioe instanceof EOFException && isCurrentLogEmpty()) return true;
        LOG.warn(peerClusterZnode + " Got: ", ioe);
        this.reader = null;
        if (ioe.getCause() instanceof NullPointerException) {
          // Works around an HDFS race where opening a file before any data node has the most
          // recent block throws an NPE; just retry on the next pass.
          LOG.warn("Got NPE opening reader, will retry.");
        } else if (sleepMultiplier >= maxRetriesMultiplier) {
          // We have waited the maximum number of retries for this file; consider dumping it
          LOG.warn("Waited too long for this file, considering dumping");
          return !processEndOfFile();
        }
      }
      return true;
    }

    private void recoverLease(final Configuration conf, final Path path) {
      try {
        final FileSystem dfs = FSUtils.getCurrentFileSystem(conf);
        FSUtils fsUtils = FSUtils.getInstance(dfs, conf);
        fsUtils.recoverFileLease(dfs, path, conf, new CancelableProgressable() {
          @Override
          public boolean progress() {
            LOG.debug("recover WAL lease: " + path);
            return isWorkerActive();
          }
        });
      } catch (IOException e) {
        LOG.warn("unable to recover lease for WAL: " + path, e);
      }
    }
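
    /**
     * Check whether the current log is empty and this is not a recovered queue. This covers an
     * idle cluster, where the current log has no entries yet and reading it keeps producing
     * EOFException; for a recovered queue the last log file may legitimately be empty and
     * should not be retried forever.
     */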
    private boolean isCurrentLogEmpty() {
      return (this.repLogReader.getPosition() == 0 &&
          !this.replicationQueueInfo.isQueueRecovered() && queue.size() == 0);
    }
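
    /**
     * Count the number of different row keys in the given edit. Assumes the WALEdit contains
     * at least one Cell.
     * @param edit edit to count row keys from
     * @return number of different row keys
     */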
    private int countDistinctRowKeys(WALEdit edit) {
      List<Cell> cells = edit.getCells();
      int distinctRowKeys = 1;
      Cell lastCell = cells.get(0);
      for (int i = 0; i < edit.size(); i++) {
        if (!CellUtil.matchingRow(cells.get(i), lastCell)) {
          distinctRowKeys++;
        }
        // Compare each cell against its predecessor, not against the first cell only;
        // otherwise edits spanning more than two rows are under-counted.
        lastCell = cells.get(i);
      }
      return distinctRowKeys;
    }
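
    /**
     * Do the shipping logic.
     * @param currentWALisBeingWrittenTo whether the current WAL was (seemingly) being written
     *          to when this method was called
     * @param entries the entries to ship
     */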
    protected void shipEdits(boolean currentWALisBeingWrittenTo, List<WAL.Entry> entries) {
      int sleepMultiplier = 0;
      if (entries.isEmpty()) {
        LOG.warn("Was given 0 edits to ship");
        return;
      }
      while (isWorkerActive()) {
        try {
          if (throttler.isEnabled()) {
            long sleepTicks = throttler.getNextSleepInterval(currentSize);
            if (sleepTicks > 0) {
              try {
                if (LOG.isTraceEnabled()) {
                  LOG.trace("To sleep " + sleepTicks + "ms for throttling control");
                }
                Thread.sleep(sleepTicks);
              } catch (InterruptedException e) {
                LOG.debug("Interrupted while sleeping for throttling control");
                Thread.currentThread().interrupt();
                // the current thread might be interrupted to terminate;
                // go back to while() to confirm this
                continue;
              }
              // reset the throttler's cycle start tick when a throttling sleep occurs
              throttler.resetStartTick();
            }
          }
          // build the context describing this batch for the endpoint
          ReplicationEndpoint.ReplicateContext replicateContext =
              new ReplicationEndpoint.ReplicateContext();
          replicateContext.setEntries(entries).setSize(currentSize);
          replicateContext.setWalGroupId(walGroupId);

          long startTimeNs = System.nanoTime();
          // send the edits to the endpoint; blocks until the batch is shipped
          boolean replicated = replicationEndpoint.replicate(replicateContext);
          long endTimeNs = System.nanoTime();

          if (!replicated) {
            continue;
          } else {
            sleepMultiplier = Math.max(sleepMultiplier - 1, 0);
          }

          if (this.lastLoggedPosition != this.repLogReader.getPosition()) {
            manager.logPositionAndCleanOldLogs(this.currentPath, peerClusterZnode,
                this.repLogReader.getPosition(), this.replicationQueueInfo.isQueueRecovered(),
                currentWALisBeingWrittenTo);
            this.lastLoggedPosition = this.repLogReader.getPosition();
          }
          if (throttler.isEnabled()) {
            throttler.addPushSize(currentSize);
          }
          totalReplicatedEdits.addAndGet(entries.size());
          totalReplicatedOperations.addAndGet(currentNbOperations);
          // update the shipping metrics for this batch
          metrics.shipBatch(currentNbOperations, currentSize / 1024);
          metrics.setAgeOfLastShippedOp(entries.get(entries.size() - 1).getKey().getWriteTime(),
              walGroupId);
          if (LOG.isTraceEnabled()) {
            LOG.trace("Replicated " + totalReplicatedEdits + " entries in total, or "
                + totalReplicatedOperations + " operations in "
                + ((endTimeNs - startTimeNs) / 1000000) + " ms");
          }
          break;
        } catch (Exception ex) {
          LOG.warn(replicationEndpoint.getClass().getName() + " threw unknown exception:"
              + org.apache.hadoop.util.StringUtils.stringifyException(ex));
          if (sleepForRetries("ReplicationEndpoint threw exception", sleepMultiplier)) {
            sleepMultiplier++;
          }
        }
      }
    }
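
    /**
     * If the queue isn't empty, switch to the next file; else if this is a recovered queue,
     * the worker's job is done; otherwise keep trying to read the current file.
     * @return true if we're done with the current file, false if we should continue trying to
     *         read from it
     */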
    @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "DE_MIGHT_IGNORE",
        justification = "Yeah, this is how it works")
    protected boolean processEndOfFile() {
      if (this.queue.size() != 0) {
        if (LOG.isTraceEnabled()) {
          String filesize = "N/A";
          try {
            FileStatus stat = fs.getFileStatus(this.currentPath);
            filesize = stat.getLen() + "";
          } catch (IOException ex) {
          }
          LOG.trace("Reached the end of log " + this.currentPath + ", stats: " + getStats()
              + ", and the length of the file is " + filesize);
        }
        this.currentPath = null;
        this.repLogReader.finishCurrentFile();
        this.reader = null;
        return true;
      } else if (this.replicationQueueInfo.isQueueRecovered()) {
        LOG.debug("Finished recovering queue for group " + walGroupId + " of peer "
            + peerClusterZnode);
        workerRunning = false;
        return true;
      }
      return false;
    }

    public void startup() {
      String n = Thread.currentThread().getName();
      Thread.UncaughtExceptionHandler handler = new Thread.UncaughtExceptionHandler() {
        @Override
        public void uncaughtException(final Thread t, final Throwable e) {
          LOG.error("Unexpected exception in ReplicationSourceWorkerThread," + " currentPath="
              + getCurrentPath(), e);
        }
      };
      Threads.setDaemonThreadRunning(this, n + ".replicationSource." + walGroupId + ","
          + peerClusterZnode, handler);
      workerThreads.put(walGroupId, this);
    }

    public Path getCurrentPath() {
      return this.currentPath;
    }

    public long getCurrentPosition() {
      return this.repLogReader.getPosition();
    }

    private boolean isWorkerActive() {
      return !stopper.isStopped() && workerRunning && !isInterrupted();
    }

    private void terminate(String reason, Exception cause) {
      if (cause == null) {
        LOG.info("Closing worker for wal group " + this.walGroupId + " because: " + reason);
      } else {
        LOG.error("Closing worker for wal group " + this.walGroupId
            + " because an error occurred: " + reason, cause);
      }
      this.interrupt();
      Threads.shutdown(this, sleepForRetries);
      LOG.info("ReplicationSourceWorker " + this.getName() + " terminated");
    }

    public void setWorkerRunning(boolean workerRunning) {
      this.workerRunning = workerRunning;
    }
  }
}