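/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */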
package org.apache.hadoop.hbase.test;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.GnuParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.IntegrationTestBase;
import org.apache.hadoop.hbase.IntegrationTestingUtility;
import org.apache.hadoop.hbase.testclassification.IntegrationTests;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.BufferedMutatorParams;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.ScannerCallable;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl;
import org.apache.hadoop.hbase.mapreduce.WALPlayer;
import org.apache.hadoop.hbase.regionserver.FlushLargeStoresPolicy;
import org.apache.hadoop.hbase.regionserver.FlushPolicyFactory;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.AbstractHBaseTool;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.RegionSplitter;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.CounterGroup;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileAsBinaryInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileAsBinaryOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.junit.Test;
import org.junit.experimental.categories.Category;

import com.google.common.collect.Sets;
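
/**
 * An integration test that stresses a cluster by building and then verifying a very large
 * linked list, in the style of the goraci "continuous ingest" test. Data is written as a set
 * of circular linked lists and a verification job later walks the table to prove no node was
 * lost.
 *
 * The {@code generator} command is a map-only job. Each mapper writes rows keyed by random
 * 16-byte values; every row's "prev" column points at the key written just before it. Keys
 * are flushed {@code width} at a time, and after {@code wrap} nodes the first batch is
 * re-written to point at the last, closing each list into a circle so that every node is
 * referenced exactly once.
 *
 * The {@code verify} command is a map reduce job that scans the table, emits each defined
 * row and each reference, and checks in the reducer that the two match up. Any UNDEFINED
 * (referenced but missing) or UNREFERENCED counts indicate data loss.
 *
 * Additional commands: {@code loop} alternates generator and verify iterations,
 * {@code walker} follows a list from a random start key and prints timings, {@code print}
 * and {@code delete} operate on individual nodes, {@code clean} drops the test table and
 * output, and {@code search} greps the WALs for a set of missing keys.
 *
 * An illustrative small run (the launcher, path and sizes below are examples only):
 * <pre>
 * hbase org.apache.hadoop.hbase.test.IntegrationTestBigLinkedList loop 1 1 2000000 /tmp/itbll 1
 * </pre>
 */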
@Category(IntegrationTests.class)
public class IntegrationTestBigLinkedList extends IntegrationTestBase {
  protected static final byte[] NO_KEY = new byte[1];

  protected static String TABLE_NAME_KEY = "IntegrationTestBigLinkedList.table";

  protected static String DEFAULT_TABLE_NAME = "IntegrationTestBigLinkedList";

  protected static byte[] FAMILY_NAME = Bytes.toBytes("meta");
  private static byte[] BIG_FAMILY_NAME = Bytes.toBytes("big");
  private static byte[] TINY_FAMILY_NAME = Bytes.toBytes("tiny");

  // Link to the id of the prev node in the linked list
  protected static final byte[] COLUMN_PREV = Bytes.toBytes("prev");

  // Identifier of the mapped task
  protected static final byte[] COLUMN_CLIENT = Bytes.toBytes("client");

  // The id of the row within the same client
  protected static final byte[] COLUMN_COUNT = Bytes.toBytes("count");

  /** Number of rows each generator mapper writes; should be a multiple of width*wrap. */
  private static final String GENERATOR_NUM_ROWS_PER_MAP_KEY
      = "IntegrationTestBigLinkedList.generator.num_rows";

  /** Number of generator map tasks. */
  private static final String GENERATOR_NUM_MAPPERS_KEY
      = "IntegrationTestBigLinkedList.generator.map.tasks";

  /** Number of nodes buffered and linked per batch. */
  private static final String GENERATOR_WIDTH_KEY
      = "IntegrationTestBigLinkedList.generator.width";

  /** Multiplied by width to give the length of each circular list. */
  private static final String GENERATOR_WRAP_KEY
      = "IntegrationTestBigLinkedList.generator.wrap";

  protected int NUM_SLAVES_BASE = 3; // number of slaves for the cluster

  private static final int MISSING_ROWS_TO_LOG = 10; // limit on the per-row counters emitted

  private static final int WIDTH_DEFAULT = 1000000;
  private static final int WRAP_DEFAULT = 25;
  private static final int ROWKEY_LENGTH = 16;

  protected String toRun;
  protected String[] otherArgs;

  /** A node in the linked list: row key, previous key, writing client and sequence count. */
  static class CINode {
    byte[] key;
    byte[] prev;
    String client;
    long count;
  }

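  /**
   * A Map only job that generates random linked lists and stores them.
   */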
  static class Generator extends Configured implements Tool {

    private static final Log LOG = LogFactory.getLog(Generator.class);

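    /**
     * Set this configuration to exercise multiple, unevenly-sized column families. When
     * enabled, every generated row also carries a one-byte value in the 'tiny' family and a
     * large random value in the 'big' family, and Verify checks that neither family was lost.
     */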
    public static final String MULTIPLE_UNEVEN_COLUMNFAMILIES_KEY =
        "generator.multiple.columnfamilies";

    static class GeneratorInputFormat extends InputFormat<BytesWritable,NullWritable> {
      static class GeneratorInputSplit extends InputSplit implements Writable {
        @Override
        public long getLength() throws IOException, InterruptedException {
          return 1;
        }
        @Override
        public String[] getLocations() throws IOException, InterruptedException {
          return new String[0];
        }
        @Override
        public void readFields(DataInput arg0) throws IOException {
        }
        @Override
        public void write(DataOutput arg0) throws IOException {
        }
      }

      static class GeneratorRecordReader extends RecordReader<BytesWritable,NullWritable> {
        private long count;
        private long numNodes;
        private Random rand;

        @Override
        public void close() throws IOException {
        }

        @Override
        public BytesWritable getCurrentKey() throws IOException, InterruptedException {
          byte[] bytes = new byte[ROWKEY_LENGTH];
          rand.nextBytes(bytes);
          return new BytesWritable(bytes);
        }

        @Override
        public NullWritable getCurrentValue() throws IOException, InterruptedException {
          return NullWritable.get();
        }

        @Override
        public float getProgress() throws IOException, InterruptedException {
          return (float)(count / (double)numNodes);
        }

        @Override
        public void initialize(InputSplit arg0, TaskAttemptContext context)
            throws IOException, InterruptedException {
          numNodes = context.getConfiguration().getLong(GENERATOR_NUM_ROWS_PER_MAP_KEY, 25000000);
          // Use SecureRandom so concurrent mappers do not produce the same key sequence.
          rand = new SecureRandom();
        }

        @Override
        public boolean nextKeyValue() throws IOException, InterruptedException {
          return count++ < numNodes;
        }

      }

      @Override
      public RecordReader<BytesWritable,NullWritable> createRecordReader(
          InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
        GeneratorRecordReader rr = new GeneratorRecordReader();
        rr.initialize(split, context);
        return rr;
      }

      @Override
      public List<InputSplit> getSplits(JobContext job) throws IOException, InterruptedException {
        int numMappers = job.getConfiguration().getInt(GENERATOR_NUM_MAPPERS_KEY, 1);

        ArrayList<InputSplit> splits = new ArrayList<InputSplit>(numMappers);

        for (int i = 0; i < numMappers; i++) {
          splits.add(new GeneratorInputSplit());
        }

        return splits;
      }
    }

    /** Ensure output files from the previous job each go to a single mapper of this job. */
    static class OneFilePerMapperSFIF<K, V> extends SequenceFileInputFormat<K, V> {
      @Override
      protected boolean isSplitable(JobContext context, Path filename) {
        return false;
      }
    }
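
    /**
     * Mapper that takes the random keys emitted by GeneratorRecordReader and links them into
     * lists. Keys are buffered 'width' at a time; each persisted batch points back at the
     * batch before it, and once 'wrap' nodes have been written the first batch is persisted
     * again pointing at the last, closing the list into a circle.
     */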
    static class GeneratorMapper
        extends Mapper<BytesWritable, NullWritable, NullWritable, NullWritable> {

      byte[][] first = null;
      byte[][] prev = null;
      byte[][] current = null;
      byte[] id;
      long count = 0;
      int i;
      BufferedMutator mutator;
      Connection connection;
      long numNodes;
      long wrap;
      int width;
      boolean multipleUnevenColumnFamilies;
      byte[] tinyValue = new byte[] { 't' };
      byte[] bigValue = null;

      @Override
      protected void setup(Context context) throws IOException, InterruptedException {
        id = Bytes.toBytes("Job: " + context.getJobID() + " Task: " + context.getTaskAttemptID());
        Configuration conf = context.getConfiguration();
        connection = ConnectionFactory.createConnection(conf);
        instantiateHTable();
        this.width = context.getConfiguration().getInt(GENERATOR_WIDTH_KEY, WIDTH_DEFAULT);
        current = new byte[this.width][];
        int wrapMultiplier = context.getConfiguration().getInt(GENERATOR_WRAP_KEY, WRAP_DEFAULT);
        this.wrap = (long)wrapMultiplier * width;
        this.numNodes = context.getConfiguration().getLong(
            GENERATOR_NUM_ROWS_PER_MAP_KEY, (long)WIDTH_DEFAULT * WRAP_DEFAULT);
        if (this.numNodes < this.wrap) {
          this.wrap = this.numNodes;
        }
        this.multipleUnevenColumnFamilies = isMultiUnevenColumnFamilies(context.getConfiguration());
      }

      protected void instantiateHTable() throws IOException {
        mutator = connection.getBufferedMutator(
            new BufferedMutatorParams(getTableName(connection.getConfiguration()))
                .writeBufferSize(4 * 1024 * 1024));
      }

      @Override
      protected void cleanup(Context context) throws IOException, InterruptedException {
        mutator.close();
        connection.close();
      }

      @Override
      protected void map(BytesWritable key, NullWritable value, Context output) throws IOException {
        current[i] = new byte[key.getLength()];
        System.arraycopy(key.getBytes(), 0, current[i], 0, key.getLength());
        if (++i == current.length) {
          LOG.info("Persisting current.length=" + current.length + ", count=" + count + ", id=" +
              Bytes.toStringBinary(id) + ", current=" + Bytes.toStringBinary(current[0]) +
              ", i=" + i);
          persist(output, count, prev, current, id);
          i = 0;

          // Remember the first batch so the list can be closed once 'wrap' nodes are written.
          if (first == null) {
            first = current;
          }
          prev = current;
          current = new byte[this.width][];

          count += current.length;
          output.setStatus("Count " + count);

          if (count % wrap == 0) {
            // Close the circle: shift the first batch left by one and persist it again so
            // that it points back at the last batch written.
            circularLeftShift(first);

            persist(output, -1, prev, first, null);

            first = null;
            prev = null;
          }
        }
      }

      private static <T> void circularLeftShift(T[] first) {
        T ez = first[0];
        System.arraycopy(first, 1, first, 0, first.length - 1);
        first[first.length - 1] = ez;
      }

      protected void persist(Context output, long count, byte[][] prev, byte[][] current, byte[] id)
          throws IOException {
        for (int i = 0; i < current.length; i++) {
          Put put = new Put(current[i]);
          put.addColumn(FAMILY_NAME, COLUMN_PREV, prev == null ? NO_KEY : prev[i]);

          if (count >= 0) {
            put.addColumn(FAMILY_NAME, COLUMN_COUNT, Bytes.toBytes(count + i));
          }
          if (id != null) {
            put.addColumn(FAMILY_NAME, COLUMN_CLIENT, id);
          }

          if (this.multipleUnevenColumnFamilies) {
            // One-byte value for the 'tiny' family.
            put.addColumn(TINY_FAMILY_NAME, TINY_FAMILY_NAME, this.tinyValue);
            // A value 10x the key length for the 'big' family, generated once and reused.
            if (this.bigValue == null) {
              this.bigValue = new byte[current[i].length * 10];
              ThreadLocalRandom.current().nextBytes(this.bigValue);
            }

            put.addColumn(BIG_FAMILY_NAME, BIG_FAMILY_NAME, this.bigValue);
          }
          mutator.mutate(put);

          if (i % 1000 == 0) {
            // Tickle progress every so often else the framework will think the task is hung.
            output.progress();
          }
        }

        mutator.flush();
      }
    }

    @Override
    public int run(String[] args) throws Exception {
      if (args.length < 3) {
        System.out.println("Usage : " + Generator.class.getSimpleName() +
            " <num mappers> <num nodes per map> <tmp output dir> [<width> <wrap multiplier>]");
        System.out.println(" where <num nodes per map> should be a multiple of" +
            " width*wrap multiplier, 25M by default");
        return 0;
      }

      int numMappers = Integer.parseInt(args[0]);
      long numNodes = Long.parseLong(args[1]);
      Path tmpOutput = new Path(args[2]);
      Integer width = (args.length < 4) ? null : Integer.parseInt(args[3]);
      Integer wrapMultiplier = (args.length < 5) ? null : Integer.parseInt(args[4]);
      return run(numMappers, numNodes, tmpOutput, width, wrapMultiplier);
    }

    protected void createSchema() throws IOException {
      Configuration conf = getConf();
      TableName tableName = getTableName(conf);
      try (Connection conn = ConnectionFactory.createConnection(conf);
          Admin admin = conn.getAdmin()) {
        if (!admin.tableExists(tableName)) {
          HTableDescriptor htd = new HTableDescriptor(getTableName(getConf()));
          htd.addFamily(new HColumnDescriptor(FAMILY_NAME));
          // Always add these families. We just skip writing to them when not testing them.
          htd.addFamily(new HColumnDescriptor(BIG_FAMILY_NAME));
          htd.addFamily(new HColumnDescriptor(TINY_FAMILY_NAME));

          // If requested, pre-split the table across the live regionservers.
          if (conf.getBoolean(HBaseTestingUtility.PRESPLIT_TEST_TABLE_KEY,
              HBaseTestingUtility.PRESPLIT_TEST_TABLE)) {
            int numberOfServers = admin.getClusterStatus().getServers().size();
            if (numberOfServers == 0) {
              throw new IllegalStateException("No live regionservers");
            }
            int regionsPerServer = conf.getInt(HBaseTestingUtility.REGIONS_PER_SERVER_KEY,
                HBaseTestingUtility.DEFAULT_REGIONS_PER_SERVER);
            int totalNumberOfRegions = numberOfServers * regionsPerServer;
            LOG.info("Number of live regionservers: " + numberOfServers + ", " +
                "pre-splitting table into " + totalNumberOfRegions + " regions " +
                "(default regions per server: " + regionsPerServer + ")");

            byte[][] splits = new RegionSplitter.UniformSplit().split(totalNumberOfRegions);

            admin.createTable(htd, splits);
          } else {
            // No pre-split requested: start with a single region and let the table
            // split organically.
            admin.createTable(htd);
          }
        }
      } catch (MasterNotRunningException e) {
        LOG.error("Master not running", e);
        throw new IOException(e);
      }
    }

    public int runRandomInputGenerator(int numMappers, long numNodes, Path tmpOutput,
        Integer width, Integer wrapMultiplier) throws Exception {
      LOG.info("Running RandomInputGenerator with numMappers=" + numMappers
          + ", numNodes=" + numNodes);
      Job job = Job.getInstance(getConf());

      job.setJobName("Random Input Generator");
      job.setNumReduceTasks(0);
      job.setJarByClass(getClass());

      job.setInputFormatClass(GeneratorInputFormat.class);
      job.setOutputKeyClass(BytesWritable.class);
      job.setOutputValueClass(NullWritable.class);

      setJobConf(job, numMappers, numNodes, width, wrapMultiplier);

      job.setMapperClass(Mapper.class); // identity mapper

      FileOutputFormat.setOutputPath(job, tmpOutput);
      job.setOutputFormatClass(SequenceFileOutputFormat.class);

      boolean success = jobCompletion(job);

      return success ? 0 : 1;
    }

    public int runGenerator(int numMappers, long numNodes, Path tmpOutput,
        Integer width, Integer wrapMultiplier) throws Exception {
      LOG.info("Running Generator with numMappers=" + numMappers + ", numNodes=" + numNodes);
      createSchema();
      Job job = Job.getInstance(getConf());

      job.setJobName("Link Generator");
      job.setNumReduceTasks(0);
      job.setJarByClass(getClass());

      FileInputFormat.setInputPaths(job, tmpOutput);
      job.setInputFormatClass(OneFilePerMapperSFIF.class);
      job.setOutputKeyClass(NullWritable.class);
      job.setOutputValueClass(NullWritable.class);

      setJobConf(job, numMappers, numNodes, width, wrapMultiplier);

      setMapperForGenerator(job);

      job.setOutputFormatClass(NullOutputFormat.class);

      job.getConfiguration().setBoolean("mapreduce.map.speculative", false);
      TableMapReduceUtil.addDependencyJars(job);
      TableMapReduceUtil.addDependencyJars(job.getConfiguration(), AbstractHBaseTool.class);
      TableMapReduceUtil.initCredentials(job);

      boolean success = jobCompletion(job);

      return success ? 0 : 1;
    }

    protected boolean jobCompletion(Job job) throws IOException, InterruptedException,
        ClassNotFoundException {
      boolean success = job.waitForCompletion(true);
      return success;
    }

    protected void setMapperForGenerator(Job job) {
      job.setMapperClass(GeneratorMapper.class);
    }

    public int run(int numMappers, long numNodes, Path tmpOutput,
        Integer width, Integer wrapMultiplier) throws Exception {
      int ret = runRandomInputGenerator(numMappers, numNodes, tmpOutput, width, wrapMultiplier);
      if (ret > 0) {
        return ret;
      }
      return runGenerator(numMappers, numNodes, tmpOutput, width, wrapMultiplier);
    }
  }
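
  /**
   * Tool to search for missing rows in the WALs. It reads the set of keys flagged UNDEFINED
   * by Verify from the given input directory and runs a WALPlayer-derived job over both the
   * live and the archived WAL directories looking for those keys.
   */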
  static class Search extends Configured implements Tool {
    private static final Log LOG = LogFactory.getLog(Search.class);
    protected Job job;

    private static void printUsage(final String error) {
      if (error != null && error.length() > 0) System.err.println("ERROR: " + error);
      System.err.println("Usage: search <KEYS_DIR> [<MAPPERS_COUNT>]");
    }

    @Override
    public int run(String[] args) throws Exception {
      if (args.length < 1 || args.length > 2) {
        printUsage(null);
        return 1;
      }
      Path inputDir = new Path(args[0]);
      int numMappers = 1;
      if (args.length > 1) {
        numMappers = Integer.parseInt(args[1]);
      }
      return run(inputDir, numMappers);
    }
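
    /**
     * WALPlayer override that searches for the configured set of keys instead of
     * replaying edits.
     */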
    public static class WALSearcher extends WALPlayer {
      public WALSearcher(Configuration conf) {
        super(conf);
      }

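      /**
       * The actual searcher mapper. Flags any WAL cell whose row is in the set of keys
       * we are looking for.
       */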
      public static class WALMapperSearcher extends WALMapper {
        private SortedSet<byte[]> keysToFind;
        private AtomicInteger rows = new AtomicInteger(0);

        @Override
        public void setup(Mapper<WALKey, WALEdit, ImmutableBytesWritable, Mutation>.Context context)
            throws IOException {
          super.setup(context);
          try {
            this.keysToFind = readKeysToSearch(context.getConfiguration());
            LOG.info("Loaded keys to find: count=" + this.keysToFind.size());
          } catch (InterruptedException e) {
            throw new InterruptedIOException(e.toString());
          }
        }

        @Override
        protected boolean filter(Context context, Cell cell) {
          // Copy out the cell's row and check it against the set of keys to find.
          byte[] row = new byte[cell.getRowLength()];
          System.arraycopy(cell.getRowArray(), cell.getRowOffset(), row, 0, cell.getRowLength());
          boolean b = this.keysToFind.contains(row);
          if (b) {
            String keyStr = Bytes.toStringBinary(row);
            try {
              LOG.info("Found cell=" + cell + " , walKey=" + context.getCurrentKey());
            } catch (IOException|InterruptedException e) {
              LOG.warn(e);
            }
            if (rows.addAndGet(1) < MISSING_ROWS_TO_LOG) {
              context.getCounter(FOUND_GROUP_KEY, keyStr).increment(1);
            }
            context.getCounter(FOUND_GROUP_KEY, "CELL_WITH_MISSING_ROW").increment(1);
          }
          return b;
        }
      }

      // Put the above WALMapperSearcher in place of WALPlayer's default mapper.
      @Override
      public Job createSubmittableJob(String[] args) throws IOException {
        Job job = super.createSubmittableJob(args);

        job.setJarByClass(WALMapperSearcher.class);
        job.setMapperClass(WALMapperSearcher.class);
        job.setOutputFormatClass(NullOutputFormat.class);
        return job;
      }
    }

    static final String FOUND_GROUP_KEY = "Found";
    static final String SEARCHER_INPUTDIR_KEY = "searcher.keys.inputdir";

    public int run(Path inputDir, int numMappers) throws Exception {
      getConf().set(SEARCHER_INPUTDIR_KEY, inputDir.toString());
      SortedSet<byte[]> keys = readKeysToSearch(getConf());
      if (keys.isEmpty()) throw new RuntimeException("No keys to find");
      LOG.info("Count of keys to find: " + keys.size());
      for (byte[] key : keys) LOG.info("Key: " + Bytes.toStringBinary(key));
      Path hbaseDir = new Path(getConf().get(HConstants.HBASE_DIR));
      // Search both the live WAL directory and the archived (old) WAL directory.
      Path walsDir = new Path(hbaseDir, HConstants.HREGION_LOGDIR_NAME);
      Path oldWalsDir = new Path(hbaseDir, HConstants.HREGION_OLDLOGDIR_NAME);
      LOG.info("Running Search with keys inputDir=" + inputDir + ", numMappers=" + numMappers +
          " against " + getConf().get(HConstants.HBASE_DIR));
      int ret = ToolRunner.run(new WALSearcher(getConf()), new String[] {walsDir.toString(), ""});
      if (ret != 0) return ret;
      return ToolRunner.run(new WALSearcher(getConf()), new String[] {oldWalsDir.toString(), ""});
    }

    static SortedSet<byte[]> readKeysToSearch(final Configuration conf)
        throws IOException, InterruptedException {
      Path keysInputDir = new Path(conf.get(SEARCHER_INPUTDIR_KEY));
      FileSystem fs = FileSystem.get(conf);
      SortedSet<byte[]> result = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
      if (!fs.exists(keysInputDir)) {
        throw new FileNotFoundException(keysInputDir.toString());
      }
      if (!fs.isDirectory(keysInputDir)) {
        throw new UnsupportedOperationException("TODO");
      } else {
        RemoteIterator<LocatedFileStatus> iterator = fs.listFiles(keysInputDir, false);
        while (iterator.hasNext()) {
          LocatedFileStatus keyFileStatus = iterator.next();
          // Skip "_SUCCESS" and other framework files.
          if (keyFileStatus.getPath().getName().startsWith("_")) continue;
          result.addAll(readFileToSearch(conf, fs, keyFileStatus));
        }
      }
      return result;
    }

    private static SortedSet<byte[]> readFileToSearch(final Configuration conf,
        final FileSystem fs, final LocatedFileStatus keyFileStatus) throws IOException,
        InterruptedException {
      SortedSet<byte[]> result = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
      // Verify writes its output with SequenceFileAsBinaryOutputFormat, so read it back
      // with the matching binary record reader and keep only rows flagged UNDEFINED.
      TaskAttemptContext context = new TaskAttemptContextImpl(conf, new TaskAttemptID());
      try (SequenceFileAsBinaryInputFormat.SequenceFileAsBinaryRecordReader rr =
          new SequenceFileAsBinaryInputFormat.SequenceFileAsBinaryRecordReader()) {
        InputSplit is =
            new FileSplit(keyFileStatus.getPath(), 0, keyFileStatus.getLen(), new String[] {});
        rr.initialize(is, context);
        while (rr.nextKeyValue()) {
          rr.getCurrentKey();
          BytesWritable bw = rr.getCurrentValue();
          if (Verify.VerifyReducer.whichType(bw.getBytes()) == Verify.Counts.UNDEFINED) {
            byte[] key = new byte[rr.getCurrentKey().getLength()];
            System.arraycopy(rr.getCurrentKey().getBytes(), 0, key, 0, rr.getCurrentKey()
                .getLength());
            result.add(key);
          }
        }
      }
      return result;
    }
  }
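
  /**
   * A Map Reduce job that verifies that the linked lists generated by {@link Generator}
   * do not have any holes.
   */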
  static class Verify extends Configured implements Tool {

    private static final Log LOG = LogFactory.getLog(Verify.class);
    protected static final BytesWritable DEF = new BytesWritable(new byte[] { 0 });
    protected static final BytesWritable DEF_LOST_FAMILIES = new BytesWritable(new byte[] { 1 });

    protected Job job;

    public static class VerifyMapper extends TableMapper<BytesWritable, BytesWritable> {
      private BytesWritable row = new BytesWritable();
      private BytesWritable ref = new BytesWritable();

      private boolean multipleUnevenColumnFamilies;

      @Override
      protected void setup(
          Mapper<ImmutableBytesWritable, Result, BytesWritable, BytesWritable>.Context context)
          throws IOException, InterruptedException {
        this.multipleUnevenColumnFamilies = isMultiUnevenColumnFamilies(context.getConfiguration());
      }

      @Override
      protected void map(ImmutableBytesWritable key, Result value, Context context)
          throws IOException, InterruptedException {
        byte[] rowKey = key.get();
        row.set(rowKey, 0, rowKey.length);
        if (multipleUnevenColumnFamilies
            && (!value.containsColumn(BIG_FAMILY_NAME, BIG_FAMILY_NAME) || !value.containsColumn(
                TINY_FAMILY_NAME, TINY_FAMILY_NAME))) {
          context.write(row, DEF_LOST_FAMILIES);
        } else {
          context.write(row, DEF);
        }
        byte[] prev = value.getValue(FAMILY_NAME, COLUMN_PREV);
        if (prev != null && prev.length > 0) {
          ref.set(prev, 0, prev.length);
          context.write(ref, row);
        } else {
          LOG.warn(String.format("Prev is not set for: %s", Bytes.toStringBinary(rowKey)));
        }
      }
    }

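    /**
     * Don't change the order of these enums. Their ordinals are used as type flags on the
     * values written out by {@link VerifyReducer}.
     */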
    public static enum Counts {
      UNREFERENCED, UNDEFINED, REFERENCED, CORRUPT, EXTRAREFERENCES, EXTRA_UNDEF_REFERENCES,
      LOST_FAMILIES
    }

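    /**
     * Reducer that checks that every referenced node is also defined. For each key it sees
     * a 'def' marker if the row exists and one value per row that references it.
     */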
    public static class VerifyReducer extends
        Reducer<BytesWritable, BytesWritable, BytesWritable, BytesWritable> {
      private ArrayList<byte[]> refs = new ArrayList<byte[]>();
      private final BytesWritable UNREF = new BytesWritable(addPrefixFlag(
          Counts.UNREFERENCED.ordinal(), new byte[] {}));
      private final BytesWritable LOSTFAM = new BytesWritable(addPrefixFlag(
          Counts.LOST_FAMILIES.ordinal(), new byte[] {}));

      private AtomicInteger rows = new AtomicInteger(0);
      private Connection connection;

      @Override
      protected void setup(
          Reducer<BytesWritable, BytesWritable, BytesWritable, BytesWritable>.Context context)
          throws IOException, InterruptedException {
        super.setup(context);
        this.connection = ConnectionFactory.createConnection(context.getConfiguration());
      }

      @Override
      protected void cleanup(
          Reducer<BytesWritable, BytesWritable, BytesWritable, BytesWritable>.Context context)
          throws IOException, InterruptedException {
        if (this.connection != null) {
          this.connection.close();
        }
        super.cleanup(context);
      }

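      /**
       * @param ordinal the {@link Counts} ordinal to use as the type flag
       * @param r row bytes
       * @return a new byte array with the ordinal encoded as a short prefix followed by r
       */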
      public static byte[] addPrefixFlag(final int ordinal, final byte[] r) {
        byte[] prefix = Bytes.toBytes((short)ordinal);
        if (prefix.length != Bytes.SIZEOF_SHORT) {
          throw new RuntimeException("Unexpected size: " + prefix.length);
        }
        byte[] result = new byte[prefix.length + r.length];
        System.arraycopy(prefix, 0, result, 0, prefix.length);
        System.arraycopy(r, 0, result, prefix.length, r.length);
        return result;
      }

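      /**
       * @param bs flagged bytes as written by {@link #addPrefixFlag(int, byte[])}
       * @return the {@link Counts} type encoded in the prefix
       */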
      public static Counts whichType(final byte[] bs) {
        int ordinal = Bytes.toShort(bs, 0, Bytes.SIZEOF_SHORT);
        return Counts.values()[ordinal];
      }

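      /**
       * @param bw flagged value
       * @return the row portion of the value, with the type-flag prefix removed
       */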
      public static byte[] getRowOnly(BytesWritable bw) {
        byte[] bytes = new byte[bw.getLength() - Bytes.SIZEOF_SHORT];
        System.arraycopy(bw.getBytes(), Bytes.SIZEOF_SHORT, bytes, 0, bytes.length);
        return bytes;
      }

      @Override
      public void reduce(BytesWritable key, Iterable<BytesWritable> values, Context context)
          throws IOException, InterruptedException {
        int defCount = 0;
        boolean lostFamilies = false;
        refs.clear();
        for (BytesWritable type : values) {
          if (type.getLength() == DEF.getLength()) {
            defCount++;
            if (type.getBytes()[0] == 1) {
              lostFamilies = true;
            }
          } else {
            byte[] bytes = new byte[type.getLength()];
            System.arraycopy(type.getBytes(), 0, bytes, 0, type.getLength());
            refs.add(bytes);
          }
        }

        // A healthy node is defined exactly once and referenced exactly once.
        StringBuilder refsSb = null;
        String keyString = Bytes.toStringBinary(key.getBytes(), 0, key.getLength());
        if (defCount == 0 || refs.size() != 1) {
          refsSb = dumpExtraInfoOnRefs(key, context, refs);
          LOG.error("LinkedListError: key=" + keyString + ", reference(s)=" +
              (refsSb != null ? refsSb.toString() : ""));
        }
        if (lostFamilies) {
          LOG.error("LinkedListError: key=" + keyString + ", lost big or tiny families");
          context.getCounter(Counts.LOST_FAMILIES).increment(1);
          context.write(key, LOSTFAM);
        }

        if (defCount == 0 && refs.size() > 0) {
          // This node is referenced by other rows but was never itself written: it was
          // lost. Emit the references to help debugging.
          for (int i = 0; i < refs.size(); i++) {
            byte[] bs = refs.get(i);
            int ordinal;
            if (i <= 0) {
              ordinal = Counts.UNDEFINED.ordinal();
              context.write(key, new BytesWritable(addPrefixFlag(ordinal, bs)));
              context.getCounter(Counts.UNDEFINED).increment(1);
            } else {
              ordinal = Counts.EXTRA_UNDEF_REFERENCES.ordinal();
              context.write(key, new BytesWritable(addPrefixFlag(ordinal, bs)));
            }
          }
          if (rows.addAndGet(1) < MISSING_ROWS_TO_LOG) {
            // Record a limited number of missing rows as counters; the counter name
            // carries the key itself, and YARN limits how many counters a job may have.
            context.getCounter("undef", keyString).increment(1);
          }
        } else if (defCount > 0 && refs.size() == 0) {
          // Node is defined but no other node references it.
          context.write(key, UNREF);
          context.getCounter(Counts.UNREFERENCED).increment(1);
          if (rows.addAndGet(1) < MISSING_ROWS_TO_LOG) {
            context.getCounter("unref", keyString).increment(1);
          }
        } else {
          if (refs.size() > 1) {
            // Skip the first reference; the rest are extras.
            for (int i = 1; i < refs.size(); i++) {
              context.write(key,
                  new BytesWritable(addPrefixFlag(Counts.EXTRAREFERENCES.ordinal(), refs.get(i))));
            }
            context.getCounter(Counts.EXTRAREFERENCES).increment(refs.size() - 1);
          }
          // Node is defined and referenced.
          context.getCounter(Counts.REFERENCED).increment(1);
        }
      }

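      /**
       * Dump out extra info around references if there are any. Helps debugging.
       * @return StringBuilder filled with references, if any
       */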
      private StringBuilder dumpExtraInfoOnRefs(final BytesWritable key, final Context context,
          final List<byte[]> refs)
          throws IOException {
        StringBuilder refsSb = null;
        if (refs.isEmpty()) return refsSb;
        refsSb = new StringBuilder();
        String comma = "";
        // Fetch each referencing row with a Get so we can log its timestamp, the client
        // that wrote it, and where it and the missing key are located.
        TableName tn = getTableName(context.getConfiguration());
        try (Table t = this.connection.getTable(tn)) {
          for (byte[] ref : refs) {
            Result r = t.get(new Get(ref));
            List<Cell> cells = r.listCells();
            String ts = (cells != null && !cells.isEmpty()) ?
                new java.util.Date(cells.get(0).getTimestamp()).toString() : "";
            byte[] b = r.getValue(FAMILY_NAME, COLUMN_CLIENT);
            String jobStr = (b != null && b.length > 0) ? Bytes.toString(b) : "";
            b = r.getValue(FAMILY_NAME, COLUMN_COUNT);
            long count = (b != null && b.length > 0) ? Bytes.toLong(b) : -1;
            b = r.getValue(FAMILY_NAME, COLUMN_PREV);
            String refRegionLocation = "";
            String keyRegionLocation = "";
            if (b != null && b.length > 0) {
              try (RegionLocator rl = this.connection.getRegionLocator(tn)) {
                HRegionLocation hrl = rl.getRegionLocation(b);
                if (hrl != null) refRegionLocation = hrl.toString();

                hrl = rl.getRegionLocation(key.getBytes());
                if (hrl != null) keyRegionLocation = hrl.toString();
              }
            }
            LOG.error("Extras on ref without a def, ref=" + Bytes.toStringBinary(ref) +
                ", refPrevEqualsKey=" +
                (Bytes.compareTo(key.getBytes(), 0, key.getLength(), b, 0, b.length) == 0) +
                ", key=" + Bytes.toStringBinary(key.getBytes(), 0, key.getLength()) +
                ", ref row date=" + ts + ", jobStr=" + jobStr +
                ", ref row count=" + count +
                ", ref row regionLocation=" + refRegionLocation +
                ", key row regionLocation=" + keyRegionLocation);
            refsSb.append(comma);
            comma = ",";
            refsSb.append(Bytes.toStringBinary(ref));
          }
        }
        return refsSb;
      }
    }

    @Override
    public int run(String[] args) throws Exception {
      if (args.length != 2) {
        System.out.println("Usage : " + Verify.class.getSimpleName()
            + " <output dir> <num reducers>");
        return 0;
      }

      String outputDir = args[0];
      int numReducers = Integer.parseInt(args[1]);

      return run(outputDir, numReducers);
    }

    public int run(String outputDir, int numReducers) throws Exception {
      return run(new Path(outputDir), numReducers);
    }

    public int run(Path outputDir, int numReducers) throws Exception {
      LOG.info("Running Verify with outputDir=" + outputDir + ", numReducers=" + numReducers);

      job = Job.getInstance(getConf());

      job.setJobName("Link Verifier");
      job.setNumReduceTasks(numReducers);
      job.setJarByClass(getClass());

      setJobScannerConf(job);

      Scan scan = new Scan();
      scan.addColumn(FAMILY_NAME, COLUMN_PREV);
      scan.setCaching(10000);
      scan.setCacheBlocks(false);
      if (isMultiUnevenColumnFamilies(getConf())) {
        scan.addColumn(BIG_FAMILY_NAME, BIG_FAMILY_NAME);
        scan.addColumn(TINY_FAMILY_NAME, TINY_FAMILY_NAME);
      }

      TableMapReduceUtil.initTableMapperJob(getTableName(getConf()).getName(), scan,
          VerifyMapper.class, BytesWritable.class, BytesWritable.class, job);
      TableMapReduceUtil.addDependencyJars(job.getConfiguration(), AbstractHBaseTool.class);

      job.getConfiguration().setBoolean("mapreduce.map.speculative", false);

      job.setReducerClass(VerifyReducer.class);
      job.setOutputFormatClass(SequenceFileAsBinaryOutputFormat.class);
      job.setOutputKeyClass(BytesWritable.class);
      job.setOutputValueClass(BytesWritable.class);
      TextOutputFormat.setOutputPath(job, outputDir);

      boolean success = job.waitForCompletion(true);

      if (success) {
        Counters counters = job.getCounters();
        if (null == counters) {
          LOG.warn("Counters were null, cannot verify Job completion");
          // We don't have access to the counters to know if we have "bad" counts.
          return 0;
        }

        // If we find no unexpected values, the job didn't outright fail.
        if (verifyUnexpectedValues(counters)) {
          // We didn't check the referenced and unreferenced counts; that is done in verify().
          return 0;
        }
      }

      // We failed.
      return 1;
    }

    public boolean verify(long expectedReferenced) throws Exception {
      if (job == null) {
        throw new IllegalStateException("You should call run() first");
      }

      Counters counters = job.getCounters();

      // Run through each check, even if we fail one early.
      boolean success = verifyExpectedValues(expectedReferenced, counters);

      if (!verifyUnexpectedValues(counters)) {
        // We found counter objects which imply failure.
        success = false;
      }

      if (!success) {
        handleFailure(counters);
      }
      return success;
    }
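
    /**
     * Verify the values in the Counters against the expected number of entries written.
     * @param expectedReferenced expected number of referenced entries
     * @param counters the Job's Counters object
     * @return true if the values match what is expected, false otherwise
     */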
    protected boolean verifyExpectedValues(long expectedReferenced, Counters counters) {
      final Counter referenced = counters.findCounter(Counts.REFERENCED);
      final Counter unreferenced = counters.findCounter(Counts.UNREFERENCED);
      boolean success = true;

      if (expectedReferenced != referenced.getValue()) {
        LOG.error("Expected referenced count does not match with actual referenced count. " +
            "expected referenced=" + expectedReferenced + " ,actual=" + referenced.getValue());
        success = false;
      }

      if (unreferenced.getValue() > 0) {
        final Counter multiref = counters.findCounter(Counts.EXTRAREFERENCES);
        boolean couldBeMultiRef = (multiref.getValue() == unreferenced.getValue());
        LOG.error("Unreferenced nodes were not expected. Unreferenced count=" +
            unreferenced.getValue()
            + (couldBeMultiRef ? "; could be due to duplicate random numbers" : ""));
        success = false;
      }

      return success;
    }
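
    /**
     * Verify that the Counters don't contain values which indicate an outright failure
     * from the Reducers.
     * @param counters the Job's counters
     * @return true if the "bad" counter objects are 0, false otherwise
     */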
    protected boolean verifyUnexpectedValues(Counters counters) {
      final Counter undefined = counters.findCounter(Counts.UNDEFINED);
      final Counter lostfamilies = counters.findCounter(Counts.LOST_FAMILIES);
      boolean success = true;

      if (undefined.getValue() > 0) {
        LOG.error("Found an undefined node. Undefined count=" + undefined.getValue());
        success = false;
      }

      if (lostfamilies.getValue() > 0) {
        LOG.error("Found nodes which lost big or tiny families, count=" + lostfamilies.getValue());
        success = false;
      }

      return success;
    }

    protected void handleFailure(Counters counters) throws IOException {
      Configuration conf = job.getConfiguration();
      TableName tableName = getTableName(conf);
      try (Connection conn = ConnectionFactory.createConnection(conf)) {
        try (RegionLocator rl = conn.getRegionLocator(tableName)) {
          CounterGroup g = counters.getGroup("undef");
          Iterator<Counter> it = g.iterator();
          while (it.hasNext()) {
            String keyString = it.next().getName();
            byte[] key = Bytes.toBytes(keyString);
            HRegionLocation loc = rl.getRegionLocation(key, true);
            LOG.error("undefined row " + keyString + ", " + loc);
          }
          g = counters.getGroup("unref");
          it = g.iterator();
          while (it.hasNext()) {
            String keyString = it.next().getName();
            byte[] key = Bytes.toBytes(keyString);
            HRegionLocation loc = rl.getRegionLocation(key, true);
            LOG.error("unreferred row " + keyString + ", " + loc);
          }
        }
      }
    }
  }
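
  /**
   * Executes Generate and Verify in a loop. Data is not cleaned between runs, so each
   * iteration adds more links to the table and verifies the cumulative total.
   */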
  static class Loop extends Configured implements Tool {

    private static final Log LOG = LogFactory.getLog(Loop.class);

    IntegrationTestBigLinkedList it;

    protected void runGenerator(int numMappers, long numNodes,
        String outputDir, Integer width, Integer wrapMultiplier) throws Exception {
      Path outputPath = new Path(outputDir);
      UUID uuid = UUID.randomUUID(); // create a random UUID per iteration
      Path generatorOutput = new Path(outputPath, uuid.toString());

      Generator generator = new Generator();
      generator.setConf(getConf());
      int retCode = generator.run(numMappers, numNodes, generatorOutput, width, wrapMultiplier);
      if (retCode > 0) {
        throw new RuntimeException("Generator failed with return code: " + retCode);
      }
    }

    protected void runVerify(String outputDir,
        int numReducers, long expectedNumNodes) throws Exception {
      Path outputPath = new Path(outputDir);
      UUID uuid = UUID.randomUUID(); // create a random UUID per iteration
      Path iterationOutput = new Path(outputPath, uuid.toString());

      Verify verify = new Verify();
      verify.setConf(getConf());
      int retCode = verify.run(iterationOutput, numReducers);
      if (retCode > 0) {
        throw new RuntimeException("Verify.run failed with return code: " + retCode);
      }

      if (!verify.verify(expectedNumNodes)) {
        throw new RuntimeException("Verify.verify failed");
      }

      LOG.info("Verify finished with success. Total nodes=" + expectedNumNodes);
    }

    @Override
    public int run(String[] args) throws Exception {
      if (args.length < 5) {
        System.err.println("Usage: Loop <num iterations> <num mappers> <num nodes per mapper> " +
            "<output dir> <num reducers> [<width> <wrap multiplier>]");
        return 1;
      }
      LOG.info("Running Loop with args:" + Arrays.deepToString(args));

      int numIterations = Integer.parseInt(args[0]);
      int numMappers = Integer.parseInt(args[1]);
      long numNodes = Long.parseLong(args[2]);
      String outputDir = args[3];
      int numReducers = Integer.parseInt(args[4]);
      Integer width = (args.length < 6) ? null : Integer.parseInt(args[5]);
      Integer wrapMultiplier = (args.length < 7) ? null : Integer.parseInt(args[6]);

      long expectedNumNodes = 0;

      if (numIterations < 0) {
        numIterations = Integer.MAX_VALUE; // run indefinitely (kind of)
      }

      for (int i = 0; i < numIterations; i++) {
        LOG.info("Starting iteration = " + i);
        runGenerator(numMappers, numNodes, outputDir, width, wrapMultiplier);
        expectedNumNodes += numMappers * numNodes;

        runVerify(outputDir, numReducers, expectedNumNodes);
      }

      return 0;
    }
  }
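
  /**
   * A stand alone program that prints out portions of a list created by {@link Generator}.
   */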
  private static class Print extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {
      Options options = new Options();
      options.addOption("s", "start", true, "start key");
      options.addOption("e", "end", true, "end key");
      options.addOption("l", "limit", true, "number to print");

      GnuParser parser = new GnuParser();
      CommandLine cmd = null;
      try {
        cmd = parser.parse(options, args);
        if (cmd.getArgs().length != 0) {
          throw new ParseException("Command takes no arguments");
        }
      } catch (ParseException e) {
        System.err.println("Failed to parse command line " + e.getMessage());
        System.err.println();
        HelpFormatter formatter = new HelpFormatter();
        formatter.printHelp(getClass().getSimpleName(), options);
        System.exit(-1);
      }

      Table table = new HTable(getConf(), getTableName(getConf()));

      Scan scan = new Scan();
      scan.setBatch(10000);

      if (cmd.hasOption("s"))
        scan.setStartRow(Bytes.toBytesBinary(cmd.getOptionValue("s")));

      if (cmd.hasOption("e"))
        scan.setStopRow(Bytes.toBytesBinary(cmd.getOptionValue("e")));

      int limit = 0;
      if (cmd.hasOption("l"))
        limit = Integer.parseInt(cmd.getOptionValue("l"));
      else
        limit = 100;

      ResultScanner scanner = table.getScanner(scan);

      CINode node = new CINode();
      Result result = scanner.next();
      int count = 0;
      while (result != null && count++ < limit) {
        node = getCINode(result, node);
        System.out.printf("%s:%s:%012d:%s\n", Bytes.toStringBinary(node.key),
            Bytes.toStringBinary(node.prev), node.count, node.client);
        result = scanner.next();
      }
      scanner.close();
      table.close();

      return 0;
    }
  }
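
  /**
   * A stand alone program that deletes a single node.
   */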
  private static class Delete extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {
      if (args.length != 1) {
        System.out.println("Usage : " + Delete.class.getSimpleName() + " <node to delete>");
        return 0;
      }
      byte[] val = Bytes.toBytesBinary(args[0]);

      org.apache.hadoop.hbase.client.Delete delete
          = new org.apache.hadoop.hbase.client.Delete(val);

      Table table = new HTable(getConf(), getTableName(getConf()));
      table.delete(delete);
      table.close();

      System.out.println("Delete successful");
      return 0;
    }
  }
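
  /**
   * A stand alone program that follows a linked list created by {@link Generator} and
   * prints timing info.
   */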
  private static class Walker extends Configured implements Tool {
    @Override
    public int run(String[] args) throws IOException {
      Options options = new Options();
      options.addOption("n", "num", true, "number of queries");
      options.addOption("s", "start", true, "key to start at, binary string");
      options.addOption("l", "logevery", true, "log every N queries");

      GnuParser parser = new GnuParser();
      CommandLine cmd = null;
      try {
        cmd = parser.parse(options, args);
        if (cmd.getArgs().length != 0) {
          throw new ParseException("Command takes no arguments");
        }
      } catch (ParseException e) {
        System.err.println("Failed to parse command line " + e.getMessage());
        System.err.println();
        HelpFormatter formatter = new HelpFormatter();
        formatter.printHelp(getClass().getSimpleName(), options);
        System.exit(-1);
      }

      long maxQueries = Long.MAX_VALUE;
      if (cmd.hasOption('n')) {
        maxQueries = Long.parseLong(cmd.getOptionValue("n"));
      }
      Random rand = new SecureRandom();
      boolean isSpecificStart = cmd.hasOption('s');
      byte[] startKey = isSpecificStart ? Bytes.toBytesBinary(cmd.getOptionValue('s')) : null;
      int logEvery = cmd.hasOption('l') ? Integer.parseInt(cmd.getOptionValue('l')) : 1;

      Table table = new HTable(getConf(), getTableName(getConf()));
      long numQueries = 0;
      // If a specific start key was given, walk only the one list that contains it;
      // otherwise keep picking random start keys and walking from each.
      while (numQueries < maxQueries && (numQueries == 0 || !isSpecificStart)) {
        if (!isSpecificStart) {
          startKey = new byte[ROWKEY_LENGTH];
          rand.nextBytes(startKey);
        }
        CINode node = findStartNode(table, startKey);
        if (node == null && isSpecificStart) {
          System.err.printf("Start node not found: %s \n", Bytes.toStringBinary(startKey));
        }
        numQueries++;
        while (node != null && node.prev.length != NO_KEY.length && numQueries < maxQueries) {
          byte[] prev = node.prev;
          long t1 = System.currentTimeMillis();
          node = getNode(prev, table, node);
          long t2 = System.currentTimeMillis();
          if (numQueries % logEvery == 0) {
            System.out.printf("CQ %d: %d %s \n", numQueries, t2 - t1, Bytes.toStringBinary(prev));
          }
          numQueries++;
          if (node == null) {
            System.err.printf("UNDEFINED NODE %s \n", Bytes.toStringBinary(prev));
          } else if (node.prev.length == NO_KEY.length) {
            System.err.printf("TERMINATING NODE %s \n", Bytes.toStringBinary(node.key));
          }
        }
      }

      table.close();
      return 0;
    }

    private static CINode findStartNode(Table table, byte[] startKey) throws IOException {
      Scan scan = new Scan();
      scan.setStartRow(startKey);
      scan.setBatch(1);
      scan.addColumn(FAMILY_NAME, COLUMN_PREV);

      long t1 = System.currentTimeMillis();
      ResultScanner scanner = table.getScanner(scan);
      Result result = scanner.next();
      long t2 = System.currentTimeMillis();
      scanner.close();

      if (result != null) {
        CINode node = getCINode(result, new CINode());
        System.out.printf("FSR %d %s\n", t2 - t1, Bytes.toStringBinary(node.key));
        return node;
      }

      System.out.println("FSR " + (t2 - t1));

      return null;
    }

    private CINode getNode(byte[] row, Table table, CINode node) throws IOException {
      Get get = new Get(row);
      get.addColumn(FAMILY_NAME, COLUMN_PREV);
      Result result = table.get(get);
      return getCINode(result, node);
    }
  }

  private static class Clean extends Configured implements Tool {
    @Override public int run(String[] args) throws Exception {
      if (args.length < 1) {
        System.err.println("Usage: Clean <output dir>");
        return -1;
      }

      Path p = new Path(args[0]);
      Configuration conf = getConf();
      TableName tableName = getTableName(conf);
      try (FileSystem fs = HFileSystem.get(conf);
          Connection conn = ConnectionFactory.createConnection(conf);
          Admin admin = conn.getAdmin()) {
        if (admin.tableExists(tableName)) {
          admin.disableTable(tableName);
          admin.deleteTable(tableName);
        }

        if (fs.exists(p)) {
          fs.delete(p, true);
        }
      }

      return 0;
    }
  }

  static TableName getTableName(Configuration conf) {
    return TableName.valueOf(conf.get(TABLE_NAME_KEY, DEFAULT_TABLE_NAME));
  }

  private static CINode getCINode(Result result, CINode node) {
    node.key = Bytes.copy(result.getRow());
    if (result.containsColumn(FAMILY_NAME, COLUMN_PREV)) {
      node.prev = Bytes.copy(result.getValue(FAMILY_NAME, COLUMN_PREV));
    } else {
      node.prev = NO_KEY;
    }
    if (result.containsColumn(FAMILY_NAME, COLUMN_COUNT)) {
      node.count = Bytes.toLong(result.getValue(FAMILY_NAME, COLUMN_COUNT));
    } else {
      node.count = -1;
    }
    if (result.containsColumn(FAMILY_NAME, COLUMN_CLIENT)) {
      node.client = Bytes.toString(result.getValue(FAMILY_NAME, COLUMN_CLIENT));
    } else {
      node.client = "";
    }
    return node;
  }

  protected IntegrationTestingUtility util;

  @Override
  public void setUpCluster() throws Exception {
    util = getTestingUtil(getConf());
    boolean isDistributed = util.isDistributedCluster();
    util.initializeCluster(isDistributed ? 1 : this.NUM_SLAVES_BASE);
    if (!isDistributed) {
      util.startMiniMapReduceCluster();
    }
    this.setConf(util.getConfiguration());
  }

  @Override
  public void cleanUpCluster() throws Exception {
    super.cleanUpCluster();
    if (util.isDistributedCluster()) {
      util.shutdownMiniMapReduceCluster();
    }
  }

  private static boolean isMultiUnevenColumnFamilies(Configuration conf) {
    return conf.getBoolean(Generator.MULTIPLE_UNEVEN_COLUMNFAMILIES_KEY, true);
  }

  @Test
  public void testContinuousIngest() throws Exception {
    // Loop <num iterations> <num mappers> <num nodes per mapper> <output dir> <num reducers>
    Configuration conf = getTestingUtil(getConf()).getConfiguration();
    if (isMultiUnevenColumnFamilies(getConf())) {
      // Make sure per column family flush is on.
      conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, FlushLargeStoresPolicy.class.getName());
    }
    int ret =
        ToolRunner.run(conf, new Loop(), new String[] { "1", "1", "2000000",
            util.getDataTestDirOnTestFS("IntegrationTestBigLinkedList").toString(), "1" });
    org.junit.Assert.assertEquals(0, ret);
  }

  private void usage() {
    System.err.println("Usage: " + this.getClass().getSimpleName() + " COMMAND [COMMAND options]");
    printCommands();
  }

  private void printCommands() {
    System.err.println("Commands:");
    System.err.println(" generator  Map only job that generates data.");
    System.err.println(" verify     A map reduce job that looks for holes. Check return code and");
    System.err.println("            look at the counts after running. REFERENCED and");
    System.err.println("            UNREFERENCED are ok. Any UNDEFINED counts are bad. Do not");
    System.err.println("            run concurrently with the Generator.");
    System.err.println(" walker     " +
        "Standalone program that starts following a linked list & emits timing info.");
    System.err.println(" print      Standalone program that prints nodes in the linked list.");
    System.err.println(" delete     Standalone program that deletes a single node.");
    System.err.println(" loop       Program to Loop through Generator and Verify steps.");
    System.err.println(" clean      Program to clean all left over detritus.");
    System.err.println(" search     Search for missing keys.");
    System.err.flush();
  }

  @Override
  protected void processOptions(CommandLine cmd) {
    super.processOptions(cmd);
    String[] args = cmd.getArgs();

    if (args.length < 1) {
      printUsage(this.getClass().getSimpleName() +
          " <general options> COMMAND [<COMMAND options>]", "General options:", "");
      printCommands();
      // Throw an exception to stop the processing; it is ugly but gets the message across.
      throw new RuntimeException("Incorrect Number of args.");
    }
    toRun = args[0];
    otherArgs = Arrays.copyOfRange(args, 1, args.length);
  }

  @Override
  public int runTestFromCommandLine() throws Exception {
    Tool tool = null;
    if (toRun.equalsIgnoreCase("Generator")) {
      tool = new Generator();
    } else if (toRun.equalsIgnoreCase("Verify")) {
      tool = new Verify();
    } else if (toRun.equalsIgnoreCase("Loop")) {
      Loop loop = new Loop();
      loop.it = this;
      tool = loop;
    } else if (toRun.equalsIgnoreCase("Walker")) {
      tool = new Walker();
    } else if (toRun.equalsIgnoreCase("Print")) {
      tool = new Print();
    } else if (toRun.equalsIgnoreCase("Delete")) {
      tool = new Delete();
    } else if (toRun.equalsIgnoreCase("Clean")) {
      tool = new Clean();
    } else if (toRun.equalsIgnoreCase("Search")) {
      tool = new Search();
    } else {
      usage();
      throw new RuntimeException("Unknown arg");
    }

    return ToolRunner.run(getConf(), tool, otherArgs);
  }

  @Override
  public TableName getTablename() {
    Configuration c = getConf();
    return TableName.valueOf(c.get(TABLE_NAME_KEY, DEFAULT_TABLE_NAME));
  }

  @Override
  protected Set<String> getColumnFamilies() {
    if (isMultiUnevenColumnFamilies(getConf())) {
      return Sets.newHashSet(Bytes.toString(FAMILY_NAME), Bytes.toString(BIG_FAMILY_NAME),
          Bytes.toString(TINY_FAMILY_NAME));
    } else {
      return Sets.newHashSet(Bytes.toString(FAMILY_NAME));
    }
  }

  private static void setJobConf(Job job, int numMappers, long numNodes,
      Integer width, Integer wrapMultiplier) {
    job.getConfiguration().setInt(GENERATOR_NUM_MAPPERS_KEY, numMappers);
    job.getConfiguration().setLong(GENERATOR_NUM_ROWS_PER_MAP_KEY, numNodes);
    if (width != null) {
      job.getConfiguration().setInt(GENERATOR_WIDTH_KEY, width);
    }
    if (wrapMultiplier != null) {
      job.getConfiguration().setInt(GENERATOR_WRAP_KEY, wrapMultiplier);
    }
  }

  public static void setJobScannerConf(Job job) {
    // Make sure scanners log something useful to make debugging possible.
    job.getConfiguration().setBoolean(ScannerCallable.LOG_SCANNER_ACTIVITY, true);
    job.getConfiguration().setInt(TableRecordReaderImpl.LOG_PER_ROW_COUNT, 100000);
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    IntegrationTestingUtility.setUseDistributedCluster(conf);
    int ret = ToolRunner.run(conf, new IntegrationTestBigLinkedList(), args);
    System.exit(ret);
  }
}