1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.io;
19
20 import java.io.IOException;
21
22 import org.apache.hadoop.fs.FSDataInputStream;
23 import org.apache.hadoop.fs.FileSystem;
24 import org.apache.hadoop.fs.Path;
25 import org.apache.hadoop.hbase.fs.HFileSystem;
26
27 import com.google.common.annotations.VisibleForTesting;
28
29
30
31
32
33
34 public class FSDataInputStreamWrapper {
35 private final HFileSystem hfs;
36 private final Path path;
37 private final FileLink link;
38 private final boolean doCloseStreams;
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60 private volatile FSDataInputStream stream = null;
61 private volatile FSDataInputStream streamNoFsChecksum = null;
62 private Object streamNoFsChecksumFirstCreateLock = new Object();
63
64
65 private boolean useHBaseChecksumConfigured;
66
67
68
69
70
71 private volatile boolean useHBaseChecksum;
72
73
74
75 private volatile int hbaseChecksumOffCount = -1;
76
77 public FSDataInputStreamWrapper(FileSystem fs, Path path) throws IOException {
78 this(fs, null, path, false);
79 }
80
81 public FSDataInputStreamWrapper(FileSystem fs, Path path, boolean dropBehind) throws IOException {
82 this(fs, null, path, dropBehind);
83 }
84
85 public FSDataInputStreamWrapper(FileSystem fs, FileLink link) throws IOException {
86 this(fs, link, null, false);
87 }
88 public FSDataInputStreamWrapper(FileSystem fs, FileLink link,
89 boolean dropBehind) throws IOException {
90 this(fs, link, null, dropBehind);
91 }
92
93 private FSDataInputStreamWrapper(FileSystem fs, FileLink link,
94 Path path, boolean dropBehind) throws IOException {
95 assert (path == null) != (link == null);
96 this.path = path;
97 this.link = link;
98 this.doCloseStreams = true;
99
100
101
102 this.hfs = (fs instanceof HFileSystem) ? (HFileSystem)fs : new HFileSystem(fs);
103
104
105 this.useHBaseChecksumConfigured = this.useHBaseChecksum = false;
106 this.stream = (link != null) ? link.open(hfs) : hfs.open(path);
107 try {
108 this.stream.setDropBehind(dropBehind);
109 } catch (Exception e) {
110
111 }
112 }
113
114
115
116
117
118
119
120
121 public void prepareForBlockReader(boolean forceNoHBaseChecksum) throws IOException {
122 if (hfs == null) return;
123 assert this.stream != null && !this.useHBaseChecksumConfigured;
124 boolean useHBaseChecksum =
125 !forceNoHBaseChecksum && hfs.useHBaseChecksum() && (hfs.getNoChecksumFs() != hfs);
126
127 if (useHBaseChecksum) {
128 FileSystem fsNc = hfs.getNoChecksumFs();
129 this.streamNoFsChecksum = (link != null) ? link.open(fsNc) : fsNc.open(path);
130 this.useHBaseChecksumConfigured = this.useHBaseChecksum = useHBaseChecksum;
131
132 this.stream.close();
133 this.stream = null;
134 }
135 }
136
137
138 @VisibleForTesting
139 public FSDataInputStreamWrapper(FSDataInputStream fsdis) {
140 this(fsdis, fsdis);
141 }
142
143
144 @VisibleForTesting
145 public FSDataInputStreamWrapper(FSDataInputStream fsdis, FSDataInputStream noChecksum) {
146 doCloseStreams = false;
147 stream = fsdis;
148 streamNoFsChecksum = noChecksum;
149 path = null;
150 link = null;
151 hfs = null;
152 useHBaseChecksumConfigured = useHBaseChecksum = false;
153 }
154
155
156
157
158 public boolean shouldUseHBaseChecksum() {
159 return this.useHBaseChecksum;
160 }
161
162
163
164
165
166
167 public FSDataInputStream getStream(boolean useHBaseChecksum) {
168 return useHBaseChecksum ? this.streamNoFsChecksum : this.stream;
169 }
170
171
172
173
174
175 public FSDataInputStream fallbackToFsChecksum(int offCount) throws IOException {
176
177 boolean partOfConvoy = false;
178 if (this.stream == null) {
179 synchronized (streamNoFsChecksumFirstCreateLock) {
180 partOfConvoy = (this.stream != null);
181 if (!partOfConvoy) {
182 this.stream = (link != null) ? link.open(hfs) : hfs.open(path);
183 }
184 }
185 }
186 if (!partOfConvoy) {
187 this.useHBaseChecksum = false;
188 this.hbaseChecksumOffCount = offCount;
189 }
190 return this.stream;
191 }
192
193
194 public void checksumOk() {
195 if (this.useHBaseChecksumConfigured && !this.useHBaseChecksum
196 && (this.hbaseChecksumOffCount-- < 0)) {
197
198 assert this.streamNoFsChecksum != null;
199 this.useHBaseChecksum = true;
200 }
201 }
202
203
204 public void close() throws IOException {
205 if (!doCloseStreams) return;
206 try {
207 if (stream != streamNoFsChecksum && streamNoFsChecksum != null) {
208 streamNoFsChecksum.close();
209 streamNoFsChecksum = null;
210 }
211 } finally {
212 if (stream != null) {
213 stream.close();
214 stream = null;
215 }
216 }
217 }
218
219 public HFileSystem getHfs() {
220 return this.hfs;
221 }
222 }