/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase;

import com.google.common.base.Objects;
import com.google.common.collect.Sets;
import com.yammer.metrics.core.Histogram;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.chaos.actions.MoveRandomRegionOfTableAction;
import org.apache.hadoop.hbase.chaos.actions.RestartRandomRsExceptMetaAction;
import org.apache.hadoop.hbase.chaos.monkies.PolicyBasedChaosMonkey;
import org.apache.hadoop.hbase.chaos.policies.PeriodicRandomActionPolicy;
import org.apache.hadoop.hbase.chaos.policies.Policy;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.ipc.RpcClient;
import org.apache.hadoop.hbase.regionserver.DisabledRegionSplitPolicy;
import org.apache.hadoop.hbase.testclassification.IntegrationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.YammerHistogramUtils;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.ToolRunner;
import org.junit.experimental.categories.Category;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.Callable;

import static java.lang.String.format;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;

/**
 * Integration test that compares the read performance of a table with and without region
 * replicas while a chaos monkey restarts region servers and moves regions of the test table.
 * Reuses the write and read workloads from {@link PerformanceEvaluation}. Run with {@code -h}
 * for the full list of options.
 */
@Category(IntegrationTests.class)
public class IntegrationTestRegionReplicaPerf extends IntegrationTestBase {

  private static final Log LOG = LogFactory.getLog(IntegrationTestRegionReplicaPerf.class);

  private static final String SLEEP_TIME_KEY = "sleeptime";
  private static final String SLEEP_TIME_DEFAULT = "" + (10 * 1000L);
  private static final String TABLE_NAME_KEY = "tableName";
  private static final String TABLE_NAME_DEFAULT = "IntegrationTestRegionReplicaPerf";
  private static final String REPLICA_COUNT_KEY = "replicas";
  private static final String REPLICA_COUNT_DEFAULT = "" + 3;
  private static final String PRIMARY_TIMEOUT_KEY = "timeout";
  private static final String PRIMARY_TIMEOUT_DEFAULT = "" + 10 * 1000;
  private static final String NUM_RS_KEY = "numRs";
  private static final String NUM_RS_DEFAULT = "" + 3;
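  /** Extracts a descriptive statistic from a {@link Histogram}. */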
  private enum Stat {
    STDEV {
      @Override
      double apply(Histogram hist) {
        return hist.stdDev();
      }
    },
    FOUR_9S {
      @Override
      double apply(Histogram hist) {
        // 99.99th percentile of the recorded latencies
        return hist.getSnapshot().getValue(0.9999);
      }
    };

    abstract double apply(Histogram hist);
  }

  // Test parameters, populated from the command line in processOptions().
  private TableName tableName;
  private long sleepTime;
  private int replicaCount;
  private int primaryTimeout;
  private int clusterSize;

  /**
   * Wraps the invocation of {@link PerformanceEvaluation} in a {@code Callable}.
   */
  static class PerfEvalCallable implements Callable<TimingResult> {
    private final Queue<String> argv = new LinkedList<String>();
    private final Admin admin;

    public PerfEvalCallable(Admin admin, String argv) {
      this.admin = admin;
      this.argv.addAll(Arrays.asList(argv.split(" ")));
      LOG.debug("Created PerformanceEvaluationCallable with args: " + argv);
    }

    @Override
    public TimingResult call() throws Exception {
      PerformanceEvaluation.TestOptions opts = PerformanceEvaluation.parseOpts(argv);
      PerformanceEvaluation.checkTable(admin, opts);
      PerformanceEvaluation.RunResult[] results = null;
      long numRows = opts.totalRows;
      long elapsedTime = 0;
      if (opts.nomapred) {
        // run the PE clients in-process; overall elapsed time is that of the slowest client
        results = PerformanceEvaluation.doLocalClients(opts, admin.getConfiguration());
        for (PerformanceEvaluation.RunResult r : results) {
          elapsedTime = Math.max(elapsedTime, r.duration);
        }
      } else {
        // run PE as a MapReduce job and read the totals back from the job counters
        Job job = PerformanceEvaluation.doMapReduce(opts, admin.getConfiguration());
        Counters counters = job.getCounters();
        numRows = counters.findCounter(PerformanceEvaluation.Counter.ROWS).getValue();
        elapsedTime = counters.findCounter(PerformanceEvaluation.Counter.ELAPSED_TIME).getValue();
      }
      return new TimingResult(numRows, elapsedTime, results);
    }
  }

  /**
   * Records the results from a single {@link PerformanceEvaluation} job run.
   */
  static class TimingResult {
    public final long numRows;
    public final long elapsedTime;
    public final PerformanceEvaluation.RunResult[] results;

    public TimingResult(long numRows, long elapsedTime, PerformanceEvaluation.RunResult[] results) {
      this.numRows = numRows;
      this.elapsedTime = elapsedTime;
      this.results = results;
    }

    @Override
    public String toString() {
      return Objects.toStringHelper(this)
          .add("numRows", numRows)
          .add("elapsedTime", elapsedTime)
          .toString();
    }
  }

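  /**
   * Verifies the cluster is configured for region replica reads (stochastic load balancer,
   * non-zero storefile refresh period) and applies the client-side timeouts used by this test.
   */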
  @Override
  public void setUp() throws Exception {
    super.setUp();
    Configuration conf = util.getConfiguration();

    // sanity check the cluster configuration this test depends on
    assertEquals("Master must be configured with StochasticLoadBalancer",
        "org.apache.hadoop.hbase.master.balancer.StochasticLoadBalancer",
        conf.get("hbase.master.loadbalancer.class"));
    assertTrue("hbase.regionserver.storefile.refresh.period must be greater than zero.",
        conf.getLong("hbase.regionserver.storefile.refresh.period", 0) > 0);

    // configure client-side settings for this test
    conf.setBoolean(RpcClient.SPECIFIC_WRITE_THREAD, true);
    // primaryCallTimeout values are expressed in microseconds
    conf.setLong("hbase.client.primaryCallTimeout.get", primaryTimeout);
    conf.setLong("hbase.client.primaryCallTimeout.multiget", primaryTimeout);
  }

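  /** Initializes the cluster with the number of RegionServers given by {@code numRs}. */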
  @Override
  public void setUpCluster() throws Exception {
    util = getTestingUtil(getConf());
    util.initializeCluster(clusterSize);
  }

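  /**
   * Installs a chaos monkey that periodically restarts a random RegionServer (excluding the one
   * carrying hbase:meta) and moves a random region of the test table.
   */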
  @Override
  public void setUpMonkey() throws Exception {
    Policy p = new PeriodicRandomActionPolicy(sleepTime,
        new RestartRandomRsExceptMetaAction(sleepTime),
        new MoveRandomRegionOfTableAction(tableName));
    this.monkey = new PolicyBasedChaosMonkey(util, p);
    // the monkey is started explicitly from test(), not here
  }

  @Override
  protected void addOptions() {
    addOptWithArg(TABLE_NAME_KEY, "Alternate table name. Default: '"
        + TABLE_NAME_DEFAULT + "'");
    addOptWithArg(SLEEP_TIME_KEY, "How long the monkey sleeps between actions. Default: "
        + SLEEP_TIME_DEFAULT);
    addOptWithArg(REPLICA_COUNT_KEY, "Number of region replicas. Default: "
        + REPLICA_COUNT_DEFAULT);
    addOptWithArg(PRIMARY_TIMEOUT_KEY, "Overrides hbase.client.primaryCallTimeout. Default: "
        + PRIMARY_TIMEOUT_DEFAULT + " microseconds (10ms)");
    addOptWithArg(NUM_RS_KEY, "Specify the number of RegionServers to use. Default: "
        + NUM_RS_DEFAULT);
  }

  @Override
  protected void processOptions(CommandLine cmd) {
    tableName = TableName.valueOf(cmd.getOptionValue(TABLE_NAME_KEY, TABLE_NAME_DEFAULT));
    sleepTime = Long.parseLong(cmd.getOptionValue(SLEEP_TIME_KEY, SLEEP_TIME_DEFAULT));
    replicaCount = Integer.parseInt(cmd.getOptionValue(REPLICA_COUNT_KEY, REPLICA_COUNT_DEFAULT));
    primaryTimeout =
        Integer.parseInt(cmd.getOptionValue(PRIMARY_TIMEOUT_KEY, PRIMARY_TIMEOUT_DEFAULT));
    clusterSize = Integer.parseInt(cmd.getOptionValue(NUM_RS_KEY, NUM_RS_DEFAULT));
    LOG.debug(Objects.toStringHelper("Parsed Options")
        .add(TABLE_NAME_KEY, tableName)
        .add(SLEEP_TIME_KEY, sleepTime)
        .add(REPLICA_COUNT_KEY, replicaCount)
        .add(PRIMARY_TIMEOUT_KEY, primaryTimeout)
        .add(NUM_RS_KEY, clusterSize)
        .toString());
  }

  @Override
  public int runTestFromCommandLine() throws Exception {
    test();
    return 0;
  }

  @Override
  public TableName getTablename() {
    return tableName;
  }

  @Override
  protected Set<String> getColumnFamilies() {
    return Sets.newHashSet(Bytes.toString(PerformanceEvaluation.FAMILY_NAME));
  }

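  /**
   * Averages the given statistic over every detailed run result, logging a histogram report for
   * each run along the way.
   */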
  private static double calcMean(String desc, Stat stat, List<TimingResult> results) {
    double sum = 0;
    int count = 0;

    for (TimingResult tr : results) {
      for (PerformanceEvaluation.RunResult r : tr.results) {
        assertNotNull("One of the run results is missing detailed run data.", r.hist);
        sum += stat.apply(r.hist);
        count += 1;
        LOG.debug(desc + "{" + YammerHistogramUtils.getHistogramReport(r.hist) + "}");
      }
    }
    return sum / count;
  }

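  /**
   * Test flow: populate the table, collect baseline read latencies without region replicas while
   * the chaos monkey runs, then enable region replicas and repeat the read workload. The test
   * fails if enabling replicas makes either the mean latency standard deviation or the mean
   * 99.99th percentile latency worse.
   */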
  public void test() throws Exception {
    int maxIters = 3;
    String replicas = "--replicas=" + replicaCount;
    // PerformanceEvaluation command lines: a presplit sequentialWrite pass (with splitting
    // disabled) populates the table; a sampled randomRead workload is then run with and
    // without region replicas
    String splitPolicy = "--splitPolicy=" + DisabledRegionSplitPolicy.class.getName();
    String writeOpts = format("%s --nomapred --table=%s --presplit=16 sequentialWrite 4",
        splitPolicy, tableName);
    String readOpts =
        format("--nomapred --table=%s --latency --sampleRate=0.1 randomRead 4", tableName);
    String replicaReadOpts = format("%s %s", replicas, readOpts);

    ArrayList<TimingResult> resultsWithoutReplicas = new ArrayList<TimingResult>(maxIters);
    ArrayList<TimingResult> resultsWithReplicas = new ArrayList<TimingResult>(maxIters);

    // populate the table with the write workload
    LOG.debug("Populating table.");
    new PerfEvalCallable(util.getHBaseAdmin(), writeOpts).call();

    // sanity check the table before unleashing the chaos monkey
    assertEquals("Table must be created with DisabledRegionSplitPolicy. Broken test.",
        DisabledRegionSplitPolicy.class.getName(),
        util.getHBaseAdmin().getTableDescriptor(tableName).getRegionSplitPolicyClassName());
    startMonkey();

    // collect a baseline without region replicas
    for (int i = 0; i < maxIters; i++) {
      LOG.debug("Launching non-replica job " + (i + 1) + "/" + maxIters);
      resultsWithoutReplicas.add(new PerfEvalCallable(util.getHBaseAdmin(), readOpts).call());
      // give the cluster a moment to settle between runs
      Thread.sleep(5000L);
    }

    // stop the monkey while the table is altered to add region replicas, then restart it
    cleanUpMonkey("Altering table.");
    LOG.debug("Altering " + tableName + " replica count to " + replicaCount);
    IntegrationTestingUtility.setReplicas(util.getHBaseAdmin(), tableName, replicaCount);
    setUpMonkey();
    startMonkey();

    // measure again, this time reading from region replicas
    for (int i = 0; i < maxIters; i++) {
      LOG.debug("Launching replica job " + (i + 1) + "/" + maxIters);
      resultsWithReplicas.add(new PerfEvalCallable(util.getHBaseAdmin(), replicaReadOpts).call());
      // give the cluster a moment to settle between runs
      Thread.sleep(5000L);
    }

    // compare latency variance and tail latency of the two configurations
    double withoutReplicasStdevMean =
        calcMean("withoutReplicas", Stat.STDEV, resultsWithoutReplicas);
    double withoutReplicas9999Mean =
        calcMean("withoutReplicas", Stat.FOUR_9S, resultsWithoutReplicas);
    double withReplicasStdevMean =
        calcMean("withReplicas", Stat.STDEV, resultsWithReplicas);
    double withReplicas9999Mean =
        calcMean("withReplicas", Stat.FOUR_9S, resultsWithReplicas);

    LOG.info(Objects.toStringHelper(this)
        .add("withoutReplicas", resultsWithoutReplicas)
        .add("withReplicas", resultsWithReplicas)
        .add("withoutReplicasStdevMean", withoutReplicasStdevMean)
        .add("withoutReplicas99.99Mean", withoutReplicas9999Mean)
        .add("withReplicasStdevMean", withReplicasStdevMean)
        .add("withReplicas99.99Mean", withReplicas9999Mean)
        .toString());

    assertTrue(
        "Running with region replicas under chaos should have less request variance than without. "
        + "withReplicas.stdev.mean: " + withReplicasStdevMean + "ms "
        + "withoutReplicas.stdev.mean: " + withoutReplicasStdevMean + "ms.",
        withReplicasStdevMean <= withoutReplicasStdevMean);
    assertTrue(
        "Running with region replicas under chaos should improve 99.99pct latency. "
        + "withReplicas.99.99.mean: " + withReplicas9999Mean + "ms "
        + "withoutReplicas.99.99.mean: " + withoutReplicas9999Mean + "ms.",
        withReplicas9999Mean <= withoutReplicas9999Mean);
  }

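  /**
   * Command-line entry point. A typical invocation against a running distributed cluster might
   * look something like the following (exact flags and values depend on the deployment; these
   * are illustrative only):
   *
   *   hbase org.apache.hadoop.hbase.IntegrationTestRegionReplicaPerf \
   *       -replicas 3 -numRs 3 -sleeptime 30000
   */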
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    IntegrationTestingUtility.setUseDistributedCluster(conf);
    int status = ToolRunner.run(conf, new IntegrationTestRegionReplicaPerf(), args);
    System.exit(status);
  }
}