View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.client;
19  
20  import static org.junit.Assert.assertTrue;
21  import static org.junit.Assert.fail;
22  
23  import java.io.IOException;
24  import java.net.SocketTimeoutException;
25  import java.util.Comparator;
26  import java.util.HashMap;
27  import java.util.Map;
28  import java.util.Random;
29  import java.util.SortedMap;
30  import java.util.concurrent.ConcurrentSkipListMap;
31  import java.util.concurrent.ExecutorService;
32  import java.util.concurrent.Executors;
33  import java.util.concurrent.atomic.AtomicInteger;
34  import java.util.concurrent.atomic.AtomicLong;
35  
36  import org.apache.hadoop.hbase.util.ByteStringer;
37  import org.apache.commons.lang.NotImplementedException;
38  import org.apache.commons.logging.Log;
39  import org.apache.commons.logging.LogFactory;
40  import org.apache.hadoop.conf.Configuration;
41  import org.apache.hadoop.conf.Configured;
42  import org.apache.hadoop.hbase.HBaseConfiguration;
43  import org.apache.hadoop.hbase.HConstants;
44  import org.apache.hadoop.hbase.HRegionInfo;
45  import org.apache.hadoop.hbase.HRegionLocation;
46  import org.apache.hadoop.hbase.RegionLocations;
47  import org.apache.hadoop.hbase.KeyValue;
48  import org.apache.hadoop.hbase.RegionTooBusyException;
49  import org.apache.hadoop.hbase.ServerName;
50  import org.apache.hadoop.hbase.testclassification.SmallTests;
51  import org.apache.hadoop.hbase.TableName;
52  import org.apache.hadoop.hbase.protobuf.generated.CellProtos;
53  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
54  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
55  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileResponse;
56  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService;
57  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService.BlockingInterface;
58  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceRequest;
59  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
60  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetRequest;
61  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetResponse;
62  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest;
63  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiResponse;
64  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateRequest;
65  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateResponse;
66  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionAction;
67  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionActionResult;
68  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ResultOrException;
69  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
70  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse;
71  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
72  import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
73  import org.apache.hadoop.hbase.security.User;
74  import org.apache.hadoop.hbase.util.Bytes;
75  import org.apache.hadoop.hbase.util.Pair;
76  import org.apache.hadoop.hbase.util.Threads;
77  import org.apache.hadoop.util.Tool;
78  import org.apache.hadoop.util.ToolRunner;
79  import org.junit.Before;
80  import org.junit.Ignore;
81  import org.junit.Test;
82  import org.junit.experimental.categories.Category;
83  import org.mockito.Mockito;
84  
85  import com.google.common.base.Stopwatch;
86  import com.google.protobuf.ByteString;
87  import com.google.protobuf.RpcController;
88  import com.google.protobuf.ServiceException;
89  
90  /**
91   * Test client behavior w/o setting up a cluster.
92   * Mock up cluster emissions.
93   */
94  @Category(SmallTests.class)
95  public class TestClientNoCluster extends Configured implements Tool {
96    private static final Log LOG = LogFactory.getLog(TestClientNoCluster.class);
97    private Configuration conf;
98    public static final ServerName META_SERVERNAME =
99        ServerName.valueOf("meta.example.org", 16010, 12345);
100 
101   @Before
102   public void setUp() throws Exception {
103     this.conf = HBaseConfiguration.create();
104     // Run my HConnection overrides.  Use my little HConnectionImplementation below which
105     // allows me insert mocks and also use my Registry below rather than the default zk based
106     // one so tests run faster and don't have zk dependency.
107     this.conf.set("hbase.client.registry.impl", SimpleRegistry.class.getName());
108   }
109 
110   /**
111    * Simple cluster registry inserted in place of our usual zookeeper based one.
112    */
113   static class SimpleRegistry implements Registry {
114     final ServerName META_HOST = META_SERVERNAME;
115 
116     @Override
117     public void init(Connection connection) {
118     }
119 
120     @Override
121     public RegionLocations getMetaRegionLocation() throws IOException {
122       return new RegionLocations(
123         new HRegionLocation(HRegionInfo.FIRST_META_REGIONINFO, META_HOST));
124     }
125 
126     @Override
127     public String getClusterId() {
128       return HConstants.CLUSTER_ID_DEFAULT;
129     }
130 
131     @Override
132     public boolean isTableOnlineState(TableName tableName, boolean enabled)
133     throws IOException {
134       return enabled;
135     }
136 
137     @Override
138     public int getCurrentNrHRS() throws IOException {
139       return 1;
140     }
141   }
142 
143   /**
144    * Remove the @Ignore to try out timeout and retry asettings
145    * @throws IOException
146    */
147   @Ignore
148   @Test
149   public void testTimeoutAndRetries() throws IOException {
150     Configuration localConfig = HBaseConfiguration.create(this.conf);
151     // This override mocks up our exists/get call to throw a RegionServerStoppedException.
152     localConfig.set("hbase.client.connection.impl", RpcTimeoutConnection.class.getName());
153     Table table = new HTable(localConfig, TableName.META_TABLE_NAME);
154     Throwable t = null;
155     LOG.info("Start");
156     try {
157       // An exists call turns into a get w/ a flag.
158       table.exists(new Get(Bytes.toBytes("abc")));
159     } catch (SocketTimeoutException e) {
160       // I expect this exception.
161       LOG.info("Got expected exception", e);
162       t = e;
163     } catch (RetriesExhaustedException e) {
164       // This is the old, unwanted behavior.  If we get here FAIL!!!
165       fail();
166     } finally {
167       table.close();
168     }
169     LOG.info("Stop");
170     assertTrue(t != null);
171   }
172 
173   /**
174    * Test that operation timeout prevails over rpc default timeout and retries, etc.
175    * @throws IOException
176    */
177   @Test
178   public void testRpcTimeout() throws IOException {
179     Configuration localConfig = HBaseConfiguration.create(this.conf);
180     // This override mocks up our exists/get call to throw a RegionServerStoppedException.
181     localConfig.set("hbase.client.connection.impl", RpcTimeoutConnection.class.getName());
182     int pause = 10;
183     localConfig.setInt("hbase.client.pause", pause);
184     localConfig.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 10);
185     // Set the operation timeout to be < the pause.  Expectation is that after first pause, we will
186     // fail out of the rpc because the rpc timeout will have been set to the operation tiemout
187     // and it has expired.  Otherwise, if this functionality is broke, all retries will be run --
188     // all ten of them -- and we'll get the RetriesExhaustedException exception.
189     localConfig.setInt(HConstants.HBASE_CLIENT_META_OPERATION_TIMEOUT, pause - 1);
190     Table table = new HTable(localConfig, TableName.META_TABLE_NAME);
191     Throwable t = null;
192     try {
193       // An exists call turns into a get w/ a flag.
194       table.exists(new Get(Bytes.toBytes("abc")));
195     } catch (SocketTimeoutException e) {
196       // I expect this exception.
197       LOG.info("Got expected exception", e);
198       t = e;
199     } catch (RetriesExhaustedException e) {
200       // This is the old, unwanted behavior.  If we get here FAIL!!!
201       fail();
202     } finally {
203       table.close();
204     }
205     assertTrue(t != null);
206   }
207 
208   @Test
209   public void testDoNotRetryMetaScanner() throws IOException {
210     this.conf.set("hbase.client.connection.impl",
211       RegionServerStoppedOnScannerOpenConnection.class.getName());
212     try (Connection connection = ConnectionFactory.createConnection(conf)) {
213       MetaScanner.metaScan(connection, null);
214     }
215   }
216 
217   @Test
218   public void testDoNotRetryOnScanNext() throws IOException {
219     this.conf.set("hbase.client.connection.impl",
220       RegionServerStoppedOnScannerOpenConnection.class.getName());
221     // Go against meta else we will try to find first region for the table on construction which
222     // means we'll have to do a bunch more mocking.  Tests that go against meta only should be
223     // good for a bit of testing.
224     Table table = new HTable(this.conf, TableName.META_TABLE_NAME);
225     ResultScanner scanner = table.getScanner(HConstants.CATALOG_FAMILY);
226     try {
227       Result result = null;
228       while ((result = scanner.next()) != null) {
229         LOG.info(result);
230       }
231     } finally {
232       scanner.close();
233       table.close();
234     }
235   }
236 
237   @Test
238   public void testRegionServerStoppedOnScannerOpen() throws IOException {
239     this.conf.set("hbase.client.connection.impl",
240       RegionServerStoppedOnScannerOpenConnection.class.getName());
241     // Go against meta else we will try to find first region for the table on construction which
242     // means we'll have to do a bunch more mocking.  Tests that go against meta only should be
243     // good for a bit of testing.
244     Table table = new HTable(this.conf, TableName.META_TABLE_NAME);
245     ResultScanner scanner = table.getScanner(HConstants.CATALOG_FAMILY);
246     try {
247       Result result = null;
248       while ((result = scanner.next()) != null) {
249         LOG.info(result);
250       }
251     } finally {
252       scanner.close();
253       table.close();
254     }
255   }
256 
257   /**
258    * Override to shutdown going to zookeeper for cluster id and meta location.
259    */
260   static class ScanOpenNextThenExceptionThenRecoverConnection
261   extends ConnectionManager.HConnectionImplementation {
262     final ClientService.BlockingInterface stub;
263 
264     ScanOpenNextThenExceptionThenRecoverConnection(Configuration conf,
265         boolean managed, ExecutorService pool) throws IOException {
266       super(conf, managed);
267       // Mock up my stub so open scanner returns a scanner id and then on next, we throw
268       // exceptions for three times and then after that, we return no more to scan.
269       this.stub = Mockito.mock(ClientService.BlockingInterface.class);
270       long sid = 12345L;
271       try {
272         Mockito.when(stub.scan((RpcController)Mockito.any(),
273             (ClientProtos.ScanRequest)Mockito.any())).
274           thenReturn(ClientProtos.ScanResponse.newBuilder().setScannerId(sid).build()).
275           thenThrow(new ServiceException(new RegionServerStoppedException("From Mockito"))).
276           thenReturn(ClientProtos.ScanResponse.newBuilder().setScannerId(sid).
277               setMoreResults(false).build());
278       } catch (ServiceException e) {
279         throw new IOException(e);
280       }
281     }
282 
283     @Override
284     public BlockingInterface getClient(ServerName sn) throws IOException {
285       return this.stub;
286     }
287   }
288 
289   /**
290    * Override to shutdown going to zookeeper for cluster id and meta location.
291    */
292   static class RegionServerStoppedOnScannerOpenConnection
293   extends ConnectionManager.HConnectionImplementation {
294     final ClientService.BlockingInterface stub;
295 
296     RegionServerStoppedOnScannerOpenConnection(Configuration conf, boolean managed,
297         ExecutorService pool, User user) throws IOException {
298       super(conf, managed);
299       // Mock up my stub so open scanner returns a scanner id and then on next, we throw
300       // exceptions for three times and then after that, we return no more to scan.
301       this.stub = Mockito.mock(ClientService.BlockingInterface.class);
302       long sid = 12345L;
303       try {
304         Mockito.when(stub.scan((RpcController)Mockito.any(),
305             (ClientProtos.ScanRequest)Mockito.any())).
306           thenReturn(ClientProtos.ScanResponse.newBuilder().setScannerId(sid).build()).
307           thenThrow(new ServiceException(new RegionServerStoppedException("From Mockito"))).
308           thenReturn(ClientProtos.ScanResponse.newBuilder().setScannerId(sid).
309               setMoreResults(false).build());
310       } catch (ServiceException e) {
311         throw new IOException(e);
312       }
313     }
314 
315     @Override
316     public BlockingInterface getClient(ServerName sn) throws IOException {
317       return this.stub;
318     }
319   }
320 
321   /**
322    * Override to check we are setting rpc timeout right.
323    */
324   static class RpcTimeoutConnection
325   extends ConnectionManager.HConnectionImplementation {
326     final ClientService.BlockingInterface stub;
327 
328     RpcTimeoutConnection(Configuration conf, boolean managed, ExecutorService pool, User user)
329     throws IOException {
330       super(conf, managed);
331       // Mock up my stub so an exists call -- which turns into a get -- throws an exception
332       this.stub = Mockito.mock(ClientService.BlockingInterface.class);
333       try {
334         Mockito.when(stub.get((RpcController)Mockito.any(),
335             (ClientProtos.GetRequest)Mockito.any())).
336           thenThrow(new ServiceException(new RegionServerStoppedException("From Mockito")));
337       } catch (ServiceException e) {
338         throw new IOException(e);
339       }
340     }
341 
342     @Override
343     public BlockingInterface getClient(ServerName sn) throws IOException {
344       return this.stub;
345     }
346   }
347 
348   /**
349    * Fake many regionservers and many regions on a connection implementation.
350    */
351   static class ManyServersManyRegionsConnection
352   extends ConnectionManager.HConnectionImplementation {
353     // All access should be synchronized
354     final Map<ServerName, ClientService.BlockingInterface> serversByClient;
355 
356     /**
357      * Map of faked-up rows of a 'meta table'.
358      */
359     final SortedMap<byte [], Pair<HRegionInfo, ServerName>> meta;
360     final AtomicLong sequenceids = new AtomicLong(0);
361     private final Configuration conf;
362 
363     ManyServersManyRegionsConnection(Configuration conf, boolean managed,
364         ExecutorService pool, User user)
365     throws IOException {
366       super(conf, managed, pool, user);
367       int serverCount = conf.getInt("hbase.test.servers", 10);
368       this.serversByClient =
369         new HashMap<ServerName, ClientService.BlockingInterface>(serverCount);
370       this.meta = makeMeta(Bytes.toBytes(
371         conf.get("hbase.test.tablename", Bytes.toString(BIG_USER_TABLE))),
372         conf.getInt("hbase.test.regions", 100),
373         conf.getLong("hbase.test.namespace.span", 1000),
374         serverCount);
375       this.conf = conf;
376     }
377 
378     @Override
379     public ClientService.BlockingInterface getClient(ServerName sn) throws IOException {
380       // if (!sn.toString().startsWith("meta")) LOG.info(sn);
381       ClientService.BlockingInterface stub = null;
382       synchronized (this.serversByClient) {
383         stub = this.serversByClient.get(sn);
384         if (stub == null) {
385           stub = new FakeServer(this.conf, meta, sequenceids);
386           this.serversByClient.put(sn, stub);
387         }
388       }
389       return stub;
390     }
391   }
392 
393   static MultiResponse doMultiResponse(final SortedMap<byte [], Pair<HRegionInfo, ServerName>> meta,
394       final AtomicLong sequenceids, final MultiRequest request) {
395     // Make a response to match the request.  Act like there were no failures.
396     ClientProtos.MultiResponse.Builder builder = ClientProtos.MultiResponse.newBuilder();
397     // Per Region.
398     RegionActionResult.Builder regionActionResultBuilder =
399         RegionActionResult.newBuilder();
400     ResultOrException.Builder roeBuilder = ResultOrException.newBuilder();
401     for (RegionAction regionAction: request.getRegionActionList()) {
402       regionActionResultBuilder.clear();
403       // Per Action in a Region.
404       for (ClientProtos.Action action: regionAction.getActionList()) {
405         roeBuilder.clear();
406         // Return empty Result and proper index as result.
407         roeBuilder.setResult(ClientProtos.Result.getDefaultInstance());
408         roeBuilder.setIndex(action.getIndex());
409         regionActionResultBuilder.addResultOrException(roeBuilder.build());
410       }
411       builder.addRegionActionResult(regionActionResultBuilder.build());
412     }
413     return builder.build();
414   }
415 
416   /**
417    * Fake 'server'.
418    * Implements the ClientService responding as though it were a 'server' (presumes a new
419    * ClientService.BlockingInterface made per server).
420    */
421   static class FakeServer implements ClientService.BlockingInterface {
422     private AtomicInteger multiInvocationsCount = new AtomicInteger(0);
423     private final SortedMap<byte [], Pair<HRegionInfo, ServerName>> meta;
424     private final AtomicLong sequenceids;
425     private final long multiPause;
426     private final int tooManyMultiRequests;
427 
428     FakeServer(final Configuration c, final SortedMap<byte [], Pair<HRegionInfo, ServerName>> meta,
429         final AtomicLong sequenceids) {
430       this.meta = meta;
431       this.sequenceids = sequenceids;
432 
433       // Pause to simulate the server taking time applying the edits.  This will drive up the
434       // number of threads used over in client.
435       this.multiPause = c.getLong("hbase.test.multi.pause.when.done", 0);
436       this.tooManyMultiRequests = c.getInt("hbase.test.multi.too.many", 3);
437     }
438 
439     @Override
440     public GetResponse get(RpcController controller, GetRequest request)
441     throws ServiceException {
442       boolean metaRegion = isMetaRegion(request.getRegion().getValue().toByteArray(),
443         request.getRegion().getType());
444       if (!metaRegion) {
445         return doGetResponse(request);
446       }
447       return doMetaGetResponse(meta, request);
448     }
449 
450     private GetResponse doGetResponse(GetRequest request) {
451       ClientProtos.Result.Builder resultBuilder = ClientProtos.Result.newBuilder();
452       ByteString row = request.getGet().getRow();
453       resultBuilder.addCell(getStartCode(row));
454       GetResponse.Builder builder = GetResponse.newBuilder();
455       builder.setResult(resultBuilder.build());
456       return builder.build();
457     }
458 
459     @Override
460     public MutateResponse mutate(RpcController controller,
461         MutateRequest request) throws ServiceException {
462       throw new NotImplementedException();
463     }
464 
465     @Override
466     public ScanResponse scan(RpcController controller,
467         ScanRequest request) throws ServiceException {
468       // Presume it is a scan of meta for now. Not all scans provide a region spec expecting
469       // the server to keep reference by scannerid.  TODO.
470       return doMetaScanResponse(meta, sequenceids, request);
471     }
472 
473     @Override
474     public BulkLoadHFileResponse bulkLoadHFile(
475         RpcController controller, BulkLoadHFileRequest request)
476         throws ServiceException {
477       throw new NotImplementedException();
478     }
479 
480     @Override
481     public CoprocessorServiceResponse execService(
482         RpcController controller, CoprocessorServiceRequest request)
483         throws ServiceException {
484       throw new NotImplementedException();
485     }
486 
487     @Override
488     public MultiResponse multi(RpcController controller, MultiRequest request)
489     throws ServiceException {
490       int concurrentInvocations = this.multiInvocationsCount.incrementAndGet();
491       try {
492         if (concurrentInvocations >= tooManyMultiRequests) {
493           throw new ServiceException(new RegionTooBusyException("concurrentInvocations=" +
494            concurrentInvocations));
495         }
496         Threads.sleep(multiPause);
497         return doMultiResponse(meta, sequenceids, request);
498       } finally {
499         this.multiInvocationsCount.decrementAndGet();
500       }
501     }
502 
503     @Override
504     public CoprocessorServiceResponse execRegionServerService(RpcController controller,
505         CoprocessorServiceRequest request) throws ServiceException {
506       throw new NotImplementedException();
507     }
508   }
509 
510   static ScanResponse doMetaScanResponse(final SortedMap<byte [], Pair<HRegionInfo, ServerName>> meta,
511       final AtomicLong sequenceids, final ScanRequest request) {
512     ScanResponse.Builder builder = ScanResponse.newBuilder();
513     int max = request.getNumberOfRows();
514     int count = 0;
515     Map<byte [], Pair<HRegionInfo, ServerName>> tail =
516       request.hasScan()? meta.tailMap(request.getScan().getStartRow().toByteArray()): meta;
517       ClientProtos.Result.Builder resultBuilder = ClientProtos.Result.newBuilder();
518     for (Map.Entry<byte [], Pair<HRegionInfo, ServerName>> e: tail.entrySet()) {
519       // Can be 0 on open of a scanner -- i.e. rpc to setup scannerid only.
520       if (max <= 0) break;
521       if (++count > max) break;
522       HRegionInfo hri = e.getValue().getFirst();
523       ByteString row = ByteStringer.wrap(hri.getRegionName());
524       resultBuilder.clear();
525       resultBuilder.addCell(getRegionInfo(row, hri));
526       resultBuilder.addCell(getServer(row, e.getValue().getSecond()));
527       resultBuilder.addCell(getStartCode(row));
528       builder.addResults(resultBuilder.build());
529       // Set more to false if we are on the last region in table.
530       if (hri.getEndKey().length <= 0) builder.setMoreResults(false);
531       else builder.setMoreResults(true);
532     }
533     // If no scannerid, set one.
534     builder.setScannerId(request.hasScannerId()?
535       request.getScannerId(): sequenceids.incrementAndGet());
536     return builder.build();
537   }
538 
539   static GetResponse doMetaGetResponse(final SortedMap<byte [], Pair<HRegionInfo, ServerName>> meta,
540       final GetRequest request) {
541     ClientProtos.Result.Builder resultBuilder = ClientProtos.Result.newBuilder();
542     ByteString row = request.getGet().getRow();
543     Pair<HRegionInfo, ServerName> p = meta.get(row.toByteArray());
544     if (p == null) {
545       if (request.getGet().getClosestRowBefore()) {
546         byte [] bytes = row.toByteArray();
547         SortedMap<byte [], Pair<HRegionInfo, ServerName>> head =
548           bytes != null? meta.headMap(bytes): meta;
549         p = head == null? null: head.get(head.lastKey());
550       }
551     }
552     if (p != null) {
553       resultBuilder.addCell(getRegionInfo(row, p.getFirst()));
554       resultBuilder.addCell(getServer(row, p.getSecond()));
555     }
556     resultBuilder.addCell(getStartCode(row));
557     GetResponse.Builder builder = GetResponse.newBuilder();
558     builder.setResult(resultBuilder.build());
559     return builder.build();
560   }
561 
562   /**
563    * @param name region name or encoded region name.
564    * @param type
565    * @return True if we are dealing with a hbase:meta region.
566    */
567   static boolean isMetaRegion(final byte [] name, final RegionSpecifierType type) {
568     switch (type) {
569     case REGION_NAME:
570       return Bytes.equals(HRegionInfo.FIRST_META_REGIONINFO.getRegionName(), name);
571     case ENCODED_REGION_NAME:
572       return Bytes.equals(HRegionInfo.FIRST_META_REGIONINFO.getEncodedNameAsBytes(), name);
573     default: throw new UnsupportedOperationException();
574     }
575   }
576 
577   private final static ByteString CATALOG_FAMILY_BYTESTRING =
578       ByteStringer.wrap(HConstants.CATALOG_FAMILY);
579   private final static ByteString REGIONINFO_QUALIFIER_BYTESTRING =
580       ByteStringer.wrap(HConstants.REGIONINFO_QUALIFIER);
581   private final static ByteString SERVER_QUALIFIER_BYTESTRING =
582       ByteStringer.wrap(HConstants.SERVER_QUALIFIER);
583 
584   static CellProtos.Cell.Builder getBaseCellBuilder(final ByteString row) {
585     CellProtos.Cell.Builder cellBuilder = CellProtos.Cell.newBuilder();
586     cellBuilder.setRow(row);
587     cellBuilder.setFamily(CATALOG_FAMILY_BYTESTRING);
588     cellBuilder.setTimestamp(System.currentTimeMillis());
589     return cellBuilder;
590   }
591 
592   static CellProtos.Cell getRegionInfo(final ByteString row, final HRegionInfo hri) {
593     CellProtos.Cell.Builder cellBuilder = getBaseCellBuilder(row);
594     cellBuilder.setQualifier(REGIONINFO_QUALIFIER_BYTESTRING);
595     cellBuilder.setValue(ByteStringer.wrap(hri.toByteArray()));
596     return cellBuilder.build();
597   }
598 
599   static CellProtos.Cell getServer(final ByteString row, final ServerName sn) {
600     CellProtos.Cell.Builder cellBuilder = getBaseCellBuilder(row);
601     cellBuilder.setQualifier(SERVER_QUALIFIER_BYTESTRING);
602     cellBuilder.setValue(ByteString.copyFromUtf8(sn.getHostAndPort()));
603     return cellBuilder.build();
604   }
605 
606   static CellProtos.Cell getStartCode(final ByteString row) {
607     CellProtos.Cell.Builder cellBuilder = getBaseCellBuilder(row);
608     cellBuilder.setQualifier(ByteStringer.wrap(HConstants.STARTCODE_QUALIFIER));
609     // TODO:
610     cellBuilder.setValue(ByteStringer.wrap(Bytes.toBytes(META_SERVERNAME.getStartcode())));
611     return cellBuilder.build();
612   }
613 
614   private static final byte [] BIG_USER_TABLE = Bytes.toBytes("t");
615 
616   /**
617    * Format passed integer.  Zero-pad.
618    * Copied from hbase-server PE class and small amendment.  Make them share.
619    * @param number
620    * @return Returns zero-prefixed 10-byte wide decimal version of passed
621    * number (Does absolute in case number is negative).
622    */
623   private static byte [] format(final long number) {
624     byte [] b = new byte[10];
625     long d = number;
626     for (int i = b.length - 1; i >= 0; i--) {
627       b[i] = (byte)((d % 10) + '0');
628       d /= 10;
629     }
630     return b;
631   }
632 
633   /**
634    * @param count
635    * @param namespaceSpan
636    * @return <code>count</code> regions
637    */
638   private static HRegionInfo [] makeHRegionInfos(final byte [] tableName, final int count,
639       final long namespaceSpan) {
640     byte [] startKey = HConstants.EMPTY_BYTE_ARRAY;
641     byte [] endKey = HConstants.EMPTY_BYTE_ARRAY;
642     long interval = namespaceSpan / count;
643     HRegionInfo [] hris = new HRegionInfo[count];
644     for (int i = 0; i < count; i++) {
645       if (i == 0) {
646         endKey = format(interval);
647       } else {
648         startKey = endKey;
649         if (i == count - 1) endKey = HConstants.EMPTY_BYTE_ARRAY;
650         else endKey = format((i + 1) * interval);
651       }
652       hris[i] = new HRegionInfo(TableName.valueOf(tableName), startKey, endKey);
653     }
654     return hris;
655   }
656 
657   /**
658    * @param count
659    * @return Return <code>count</code> servernames.
660    */
661   private static ServerName [] makeServerNames(final int count) {
662     ServerName [] sns = new ServerName[count];
663     for (int i = 0; i < count; i++) {
664       sns[i] = ServerName.valueOf("" + i + ".example.org", 16010, i);
665     }
666     return sns;
667   }
668 
669   /**
670    * Comparator for meta row keys.
671    */
672   private static class MetaRowsComparator implements Comparator<byte []> {
673     private final KeyValue.KVComparator delegate = new KeyValue.MetaComparator();
674     @Override
675     public int compare(byte[] left, byte[] right) {
676       return delegate.compareRows(left, 0, left.length, right, 0, right.length);
677     }
678   }
679 
680   /**
681    * Create up a map that is keyed by meta row name and whose value is the HRegionInfo and
682    * ServerName to return for this row.
683    * @return Map with faked hbase:meta content in it.
684    */
685   static SortedMap<byte [], Pair<HRegionInfo, ServerName>> makeMeta(final byte [] tableName,
686       final int regionCount, final long namespaceSpan, final int serverCount) {
687     // I need a comparator for meta rows so we sort properly.
688     SortedMap<byte [], Pair<HRegionInfo, ServerName>> meta =
689       new ConcurrentSkipListMap<byte[], Pair<HRegionInfo,ServerName>>(new MetaRowsComparator());
690     HRegionInfo [] hris = makeHRegionInfos(tableName, regionCount, namespaceSpan);
691     ServerName [] serverNames = makeServerNames(serverCount);
692     int per = regionCount / serverCount;
693     int count = 0;
694     for (HRegionInfo hri: hris) {
695       Pair<HRegionInfo, ServerName> p =
696         new Pair<HRegionInfo, ServerName>(hri, serverNames[count++ / per]);
697       meta.put(hri.getRegionName(), p);
698     }
699     return meta;
700   }
701 
702   /**
703    * Code for each 'client' to run.
704    *
705    * @param id
706    * @param c
707    * @param sharedConnection
708    * @throws IOException
709    */
710   static void cycle(int id, final Configuration c, final Connection sharedConnection) throws IOException {
711     long namespaceSpan = c.getLong("hbase.test.namespace.span", 1000000);
712     long startTime = System.currentTimeMillis();
713     final int printInterval = 100000;
714     Random rd = new Random(id);
715     boolean get = c.getBoolean("hbase.test.do.gets", false);
716     TableName tableName = TableName.valueOf(BIG_USER_TABLE);
717     if (get) {
718       try (Table table = sharedConnection.getTable(tableName)){
719         Stopwatch stopWatch = new Stopwatch();
720         stopWatch.start();
721         for (int i = 0; i < namespaceSpan; i++) {
722           byte [] b = format(rd.nextLong());
723           Get g = new Get(b);
724           table.get(g);
725           if (i % printInterval == 0) {
726             LOG.info("Get " + printInterval + "/" + stopWatch.elapsedMillis());
727             stopWatch.reset();
728             stopWatch.start();
729           }
730         }
731         LOG.info("Finished a cycle putting " + namespaceSpan + " in " +
732             (System.currentTimeMillis() - startTime) + "ms");
733       }
734     } else {
735       try (BufferedMutator mutator = sharedConnection.getBufferedMutator(tableName)) {
736         Stopwatch stopWatch = new Stopwatch();
737         stopWatch.start();
738         for (int i = 0; i < namespaceSpan; i++) {
739           byte [] b = format(rd.nextLong());
740           Put p = new Put(b);
741           p.add(HConstants.CATALOG_FAMILY, b, b);
742           mutator.mutate(p);
743           if (i % printInterval == 0) {
744             LOG.info("Put " + printInterval + "/" + stopWatch.elapsedMillis());
745             stopWatch.reset();
746             stopWatch.start();
747           }
748         }
749         LOG.info("Finished a cycle putting " + namespaceSpan + " in " +
750             (System.currentTimeMillis() - startTime) + "ms");
751         }
752     }
753   }
754 
755   @Override
756   public int run(String[] arg0) throws Exception {
757     int errCode = 0;
758     // TODO: Make command options.
759     // How many servers to fake.
760     final int servers = 1;
761     // How many regions to put on the faked servers.
762     final int regions = 100000;
763     // How many 'keys' in the faked regions.
764     final long namespaceSpan = 50000000;
765     // How long to take to pause after doing a put; make this long if you want to fake a struggling
766     // server.
767     final long multiPause = 0;
768     // Check args make basic sense.
769     if ((namespaceSpan < regions) || (regions < servers)) {
770       throw new IllegalArgumentException("namespaceSpan=" + namespaceSpan + " must be > regions=" +
771         regions + " which must be > servers=" + servers);
772     }
773 
774     // Set my many servers and many regions faking connection in place.
775     getConf().set("hbase.client.connection.impl",
776       ManyServersManyRegionsConnection.class.getName());
777     // Use simple kv registry rather than zk
778     getConf().set("hbase.client.registry.impl", SimpleRegistry.class.getName());
779     // When to report fails.  Default is we report the 10th.  This means we'll see log everytime
780     // an exception is thrown -- usually RegionTooBusyException when we have more than
781     // hbase.test.multi.too.many requests outstanding at any time.
782     getConf().setInt("hbase.client.start.log.errors.counter", 0);
783 
784     // Ugly but this is only way to pass in configs.into ManyServersManyRegionsConnection class.
785     getConf().setInt("hbase.test.regions", regions);
786     getConf().setLong("hbase.test.namespace.span", namespaceSpan);
787     getConf().setLong("hbase.test.servers", servers);
788     getConf().set("hbase.test.tablename", Bytes.toString(BIG_USER_TABLE));
789     getConf().setLong("hbase.test.multi.pause.when.done", multiPause);
790     // Let there be ten outstanding requests at a time before we throw RegionBusyException.
791     getConf().setInt("hbase.test.multi.too.many", 10);
792     final int clients = 2;
793 
794     // Have them all share the same connection so they all share the same instance of
795     // ManyServersManyRegionsConnection so I can keep an eye on how many requests by server.
796     final ExecutorService pool = Executors.newCachedThreadPool(Threads.getNamedThreadFactory("p"));
797       // Executors.newFixedThreadPool(servers * 10, Threads.getNamedThreadFactory("p"));
798     // Share a connection so I can keep counts in the 'server' on concurrency.
799     final Connection sharedConnection = ConnectionFactory.createConnection(getConf()/*, pool*/);
800     try {
801       Thread [] ts = new Thread[clients];
802       for (int j = 0; j < ts.length; j++) {
803         final int id = j;
804         ts[j] = new Thread("" + j) {
805           final Configuration c = getConf();
806 
807           @Override
808           public void run() {
809             try {
810               cycle(id, c, sharedConnection);
811             } catch (IOException e) {
812               e.printStackTrace();
813             }
814           }
815         };
816         ts[j].start();
817       }
818       for (int j = 0; j < ts.length; j++) {
819         ts[j].join();
820       }
821     } finally {
822       sharedConnection.close();
823     }
824     return errCode;
825   }
826 
827   /**
828    * Run a client instance against a faked up server.
829    * @param args TODO
830    * @throws Exception
831    */
832   public static void main(String[] args) throws Exception {
833     System.exit(ToolRunner.run(HBaseConfiguration.create(), new TestClientNoCluster(), args));
834   }
835 }