1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.apache.hadoop.hbase.util;
18
19 import static org.apache.hadoop.hbase.util.test.LoadTestDataGenerator.INCREMENT;
20 import static org.apache.hadoop.hbase.util.test.LoadTestDataGenerator.MUTATE_INFO;
21
22 import java.io.IOException;
23 import java.util.Arrays;
24 import java.util.Collection;
25 import java.util.HashMap;
26 import java.util.Map;
27 import java.util.Random;
28 import java.util.Set;
29 import java.util.concurrent.atomic.AtomicInteger;
30 import java.util.concurrent.atomic.AtomicLong;
31
32 import org.apache.commons.logging.Log;
33 import org.apache.commons.logging.LogFactory;
34 import org.apache.hadoop.conf.Configuration;
35 import org.apache.hadoop.hbase.Cell;
36 import org.apache.hadoop.hbase.CellUtil;
37 import org.apache.hadoop.hbase.HBaseTestingUtility;
38 import org.apache.hadoop.hbase.HRegionLocation;
39 import org.apache.hadoop.hbase.RegionLocations;
40 import org.apache.hadoop.hbase.TableName;
41 import org.apache.hadoop.hbase.client.ClusterConnection;
42 import org.apache.hadoop.hbase.client.HConnection;
43 import org.apache.hadoop.hbase.client.HConnectionManager;
44 import org.apache.hadoop.hbase.client.Result;
45 import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
46 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
47 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType;
48 import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator;
49 import org.apache.hadoop.hbase.util.test.LoadTestKVGenerator;
50 import org.apache.hadoop.util.StringUtils;
51
52 import com.google.common.base.Preconditions;
53
54
55
56
57
58 public abstract class MultiThreadedAction {
/** Logger used for progress reports and verification failures in this class. */
private static final Log LOG = LogFactory.getLog(MultiThreadedAction.class);

/** Table this action works against; also passed to the region lookup in printLocations. */
protected final TableName tableName;
/** Configuration handed to the constructor and used to create {@link #connection}. */
protected final Configuration conf;
/** Connection created in the constructor and closed by {@link #close()}. */
protected final HConnection connection;

/** Thread count recorded by {@link #start(long, long, int)}; defaults to 1. */
protected int numThreads = 1;

/** Start of the key range recorded by {@link #start(long, long, int)}. */
protected long startKey = 0;

/**
 * End of the key range recorded by {@link #start(long, long, int)}.
 * NOTE(review): whether this bound is inclusive is decided by code outside this class.
 */
protected long endKey = 1;

/**
 * Number of threads still working. Raised by startThreads and polled by the progress reporter,
 * {@link #waitForFinish()} and {@link #isDone()}; it is never decremented in this class, so
 * presumably the worker threads do that themselves.
 */
protected AtomicInteger numThreadsWorking = new AtomicInteger();
/** Key counter read by the progress reporter; not updated in this class. */
protected AtomicLong numKeys = new AtomicLong();
/** Column counter read by the progress reporter; not updated in this class. */
protected AtomicLong numCols = new AtomicLong();
/** Cumulative operation time in milliseconds read by the progress reporter. */
protected AtomicLong totalOpTimeMs = new AtomicLong();
/** Verbosity flag; not read anywhere in this class. */
protected boolean verbose = false;

/** Data generator supplied to the constructor; drives the result verification below. */
protected LoadTestDataGenerator dataGenerator = null;
80
81
82
83
84
85
86
87
88 public static class DefaultDataGenerator extends LoadTestDataGenerator {
89 private byte[][] columnFamilies = null;
90 private int minColumnsPerKey;
91 private int maxColumnsPerKey;
92 private final Random random = new Random();
93
94 public DefaultDataGenerator(int minValueSize, int maxValueSize,
95 int minColumnsPerKey, int maxColumnsPerKey, byte[]... columnFamilies) {
96 super(minValueSize, maxValueSize);
97 this.columnFamilies = columnFamilies;
98 this.minColumnsPerKey = minColumnsPerKey;
99 this.maxColumnsPerKey = maxColumnsPerKey;
100 }
101
102 public DefaultDataGenerator(byte[]... columnFamilies) {
103
104 this(256, 1024, 1, 10, columnFamilies);
105 }
106
107 @Override
108 public byte[] getDeterministicUniqueKey(long keyBase) {
109 return LoadTestKVGenerator.md5PrefixedKey(keyBase).getBytes();
110 }
111
112 @Override
113 public byte[][] getColumnFamilies() {
114 return columnFamilies;
115 }
116
117 @Override
118 public byte[][] generateColumnsForCf(byte[] rowKey, byte[] cf) {
119 int numColumns = minColumnsPerKey + random.nextInt(maxColumnsPerKey - minColumnsPerKey + 1);
120 byte[][] columns = new byte[numColumns][];
121 for (int i = 0; i < numColumns; ++i) {
122 columns[i] = Integer.toString(i).getBytes();
123 }
124 return columns;
125 }
126
127 @Override
128 public byte[] generateValue(byte[] rowKey, byte[] cf, byte[] column) {
129 return kvGenerator.generateRandomSizeValue(rowKey, cf, column);
130 }
131
132 @Override
133 public boolean verify(byte[] rowKey, byte[] cf, byte[] column, byte[] value) {
134 return LoadTestKVGenerator.verify(value, rowKey, cf, column);
135 }
136
137 @Override
138 public boolean verify(byte[] rowKey, byte[] cf, Set<byte[]> columnSet) {
139 return (columnSet.size() >= minColumnsPerKey) && (columnSet.size() <= maxColumnsPerKey);
140 }
141 }
142
143
/** Identifier handed to the progress reporter and printed at the start of each report line. */
private String actionLetter;

/**
 * Gates the "reporter:counter:" lines written to stderr by the progress reporter.
 * NOTE(review): nothing in this class assigns this private field, so it keeps its default
 * value of false here.
 */
private boolean streamingCounters;

/** Pause between two progress reports, in milliseconds. */
public static final int REPORTING_INTERVAL_MS = 5000;
150
/**
 * Stores the collaborators and opens a connection for the given configuration.
 *
 * @param dataGen data generator used later for result verification
 * @param conf configuration used to create the connection
 * @param tableName table this action works against
 * @param actionLetter identifier shown in progress reports
 * @throws IOException if the connection cannot be created
 */
public MultiThreadedAction(LoadTestDataGenerator dataGen, Configuration conf,
    TableName tableName, String actionLetter) throws IOException {
  this.actionLetter = actionLetter;
  this.tableName = tableName;
  this.dataGenerator = dataGen;
  this.conf = conf;
  this.connection = HConnectionManager.createConnection(conf);
}
160
161 public void start(long startKey, long endKey, int numThreads) throws IOException {
162 this.startKey = startKey;
163 this.endKey = endKey;
164 this.numThreads = numThreads;
165 (new Thread(new ProgressReporter(actionLetter))).start();
166 }
167
168 private static String formatTime(long elapsedTime) {
169 String format = String.format("%%0%dd", 2);
170 elapsedTime = elapsedTime / 1000;
171 String seconds = String.format(format, elapsedTime % 60);
172 String minutes = String.format(format, (elapsedTime % 3600) / 60);
173 String hours = String.format(format, elapsedTime / 3600);
174 String time = hours + ":" + minutes + ":" + seconds;
175 return time;
176 }
177
178
179 private class ProgressReporter implements Runnable {
180
181 private String reporterId = "";
182
183 public ProgressReporter(String id) {
184 this.reporterId = id;
185 }
186
187 @Override
188 public void run() {
189 long startTime = System.currentTimeMillis();
190 long priorNumKeys = 0;
191 long priorCumulativeOpTime = 0;
192 int priorAverageKeysPerSecond = 0;
193
194
195 Threads.sleep(REPORTING_INTERVAL_MS);
196
197 while (numThreadsWorking.get() != 0) {
198 String threadsLeft =
199 "[" + reporterId + ":" + numThreadsWorking.get() + "] ";
200 if (numKeys.get() == 0) {
201 LOG.info(threadsLeft + "Number of keys = 0");
202 } else {
203 long numKeys = MultiThreadedAction.this.numKeys.get();
204 long time = System.currentTimeMillis() - startTime;
205 long totalOpTime = totalOpTimeMs.get();
206
207 long numKeysDelta = numKeys - priorNumKeys;
208 long totalOpTimeDelta = totalOpTime - priorCumulativeOpTime;
209
210 double averageKeysPerSecond =
211 (time > 0) ? (numKeys * 1000 / time) : 0;
212
213 LOG.info(threadsLeft
214 + "Keys="
215 + numKeys
216 + ", cols="
217 + StringUtils.humanReadableInt(numCols.get())
218 + ", time="
219 + formatTime(time)
220 + ((numKeys > 0 && time > 0) ? (" Overall: [" + "keys/s= "
221 + numKeys * 1000 / time + ", latency=" + totalOpTime
222 / numKeys + " ms]") : "")
223 + ((numKeysDelta > 0) ? (" Current: [" + "keys/s="
224 + numKeysDelta * 1000 / REPORTING_INTERVAL_MS + ", latency="
225 + totalOpTimeDelta / numKeysDelta + " ms]") : "")
226 + progressInfo());
227
228 if (streamingCounters) {
229 printStreamingCounters(numKeysDelta,
230 averageKeysPerSecond - priorAverageKeysPerSecond);
231 }
232
233 priorNumKeys = numKeys;
234 priorCumulativeOpTime = totalOpTime;
235 priorAverageKeysPerSecond = (int) averageKeysPerSecond;
236 }
237
238 Threads.sleep(REPORTING_INTERVAL_MS);
239 }
240 }
241
242 private void printStreamingCounters(long numKeysDelta,
243 double avgKeysPerSecondDelta) {
244
245
246 System.err.println("reporter:counter:numKeys," + reporterId + ","
247 + numKeysDelta);
248 System.err.println("reporter:counter:numCols," + reporterId + ","
249 + numCols.get());
250 System.err.println("reporter:counter:avgKeysPerSecond," + reporterId
251 + "," + (long) (avgKeysPerSecondDelta));
252 }
253 }
254
255 public void close() {
256 if (connection != null) {
257 try {
258 connection.close();
259 } catch (Exception ex) {
260 LOG.warn("Could not close the connection: " + ex);
261 }
262 }
263 }
264
265 public void waitForFinish() {
266 while (numThreadsWorking.get() != 0) {
267 Threads.sleepWithoutInterrupt(1000);
268 }
269 close();
270 }
271
272 public boolean isDone() {
273 return (numThreadsWorking.get() == 0);
274 }
275
276 protected void startThreads(Collection<? extends Thread> threads) {
277 numThreadsWorking.addAndGet(threads.size());
278 for (Thread thread : threads) {
279 thread.start();
280 }
281 }
282
283
284 public long getEndKey() {
285 return endKey;
286 }
287
288
/**
 * Supplies extra text for the periodic progress line; the progress reporter appends the
 * returned string verbatim after the throughput and latency figures.
 *
 * @return text to append to each progress report
 */
protected abstract String progressInfo();
290
291 protected static void appendToStatus(StringBuilder sb, String desc,
292 long v) {
293 if (v == 0) {
294 return;
295 }
296 sb.append(", ");
297 sb.append(desc);
298 sb.append("=");
299 sb.append(v);
300 }
301
302 protected static void appendToStatus(StringBuilder sb, String desc,
303 String v) {
304 sb.append(", ");
305 sb.append(desc);
306 sb.append("=");
307 sb.append(v);
308 }
309
310
311
312
313
314 public boolean verifyResultAgainstDataGenerator(Result result, boolean verifyValues) {
315 return verifyResultAgainstDataGenerator(result, verifyValues, false);
316 }
317
318
319
320
321
322
323
324
325
326
/**
 * Checks a row read back from the table against what the data generator and the recorded
 * mutation info say it should contain. Every failure is logged, followed by the region
 * locations of the row, before false is returned.
 *
 * While checking, the map obtained from {@code result.getFamilyMap(cf)} is modified: the
 * MUTATE_INFO and INCREMENT columns are removed and placeholders are inserted for some
 * deleted columns.
 *
 * @param result row to verify
 * @param verifyValues whether the increment column and the cell values are checked
 * @param verifyCfAndColumnIntegrity whether the family count and each family's column set
 *          are checked
 * @return false when the result is empty or any requested check fails; true otherwise,
 *         including when the result is non-empty and neither check is requested
 */
public boolean verifyResultAgainstDataGenerator(Result result, boolean verifyValues,
    boolean verifyCfAndColumnIntegrity) {
  String rowKeyStr = Bytes.toString(result.getRow());

  // An empty result is always a failure, whatever checks were requested.
  if (result.isEmpty()) {
    LOG.error("Error checking data for key [" + rowKeyStr + "], no data returned");
    printLocations(result);
    return false;
  }

  // Nothing more to check when neither kind of verification is requested.
  if (!verifyValues && !verifyCfAndColumnIntegrity) {
    return true;
  }

  // Integrity check: the row must contain exactly as many families as the generator has.
  byte[][] expectedCfs = dataGenerator.getColumnFamilies();
  if (verifyCfAndColumnIntegrity && (expectedCfs.length != result.getMap().size())) {
    LOG.error("Error checking data for key [" + rowKeyStr
      + "], bad family count: " + result.getMap().size());
    printLocations(result);
    return false;
  }

  // Verify each column family that is present in the result.
  for (byte[] cf : result.getMap().keySet()) {
    String cfStr = Bytes.toString(cf);
    Map<byte[], byte[]> columnValues = result.getFamilyMap(cf);
    if (columnValues == null) {
      LOG.error("Error checking data for key [" + rowKeyStr
        + "], no data for family [" + cfStr + "]]");
      printLocations(result);
      return false;
    }

    Map<String, MutationType> mutateInfo = null;
    // Always true at this point because of the early return above.
    if (verifyCfAndColumnIntegrity || verifyValues) {
      // The mutation-info column is mandatory in every family.
      if (!columnValues.containsKey(MUTATE_INFO)) {
        LOG.error("Error checking data for key [" + rowKeyStr + "], column family ["
          + cfStr + "], column [" + Bytes.toString(MUTATE_INFO) + "]; value is not found");
        printLocations(result);
        return false;
      }

      long cfHash = Arrays.hashCode(cf);

      // Take the mutation info out of the map so it is not treated as a data column.
      byte[] mutateInfoValue = columnValues.remove(MUTATE_INFO);
      mutateInfo = parseMutateInfo(mutateInfoValue);
      for (Map.Entry<String, MutationType> mutate: mutateInfo.entrySet()) {
        if (mutate.getValue() == MutationType.DELETE) {
          byte[] column = Bytes.toBytes(mutate.getKey());
          long columnHash = Arrays.hashCode(column);
          long hashCode = cfHash + columnHash;
          // Only deletes whose combined hash is even are expected to have removed the column.
          if (hashCode % 2 == 0) {
            if (columnValues.containsKey(column)) {
              LOG.error("Error checking data for key [" + rowKeyStr + "], column family ["
                + cfStr + "], column [" + mutate.getKey() + "]; should be deleted");
              printLocations(result);
              return false;
            }
            // Put a placeholder back so the column still counts below; the value loop
            // compares it against these same hash bytes.
            byte[] hashCodeBytes = Bytes.toBytes(hashCode);
            columnValues.put(column, hashCodeBytes);
          }
        }
      }

      // The increment column is mandatory too and is removed before the column checks.
      if (!columnValues.containsKey(INCREMENT)) {
        LOG.error("Error checking data for key [" + rowKeyStr + "], column family ["
          + cfStr + "], column [" + Bytes.toString(INCREMENT) + "]; value is not found");
        printLocations(result);
        return false;
      }
      long currentValue = Bytes.toLong(columnValues.remove(INCREMENT));
      if (verifyValues) {
        // The expected step is the family hash, or zero when no mutation was recorded.
        long amount = mutateInfo.isEmpty() ? 0 : cfHash;
        long originalValue = Arrays.hashCode(result.getRow());
        long extra = currentValue - originalValue;
        // Any difference from the original value must be a whole multiple of the step.
        if (extra != 0 && (amount == 0 || extra % amount != 0)) {
          LOG.error("Error checking data for key [" + rowKeyStr + "], column family ["
            + cfStr + "], column [increment], extra [" + extra + "], amount [" + amount + "]");
          printLocations(result);
          return false;
        }
        // A difference other than exactly one step is tolerated but reported.
        if (amount != 0 && extra != amount) {
          LOG.warn("Warning checking data for key [" + rowKeyStr + "], column family ["
            + cfStr + "], column [increment], incremented [" + (extra / amount) + "] times");
        }
      }

      // Integrity check: let the generator judge the remaining set of columns.
      if (verifyCfAndColumnIntegrity
          && !dataGenerator.verify(result.getRow(), cf, columnValues.keySet())) {
        String colsStr = "";
        for (byte[] col : columnValues.keySet()) {
          if (colsStr.length() > 0) {
            colsStr += ", ";
          }
          colsStr += "[" + Bytes.toString(col) + "]";
        }
        LOG.error("Error checking data for key [" + rowKeyStr
          + "], bad columns for family [" + cfStr + "]: " + colsStr);
        printLocations(result);
        return false;
      }

      if (verifyValues) {
        for (Map.Entry<byte[], byte[]> kv : columnValues.entrySet()) {
          String column = Bytes.toString(kv.getKey());
          MutationType mutation = mutateInfo.get(column);
          boolean verificationNeeded = true;
          byte[] bytes = kv.getValue();
          if (mutation != null) {
            boolean mutationVerified = true;
            long columnHash = Arrays.hashCode(kv.getKey());
            long hashCode = cfHash + columnHash;
            byte[] hashCodeBytes = Bytes.toBytes(hashCode);
            if (mutation == MutationType.APPEND) {
              // An appended value must end with the hash bytes and keep a non-empty prefix.
              int offset = bytes.length - hashCodeBytes.length;
              mutationVerified = offset > 0 && Bytes.equals(hashCodeBytes,
                0, hashCodeBytes.length, bytes, offset, hashCodeBytes.length);
              if (mutationVerified) {
                // Count further copies of the suffix in case the append ran more than once.
                int n = 1;
                while (true) {
                  int newOffset = offset - hashCodeBytes.length;
                  if (newOffset < 0 || !Bytes.equals(hashCodeBytes, 0,
                      hashCodeBytes.length, bytes, newOffset, hashCodeBytes.length)) {
                    break;
                  }
                  offset = newOffset;
                  n++;
                }
                if (n > 1) {
                  LOG.warn("Warning checking data for key [" + rowKeyStr + "], column family ["
                    + cfStr + "], column [" + column + "], appended [" + n + "] times");
                }
                // Strip the appended suffixes; only the prefix goes to the generator check.
                byte[] dest = new byte[offset];
                System.arraycopy(bytes, 0, dest, 0, offset);
                bytes = dest;
              }
            } else if (hashCode % 2 == 0) {
              // Non-append mutation with an even hash: the value must be exactly the hash
              // bytes, and the generator check is skipped.
              mutationVerified = Bytes.equals(bytes, hashCodeBytes);
              verificationNeeded = false;
            }
            if (!mutationVerified) {
              LOG.error("Error checking data for key [" + rowKeyStr
                + "], mutation checking failed for column family [" + cfStr + "], column ["
                + column + "]; mutation [" + mutation + "], hashCode ["
                + hashCode + "], verificationNeeded ["
                + verificationNeeded + "]");
              printLocations(result);
              return false;
            }
          }
          // Remaining values are checked by the generator.
          if (verificationNeeded &&
              !dataGenerator.verify(result.getRow(), cf, kv.getKey(), bytes)) {
            LOG.error("Error checking data for key [" + rowKeyStr + "], column family ["
              + cfStr + "], column [" + column + "], mutation [" + mutation
              + "]; value of length " + bytes.length);
            printLocations(result);
            return false;
          }
        }
      }
    }
  }
  return true;
}
494
495 private void printLocations(Result r) {
496 RegionLocations rl = null;
497 if (r == null) {
498 LOG.info("FAILED FOR null Result");
499 return;
500 }
501 LOG.info("FAILED FOR " + resultToString(r) + " Stale " + r.isStale());
502 if (r.getRow() == null) {
503 return;
504 }
505 try {
506 rl = ((ClusterConnection)connection).locateRegion(tableName, r.getRow(), true, true);
507 } catch (IOException e) {
508 LOG.warn("Couldn't get locations for row " + Bytes.toString(r.getRow()));
509 }
510 HRegionLocation locations[] = rl.getRegionLocations();
511 for (HRegionLocation h : locations) {
512 LOG.info("LOCATION " + h);
513 }
514 }
515
516 private String resultToString(Result result) {
517 StringBuilder sb = new StringBuilder();
518 sb.append("cells=");
519 if(result.isEmpty()) {
520 sb.append("NONE");
521 return sb.toString();
522 }
523 sb.append("{");
524 boolean moreThanOne = false;
525 for(Cell cell : result.listCells()) {
526 if(moreThanOne) {
527 sb.append(", ");
528 } else {
529 moreThanOne = true;
530 }
531 sb.append(CellUtil.toString(cell, true));
532 }
533 sb.append("}");
534 return sb.toString();
535 }
536
537
538 private Map<String, MutationType> parseMutateInfo(byte[] mutateInfo) {
539 Map<String, MutationType> mi = new HashMap<String, MutationType>();
540 if (mutateInfo != null) {
541 String mutateInfoStr = Bytes.toString(mutateInfo);
542 String[] mutations = mutateInfoStr.split("#");
543 for (String mutation: mutations) {
544 if (mutation.isEmpty()) continue;
545 Preconditions.checkArgument(mutation.contains(":"),
546 "Invalid mutation info " + mutation);
547 int p = mutation.indexOf(":");
548 String column = mutation.substring(0, p);
549 MutationType type = MutationType.valueOf(
550 Integer.parseInt(mutation.substring(p+1)));
551 mi.put(column, type);
552 }
553 }
554 return mi;
555 }
556 }