View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.regionserver;
20  
21  import java.io.IOException;
22  import java.io.InterruptedIOException;
23  import java.net.BindException;
24  import java.net.InetSocketAddress;
25  import java.net.UnknownHostException;
26  import java.util.ArrayList;
27  import java.util.Collections;
28  import java.util.HashMap;
29  import java.util.Iterator;
30  import java.util.List;
31  import java.util.Map;
32  import java.util.Map.Entry;
33  import java.util.NavigableMap;
34  import java.util.Set;
35  import java.util.TreeSet;
36  import java.util.concurrent.ConcurrentHashMap;
37  import java.util.concurrent.atomic.AtomicLong;
38  
39  import org.apache.commons.logging.Log;
40  import org.apache.commons.logging.LogFactory;
41  import org.apache.hadoop.conf.Configuration;
42  import org.apache.hadoop.hbase.Cell;
43  import org.apache.hadoop.hbase.CellScannable;
44  import org.apache.hadoop.hbase.CellScanner;
45  import org.apache.hadoop.hbase.CellUtil;
46  import org.apache.hadoop.hbase.DoNotRetryIOException;
47  import org.apache.hadoop.hbase.DroppedSnapshotException;
48  import org.apache.hadoop.hbase.HBaseIOException;
49  import org.apache.hadoop.hbase.HConstants;
50  import org.apache.hadoop.hbase.HRegionInfo;
51  import org.apache.hadoop.hbase.HTableDescriptor;
52  import org.apache.hadoop.hbase.MetaTableAccessor;
53  import org.apache.hadoop.hbase.MultiActionResultTooLarge;
54  import org.apache.hadoop.hbase.NotServingRegionException;
55  import org.apache.hadoop.hbase.ServerName;
56  import org.apache.hadoop.hbase.TableName;
57  import org.apache.hadoop.hbase.UnknownScannerException;
58  import org.apache.hadoop.hbase.classification.InterfaceAudience;
59  import org.apache.hadoop.hbase.client.Append;
60  import org.apache.hadoop.hbase.client.ConnectionUtils;
61  import org.apache.hadoop.hbase.client.Delete;
62  import org.apache.hadoop.hbase.client.Durability;
63  import org.apache.hadoop.hbase.client.Get;
64  import org.apache.hadoop.hbase.client.Increment;
65  import org.apache.hadoop.hbase.client.Mutation;
66  import org.apache.hadoop.hbase.client.Put;
67  import org.apache.hadoop.hbase.client.RegionReplicaUtil;
68  import org.apache.hadoop.hbase.client.Result;
69  import org.apache.hadoop.hbase.client.RowMutations;
70  import org.apache.hadoop.hbase.client.Scan;
71  import org.apache.hadoop.hbase.conf.ConfigurationObserver;
72  import org.apache.hadoop.hbase.coordination.CloseRegionCoordination;
73  import org.apache.hadoop.hbase.coordination.OpenRegionCoordination;
74  import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException;
75  import org.apache.hadoop.hbase.exceptions.MergeRegionException;
76  import org.apache.hadoop.hbase.exceptions.OperationConflictException;
77  import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
78  import org.apache.hadoop.hbase.filter.ByteArrayComparable;
79  import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
80  import org.apache.hadoop.hbase.ipc.HBaseRPCErrorHandler;
81  import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
82  import org.apache.hadoop.hbase.ipc.PriorityFunction;
83  import org.apache.hadoop.hbase.ipc.QosPriority;
84  import org.apache.hadoop.hbase.ipc.RpcCallContext;
85  import org.apache.hadoop.hbase.ipc.RpcServer;
86  import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface;
87  import org.apache.hadoop.hbase.ipc.RpcServerInterface;
88  import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
89  import org.apache.hadoop.hbase.ipc.ServerRpcController;
90  import org.apache.hadoop.hbase.master.MasterRpcServices;
91  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
92  import org.apache.hadoop.hbase.protobuf.RequestConverter;
93  import org.apache.hadoop.hbase.protobuf.ResponseConverter;
94  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
95  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CloseRegionRequest;
96  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CloseRegionResponse;
97  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CompactRegionRequest;
98  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CompactRegionResponse;
99  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.FlushRegionRequest;
100 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.FlushRegionResponse;
101 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetOnlineRegionRequest;
102 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetOnlineRegionResponse;
103 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoRequest;
104 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoResponse;
105 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetServerInfoRequest;
106 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetServerInfoResponse;
107 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetStoreFileRequest;
108 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetStoreFileResponse;
109 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.MergeRegionsRequest;
110 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.MergeRegionsResponse;
111 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionRequest;
112 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionRequest.RegionOpenInfo;
113 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionResponse;
114 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionResponse.RegionOpeningState;
115 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryRequest;
116 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryResponse;
117 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.RollWALWriterRequest;
118 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.RollWALWriterResponse;
119 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.SplitRegionRequest;
120 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.SplitRegionResponse;
121 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.StopServerRequest;
122 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.StopServerResponse;
123 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.UpdateConfigurationRequest;
124 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.UpdateConfigurationResponse;
125 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.UpdateFavoredNodesRequest;
126 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.UpdateFavoredNodesResponse;
127 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry;
128 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WarmupRegionRequest;
129 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WarmupRegionResponse;
130 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
131 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Action;
132 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
133 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest.FamilyPath;
134 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileResponse;
135 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService;
136 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Condition;
137 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceRequest;
138 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
139 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetRequest;
140 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetResponse;
141 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest;
142 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiResponse;
143 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateRequest;
144 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateResponse;
145 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto;
146 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType;
147 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionAction;
148 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionActionResult;
149 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ResultOrException;
150 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
151 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse;
152 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameInt64Pair;
153 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionInfo;
154 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier;
155 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
156 import org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.ScanMetrics;
157 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader;
158 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.BulkLoadDescriptor;
159 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
160 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor;
161 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescriptor;
162 import org.apache.hadoop.hbase.quotas.OperationQuota;
163 import org.apache.hadoop.hbase.quotas.RegionServerQuotaManager;
164 import org.apache.hadoop.hbase.regionserver.Leases.LeaseStillHeldException;
165 import org.apache.hadoop.hbase.regionserver.Region.FlushResult;
166 import org.apache.hadoop.hbase.regionserver.Region.Operation;
167 import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope;
168 import org.apache.hadoop.hbase.regionserver.handler.OpenMetaHandler;
169 import org.apache.hadoop.hbase.regionserver.handler.OpenRegionHandler;
170 import org.apache.hadoop.hbase.wal.WAL;
171 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
172 import org.apache.hadoop.hbase.security.User;
173 import org.apache.hadoop.hbase.util.Bytes;
174 import org.apache.hadoop.hbase.util.Counter;
175 import org.apache.hadoop.hbase.util.DNS;
176 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
177 import org.apache.hadoop.hbase.util.Pair;
178 import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
179 import org.apache.hadoop.hbase.util.Strings;
180 import org.apache.hadoop.hbase.wal.WALKey;
181 import org.apache.hadoop.hbase.wal.WALSplitter;
182 import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
183 import org.apache.zookeeper.KeeperException;
184 
185 import com.google.common.annotations.VisibleForTesting;
186 import com.google.protobuf.ByteString;
187 import com.google.protobuf.Message;
188 import com.google.protobuf.RpcController;
189 import com.google.protobuf.ServiceException;
190 import com.google.protobuf.TextFormat;
191 
192 /**
193  * Implements the regionserver RPC services.
194  */
195 @InterfaceAudience.Private
196 @SuppressWarnings("deprecation")
197 public class RSRpcServices implements HBaseRPCErrorHandler,
198     AdminService.BlockingInterface, ClientService.BlockingInterface, PriorityFunction,
199     ConfigurationObserver {
200   protected static final Log LOG = LogFactory.getLog(RSRpcServices.class);
201 
202   /** RPC scheduler to use for the region server. */
203   public static final String REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS =
204     "hbase.region.server.rpc.scheduler.factory.class";
205 
206   /**
207    * Minimum allowable time limit delta (in milliseconds) that can be enforced during scans. This
208    * configuration exists to prevent the scenario where a time limit is specified to be so
209    * restrictive that the time limit is reached immediately (before any cells are scanned).
210    */
211   private static final String REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA =
212       "hbase.region.server.rpc.minimum.scan.time.limit.delta";
213   /**
214    * Default value of {@link RSRpcServices#REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA}
215    */
216   private static final long DEFAULT_REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA = 10;
217 
218   // Request counter. (Includes requests that are not serviced by regions.)
219   final Counter requestCount = new Counter();
220   // Server to handle client requests.
221   final RpcServerInterface rpcServer;
222   final InetSocketAddress isa;
223 
224   private final HRegionServer regionServer;
225   private final long maxScannerResultSize;
226 
227   // The reference to the priority extraction function
228   private final PriorityFunction priority;
229 
230   private final AtomicLong scannerIdGen = new AtomicLong(0L);
231   private final ConcurrentHashMap<String, RegionScannerHolder> scanners =
232     new ConcurrentHashMap<String, RegionScannerHolder>();
233 
234   /**
235    * The lease timeout period for client scanners (milliseconds).
236    */
237   private final int scannerLeaseTimeoutPeriod;
238 
239   /**
240    * The RPC timeout period (milliseconds)
241    */
242   private final int rpcTimeout;
243 
244   /**
245    * The minimum allowable delta to use for the scan limit
246    */
247   private final long minimumScanTimeLimitDelta;
248 
249   /**
250    * Holder class which holds the RegionScanner and nextCallSeq together.
251    */
252   private static class RegionScannerHolder {
253     private AtomicLong nextCallSeq = new AtomicLong(0);
254     private RegionScanner s;
255     private Region r;
256 
257     public RegionScannerHolder(RegionScanner s, Region r) {
258       this.s = s;
259       this.r = r;
260     }
261 
262     private long getNextCallSeq() {
263       return nextCallSeq.get();
264     }
265 
266     private void incNextCallSeq() {
267       nextCallSeq.incrementAndGet();
268     }
269 
270     private void rollbackNextCallSeq() {
271       nextCallSeq.decrementAndGet();
272     }
273   }
274 
  /**
   * Instantiated as a scanner lease. If the lease times out, the scanner is
   * closed
   */
  private class ScannerListener implements LeaseListener {
    // Key into the 'scanners' map identifying the scanner this lease guards.
    private final String scannerName;

    ScannerListener(final String n) {
      this.scannerName = n;
    }

    @Override
    public void leaseExpired() {
      // Remove first, then close: removal makes expiry race-safe against a
      // concurrent close of the same scanner name by the client.
      RegionScannerHolder rsh = scanners.remove(this.scannerName);
      if (rsh != null) {
        RegionScanner s = rsh.s;
        LOG.info("Scanner " + this.scannerName + " lease expired on region "
          + s.getRegionInfo().getRegionNameAsString());
        try {
          // Coprocessor pre/post hooks bracket the actual close, mirroring the
          // normal (client-initiated) scanner-close path.
          Region region = regionServer.getRegion(s.getRegionInfo().getRegionName());
          if (region != null && region.getCoprocessorHost() != null) {
            region.getCoprocessorHost().preScannerClose(s);
          }

          s.close();
          if (region != null && region.getCoprocessorHost() != null) {
            region.getCoprocessorHost().postScannerClose(s);
          }
        } catch (IOException e) {
          // NOTE(review): if getRegion() or preScannerClose() throws, s.close()
          // is skipped and the scanner's resources may leak — confirm intended.
          LOG.error("Closing scanner for "
            + s.getRegionInfo().getRegionNameAsString(), e);
        }
      } else {
        LOG.warn("Scanner " + this.scannerName + " lease expired, but no related" +
          " scanner found, hence no chance to close that related scanner!");
      }
    }
  }
313 
314   private static ResultOrException getResultOrException(
315       final ClientProtos.Result r, final int index, final ClientProtos.RegionLoadStats stats) {
316     return getResultOrException(ResponseConverter.buildActionResult(r, stats), index);
317   }
318 
319   private static ResultOrException getResultOrException(final Exception e, final int index) {
320     return getResultOrException(ResponseConverter.buildActionResult(e), index);
321   }
322 
323   private static ResultOrException getResultOrException(
324       final ResultOrException.Builder builder, final int index) {
325     return builder.setIndex(index).build();
326   }
327 
328   /**
329    * Starts the nonce operation for a mutation, if needed.
330    * @param mutation Mutation.
331    * @param nonceGroup Nonce group from the request.
332    * @returns Nonce used (can be NO_NONCE).
333    */
334   private long startNonceOperation(final MutationProto mutation, long nonceGroup)
335       throws IOException, OperationConflictException {
336     if (regionServer.nonceManager == null || !mutation.hasNonce()) return HConstants.NO_NONCE;
337     boolean canProceed = false;
338     try {
339       canProceed = regionServer.nonceManager.startOperation(
340         nonceGroup, mutation.getNonce(), regionServer);
341     } catch (InterruptedException ex) {
342       throw new InterruptedIOException("Nonce start operation interrupted");
343     }
344     if (!canProceed) {
345       // TODO: instead, we could convert append/increment to get w/mvcc
346       String message = "The operation with nonce {" + nonceGroup + ", " + mutation.getNonce()
347         + "} on row [" + Bytes.toString(mutation.getRow().toByteArray())
348         + "] may have already completed";
349       throw new OperationConflictException(message);
350     }
351     return mutation.getNonce();
352   }
353 
354   /**
355    * Ends nonce operation for a mutation, if needed.
356    * @param mutation Mutation.
357    * @param nonceGroup Nonce group from the request. Always 0 in initial implementation.
358    * @param success Whether the operation for this nonce has succeeded.
359    */
360   private void endNonceOperation(final MutationProto mutation,
361       long nonceGroup, boolean success) {
362     if (regionServer.nonceManager != null && mutation.hasNonce()) {
363       regionServer.nonceManager.endOperation(nonceGroup, mutation.getNonce(), success);
364     }
365   }
366 
367   /**
368    * @return True if current call supports cellblocks
369    */
370   private boolean isClientCellBlockSupport() {
371     RpcCallContext context = RpcServer.getCurrentCall();
372     return context != null && context.isClientCellBlockSupported();
373   }
374 
375   private void addResult(final MutateResponse.Builder builder,
376       final Result result, final PayloadCarryingRpcController rpcc) {
377     if (result == null) return;
378     if (isClientCellBlockSupport()) {
379       builder.setResult(ProtobufUtil.toResultNoData(result));
380       rpcc.setCellScanner(result.cellScanner());
381     } else {
382       ClientProtos.Result pbr = ProtobufUtil.toResult(result);
383       builder.setResult(pbr);
384     }
385   }
386 
387   private void addResults(final ScanResponse.Builder builder, final List<Result> results,
388       final RpcController controller, boolean isDefaultRegion) {
389     builder.setStale(!isDefaultRegion);
390     if (results == null || results.isEmpty()) return;
391     if (isClientCellBlockSupport()) {
392       for (Result res : results) {
393         builder.addCellsPerResult(res.size());
394         builder.addPartialFlagPerResult(res.isPartial());
395       }
396       ((PayloadCarryingRpcController)controller).
397         setCellScanner(CellUtil.createCellScanner(results));
398     } else {
399       for (Result res: results) {
400         ClientProtos.Result pbr = ProtobufUtil.toResult(res);
401         builder.addResults(pbr);
402       }
403     }
404   }
405 
406   /**
407    * Mutate a list of rows atomically.
408    *
409    * @param region
410    * @param actions
411    * @param cellScanner if non-null, the mutation data -- the Cell content.
412    * @throws IOException
413    */
414   private ClientProtos.RegionLoadStats mutateRows(final Region region,
415       final List<ClientProtos.Action> actions,
416       final CellScanner cellScanner) throws IOException {
417     if (!region.getRegionInfo().isMetaTable()) {
418       regionServer.cacheFlusher.reclaimMemStoreMemory();
419     }
420     RowMutations rm = null;
421     for (ClientProtos.Action action: actions) {
422       if (action.hasGet()) {
423         throw new DoNotRetryIOException("Atomic put and/or delete only, not a Get=" +
424           action.getGet());
425       }
426       MutationType type = action.getMutation().getMutateType();
427       if (rm == null) {
428         rm = new RowMutations(action.getMutation().getRow().toByteArray());
429       }
430       switch (type) {
431         case PUT:
432           rm.add(ProtobufUtil.toPut(action.getMutation(), cellScanner));
433           break;
434         case DELETE:
435           rm.add(ProtobufUtil.toDelete(action.getMutation(), cellScanner));
436           break;
437         default:
438           throw new DoNotRetryIOException("Atomic put and/or delete only, not " + type.name());
439       }
440     }
441     region.mutateRow(rm);
442     return ((HRegion)region).getRegionStats();
443   }
444 
445   /**
446    * Mutate a list of rows atomically.
447    *
448    * @param region
449    * @param actions
450    * @param cellScanner if non-null, the mutation data -- the Cell content.
451    * @param row
452    * @param family
453    * @param qualifier
454    * @param compareOp
455    * @param comparator @throws IOException
456    */
457   private boolean checkAndRowMutate(final Region region, final List<ClientProtos.Action> actions,
458       final CellScanner cellScanner, byte[] row, byte[] family, byte[] qualifier,
459       CompareOp compareOp, ByteArrayComparable comparator) throws IOException {
460     if (!region.getRegionInfo().isMetaTable()) {
461       regionServer.cacheFlusher.reclaimMemStoreMemory();
462     }
463     RowMutations rm = null;
464     for (ClientProtos.Action action: actions) {
465       if (action.hasGet()) {
466         throw new DoNotRetryIOException("Atomic put and/or delete only, not a Get=" +
467             action.getGet());
468       }
469       MutationType type = action.getMutation().getMutateType();
470       if (rm == null) {
471         rm = new RowMutations(action.getMutation().getRow().toByteArray());
472       }
473       switch (type) {
474         case PUT:
475           rm.add(ProtobufUtil.toPut(action.getMutation(), cellScanner));
476           break;
477         case DELETE:
478           rm.add(ProtobufUtil.toDelete(action.getMutation(), cellScanner));
479           break;
480         default:
481           throw new DoNotRetryIOException("Atomic put and/or delete only, not " + type.name());
482       }
483     }
484     return region.checkAndRowMutate(row, family, qualifier, compareOp, comparator, rm, Boolean.TRUE);
485   }
486 
487   /**
488    * Execute an append mutation.
489    *
490    * @param region
491    * @param m
492    * @param cellScanner
493    * @return result to return to client if default operation should be
494    * bypassed as indicated by RegionObserver, null otherwise
495    * @throws IOException
496    */
497   private Result append(final Region region, final OperationQuota quota, final MutationProto m,
498       final CellScanner cellScanner, long nonceGroup) throws IOException {
499     long before = EnvironmentEdgeManager.currentTime();
500     Append append = ProtobufUtil.toAppend(m, cellScanner);
501     quota.addMutation(append);
502     Result r = null;
503     if (region.getCoprocessorHost() != null) {
504       r = region.getCoprocessorHost().preAppend(append);
505     }
506     if (r == null) {
507       long nonce = startNonceOperation(m, nonceGroup);
508       boolean success = false;
509       try {
510         r = region.append(append, nonceGroup, nonce);
511         success = true;
512       } finally {
513         endNonceOperation(m, nonceGroup, success);
514       }
515       if (region.getCoprocessorHost() != null) {
516         region.getCoprocessorHost().postAppend(append, r);
517       }
518     }
519     if (regionServer.metricsRegionServer != null) {
520       regionServer.metricsRegionServer.updateAppend(
521         EnvironmentEdgeManager.currentTime() - before);
522     }
523     return r;
524   }
525 
526   /**
527    * Execute an increment mutation.
528    *
529    * @param region
530    * @param mutation
531    * @return the Result
532    * @throws IOException
533    */
534   private Result increment(final Region region, final OperationQuota quota,
535       final MutationProto mutation, final CellScanner cells, long nonceGroup) throws IOException {
536     long before = EnvironmentEdgeManager.currentTime();
537     Increment increment = ProtobufUtil.toIncrement(mutation, cells);
538     quota.addMutation(increment);
539     Result r = null;
540     if (region.getCoprocessorHost() != null) {
541       r = region.getCoprocessorHost().preIncrement(increment);
542     }
543     if (r == null) {
544       long nonce = startNonceOperation(mutation, nonceGroup);
545       boolean success = false;
546       try {
547         r = region.increment(increment, nonceGroup, nonce);
548         success = true;
549       } finally {
550         endNonceOperation(mutation, nonceGroup, success);
551       }
552       if (region.getCoprocessorHost() != null) {
553         r = region.getCoprocessorHost().postIncrement(increment, r);
554       }
555     }
556     if (regionServer.metricsRegionServer != null) {
557       regionServer.metricsRegionServer.updateIncrement(
558         EnvironmentEdgeManager.currentTime() - before);
559     }
560     return r;
561   }
562 
563   /**
564    * Run through the regionMutation <code>rm</code> and per Mutation, do the work, and then when
565    * done, add an instance of a {@link ResultOrException} that corresponds to each Mutation.
566    * @param region
567    * @param actions
568    * @param cellScanner
569    * @param builder
570    * @param cellsToReturn  Could be null. May be allocated in this method.  This is what this
571    * method returns as a 'result'.
572    * @return Return the <code>cellScanner</code> passed
573    */
574   private List<CellScannable> doNonAtomicRegionMutation(final Region region,
575       final OperationQuota quota, final RegionAction actions, final CellScanner cellScanner,
576       final RegionActionResult.Builder builder, List<CellScannable> cellsToReturn, long nonceGroup) {
577     // Gather up CONTIGUOUS Puts and Deletes in this mutations List.  Idea is that rather than do
578     // one at a time, we instead pass them in batch.  Be aware that the corresponding
579     // ResultOrException instance that matches each Put or Delete is then added down in the
580     // doBatchOp call.  We should be staying aligned though the Put and Delete are deferred/batched
581     List<ClientProtos.Action> mutations = null;
582     long maxQuotaResultSize = Math.min(maxScannerResultSize, quota.getReadAvailable());
583     RpcCallContext context = RpcServer.getCurrentCall();
584     IOException sizeIOE = null;
585     Object lastBlock = null;
586     for (ClientProtos.Action action : actions.getActionList()) {
587       ClientProtos.ResultOrException.Builder resultOrExceptionBuilder = null;
588       try {
589         Result r = null;
590 
591         if (context != null
592             && context.isRetryImmediatelySupported()
593             && (context.getResponseCellSize() > maxQuotaResultSize
594               || context.getResponseBlockSize() > maxQuotaResultSize)) {
595 
596           // We're storing the exception since the exception and reason string won't
597           // change after the response size limit is reached.
598           if (sizeIOE == null ) {
599             // We don't need the stack un-winding do don't throw the exception.
600             // Throwing will kill the JVM's JIT.
601             //
602             // Instead just create the exception and then store it.
603             sizeIOE = new MultiActionResultTooLarge("Max size exceeded"
604                 + " CellSize: " + context.getResponseCellSize()
605                 + " BlockSize: " + context.getResponseBlockSize());
606 
607             // Only report the exception once since there's only one request that
608             // caused the exception. Otherwise this number will dominate the exceptions count.
609             rpcServer.getMetrics().exception(sizeIOE);
610           }
611 
612           // Now that there's an exception is known to be created
613           // use it for the response.
614           //
615           // This will create a copy in the builder.
616           resultOrExceptionBuilder = ResultOrException.newBuilder().
617               setException(ResponseConverter.buildException(sizeIOE));
618           resultOrExceptionBuilder.setIndex(action.getIndex());
619           builder.addResultOrException(resultOrExceptionBuilder.build());
620           if (cellScanner != null) {
621             skipCellsForMutation(action, cellScanner);
622           }
623           continue;
624         }
625         if (action.hasGet()) {
626           long before = EnvironmentEdgeManager.currentTime();
627           try {
628             Get get = ProtobufUtil.toGet(action.getGet());
629             r = region.get(get);
630           } finally {
631             if (regionServer.metricsRegionServer != null) {
632               regionServer.metricsRegionServer.updateGet(
633                 EnvironmentEdgeManager.currentTime() - before);
634             }
635           }
636         } else if (action.hasServiceCall()) {
637           resultOrExceptionBuilder = ResultOrException.newBuilder();
638           try {
639             Message result = execServiceOnRegion(region, action.getServiceCall());
640             ClientProtos.CoprocessorServiceResult.Builder serviceResultBuilder =
641                 ClientProtos.CoprocessorServiceResult.newBuilder();
642             resultOrExceptionBuilder.setServiceResult(
643                 serviceResultBuilder.setValue(
644                   serviceResultBuilder.getValueBuilder()
645                     .setName(result.getClass().getName())
646                     .setValue(result.toByteString())));
647           } catch (IOException ioe) {
648             rpcServer.getMetrics().exception(ioe);
649             resultOrExceptionBuilder.setException(ResponseConverter.buildException(ioe));
650           }
651         } else if (action.hasMutation()) {
652           MutationType type = action.getMutation().getMutateType();
653           if (type != MutationType.PUT && type != MutationType.DELETE && mutations != null &&
654               !mutations.isEmpty()) {
655             // Flush out any Puts or Deletes already collected.
656             doBatchOp(builder, region, quota, mutations, cellScanner);
657             mutations.clear();
658           }
659           switch (type) {
660           case APPEND:
661             r = append(region, quota, action.getMutation(), cellScanner, nonceGroup);
662             break;
663           case INCREMENT:
664             r = increment(region, quota, action.getMutation(), cellScanner,  nonceGroup);
665             break;
666           case PUT:
667           case DELETE:
668             // Collect the individual mutations and apply in a batch
669             if (mutations == null) {
670               mutations = new ArrayList<ClientProtos.Action>(actions.getActionCount());
671             }
672             mutations.add(action);
673             break;
674           default:
675             throw new DoNotRetryIOException("Unsupported mutate type: " + type.name());
676           }
677         } else {
678           throw new HBaseIOException("Unexpected Action type");
679         }
680         if (r != null) {
681           ClientProtos.Result pbResult = null;
682           if (isClientCellBlockSupport()) {
683             pbResult = ProtobufUtil.toResultNoData(r);
684             //  Hard to guess the size here.  Just make a rough guess.
685             if (cellsToReturn == null) {
686               cellsToReturn = new ArrayList<CellScannable>();
687             }
688             cellsToReturn.add(r);
689           } else {
690             pbResult = ProtobufUtil.toResult(r);
691           }
692           lastBlock = addSize(context, r, lastBlock);
693           resultOrExceptionBuilder =
694             ClientProtos.ResultOrException.newBuilder().setResult(pbResult);
695         }
696         // Could get to here and there was no result and no exception.  Presumes we added
697         // a Put or Delete to the collecting Mutations List for adding later.  In this
698         // case the corresponding ResultOrException instance for the Put or Delete will be added
699         // down in the doBatchOp method call rather than up here.
700       } catch (IOException ie) {
701         rpcServer.getMetrics().exception(ie);
702         resultOrExceptionBuilder = ResultOrException.newBuilder().
703           setException(ResponseConverter.buildException(ie));
704       }
705       if (resultOrExceptionBuilder != null) {
706         // Propagate index.
707         resultOrExceptionBuilder.setIndex(action.getIndex());
708         builder.addResultOrException(resultOrExceptionBuilder.build());
709       }
710     }
711     // Finish up any outstanding mutations
712     if (mutations != null && !mutations.isEmpty()) {
713       doBatchOp(builder, region, quota, mutations, cellScanner);
714     }
715     return cellsToReturn;
716   }
717 
718   /**
719    * Execute a list of Put/Delete mutations.
720    *
721    * @param builder
722    * @param region
723    * @param mutations
724    */
725   private void doBatchOp(final RegionActionResult.Builder builder, final Region region,
726       final OperationQuota quota,
727       final List<ClientProtos.Action> mutations, final CellScanner cells) {
728     Mutation[] mArray = new Mutation[mutations.size()];
729     long before = EnvironmentEdgeManager.currentTime();
730     boolean batchContainsPuts = false, batchContainsDelete = false;
731     try {
732       int i = 0;
733       for (ClientProtos.Action action: mutations) {
734         MutationProto m = action.getMutation();
735         Mutation mutation;
736         if (m.getMutateType() == MutationType.PUT) {
737           mutation = ProtobufUtil.toPut(m, cells);
738           batchContainsPuts = true;
739         } else {
740           mutation = ProtobufUtil.toDelete(m, cells);
741           batchContainsDelete = true;
742         }
743         mArray[i++] = mutation;
744         quota.addMutation(mutation);
745       }
746 
747       if (!region.getRegionInfo().isMetaTable()) {
748         regionServer.cacheFlusher.reclaimMemStoreMemory();
749       }
750 
751       OperationStatus[] codes = region.batchMutate(mArray, HConstants.NO_NONCE,
752         HConstants.NO_NONCE);
753       for (i = 0; i < codes.length; i++) {
754         int index = mutations.get(i).getIndex();
755         Exception e = null;
756         switch (codes[i].getOperationStatusCode()) {
757           case BAD_FAMILY:
758             e = new NoSuchColumnFamilyException(codes[i].getExceptionMsg());
759             builder.addResultOrException(getResultOrException(e, index));
760             break;
761 
762           case SANITY_CHECK_FAILURE:
763             e = new FailedSanityCheckException(codes[i].getExceptionMsg());
764             builder.addResultOrException(getResultOrException(e, index));
765             break;
766 
767           default:
768             e = new DoNotRetryIOException(codes[i].getExceptionMsg());
769             builder.addResultOrException(getResultOrException(e, index));
770             break;
771 
772           case SUCCESS:
773             builder.addResultOrException(getResultOrException(
774                 ClientProtos.Result.getDefaultInstance(), index,
775                 ((HRegion) region).getRegionStats()));
776             break;
777         }
778       }
779     } catch (IOException ie) {
780       for (int i = 0; i < mutations.size(); i++) {
781         builder.addResultOrException(getResultOrException(ie, mutations.get(i).getIndex()));
782       }
783     }
784     if (regionServer.metricsRegionServer != null) {
785       long after = EnvironmentEdgeManager.currentTime();
786       if (batchContainsPuts) {
787         regionServer.metricsRegionServer.updatePut(after - before);
788       }
789       if (batchContainsDelete) {
790         regionServer.metricsRegionServer.updateDelete(after - before);
791       }
792     }
793   }
794 
795   /**
796    * Execute a list of Put/Delete mutations. The function returns OperationStatus instead of
797    * constructing MultiResponse to save a possible loop if caller doesn't need MultiResponse.
798    * @param region
799    * @param mutations
800    * @param replaySeqId
801    * @return an array of OperationStatus which internally contains the OperationStatusCode and the
802    *         exceptionMessage if any
803    * @throws IOException
804    */
805   private OperationStatus [] doReplayBatchOp(final Region region,
806       final List<WALSplitter.MutationReplay> mutations, long replaySeqId) throws IOException {
807     long before = EnvironmentEdgeManager.currentTime();
808     boolean batchContainsPuts = false, batchContainsDelete = false;
809     try {
810       for (Iterator<WALSplitter.MutationReplay> it = mutations.iterator(); it.hasNext();) {
811         WALSplitter.MutationReplay m = it.next();
812 
813         if (m.type == MutationType.PUT) {
814           batchContainsPuts = true;
815         } else {
816           batchContainsDelete = true;
817         }
818 
819         NavigableMap<byte[], List<Cell>> map = m.mutation.getFamilyCellMap();
820         List<Cell> metaCells = map.get(WALEdit.METAFAMILY);
821         if (metaCells != null && !metaCells.isEmpty()) {
822           for (Cell metaCell : metaCells) {
823             CompactionDescriptor compactionDesc = WALEdit.getCompaction(metaCell);
824             boolean isDefaultReplica = RegionReplicaUtil.isDefaultReplica(region.getRegionInfo());
825             HRegion hRegion = (HRegion)region;
826             if (compactionDesc != null) {
827               // replay the compaction. Remove the files from stores only if we are the primary
828               // region replica (thus own the files)
829               hRegion.replayWALCompactionMarker(compactionDesc, !isDefaultReplica, isDefaultReplica,
830                 replaySeqId);
831               continue;
832             }
833             FlushDescriptor flushDesc = WALEdit.getFlushDescriptor(metaCell);
834             if (flushDesc != null && !isDefaultReplica) {
835               hRegion.replayWALFlushMarker(flushDesc, replaySeqId);
836               continue;
837             }
838             RegionEventDescriptor regionEvent = WALEdit.getRegionEventDescriptor(metaCell);
839             if (regionEvent != null && !isDefaultReplica) {
840               hRegion.replayWALRegionEventMarker(regionEvent);
841               continue;
842             }
843             BulkLoadDescriptor bulkLoadEvent = WALEdit.getBulkLoadDescriptor(metaCell);
844             if (bulkLoadEvent != null) {
845               hRegion.replayWALBulkLoadEventMarker(bulkLoadEvent);
846               continue;
847             }
848           }
849           it.remove();
850         }
851       }
852       requestCount.add(mutations.size());
853       if (!region.getRegionInfo().isMetaTable()) {
854         regionServer.cacheFlusher.reclaimMemStoreMemory();
855       }
856       return region.batchReplay(mutations.toArray(
857         new WALSplitter.MutationReplay[mutations.size()]), replaySeqId);
858     } finally {
859       if (regionServer.metricsRegionServer != null) {
860         long after = EnvironmentEdgeManager.currentTime();
861           if (batchContainsPuts) {
862           regionServer.metricsRegionServer.updatePut(after - before);
863         }
864         if (batchContainsDelete) {
865           regionServer.metricsRegionServer.updateDelete(after - before);
866         }
867       }
868     }
869   }
870 
871   private void closeAllScanners() {
872     // Close any outstanding scanners. Means they'll get an UnknownScanner
873     // exception next time they come in.
874     for (Map.Entry<String, RegionScannerHolder> e : scanners.entrySet()) {
875       try {
876         e.getValue().s.close();
877       } catch (IOException ioe) {
878         LOG.warn("Closing scanner " + e.getKey(), ioe);
879       }
880     }
881   }
882 
  /**
   * Construct the RPC services for a region server (the master's RPC services
   * subclass this). Resolves the host/port to bind, creates the RpcServer with
   * a pluggable scheduler, and reads scanner/RPC timeout settings from conf.
   *
   * @param rs the server these RPC services front
   * @throws IOException if the listen address cannot be resolved or bound
   */
  public RSRpcServices(HRegionServer rs) throws IOException {
    regionServer = rs;

    // The RPC scheduler factory is pluggable via configuration. Reflection
    // failures are configuration errors, hence IllegalArgumentException.
    RpcSchedulerFactory rpcSchedulerFactory;
    try {
      Class<?> rpcSchedulerFactoryClass = rs.conf.getClass(
          REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS,
          SimpleRpcSchedulerFactory.class);
      rpcSchedulerFactory = ((RpcSchedulerFactory) rpcSchedulerFactoryClass.newInstance());
    } catch (InstantiationException e) {
      throw new IllegalArgumentException(e);
    } catch (IllegalAccessException e) {
      throw new IllegalArgumentException(e);
    }
    // Server to handle client requests.
    InetSocketAddress initialIsa;
    InetSocketAddress bindAddress;
    // Master and region server use different hostname/port configuration keys.
    if(this instanceof MasterRpcServices) {
      String hostname = getHostname(rs.conf, true);
      int port = rs.conf.getInt(HConstants.MASTER_PORT, HConstants.DEFAULT_MASTER_PORT);
      // Creation of a HSA will force a resolve.
      initialIsa = new InetSocketAddress(hostname, port);
      bindAddress = new InetSocketAddress(rs.conf.get("hbase.master.ipc.address", hostname), port);
    } else {
      String hostname = getHostname(rs.conf, false);
      int port = rs.conf.getInt(HConstants.REGIONSERVER_PORT,
        HConstants.DEFAULT_REGIONSERVER_PORT);
      // Creation of a HSA will force a resolve.
      initialIsa = new InetSocketAddress(hostname, port);
      bindAddress = new InetSocketAddress(
        rs.conf.get("hbase.regionserver.ipc.address", hostname), port);
    }
    if (initialIsa.getAddress() == null) {
      throw new IllegalArgumentException("Failed resolve of " + initialIsa);
    }
    priority = createPriority();
    String name = rs.getProcessName() + "/" + initialIsa.toString();
    // Set how many times to retry talking to another server over HConnection.
    ConnectionUtils.setServerSideHConnectionRetriesConfig(rs.conf, name, LOG);
    try {
      rpcServer = new RpcServer(rs, name, getServices(),
          bindAddress, // use final bindAddress for this server.
          rs.conf,
          rpcSchedulerFactory.create(rs.conf, this, rs));
    } catch (BindException be) {
      // Point the operator at the right config key for their server type.
      String configName = (this instanceof MasterRpcServices) ? HConstants.MASTER_PORT :
          HConstants.REGIONSERVER_PORT;
      throw new IOException(be.getMessage() + ". To switch ports use the '" + configName +
          "' configuration property.", be.getCause() != null ? be.getCause() : be);
    }

    // Timeouts and size limits used by scan/RPC handling below.
    scannerLeaseTimeoutPeriod = rs.conf.getInt(
      HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,
      HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD);
    maxScannerResultSize = rs.conf.getLong(
      HConstants.HBASE_SERVER_SCANNER_MAX_RESULT_SIZE_KEY,
      HConstants.DEFAULT_HBASE_SERVER_SCANNER_MAX_RESULT_SIZE);
    rpcTimeout = rs.conf.getInt(
      HConstants.HBASE_RPC_TIMEOUT_KEY,
      HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
    minimumScanTimeLimitDelta = rs.conf.getLong(
      REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA,
      DEFAULT_REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA);

    InetSocketAddress address = rpcServer.getListenerAddress();
    if (address == null) {
      throw new IOException("Listener channel is closed");
    }
    // Set our address, however we need the final port that was given to rpcServer
    isa = new InetSocketAddress(initialIsa.getHostName(), address.getPort());
    rpcServer.setErrorHandler(this);
    rs.setName(name);
  }
956 
957   @Override
958   public void onConfigurationChange(Configuration newConf) {
959     if (rpcServer instanceof ConfigurationObserver) {
960       ((ConfigurationObserver)rpcServer).onConfigurationChange(newConf);
961     }
962   }
963 
  /**
   * Build the function used to assign a priority to incoming requests.
   * Subclasses may override to supply a different policy.
   */
  protected PriorityFunction createPriority() {
    return new AnnotationReadingPriorityFunction(this);
  }
967 
968   public static String getHostname(Configuration conf, boolean isMaster)
969       throws UnknownHostException {
970     String hostname = conf.get(isMaster? HRegionServer.MASTER_HOSTNAME_KEY :
971       HRegionServer.RS_HOSTNAME_KEY);
972     if (hostname == null || hostname.isEmpty()) {
973       String masterOrRS = isMaster ? "master" : "regionserver";
974       return Strings.domainNamePointerToHostName(DNS.getDefaultHost(
975         conf.get("hbase." + masterOrRS + ".dns.interface", "default"),
976         conf.get("hbase." + masterOrRS + ".dns.nameserver", "default")));
977     } else {
978       LOG.info("hostname is configured to be " + hostname);
979       return hostname;
980     }
981   }
982 
983   RegionScanner getScanner(long scannerId) {
984     String scannerIdString = Long.toString(scannerId);
985     RegionScannerHolder scannerHolder = scanners.get(scannerIdString);
986     if (scannerHolder != null) {
987       return scannerHolder.s;
988     }
989     return null;
990   }
991 
992   /**
993    * Get the vtime associated with the scanner.
994    * Currently the vtime is the number of "next" calls.
995    */
996   long getScannerVirtualTime(long scannerId) {
997     String scannerIdString = Long.toString(scannerId);
998     RegionScannerHolder scannerHolder = scanners.get(scannerIdString);
999     if (scannerHolder != null) {
1000       return scannerHolder.getNextCallSeq();
1001     }
1002     return 0L;
1003   }
1004 
1005   long addScanner(RegionScanner s, Region r) throws LeaseStillHeldException {
1006     long scannerId = this.scannerIdGen.incrementAndGet();
1007     String scannerName = String.valueOf(scannerId);
1008 
1009     RegionScannerHolder existing =
1010       scanners.putIfAbsent(scannerName, new RegionScannerHolder(s, r));
1011     assert existing == null : "scannerId must be unique within regionserver's whole lifecycle!";
1012 
1013     regionServer.leases.createLease(scannerName, this.scannerLeaseTimeoutPeriod,
1014         new ScannerListener(scannerName));
1015     return scannerId;
1016   }
1017 
1018   /**
1019    * Method to account for the size of retained cells and retained data blocks.
1020    * @return an object that represents the last referenced block from this response.
1021    */
1022   Object addSize(RpcCallContext context, Result r, Object lastBlock) {
1023     if (context != null && !r.isEmpty()) {
1024       for (Cell c : r.rawCells()) {
1025         context.incrementResponseCellSize(CellUtil.estimatedHeapSizeOf(c));
1026         // We're using the last block being the same as the current block as
1027         // a proxy for pointing to a new block. This won't be exact.
1028         // If there are multiple gets that bounce back and forth
1029         // Then it's possible that this will over count the size of
1030         // referenced blocks. However it's better to over count and
1031         // use two RPC's than to OOME the RegionServer.
1032         byte[] valueArray = c.getValueArray();
1033         if (valueArray != lastBlock) {
1034           context.incrementResponseBlockSize(valueArray.length);
1035           lastBlock = valueArray;
1036         }
1037       }
1038     }
1039     return lastBlock;
1040   }
1041 
1042 
1043   /**
1044    * Find the HRegion based on a region specifier
1045    *
1046    * @param regionSpecifier the region specifier
1047    * @return the corresponding region
1048    * @throws IOException if the specifier is not null,
1049    *    but failed to find the region
1050    */
1051   Region getRegion(
1052       final RegionSpecifier regionSpecifier) throws IOException {
1053     return regionServer.getRegionByEncodedName(regionSpecifier.getValue().toByteArray(),
1054         ProtobufUtil.getRegionEncodedName(regionSpecifier));
1055   }
1056 
  /** @return the priority function used to triage incoming requests */
  @VisibleForTesting
  public PriorityFunction getPriority() {
    return priority;
  }
1061 
  /** @return the configuration of the underlying region server */
  @VisibleForTesting
  public Configuration getConfiguration() {
    return regionServer.getConfiguration();
  }
1066 
  /** @return the region server's quota manager */
  private RegionServerQuotaManager getQuotaManager() {
    return regionServer.getRegionServerQuotaManager();
  }
1070 
  /** Start the RPC server so requests can be served. */
  void start() {
    rpcServer.start();
  }
1074 
  /** Close all open scanners, then shut down the RPC server. */
  void stop() {
    closeAllScanners();
    rpcServer.stop();
  }
1079 
1080   /**
1081    * Called to verify that this server is up and running.
1082    *
1083    * @throws IOException
1084    */
1085   protected void checkOpen() throws IOException {
1086     if (regionServer.isAborted()) {
1087       throw new RegionServerAbortedException("Server " + regionServer.serverName + " aborting");
1088     }
1089     if (regionServer.isStopped()) {
1090       throw new RegionServerStoppedException("Server " + regionServer.serverName + " stopping");
1091     }
1092     if (!regionServer.fsOk) {
1093       throw new RegionServerStoppedException("File system not available");
1094     }
1095     if (!regionServer.isOnline()) {
1096       throw new ServerNotRunningYetException("Server is not running yet");
1097     }
1098   }
1099 
1100   /**
1101    * @return list of blocking services and their security info classes that this server supports
1102    */
1103   protected List<BlockingServiceAndInterface> getServices() {
1104     List<BlockingServiceAndInterface> bssi = new ArrayList<BlockingServiceAndInterface>(2);
1105     bssi.add(new BlockingServiceAndInterface(
1106       ClientService.newReflectiveBlockingService(this),
1107       ClientService.BlockingInterface.class));
1108     bssi.add(new BlockingServiceAndInterface(
1109       AdminService.newReflectiveBlockingService(this),
1110       AdminService.BlockingInterface.class));
1111     return bssi;
1112   }
1113 
  /** @return the socket address (hostname and bound port) of this RPC service */
  public InetSocketAddress getSocketAddress() {
    return isa;
  }
1117 
  /** Delegates request-priority computation to the configured PriorityFunction. */
  @Override
  public int getPriority(RequestHeader header, Message param, User user) {
    return priority.getPriority(header, param, user);
  }
1122 
  /** Delegates deadline computation to the configured PriorityFunction. */
  @Override
  public long getDeadline(RequestHeader header, Message param) {
    return priority.getDeadline(header, param);
  }
1127 
1128   /*
1129    * Check if an OOME and, if so, abort immediately to avoid creating more objects.
1130    *
1131    * @param e
1132    *
1133    * @return True if we OOME'd and are aborting.
1134    */
1135   @Override
1136   public boolean checkOOME(final Throwable e) {
1137     boolean stop = false;
1138     try {
1139       if (e instanceof OutOfMemoryError
1140           || (e.getCause() != null && e.getCause() instanceof OutOfMemoryError)
1141           || (e.getMessage() != null && e.getMessage().contains(
1142               "java.lang.OutOfMemoryError"))) {
1143         stop = true;
1144         LOG.fatal("Run out of memory; " + getClass().getSimpleName()
1145           + " will abort itself immediately", e);
1146       }
1147     } finally {
1148       if (stop) {
1149         Runtime.getRuntime().halt(1);
1150       }
1151     }
1152     return stop;
1153   }
1154 
1155   /**
1156    * Close a region on the region server.
1157    *
1158    * @param controller the RPC controller
1159    * @param request the request
1160    * @throws ServiceException
1161    */
1162   @Override
1163   @QosPriority(priority=HConstants.ADMIN_QOS)
1164   public CloseRegionResponse closeRegion(final RpcController controller,
1165       final CloseRegionRequest request) throws ServiceException {
1166     final ServerName sn = (request.hasDestinationServer() ?
1167       ProtobufUtil.toServerName(request.getDestinationServer()) : null);
1168 
1169     try {
1170       checkOpen();
1171       if (request.hasServerStartCode()) {
1172         // check that we are the same server that this RPC is intended for.
1173         long serverStartCode = request.getServerStartCode();
1174         if (regionServer.serverName.getStartcode() !=  serverStartCode) {
1175           throw new ServiceException(new DoNotRetryIOException("This RPC was intended for a " +
1176               "different server with startCode: " + serverStartCode + ", this server is: "
1177               + regionServer.serverName));
1178         }
1179       }
1180       final String encodedRegionName = ProtobufUtil.getRegionEncodedName(request.getRegion());
1181 
1182       // Can be null if we're calling close on a region that's not online
1183       final Region region = regionServer.getFromOnlineRegions(encodedRegionName);
1184       if ((region  != null) && (region .getCoprocessorHost() != null)) {
1185         region.getCoprocessorHost().preClose(false);
1186       }
1187 
1188       requestCount.increment();
1189       LOG.info("Close " + encodedRegionName + ", moving to " + sn);
1190       CloseRegionCoordination.CloseRegionDetails crd = regionServer.getCoordinatedStateManager()
1191         .getCloseRegionCoordination().parseFromProtoRequest(request);
1192 
1193       boolean closed = regionServer.closeRegion(encodedRegionName, false, crd, sn);
1194       CloseRegionResponse.Builder builder = CloseRegionResponse.newBuilder().setClosed(closed);
1195       return builder.build();
1196     } catch (IOException ie) {
1197       throw new ServiceException(ie);
1198     }
1199   }
1200 
1201   /**
1202    * Compact a region on the region server.
1203    *
1204    * @param controller the RPC controller
1205    * @param request the request
1206    * @throws ServiceException
1207    */
1208   @Override
1209   @QosPriority(priority=HConstants.ADMIN_QOS)
1210   public CompactRegionResponse compactRegion(final RpcController controller,
1211       final CompactRegionRequest request) throws ServiceException {
1212     try {
1213       checkOpen();
1214       requestCount.increment();
1215       Region region = getRegion(request.getRegion());
1216       region.startRegionOperation(Operation.COMPACT_REGION);
1217       LOG.info("Compacting " + region.getRegionInfo().getRegionNameAsString());
1218       boolean major = false;
1219       byte [] family = null;
1220       Store store = null;
1221       if (request.hasFamily()) {
1222         family = request.getFamily().toByteArray();
1223         store = region.getStore(family);
1224         if (store == null) {
1225           throw new ServiceException(new IOException("column family " + Bytes.toString(family)
1226             + " does not exist in region " + region.getRegionInfo().getRegionNameAsString()));
1227         }
1228       }
1229       if (request.hasMajor()) {
1230         major = request.getMajor();
1231       }
1232       if (major) {
1233         if (family != null) {
1234           store.triggerMajorCompaction();
1235         } else {
1236           region.triggerMajorCompaction();
1237         }
1238       }
1239 
1240       String familyLogMsg = (family != null)?" for column family: " + Bytes.toString(family):"";
1241       if (LOG.isTraceEnabled()) {
1242         LOG.trace("User-triggered compaction requested for region "
1243           + region.getRegionInfo().getRegionNameAsString() + familyLogMsg);
1244       }
1245       String log = "User-triggered " + (major ? "major " : "") + "compaction" + familyLogMsg;
1246       if(family != null) {
1247         regionServer.compactSplitThread.requestCompaction(region, store, log,
1248           Store.PRIORITY_USER, null, RpcServer.getRequestUser());
1249       } else {
1250         regionServer.compactSplitThread.requestCompaction(region, log,
1251           Store.PRIORITY_USER, null, RpcServer.getRequestUser());
1252       }
1253       return CompactRegionResponse.newBuilder().build();
1254     } catch (IOException ie) {
1255       throw new ServiceException(ie);
1256     }
1257   }
1258 
1259   /**
1260    * Flush a region on the region server.
1261    *
1262    * @param controller the RPC controller
1263    * @param request the request
1264    * @throws ServiceException
1265    */
1266   @Override
1267   @QosPriority(priority=HConstants.ADMIN_QOS)
1268   public FlushRegionResponse flushRegion(final RpcController controller,
1269       final FlushRegionRequest request) throws ServiceException {
1270     try {
1271       checkOpen();
1272       requestCount.increment();
1273       Region region = getRegion(request.getRegion());
1274       LOG.info("Flushing " + region.getRegionInfo().getRegionNameAsString());
1275       boolean shouldFlush = true;
1276       if (request.hasIfOlderThanTs()) {
1277         shouldFlush = region.getEarliestFlushTimeForAllStores() < request.getIfOlderThanTs();
1278       }
1279       FlushRegionResponse.Builder builder = FlushRegionResponse.newBuilder();
1280       if (shouldFlush) {
1281         boolean writeFlushWalMarker =  request.hasWriteFlushWalMarker() ?
1282             request.getWriteFlushWalMarker() : false;
1283         long startTime = EnvironmentEdgeManager.currentTime();
1284         // Go behind the curtain so we can manage writing of the flush WAL marker
1285         HRegion.FlushResultImpl flushResult = (HRegion.FlushResultImpl)
1286             ((HRegion)region).flushcache(true, writeFlushWalMarker);
1287         if (flushResult.isFlushSucceeded()) {
1288           long endTime = EnvironmentEdgeManager.currentTime();
1289           regionServer.metricsRegionServer.updateFlushTime(endTime - startTime);
1290         }
1291         boolean compactionNeeded = flushResult.isCompactionNeeded();
1292         if (compactionNeeded) {
1293           regionServer.compactSplitThread.requestSystemCompaction(region,
1294             "Compaction through user triggered flush");
1295         }
1296         builder.setFlushed(flushResult.isFlushSucceeded());
1297         builder.setWroteFlushWalMarker(flushResult.wroteFlushWalMarker);
1298       }
1299       builder.setLastFlushTime(region.getEarliestFlushTimeForAllStores());
1300       return builder.build();
1301     } catch (DroppedSnapshotException ex) {
1302       // Cache flush can fail in a few places. If it fails in a critical
1303       // section, we get a DroppedSnapshotException and a replay of wal
1304       // is required. Currently the only way to do this is a restart of
1305       // the server.
1306       regionServer.abort("Replay of WAL required. Forcing server shutdown", ex);
1307       throw new ServiceException(ex);
1308     } catch (IOException ie) {
1309       throw new ServiceException(ie);
1310     }
1311   }
1312 
1313   @Override
1314   @QosPriority(priority=HConstants.ADMIN_QOS)
1315   public GetOnlineRegionResponse getOnlineRegion(final RpcController controller,
1316       final GetOnlineRegionRequest request) throws ServiceException {
1317     try {
1318       checkOpen();
1319       requestCount.increment();
1320       Map<String, Region> onlineRegions = regionServer.onlineRegions;
1321       List<HRegionInfo> list = new ArrayList<HRegionInfo>(onlineRegions.size());
1322       for (Region region: onlineRegions.values()) {
1323         list.add(region.getRegionInfo());
1324       }
1325       Collections.sort(list);
1326       return ResponseConverter.buildGetOnlineRegionResponse(list);
1327     } catch (IOException ie) {
1328       throw new ServiceException(ie);
1329     }
1330   }
1331 
1332   @Override
1333   @QosPriority(priority=HConstants.ADMIN_QOS)
1334   public GetRegionInfoResponse getRegionInfo(final RpcController controller,
1335       final GetRegionInfoRequest request) throws ServiceException {
1336     try {
1337       checkOpen();
1338       requestCount.increment();
1339       Region region = getRegion(request.getRegion());
1340       HRegionInfo info = region.getRegionInfo();
1341       GetRegionInfoResponse.Builder builder = GetRegionInfoResponse.newBuilder();
1342       builder.setRegionInfo(HRegionInfo.convert(info));
1343       if (request.hasCompactionState() && request.getCompactionState()) {
1344         builder.setCompactionState(region.getCompactionState());
1345       }
1346       builder.setIsRecovering(region.isRecovering());
1347       return builder.build();
1348     } catch (IOException ie) {
1349       throw new ServiceException(ie);
1350     }
1351   }
1352 
1353   /**
1354    * Get some information of the region server.
1355    *
1356    * @param controller the RPC controller
1357    * @param request the request
1358    * @throws ServiceException
1359    */
1360   @Override
1361   @QosPriority(priority=HConstants.ADMIN_QOS)
1362   public GetServerInfoResponse getServerInfo(final RpcController controller,
1363       final GetServerInfoRequest request) throws ServiceException {
1364     try {
1365       checkOpen();
1366     } catch (IOException ie) {
1367       throw new ServiceException(ie);
1368     }
1369     requestCount.increment();
1370     int infoPort = regionServer.infoServer != null ? regionServer.infoServer.getPort() : -1;
1371     return ResponseConverter.buildGetServerInfoResponse(regionServer.serverName, infoPort);
1372   }
1373 
1374   @Override
1375   @QosPriority(priority=HConstants.ADMIN_QOS)
1376   public GetStoreFileResponse getStoreFile(final RpcController controller,
1377       final GetStoreFileRequest request) throws ServiceException {
1378     try {
1379       checkOpen();
1380       Region region = getRegion(request.getRegion());
1381       requestCount.increment();
1382       Set<byte[]> columnFamilies;
1383       if (request.getFamilyCount() == 0) {
1384         columnFamilies = region.getTableDesc().getFamiliesKeys();
1385       } else {
1386         columnFamilies = new TreeSet<byte[]>(Bytes.BYTES_RAWCOMPARATOR);
1387         for (ByteString cf: request.getFamilyList()) {
1388           columnFamilies.add(cf.toByteArray());
1389         }
1390       }
1391       int nCF = columnFamilies.size();
1392       List<String>  fileList = region.getStoreFileList(
1393         columnFamilies.toArray(new byte[nCF][]));
1394       GetStoreFileResponse.Builder builder = GetStoreFileResponse.newBuilder();
1395       builder.addAllStoreFile(fileList);
1396       return builder.build();
1397     } catch (IOException ie) {
1398       throw new ServiceException(ie);
1399     }
1400   }
1401 
  /**
   * Merge regions on the region server.
   *
   * @param controller the RPC controller
   * @param request the request
   * @return merge regions response
   * @throws ServiceException
   */
  @Override
  @QosPriority(priority = HConstants.ADMIN_QOS)
  public MergeRegionsResponse mergeRegions(final RpcController controller,
      final MergeRegionsRequest request) throws ServiceException {
    try {
      checkOpen();
      requestCount.increment();
      Region regionA = getRegion(request.getRegionA());
      Region regionB = getRegion(request.getRegionB());
      boolean forcible = request.getForcible();
      // -1 means the master did not supply its clock value with the request.
      long masterSystemTime = request.hasMasterSystemTime() ? request.getMasterSystemTime() : -1;
      regionA.startRegionOperation(Operation.MERGE_REGION);
      regionB.startRegionOperation(Operation.MERGE_REGION);
      // Only default (primary) replicas may be merged; replicas follow their primary.
      if (regionA.getRegionInfo().getReplicaId() != HRegionInfo.DEFAULT_REPLICA_ID ||
          regionB.getRegionInfo().getReplicaId() != HRegionInfo.DEFAULT_REPLICA_ID) {
        throw new ServiceException(new MergeRegionException("Can't merge non-default replicas"));
      }
      LOG.info("Receiving merging request for  " + regionA + ", " + regionB
          + ",forcible=" + forcible);
      // Flush both regions first so the merge operates on on-disk files only;
      // record flush latency only when the flush actually succeeded.
      long startTime = EnvironmentEdgeManager.currentTime();
      FlushResult flushResult = regionA.flush(true);
      if (flushResult.isFlushSucceeded()) {
        long endTime = EnvironmentEdgeManager.currentTime();
        regionServer.metricsRegionServer.updateFlushTime(endTime - startTime);
      }
      startTime = EnvironmentEdgeManager.currentTime();
      flushResult = regionB.flush(true);
      if (flushResult.isFlushSucceeded()) {
        long endTime = EnvironmentEdgeManager.currentTime();
        regionServer.metricsRegionServer.updateFlushTime(endTime - startTime);
      }
      // The merge itself runs asynchronously on the compact/split thread.
      regionServer.compactSplitThread.requestRegionsMerge(regionA, regionB, forcible,
          masterSystemTime, RpcServer.getRequestUser());
      return MergeRegionsResponse.newBuilder().build();
    } catch (DroppedSnapshotException ex) {
      // A dropped snapshot means the WAL may hold the only copy of some edits;
      // abort so the WAL is replayed elsewhere.
      regionServer.abort("Replay of WAL required. Forcing server shutdown", ex);
      throw new ServiceException(ex);
    } catch (IOException ie) {
      throw new ServiceException(ie);
    }
  }
1451 
  /**
   * Open asynchronously a region or a set of regions on the region server.
   *
   * The opening is coordinated by ZooKeeper, and this method requires the znode to be created
   *  before being called. As a consequence, this method should be called only from the master.
   * <p>
   * Different states for the region are managed:
   * </p><ul>
   *  <li>region not opened: the region opening will start asynchronously.</li>
   *  <li>a close is already in progress: this is considered as an error.</li>
   *  <li>an open is already in progress: this new open request will be ignored. This is important
   *  because the Master can do multiple requests if it crashes.</li>
   *  <li>the region is already opened:  this new open request will be ignored.</li>
   *  </ul>
   * <p>
   * Bulk assign: If there are more than 1 region to open, it will be considered as a bulk assign.
   * For a single region opening, errors are sent through a ServiceException. For bulk assign,
   * errors are put in the response as FAILED_OPENING.
   * </p>
   * @param controller the RPC controller
   * @param request the request
   * @throws ServiceException
   */
  @Override
  @QosPriority(priority=HConstants.ADMIN_QOS)
  public OpenRegionResponse openRegion(final RpcController controller,
      final OpenRegionRequest request) throws ServiceException {
    requestCount.increment();
    if (request.hasServerStartCode()) {
      // check that we are the same server that this RPC is intended for.
      long serverStartCode = request.getServerStartCode();
      if (regionServer.serverName.getStartcode() !=  serverStartCode) {
        throw new ServiceException(new DoNotRetryIOException("This RPC was intended for a " +
            "different server with startCode: " + serverStartCode + ", this server is: "
            + regionServer.serverName));
      }
    }

    OpenRegionResponse.Builder builder = OpenRegionResponse.newBuilder();
    final int regionCount = request.getOpenInfoCount();
    final Map<TableName, HTableDescriptor> htds =
        new HashMap<TableName, HTableDescriptor>(regionCount);
    final boolean isBulkAssign = regionCount > 1;
    try {
      checkOpen();
    } catch (IOException ie) {
      // Server not open yet: fatal for any request except a single-region open of
      // hbase:meta, for which we are willing to wait a little below.
      TableName tableName = null;
      if (regionCount == 1) {
        RegionInfo ri = request.getOpenInfo(0).getRegion();
        if (ri != null) {
          tableName = ProtobufUtil.toTableName(ri.getTableName());
        }
      }
      if (!TableName.META_TABLE_NAME.equals(tableName)) {
        throw new ServiceException(ie);
      }
      // We are assigning meta, wait a little for regionserver to finish initialization.
      int timeout = regionServer.conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
        HConstants.DEFAULT_HBASE_RPC_TIMEOUT) >> 2; // Quarter of RPC timeout
      long endTime = System.currentTimeMillis() + timeout;
      synchronized (regionServer.online) {
        try {
          while (System.currentTimeMillis() <= endTime
              && !regionServer.isStopped() && !regionServer.isOnline()) {
            regionServer.online.wait(regionServer.msgInterval);
          }
          checkOpen();
        } catch (InterruptedException t) {
          // Re-assert the interrupt flag before converting to a ServiceException.
          Thread.currentThread().interrupt();
          throw new ServiceException(t);
        } catch (IOException e) {
          throw new ServiceException(e);
        }
      }
    }

    // -1 means the master did not supply its clock value with the request.
    long masterSystemTime = request.hasMasterSystemTime() ? request.getMasterSystemTime() : -1;

    for (RegionOpenInfo regionOpenInfo : request.getOpenInfoList()) {
      final HRegionInfo region = HRegionInfo.convert(regionOpenInfo.getRegion());
      OpenRegionCoordination coordination = regionServer.getCoordinatedStateManager().
        getOpenRegionCoordination();
      OpenRegionCoordination.OpenRegionDetails ord =
        coordination.parseFromProtoRequest(regionOpenInfo);

      HTableDescriptor htd;
      try {
        final Region onlineRegion = regionServer.getFromOnlineRegions(region.getEncodedName());
        if (onlineRegion != null) {
          //Check if the region can actually be opened.
          if (onlineRegion.getCoprocessorHost() != null) {
            onlineRegion.getCoprocessorHost().preOpen();
          }
          // See HBASE-5094. Cross check with hbase:meta if still this RS is owning
          // the region.
          Pair<HRegionInfo, ServerName> p = MetaTableAccessor.getRegion(
            regionServer.getConnection(), region.getRegionName());
          if (regionServer.serverName.equals(p.getSecond())) {
            Boolean closing = regionServer.regionsInTransitionInRS.get(region.getEncodedNameAsBytes());
            // Map regionsInTransitionInRSOnly has an entry for a region only if the region
            // is in transition on this RS, so here closing can be null. If not null, it can
            // be true or false. True means the region is opening on this RS; while false
            // means the region is closing. Only return ALREADY_OPENED if not closing (i.e.
            // not in transition any more, or still transition to open.
            if (!Boolean.FALSE.equals(closing)
                && regionServer.getFromOnlineRegions(region.getEncodedName()) != null) {
              LOG.warn("Attempted open of " + region.getEncodedName()
                + " but already online on this server");
              builder.addOpeningState(RegionOpeningState.ALREADY_OPENED);
              continue;
            }
          } else {
            LOG.warn("The region " + region.getEncodedName() + " is online on this server"
              + " but hbase:meta does not have this server - continue opening.");
            regionServer.removeFromOnlineRegions(onlineRegion, null);
          }
        }
        LOG.info("Open " + region.getRegionNameAsString());
        // Cache table descriptors so a bulk assign reads each descriptor only once.
        htd = htds.get(region.getTable());
        if (htd == null) {
          htd = regionServer.tableDescriptors.get(region.getTable());
          htds.put(region.getTable(), htd);
        }

        // Atomically claim the region as "opening"; previous tells us what, if
        // anything, was already in flight for it on this RS.
        final Boolean previous = regionServer.regionsInTransitionInRS.putIfAbsent(
          region.getEncodedNameAsBytes(), Boolean.TRUE);

        if (Boolean.FALSE.equals(previous)) {
          // There is a close in progress. We need to mark this open as failed in ZK.

          coordination.tryTransitionFromOfflineToFailedOpen(regionServer, region, ord);

          throw new RegionAlreadyInTransitionException("Received OPEN for the region:"
            + region.getRegionNameAsString() + " , which we are already trying to CLOSE ");
        }

        if (Boolean.TRUE.equals(previous)) {
          // An open is in progress. This is supported, but let's log this.
          LOG.info("Receiving OPEN for the region:" +
            region.getRegionNameAsString() + " , which we are already trying to OPEN"
              + " - ignoring this new request for this region.");
        }

        // We are opening this region. If it moves back and forth for whatever reason, we don't
        // want to keep returning the stale moved record while we are opening/if we close again.
        regionServer.removeFromMovedRegions(region.getEncodedName());

        if (previous == null) {
          // check if the region to be opened is marked in recovering state in ZK
          if (ZKSplitLog.isRegionMarkedRecoveringInZK(regionServer.getZooKeeper(),
              region.getEncodedName())) {
            // Check if current region open is for distributedLogReplay. This check is to support
            // rolling restart/upgrade where we want to Master/RS see same configuration
            if (!regionOpenInfo.hasOpenForDistributedLogReplay()
                  || regionOpenInfo.getOpenForDistributedLogReplay()) {
              regionServer.recoveringRegions.put(region.getEncodedName(), null);
            } else {
              // Remove stale recovery region from ZK when we open region not for recovering which
              // could happen when turn distributedLogReplay off from on.
              List<String> tmpRegions = new ArrayList<String>();
              tmpRegions.add(region.getEncodedName());
              ZKSplitLog.deleteRecoveringRegionZNodes(regionServer.getZooKeeper(),
                tmpRegions);
            }
          }
          // If there is no action in progress, we can submit a specific handler.
          // Need to pass the expected version in the constructor.
          if (region.isMetaRegion()) {
            regionServer.service.submit(new OpenMetaHandler(
              regionServer, regionServer, region, htd, masterSystemTime, coordination, ord));
          } else {
            regionServer.updateRegionFavoredNodesMapping(region.getEncodedName(),
              regionOpenInfo.getFavoredNodesList());
            regionServer.service.submit(new OpenRegionHandler(
              regionServer, regionServer, region, htd, masterSystemTime, coordination, ord));
          }
        }

        // OPENED here means "the open was accepted"; the actual open completes
        // asynchronously in the submitted handler.
        builder.addOpeningState(RegionOpeningState.OPENED);

      } catch (KeeperException zooKeeperEx) {
        LOG.error("Can't retrieve recovering state from zookeeper", zooKeeperEx);
        throw new ServiceException(zooKeeperEx);
      } catch (IOException ie) {
        LOG.warn("Failed opening region " + region.getRegionNameAsString(), ie);
        // Bulk assign reports per-region failure; a single open fails the whole RPC.
        if (isBulkAssign) {
          builder.addOpeningState(RegionOpeningState.FAILED_OPENING);
        } else {
          throw new ServiceException(ie);
        }
      }
    }
    return builder.build();
  }
1646 
  /**
   * Warm up a region on this server.
   *
   * This method should only be called by Master. It synchronously opens the region and
   * closes the region bringing the most important pages in cache.
   * <p>
   *
   * @param controller the RPC controller
   * @param request the request
   * @throws ServiceException
   */
  @Override
  public WarmupRegionResponse warmupRegion(final RpcController controller,
      final WarmupRegionRequest request) throws ServiceException {

    RegionInfo regionInfo = request.getRegionInfo();
    final HRegionInfo region = HRegionInfo.convert(regionInfo);
    HTableDescriptor htd;
    WarmupRegionResponse response = WarmupRegionResponse.getDefaultInstance();

    try {
      checkOpen();
      String encodedName = region.getEncodedName();
      byte[] encodedNameBytes = region.getEncodedNameAsBytes();
      final Region onlineRegion = regionServer.getFromOnlineRegions(encodedName);

      // Nothing to warm up if the region is already being served here.
      if (onlineRegion != null) {
        LOG.info("Region already online. Skipping warming up " + region);
        return response;
      }

      if (LOG.isDebugEnabled()) {
        LOG.debug("Warming up Region " + region.getRegionNameAsString());
      }

      // NOTE(review): htd may be null if no descriptor is found for the table --
      // confirm warmupHRegion tolerates a null descriptor.
      htd = regionServer.tableDescriptors.get(region.getTable());

      // Skip regions mid-transition; the in-flight open/close takes precedence.
      if (regionServer.getRegionsInTransitionInRS().containsKey(encodedNameBytes)) {
        LOG.info("Region is in transition. Skipping warmup " + region);
        return response;
      }

      HRegion.warmupHRegion(region, htd, regionServer.getWAL(region),
          regionServer.getConfiguration(), regionServer, null);

    } catch (IOException ie) {
      LOG.error("Failed warming up region " + region.getRegionNameAsString(), ie);
      throw new ServiceException(ie);
    }

    return response;
  }
1699 
  /**
   * Replay the given changes when distributedLogReplay WAL edits from a failed RS. The guarantee is
   * that the given mutations will be durable on the receiving RS if this method returns without any
   * exception.
   * @param controller the RPC controller
   * @param request the request
   * @throws ServiceException
   */
  @Override
  @QosPriority(priority = HConstants.REPLAY_QOS)
  public ReplicateWALEntryResponse replay(final RpcController controller,
      final ReplicateWALEntryRequest request) throws ServiceException {
    long before = EnvironmentEdgeManager.currentTime();
    CellScanner cells = ((PayloadCarryingRpcController) controller).cellScanner();
    try {
      checkOpen();
      List<WALEntry> entries = request.getEntryList();
      if (entries == null || entries.isEmpty()) {
        // empty input
        return ReplicateWALEntryResponse.newBuilder().build();
      }
      // All entries in one replay request must belong to the same region;
      // the first entry's region identifies it.
      ByteString regionName = entries.get(0).getKey().getEncodedRegionName();
      Region region = regionServer.getRegionByEncodedName(regionName.toStringUtf8());
      RegionCoprocessorHost coprocessorHost =
          ServerRegionReplicaUtil.isDefaultReplica(region.getRegionInfo())
            ? region.getCoprocessorHost()
            : null; // do not invoke coprocessors if this is a secondary region replica
      List<Pair<WALKey, WALEdit>> walEntries = new ArrayList<Pair<WALKey, WALEdit>>();

      // Skip adding the edits to WAL if this is a secondary region replica
      boolean isPrimary = RegionReplicaUtil.isDefaultReplica(region.getRegionInfo());
      Durability durability = isPrimary ? Durability.USE_DEFAULT : Durability.SKIP_WAL;

      for (WALEntry entry : entries) {
        if (!regionName.equals(entry.getKey().getEncodedRegionName())) {
          throw new NotServingRegionException("Replay request contains entries from multiple " +
              "regions. First region:" + regionName.toStringUtf8() + " , other region:"
              + entry.getKey().getEncodedRegionName());
        }
        // Report nonces carried in the WAL keys so replayed operations are
        // tracked by the nonce manager (primary replica only).
        if (regionServer.nonceManager != null && isPrimary) {
          long nonceGroup = entry.getKey().hasNonceGroup()
            ? entry.getKey().getNonceGroup() : HConstants.NO_NONCE;
          long nonce = entry.getKey().hasNonce() ? entry.getKey().getNonce() : HConstants.NO_NONCE;
          regionServer.nonceManager.reportOperationFromWal(
              nonceGroup,
              nonce,
              entry.getKey().getWriteTime());
        }
        // The WALKey/WALEdit pair is materialized only when coprocessors need it.
        Pair<WALKey, WALEdit> walEntry = (coprocessorHost == null) ? null :
          new Pair<WALKey, WALEdit>();
        List<WALSplitter.MutationReplay> edits = WALSplitter.getMutationsFromWALEntry(entry,
          cells, walEntry, durability);
        if (coprocessorHost != null) {
          // Start coprocessor replay here. The coprocessor is for each WALEdit instead of a
          // KeyValue.
          if (coprocessorHost.preWALRestore(region.getRegionInfo(), walEntry.getFirst(),
            walEntry.getSecond())) {
            // if bypass this log entry, ignore it ...
            continue;
          }
          walEntries.add(walEntry);
        }
        if(edits!=null && !edits.isEmpty()) {
          // Prefer the original sequence number (pre-split) if present.
          long replaySeqId = (entry.getKey().hasOrigSequenceNumber()) ?
            entry.getKey().getOrigSequenceNumber() : entry.getKey().getLogSequenceNumber();
          OperationStatus[] result = doReplayBatchOp(region, edits, replaySeqId);
          // check if it's a partial success
          for (int i = 0; result != null && i < result.length; i++) {
            if (result[i] != OperationStatus.SUCCESS) {
              throw new IOException(result[i].getExceptionMsg());
            }
          }
        }
      }

      //sync wal at the end because ASYNC_WAL is used above
      WAL wal = getWAL(region);
      if (wal != null) {
        wal.sync();
      }

      // Fire postWALRestore only for entries that were not bypassed above.
      if (coprocessorHost != null) {
        for (Pair<WALKey, WALEdit> entry : walEntries) {
          coprocessorHost.postWALRestore(region.getRegionInfo(), entry.getFirst(),
            entry.getSecond());
        }
      }
      return ReplicateWALEntryResponse.newBuilder().build();
    } catch (IOException ie) {
      throw new ServiceException(ie);
    } finally {
      // Record replay latency whether the call succeeded or failed.
      if (regionServer.metricsRegionServer != null) {
        regionServer.metricsRegionServer.updateReplay(
          EnvironmentEdgeManager.currentTime() - before);
      }
    }
  }
1797 
1798   WAL getWAL(Region region) {
1799     return ((HRegion)region).getWAL();
1800   }
1801 
1802   /**
1803    * Replicate WAL entries on the region server.
1804    *
1805    * @param controller the RPC controller
1806    * @param request the request
1807    * @throws ServiceException
1808    */
1809   @Override
1810   @QosPriority(priority=HConstants.REPLICATION_QOS)
1811   public ReplicateWALEntryResponse replicateWALEntry(final RpcController controller,
1812       final ReplicateWALEntryRequest request) throws ServiceException {
1813     try {
1814       checkOpen();
1815       if (regionServer.replicationSinkHandler != null) {
1816         requestCount.increment();
1817         List<WALEntry> entries = request.getEntryList();
1818         CellScanner cellScanner = ((PayloadCarryingRpcController)controller).cellScanner();
1819         regionServer.getRegionServerCoprocessorHost().preReplicateLogEntries(entries, cellScanner);
1820         regionServer.replicationSinkHandler.replicateLogEntries(entries, cellScanner);
1821         regionServer.getRegionServerCoprocessorHost().postReplicateLogEntries(entries, cellScanner);
1822         return ReplicateWALEntryResponse.newBuilder().build();
1823       } else {
1824         throw new ServiceException("Replication services are not initialized yet");
1825       }
1826     } catch (IOException ie) {
1827       throw new ServiceException(ie);
1828     }
1829   }
1830 
1831   /**
1832    * Roll the WAL writer of the region server.
1833    * @param controller the RPC controller
1834    * @param request the request
1835    * @throws ServiceException
1836    */
1837   @Override
1838   public RollWALWriterResponse rollWALWriter(final RpcController controller,
1839       final RollWALWriterRequest request) throws ServiceException {
1840     try {
1841       checkOpen();
1842       requestCount.increment();
1843       regionServer.getRegionServerCoprocessorHost().preRollWALWriterRequest();
1844       regionServer.walRoller.requestRollAll();
1845       regionServer.getRegionServerCoprocessorHost().postRollWALWriterRequest();
1846       RollWALWriterResponse.Builder builder = RollWALWriterResponse.newBuilder();
1847       return builder.build();
1848     } catch (IOException ie) {
1849       throw new ServiceException(ie);
1850     }
1851   }
1852 
  /**
   * Split a region on the region server.
   *
   * @param controller the RPC controller
   * @param request the request
   * @throws ServiceException
   */
  @Override
  @QosPriority(priority=HConstants.ADMIN_QOS)
  public SplitRegionResponse splitRegion(final RpcController controller,
      final SplitRegionRequest request) throws ServiceException {
    try {
      checkOpen();
      requestCount.increment();
      Region region = getRegion(request.getRegion());
      region.startRegionOperation(Operation.SPLIT_REGION);
      // Only the default (primary) replica may be split directly.
      if (region.getRegionInfo().getReplicaId() != HRegionInfo.DEFAULT_REPLICA_ID) {
        throw new IOException("Can't split replicas directly. "
            + "Replicas are auto-split when their primary is split.");
      }
      LOG.info("Splitting " + region.getRegionInfo().getRegionNameAsString());
      // Flush first so the split operates on on-disk files only; record flush
      // latency only when the flush actually succeeded.
      long startTime = EnvironmentEdgeManager.currentTime();
      FlushResult flushResult = region.flush(true);
      if (flushResult.isFlushSucceeded()) {
        long endTime = EnvironmentEdgeManager.currentTime();
        regionServer.metricsRegionServer.updateFlushTime(endTime - startTime);
      }
      // A null split point lets checkSplit() below choose one.
      byte[] splitPoint = null;
      if (request.hasSplitPoint()) {
        splitPoint = request.getSplitPoint().toByteArray();
      }
      ((HRegion)region).forceSplit(splitPoint);
      // The split itself runs asynchronously on the compact/split thread.
      regionServer.compactSplitThread.requestSplit(region, ((HRegion)region).checkSplit(),
        RpcServer.getRequestUser());
      return SplitRegionResponse.newBuilder().build();
    } catch (DroppedSnapshotException ex) {
      // A dropped snapshot means the WAL may hold the only copy of some edits;
      // abort so the WAL is replayed elsewhere.
      regionServer.abort("Replay of WAL required. Forcing server shutdown", ex);
      throw new ServiceException(ex);
    } catch (IOException ie) {
      throw new ServiceException(ie);
    }
  }
1895 
1896   /**
1897    * Stop the region server.
1898    *
1899    * @param controller the RPC controller
1900    * @param request the request
1901    * @throws ServiceException
1902    */
1903   @Override
1904   @QosPriority(priority=HConstants.ADMIN_QOS)
1905   public StopServerResponse stopServer(final RpcController controller,
1906       final StopServerRequest request) throws ServiceException {
1907     requestCount.increment();
1908     String reason = request.getReason();
1909     regionServer.stop(reason);
1910     return StopServerResponse.newBuilder().build();
1911   }
1912 
1913   @Override
1914   public UpdateFavoredNodesResponse updateFavoredNodes(RpcController controller,
1915       UpdateFavoredNodesRequest request) throws ServiceException {
1916     List<UpdateFavoredNodesRequest.RegionUpdateInfo> openInfoList = request.getUpdateInfoList();
1917     UpdateFavoredNodesResponse.Builder respBuilder = UpdateFavoredNodesResponse.newBuilder();
1918     for (UpdateFavoredNodesRequest.RegionUpdateInfo regionUpdateInfo : openInfoList) {
1919       HRegionInfo hri = HRegionInfo.convert(regionUpdateInfo.getRegion());
1920       regionServer.updateRegionFavoredNodesMapping(hri.getEncodedName(),
1921         regionUpdateInfo.getFavoredNodesList());
1922     }
1923     respBuilder.setResponse(openInfoList.size());
1924     return respBuilder.build();
1925   }
1926 
1927   /**
1928    * Atomically bulk load several HFiles into an open region
1929    * @return true if successful, false is failed but recoverably (no action)
1930    * @throws ServiceException if failed unrecoverably
1931    */
1932   @Override
1933   public BulkLoadHFileResponse bulkLoadHFile(final RpcController controller,
1934       final BulkLoadHFileRequest request) throws ServiceException {
1935     try {
1936       checkOpen();
1937       requestCount.increment();
1938       Region region = getRegion(request.getRegion());
1939       List<Pair<byte[], String>> familyPaths = new ArrayList<Pair<byte[], String>>();
1940       for (FamilyPath familyPath: request.getFamilyPathList()) {
1941         familyPaths.add(new Pair<byte[], String>(familyPath.getFamily().toByteArray(),
1942           familyPath.getPath()));
1943       }
1944       boolean bypass = false;
1945       if (region.getCoprocessorHost() != null) {
1946         bypass = region.getCoprocessorHost().preBulkLoadHFile(familyPaths);
1947       }
1948       boolean loaded = false;
1949       if (!bypass) {
1950         loaded = region.bulkLoadHFiles(familyPaths, request.getAssignSeqNum(), null);
1951       }
1952       if (region.getCoprocessorHost() != null) {
1953         loaded = region.getCoprocessorHost().postBulkLoadHFile(familyPaths, loaded);
1954       }
1955       BulkLoadHFileResponse.Builder builder = BulkLoadHFileResponse.newBuilder();
1956       builder.setLoaded(loaded);
1957       return builder.build();
1958     } catch (IOException ie) {
1959       throw new ServiceException(ie);
1960     }
1961   }
1962 
1963   @Override
1964   public CoprocessorServiceResponse execService(final RpcController controller,
1965       final CoprocessorServiceRequest request) throws ServiceException {
1966     try {
1967       checkOpen();
1968       requestCount.increment();
1969       Region region = getRegion(request.getRegion());
1970       Message result = execServiceOnRegion(region, request.getCall());
1971       CoprocessorServiceResponse.Builder builder =
1972         CoprocessorServiceResponse.newBuilder();
1973       builder.setRegion(RequestConverter.buildRegionSpecifier(
1974         RegionSpecifierType.REGION_NAME, region.getRegionInfo().getRegionName()));
1975       builder.setValue(
1976         builder.getValueBuilder().setName(result.getClass().getName())
1977           .setValue(result.toByteString()));
1978       return builder.build();
1979     } catch (IOException ie) {
1980       throw new ServiceException(ie);
1981     }
1982   }
1983 
1984   private Message execServiceOnRegion(Region region,
1985       final ClientProtos.CoprocessorServiceCall serviceCall) throws IOException {
1986     // ignore the passed in controller (from the serialized call)
1987     ServerRpcController execController = new ServerRpcController();
1988     return region.execService(execController, serviceCall);
1989   }
1990 
  /**
   * Get data from a table.
   *
   * @param controller the RPC controller
   * @param request the get request
   * @throws ServiceException
   */
  @Override
  public GetResponse get(final RpcController controller,
      final GetRequest request) throws ServiceException {
    long before = EnvironmentEdgeManager.currentTime();
    OperationQuota quota = null;
    try {
      checkOpen();
      requestCount.increment();
      Region region = getRegion(request.getRegion());

      GetResponse.Builder builder = GetResponse.newBuilder();
      ClientProtos.Get get = request.getGet();
      // existence is non-null only for existence-only checks; r holds a full result.
      Boolean existence = null;
      Result r = null;
      quota = getQuotaManager().checkQuota(region, OperationQuota.OperationType.GET);

      if (get.hasClosestRowBefore() && get.getClosestRowBefore()) {
        // ClosestRowBefore is restricted to exactly one column family.
        if (get.getColumnCount() != 1) {
          throw new DoNotRetryIOException(
            "get ClosestRowBefore supports one and only one family now, not "
              + get.getColumnCount() + " families");
        }
        byte[] row = get.getRow().toByteArray();
        byte[] family = get.getColumn(0).getFamily().toByteArray();
        r = region.getClosestRowBefore(row, family);
      } else {
        Get clientGet = ProtobufUtil.toGet(get);
        // A coprocessor may answer an existence check outright, skipping the read.
        if (get.getExistenceOnly() && region.getCoprocessorHost() != null) {
          existence = region.getCoprocessorHost().preExists(clientGet);
        }
        if (existence == null) {
          r = region.get(clientGet);
          if (get.getExistenceOnly()) {
            boolean exists = r.getExists();
            if (region.getCoprocessorHost() != null) {
              exists = region.getCoprocessorHost().postExists(clientGet, exists);
            }
            existence = exists;
          }
        }
      }
      if (existence != null){
        ClientProtos.Result pbr =
            ProtobufUtil.toResult(existence, region.getRegionInfo().getReplicaId() != 0);
        builder.setResult(pbr);
      } else  if (r != null) {
        ClientProtos.Result pbr = ProtobufUtil.toResult(r);
        builder.setResult(pbr);
      }
      // Charge the quota for the result actually returned.
      if (r != null) {
        quota.addGetResult(r);
      }
      return builder.build();
    } catch (IOException ie) {
      throw new ServiceException(ie);
    } finally {
      // Record latency and release the quota on success or failure.
      if (regionServer.metricsRegionServer != null) {
        regionServer.metricsRegionServer.updateGet(
          EnvironmentEdgeManager.currentTime() - before);
      }
      if (quota != null) {
        quota.close();
      }
    }
  }
2063 
2064   /**
2065    * Execute multiple actions on a table: get, mutate, and/or execCoprocessor
2066    *
2067    * @param rpcc the RPC controller
2068    * @param request the multi request
2069    * @throws ServiceException
2070    */
2071   @Override
2072   public MultiResponse multi(final RpcController rpcc, final MultiRequest request)
2073   throws ServiceException {
2074     try {
2075       checkOpen();
2076     } catch (IOException ie) {
2077       throw new ServiceException(ie);
2078     }
2079 
2080     // rpc controller is how we bring in data via the back door;  it is unprotobuf'ed data.
2081     // It is also the conduit via which we pass back data.
2082     PayloadCarryingRpcController controller = (PayloadCarryingRpcController)rpcc;
2083     CellScanner cellScanner = controller != null ? controller.cellScanner(): null;
2084     if (controller != null) {
2085       controller.setCellScanner(null);
2086     }
2087 
2088     long nonceGroup = request.hasNonceGroup() ? request.getNonceGroup() : HConstants.NO_NONCE;
2089 
2090     // this will contain all the cells that we need to return. It's created later, if needed.
2091     List<CellScannable> cellsToReturn = null;
2092     MultiResponse.Builder responseBuilder = MultiResponse.newBuilder();
2093     RegionActionResult.Builder regionActionResultBuilder = RegionActionResult.newBuilder();
2094     Boolean processed = null;
2095 
2096     for (RegionAction regionAction : request.getRegionActionList()) {
2097       this.requestCount.add(regionAction.getActionCount());
2098       OperationQuota quota;
2099       Region region;
2100       regionActionResultBuilder.clear();
2101       try {
2102         region = getRegion(regionAction.getRegion());
2103         quota = getQuotaManager().checkQuota(region, regionAction.getActionList());
2104       } catch (IOException e) {
2105         rpcServer.getMetrics().exception(e);
2106         regionActionResultBuilder.setException(ResponseConverter.buildException(e));
2107         responseBuilder.addRegionActionResult(regionActionResultBuilder.build());
2108         // All Mutations in this RegionAction not executed as we can not see the Region online here
2109         // in this RS. Will be retried from Client. Skipping all the Cells in CellScanner
2110         // corresponding to these Mutations.
2111         if (cellScanner != null) {
2112           skipCellsForMutations(regionAction.getActionList(), cellScanner);
2113         }
2114         continue;  // For this region it's a failure.
2115       }
2116 
2117       if (regionAction.hasAtomic() && regionAction.getAtomic()) {
2118         // How does this call happen?  It may need some work to play well w/ the surroundings.
2119         // Need to return an item per Action along w/ Action index.  TODO.
2120         try {
2121           if (request.hasCondition()) {
2122             Condition condition = request.getCondition();
2123             byte[] row = condition.getRow().toByteArray();
2124             byte[] family = condition.getFamily().toByteArray();
2125             byte[] qualifier = condition.getQualifier().toByteArray();
2126             CompareOp compareOp = CompareOp.valueOf(condition.getCompareType().name());
2127             ByteArrayComparable comparator =
2128                 ProtobufUtil.toComparator(condition.getComparator());
2129             processed = checkAndRowMutate(region, regionAction.getActionList(),
2130                   cellScanner, row, family, qualifier, compareOp, comparator);
2131           } else {
2132             ClientProtos.RegionLoadStats stats = mutateRows(region, regionAction.getActionList(),
2133                 cellScanner);
2134             // add the stats to the request
2135             if(stats != null) {
2136               responseBuilder.addRegionActionResult(RegionActionResult.newBuilder()
2137                   .addResultOrException(ResultOrException.newBuilder().setLoadStats(stats)));
2138             }
2139             processed = Boolean.TRUE;
2140           }
2141         } catch (IOException e) {
2142           rpcServer.getMetrics().exception(e);
2143           // As it's atomic, we may expect it's a global failure.
2144           regionActionResultBuilder.setException(ResponseConverter.buildException(e));
2145         }
2146       } else {
2147         // doNonAtomicRegionMutation manages the exception internally
2148         cellsToReturn = doNonAtomicRegionMutation(region, quota, regionAction, cellScanner,
2149             regionActionResultBuilder, cellsToReturn, nonceGroup);
2150       }
2151       responseBuilder.addRegionActionResult(regionActionResultBuilder.build());
2152       quota.close();
2153     }
2154     // Load the controller with the Cells to return.
2155     if (cellsToReturn != null && !cellsToReturn.isEmpty() && controller != null) {
2156       controller.setCellScanner(CellUtil.createCellScanner(cellsToReturn));
2157     }
2158     if (processed != null) {
2159       responseBuilder.setProcessed(processed);
2160     }
2161     return responseBuilder.build();
2162   }
2163 
2164   private void skipCellsForMutations(List<Action> actions, CellScanner cellScanner) {
2165     for (Action action : actions) {
2166       skipCellsForMutation(action, cellScanner);
2167     }
2168   }
2169 
2170   private void skipCellsForMutation(Action action, CellScanner cellScanner) {
2171     try {
2172       if (action.hasMutation()) {
2173         MutationProto m = action.getMutation();
2174         if (m.hasAssociatedCellCount()) {
2175           for (int i = 0; i < m.getAssociatedCellCount(); i++) {
2176             cellScanner.advance();
2177           }
2178         }
2179       }
2180     } catch (IOException e) {
2181       // No need to handle these Individual Muatation level issue. Any way this entire RegionAction
2182       // marked as failed as we could not see the Region here. At client side the top level
2183       // RegionAction exception will be considered first.
2184       LOG.error("Error while skipping Cells in CellScanner for invalid Region Mutations", e);
2185     }
2186   }
2187 
2188   /**
2189    * Mutate data in a table.
2190    *
2191    * @param rpcc the RPC controller
2192    * @param request the mutate request
2193    * @throws ServiceException
2194    */
2195   @Override
2196   public MutateResponse mutate(final RpcController rpcc,
2197       final MutateRequest request) throws ServiceException {
2198     // rpc controller is how we bring in data via the back door;  it is unprotobuf'ed data.
2199     // It is also the conduit via which we pass back data.
2200     PayloadCarryingRpcController controller = (PayloadCarryingRpcController)rpcc;
2201     CellScanner cellScanner = controller != null ? controller.cellScanner() : null;
2202     OperationQuota quota = null;
2203     // Clear scanner so we are not holding on to reference across call.
2204     if (controller != null) {
2205       controller.setCellScanner(null);
2206     }
2207     try {
2208       checkOpen();
2209       requestCount.increment();
2210       Region region = getRegion(request.getRegion());
2211       MutateResponse.Builder builder = MutateResponse.newBuilder();
2212       MutationProto mutation = request.getMutation();
2213       if (!region.getRegionInfo().isMetaTable()) {
2214         regionServer.cacheFlusher.reclaimMemStoreMemory();
2215       }
2216       long nonceGroup = request.hasNonceGroup() ? request.getNonceGroup() : HConstants.NO_NONCE;
2217       Result r = null;
2218       Boolean processed = null;
2219       MutationType type = mutation.getMutateType();
2220       long mutationSize = 0;
2221       quota = getQuotaManager().checkQuota(region, OperationQuota.OperationType.MUTATE);
2222       switch (type) {
2223       case APPEND:
2224         // TODO: this doesn't actually check anything.
2225         r = append(region, quota, mutation, cellScanner, nonceGroup);
2226         break;
2227       case INCREMENT:
2228         // TODO: this doesn't actually check anything.
2229         r = increment(region, quota, mutation, cellScanner, nonceGroup);
2230         break;
2231       case PUT:
2232         Put put = ProtobufUtil.toPut(mutation, cellScanner);
2233         quota.addMutation(put);
2234         if (request.hasCondition()) {
2235           Condition condition = request.getCondition();
2236           byte[] row = condition.getRow().toByteArray();
2237           byte[] family = condition.getFamily().toByteArray();
2238           byte[] qualifier = condition.getQualifier().toByteArray();
2239           CompareOp compareOp = CompareOp.valueOf(condition.getCompareType().name());
2240           ByteArrayComparable comparator =
2241             ProtobufUtil.toComparator(condition.getComparator());
2242           if (region.getCoprocessorHost() != null) {
2243             processed = region.getCoprocessorHost().preCheckAndPut(
2244               row, family, qualifier, compareOp, comparator, put);
2245           }
2246           if (processed == null) {
2247             boolean result = region.checkAndMutate(row, family,
2248               qualifier, compareOp, comparator, put, true);
2249             if (region.getCoprocessorHost() != null) {
2250               result = region.getCoprocessorHost().postCheckAndPut(row, family,
2251                 qualifier, compareOp, comparator, put, result);
2252             }
2253             processed = result;
2254           }
2255         } else {
2256           region.put(put);
2257           processed = Boolean.TRUE;
2258         }
2259         break;
2260       case DELETE:
2261         Delete delete = ProtobufUtil.toDelete(mutation, cellScanner);
2262         quota.addMutation(delete);
2263         if (request.hasCondition()) {
2264           Condition condition = request.getCondition();
2265           byte[] row = condition.getRow().toByteArray();
2266           byte[] family = condition.getFamily().toByteArray();
2267           byte[] qualifier = condition.getQualifier().toByteArray();
2268           CompareOp compareOp = CompareOp.valueOf(condition.getCompareType().name());
2269           ByteArrayComparable comparator =
2270             ProtobufUtil.toComparator(condition.getComparator());
2271           if (region.getCoprocessorHost() != null) {
2272             processed = region.getCoprocessorHost().preCheckAndDelete(
2273               row, family, qualifier, compareOp, comparator, delete);
2274           }
2275           if (processed == null) {
2276             boolean result = region.checkAndMutate(row, family,
2277               qualifier, compareOp, comparator, delete, true);
2278             if (region.getCoprocessorHost() != null) {
2279               result = region.getCoprocessorHost().postCheckAndDelete(row, family,
2280                 qualifier, compareOp, comparator, delete, result);
2281             }
2282             processed = result;
2283           }
2284         } else {
2285           region.delete(delete);
2286           processed = Boolean.TRUE;
2287         }
2288         break;
2289       default:
2290           throw new DoNotRetryIOException(
2291             "Unsupported mutate type: " + type.name());
2292       }
2293       if (processed != null) builder.setProcessed(processed.booleanValue());
2294       addResult(builder, r, controller);
2295       return builder.build();
2296     } catch (IOException ie) {
2297       regionServer.checkFileSystem();
2298       throw new ServiceException(ie);
2299     } finally {
2300       if (quota != null) {
2301         quota.close();
2302       }
2303     }
2304   }
2305 
2306   /**
2307    * Scan data in a table.
2308    *
2309    * @param controller the RPC controller
2310    * @param request the scan request
2311    * @throws ServiceException
2312    */
2313   @Override
2314   public ScanResponse scan(final RpcController controller, final ScanRequest request)
2315   throws ServiceException {
2316     OperationQuota quota = null;
2317     Leases.Lease lease = null;
2318     String scannerName = null;
2319     try {
2320       if (!request.hasScannerId() && !request.hasScan()) {
2321         throw new DoNotRetryIOException(
2322           "Missing required input: scannerId or scan");
2323       }
2324       long scannerId = -1;
2325       if (request.hasScannerId()) {
2326         scannerId = request.getScannerId();
2327         scannerName = String.valueOf(scannerId);
2328       }
2329       try {
2330         checkOpen();
2331       } catch (IOException e) {
2332         // If checkOpen failed, server not running or filesystem gone,
2333         // cancel this lease; filesystem is gone or we're closing or something.
2334         if (scannerName != null) {
2335           LOG.debug("Server shutting down and client tried to access missing scanner "
2336             + scannerName);
2337           if (regionServer.leases != null) {
2338             try {
2339               regionServer.leases.cancelLease(scannerName);
2340             } catch (LeaseException le) {
2341               // No problem, ignore
2342               if (LOG.isTraceEnabled()) {
2343                 LOG.trace("Un-able to cancel lease of scanner. It could already be closed.");
2344               }
2345              }
2346           }
2347         }
2348         throw e;
2349       }
2350       requestCount.increment();
2351 
2352       int ttl = 0;
2353       Region region = null;
2354       RegionScanner scanner = null;
2355       RegionScannerHolder rsh = null;
2356       boolean moreResults = true;
2357       boolean closeScanner = false;
2358       boolean isSmallScan = false;
2359       RpcCallContext context = RpcServer.getCurrentCall();
2360       Object lastBlock = null;
2361 
2362       ScanResponse.Builder builder = ScanResponse.newBuilder();
2363       if (request.hasCloseScanner()) {
2364         closeScanner = request.getCloseScanner();
2365       }
2366       int rows = closeScanner ? 0 : 1;
2367       if (request.hasNumberOfRows()) {
2368         rows = request.getNumberOfRows();
2369       }
2370       if (request.hasScannerId()) {
2371         rsh = scanners.get(scannerName);
2372         if (rsh == null) {
2373           LOG.info("Client tried to access missing scanner " + scannerName);
2374           throw new UnknownScannerException(
2375             "Name: " + scannerName + ", already closed?");
2376         }
2377         scanner = rsh.s;
2378         HRegionInfo hri = scanner.getRegionInfo();
2379         region = regionServer.getRegion(hri.getRegionName());
2380         if (region != rsh.r) { // Yes, should be the same instance
2381           throw new NotServingRegionException("Region was re-opened after the scanner"
2382             + scannerName + " was created: " + hri.getRegionNameAsString());
2383         }
2384       } else {
2385         region = getRegion(request.getRegion());
2386         ClientProtos.Scan protoScan = request.getScan();
2387         boolean isLoadingCfsOnDemandSet = protoScan.hasLoadColumnFamiliesOnDemand();
2388         Scan scan = ProtobufUtil.toScan(protoScan);
2389         // if the request doesn't set this, get the default region setting.
2390         if (!isLoadingCfsOnDemandSet) {
2391           scan.setLoadColumnFamiliesOnDemand(region.isLoadingCfsOnDemandDefault());
2392         }
2393 
2394         isSmallScan = scan.isSmall();
2395         if (!scan.hasFamilies()) {
2396           // Adding all families to scanner
2397           for (byte[] family: region.getTableDesc().getFamiliesKeys()) {
2398             scan.addFamily(family);
2399           }
2400         }
2401 
2402         if (region.getCoprocessorHost() != null) {
2403           scanner = region.getCoprocessorHost().preScannerOpen(scan);
2404         }
2405         if (scanner == null) {
2406           scanner = region.getScanner(scan);
2407         }
2408         if (region.getCoprocessorHost() != null) {
2409           scanner = region.getCoprocessorHost().postScannerOpen(scan, scanner);
2410         }
2411         scannerId = addScanner(scanner, region);
2412         scannerName = String.valueOf(scannerId);
2413         ttl = this.scannerLeaseTimeoutPeriod;
2414       }
2415       if (request.hasRenew() && request.getRenew()) {
2416         rsh = scanners.get(scannerName);
2417         lease = regionServer.leases.removeLease(scannerName);
2418         if (lease != null && rsh != null) {
2419           regionServer.leases.addLease(lease);
2420           // Increment the nextCallSeq value which is the next expected from client.
2421           rsh.incNextCallSeq();
2422         }
2423         return builder.build();
2424       }
2425 
2426       quota = getQuotaManager().checkQuota(region, OperationQuota.OperationType.SCAN);
2427       long maxQuotaResultSize = Math.min(maxScannerResultSize, quota.getReadAvailable());
2428       if (rows > 0) {
2429         // if nextCallSeq does not match throw Exception straight away. This needs to be
2430         // performed even before checking of Lease.
2431         // See HBASE-5974
2432         if (request.hasNextCallSeq()) {
2433           if (rsh == null) {
2434             rsh = scanners.get(scannerName);
2435           }
2436           if (rsh != null) {
2437             if (request.getNextCallSeq() != rsh.getNextCallSeq()) {
2438               throw new OutOfOrderScannerNextException(
2439                 "Expected nextCallSeq: " + rsh.getNextCallSeq()
2440                 + " But the nextCallSeq got from client: " + request.getNextCallSeq() +
2441                 "; request=" + TextFormat.shortDebugString(request));
2442             }
2443             // Increment the nextCallSeq value which is the next expected from client.
2444             rsh.incNextCallSeq();
2445           }
2446         }
2447         try {
2448           // Remove lease while its being processed in server; protects against case
2449           // where processing of request takes > lease expiration time.
2450           lease = regionServer.leases.removeLease(scannerName);
2451           List<Result> results = new ArrayList<Result>();
2452 
2453           boolean done = false;
2454           // Call coprocessor. Get region info from scanner.
2455           if (region != null && region.getCoprocessorHost() != null) {
2456             Boolean bypass = region.getCoprocessorHost().preScannerNext(
2457               scanner, results, rows);
2458             if (!results.isEmpty()) {
2459               for (Result r : results) {
2460                 lastBlock = addSize(context, r, lastBlock);
2461               }
2462             }
2463             if (bypass != null && bypass.booleanValue()) {
2464               done = true;
2465             }
2466           }
2467 
2468           if (!done) {
2469             long maxResultSize = Math.min(scanner.getMaxResultSize(), maxQuotaResultSize);
2470             if (maxResultSize <= 0) {
2471               maxResultSize = maxQuotaResultSize;
2472             }
2473             // This is cells inside a row. Default size is 10 so if many versions or many cfs,
2474             // then we'll resize. Resizings show in profiler. Set it higher than 10. For now
2475             // arbitrary 32. TODO: keep record of general size of results being returned.
2476             List<Cell> values = new ArrayList<Cell>(32);
2477             region.startRegionOperation(Operation.SCAN);
2478             try {
2479               int i = 0;
2480               synchronized(scanner) {
2481                 boolean stale = (region.getRegionInfo().getReplicaId() != 0);
2482                 boolean clientHandlesPartials =
2483                     request.hasClientHandlesPartials() && request.getClientHandlesPartials();
2484                 boolean clientHandlesHeartbeats =
2485                     request.hasClientHandlesHeartbeats() && request.getClientHandlesHeartbeats();
2486 
2487                 // On the server side we must ensure that the correct ordering of partial results is
2488                 // returned to the client to allow them to properly reconstruct the partial results.
2489                 // If the coprocessor host is adding to the result list, we cannot guarantee the
2490                 // correct ordering of partial results and so we prevent partial results from being
2491                 // formed.
2492                 boolean serverGuaranteesOrderOfPartials = results.isEmpty();
2493                 boolean allowPartialResults =
2494                     clientHandlesPartials && serverGuaranteesOrderOfPartials && !isSmallScan;
2495                 boolean moreRows = false;
2496 
2497                 // Heartbeat messages occur when the processing of the ScanRequest is exceeds a
2498                 // certain time threshold on the server. When the time threshold is exceeded, the
2499                 // server stops the scan and sends back whatever Results it has accumulated within
2500                 // that time period (may be empty). Since heartbeat messages have the potential to
2501                 // create partial Results (in the event that the timeout occurs in the middle of a
2502                 // row), we must only generate heartbeat messages when the client can handle both
2503                 // heartbeats AND partials
2504                 boolean allowHeartbeatMessages = clientHandlesHeartbeats && allowPartialResults;
2505 
2506                 // Default value of timeLimit is negative to indicate no timeLimit should be
2507                 // enforced.
2508                 long timeLimit = -1;
2509 
2510                 // Set the time limit to be half of the more restrictive timeout value (one of the
2511                 // timeout values must be positive). In the event that both values are positive, the
2512                 // more restrictive of the two is used to calculate the limit.
2513                 if (allowHeartbeatMessages && (scannerLeaseTimeoutPeriod > 0 || rpcTimeout > 0)) {
2514                   long timeLimitDelta;
2515                   if (scannerLeaseTimeoutPeriod > 0 && rpcTimeout > 0) {
2516                     timeLimitDelta = Math.min(scannerLeaseTimeoutPeriod, rpcTimeout);
2517                   } else {
2518                     timeLimitDelta =
2519                         scannerLeaseTimeoutPeriod > 0 ? scannerLeaseTimeoutPeriod : rpcTimeout;
2520                   }
2521                   // Use half of whichever timeout value was more restrictive... But don't allow
2522                   // the time limit to be less than the allowable minimum (could cause an
2523                   // immediatate timeout before scanning any data).
2524                   timeLimitDelta = Math.max(timeLimitDelta / 2, minimumScanTimeLimitDelta);
2525                   timeLimit = System.currentTimeMillis() + timeLimitDelta;
2526                 }
2527 
2528                 final LimitScope sizeScope =
2529                     allowPartialResults ? LimitScope.BETWEEN_CELLS : LimitScope.BETWEEN_ROWS;
2530                 final LimitScope timeScope =
2531                     allowHeartbeatMessages ? LimitScope.BETWEEN_CELLS : LimitScope.BETWEEN_ROWS;
2532 
2533                 boolean trackMetrics =
2534                     request.hasTrackScanMetrics() && request.getTrackScanMetrics();
2535 
2536                 // Configure with limits for this RPC. Set keep progress true since size progress
2537                 // towards size limit should be kept between calls to nextRaw
2538                 ScannerContext.Builder contextBuilder = ScannerContext.newBuilder(true);
2539                 contextBuilder.setSizeLimit(sizeScope, maxResultSize);
2540                 contextBuilder.setBatchLimit(scanner.getBatch());
2541                 contextBuilder.setTimeLimit(timeScope, timeLimit);
2542                 contextBuilder.setTrackMetrics(trackMetrics);
2543                 ScannerContext scannerContext = contextBuilder.build();
2544 
2545                 boolean limitReached = false;
2546                 while (i < rows) {
2547                   // Reset the batch progress to 0 before every call to RegionScanner#nextRaw. The
2548                   // batch limit is a limit on the number of cells per Result. Thus, if progress is
2549                   // being tracked (i.e. scannerContext.keepProgress() is true) then we need to
2550                   // reset the batch progress between nextRaw invocations since we don't want the
2551                   // batch progress from previous calls to affect future calls
2552                   scannerContext.setBatchProgress(0);
2553 
2554                   // Collect values to be returned here
2555                   moreRows = scanner.nextRaw(values, scannerContext);
2556 
2557                   if (!values.isEmpty()) {
2558                     final boolean partial = scannerContext.partialResultFormed();
2559                     Result r = Result.create(values, null, stale, partial);
2560                     lastBlock = addSize(context, r, lastBlock);
2561                     results.add(r);
2562                     i++;
2563                   }
2564 
2565                   boolean sizeLimitReached = scannerContext.checkSizeLimit(LimitScope.BETWEEN_ROWS);
2566                   boolean timeLimitReached = scannerContext.checkTimeLimit(LimitScope.BETWEEN_ROWS);
2567                   boolean rowLimitReached = i >= rows;
2568                   limitReached = sizeLimitReached || timeLimitReached || rowLimitReached;
2569 
2570                   if (limitReached || !moreRows) {
2571                     if (LOG.isTraceEnabled()) {
2572                       LOG.trace("Done scanning. limitReached: " + limitReached + " moreRows: "
2573                           + moreRows + " scannerContext: " + scannerContext);
2574                     }
2575                     // We only want to mark a ScanResponse as a heartbeat message in the event that
2576                     // there are more values to be read server side. If there aren't more values,
2577                     // marking it as a heartbeat is wasteful because the client will need to issue
2578                     // another ScanRequest only to realize that they already have all the values
2579                     if (moreRows) {
2580                       // Heartbeat messages occur when the time limit has been reached.
2581                       builder.setHeartbeatMessage(timeLimitReached);
2582                     }
2583                     break;
2584                   }
2585                   values.clear();
2586                 }
2587 
2588                 if (limitReached || moreRows) {
2589                   // We stopped prematurely
2590                   builder.setMoreResultsInRegion(true);
2591                 } else {
2592                   // We didn't get a single batch
2593                   builder.setMoreResultsInRegion(false);
2594                 }
2595 
2596                 // Check to see if the client requested that we track metrics server side. If the
2597                 // client requested metrics, retrieve the metrics from the scanner context.
2598                 if (trackMetrics) {
2599                   Map<String, Long> metrics = scannerContext.getMetrics().getMetricsMap();
2600                   ScanMetrics.Builder metricBuilder = ScanMetrics.newBuilder();
2601                   NameInt64Pair.Builder pairBuilder = NameInt64Pair.newBuilder();
2602 
2603                   for (Entry<String, Long> entry : metrics.entrySet()) {
2604                     pairBuilder.setName(entry.getKey());
2605                     pairBuilder.setValue(entry.getValue());
2606                     metricBuilder.addMetrics(pairBuilder.build());
2607                   }
2608 
2609                   builder.setScanMetrics(metricBuilder.build());
2610                 }
2611               }
2612               region.updateReadRequestsCount(i);
2613               long responseCellSize = context != null ? context.getResponseCellSize() : 0;
2614               region.getMetrics().updateScanNext(responseCellSize);
2615               if (regionServer.metricsRegionServer != null) {
2616                 regionServer.metricsRegionServer.updateScannerNext(responseCellSize);
2617               }
2618             } finally {
2619               region.closeRegionOperation();
2620             }
2621 
2622             // coprocessor postNext hook
2623             if (region != null && region.getCoprocessorHost() != null) {
2624               region.getCoprocessorHost().postScannerNext(scanner, results, rows, true);
2625             }
2626           }
2627 
2628           quota.addScanResult(results);
2629 
2630           // If the scanner's filter - if any - is done with the scan
2631           // and wants to tell the client to stop the scan. This is done by passing
2632           // a null result, and setting moreResults to false.
2633           if (scanner.isFilterDone() && results.isEmpty()) {
2634             moreResults = false;
2635             results = null;
2636           } else {
2637             addResults(builder, results, controller, RegionReplicaUtil.isDefaultReplica(region.getRegionInfo()));
2638           }
2639         } catch (IOException e) {
2640           // if we have an exception on scanner next and we are using the callSeq
2641           // we should rollback because the client will retry with the same callSeq
2642           // and get an OutOfOrderScannerNextException if we don't do so.
2643           if (rsh != null && request.hasNextCallSeq()) {
2644             rsh.rollbackNextCallSeq();
2645           }
2646           throw e;
2647         } finally {
2648           // We're done. On way out re-add the above removed lease.
2649           // Adding resets expiration time on lease.
2650           if (scanners.containsKey(scannerName)) {
2651             if (lease != null) regionServer.leases.addLease(lease);
2652             ttl = this.scannerLeaseTimeoutPeriod;
2653           }
2654         }
2655       }
2656 
2657       if (!moreResults || closeScanner) {
2658         ttl = 0;
2659         moreResults = false;
2660         if (region != null && region.getCoprocessorHost() != null) {
2661           if (region.getCoprocessorHost().preScannerClose(scanner)) {
2662             return builder.build(); // bypass
2663           }
2664         }
2665         rsh = scanners.remove(scannerName);
2666         if (rsh != null) {
2667           scanner = rsh.s;
2668           scanner.close();
2669           regionServer.leases.cancelLease(scannerName);
2670           if (region != null && region.getCoprocessorHost() != null) {
2671             region.getCoprocessorHost().postScannerClose(scanner);
2672           }
2673         }
2674       }
2675 
2676       if (ttl > 0) {
2677         builder.setTtl(ttl);
2678       }
2679       builder.setScannerId(scannerId);
2680       builder.setMoreResults(moreResults);
2681       return builder.build();
2682     } catch (IOException ie) {
2683       if (scannerName != null && ie instanceof NotServingRegionException) {
2684         RegionScannerHolder rsh = scanners.remove(scannerName);
2685         if (rsh != null) {
2686           try {
2687             RegionScanner scanner = rsh.s;
2688             LOG.warn(scannerName + " encountered " + ie.getMessage() + ", closing ...");
2689             scanner.close();
2690             regionServer.leases.cancelLease(scannerName);
2691           } catch (IOException e) {
2692            LOG.warn("Getting exception closing " + scannerName, e);
2693           }
2694         }
2695       }
2696       throw new ServiceException(ie);
2697     } finally {
2698       if (quota != null) {
2699         quota.close();
2700       }
2701     }
2702   }
2703 
2704   @Override
2705   public CoprocessorServiceResponse execRegionServerService(RpcController controller,
2706       CoprocessorServiceRequest request) throws ServiceException {
2707     return regionServer.execRegionServerService(controller, request);
2708   }
2709 
2710   @Override
2711   public UpdateConfigurationResponse updateConfiguration(
2712       RpcController controller, UpdateConfigurationRequest request)
2713       throws ServiceException {
2714     try {
2715       this.regionServer.updateConfiguration();
2716     } catch (Exception e) {
2717       throw new ServiceException(e);
2718     }
2719     return UpdateConfigurationResponse.getDefaultInstance();
2720   }
2721 }