1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.thrift2;
20
21 import java.io.IOException;
22 import java.net.InetAddress;
23 import java.net.InetSocketAddress;
24 import java.net.UnknownHostException;
25 import java.security.PrivilegedAction;
26 import java.util.HashMap;
27 import java.util.List;
28 import java.util.Map;
29 import java.util.concurrent.ExecutorService;
30 import java.util.concurrent.LinkedBlockingQueue;
31 import java.util.concurrent.ThreadPoolExecutor;
32 import java.util.concurrent.TimeUnit;
33
34 import javax.security.auth.callback.Callback;
35 import javax.security.auth.callback.UnsupportedCallbackException;
36 import javax.security.sasl.AuthorizeCallback;
37 import javax.security.sasl.Sasl;
38 import javax.security.sasl.SaslServer;
39
40 import org.apache.commons.cli.CommandLine;
41 import org.apache.commons.cli.CommandLineParser;
42 import org.apache.commons.cli.HelpFormatter;
43 import org.apache.commons.cli.Option;
44 import org.apache.commons.cli.OptionGroup;
45 import org.apache.commons.cli.Options;
46 import org.apache.commons.cli.ParseException;
47 import org.apache.commons.cli.PosixParser;
48 import org.apache.commons.logging.Log;
49 import org.apache.commons.logging.LogFactory;
50 import org.apache.hadoop.hbase.classification.InterfaceAudience;
51 import org.apache.hadoop.conf.Configuration;
52 import org.apache.hadoop.hbase.HBaseConfiguration;
53 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
54 import org.apache.hadoop.hbase.filter.ParseFilter;
55 import org.apache.hadoop.hbase.http.InfoServer;
56 import org.apache.hadoop.hbase.security.SaslUtil;
57 import org.apache.hadoop.hbase.security.SecurityUtil;
58 import org.apache.hadoop.hbase.security.UserProvider;
59 import org.apache.hadoop.hbase.thrift.CallQueue;
60 import org.apache.hadoop.hbase.thrift.CallQueue.Call;
61 import org.apache.hadoop.hbase.thrift.ThriftMetrics;
62 import org.apache.hadoop.hbase.thrift2.generated.THBaseService;
63 import org.apache.hadoop.hbase.util.DNS;
64 import org.apache.hadoop.hbase.util.Strings;
65 import org.apache.hadoop.security.UserGroupInformation;
66 import org.apache.hadoop.security.SaslRpcServer.SaslGssCallbackHandler;
67 import org.apache.hadoop.util.GenericOptionsParser;
68 import org.apache.thrift.TException;
69 import org.apache.thrift.TProcessor;
70 import org.apache.thrift.protocol.TBinaryProtocol;
71 import org.apache.thrift.protocol.TCompactProtocol;
72 import org.apache.thrift.protocol.TProtocol;
73 import org.apache.thrift.protocol.TProtocolFactory;
74 import org.apache.thrift.server.THsHaServer;
75 import org.apache.thrift.server.TNonblockingServer;
76 import org.apache.thrift.server.TServer;
77 import org.apache.thrift.server.TThreadPoolServer;
78 import org.apache.thrift.transport.TFramedTransport;
79 import org.apache.thrift.transport.TNonblockingServerSocket;
80 import org.apache.thrift.transport.TNonblockingServerTransport;
81 import org.apache.thrift.transport.TSaslServerTransport;
82 import org.apache.thrift.transport.TServerSocket;
83 import org.apache.thrift.transport.TServerTransport;
84 import org.apache.thrift.transport.TTransportException;
85 import org.apache.thrift.transport.TTransportFactory;
86
87 import com.google.common.util.concurrent.ThreadFactoryBuilder;
88
89
90
91
92
93 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
94 @SuppressWarnings({ "rawtypes", "unchecked" })
95 public class ThriftServer {
96 private static final Log log = LogFactory.getLog(ThriftServer.class);
97
98
99
100
101
102
103
104
105
106
107 static final String THRIFT_QOP_KEY = "hbase.thrift.security.qop";
108
109 public static final int DEFAULT_LISTEN_PORT = 9090;
110
111 private static final String READ_TIMEOUT_OPTION = "readTimeout";
112
113 static final String BACKLOG_CONF_KEY = "hbase.regionserver.thrift.backlog";
114
115
116
117
118
119
120 public static final String THRIFT_SERVER_SOCKET_READ_TIMEOUT_KEY =
121 "hbase.thrift.server.socket.read.timeout";
122 public static final int THRIFT_SERVER_SOCKET_READ_TIMEOUT_DEFAULT = 60000;
123
/**
 * Creates a server object. The instance carries no state; all of the work in this class
 * is done by its static methods.
 */
public ThriftServer() {
  super();
}
126
127 private static void printUsage() {
128 HelpFormatter formatter = new HelpFormatter();
129 formatter.printHelp("Thrift", null, getOptions(),
130 "To start the Thrift server run 'bin/hbase-daemon.sh start thrift2'\n" +
131 "To shutdown the thrift server run 'bin/hbase-daemon.sh stop thrift2' or" +
132 " send a kill signal to the thrift server pid",
133 true);
134 }
135
136 private static Options getOptions() {
137 Options options = new Options();
138 options.addOption("b", "bind", true,
139 "Address to bind the Thrift server to. [default: 0.0.0.0]");
140 options.addOption("p", "port", true, "Port to bind to [default: " + DEFAULT_LISTEN_PORT + "]");
141 options.addOption("f", "framed", false, "Use framed transport");
142 options.addOption("c", "compact", false, "Use the compact protocol");
143 options.addOption("w", "workers", true, "How many worker threads to use.");
144 options.addOption("h", "help", false, "Print help information");
145 options.addOption(null, "infoport", true, "Port for web UI");
146 options.addOption("t", READ_TIMEOUT_OPTION, true,
147 "Amount of time in milliseconds before a server thread will timeout " +
148 "waiting for client to send data on a connected socket. Currently, " +
149 "only applies to TBoundedThreadPoolServer");
150 OptionGroup servers = new OptionGroup();
151 servers.addOption(
152 new Option("nonblocking", false, "Use the TNonblockingServer. This implies the framed transport."));
153 servers.addOption(new Option("hsha", false, "Use the THsHaServer. This implies the framed transport."));
154 servers.addOption(new Option("threadpool", false, "Use the TThreadPoolServer. This is the default."));
155 options.addOptionGroup(servers);
156 return options;
157 }
158
159 private static CommandLine parseArguments(Configuration conf, Options options, String[] args)
160 throws ParseException, IOException {
161 GenericOptionsParser genParser = new GenericOptionsParser(conf, args);
162 String[] remainingArgs = genParser.getRemainingArgs();
163 CommandLineParser parser = new PosixParser();
164 return parser.parse(options, remainingArgs);
165 }
166
167 private static TProtocolFactory getTProtocolFactory(boolean isCompact) {
168 if (isCompact) {
169 log.debug("Using compact protocol");
170 return new TCompactProtocol.Factory();
171 } else {
172 log.debug("Using binary protocol");
173 return new TBinaryProtocol.Factory();
174 }
175 }
176
177 private static TTransportFactory getTTransportFactory(
178 SaslUtil.QualityOfProtection qop, String name, String host,
179 boolean framed, int frameSize) {
180 if (framed) {
181 if (qop != null) {
182 throw new RuntimeException("Thrift server authentication"
183 + " doesn't work with framed transport yet");
184 }
185 log.debug("Using framed transport");
186 return new TFramedTransport.Factory(frameSize);
187 } else if (qop == null) {
188 return new TTransportFactory();
189 } else {
190 Map<String, String> saslProperties = new HashMap<String, String>();
191 saslProperties.put(Sasl.QOP, qop.getSaslQop());
192 TSaslServerTransport.Factory saslFactory = new TSaslServerTransport.Factory();
193 saslFactory.addServerDefinition("GSSAPI", name, host, saslProperties,
194 new SaslGssCallbackHandler() {
195 @Override
196 public void handle(Callback[] callbacks)
197 throws UnsupportedCallbackException {
198 AuthorizeCallback ac = null;
199 for (Callback callback : callbacks) {
200 if (callback instanceof AuthorizeCallback) {
201 ac = (AuthorizeCallback) callback;
202 } else {
203 throw new UnsupportedCallbackException(callback,
204 "Unrecognized SASL GSSAPI Callback");
205 }
206 }
207 if (ac != null) {
208 String authid = ac.getAuthenticationID();
209 String authzid = ac.getAuthorizationID();
210 if (!authid.equals(authzid)) {
211 ac.setAuthorized(false);
212 } else {
213 ac.setAuthorized(true);
214 String userName = SecurityUtil.getUserFromPrincipal(authzid);
215 log.info("Effective user: " + userName);
216 ac.setAuthorizedID(userName);
217 }
218 }
219 }
220 });
221 return saslFactory;
222 }
223 }
224
225
226
227
228 private static InetSocketAddress bindToPort(String bindValue, int listenPort)
229 throws UnknownHostException {
230 try {
231 if (bindValue == null) {
232 return new InetSocketAddress(listenPort);
233 } else {
234 return new InetSocketAddress(InetAddress.getByName(bindValue), listenPort);
235 }
236 } catch (UnknownHostException e) {
237 throw new RuntimeException("Could not bind to provided ip address", e);
238 }
239 }
240
241 private static TServer getTNonBlockingServer(TProtocolFactory protocolFactory, TProcessor processor,
242 TTransportFactory transportFactory, InetSocketAddress inetSocketAddress) throws TTransportException {
243 TNonblockingServerTransport serverTransport = new TNonblockingServerSocket(inetSocketAddress);
244 log.info("starting HBase Nonblocking Thrift server on " + inetSocketAddress.toString());
245 TNonblockingServer.Args serverArgs = new TNonblockingServer.Args(serverTransport);
246 serverArgs.processor(processor);
247 serverArgs.transportFactory(transportFactory);
248 serverArgs.protocolFactory(protocolFactory);
249 return new TNonblockingServer(serverArgs);
250 }
251
252 private static TServer getTHsHaServer(TProtocolFactory protocolFactory,
253 TProcessor processor, TTransportFactory transportFactory,
254 int workerThreads,
255 InetSocketAddress inetSocketAddress, ThriftMetrics metrics)
256 throws TTransportException {
257 TNonblockingServerTransport serverTransport = new TNonblockingServerSocket(inetSocketAddress);
258 log.info("starting HBase HsHA Thrift server on " + inetSocketAddress.toString());
259 THsHaServer.Args serverArgs = new THsHaServer.Args(serverTransport);
260 if (workerThreads > 0) {
261
262 serverArgs.minWorkerThreads(workerThreads).maxWorkerThreads(workerThreads);
263 }
264 ExecutorService executorService = createExecutor(
265 workerThreads, metrics);
266 serverArgs.executorService(executorService);
267 serverArgs.processor(processor);
268 serverArgs.transportFactory(transportFactory);
269 serverArgs.protocolFactory(protocolFactory);
270 return new THsHaServer(serverArgs);
271 }
272
273 private static ExecutorService createExecutor(
274 int workerThreads, ThriftMetrics metrics) {
275 CallQueue callQueue = new CallQueue(
276 new LinkedBlockingQueue<Call>(), metrics);
277 ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
278 tfb.setDaemon(true);
279 tfb.setNameFormat("thrift2-worker-%d");
280 ThreadPoolExecutor pool = new ThreadPoolExecutor(workerThreads, workerThreads,
281 Long.MAX_VALUE, TimeUnit.SECONDS, callQueue, tfb.build());
282 pool.prestartAllCoreThreads();
283 return pool;
284 }
285
286 private static TServer getTThreadPoolServer(TProtocolFactory protocolFactory,
287 TProcessor processor,
288 TTransportFactory transportFactory,
289 int workerThreads,
290 InetSocketAddress inetSocketAddress,
291 int backlog,
292 int clientTimeout)
293 throws TTransportException {
294 TServerTransport serverTransport = new TServerSocket(
295 new TServerSocket.ServerSocketTransportArgs().
296 bindAddr(inetSocketAddress).backlog(backlog).
297 clientTimeout(clientTimeout));
298 log.info("starting HBase ThreadPool Thrift server on " + inetSocketAddress.toString());
299 TThreadPoolServer.Args serverArgs = new TThreadPoolServer.Args(serverTransport);
300 serverArgs.processor(processor);
301 serverArgs.transportFactory(transportFactory);
302 serverArgs.protocolFactory(protocolFactory);
303 if (workerThreads > 0) {
304 serverArgs.maxWorkerThreads(workerThreads);
305 }
306 return new TThreadPoolServer(serverArgs);
307 }
308
309
310
311
312
313
314 protected static void registerFilters(Configuration conf) {
315 String[] filters = conf.getStrings("hbase.thrift.filters");
316 if(filters != null) {
317 for(String filterClass: filters) {
318 String[] filterPart = filterClass.split(":");
319 if(filterPart.length != 2) {
320 log.warn("Invalid filter specification " + filterClass + " - skipping");
321 } else {
322 ParseFilter.registerFilter(filterPart[0], filterPart[1]);
323 }
324 }
325 }
326 }
327
328
329
330
331
332
333 public static void main(String[] args) throws Exception {
334 TServer server = null;
335 Options options = getOptions();
336 Configuration conf = HBaseConfiguration.create();
337 CommandLine cmd = parseArguments(conf, options, args);
338 int workerThreads = 0;
339
340
341
342
343
344 List<?> argList = cmd.getArgList();
345 if (cmd.hasOption("help") || !argList.contains("start") || argList.contains("stop")) {
346 printUsage();
347 System.exit(1);
348 }
349
350
351 String bindAddress;
352 if (cmd.hasOption("bind")) {
353 bindAddress = cmd.getOptionValue("bind");
354 conf.set("hbase.thrift.info.bindAddress", bindAddress);
355 } else {
356 bindAddress = conf.get("hbase.thrift.info.bindAddress");
357 }
358
359
360 int readTimeout = THRIFT_SERVER_SOCKET_READ_TIMEOUT_DEFAULT;
361 if (cmd.hasOption(READ_TIMEOUT_OPTION)) {
362 try {
363 readTimeout = Integer.parseInt(cmd.getOptionValue(READ_TIMEOUT_OPTION));
364 } catch (NumberFormatException e) {
365 throw new RuntimeException("Could not parse the value provided for the timeout option", e);
366 }
367 } else {
368 readTimeout = conf.getInt(THRIFT_SERVER_SOCKET_READ_TIMEOUT_KEY,
369 THRIFT_SERVER_SOCKET_READ_TIMEOUT_DEFAULT);
370 }
371
372
373 int listenPort = 0;
374 try {
375 if (cmd.hasOption("port")) {
376 listenPort = Integer.parseInt(cmd.getOptionValue("port"));
377 } else {
378 listenPort = conf.getInt("hbase.regionserver.thrift.port", DEFAULT_LISTEN_PORT);
379 }
380 } catch (NumberFormatException e) {
381 throw new RuntimeException("Could not parse the value provided for the port option", e);
382 }
383
384
385 int backlog = conf.getInt(BACKLOG_CONF_KEY, 0);
386
387
388
389 String host = null;
390 String name = null;
391
392 UserProvider userProvider = UserProvider.instantiate(conf);
393
394 boolean securityEnabled = userProvider.isHadoopSecurityEnabled()
395 && userProvider.isHBaseSecurityEnabled();
396 if (securityEnabled) {
397 host = Strings.domainNamePointerToHostName(DNS.getDefaultHost(
398 conf.get("hbase.thrift.dns.interface", "default"),
399 conf.get("hbase.thrift.dns.nameserver", "default")));
400 userProvider.login("hbase.thrift.keytab.file",
401 "hbase.thrift.kerberos.principal", host);
402 }
403
404 UserGroupInformation realUser = userProvider.getCurrent().getUGI();
405 String stringQop = conf.get(THRIFT_QOP_KEY);
406 SaslUtil.QualityOfProtection qop = null;
407 if (stringQop != null) {
408 qop = SaslUtil.getQop(stringQop);
409 if (!securityEnabled) {
410 throw new IOException("Thrift server must"
411 + " run in secure mode to support authentication");
412 }
413
414 name = SecurityUtil.getUserFromPrincipal(
415 conf.get("hbase.thrift.kerberos.principal"));
416 }
417
418 boolean nonblocking = cmd.hasOption("nonblocking");
419 boolean hsha = cmd.hasOption("hsha");
420
421 ThriftMetrics metrics = new ThriftMetrics(conf, ThriftMetrics.ThriftServerType.TWO);
422
423 String implType = "threadpool";
424 if (nonblocking) {
425 implType = "nonblocking";
426 } else if (hsha) {
427 implType = "hsha";
428 }
429
430 conf.set("hbase.regionserver.thrift.server.type", implType);
431 conf.setInt("hbase.regionserver.thrift.port", listenPort);
432 registerFilters(conf);
433
434
435 boolean compact = cmd.hasOption("compact") ||
436 conf.getBoolean("hbase.regionserver.thrift.compact", false);
437 TProtocolFactory protocolFactory = getTProtocolFactory(compact);
438 final ThriftHBaseServiceHandler hbaseHandler =
439 new ThriftHBaseServiceHandler(conf, userProvider);
440 THBaseService.Iface handler =
441 ThriftHBaseServiceHandler.newInstance(hbaseHandler, metrics);
442 final THBaseService.Processor p = new THBaseService.Processor(handler);
443 conf.setBoolean("hbase.regionserver.thrift.compact", compact);
444 TProcessor processor = p;
445
446 boolean framed = cmd.hasOption("framed") ||
447 conf.getBoolean("hbase.regionserver.thrift.framed", false) || nonblocking || hsha;
448 TTransportFactory transportFactory = getTTransportFactory(qop, name, host, framed,
449 conf.getInt("hbase.regionserver.thrift.framed.max_frame_size_in_mb", 2) * 1024 * 1024);
450 InetSocketAddress inetSocketAddress = bindToPort(bindAddress, listenPort);
451 conf.setBoolean("hbase.regionserver.thrift.framed", framed);
452 if (qop != null) {
453
454 processor = new TProcessor() {
455 @Override
456 public boolean process(TProtocol inProt,
457 TProtocol outProt) throws TException {
458 TSaslServerTransport saslServerTransport =
459 (TSaslServerTransport)inProt.getTransport();
460 SaslServer saslServer = saslServerTransport.getSaslServer();
461 String principal = saslServer.getAuthorizationID();
462 hbaseHandler.setEffectiveUser(principal);
463 return p.process(inProt, outProt);
464 }
465 };
466 }
467
468 if (cmd.hasOption("w")) {
469 workerThreads = Integer.parseInt(cmd.getOptionValue("w"));
470 }
471
472
473 try {
474 if (cmd.hasOption("infoport")) {
475 String val = cmd.getOptionValue("infoport");
476 conf.setInt("hbase.thrift.info.port", Integer.parseInt(val));
477 log.debug("Web UI port set to " + val);
478 }
479 } catch (NumberFormatException e) {
480 log.error("Could not parse the value provided for the infoport option", e);
481 printUsage();
482 System.exit(1);
483 }
484
485
486 int port = conf.getInt("hbase.thrift.info.port", 9095);
487 if (port >= 0) {
488 conf.setLong("startcode", System.currentTimeMillis());
489 String a = conf.get("hbase.thrift.info.bindAddress", "0.0.0.0");
490 InfoServer infoServer = new InfoServer("thrift", a, port, false, conf);
491 infoServer.setAttribute("hbase.conf", conf);
492 infoServer.start();
493 }
494
495 if (nonblocking) {
496 server = getTNonBlockingServer(protocolFactory,
497 processor,
498 transportFactory,
499 inetSocketAddress);
500 } else if (hsha) {
501 server = getTHsHaServer(protocolFactory,
502 processor,
503 transportFactory,
504 workerThreads,
505 inetSocketAddress,
506 metrics);
507 } else {
508 server = getTThreadPoolServer(protocolFactory,
509 processor,
510 transportFactory,
511 workerThreads,
512 inetSocketAddress,
513 backlog,
514 readTimeout);
515 }
516
517 final TServer tserver = server;
518 realUser.doAs(
519 new PrivilegedAction<Object>() {
520 @Override
521 public Object run() {
522 tserver.serve();
523 return null;
524 }
525 });
526 }
527 }