View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.mapreduce;
19  
20  import static org.junit.Assert.assertEquals;
21  
22  import java.util.HashMap;
23  import java.util.Map;
24  
25  import org.apache.commons.logging.Log;
26  import org.apache.commons.logging.LogFactory;
27  import org.apache.hadoop.fs.FileStatus;
28  import org.apache.hadoop.fs.FileSystem;
29  import org.apache.hadoop.fs.Path;
30  import org.apache.hadoop.hbase.HBaseTestingUtility;
31  import org.apache.hadoop.hbase.TableName;
32  import org.apache.hadoop.hbase.client.HTable;
33  import org.apache.hadoop.hbase.client.Put;
34  import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
35  import org.apache.hadoop.hbase.testclassification.LargeTests;
36  import org.apache.hadoop.hbase.util.Bytes;
37  import org.apache.hadoop.io.MapFile;
38  import org.junit.AfterClass;
39  import org.junit.Assert;
40  import org.junit.BeforeClass;
41  import org.junit.Test;
42  import org.junit.experimental.categories.Category;
43  
44  import com.google.common.collect.ImmutableMap;
45  import com.google.common.collect.Maps;
46  
47  /**
48   * Basic test for the HashTable M/R tool
49   */
50  @Category(LargeTests.class)
51  public class TestHashTable {
52    
53    private static final Log LOG = LogFactory.getLog(TestHashTable.class);
54    
55    private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();  
56    
57    @BeforeClass
58    public static void beforeClass() throws Exception {
59      TEST_UTIL.setJobWithoutMRCluster();
60      TEST_UTIL.startMiniCluster(3);
61    }
62    
63    @AfterClass
64    public static void afterClass() throws Exception {
65      TEST_UTIL.shutdownMiniCluster();
66    }
67    
68    @Test
69    public void testHashTable() throws Exception {
70      final String tableName = "testHashTable";
71      final byte[] family = Bytes.toBytes("family");
72      final byte[] column1 = Bytes.toBytes("c1");
73      final byte[] column2 = Bytes.toBytes("c2");
74      final byte[] column3 = Bytes.toBytes("c3");
75      
76      int numRows = 100;
77      int numRegions = 10;
78      int numHashFiles = 3;
79      
80      byte[][] splitRows = new byte[numRegions-1][];
81      for (int i = 1; i < numRegions; i++) {
82        splitRows[i-1] = Bytes.toBytes(numRows * i / numRegions);
83      }
84      
85      long timestamp = 1430764183454L;
86      // put rows into the first table
87      HTable t1 = TEST_UTIL.createTable(TableName.valueOf(tableName), family, splitRows);
88      for (int i = 0; i < numRows; i++) {
89        Put p = new Put(Bytes.toBytes(i), timestamp);
90        p.addColumn(family, column1, column1);
91        p.addColumn(family, column2, column2);
92        p.addColumn(family, column3, column3);
93        t1.put(p);
94      }
95      t1.close();
96      
97      HashTable hashTable = new HashTable(TEST_UTIL.getConfiguration());
98      
99      Path testDir = TEST_UTIL.getDataTestDirOnTestFS(tableName);
100     
101     long batchSize = 300;
102     int code = hashTable.run(new String[] { 
103         "--batchsize=" + batchSize,
104         "--numhashfiles=" + numHashFiles,
105         "--scanbatch=2",
106         tableName,
107         testDir.toString()});
108     assertEquals("test job failed", 0, code);
109     
110     FileSystem fs = TEST_UTIL.getTestFileSystem();
111     
112     HashTable.TableHash tableHash = HashTable.TableHash.read(fs.getConf(), testDir);
113     assertEquals(tableName, tableHash.tableName);
114     assertEquals(batchSize, tableHash.batchSize);
115     assertEquals(numHashFiles, tableHash.numHashFiles);
116     assertEquals(numHashFiles - 1, tableHash.partitions.size());
117     for (ImmutableBytesWritable bytes : tableHash.partitions) {
118       LOG.debug("partition: " + Bytes.toInt(bytes.get()));
119     }
120     
121     ImmutableMap<Integer, ImmutableBytesWritable> expectedHashes
122       = ImmutableMap.<Integer, ImmutableBytesWritable>builder()
123       .put(-1, new ImmutableBytesWritable(Bytes.fromHex("714cb10a9e3b5569852980edd8c6ca2f")))
124       .put(5, new ImmutableBytesWritable(Bytes.fromHex("28d961d9252ce8f8d44a07b38d3e1d96")))
125       .put(10, new ImmutableBytesWritable(Bytes.fromHex("f6bbc4a224d8fd929b783a92599eaffa")))
126       .put(15, new ImmutableBytesWritable(Bytes.fromHex("522deb5d97f73a414ecc11457be46881")))
127       .put(20, new ImmutableBytesWritable(Bytes.fromHex("b026f2611aaa46f7110116d807545352")))
128       .put(25, new ImmutableBytesWritable(Bytes.fromHex("39ffc1a3094aa12a2e90ffd9cef2ce93")))
129       .put(30, new ImmutableBytesWritable(Bytes.fromHex("f6b4d75727ce9a30ac29e4f08f601666")))
130       .put(35, new ImmutableBytesWritable(Bytes.fromHex("422e2d2f1eb79a8f02171a705a42c090")))
131       .put(40, new ImmutableBytesWritable(Bytes.fromHex("559ad61c900fffefea0a15abf8a97bc3")))
132       .put(45, new ImmutableBytesWritable(Bytes.fromHex("23019084513eca41cee436b2a29611cb")))
133       .put(50, new ImmutableBytesWritable(Bytes.fromHex("b40467d222ddb4949b142fe145ee9edc")))
134       .put(55, new ImmutableBytesWritable(Bytes.fromHex("372bf89fcd8ca4b7ab3c1add9d07f7e4")))
135       .put(60, new ImmutableBytesWritable(Bytes.fromHex("69ae0585e6255de27dce974e332b8f8b")))
136       .put(65, new ImmutableBytesWritable(Bytes.fromHex("8029610044297aad0abdbecd485d8e59")))
137       .put(70, new ImmutableBytesWritable(Bytes.fromHex("de5f784f7f78987b6e57ecfd81c8646f")))
138       .put(75, new ImmutableBytesWritable(Bytes.fromHex("1cd757cc4e1715c8c3b1c24447a1ec56")))
139       .put(80, new ImmutableBytesWritable(Bytes.fromHex("f9a53aacfeb6142b08066615e7038095")))
140       .put(85, new ImmutableBytesWritable(Bytes.fromHex("89b872b7e639df32d3276b33928c0c91")))
141       .put(90, new ImmutableBytesWritable(Bytes.fromHex("45eeac0646d46a474ea0484175faed38")))
142       .put(95, new ImmutableBytesWritable(Bytes.fromHex("f57c447e32a08f4bf1abb2892839ac56")))
143       .build();
144   
145     Map<Integer, ImmutableBytesWritable> actualHashes
146       = new HashMap<Integer, ImmutableBytesWritable>();
147     Path dataDir = new Path(testDir, HashTable.HASH_DATA_DIR);
148     for (int i = 0; i < numHashFiles; i++) {
149       Path hashPath = new Path(dataDir, HashTable.TableHash.getDataFileName(i));
150       
151       MapFile.Reader reader = new MapFile.Reader(hashPath, fs.getConf());
152       ImmutableBytesWritable key = new ImmutableBytesWritable();
153       ImmutableBytesWritable hash = new ImmutableBytesWritable();
154       while(reader.next(key, hash)) {
155         String keyString = Bytes.toHex(key.get(), key.getOffset(), key.getLength());
156         LOG.debug("Key: " + (keyString.isEmpty() ? "-1" : Integer.parseInt(keyString, 16))
157             + " Hash: " + Bytes.toHex(hash.get(), hash.getOffset(), hash.getLength()));
158         
159         int intKey = -1;
160         if (key.getLength() > 0) {
161           intKey = Bytes.toInt(key.get(),  key.getOffset(), key.getLength());
162         }
163         if (actualHashes.containsKey(intKey)) {
164           Assert.fail("duplicate key in data files: " + intKey);
165         }
166         actualHashes.put(intKey, new ImmutableBytesWritable(hash.copyBytes()));
167       }
168       reader.close();
169     }
170     
171     FileStatus[] files = fs.listStatus(testDir);
172     for (FileStatus file : files) {
173       LOG.debug("Output file: " + file.getPath());
174     }
175     
176     files = fs.listStatus(dataDir);
177     for (FileStatus file : files) {
178       LOG.debug("Data file: " + file.getPath());
179     }
180     
181     if (!expectedHashes.equals(actualHashes)) {
182       LOG.error("Diff: " + Maps.difference(expectedHashes, actualHashes));
183     }
184     Assert.assertEquals(expectedHashes, actualHashes);
185     
186     TEST_UTIL.deleteTable(tableName);
187     TEST_UTIL.cleanupDataTestDirOnTestFS();
188   }
189   
190 
191 }