/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.wal;

import static org.apache.hadoop.hbase.wal.DefaultWALProvider.WAL_FILE_NAME_DELIMITER;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.OutputStream;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryUsage;
import java.lang.reflect.InvocationTargetException;
import java.net.URLEncoder;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.io.util.HeapMemorySizeUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.hbase.util.DrainBarrier;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.HasThread;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.DefaultWALProvider;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.hbase.wal.WALPrettyPrinter;
import org.apache.hadoop.hbase.wal.WALProvider.Writer;
import org.apache.hadoop.hbase.wal.WALSplitter;
import org.apache.hadoop.hdfs.DFSOutputStream;
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.util.StringUtils;
import org.apache.htrace.NullScope;
import org.apache.htrace.Span;
import org.apache.htrace.Trace;
import org.apache.htrace.TraceScope;

import com.google.common.annotations.VisibleForTesting;
import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.ExceptionHandler;
import com.lmax.disruptor.LifecycleAware;
import com.lmax.disruptor.TimeoutException;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
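/**
 * The default WAL implementation: edits are appended to files on the configured FileSystem
 * (normally HDFS). There is a single active WAL file at a time; {@link #rollWriter()} closes
 * the current file and opens a new one, and files whose edits have all been flushed are moved
 * to the archive directory.
 *
 * Appends and syncs from handler threads are funneled through one LMAX Disruptor ring buffer.
 * A single consumer thread ({@code RingBufferEventHandler}) performs the appends and hands
 * batches of sync requests to a small pool of {@code SyncRunner} threads that call
 * {@code Writer#sync()} and complete the outstanding {@link SyncFuture}s.
 *
 * Minimal usage sketch (illustrative only; assumes a live FileSystem, a writable root
 * directory and an HBase Configuration):
 * <pre>
 *   FSHLog wal = new FSHLog(fs, rootDir, "logs", conf);
 *   // append and sync through the WAL interface, roll via rollWriter() ...
 *   wal.close(); // shuts down the disruptor and archives the remaining files
 * </pre>
 */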
@InterfaceAudience.Private
public class FSHLog implements WAL {
  private static final Log LOG = LogFactory.getLog(FSHLog.class);

  private static final int DEFAULT_SLOW_SYNC_TIME_MS = 100;
166
167
168
169
170
171
172
173
174
175 private final Disruptor<RingBufferTruck> disruptor;
176
177
178
179
180 private final ExecutorService appendExecutor;
181
182
183
184
185
186
187
188 private final RingBufferEventHandler ringBufferEventHandler;
189
190
191
192
193
194
195
196 private final Map<Thread, SyncFuture> syncFuturesByHandler;
197
198
199
200
201
202 private volatile long highestUnsyncedSequence = -1;
203
204
205
206
207
208
209 private final AtomicLong highestSyncedSequence = new AtomicLong(0);
210
211
212
213
214 protected final FileSystem fs;
215
216
217
218
219 private final Path fullPathLogDir;
220
221
222
223
224 private final Path fullPathArchiveDir;
225
226
227
228
229 private final PathFilter ourFiles;
230
231
232
233
234 private final String logFilePrefix;
235
236
237
238
239 private final String logFileSuffix;
240
241
242
243
244 private final String prefixPathStr;
245
246 private final WALCoprocessorHost coprocessorHost;
247
248
249
250
251 protected final Configuration conf;
252
253
254 private final List<WALActionsListener> listeners =
255 new CopyOnWriteArrayList<WALActionsListener>();
256
257 @Override
258 public void registerWALActionsListener(final WALActionsListener listener) {
259 this.listeners.add(listener);
260 }
261
262 @Override
263 public boolean unregisterWALActionsListener(final WALActionsListener listener) {
264 return this.listeners.remove(listener);
265 }
266
267 @Override
268 public WALCoprocessorHost getCoprocessorHost() {
269 return coprocessorHost;
270 }
271
272
273
274
275 private FSDataOutputStream hdfs_out;
276
277
278
279
280 private final int minTolerableReplication;
281
282 private final int slowSyncNs;
283
284
285
286
287
288 private final AtomicInteger consecutiveLogRolls = new AtomicInteger(0);
289
290 private final int lowReplicationRollLimit;
291
292
293
294
295 private volatile boolean lowReplicationRollEnabled = true;
296
297
298
299
300
301
302 private SequenceIdAccounting sequenceIdAccounting = new SequenceIdAccounting();
303
304
305
306
307 volatile Writer writer;
308
309
310 private final DrainBarrier closeBarrier = new DrainBarrier();
311
312
313
314
315
316
317
318
319 private final ReentrantLock rollWriterLock = new ReentrantLock(true);
320
321 private volatile boolean closed = false;
322 private final AtomicBoolean shutdown = new AtomicBoolean(false);
323
324
325 private final AtomicLong filenum = new AtomicLong(-1);
326
327
328 private final AtomicInteger numEntries = new AtomicInteger(0);
329
330
331 private final long logrollsize;
332
333
334
335
336 private AtomicLong totalLogSize = new AtomicLong(0);
337
338
339
340
341
342
343 private final int maxLogs;
344
345
346 private final int closeErrorsTolerated;
347
348 private final AtomicInteger closeErrorCount = new AtomicInteger();
349
350
351
352
353
354
355 final Comparator<Path> LOG_NAME_COMPARATOR = new Comparator<Path>() {
356 @Override
357 public int compare(Path o1, Path o2) {
358 long t1 = getFileNumFromFileName(o1);
359 long t2 = getFileNumFromFileName(o2);
360 if (t1 == t2) return 0;
361 return (t1 > t2) ? 1 : -1;
362 }
363 };
364
365
366
367
368
369 private NavigableMap<Path, Map<byte[], Long>> byWalRegionSequenceIds =
370 new ConcurrentSkipListMap<Path, Map<byte[], Long>>(LOG_NAME_COMPARATOR);
371
372
373
374
375
376 static class RingBufferExceptionHandler implements ExceptionHandler {
377 @Override
378 public void handleEventException(Throwable ex, long sequence, Object event) {
379 LOG.error("Sequence=" + sequence + ", event=" + event, ex);
380 throw new RuntimeException(ex);
381 }
382
383 @Override
384 public void handleOnStartException(Throwable ex) {
385 LOG.error(ex);
386 throw new RuntimeException(ex);
387 }
388
389 @Override
390 public void handleOnShutdownException(Throwable ex) {
391 LOG.error(ex);
392 throw new RuntimeException(ex);
393 }
394 }
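  /**
   * Convenience constructor: WAL files are created under {@code root/logDir}, archived files
   * go to the default old-WAL directory, no listeners are registered, and the default file
   * prefix and suffix are used.
   */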
405 public FSHLog(final FileSystem fs, final Path root, final String logDir, final Configuration conf)
406 throws IOException {
407 this(fs, root, logDir, HConstants.HREGION_OLDLOGDIR_NAME, conf, null, true, null, null);
408 }
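  /**
   * Create a WAL writing under {@code rootDir/logDir} and archiving under
   * {@code rootDir/archiveDir}. Missing directories are created, the prefix/suffix are
   * validated, the first WAL file is rolled, and the disruptor that serializes appends and
   * syncs is started.
   *
   * @param failIfWALExists fail if a WAL matching this prefix/suffix already exists in the
   *   log directory
   * @param prefix leading part of every WAL file name; defaults to "wal" when null or empty
   * @param suffix trailing part of every WAL file name; when non-empty it must start with
   *   {@code WAL_FILE_NAME_DELIMITER}
   */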
434 public FSHLog(final FileSystem fs, final Path rootDir, final String logDir,
435 final String archiveDir, final Configuration conf,
436 final List<WALActionsListener> listeners,
437 final boolean failIfWALExists, final String prefix, final String suffix)
438 throws IOException {
439 this.fs = fs;
440 this.fullPathLogDir = new Path(rootDir, logDir);
441 this.fullPathArchiveDir = new Path(rootDir, archiveDir);
442 this.conf = conf;
443
444 if (!fs.exists(fullPathLogDir) && !fs.mkdirs(fullPathLogDir)) {
445 throw new IOException("Unable to mkdir " + fullPathLogDir);
446 }
447
448 if (!fs.exists(this.fullPathArchiveDir)) {
449 if (!fs.mkdirs(this.fullPathArchiveDir)) {
450 throw new IOException("Unable to mkdir " + this.fullPathArchiveDir);
451 }
452 }
453
454
455 this.logFilePrefix =
456 prefix == null || prefix.isEmpty() ? "wal" : URLEncoder.encode(prefix, "UTF8");
457
458 if (suffix != null && !(suffix.isEmpty()) && !(suffix.startsWith(WAL_FILE_NAME_DELIMITER))) {
459 throw new IllegalArgumentException("WAL suffix must start with '" + WAL_FILE_NAME_DELIMITER +
460 "' but instead was '" + suffix + "'");
461 }
462
463
464 FSUtils.setStoragePolicy(fs, conf, this.fullPathLogDir, HConstants.WAL_STORAGE_POLICY,
465 HConstants.DEFAULT_WAL_STORAGE_POLICY);
466 this.logFileSuffix = (suffix == null) ? "" : URLEncoder.encode(suffix, "UTF8");
467 this.prefixPathStr = new Path(fullPathLogDir,
468 logFilePrefix + WAL_FILE_NAME_DELIMITER).toString();
469
470 this.ourFiles = new PathFilter() {
471 @Override
472 public boolean accept(final Path fileName) {
473
474 final String fileNameString = fileName.toString();
475 if (!fileNameString.startsWith(prefixPathStr)) {
476 return false;
477 }
478 if (logFileSuffix.isEmpty()) {
479
480 return org.apache.commons.lang.StringUtils.isNumeric(
481 fileNameString.substring(prefixPathStr.length()));
482 } else if (!fileNameString.endsWith(logFileSuffix)) {
483 return false;
484 }
485 return true;
486 }
487 };
488
489 if (failIfWALExists) {
490 final FileStatus[] walFiles = FSUtils.listStatus(fs, fullPathLogDir, ourFiles);
491 if (null != walFiles && 0 != walFiles.length) {
492 throw new IOException("Target WAL already exists within directory " + fullPathLogDir);
493 }
494 }
495
496
497 if (listeners != null) {
498 for (WALActionsListener i: listeners) {
499 registerWALActionsListener(i);
500 }
501 }
502 this.coprocessorHost = new WALCoprocessorHost(this, conf);
503
504
505
506 final long blocksize = this.conf.getLong("hbase.regionserver.hlog.blocksize",
507 FSUtils.getDefaultBlockSize(this.fs, this.fullPathLogDir));
508 this.logrollsize =
509 (long)(blocksize * conf.getFloat("hbase.regionserver.logroll.multiplier", 0.95f));
510
511 float memstoreRatio = conf.getFloat(HeapMemorySizeUtil.MEMSTORE_SIZE_KEY,
512 conf.getFloat(HeapMemorySizeUtil.MEMSTORE_SIZE_OLD_KEY,
513 HeapMemorySizeUtil.DEFAULT_MEMSTORE_SIZE));
    boolean maxLogsDefined = conf.get("hbase.regionserver.maxlogs") != null;
    if (maxLogsDefined) {
      LOG.warn("'hbase.regionserver.maxlogs' is deprecated.");
    }
518 this.maxLogs = conf.getInt("hbase.regionserver.maxlogs",
519 Math.max(32, calculateMaxLogFiles(memstoreRatio, logrollsize)));
520 this.minTolerableReplication = conf.getInt("hbase.regionserver.hlog.tolerable.lowreplication",
521 FSUtils.getDefaultReplication(fs, this.fullPathLogDir));
522 this.lowReplicationRollLimit =
523 conf.getInt("hbase.regionserver.hlog.lowreplication.rolllimit", 5);
524 this.closeErrorsTolerated = conf.getInt("hbase.regionserver.logroll.errors.tolerated", 0);
525 int maxHandlersCount = conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT, 200);
526
527 LOG.info("WAL configuration: blocksize=" + StringUtils.byteDesc(blocksize) +
528 ", rollsize=" + StringUtils.byteDesc(this.logrollsize) +
529 ", prefix=" + this.logFilePrefix + ", suffix=" + logFileSuffix + ", logDir=" +
530 this.fullPathLogDir + ", archiveDir=" + this.fullPathArchiveDir);
531
532
533 rollWriter();
534
535 this.slowSyncNs =
536 1000000 * conf.getInt("hbase.regionserver.hlog.slowsync.ms",
537 DEFAULT_SLOW_SYNC_TIME_MS);
538
539
540
541 String hostingThreadName = Thread.currentThread().getName();
542 this.appendExecutor = Executors.
543 newSingleThreadExecutor(Threads.getNamedThreadFactory(hostingThreadName + ".append"));
544
545
546
547
548 final int preallocatedEventCount =
549 this.conf.getInt("hbase.regionserver.wal.disruptor.event.count", 1024 * 16);
550
551
552 this.disruptor =
553 new Disruptor<RingBufferTruck>(RingBufferTruck.EVENT_FACTORY, preallocatedEventCount,
554 this.appendExecutor, ProducerType.MULTI, new BlockingWaitStrategy());
555
556
557 this.disruptor.getRingBuffer().next();
558 this.ringBufferEventHandler =
559 new RingBufferEventHandler(conf.getInt("hbase.regionserver.hlog.syncer.count", 5),
560 maxHandlersCount);
561 this.disruptor.handleExceptionsWith(new RingBufferExceptionHandler());
562 this.disruptor.handleEventsWith(new RingBufferEventHandler [] {this.ringBufferEventHandler});
563
564 this.syncFuturesByHandler = new ConcurrentHashMap<Thread, SyncFuture>(maxHandlersCount);
565
566 this.disruptor.start();
567 }
568
569 private int calculateMaxLogFiles(float memstoreSizeRatio, long logRollSize) {
570 MemoryUsage mu = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage();
571 int maxLogs = Math.round(mu.getMax() * memstoreSizeRatio * 2 / logRollSize);
572 return maxLogs;
573 }
574
575
576
577
578
579 protected FileStatus[] getFiles() throws IOException {
580 return FSUtils.listStatus(fs, fullPathLogDir, ourFiles);
581 }
582
583
584
585
586
587
588
589
590
591 @VisibleForTesting
592 OutputStream getOutputStream() {
593 FSDataOutputStream fsdos = this.hdfs_out;
594 if (fsdos == null) return null;
595 return fsdos.getWrappedStream();
596 }
597
598 @Override
599 public byte [][] rollWriter() throws FailedLogCloseException, IOException {
600 return rollWriter(false);
601 }
602
603
604
605
606
607 private Path getNewPath() throws IOException {
608 this.filenum.set(System.currentTimeMillis());
609 Path newPath = getCurrentFileName();
610 while (fs.exists(newPath)) {
611 this.filenum.incrementAndGet();
612 newPath = getCurrentFileName();
613 }
614 return newPath;
615 }
616
617 Path getOldPath() {
618 long currentFilenum = this.filenum.get();
619 Path oldPath = null;
620 if (currentFilenum > 0) {
621
622 oldPath = computeFilename(currentFilenum);
623 }
624 return oldPath;
625 }
626
627
628
629
630
631 private void tellListenersAboutPreLogRoll(final Path oldPath, final Path newPath)
632 throws IOException {
633 if (!this.listeners.isEmpty()) {
634 for (WALActionsListener i : this.listeners) {
635 i.preLogRoll(oldPath, newPath);
636 }
637 }
638 }
639
640
641
642
643
644 private void tellListenersAboutPostLogRoll(final Path oldPath, final Path newPath)
645 throws IOException {
646 if (!this.listeners.isEmpty()) {
647 for (WALActionsListener i : this.listeners) {
648 i.postLogRoll(oldPath, newPath);
649 }
650 }
651 }
652
653
654
655
656
657
658 private void preemptiveSync(final ProtobufLogWriter nextWriter) {
659 long startTimeNanos = System.nanoTime();
660 try {
661 nextWriter.sync();
662 postSync(System.nanoTime() - startTimeNanos, 0);
663 } catch (IOException e) {
664
      LOG.warn("pre-sync failed, but it is only an optimization so keep going", e);
666 }
667 }
668
669 @Override
670 public byte [][] rollWriter(boolean force) throws FailedLogCloseException, IOException {
671 rollWriterLock.lock();
672 try {
673
674 if (!force && (this.writer != null && this.numEntries.get() <= 0)) return null;
675 byte [][] regionsToFlush = null;
676 if (this.closed) {
677 LOG.debug("WAL closed. Skipping rolling of writer");
678 return regionsToFlush;
679 }
680 if (!closeBarrier.beginOp()) {
681 LOG.debug("WAL closing. Skipping rolling of writer");
682 return regionsToFlush;
683 }
684 TraceScope scope = Trace.startSpan("FSHLog.rollWriter");
685 try {
686 Path oldPath = getOldPath();
687 Path newPath = getNewPath();
688
689 Writer nextWriter = this.createWriterInstance(newPath);
690 FSDataOutputStream nextHdfsOut = null;
691 if (nextWriter instanceof ProtobufLogWriter) {
692 nextHdfsOut = ((ProtobufLogWriter)nextWriter).getStream();
693
694
695 preemptiveSync((ProtobufLogWriter)nextWriter);
696 }
697 tellListenersAboutPreLogRoll(oldPath, newPath);
698
699 newPath = replaceWriter(oldPath, newPath, nextWriter, nextHdfsOut);
700 tellListenersAboutPostLogRoll(oldPath, newPath);
701
702 if (getNumRolledLogFiles() > 0) {
703 cleanOldLogs();
704 regionsToFlush = findRegionsToForceFlush();
705 }
706 } finally {
707 closeBarrier.endOp();
708 assert scope == NullScope.INSTANCE || !scope.isDetached();
709 scope.close();
710 }
711 return regionsToFlush;
712 } finally {
713 rollWriterLock.unlock();
714 }
715 }
716
717
718
719
720
721
722
723 protected Writer createWriterInstance(final Path path) throws IOException {
724 return DefaultWALProvider.createWriter(conf, fs, path, false);
725 }
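  /**
   * Archive rolled WAL files whose edits have all been flushed: a file is eligible once every
   * region sequence id recorded against it is lower than what {@code sequenceIdAccounting}
   * currently tracks as unflushed.
   */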
731 private void cleanOldLogs() throws IOException {
732 List<Path> logsToArchive = null;
733
734
735 for (Map.Entry<Path, Map<byte[], Long>> e : this.byWalRegionSequenceIds.entrySet()) {
736 Path log = e.getKey();
737 Map<byte[], Long> sequenceNums = e.getValue();
738 if (this.sequenceIdAccounting.areAllLower(sequenceNums)) {
739 if (logsToArchive == null) logsToArchive = new ArrayList<Path>();
740 logsToArchive.add(log);
741 if (LOG.isTraceEnabled()) LOG.trace("WAL file ready for archiving " + log);
742 }
743 }
744 if (logsToArchive != null) {
745 for (Path p : logsToArchive) {
746 this.totalLogSize.addAndGet(-this.fs.getFileStatus(p).getLen());
747 archiveLogFile(p);
748 this.byWalRegionSequenceIds.remove(p);
749 }
750 }
751 }
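  /**
   * If the number of rolled WAL files exceeds {@code maxLogs}, return the encoded names of
   * the regions pinning the oldest file so callers can ask those regions to flush, which lets
   * the file be archived on a later roll.
   */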
760 byte[][] findRegionsToForceFlush() throws IOException {
761 byte [][] regions = null;
762 int logCount = getNumRolledLogFiles();
763 if (logCount > this.maxLogs && logCount > 0) {
764 Map.Entry<Path, Map<byte[], Long>> firstWALEntry =
765 this.byWalRegionSequenceIds.firstEntry();
766 regions = this.sequenceIdAccounting.findLower(firstWALEntry.getValue());
767 }
768 if (regions != null) {
769 StringBuilder sb = new StringBuilder();
770 for (int i = 0; i < regions.length; i++) {
771 if (i > 0) sb.append(", ");
772 sb.append(Bytes.toStringBinary(regions[i]));
773 }
      LOG.info("Too many WALs; count=" + logCount + ", max=" + this.maxLogs +
        "; forcing flush of " + regions.length + " region(s): " + sb.toString());
776 }
777 return regions;
778 }
779
780
781
782
783
784 @VisibleForTesting
785 protected void afterCreatingZigZagLatch() {}
786
787
788
789
  @VisibleForTesting
  protected void beforeWaitOnSafePoint() {}
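  /**
   * Swap {@code this.writer} for {@code nextWriter} while the ring buffer consumer is parked
   * at a safe point: arm a {@code SafePointZigZagLatch}, publish a sync so outstanding edits
   * reach the old file, close the old writer, install the new writer and output stream, then
   * release the safe point so appends resume against the new file.
   *
   * @return the path now being written to.
   */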
811 Path replaceWriter(final Path oldPath, final Path newPath, Writer nextWriter,
812 final FSDataOutputStream nextHdfsOut)
813 throws IOException {
814
815
816
817
818
819 SyncFuture syncFuture = null;
820 SafePointZigZagLatch zigzagLatch = (this.ringBufferEventHandler == null)?
821 null: this.ringBufferEventHandler.attainSafePoint();
822 afterCreatingZigZagLatch();
    TraceScope scope = Trace.startSpan("FSHLog.replaceWriter");
824 try {
825
826
827
828
829
830 try {
831 if (zigzagLatch != null) {
832 Trace.addTimelineAnnotation("awaiting safepoint");
833 syncFuture = zigzagLatch.waitSafePoint(publishSyncOnRingBuffer());
834 }
835 } catch (FailedSyncBeforeLogCloseException e) {
836
837 if (isUnflushedEntries()) throw e;
838 LOG.warn("Failed sync-before-close but no outstanding appends; closing WAL: " +
839 e.getMessage());
840 }
841
842
843
844 try {
845 if (this.writer != null) {
846 Trace.addTimelineAnnotation("closing writer");
847 this.writer.close();
848 Trace.addTimelineAnnotation("writer closed");
849 }
850 this.closeErrorCount.set(0);
851 } catch (IOException ioe) {
852 int errors = closeErrorCount.incrementAndGet();
853 if (!isUnflushedEntries() && (errors <= this.closeErrorsTolerated)) {
854 LOG.warn("Riding over failed WAL close of " + oldPath + ", cause=\"" +
855 ioe.getMessage() + "\", errors=" + errors +
856 "; THIS FILE WAS NOT CLOSED BUT ALL EDITS SYNCED SO SHOULD BE OK");
857 } else {
858 throw ioe;
859 }
860 }
861 this.writer = nextWriter;
862 this.hdfs_out = nextHdfsOut;
863 int oldNumEntries = this.numEntries.get();
864 this.numEntries.set(0);
865 final String newPathString = (null == newPath ? null : FSUtils.getPath(newPath));
866 if (oldPath != null) {
867 this.byWalRegionSequenceIds.put(oldPath, this.sequenceIdAccounting.resetHighest());
868 long oldFileLen = this.fs.getFileStatus(oldPath).getLen();
869 this.totalLogSize.addAndGet(oldFileLen);
870 LOG.info("Rolled WAL " + FSUtils.getPath(oldPath) + " with entries=" + oldNumEntries +
871 ", filesize=" + StringUtils.byteDesc(oldFileLen) + "; new WAL " +
872 newPathString);
873 } else {
874 LOG.info("New WAL " + newPathString);
875 }
876 } catch (InterruptedException ie) {
877
878 Thread.currentThread().interrupt();
879 } catch (IOException e) {
880 long count = getUnflushedEntriesCount();
881 LOG.error("Failed close of WAL writer " + oldPath + ", unflushedEntries=" + count, e);
882 throw new FailedLogCloseException(oldPath + ", unflushedEntries=" + count, e);
883 } finally {
884 try {
885
886 if (zigzagLatch != null) {
887 zigzagLatch.releaseSafePoint();
888
889
890
891
892
893
894 if (syncFuture != null) {
895 try {
896 blockOnSync(syncFuture);
897 } catch (IOException ioe) {
898 if (LOG.isTraceEnabled()) LOG.trace("Stale sync exception", ioe);
899 }
900 }
901 }
902 } finally {
903 scope.close();
904 }
905 }
906 return newPath;
907 }
908
909 long getUnflushedEntriesCount() {
910 long highestSynced = this.highestSyncedSequence.get();
911 return highestSynced > this.highestUnsyncedSequence?
912 0: this.highestUnsyncedSequence - highestSynced;
913 }
914
915 boolean isUnflushedEntries() {
916 return getUnflushedEntriesCount() > 0;
917 }
918
919
920
921
922
923 public static Path getWALArchivePath(Path archiveDir, Path p) {
924 return new Path(archiveDir, p.getName());
925 }
926
927 private void archiveLogFile(final Path p) throws IOException {
928 Path newPath = getWALArchivePath(this.fullPathArchiveDir, p);
929
930 if (!this.listeners.isEmpty()) {
931 for (WALActionsListener i : this.listeners) {
932 i.preLogArchive(p, newPath);
933 }
934 }
935 LOG.info("Archiving " + p + " to " + newPath);
936 if (!FSUtils.renameAndSetModifyTime(this.fs, p, newPath)) {
937 throw new IOException("Unable to rename " + p + " to " + newPath);
938 }
939
940 if (!this.listeners.isEmpty()) {
941 for (WALActionsListener i : this.listeners) {
942 i.postLogArchive(p, newPath);
943 }
944 }
945 }
946
947
948
949
950
951
952
953 protected Path computeFilename(final long filenum) {
954 if (filenum < 0) {
955 throw new RuntimeException("WAL file number can't be < 0");
956 }
957 String child = logFilePrefix + WAL_FILE_NAME_DELIMITER + filenum + logFileSuffix;
958 return new Path(fullPathLogDir, child);
959 }
960
961
962
963
964
965
966 public Path getCurrentFileName() {
967 return computeFilename(this.filenum.get());
968 }
969
970 @Override
971 public String toString() {
972 return "FSHLog " + logFilePrefix + ":" + logFileSuffix + "(num " + filenum + ")";
973 }
974
975
976
977
978
979
980
981
982
983 protected long getFileNumFromFileName(Path fileName) {
984 if (fileName == null) throw new IllegalArgumentException("file name can't be null");
985 if (!ourFiles.accept(fileName)) {
986 throw new IllegalArgumentException("The log file " + fileName +
987 " doesn't belong to this WAL. (" + toString() + ")");
988 }
989 final String fileNameString = fileName.toString();
990 String chompedPath = fileNameString.substring(prefixPathStr.length(),
991 (fileNameString.length() - logFileSuffix.length()));
992 return Long.parseLong(chompedPath);
993 }
994
995 @Override
996 public void close() throws IOException {
997 shutdown();
998 final FileStatus[] files = getFiles();
999 if (null != files && 0 != files.length) {
1000 for (FileStatus file : files) {
1001 Path p = getWALArchivePath(this.fullPathArchiveDir, file.getPath());
1002
1003 if (!this.listeners.isEmpty()) {
1004 for (WALActionsListener i : this.listeners) {
1005 i.preLogArchive(file.getPath(), p);
1006 }
1007 }
1008
1009 if (!FSUtils.renameAndSetModifyTime(fs, file.getPath(), p)) {
1010 throw new IOException("Unable to rename " + file.getPath() + " to " + p);
1011 }
1012
1013 if (!this.listeners.isEmpty()) {
1014 for (WALActionsListener i : this.listeners) {
1015 i.postLogArchive(file.getPath(), p);
1016 }
1017 }
1018 }
1019 LOG.debug("Moved " + files.length + " WAL file(s) to " +
1020 FSUtils.getPath(this.fullPathArchiveDir));
1021 }
1022 LOG.info("Closed WAL: " + toString());
1023 }
1024
1025 @Override
1026 public void shutdown() throws IOException {
1027 if (shutdown.compareAndSet(false, true)) {
1028 try {
1029
1030 closeBarrier.stopAndDrainOps();
1031 } catch (InterruptedException e) {
1032 LOG.error("Exception while waiting for cache flushes and log rolls", e);
1033 Thread.currentThread().interrupt();
1034 }
1035
1036
1037
1038
1039 if (this.disruptor != null) {
1040 long timeoutms = conf.getLong("hbase.wal.disruptor.shutdown.timeout.ms", 60000);
1041 try {
1042 this.disruptor.shutdown(timeoutms, TimeUnit.MILLISECONDS);
1043 } catch (TimeoutException e) {
1044 LOG.warn("Timed out bringing down disruptor after " + timeoutms + "ms; forcing halt " +
1045 "(It is a problem if this is NOT an ABORT! -- DATALOSS!!!!)");
1046 this.disruptor.halt();
1047 this.disruptor.shutdown();
1048 }
1049 }
1050
1051 if (this.appendExecutor != null) this.appendExecutor.shutdown();
1052
1053
1054 if (!this.listeners.isEmpty()) {
1055 for (WALActionsListener i : this.listeners) {
1056 i.logCloseRequested();
1057 }
1058 }
1059 this.closed = true;
1060 if (LOG.isDebugEnabled()) {
1061 LOG.debug("Closing WAL writer in " + FSUtils.getPath(fullPathLogDir));
1062 }
1063 if (this.writer != null) {
1064 this.writer.close();
1065 this.writer = null;
1066 }
1067 }
1068 }
1069
1070 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NP_NULL_ON_SOME_PATH_EXCEPTION",
1071 justification="Will never be null")
1072 @Override
1073 public long append(final HTableDescriptor htd, final HRegionInfo hri, final WALKey key,
1074 final WALEdit edits, final boolean inMemstore) throws IOException {
1075 if (this.closed) throw new IOException("Cannot append; log is closed");
1076
1077
1078 TraceScope scope = Trace.startSpan("FSHLog.append");
1079
1080
1081
1082
1083 FSWALEntry entry = null;
1084 long sequence = this.disruptor.getRingBuffer().next();
1085 try {
1086 RingBufferTruck truck = this.disruptor.getRingBuffer().get(sequence);
1087
1088 entry = new FSWALEntry(sequence, key, edits, htd, hri, inMemstore);
1089 truck.loadPayload(entry, scope.detach());
1090 } finally {
1091 this.disruptor.getRingBuffer().publish(sequence);
1092 }
1093 return sequence;
1094 }
1095
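  /**
   * Thread that issues the actual {@code Writer#sync()} calls. The ring buffer consumer hands
   * it a batch of {@link SyncFuture}s via {@code offer()}; once a sync completes, every
   * future at or below the synced ring-buffer sequence is released and
   * {@code highestSyncedSequence} is advanced.
   */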
1111 private class SyncRunner extends HasThread {
1112 private volatile long sequence;
1113
1114 private final BlockingQueue<SyncFuture> syncFutures;
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126 SyncRunner(final String name, final int maxHandlersCount) {
1127 super(name);
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141 this.syncFutures = new LinkedBlockingQueue<SyncFuture>(maxHandlersCount * 3);
1142 }
1143
1144 void offer(final long sequence, final SyncFuture [] syncFutures, final int syncFutureCount) {
1145
1146 this.sequence = sequence;
1147 for (int i = 0; i < syncFutureCount; ++i) {
1148 this.syncFutures.add(syncFutures[i]);
1149 }
1150 }
1151
1152
1153
1154
1155
1156
1157
1158
1159 private int releaseSyncFuture(final SyncFuture syncFuture, final long currentSequence,
1160 final Throwable t) {
1161 if (!syncFuture.done(currentSequence, t)) throw new IllegalStateException();
1162
1163 return 1;
1164 }
1165
1166
1167
1168
1169
1170
1171
1172 private int releaseSyncFutures(final long currentSequence, final Throwable t) {
1173 int syncCount = 0;
1174 for (SyncFuture syncFuture; (syncFuture = this.syncFutures.peek()) != null;) {
1175 if (syncFuture.getRingBufferSequence() > currentSequence) break;
1176 releaseSyncFuture(syncFuture, currentSequence, t);
1177 if (!this.syncFutures.remove(syncFuture)) {
1178 throw new IllegalStateException(syncFuture.toString());
1179 }
1180 syncCount++;
1181 }
1182 return syncCount;
1183 }
1184
1185
1186
1187
1188
1189 private long updateHighestSyncedSequence(long sequence) {
1190 long currentHighestSyncedSequence;
1191
1192 do {
1193 currentHighestSyncedSequence = highestSyncedSequence.get();
1194 if (currentHighestSyncedSequence >= sequence) {
1195
1196
1197 sequence = currentHighestSyncedSequence;
1198 break;
1199 }
1200 } while (!highestSyncedSequence.compareAndSet(currentHighestSyncedSequence, sequence));
1201 return sequence;
1202 }
1203
1204 public void run() {
1205 long currentSequence;
1206 while (!isInterrupted()) {
1207 int syncCount = 0;
1208 SyncFuture takeSyncFuture;
1209 try {
1210 while (true) {
1211
1212 takeSyncFuture = this.syncFutures.take();
1213 currentSequence = this.sequence;
1214 long syncFutureSequence = takeSyncFuture.getRingBufferSequence();
1215 if (syncFutureSequence > currentSequence) {
              throw new IllegalStateException("currentSequence=" + currentSequence +
                ", syncFutureSequence=" + syncFutureSequence);
1218 }
1219
1220 long currentHighestSyncedSequence = highestSyncedSequence.get();
1221 if (currentSequence < currentHighestSyncedSequence) {
1222 syncCount += releaseSyncFuture(takeSyncFuture, currentHighestSyncedSequence, null);
1223
1224 continue;
1225 }
1226 break;
1227 }
1228
1229
1230 TraceScope scope = Trace.continueSpan(takeSyncFuture.getSpan());
1231 long start = System.nanoTime();
1232 Throwable lastException = null;
1233 try {
1234 Trace.addTimelineAnnotation("syncing writer");
1235 writer.sync();
1236 Trace.addTimelineAnnotation("writer synced");
1237 currentSequence = updateHighestSyncedSequence(currentSequence);
1238 } catch (IOException e) {
1239 LOG.error("Error syncing, request close of WAL", e);
1240 lastException = e;
1241 } catch (Exception e) {
1242 LOG.warn("UNEXPECTED", e);
1243 lastException = e;
1244 } finally {
1245
1246 takeSyncFuture.setSpan(scope.detach());
1247
1248 syncCount += releaseSyncFuture(takeSyncFuture, currentSequence, lastException);
1249
1250 syncCount += releaseSyncFutures(currentSequence, lastException);
1251 if (lastException != null) requestLogRoll();
1252 else checkLogRoll();
1253 }
1254 postSync(System.nanoTime() - start, syncCount);
1255 } catch (InterruptedException e) {
1256
1257 Thread.currentThread().interrupt();
1258 } catch (Throwable t) {
1259 LOG.warn("UNEXPECTED, continuing", t);
1260 }
1261 }
1262 }
1263 }
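  /**
   * Called after a sync completes: request a roll if the pipeline replication has dropped
   * below the tolerable minimum or the current file has grown past the roll size.
   */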
1268 void checkLogRoll() {
1269
1270 if (!rollWriterLock.tryLock()) return;
1271 boolean lowReplication;
1272 try {
1273 lowReplication = checkLowReplication();
1274 } finally {
1275 rollWriterLock.unlock();
1276 }
1277 try {
1278 if (lowReplication || writer != null && writer.getLength() > logrollsize) {
1279 requestLogRoll(lowReplication);
1280 }
1281 } catch (IOException e) {
1282 LOG.warn("Writer.getLength() failed; continuing", e);
1283 }
1284 }
1285
1286
1287
1288
1289 private boolean checkLowReplication() {
1290 boolean logRollNeeded = false;
1291
1292
1293 try {
1294 int numCurrentReplicas = getLogReplication();
1295 if (numCurrentReplicas != 0 && numCurrentReplicas < this.minTolerableReplication) {
1296 if (this.lowReplicationRollEnabled) {
1297 if (this.consecutiveLogRolls.get() < this.lowReplicationRollLimit) {
            LOG.warn("HDFS pipeline error detected. Found " + numCurrentReplicas
                + " replicas but expecting no fewer than " + this.minTolerableReplication
                + " replicas. Requesting close of WAL. Current pipeline: "
                + Arrays.toString(getPipeLine()));
1303 logRollNeeded = true;
1304
1305
1306
1307 this.consecutiveLogRolls.getAndIncrement();
1308 } else {
            LOG.warn("Too many consecutive RollWriter requests; this is a sign that the total "
                + "number of live datanodes is lower than the tolerable number of replicas.");
1311 this.consecutiveLogRolls.set(0);
1312 this.lowReplicationRollEnabled = false;
1313 }
1314 }
1315 } else if (numCurrentReplicas >= this.minTolerableReplication) {
1316 if (!this.lowReplicationRollEnabled) {
1317
1318
1319
1320 if (this.numEntries.get() <= 1) {
1321 return logRollNeeded;
1322 }
1323
1324
1325 this.lowReplicationRollEnabled = true;
          LOG.info("LowReplication-Roller was re-enabled.");
1327 }
1328 }
1329 } catch (Exception e) {
1330 LOG.warn("DFSOutputStream.getNumCurrentReplicas failed because of " + e +
1331 ", continuing...");
1332 }
1333 return logRollNeeded;
1334 }
1335
1336 private SyncFuture publishSyncOnRingBuffer() {
1337 return publishSyncOnRingBuffer(null);
1338 }
1339
1340 private SyncFuture publishSyncOnRingBuffer(Span span) {
1341 long sequence = this.disruptor.getRingBuffer().next();
1342 SyncFuture syncFuture = getSyncFuture(sequence, span);
1343 try {
1344 RingBufferTruck truck = this.disruptor.getRingBuffer().get(sequence);
1345 truck.loadPayload(syncFuture);
1346 } finally {
1347 this.disruptor.getRingBuffer().publish(sequence);
1348 }
1349 return syncFuture;
1350 }
1351
1352
1353 private Span publishSyncThenBlockOnCompletion(Span span) throws IOException {
1354 return blockOnSync(publishSyncOnRingBuffer(span));
1355 }
1356
1357 private Span blockOnSync(final SyncFuture syncFuture) throws IOException {
1358
1359 try {
1360 syncFuture.get();
1361 return syncFuture.getSpan();
1362 } catch (InterruptedException ie) {
1363 LOG.warn("Interrupted", ie);
1364 throw convertInterruptedExceptionToIOException(ie);
1365 } catch (ExecutionException e) {
1366 throw ensureIOException(e.getCause());
1367 }
1368 }
1369
1370 private IOException convertInterruptedExceptionToIOException(final InterruptedException ie) {
1371 Thread.currentThread().interrupt();
1372 IOException ioe = new InterruptedIOException();
1373 ioe.initCause(ie);
1374 return ioe;
1375 }
1376
1377 private SyncFuture getSyncFuture(final long sequence, Span span) {
1378 SyncFuture syncFuture = this.syncFuturesByHandler.get(Thread.currentThread());
1379 if (syncFuture == null) {
1380 syncFuture = new SyncFuture();
1381 this.syncFuturesByHandler.put(Thread.currentThread(), syncFuture);
1382 }
1383 return syncFuture.reset(sequence, span);
1384 }
1385
1386 private void postSync(final long timeInNanos, final int handlerSyncs) {
1387 if (timeInNanos > this.slowSyncNs) {
1388 String msg =
1389 new StringBuilder().append("Slow sync cost: ")
1390 .append(timeInNanos / 1000000).append(" ms, current pipeline: ")
1391 .append(Arrays.toString(getPipeLine())).toString();
1392 Trace.addTimelineAnnotation(msg);
1393 LOG.info(msg);
1394 }
1395 if (!listeners.isEmpty()) {
1396 for (WALActionsListener listener : listeners) {
1397 listener.postSync(timeInNanos, handlerSyncs);
1398 }
1399 }
1400 }
1401
1402 private long postAppend(final Entry e, final long elapsedTime) {
1403 long len = 0;
1404 if (!listeners.isEmpty()) {
1405 for (Cell cell : e.getEdit().getCells()) {
1406 len += CellUtil.estimatedSerializedSizeOf(cell);
1407 }
1408 for (WALActionsListener listener : listeners) {
1409 listener.postAppend(len, elapsedTime);
1410 }
1411 }
1412 return len;
1413 }
1414
1415
1416
1417
1418
1419
1420
1421
1422
1423
1424
1425
1426
1427
1428 @VisibleForTesting
1429 int getLogReplication() {
1430 try {
1431
1432 if (this.hdfs_out instanceof HdfsDataOutputStream) {
1433 return ((HdfsDataOutputStream) this.hdfs_out).getCurrentBlockReplication();
1434 }
1435 } catch (IOException e) {
      LOG.info("Failed getting current block replication, continuing", e);
1437 }
1438 return 0;
1439 }
1440
1441 @Override
1442 public void sync() throws IOException {
1443 TraceScope scope = Trace.startSpan("FSHLog.sync");
1444 try {
1445 scope = Trace.continueSpan(publishSyncThenBlockOnCompletion(scope.detach()));
1446 } finally {
1447 assert scope == NullScope.INSTANCE || !scope.isDetached();
1448 scope.close();
1449 }
1450 }
1451
1452 @Override
1453 public void sync(long txid) throws IOException {
1454 if (this.highestSyncedSequence.get() >= txid){
1455
1456 return;
1457 }
1458 TraceScope scope = Trace.startSpan("FSHLog.sync");
1459 try {
1460 scope = Trace.continueSpan(publishSyncThenBlockOnCompletion(scope.detach()));
1461 } finally {
1462 assert scope == NullScope.INSTANCE || !scope.isDetached();
1463 scope.close();
1464 }
1465 }
1466
1467
1468 public void requestLogRoll() {
1469 requestLogRoll(false);
1470 }
1471
1472 private void requestLogRoll(boolean tooFewReplicas) {
1473 if (!this.listeners.isEmpty()) {
1474 for (WALActionsListener i: this.listeners) {
1475 i.logRollRequested(tooFewReplicas);
1476 }
1477 }
1478 }
1479
1480
1481
1482 public int getNumRolledLogFiles() {
1483 return byWalRegionSequenceIds.size();
1484 }
1485
1486
1487
1488 public int getNumLogFiles() {
1489
1490 return getNumRolledLogFiles() + 1;
1491 }
1492
1493
1494
1495 public long getLogFileSize() {
1496 return this.totalLogSize.get();
1497 }
1498
1499 @Override
1500 public Long startCacheFlush(final byte[] encodedRegionName, Set<byte[]> families) {
1501 if (!closeBarrier.beginOp()) {
1502 LOG.info("Flush not started for " + Bytes.toString(encodedRegionName) + "; server closing.");
1503 return null;
1504 }
1505 return this.sequenceIdAccounting.startCacheFlush(encodedRegionName, families);
1506 }
1507
1508 @Override
1509 public void completeCacheFlush(final byte [] encodedRegionName) {
1510 this.sequenceIdAccounting.completeCacheFlush(encodedRegionName);
1511 closeBarrier.endOp();
1512 }
1513
1514 @Override
1515 public void abortCacheFlush(byte[] encodedRegionName) {
1516 this.sequenceIdAccounting.abortCacheFlush(encodedRegionName);
1517 closeBarrier.endOp();
1518 }
1519
1520 @VisibleForTesting
1521 boolean isLowReplicationRollEnabled() {
1522 return lowReplicationRollEnabled;
1523 }
1524
1525 public static final long FIXED_OVERHEAD = ClassSize.align(
1526 ClassSize.OBJECT + (5 * ClassSize.REFERENCE) +
1527 ClassSize.ATOMIC_INTEGER + Bytes.SIZEOF_INT + (3 * Bytes.SIZEOF_LONG));
1528
1529 private static void split(final Configuration conf, final Path p)
1530 throws IOException {
1531 FileSystem fs = FileSystem.get(conf);
1532 if (!fs.exists(p)) {
1533 throw new FileNotFoundException(p.toString());
1534 }
1535 if (!fs.getFileStatus(p).isDirectory()) {
1536 throw new IOException(p + " is not a directory");
1537 }
1538
1539 final Path baseDir = FSUtils.getRootDir(conf);
1540 final Path archiveDir = new Path(baseDir, HConstants.HREGION_OLDLOGDIR_NAME);
1541 WALSplitter.split(baseDir, p, archiveDir, fs, conf, WALFactory.getInstance(conf));
1542 }
1543
1544
1545 @Override
1546 public long getEarliestMemstoreSeqNum(byte[] encodedRegionName) {
1547
1548 return this.sequenceIdAccounting.getLowestSequenceId(encodedRegionName);
1549 }
1550
1551 @Override
1552 public long getEarliestMemstoreSeqNum(byte[] encodedRegionName, byte[] familyName) {
1553
1554
1555
1556
1557
1558
1559
1560
1561 return this.sequenceIdAccounting.getLowestSequenceId(encodedRegionName, familyName);
1562 }
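  /**
   * Coordination device used while rolling the WAL: thread A (the roller) waits in
   * {@code waitSafePoint()} until thread B (the ring buffer consumer) parks itself in
   * {@code safePointAttained()}; B then stays parked until A calls
   * {@code releaseSafePoint()}. The back-and-forth between the two latches gives the
   * "zigzag" its name.
   */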
1590 static class SafePointZigZagLatch {
1591
1592
1593
1594 private volatile CountDownLatch safePointAttainedLatch = new CountDownLatch(1);
1595
1596
1597
1598 private volatile CountDownLatch safePointReleasedLatch = new CountDownLatch(1);
1599
1600
1601
1602
1603
1604
1605
1606
1607
1608
1609
1610 SyncFuture waitSafePoint(final SyncFuture syncFuture)
1611 throws InterruptedException, FailedSyncBeforeLogCloseException {
1612 while (true) {
1613 if (this.safePointAttainedLatch.await(1, TimeUnit.NANOSECONDS)) break;
1614 if (syncFuture.isThrowable()) {
1615 throw new FailedSyncBeforeLogCloseException(syncFuture.getThrowable());
1616 }
1617 }
1618 return syncFuture;
1619 }
1620
1621
1622
1623
1624
1625
1626
1627 void safePointAttained() throws InterruptedException {
1628 this.safePointAttainedLatch.countDown();
1629 this.safePointReleasedLatch.await();
1630 }
1631
1632
1633
1634
1635
1636 void releaseSafePoint() {
1637 this.safePointReleasedLatch.countDown();
1638 }
1639
1640
1641
1642
1643 boolean isCocked() {
1644 return this.safePointAttainedLatch.getCount() > 0 &&
1645 this.safePointReleasedLatch.getCount() > 0;
1646 }
1647 }
1648
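  /**
   * The single disruptor consumer. Appends are written to the current writer inline; sync
   * requests are collected into {@code syncFutures} and, at the end of a batch, handed in
   * round-robin order to one of the {@code SyncRunner} threads. During a log roll it parks at
   * the safe point requested through {@code SafePointZigZagLatch}.
   */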
1672 class RingBufferEventHandler implements EventHandler<RingBufferTruck>, LifecycleAware {
1673 private final SyncRunner [] syncRunners;
1674 private final SyncFuture [] syncFutures;
1675
1676
1677 private volatile int syncFuturesCount = 0;
1678 private volatile SafePointZigZagLatch zigzagLatch;
1679
1680
1681
1682
1683 private Exception exception = null;
1684
1685
1686
1687 private final Object safePointWaiter = new Object();
1688 private volatile boolean shutdown = false;
1689
1690
1691
1692
1693 private int syncRunnerIndex;
1694
1695 RingBufferEventHandler(final int syncRunnerCount, final int maxHandlersCount) {
1696 this.syncFutures = new SyncFuture[maxHandlersCount];
1697 this.syncRunners = new SyncRunner[syncRunnerCount];
1698 for (int i = 0; i < syncRunnerCount; i++) {
1699 this.syncRunners[i] = new SyncRunner("sync." + i, maxHandlersCount);
1700 }
1701 }
1702
1703 private void cleanupOutstandingSyncsOnException(final long sequence, final Exception e) {
1704
1705 for (int i = 0; i < this.syncFuturesCount; i++) this.syncFutures[i].done(sequence, e);
1706 this.syncFuturesCount = 0;
1707 }
1708
1709
1710
1711
1712 private boolean isOutstandingSyncs() {
1713 for (int i = 0; i < this.syncFuturesCount; i++) {
1714 if (!this.syncFutures[i].isDone()) return true;
1715 }
1716 return false;
1717 }
1718
1719 @Override
1720
1721 public void onEvent(final RingBufferTruck truck, final long sequence, boolean endOfBatch)
1722 throws Exception {
1723
1724
1725
1726
1727
1728
1729
1730
1731 try {
1732 if (truck.hasSyncFuturePayload()) {
1733 this.syncFutures[this.syncFuturesCount++] = truck.unloadSyncFuturePayload();
1734
1735 if (this.syncFuturesCount == this.syncFutures.length) endOfBatch = true;
1736 } else if (truck.hasFSWALEntryPayload()) {
1737 TraceScope scope = Trace.continueSpan(truck.unloadSpanPayload());
1738 try {
1739 FSWALEntry entry = truck.unloadFSWALEntryPayload();
1740 if (this.exception != null) {
1741
1742
1743
1744
1745
1746 entry.stampRegionSequenceId();
1747
1748 return;
1749 }
1750 append(entry);
1751 } catch (Exception e) {
1752
1753 this.exception = e;
1754
1755 return;
1756 } finally {
1757 assert scope == NullScope.INSTANCE || !scope.isDetached();
1758 scope.close();
1759 }
1760 } else {
1761
1762 cleanupOutstandingSyncsOnException(sequence,
1763 new IllegalStateException("Neither append nor sync"));
1764
1765 return;
1766 }
1767
1768
1769
1770 if (this.exception == null) {
1771
1772
1773 if (!endOfBatch || this.syncFuturesCount <= 0) return;
1774
1775
1776
1777
1778
1779
1780
1781
1782
1783 this.syncRunnerIndex = (this.syncRunnerIndex + 1) % this.syncRunners.length;
1784 try {
1785
1786
1787 this.syncRunners[this.syncRunnerIndex].offer(sequence, this.syncFutures,
1788 this.syncFuturesCount);
1789 } catch (Exception e) {
1790
1791 requestLogRoll();
1792 this.exception = new DamagedWALException("Failed offering sync", e);
1793 }
1794 }
1795
1796 if (this.exception != null) {
1797 cleanupOutstandingSyncsOnException(sequence,
1798 this.exception instanceof DamagedWALException?
1799 this.exception:
1800 new DamagedWALException("On sync", this.exception));
1801 }
1802 attainSafePoint(sequence);
1803 this.syncFuturesCount = 0;
1804 } catch (Throwable t) {
1805 LOG.error("UNEXPECTED!!! syncFutures.length=" + this.syncFutures.length, t);
1806 }
1807 }
1808
1809 SafePointZigZagLatch attainSafePoint() {
1810 this.zigzagLatch = new SafePointZigZagLatch();
1811 return this.zigzagLatch;
1812 }
1813
1814
1815
1816
1817
1818 private void attainSafePoint(final long currentSequence) {
1819 if (this.zigzagLatch == null || !this.zigzagLatch.isCocked()) return;
1820
1821 beforeWaitOnSafePoint();
1822 try {
1823
1824
1825
1826 while (!this.shutdown && this.zigzagLatch.isCocked() &&
1827 highestSyncedSequence.get() < currentSequence &&
1828
1829
1830 isOutstandingSyncs()) {
1831 synchronized (this.safePointWaiter) {
1832 this.safePointWaiter.wait(0, 1);
1833 }
1834 }
1835
1836
1837
1838 this.exception = null;
1839 this.zigzagLatch.safePointAttained();
1840 } catch (InterruptedException e) {
1841 LOG.warn("Interrupted ", e);
1842 Thread.currentThread().interrupt();
1843 }
1844 }
1845
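    /**
     * Write a single entry: stamp its region sequence id, run coprocessor and listener hooks,
     * append it to the current writer, and record the edit with {@code sequenceIdAccounting}.
     * Any failure requests a log roll and is rethrown as a {@link DamagedWALException}.
     */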
1851 void append(final FSWALEntry entry) throws Exception {
1852
1853 atHeadOfRingBufferEventHandlerAppend();
1854
1855 long start = EnvironmentEdgeManager.currentTime();
1856 byte [] encodedRegionName = entry.getKey().getEncodedRegionName();
1857 long regionSequenceId = WALKey.NO_SEQUENCE_ID;
1858 try {
1859
1860
1861
1862 regionSequenceId = entry.stampRegionSequenceId();
1863
1864
1865
1866 if (entry.getEdit().isEmpty()) {
1867 return;
1868 }
1869
1870
1871 if (!coprocessorHost.preWALWrite(entry.getHRegionInfo(), entry.getKey(),
1872 entry.getEdit())) {
1873 if (entry.getEdit().isReplay()) {
1874
1875 entry.getKey().setScopes(null);
1876 }
1877 }
1878 if (!listeners.isEmpty()) {
1879 for (WALActionsListener i: listeners) {
1880
1881 i.visitLogEntryBeforeWrite(entry.getHTableDescriptor(), entry.getKey(),
1882 entry.getEdit());
1883 }
1884 }
1885
1886 writer.append(entry);
1887 assert highestUnsyncedSequence < entry.getSequence();
1888 highestUnsyncedSequence = entry.getSequence();
1889 sequenceIdAccounting.update(encodedRegionName, entry.getFamilyNames(), regionSequenceId,
1890 entry.isInMemstore());
1891 coprocessorHost.postWALWrite(entry.getHRegionInfo(), entry.getKey(), entry.getEdit());
1892
1893 postAppend(entry, EnvironmentEdgeManager.currentTime() - start);
1894 } catch (Exception e) {
1895 String msg = "Append sequenceId=" + regionSequenceId + ", requesting roll of WAL";
1896 LOG.warn(msg, e);
1897 requestLogRoll();
1898 throw new DamagedWALException(msg, e);
1899 }
1900 numEntries.incrementAndGet();
1901 }
1902
1903 @Override
1904 public void onStart() {
1905 for (SyncRunner syncRunner: this.syncRunners) syncRunner.start();
1906 }
1907
1908 @Override
1909 public void onShutdown() {
1910 for (SyncRunner syncRunner: this.syncRunners) syncRunner.interrupt();
1911 }
1912 }
1913
1914
1915
1916
1917 @VisibleForTesting
1918 void atHeadOfRingBufferEventHandlerAppend() {
1919
1920 }
1921
1922 private static IOException ensureIOException(final Throwable t) {
1923 return (t instanceof IOException)? (IOException)t: new IOException(t);
1924 }
1925
1926 private static void usage() {
1927 System.err.println("Usage: FSHLog <ARGS>");
1928 System.err.println("Arguments:");
    System.err.println("  --dump  Dump textual representation of one or more passed files");
1930 System.err.println(" For example: " +
1931 "FSHLog --dump hdfs://example.com:9000/hbase/.logs/MACHINE/LOGFILE");
1932 System.err.println(" --split Split the passed directory of WAL logs");
1933 System.err.println(" For example: " +
1934 "FSHLog --split hdfs://example.com:9000/hbase/.logs/DIR");
1935 }
1936
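  /**
   * Command-line entry point: pass {@code --dump} to pretty-print WAL files or {@code --split}
   * to split a directory of WALs. For example (the path is illustrative, taken from the usage
   * text above):
   * <pre>
   *   FSHLog --dump hdfs://example.com:9000/hbase/.logs/MACHINE/LOGFILE
   * </pre>
   */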
1944 public static void main(String[] args) throws IOException {
1945 if (args.length < 2) {
1946 usage();
1947 System.exit(-1);
1948 }
1949
1950 if (args[0].compareTo("--dump") == 0) {
1951 WALPrettyPrinter.run(Arrays.copyOfRange(args, 1, args.length));
1952 } else if (args[0].compareTo("--perf") == 0) {
      LOG.fatal("Please use the WALPerformanceEvaluation tool instead, e.g.:");
1954 LOG.fatal("\thbase org.apache.hadoop.hbase.wal.WALPerformanceEvaluation --iterations " +
1955 args[1]);
1956 System.exit(-1);
1957 } else if (args[0].compareTo("--split") == 0) {
1958 Configuration conf = HBaseConfiguration.create();
1959 for (int i = 1; i < args.length; i++) {
1960 try {
1961 Path logPath = new Path(args[i]);
1962 FSUtils.setFsDefault(conf, logPath);
1963 split(conf, logPath);
1964 } catch (IOException t) {
1965 t.printStackTrace(System.err);
1966 System.exit(-1);
1967 }
1968 }
1969 } else {
1970 usage();
1971 System.exit(-1);
1972 }
1973 }
1974
1975
1976
1977
1978 @VisibleForTesting
1979 DatanodeInfo[] getPipeLine() {
1980 if (this.hdfs_out != null) {
1981 if (this.hdfs_out.getWrappedStream() instanceof DFSOutputStream) {
1982 return ((DFSOutputStream) this.hdfs_out.getWrappedStream()).getPipeline();
1983 }
1984 }
1985 return new DatanodeInfo[0];
1986 }
1987 }