/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
 * implied.  See the License for the specific language governing
 * permissions and limitations under the License.
 */

package org.apache.hadoop.hbase.mapreduce;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.anyObject;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;

import java.io.IOException;
import java.util.Arrays;
import java.util.Map;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.RegexStringComparator;
import org.apache.hadoop.hbase.filter.RowFilter;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobConfigurable;
import org.apache.hadoop.mapred.MiniMRCluster;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
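
/**
 * Tests the org.apache.hadoop.hbase.mapreduce TableRecordReaderImpl and
 * subclasses of TableInputFormatBase: basic record reading, recovery from
 * scanner failures, and driving jobs with custom input formats.
 */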
@Category(LargeTests.class)
public class TestTableInputFormat {

  private static final Log LOG = LogFactory.getLog(TestTableInputFormat.class);

  private final static HBaseTestingUtility UTIL = new HBaseTestingUtility();
  static final byte[] FAMILY = Bytes.toBytes("family");

  private static final byte[][] columns = new byte[][] { FAMILY };

  @BeforeClass
  public static void beforeClass() throws Exception {
    UTIL.setJobWithoutMRCluster();
    UTIL.startMiniCluster();
  }

  @AfterClass
  public static void afterClass() throws Exception {
    UTIL.shutdownMiniCluster();
  }

  @Before
  public void before() throws IOException {
    LOG.info("before");
    UTIL.ensureSomeRegionServersAvailable(1);
    LOG.info("before done");
  }
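
  /**
   * Setup a table with two rows and values.
   */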
  public static Table createTable(byte[] tableName) throws IOException {
    return createTable(tableName, new byte[][] { FAMILY });
  }
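
  /**
   * Setup a table with two rows ("aaa" and "bbb") and one value in each of the
   * given column families.
   */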
  public static Table createTable(byte[] tableName, byte[][] families) throws IOException {
    Table table = UTIL.createTable(TableName.valueOf(tableName), families);
    Put p = new Put("aaa".getBytes());
    for (byte[] family : families) {
      p.add(family, null, "value aaa".getBytes());
    }
    table.put(p);
    p = new Put("bbb".getBytes());
    for (byte[] family : families) {
      p.add(family, null, "value bbb".getBytes());
    }
    table.put(p);
    return table;
  }
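
  /**
   * Verify that the key and result contain the expected row key and value.
   */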
  static boolean checkResult(Result r, ImmutableBytesWritable key,
      byte[] expectedKey, byte[] expectedValue) {
    assertEquals(0, key.compareTo(expectedKey));
    Map<byte[], byte[]> vals = r.getFamilyMap(FAMILY);
    byte[] value = vals.values().iterator().next();
    assertTrue(Arrays.equals(value, expectedValue));
    return true;
  }
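
  /**
   * Scan the given table with the mapreduce TableRecordReaderImpl and verify
   * that both rows come back in order and that the reader is then exhausted.
   */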
  static void runTestMapreduce(Table table) throws IOException,
      InterruptedException {
    org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl trr =
        new org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl();
    Scan s = new Scan();
    s.setStartRow("aaa".getBytes());
    s.setStopRow("zzz".getBytes());
    s.addFamily(FAMILY);
    trr.setScan(s);
    trr.setHTable(table);

    trr.initialize(null, null);
    Result r = new Result();
    ImmutableBytesWritable key = new ImmutableBytesWritable();

    boolean more = trr.nextKeyValue();
    assertTrue(more);
    key = trr.getCurrentKey();
    r = trr.getCurrentValue();
    checkResult(r, key, "aaa".getBytes(), "value aaa".getBytes());

    more = trr.nextKeyValue();
    assertTrue(more);
    key = trr.getCurrentKey();
    r = trr.getCurrentValue();
    checkResult(r, key, "bbb".getBytes(), "value bbb".getBytes());

    // no more data
    more = trr.nextKeyValue();
    assertFalse(more);
  }
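
  /**
   * Create a table spy whose scanner throws an IOException from next() for the
   * first failCnt calls to getScanner(Scan); later calls return the real
   * scanner.
   */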
  static Table createIOEScannerTable(byte[] name, final int failCnt)
      throws IOException {
    // hand back failing mock scanners for the first failCnt calls, then the real one
    Answer<ResultScanner> a = new Answer<ResultScanner>() {
      int cnt = 0;

      @Override
      public ResultScanner answer(InvocationOnMock invocation) throws Throwable {
        // first invocations return the busted mock scanner
        if (cnt++ < failCnt) {
          // create a mock ResultScanner that always fails
          Scan scan = mock(Scan.class);
          doReturn("bogus".getBytes()).when(scan).getStartRow();
          ResultScanner scanner = mock(ResultScanner.class);
          // simulate an IOException on the scanner's next() call
          doThrow(new IOException("Injected exception")).when(scanner).next();
          return scanner;
        }

        // otherwise return the real scanner
        return (ResultScanner) invocation.callRealMethod();
      }
    };

    Table htable = spy(createTable(name));
    doAnswer(a).when(htable).getScanner((Scan) anyObject());
    return htable;
  }
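
  /**
   * Create a table spy whose scanner throws a NotServingRegionException (a
   * DoNotRetryIOException) from next() for the first failCnt calls to
   * getScanner(Scan); later calls return the real scanner.
   */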
  static Table createDNRIOEScannerTable(byte[] name, final int failCnt)
      throws IOException {
    // hand back do-not-retry failing mock scanners for the first failCnt calls
    Answer<ResultScanner> a = new Answer<ResultScanner>() {
      int cnt = 0;

      @Override
      public ResultScanner answer(InvocationOnMock invocation) throws Throwable {
        // first invocations return the busted mock scanner
        if (cnt++ < failCnt) {
          // create a mock ResultScanner that always fails
          Scan scan = mock(Scan.class);
          doReturn("bogus".getBytes()).when(scan).getStartRow();
          ResultScanner scanner = mock(ResultScanner.class);

          invocation.callRealMethod();
          doThrow(
              new NotServingRegionException("Injected simulated TimeoutException"))
              .when(scanner).next();
          return scanner;
        }

        // otherwise return the real scanner
        return (ResultScanner) invocation.callRealMethod();
      }
    };

    Table htable = spy(createTable(name));
    doAnswer(a).when(htable).getScanner((Scan) anyObject());
    return htable;
  }
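
  /**
   * Run test assuming no errors, using the mapreduce API.
   */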
  @Test
  public void testTableRecordReaderMapreduce() throws IOException,
      InterruptedException {
    Table table = createTable("table1-mr".getBytes());
    runTestMapreduce(table);
  }
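
  /**
   * Run test assuming a single scanner IOException; the record reader is
   * expected to restart the scanner and recover.
   */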
  @Test
  public void testTableRecordReaderScannerFailMapreduce() throws IOException,
      InterruptedException {
    Table htable = createIOEScannerTable("table2-mr".getBytes(), 1);
    runTestMapreduce(htable);
  }
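
  /**
   * Run test assuming the scanner fails twice in a row; the restarted scanner
   * also fails, so the IOException is expected to propagate.
   */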
  @Test(expected = IOException.class)
  public void testTableRecordReaderScannerFailMapreduceTwice() throws IOException,
      InterruptedException {
    Table htable = createIOEScannerTable("table3-mr".getBytes(), 2);
    runTestMapreduce(htable);
  }
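
  /**
   * Run test assuming a single NotServingRegionException from the scanner,
   * which the record reader is expected to recover from.
   */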
  @Test
  public void testTableRecordReaderScannerTimeoutMapreduce()
      throws IOException, InterruptedException {
    Table htable = createDNRIOEScannerTable("table4-mr".getBytes(), 1);
    runTestMapreduce(htable);
  }
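
  /**
   * Run test assuming NotServingRegionException is thrown twice in a row; the
   * second failure is expected to propagate.
   */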
  @Test(expected = org.apache.hadoop.hbase.NotServingRegionException.class)
  public void testTableRecordReaderScannerTimeoutMapreduceTwice()
      throws IOException, InterruptedException {
    Table htable = createDNRIOEScannerTable("table5-mr".getBytes(), 2);
    runTestMapreduce(htable);
  }
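
  /**
   * Verify that a subclass of TableInputFormatBase that overrides
   * initialize(JobContext) can set up its own table and scan and drive a job.
   */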
  @Test
  public void testExtensionOfTableInputFormatBase()
      throws IOException, InterruptedException, ClassNotFoundException {
    LOG.info("testing use of an InputFormat that extends InputFormatBase");
    final Table htable = createTable(Bytes.toBytes("exampleTable"),
        new byte[][] { Bytes.toBytes("columnA"), Bytes.toBytes("columnB") });
    testInputFormat(ExampleTIF.class);
  }

  @Test
  public void testJobConfigurableExtensionOfTableInputFormatBase()
      throws IOException, InterruptedException, ClassNotFoundException {
    LOG.info("testing use of an InputFormat that extends InputFormatBase, " +
        "using JobConfigurable.");
    final Table htable = createTable(Bytes.toBytes("exampleJobConfigurableTable"),
        new byte[][] { Bytes.toBytes("columnA"), Bytes.toBytes("columnB") });
    testInputFormat(ExampleJobConfigurableTIF.class);
  }

  @Test
  public void testDeprecatedExtensionOfTableInputFormatBase()
      throws IOException, InterruptedException, ClassNotFoundException {
    LOG.info("testing use of an InputFormat that extends InputFormatBase, " +
        "using the approach documented in 0.98.");
    final Table htable = createTable(Bytes.toBytes("exampleDeprecatedTable"),
        new byte[][] { Bytes.toBytes("columnA"), Bytes.toBytes("columnB") });
    testInputFormat(ExampleDeprecatedTIF.class);
  }
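
  /**
   * Submit a job with the given InputFormat and ExampleVerifier as the mapper,
   * then check the counters: only the filtered-for row "aaa" and its values
   * should be seen, once per column family.
   */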
  void testInputFormat(Class<? extends InputFormat> clazz)
      throws IOException, InterruptedException, ClassNotFoundException {
    final Job job = MapreduceTestingShim.createJob(UTIL.getConfiguration());
    job.setInputFormatClass(clazz);
    job.setOutputFormatClass(NullOutputFormat.class);
    job.setMapperClass(ExampleVerifier.class);
    job.setNumReduceTasks(0);

    LOG.debug("submitting job.");
    assertTrue("job failed!", job.waitForCompletion(true));
    assertEquals("Saw the wrong number of instances of the filtered-for row.", 2, job.getCounters()
        .findCounter(TestTableInputFormat.class.getName() + ":row", "aaa").getValue());
    assertEquals("Saw instances of the filtered-out row.", 0, job.getCounters()
        .findCounter(TestTableInputFormat.class.getName() + ":row", "bbb").getValue());
    assertEquals("Saw the wrong number of instances of columnA.", 1, job.getCounters()
        .findCounter(TestTableInputFormat.class.getName() + ":family", "columnA").getValue());
    assertEquals("Saw the wrong number of instances of columnB.", 1, job.getCounters()
        .findCounter(TestTableInputFormat.class.getName() + ":family", "columnB").getValue());
    assertEquals("Saw the wrong count of values for the filtered-for row.", 2, job.getCounters()
        .findCounter(TestTableInputFormat.class.getName() + ":value", "value aaa").getValue());
    assertEquals("Saw the wrong count of values for the filtered-out row.", 0, job.getCounters()
        .findCounter(TestTableInputFormat.class.getName() + ":value", "value bbb").getValue());
  }
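
  /**
   * Mapper that counts the rows, column families, and values it sees, so the
   * driving test can verify what the input format and filter produced.
   */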
  public static class ExampleVerifier extends TableMapper<NullWritable, NullWritable> {

    @Override
    public void map(ImmutableBytesWritable key, Result value, Context context)
        throws IOException {
      for (Cell cell : value.listCells()) {
        context.getCounter(TestTableInputFormat.class.getName() + ":row",
            Bytes.toString(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()))
            .increment(1L);
        context.getCounter(TestTableInputFormat.class.getName() + ":family",
            Bytes.toString(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength()))
            .increment(1L);
        context.getCounter(TestTableInputFormat.class.getName() + ":value",
            Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()))
            .increment(1L);
      }
    }

  }
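
  /**
   * TableInputFormatBase subclass configured through the deprecated
   * JobConfigurable/HTable approach documented for 0.98.
   */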
  public static class ExampleDeprecatedTIF extends TableInputFormatBase implements JobConfigurable {

    @Override
    public void configure(JobConf job) {
      try {
        HTable exampleTable = new HTable(HBaseConfiguration.create(job),
            Bytes.toBytes("exampleDeprecatedTable"));

        setHTable(exampleTable);
        byte[][] inputColumns = new byte[][] { Bytes.toBytes("columnA"),
            Bytes.toBytes("columnB") };

        Scan scan = new Scan();
        for (byte[] family : inputColumns) {
          scan.addFamily(family);
        }
        Filter exampleFilter = new RowFilter(CompareOp.EQUAL, new RegexStringComparator("aa.*"));
        scan.setFilter(exampleFilter);
        setScan(scan);
      } catch (IOException exception) {
        throw new RuntimeException("Failed to configure for job.", exception);
      }
    }

  }
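
  /**
   * TableInputFormatBase subclass that sets up its table and scan in
   * JobConfigurable.configure(JobConf), using the Connection API.
   */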
  public static class ExampleJobConfigurableTIF extends TableInputFormatBase
      implements JobConfigurable {

    @Override
    public void configure(JobConf job) {
      try {
        Connection connection = ConnectionFactory.createConnection(HBaseConfiguration.create(job));
        TableName tableName = TableName.valueOf("exampleJobConfigurableTable");

        initializeTable(connection, tableName);
        byte[][] inputColumns = new byte[][] { Bytes.toBytes("columnA"),
            Bytes.toBytes("columnB") };

        Scan scan = new Scan();
        for (byte[] family : inputColumns) {
          scan.addFamily(family);
        }
        Filter exampleFilter = new RowFilter(CompareOp.EQUAL, new RegexStringComparator("aa.*"));
        scan.setFilter(exampleFilter);
        setScan(scan);
      } catch (IOException exception) {
        throw new RuntimeException("Failed to initialize.", exception);
      }
    }
  }
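
  /**
   * TableInputFormatBase subclass that sets up its table and scan in
   * initialize(JobContext).
   */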
  public static class ExampleTIF extends TableInputFormatBase {

    @Override
    protected void initialize(JobContext job) throws IOException {
      Connection connection = ConnectionFactory.createConnection(HBaseConfiguration.create(
          job.getConfiguration()));
      TableName tableName = TableName.valueOf("exampleTable");

      initializeTable(connection, tableName);
      byte[][] inputColumns = new byte[][] { Bytes.toBytes("columnA"),
          Bytes.toBytes("columnB") };

      Scan scan = new Scan();
      for (byte[] family : inputColumns) {
        scan.addFamily(family);
      }
      Filter exampleFilter = new RowFilter(CompareOp.EQUAL, new RegexStringComparator("aa.*"));
      scan.setFilter(exampleFilter);
      setScan(scan);
    }

  }
}