/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
 * implied.  See the License for the specific language governing
 * permissions and limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import static org.junit.Assert.assertEquals;

import com.google.common.base.Joiner;
import com.google.common.collect.Sets;

import org.apache.commons.cli.CommandLine;
import org.apache.commons.lang.RandomStringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.IntegrationTestBase;
import org.apache.hadoop.hbase.IntegrationTestingUtility;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Consistency;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.testclassification.IntegrationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.RegionSplitter;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.ToolRunner;
import org.junit.Test;
import org.junit.experimental.categories.Category;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;

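/**
 * Integration test for bulk load and MapReduce.
 *
 * The load phase runs one or more MR jobs that write linked chains of rows into the table via
 * {@link HFileOutputFormat2} and {@link LoadIncrementalHFiles}. Row keys are longs; every chain
 * starts at row key 0, and each link stores, per chain id qualifier:
 *
 *   L:&lt;chainId&gt; -&gt; row key of the next link in the chain
 *   S:&lt;chainId&gt; -&gt; the position of this link within the chain
 *   D:&lt;chainId&gt; -&gt; random filler data
 *
 * The check phase then scans the table with a second MR job and verifies that every chain is
 * complete and has the expected length.
 *
 * Tunable via configuration:
 *   hbase.IntegrationTestBulkLoad.chainLength     - number of rows in each chain
 *   hbase.IntegrationTestBulkLoad.numMaps         - number of map tasks (one chain per mapper)
 *   hbase.IntegrationTestBulkLoad.numImportRounds - number of load jobs to run
 *   hbase.IntegrationTestBulkLoad.tableName       - name of the table under test
 *   hbase.IntegrationTestBulkLoad.replicaCount    - region replica count for the table
 */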
@Category(IntegrationTests.class)
public class IntegrationTestBulkLoad extends IntegrationTestBase {

  private static final Log LOG = LogFactory.getLog(IntegrationTestBulkLoad.class);

  private static final byte[] CHAIN_FAM = Bytes.toBytes("L");
  private static final byte[] SORT_FAM = Bytes.toBytes("S");
  private static final byte[] DATA_FAM = Bytes.toBytes("D");

  private static String CHAIN_LENGTH_KEY = "hbase.IntegrationTestBulkLoad.chainLength";
  private static int CHAIN_LENGTH = 500000;

  private static String NUM_MAPS_KEY = "hbase.IntegrationTestBulkLoad.numMaps";
  private static int NUM_MAPS = 1;

  private static String NUM_IMPORT_ROUNDS_KEY = "hbase.IntegrationTestBulkLoad.numImportRounds";
  private static int NUM_IMPORT_ROUNDS = 1;

  private static String ROUND_NUM_KEY = "hbase.IntegrationTestBulkLoad.roundNum";

  private static String TABLE_NAME_KEY = "hbase.IntegrationTestBulkLoad.tableName";
  private static String TABLE_NAME = "IntegrationTestBulkLoad";

  private static String NUM_REPLICA_COUNT_KEY = "hbase.IntegrationTestBulkLoad.replicaCount";
  private static int NUM_REPLICA_COUNT_DEFAULT = 1;

  private static final String OPT_LOAD = "load";
  private static final String OPT_CHECK = "check";

  private boolean load = false;
  private boolean check = false;

  public static class SlowMeCoproScanOperations extends BaseRegionObserver {
    static final AtomicLong sleepTime = new AtomicLong(2000);
    Random r = new Random();
    AtomicLong countOfNext = new AtomicLong(0);
    AtomicLong countOfOpen = new AtomicLong(0);

    public SlowMeCoproScanOperations() {}

    @Override
    public RegionScanner preScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> e,
        final Scan scan, final RegionScanner s) throws IOException {
      if (countOfOpen.incrementAndGet() == 2) { // slow down only the second scanner open
        slowdownCode(e);
      }
      return s;
    }

    @Override
    public boolean preScannerNext(final ObserverContext<RegionCoprocessorEnvironment> e,
        final InternalScanner s, final List<Result> results,
        final int limit, final boolean hasMore) throws IOException {
      // Slow down some next() calls on the primary region so that, with timeline consistency
      // enabled, reads have a chance to fall back to the region replicas.
      countOfNext.incrementAndGet();
      if (countOfNext.get() == 0 || countOfNext.get() == 4) {
        slowdownCode(e);
      }
      return true;
    }

    protected void slowdownCode(final ObserverContext<RegionCoprocessorEnvironment> e) {
      if (e.getEnvironment().getRegion().getRegionInfo().getReplicaId() == 0) {
        try {
          if (sleepTime.get() > 0) {
            LOG.info("Sleeping for " + sleepTime.get() + " ms");
            Thread.sleep(sleepTime.get());
          }
        } catch (InterruptedException e1) {
          LOG.error(e1);
        }
      }
    }
  }
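  /**
   * Modify the table under test to carry {@link SlowMeCoproScanOperations}. Only done when
   * region replicas are configured, so that slow primary scans exercise replica reads.
   */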
  private void installSlowingCoproc() throws IOException, InterruptedException {
    int replicaCount = conf.getInt(NUM_REPLICA_COUNT_KEY, NUM_REPLICA_COUNT_DEFAULT);
    if (replicaCount == NUM_REPLICA_COUNT_DEFAULT) return;

    TableName t = getTablename();
    Admin admin = util.getHBaseAdmin();
    HTableDescriptor desc = admin.getTableDescriptor(t);
    desc.addCoprocessor(SlowMeCoproScanOperations.class.getName());
    HBaseTestingUtility.modifyTableSync(admin, desc);

    // Give the cluster time to fully re-enable the table with the new coprocessor.
    Thread.sleep(30000);
  }

  @Test
  public void testBulkLoad() throws Exception {
    runLoad();
    installSlowingCoproc();
    runCheck();
  }

  public void runLoad() throws Exception {
    setupTable();
    int numImportRounds = getConf().getInt(NUM_IMPORT_ROUNDS_KEY, NUM_IMPORT_ROUNDS);
    LOG.info("Running load with numIterations:" + numImportRounds);
    for (int i = 0; i < numImportRounds; i++) {
      runLinkedListMRJob(i);
    }
  }

  private byte[][] getSplits(int numRegions) {
    RegionSplitter.UniformSplit split = new RegionSplitter.UniformSplit();
    split.setFirstRow(Bytes.toBytes(0L));
    split.setLastRow(Bytes.toBytes(Long.MAX_VALUE));
    return split.split(numRegions);
  }

  private void setupTable() throws IOException, InterruptedException {
    if (util.getHBaseAdmin().tableExists(getTablename())) {
      util.deleteTable(getTablename());
    }

    util.createTable(
        getTablename().getName(),
        new byte[][]{CHAIN_FAM, SORT_FAM, DATA_FAM},
        getSplits(16)
    );

    int replicaCount = conf.getInt(NUM_REPLICA_COUNT_KEY, NUM_REPLICA_COUNT_DEFAULT);
    if (replicaCount == NUM_REPLICA_COUNT_DEFAULT) return;

    TableName t = getTablename();
    HBaseTestingUtility.setReplicas(util.getHBaseAdmin(), t, replicaCount);
  }

  private void runLinkedListMRJob(int iteration) throws Exception {
    String jobName = IntegrationTestBulkLoad.class.getSimpleName() + " - " +
        EnvironmentEdgeManager.currentTime();
    Configuration conf = new Configuration(util.getConfiguration());
    Path p = null;
    if (conf.get(ImportTsv.BULK_OUTPUT_CONF_KEY) == null) {
      p = util.getDataTestDirOnTestFS(getTablename() + "-" + iteration);
    } else {
      p = new Path(conf.get(ImportTsv.BULK_OUTPUT_CONF_KEY));
    }

    conf.setBoolean("mapreduce.map.speculative", false);
    conf.setBoolean("mapreduce.reduce.speculative", false);
    conf.setInt(ROUND_NUM_KEY, iteration);

    Job job = new Job(conf);
    job.setJobName(jobName);

    // Set the input format so that we get one map task per chain.
    job.setInputFormatClass(ITBulkLoadInputFormat.class);

    // Set the mapper classes.
    job.setMapperClass(LinkedListCreationMapper.class);
    job.setMapOutputKeyClass(ImmutableBytesWritable.class);
    job.setMapOutputValueClass(KeyValue.class);

    // The reducer is set up by HFileOutputFormat2.configureIncrementalLoad below.

    // Set this jar.
    job.setJarByClass(getClass());

    // Set where to place the hfiles.
    FileOutputFormat.setOutputPath(job, p);
    try (Connection conn = ConnectionFactory.createConnection(conf);
        Admin admin = conn.getAdmin();
        Table table = conn.getTable(getTablename());
        RegionLocator regionLocator = conn.getRegionLocator(getTablename())) {

      // Configure the partitioner and other things needed for HFileOutputFormat2.
      HFileOutputFormat2.configureIncrementalLoad(job, table.getTableDescriptor(), regionLocator);

      // Run the job making sure it works.
      assertEquals(true, job.waitForCompletion(true));

      // Create a new loader.
      LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);

      // Load the HFiles into the table.
      loader.doBulkLoad(p, admin, table, regionLocator);
    }

    // Delete the bulk load output now that it has been loaded.
    util.getTestFileSystem().delete(p, true);
  }

  public static class EmptySplit extends InputSplit implements Writable {
    @Override
    public void write(DataOutput out) throws IOException { }
    @Override
    public void readFields(DataInput in) throws IOException { }
    @Override
    public long getLength() { return 0L; }
    @Override
    public String[] getLocations() { return new String[0]; }
  }

  public static class FixedRecordReader<K, V> extends RecordReader<K, V> {
    private int index = -1;
    private K[] keys;
    private V[] values;

    public FixedRecordReader(K[] keys, V[] values) {
      this.keys = keys;
      this.values = values;
    }

    @Override
    public void initialize(InputSplit split, TaskAttemptContext context) throws IOException,
        InterruptedException { }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
      return ++index < keys.length;
    }

    @Override
    public K getCurrentKey() throws IOException, InterruptedException {
      return keys[index];
    }

    @Override
    public V getCurrentValue() throws IOException, InterruptedException {
      return values[index];
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
      return (float) index / keys.length;
    }

    @Override
    public void close() throws IOException {
    }
  }

  public static class ITBulkLoadInputFormat extends InputFormat<LongWritable, LongWritable> {
    @Override
    public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException {
      int numSplits = context.getConfiguration().getInt(NUM_MAPS_KEY, NUM_MAPS);
      ArrayList<InputSplit> ret = new ArrayList<InputSplit>(numSplits);
      for (int i = 0; i < numSplits; ++i) {
        ret.add(new EmptySplit());
      }
      return ret;
    }

    @Override
    public RecordReader<LongWritable, LongWritable> createRecordReader(InputSplit split,
        TaskAttemptContext context)
        throws IOException, InterruptedException {
      int taskId = context.getTaskAttemptID().getTaskID().getId();
      int numMapTasks = context.getConfiguration().getInt(NUM_MAPS_KEY, NUM_MAPS);
      int numIterations =
          context.getConfiguration().getInt(NUM_IMPORT_ROUNDS_KEY, NUM_IMPORT_ROUNDS);
      int iteration = context.getConfiguration().getInt(ROUND_NUM_KEY, 0);

      taskId = taskId + iteration * numMapTasks;
      numMapTasks = numMapTasks * numIterations;

      long chainId = Math.abs(new Random().nextLong());
      // Ensure that the chainId is unique per task and across import rounds.
      chainId = chainId - (chainId % numMapTasks) + taskId;
      LongWritable[] keys = new LongWritable[] {new LongWritable(chainId)};

      return new FixedRecordReader<LongWritable, LongWritable>(keys, keys);
    }
  }
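  /**
   * Mapper that creates one linked list of KeyValues per map task.
   *
   * Every chain starts at row key 0 and contains CHAIN_LENGTH links. Each link stores the next
   * row key (CHAIN_FAM), its position in the chain (SORT_FAM), and random filler data (DATA_FAM).
   */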
  public static class LinkedListCreationMapper
      extends Mapper<LongWritable, LongWritable, ImmutableBytesWritable, KeyValue> {

    private Random rand = new Random();

    @Override
    protected void map(LongWritable key, LongWritable value, Context context)
        throws IOException, InterruptedException {
      long chainId = value.get();
      LOG.info("Starting mapper with chainId:" + chainId);

      byte[] chainIdArray = Bytes.toBytes(chainId);
      long currentRow = 0;

      long chainLength = context.getConfiguration().getLong(CHAIN_LENGTH_KEY, CHAIN_LENGTH);
      long nextRow = getNextRow(0, chainLength);

      for (long i = 0; i < chainLength; i++) {
        byte[] rk = Bytes.toBytes(currentRow);

        // The link to the next row in the chain.
        KeyValue linkKv = new KeyValue(rk, CHAIN_FAM, chainIdArray, Bytes.toBytes(nextRow));
        // The position of this link within the chain.
        KeyValue sortKv = new KeyValue(rk, SORT_FAM, chainIdArray, Bytes.toBytes(i));
        // Random data so that reasonably sized store files get created.
        KeyValue dataKv = new KeyValue(rk, DATA_FAM, chainIdArray,
            Bytes.toBytes(RandomStringUtils.randomAlphabetic(50))
        );

        // Emit the key values.
        context.write(new ImmutableBytesWritable(rk), linkKv);
        context.write(new ImmutableBytesWritable(rk), sortKv);
        context.write(new ImmutableBytesWritable(rk), dataKv);

        currentRow = nextRow;
        nextRow = getNextRow(i + 1, chainLength);
      }
    }

    /** Returns a row key for the link at the given index of this chain. */
    private long getNextRow(long index, long chainLength) {
      long nextRow = Math.abs(rand.nextLong());
      // Keep the random high-order bits but force nextRow % chainLength == index, so no two
      // links of the same chain can land on the same row.
      nextRow = nextRow - (nextRow % chainLength) + index;
      return nextRow;
    }
  }
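  /**
   * Writable key used by the check job. Links are grouped by chain id and sorted by their order
   * within the chain.
   */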
  public static class LinkKey implements WritableComparable<LinkKey> {

    private Long chainId;
    private Long order;

    public LinkKey() {}

    public LinkKey(long chainId, long order) {
      this.chainId = chainId;
      this.order = order;
    }

    public Long getChainId() {
      return chainId;
    }

    public Long getOrder() {
      return order;
    }

    @Override
    public int compareTo(LinkKey linkKey) {
      int res = getChainId().compareTo(linkKey.getChainId());
      if (res == 0) {
        res = getOrder().compareTo(linkKey.getOrder());
      }
      return res;
    }

    @Override
    public void write(DataOutput dataOutput) throws IOException {
      WritableUtils.writeVLong(dataOutput, chainId);
      WritableUtils.writeVLong(dataOutput, order);
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
      chainId = WritableUtils.readVLong(dataInput);
      order = WritableUtils.readVLong(dataInput);
    }
  }
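  /**
   * Writable value used by the check job: the row key of a link and the row key of the next link
   * in the same chain.
   */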
  public static class LinkChain implements WritableComparable<LinkChain> {

    private Long rk;
    private Long next;

    public LinkChain() {}

    public LinkChain(Long rk, Long next) {
      this.rk = rk;
      this.next = next;
    }

    public Long getRk() {
      return rk;
    }

    public Long getNext() {
      return next;
    }

    @Override
    public int compareTo(LinkChain linkChain) {
      int res = getRk().compareTo(linkChain.getRk());
      if (res == 0) {
        res = getNext().compareTo(linkChain.getNext());
      }
      return res;
    }

    @Override
    public void write(DataOutput dataOutput) throws IOException {
      WritableUtils.writeVLong(dataOutput, rk);
      WritableUtils.writeVLong(dataOutput, next);
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
      rk = WritableUtils.readVLong(dataInput);
      next = WritableUtils.readVLong(dataInput);
    }
  }
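  /**
   * Partitioner that routes all links of a chain to the same reducer, based only on the chain id.
   */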
  public static class NaturalKeyPartitioner extends Partitioner<LinkKey, LinkChain> {
    @Override
    public int getPartition(LinkKey linkKey,
                            LinkChain linkChain,
                            int numPartitions) {
      int hash = linkKey.getChainId().hashCode();
      return Math.abs(hash % numPartitions);
    }
  }
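  /**
   * Grouping comparator that sends all links with the same chain id to a single reduce call.
   */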
  public static class NaturalKeyGroupingComparator extends WritableComparator {

    protected NaturalKeyGroupingComparator() {
      super(LinkKey.class, true);
    }

    @Override
    public int compare(WritableComparable w1, WritableComparable w2) {
      LinkKey k1 = (LinkKey) w1;
      LinkKey k2 = (LinkKey) w2;

      return k1.getChainId().compareTo(k2.getChainId());
    }
  }
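  /**
   * Sort comparator that orders links first by chain id and then by their order within the chain,
   * so each reducer sees a chain's links in sequence.
   */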
  public static class CompositeKeyComparator extends WritableComparator {

    protected CompositeKeyComparator() {
      super(LinkKey.class, true);
    }

    @Override
    public int compare(WritableComparable w1, WritableComparable w2) {
      LinkKey k1 = (LinkKey) w1;
      LinkKey k2 = (LinkKey) w2;

      return k1.compareTo(k2);
    }
  }
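  /**
   * Mapper for the check job. A single row can be part of multiple chains, so one
   * (LinkKey, LinkChain) pair is emitted for every chain that touches the row.
   */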
  public static class LinkedListCheckingMapper extends TableMapper<LinkKey, LinkChain> {
    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context)
        throws IOException, InterruptedException {
      long longRk = Bytes.toLong(value.getRow());

      for (Map.Entry<byte[], byte[]> entry : value.getFamilyMap(CHAIN_FAM).entrySet()) {
        long chainId = Bytes.toLong(entry.getKey());
        long next = Bytes.toLong(entry.getValue());
        Cell c = value.getColumnCells(SORT_FAM, entry.getKey()).get(0);
        long order = Bytes.toLong(CellUtil.cloneValue(c));
        context.write(new LinkKey(chainId, order), new LinkChain(longRk, next));
      }
    }
  }
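  /**
   * Reducer that walks one chain per reduce call and verifies it.
   *
   * The grouping and sort comparators guarantee that the links arrive grouped by chain and in
   * order, so the reducer can check that the chain starts at row key 0, that no link is missing,
   * and that the chain has the expected length. Any violation fails the job with an exception.
   */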
  public static class LinkedListCheckingReducer
      extends Reducer<LinkKey, LinkChain, NullWritable, NullWritable> {
    @Override
    protected void reduce(LinkKey key, Iterable<LinkChain> values, Context context)
        throws java.io.IOException, java.lang.InterruptedException {
      long next = -1L;
      long prev = -1L;
      long count = 0L;

      for (LinkChain lc : values) {
        // The first link of every chain must be row key 0.
        if (next == -1) {
          if (lc.getRk() != 0L) {
            String msg = "Chains should all start at rk 0, but read rk " + lc.getRk()
                + ". Chain:" + key.chainId + ", order:" + key.order;
            logError(msg, context);
            throw new RuntimeException(msg);
          }
          next = lc.getNext();
        } else {
          if (next != lc.getRk()) {
            String msg = "Missing a link in the chain. Prev rk was " + prev + ", expecting "
                + next + " but got " + lc.getRk() + ". Chain:" + key.chainId
                + ", order:" + key.order;
            logError(msg, context);
            throw new RuntimeException(msg);
          }
          prev = lc.getRk();
          next = lc.getNext();
        }
        count++;
      }

      int expectedChainLen = context.getConfiguration().getInt(CHAIN_LENGTH_KEY, CHAIN_LENGTH);
      if (count != expectedChainLen) {
        String msg = "Chain wasn't the correct length. Expected " + expectedChainLen + " got "
            + count + ". Chain:" + key.chainId + ", order:" + key.order;
        logError(msg, context);
        throw new RuntimeException(msg);
      }
    }

    private static void logError(String msg, Context context) throws IOException {
      HBaseTestingUtility util = new HBaseTestingUtility(context.getConfiguration());
      TableName table = getTableName(context.getConfiguration());

      LOG.error("Failure in chain verification: " + msg);
      LOG.error("cluster status:\n" + util.getHBaseClusterInterface().getClusterStatus());
      LOG.error("table regions:\n"
          + Joiner.on("\n").join(util.getHBaseAdmin().getTableRegions(table)));
    }
  }
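  /**
   * After the load phase, run a MapReduce job over the table that verifies every chain.
   */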
  private void runCheck() throws IOException, ClassNotFoundException, InterruptedException {
    LOG.info("Running check");
    Configuration conf = getConf();
    String jobName = getTablename() + "_check" + EnvironmentEdgeManager.currentTime();
    Path p = util.getDataTestDirOnTestFS(jobName);

    Job job = new Job(conf);
    job.setJarByClass(getClass());
    job.setJobName(jobName);

    job.setPartitionerClass(NaturalKeyPartitioner.class);
    job.setGroupingComparatorClass(NaturalKeyGroupingComparator.class);
    job.setSortComparatorClass(CompositeKeyComparator.class);

    Scan scan = new Scan();
    scan.addFamily(CHAIN_FAM);
    scan.addFamily(SORT_FAM);
    scan.setMaxVersions(1);
    scan.setCacheBlocks(false);
    scan.setBatch(1000);

    int replicaCount = conf.getInt(NUM_REPLICA_COUNT_KEY, NUM_REPLICA_COUNT_DEFAULT);
    if (replicaCount != NUM_REPLICA_COUNT_DEFAULT) {
      scan.setConsistency(Consistency.TIMELINE);
    }

    TableMapReduceUtil.initTableMapperJob(
        getTablename().getName(),
        scan,
        LinkedListCheckingMapper.class,
        LinkKey.class,
        LinkChain.class,
        job
    );

    job.setReducerClass(LinkedListCheckingReducer.class);
    job.setOutputKeyClass(NullWritable.class);
    job.setOutputValueClass(NullWritable.class);

    FileOutputFormat.setOutputPath(job, p);

    assertEquals(true, job.waitForCompletion(true));

    // Clean up the job's output directory.
    util.getTestFileSystem().delete(p, true);
  }

  @Override
  public void setUpCluster() throws Exception {
    util = getTestingUtil(getConf());
    util.initializeCluster(1);
    int replicaCount = getConf().getInt(NUM_REPLICA_COUNT_KEY, NUM_REPLICA_COUNT_DEFAULT);
    if (LOG.isDebugEnabled() && replicaCount != NUM_REPLICA_COUNT_DEFAULT) {
      LOG.debug("Region Replicas enabled: " + replicaCount);
    }

    // Scale up the amount of work when running against a real distributed cluster.
    if (util.isDistributedCluster()) {
      util.getConfiguration().setIfUnset(NUM_MAPS_KEY,
          Integer.toString(util.getHBaseAdmin().getClusterStatus().getServersSize() * 10)
      );
      util.getConfiguration().setIfUnset(NUM_IMPORT_ROUNDS_KEY, "5");
    } else {
      util.startMiniMapReduceCluster();
    }
  }

  @Override
  protected void addOptions() {
    super.addOptions();
    super.addOptNoArg(OPT_CHECK, "Run check only");
    super.addOptNoArg(OPT_LOAD, "Run load only");
  }

  @Override
  protected void processOptions(CommandLine cmd) {
    super.processOptions(cmd);
    check = cmd.hasOption(OPT_CHECK);
    load = cmd.hasOption(OPT_LOAD);
  }

  @Override
  public int runTestFromCommandLine() throws Exception {
    if (load) {
      runLoad();
    } else if (check) {
      installSlowingCoproc();
      runCheck();
    } else {
      testBulkLoad();
    }
    return 0;
  }

  @Override
  public TableName getTablename() {
    return getTableName(getConf());
  }

  public static TableName getTableName(Configuration conf) {
    return TableName.valueOf(conf.get(TABLE_NAME_KEY, TABLE_NAME));
  }

  @Override
  protected Set<String> getColumnFamilies() {
    return Sets.newHashSet(Bytes.toString(CHAIN_FAM), Bytes.toString(DATA_FAM),
        Bytes.toString(SORT_FAM));
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    IntegrationTestingUtility.setUseDistributedCluster(conf);
    int status = ToolRunner.run(conf, new IntegrationTestBulkLoad(), args);
    System.exit(status);
  }
}