1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase.ipc;
21
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.Closeable;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketAddress;
import java.net.SocketTimeoutException;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.security.PrivilegedExceptionAction;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Locale;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
50
51 import javax.net.SocketFactory;
52 import javax.security.sasl.SaslException;
53
54 import org.apache.commons.logging.Log;
55 import org.apache.commons.logging.LogFactory;
56 import org.apache.hadoop.conf.Configuration;
57 import org.apache.hadoop.hbase.CellScanner;
58 import org.apache.hadoop.hbase.DoNotRetryIOException;
59 import org.apache.hadoop.hbase.HConstants;
60 import org.apache.hadoop.hbase.ServerName;
61 import org.apache.hadoop.hbase.classification.InterfaceAudience;
62 import org.apache.hadoop.hbase.client.MetricsConnection;
63 import org.apache.hadoop.hbase.codec.Codec;
64 import org.apache.hadoop.hbase.exceptions.ConnectionClosingException;
65 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
66 import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos;
67 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.CellBlockMeta;
68 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ConnectionHeader;
69 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ExceptionResponse;
70 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader;
71 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ResponseHeader;
72 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.UserInformation;
73 import org.apache.hadoop.hbase.protobuf.generated.TracingProtos.RPCTInfo;
74 import org.apache.hadoop.hbase.security.AuthMethod;
75 import org.apache.hadoop.hbase.security.HBaseSaslRpcClient;
76 import org.apache.hadoop.hbase.security.SaslUtil.QualityOfProtection;
77 import org.apache.hadoop.hbase.security.SecurityInfo;
78 import org.apache.hadoop.hbase.security.User;
79 import org.apache.hadoop.hbase.security.UserProvider;
80 import org.apache.hadoop.hbase.security.token.AuthenticationTokenSelector;
81 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
82 import org.apache.hadoop.hbase.util.ExceptionUtil;
83 import org.apache.hadoop.hbase.util.Pair;
84 import org.apache.hadoop.hbase.util.PoolMap;
85 import org.apache.hadoop.io.IOUtils;
86 import org.apache.hadoop.io.Text;
87 import org.apache.hadoop.io.compress.CompressionCodec;
88 import org.apache.hadoop.ipc.RemoteException;
89 import org.apache.hadoop.net.NetUtils;
90 import org.apache.hadoop.security.SecurityUtil;
91 import org.apache.hadoop.security.UserGroupInformation;
92 import org.apache.hadoop.security.token.Token;
93 import org.apache.hadoop.security.token.TokenIdentifier;
94 import org.apache.hadoop.security.token.TokenSelector;
95 import org.apache.htrace.Span;
96 import org.apache.htrace.Trace;
97 import org.apache.htrace.TraceScope;
98
99 import com.google.common.annotations.VisibleForTesting;
100 import com.google.protobuf.Descriptors.MethodDescriptor;
101 import com.google.protobuf.Message;
102 import com.google.protobuf.Message.Builder;
103 import com.google.protobuf.RpcCallback;
104
105
106
107
108
109 @InterfaceAudience.Private
110 public class RpcClientImpl extends AbstractRpcClient {
private static final Log LOG = LogFactory.getLog(RpcClientImpl.class);
/** Source of call ids: every new Call built by call(...) takes the next value. */
protected final AtomicInteger callIdCnt = new AtomicInteger();

/**
 * Connections keyed by ConnectionId (user ticket, service name, server address).
 * Lookups, inserts and removals are done while holding this map's monitor.
 */
protected final PoolMap<ConnectionId, Connection> connections;

/** True until close() is called; once false, getConnection() refuses to hand out connections. */
protected final AtomicBoolean running = new AtomicBoolean(true);

/**
 * Servers whose connection setup recently failed; setupIOstreams() refuses to connect to a
 * server while it is on this list.
 */
protected final FailedServers failedServers;

/** Factory used to create every client socket. */
protected final SocketFactory socketFactory;

/**
 * Token selectors by token kind. When SASL is enabled, a new Connection looks up the selector
 * for its service's token kind and uses it to pick a token from the user's credentials.
 */
protected final static Map<AuthenticationProtos.TokenIdentifier.Kind,
    TokenSelector<? extends TokenIdentifier>> tokenHandlers =
    new HashMap<AuthenticationProtos.TokenIdentifier.Kind,
        TokenSelector<? extends TokenIdentifier>>();
static {
  // Register the selector for HBase authentication tokens.
  tokenHandlers.put(AuthenticationProtos.TokenIdentifier.Kind.HBASE_AUTH_TOKEN,
      new AuthenticationTokenSelector());
}
130
131
132
133
134
135 protected Connection createConnection(ConnectionId remoteId, final Codec codec,
136 final CompressionCodec compressor)
137 throws IOException {
138 return new Connection(remoteId, codec, compressor);
139 }
140
141
142
143
144 private static class CallFuture {
145 final Call call;
146 final int priority;
147 final Span span;
148
149
150 final static CallFuture DEATH_PILL = new CallFuture(null, -1, null);
151
152 CallFuture(Call call, int priority, Span span) {
153 this.call = call;
154 this.priority = priority;
155 this.span = span;
156 }
157 }
158
159
160
161
162 protected class Connection extends Thread {
/** Header sent once, right after the preamble; built in the constructor. */
private ConnectionHeader header;
/** Server address, service name and user this connection was created for. */
protected ConnectionId remoteId;
/** Current socket; null until setupConnection() creates one, and null again after close. */
protected Socket socket = null;
/** Buffered (possibly SASL-wrapped) input stream, set by setupIOstreams(). */
protected DataInputStream in;
/** Buffered (possibly SASL-wrapped) output stream; written and replaced under outLock. */
protected DataOutputStream out;
/** Serializes writes to, and replacement of, the out stream. */
private Object outLock = new Object();
/** Server address; the constructor rejects unresolved addresses. */
private InetSocketAddress server;
/** Kerberos principal of the server; only set when SASL is enabled and SecurityInfo exists. */
private String serverPrincipal;
/** SIMPLE, DIGEST or KERBEROS; reset to SIMPLE if saslConnect() returns false. */
private AuthMethod authMethod;
/** Whether SASL is used; initially "HBase security enabled", may be switched off later. */
private boolean useSasl;
/** Token selected for DIGEST authentication, or null. */
private Token<? extends TokenIdentifier> token;
/** Active SASL client, or null if none has been created or it was disposed. */
private HBaseSaslRpcClient saslRpcClient;
/** Upper bound, in ms, of the random sleep before retrying SASL after a re-login. */
private int reloginMaxBackoff;
/** Cell block codec advertised in the header and used for cell blocks; may be null. */
private final Codec codec;
/** Cell block compressor advertised in the header and used for cell blocks; may be null. */
private final CompressionCodec compressor;

/** Calls registered by writeRequest() that still await a response, keyed by call id. */
protected final ConcurrentSkipListMap<Integer, Call> calls =
    new ConcurrentSkipListMap<Integer, Call>();

/** Becomes true once this connection must no longer be used; see markClosed(). */
protected final AtomicBoolean shouldCloseConnection = new AtomicBoolean();
/** Dedicated writer thread, or null when SPECIFIC_WRITE_THREAD is disabled. */
protected final CallSender callSender;
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
/**
 * Optional dedicated writer thread for this connection. It is only created when
 * SPECIFIC_WRITE_THREAD is enabled. Callers hand calls to sendCall(); this thread takes them
 * from a bounded queue and writes them with tracedWriteRequest().
 */
private class CallSender extends Thread implements Closeable {
  /** Calls waiting to be written; capacity comes from hbase.ipc.client.write.queueSize. */
  protected final BlockingQueue<CallFuture> callsToWrite;

  /**
   * Queues a call for writing by this thread.
   *
   * @return the queued entry, which can later be passed to remove()
   * @throws IOException if the queue is full, or a ConnectionClosingException if the
   *         connection is closing
   */
  public CallFuture sendCall(Call call, int priority, Span span)
      throws InterruptedException, IOException {
    CallFuture cts = new CallFuture(call, priority, span);
    if (!callsToWrite.offer(cts)) {
      throw new IOException("Can't add the call " + call.id +
          " to the write queue. callsToWrite.size()=" + callsToWrite.size());
    }
    // The open check runs after the enqueue. If the connection started closing concurrently,
    // the caller still gets an exception here, even when cleanup() has already drained the
    // queue.
    checkIsOpen();

    return cts;
  }

  /**
   * Asks the writer thread to stop by enqueueing DEATH_PILL. Call only once
   * shouldCloseConnection is set (asserted). If the queue is full, the pill is dropped.
   * run() still exits after taking its next item, because it rechecks shouldCloseConnection
   * on every iteration.
   */
  @Override
  public void close(){
    assert shouldCloseConnection.get();
    callsToWrite.offer(CallFuture.DEATH_PILL);


  }

  CallSender(String name, Configuration conf) {
    int queueSize = conf.getInt("hbase.ipc.client.write.queueSize", 1000);
    callsToWrite = new ArrayBlockingQueue<CallFuture>(queueSize);
    setDaemon(true);
    setName(name + " - writer");
  }

  /**
   * Cancels a call previously handed to sendCall(). The call is removed from the write queue
   * (if not yet written) and from the connection's in-flight calls, then completed.
   */
  public void remove(CallFuture cts){
    callsToWrite.remove(cts);

    // The call may already have been written. Once it is dropped from 'calls', a late
    // response for it finds no matching call in readResponse() and is skipped.
    calls.remove(cts.call.id);
    cts.call.callComplete();
  }

  /**
   * Writer loop. Takes queued calls and writes them until the connection is marked closed or
   * DEATH_PILL arrives. Calls that are already done, or that have timed out, are skipped.
   * A write failure fails the call and marks the whole connection closed. Calls still queued
   * on exit are failed by cleanup().
   */
  @Override
  public void run() {
    while (!shouldCloseConnection.get()) {
      CallFuture cts = null;
      try {
        cts = callsToWrite.take();
      } catch (InterruptedException e) {
        // Interrupted (e.g. by RpcClientImpl.close()). Close the connection; cts stays null,
        // so the check below ends the loop.
        markClosed(new InterruptedIOException());
      }

      if (cts == null || cts == CallFuture.DEATH_PILL) {
        assert shouldCloseConnection.get();
        break;
      }

      if (cts.call.done) {
        continue;
      }

      if (cts.call.checkAndSetTimeout()) {
        continue;
      }

      try {
        Connection.this.tracedWriteRequest(cts.call, cts.priority, cts.span);
      } catch (IOException e) {
        if (LOG.isDebugEnabled()) {
          LOG.debug("call write error for call #" + cts.call.id
              + ", message =" + e.getMessage());
        }
        cts.call.setException(e);
        markClosed(e);
      }
    }

    cleanup();
  }

  /**
   * Drains the write queue. Every queued call that is not yet done (DEATH_PILL carries no
   * call) is failed with a ConnectionClosingException. Only valid once the connection is
   * marked closed.
   */
  private void cleanup() {
    assert shouldCloseConnection.get();

    IOException ie = new ConnectionClosingException("Connection to " + server + " is closing.");
    while (true) {
      CallFuture cts = callsToWrite.poll();
      if (cts == null) {
        break;
      }
      if (cts.call != null && !cts.call.done) {
        cts.call.setException(ie);
      }
    }
  }
}
304
305 Connection(ConnectionId remoteId, final Codec codec, final CompressionCodec compressor)
306 throws IOException {
307 if (remoteId.getAddress().isUnresolved()) {
308 throw new UnknownHostException("unknown host: " + remoteId.getAddress().getHostName());
309 }
310 this.server = remoteId.getAddress();
311 this.codec = codec;
312 this.compressor = compressor;
313
314 UserGroupInformation ticket = remoteId.getTicket().getUGI();
315 SecurityInfo securityInfo = SecurityInfo.getInfo(remoteId.getServiceName());
316 this.useSasl = userProvider.isHBaseSecurityEnabled();
317 if (useSasl && securityInfo != null) {
318 AuthenticationProtos.TokenIdentifier.Kind tokenKind = securityInfo.getTokenKind();
319 if (tokenKind != null) {
320 TokenSelector<? extends TokenIdentifier> tokenSelector =
321 tokenHandlers.get(tokenKind);
322 if (tokenSelector != null) {
323 token = tokenSelector.selectToken(new Text(clusterId),
324 ticket.getTokens());
325 } else if (LOG.isDebugEnabled()) {
326 LOG.debug("No token selector found for type "+tokenKind);
327 }
328 }
329 String serverKey = securityInfo.getServerPrincipal();
330 if (serverKey == null) {
331 throw new IOException(
332 "Can't obtain server Kerberos config key from SecurityInfo");
333 }
334 serverPrincipal = SecurityUtil.getServerPrincipal(
335 conf.get(serverKey), server.getAddress().getCanonicalHostName().toLowerCase());
336 if (LOG.isDebugEnabled()) {
337 LOG.debug("RPC Server Kerberos principal name for service="
338 + remoteId.getServiceName() + " is " + serverPrincipal);
339 }
340 }
341
342 if (!useSasl) {
343 authMethod = AuthMethod.SIMPLE;
344 } else if (token != null) {
345 authMethod = AuthMethod.DIGEST;
346 } else {
347 authMethod = AuthMethod.KERBEROS;
348 }
349
350 if (LOG.isDebugEnabled()) {
351 LOG.debug("Use " + authMethod + " authentication for service " + remoteId.serviceName +
352 ", sasl=" + useSasl);
353 }
354 reloginMaxBackoff = conf.getInt("hbase.security.relogin.maxbackoff", 5000);
355 this.remoteId = remoteId;
356
357 ConnectionHeader.Builder builder = ConnectionHeader.newBuilder();
358 builder.setServiceName(remoteId.getServiceName());
359 UserInformation userInfoPB = getUserInfo(ticket);
360 if (userInfoPB != null) {
361 builder.setUserInfo(userInfoPB);
362 }
363 if (this.codec != null) {
364 builder.setCellBlockCodecClass(this.codec.getClass().getCanonicalName());
365 }
366 if (this.compressor != null) {
367 builder.setCellBlockCompressorClass(this.compressor.getClass().getCanonicalName());
368 }
369 builder.setVersionInfo(ProtobufUtil.getVersionInfo());
370 this.header = builder.build();
371
372 this.setName("IPC Client (" + socketFactory.hashCode() +") connection to " +
373 remoteId.getAddress().toString() +
374 ((ticket==null)?" from an unknown user": (" from "
375 + ticket.getUserName())));
376 this.setDaemon(true);
377
378 if (conf.getBoolean(SPECIFIC_WRITE_THREAD, false)) {
379 callSender = new CallSender(getName(), conf);
380 callSender.start();
381 } else {
382 callSender = null;
383 }
384 }
385
386 private synchronized UserInformation getUserInfo(UserGroupInformation ugi) {
387 if (ugi == null || authMethod == AuthMethod.DIGEST) {
388
389 return null;
390 }
391 UserInformation.Builder userInfoPB = UserInformation.newBuilder();
392 if (authMethod == AuthMethod.KERBEROS) {
393
394 userInfoPB.setEffectiveUser(ugi.getUserName());
395 } else if (authMethod == AuthMethod.SIMPLE) {
396
397 userInfoPB.setEffectiveUser(ugi.getUserName());
398 if (ugi.getRealUser() != null) {
399 userInfoPB.setRealUser(ugi.getRealUser().getUserName());
400 }
401 }
402 return userInfoPB.build();
403 }
404
405 protected synchronized void setupConnection() throws IOException {
406 short ioFailures = 0;
407 short timeoutFailures = 0;
408 while (true) {
409 try {
410 this.socket = socketFactory.createSocket();
411 this.socket.setTcpNoDelay(tcpNoDelay);
412 this.socket.setKeepAlive(tcpKeepAlive);
413 if (localAddr != null) {
414 this.socket.bind(localAddr);
415 }
416 NetUtils.connect(this.socket, remoteId.getAddress(), connectTO);
417 this.socket.setSoTimeout(readTO);
418 return;
419 } catch (SocketTimeoutException toe) {
420
421
422
423 handleConnectionFailure(timeoutFailures++, maxRetries, toe);
424 } catch (IOException ie) {
425 handleConnectionFailure(ioFailures++, maxRetries, ie);
426 }
427 }
428 }
429
/**
 * Best-effort teardown of the current socket, then clears the socket field. It is a no-op
 * when there is no socket.
 *
 * The output stream, input stream, channel and socket are closed in that order.
 * Failures on the streams or channel are only trace-logged. A failure closing the socket
 * itself is logged as a warning.
 */
protected synchronized void closeConnection() {
  if (socket == null) {
    return;
  }

  // Close each part separately so that one failure does not prevent closing the rest.
  try {
    if (socket.getOutputStream() != null) {
      socket.getOutputStream().close();
    }
  } catch (IOException ignored) {
    if (LOG.isTraceEnabled()) LOG.trace("ignored", ignored);
  }
  try {
    if (socket.getInputStream() != null) {
      socket.getInputStream().close();
    }
  } catch (IOException ignored) {
    if (LOG.isTraceEnabled()) LOG.trace("ignored", ignored);
  }
  try {
    if (socket.getChannel() != null) {
      socket.getChannel().close();
    }
  } catch (IOException ignored) {
    if (LOG.isTraceEnabled()) LOG.trace("ignored", ignored);
  }
  try {
    socket.close();
  } catch (IOException e) {
    LOG.warn("Not able to close a socket", e);
  }

  // Cleared even if socket.close() failed, so the next attempt creates a new socket.
  socket = null;
}
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483 private void handleConnectionFailure(int curRetries, int maxRetries, IOException ioe)
484 throws IOException {
485 closeConnection();
486
487
488 if (curRetries >= maxRetries || ExceptionUtil.isInterrupt(ioe)) {
489 throw ioe;
490 }
491
492
493 try {
494 Thread.sleep(failureSleep);
495 } catch (InterruptedException ie) {
496 ExceptionUtil.rethrowIfInterrupt(ie);
497 }
498
499 LOG.info("Retrying connect to server: " + remoteId.getAddress() +
500 " after sleeping " + failureSleep + "ms. Already tried " + curRetries +
501 " time(s).");
502 }
503
504
505
506
507 private void checkIsOpen() throws IOException {
508 if (shouldCloseConnection.get()) {
509 throw new ConnectionClosingException(getName() + " is closing");
510 }
511 }
512
513
514
515
516
517
518
/**
 * Blocks the reader thread until there are responses to read or the connection should go
 * away.
 *
 * While idle, it wakes up at least every second (or every minIdleTimeBeforeClose ms, if that
 * is smaller) to re-check. doNotify() and markClosed() call notifyAll() on this connection
 * and wake it earlier.
 *
 * @return true if there are outstanding calls whose responses should be read. False if the
 *         connection is already closing, the client has been stopped, or no call appeared
 *         within minIdleTimeBeforeClose ms of entering this method. In the last two cases
 *         the connection is marked closed here.
 * @throws InterruptedException if interrupted while waiting
 */
protected synchronized boolean waitForWork() throws InterruptedException {
  // The idle deadline is measured from entry into this method.
  long waitUntil = EnvironmentEdgeManager.currentTime() + minIdleTimeBeforeClose;

  while (true) {
    if (shouldCloseConnection.get()) {
      return false;
    }

    if (!running.get()) {
      markClosed(new IOException("stopped with " + calls.size() + " pending request(s)"));
      return false;
    }

    if (!calls.isEmpty()) {
      // Responses are outstanding; go and read them.
      return true;
    }

    if (EnvironmentEdgeManager.currentTime() >= waitUntil) {
      // Idle for long enough: close this connection.
      // NOTE(review): writeRequest() adds to 'calls' without holding this monitor, so a call
      // registered between the isEmpty() check above and markClosed() here is failed by the
      // subsequent close(). Confirm this race is acceptable.
      markClosed(new IOException(
          "idle connection closed with " + calls.size() + " pending request(s)"));
      return false;
    }

    wait(Math.min(minIdleTimeBeforeClose, 1000));
  }
}
553
554 public InetSocketAddress getRemoteAddress() {
555 return remoteId.getAddress();
556 }
557
/**
 * Reader thread body. It reads responses for as long as waitForWork() reports outstanding
 * calls, then closes the connection.
 *
 * If an interrupt or any other throwable ends the loop, the connection is marked closed
 * first. This matters because close() only acts on a connection that is already marked
 * closed.
 */
@Override
public void run() {
  if (LOG.isTraceEnabled()) {
    LOG.trace(getName() + ": starting, connections " + connections.size());
  }

  try {
    while (waitForWork()) {
      readResponse();
    }
  } catch (InterruptedException t) {
    if (LOG.isTraceEnabled()) {
      LOG.trace(getName() + ": interrupted while waiting for call responses");
    }
    markClosed(ExceptionUtil.asInterrupt(t));
  } catch (Throwable t) {
    if (LOG.isDebugEnabled()) {
      LOG.debug(getName() + ": unexpected throwable while waiting for call responses", t);
    }
    markClosed(new IOException("Unexpected throwable while waiting call responses", t));
  }

  // Release the socket, fail remaining calls and leave the connection pool.
  close();

  if (LOG.isTraceEnabled()) {
    LOG.trace(getName() + ": stopped, connections " + connections.size());
  }
}
586
587 private synchronized void disposeSasl() {
588 if (saslRpcClient != null) {
589 try {
590 saslRpcClient.dispose();
591 saslRpcClient = null;
592 } catch (IOException ioe) {
593 LOG.error("Error disposing of SASL client", ioe);
594 }
595 }
596 }
597
598 private synchronized boolean shouldAuthenticateOverKrb() throws IOException {
599 UserGroupInformation loginUser = UserGroupInformation.getLoginUser();
600 UserGroupInformation currentUser =
601 UserGroupInformation.getCurrentUser();
602 UserGroupInformation realUser = currentUser.getRealUser();
603 return authMethod == AuthMethod.KERBEROS &&
604 loginUser != null &&
605
606 loginUser.hasKerberosCredentials() &&
607
608
609 (loginUser.equals(currentUser) || loginUser.equals(realUser));
610 }
611
612 private synchronized boolean setupSaslConnection(final InputStream in2,
613 final OutputStream out2) throws IOException {
614 saslRpcClient = new HBaseSaslRpcClient(authMethod, token, serverPrincipal, fallbackAllowed,
615 conf.get("hbase.rpc.protection",
616 QualityOfProtection.AUTHENTICATION.name().toLowerCase()));
617 return saslRpcClient.saslConnect(in2, out2);
618 }
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
/**
 * Handles a failed SASL setup attempt. Runs as {@code user} and always closes the socket
 * first.
 * <ul>
 * <li>If Kerberos re-login applies (shouldAuthenticateOverKrb()) and retries remain, the
 * method re-logs in from the keytab or ticket cache, disposes the SASL client and sleeps a
 * random 1..reloginMaxBackoff ms. It then returns normally so the caller can retry.</li>
 * <li>If Kerberos re-login applies but retries are exhausted, it throws an IOException whose
 * cause is {@code ex}.</li>
 * <li>Otherwise {@code ex} is rethrown: a RemoteException as is, a SaslException wrapped in a
 * RuntimeException, and anything else wrapped in an IOException.</li>
 * </ul>
 *
 * @param currRetries number of SASL retries already made
 * @param maxRetries maximum number of SASL retries
 * @param ex the failure from the SASL attempt
 * @param rand source of the random back-off
 * @param user user to run the handling as
 */
private synchronized void handleSaslConnectionFailure(
    final int currRetries,
    final int maxRetries, final Exception ex, final Random rand,
    final UserGroupInformation user)
    throws IOException, InterruptedException{
  user.doAs(new PrivilegedExceptionAction<Object>() {
    @Override
    public Object run() throws IOException, InterruptedException {
      // Always drop the current socket; a retry reconnects from scratch.
      closeConnection();
      if (shouldAuthenticateOverKrb()) {
        if (currRetries < maxRetries) {
          if (LOG.isDebugEnabled()) {
            LOG.debug("Exception encountered while connecting to " +
                "the server : " + ex);
          }
          // Refresh Kerberos credentials: from the keytab if the login used one, otherwise
          // from the ticket cache.
          if (UserGroupInformation.isLoginKeytabBased()) {
            UserGroupInformation.getLoginUser().reloginFromKeytab();
          } else {
            UserGroupInformation.getLoginUser().reloginFromTicketCache();
          }
          disposeSasl();
          // Random back-off in [1, reloginMaxBackoff] ms before the caller retries.
          // NOTE(review): Random.nextInt throws IllegalArgumentException if
          // hbase.security.relogin.maxbackoff is configured <= 0.
          Thread.sleep((rand.nextInt(reloginMaxBackoff) + 1));
          return null;
        } else {
          String msg = "Couldn't setup connection for " +
              UserGroupInformation.getLoginUser().getUserName() +
              " to " + serverPrincipal;
          LOG.warn(msg);
          throw (IOException) new IOException(msg).initCause(ex);
        }
      } else {
        LOG.warn("Exception encountered while connecting to " +
            "the server : " + ex);
      }
      if (ex instanceof RemoteException) {
        throw (RemoteException)ex;
      }
      if (ex instanceof SaslException) {
        String msg = "SASL authentication failed." +
            " The most likely cause is missing or invalid credentials." +
            " Consider 'kinit'.";
        LOG.fatal(msg, ex);
        throw new RuntimeException(msg, ex);
      }
      throw new IOException(ex);
    }
  });
}
691
/**
 * Opens this connection lazily. It is a no-op when the socket already exists.
 *
 * The steps, in order:
 * <ul>
 * <li>Open the socket (with retries) and write the preamble.</li>
 * <li>When enabled, run SASL negotiation, retrying up to MAX_RETRIES times via
 * handleSaslConnectionFailure().</li>
 * <li>Wrap the streams, send the connection header and start the reader thread.</li>
 * </ul>
 *
 * On any failure the connection is marked closed and closed. The server is also added to
 * failedServers, unless the failure was an interrupt.
 *
 * @throws IOException one of:
 *         <ul>
 *         <li>a FailedServerException if the server is on the failed-servers list;</li>
 *         <li>a ConnectionClosingException if this connection is closing;</li>
 *         <li>a DoNotRetryIOException wrapping a LinkageError;</li>
 *         <li>the underlying failure.</li>
 *         </ul>
 */
protected synchronized void setupIOstreams() throws IOException {
  if (socket != null) {
    // Already connected.
    return;
  }

  if (shouldCloseConnection.get()){
    throw new ConnectionClosingException("This connection is closing");
  }

  if (failedServers.isFailedServer(remoteId.getAddress())) {
    // Fail fast on a server recorded as failed (see the catch block below).
    if (LOG.isDebugEnabled()) {
      LOG.debug("Not trying to connect to " + server +
          " this server is in the failed servers list");
    }
    IOException e = new FailedServerException(
        "This server is in the failed servers list: " + server);
    markClosed(e);
    close();
    throw e;
  }

  try {
    if (LOG.isDebugEnabled()) {
      LOG.debug("Connecting to " + server);
    }
    // Retry budget for SASL negotiation failures.
    short numRetries = 0;
    final short MAX_RETRIES = 5;
    Random rand = null;
    while (true) {
      setupConnection();
      InputStream inStream = NetUtils.getInputStream(socket);
      // Output stream with the configured write timeout.
      OutputStream outStream = NetUtils.getOutputStream(socket, writeTO);
      // The preamble is written on the raw stream, before any SASL wrapping.
      writeConnectionHeaderPreamble(outStream);
      if (useSasl) {
        final InputStream in2 = inStream;
        final OutputStream out2 = outStream;
        UserGroupInformation ticket = remoteId.getTicket().getUGI();
        if (authMethod == AuthMethod.KERBEROS) {
          // For a proxy user, Kerberos authentication is done as the real user.
          if (ticket != null && ticket.getRealUser() != null) {
            ticket = ticket.getRealUser();
          }
        }
        boolean continueSasl;
        if (ticket == null) throw new FatalConnectionException("ticket/user is null");
        try {
          continueSasl = ticket.doAs(new PrivilegedExceptionAction<Boolean>() {
            @Override
            public Boolean run() throws IOException {
              return setupSaslConnection(in2, out2);
            }
          });
        } catch (Exception ex) {
          ExceptionUtil.rethrowIfInterrupt(ex);
          if (rand == null) {
            rand = new Random();
          }
          // Either throws, or closes the socket and backs off so the loop reconnects.
          handleSaslConnectionFailure(numRetries++, MAX_RETRIES, ex, rand, ticket);
          continue;
        }
        if (continueSasl) {
          // From now on all traffic goes through the SASL client's streams.
          inStream = saslRpcClient.getInputStream(inStream);
          outStream = saslRpcClient.getOutputStream(outStream);
        } else {
          // saslConnect() returned false: continue without SASL, using SIMPLE auth.
          authMethod = AuthMethod.SIMPLE;
          useSasl = false;
        }
      }
      this.in = new DataInputStream(new BufferedInputStream(inStream));
      synchronized (this.outLock) {
        this.out = new DataOutputStream(new BufferedOutputStream(outStream));
      }
      // The connection header goes through the (possibly SASL-wrapped) buffered stream.
      writeConnectionHeader();

      // Start this connection's reader thread (run()).
      start();
      return;
    }
  } catch (Throwable t) {
    IOException e = ExceptionUtil.asInterrupt(t);
    if (e == null) {
      // Not an interrupt: record the server as failed so that later attempts fail fast.
      failedServers.addToFailedServers(remoteId.address);
      if (t instanceof LinkageError) {
        // A class-linkage problem will not go away on retry.
        e = new DoNotRetryIOException(t);
      } else if (t instanceof IOException) {
        e = (IOException) t;
      } else {
        e = new IOException("Could not set up IO Streams to " + server, t);
      }
    }
    markClosed(e);
    close();
    throw e;
  }
}
793
794
795
796
797 private void writeConnectionHeaderPreamble(OutputStream outStream) throws IOException {
798
799
800
801
802
803 int rpcHeaderLen = HConstants.RPC_HEADER.length;
804 byte [] preamble = new byte [rpcHeaderLen + 2];
805 System.arraycopy(HConstants.RPC_HEADER, 0, preamble, 0, rpcHeaderLen);
806 preamble[rpcHeaderLen] = HConstants.RPC_CURRENT_VERSION;
807 synchronized (this) {
808 preamble[rpcHeaderLen + 1] = authMethod.code;
809 }
810 outStream.write(preamble);
811 outStream.flush();
812 }
813
814
815
816
817 private synchronized void writeConnectionHeader() throws IOException {
818 synchronized (this.outLock) {
819 this.out.writeInt(this.header.getSerializedSize());
820 this.header.writeTo(this.out);
821 this.out.flush();
822 }
823 }
824
825
/**
 * Releases everything held by this connection.
 *
 * It removes the connection from the client's pool, closes the streams and the socket,
 * disposes the SASL client and fails every outstanding call with a
 * ConnectionClosingException. It only acts when the connection is already marked closed
 * (see markClosed()); otherwise it logs an error and returns.
 */
protected synchronized void close() {
  if (!shouldCloseConnection.get()) {
    LOG.error(getName() + ": the connection is not in the closed state");
    return;
  }

  // Leave the pool first, so getConnection() no longer returns this connection.
  synchronized (connections) {
    connections.removeValue(remoteId, this);
  }

  // The out stream is guarded by outLock.
  synchronized(this.outLock) {
    if (this.out != null) {
      IOUtils.closeStream(out);
      this.out = null;
    }
  }
  IOUtils.closeStream(in);
  this.in = null;
  if (this.socket != null) {
    try {
      this.socket.close();
      this.socket = null;
    } catch (IOException e) {
      LOG.error("Error while closing socket", e);
    }
  }

  disposeSasl();

  if (LOG.isTraceEnabled()) {
    LOG.trace(getName() + ": closing ipc connection to " + server);
  }

  // Fail every call still waiting for a response.
  cleanupCalls(true);

  if (LOG.isTraceEnabled()) {
    LOG.trace(getName() + ": ipc connection to " + server + " closed");
  }
}
869
870 protected void tracedWriteRequest(Call call, int priority, Span span) throws IOException {
871 TraceScope ts = Trace.continueSpan(span);
872 try {
873 writeRequest(call, priority, span);
874 } finally {
875 ts.close();
876 }
877 }
878
879
880
881
882
883
884
/**
 * Writes one request to the server.
 *
 * The request consists of the request header (call id, method name, trace info, cell block
 * metadata, priority), the request param and the optional cell block. The connection is
 * opened lazily via setupIOstreams(). The call is registered in 'calls' before the write so
 * that readResponse() can match its response.
 *
 * On a write failure the method:
 * <ul>
 * <li>marks the connection for closing and interrupts the reader thread;</li>
 * <li>closes the connection;</li>
 * <li>rethrows the write exception.</li>
 * </ul>
 *
 * @throws InterruptedIOException if the calling thread was interrupted (the check clears the
 *         interrupt flag)
 * @throws IOException on connection setup or write failure, or if the connection is closing
 */
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="IS2_INCONSISTENT_SYNC",
    justification="Findbugs is misinterpreting locking missing fact that this.outLock is held")
private void writeRequest(Call call, final int priority, Span span) throws IOException {
  RequestHeader.Builder builder = RequestHeader.newBuilder();
  builder.setCallId(call.id);
  if (span != null) {
    builder.setTraceInfo(
        RPCTInfo.newBuilder().setParentId(span.getSpanId()).setTraceId(span.getTraceId()));
  }
  builder.setMethodName(call.md.getName());
  builder.setRequestParam(call.param != null);
  ByteBuffer cellBlock = ipcUtil.buildCellBlock(this.codec, this.compressor, call.cells);
  if (cellBlock != null) {
    CellBlockMeta.Builder cellBlockBuilder = CellBlockMeta.newBuilder();
    cellBlockBuilder.setLength(cellBlock.limit());
    builder.setCellBlockMeta(cellBlockBuilder.build());
  }
  // The priority is only set when it is non-zero.
  if (priority != 0) builder.setPriority(priority);
  RequestHeader header = builder.build();

  // Connect on first use; no-op when already connected.
  setupIOstreams();

  // Cheap early check before taking outLock; it is repeated under the lock below.
  checkIsOpen();
  IOException writeException = null;
  synchronized (this.outLock) {
    // Thread.interrupted() also clears the caller's interrupt flag.
    if (Thread.interrupted()) throw new InterruptedIOException();

    // Register before writing, so readResponse() can find the call when its response arrives.
    calls.put(call.id, call);
    checkIsOpen();

    try {
      call.callStats.setRequestSizeBytes(IPCUtil.write(this.out, header, call.param,
          cellBlock));
    } catch (IOException e) {
      // Flag the connection as closing while still holding outLock. Any later writer then
      // fails its checkIsOpen() under the lock instead of using the broken stream. Also
      // interrupt this connection's reader thread.
      shouldCloseConnection.set(true);
      writeException = e;
      interrupt();
    }
  }

  // markClosed() and close() synchronize on this connection, and other paths (e.g.
  // setupIOstreams(), close()) take outLock while holding that monitor. They are therefore
  // called outside outLock, which keeps the lock order consistent.
  if (writeException != null) {
    markClosed(writeException);
    close();
  }

  // Wake the reader thread waiting in waitForWork(): there is now a call to wait for, or
  // the connection is closing.
  doNotify();

  if (writeException != null) throw writeException;
}
944
/**
 * Wakes every thread waiting on this connection's monitor, notably the reader thread in
 * waitForWork().
 *
 * No state is changed under the lock here, hence the NN_NAKED_NOTIFY suppression. Waiters
 * re-check 'calls' and shouldCloseConnection, which are updated elsewhere.
 */
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NN_NAKED_NOTIFY",
    justification="Presume notifyAll is because we are closing/shutting down")
private synchronized void doNotify() {
  notifyAll();
}
952
953
954
955
/**
 * Reads one response frame and completes the matching call.
 *
 * The frame, as read here, is:
 * <ul>
 * <li>an int total size;</li>
 * <li>a delimited ResponseHeader;</li>
 * <li>then either an exception (carried in the header), or an optional delimited response
 * message followed by an optional cell block.</li>
 * </ul>
 *
 * Frames for unknown or already-done calls are skipped using the total size. A
 * SocketTimeoutException does not close the connection; any other IOException does. Finally,
 * done and timed-out calls are purged from 'calls'.
 */
protected void readResponse() {
  if (shouldCloseConnection.get()) return;
  Call call = null;
  boolean expectedCall = false;
  try {
    // Size of everything that follows in this frame: header plus body plus cell block.
    int totalSize = in.readInt();

    // The header identifies the call and says whether it failed.
    ResponseHeader responseHeader = ResponseHeader.parseDelimitedFrom(in);
    int id = responseHeader.getCallId();
    call = calls.remove(id);
    expectedCall = (call != null && !call.done);
    if (!expectedCall) {
      // Unknown id (e.g. removed by CallSender.remove()) or a call that is already done.
      // Skip the rest of the frame so the stream stays aligned on the next response.
      int readSoFar = IPCUtil.getTotalSizeWhenWrittenDelimited(responseHeader);
      int whatIsLeftToRead = totalSize - readSoFar;
      IOUtils.skipFully(in, whatIsLeftToRead);
      if (call != null) {
        call.callStats.setResponseSizeBytes(totalSize);
        call.callStats.setCallTimeMs(
            EnvironmentEdgeManager.currentTime() - call.callStats.getStartTime());
      }
      return;
    }
    if (responseHeader.hasException()) {
      ExceptionResponse exceptionResponse = responseHeader.getException();
      RemoteException re = createRemoteException(exceptionResponse);
      call.setException(re);
      call.callStats.setResponseSizeBytes(totalSize);
      call.callStats.setCallTimeMs(
          EnvironmentEdgeManager.currentTime() - call.callStats.getStartTime());
      if (isFatalConnectionException(exceptionResponse)) {
        // The server reported a connection-level failure: stop using this connection.
        markClosed(re);
      }
    } else {
      Message value = null;
      if (call.responseDefaultType != null) {
        Builder builder = call.responseDefaultType.newBuilderForType();
        ProtobufUtil.mergeDelimitedFrom(builder, in);
        value = builder.build();
      }
      CellScanner cellBlockScanner = null;
      if (responseHeader.hasCellBlockMeta()) {
        int size = responseHeader.getCellBlockMeta().getLength();
        byte [] cellBlock = new byte[size];
        IOUtils.readFully(this.in, cellBlock, 0, cellBlock.length);
        cellBlockScanner = ipcUtil.createCellScanner(this.codec, this.compressor, cellBlock);
      }
      call.setResponse(value, cellBlockScanner);
      call.callStats.setResponseSizeBytes(totalSize);
      call.callStats.setCallTimeMs(
          EnvironmentEdgeManager.currentTime() - call.callStats.getStartTime());
    }
  } catch (IOException e) {
    if (expectedCall) call.setException(e);
    if (e instanceof SocketTimeoutException) {
      // A read timeout does not close the connection; the reader loop goes back to
      // waitForWork().
      // NOTE(review): if the timeout hits in the middle of a frame, the partly read frame is
      // lost and the next read may be misaligned. Confirm this cannot happen in practice.
      if (LOG.isTraceEnabled()) LOG.trace("ignored", e);
    } else {
      // Any other I/O error leaves the stream in an unknown state: close the connection.
      markClosed(e);
    }
  } finally {
    // Purge done and timed-out calls.
    cleanupCalls(false);
  }
}
1030
1031
1032
1033
1034 private boolean isFatalConnectionException(final ExceptionResponse e) {
1035 return e.getExceptionClassName().
1036 equals(FatalConnectionException.class.getName());
1037 }
1038
1039
1040
1041
1042
1043 private RemoteException createRemoteException(final ExceptionResponse e) {
1044 String innerExceptionClassName = e.getExceptionClassName();
1045 boolean doNotRetry = e.getDoNotRetry();
1046 return e.hasHostname()?
1047
1048 new RemoteWithExtrasException(innerExceptionClassName,
1049 e.getStackTrace(), e.getHostname(), e.getPort(), doNotRetry):
1050 new RemoteWithExtrasException(innerExceptionClassName,
1051 e.getStackTrace(), doNotRetry);
1052 }
1053
1054 protected synchronized boolean markClosed(IOException e) {
1055 if (e == null) throw new NullPointerException();
1056
1057 boolean ret = shouldCloseConnection.compareAndSet(false, true);
1058 if (ret) {
1059 if (LOG.isTraceEnabled()) {
1060 LOG.trace(getName() + ": marking at should close, reason: " + e.getMessage());
1061 }
1062 if (callSender != null) {
1063 callSender.close();
1064 }
1065 notifyAll();
1066 }
1067 return ret;
1068 }
1069
1070
1071
1072
1073
1074
/**
 * Purges finished calls from 'calls'.
 *
 * @param allCalls if true, every call that is not yet done is failed with a
 *        ConnectionClosingException and removed; close() uses this mode. If false, done
 *        calls and calls for which checkAndSetTimeout() returns true are removed. The scan
 *        runs in ascending call-id order (the map is a ConcurrentSkipListMap keyed by id)
 *        and stops at the first call that matches neither case; later calls are left for a
 *        future pass.
 */
protected synchronized void cleanupCalls(boolean allCalls) {
  Iterator<Entry<Integer, Call>> itor = calls.entrySet().iterator();
  while (itor.hasNext()) {
    Call c = itor.next().getValue();
    if (c.done) {
      // Already completed: just forget it.
      itor.remove();
    } else if (allCalls) {
      long waitTime = EnvironmentEdgeManager.currentTime() - c.getStartTime();
      IOException ie = new ConnectionClosingException("Connection to " + getRemoteAddress()
          + " is closing. Call id=" + c.id + ", waitTime=" + waitTime);
      c.setException(ie);
      itor.remove();
    } else if (c.checkAndSetTimeout()) {
      itor.remove();
    } else {
      // Stop at the first live call that has not timed out.
      // NOTE(review): a later call with a shorter timeout is only expired here once the
      // calls before it are gone (the caller's own wait loop also checks its timeout).
      break;
    }
  }
}
1098 }
1099
1100
1101
1102
1103
1104
1105
1106
/**
 * Test-visible constructor with an explicit socket factory, no local bind address and no
 * metrics.
 *
 * @param conf configuration
 * @param clusterId cluster id
 * @param factory socket factory used for every connection
 */
@VisibleForTesting
RpcClientImpl(Configuration conf, String clusterId, SocketFactory factory) {
  this(conf, clusterId, factory, null, null);
}
1111
1112
1113
1114
1115
1116
1117
1118
1119
/**
 * Main constructor; the other constructors in this class delegate to it.
 *
 * @param conf configuration; also determines the connection pool type and size
 * @param clusterId cluster id, passed to the superclass
 * @param factory socket factory used for every connection
 * @param localAddr local address to bind client sockets to, or null for none
 * @param metrics connection metrics, passed to the superclass; may be null
 */
RpcClientImpl(Configuration conf, String clusterId, SocketFactory factory,
    SocketAddress localAddr, MetricsConnection metrics) {
  super(conf, clusterId, localAddr, metrics);

  this.failedServers = new FailedServers(conf);
  this.socketFactory = factory;
  this.connections =
      new PoolMap<ConnectionId, Connection>(getPoolType(conf), getPoolSize(conf));
}
1128
1129
1130
1131
1132
/**
 * Test-visible constructor. It uses the default socket factory for {@code conf}, with no
 * local bind address and no metrics.
 *
 * @param conf configuration
 * @param clusterId cluster id
 */
@VisibleForTesting
RpcClientImpl(Configuration conf, String clusterId) {
  this(conf, clusterId, NetUtils.getDefaultSocketFactory(conf), null, null);
}
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
/**
 * Creates an RPC client that uses the default socket factory for {@code conf}.
 *
 * @param conf configuration
 * @param clusterId cluster id, passed to the superclass
 * @param localAddr local address to bind client sockets to, or null for none
 * @param metrics connection metrics, passed to the superclass; may be null
 */
public RpcClientImpl(Configuration conf, String clusterId, SocketAddress localAddr,
    MetricsConnection metrics) {
  this(conf, clusterId, NetUtils.getDefaultSocketFactory(conf), localAddr, metrics);
}
1152
1153
1154
/**
 * Stops the client. Only the first call stops anything; later calls return immediately.
 *
 * The method:
 * <ul>
 * <li>interrupts every connection's reader thread and writer thread;</li>
 * <li>directly closes connections whose reader thread is not alive, since their run()
 * will not close them;</li>
 * <li>polls until the connection pool is empty.</li>
 * </ul>
 * If interrupted while waiting, it restores the interrupt flag and returns early.
 */
@Override
public void close() {
  if (LOG.isDebugEnabled()) LOG.debug("Stopping rpc client");
  if (!running.compareAndSet(true, false)) return;

  Set<Connection> connsToClose = null;
  // Wake up every connection so that it notices the client is stopping.
  synchronized (connections) {
    for (Connection conn : connections.values()) {
      conn.interrupt();
      if (conn.callSender != null) {
        conn.callSender.interrupt();
      }

      // A connection whose thread is not alive will not close itself from run(). Collect it
      // and close it below, outside the pool lock: Connection.close() takes the
      // connection's monitor before the pool's.
      if (!conn.isAlive()) {
        if (connsToClose == null) {
          connsToClose = new HashSet<Connection>();
        }
        connsToClose.add(conn);
      }
    }
  }
  if (connsToClose != null) {
    for (Connection conn : connsToClose) {
      if (conn.markClosed(new InterruptedIOException("RpcClient is closing"))) {
        conn.close();
      }
    }
  }
  // Wait until every connection has closed (each removes itself from the pool).
  while (!connections.isEmpty()) {
    try {
      Thread.sleep(10);
    } catch (InterruptedException e) {
      LOG.info("Interrupted while stopping the client. We still have " + connections.size() +
          " connections.");
      Thread.currentThread().interrupt();
      return;
    }
  }
}
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211 @Override
1212 protected Pair<Message, CellScanner> call(PayloadCarryingRpcController pcrc, MethodDescriptor md,
1213 Message param, Message returnType, User ticket, InetSocketAddress addr,
1214 MetricsConnection.CallStats callStats)
1215 throws IOException, InterruptedException {
1216 if (pcrc == null) {
1217 pcrc = new PayloadCarryingRpcController();
1218 }
1219 CellScanner cells = pcrc.cellScanner();
1220
1221 final Call call = new Call(this.callIdCnt.getAndIncrement(), md, param, cells, returnType,
1222 pcrc.getCallTimeout(), MetricsConnection.newCallStats());
1223
1224 final Connection connection = getConnection(ticket, call, addr);
1225
1226 final CallFuture cts;
1227 if (connection.callSender != null) {
1228 cts = connection.callSender.sendCall(call, pcrc.getPriority(), Trace.currentSpan());
1229 pcrc.notifyOnCancel(new RpcCallback<Object>() {
1230 @Override
1231 public void run(Object parameter) {
1232 connection.callSender.remove(cts);
1233 }
1234 });
1235 if (pcrc.isCanceled()) {
1236
1237 call.callComplete();
1238 return new Pair<Message, CellScanner>(call.response, call.cells);
1239 }
1240 } else {
1241 cts = null;
1242 connection.tracedWriteRequest(call, pcrc.getPriority(), Trace.currentSpan());
1243 }
1244
1245 while (!call.done) {
1246 if (call.checkAndSetTimeout()) {
1247 if (cts != null) connection.callSender.remove(cts);
1248 break;
1249 }
1250 if (connection.shouldCloseConnection.get()) {
1251 throw new ConnectionClosingException("Call id=" + call.id +
1252 " on server " + addr + " aborted: connection is closing");
1253 }
1254 try {
1255 synchronized (call) {
1256 if (call.done) break;
1257 call.wait(Math.min(call.remainingTime(), 1000) + 1);
1258 }
1259 } catch (InterruptedException e) {
1260 call.setException(new InterruptedIOException());
1261 if (cts != null) connection.callSender.remove(cts);
1262 throw e;
1263 }
1264 }
1265
1266 if (call.error != null) {
1267 if (call.error instanceof RemoteException) {
1268 call.error.fillInStackTrace();
1269 throw call.error;
1270 }
1271
1272 throw wrapException(addr, call.error);
1273 }
1274
1275 return new Pair<Message, CellScanner>(call.response, call.cells);
1276 }
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287 @Override
1288 public void cancelConnections(ServerName sn) {
1289 synchronized (connections) {
1290 for (Connection connection : connections.values()) {
1291 if (connection.isAlive() &&
1292 connection.getRemoteAddress().getPort() == sn.getPort() &&
1293 connection.getRemoteAddress().getHostName().equals(sn.getHostname())) {
1294 LOG.info("The server on " + sn.toString() +
1295 " is dead - stopping the connection " + connection.remoteId);
1296 connection.interrupt();
1297
1298 }
1299 }
1300 }
1301 }
1302
1303
1304
1305
1306
1307 protected Connection getConnection(User ticket, Call call, InetSocketAddress addr)
1308 throws IOException {
1309 if (!running.get()) throw new StoppedRpcClientException();
1310 Connection connection;
1311 ConnectionId remoteId =
1312 new ConnectionId(ticket, call.md.getService().getName(), addr);
1313 synchronized (connections) {
1314 connection = connections.get(remoteId);
1315 if (connection == null) {
1316 connection = createConnection(remoteId, this.codec, this.compressor);
1317 connections.put(remoteId, connection);
1318 }
1319 }
1320
1321 return connection;
1322 }
1323 }