
1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  
20  package org.apache.hadoop.hbase.tool;
21  
22  import java.io.Closeable;
23  import java.io.IOException;
24  import java.util.ArrayList;
25  import java.util.Arrays;
26  import java.util.HashMap;
27  import java.util.HashSet;
28  import java.util.LinkedList;
29  import java.util.List;
30  import java.util.Map;
31  import java.util.Random;
32  import java.util.Set;
33  import java.util.TreeSet;
34  import java.util.concurrent.atomic.AtomicLong;
35  import java.util.concurrent.Callable;
36  import java.util.concurrent.ExecutionException;
37  import java.util.concurrent.ExecutorService;
38  import java.util.concurrent.Future;
39  import java.util.concurrent.ScheduledThreadPoolExecutor;
40  import java.util.regex.Matcher;
41  import java.util.regex.Pattern;
42  
43  import org.apache.commons.lang.time.StopWatch;
44  import org.apache.commons.logging.Log;
45  import org.apache.commons.logging.LogFactory;
46  import org.apache.hadoop.conf.Configuration;
47  import org.apache.hadoop.hbase.AuthUtil;
48  import org.apache.hadoop.hbase.ChoreService;
49  import org.apache.hadoop.hbase.DoNotRetryIOException;
50  import org.apache.hadoop.hbase.HBaseConfiguration;
51  import org.apache.hadoop.hbase.HColumnDescriptor;
52  import org.apache.hadoop.hbase.HConstants;
53  import org.apache.hadoop.hbase.HRegionInfo;
54  import org.apache.hadoop.hbase.HRegionLocation;
55  import org.apache.hadoop.hbase.HTableDescriptor;
56  import org.apache.hadoop.hbase.NamespaceDescriptor;
57  import org.apache.hadoop.hbase.ScheduledChore;
58  import org.apache.hadoop.hbase.ServerName;
59  import org.apache.hadoop.hbase.TableName;
60  import org.apache.hadoop.hbase.TableNotEnabledException;
61  import org.apache.hadoop.hbase.TableNotFoundException;
62  import org.apache.hadoop.hbase.client.Admin;
63  import org.apache.hadoop.hbase.client.Connection;
64  import org.apache.hadoop.hbase.client.ConnectionFactory;
65  import org.apache.hadoop.hbase.client.Get;
66  import org.apache.hadoop.hbase.client.Put;
67  import org.apache.hadoop.hbase.client.RegionLocator;
68  import org.apache.hadoop.hbase.client.ResultScanner;
69  import org.apache.hadoop.hbase.client.Scan;
70  import org.apache.hadoop.hbase.client.Table;
71  import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
72  import org.apache.hadoop.hbase.tool.Canary.RegionTask.TaskType;
73  import org.apache.hadoop.hbase.util.Bytes;
74  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
75  import org.apache.hadoop.hbase.util.ReflectionUtils;
76  import org.apache.hadoop.hbase.util.RegionSplitter;
77  import org.apache.hadoop.util.GenericOptionsParser;
78  import org.apache.hadoop.util.Tool;
79  import org.apache.hadoop.util.ToolRunner;
80  
81  /**
82   * HBase Canary Tool that can be used to do
83   * "canary monitoring" of a running HBase cluster.
84   *
85   * There are two modes:
86   * 1. region mode - for each region, tries to get one row per column family
87   * and outputs some information about failure or latency.
88   *
89   * 2. regionserver mode - for each regionserver, tries to get one row from one table
90   * selected randomly and outputs some information about failure or latency.
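     *
     * A minimal programmatic example (a sketch only; it assumes an already-open {@link Admin}
     * instance and an existing table, with "usertable" used here as a placeholder name):
     * <pre>
     *   // Run one read sniff over every region of the table, publishing timings to stdout.
     *   // sniff(...) is declared to throw Exception, so handle or declare it in the caller.
     *   Canary.sniff(admin, TableName.valueOf("usertable"));
     * </pre>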
91   */
92  public final class Canary implements Tool {
93    // Sink interface used by the canary to output information
94    public interface Sink {
95      public long getReadFailureCount();
96      public long incReadFailureCount();
97      public void publishReadFailure(HRegionInfo region, Exception e);
98      public void publishReadFailure(HRegionInfo region, HColumnDescriptor column, Exception e);
99      public void publishReadTiming(HRegionInfo region, HColumnDescriptor column, long msTime);
100     public long getWriteFailureCount();
101     public void publishWriteFailure(HRegionInfo region, Exception e);
102     public void publishWriteFailure(HRegionInfo region, HColumnDescriptor column, Exception e);
103     public void publishWriteTiming(HRegionInfo region, HColumnDescriptor column, long msTime);
104   }
105   // an extended sink for outputting regionserver mode info;
106   // the Sink interface is not changed directly so that the existing API is maintained
107   public interface ExtendedSink extends Sink {
108     public void publishReadFailure(String table, String server);
109     public void publishReadTiming(String table, String server, long msTime);
110   }
111 
112   // Simple canary sink implementation that writes timings or
113   // failures to a file or standard output.
114   public static class StdOutSink implements Sink {
115     private AtomicLong readFailureCount = new AtomicLong(0),
116         writeFailureCount = new AtomicLong(0);
117 
118     @Override
119     public long getReadFailureCount() {
120       return readFailureCount.get();
121     }
122 
123     @Override
124     public long incReadFailureCount() {
125       return readFailureCount.incrementAndGet();
126     }
127 
128     @Override
129     public void publishReadFailure(HRegionInfo region, Exception e) {
130       readFailureCount.incrementAndGet();
131       LOG.error(String.format("read from region %s failed", region.getRegionNameAsString()), e);
132     }
133 
134     @Override
135     public void publishReadFailure(HRegionInfo region, HColumnDescriptor column, Exception e) {
136       readFailureCount.incrementAndGet();
137       LOG.error(String.format("read from region %s column family %s failed",
138                 region.getRegionNameAsString(), column.getNameAsString()), e);
139     }
140 
141     @Override
142     public void publishReadTiming(HRegionInfo region, HColumnDescriptor column, long msTime) {
143       LOG.info(String.format("read from region %s column family %s in %dms",
144                region.getRegionNameAsString(), column.getNameAsString(), msTime));
145     }
146 
147     @Override
148     public long getWriteFailureCount() {
149       return writeFailureCount.get();
150     }
151 
152     @Override
153     public void publishWriteFailure(HRegionInfo region, Exception e) {
154       writeFailureCount.incrementAndGet();
155       LOG.error(String.format("write to region %s failed", region.getRegionNameAsString()), e);
156     }
157 
158     @Override
159     public void publishWriteFailure(HRegionInfo region, HColumnDescriptor column, Exception e) {
160       writeFailureCount.incrementAndGet();
161       LOG.error(String.format("write to region %s column family %s failed",
162         region.getRegionNameAsString(), column.getNameAsString()), e);
163     }
164 
165     @Override
166     public void publishWriteTiming(HRegionInfo region, HColumnDescriptor column, long msTime) {
167       LOG.info(String.format("write to region %s column family %s in %dms",
168         region.getRegionNameAsString(), column.getNameAsString(), msTime));
169     }
170   }
171   // an ExtendedSink implementation
172   public static class RegionServerStdOutSink extends StdOutSink implements ExtendedSink {
173 
174     @Override
175     public void publishReadFailure(String table, String server) {
176       incReadFailureCount();
177       LOG.error(String.format("Read from table:%s on region server:%s failed", table, server));
178     }
179 
180     @Override
181     public void publishReadTiming(String table, String server, long msTime) {
182       LOG.info(String.format("Read from table:%s on region server:%s in %dms",
183           table, server, msTime));
184     }
185   }
186 
187   /**
188    * For each column family of the region, tries to get one row and outputs the latency or the
189    * failure.
190    */
191   static class RegionTask implements Callable<Void> {
192     public enum TaskType{
193       READ, WRITE
194     }
195     private Connection connection;
196     private HRegionInfo region;
197     private Sink sink;
198     private TaskType taskType;
199 
200     RegionTask(Connection connection, HRegionInfo region, Sink sink, TaskType taskType) {
201       this.connection = connection;
202       this.region = region;
203       this.sink = sink;
204       this.taskType = taskType;
205     }
206 
207     @Override
208     public Void call() {
209       switch (taskType) {
210       case READ:
211         return read();
212       case WRITE:
213         return write();
214       default:
215         return read();
216       }
217     }
218 
219     public Void read() {
220       Table table = null;
221       HTableDescriptor tableDesc = null;
222       try {
223         table = connection.getTable(region.getTable());
224         tableDesc = table.getTableDescriptor();
225       } catch (IOException e) {
226         LOG.debug("sniffRegion failed", e);
227         sink.publishReadFailure(region, e);
228         if (table != null) {
229           try {
230             table.close();
231           } catch (IOException ioe) {
232           LOG.error("Close table failed", ioe);
233           }
234         }
235         return null;
236       }
237 
238       byte[] startKey = null;
239       Get get = null;
240       Scan scan = null;
241       ResultScanner rs = null;
242       StopWatch stopWatch = new StopWatch();
243       for (HColumnDescriptor column : tableDesc.getColumnFamilies()) {
244         stopWatch.reset();
245         startKey = region.getStartKey();
246         // Can't do a get on empty start row so do a Scan of first element if any instead.
247         if (startKey.length > 0) {
248           get = new Get(startKey);
249           get.setCacheBlocks(false);
250           get.setFilter(new FirstKeyOnlyFilter());
251           get.addFamily(column.getName());
252         } else {
253           scan = new Scan();
254           scan.setCaching(1);
255           scan.setCacheBlocks(false);
256           scan.setFilter(new FirstKeyOnlyFilter());
257           scan.addFamily(column.getName());
258           scan.setMaxResultSize(1L);
259         }
260 
261         try {
262           if (startKey.length > 0) {
263             stopWatch.start();
264             table.get(get);
265             stopWatch.stop();
266             sink.publishReadTiming(region, column, stopWatch.getTime());
267           } else {
268             stopWatch.start();
269             rs = table.getScanner(scan);
270             stopWatch.stop();
271             sink.publishReadTiming(region, column, stopWatch.getTime());
272           }
273         } catch (Exception e) {
274           sink.publishReadFailure(region, column, e);
275         } finally {
276           if (rs != null) {
277             rs.close();
278           }
279           scan = null;
280           get = null;
281           startKey = null;
282         }
283       }
284       try {
285         table.close();
286       } catch (IOException e) {
287         LOG.error("Close table failed", e);
288       }
289       return null;
290     }
291 
292     /**
293      * Check writes for the canary table.
294      * @return always null; failures are reported through the sink
295      */
296     private Void write() {
297       Table table = null;
298       HTableDescriptor tableDesc = null;
299       try {
300         table = connection.getTable(region.getTable());
301         tableDesc = table.getTableDescriptor();
302         byte[] rowToCheck = region.getStartKey();
303         if (rowToCheck.length == 0) {
304           rowToCheck = new byte[]{0x0};
305         }
306         int writeValueSize =
307             connection.getConfiguration().getInt(HConstants.HBASE_CANARY_WRITE_VALUE_SIZE_KEY, 10);
308         for (HColumnDescriptor column : tableDesc.getColumnFamilies()) {
309           Put put = new Put(rowToCheck);
310           byte[] value = new byte[writeValueSize];
311           Bytes.random(value);
312           put.addColumn(column.getName(), HConstants.EMPTY_BYTE_ARRAY, value);
313           try {
314             long startTime = System.currentTimeMillis();
315             table.put(put);
316             long time = System.currentTimeMillis() - startTime;
317             sink.publishWriteTiming(region, column, time);
318           } catch (Exception e) {
319             sink.publishWriteFailure(region, column, e);
320           }
321         }
322         table.close();
323       } catch (IOException e) {
324         sink.publishWriteFailure(region, e);
325       }
326       return null;
327     }
328   }
329 
330   /**
331    * Gets one row from a region on the regionserver and outputs the latency or the failure.
332    */
333   static class RegionServerTask implements Callable<Void> {
334     private Connection connection;
335     private String serverName;
336     private HRegionInfo region;
337     private ExtendedSink sink;
338     private AtomicLong successes;
339 
340     RegionServerTask(Connection connection, String serverName, HRegionInfo region,
341         ExtendedSink sink, AtomicLong successes) {
342       this.connection = connection;
343       this.serverName = serverName;
344       this.region = region;
345       this.sink = sink;
346       this.successes = successes;
347     }
348 
349     @Override
350     public Void call() {
351       TableName tableName = null;
352       Table table = null;
353       Get get = null;
354       byte[] startKey = null;
355       Scan scan = null;
356       StopWatch stopWatch = new StopWatch();
357       // monitor one region on every region server
358       stopWatch.reset();
359       try {
360         tableName = region.getTable();
361         table = connection.getTable(tableName);
362         startKey = region.getStartKey();
363         // Can't do a get on empty start row so do a Scan of first element if any instead.
364         if (startKey.length > 0) {
365           get = new Get(startKey);
366           get.setCacheBlocks(false);
367           get.setFilter(new FirstKeyOnlyFilter());
368           stopWatch.start();
369           table.get(get);
370           stopWatch.stop();
371         } else {
372           scan = new Scan();
373           scan.setCacheBlocks(false);
374           scan.setFilter(new FirstKeyOnlyFilter());
375           scan.setCaching(1);
376           scan.setMaxResultSize(1L);
377           stopWatch.start();
378           ResultScanner s = table.getScanner(scan);
379           s.close();
380           stopWatch.stop();
381         }
382         successes.incrementAndGet();
383         sink.publishReadTiming(tableName.getNameAsString(), serverName, stopWatch.getTime());
384       } catch (TableNotFoundException tnfe) {
385         LOG.error("Table may be deleted", tnfe);
386         // This is ignored because it doesn't imply that the regionserver is dead
387       } catch (TableNotEnabledException tnee) {
388         // This is considered a success since we got a response.
389         successes.incrementAndGet();
390         LOG.debug("The targeted table was disabled.  Assuming success.");
391       } catch (DoNotRetryIOException dnrioe) {
392         sink.publishReadFailure(tableName.getNameAsString(), serverName);
393         LOG.error(dnrioe);
394       } catch (IOException e) {
395         sink.publishReadFailure(tableName.getNameAsString(), serverName);
396         LOG.error(e);
397       } finally {
398         if (table != null) {
399           try {
400             table.close();
401           } catch (IOException e) {
402             LOG.error("Close table failed", e);
403           }
404         }
405         scan = null;
406         get = null;
407         startKey = null;
408       }
409       return null;
410     }
411   }
412 
413   private static final int USAGE_EXIT_CODE = 1;
414   private static final int INIT_ERROR_EXIT_CODE = 2;
415   private static final int TIMEOUT_ERROR_EXIT_CODE = 3;
416   private static final int ERROR_EXIT_CODE = 4;
417   private static final int FAILURE_EXIT_CODE = 5;
418 
419   private static final long DEFAULT_INTERVAL = 6000;
420 
421   private static final long DEFAULT_TIMEOUT = 600000; // 10 mins
422   private static final int MAX_THREADS_NUM = 16; // #threads to contact regions
423 
424   private static final Log LOG = LogFactory.getLog(Canary.class);
425 
426   public static final TableName DEFAULT_WRITE_TABLE_NAME = TableName.valueOf(
427     NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR, "canary");
428 
429   private static final String CANARY_TABLE_FAMILY_NAME = "Test";
430 
431   private Configuration conf = null;
432   private long interval = 0;
433   private Sink sink = null;
434 
435   private boolean useRegExp;
436   private long timeout = DEFAULT_TIMEOUT;
437   private boolean failOnError = true;
438   private boolean regionServerMode = false;
439   private boolean regionServerAllRegions = false;
440   private boolean writeSniffing = false;
441   private boolean treatFailureAsError = false;
442   private TableName writeTableName = DEFAULT_WRITE_TABLE_NAME;
443 
444   private ExecutorService executor; // threads to retrieve data from regionservers
445 
446   public Canary() {
447     this(new ScheduledThreadPoolExecutor(1), new RegionServerStdOutSink());
448   }
449 
450   public Canary(ExecutorService executor, Sink sink) {
451     this.executor = executor;
452     this.sink = sink;
453   }
454 
455   @Override
456   public Configuration getConf() {
457     return conf;
458   }
459 
460   @Override
461   public void setConf(Configuration conf) {
462     this.conf = conf;
463   }
464 
465   private int parseArgs(String[] args) {
466     int index = -1;
467     // Process command line args
468     for (int i = 0; i < args.length; i++) {
469       String cmd = args[i];
470 
471       if (cmd.startsWith("-")) {
472         if (index >= 0) {
473           // command line args must be in the form: [opts] [table 1 [table 2 ...]]
474           System.err.println("Invalid command line options");
475           printUsageAndExit();
476         }
477 
478         if (cmd.equals("-help")) {
479           // user asked for help, print the help and quit.
480           printUsageAndExit();
481         } else if (cmd.equals("-daemon") && interval == 0) {
482           // user asked for daemon mode, set a default interval between checks
483           interval = DEFAULT_INTERVAL;
484         } else if (cmd.equals("-interval")) {
485           // user has specified an interval for canary breaths (-interval N)
486           i++;
487 
488           if (i == args.length) {
489             System.err.println("-interval needs a numeric value argument.");
490             printUsageAndExit();
491           }
492 
493           try {
494             interval = Long.parseLong(args[i]) * 1000;
495           } catch (NumberFormatException e) {
496             System.err.println("-interval needs a numeric value argument.");
497             printUsageAndExit();
498           }
499         } else if(cmd.equals("-regionserver")) {
500           this.regionServerMode = true;
501         } else if(cmd.equals("-allRegions")) {
502           this.regionServerAllRegions = true;
503         } else if(cmd.equals("-writeSniffing")) {
504           this.writeSniffing = true;
505         } else if(cmd.equals("-treatFailureAsError")) {
506           this.treatFailureAsError = true;
507         } else if (cmd.equals("-e")) {
508           this.useRegExp = true;
509         } else if (cmd.equals("-t")) {
510           i++;
511 
512           if (i == args.length) {
513             System.err.println("-t needs a numeric value argument.");
514             printUsageAndExit();
515           }
516 
517           try {
518             this.timeout = Long.parseLong(args[i]);
519           } catch (NumberFormatException e) {
520             System.err.println("-t needs a numeric value argument.");
521             printUsageAndExit();
522           }
523         } else if (cmd.equals("-writeTable")) {
524           i++;
525 
526           if (i == args.length) {
527             System.err.println("-writeTable needs a string value argument.");
528             printUsageAndExit();
529           }
530           this.writeTableName = TableName.valueOf(args[i]);
531         } else if (cmd.equals("-f")) {
532           i++;
533 
534           if (i == args.length) {
535             System.err
536                 .println("-f needs a boolean value argument (true|false).");
537             printUsageAndExit();
538           }
539 
540           this.failOnError = Boolean.parseBoolean(args[i]);
541         } else {
542           // no options match
543           System.err.println(cmd + " option is invalid.");
544           printUsageAndExit();
545         }
546       } else if (index < 0) {
547         // keep track of first table name specified by the user
548         index = i;
549       }
550     }
551     if (this.regionServerAllRegions && !this.regionServerMode) {
552       System.err.println("-allRegions can only be specified in regionserver mode.");
553       printUsageAndExit();
554     }
555     return index;
556   }
557 
558   @Override
559   public int run(String[] args) throws Exception {
560     int index = parseArgs(args);
561     ChoreService choreService = null;
562 
563     // Launches a chore for refreshing Kerberos credentials if security is enabled.
564     // Please see http://hbase.apache.org/book.html#_running_canary_in_a_kerberos_enabled_cluster
565     // for more details.
566     final ScheduledChore authChore = AuthUtil.getAuthChore(conf);
567     if (authChore != null) {
568       choreService = new ChoreService("CANARY_TOOL");
569       choreService.scheduleChore(authChore);
570     }
571 
572     // Start to prepare the monitor
573     Monitor monitor = null;
574     Thread monitorThread = null;
575     long startTime = 0;
576     long currentTimeLength = 0;
577     // Get a connection to use in below.
578     // try-with-resources jdk7 construct. See
579     // http://docs.oracle.com/javase/tutorial/essential/exceptions/tryResourceClose.html
580     try (Connection connection = ConnectionFactory.createConnection(this.conf)) {
581       do {
582         // Do monitor !!
583         try {
584           monitor = this.newMonitor(connection, index, args);
585           monitorThread = new Thread(monitor);
586           startTime = System.currentTimeMillis();
587           monitorThread.start();
588           while (!monitor.isDone()) {
589             // wait for 1 sec
590             Thread.sleep(1000);
591             // exit if any error occurs
592             if (this.failOnError && monitor.hasError()) {
593               monitorThread.interrupt();
594               if (monitor.initialized) {
595                 return monitor.errorCode;
596               } else {
597                 return INIT_ERROR_EXIT_CODE;
598               }
599             }
600             currentTimeLength = System.currentTimeMillis() - startTime;
601             if (currentTimeLength > this.timeout) {
602               LOG.error("The monitor has been running too long (" + currentTimeLength
603                   + "ms), exceeding the timeout limit of " + this.timeout
604                   + "ms; it will be killed.");
605               if (monitor.initialized) {
606                 return TIMEOUT_ERROR_EXIT_CODE;
607               } else {
608                 return INIT_ERROR_EXIT_CODE;
609               }
610             }
611           }
612 
613           if (this.failOnError && monitor.finalCheckForErrors()) {
614             monitorThread.interrupt();
615             return monitor.errorCode;
616           }
617         } finally {
618           if (monitor != null) monitor.close();
619         }
620 
621         Thread.sleep(interval);
622       } while (interval > 0);
623     } // try-with-resources close
624 
625     if (choreService != null) {
626       choreService.shutdown();
627     }
628     return monitor.errorCode;
629   }
630 
631   private void printUsageAndExit() {
632     System.err.printf(
633       "Usage: bin/hbase %s [opts] [table1 [table2]...] | [regionserver1 [regionserver2]..]%n",
634         getClass().getName());
635     System.err.println(" where [opts] are:");
636     System.err.println("   -help          Show this help and exit.");
637     System.err.println("   -regionserver  Treat the arguments as regionserver names instead of");
638     System.err.println("      table names, i.e. enable regionserver mode.");
639     System.err.println("   -allRegions    Tries all regions on a regionserver,");
640     System.err.println("      only works in regionserver mode.");
641     System.err.println("   -daemon        Continuous check at defined intervals.");
642     System.err.println("   -interval <N>  Interval between checks (sec)");
643     System.err.println("   -e             Treat the table/regionserver arguments as");
644     System.err.println("      regular expression patterns.");
645     System.err.println("   -f <B>         Stop the whole program if the first error occurs," +
646         " default is true.");
647     System.err.println("   -t <N>         Timeout for a check, default is 600000 (milliseconds)");
648     System.err.println("   -writeSniffing Enable write sniffing in the canary.");
649     System.err.println("   -treatFailureAsError Treat read/write failures as errors.");
650     System.err.println("   -writeTable    The table used for write sniffing."
651         + " Default is hbase:canary");
652     System.err
653         .println("   -D<configProperty>=<value> Assign or override configuration parameters.");
654     System.exit(USAGE_EXIT_CODE);
655   }
656 
657   /**
658    * A Factory method for {@link Monitor}.
659    * Can be overridden by user.
660    * @param index a start index for monitor target
661    * @param args args passed from user
662    * @return a Monitor instance
663    */
664   public Monitor newMonitor(final Connection connection, int index, String[] args) {
665     Monitor monitor = null;
666     String[] monitorTargets = null;
667 
668     if(index >= 0) {
669       int length = args.length - index;
670       monitorTargets = new String[length];
671       System.arraycopy(args, index, monitorTargets, 0, length);
672     }
673 
674     if (this.regionServerMode) {
675       monitor =
676           new RegionServerMonitor(connection, monitorTargets, this.useRegExp,
677               (ExtendedSink) this.sink, this.executor, this.regionServerAllRegions,
678               this.treatFailureAsError);
679     } else {
680       monitor =
681           new RegionMonitor(connection, monitorTargets, this.useRegExp, this.sink, this.executor,
682               this.writeSniffing, this.writeTableName, this.treatFailureAsError);
683     }
684     return monitor;
685   }
686 
687   // A Monitor superclass; users can extend it to implement custom checks.
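      /*
       * A minimal sketch of what a user-defined Monitor can look like (illustrative only; the
       * class name below is hypothetical). A subclass calls initAdmin(), marks itself
       * initialized, publishes results through the inherited sink, and must set done = true when
       * finished so that run(String[]) in Canary can return:
       *
       *   static class NoopMonitor extends Monitor {
       *     NoopMonitor(Connection connection, String[] targets, boolean useRegExp, Sink sink,
       *         ExecutorService executor, boolean treatFailureAsError) {
       *       super(connection, targets, useRegExp, sink, executor, treatFailureAsError);
       *     }
       *     @Override
       *     public void run() {
       *       if (this.initAdmin()) {   // sets errorCode to INIT_ERROR_EXIT_CODE on failure
       *         this.initialized = true;
       *         // ... perform custom checks here, reporting through this.sink ...
       *       }
       *       this.done = true;         // the main loop polls isDone()
       *     }
       *   }
       */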
688   public static abstract class Monitor implements Runnable, Closeable {
689 
690     protected Connection connection;
691     protected Admin admin;
692     protected String[] targets;
693     protected boolean useRegExp;
694     protected boolean treatFailureAsError;
695     protected boolean initialized = false;
696 
697     protected boolean done = false;
698     protected int errorCode = 0;
699     protected Sink sink;
700     protected ExecutorService executor;
701 
702     public boolean isDone() {
703       return done;
704     }
705 
706     public boolean hasError() {
707       return errorCode != 0;
708     }
709 
710     public boolean finalCheckForErrors() {
711       if (errorCode != 0) {
712         return true;
713       }
714       if (treatFailureAsError &&
715           (sink.getReadFailureCount() > 0 || sink.getWriteFailureCount() > 0)) {
716         errorCode = FAILURE_EXIT_CODE;
717         return true;
718       }
719       return false;
720     }
721 
722     @Override
723     public void close() throws IOException {
724       if (this.admin != null) this.admin.close();
725     }
726 
727     protected Monitor(Connection connection, String[] monitorTargets, boolean useRegExp, Sink sink,
728         ExecutorService executor, boolean treatFailureAsError) {
729       if (null == connection) throw new IllegalArgumentException("connection shall not be null");
730 
731       this.connection = connection;
732       this.targets = monitorTargets;
733       this.useRegExp = useRegExp;
734       this.treatFailureAsError = treatFailureAsError;
735       this.sink = sink;
736       this.executor = executor;
737     }
738 
739     public abstract void run();
740 
741     protected boolean initAdmin() {
742       if (null == this.admin) {
743         try {
744           this.admin = this.connection.getAdmin();
745         } catch (Exception e) {
746           LOG.error("Initializing HBaseAdmin failed...", e);
747           this.errorCode = INIT_ERROR_EXIT_CODE;
748         }
749       } else if (admin.isAborted()) {
750         LOG.error("HBaseAdmin aborted");
751         this.errorCode = INIT_ERROR_EXIT_CODE;
752       }
753       return !this.hasError();
754     }
755   }
756 
757   // a monitor for region mode
758   private static class RegionMonitor extends Monitor {
759     // 10 minutes
760     private static final int DEFAULT_WRITE_TABLE_CHECK_PERIOD = 10 * 60 * 1000;
761     // 1 day
762     private static final int DEFAULT_WRITE_DATA_TTL = 24 * 60 * 60;
763 
764     private long lastCheckTime = -1;
765     private boolean writeSniffing;
766     private TableName writeTableName;
767     private int writeDataTTL;
768     private float regionsLowerLimit;
769     private float regionsUpperLimit;
770     private int checkPeriod;
771 
772     public RegionMonitor(Connection connection, String[] monitorTargets, boolean useRegExp,
773         Sink sink, ExecutorService executor, boolean writeSniffing, TableName writeTableName,
774         boolean treatFailureAsError) {
775       super(connection, monitorTargets, useRegExp, sink, executor, treatFailureAsError);
776       Configuration conf = connection.getConfiguration();
777       this.writeSniffing = writeSniffing;
778       this.writeTableName = writeTableName;
779       this.writeDataTTL =
780           conf.getInt(HConstants.HBASE_CANARY_WRITE_DATA_TTL_KEY, DEFAULT_WRITE_DATA_TTL);
781       this.regionsLowerLimit =
782           conf.getFloat(HConstants.HBASE_CANARY_WRITE_PERSERVER_REGIONS_LOWERLIMIT_KEY, 1.0f);
783       this.regionsUpperLimit =
784           conf.getFloat(HConstants.HBASE_CANARY_WRITE_PERSERVER_REGIONS_UPPERLIMIT_KEY, 1.5f);
785       this.checkPeriod =
786           conf.getInt(HConstants.HBASE_CANARY_WRITE_TABLE_CHECK_PERIOD_KEY,
787             DEFAULT_WRITE_TABLE_CHECK_PERIOD);
788     }
789 
790     @Override
791     public void run() {
792       if (this.initAdmin()) {
793         try {
794           List<Future<Void>> taskFutures = new LinkedList<Future<Void>>();
795           if (this.targets != null && this.targets.length > 0) {
796             String[] tables = generateMonitorTables(this.targets);
797             this.initialized = true;
798             for (String table : tables) {
799               taskFutures.addAll(Canary.sniff(admin, sink, table, executor, TaskType.READ));
800             }
801           } else {
802             taskFutures.addAll(sniff(TaskType.READ));
803           }
804 
805           if (writeSniffing) {
806             if (EnvironmentEdgeManager.currentTime() - lastCheckTime > checkPeriod) {
807               try {
808                 checkWriteTableDistribution();
809               } catch (IOException e) {
810                 LOG.error("Check canary table distribution failed!", e);
811               }
812               lastCheckTime = EnvironmentEdgeManager.currentTime();
813             }
814             // sniff canary table with write operation
815             taskFutures.addAll(Canary.sniff(admin, sink,
816               admin.getTableDescriptor(writeTableName), executor, TaskType.WRITE));
817           }
818 
819           for (Future<Void> future : taskFutures) {
820             try {
821               future.get();
822             } catch (ExecutionException e) {
823               LOG.error("Sniff region failed!", e);
824             }
825           }
826         } catch (Exception e) {
827           LOG.error("Run regionMonitor failed", e);
828           this.errorCode = ERROR_EXIT_CODE;
829         }
830       }
831       this.done = true;
832     }
833 
834     private String[] generateMonitorTables(String[] monitorTargets) throws IOException {
835       String[] returnTables = null;
836 
837       if (this.useRegExp) {
838         Pattern pattern = null;
839         HTableDescriptor[] tds = null;
840         Set<String> tmpTables = new TreeSet<String>();
841         try {
842           for (String monitorTarget : monitorTargets) {
843             pattern = Pattern.compile(monitorTarget);
844             tds = this.admin.listTables(pattern);
845             if (tds != null) {
846               for (HTableDescriptor td : tds) {
847                 tmpTables.add(td.getNameAsString());
848               }
849             }
850           }
851         } catch (IOException e) {
852           LOG.error("Communicate with admin failed", e);
853           throw e;
854         }
855 
856         if (tmpTables.size() > 0) {
857           returnTables = tmpTables.toArray(new String[tmpTables.size()]);
858         } else {
859           String msg = "No HTable found, tablePattern:" + Arrays.toString(monitorTargets);
860           LOG.error(msg);
861           this.errorCode = INIT_ERROR_EXIT_CODE;
862           throw new TableNotFoundException(msg);
863         }
864       } else {
865         returnTables = monitorTargets;
866       }
867 
868       return returnTables;
869     }
870 
871     /*
872      * canary entry point to monitor all the tables.
873      */
874     private List<Future<Void>> sniff(TaskType taskType) throws Exception {
875       List<Future<Void>> taskFutures = new LinkedList<Future<Void>>();
876       for (HTableDescriptor table : admin.listTables()) {
877         if (admin.isTableEnabled(table.getTableName())
878             && (!table.getTableName().equals(writeTableName))) {
879           taskFutures.addAll(Canary.sniff(admin, sink, table, executor, taskType));
880         }
881       }
882       return taskFutures;
883     }
884 
885     private void checkWriteTableDistribution() throws IOException {
886       if (!admin.tableExists(writeTableName)) {
887         int numberOfServers = admin.getClusterStatus().getServers().size();
888         if (numberOfServers == 0) {
889           throw new IllegalStateException("No live regionservers");
890         }
891         createWriteTable(numberOfServers);
892       }
893 
894       if (!admin.isTableEnabled(writeTableName)) {
895         admin.enableTable(writeTableName);
896       }
897 
898       int numberOfServers = admin.getClusterStatus().getServers().size();
899       List<HRegionLocation> locations;
900       RegionLocator locator = connection.getRegionLocator(writeTableName);
901       try {
902         locations = locator.getAllRegionLocations();
903       } finally {
904         locator.close();
905       }
906       int numberOfRegions = locations.size();
907       if (numberOfRegions < numberOfServers * regionsLowerLimit
908           || numberOfRegions > numberOfServers * regionsUpperLimit) {
909         admin.disableTable(writeTableName);
910         admin.deleteTable(writeTableName);
911         createWriteTable(numberOfServers);
912       }
913       HashSet<ServerName> serverSet = new HashSet<ServerName>();
914       for (HRegionLocation location: locations) {
915         serverSet.add(location.getServerName());
916       }
917       int numberOfCoveredServers = serverSet.size();
918       if (numberOfCoveredServers < numberOfServers) {
919         admin.balancer();
920       }
921     }
922 
923     private void createWriteTable(int numberOfServers) throws IOException {
924       int numberOfRegions = (int)(numberOfServers * regionsLowerLimit);
925       LOG.info("Number of live regionservers: " + numberOfServers + ", "
926           + "pre-splitting the canary table into " + numberOfRegions + " regions "
927           + "(current  lower limi of regions per server is " + regionsLowerLimit
928           + " and you can change it by config: "
929           + HConstants.HBASE_CANARY_WRITE_PERSERVER_REGIONS_LOWERLIMIT_KEY + " )");
930       HTableDescriptor desc = new HTableDescriptor(writeTableName);
931       HColumnDescriptor family = new HColumnDescriptor(CANARY_TABLE_FAMILY_NAME);
932       family.setMaxVersions(1);
933       family.setTimeToLive(writeDataTTL);
934 
935       desc.addFamily(family);
936       byte[][] splits = new RegionSplitter.HexStringSplit().split(numberOfRegions);
937       admin.createTable(desc, splits);
938     }
939   }
940 
941   /**
942    * Canary entry point for specified table.
943    * @throws Exception
944    */
945   public static void sniff(final Admin admin, TableName tableName)
946       throws Exception {
947     sniff(admin, tableName, TaskType.READ);
948   }
949 
950   /**
951    * Canary entry point for specified table with task type(read/write)
952    * @throws Exception
953    */
954   public static void sniff(final Admin admin, TableName tableName, TaskType taskType)
955       throws Exception {
956     List<Future<Void>> taskFutures =
957         Canary.sniff(admin, new StdOutSink(), tableName.getNameAsString(),
958           new ScheduledThreadPoolExecutor(1), taskType);
959     for (Future<Void> future : taskFutures) {
960       future.get();
961     }
962   }
963 
964   /**
965    * Canary entry point for specified table.
966    * @throws Exception
967    */
968   private static List<Future<Void>> sniff(final Admin admin, final Sink sink, String tableName,
969       ExecutorService executor, TaskType taskType) throws Exception {
970     if (admin.isTableEnabled(TableName.valueOf(tableName))) {
971       return Canary.sniff(admin, sink, admin.getTableDescriptor(TableName.valueOf(tableName)),
972         executor, taskType);
973     } else {
974       LOG.warn(String.format("Table %s is not enabled", tableName));
975     }
976     return new LinkedList<Future<Void>>();
977   }
978 
979   /*
980    * Loops over the regions belonging to this table, and outputs some information about their state.
981    */
982   private static List<Future<Void>> sniff(final Admin admin, final Sink sink,
983       HTableDescriptor tableDesc, ExecutorService executor, TaskType taskType) throws Exception {
984     Table table = null;
985     try {
986       table = admin.getConnection().getTable(tableDesc.getTableName());
987     } catch (TableNotFoundException e) {
988       return new ArrayList<Future<Void>>();
989     }
990     List<RegionTask> tasks = new ArrayList<RegionTask>();
991     try {
992       for (HRegionInfo region : admin.getTableRegions(tableDesc.getTableName())) {
993         tasks.add(new RegionTask(admin.getConnection(), region, sink, taskType));
994       }
995     } finally {
996       table.close();
997     }
998     return executor.invokeAll(tasks);
999   }
1000 
1001   /*
1002   * For each column family of the region, tries to get one row and outputs the latency or the
1003   * failure.
1004    */
1005   private static void sniffRegion(
1006       final Admin admin,
1007       final Sink sink,
1008       HRegionInfo region,
1009       Table table) throws Exception {
1010     HTableDescriptor tableDesc = table.getTableDescriptor();
1011     byte[] startKey = null;
1012     Get get = null;
1013     Scan scan = null;
1014     ResultScanner rs = null;
1015     StopWatch stopWatch = new StopWatch();
1016     for (HColumnDescriptor column : tableDesc.getColumnFamilies()) {
1017       stopWatch.reset();
1018       startKey = region.getStartKey();
1019       // Can't do a get on empty start row so do a Scan of first element if any instead.
1020       if (startKey.length > 0) {
1021         get = new Get(startKey);
1022         get.setCacheBlocks(false);
1023         get.setFilter(new FirstKeyOnlyFilter());
1024         get.addFamily(column.getName());
1025       } else {
1026         scan = new Scan();
1027         scan.setRaw(true);
1028         scan.setCaching(1);
1029         scan.setCacheBlocks(false);
1030         scan.setFilter(new FirstKeyOnlyFilter());
1031         scan.addFamily(column.getName());
1032         scan.setMaxResultSize(1L);
1033       }
1034 
1035       try {
1036         if (startKey.length > 0) {
1037           stopWatch.start();
1038           table.get(get);
1039           stopWatch.stop();
1040           sink.publishReadTiming(region, column, stopWatch.getTime());
1041         } else {
1042           stopWatch.start();
1043           rs = table.getScanner(scan);
1044           stopWatch.stop();
1045           sink.publishReadTiming(region, column, stopWatch.getTime());
1046         }
1047       } catch (Exception e) {
1048         sink.publishReadFailure(region, column, e);
1049       } finally {
1050         if (rs != null) {
1051           rs.close();
1052         }
1053         scan = null;
1054         get = null;
1055         startKey = null;
1056       }
1057     }
1058   }
1059   // a monitor for regionserver mode
1060   private static class RegionServerMonitor extends Monitor {
1061 
1062     private boolean allRegions;
1063 
1064     public RegionServerMonitor(Connection connection, String[] monitorTargets, boolean useRegExp,
1065         ExtendedSink sink, ExecutorService executor, boolean allRegions,
1066         boolean treatFailureAsError) {
1067       super(connection, monitorTargets, useRegExp, sink, executor, treatFailureAsError);
1068       this.allRegions = allRegions;
1069     }
1070 
1071     private ExtendedSink getSink() {
1072       return (ExtendedSink) this.sink;
1073     }
1074 
1075     @Override
1076     public void run() {
1077       if (this.initAdmin() && this.checkNoTableNames()) {
1078         Map<String, List<HRegionInfo>> rsAndRMap = this.filterRegionServerByName();
1079         this.initialized = true;
1080         this.monitorRegionServers(rsAndRMap);
1081       }
1082       this.done = true;
1083     }
1084 
1085     private boolean checkNoTableNames() {
1086       List<String> foundTableNames = new ArrayList<String>();
1087       TableName[] tableNames = null;
1088 
1089       try {
1090         tableNames = this.admin.listTableNames();
1091       } catch (IOException e) {
1092         LOG.error("Get listTableNames failed", e);
1093         this.errorCode = INIT_ERROR_EXIT_CODE;
1094         return false;
1095       }
1096 
1097       if (this.targets == null || this.targets.length == 0) return true;
1098 
1099       for (String target : this.targets) {
1100         for (TableName tableName : tableNames) {
1101           if (target.equals(tableName.getNameAsString())) {
1102             foundTableNames.add(target);
1103           }
1104         }
1105       }
1106 
1107       if (foundTableNames.size() > 0) {
1108         System.err.println("Cannot pass a tablename when using the -regionserver " +
1109             "option, tablenames:" + foundTableNames.toString());
1110         this.errorCode = USAGE_EXIT_CODE;
1111       }
1112       return foundTableNames.size() == 0;
1113     }
1114 
1115     private void monitorRegionServers(Map<String, List<HRegionInfo>> rsAndRMap) {
1116       List<RegionServerTask> tasks = new ArrayList<RegionServerTask>();
1117       Map<String, AtomicLong> successMap = new HashMap<String, AtomicLong>();
1118       Random rand = new Random();
1119       for (Map.Entry<String, List<HRegionInfo>> entry : rsAndRMap.entrySet()) {
1120         String serverName = entry.getKey();
1121         AtomicLong successes = new AtomicLong(0);
1122         successMap.put(serverName, successes);
1123         if (this.allRegions) {
1124           for (HRegionInfo region : entry.getValue()) {
1125             tasks.add(new RegionServerTask(this.connection,
1126                 serverName,
1127                 region,
1128                 getSink(),
1129                 successes));
1130           }
1131         } else {
1132         // randomly select a region if the flag is not set
1133           HRegionInfo region = entry.getValue().get(rand.nextInt(entry.getValue().size()));
1134           tasks.add(new RegionServerTask(this.connection,
1135               serverName,
1136               region,
1137               getSink(),
1138               successes));
1139         }
1140       }
1141       try {
1142         for (Future<Void> future : this.executor.invokeAll(tasks)) {
1143           try {
1144             future.get();
1145           } catch (ExecutionException e) {
1146             LOG.error("Sniff regionserver failed!", e);
1147             this.errorCode = ERROR_EXIT_CODE;
1148           }
1149         }
1150         if (this.allRegions) {
1151           for (Map.Entry<String, List<HRegionInfo>> entry : rsAndRMap.entrySet()) {
1152             String serverName = entry.getKey();
1153             LOG.info("Successfully read " + successMap.get(serverName) + " regions out of "
1154                     + entry.getValue().size() + " on regionserver:" + serverName);
1155           }
1156         }
1157       } catch (InterruptedException e) {
1158         this.errorCode = ERROR_EXIT_CODE;
1159         LOG.error("Sniff regionserver interrupted!", e);
1160       }
1161     }
1162 
1163     private Map<String, List<HRegionInfo>> filterRegionServerByName() {
1164       Map<String, List<HRegionInfo>> regionServerAndRegionsMap = this.getAllRegionServerByName();
1165       regionServerAndRegionsMap = this.doFilterRegionServerByName(regionServerAndRegionsMap);
1166       return regionServerAndRegionsMap;
1167     }
1168 
1169     private Map<String, List<HRegionInfo>> getAllRegionServerByName() {
1170       Map<String, List<HRegionInfo>> rsAndRMap = new HashMap<String, List<HRegionInfo>>();
1171       Table table = null;
1172       RegionLocator regionLocator = null;
1173       try {
1174         HTableDescriptor[] tableDescs = this.admin.listTables();
1175         List<HRegionInfo> regions = null;
1176         for (HTableDescriptor tableDesc : tableDescs) {
1177           table = this.admin.getConnection().getTable(tableDesc.getTableName());
1178           regionLocator = this.admin.getConnection().getRegionLocator(tableDesc.getTableName());
1179 
1180           for (HRegionLocation location : regionLocator.getAllRegionLocations()) {
1181             ServerName rs = location.getServerName();
1182             String rsName = rs.getHostname();
1183             HRegionInfo r = location.getRegionInfo();
1184 
1185             if (rsAndRMap.containsKey(rsName)) {
1186               regions = rsAndRMap.get(rsName);
1187             } else {
1188               regions = new ArrayList<HRegionInfo>();
1189               rsAndRMap.put(rsName, regions);
1190             }
1191             regions.add(r);
1192           }
1193           table.close();
1194         }
1195 
1196       } catch (IOException e) {
1197         String msg = "Get HTables info failed";
1198         LOG.error(msg, e);
1199         this.errorCode = INIT_ERROR_EXIT_CODE;
1200       } finally {
1201         if (table != null) {
1202           try {
1203             table.close();
1204           } catch (IOException e) {
1205             LOG.warn("Close table failed", e);
1206           }
1207         }
1208       }
1209 
1210       return rsAndRMap;
1211     }
1212 
1213     private Map<String, List<HRegionInfo>> doFilterRegionServerByName(
1214         Map<String, List<HRegionInfo>> fullRsAndRMap) {
1215 
1216       Map<String, List<HRegionInfo>> filteredRsAndRMap = null;
1217 
1218       if (this.targets != null && this.targets.length > 0) {
1219         filteredRsAndRMap = new HashMap<String, List<HRegionInfo>>();
1220         Pattern pattern = null;
1221         Matcher matcher = null;
1222         boolean regExpFound = false;
1223         for (String rsName : this.targets) {
1224           if (this.useRegExp) {
1225             regExpFound = false;
1226             pattern = Pattern.compile(rsName);
1227             for (Map.Entry<String, List<HRegionInfo>> entry : fullRsAndRMap.entrySet()) {
1228               matcher = pattern.matcher(entry.getKey());
1229               if (matcher.matches()) {
1230                 filteredRsAndRMap.put(entry.getKey(), entry.getValue());
1231                 regExpFound = true;
1232               }
1233             }
1234             if (!regExpFound) {
1235               LOG.info("No RegionServerInfo found, regionServerPattern:" + rsName);
1236             }
1237           } else {
1238             if (fullRsAndRMap.containsKey(rsName)) {
1239               filteredRsAndRMap.put(rsName, fullRsAndRMap.get(rsName));
1240             } else {
1241               LOG.info("No RegionServerInfo found, regionServerName:" + rsName);
1242             }
1243           }
1244         }
1245       } else {
1246         filteredRsAndRMap = fullRsAndRMap;
1247       }
1248       return filteredRsAndRMap;
1249     }
1250   }
1251 
1252   public static void main(String[] args) throws Exception {
1253     final Configuration conf = HBaseConfiguration.create();
1254     final ChoreService choreService = new ChoreService("CANARY_TOOL");
1255     final ScheduledChore authChore = AuthUtil.getAuthChore(conf);
1256     if (authChore != null) {
1257       choreService.scheduleChore(authChore);
1258     }
1259 
1260     // load the generic options into conf
1261     new GenericOptionsParser(conf, args);
1262 
1263     int numThreads = conf.getInt("hbase.canary.threads.num", MAX_THREADS_NUM);
1264     LOG.info("Number of execution threads: " + numThreads);
1265 
1266     ExecutorService executor = new ScheduledThreadPoolExecutor(numThreads);
1267 
1268     Class<? extends Sink> sinkClass =
1269         conf.getClass("hbase.canary.sink.class", RegionServerStdOutSink.class, Sink.class);
1270     Sink sink = ReflectionUtils.newInstance(sinkClass);
1271 
1272     int exitCode = ToolRunner.run(conf, new Canary(executor, sink), args);
1273     choreService.shutdown();
1274     executor.shutdown();
1275     System.exit(exitCode);
1276   }
1277 }