1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase;
20
21 import java.io.IOException;
22 import java.util.concurrent.TimeUnit;
23
24 import org.apache.commons.cli.CommandLine;
25 import org.apache.hadoop.conf.Configuration;
26 import org.apache.hadoop.fs.FSDataInputStream;
27 import org.apache.hadoop.fs.FileSystem;
28 import org.apache.hadoop.fs.Path;
29 import org.apache.hadoop.hbase.classification.InterfaceAudience;
30 import org.apache.hadoop.hbase.client.HTable;
31 import org.apache.hadoop.hbase.client.Connection;
32 import org.apache.hadoop.hbase.client.ConnectionFactory;
33 import org.apache.hadoop.hbase.client.Result;
34 import org.apache.hadoop.hbase.client.ResultScanner;
35 import org.apache.hadoop.hbase.client.Scan;
36 import org.apache.hadoop.hbase.client.Table;
37 import org.apache.hadoop.hbase.client.TableSnapshotScanner;
38 import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
39 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
40 import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
41 import org.apache.hadoop.hbase.mapreduce.TableMapper;
42 import org.apache.hadoop.hbase.util.AbstractHBaseTool;
43 import org.apache.hadoop.hbase.util.FSUtils;
44 import org.apache.hadoop.io.NullWritable;
45 import org.apache.hadoop.mapreduce.Counters;
46 import org.apache.hadoop.mapreduce.Job;
47 import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
48 import org.apache.hadoop.util.StringUtils;
49 import org.apache.hadoop.util.ToolRunner;
50
51 import com.google.common.base.Stopwatch;
52
53
54
55
56
57 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
58 public class ScanPerformanceEvaluation extends AbstractHBaseTool {
59
60 private static final String HBASE_COUNTER_GROUP_NAME = "HBase Counters";
61
62 private String type;
63 private String file;
64 private String tablename;
65 private String snapshotName;
66 private String restoreDir;
67 private String caching;
68
69 @Override
70 public void setConf(Configuration conf) {
71 super.setConf(conf);
72 Path rootDir;
73 try {
74 rootDir = FSUtils.getRootDir(conf);
75 rootDir.getFileSystem(conf);
76 } catch (IOException ex) {
77 throw new RuntimeException(ex);
78 }
79 }
80
81 @Override
82 protected void addOptions() {
83 this.addRequiredOptWithArg("t", "type", "the type of the test. One of the following: streaming|scan|snapshotscan|scanmapreduce|snapshotscanmapreduce");
84 this.addOptWithArg("f", "file", "the filename to read from");
85 this.addOptWithArg("tn", "table", "the tablename to read from");
86 this.addOptWithArg("sn", "snapshot", "the snapshot name to read from");
87 this.addOptWithArg("rs", "restoredir", "the directory to restore the snapshot");
88 this.addOptWithArg("ch", "caching", "scanner caching value");
89 }
90
91 @Override
92 protected void processOptions(CommandLine cmd) {
93 type = cmd.getOptionValue("type");
94 file = cmd.getOptionValue("file");
95 tablename = cmd.getOptionValue("table");
96 snapshotName = cmd.getOptionValue("snapshot");
97 restoreDir = cmd.getOptionValue("restoredir");
98 caching = cmd.getOptionValue("caching");
99 }
100
101 protected void testHdfsStreaming(Path filename) throws IOException {
102 byte[] buf = new byte[1024];
103 FileSystem fs = filename.getFileSystem(getConf());
104
105
106 Stopwatch fileOpenTimer = new Stopwatch();
107 Stopwatch streamTimer = new Stopwatch();
108
109 fileOpenTimer.start();
110 FSDataInputStream in = fs.open(filename);
111 fileOpenTimer.stop();
112
113 long totalBytes = 0;
114 streamTimer.start();
115 while (true) {
116 int read = in.read(buf);
117 if (read < 0) {
118 break;
119 }
120 totalBytes += read;
121 }
122 streamTimer.stop();
123
124 double throughput = (double)totalBytes / streamTimer.elapsedTime(TimeUnit.SECONDS);
125
126 System.out.println("HDFS streaming: ");
127 System.out.println("total time to open: " + fileOpenTimer.elapsedMillis() + " ms");
128 System.out.println("total time to read: " + streamTimer.elapsedMillis() + " ms");
129 System.out.println("total bytes: " + totalBytes + " bytes ("
130 + StringUtils.humanReadableInt(totalBytes) + ")");
131 System.out.println("throghput : " + StringUtils.humanReadableInt((long)throughput) + "B/s");
132 }
133
134 private Scan getScan() {
135 Scan scan = new Scan();
136 scan.setCacheBlocks(false);
137 scan.setMaxVersions(1);
138 scan.setScanMetricsEnabled(true);
139 if (caching != null) {
140 scan.setCaching(Integer.parseInt(caching));
141 }
142
143 return scan;
144 }
145
146 public void testScan() throws IOException {
147 Stopwatch tableOpenTimer = new Stopwatch();
148 Stopwatch scanOpenTimer = new Stopwatch();
149 Stopwatch scanTimer = new Stopwatch();
150
151 tableOpenTimer.start();
152 Table table = new HTable(getConf(), TableName.valueOf(tablename));
153 tableOpenTimer.stop();
154
155 Scan scan = getScan();
156 scanOpenTimer.start();
157 ResultScanner scanner = table.getScanner(scan);
158 scanOpenTimer.stop();
159
160 long numRows = 0;
161 long numCells = 0;
162 scanTimer.start();
163 while (true) {
164 Result result = scanner.next();
165 if (result == null) {
166 break;
167 }
168 numRows++;
169
170 numCells += result.rawCells().length;
171 }
172 scanTimer.stop();
173 scanner.close();
174 table.close();
175
176 ScanMetrics metrics = scan.getScanMetrics();
177 long totalBytes = metrics.countOfBytesInResults.get();
178 double throughput = (double)totalBytes / scanTimer.elapsedTime(TimeUnit.SECONDS);
179 double throughputRows = (double)numRows / scanTimer.elapsedTime(TimeUnit.SECONDS);
180 double throughputCells = (double)numCells / scanTimer.elapsedTime(TimeUnit.SECONDS);
181
182 System.out.println("HBase scan: ");
183 System.out.println("total time to open table: " + tableOpenTimer.elapsedMillis() + " ms");
184 System.out.println("total time to open scanner: " + scanOpenTimer.elapsedMillis() + " ms");
185 System.out.println("total time to scan: " + scanTimer.elapsedMillis() + " ms");
186
187 System.out.println("Scan metrics:\n" + metrics.getMetricsMap());
188
189 System.out.println("total bytes: " + totalBytes + " bytes ("
190 + StringUtils.humanReadableInt(totalBytes) + ")");
191 System.out.println("throughput : " + StringUtils.humanReadableInt((long)throughput) + "B/s");
192 System.out.println("total rows : " + numRows);
193 System.out.println("throughput : " + StringUtils.humanReadableInt((long)throughputRows) + " rows/s");
194 System.out.println("total cells : " + numCells);
195 System.out.println("throughput : " + StringUtils.humanReadableInt((long)throughputCells) + " cells/s");
196 }
197
198
199 public void testSnapshotScan() throws IOException {
200 Stopwatch snapshotRestoreTimer = new Stopwatch();
201 Stopwatch scanOpenTimer = new Stopwatch();
202 Stopwatch scanTimer = new Stopwatch();
203
204 Path restoreDir = new Path(this.restoreDir);
205
206 snapshotRestoreTimer.start();
207 restoreDir.getFileSystem(conf).delete(restoreDir, true);
208 snapshotRestoreTimer.stop();
209
210 Scan scan = getScan();
211 scanOpenTimer.start();
212 TableSnapshotScanner scanner = new TableSnapshotScanner(conf, restoreDir, snapshotName, scan);
213 scanOpenTimer.stop();
214
215 long numRows = 0;
216 long numCells = 0;
217 scanTimer.start();
218 while (true) {
219 Result result = scanner.next();
220 if (result == null) {
221 break;
222 }
223 numRows++;
224
225 numCells += result.rawCells().length;
226 }
227 scanTimer.stop();
228 scanner.close();
229
230 ScanMetrics metrics = scanner.getScanMetrics();
231 long totalBytes = metrics.countOfBytesInResults.get();
232 double throughput = (double)totalBytes / scanTimer.elapsedTime(TimeUnit.SECONDS);
233 double throughputRows = (double)numRows / scanTimer.elapsedTime(TimeUnit.SECONDS);
234 double throughputCells = (double)numCells / scanTimer.elapsedTime(TimeUnit.SECONDS);
235
236 System.out.println("HBase scan snapshot: ");
237 System.out.println("total time to restore snapshot: " + snapshotRestoreTimer.elapsedMillis() + " ms");
238 System.out.println("total time to open scanner: " + scanOpenTimer.elapsedMillis() + " ms");
239 System.out.println("total time to scan: " + scanTimer.elapsedMillis() + " ms");
240
241 System.out.println("Scan metrics:\n" + metrics.getMetricsMap());
242
243 System.out.println("total bytes: " + totalBytes + " bytes ("
244 + StringUtils.humanReadableInt(totalBytes) + ")");
245 System.out.println("throughput : " + StringUtils.humanReadableInt((long)throughput) + "B/s");
246 System.out.println("total rows : " + numRows);
247 System.out.println("throughput : " + StringUtils.humanReadableInt((long)throughputRows) + " rows/s");
248 System.out.println("total cells : " + numCells);
249 System.out.println("throughput : " + StringUtils.humanReadableInt((long)throughputCells) + " cells/s");
250
251 }
252
253 public static enum ScanCounter {
254 NUM_ROWS,
255 NUM_CELLS,
256 }
257
258 public static class MyMapper<KEYOUT, VALUEOUT> extends TableMapper<KEYOUT, VALUEOUT> {
259 @Override
260 protected void map(ImmutableBytesWritable key, Result value,
261 Context context) throws IOException,
262 InterruptedException {
263 context.getCounter(ScanCounter.NUM_ROWS).increment(1);
264 context.getCounter(ScanCounter.NUM_CELLS).increment(value.rawCells().length);
265 }
266 }
267
268 public void testScanMapReduce() throws IOException, InterruptedException, ClassNotFoundException {
269 Stopwatch scanOpenTimer = new Stopwatch();
270 Stopwatch scanTimer = new Stopwatch();
271
272 Scan scan = getScan();
273
274 String jobName = "testScanMapReduce";
275
276 Job job = new Job(conf);
277 job.setJobName(jobName);
278
279 job.setJarByClass(getClass());
280
281 TableMapReduceUtil.initTableMapperJob(
282 this.tablename,
283 scan,
284 MyMapper.class,
285 NullWritable.class,
286 NullWritable.class,
287 job
288 );
289
290 job.setNumReduceTasks(0);
291 job.setOutputKeyClass(NullWritable.class);
292 job.setOutputValueClass(NullWritable.class);
293 job.setOutputFormatClass(NullOutputFormat.class);
294
295 scanTimer.start();
296 job.waitForCompletion(true);
297 scanTimer.stop();
298
299 Counters counters = job.getCounters();
300 long numRows = counters.findCounter(ScanCounter.NUM_ROWS).getValue();
301 long numCells = counters.findCounter(ScanCounter.NUM_CELLS).getValue();
302
303 long totalBytes = counters.findCounter(HBASE_COUNTER_GROUP_NAME, "BYTES_IN_RESULTS").getValue();
304 double throughput = (double)totalBytes / scanTimer.elapsedTime(TimeUnit.SECONDS);
305 double throughputRows = (double)numRows / scanTimer.elapsedTime(TimeUnit.SECONDS);
306 double throughputCells = (double)numCells / scanTimer.elapsedTime(TimeUnit.SECONDS);
307
308 System.out.println("HBase scan mapreduce: ");
309 System.out.println("total time to open scanner: " + scanOpenTimer.elapsedMillis() + " ms");
310 System.out.println("total time to scan: " + scanTimer.elapsedMillis() + " ms");
311
312 System.out.println("total bytes: " + totalBytes + " bytes ("
313 + StringUtils.humanReadableInt(totalBytes) + ")");
314 System.out.println("throughput : " + StringUtils.humanReadableInt((long)throughput) + "B/s");
315 System.out.println("total rows : " + numRows);
316 System.out.println("throughput : " + StringUtils.humanReadableInt((long)throughputRows) + " rows/s");
317 System.out.println("total cells : " + numCells);
318 System.out.println("throughput : " + StringUtils.humanReadableInt((long)throughputCells) + " cells/s");
319 }
320
321 public void testSnapshotScanMapReduce() throws IOException, InterruptedException, ClassNotFoundException {
322 Stopwatch scanOpenTimer = new Stopwatch();
323 Stopwatch scanTimer = new Stopwatch();
324
325 Scan scan = getScan();
326
327 String jobName = "testSnapshotScanMapReduce";
328
329 Job job = new Job(conf);
330 job.setJobName(jobName);
331
332 job.setJarByClass(getClass());
333
334 TableMapReduceUtil.initTableSnapshotMapperJob(
335 this.snapshotName,
336 scan,
337 MyMapper.class,
338 NullWritable.class,
339 NullWritable.class,
340 job,
341 true,
342 new Path(restoreDir)
343 );
344
345 job.setNumReduceTasks(0);
346 job.setOutputKeyClass(NullWritable.class);
347 job.setOutputValueClass(NullWritable.class);
348 job.setOutputFormatClass(NullOutputFormat.class);
349
350 scanTimer.start();
351 job.waitForCompletion(true);
352 scanTimer.stop();
353
354 Counters counters = job.getCounters();
355 long numRows = counters.findCounter(ScanCounter.NUM_ROWS).getValue();
356 long numCells = counters.findCounter(ScanCounter.NUM_CELLS).getValue();
357
358 long totalBytes = counters.findCounter(HBASE_COUNTER_GROUP_NAME, "BYTES_IN_RESULTS").getValue();
359 double throughput = (double)totalBytes / scanTimer.elapsedTime(TimeUnit.SECONDS);
360 double throughputRows = (double)numRows / scanTimer.elapsedTime(TimeUnit.SECONDS);
361 double throughputCells = (double)numCells / scanTimer.elapsedTime(TimeUnit.SECONDS);
362
363 System.out.println("HBase scan mapreduce: ");
364 System.out.println("total time to open scanner: " + scanOpenTimer.elapsedMillis() + " ms");
365 System.out.println("total time to scan: " + scanTimer.elapsedMillis() + " ms");
366
367 System.out.println("total bytes: " + totalBytes + " bytes ("
368 + StringUtils.humanReadableInt(totalBytes) + ")");
369 System.out.println("throughput : " + StringUtils.humanReadableInt((long)throughput) + "B/s");
370 System.out.println("total rows : " + numRows);
371 System.out.println("throughput : " + StringUtils.humanReadableInt((long)throughputRows) + " rows/s");
372 System.out.println("total cells : " + numCells);
373 System.out.println("throughput : " + StringUtils.humanReadableInt((long)throughputCells) + " cells/s");
374 }
375
376 @Override
377 protected int doWork() throws Exception {
378 if (type.equals("streaming")) {
379 testHdfsStreaming(new Path(file));
380 } else if (type.equals("scan")){
381 testScan();
382 } else if (type.equals("snapshotscan")) {
383 testSnapshotScan();
384 } else if (type.equals("scanmapreduce")) {
385 testScanMapReduce();
386 } else if (type.equals("snapshotscanmapreduce")) {
387 testSnapshotScanMapReduce();
388 }
389 return 0;
390 }
391
392 public static void main (String[] args) throws Exception {
393 int ret = ToolRunner.run(HBaseConfiguration.create(), new ScanPerformanceEvaluation(), args);
394 System.exit(ret);
395 }
396 }