1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.ipc;
19
20 import static org.junit.Assert.assertEquals;
21 import static org.junit.Assert.assertTrue;
22 import static org.junit.Assert.fail;
23 import static org.mockito.Matchers.anyObject;
24 import static org.mockito.Mockito.spy;
25 import static org.mockito.Mockito.verify;
26 import static org.mockito.internal.verification.VerificationModeFactory.times;
27
28 import java.io.IOException;
29 import java.net.ConnectException;
30 import java.net.InetAddress;
31 import java.net.InetSocketAddress;
32 import java.net.SocketTimeoutException;
33 import java.util.ArrayList;
34 import java.util.List;
35
36 import org.apache.commons.logging.Log;
37 import org.apache.commons.logging.LogFactory;
38 import org.apache.hadoop.conf.Configuration;
39 import org.apache.hadoop.hbase.Cell;
40 import org.apache.hadoop.hbase.CellScanner;
41 import org.apache.hadoop.hbase.CellUtil;
42 import org.apache.hadoop.hbase.HBaseConfiguration;
43 import org.apache.hadoop.hbase.HConstants;
44 import org.apache.hadoop.hbase.KeyValue;
45 import org.apache.hadoop.hbase.ServerName;
46 import org.apache.hadoop.hbase.client.MetricsConnection;
47 import org.apache.hadoop.hbase.exceptions.ConnectionClosingException;
48 import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoRequestProto;
49 import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoResponseProto;
50 import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyRequestProto;
51 import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyResponseProto;
52 import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos;
53 import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface;
54 import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
55 import org.apache.hadoop.hbase.security.User;
56 import org.apache.hadoop.hbase.util.Bytes;
57 import org.apache.hadoop.hbase.util.Pair;
58 import org.apache.hadoop.io.compress.GzipCodec;
59 import org.apache.hadoop.util.StringUtils;
60 import org.junit.Assert;
61 import org.junit.Test;
62
63 import com.google.common.collect.ImmutableList;
64 import com.google.common.collect.Lists;
65 import com.google.protobuf.BlockingRpcChannel;
66 import com.google.protobuf.BlockingService;
67 import com.google.protobuf.Descriptors.MethodDescriptor;
68 import com.google.protobuf.Message;
69 import com.google.protobuf.RpcController;
70 import com.google.protobuf.ServiceException;
71
72
73
74
75 public abstract class AbstractTestIPC {
76
77 private static final Log LOG = LogFactory.getLog(AbstractTestIPC.class);
78
79 private static byte[] CELL_BYTES = Bytes.toBytes("xyz");
80 private static KeyValue CELL = new KeyValue(CELL_BYTES, CELL_BYTES, CELL_BYTES, CELL_BYTES);
81 static byte[] BIG_CELL_BYTES = new byte[10 * 1024];
82 static KeyValue BIG_CELL = new KeyValue(CELL_BYTES, CELL_BYTES, CELL_BYTES, BIG_CELL_BYTES);
83 static final Configuration CONF = HBaseConfiguration.create();
84
85
86
87
88 static final BlockingService SERVICE =
89 TestRpcServiceProtos.TestProtobufRpcProto
90 .newReflectiveBlockingService(new TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface() {
91
92 @Override
93 public EmptyResponseProto ping(RpcController controller, EmptyRequestProto request)
94 throws ServiceException {
95 return null;
96 }
97
98 @Override
99 public EmptyResponseProto error(RpcController controller, EmptyRequestProto request)
100 throws ServiceException {
101 return null;
102 }
103
104 @Override
105 public EchoResponseProto echo(RpcController controller, EchoRequestProto request)
106 throws ServiceException {
107 if (controller instanceof PayloadCarryingRpcController) {
108 PayloadCarryingRpcController pcrc = (PayloadCarryingRpcController) controller;
109
110
111
112
113
114 CellScanner cellScanner = pcrc.cellScanner();
115 List<Cell> list = null;
116 if (cellScanner != null) {
117 list = new ArrayList<Cell>();
118 try {
119 while (cellScanner.advance()) {
120 list.add(cellScanner.current());
121 }
122 } catch (IOException e) {
123 throw new ServiceException(e);
124 }
125 }
126 cellScanner = CellUtil.createCellScanner(list);
127 ((PayloadCarryingRpcController) controller).setCellScanner(cellScanner);
128 }
129 return EchoResponseProto.newBuilder().setMessage(request.getMessage()).build();
130 }
131 });
132
133
134
135
136
137 static class TestRpcServer extends RpcServer {
138
139 TestRpcServer() throws IOException {
140 this(new FifoRpcScheduler(CONF, 1));
141 }
142
143 TestRpcServer(RpcScheduler scheduler) throws IOException {
144 super(null, "testRpcServer", Lists
145 .newArrayList(new BlockingServiceAndInterface(SERVICE, null)), new InetSocketAddress(
146 "localhost", 0), CONF, scheduler);
147 }
148
149 @Override
150 public Pair<Message, CellScanner> call(BlockingService service, MethodDescriptor md,
151 Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status)
152 throws IOException {
153 return super.call(service, md, param, cellScanner, receiveTime, status);
154 }
155 }
156
/**
 * Factory hook for the client exercised by {@code testNoCodec()}.
 * Presumably the returned client is configured without a cell codec -- the name suggests so,
 * but that is decided by the subclass; confirm there.
 *
 * @param conf configuration the test passes in (a fresh {@code HBaseConfiguration.create()})
 * @return the client to test; the caller closes it
 */
157 protected abstract AbstractRpcClient createRpcClientNoCodec(Configuration conf);
158
159
160
161
162
163
164 @Test
165 public void testNoCodec() throws InterruptedException, IOException {
166 Configuration conf = HBaseConfiguration.create();
167 AbstractRpcClient client = createRpcClientNoCodec(conf);
168 TestRpcServer rpcServer = new TestRpcServer();
169 try {
170 rpcServer.start();
171 MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
172 final String message = "hello";
173 EchoRequestProto param = EchoRequestProto.newBuilder().setMessage(message).build();
174 InetSocketAddress address = rpcServer.getListenerAddress();
175 if (address == null) {
176 throw new IOException("Listener channel is closed");
177 }
178 Pair<Message, CellScanner> r =
179 client.call(null, md, param, md.getOutputType().toProto(), User.getCurrent(), address,
180 new MetricsConnection.CallStats());
181 assertTrue(r.getSecond() == null);
182
183 assertTrue(r.getFirst().toString().contains(message));
184 } finally {
185 client.close();
186 rpcServer.stop();
187 }
188 }
189
/**
 * Factory hook for the client exercised by {@code testCompressCellBlock()} and
 * {@code testRpcScheduler()}.
 *
 * @param conf configuration supplied by the calling test
 * @return the client to test; closing it is the caller's responsibility
 */
190 protected abstract AbstractRpcClient createRpcClient(Configuration conf);
191
192
193
194
195
196
197
198
199
200
201 @Test
202 public void testCompressCellBlock() throws IOException, InterruptedException, SecurityException,
203 NoSuchMethodException, ServiceException {
204 Configuration conf = new Configuration(HBaseConfiguration.create());
205 conf.set("hbase.client.rpc.compressor", GzipCodec.class.getCanonicalName());
206 List<Cell> cells = new ArrayList<Cell>();
207 int count = 3;
208 for (int i = 0; i < count; i++) {
209 cells.add(CELL);
210 }
211 AbstractRpcClient client = createRpcClient(conf);
212 TestRpcServer rpcServer = new TestRpcServer();
213 try {
214 rpcServer.start();
215 MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
216 EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build();
217 PayloadCarryingRpcController pcrc =
218 new PayloadCarryingRpcController(CellUtil.createCellScanner(cells));
219 InetSocketAddress address = rpcServer.getListenerAddress();
220 if (address == null) {
221 throw new IOException("Listener channel is closed");
222 }
223 Pair<Message, CellScanner> r =
224 client.call(pcrc, md, param, md.getOutputType().toProto(), User.getCurrent(), address,
225 new MetricsConnection.CallStats());
226 int index = 0;
227 while (r.getSecond().advance()) {
228 assertTrue(CELL.equals(r.getSecond().current()));
229 index++;
230 }
231 assertEquals(count, index);
232 } finally {
233 client.close();
234 rpcServer.stop();
235 }
236 }
237
/**
 * Factory hook for the client exercised by {@code testRTEDuringConnectionSetup()}. That test
 * expects a call made through the returned client to fail with an exception whose stringified
 * stack trace contains "Injected fault"; how the fault is injected is up to the subclass.
 *
 * @param conf configuration supplied by the calling test
 * @return the fault-injecting client; the caller closes it
 * @throws IOException if the client cannot be created
 */
238 protected abstract AbstractRpcClient createRpcClientRTEDuringConnectionSetup(Configuration conf)
239 throws IOException;
240
241 @Test
242 public void testRTEDuringConnectionSetup() throws Exception {
243 Configuration conf = HBaseConfiguration.create();
244 TestRpcServer rpcServer = new TestRpcServer();
245 AbstractRpcClient client = createRpcClientRTEDuringConnectionSetup(conf);
246 try {
247 rpcServer.start();
248 MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
249 EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build();
250 InetSocketAddress address = rpcServer.getListenerAddress();
251 if (address == null) {
252 throw new IOException("Listener channel is closed");
253 }
254 client.call(null, md, param, null, User.getCurrent(), address,
255 new MetricsConnection.CallStats());
256 fail("Expected an exception to have been thrown!");
257 } catch (Exception e) {
258 LOG.info("Caught expected exception: " + e.toString());
259 assertTrue(StringUtils.stringifyException(e).contains("Injected fault"));
260 } finally {
261 client.close();
262 rpcServer.stop();
263 }
264 }
265
266
267 @Test
268 public void testRpcScheduler() throws IOException, InterruptedException {
269 RpcScheduler scheduler = spy(new FifoRpcScheduler(CONF, 1));
270 RpcServer rpcServer = new TestRpcServer(scheduler);
271 verify(scheduler).init((RpcScheduler.Context) anyObject());
272 AbstractRpcClient client = createRpcClient(CONF);
273 try {
274 rpcServer.start();
275 verify(scheduler).start();
276 MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
277 EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build();
278 InetSocketAddress address = rpcServer.getListenerAddress();
279 if (address == null) {
280 throw new IOException("Listener channel is closed");
281 }
282 for (int i = 0; i < 10; i++) {
283 client.call(new PayloadCarryingRpcController(
284 CellUtil.createCellScanner(ImmutableList.<Cell> of(CELL))), md, param,
285 md.getOutputType().toProto(), User.getCurrent(), address,
286 new MetricsConnection.CallStats());
287 }
288 verify(scheduler, times(10)).dispatch((CallRunner) anyObject());
289 } finally {
290 rpcServer.stop();
291 verify(scheduler).stop();
292 }
293 }
294
295
296
297
298 static class TestRpcServer1 extends RpcServer {
299
300 private static BlockingInterface SERVICE1 =
301 new TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface() {
302 @Override
303 public EmptyResponseProto ping(RpcController unused, EmptyRequestProto request)
304 throws ServiceException {
305 return EmptyResponseProto.newBuilder().build();
306 }
307
308 @Override
309 public EchoResponseProto echo(RpcController unused, EchoRequestProto request)
310 throws ServiceException {
311 final InetAddress remoteAddr = TestRpcServer1.getRemoteAddress();
312 final String message = remoteAddr == null ? "NULL" : remoteAddr.getHostAddress();
313 return EchoResponseProto.newBuilder().setMessage(message).build();
314 }
315
316 @Override
317 public EmptyResponseProto error(RpcController unused, EmptyRequestProto request)
318 throws ServiceException {
319 throw new ServiceException("error", new IOException("error"));
320 }
321 };
322
323 TestRpcServer1() throws IOException {
324 this(new FifoRpcScheduler(CONF, 1));
325 }
326
327 TestRpcServer1(RpcScheduler scheduler) throws IOException {
328 super(null, "testRemoteAddressInCallObject", Lists
329 .newArrayList(new BlockingServiceAndInterface(TestRpcServiceProtos.TestProtobufRpcProto
330 .newReflectiveBlockingService(SERVICE1), null)),
331 new InetSocketAddress("localhost", 0), CONF, scheduler);
332 }
333 }
334
335
336
337
338
339
340 @Test
341 public void testRpcServerForNotNullRemoteAddressInCallObject() throws IOException,
342 ServiceException {
343 final RpcScheduler scheduler = new FifoRpcScheduler(CONF, 1);
344 final TestRpcServer1 rpcServer = new TestRpcServer1(scheduler);
345 final InetSocketAddress localAddr = new InetSocketAddress("localhost", 0);
346 final AbstractRpcClient client =
347 new RpcClientImpl(CONF, HConstants.CLUSTER_ID_DEFAULT, localAddr, null);
348 try {
349 rpcServer.start();
350 final InetSocketAddress isa = rpcServer.getListenerAddress();
351 if (isa == null) {
352 throw new IOException("Listener channel is closed");
353 }
354 final BlockingRpcChannel channel =
355 client.createBlockingRpcChannel(
356 ServerName.valueOf(isa.getHostName(), isa.getPort(), System.currentTimeMillis()),
357 User.getCurrent(), 0);
358 TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface stub =
359 TestRpcServiceProtos.TestProtobufRpcProto.newBlockingStub(channel);
360 final EchoRequestProto echoRequest =
361 EchoRequestProto.newBuilder().setMessage("GetRemoteAddress").build();
362 final EchoResponseProto echoResponse = stub.echo(null, echoRequest);
363 Assert.assertEquals(localAddr.getAddress().getHostAddress(), echoResponse.getMessage());
364 } finally {
365 client.close();
366 rpcServer.stop();
367 }
368 }
369
370 @Test
371 public void testWrapException() throws Exception {
372 AbstractRpcClient client =
373 (AbstractRpcClient) RpcClientFactory.createClient(CONF, "AbstractTestIPC");
374 final InetSocketAddress address = InetSocketAddress.createUnresolved("localhost", 0);
375 assertTrue(client.wrapException(address, new ConnectException()) instanceof ConnectException);
376 assertTrue(client.wrapException(address,
377 new SocketTimeoutException()) instanceof SocketTimeoutException);
378 assertTrue(client.wrapException(address, new ConnectionClosingException(
379 "Test AbstractRpcClient#wrapException")) instanceof ConnectionClosingException);
380 assertTrue(client
381 .wrapException(address, new CallTimeoutException("Test AbstractRpcClient#wrapException"))
382 .getCause() instanceof CallTimeoutException);
383 }
384 }