View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.client;
19  
20  import static org.junit.Assert.assertTrue;
21  
22  import java.io.IOException;
23  
24  import org.apache.commons.logging.Log;
25  import org.apache.commons.logging.LogFactory;
26  import org.apache.commons.logging.impl.Log4JLogger;
27  import org.apache.hadoop.conf.Configuration;
28  import org.apache.hadoop.hbase.HBaseTestingUtility;
29  import org.apache.hadoop.hbase.HConstants;
30  import org.apache.hadoop.hbase.testclassification.MediumTests;
31  import org.apache.hadoop.hbase.MiniHBaseCluster.MiniHBaseClusterRegionServer;
32  import org.apache.hadoop.hbase.TableName;
33  import org.apache.hadoop.hbase.CoordinatedStateManager;
34  import org.apache.hadoop.hbase.ipc.AbstractRpcClient;
35  import org.apache.hadoop.hbase.ipc.RpcServer;
36  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
37  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse;
38  import org.apache.hadoop.hbase.regionserver.HRegionServer;
39  import org.apache.hadoop.hbase.regionserver.RSRpcServices;
40  import org.apache.hadoop.hbase.util.Bytes;
41  import org.apache.log4j.Level;
42  import org.junit.AfterClass;
43  import org.junit.BeforeClass;
44  import org.junit.Test;
45  import org.junit.experimental.categories.Category;
46  
47  import com.google.protobuf.RpcController;
48  import com.google.protobuf.ServiceException;
49  
50  /**
51   * Test the scenario where a HRegionServer#scan() call, while scanning, timeout at client side and
52   * getting retried. This scenario should not result in some data being skipped at RS side.
53   */
54  @Category(MediumTests.class)
55  public class TestClientScannerRPCTimeout {
56    private static final Log LOG = LogFactory.getLog(TestClientScannerRPCTimeout.class);
57    private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
58    private static final byte[] FAMILY = Bytes.toBytes("testFamily");
59    private static final byte[] QUALIFIER = Bytes.toBytes("testQualifier");
60    private static final byte[] VALUE = Bytes.toBytes("testValue");
61    private static final int rpcTimeout = 2 * 1000;
62    private static final int CLIENT_RETRIES_NUMBER = 3;
63  
64    @BeforeClass
65    public static void setUpBeforeClass() throws Exception {
66      ((Log4JLogger)RpcServer.LOG).getLogger().setLevel(Level.ALL);
67      ((Log4JLogger)AbstractRpcClient.LOG).getLogger().setLevel(Level.ALL);
68      ((Log4JLogger)ScannerCallable.LOG).getLogger().setLevel(Level.ALL);
69      Configuration conf = TEST_UTIL.getConfiguration();
70      // Don't report so often so easier to see other rpcs
71      conf.setInt("hbase.regionserver.msginterval", 3 * 10000);
72      conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, rpcTimeout);
73      conf.setStrings(HConstants.REGION_SERVER_IMPL, RegionServerWithScanTimeout.class.getName());
74      conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, CLIENT_RETRIES_NUMBER);
75      conf.setInt(HConstants.HBASE_CLIENT_PAUSE, 1000);
76      TEST_UTIL.startMiniCluster(1);
77    }
78  
79    @AfterClass
80    public static void tearDownAfterClass() throws Exception {
81      TEST_UTIL.shutdownMiniCluster();
82    }
83  
84    @Test
85    public void testScannerNextRPCTimesout() throws Exception {
86      final TableName TABLE_NAME = TableName.valueOf("testScannerNextRPCTimesout");
87      Table ht = TEST_UTIL.createTable(TABLE_NAME, FAMILY);
88      byte[] r1 = Bytes.toBytes("row-1");
89      byte[] r2 = Bytes.toBytes("row-2");
90      byte[] r3 = Bytes.toBytes("row-3");
91      putToTable(ht, r1);
92      putToTable(ht, r2);
93      putToTable(ht, r3);
94      LOG.info("Wrote our three values");
95      RSRpcServicesWithScanTimeout.seqNoToSleepOn = 1;
96      Scan scan = new Scan();
97      scan.setCaching(1);
98      ResultScanner scanner = ht.getScanner(scan);
99      Result result = scanner.next();
100     assertTrue("Expected row: row-1", Bytes.equals(r1, result.getRow()));
101     LOG.info("Got expected first row");
102     long t1 = System.currentTimeMillis();
103     result = scanner.next();
104     assertTrue((System.currentTimeMillis() - t1) > rpcTimeout);
105     assertTrue("Expected row: row-2", Bytes.equals(r2, result.getRow()));
106     RSRpcServicesWithScanTimeout.seqNoToSleepOn = -1;// No need of sleep
107     result = scanner.next();
108     assertTrue("Expected row: row-3", Bytes.equals(r3, result.getRow()));
109     scanner.close();
110 
111     // test the case that RPC is always timesout
112     scanner = ht.getScanner(scan);
113     RSRpcServicesWithScanTimeout.sleepAlways = true;
114     RSRpcServicesWithScanTimeout.tryNumber = 0;
115     try {
116       result = scanner.next();
117     } catch (IOException ioe) {
118       // catch the exception after max retry number
119       LOG.info("Failed after maximal attempts=" + CLIENT_RETRIES_NUMBER, ioe);
120     }
121     assertTrue("Expected maximal try number=" + CLIENT_RETRIES_NUMBER
122         + ", actual =" + RSRpcServicesWithScanTimeout.tryNumber,
123         RSRpcServicesWithScanTimeout.tryNumber <= CLIENT_RETRIES_NUMBER);
124   }
125 
126   private void putToTable(Table ht, byte[] rowkey) throws IOException {
127     Put put = new Put(rowkey);
128     put.add(FAMILY, QUALIFIER, VALUE);
129     ht.put(put);
130   }
131 
132   private static class RegionServerWithScanTimeout extends MiniHBaseClusterRegionServer {
133     public RegionServerWithScanTimeout(Configuration conf, CoordinatedStateManager cp)
134         throws IOException, InterruptedException {
135       super(conf, cp);
136     }
137 
138     protected RSRpcServices createRpcServices() throws IOException {
139       return new RSRpcServicesWithScanTimeout(this);
140     }
141   }
142 
143   private static class RSRpcServicesWithScanTimeout extends RSRpcServices {
144     private long tableScannerId;
145     private boolean slept;
146     private static long seqNoToSleepOn = -1;
147     private static boolean sleepAlways = false;
148     private static int tryNumber = 0;
149 
150     public RSRpcServicesWithScanTimeout(HRegionServer rs)
151         throws IOException {
152       super(rs);
153     }
154 
155     @Override
156     public ScanResponse scan(final RpcController controller, final ScanRequest request)
157         throws ServiceException {
158       if (request.hasScannerId()) {
159         ScanResponse scanResponse = super.scan(controller, request);
160         if (this.tableScannerId == request.getScannerId() && 
161             (sleepAlways || (!slept && seqNoToSleepOn == request.getNextCallSeq()))) {
162           try {
163             LOG.info("SLEEPING " + (rpcTimeout + 500));
164             Thread.sleep(rpcTimeout + 500);
165           } catch (InterruptedException e) {
166           }
167           slept = true;
168           tryNumber++;
169           if (tryNumber > 2 * CLIENT_RETRIES_NUMBER) {
170             sleepAlways = false;
171           }
172         }
173         return scanResponse;
174       } else {
175         ScanResponse scanRes = super.scan(controller, request);
176         String regionName = Bytes.toString(request.getRegion().getValue().toByteArray());
177         if (!regionName.contains(TableName.META_TABLE_NAME.getNameAsString())) {
178           tableScannerId = scanRes.getScannerId();
179         }
180         return scanRes;
181       }
182     }
183   }
184 }