View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.ipc;
20  
21  import com.google.common.annotations.VisibleForTesting;
22  import com.google.protobuf.BlockingRpcChannel;
23  import com.google.protobuf.Descriptors;
24  import com.google.protobuf.Message;
25  import com.google.protobuf.RpcController;
26  import com.google.protobuf.ServiceException;
27  
28  import java.io.IOException;
29  import java.net.ConnectException;
30  import java.net.InetSocketAddress;
31  import java.net.SocketAddress;
32  import java.net.SocketTimeoutException;
33  
34  import org.apache.commons.logging.Log;
35  import org.apache.commons.logging.LogFactory;
36  import org.apache.hadoop.conf.Configuration;
37  import org.apache.hadoop.hbase.CellScanner;
38  import org.apache.hadoop.hbase.HConstants;
39  import org.apache.hadoop.hbase.ServerName;
40  import org.apache.hadoop.hbase.classification.InterfaceAudience;
41  import org.apache.hadoop.hbase.client.MetricsConnection;
42  import org.apache.hadoop.hbase.codec.Codec;
43  import org.apache.hadoop.hbase.codec.KeyValueCodec;
44  import org.apache.hadoop.hbase.exceptions.ConnectionClosingException;
45  import org.apache.hadoop.hbase.security.User;
46  import org.apache.hadoop.hbase.security.UserProvider;
47  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
48  import org.apache.hadoop.hbase.util.Pair;
49  import org.apache.hadoop.hbase.util.PoolMap;
50  import org.apache.hadoop.io.compress.CompressionCodec;
51  
52  /**
53   * Provides the basics for a RpcClient implementation like configuration and Logging.
54   */
55  @InterfaceAudience.Private
56  public abstract class AbstractRpcClient implements RpcClient {
57    // Log level is being changed in tests
58    public static final Log LOG = LogFactory.getLog(AbstractRpcClient.class);
59  
60    protected final Configuration conf;
61    protected String clusterId;
62    protected final SocketAddress localAddr;
63    protected final MetricsConnection metrics;
64  
65    protected UserProvider userProvider;
66    protected final IPCUtil ipcUtil;
67  
68    protected final int minIdleTimeBeforeClose; // if the connection is idle for more than this
69    // time (in ms), it will be closed at any moment.
70    protected final int maxRetries; //the max. no. of retries for socket connections
71    protected final long failureSleep; // Time to sleep before retry on failure.
72    protected final boolean tcpNoDelay; // if T then disable Nagle's Algorithm
73    protected final boolean tcpKeepAlive; // if T then use keepalives
74    protected final Codec codec;
75    protected final CompressionCodec compressor;
76    protected final boolean fallbackAllowed;
77  
78    protected final int connectTO;
79    protected final int readTO;
80    protected final int writeTO;
81  
82    /**
83     * Construct an IPC client for the cluster <code>clusterId</code>
84     *
85     * @param conf configuration
86     * @param clusterId the cluster id
87     * @param localAddr client socket bind address.
88     * @param metrics the connection metrics
89     */
90    public AbstractRpcClient(Configuration conf, String clusterId, SocketAddress localAddr,
91        MetricsConnection metrics) {
92      this.userProvider = UserProvider.instantiate(conf);
93      this.localAddr = localAddr;
94      this.tcpKeepAlive = conf.getBoolean("hbase.ipc.client.tcpkeepalive", true);
95      this.clusterId = clusterId != null ? clusterId : HConstants.CLUSTER_ID_DEFAULT;
96      this.failureSleep = conf.getLong(HConstants.HBASE_CLIENT_PAUSE,
97          HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
98      this.maxRetries = conf.getInt("hbase.ipc.client.connect.max.retries", 0);
99      this.tcpNoDelay = conf.getBoolean("hbase.ipc.client.tcpnodelay", true);
100     this.ipcUtil = new IPCUtil(conf);
101 
102     this.minIdleTimeBeforeClose = conf.getInt(IDLE_TIME, 120000); // 2 minutes
103     this.conf = conf;
104     this.codec = getCodec();
105     this.compressor = getCompressor(conf);
106     this.fallbackAllowed = conf.getBoolean(IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY,
107         IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT);
108     this.connectTO = conf.getInt(SOCKET_TIMEOUT_CONNECT, DEFAULT_SOCKET_TIMEOUT_CONNECT);
109     this.readTO = conf.getInt(SOCKET_TIMEOUT_READ, DEFAULT_SOCKET_TIMEOUT_READ);
110     this.writeTO = conf.getInt(SOCKET_TIMEOUT_WRITE, DEFAULT_SOCKET_TIMEOUT_WRITE);
111     this.metrics = metrics;
112 
113     // login the server principal (if using secure Hadoop)
114     if (LOG.isDebugEnabled()) {
115       LOG.debug("Codec=" + this.codec + ", compressor=" + this.compressor +
116           ", tcpKeepAlive=" + this.tcpKeepAlive +
117           ", tcpNoDelay=" + this.tcpNoDelay +
118           ", connectTO=" + this.connectTO +
119           ", readTO=" + this.readTO +
120           ", writeTO=" + this.writeTO +
121           ", minIdleTimeBeforeClose=" + this.minIdleTimeBeforeClose +
122           ", maxRetries=" + this.maxRetries +
123           ", fallbackAllowed=" + this.fallbackAllowed +
124           ", bind address=" + (this.localAddr != null ? this.localAddr : "null"));
125     }
126   }
127 
128   @VisibleForTesting
129   public static String getDefaultCodec(final Configuration c) {
130     // If "hbase.client.default.rpc.codec" is empty string -- you can't set it to null because
131     // Configuration will complain -- then no default codec (and we'll pb everything).  Else
132     // default is KeyValueCodec
133     return c.get(DEFAULT_CODEC_CLASS, KeyValueCodec.class.getCanonicalName());
134   }
135 
136   /**
137    * Encapsulate the ugly casting and RuntimeException conversion in private method.
138    * @return Codec to use on this client.
139    */
140   Codec getCodec() {
141     // For NO CODEC, "hbase.client.rpc.codec" must be configured with empty string AND
142     // "hbase.client.default.rpc.codec" also -- because default is to do cell block encoding.
143     String className = conf.get(HConstants.RPC_CODEC_CONF_KEY, getDefaultCodec(this.conf));
144     if (className == null || className.length() == 0) return null;
145     try {
146       return (Codec)Class.forName(className).newInstance();
147     } catch (Exception e) {
148       throw new RuntimeException("Failed getting codec " + className, e);
149     }
150   }
151 
152   @Override
153   public boolean hasCellBlockSupport() {
154     return this.codec != null;
155   }
156 
157   /**
158    * Encapsulate the ugly casting and RuntimeException conversion in private method.
159    * @param conf configuration
160    * @return The compressor to use on this client.
161    */
162   private static CompressionCodec getCompressor(final Configuration conf) {
163     String className = conf.get("hbase.client.rpc.compressor", null);
164     if (className == null || className.isEmpty()) return null;
165     try {
166         return (CompressionCodec)Class.forName(className).newInstance();
167     } catch (Exception e) {
168       throw new RuntimeException("Failed getting compressor " + className, e);
169     }
170   }
171 
172   /**
173    * Return the pool type specified in the configuration, which must be set to
174    * either {@link org.apache.hadoop.hbase.util.PoolMap.PoolType#RoundRobin} or
175    * {@link org.apache.hadoop.hbase.util.PoolMap.PoolType#ThreadLocal},
176    * otherwise default to the former.
177    *
178    * For applications with many user threads, use a small round-robin pool. For
179    * applications with few user threads, you may want to try using a
180    * thread-local pool. In any case, the number of {@link org.apache.hadoop.hbase.ipc.RpcClient}
181    * instances should not exceed the operating system's hard limit on the number of
182    * connections.
183    *
184    * @param config configuration
185    * @return either a {@link org.apache.hadoop.hbase.util.PoolMap.PoolType#RoundRobin} or
186    *         {@link org.apache.hadoop.hbase.util.PoolMap.PoolType#ThreadLocal}
187    */
188   protected static PoolMap.PoolType getPoolType(Configuration config) {
189     return PoolMap.PoolType
190         .valueOf(config.get(HConstants.HBASE_CLIENT_IPC_POOL_TYPE), PoolMap.PoolType.RoundRobin,
191             PoolMap.PoolType.ThreadLocal);
192   }
193 
194   /**
195    * Return the pool size specified in the configuration, which is applicable only if
196    * the pool type is {@link org.apache.hadoop.hbase.util.PoolMap.PoolType#RoundRobin}.
197    *
198    * @param config configuration
199    * @return the maximum pool size
200    */
201   protected static int getPoolSize(Configuration config) {
202     return config.getInt(HConstants.HBASE_CLIENT_IPC_POOL_SIZE, 1);
203   }
204 
205   /**
206    * Make a blocking call. Throws exceptions if there are network problems or if the remote code
207    * threw an exception.
208    *
209    * @param ticket Be careful which ticket you pass. A new user will mean a new Connection.
210    *               {@link UserProvider#getCurrent()} makes a new instance of User each time so
211    *               will be a
212    *               new Connection each time.
213    * @return A pair with the Message response and the Cell data (if any).
214    */
215   Message callBlockingMethod(Descriptors.MethodDescriptor md, PayloadCarryingRpcController pcrc,
216       Message param, Message returnType, final User ticket, final InetSocketAddress isa)
217       throws ServiceException {
218     if (pcrc == null) {
219       pcrc = new PayloadCarryingRpcController();
220     }
221 
222     Pair<Message, CellScanner> val;
223     try {
224       final MetricsConnection.CallStats cs = MetricsConnection.newCallStats();
225       cs.setStartTime(EnvironmentEdgeManager.currentTime());
226       val = call(pcrc, md, param, returnType, ticket, isa, cs);
227       // Shove the results into controller so can be carried across the proxy/pb service void.
228       pcrc.setCellScanner(val.getSecond());
229 
230       cs.setCallTimeMs(EnvironmentEdgeManager.currentTime() - cs.getStartTime());
231       if (metrics != null) {
232         metrics.updateRpc(md, param, cs);
233       }
234       if (LOG.isTraceEnabled()) {
235         LOG.trace("Call: " + md.getName() + ", callTime: " + cs.getCallTimeMs() + "ms");
236       }
237       return val.getFirst();
238     } catch (Throwable e) {
239       throw new ServiceException(e);
240     }
241   }
242 
243   /**
244    * Make a call, passing <code>param</code>, to the IPC server running at
245    * <code>address</code> which is servicing the <code>protocol</code> protocol,
246    * with the <code>ticket</code> credentials, returning the value.
247    * Throws exceptions if there are network problems or if the remote code
248    * threw an exception.
249    *
250    * @param ticket Be careful which ticket you pass. A new user will mean a new Connection.
251    *               {@link UserProvider#getCurrent()} makes a new instance of User each time so
252    *               will be a
253    *               new Connection each time.
254    * @return A pair with the Message response and the Cell data (if any).
255    * @throws InterruptedException
256    * @throws java.io.IOException
257    */
258   protected abstract Pair<Message, CellScanner> call(PayloadCarryingRpcController pcrc,
259       Descriptors.MethodDescriptor md, Message param, Message returnType, User ticket,
260       InetSocketAddress isa, MetricsConnection.CallStats callStats)
261       throws IOException, InterruptedException;
262 
263   @Override
264   public BlockingRpcChannel createBlockingRpcChannel(final ServerName sn, final User ticket,
265       int defaultOperationTimeout) {
266     return new BlockingRpcChannelImplementation(this, sn, ticket, defaultOperationTimeout);
267   }
268 
269   /**
270    * Takes an Exception and the address we were trying to connect to and return an IOException with
271    * the input exception as the cause. The new exception provides the stack trace of the place where
272    * the exception is thrown and some extra diagnostics information. If the exception is
273    * ConnectException or SocketTimeoutException, return a new one of the same type; Otherwise return
274    * an IOException.
275    * @param addr target address
276    * @param exception the relevant exception
277    * @return an exception to throw
278    */
279   protected IOException wrapException(InetSocketAddress addr, Exception exception) {
280     if (exception instanceof ConnectException) {
281       // connection refused; include the host:port in the error
282       return (ConnectException) new ConnectException("Call to " + addr
283           + " failed on connection exception: " + exception).initCause(exception);
284     } else if (exception instanceof SocketTimeoutException) {
285       return (SocketTimeoutException) new SocketTimeoutException("Call to " + addr
286           + " failed because " + exception).initCause(exception);
287     } else if (exception instanceof ConnectionClosingException) {
288       return (ConnectionClosingException) new ConnectionClosingException("Call to " + addr
289           + " failed on local exception: " + exception).initCause(exception);
290     } else {
291       return (IOException) new IOException("Call to " + addr + " failed on local exception: "
292           + exception).initCause(exception);
293     }
294   }
295 
296   /**
297    * Blocking rpc channel that goes via hbase rpc.
298    */
299   @VisibleForTesting
300   public static class BlockingRpcChannelImplementation implements BlockingRpcChannel {
301     private final InetSocketAddress isa;
302     private final AbstractRpcClient rpcClient;
303     private final User ticket;
304     private final int channelOperationTimeout;
305 
306     /**
307      * @param channelOperationTimeout - the default timeout when no timeout is given
308      */
309     protected BlockingRpcChannelImplementation(final AbstractRpcClient rpcClient,
310         final ServerName sn, final User ticket, int channelOperationTimeout) {
311       this.isa = new InetSocketAddress(sn.getHostname(), sn.getPort());
312       this.rpcClient = rpcClient;
313       this.ticket = ticket;
314       this.channelOperationTimeout = channelOperationTimeout;
315     }
316 
317     @Override
318     public Message callBlockingMethod(Descriptors.MethodDescriptor md, RpcController controller,
319         Message param, Message returnType) throws ServiceException {
320       PayloadCarryingRpcController pcrc;
321       if (controller != null && controller instanceof PayloadCarryingRpcController) {
322         pcrc = (PayloadCarryingRpcController) controller;
323         if (!pcrc.hasCallTimeout()) {
324           pcrc.setCallTimeout(channelOperationTimeout);
325         }
326       } else {
327         pcrc = new PayloadCarryingRpcController();
328         pcrc.setCallTimeout(channelOperationTimeout);
329       }
330 
331       return this.rpcClient.callBlockingMethod(md, pcrc, param, returnType, this.ticket, this.isa);
332     }
333   }
334 }