1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase.tool;
21
22 import java.io.Closeable;
23 import java.io.IOException;
24 import java.util.ArrayList;
25 import java.util.Arrays;
26 import java.util.HashMap;
27 import java.util.HashSet;
28 import java.util.LinkedList;
29 import java.util.List;
30 import java.util.Map;
31 import java.util.Random;
32 import java.util.Set;
33 import java.util.TreeSet;
34 import java.util.concurrent.atomic.AtomicLong;
35 import java.util.concurrent.Callable;
36 import java.util.concurrent.ExecutionException;
37 import java.util.concurrent.ExecutorService;
38 import java.util.concurrent.Future;
39 import java.util.concurrent.ScheduledThreadPoolExecutor;
40 import java.util.regex.Matcher;
41 import java.util.regex.Pattern;
42
43 import org.apache.commons.lang.time.StopWatch;
44 import org.apache.commons.logging.Log;
45 import org.apache.commons.logging.LogFactory;
46 import org.apache.hadoop.conf.Configuration;
47 import org.apache.hadoop.hbase.AuthUtil;
48 import org.apache.hadoop.hbase.ChoreService;
49 import org.apache.hadoop.hbase.DoNotRetryIOException;
50 import org.apache.hadoop.hbase.HBaseConfiguration;
51 import org.apache.hadoop.hbase.HColumnDescriptor;
52 import org.apache.hadoop.hbase.HConstants;
53 import org.apache.hadoop.hbase.HRegionInfo;
54 import org.apache.hadoop.hbase.HRegionLocation;
55 import org.apache.hadoop.hbase.HTableDescriptor;
56 import org.apache.hadoop.hbase.NamespaceDescriptor;
57 import org.apache.hadoop.hbase.ScheduledChore;
58 import org.apache.hadoop.hbase.ServerName;
59 import org.apache.hadoop.hbase.TableName;
60 import org.apache.hadoop.hbase.TableNotEnabledException;
61 import org.apache.hadoop.hbase.TableNotFoundException;
62 import org.apache.hadoop.hbase.client.Admin;
63 import org.apache.hadoop.hbase.client.Connection;
64 import org.apache.hadoop.hbase.client.ConnectionFactory;
65 import org.apache.hadoop.hbase.client.Get;
66 import org.apache.hadoop.hbase.client.Put;
67 import org.apache.hadoop.hbase.client.RegionLocator;
68 import org.apache.hadoop.hbase.client.ResultScanner;
69 import org.apache.hadoop.hbase.client.Scan;
70 import org.apache.hadoop.hbase.client.Table;
71 import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
72 import org.apache.hadoop.hbase.tool.Canary.RegionTask.TaskType;
73 import org.apache.hadoop.hbase.util.Bytes;
74 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
75 import org.apache.hadoop.hbase.util.ReflectionUtils;
76 import org.apache.hadoop.hbase.util.RegionSplitter;
77 import org.apache.hadoop.util.GenericOptionsParser;
78 import org.apache.hadoop.util.Tool;
79 import org.apache.hadoop.util.ToolRunner;
80
81
82
83
84
85
86
87
88
89
90
91
92 public final class Canary implements Tool {
93
94 public interface Sink {
95 public long getReadFailureCount();
96 public long incReadFailureCount();
97 public void publishReadFailure(HRegionInfo region, Exception e);
98 public void publishReadFailure(HRegionInfo region, HColumnDescriptor column, Exception e);
99 public void publishReadTiming(HRegionInfo region, HColumnDescriptor column, long msTime);
100 public long getWriteFailureCount();
101 public void publishWriteFailure(HRegionInfo region, Exception e);
102 public void publishWriteFailure(HRegionInfo region, HColumnDescriptor column, Exception e);
103 public void publishWriteTiming(HRegionInfo region, HColumnDescriptor column, long msTime);
104 }
105
106
107 public interface ExtendedSink extends Sink {
108 public void publishReadFailure(String table, String server);
109 public void publishReadTiming(String table, String server, long msTime);
110 }
111
112
113
114 public static class StdOutSink implements Sink {
115 private AtomicLong readFailureCount = new AtomicLong(0),
116 writeFailureCount = new AtomicLong(0);
117
118 @Override
119 public long getReadFailureCount() {
120 return readFailureCount.get();
121 }
122
123 @Override
124 public long incReadFailureCount() {
125 return readFailureCount.incrementAndGet();
126 }
127
128 @Override
129 public void publishReadFailure(HRegionInfo region, Exception e) {
130 readFailureCount.incrementAndGet();
131 LOG.error(String.format("read from region %s failed", region.getRegionNameAsString()), e);
132 }
133
134 @Override
135 public void publishReadFailure(HRegionInfo region, HColumnDescriptor column, Exception e) {
136 readFailureCount.incrementAndGet();
137 LOG.error(String.format("read from region %s column family %s failed",
138 region.getRegionNameAsString(), column.getNameAsString()), e);
139 }
140
141 @Override
142 public void publishReadTiming(HRegionInfo region, HColumnDescriptor column, long msTime) {
143 LOG.info(String.format("read from region %s column family %s in %dms",
144 region.getRegionNameAsString(), column.getNameAsString(), msTime));
145 }
146
147 @Override
148 public long getWriteFailureCount() {
149 return writeFailureCount.get();
150 }
151
152 @Override
153 public void publishWriteFailure(HRegionInfo region, Exception e) {
154 writeFailureCount.incrementAndGet();
155 LOG.error(String.format("write to region %s failed", region.getRegionNameAsString()), e);
156 }
157
158 @Override
159 public void publishWriteFailure(HRegionInfo region, HColumnDescriptor column, Exception e) {
160 writeFailureCount.incrementAndGet();
161 LOG.error(String.format("write to region %s column family %s failed",
162 region.getRegionNameAsString(), column.getNameAsString()), e);
163 }
164
165 @Override
166 public void publishWriteTiming(HRegionInfo region, HColumnDescriptor column, long msTime) {
167 LOG.info(String.format("write to region %s column family %s in %dms",
168 region.getRegionNameAsString(), column.getNameAsString(), msTime));
169 }
170 }
171
172 public static class RegionServerStdOutSink extends StdOutSink implements ExtendedSink {
173
174 @Override
175 public void publishReadFailure(String table, String server) {
176 incReadFailureCount();
177 LOG.error(String.format("Read from table:%s on region server:%s", table, server));
178 }
179
180 @Override
181 public void publishReadTiming(String table, String server, long msTime) {
182 LOG.info(String.format("Read from table:%s on region server:%s in %dms",
183 table, server, msTime));
184 }
185 }
186
187
188
189
190
191 static class RegionTask implements Callable<Void> {
192 public enum TaskType{
193 READ, WRITE
194 }
195 private Connection connection;
196 private HRegionInfo region;
197 private Sink sink;
198 private TaskType taskType;
199
200 RegionTask(Connection connection, HRegionInfo region, Sink sink, TaskType taskType) {
201 this.connection = connection;
202 this.region = region;
203 this.sink = sink;
204 this.taskType = taskType;
205 }
206
207 @Override
208 public Void call() {
209 switch (taskType) {
210 case READ:
211 return read();
212 case WRITE:
213 return write();
214 default:
215 return read();
216 }
217 }
218
219 public Void read() {
220 Table table = null;
221 HTableDescriptor tableDesc = null;
222 try {
223 table = connection.getTable(region.getTable());
224 tableDesc = table.getTableDescriptor();
225 } catch (IOException e) {
226 LOG.debug("sniffRegion failed", e);
227 sink.publishReadFailure(region, e);
228 if (table != null) {
229 try {
230 table.close();
231 } catch (IOException ioe) {
232 LOG.error("Close table failed", e);
233 }
234 }
235 return null;
236 }
237
238 byte[] startKey = null;
239 Get get = null;
240 Scan scan = null;
241 ResultScanner rs = null;
242 StopWatch stopWatch = new StopWatch();
243 for (HColumnDescriptor column : tableDesc.getColumnFamilies()) {
244 stopWatch.reset();
245 startKey = region.getStartKey();
246
247 if (startKey.length > 0) {
248 get = new Get(startKey);
249 get.setCacheBlocks(false);
250 get.setFilter(new FirstKeyOnlyFilter());
251 get.addFamily(column.getName());
252 } else {
253 scan = new Scan();
254 scan.setCaching(1);
255 scan.setCacheBlocks(false);
256 scan.setFilter(new FirstKeyOnlyFilter());
257 scan.addFamily(column.getName());
258 scan.setMaxResultSize(1L);
259 }
260
261 try {
262 if (startKey.length > 0) {
263 stopWatch.start();
264 table.get(get);
265 stopWatch.stop();
266 sink.publishReadTiming(region, column, stopWatch.getTime());
267 } else {
268 stopWatch.start();
269 rs = table.getScanner(scan);
270 stopWatch.stop();
271 sink.publishReadTiming(region, column, stopWatch.getTime());
272 }
273 } catch (Exception e) {
274 sink.publishReadFailure(region, column, e);
275 } finally {
276 if (rs != null) {
277 rs.close();
278 }
279 scan = null;
280 get = null;
281 startKey = null;
282 }
283 }
284 try {
285 table.close();
286 } catch (IOException e) {
287 LOG.error("Close table failed", e);
288 }
289 return null;
290 }
291
292
293
294
295
296 private Void write() {
297 Table table = null;
298 HTableDescriptor tableDesc = null;
299 try {
300 table = connection.getTable(region.getTable());
301 tableDesc = table.getTableDescriptor();
302 byte[] rowToCheck = region.getStartKey();
303 if (rowToCheck.length == 0) {
304 rowToCheck = new byte[]{0x0};
305 }
306 int writeValueSize =
307 connection.getConfiguration().getInt(HConstants.HBASE_CANARY_WRITE_VALUE_SIZE_KEY, 10);
308 for (HColumnDescriptor column : tableDesc.getColumnFamilies()) {
309 Put put = new Put(rowToCheck);
310 byte[] value = new byte[writeValueSize];
311 Bytes.random(value);
312 put.addColumn(column.getName(), HConstants.EMPTY_BYTE_ARRAY, value);
313 try {
314 long startTime = System.currentTimeMillis();
315 table.put(put);
316 long time = System.currentTimeMillis() - startTime;
317 sink.publishWriteTiming(region, column, time);
318 } catch (Exception e) {
319 sink.publishWriteFailure(region, column, e);
320 }
321 }
322 table.close();
323 } catch (IOException e) {
324 sink.publishWriteFailure(region, e);
325 }
326 return null;
327 }
328 }
329
330
331
332
333 static class RegionServerTask implements Callable<Void> {
334 private Connection connection;
335 private String serverName;
336 private HRegionInfo region;
337 private ExtendedSink sink;
338 private AtomicLong successes;
339
340 RegionServerTask(Connection connection, String serverName, HRegionInfo region,
341 ExtendedSink sink, AtomicLong successes) {
342 this.connection = connection;
343 this.serverName = serverName;
344 this.region = region;
345 this.sink = sink;
346 this.successes = successes;
347 }
348
349 @Override
350 public Void call() {
351 TableName tableName = null;
352 Table table = null;
353 Get get = null;
354 byte[] startKey = null;
355 Scan scan = null;
356 StopWatch stopWatch = new StopWatch();
357
358 stopWatch.reset();
359 try {
360 tableName = region.getTable();
361 table = connection.getTable(tableName);
362 startKey = region.getStartKey();
363
364 if (startKey.length > 0) {
365 get = new Get(startKey);
366 get.setCacheBlocks(false);
367 get.setFilter(new FirstKeyOnlyFilter());
368 stopWatch.start();
369 table.get(get);
370 stopWatch.stop();
371 } else {
372 scan = new Scan();
373 scan.setCacheBlocks(false);
374 scan.setFilter(new FirstKeyOnlyFilter());
375 scan.setCaching(1);
376 scan.setMaxResultSize(1L);
377 stopWatch.start();
378 ResultScanner s = table.getScanner(scan);
379 s.close();
380 stopWatch.stop();
381 }
382 successes.incrementAndGet();
383 sink.publishReadTiming(tableName.getNameAsString(), serverName, stopWatch.getTime());
384 } catch (TableNotFoundException tnfe) {
385 LOG.error("Table may be deleted", tnfe);
386
387 } catch (TableNotEnabledException tnee) {
388
389 successes.incrementAndGet();
390 LOG.debug("The targeted table was disabled. Assuming success.");
391 } catch (DoNotRetryIOException dnrioe) {
392 sink.publishReadFailure(tableName.getNameAsString(), serverName);
393 LOG.error(dnrioe);
394 } catch (IOException e) {
395 sink.publishReadFailure(tableName.getNameAsString(), serverName);
396 LOG.error(e);
397 } finally {
398 if (table != null) {
399 try {
400 table.close();
401 } catch (IOException e) {
402 LOG.error("Close table failed", e);
403 }
404 }
405 scan = null;
406 get = null;
407 startKey = null;
408 }
409 return null;
410 }
411 }
412
413 private static final int USAGE_EXIT_CODE = 1;
414 private static final int INIT_ERROR_EXIT_CODE = 2;
415 private static final int TIMEOUT_ERROR_EXIT_CODE = 3;
416 private static final int ERROR_EXIT_CODE = 4;
417 private static final int FAILURE_EXIT_CODE = 5;
418
419 private static final long DEFAULT_INTERVAL = 6000;
420
421 private static final long DEFAULT_TIMEOUT = 600000;
422 private static final int MAX_THREADS_NUM = 16;
423
424 private static final Log LOG = LogFactory.getLog(Canary.class);
425
426 public static final TableName DEFAULT_WRITE_TABLE_NAME = TableName.valueOf(
427 NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR, "canary");
428
429 private static final String CANARY_TABLE_FAMILY_NAME = "Test";
430
431 private Configuration conf = null;
432 private long interval = 0;
433 private Sink sink = null;
434
435 private boolean useRegExp;
436 private long timeout = DEFAULT_TIMEOUT;
437 private boolean failOnError = true;
438 private boolean regionServerMode = false;
439 private boolean regionServerAllRegions = false;
440 private boolean writeSniffing = false;
441 private boolean treatFailureAsError = false;
442 private TableName writeTableName = DEFAULT_WRITE_TABLE_NAME;
443
444 private ExecutorService executor;
445
/**
 * Creates a canary backed by a single-threaded scheduled executor that publishes
 * its results through a {@link RegionServerStdOutSink}.
 */
public Canary() {
  this(new ScheduledThreadPoolExecutor(1), new RegionServerStdOutSink());
}
449
/**
 * Creates a canary that runs its tasks on {@code executor} and publishes
 * results to {@code sink}.
 *
 * @param executor executor used to run the sniff tasks
 * @param sink receiver of the probe results
 */
public Canary(ExecutorService executor, Sink sink) {
  this.sink = sink;
  this.executor = executor;
}
454
455 @Override
456 public Configuration getConf() {
457 return conf;
458 }
459
460 @Override
461 public void setConf(Configuration conf) {
462 this.conf = conf;
463 }
464
465 private int parseArgs(String[] args) {
466 int index = -1;
467
468 for (int i = 0; i < args.length; i++) {
469 String cmd = args[i];
470
471 if (cmd.startsWith("-")) {
472 if (index >= 0) {
473
474 System.err.println("Invalid command line options");
475 printUsageAndExit();
476 }
477
478 if (cmd.equals("-help")) {
479
480 printUsageAndExit();
481 } else if (cmd.equals("-daemon") && interval == 0) {
482
483 interval = DEFAULT_INTERVAL;
484 } else if (cmd.equals("-interval")) {
485
486 i++;
487
488 if (i == args.length) {
489 System.err.println("-interval needs a numeric value argument.");
490 printUsageAndExit();
491 }
492
493 try {
494 interval = Long.parseLong(args[i]) * 1000;
495 } catch (NumberFormatException e) {
496 System.err.println("-interval needs a numeric value argument.");
497 printUsageAndExit();
498 }
499 } else if(cmd.equals("-regionserver")) {
500 this.regionServerMode = true;
501 } else if(cmd.equals("-allRegions")) {
502 this.regionServerAllRegions = true;
503 } else if(cmd.equals("-writeSniffing")) {
504 this.writeSniffing = true;
505 } else if(cmd.equals("-treatFailureAsError")) {
506 this.treatFailureAsError = true;
507 } else if (cmd.equals("-e")) {
508 this.useRegExp = true;
509 } else if (cmd.equals("-t")) {
510 i++;
511
512 if (i == args.length) {
513 System.err.println("-t needs a numeric value argument.");
514 printUsageAndExit();
515 }
516
517 try {
518 this.timeout = Long.parseLong(args[i]);
519 } catch (NumberFormatException e) {
520 System.err.println("-t needs a numeric value argument.");
521 printUsageAndExit();
522 }
523 } else if (cmd.equals("-writeTable")) {
524 i++;
525
526 if (i == args.length) {
527 System.err.println("-writeTable needs a string value argument.");
528 printUsageAndExit();
529 }
530 this.writeTableName = TableName.valueOf(args[i]);
531 } else if (cmd.equals("-f")) {
532 i++;
533
534 if (i == args.length) {
535 System.err
536 .println("-f needs a boolean value argument (true|false).");
537 printUsageAndExit();
538 }
539
540 this.failOnError = Boolean.parseBoolean(args[i]);
541 } else {
542
543 System.err.println(cmd + " options is invalid.");
544 printUsageAndExit();
545 }
546 } else if (index < 0) {
547
548 index = i;
549 }
550 }
551 if (this.regionServerAllRegions && !this.regionServerMode) {
552 System.err.println("-allRegions can only be specified in regionserver mode.");
553 printUsageAndExit();
554 }
555 return index;
556 }
557
558 @Override
559 public int run(String[] args) throws Exception {
560 int index = parseArgs(args);
561 ChoreService choreService = null;
562
563
564
565
566 final ScheduledChore authChore = AuthUtil.getAuthChore(conf);
567 if (authChore != null) {
568 choreService = new ChoreService("CANARY_TOOL");
569 choreService.scheduleChore(authChore);
570 }
571
572
573 Monitor monitor = null;
574 Thread monitorThread = null;
575 long startTime = 0;
576 long currentTimeLength = 0;
577
578
579
580 try (Connection connection = ConnectionFactory.createConnection(this.conf)) {
581 do {
582
583 try {
584 monitor = this.newMonitor(connection, index, args);
585 monitorThread = new Thread(monitor);
586 startTime = System.currentTimeMillis();
587 monitorThread.start();
588 while (!monitor.isDone()) {
589
590 Thread.sleep(1000);
591
592 if (this.failOnError && monitor.hasError()) {
593 monitorThread.interrupt();
594 if (monitor.initialized) {
595 return monitor.errorCode;
596 } else {
597 return INIT_ERROR_EXIT_CODE;
598 }
599 }
600 currentTimeLength = System.currentTimeMillis() - startTime;
601 if (currentTimeLength > this.timeout) {
602 LOG.error("The monitor is running too long (" + currentTimeLength
603 + ") after timeout limit:" + this.timeout
604 + " will be killed itself !!");
605 if (monitor.initialized) {
606 return TIMEOUT_ERROR_EXIT_CODE;
607 } else {
608 return INIT_ERROR_EXIT_CODE;
609 }
610 }
611 }
612
613 if (this.failOnError && monitor.finalCheckForErrors()) {
614 monitorThread.interrupt();
615 return monitor.errorCode;
616 }
617 } finally {
618 if (monitor != null) monitor.close();
619 }
620
621 Thread.sleep(interval);
622 } while (interval > 0);
623 }
624
625 if (choreService != null) {
626 choreService.shutdown();
627 }
628 return monitor.errorCode;
629 }
630
631 private void printUsageAndExit() {
632 System.err.printf(
633 "Usage: bin/hbase %s [opts] [table1 [table2]...] | [regionserver1 [regionserver2]..]%n",
634 getClass().getName());
635 System.err.println(" where [opts] are:");
636 System.err.println(" -help Show this help and exit.");
637 System.err.println(" -regionserver replace the table argument to regionserver,");
638 System.err.println(" which means to enable regionserver mode");
639 System.err.println(" -allRegions Tries all regions on a regionserver,");
640 System.err.println(" only works in regionserver mode.");
641 System.err.println(" -daemon Continuous check at defined intervals.");
642 System.err.println(" -interval <N> Interval between checks (sec)");
643 System.err.println(" -e Use region/regionserver as regular expression");
644 System.err.println(" which means the region/regionserver is regular expression pattern");
645 System.err.println(" -f <B> stop whole program if first error occurs," +
646 " default is true");
647 System.err.println(" -t <N> timeout for a check, default is 600000 (milisecs)");
648 System.err.println(" -writeSniffing enable the write sniffing in canary");
649 System.err.println(" -treatFailureAsError treats read / write failure as error");
650 System.err.println(" -writeTable The table used for write sniffing."
651 + " Default is hbase:canary");
652 System.err
653 .println(" -D<configProperty>=<value> assigning or override the configuration params");
654 System.exit(USAGE_EXIT_CODE);
655 }
656
657
658
659
660
661
662
663
664 public Monitor newMonitor(final Connection connection, int index, String[] args) {
665 Monitor monitor = null;
666 String[] monitorTargets = null;
667
668 if(index >= 0) {
669 int length = args.length - index;
670 monitorTargets = new String[length];
671 System.arraycopy(args, index, monitorTargets, 0, length);
672 }
673
674 if (this.regionServerMode) {
675 monitor =
676 new RegionServerMonitor(connection, monitorTargets, this.useRegExp,
677 (ExtendedSink) this.sink, this.executor, this.regionServerAllRegions,
678 this.treatFailureAsError);
679 } else {
680 monitor =
681 new RegionMonitor(connection, monitorTargets, this.useRegExp, this.sink, this.executor,
682 this.writeSniffing, this.writeTableName, this.treatFailureAsError);
683 }
684 return monitor;
685 }
686
687
688 public static abstract class Monitor implements Runnable, Closeable {
689
690 protected Connection connection;
691 protected Admin admin;
692 protected String[] targets;
693 protected boolean useRegExp;
694 protected boolean treatFailureAsError;
695 protected boolean initialized = false;
696
697 protected boolean done = false;
698 protected int errorCode = 0;
699 protected Sink sink;
700 protected ExecutorService executor;
701
702 public boolean isDone() {
703 return done;
704 }
705
706 public boolean hasError() {
707 return errorCode != 0;
708 }
709
710 public boolean finalCheckForErrors() {
711 if (errorCode != 0) {
712 return true;
713 }
714 if (treatFailureAsError &&
715 (sink.getReadFailureCount() > 0 || sink.getWriteFailureCount() > 0)) {
716 errorCode = FAILURE_EXIT_CODE;
717 return true;
718 }
719 return false;
720 }
721
722 @Override
723 public void close() throws IOException {
724 if (this.admin != null) this.admin.close();
725 }
726
727 protected Monitor(Connection connection, String[] monitorTargets, boolean useRegExp, Sink sink,
728 ExecutorService executor, boolean treatFailureAsError) {
729 if (null == connection) throw new IllegalArgumentException("connection shall not be null");
730
731 this.connection = connection;
732 this.targets = monitorTargets;
733 this.useRegExp = useRegExp;
734 this.treatFailureAsError = treatFailureAsError;
735 this.sink = sink;
736 this.executor = executor;
737 }
738
739 public abstract void run();
740
741 protected boolean initAdmin() {
742 if (null == this.admin) {
743 try {
744 this.admin = this.connection.getAdmin();
745 } catch (Exception e) {
746 LOG.error("Initial HBaseAdmin failed...", e);
747 this.errorCode = INIT_ERROR_EXIT_CODE;
748 }
749 } else if (admin.isAborted()) {
750 LOG.error("HBaseAdmin aborted");
751 this.errorCode = INIT_ERROR_EXIT_CODE;
752 }
753 return !this.hasError();
754 }
755 }
756
757
758 private static class RegionMonitor extends Monitor {
759
760 private static final int DEFAULT_WRITE_TABLE_CHECK_PERIOD = 10 * 60 * 1000;
761
762 private static final int DEFAULT_WRITE_DATA_TTL = 24 * 60 * 60;
763
764 private long lastCheckTime = -1;
765 private boolean writeSniffing;
766 private TableName writeTableName;
767 private int writeDataTTL;
768 private float regionsLowerLimit;
769 private float regionsUpperLimit;
770 private int checkPeriod;
771
772 public RegionMonitor(Connection connection, String[] monitorTargets, boolean useRegExp,
773 Sink sink, ExecutorService executor, boolean writeSniffing, TableName writeTableName,
774 boolean treatFailureAsError) {
775 super(connection, monitorTargets, useRegExp, sink, executor, treatFailureAsError);
776 Configuration conf = connection.getConfiguration();
777 this.writeSniffing = writeSniffing;
778 this.writeTableName = writeTableName;
779 this.writeDataTTL =
780 conf.getInt(HConstants.HBASE_CANARY_WRITE_DATA_TTL_KEY, DEFAULT_WRITE_DATA_TTL);
781 this.regionsLowerLimit =
782 conf.getFloat(HConstants.HBASE_CANARY_WRITE_PERSERVER_REGIONS_LOWERLIMIT_KEY, 1.0f);
783 this.regionsUpperLimit =
784 conf.getFloat(HConstants.HBASE_CANARY_WRITE_PERSERVER_REGIONS_UPPERLIMIT_KEY, 1.5f);
785 this.checkPeriod =
786 conf.getInt(HConstants.HBASE_CANARY_WRITE_TABLE_CHECK_PERIOD_KEY,
787 DEFAULT_WRITE_TABLE_CHECK_PERIOD);
788 }
789
790 @Override
791 public void run() {
792 if (this.initAdmin()) {
793 try {
794 List<Future<Void>> taskFutures = new LinkedList<Future<Void>>();
795 if (this.targets != null && this.targets.length > 0) {
796 String[] tables = generateMonitorTables(this.targets);
797 this.initialized = true;
798 for (String table : tables) {
799 taskFutures.addAll(Canary.sniff(admin, sink, table, executor, TaskType.READ));
800 }
801 } else {
802 taskFutures.addAll(sniff(TaskType.READ));
803 }
804
805 if (writeSniffing) {
806 if (EnvironmentEdgeManager.currentTime() - lastCheckTime > checkPeriod) {
807 try {
808 checkWriteTableDistribution();
809 } catch (IOException e) {
810 LOG.error("Check canary table distribution failed!", e);
811 }
812 lastCheckTime = EnvironmentEdgeManager.currentTime();
813 }
814
815 taskFutures.addAll(Canary.sniff(admin, sink,
816 admin.getTableDescriptor(writeTableName), executor, TaskType.WRITE));
817 }
818
819 for (Future<Void> future : taskFutures) {
820 try {
821 future.get();
822 } catch (ExecutionException e) {
823 LOG.error("Sniff region failed!", e);
824 }
825 }
826 } catch (Exception e) {
827 LOG.error("Run regionMonitor failed", e);
828 this.errorCode = ERROR_EXIT_CODE;
829 }
830 }
831 this.done = true;
832 }
833
834 private String[] generateMonitorTables(String[] monitorTargets) throws IOException {
835 String[] returnTables = null;
836
837 if (this.useRegExp) {
838 Pattern pattern = null;
839 HTableDescriptor[] tds = null;
840 Set<String> tmpTables = new TreeSet<String>();
841 try {
842 for (String monitorTarget : monitorTargets) {
843 pattern = Pattern.compile(monitorTarget);
844 tds = this.admin.listTables(pattern);
845 if (tds != null) {
846 for (HTableDescriptor td : tds) {
847 tmpTables.add(td.getNameAsString());
848 }
849 }
850 }
851 } catch (IOException e) {
852 LOG.error("Communicate with admin failed", e);
853 throw e;
854 }
855
856 if (tmpTables.size() > 0) {
857 returnTables = tmpTables.toArray(new String[tmpTables.size()]);
858 } else {
859 String msg = "No HTable found, tablePattern:" + Arrays.toString(monitorTargets);
860 LOG.error(msg);
861 this.errorCode = INIT_ERROR_EXIT_CODE;
862 throw new TableNotFoundException(msg);
863 }
864 } else {
865 returnTables = monitorTargets;
866 }
867
868 return returnTables;
869 }
870
871
872
873
874 private List<Future<Void>> sniff(TaskType taskType) throws Exception {
875 List<Future<Void>> taskFutures = new LinkedList<Future<Void>>();
876 for (HTableDescriptor table : admin.listTables()) {
877 if (admin.isTableEnabled(table.getTableName())
878 && (!table.getTableName().equals(writeTableName))) {
879 taskFutures.addAll(Canary.sniff(admin, sink, table, executor, taskType));
880 }
881 }
882 return taskFutures;
883 }
884
885 private void checkWriteTableDistribution() throws IOException {
886 if (!admin.tableExists(writeTableName)) {
887 int numberOfServers = admin.getClusterStatus().getServers().size();
888 if (numberOfServers == 0) {
889 throw new IllegalStateException("No live regionservers");
890 }
891 createWriteTable(numberOfServers);
892 }
893
894 if (!admin.isTableEnabled(writeTableName)) {
895 admin.enableTable(writeTableName);
896 }
897
898 int numberOfServers = admin.getClusterStatus().getServers().size();
899 List<HRegionLocation> locations;
900 RegionLocator locator = connection.getRegionLocator(writeTableName);
901 try {
902 locations = locator.getAllRegionLocations();
903 } finally {
904 locator.close();
905 }
906 int numberOfRegions = locations.size();
907 if (numberOfRegions < numberOfServers * regionsLowerLimit
908 || numberOfRegions > numberOfServers * regionsUpperLimit) {
909 admin.disableTable(writeTableName);
910 admin.deleteTable(writeTableName);
911 createWriteTable(numberOfServers);
912 }
913 HashSet<ServerName> serverSet = new HashSet<ServerName>();
914 for (HRegionLocation location: locations) {
915 serverSet.add(location.getServerName());
916 }
917 int numberOfCoveredServers = serverSet.size();
918 if (numberOfCoveredServers < numberOfServers) {
919 admin.balancer();
920 }
921 }
922
923 private void createWriteTable(int numberOfServers) throws IOException {
924 int numberOfRegions = (int)(numberOfServers * regionsLowerLimit);
925 LOG.info("Number of live regionservers: " + numberOfServers + ", "
926 + "pre-splitting the canary table into " + numberOfRegions + " regions "
927 + "(current lower limi of regions per server is " + regionsLowerLimit
928 + " and you can change it by config: "
929 + HConstants.HBASE_CANARY_WRITE_PERSERVER_REGIONS_LOWERLIMIT_KEY + " )");
930 HTableDescriptor desc = new HTableDescriptor(writeTableName);
931 HColumnDescriptor family = new HColumnDescriptor(CANARY_TABLE_FAMILY_NAME);
932 family.setMaxVersions(1);
933 family.setTimeToLive(writeDataTTL);
934
935 desc.addFamily(family);
936 byte[][] splits = new RegionSplitter.HexStringSplit().split(numberOfRegions);
937 admin.createTable(desc, splits);
938 }
939 }
940
941
942
943
944
945 public static void sniff(final Admin admin, TableName tableName)
946 throws Exception {
947 sniff(admin, tableName, TaskType.READ);
948 }
949
950
951
952
953
954 public static void sniff(final Admin admin, TableName tableName, TaskType taskType)
955 throws Exception {
956 List<Future<Void>> taskFutures =
957 Canary.sniff(admin, new StdOutSink(), tableName.getNameAsString(),
958 new ScheduledThreadPoolExecutor(1), taskType);
959 for (Future<Void> future : taskFutures) {
960 future.get();
961 }
962 }
963
964
965
966
967
968 private static List<Future<Void>> sniff(final Admin admin, final Sink sink, String tableName,
969 ExecutorService executor, TaskType taskType) throws Exception {
970 if (admin.isTableEnabled(TableName.valueOf(tableName))) {
971 return Canary.sniff(admin, sink, admin.getTableDescriptor(TableName.valueOf(tableName)),
972 executor, taskType);
973 } else {
974 LOG.warn(String.format("Table %s is not enabled", tableName));
975 }
976 return new LinkedList<Future<Void>>();
977 }
978
979
980
981
982 private static List<Future<Void>> sniff(final Admin admin, final Sink sink,
983 HTableDescriptor tableDesc, ExecutorService executor, TaskType taskType) throws Exception {
984 Table table = null;
985 try {
986 table = admin.getConnection().getTable(tableDesc.getTableName());
987 } catch (TableNotFoundException e) {
988 return new ArrayList<Future<Void>>();
989 }
990 List<RegionTask> tasks = new ArrayList<RegionTask>();
991 try {
992 for (HRegionInfo region : admin.getTableRegions(tableDesc.getTableName())) {
993 tasks.add(new RegionTask(admin.getConnection(), region, sink, taskType));
994 }
995 } finally {
996 table.close();
997 }
998 return executor.invokeAll(tasks);
999 }
1000
1001
1002
1003
1004
1005 private static void sniffRegion(
1006 final Admin admin,
1007 final Sink sink,
1008 HRegionInfo region,
1009 Table table) throws Exception {
1010 HTableDescriptor tableDesc = table.getTableDescriptor();
1011 byte[] startKey = null;
1012 Get get = null;
1013 Scan scan = null;
1014 ResultScanner rs = null;
1015 StopWatch stopWatch = new StopWatch();
1016 for (HColumnDescriptor column : tableDesc.getColumnFamilies()) {
1017 stopWatch.reset();
1018 startKey = region.getStartKey();
1019
1020 if (startKey.length > 0) {
1021 get = new Get(startKey);
1022 get.setCacheBlocks(false);
1023 get.setFilter(new FirstKeyOnlyFilter());
1024 get.addFamily(column.getName());
1025 } else {
1026 scan = new Scan();
1027 scan.setRaw(true);
1028 scan.setCaching(1);
1029 scan.setCacheBlocks(false);
1030 scan.setFilter(new FirstKeyOnlyFilter());
1031 scan.addFamily(column.getName());
1032 scan.setMaxResultSize(1L);
1033 }
1034
1035 try {
1036 if (startKey.length > 0) {
1037 stopWatch.start();
1038 table.get(get);
1039 stopWatch.stop();
1040 sink.publishReadTiming(region, column, stopWatch.getTime());
1041 } else {
1042 stopWatch.start();
1043 rs = table.getScanner(scan);
1044 stopWatch.stop();
1045 sink.publishReadTiming(region, column, stopWatch.getTime());
1046 }
1047 } catch (Exception e) {
1048 sink.publishReadFailure(region, column, e);
1049 } finally {
1050 if (rs != null) {
1051 rs.close();
1052 }
1053 scan = null;
1054 get = null;
1055 startKey = null;
1056 }
1057 }
1058 }
1059
1060 private static class RegionServerMonitor extends Monitor {
1061
1062 private boolean allRegions;
1063
1064 public RegionServerMonitor(Connection connection, String[] monitorTargets, boolean useRegExp,
1065 ExtendedSink sink, ExecutorService executor, boolean allRegions,
1066 boolean treatFailureAsError) {
1067 super(connection, monitorTargets, useRegExp, sink, executor, treatFailureAsError);
1068 this.allRegions = allRegions;
1069 }
1070
1071 private ExtendedSink getSink() {
1072 return (ExtendedSink) this.sink;
1073 }
1074
1075 @Override
1076 public void run() {
1077 if (this.initAdmin() && this.checkNoTableNames()) {
1078 Map<String, List<HRegionInfo>> rsAndRMap = this.filterRegionServerByName();
1079 this.initialized = true;
1080 this.monitorRegionServers(rsAndRMap);
1081 }
1082 this.done = true;
1083 }
1084
1085 private boolean checkNoTableNames() {
1086 List<String> foundTableNames = new ArrayList<String>();
1087 TableName[] tableNames = null;
1088
1089 try {
1090 tableNames = this.admin.listTableNames();
1091 } catch (IOException e) {
1092 LOG.error("Get listTableNames failed", e);
1093 this.errorCode = INIT_ERROR_EXIT_CODE;
1094 return false;
1095 }
1096
1097 if (this.targets == null || this.targets.length == 0) return true;
1098
1099 for (String target : this.targets) {
1100 for (TableName tableName : tableNames) {
1101 if (target.equals(tableName.getNameAsString())) {
1102 foundTableNames.add(target);
1103 }
1104 }
1105 }
1106
1107 if (foundTableNames.size() > 0) {
1108 System.err.println("Cannot pass a tablename when using the -regionserver " +
1109 "option, tablenames:" + foundTableNames.toString());
1110 this.errorCode = USAGE_EXIT_CODE;
1111 }
1112 return foundTableNames.size() == 0;
1113 }
1114
1115 private void monitorRegionServers(Map<String, List<HRegionInfo>> rsAndRMap) {
1116 List<RegionServerTask> tasks = new ArrayList<RegionServerTask>();
1117 Map<String, AtomicLong> successMap = new HashMap<String, AtomicLong>();
1118 Random rand = new Random();
1119 for (Map.Entry<String, List<HRegionInfo>> entry : rsAndRMap.entrySet()) {
1120 String serverName = entry.getKey();
1121 AtomicLong successes = new AtomicLong(0);
1122 successMap.put(serverName, successes);
1123 if (this.allRegions) {
1124 for (HRegionInfo region : entry.getValue()) {
1125 tasks.add(new RegionServerTask(this.connection,
1126 serverName,
1127 region,
1128 getSink(),
1129 successes));
1130 }
1131 } else {
1132
1133 HRegionInfo region = entry.getValue().get(rand.nextInt(entry.getValue().size()));
1134 tasks.add(new RegionServerTask(this.connection,
1135 serverName,
1136 region,
1137 getSink(),
1138 successes));
1139 }
1140 }
1141 try {
1142 for (Future<Void> future : this.executor.invokeAll(tasks)) {
1143 try {
1144 future.get();
1145 } catch (ExecutionException e) {
1146 LOG.error("Sniff regionserver failed!", e);
1147 this.errorCode = ERROR_EXIT_CODE;
1148 }
1149 }
1150 if (this.allRegions) {
1151 for (Map.Entry<String, List<HRegionInfo>> entry : rsAndRMap.entrySet()) {
1152 String serverName = entry.getKey();
1153 LOG.info("Successfully read " + successMap.get(serverName) + " regions out of "
1154 + entry.getValue().size() + " on regionserver:" + serverName);
1155 }
1156 }
1157 } catch (InterruptedException e) {
1158 this.errorCode = ERROR_EXIT_CODE;
1159 LOG.error("Sniff regionserver interrupted!", e);
1160 }
1161 }
1162
1163 private Map<String, List<HRegionInfo>> filterRegionServerByName() {
1164 Map<String, List<HRegionInfo>> regionServerAndRegionsMap = this.getAllRegionServerByName();
1165 regionServerAndRegionsMap = this.doFilterRegionServerByName(regionServerAndRegionsMap);
1166 return regionServerAndRegionsMap;
1167 }
1168
1169 private Map<String, List<HRegionInfo>> getAllRegionServerByName() {
1170 Map<String, List<HRegionInfo>> rsAndRMap = new HashMap<String, List<HRegionInfo>>();
1171 Table table = null;
1172 RegionLocator regionLocator = null;
1173 try {
1174 HTableDescriptor[] tableDescs = this.admin.listTables();
1175 List<HRegionInfo> regions = null;
1176 for (HTableDescriptor tableDesc : tableDescs) {
1177 table = this.admin.getConnection().getTable(tableDesc.getTableName());
1178 regionLocator = this.admin.getConnection().getRegionLocator(tableDesc.getTableName());
1179
1180 for (HRegionLocation location : regionLocator.getAllRegionLocations()) {
1181 ServerName rs = location.getServerName();
1182 String rsName = rs.getHostname();
1183 HRegionInfo r = location.getRegionInfo();
1184
1185 if (rsAndRMap.containsKey(rsName)) {
1186 regions = rsAndRMap.get(rsName);
1187 } else {
1188 regions = new ArrayList<HRegionInfo>();
1189 rsAndRMap.put(rsName, regions);
1190 }
1191 regions.add(r);
1192 }
1193 table.close();
1194 }
1195
1196 } catch (IOException e) {
1197 String msg = "Get HTables info failed";
1198 LOG.error(msg, e);
1199 this.errorCode = INIT_ERROR_EXIT_CODE;
1200 } finally {
1201 if (table != null) {
1202 try {
1203 table.close();
1204 } catch (IOException e) {
1205 LOG.warn("Close table failed", e);
1206 }
1207 }
1208 }
1209
1210 return rsAndRMap;
1211 }
1212
1213 private Map<String, List<HRegionInfo>> doFilterRegionServerByName(
1214 Map<String, List<HRegionInfo>> fullRsAndRMap) {
1215
1216 Map<String, List<HRegionInfo>> filteredRsAndRMap = null;
1217
1218 if (this.targets != null && this.targets.length > 0) {
1219 filteredRsAndRMap = new HashMap<String, List<HRegionInfo>>();
1220 Pattern pattern = null;
1221 Matcher matcher = null;
1222 boolean regExpFound = false;
1223 for (String rsName : this.targets) {
1224 if (this.useRegExp) {
1225 regExpFound = false;
1226 pattern = Pattern.compile(rsName);
1227 for (Map.Entry<String, List<HRegionInfo>> entry : fullRsAndRMap.entrySet()) {
1228 matcher = pattern.matcher(entry.getKey());
1229 if (matcher.matches()) {
1230 filteredRsAndRMap.put(entry.getKey(), entry.getValue());
1231 regExpFound = true;
1232 }
1233 }
1234 if (!regExpFound) {
1235 LOG.info("No RegionServerInfo found, regionServerPattern:" + rsName);
1236 }
1237 } else {
1238 if (fullRsAndRMap.containsKey(rsName)) {
1239 filteredRsAndRMap.put(rsName, fullRsAndRMap.get(rsName));
1240 } else {
1241 LOG.info("No RegionServerInfo found, regionServerName:" + rsName);
1242 }
1243 }
1244 }
1245 } else {
1246 filteredRsAndRMap = fullRsAndRMap;
1247 }
1248 return filteredRsAndRMap;
1249 }
1250 }
1251
1252 public static void main(String[] args) throws Exception {
1253 final Configuration conf = HBaseConfiguration.create();
1254 final ChoreService choreService = new ChoreService("CANARY_TOOL");
1255 final ScheduledChore authChore = AuthUtil.getAuthChore(conf);
1256 if (authChore != null) {
1257 choreService.scheduleChore(authChore);
1258 }
1259
1260
1261 new GenericOptionsParser(conf, args);
1262
1263 int numThreads = conf.getInt("hbase.canary.threads.num", MAX_THREADS_NUM);
1264 LOG.info("Number of exection threads " + numThreads);
1265
1266 ExecutorService executor = new ScheduledThreadPoolExecutor(numThreads);
1267
1268 Class<? extends Sink> sinkClass =
1269 conf.getClass("hbase.canary.sink.class", RegionServerStdOutSink.class, Sink.class);
1270 Sink sink = ReflectionUtils.newInstance(sinkClass);
1271
1272 int exitCode = ToolRunner.run(conf, new Canary(executor, sink), args);
1273 choreService.shutdown();
1274 executor.shutdown();
1275 System.exit(exitCode);
1276 }
1277 }