View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.ipc;
20  
21  import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION;
22  
23  import java.io.ByteArrayInputStream;
24  import java.io.ByteArrayOutputStream;
25  import java.io.DataOutputStream;
26  import java.io.IOException;
27  import java.net.BindException;
28  import java.net.InetAddress;
29  import java.net.InetSocketAddress;
30  import java.net.ServerSocket;
31  import java.net.Socket;
32  import java.net.SocketException;
33  import java.net.UnknownHostException;
34  import java.nio.ByteBuffer;
35  import java.nio.channels.CancelledKeyException;
36  import java.nio.channels.Channels;
37  import java.nio.channels.ClosedChannelException;
38  import java.nio.channels.GatheringByteChannel;
39  import java.nio.channels.ReadableByteChannel;
40  import java.nio.channels.SelectionKey;
41  import java.nio.channels.Selector;
42  import java.nio.channels.ServerSocketChannel;
43  import java.nio.channels.SocketChannel;
44  import java.nio.channels.WritableByteChannel;
45  import java.security.PrivilegedExceptionAction;
46  import java.util.ArrayList;
47  import java.util.Arrays;
48  import java.util.Collections;
49  import java.util.HashMap;
50  import java.util.Iterator;
51  import java.util.LinkedList;
52  import java.util.List;
53  import java.util.Map;
54  import java.util.Random;
55  import java.util.Set;
56  import java.util.concurrent.ConcurrentHashMap;
57  import java.util.concurrent.ConcurrentLinkedDeque;
58  import java.util.concurrent.ExecutorService;
59  import java.util.concurrent.Executors;
60  import java.util.concurrent.atomic.AtomicInteger;
61  import java.util.concurrent.locks.Lock;
62  import java.util.concurrent.locks.ReentrantLock;
63  
64  import javax.security.sasl.Sasl;
65  import javax.security.sasl.SaslException;
66  import javax.security.sasl.SaslServer;
67  
68  import org.apache.commons.logging.Log;
69  import org.apache.commons.logging.LogFactory;
70  import org.apache.hadoop.hbase.CallQueueTooBigException;
71  import org.apache.hadoop.hbase.classification.InterfaceAudience;
72  import org.apache.hadoop.hbase.classification.InterfaceStability;
73  import org.apache.hadoop.conf.Configuration;
74  import org.apache.hadoop.hbase.CellScanner;
75  import org.apache.hadoop.hbase.DoNotRetryIOException;
76  import org.apache.hadoop.hbase.HBaseIOException;
77  import org.apache.hadoop.hbase.HBaseInterfaceAudience;
78  import org.apache.hadoop.hbase.HConstants;
79  import org.apache.hadoop.hbase.HRegionInfo;
80  import org.apache.hadoop.hbase.Server;
81  import org.apache.hadoop.hbase.TableName;
82  import org.apache.hadoop.hbase.client.NeedUnmanagedConnectionException;
83  import org.apache.hadoop.hbase.client.Operation;
84  import org.apache.hadoop.hbase.client.VersionInfoUtil;
85  import org.apache.hadoop.hbase.codec.Codec;
86  import org.apache.hadoop.hbase.conf.ConfigurationObserver;
87  import org.apache.hadoop.hbase.exceptions.RegionMovedException;
88  import org.apache.hadoop.hbase.io.ByteBufferOutputStream;
89  import org.apache.hadoop.hbase.io.BoundedByteBufferPool;
90  import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
91  import org.apache.hadoop.hbase.monitoring.TaskMonitor;
92  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
93  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.VersionInfo;
94  import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.CellBlockMeta;
95  import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ConnectionHeader;
96  import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ExceptionResponse;
97  import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader;
98  import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ResponseHeader;
99  import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.UserInformation;
100 import org.apache.hadoop.hbase.regionserver.HRegionServer;
101 import org.apache.hadoop.hbase.security.AccessDeniedException;
102 import org.apache.hadoop.hbase.security.AuthMethod;
103 import org.apache.hadoop.hbase.security.HBasePolicyProvider;
104 import org.apache.hadoop.hbase.security.HBaseSaslRpcServer;
105 import org.apache.hadoop.hbase.security.User;
106 import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.SaslDigestCallbackHandler;
107 import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.SaslGssCallbackHandler;
108 import org.apache.hadoop.hbase.security.SaslStatus;
109 import org.apache.hadoop.hbase.security.SaslUtil;
110 import org.apache.hadoop.hbase.security.UserProvider;
111 import org.apache.hadoop.hbase.security.token.AuthenticationTokenSecretManager;
112 import org.apache.hadoop.hbase.util.Bytes;
113 import org.apache.hadoop.hbase.util.Counter;
114 import org.apache.hadoop.hbase.util.Pair;
115 import org.apache.hadoop.io.BytesWritable;
116 import org.apache.hadoop.io.IntWritable;
117 import org.apache.hadoop.io.Writable;
118 import org.apache.hadoop.io.WritableUtils;
119 import org.apache.hadoop.io.compress.CompressionCodec;
120 import org.apache.hadoop.security.UserGroupInformation;
121 import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
122 import org.apache.hadoop.security.authorize.AuthorizationException;
123 import org.apache.hadoop.security.authorize.PolicyProvider;
124 import org.apache.hadoop.security.authorize.ProxyUsers;
125 import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
126 import org.apache.hadoop.security.token.SecretManager;
127 import org.apache.hadoop.security.token.SecretManager.InvalidToken;
128 import org.apache.hadoop.security.token.TokenIdentifier;
129 import org.apache.hadoop.util.StringUtils;
130 import org.codehaus.jackson.map.ObjectMapper;
131 import org.apache.htrace.TraceInfo;
132 
133 import com.google.common.util.concurrent.ThreadFactoryBuilder;
134 import com.google.protobuf.BlockingService;
135 import com.google.protobuf.CodedInputStream;
136 import com.google.protobuf.Descriptors.MethodDescriptor;
137 import com.google.protobuf.Message;
138 import com.google.protobuf.ServiceException;
139 import com.google.protobuf.TextFormat;
140 
141 /**
142  * An RPC server that hosts protobuf described Services.
143  *
144  * An RpcServer instance has a Listener that hosts the socket.  Listener has fixed number
145  * of Readers in an ExecutorPool, 10 by default.  The Listener does an accept and then
146  * round robin a Reader is chosen to do the read.  The reader is registered on Selector.  Read does
147  * total read off the channel and the parse from which it makes a Call.  The call is wrapped in a
148  * CallRunner and passed to the scheduler to be run.  Reader goes back to see if more to be done
149  * and loops till done.
150  *
151  * <p>Scheduler can be variously implemented but default simple scheduler has handlers to which it
152  * has given the queues into which calls (i.e. CallRunner instances) are inserted.  Handlers run
153  * taking from the queue.  They run the CallRunner#run method on each item gotten from queue
154  * and keep taking while the server is up.
155  *
156  * CallRunner#run executes the call.  When done, asks the included Call to put itself on new
157  * queue for Responder to pull from and return result to client.
158  *
159  * @see RpcClientImpl
160  */
161 @InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX})
162 @InterfaceStability.Evolving
163 public class RpcServer implements RpcServerInterface, ConfigurationObserver {
164   // LOG is being used in CallRunner and the log level is being changed in tests
165   public static final Log LOG = LogFactory.getLog(RpcServer.class);
166   private static final CallQueueTooBigException CALL_QUEUE_TOO_BIG_EXCEPTION
167       = new CallQueueTooBigException();
168 
169   private final boolean authorize;
170   private boolean isSecurityEnabled;
171 
172   public static final byte CURRENT_VERSION = 0;
173 
174   /**
175    * Whether we allow a fallback to SIMPLE auth for insecure clients when security is enabled.
176    */
177   public static final String FALLBACK_TO_INSECURE_CLIENT_AUTH =
178           "hbase.ipc.server.fallback-to-simple-auth-allowed";
179 
180   /**
181    * How many calls/handler are allowed in the queue.
182    */
183   static final int DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER = 10;
184 
185   /**
186    * The maximum size that we can hold in the RPC queue
187    */
188   private static final int DEFAULT_MAX_CALLQUEUE_SIZE = 1024 * 1024 * 1024;
189 
190   private final IPCUtil ipcUtil;
191 
192   private static final String AUTH_FAILED_FOR = "Auth failed for ";
193   private static final String AUTH_SUCCESSFUL_FOR = "Auth successful for ";
194   private static final Log AUDITLOG = LogFactory.getLog("SecurityLogger." +
195     Server.class.getName());
196   protected SecretManager<TokenIdentifier> secretManager;
197   protected ServiceAuthorizationManager authManager;
198 
199   /** This is set to Call object before Handler invokes an RPC and reset
200    * after the call returns.
201    */
202   protected static final ThreadLocal<Call> CurCall = new ThreadLocal<Call>();
203 
204   /** Keeps MonitoredRPCHandler per handler thread. */
205   static final ThreadLocal<MonitoredRPCHandler> MONITORED_RPC
206       = new ThreadLocal<MonitoredRPCHandler>();
207 
208   protected final InetSocketAddress bindAddress;
209   protected int port;                             // port we listen on
210   private int readThreads;                        // number of read threads
211   protected int maxIdleTime;                      // the maximum idle time after
212                                                   // which a client may be
213                                                   // disconnected
214   protected int thresholdIdleConnections;         // the number of idle
215                                                   // connections after which we
216                                                   // will start cleaning up idle
217                                                   // connections
218   int maxConnectionsToNuke;                       // the max number of
219                                                   // connections to nuke
220                                                   // during a cleanup
221 
222   protected MetricsHBaseServer metrics;
223 
224   protected final Configuration conf;
225 
226   private int maxQueueSize;
227   protected int socketSendBufferSize;
228   protected final boolean tcpNoDelay;   // if T then disable Nagle's Algorithm
229   protected final boolean tcpKeepAlive; // if T then use keepalives
230   protected final long purgeTimeout;    // in milliseconds
231 
232   /**
233    * This flag is used to indicate to sub threads when they should go down.  When we call
234    * {@link #start()}, all threads started will consult this flag on whether they should
235    * keep going.  It is set to false when {@link #stop()} is called.
236    */
237   volatile boolean running = true;
238 
239   /**
240    * This flag is set to true after all threads are up and 'running' and the server is then opened
241    * for business by the call to {@link #start()}.
242    */
243   volatile boolean started = false;
244 
245   /**
246    * This is a running count of the combined size of all outstanding calls.
247    */
248   protected final Counter callQueueSize = new Counter();
249 
250   protected final List<Connection> connectionList =
251     Collections.synchronizedList(new LinkedList<Connection>());
252   //maintain a list
253   //of client connections
254   private Listener listener = null;
255   protected Responder responder = null;
256   protected AuthenticationTokenSecretManager authTokenSecretMgr = null;
257   protected int numConnections = 0;
258 
259   protected HBaseRPCErrorHandler errorHandler = null;
260 
261   private static final String WARN_RESPONSE_TIME = "hbase.ipc.warn.response.time";
262   private static final String WARN_RESPONSE_SIZE = "hbase.ipc.warn.response.size";
263 
264   /** Default value for above params */
265   private static final int DEFAULT_WARN_RESPONSE_TIME = 10000; // milliseconds
266   private static final int DEFAULT_WARN_RESPONSE_SIZE = 100 * 1024 * 1024;
267 
268   private static final ObjectMapper MAPPER = new ObjectMapper();
269 
270   private final int warnResponseTime;
271   private final int warnResponseSize;
272   private final Server server;
273   private final List<BlockingServiceAndInterface> services;
274 
275   private final RpcScheduler scheduler;
276 
277   private UserProvider userProvider;
278 
279   private final BoundedByteBufferPool reservoir;
280 
281   private volatile boolean allowFallbackToSimpleAuth;
282 
283   /**
284    * Datastructure that holds all necessary to a method invocation and then afterward, carries
285    * the result.
286    */
287   @InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX})
288   @InterfaceStability.Evolving
289   public class Call implements RpcCallContext {
290     protected int id;                             // the client's call id
291     protected BlockingService service;
292     protected MethodDescriptor md;
293     protected RequestHeader header;
294     protected Message param;                      // the parameter passed
295     // Optional cell data passed outside of protobufs.
296     protected CellScanner cellScanner;
297     protected Connection connection;              // connection to client
298     protected long timestamp;      // the time received when response is null
299                                    // the time served when response is not null
300     /**
301      * Chain of buffers to send as response.
302      */
303     protected BufferChain response;
304     protected Responder responder;
305 
306     protected long size;                          // size of current call
307     protected boolean isError;
308     protected TraceInfo tinfo;
309     private ByteBuffer cellBlock = null;
310 
311     private User user;
312     private InetAddress remoteAddress;
313 
314     private long responseCellSize = 0;
315     private long responseBlockSize = 0;
316     private boolean retryImmediatelySupported;
317 
318     @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NP_NULL_ON_SOME_PATH",
319         justification="Can't figure why this complaint is happening... see below")
320     Call(int id, final BlockingService service, final MethodDescriptor md, RequestHeader header,
321          Message param, CellScanner cellScanner, Connection connection, Responder responder,
322          long size, TraceInfo tinfo, final InetAddress remoteAddress) {
323       this.id = id;
324       this.service = service;
325       this.md = md;
326       this.header = header;
327       this.param = param;
328       this.cellScanner = cellScanner;
329       this.connection = connection;
330       this.timestamp = System.currentTimeMillis();
331       this.response = null;
332       this.responder = responder;
333       this.isError = false;
334       this.size = size;
335       this.tinfo = tinfo;
336       this.user = connection == null? null: connection.user; // FindBugs: NP_NULL_ON_SOME_PATH
337       this.remoteAddress = remoteAddress;
338       this.retryImmediatelySupported =
339           connection == null? null: connection.retryImmediatelySupported;
340     }
341 
342     /**
343      * Call is done. Execution happened and we returned results to client. It is now safe to
344      * cleanup.
345      */
346     @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="IS2_INCONSISTENT_SYNC",
347         justification="Presume the lock on processing request held by caller is protection enough")
348     void done() {
349       if (this.cellBlock != null && reservoir != null) {
350         // Return buffer to reservoir now we are done with it.
351         reservoir.putBuffer(this.cellBlock);
352         this.cellBlock = null;
353       }
354       this.connection.decRpcCount();  // Say that we're done with this call.
355     }
356 
357     @Override
358     public String toString() {
359       return toShortString() + " param: " +
360         (this.param != null? ProtobufUtil.getShortTextFormat(this.param): "") +
361         " connection: " + connection.toString();
362     }
363 
364     protected RequestHeader getHeader() {
365       return this.header;
366     }
367 
368     public boolean hasPriority() {
369       return this.header.hasPriority();
370     }
371 
372     public int getPriority() {
373       return this.header.getPriority();
374     }
375 
376     /*
377      * Short string representation without param info because param itself could be huge depends on
378      * the payload of a command
379      */
380     String toShortString() {
381       String serviceName = this.connection.service != null ?
382           this.connection.service.getDescriptorForType().getName() : "null";
383       return "callId: " + this.id + " service: " + serviceName +
384           " methodName: " + ((this.md != null) ? this.md.getName() : "n/a") +
385           " size: " + StringUtils.TraditionalBinaryPrefix.long2String(this.size, "", 1) +
386           " connection: " + connection.toString();
387     }
388 
389     String toTraceString() {
390       String serviceName = this.connection.service != null ?
391                            this.connection.service.getDescriptorForType().getName() : "";
392       String methodName = (this.md != null) ? this.md.getName() : "";
393       return serviceName + "." + methodName;
394     }
395 
396     protected synchronized void setSaslTokenResponse(ByteBuffer response) {
397       this.response = new BufferChain(response);
398     }
399 
400     protected synchronized void setResponse(Object m, final CellScanner cells,
401         Throwable t, String errorMsg) {
402       if (this.isError) return;
403       if (t != null) this.isError = true;
404       BufferChain bc = null;
405       try {
406         ResponseHeader.Builder headerBuilder = ResponseHeader.newBuilder();
407         // Presume it a pb Message.  Could be null.
408         Message result = (Message)m;
409         // Call id.
410         headerBuilder.setCallId(this.id);
411         if (t != null) {
412           ExceptionResponse.Builder exceptionBuilder = ExceptionResponse.newBuilder();
413           exceptionBuilder.setExceptionClassName(t.getClass().getName());
414           exceptionBuilder.setStackTrace(errorMsg);
415           exceptionBuilder.setDoNotRetry(t instanceof DoNotRetryIOException ||
416             t instanceof NeedUnmanagedConnectionException);
417           if (t instanceof RegionMovedException) {
418             // Special casing for this exception.  This is only one carrying a payload.
419             // Do this instead of build a generic system for allowing exceptions carry
420             // any kind of payload.
421             RegionMovedException rme = (RegionMovedException)t;
422             exceptionBuilder.setHostname(rme.getHostname());
423             exceptionBuilder.setPort(rme.getPort());
424           }
425           // Set the exception as the result of the method invocation.
426           headerBuilder.setException(exceptionBuilder.build());
427         }
428         // Pass reservoir to buildCellBlock. Keep reference to returne so can add it back to the
429         // reservoir when finished. This is hacky and the hack is not contained but benefits are
430         // high when we can avoid a big buffer allocation on each rpc.
431         this.cellBlock = ipcUtil.buildCellBlock(this.connection.codec,
432           this.connection.compressionCodec, cells, reservoir);
433         if (this.cellBlock != null) {
434           CellBlockMeta.Builder cellBlockBuilder = CellBlockMeta.newBuilder();
435           // Presumes the cellBlock bytebuffer has been flipped so limit has total size in it.
436           cellBlockBuilder.setLength(this.cellBlock.limit());
437           headerBuilder.setCellBlockMeta(cellBlockBuilder.build());
438         }
439         Message header = headerBuilder.build();
440 
441         // Organize the response as a set of bytebuffers rather than collect it all together inside
442         // one big byte array; save on allocations.
443         ByteBuffer bbHeader = IPCUtil.getDelimitedMessageAsByteBuffer(header);
444         ByteBuffer bbResult = IPCUtil.getDelimitedMessageAsByteBuffer(result);
445         int totalSize = bbHeader.capacity() + (bbResult == null? 0: bbResult.limit()) +
446           (this.cellBlock == null? 0: this.cellBlock.limit());
447         ByteBuffer bbTotalSize = ByteBuffer.wrap(Bytes.toBytes(totalSize));
448         bc = new BufferChain(bbTotalSize, bbHeader, bbResult, this.cellBlock);
449         if (connection.useWrap) {
450           bc = wrapWithSasl(bc);
451         }
452       } catch (IOException e) {
453         LOG.warn("Exception while creating response " + e);
454       }
455       this.response = bc;
456     }
457 
458     private BufferChain wrapWithSasl(BufferChain bc)
459         throws IOException {
460       if (!this.connection.useSasl) return bc;
461       // Looks like no way around this; saslserver wants a byte array.  I have to make it one.
462       // THIS IS A BIG UGLY COPY.
463       byte [] responseBytes = bc.getBytes();
464       byte [] token;
465       // synchronization may be needed since there can be multiple Handler
466       // threads using saslServer to wrap responses.
467       synchronized (connection.saslServer) {
468         token = connection.saslServer.wrap(responseBytes, 0, responseBytes.length);
469       }
470       if (LOG.isTraceEnabled()) {
471         LOG.trace("Adding saslServer wrapped token of size " + token.length
472             + " as call response.");
473       }
474 
475       ByteBuffer bbTokenLength = ByteBuffer.wrap(Bytes.toBytes(token.length));
476       ByteBuffer bbTokenBytes = ByteBuffer.wrap(token);
477       return new BufferChain(bbTokenLength, bbTokenBytes);
478     }
479 
480     @Override
481     public boolean isClientCellBlockSupported() {
482       return this.connection != null && this.connection.codec != null;
483     }
484 
485     @Override
486     public long disconnectSince() {
487       if (!connection.channel.isOpen()) {
488         return System.currentTimeMillis() - timestamp;
489       } else {
490         return -1L;
491       }
492     }
493 
494     public long getSize() {
495       return this.size;
496     }
497 
498     public long getResponseCellSize() {
499       return responseCellSize;
500     }
501 
502     public void incrementResponseCellSize(long cellSize) {
503       responseCellSize += cellSize;
504     }
505 
506     @Override
507     public long getResponseBlockSize() {
508       return responseBlockSize;
509     }
510 
511     @Override
512     public void incrementResponseBlockSize(long blockSize) {
513       responseBlockSize += blockSize;
514     }
515 
516     public synchronized void sendResponseIfReady() throws IOException {
517       this.responder.doRespond(this);
518     }
519 
520     public UserGroupInformation getRemoteUser() {
521       return connection.ugi;
522     }
523 
524     @Override
525     public User getRequestUser() {
526       return user;
527     }
528 
529     @Override
530     public String getRequestUserName() {
531       User user = getRequestUser();
532       return user == null? null: user.getShortName();
533     }
534 
535     @Override
536     public InetAddress getRemoteAddress() {
537       return remoteAddress;
538     }
539 
540     @Override
541     public VersionInfo getClientVersionInfo() {
542       return connection.getVersionInfo();
543     }
544 
545     @Override
546     public boolean isRetryImmediatelySupported() {
547       return retryImmediatelySupported;
548     }
549   }
550 
551   /** Listens on the socket. Creates jobs for the handler threads*/
552   private class Listener extends Thread {
553 
554     private ServerSocketChannel acceptChannel = null; //the accept channel
555     private Selector selector = null; //the selector that we use for the server
556     private Reader[] readers = null;
557     private int currentReader = 0;
558     private Random rand = new Random();
559     private long lastCleanupRunTime = 0; //the last time when a cleanup connec-
560                                          //-tion (for idle connections) ran
561     private long cleanupInterval = 10000; //the minimum interval between
562                                           //two cleanup runs
563     private int backlogLength;
564 
565     private ExecutorService readPool;
566 
567     public Listener(final String name) throws IOException {
568       super(name);
569       backlogLength = conf.getInt("hbase.ipc.server.listen.queue.size", 128);
570       // Create a new server socket and set to non blocking mode
571       acceptChannel = ServerSocketChannel.open();
572       acceptChannel.configureBlocking(false);
573 
574       // Bind the server socket to the binding addrees (can be different from the default interface)
575       bind(acceptChannel.socket(), bindAddress, backlogLength);
576       port = acceptChannel.socket().getLocalPort(); //Could be an ephemeral port
577       // create a selector;
578       selector= Selector.open();
579 
580       readers = new Reader[readThreads];
581       readPool = Executors.newFixedThreadPool(readThreads,
582         new ThreadFactoryBuilder().setNameFormat(
583           "RpcServer.reader=%d,bindAddress=" + bindAddress.getHostName() +
584           ",port=" + port).setDaemon(true).build());
585       for (int i = 0; i < readThreads; ++i) {
586         Reader reader = new Reader();
587         readers[i] = reader;
588         readPool.execute(reader);
589       }
590       LOG.info(getName() + ": started " + readThreads + " reader(s) listening on port=" + port);
591 
592       // Register accepts on the server socket with the selector.
593       acceptChannel.register(selector, SelectionKey.OP_ACCEPT);
594       this.setName("RpcServer.listener,port=" + port);
595       this.setDaemon(true);
596     }
597 
598 
599     private class Reader implements Runnable {
600       private volatile boolean adding = false;
601       private final Selector readSelector;
602 
603       Reader() throws IOException {
604         this.readSelector = Selector.open();
605       }
606       @Override
607       public void run() {
608         try {
609           doRunLoop();
610         } finally {
611           try {
612             readSelector.close();
613           } catch (IOException ioe) {
614             LOG.error(getName() + ": error closing read selector in " + getName(), ioe);
615           }
616         }
617       }
618 
619       private synchronized void doRunLoop() {
620         while (running) {
621           try {
622             readSelector.select();
623             while (adding) {
624               this.wait(1000);
625             }
626 
627             Iterator<SelectionKey> iter = readSelector.selectedKeys().iterator();
628             while (iter.hasNext()) {
629               SelectionKey key = iter.next();
630               iter.remove();
631               if (key.isValid()) {
632                 if (key.isReadable()) {
633                   doRead(key);
634                 }
635               }
636             }
637           } catch (InterruptedException e) {
638             LOG.debug("Interrupted while sleeping");
639             return;
640           } catch (IOException ex) {
641             LOG.info(getName() + ": IOException in Reader", ex);
642           }
643         }
644       }
645 
646       /**
647        * This gets reader into the state that waits for the new channel
648        * to be registered with readSelector. If it was waiting in select()
649        * the thread will be woken up, otherwise whenever select() is called
650        * it will return even if there is nothing to read and wait
651        * in while(adding) for finishAdd call
652        */
653       public void startAdd() {
654         adding = true;
655         readSelector.wakeup();
656       }
657 
658       public synchronized SelectionKey registerChannel(SocketChannel channel)
659         throws IOException {
660         return channel.register(readSelector, SelectionKey.OP_READ);
661       }
662 
663       public synchronized void finishAdd() {
664         adding = false;
665         this.notify();
666       }
667     }
668 
669     /** cleanup connections from connectionList. Choose a random range
670      * to scan and also have a limit on the number of the connections
671      * that will be cleanedup per run. The criteria for cleanup is the time
672      * for which the connection was idle. If 'force' is true then all
673      * connections will be looked at for the cleanup.
674      * @param force all connections will be looked at for cleanup
675      */
676     private void cleanupConnections(boolean force) {
677       if (force || numConnections > thresholdIdleConnections) {
678         long currentTime = System.currentTimeMillis();
679         if (!force && (currentTime - lastCleanupRunTime) < cleanupInterval) {
680           return;
681         }
682         int start = 0;
683         int end = numConnections - 1;
684         if (!force) {
685           start = rand.nextInt() % numConnections;
686           end = rand.nextInt() % numConnections;
687           int temp;
688           if (end < start) {
689             temp = start;
690             start = end;
691             end = temp;
692           }
693         }
694         int i = start;
695         int numNuked = 0;
696         while (i <= end) {
697           Connection c;
698           synchronized (connectionList) {
699             try {
700               c = connectionList.get(i);
701             } catch (Exception e) {return;}
702           }
703           if (c.timedOut(currentTime)) {
704             if (LOG.isDebugEnabled())
705               LOG.debug(getName() + ": disconnecting client " + c.getHostAddress());
706             closeConnection(c);
707             numNuked++;
708             end--;
709             //noinspection UnusedAssignment
710             c = null;
711             if (!force && numNuked == maxConnectionsToNuke) break;
712           }
713           else i++;
714         }
715         lastCleanupRunTime = System.currentTimeMillis();
716       }
717     }
718 
719     @Override
720     @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="IS2_INCONSISTENT_SYNC",
721       justification="selector access is not synchronized; seems fine but concerned changing " +
722         "it will have per impact")
723     public void run() {
724       LOG.info(getName() + ": starting");
725       while (running) {
726         SelectionKey key = null;
727         try {
728           selector.select(); // FindBugs IS2_INCONSISTENT_SYNC
729           Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
730           while (iter.hasNext()) {
731             key = iter.next();
732             iter.remove();
733             try {
734               if (key.isValid()) {
735                 if (key.isAcceptable())
736                   doAccept(key);
737               }
738             } catch (IOException ignored) {
739               if (LOG.isTraceEnabled()) LOG.trace("ignored", ignored);
740             }
741             key = null;
742           }
743         } catch (OutOfMemoryError e) {
744           if (errorHandler != null) {
745             if (errorHandler.checkOOME(e)) {
746               LOG.info(getName() + ": exiting on OutOfMemoryError");
747               closeCurrentConnection(key, e);
748               cleanupConnections(true);
749               return;
750             }
751           } else {
752             // we can run out of memory if we have too many threads
753             // log the event and sleep for a minute and give
754             // some thread(s) a chance to finish
755             LOG.warn(getName() + ": OutOfMemoryError in server select", e);
756             closeCurrentConnection(key, e);
757             cleanupConnections(true);
758             try {
759               Thread.sleep(60000);
760             } catch (InterruptedException ex) {
761               LOG.debug("Interrupted while sleeping");
762               return;
763             }
764           }
765         } catch (Exception e) {
766           closeCurrentConnection(key, e);
767         }
768         cleanupConnections(false);
769       }
770 
771       LOG.info(getName() + ": stopping");
772 
773       synchronized (this) {
774         try {
775           acceptChannel.close();
776           selector.close();
777         } catch (IOException ignored) {
778           if (LOG.isTraceEnabled()) LOG.trace("ignored", ignored);
779         }
780 
781         selector= null;
782         acceptChannel= null;
783 
784         // clean up all connections
785         while (!connectionList.isEmpty()) {
786           closeConnection(connectionList.remove(0));
787         }
788       }
789     }
790 
791     private void closeCurrentConnection(SelectionKey key, Throwable e) {
792       if (key != null) {
793         Connection c = (Connection)key.attachment();
794         if (c != null) {
795           if (LOG.isDebugEnabled()) {
796             LOG.debug(getName() + ": disconnecting client " + c.getHostAddress() +
797                 (e != null ? " on error " + e.getMessage() : ""));
798           }
799           closeConnection(c);
800           key.attach(null);
801         }
802       }
803     }
804 
805     InetSocketAddress getAddress() {
806       return (InetSocketAddress)acceptChannel.socket().getLocalSocketAddress();
807     }
808 
809     void doAccept(SelectionKey key) throws IOException, OutOfMemoryError {
810       Connection c;
811       ServerSocketChannel server = (ServerSocketChannel) key.channel();
812 
813       SocketChannel channel;
814       while ((channel = server.accept()) != null) {
815         try {
816           channel.configureBlocking(false);
817           channel.socket().setTcpNoDelay(tcpNoDelay);
818           channel.socket().setKeepAlive(tcpKeepAlive);
819         } catch (IOException ioe) {
820           channel.close();
821           throw ioe;
822         }
823 
824         Reader reader = getReader();
825         try {
826           reader.startAdd();
827           SelectionKey readKey = reader.registerChannel(channel);
828           c = getConnection(channel, System.currentTimeMillis());
829           readKey.attach(c);
830           synchronized (connectionList) {
831             connectionList.add(numConnections, c);
832             numConnections++;
833           }
834           if (LOG.isDebugEnabled())
835             LOG.debug(getName() + ": connection from " + c.toString() +
836                 "; # active connections: " + numConnections);
837         } finally {
838           reader.finishAdd();
839         }
840       }
841     }
842 
843     void doRead(SelectionKey key) throws InterruptedException {
844       int count;
845       Connection c = (Connection) key.attachment();
846       if (c == null) {
847         return;
848       }
849       c.setLastContact(System.currentTimeMillis());
850       try {
851         count = c.readAndProcess();
852 
853         if (count > 0) {
854           c.setLastContact(System.currentTimeMillis());
855         }
856 
857       } catch (InterruptedException ieo) {
858         throw ieo;
859       } catch (Exception e) {
860         if (LOG.isDebugEnabled()) {
861           LOG.debug(getName() + ": Caught exception while reading:" + e.getMessage());
862         }
863         count = -1; //so that the (count < 0) block is executed
864       }
865       if (count < 0) {
866         if (LOG.isDebugEnabled()) {
867           LOG.debug(getName() + ": DISCONNECTING client " + c.toString() +
868               " because read count=" + count +
869               ". Number of active connections: " + numConnections);
870         }
871         closeConnection(c);
872       }
873     }
874 
875     synchronized void doStop() {
876       if (selector != null) {
877         selector.wakeup();
878         Thread.yield();
879       }
880       if (acceptChannel != null) {
881         try {
882           acceptChannel.socket().close();
883         } catch (IOException e) {
884           LOG.info(getName() + ": exception in closing listener socket. " + e);
885         }
886       }
887       readPool.shutdownNow();
888     }
889 
890     // The method that will return the next reader to work with
891     // Simplistic implementation of round robin for now
892     Reader getReader() {
893       currentReader = (currentReader + 1) % readers.length;
894       return readers[currentReader];
895     }
896   }
897 
898   // Sends responses of RPC back to clients.
899   protected class Responder extends Thread {
900     private final Selector writeSelector;
901     private final Set<Connection> writingCons =
902         Collections.newSetFromMap(new ConcurrentHashMap<Connection, Boolean>());
903 
904     Responder() throws IOException {
905       this.setName("RpcServer.responder");
906       this.setDaemon(true);
907       writeSelector = Selector.open(); // create a selector
908     }
909 
910     @Override
911     public void run() {
912       LOG.info(getName() + ": starting");
913       try {
914         doRunLoop();
915       } finally {
916         LOG.info(getName() + ": stopping");
917         try {
918           writeSelector.close();
919         } catch (IOException ioe) {
920           LOG.error(getName() + ": couldn't close write selector", ioe);
921         }
922       }
923     }
924 
925     /**
926      * Take the list of the connections that want to write, and register them
927      * in the selector.
928      */
929     private void registerWrites() {
930       Iterator<Connection> it = writingCons.iterator();
931       while (it.hasNext()) {
932         Connection c = it.next();
933         it.remove();
934         SelectionKey sk = c.channel.keyFor(writeSelector);
935         try {
936           if (sk == null) {
937             try {
938               c.channel.register(writeSelector, SelectionKey.OP_WRITE, c);
939             } catch (ClosedChannelException e) {
940               // ignore: the client went away.
941               if (LOG.isTraceEnabled()) LOG.trace("ignored", e);
942             }
943           } else {
944             sk.interestOps(SelectionKey.OP_WRITE);
945           }
946         } catch (CancelledKeyException e) {
947           // ignore: the client went away.
948           if (LOG.isTraceEnabled()) LOG.trace("ignored", e);
949         }
950       }
951     }
952 
953     /**
954      * Add a connection to the list that want to write,
955      */
956     public void registerForWrite(Connection c) {
957       if (writingCons.add(c)) {
958         writeSelector.wakeup();
959       }
960     }
961 
962     private void doRunLoop() {
963       long lastPurgeTime = 0;   // last check for old calls.
964       while (running) {
965         try {
966           registerWrites();
967           int keyCt = writeSelector.select(purgeTimeout);
968           if (keyCt == 0) {
969             continue;
970           }
971 
972           Set<SelectionKey> keys = writeSelector.selectedKeys();
973           Iterator<SelectionKey> iter = keys.iterator();
974           while (iter.hasNext()) {
975             SelectionKey key = iter.next();
976             iter.remove();
977             try {
978               if (key.isValid() && key.isWritable()) {
979                 doAsyncWrite(key);
980               }
981             } catch (IOException e) {
982               LOG.debug(getName() + ": asyncWrite", e);
983             }
984           }
985 
986           lastPurgeTime = purge(lastPurgeTime);
987 
988         } catch (OutOfMemoryError e) {
989           if (errorHandler != null) {
990             if (errorHandler.checkOOME(e)) {
991               LOG.info(getName() + ": exiting on OutOfMemoryError");
992               return;
993             }
994           } else {
995             //
996             // we can run out of memory if we have too many threads
997             // log the event and sleep for a minute and give
998             // some thread(s) a chance to finish
999             //
1000             LOG.warn(getName() + ": OutOfMemoryError in server select", e);
1001             try {
1002               Thread.sleep(60000);
1003             } catch (InterruptedException ex) {
1004               LOG.debug("Interrupted while sleeping");
1005               return;
1006             }
1007           }
1008         } catch (Exception e) {
1009           LOG.warn(getName() + ": exception in Responder " +
1010               StringUtils.stringifyException(e), e);
1011         }
1012       }
1013       LOG.info(getName() + ": stopped");
1014     }
1015 
1016     /**
1017      * If there were some calls that have not been sent out for a
1018      * long time, we close the connection.
1019      * @return the time of the purge.
1020      */
1021     private long purge(long lastPurgeTime) {
1022       long now = System.currentTimeMillis();
1023       if (now < lastPurgeTime + purgeTimeout) {
1024         return lastPurgeTime;
1025       }
1026 
1027       ArrayList<Connection> conWithOldCalls = new ArrayList<Connection>();
1028       // get the list of channels from list of keys.
1029       synchronized (writeSelector.keys()) {
1030         for (SelectionKey key : writeSelector.keys()) {
1031           Connection connection = (Connection) key.attachment();
1032           if (connection == null) {
1033             throw new IllegalStateException("Coding error: SelectionKey key without attachment.");
1034           }
1035           Call call = connection.responseQueue.peekFirst();
1036           if (call != null && now > call.timestamp + purgeTimeout) {
1037             conWithOldCalls.add(call.connection);
1038           }
1039         }
1040       }
1041 
1042       // Seems safer to close the connection outside of the synchronized loop...
1043       for (Connection connection : conWithOldCalls) {
1044         closeConnection(connection);
1045       }
1046 
1047       return now;
1048     }
1049 
1050     private void doAsyncWrite(SelectionKey key) throws IOException {
1051       Connection connection = (Connection) key.attachment();
1052       if (connection == null) {
1053         throw new IOException("doAsyncWrite: no connection");
1054       }
1055       if (key.channel() != connection.channel) {
1056         throw new IOException("doAsyncWrite: bad channel");
1057       }
1058 
1059       if (processAllResponses(connection)) {
1060         try {
1061           // We wrote everything, so we don't need to be told when the socket is ready for
1062           //  write anymore.
1063          key.interestOps(0);
1064         } catch (CancelledKeyException e) {
1065           /* The Listener/reader might have closed the socket.
1066            * We don't explicitly cancel the key, so not sure if this will
1067            * ever fire.
1068            * This warning could be removed.
1069            */
1070           LOG.warn("Exception while changing ops : " + e);
1071         }
1072       }
1073     }
1074 
1075     /**
1076      * Process the response for this call. You need to have the lock on
1077      * {@link org.apache.hadoop.hbase.ipc.RpcServer.Connection#responseWriteLock}
1078      *
1079      * @param call the call
1080      * @return true if we proceed the call fully, false otherwise.
1081      * @throws IOException
1082      */
1083     private boolean processResponse(final Call call) throws IOException {
1084       boolean error = true;
1085       try {
1086         // Send as much data as we can in the non-blocking fashion
1087         long numBytes = channelWrite(call.connection.channel, call.response);
1088         if (numBytes < 0) {
1089           throw new HBaseIOException("Error writing on the socket " +
1090             "for the call:" + call.toShortString());
1091         }
1092         error = false;
1093       } finally {
1094         if (error) {
1095           LOG.debug(getName() + call.toShortString() + ": output error -- closing");
1096           closeConnection(call.connection);
1097         }
1098       }
1099 
1100       if (!call.response.hasRemaining()) {
1101         call.done();
1102         return true;
1103       } else {
1104         return false; // Socket can't take more, we will have to come back.
1105       }
1106     }
1107 
1108     /**
1109      * Process all the responses for this connection
1110      *
1111      * @return true if all the calls were processed or that someone else is doing it.
1112      * false if there * is still some work to do. In this case, we expect the caller to
1113      * delay us.
1114      * @throws IOException
1115      */
1116     private boolean processAllResponses(final Connection connection) throws IOException {
1117       // We want only one writer on the channel for a connection at a time.
1118       connection.responseWriteLock.lock();
1119       try {
1120         for (int i = 0; i < 20; i++) {
1121           // protection if some handlers manage to need all the responder
1122           Call call = connection.responseQueue.pollFirst();
1123           if (call == null) {
1124             return true;
1125           }
1126           if (!processResponse(call)) {
1127             connection.responseQueue.addFirst(call);
1128             return false;
1129           }
1130         }
1131       } finally {
1132         connection.responseWriteLock.unlock();
1133       }
1134 
1135       return connection.responseQueue.isEmpty();
1136     }
1137 
1138     //
1139     // Enqueue a response from the application.
1140     //
1141     void doRespond(Call call) throws IOException {
1142       boolean added = false;
1143 
1144       // If there is already a write in progress, we don't wait. This allows to free the handlers
1145       //  immediately for other tasks.
1146       if (call.connection.responseQueue.isEmpty() && call.connection.responseWriteLock.tryLock()) {
1147         try {
1148           if (call.connection.responseQueue.isEmpty()) {
1149             // If we're alone, we can try to do a direct call to the socket. It's
1150             //  an optimisation to save on context switches and data transfer between cores..
1151             if (processResponse(call)) {
1152               return; // we're done.
1153             }
1154             // Too big to fit, putting ahead.
1155             call.connection.responseQueue.addFirst(call);
1156             added = true; // We will register to the selector later, outside of the lock.
1157           }
1158         } finally {
1159           call.connection.responseWriteLock.unlock();
1160         }
1161       }
1162 
1163       if (!added) {
1164         call.connection.responseQueue.addLast(call);
1165       }
1166       call.responder.registerForWrite(call.connection);
1167 
1168       // set the serve time when the response has to be sent later
1169       call.timestamp = System.currentTimeMillis();
1170     }
1171   }
1172 
1173   /** Reads calls from a connection and queues them for handling. */
1174   @edu.umd.cs.findbugs.annotations.SuppressWarnings(
1175       value="VO_VOLATILE_INCREMENT",
1176       justification="False positive according to http://sourceforge.net/p/findbugs/bugs/1032/")
1177   public class Connection {
1178     // If initial preamble with version and magic has been read or not.
1179     private boolean connectionPreambleRead = false;
1180     // If the connection header has been read or not.
1181     private boolean connectionHeaderRead = false;
1182     protected SocketChannel channel;
1183     private ByteBuffer data;
1184     private ByteBuffer dataLengthBuffer;
1185     protected final ConcurrentLinkedDeque<Call> responseQueue = new ConcurrentLinkedDeque<Call>();
1186     private final Lock responseWriteLock = new ReentrantLock();
1187     private Counter rpcCount = new Counter(); // number of outstanding rpcs
1188     private long lastContact;
1189     private InetAddress addr;
1190     protected Socket socket;
1191     // Cache the remote host & port info so that even if the socket is
1192     // disconnected, we can say where it used to connect to.
1193     protected String hostAddress;
1194     protected int remotePort;
1195     ConnectionHeader connectionHeader;
    /**
     * Codec the client asked us to use.
     */
1199     private Codec codec;
    /**
     * Compression codec the client asked us to use.
     */
1203     private CompressionCodec compressionCodec;
1204     BlockingService service;
1205 
1206     private AuthMethod authMethod;
1207     private boolean saslContextEstablished;
1208     private boolean skipInitialSaslHandshake;
1209     private ByteBuffer unwrappedData;
1210     // When is this set?  FindBugs wants to know!  Says NP
1211     private ByteBuffer unwrappedDataLengthBuffer = ByteBuffer.allocate(4);
1212     boolean useSasl;
1213     SaslServer saslServer;
1214     private boolean useWrap = false;
1215     // Fake 'call' for failed authorization response
1216     private static final int AUTHORIZATION_FAILED_CALLID = -1;
1217     private final Call authFailedCall = new Call(AUTHORIZATION_FAILED_CALLID, null, null, null,
1218         null, null, this, null, 0, null, null);
1219     private ByteArrayOutputStream authFailedResponse =
1220         new ByteArrayOutputStream();
1221     // Fake 'call' for SASL context setup
1222     private static final int SASL_CALLID = -33;
1223     private final Call saslCall = new Call(SASL_CALLID, null, null, null, null, null, this, null,
1224         0, null, null);
1225 
1226     // was authentication allowed with a fallback to simple auth
1227     private boolean authenticatedWithFallback;
1228 
1229     private boolean retryImmediatelySupported = false;
1230 
1231     public UserGroupInformation attemptingUser = null; // user name before auth
1232     protected User user = null;
1233     protected UserGroupInformation ugi = null;
1234 
1235     public Connection(SocketChannel channel, long lastContact) {
1236       this.channel = channel;
1237       this.lastContact = lastContact;
1238       this.data = null;
1239       this.dataLengthBuffer = ByteBuffer.allocate(4);
1240       this.socket = channel.socket();
1241       this.addr = socket.getInetAddress();
1242       if (addr == null) {
1243         this.hostAddress = "*Unknown*";
1244       } else {
1245         this.hostAddress = addr.getHostAddress();
1246       }
1247       this.remotePort = socket.getPort();
1248       if (socketSendBufferSize != 0) {
1249         try {
1250           socket.setSendBufferSize(socketSendBufferSize);
1251         } catch (IOException e) {
1252           LOG.warn("Connection: unable to set socket send buffer size to " +
1253                    socketSendBufferSize);
1254         }
1255       }
1256     }
1257 
1258       @Override
1259     public String toString() {
1260       return getHostAddress() + ":" + remotePort;
1261     }
1262 
1263     public String getHostAddress() {
1264       return hostAddress;
1265     }
1266 
1267     public InetAddress getHostInetAddress() {
1268       return addr;
1269     }
1270 
1271     public int getRemotePort() {
1272       return remotePort;
1273     }
1274 
1275     public void setLastContact(long lastContact) {
1276       this.lastContact = lastContact;
1277     }
1278 
1279     public VersionInfo getVersionInfo() {
1280       if (connectionHeader.hasVersionInfo()) {
1281         return connectionHeader.getVersionInfo();
1282       }
1283       return null;
1284     }
1285 
1286     /* Return true if the connection has no outstanding rpc */
1287     private boolean isIdle() {
1288       return rpcCount.get() == 0;
1289     }
1290 
1291     /* Decrement the outstanding RPC count */
1292     protected void decRpcCount() {
1293       rpcCount.decrement();
1294     }
1295 
1296     /* Increment the outstanding RPC count */
1297     protected void incRpcCount() {
1298       rpcCount.increment();
1299     }
1300 
1301     protected boolean timedOut(long currentTime) {
1302       return isIdle() && currentTime - lastContact > maxIdleTime;
1303     }
1304 
1305     private UserGroupInformation getAuthorizedUgi(String authorizedId)
1306         throws IOException {
1307       UserGroupInformation authorizedUgi;
1308       if (authMethod == AuthMethod.DIGEST) {
1309         TokenIdentifier tokenId = HBaseSaslRpcServer.getIdentifier(authorizedId,
1310             secretManager);
1311         authorizedUgi = tokenId.getUser();
1312         if (authorizedUgi == null) {
1313           throw new AccessDeniedException(
1314               "Can't retrieve username from tokenIdentifier.");
1315         }
1316         authorizedUgi.addTokenIdentifier(tokenId);
1317       } else {
1318         authorizedUgi = UserGroupInformation.createRemoteUser(authorizedId);
1319       }
1320       authorizedUgi.setAuthenticationMethod(authMethod.authenticationMethod.getAuthMethod());
1321       return authorizedUgi;
1322     }
1323 
1324     private void saslReadAndProcess(byte[] saslToken) throws IOException,
1325         InterruptedException {
1326       if (saslContextEstablished) {
1327         if (LOG.isTraceEnabled())
1328           LOG.trace("Have read input token of size " + saslToken.length
1329               + " for processing by saslServer.unwrap()");
1330 
1331         if (!useWrap) {
1332           processOneRpc(saslToken);
1333         } else {
1334           byte [] plaintextData = saslServer.unwrap(saslToken, 0, saslToken.length);
1335           processUnwrappedData(plaintextData);
1336         }
1337       } else {
1338         byte[] replyToken;
1339         try {
1340           if (saslServer == null) {
1341             switch (authMethod) {
1342             case DIGEST:
1343               if (secretManager == null) {
1344                 throw new AccessDeniedException(
1345                     "Server is not configured to do DIGEST authentication.");
1346               }
1347               saslServer = Sasl.createSaslServer(AuthMethod.DIGEST
1348                   .getMechanismName(), null, SaslUtil.SASL_DEFAULT_REALM,
1349                   SaslUtil.SASL_PROPS, new SaslDigestCallbackHandler(
1350                       secretManager, this));
1351               break;
1352             default:
1353               UserGroupInformation current = UserGroupInformation.getCurrentUser();
1354               String fullName = current.getUserName();
1355               if (LOG.isDebugEnabled()) {
1356                 LOG.debug("Kerberos principal name is " + fullName);
1357               }
1358               final String names[] = SaslUtil.splitKerberosName(fullName);
1359               if (names.length != 3) {
1360                 throw new AccessDeniedException(
1361                     "Kerberos principal name does NOT have the expected "
1362                         + "hostname part: " + fullName);
1363               }
1364               current.doAs(new PrivilegedExceptionAction<Object>() {
1365                 @Override
1366                 public Object run() throws SaslException {
1367                   saslServer = Sasl.createSaslServer(AuthMethod.KERBEROS
1368                       .getMechanismName(), names[0], names[1],
1369                       SaslUtil.SASL_PROPS, new SaslGssCallbackHandler());
1370                   return null;
1371                 }
1372               });
1373             }
1374             if (saslServer == null)
1375               throw new AccessDeniedException(
1376                   "Unable to find SASL server implementation for "
1377                       + authMethod.getMechanismName());
1378             if (LOG.isDebugEnabled()) {
1379               LOG.debug("Created SASL server with mechanism = " + authMethod.getMechanismName());
1380             }
1381           }
1382           if (LOG.isDebugEnabled()) {
1383             LOG.debug("Have read input token of size " + saslToken.length
1384                 + " for processing by saslServer.evaluateResponse()");
1385           }
1386           replyToken = saslServer.evaluateResponse(saslToken);
1387         } catch (IOException e) {
1388           IOException sendToClient = e;
1389           Throwable cause = e;
1390           while (cause != null) {
1391             if (cause instanceof InvalidToken) {
1392               sendToClient = (InvalidToken) cause;
1393               break;
1394             }
1395             cause = cause.getCause();
1396           }
1397           doRawSaslReply(SaslStatus.ERROR, null, sendToClient.getClass().getName(),
1398             sendToClient.getLocalizedMessage());
1399           metrics.authenticationFailure();
1400           String clientIP = this.toString();
1401           // attempting user could be null
1402           AUDITLOG.warn(AUTH_FAILED_FOR + clientIP + ":" + attemptingUser);
1403           throw e;
1404         }
1405         if (replyToken != null) {
1406           if (LOG.isDebugEnabled()) {
1407             LOG.debug("Will send token of size " + replyToken.length
1408                 + " from saslServer.");
1409           }
1410           doRawSaslReply(SaslStatus.SUCCESS, new BytesWritable(replyToken), null,
1411               null);
1412         }
1413         if (saslServer.isComplete()) {
1414           String qop = (String) saslServer.getNegotiatedProperty(Sasl.QOP);
1415           useWrap = qop != null && !"auth".equalsIgnoreCase(qop);
1416           ugi = getAuthorizedUgi(saslServer.getAuthorizationID());
1417           if (LOG.isDebugEnabled()) {
1418             LOG.debug("SASL server context established. Authenticated client: "
1419               + ugi + ". Negotiated QoP is "
1420               + saslServer.getNegotiatedProperty(Sasl.QOP));
1421           }
1422           metrics.authenticationSuccess();
1423           AUDITLOG.info(AUTH_SUCCESSFUL_FOR + ugi);
1424           saslContextEstablished = true;
1425         }
1426       }
1427     }
1428 
1429     /**
1430      * No protobuf encoding of raw sasl messages
1431      */
1432     private void doRawSaslReply(SaslStatus status, Writable rv,
1433         String errorClass, String error) throws IOException {
1434       ByteBufferOutputStream saslResponse = null;
1435       DataOutputStream out = null;
1436       try {
1437         // In my testing, have noticed that sasl messages are usually
1438         // in the ballpark of 100-200. That's why the initial capacity is 256.
1439         saslResponse = new ByteBufferOutputStream(256);
1440         out = new DataOutputStream(saslResponse);
1441         out.writeInt(status.state); // write status
1442         if (status == SaslStatus.SUCCESS) {
1443           rv.write(out);
1444         } else {
1445           WritableUtils.writeString(out, errorClass);
1446           WritableUtils.writeString(out, error);
1447         }
1448         saslCall.setSaslTokenResponse(saslResponse.getByteBuffer());
1449         saslCall.responder = responder;
1450         saslCall.sendResponseIfReady();
1451       } finally {
1452         if (saslResponse != null) {
1453           saslResponse.close();
1454         }
1455         if (out != null) {
1456           out.close();
1457         }
1458       }
1459     }
1460 
1461     private void disposeSasl() {
1462       if (saslServer != null) {
1463         try {
1464           saslServer.dispose();
1465           saslServer = null;
1466         } catch (SaslException ignored) {
1467           // Ignored. This is being disposed of anyway.
1468         }
1469       }
1470     }
1471 
1472     private int readPreamble() throws IOException {
1473       int count;
1474       // Check for 'HBas' magic.
1475       this.dataLengthBuffer.flip();
1476       if (!Arrays.equals(HConstants.RPC_HEADER, dataLengthBuffer.array())) {
1477         return doBadPreambleHandling("Expected HEADER=" +
1478             Bytes.toStringBinary(HConstants.RPC_HEADER) +
1479             " but received HEADER=" + Bytes.toStringBinary(dataLengthBuffer.array()) +
1480             " from " + toString());
1481       }
1482       // Now read the next two bytes, the version and the auth to use.
1483       ByteBuffer versionAndAuthBytes = ByteBuffer.allocate(2);
1484       count = channelRead(channel, versionAndAuthBytes);
1485       if (count < 0 || versionAndAuthBytes.remaining() > 0) {
1486         return count;
1487       }
1488       int version = versionAndAuthBytes.get(0);
1489       byte authbyte = versionAndAuthBytes.get(1);
1490       this.authMethod = AuthMethod.valueOf(authbyte);
1491       if (version != CURRENT_VERSION) {
1492         String msg = getFatalConnectionString(version, authbyte);
1493         return doBadPreambleHandling(msg, new WrongVersionException(msg));
1494       }
1495       if (authMethod == null) {
1496         String msg = getFatalConnectionString(version, authbyte);
1497         return doBadPreambleHandling(msg, new BadAuthException(msg));
1498       }
1499       if (isSecurityEnabled && authMethod == AuthMethod.SIMPLE) {
1500         if (allowFallbackToSimpleAuth) {
1501           metrics.authenticationFallback();
1502           authenticatedWithFallback = true;
1503         } else {
1504           AccessDeniedException ae = new AccessDeniedException("Authentication is required");
1505           setupResponse(authFailedResponse, authFailedCall, ae, ae.getMessage());
1506           responder.doRespond(authFailedCall);
1507           throw ae;
1508         }
1509       }
1510       if (!isSecurityEnabled && authMethod != AuthMethod.SIMPLE) {
1511         doRawSaslReply(SaslStatus.SUCCESS, new IntWritable(
1512             SaslUtil.SWITCH_TO_SIMPLE_AUTH), null, null);
1513         authMethod = AuthMethod.SIMPLE;
1514         // client has already sent the initial Sasl message and we
1515         // should ignore it. Both client and server should fall back
1516         // to simple auth from now on.
1517         skipInitialSaslHandshake = true;
1518       }
1519       if (authMethod != AuthMethod.SIMPLE) {
1520         useSasl = true;
1521       }
1522 
1523       dataLengthBuffer.clear();
1524       connectionPreambleRead = true;
1525       return count;
1526     }
1527 
1528     private int read4Bytes() throws IOException {
1529       if (this.dataLengthBuffer.remaining() > 0) {
1530         return channelRead(channel, this.dataLengthBuffer);
1531       } else {
1532         return 0;
1533       }
1534     }
1535 
1536 
1537     /**
1538      * Read off the wire. If there is not enough data to read, update the connection state with
1539      *  what we have and returns.
1540      * @return Returns -1 if failure (and caller will close connection), else zero or more.
1541      * @throws IOException
1542      * @throws InterruptedException
1543      */
1544     public int readAndProcess() throws IOException, InterruptedException {
1545       // Try and read in an int.  If new connection, the int will hold the 'HBas' HEADER.  If it
1546       // does, read in the rest of the connection preamble, the version and the auth method.
1547       // Else it will be length of the data to read (or -1 if a ping).  We catch the integer
1548       // length into the 4-byte this.dataLengthBuffer.
1549       int count = read4Bytes();
1550       if (count < 0 || dataLengthBuffer.remaining() > 0) {
1551         return count;
1552       }
1553 
1554       // If we have not read the connection setup preamble, look to see if that is on the wire.
1555       if (!connectionPreambleRead) {
1556         count = readPreamble();
1557         if (!connectionPreambleRead) {
1558           return count;
1559         }
1560 
1561         count = read4Bytes();
1562         if (count < 0 || dataLengthBuffer.remaining() > 0) {
1563           return count;
1564         }
1565       }
1566 
1567       // We have read a length and we have read the preamble.  It is either the connection header
1568       // or it is a request.
1569       if (data == null) {
1570         dataLengthBuffer.flip();
1571         int dataLength = dataLengthBuffer.getInt();
1572         if (dataLength == RpcClient.PING_CALL_ID) {
1573           if (!useWrap) { //covers the !useSasl too
1574             dataLengthBuffer.clear();
1575             return 0;  //ping message
1576           }
1577         }
1578         if (dataLength < 0) { // A data length of zero is legal.
1579           throw new IllegalArgumentException("Unexpected data length "
1580               + dataLength + "!! from " + getHostAddress());
1581         }
1582         data = ByteBuffer.allocate(dataLength);
1583 
1584         // Increment the rpc count. This counter will be decreased when we write
1585         //  the response.  If we want the connection to be detected as idle properly, we
1586         //  need to keep the inc / dec correct.
1587         incRpcCount();
1588       }
1589 
1590       count = channelRead(channel, data);
1591 
1592       if (count >= 0 && data.remaining() == 0) { // count==0 if dataLength == 0
1593         process();
1594       }
1595 
1596       return count;
1597     }
1598 
1599     /**
1600      * Process the data buffer and clean the connection state for the next call.
1601      */
1602     private void process() throws IOException, InterruptedException {
1603       data.flip();
1604       try {
1605         if (skipInitialSaslHandshake) {
1606           skipInitialSaslHandshake = false;
1607           return;
1608         }
1609 
1610         if (useSasl) {
1611           saslReadAndProcess(data.array());
1612         } else {
1613           processOneRpc(data.array());
1614         }
1615 
1616       } finally {
1617         dataLengthBuffer.clear(); // Clean for the next call
1618         data = null; // For the GC
1619       }
1620     }
1621 
1622     private String getFatalConnectionString(final int version, final byte authByte) {
1623       return "serverVersion=" + CURRENT_VERSION +
1624       ", clientVersion=" + version + ", authMethod=" + authByte +
1625       ", authSupported=" + (authMethod != null) + " from " + toString();
1626     }
1627 
1628     private int doBadPreambleHandling(final String msg) throws IOException {
1629       return doBadPreambleHandling(msg, new FatalConnectionException(msg));
1630     }
1631 
1632     private int doBadPreambleHandling(final String msg, final Exception e) throws IOException {
1633       LOG.warn(msg);
1634       Call fakeCall = new Call(-1, null, null, null, null, null, this, responder, -1, null, null);
1635       setupResponse(null, fakeCall, e, msg);
1636       responder.doRespond(fakeCall);
1637       // Returning -1 closes out the connection.
1638       return -1;
1639     }
1640 
1641     // Reads the connection header following version
1642     private void processConnectionHeader(byte[] buf) throws IOException {
1643       this.connectionHeader = ConnectionHeader.parseFrom(buf);
1644       String serviceName = connectionHeader.getServiceName();
1645       if (serviceName == null) throw new EmptyServiceNameException();
1646       this.service = getService(services, serviceName);
1647       if (this.service == null) throw new UnknownServiceException(serviceName);
1648       setupCellBlockCodecs(this.connectionHeader);
1649       UserGroupInformation protocolUser = createUser(connectionHeader);
1650       if (!useSasl) {
1651         ugi = protocolUser;
1652         if (ugi != null) {
1653           ugi.setAuthenticationMethod(AuthMethod.SIMPLE.authenticationMethod);
1654         }
1655         // audit logging for SASL authenticated users happens in saslReadAndProcess()
1656         if (authenticatedWithFallback) {
1657           LOG.warn("Allowed fallback to SIMPLE auth for " + ugi
1658               + " connecting from " + getHostAddress());
1659         }
1660         AUDITLOG.info(AUTH_SUCCESSFUL_FOR + ugi);
1661       } else {
1662         // user is authenticated
1663         ugi.setAuthenticationMethod(authMethod.authenticationMethod);
1664         //Now we check if this is a proxy user case. If the protocol user is
1665         //different from the 'user', it is a proxy user scenario. However,
1666         //this is not allowed if user authenticated with DIGEST.
1667         if ((protocolUser != null)
1668             && (!protocolUser.getUserName().equals(ugi.getUserName()))) {
1669           if (authMethod == AuthMethod.DIGEST) {
1670             // Not allowed to doAs if token authentication is used
1671             throw new AccessDeniedException("Authenticated user (" + ugi
1672                 + ") doesn't match what the client claims to be ("
1673                 + protocolUser + ")");
1674           } else {
1675             // Effective user can be different from authenticated user
1676             // for simple auth or kerberos auth
1677             // The user is the real user. Now we create a proxy user
1678             UserGroupInformation realUser = ugi;
1679             ugi = UserGroupInformation.createProxyUser(protocolUser
1680                 .getUserName(), realUser);
1681             // Now the user is a proxy user, set Authentication method Proxy.
1682             ugi.setAuthenticationMethod(AuthenticationMethod.PROXY);
1683           }
1684         }
1685       }
1686       if (connectionHeader.hasVersionInfo()) {
1687         // see if this connection will support RetryImmediatelyException
1688         retryImmediatelySupported = VersionInfoUtil.hasMinimumVersion(getVersionInfo(), 1, 2);
1689 
1690         AUDITLOG.info("Connection from " + this.hostAddress + " port: " + this.remotePort
1691             + " with version info: "
1692             + TextFormat.shortDebugString(connectionHeader.getVersionInfo()));
1693       } else {
1694         AUDITLOG.info("Connection from " + this.hostAddress + " port: " + this.remotePort
1695             + " with unknown version info");
1696       }
1697 
1698 
1699     }
1700 
1701     /**
1702      * Set up cell block codecs
1703      * @throws FatalConnectionException
1704      */
1705     private void setupCellBlockCodecs(final ConnectionHeader header)
1706     throws FatalConnectionException {
1707       // TODO: Plug in other supported decoders.
1708       if (!header.hasCellBlockCodecClass()) return;
1709       String className = header.getCellBlockCodecClass();
1710       if (className == null || className.length() == 0) return;
1711       try {
1712         this.codec = (Codec)Class.forName(className).newInstance();
1713       } catch (Exception e) {
1714         throw new UnsupportedCellCodecException(className, e);
1715       }
1716       if (!header.hasCellBlockCompressorClass()) return;
1717       className = header.getCellBlockCompressorClass();
1718       try {
1719         this.compressionCodec = (CompressionCodec)Class.forName(className).newInstance();
1720       } catch (Exception e) {
1721         throw new UnsupportedCompressionCodecException(className, e);
1722       }
1723     }
1724 
1725     private void processUnwrappedData(byte[] inBuf) throws IOException,
1726     InterruptedException {
1727       ReadableByteChannel ch = Channels.newChannel(new ByteArrayInputStream(inBuf));
1728       // Read all RPCs contained in the inBuf, even partial ones
1729       while (true) {
1730         int count;
1731         if (unwrappedDataLengthBuffer.remaining() > 0) {
1732           count = channelRead(ch, unwrappedDataLengthBuffer);
1733           if (count <= 0 || unwrappedDataLengthBuffer.remaining() > 0)
1734             return;
1735         }
1736 
1737         if (unwrappedData == null) {
1738           unwrappedDataLengthBuffer.flip();
1739           int unwrappedDataLength = unwrappedDataLengthBuffer.getInt();
1740 
1741           if (unwrappedDataLength == RpcClient.PING_CALL_ID) {
1742             if (LOG.isDebugEnabled())
1743               LOG.debug("Received ping message");
1744             unwrappedDataLengthBuffer.clear();
1745             continue; // ping message
1746           }
1747           unwrappedData = ByteBuffer.allocate(unwrappedDataLength);
1748         }
1749 
1750         count = channelRead(ch, unwrappedData);
1751         if (count <= 0 || unwrappedData.remaining() > 0)
1752           return;
1753 
1754         if (unwrappedData.remaining() == 0) {
1755           unwrappedDataLengthBuffer.clear();
1756           unwrappedData.flip();
1757           processOneRpc(unwrappedData.array());
1758           unwrappedData = null;
1759         }
1760       }
1761     }
1762 
1763     private void processOneRpc(byte[] buf) throws IOException, InterruptedException {
1764       if (connectionHeaderRead) {
1765         processRequest(buf);
1766       } else {
1767         processConnectionHeader(buf);
1768         this.connectionHeaderRead = true;
1769         if (!authorizeConnection()) {
1770           // Throw FatalConnectionException wrapping ACE so client does right thing and closes
1771           // down the connection instead of trying to read non-existent retun.
1772           throw new AccessDeniedException("Connection from " + this + " for service " +
1773             connectionHeader.getServiceName() + " is unauthorized for user: " + ugi);
1774         }
1775         this.user = userProvider.create(this.ugi);
1776       }
1777     }
1778 
1779     /**
1780      * @param buf Has the request header and the request param and optionally encoded data buffer
1781      * all in this one array.
1782      * @throws IOException
1783      * @throws InterruptedException
1784      */
1785     protected void processRequest(byte[] buf) throws IOException, InterruptedException {
1786       long totalRequestSize = buf.length;
1787       int offset = 0;
1788       // Here we read in the header.  We avoid having pb
1789       // do its default 4k allocation for CodedInputStream.  We force it to use backing array.
1790       CodedInputStream cis = CodedInputStream.newInstance(buf, offset, buf.length);
1791       int headerSize = cis.readRawVarint32();
1792       offset = cis.getTotalBytesRead();
1793       Message.Builder builder = RequestHeader.newBuilder();
1794       ProtobufUtil.mergeFrom(builder, buf, offset, headerSize);
1795       RequestHeader header = (RequestHeader) builder.build();
1796       offset += headerSize;
1797       int id = header.getCallId();
1798       if (LOG.isTraceEnabled()) {
1799         LOG.trace("RequestHeader " + TextFormat.shortDebugString(header) +
1800           " totalRequestSize: " + totalRequestSize + " bytes");
1801       }
1802       // Enforcing the call queue size, this triggers a retry in the client
1803       // This is a bit late to be doing this check - we have already read in the total request.
1804       if ((totalRequestSize + callQueueSize.get()) > maxQueueSize) {
1805         final Call callTooBig =
1806           new Call(id, this.service, null, null, null, null, this,
1807             responder, totalRequestSize, null, null);
1808         ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
1809         metrics.exception(CALL_QUEUE_TOO_BIG_EXCEPTION);
1810         InetSocketAddress address = getListenerAddress();
1811         setupResponse(responseBuffer, callTooBig, CALL_QUEUE_TOO_BIG_EXCEPTION,
1812             "Call queue is full on " + (address != null ? address : "(channel closed)") +
1813                 ", is hbase.ipc.server.max.callqueue.size too small?");
1814         responder.doRespond(callTooBig);
1815         return;
1816       }
1817       MethodDescriptor md = null;
1818       Message param = null;
1819       CellScanner cellScanner = null;
1820       try {
1821         if (header.hasRequestParam() && header.getRequestParam()) {
1822           md = this.service.getDescriptorForType().findMethodByName(header.getMethodName());
1823           if (md == null) throw new UnsupportedOperationException(header.getMethodName());
1824           builder = this.service.getRequestPrototype(md).newBuilderForType();
1825           // To read the varint, I need an inputstream; might as well be a CIS.
1826           cis = CodedInputStream.newInstance(buf, offset, buf.length);
1827           int paramSize = cis.readRawVarint32();
1828           offset += cis.getTotalBytesRead();
1829           if (builder != null) {
1830             ProtobufUtil.mergeFrom(builder, buf, offset, paramSize);
1831             param = builder.build();
1832           }
1833           offset += paramSize;
1834         }
1835         if (header.hasCellBlockMeta()) {
1836           cellScanner = ipcUtil.createCellScanner(this.codec, this.compressionCodec,
1837             buf, offset, buf.length);
1838         }
1839       } catch (Throwable t) {
1840         InetSocketAddress address = getListenerAddress();
1841         String msg = (address != null ? address : "(channel closed)") +
1842             " is unable to read call parameter from client " + getHostAddress();
1843         LOG.warn(msg, t);
1844 
1845         metrics.exception(t);
1846 
1847         // probably the hbase hadoop version does not match the running hadoop version
1848         if (t instanceof LinkageError) {
1849           t = new DoNotRetryIOException(t);
1850         }
1851         // If the method is not present on the server, do not retry.
1852         if (t instanceof UnsupportedOperationException) {
1853           t = new DoNotRetryIOException(t);
1854         }
1855 
1856         final Call readParamsFailedCall =
1857           new Call(id, this.service, null, null, null, null, this,
1858             responder, totalRequestSize, null, null);
1859         ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
1860         setupResponse(responseBuffer, readParamsFailedCall, t,
1861           msg + "; " + t.getMessage());
1862         responder.doRespond(readParamsFailedCall);
1863         return;
1864       }
1865 
1866       TraceInfo traceInfo = header.hasTraceInfo()
1867           ? new TraceInfo(header.getTraceInfo().getTraceId(), header.getTraceInfo().getParentId())
1868           : null;
1869       Call call = new Call(id, this.service, md, header, param, cellScanner, this, responder,
1870               totalRequestSize, traceInfo, this.addr);
1871 
1872       if (!scheduler.dispatch(new CallRunner(RpcServer.this, call))) {
1873         callQueueSize.add(-1 * call.getSize());
1874 
1875         ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
1876         metrics.exception(CALL_QUEUE_TOO_BIG_EXCEPTION);
1877         InetSocketAddress address = getListenerAddress();
1878         setupResponse(responseBuffer, call, CALL_QUEUE_TOO_BIG_EXCEPTION,
1879             "Call queue is full on " + (address != null ? address : "(channel closed)") +
1880                 ", too many items queued ?");
1881         responder.doRespond(call);
1882       }
1883     }
1884 
1885     private boolean authorizeConnection() throws IOException {
1886       try {
1887         // If auth method is DIGEST, the token was obtained by the
1888         // real user for the effective user, therefore not required to
1889         // authorize real user. doAs is allowed only for simple or kerberos
1890         // authentication
1891         if (ugi != null && ugi.getRealUser() != null
1892             && (authMethod != AuthMethod.DIGEST)) {
1893           ProxyUsers.authorize(ugi, this.getHostAddress(), conf);
1894         }
1895         authorize(ugi, connectionHeader, getHostInetAddress());
1896         metrics.authorizationSuccess();
1897       } catch (AuthorizationException ae) {
1898         if (LOG.isDebugEnabled()) {
1899           LOG.debug("Connection authorization failed: " + ae.getMessage(), ae);
1900         }
1901         metrics.authorizationFailure();
1902         setupResponse(authFailedResponse, authFailedCall,
1903           new AccessDeniedException(ae), ae.getMessage());
1904         responder.doRespond(authFailedCall);
1905         return false;
1906       }
1907       return true;
1908     }
1909 
1910     protected synchronized void close() {
1911       disposeSasl();
1912       data = null;
1913       if (!channel.isOpen())
1914         return;
1915       try {socket.shutdownOutput();} catch(Exception ignored) {
1916         if (LOG.isTraceEnabled()) {
1917           LOG.trace(ignored);
1918         }
1919       }
1920       if (channel.isOpen()) {
1921         try {channel.close();} catch(Exception ignored) {
1922           if (LOG.isTraceEnabled()) {
1923             LOG.trace(ignored);
1924           }
1925         }
1926       }
1927       try {socket.close();} catch(Exception ignored) {
1928         if (LOG.isTraceEnabled()) {
1929           LOG.trace(ignored);
1930         }
1931       }
1932     }
1933 
1934     private UserGroupInformation createUser(ConnectionHeader head) {
1935       UserGroupInformation ugi = null;
1936 
1937       if (!head.hasUserInfo()) {
1938         return null;
1939       }
1940       UserInformation userInfoProto = head.getUserInfo();
1941       String effectiveUser = null;
1942       if (userInfoProto.hasEffectiveUser()) {
1943         effectiveUser = userInfoProto.getEffectiveUser();
1944       }
1945       String realUser = null;
1946       if (userInfoProto.hasRealUser()) {
1947         realUser = userInfoProto.getRealUser();
1948       }
1949       if (effectiveUser != null) {
1950         if (realUser != null) {
1951           UserGroupInformation realUserUgi =
1952               UserGroupInformation.createRemoteUser(realUser);
1953           ugi = UserGroupInformation.createProxyUser(effectiveUser, realUserUgi);
1954         } else {
1955           ugi = UserGroupInformation.createRemoteUser(effectiveUser);
1956         }
1957       }
1958       return ugi;
1959     }
1960   }
1961 
1962   /**
1963    * Datastructure for passing a {@link BlockingService} and its associated class of
1964    * protobuf service interface.  For example, a server that fielded what is defined
1965    * in the client protobuf service would pass in an implementation of the client blocking service
1966    * and then its ClientService.BlockingInterface.class.  Used checking connection setup.
1967    */
1968   public static class BlockingServiceAndInterface {
1969     private final BlockingService service;
1970     private final Class<?> serviceInterface;
1971     public BlockingServiceAndInterface(final BlockingService service,
1972         final Class<?> serviceInterface) {
1973       this.service = service;
1974       this.serviceInterface = serviceInterface;
1975     }
1976     public Class<?> getServiceInterface() {
1977       return this.serviceInterface;
1978     }
1979     public BlockingService getBlockingService() {
1980       return this.service;
1981     }
1982   }
1983 
1984   /**
1985    * Constructs a server listening on the named port and address.
1986    * @param server hosting instance of {@link Server}. We will do authentications if an
1987    * instance else pass null for no authentication check.
1988    * @param name Used keying this rpc servers' metrics and for naming the Listener thread.
1989    * @param services A list of services.
1990    * @param bindAddress Where to listen
1991    * @param conf
1992    * @param scheduler
1993    */
1994   public RpcServer(final Server server, final String name,
1995       final List<BlockingServiceAndInterface> services,
1996       final InetSocketAddress bindAddress, Configuration conf,
1997       RpcScheduler scheduler)
1998       throws IOException {
1999 
2000     if (conf.getBoolean("hbase.ipc.server.reservoir.enabled", true)) {
2001       this.reservoir = new BoundedByteBufferPool(
2002           conf.getInt("hbase.ipc.server.reservoir.max.buffer.size", 1024 * 1024),
2003           conf.getInt("hbase.ipc.server.reservoir.initial.buffer.size", 16 * 1024),
2004           // Make the max twice the number of handlers to be safe.
2005           conf.getInt("hbase.ipc.server.reservoir.initial.max",
2006               conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT,
2007                   HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT) * 2));
2008     } else {
2009       reservoir = null;
2010     }
2011     
2012     this.server = server;
2013     this.services = services;
2014     this.bindAddress = bindAddress;
2015     this.conf = conf;
2016     this.socketSendBufferSize = 0;
2017     this.maxQueueSize =
2018       this.conf.getInt("hbase.ipc.server.max.callqueue.size", DEFAULT_MAX_CALLQUEUE_SIZE);
2019     this.readThreads = conf.getInt("hbase.ipc.server.read.threadpool.size", 10);
2020     this.maxIdleTime = 2 * conf.getInt("hbase.ipc.client.connection.maxidletime", 1000);
2021     this.maxConnectionsToNuke = conf.getInt("hbase.ipc.client.kill.max", 10);
2022     this.thresholdIdleConnections = conf.getInt("hbase.ipc.client.idlethreshold", 4000);
2023     this.purgeTimeout = conf.getLong("hbase.ipc.client.call.purge.timeout",
2024       2 * HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
2025     this.warnResponseTime = conf.getInt(WARN_RESPONSE_TIME, DEFAULT_WARN_RESPONSE_TIME);
2026     this.warnResponseSize = conf.getInt(WARN_RESPONSE_SIZE, DEFAULT_WARN_RESPONSE_SIZE);
2027 
2028     // Start the listener here and let it bind to the port
2029     listener = new Listener(name);
2030     this.port = listener.getAddress().getPort();
2031 
2032     this.metrics = new MetricsHBaseServer(name, new MetricsHBaseServerWrapperImpl(this));
2033     this.tcpNoDelay = conf.getBoolean("hbase.ipc.server.tcpnodelay", true);
2034     this.tcpKeepAlive = conf.getBoolean("hbase.ipc.server.tcpkeepalive", true);
2035 
2036     this.ipcUtil = new IPCUtil(conf);
2037 
2038 
2039     // Create the responder here
2040     responder = new Responder();
2041     this.authorize = conf.getBoolean(HADOOP_SECURITY_AUTHORIZATION, false);
2042     this.userProvider = UserProvider.instantiate(conf);
2043     this.isSecurityEnabled = userProvider.isHBaseSecurityEnabled();
2044     if (isSecurityEnabled) {
2045       HBaseSaslRpcServer.init(conf);
2046     }
2047     initReconfigurable(conf);
2048 
2049     this.scheduler = scheduler;
2050     this.scheduler.init(new RpcSchedulerContext(this));
2051   }
2052 
2053   @Override
2054   public void onConfigurationChange(Configuration newConf) {
2055     initReconfigurable(newConf);
2056   }
2057 
2058   private void initReconfigurable(Configuration confToLoad) {
2059     this.allowFallbackToSimpleAuth = confToLoad.getBoolean(FALLBACK_TO_INSECURE_CLIENT_AUTH, false);
2060     if (isSecurityEnabled && allowFallbackToSimpleAuth) {
2061       LOG.warn("********* WARNING! *********");
2062       LOG.warn("This server is configured to allow connections from INSECURE clients");
2063       LOG.warn("(" + FALLBACK_TO_INSECURE_CLIENT_AUTH + " = true).");
2064       LOG.warn("While this option is enabled, client identities cannot be secured, and user");
2065       LOG.warn("impersonation is possible!");
2066       LOG.warn("For secure operation, please disable SIMPLE authentication as soon as possible,");
2067       LOG.warn("by setting " + FALLBACK_TO_INSECURE_CLIENT_AUTH + " = false in hbase-site.xml");
2068       LOG.warn("****************************");
2069     }
2070   }
2071 
2072   /**
2073    * Subclasses of HBaseServer can override this to provide their own
2074    * Connection implementations.
2075    */
2076   protected Connection getConnection(SocketChannel channel, long time) {
2077     return new Connection(channel, time);
2078   }
2079 
2080   /**
2081    * Setup response for the RPC Call.
2082    *
2083    * @param response buffer to serialize the response into
2084    * @param call {@link Call} to which we are setting up the response
2085    * @param error error message, if the call failed
2086    * @throws IOException
2087    */
2088   private void setupResponse(ByteArrayOutputStream response, Call call, Throwable t, String error)
2089   throws IOException {
2090     if (response != null) response.reset();
2091     call.setResponse(null, null, t, error);
2092   }
2093 
2094   protected void closeConnection(Connection connection) {
2095     synchronized (connectionList) {
2096       if (connectionList.remove(connection)) {
2097         numConnections--;
2098       }
2099     }
2100     connection.close();
2101   }
2102 
2103   Configuration getConf() {
2104     return conf;
2105   }
2106 
2107   /** Sets the socket buffer size used for responding to RPCs.
2108    * @param size send size
2109    */
2110   @Override
2111   public void setSocketSendBufSize(int size) { this.socketSendBufferSize = size; }
2112 
2113   @Override
2114   public boolean isStarted() {
2115     return this.started;
2116   }
2117 
2118   /** Starts the service.  Must be called before any calls will be handled. */
2119   @Override
2120   public synchronized void start() {
2121     if (started) return;
2122     authTokenSecretMgr = createSecretManager();
2123     if (authTokenSecretMgr != null) {
2124       setSecretManager(authTokenSecretMgr);
2125       authTokenSecretMgr.start();
2126     }
2127     this.authManager = new ServiceAuthorizationManager();
2128     HBasePolicyProvider.init(conf, authManager);
2129     responder.start();
2130     listener.start();
2131     scheduler.start();
2132     started = true;
2133   }
2134 
2135   @Override
2136   public synchronized void refreshAuthManager(PolicyProvider pp) {
2137     // Ignore warnings that this should be accessed in a static way instead of via an instance;
2138     // it'll break if you go via static route.
2139     this.authManager.refresh(this.conf, pp);
2140   }
2141 
2142   private AuthenticationTokenSecretManager createSecretManager() {
2143     if (!isSecurityEnabled) return null;
2144     if (server == null) return null;
2145     Configuration conf = server.getConfiguration();
2146     long keyUpdateInterval =
2147         conf.getLong("hbase.auth.key.update.interval", 24*60*60*1000);
2148     long maxAge =
2149         conf.getLong("hbase.auth.token.max.lifetime", 7*24*60*60*1000);
2150     return new AuthenticationTokenSecretManager(conf, server.getZooKeeper(),
2151         server.getServerName().toString(), keyUpdateInterval, maxAge);
2152   }
2153 
2154   public SecretManager<? extends TokenIdentifier> getSecretManager() {
2155     return this.secretManager;
2156   }
2157 
2158   @SuppressWarnings("unchecked")
2159   public void setSecretManager(SecretManager<? extends TokenIdentifier> secretManager) {
2160     this.secretManager = (SecretManager<TokenIdentifier>) secretManager;
2161   }
2162 
2163   /**
2164    * This is a server side method, which is invoked over RPC. On success
2165    * the return response has protobuf response payload. On failure, the
2166    * exception name and the stack trace are returned in the protobuf response.
2167    */
2168   @Override
2169   public Pair<Message, CellScanner> call(BlockingService service, MethodDescriptor md,
2170       Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status)
2171   throws IOException {
2172     try {
2173       status.setRPC(md.getName(), new Object[]{param}, receiveTime);
2174       // TODO: Review after we add in encoded data blocks.
2175       status.setRPCPacket(param);
2176       status.resume("Servicing call");
2177       //get an instance of the method arg type
2178       long startTime = System.currentTimeMillis();
2179       PayloadCarryingRpcController controller = new PayloadCarryingRpcController(cellScanner);
2180       Message result = service.callBlockingMethod(md, controller, param);
2181       long endTime = System.currentTimeMillis();
2182       int processingTime = (int) (endTime - startTime);
2183       int qTime = (int) (startTime - receiveTime);
2184       int totalTime = (int) (endTime - receiveTime);
2185       if (LOG.isTraceEnabled()) {
2186         LOG.trace(CurCall.get().toString() +
2187             ", response " + TextFormat.shortDebugString(result) +
2188             " queueTime: " + qTime +
2189             " processingTime: " + processingTime +
2190             " totalTime: " + totalTime);
2191       }
2192       long requestSize = param.getSerializedSize();
2193       long responseSize = result.getSerializedSize();
2194       metrics.dequeuedCall(qTime);
2195       metrics.processedCall(processingTime);
2196       metrics.totalCall(totalTime);
2197       metrics.receivedRequest(requestSize);
2198       metrics.sentResponse(responseSize);
2199       // log any RPC responses that are slower than the configured warn
2200       // response time or larger than configured warning size
2201       boolean tooSlow = (processingTime > warnResponseTime && warnResponseTime > -1);
2202       boolean tooLarge = (responseSize > warnResponseSize && warnResponseSize > -1);
2203       if (tooSlow || tooLarge) {
2204         // when tagging, we let TooLarge trump TooSmall to keep output simple
2205         // note that large responses will often also be slow.
2206         logResponse(new Object[]{param},
2207             md.getName(), md.getName() + "(" + param.getClass().getName() + ")",
2208             (tooLarge ? "TooLarge" : "TooSlow"),
2209             status.getClient(), startTime, processingTime, qTime,
2210             responseSize);
2211       }
2212       return new Pair<Message, CellScanner>(result, controller.cellScanner());
2213     } catch (Throwable e) {
2214       // The above callBlockingMethod will always return a SE.  Strip the SE wrapper before
2215       // putting it on the wire.  Its needed to adhere to the pb Service Interface but we don't
2216       // need to pass it over the wire.
2217       if (e instanceof ServiceException) e = e.getCause();
2218 
2219       // increment the number of requests that were exceptions.
2220       metrics.exception(e);
2221 
2222       if (e instanceof LinkageError) throw new DoNotRetryIOException(e);
2223       if (e instanceof IOException) throw (IOException)e;
2224       LOG.error("Unexpected throwable object ", e);
2225       throw new IOException(e.getMessage(), e);
2226     }
2227   }
2228 
2229   /**
2230    * Logs an RPC response to the LOG file, producing valid JSON objects for
2231    * client Operations.
2232    * @param params The parameters received in the call.
2233    * @param methodName The name of the method invoked
2234    * @param call The string representation of the call
2235    * @param tag  The tag that will be used to indicate this event in the log.
2236    * @param clientAddress   The address of the client who made this call.
2237    * @param startTime       The time that the call was initiated, in ms.
2238    * @param processingTime  The duration that the call took to run, in ms.
2239    * @param qTime           The duration that the call spent on the queue
2240    *                        prior to being initiated, in ms.
2241    * @param responseSize    The size in bytes of the response buffer.
2242    */
2243   void logResponse(Object[] params, String methodName, String call, String tag,
2244       String clientAddress, long startTime, int processingTime, int qTime,
2245       long responseSize)
2246           throws IOException {
2247     // base information that is reported regardless of type of call
2248     Map<String, Object> responseInfo = new HashMap<String, Object>();
2249     responseInfo.put("starttimems", startTime);
2250     responseInfo.put("processingtimems", processingTime);
2251     responseInfo.put("queuetimems", qTime);
2252     responseInfo.put("responsesize", responseSize);
2253     responseInfo.put("client", clientAddress);
2254     responseInfo.put("class", server == null? "": server.getClass().getSimpleName());
2255     responseInfo.put("method", methodName);
2256     if (params.length == 2 && server instanceof HRegionServer &&
2257         params[0] instanceof byte[] &&
2258         params[1] instanceof Operation) {
2259       // if the slow process is a query, we want to log its table as well
2260       // as its own fingerprint
2261       TableName tableName = TableName.valueOf(
2262           HRegionInfo.parseRegionName((byte[]) params[0])[0]);
2263       responseInfo.put("table", tableName.getNameAsString());
2264       // annotate the response map with operation details
2265       responseInfo.putAll(((Operation) params[1]).toMap());
2266       // report to the log file
2267       LOG.warn("(operation" + tag + "): " +
2268                MAPPER.writeValueAsString(responseInfo));
2269     } else if (params.length == 1 && server instanceof HRegionServer &&
2270         params[0] instanceof Operation) {
2271       // annotate the response map with operation details
2272       responseInfo.putAll(((Operation) params[0]).toMap());
2273       // report to the log file
2274       LOG.warn("(operation" + tag + "): " +
2275                MAPPER.writeValueAsString(responseInfo));
2276     } else {
2277       // can't get JSON details, so just report call.toString() along with
2278       // a more generic tag.
2279       responseInfo.put("call", call);
2280       LOG.warn("(response" + tag + "): " + MAPPER.writeValueAsString(responseInfo));
2281     }
2282   }
2283 
2284   /** Stops the service.  No new calls will be handled after this is called. */
2285   @Override
2286   public synchronized void stop() {
2287     LOG.info("Stopping server on " + port);
2288     running = false;
2289     if (authTokenSecretMgr != null) {
2290       authTokenSecretMgr.stop();
2291       authTokenSecretMgr = null;
2292     }
2293     listener.interrupt();
2294     listener.doStop();
2295     responder.interrupt();
2296     scheduler.stop();
2297     notifyAll();
2298   }
2299 
2300   /** Wait for the server to be stopped.
2301    * Does not wait for all subthreads to finish.
2302    *  See {@link #stop()}.
2303    * @throws InterruptedException e
2304    */
2305   @Override
2306   public synchronized void join() throws InterruptedException {
2307     while (running) {
2308       wait();
2309     }
2310   }
2311 
2312   /**
2313    * Return the socket (ip+port) on which the RPC server is listening to. May return null if
2314    * the listener channel is closed.
2315    * @return the socket (ip+port) on which the RPC server is listening to, or null if this
2316    * information cannot be determined
2317    */
2318   @Override
2319   public synchronized InetSocketAddress getListenerAddress() {
2320     if (listener == null) {
2321       return null;
2322     }
2323     return listener.getAddress();
2324   }
2325 
2326   /**
2327    * Set the handler for calling out of RPC for error conditions.
2328    * @param handler the handler implementation
2329    */
2330   @Override
2331   public void setErrorHandler(HBaseRPCErrorHandler handler) {
2332     this.errorHandler = handler;
2333   }
2334 
2335   @Override
2336   public HBaseRPCErrorHandler getErrorHandler() {
2337     return this.errorHandler;
2338   }
2339 
2340   /**
2341    * Returns the metrics instance for reporting RPC call statistics
2342    */
2343   @Override
2344   public MetricsHBaseServer getMetrics() {
2345     return metrics;
2346   }
2347 
2348   @Override
2349   public void addCallSize(final long diff) {
2350     this.callQueueSize.add(diff);
2351   }
2352 
2353   /**
2354    * Authorize the incoming client connection.
2355    *
2356    * @param user client user
2357    * @param connection incoming connection
2358    * @param addr InetAddress of incoming connection
2359    * @throws org.apache.hadoop.security.authorize.AuthorizationException
2360    *         when the client isn't authorized to talk the protocol
2361    */
2362   public synchronized void authorize(UserGroupInformation user, ConnectionHeader connection,
2363       InetAddress addr)
2364   throws AuthorizationException {
2365     if (authorize) {
2366       Class<?> c = getServiceInterface(services, connection.getServiceName());
2367       this.authManager.authorize(user != null ? user : null, c, getConf(), addr);
2368     }
2369   }
2370 
2371   /**
2372    * When the read or write buffer size is larger than this limit, i/o will be
2373    * done in chunks of this size. Most RPC requests and responses would be
2374    * be smaller.
2375    */
2376   private static int NIO_BUFFER_LIMIT = 64 * 1024; //should not be more than 64KB.
2377 
2378   /**
2379    * This is a wrapper around {@link java.nio.channels.WritableByteChannel#write(java.nio.ByteBuffer)}.
2380    * If the amount of data is large, it writes to channel in smaller chunks.
2381    * This is to avoid jdk from creating many direct buffers as the size of
2382    * buffer increases. This also minimizes extra copies in NIO layer
2383    * as a result of multiple write operations required to write a large
2384    * buffer.
2385    *
2386    * @param channel writable byte channel to write to
2387    * @param bufferChain Chain of buffers to write
2388    * @return number of bytes written
2389    * @throws java.io.IOException e
2390    * @see java.nio.channels.WritableByteChannel#write(java.nio.ByteBuffer)
2391    */
2392   protected long channelWrite(GatheringByteChannel channel, BufferChain bufferChain)
2393   throws IOException {
2394     long count =  bufferChain.write(channel, NIO_BUFFER_LIMIT);
2395     if (count > 0) this.metrics.sentBytes(count);
2396     return count;
2397   }
2398 
2399   /**
2400    * This is a wrapper around {@link java.nio.channels.ReadableByteChannel#read(java.nio.ByteBuffer)}.
2401    * If the amount of data is large, it writes to channel in smaller chunks.
2402    * This is to avoid jdk from creating many direct buffers as the size of
2403    * ByteBuffer increases. There should not be any performance degredation.
2404    *
2405    * @param channel writable byte channel to write on
2406    * @param buffer buffer to write
2407    * @return number of bytes written
2408    * @throws java.io.IOException e
2409    * @see java.nio.channels.ReadableByteChannel#read(java.nio.ByteBuffer)
2410    */
2411   protected int channelRead(ReadableByteChannel channel,
2412                                    ByteBuffer buffer) throws IOException {
2413 
2414     int count = (buffer.remaining() <= NIO_BUFFER_LIMIT) ?
2415            channel.read(buffer) : channelIO(channel, null, buffer);
2416     if (count > 0) {
2417       metrics.receivedBytes(count);
2418     }
2419     return count;
2420   }
2421 
2422   /**
2423    * Helper for {@link #channelRead(java.nio.channels.ReadableByteChannel, java.nio.ByteBuffer)}
2424    * and {@link #channelWrite(GatheringByteChannel, BufferChain)}. Only
2425    * one of readCh or writeCh should be non-null.
2426    *
2427    * @param readCh read channel
2428    * @param writeCh write channel
2429    * @param buf buffer to read or write into/out of
2430    * @return bytes written
2431    * @throws java.io.IOException e
2432    * @see #channelRead(java.nio.channels.ReadableByteChannel, java.nio.ByteBuffer)
2433    * @see #channelWrite(GatheringByteChannel, BufferChain)
2434    */
2435   private static int channelIO(ReadableByteChannel readCh,
2436                                WritableByteChannel writeCh,
2437                                ByteBuffer buf) throws IOException {
2438 
2439     int originalLimit = buf.limit();
2440     int initialRemaining = buf.remaining();
2441     int ret = 0;
2442 
2443     while (buf.remaining() > 0) {
2444       try {
2445         int ioSize = Math.min(buf.remaining(), NIO_BUFFER_LIMIT);
2446         buf.limit(buf.position() + ioSize);
2447 
2448         ret = (readCh == null) ? writeCh.write(buf) : readCh.read(buf);
2449 
2450         if (ret < ioSize) {
2451           break;
2452         }
2453 
2454       } finally {
2455         buf.limit(originalLimit);
2456       }
2457     }
2458 
2459     int nBytes = initialRemaining - buf.remaining();
2460     return (nBytes > 0) ? nBytes : ret;
2461   }
2462 
2463   /**
2464    * Needed for features such as delayed calls.  We need to be able to store the current call
2465    * so that we can complete it later or ask questions of what is supported by the current ongoing
2466    * call.
2467    * @return An RpcCallContext backed by the currently ongoing call (gotten from a thread local)
2468    */
2469   public static RpcCallContext getCurrentCall() {
2470     return CurCall.get();
2471   }
2472 
2473   public static boolean isInRpcCallContext() {
2474     return CurCall.get() != null;
2475   }
2476 
2477   /**
2478    * Returns the user credentials associated with the current RPC request or
2479    * <code>null</code> if no credentials were provided.
2480    * @return A User
2481    */
2482   public static User getRequestUser() {
2483     RpcCallContext ctx = getCurrentCall();
2484     return ctx == null? null: ctx.getRequestUser();
2485   }
2486 
2487   /**
2488    * Returns the username for any user associated with the current RPC
2489    * request or <code>null</code> if no user is set.
2490    */
2491   public static String getRequestUserName() {
2492     User user = getRequestUser();
2493     return user == null? null: user.getShortName();
2494   }
2495 
2496   /**
2497    * @return Address of remote client if a request is ongoing, else null
2498    */
2499   public static InetAddress getRemoteAddress() {
2500     RpcCallContext ctx = getCurrentCall();
2501     return ctx == null? null: ctx.getRemoteAddress();
2502   }
2503 
2504   /**
2505    * @param serviceName Some arbitrary string that represents a 'service'.
2506    * @param services Available service instances
2507    * @return Matching BlockingServiceAndInterface pair
2508    */
2509   static BlockingServiceAndInterface getServiceAndInterface(
2510       final List<BlockingServiceAndInterface> services, final String serviceName) {
2511     for (BlockingServiceAndInterface bs : services) {
2512       if (bs.getBlockingService().getDescriptorForType().getName().equals(serviceName)) {
2513         return bs;
2514       }
2515     }
2516     return null;
2517   }
2518 
2519   /**
2520    * @param serviceName Some arbitrary string that represents a 'service'.
2521    * @param services Available services and their service interfaces.
2522    * @return Service interface class for <code>serviceName</code>
2523    */
2524   static Class<?> getServiceInterface(
2525       final List<BlockingServiceAndInterface> services,
2526       final String serviceName) {
2527     BlockingServiceAndInterface bsasi =
2528         getServiceAndInterface(services, serviceName);
2529     return bsasi == null? null: bsasi.getServiceInterface();
2530   }
2531 
2532   /**
2533    * @param serviceName Some arbitrary string that represents a 'service'.
2534    * @param services Available services and their service interfaces.
2535    * @return BlockingService that goes with the passed <code>serviceName</code>
2536    */
2537   static BlockingService getService(
2538       final List<BlockingServiceAndInterface> services,
2539       final String serviceName) {
2540     BlockingServiceAndInterface bsasi =
2541         getServiceAndInterface(services, serviceName);
2542     return bsasi == null? null: bsasi.getBlockingService();
2543   }
2544 
2545   static MonitoredRPCHandler getStatus() {
2546     // It is ugly the way we park status up in RpcServer.  Let it be for now.  TODO.
2547     MonitoredRPCHandler status = RpcServer.MONITORED_RPC.get();
2548     if (status != null) {
2549       return status;
2550     }
2551     status = TaskMonitor.get().createRPCStatus(Thread.currentThread().getName());
2552     status.pause("Waiting for a call");
2553     RpcServer.MONITORED_RPC.set(status);
2554     return status;
2555   }
2556 
2557   /** Returns the remote side ip address when invoked inside an RPC
2558    *  Returns null incase of an error.
2559    *  @return InetAddress
2560    */
2561   public static InetAddress getRemoteIp() {
2562     Call call = CurCall.get();
2563     if (call != null && call.connection != null && call.connection.socket != null) {
2564       return call.connection.socket.getInetAddress();
2565     }
2566     return null;
2567   }
2568 
2569 
2570   /**
2571    * A convenience method to bind to a given address and report
2572    * better exceptions if the address is not a valid host.
2573    * @param socket the socket to bind
2574    * @param address the address to bind to
2575    * @param backlog the number of connections allowed in the queue
2576    * @throws BindException if the address can't be bound
2577    * @throws UnknownHostException if the address isn't a valid host name
2578    * @throws IOException other random errors from bind
2579    */
2580   public static void bind(ServerSocket socket, InetSocketAddress address,
2581                           int backlog) throws IOException {
2582     try {
2583       socket.bind(address, backlog);
2584     } catch (BindException e) {
2585       BindException bindException =
2586         new BindException("Problem binding to " + address + " : " +
2587             e.getMessage());
2588       bindException.initCause(e);
2589       throw bindException;
2590     } catch (SocketException e) {
2591       // If they try to bind to a different host's address, give a better
2592       // error message.
2593       if ("Unresolved address".equals(e.getMessage())) {
2594         throw new UnknownHostException("Invalid hostname for server: " +
2595                                        address.getHostName());
2596       }
2597       throw e;
2598     }
2599   }
2600 
2601   @Override
2602   public RpcScheduler getScheduler() {
2603     return scheduler;
2604   }
2605 }