1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.replication.regionserver;
20
21 import org.apache.commons.logging.Log;
22 import org.apache.commons.logging.LogFactory;
23 import org.apache.hadoop.hbase.classification.InterfaceAudience;
24 import org.apache.hadoop.conf.Configuration;
25 import org.apache.hadoop.fs.FileSystem;
26 import org.apache.hadoop.fs.Path;
27 import org.apache.hadoop.hbase.wal.WAL.Reader;
28 import org.apache.hadoop.hbase.wal.WAL.Entry;
29 import org.apache.hadoop.hbase.wal.WALFactory;
30
31 import java.io.IOException;
32
33
34
35
36
37 @InterfaceAudience.Private
38 public class ReplicationWALReaderManager {
39
40 private static final Log LOG = LogFactory.getLog(ReplicationWALReaderManager.class);
41 private final FileSystem fs;
42 private final Configuration conf;
43 private long position = 0;
44 private Reader reader;
45 private Path lastPath;
46
47
48
49
50
51
52
53 public ReplicationWALReaderManager(FileSystem fs, Configuration conf) {
54 this.fs = fs;
55 this.conf = conf;
56 }
57
58
59
60
61
62
63
64 public Reader openReader(Path path) throws IOException {
65
66
67 if (this.reader == null || !this.lastPath.equals(path)) {
68 this.closeReader();
69 this.reader = WALFactory.createReader(this.fs, path, this.conf);
70 this.lastPath = path;
71 } else {
72 try {
73 this.reader.reset();
74 } catch (NullPointerException npe) {
75 throw new IOException("NPE resetting reader, likely HDFS-4380", npe);
76 }
77 }
78 return this.reader;
79 }
80
81
82
83
84
85
86 public Entry readNextAndSetPosition() throws IOException {
87 Entry entry = this.reader.next();
88
89
90
91
92 this.position = this.reader.getPosition();
93
94 if (entry != null) {
95 entry.setCompressionContext(null);
96 }
97 return entry;
98 }
99
100
101
102
103
104 public void seek() throws IOException {
105 if (this.position != 0) {
106 this.reader.seek(this.position);
107 }
108 }
109
110
111
112
113
114 public long getPosition() {
115 return this.position;
116 }
117
118 public void setPosition(long pos) {
119 this.position = pos;
120 }
121
122
123
124
125
126 public void closeReader() throws IOException {
127 if (this.reader != null) {
128 this.reader.close();
129 this.reader = null;
130 }
131 }
132
133
134
135
136 void finishCurrentFile() {
137 this.position = 0;
138 try {
139 this.closeReader();
140 } catch (IOException e) {
141 LOG.warn("Unable to close reader", e);
142 }
143 }
144
145 }