/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.test;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.commons.lang.math.RandomUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.IntegrationTestIngest;
import org.apache.hadoop.hbase.IntegrationTestingUtility;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.chaos.factories.MonkeyFactory;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.Consistency;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.regionserver.StorefileRefresherChore;
import org.apache.hadoop.hbase.testclassification.IntegrationTests;
import org.apache.hadoop.hbase.util.LoadTestTool;
import org.apache.hadoop.hbase.util.MultiThreadedReader;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.ToolRunner;
import org.junit.Assert;
import org.junit.experimental.categories.Category;

import com.google.common.collect.Lists;
/**
 * An IntegrationTest for doing reads with a timeout, against a read-only table with region
 * replicas. ChaosMonkey is run to kill the region servers and the master, while ensuring
 * that the meta region server is not killed and that at most 2 region servers are dead at
 * any point in time. The expected behavior is that all reads with stale mode true return
 * before the timeout (5 sec by default). The test fails if a read request does not finish
 * in time.
 *
 * <p> This test uses LoadTestTool to read and write the data from a single client but
 * multiple threads. The data is written first, then we allow the region replicas to catch
 * up. Then we start the reader threads doing get requests with stale mode true. ChaosMonkey
 * is started after some delay (20 sec by default) once the reader threads are running, so
 * that there is enough time to fully cache meta.
 *
 * The following parameters (and some other parameters from LoadTestTool) can be used to
 * control behavior; the values shown are the defaults:
 * <pre>
 * -Dhbase.IntegrationTestTimeBoundedRequestsWithRegionReplicas.runtime=600000
 * -DIntegrationTestTimeBoundedRequestsWithRegionReplicas.num_regions_per_server=5
 * -DIntegrationTestTimeBoundedRequestsWithRegionReplicas.get_timeout_ms=5000
 * -DIntegrationTestTimeBoundedRequestsWithRegionReplicas.num_keys_per_server=2500
 * -DIntegrationTestTimeBoundedRequestsWithRegionReplicas.region_replication=3
 * -DIntegrationTestTimeBoundedRequestsWithRegionReplicas.num_read_threads=20
 * -DIntegrationTestTimeBoundedRequestsWithRegionReplicas.num_write_threads=20
 * -DIntegrationTestTimeBoundedRequestsWithRegionReplicas.chaos_monkey_delay=20000
 * </pre>
 * Use this test with the "serverKilling" ChaosMonkey. Sample usage:
 * <pre>
 * hbase org.apache.hadoop.hbase.test.IntegrationTestTimeBoundedRequestsWithRegionReplicas
 * -Dhbase.IntegrationTestTimeBoundedRequestsWithRegionReplicas.runtime=600000
 * -DIntegrationTestTimeBoundedRequestsWithRegionReplicas.num_write_threads=40
 * -DIntegrationTestTimeBoundedRequestsWithRegionReplicas.num_read_threads=40
 * -Dhbase.ipc.client.allowsInterrupt=true --monkey serverKilling
 * </pre>
 */
@Category(IntegrationTests.class)
public class IntegrationTestTimeBoundedRequestsWithRegionReplicas extends IntegrationTestIngest {

  private static final Log LOG = LogFactory.getLog(
    IntegrationTestTimeBoundedRequestsWithRegionReplicas.class);

  private static final String TEST_NAME
    = IntegrationTestTimeBoundedRequestsWithRegionReplicas.class.getSimpleName();

  protected static final long DEFAULT_GET_TIMEOUT = 5000; // 5 sec
  protected static final String GET_TIMEOUT_KEY = "get_timeout_ms";

  protected static final long DEFAULT_CHAOS_MONKEY_DELAY = 20 * 1000; // 20 sec
  protected static final String CHAOS_MONKEY_DELAY_KEY = "chaos_monkey_delay";

  protected static final int DEFAULT_REGION_REPLICATION = 3;

  @Override
  protected void startMonkey() throws Exception {
    // we do not want to start the monkey at the start of the test.
  }

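  /**
   * Defaults to the CALM monkey so that no chaos runs until the scheduled start in
   * {@link #runIngestTest}; pass a destructive policy (e.g. --monkey serverKilling)
   * on the command line to get actual server kills.
   */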
  @Override
  protected MonkeyFactory getDefaultMonkeyFactory() {
    return MonkeyFactory.getFactory(MonkeyFactory.CALM);
  }

  @Override
  public void setConf(Configuration conf) {
    super.setConf(conf);
    // default replication for this test is 3
    String clazz = this.getClass().getSimpleName();
    conf.setIfUnset(String.format("%s.%s", clazz, LoadTestTool.OPT_REGION_REPLICATION),
      Integer.toString(DEFAULT_REGION_REPLICATION));
  }

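  /**
   * Writes the test data once using LoadTestTool's -write mode and fails the test
   * if the tool exits with a non-zero code.
   */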
  protected void writeData(int colsPerKey, int recordSize, int writeThreads,
      long startKey, long numKeys) throws IOException {
    int ret = loadTool.run(getArgsForLoadTestTool("-write",
      String.format("%d:%d:%d", colsPerKey, recordSize, writeThreads), startKey, numKeys));
    if (0 != ret) {
      String errorMsg = "Load failed with error code " + ret;
      LOG.error(errorMsg);
      Assert.fail(errorMsg);
    }
  }

  @Override
  protected void runIngestTest(long defaultRunTime, long keysPerServerPerIter, int colsPerKey,
      int recordSize, int writeThreads, int readThreads) throws Exception {
    LOG.info("Cluster size: " +
      util.getHBaseClusterInterface().getClusterStatus().getServersSize());

    long start = System.currentTimeMillis();
    String runtimeKey = String.format(RUN_TIME_KEY, this.getClass().getSimpleName());
    long runtime = util.getConfiguration().getLong(runtimeKey, defaultRunTime);
    long startKey = 0;

    long numKeys = getNumKeys(keysPerServerPerIter);

    // write data once
    LOG.info("Writing some data to the table");
    writeData(colsPerKey, recordSize, writeThreads, startKey, numKeys);

    // flush the table
    LOG.info("Flushing the table");
    Admin admin = util.getHBaseAdmin();
    admin.flush(getTablename());

    // make sure the region replicas catch up: if the storefile refresher chore runs
    // frequently, sleeping a few refresh periods is enough for the secondaries to pick up
    // the flushed files; otherwise force a reopen of the regions
    long refreshTime = conf.getLong(StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD, 0);
    if (refreshTime > 0 && refreshTime <= 10000) {
      LOG.info("Sleeping " + refreshTime + "ms to ensure that the data is replicated");
      Threads.sleep(refreshTime * 3);
    } else {
      LOG.info("Reopening the table");
      admin.disableTable(getTablename());
      admin.enableTable(getTablename());
    }

    // We should only start the ChaosMonkey after the readers are started and have cached
    // all of the region locations. Because meta is not replicated, the time-bounded reads
    // will time out if the meta server is killed.
    // We start the chaos monkey after a configurable delay (20 sec by default); since the
    // readers are reading random keys, that should be enough time to cache every region entry.
    long chaosMonkeyDelay = conf.getLong(String.format("%s.%s", TEST_NAME, CHAOS_MONKEY_DELAY_KEY),
      DEFAULT_CHAOS_MONKEY_DELAY);
    ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
    LOG.info(String.format("ChaosMonkey delay is %d seconds. Will start %s " +
        "ChaosMonkey after delay", chaosMonkeyDelay / 1000, monkeyToUse));
    ScheduledFuture<?> result = executorService.schedule(new Runnable() {
      @Override
      public void run() {
        try {
          LOG.info("Starting ChaosMonkey");
          monkey.start();
          monkey.waitForStop();
        } catch (Exception e) {
          LOG.warn(StringUtils.stringifyException(e));
        }
      }
    }, chaosMonkeyDelay, TimeUnit.MILLISECONDS);

    // set the intended run time for the reader. The reader will do read requests
    // to random keys for this amount of time.
    long remainingTime = runtime - (System.currentTimeMillis() - start);
    LOG.info("Reading random keys from the table for " + remainingTime / 60000 + " min");
    this.conf.setLong(
      String.format(RUN_TIME_KEY, TimeBoundedMultiThreadedReader.class.getSimpleName()),
      remainingTime); // load tool shares the same conf

    // now start the readers, which will run for the configured run time
    try {
      int ret = loadTool.run(getArgsForLoadTestTool("-read", String.format("100:%d", readThreads),
        startKey, numKeys));
      if (0 != ret) {
        String errorMsg = "Verification failed with error code " + ret;
        LOG.error(errorMsg);
        Assert.fail(errorMsg);
      }
    } finally {
      if (result != null) result.cancel(false);
      monkey.stop("Stopping the test");
      monkey.waitForStop();
      executorService.shutdown();
    }
  }

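  /**
   * Appends the -reader argument so that LoadTestTool uses the time-bounded reader
   * defined below instead of the default MultiThreadedReader.
   */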
  @Override
  protected String[] getArgsForLoadTestTool(String mode, String modeSpecificArg, long startKey,
      long numKeys) {
    List<String> args = Lists.newArrayList(super.getArgsForLoadTestTool(
      mode, modeSpecificArg, startKey, numKeys));
    args.add("-reader");
    args.add(TimeBoundedMultiThreadedReader.class.getName());
    return args.toArray(new String[args.size()]);
  }

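  /**
   * A MultiThreadedReader that issues timeline-consistent gets against random keys and
   * counts any read that takes longer than the configured timeout as a failure.
   */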
  public static class TimeBoundedMultiThreadedReader extends MultiThreadedReader {
    protected long timeoutNano;
    protected AtomicLong timedOutReads = new AtomicLong();
    protected long runTime;
    protected Thread timeoutThread;
    protected AtomicLong staleReads = new AtomicLong();

    public TimeBoundedMultiThreadedReader(LoadTestDataGenerator dataGen, Configuration conf,
        TableName tableName, double verifyPercent) throws IOException {
      super(dataGen, conf, tableName, verifyPercent);
      long timeoutMs = conf.getLong(
        String.format("%s.%s", TEST_NAME, GET_TIMEOUT_KEY), DEFAULT_GET_TIMEOUT);
      timeoutNano = timeoutMs * 1000000;
      LOG.info("Timeout for gets: " + timeoutMs);
      String runTimeKey = String.format(RUN_TIME_KEY, this.getClass().getSimpleName());
      this.runTime = conf.getLong(runTimeKey, -1);
      if (this.runTime <= 0) {
        throw new IllegalArgumentException("Please configure " + runTimeKey);
      }
    }

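    /**
     * Blocks until the timeout thread expires, then aborts the reader threads before
     * delegating to the parent implementation to join them.
     */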
259     public void waitForFinish() {
260       try {
261         this.timeoutThread.join();
262       } catch (InterruptedException e) {
263         e.printStackTrace();
264       }
265       this.aborted = true;
266       super.waitForFinish();
267     }
268 
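    /** Adds the stale-read and get-timeout counters to the periodic progress line. */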
    @Override
    protected String progressInfo() {
      StringBuilder builder = new StringBuilder(super.progressInfo());
      appendToStatus(builder, "stale_reads", staleReads.get());
      appendToStatus(builder, "get_timeouts", timedOutReads.get());
      return builder.toString();
    }

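    /** Starts the reader threads, then the timeout thread that bounds the total run time. */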
    @Override
    public void start(long startKey, long endKey, int numThreads) throws IOException {
      super.start(startKey, endKey, numThreads);
      this.timeoutThread = new TimeoutThread(this.runTime);
      this.timeoutThread.start();
    }

    @Override
    protected HBaseReaderThread createReaderThread(int readerId) throws IOException {
      return new TimeBoundedMultiThreadedReaderThread(readerId);
    }

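    /**
     * Sleeps until the configured run time elapses, logging the remaining time once a
     * minute. waitForFinish() joins on this thread and then aborts the reader threads.
     */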
    private class TimeoutThread extends Thread {
      long timeout;
      long reportInterval = 60000;

      public TimeoutThread(long timeout) {
        this.timeout = timeout;
      }

      @Override
      public void run() {
        while (true) {
          long rem = Math.min(timeout, reportInterval);
          if (rem <= 0) {
            break;
          }
          LOG.info("Remaining execution time: " + timeout / 60000 + " min");
          Threads.sleep(rem);
          timeout -= rem;
        }
      }
    }

    public class TimeBoundedMultiThreadedReaderThread
        extends MultiThreadedReader.HBaseReaderThread {

      public TimeBoundedMultiThreadedReaderThread(int readerId) throws IOException {
        super(readerId);
      }

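      // Request timeline consistency so that the get may be served by any replica,
      // possibly returning stale data rather than blocking on the primary region.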
      @Override
      protected Get createGet(long keyToRead) throws IOException {
        Get get = super.createGet(keyToRead);
        get.setConsistency(Consistency.TIMELINE);
        return get;
      }

      @Override
      protected long getNextKeyToRead() {
        // always read a random key, assuming that the writer has finished writing all keys;
        // mask the sign bit instead of using Math.abs(), which returns a negative value
        // for Long.MIN_VALUE
        long key = startKey + (RandomUtils.nextLong() & Long.MAX_VALUE) % (endKey - startKey);
        return key;
      }

      @Override
      protected void verifyResultsAndUpdateMetrics(boolean verify, Get[] gets, long elapsedNano,
          Result[] results, Table table, boolean isNullExpected)
          throws IOException {
        super.verifyResultsAndUpdateMetrics(verify, gets, elapsedNano, results, table,
          isNullExpected);
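        // count results that were served by a secondary replica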
        for (Result r : results) {
          if (r.isStale()) staleReads.incrementAndGet();
        }
        // We do not actually time out and cancel the reads; we wait for the RPC to
        // complete, but if the request took longer than the timeout, we treat it as
        // a failure.
        if (elapsedNano > timeoutNano) {
          timedOutReads.incrementAndGet();
          numReadFailures.addAndGet(1); // fail the test
          for (Result r : results) {
            LOG.error("FAILED FOR " + r);
            RegionLocations rl = ((ClusterConnection)connection).
                locateRegion(tableName, r.getRow(), true, true);
            HRegionLocation[] locations = rl.getRegionLocations();
            for (HRegionLocation h : locations) {
              LOG.error("LOCATION " + h);
            }
          }
        }
      }
    }
  }

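  /** Command-line entry point; runs the test against a distributed cluster. */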
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    IntegrationTestingUtility.setUseDistributedCluster(conf);
    int ret = ToolRunner.run(conf, new IntegrationTestTimeBoundedRequestsWithRegionReplicas(),
      args);
    System.exit(ret);
  }
}