1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase.regionserver.wal;
21
22 import java.io.EOFException;
23 import java.io.IOException;
24 import java.nio.ByteBuffer;
25 import java.util.ArrayList;
26 import java.util.Arrays;
27 import java.util.List;
28
29 import org.apache.commons.logging.Log;
30 import org.apache.commons.logging.LogFactory;
31 import org.apache.hadoop.hbase.classification.InterfaceAudience;
32 import org.apache.hadoop.conf.Configuration;
33 import org.apache.hadoop.fs.Path;
34 import org.apache.hadoop.fs.FileSystem;
35 import org.apache.hadoop.fs.FSDataInputStream;
36 import org.apache.hadoop.hbase.codec.Codec;
37 import org.apache.hadoop.hbase.io.LimitInputStream;
38 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
39 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
40 import org.apache.hadoop.hbase.protobuf.generated.WALProtos;
41 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALHeader.Builder;
42 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALKey;
43 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer;
44 import org.apache.hadoop.hbase.util.Bytes;
45 import org.apache.hadoop.hbase.wal.WAL.Entry;
46
47 import com.google.protobuf.CodedInputStream;
48 import com.google.protobuf.InvalidProtocolBufferException;
49
50
51
52
53
54
55
56
57
58
59
60
61 @InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX,
62 HBaseInterfaceAudience.CONFIG})
63 public class ProtobufLogReader extends ReaderBase {
private static final Log LOG = LogFactory.getLog(ProtobufLogReader.class);

/**
 * Magic bytes that precede the WAL header. {@code initInternal} requires the stream to be
 * positioned exactly {@code PB_WAL_MAGIC.length} bytes into the file before the header is read.
 */
@InterfaceAudience.Private
public static final byte[] PB_WAL_MAGIC = Bytes.toBytes("PWAL");

/**
 * Magic bytes expected at the very end of a file that carries a trailer;
 * {@code setTrailerIfPresent} compares the last bytes of the file against this value.
 */
@InterfaceAudience.Private
public static final byte[] PB_WAL_COMPLETE_MAGIC = Bytes.toBytes("LAWP");

/** Configuration key for the trailer size above which a warning is logged. */
static final String WAL_TRAILER_WARN_SIZE = "hbase.regionserver.waltrailer.warn.size";
/** Default for {@link #WAL_TRAILER_WARN_SIZE}: 1024 * 1024. */
static final int DEFAULT_WAL_TRAILER_WARN_SIZE = 1024 * 1024;

/** Stream the reader consumes; {@code null} after {@link #close()}. */
protected FSDataInputStream inputStream;
/** Decoder obtained from the WAL cell codec in {@code initAfterCompression(String)}. */
protected Codec.Decoder cellDecoder;
/** Only assigned by {@code initAfterCompression(String)} when {@link #hasCompression} is set. */
protected WALCellCodec.ByteStringUncompressor byteStringUncompressor;
/** Compression flag taken from the WAL header on the first initialization. */
protected boolean hasCompression = false;
/** Tag-compression flag taken from the WAL header on the first initialization. */
protected boolean hasTagCompression = false;

/** Offset at which edits end: the file length, or the trailer position when a trailer exists. */
private long walEditsStopOffset;
/** Whether {@code setTrailerIfPresent} found and parsed a trailer. */
private boolean trailerPresent;
/** Parsed trailer; only assigned when a trailer was read successfully. */
protected WALTrailer trailer;

/** Trailer size threshold (from configuration) above which a warning is logged. */
protected int trailerWarnSize;
/**
 * Writer class simple names accepted by {@code readHeader}. The reference is final; the list
 * itself stays mutable and is handed out as-is by {@code getWriterClsNames()}.
 */
private static final List<String> writerClsNames = new ArrayList<String>();
static {
  writerClsNames.add(ProtobufLogWriter.class.getSimpleName());
}

/** Cell codec class name found in the WAL header, or {@code null} if the header had none. */
private String codecClsName = null;
98
/** Outcome of attempting to read the WAL header (see {@code readHeader}). */
enum WALHdrResult {
  /** The header could not be read from the stream. */
  EOF,
  /** The header was read and its writer class, if any, is accepted. */
  SUCCESS,
  /** The header names a writer class that this reader does not accept. */
  UNKNOWN_WRITER_CLS
}
104
105
106 static class WALHdrContext {
107 WALHdrResult result;
108 String cellCodecClsName;
109
110 WALHdrContext(WALHdrResult result, String cellCodecClsName) {
111 this.result = result;
112 this.cellCodecClsName = cellCodecClsName;
113 }
114 WALHdrResult getResult() {
115 return result;
116 }
117 String getCellCodecClsName() {
118 return cellCodecClsName;
119 }
120 }
121
122 public ProtobufLogReader() {
123 super();
124 }
125
126 @Override
127 public void close() throws IOException {
128 if (this.inputStream != null) {
129 this.inputStream.close();
130 this.inputStream = null;
131 }
132 }
133
134 @Override
135 public long getPosition() throws IOException {
136 return inputStream.getPos();
137 }
138
139 @Override
140 public void reset() throws IOException {
141 String clsName = initInternal(null, false);
142 initAfterCompression(clsName);
143 }
144
145 @Override
146 public void init(FileSystem fs, Path path, Configuration conf, FSDataInputStream stream)
147 throws IOException {
148 this.trailerWarnSize = conf.getInt(WAL_TRAILER_WARN_SIZE, DEFAULT_WAL_TRAILER_WARN_SIZE);
149 super.init(fs, path, conf, stream);
150 }
151
152 @Override
153 protected String initReader(FSDataInputStream stream) throws IOException {
154 return initInternal(stream, true);
155 }
156
157
158
159
160 public List<String> getWriterClsNames() {
161 return writerClsNames;
162 }
163
164
165
166
167 public String getCodecClsName() {
168 return codecClsName;
169 }
170
171 protected WALHdrContext readHeader(Builder builder, FSDataInputStream stream)
172 throws IOException {
173 boolean res = builder.mergeDelimitedFrom(stream);
174 if (!res) return new WALHdrContext(WALHdrResult.EOF, null);
175 if (builder.hasWriterClsName() &&
176 !getWriterClsNames().contains(builder.getWriterClsName())) {
177 return new WALHdrContext(WALHdrResult.UNKNOWN_WRITER_CLS, null);
178 }
179 String clsName = null;
180 if (builder.hasCellCodecClsName()) {
181 clsName = builder.getCellCodecClsName();
182 }
183 return new WALHdrContext(WALHdrResult.SUCCESS, clsName);
184 }
185
186 private String initInternal(FSDataInputStream stream, boolean isFirst)
187 throws IOException {
188 close();
189 long expectedPos = PB_WAL_MAGIC.length;
190 if (stream == null) {
191 stream = fs.open(path);
192 stream.seek(expectedPos);
193 }
194 if (stream.getPos() != expectedPos) {
195 throw new IOException("The stream is at invalid position: " + stream.getPos());
196 }
197
198 WALProtos.WALHeader.Builder builder = WALProtos.WALHeader.newBuilder();
199 WALHdrContext hdrCtxt = readHeader(builder, stream);
200 WALHdrResult walHdrRes = hdrCtxt.getResult();
201 if (walHdrRes == WALHdrResult.EOF) {
202 throw new EOFException("Couldn't read WAL PB header");
203 }
204 if (walHdrRes == WALHdrResult.UNKNOWN_WRITER_CLS) {
205 throw new IOException("Got unknown writer class: " + builder.getWriterClsName());
206 }
207 if (isFirst) {
208 WALProtos.WALHeader header = builder.build();
209 this.hasCompression = header.hasHasCompression() && header.getHasCompression();
210 this.hasTagCompression = header.hasHasTagCompression() && header.getHasTagCompression();
211 }
212 this.inputStream = stream;
213 this.walEditsStopOffset = this.fileLength;
214 long currentPosition = stream.getPos();
215 trailerPresent = setTrailerIfPresent();
216 this.seekOnFs(currentPosition);
217 if (LOG.isTraceEnabled()) {
218 LOG.trace("After reading the trailer: walEditsStopOffset: " + this.walEditsStopOffset
219 + ", fileLength: " + this.fileLength + ", " + "trailerPresent: " + trailerPresent);
220 }
221
222 codecClsName = hdrCtxt.getCellCodecClsName();
223
224 return hdrCtxt.getCellCodecClsName();
225 }
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244 private boolean setTrailerIfPresent() {
245 try {
246 long trailerSizeOffset = this.fileLength - (PB_WAL_COMPLETE_MAGIC.length + Bytes.SIZEOF_INT);
247 if (trailerSizeOffset <= 0) return false;
248 this.seekOnFs(trailerSizeOffset);
249
250 int trailerSize = this.inputStream.readInt();
251 ByteBuffer buf = ByteBuffer.allocate(ProtobufLogReader.PB_WAL_COMPLETE_MAGIC.length);
252 this.inputStream.readFully(buf.array(), buf.arrayOffset(), buf.capacity());
253 if (!Arrays.equals(buf.array(), PB_WAL_COMPLETE_MAGIC)) {
254 LOG.trace("No trailer found.");
255 return false;
256 }
257 if (trailerSize < 0) {
258 LOG.warn("Invalid trailer Size " + trailerSize + ", ignoring the trailer");
259 return false;
260 } else if (trailerSize > this.trailerWarnSize) {
261
262 LOG.warn("Please investigate WALTrailer usage. Trailer size > maximum configured size : "
263 + trailerSize + " > " + this.trailerWarnSize);
264 }
265
266 long positionOfTrailer = trailerSizeOffset - trailerSize;
267 this.seekOnFs(positionOfTrailer);
268
269 buf = ByteBuffer.allocate(trailerSize);
270 this.inputStream.readFully(buf.array(), buf.arrayOffset(), buf.capacity());
271 trailer = WALTrailer.parseFrom(buf.array());
272 this.walEditsStopOffset = positionOfTrailer;
273 return true;
274 } catch (IOException ioe) {
275 LOG.warn("Got IOE while reading the trailer. Continuing as if no trailer is present.", ioe);
276 }
277 return false;
278 }
279
280 protected WALCellCodec getCodec(Configuration conf, String cellCodecClsName,
281 CompressionContext compressionContext) throws IOException {
282 return WALCellCodec.create(conf, cellCodecClsName, compressionContext);
283 }
284
285 @Override
286 protected void initAfterCompression() throws IOException {
287 initAfterCompression(null);
288 }
289
290 @Override
291 protected void initAfterCompression(String cellCodecClsName) throws IOException {
292 WALCellCodec codec = getCodec(this.conf, cellCodecClsName, this.compressionContext);
293 this.cellDecoder = codec.getDecoder(this.inputStream);
294 if (this.hasCompression) {
295 this.byteStringUncompressor = codec.getByteStringUncompressor();
296 }
297 }
298
299 @Override
300 protected boolean hasCompression() {
301 return this.hasCompression;
302 }
303
304 @Override
305 protected boolean hasTagCompression() {
306 return this.hasTagCompression;
307 }
308
309 @Override
310 protected boolean readNext(Entry entry) throws IOException {
311 while (true) {
312
313 long originalPosition = this.inputStream.getPos();
314 if (trailerPresent && originalPosition > 0 && originalPosition == this.walEditsStopOffset) {
315 return false;
316 }
317 WALKey.Builder builder = WALKey.newBuilder();
318 long size = 0;
319 try {
320 long available = -1;
321 try {
322 int firstByte = this.inputStream.read();
323 if (firstByte == -1) {
324 throw new EOFException("First byte is negative");
325 }
326 size = CodedInputStream.readRawVarint32(firstByte, this.inputStream);
327
328 available = this.inputStream.available();
329 if (available > 0 && available < size) {
330 throw new EOFException("Available stream not enough for edit, " +
331 "inputStream.available()= " + this.inputStream.available() + ", " +
332 "entry size= " + size);
333 }
334 ProtobufUtil.mergeFrom(builder, new LimitInputStream(this.inputStream, size),
335 (int)size);
336 } catch (InvalidProtocolBufferException ipbe) {
337 throw (EOFException) new EOFException("Invalid PB, EOF? Ignoring; originalPosition=" +
338 originalPosition + ", currentPosition=" + this.inputStream.getPos() +
339 ", messageSize=" + size + ", currentAvailable=" + available).initCause(ipbe);
340 }
341 if (!builder.isInitialized()) {
342
343
344 throw new EOFException("Partial PB while reading WAL, " +
345 "probably an unexpected EOF, ignoring");
346 }
347 WALKey walKey = builder.build();
348 entry.getKey().readFieldsFromPb(walKey, this.byteStringUncompressor);
349 if (!walKey.hasFollowingKvCount() || 0 == walKey.getFollowingKvCount()) {
350 LOG.trace("WALKey has no KVs that follow it; trying the next one");
351 continue;
352 }
353 int expectedCells = walKey.getFollowingKvCount();
354 long posBefore = this.inputStream.getPos();
355 try {
356 int actualCells = entry.getEdit().readFromCells(cellDecoder, expectedCells);
357 if (expectedCells != actualCells) {
358 throw new EOFException("Only read " + actualCells);
359 }
360 } catch (Exception ex) {
361 String posAfterStr = "<unknown>";
362 try {
363 posAfterStr = this.inputStream.getPos() + "";
364 } catch (Throwable t) {
365 LOG.trace("Error getting pos for error message - ignoring", t);
366 }
367 String message = " while reading " + expectedCells + " WAL KVs; started reading at "
368 + posBefore + " and read up to " + posAfterStr;
369 IOException realEofEx = extractHiddenEof(ex);
370 throw (EOFException) new EOFException("EOF " + message).
371 initCause(realEofEx != null ? realEofEx : ex);
372 }
373 if (trailerPresent && this.inputStream.getPos() > this.walEditsStopOffset) {
374 LOG.error("Read WALTrailer while reading WALEdits. wal: " + this.path
375 + ", inputStream.getPos(): " + this.inputStream.getPos() + ", walEditsStopOffset: "
376 + this.walEditsStopOffset);
377 throw new EOFException("Read WALTrailer while reading WALEdits");
378 }
379 } catch (EOFException eof) {
380 LOG.trace("Encountered a malformed edit, seeking back to last good position in file", eof);
381
382 if (originalPosition < 0) throw eof;
383
384
385 seekOnFs(originalPosition);
386 return false;
387 }
388 return true;
389 }
390 }
391
392 private IOException extractHiddenEof(Exception ex) {
393
394
395 IOException ioEx = null;
396 if (ex instanceof EOFException) {
397 return (EOFException)ex;
398 } else if (ex instanceof IOException) {
399 ioEx = (IOException)ex;
400 } else if (ex instanceof RuntimeException
401 && ex.getCause() != null && ex.getCause() instanceof IOException) {
402 ioEx = (IOException)ex.getCause();
403 }
404 if (ioEx != null) {
405 if (ioEx.getMessage().contains("EOF")) return ioEx;
406 return null;
407 }
408 return null;
409 }
410
411 @Override
412 protected void seekOnFs(long pos) throws IOException {
413 this.inputStream.seek(pos);
414 }
415 }