1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.regionserver;
20
21 import java.io.IOException;
22 import java.util.ArrayList;
23
24 import org.apache.commons.logging.Log;
25 import org.apache.commons.logging.LogFactory;
26 import org.apache.hadoop.conf.Configuration;
27 import org.apache.hadoop.fs.FileSystem;
28 import org.apache.hadoop.fs.Path;
29 import org.apache.hadoop.hbase.HBaseTestingUtility;
30 import org.apache.hadoop.hbase.TableName;
31 import org.apache.hadoop.hbase.client.HTable;
32 import org.apache.hadoop.hbase.client.Scan;
33 import org.apache.hadoop.hbase.client.Result;
34 import org.apache.hadoop.hbase.client.ResultScanner;
35 import org.apache.hadoop.hbase.client.Durability;
36 import org.apache.hadoop.hbase.client.Put;
37 import org.apache.hadoop.hbase.client.Table;
38 import org.apache.hadoop.hbase.io.HFileLink;
39 import org.apache.hadoop.hbase.testclassification.LargeTests;
40 import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
41 import org.apache.hadoop.hbase.util.Bytes;
42 import org.apache.hadoop.hbase.util.FSUtils;
43 import org.apache.hadoop.hbase.util.FSVisitor;
44 import org.apache.hadoop.hbase.util.TestTableName;
45
46 import org.junit.After;
47 import org.junit.Before;
48 import org.junit.Rule;
49 import org.junit.Test;
50 import org.junit.experimental.categories.Category;
51
52 import static org.junit.Assert.assertEquals;
53 import static org.junit.Assert.assertTrue;
54
55 @Category(LargeTests.class)
56 public class TestCorruptedRegionStoreFile {
57 private static final Log LOG = LogFactory.getLog(TestCorruptedRegionStoreFile.class);
58
59 private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
60
61 private static final String FAMILY_NAME_STR = "f";
62 private static final byte[] FAMILY_NAME = Bytes.toBytes(FAMILY_NAME_STR);
63
64 private static final int NUM_FILES = 10;
65 private static final int ROW_PER_FILE = 2000;
66 private static final int NUM_ROWS = NUM_FILES * ROW_PER_FILE;
67
68 @Rule public TestTableName TEST_TABLE = new TestTableName();
69
70 private final ArrayList<Path> storeFiles = new ArrayList<Path>();
71 private Path tableDir;
72 private int rowCount;
73
74 private static void setupConf(Configuration conf) {
75
76 conf.setLong("hbase.hstore.compactionThreshold", NUM_FILES + 1);
77 conf.setLong("hbase.hstore.blockingStoreFiles", NUM_FILES * 2);
78 }
79
80 private void setupTable(final TableName tableName) throws IOException {
81
82 Table table = UTIL.createTable(tableName, FAMILY_NAME);
83 try {
84 rowCount = 0;
85 byte[] value = new byte[1024];
86 byte[] q = Bytes.toBytes("q");
87 while (rowCount < NUM_ROWS) {
88 Put put = new Put(Bytes.toBytes(String.format("%010d", rowCount)));
89 put.setDurability(Durability.SKIP_WAL);
90 put.add(FAMILY_NAME, q, value);
91 table.put(put);
92
93 if ((rowCount++ % ROW_PER_FILE) == 0) {
94
95 ((HTable)table).flushCommits();
96 UTIL.getHBaseAdmin().flush(tableName);
97 }
98 }
99 } finally {
100 UTIL.getHBaseAdmin().flush(tableName);
101 table.close();
102 }
103
104 assertEquals(NUM_ROWS, rowCount);
105
106
107 storeFiles.clear();
108 tableDir = FSUtils.getTableDir(getRootDir(), tableName);
109 FSVisitor.visitTableStoreFiles(getFileSystem(), tableDir, new FSVisitor.StoreFileVisitor() {
110 @Override
111 public void storeFile(final String region, final String family, final String hfile)
112 throws IOException {
113 HFileLink link = HFileLink.build(UTIL.getConfiguration(), tableName, region, family, hfile);
114 storeFiles.add(link.getOriginPath());
115 }
116 });
117 assertTrue("Expected at least " + NUM_FILES + " store files", storeFiles.size() >= NUM_FILES);
118 LOG.info("Store files: " + storeFiles);
119 }
120
121 @Before
122 public void setup() throws Exception {
123 setupConf(UTIL.getConfiguration());
124 UTIL.startMiniCluster(2, 3);
125
126 setupTable(TEST_TABLE.getTableName());
127 }
128
129 @After
130 public void tearDown() throws Exception {
131 try {
132 UTIL.shutdownMiniCluster();
133 } catch (Exception e) {
134 LOG.warn("failure shutting down cluster", e);
135 }
136 }
137
138 @Test(timeout=180000)
139 public void testLosingFileDuringScan() throws Exception {
140 assertEquals(rowCount, fullScanAndCount(TEST_TABLE.getTableName()));
141
142 final FileSystem fs = getFileSystem();
143 final Path tmpStoreFilePath = new Path(UTIL.getDataTestDir(), "corruptedHFile");
144
145
146 int count = fullScanAndCount(TEST_TABLE.getTableName(), new ScanInjector() {
147 private boolean hasFile = true;
148
149 @Override
150 public void beforeScanNext(Table table) throws Exception {
151
152 if (hasFile) {
153 fs.copyToLocalFile(true, storeFiles.get(0), tmpStoreFilePath);
154 LOG.info("Move file to local");
155 evictHFileCache(storeFiles.get(0));
156 hasFile = false;
157 }
158 }
159 });
160 assertTrue("expected one file lost: rowCount=" + count + " lostRows=" + (NUM_ROWS - count),
161 count >= (NUM_ROWS - ROW_PER_FILE));
162 }
163
164 @Test(timeout=180000)
165 public void testLosingFileAfterScannerInit() throws Exception {
166 assertEquals(rowCount, fullScanAndCount(TEST_TABLE.getTableName()));
167
168 final FileSystem fs = getFileSystem();
169 final Path tmpStoreFilePath = new Path(UTIL.getDataTestDir(), "corruptedHFile");
170
171
172 int count = fullScanAndCount(TEST_TABLE.getTableName(), new ScanInjector() {
173 private boolean hasFile = true;
174
175 @Override
176 public void beforeScan(Table table, Scan scan) throws Exception {
177
178 if (hasFile) {
179 fs.copyToLocalFile(true, storeFiles.get(0), tmpStoreFilePath);
180 LOG.info("Move file to local");
181 evictHFileCache(storeFiles.get(0));
182 hasFile = false;
183 }
184 }
185 });
186 assertTrue("expected one file lost: rowCount=" + count + " lostRows=" + (NUM_ROWS - count),
187 count >= (NUM_ROWS - ROW_PER_FILE));
188 }
189
190
191
192
193 private FileSystem getFileSystem() {
194 return UTIL.getHBaseCluster().getMaster().getMasterFileSystem().getFileSystem();
195 }
196
197 private Path getRootDir() {
198 return UTIL.getHBaseCluster().getMaster().getMasterFileSystem().getRootDir();
199 }
200
201 private void evictHFileCache(final Path hfile) throws Exception {
202 for (RegionServerThread rst: UTIL.getMiniHBaseCluster().getRegionServerThreads()) {
203 HRegionServer rs = rst.getRegionServer();
204 rs.getCacheConfig().getBlockCache().evictBlocksByHfileName(hfile.getName());
205 }
206 Thread.sleep(6000);
207 }
208
209 private int fullScanAndCount(final TableName tableName) throws Exception {
210 return fullScanAndCount(tableName, new ScanInjector());
211 }
212
213 private int fullScanAndCount(final TableName tableName, final ScanInjector injector)
214 throws Exception {
215 Table table = UTIL.getConnection().getTable(tableName);
216 int count = 0;
217 try {
218 Scan scan = new Scan();
219 scan.setCaching(1);
220 scan.setCacheBlocks(false);
221 injector.beforeScan(table, scan);
222 ResultScanner scanner = table.getScanner(scan);
223 try {
224 while (true) {
225 injector.beforeScanNext(table);
226 Result result = scanner.next();
227 injector.afterScanNext(table, result);
228 if (result == null) break;
229 if ((count++ % (ROW_PER_FILE / 2)) == 0) {
230 LOG.debug("scan next " + count);
231 }
232 }
233 } finally {
234 scanner.close();
235 injector.afterScan(table);
236 }
237 } finally {
238 table.close();
239 }
240 return count;
241 }
242
243 private class ScanInjector {
244 protected void beforeScan(Table table, Scan scan) throws Exception {}
245 protected void beforeScanNext(Table table) throws Exception {}
246 protected void afterScanNext(Table table, Result result) throws Exception {}
247 protected void afterScan(Table table) throws Exception {}
248 }
249 }