1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.ipc;
19
20 import io.netty.bootstrap.Bootstrap;
21 import io.netty.buffer.ByteBuf;
22 import io.netty.buffer.ByteBufOutputStream;
23 import io.netty.channel.Channel;
24 import io.netty.channel.ChannelFuture;
25 import io.netty.channel.ChannelFutureListener;
26 import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
27 import io.netty.util.Timeout;
28 import io.netty.util.TimerTask;
29 import io.netty.util.concurrent.GenericFutureListener;
30 import io.netty.util.concurrent.Promise;
31
32 import java.io.IOException;
33 import java.net.ConnectException;
34 import java.net.InetSocketAddress;
35 import java.net.SocketException;
36 import java.nio.ByteBuffer;
37 import java.security.PrivilegedExceptionAction;
38 import java.util.ArrayList;
39 import java.util.HashMap;
40 import java.util.Iterator;
41 import java.util.List;
42 import java.util.Map;
43 import java.util.Random;
44 import java.util.concurrent.TimeUnit;
45
46 import javax.security.sasl.SaslException;
47
48 import org.apache.commons.logging.Log;
49 import org.apache.commons.logging.LogFactory;
50 import org.apache.hadoop.hbase.HConstants;
51 import org.apache.hadoop.hbase.classification.InterfaceAudience;
52 import org.apache.hadoop.hbase.client.MetricsConnection;
53 import org.apache.hadoop.hbase.exceptions.ConnectionClosingException;
54 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
55 import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos;
56 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos;
57 import org.apache.hadoop.hbase.protobuf.generated.TracingProtos;
58 import org.apache.hadoop.hbase.security.AuthMethod;
59 import org.apache.hadoop.hbase.security.SaslClientHandler;
60 import org.apache.hadoop.hbase.security.SaslUtil;
61 import org.apache.hadoop.hbase.security.SecurityInfo;
62 import org.apache.hadoop.hbase.security.User;
63 import org.apache.hadoop.hbase.security.token.AuthenticationTokenSelector;
64 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
65 import org.apache.hadoop.io.Text;
66 import org.apache.hadoop.ipc.RemoteException;
67 import org.apache.hadoop.security.SecurityUtil;
68 import org.apache.hadoop.security.UserGroupInformation;
69 import org.apache.hadoop.security.token.Token;
70 import org.apache.hadoop.security.token.TokenIdentifier;
71 import org.apache.hadoop.security.token.TokenSelector;
72 import org.apache.htrace.Span;
73 import org.apache.htrace.Trace;
74
75 import com.google.protobuf.Descriptors;
76 import com.google.protobuf.Message;
77 import com.google.protobuf.RpcCallback;
78
79
80
81
82 @InterfaceAudience.Private
83 public class AsyncRpcChannel {
/** Logger shared by all channel instances. */
private static final Log LOG = LogFactory.getLog(AsyncRpcChannel.class.getName());

/** Number of SASL failures after which a Kerberos re-login is no longer attempted. */
private static final int MAX_SASL_RETRIES = 5;

/** Token selectors, keyed by the token kind announced by the service's security info. */
protected static final Map<AuthenticationProtos.TokenIdentifier.Kind,
    TokenSelector<? extends TokenIdentifier>> tokenHandlers = new HashMap<>();

static {
  tokenHandlers.put(AuthenticationProtos.TokenIdentifier.Kind.HBASE_AUTH_TOKEN,
      new AuthenticationTokenSelector());
}

/** The RPC client that owns this channel. */
final AsyncRpcClient client;

/** Current Netty channel; reassigned whenever a (re)connect attempt succeeds. */
private Channel channel;

/** Human readable label used in log messages. */
String name;
/** User on whose behalf calls are sent. */
final User ticket;
/** Name of the remote service this channel talks to. */
final String serviceName;
/** Remote address this channel connects to. */
final InetSocketAddress address;

/** Failed connect attempts whose cause was not a SocketException. */
private int ioFailureCounter = 0;
/** Failed connect attempts whose cause was a SocketException. */
private int connectFailureCounter = 0;

// Authentication state, (re)computed by setupAuthorization() on every successful connect.
boolean useSasl;
AuthMethod authMethod;
private int reloginMaxBackoff;
private Token<? extends TokenIdentifier> token;
private String serverPrincipal;

/**
 * Calls that have been accepted but not yet completed, keyed by call id. This map is also the
 * monitor that guards {@link #connected}, {@link #closed} and the pending-call bookkeeping.
 */
private final Map<Integer, AsyncCall> pendingCalls = new HashMap<>();
/** Set once the connection header has been written successfully. */
private boolean connected = false;
/** Set once {@link #close(Throwable)} has torn the channel down. */
private boolean closed = false;

/** Handle of the currently scheduled timeout sweep, or {@code null} when none is scheduled. */
private Timeout cleanupTimer;

/** Timer task that sweeps timed-out calls out of {@link #pendingCalls}. */
private final TimerTask timeoutTask = new TimerTask() {
  @Override
  public void run(Timeout timeout) throws Exception {
    cleanupCalls();
  }
};
130
131
132
133
134
135
136
137
138
139
140 public AsyncRpcChannel(Bootstrap bootstrap, final AsyncRpcClient client, User ticket, String
141 serviceName, InetSocketAddress address) {
142 this.client = client;
143
144 this.ticket = ticket;
145 this.serviceName = serviceName;
146 this.address = address;
147
148 this.channel = connect(bootstrap).channel();
149
150 name = ("IPC Client (" + channel.hashCode() + ") to " +
151 address.toString() +
152 ((ticket == null) ?
153 " from unknown user" :
154 (" from " + ticket.getName())));
155 }
156
157
158
159
160
161
162
163 private ChannelFuture connect(final Bootstrap bootstrap) {
164 return bootstrap.remoteAddress(address).connect()
165 .addListener(new GenericFutureListener<ChannelFuture>() {
166 @Override
167 public void operationComplete(final ChannelFuture f) throws Exception {
168 if (!f.isSuccess()) {
169 if (f.cause() instanceof SocketException) {
170 retryOrClose(bootstrap, connectFailureCounter++, f.cause());
171 } else {
172 retryOrClose(bootstrap, ioFailureCounter++, f.cause());
173 }
174 return;
175 }
176 channel = f.channel();
177
178 setupAuthorization();
179
180 ByteBuf b = channel.alloc().directBuffer(6);
181 createPreamble(b, authMethod);
182 channel.writeAndFlush(b).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
183 if (useSasl) {
184 UserGroupInformation ticket = AsyncRpcChannel.this.ticket.getUGI();
185 if (authMethod == AuthMethod.KERBEROS) {
186 if (ticket != null && ticket.getRealUser() != null) {
187 ticket = ticket.getRealUser();
188 }
189 }
190 SaslClientHandler saslHandler;
191 if (ticket == null) {
192 throw new FatalConnectionException("ticket/user is null");
193 }
194 final UserGroupInformation realTicket = ticket;
195 saslHandler = ticket.doAs(new PrivilegedExceptionAction<SaslClientHandler>() {
196 @Override
197 public SaslClientHandler run() throws IOException {
198 return getSaslHandler(realTicket, bootstrap);
199 }
200 });
201 if (saslHandler != null) {
202
203 channel.pipeline().addFirst(saslHandler);
204 } else {
205
206 authMethod = AuthMethod.SIMPLE;
207 useSasl = false;
208 }
209 } else {
210 startHBaseConnection(f.channel());
211 }
212 }
213 });
214 }
215
216
217
218
219
220
221 private void startHBaseConnection(Channel ch) {
222 ch.pipeline()
223 .addLast("frameDecoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
224 ch.pipeline().addLast(new AsyncServerResponseHandler(this));
225 try {
226 writeChannelHeader(ch).addListener(new GenericFutureListener<ChannelFuture>() {
227 @Override
228 public void operationComplete(ChannelFuture future) throws Exception {
229 if (!future.isSuccess()) {
230 close(future.cause());
231 return;
232 }
233 List<AsyncCall> callsToWrite;
234 synchronized (pendingCalls) {
235 connected = true;
236 callsToWrite = new ArrayList<AsyncCall>(pendingCalls.values());
237 }
238 for (AsyncCall call : callsToWrite) {
239 writeRequest(call);
240 }
241 }
242 });
243 } catch (IOException e) {
244 close(e);
245 }
246 }
247
248
249
250
251
252
253
254 private SaslClientHandler getSaslHandler(final UserGroupInformation realTicket,
255 final Bootstrap bootstrap) throws IOException {
256 return new SaslClientHandler(realTicket, authMethod, token, serverPrincipal,
257 client.fallbackAllowed, client.conf.get("hbase.rpc.protection",
258 SaslUtil.QualityOfProtection.AUTHENTICATION.name().toLowerCase()),
259 new SaslClientHandler.SaslExceptionHandler() {
260 @Override
261 public void handle(int retryCount, Random random, Throwable cause) {
262 try {
263
264 handleSaslConnectionFailure(retryCount, cause, realTicket);
265
266
267 client.newTimeout(new TimerTask() {
268 @Override
269 public void run(Timeout timeout) throws Exception {
270 connect(bootstrap);
271 }
272 }, random.nextInt(reloginMaxBackoff) + 1, TimeUnit.MILLISECONDS);
273 } catch (IOException | InterruptedException e) {
274 close(e);
275 }
276 }
277 }, new SaslClientHandler.SaslSuccessfulConnectHandler() {
278 @Override
279 public void onSuccess(Channel channel) {
280 startHBaseConnection(channel);
281 }
282 });
283 }
284
285
286
287
288
289
290
291
292 private void retryOrClose(final Bootstrap bootstrap, int connectCounter, Throwable e) {
293 if (connectCounter < client.maxRetries) {
294 client.newTimeout(new TimerTask() {
295 @Override public void run(Timeout timeout) throws Exception {
296 connect(bootstrap);
297 }
298 }, client.failureSleep, TimeUnit.MILLISECONDS);
299 } else {
300 client.failedServers.addToFailedServers(address);
301 close(e);
302 }
303 }
304
305
306
307
308
309
310
311
312 public Promise<Message> callMethod(final Descriptors.MethodDescriptor method,
313 final PayloadCarryingRpcController controller, final Message request,
314 final Message responsePrototype, MetricsConnection.CallStats callStats) {
315 final AsyncCall call =
316 new AsyncCall(channel.eventLoop(), client.callIdCnt.getAndIncrement(), method, request,
317 controller, responsePrototype, callStats);
318 controller.notifyOnCancel(new RpcCallback<Object>() {
319 @Override
320 public void run(Object parameter) {
321
322 synchronized (pendingCalls) {
323 pendingCalls.remove(call.id);
324 }
325 }
326 });
327
328 if (controller.isCanceled()) {
329
330 call.cancel(true);
331 return call;
332 }
333
334 synchronized (pendingCalls) {
335 if (closed) {
336 Promise<Message> promise = channel.eventLoop().newPromise();
337 promise.setFailure(new ConnectException());
338 return promise;
339 }
340 pendingCalls.put(call.id, call);
341
342 if (cleanupTimer == null && call.getRpcTimeout() > 0) {
343 cleanupTimer =
344 client.newTimeout(timeoutTask, call.getRpcTimeout(),
345 TimeUnit.MILLISECONDS);
346 }
347 if (!connected) {
348 return call;
349 }
350 }
351 writeRequest(call);
352 return call;
353 }
354
355 AsyncCall removePendingCall(int id) {
356 synchronized (pendingCalls) {
357 return pendingCalls.remove(id);
358 }
359 }
360
361
362
363
364
365
366
367
368 private ChannelFuture writeChannelHeader(Channel channel) throws IOException {
369 RPCProtos.ConnectionHeader.Builder headerBuilder =
370 RPCProtos.ConnectionHeader.newBuilder().setServiceName(serviceName);
371
372 RPCProtos.UserInformation userInfoPB = buildUserInfo(ticket.getUGI(), authMethod);
373 if (userInfoPB != null) {
374 headerBuilder.setUserInfo(userInfoPB);
375 }
376
377 if (client.codec != null) {
378 headerBuilder.setCellBlockCodecClass(client.codec.getClass().getCanonicalName());
379 }
380 if (client.compressor != null) {
381 headerBuilder.setCellBlockCompressorClass(client.compressor.getClass().getCanonicalName());
382 }
383
384 headerBuilder.setVersionInfo(ProtobufUtil.getVersionInfo());
385 RPCProtos.ConnectionHeader header = headerBuilder.build();
386
387
388 int totalSize = IPCUtil.getTotalSizeWhenWrittenDelimited(header);
389
390 ByteBuf b = channel.alloc().directBuffer(totalSize);
391
392 b.writeInt(header.getSerializedSize());
393 b.writeBytes(header.toByteArray());
394
395 return channel.writeAndFlush(b);
396 }
397
398
399
400
401
402
403 private void writeRequest(final AsyncCall call) {
404 try {
405 final RPCProtos.RequestHeader.Builder requestHeaderBuilder = RPCProtos.RequestHeader
406 .newBuilder();
407 requestHeaderBuilder.setCallId(call.id)
408 .setMethodName(call.method.getName()).setRequestParam(call.param != null);
409
410 if (Trace.isTracing()) {
411 Span s = Trace.currentSpan();
412 requestHeaderBuilder.setTraceInfo(TracingProtos.RPCTInfo.newBuilder().
413 setParentId(s.getSpanId()).setTraceId(s.getTraceId()));
414 }
415
416 ByteBuffer cellBlock = client.buildCellBlock(call.controller.cellScanner());
417 if (cellBlock != null) {
418 final RPCProtos.CellBlockMeta.Builder cellBlockBuilder = RPCProtos.CellBlockMeta
419 .newBuilder();
420 cellBlockBuilder.setLength(cellBlock.limit());
421 requestHeaderBuilder.setCellBlockMeta(cellBlockBuilder.build());
422 }
423
424 if (call.controller.getPriority() != 0) {
425 requestHeaderBuilder.setPriority(call.controller.getPriority());
426 }
427
428 RPCProtos.RequestHeader rh = requestHeaderBuilder.build();
429
430 int totalSize = IPCUtil.getTotalSizeWhenWrittenDelimited(rh, call.param);
431 if (cellBlock != null) {
432 totalSize += cellBlock.remaining();
433 }
434
435 ByteBuf b = channel.alloc().directBuffer(4 + totalSize);
436 try(ByteBufOutputStream out = new ByteBufOutputStream(b)) {
437 call.callStats.setRequestSizeBytes(IPCUtil.write(out, rh, call.param, cellBlock));
438 }
439
440 channel.writeAndFlush(b).addListener(new CallWriteListener(this, call.id));
441 } catch (IOException e) {
442 close(e);
443 }
444 }
445
446
447
448
449
450
451 private void setupAuthorization() throws IOException {
452 SecurityInfo securityInfo = SecurityInfo.getInfo(serviceName);
453 this.useSasl = client.userProvider.isHBaseSecurityEnabled();
454
455 this.token = null;
456 if (useSasl && securityInfo != null) {
457 AuthenticationProtos.TokenIdentifier.Kind tokenKind = securityInfo.getTokenKind();
458 if (tokenKind != null) {
459 TokenSelector<? extends TokenIdentifier> tokenSelector = tokenHandlers.get(tokenKind);
460 if (tokenSelector != null) {
461 token = tokenSelector
462 .selectToken(new Text(client.clusterId), ticket.getUGI().getTokens());
463 } else if (LOG.isDebugEnabled()) {
464 LOG.debug("No token selector found for type " + tokenKind);
465 }
466 }
467 String serverKey = securityInfo.getServerPrincipal();
468 if (serverKey == null) {
469 throw new IOException("Can't obtain server Kerberos config key from SecurityInfo");
470 }
471 this.serverPrincipal = SecurityUtil.getServerPrincipal(client.conf.get(serverKey),
472 address.getAddress().getCanonicalHostName().toLowerCase());
473 if (LOG.isDebugEnabled()) {
474 LOG.debug("RPC Server Kerberos principal name for service=" + serviceName + " is "
475 + serverPrincipal);
476 }
477 }
478
479 if (!useSasl) {
480 authMethod = AuthMethod.SIMPLE;
481 } else if (token != null) {
482 authMethod = AuthMethod.DIGEST;
483 } else {
484 authMethod = AuthMethod.KERBEROS;
485 }
486
487 if (LOG.isDebugEnabled()) {
488 LOG.debug("Use " + authMethod + " authentication for service " + serviceName +
489 ", sasl=" + useSasl);
490 }
491 reloginMaxBackoff = client.conf.getInt("hbase.security.relogin.maxbackoff", 5000);
492 }
493
494
495
496
497
498
499
500
501 private RPCProtos.UserInformation buildUserInfo(UserGroupInformation ugi, AuthMethod authMethod) {
502 if (ugi == null || authMethod == AuthMethod.DIGEST) {
503
504 return null;
505 }
506 RPCProtos.UserInformation.Builder userInfoPB = RPCProtos.UserInformation.newBuilder();
507 if (authMethod == AuthMethod.KERBEROS) {
508
509 userInfoPB.setEffectiveUser(ugi.getUserName());
510 } else if (authMethod == AuthMethod.SIMPLE) {
511
512 userInfoPB.setEffectiveUser(ugi.getUserName());
513 if (ugi.getRealUser() != null) {
514 userInfoPB.setRealUser(ugi.getRealUser().getUserName());
515 }
516 }
517 return userInfoPB.build();
518 }
519
520
521
522
523
524
525
526 private void createPreamble(ByteBuf byteBuf, AuthMethod authMethod) {
527 byteBuf.writeBytes(HConstants.RPC_HEADER);
528 byteBuf.writeByte(HConstants.RPC_CURRENT_VERSION);
529 byteBuf.writeByte(authMethod.code);
530 }
531
532
533
534
535
536
537 public void close(final Throwable e) {
538 client.removeConnection(this);
539
540
541 channel.eventLoop().execute(new Runnable() {
542 @Override
543 public void run() {
544 List<AsyncCall> toCleanup;
545 synchronized (pendingCalls) {
546 if (closed) {
547 return;
548 }
549 closed = true;
550 toCleanup = new ArrayList<AsyncCall>(pendingCalls.values());
551 pendingCalls.clear();
552 }
553 IOException closeException = null;
554 if (e != null) {
555 if (e instanceof IOException) {
556 closeException = (IOException) e;
557 } else {
558 closeException = new IOException(e);
559 }
560 }
561
562 if (LOG.isDebugEnabled() && closeException != null) {
563 LOG.debug(name + ": closing ipc connection to " + address, closeException);
564 }
565 if (cleanupTimer != null) {
566 cleanupTimer.cancel();
567 cleanupTimer = null;
568 }
569 for (AsyncCall call : toCleanup) {
570 call.setFailed(closeException != null ? closeException : new ConnectionClosingException(
571 "Call id=" + call.id + " on server " + address + " aborted: connection is closing"));
572 }
573 channel.disconnect().addListener(ChannelFutureListener.CLOSE);
574 if (LOG.isDebugEnabled()) {
575 LOG.debug(name + ": closed");
576 }
577 }
578 });
579 }
580
581
582
583
584 private void cleanupCalls() {
585 List<AsyncCall> toCleanup = new ArrayList<AsyncCall>();
586 long currentTime = EnvironmentEdgeManager.currentTime();
587 long nextCleanupTaskDelay = -1L;
588 synchronized (pendingCalls) {
589 for (Iterator<AsyncCall> iter = pendingCalls.values().iterator(); iter.hasNext();) {
590 AsyncCall call = iter.next();
591 long timeout = call.getRpcTimeout();
592 if (timeout > 0) {
593 if (currentTime - call.getStartTime() >= timeout) {
594 iter.remove();
595 toCleanup.add(call);
596 } else {
597 if (nextCleanupTaskDelay < 0 || timeout < nextCleanupTaskDelay) {
598 nextCleanupTaskDelay = timeout;
599 }
600 }
601 }
602 }
603 if (nextCleanupTaskDelay > 0) {
604 cleanupTimer =
605 client.newTimeout(timeoutTask, nextCleanupTaskDelay,
606 TimeUnit.MILLISECONDS);
607 } else {
608 cleanupTimer = null;
609 }
610 }
611 for (AsyncCall call : toCleanup) {
612 call.setFailed(new CallTimeoutException("Call id=" + call.id + ", waitTime="
613 + (currentTime - call.getStartTime()) + ", rpcTimeout=" + call.getRpcTimeout()));
614 }
615 }
616
617
618
619
620
621
622 public boolean isAlive() {
623 return channel.isOpen();
624 }
625
626
627
628
629
630
631
632 private synchronized boolean shouldAuthenticateOverKrb() throws IOException {
633 UserGroupInformation loginUser = UserGroupInformation.getLoginUser();
634 UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
635 UserGroupInformation realUser = currentUser.getRealUser();
636 return authMethod == AuthMethod.KERBEROS &&
637 loginUser != null &&
638
639 loginUser.hasKerberosCredentials() &&
640
641
642 (loginUser.equals(currentUser) || loginUser.equals(realUser));
643 }
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669 private void handleSaslConnectionFailure(final int currRetries, final Throwable ex,
670 final UserGroupInformation user) throws IOException, InterruptedException {
671 user.doAs(new PrivilegedExceptionAction<Void>() {
672 public Void run() throws IOException, InterruptedException {
673 if (shouldAuthenticateOverKrb()) {
674 if (currRetries < MAX_SASL_RETRIES) {
675 LOG.debug("Exception encountered while connecting to the server : " + ex);
676
677 if (UserGroupInformation.isLoginKeytabBased()) {
678 UserGroupInformation.getLoginUser().reloginFromKeytab();
679 } else {
680 UserGroupInformation.getLoginUser().reloginFromTicketCache();
681 }
682
683
684 return null;
685 } else {
686 String msg = "Couldn't setup connection for " +
687 UserGroupInformation.getLoginUser().getUserName() +
688 " to " + serverPrincipal;
689 LOG.warn(msg);
690 throw (IOException) new IOException(msg).initCause(ex);
691 }
692 } else {
693 LOG.warn("Exception encountered while connecting to " +
694 "the server : " + ex);
695 }
696 if (ex instanceof RemoteException) {
697 throw (RemoteException) ex;
698 }
699 if (ex instanceof SaslException) {
700 String msg = "SASL authentication failed." +
701 " The most likely cause is missing or invalid credentials." +
702 " Consider 'kinit'.";
703 LOG.fatal(msg, ex);
704 throw new RuntimeException(msg, ex);
705 }
706 throw new IOException(ex);
707 }
708 });
709 }
710
711 public int getConnectionHashCode() {
712 return ConnectionId.hashCode(ticket, serviceName, address);
713 }
714
715 @Override
716 public int hashCode() {
717 return getConnectionHashCode();
718 }
719
720 @Override
721 public boolean equals(Object obj) {
722 if (obj instanceof AsyncRpcChannel) {
723 AsyncRpcChannel channel = (AsyncRpcChannel) obj;
724 return channel.hashCode() == obj.hashCode();
725 }
726 return false;
727 }
728
729
730 @Override
731 public String toString() {
732 return this.address.toString() + "/" + this.serviceName + "/" + this.ticket;
733 }
734
735
736
737
738 private static final class CallWriteListener implements ChannelFutureListener {
739 private final AsyncRpcChannel rpcChannel;
740 private final int id;
741
742 public CallWriteListener(AsyncRpcChannel asyncRpcChannel, int id) {
743 this.rpcChannel = asyncRpcChannel;
744 this.id = id;
745 }
746
747 @Override
748 public void operationComplete(ChannelFuture future) throws Exception {
749 if (!future.isSuccess()) {
750 AsyncCall call = rpcChannel.removePendingCall(id);
751 if (call != null) {
752 if (future.cause() instanceof IOException) {
753 call.setFailed((IOException) future.cause());
754 } else {
755 call.setFailed(new IOException(future.cause()));
756 }
757 }
758 }
759 }
760 }
761 }