View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase;
20  
21  import java.io.IOException;
22  import java.util.List;
23  import java.util.concurrent.BlockingQueue;
24  import java.util.concurrent.TimeUnit;
25  
26  import org.apache.hadoop.conf.Configuration;
27  import org.apache.hadoop.hbase.testclassification.IntegrationTests;
28  import org.apache.hadoop.hbase.util.ConstantDelayQueue;
29  import org.apache.hadoop.hbase.util.LoadTestTool;
30  import org.apache.hadoop.hbase.util.MultiThreadedUpdater;
31  import org.apache.hadoop.hbase.util.MultiThreadedWriter;
32  import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
33  import org.apache.hadoop.hbase.util.Threads;
34  import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator;
35  import org.apache.hadoop.util.StringUtils;
36  import org.apache.hadoop.util.ToolRunner;
37  import org.junit.Assert;
38  import org.junit.Test;
39  import org.junit.experimental.categories.Category;
40  
41  import com.google.common.collect.Lists;
42  
43  /**
44   * Integration test for testing async wal replication to secondary region replicas. Sets up a table
45   * with given region replication (default 2), and uses LoadTestTool client writer, updater and
46   * reader threads for writes and reads and verification. It uses a delay queue with a given delay
47   * ("read_delay_ms", default 5000ms) between the writer/updater and reader threads to make the
48   * written items available to readers. This means that a reader will only start reading from a row
49   * written by the writer / updater after 5secs has passed. The reader thread performs the reads from
50   * the given region replica id (default 1) to perform the reads. Async wal replication has to finish
51   * with the replication of the edits before read_delay_ms to the given region replica id so that
52   * the read and verify will not fail.
53   *
54   * The job will run for <b>at least<b> given runtime (default 10min) by running a concurrent
55   * writer and reader workload followed by a concurrent updater and reader workload for
56   * num_keys_per_server.
57   *<p>
58   * Example usage:
59   * <pre>
60   * hbase org.apache.hadoop.hbase.IntegrationTestRegionReplicaReplication
61   * -DIntegrationTestRegionReplicaReplication.num_keys_per_server=10000
62   * -Dhbase.IntegrationTestRegionReplicaReplication.runtime=600000
63   * -DIntegrationTestRegionReplicaReplication.read_delay_ms=5000
64   * -DIntegrationTestRegionReplicaReplication.region_replication=3
65   * -DIntegrationTestRegionReplicaReplication.region_replica_id=2
66   * -DIntegrationTestRegionReplicaReplication.num_read_threads=100
67   * -DIntegrationTestRegionReplicaReplication.num_write_threads=100
68   * </pre>
69   */
70  @Category(IntegrationTests.class)
71  public class IntegrationTestRegionReplicaReplication extends IntegrationTestIngest {
72  
73    private static final String TEST_NAME
74      = IntegrationTestRegionReplicaReplication.class.getSimpleName();
75  
76    private static final String OPT_READ_DELAY_MS = "read_delay_ms";
77  
78    private static final int DEFAULT_REGION_REPLICATION = 2;
79    private static final int SERVER_COUNT = 1; // number of slaves for the smallest cluster
80    private static final String[] DEFAULT_COLUMN_FAMILIES = new String[] {"f1", "f2", "f3"};
81  
82    @Override
83    protected int getMinServerCount() {
84      return SERVER_COUNT;
85    }
86  
87    @Override
88    public void setConf(Configuration conf) {
89      conf.setIfUnset(
90        String.format("%s.%s", TEST_NAME, LoadTestTool.OPT_REGION_REPLICATION),
91        String.valueOf(DEFAULT_REGION_REPLICATION));
92  
93      conf.setIfUnset(
94        String.format("%s.%s", TEST_NAME, LoadTestTool.OPT_COLUMN_FAMILIES),
95        StringUtils.join(",", DEFAULT_COLUMN_FAMILIES));
96  
97      conf.setBoolean("hbase.table.sanity.checks", true);
98  
99      // enable async wal replication to region replicas for unit tests
100     conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_CONF_KEY, true);
101     conf.setBoolean(HConstants.REPLICATION_ENABLE_KEY, true);
102 
103     conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 1024L * 1024 * 4); // flush every 4 MB
104     conf.setInt("hbase.hstore.blockingStoreFiles", 100);
105 
106     super.setConf(conf);
107   }
108 
109   @Override
110   @Test
111   public void testIngest() throws Exception {
112     runIngestTest(JUNIT_RUN_TIME, 25000, 10, 1024, 10, 20);
113   }
114 
115   /**
116    * This extends MultiThreadedWriter to add a configurable delay to the keys written by the writer
117    * threads to become available to the MultiThradedReader threads. We add this delay because of
118    * the async nature of the wal replication to region replicas.
119    */
120   public static class DelayingMultiThreadedWriter extends MultiThreadedWriter {
121     private long delayMs;
122     public DelayingMultiThreadedWriter(LoadTestDataGenerator dataGen, Configuration conf,
123         TableName tableName) throws IOException {
124       super(dataGen, conf, tableName);
125     }
126     @Override
127     protected BlockingQueue<Long> createWriteKeysQueue(Configuration conf) {
128       this.delayMs = conf.getLong(String.format("%s.%s",
129         IntegrationTestRegionReplicaReplication.class.getSimpleName(), OPT_READ_DELAY_MS), 5000);
130       return new ConstantDelayQueue<Long>(TimeUnit.MILLISECONDS, delayMs);
131     }
132   }
133 
134   /**
135    * This extends MultiThreadedWriter to add a configurable delay to the keys written by the writer
136    * threads to become available to the MultiThradedReader threads. We add this delay because of
137    * the async nature of the wal replication to region replicas.
138    */
139   public static class DelayingMultiThreadedUpdater extends MultiThreadedUpdater {
140     private long delayMs;
141     public DelayingMultiThreadedUpdater(LoadTestDataGenerator dataGen, Configuration conf,
142         TableName tableName, double updatePercent) throws IOException {
143       super(dataGen, conf, tableName, updatePercent);
144     }
145     @Override
146     protected BlockingQueue<Long> createWriteKeysQueue(Configuration conf) {
147       this.delayMs = conf.getLong(String.format("%s.%s",
148         IntegrationTestRegionReplicaReplication.class.getSimpleName(), OPT_READ_DELAY_MS), 5000);
149       return new ConstantDelayQueue<Long>(TimeUnit.MILLISECONDS, delayMs);
150     }
151   }
152 
153   @Override
154   protected void runIngestTest(long defaultRunTime, long keysPerServerPerIter, int colsPerKey,
155       int recordSize, int writeThreads, int readThreads) throws Exception {
156 
157     LOG.info("Running ingest");
158     LOG.info("Cluster size:" + util.getHBaseClusterInterface().getClusterStatus().getServersSize());
159 
160     // sleep for some time so that the cache for disabled tables does not interfere.
161     Threads.sleep(
162       getConf().getInt("hbase.region.replica.replication.cache.disabledAndDroppedTables.expiryMs",
163         5000) + 1000);
164 
165     long start = System.currentTimeMillis();
166     String runtimeKey = String.format(RUN_TIME_KEY, this.getClass().getSimpleName());
167     long runtime = util.getConfiguration().getLong(runtimeKey, defaultRunTime);
168     long startKey = 0;
169 
170     long numKeys = getNumKeys(keysPerServerPerIter);
171     while (System.currentTimeMillis() - start < 0.9 * runtime) {
172       LOG.info("Intended run time: " + (runtime/60000) + " min, left:" +
173           ((runtime - (System.currentTimeMillis() - start))/60000) + " min");
174 
175       int verifyPercent = 100;
176       int updatePercent = 20;
177       int ret = -1;
178       int regionReplicaId = conf.getInt(String.format("%s.%s"
179         , TEST_NAME, LoadTestTool.OPT_REGION_REPLICA_ID), 1);
180 
181       // we will run writers and readers at the same time.
182       List<String> args = Lists.newArrayList(getArgsForLoadTestTool("", "", startKey, numKeys));
183       args.add("-write");
184       args.add(String.format("%d:%d:%d", colsPerKey, recordSize, writeThreads));
185       args.add("-" + LoadTestTool.OPT_MULTIPUT);
186       args.add("-writer");
187       args.add(DelayingMultiThreadedWriter.class.getName()); // inject writer class
188       args.add("-read");
189       args.add(String.format("%d:%d", verifyPercent, readThreads));
190       args.add("-" + LoadTestTool.OPT_REGION_REPLICA_ID);
191       args.add(String.valueOf(regionReplicaId));
192 
193       ret = loadTool.run(args.toArray(new String[args.size()]));
194       if (0 != ret) {
195         String errorMsg = "Load failed with error code " + ret;
196         LOG.error(errorMsg);
197         Assert.fail(errorMsg);
198       }
199 
200       args = Lists.newArrayList(getArgsForLoadTestTool("", "", startKey, numKeys));
201       args.add("-update");
202       args.add(String.format("%s:%s:1", updatePercent, writeThreads));
203       args.add("-updater");
204       args.add(DelayingMultiThreadedUpdater.class.getName()); // inject updater class
205       args.add("-read");
206       args.add(String.format("%d:%d", verifyPercent, readThreads));
207       args.add("-" + LoadTestTool.OPT_REGION_REPLICA_ID);
208       args.add(String.valueOf(regionReplicaId));
209 
210       ret = loadTool.run(args.toArray(new String[args.size()]));
211       if (0 != ret) {
212         String errorMsg = "Load failed with error code " + ret;
213         LOG.error(errorMsg);
214         Assert.fail(errorMsg);
215       }
216       startKey += numKeys;
217     }
218   }
219 
220   public static void main(String[] args) throws Exception {
221     Configuration conf = HBaseConfiguration.create();
222     IntegrationTestingUtility.setUseDistributedCluster(conf);
223     int ret = ToolRunner.run(conf, new IntegrationTestRegionReplicaReplication(), args);
224     System.exit(ret);
225   }
226 }