1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.security;
20
21 import static org.apache.hadoop.hbase.security.HBaseKerberosUtils.getKeytabFileForTesting;
22 import static org.apache.hadoop.hbase.security.HBaseKerberosUtils.getPrincipalForTesting;
23 import static org.apache.hadoop.hbase.security.HBaseKerberosUtils.getSecuredConfiguration;
24 import static org.junit.Assert.assertEquals;
25 import static org.junit.Assert.assertNotSame;
26 import static org.junit.Assert.assertSame;
27
28 import java.io.File;
29 import java.io.IOException;
30 import java.net.InetSocketAddress;
31 import java.util.ArrayList;
32 import java.util.List;
33 import java.util.Properties;
34 import java.util.concurrent.ThreadLocalRandom;
35
36 import com.google.protobuf.RpcController;
37 import com.google.protobuf.ServiceException;
38 import org.apache.hadoop.conf.Configuration;
39 import org.apache.hadoop.fs.CommonConfigurationKeys;
40 import org.apache.hadoop.hbase.Cell;
41 import org.apache.hadoop.hbase.CellScanner;
42 import org.apache.hadoop.hbase.CellUtil;
43 import org.apache.hadoop.hbase.HBaseTestingUtility;
44 import org.apache.hadoop.hbase.HConstants;
45 import org.apache.hadoop.hbase.ServerName;
46 import org.apache.hadoop.hbase.ipc.AsyncRpcClient;
47 import org.apache.hadoop.hbase.ipc.FifoRpcScheduler;
48 import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
49 import org.apache.hadoop.hbase.ipc.RpcClient;
50 import org.apache.hadoop.hbase.ipc.RpcClientFactory;
51 import org.apache.hadoop.hbase.ipc.RpcClientImpl;
52 import org.apache.hadoop.hbase.ipc.RpcServer;
53 import org.apache.hadoop.hbase.ipc.RpcServerInterface;
54 import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos;
55 import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos;
56 import org.apache.hadoop.hbase.testclassification.SmallTests;
57 import org.apache.hadoop.minikdc.MiniKdc;
58 import org.apache.hadoop.security.UserGroupInformation;
59 import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
60 import org.junit.AfterClass;
61 import org.junit.BeforeClass;
62 import org.junit.Test;
63 import org.junit.experimental.categories.Category;
64 import org.mockito.Mockito;
65
66 import com.google.common.collect.Lists;
67 import com.google.protobuf.BlockingRpcChannel;
68 import com.google.protobuf.BlockingService;
69
70 @Category(SmallTests.class)
71 public class TestSecureRPC {
72
73 private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
74
75 private static final File KEYTAB_FILE = new File(TEST_UTIL.getDataTestDir("keytab").toUri()
76 .getPath());
77
78 static final BlockingService SERVICE =
79 TestRpcServiceProtos.TestProtobufRpcProto.newReflectiveBlockingService(
80 new TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface() {
81
82 @Override
83 public TestProtos.EmptyResponseProto ping(RpcController controller,
84 TestProtos.EmptyRequestProto request)
85 throws ServiceException {
86 return null;
87 }
88
89 @Override
90 public TestProtos.EmptyResponseProto error(RpcController controller,
91 TestProtos.EmptyRequestProto request)
92 throws ServiceException {
93 return null;
94 }
95
96 @Override
97 public TestProtos.EchoResponseProto echo(RpcController controller,
98 TestProtos.EchoRequestProto request)
99 throws ServiceException {
100 if (controller instanceof PayloadCarryingRpcController) {
101 PayloadCarryingRpcController pcrc = (PayloadCarryingRpcController) controller;
102
103
104
105
106
107 CellScanner cellScanner = pcrc.cellScanner();
108 List<Cell> list = null;
109 if (cellScanner != null) {
110 list = new ArrayList<Cell>();
111 try {
112 while (cellScanner.advance()) {
113 list.add(cellScanner.current());
114 }
115 } catch (IOException e) {
116 throw new ServiceException(e);
117 }
118 }
119 cellScanner = CellUtil.createCellScanner(list);
120 ((PayloadCarryingRpcController) controller).setCellScanner(cellScanner);
121 }
122 return TestProtos.EchoResponseProto.newBuilder()
123 .setMessage(request.getMessage()).build();
124 }
125 });
126
127 private static MiniKdc KDC;
128
129 private static String HOST = "localhost";
130
131 private static String PRINCIPAL;
132
133 @BeforeClass
134 public static void setUp() throws Exception {
135 Properties conf = MiniKdc.createConf();
136 conf.put(MiniKdc.DEBUG, true);
137 KDC = new MiniKdc(conf, new File(TEST_UTIL.getDataTestDir("kdc").toUri().getPath()));
138 KDC.start();
139 PRINCIPAL = "hbase/" + HOST;
140 KDC.createPrincipal(KEYTAB_FILE, PRINCIPAL);
141 HBaseKerberosUtils.setKeytabFileForTesting(KEYTAB_FILE.getAbsolutePath());
142 HBaseKerberosUtils.setPrincipalForTesting(PRINCIPAL + "@" + KDC.getRealm());
143 }
144
145 @AfterClass
146 public static void tearDown() throws IOException {
147 if (KDC != null) {
148 KDC.stop();
149 }
150 TEST_UTIL.cleanupTestDir();
151 }
152
153 @Test
154 public void testRpc() throws Exception {
155 testRpcCallWithEnabledKerberosSaslAuth(RpcClientImpl.class);
156 }
157
158 @Test
159 public void testRpcWithInsecureFallback() throws Exception {
160 testRpcFallbackToSimpleAuth(RpcClientImpl.class);
161 }
162
163 @Test
164 public void testAsyncRpc() throws Exception {
165 testRpcCallWithEnabledKerberosSaslAuth(AsyncRpcClient.class);
166 }
167
168 @Test
169 public void testAsyncRpcWithInsecureFallback() throws Exception {
170 testRpcFallbackToSimpleAuth(AsyncRpcClient.class);
171 }
172
173 private void testRpcCallWithEnabledKerberosSaslAuth(Class<? extends RpcClient> rpcImplClass)
174 throws Exception {
175 String krbKeytab = getKeytabFileForTesting();
176 String krbPrincipal = getPrincipalForTesting();
177
178 UserGroupInformation ugi = loginKerberosPrincipal(krbKeytab, krbPrincipal);
179 UserGroupInformation ugi2 = UserGroupInformation.getCurrentUser();
180
181
182 assertSame(ugi, ugi2);
183 assertEquals(AuthenticationMethod.KERBEROS, ugi.getAuthenticationMethod());
184 assertEquals(krbPrincipal, ugi.getUserName());
185
186 Configuration clientConf = getSecuredConfiguration();
187 callRpcService(rpcImplClass, User.create(ugi2), clientConf, false);
188 }
189
190 private UserGroupInformation loginKerberosPrincipal(String krbKeytab, String krbPrincipal)
191 throws Exception {
192 Configuration cnf = new Configuration();
193 cnf.set(CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION, "kerberos");
194 UserGroupInformation.setConfiguration(cnf);
195 UserGroupInformation.loginUserFromKeytab(krbPrincipal, krbKeytab);
196 return UserGroupInformation.getLoginUser();
197 }
198
199 private void callRpcService(Class<? extends RpcClient> rpcImplClass, User clientUser,
200 Configuration clientConf, boolean allowInsecureFallback)
201 throws Exception {
202 Configuration clientConfCopy = new Configuration(clientConf);
203 clientConfCopy.set(RpcClientFactory.CUSTOM_RPC_CLIENT_IMPL_CONF_KEY, rpcImplClass.getName());
204
205 Configuration conf = getSecuredConfiguration();
206 conf.setBoolean(RpcServer.FALLBACK_TO_INSECURE_CLIENT_AUTH, allowInsecureFallback);
207
208 SecurityInfo securityInfoMock = Mockito.mock(SecurityInfo.class);
209 Mockito.when(securityInfoMock.getServerPrincipal())
210 .thenReturn(HBaseKerberosUtils.KRB_PRINCIPAL);
211 SecurityInfo.addInfo("TestProtobufRpcProto", securityInfoMock);
212
213 InetSocketAddress isa = new InetSocketAddress(HOST, 0);
214
215 RpcServerInterface rpcServer =
216 new RpcServer(null, "AbstractTestSecureIPC",
217 Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)), isa,
218 conf, new FifoRpcScheduler(conf, 1));
219 rpcServer.start();
220 try (RpcClient rpcClient = RpcClientFactory.createClient(clientConf,
221 HConstants.DEFAULT_CLUSTER_ID.toString())) {
222 InetSocketAddress address = rpcServer.getListenerAddress();
223 if (address == null) {
224 throw new IOException("Listener channel is closed");
225 }
226 BlockingRpcChannel channel =
227 rpcClient.createBlockingRpcChannel(
228
229 ServerName.valueOf(address.getHostName(), address.getPort(),
230 System.currentTimeMillis()), clientUser, 5000);
231 TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface stub =
232 TestRpcServiceProtos.TestProtobufRpcProto.newBlockingStub(channel);
233 List<String> results = new ArrayList<String>();
234 TestThread th1 = new TestThread(stub, results);
235 th1.start();
236 th1.join();
237
238 } finally {
239 rpcServer.stop();
240 }
241 }
242
243 public void testRpcFallbackToSimpleAuth(Class<? extends RpcClient> rpcImplClass) throws Exception {
244 String krbKeytab = getKeytabFileForTesting();
245 String krbPrincipal = getPrincipalForTesting();
246
247 UserGroupInformation ugi = loginKerberosPrincipal(krbKeytab, krbPrincipal);
248 assertEquals(AuthenticationMethod.KERBEROS, ugi.getAuthenticationMethod());
249 assertEquals(krbPrincipal, ugi.getUserName());
250
251 String clientUsername = "testuser";
252 UserGroupInformation clientUgi = UserGroupInformation.createUserForTesting(clientUsername,
253 new String[]{clientUsername});
254
255
256 assertNotSame(ugi, clientUgi);
257 assertEquals(AuthenticationMethod.SIMPLE, clientUgi.getAuthenticationMethod());
258 assertEquals(clientUsername, clientUgi.getUserName());
259
260 Configuration clientConf = new Configuration();
261 clientConf.set(User.HBASE_SECURITY_CONF_KEY, "simple");
262 callRpcService(rpcImplClass, User.create(clientUgi), clientConf, true);
263 }
264
265 public static class TestThread extends Thread {
266 private final TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface stub;
267
268 private final List<String> results;
269
270 public TestThread(TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface stub, List<String> results) {
271 this.stub = stub;
272 this.results = results;
273 }
274
275 @Override
276 public void run() {
277 String result;
278 try {
279 result = stub.echo(null, TestProtos.EchoRequestProto.newBuilder().setMessage(String.valueOf(
280 ThreadLocalRandom.current().nextInt())).build()).getMessage();
281 } catch (ServiceException e) {
282 throw new RuntimeException(e);
283 }
284 if (results != null) {
285 synchronized (results) {
286 results.add(result);
287 }
288 }
289 }
290 }
291 }