View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.client;
19  
20  import static org.junit.Assert.assertEquals;
21  import static org.junit.Assert.assertTrue;
22  
23  import java.io.IOException;
24  import java.nio.charset.StandardCharsets;
25  import java.util.List;
26  import java.util.concurrent.ExecutorService;
27  import java.util.concurrent.Executors;
28  
29  import org.apache.hadoop.conf.Configuration;
30  import org.apache.hadoop.hbase.Cell;
31  import org.apache.hadoop.hbase.HConstants;
32  import org.apache.hadoop.hbase.HRegionInfo;
33  import org.apache.hadoop.hbase.KeyValue;
34  import org.apache.hadoop.hbase.KeyValue.Type;
35  import org.apache.hadoop.hbase.TableName;
36  import org.apache.hadoop.hbase.client.ClientSmallScanner.SmallScannerCallableFactory;
37  import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
38  import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
39  import org.apache.hadoop.hbase.testclassification.SmallTests;
40  import org.junit.After;
41  import org.junit.Before;
42  import org.junit.Test;
43  import org.junit.experimental.categories.Category;
44  import org.mockito.Mockito;
45  import org.mockito.invocation.InvocationOnMock;
46  import org.mockito.stubbing.Answer;
47  
48  /**
49   * Test the ClientSmallScanner.
50   */
51  @Category(SmallTests.class)
52  public class TestClientSmallScanner {
53  
54    Scan scan;
55    ExecutorService pool;
56    Configuration conf;
57  
58    ClusterConnection clusterConn;
59    RpcRetryingCallerFactory rpcFactory;
60    RpcControllerFactory controllerFactory;
61    RpcRetryingCaller<Result[]> caller;
62  
63    @Before
64    @SuppressWarnings({"deprecation", "unchecked"})
65    public void setup() throws IOException {
66      clusterConn = Mockito.mock(ClusterConnection.class);
67      rpcFactory = Mockito.mock(RpcRetryingCallerFactory.class);
68      controllerFactory = Mockito.mock(RpcControllerFactory.class);
69      pool = Executors.newSingleThreadExecutor();
70      scan = new Scan();
71      conf = new Configuration();
72      Mockito.when(clusterConn.getConfiguration()).thenReturn(conf);
73      // Mock out the RpcCaller
74      caller = Mockito.mock(RpcRetryingCaller.class);
75      // Return the mock from the factory
76      Mockito.when(rpcFactory.<Result[]> newCaller()).thenReturn(caller);
77    }
78  
79    @After
80    public void teardown() {
81      if (null != pool) {
82        pool.shutdownNow();
83      }
84    }
85  
86    /**
87     * Create a simple Answer which returns true the first time, and false every time after.
88     */
89    private Answer<Boolean> createTrueThenFalseAnswer() {
90      return new Answer<Boolean>() {
91        boolean first = true;
92  
93        @Override
94        public Boolean answer(InvocationOnMock invocation) {
95          if (first) {
96            first = false;
97            return true;
98          }
99          return false;
100       }
101     };
102   }
103 
104   private SmallScannerCallableFactory getFactory(
105       final ScannerCallableWithReplicas callableWithReplicas) {
106     return new SmallScannerCallableFactory() {
107       @Override
108       public ScannerCallableWithReplicas getCallable(ClusterConnection connection, TableName table,
109           Scan scan, ScanMetrics scanMetrics, byte[] localStartKey, int cacheNum,
110           RpcControllerFactory controllerFactory, ExecutorService pool,
111           int primaryOperationTimeout, int retries, int scannerTimeout, Configuration conf,
112           RpcRetryingCaller<Result[]> caller) {
113         return callableWithReplicas;
114       }
115     };
116   }
117 
118   @Test
119   public void testContextPresent() throws Exception {
120     final KeyValue kv1 = new KeyValue("row1".getBytes(), "cf".getBytes(), "cq".getBytes(), 1,
121         Type.Maximum), kv2 = new KeyValue("row2".getBytes(), "cf".getBytes(), "cq".getBytes(), 1,
122         Type.Maximum), kv3 = new KeyValue("row3".getBytes(), "cf".getBytes(), "cq".getBytes(), 1,
123         Type.Maximum);
124 
125     ScannerCallableWithReplicas callableWithReplicas = Mockito
126         .mock(ScannerCallableWithReplicas.class);
127 
128     // Mock out the RpcCaller
129     @SuppressWarnings("unchecked")
130     RpcRetryingCaller<Result[]> caller = Mockito.mock(RpcRetryingCaller.class);
131     // Return the mock from the factory
132     Mockito.when(rpcFactory.<Result[]> newCaller()).thenReturn(caller);
133 
134     SmallScannerCallableFactory factory = getFactory(callableWithReplicas);
135 
136     // Intentionally leave a "default" caching size in the Scan. No matter the value, we
137     // should continue based on the server context
138 
139     try (ClientSmallScanner css = new ClientSmallScanner(conf, scan, TableName.valueOf("table"),
140         clusterConn, rpcFactory, controllerFactory, pool, Integer.MAX_VALUE)) {
141 
142       css.setScannerCallableFactory(factory);
143 
144       // Return some data the first time, less the second, and none after that
145       Mockito.when(caller.callWithoutRetries(callableWithReplicas, css.getScannerTimeout()))
146           .thenAnswer(new Answer<Result[]>() {
147             int count = 0;
148 
149             @Override
150             public Result[] answer(InvocationOnMock invocation) {
151               Result[] results;
152               if (0 == count) {
153                 results = new Result[] {Result.create(new Cell[] {kv1}),
154                     Result.create(new Cell[] {kv2})};
155               } else if (1 == count) {
156                 results = new Result[] {Result.create(new Cell[] {kv3})};
157               } else {
158                 results = new Result[0];
159               }
160               count++;
161               return results;
162             }
163           });
164 
165       // Pass back the context always
166       Mockito.when(callableWithReplicas.hasMoreResultsContext()).thenReturn(true);
167       // Only have more results the first time
168       Mockito.when(callableWithReplicas.getServerHasMoreResults()).thenAnswer(
169           createTrueThenFalseAnswer());
170 
171       // A mocked HRegionInfo so ClientSmallScanner#nextScanner(...) works right
172       HRegionInfo regionInfo = Mockito.mock(HRegionInfo.class);
173       Mockito.when(callableWithReplicas.getHRegionInfo()).thenReturn(regionInfo);
174       // Trigger the "no more data" branch for #nextScanner(...)
175       Mockito.when(regionInfo.getEndKey()).thenReturn(HConstants.EMPTY_BYTE_ARRAY);
176 
177       css.loadCache();
178 
179       List<Result> results = css.cache;
180       assertEquals(3, results.size());
181       for (int i = 1; i <= 3; i++) {
182         Result result = results.get(i - 1);
183         byte[] row = result.getRow();
184         assertEquals("row" + i, new String(row, StandardCharsets.UTF_8));
185         assertEquals(1, result.getMap().size());
186       }
187 
188       assertTrue(css.closed);
189     }
190   }
191 
192   @Test
193   public void testNoContextFewerRecords() throws Exception {
194     final KeyValue kv1 = new KeyValue("row1".getBytes(), "cf".getBytes(), "cq".getBytes(), 1,
195         Type.Maximum), kv2 = new KeyValue("row2".getBytes(), "cf".getBytes(), "cq".getBytes(), 1,
196         Type.Maximum), kv3 = new KeyValue("row3".getBytes(), "cf".getBytes(), "cq".getBytes(), 1,
197         Type.Maximum);
198 
199     ScannerCallableWithReplicas callableWithReplicas = Mockito
200         .mock(ScannerCallableWithReplicas.class);
201 
202     // While the server returns 2 records per batch, we expect more records.
203     scan.setCaching(2);
204     SmallScannerCallableFactory factory = getFactory(callableWithReplicas);
205 
206     try (ClientSmallScanner css = new ClientSmallScanner(conf, scan, TableName.valueOf("table"),
207         clusterConn, rpcFactory, controllerFactory, pool, Integer.MAX_VALUE)) {
208 
209       css.setScannerCallableFactory(factory);
210       // Return some data the first time, less the second, and none after that
211       Mockito.when(caller.callWithoutRetries(callableWithReplicas, css.getScannerTimeout()))
212           .thenAnswer(new Answer<Result[]>() {
213             int count = 0;
214 
215             @Override
216             public Result[] answer(InvocationOnMock invocation) {
217               Result[] results;
218               if (0 == count) {
219                 results = new Result[] {Result.create(new Cell[] {kv1}),
220                     Result.create(new Cell[] {kv2})};
221               } else if (1 == count) {
222                 // Return fewer records than expected (2)
223                 results = new Result[] {Result.create(new Cell[] {kv3})};
224               } else {
225                 throw new RuntimeException("Should not fetch a third batch from the server");
226               }
227               count++;
228               return results;
229             }
230           });
231 
232       // Server doesn't return the context
233       Mockito.when(callableWithReplicas.hasMoreResultsContext()).thenReturn(false);
234       // Only have more results the first time
235       Mockito.when(callableWithReplicas.getServerHasMoreResults()).thenThrow(
236           new RuntimeException("Should not be called"));
237 
238       // A mocked HRegionInfo so ClientSmallScanner#nextScanner(...) works right
239       HRegionInfo regionInfo = Mockito.mock(HRegionInfo.class);
240       Mockito.when(callableWithReplicas.getHRegionInfo()).thenReturn(regionInfo);
241       // Trigger the "no more data" branch for #nextScanner(...)
242       Mockito.when(regionInfo.getEndKey()).thenReturn(HConstants.EMPTY_BYTE_ARRAY);
243 
244       css.loadCache();
245 
246       List<Result> results = css.cache;
247       assertEquals(2, results.size());
248       for (int i = 1; i <= 2; i++) {
249         Result result = results.get(i - 1);
250         byte[] row = result.getRow();
251         assertEquals("row" + i, new String(row, StandardCharsets.UTF_8));
252         assertEquals(1, result.getMap().size());
253       }
254 
255       // "consume" the results we verified
256       results.clear();
257 
258       css.loadCache();
259 
260       assertEquals(1, results.size());
261       Result result = results.get(0);
262       assertEquals("row3", new String(result.getRow(), StandardCharsets.UTF_8));
263       assertEquals(1, result.getMap().size());
264       assertTrue(css.closed);
265     }
266   }
267 
268   @Test
269   public void testNoContextNoRecords() throws Exception {
270     ScannerCallableWithReplicas callableWithReplicas = Mockito
271         .mock(ScannerCallableWithReplicas.class);
272 
273     // While the server return 2 records per RPC, we expect there to be more records.
274     scan.setCaching(2);
275 
276     SmallScannerCallableFactory factory = getFactory(callableWithReplicas);
277 
278     try (ClientSmallScanner css = new ClientSmallScanner(conf, scan, TableName.valueOf("table"),
279         clusterConn, rpcFactory, controllerFactory, pool, Integer.MAX_VALUE)) {
280 
281       css.setScannerCallableFactory(factory);
282 
283       // Return some data the first time, less the second, and none after that
284       Mockito.when(caller.callWithoutRetries(callableWithReplicas, css.getScannerTimeout()))
285           .thenReturn(new Result[0]);
286 
287       // Server doesn't return the context
288       Mockito.when(callableWithReplicas.hasMoreResultsContext()).thenReturn(false);
289       // Only have more results the first time
290       Mockito.when(callableWithReplicas.getServerHasMoreResults()).thenThrow(
291           new RuntimeException("Should not be called"));
292 
293       // A mocked HRegionInfo so ClientSmallScanner#nextScanner(...) works right
294       HRegionInfo regionInfo = Mockito.mock(HRegionInfo.class);
295       Mockito.when(callableWithReplicas.getHRegionInfo()).thenReturn(regionInfo);
296       // Trigger the "no more data" branch for #nextScanner(...)
297       Mockito.when(regionInfo.getEndKey()).thenReturn(HConstants.EMPTY_BYTE_ARRAY);
298 
299       css.loadCache();
300 
301       assertEquals(0, css.cache.size());
302       assertTrue(css.closed);
303     }
304   }
305 
306   @Test
307   public void testContextNoRecords() throws Exception {
308     ScannerCallableWithReplicas callableWithReplicas = Mockito
309         .mock(ScannerCallableWithReplicas.class);
310 
311     SmallScannerCallableFactory factory = getFactory(callableWithReplicas);
312 
313     try (ClientSmallScanner css = new ClientSmallScanner(conf, scan, TableName.valueOf("table"),
314         clusterConn, rpcFactory, controllerFactory, pool, Integer.MAX_VALUE)) {
315 
316       css.setScannerCallableFactory(factory);
317 
318       // Return some data the first time, less the second, and none after that
319       Mockito.when(caller.callWithoutRetries(callableWithReplicas, css.getScannerTimeout()))
320           .thenReturn(new Result[0]);
321 
322       // Server doesn't return the context
323       Mockito.when(callableWithReplicas.hasMoreResultsContext()).thenReturn(true);
324       // Only have more results the first time
325       Mockito.when(callableWithReplicas.getServerHasMoreResults()).thenReturn(false);
326 
327       // A mocked HRegionInfo so ClientSmallScanner#nextScanner(...) works right
328       HRegionInfo regionInfo = Mockito.mock(HRegionInfo.class);
329       Mockito.when(callableWithReplicas.getHRegionInfo()).thenReturn(regionInfo);
330       // Trigger the "no more data" branch for #nextScanner(...)
331       Mockito.when(regionInfo.getEndKey()).thenReturn(HConstants.EMPTY_BYTE_ARRAY);
332 
333       css.loadCache();
334 
335       assertEquals(0, css.cache.size());
336       assertTrue(css.closed);
337     }
338   }
339 }