View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.mapreduce;
19  
20  import static org.junit.Assert.assertEquals;
21  import static org.junit.Assert.assertFalse;
22  import static org.junit.Assert.assertTrue;
23  
24  import java.util.List;
25  import java.util.concurrent.atomic.AtomicLong;
26  
27  import org.apache.commons.logging.Log;
28  import org.apache.commons.logging.LogFactory;
29  import org.apache.hadoop.conf.Configuration;
30  import org.apache.hadoop.fs.FileStatus;
31  import org.apache.hadoop.fs.FileSystem;
32  import org.apache.hadoop.fs.Path;
33  import org.apache.hadoop.hbase.Cell;
34  import org.apache.hadoop.hbase.TableName;
35  import org.apache.hadoop.hbase.HBaseTestingUtility;
36  import org.apache.hadoop.hbase.HColumnDescriptor;
37  import org.apache.hadoop.hbase.HConstants;
38  import org.apache.hadoop.hbase.HRegionInfo;
39  import org.apache.hadoop.hbase.HTableDescriptor;
40  import org.apache.hadoop.hbase.KeyValue;
41  import org.apache.hadoop.hbase.testclassification.MediumTests;
42  import org.apache.hadoop.hbase.mapreduce.WALInputFormat.WALKeyRecordReader;
43  import org.apache.hadoop.hbase.mapreduce.WALInputFormat.WALRecordReader;
44  import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
45  import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
46  import org.apache.hadoop.hbase.wal.WAL;
47  import org.apache.hadoop.hbase.wal.WALFactory;
48  import org.apache.hadoop.hbase.wal.WALKey;
49  import org.apache.hadoop.hbase.util.Bytes;
50  import org.apache.hadoop.mapreduce.InputSplit;
51  import org.apache.hadoop.mapreduce.MapReduceTestUtil;
52  import org.junit.AfterClass;
53  import org.junit.Before;
54  import org.junit.BeforeClass;
55  import org.junit.Test;
56  import org.junit.experimental.categories.Category;
57  
58  /**
59   * JUnit tests for the WALRecordReader
60   */
61  @Category(MediumTests.class)
62  public class TestWALRecordReader {
63    private static final Log LOG = LogFactory.getLog(TestWALRecordReader.class);
64    private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
65    private static Configuration conf;
66    private static FileSystem fs;
67    private static Path hbaseDir;
68    // visible for TestHLogRecordReader
69    static final TableName tableName = TableName.valueOf(getName());
70    private static final byte [] rowName = tableName.getName();
71    // visible for TestHLogRecordReader
72    static final HRegionInfo info = new HRegionInfo(tableName,
73        Bytes.toBytes(""), Bytes.toBytes(""), false);
74    private static final byte [] family = Bytes.toBytes("column");
75    private static final byte [] value = Bytes.toBytes("value");
76    private static HTableDescriptor htd;
77    private static Path logDir;
78    protected MultiVersionConcurrencyControl mvcc;
79  
80    private static String getName() {
81      return "TestWALRecordReader";
82    }
83  
84    @Before
85    public void setUp() throws Exception {
86      mvcc = new MultiVersionConcurrencyControl();
87      FileStatus[] entries = fs.listStatus(hbaseDir);
88      for (FileStatus dir : entries) {
89        fs.delete(dir.getPath(), true);
90      }
91  
92    }
93    @BeforeClass
94    public static void setUpBeforeClass() throws Exception {
95      // Make block sizes small.
96      conf = TEST_UTIL.getConfiguration();
97      conf.setInt("dfs.blocksize", 1024 * 1024);
98      conf.setInt("dfs.replication", 1);
99      TEST_UTIL.startMiniDFSCluster(1);
100 
101     conf = TEST_UTIL.getConfiguration();
102     fs = TEST_UTIL.getDFSCluster().getFileSystem();
103 
104     hbaseDir = TEST_UTIL.createRootDir();
105     
106     logDir = new Path(hbaseDir, HConstants.HREGION_LOGDIR_NAME);
107 
108     htd = new HTableDescriptor(tableName);
109     htd.addFamily(new HColumnDescriptor(family));
110   }
111 
112   @AfterClass
113   public static void tearDownAfterClass() throws Exception {
114     TEST_UTIL.shutdownMiniCluster();
115   }
116 
117   /**
118    * Test partial reads from the log based on passed time range
119    * @throws Exception
120    */
121   @Test
122   public void testPartialRead() throws Exception {
123     final WALFactory walfactory = new WALFactory(conf, null, getName());
124     WAL log = walfactory.getWAL(info.getEncodedNameAsBytes());
125     // This test depends on timestamp being millisecond based and the filename of the WAL also
126     // being millisecond based.
127     long ts = System.currentTimeMillis();
128     WALEdit edit = new WALEdit();
129     edit.add(new KeyValue(rowName, family, Bytes.toBytes("1"), ts, value));
130     log.append(htd, info, getWalKey(ts), edit, true);
131     edit = new WALEdit();
132     edit.add(new KeyValue(rowName, family, Bytes.toBytes("2"), ts+1, value));
133     log.append(htd, info, getWalKey(ts+1), edit, true);
134     log.sync();
135     LOG.info("Before 1st WAL roll " + log.toString());
136     log.rollWriter();
137     LOG.info("Past 1st WAL roll " + log.toString());
138 
139     Thread.sleep(1);
140     long ts1 = System.currentTimeMillis();
141 
142     edit = new WALEdit();
143     edit.add(new KeyValue(rowName, family, Bytes.toBytes("3"), ts1+1, value));
144     log.append(htd, info, getWalKey(ts1+1), edit, true);
145     edit = new WALEdit();
146     edit.add(new KeyValue(rowName, family, Bytes.toBytes("4"), ts1+2, value));
147     log.append(htd, info, getWalKey(ts1+2), edit, true);
148     log.sync();
149     log.shutdown();
150     walfactory.shutdown();
151     LOG.info("Closed WAL " + log.toString());
152 
153  
154     WALInputFormat input = new WALInputFormat();
155     Configuration jobConf = new Configuration(conf);
156     jobConf.set("mapreduce.input.fileinputformat.inputdir", logDir.toString());
157     jobConf.setLong(WALInputFormat.END_TIME_KEY, ts);
158 
159     // only 1st file is considered, and only its 1st entry is used
160     List<InputSplit> splits = input.getSplits(MapreduceTestingShim.createJobContext(jobConf));
161 
162     assertEquals(1, splits.size());
163     testSplit(splits.get(0), Bytes.toBytes("1"));
164 
165     jobConf.setLong(WALInputFormat.START_TIME_KEY, ts+1);
166     jobConf.setLong(WALInputFormat.END_TIME_KEY, ts1+1);
167     splits = input.getSplits(MapreduceTestingShim.createJobContext(jobConf));
168     // both files need to be considered
169     assertEquals(2, splits.size());
170     // only the 2nd entry from the 1st file is used
171     testSplit(splits.get(0), Bytes.toBytes("2"));
172     // only the 1nd entry from the 2nd file is used
173     testSplit(splits.get(1), Bytes.toBytes("3"));
174   }
175 
176   /**
177    * Test basic functionality
178    * @throws Exception
179    */
180   @Test
181   public void testWALRecordReader() throws Exception {
182     final WALFactory walfactory = new WALFactory(conf, null, getName());
183     WAL log = walfactory.getWAL(info.getEncodedNameAsBytes());
184     byte [] value = Bytes.toBytes("value");
185     final AtomicLong sequenceId = new AtomicLong(0);
186     WALEdit edit = new WALEdit();
187     edit.add(new KeyValue(rowName, family, Bytes.toBytes("1"),
188         System.currentTimeMillis(), value));
189     long txid = log.append(htd, info, getWalKey(System.currentTimeMillis()), edit, true);
190     log.sync(txid);
191 
192     Thread.sleep(1); // make sure 2nd log gets a later timestamp
193     long secondTs = System.currentTimeMillis();
194     log.rollWriter();
195 
196     edit = new WALEdit();
197     edit.add(new KeyValue(rowName, family, Bytes.toBytes("2"),
198         System.currentTimeMillis(), value));
199     txid = log.append(htd, info, getWalKey(System.currentTimeMillis()), edit, true);
200     log.sync(txid);
201     log.shutdown();
202     walfactory.shutdown();
203     long thirdTs = System.currentTimeMillis();
204 
205     // should have 2 log files now
206     WALInputFormat input = new WALInputFormat();
207     Configuration jobConf = new Configuration(conf);
208     jobConf.set("mapreduce.input.fileinputformat.inputdir", logDir.toString());
209 
210     // make sure both logs are found
211     List<InputSplit> splits = input.getSplits(MapreduceTestingShim.createJobContext(jobConf));
212     assertEquals(2, splits.size());
213 
214     // should return exactly one KV
215     testSplit(splits.get(0), Bytes.toBytes("1"));
216     // same for the 2nd split
217     testSplit(splits.get(1), Bytes.toBytes("2"));
218 
219     // now test basic time ranges:
220 
221     // set an endtime, the 2nd log file can be ignored completely.
222     jobConf.setLong(WALInputFormat.END_TIME_KEY, secondTs-1);
223     splits = input.getSplits(MapreduceTestingShim.createJobContext(jobConf));
224     assertEquals(1, splits.size());
225     testSplit(splits.get(0), Bytes.toBytes("1"));
226 
227     // now set a start time
228     jobConf.setLong(WALInputFormat.END_TIME_KEY, Long.MAX_VALUE);
229     jobConf.setLong(WALInputFormat.START_TIME_KEY, thirdTs);
230     splits = input.getSplits(MapreduceTestingShim.createJobContext(jobConf));
231     // both logs need to be considered
232     assertEquals(2, splits.size());
233     // but both readers skip all edits
234     testSplit(splits.get(0));
235     testSplit(splits.get(1));
236   }
237 
238   protected WALKey getWalKey(final long time) {
239     return new WALKey(info.getEncodedNameAsBytes(), tableName, time, mvcc);
240   }
241 
242   protected WALRecordReader getReader() {
243     return new WALKeyRecordReader();
244   }
245 
246   /**
247    * Create a new reader from the split, and match the edits against the passed columns.
248    */
249   private void testSplit(InputSplit split, byte[]... columns) throws Exception {
250     final WALRecordReader reader = getReader();
251     reader.initialize(split, MapReduceTestUtil.createDummyMapTaskAttemptContext(conf));
252 
253     for (byte[] column : columns) {
254       assertTrue(reader.nextKeyValue());
255       Cell cell = reader.getCurrentValue().getCells().get(0);
256       if (!Bytes.equals(column, cell.getQualifier())) {
257         assertTrue("expected [" + Bytes.toString(column) + "], actual ["
258             + Bytes.toString(cell.getQualifier()) + "]", false);
259       }
260     }
261     assertFalse(reader.nextKeyValue());
262     reader.close();
263   }
264 
265 }