1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.master;
20  
21  import java.io.IOException;
22  import java.io.InterruptedIOException;
23  import java.util.ArrayList;
24  import java.util.Arrays;
25  import java.util.Collection;
26  import java.util.Collections;
27  import java.util.HashMap;
28  import java.util.HashSet;
29  import java.util.Iterator;
30  import java.util.List;
31  import java.util.Map;
32  import java.util.NavigableMap;
33  import java.util.Random;
34  import java.util.Set;
35  import java.util.TreeMap;
36  import java.util.concurrent.Callable;
37  import java.util.concurrent.ConcurrentHashMap;
38  import java.util.concurrent.CopyOnWriteArrayList;
39  import java.util.concurrent.ThreadFactory;
40  import java.util.concurrent.TimeUnit;
41  import java.util.concurrent.atomic.AtomicBoolean;
42  import java.util.concurrent.atomic.AtomicInteger;
43  import java.util.concurrent.locks.Lock;
44  import java.util.concurrent.locks.ReentrantLock;
45  
46  import org.apache.commons.logging.Log;
47  import org.apache.commons.logging.LogFactory;
48  import org.apache.hadoop.conf.Configuration;
49  import org.apache.hadoop.fs.FileSystem;
50  import org.apache.hadoop.fs.Path;
51  import org.apache.hadoop.hbase.CoordinatedStateException;
52  import org.apache.hadoop.hbase.HBaseIOException;
53  import org.apache.hadoop.hbase.HConstants;
54  import org.apache.hadoop.hbase.HRegionInfo;
55  import org.apache.hadoop.hbase.HRegionLocation;
56  import org.apache.hadoop.hbase.HTableDescriptor;
57  import org.apache.hadoop.hbase.MetaTableAccessor;
58  import org.apache.hadoop.hbase.NotServingRegionException;
59  import org.apache.hadoop.hbase.RegionLocations;
60  import org.apache.hadoop.hbase.RegionStateListener;
61  import org.apache.hadoop.hbase.RegionTransition;
62  import org.apache.hadoop.hbase.ServerName;
63  import org.apache.hadoop.hbase.TableName;
64  import org.apache.hadoop.hbase.TableNotFoundException;
65  import org.apache.hadoop.hbase.TableStateManager;
66  import org.apache.hadoop.hbase.classification.InterfaceAudience;
67  import org.apache.hadoop.hbase.client.RegionReplicaUtil;
68  import org.apache.hadoop.hbase.client.Result;
69  import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager;
70  import org.apache.hadoop.hbase.coordination.OpenRegionCoordination;
71  import org.apache.hadoop.hbase.coordination.RegionMergeCoordination;
72  import org.apache.hadoop.hbase.coordination.SplitTransactionCoordination.SplitTransactionDetails;
73  import org.apache.hadoop.hbase.coordination.ZkOpenRegionCoordination;
74  import org.apache.hadoop.hbase.coordination.ZkRegionMergeCoordination;
75  import org.apache.hadoop.hbase.exceptions.DeserializationException;
76  import org.apache.hadoop.hbase.executor.EventHandler;
77  import org.apache.hadoop.hbase.executor.EventType;
78  import org.apache.hadoop.hbase.executor.ExecutorService;
79  import org.apache.hadoop.hbase.ipc.FailedServerException;
80  import org.apache.hadoop.hbase.ipc.RpcClient;
81  import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
82  import org.apache.hadoop.hbase.master.RegionState.State;
83  import org.apache.hadoop.hbase.master.balancer.FavoredNodeAssignmentHelper;
84  import org.apache.hadoop.hbase.master.balancer.FavoredNodeLoadBalancer;
85  import org.apache.hadoop.hbase.master.handler.ClosedRegionHandler;
86  import org.apache.hadoop.hbase.master.handler.DisableTableHandler;
87  import org.apache.hadoop.hbase.master.handler.EnableTableHandler;
88  import org.apache.hadoop.hbase.master.handler.OpenedRegionHandler;
89  import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition;
90  import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
91  import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
92  import org.apache.hadoop.hbase.regionserver.RegionAlreadyInTransitionException;
93  import org.apache.hadoop.hbase.regionserver.RegionOpeningState;
94  import org.apache.hadoop.hbase.regionserver.RegionServerAbortedException;
95  import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
96  import org.apache.hadoop.hbase.util.ConfigUtil;
97  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
98  import org.apache.hadoop.hbase.util.FSUtils;
99  import org.apache.hadoop.hbase.util.KeyLocker;
100 import org.apache.hadoop.hbase.util.Pair;
101 import org.apache.hadoop.hbase.util.PairOfSameType;
102 import org.apache.hadoop.hbase.util.Threads;
103 import org.apache.hadoop.hbase.util.Triple;
104 import org.apache.hadoop.hbase.wal.DefaultWALProvider;
105 import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
106 import org.apache.hadoop.hbase.zookeeper.ZKAssign;
107 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
108 import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
109 import org.apache.hadoop.ipc.RemoteException;
110 import org.apache.hadoop.util.StringUtils;
111 import org.apache.zookeeper.AsyncCallback;
112 import org.apache.zookeeper.KeeperException;
113 import org.apache.zookeeper.KeeperException.NoNodeException;
114 import org.apache.zookeeper.KeeperException.NodeExistsException;
115 import org.apache.zookeeper.data.Stat;
116 
117 import com.google.common.annotations.VisibleForTesting;
118 import com.google.common.collect.LinkedHashMultimap;
119 
120 /**
121  * Manages and performs region assignment.
122  * <p>
123  * Monitors ZooKeeper for events related to regions in transition.
124  * <p>
125  * Handles existing regions in transition during master failover.
126  */
127 @InterfaceAudience.Private
128 public class AssignmentManager extends ZooKeeperListener {
129   private static final Log LOG = LogFactory.getLog(AssignmentManager.class);
130 
131   public static final ServerName HBCK_CODE_SERVERNAME = ServerName.valueOf(HConstants.HBCK_CODE_NAME,
132       -1, -1L);
133 
134   static final String ALREADY_IN_TRANSITION_WAITTIME
135     = "hbase.assignment.already.intransition.waittime";
136   static final int DEFAULT_ALREADY_IN_TRANSITION_WAITTIME = 60000; // 1 minute
137 
138   protected final MasterServices server;
139 
140   private ServerManager serverManager;
141 
142   private boolean shouldAssignRegionsWithFavoredNodes;
143 
144   private LoadBalancer balancer;
145 
146   private final MetricsAssignmentManager metricsAssignmentManager;
147 
148   private final TableLockManager tableLockManager;
149 
150   private AtomicInteger numRegionsOpened = new AtomicInteger(0);
151 
152   final private KeyLocker<String> locker = new KeyLocker<String>();
153 
154   Set<HRegionInfo> replicasToClose = Collections.synchronizedSet(new HashSet<HRegionInfo>());
155 
156   /**
157    * Map of regions to reopen after the schema of a table is changed. Key -
158    * encoded region name, value - HRegionInfo
159    */
160   private final Map <String, HRegionInfo> regionsToReopen;
161 
162   /*
163    * Maximum times we recurse an assignment/unassignment.
164    * See below in {@link #assign()} and {@link #unassign()}.
165    */
166   private final int maximumAttempts;
167 
168   /**
169    * Map of the two regions being merged, keyed by the encoded name of the new region to be created by the merge.
170    */
171   private final Map<String, PairOfSameType<HRegionInfo>> mergingRegions
172     = new HashMap<String, PairOfSameType<HRegionInfo>>();
173 
174   private final Map<HRegionInfo, PairOfSameType<HRegionInfo>> splitRegions
175   = new HashMap<HRegionInfo, PairOfSameType<HRegionInfo>>();
176 
177   /**
178    * The sleep time before retrying an hbase:meta assignment that failed because
179    * no region plan was available or the region plan was bad
180    */
181   private final long sleepTimeBeforeRetryingMetaAssignment;
182 
183   /** Plans for region movement. Key is the encoded version of a region name*/
184   // TODO: When do plans get cleaned out?  Ever? In server open and in server
185   // shutdown processing -- St.Ack
186   // All access to this Map must be synchronized.
187   final NavigableMap<String, RegionPlan> regionPlans =
188     new TreeMap<String, RegionPlan>();
189 
190   private final TableStateManager tableStateManager;
191 
192   private final ExecutorService executorService;
193 
194   // For unit tests, keep track of calls to ClosedRegionHandler
195   private Map<HRegionInfo, AtomicBoolean> closedRegionHandlerCalled = null;
196 
197   // For unit tests, keep track of calls to OpenedRegionHandler
198   private Map<HRegionInfo, AtomicBoolean> openedRegionHandlerCalled = null;
199 
200   // Thread pool executor service for the timeout monitor
201   private java.util.concurrent.ExecutorService threadPoolExecutorService;
202 
203   // A thread pool of ZK event workers used to handle region transition events
204   private final java.util.concurrent.ExecutorService zkEventWorkers;
205 
206   private List<EventType> ignoreStatesRSOffline = Arrays.asList(
207       EventType.RS_ZK_REGION_FAILED_OPEN, EventType.RS_ZK_REGION_CLOSED);
208 
209   private final RegionStates regionStates;
210 
211   // The thresholds for using bulk assignment. Bulk assignment is used
212   // only if assigning at least this many regions to at least this
213   // many servers. If assigning fewer regions to fewer servers,
214   // bulk assigning may not be as efficient.
215   private final int bulkAssignThresholdRegions;
216   private final int bulkAssignThresholdServers;
217   private final int bulkPerRegionOpenTimeGuesstimate;
218 
219   // Should bulk assignment wait till all regions are assigned,
220   // or until it times out?  This is useful to measure bulk assignment
221   // performance, but not needed in most use cases.
222   private final boolean bulkAssignWaitTillAllAssigned;
223 
224   /**
225    * Indicator that AssignmentManager has recovered the region states so
226    * that ServerShutdownHandler can be fully enabled and re-assign regions
227    * of dead servers, so that when re-assignment happens, AssignmentManager
228    * has proper region states.
229    *
230    * Protected to ease testing.
231    */
232   protected final AtomicBoolean failoverCleanupDone = new AtomicBoolean(false);
233 
234   /**
235    * A map tracking the number of times in a row a region has failed to open,
236    * so that we don't try to open a region forever if the failure is
237    * unrecoverable.  We don't put this information in region states
238    * because we don't expect this to happen frequently; we don't
239    * want to copy this information over during each state transition either.
240    */
241   private final ConcurrentHashMap<String, AtomicInteger>
242     failedOpenTracker = new ConcurrentHashMap<String, AtomicInteger>();
243 
244   // A flag to indicate if we are using ZK for region assignment
245   private final boolean useZKForAssignment;
246 
247   // When not using ZK for region assignment, region states
248   // are persisted in meta with a state store
249   private final RegionStateStore regionStateStore;
250 
251   /**
252    * For testing only!  Set to true to skip handling of split.
253    */
254   @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="MS_SHOULD_BE_FINAL")
255   public static boolean TEST_SKIP_SPLIT_HANDLING = false;
256 
257   /** Listeners that are called on assignment events. */
258   private List<AssignmentListener> listeners = new CopyOnWriteArrayList<AssignmentListener>();
259 
260   private RegionStateListener regionStateListener;
261 
262   public enum ServerHostRegion {
263     NOT_HOSTING_REGION, HOSTING_REGION, UNKNOWN,
264   }
265 
266   /**
267    * Constructs a new assignment manager.
268    *
269    * @param server instance of HMaster this AM is running inside
270    * @param serverManager serverManager for associated HMaster
271    * @param balancer implementation of {@link LoadBalancer}
272    * @param service Executor service
273    * @param metricsMaster metrics manager
274    * @param tableLockManager TableLock manager
275    * @throws KeeperException
276    * @throws IOException
277    */
278   public AssignmentManager(MasterServices server, ServerManager serverManager,
279       final LoadBalancer balancer,
280       final ExecutorService service, MetricsMaster metricsMaster,
281       final TableLockManager tableLockManager) throws KeeperException,
282         IOException, CoordinatedStateException {
283     super(server.getZooKeeper());
284     this.server = server;
285     this.serverManager = serverManager;
286     this.executorService = service;
287     this.regionStateStore = new RegionStateStore(server);
288     this.regionsToReopen = Collections.synchronizedMap
289                            (new HashMap<String, HRegionInfo> ());
290     Configuration conf = server.getConfiguration();
291     // Only read favored nodes if using the favored nodes load balancer.
292     this.shouldAssignRegionsWithFavoredNodes = conf.getClass(
293            HConstants.HBASE_MASTER_LOADBALANCER_CLASS, Object.class).equals(
294            FavoredNodeLoadBalancer.class);
295     try {
296       if (server.getCoordinatedStateManager() != null) {
297         this.tableStateManager = server.getCoordinatedStateManager().getTableStateManager();
298       } else {
299         this.tableStateManager = null;
300       }
301     } catch (InterruptedException e) {
302       throw new InterruptedIOException();
303     }
304     // This is the max attempts, not retries, so it should be at least 1.
305     this.maximumAttempts = Math.max(1,
306       this.server.getConfiguration().getInt("hbase.assignment.maximum.attempts", 10));
307     this.sleepTimeBeforeRetryingMetaAssignment = this.server.getConfiguration().getLong(
308         "hbase.meta.assignment.retry.sleeptime", 1000l);
309     this.balancer = balancer;
310     int maxThreads = conf.getInt("hbase.assignment.threads.max", 30);
311     this.threadPoolExecutorService = Threads.getBoundedCachedThreadPool(
312       maxThreads, 60L, TimeUnit.SECONDS, Threads.newDaemonThreadFactory("AM."));
313     this.regionStates = new RegionStates(
314       server, tableStateManager, serverManager, regionStateStore);
315 
316     this.bulkAssignWaitTillAllAssigned =
317       conf.getBoolean("hbase.bulk.assignment.waittillallassigned", false);
318     this.bulkAssignThresholdRegions = conf.getInt("hbase.bulk.assignment.threshold.regions", 7);
319     this.bulkAssignThresholdServers = conf.getInt("hbase.bulk.assignment.threshold.servers", 3);
320     this.bulkPerRegionOpenTimeGuesstimate =
321       conf.getInt("hbase.bulk.assignment.perregion.open.time", 10000);
322 
323     int workers = conf.getInt("hbase.assignment.zkevent.workers", 20);
324     ThreadFactory threadFactory = Threads.newDaemonThreadFactory("AM.ZK.Worker");
325     zkEventWorkers = Threads.getBoundedCachedThreadPool(workers, 60L,
326             TimeUnit.SECONDS, threadFactory);
327     this.tableLockManager = tableLockManager;
328 
329     this.metricsAssignmentManager = new MetricsAssignmentManager();
330     useZKForAssignment = ConfigUtil.useZKForAssignment(conf);
331   }
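  /*
   * Tuning sketch (illustrative): the constructor above reads its knobs from the server
   * Configuration, so they can be set in hbase-site.xml or programmatically. The key names
   * are taken verbatim from the code above; the values shown are only examples, not
   * recommendations.
   *
   *   Configuration conf = HBaseConfiguration.create();
   *   conf.setInt("hbase.assignment.maximum.attempts", 10);          // max assign attempts per region
   *   conf.setLong("hbase.meta.assignment.retry.sleeptime", 1000L);  // sleep between hbase:meta retries
   *   conf.setInt("hbase.assignment.threads.max", 30);               // size of the "AM." thread pool
   *   conf.setInt("hbase.assignment.zkevent.workers", 20);           // size of the ZK event worker pool
   */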
332 
333   /**
334    * Add the listener to the notification list.
335    * @param listener The AssignmentListener to register
336    */
337   public void registerListener(final AssignmentListener listener) {
338     this.listeners.add(listener);
339   }
340 
341   /**
342    * Remove the listener from the notification list.
343    * @param listener The AssignmentListener to unregister
344    */
345   public boolean unregisterListener(final AssignmentListener listener) {
346     return this.listeners.remove(listener);
347   }
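  /*
   * Usage sketch (illustrative): registering a listener for assignment events. This assumes
   * the AssignmentListener callbacks regionOpened/regionClosed and a handle "am" to this
   * AssignmentManager.
   *
   *   am.registerListener(new AssignmentListener() {
   *     @Override
   *     public void regionOpened(HRegionInfo regionInfo, ServerName serverName) {
   *       LOG.info("Opened " + regionInfo.getRegionNameAsString() + " on " + serverName);
   *     }
   *     @Override
   *     public void regionClosed(HRegionInfo regionInfo) {
   *       LOG.info("Closed " + regionInfo.getRegionNameAsString());
   *     }
   *   });
   */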
348 
349   /**
350    * @return Instance of ZKTableStateManager.
351    */
352   public TableStateManager getTableStateManager() {
353     // These are 'expensive' to make, involving a trip to the zk ensemble, so allow
354     // sharing.
355     return this.tableStateManager;
356   }
357 
358   /**
359    * This SHOULD not be public. It is public now
360    * because of some unit tests.
361    *
362    * TODO: make it package private and keep RegionStates in the master package
363    */
364   public RegionStates getRegionStates() {
365     return regionStates;
366   }
367 
368   /**
369    * Used in some tests to mock up region state in meta
370    */
371   @VisibleForTesting
372   RegionStateStore getRegionStateStore() {
373     return regionStateStore;
374   }
375 
376   public RegionPlan getRegionReopenPlan(HRegionInfo hri) {
377     return new RegionPlan(hri, null, regionStates.getRegionServerOfRegion(hri));
378   }
379 
380   /**
381    * Add a regionPlan for the specified region.
382    * @param encodedName the encoded name of the region
383    * @param plan the region plan to register
384    */
385   public void addPlan(String encodedName, RegionPlan plan) {
386     synchronized (regionPlans) {
387       regionPlans.put(encodedName, plan);
388     }
389   }
390 
391   /**
392    * Add a map of region plans.
393    */
394   public void addPlans(Map<String, RegionPlan> plans) {
395     synchronized (regionPlans) {
396       regionPlans.putAll(plans);
397     }
398   }
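  /*
   * Usage sketch (illustrative): a plan maps the encoded region name to a RegionPlan naming
   * the (possibly null) source server and the destination server, mirroring how
   * processRegionsInTransition below builds "new RegionPlan(regionInfo, null, sn)".
   * "am", "hri" and "destination" are assumed handles for this sketch.
   *
   *   am.addPlan(hri.getEncodedName(), new RegionPlan(hri, null, destination));
   */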
399 
400   /**
401    * Set the list of regions that will be reopened
402    * because of an update in table schema
403    *
404    * @param regions
405    *          list of regions that should be tracked for reopen
406    */
407   public void setRegionsToReopen(List <HRegionInfo> regions) {
408     for(HRegionInfo hri : regions) {
409       regionsToReopen.put(hri.getEncodedName(), hri);
410     }
411   }
412 
413   /**
414    * Used by the client to check whether all regions of a table have picked up the schema update
415    *
416    * @param tableName the table whose alter status is being checked
417    * @return Pair indicating the status of the alter command
418    * @throws IOException
419    */
420   public Pair<Integer, Integer> getReopenStatus(TableName tableName)
421       throws IOException {
422     List<HRegionInfo> hris;
423     if (TableName.META_TABLE_NAME.equals(tableName)) {
424       hris = new MetaTableLocator().getMetaRegions(server.getZooKeeper());
425     } else {
426       hris = MetaTableAccessor.getTableRegions(server.getZooKeeper(),
427         server.getConnection(), tableName, true);
428     }
429 
430     int pending = 0;
431     for (HRegionInfo hri : hris) {
432       String name = hri.getEncodedName();
433       // No lock; concurrent access is ok: sequential consistency is respected.
434       if (regionsToReopen.containsKey(name)
435           || regionStates.isRegionInTransition(name)) {
436         pending++;
437       }
438     }
439     return new Pair<Integer, Integer>(pending, hris.size());
440   }
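  /*
   * Usage sketch (illustrative): the returned pair is (regions still pending reopen or in
   * transition, total regions of the table), so an alter is complete once the first element
   * reaches zero. "am" is an assumed handle to this AssignmentManager.
   *
   *   Pair<Integer, Integer> status = am.getReopenStatus(tableName);
   *   boolean done = status.getFirst() == 0;
   *   LOG.info(status.getFirst() + " of " + status.getSecond() + " regions left to reopen");
   */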
441 
442   /**
443    * Used by ServerShutdownHandler to make sure AssignmentManager has completed
444    * the failover cleanup before re-assigning regions of dead servers, so that
445    * when re-assignment happens, AssignmentManager has proper region states.
446    */
447   public boolean isFailoverCleanupDone() {
448     return failoverCleanupDone.get();
449   }
450 
451   /**
452    * To avoid racing with AM, external entities may need to lock a region,
453    * for example, when SSH checks what regions to skip re-assigning.
454    */
455   public Lock acquireRegionLock(final String encodedName) {
456     return locker.acquireLock(encodedName);
457   }
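  /*
   * Usage sketch (illustrative): callers such as the server shutdown handler hold the region
   * lock while they inspect or update that region's state, releasing it in a finally block
   * just as processRegionInTransition below does. "am" and "hri" are assumed handles.
   *
   *   Lock lock = am.acquireRegionLock(hri.getEncodedName());
   *   try {
   *     // examine or adjust state for this region while assignment on it is blocked
   *   } finally {
   *     lock.unlock();
   *   }
   */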
458 
459   /**
460    * Now, failover cleanup is completed. Notify server manager to
461    * process queued up dead servers, if any.
462    */
463   void failoverCleanupDone() {
464     failoverCleanupDone.set(true);
465     serverManager.processQueuedDeadServers();
466   }
467 
468   /**
469    * Called on startup.
470    * Figures whether this is a fresh cluster start or we are joining an extant running cluster.
471    * @throws IOException
472    * @throws KeeperException
473    * @throws InterruptedException
474    * @throws CoordinatedStateException
475    */
476   void joinCluster() throws IOException,
477       KeeperException, InterruptedException, CoordinatedStateException {
478     long startTime = System.currentTimeMillis();
479     // Concurrency note: In the below the accesses on regionsInTransition are
480     // outside of a synchronization block where usually all accesses to RIT are
481     // synchronized.  The presumption is that in this case it is safe since this
482     // method is being played by a single thread on startup.
483     // method is run by a single thread on startup.
484     // TODO: Regions that have a null location and are not in regionsInTransitions
485     // need to be handled.
486 
487     // Scan hbase:meta to build list of existing regions, servers, and assignment
488     // Returns servers who have not checked in (assumed dead) that some regions
489     // were assigned to (according to the meta)
490     Set<ServerName> deadServers = rebuildUserRegions();
491 
492     // This method will assign all user regions if this is a clean server startup;
493     // otherwise it will reconstruct master state and clean up any leftovers from the previous master process.
494     boolean failover = processDeadServersAndRegionsInTransition(deadServers);
495 
496     if (!useZKForAssignment) {
497       // Not using ZK for assignment any more, so remove the znode
498       ZKUtil.deleteNodeRecursively(watcher, watcher.assignmentZNode);
499     }
500     recoverTableInDisablingState();
501     recoverTableInEnablingState();
502     LOG.info("Joined the cluster in " + (System.currentTimeMillis()
503       - startTime) + "ms, failover=" + failover);
504   }
505 
506   /**
507    * Processes all regions that are in transition in zookeeper and also
508    * processes the list of dead servers.
509    * Used by a master joining a cluster.  If we figure this is a clean cluster
510    * startup, will assign all user regions.
511    * @param deadServers Set of servers that are offline (probably legitimately) and that were carrying
512    * regions according to a scan of hbase:meta. Can be null.
513    * @throws KeeperException
514    * @throws IOException
515    * @throws InterruptedException
516    */
517   boolean processDeadServersAndRegionsInTransition(final Set<ServerName> deadServers)
518   throws KeeperException, IOException, InterruptedException, CoordinatedStateException {
519     List<String> nodes = ZKUtil.listChildrenNoWatch(watcher, watcher.assignmentZNode);
520 
521     if (useZKForAssignment && nodes == null) {
522       String errorMessage = "Failed to get the children from ZK";
523       server.abort(errorMessage, new IOException(errorMessage));
524       return true; // Doesn't matter in this case
525     }
526 
527     boolean failover = !serverManager.getDeadServers().isEmpty();
528     if (failover) {
529       // This may not be a failover actually, especially if meta is on this master.
530       if (LOG.isDebugEnabled()) {
531         LOG.debug("Found dead servers out on cluster " + serverManager.getDeadServers());
532       }
533     } else {
534       // If any one region except meta is assigned, it's a failover.
535       Set<ServerName> onlineServers = serverManager.getOnlineServers().keySet();
536       for (Map.Entry<HRegionInfo, ServerName> en:
537           regionStates.getRegionAssignments().entrySet()) {
538         HRegionInfo hri = en.getKey();
539         if (!hri.isMetaTable()
540             && onlineServers.contains(en.getValue())) {
541           LOG.debug("Found " + hri + " out on cluster");
542           failover = true;
543           break;
544         }
545       }
546       if (!failover && nodes != null) {
547         // If any one region except meta is in transition, it's a failover.
548         for (String encodedName: nodes) {
549           RegionState regionState = regionStates.getRegionState(encodedName);
550           if (regionState != null && !regionState.getRegion().isMetaRegion()) {
551             LOG.debug("Found " + regionState + " in RITs");
552             failover = true;
553             break;
554           }
555         }
556       }
557     }
558     if (!failover && !useZKForAssignment) {
559       // If any region except meta is in transition on a live server, it's a failover.
560       Map<String, RegionState> regionsInTransition = regionStates.getRegionsInTransition();
561       if (!regionsInTransition.isEmpty()) {
562         Set<ServerName> onlineServers = serverManager.getOnlineServers().keySet();
563         for (RegionState regionState: regionsInTransition.values()) {
564           ServerName serverName = regionState.getServerName();
565           if (!regionState.getRegion().isMetaRegion()
566               && serverName != null && onlineServers.contains(serverName)) {
567             LOG.debug("Found " + regionState + " in RITs");
568             failover = true;
569             break;
570           }
571         }
572       }
573     }
574     if (!failover) {
575       // If we get here, we have a full cluster restart. It is a failover only
576       // if there are some WALs that are not split yet. Any meta WALs should have
577       // been split already. We can walk through those queued dead servers:
578       // if they don't have any WALs, this restart should be considered a clean one
579       Set<ServerName> queuedDeadServers = serverManager.getRequeuedDeadServers().keySet();
580       if (!queuedDeadServers.isEmpty()) {
581         Configuration conf = server.getConfiguration();
582         Path rootdir = FSUtils.getRootDir(conf);
583         FileSystem fs = rootdir.getFileSystem(conf);
584         for (ServerName serverName: queuedDeadServers) {
585           // In the case of a clean exit, the shutdown handler would have presplit any WALs and
586           // removed empty directories.
587           Path logDir = new Path(rootdir,
588               DefaultWALProvider.getWALDirectoryName(serverName.toString()));
589           Path splitDir = logDir.suffix(DefaultWALProvider.SPLITTING_EXT);
590           if (fs.exists(logDir) || fs.exists(splitDir)) {
591             LOG.debug("Found queued dead server " + serverName);
592             failover = true;
593             break;
594           }
595         }
596         if (!failover) {
597           // We figured that it's not a failover, so no need to
598           // work on these re-queued dead servers any more.
599           LOG.info("AM figured that it's not a failover and cleaned up "
600             + queuedDeadServers.size() + " queued dead servers");
601           serverManager.removeRequeuedDeadServers();
602         }
603       }
604     }
605 
606     Set<TableName> disabledOrDisablingOrEnabling = null;
607     Map<HRegionInfo, ServerName> allRegions = null;
608 
609     if (!failover) {
610       disabledOrDisablingOrEnabling = tableStateManager.getTablesInStates(
611         ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING,
612         ZooKeeperProtos.Table.State.ENABLING);
613 
614       // Clean re/start, mark all user regions closed before reassignment
615       allRegions = regionStates.closeAllUserRegions(
616         disabledOrDisablingOrEnabling);
617     }
618 
619     // Now region states are restored
620     regionStateStore.start();
621 
622     // If we found user regions out on cluster, it's a failover.
623     if (failover) {
624       LOG.info("Found regions out on cluster or in RIT; presuming failover");
625       // Process list of dead servers and regions in RIT.
626       // See HBASE-4580 for more information.
627       processDeadServersAndRecoverLostRegions(deadServers);
628     }
629 
630     if (!failover && useZKForAssignment) {
631       // Cleanup any existing ZK nodes and start watching
632       ZKAssign.deleteAllNodes(watcher);
633       ZKUtil.listChildrenAndWatchForNewChildren(this.watcher,
634         this.watcher.assignmentZNode);
635     }
636 
637     // Now we can safely claim failover cleanup completed and enable
638     // ServerShutdownHandler for further processing. The nodes (below)
639     // in transition, if any, are for regions not related to those
640     // dead servers at all, and can be done in parallel to SSH.
641     failoverCleanupDone();
642     if (!failover) {
643       // Fresh cluster startup.
644       LOG.info("Clean cluster startup. Assigning user regions");
645       assignAllUserRegions(allRegions);
646     }
647     // Unassign replicas of the split parents and the merged regions;
648     // the daughter replicas are opened in assignAllUserRegions if they were
649     // not already opened.
650     for (HRegionInfo h : replicasToClose) {
651       unassign(h);
652     }
653     replicasToClose.clear();
654     return failover;
655   }
656 
657   /**
658    * If region is up in zk in transition, then do fixup and block and wait until
659    * the region is assigned and out of transition.  Used on startup for
660    * catalog regions.
661    * @param hri Region to look for.
662    * @return True if we processed a region in transition else false if region
663    * was not up in zk in transition.
664    * @throws InterruptedException
665    * @throws KeeperException
666    * @throws IOException
667    */
668   boolean processRegionInTransitionAndBlockUntilAssigned(final HRegionInfo hri)
669       throws InterruptedException, KeeperException, IOException {
670     String encodedRegionName = hri.getEncodedName();
671     if (!processRegionInTransition(encodedRegionName, hri)) {
672       return false; // The region is not in transition
673     }
674     LOG.debug("Waiting on " + HRegionInfo.prettyPrint(encodedRegionName));
675     while (!this.server.isStopped() &&
676         this.regionStates.isRegionInTransition(encodedRegionName)) {
677       RegionState state = this.regionStates.getRegionTransitionState(encodedRegionName);
678       if (state == null || !serverManager.isServerOnline(state.getServerName())) {
679         // The region is not in transition, or not in transition on an online
680         // server. Doesn't help to block here any more. Caller needs to
681         // verify the region is actually assigned.
682         break;
683       }
684       this.regionStates.waitForUpdate(100);
685     }
686     return true;
687   }
688 
689   /**
690    * Process failover of new master for region <code>encodedRegionName</code>
691    * up in zookeeper.
692    * @param encodedRegionName Region to process failover for.
693    * @param regionInfo If null we'll go get it from meta table.
694    * @return True if we processed <code>regionInfo</code> as a RIT.
695    * @throws KeeperException
696    * @throws IOException
697    */
698   boolean processRegionInTransition(final String encodedRegionName,
699       final HRegionInfo regionInfo) throws KeeperException, IOException {
700     // We need a lock here to ensure that we will not put the same region twice
701     // It has no reason to be a lock shared with the other operations.
702     // We can do the lock on the region only, instead of a global lock: what we want to ensure
703     // is that we don't have two threads working on the same region.
704     Lock lock = locker.acquireLock(encodedRegionName);
705     try {
706       Stat stat = new Stat();
707       byte [] data = ZKAssign.getDataAndWatch(watcher, encodedRegionName, stat);
708       if (data == null) return false;
709       RegionTransition rt;
710       try {
711         rt = RegionTransition.parseFrom(data);
712       } catch (DeserializationException e) {
713         LOG.warn("Failed to parse znode data", e);
714         return false;
715       }
716       HRegionInfo hri = regionInfo;
717       if (hri == null) {
718         // The region info is not passed in. We will try to find the region
719         // from region states map/meta based on the encoded region name. But we
720         // may not be able to find it. This is valid for online merge, where
721         // the region may not have been created yet if the merge is not completed.
722         // Therefore, it is not in meta at master recovery time.
723         hri = regionStates.getRegionInfo(rt.getRegionName());
724         EventType et = rt.getEventType();
725         if (hri == null && et != EventType.RS_ZK_REGION_MERGING
726             && et != EventType.RS_ZK_REQUEST_REGION_MERGE) {
727           LOG.warn("Couldn't find the region in recovering " + rt);
728           return false;
729         }
730       }
731 
732       // TODO: This code is tied to ZK anyway, so for now leaving it as is,
733       // will refactor when whole region assignment will be abstracted from ZK
734       BaseCoordinatedStateManager cp =
735         (BaseCoordinatedStateManager) this.server.getCoordinatedStateManager();
736       OpenRegionCoordination openRegionCoordination = cp.getOpenRegionCoordination();
737 
738       ZkOpenRegionCoordination.ZkOpenRegionDetails zkOrd =
739         new ZkOpenRegionCoordination.ZkOpenRegionDetails();
740       zkOrd.setVersion(stat.getVersion());
741       zkOrd.setServerName(cp.getServer().getServerName());
742 
743       return processRegionsInTransition(
744         rt, hri, openRegionCoordination, zkOrd);
745     } finally {
746       lock.unlock();
747     }
748   }
749 
750   /**
751    * This call is invoked only when (1) the master assigns meta, or
752    * (2) during failover-mode startup, when processing zk assignment nodes.
753    * The locker is set in the caller. It returns true if the region
754    * is in transition for sure, false otherwise.
755    *
756    * It should be private but it is used by some tests too.
757    */
758   boolean processRegionsInTransition(
759       final RegionTransition rt, final HRegionInfo regionInfo,
760       OpenRegionCoordination coordination,
761       final OpenRegionCoordination.OpenRegionDetails ord) throws KeeperException {
762     EventType et = rt.getEventType();
763     // Get ServerName.  Could not be null.
764     final ServerName sn = rt.getServerName();
765     final byte[] regionName = rt.getRegionName();
766     final String encodedName = HRegionInfo.encodeRegionName(regionName);
767     final String prettyPrintedRegionName = HRegionInfo.prettyPrint(encodedName);
768     LOG.info("Processing " + prettyPrintedRegionName + " in state: " + et);
769 
770     if (regionStates.isRegionInTransition(encodedName)
771         && (regionInfo.isMetaRegion() || !useZKForAssignment)) {
772       LOG.info("Processed region " + prettyPrintedRegionName + " in state: "
773         + et + ", does nothing since the region is already in transition "
774         + regionStates.getRegionTransitionState(encodedName));
775       // Just return
776       return true;
777     }
778     if (!serverManager.isServerOnline(sn)) {
779       // It was transitioning on a dead server, so it's closed now.
780       // Force to OFFLINE and put it in transition, but not assign it
781       // since log splitting for the dead server is not done yet.
782       LOG.debug("RIT " + encodedName + " in state=" + rt.getEventType() +
783         " was on deadserver; forcing offline");
784       if (regionStates.isRegionOnline(regionInfo)) {
785         // Meta could still show the region is assigned to the previous
786         // server. If that server is online, when we reload the meta, the
787         // region is put back to online, we need to offline it.
788         regionStates.regionOffline(regionInfo);
789         sendRegionClosedNotification(regionInfo);
790       }
791       // Put it back in transition so that SSH can re-assign it
792       regionStates.updateRegionState(regionInfo, State.OFFLINE, sn);
793 
794       if (regionInfo.isMetaRegion()) {
795         // If it's meta region, reset the meta location.
796         // So that master knows the right meta region server.
797         MetaTableLocator.setMetaLocation(watcher, sn, State.OPEN);
798       } else {
799         // No matter the previous server is online or offline,
800         // we need to reset the last region server of the region.
801         regionStates.setLastRegionServerOfRegion(sn, encodedName);
802         // Make sure we know the server is dead.
803         if (!serverManager.isServerDead(sn)) {
804           serverManager.expireServer(sn);
805         }
806       }
807       return false;
808     }
809     switch (et) {
810       case M_ZK_REGION_CLOSING:
811         // Insert into RIT & resend the query to the region server: maybe the previous master
812         // died before sending the query the first time.
813         final RegionState rsClosing = regionStates.updateRegionState(rt, State.CLOSING);
814         this.executorService.submit(
815           new EventHandler(server, EventType.M_MASTER_RECOVERY) {
816             @Override
817             public void process() throws IOException {
818               ReentrantLock lock = locker.acquireLock(regionInfo.getEncodedName());
819               try {
820                 final int expectedVersion = ((ZkOpenRegionCoordination.ZkOpenRegionDetails) ord)
821                   .getVersion();
822                 unassign(regionInfo, rsClosing, expectedVersion, null, useZKForAssignment, null);
823                 if (regionStates.isRegionOffline(regionInfo)) {
824                   assign(regionInfo, true);
825                 }
826               } finally {
827                 lock.unlock();
828               }
829             }
830           });
831         break;
832 
833       case RS_ZK_REGION_CLOSED:
834       case RS_ZK_REGION_FAILED_OPEN:
835         // Region is closed, insert into RIT and handle it
836         regionStates.setLastRegionServerOfRegion(sn, encodedName);
837         regionStates.updateRegionState(regionInfo, State.CLOSED, sn);
838         if (!replicasToClose.contains(regionInfo)) {
839           invokeAssign(regionInfo);
840         } else {
841           offlineDisabledRegion(regionInfo);
842         }
843         break;
844 
845       case M_ZK_REGION_OFFLINE:
846         // Insert in RIT and resend to the regionserver
847         regionStates.updateRegionState(rt, State.PENDING_OPEN);
848         final RegionState rsOffline = regionStates.getRegionState(regionInfo);
849         this.executorService.submit(
850           new EventHandler(server, EventType.M_MASTER_RECOVERY) {
851             @Override
852             public void process() throws IOException {
853               ReentrantLock lock = locker.acquireLock(regionInfo.getEncodedName());
854               try {
855                 RegionPlan plan = new RegionPlan(regionInfo, null, sn);
856                 addPlan(encodedName, plan);
857                 assign(rsOffline, false, false);
858               } finally {
859                 lock.unlock();
860               }
861             }
862           });
863         break;
864 
865       case RS_ZK_REGION_OPENING:
866         regionStates.updateRegionState(rt, State.OPENING);
867         break;
868 
869       case RS_ZK_REGION_OPENED:
870         // Region is opened, insert into RIT and handle it
871         // This could be done asynchronously; we would then need to acquire the lock in the
872         //  handler.
873         regionStates.updateRegionState(rt, State.OPEN);
874         new OpenedRegionHandler(server, this, regionInfo, coordination, ord).process();
875         break;
876       case RS_ZK_REQUEST_REGION_SPLIT:
877       case RS_ZK_REGION_SPLITTING:
878       case RS_ZK_REGION_SPLIT:
879         // Splitting region should be online. We could have skipped it during
880         // user region rebuilding since we may have considered the split completed.
881         // Put it in SPLITTING state to avoid complications.
882         regionStates.regionOnline(regionInfo, sn);
883         regionStates.updateRegionState(rt, State.SPLITTING);
884         if (!handleRegionSplitting(
885             rt, encodedName, prettyPrintedRegionName, sn)) {
886           deleteSplittingNode(encodedName, sn);
887         }
888         break;
889       case RS_ZK_REQUEST_REGION_MERGE:
890       case RS_ZK_REGION_MERGING:
891       case RS_ZK_REGION_MERGED:
892         if (!handleRegionMerging(
893             rt, encodedName, prettyPrintedRegionName, sn)) {
894           deleteMergingNode(encodedName, sn);
895         }
896         break;
897       default:
898         throw new IllegalStateException("Received region in state:" + et + " is not valid.");
899     }
900     LOG.info("Processed region " + prettyPrintedRegionName + " in state "
901       + et + ", on " + (serverManager.isServerOnline(sn) ? "" : "dead ")
902       + "server: " + sn);
903     return true;
904   }
905 
906   /**
907    * When a region is closed, it should be removed from the regionsToReopen
908    * @param hri HRegionInfo of the region which was closed
909    */
910   public void removeClosedRegion(HRegionInfo hri) {
911     if (regionsToReopen.remove(hri.getEncodedName()) != null) {
912       LOG.debug("Removed region from reopening regions because it was closed");
913     }
914   }
915 
916   /**
917    * Handles various states an unassigned node can be in.
918    * <p>
919    * Method is called when a state change is suspected for an unassigned node.
920    * <p>
921    * This deals with skipped transitions (we got a CLOSED but didn't see CLOSING
922    * yet).
923    * @param rt region transition
924    * @param coordination coordination for opening region
925    * @param ord details about opening region
926    */
927   @edu.umd.cs.findbugs.annotations.SuppressWarnings(
928       value="AT_OPERATION_SEQUENCE_ON_CONCURRENT_ABSTRACTION",
929       justification="Needs work; says access to ConcurrentHashMaps not ATOMIC!!!")
930   void handleRegion(final RegionTransition rt, OpenRegionCoordination coordination,
931                     OpenRegionCoordination.OpenRegionDetails ord) {
932     if (rt == null) {
933       LOG.warn("Unexpected NULL input for RegionTransition rt");
934       return;
935     }
936     final ServerName sn = rt.getServerName();
937     // Check if this is a special HBCK transition
938     if (sn.equals(HBCK_CODE_SERVERNAME)) {
939       handleHBCK(rt);
940       return;
941     }
942     final long createTime = rt.getCreateTime();
943     final byte[] regionName = rt.getRegionName();
944     String encodedName = HRegionInfo.encodeRegionName(regionName);
945     String prettyPrintedRegionName = HRegionInfo.prettyPrint(encodedName);
946     // Verify this is a known server
947     if (!serverManager.isServerOnline(sn)
948       && !ignoreStatesRSOffline.contains(rt.getEventType())) {
949       LOG.warn("Attempted to handle region transition for server but " +
950         "it is not online: " + prettyPrintedRegionName + ", " + rt);
951       return;
952     }
953 
954     RegionState regionState =
955       regionStates.getRegionState(encodedName);
956     long startTime = System.currentTimeMillis();
957     if (LOG.isDebugEnabled()) {
958       boolean lateEvent = createTime < (startTime - 15000);
959       LOG.debug("Handling " + rt.getEventType() +
960         ", server=" + sn + ", region=" +
961         (prettyPrintedRegionName == null ? "null" : prettyPrintedRegionName) +
962         (lateEvent ? ", which is more than 15 seconds late" : "") +
963         ", current_state=" + regionState);
964     }
965     // We don't do anything for this event,
966     // so separate it out, no need to lock/unlock anything
967     if (rt.getEventType() == EventType.M_ZK_REGION_OFFLINE) {
968       return;
969     }
970 
971     // We need a lock on the region as we could update it
972     Lock lock = locker.acquireLock(encodedName);
973     try {
974       RegionState latestState =
975         regionStates.getRegionState(encodedName);
976       if ((regionState == null && latestState != null)
977           || (regionState != null && latestState == null)
978           || (regionState != null && latestState != null
979             && latestState.getState() != regionState.getState())) {
980         LOG.warn("Region state changed from " + regionState + " to "
981           + latestState + ", while acquiring lock");
982       }
983       long waitedTime = System.currentTimeMillis() - startTime;
984       if (waitedTime > 5000) {
985         LOG.warn("Took " + waitedTime + "ms to acquire the lock");
986       }
987       regionState = latestState;
988       switch (rt.getEventType()) {
989       case RS_ZK_REQUEST_REGION_SPLIT:
990       case RS_ZK_REGION_SPLITTING:
991       case RS_ZK_REGION_SPLIT:
992         if (!handleRegionSplitting(
993             rt, encodedName, prettyPrintedRegionName, sn)) {
994           deleteSplittingNode(encodedName, sn);
995         }
996         break;
997 
998       case RS_ZK_REQUEST_REGION_MERGE:
999       case RS_ZK_REGION_MERGING:
1000       case RS_ZK_REGION_MERGED:
1001         // Merged region is a new region, we can't find it in the region states now.
1002         // However, the two merging regions are not new. They should be in state for merging.
1003         if (!handleRegionMerging(
1004             rt, encodedName, prettyPrintedRegionName, sn)) {
1005           deleteMergingNode(encodedName, sn);
1006         }
1007         break;
1008 
1009       case M_ZK_REGION_CLOSING:
1010         // Should see CLOSING after we have asked it to CLOSE or additional
1011         // times after already being in state of CLOSING
1012         if (regionState == null
1013             || !regionState.isPendingCloseOrClosingOnServer(sn)) {
1014           LOG.warn("Received CLOSING for " + prettyPrintedRegionName
1015             + " from " + sn + " but the region isn't PENDING_CLOSE/CLOSING here: "
1016             + regionStates.getRegionState(encodedName));
1017           return;
1018         }
1019         // Transition to CLOSING (or update stamp if already CLOSING)
1020         regionStates.updateRegionState(rt, State.CLOSING);
1021         break;
1022 
1023       case RS_ZK_REGION_CLOSED:
1024         // Should see CLOSED after CLOSING but possible after PENDING_CLOSE
1025         if (regionState == null
1026             || !regionState.isPendingCloseOrClosingOnServer(sn)) {
1027           LOG.warn("Received CLOSED for " + prettyPrintedRegionName
1028             + " from " + sn + " but the region isn't PENDING_CLOSE/CLOSING here: "
1029             + regionStates.getRegionState(encodedName));
1030           return;
1031         }
1032         // Handle CLOSED by assigning elsewhere or stopping if a disable
1033         // If we got here all is good.  Need to update RegionState -- else
1034         // what follows will fail because not in expected state.
1035         new ClosedRegionHandler(server, this, regionState.getRegion()).process();
1036         updateClosedRegionHandlerTracker(regionState.getRegion());
1037         break;
1038 
1039         case RS_ZK_REGION_FAILED_OPEN:
1040           if (regionState == null
1041               || !regionState.isPendingOpenOrOpeningOnServer(sn)) {
1042             LOG.warn("Received FAILED_OPEN for " + prettyPrintedRegionName
1043               + " from " + sn + " but the region isn't PENDING_OPEN/OPENING here: "
1044               + regionStates.getRegionState(encodedName));
1045             return;
1046           }
1047           AtomicInteger failedOpenCount = failedOpenTracker.get(encodedName);
1048           if (failedOpenCount == null) {
1049             failedOpenCount = new AtomicInteger();
1050             // No need to use putIfAbsent, or extra synchronization since
1051             // this whole handleRegion block is locked on the encoded region
1052             // name, and failedOpenTracker is updated only in this block
1053             // FindBugs: AT_OPERATION_SEQUENCE_ON_CONCURRENT_ABSTRACTION
1054             failedOpenTracker.put(encodedName, failedOpenCount);
1055           }
1056           if (failedOpenCount.incrementAndGet() >= maximumAttempts) {
1057             // FindBugs: AT_OPERATION_SEQUENCE_ON_CONCURRENT_ABSTRACTION
1058             regionStates.updateRegionState(rt, State.FAILED_OPEN);
1059             // remove the tracking info to save memory, also reset
1060             // the count for next open initiative
1061             failedOpenTracker.remove(encodedName);
1062           } else {
1063             // Handle this the same as if it were opened and then closed.
1064             regionState = regionStates.updateRegionState(rt, State.CLOSED);
1065             if (regionState != null) {
1066               // When there is more than one region server, a new RS is selected as the
1067               // destination and the region plan is updated accordingly. (HBASE-5546)
1068               try {
1069                 getRegionPlan(regionState.getRegion(), sn, true);
1070                 new ClosedRegionHandler(server, this, regionState.getRegion()).process();
1071               } catch (HBaseIOException e) {
1072                 LOG.warn("Failed to get region plan", e);
1073               }
1074             }
1075           }
1076           break;
1077 
1078         case RS_ZK_REGION_OPENING:
1079           // Should see OPENING after we have asked it to OPEN or additional
1080           // times after already being in state of OPENING
1081           if (regionState == null
1082               || !regionState.isPendingOpenOrOpeningOnServer(sn)) {
1083             LOG.warn("Received OPENING for " + prettyPrintedRegionName
1084               + " from " + sn + " but the region isn't PENDING_OPEN/OPENING here: "
1085               + regionStates.getRegionState(encodedName));
1086             return;
1087           }
1088           // Transition to OPENING (or update stamp if already OPENING)
1089           regionStates.updateRegionState(rt, State.OPENING);
1090           break;
1091 
1092         case RS_ZK_REGION_OPENED:
1093           // Should see OPENED after OPENING but possible after PENDING_OPEN.
1094           if (regionState == null
1095               || !regionState.isPendingOpenOrOpeningOnServer(sn)) {
1096             LOG.warn("Received OPENED for " + prettyPrintedRegionName
1097               + " from " + sn + " but the region isn't PENDING_OPEN/OPENING here: "
1098               + regionStates.getRegionState(encodedName));
1099 
1100             if (regionState != null) {
1101               // Close it without updating the internal region states,
1102               // so as not to create double assignments in unlucky scenarios
1103               // mentioned in OpenRegionHandler#process
1104               unassign(regionState.getRegion(), null, -1, null, false, sn);
1105             }
1106             return;
1107           }
1108           // Handle OPENED by removing from transition and deleting the zk node
1109           regionState =
1110               regionStates.transitionOpenFromPendingOpenOrOpeningOnServer(rt,regionState, sn);
1111           if (regionState != null) {
1112             failedOpenTracker.remove(encodedName); // reset the count, if any
1113             new OpenedRegionHandler(
1114               server, this, regionState.getRegion(), coordination, ord).process();
1115             updateOpenedRegionHandlerTracker(regionState.getRegion());
1116           }
1117           break;
1118 
1119         default:
1120           throw new IllegalStateException("Received event is not valid.");
1121       }
1122     } finally {
1123       lock.unlock();
1124     }
1125   }
1126 
1127   //For unit tests only
1128   boolean wasClosedHandlerCalled(HRegionInfo hri) {
1129     AtomicBoolean b = closedRegionHandlerCalled.get(hri);
1130    // compareAndSet to be sure that unit tests don't see stale values. This means
1131    // we will return true exactly once unless the handler code resets
1132    // this value back to true.
1133     return b == null ? false : b.compareAndSet(true, false);
1134   }
1135 
1136   //For unit tests only
1137   boolean wasOpenedHandlerCalled(HRegionInfo hri) {
1138     AtomicBoolean b = openedRegionHandlerCalled.get(hri);
1139    // compareAndSet to be sure that unit tests don't see stale values. This means
1140    // we will return true exactly once unless the handler code resets
1141    // this value back to true.
1142     return b == null ? false : b.compareAndSet(true, false);
1143   }
1144 
1145   //For unit tests only
1146   void initializeHandlerTrackers() {
1147     closedRegionHandlerCalled = new HashMap<HRegionInfo, AtomicBoolean>();
1148     openedRegionHandlerCalled = new HashMap<HRegionInfo, AtomicBoolean>();
1149   }
1150 
1151   void updateClosedRegionHandlerTracker(HRegionInfo hri) {
1152     if (closedRegionHandlerCalled != null) { //only for unit tests this is true
1153       closedRegionHandlerCalled.put(hri, new AtomicBoolean(true));
1154     }
1155   }
1156 
1157   void updateOpenedRegionHandlerTracker(HRegionInfo hri) {
1158     if (openedRegionHandlerCalled != null) { //only for unit tests this is true
1159       openedRegionHandlerCalled.put(hri, new AtomicBoolean(true));
1160     }
1161   }
1162 
1163   // TODO: processFavoredNodes might throw an exception, e.g., if the
1164   // meta could not be contacted/updated. We need to see how seriously to treat
1165   // this problem. Should we fail the current assignment? We should be able
1166   // to recover from this problem eventually (if the meta couldn't be updated
1167   // things should work normally and eventually get fixed up).
1168   void processFavoredNodes(List<HRegionInfo> regions) throws IOException {
1169     if (!shouldAssignRegionsWithFavoredNodes) return;
1170     // The AM gets the favored nodes info for each region and updates the meta
1171     // table with that info
1172     Map<HRegionInfo, List<ServerName>> regionToFavoredNodes =
1173         new HashMap<HRegionInfo, List<ServerName>>();
1174     for (HRegionInfo region : regions) {
1175       regionToFavoredNodes.put(region,
1176           ((FavoredNodeLoadBalancer)this.balancer).getFavoredNodes(region));
1177     }
1178     FavoredNodeAssignmentHelper.updateMetaWithFavoredNodesInfo(regionToFavoredNodes,
1179       this.server.getConnection());
1180   }
1181 
1182   /**
1183    * Handle a ZK unassigned node transition triggered by HBCK repair tool.
1184    * <p>
1185    * This is handled in a separate code path because it breaks the normal rules.
1186    * @param rt
1187    */
1188   @SuppressWarnings("deprecation")
1189   private void handleHBCK(RegionTransition rt) {
1190     String encodedName = HRegionInfo.encodeRegionName(rt.getRegionName());
1191     LOG.info("Handling HBCK triggered transition=" + rt.getEventType() +
1192       ", server=" + rt.getServerName() + ", region=" +
1193       HRegionInfo.prettyPrint(encodedName));
1194     RegionState regionState = regionStates.getRegionTransitionState(encodedName);
1195     switch (rt.getEventType()) {
1196       case M_ZK_REGION_OFFLINE:
1197         HRegionInfo regionInfo;
1198         if (regionState != null) {
1199           regionInfo = regionState.getRegion();
1200         } else {
1201           try {
1202             byte [] name = rt.getRegionName();
1203             Pair<HRegionInfo, ServerName> p = MetaTableAccessor.getRegion(
1204               this.server.getConnection(), name);
1205             regionInfo = p.getFirst();
1206           } catch (IOException e) {
1207             LOG.info("Exception reading hbase:meta doing HBCK repair operation", e);
1208             return;
1209           }
1210         }
1211         LOG.info("HBCK repair is triggering assignment of region=" +
1212             regionInfo.getRegionNameAsString());
1213         // trigger assign, node is already in OFFLINE so don't need to update ZK
1214         assign(regionInfo, false);
1215         break;
1216 
1217       default:
1218         LOG.warn("Received unexpected region state from HBCK: " + rt.toString());
1219         break;
1220     }
1221 
1222   }
1223 
1224   // ZooKeeper events
1225 
1226   /**
1227    * New unassigned node has been created.
1228    *
1229    * <p>This happens when an RS begins the OPENING or CLOSING of a region by
1230    * creating an unassigned node.
1231    *
1232    * <p>When this happens we must:
1233    * <ol>
1234    *   <li>Watch the node for further events</li>
1235    *   <li>Read and handle the state in the node</li>
1236    * </ol>
1237    */
1238   @Override
1239   public void nodeCreated(String path) {
1240     handleAssignmentEvent(path);
1241   }
1242 
1243   /**
1244    * Existing unassigned node has had data changed.
1245    *
1246    * <p>This happens when an RS transitions from OFFLINE to OPENING, or between
1247    * OPENING/OPENED and CLOSING/CLOSED.
1248    *
1249    * <p>When this happens we must:
1250    * <ol>
1251    *   <li>Watch the node for further events</li>
1252    *   <li>Read and handle the state in the node</li>
1253    * </ol>
1254    */
1255   @Override
1256   public void nodeDataChanged(String path) {
1257     handleAssignmentEvent(path);
1258   }
1259 
1260 
1261   // We  don't want to have two events on the same region managed simultaneously.
1262   // For this reason, we need to wait if an event on the same region is currently in progress.
1263   // So we track the region names of the events in progress, and we keep a waiting list.
1264   private final Set<String> regionsInProgress = new HashSet<String>();
1265   // In a LinkedHashMultimap, the put order is kept when we retrieve the collection back. We need
1266   //  this as we want the events to be managed in the same order as we received them.
1267   private final LinkedHashMultimap <String, RegionRunnable>
1268       zkEventWorkerWaitingList = LinkedHashMultimap.create();
1269 
1270   /**
1271    * A specific runnable that works only on a region.
1272    */
1273   private interface RegionRunnable extends Runnable{
1274     /**
1275      * @return - the name of the region it works on.
1276      */
1277     String getRegionName();
1278   }
1279 
1280   /**
1281    * Submit a task, ensuring that only one task at a time is working on a given region.
1282    * Order is respected.
1283    */
1284   protected void zkEventWorkersSubmit(final RegionRunnable regRunnable) {
1285 
1286     synchronized (regionsInProgress) {
1287       // If there is already a task in progress for this region, we add it to the
1288       //  waiting list and return.
1289       if (regionsInProgress.contains(regRunnable.getRegionName())) {
1290         synchronized (zkEventWorkerWaitingList){
1291           zkEventWorkerWaitingList.put(regRunnable.getRegionName(), regRunnable);
1292         }
1293         return;
1294       }
1295 
1296       // No event in progress on this region => we can submit a new task immediately.
1297       regionsInProgress.add(regRunnable.getRegionName());
1298       zkEventWorkers.submit(new Runnable() {
1299         @Override
1300         public void run() {
1301           try {
1302             regRunnable.run();
1303           } finally {
1304             // now that we have finished, let's see if there is an event for the same region in the
1305             //  waiting list. If so, we can now submit it to the pool.
1306             synchronized (regionsInProgress) {
1307               regionsInProgress.remove(regRunnable.getRegionName());
1308               synchronized (zkEventWorkerWaitingList) {
1309                 java.util.Set<RegionRunnable> waiting = zkEventWorkerWaitingList.get(
1310                     regRunnable.getRegionName());
1311                 if (!waiting.isEmpty()) {
1312                   // We want the first object only. The only way to get it is through an iterator.
1313                   RegionRunnable toSubmit = waiting.iterator().next();
1314                   zkEventWorkerWaitingList.remove(toSubmit.getRegionName(), toSubmit);
1315                   zkEventWorkersSubmit(toSubmit);
1316                 }
1317               }
1318             }
1319           }
1320         }
1321       });
1322     }
1323   }
1324 
1325   @Override
1326   public void nodeDeleted(final String path) {
1327     if (path.startsWith(watcher.assignmentZNode)) {
1328       final String regionName = ZKAssign.getRegionName(watcher, path);
1329       zkEventWorkersSubmit(new RegionRunnable() {
1330         @Override
1331         public String getRegionName() {
1332           return regionName;
1333         }
1334 
1335         @Override
1336         public void run() {
1337           Lock lock = locker.acquireLock(regionName);
1338           try {
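                 // The znode was deleted, which normally means a transition completed.
                 // Look up the in-memory RIT state first; fall back to the last known
                 // state so that MergingNew regions (tracked as offline, not in
                 // transition) are handled as well.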
1339             RegionState rs = regionStates.getRegionTransitionState(regionName);
1340             if (rs == null) {
1341               rs = regionStates.getRegionState(regionName);
1342               if (rs == null || !rs.isMergingNew()) {
1343                 // MergingNew is an offline state
1344                 return;
1345               }
1346             }
1347 
1348             HRegionInfo regionInfo = rs.getRegion();
1349             String regionNameStr = regionInfo.getRegionNameAsString();
1350             LOG.debug("Znode " + regionNameStr + " deleted, state: " + rs);
1351 
1352             boolean disabled = getTableStateManager().isTableState(regionInfo.getTable(),
1353                 ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING);
1354 
1355             ServerName serverName = rs.getServerName();
1356             if (serverManager.isServerOnline(serverName)) {
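                   // Only act while the hosting server is still online; if it is dead,
                   // its regions are presumably picked up by server shutdown handling instead.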
1357               if (rs.isOnServer(serverName) && (rs.isOpened() || rs.isSplitting())) {
1358                 synchronized (regionStates) {
1359                   regionOnline(regionInfo, serverName);
1360                   if (rs.isSplitting() && splitRegions.containsKey(regionInfo)) {
1361                     // Check if the daughter regions are still there; if they are present, offline
1362                     // them, as it's the case of a rollback.
1363                     HRegionInfo hri_a = splitRegions.get(regionInfo).getFirst();
1364                     HRegionInfo hri_b = splitRegions.get(regionInfo).getSecond();
1365                     if (!regionStates.isRegionInTransition(hri_a.getEncodedName())) {
1366                       LOG.warn("Split daughter region not in transition " + hri_a);
1367                     }
1368                     if (!regionStates.isRegionInTransition(hri_b.getEncodedName())) {
1369                       LOG.warn("Split daughter region not in transition " + hri_b);
1370                     }
1371                     regionOffline(hri_a);
1372                     regionOffline(hri_b);
1373                     splitRegions.remove(regionInfo);
1374                   }
1375                   if (disabled) {
1376                     // if server is offline, no hurt to unassign again
1377                     LOG.info("Opened " + regionNameStr
1378                         + " but this table is disabled, triggering close of region");
1379                     unassign(regionInfo);
1380                   }
1381                 }
1382               } else if (rs.isMergingNew()) {
1383                 synchronized (regionStates) {
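                       // The merging znode disappeared while the region is still MergingNew,
                       // likely a rolled-back merge: bring the two parent regions back online.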
1384                   String p = regionInfo.getEncodedName();
1385                   PairOfSameType<HRegionInfo> regions = mergingRegions.get(p);
1386                   if (regions != null) {
1387                     onlineMergingRegion(disabled, regions.getFirst(), serverName);
1388                     onlineMergingRegion(disabled, regions.getSecond(), serverName);
1389                   }
1390                 }
1391               }
1392             }
1393           } finally {
1394             lock.unlock();
1395           }
1396         }
1397 
1398         private void onlineMergingRegion(boolean disabled,
1399             final HRegionInfo hri, final ServerName serverName) {
1400           RegionState regionState = regionStates.getRegionState(hri);
1401           if (regionState != null && regionState.isMerging()
1402               && regionState.isOnServer(serverName)) {
1403             regionOnline(regionState.getRegion(), serverName);
1404             if (disabled) {
1405               unassign(hri);
1406             }
1407           }
1408         }
1409       });
1410     }
1411   }
1412 
1413   /**
1414    * New unassigned node has been created.
1415    *
1416    * <p>This happens when an RS begins the OPENING, SPLITTING or CLOSING of a
1417    * region by creating a znode.
1418    *
1419    * <p>When this happens we must:
1420    * <ol>
1421    *   <li>Watch the node for further children changed events</li>
1422    *   <li>Watch all new children for changed events</li>
1423    * </ol>
1424    */
1425   @Override
1426   public void nodeChildrenChanged(String path) {
1427     if (path.equals(watcher.assignmentZNode)) {
1428       zkEventWorkers.submit(new Runnable() {
1429         @Override
1430         public void run() {
1431           try {
1432             // Just make sure we see the changes for the new znodes
1433             List<String> children =
1434               ZKUtil.listChildrenAndWatchForNewChildren(
1435                 watcher, watcher.assignmentZNode);
1436             if (children != null) {
1437               Stat stat = new Stat();
1438               for (String child : children) {
1439                 // If the region is in transition, we already have a watch
1440                 // on it, so there is no need to watch it again. As far as we know,
1441                 // this is needed to watch splitting nodes only.
1442                 if (!regionStates.isRegionInTransition(child)) {
1443                   ZKAssign.getDataAndWatch(watcher, child, stat);
1444                 }
1445               }
1446             }
1447           } catch (KeeperException e) {
1448             server.abort("Unexpected ZK exception reading unassigned children", e);
1449           }
1450         }
1451       });
1452     }
1453   }
1454 
1455 
1456   /**
1457    * Marks the region as online.  Removes it from regions in transition and
1458    * updates the in-memory assignment information.
1459    * <p>
1460    * Used when a region has been successfully opened on a region server.
1461    * @param regionInfo
1462    * @param sn
1463    */
1464   void regionOnline(HRegionInfo regionInfo, ServerName sn) {
1465     regionOnline(regionInfo, sn, HConstants.NO_SEQNUM);
1466   }
1467 
1468   void regionOnline(HRegionInfo regionInfo, ServerName sn, long openSeqNum) {
1469     numRegionsOpened.incrementAndGet();
1470     regionStates.regionOnline(regionInfo, sn, openSeqNum);
1471 
1472     // Remove plan if one.
1473     clearRegionPlan(regionInfo);
1474     balancer.regionOnline(regionInfo, sn);
1475 
1476     // Tell our listeners that a region was opened
1477     sendRegionOpenedNotification(regionInfo, sn);
1478   }
1479 
1480   /**
1481    * Pass the assignment event to a worker for processing.
1482    * Each worker is a single thread executor service.  The reason
1483    * for just one thread is to make sure all events for a given
1484    * region are processed in order.
1485    *
1486    * @param path
1487    */
1488   private void handleAssignmentEvent(final String path) {
1489     if (path.startsWith(watcher.assignmentZNode)) {
1490       final String regionName = ZKAssign.getRegionName(watcher, path);
1491 
1492       zkEventWorkersSubmit(new RegionRunnable() {
1493         @Override
1494         public String getRegionName() {
1495           return regionName;
1496         }
1497 
1498         @Override
1499         public void run() {
1500           try {
1501             Stat stat = new Stat();
1502             byte [] data = ZKAssign.getDataAndWatch(watcher, path, stat);
1503             if (data == null) return;
1504 
1505             RegionTransition rt = RegionTransition.parseFrom(data);
1506 
1507             // TODO: This code is tied to ZK anyway, so for now leaving it as is,
1508             // will refactor when whole region assignment will be abstracted from ZK
1509             BaseCoordinatedStateManager csm =
1510               (BaseCoordinatedStateManager) server.getCoordinatedStateManager();
1511             OpenRegionCoordination openRegionCoordination = csm.getOpenRegionCoordination();
1512 
1513             ZkOpenRegionCoordination.ZkOpenRegionDetails zkOrd =
1514               new ZkOpenRegionCoordination.ZkOpenRegionDetails();
1515             zkOrd.setVersion(stat.getVersion());
1516             zkOrd.setServerName(csm.getServer().getServerName());
1517 
1518             handleRegion(rt, openRegionCoordination, zkOrd);
1519           } catch (KeeperException e) {
1520             server.abort("Unexpected ZK exception reading unassigned node data", e);
1521           } catch (DeserializationException e) {
1522             server.abort("Unexpected exception deserializing node data", e);
1523           }
1524         }
1525       });
1526     }
1527   }
1528 
1529   /**
1530    * Marks the region as offline.  Removes it from regions in transition and
1531    * removes in-memory assignment information.
1532    * <p>
1533    * Used when a region has been closed and should remain closed.
1534    * @param regionInfo
1535    */
1536   public void regionOffline(final HRegionInfo regionInfo) {
1537     regionOffline(regionInfo, null);
1538   }
1539 
1540   public void offlineDisabledRegion(HRegionInfo regionInfo) {
1541     if (useZKForAssignment) {
1542       // Disabling so should not be reassigned, just delete the CLOSED node
1543       LOG.debug("Table being disabled so deleting ZK node and removing from " +
1544         "regions in transition, skipping assignment of region " +
1545           regionInfo.getRegionNameAsString());
1546       String encodedName = regionInfo.getEncodedName();
1547       deleteNodeInStates(encodedName, "closed", null,
1548         EventType.RS_ZK_REGION_CLOSED, EventType.M_ZK_REGION_OFFLINE);
1549     }
1550     replicasToClose.remove(regionInfo);
1551     regionOffline(regionInfo);
1552   }
1553 
1554   // Assignment methods
1555 
1556   /**
1557    * Assigns the specified region.
1558    * <p>
1559    * If a RegionPlan is available with a valid destination then it will be used
1560    * to determine what server the region is assigned to.  If no RegionPlan is
1561    * available, region will be assigned to a random available server.
1562    * <p>
1563    * Updates the RegionState and sends the OPEN RPC.
1564    * <p>
1565    * This will only succeed if the region is in transition and in a CLOSED or
1566    * OFFLINE state or not in transition (in-memory not zk), and of course, the
1567    * chosen server is up and running (It may have just crashed!).  If the
1568    * in-memory checks pass, the zk node is forced to OFFLINE before assigning.
1569    *
1570    * @param region the region to be assigned
1571    * @param setOfflineInZK whether ZK node should be created/transitioned to an
1572    *                       OFFLINE state before assigning the region
1573    */
1574   public void assign(HRegionInfo region, boolean setOfflineInZK) {
1575     assign(region, setOfflineInZK, false);
1576   }
1577 
1578   /**
1579    * Use care with forceNewPlan. It could cause double assignment.
1580    */
1581   public void assign(HRegionInfo region,
1582       boolean setOfflineInZK, boolean forceNewPlan) {
1583     if (isDisabledorDisablingRegionInRIT(region)) {
1584       return;
1585     }
1586     String encodedName = region.getEncodedName();
1587     Lock lock = locker.acquireLock(encodedName);
1588     try {
1589       RegionState state = forceRegionStateToOffline(region, forceNewPlan);
1590       if (state != null) {
1591         if (regionStates.wasRegionOnDeadServer(encodedName)) {
1592           LOG.info("Skip assigning " + region.getRegionNameAsString()
1593             + ", its host " + regionStates.getLastRegionServerOfRegion(encodedName)
1594             + " is dead but not processed yet");
1595           return;
1596         }
1597         assign(state, setOfflineInZK && useZKForAssignment, forceNewPlan);
1598       }
1599     } finally {
1600       lock.unlock();
1601     }
1602   }
1603 
1604   /**
1605    * Bulk assign regions to <code>destination</code>.
1606    * @param destination
1607    * @param regions Regions to assign.
1608    * @return true if successful
1609    */
1610   boolean assign(final ServerName destination, final List<HRegionInfo> regions)
1611     throws InterruptedException {
1612     long startTime = EnvironmentEdgeManager.currentTime();
1613     try {
1614       int regionCount = regions.size();
1615       if (regionCount == 0) {
1616         return true;
1617       }
1618       LOG.info("Assigning " + regionCount + " region(s) to " + destination.toString());
1619       Set<String> encodedNames = new HashSet<String>(regionCount);
1620       for (HRegionInfo region : regions) {
1621         encodedNames.add(region.getEncodedName());
1622       }
1623 
1624       List<HRegionInfo> failedToOpenRegions = new ArrayList<HRegionInfo>();
1625       Map<String, Lock> locks = locker.acquireLocks(encodedNames);
1626       try {
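             // counter is incremented by OfflineCallback as each region's znode is forced
             // OFFLINE; offlineNodesVersions records the resulting znode versions, which are
             // passed along in the bulk OPEN RPC below.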
1627         AtomicInteger counter = new AtomicInteger(0);
1628         Map<String, Integer> offlineNodesVersions = new ConcurrentHashMap<String, Integer>();
1629         OfflineCallback cb = new OfflineCallback(
1630           watcher, destination, counter, offlineNodesVersions);
1631         Map<String, RegionPlan> plans = new HashMap<String, RegionPlan>(regions.size());
1632         List<RegionState> states = new ArrayList<RegionState>(regions.size());
1633         for (HRegionInfo region : regions) {
1634           String encodedName = region.getEncodedName();
1635           if (!isDisabledorDisablingRegionInRIT(region)) {
1636             RegionState state = forceRegionStateToOffline(region, false);
1637             boolean onDeadServer = false;
1638             if (state != null) {
1639               if (regionStates.wasRegionOnDeadServer(encodedName)) {
1640                 LOG.info("Skip assigning " + region.getRegionNameAsString()
1641                   + ", its host " + regionStates.getLastRegionServerOfRegion(encodedName)
1642                   + " is dead but not processed yet");
1643                 onDeadServer = true;
1644               } else if (!useZKForAssignment
1645                   || asyncSetOfflineInZooKeeper(state, cb, destination)) {
1646                 RegionPlan plan = new RegionPlan(region, state.getServerName(), destination);
1647                 plans.put(encodedName, plan);
1648                 states.add(state);
1649                 continue;
1650               }
1651             }
1652             // Reassign if the region wasn't on a dead server
1653             if (!onDeadServer) {
1654               LOG.info("failed to force region state to offline or "
1655                 + "failed to set it offline in ZK, will reassign later: " + region);
1656               failedToOpenRegions.add(region); // assign individually later
1657             }
1658           }
1659           // Release the lock, this region is excluded from bulk assign because
1660           // we can't update its state, or set its znode to offline.
1661           Lock lock = locks.remove(encodedName);
1662           lock.unlock();
1663         }
1664 
1665         if (useZKForAssignment) {
1666           // Wait until all unassigned nodes have been put up and watchers set.
1667           int total = states.size();
1668           for (int oldCounter = 0; !server.isStopped();) {
1669             int count = counter.get();
1670             if (oldCounter != count) {
1671               LOG.debug(destination.toString() + " unassigned znodes=" + count +
1672                 " of total=" + total + "; oldCounter=" + oldCounter);
1673               oldCounter = count;
1674             }
1675             if (count >= total) break;
1676             Thread.sleep(5);
1677           }
1678         }
1679 
1680         if (server.isStopped()) {
1681           return false;
1682         }
1683 
1684         // Add region plans, so we can updateTimers when one region is opened so
1685         // that unnecessary timeout on RIT is reduced.
1686         this.addPlans(plans);
1687 
1688         List<Triple<HRegionInfo, Integer, List<ServerName>>> regionOpenInfos =
1689           new ArrayList<Triple<HRegionInfo, Integer, List<ServerName>>>(states.size());
1690         for (RegionState state: states) {
1691           HRegionInfo region = state.getRegion();
1692           String encodedRegionName = region.getEncodedName();
1693           Integer nodeVersion = offlineNodesVersions.get(encodedRegionName);
1694           if (useZKForAssignment && (nodeVersion == null || nodeVersion == -1)) {
1695             LOG.warn("failed to offline in zookeeper: " + region);
1696             failedToOpenRegions.add(region); // assign individually later
1697             Lock lock = locks.remove(encodedRegionName);
1698             lock.unlock();
1699           } else {
1700             regionStates.updateRegionState(
1701               region, State.PENDING_OPEN, destination);
1702             List<ServerName> favoredNodes = ServerName.EMPTY_SERVER_LIST;
1703             if (this.shouldAssignRegionsWithFavoredNodes) {
1704               favoredNodes = ((FavoredNodeLoadBalancer)this.balancer).getFavoredNodes(region);
1705             }
1706             regionOpenInfos.add(new Triple<HRegionInfo, Integer,  List<ServerName>>(
1707               region, nodeVersion, favoredNodes));
1708           }
1709         }
1710 
1711         // Move on to open regions.
1712         try {
1713           // Send OPEN RPC. If it fails on a IOE or RemoteException,
1714           // regions will be assigned individually.
1715           long maxWaitTime = System.currentTimeMillis() +
1716             this.server.getConfiguration().
1717               getLong("hbase.regionserver.rpc.startup.waittime", 60000);
1718           for (int i = 1; i <= maximumAttempts && !server.isStopped(); i++) {
1719             try {
1720               // regionOpenInfos is empty if all regions are in failedToOpenRegions list
1721               if (regionOpenInfos.isEmpty()) {
1722                 break;
1723               }
1724               List<RegionOpeningState> regionOpeningStateList = serverManager
1725                 .sendRegionOpen(destination, regionOpenInfos);
1726               if (regionOpeningStateList == null) {
1727                 // Failed getting RPC connection to this server
1728                 return false;
1729               }
1730               for (int k = 0, n = regionOpeningStateList.size(); k < n; k++) {
1731                 RegionOpeningState openingState = regionOpeningStateList.get(k);
1732                 if (openingState != RegionOpeningState.OPENED) {
1733                   HRegionInfo region = regionOpenInfos.get(k).getFirst();
1734                   if (openingState == RegionOpeningState.ALREADY_OPENED) {
1735                     processAlreadyOpenedRegion(region, destination);
1736                   } else if (openingState == RegionOpeningState.FAILED_OPENING) {
1737                     // Failed opening this region, reassign it later
1738                     failedToOpenRegions.add(region);
1739                   } else {
1740                     LOG.warn("THIS SHOULD NOT HAPPEN: unknown opening state "
1741                       + openingState + " in assigning region " + region);
1742                   }
1743                 }
1744               }
1745               break;
1746             } catch (IOException e) {
1747               if (e instanceof RemoteException) {
1748                 e = ((RemoteException)e).unwrapRemoteException();
1749               }
1750               if (e instanceof RegionServerStoppedException) {
1751                 LOG.warn("The region server was shut down", e);
1752                 // No need to retry, the region server is a goner.
1753                 return false;
1754               } else if (e instanceof ServerNotRunningYetException) {
1755                 long now = System.currentTimeMillis();
1756                 if (now < maxWaitTime) {
1757                   LOG.debug("Server is not yet up; waiting up to " +
1758                     (maxWaitTime - now) + "ms", e);
1759                   Thread.sleep(100);
1760                   i--; // reset the try count
1761                   continue;
1762                 }
1763               } else if (e instanceof java.net.SocketTimeoutException
1764                   && this.serverManager.isServerOnline(destination)) {
1765                 // In case socket is timed out and the region server is still online,
1766                 // the openRegion RPC could have been accepted by the server and
1767                 // just the response didn't go through.  So we will retry to
1768                 // open the region on the same server.
1769                 if (LOG.isDebugEnabled()) {
1770                   LOG.debug("Bulk assigner openRegion() to " + destination
1771                     + " has timed out, but the regions might"
1772                     + " already be opened on it.", e);
1773                 }
1774                 // wait and reset the re-try count, server might be just busy.
1775                 Thread.sleep(100);
1776                 i--;
1777                 continue;
1778               }
1779               throw e;
1780             }
1781           }
1782         } catch (IOException e) {
1783           // Can be a socket timeout, EOF, NoRouteToHost, etc
1784           LOG.info("Unable to communicate with " + destination
1785             + " in order to assign regions, ", e);
1786           return false;
1787         }
1788       } finally {
1789         for (Lock lock : locks.values()) {
1790           lock.unlock();
1791         }
1792       }
1793 
1794       if (!failedToOpenRegions.isEmpty()) {
1795         for (HRegionInfo region : failedToOpenRegions) {
1796           if (!regionStates.isRegionOnline(region)) {
1797             invokeAssign(region);
1798           }
1799         }
1800       }
1801 
1802       // wait for assignment completion
1803       ArrayList<HRegionInfo> userRegionSet = new ArrayList<HRegionInfo>(regions.size());
1804       for (HRegionInfo region: regions) {
1805         if (!region.getTable().isSystemTable()) {
1806           userRegionSet.add(region);
1807         }
1808       }
1809       if (!waitForAssignment(userRegionSet, true, userRegionSet.size(),
1810             System.currentTimeMillis())) {
1811         LOG.debug("some user regions are still in transition: " + userRegionSet);
1812       }
1813       LOG.debug("Bulk assigning done for " + destination);
1814       return true;
1815     } finally {
1816       metricsAssignmentManager.updateBulkAssignTime(EnvironmentEdgeManager.currentTime() - startTime);
1817     }
1818   }
1819 
1820   /**
1821    * Send CLOSE RPC if the server is online, otherwise, offline the region.
1822    *
1823    * The RPC will be sent only to the region server found in the region state
1824    * if it is passed in; otherwise, to the src server specified. If the region
1825    * state is not specified, we don't update the region state at all, instead
1826    * we just send the RPC call. This is useful for some cleanup without
1827    * messing around with the region states (see handleRegion, and the scenario
1828    * of a region opened on an unexpected server, for an example).
1829    */
1830   private void unassign(final HRegionInfo region,
1831       final RegionState state, final int versionOfClosingNode,
1832       final ServerName dest, final boolean transitionInZK,
1833       final ServerName src) {
1834     ServerName server = src;
1835     if (state != null) {
1836       server = state.getServerName();
1837     }
1838     long maxWaitTime = -1;
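         // maxWaitTime bounds how long we keep resetting the retry count when the RS
         // reports the region is already in transition (see the handling further below).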
1839     for (int i = 1; i <= this.maximumAttempts; i++) {
1840       if (this.server.isStopped() || this.server.isAborted()) {
1841         LOG.debug("Server stopped/aborted; skipping unassign of " + region);
1842         return;
1843       }
1844       // ClosedRegionHandler can remove the server from this.regions
1845       if (!serverManager.isServerOnline(server)) {
1846         LOG.debug("Offline " + region.getRegionNameAsString()
1847           + ", no need to unassign since it's on a dead server: " + server);
1848         if (transitionInZK) {
1849           // delete the node. if no node exists need not bother.
1850           deleteClosingOrClosedNode(region, server);
1851         }
1852         if (state != null) {
1853           regionOffline(region);
1854         }
1855         return;
1856       }
1857       try {
1858         // Send CLOSE RPC
1859         if (serverManager.sendRegionClose(server, region,
1860           versionOfClosingNode, dest, transitionInZK)) {
1861           LOG.debug("Sent CLOSE to " + server + " for region " +
1862             region.getRegionNameAsString());
1863           if (useZKForAssignment && !transitionInZK && state != null) {
1864             // Retry to make sure the region is
1865             // closed so as to avoid double assignment.
1866             unassign(region, state, versionOfClosingNode,
1867               dest, transitionInZK, src);
1868           }
1869           return;
1870         }
1871         // This never happens. Currently a regionserver close always returns true.
1872         // TODO: this can now happen (0.96) if there is an exception in a coprocessor.
1873         LOG.warn("Server " + server + " region CLOSE RPC returned false for " +
1874           region.getRegionNameAsString());
1875       } catch (Throwable t) {
1876         long sleepTime = 0;
1877         Configuration conf = this.server.getConfiguration();
1878         if (t instanceof RemoteException) {
1879           t = ((RemoteException)t).unwrapRemoteException();
1880         }
1881         boolean logRetries = true;
1882         if (t instanceof RegionServerAbortedException
1883             || t instanceof RegionServerStoppedException
1884             || t instanceof ServerNotRunningYetException) {
1885           // RS is aborting or stopping; we cannot offline the region since the region may need
1886           // to do WAL recovery. Until we see the RS expiration, we should retry.
1887           sleepTime = 1 + conf.getInt(RpcClient.FAILED_SERVER_EXPIRY_KEY,
1888             RpcClient.FAILED_SERVER_EXPIRY_DEFAULT);
1889 
1890         } else if (t instanceof NotServingRegionException) {
1891           LOG.debug("Offline " + region.getRegionNameAsString()
1892             + ", it's not any more on " + server, t);
1893           if (transitionInZK) {
1894             deleteClosingOrClosedNode(region, server);
1895           }
1896           if (state != null) {
1897             regionOffline(region);
1898           }
1899           return;
1900         } else if ((t instanceof FailedServerException) || (state != null &&
1901             t instanceof RegionAlreadyInTransitionException)) {
1902           if(t instanceof FailedServerException) {
1903             sleepTime = 1 + conf.getInt(RpcClient.FAILED_SERVER_EXPIRY_KEY,
1904                   RpcClient.FAILED_SERVER_EXPIRY_DEFAULT);
1905           } else {
1906             // RS is already processing this region, only need to update the timestamp
1907             LOG.debug("Updating the timestamp for " + state);
1908             state.updateTimestampToNow();
1909             if (maxWaitTime < 0) {
1910               maxWaitTime =
1911                   EnvironmentEdgeManager.currentTime()
1912                       + conf.getLong(ALREADY_IN_TRANSITION_WAITTIME,
1913                         DEFAULT_ALREADY_IN_TRANSITION_WAITTIME);
1914             }
1915             long now = EnvironmentEdgeManager.currentTime();
1916             if (now < maxWaitTime) {
1917               LOG.debug("Region is already in transition; "
1918                 + "waiting up to " + (maxWaitTime - now) + "ms", t);
1919               sleepTime = 100;
1920               i--; // reset the try count
1921               logRetries = false;
1922             }
1923           }
1924         }
1925 
1926         try {
1927           if (sleepTime > 0) {
1928             Thread.sleep(sleepTime);
1929           }
1930         } catch (InterruptedException ie) {
1931           LOG.warn("Failed to unassign "
1932             + region.getRegionNameAsString() + " since interrupted", ie);
1933           Thread.currentThread().interrupt();
1934           if (state != null) {
1935             regionStates.updateRegionState(region, State.FAILED_CLOSE);
1936           }
1937           return;
1938         }
1939 
1940         if (logRetries) {
1941           LOG.info("Server " + server + " returned " + t + " for "
1942             + region.getRegionNameAsString() + ", try=" + i
1943             + " of " + this.maximumAttempts, t);
1944           // Presume retry or server will expire.
1945         }
1946       }
1947     }
1948     // Run out of attempts
1949     if (state != null) {
1950       regionStates.updateRegionState(region, State.FAILED_CLOSE);
1951     }
1952   }
1953 
1954   /**
1955    * Set region to OFFLINE unless it is opening and forceNewPlan is false.
1956    */
1957   private RegionState forceRegionStateToOffline(
1958       final HRegionInfo region, final boolean forceNewPlan) {
1959     RegionState state = regionStates.getRegionState(region);
1960     if (state == null) {
1961       LOG.warn("Assigning but not in region states: " + region);
1962       state = regionStates.createRegionState(region);
1963     }
1964 
1965     ServerName sn = state.getServerName();
1966     if (forceNewPlan && LOG.isDebugEnabled()) {
1967       LOG.debug("Force region state offline " + state);
1968     }
1969 
1970     switch (state.getState()) {
1971     case OPEN:
1972     case OPENING:
1973     case PENDING_OPEN:
1974     case CLOSING:
1975     case PENDING_CLOSE:
1976       if (!forceNewPlan) {
1977         LOG.debug("Skip assigning " +
1978           region + ", it is already " + state);
1979         return null;
1980       }
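           // Intentional fall-through when forceNewPlan is set: close the region first.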
1981     case FAILED_CLOSE:
1982     case FAILED_OPEN:
1983       unassign(region, state, -1, null, false, null);
1984       state = regionStates.getRegionState(region);
1985       if (state.isFailedClose()) {
1986         // If we can't close the region, we can't re-assign
1987         // it so as to avoid possible double assignment/data loss.
1988         LOG.info("Skip assigning " +
1989           region + ", we couldn't close it: " + state);
1990         return null;
1991       }
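           // Intentional fall-through: the close did not fail, so continue with the
           // dead-server check below.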
1992     case OFFLINE:
1993       // This region could have been open on this server
1994       // for a while. If the server is dead and not processed
1995       // yet, we can move on only if the meta shows the
1996       // region is not on this server actually, or on a server
1997       // not dead, or dead and processed already.
1998       // In case not using ZK, we don't need this check because
1999       // we have the latest info in memory, and the caller
2000       // will do another round checking any way.
2001       if (useZKForAssignment
2002           && regionStates.isServerDeadAndNotProcessed(sn)
2003           && wasRegionOnDeadServerByMeta(region, sn)) {
2004         if (!regionStates.isRegionInTransition(region)) {
2005           LOG.info("Updating the state to " + State.OFFLINE + " to allow it to be reassigned by SSH");
2006           regionStates.updateRegionState(region, State.OFFLINE);
2007         }
2008         LOG.info("Skip assigning " + region.getRegionNameAsString()
2009             + ", it is on a dead server that has not been processed yet: " + sn);
2010         return null;
2011       }
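           // Fall through to CLOSED: the region can be assigned.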
2012     case CLOSED:
2013       break;
2014     default:
2015       LOG.error("Trying to assign region " + region
2016         + ", which is " + state);
2017       return null;
2018     }
2019     return state;
2020   }
2021 
2022   @SuppressWarnings("deprecation")
2023   protected boolean wasRegionOnDeadServerByMeta(
2024       final HRegionInfo region, final ServerName sn) {
2025     try {
2026       if (region.isMetaRegion()) {
2027         ServerName server = this.server.getMetaTableLocator().
2028           getMetaRegionLocation(this.server.getZooKeeper());
2029         return regionStates.isServerDeadAndNotProcessed(server);
2030       }
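           // For a user region, keep reading its location from hbase:meta (retrying on
           // IOExceptions) until we get an answer or the master is stopped.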
2031       while (!server.isStopped()) {
2032         try {
2033           this.server.getMetaTableLocator().waitMetaRegionLocation(server.getZooKeeper());
2034           Result r = MetaTableAccessor.getRegionResult(server.getConnection(),
2035             region.getRegionName());
2036           if (r == null || r.isEmpty()) return false;
2037           ServerName server = HRegionInfo.getServerName(r);
2038           return regionStates.isServerDeadAndNotProcessed(server);
2039         } catch (IOException ioe) {
2040           LOG.info("Received exception accessing hbase:meta during force assign "
2041             + region.getRegionNameAsString() + ", retrying", ioe);
2042         }
2043       }
2044     } catch (InterruptedException e) {
2045       Thread.currentThread().interrupt();
2046       LOG.info("Interrupted accessing hbase:meta", e);
2047     }
2048     // Call is interrupted or server is stopped.
2049     return regionStates.isServerDeadAndNotProcessed(sn);
2050   }
2051 
2052   /**
2053    * Caller must hold lock on the passed <code>state</code> object.
2054    * @param state
2055    * @param setOfflineInZK
2056    * @param forceNewPlan
2057    */
2058   private void assign(RegionState state,
2059       boolean setOfflineInZK, final boolean forceNewPlan) {
2060     long startTime = EnvironmentEdgeManager.currentTime();
2061     try {
2062       Configuration conf = server.getConfiguration();
2063       RegionState currentState = state;
2064       int versionOfOfflineNode = -1;
2065       RegionPlan plan = null;
2066       long maxWaitTime = -1;
2067       HRegionInfo region = state.getRegion();
2068       RegionOpeningState regionOpenState;
2069       Throwable previousException = null;
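           // previousException is checked after a failed attempt: if the new plan picks the
           // same destination and the last failure was a FailedServerException, we sleep for
           // the failed-server expiry interval before retrying (see below).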
2070       for (int i = 1; i <= maximumAttempts; i++) {
2071         if (server.isStopped() || server.isAborted()) {
2072           LOG.info("Skip assigning " + region.getRegionNameAsString()
2073             + ", the server is stopped/aborted");
2074           return;
2075         }
2076 
2077         if (plan == null) { // Get a server for the region at first
2078           try {
2079             plan = getRegionPlan(region, forceNewPlan);
2080           } catch (HBaseIOException e) {
2081             LOG.warn("Failed to get region plan", e);
2082           }
2083         }
2084 
2085         if (plan == null) {
2086           LOG.warn("Unable to determine a plan to assign " + region);
2087 
2088           // For meta region, we have to keep retrying until succeeding
2089           if (region.isMetaRegion()) {
2090             if (i == maximumAttempts) {
2091               i = 0; // re-set attempt count to 0 for at least 1 retry
2092 
2093               LOG.warn("Unable to determine a plan to assign a hbase:meta region " + region +
2094                 " after maximumAttempts (" + this.maximumAttempts +
2095                 "). Reset attempts count and continue retrying.");
2096             }
2097             waitForRetryingMetaAssignment();
2098             continue;
2099           }
2100 
2101           regionStates.updateRegionState(region, State.FAILED_OPEN);
2102           return;
2103         }
2104         if (setOfflineInZK && versionOfOfflineNode == -1) {
2105           LOG.info("Setting node as OFFLINED in ZooKeeper for region " + region);
2106           // get the version of the znode after setting it to OFFLINE.
2107           // versionOfOfflineNode will be -1 if the znode was not set to OFFLINE
2108           versionOfOfflineNode = setOfflineInZooKeeper(currentState, plan.getDestination());
2109           if (versionOfOfflineNode != -1) {
2110             if (isDisabledorDisablingRegionInRIT(region)) {
2111               return;
2112             }
2113             // In case of assignment from EnableTableHandler, the table state is ENABLING.
2114             // Anyhow, EnableTableHandler will set ENABLED after assigning all the table regions.
2115             // If we try to set ENABLED directly then the client API may think the table is enabled.
2116             // When all the regions are added directly into hbase:meta and we call assignRegion,
2117             // then we need to make the table ENABLED. Hence in such a case the table
2118             // will not be in ENABLING or ENABLED state.
2119             TableName tableName = region.getTable();
2120             if (!tableStateManager.isTableState(tableName,
2121               ZooKeeperProtos.Table.State.ENABLED, ZooKeeperProtos.Table.State.ENABLING)) {
2122               LOG.debug("Setting table " + tableName + " to ENABLED state.");
2123               setEnabledTable(tableName);
2124             }
2125           }
2126         }
2127         if (setOfflineInZK && versionOfOfflineNode == -1) {
2128           LOG.info("Unable to set offline in ZooKeeper to assign " + region);
2129           // Setting offline in ZK must have failed due to ZK racing or some
2130           // exception which may make the server abort. If it is ZK racing,
2131           // we should retry since we already reset the region state,
2132           // existing (re)assignment will fail anyway.
2133           if (!server.isAborted()) {
2134             continue;
2135           }
2136         }
2137         LOG.info("Assigning " + region.getRegionNameAsString() +
2138             " to " + plan.getDestination().toString());
2139         // Transition RegionState to PENDING_OPEN
2140         currentState = regionStates.updateRegionState(region,
2141           State.PENDING_OPEN, plan.getDestination());
2142 
2143         boolean needNewPlan;
2144         final String assignMsg = "Failed assignment of " + region.getRegionNameAsString() +
2145             " to " + plan.getDestination();
2146         try {
2147           List<ServerName> favoredNodes = ServerName.EMPTY_SERVER_LIST;
2148           if (this.shouldAssignRegionsWithFavoredNodes) {
2149             favoredNodes = ((FavoredNodeLoadBalancer)this.balancer).getFavoredNodes(region);
2150           }
2151           regionOpenState = serverManager.sendRegionOpen(
2152               plan.getDestination(), region, versionOfOfflineNode, favoredNodes);
2153 
2154           if (regionOpenState == RegionOpeningState.FAILED_OPENING) {
2155             // Failed opening this region, looping again on a new server.
2156             needNewPlan = true;
2157             LOG.warn(assignMsg + ", regionserver says 'FAILED_OPENING', " +
2158                 "trying to assign elsewhere instead; " +
2159                 "try=" + i + " of " + this.maximumAttempts);
2160           } else {
2161             // we're done
2162             if (regionOpenState == RegionOpeningState.ALREADY_OPENED) {
2163               processAlreadyOpenedRegion(region, plan.getDestination());
2164             }
2165             return;
2166           }
2167 
2168         } catch (Throwable t) {
2169           if (t instanceof RemoteException) {
2170             t = ((RemoteException) t).unwrapRemoteException();
2171           }
2172           previousException = t;
2173 
2174           // Should we wait a little before retrying? If the server is starting it's yes.
2175           // If the region is already in transition, it's yes as well: we want to be sure that
2176           //  the region will get opened but we don't want a double assignment.
2177           boolean hold = (t instanceof RegionAlreadyInTransitionException ||
2178               t instanceof ServerNotRunningYetException);
2179 
2180           // In case socket is timed out and the region server is still online,
2181           // the openRegion RPC could have been accepted by the server and
2182           // just the response didn't go through.  So we will retry to
2183           // open the region on the same server to avoid possible
2184           // double assignment.
2185           boolean retry = !hold && (t instanceof java.net.SocketTimeoutException
2186               && this.serverManager.isServerOnline(plan.getDestination()));
2187 
2188 
2189           if (hold) {
2190             LOG.warn(assignMsg + ", waiting a little before trying on the same region server " +
2191               "try=" + i + " of " + this.maximumAttempts, t);
2192 
2193             if (maxWaitTime < 0) {
2194               if (t instanceof RegionAlreadyInTransitionException) {
2195                 maxWaitTime = EnvironmentEdgeManager.currentTime()
2196                   + this.server.getConfiguration().getLong(ALREADY_IN_TRANSITION_WAITTIME,
2197                     DEFAULT_ALREADY_IN_TRANSITION_WAITTIME);
2198               } else {
2199                 maxWaitTime = EnvironmentEdgeManager.currentTime()
2200                   + this.server.getConfiguration().getLong(
2201                     "hbase.regionserver.rpc.startup.waittime", 60000);
2202               }
2203             }
2204             try {
2205               needNewPlan = false;
2206               long now = EnvironmentEdgeManager.currentTime();
2207               if (now < maxWaitTime) {
2208                 LOG.debug("Server is not yet up or region is already in transition; "
2209                   + "waiting up to " + (maxWaitTime - now) + "ms", t);
2210                 Thread.sleep(100);
2211                 i--; // reset the try count
2212               } else if (!(t instanceof RegionAlreadyInTransitionException)) {
2213                 LOG.debug("Server is not up for a while; try a new one", t);
2214                 needNewPlan = true;
2215               }
2216             } catch (InterruptedException ie) {
2217               LOG.warn("Failed to assign "
2218                   + region.getRegionNameAsString() + " since interrupted", ie);
2219               regionStates.updateRegionState(region, State.FAILED_OPEN);
2220               Thread.currentThread().interrupt();
2221               return;
2222             }
2223           } else if (retry) {
2224             needNewPlan = false;
2225             i--; // we want to retry as many times as needed as long as the RS is not dead.
2226             LOG.warn(assignMsg + ", trying to assign to the same region server due to a socket timeout", t);
2227           } else {
2228             needNewPlan = true;
2229             LOG.warn(assignMsg + ", trying to assign elsewhere instead;" +
2230                 " try=" + i + " of " + this.maximumAttempts, t);
2231           }
2232         }
2233 
2234         if (i == this.maximumAttempts) {
2235           // For meta region, we have to keep retrying until succeeding
2236           if (region.isMetaRegion()) {
2237             i = 0; // re-set attempt count to 0 for at least 1 retry
2238             LOG.warn(assignMsg +
2239                 ", assigning a hbase:meta region has reached maximumAttempts (" +
2240                 this.maximumAttempts + ").  Resetting attempt count and continuing to retry.");
2241             waitForRetryingMetaAssignment();
2242           }
2243           else {
2244             // Don't reset the region state or get a new plan any more.
2245             // This is the last try.
2246             continue;
2247           }
2248         }
2249 
2250         // If region opened on destination of present plan, reassigning to new
2251         // RS may cause double assignments. In case of RegionAlreadyInTransitionException
2252         // reassigning to same RS.
2253         if (needNewPlan) {
2254           // Force a new plan and reassign. Will return null if no servers.
2255           // The new plan could be the same as the existing plan since we don't
2256           // exclude the server of the original plan, which should not be
2257           // excluded since it could be the only server up now.
2258           RegionPlan newPlan = null;
2259           try {
2260             newPlan = getRegionPlan(region, true);
2261           } catch (HBaseIOException e) {
2262             LOG.warn("Failed to get region plan", e);
2263           }
2264           if (newPlan == null) {
2265             regionStates.updateRegionState(region, State.FAILED_OPEN);
2266             LOG.warn("Unable to find a viable location to assign region " +
2267                 region.getRegionNameAsString());
2268             return;
2269           }
2270 
2271           if (plan != newPlan && !plan.getDestination().equals(newPlan.getDestination())) {
2272             // Clean out plan we failed execute and one that doesn't look like it'll
2273             // succeed anyways; we need a new plan!
2274             // Transition back to OFFLINE
2275             LOG.info("Region assignment plan changed from " + plan.getDestination() + " to "
2276                 + newPlan.getDestination() + " server.");
2277             currentState = regionStates.updateRegionState(region, State.OFFLINE);
2278             versionOfOfflineNode = -1;
2279             if (useZKForAssignment) {
2280               setOfflineInZK = true;
2281             }
2282             plan = newPlan;
2283           } else if(plan.getDestination().equals(newPlan.getDestination()) &&
2284               previousException instanceof FailedServerException) {
2285             try {
2286               LOG.info("Trying to re-assign " + region.getRegionNameAsString() +
2287                 " to the same failed server.");
2288               Thread.sleep(1 + conf.getInt(RpcClient.FAILED_SERVER_EXPIRY_KEY,
2289                 RpcClient.FAILED_SERVER_EXPIRY_DEFAULT));
2290             } catch (InterruptedException ie) {
2291               LOG.warn("Failed to assign "
2292                   + region.getRegionNameAsString() + " since interrupted", ie);
2293               regionStates.updateRegionState(region, State.FAILED_OPEN);
2294               Thread.currentThread().interrupt();
2295               return;
2296             }
2297           }
2298         }
2299       }
2300       // Run out of attempts
2301       regionStates.updateRegionState(region, State.FAILED_OPEN);
2302     } finally {
2303       metricsAssignmentManager.updateAssignmentTime(EnvironmentEdgeManager.currentTime() - startTime);
2304     }
2305   }
2306 
2307   private void processAlreadyOpenedRegion(HRegionInfo region, ServerName sn) {
2308     // Remove region from in-memory transition and unassigned node from ZK
2309     // While trying to enable the table the regions of the table were
2310     // already enabled.
2311     LOG.debug("ALREADY_OPENED " + region.getRegionNameAsString()
2312       + " to " + sn);
2313     String encodedName = region.getEncodedName();
2314     
2315     // If using ZK for assignment, the region's ALREADY_OPENED event should not be
2316     // handled here; leave it to the ZK event. See HBASE-14407.
2317     if (useZKForAssignment) {
2318       String node = ZKAssign.getNodeName(watcher, encodedName);
2319       Stat stat = new Stat();
2320       try {
2321         byte[] existingBytes = ZKUtil.getDataNoWatch(watcher, node, stat);
2322         if (existingBytes != null) {
2323           RegionTransition rt = RegionTransition.parseFrom(existingBytes);
2324           EventType et = rt.getEventType();
2325           if (et.equals(EventType.RS_ZK_REGION_OPENED)) {
2326             LOG.debug("ALREADY_OPENED " + region.getRegionNameAsString()
2327               + " and node in " + et + " state");
2328             return;
2329           }
2330         }
2331       } catch (KeeperException ke) {
2332         LOG.warn("Unexpected ZK exception getData " + node
2333           + " node for the region " + encodedName, ke);
2334       } catch (DeserializationException e) {
2335         LOG.warn("Get RegionTransition from zk deserialization failed! ", e);
2336       }
2337       
2338       deleteNodeInStates(encodedName, "offline", sn, EventType.M_ZK_REGION_OFFLINE);
2339     }
2340     
2341     regionStates.regionOnline(region, sn);
2342   }
2343 
2344   private boolean isDisabledorDisablingRegionInRIT(final HRegionInfo region) {
2345     if (this.tableStateManager.isTableState(region.getTable(),
2346         ZooKeeperProtos.Table.State.DISABLED,
2347         ZooKeeperProtos.Table.State.DISABLING) || replicasToClose.contains(region)) {
2348       LOG.info("Table " + region.getTable() + " is disabled or disabling;"
2349         + " skipping assign of " + region.getRegionNameAsString());
2350       offlineDisabledRegion(region);
2351       return true;
2352     }
2353     return false;
2354   }
2355 
2356   /**
2357    * Set region as OFFLINED up in zookeeper
2358    *
2359    * @param state
2360    * @return the version of the offline node if setting of the OFFLINE node was
2361    *         successful, -1 otherwise.
2362    */
2363   private int setOfflineInZooKeeper(final RegionState state, final ServerName destination) {
2364     if (!state.isClosed() && !state.isOffline()) {
2365       String msg = "Unexpected state: " + state + ". Cannot transition it to OFFLINE.";
2366       this.server.abort(msg, new IllegalStateException(msg));
2367       return -1;
2368     }
2369     regionStates.updateRegionState(state.getRegion(), State.OFFLINE);
2370     int versionOfOfflineNode;
2371     try {
2372       // get the version after setting the znode to OFFLINE
2373       versionOfOfflineNode = ZKAssign.createOrForceNodeOffline(watcher,
2374         state.getRegion(), destination);
2375       if (versionOfOfflineNode == -1) {
2376         LOG.warn("Attempted to create/force node into OFFLINE state before "
2377             + "completing assignment but failed to do so for " + state);
2378         return -1;
2379       }
2380     } catch (KeeperException e) {
2381       server.abort("Unexpected ZK exception creating/setting node OFFLINE", e);
2382       return -1;
2383     }
2384     return versionOfOfflineNode;
2385   }
2386 
2387   /**
2388    * @param region the region to assign
2389    * @return Plan for passed <code>region</code> (If none currently, it creates one or
2390    * if no servers to assign, it returns null).
2391    */
2392   private RegionPlan getRegionPlan(final HRegionInfo region,
2393       final boolean forceNewPlan)  throws HBaseIOException {
2394     return getRegionPlan(region, null, forceNewPlan);
2395   }
2396 
2397   /**
2398    * @param region the region to assign
2399    * @param serverToExclude Server to exclude (we know it's bad). Pass null if
2400    * all servers are thought to be assignable.
2401    * @param forceNewPlan If true, then if an existing plan exists, a new plan
2402    * will be generated.
2403    * @return Plan for passed <code>region</code> (If none currently, it creates one or
2404    * if no servers to assign, it returns null).
2405    */
2406   private RegionPlan getRegionPlan(final HRegionInfo region,
2407       final ServerName serverToExclude, final boolean forceNewPlan) throws HBaseIOException {
2408     // Pickup existing plan or make a new one
2409     final String encodedName = region.getEncodedName();
2410     final List<ServerName> destServers =
2411       serverManager.createDestinationServersList(serverToExclude);
2412 
2413     if (destServers.isEmpty()){
2414       LOG.warn("Can't move " + encodedName +
2415         ", there is no destination server available.");
2416       return null;
2417     }
2418 
2419     RegionPlan randomPlan = null;
2420     boolean newPlan = false;
2421     RegionPlan existingPlan;
2422 
2423     synchronized (this.regionPlans) {
2424       existingPlan = this.regionPlans.get(encodedName);
2425 
2426       if (existingPlan != null && existingPlan.getDestination() != null) {
2427         LOG.debug("Found an existing plan for " + region.getRegionNameAsString()
2428           + " destination server is " + existingPlan.getDestination() +
2429             " accepted as a dest server = " + destServers.contains(existingPlan.getDestination()));
2430       }
2431 
2432       if (forceNewPlan
2433           || existingPlan == null
2434           || existingPlan.getDestination() == null
2435           || !destServers.contains(existingPlan.getDestination())) {
2436         newPlan = true;
2437       }
2438     }
2439 
2440     if (newPlan) {
2441       ServerName destination = balancer.randomAssignment(region, destServers);
2442       if (destination == null) {
2443         LOG.warn("Can't find a destination for " + encodedName);
2444         return null;
2445       }
2446       synchronized (this.regionPlans) {
2447         randomPlan = new RegionPlan(region, null, destination);
2448         if (!region.isMetaTable() && shouldAssignRegionsWithFavoredNodes) {
2449           List<HRegionInfo> regions = new ArrayList<HRegionInfo>(1);
2450           regions.add(region);
2451           try {
2452             processFavoredNodes(regions);
2453           } catch (IOException ie) {
2454             LOG.warn("Ignoring exception in processFavoredNodes " + ie);
2455           }
2456         }
2457         this.regionPlans.put(encodedName, randomPlan);
2458       }
2459       LOG.debug("No previous transition plan found (or ignoring " + "an existing plan) for "
2460           + region.getRegionNameAsString() + "; generated random plan=" + randomPlan + "; "
2461           + destServers.size() + " (online=" + serverManager.getOnlineServers().size()
2462           + ") available servers, forceNewPlan=" + forceNewPlan);
2463       return randomPlan;
2464     }
2465     LOG.debug("Using pre-existing plan for " +
2466       region.getRegionNameAsString() + "; plan=" + existingPlan);
2467     return existingPlan;
2468   }
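  // Illustrative call pattern (a sketch only; the caller shown here is hypothetical).
  // Assign-path code in this class asks for a plan and must handle a null return,
  // which means no destination server is currently available:
  //
  //   RegionPlan plan = getRegionPlan(hri, null, false);
  //   if (plan == null) {
  //     // No destination server available; back off and retry the assign later.
  //   }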
2469 
2470   /**
2471    * Wait for some time before retrying meta table region assignment
2472    */
2473   private void waitForRetryingMetaAssignment() {
2474     try {
2475       Thread.sleep(this.sleepTimeBeforeRetryingMetaAssignment);
2476     } catch (InterruptedException e) {
2477       LOG.error("Got exception while waiting for hbase:meta assignment");
2478       Thread.currentThread().interrupt();
2479     }
2480   }
2481 
2482   /**
2483    * Unassigns the specified region.
2484    * <p>
2485    * Updates the RegionState and sends the CLOSE RPC unless the region is being
2486    * split by a regionserver; then the unassign fails (silently) because we
2487    * presume the region being unassigned no longer exists (it's been split out
2488    * of existence). TODO: What to do if split fails and is rolled back and
2489    * parent is revivified?
2490    * <p>
2491    * If a RegionPlan is already set, it will remain.
2492    *
2493    * @param region region to be unassigned
2494    */
2495   public void unassign(HRegionInfo region) {
2496     unassign(region, false);
2497   }
2498 
2499 
2500   /**
2501    * Unassigns the specified region.
2502    * <p>
2503    * Updates the RegionState and sends the CLOSE RPC unless the region is being
2504    * split by a regionserver; then the unassign fails (silently) because we
2505    * presume the region being unassigned no longer exists (it's been split out
2506    * of existence). TODO: What to do if split fails and is rolled back and
2507    * parent is revivified?
2508    * <p>
2509    * If a RegionPlan is already set, it will remain.
2510    *
2511    * @param region region to be unassigned
2512    * @param force if region should be closed even if already closing
2513    */
2514   public void unassign(HRegionInfo region, boolean force, ServerName dest) {
2515     // TODO: Method needs refactoring.  Ugly buried returns throughout.  Beware!
2516     LOG.debug("Starting unassign of " + region.getRegionNameAsString()
2517       + " (offlining), current state: " + regionStates.getRegionState(region));
2518 
2519     String encodedName = region.getEncodedName();
2520     // Grab the state of this region and synchronize on it
2521     int versionOfClosingNode = -1;
2522     // We need a lock here as we're going to do a put later and we don't want multiple states
2523     //  creation
2524     ReentrantLock lock = locker.acquireLock(encodedName);
2525     RegionState state = regionStates.getRegionTransitionState(encodedName);
2526     boolean reassign = true;
2527     try {
2528       if (state == null) {
2529         // Region is not in transition.
2530         // We can unassign it only if it's not SPLIT/MERGED.
2531         state = regionStates.getRegionState(encodedName);
2532         if (state != null && state.isUnassignable()) {
2533           LOG.info("Attempting to unassign " + state + ", ignored");
2534           // Offline region will be reassigned below
2535           return;
2536         }
2537         // Create the znode in CLOSING state
2538         try {
2539           if (state == null || state.getServerName() == null) {
2540             // We don't know where the region is, offline it.
2541             // No need to send CLOSE RPC
2542             LOG.warn("Attempting to unassign a region not in RegionStates "
2543               + region.getRegionNameAsString() + ", offlined");
2544             regionOffline(region);
2545             return;
2546           }
2547           if (useZKForAssignment) {
2548             versionOfClosingNode = ZKAssign.createNodeClosing(
2549               watcher, region, state.getServerName());
2550             if (versionOfClosingNode == -1) {
2551               LOG.info("Attempting to unassign " +
2552                 region.getRegionNameAsString() + " but ZK closing node "
2553                 + "can't be created.");
2554               reassign = false; // not unassigned at all
2555               return;
2556             }
2557           }
2558         } catch (KeeperException e) {
2559           if (e instanceof NodeExistsException) {
2560             // Handle race between master initiated close and regionserver
2561             // orchestrated splitting. See if existing node is in a
2562             // SPLITTING or SPLIT state.  If so, the regionserver started
2563             // an op on node before we could get our CLOSING in.  Deal.
2564             NodeExistsException nee = (NodeExistsException)e;
2565             String path = nee.getPath();
2566             try {
2567               if (isSplitOrSplittingOrMergedOrMerging(path)) {
2568                 LOG.debug(path + " is SPLIT or SPLITTING or MERGED or MERGING; " +
2569                   "skipping unassign because region no longer exists -- it's split or merged");
2570                 reassign = false; // no need to reassign for split/merged region
2571                 return;
2572               }
2573             } catch (KeeperException.NoNodeException ke) {
2574               LOG.warn("Failed getData on SPLITTING/SPLIT at " + path +
2575                 "; presuming split and that the region to unassign, " +
2576                 encodedName + ", no longer exists -- confirm", ke);
2577               return;
2578             } catch (KeeperException ke) {
2579               LOG.error("Unexpected zk state", ke);
2580             } catch (DeserializationException de) {
2581               LOG.error("Failed parse", de);
2582             }
2583           }
2584           // If we get here, we don't understand what's going on -- abort.
2585           server.abort("Unexpected ZK exception creating node CLOSING", e);
2586           reassign = false; // heading out already
2587           return;
2588         }
2589         state = regionStates.updateRegionState(region, State.PENDING_CLOSE);
2590       } else if (state.isFailedOpen()) {
2591         // The region is not open yet
2592         regionOffline(region);
2593         return;
2594       } else if (force && state.isPendingCloseOrClosing()) {
2595         LOG.debug("Attempting to unassign " + region.getRegionNameAsString() +
2596           " which is already " + state.getState()  +
2597           " but forcing another CLOSE RPC to be sent");
2598         if (state.isFailedClose()) {
2599           state = regionStates.updateRegionState(region, State.PENDING_CLOSE);
2600         }
2601         state.updateTimestampToNow();
2602       } else {
2603         LOG.debug("Attempting to unassign " +
2604           region.getRegionNameAsString() + " but it is " +
2605           "already in transition (" + state.getState() + ", force=" + force + ")");
2606         return;
2607       }
2608 
2609       unassign(region, state, versionOfClosingNode, dest, useZKForAssignment, null);
2610     } finally {
2611       lock.unlock();
2612 
2613       // Region is expected to be reassigned afterwards
2614       if (!replicasToClose.contains(region) && reassign && regionStates.isRegionOffline(region)) {
2615         assign(region, true);
2616       }
2617     }
2618   }
2619 
2620   public void unassign(HRegionInfo region, boolean force){
2621      unassign(region, force, null);
2622   }
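  // Illustrative usage (sketch only): balance(RegionPlan), later in this class, records
  // the plan and then calls the ServerName-taking overload so the region is closed and
  // subsequently reassigned to the planned destination:
  //
  //   this.regionPlans.put(plan.getRegionName(), plan);
  //   unassign(hri, false, plan.getDestination());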
2623 
2624   /**
2625    * @param region regioninfo of znode to be deleted.
2626    */
2627   public void deleteClosingOrClosedNode(HRegionInfo region, ServerName sn) {
2628     String encodedName = region.getEncodedName();
2629     deleteNodeInStates(encodedName, "closing", sn, EventType.M_ZK_REGION_CLOSING,
2630       EventType.RS_ZK_REGION_CLOSED);
2631   }
2632 
2633   /**
2634    * @param path znode path to check
2635    * @return True if the znode is in SPLIT, SPLITTING, MERGED or MERGING state.
2636    * @throws KeeperException Can happen if the znode went away in the meantime.
2637    * @throws DeserializationException
2638    */
2639   private boolean isSplitOrSplittingOrMergedOrMerging(final String path)
2640       throws KeeperException, DeserializationException {
2641     boolean result = false;
2642     // This may fail if the SPLIT or SPLITTING or MERGED or MERGING znode gets
2643     // cleaned up before we can get data from it.
2644     byte [] data = ZKAssign.getData(watcher, path);
2645     if (data == null) {
2646       LOG.info("Node " + path + " is gone");
2647       return false;
2648     }
2649     RegionTransition rt = RegionTransition.parseFrom(data);
2650     switch (rt.getEventType()) {
2651     case RS_ZK_REQUEST_REGION_SPLIT:
2652     case RS_ZK_REGION_SPLIT:
2653     case RS_ZK_REGION_SPLITTING:
2654     case RS_ZK_REQUEST_REGION_MERGE:
2655     case RS_ZK_REGION_MERGED:
2656     case RS_ZK_REGION_MERGING:
2657       result = true;
2658       break;
2659     default:
2660       LOG.info("Node " + path + " is in " + rt.getEventType());
2661       break;
2662     }
2663     return result;
2664   }
2665 
2666   /**
2667    * Used by unit tests. Return the number of regions opened so far in the life
2668    * of the master. Increases by one every time the master opens a region.
2669    * @return the counter value of the number of regions opened so far
2670    */
2671   public int getNumRegionsOpened() {
2672     return numRegionsOpened.get();
2673   }
2674 
2675   /**
2676    * Waits until the specified region has completed assignment.
2677    * <p>
2678    * If the region is already assigned, returns immediately.  Otherwise, method
2679    * blocks until the region is assigned.
2680    * @param regionInfo region to wait on assignment for
2681    * @return true if the region is assigned, false otherwise.
2682    * @throws InterruptedException
2683    */
2684   public boolean waitForAssignment(HRegionInfo regionInfo)
2685       throws InterruptedException {
2686     ArrayList<HRegionInfo> regionSet = new ArrayList<HRegionInfo>(1);
2687     regionSet.add(regionInfo);
2688     return waitForAssignment(regionSet, true, Long.MAX_VALUE);
2689   }
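  // Illustrative usage (sketch only; the caller is hypothetical): wait on a single
  // region and react if it does not come online:
  //
  //   if (!waitForAssignment(hri)) {
  //     LOG.warn("Region " + hri.getEncodedName() + " was not assigned");
  //   }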
2690 
2691   /**
2692    * Waits until the specified regions have completed assignment, or the deadline is reached.
2693    */
2694   protected boolean waitForAssignment(final Collection<HRegionInfo> regionSet,
2695       final boolean waitTillAllAssigned, final int reassigningRegions,
2696       final long minEndTime) throws InterruptedException {
2697     long deadline = minEndTime + bulkPerRegionOpenTimeGuesstimate * (reassigningRegions + 1);
2698     return waitForAssignment(regionSet, waitTillAllAssigned, deadline);
2699   }
2700 
2701   /**
2702    * Waits until the specified regions have completed assignment, or the deadline is reached.
2703    * @param regionSet set of regions to wait on. The set is modified and the assigned regions are removed
2704    * @param waitTillAllAssigned true if we should wait for all the regions to be assigned
2705    * @param deadline the timestamp after which the wait is aborted
2706    * @return true if all the regions are assigned, false otherwise.
2707    * @throws InterruptedException
2708    */
2709   protected boolean waitForAssignment(final Collection<HRegionInfo> regionSet,
2710       final boolean waitTillAllAssigned, final long deadline) throws InterruptedException {
2711     // We're not synchronizing on regionsInTransition now because we don't use any iterator.
2712     while (!regionSet.isEmpty() && !server.isStopped() && deadline > System.currentTimeMillis()) {
2713       int failedOpenCount = 0;
2714       Iterator<HRegionInfo> regionInfoIterator = regionSet.iterator();
2715       while (regionInfoIterator.hasNext()) {
2716         HRegionInfo hri = regionInfoIterator.next();
2717         if (regionStates.isRegionOnline(hri) || regionStates.isRegionInState(hri,
2718             State.SPLITTING, State.SPLIT, State.MERGING, State.MERGED)) {
2719           regionInfoIterator.remove();
2720         } else if (regionStates.isRegionInState(hri, State.FAILED_OPEN)) {
2721           failedOpenCount++;
2722         }
2723       }
2724       if (!waitTillAllAssigned) {
2725         // No need to wait, let assignment go on asynchronously
2726         break;
2727       }
2728       if (!regionSet.isEmpty()) {
2729         if (failedOpenCount == regionSet.size()) {
2730           // all the regions we are waiting on had an error on open.
2731           break;
2732         }
2733         regionStates.waitForUpdate(100);
2734       }
2735     }
2736     return regionSet.isEmpty();
2737   }
2738 
2739   /**
2740    * Assigns the hbase:meta region or a replica.
2741    * <p>
2742    * Assumes that hbase:meta is currently closed and is not being actively served by
2743    * any RegionServer.
2744    * <p>
2745    * Forcibly unsets the current meta region location in ZooKeeper and assigns
2746    * hbase:meta to a random RegionServer.
2747    * @param hri TODO
2748    * @throws KeeperException
2749    */
2750   public void assignMeta(HRegionInfo hri) throws KeeperException {
2751     this.server.getMetaTableLocator().deleteMetaLocation(this.watcher, hri.getReplicaId());
2752     assign(hri, true);
2753   }
2754 
2755   /**
2756    * Assigns specified regions retaining assignments, if any.
2757    * <p>
2758    * This is a synchronous call and will return once every region has been
2759    * assigned.  If anything fails, an exception is thrown.
2760    * @throws InterruptedException
2761    * @throws IOException
2762    */
2763   public void assign(Map<HRegionInfo, ServerName> regions)
2764         throws IOException, InterruptedException {
2765     if (regions == null || regions.isEmpty()) {
2766       return;
2767     }
2768     List<ServerName> servers = serverManager.createDestinationServersList();
2769     if (servers == null || servers.isEmpty()) {
2770       throw new IOException("Found no destination server to assign region(s)");
2771     }
2772 
2773     // Reuse existing assignment info
2774     Map<ServerName, List<HRegionInfo>> bulkPlan =
2775       balancer.retainAssignment(regions, servers);
2776     if (bulkPlan == null) {
2777       throw new IOException("Unable to determine a plan to assign region(s)");
2778     }
2779 
2780     assign(regions.size(), servers.size(),
2781       "retainAssignment=true", bulkPlan);
2782   }
2783 
2784   /**
2785    * Assigns specified regions round robin, if any.
2786    * <p>
2787    * This is a synchronous call and will return once every region has been
2788    * assigned.  If anything fails, an exception is thrown.
2789    * @throws InterruptedException
2790    * @throws IOException
2791    */
2792   public void assign(List<HRegionInfo> regions)
2793         throws IOException, InterruptedException {
2794     if (regions == null || regions.isEmpty()) {
2795       return;
2796     }
2797 
2798     List<ServerName> servers = serverManager.createDestinationServersList();
2799     if (servers == null || servers.isEmpty()) {
2800       throw new IOException("Found no destination server to assign region(s)");
2801     }
2802 
2803     // Generate a round-robin bulk assignment plan
2804     Map<ServerName, List<HRegionInfo>> bulkPlan = balancer.roundRobinAssignment(regions, servers);
2805     if (bulkPlan == null) {
2806       throw new IOException("Unable to determine a plan to assign region(s)");
2807     }
2808 
2809     processFavoredNodes(regions);
2810     assign(regions.size(), servers.size(), "round-robin=true", bulkPlan);
2811   }
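  // Illustrative usage (sketch only): startup code such as assignAllUserRegions below
  // builds a list from the regions found in hbase:meta and hands it to this
  // round-robin overload:
  //
  //   List<HRegionInfo> regions = new ArrayList<HRegionInfo>(regionsFromMetaScan);
  //   assign(regions); // synchronous; throws IOException if no plan can be made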
2812 
2813   private void assign(int regions, int totalServers,
2814       String message, Map<ServerName, List<HRegionInfo>> bulkPlan)
2815           throws InterruptedException, IOException {
2816 
2817     int servers = bulkPlan.size();
2818     if (servers == 1 || (regions < bulkAssignThresholdRegions
2819         && servers < bulkAssignThresholdServers)) {
2820 
2821       // Don't use bulk assignment.  This could be more efficient in a small
2822       // cluster, especially a mini cluster for testing, so that tests won't time out
2823       if (LOG.isTraceEnabled()) {
2824         LOG.trace("Not using bulk assignment since we are assigning only " + regions +
2825           " region(s) to " + servers + " server(s)");
2826       }
2827 
2828       // invoke assignment (async)
2829       ArrayList<HRegionInfo> userRegionSet = new ArrayList<HRegionInfo>(regions);
2830       for (Map.Entry<ServerName, List<HRegionInfo>> plan: bulkPlan.entrySet()) {
2831         if (!assign(plan.getKey(), plan.getValue())) {
2832           for (HRegionInfo region: plan.getValue()) {
2833             if (!regionStates.isRegionOnline(region)) {
2834               invokeAssign(region);
2835               if (!region.getTable().isSystemTable()) {
2836                 userRegionSet.add(region);
2837               }
2838             }
2839           }
2840         }
2841       }
2842 
2843       // wait for assignment completion
2844       if (!waitForAssignment(userRegionSet, true, userRegionSet.size(),
2845             System.currentTimeMillis())) {
2846         LOG.debug("some user regions are still in transition: " + userRegionSet);
2847       }
2848     } else {
2849       LOG.info("Bulk assigning " + regions + " region(s) across "
2850         + totalServers + " server(s), " + message);
2851 
2852       // Use fixed count thread pool assigning.
2853       BulkAssigner ba = new GeneralBulkAssigner(
2854         this.server, bulkPlan, this, bulkAssignWaitTillAllAssigned);
2855       ba.bulkAssign();
2856       LOG.info("Bulk assigning done");
2857     }
2858   }
2859 
2860   /**
2861    * Assigns all user regions, if any exist.  Used during cluster startup.
2862    * <p>
2863    * This is a synchronous call and will return once every region has been
2864    * assigned.  If anything fails, an exception is thrown and the cluster
2865    * should be shutdown.
2866    * @throws InterruptedException
2867    * @throws IOException
2868    */
2869   private void assignAllUserRegions(Map<HRegionInfo, ServerName> allRegions)
2870       throws IOException, InterruptedException {
2871     if (allRegions == null || allRegions.isEmpty()) return;
2872 
2873     // Determine what type of assignment to do on startup
2874     boolean retainAssignment = server.getConfiguration().
2875       getBoolean("hbase.master.startup.retainassign", true);
2876 
2877     Set<HRegionInfo> regionsFromMetaScan = allRegions.keySet();
2878     if (retainAssignment) {
2879       assign(allRegions);
2880     } else {
2881       List<HRegionInfo> regions = new ArrayList<HRegionInfo>(regionsFromMetaScan);
2882       assign(regions);
2883     }
2884 
2885     for (HRegionInfo hri : regionsFromMetaScan) {
2886       TableName tableName = hri.getTable();
2887       if (!tableStateManager.isTableState(tableName,
2888           ZooKeeperProtos.Table.State.ENABLED)) {
2889         setEnabledTable(tableName);
2890       }
2891     }
2892     // assign all the replicas that were not recorded in the meta
2893     assign(replicaRegionsNotRecordedInMeta(regionsFromMetaScan, server));
2894   }
2895 
2896   /**
2897    * Get a list of replica regions that are not recorded in meta yet.
2898    * We might not have recorded the locations
2899    * for the replicas since the replicas may not have been online yet, the master restarted
2900    * in the middle of assigning, ZK was erased, etc.
2901    * @param regionsRecordedInMeta the list of regions we know are recorded in meta
2902    * either as a default, or as the location of a replica
2903    * @param master
2904    * @return list of replica regions
2905    * @throws IOException
2906    */
2907   public static List<HRegionInfo> replicaRegionsNotRecordedInMeta(
2908       Set<HRegionInfo> regionsRecordedInMeta, MasterServices master)throws IOException {
2909     List<HRegionInfo> regionsNotRecordedInMeta = new ArrayList<HRegionInfo>();
2910     for (HRegionInfo hri : regionsRecordedInMeta) {
2911       TableName table = hri.getTable();
2912       HTableDescriptor htd = master.getTableDescriptors().get(table);
2913       // look at the HTD for the replica count. That's the source of truth
2914       int desiredRegionReplication = htd.getRegionReplication();
2915       for (int i = 0; i < desiredRegionReplication; i++) {
2916         HRegionInfo replica = RegionReplicaUtil.getRegionInfoForReplica(hri, i);
2917         if (regionsRecordedInMeta.contains(replica)) continue;
2918         regionsNotRecordedInMeta.add(replica);
2919       }
2920     }
2921     return regionsNotRecordedInMeta;
2922   }
2923 
2924   /**
2925    * Wait until no regions in transition.
2926    * @param timeout How long to wait.
2927    * @return True if nothing is in regions-in-transition.
2928    * @throws InterruptedException
2929    */
2930   boolean waitUntilNoRegionsInTransition(final long timeout)
2931       throws InterruptedException {
2932     // Blocks until there are no regions in transition. It is
2933     // possible that there are regions in transition immediately after
2934     // this returns, but it guarantees that if it returns without an
2935     // exception there was a period of time with no regions in
2936     // transition from the point-of-view of the in-memory state of
2937     // the Master.
2938     final long endTime = System.currentTimeMillis() + timeout;
2939 
2940     while (!this.server.isStopped() && regionStates.isRegionsInTransition()
2941         && endTime > System.currentTimeMillis()) {
2942       regionStates.waitForUpdate(100);
2943     }
2944 
2945     return !regionStates.isRegionsInTransition();
2946   }
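  // Illustrative usage (sketch only; the caller and timeout value are hypothetical):
  // a bounded wait before an operation that assumes a quiescent assignment state:
  //
  //   if (!waitUntilNoRegionsInTransition(timeoutMillis)) {
  //     // Some regions are still in transition after the timeout; the caller decides
  //     // whether to proceed or give up.
  //   }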
2947 
2948   /**
2949    * Rebuild the list of user regions and assignment information.
2950    * Updates regionstates with findings as we go through list of regions.
2951    * @return set of servers not online that hosted some regions according to a scan of hbase:meta
2952    * @throws IOException
2953    */
2954   Set<ServerName> rebuildUserRegions() throws
2955       IOException, KeeperException, CoordinatedStateException {
2956     Set<TableName> disabledOrEnablingTables = tableStateManager.getTablesInStates(
2957       ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.ENABLING);
2958 
2959     Set<TableName> disabledOrDisablingOrEnabling = tableStateManager.getTablesInStates(
2960       ZooKeeperProtos.Table.State.DISABLED,
2961       ZooKeeperProtos.Table.State.DISABLING,
2962       ZooKeeperProtos.Table.State.ENABLING);
2963 
2964     // Region assignment from META
2965     List<Result> results = MetaTableAccessor.fullScanOfMeta(server.getConnection());
2966     // Get any new but slow-to-check-in region servers that joined the cluster
2967     Set<ServerName> onlineServers = serverManager.getOnlineServers().keySet();
2968     // Set of offline servers to be returned
2969     Set<ServerName> offlineServers = new HashSet<ServerName>();
2970     // Iterate regions in META
2971     for (Result result : results) {
2972       if (result == null) {
2973         LOG.debug("null result from meta - ignoring but this is strange.");
2974         continue;
2975       }
2976       // Keep track of replicas to close. These were the replicas of the
2977       // original regions that were merged. The master might have closed them
2978       // before, but it might not have, maybe because it crashed.
2979       PairOfSameType<HRegionInfo> p = MetaTableAccessor.getMergeRegions(result);
2980       if (p.getFirst() != null && p.getSecond() != null) {
2981         int numReplicas = server.getTableDescriptors().get(p.getFirst().
2982             getTable()).getRegionReplication();
2983         for (HRegionInfo merge : p) {
2984           for (int i = 1; i < numReplicas; i++) {
2985             replicasToClose.add(RegionReplicaUtil.getRegionInfoForReplica(merge, i));
2986           }
2987         }
2988       }
2989       RegionLocations rl =  MetaTableAccessor.getRegionLocations(result);
2990       if (rl == null) continue;
2991       HRegionLocation[] locations = rl.getRegionLocations();
2992       if (locations == null) continue;
2993       for (HRegionLocation hrl : locations) {
2994         if (hrl == null) continue;
2995         HRegionInfo regionInfo = hrl.getRegionInfo();
2996         if (regionInfo == null) continue;
2997         int replicaId = regionInfo.getReplicaId();
2998         State state = RegionStateStore.getRegionState(result, replicaId);
2999         // Keep track of replicas to close. These were the replicas of the split parents
3000         // from the previous life of the master. The master should have closed them before,
3001         // but it couldn't, maybe because it crashed.
3002         if (replicaId == 0 && state.equals(State.SPLIT)) {
3003           for (HRegionLocation h : locations) {
3004             replicasToClose.add(h.getRegionInfo());
3005           }
3006         }
3007         ServerName lastHost = hrl.getServerName();
3008         ServerName regionLocation = RegionStateStore.getRegionServer(result, replicaId);
3009         if (tableStateManager.isTableState(regionInfo.getTable(),
3010              ZooKeeperProtos.Table.State.DISABLED)) {
3011           // force the region to forget its hosts for disabled/disabling tables.
3012           // see HBASE-13326
3013           lastHost = null;
3014           regionLocation = null;
3015         }
3016         regionStates.createRegionState(regionInfo, state, regionLocation, lastHost);
3017         if (!regionStates.isRegionInState(regionInfo, State.OPEN)) {
3018           // Region is not open (either offline or in transition), skip
3019           continue;
3020         }
3021         TableName tableName = regionInfo.getTable();
3022         if (!onlineServers.contains(regionLocation)) {
3023           // Region is located on a server that isn't online
3024           offlineServers.add(regionLocation);
3025           if (useZKForAssignment) {
3026             regionStates.regionOffline(regionInfo);
3027           }
3028         } else if (!disabledOrEnablingTables.contains(tableName)) {
3029           // Region is being served and on an active server
3030           // add only if region not in disabled or enabling table
3031           regionStates.regionOnline(regionInfo, regionLocation);
3032           balancer.regionOnline(regionInfo, regionLocation);
3033         } else if (useZKForAssignment) {
3034           regionStates.regionOffline(regionInfo);
3035         }
3036         // need to enable the table if not disabled or disabling or enabling
3037         // this will be used in rolling restarts
3038         if (!disabledOrDisablingOrEnabling.contains(tableName)
3039           && !getTableStateManager().isTableState(tableName,
3040             ZooKeeperProtos.Table.State.ENABLED)) {
3041           setEnabledTable(tableName);
3042         }
3043       }
3044     }
3045     return offlineServers;
3046   }
3047 
3048   /**
3049    * Recover the tables that were not fully moved to DISABLED state. These
3050    * tables were in DISABLING state when the master restarted/switched.
3051    *
3052    * @throws KeeperException
3053    * @throws TableNotFoundException
3054    * @throws IOException
3055    */
3056   private void recoverTableInDisablingState()
3057       throws KeeperException, IOException, CoordinatedStateException {
3058     Set<TableName> disablingTables =
3059       tableStateManager.getTablesInStates(ZooKeeperProtos.Table.State.DISABLING);
3060     if (disablingTables.size() != 0) {
3061       for (TableName tableName : disablingTables) {
3062         // Recover by calling DisableTableHandler
3063         LOG.info("The table " + tableName
3064             + " is in DISABLING state.  Hence recovering by moving the table"
3065             + " to DISABLED state.");
3066         new DisableTableHandler(this.server, tableName,
3067             this, tableLockManager, true).prepare().process();
3068       }
3069     }
3070   }
3071 
3072   /**
3073    * Recover the tables that were not fully moved to ENABLED state. These tables
3074    * were in ENABLING state when the master restarted/switched.
3075    *
3076    * @throws KeeperException
3077    * @throws org.apache.hadoop.hbase.TableNotFoundException
3078    * @throws IOException
3079    */
3080   private void recoverTableInEnablingState()
3081       throws KeeperException, IOException, CoordinatedStateException {
3082     Set<TableName> enablingTables = tableStateManager.
3083       getTablesInStates(ZooKeeperProtos.Table.State.ENABLING);
3084     if (enablingTables.size() != 0) {
3085       for (TableName tableName : enablingTables) {
3086         // Recover by calling EnableTableHandler
3087         LOG.info("The table " + tableName
3088             + " is in ENABLING state.  Hence recovering by moving the table"
3089             + " to ENABLED state.");
3090         // enableTable in sync way during master startup,
3091         // no need to invoke coprocessor
3092         EnableTableHandler eth = new EnableTableHandler(this.server, tableName,
3093           this, tableLockManager, true);
3094         try {
3095           eth.prepare();
3096         } catch (TableNotFoundException e) {
3097           LOG.warn("Table " + tableName + " not found in hbase:meta to recover.");
3098           continue;
3099         }
3100         eth.process();
3101       }
3102     }
3103   }
3104 
3105   /**
3106    * Processes list of dead servers from result of hbase:meta scan and regions in RIT.
3107    * This is used for failover to recover the lost regions that belonged to
3108    * RegionServers which failed while there was no active master or are offline for whatever
3109    * reason, and for regions that were in RIT.
3110    *
3111    * @param deadServers
3112    *          The list of dead servers which failed while there was no active master. Can be null.
3113    * @throws IOException
3114    * @throws KeeperException
3115    */
3116   private void processDeadServersAndRecoverLostRegions(Set<ServerName> deadServers)
3117   throws IOException, KeeperException {
3118     if (deadServers != null && !deadServers.isEmpty()) {
3119       for (ServerName serverName: deadServers) {
3120         if (!serverManager.isServerDead(serverName)) {
3121           serverManager.expireServer(serverName); // Let SSH do region re-assign
3122         }
3123       }
3124     }
3125 
3126     List<String> nodes = useZKForAssignment ?
3127       ZKUtil.listChildrenAndWatchForNewChildren(watcher, watcher.assignmentZNode)
3128       : ZKUtil.listChildrenNoWatch(watcher, watcher.assignmentZNode);
3129     if (nodes != null && !nodes.isEmpty()) {
3130       for (String encodedRegionName : nodes) {
3131         processRegionInTransition(encodedRegionName, null);
3132       }
3133     } else if (!useZKForAssignment) {
3134       processRegionInTransitionZkLess();
3135     }
3136   }
3137 
3138   void processRegionInTransitionZkLess() {
3139     // We need to send RPC call again for PENDING_OPEN/PENDING_CLOSE regions
3140     // in case the RPC call is not sent out yet before the master was shut down
3141     // since we update the state before we send the RPC call. We can't update
3142     // the state after the RPC call. Otherwise, we don't know what's happened
3143     // to the region if the master dies right after the RPC call is out.
3144     Map<String, RegionState> rits = regionStates.getRegionsInTransition();
3145     for (RegionState regionState : rits.values()) {
3146       LOG.info("Processing " + regionState);
3147       ServerName serverName = regionState.getServerName();
3148       // Server could be null in case of FAILED_OPEN when master cannot find a region plan. In that
3149       // case, try assigning it here.
3150       if (serverName != null
3151           && !serverManager.getOnlineServers().containsKey(serverName)) {
3152         LOG.info("Server " + serverName + " isn't online. SSH will handle this");
3153         continue;
3154       }
3155       HRegionInfo regionInfo = regionState.getRegion();
3156       State state = regionState.getState();
3157 
3158       switch (state) {
3159       case CLOSED:
3160         invokeAssign(regionInfo);
3161         break;
3162       case PENDING_OPEN:
3163         retrySendRegionOpen(regionState);
3164         break;
3165       case PENDING_CLOSE:
3166         retrySendRegionClose(regionState);
3167         break;
3168       case FAILED_CLOSE:
3169       case FAILED_OPEN:
3170         invokeUnAssign(regionInfo);
3171         break;
3172       default:
3173         // No process for other states
3174       }
3175     }
3176   }
3177 
3178   /**
3179    * At master failover, for pending_open region, make sure
3180    * sendRegionOpen RPC call is sent to the target regionserver
3181    */
3182   private void retrySendRegionOpen(final RegionState regionState) {
3183     this.executorService.submit(
3184       new EventHandler(server, EventType.M_MASTER_RECOVERY) {
3185         @Override
3186         public void process() throws IOException {
3187           HRegionInfo hri = regionState.getRegion();
3188           ServerName serverName = regionState.getServerName();
3189           ReentrantLock lock = locker.acquireLock(hri.getEncodedName());
3190           try {
3191             for (int i = 1; i <= maximumAttempts; i++) {
3192               if (!serverManager.isServerOnline(serverName)
3193                   || server.isStopped() || server.isAborted()) {
3194                 return; // No need any more
3195               }
3196               try {
3197                 if (!regionState.equals(regionStates.getRegionState(hri))) {
3198                   return; // Region is not in the expected state any more
3199                 }
3200                 List<ServerName> favoredNodes = ServerName.EMPTY_SERVER_LIST;
3201                 if (shouldAssignRegionsWithFavoredNodes) {
3202                   favoredNodes = ((FavoredNodeLoadBalancer)balancer).getFavoredNodes(hri);
3203                 }
3204                 RegionOpeningState regionOpenState = serverManager.sendRegionOpen(
3205                   serverName, hri, -1, favoredNodes);
3206 
3207                 if (regionOpenState == RegionOpeningState.FAILED_OPENING) {
3208                   // Failed opening this region, this means the target server didn't get
3209                   // the original region open RPC, so re-assign it with a new plan
3210                   LOG.debug("Got failed_opening in retry sendRegionOpen for "
3211                     + regionState + ", re-assign it");
3212                   invokeAssign(hri, true);
3213                 }
3214                 return; // Done.
3215               } catch (Throwable t) {
3216                 if (t instanceof RemoteException) {
3217                   t = ((RemoteException) t).unwrapRemoteException();
3218                 }
3219                 // In case SocketTimeoutException/FailedServerException, retry
3220                 if (t instanceof java.net.SocketTimeoutException
3221                     || t instanceof FailedServerException) {
3222                   Threads.sleep(100);
3223                   continue;
3224                 }
3225                 // For other exceptions, re-assign it
3226                 LOG.debug("Got exception in retry sendRegionOpen for "
3227                   + regionState + ", re-assign it", t);
3228                 invokeAssign(hri);
3229                 return; // Done.
3230               }
3231             }
3232           } finally {
3233             lock.unlock();
3234           }
3235         }
3236       });
3237   }
3238 
3239   /**
3240    * At master failover, for pending_close region, make sure
3241    * sendRegionClose RPC call is sent to the target regionserver
3242    */
3243   private void retrySendRegionClose(final RegionState regionState) {
3244     this.executorService.submit(
3245       new EventHandler(server, EventType.M_MASTER_RECOVERY) {
3246         @Override
3247         public void process() throws IOException {
3248           HRegionInfo hri = regionState.getRegion();
3249           ServerName serverName = regionState.getServerName();
3250           ReentrantLock lock = locker.acquireLock(hri.getEncodedName());
3251           try {
3252             for (int i = 1; i <= maximumAttempts; i++) {
3253               if (!serverManager.isServerOnline(serverName)
3254                   || server.isStopped() || server.isAborted()) {
3255                 return; // No need any more
3256               }
3257               try {
3258                 if (!regionState.equals(regionStates.getRegionState(hri))) {
3259                   return; // Region is not in the expected state any more
3260                 }
3261                 if (!serverManager.sendRegionClose(serverName, hri, -1, null, false)) {
3262                   // This means the region is still on the target server
3263                   LOG.debug("Got false in retry sendRegionClose for "
3264                     + regionState + ", re-close it");
3265                   invokeUnAssign(hri);
3266                 }
3267                 return; // Done.
3268               } catch (Throwable t) {
3269                 if (t instanceof RemoteException) {
3270                   t = ((RemoteException) t).unwrapRemoteException();
3271                 }
3272                 // In case SocketTimeoutException/FailedServerException, retry
3273                 if (t instanceof java.net.SocketTimeoutException
3274                     || t instanceof FailedServerException) {
3275                   Threads.sleep(100);
3276                   continue;
3277                 }
3278                 if (!(t instanceof NotServingRegionException
3279                     || t instanceof RegionAlreadyInTransitionException)) {
3280                   // NotServingRegionException/RegionAlreadyInTransitionException
3281                   // means the target server got the original region close request.
3282                   // For other exceptions, re-close it
3283                   LOG.debug("Got exception in retry sendRegionClose for "
3284                     + regionState + ", re-close it", t);
3285                   invokeUnAssign(hri);
3286                 }
3287                 return; // Done.
3288               }
3289             }
3290           } finally {
3291             lock.unlock();
3292           }
3293         }
3294       });
3295   }
3296 
3297   /**
3298    * Set Regions in transitions metrics.
3299    * This takes an iterator on the RegionInTransition map (CLSM), and is not synchronized.
3300    * This iterator is not fail-fast, which may lead to a stale read; but that's better than
3301    * creating a copy of the map for metrics computation, as this method will be invoked
3302    * at a frequent interval.
3303    */
3304   public void updateRegionsInTransitionMetrics() {
3305     long currentTime = System.currentTimeMillis();
3306     int totalRITs = 0;
3307     int totalRITsOverThreshold = 0;
3308     long oldestRITTime = 0;
3309     int ritThreshold = this.server.getConfiguration().
3310       getInt(HConstants.METRICS_RIT_STUCK_WARNING_THRESHOLD, 60000);
3311     for (RegionState state: regionStates.getRegionsInTransition().values()) {
3312       totalRITs++;
3313       long ritTime = currentTime - state.getStamp();
3314       if (ritTime > ritThreshold) { // more than the threshold
3315         totalRITsOverThreshold++;
3316       }
3317       if (oldestRITTime < ritTime) {
3318         oldestRITTime = ritTime;
3319       }
3320     }
3321     if (this.metricsAssignmentManager != null) {
3322       this.metricsAssignmentManager.updateRITOldestAge(oldestRITTime);
3323       this.metricsAssignmentManager.updateRITCount(totalRITs);
3324       this.metricsAssignmentManager.updateRITCountOverThreshold(totalRITsOverThreshold);
3325     }
3326   }
3327 
3328   /**
3329    * @param region Region whose plan we are to clear.
3330    */
3331   void clearRegionPlan(final HRegionInfo region) {
3332     synchronized (this.regionPlans) {
3333       this.regionPlans.remove(region.getEncodedName());
3334     }
3335   }
3336 
3337   /**
3338    * Wait on region to clear regions-in-transition.
3339    * @param hri Region to wait on.
3340    * @throws IOException
3341    */
3342   public void waitOnRegionToClearRegionsInTransition(final HRegionInfo hri)
3343       throws IOException, InterruptedException {
3344     waitOnRegionToClearRegionsInTransition(hri, -1L);
3345   }
3346 
3347   /**
3348    * Wait on region to clear regions-in-transition or time out.
3349    * @param hri Region to wait on.
3350    * @param timeOut Milliseconds to wait for the region to be out of transition state.
3351    * @return True when the region clears regions-in-transition before the timeout, otherwise false
3352    * @throws InterruptedException
3353    */
3354   public boolean waitOnRegionToClearRegionsInTransition(final HRegionInfo hri, long timeOut)
3355       throws InterruptedException {
3356     if (!regionStates.isRegionInTransition(hri)) return true;
3357     long end = (timeOut <= 0) ? Long.MAX_VALUE : EnvironmentEdgeManager.currentTime()
3358         + timeOut;
3359     // There is already a timeout monitor on regions in transition so I
3360     // should not have to have one here too?
3361     LOG.info("Waiting for " + hri.getEncodedName() +
3362         " to leave regions-in-transition, timeOut=" + timeOut + " ms.");
3363     while (!this.server.isStopped() && regionStates.isRegionInTransition(hri)) {
3364       regionStates.waitForUpdate(100);
3365       if (EnvironmentEdgeManager.currentTime() > end) {
3366         LOG.info("Timed out on waiting for " + hri.getEncodedName() + " to be assigned.");
3367         return false;
3368       }
3369     }
3370     if (this.server.isStopped()) {
3371       LOG.info("Giving up wait on regions in transition because stoppable.isStopped is set");
3372       return false;
3373     }
3374     return true;
3375   }
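  // Illustrative usage (sketch only; the caller and timeout are hypothetical): wait for
  // one region to leave regions-in-transition, bounded by a timeout in milliseconds:
  //
  //   if (!waitOnRegionToClearRegionsInTransition(hri, 60000)) {
  //     // Still in transition (or the server is stopping); handle the timeout.
  //   }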
3376 
3377   void invokeAssign(HRegionInfo regionInfo) {
3378     invokeAssign(regionInfo, true);
3379   }
3380 
3381   void invokeAssign(HRegionInfo regionInfo, boolean newPlan) {
3382     threadPoolExecutorService.submit(new AssignCallable(this, regionInfo, newPlan));
3383   }
3384 
3385   void invokeUnAssign(HRegionInfo regionInfo) {
3386     threadPoolExecutorService.submit(new UnAssignCallable(this, regionInfo));
3387   }
3388 
3389   public ServerHostRegion isCarryingMeta(ServerName serverName) {
3390     return isCarryingRegion(serverName, HRegionInfo.FIRST_META_REGIONINFO);
3391   }
3392 
3393   public ServerHostRegion isCarryingMetaReplica(ServerName serverName, int replicaId) {
3394     return isCarryingRegion(serverName,
3395         RegionReplicaUtil.getRegionInfoForReplica(HRegionInfo.FIRST_META_REGIONINFO, replicaId));
3396   }
3397 
3398   public ServerHostRegion isCarryingMetaReplica(ServerName serverName, HRegionInfo metaHri) {
3399     return isCarryingRegion(serverName, metaHri);
3400   }
3401 
3402   /**
3403    * Check if the shutdown server carries the specific region.
3404    * We have a bunch of places that store region location
3405    * Those values aren't consistent. There is a delay of notification.
3406    * The location from zookeeper unassigned node has the most recent data;
3407    * but the node could be deleted after the region is opened by AM.
3408    * The AM's info could be old when OpenedRegionHandler
3409    * processing hasn't finished yet when server shutdown occurs.
3410    * @return whether the serverName currently hosts the region
3411    */
3412   private ServerHostRegion isCarryingRegion(ServerName serverName, HRegionInfo hri) {
3413     RegionTransition rt = null;
3414     try {
3415       byte [] data = ZKAssign.getData(watcher, hri.getEncodedName());
3416       // This call can legitimately come back null
3417       rt = data == null? null: RegionTransition.parseFrom(data);
3418     } catch (KeeperException e) {
3419       server.abort("Exception reading unassigned node for region=" + hri.getEncodedName(), e);
3420     } catch (DeserializationException e) {
3421       server.abort("Exception parsing unassigned node for region=" + hri.getEncodedName(), e);
3422     }
3423 
3424     ServerName addressFromZK = rt != null? rt.getServerName():  null;
3425     if (addressFromZK != null) {
3426       // if we get something from ZK, we will use the data
3427       boolean matchZK = addressFromZK.equals(serverName);
3428       LOG.debug("Checking region=" + hri.getRegionNameAsString() + ", zk server=" + addressFromZK +
3429         " current=" + serverName + ", matches=" + matchZK);
3430       return matchZK ? ServerHostRegion.HOSTING_REGION : ServerHostRegion.NOT_HOSTING_REGION;
3431     }
3432 
3433     ServerName addressFromAM = regionStates.getRegionServerOfRegion(hri);
3434     if (LOG.isDebugEnabled()) {
3435       LOG.debug("based on AM, current region=" + hri.getRegionNameAsString() +
3436         " is on server=" + (addressFromAM != null ? addressFromAM : "null") +
3437         " server being checked: " + serverName);
3438     }
3439     if (addressFromAM != null) {
3440       return addressFromAM.equals(serverName) ?
3441           ServerHostRegion.HOSTING_REGION : ServerHostRegion.NOT_HOSTING_REGION;
3442     }
3443 
3444     if (hri.isMetaRegion() && RegionReplicaUtil.isDefaultReplica(hri)) {
3445       // For the Meta region (default replica), we can do one more check on MetaTableLocator
3446       final ServerName serverNameInZK =
3447           server.getMetaTableLocator().getMetaRegionLocation(this.server.getZooKeeper());
3448       if (LOG.isDebugEnabled()) {
3449         LOG.debug("Based on MetaTableLocator, the META region is on server=" +
3450           (serverNameInZK == null ? "null" : serverNameInZK) +
3451           " server being checked: " + serverName);
3452       }
3453       if (serverNameInZK != null) {
3454         return serverNameInZK.equals(serverName) ?
3455             ServerHostRegion.HOSTING_REGION : ServerHostRegion.NOT_HOSTING_REGION;
3456       }
3457     }
3458 
3459     // Checked everywhere, if reaching here, we are unsure whether the server is carrying region.
3460     return ServerHostRegion.UNKNOWN;
3461   }
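  // Illustrative usage (sketch only; the caller is hypothetical): server-shutdown
  // handling might check whether a dead server was carrying hbase:meta before deciding
  // how to recover:
  //
  //   if (isCarryingMeta(deadServerName) == ServerHostRegion.HOSTING_REGION) {
  //     // Recover/assign hbase:meta first, then handle the user regions.
  //   }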
3462 
3463   /**
3464    * Clean out crashed server removing any assignments.
3465    * @param sn Server that went down.
3466    * @return list of regions in transition on this server
3467    */
3468   public List<HRegionInfo> cleanOutCrashedServerReferences(final ServerName sn) {
3469     // Clean out any existing assignment plans for this server
3470     synchronized (this.regionPlans) {
3471       for (Iterator <Map.Entry<String, RegionPlan>> i = this.regionPlans.entrySet().iterator();
3472           i.hasNext();) {
3473         Map.Entry<String, RegionPlan> e = i.next();
3474         ServerName otherSn = e.getValue().getDestination();
3475         // The name will be null if the region is planned for a random assign.
3476         if (otherSn != null && otherSn.equals(sn)) {
3477           // Use iterator's remove else we'll get CME
3478           i.remove();
3479         }
3480       }
3481     }
3482     List<HRegionInfo> regions = regionStates.serverOffline(watcher, sn);
3483     for (Iterator<HRegionInfo> it = regions.iterator(); it.hasNext(); ) {
3484       HRegionInfo hri = it.next();
3485       String encodedName = hri.getEncodedName();
3486 
3487       // We need a lock on the region as we could update it
3488       Lock lock = locker.acquireLock(encodedName);
3489       try {
3490         RegionState regionState = regionStates.getRegionTransitionState(encodedName);
3491         if (regionState == null
3492             || (regionState.getServerName() != null && !regionState.isOnServer(sn))
3493             || !(regionState.isFailedClose() || regionState.isOffline()
3494               || regionState.isPendingOpenOrOpening())) {
3495           LOG.info("Skip " + regionState + " since it is not opening/failed_close"
3496             + " on the dead server any more: " + sn);
3497           it.remove();
3498         } else {
3499           try {
3500             // Delete the ZNode if exists
3501             ZKAssign.deleteNodeFailSilent(watcher, hri);
3502           } catch (KeeperException ke) {
3503             server.abort("Unexpected ZK exception deleting node " + hri, ke);
3504           }
3505           if (tableStateManager.isTableState(hri.getTable(),
3506               ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING)) {
3507             regionStates.regionOffline(hri);
3508             it.remove();
3509             continue;
3510           }
3511           // Mark the region offline and assign it again by SSH
3512           regionStates.updateRegionState(hri, State.OFFLINE);
3513         }
3514       } finally {
3515         lock.unlock();
3516       }
3517     }
3518     return regions;
3519   }
3520 
3521   /**
3522    * @param plan Plan to execute.
3523    */
3524   public void balance(final RegionPlan plan) {
3525 
3526     HRegionInfo hri = plan.getRegionInfo();
3527     TableName tableName = hri.getTable();
3528     if (tableStateManager.isTableState(tableName,
3529       ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING)) {
3530       LOG.info("Ignored moving region of disabling/disabled table "
3531         + tableName);
3532       return;
3533     }
3534 
3535     // Move the region only if it's assigned
3536     String encodedName = hri.getEncodedName();
3537     ReentrantLock lock = locker.acquireLock(encodedName);
3538     try {
3539       if (!regionStates.isRegionOnline(hri)) {
3540         RegionState state = regionStates.getRegionState(encodedName);
3541         LOG.info("Ignored moving region not assigned: " + hri + ", "
3542           + (state == null ? "not in region states" : state));
3543         return;
3544       }
3545       synchronized (this.regionPlans) {
3546         this.regionPlans.put(plan.getRegionName(), plan);
3547       }
3548       unassign(hri, false, plan.getDestination());
3549     } finally {
3550       lock.unlock();
3551     }
3552   }
3553 
3554   public void stop() {
3555     shutdown(); // Stop executor service, etc
3556   }
3557 
3558   /**
3559    * Shutdown the threadpool executor service
3560    */
3561   public void shutdown() {
3562     // It's an immediate shutdown, so we're clearing the remaining tasks.
3563     synchronized (zkEventWorkerWaitingList){
3564       zkEventWorkerWaitingList.clear();
3565     }
3566 
3567     // Shutdown the threadpool executor service
3568     threadPoolExecutorService.shutdownNow();
3569     zkEventWorkers.shutdownNow();
3570     regionStateStore.stop();
3571   }
3572 
3573   protected void setEnabledTable(TableName tableName) {
3574     try {
3575       this.tableStateManager.setTableState(tableName,
3576         ZooKeeperProtos.Table.State.ENABLED);
3577     } catch (CoordinatedStateException e) {
3578       // here we can abort as it is the start up flow
3579       String errorMsg = "Unable to ensure that the table " + tableName
3580           + " will be" + " enabled because of a ZooKeeper issue";
3581       LOG.error(errorMsg);
3582       this.server.abort(errorMsg, e);
3583     }
3584   }
3585 
3586   /**
3587    * Set region as OFFLINED up in zookeeper asynchronously.
3588    * @param state the region state to set OFFLINE in ZooKeeper
3589    * @return True if we succeeded, false otherwise (State was incorrect or failed
3590    * updating zk).
3591    */
3592   private boolean asyncSetOfflineInZooKeeper(final RegionState state,
3593       final AsyncCallback.StringCallback cb, final ServerName destination) {
3594     if (!state.isClosed() && !state.isOffline()) {
3595       this.server.abort("Unexpected state trying to OFFLINE; " + state,
3596         new IllegalStateException());
3597       return false;
3598     }
3599     regionStates.updateRegionState(state.getRegion(), State.OFFLINE);
3600     try {
3601       ZKAssign.asyncCreateNodeOffline(watcher, state.getRegion(),
3602         destination, cb, state);
3603     } catch (KeeperException e) {
3604       if (e instanceof NodeExistsException) {
3605         LOG.warn("Node for " + state.getRegion() + " already exists");
3606       } else {
3607         server.abort("Unexpected ZK exception creating/setting node OFFLINE", e);
3608       }
3609       return false;
3610     }
3611     return true;
3612   }
3613 
3614   private boolean deleteNodeInStates(String encodedName,
3615       String desc, ServerName sn, EventType... types) {
3616     try {
3617       for (EventType et: types) {
3618         if (ZKAssign.deleteNode(watcher, encodedName, et, sn)) {
3619           return true;
3620         }
3621       }
3622       LOG.info("Failed to delete the " + desc + " node for "
3623         + encodedName + ". The node type may not match");
3624     } catch (NoNodeException e) {
3625       if (LOG.isDebugEnabled()) {
3626         LOG.debug("The " + desc + " node for " + encodedName + " already deleted");
3627       }
3628     } catch (KeeperException ke) {
3629       server.abort("Unexpected ZK exception deleting " + desc
3630         + " node for the region " + encodedName, ke);
3631     }
3632     return false;
3633   }
3634 
3635   private void deleteMergingNode(String encodedName, ServerName sn) {
3636     deleteNodeInStates(encodedName, "merging", sn, EventType.RS_ZK_REGION_MERGING,
3637       EventType.RS_ZK_REQUEST_REGION_MERGE, EventType.RS_ZK_REGION_MERGED);
3638   }
3639 
3640   private void deleteSplittingNode(String encodedName, ServerName sn) {
3641     deleteNodeInStates(encodedName, "splitting", sn, EventType.RS_ZK_REGION_SPLITTING,
3642       EventType.RS_ZK_REQUEST_REGION_SPLIT, EventType.RS_ZK_REGION_SPLIT);
3643   }
3644 
3645   @edu.umd.cs.findbugs.annotations.SuppressWarnings(
3646       value="AT_OPERATION_SEQUENCE_ON_CONCURRENT_ABSTRACTION",
3647       justification="Modification of Maps not ATOMIC!!!! FIX!!!")
3648   private void onRegionFailedOpen(
3649       final HRegionInfo hri, final ServerName sn) {
3650     String encodedName = hri.getEncodedName();
3651     // FindBugs: AT_OPERATION_SEQUENCE_ON_CONCURRENT_ABSTRACTION Worth fixing!!!
3652     AtomicInteger failedOpenCount = failedOpenTracker.get(encodedName);
3653     if (failedOpenCount == null) {
3654       failedOpenCount = new AtomicInteger();
3655       // No need to use putIfAbsent, or extra synchronization since
3656       // this whole handleRegion block is locked on the encoded region
3657       // name, and failedOpenTracker is updated only in this block
3658       // FindBugs: AT_OPERATION_SEQUENCE_ON_CONCURRENT_ABSTRACTION
3659       failedOpenTracker.put(encodedName, failedOpenCount);
3660     }
3661     if (failedOpenCount.incrementAndGet() >= maximumAttempts && !hri.isMetaRegion()) {
3662       // FindBugs: AT_OPERATION_SEQUENCE_ON_CONCURRENT_ABSTRACTION
3663       regionStates.updateRegionState(hri, State.FAILED_OPEN);
3664       // remove the tracking info to save memory, also reset
3665       // the count for next open initiative
3666       failedOpenTracker.remove(encodedName);
3667     } else {
3668       if (hri.isMetaRegion() && failedOpenCount.get() >= maximumAttempts) {
3669         // Log a warning message if a meta region failedOpenCount exceeds maximumAttempts
3670         // so that we are aware of a potential problem if it persists for a long time.
3671         LOG.warn("Failed to open the hbase:meta region " +
3672             hri.getRegionNameAsString() + " after " +
3673             failedOpenCount.get() + " retries. Continue retrying.");
3674       }
3675 
3676       // Handle this the same as if it were opened and then closed.
3677       RegionState regionState = regionStates.updateRegionState(hri, State.CLOSED);
3678       if (regionState != null) {
3679         // When there is more than one region server, a new RS is selected as the
3680         // destination and the same is updated in the region plan. (HBASE-5546)
3681         if (getTableStateManager().isTableState(hri.getTable(),
3682             ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING) ||
3683             replicasToClose.contains(hri)) {
3684           offlineDisabledRegion(hri);
3685           return;
3686         }
3687         // ZK Node is in CLOSED state, assign it.
3688         regionStates.updateRegionState(hri, RegionState.State.CLOSED);
3689         // This below has to do w/ online enable/disable of a table
3690         removeClosedRegion(hri);
3691         try {
3692           getRegionPlan(hri, sn, true);
3693         } catch (HBaseIOException e) {
3694           LOG.warn("Failed to get region plan", e);
3695         }
3696         invokeAssign(hri, false);
3697       }
3698     }
3699   }
3700 
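       /**
        * Handles an OPENED report: marks the region online on the given server,
        * deletes any leftover ZK assignment node when ZK-based assignment is in
        * use, resets the failed-open counter, and unassigns the region again if
        * its table is disabled or disabling.
        */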
3701   private void onRegionOpen(final HRegionInfo hri, final ServerName sn, long openSeqNum) {
3702     regionOnline(hri, sn, openSeqNum);
3703     if (useZKForAssignment) {
3704       try {
3705         // Delete the ZNode if exists
3706         ZKAssign.deleteNodeFailSilent(watcher, hri);
3707       } catch (KeeperException ke) {
3708         server.abort("Unexpected ZK exception deleting node " + hri, ke);
3709       }
3710     }
3711 
3712     // reset the count, if any
3713     failedOpenTracker.remove(hri.getEncodedName());
3714     if (getTableStateManager().isTableState(hri.getTable(),
3715         ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING)) {
3716       invokeUnAssign(hri);
3717     }
3718   }
3719 
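       /**
        * Handles a CLOSED report: offlines the region if its table is disabled or
        * disabling (or the region is a replica scheduled to close); otherwise
        * marks it CLOSED and queues it for reassignment.
        */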
3720   private void onRegionClosed(final HRegionInfo hri) {
3721     if (getTableStateManager().isTableState(hri.getTable(),
3722         ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING) ||
3723         replicasToClose.contains(hri)) {
3724       offlineDisabledRegion(hri);
3725       return;
3726     }
3727     regionStates.updateRegionState(hri, RegionState.State.CLOSED);
3728     sendRegionClosedNotification(hri);
3729     // This below has to do w/ online enable/disable of a table
3730     removeClosedRegion(hri);
3731     invokeAssign(hri, false);
3732   }
3733 
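       /**
        * Checks whether the parent region and the (possibly not yet tracked)
        * daughters are in states that allow a split on the given server.
        * @return an error message, or the empty string if the split may proceed
        */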
3734   private String checkInStateForSplit(ServerName sn,
3735       final HRegionInfo p, final HRegionInfo a, final HRegionInfo b) {
3736     final RegionState rs_p = regionStates.getRegionState(p);
3737     RegionState rs_a = regionStates.getRegionState(a);
3738     RegionState rs_b = regionStates.getRegionState(b);
3739     if (!(rs_p.isOpenOrSplittingOnServer(sn)
3740         && (rs_a == null || rs_a.isOpenOrSplittingNewOnServer(sn))
3741         && (rs_b == null || rs_b.isOpenOrSplittingNewOnServer(sn)))) {
3742       return "Not in state good for split";
3743     }
3744     return "";
3745   }
3746 
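       /**
        * Handles a SPLIT_REVERTED report: the parent region goes back online and
        * the would-be daughters are offlined.
        * @return an error message, or null on success
        */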
3747   private String onRegionSplitReverted(ServerName sn,
3748       final HRegionInfo p, final HRegionInfo a, final HRegionInfo b) {
3749     String s = checkInStateForSplit(sn, p, a, b);
3750     if (!org.apache.commons.lang.StringUtils.isEmpty(s)) {
3751       return s;
3752     }
3753     regionOnline(p, sn);
3754     regionOffline(a);
3755     regionOffline(b);
3756 
3757     if (getTableStateManager().isTableState(p.getTable(),
3758         ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING)) {
3759       invokeUnAssign(p);
3760     }
3761     return null;
3762   }
3763 
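       /**
        * Handles READY_TO_SPLIT, SPLIT_PONR and SPLIT reports for a parent region
        * and its two daughters.
        * @return an error message, or null on success
        */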
3764   private String onRegionSplit(ServerName sn, TransitionCode code,
3765       final HRegionInfo p, final HRegionInfo a, final HRegionInfo b) {
3766     String s = checkInStateForSplit(sn, p, a, b);
3767     if (!org.apache.commons.lang.StringUtils.isEmpty(s)) {
3768       return s;
3769     }
3770 
3771     regionStates.updateRegionState(a, State.SPLITTING_NEW, sn);
3772     regionStates.updateRegionState(b, State.SPLITTING_NEW, sn);
3773     regionStates.updateRegionState(p, State.SPLITTING);
3774 
3775     if (code == TransitionCode.SPLIT) {
3776       if (TEST_SKIP_SPLIT_HANDLING) {
3777         return "Skipping split message, TEST_SKIP_SPLIT_HANDLING is set";
3778       }
3779       regionOffline(p, State.SPLIT);
3780       regionOnline(a, sn, 1);
3781       regionOnline(b, sn, 1);
3782 
3783       // User could disable the table before master knows the new region.
3784       if (getTableStateManager().isTableState(p.getTable(),
3785           ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING)) {
3786         invokeUnAssign(a);
3787         invokeUnAssign(b);
3788       } else {
3789         Callable<Object> splitReplicasCallable = new Callable<Object>() {
3790           @Override
3791           public Object call() {
3792             doSplittingOfReplicas(p, a, b);
3793             return null;
3794           }
3795         };
3796         threadPoolExecutorService.submit(splitReplicasCallable);
3797       }
3798     } else if (code == TransitionCode.SPLIT_PONR) {
3799       try {
3800         regionStates.splitRegion(p, a, b, sn);
3801       } catch (IOException ioe) {
3802         LOG.info("Failed to record split region " + p.getShortNameToLog(), ioe);
3803         return "Failed to record the splitting in meta";
3804       }
3805     }
3806     return null;
3807   }
3808 
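       /**
        * Handles READY_TO_MERGE, MERGE_PONR, MERGED and MERGE_REVERTED reports for
        * the merged region and the two regions being merged.
        * @return an error message, or null on success
        */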
3809   private String onRegionMerge(ServerName sn, TransitionCode code,
3810       final HRegionInfo p, final HRegionInfo a, final HRegionInfo b) {
3811     RegionState rs_p = regionStates.getRegionState(p);
3812     RegionState rs_a = regionStates.getRegionState(a);
3813     RegionState rs_b = regionStates.getRegionState(b);
3814     if (!(rs_a.isOpenOrMergingOnServer(sn) && rs_b.isOpenOrMergingOnServer(sn)
3815         && (rs_p == null || rs_p.isOpenOrMergingNewOnServer(sn)))) {
3816       return "Not in state good for merge";
3817     }
3818 
3819     regionStates.updateRegionState(a, State.MERGING);
3820     regionStates.updateRegionState(b, State.MERGING);
3821     regionStates.updateRegionState(p, State.MERGING_NEW, sn);
3822 
3823     String encodedName = p.getEncodedName();
3824     if (code == TransitionCode.READY_TO_MERGE) {
3825       mergingRegions.put(encodedName,
3826         new PairOfSameType<HRegionInfo>(a, b));
3827     } else if (code == TransitionCode.MERGED) {
3828       mergingRegions.remove(encodedName);
3829       regionOffline(a, State.MERGED);
3830       regionOffline(b, State.MERGED);
3831       regionOnline(p, sn, 1);
3832 
3833       // User could disable the table before master knows the new region.
3834       if (getTableStateManager().isTableState(p.getTable(),
3835           ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING)) {
3836         invokeUnAssign(p);
3837       } else {
3838         Callable<Object> mergeReplicasCallable = new Callable<Object>() {
3839           @Override
3840           public Object call() {
3841             doMergingOfReplicas(p, a, b);
3842             return null;
3843           }
3844         };
3845         threadPoolExecutorService.submit(mergeReplicasCallable);
3846       }
3847     } else if (code == TransitionCode.MERGE_PONR) {
3848       try {
3849         regionStates.mergeRegions(p, a, b, sn);
3850       } catch (IOException ioe) {
3851         LOG.info("Failed to record merged region " + p.getShortNameToLog(), ioe);
3852         return "Failed to record the merging in meta";
3853       }
3854     } else {
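           // MERGE_REVERTED: restore the original regions and drop the new merged one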
3855       mergingRegions.remove(encodedName);
3856       regionOnline(a, sn);
3857       regionOnline(b, sn);
3858       regionOffline(p);
3859 
3860       if (getTableStateManager().isTableState(p.getTable(),
3861           ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING)) {
3862         invokeUnAssign(a);
3863         invokeUnAssign(b);
3864       }
3865     }
3866     return null;
3867   }
3868 
3869   /**
3870    * A helper to handle region merging transition event.
3871    * It transitions merging regions to MERGING state.
3872    */
3873   private boolean handleRegionMerging(final RegionTransition rt, final String encodedName,
3874       final String prettyPrintedRegionName, final ServerName sn) {
3875     if (!serverManager.isServerOnline(sn)) {
3876       LOG.warn("Dropped merging! ServerName=" + sn + " unknown.");
3877       return false;
3878     }
3879     byte [] payloadOfMerging = rt.getPayload();
3880     List<HRegionInfo> mergingRegions;
3881     try {
3882       mergingRegions = HRegionInfo.parseDelimitedFrom(
3883         payloadOfMerging, 0, payloadOfMerging.length);
3884     } catch (IOException e) {
3885       LOG.error("Dropped merging! Failed reading " + rt.getEventType()
3886         + " payload for " + prettyPrintedRegionName, e);
3887       return false;
3888     }
3889     assert mergingRegions.size() == 3;
3890     HRegionInfo p = mergingRegions.get(0);
3891     HRegionInfo hri_a = mergingRegions.get(1);
3892     HRegionInfo hri_b = mergingRegions.get(2);
3893 
3894     RegionState rs_p = regionStates.getRegionState(p);
3895     RegionState rs_a = regionStates.getRegionState(hri_a);
3896     RegionState rs_b = regionStates.getRegionState(hri_b);
3897 
3898     if (!((rs_a == null || rs_a.isOpenOrMergingOnServer(sn))
3899         && (rs_b == null || rs_b.isOpenOrMergingOnServer(sn))
3900         && (rs_p == null || rs_p.isOpenOrMergingNewOnServer(sn)))) {
3901       LOG.warn("Dropped merging! Not in state good for MERGING; rs_p="
3902         + rs_p + ", rs_a=" + rs_a + ", rs_b=" + rs_b);
3903       return false;
3904     }
3905 
3906     EventType et = rt.getEventType();
3907     if (et == EventType.RS_ZK_REQUEST_REGION_MERGE) {
3908       try {
3909         RegionMergeCoordination.RegionMergeDetails std =
3910             ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
3911                 .getRegionMergeCoordination().getDefaultDetails();
3912         ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
3913             .getRegionMergeCoordination().processRegionMergeRequest(p, hri_a, hri_b, sn, std);
3914         if (((ZkRegionMergeCoordination.ZkRegionMergeDetails) std).getZnodeVersion() == -1) {
3915           byte[] data = ZKAssign.getData(watcher, encodedName);
3916           EventType currentType = null;
3917           if (data != null) {
3918             RegionTransition newRt = RegionTransition.parseFrom(data);
3919             currentType = newRt.getEventType();
3920           }
3921           if (currentType == null || (currentType != EventType.RS_ZK_REGION_MERGED
3922               && currentType != EventType.RS_ZK_REGION_MERGING)) {
3923             LOG.warn("Failed to transition pending_merge node "
3924               + encodedName + " to merging, it's now " + currentType);
3925             return false;
3926           }
3927         }
3928       } catch (Exception e) {
3929         LOG.warn("Failed to transition pending_merge node "
3930           + encodedName + " to merging", e);
3931         return false;
3932       }
3933     }
3934 
3935     synchronized (regionStates) {
3936       regionStates.updateRegionState(hri_a, State.MERGING);
3937       regionStates.updateRegionState(hri_b, State.MERGING);
3938       regionStates.updateRegionState(p, State.MERGING_NEW, sn);
3939 
3940       if (et != EventType.RS_ZK_REGION_MERGED) {
3941         this.mergingRegions.put(encodedName,
3942           new PairOfSameType<HRegionInfo>(hri_a, hri_b));
3943       } else {
3944         this.mergingRegions.remove(encodedName);
3945         regionOffline(hri_a, State.MERGED);
3946         regionOffline(hri_b, State.MERGED);
3947         regionOnline(p, sn);
3948       }
3949     }
3950 
3951     if (et == EventType.RS_ZK_REGION_MERGED) {
3952       doMergingOfReplicas(p, hri_a, hri_b);
3953       LOG.debug("Handling MERGED event for " + encodedName + "; deleting node");
3954       // Remove region from ZK
3955       try {
3956         boolean successful = false;
3957         while (!successful) {
3958           // It's possible that the RS updates the znode in between our read
3959           // and the delete, so it's safe to retry.
3960           successful = ZKAssign.deleteNode(watcher, encodedName,
3961             EventType.RS_ZK_REGION_MERGED, sn);
3962         }
3963       } catch (KeeperException e) {
3964         if (e instanceof NoNodeException) {
3965           String znodePath = ZKUtil.joinZNode(watcher.splitLogZNode, encodedName);
3966           LOG.debug("The znode " + znodePath + " does not exist.  May be deleted already.");
3967         } else {
3968           server.abort("Error deleting MERGED node " + encodedName, e);
3969         }
3970       }
3971       LOG.info("Handled MERGED event; merged=" + p.getRegionNameAsString()
3972         + ", region_a=" + hri_a.getRegionNameAsString() + ", region_b="
3973         + hri_b.getRegionNameAsString() + ", on " + sn);
3974 
3975       // User could disable the table before master knows the new region.
3976       if (tableStateManager.isTableState(p.getTable(),
3977           ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING)) {
3978         unassign(p);
3979       }
3980     }
3981     return true;
3982   }
3983 
3984   /**
3985    * A helper to handle region splitting transition event.
3986    */
3987   private boolean handleRegionSplitting(final RegionTransition rt, final String encodedName,
3988       final String prettyPrintedRegionName, final ServerName sn) {
3989     if (!serverManager.isServerOnline(sn)) {
3990       LOG.warn("Dropped splitting! ServerName=" + sn + " unknown.");
3991       return false;
3992     }
3993     byte [] payloadOfSplitting = rt.getPayload();
3994     List<HRegionInfo> splittingRegions;
3995     try {
3996       splittingRegions = HRegionInfo.parseDelimitedFrom(
3997         payloadOfSplitting, 0, payloadOfSplitting.length);
3998     } catch (IOException e) {
3999       LOG.error("Dropped splitting! Failed reading " + rt.getEventType()
4000         + " payload for " + prettyPrintedRegionName, e);
4001       return false;
4002     }
4003     assert splittingRegions.size() == 2;
4004     HRegionInfo hri_a = splittingRegions.get(0);
4005     HRegionInfo hri_b = splittingRegions.get(1);
4006 
4007     RegionState rs_p = regionStates.getRegionState(encodedName);
4008     RegionState rs_a = regionStates.getRegionState(hri_a);
4009     RegionState rs_b = regionStates.getRegionState(hri_b);
4010 
4011     if (!((rs_p == null || rs_p.isOpenOrSplittingOnServer(sn))
4012         && (rs_a == null || rs_a.isOpenOrSplittingNewOnServer(sn))
4013         && (rs_b == null || rs_b.isOpenOrSplittingNewOnServer(sn)))) {
4014       LOG.warn("Dropped splitting! Not in state good for SPLITTING; rs_p="
4015         + rs_p + ", rs_a=" + rs_a + ", rs_b=" + rs_b);
4016       return false;
4017     }
4018 
4019     if (rs_p == null) {
4020       // Splitting region should be online
4021       rs_p = regionStates.updateRegionState(rt, State.OPEN);
4022       if (rs_p == null) {
4023         LOG.warn("Received splitting for region " + prettyPrintedRegionName
4024           + " from server " + sn + " but it doesn't exist anymore,"
4025           + " probably already processed its split");
4026         return false;
4027       }
4028       regionStates.regionOnline(rs_p.getRegion(), sn);
4029     }
4030 
4031     HRegionInfo p = rs_p.getRegion();
4032     EventType et = rt.getEventType();
4033     if (et == EventType.RS_ZK_REQUEST_REGION_SPLIT) {
4034       try {
4035         SplitTransactionDetails std =
4036             ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
4037                 .getSplitTransactionCoordination().getDefaultDetails();
4038         if (((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
4039             .getSplitTransactionCoordination().processTransition(p, hri_a, hri_b, sn, std) == -1) {
4040           byte[] data = ZKAssign.getData(watcher, encodedName);
4041           EventType currentType = null;
4042           if (data != null) {
4043             RegionTransition newRt = RegionTransition.parseFrom(data);
4044             currentType = newRt.getEventType();
4045           }
4046           if (currentType == null
4047               || (currentType != EventType.RS_ZK_REGION_SPLIT && currentType != EventType.RS_ZK_REGION_SPLITTING)) {
4048             LOG.warn("Failed to transition pending_split node " + encodedName
4049                 + " to splitting, it's now " + currentType);
4050             return false;
4051           }
4052         }
4053       } catch (Exception e) {
4054         LOG.warn("Failed to transition pending_split node " + encodedName + " to splitting", e);
4055         return false;
4056       }
4057     }
4058 
4059     synchronized (regionStates) {
4060       splitRegions.put(p, new PairOfSameType<HRegionInfo>(hri_a, hri_b));
4061       regionStates.updateRegionState(hri_a, State.SPLITTING_NEW, sn);
4062       regionStates.updateRegionState(hri_b, State.SPLITTING_NEW, sn);
4063       regionStates.updateRegionState(rt, State.SPLITTING);
4064 
4065       // The below is for testing ONLY!  We can't do fault injection easily, so
4066       // resort to this kinda uglyness -- St.Ack 02/25/2011.
4067       if (TEST_SKIP_SPLIT_HANDLING) {
4068         LOG.warn("Skipping split message, TEST_SKIP_SPLIT_HANDLING is set");
4069         return true; // return true so that the splitting node stays
4070       }
4071 
4072       if (et == EventType.RS_ZK_REGION_SPLIT) {
4073         regionOffline(p, State.SPLIT);
4074         regionOnline(hri_a, sn);
4075         regionOnline(hri_b, sn);
4076         splitRegions.remove(p);
4077       }
4078     }
4079 
4080     if (et == EventType.RS_ZK_REGION_SPLIT) {
4081       // split replicas
4082       doSplittingOfReplicas(rs_p.getRegion(), hri_a, hri_b);
4083       LOG.debug("Handling SPLIT event for " + encodedName + "; deleting node");
4084       // Remove region from ZK
4085       try {
4086         boolean successful = false;
4087         while (!successful) {
4088           // It's possible that the RS updates the znode in between our read
4089           // and the delete, so it's safe to retry.
4090           successful = ZKAssign.deleteNode(watcher, encodedName,
4091             EventType.RS_ZK_REGION_SPLIT, sn);
4092         }
4093       } catch (KeeperException e) {
4094         if (e instanceof NoNodeException) {
4095           String znodePath = ZKUtil.joinZNode(watcher.splitLogZNode, encodedName);
4096           LOG.debug("The znode " + znodePath + " does not exist.  May be deleted already.");
4097         } else {
4098           server.abort("Error deleting SPLIT node " + encodedName, e);
4099         }
4100       }
4101       LOG.info("Handled SPLIT event; parent=" + p.getRegionNameAsString()
4102         + ", daughter a=" + hri_a.getRegionNameAsString() + ", daughter b="
4103         + hri_b.getRegionNameAsString() + ", on " + sn);
4104 
4105       // User could disable the table before master knows the new region.
4106       if (tableStateManager.isTableState(p.getTable(),
4107           ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING)) {
4108         unassign(hri_a);
4109         unassign(hri_b);
4110       }
4111     }
4112     return true;
4113   }
4114 
4115   private void doMergingOfReplicas(HRegionInfo mergedHri, final HRegionInfo hri_a,
4116       final HRegionInfo hri_b) {
4117     // Close replicas for the original unmerged regions. Create/assign new replicas
4118     // for the merged parent.
4119     List<HRegionInfo> unmergedRegions = new ArrayList<HRegionInfo>();
4120     unmergedRegions.add(hri_a);
4121     unmergedRegions.add(hri_b);
4122     Map<ServerName, List<HRegionInfo>> map = regionStates.getRegionAssignments(unmergedRegions);
4123     Collection<List<HRegionInfo>> c = map.values();
4124     for (List<HRegionInfo> l : c) {
4125       for (HRegionInfo h : l) {
4126         if (!RegionReplicaUtil.isDefaultReplica(h)) {
4127           LOG.debug("Unassigning un-merged replica " + h);
4128           unassign(h);
4129         }
4130       }
4131     }
4132     int numReplicas = 1;
4133     try {
4134       numReplicas = server.getTableDescriptors().get(mergedHri.getTable()).
4135           getRegionReplication();
4136     } catch (IOException e) {
4137       LOG.warn("Couldn't get the replication attribute of the table " + mergedHri.getTable() +
4138           " due to " + e.getMessage() + ". The assignment of replicas for the merged region " +
4139           "will not be done");
4140     }
4141     List<HRegionInfo> regions = new ArrayList<HRegionInfo>();
4142     for (int i = 1; i < numReplicas; i++) {
4143       regions.add(RegionReplicaUtil.getRegionInfoForReplica(mergedHri, i));
4144     }
4145     try {
4146       assign(regions);
4147     } catch (IOException ioe) {
4148       LOG.warn("Couldn't assign all replica(s) of region " + mergedHri + " because of " +
4149                 ioe.getMessage());
4150     } catch (InterruptedException ie) {
4151       LOG.warn("Couldn't assign all replica(s) of region " + mergedHri + " because of " +
4152                 ie.getMessage());
4153     }
4154   }
4155 
4156   private void doSplittingOfReplicas(final HRegionInfo parentHri, final HRegionInfo hri_a,
4157       final HRegionInfo hri_b) {
4158     // Create new regions for the replicas, and assign them to match the
4159     // current replica assignments. If replica 1 of the parent is assigned to RS1,
4160     // replica 1 of each daughter will be on the same machine.
4161     int numReplicas = 1;
4162     try {
4163       numReplicas = server.getTableDescriptors().get(parentHri.getTable()).
4164           getRegionReplication();
4165     } catch (IOException e) {
4166       LOG.warn("Couldn't get the replication attribute of the table " + parentHri.getTable() +
4167           " due to " + e.getMessage() + ". The assignment of daughter replicas " +
4168           "will not be done");
4169     }
4170     // unassign the old replicas
4171     List<HRegionInfo> parentRegion = new ArrayList<HRegionInfo>();
4172     parentRegion.add(parentHri);
4173     Map<ServerName, List<HRegionInfo>> currentAssign =
4174         regionStates.getRegionAssignments(parentRegion);
4175     Collection<List<HRegionInfo>> c = currentAssign.values();
4176     for (List<HRegionInfo> l : c) {
4177       for (HRegionInfo h : l) {
4178         if (!RegionReplicaUtil.isDefaultReplica(h)) {
4179           LOG.debug("Unassigning parent's replica " + h);
4180           unassign(h);
4181         }
4182       }
4183     }
4184     // assign daughter replicas
4185     Map<HRegionInfo, ServerName> map = new HashMap<HRegionInfo, ServerName>();
4186     for (int i = 1; i < numReplicas; i++) {
4187       prepareDaughterReplicaForAssignment(hri_a, parentHri, i, map);
4188       prepareDaughterReplicaForAssignment(hri_b, parentHri, i, map);
4189     }
4190     try {
4191       assign(map);
4192     } catch (IOException e) {
4193       LOG.warn("Caught exception " + e + " while trying to assign replica(s) of daughter(s)");
4194     } catch (InterruptedException e) {
4195       LOG.warn("Caught exception " + e + " while trying to assign replica(s) of daughter(s)");
4196     }
4197   }
4198 
4199   private void prepareDaughterReplicaForAssignment(HRegionInfo daughterHri, HRegionInfo parentHri,
4200       int replicaId, Map<HRegionInfo, ServerName> map) {
4201     HRegionInfo parentReplica = RegionReplicaUtil.getRegionInfoForReplica(parentHri, replicaId);
4202     HRegionInfo daughterReplica = RegionReplicaUtil.getRegionInfoForReplica(daughterHri,
4203         replicaId);
4204     LOG.debug("Created replica region for daughter " + daughterReplica);
4205     ServerName sn;
4206     if ((sn = regionStates.getRegionServerOfRegion(parentReplica)) != null) {
4207       map.put(daughterReplica, sn);
4208     } else {
4209       List<ServerName> servers = serverManager.getOnlineServersList();
4210       sn = servers.get((new Random(System.currentTimeMillis())).nextInt(servers.size()));
4211       map.put(daughterReplica, sn);
4212     }
4213   }
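       /*
        * Sketch of the replica-id convention relied on above: replica 0 is the
        * primary, and higher ids are derived from it. The table descriptor lookup
        * mirrors doSplittingOfReplicas(); the variable names are hypothetical.
        *
        *   int numReplicas = server.getTableDescriptors().get(hri.getTable()).getRegionReplication();
        *   List<HRegionInfo> replicas = new ArrayList<HRegionInfo>();
        *   for (int i = 1; i < numReplicas; i++) {  // 0 is the primary itself
        *     replicas.add(RegionReplicaUtil.getRegionInfoForReplica(hri, i));
        *   }
        */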
4214 
4215   public Set<HRegionInfo> getReplicasToClose() {
4216     return replicasToClose;
4217   }
4218 
4219   /**
4220    * Marks a region as offline.  The new state is the specified one,
4221    * if not null; if the specified state is null, the new state is Offline.
4222    * The specified state can only be Split/Merged/Offline/null.
4223    */
4224   private void regionOffline(final HRegionInfo regionInfo, final State state) {
4225     regionStates.regionOffline(regionInfo, state);
4226     removeClosedRegion(regionInfo);
4227     // remove the region plan as well just in case.
4228     clearRegionPlan(regionInfo);
4229     balancer.regionOffline(regionInfo);
4230 
4231     // Tell our listeners that a region was closed
4232     sendRegionClosedNotification(regionInfo);
4233     // also note that all the replicas of the primary should be closed
4234     if (state != null && state.equals(State.SPLIT)) {
4235       Collection<HRegionInfo> c = new ArrayList<HRegionInfo>(1);
4236       c.add(regionInfo);
4237       Map<ServerName, List<HRegionInfo>> map = regionStates.getRegionAssignments(c);
4238       Collection<List<HRegionInfo>> allReplicas = map.values();
4239       for (List<HRegionInfo> list : allReplicas) {
4240         replicasToClose.addAll(list);
4241       }
4242     }
4243     else if (state != null && state.equals(State.MERGED)) {
4244       Collection<HRegionInfo> c = new ArrayList<HRegionInfo>(1);
4245       c.add(regionInfo);
4246       Map<ServerName, List<HRegionInfo>> map = regionStates.getRegionAssignments(c);
4247       Collection<List<HRegionInfo>> allReplicas = map.values();
4248       for (List<HRegionInfo> list : allReplicas) {
4249         replicasToClose.addAll(list);
4250       }
4251     }
4252   }
4253 
4254   private void sendRegionOpenedNotification(final HRegionInfo regionInfo,
4255       final ServerName serverName) {
4256     if (!this.listeners.isEmpty()) {
4257       for (AssignmentListener listener : this.listeners) {
4258         listener.regionOpened(regionInfo, serverName);
4259       }
4260     }
4261   }
4262 
4263   private void sendRegionClosedNotification(final HRegionInfo regionInfo) {
4264     if (!this.listeners.isEmpty()) {
4265       for (AssignmentListener listener : this.listeners) {
4266         listener.regionClosed(regionInfo);
4267       }
4268     }
4269   }
4270 
4271   /**
4272    * Try to update some region states. If the state machine prevents
4273    * such an update, an error message is returned to explain the reason.
4274    *
4275    * It's expected that each transition involves just one
4276    * region for opening/closing, or three regions for splitting/merging.
4277    * These regions should be on the server that requested the change.
4278    *
4279    * Region state machine: only the transitions below
4280    * are expected to be triggered by a region server.
4281    *
4282    * On the state transition:
4283    *  (1) Open/Close should be initiated by master
4284    *      (a) Master sets the region to pending_open/pending_close
4285    *        in memory and hbase:meta after sending the request
4286    *        to the region server
4287    *      (b) Region server reports back to the master
4288    *        after open/close is done (either success/failure)
4289    *      (c) If the region server has a problem reporting the status
4290    *        to the master, it must be because the master is down or there is
4291    *        a temporary network issue. Otherwise, the region server should
4292    *        abort, since it must be a bug. If the master is not accessible,
4293    *        the region server should keep trying until it is
4294    *        stopped or the status is reported to the (new) master
4295    *      (d) If the region server dies in the middle of opening/closing
4296    *        a region, the server shutdown handler (SSH) picks it up and finishes it
4297    *      (e) If the master dies in the middle, the new master recovers
4298    *        the state from hbase:meta during initialization. The region server
4299    *        can report any transition that has not yet been reported to
4300    *        the previous active master
4301    *  (2) Split/merge is initiated by region servers
4302    *      (a) To split a region, a region server sends a request
4303    *        to the master to set the region to splitting, together with the
4304    *        two daughters (to be created) set to splitting_new. If approved
4305    *        by the master, the split can then move ahead
4306    *      (b) To merge two regions, a region server sends a request to the
4307    *        master to set the new merged region (to be created) to
4308    *        merging_new, together with the two regions (to be merged) set to merging.
4309    *        If the master approves, the merge can then move ahead
4310    *      (c) Once the split/merge is done, the region server
4311    *        reports the status (success or failure) back to the master
4312    *      (d) Other scenarios should be handled similarly to
4313    *        region open/close
4314    */
4315   protected String onRegionTransition(final ServerName serverName,
4316       final RegionStateTransition transition) {
4317     TransitionCode code = transition.getTransitionCode();
4318     HRegionInfo hri = HRegionInfo.convert(transition.getRegionInfo(0));
4319     RegionState current = regionStates.getRegionState(hri);
4320     if (LOG.isDebugEnabled()) {
4321       LOG.debug("Got transition " + code + " for "
4322         + (current != null ? current.toString() : hri.getShortNameToLog())
4323         + " from " + serverName);
4324     }
4325     String errorMsg = null;
4326     switch (code) {
4327     case OPENED:
4328       if (current != null && current.isOpened() && current.isOnServer(serverName)) {
4329         LOG.info("Region " + hri.getShortNameToLog() + " is already " + current.getState() + " on "
4330             + serverName);
4331         break;
4332       }
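           // Fall through: a successful OPENED report shares the handling below with FAILED_OPEN.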
4333     case FAILED_OPEN:
4334       if (current == null
4335           || !current.isPendingOpenOrOpeningOnServer(serverName)) {
4336         errorMsg = hri.getShortNameToLog()
4337           + " is not pending open on " + serverName;
4338       } else if (code == TransitionCode.FAILED_OPEN) {
4339         onRegionFailedOpen(hri, serverName);
4340       } else {
4341         long openSeqNum = HConstants.NO_SEQNUM;
4342         if (transition.hasOpenSeqNum()) {
4343           openSeqNum = transition.getOpenSeqNum();
4344         }
4345         if (openSeqNum < 0) {
4346           errorMsg = "Newly opened region has invalid open seq num " + openSeqNum;
4347         } else {
4348           onRegionOpen(hri, serverName, openSeqNum);
4349         }
4350       }
4351       break;
4352 
4353     case CLOSED:
4354       if (current == null
4355           || !current.isPendingCloseOrClosingOnServer(serverName)) {
4356         errorMsg = hri.getShortNameToLog()
4357           + " is not pending close on " + serverName;
4358       } else {
4359         onRegionClosed(hri);
4360       }
4361       break;
4362 
4363     case READY_TO_SPLIT:
4364       try {
4365         regionStateListener.onRegionSplit(hri);
4366       } catch (IOException exp) {
4367         errorMsg = StringUtils.stringifyException(exp);
4368       }
4369       break;
4370     case SPLIT_PONR:
4371     case SPLIT:
4372       errorMsg =
4373           onRegionSplit(serverName, code, hri, HRegionInfo.convert(transition.getRegionInfo(1)),
4374             HRegionInfo.convert(transition.getRegionInfo(2)));
4375       break;
4376 
4377     case SPLIT_REVERTED:
4378       errorMsg =
4379           onRegionSplitReverted(serverName, hri,
4380             HRegionInfo.convert(transition.getRegionInfo(1)),
4381             HRegionInfo.convert(transition.getRegionInfo(2)));
4382       if (org.apache.commons.lang.StringUtils.isEmpty(errorMsg)) {
4383         try {
4384           regionStateListener.onRegionSplitReverted(hri);
4385         } catch (IOException exp) {
4386           LOG.warn(StringUtils.stringifyException(exp));
4387         }
4388       }
4389       break;
4390     case READY_TO_MERGE:
4391     case MERGE_PONR:
4392     case MERGED:
4393     case MERGE_REVERTED:
4394       errorMsg = onRegionMerge(serverName, code, hri,
4395         HRegionInfo.convert(transition.getRegionInfo(1)),
4396         HRegionInfo.convert(transition.getRegionInfo(2)));
4397       if (code == TransitionCode.MERGED && org.apache.commons.lang.StringUtils.isEmpty(errorMsg)) {
4398         try {
4399           regionStateListener.onRegionMerged(hri);
4400         } catch (IOException exp) {
4401           errorMsg = StringUtils.stringifyException(exp);
4402         }
4403       }
4404       break;
4405 
4406     default:
4407       errorMsg = "Unexpected transition code " + code;
4408     }
4409     if (errorMsg != null) {
4410       LOG.error("Failed to transition region from " + current + " to "
4411         + code + " by " + serverName + ": " + errorMsg);
4412     }
4413     return errorMsg;
4414   }
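       /*
        * A minimal sketch of how a region server's report could be fed into
        * onRegionTransition(). The builder calls mirror the protobuf accessors used
        * above; the variable names and the call site itself are hypothetical.
        *
        *   RegionStateTransition transition = RegionStateTransition.newBuilder()
        *       .setTransitionCode(TransitionCode.OPENED)
        *       .addRegionInfo(HRegionInfo.convert(hri))  // region reported as opened
        *       .setOpenSeqNum(openSeqNum)
        *       .build();
        *   String error = assignmentManager.onRegionTransition(serverName, transition);
        *   // a null return value means the transition was accepted
        */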
4415 
4416   /**
4417    * @return Instance of load balancer
4418    */
4419   public LoadBalancer getBalancer() {
4420     return this.balancer;
4421   }
4422 
4423   public Map<ServerName, List<HRegionInfo>>
4424     getSnapShotOfAssignment(Collection<HRegionInfo> infos) {
4425     return getRegionStates().getRegionAssignments(infos);
4426   }
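       /*
        * Usage sketch (hypothetical caller): fetch the current placement of a few
        * regions in one call.
        *
        *   Map<ServerName, List<HRegionInfo>> snapshot =
        *       assignmentManager.getSnapShotOfAssignment(Arrays.asList(hri1, hri2));
        *   for (Map.Entry<ServerName, List<HRegionInfo>> entry : snapshot.entrySet()) {
        *     LOG.debug(entry.getKey() + " hosts " + entry.getValue().size()
        *       + " of the requested regions");
        *   }
        */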
4427 
4428   void setRegionStateListener(RegionStateListener listener) {
4429     this.regionStateListener = listener;
4430   }
4431 }