1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.snapshot;
20
21 import java.io.BufferedInputStream;
22 import java.io.FileNotFoundException;
23 import java.io.DataInput;
24 import java.io.DataOutput;
25 import java.io.IOException;
26 import java.io.InputStream;
27 import java.util.ArrayList;
28 import java.util.Collections;
29 import java.util.Comparator;
30 import java.util.LinkedList;
31 import java.util.List;
32 import java.util.Random;
33
34 import org.apache.commons.logging.Log;
35 import org.apache.commons.logging.LogFactory;
36 import org.apache.hadoop.hbase.classification.InterfaceAudience;
37 import org.apache.hadoop.hbase.classification.InterfaceStability;
38 import org.apache.hadoop.conf.Configuration;
39 import org.apache.hadoop.conf.Configured;
40 import org.apache.hadoop.fs.FSDataInputStream;
41 import org.apache.hadoop.fs.FSDataOutputStream;
42 import org.apache.hadoop.fs.FileChecksum;
43 import org.apache.hadoop.fs.FileStatus;
44 import org.apache.hadoop.fs.FileSystem;
45 import org.apache.hadoop.fs.FileUtil;
46 import org.apache.hadoop.fs.Path;
47 import org.apache.hadoop.fs.permission.FsPermission;
48 import org.apache.hadoop.hbase.TableName;
49 import org.apache.hadoop.hbase.HBaseConfiguration;
50 import org.apache.hadoop.hbase.HConstants;
51 import org.apache.hadoop.hbase.HRegionInfo;
52 import org.apache.hadoop.hbase.io.FileLink;
53 import org.apache.hadoop.hbase.io.HFileLink;
54 import org.apache.hadoop.hbase.io.WALLink;
55 import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
56 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
57 import org.apache.hadoop.hbase.protobuf.generated.SnapshotProtos.SnapshotFileInfo;
58 import org.apache.hadoop.hbase.protobuf.generated.SnapshotProtos.SnapshotRegionManifest;
59 import org.apache.hadoop.hbase.util.FSUtils;
60 import org.apache.hadoop.hbase.util.Pair;
61 import org.apache.hadoop.io.BytesWritable;
62 import org.apache.hadoop.io.IOUtils;
63 import org.apache.hadoop.io.NullWritable;
64 import org.apache.hadoop.io.Writable;
65 import org.apache.hadoop.mapreduce.Job;
66 import org.apache.hadoop.mapreduce.JobContext;
67 import org.apache.hadoop.mapreduce.Mapper;
68 import org.apache.hadoop.mapreduce.InputFormat;
69 import org.apache.hadoop.mapreduce.InputSplit;
70 import org.apache.hadoop.mapreduce.RecordReader;
71 import org.apache.hadoop.mapreduce.TaskAttemptContext;
72 import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
73 import org.apache.hadoop.mapreduce.security.TokenCache;
74 import org.apache.hadoop.hbase.io.hadoopbackport.ThrottledInputStream;
75 import org.apache.hadoop.util.StringUtils;
76 import org.apache.hadoop.util.Tool;
77 import org.apache.hadoop.util.ToolRunner;
78
79
80
81
82
83
84
85
86 @InterfaceAudience.Public
87 @InterfaceStability.Evolving
88 public class ExportSnapshot extends Configured implements Tool {
/** Tool name; also the stem of the two configuration prefixes below. */
public static final String NAME = "exportsnapshot";

/** Prefix handed to HBaseConfiguration.createClusterConf to build the source-side configuration. */
public static final String CONF_SOURCE_PREFIX = NAME + ".from.";

/** Prefix handed to HBaseConfiguration.createClusterConf to build the destination-side configuration. */
public static final String CONF_DEST_PREFIX = NAME + ".to.";

private static final Log LOG = LogFactory.getLog(ExportSnapshot.class);

// Job configuration keys. They are written by runCopyJob()/getSplits() and read back by the
// input format and the mapper.
// Set together with CONF_NUM_SPLITS whenever the number of mappers is decided.
private static final String MR_NUM_MAPS = "mapreduce.job.maps";
// Number of splits; a value of 0 makes getSplits() derive it from the number of files.
private static final String CONF_NUM_SPLITS = "snapshot.export.format.splits";
private static final String CONF_SNAPSHOT_NAME = "snapshot.export.format.snapshot.name";
// Snapshot directory that getSplits() scans for the files to export.
private static final String CONF_SNAPSHOT_DIR = "snapshot.export.format.snapshot.dir";
// Optional owner/group/mode applied to the exported files and directories.
private static final String CONF_FILES_USER = "snapshot.export.files.attributes.user";
private static final String CONF_FILES_GROUP = "snapshot.export.files.attributes.group";
private static final String CONF_FILES_MODE = "snapshot.export.files.attributes.mode";
// Read with a default of true by the mapper; controls the checksum step of sameFile().
private static final String CONF_CHECKSUM_VERIFY = "snapshot.export.checksum.verify";
private static final String CONF_OUTPUT_ROOT = "snapshot.export.output.root";
private static final String CONF_INPUT_ROOT = "snapshot.export.input.root";
// Copy buffer size; the mapper defaults it from the output file-system block size.
private static final String CONF_BUFFER_SIZE = "snapshot.export.buffer.size";
// Divisor used by getSplits() (default 10) when the number of splits is not configured.
private static final String CONF_MAP_GROUP = "snapshot.export.default.map.group";
// Throttle value read by the mapper; Integer.MAX_VALUE disables the throttled stream.
private static final String CONF_BANDWIDTH_MB = "snapshot.export.map.bandwidth.mb";
// When true, run() copies the manifest straight to the completed snapshot directory.
protected static final String CONF_SKIP_TMP = "snapshot.export.skip.tmp";

// Test-only switches consulted by ExportMapper.injectTestFailure().
static final String CONF_TEST_FAILURE = "test.snapshot.export.failure";
static final String CONF_TEST_RETRY = "test.snapshot.export.failure.retry";

// NOTE(review): not referenced in the visible part of this file - confirm it is still needed.
private static final String INPUT_FOLDER_PREFIX = "export-files.";
116
117
/**
 * Job counters maintained by the export mapper. The declaration order is kept stable so that
 * ordinals do not change.
 */
public enum Counter {
  /** Source files that could not be opened or whose status could not be found. */
  MISSING_FILES,
  /** Files whose copy completed. */
  FILES_COPIED,
  /** Files skipped because an identical file already exists at the destination. */
  FILES_SKIPPED,
  /** Files whose copy failed. */
  COPY_FAILED,
  /** Sum of the source lengths of the files that were attempted. */
  BYTES_EXPECTED,
  /** Sum of the lengths of the skipped files. */
  BYTES_SKIPPED,
  /** Bytes actually written to the destination. */
  BYTES_COPIED
}
122
123 private static class ExportMapper extends Mapper<BytesWritable, NullWritable,
124 NullWritable, NullWritable> {
// copyData() flushes the BYTES_COPIED counter and the task status once at least this many
// bytes have been copied since the last report.
final static int REPORT_SIZE = 1 * 1024 * 1024;
// Lower bound used by setup() when deriving the default copy buffer size.
final static int BUFFER_SIZE = 64 * 1024;

// Test hooks: testFailures enables injectTestFailure(); random is created lazily there.
private boolean testFailures;
private Random random;

// Settings loaded from the job configuration in setup().
private boolean verifyChecksum;
private String filesGroup;
private String filesUser;
private short filesMode;
private int bufferSize;

// Destination side: file system handle, archive directory and root.
private FileSystem outputFs;
private Path outputArchive;
private Path outputRoot;

// Source side: file system handle, archive directory and root.
private FileSystem inputFs;
private Path inputArchive;
private Path inputRoot;
144
145 @Override
146 public void setup(Context context) throws IOException {
147 Configuration conf = context.getConfiguration();
148 Configuration srcConf = HBaseConfiguration.createClusterConf(conf, null, CONF_SOURCE_PREFIX);
149 Configuration destConf = HBaseConfiguration.createClusterConf(conf, null, CONF_DEST_PREFIX);
150
151 verifyChecksum = conf.getBoolean(CONF_CHECKSUM_VERIFY, true);
152
153 filesGroup = conf.get(CONF_FILES_GROUP);
154 filesUser = conf.get(CONF_FILES_USER);
155 filesMode = (short)conf.getInt(CONF_FILES_MODE, 0);
156 outputRoot = new Path(conf.get(CONF_OUTPUT_ROOT));
157 inputRoot = new Path(conf.get(CONF_INPUT_ROOT));
158
159 inputArchive = new Path(inputRoot, HConstants.HFILE_ARCHIVE_DIRECTORY);
160 outputArchive = new Path(outputRoot, HConstants.HFILE_ARCHIVE_DIRECTORY);
161
162 testFailures = conf.getBoolean(CONF_TEST_FAILURE, false);
163
164 try {
165 srcConf.setBoolean("fs." + inputRoot.toUri().getScheme() + ".impl.disable.cache", true);
166 inputFs = FileSystem.get(inputRoot.toUri(), srcConf);
167 } catch (IOException e) {
168 throw new IOException("Could not get the input FileSystem with root=" + inputRoot, e);
169 }
170
171 try {
172 destConf.setBoolean("fs." + outputRoot.toUri().getScheme() + ".impl.disable.cache", true);
173 outputFs = FileSystem.get(outputRoot.toUri(), destConf);
174 } catch (IOException e) {
175 throw new IOException("Could not get the output FileSystem with root="+ outputRoot, e);
176 }
177
178
179 int defaultBlockSize = Math.max((int) outputFs.getDefaultBlockSize(outputRoot), BUFFER_SIZE);
180 bufferSize = conf.getInt(CONF_BUFFER_SIZE, defaultBlockSize);
181 LOG.info("Using bufferSize=" + StringUtils.humanReadableInt(bufferSize));
182
183 for (Counter c : Counter.values()) {
184 context.getCounter(c).increment(0);
185 }
186 }
187
188 @Override
189 protected void cleanup(Context context) {
190 IOUtils.closeStream(inputFs);
191 IOUtils.closeStream(outputFs);
192 }
193
194 @Override
195 public void map(BytesWritable key, NullWritable value, Context context)
196 throws InterruptedException, IOException {
197 SnapshotFileInfo inputInfo = SnapshotFileInfo.parseFrom(key.copyBytes());
198 Path outputPath = getOutputPath(inputInfo);
199
200 copyFile(context, inputInfo, outputPath);
201 }
202
203
204
205
206 private Path getOutputPath(final SnapshotFileInfo inputInfo) throws IOException {
207 Path path = null;
208 switch (inputInfo.getType()) {
209 case HFILE:
210 Path inputPath = new Path(inputInfo.getHfile());
211 String family = inputPath.getParent().getName();
212 TableName table =HFileLink.getReferencedTableName(inputPath.getName());
213 String region = HFileLink.getReferencedRegionName(inputPath.getName());
214 String hfile = HFileLink.getReferencedHFileName(inputPath.getName());
215 path = new Path(FSUtils.getTableDir(new Path("./"), table),
216 new Path(region, new Path(family, hfile)));
217 break;
218 case WAL:
219 Path oldLogsDir = new Path(outputRoot, HConstants.HREGION_OLDLOGDIR_NAME);
220 path = new Path(oldLogsDir, inputInfo.getWalName());
221 break;
222 default:
223 throw new IOException("Invalid File Type: " + inputInfo.getType().toString());
224 }
225 return new Path(outputArchive, path);
226 }
227
228
229
230
231 private void injectTestFailure(final Context context, final SnapshotFileInfo inputInfo)
232 throws IOException {
233 if (testFailures) {
234 if (context.getConfiguration().getBoolean(CONF_TEST_RETRY, false)) {
235 if (random == null) {
236 random = new Random();
237 }
238
239
240
241
242 if (random.nextFloat() < 0.03) {
243 throw new IOException("TEST RETRY FAILURE: Unable to copy input=" + inputInfo
244 + " time=" + System.currentTimeMillis());
245 }
246 } else {
247 context.getCounter(Counter.COPY_FAILED).increment(1);
248 throw new IOException("TEST FAILURE: Unable to copy input=" + inputInfo);
249 }
250 }
251 }
252
253 private void copyFile(final Context context, final SnapshotFileInfo inputInfo,
254 final Path outputPath) throws IOException {
255 injectTestFailure(context, inputInfo);
256
257
258 FileStatus inputStat = getSourceFileStatus(context, inputInfo);
259
260
261 if (outputFs.exists(outputPath)) {
262 FileStatus outputStat = outputFs.getFileStatus(outputPath);
263 if (outputStat != null && sameFile(inputStat, outputStat)) {
264 LOG.info("Skip copy " + inputStat.getPath() + " to " + outputPath + ", same file.");
265 context.getCounter(Counter.FILES_SKIPPED).increment(1);
266 context.getCounter(Counter.BYTES_SKIPPED).increment(inputStat.getLen());
267 return;
268 }
269 }
270
271 InputStream in = openSourceFile(context, inputInfo);
272 int bandwidthMB = context.getConfiguration().getInt(CONF_BANDWIDTH_MB, 100);
273 if (Integer.MAX_VALUE != bandwidthMB) {
274 in = new ThrottledInputStream(new BufferedInputStream(in), bandwidthMB * 1024L * 1024L);
275 }
276
277 try {
278 context.getCounter(Counter.BYTES_EXPECTED).increment(inputStat.getLen());
279
280
281 createOutputPath(outputPath.getParent());
282 FSDataOutputStream out = outputFs.create(outputPath, true);
283 try {
284 copyData(context, inputStat.getPath(), in, outputPath, out, inputStat.getLen());
285 } finally {
286 out.close();
287 }
288
289
290 if (!preserveAttributes(outputPath, inputStat)) {
291 LOG.warn("You may have to run manually chown on: " + outputPath);
292 }
293 } finally {
294 in.close();
295 }
296 }
297
298
299
300
301 private void createOutputPath(final Path path) throws IOException {
302 if (filesUser == null && filesGroup == null) {
303 outputFs.mkdirs(path);
304 } else {
305 Path parent = path.getParent();
306 if (!outputFs.exists(parent) && !parent.isRoot()) {
307 createOutputPath(parent);
308 }
309 outputFs.mkdirs(path);
310 if (filesUser != null || filesGroup != null) {
311
312 outputFs.setOwner(path, filesUser, filesGroup);
313 }
314 if (filesMode > 0) {
315 outputFs.setPermission(path, new FsPermission(filesMode));
316 }
317 }
318 }
319
320
321
322
323
324
325
326
327
328 private boolean preserveAttributes(final Path path, final FileStatus refStat) {
329 FileStatus stat;
330 try {
331 stat = outputFs.getFileStatus(path);
332 } catch (IOException e) {
333 LOG.warn("Unable to get the status for file=" + path);
334 return false;
335 }
336
337 try {
338 if (filesMode > 0 && stat.getPermission().toShort() != filesMode) {
339 outputFs.setPermission(path, new FsPermission(filesMode));
340 } else if (refStat != null && !stat.getPermission().equals(refStat.getPermission())) {
341 outputFs.setPermission(path, refStat.getPermission());
342 }
343 } catch (IOException e) {
344 LOG.warn("Unable to set the permission for file="+ stat.getPath() +": "+ e.getMessage());
345 return false;
346 }
347
348 boolean hasRefStat = (refStat != null);
349 String user = stringIsNotEmpty(filesUser) || !hasRefStat ? filesUser : refStat.getOwner();
350 String group = stringIsNotEmpty(filesGroup) || !hasRefStat ? filesGroup : refStat.getGroup();
351 if (stringIsNotEmpty(user) || stringIsNotEmpty(group)) {
352 try {
353 if (!(user.equals(stat.getOwner()) && group.equals(stat.getGroup()))) {
354 outputFs.setOwner(path, user, group);
355 }
356 } catch (IOException e) {
357 LOG.warn("Unable to set the owner/group for file="+ stat.getPath() +": "+ e.getMessage());
358 LOG.warn("The user/group may not exist on the destination cluster: user=" +
359 user + " group=" + group);
360 return false;
361 }
362 }
363
364 return true;
365 }
366
367 private boolean stringIsNotEmpty(final String str) {
368 return str != null && str.length() > 0;
369 }
370
371 private void copyData(final Context context,
372 final Path inputPath, final InputStream in,
373 final Path outputPath, final FSDataOutputStream out,
374 final long inputFileSize)
375 throws IOException {
376 final String statusMessage = "copied %s/" + StringUtils.humanReadableInt(inputFileSize) +
377 " (%.1f%%)";
378
379 try {
380 byte[] buffer = new byte[bufferSize];
381 long totalBytesWritten = 0;
382 int reportBytes = 0;
383 int bytesRead;
384
385 long stime = System.currentTimeMillis();
386 while ((bytesRead = in.read(buffer)) > 0) {
387 out.write(buffer, 0, bytesRead);
388 totalBytesWritten += bytesRead;
389 reportBytes += bytesRead;
390
391 if (reportBytes >= REPORT_SIZE) {
392 context.getCounter(Counter.BYTES_COPIED).increment(reportBytes);
393 context.setStatus(String.format(statusMessage,
394 StringUtils.humanReadableInt(totalBytesWritten),
395 (totalBytesWritten/(float)inputFileSize) * 100.0f) +
396 " from " + inputPath + " to " + outputPath);
397 reportBytes = 0;
398 }
399 }
400 long etime = System.currentTimeMillis();
401
402 context.getCounter(Counter.BYTES_COPIED).increment(reportBytes);
403 context.setStatus(String.format(statusMessage,
404 StringUtils.humanReadableInt(totalBytesWritten),
405 (totalBytesWritten/(float)inputFileSize) * 100.0f) +
406 " from " + inputPath + " to " + outputPath);
407
408
409 if (totalBytesWritten != inputFileSize) {
410 String msg = "number of bytes copied not matching copied=" + totalBytesWritten +
411 " expected=" + inputFileSize + " for file=" + inputPath;
412 throw new IOException(msg);
413 }
414
415 LOG.info("copy completed for input=" + inputPath + " output=" + outputPath);
416 LOG.info("size=" + totalBytesWritten +
417 " (" + StringUtils.humanReadableInt(totalBytesWritten) + ")" +
418 " time=" + StringUtils.formatTimeDiff(etime, stime) +
419 String.format(" %.3fM/sec", (totalBytesWritten / ((etime - stime)/1000.0))/1048576.0));
420 context.getCounter(Counter.FILES_COPIED).increment(1);
421 } catch (IOException e) {
422 LOG.error("Error copying " + inputPath + " to " + outputPath, e);
423 context.getCounter(Counter.COPY_FAILED).increment(1);
424 throw e;
425 }
426 }
427
428
429
430
431
432
433 private FSDataInputStream openSourceFile(Context context, final SnapshotFileInfo fileInfo)
434 throws IOException {
435 try {
436 Configuration conf = context.getConfiguration();
437 FileLink link = null;
438 switch (fileInfo.getType()) {
439 case HFILE:
440 Path inputPath = new Path(fileInfo.getHfile());
441 link = HFileLink.buildFromHFileLinkPattern(conf, inputPath);
442 break;
443 case WAL:
444 String serverName = fileInfo.getWalServer();
445 String logName = fileInfo.getWalName();
446 link = new WALLink(inputRoot, serverName, logName);
447 break;
448 default:
449 throw new IOException("Invalid File Type: " + fileInfo.getType().toString());
450 }
451 return link.open(inputFs);
452 } catch (IOException e) {
453 context.getCounter(Counter.MISSING_FILES).increment(1);
454 LOG.error("Unable to open source file=" + fileInfo.toString(), e);
455 throw e;
456 }
457 }
458
459 private FileStatus getSourceFileStatus(Context context, final SnapshotFileInfo fileInfo)
460 throws IOException {
461 try {
462 Configuration conf = context.getConfiguration();
463 FileLink link = null;
464 switch (fileInfo.getType()) {
465 case HFILE:
466 Path inputPath = new Path(fileInfo.getHfile());
467 link = HFileLink.buildFromHFileLinkPattern(conf, inputPath);
468 break;
469 case WAL:
470 link = new WALLink(inputRoot, fileInfo.getWalServer(), fileInfo.getWalName());
471 break;
472 default:
473 throw new IOException("Invalid File Type: " + fileInfo.getType().toString());
474 }
475 return link.getFileStatus(inputFs);
476 } catch (FileNotFoundException e) {
477 context.getCounter(Counter.MISSING_FILES).increment(1);
478 LOG.error("Unable to get the status for source file=" + fileInfo.toString(), e);
479 throw e;
480 } catch (IOException e) {
481 LOG.error("Unable to get the status for source file=" + fileInfo.toString(), e);
482 throw e;
483 }
484 }
485
486 private FileChecksum getFileChecksum(final FileSystem fs, final Path path) {
487 try {
488 return fs.getFileChecksum(path);
489 } catch (IOException e) {
490 LOG.warn("Unable to get checksum for file=" + path, e);
491 return null;
492 }
493 }
494
495
496
497
498
499 private boolean sameFile(final FileStatus inputStat, final FileStatus outputStat) {
500
501 if (inputStat.getLen() != outputStat.getLen()) return false;
502
503
504 if (!verifyChecksum) return true;
505
506
507 FileChecksum inChecksum = getFileChecksum(inputFs, inputStat.getPath());
508 if (inChecksum == null) return false;
509
510 FileChecksum outChecksum = getFileChecksum(outputFs, outputStat.getPath());
511 if (outChecksum == null) return false;
512
513 return inChecksum.equals(outChecksum);
514 }
515 }
516
517
518
519
520
521
522
523
524
525 private static List<Pair<SnapshotFileInfo, Long>> getSnapshotFiles(final Configuration conf,
526 final FileSystem fs, final Path snapshotDir) throws IOException {
527 SnapshotDescription snapshotDesc = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir);
528
529 final List<Pair<SnapshotFileInfo, Long>> files = new ArrayList<Pair<SnapshotFileInfo, Long>>();
530 final TableName table = TableName.valueOf(snapshotDesc.getTable());
531
532
533 LOG.info("Loading Snapshot '" + snapshotDesc.getName() + "' hfile list");
534 SnapshotReferenceUtil.visitReferencedFiles(conf, fs, snapshotDir, snapshotDesc,
535 new SnapshotReferenceUtil.SnapshotVisitor() {
536 @Override
537 public void storeFile(final HRegionInfo regionInfo, final String family,
538 final SnapshotRegionManifest.StoreFile storeFile) throws IOException {
539 if (storeFile.hasReference()) {
540
541 } else {
542 String region = regionInfo.getEncodedName();
543 String hfile = storeFile.getName();
544 Path path = HFileLink.createPath(table, region, family, hfile);
545
546 SnapshotFileInfo fileInfo = SnapshotFileInfo.newBuilder()
547 .setType(SnapshotFileInfo.Type.HFILE)
548 .setHfile(path.toString())
549 .build();
550
551 long size;
552 if (storeFile.hasFileSize()) {
553 size = storeFile.getFileSize();
554 } else {
555 size = HFileLink.buildFromHFileLinkPattern(conf, path).getFileStatus(fs).getLen();
556 }
557 files.add(new Pair<SnapshotFileInfo, Long>(fileInfo, size));
558 }
559 }
560
561 @Override
562 public void logFile (final String server, final String logfile)
563 throws IOException {
564 SnapshotFileInfo fileInfo = SnapshotFileInfo.newBuilder()
565 .setType(SnapshotFileInfo.Type.WAL)
566 .setWalServer(server)
567 .setWalName(logfile)
568 .build();
569
570 long size = new WALLink(conf, server, logfile).getFileStatus(fs).getLen();
571 files.add(new Pair<SnapshotFileInfo, Long>(fileInfo, size));
572 }
573 });
574
575 return files;
576 }
577
578
579
580
581
582
583
584
585
586 static List<List<Pair<SnapshotFileInfo, Long>>> getBalancedSplits(
587 final List<Pair<SnapshotFileInfo, Long>> files, final int ngroups) {
588
589 Collections.sort(files, new Comparator<Pair<SnapshotFileInfo, Long>>() {
590 public int compare(Pair<SnapshotFileInfo, Long> a, Pair<SnapshotFileInfo, Long> b) {
591 long r = a.getSecond() - b.getSecond();
592 return (r < 0) ? -1 : ((r > 0) ? 1 : 0);
593 }
594 });
595
596
597 List<List<Pair<SnapshotFileInfo, Long>>> fileGroups =
598 new LinkedList<List<Pair<SnapshotFileInfo, Long>>>();
599 long[] sizeGroups = new long[ngroups];
600 int hi = files.size() - 1;
601 int lo = 0;
602
603 List<Pair<SnapshotFileInfo, Long>> group;
604 int dir = 1;
605 int g = 0;
606
607 while (hi >= lo) {
608 if (g == fileGroups.size()) {
609 group = new LinkedList<Pair<SnapshotFileInfo, Long>>();
610 fileGroups.add(group);
611 } else {
612 group = fileGroups.get(g);
613 }
614
615 Pair<SnapshotFileInfo, Long> fileInfo = files.get(hi--);
616
617
618 sizeGroups[g] += fileInfo.getSecond();
619 group.add(fileInfo);
620
621
622 g += dir;
623 if (g == ngroups) {
624 dir = -1;
625 g = ngroups - 1;
626 } else if (g < 0) {
627 dir = 1;
628 g = 0;
629 }
630 }
631
632 if (LOG.isDebugEnabled()) {
633 for (int i = 0; i < sizeGroups.length; ++i) {
634 LOG.debug("export split=" + i + " size=" + StringUtils.humanReadableInt(sizeGroups[i]));
635 }
636 }
637
638 return fileGroups;
639 }
640
641 private static class ExportSnapshotInputFormat extends InputFormat<BytesWritable, NullWritable> {
642 @Override
643 public RecordReader<BytesWritable, NullWritable> createRecordReader(InputSplit split,
644 TaskAttemptContext tac) throws IOException, InterruptedException {
645 return new ExportSnapshotRecordReader(((ExportSnapshotInputSplit)split).getSplitKeys());
646 }
647
648 @Override
649 public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException {
650 Configuration conf = context.getConfiguration();
651 Path snapshotDir = new Path(conf.get(CONF_SNAPSHOT_DIR));
652 FileSystem fs = FileSystem.get(snapshotDir.toUri(), conf);
653
654 List<Pair<SnapshotFileInfo, Long>> snapshotFiles = getSnapshotFiles(conf, fs, snapshotDir);
655 int mappers = conf.getInt(CONF_NUM_SPLITS, 0);
656 if (mappers == 0 && snapshotFiles.size() > 0) {
657 mappers = 1 + (snapshotFiles.size() / conf.getInt(CONF_MAP_GROUP, 10));
658 mappers = Math.min(mappers, snapshotFiles.size());
659 conf.setInt(CONF_NUM_SPLITS, mappers);
660 conf.setInt(MR_NUM_MAPS, mappers);
661 }
662
663 List<List<Pair<SnapshotFileInfo, Long>>> groups = getBalancedSplits(snapshotFiles, mappers);
664 List<InputSplit> splits = new ArrayList(groups.size());
665 for (List<Pair<SnapshotFileInfo, Long>> files: groups) {
666 splits.add(new ExportSnapshotInputSplit(files));
667 }
668 return splits;
669 }
670
671 private static class ExportSnapshotInputSplit extends InputSplit implements Writable {
672 private List<Pair<BytesWritable, Long>> files;
673 private long length;
674
675 public ExportSnapshotInputSplit() {
676 this.files = null;
677 }
678
679 public ExportSnapshotInputSplit(final List<Pair<SnapshotFileInfo, Long>> snapshotFiles) {
680 this.files = new ArrayList(snapshotFiles.size());
681 for (Pair<SnapshotFileInfo, Long> fileInfo: snapshotFiles) {
682 this.files.add(new Pair<BytesWritable, Long>(
683 new BytesWritable(fileInfo.getFirst().toByteArray()), fileInfo.getSecond()));
684 this.length += fileInfo.getSecond();
685 }
686 }
687
688 private List<Pair<BytesWritable, Long>> getSplitKeys() {
689 return files;
690 }
691
692 @Override
693 public long getLength() throws IOException, InterruptedException {
694 return length;
695 }
696
697 @Override
698 public String[] getLocations() throws IOException, InterruptedException {
699 return new String[] {};
700 }
701
702 @Override
703 public void readFields(DataInput in) throws IOException {
704 int count = in.readInt();
705 files = new ArrayList<Pair<BytesWritable, Long>>(count);
706 length = 0;
707 for (int i = 0; i < count; ++i) {
708 BytesWritable fileInfo = new BytesWritable();
709 fileInfo.readFields(in);
710 long size = in.readLong();
711 files.add(new Pair<BytesWritable, Long>(fileInfo, size));
712 length += size;
713 }
714 }
715
716 @Override
717 public void write(DataOutput out) throws IOException {
718 out.writeInt(files.size());
719 for (final Pair<BytesWritable, Long> fileInfo: files) {
720 fileInfo.getFirst().write(out);
721 out.writeLong(fileInfo.getSecond());
722 }
723 }
724 }
725
726 private static class ExportSnapshotRecordReader
727 extends RecordReader<BytesWritable, NullWritable> {
728 private final List<Pair<BytesWritable, Long>> files;
729 private long totalSize = 0;
730 private long procSize = 0;
731 private int index = -1;
732
733 ExportSnapshotRecordReader(final List<Pair<BytesWritable, Long>> files) {
734 this.files = files;
735 for (Pair<BytesWritable, Long> fileInfo: files) {
736 totalSize += fileInfo.getSecond();
737 }
738 }
739
740 @Override
741 public void close() { }
742
743 @Override
744 public BytesWritable getCurrentKey() { return files.get(index).getFirst(); }
745
746 @Override
747 public NullWritable getCurrentValue() { return NullWritable.get(); }
748
749 @Override
750 public float getProgress() { return (float)procSize / totalSize; }
751
752 @Override
753 public void initialize(InputSplit split, TaskAttemptContext tac) { }
754
755 @Override
756 public boolean nextKeyValue() {
757 if (index >= 0) {
758 procSize += files.get(index).getSecond();
759 }
760 return(++index < files.size());
761 }
762 }
763 }
764
765
766
767
768
769
770
771
772 private void runCopyJob(final Path inputRoot, final Path outputRoot,
773 final String snapshotName, final Path snapshotDir, final boolean verifyChecksum,
774 final String filesUser, final String filesGroup, final int filesMode,
775 final int mappers, final int bandwidthMB)
776 throws IOException, InterruptedException, ClassNotFoundException {
777 Configuration conf = getConf();
778 if (filesGroup != null) conf.set(CONF_FILES_GROUP, filesGroup);
779 if (filesUser != null) conf.set(CONF_FILES_USER, filesUser);
780 if (mappers > 0) {
781 conf.setInt(CONF_NUM_SPLITS, mappers);
782 conf.setInt(MR_NUM_MAPS, mappers);
783 }
784 conf.setInt(CONF_FILES_MODE, filesMode);
785 conf.setBoolean(CONF_CHECKSUM_VERIFY, verifyChecksum);
786 conf.set(CONF_OUTPUT_ROOT, outputRoot.toString());
787 conf.set(CONF_INPUT_ROOT, inputRoot.toString());
788 conf.setInt(CONF_BANDWIDTH_MB, bandwidthMB);
789 conf.set(CONF_SNAPSHOT_NAME, snapshotName);
790 conf.set(CONF_SNAPSHOT_DIR, snapshotDir.toString());
791
792 Job job = new Job(conf);
793 job.setJobName("ExportSnapshot-" + snapshotName);
794 job.setJarByClass(ExportSnapshot.class);
795 TableMapReduceUtil.addDependencyJars(job);
796 job.setMapperClass(ExportMapper.class);
797 job.setInputFormatClass(ExportSnapshotInputFormat.class);
798 job.setOutputFormatClass(NullOutputFormat.class);
799 job.setMapSpeculativeExecution(false);
800 job.setNumReduceTasks(0);
801
802
803 Configuration srcConf = HBaseConfiguration.createClusterConf(conf, null, CONF_SOURCE_PREFIX);
804 TokenCache.obtainTokensForNamenodes(job.getCredentials(),
805 new Path[] { inputRoot }, srcConf);
806 Configuration destConf = HBaseConfiguration.createClusterConf(conf, null, CONF_DEST_PREFIX);
807 TokenCache.obtainTokensForNamenodes(job.getCredentials(),
808 new Path[] { outputRoot }, destConf);
809
810
811 if (!job.waitForCompletion(true)) {
812
813
814 throw new ExportSnapshotException("Copy Files Map-Reduce Job failed");
815 }
816 }
817
818 private void verifySnapshot(final Configuration baseConf,
819 final FileSystem fs, final Path rootDir, final Path snapshotDir) throws IOException {
820
821 Configuration conf = new Configuration(baseConf);
822 FSUtils.setRootDir(conf, rootDir);
823 FSUtils.setFsDefault(conf, FSUtils.getRootDir(conf));
824 SnapshotDescription snapshotDesc = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir);
825 SnapshotReferenceUtil.verifySnapshot(conf, fs, snapshotDir, snapshotDesc);
826 }
827
828
829
830
831 private void setOwner(final FileSystem fs, final Path path, final String user,
832 final String group, final boolean recursive) throws IOException {
833 if (user != null || group != null) {
834 if (recursive && fs.isDirectory(path)) {
835 for (FileStatus child : fs.listStatus(path)) {
836 setOwner(fs, child.getPath(), user, group, recursive);
837 }
838 }
839 fs.setOwner(path, user, group);
840 }
841 }
842
843
844
845
846 private void setPermission(final FileSystem fs, final Path path, final short filesMode,
847 final boolean recursive) throws IOException {
848 if (filesMode > 0) {
849 FsPermission perm = new FsPermission(filesMode);
850 if (recursive && fs.isDirectory(path)) {
851 for (FileStatus child : fs.listStatus(path)) {
852 setPermission(fs, child.getPath(), filesMode, recursive);
853 }
854 }
855 fs.setPermission(path, perm);
856 }
857 }
858
859
860
861
862
863 @Override
864 public int run(String[] args) throws IOException {
865 boolean verifyTarget = true;
866 boolean verifyChecksum = true;
867 String snapshotName = null;
868 String targetName = null;
869 boolean overwrite = false;
870 String filesGroup = null;
871 String filesUser = null;
872 Path outputRoot = null;
873 int bandwidthMB = Integer.MAX_VALUE;
874 int filesMode = 0;
875 int mappers = 0;
876
877 Configuration conf = getConf();
878 Path inputRoot = FSUtils.getRootDir(conf);
879
880
881 for (int i = 0; i < args.length; i++) {
882 String cmd = args[i];
883 if (cmd.equals("-snapshot")) {
884 snapshotName = args[++i];
885 } else if (cmd.equals("-target")) {
886 targetName = args[++i];
887 } else if (cmd.equals("-copy-to")) {
888 outputRoot = new Path(args[++i]);
889 } else if (cmd.equals("-copy-from")) {
890 inputRoot = new Path(args[++i]);
891 FSUtils.setRootDir(conf, inputRoot);
892 } else if (cmd.equals("-no-checksum-verify")) {
893 verifyChecksum = false;
894 } else if (cmd.equals("-no-target-verify")) {
895 verifyTarget = false;
896 } else if (cmd.equals("-mappers")) {
897 mappers = Integer.parseInt(args[++i]);
898 } else if (cmd.equals("-chuser")) {
899 filesUser = args[++i];
900 } else if (cmd.equals("-chgroup")) {
901 filesGroup = args[++i];
902 } else if (cmd.equals("-bandwidth")) {
903 bandwidthMB = Integer.parseInt(args[++i]);
904 } else if (cmd.equals("-chmod")) {
905 filesMode = Integer.parseInt(args[++i], 8);
906 } else if (cmd.equals("-overwrite")) {
907 overwrite = true;
908 } else if (cmd.equals("-h") || cmd.equals("--help")) {
909 printUsageAndExit();
910 } else {
911 System.err.println("UNEXPECTED: " + cmd);
912 printUsageAndExit();
913 }
914 }
915
916
917 if (snapshotName == null) {
918 System.err.println("Snapshot name not provided.");
919 printUsageAndExit();
920 }
921
922 if (outputRoot == null) {
923 System.err.println("Destination file-system not provided.");
924 printUsageAndExit();
925 }
926
927 if (targetName == null) {
928 targetName = snapshotName;
929 }
930
931 Configuration srcConf = HBaseConfiguration.createClusterConf(conf, null, CONF_SOURCE_PREFIX);
932 srcConf.setBoolean("fs." + inputRoot.toUri().getScheme() + ".impl.disable.cache", true);
933 FileSystem inputFs = FileSystem.get(inputRoot.toUri(), srcConf);
934 LOG.debug("inputFs=" + inputFs.getUri().toString() + " inputRoot=" + inputRoot);
935 Configuration destConf = HBaseConfiguration.createClusterConf(conf, null, CONF_DEST_PREFIX);
936 destConf.setBoolean("fs." + outputRoot.toUri().getScheme() + ".impl.disable.cache", true);
937 FileSystem outputFs = FileSystem.get(outputRoot.toUri(), destConf);
938 LOG.debug("outputFs=" + outputFs.getUri().toString() + " outputRoot=" + outputRoot.toString());
939
940 boolean skipTmp = conf.getBoolean(CONF_SKIP_TMP, false);
941
942 Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, inputRoot);
943 Path snapshotTmpDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(targetName, outputRoot);
944 Path outputSnapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(targetName, outputRoot);
945 Path initialOutputSnapshotDir = skipTmp ? outputSnapshotDir : snapshotTmpDir;
946
947
948 if (outputFs.exists(outputSnapshotDir)) {
949 if (overwrite) {
950 if (!outputFs.delete(outputSnapshotDir, true)) {
951 System.err.println("Unable to remove existing snapshot directory: " + outputSnapshotDir);
952 return 1;
953 }
954 } else {
955 System.err.println("The snapshot '" + targetName +
956 "' already exists in the destination: " + outputSnapshotDir);
957 return 1;
958 }
959 }
960
961 if (!skipTmp) {
962
963 if (outputFs.exists(snapshotTmpDir)) {
964 if (overwrite) {
965 if (!outputFs.delete(snapshotTmpDir, true)) {
966 System.err.println("Unable to remove existing snapshot tmp directory: "+snapshotTmpDir);
967 return 1;
968 }
969 } else {
970 System.err.println("A snapshot with the same name '"+ targetName +"' may be in-progress");
971 System.err.println("Please check "+snapshotTmpDir+". If the snapshot has completed, ");
972 System.err.println("consider removing "+snapshotTmpDir+" by using the -overwrite option");
973 return 1;
974 }
975 }
976 }
977
978
979
980
981 try {
982 LOG.info("Copy Snapshot Manifest");
983 FileUtil.copy(inputFs, snapshotDir, outputFs, initialOutputSnapshotDir, false, false, conf);
984 if (filesUser != null || filesGroup != null) {
985 setOwner(outputFs, snapshotTmpDir, filesUser, filesGroup, true);
986 }
987 if (filesMode > 0) {
988 setPermission(outputFs, snapshotTmpDir, (short)filesMode, true);
989 }
990 } catch (IOException e) {
991 throw new ExportSnapshotException("Failed to copy the snapshot directory: from=" +
992 snapshotDir + " to=" + initialOutputSnapshotDir, e);
993 }
994
995
996 if (!targetName.equals(snapshotName)) {
997 SnapshotDescription snapshotDesc =
998 SnapshotDescriptionUtils.readSnapshotInfo(inputFs, snapshotDir)
999 .toBuilder()
1000 .setName(targetName)
1001 .build();
1002 SnapshotDescriptionUtils.writeSnapshotInfo(snapshotDesc, snapshotTmpDir, outputFs);
1003 }
1004
1005
1006
1007
1008 try {
1009 runCopyJob(inputRoot, outputRoot, snapshotName, snapshotDir, verifyChecksum,
1010 filesUser, filesGroup, filesMode, mappers, bandwidthMB);
1011
1012 LOG.info("Finalize the Snapshot Export");
1013 if (!skipTmp) {
1014
1015 if (!outputFs.rename(snapshotTmpDir, outputSnapshotDir)) {
1016 throw new ExportSnapshotException("Unable to rename snapshot directory from=" +
1017 snapshotTmpDir + " to=" + outputSnapshotDir);
1018 }
1019 }
1020
1021
1022 if (verifyTarget) {
1023 LOG.info("Verify snapshot integrity");
1024 verifySnapshot(destConf, outputFs, outputRoot, outputSnapshotDir);
1025 }
1026
1027 LOG.info("Export Completed: " + targetName);
1028 return 0;
1029 } catch (Exception e) {
1030 LOG.error("Snapshot export failed", e);
1031 if (!skipTmp) {
1032 outputFs.delete(snapshotTmpDir, true);
1033 }
1034 outputFs.delete(outputSnapshotDir, true);
1035 return 1;
1036 } finally {
1037 IOUtils.closeStream(inputFs);
1038 IOUtils.closeStream(outputFs);
1039 }
1040 }
1041
1042
1043 private void printUsageAndExit() {
1044 System.err.printf("Usage: bin/hbase %s [options]%n", getClass().getName());
1045 System.err.println(" where [options] are:");
1046 System.err.println(" -h|-help Show this help and exit.");
1047 System.err.println(" -snapshot NAME Snapshot to restore.");
1048 System.err.println(" -copy-to NAME Remote destination hdfs://");
1049 System.err.println(" -copy-from NAME Input folder hdfs:// (default hbase.rootdir)");
1050 System.err.println(" -no-checksum-verify Do not verify checksum, use name+length only.");
1051 System.err.println(" -no-target-verify Do not verify the integrity of the \\" +
1052 "exported snapshot.");
1053 System.err.println(" -overwrite Rewrite the snapshot manifest if already exists");
1054 System.err.println(" -chuser USERNAME Change the owner of the files " +
1055 "to the specified one.");
1056 System.err.println(" -chgroup GROUP Change the group of the files to " +
1057 "the specified one.");
1058 System.err.println(" -chmod MODE Change the permission of the files " +
1059 "to the specified one.");
1060 System.err.println(" -mappers Number of mappers to use during the " +
1061 "copy (mapreduce.job.maps).");
1062 System.err.println(" -bandwidth Limit bandwidth to this value in MB/second.");
1063 System.err.println();
1064 System.err.println("Examples:");
1065 System.err.println(" hbase " + getClass().getName() + " \\");
1066 System.err.println(" -snapshot MySnapshot -copy-to hdfs://srv2:8082/hbase \\");
1067 System.err.println(" -chuser MyUser -chgroup MyGroup -chmod 700 -mappers 16");
1068 System.err.println();
1069 System.err.println(" hbase " + getClass().getName() + " \\");
1070 System.err.println(" -snapshot MySnapshot -copy-from hdfs://srv2:8082/hbase \\");
1071 System.err.println(" -copy-to hdfs://srv1:50070/hbase \\");
1072 System.exit(1);
1073 }
1074
1075
1076
1077
1078
1079
1080
1081
1082 static int innerMain(final Configuration conf, final String [] args) throws Exception {
1083 return ToolRunner.run(conf, new ExportSnapshot(), args);
1084 }
1085
1086 public static void main(String[] args) throws Exception {
1087 System.exit(innerMain(HBaseConfiguration.create(), args));
1088 }
1089 }