/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
 * implied.  See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.junit.Test;
import org.junit.experimental.categories.Category;

/**
 * Test that FileLink switches between its alternate locations when the
 * current location is moved or deleted.
 */
@Category(MediumTests.class)
public class TestFileLink {

  @Test
  public void testEquals() {
    Path p1 = new Path("/p1");
    Path p2 = new Path("/p2");
    Path p3 = new Path("/p3");

    assertEquals(new FileLink(), new FileLink());
    assertEquals(new FileLink(p1), new FileLink(p1));
    assertEquals(new FileLink(p1, p2), new FileLink(p1, p2));
    assertEquals(new FileLink(p1, p2, p3), new FileLink(p1, p2, p3));

    assertNotEquals(new FileLink(p1), new FileLink(p3));
    assertNotEquals(new FileLink(p1, p2), new FileLink(p1));
    assertNotEquals(new FileLink(p1, p2), new FileLink(p2));
    assertNotEquals(new FileLink(p1, p2), new FileLink(p2, p1));
  }

  @Test
  public void testHashCode() {
    Path p1 = new Path("/p1");
    Path p2 = new Path("/p2");
    Path p3 = new Path("/p3");

    assertEquals(new FileLink().hashCode(), new FileLink().hashCode());
    assertEquals(new FileLink(p1).hashCode(), new FileLink(p1).hashCode());
    assertEquals(new FileLink(p1, p2).hashCode(), new FileLink(p1, p2).hashCode());
    assertEquals(new FileLink(p1, p2, p3).hashCode(), new FileLink(p1, p2, p3).hashCode());

    assertNotEquals(new FileLink(p1).hashCode(), new FileLink(p3).hashCode());
    assertNotEquals(new FileLink(p1, p2).hashCode(), new FileLink(p1).hashCode());
    assertNotEquals(new FileLink(p1, p2).hashCode(), new FileLink(p2).hashCode());
    assertNotEquals(new FileLink(p1, p2).hashCode(), new FileLink(p2, p1).hashCode());
  }

  /**
   * Test, on HDFS, that the FileLink is still readable even when the current
   * file gets renamed to its archive location.
   */
  @Test
  public void testHDFSLinkReadDuringRename() throws Exception {
    HBaseTestingUtility testUtil = new HBaseTestingUtility();
    Configuration conf = testUtil.getConfiguration();
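    // Use a small block size and client prefetch window, presumably so that the
    // 256MB test file spans many blocks and reads cross block boundaries while
    // the rename happens underneath the open stream.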
    conf.setInt("dfs.blocksize", 1024 * 1024);
    conf.setInt("dfs.client.read.prefetch.size", 2 * 1024 * 1024);

    testUtil.startMiniDFSCluster(1);
    MiniDFSCluster cluster = testUtil.getDFSCluster();
    FileSystem fs = cluster.getFileSystem();
    assertEquals("hdfs", fs.getUri().getScheme());

    try {
      testLinkReadDuringRename(fs, testUtil.getDefaultRootDirPath());
    } finally {
      testUtil.shutdownMiniCluster();
    }
  }

  /**
   * Test, on the local filesystem, that the FileLink is still readable even
   * when the current file gets renamed.
   */
  @Test
  public void testLocalLinkReadDuringRename() throws IOException {
    HBaseTestingUtility testUtil = new HBaseTestingUtility();
    FileSystem fs = testUtil.getTestFileSystem();
    assertEquals("file", fs.getUri().getScheme());
    testLinkReadDuringRename(fs, testUtil.getDataTestDir());
  }

  /**
   * Read through a FileLink while its current file is renamed from the
   * original location to the archived location.
   */
  private void testLinkReadDuringRename(FileSystem fs, Path rootDir) throws IOException {
    Path originalPath = new Path(rootDir, "test.file");
    Path archivedPath = new Path(rootDir, "archived.file");

    writeSomeData(fs, originalPath, 256 << 20, (byte)2);

    List<Path> files = new ArrayList<Path>();
    files.add(originalPath);
    files.add(archivedPath);

    FileLink link = new FileLink(files);
    FSDataInputStream in = link.open(fs);
    try {
      byte[] data = new byte[8192];
      long size = 0;

      // Read the first chunk from the original location.
      int n = in.read(data);
      dataVerify(data, n, (byte)2);
      size += n;

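      // On Windows the underlying file cannot be renamed while it is still open,
      // so close the stream now and reopen it through the link after the move.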
      if (FSUtils.WINDOWS) {
        in.close();
      }

      // Move the original file to the archived location.
      assertFalse(fs.exists(archivedPath));
      fs.rename(originalPath, archivedPath);
      assertFalse(fs.exists(originalPath));
      assertTrue(fs.exists(archivedPath));

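      // Windows only: reopen through the link (it now resolves to the archived
      // location) and re-read the first chunk, which was already counted above,
      // so the stream position matches the non-Windows path.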
      if (FSUtils.WINDOWS) {
        in = link.open(fs);
        in.read(data);
      }

      // Keep reading to the end; the data should be unaffected by the rename.
      while ((n = in.read(data)) > 0) {
        dataVerify(data, n, (byte)2);
        size += n;
      }

      assertEquals(256 << 20, size);
    } finally {
      in.close();
      if (fs.exists(originalPath)) fs.delete(originalPath, true);
      if (fs.exists(archivedPath)) fs.delete(archivedPath, true);
    }
  }

  /**
   * Test that the FileLink is still readable even when the current file gets
   * deleted.
   *
   * NOTE: this scenario is only meaningful on HDFS. On a local filesystem a
   * deleted file is simply gone, while on HDFS a client that already has the
   * file open can keep reading its blocks; the FileLink switch logic then has
   * to move on to the next location once the missing file is finally hit.
   */
  @Test
  public void testHDFSLinkReadDuringDelete() throws Exception {
    HBaseTestingUtility testUtil = new HBaseTestingUtility();
    Configuration conf = testUtil.getConfiguration();
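    // Same DFS tuning as testHDFSLinkReadDuringRename: small blocks and a small
    // client prefetch window.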
    conf.setInt("dfs.blocksize", 1024 * 1024);
    conf.setInt("dfs.client.read.prefetch.size", 2 * 1024 * 1024);

    testUtil.startMiniDFSCluster(1);
    MiniDFSCluster cluster = testUtil.getDFSCluster();
    FileSystem fs = cluster.getFileSystem();
    assertEquals("hdfs", fs.getUri().getScheme());

    try {
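      // Create three 1MB files, each filled with a distinct byte value (0, 1, 2),
      // and chain them together in a single FileLink.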
      List<Path> files = new ArrayList<Path>();
      for (int i = 0; i < 3; i++) {
        Path path = new Path(String.format("test-data-%d", i));
        writeSomeData(fs, path, 1 << 20, (byte)i);
        files.add(path);
      }

      FileLink link = new FileLink(files);
      FSDataInputStream in = link.open(fs);
      try {
        byte[] data = new byte[8192];
        int n;

        // Read from the first file, then delete it and drain what is left of it.
        n = in.read(data);
        dataVerify(data, n, (byte)0);
        fs.delete(files.get(0), true);
        skipBuffer(in, (byte)0);

        // The link should have switched to the second file; delete it too.
        n = in.read(data);
        dataVerify(data, n, (byte)1);
        fs.delete(files.get(1), true);
        skipBuffer(in, (byte)1);

        // Switch to the third and last file and delete it as well.
        n = in.read(data);
        dataVerify(data, n, (byte)2);
        fs.delete(files.get(2), true);
        skipBuffer(in, (byte)2);

        // No more locations are available: the next read should either return
        // end-of-stream or fail with FileNotFoundException.
        try {
          n = in.read(data);
          assertTrue(n <= 0);
        } catch (FileNotFoundException e) {
          assertTrue(true);
        }
      } finally {
        in.close();
      }
    } finally {
      testUtil.shutdownMiniCluster();
    }
  }

  /**
   * Write 'size' bytes with value 'v' into a new file called 'path'.
   */
  private void writeSomeData(FileSystem fs, Path path, long size, byte v) throws IOException {
    byte[] data = new byte[4096];
    for (int i = 0; i < data.length; i++) {
      data[i] = v;
    }

    FSDataOutputStream stream = fs.create(path);
    try {
      long written = 0;
      while (written < size) {
        stream.write(data, 0, data.length);
        written += data.length;
      }
    } finally {
      stream.close();
    }
  }

  /**
   * Verify that all bytes in 'data' have 'v' as value.
   */
  private static void dataVerify(byte[] data, int n, byte v) {
    for (int i = 0; i < n; ++i) {
      assertEquals(v, data[i]);
    }
  }

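  /**
   * Drain the remainder of the current file from the stream: keep reading full
   * buffers while they still contain the expected value 'v'. A short read, a
   * changed value, or an exception (e.g. once the deleted file's blocks become
   * unavailable) ends the skip, which is why the catch block is intentionally
   * left empty.
   */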
  private static void skipBuffer(FSDataInputStream in, byte v) throws IOException {
    byte[] data = new byte[8192];
    try {
      int n;
      while ((n = in.read(data)) == data.length) {
        for (int i = 0; i < data.length; ++i) {
          if (data[i] != v)
            throw new Exception("File changed");
        }
      }
    } catch (Exception e) {
    }
  }
}