/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.regionserver;

import java.io.IOException;
import java.util.ArrayList;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.HFileLink;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.FSVisitor;
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
import org.apache.hadoop.hbase.util.TestTableName;

import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

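/**
 * Verifies that scans over a region keep working, losing at most the rows
 * from the affected file, when one of the region's store files is removed
 * from the filesystem while the table is being read.
 */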
@Category(LargeTests.class)
public class TestCorruptedRegionStoreFile {
  private static final Log LOG = LogFactory.getLog(TestCorruptedRegionStoreFile.class);

  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();

  private static final String FAMILY_NAME_STR = "f";
  private static final byte[] FAMILY_NAME = Bytes.toBytes(FAMILY_NAME_STR);

  private static final int NUM_FILES = 10;
  private static final int ROW_PER_FILE = 2000;
  private static final int NUM_ROWS = NUM_FILES * ROW_PER_FILE;

  @Rule public TestTableName TEST_TABLE = new TestTableName();

  private final ArrayList<Path> storeFiles = new ArrayList<Path>();
  private Path tableDir;
  private int rowCount;

  private static void setupConf(Configuration conf) {
    // Disable compaction so the store file count stays constant
    conf.setLong("hbase.hstore.compactionThreshold", NUM_FILES + 1);
    conf.setLong("hbase.hstore.blockingStoreFiles", NUM_FILES * 2);
  }

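  /**
   * Loads NUM_ROWS rows into the table, flushing every ROW_PER_FILE puts so
   * the region ends up with multiple store files, then collects the store
   * file paths into {@link #storeFiles}.
   */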
  private void setupTable(final TableName tableName) throws IOException {
    // load the table
    Table table = UTIL.createTable(tableName, FAMILY_NAME);
    try {
      rowCount = 0;
      byte[] value = new byte[1024];
      byte[] q = Bytes.toBytes("q");
      while (rowCount < NUM_ROWS) {
        Put put = new Put(Bytes.toBytes(String.format("%010d", rowCount)));
        put.setDurability(Durability.SKIP_WAL);
        put.add(FAMILY_NAME, q, value);
        table.put(put);

        if ((rowCount++ % ROW_PER_FILE) == 0) {
          // flush it
          ((HTable)table).flushCommits();
          UTIL.getHBaseAdmin().flush(tableName);
        }
      }
    } finally {
      UTIL.getHBaseAdmin().flush(tableName);
      table.close();
    }

    assertEquals(NUM_ROWS, rowCount);

    // get the store file paths
    storeFiles.clear();
    tableDir = FSUtils.getTableDir(getRootDir(), tableName);
    FSVisitor.visitTableStoreFiles(getFileSystem(), tableDir, new FSVisitor.StoreFileVisitor() {
      @Override
      public void storeFile(final String region, final String family, final String hfile)
          throws IOException {
        HFileLink link = HFileLink.build(UTIL.getConfiguration(), tableName, region, family, hfile);
        storeFiles.add(link.getOriginPath());
      }
    });
    assertTrue("Expected at least " + NUM_FILES + " store files", storeFiles.size() >= NUM_FILES);
    LOG.info("Store files: " + storeFiles);
  }

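  /** Starts a multi-node mini cluster and loads the test table. */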
  @Before
  public void setup() throws Exception {
    setupConf(UTIL.getConfiguration());
    UTIL.startMiniCluster(2, 3);

    setupTable(TEST_TABLE.getTableName());
  }

  @After
  public void tearDown() throws Exception {
    try {
      UTIL.shutdownMiniCluster();
    } catch (Exception e) {
      LOG.warn("failure shutting down cluster", e);
    }
  }

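  /**
   * Removes one store file while the scan is in progress (before the next
   * scanner.next() call) and verifies that at most the rows stored in that
   * file are lost.
   */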
  @Test(timeout=180000)
  public void testLosingFileDuringScan() throws Exception {
    assertEquals(rowCount, fullScanAndCount(TEST_TABLE.getTableName()));

    final FileSystem fs = getFileSystem();
    final Path tmpStoreFilePath = new Path(UTIL.getDataTestDir(), "corruptedHFile");

    // try to query with the missing file
    int count = fullScanAndCount(TEST_TABLE.getTableName(), new ScanInjector() {
      private boolean hasFile = true;

      @Override
      public void beforeScanNext(Table table) throws Exception {
        // move the path away (now the region is corrupted)
        if (hasFile) {
          fs.copyToLocalFile(true, storeFiles.get(0), tmpStoreFilePath);
          LOG.info("Move file to local");
          evictHFileCache(storeFiles.get(0));
          hasFile = false;
        }
      }
    });
    assertTrue("expected one file lost: rowCount=" + count + " lostRows=" + (NUM_ROWS - count),
               count >= (NUM_ROWS - ROW_PER_FILE));
  }

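  /**
   * Same check as above, but the store file is removed before the scanner
   * is opened.
   */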
  @Test(timeout=180000)
  public void testLosingFileAfterScannerInit() throws Exception {
    assertEquals(rowCount, fullScanAndCount(TEST_TABLE.getTableName()));

    final FileSystem fs = getFileSystem();
    final Path tmpStoreFilePath = new Path(UTIL.getDataTestDir(), "corruptedHFile");

    // try to query with the missing file
    int count = fullScanAndCount(TEST_TABLE.getTableName(), new ScanInjector() {
      private boolean hasFile = true;

      @Override
      public void beforeScan(Table table, Scan scan) throws Exception {
        // move the path away (now the region is corrupted)
        if (hasFile) {
          fs.copyToLocalFile(true, storeFiles.get(0), tmpStoreFilePath);
          LOG.info("Move file to local");
          evictHFileCache(storeFiles.get(0));
          hasFile = false;
        }
      }
    });
    assertTrue("expected one file lost: rowCount=" + count + " lostRows=" + (NUM_ROWS - count),
               count >= (NUM_ROWS - ROW_PER_FILE));
  }

  // ==========================================================================
  //  Helpers
  // ==========================================================================
  private FileSystem getFileSystem() {
    return UTIL.getHBaseCluster().getMaster().getMasterFileSystem().getFileSystem();
  }

  private Path getRootDir() {
    return UTIL.getHBaseCluster().getMaster().getMasterFileSystem().getRootDir();
  }

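  /**
   * Evicts the cached blocks of the given hfile on every region server, so
   * subsequent reads must go to disk, then pauses to give the evictions
   * time to take effect.
   */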
  private void evictHFileCache(final Path hfile) throws Exception {
    for (RegionServerThread rst: UTIL.getMiniHBaseCluster().getRegionServerThreads()) {
      HRegionServer rs = rst.getRegionServer();
      rs.getCacheConfig().getBlockCache().evictBlocksByHfileName(hfile.getName());
    }
    Thread.sleep(6000);
  }

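  /**
   * Scans the whole table one row at a time, invoking the injector hooks
   * around each step, and returns the number of rows read.
   */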
  private int fullScanAndCount(final TableName tableName) throws Exception {
    return fullScanAndCount(tableName, new ScanInjector());
  }

  private int fullScanAndCount(final TableName tableName, final ScanInjector injector)
      throws Exception {
    Table table = UTIL.getConnection().getTable(tableName);
    int count = 0;
    try {
      Scan scan = new Scan();
      scan.setCaching(1);
      scan.setCacheBlocks(false);
      injector.beforeScan(table, scan);
      ResultScanner scanner = table.getScanner(scan);
      try {
        while (true) {
          injector.beforeScanNext(table);
          Result result = scanner.next();
          injector.afterScanNext(table, result);
          if (result == null) break;
          if ((count++ % (ROW_PER_FILE / 2)) == 0) {
            LOG.debug("scan next " + count);
          }
        }
      } finally {
        scanner.close();
        injector.afterScan(table);
      }
    } finally {
      table.close();
    }
    return count;
  }

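  /**
   * No-op hook points around a scan; tests override individual methods to
   * remove store files at specific moments of the scan lifecycle.
   */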
  private class ScanInjector {
    protected void beforeScan(Table table, Scan scan) throws Exception {}
    protected void beforeScanNext(Table table) throws Exception {}
    protected void afterScanNext(Table table, Result result) throws Exception {}
    protected void afterScan(Table table) throws Exception {}
  }
}