/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
 * implied.  See the License for the specific language governing
 * permissions and limitations under the License.
 */
19 package org.apache.hadoop.hbase.mapred;
20
21 import com.google.common.collect.Lists;
22 import org.apache.commons.logging.Log;
23 import org.apache.commons.logging.LogFactory;
24 import org.apache.hadoop.conf.Configuration;
25 import org.apache.hadoop.fs.Path;
26 import org.apache.hadoop.hbase.client.Result;
27 import org.apache.hadoop.hbase.client.Scan;
28 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
29 import org.apache.hadoop.hbase.testclassification.LargeTests;
30 import org.apache.hadoop.io.NullWritable;
31 import org.apache.hadoop.mapred.FileOutputFormat;
32 import org.apache.hadoop.mapred.JobClient;
33 import org.apache.hadoop.mapred.JobConf;
34 import org.apache.hadoop.mapred.OutputCollector;
35 import org.apache.hadoop.mapred.Reporter;
36 import org.apache.hadoop.mapred.RunningJob;
37 import org.junit.experimental.categories.Category;
38
39 import java.io.IOException;
40 import java.util.Iterator;
41 import java.util.List;
42
43 import static org.junit.Assert.assertTrue;
44
45 @Category({ LargeTests.class })
46 public class TestMultiTableSnapshotInputFormat
47 extends org.apache.hadoop.hbase.mapreduce.TestMultiTableSnapshotInputFormat {
48
49 private static final Log LOG = LogFactory.getLog(TestMultiTableSnapshotInputFormat.class);
50
51 @Override
52 protected void runJob(String jobName, Configuration c, List<Scan> scans)
53 throws IOException, InterruptedException, ClassNotFoundException {
54 JobConf job = new JobConf(TEST_UTIL.getConfiguration());
55
56 job.setJobName(jobName);
57 job.setMapperClass(Mapper.class);
58 job.setReducerClass(Reducer.class);
59
60 TableMapReduceUtil.initMultiTableSnapshotMapperJob(getSnapshotScanMapping(scans), Mapper.class,
61 ImmutableBytesWritable.class, ImmutableBytesWritable.class, job, true, restoreDir);
62
63 TableMapReduceUtil.addDependencyJars(job);
64
65 job.setReducerClass(Reducer.class);
66 job.setNumReduceTasks(1);
67 FileOutputFormat.setOutputPath(job, new Path(job.getJobName()));
68 LOG.info("Started " + job.getJobName());
69
70 RunningJob runningJob = JobClient.runJob(job);
71 runningJob.waitForCompletion();
72 assertTrue(runningJob.isSuccessful());
73 LOG.info("After map/reduce completion - job " + jobName);
74 }
75
76 public static class Mapper extends TestMultiTableSnapshotInputFormat.ScanMapper
77 implements TableMap<ImmutableBytesWritable, ImmutableBytesWritable> {
78
79 @Override
80 public void map(ImmutableBytesWritable key, Result value,
81 OutputCollector<ImmutableBytesWritable, ImmutableBytesWritable> outputCollector,
82 Reporter reporter) throws IOException {
83 makeAssertions(key, value);
84 outputCollector.collect(key, key);
85 }
86
87
88
89
90
91
92
93
94 @Override
95 public void close() throws IOException {
96 }
97
98 @Override
99 public void configure(JobConf jobConf) {
100
101 }
102 }
103
104 public static class Reducer extends TestMultiTableSnapshotInputFormat.ScanReducer implements
105 org.apache.hadoop.mapred.Reducer<ImmutableBytesWritable, ImmutableBytesWritable,
106 NullWritable, NullWritable> {
107
108 private JobConf jobConf;
109
110 @Override
111 public void reduce(ImmutableBytesWritable key, Iterator<ImmutableBytesWritable> values,
112 OutputCollector<NullWritable, NullWritable> outputCollector, Reporter reporter)
113 throws IOException {
114 makeAssertions(key, Lists.newArrayList(values));
115 }
116
117
118
119
120
121
122
123
124 @Override
125 public void close() throws IOException {
126 super.cleanup(this.jobConf);
127 }
128
129 @Override
130 public void configure(JobConf jobConf) {
131 this.jobConf = jobConf;
132 }
133 }
134 }