/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master;

import java.io.IOException;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CopyOnWriteArrayList;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ClockOutOfSyncException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.RegionLoad;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerLoad;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.YouAreDeadException;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RetriesExhaustedException;
import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer;
import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.RequestConverter;
import org.apache.hadoop.hbase.protobuf.ResponseConverter;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionRequest;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionResponse;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ServerInfo;
import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds;
import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.StoreSequenceId;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStartupRequest;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.RegionOpeningState;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.RetryCounter;
import org.apache.hadoop.hbase.util.RetryCounterFactory;
import org.apache.hadoop.hbase.util.Triple;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.zookeeper.KeeperException;

import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.ByteString;
import com.google.protobuf.ServiceException;

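/**
 * The ServerManager class manages info about region servers.
 * <p>
 * Maintains lists of online and dead servers and processes the startups, load reports, and
 * expirations of region servers.
 * <p>
 * A server instance is identified by hostname, port, and startcode. The startcode distinguishes
 * a restarted instance of a server from a previous instance running on the same hostname and
 * port. A server that is known not to be running any more is called dead; dead servers are
 * queued for crash processing if the master is not yet ready to handle them.
 */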
@InterfaceAudience.Private
public class ServerManager {
  public static final String WAIT_ON_REGIONSERVERS_MAXTOSTART =
      "hbase.master.wait.on.regionservers.maxtostart";

  public static final String WAIT_ON_REGIONSERVERS_MINTOSTART =
      "hbase.master.wait.on.regionservers.mintostart";

  public static final String WAIT_ON_REGIONSERVERS_TIMEOUT =
      "hbase.master.wait.on.regionservers.timeout";

  public static final String WAIT_ON_REGIONSERVERS_INTERVAL =
      "hbase.master.wait.on.regionservers.interval";

  private static final Log LOG = LogFactory.getLog(ServerManager.class);

  /** Set when the cluster is going down. */
  private volatile boolean clusterShutdown = false;

  /** The last flushed sequence id for a region. */
  private final ConcurrentNavigableMap<byte[], Long> flushedSequenceIdByRegion =
      new ConcurrentSkipListMap<byte[], Long>(Bytes.BYTES_COMPARATOR);

  /** The last flushed sequence id for a store in a region. */
  private final ConcurrentNavigableMap<byte[], ConcurrentNavigableMap<byte[], Long>>
      storeFlushedSequenceIdsByRegion =
      new ConcurrentSkipListMap<byte[], ConcurrentNavigableMap<byte[], Long>>(Bytes.BYTES_COMPARATOR);

  /** Map of registered servers to their current load. */
  private final ConcurrentHashMap<ServerName, ServerLoad> onlineServers =
      new ConcurrentHashMap<ServerName, ServerLoad>();

  /**
   * Map of admin interfaces per registered regionserver; these interfaces are used to control
   * region servers out on the cluster.
   */
  private final Map<ServerName, AdminService.BlockingInterface> rsAdmins =
      new HashMap<ServerName, AdminService.BlockingInterface>();

  /** List of region servers that should not get any more new regions. */
  private final ArrayList<ServerName> drainingServers =
      new ArrayList<ServerName>();

  private final Server master;
  private final MasterServices services;
  private final ClusterConnection connection;

  private final DeadServer deadservers = new DeadServer();

  private final long maxSkew;
  private final long warningSkew;

  private final RetryCounterFactory pingRetryCounterFactory;
  private final RpcControllerFactory rpcControllerFactory;

  /**
   * Servers that died before the master enabled server crash processing. They cannot be
   * expired right away, so they are queued here and handled later by
   * {@link #processQueuedDeadServers()}.
   */
  private Set<ServerName> queuedDeadServers = new HashSet<ServerName>();

  /**
   * Dead servers that could not be fully processed because failover cleanup was not yet done.
   * The value records whether the server's WALs still need to be split.
   */
  private Map<ServerName, Boolean> requeuedDeadServers
    = new ConcurrentHashMap<ServerName, Boolean>();

  /** Listeners that are called on server events. */
  private List<ServerListener> listeners = new CopyOnWriteArrayList<ServerListener>();

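  /**
   * Constructor.
   * @param master the master server
   * @param services master services
   * @throws ZooKeeperConnectionException
   */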
  public ServerManager(final Server master, final MasterServices services)
      throws IOException {
    this(master, services, true);
  }

  ServerManager(final Server master, final MasterServices services,
      final boolean connect) throws IOException {
    this.master = master;
    this.services = services;
    Configuration c = master.getConfiguration();
    maxSkew = c.getLong("hbase.master.maxclockskew", 30000);
    warningSkew = c.getLong("hbase.master.warningclockskew", 10000);
    this.connection = connect ? (ClusterConnection) ConnectionFactory.createConnection(c) : null;
    int pingMaxAttempts = Math.max(1, master.getConfiguration().getInt(
      "hbase.master.maximum.ping.server.attempts", 10));
    int pingSleepInterval = Math.max(1, master.getConfiguration().getInt(
      "hbase.master.ping.server.retry.sleep.interval", 100));
    this.pingRetryCounterFactory = new RetryCounterFactory(pingMaxAttempts, pingSleepInterval);
    this.rpcControllerFactory = this.connection == null
        ? null
        : connection.getRpcControllerFactory();
  }

  /**
   * Add the listener to the notification list.
   * @param listener the ServerListener to register
   */
  public void registerListener(final ServerListener listener) {
    this.listeners.add(listener);
  }

  /**
   * Remove the listener from the notification list.
   * @param listener the ServerListener to unregister
   * @return true if the listener was registered and has been removed
   */
  public boolean unregisterListener(final ServerListener listener) {
    return this.listeners.remove(listener);
  }

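  /**
   * Let the server manager know a new regionserver has come online.
   * @param request the startup request
   * @param ia the InetAddress from which the request was received
   * @return The ServerName we know this server as.
   * @throws IOException
   */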
  ServerName regionServerStartup(RegionServerStartupRequest request, InetAddress ia)
      throws IOException {
    // A region server that was quickly restarted, or one whose crash is still being processed,
    // may check in with the same hostname and port as an instance we already know about;
    // checkAndRecordNewServer() resolves that case by comparing startcodes.
    final String hostname = request.hasUseThisHostnameInstead() ?
        request.getUseThisHostnameInstead() : ia.getHostName();
    ServerName sn = ServerName.valueOf(hostname, request.getPort(),
      request.getServerStartCode());
    checkClockSkew(sn, request.getServerCurrentTime());
    checkIsDead(sn, "STARTUP");
    if (!checkAndRecordNewServer(sn, ServerLoad.EMPTY_SERVERLOAD)) {
      LOG.warn("THIS SHOULD NOT HAPPEN, RegionServerStartup"
        + " could not record the server: " + sn);
    }
    return sn;
  }

  private ConcurrentNavigableMap<byte[], Long> getOrCreateStoreFlushedSequenceId(
      byte[] regionName) {
    ConcurrentNavigableMap<byte[], Long> storeFlushedSequenceId =
        storeFlushedSequenceIdsByRegion.get(regionName);
    if (storeFlushedSequenceId != null) {
      return storeFlushedSequenceId;
    }
    storeFlushedSequenceId = new ConcurrentSkipListMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
    ConcurrentNavigableMap<byte[], Long> alreadyPut =
        storeFlushedSequenceIdsByRegion.putIfAbsent(regionName, storeFlushedSequenceId);
    return alreadyPut == null ? storeFlushedSequenceId : alreadyPut;
  }

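  /**
   * Updates the last flushed sequence ids of the regions, and of their stores, from the load
   * reported by a region server.
   */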
  private void updateLastFlushedSequenceIds(ServerName sn, ServerLoad hsl) {
    Map<byte[], RegionLoad> regionsLoad = hsl.getRegionsLoad();
    for (Entry<byte[], RegionLoad> entry : regionsLoad.entrySet()) {
      byte[] encodedRegionName = Bytes.toBytes(HRegionInfo.encodeRegionName(entry.getKey()));
      Long existingValue = flushedSequenceIdByRegion.get(encodedRegionName);
      long l = entry.getValue().getCompleteSequenceId();
      if (LOG.isTraceEnabled()) {
        LOG.trace(Bytes.toString(encodedRegionName) + ", existingValue=" + existingValue +
          ", completeSequenceId=" + l);
      }
      if (existingValue == null || (l != HConstants.NO_SEQNUM && l > existingValue)) {
        flushedSequenceIdByRegion.put(encodedRegionName, l);
      } else if (l != HConstants.NO_SEQNUM && l < existingValue) {
        LOG.warn("RegionServer " + sn + " indicates a last flushed sequence id ("
          + l + ") that is less than the previous last flushed sequence id ("
          + existingValue + ") for region " + Bytes.toString(entry.getKey()) + ". Ignoring.");
      }
      ConcurrentNavigableMap<byte[], Long> storeFlushedSequenceId =
          getOrCreateStoreFlushedSequenceId(encodedRegionName);
      for (StoreSequenceId storeSeqId : entry.getValue().getStoreCompleteSequenceId()) {
        byte[] family = storeSeqId.getFamilyName().toByteArray();
        existingValue = storeFlushedSequenceId.get(family);
        l = storeSeqId.getSequenceId();
        if (LOG.isTraceEnabled()) {
          LOG.trace(Bytes.toString(encodedRegionName) + ", family=" + Bytes.toString(family) +
            ", existingValue=" + existingValue + ", completeSequenceId=" + l);
        }
        // Only update if the new sequence id is greater than the current one.
        if (existingValue == null || (l != HConstants.NO_SEQNUM && l > existingValue.longValue())) {
          storeFlushedSequenceId.put(family, l);
        }
      }
    }
  }

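  /**
   * Called when a region server reports in with its load. Records the load and, if the server
   * is not yet known to be online, registers it as a new (possibly restarted) server.
   */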
  void regionServerReport(ServerName sn,
      ServerLoad sl) throws YouAreDeadException {
    checkIsDead(sn, "REPORT");
    if (null == this.onlineServers.replace(sn, sl)) {
      // The server is not currently registered as online. This can happen when a server
      // reports in before its startup has been processed, or when the master joins a
      // running cluster; try to register it now.
      if (!checkAndRecordNewServer(sn, sl)) {
        LOG.info("RegionServerReport ignored, could not record the server: " + sn);
        return;
      }
    }
    updateLastFlushedSequenceIds(sn, sl);
  }

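  /**
   * Checks if a server with the same hostname and port already exists; if not, or if the
   * existing instance has a smaller startcode, records the new server.
   * @param serverName the server to check and record
   * @param sl the server load on the server
   * @return true if the server is recorded, otherwise false
   */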
  boolean checkAndRecordNewServer(
      final ServerName serverName, final ServerLoad sl) {
    ServerName existingServer = null;
    synchronized (this.onlineServers) {
      existingServer = findServerWithSameHostnamePortWithLock(serverName);
      if (existingServer != null && (existingServer.getStartcode() > serverName.getStartcode())) {
        LOG.info("Server serverName=" + serverName + " rejected; we already have "
          + existingServer.toString() + " registered with same hostname and port");
        return false;
      }
      recordNewServerWithLock(serverName, sl);
    }

    // Tell our listeners that a server was added.
    if (!this.listeners.isEmpty()) {
      for (ServerListener listener : this.listeners) {
        listener.serverAdded(serverName);
      }
    }

    // If an instance with an older startcode exists for the same hostname and port, it is
    // stale: expire it so its regions get reassigned.
    if (existingServer != null && (existingServer.getStartcode() < serverName.getStartcode())) {
      LOG.info("Triggering server recovery; existingServer " +
        existingServer + " looks stale, new server:" + serverName);
      expireServer(existingServer);
    }
    return true;
  }

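  /**
   * Checks the clock skew between the server and the master. If the skew exceeds the configured
   * maximum, an exception is thrown; if it exceeds the configured warning threshold, a warning
   * is logged but the server is allowed to start normally.
   * @param serverName the incoming server's name
   * @param serverCurrentTime the current time reported by the server
   * @throws ClockOutOfSyncException if the skew exceeds the configured maximum
   */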
  private void checkClockSkew(final ServerName serverName, final long serverCurrentTime)
      throws ClockOutOfSyncException {
    long skew = Math.abs(System.currentTimeMillis() - serverCurrentTime);
    if (skew > maxSkew) {
      String message = "Server " + serverName + " has been " +
        "rejected; Reported time is too far out of sync with master. " +
        "Time difference of " + skew + "ms > max allowed of " + maxSkew + "ms";
      LOG.warn(message);
      throw new ClockOutOfSyncException(message);
    } else if (skew > warningSkew) {
      String message = "Reported time for server " + serverName + " is out of sync with master " +
        "by " + skew + "ms. (Warning threshold is " + warningSkew + "ms; " +
        "error threshold is " + maxSkew + "ms)";
      LOG.warn(message);
    }
  }

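  /**
   * If this server is on the dead servers list, reject it with a YouAreDeadException.
   * If it was dead but came back with a new startcode, remove the old entry from the dead
   * servers list.
   * @param what "STARTUP" or "REPORT", used in the log and exception message
   * @throws YouAreDeadException if the server is currently being processed as dead
   */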
  private void checkIsDead(final ServerName serverName, final String what)
      throws YouAreDeadException {
    if (this.deadservers.isDeadServer(serverName)) {
      // Exact match: this instance is currently being processed as a dead server.
      String message = "Server " + what + " rejected; currently processing " +
        serverName + " as dead server";
      LOG.debug(message);
      throw new YouAreDeadException(message);
    }
    // Remove dead-server records of older instances of this server (same hostname and port,
    // smaller startcode), but only once the master has finished initializing.
    if ((this.services == null || ((HMaster) this.services).isInitialized())
        && this.deadservers.cleanPreviousInstance(serverName)) {
      // The server has come back up after we marked it as dead; its previous entry has been
      // removed from the dead servers list to reflect that.
      LOG.debug(what + ":" + " Server " + serverName + " came back up," +
        " removed it from the dead servers list");
    }
  }

  /**
   * Assumes onlineServers is locked.
   * @return the online ServerName with a matching hostname and port, or null if none found
   */
  private ServerName findServerWithSameHostnamePortWithLock(
      final ServerName serverName) {
    for (ServerName sn : this.onlineServers.keySet()) {
      if (ServerName.isSameHostnameAndPort(serverName, sn)) return sn;
    }
    return null;
  }

  /**
   * Adds the server to the online servers list. Assumes onlineServers is locked.
   * @param serverName the remote server's name
   * @param sl the load reported by the server
   */
  @VisibleForTesting
  void recordNewServerWithLock(final ServerName serverName, final ServerLoad sl) {
    LOG.info("Registering server=" + serverName);
    this.onlineServers.put(serverName, sl);
    this.rsAdmins.remove(serverName);
  }

  public RegionStoreSequenceIds getLastFlushedSequenceId(byte[] encodedRegionName) {
    RegionStoreSequenceIds.Builder builder = RegionStoreSequenceIds.newBuilder();
    Long seqId = flushedSequenceIdByRegion.get(encodedRegionName);
    builder.setLastFlushedSequenceId(seqId != null ? seqId.longValue() : HConstants.NO_SEQNUM);
    Map<byte[], Long> storeFlushedSequenceId =
        storeFlushedSequenceIdsByRegion.get(encodedRegionName);
    if (storeFlushedSequenceId != null) {
      for (Map.Entry<byte[], Long> entry : storeFlushedSequenceId.entrySet()) {
        builder.addStoreSequenceId(StoreSequenceId.newBuilder()
          .setFamilyName(ByteString.copyFrom(entry.getKey()))
          .setSequenceId(entry.getValue().longValue()).build());
      }
    }
    return builder.build();
  }

  /**
   * @return ServerLoad if serverName is known, else null
   */
  public ServerLoad getLoad(final ServerName serverName) {
    return this.onlineServers.get(serverName);
  }

  /**
   * Compute the average load across all region servers.
   * Currently this uses a very naive computation: it just uses the number of regions being
   * served, ignoring stats about the number of requests.
   * @return the average load
   */
  public double getAverageLoad() {
    int totalLoad = 0;
    int numServers = 0;
    for (ServerLoad sl : this.onlineServers.values()) {
      numServers++;
      totalLoad += sl.getNumberOfRegions();
    }
    return numServers == 0 ? 0 :
      (double) totalLoad / (double) numServers;
  }

  /** @return the count of active region servers */
  public int countOfRegionServers() {
    // Presumes onlineServers is a concurrent map.
    return this.onlineServers.size();
  }

  /**
   * @return read-only map of online servers to their load
   */
  public Map<ServerName, ServerLoad> getOnlineServers() {
    // Presumption is that iterating the returned Map is OK.
    synchronized (this.onlineServers) {
      return Collections.unmodifiableMap(this.onlineServers);
    }
  }

  public DeadServer getDeadServers() {
    return this.deadservers;
  }

  /**
   * Checks if any dead servers are currently in progress.
   * @return true if any region servers are being processed as dead, false if not
   */
  public boolean areDeadServersInProgress() {
    return this.deadservers.areDeadServersInProgress();
  }

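  /**
   * Waits for the region servers to shut themselves down during cluster shutdown. Returns when
   * no region servers remain online, or when only the master itself (acting as a region server)
   * is left.
   */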
  void letRegionServersShutdown() {
    long previousLogTime = 0;
    ServerName sn = master.getServerName();
    ZooKeeperWatcher zkw = master.getZooKeeper();
    int onlineServersCt;
    while ((onlineServersCt = onlineServers.size()) > 0) {
      // Log progress at most once a second.
      if (System.currentTimeMillis() > (previousLogTime + 1000)) {
        Set<ServerName> remainingServers = onlineServers.keySet();
        synchronized (onlineServers) {
          if (remainingServers.size() == 1 && remainingServers.contains(sn)) {
            // Only the master itself remains online; it will stop on its own.
            return;
          }
        }
        StringBuilder sb = new StringBuilder();
        // It's OK not to synchronize on onlineServers here; we are merely logging.
        for (ServerName key : remainingServers) {
          if (sb.length() > 0) {
            sb.append(", ");
          }
          sb.append(key);
        }
        LOG.info("Waiting on regionserver(s) to go down " + sb.toString());
        previousLogTime = System.currentTimeMillis();
      }

      try {
        List<String> servers = ZKUtil.listChildrenNoWatch(zkw, zkw.rsZNode);
        if (servers == null || servers.size() == 0 || (servers.size() == 1
            && servers.contains(sn.toString()))) {
          LOG.info("ZK shows there is only the master self online, exiting now");
          // The master could have missed some ZK events; no need to wait more.
          break;
        }
      } catch (KeeperException ke) {
        LOG.warn("Failed to list regionservers", ke);
        // ZK is malfunctioning, don't hang here.
        break;
      }
      synchronized (onlineServers) {
        try {
          if (onlineServersCt == onlineServers.size()) onlineServers.wait(100);
        } catch (InterruptedException ignored) {
          // continue
        }
      }
    }
  }

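  /**
   * Expire the passed server. Adds it to the list of dead servers and submits a crash procedure
   * to handle its shutdown.
   */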
  public synchronized void expireServer(final ServerName serverName) {
    if (serverName.equals(master.getServerName())) {
      if (!(master.isAborted() || master.isStopped())) {
        master.stop("We lost our znode?");
      }
      return;
    }
    if (!services.isServerCrashProcessingEnabled()) {
      LOG.info("Master hasn't enabled ServerShutdownHandler during initialization; "
        + "delaying expiration of server " + serverName);
      this.queuedDeadServers.add(serverName);
      return;
    }
    if (this.deadservers.isDeadServer(serverName)) {
      // Already being processed as dead; nothing more to do.
      LOG.warn("Expiration of " + serverName +
        " but server shutdown already in progress");
      return;
    }
    moveFromOnelineToDeadServers(serverName);

    // If the cluster is going down, servers are expected to expire; don't submit crash
    // processing for them.
    if (this.clusterShutdown) {
      LOG.info("Cluster shutdown set; " + serverName +
        " expired; onlineServers=" + this.onlineServers.size());
      if (this.onlineServers.isEmpty()) {
        master.stop("Cluster shutdown set; onlineServer=0");
      }
      return;
    }

    boolean carryingMeta = services.getAssignmentManager().isCarryingMeta(serverName) ==
        AssignmentManager.ServerHostRegion.HOSTING_REGION;
    this.services.getMasterProcedureExecutor().
      submitProcedure(new ServerCrashProcedure(serverName, true, carryingMeta));
    LOG.debug("Added=" + serverName +
      " to dead servers, submitted shutdown handler to be executed meta=" + carryingMeta);

    // Tell our listeners that a server was removed.
    if (!this.listeners.isEmpty()) {
      for (ServerListener listener : this.listeners) {
        listener.serverRemoved(serverName);
      }
    }
  }

  @VisibleForTesting
  public void moveFromOnelineToDeadServers(final ServerName sn) {
    synchronized (onlineServers) {
      if (!this.onlineServers.containsKey(sn)) {
        LOG.warn("Expiration of " + sn + " but server not online");
      }
      // Add to the dead servers list before removing from the online list, so the server
      // always shows up in at least one of the two lists.
      this.deadservers.add(sn);
      this.onlineServers.remove(sn);
      onlineServers.notifyAll();
    }
    this.rsAdmins.remove(sn);
  }

  public synchronized void processDeadServer(final ServerName serverName, boolean shouldSplitWal) {
    // While the AssignmentManager is still rebuilding region states during failover, region
    // servers may already be known to be dead. Rather than blocking a handler thread until
    // failover cleanup completes, queue such servers here; they are picked up again by
    // processQueuedDeadServers() once cleanup is done.
    if (!services.getAssignmentManager().isFailoverCleanupDone()) {
      requeuedDeadServers.put(serverName, shouldSplitWal);
      return;
    }

    this.deadservers.add(serverName);
    this.services.getMasterProcedureExecutor().
      submitProcedure(new ServerCrashProcedure(serverName, shouldSplitWal, false));
  }

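  /**
   * Process the servers which died during the master's initialization. Called once server crash
   * processing has been enabled and failover cleanup has had a chance to run.
   */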
  synchronized void processQueuedDeadServers() {
    if (!services.isServerCrashProcessingEnabled()) {
      LOG.info("Master hasn't enabled ServerShutdownHandler");
    }
    Iterator<ServerName> serverIterator = queuedDeadServers.iterator();
    while (serverIterator.hasNext()) {
      ServerName tmpServerName = serverIterator.next();
      expireServer(tmpServerName);
      serverIterator.remove();
      requeuedDeadServers.remove(tmpServerName);
    }

    if (!services.getAssignmentManager().isFailoverCleanupDone()) {
      LOG.info("AssignmentManager hasn't finished failover cleanup; waiting");
    }

    for (ServerName tmpServerName : requeuedDeadServers.keySet()) {
      processDeadServer(tmpServerName, requeuedDeadServers.get(tmpServerName));
    }
    requeuedDeadServers.clear();
  }

  /**
   * Remove the server from the drain list.
   */
  public boolean removeServerFromDrainList(final ServerName sn) {
    // Warn if the server (sn) is not online.
    if (!this.isServerOnline(sn)) {
      LOG.warn("Server " + sn + " is not currently online. " +
        "Removing from draining list anyway, as requested.");
    }
    // Remove the server from the draining servers list.
    return this.drainingServers.remove(sn);
  }

  /**
   * Add the server to the drain list.
   */
  public boolean addServerToDrainList(final ServerName sn) {
    // Warn and reject if the server (sn) is not online.
    if (!this.isServerOnline(sn)) {
      LOG.warn("Server " + sn + " is not currently online. " +
        "Ignoring request to add it to draining list.");
      return false;
    }
    // Add the server to the draining servers list, if it's not already in it.
    if (this.drainingServers.contains(sn)) {
      LOG.warn("Server " + sn + " is already in the draining server list. " +
        "Ignoring request to add it again.");
      return false;
    }
    return this.drainingServers.add(sn);
  }

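  /**
   * Sends an OPEN RPC to the specified server to open the specified region.
   * <p>
   * Open should not fail but can if the server just crashed.
   * @param server server to open a region on
   * @param region region to open
   * @param versionOfOfflineNode the expected version of the offline znode when the region server
   *   transitions it from OFFLINE to another state
   * @param favoredNodes the favored nodes for the region, if any
   */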
  public RegionOpeningState sendRegionOpen(final ServerName server,
      HRegionInfo region, int versionOfOfflineNode, List<ServerName> favoredNodes)
      throws IOException {
    AdminService.BlockingInterface admin = getRsAdmin(server);
    if (admin == null) {
      LOG.warn("Attempting to send OPEN RPC to server " + server.toString() +
        " failed because no RPC connection found to this server");
      return RegionOpeningState.FAILED_OPENING;
    }
    OpenRegionRequest request = RequestConverter.buildOpenRegionRequest(server,
      region, versionOfOfflineNode, favoredNodes,
      (RecoveryMode.LOG_REPLAY == this.services.getMasterFileSystem().getLogRecoveryMode()));
    try {
      OpenRegionResponse response = admin.openRegion(null, request);
      return ResponseConverter.getRegionOpeningState(response);
    } catch (ServiceException se) {
      throw ProtobufUtil.getRemoteException(se);
    }
  }

  /**
   * Sends an OPEN RPC to the specified server to open the specified regions.
   * <p>
   * Open should not fail but can if the server just crashed.
   * @param server server to open the regions on
   * @param regionOpenInfos info for the list of regions to open
   * @return a list of region opening states, or null if no RPC connection was found
   */
  public List<RegionOpeningState> sendRegionOpen(ServerName server,
      List<Triple<HRegionInfo, Integer, List<ServerName>>> regionOpenInfos)
      throws IOException {
    AdminService.BlockingInterface admin = getRsAdmin(server);
    if (admin == null) {
      LOG.warn("Attempting to send OPEN RPC to server " + server.toString() +
        " failed because no RPC connection found to this server");
      return null;
    }

    OpenRegionRequest request = RequestConverter.buildOpenRegionRequest(server, regionOpenInfos,
      (RecoveryMode.LOG_REPLAY == this.services.getMasterFileSystem().getLogRecoveryMode()));
    try {
      OpenRegionResponse response = admin.openRegion(null, request);
      return ResponseConverter.getRegionOpeningStateList(response);
    } catch (ServiceException se) {
      throw ProtobufUtil.getRemoteException(se);
    }
  }

  private PayloadCarryingRpcController newRpcController() {
    return rpcControllerFactory == null ? null : rpcControllerFactory.newController();
  }

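  /**
   * Sends a CLOSE RPC to the specified server to close the specified region.
   * <p>
   * A region server could reject the close request because it either does not have the specified
   * region or the region is being split.
   * @param server server to close a region on
   * @param region region to close
   * @param versionOfClosingNode the expected version of the znode to use during the close
   * @param dest if the region is moved to another server, the destination server; null otherwise
   * @param transitionInZK true if the close should be reflected in zookeeper
   * @return true if the server acknowledged the close, false if not
   */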
  public boolean sendRegionClose(ServerName server, HRegionInfo region,
      int versionOfClosingNode, ServerName dest, boolean transitionInZK) throws IOException {
    if (server == null) throw new NullPointerException("Passed server is null");
    AdminService.BlockingInterface admin = getRsAdmin(server);
    if (admin == null) {
      throw new IOException("Attempting to send CLOSE RPC to server " +
        server.toString() + " for region " +
        region.getRegionNameAsString() +
        " failed because no RPC connection found to this server");
    }
    PayloadCarryingRpcController controller = newRpcController();
    return ProtobufUtil.closeRegion(controller, admin, server, region.getRegionName(),
      versionOfClosingNode, dest, transitionInZK);
  }

  public boolean sendRegionClose(ServerName server,
      HRegionInfo region, int versionOfClosingNode) throws IOException {
    return sendRegionClose(server, region, versionOfClosingNode, null, true);
  }

  /**
   * Sends a WARMUP RPC to the specified server to warm up the specified region, e.g. ahead of a
   * region move. Failures are logged and otherwise ignored.
   * @param server server to warm up a region on
   * @param region region to warm up
   */
  public void sendRegionWarmup(ServerName server,
      HRegionInfo region) {
    if (server == null) return;
    try {
      AdminService.BlockingInterface admin = getRsAdmin(server);
      PayloadCarryingRpcController controller = newRpcController();
      ProtobufUtil.warmupRegion(controller, admin, region);
    } catch (IOException e) {
      LOG.error("Received exception in RPC for warmup server: " +
        server + " region: " + region +
        " exception: " + e);
    }
  }

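  /**
   * Contacts a region server and waits up to timeout ms for the region to close.
   * This bypasses the active master.
   */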
  public static void closeRegionSilentlyAndWait(ClusterConnection connection,
      ServerName server, HRegionInfo region, long timeout) throws IOException, InterruptedException {
    AdminService.BlockingInterface rs = connection.getAdmin(server);
    PayloadCarryingRpcController controller = connection.getRpcControllerFactory().newController();
    try {
      ProtobufUtil.closeRegion(controller, rs, server, region.getRegionName(), false);
    } catch (IOException e) {
      LOG.warn("Exception when closing region: " + region.getRegionNameAsString(), e);
    }
    long expiration = timeout + System.currentTimeMillis();
    while (System.currentTimeMillis() < expiration) {
      try {
        HRegionInfo rsRegion =
            ProtobufUtil.getRegionInfo(controller, rs, region.getRegionName());
        if (rsRegion == null) return;
      } catch (IOException ioe) {
        if (ioe instanceof NotServingRegionException) {
          return;
        }
        LOG.warn("Exception when retrieving regioninfo from: "
          + region.getRegionNameAsString(), ioe);
      }
      Thread.sleep(1000);
    }
    throw new IOException("Region " + region + " failed to close within"
      + " timeout " + timeout);
  }

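  /**
   * Sends a MERGE REGIONS RPC to the specified server to merge the specified regions.
   * <p>
   * A region server could reject the request because it does not host one of the regions.
   * @param server server to merge the regions on
   * @param region_a first region to merge
   * @param region_b second region to merge
   * @param forcible true to force the merge even if the regions are not adjacent
   */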
  public void sendRegionsMerge(ServerName server, HRegionInfo region_a,
      HRegionInfo region_b, boolean forcible) throws IOException {
    if (server == null)
      throw new NullPointerException("Passed server is null");
    if (region_a == null || region_b == null)
      throw new NullPointerException("Passed region is null");
    AdminService.BlockingInterface admin = getRsAdmin(server);
    if (admin == null) {
      throw new IOException("Attempting to send MERGE REGIONS RPC to server "
        + server.toString() + " for region "
        + region_a.getRegionNameAsString() + ","
        + region_b.getRegionNameAsString()
        + " failed because no RPC connection found to this server");
    }
    PayloadCarryingRpcController controller = newRpcController();
    ProtobufUtil.mergeRegions(controller, admin, region_a, region_b, forcible);
  }

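  /**
   * Check if a region server is reachable and has the expected startcode.
   */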
  public boolean isServerReachable(ServerName server) {
    if (server == null) throw new NullPointerException("Passed server is null");

    RetryCounter retryCounter = pingRetryCounterFactory.create();
    while (retryCounter.shouldRetry()) {
      synchronized (this.onlineServers) {
        if (this.deadservers.isDeadServer(server)) {
          return false;
        }
      }
      try {
        PayloadCarryingRpcController controller = newRpcController();
        AdminService.BlockingInterface admin = getRsAdmin(server);
        if (admin != null) {
          ServerInfo info = ProtobufUtil.getServerInfo(controller, admin);
          return info != null && info.hasServerName()
            && server.getStartcode() == info.getServerName().getStartCode();
        }
      } catch (IOException ioe) {
        if (LOG.isDebugEnabled()) {
          LOG.debug("Couldn't reach " + server + ", try=" + retryCounter.getAttemptTimes() + " of "
            + retryCounter.getMaxAttempts(), ioe);
        }
        try {
          retryCounter.sleepUntilNextRetry();
        } catch (InterruptedException ie) {
          Thread.currentThread().interrupt();
          break;
        }
      }
    }
    return false;
  }

  /**
   * @return Admin interface for the remote regionserver named <code>sn</code>
   * @throws IOException
   * @throws RetriesExhaustedException wrapping a ConnectException if the connection attempt fails
   */
  private AdminService.BlockingInterface getRsAdmin(final ServerName sn)
      throws IOException {
    AdminService.BlockingInterface admin = this.rsAdmins.get(sn);
    if (admin == null) {
      LOG.debug("New admin connection to " + sn.toString());
      if (sn.equals(master.getServerName()) && master instanceof HRegionServer) {
        // The master also acts as a region server; use its local RPC services directly.
        admin = ((HRegionServer) master).getRSRpcServices();
      } else {
        admin = this.connection.getAdmin(sn);
      }
      this.rsAdmins.put(sn, admin);
    }
    return admin;
  }

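  /**
   * Wait for the region servers to report in.
   * Returns when one of the following conditions is met:
   * <ul>
   *   <li>the master is stopped</li>
   *   <li>'hbase.master.wait.on.regionservers.maxtostart' region servers have checked in</li>
   *   <li>'hbase.master.wait.on.regionservers.mintostart' has been reached AND the server count
   *       has not changed for 'hbase.master.wait.on.regionservers.interval' ms AND
   *       'hbase.master.wait.on.regionservers.timeout' ms have elapsed</li>
   * </ul>
   */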
  public void waitForRegionServers(MonitoredTask status)
      throws InterruptedException {
    final long interval = this.master.getConfiguration().
      getLong(WAIT_ON_REGIONSERVERS_INTERVAL, 1500);
    final long timeout = this.master.getConfiguration().
      getLong(WAIT_ON_REGIONSERVERS_TIMEOUT, 4500);
    int defaultMinToStart = 1;
    if (BaseLoadBalancer.tablesOnMaster(master.getConfiguration())) {
      // If regions are assigned to the active master, wait for at least one more region server
      // by default so we don't put all eggs in one basket.
      defaultMinToStart = 2;
    }
    int minToStart = this.master.getConfiguration().
      getInt(WAIT_ON_REGIONSERVERS_MINTOSTART, defaultMinToStart);
    if (minToStart < 1) {
      LOG.warn(String.format(
        "The value of '%s' (%d) cannot be less than 1, ignoring.",
        WAIT_ON_REGIONSERVERS_MINTOSTART, minToStart));
      minToStart = 1;
    }
    int maxToStart = this.master.getConfiguration().
      getInt(WAIT_ON_REGIONSERVERS_MAXTOSTART, Integer.MAX_VALUE);
    if (maxToStart < minToStart) {
      LOG.warn(String.format(
        "The value of '%s' (%d) is set less than '%s' (%d), ignoring.",
        WAIT_ON_REGIONSERVERS_MAXTOSTART, maxToStart,
        WAIT_ON_REGIONSERVERS_MINTOSTART, minToStart));
      maxToStart = Integer.MAX_VALUE;
    }

    long now = System.currentTimeMillis();
    final long startTime = now;
    long slept = 0;
    long lastLogTime = 0;
    long lastCountChange = startTime;
    int count = countOfRegionServers();
    int oldCount = 0;
    while (!this.master.isStopped() && count < maxToStart
        && (lastCountChange + interval > now || timeout > slept || count < minToStart)) {
      // Log some info at every interval time or if there is a change.
      if (oldCount != count || lastLogTime + interval < now) {
        lastLogTime = now;
        String msg =
          "Waiting for region servers count to settle; currently" +
            " checked in " + count + ", slept for " + slept + " ms," +
            " expecting minimum of " + minToStart + ", maximum of " + maxToStart +
            ", timeout of " + timeout + " ms, interval of " + interval + " ms.";
        LOG.info(msg);
        status.setStatus(msg);
      }

      // Sleep for a short while before re-checking.
      final long sleepTime = 50;
      Thread.sleep(sleepTime);
      now = System.currentTimeMillis();
      slept = now - startTime;

      oldCount = count;
      count = countOfRegionServers();
      if (count != oldCount) {
        lastCountChange = now;
      }
    }

    LOG.info("Finished waiting for region servers count to settle;" +
      " checked in " + count + ", slept for " + slept + " ms," +
      " expecting minimum of " + minToStart + ", maximum of " + maxToStart + "," +
      " master is " + (this.master.isStopped() ? "stopped." : "running")
    );
  }


  /**
   * @return a copy of the internal list of online servers
   */
  public List<ServerName> getOnlineServersList() {
    // Return a fresh copy so callers can iterate without holding a lock on onlineServers.
    return new ArrayList<ServerName>(this.onlineServers.keySet());
  }

  /**
   * @return a copy of the internal list of draining servers
   */
  public List<ServerName> getDrainingServersList() {
    return new ArrayList<ServerName>(this.drainingServers);
  }

  /**
   * @return a copy of the internal set of dead-but-not-yet-expired servers
   */
  Set<ServerName> getDeadNotExpiredServers() {
    return new HashSet<ServerName>(this.queuedDeadServers);
  }

  /**
   * Removes all queued-up dead servers without processing them, e.g. when startup determines
   * there is no failover work to do. Use with caution.
   */
  void removeRequeuedDeadServers() {
    requeuedDeadServers.clear();
  }

  /**
   * @return the dead servers queued for processing, together with the flag indicating whether
   *   their WALs still need to be split
   */
  Map<ServerName, Boolean> getRequeuedDeadServers() {
    return Collections.unmodifiableMap(this.requeuedDeadServers);
  }

  public boolean isServerOnline(ServerName serverName) {
    return serverName != null && onlineServers.containsKey(serverName);
  }

  /**
   * Check if a server is considered dead. A server is considered dead if it is being processed
   * as dead (on the dead servers list), or is queued or requeued for dead server processing.
   * A null server name is treated as dead.
   */
  public synchronized boolean isServerDead(ServerName serverName) {
    return serverName == null || deadservers.isDeadServer(serverName)
      || queuedDeadServers.contains(serverName)
      || requeuedDeadServers.containsKey(serverName);
  }

  public void shutdownCluster() {
    this.clusterShutdown = true;
    this.master.stop("Cluster shutdown requested");
  }

  public boolean isClusterShutdown() {
    return this.clusterShutdown;
  }

  /**
   * Stop the ServerManager. Currently closes the connection to the master.
   */
  public void stop() {
    if (connection != null) {
      try {
        connection.close();
      } catch (IOException e) {
        LOG.error("Attempt to close connection to master failed", e);
      }
    }
  }

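  /**
   * Creates a list of possible destinations for a region. It contains the online servers, but not
   * the draining or dying servers.
   * @param serverToExclude can be null if there is no server to exclude
   */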
  public List<ServerName> createDestinationServersList(final ServerName serverToExclude) {
    final List<ServerName> destServers = getOnlineServersList();

    if (serverToExclude != null) {
      destServers.remove(serverToExclude);
    }

    // Loop through the draining server list and remove them from the candidate list.
    final List<ServerName> drainingServersCopy = getDrainingServersList();
    if (!drainingServersCopy.isEmpty()) {
      for (final ServerName server : drainingServersCopy) {
        destServers.remove(server);
      }
    }

    // Remove the dead-but-not-expired servers from the candidate list.
    removeDeadNotExpiredServers(destServers);
    return destServers;
  }

  /**
   * Calls {@link #createDestinationServersList} without a server to exclude.
   */
  public List<ServerName> createDestinationServersList() {
    return createDestinationServersList(null);
  }

  /**
   * Loops through the dead-but-not-expired server list and removes those servers from the
   * passed list. This method should be used carefully outside of this class.
   */
  void removeDeadNotExpiredServers(List<ServerName> servers) {
    Set<ServerName> deadNotExpiredServersCopy = this.getDeadNotExpiredServers();
    if (!deadNotExpiredServersCopy.isEmpty()) {
      for (ServerName server : deadNotExpiredServersCopy) {
        LOG.debug("Removing dead but not expired server: " + server
          + " from eligible server pool.");
        servers.remove(server);
      }
    }
  }

  /**
   * Clears any dead server with the same hostname and port as an online server.
   */
  void clearDeadServersWithSameHostNameAndPortOfOnlineServer() {
    for (ServerName serverName : getOnlineServersList()) {
      deadservers.cleanAllPreviousInstances(serverName);
    }
  }
}