View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.mapreduce;
19  
20  import static org.junit.Assert.assertEquals;
21  import static org.junit.Assert.assertTrue;
22  import static org.junit.Assert.fail;
23  import static org.mockito.Matchers.any;
24  import static org.mockito.Mockito.doAnswer;
25  import static org.mockito.Mockito.mock;
26  import static org.mockito.Mockito.when;
27  
28  import java.io.ByteArrayOutputStream;
29  import java.io.PrintStream;
30  import java.util.ArrayList;
31  
32  import org.apache.hadoop.conf.Configuration;
33  import org.apache.hadoop.fs.Path;
34  import org.apache.hadoop.hbase.Cell;
35  import org.apache.hadoop.hbase.CellUtil;
36  import org.apache.hadoop.hbase.HBaseTestingUtility;
37  import org.apache.hadoop.hbase.HConstants;
38  import org.apache.hadoop.hbase.KeyValue;
39  import org.apache.hadoop.hbase.testclassification.LargeTests;
40  import org.apache.hadoop.hbase.MiniHBaseCluster;
41  import org.apache.hadoop.hbase.TableName;
42  import org.apache.hadoop.hbase.client.Delete;
43  import org.apache.hadoop.hbase.client.Get;
44  import org.apache.hadoop.hbase.client.Put;
45  import org.apache.hadoop.hbase.client.Result;
46  import org.apache.hadoop.hbase.client.Table;
47  import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
48  import org.apache.hadoop.hbase.mapreduce.WALPlayer.WALKeyValueMapper;
49  import org.apache.hadoop.hbase.wal.WAL;
50  import org.apache.hadoop.hbase.wal.WALKey;
51  import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
52  import org.apache.hadoop.hbase.util.Bytes;
53  import org.apache.hadoop.hbase.util.LauncherSecurityManager;
54  import org.apache.hadoop.mapreduce.Mapper;
55  import org.apache.hadoop.mapreduce.Mapper.Context;
56  import org.junit.AfterClass;
57  import org.junit.BeforeClass;
58  import org.junit.Test;
59  import org.junit.experimental.categories.Category;
60  import org.mockito.invocation.InvocationOnMock;
61  import org.mockito.stubbing.Answer;
62  
63  /**
64   * Basic test for the WALPlayer M/R tool
65   */
66  @Category(LargeTests.class)
67  public class TestWALPlayer {
68    private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
69    private static MiniHBaseCluster cluster;
70  
71    @BeforeClass
72    public static void beforeClass() throws Exception {
73      TEST_UTIL.setJobWithoutMRCluster();
74      cluster = TEST_UTIL.startMiniCluster();
75    }
76  
77    @AfterClass
78    public static void afterClass() throws Exception {
79      TEST_UTIL.shutdownMiniCluster();
80    }
81  
82    /**
83     * Simple end-to-end test
84     * @throws Exception
85     */
86    @Test
87    public void testWALPlayer() throws Exception {
88      final TableName TABLENAME1 = TableName.valueOf("testWALPlayer1");
89      final TableName TABLENAME2 = TableName.valueOf("testWALPlayer2");
90      final byte[] FAMILY = Bytes.toBytes("family");
91      final byte[] COLUMN1 = Bytes.toBytes("c1");
92      final byte[] COLUMN2 = Bytes.toBytes("c2");
93      final byte[] ROW = Bytes.toBytes("row");
94      Table t1 = TEST_UTIL.createTable(TABLENAME1, FAMILY);
95      Table t2 = TEST_UTIL.createTable(TABLENAME2, FAMILY);
96  
97      // put a row into the first table
98      Put p = new Put(ROW);
99      p.add(FAMILY, COLUMN1, COLUMN1);
100     p.add(FAMILY, COLUMN2, COLUMN2);
101     t1.put(p);
102     // delete one column
103     Delete d = new Delete(ROW);
104     d.deleteColumns(FAMILY, COLUMN1);
105     t1.delete(d);
106 
107     // replay the WAL, map table 1 to table 2
108     WAL log = cluster.getRegionServer(0).getWAL(null);
109     log.rollWriter();
110     String walInputDir = new Path(cluster.getMaster().getMasterFileSystem()
111         .getRootDir(), HConstants.HREGION_LOGDIR_NAME).toString();
112 
113     Configuration configuration= TEST_UTIL.getConfiguration();
114     WALPlayer player = new WALPlayer(configuration);
115     String optionName="_test_.name";
116     configuration.set(optionName, "1000");
117     player.setupTime(configuration, optionName);
118     assertEquals(1000,configuration.getLong(optionName,0));
119     assertEquals(0, player.run(new String[] {walInputDir, TABLENAME1.getNameAsString(),
120         TABLENAME2.getNameAsString() }));
121 
122     
123     // verify the WAL was player into table 2
124     Get g = new Get(ROW);
125     Result r = t2.get(g);
126     assertEquals(1, r.size());
127     assertTrue(CellUtil.matchingQualifier(r.rawCells()[0], COLUMN2));
128   }
129 
130   /**
131    * Test WALKeyValueMapper setup and map
132    */
133   @Test
134   public void testWALKeyValueMapper() throws Exception {
135     testWALKeyValueMapper(WALPlayer.TABLES_KEY);
136   }
137 
138   @Test
139   public void testWALKeyValueMapperWithDeprecatedConfig() throws Exception {
140     testWALKeyValueMapper("hlog.input.tables");
141   }
142 
143   private void testWALKeyValueMapper(final String tableConfigKey) throws Exception {
144     Configuration configuration = new Configuration();
145     configuration.set(tableConfigKey, "table");
146     WALKeyValueMapper mapper = new WALKeyValueMapper();
147     WALKey key = mock(WALKey.class);
148     when(key.getTablename()).thenReturn(TableName.valueOf("table"));
149     @SuppressWarnings("unchecked")
150     Mapper<WALKey, WALEdit, ImmutableBytesWritable, KeyValue>.Context context =
151         mock(Context.class);
152     when(context.getConfiguration()).thenReturn(configuration);
153 
154     WALEdit value = mock(WALEdit.class);
155     ArrayList<Cell> values = new ArrayList<Cell>();
156     KeyValue kv1 = new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("family"), Bytes.toBytes("q"));
157     values.add(kv1);
158     when(value.getCells()).thenReturn(values);
159     mapper.setup(context);
160 
161     doAnswer(new Answer<Void>() {
162 
163       @Override
164       public Void answer(InvocationOnMock invocation) throws Throwable {
165         ImmutableBytesWritable writer = (ImmutableBytesWritable) invocation.getArguments()[0];
166         KeyValue key = (KeyValue) invocation.getArguments()[1];
167         assertEquals("row", Bytes.toString(writer.get()));
168         assertEquals("row", Bytes.toString(key.getRow()));
169         return null;
170       }
171     }).when(context).write(any(ImmutableBytesWritable.class), any(KeyValue.class));
172 
173     mapper.map(key, value, context);
174 
175   }
176 
177   /**
178    * Test main method
179    */
180   @Test
181   public void testMainMethod() throws Exception {
182 
183     PrintStream oldPrintStream = System.err;
184     SecurityManager SECURITY_MANAGER = System.getSecurityManager();
185     LauncherSecurityManager newSecurityManager= new LauncherSecurityManager();
186     System.setSecurityManager(newSecurityManager);
187     ByteArrayOutputStream data = new ByteArrayOutputStream();
188     String[] args = {};
189     System.setErr(new PrintStream(data));
190     try {
191       System.setErr(new PrintStream(data));
192       try {
193         WALPlayer.main(args);
194         fail("should be SecurityException");
195       } catch (SecurityException e) {
196         assertEquals(-1, newSecurityManager.getExitCode());
197         assertTrue(data.toString().contains("ERROR: Wrong number of arguments:"));
198         assertTrue(data.toString().contains("Usage: WALPlayer [options] <wal inputdir>" +
199             " <tables> [<tableMappings>]"));
200         assertTrue(data.toString().contains("-Dwal.bulk.output=/path/for/output"));
201       }
202 
203     } finally {
204       System.setErr(oldPrintStream);
205       System.setSecurityManager(SECURITY_MANAGER);
206     }
207 
208   }
209 
210 }