View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.mapreduce;
19  
20  import static org.junit.Assert.assertEquals;
21  import static org.junit.Assert.assertFalse;
22  import static org.junit.Assert.assertTrue;
23  import static org.junit.Assert.fail;
24  import static org.mockito.Matchers.any;
25  import static org.mockito.Mockito.doAnswer;
26  import static org.mockito.Mockito.mock;
27  import static org.mockito.Mockito.when;
28  
29  import java.io.ByteArrayOutputStream;
30  import java.io.File;
31  import java.io.IOException;
32  import java.io.PrintStream;
33  import java.net.URL;
34  import java.util.ArrayList;
35  import java.util.Arrays;
36  import java.util.List;
37  
38  import org.apache.commons.logging.Log;
39  import org.apache.commons.logging.LogFactory;
40  import org.apache.hadoop.conf.Configuration;
41  import org.apache.hadoop.fs.FileSystem;
42  import org.apache.hadoop.fs.Path;
43  import org.apache.hadoop.hbase.Cell;
44  import org.apache.hadoop.hbase.CellUtil;
45  import org.apache.hadoop.hbase.CategoryBasedTimeout;
46  import org.apache.hadoop.hbase.HBaseTestingUtility;
47  import org.apache.hadoop.hbase.HColumnDescriptor;
48  import org.apache.hadoop.hbase.HConstants;
49  import org.apache.hadoop.hbase.HRegionInfo;
50  import org.apache.hadoop.hbase.HTableDescriptor;
51  import org.apache.hadoop.hbase.KeyValue;
52  import org.apache.hadoop.hbase.TableName;
53  import org.apache.hadoop.hbase.client.Delete;
54  import org.apache.hadoop.hbase.client.Durability;
55  import org.apache.hadoop.hbase.client.Get;
56  import org.apache.hadoop.hbase.client.Put;
57  import org.apache.hadoop.hbase.client.Result;
58  import org.apache.hadoop.hbase.client.ResultScanner;
59  import org.apache.hadoop.hbase.client.Scan;
60  import org.apache.hadoop.hbase.client.Table;
61  import org.apache.hadoop.hbase.filter.Filter;
62  import org.apache.hadoop.hbase.filter.FilterBase;
63  import org.apache.hadoop.hbase.filter.PrefixFilter;
64  import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
65  import org.apache.hadoop.hbase.mapreduce.Import.KeyValueImporter;
66  import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
67  import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
68  import org.apache.hadoop.hbase.testclassification.MediumTests;
69  import org.apache.hadoop.hbase.util.Bytes;
70  import org.apache.hadoop.hbase.util.LauncherSecurityManager;
71  import org.apache.hadoop.hbase.wal.WAL;
72  import org.apache.hadoop.hbase.wal.WALKey;
73  import org.apache.hadoop.mapreduce.Job;
74  import org.apache.hadoop.mapreduce.Mapper.Context;
75  import org.apache.hadoop.util.GenericOptionsParser;
76  import org.junit.After;
77  import org.junit.AfterClass;
78  import org.junit.Assert;
79  import org.junit.Before;
80  import org.junit.BeforeClass;
81  import org.junit.Rule;
82  import org.junit.Test;
83  import org.junit.experimental.categories.Category;
84  import org.junit.rules.TestRule;
85  import org.mockito.invocation.InvocationOnMock;
86  import org.mockito.stubbing.Answer;
87  
88  /**
89   * Tests the table import and table export MR job functionality
90   */
91  @Category(MediumTests.class)
92  public class TestImportExport {
93    private static final Log LOG = LogFactory.getLog(TestImportExport.class);
94    private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
95    private static final byte[] ROW1 = Bytes.toBytes("row1");
96    private static final byte[] ROW2 = Bytes.toBytes("row2");
97    private static final String FAMILYA_STRING = "a";
98    private static final String FAMILYB_STRING = "b";
99    private static final byte[] FAMILYA = Bytes.toBytes(FAMILYA_STRING);
100   private static final byte[] FAMILYB = Bytes.toBytes(FAMILYB_STRING);
101   private static final byte[] QUAL = Bytes.toBytes("q");
102   private static final String OUTPUT_DIR = "outputdir";
103   private static String FQ_OUTPUT_DIR;
104   private static final String EXPORT_BATCH_SIZE = "100";
105 
106   private static long now = System.currentTimeMillis();
107 
108   @Rule
109   public final TestRule timeout = CategoryBasedTimeout.builder().withTimeout(this.getClass()).
110           withLookingForStuckThread(true).build();
111 
112   @BeforeClass
113   public static void beforeClass() throws Exception {
114     // Up the handlers; this test needs more than usual.
115     UTIL.getConfiguration().setInt(HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT, 10);
116     UTIL.setJobWithoutMRCluster();
117     UTIL.startMiniCluster();
118     FQ_OUTPUT_DIR =
119       new Path(OUTPUT_DIR).makeQualified(FileSystem.get(UTIL.getConfiguration())).toString();
120   }
121 
122   @AfterClass
123   public static void afterClass() throws Exception {
124     UTIL.shutdownMiniCluster();
125   }
126 
127   @Before
128   @After
129   public void cleanup() throws Exception {
130     FileSystem fs = FileSystem.get(UTIL.getConfiguration());
131     fs.delete(new Path(OUTPUT_DIR), true);
132   }
133 
134   /**
135    * Runs an export job with the specified command line args
136    * @param args
137    * @return true if job completed successfully
138    * @throws IOException
139    * @throws InterruptedException
140    * @throws ClassNotFoundException
141    */
142   boolean runExport(String[] args)
143   throws IOException, InterruptedException, ClassNotFoundException {
144     // need to make a copy of the configuration because to make sure different temp dirs are used.
145     GenericOptionsParser opts =
146       new GenericOptionsParser(new Configuration(UTIL.getConfiguration()), args);
147     Configuration conf = opts.getConfiguration();
148     args = opts.getRemainingArgs();
149     Job job = Export.createSubmittableJob(conf, args);
150     job.waitForCompletion(false);
151     return job.isSuccessful();
152   }
153 
154   /**
155    * Runs an import job with the specified command line args
156    * @param args
157    * @return true if job completed successfully
158    * @throws IOException
159    * @throws InterruptedException
160    * @throws ClassNotFoundException
161    */
162   boolean runImport(String[] args)
163   throws IOException, InterruptedException, ClassNotFoundException {
164     // need to make a copy of the configuration because to make sure different temp dirs are used.
165     GenericOptionsParser opts =
166       new GenericOptionsParser(new Configuration(UTIL.getConfiguration()), args);
167     Configuration conf = opts.getConfiguration();
168     args = opts.getRemainingArgs();
169     Job job = Import.createSubmittableJob(conf, args);
170     job.waitForCompletion(false);
171     return job.isSuccessful();
172   }
173 
174   /**
175    * Test simple replication case with column mapping
176    * @throws Exception
177    */
178   @Test
179   public void testSimpleCase() throws Exception {
180     String EXPORT_TABLE = "exportSimpleCase";
181     try (Table t = UTIL.createTable(TableName.valueOf(EXPORT_TABLE), FAMILYA, 3);) {
182       Put p = new Put(ROW1);
183       p.add(FAMILYA, QUAL, now, QUAL);
184       p.add(FAMILYA, QUAL, now+1, QUAL);
185       p.add(FAMILYA, QUAL, now+2, QUAL);
186       t.put(p);
187       p = new Put(ROW2);
188       p.add(FAMILYA, QUAL, now, QUAL);
189       p.add(FAMILYA, QUAL, now+1, QUAL);
190       p.add(FAMILYA, QUAL, now+2, QUAL);
191       t.put(p);
192     }
193 
194     String[] args = new String[] {
195         EXPORT_TABLE,
196         FQ_OUTPUT_DIR,
197         "1000", // max number of key versions per key to export
198     };
199     assertTrue(runExport(args));
200 
201     String IMPORT_TABLE = "importTableSimpleCase";
202     try (Table t = UTIL.createTable(TableName.valueOf(IMPORT_TABLE), FAMILYB, 3);) {
203       args = new String[] {
204         "-D" + Import.CF_RENAME_PROP + "="+FAMILYA_STRING+":"+FAMILYB_STRING,
205         IMPORT_TABLE,
206         FQ_OUTPUT_DIR
207       };
208       assertTrue(runImport(args));
209 
210       Get g = new Get(ROW1);
211       g.setMaxVersions();
212       Result r = t.get(g);
213       assertEquals(3, r.size());
214       g = new Get(ROW2);
215       g.setMaxVersions();
216       r = t.get(g);
217       assertEquals(3, r.size());
218     }
219   }
220 
221   /**
222    * Test export hbase:meta table
223    *
224    * @throws Exception
225    */
226   @Test
227   public void testMetaExport() throws Exception {
228     String EXPORT_TABLE = TableName.META_TABLE_NAME.getNameAsString();
229     String[] args = new String[] { EXPORT_TABLE, FQ_OUTPUT_DIR, "1", "0", "0" };
230     assertTrue(runExport(args));
231   }
232 
233   /**
234    * Test import data from 0.94 exported file
235    * @throws Exception
236    */
237   @Test
238   public void testImport94Table() throws Exception {
239     final String name = "exportedTableIn94Format";
240     URL url = TestImportExport.class.getResource(name);
241     File f = new File(url.toURI());
242     if (!f.exists()) {
243       LOG.warn("FAILED TO FIND " + f + "; skipping out on test");
244       return;
245     }
246     assertTrue(f.exists());
247     LOG.info("FILE=" + f);
248     Path importPath = new Path(f.toURI());
249     FileSystem fs = FileSystem.get(UTIL.getConfiguration());
250     fs.copyFromLocalFile(importPath, new Path(FQ_OUTPUT_DIR + Path.SEPARATOR + name));
251     String IMPORT_TABLE = name;
252     try (Table t = UTIL.createTable(TableName.valueOf(IMPORT_TABLE), Bytes.toBytes("f1"), 3);) {
253       String[] args = new String[] {
254           "-Dhbase.import.version=0.94" ,
255           IMPORT_TABLE, FQ_OUTPUT_DIR
256       };
257       assertTrue(runImport(args));
258       /* exportedTableIn94Format contains 5 rows
259       ROW         COLUMN+CELL
260       r1          column=f1:c1, timestamp=1383766761171, value=val1
261       r2          column=f1:c1, timestamp=1383766771642, value=val2
262       r3          column=f1:c1, timestamp=1383766777615, value=val3
263       r4          column=f1:c1, timestamp=1383766785146, value=val4
264       r5          column=f1:c1, timestamp=1383766791506, value=val5
265       */
266      assertEquals(5, UTIL.countRows(t));
267     }
268   }
269 
270   /**
271    * Test export scanner batching
272    */
273    @Test
274    public void testExportScannerBatching() throws Exception {
275     String BATCH_TABLE = "exportWithBatch";
276     try (Table t = UTIL.createTable(TableName.valueOf(BATCH_TABLE), FAMILYA, 1);) {
277       Put p = new Put(ROW1);
278       p.add(FAMILYA, QUAL, now, QUAL);
279       p.add(FAMILYA, QUAL, now+1, QUAL);
280       p.add(FAMILYA, QUAL, now+2, QUAL);
281       p.add(FAMILYA, QUAL, now+3, QUAL);
282       p.add(FAMILYA, QUAL, now+4, QUAL);
283       t.put(p);
284 
285       String[] args = new String[] {
286         "-D" + Export.EXPORT_BATCHING + "=" + EXPORT_BATCH_SIZE,  // added scanner batching arg.
287         BATCH_TABLE,
288         FQ_OUTPUT_DIR
289       };
290       assertTrue(runExport(args));
291 
292       FileSystem fs = FileSystem.get(UTIL.getConfiguration());
293       fs.delete(new Path(FQ_OUTPUT_DIR), true);
294     }
295   }
296 
297   @Test
298   public void testWithDeletes() throws Exception {
299     String IMPORT_TABLE = "importWithDeletes";
300     String EXPORT_TABLE = "exportWithDeletes";
301     HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(EXPORT_TABLE));
302     desc.addFamily(new HColumnDescriptor(FAMILYA)
303         .setMaxVersions(5)
304         .setKeepDeletedCells(true)
305     );
306     UTIL.getHBaseAdmin().createTable(desc);
307     try (Table t = UTIL.getConnection().getTable(TableName.valueOf(EXPORT_TABLE));) {
308       Put p = new Put(ROW1);
309       p.add(FAMILYA, QUAL, now, QUAL);
310       p.add(FAMILYA, QUAL, now+1, QUAL);
311       p.add(FAMILYA, QUAL, now+2, QUAL);
312       p.add(FAMILYA, QUAL, now+3, QUAL);
313       p.add(FAMILYA, QUAL, now+4, QUAL);
314       t.put(p);
315 
316       Delete d = new Delete(ROW1, now+3);
317       t.delete(d);
318       d = new Delete(ROW1);
319       d.deleteColumns(FAMILYA, QUAL, now+2);
320       t.delete(d);
321 
322       String[] args = new String[] {
323         "-D" + Export.RAW_SCAN + "=true",
324         EXPORT_TABLE,
325         FQ_OUTPUT_DIR,
326         "1000", // max number of key versions per key to export
327       };
328       assertTrue(runExport(args));
329 
330       desc = new HTableDescriptor(TableName.valueOf(IMPORT_TABLE));
331       desc.addFamily(new HColumnDescriptor(FAMILYA)
332         .setMaxVersions(5)
333         .setKeepDeletedCells(true)
334       );
335     }
336     UTIL.getHBaseAdmin().createTable(desc);
337     try (Table t = UTIL.getConnection().getTable(desc.getTableName());) {
338        String [] args = new String[] {
339         IMPORT_TABLE,
340         FQ_OUTPUT_DIR
341        };
342        assertTrue(runImport(args));
343 
344        Scan s = new Scan();
345        s.setMaxVersions();
346        s.setRaw(true);
347        ResultScanner scanner = t.getScanner(s);
348        Result r = scanner.next();
349        Cell[] res = r.rawCells();
350        assertTrue(CellUtil.isDeleteFamily(res[0]));
351        assertEquals(now+4, res[1].getTimestamp());
352        assertEquals(now+3, res[2].getTimestamp());
353        assertTrue(CellUtil.isDelete(res[3]));
354        assertEquals(now+2, res[4].getTimestamp());
355       assertEquals(now+1, res[5].getTimestamp());
356       assertEquals(now, res[6].getTimestamp());
357     }
358   }
359 
360   @Test
361   public void testWithMultipleDeleteFamilyMarkersOfSameRowSameFamily() throws Exception {
362     String EXPORT_TABLE = "exportWithMultipleDeleteFamilyMarkersOfSameRowSameFamily";
363     HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(EXPORT_TABLE));
364     desc.addFamily(new HColumnDescriptor(FAMILYA)
365         .setMaxVersions(5)
366         .setKeepDeletedCells(true)
367     );
368     UTIL.getHBaseAdmin().createTable(desc);
369     Table exportT = UTIL.getConnection().getTable(desc.getTableName());
370       //Add first version of QUAL
371       Put p = new Put(ROW1);
372     p.add(FAMILYA, QUAL, now, QUAL);
373       exportT.put(p);
374 
375       //Add Delete family marker
376       Delete d = new Delete(ROW1, now+3);
377       exportT.delete(d);
378 
379     //Add second version of QUAL
380     p = new Put(ROW1);
381     p.add(FAMILYA, QUAL, now + 5, "s".getBytes());
382     exportT.put(p);
383 
384     //Add second Delete family marker
385     d = new Delete(ROW1, now+7);
386     exportT.delete(d);
387 
388 
389     String[] args = new String[] {
390         "-D" + Export.RAW_SCAN + "=true",
391         EXPORT_TABLE,
392         FQ_OUTPUT_DIR,
393         "1000", // max number of key versions per key to export
394     };
395     assertTrue(runExport(args));
396 
397     String IMPORT_TABLE = "importWithMultipleDeleteFamilyMarkersOfSameRowSameFamily";
398     desc = new HTableDescriptor(TableName.valueOf(IMPORT_TABLE));
399     desc.addFamily(new HColumnDescriptor(FAMILYA)
400         .setMaxVersions(5)
401         .setKeepDeletedCells(true)
402     );
403     UTIL.getHBaseAdmin().createTable(desc);
404 
405     Table importT = UTIL.getConnection().getTable(desc.getTableName());
406     args = new String[] {
407         IMPORT_TABLE,
408         FQ_OUTPUT_DIR
409     };
410     assertTrue(runImport(args));
411 
412     Scan s = new Scan();
413     s.setMaxVersions();
414     s.setRaw(true);
415 
416     ResultScanner importedTScanner = importT.getScanner(s);
417     Result importedTResult = importedTScanner.next();
418 
419     ResultScanner exportedTScanner = exportT.getScanner(s);
420     Result  exportedTResult =  exportedTScanner.next();
421     try {
422       Result.compareResults(exportedTResult, importedTResult);
423     } catch (Exception e) {
424       fail("Original and imported tables data comparision failed with error:"+e.getMessage());
425     } finally {
426       exportT.close();
427       importT.close();
428     }
429   }
430 
431   /**
432    * Create a simple table, run an Export Job on it, Import with filtering on,  verify counts,
433    * attempt with invalid values.
434    */
435   @Test
436   public void testWithFilter() throws Exception {
437     // Create simple table to export
438     String EXPORT_TABLE = "exportSimpleCase_ImportWithFilter";
439     HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(EXPORT_TABLE));
440     desc.addFamily(new HColumnDescriptor(FAMILYA).setMaxVersions(5));
441     UTIL.getHBaseAdmin().createTable(desc);
442     Table exportTable = UTIL.getConnection().getTable(desc.getTableName());
443 
444     Put p1 = new Put(ROW1);
445     p1.add(FAMILYA, QUAL, now, QUAL);
446     p1.add(FAMILYA, QUAL, now + 1, QUAL);
447     p1.add(FAMILYA, QUAL, now + 2, QUAL);
448     p1.add(FAMILYA, QUAL, now + 3, QUAL);
449     p1.add(FAMILYA, QUAL, now + 4, QUAL);
450 
451     // Having another row would actually test the filter.
452     Put p2 = new Put(ROW2);
453     p2.add(FAMILYA, QUAL, now, QUAL);
454 
455     exportTable.put(Arrays.asList(p1, p2));
456 
457     // Export the simple table
458     String[] args = new String[] { EXPORT_TABLE, FQ_OUTPUT_DIR, "1000" };
459     assertTrue(runExport(args));
460 
461     // Import to a new table
462     String IMPORT_TABLE = "importWithFilter";
463     desc = new HTableDescriptor(TableName.valueOf(IMPORT_TABLE));
464     desc.addFamily(new HColumnDescriptor(FAMILYA).setMaxVersions(5));
465     UTIL.getHBaseAdmin().createTable(desc);
466 
467     Table importTable = UTIL.getConnection().getTable(desc.getTableName());
468     args = new String[] { "-D" + Import.FILTER_CLASS_CONF_KEY + "=" + PrefixFilter.class.getName(),
469         "-D" + Import.FILTER_ARGS_CONF_KEY + "=" + Bytes.toString(ROW1), IMPORT_TABLE,
470         FQ_OUTPUT_DIR,
471         "1000" };
472     assertTrue(runImport(args));
473 
474     // get the count of the source table for that time range
475     PrefixFilter filter = new PrefixFilter(ROW1);
476     int count = getCount(exportTable, filter);
477 
478     Assert.assertEquals("Unexpected row count between export and import tables", count,
479       getCount(importTable, null));
480 
481     // and then test that a broken command doesn't bork everything - easier here because we don't
482     // need to re-run the export job
483 
484     args = new String[] { "-D" + Import.FILTER_CLASS_CONF_KEY + "=" + Filter.class.getName(),
485         "-D" + Import.FILTER_ARGS_CONF_KEY + "=" + Bytes.toString(ROW1) + "", EXPORT_TABLE,
486         FQ_OUTPUT_DIR, "1000" };
487     assertFalse(runImport(args));
488 
489     // cleanup
490     exportTable.close();
491     importTable.close();
492   }
493 
494   /**
495    * Count the number of keyvalues in the specified table for the given timerange
496    * @param start
497    * @param end
498    * @param table
499    * @return
500    * @throws IOException
501    */
502   private int getCount(Table table, Filter filter) throws IOException {
503     Scan scan = new Scan();
504     scan.setFilter(filter);
505     ResultScanner results = table.getScanner(scan);
506     int count = 0;
507     for (Result res : results) {
508       count += res.size();
509     }
510     results.close();
511     return count;
512   }
513 
514   /**
515    * test main method. Import should print help and call System.exit
516    */
517   @Test
518   public void testImportMain() throws Exception {
519     PrintStream oldPrintStream = System.err;
520     SecurityManager SECURITY_MANAGER = System.getSecurityManager();
521     LauncherSecurityManager newSecurityManager= new LauncherSecurityManager();
522     System.setSecurityManager(newSecurityManager);
523     ByteArrayOutputStream data = new ByteArrayOutputStream();
524     String[] args = {};
525     System.setErr(new PrintStream(data));
526     try {
527       System.setErr(new PrintStream(data));
528       Import.main(args);
529       fail("should be SecurityException");
530     } catch (SecurityException e) {
531       assertEquals(-1, newSecurityManager.getExitCode());
532       assertTrue(data.toString().contains("Wrong number of arguments:"));
533       assertTrue(data.toString().contains("-Dimport.bulk.output=/path/for/output"));
534       assertTrue(data.toString().contains("-Dimport.filter.class=<name of filter class>"));
535       assertTrue(data.toString().contains("-Dimport.bulk.output=/path/for/output"));
536       assertTrue(data.toString().contains("-Dmapreduce.reduce.speculative=false"));
537     } finally {
538       System.setErr(oldPrintStream);
539       System.setSecurityManager(SECURITY_MANAGER);
540     }
541   }
542 
543   /**
544    * test main method. Export should print help and call System.exit
545    */
546   @Test
547   public void testExportMain() throws Exception {
548     PrintStream oldPrintStream = System.err;
549     SecurityManager SECURITY_MANAGER = System.getSecurityManager();
550     LauncherSecurityManager newSecurityManager= new LauncherSecurityManager();
551     System.setSecurityManager(newSecurityManager);
552     ByteArrayOutputStream data = new ByteArrayOutputStream();
553     String[] args = {};
554     System.setErr(new PrintStream(data));
555     try {
556       System.setErr(new PrintStream(data));
557       Export.main(args);
558       fail("should be SecurityException");
559     } catch (SecurityException e) {
560       assertEquals(-1, newSecurityManager.getExitCode());
561       assertTrue(data.toString().contains("Wrong number of arguments:"));
562       assertTrue(data.toString().contains(
563               "Usage: Export [-D <property=value>]* <tablename> <outputdir> [<versions> " +
564               "[<starttime> [<endtime>]] [^[regex pattern] or [Prefix] to filter]]"));
565       assertTrue(data.toString().contains("-D hbase.mapreduce.scan.column.family=<familyName>"));
566       assertTrue(data.toString().contains("-D hbase.mapreduce.include.deleted.rows=true"));
567       assertTrue(data.toString().contains("-Dhbase.client.scanner.caching=100"));
568       assertTrue(data.toString().contains("-Dmapreduce.map.speculative=false"));
569       assertTrue(data.toString().contains("-Dmapreduce.reduce.speculative=false"));
570       assertTrue(data.toString().contains("-Dhbase.export.scanner.batch=10"));
571     } finally {
572       System.setErr(oldPrintStream);
573       System.setSecurityManager(SECURITY_MANAGER);
574     }
575   }
576 
577   /**
578    * Test map method of Importer
579    */
580   @SuppressWarnings({ "unchecked", "rawtypes" })
581   @Test
582   public void testKeyValueImporter() throws Exception {
583     KeyValueImporter importer = new KeyValueImporter();
584     Configuration configuration = new Configuration();
585     Context ctx = mock(Context.class);
586     when(ctx.getConfiguration()).thenReturn(configuration);
587 
588     doAnswer(new Answer<Void>() {
589 
590       @Override
591       public Void answer(InvocationOnMock invocation) throws Throwable {
592         ImmutableBytesWritable writer = (ImmutableBytesWritable) invocation.getArguments()[0];
593         KeyValue key = (KeyValue) invocation.getArguments()[1];
594         assertEquals("Key", Bytes.toString(writer.get()));
595         assertEquals("row", Bytes.toString(key.getRow()));
596         return null;
597       }
598     }).when(ctx).write(any(ImmutableBytesWritable.class), any(KeyValue.class));
599 
600     importer.setup(ctx);
601     Result value = mock(Result.class);
602     KeyValue[] keys = {
603         new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("family"), Bytes.toBytes("qualifier"),
604             Bytes.toBytes("value")),
605         new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("family"), Bytes.toBytes("qualifier"),
606             Bytes.toBytes("value1")) };
607     when(value.rawCells()).thenReturn(keys);
608     importer.map(new ImmutableBytesWritable(Bytes.toBytes("Key")), value, ctx);
609 
610   }
611 
612   /**
613    * Test addFilterAndArguments method of Import This method set couple
614    * parameters into Configuration
615    */
616   @Test
617   public void testAddFilterAndArguments() throws IOException {
618     Configuration configuration = new Configuration();
619 
620     List<String> args = new ArrayList<String>();
621     args.add("param1");
622     args.add("param2");
623 
624     Import.addFilterAndArguments(configuration, FilterBase.class, args);
625     assertEquals("org.apache.hadoop.hbase.filter.FilterBase",
626         configuration.get(Import.FILTER_CLASS_CONF_KEY));
627     assertEquals("param1,param2", configuration.get(Import.FILTER_ARGS_CONF_KEY));
628   }
629 
630   @Test
631   public void testDurability() throws IOException, InterruptedException, ClassNotFoundException {
632     // Create an export table.
633     String exportTableName = "exporttestDurability";
634     try (Table exportTable = UTIL.createTable(TableName.valueOf(exportTableName), FAMILYA, 3);) {
635       // Insert some data
636       Put put = new Put(ROW1);
637       put.add(FAMILYA, QUAL, now, QUAL);
638       put.add(FAMILYA, QUAL, now + 1, QUAL);
639       put.add(FAMILYA, QUAL, now + 2, QUAL);
640       exportTable.put(put);
641 
642       put = new Put(ROW2);
643       put.add(FAMILYA, QUAL, now, QUAL);
644       put.add(FAMILYA, QUAL, now + 1, QUAL);
645       put.add(FAMILYA, QUAL, now + 2, QUAL);
646       exportTable.put(put);
647     }
648 
649     // Run the export
650     String[] args = new String[] { exportTableName, FQ_OUTPUT_DIR, "1000"};
651     assertTrue(runExport(args));
652 
653     // Create the table for import
654     String importTableName = "importTestDurability1";
655     WAL wal = null;
656     HRegionInfo region = null;
657     TableWALActionListener walListener = null;
658     try (Table importTable =
659       UTIL.createTable(TableName.valueOf(importTableName), FAMILYA, 3);) {
660 
661       // Register the wal listener for the import table
662       walListener = new TableWALActionListener(importTableName);
663       region = UTIL.getHBaseCluster().getRegionServerThreads().get(0).getRegionServer()
664           .getOnlineRegions(importTable.getName()).get(0).getRegionInfo();
665       wal = UTIL.getMiniHBaseCluster().getRegionServer(0).getWAL(region);
666       wal.registerWALActionsListener(walListener);
667 
668       // Run the import with SKIP_WAL
669       args =
670           new String[] { "-D" + Import.WAL_DURABILITY + "=" + Durability.SKIP_WAL.name(),
671             importTableName, FQ_OUTPUT_DIR };
672       assertTrue(runImport(args));
673       //Assert that the wal is not visisted
674       assertTrue(!walListener.isWALVisited());
675       //Ensure that the count is 2 (only one version of key value is obtained)
676       assertTrue(getCount(importTable, null) == 2);
677 
678       // Run the import with the default durability option
679     }
680     importTableName = "importTestDurability2";
681     try (Table importTable =
682         UTIL.createTable(TableName.valueOf(importTableName), FAMILYA, 3);) {
683       region = UTIL.getHBaseCluster().getRegionServerThreads().get(0).getRegionServer()
684         .getOnlineRegions(importTable.getName()).get(0).getRegionInfo();
685       wal = UTIL.getMiniHBaseCluster().getRegionServer(0).getWAL(region);
686       walListener = new TableWALActionListener(importTableName);
687       wal.registerWALActionsListener(walListener);
688       args = new String[] { importTableName, FQ_OUTPUT_DIR };
689       assertTrue(runImport(args));
690       //Assert that the wal is visisted
691       assertTrue(walListener.isWALVisited());
692       //Ensure that the count is 2 (only one version of key value is obtained)
693       assertTrue(getCount(importTable, null) == 2);
694     }
695   }
696 
697   /**
698    * This listens to the {@link #visitLogEntryBeforeWrite(HTableDescriptor, WALKey, WALEdit)} to
699    * identify that an entry is written to the Write Ahead Log for the given table.
700    */
701   private static class TableWALActionListener extends WALActionsListener.Base {
702 
703     private String tableName;
704     private boolean isVisited = false;
705 
706     public TableWALActionListener(String tableName) {
707       this.tableName = tableName;
708     }
709 
710     @Override
711     public void visitLogEntryBeforeWrite(HTableDescriptor htd, WALKey logKey, WALEdit logEdit) {
712       if (tableName.equalsIgnoreCase(htd.getNameAsString())) {
713         isVisited = true;
714       }
715     }
716 
717     public boolean isWALVisited() {
718       return isVisited;
719     }
720   }
721 }