View Javadoc

1   /*
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.client;
20  
21  import static org.junit.Assert.assertEquals;
22  import static org.junit.Assert.assertFalse;
23  import static org.junit.Assert.assertNotNull;
24  import static org.junit.Assert.assertNull;
25  import static org.junit.Assert.assertTrue;
26  import static org.junit.Assert.fail;
27  
28  import java.io.IOException;
29  import java.lang.reflect.Field;
30  import java.lang.reflect.Modifier;
31  import java.net.SocketTimeoutException;
32  import java.util.ArrayList;
33  import java.util.HashMap;
34  import java.util.List;
35  import java.util.Map;
36  import java.util.Random;
37  import java.util.concurrent.ExecutorService;
38  import java.util.concurrent.SynchronousQueue;
39  import java.util.concurrent.ThreadPoolExecutor;
40  import java.util.concurrent.TimeUnit;
41  import java.util.concurrent.atomic.AtomicBoolean;
42  import java.util.concurrent.atomic.AtomicInteger;
43  import java.util.concurrent.atomic.AtomicLong;
44  import java.util.concurrent.atomic.AtomicReference;
45  
46  import org.apache.commons.logging.Log;
47  import org.apache.commons.logging.LogFactory;
48  import org.apache.hadoop.conf.Configuration;
49  import org.apache.hadoop.hbase.CategoryBasedTimeout;
50  import org.apache.hadoop.hbase.Cell;
51  import org.apache.hadoop.hbase.HBaseConfiguration;
52  import org.apache.hadoop.hbase.HBaseTestingUtility;
53  import org.apache.hadoop.hbase.HConstants;
54  import org.apache.hadoop.hbase.HRegionLocation;
55  import org.apache.hadoop.hbase.HTableDescriptor;
56  import org.apache.hadoop.hbase.RegionLocations;
57  import org.apache.hadoop.hbase.ServerName;
58  import org.apache.hadoop.hbase.TableName;
59  import org.apache.hadoop.hbase.Waiter;
60  import org.apache.hadoop.hbase.client.ConnectionManager.HConnectionImplementation;
61  import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
62  import org.apache.hadoop.hbase.coprocessor.ObserverContext;
63  import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
64  import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil;
65  import org.apache.hadoop.hbase.exceptions.DeserializationException;
66  import org.apache.hadoop.hbase.exceptions.RegionMovedException;
67  import org.apache.hadoop.hbase.filter.Filter;
68  import org.apache.hadoop.hbase.filter.FilterBase;
69  import org.apache.hadoop.hbase.ipc.RpcClient;
70  import org.apache.hadoop.hbase.master.HMaster;
71  import org.apache.hadoop.hbase.regionserver.HRegionServer;
72  import org.apache.hadoop.hbase.regionserver.Region;
73  import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
74  import org.apache.hadoop.hbase.testclassification.LargeTests;
75  import org.apache.hadoop.hbase.util.Bytes;
76  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
77  import org.apache.hadoop.hbase.util.JVMClusterUtil;
78  import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
79  import org.apache.hadoop.hbase.util.Threads;
80  import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
81  import org.junit.AfterClass;
82  import org.junit.Assert;
83  import org.junit.BeforeClass;
84  import org.junit.Ignore;
85  import org.junit.Rule;
86  import org.junit.Test;
87  import org.junit.experimental.categories.Category;
88  import org.junit.rules.TestRule;
89  
90  import com.google.common.collect.Lists;
91  
92  /**
93   * This class is for testing HBaseConnectionManager features
94   */
95  @Category({LargeTests.class})
96  public class TestHCM {
97    @Rule public final TestRule timeout = CategoryBasedTimeout.builder()
98        .withTimeout(this.getClass())
99        .withLookingForStuckThread(true)
100       .build();
101   private static final Log LOG = LogFactory.getLog(TestHCM.class);
102   private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
103   private static final TableName TABLE_NAME =
104       TableName.valueOf("test");
105   private static final TableName TABLE_NAME1 =
106       TableName.valueOf("test1");
107   private static final TableName TABLE_NAME2 =
108       TableName.valueOf("test2");
109   private static final TableName TABLE_NAME3 =
110       TableName.valueOf("test3");
111   private static final TableName TABLE_NAME4 =
112       TableName.valueOf("test4");
113   private static final byte[] FAM_NAM = Bytes.toBytes("f");
114   private static final byte[] ROW = Bytes.toBytes("bbb");
115   private static final byte[] ROW_X = Bytes.toBytes("xxx");
116   private static Random _randy = new Random();
117 
118 /**
119 * This copro sleeps 20 second. The first call it fails. The second time, it works.
120 */
121   public static class SleepAndFailFirstTime extends BaseRegionObserver {
122     static final AtomicLong ct = new AtomicLong(0);
123 
124     public SleepAndFailFirstTime() {
125     }
126 
127     @Override
128     public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> e,
129               final Get get, final List<Cell> results) throws IOException {
130       Threads.sleep(20000);
131       if (ct.incrementAndGet() == 1){
132         throw new IOException("first call I fail");
133       }
134     }
135   }
136 
137   @BeforeClass
138   public static void setUpBeforeClass() throws Exception {
139     TEST_UTIL.getConfiguration().setBoolean(HConstants.STATUS_PUBLISHED, true);
140     // Up the handlers; this test needs more than usual.
141     TEST_UTIL.getConfiguration().setInt(HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT, 10);
142     TEST_UTIL.startMiniCluster(2);
143   }
144 
145   @AfterClass public static void tearDownAfterClass() throws Exception {
146     TEST_UTIL.shutdownMiniCluster();
147   }
148 
149 
150   private static int getHConnectionManagerCacheSize(){
151     return HConnectionTestingUtility.getConnectionCount();
152   }
153 
154   @Test
155   public void testClusterConnection() throws IOException {
156     ThreadPoolExecutor otherPool = new ThreadPoolExecutor(1, 1,
157         5, TimeUnit.SECONDS,
158         new SynchronousQueue<Runnable>(),
159         Threads.newDaemonThreadFactory("test-hcm"));
160 
161     HConnection con1 = HConnectionManager.createConnection(TEST_UTIL.getConfiguration());
162     HConnection con2 = HConnectionManager.createConnection(TEST_UTIL.getConfiguration(), otherPool);
163     // make sure the internally created ExecutorService is the one passed
164     assertTrue(otherPool == ((HConnectionImplementation)con2).getCurrentBatchPool());
165 
166     String tableName = "testClusterConnection";
167     TEST_UTIL.createTable(tableName.getBytes(), FAM_NAM).close();
168     HTable t = (HTable)con1.getTable(tableName, otherPool);
169     // make sure passing a pool to the getTable does not trigger creation of an internal pool
170     assertNull("Internal Thread pool should be null", ((HConnectionImplementation)con1).getCurrentBatchPool());
171     // table should use the pool passed
172     assertTrue(otherPool == t.getPool());
173     t.close();
174 
175     t = (HTable)con2.getTable(tableName);
176     // table should use the connectin's internal pool
177     assertTrue(otherPool == t.getPool());
178     t.close();
179 
180     t = (HTable)con2.getTable(Bytes.toBytes(tableName));
181     // try other API too
182     assertTrue(otherPool == t.getPool());
183     t.close();
184 
185     t = (HTable)con2.getTable(TableName.valueOf(tableName));
186     // try other API too
187     assertTrue(otherPool == t.getPool());
188     t.close();
189 
190     t = (HTable)con1.getTable(tableName);
191     ExecutorService pool = ((HConnectionImplementation)con1).getCurrentBatchPool();
192     // make sure an internal pool was created
193     assertNotNull("An internal Thread pool should have been created", pool);
194     // and that the table is using it
195     assertTrue(t.getPool() == pool);
196     t.close();
197 
198     t = (HTable)con1.getTable(tableName);
199     // still using the *same* internal pool
200     assertTrue(t.getPool() == pool);
201     t.close();
202 
203     con1.close();
204     // if the pool was created on demand it should be closed upon connection close
205     assertTrue(pool.isShutdown());
206 
207     con2.close();
208     // if the pool is passed, it is not closed
209     assertFalse(otherPool.isShutdown());
210     otherPool.shutdownNow();
211   }
212 
213   /**
214    * Naive test to check that HConnection#getAdmin returns a properly constructed HBaseAdmin object
215    * @throws IOException Unable to construct admin
216    */
217   @Test
218   public void testAdminFactory() throws IOException {
219     Connection con1 = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration());
220     Admin admin = con1.getAdmin();
221     assertTrue(admin.getConnection() == con1);
222     assertTrue(admin.getConfiguration() == TEST_UTIL.getConfiguration());
223     con1.close();
224   }
225 
226   // Fails too often!  Needs work.  HBASE-12558
227   @Ignore @Test (expected = RegionServerStoppedException.class)
228   public void testClusterStatus() throws Exception {
229 
230     TableName tn =
231         TableName.valueOf("testClusterStatus");
232     byte[] cf = "cf".getBytes();
233     byte[] rk = "rk1".getBytes();
234 
235     JVMClusterUtil.RegionServerThread rs = TEST_UTIL.getHBaseCluster().startRegionServer();
236     rs.waitForServerOnline();
237     final ServerName sn = rs.getRegionServer().getServerName();
238 
239     HTable t = TEST_UTIL.createTable(tn, cf);
240     TEST_UTIL.waitTableAvailable(tn);
241 
242     while(TEST_UTIL.getHBaseCluster().getMaster().getAssignmentManager().
243         getRegionStates().isRegionsInTransition()){
244       Thread.sleep(1);
245     }
246     final HConnectionImplementation hci =  (HConnectionImplementation)t.getConnection();
247     while (t.getRegionLocation(rk).getPort() != sn.getPort()){
248       TEST_UTIL.getHBaseAdmin().move(t.getRegionLocation(rk).getRegionInfo().
249           getEncodedNameAsBytes(), Bytes.toBytes(sn.toString()));
250       while(TEST_UTIL.getHBaseCluster().getMaster().getAssignmentManager().
251           getRegionStates().isRegionsInTransition()){
252         Thread.sleep(1);
253       }
254       hci.clearRegionCache(tn);
255     }
256     Assert.assertNotNull(hci.clusterStatusListener);
257     TEST_UTIL.assertRegionOnServer(t.getRegionLocation(rk).getRegionInfo(), sn, 20000);
258 
259     Put p1 = new Put(rk);
260     p1.add(cf, "qual".getBytes(), "val".getBytes());
261     t.put(p1);
262 
263     rs.getRegionServer().abort("I'm dead");
264 
265     // We want the status to be updated. That's a least 10 second
266     TEST_UTIL.waitFor(40000, 1000, true, new Waiter.Predicate<Exception>() {
267       @Override
268       public boolean evaluate() throws Exception {
269         return TEST_UTIL.getHBaseCluster().getMaster().getServerManager().
270             getDeadServers().isDeadServer(sn);
271       }
272     });
273 
274     TEST_UTIL.waitFor(40000, 1000, true, new Waiter.Predicate<Exception>() {
275       @Override
276       public boolean evaluate() throws Exception {
277         return hci.clusterStatusListener.isDeadServer(sn);
278       }
279     });
280 
281     t.close();
282     hci.getClient(sn);  // will throw an exception: RegionServerStoppedException
283   }
284 
285   /**
286    * Test that we can handle connection close: it will trigger a retry, but the calls will
287    *  finish.
288    */
289   @Test
290   public void testConnectionCloseAllowsInterrupt() throws Exception {
291     testConnectionClose(true);
292   }
293 
294   @Test
295   public void testConnectionNotAllowsInterrupt() throws Exception {
296     testConnectionClose(false);
297   }
298 
299   /**
300    * Test that an operation can fail if we read the global operation timeout, even if the
301    * individual timeout is fine. We do that with:
302    * - client side: an operation timeout of 30 seconds
303    * - server side: we sleep 20 second at each attempt. The first work fails, the second one
304    * succeeds. But the client won't wait that much, because 20 + 20 > 30, so the client
305    * timeouted when the server answers.
306    */
307   @Test
308   public void testOperationTimeout() throws Exception {
309     HTableDescriptor hdt = TEST_UTIL.createTableDescriptor("HCM-testOperationTimeout");
310     hdt.addCoprocessor(SleepAndFailFirstTime.class.getName());
311     HTable table = TEST_UTIL.createTable(hdt, new byte[][]{FAM_NAM}, TEST_UTIL.getConfiguration());
312 
313     // Check that it works if the timeout is big enough
314     table.setOperationTimeout(120 * 1000);
315     table.get(new Get(FAM_NAM));
316 
317     // Resetting and retrying. Will fail this time, not enough time for the second try
318     SleepAndFailFirstTime.ct.set(0);
319     try {
320       table.setOperationTimeout(30 * 1000);
321       table.get(new Get(FAM_NAM));
322       Assert.fail("We expect an exception here");
323     } catch (SocketTimeoutException e) {
324       // The client has a CallTimeout class, but it's not shared.We're not very clean today,
325       //  in the general case you can expect the call to stop, but the exception may vary.
326       // In this test however, we're sure that it will be a socket timeout.
327       LOG.info("We received an exception, as expected ", e);
328     } catch (IOException e) {
329       Assert.fail("Wrong exception:" + e.getMessage());
330     } finally {
331       table.close();
332     }
333   }
334 
335 
336   private void testConnectionClose(boolean allowsInterrupt) throws Exception {
337     TableName tableName = TableName.valueOf("HCM-testConnectionClose" + allowsInterrupt);
338     TEST_UTIL.createTable(tableName, FAM_NAM).close();
339 
340     boolean previousBalance = TEST_UTIL.getHBaseAdmin().setBalancerRunning(false, true);
341 
342     Configuration c2 = new Configuration(TEST_UTIL.getConfiguration());
343     // We want to work on a separate connection.
344     c2.set(HConstants.HBASE_CLIENT_INSTANCE_ID, String.valueOf(-1));
345     c2.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 100); // retry a lot
346     c2.setInt(HConstants.HBASE_CLIENT_PAUSE, 1); // don't wait between retries.
347     c2.setInt(RpcClient.FAILED_SERVER_EXPIRY_KEY, 0); // Server do not really expire
348     c2.setBoolean(RpcClient.SPECIFIC_WRITE_THREAD, allowsInterrupt);
349 
350     final HTable table = new HTable(c2, tableName);
351 
352     Put put = new Put(ROW);
353     put.add(FAM_NAM, ROW, ROW);
354     table.put(put);
355 
356     // 4 steps: ready=0; doGets=1; mustStop=2; stopped=3
357     final AtomicInteger step = new AtomicInteger(0);
358 
359     final AtomicReference<Throwable> failed = new AtomicReference<Throwable>(null);
360     Thread t = new Thread("testConnectionCloseThread") {
361       @Override
362       public void run() {
363         int done = 0;
364         try {
365           step.set(1);
366           while (step.get() == 1) {
367             Get get = new Get(ROW);
368             table.get(get);
369             done++;
370             if (done % 100 == 0)
371               LOG.info("done=" + done);
372           }
373         } catch (Throwable t) {
374           failed.set(t);
375           LOG.error(t);
376         }
377         step.set(3);
378       }
379     };
380     t.start();
381     TEST_UTIL.waitFor(20000, new Waiter.Predicate<Exception>() {
382       @Override
383       public boolean evaluate() throws Exception {
384         return step.get() == 1;
385       }
386     });
387 
388     ServerName sn = table.getRegionLocation(ROW).getServerName();
389     ConnectionManager.HConnectionImplementation conn =
390         (ConnectionManager.HConnectionImplementation) table.getConnection();
391     RpcClient rpcClient = conn.getRpcClient();
392 
393     LOG.info("Going to cancel connections. connection=" + conn.toString() + ", sn=" + sn);
394     for (int i = 0; i < 5000; i++) {
395       rpcClient.cancelConnections(sn);
396       Thread.sleep(5);
397     }
398 
399     step.compareAndSet(1, 2);
400     // The test may fail here if the thread doing the gets is stuck. The way to find
401     //  out what's happening is to look for the thread named 'testConnectionCloseThread'
402     TEST_UTIL.waitFor(40000, new Waiter.Predicate<Exception>() {
403       @Override
404       public boolean evaluate() throws Exception {
405         return step.get() == 3;
406       }
407     });
408     table.close();
409     Assert.assertTrue("Unexpected exception is " + failed.get(), failed.get() == null);
410     TEST_UTIL.getHBaseAdmin().setBalancerRunning(previousBalance, true);
411   }
412 
413   /**
414    * Test that connection can become idle without breaking everything.
415    */
416   @Test
417   public void testConnectionIdle() throws Exception {
418     TableName tableName = TableName.valueOf("HCM-testConnectionIdle");
419     TEST_UTIL.createTable(tableName, FAM_NAM).close();
420     int idleTime =  20000;
421     boolean previousBalance = TEST_UTIL.getHBaseAdmin().setBalancerRunning(false, true);
422 
423     Configuration c2 = new Configuration(TEST_UTIL.getConfiguration());
424     // We want to work on a separate connection.
425     c2.set(HConstants.HBASE_CLIENT_INSTANCE_ID, String.valueOf(-1));
426     c2.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1); // Don't retry: retry = test failed
427     c2.setInt(RpcClient.IDLE_TIME, idleTime);
428 
429     final Table table = new HTable(c2, tableName);
430 
431     Put put = new Put(ROW);
432     put.add(FAM_NAM, ROW, ROW);
433     table.put(put);
434 
435     ManualEnvironmentEdge mee = new ManualEnvironmentEdge();
436     mee.setValue(System.currentTimeMillis());
437     EnvironmentEdgeManager.injectEdge(mee);
438     LOG.info("first get");
439     table.get(new Get(ROW));
440 
441     LOG.info("first get - changing the time & sleeping");
442     mee.incValue(idleTime + 1000);
443     Thread.sleep(1500); // we need to wait a little for the connection to be seen as idle.
444                         // 1500 = sleep time in RpcClient#waitForWork + a margin
445 
446     LOG.info("second get - connection has been marked idle in the middle");
447     // To check that the connection actually became idle would need to read some private
448     //  fields of RpcClient.
449     table.get(new Get(ROW));
450     mee.incValue(idleTime + 1000);
451 
452     LOG.info("third get - connection is idle, but the reader doesn't know yet");
453     // We're testing here a special case:
454     //  time limit reached BUT connection not yet reclaimed AND a new call.
455     //  in this situation, we don't close the connection, instead we use it immediately.
456     // If we're very unlucky we can have a race condition in the test: the connection is already
457     //  under closing when we do the get, so we have an exception, and we don't retry as the
458     //  retry number is 1. The probability is very very low, and seems acceptable for now. It's
459     //  a test issue only.
460     table.get(new Get(ROW));
461 
462     LOG.info("we're done - time will change back");
463 
464     table.close();
465     EnvironmentEdgeManager.reset();
466     TEST_UTIL.getHBaseAdmin().setBalancerRunning(previousBalance, true);
467   }
468 
469     /**
470      * Test that the connection to the dead server is cut immediately when we receive the
471      *  notification.
472      * @throws Exception
473      */
474   @Test
475   public void testConnectionCut() throws Exception {
476 
477     TableName tableName = TableName.valueOf("HCM-testConnectionCut");
478 
479     TEST_UTIL.createTable(tableName, FAM_NAM).close();
480     boolean previousBalance = TEST_UTIL.getHBaseAdmin().setBalancerRunning(false, true);
481 
482     Configuration c2 = new Configuration(TEST_UTIL.getConfiguration());
483     // We want to work on a separate connection.
484     c2.set(HConstants.HBASE_CLIENT_INSTANCE_ID, String.valueOf(-1));
485     c2.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
486     c2.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 30 * 1000);
487 
488     HTable table = new HTable(c2, tableName);
489 
490     Put p = new Put(FAM_NAM);
491     p.add(FAM_NAM, FAM_NAM, FAM_NAM);
492     table.put(p);
493 
494     final HConnectionImplementation hci =  (HConnectionImplementation)table.getConnection();
495     final HRegionLocation loc = table.getRegionLocation(FAM_NAM);
496 
497     Get get = new Get(FAM_NAM);
498     Assert.assertNotNull(table.get(get));
499 
500     get = new Get(FAM_NAM);
501     get.setFilter(new BlockingFilter());
502 
503     // This thread will mark the server as dead while we're waiting during a get.
504     Thread t = new Thread() {
505       @Override
506       public void run() {
507         synchronized (syncBlockingFilter) {
508           try {
509             syncBlockingFilter.wait();
510           } catch (InterruptedException e) {
511             throw new RuntimeException(e);
512           }
513         }
514         hci.clusterStatusListener.deadServerHandler.newDead(loc.getServerName());
515       }
516     };
517 
518     t.start();
519     try {
520       table.get(get);
521       Assert.fail();
522     } catch (IOException expected) {
523       LOG.debug("Received: " + expected);
524       Assert.assertFalse(expected instanceof SocketTimeoutException);
525       Assert.assertFalse(syncBlockingFilter.get());
526     } finally {
527       syncBlockingFilter.set(true);
528       t.join();
529       HConnectionManager.getConnection(c2).close();
530       TEST_UTIL.getHBaseAdmin().setBalancerRunning(previousBalance, true);
531     }
532 
533     table.close();
534   }
535 
536   protected static final AtomicBoolean syncBlockingFilter = new AtomicBoolean(false);
537 
538   public static class BlockingFilter extends FilterBase {
539     @Override
540     public boolean filterRowKey(byte[] buffer, int offset, int length) throws IOException {
541       int i = 0;
542       while (i++ < 1000 && !syncBlockingFilter.get()) {
543         synchronized (syncBlockingFilter) {
544           syncBlockingFilter.notifyAll();
545         }
546         Threads.sleep(100);
547       }
548       syncBlockingFilter.set(true);
549       return false;
550     }
551     @Override
552     public ReturnCode filterKeyValue(Cell ignored) throws IOException {
553       return ReturnCode.INCLUDE;
554     }
555 
556     public static Filter parseFrom(final byte [] pbBytes) throws DeserializationException{
557       return new BlockingFilter();
558     }
559   }
560 
561   @Test (timeout=120000)
562   public void abortingHConnectionRemovesItselfFromHCM() throws Exception {
563     // Save off current HConnections
564     Map<HConnectionKey, HConnectionImplementation> oldHBaseInstances =
565         new HashMap<HConnectionKey, HConnectionImplementation>();
566     oldHBaseInstances.putAll(ConnectionManager.CONNECTION_INSTANCES);
567 
568     ConnectionManager.CONNECTION_INSTANCES.clear();
569 
570     try {
571       HConnection connection = HConnectionManager.getConnection(TEST_UTIL.getConfiguration());
572       connection.abort("test abortingHConnectionRemovesItselfFromHCM", new Exception(
573           "test abortingHConnectionRemovesItselfFromHCM"));
574       Assert.assertNotSame(connection,
575         HConnectionManager.getConnection(TEST_UTIL.getConfiguration()));
576     } finally {
577       // Put original HConnections back
578       ConnectionManager.CONNECTION_INSTANCES.clear();
579       ConnectionManager.CONNECTION_INSTANCES.putAll(oldHBaseInstances);
580     }
581   }
582 
583   /**
584    * Test that when we delete a location using the first row of a region
585    * that we really delete it.
586    * @throws Exception
587    */
588   @Test
589   public void testRegionCaching() throws Exception{
590     TEST_UTIL.createMultiRegionTable(TABLE_NAME, FAM_NAM).close();
591     Configuration conf =  new Configuration(TEST_UTIL.getConfiguration());
592     conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
593     HTable table = new HTable(conf, TABLE_NAME);
594 
595     TEST_UTIL.waitUntilAllRegionsAssigned(table.getName());
596     Put put = new Put(ROW);
597     put.add(FAM_NAM, ROW, ROW);
598     table.put(put);
599     ConnectionManager.HConnectionImplementation conn =
600       (ConnectionManager.HConnectionImplementation)table.getConnection();
601 
602     assertNotNull(conn.getCachedLocation(TABLE_NAME, ROW));
603 
604     final int nextPort = conn.getCachedLocation(TABLE_NAME, ROW).getRegionLocation().getPort() + 1;
605     HRegionLocation loc = conn.getCachedLocation(TABLE_NAME, ROW).getRegionLocation();
606     conn.updateCachedLocation(loc.getRegionInfo(), loc.getServerName(),
607         ServerName.valueOf("127.0.0.1", nextPort,
608         HConstants.LATEST_TIMESTAMP), HConstants.LATEST_TIMESTAMP);
609     Assert.assertEquals(conn.getCachedLocation(TABLE_NAME, ROW)
610       .getRegionLocation().getPort(), nextPort);
611 
612     conn.clearRegionCache(TABLE_NAME, ROW.clone());
613     RegionLocations rl = conn.getCachedLocation(TABLE_NAME, ROW);
614     assertNull("What is this location?? " + rl, rl);
615 
616     // We're now going to move the region and check that it works for the client
617     // First a new put to add the location in the cache
618     conn.clearRegionCache(TABLE_NAME);
619     Assert.assertEquals(0, conn.getNumberOfCachedRegionLocations(TABLE_NAME));
620     Put put2 = new Put(ROW);
621     put2.add(FAM_NAM, ROW, ROW);
622     table.put(put2);
623     assertNotNull(conn.getCachedLocation(TABLE_NAME, ROW));
624     assertNotNull(conn.getCachedLocation(TableName.valueOf(TABLE_NAME.getName()), ROW.clone()));
625 
626     TEST_UTIL.getHBaseAdmin().setBalancerRunning(false, false);
627     HMaster master = TEST_UTIL.getMiniHBaseCluster().getMaster();
628 
629     // We can wait for all regions to be online, that makes log reading easier when debugging
630     while (master.getAssignmentManager().getRegionStates().isRegionsInTransition()) {
631       Thread.sleep(1);
632     }
633 
634     // Now moving the region to the second server
635     HRegionLocation toMove = conn.getCachedLocation(TABLE_NAME, ROW).getRegionLocation();
636     byte[] regionName = toMove.getRegionInfo().getRegionName();
637     byte[] encodedRegionNameBytes = toMove.getRegionInfo().getEncodedNameAsBytes();
638 
639     // Choose the other server.
640     int curServerId = TEST_UTIL.getHBaseCluster().getServerWith(regionName);
641     int destServerId = (curServerId == 0 ? 1 : 0);
642 
643     HRegionServer curServer = TEST_UTIL.getHBaseCluster().getRegionServer(curServerId);
644     HRegionServer destServer = TEST_UTIL.getHBaseCluster().getRegionServer(destServerId);
645 
646     ServerName destServerName = destServer.getServerName();
647 
648     // Check that we are in the expected state
649     Assert.assertTrue(curServer != destServer);
650     Assert.assertFalse(curServer.getServerName().equals(destServer.getServerName()));
651     Assert.assertFalse( toMove.getPort() == destServerName.getPort());
652     Assert.assertNotNull(curServer.getOnlineRegion(regionName));
653     Assert.assertNull(destServer.getOnlineRegion(regionName));
654     Assert.assertFalse(TEST_UTIL.getMiniHBaseCluster().getMaster().
655         getAssignmentManager().getRegionStates().isRegionsInTransition());
656 
657     // Moving. It's possible that we don't have all the regions online at this point, so
658     //  the test must depends only on the region we're looking at.
659     LOG.info("Move starting region="+toMove.getRegionInfo().getRegionNameAsString());
660     TEST_UTIL.getHBaseAdmin().move(
661       toMove.getRegionInfo().getEncodedNameAsBytes(),
662       destServerName.getServerName().getBytes()
663     );
664 
665     while (destServer.getOnlineRegion(regionName) == null ||
666         destServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes) ||
667         curServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes) ||
668         master.getAssignmentManager().getRegionStates().isRegionsInTransition()) {
669       // wait for the move to be finished
670       Thread.sleep(1);
671     }
672 
673     LOG.info("Move finished for region="+toMove.getRegionInfo().getRegionNameAsString());
674 
675     // Check our new state.
676     Assert.assertNull(curServer.getOnlineRegion(regionName));
677     Assert.assertNotNull(destServer.getOnlineRegion(regionName));
678     Assert.assertFalse(destServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes));
679     Assert.assertFalse(curServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes));
680 
681 
682     // Cache was NOT updated and points to the wrong server
683     Assert.assertFalse(
684         conn.getCachedLocation(TABLE_NAME, ROW).getRegionLocation()
685           .getPort() == destServerName.getPort());
686 
687     // This part relies on a number of tries equals to 1.
688     // We do a put and expect the cache to be updated, even if we don't retry
689     LOG.info("Put starting");
690     Put put3 = new Put(ROW);
691     put3.add(FAM_NAM, ROW, ROW);
692     try {
693       table.put(put3);
694       Assert.fail("Unreachable point");
695     } catch (RetriesExhaustedWithDetailsException e){
696       LOG.info("Put done, exception caught: " + e.getClass());
697       Assert.assertEquals(1, e.getNumExceptions());
698       Assert.assertEquals(1, e.getCauses().size());
699       Assert.assertArrayEquals(e.getRow(0).getRow(), ROW);
700 
701       // Check that we unserialized the exception as expected
702       Throwable cause = ClientExceptionsUtil.findException(e.getCause(0));
703       Assert.assertNotNull(cause);
704       Assert.assertTrue(cause instanceof RegionMovedException);
705     }
706     Assert.assertNotNull("Cached connection is null", conn.getCachedLocation(TABLE_NAME, ROW));
707     Assert.assertEquals(
708         "Previous server was " + curServer.getServerName().getHostAndPort(),
709         destServerName.getPort(),
710         conn.getCachedLocation(TABLE_NAME, ROW).getRegionLocation().getPort());
711 
712     Assert.assertFalse(destServer.getRegionsInTransitionInRS()
713       .containsKey(encodedRegionNameBytes));
714     Assert.assertFalse(curServer.getRegionsInTransitionInRS()
715       .containsKey(encodedRegionNameBytes));
716 
717     // We move it back to do another test with a scan
718     LOG.info("Move starting region=" + toMove.getRegionInfo().getRegionNameAsString());
719     TEST_UTIL.getHBaseAdmin().move(
720       toMove.getRegionInfo().getEncodedNameAsBytes(),
721       curServer.getServerName().getServerName().getBytes()
722     );
723 
724     while (curServer.getOnlineRegion(regionName) == null ||
725         destServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes) ||
726         curServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes) ||
727         master.getAssignmentManager().getRegionStates().isRegionsInTransition()) {
728       // wait for the move to be finished
729       Thread.sleep(1);
730     }
731 
732     // Check our new state.
733     Assert.assertNotNull(curServer.getOnlineRegion(regionName));
734     Assert.assertNull(destServer.getOnlineRegion(regionName));
735     LOG.info("Move finished for region=" + toMove.getRegionInfo().getRegionNameAsString());
736 
737     // Cache was NOT updated and points to the wrong server
738     Assert.assertFalse(conn.getCachedLocation(TABLE_NAME, ROW).getRegionLocation().getPort() ==
739       curServer.getServerName().getPort());
740 
741     Scan sc = new Scan();
742     sc.setStopRow(ROW);
743     sc.setStartRow(ROW);
744 
745     // The scanner takes the max retries from the connection configuration, not the table as
746     // the put.
747     TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
748 
749     try {
750       ResultScanner rs = table.getScanner(sc);
751       while (rs.next() != null) {
752       }
753       Assert.fail("Unreachable point");
754     } catch (RetriesExhaustedException e) {
755       LOG.info("Scan done, expected exception caught: " + e.getClass());
756     }
757 
758     // Cache is updated with the right value.
759     Assert.assertNotNull(conn.getCachedLocation(TABLE_NAME, ROW));
760     Assert.assertEquals(
761       "Previous server was "+destServer.getServerName().getHostAndPort(),
762       curServer.getServerName().getPort(),
763       conn.getCachedLocation(TABLE_NAME, ROW).getRegionLocation().getPort());
764 
765     TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
766         HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
767     table.close();
768   }
769 
770   /**
771    * Test that Connection or Pool are not closed when managed externally
772    * @throws Exception
773    */
774   @Test
775   public void testConnectionManagement() throws Exception{
776     Table table0 = TEST_UTIL.createTable(TABLE_NAME1, FAM_NAM);
777     Connection conn = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration());
778     HTable table = (HTable) conn.getTable(TABLE_NAME1);
779     table.close();
780     assertFalse(conn.isClosed());
781     assertFalse(table.getPool().isShutdown());
782     table = (HTable) conn.getTable(TABLE_NAME1);
783     table.close();
784     assertFalse(table.getPool().isShutdown());
785     conn.close();
786     assertTrue(table.getPool().isShutdown());
787     table0.close();
788   }
789 
790   /**
791    * Test that stale cache updates don't override newer cached values.
792    */
793   @Test
794   public void testCacheSeqNums() throws Exception{
795     HTable table = TEST_UTIL.createMultiRegionTable(TABLE_NAME2, FAM_NAM);
796     Put put = new Put(ROW);
797     put.add(FAM_NAM, ROW, ROW);
798     table.put(put);
799     ConnectionManager.HConnectionImplementation conn =
800       (ConnectionManager.HConnectionImplementation)table.getConnection();
801 
802     HRegionLocation location = conn.getCachedLocation(TABLE_NAME2, ROW).getRegionLocation();
803     assertNotNull(location);
804 
805     ServerName anySource = ServerName.valueOf(location.getHostname(), location.getPort() - 1, 0L);
806 
807     // Same server as already in cache reporting - overwrites any value despite seqNum.
808     int nextPort = location.getPort() + 1;
809     conn.updateCachedLocation(location.getRegionInfo(), location.getServerName(),
810         ServerName.valueOf("127.0.0.1", nextPort, 0), location.getSeqNum() - 1);
811     location = conn.getCachedLocation(TABLE_NAME2, ROW).getRegionLocation();
812     Assert.assertEquals(nextPort, location.getPort());
813 
814     // No source specified - same.
815     nextPort = location.getPort() + 1;
816     conn.updateCachedLocation(location.getRegionInfo(), location.getServerName(),
817         ServerName.valueOf("127.0.0.1", nextPort, 0), location.getSeqNum() - 1);
818     location = conn.getCachedLocation(TABLE_NAME2, ROW).getRegionLocation();
819     Assert.assertEquals(nextPort, location.getPort());
820 
821     // Higher seqNum - overwrites lower seqNum.
822     nextPort = location.getPort() + 1;
823     conn.updateCachedLocation(location.getRegionInfo(), anySource,
824         ServerName.valueOf("127.0.0.1", nextPort, 0), location.getSeqNum() + 1);
825     location = conn.getCachedLocation(TABLE_NAME2, ROW).getRegionLocation();
826     Assert.assertEquals(nextPort, location.getPort());
827 
828     // Lower seqNum - does not overwrite higher seqNum.
829     nextPort = location.getPort() + 1;
830     conn.updateCachedLocation(location.getRegionInfo(), anySource,
831         ServerName.valueOf("127.0.0.1", nextPort, 0), location.getSeqNum() - 1);
832     location = conn.getCachedLocation(TABLE_NAME2, ROW).getRegionLocation();
833     Assert.assertEquals(nextPort - 1, location.getPort());
834     table.close();
835   }
836 
837   /**
838    * Make sure that {@link Configuration} instances that are essentially the
839    * same map to the same {@link HConnection} instance.
840    */
841   @Test
842   public void testConnectionSameness() throws Exception {
843     Connection previousConnection = null;
844     for (int i = 0; i < 2; i++) {
845       // set random key to differentiate the connection from previous ones
846       Configuration configuration = TEST_UTIL.getConfiguration();
847       configuration.set("some_key", String.valueOf(_randy.nextInt()));
848       LOG.info("The hash code of the current configuration is: "
849           + configuration.hashCode());
850       Connection currentConnection = HConnectionManager
851           .getConnection(configuration);
852       if (previousConnection != null) {
853         assertTrue(
854             "Did not get the same connection even though its key didn't change",
855             previousConnection == currentConnection);
856       }
857       previousConnection = currentConnection;
858       // change the configuration, so that it is no longer reachable from the
859       // client's perspective. However, since its part of the LRU doubly linked
860       // list, it will eventually get thrown out, at which time it should also
861       // close the corresponding {@link HConnection}.
862       configuration.set("other_key", String.valueOf(_randy.nextInt()));
863     }
864   }
865 
866   /**
867    * Makes sure that there is no leaking of
868    * {@link ConnectionManager.HConnectionImplementation} in the {@link HConnectionManager}
869    * class.
870    * @deprecated Tests deprecated functionality.  Remove in 1.0.
871    */
872   @Deprecated
873   @Test
874   public void testConnectionUniqueness() throws Exception {
875     int zkmaxconnections = TEST_UTIL.getConfiguration().
876       getInt(HConstants.ZOOKEEPER_MAX_CLIENT_CNXNS,
877           HConstants.DEFAULT_ZOOKEPER_MAX_CLIENT_CNXNS);
878     // Test up to a max that is < the maximum number of zk connections.  If we
879     // go above zk connections, we just fall into cycle where we are failing
880     // to set up a session and test runs for a long time.
881     int maxConnections = Math.min(zkmaxconnections - 1, 20);
882     List<HConnection> connections = new ArrayList<HConnection>(maxConnections);
883     Connection previousConnection = null;
884     try {
885       for (int i = 0; i < maxConnections; i++) {
886         // set random key to differentiate the connection from previous ones
887         Configuration configuration = new Configuration(TEST_UTIL.getConfiguration());
888         configuration.set("some_key", String.valueOf(_randy.nextInt()));
889         configuration.set(HConstants.HBASE_CLIENT_INSTANCE_ID,
890             String.valueOf(_randy.nextInt()));
891         LOG.info("The hash code of the current configuration is: "
892             + configuration.hashCode());
893         HConnection currentConnection =
894           HConnectionManager.getConnection(configuration);
895         if (previousConnection != null) {
896           assertTrue("Got the same connection even though its key changed!",
897               previousConnection != currentConnection);
898         }
899         // change the configuration, so that it is no longer reachable from the
900         // client's perspective. However, since its part of the LRU doubly linked
901         // list, it will eventually get thrown out, at which time it should also
902         // close the corresponding {@link HConnection}.
903         configuration.set("other_key", String.valueOf(_randy.nextInt()));
904 
905         previousConnection = currentConnection;
906         LOG.info("The current HConnectionManager#HBASE_INSTANCES cache size is: "
907             + getHConnectionManagerCacheSize());
908         Thread.sleep(50);
909         connections.add(currentConnection);
910       }
911     } finally {
912       for (Connection c: connections) {
913         // Clean up connections made so we don't interfere w/ subsequent tests.
914         HConnectionManager.deleteConnection(c.getConfiguration());
915       }
916     }
917   }
918 
919   @Test
920   public void testClosing() throws Exception {
921     Configuration configuration =
922       new Configuration(TEST_UTIL.getConfiguration());
923     configuration.set(HConstants.HBASE_CLIENT_INSTANCE_ID,
924         String.valueOf(_randy.nextInt()));
925 
926     Connection c1 = ConnectionFactory.createConnection(configuration);
927     // We create two connections with the same key.
928     Connection c2 = ConnectionFactory.createConnection(configuration);
929 
930     Connection c3 = HConnectionManager.getConnection(configuration);
931     Connection c4 = HConnectionManager.getConnection(configuration);
932     assertTrue(c3 == c4);
933 
934     c1.close();
935     assertTrue(c1.isClosed());
936     assertFalse(c2.isClosed());
937     assertFalse(c3.isClosed());
938 
939     c3.close();
940     // still a reference left
941     assertFalse(c3.isClosed());
942     c3.close();
943     assertTrue(c3.isClosed());
944     // c3 was removed from the cache
945     Connection c5 = HConnectionManager.getConnection(configuration);
946     assertTrue(c5 != c3);
947 
948     assertFalse(c2.isClosed());
949     c2.close();
950     assertTrue(c2.isClosed());
951     c5.close();
952     assertTrue(c5.isClosed());
953   }
954 
955   /**
956    * Trivial test to verify that nobody messes with
957    * {@link HConnectionManager#createConnection(Configuration)}
958    */
959   @Test
960   public void testCreateConnection() throws Exception {
961     Configuration configuration = TEST_UTIL.getConfiguration();
962     Connection c1 = ConnectionFactory.createConnection(configuration);
963     Connection c2 = ConnectionFactory.createConnection(configuration);
964     // created from the same configuration, yet they are different
965     assertTrue(c1 != c2);
966     assertTrue(c1.getConfiguration() == c2.getConfiguration());
967     // make sure these were not cached
968     Connection c3 = HConnectionManager.getConnection(configuration);
969     assertTrue(c1 != c3);
970     assertTrue(c2 != c3);
971   }
972 
973 
974   /**
975    * This test checks that one can connect to the cluster with only the
976    *  ZooKeeper quorum set. Other stuff like master address will be read
977    *  from ZK by the client.
978    */
979   @Test
980   public void testConnection() throws Exception{
981     // We create an empty config and add the ZK address.
982     Configuration c = new Configuration();
983     c.set(HConstants.ZOOKEEPER_QUORUM,
984       TEST_UTIL.getConfiguration().get(HConstants.ZOOKEEPER_QUORUM));
985     c.set(HConstants.ZOOKEEPER_CLIENT_PORT ,
986       TEST_UTIL.getConfiguration().get(HConstants.ZOOKEEPER_CLIENT_PORT));
987 
988     // This should be enough to connect
989     HConnection conn = HConnectionManager.getConnection(c);
990     assertTrue( conn.isMasterRunning() );
991     conn.close();
992   }
993 
994   private int setNumTries(HConnectionImplementation hci, int newVal) throws Exception {
995     Field numTries = hci.getClass().getDeclaredField("numTries");
996     numTries.setAccessible(true);
997     Field modifiersField = Field.class.getDeclaredField("modifiers");
998     modifiersField.setAccessible(true);
999     modifiersField.setInt(numTries, numTries.getModifiers() & ~Modifier.FINAL);
1000     final int prevNumRetriesVal = (Integer)numTries.get(hci);
1001     numTries.set(hci, newVal);
1002 
1003     return prevNumRetriesVal;
1004   }
1005 
1006   @Test
1007   public void testMulti() throws Exception {
1008     HTable table = TEST_UTIL.createMultiRegionTable(TABLE_NAME3, FAM_NAM);
1009      try {
1010        ConnectionManager.HConnectionImplementation conn =
1011            ( ConnectionManager.HConnectionImplementation)table.getConnection();
1012 
1013        // We're now going to move the region and check that it works for the client
1014        // First a new put to add the location in the cache
1015        conn.clearRegionCache(TABLE_NAME3);
1016        Assert.assertEquals(0, conn.getNumberOfCachedRegionLocations(TABLE_NAME3));
1017 
1018        TEST_UTIL.getHBaseAdmin().setBalancerRunning(false, false);
1019        HMaster master = TEST_UTIL.getMiniHBaseCluster().getMaster();
1020 
1021        // We can wait for all regions to be online, that makes log reading easier when debugging
1022        while (master.getAssignmentManager().getRegionStates().isRegionsInTransition()) {
1023          Thread.sleep(1);
1024        }
1025 
1026        Put put = new Put(ROW_X);
1027        put.add(FAM_NAM, ROW_X, ROW_X);
1028        table.put(put);
1029 
1030        // Now moving the region to the second server
1031        HRegionLocation toMove = conn.getCachedLocation(TABLE_NAME3, ROW_X).getRegionLocation();
1032        byte[] regionName = toMove.getRegionInfo().getRegionName();
1033        byte[] encodedRegionNameBytes = toMove.getRegionInfo().getEncodedNameAsBytes();
1034 
1035        // Choose the other server.
1036        int curServerId = TEST_UTIL.getHBaseCluster().getServerWith(regionName);
1037        int destServerId = (curServerId == 0 ? 1 : 0);
1038 
1039        HRegionServer curServer = TEST_UTIL.getHBaseCluster().getRegionServer(curServerId);
1040        HRegionServer destServer = TEST_UTIL.getHBaseCluster().getRegionServer(destServerId);
1041 
1042        ServerName destServerName = destServer.getServerName();
1043 
1044        //find another row in the cur server that is less than ROW_X
1045        List<Region> regions = curServer.getOnlineRegions(TABLE_NAME3);
1046        byte[] otherRow = null;
1047        for (Region region : regions) {
1048          if (!region.getRegionInfo().getEncodedName().equals(toMove.getRegionInfo().getEncodedName())
1049              && Bytes.BYTES_COMPARATOR.compare(region.getRegionInfo().getStartKey(), ROW_X) < 0) {
1050            otherRow = region.getRegionInfo().getStartKey();
1051            break;
1052          }
1053        }
1054        assertNotNull(otherRow);
1055        // If empty row, set it to first row.-f
1056        if (otherRow.length <= 0) otherRow = Bytes.toBytes("aaa");
1057        Put put2 = new Put(otherRow);
1058        put2.add(FAM_NAM, otherRow, otherRow);
1059        table.put(put2); //cache put2's location
1060 
1061        // Check that we are in the expected state
1062        Assert.assertTrue(curServer != destServer);
1063        Assert.assertNotEquals(curServer.getServerName(), destServer.getServerName());
1064        Assert.assertNotEquals(toMove.getPort(), destServerName.getPort());
1065        Assert.assertNotNull(curServer.getOnlineRegion(regionName));
1066        Assert.assertNull(destServer.getOnlineRegion(regionName));
1067        Assert.assertFalse(TEST_UTIL.getMiniHBaseCluster().getMaster().
1068            getAssignmentManager().getRegionStates().isRegionsInTransition());
1069 
1070        // Moving. It's possible that we don't have all the regions online at this point, so
1071        //  the test must depends only on the region we're looking at.
1072        LOG.info("Move starting region="+toMove.getRegionInfo().getRegionNameAsString());
1073        TEST_UTIL.getHBaseAdmin().move(
1074            toMove.getRegionInfo().getEncodedNameAsBytes(),
1075            destServerName.getServerName().getBytes()
1076            );
1077 
1078        while (destServer.getOnlineRegion(regionName) == null ||
1079            destServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes) ||
1080            curServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes) ||
1081            master.getAssignmentManager().getRegionStates().isRegionsInTransition()) {
1082          // wait for the move to be finished
1083          Thread.sleep(1);
1084         }
1085 
1086        LOG.info("Move finished for region="+toMove.getRegionInfo().getRegionNameAsString());
1087 
1088        // Check our new state.
1089        Assert.assertNull(curServer.getOnlineRegion(regionName));
1090        Assert.assertNotNull(destServer.getOnlineRegion(regionName));
1091        Assert.assertFalse(destServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes));
1092        Assert.assertFalse(curServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes));
1093 
1094 
1095        // Cache was NOT updated and points to the wrong server
1096        Assert.assertFalse(
1097            conn.getCachedLocation(TABLE_NAME3, ROW_X).getRegionLocation()
1098             .getPort() == destServerName.getPort());
1099 
1100        // Hijack the number of retry to fail after 2 tries
1101        final int prevNumRetriesVal = setNumTries(conn, 2);
1102 
1103        Put put3 = new Put(ROW_X);
1104        put3.add(FAM_NAM, ROW_X, ROW_X);
1105        Put put4 = new Put(otherRow);
1106        put4.add(FAM_NAM, otherRow, otherRow);
1107 
1108        // do multi
1109        table.batch(Lists.newArrayList(put4, put3)); // first should be a valid row,
1110        // second we get RegionMovedException.
1111 
1112        setNumTries(conn, prevNumRetriesVal);
1113      } finally {
1114        table.close();
1115      }
1116   }
1117 
1118   @Ignore ("Test presumes RETRY_BACKOFF will never change; it has") @Test
1119   public void testErrorBackoffTimeCalculation() throws Exception {
1120     // TODO: This test would seem to presume hardcoded RETRY_BACKOFF which it should not.
1121     final long ANY_PAUSE = 100;
1122     ServerName location = ServerName.valueOf("127.0.0.1", 1, 0);
1123     ServerName diffLocation = ServerName.valueOf("127.0.0.1", 2, 0);
1124 
1125     ManualEnvironmentEdge timeMachine = new ManualEnvironmentEdge();
1126     EnvironmentEdgeManager.injectEdge(timeMachine);
1127     try {
1128       long timeBase = timeMachine.currentTime();
1129       long largeAmountOfTime = ANY_PAUSE * 1000;
1130       ConnectionManager.ServerErrorTracker tracker =
1131           new ConnectionManager.ServerErrorTracker(largeAmountOfTime, 100);
1132 
1133       // The default backoff is 0.
1134       assertEquals(0, tracker.calculateBackoffTime(location, ANY_PAUSE));
1135 
1136       // Check some backoff values from HConstants sequence.
1137       tracker.reportServerError(location);
1138       assertEqualsWithJitter(ANY_PAUSE, tracker.calculateBackoffTime(location, ANY_PAUSE));
1139       tracker.reportServerError(location);
1140       tracker.reportServerError(location);
1141       tracker.reportServerError(location);
1142       assertEqualsWithJitter(ANY_PAUSE * 5, tracker.calculateBackoffTime(location, ANY_PAUSE));
1143 
1144       // All of this shouldn't affect backoff for different location.
1145       assertEquals(0, tracker.calculateBackoffTime(diffLocation, ANY_PAUSE));
1146       tracker.reportServerError(diffLocation);
1147       assertEqualsWithJitter(ANY_PAUSE, tracker.calculateBackoffTime(diffLocation, ANY_PAUSE));
1148 
1149       // Check with different base.
1150       assertEqualsWithJitter(ANY_PAUSE * 10,
1151           tracker.calculateBackoffTime(location, ANY_PAUSE * 2));
1152 
1153       // See that time from last error is taken into account. Time shift is applied after jitter,
1154       // so pass the original expected backoff as the base for jitter.
1155       long timeShift = (long)(ANY_PAUSE * 0.5);
1156       timeMachine.setValue(timeBase + timeShift);
1157       assertEqualsWithJitter((ANY_PAUSE * 5) - timeShift,
1158         tracker.calculateBackoffTime(location, ANY_PAUSE), ANY_PAUSE * 2);
1159 
1160       // However we should not go into negative.
1161       timeMachine.setValue(timeBase + ANY_PAUSE * 100);
1162       assertEquals(0, tracker.calculateBackoffTime(location, ANY_PAUSE));
1163 
1164       // We also should not go over the boundary; last retry would be on it.
1165       long timeLeft = (long)(ANY_PAUSE * 0.5);
1166       timeMachine.setValue(timeBase + largeAmountOfTime - timeLeft);
1167       assertTrue(tracker.canRetryMore(1));
1168       tracker.reportServerError(location);
1169       assertEquals(timeLeft, tracker.calculateBackoffTime(location, ANY_PAUSE));
1170       timeMachine.setValue(timeBase + largeAmountOfTime);
1171       assertFalse(tracker.canRetryMore(1));
1172     } finally {
1173       EnvironmentEdgeManager.reset();
1174     }
1175   }
1176 
1177   private static void assertEqualsWithJitter(long expected, long actual) {
1178     assertEqualsWithJitter(expected, actual, expected);
1179   }
1180 
1181   private static void assertEqualsWithJitter(long expected, long actual, long jitterBase) {
1182     assertTrue("Value not within jitter: " + expected + " vs " + actual,
1183         Math.abs(actual - expected) <= (0.01f * jitterBase));
1184   }
1185 
1186   /**
1187    * Tests that a destroyed connection does not have a live zookeeper.
1188    * Below is timing based.  We put up a connection to a table and then close the connection while
1189    * having a background thread running that is forcing close of the connection to try and
1190    * provoke a close catastrophe; we are hoping for a car crash so we can see if we are leaking
1191    * zk connections.
1192    * @throws Exception
1193    */
1194   @Ignore ("Flakey test: See HBASE-8996")@Test
1195   public void testDeleteForZKConnLeak() throws Exception {
1196     TEST_UTIL.createTable(TABLE_NAME4, FAM_NAM);
1197     final Configuration config = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
1198     config.setInt("zookeeper.recovery.retry", 1);
1199     config.setInt("zookeeper.recovery.retry.intervalmill", 1000);
1200     config.setInt("hbase.rpc.timeout", 2000);
1201     config.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
1202 
1203     ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 10,
1204       5, TimeUnit.SECONDS,
1205       new SynchronousQueue<Runnable>(),
1206       Threads.newDaemonThreadFactory("test-hcm-delete"));
1207 
1208     pool.submit(new Runnable() {
1209       @Override
1210       public void run() {
1211         while (!Thread.interrupted()) {
1212           try {
1213             HConnection conn = HConnectionManager.getConnection(config);
1214             LOG.info("Connection " + conn);
1215             HConnectionManager.deleteStaleConnection(conn);
1216             LOG.info("Connection closed " + conn);
1217             // TODO: This sleep time should be less than the time that it takes to open and close
1218             // a table.  Ideally we would do a few runs first to measure.  For now this is
1219             // timing based; hopefully we hit the bad condition.
1220             Threads.sleep(10);
1221           } catch (Exception e) {
1222           }
1223         }
1224       }
1225     });
1226 
1227     // Use connection multiple times.
1228     for (int i = 0; i < 30; i++) {
1229       Connection c1 = null;
1230       try {
1231         c1 = ConnectionManager.getConnectionInternal(config);
1232         LOG.info("HTable connection " + i + " " + c1);
1233         Table table = new HTable(config, TABLE_NAME4, pool);
1234         table.close();
1235         LOG.info("HTable connection " + i + " closed " + c1);
1236       } catch (Exception e) {
1237         LOG.info("We actually want this to happen!!!!  So we can see if we are leaking zk", e);
1238       } finally {
1239         if (c1 != null) {
1240           if (c1.isClosed()) {
1241             // cannot use getZooKeeper as method instantiates watcher if null
1242             Field zkwField = c1.getClass().getDeclaredField("keepAliveZookeeper");
1243             zkwField.setAccessible(true);
1244             Object watcher = zkwField.get(c1);
1245 
1246             if (watcher != null) {
1247               if (((ZooKeeperWatcher)watcher).getRecoverableZooKeeper().getState().isAlive()) {
1248                 // non-synchronized access to watcher; sleep and check again in case zk connection
1249                 // hasn't been cleaned up yet.
1250                 Thread.sleep(1000);
1251                 if (((ZooKeeperWatcher) watcher).getRecoverableZooKeeper().getState().isAlive()) {
1252                   pool.shutdownNow();
1253                   fail("Live zookeeper in closed connection");
1254                 }
1255               }
1256             }
1257           }
1258           c1.close();
1259         }
1260       }
1261     }
1262     pool.shutdownNow();
1263   }
1264 
1265   @Test
1266   public void testConnectionRideOverClusterRestart() throws IOException, InterruptedException {
1267     Configuration config = new Configuration(TEST_UTIL.getConfiguration());
1268 
1269     TableName tableName = TableName.valueOf("testConnectionRideOverClusterRestart");
1270     TEST_UTIL.createTable(tableName.getName(), new byte[][] {FAM_NAM}, config).close();
1271 
1272     Connection connection = ConnectionFactory.createConnection(config);
1273     Table table = connection.getTable(tableName);
1274 
1275     // this will cache the meta location and table's region location
1276     table.get(new Get(Bytes.toBytes("foo")));
1277 
1278     // restart HBase
1279     TEST_UTIL.shutdownMiniHBaseCluster();
1280     TEST_UTIL.restartHBaseCluster(2);
1281     // this should be able to discover new locations for meta and table's region
1282     table.get(new Get(Bytes.toBytes("foo")));
1283     TEST_UTIL.deleteTable(tableName);
1284     table.close();
1285     connection.close();
1286   }
1287 }