1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.mapreduce;
20
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.NavigableMap;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
48
49
50
51
52
53
54
55
56
57
58
59
60
61 public abstract class TestTableInputFormatScanBase {
62
63 private static final Log LOG = LogFactory.getLog(TestTableInputFormatScanBase.class);
64 static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
65
66 static final byte[] TABLE_NAME = Bytes.toBytes("scantest");
67 static final byte[] INPUT_FAMILY = Bytes.toBytes("contents");
68 static final String KEY_STARTROW = "startRow";
69 static final String KEY_LASTROW = "stpRow";
70
71 private static HTable table = null;
72
73 @BeforeClass
74 public static void setUpBeforeClass() throws Exception {
75
76
77 System.setProperty("hbase.tests.use.shortcircuit.reads", "false");
78
79
80 TEST_UTIL.enableDebug(TableInputFormat.class);
81 TEST_UTIL.enableDebug(TableInputFormatBase.class);
82 TEST_UTIL.setJobWithoutMRCluster();
83
84 TEST_UTIL.startMiniCluster(3);
85
86 table = TEST_UTIL.createMultiRegionTable(TableName.valueOf(TABLE_NAME), INPUT_FAMILY);
87 TEST_UTIL.loadTable(table, INPUT_FAMILY, false);
88 }
89
90 @AfterClass
91 public static void tearDownAfterClass() throws Exception {
92 TEST_UTIL.shutdownMiniCluster();
93 }
94
95
96
97
98 public static class ScanMapper
99 extends TableMapper<ImmutableBytesWritable, ImmutableBytesWritable> {
100
101
102
103
104
105
106
107
108
109 @Override
110 public void map(ImmutableBytesWritable key, Result value,
111 Context context)
112 throws IOException, InterruptedException {
113 if (value.size() != 1) {
114 throw new IOException("There should only be one input column");
115 }
116 Map<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>>
117 cf = value.getMap();
118 if(!cf.containsKey(INPUT_FAMILY)) {
119 throw new IOException("Wrong input columns. Missing: '" +
120 Bytes.toString(INPUT_FAMILY) + "'.");
121 }
122 String val = Bytes.toStringBinary(value.getValue(INPUT_FAMILY, null));
123 LOG.info("map: key -> " + Bytes.toStringBinary(key.get()) +
124 ", value -> " + val);
125 context.write(key, key);
126 }
127
128 }
129
130
131
132
133 public static class ScanReducer
134 extends Reducer<ImmutableBytesWritable, ImmutableBytesWritable,
135 NullWritable, NullWritable> {
136
137 private String first = null;
138 private String last = null;
139
140 protected void reduce(ImmutableBytesWritable key,
141 Iterable<ImmutableBytesWritable> values, Context context)
142 throws IOException ,InterruptedException {
143 int count = 0;
144 for (ImmutableBytesWritable value : values) {
145 String val = Bytes.toStringBinary(value.get());
146 LOG.info("reduce: key[" + count + "] -> " +
147 Bytes.toStringBinary(key.get()) + ", value -> " + val);
148 if (first == null) first = val;
149 last = val;
150 count++;
151 }
152 }
153
154 protected void cleanup(Context context)
155 throws IOException, InterruptedException {
156 Configuration c = context.getConfiguration();
157 String startRow = c.get(KEY_STARTROW);
158 String lastRow = c.get(KEY_LASTROW);
159 LOG.info("cleanup: first -> \"" + first + "\", start row -> \"" + startRow + "\"");
160 LOG.info("cleanup: last -> \"" + last + "\", last row -> \"" + lastRow + "\"");
161 if (startRow != null && startRow.length() > 0) {
162 assertEquals(startRow, first);
163 }
164 if (lastRow != null && lastRow.length() > 0) {
165 assertEquals(lastRow, last);
166 }
167 }
168
169 }
170
171
172
173
174
175
176
177
178 protected void testScanFromConfiguration(String start, String stop, String last)
179 throws IOException, InterruptedException, ClassNotFoundException {
180 String jobName = "ScanFromConfig" + (start != null ? start.toUpperCase() : "Empty") +
181 "To" + (stop != null ? stop.toUpperCase() : "Empty");
182 Configuration c = new Configuration(TEST_UTIL.getConfiguration());
183 c.set(TableInputFormat.INPUT_TABLE, Bytes.toString(TABLE_NAME));
184 c.set(TableInputFormat.SCAN_COLUMN_FAMILY, Bytes.toString(INPUT_FAMILY));
185 c.set(KEY_STARTROW, start != null ? start : "");
186 c.set(KEY_LASTROW, last != null ? last : "");
187
188 if (start != null) {
189 c.set(TableInputFormat.SCAN_ROW_START, start);
190 }
191
192 if (stop != null) {
193 c.set(TableInputFormat.SCAN_ROW_STOP, stop);
194 }
195
196 Job job = new Job(c, jobName);
197 job.setMapperClass(ScanMapper.class);
198 job.setReducerClass(ScanReducer.class);
199 job.setMapOutputKeyClass(ImmutableBytesWritable.class);
200 job.setMapOutputValueClass(ImmutableBytesWritable.class);
201 job.setInputFormatClass(TableInputFormat.class);
202 job.setNumReduceTasks(1);
203 FileOutputFormat.setOutputPath(job, new Path(job.getJobName()));
204 TableMapReduceUtil.addDependencyJars(job);
205 assertTrue(job.waitForCompletion(true));
206 }
207
208
209
210
211
212
213
214
215 protected void testScan(String start, String stop, String last)
216 throws IOException, InterruptedException, ClassNotFoundException {
217 String jobName = "Scan" + (start != null ? start.toUpperCase() : "Empty") +
218 "To" + (stop != null ? stop.toUpperCase() : "Empty");
219 LOG.info("Before map/reduce startup - job " + jobName);
220 Configuration c = new Configuration(TEST_UTIL.getConfiguration());
221 Scan scan = new Scan();
222 scan.addFamily(INPUT_FAMILY);
223 if (start != null) {
224 scan.setStartRow(Bytes.toBytes(start));
225 }
226 c.set(KEY_STARTROW, start != null ? start : "");
227 if (stop != null) {
228 scan.setStopRow(Bytes.toBytes(stop));
229 }
230 c.set(KEY_LASTROW, last != null ? last : "");
231 LOG.info("scan before: " + scan);
232 Job job = new Job(c, jobName);
233 TableMapReduceUtil.initTableMapperJob(
234 Bytes.toString(TABLE_NAME), scan, ScanMapper.class,
235 ImmutableBytesWritable.class, ImmutableBytesWritable.class, job);
236 job.setReducerClass(ScanReducer.class);
237 job.setNumReduceTasks(1);
238 FileOutputFormat.setOutputPath(job,
239 new Path(TEST_UTIL.getDataTestDir(), job.getJobName()));
240 LOG.info("Started " + job.getJobName());
241 assertTrue(job.waitForCompletion(true));
242 LOG.info("After map/reduce completion - job " + jobName);
243 }
244
245
246
247
248
249
250
251
252
253 public void testNumOfSplits(String ratio, int expectedNumOfSplits) throws IOException,
254 InterruptedException,
255 ClassNotFoundException {
256 String jobName = "TestJobForNumOfSplits";
257 LOG.info("Before map/reduce startup - job " + jobName);
258 Configuration c = new Configuration(TEST_UTIL.getConfiguration());
259 Scan scan = new Scan();
260 scan.addFamily(INPUT_FAMILY);
261 c.set("hbase.mapreduce.input.autobalance", "true");
262 c.set("hbase.mapreduce.input.autobalance.maxskewratio", ratio);
263 c.set(KEY_STARTROW, "");
264 c.set(KEY_LASTROW, "");
265 Job job = new Job(c, jobName);
266 TableMapReduceUtil.initTableMapperJob(Bytes.toString(TABLE_NAME), scan, ScanMapper.class,
267 ImmutableBytesWritable.class, ImmutableBytesWritable.class, job);
268 TableInputFormat tif = new TableInputFormat();
269 tif.setConf(job.getConfiguration());
270 Assert.assertEquals(new String(TABLE_NAME), new String(table.getTableName()));
271 List<InputSplit> splits = tif.getSplits(job);
272 Assert.assertEquals(expectedNumOfSplits, splits.size());
273 }
274
275
276
277
278 public void testGetSplitKey(byte[] startKey, byte[] endKey, byte[] splitKey, boolean isText) {
279 byte[] result = TableInputFormatBase.getSplitKey(startKey, endKey, isText);
280 Assert.assertArrayEquals(splitKey, result);
281 }
282 }
283