View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  
20  package org.apache.hadoop.hbase.client;
21  
22  import java.io.IOException;
23  import java.util.ArrayList;
24  import java.util.HashMap;
25  import java.util.Iterator;
26  import java.util.List;
27  import java.util.Random;
28  import java.util.Set;
29  import java.util.concurrent.CountDownLatch;
30  import java.util.concurrent.TimeUnit;
31  import java.util.concurrent.atomic.AtomicBoolean;
32  import java.util.concurrent.atomic.AtomicInteger;
33  import java.util.concurrent.atomic.AtomicLong;
34  import java.util.concurrent.atomic.AtomicReference;
35  
36  import org.apache.commons.logging.Log;
37  import org.apache.commons.logging.LogFactory;
38  import org.apache.commons.logging.impl.Log4JLogger;
39  import org.apache.hadoop.conf.Configuration;
40  import org.apache.hadoop.hbase.Cell;
41  import org.apache.hadoop.hbase.HBaseTestingUtility;
42  import org.apache.hadoop.hbase.HConstants;
43  import org.apache.hadoop.hbase.HRegionInfo;
44  import org.apache.hadoop.hbase.HTableDescriptor;
45  import org.apache.hadoop.hbase.KeyValue;
46  import org.apache.hadoop.hbase.NotServingRegionException;
47  import org.apache.hadoop.hbase.RegionLocations;
48  import org.apache.hadoop.hbase.TableNotFoundException;
49  import org.apache.hadoop.hbase.client.AsyncProcess.AsyncRequestFuture;
50  import org.apache.hadoop.hbase.client.AsyncProcess.AsyncRequestFutureImpl;
51  import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
52  import org.apache.hadoop.hbase.coprocessor.ObserverContext;
53  import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
54  import org.apache.hadoop.hbase.protobuf.RequestConverter;
55  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
56  import org.apache.hadoop.hbase.regionserver.HRegionServer;
57  import org.apache.hadoop.hbase.regionserver.InternalScanner;
58  import org.apache.hadoop.hbase.regionserver.RegionScanner;
59  import org.apache.hadoop.hbase.regionserver.StorefileRefresherChore;
60  import org.apache.hadoop.hbase.regionserver.TestRegionServerNoMaster;
61  import org.apache.hadoop.hbase.testclassification.MediumTests;
62  import org.apache.hadoop.hbase.util.Bytes;
63  import org.apache.hadoop.hbase.zookeeper.ZKAssign;
64  import org.apache.log4j.Level;
65  import org.apache.zookeeper.KeeperException;
66  import org.junit.After;
67  import org.junit.AfterClass;
68  import org.junit.Assert;
69  import org.junit.Before;
70  import org.junit.BeforeClass;
71  import org.junit.Test;
72  import org.junit.experimental.categories.Category;
73  
74  /**
75   * Tests for region replicas. Sad that we cannot isolate these without bringing up a whole
76   * cluster. See {@link org.apache.hadoop.hbase.regionserver.TestRegionServerNoMaster}.
77   */
78  @Category(MediumTests.class)
79  public class TestReplicasClient {
80    private static final Log LOG = LogFactory.getLog(TestReplicasClient.class);
81  
82    static {
83      ((Log4JLogger)RpcRetryingCaller.LOG).getLogger().setLevel(Level.ALL);
84    }
85  
86    private static final int NB_SERVERS = 1;
87    private static HTable table = null;
88    private static final byte[] row = TestReplicasClient.class.getName().getBytes();
89  
90    private static HRegionInfo hriPrimary;
91    private static HRegionInfo hriSecondary;
92  
93    private static final HBaseTestingUtility HTU = new HBaseTestingUtility();
94    private static final byte[] f = HConstants.CATALOG_FAMILY;
95  
96    private final static int REFRESH_PERIOD = 1000;
97  
98    /**
99     * This copro is used to synchronize the tests.
100    */
101   public static class SlowMeCopro extends BaseRegionObserver {
102     static final AtomicLong sleepTime = new AtomicLong(0);
103     static final AtomicBoolean slowDownNext = new AtomicBoolean(false);
104     static final AtomicInteger countOfNext = new AtomicInteger(0);
105     private static final AtomicReference<CountDownLatch> cdl =
106         new AtomicReference<CountDownLatch>(new CountDownLatch(0));
107     Random r = new Random();
108     public SlowMeCopro() {
109     }
110 
111     @Override
112     public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> e,
113                          final Get get, final List<Cell> results) throws IOException {
114       slowdownCode(e);
115     }
116 
117     @Override
118     public RegionScanner preScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> e,
119         final Scan scan, final RegionScanner s) throws IOException {
120       slowdownCode(e);
121       return s;
122     }
123 
124     @Override
125     public boolean preScannerNext(final ObserverContext<RegionCoprocessorEnvironment> e,
126         final InternalScanner s, final List<Result> results,
127         final int limit, final boolean hasMore) throws IOException {
128       //this will slow down a certain next operation if the conditions are met. The slowness
129       //will allow the call to go to a replica
130       if (slowDownNext.get()) {
131         //have some "next" return successfully from the primary; hence countOfNext checked
132         if (countOfNext.incrementAndGet() == 2) {
133           sleepTime.set(2000);
134           slowdownCode(e);
135         }
136       }
137       return true;
138     }
139 
140     private void slowdownCode(final ObserverContext<RegionCoprocessorEnvironment> e) {
141       if (e.getEnvironment().getRegion().getRegionInfo().getReplicaId() == 0) {
142         CountDownLatch latch = getCdl().get();
143         try {
144           if (sleepTime.get() > 0) {
145             LOG.info("Sleeping for " + sleepTime.get() + " ms");
146             Thread.sleep(sleepTime.get());
147           } else if (latch.getCount() > 0) {
148             LOG.info("Waiting for the counterCountDownLatch");
149             latch.await(2, TimeUnit.MINUTES); // To help the tests to finish.
150             if (latch.getCount() > 0) {
151               throw new RuntimeException("Can't wait more");
152             }
153           }
154         } catch (InterruptedException e1) {
155           LOG.error(e1);
156         }
157       } else {
158         LOG.info("We're not the primary replicas.");
159       }
160     }
161 
162     public static AtomicReference<CountDownLatch> getCdl() {
163       return cdl;
164     }
165   }
166 
167   @BeforeClass
168   public static void beforeClass() throws Exception {
169     // enable store file refreshing
170     HTU.getConfiguration().setInt(
171         StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD, REFRESH_PERIOD);
172     HTU.getConfiguration().setBoolean("hbase.client.log.scanner.activity", true);
173     ConnectionUtils.setupMasterlessConnection(HTU.getConfiguration());
174     HTU.startMiniCluster(NB_SERVERS);
175 
176     // Create table then get the single region for our new table.
177     HTableDescriptor hdt = HTU.createTableDescriptor(TestReplicasClient.class.getSimpleName());
178     hdt.addCoprocessor(SlowMeCopro.class.getName());
179     table = HTU.createTable(hdt, new byte[][]{f}, HTU.getConfiguration());
180 
181     hriPrimary = table.getRegionLocation(row, false).getRegionInfo();
182 
183     // mock a secondary region info to open
184     hriSecondary = new HRegionInfo(hriPrimary.getTable(), hriPrimary.getStartKey(),
185         hriPrimary.getEndKey(), hriPrimary.isSplit(), hriPrimary.getRegionId(), 1);
186 
187     // No master
188     LOG.info("Master is going to be stopped");
189     TestRegionServerNoMaster.stopMasterAndAssignMeta(HTU);
190     Configuration c = new Configuration(HTU.getConfiguration());
191     c.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
192     LOG.info("Master has stopped");
193   }
194 
195   @AfterClass
196   public static void afterClass() throws Exception {
197     if (table != null) table.close();
198     HTU.shutdownMiniCluster();
199   }
200 
201   @Before
202   public void before() throws IOException {
203     HTU.getHBaseAdmin().getConnection().clearRegionCache();
204     try {
205       openRegion(hriPrimary);
206     } catch (Exception ignored) {
207     }
208     try {
209       openRegion(hriSecondary);
210     } catch (Exception ignored) {
211     }
212   }
213 
214   @After
215   public void after() throws IOException, KeeperException {
216     try {
217       closeRegion(hriSecondary);
218     } catch (Exception ignored) {
219     }
220     try {
221       closeRegion(hriPrimary);
222     } catch (Exception ignored) {
223     }
224     ZKAssign.deleteNodeFailSilent(HTU.getZooKeeperWatcher(), hriPrimary);
225     ZKAssign.deleteNodeFailSilent(HTU.getZooKeeperWatcher(), hriSecondary);
226 
227     HTU.getHBaseAdmin().getConnection().clearRegionCache();
228   }
229 
230   private HRegionServer getRS() {
231     return HTU.getMiniHBaseCluster().getRegionServer(0);
232   }
233 
234   private void openRegion(HRegionInfo hri) throws Exception {
235     try {
236       if (isRegionOpened(hri)) return;
237     } catch (Exception e){}
238     ZKAssign.createNodeOffline(HTU.getZooKeeperWatcher(), hri, getRS().getServerName());
239     // first version is '0'
240     AdminProtos.OpenRegionRequest orr = RequestConverter.buildOpenRegionRequest(
241       getRS().getServerName(), hri, 0, null, null);
242     AdminProtos.OpenRegionResponse responseOpen = getRS().getRSRpcServices().openRegion(null, orr);
243     Assert.assertEquals(responseOpen.getOpeningStateCount(), 1);
244     Assert.assertEquals(responseOpen.getOpeningState(0),
245       AdminProtos.OpenRegionResponse.RegionOpeningState.OPENED);
246     checkRegionIsOpened(hri);
247   }
248 
249   private void closeRegion(HRegionInfo hri) throws Exception {
250     ZKAssign.createNodeClosing(HTU.getZooKeeperWatcher(), hri, getRS().getServerName());
251 
252     AdminProtos.CloseRegionRequest crr = RequestConverter.buildCloseRegionRequest(
253       getRS().getServerName(), hri.getEncodedName(), true);
254     AdminProtos.CloseRegionResponse responseClose = getRS()
255         .getRSRpcServices().closeRegion(null, crr);
256     Assert.assertTrue(responseClose.getClosed());
257 
258     checkRegionIsClosed(hri.getEncodedName());
259 
260     ZKAssign.deleteClosedNode(HTU.getZooKeeperWatcher(), hri.getEncodedName(), null);
261   }
262 
263   private void checkRegionIsOpened(HRegionInfo hri) throws Exception {
264 
265     while (!getRS().getRegionsInTransitionInRS().isEmpty()) {
266       Thread.sleep(1);
267     }
268 
269     Assert.assertTrue(
270         ZKAssign.deleteOpenedNode(HTU.getZooKeeperWatcher(), hri.getEncodedName(), null));
271   }
272 
273   private boolean isRegionOpened(HRegionInfo hri) throws Exception {
274     return getRS().getRegionByEncodedName(hri.getEncodedName()).isAvailable();
275   }
276 
277   private void checkRegionIsClosed(String encodedRegionName) throws Exception {
278 
279     while (!getRS().getRegionsInTransitionInRS().isEmpty()) {
280       Thread.sleep(1);
281     }
282 
283     try {
284       Assert.assertFalse(getRS().getRegionByEncodedName(encodedRegionName).isAvailable());
285     } catch (NotServingRegionException expected) {
286       // That's how it work: if the region is closed we have an exception.
287     }
288 
289     // We don't delete the znode here, because there is not always a znode.
290   }
291 
292   private void flushRegion(HRegionInfo regionInfo) throws IOException {
293     TestRegionServerNoMaster.flushRegion(HTU, regionInfo);
294   }
295 
296   @Test
297   public void testUseRegionWithoutReplica() throws Exception {
298     byte[] b1 = "testUseRegionWithoutReplica".getBytes();
299     openRegion(hriSecondary);
300     SlowMeCopro.getCdl().set(new CountDownLatch(0));
301     try {
302       Get g = new Get(b1);
303       Result r = table.get(g);
304       Assert.assertFalse(r.isStale());
305     } finally {
306       closeRegion(hriSecondary);
307     }
308   }
309 
310   @Test
311   public void testLocations() throws Exception {
312     byte[] b1 = "testLocations".getBytes();
313     openRegion(hriSecondary);
314     ClusterConnection hc = (ClusterConnection) HTU.getHBaseAdmin().getConnection();
315 
316     try {
317       hc.clearRegionCache();
318       RegionLocations rl = hc.locateRegion(table.getName(), b1, false, false);
319       Assert.assertEquals(2, rl.size());
320 
321       rl = hc.locateRegion(table.getName(), b1, true, false);
322       Assert.assertEquals(2, rl.size());
323 
324       hc.clearRegionCache();
325       rl = hc.locateRegion(table.getName(), b1, true, false);
326       Assert.assertEquals(2, rl.size());
327 
328       rl = hc.locateRegion(table.getName(), b1, false, false);
329       Assert.assertEquals(2, rl.size());
330     } finally {
331       closeRegion(hriSecondary);
332     }
333   }
334 
335   @Test
336   public void testGetNoResultNoStaleRegionWithReplica() throws Exception {
337     byte[] b1 = "testGetNoResultNoStaleRegionWithReplica".getBytes();
338     openRegion(hriSecondary);
339 
340     try {
341       // A get works and is not stale
342       Get g = new Get(b1);
343       Result r = table.get(g);
344       Assert.assertFalse(r.isStale());
345     } finally {
346       closeRegion(hriSecondary);
347     }
348   }
349 
350 
351   @Test
352   public void testGetNoResultStaleRegionWithReplica() throws Exception {
353     byte[] b1 = "testGetNoResultStaleRegionWithReplica".getBytes();
354     openRegion(hriSecondary);
355 
356     SlowMeCopro.getCdl().set(new CountDownLatch(1));
357     try {
358       Get g = new Get(b1);
359       g.setConsistency(Consistency.TIMELINE);
360       Result r = table.get(g);
361       Assert.assertTrue(r.isStale());
362     } finally {
363       SlowMeCopro.getCdl().get().countDown();
364       closeRegion(hriSecondary);
365     }
366   }
367 
368   @Test
369   public void testGetNoResultNotStaleSleepRegionWithReplica() throws Exception {
370     byte[] b1 = "testGetNoResultNotStaleSleepRegionWithReplica".getBytes();
371     openRegion(hriSecondary);
372 
373     try {
374       // We sleep; but we won't go to the stale region as we don't get the stale by default.
375       SlowMeCopro.sleepTime.set(2000);
376       Get g = new Get(b1);
377       Result r = table.get(g);
378       Assert.assertFalse(r.isStale());
379 
380     } finally {
381       SlowMeCopro.sleepTime.set(0);
382       closeRegion(hriSecondary);
383     }
384   }
385 
386   @Test
387   public void testFlushTable() throws Exception {
388     openRegion(hriSecondary);
389     try {
390       flushRegion(hriPrimary);
391       flushRegion(hriSecondary);
392 
393       Put p = new Put(row);
394       p.add(f, row, row);
395       table.put(p);
396 
397       flushRegion(hriPrimary);
398       flushRegion(hriSecondary);
399     } finally {
400       Delete d = new Delete(row);
401       table.delete(d);
402       closeRegion(hriSecondary);
403     }
404   }
405 
406   @Test
407   public void testFlushPrimary() throws Exception {
408     openRegion(hriSecondary);
409 
410     try {
411       flushRegion(hriPrimary);
412 
413       Put p = new Put(row);
414       p.add(f, row, row);
415       table.put(p);
416 
417       flushRegion(hriPrimary);
418     } finally {
419       Delete d = new Delete(row);
420       table.delete(d);
421       closeRegion(hriSecondary);
422     }
423   }
424 
425   @Test
426   public void testFlushSecondary() throws Exception {
427     openRegion(hriSecondary);
428     try {
429       flushRegion(hriSecondary);
430 
431       Put p = new Put(row);
432       p.add(f, row, row);
433       table.put(p);
434 
435       flushRegion(hriSecondary);
436     } catch (TableNotFoundException expected) {
437     } finally {
438       Delete d = new Delete(row);
439       table.delete(d);
440       closeRegion(hriSecondary);
441     }
442   }
443 
444   @Test
445   public void testUseRegionWithReplica() throws Exception {
446     byte[] b1 = "testUseRegionWithReplica".getBytes();
447     openRegion(hriSecondary);
448 
449     try {
450       // A simple put works, even if there here a second replica
451       Put p = new Put(b1);
452       p.add(f, b1, b1);
453       table.put(p);
454       LOG.info("Put done");
455 
456       // A get works and is not stale
457       Get g = new Get(b1);
458       Result r = table.get(g);
459       Assert.assertFalse(r.isStale());
460       Assert.assertFalse(r.getColumnCells(f, b1).isEmpty());
461       LOG.info("get works and is not stale done");
462 
463       // Even if it we have to wait a little on the main region
464       SlowMeCopro.sleepTime.set(2000);
465       g = new Get(b1);
466       r = table.get(g);
467       Assert.assertFalse(r.isStale());
468       Assert.assertFalse(r.getColumnCells(f, b1).isEmpty());
469       SlowMeCopro.sleepTime.set(0);
470       LOG.info("sleep and is not stale done");
471 
472       // But if we ask for stale we will get it
473       SlowMeCopro.getCdl().set(new CountDownLatch(1));
474       g = new Get(b1);
475       g.setConsistency(Consistency.TIMELINE);
476       r = table.get(g);
477       Assert.assertTrue(r.isStale());
478       Assert.assertTrue(r.getColumnCells(f, b1).isEmpty());
479       SlowMeCopro.getCdl().get().countDown();
480 
481       LOG.info("stale done");
482 
483       // exists works and is not stale
484       g = new Get(b1);
485       g.setCheckExistenceOnly(true);
486       r = table.get(g);
487       Assert.assertFalse(r.isStale());
488       Assert.assertTrue(r.getExists());
489       LOG.info("exists not stale done");
490 
491       // exists works on stale but don't see the put
492       SlowMeCopro.getCdl().set(new CountDownLatch(1));
493       g = new Get(b1);
494       g.setCheckExistenceOnly(true);
495       g.setConsistency(Consistency.TIMELINE);
496       r = table.get(g);
497       Assert.assertTrue(r.isStale());
498       Assert.assertFalse("The secondary has stale data", r.getExists());
499       SlowMeCopro.getCdl().get().countDown();
500       LOG.info("exists stale before flush done");
501 
502       flushRegion(hriPrimary);
503       flushRegion(hriSecondary);
504       LOG.info("flush done");
505       Thread.sleep(1000 + REFRESH_PERIOD * 2);
506 
507       // get works and is not stale
508       SlowMeCopro.getCdl().set(new CountDownLatch(1));
509       g = new Get(b1);
510       g.setConsistency(Consistency.TIMELINE);
511       r = table.get(g);
512       Assert.assertTrue(r.isStale());
513       Assert.assertFalse(r.isEmpty());
514       SlowMeCopro.getCdl().get().countDown();
515       LOG.info("stale done");
516 
517       // exists works on stale and we see the put after the flush
518       SlowMeCopro.getCdl().set(new CountDownLatch(1));
519       g = new Get(b1);
520       g.setCheckExistenceOnly(true);
521       g.setConsistency(Consistency.TIMELINE);
522       r = table.get(g);
523       Assert.assertTrue(r.isStale());
524       Assert.assertTrue(r.getExists());
525       SlowMeCopro.getCdl().get().countDown();
526       LOG.info("exists stale after flush done");
527 
528     } finally {
529       SlowMeCopro.getCdl().get().countDown();
530       SlowMeCopro.sleepTime.set(0);
531       Delete d = new Delete(b1);
532       table.delete(d);
533       closeRegion(hriSecondary);
534     }
535   }
536 
537   @Test
538   public void testCancelOfMultiGet() throws Exception {
539     openRegion(hriSecondary);
540     try {
541       List<Put> puts = new ArrayList<Put>(2);
542       byte[] b1 = Bytes.toBytes("testCancelOfMultiGet" + 0);
543       Put p = new Put(b1);
544       p.add(f, b1, b1);
545       puts.add(p);
546 
547       byte[] b2 = Bytes.toBytes("testCancelOfMultiGet" + 1);
548       p = new Put(b2);
549       p.add(f, b2, b2);
550       puts.add(p);
551       table.put(puts);
552       LOG.debug("PUT done");
553       flushRegion(hriPrimary);
554       LOG.info("flush done");
555 
556       Thread.sleep(1000 + REFRESH_PERIOD * 2);
557 
558       AsyncProcess ap = ((ClusterConnection) HTU.getHBaseAdmin().getConnection())
559           .getAsyncProcess();
560 
561       // Make primary slowdown
562       SlowMeCopro.getCdl().set(new CountDownLatch(1));
563 
564       List<Get> gets = new ArrayList<Get>();
565       Get g = new Get(b1);
566       g.setCheckExistenceOnly(true);
567       g.setConsistency(Consistency.TIMELINE);
568       gets.add(g);
569       g = new Get(b2);
570       g.setCheckExistenceOnly(true);
571       g.setConsistency(Consistency.TIMELINE);
572       gets.add(g);
573       Object[] results = new Object[2];
574       AsyncRequestFuture reqs = ap.submitAll(table.getPool(), table.getName(),
575           gets, null, results);
576       reqs.waitUntilDone();
577       // verify we got the right results back
578       for (Object r : results) {
579         Assert.assertTrue(((Result)r).isStale());
580         Assert.assertTrue(((Result)r).getExists());
581       }
582       Set<MultiServerCallable<Row>> set = ((AsyncRequestFutureImpl<?>)reqs).getCallsInProgress();
583       // verify we did cancel unneeded calls
584       Assert.assertTrue(!set.isEmpty());
585       for (MultiServerCallable<Row> m : set) {
586         Assert.assertTrue(m.isCancelled());
587       }
588     } finally {
589       SlowMeCopro.getCdl().get().countDown();
590       SlowMeCopro.sleepTime.set(0);
591       SlowMeCopro.slowDownNext.set(false);
592       SlowMeCopro.countOfNext.set(0);
593       for (int i = 0; i < 2; i++) {
594         byte[] b1 = Bytes.toBytes("testCancelOfMultiGet" + i);
595         Delete d = new Delete(b1);
596         table.delete(d);
597       }
598       closeRegion(hriSecondary);
599     }
600   }
601 
602   @Test
603   public void testScanWithReplicas() throws Exception {
604     //simple scan
605     runMultipleScansOfOneType(false, false);
606   }
607 
608   @Test
609   public void testSmallScanWithReplicas() throws Exception {
610     //small scan
611     runMultipleScansOfOneType(false, true);
612   }
613 
614   @Test
615   public void testReverseScanWithReplicas() throws Exception {
616     //reverse scan
617     runMultipleScansOfOneType(true, false);
618   }
619 
620   @Test
621   public void testCancelOfScan() throws Exception {
622     openRegion(hriSecondary);
623     int NUMROWS = 100;
624     try {
625       for (int i = 0; i < NUMROWS; i++) {
626         byte[] b1 = Bytes.toBytes("testUseRegionWithReplica" + i);
627         Put p = new Put(b1);
628         p.add(f, b1, b1);
629         table.put(p);
630       }
631       LOG.debug("PUT done");
632       int caching = 20;
633       byte[] start;
634       start = Bytes.toBytes("testUseRegionWithReplica" + 0);
635 
636       flushRegion(hriPrimary);
637       LOG.info("flush done");
638       Thread.sleep(1000 + REFRESH_PERIOD * 2);
639 
640       // now make some 'next' calls slow
641       SlowMeCopro.slowDownNext.set(true);
642       SlowMeCopro.countOfNext.set(0);
643       SlowMeCopro.sleepTime.set(5000);
644 
645       Scan scan = new Scan(start);
646       scan.setCaching(caching);
647       scan.setConsistency(Consistency.TIMELINE);
648       ResultScanner scanner = table.getScanner(scan);
649       Iterator<Result> iter = scanner.iterator();
650       iter.next();
651       Assert.assertTrue(((ClientScanner)scanner).isAnyRPCcancelled());
652       SlowMeCopro.slowDownNext.set(false);
653       SlowMeCopro.countOfNext.set(0);
654     } finally {
655       SlowMeCopro.cdl.get().countDown();
656       SlowMeCopro.sleepTime.set(0);
657       SlowMeCopro.slowDownNext.set(false);
658       SlowMeCopro.countOfNext.set(0);
659       for (int i = 0; i < NUMROWS; i++) {
660         byte[] b1 = Bytes.toBytes("testUseRegionWithReplica" + i);
661         Delete d = new Delete(b1);
662         table.delete(d);
663       }
664       closeRegion(hriSecondary);
665     }
666   }
667 
668   private void runMultipleScansOfOneType(boolean reversed, boolean small) throws Exception {
669     openRegion(hriSecondary);
670     int NUMROWS = 100;
671     int NUMCOLS = 10;
672     try {
673       for (int i = 0; i < NUMROWS; i++) {
674         byte[] b1 = Bytes.toBytes("testUseRegionWithReplica" + i);
675         for (int col = 0; col < NUMCOLS; col++) {
676           Put p = new Put(b1);
677           String qualifier = "qualifer" + col;
678           KeyValue kv = new KeyValue(b1, f, qualifier.getBytes());
679           p.add(kv);
680           table.put(p);
681         }
682       }
683       LOG.debug("PUT done");
684       int caching = 20;
685       long maxResultSize = Long.MAX_VALUE;
686 
687       byte[] start;
688       if (reversed) start = Bytes.toBytes("testUseRegionWithReplica" + (NUMROWS - 1));
689       else start = Bytes.toBytes("testUseRegionWithReplica" + 0);
690 
691       scanWithReplicas(reversed, small, Consistency.TIMELINE, caching, maxResultSize,
692         start, NUMROWS, NUMCOLS, false, false);
693 
694       // Even if we were to slow the server down, unless we ask for stale
695       // we won't get it
696       SlowMeCopro.sleepTime.set(5000);
697       scanWithReplicas(reversed, small, Consistency.STRONG, caching, maxResultSize, start, NUMROWS,
698         NUMCOLS, false, false);
699       SlowMeCopro.sleepTime.set(0);
700 
701       flushRegion(hriPrimary);
702       LOG.info("flush done");
703       Thread.sleep(1000 + REFRESH_PERIOD * 2);
704 
705       //Now set the flag to get a response even if stale
706       SlowMeCopro.sleepTime.set(5000);
707       scanWithReplicas(reversed, small, Consistency.TIMELINE, caching, maxResultSize,
708         start, NUMROWS, NUMCOLS, true, false);
709       SlowMeCopro.sleepTime.set(0);
710 
711       // now make some 'next' calls slow
712       SlowMeCopro.slowDownNext.set(true);
713       SlowMeCopro.countOfNext.set(0);
714       scanWithReplicas(reversed, small, Consistency.TIMELINE, caching, maxResultSize, start,
715         NUMROWS, NUMCOLS, true, true);
716       SlowMeCopro.slowDownNext.set(false);
717       SlowMeCopro.countOfNext.set(0);
718 
719       // Make sure we do not get stale data..
720       SlowMeCopro.sleepTime.set(5000);
721       scanWithReplicas(reversed, small, Consistency.STRONG, caching, maxResultSize,
722         start, NUMROWS, NUMCOLS, false, false);
723       SlowMeCopro.sleepTime.set(0);
724 
725       // While the next calls are slow, set maxResultSize to 1 so that some partial results will be
726       // returned from the server before the replica switch occurs.
727       maxResultSize = 1;
728       SlowMeCopro.slowDownNext.set(true);
729       SlowMeCopro.countOfNext.set(0);
730       scanWithReplicas(reversed, small, Consistency.TIMELINE, caching, maxResultSize, start,
731         NUMROWS, NUMCOLS, true, true);
732       maxResultSize = Long.MAX_VALUE;
733       SlowMeCopro.slowDownNext.set(false);
734       SlowMeCopro.countOfNext.set(0);
735     } finally {
736       SlowMeCopro.getCdl().get().countDown();
737       SlowMeCopro.sleepTime.set(0);
738       SlowMeCopro.slowDownNext.set(false);
739       SlowMeCopro.countOfNext.set(0);
740       for (int i = 0; i < NUMROWS; i++) {
741         byte[] b1 = Bytes.toBytes("testUseRegionWithReplica" + i);
742         Delete d = new Delete(b1);
743         table.delete(d);
744       }
745       closeRegion(hriSecondary);
746     }
747   }
748 
749   private void scanWithReplicas(boolean reversed, boolean small, Consistency consistency,
750       int caching, long maxResultSize, byte[] startRow, int numRows, int numCols,
751       boolean staleExpected, boolean slowNext)
752           throws Exception {
753     Scan scan = new Scan(startRow);
754     scan.setCaching(caching);
755     scan.setMaxResultSize(maxResultSize);
756     scan.setReversed(reversed);
757     scan.setSmall(small);
758     scan.setConsistency(consistency);
759     ResultScanner scanner = table.getScanner(scan);
760     Iterator<Result> iter = scanner.iterator();
761 
762     // Maps of row keys that we have seen so far
763     HashMap<String, Boolean> map = new HashMap<String, Boolean>();
764 
765     // Tracked metrics
766     int rowCount = 0;
767     int cellCount = 0;
768     int countOfStale = 0;
769 
770     while (iter.hasNext()) {
771       rowCount++;
772       Result r = iter.next();
773       String row = new String(r.getRow());
774 
775       if (map.containsKey(row)) {
776         throw new Exception("Unexpected scan result. Repeated row " + Bytes.toString(r.getRow()));
777       }
778 
779       map.put(row, true);
780 
781       for (Cell cell : r.rawCells()) {
782         cellCount++;
783       }
784 
785       if (!slowNext) Assert.assertTrue(r.isStale() == staleExpected);
786       if (r.isStale()) countOfStale++;
787     }
788     Assert.assertTrue("Count of rows " + rowCount + " num rows expected " + numRows,
789       rowCount == numRows);
790     Assert.assertTrue("Count of cells: " + cellCount + " cells expected: " + numRows * numCols,
791       cellCount == (numRows * numCols));
792 
793     if (slowNext) {
794       LOG.debug("Count of Stale " + countOfStale);
795       Assert.assertTrue(countOfStale > 1);
796 
797       // If the scan was configured in such a way that a full row was NOT retrieved before the
798       // replica switch occurred, then it is possible that all rows were stale
799       if (maxResultSize != Long.MAX_VALUE) {
800         Assert.assertTrue(countOfStale <= numRows);
801       } else {
802         Assert.assertTrue(countOfStale < numRows);
803       }
804     }
805   }
806 }