1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.apache.hadoop.hbase.util;
18
19 import java.io.IOException;
20 import java.util.Arrays;
21 import java.util.HashSet;
22 import java.util.Set;
23 import java.util.concurrent.atomic.AtomicLong;
24
25 import org.apache.commons.lang.math.RandomUtils;
26 import org.apache.commons.logging.Log;
27 import org.apache.commons.logging.LogFactory;
28 import org.apache.hadoop.conf.Configuration;
29 import org.apache.hadoop.hbase.HRegionLocation;
30 import org.apache.hadoop.hbase.TableName;
31 import org.apache.hadoop.hbase.client.Get;
32
33 import org.apache.hadoop.hbase.client.Consistency;
34 import org.apache.hadoop.hbase.client.HTableInterface;
35 import org.apache.hadoop.hbase.client.Result;
36 import org.apache.hadoop.hbase.client.Table;
37 import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator;
38
39
40 public class MultiThreadedReader extends MultiThreadedAction
41 {
42 private static final Log LOG = LogFactory.getLog(MultiThreadedReader.class);
43
44 protected Set<HBaseReaderThread> readers = new HashSet<HBaseReaderThread>();
45 private final double verifyPercent;
46 protected volatile boolean aborted;
47
48 protected MultiThreadedWriterBase writer = null;
49
50
51
52
53
54
55 private final AtomicLong numUniqueKeysVerified = new AtomicLong();
56
57
58
59
60
61 public static final int DEFAULT_MAX_ERRORS = 10;
62
63
64
65
66
67
68
69 public static final int DEFAULT_KEY_WINDOW = 0;
70
71
72
73
74 public static final int DEFAULT_BATCH_SIZE = 1;
75
76 protected AtomicLong numKeysVerified = new AtomicLong(0);
77 protected AtomicLong numReadErrors = new AtomicLong(0);
78 protected AtomicLong numReadFailures = new AtomicLong(0);
79 protected AtomicLong nullResult = new AtomicLong(0);
80 private int maxErrors = DEFAULT_MAX_ERRORS;
81 private int keyWindow = DEFAULT_KEY_WINDOW;
82 private int batchSize = DEFAULT_BATCH_SIZE;
83 private int regionReplicaId = -1;
84
85 public MultiThreadedReader(LoadTestDataGenerator dataGen, Configuration conf,
86 TableName tableName, double verifyPercent) throws IOException {
87 super(dataGen, conf, tableName, "R");
88 this.verifyPercent = verifyPercent;
89 }
90
91 public void linkToWriter(MultiThreadedWriterBase writer) {
92 this.writer = writer;
93 writer.setTrackWroteKeys(true);
94 }
95
96 public void setMaxErrors(int maxErrors) {
97 this.maxErrors = maxErrors;
98 }
99
100 public void setKeyWindow(int keyWindow) {
101 this.keyWindow = keyWindow;
102 }
103
104 public void setMultiGetBatchSize(int batchSize) {
105 this.batchSize = batchSize;
106 }
107
108 public void setRegionReplicaId(int regionReplicaId) {
109 this.regionReplicaId = regionReplicaId;
110 }
111
112 @Override
113 public void start(long startKey, long endKey, int numThreads) throws IOException {
114 super.start(startKey, endKey, numThreads);
115 if (verbose) {
116 LOG.debug("Reading keys [" + startKey + ", " + endKey + ")");
117 }
118
119 addReaderThreads(numThreads);
120 startThreads(readers);
121 }
122
123 protected void addReaderThreads(int numThreads) throws IOException {
124 for (int i = 0; i < numThreads; ++i) {
125 HBaseReaderThread reader = createReaderThread(i);
126 readers.add(reader);
127 }
128 }
129
130 protected HBaseReaderThread createReaderThread(int readerId) throws IOException {
131 HBaseReaderThread reader = new HBaseReaderThread(readerId);
132 Threads.setLoggingUncaughtExceptionHandler(reader);
133 return reader;
134 }
135
136 public class HBaseReaderThread extends Thread {
137 protected final int readerId;
138 protected final Table table;
139
140
141 private long curKey;
142
143
144 protected long startTimeMs;
145
146
147 private boolean readingRandomKey;
148
149 private boolean printExceptionTrace = true;
150
151
152
153
154
155 public HBaseReaderThread(int readerId) throws IOException {
156 this.readerId = readerId;
157 table = createTable();
158 setName(getClass().getSimpleName() + "_" + readerId);
159 }
160
161 protected HTableInterface createTable() throws IOException {
162 return connection.getTable(tableName);
163 }
164
165 @Override
166 public void run() {
167 try {
168 runReader();
169 } finally {
170 closeTable();
171 numThreadsWorking.decrementAndGet();
172 }
173 }
174
175 protected void closeTable() {
176 try {
177 if (table != null) {
178 table.close();
179 }
180 } catch (IOException e) {
181 LOG.error("Error closing table", e);
182 }
183 }
184
185 private void runReader() {
186 if (verbose) {
187 LOG.info("Started thread #" + readerId + " for reads...");
188 }
189
190 startTimeMs = System.currentTimeMillis();
191 curKey = startKey;
192 long [] keysForThisReader = new long[batchSize];
193 while (curKey < endKey && !aborted) {
194 int readingRandomKeyStartIndex = -1;
195 int numKeys = 0;
196
197 do {
198 long k = getNextKeyToRead();
199 if (k < startKey || k >= endKey) {
200 numReadErrors.incrementAndGet();
201 throw new AssertionError("Load tester logic error: proposed key " +
202 "to read " + k + " is out of range (startKey=" + startKey +
203 ", endKey=" + endKey + ")");
204 }
205 if (k % numThreads != readerId ||
206 writer != null && writer.failedToWriteKey(k)) {
207
208
209 continue;
210 }
211 keysForThisReader[numKeys] = k;
212 if (readingRandomKey && readingRandomKeyStartIndex == -1) {
213
214 readingRandomKeyStartIndex = numKeys;
215 }
216 numKeys++;
217 } while (numKeys < batchSize && curKey < endKey && !aborted);
218
219 if (numKeys > 0) {
220 readKey(keysForThisReader);
221
222 numUniqueKeysVerified.getAndAdd(readingRandomKeyStartIndex == -1 ?
223 numKeys : readingRandomKeyStartIndex);
224 }
225 }
226 }
227
228
229
230
231
232
233 private long maxKeyWeCanRead() {
234 long insertedUpToKey = writer.wroteUpToKey();
235 if (insertedUpToKey >= endKey - 1) {
236
237
238 return endKey - 1;
239 }
240 return Math.min(endKey - 1, writer.wroteUpToKey() - keyWindow);
241 }
242
243 protected long getNextKeyToRead() {
244 readingRandomKey = false;
245 if (writer == null || curKey <= maxKeyWeCanRead()) {
246 return curKey++;
247 }
248
249
250 long maxKeyToRead;
251 while ((maxKeyToRead = maxKeyWeCanRead()) < startKey) {
252
253
254
255 Threads.sleepWithoutInterrupt(50);
256 }
257
258 if (curKey <= maxKeyToRead) {
259
260
261 return curKey++;
262 }
263
264
265
266
267
268 readingRandomKey = true;
269 return startKey + Math.abs(RandomUtils.nextLong())
270 % (maxKeyToRead - startKey + 1);
271 }
272
273 private Get[] readKey(long[] keysToRead) {
274 Get [] gets = new Get[keysToRead.length];
275 int i = 0;
276 for (long keyToRead : keysToRead) {
277 try {
278 gets[i] = createGet(keyToRead);
279 if (keysToRead.length == 1) {
280 queryKey(gets[i], RandomUtils.nextInt(100) < verifyPercent, keyToRead);
281 }
282 i++;
283 } catch (IOException e) {
284 numReadFailures.addAndGet(1);
285 LOG.debug("[" + readerId + "] FAILED read, key = " + (keyToRead + "")
286 + ", time from start: "
287 + (System.currentTimeMillis() - startTimeMs) + " ms");
288 if (printExceptionTrace) {
289 LOG.warn(e);
290 printExceptionTrace = false;
291 }
292 }
293 }
294 if (keysToRead.length > 1) {
295 try {
296 queryKey(gets, RandomUtils.nextInt(100) < verifyPercent, keysToRead);
297 } catch (IOException e) {
298 numReadFailures.addAndGet(gets.length);
299 for (long keyToRead : keysToRead) {
300 LOG.debug("[" + readerId + "] FAILED read, key = " + (keyToRead + "")
301 + ", time from start: "
302 + (System.currentTimeMillis() - startTimeMs) + " ms");
303 }
304 if (printExceptionTrace) {
305 LOG.warn(e);
306 printExceptionTrace = false;
307 }
308 }
309 }
310 return gets;
311 }
312
313 protected Get createGet(long keyToRead) throws IOException {
314 Get get = new Get(dataGenerator.getDeterministicUniqueKey(keyToRead));
315 String cfsString = "";
316 byte[][] columnFamilies = dataGenerator.getColumnFamilies();
317 for (byte[] cf : columnFamilies) {
318 get.addFamily(cf);
319 if (verbose) {
320 if (cfsString.length() > 0) {
321 cfsString += ", ";
322 }
323 cfsString += "[" + Bytes.toStringBinary(cf) + "]";
324 }
325 }
326 get = dataGenerator.beforeGet(keyToRead, get);
327 if (regionReplicaId > 0) {
328 get.setReplicaId(regionReplicaId);
329 get.setConsistency(Consistency.TIMELINE);
330 }
331 if (verbose) {
332 LOG.info("[" + readerId + "] " + "Querying key " + keyToRead + ", cfs " + cfsString);
333 }
334 return get;
335 }
336
337 public void queryKey(Get[] gets, boolean verify, long[] keysToRead) throws IOException {
338
339 long start = System.nanoTime();
340
341 Result[] results = table.get(Arrays.asList(gets));
342 long end = System.nanoTime();
343 verifyResultsAndUpdateMetrics(verify, gets, end - start, results, table, false);
344 }
345
346 public void queryKey(Get get, boolean verify, long keyToRead) throws IOException {
347
348
349 long start = System.nanoTime();
350
351 Result result = table.get(get);
352 long end = System.nanoTime();
353 verifyResultsAndUpdateMetrics(verify, get, end - start, result, table, false);
354 }
355
356 protected void verifyResultsAndUpdateMetrics(boolean verify, Get[] gets, long elapsedNano,
357 Result[] results, Table table, boolean isNullExpected)
358 throws IOException {
359 totalOpTimeMs.addAndGet(elapsedNano / 1000000);
360 numKeys.addAndGet(gets.length);
361 int i = 0;
362 for (Result result : results) {
363 verifyResultsAndUpdateMetricsOnAPerGetBasis(verify, gets[i++], result, table,
364 isNullExpected);
365 }
366 }
367
368 protected void verifyResultsAndUpdateMetrics(boolean verify, Get get, long elapsedNano,
369 Result result, Table table, boolean isNullExpected)
370 throws IOException {
371 verifyResultsAndUpdateMetrics(verify, new Get[]{get}, elapsedNano,
372 new Result[]{result}, table, isNullExpected);
373 }
374
375 private void verifyResultsAndUpdateMetricsOnAPerGetBasis(boolean verify, Get get,
376 Result result, Table table, boolean isNullExpected) throws IOException {
377 if (!result.isEmpty()) {
378 if (verify) {
379 numKeysVerified.incrementAndGet();
380 }
381 } else {
382 HRegionLocation hloc = connection.getRegionLocation(tableName,
383 get.getRow(), false);
384 String rowKey = Bytes.toString(get.getRow());
385 LOG.info("Key = " + rowKey + ", Region location: " + hloc);
386 if(isNullExpected) {
387 nullResult.incrementAndGet();
388 LOG.debug("Null result obtained for the key ="+rowKey);
389 return;
390 }
391 }
392 boolean isOk = verifyResultAgainstDataGenerator(result, verify, false);
393 long numErrorsAfterThis = 0;
394 if (isOk) {
395 long cols = 0;
396
397 for (byte[] cf : result.getMap().keySet()) {
398 cols += result.getFamilyMap(cf).size();
399 }
400 numCols.addAndGet(cols);
401 } else {
402 if (writer != null) {
403 LOG.error("At the time of failure, writer wrote " + writer.numKeys.get() + " keys");
404 }
405 numErrorsAfterThis = numReadErrors.incrementAndGet();
406 }
407
408 if (numErrorsAfterThis > maxErrors) {
409 LOG.error("Aborting readers -- found more than " + maxErrors + " errors");
410 aborted = true;
411 }
412 }
413 }
414
415 public long getNumReadFailures() {
416 return numReadFailures.get();
417 }
418
419 public long getNumReadErrors() {
420 return numReadErrors.get();
421 }
422
423 public long getNumKeysVerified() {
424 return numKeysVerified.get();
425 }
426
427 public long getNumUniqueKeysVerified() {
428 return numUniqueKeysVerified.get();
429 }
430
431 public long getNullResultsCount() {
432 return nullResult.get();
433 }
434
435 @Override
436 protected String progressInfo() {
437 StringBuilder sb = new StringBuilder();
438 appendToStatus(sb, "verified", numKeysVerified.get());
439 appendToStatus(sb, "READ FAILURES", numReadFailures.get());
440 appendToStatus(sb, "READ ERRORS", numReadErrors.get());
441 appendToStatus(sb, "NULL RESULT", nullResult.get());
442 return sb.toString();
443 }
444 }