View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.regionserver;
19  
20  import static org.apache.hadoop.hbase.HBaseTestingUtility.fam1;
21  import static org.junit.Assert.assertEquals;
22  import static org.junit.Assert.assertTrue;
23  
24  import java.io.IOException;
25  import java.util.Random;
26  
27  import org.apache.commons.logging.Log;
28  import org.apache.commons.logging.LogFactory;
29  import org.apache.hadoop.hbase.Cell;
30  import org.apache.hadoop.hbase.CellUtil;
31  import org.apache.hadoop.hbase.HBaseTestingUtility;
32  import org.apache.hadoop.hbase.HColumnDescriptor;
33  import org.apache.hadoop.hbase.HConstants;
34  import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
35  import org.apache.hadoop.hbase.HRegionInfo;
36  import org.apache.hadoop.hbase.HTableDescriptor;
37  import org.apache.hadoop.hbase.testclassification.MediumTests;
38  import org.apache.hadoop.hbase.TableName;
39  import org.apache.hadoop.hbase.client.Get;
40  import org.apache.hadoop.hbase.client.Put;
41  import org.apache.hadoop.hbase.client.Result;
42  import org.apache.hadoop.hbase.util.Bytes;
43  import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
44  import org.junit.After;
45  import org.junit.Before;
46  import org.junit.BeforeClass;
47  import org.junit.Rule;
48  import org.junit.Test;
49  import org.junit.experimental.categories.Category;
50  import org.junit.rules.TestName;
51  
52  
53  /**
54   * Testing of multiPut in parallel.
55   *
56   */
57  @Category(MediumTests.class)
58  public class TestParallelPut {
59    private static final Log LOG = LogFactory.getLog(TestParallelPut.class);
60    @Rule public TestName name = new TestName(); 
61    
62    private HRegion region = null;
63    private static HBaseTestingUtility HBTU = new HBaseTestingUtility();
64    private static final int THREADS100 = 100;
65  
66    // Test names
67    static byte[] tableName;
68    static final byte[] qual1 = Bytes.toBytes("qual1");
69    static final byte[] qual2 = Bytes.toBytes("qual2");
70    static final byte[] qual3 = Bytes.toBytes("qual3");
71    static final byte[] value1 = Bytes.toBytes("value1");
72    static final byte[] value2 = Bytes.toBytes("value2");
73    static final byte [] row = Bytes.toBytes("rowA");
74    static final byte [] row2 = Bytes.toBytes("rowB");
75  
76    @BeforeClass
77    public static void beforeClass() {
78      // Make sure enough handlers.
79      HBTU.getConfiguration().setInt(HConstants.REGION_SERVER_HANDLER_COUNT, THREADS100);
80    }
81  
82  
83    /**
84     * @see org.apache.hadoop.hbase.HBaseTestCase#setUp()
85     */
86    @Before
87    public void setUp() throws Exception {
88      tableName = Bytes.toBytes(name.getMethodName());
89    }
90  
91    @After
92    public void tearDown() throws Exception {
93      EnvironmentEdgeManagerTestHelper.reset();
94      if (region != null) region.close(true);
95    }
96    
97    public String getName() {
98      return name.getMethodName();
99    }
100 
101   //////////////////////////////////////////////////////////////////////////////
102   // New tests that don't spin up a mini cluster but rather just test the
103   // individual code pieces in the HRegion. 
104   //////////////////////////////////////////////////////////////////////////////
105 
106   /**
107    * Test one put command.
108    */
109   @Test
110   public void testPut() throws IOException {
111     LOG.info("Starting testPut");
112     this.region = initHRegion(tableName, getName(), fam1);
113 
114     long value = 1L;
115 
116     Put put = new Put(row);
117     put.add(fam1, qual1, Bytes.toBytes(value));
118     region.put(put);
119 
120     assertGet(this.region, row, fam1, qual1, Bytes.toBytes(value));
121   }
122 
123   /**
124    * Test multi-threaded Puts.
125    */
126   @Test
127   public void testParallelPuts() throws IOException {
128 
129     LOG.info("Starting testParallelPuts");
130 
131     this.region = initHRegion(tableName, getName(), fam1);
132     int numOps = 1000; // these many operations per thread
133 
134     // create 100 threads, each will do its own puts
135     Putter[] all = new Putter[THREADS100];
136 
137     // create all threads
138     for (int i = 0; i < THREADS100; i++) {
139       all[i] = new Putter(region, i, numOps);
140     }
141 
142     // run all threads
143     for (int i = 0; i < THREADS100; i++) {
144       all[i].start();
145     }
146 
147     // wait for all threads to finish
148     for (int i = 0; i < THREADS100; i++) {
149       try {
150         all[i].join();
151       } catch (InterruptedException e) {
152         LOG.warn("testParallelPuts encountered InterruptedException." +
153                  " Ignoring....", e);
154       }
155     }
156     LOG.info("testParallelPuts successfully verified " + 
157              (numOps * THREADS100) + " put operations.");
158   }
159 
160 
161   private static void assertGet(final HRegion region, byte [] row, byte [] familiy,
162       byte[] qualifier, byte[] value) throws IOException {
163     // run a get and see if the value matches
164     Get get = new Get(row);
165     get.addColumn(familiy, qualifier);
166     Result result = region.get(get);
167     assertEquals(1, result.size());
168 
169     Cell kv = result.rawCells()[0];
170     byte[] r = CellUtil.cloneValue(kv);
171     assertTrue(Bytes.compareTo(r, value) == 0);
172   }
173 
174   private HRegion initHRegion(byte [] tableName, String callingMethod,
175     byte[] ... families)
176   throws IOException {
177     HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(tableName));
178     for(byte [] family : families) {
179       htd.addFamily(new HColumnDescriptor(family));
180     }
181     HRegionInfo info = new HRegionInfo(htd.getTableName(), null, null, false);
182     return HBTU.createLocalHRegion(info, htd);
183   }
184 
185   /**
186    * A thread that makes a few put calls
187    */
188   public static class Putter extends Thread {
189 
190     private final HRegion region;
191     private final int threadNumber;
192     private final int numOps;
193     private final Random rand = new Random();
194     byte [] rowkey = null;
195 
196     public Putter(HRegion region, int threadNumber, int numOps) {
197       this.region = region;
198       this.threadNumber = threadNumber;
199       this.numOps = numOps;
200       this.rowkey = Bytes.toBytes((long)threadNumber); // unique rowid per thread
201       setDaemon(true);
202     }
203 
204     @Override
205     public void run() {
206       byte[] value = new byte[100];
207       Put[]  in = new Put[1];
208 
209       // iterate for the specified number of operations
210       for (int i=0; i<numOps; i++) {
211         // generate random bytes
212         rand.nextBytes(value);  
213 
214         // put the randombytes and verify that we can read it. This is one
215         // way of ensuring that rwcc manipulation in HRegion.put() is fine.
216         Put put = new Put(rowkey);
217         put.add(fam1, qual1, value);
218         in[0] = put;
219         try {
220           OperationStatus[] ret = region.batchMutate(in);
221           assertEquals(1, ret.length);
222           assertEquals(OperationStatusCode.SUCCESS, ret[0].getOperationStatusCode());
223           assertGet(this.region, rowkey, fam1, qual1, value);
224         } catch (IOException e) {
225           assertTrue("Thread id " + threadNumber + " operation " + i + " failed.",
226                      false);
227         }
228       }
229     }
230   }
231 }