1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.regionserver;
20
21 import java.io.IOException;
22 import java.util.ArrayList;
23 import java.util.List;
24
25 import org.apache.commons.logging.Log;
26 import org.apache.commons.logging.LogFactory;
27 import org.apache.hadoop.conf.Configuration;
28 import org.apache.hadoop.fs.FileSystem;
29 import org.apache.hadoop.fs.Path;
30 import org.apache.hadoop.hbase.HBaseTestingUtility;
31 import org.apache.hadoop.hbase.TableName;
32 import org.apache.hadoop.hbase.client.Scan;
33 import org.apache.hadoop.hbase.client.Result;
34 import org.apache.hadoop.hbase.client.ResultScanner;
35 import org.apache.hadoop.hbase.client.Durability;
36 import org.apache.hadoop.hbase.client.Put;
37 import org.apache.hadoop.hbase.client.Table;
38 import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
39 import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
40 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
41 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
42 import org.apache.hadoop.hbase.testclassification.LargeTests;
43 import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
44 import org.apache.hadoop.hbase.util.Bytes;
45 import org.apache.hadoop.hbase.util.FSUtils;
46 import org.apache.hadoop.hbase.util.FSVisitor;
47 import org.apache.hadoop.hbase.util.TestTableName;
48
49 import org.junit.AfterClass;
50 import org.junit.BeforeClass;
51 import org.junit.Rule;
52 import org.junit.Test;
53 import org.junit.experimental.categories.Category;
54
55 import static org.junit.Assert.assertEquals;
56 import static org.junit.Assert.assertFalse;
57 import static org.junit.Assert.assertTrue;
58 import static org.junit.Assert.fail;
59
60 @Category(LargeTests.class)
61 public class TestScannerRetriableFailure {
62 private static final Log LOG = LogFactory.getLog(TestScannerRetriableFailure.class);
63
64 private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
65
66 private static final String FAMILY_NAME_STR = "f";
67 private static final byte[] FAMILY_NAME = Bytes.toBytes(FAMILY_NAME_STR);
68
69 @Rule public TestTableName TEST_TABLE = new TestTableName();
70
71 public static class FaultyScannerObserver extends BaseRegionObserver {
72 private int faults = 0;
73
74 @Override
75 public boolean preScannerNext(final ObserverContext<RegionCoprocessorEnvironment> e,
76 final InternalScanner s, final List<Result> results,
77 final int limit, final boolean hasMore) throws IOException {
78 final TableName tableName = e.getEnvironment().getRegionInfo().getTable();
79 if (!tableName.isSystemTable() && (faults++ % 2) == 0) {
80 LOG.debug(" Injecting fault in table=" + tableName + " scanner");
81 throw new IOException("injected fault");
82 }
83 return hasMore;
84 }
85 }
86
87 private static void setupConf(Configuration conf) {
88 conf.setLong("hbase.hstore.compaction.min", 20);
89 conf.setLong("hbase.hstore.compaction.max", 39);
90 conf.setLong("hbase.hstore.blockingStoreFiles", 40);
91
92 conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, FaultyScannerObserver.class.getName());
93 }
94
95 @BeforeClass
96 public static void setup() throws Exception {
97 setupConf(UTIL.getConfiguration());
98 UTIL.startMiniCluster(1);
99 }
100
101 @AfterClass
102 public static void tearDown() throws Exception {
103 try {
104 UTIL.shutdownMiniCluster();
105 } catch (Exception e) {
106 LOG.warn("failure shutting down cluster", e);
107 }
108 }
109
110 @Test(timeout=180000)
111 public void testFaultyScanner() throws Exception {
112 TableName tableName = TEST_TABLE.getTableName();
113 Table table = UTIL.createTable(tableName, FAMILY_NAME);
114 try {
115 final int NUM_ROWS = 100;
116 loadTable(table, NUM_ROWS);
117 checkTableRows(table, NUM_ROWS);
118 } finally {
119 table.close();
120 }
121 }
122
123
124
125
126 private FileSystem getFileSystem() {
127 return UTIL.getHBaseCluster().getMaster().getMasterFileSystem().getFileSystem();
128 }
129
130 private Path getRootDir() {
131 return UTIL.getHBaseCluster().getMaster().getMasterFileSystem().getRootDir();
132 }
133
134 public void loadTable(final Table table, int numRows) throws IOException {
135 List<Put> puts = new ArrayList<Put>(numRows);
136 for (int i = 0; i < numRows; ++i) {
137 byte[] row = Bytes.toBytes(String.format("%09d", i));
138 Put put = new Put(row);
139 put.setDurability(Durability.SKIP_WAL);
140 put.add(FAMILY_NAME, null, row);
141 table.put(put);
142 }
143 }
144
145 private void checkTableRows(final Table table, int numRows) throws Exception {
146 Scan scan = new Scan();
147 scan.setCaching(1);
148 scan.setCacheBlocks(false);
149 ResultScanner scanner = table.getScanner(scan);
150 try {
151 int count = 0;
152 for (int i = 0; i < numRows; ++i) {
153 byte[] row = Bytes.toBytes(String.format("%09d", i));
154 Result result = scanner.next();
155 assertTrue(result != null);
156 assertTrue(Bytes.equals(row, result.getRow()));
157 count++;
158 }
159
160 while (true) {
161 Result result = scanner.next();
162 if (result == null) break;
163 count++;
164 }
165 assertEquals(numRows, count);
166 } finally {
167 scanner.close();
168 }
169 }
170 }