1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.mapreduce;
20
21 import com.google.common.collect.Lists;
22 import org.apache.commons.logging.Log;
23 import org.apache.commons.logging.LogFactory;
24 import org.apache.hadoop.conf.Configuration;
25 import org.apache.hadoop.fs.FileUtil;
26 import org.apache.hadoop.fs.Path;
27 import org.apache.hadoop.hbase.CategoryBasedTimeout;
28 import org.apache.hadoop.hbase.HBaseTestingUtility;
29 import org.apache.hadoop.hbase.TableName;
30 import org.apache.hadoop.hbase.client.HTable;
31 import org.apache.hadoop.hbase.client.Result;
32 import org.apache.hadoop.hbase.client.Scan;
33 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
34 import org.apache.hadoop.hbase.util.Bytes;
35 import org.apache.hadoop.io.NullWritable;
36 import org.apache.hadoop.mapreduce.Job;
37 import org.apache.hadoop.mapreduce.Reducer;
38 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
39 import org.junit.After;
40 import org.junit.AfterClass;
41 import org.junit.BeforeClass;
42 import org.junit.Rule;
43 import org.junit.Test;
44 import org.junit.rules.TestRule;
45
46 import java.io.File;
47 import java.io.IOException;
48 import java.util.ArrayList;
49 import java.util.List;
50 import java.util.Map;
51 import java.util.NavigableMap;
52
53 import static org.junit.Assert.assertEquals;
54 import static org.junit.Assert.assertTrue;
55
56
57
58
59 public abstract class MultiTableInputFormatTestBase {
/** Logger shared by this base class and its nested mapper and reducer. */
static final Log LOG = LogFactory.getLog(MultiTableInputFormatTestBase.class);

/** Per-test timeout rule built by {@link CategoryBasedTimeout} for the concrete test class. */
@Rule
public final TestRule timeout =
    CategoryBasedTimeout.builder()
        .withTimeout(this.getClass())
        .withLookingForStuckThread(true)
        .build();

/** Testing utility whose mini cluster is started in {@link #setUpBeforeClass()}. */
public static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();

/** Common prefix of the names collected in {@link #TABLES}. */
static final String TABLE_NAME = "scantest";

/** Column family that is loaded in {@link #setUpBeforeClass()} and added to every scan. */
static final byte[] INPUT_FAMILY = Bytes.toBytes("contents");

/** Configuration key under which the expected first row is handed to the reducer. */
static final String KEY_STARTROW = "startRow";

/** Configuration key under which the expected last row is handed to the reducer. */
static final String KEY_LASTROW = "stpRow";

/** Names of the tables used by the tests: {@link #TABLE_NAME} followed by 0, 1 and 2. */
static List<String> TABLES = Lists.newArrayList();

static {
  for (int suffix = 0; suffix < 3; suffix++) {
    TABLES.add(TABLE_NAME + suffix);
  }
}
76
77 @BeforeClass
78 public static void setUpBeforeClass() throws Exception {
79
80 TEST_UTIL.enableDebug(MultiTableInputFormatBase.class);
81 TEST_UTIL.setJobWithoutMRCluster();
82
83 TEST_UTIL.startMiniCluster(3);
84
85 for (String tableName : TABLES) {
86 HTable table = null;
87 try {
88 table = TEST_UTIL.createMultiRegionTable(TableName.valueOf(tableName), INPUT_FAMILY, 4);
89 TEST_UTIL.loadTable(table, INPUT_FAMILY, false);
90 } finally {
91 if (table != null) {
92 table.close();
93 }
94 }
95 }
96 }
97
98 @AfterClass
99 public static void tearDownAfterClass() throws Exception {
100 TEST_UTIL.shutdownMiniCluster();
101 }
102
103 @After
104 public void tearDown() throws Exception {
105 Configuration c = TEST_UTIL.getConfiguration();
106 FileUtil.fullyDelete(new File(c.get("hadoop.tmp.dir")));
107 }
108
109
110
111
112 public static class ScanMapper extends
113 TableMapper<ImmutableBytesWritable, ImmutableBytesWritable> {
114
115
116
117
118
119
120
121
122 @Override
123 public void map(ImmutableBytesWritable key, Result value, Context context)
124 throws IOException, InterruptedException {
125 makeAssertions(key, value);
126 context.write(key, key);
127 }
128
129 public void makeAssertions(ImmutableBytesWritable key, Result value) throws IOException {
130 if (value.size() != 1) {
131 throw new IOException("There should only be one input column");
132 }
133 Map<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> cf =
134 value.getMap();
135 if (!cf.containsKey(INPUT_FAMILY)) {
136 throw new IOException("Wrong input columns. Missing: '" +
137 Bytes.toString(INPUT_FAMILY) + "'.");
138 }
139 String val = Bytes.toStringBinary(value.getValue(INPUT_FAMILY, null));
140 LOG.debug("map: key -> " + Bytes.toStringBinary(key.get()) +
141 ", value -> " + val);
142 }
143 }
144
145
146
147
148 public static class ScanReducer
149 extends
150 Reducer<ImmutableBytesWritable, ImmutableBytesWritable,
151 NullWritable, NullWritable> {
152 private String first = null;
153 private String last = null;
154
155 @Override
156 protected void reduce(ImmutableBytesWritable key,
157 Iterable<ImmutableBytesWritable> values, Context context)
158 throws IOException, InterruptedException {
159 makeAssertions(key, values);
160 }
161
162 protected void makeAssertions(ImmutableBytesWritable key,
163 Iterable<ImmutableBytesWritable> values) {
164 int count = 0;
165 for (ImmutableBytesWritable value : values) {
166 String val = Bytes.toStringBinary(value.get());
167 LOG.debug("reduce: key[" + count + "] -> " +
168 Bytes.toStringBinary(key.get()) + ", value -> " + val);
169 if (first == null) first = val;
170 last = val;
171 count++;
172 }
173 assertEquals(3, count);
174 }
175
176 @Override
177 protected void cleanup(Context context) throws IOException,
178 InterruptedException {
179 Configuration c = context.getConfiguration();
180 cleanup(c);
181 }
182
183 protected void cleanup(Configuration c) {
184 String startRow = c.get(KEY_STARTROW);
185 String lastRow = c.get(KEY_LASTROW);
186 LOG.info("cleanup: first -> \"" + first + "\", start row -> \"" +
187 startRow + "\"");
188 LOG.info("cleanup: last -> \"" + last + "\", last row -> \"" + lastRow +
189 "\"");
190 if (startRow != null && startRow.length() > 0) {
191 assertEquals(startRow, first);
192 }
193 if (lastRow != null && lastRow.length() > 0) {
194 assertEquals(lastRow, last);
195 }
196 }
197 }
198
199 @Test
200 public void testScanEmptyToEmpty() throws IOException, InterruptedException,
201 ClassNotFoundException {
202 testScan(null, null, null);
203 }
204
205 @Test
206 public void testScanEmptyToAPP() throws IOException, InterruptedException,
207 ClassNotFoundException {
208 testScan(null, "app", "apo");
209 }
210
211 @Test
212 public void testScanOBBToOPP() throws IOException, InterruptedException,
213 ClassNotFoundException {
214 testScan("obb", "opp", "opo");
215 }
216
217 @Test
218 public void testScanYZYToEmpty() throws IOException, InterruptedException,
219 ClassNotFoundException {
220 testScan("yzy", null, "zzz");
221 }
222
223
224
225
226
227
228
229
230 private void testScan(String start, String stop, String last)
231 throws IOException, InterruptedException, ClassNotFoundException {
232 String jobName =
233 "Scan" + (start != null ? start.toUpperCase() : "Empty") + "To" +
234 (stop != null ? stop.toUpperCase() : "Empty");
235 LOG.info("Before map/reduce startup - job " + jobName);
236 Configuration c = new Configuration(TEST_UTIL.getConfiguration());
237
238 c.set(KEY_STARTROW, start != null ? start : "");
239 c.set(KEY_LASTROW, last != null ? last : "");
240
241 List<Scan> scans = new ArrayList<Scan>();
242
243 for (String tableName : TABLES) {
244 Scan scan = new Scan();
245
246 scan.addFamily(INPUT_FAMILY);
247 scan.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, Bytes.toBytes(tableName));
248
249 if (start != null) {
250 scan.setStartRow(Bytes.toBytes(start));
251 }
252 if (stop != null) {
253 scan.setStopRow(Bytes.toBytes(stop));
254 }
255
256 scans.add(scan);
257
258 LOG.info("scan before: " + scan);
259 }
260
261 runJob(jobName, c, scans);
262 }
263
264 protected void runJob(String jobName, Configuration c, List<Scan> scans)
265 throws IOException, InterruptedException, ClassNotFoundException {
266 Job job = new Job(c, jobName);
267
268 initJob(scans, job);
269 job.setReducerClass(ScanReducer.class);
270 job.setNumReduceTasks(1);
271 FileOutputFormat.setOutputPath(job,
272 new Path(TEST_UTIL.getDataTestDirOnTestFS(), job.getJobName()));
273 LOG.info("Started " + job.getJobName());
274 job.waitForCompletion(true);
275 assertTrue(job.isSuccessful());
276 LOG.info("After map/reduce completion - job " + jobName);
277 }
278
279 protected abstract void initJob(List<Scan> scans, Job job) throws IOException;
280
281
282 }