1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.client;
20
21 import static org.apache.hadoop.hbase.client.MetricsConnection.CLIENT_SIDE_METRICS_ENABLED_KEY;
22
23 import java.io.Closeable;
24 import java.io.IOException;
25 import java.io.InterruptedIOException;
26 import java.lang.reflect.UndeclaredThrowableException;
27 import java.net.InetAddress;
28 import java.net.InetSocketAddress;
29 import java.util.ArrayList;
30 import java.util.Date;
31 import java.util.HashSet;
32 import java.util.LinkedHashMap;
33 import java.util.List;
34 import java.util.concurrent.BlockingQueue;
35 import java.util.Map;
36 import java.util.Map.Entry;
37 import java.util.NavigableMap;
38 import java.util.Set;
39 import java.util.concurrent.ConcurrentHashMap;
40 import java.util.concurrent.ConcurrentMap;
41 import java.util.concurrent.ExecutorService;
42 import java.util.concurrent.LinkedBlockingQueue;
43 import java.util.concurrent.ThreadPoolExecutor;
44 import java.util.concurrent.TimeUnit;
45 import java.util.concurrent.atomic.AtomicBoolean;
46 import java.util.concurrent.atomic.AtomicInteger;
47
48 import org.apache.commons.logging.Log;
49 import org.apache.commons.logging.LogFactory;
50 import org.apache.hadoop.conf.Configuration;
51 import org.apache.hadoop.hbase.DoNotRetryIOException;
52 import org.apache.hadoop.hbase.HBaseConfiguration;
53 import org.apache.hadoop.hbase.HConstants;
54 import org.apache.hadoop.hbase.HRegionInfo;
55 import org.apache.hadoop.hbase.HRegionLocation;
56 import org.apache.hadoop.hbase.HTableDescriptor;
57 import org.apache.hadoop.hbase.MasterNotRunningException;
58 import org.apache.hadoop.hbase.MetaTableAccessor;
59 import org.apache.hadoop.hbase.RegionLocations;
60 import org.apache.hadoop.hbase.ServerName;
61 import org.apache.hadoop.hbase.TableName;
62 import org.apache.hadoop.hbase.TableNotEnabledException;
63 import org.apache.hadoop.hbase.TableNotFoundException;
64 import org.apache.hadoop.hbase.ZooKeeperConnectionException;
65 import org.apache.hadoop.hbase.classification.InterfaceAudience;
66 import org.apache.hadoop.hbase.client.AsyncProcess.AsyncRequestFuture;
67 import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitor;
68 import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitorBase;
69 import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicy;
70 import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicyFactory;
71 import org.apache.hadoop.hbase.client.coprocessor.Batch;
72 import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil;
73 import org.apache.hadoop.hbase.exceptions.RegionMovedException;
74 import org.apache.hadoop.hbase.ipc.RpcClient;
75 import org.apache.hadoop.hbase.ipc.RpcClientFactory;
76 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
77 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
78 import org.apache.hadoop.hbase.protobuf.RequestConverter;
79 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
80 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService;
81 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceRequest;
82 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
83 import org.apache.hadoop.hbase.protobuf.generated.*;
84 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.AddColumnRequest;
85 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.AddColumnResponse;
86 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.AssignRegionRequest;
87 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.AssignRegionResponse;
88 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.BalanceRequest;
89 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.BalanceResponse;
90 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.CreateNamespaceRequest;
91 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.CreateNamespaceResponse;
92 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.CreateTableRequest;
93 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.CreateTableResponse;
94 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteColumnRequest;
95 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteColumnResponse;
96 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteNamespaceRequest;
97 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteNamespaceResponse;
98 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteSnapshotRequest;
99 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteSnapshotResponse;
100 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteTableRequest;
101 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteTableResponse;
102 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DisableTableRequest;
103 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DisableTableResponse;
104 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DispatchMergingRegionsRequest;
105 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DispatchMergingRegionsResponse;
106 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.EnableCatalogJanitorRequest;
107 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.EnableCatalogJanitorResponse;
108 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.EnableTableRequest;
109 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.EnableTableResponse;
110 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ExecProcedureRequest;
111 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ExecProcedureResponse;
112 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetClusterStatusRequest;
113 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetClusterStatusResponse;
114 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetCompletedSnapshotsRequest;
115 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetCompletedSnapshotsResponse;
116 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetNamespaceDescriptorRequest;
117 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetNamespaceDescriptorResponse;
118 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetSchemaAlterStatusRequest;
119 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetSchemaAlterStatusResponse;
120 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetTableDescriptorsRequest;
121 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetTableDescriptorsResponse;
122 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetTableNamesRequest;
123 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetTableNamesResponse;
124 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetProcedureResultRequest;
125 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetProcedureResultResponse;
126 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsBalancerEnabledRequest;
127 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsBalancerEnabledResponse;
128 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsCatalogJanitorEnabledRequest;
129 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsCatalogJanitorEnabledResponse;
130 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsMasterRunningRequest;
131 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsMasterRunningResponse;
132 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsNormalizerEnabledRequest;
133 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsNormalizerEnabledResponse;
134 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsProcedureDoneRequest;
135 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsProcedureDoneResponse;
136 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsRestoreSnapshotDoneRequest;
137 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsRestoreSnapshotDoneResponse;
138 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsSnapshotDoneRequest;
139 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsSnapshotDoneResponse;
140 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ListNamespaceDescriptorsRequest;
141 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ListNamespaceDescriptorsResponse;
142 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ListTableDescriptorsByNamespaceRequest;
143 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ListTableDescriptorsByNamespaceResponse;
144 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ListTableNamesByNamespaceRequest;
145 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ListTableNamesByNamespaceResponse;
146 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.MajorCompactionTimestampForRegionRequest;
147 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.MajorCompactionTimestampRequest;
148 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.MajorCompactionTimestampResponse;
149 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.MasterService;
150 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ModifyColumnRequest;
151 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ModifyColumnResponse;
152 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ModifyNamespaceRequest;
153 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ModifyNamespaceResponse;
154 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ModifyTableRequest;
155 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ModifyTableResponse;
156 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.MoveRegionRequest;
157 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.MoveRegionResponse;
158 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.NormalizeRequest;
159 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.NormalizeResponse;
160 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.OfflineRegionRequest;
161 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.OfflineRegionResponse;
162 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.RestoreSnapshotRequest;
163 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.RestoreSnapshotResponse;
164 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.RunCatalogScanRequest;
165 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.RunCatalogScanResponse;
166 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SecurityCapabilitiesRequest;
167 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SecurityCapabilitiesResponse;
168 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SetBalancerRunningRequest;
169 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SetBalancerRunningResponse;
170 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SetNormalizerRunningRequest;
171 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SetNormalizerRunningResponse;
172 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SetQuotaRequest;
173 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SetQuotaResponse;
174 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ShutdownRequest;
175 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ShutdownResponse;
176 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SnapshotRequest;
177 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SnapshotResponse;
178 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.StopMasterRequest;
179 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.StopMasterResponse;
180 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.TruncateTableRequest;
181 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.TruncateTableResponse;
182 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.UnassignRegionRequest;
183 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.UnassignRegionResponse;
184 import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
185 import org.apache.hadoop.hbase.security.User;
186 import org.apache.hadoop.hbase.security.UserProvider;
187 import org.apache.hadoop.hbase.util.Bytes;
188 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
189 import org.apache.hadoop.hbase.util.ExceptionUtil;
190 import org.apache.hadoop.hbase.util.Threads;
191 import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
192 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
193 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
194 import org.apache.hadoop.ipc.RemoteException;
195 import org.apache.zookeeper.KeeperException;
196
197 import com.google.common.annotations.VisibleForTesting;
198 import com.google.protobuf.BlockingRpcChannel;
199 import com.google.protobuf.RpcController;
200 import com.google.protobuf.ServiceException;
201
202
203
204
205 @SuppressWarnings("serial")
206 @InterfaceAudience.Private
207
208 class ConnectionManager {
209 static final Log LOG = LogFactory.getLog(ConnectionManager.class);
210
211 public static final String RETRIES_BY_SERVER_KEY = "hbase.client.retries.by.server";
212 private static final String CLIENT_NONCES_ENABLED_KEY = "hbase.client.nonces.enabled";
213 private static final String RESOLVE_HOSTNAME_ON_FAIL_KEY = "hbase.resolve.hostnames.on.failure";
214
215
216
217
218 static final Map<HConnectionKey, HConnectionImplementation> CONNECTION_INSTANCES;
219
220 public static final int MAX_CACHED_CONNECTION_INSTANCES;
221
222
223
224
225
226 private static volatile NonceGenerator nonceGenerator = null;
227
228 private static Object nonceGeneratorCreateLock = new Object();
229
  static {
    // Cap the connection cache at one more than the ZK max-client-connections
    // limit so we never hold more cached connections than ZooKeeper would allow.
    MAX_CACHED_CONNECTION_INSTANCES = HBaseConfiguration.create().getInt(
      HConstants.ZOOKEEPER_MAX_CLIENT_CNXNS, HConstants.DEFAULT_ZOOKEPER_MAX_CLIENT_CNXNS) + 1;
    // Access-ordered LinkedHashMap (third ctor arg 'true') acting as an LRU
    // cache: once the size exceeds the cap, the least-recently-used entry is
    // evicted. Initial capacity is sized so the map never rehashes below the cap.
    CONNECTION_INSTANCES = new LinkedHashMap<HConnectionKey, HConnectionImplementation>(
        (int) (MAX_CACHED_CONNECTION_INSTANCES / 0.75F) + 1, 0.75F, true) {
      @Override
      protected boolean removeEldestEntry(
          Map.Entry<HConnectionKey, HConnectionImplementation> eldest) {
        return size() > MAX_CACHED_CONNECTION_INSTANCES;
      }
    };
  }
246
247
248 static class NoNonceGenerator implements NonceGenerator {
249 @Override
250 public long getNonceGroup() {
251 return HConstants.NO_NONCE;
252 }
253 @Override
254 public long newNonce() {
255 return HConstants.NO_NONCE;
256 }
257 }
258
259
260
261
262 private ConnectionManager() {
263 super();
264 }
265
266
267
268
269
270
271 @VisibleForTesting
272 static NonceGenerator injectNonceGeneratorForTesting(
273 ClusterConnection conn, NonceGenerator cnm) {
274 HConnectionImplementation connImpl = (HConnectionImplementation)conn;
275 NonceGenerator ng = connImpl.getNonceGenerator();
276 LOG.warn("Nonce generator is being replaced by test code for " + cnm.getClass().getName());
277 connImpl.nonceGenerator = cnm;
278 return ng;
279 }
280
281
282
283
284
285
286
287
288
289
  /**
   * Get a cached, reference-counted connection for {@code conf}; callers must
   * balance this with a close/delete so the reference count drops back.
   *
   * @param conf configuration identifying the cluster
   * @return a shared managed connection
   * @throws IOException if the connection cannot be created
   * @deprecated managed connections are deprecated; use {@code ConnectionFactory} instead
   */
  @Deprecated
  public static HConnection getConnection(final Configuration conf) throws IOException {
    return getConnectionInternal(conf);
  }
294
295
296 static ClusterConnection getConnectionInternal(final Configuration conf)
297 throws IOException {
298 HConnectionKey connectionKey = new HConnectionKey(conf);
299 synchronized (CONNECTION_INSTANCES) {
300 HConnectionImplementation connection = CONNECTION_INSTANCES.get(connectionKey);
301 if (connection == null) {
302 connection = (HConnectionImplementation)createConnection(conf, true);
303 CONNECTION_INSTANCES.put(connectionKey, connection);
304 } else if (connection.isClosed()) {
305 ConnectionManager.deleteConnection(connectionKey, true);
306 connection = (HConnectionImplementation)createConnection(conf, true);
307 CONNECTION_INSTANCES.put(connectionKey, connection);
308 }
309 connection.incCount();
310 return connection;
311 }
312 }
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
  /**
   * Create a new, unmanaged connection for {@code conf}; the caller owns it and
   * must close it. Runs as the current user.
   *
   * @param conf configuration identifying the cluster
   * @return a new unmanaged connection
   * @throws IOException if the connection cannot be created
   */
  public static HConnection createConnection(Configuration conf) throws IOException {
    return createConnectionInternal(conf);
  }

  // Internal variant returning the richer ClusterConnection interface.
  static ClusterConnection createConnectionInternal(Configuration conf) throws IOException {
    UserProvider provider = UserProvider.instantiate(conf);
    return createConnection(conf, false, null, provider.getCurrent());
  }
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
  /**
   * Create a new, unmanaged connection that uses the supplied thread pool for
   * batch operations. Runs as the current user; caller must close the connection.
   *
   * @param conf configuration identifying the cluster
   * @param pool executor used for batch work; may be null to use an internal pool
   * @return a new unmanaged connection
   * @throws IOException if the connection cannot be created
   */
  public static HConnection createConnection(Configuration conf, ExecutorService pool)
      throws IOException {
    UserProvider provider = UserProvider.instantiate(conf);
    return createConnection(conf, false, pool, provider.getCurrent());
  }
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
  /**
   * Create a new, unmanaged connection running as the given user. Caller must
   * close the connection.
   *
   * @param conf configuration identifying the cluster
   * @param user user to run the connection as
   * @return a new unmanaged connection
   * @throws IOException if the connection cannot be created
   */
  public static HConnection createConnection(Configuration conf, User user)
  throws IOException {
    return createConnection(conf, false, null, user);
  }
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
  /**
   * Create a new, unmanaged connection running as the given user, with the
   * supplied batch thread pool. Caller must close the connection.
   *
   * @param conf configuration identifying the cluster
   * @param pool executor used for batch work; may be null to use an internal pool
   * @param user user to run the connection as
   * @return a new unmanaged connection
   * @throws IOException if the connection cannot be created
   */
  public static HConnection createConnection(Configuration conf, ExecutorService pool, User user)
  throws IOException {
    return createConnection(conf, false, pool, user);
  }

  /**
   * @deprecated managed connections are deprecated; use {@code ConnectionFactory}
   */
  @Deprecated
  static HConnection createConnection(final Configuration conf, final boolean managed)
      throws IOException {
    UserProvider provider = UserProvider.instantiate(conf);
    return createConnection(conf, managed, null, provider.getCurrent());
  }

  /**
   * Core factory: delegates to ConnectionFactory, which builds the actual
   * implementation; cast is safe because the factory returns a ClusterConnection.
   * @deprecated managed connections are deprecated; use {@code ConnectionFactory}
   */
  @Deprecated
  static ClusterConnection createConnection(final Configuration conf, final boolean managed,
      final ExecutorService pool, final User user)
  throws IOException {
    return (ClusterConnection) ConnectionFactory.createConnection(conf, managed, pool, user);
  }
433
434
435
436
437
438
439
440
441
  /**
   * Decrement the reference count of the cached connection for {@code conf},
   * closing it once no references remain.
   *
   * @param conf configuration identifying the cached connection
   * @deprecated managed connections are deprecated
   */
  @Deprecated
  public static void deleteConnection(Configuration conf) {
    deleteConnection(new HConnectionKey(conf), false);
  }
446
447
448
449
450
451
452
453
  /**
   * Forcibly remove and close the given cached connection regardless of its
   * reference count (used when the connection is known to be broken).
   *
   * @param connection the stale connection to evict
   * @deprecated managed connections are deprecated
   */
  @Deprecated
  public static void deleteStaleConnection(HConnection connection) {
    deleteConnection(connection, true);
  }
458
459
460
461
462
463
464
465 @Deprecated
466 public static void deleteAllConnections(boolean staleConnection) {
467 synchronized (CONNECTION_INSTANCES) {
468 Set<HConnectionKey> connectionKeys = new HashSet<HConnectionKey>();
469 connectionKeys.addAll(CONNECTION_INSTANCES.keySet());
470 for (HConnectionKey connectionKey : connectionKeys) {
471 deleteConnection(connectionKey, staleConnection);
472 }
473 CONNECTION_INSTANCES.clear();
474 }
475 }
476
477
478
479
480
  /**
   * Delete every cached connection, respecting reference counts.
   * @deprecated managed connections are deprecated
   */
  @Deprecated
  public static void deleteAllConnections() {
    deleteAllConnections(false);
  }

  /**
   * Find the cache entry holding this exact connection instance (identity
   * comparison) and delete it by key.
   */
  @Deprecated
  private static void deleteConnection(HConnection connection, boolean staleConnection) {
    synchronized (CONNECTION_INSTANCES) {
      for (Entry<HConnectionKey, HConnectionImplementation> e: CONNECTION_INSTANCES.entrySet()) {
        // Identity (==) on purpose: we want this specific instance, not an equal key.
        if (e.getValue() == connection) {
          deleteConnection(e.getKey(), staleConnection);
          break;
        }
      }
    }
  }
498
499 @Deprecated
500 private static void deleteConnection(HConnectionKey connectionKey, boolean staleConnection) {
501 synchronized (CONNECTION_INSTANCES) {
502 HConnectionImplementation connection = CONNECTION_INSTANCES.get(connectionKey);
503 if (connection != null) {
504 connection.decCount();
505 if (connection.isZeroReference() || staleConnection) {
506 CONNECTION_INSTANCES.remove(connectionKey);
507 connection.internalClose();
508 }
509 } else {
510 LOG.error("Connection not found in the list, can't delete it "+
511 "(connection key=" + connectionKey + "). May be the key was modified?", new Exception());
512 }
513 }
514 }
515
516
517
518
519
520
521
522
523
524
525
526
  /**
   * Run the given {@code connectable} callback against a shared managed
   * connection for its configuration, then release the connection.
   *
   * @param connectable callback carrying the configuration and work to run
   * @return the callback's result, or null if {@code connectable} or its
   *         configuration is null
   * @throws IOException from the callback, or if closing the connection fails
   *         after the callback succeeded
   */
  @InterfaceAudience.Private
  public static <T> T execute(HConnectable<T> connectable) throws IOException {
    if (connectable == null || connectable.conf == null) {
      return null;
    }
    Configuration conf = connectable.conf;
    HConnection connection = getConnection(conf);
    boolean connectSucceeded = false;
    try {
      T returnValue = connectable.connect(connection);
      connectSucceeded = true;
      return returnValue;
    } finally {
      try {
        connection.close();
      } catch (Exception e) {
        ExceptionUtil.rethrowIfInterrupt(e);
        // Only surface a close failure when the work itself succeeded;
        // otherwise the original exception from connect() must propagate.
        if (connectSucceeded) {
          throw new IOException("The connection to " + connection
              + " could not be deleted.", e);
        }
      }
    }
  }
551
552
553 @edu.umd.cs.findbugs.annotations.SuppressWarnings(
554 value="AT_OPERATION_SEQUENCE_ON_CONCURRENT_ABSTRACTION",
555 justification="Access to the conncurrent hash map is under a lock so should be fine.")
556 static class HConnectionImplementation implements ClusterConnection, Closeable {
    static final Log LOG = LogFactory.getLog(HConnectionImplementation.class);
    // Whether host names may be re-resolved after failures (RESOLVE_HOSTNAME_ON_FAIL_KEY).
    private final boolean hostnamesCanChange;
    // Base pause between client retries, ms (HBASE_CLIENT_PAUSE).
    private final long pause;
    // Whether reads may go to meta-table replicas (USE_META_REPLICAS).
    private final boolean useMetaReplicas;
    // Retry count, from ConnectionConfiguration.getRetriesNumber().
    private final int numTries;
    // RPC timeout, ms (HBASE_RPC_TIMEOUT_KEY).
    final int rpcTimeout;
    // Nonce generator: shared process-wide generator, or NoNonceGenerator when disabled.
    private NonceGenerator nonceGenerator = null;
    // Handles asynchronous batch operations for this connection.
    private final AsyncProcess asyncProcess;
    // Per-server statistics feeding the retrying-caller factory and backoff policy.
    private final ServerStatisticTracker stats;

    private volatile boolean closed;
    private volatile boolean aborted;

    // Optional listener for cluster status multicasts; set up in the constructor
    // when STATUS_PUBLISHED is enabled.
    ClusterStatusListener clusterStatusListener;

    // Serializes meta region location lookups — presumably to avoid redundant
    // concurrent meta scans; confirm at the use sites (outside this view).
    private final Object metaRegionLock = new Object();

    // Guards master and ZooKeeper keep-alive resources (use sites outside this view).
    private final Object masterAndZKLock = new Object();

    // Deadline for keeping the shared ZooKeeper watcher alive; MAX_VALUE means
    // "no expiry scheduled" (managed outside this view).
    private long keepZooKeeperWatcherAliveUntil = Long.MAX_VALUE;

    // Pool for batch operations; either supplied by the caller or lazily created
    // in getBatchPool() (in which case cleanupPool is set and we shut it down).
    private volatile ExecutorService batchPool = null;

    // Dedicated pool for meta lookups, lazily created in getMetaLookupPool().
    private volatile ExecutorService metaLookupPool = null;
    // True when batchPool was created internally and must be shut down on close.
    private volatile boolean cleanupPool = false;

    private final Configuration conf;

    // Derived, immutable view of client-relevant configuration values.
    private final ConnectionConfiguration connectionConfig;

    // Client RPC transport; created in the constructor, shared by all callers.
    private RpcClient rpcClient;

    // Cache of region locations, optionally instrumented via metrics.
    private final MetaCache metaCache;
    // Client-side metrics, or null when CLIENT_SIDE_METRICS_ENABLED_KEY is off.
    private final MetricsConnection metrics;

    // Reference count for managed (cached) connections; see incCount/decCount.
    private int refCount;

    // Whether this connection is cache-managed; managed connections refuse
    // getTable/getAdmin (see NeedUnmanagedConnectionException uses below).
    private boolean managed;

    protected User user;

    private RpcRetryingCallerFactory rpcCallerFactory;

    private RpcControllerFactory rpcControllerFactory;

    private final RetryingCallerInterceptor interceptor;

    // Cluster registry (cluster id, table state, meta location).
    Registry registry;

    // Policy deciding client backoff based on server statistics.
    private final ClientBackoffPolicy backoffPolicy;
625
    /**
     * Convenience constructor: no explicit pool or user (both resolved later or
     * left null).
     */
    HConnectionImplementation(Configuration conf, boolean managed) throws IOException {
      this(conf, managed, null, null);
    }
629
630
631
632
633
634
635
636
637
638
639
640
    /**
     * Full constructor: wires the registry, cluster id, RPC client and the
     * optional cluster-status listener on top of the base configuration setup.
     *
     * @param conf cluster configuration
     * @param managed whether this connection is cache-managed
     * @param pool batch thread pool, or null to create one lazily
     * @param user user this connection runs as
     * @throws IOException if registry or RPC client setup fails
     */
    HConnectionImplementation(Configuration conf, boolean managed,
        ExecutorService pool, User user) throws IOException {
      this(conf);
      this.user = user;
      this.batchPool = pool;
      this.managed = managed;
      this.registry = setupRegistry();
      // Must run before creating the RPC client: the client needs the cluster id.
      retrieveClusterId();

      this.rpcClient = RpcClientFactory.createClient(this.conf, this.clusterId, this.metrics);
      this.rpcControllerFactory = RpcControllerFactory.instantiate(conf);

      // Optionally listen for cluster status multicasts so dead servers can be
      // purged from caches immediately instead of waiting for RPC timeouts.
      boolean shouldListen = conf.getBoolean(HConstants.STATUS_PUBLISHED,
          HConstants.STATUS_PUBLISHED_DEFAULT);
      Class<? extends ClusterStatusListener.Listener> listenerClass =
          conf.getClass(ClusterStatusListener.STATUS_LISTENER_CLASS,
              ClusterStatusListener.DEFAULT_STATUS_LISTENER_CLASS,
              ClusterStatusListener.Listener.class);
      if (shouldListen) {
        if (listenerClass == null) {
          LOG.warn(HConstants.STATUS_PUBLISHED + " is true, but " +
              ClusterStatusListener.STATUS_LISTENER_CLASS + " is not set - not listening status");
        } else {
          clusterStatusListener = new ClusterStatusListener(
              new ClusterStatusListener.DeadServerHandler() {
                @Override
                public void newDead(ServerName sn) {
                  // Drop cached locations and abort in-flight calls to the dead server.
                  clearCaches(sn);
                  rpcClient.cancelConnections(sn);
                }
              }, conf, listenerClass);
        }
      }
    }
676
677
678
679
    /**
     * Base constructor: reads all configuration-derived state. Does not touch
     * the network; registry/RPC setup happens in the full constructor.
     */
    protected HConnectionImplementation(Configuration conf) {
      this.conf = conf;
      this.connectionConfig = new ConnectionConfiguration(conf);
      this.closed = false;
      this.pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE,
          HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
      this.useMetaReplicas = conf.getBoolean(HConstants.USE_META_REPLICAS,
          HConstants.DEFAULT_USE_META_REPLICAS);
      this.numTries = connectionConfig.getRetriesNumber();
      this.rpcTimeout = conf.getInt(
          HConstants.HBASE_RPC_TIMEOUT_KEY,
          HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
      if (conf.getBoolean(CLIENT_NONCES_ENABLED_KEY, true)) {
        // Lazily create the single process-wide nonce generator shared by all
        // connections; double-checked under the dedicated lock.
        synchronized (nonceGeneratorCreateLock) {
          if (ConnectionManager.nonceGenerator == null) {
            ConnectionManager.nonceGenerator = new PerClientRandomNonceGenerator();
          }
          this.nonceGenerator = ConnectionManager.nonceGenerator;
        }
      } else {
        this.nonceGenerator = new NoNonceGenerator();
      }
      stats = ServerStatisticTracker.create(conf);
      this.asyncProcess = createAsyncProcess(this.conf);
      this.interceptor = (new RetryingCallerInterceptorFactory(conf)).build();
      this.rpcCallerFactory = RpcRetryingCallerFactory.instantiate(conf, interceptor, this.stats);
      this.backoffPolicy = ClientBackoffPolicyFactory.create(conf);
      // Metrics are opt-in; a null metrics object disables instrumentation
      // throughout (e.g. in MetaCache below).
      if (conf.getBoolean(CLIENT_SIDE_METRICS_ENABLED_KEY, false)) {
        this.metrics = new MetricsConnection(this);
      } else {
        this.metrics = null;
      }

      this.hostnamesCanChange = conf.getBoolean(RESOLVE_HOSTNAME_ON_FAIL_KEY, true);
      this.metaCache = new MetaCache(this.metrics);
    }
716
    /** String-name convenience overload; delegates to the TableName variant. */
    @Override
    public HTableInterface getTable(String tableName) throws IOException {
      return getTable(TableName.valueOf(tableName));
    }

    /** Byte-array-name convenience overload; delegates to the TableName variant. */
    @Override
    public HTableInterface getTable(byte[] tableName) throws IOException {
      return getTable(TableName.valueOf(tableName));
    }

    /** Get a table using this connection's shared batch pool. */
    @Override
    public HTableInterface getTable(TableName tableName) throws IOException {
      return getTable(tableName, getBatchPool());
    }

    @Override
    public HTableInterface getTable(String tableName, ExecutorService pool) throws IOException {
      return getTable(TableName.valueOf(tableName), pool);
    }

    @Override
    public HTableInterface getTable(byte[] tableName, ExecutorService pool) throws IOException {
      return getTable(TableName.valueOf(tableName), pool);
    }

    /**
     * Core getTable: builds an HTable bound to this connection and the given pool.
     *
     * @throws NeedUnmanagedConnectionException if this is a managed (cached)
     *         connection — tables must be obtained from unmanaged connections
     */
    @Override
    public HTableInterface getTable(TableName tableName, ExecutorService pool) throws IOException {
      if (managed) {
        throw new NeedUnmanagedConnectionException();
      }
      return new HTable(tableName, this, connectionConfig, rpcCallerFactory, rpcControllerFactory, pool);
    }
749
    /**
     * Build a BufferedMutator for the table named in {@code params}, filling in
     * connection-level defaults for any unset parameter.
     *
     * @throws IllegalArgumentException if no table name is set
     */
    @Override
    public BufferedMutator getBufferedMutator(BufferedMutatorParams params) {
      if (params.getTableName() == null) {
        throw new IllegalArgumentException("TableName cannot be null.");
      }
      // Default any unset knobs from this connection's configuration.
      if (params.getPool() == null) {
        params.pool(HTable.getDefaultExecutor(getConfiguration()));
      }
      if (params.getWriteBufferSize() == BufferedMutatorParams.UNSET) {
        params.writeBufferSize(connectionConfig.getWriteBufferSize());
      }
      if (params.getMaxKeyValueSize() == BufferedMutatorParams.UNSET) {
        params.maxKeyValueSize(connectionConfig.getMaxKeyValueSize());
      }
      return new BufferedMutatorImpl(this, rpcCallerFactory, rpcControllerFactory, params);
    }

    /** Convenience overload with all-default parameters. */
    @Override
    public BufferedMutator getBufferedMutator(TableName tableName) {
      return getBufferedMutator(new BufferedMutatorParams(tableName));
    }

    /** Region locator bound to this connection; cheap to create. */
    @Override
    public RegionLocator getRegionLocator(TableName tableName) throws IOException {
      return new HRegionLocator(tableName, this);
    }
776
    /**
     * Get an Admin bound to this connection.
     *
     * @throws NeedUnmanagedConnectionException if this is a managed (cached) connection
     */
    @Override
    public Admin getAdmin() throws IOException {
      if (managed) {
        throw new NeedUnmanagedConnectionException();
      }
      return new HBaseAdmin(this);
    }

    /** @return this connection's metrics, or null when metrics are disabled */
    @Override
    public MetricsConnection getConnectionMetrics() {
      return this.metrics;
    }
789
    /**
     * Lazily create the shared batch pool (double-checked locking; batchPool is
     * volatile). A pool created here is owned by this connection: cleanupPool is
     * set so shutdownPools() will tear it down on close.
     */
    private ExecutorService getBatchPool() {
      if (batchPool == null) {
        synchronized (this) {
          if (batchPool == null) {
            this.batchPool = getThreadPool(conf.getInt("hbase.hconnection.threads.max", 256),
                conf.getInt("hbase.hconnection.threads.core", 256), "-shared-", null);
            this.cleanupPool = true;
          }
        }
      }
      return this.batchPool;
    }
802
803 private ExecutorService getThreadPool(int maxThreads, int coreThreads, String nameHint,
804 BlockingQueue<Runnable> passedWorkQueue) {
805
806 if (maxThreads == 0) {
807 maxThreads = Runtime.getRuntime().availableProcessors() * 8;
808 }
809 if (coreThreads == 0) {
810 coreThreads = Runtime.getRuntime().availableProcessors() * 8;
811 }
812 long keepAliveTime = conf.getLong("hbase.hconnection.threads.keepalivetime", 60);
813 BlockingQueue<Runnable> workQueue = passedWorkQueue;
814 if (workQueue == null) {
815 workQueue =
816 new LinkedBlockingQueue<Runnable>(maxThreads *
817 conf.getInt(HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS,
818 HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS));
819 }
820 ThreadPoolExecutor tpe = new ThreadPoolExecutor(
821 coreThreads,
822 maxThreads,
823 keepAliveTime,
824 TimeUnit.SECONDS,
825 workQueue,
826 Threads.newDaemonThreadFactory(toString() + nameHint));
827 tpe.allowCoreThreadTimeOut(true);
828 return tpe;
829 }
830
    /**
     * Lazily create the dedicated meta-lookup pool (double-checked locking;
     * metaLookupPool is volatile). Uses an unbounded queue, unlike the batch
     * pool, so meta lookups are never rejected.
     */
    private ExecutorService getMetaLookupPool() {
      if (this.metaLookupPool == null) {
        synchronized (this) {
          if (this.metaLookupPool == null) {
            this.metaLookupPool = getThreadPool(
                conf.getInt("hbase.hconnection.meta.lookup.threads.max", 128),
                conf.getInt("hbase.hconnection.meta.lookup.threads.core", 10),
                "-metaLookup-shared-", new LinkedBlockingQueue<Runnable>());
          }
        }
      }
      return this.metaLookupPool;
    }
848
    /** @return the meta-lookup pool if already created, else null (no lazy init). */
    protected ExecutorService getCurrentMetaLookupPool() {
      return metaLookupPool;
    }

    /** @return the batch pool if already created, else null (no lazy init). */
    protected ExecutorService getCurrentBatchPool() {
      return batchPool;
    }
856
857 private void shutdownPools() {
858 if (this.cleanupPool && this.batchPool != null && !this.batchPool.isShutdown()) {
859 shutdownBatchPool(this.batchPool);
860 }
861 if (this.metaLookupPool != null && !this.metaLookupPool.isShutdown()) {
862 shutdownBatchPool(this.metaLookupPool);
863 }
864 }
865
866 private void shutdownBatchPool(ExecutorService pool) {
867 pool.shutdown();
868 try {
869 if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
870 pool.shutdownNow();
871 }
872 } catch (InterruptedException e) {
873 pool.shutdownNow();
874 }
875 }
876
877
878
879
880
    /** Build the cluster registry (cluster id / meta location source) for this connection. */
    private Registry setupRegistry() throws IOException {
      return RegistryFactory.getRegistry(this);
    }

    /** Test-visible accessor for the underlying RPC client. */
    @VisibleForTesting
    RpcClient getRpcClient() {
      return rpcClient;
    }

    /** Stable, unique-ish name for this connection, used in thread names and logs. */
    @Override
    public String toString(){
      return "hconnection-0x" + Integer.toHexString(hashCode());
    }
900
    // Cluster id fetched from the registry; null until retrieveClusterId() runs.
    protected String clusterId = null;

    /**
     * Fetch the cluster id from the registry (once); falls back to the default
     * id if the registry returns null.
     */
    void retrieveClusterId() {
      if (clusterId != null) return;
      this.clusterId = this.registry.getClusterId();
      if (clusterId == null) {
        clusterId = HConstants.CLUSTER_ID_DEFAULT;
        LOG.debug("clusterid came back null, using default " + clusterId);
      }
    }
911
    /** @return the configuration this connection was created with */
    @Override
    public Configuration getConfiguration() {
      return this.conf;
    }
916
    /**
     * Verify the HBase base znode exists in ZooKeeper; its absence usually means
     * a 'zookeeper.znode.parent' mismatch between client and master.
     *
     * @throws MasterNotRunningException if the node is missing or ZK is unreachable
     */
    private void checkIfBaseNodeAvailable(ZooKeeperWatcher zkw)
      throws MasterNotRunningException {
      String errorMsg;
      try {
        if (ZKUtil.checkExists(zkw, zkw.baseZNode) == -1) {
          errorMsg = "The node " + zkw.baseZNode+" is not in ZooKeeper. "
            + "It should have been written by the master. "
            + "Check the value configured in 'zookeeper.znode.parent'. "
            + "There could be a mismatch with the one configured in the master.";
          LOG.error(errorMsg);
          throw new MasterNotRunningException(errorMsg);
        }
      } catch (KeeperException e) {
        errorMsg = "Can't get connection to ZooKeeper: " + e.getMessage();
        LOG.error(errorMsg);
        throw new MasterNotRunningException(errorMsg, e);
      }
    }
935
936
937
938
939
940
  /**
   * @deprecated this checks master liveness by opening (and immediately
   * closing) a keep-alive master connection; it either returns {@code true}
   * or throws. It never returns {@code false}.
   * @throws MasterNotRunningException if no master can be reached
   */
  @Deprecated
  @Override
  public boolean isMasterRunning()
  throws MasterNotRunningException, ZooKeeperConnectionException {
    // Acquiring the keep-alive service throws if the master is unreachable.
    MasterKeepAliveConnection m = getKeepAliveMasterService();
    m.close();
    return true;
  }
952
953 @Override
954 public HRegionLocation getRegionLocation(final TableName tableName,
955 final byte [] row, boolean reload)
956 throws IOException {
957 return reload? relocateRegion(tableName, row): locateRegion(tableName, row);
958 }
959
960 @Override
961 public HRegionLocation getRegionLocation(final byte[] tableName,
962 final byte [] row, boolean reload)
963 throws IOException {
964 return getRegionLocation(TableName.valueOf(tableName), row, reload);
965 }
966
967 @Override
968 public boolean isTableEnabled(TableName tableName) throws IOException {
969 return this.registry.isTableOnlineState(tableName, true);
970 }
971
972 @Override
973 public boolean isTableEnabled(byte[] tableName) throws IOException {
974 return isTableEnabled(TableName.valueOf(tableName));
975 }
976
977 @Override
978 public boolean isTableDisabled(TableName tableName) throws IOException {
979 return this.registry.isTableOnlineState(tableName, false);
980 }
981
982 @Override
983 public boolean isTableDisabled(byte[] tableName) throws IOException {
984 return isTableDisabled(TableName.valueOf(tableName));
985 }
986
  /**
   * A table is "available" when every one of its regions found in hbase:meta
   * has a server assigned and at least one region exists.
   * Scans meta with a visitor; bails out early (setting available=false) on
   * the first unassigned region.
   */
  @Override
  public boolean isTableAvailable(final TableName tableName) throws IOException {
    final AtomicBoolean available = new AtomicBoolean(true);
    final AtomicInteger regionCount = new AtomicInteger(0);
    MetaScannerVisitor visitor = new MetaScannerVisitorBase() {
      @Override
      public boolean processRow(Result row) throws IOException {
        HRegionInfo info = MetaScanner.getHRegionInfo(row);
        // Split parents are ignored: their daughters represent the live state.
        if (info != null && !info.isSplitParent()) {
          if (tableName.equals(info.getTable())) {
            ServerName server = HRegionInfo.getServerName(row);
            if (server == null) {
              // Region exists but has no server: table not fully available.
              available.set(false);
              return false;
            }
            regionCount.incrementAndGet();
          } else if (tableName.compareTo(info.getTable()) < 0) {
            // Meta rows are sorted; we have moved past this table's rows.
            return false;
          }
        }
        return true;
      }
    };
    MetaScanner.metaScan(this, visitor, tableName);
    return available.get() && (regionCount.get() > 0);
  }
1014
1015 @Override
1016 public boolean isTableAvailable(final byte[] tableName) throws IOException {
1017 return isTableAvailable(TableName.valueOf(tableName));
1018 }
1019
  /**
   * Like {@link #isTableAvailable(TableName)} but additionally verifies that
   * the table's regions match the expected split layout: one region per split
   * key plus the first (empty-start-key) region, all with servers assigned.
   */
  @Override
  public boolean isTableAvailable(final TableName tableName, final byte[][] splitKeys)
      throws IOException {
    final AtomicBoolean available = new AtomicBoolean(true);
    final AtomicInteger regionCount = new AtomicInteger(0);
    MetaScannerVisitor visitor = new MetaScannerVisitorBase() {
      @Override
      public boolean processRow(Result row) throws IOException {
        HRegionInfo info = MetaScanner.getHRegionInfo(row);
        // Split parents are ignored: their daughters represent the live state.
        if (info != null && !info.isSplitParent()) {
          if (tableName.equals(info.getTable())) {
            ServerName server = HRegionInfo.getServerName(row);
            if (server == null) {
              // Unassigned region: table not fully available.
              available.set(false);
              return false;
            }
            if (!Bytes.equals(info.getStartKey(), HConstants.EMPTY_BYTE_ARRAY)) {
              // Count this region only if its start key is one of the
              // requested split keys.
              for (byte[] splitKey : splitKeys) {
                if (Bytes.equals(info.getStartKey(), splitKey)) {
                  regionCount.incrementAndGet();
                  break;
                }
              }
            } else {
              // The first region of the table (empty start key) always counts.
              regionCount.incrementAndGet();
            }
          } else if (tableName.compareTo(info.getTable()) < 0) {
            // Meta rows are sorted; we have moved past this table's rows.
            return false;
          }
        }
        return true;
      }
    };
    MetaScanner.metaScan(this, visitor, tableName);
    // Expected region count: one per split key, plus the leading region.
    return available.get() && (regionCount.get() == splitKeys.length + 1);
  }
1060
1061 @Override
1062 public boolean isTableAvailable(final byte[] tableName, final byte[][] splitKeys)
1063 throws IOException {
1064 return isTableAvailable(TableName.valueOf(tableName), splitKeys);
1065 }
1066
1067 @Override
1068 public HRegionLocation locateRegion(final byte[] regionName) throws IOException {
1069 RegionLocations locations = locateRegion(HRegionInfo.getTable(regionName),
1070 HRegionInfo.getStartKey(regionName), false, true);
1071 return locations == null ? null : locations.getRegionLocation();
1072 }
1073
1074 @Override
1075 public boolean isDeadServer(ServerName sn) {
1076 if (clusterStatusListener == null) {
1077 return false;
1078 } else {
1079 return clusterStatusListener.isDeadServer(sn);
1080 }
1081 }
1082
1083 @Override
1084 public List<HRegionLocation> locateRegions(final TableName tableName)
1085 throws IOException {
1086 return locateRegions (tableName, false, true);
1087 }
1088
1089 @Override
1090 public List<HRegionLocation> locateRegions(final byte[] tableName)
1091 throws IOException {
1092 return locateRegions(TableName.valueOf(tableName));
1093 }
1094
  /**
   * Lists the locations of every region of {@code tableName}, including all
   * replica locations that resolve.
   * @param useCache whether per-region lookups may use the location cache
   * @param offlined NOTE(review): this parameter is never read in this body —
   *        confirm whether offlined regions were meant to be filtered.
   */
  @Override
  public List<HRegionLocation> locateRegions(final TableName tableName,
      final boolean useCache, final boolean offlined) throws IOException {
    // Full meta scan to enumerate the table's regions ...
    NavigableMap<HRegionInfo, ServerName> regions = MetaScanner.allTableRegions(this, tableName);
    final List<HRegionLocation> locations = new ArrayList<HRegionLocation>();
    // ... then resolve each region (with retry=true) by its start key.
    for (HRegionInfo regionInfo : regions.keySet()) {
      RegionLocations list = locateRegion(tableName, regionInfo.getStartKey(), useCache, true);
      if (list != null) {
        for (HRegionLocation loc : list.getRegionLocations()) {
          if (loc != null) {
            locations.add(loc);
          }
        }
      }
    }
    return locations;
  }
1112
1113 @Override
1114 public List<HRegionLocation> locateRegions(final byte[] tableName,
1115 final boolean useCache, final boolean offlined) throws IOException {
1116 return locateRegions(TableName.valueOf(tableName), useCache, offlined);
1117 }
1118
1119 @Override
1120 public HRegionLocation locateRegion(
1121 final TableName tableName, final byte[] row) throws IOException{
1122 RegionLocations locations = locateRegion(tableName, row, true, true);
1123 return locations == null ? null : locations.getRegionLocation();
1124 }
1125
1126 @Override
1127 public HRegionLocation locateRegion(final byte[] tableName,
1128 final byte [] row)
1129 throws IOException{
1130 return locateRegion(TableName.valueOf(tableName), row);
1131 }
1132
1133 @Override
1134 public HRegionLocation relocateRegion(final TableName tableName,
1135 final byte [] row) throws IOException{
1136 RegionLocations locations = relocateRegion(tableName, row,
1137 RegionReplicaUtil.DEFAULT_REPLICA_ID);
1138 return locations == null ? null :
1139 locations.getRegionLocation(RegionReplicaUtil.DEFAULT_REPLICA_ID);
1140 }
1141
  /**
   * Forces a fresh (cache-bypassing) lookup of the region serving {@code row}
   * for the given replica.
   * @throws TableNotEnabledException if the (non-meta) table is disabled —
   *         checked up front so a disabled table fails fast instead of
   *         retrying the location lookup.
   */
  @Override
  public RegionLocations relocateRegion(final TableName tableName,
      final byte [] row, int replicaId) throws IOException{
    // hbase:meta can never be disabled, so skip the check for it.
    if (!tableName.equals(TableName.META_TABLE_NAME) && isTableDisabled(tableName)) {
      throw new TableNotEnabledException(tableName.getNameAsString() + " is disabled.");
    }
    // useCache=false forces the re-lookup; retry=true keeps the usual retries.
    return locateRegion(tableName, row, false, true, replicaId);
  }
1154
1155 @Override
1156 public HRegionLocation relocateRegion(final byte[] tableName,
1157 final byte [] row) throws IOException {
1158 return relocateRegion(TableName.valueOf(tableName), row);
1159 }
1160
1161 @Override
1162 public RegionLocations locateRegion(final TableName tableName,
1163 final byte [] row, boolean useCache, boolean retry)
1164 throws IOException {
1165 return locateRegion(tableName, row, useCache, retry, RegionReplicaUtil.DEFAULT_REPLICA_ID);
1166 }
1167
1168 @Override
1169 public RegionLocations locateRegion(final TableName tableName,
1170 final byte [] row, boolean useCache, boolean retry, int replicaId)
1171 throws IOException {
1172 if (this.closed) throw new IOException(toString() + " closed");
1173 if (tableName== null || tableName.getName().length == 0) {
1174 throw new IllegalArgumentException(
1175 "table name cannot be null or zero length");
1176 }
1177 if (tableName.equals(TableName.META_TABLE_NAME)) {
1178 return locateMeta(tableName, useCache, replicaId);
1179 } else {
1180
1181 return locateRegionInMeta(tableName, row, useCache, retry, replicaId);
1182 }
1183 }
1184
  /**
   * Resolves the location of hbase:meta itself (which is not stored in meta)
   * via the cluster registry, with a double-checked cache in front: check the
   * cache, then re-check under {@code metaRegionLock} before hitting the
   * registry, so only one thread performs the lookup.
   */
  private RegionLocations locateMeta(final TableName tableName,
      boolean useCache, int replicaId) throws IOException {
    // meta is a single region; it is always cached under the empty start row.
    byte[] metaCacheKey = HConstants.EMPTY_START_ROW;
    RegionLocations locations = null;
    if (useCache) {
      locations = getCachedLocation(tableName, metaCacheKey);
      if (locations != null && locations.getRegionLocation(replicaId) != null) {
        return locations;
      }
    }
    synchronized (metaRegionLock) {
      // Re-check: another thread may have populated the cache while we waited.
      if (useCache) {
        locations = getCachedLocation(tableName, metaCacheKey);
        if (locations != null && locations.getRegionLocation(replicaId) != null) {
          return locations;
        }
      }
      // Ask the registry (e.g. ZooKeeper) and cache the answer.
      locations = this.registry.getMetaRegionLocation();
      if (locations != null) {
        cacheLocation(tableName, locations);
      }
    }
    return locations;
  }
1218
1219
1220
1221
1222
1223 private RegionLocations locateRegionInMeta(TableName tableName, byte[] row,
1224 boolean useCache, boolean retry, int replicaId) throws IOException {
1225
1226
1227
1228 if (useCache) {
1229 RegionLocations locations = getCachedLocation(tableName, row);
1230 if (locations != null && locations.getRegionLocation(replicaId) != null) {
1231 return locations;
1232 }
1233 }
1234
1235
1236
1237
1238 byte[] metaKey = HRegionInfo.createRegionName(tableName, row, HConstants.NINES, false);
1239
1240 Scan s = new Scan();
1241 s.setReversed(true);
1242 s.setStartRow(metaKey);
1243 s.setSmall(true);
1244 s.setCaching(1);
1245 if (this.useMetaReplicas) {
1246 s.setConsistency(Consistency.TIMELINE);
1247 }
1248
1249 int localNumRetries = (retry ? numTries : 1);
1250
1251 for (int tries = 0; true; tries++) {
1252 if (tries >= localNumRetries) {
1253 throw new NoServerForRegionException("Unable to find region for "
1254 + Bytes.toStringBinary(row) + " in " + tableName +
1255 " after " + localNumRetries + " tries.");
1256 }
1257 if (useCache) {
1258 RegionLocations locations = getCachedLocation(tableName, row);
1259 if (locations != null && locations.getRegionLocation(replicaId) != null) {
1260 return locations;
1261 }
1262 } else {
1263
1264
1265 metaCache.clearCache(tableName, row);
1266 }
1267
1268
1269 try {
1270 Result regionInfoRow = null;
1271 ReversedClientScanner rcs = null;
1272 try {
1273 rcs = new ClientSmallReversedScanner(conf, s, TableName.META_TABLE_NAME, this,
1274 rpcCallerFactory, rpcControllerFactory, getMetaLookupPool(), 0);
1275 regionInfoRow = rcs.next();
1276 } finally {
1277 if (rcs != null) {
1278 rcs.close();
1279 }
1280 }
1281
1282 if (regionInfoRow == null) {
1283 throw new TableNotFoundException(tableName);
1284 }
1285
1286
1287 RegionLocations locations = MetaTableAccessor.getRegionLocations(regionInfoRow);
1288 if (locations == null || locations.getRegionLocation(replicaId) == null) {
1289 throw new IOException("HRegionInfo was null in " +
1290 tableName + ", row=" + regionInfoRow);
1291 }
1292 HRegionInfo regionInfo = locations.getRegionLocation(replicaId).getRegionInfo();
1293 if (regionInfo == null) {
1294 throw new IOException("HRegionInfo was null or empty in " +
1295 TableName.META_TABLE_NAME + ", row=" + regionInfoRow);
1296 }
1297
1298
1299 if (!regionInfo.getTable().equals(tableName)) {
1300 throw new TableNotFoundException(
1301 "Table '" + tableName + "' was not found, got: " +
1302 regionInfo.getTable() + ".");
1303 }
1304 if (regionInfo.isSplit()) {
1305 throw new RegionOfflineException("the only available region for" +
1306 " the required row is a split parent," +
1307 " the daughters should be online soon: " +
1308 regionInfo.getRegionNameAsString());
1309 }
1310 if (regionInfo.isOffline()) {
1311 throw new RegionOfflineException("the region is offline, could" +
1312 " be caused by a disable table call: " +
1313 regionInfo.getRegionNameAsString());
1314 }
1315
1316 ServerName serverName = locations.getRegionLocation(replicaId).getServerName();
1317 if (serverName == null) {
1318 throw new NoServerForRegionException("No server address listed " +
1319 "in " + TableName.META_TABLE_NAME + " for region " +
1320 regionInfo.getRegionNameAsString() + " containing row " +
1321 Bytes.toStringBinary(row));
1322 }
1323
1324 if (isDeadServer(serverName)){
1325 throw new RegionServerStoppedException("hbase:meta says the region "+
1326 regionInfo.getRegionNameAsString()+" is managed by the server " + serverName +
1327 ", but it is dead.");
1328 }
1329
1330 cacheLocation(tableName, locations);
1331 return locations;
1332 } catch (TableNotFoundException e) {
1333
1334
1335
1336 throw e;
1337 } catch (IOException e) {
1338 ExceptionUtil.rethrowIfInterrupt(e);
1339
1340 if (e instanceof RemoteException) {
1341 e = ((RemoteException)e).unwrapRemoteException();
1342 }
1343 if (tries < localNumRetries - 1) {
1344 if (LOG.isDebugEnabled()) {
1345 LOG.debug("locateRegionInMeta parentTable=" +
1346 TableName.META_TABLE_NAME + ", metaLocation=" +
1347 ", attempt=" + tries + " of " +
1348 localNumRetries + " failed; retrying after sleep of " +
1349 ConnectionUtils.getPauseTime(this.pause, tries) + " because: " + e.getMessage());
1350 }
1351 } else {
1352 throw e;
1353 }
1354
1355 if(!(e instanceof RegionOfflineException ||
1356 e instanceof NoServerForRegionException)) {
1357 relocateRegion(TableName.META_TABLE_NAME, metaKey, replicaId);
1358 }
1359 }
1360 try{
1361 Thread.sleep(ConnectionUtils.getPauseTime(this.pause, tries));
1362 } catch (InterruptedException e) {
1363 throw new InterruptedIOException("Giving up trying to location region in " +
1364 "meta: thread is interrupted.");
1365 }
1366 }
1367 }
1368
1369
1370
1371
1372
1373
  /** Stores the given locations for {@code tableName} in the meta cache. */
  private void cacheLocation(final TableName tableName, final RegionLocations location) {
    metaCache.cacheLocation(tableName, location);
  }
1377
1378
1379
1380
1381
1382
1383
1384
1385
  /**
   * Looks up the cached locations covering {@code row}, or null on a miss.
   */
  RegionLocations getCachedLocation(final TableName tableName,
    final byte [] row) {
    return metaCache.getCachedLocation(tableName, row);
  }
1390
  /** Evicts the cached location entry covering {@code row} in {@code tableName}. */
  public void clearRegionCache(final TableName tableName, byte[] row) {
    metaCache.clearCache(tableName, row);
  }
1394
1395
1396
1397
  /** Evicts every cached location pointing at {@code serverName}. */
  @Override
  public void clearCaches(final ServerName serverName) {
    metaCache.clearCache(serverName);
  }
1402
  /** Evicts the entire region location cache. */
  @Override
  public void clearRegionCache() {
    metaCache.clearCache();
  }
1407
  /** Evicts all cached locations for {@code tableName}. */
  @Override
  public void clearRegionCache(final TableName tableName) {
    metaCache.clearCache(tableName);
  }
1412
1413 @Override
1414 public void clearRegionCache(final byte[] tableName) {
1415 clearRegionCache(TableName.valueOf(tableName));
1416 }
1417
1418
1419
1420
1421
1422
1423
  /**
   * Caches a single location learned from {@code source} (e.g. from a
   * server-side redirect) for {@code tableName}.
   */
  private void cacheLocation(final TableName tableName, final ServerName source,
    final HRegionLocation location) {
    metaCache.cacheLocation(tableName, source, location);
  }
1428
1429
1430 private final ConcurrentHashMap<String, Object> stubs =
1431 new ConcurrentHashMap<String, Object>();
1432
1433 private final ConcurrentHashMap<String, String> connectionLock =
1434 new ConcurrentHashMap<String, String>();
1435
1436
1437
1438
1439 static class MasterServiceState {
1440 HConnection connection;
1441 MasterService.BlockingInterface stub;
1442 int userCount;
1443
1444 MasterServiceState (final HConnection connection) {
1445 super();
1446 this.connection = connection;
1447 }
1448
1449 @Override
1450 public String toString() {
1451 return "MasterService";
1452 }
1453
1454 Object getStub() {
1455 return this.stub;
1456 }
1457
1458 void clearStub() {
1459 this.stub = null;
1460 }
1461
1462 boolean isMasterRunning() throws ServiceException {
1463 IsMasterRunningResponse response =
1464 this.stub.isMasterRunning(null, RequestConverter.buildIsMasterRunningRequest());
1465 return response != null? response.getIsMasterRunning(): false;
1466 }
1467 }
1468
1469
1470
1471
1472
1473
  /**
   * Template for building a master-service RPC stub: find the active master in
   * ZooKeeper, build (or reuse) a cached stub keyed by service+address, and
   * verify the master answers before publishing the stub.
   */
  abstract class StubMaker {
    /** Name of the protobuf service this maker builds stubs for. */
    protected abstract String getServiceName();

    /** Builds the concrete stub over the given channel. */
    protected abstract Object makeStub(final BlockingRpcChannel channel);

    /** Sanity ping on the freshly-built stub; throws if the master is down. */
    protected abstract void isMasterRunning() throws ServiceException;

    /**
     * Single attempt: resolve master address via ZK, then build/verify/cache
     * the stub under the per-key connection lock. The ZK watcher is released
     * in all cases.
     */
    private Object makeStubNoRetries() throws IOException, KeeperException, ServiceException {
      ZooKeeperKeepAliveConnection zkw;
      try {
        zkw = getKeepAliveZooKeeperWatcher();
      } catch (IOException e) {
        ExceptionUtil.rethrowIfInterrupt(e);
        throw new ZooKeeperConnectionException("Can't connect to ZooKeeper", e);
      }
      try {
        checkIfBaseNodeAvailable(zkw);
        ServerName sn = MasterAddressTracker.getMasterAddress(zkw);
        if (sn == null) {
          String msg = "ZooKeeper available but no active master location found";
          LOG.info(msg);
          throw new MasterNotRunningException(msg);
        }
        if (isDeadServer(sn)) {
          throw new MasterNotRunningException(sn + " is dead.");
        }
        String key = getStubKey(getServiceName(),
          sn.getHostname(), sn.getPort(), hostnamesCanChange);
        // putIfAbsent guarantees one canonical lock object per key.
        connectionLock.putIfAbsent(key, key);
        Object stub = null;
        synchronized (connectionLock.get(key)) {
          stub = stubs.get(key);
          if (stub == null) {
            BlockingRpcChannel channel = rpcClient.createBlockingRpcChannel(sn, user, rpcTimeout);
            stub = makeStub(channel);
            // Verify the stub works before caching it.
            isMasterRunning();
            stubs.put(key, stub);
          }
        }
        return stub;
      } finally {
        zkw.close();
      }
    }

    /**
     * Builds the stub, translating any failure into MasterNotRunningException
     * (unless the connection is already closed).
     */
    Object makeStub() throws IOException {
      synchronized (masterAndZKLock) {
        Exception exceptionCaught = null;
        if (!closed) {
          try {
            return makeStubNoRetries();
          } catch (IOException e) {
            exceptionCaught = e;
          } catch (KeeperException e) {
            exceptionCaught = e;
          } catch (ServiceException e) {
            exceptionCaught = e;
          }
          throw new MasterNotRunningException(exceptionCaught);
        } else {
          throw new DoNotRetryIOException("Connection was closed while trying to get master");
        }
      }
    }
  }
1567
1568
1569
1570
  /** {@link StubMaker} specialization producing {@code MasterService} stubs. */
  class MasterServiceStubMaker extends StubMaker {
    private MasterService.BlockingInterface stub;
    @Override
    protected String getServiceName() {
      return MasterService.getDescriptor().getName();
    }

    /** Narrows the return type of the parent's makeStub(). */
    @Override
    MasterService.BlockingInterface makeStub() throws IOException {
      return (MasterService.BlockingInterface)super.makeStub();
    }

    @Override
    protected Object makeStub(BlockingRpcChannel channel) {
      this.stub = MasterService.newBlockingStub(channel);
      return this.stub;
    }

    /** Ping used by the parent to validate a freshly-built stub. */
    @Override
    protected void isMasterRunning() throws ServiceException {
      this.stub.isMasterRunning(null, RequestConverter.buildIsMasterRunningRequest());
    }
  }
1594
1595 @Override
1596 public AdminService.BlockingInterface getAdmin(final ServerName serverName)
1597 throws IOException {
1598 return getAdmin(serverName, false);
1599 }
1600
  /**
   * Returns a (cached) AdminService stub for {@code serverName}, creating the
   * RPC channel under a per-key lock on first use.
   * @param master NOTE(review): not referenced in this body — confirm whether
   *        it was meant to influence stub creation.
   * @throws RegionServerStoppedException if the server is known dead
   */
  @Override

  public AdminService.BlockingInterface getAdmin(final ServerName serverName,
      final boolean master)
  throws IOException {
    if (isDeadServer(serverName)) {
      throw new RegionServerStoppedException(serverName + " is dead.");
    }
    String key = getStubKey(AdminService.BlockingInterface.class.getName(),
        serverName.getHostname(), serverName.getPort(), this.hostnamesCanChange);
    // One canonical lock object per key so concurrent callers build one stub.
    this.connectionLock.putIfAbsent(key, key);
    AdminService.BlockingInterface stub = null;
    synchronized (this.connectionLock.get(key)) {
      stub = (AdminService.BlockingInterface)this.stubs.get(key);
      if (stub == null) {
        BlockingRpcChannel channel =
            this.rpcClient.createBlockingRpcChannel(serverName, user, rpcTimeout);
        stub = AdminService.newBlockingStub(channel);
        this.stubs.put(key, stub);
      }
    }
    return stub;
  }
1624
  /**
   * Returns a (cached) ClientService stub for {@code sn}, creating the RPC
   * channel under a per-key lock on first use.
   * @throws RegionServerStoppedException if the server is known dead
   */
  @Override
  public ClientService.BlockingInterface getClient(final ServerName sn)
  throws IOException {
    if (isDeadServer(sn)) {
      throw new RegionServerStoppedException(sn + " is dead.");
    }
    String key = getStubKey(ClientService.BlockingInterface.class.getName(), sn.getHostname(),
        sn.getPort(), this.hostnamesCanChange);
    // One canonical lock object per key so concurrent callers build one stub.
    this.connectionLock.putIfAbsent(key, key);
    ClientService.BlockingInterface stub = null;
    synchronized (this.connectionLock.get(key)) {
      stub = (ClientService.BlockingInterface)this.stubs.get(key);
      if (stub == null) {
        BlockingRpcChannel channel =
            this.rpcClient.createBlockingRpcChannel(sn, user, rpcTimeout);
        stub = ClientService.newBlockingStub(channel);
        this.stubs.put(key, stub);
      }
    }
    return stub;
  }
1648
1649 static String getStubKey(final String serviceName,
1650 final String rsHostname,
1651 int port,
1652 boolean resolveHostnames) {
1653
1654
1655
1656
1657
1658
1659 String address = rsHostname;
1660 if (resolveHostnames) {
1661 InetAddress i = new InetSocketAddress(rsHostname, port).getAddress();
1662 if (i != null) {
1663 address = i.getHostAddress() + "-" + rsHostname;
1664 }
1665 }
1666 return serviceName + "@" + address + ":" + port;
1667 }
1668
1669 private ZooKeeperKeepAliveConnection keepAliveZookeeper;
1670 private AtomicInteger keepAliveZookeeperUserCount = new AtomicInteger(0);
1671 private boolean canCloseZKW = true;
1672
1673
1674 private static final long keepAlive = 5 * 60 * 1000;
1675
1676
1677
1678
1679
  /**
   * Returns the shared keep-alive ZooKeeper watcher, creating it lazily, and
   * registers the caller as a user (see {@link #releaseZooKeeperWatcher}).
   * While at least one user holds it, the watcher is pinned open
   * (keep-alive deadline set to Long.MAX_VALUE).
   * @throws IOException if this connection is already closed
   */
  ZooKeeperKeepAliveConnection getKeepAliveZooKeeperWatcher()
    throws IOException {
    synchronized (masterAndZKLock) {
      if (keepAliveZookeeper == null) {
        if (this.closed) {
          throw new IOException(toString() + " closed");
        }
        keepAliveZookeeper = new ZooKeeperKeepAliveConnection(conf, this.toString(), this);
      }
      keepAliveZookeeperUserCount.addAndGet(1);
      keepZooKeeperWatcherAliveUntil = Long.MAX_VALUE;
      return keepAliveZookeeper;
    }
  }
1696
  /**
   * Releases one user of the keep-alive ZooKeeper watcher. When the last user
   * releases, the watcher is scheduled to stay alive only for another
   * {@code keepAlive} ms.
   * NOTE(review): unlike the acquire path, this runs without holding
   * masterAndZKLock — confirm the unsynchronized write to
   * keepZooKeeperWatcherAliveUntil is intended.
   */
  void releaseZooKeeperWatcher(final ZooKeeperWatcher zkw) {
    if (zkw == null){
      return;
    }
    if (keepAliveZookeeperUserCount.addAndGet(-1) <= 0 ){
      keepZooKeeperWatcherAliveUntil = System.currentTimeMillis() + keepAlive;
    }
  }
1705
  /**
   * Closes the keep-alive ZooKeeper watcher (if open) and resets the user
   * count; done under masterAndZKLock to serialize with acquisition.
   */
  private void closeZooKeeperWatcher() {
    synchronized (masterAndZKLock) {
      if (keepAliveZookeeper != null) {
        LOG.info("Closing zookeeper sessionid=0x" +
          Long.toHexString(
            keepAliveZookeeper.getRecoverableZooKeeper().getSessionId()));
        keepAliveZookeeper.internalClose();
        keepAliveZookeeper = null;
      }
      keepAliveZookeeperUserCount.set(0);
    }
  }
1718
1719 final MasterServiceState masterServiceState = new MasterServiceState(this);
1720
  /** Returns the master service stub; delegates to the keep-alive variant. */
  @Override
  public MasterService.BlockingInterface getMaster() throws MasterNotRunningException {
    return getKeepAliveMasterService();
  }
1725
  /**
   * Registers one more user of the master service state.
   * NOTE(review): despite the name, this only increments {@code userCount};
   * confirm no fuller reset was intended.
   */
  private void resetMasterServiceState(final MasterServiceState mss) {
    mss.userCount++;
  }
1729
1730 @Override
1731 public MasterKeepAliveConnection getKeepAliveMasterService()
1732 throws MasterNotRunningException {
1733 synchronized (masterAndZKLock) {
1734 if (!isKeepAliveMasterConnectedAndRunning(this.masterServiceState)) {
1735 MasterServiceStubMaker stubMaker = new MasterServiceStubMaker();
1736 try {
1737 this.masterServiceState.stub = stubMaker.makeStub();
1738 } catch (MasterNotRunningException ex) {
1739 throw ex;
1740 } catch (IOException e) {
1741
1742 throw new MasterNotRunningException(e);
1743 }
1744 }
1745 resetMasterServiceState(this.masterServiceState);
1746 }
1747
1748 final MasterService.BlockingInterface stub = this.masterServiceState.stub;
1749 return new MasterKeepAliveConnection() {
1750 MasterServiceState mss = masterServiceState;
1751 @Override
1752 public MasterProtos.AbortProcedureResponse abortProcedure(
1753 RpcController controller,
1754 MasterProtos.AbortProcedureRequest request) throws ServiceException {
1755 return stub.abortProcedure(controller, request);
1756 }
1757 @Override
1758 public MasterProtos.ListProceduresResponse listProcedures(
1759 RpcController controller,
1760 MasterProtos.ListProceduresRequest request) throws ServiceException {
1761 return stub.listProcedures(controller, request);
1762 }
1763 @Override
1764 public AddColumnResponse addColumn(RpcController controller, AddColumnRequest request)
1765 throws ServiceException {
1766 return stub.addColumn(controller, request);
1767 }
1768
1769 @Override
1770 public DeleteColumnResponse deleteColumn(RpcController controller,
1771 DeleteColumnRequest request)
1772 throws ServiceException {
1773 return stub.deleteColumn(controller, request);
1774 }
1775
1776 @Override
1777 public ModifyColumnResponse modifyColumn(RpcController controller,
1778 ModifyColumnRequest request)
1779 throws ServiceException {
1780 return stub.modifyColumn(controller, request);
1781 }
1782
1783 @Override
1784 public MoveRegionResponse moveRegion(RpcController controller,
1785 MoveRegionRequest request) throws ServiceException {
1786 return stub.moveRegion(controller, request);
1787 }
1788
1789 @Override
1790 public DispatchMergingRegionsResponse dispatchMergingRegions(
1791 RpcController controller, DispatchMergingRegionsRequest request)
1792 throws ServiceException {
1793 return stub.dispatchMergingRegions(controller, request);
1794 }
1795
1796 @Override
1797 public AssignRegionResponse assignRegion(RpcController controller,
1798 AssignRegionRequest request) throws ServiceException {
1799 return stub.assignRegion(controller, request);
1800 }
1801
1802 @Override
1803 public UnassignRegionResponse unassignRegion(RpcController controller,
1804 UnassignRegionRequest request) throws ServiceException {
1805 return stub.unassignRegion(controller, request);
1806 }
1807
1808 @Override
1809 public OfflineRegionResponse offlineRegion(RpcController controller,
1810 OfflineRegionRequest request) throws ServiceException {
1811 return stub.offlineRegion(controller, request);
1812 }
1813
1814 @Override
1815 public DeleteTableResponse deleteTable(RpcController controller,
1816 DeleteTableRequest request) throws ServiceException {
1817 return stub.deleteTable(controller, request);
1818 }
1819
1820 @Override
1821 public TruncateTableResponse truncateTable(RpcController controller,
1822 TruncateTableRequest request) throws ServiceException {
1823 return stub.truncateTable(controller, request);
1824 }
1825
1826 @Override
1827 public EnableTableResponse enableTable(RpcController controller,
1828 EnableTableRequest request) throws ServiceException {
1829 return stub.enableTable(controller, request);
1830 }
1831
1832 @Override
1833 public DisableTableResponse disableTable(RpcController controller,
1834 DisableTableRequest request) throws ServiceException {
1835 return stub.disableTable(controller, request);
1836 }
1837
1838 @Override
1839 public ModifyTableResponse modifyTable(RpcController controller,
1840 ModifyTableRequest request) throws ServiceException {
1841 return stub.modifyTable(controller, request);
1842 }
1843
1844 @Override
1845 public CreateTableResponse createTable(RpcController controller,
1846 CreateTableRequest request) throws ServiceException {
1847 return stub.createTable(controller, request);
1848 }
1849
1850 @Override
1851 public ShutdownResponse shutdown(RpcController controller,
1852 ShutdownRequest request) throws ServiceException {
1853 return stub.shutdown(controller, request);
1854 }
1855
1856 @Override
1857 public StopMasterResponse stopMaster(RpcController controller,
1858 StopMasterRequest request) throws ServiceException {
1859 return stub.stopMaster(controller, request);
1860 }
1861
1862 @Override
1863 public BalanceResponse balance(RpcController controller,
1864 BalanceRequest request) throws ServiceException {
1865 return stub.balance(controller, request);
1866 }
1867
1868 @Override
1869 public SetBalancerRunningResponse setBalancerRunning(
1870 RpcController controller, SetBalancerRunningRequest request)
1871 throws ServiceException {
1872 return stub.setBalancerRunning(controller, request);
1873 }
1874
1875 @Override
1876 public NormalizeResponse normalize(RpcController controller,
1877 NormalizeRequest request) throws ServiceException {
1878 return stub.normalize(controller, request);
1879 }
1880
1881 @Override
1882 public SetNormalizerRunningResponse setNormalizerRunning(
1883 RpcController controller, SetNormalizerRunningRequest request)
1884 throws ServiceException {
1885 return stub.setNormalizerRunning(controller, request);
1886 }
1887
1888 @Override
1889 public RunCatalogScanResponse runCatalogScan(RpcController controller,
1890 RunCatalogScanRequest request) throws ServiceException {
1891 return stub.runCatalogScan(controller, request);
1892 }
1893
1894 @Override
1895 public EnableCatalogJanitorResponse enableCatalogJanitor(
1896 RpcController controller, EnableCatalogJanitorRequest request)
1897 throws ServiceException {
1898 return stub.enableCatalogJanitor(controller, request);
1899 }
1900
1901 @Override
1902 public IsCatalogJanitorEnabledResponse isCatalogJanitorEnabled(
1903 RpcController controller, IsCatalogJanitorEnabledRequest request)
1904 throws ServiceException {
1905 return stub.isCatalogJanitorEnabled(controller, request);
1906 }
1907
1908 @Override
1909 public CoprocessorServiceResponse execMasterService(
1910 RpcController controller, CoprocessorServiceRequest request)
1911 throws ServiceException {
1912 return stub.execMasterService(controller, request);
1913 }
1914
1915 @Override
1916 public SnapshotResponse snapshot(RpcController controller,
1917 SnapshotRequest request) throws ServiceException {
1918 return stub.snapshot(controller, request);
1919 }
1920
1921 @Override
1922 public GetCompletedSnapshotsResponse getCompletedSnapshots(
1923 RpcController controller, GetCompletedSnapshotsRequest request)
1924 throws ServiceException {
1925 return stub.getCompletedSnapshots(controller, request);
1926 }
1927
1928 @Override
1929 public DeleteSnapshotResponse deleteSnapshot(RpcController controller,
1930 DeleteSnapshotRequest request) throws ServiceException {
1931 return stub.deleteSnapshot(controller, request);
1932 }
1933
1934 @Override
1935 public IsSnapshotDoneResponse isSnapshotDone(RpcController controller,
1936 IsSnapshotDoneRequest request) throws ServiceException {
1937 return stub.isSnapshotDone(controller, request);
1938 }
1939
1940 @Override
1941 public RestoreSnapshotResponse restoreSnapshot(
1942 RpcController controller, RestoreSnapshotRequest request)
1943 throws ServiceException {
1944 return stub.restoreSnapshot(controller, request);
1945 }
1946
1947 @Override
1948 public IsRestoreSnapshotDoneResponse isRestoreSnapshotDone(
1949 RpcController controller, IsRestoreSnapshotDoneRequest request)
1950 throws ServiceException {
1951 return stub.isRestoreSnapshotDone(controller, request);
1952 }
1953
1954 @Override
1955 public ExecProcedureResponse execProcedure(
1956 RpcController controller, ExecProcedureRequest request)
1957 throws ServiceException {
1958 return stub.execProcedure(controller, request);
1959 }
1960
1961 @Override
1962 public ExecProcedureResponse execProcedureWithRet(
1963 RpcController controller, ExecProcedureRequest request)
1964 throws ServiceException {
1965 return stub.execProcedureWithRet(controller, request);
1966 }
1967
// Pure pass-through to the master stub.
@Override
public IsProcedureDoneResponse isProcedureDone(RpcController controller,
IsProcedureDoneRequest request) throws ServiceException {
return stub.isProcedureDone(controller, request);
}
1973
// Pure pass-through to the master stub.
@Override
public GetProcedureResultResponse getProcedureResult(RpcController controller,
GetProcedureResultRequest request) throws ServiceException {
return stub.getProcedureResult(controller, request);
}
1979
// Pure pass-through to the master stub.
@Override
public IsMasterRunningResponse isMasterRunning(
RpcController controller, IsMasterRunningRequest request)
throws ServiceException {
return stub.isMasterRunning(controller, request);
}
1986
// Pure pass-through to the master stub.
@Override
public ModifyNamespaceResponse modifyNamespace(RpcController controller,
ModifyNamespaceRequest request)
throws ServiceException {
return stub.modifyNamespace(controller, request);
}
1993
// Pure pass-through to the master stub.
@Override
public CreateNamespaceResponse createNamespace(
RpcController controller, CreateNamespaceRequest request) throws ServiceException {
return stub.createNamespace(controller, request);
}
1999
// Pure pass-through to the master stub.
@Override
public DeleteNamespaceResponse deleteNamespace(
RpcController controller, DeleteNamespaceRequest request) throws ServiceException {
return stub.deleteNamespace(controller, request);
}
2005
// Pure pass-through to the master stub.
@Override
public GetNamespaceDescriptorResponse getNamespaceDescriptor(RpcController controller,
GetNamespaceDescriptorRequest request) throws ServiceException {
return stub.getNamespaceDescriptor(controller, request);
}
2011
// Pure pass-through to the master stub.
@Override
public ListNamespaceDescriptorsResponse listNamespaceDescriptors(RpcController controller,
ListNamespaceDescriptorsRequest request) throws ServiceException {
return stub.listNamespaceDescriptors(controller, request);
}
2017
// Pure pass-through to the master stub.
@Override
public ListTableDescriptorsByNamespaceResponse listTableDescriptorsByNamespace(
RpcController controller, ListTableDescriptorsByNamespaceRequest request)
throws ServiceException {
return stub.listTableDescriptorsByNamespace(controller, request);
}
2024
// Pure pass-through to the master stub.
@Override
public ListTableNamesByNamespaceResponse listTableNamesByNamespace(
RpcController controller, ListTableNamesByNamespaceRequest request)
throws ServiceException {
return stub.listTableNamesByNamespace(controller, request);
}
2031
// Releases this caller's keep-alive reference on the shared master service
// state; it does NOT close the underlying connection to the master.
@Override
public void close() {
release(this.mss);
}
2036
// Pure pass-through to the master stub.
@Override
public GetSchemaAlterStatusResponse getSchemaAlterStatus(
RpcController controller, GetSchemaAlterStatusRequest request)
throws ServiceException {
return stub.getSchemaAlterStatus(controller, request);
}
2043
// Pure pass-through to the master stub.
@Override
public GetTableDescriptorsResponse getTableDescriptors(
RpcController controller, GetTableDescriptorsRequest request)
throws ServiceException {
return stub.getTableDescriptors(controller, request);
}
2050
// Pure pass-through to the master stub.
@Override
public GetTableNamesResponse getTableNames(
RpcController controller, GetTableNamesRequest request)
throws ServiceException {
return stub.getTableNames(controller, request);
}
2057
// Pure pass-through to the master stub.
@Override
public GetClusterStatusResponse getClusterStatus(
RpcController controller, GetClusterStatusRequest request)
throws ServiceException {
return stub.getClusterStatus(controller, request);
}
2064
// Pure pass-through to the master stub.
@Override
public SetQuotaResponse setQuota(RpcController controller, SetQuotaRequest request)
throws ServiceException {
return stub.setQuota(controller, request);
}
2070
// Pure pass-through to the master stub.
@Override
public MajorCompactionTimestampResponse getLastMajorCompactionTimestamp(
RpcController controller, MajorCompactionTimestampRequest request)
throws ServiceException {
return stub.getLastMajorCompactionTimestamp(controller, request);
}
2077
// Pure pass-through to the master stub.
@Override
public MajorCompactionTimestampResponse getLastMajorCompactionTimestampForRegion(
RpcController controller, MajorCompactionTimestampForRegionRequest request)
throws ServiceException {
return stub.getLastMajorCompactionTimestampForRegion(controller, request);
}
2084
// Pure pass-through to the master stub.
@Override
public IsBalancerEnabledResponse isBalancerEnabled(RpcController controller,
IsBalancerEnabledRequest request) throws ServiceException {
return stub.isBalancerEnabled(controller, request);
}
2090
// Pure pass-through to the master stub.
@Override
public IsNormalizerEnabledResponse isNormalizerEnabled(RpcController controller,
IsNormalizerEnabledRequest request) throws ServiceException {
return stub.isNormalizerEnabled(controller, request);
}
2096
// Pure pass-through to the master stub.
@Override
public SecurityCapabilitiesResponse getSecurityCapabilities(RpcController controller,
SecurityCapabilitiesRequest request) throws ServiceException {
return stub.getSecurityCapabilities(controller, request);
}
2102 };
2103 }
2104
2105
2106 private static void release(MasterServiceState mss) {
2107 if (mss != null && mss.connection != null) {
2108 ((HConnectionImplementation)mss.connection).releaseMaster(mss);
2109 }
2110 }
2111
// Probes whether the cached keep-alive master stub is still usable by issuing
// an isMasterRunning() call. Any failure is treated as "not usable" rather
// than being propagated, so callers can transparently re-create the stub.
private boolean isKeepAliveMasterConnectedAndRunning(MasterServiceState mss) {
if (mss.getStub() == null){
return false;
}
try {
return mss.isMasterRunning();
} catch (UndeclaredThrowableException e) {
// An UndeclaredThrowableException here typically wraps an IOException
// from the transport: the connection is gone, so report not-running.
LOG.info("Master connection is not running anymore", e.getUndeclaredThrowable());
return false;
} catch (ServiceException se) {
LOG.warn("Checking master connection", se);
return false;
}
}
2128
// Decrements the user count on the shared master service state under the
// master/ZK lock. No-op when the stub has already been cleared.
void releaseMaster(MasterServiceState mss) {
if (mss.getStub() == null) return;
synchronized (masterAndZKLock) {
--mss.userCount;
}
}
2135
// Clears the cached master stub (if any) and resets the user count to zero,
// forcing the next caller to establish a fresh master connection.
private void closeMasterService(MasterServiceState mss) {
if (mss.getStub() != null) {
LOG.info("Closing master protocol: " + mss);
mss.clearStub();
}
mss.userCount = 0;
}
2143
2144
2145
2146
2147
// Immediately closes the keep-alive master connection, regardless of how
// many users still hold references; guarded by the master/ZK lock.
private void closeMaster() {
synchronized (masterAndZKLock) {
closeMasterService(masterServiceState);
}
}
2153
// Replaces the cached location of a region with a new server/seqNum pair,
// recording `source` as the server that reported the move.
void updateCachedLocation(HRegionInfo hri, ServerName source,
ServerName serverName, long seqNum) {
HRegionLocation newHrl = new HRegionLocation(hri, serverName, seqNum);
cacheLocation(hri.getTable(), source, newHrl);
}
2159
// Evicts a single region location from the meta cache.
@Override
public void deleteCachedRegionLocation(final HRegionLocation location) {
metaCache.clearCache(location);
}
2164
// Convenience overload: extracts region name and server from the source
// HRegionLocation and forwards to the main implementation.
@Override
public void updateCachedLocations(final TableName tableName, byte[] rowkey,
final Object exception, final HRegionLocation source) {
assert source != null;
updateCachedLocations(tableName, source.getRegionInfo().getRegionName()
, rowkey, exception, source.getServerName());
}
2172
2173
2174
2175
2176
2177
2178
2179
2180
2181 @Override
2182 public void updateCachedLocations(final TableName tableName, byte[] regionName, byte[] rowkey,
2183 final Object exception, final ServerName source) {
2184 if (rowkey == null || tableName == null) {
2185 LOG.warn("Coding error, see method javadoc. row=" + (rowkey == null ? "null" : rowkey) +
2186 ", tableName=" + (tableName == null ? "null" : tableName));
2187 return;
2188 }
2189
2190 if (source == null) {
2191
2192 return;
2193 }
2194
2195 if (regionName == null) {
2196
2197 metaCache.clearCache(tableName, rowkey, source);
2198 return;
2199 }
2200
2201
2202 final RegionLocations oldLocations = getCachedLocation(tableName, rowkey);
2203 HRegionLocation oldLocation = null;
2204 if (oldLocations != null) {
2205 oldLocation = oldLocations.getRegionLocationByRegionName(regionName);
2206 }
2207 if (oldLocation == null || !source.equals(oldLocation.getServerName())) {
2208
2209
2210 return;
2211 }
2212
2213 HRegionInfo regionInfo = oldLocation.getRegionInfo();
2214 Throwable cause = ClientExceptionsUtil.findException(exception);
2215 if (cause != null) {
2216 if (!ClientExceptionsUtil.isMetaClearingException(cause)) {
2217
2218 return;
2219 }
2220
2221 if (cause instanceof RegionMovedException) {
2222 RegionMovedException rme = (RegionMovedException) cause;
2223 if (LOG.isTraceEnabled()) {
2224 LOG.trace("Region " + regionInfo.getRegionNameAsString() + " moved to " +
2225 rme.getHostname() + ":" + rme.getPort() +
2226 " according to " + source.getHostAndPort());
2227 }
2228
2229
2230 updateCachedLocation(
2231 regionInfo, source, rme.getServerName(), rme.getLocationSeqNum());
2232 return;
2233 }
2234 }
2235
2236
2237
2238 metaCache.clearCache(regionInfo);
2239 }
2240
// Legacy byte[]-table-name overload; converts and delegates.
@Override
public void updateCachedLocations(final byte[] tableName, byte[] rowkey,
final Object exception, final HRegionLocation source) {
updateCachedLocations(TableName.valueOf(tableName), rowkey, exception, source);
}
2246
// Deprecated batch entry point: validates the results array size then runs
// the batch through processBatchCallback with no per-result callback.
@Override
@Deprecated
public void processBatch(List<? extends Row> list,
final TableName tableName,
ExecutorService pool,
Object[] results) throws IOException, InterruptedException {
// results is filled in positionally, so it must match the request list size.
if (results.length != list.size()) {
throw new IllegalArgumentException(
"argument results must be the same size as argument list");
}
processBatchCallback(list, tableName, pool, results, null);
}
2262
// Legacy byte[]-table-name overload; converts and delegates.
@Override
@Deprecated
public void processBatch(List<? extends Row> list,
final byte[] tableName,
ExecutorService pool,
Object[] results) throws IOException, InterruptedException {
processBatch(list, TableName.valueOf(tableName), pool, results);
}
2271
2272
2273
2274
2275
2276
2277
2278
// Deprecated batch-with-callback entry point: submits the whole batch
// through the shared AsyncProcess, blocks until completion, and rethrows
// any accumulated errors as a single exception.
@Override
@Deprecated
public <R> void processBatchCallback(
List<? extends Row> list,
TableName tableName,
ExecutorService pool,
Object[] results,
Batch.Callback<R> callback)
throws IOException, InterruptedException {
// submitAll fills `results` positionally and invokes `callback` per result.
AsyncRequestFuture ars = this.asyncProcess.submitAll(
pool, tableName, list, callback, results);
ars.waitUntilDone();
if (ars.hasError()) {
throw ars.getErrors();
}
}
2296
// Legacy byte[]-table-name overload; converts and delegates.
@Override
@Deprecated
public <R> void processBatchCallback(
List<? extends Row> list,
byte[] tableName,
ExecutorService pool,
Object[] results,
Batch.Callback<R> callback)
throws IOException, InterruptedException {
processBatchCallback(list, TableName.valueOf(tableName), pool, results, callback);
}
2308
2309
// Factory for the connection's AsyncProcess, wired to the shared batch pool
// and this connection's stats tracker.
// NOTE(review): the boolean argument appears to configure the AsyncProcess
// mode — confirm its meaning against the AsyncProcess constructor.
protected AsyncProcess createAsyncProcess(Configuration conf) {
return new AsyncProcess(this, conf, this.batchPool,
RpcRetryingCallerFactory.instantiate(conf, this.getStatisticsTracker()), false,
RpcControllerFactory.instantiate(conf));
}
2316
// Accessor for the connection-wide AsyncProcess.
@Override
public AsyncProcess getAsyncProcess() {
return asyncProcess;
}
2321
// Accessor for the server statistics tracker (may be used for backoff).
@Override
public ServerStatisticTracker getStatisticsTracker() {
return this.stats;
}
2326
// Accessor for the client backoff policy.
@Override
public ClientBackoffPolicy getBackoffPolicy() {
return this.backoffPolicy;
}
2331
2332
2333
2334
2335
// Test-only hook: number of cached region locations for the given table.
@VisibleForTesting
int getNumberOfCachedRegionLocations(final TableName tableName) {
return metaCache.getNumberOfCachedRegionLocations(tableName);
}
2340
// Deprecated no-op: region cache prefetch is no longer supported.
@Override
@Deprecated
public void setRegionCachePrefetch(final TableName tableName, final boolean enable) {
}
2345
// Deprecated no-op: region cache prefetch is no longer supported.
@Override
@Deprecated
public void setRegionCachePrefetch(final byte[] tableName,
final boolean enable) {
}
2351
// Deprecated: prefetch is never enabled, so always reports false.
@Override
@Deprecated
public boolean getRegionCachePrefetch(TableName tableName) {
return false;
}
2357
// Deprecated: prefetch is never enabled, so always reports false.
@Override
@Deprecated
public boolean getRegionCachePrefetch(byte[] tableName) {
return false;
}
2363
// Aborts the connection. A ZooKeeper session expiry is treated specially:
// the watcher is simply closed (it will be lazily re-created), without
// marking the whole connection aborted. Any other cause is fatal: log,
// flag aborted, and close the connection.
@Override
public void abort(final String msg, Throwable t) {
if (t instanceof KeeperException.SessionExpiredException
&& keepAliveZookeeper != null) {
synchronized (masterAndZKLock) {
// Re-check under the lock: another thread may have closed it already.
if (keepAliveZookeeper != null) {
LOG.warn("This client just lost it's session with ZooKeeper," +
" closing it." +
" It will be recreated next time someone needs it", t);
closeZooKeeperWatcher();
}
}
} else {
if (t != null) {
LOG.fatal(msg, t);
} else {
LOG.fatal(msg);
}
// Mark aborted before close() so close() can see the aborted state.
this.aborted = true;
close();
this.closed = true;
}
}
2387
// Whether this connection has been closed.
@Override
public boolean isClosed() {
return this.closed;
}
2392
// Whether this connection has been aborted.
@Override
public boolean isAborted(){
return this.aborted;
}
2397
// Asks the cluster registry for the current number of region servers.
@Override
public int getCurrentNrHRS() throws IOException {
return this.registry.getCurrentNrHRS();
}
2402
2403
2404
2405
// Increments the reference count of users of this connection.
// NOTE(review): refCount is not atomic; callers appear responsible for
// external synchronization — confirm against ConnectionManager usage.
void incCount() {
++refCount;
}
2409
2410
2411
2412
// Decrements the reference count, never letting it go below zero.
void decCount() {
if (refCount > 0) {
--refCount;
}
}
2418
2419
2420
2421
2422
2423
// True when no user holds a reference to this (managed) connection.
boolean isZeroReference() {
return refCount == 0;
}
2427
// Tears down the connection unconditionally (ignores reference counts).
// Idempotent: a second call returns immediately. The shutdown order —
// master connection, thread pools, metrics, ZK watcher, stubs, cluster
// status listener, RPC client — is deliberate; keep it.
void internalClose() {
if (this.closed) {
return;
}
closeMaster();
shutdownPools();
if (this.metrics != null) {
this.metrics.shutdown();
}
this.closed = true;
closeZooKeeperWatcher();
this.stubs.clear();
if (clusterStatusListener != null) {
clusterStatusListener.close();
}
if (rpcClient != null) {
rpcClient.close();
}
}
2447
// Closes the connection. Managed connections are returned to / removed from
// the shared ConnectionManager registry (stale-deleted if aborted);
// unmanaged connections are torn down directly.
@Override
public void close() {
if (managed) {
if (aborted) {
ConnectionManager.deleteStaleConnection(this);
} else {
ConnectionManager.deleteConnection(this, false);
}
} else {
internalClose();
}
}
2460
2461
2462
2463
2464
2465
2466
2467
2468
2469
2470
2471
// Safety net: if the connection is garbage-collected without being closed,
// force the reference count to 1 so close() actually releases resources.
@Override
protected void finalize() throws Throwable {
super.finalize();
refCount = 1;
close();
}
2479
2480
2481
2482
// Deprecated: fetches all table descriptors from the master.
// The keep-alive master connection is always released in the finally block.
@Deprecated
@Override
public HTableDescriptor[] listTables() throws IOException {
MasterKeepAliveConnection master = getKeepAliveMasterService();
try {
// A null table-name list means "all tables".
GetTableDescriptorsRequest req =
RequestConverter.buildGetTableDescriptorsRequest((List<TableName>)null);
return ProtobufUtil.getHTableDescriptorArray(master.getTableDescriptors(null, req));
} catch (ServiceException se) {
throw ProtobufUtil.getRemoteException(se);
} finally {
master.close();
}
}
2497
2498
2499
2500
2501 @Deprecated
2502 @Override
2503 public String[] getTableNames() throws IOException {
2504 TableName[] tableNames = listTableNames();
2505 String result[] = new String[tableNames.length];
2506 for (int i = 0; i < tableNames.length; i++) {
2507 result[i] = tableNames[i].getNameAsString();
2508 }
2509 return result;
2510 }
2511
2512
2513
2514
// Deprecated: fetches all table names from the master; the keep-alive
// master connection is always released in the finally block.
@Deprecated
@Override
public TableName[] listTableNames() throws IOException {
MasterKeepAliveConnection master = getKeepAliveMasterService();
try {
return ProtobufUtil.getTableNameArray(master.getTableNames(null,
GetTableNamesRequest.newBuilder().build())
.getTableNamesList());
} catch (ServiceException se) {
throw ProtobufUtil.getRemoteException(se);
} finally {
master.close();
}
}
2529
2530
2531
2532
// Deprecated: fetches the descriptors of the given tables from the master.
// Returns an empty array for a null/empty request; the keep-alive master
// connection is always released in the finally block.
@Deprecated
@Override
public HTableDescriptor[] getHTableDescriptorsByTableName(
List<TableName> tableNames) throws IOException {
if (tableNames == null || tableNames.isEmpty()) return new HTableDescriptor[0];
MasterKeepAliveConnection master = getKeepAliveMasterService();
try {
GetTableDescriptorsRequest req =
RequestConverter.buildGetTableDescriptorsRequest(tableNames);
return ProtobufUtil.getHTableDescriptorArray(master.getTableDescriptors(null, req));
} catch (ServiceException se) {
throw ProtobufUtil.getRemoteException(se);
} finally {
master.close();
}
}
2549
2550
2551
2552
2553 @Deprecated
2554 @Override
2555 public HTableDescriptor[] getHTableDescriptors(
2556 List<String> names) throws IOException {
2557 List<TableName> tableNames = new ArrayList<TableName>(names.size());
2558 for(String name : names) {
2559 tableNames.add(TableName.valueOf(name));
2560 }
2561
2562 return getHTableDescriptorsByTableName(tableNames);
2563 }
2564
// Accessor for this connection's nonce generator.
@Override
public NonceGenerator getNonceGenerator() {
return this.nonceGenerator;
}
2569
2570
2571
2572
2573
2574
2575
2576
// Deprecated: fetches a single table's descriptor from the master.
// Returns null for a null table name; throws TableNotFoundException when
// the master returns no schema for the requested table.
@Deprecated
@Override
public HTableDescriptor getHTableDescriptor(final TableName tableName)
throws IOException {
if (tableName == null) return null;
MasterKeepAliveConnection master = getKeepAliveMasterService();
GetTableDescriptorsResponse htds;
try {
GetTableDescriptorsRequest req =
RequestConverter.buildGetTableDescriptorsRequest(tableName);
htds = master.getTableDescriptors(null, req);
} catch (ServiceException se) {
throw ProtobufUtil.getRemoteException(se);
} finally {
// Always release the keep-alive master connection.
master.close();
}
if (!htds.getTableSchemaList().isEmpty()) {
return HTableDescriptor.convert(htds.getTableSchemaList().get(0));
}
throw new TableNotFoundException(tableName.getNameAsString());
}
2598
2599
2600
2601
// Legacy byte[]-table-name overload; converts and delegates.
@Deprecated
@Override
public HTableDescriptor getHTableDescriptor(final byte[] tableName)
throws IOException {
return getHTableDescriptor(TableName.valueOf(tableName));
}
2608
// Builds a fresh retrying-caller factory from the given configuration,
// reusing this connection's interceptor and statistics tracker.
@Override
public RpcRetryingCallerFactory getNewRpcRetryingCallerFactory(Configuration conf) {
return RpcRetryingCallerFactory
.instantiate(conf, this.interceptor, this.getStatisticsTracker());
}
2614
// Whether this connection's lifecycle is managed by ConnectionManager.
@Override
public boolean isManaged() {
return managed;
}
2619
// Whether the underlying RPC client supports cell-block encoding.
@Override
public boolean hasCellBlockSupport() {
return this.rpcClient.hasCellBlockSupport();
}
2624
// Accessor for the parsed connection configuration.
@Override
public ConnectionConfiguration getConnectionConfiguration() {
return this.connectionConfig;
}
2629
// Accessor for the shared retrying-caller factory.
@Override
public RpcRetryingCallerFactory getRpcRetryingCallerFactory() {
return this.rpcCallerFactory;
}
2634
// Accessor for the shared RPC controller factory.
@Override
public RpcControllerFactory getRpcControllerFactory() {
return this.rpcControllerFactory;
}
2639 }
2640
2641
2642
2643
2644 static class ServerErrorTracker {
2645
2646 private final ConcurrentMap<ServerName, ServerErrors> errorsByServer =
2647 new ConcurrentHashMap<ServerName, ServerErrors>();
2648 private final long canRetryUntil;
2649 private final int maxRetries;
2650 private final long startTrackingTime;
2651
2652 public ServerErrorTracker(long timeout, int maxRetries) {
2653 this.maxRetries = maxRetries;
2654 this.canRetryUntil = EnvironmentEdgeManager.currentTime() + timeout;
2655 this.startTrackingTime = new Date().getTime();
2656 }
2657
2658
2659
2660
2661 boolean canRetryMore(int numRetry) {
2662
2663 return numRetry < maxRetries || (maxRetries > 1 &&
2664 EnvironmentEdgeManager.currentTime() < this.canRetryUntil);
2665 }
2666
2667
2668
2669
2670
2671
2672
2673
2674 long calculateBackoffTime(ServerName server, long basePause) {
2675 long result;
2676 ServerErrors errorStats = errorsByServer.get(server);
2677 if (errorStats != null) {
2678 result = ConnectionUtils.getPauseTime(basePause, errorStats.retries.get());
2679 } else {
2680 result = 0;
2681 }
2682 return result;
2683 }
2684
2685
2686
2687
2688
2689
2690 void reportServerError(ServerName server) {
2691 ServerErrors errors = errorsByServer.get(server);
2692 if (errors != null) {
2693 errors.addError();
2694 } else {
2695 errors = errorsByServer.putIfAbsent(server, new ServerErrors());
2696 if (errors != null){
2697 errors.addError();
2698 }
2699 }
2700 }
2701
2702 long getStartTrackingTime() {
2703 return startTrackingTime;
2704 }
2705
2706
2707
2708
2709 private static class ServerErrors {
2710 public final AtomicInteger retries = new AtomicInteger(0);
2711
2712 public void addError() {
2713 retries.incrementAndGet();
2714 }
2715 }
2716 }
2717 }