View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.mapreduce;
20  
21  import static org.junit.Assert.assertTrue;
22  import static org.junit.Assert.fail;
23  
24  import java.io.IOException;
25  import java.util.Iterator;
26  import java.util.Map;
27  import java.util.NavigableMap;
28  
29  import org.apache.commons.logging.Log;
30  import org.apache.hadoop.conf.Configuration;
31  import org.apache.hadoop.hbase.CategoryBasedTimeout;
32  import org.apache.hadoop.hbase.Cell;
33  import org.apache.hadoop.hbase.CellUtil;
34  import org.apache.hadoop.hbase.HBaseTestingUtility;
35  import org.apache.hadoop.hbase.HConstants;
36  import org.apache.hadoop.hbase.TableName;
37  import org.apache.hadoop.hbase.client.HTable;
38  import org.apache.hadoop.hbase.client.Put;
39  import org.apache.hadoop.hbase.client.Result;
40  import org.apache.hadoop.hbase.client.ResultScanner;
41  import org.apache.hadoop.hbase.client.Scan;
42  import org.apache.hadoop.hbase.client.Table;
43  import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
44  import org.apache.hadoop.hbase.util.Bytes;
45  import org.junit.AfterClass;
46  import org.junit.BeforeClass;
47  import org.junit.Rule;
48  import org.junit.Test;
49  import org.junit.rules.TestRule;
50  
51  /**
52   * A base class for a test Map/Reduce job over HBase tables. The map/reduce process we're testing
53   * on our tables is simple - take every row in the table, reverse the value of a particular cell,
54   * and write it back to the table. Implements common components between mapred and mapreduce
55   * implementations.
56   */
57  public abstract class TestTableMapReduceBase {
58    @Rule public final TestRule timeout = CategoryBasedTimeout.builder().
59        withTimeout(this.getClass()).withLookingForStuckThread(true).build();
60    protected static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
61    protected static final TableName MULTI_REGION_TABLE_NAME = TableName.valueOf("mrtest");
62    protected static final byte[] INPUT_FAMILY = Bytes.toBytes("contents");
63    protected static final byte[] OUTPUT_FAMILY = Bytes.toBytes("text");
64  
65    protected static final byte[][] columns = new byte[][] {
66      INPUT_FAMILY,
67      OUTPUT_FAMILY
68    };
69  
70    /**
71     * Retrieve my logger instance.
72     */
73    protected abstract Log getLog();
74  
75    /**
76     * Handles API-specifics for setting up and executing the job.
77     */
78    protected abstract void runTestOnTable(HTable table) throws IOException;
79  
80    @BeforeClass
81    public static void beforeClass() throws Exception {
82      UTIL.setJobWithoutMRCluster();
83      UTIL.startMiniCluster();
84      HTable table =
85          UTIL.createMultiRegionTable(MULTI_REGION_TABLE_NAME, new byte[][] { INPUT_FAMILY,
86              OUTPUT_FAMILY });
87      UTIL.loadTable(table, INPUT_FAMILY, false);
88    }
89  
90    @AfterClass
91    public static void afterClass() throws Exception {
92      UTIL.shutdownMiniCluster();
93    }
94  
95    /**
96     * Test a map/reduce against a multi-region table
97     * @throws IOException
98     */
99    @Test
100   public void testMultiRegionTable() throws IOException {
101     runTestOnTable(new HTable(UTIL.getConfiguration(), MULTI_REGION_TABLE_NAME));
102   }
103 
104   @Test
105   public void testCombiner() throws IOException {
106     Configuration conf = new Configuration(UTIL.getConfiguration());
107     // force use of combiner for testing purposes
108     conf.setInt("mapreduce.map.combine.minspills", 1);
109     runTestOnTable(new HTable(conf, MULTI_REGION_TABLE_NAME));
110   }
111 
112   /**
113    * Implements mapper logic for use across APIs.
114    */
115   protected static Put map(ImmutableBytesWritable key, Result value) throws IOException {
116     if (value.size() != 1) {
117       throw new IOException("There should only be one input column");
118     }
119     Map<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>>
120       cf = value.getMap();
121     if(!cf.containsKey(INPUT_FAMILY)) {
122       throw new IOException("Wrong input columns. Missing: '" +
123         Bytes.toString(INPUT_FAMILY) + "'.");
124     }
125 
126     // Get the original value and reverse it
127 
128     String originalValue = Bytes.toString(value.getValue(INPUT_FAMILY, null));
129     StringBuilder newValue = new StringBuilder(originalValue);
130     newValue.reverse();
131 
132     // Now set the value to be collected
133 
134     Put outval = new Put(key.get());
135     outval.add(OUTPUT_FAMILY, null, Bytes.toBytes(newValue.toString()));
136     return outval;
137   }
138 
139   protected void verify(TableName tableName) throws IOException {
140     Table table = new HTable(UTIL.getConfiguration(), tableName);
141     boolean verified = false;
142     long pause = UTIL.getConfiguration().getLong("hbase.client.pause", 5 * 1000);
143     int numRetries = UTIL.getConfiguration().getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 5);
144     for (int i = 0; i < numRetries; i++) {
145       try {
146         getLog().info("Verification attempt #" + i);
147         verifyAttempt(table);
148         verified = true;
149         break;
150       } catch (NullPointerException e) {
151         // If here, a cell was empty. Presume its because updates came in
152         // after the scanner had been opened. Wait a while and retry.
153         getLog().debug("Verification attempt failed: " + e.getMessage());
154       }
155       try {
156         Thread.sleep(pause);
157       } catch (InterruptedException e) {
158         // continue
159       }
160     }
161     assertTrue(verified);
162   }
163 
164   /**
165    * Looks at every value of the mapreduce output and verifies that indeed
166    * the values have been reversed.
167    * @param table Table to scan.
168    * @throws IOException
169    * @throws NullPointerException if we failed to find a cell value
170    */
171   private void verifyAttempt(final Table table) throws IOException, NullPointerException {
172     Scan scan = new Scan();
173     TableInputFormat.addColumns(scan, columns);
174     ResultScanner scanner = table.getScanner(scan);
175     try {
176       Iterator<Result> itr = scanner.iterator();
177       assertTrue(itr.hasNext());
178       while(itr.hasNext()) {
179         Result r = itr.next();
180         if (getLog().isDebugEnabled()) {
181           if (r.size() > 2 ) {
182             throw new IOException("Too many results, expected 2 got " +
183               r.size());
184           }
185         }
186         byte[] firstValue = null;
187         byte[] secondValue = null;
188         int count = 0;
189          for(Cell kv : r.listCells()) {
190           if (count == 0) {
191             firstValue = CellUtil.cloneValue(kv);
192           }
193           if (count == 1) {
194             secondValue = CellUtil.cloneValue(kv);
195           }
196           count++;
197           if (count == 2) {
198             break;
199           }
200         }
201 
202 
203         if (firstValue == null) {
204           throw new NullPointerException(Bytes.toString(r.getRow()) +
205             ": first value is null");
206         }
207         String first = Bytes.toString(firstValue);
208 
209         if (secondValue == null) {
210           throw new NullPointerException(Bytes.toString(r.getRow()) +
211             ": second value is null");
212         }
213         byte[] secondReversed = new byte[secondValue.length];
214         for (int i = 0, j = secondValue.length - 1; j >= 0; j--, i++) {
215           secondReversed[i] = secondValue[j];
216         }
217         String second = Bytes.toString(secondReversed);
218 
219         if (first.compareTo(second) != 0) {
220           if (getLog().isDebugEnabled()) {
221             getLog().debug("second key is not the reverse of first. row=" +
222                 Bytes.toStringBinary(r.getRow()) + ", first value=" + first +
223                 ", second value=" + second);
224           }
225           fail();
226         }
227       }
228     } finally {
229       scanner.close();
230     }
231   }
232 }