1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.ipc;
20
21 import static org.junit.Assert.assertEquals;
22 import static org.junit.Assert.assertNotNull;
23 import static org.junit.Assert.assertTrue;
24
25 import java.io.IOException;
26 import java.net.InetSocketAddress;
27 import java.util.ArrayList;
28 import java.util.HashMap;
29 import java.util.List;
30 import java.util.Random;
31 import java.util.concurrent.Callable;
32 import java.util.concurrent.atomic.AtomicBoolean;
33 import java.util.concurrent.atomic.AtomicReference;
34 import java.util.concurrent.locks.ReadWriteLock;
35 import java.util.concurrent.locks.ReentrantReadWriteLock;
36
37 import org.apache.commons.logging.Log;
38 import org.apache.commons.logging.LogFactory;
39 import org.apache.hadoop.conf.Configuration;
40 import org.apache.hadoop.hbase.CellScanner;
41 import org.apache.hadoop.hbase.HBaseConfiguration;
42 import org.apache.hadoop.hbase.HConstants;
43 import org.apache.hadoop.hbase.codec.Codec;
44 import org.apache.hadoop.hbase.ipc.AbstractRpcClient;
45 import org.apache.hadoop.hbase.ipc.AsyncRpcClient;
46 import org.apache.hadoop.hbase.ipc.FifoRpcScheduler;
47 import org.apache.hadoop.hbase.ipc.RpcClientImpl;
48 import org.apache.hadoop.hbase.ipc.RpcScheduler;
49 import org.apache.hadoop.hbase.ipc.RpcServer;
50 import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos;
51 import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoRequestProto;
52 import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoResponseProto;
53 import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyRequestProto;
54 import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyResponseProto;
55 import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
56 import org.apache.hadoop.hbase.security.User;
57 import org.apache.hadoop.hbase.testclassification.IntegrationTests;
58 import org.apache.hadoop.hbase.util.Pair;
59 import org.apache.hadoop.hbase.util.Threads;
60 import org.junit.Ignore;
61 import org.junit.Test;
62 import org.junit.experimental.categories.Category;
63 import com.google.common.collect.Lists;
64 import com.google.protobuf.BlockingService;
65 import com.google.protobuf.Message;
66 import com.google.protobuf.RpcController;
67 import com.google.protobuf.ServiceException;
68 import com.google.protobuf.Descriptors.MethodDescriptor;
69
70 @Category(IntegrationTests.class)
71 public class IntegrationTestRpcClient {
72
private static final Log LOG = LogFactory.getLog(IntegrationTestRpcClient.class);

/** Configuration handed to every server and RPC client this test creates. */
private final Configuration conf = HBaseConfiguration.create();

/** Number of times each test method repeats the chaos scenario. */
private int numIterations = 10;

/**
 * Public no-arg constructor required by the JUnit runner; the configuration is created by the
 * field initializer above.
 */
public IntegrationTestRpcClient() {
}
82
83 static class TestRpcServer extends RpcServer {
84
85 TestRpcServer(Configuration conf) throws IOException {
86 this(new FifoRpcScheduler(conf, 1), conf);
87 }
88
89 TestRpcServer(RpcScheduler scheduler, Configuration conf) throws IOException {
90 super(null, "testRpcServer", Lists
91 .newArrayList(new BlockingServiceAndInterface(SERVICE, null)), new InetSocketAddress(
92 "localhost", 0), conf, scheduler);
93 }
94
95 @Override
96 public Pair<Message, CellScanner> call(BlockingService service, MethodDescriptor md,
97 Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status)
98 throws IOException {
99 return super.call(service, md, param, cellScanner, receiveTime, status);
100 }
101 }
102
103 static final BlockingService SERVICE =
104 TestRpcServiceProtos.TestProtobufRpcProto
105 .newReflectiveBlockingService(new TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface() {
106
107 @Override
108 public EmptyResponseProto ping(RpcController controller, EmptyRequestProto request)
109 throws ServiceException {
110 return null;
111 }
112
113 @Override
114 public EmptyResponseProto error(RpcController controller, EmptyRequestProto request)
115 throws ServiceException {
116 return null;
117 }
118
119 @Override
120 public EchoResponseProto echo(RpcController controller, EchoRequestProto request)
121 throws ServiceException {
122 return EchoResponseProto.newBuilder().setMessage(request.getMessage()).build();
123 }
124 });
125
126 protected AbstractRpcClient createRpcClient(Configuration conf, boolean isSyncClient) {
127 return isSyncClient ?
128 new RpcClientImpl(conf, HConstants.CLUSTER_ID_DEFAULT) :
129 new AsyncRpcClient(conf) {
130 @Override
131 Codec getCodec() {
132 return null;
133 }
134 };
135 }
136
137 static String BIG_PAYLOAD;
138
139 static {
140 StringBuilder builder = new StringBuilder();
141
142 while (builder.length() < 1024 * 1024) {
143 builder.append("big.payload.");
144 }
145
146 BIG_PAYLOAD = builder.toString();
147 }
148
149 class Cluster {
150 Random random = new Random();
151 ReadWriteLock lock = new ReentrantReadWriteLock();
152 HashMap<InetSocketAddress, TestRpcServer> rpcServers = new HashMap<>();
153 List<TestRpcServer> serverList = new ArrayList<>();
154 int maxServers;
155 int minServers;
156
157 Cluster(int minServers, int maxServers) {
158 this.minServers = minServers;
159 this.maxServers = maxServers;
160 }
161
162 TestRpcServer startServer() throws IOException {
163 lock.writeLock().lock();
164 try {
165 if (rpcServers.size() >= maxServers) {
166 return null;
167 }
168
169 TestRpcServer rpcServer = new TestRpcServer(conf);
170 rpcServer.start();
171 InetSocketAddress address = rpcServer.getListenerAddress();
172 if (address == null) {
173 throw new IOException("Listener channel is closed");
174 }
175 rpcServers.put(address, rpcServer);
176 serverList.add(rpcServer);
177 LOG.info("Started server: " + address);
178 return rpcServer;
179 } finally {
180 lock.writeLock().unlock();
181 }
182 }
183
184 void stopRandomServer() throws Exception {
185 lock.writeLock().lock();
186 TestRpcServer rpcServer = null;
187 try {
188 if (rpcServers.size() <= minServers) {
189 return;
190 }
191 int size = rpcServers.size();
192 int rand = random.nextInt(size);
193 rpcServer = serverList.remove(rand);
194 InetSocketAddress address = rpcServer.getListenerAddress();
195 if (address == null) {
196
197
198 throw new IOException("Listener channel is closed");
199 }
200 rpcServers.remove(address);
201
202 if (rpcServer != null) {
203 stopServer(rpcServer);
204 }
205 } finally {
206 lock.writeLock().unlock();
207 }
208 }
209
210 void stopServer(TestRpcServer rpcServer) throws InterruptedException {
211 InetSocketAddress address = rpcServer.getListenerAddress();
212 LOG.info("Stopping server: " + address);
213 rpcServer.stop();
214 rpcServer.join();
215 LOG.info("Stopped server: " + address);
216 }
217
218 void stopRunning() throws InterruptedException {
219 lock.writeLock().lock();
220 try {
221 for (TestRpcServer rpcServer : serverList) {
222 stopServer(rpcServer);
223 }
224
225 } finally {
226 lock.writeLock().unlock();
227 }
228 }
229
230 TestRpcServer getRandomServer() {
231 lock.readLock().lock();
232 try {
233 int size = rpcServers.size();
234 int rand = random.nextInt(size);
235 return serverList.get(rand);
236 } finally {
237 lock.readLock().unlock();
238 }
239 }
240 }
241
242 static class MiniChaosMonkey extends Thread {
243 AtomicBoolean running = new AtomicBoolean(true);
244 Random random = new Random();
245 AtomicReference<Exception> exception = new AtomicReference<>(null);
246 Cluster cluster;
247
248 public MiniChaosMonkey(Cluster cluster) {
249 this.cluster = cluster;
250 }
251
252 @Override
253 public void run() {
254 while (running.get()) {
255 switch (random.nextInt() % 2) {
256 case 0:
257 try {
258 cluster.startServer();
259 } catch (Exception e) {
260 LOG.warn(e);
261 exception.compareAndSet(null, e);
262 }
263 break;
264
265 case 1:
266 try {
267 cluster.stopRandomServer();
268 } catch (Exception e) {
269 LOG.warn(e);
270 exception.compareAndSet(null, e);
271 }
272 default:
273 }
274
275 Threads.sleep(100);
276 }
277 }
278
279 void stopRunning() {
280 running.set(false);
281 }
282
283 void rethrowException() throws Exception {
284 if (exception.get() != null) {
285 throw exception.get();
286 }
287 }
288 }
289
290 static class SimpleClient extends Thread {
291 AbstractRpcClient rpcClient;
292 AtomicBoolean running = new AtomicBoolean(true);
293 AtomicReference<Throwable> exception = new AtomicReference<>(null);
294 Cluster cluster;
295 String id;
296 long numCalls = 0;
297 Random random = new Random();
298
299 public SimpleClient(Cluster cluster, AbstractRpcClient rpcClient, String id) {
300 this.cluster = cluster;
301 this.rpcClient = rpcClient;
302 this.id = id;
303 }
304
305 @Override
306 public void run() {
307 MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
308
309 while (running.get()) {
310 boolean isBigPayload = random.nextBoolean();
311 String message = isBigPayload ? BIG_PAYLOAD : id + numCalls;
312 EchoRequestProto param = EchoRequestProto.newBuilder().setMessage(message).build();
313 EchoResponseProto ret = EchoResponseProto.newBuilder().setMessage("foo").build();
314
315 TestRpcServer server = cluster.getRandomServer();
316 try {
317 User user = User.getCurrent();
318 InetSocketAddress address = server.getListenerAddress();
319 if (address == null) {
320 throw new IOException("Listener channel is closed");
321 }
322 ret = (EchoResponseProto)
323 rpcClient.callBlockingMethod(md, null, param, ret, user, address);
324 } catch (Exception e) {
325 LOG.warn(e);
326 continue;
327 }
328
329 try {
330 assertNotNull(ret);
331 assertEquals(message, ret.getMessage());
332 } catch (Throwable t) {
333 exception.compareAndSet(null, t);
334 }
335
336 numCalls++;
337 }
338 }
339
340 void stopRunning() {
341 running.set(false);
342 }
343
344 void rethrowException() throws Throwable {
345 if (exception.get() != null) {
346 throw exception.get();
347 }
348 }
349 }
350
351 @Test (timeout = 900000)
352 public void testRpcWithChaosMonkeyWithSyncClient() throws Throwable {
353 for (int i = 0; i < numIterations; i++) {
354 TimeoutThread.runWithTimeout(new Callable<Void>() {
355 @Override
356 public Void call() throws Exception {
357 try {
358 testRpcWithChaosMonkey(true);
359 } catch (Throwable e) {
360 if (e instanceof Exception) {
361 throw (Exception)e;
362 } else {
363 throw new Exception(e);
364 }
365 }
366 return null;
367 }
368 }, 90000);
369 }
370 }
371
372 @Test (timeout = 900000)
373 @Ignore
374 public void testRpcWithChaosMonkeyWithAsyncClient() throws Throwable {
375 for (int i = 0; i < numIterations; i++) {
376 TimeoutThread.runWithTimeout(new Callable<Void>() {
377 @Override
378 public Void call() throws Exception {
379 try {
380 testRpcWithChaosMonkey(false);
381 } catch (Throwable e) {
382 if (e instanceof Exception) {
383 throw (Exception)e;
384 } else {
385 throw new Exception(e);
386 }
387 }
388 return null;
389 }
390 }, 90000);
391 }
392 }
393
394 static class TimeoutThread extends Thread {
395 long timeout;
396 public TimeoutThread(long timeout) {
397 this.timeout = timeout;
398 }
399
400 @Override
401 public void run() {
402 try {
403 Thread.sleep(timeout);
404 Threads.printThreadInfo(System.err, "TEST TIMEOUT STACK DUMP");
405 System.exit(1);
406 } catch (InterruptedException e) {
407
408 }
409 }
410
411
412
413 static void runWithTimeout(Callable<?> callable, long timeout) throws Exception {
414 TimeoutThread thread = new TimeoutThread(timeout);
415 thread.start();
416 callable.call();
417 thread.interrupt();
418 }
419 }
420
421 public void testRpcWithChaosMonkey(boolean isSyncClient) throws Throwable {
422 LOG.info("Starting test");
423 Cluster cluster = new Cluster(10, 100);
424 for (int i = 0; i < 10; i++) {
425 cluster.startServer();
426 }
427
428 ArrayList<SimpleClient> clients = new ArrayList<>();
429
430
431 AbstractRpcClient rpcClient = createRpcClient(conf, isSyncClient);
432
433 for (int i = 0; i < 30; i++) {
434 String clientId = "client_" + i + "_";
435 LOG.info("Starting client: " + clientId);
436 SimpleClient client = new SimpleClient(cluster, rpcClient, clientId);
437 client.start();
438 clients.add(client);
439 }
440
441 LOG.info("Starting MiniChaosMonkey");
442 MiniChaosMonkey cm = new MiniChaosMonkey(cluster);
443 cm.start();
444
445 Threads.sleep(30000);
446
447 LOG.info("Stopping MiniChaosMonkey");
448 cm.stopRunning();
449 cm.join();
450 cm.rethrowException();
451
452 LOG.info("Stopping clients");
453 for (SimpleClient client : clients) {
454 LOG.info("Stopping client: " + client.id);
455 LOG.info(client.id + " numCalls:" + client.numCalls);
456 client.stopRunning();
457 client.join();
458 client.rethrowException();
459 assertTrue(client.numCalls > 10);
460 }
461
462 LOG.info("Stopping RpcClient");
463 rpcClient.close();
464
465 LOG.info("Stopping Cluster");
466 cluster.stopRunning();
467 }
468 }