1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.ipc;
19
20 import java.io.IOException;
21 import java.net.Socket;
22 import java.net.SocketAddress;
23 import java.util.List;
24
25 import com.google.common.collect.Lists;
26 import org.apache.commons.logging.Log;
27 import org.apache.commons.logging.LogFactory;
28 import org.apache.hadoop.conf.Configuration;
29 import org.apache.hadoop.hbase.HBaseTestingUtility;
30 import org.apache.hadoop.hbase.HConstants;
31 import org.apache.hadoop.hbase.TableName;
32 import org.apache.hadoop.hbase.client.Connection;
33 import org.apache.hadoop.hbase.client.ConnectionFactory;
34 import org.apache.hadoop.hbase.client.Get;
35 import org.apache.hadoop.hbase.client.MetricsConnection;
36 import org.apache.hadoop.hbase.client.RetriesExhaustedException;
37 import org.apache.hadoop.hbase.client.Table;
38 import org.apache.hadoop.hbase.codec.Codec;
39 import org.apache.hadoop.hbase.testclassification.SmallTests;
40 import org.apache.hadoop.io.compress.CompressionCodec;
41 import org.junit.AfterClass;
42 import org.junit.BeforeClass;
43 import org.junit.Rule;
44 import org.junit.Test;
45 import org.junit.experimental.categories.Category;
46 import org.junit.rules.ExpectedException;
47
48 import static org.apache.hadoop.hbase.HBaseTestingUtility.fam1;
49 import static org.junit.Assert.*;
50
51 @Category(SmallTests.class)
52 public class TestRpcClientLeaks {
53
54 public static class MyRpcClientImpl extends RpcClientImpl {
55 public static List<Socket> savedSockets = Lists.newArrayList();
56 @Rule public ExpectedException thrown = ExpectedException.none();
57
58 public MyRpcClientImpl(Configuration conf, String clusterId) {
59 super(conf, clusterId);
60 }
61
62 public MyRpcClientImpl(Configuration conf, String clusterId, SocketAddress address,
63 MetricsConnection metrics) {
64 super(conf, clusterId, address, metrics);
65 }
66
67 @Override
68 protected Connection createConnection(ConnectionId remoteId, Codec codec,
69 CompressionCodec compressor) throws IOException {
70 return new Connection(remoteId, codec, compressor) {
71 @Override
72 protected synchronized void setupConnection() throws IOException {
73 super.setupConnection();
74 synchronized (savedSockets) {
75 savedSockets.add(socket);
76 }
77 throw new IOException("Sample exception for " +
78 "verifying socket closure in case of exceptions.");
79 }
80 };
81 }
82 }
83
84 private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
85
86 @BeforeClass
87 public static void setup() throws Exception {
88 UTIL.startMiniCluster();
89 }
90
91 @AfterClass
92 public static void teardown() throws Exception {
93 UTIL.shutdownMiniCluster();
94 }
95
96 public static final Log LOG = LogFactory.getLog(TestRpcClientLeaks.class);
97
98 @Test(expected=RetriesExhaustedException.class)
99 public void testSocketClosed() throws IOException, InterruptedException {
100 String tableName = "testSocketClosed";
101 TableName name = TableName.valueOf(tableName);
102 UTIL.createTable(name, fam1).close();
103
104 Configuration conf = new Configuration(UTIL.getConfiguration());
105 conf.set(RpcClientFactory.CUSTOM_RPC_CLIENT_IMPL_CONF_KEY,
106 MyRpcClientImpl.class.getName());
107 conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
108 Connection connection = ConnectionFactory.createConnection(conf);
109 Table table = connection.getTable(TableName.valueOf(tableName));
110 table.get(new Get("asd".getBytes()));
111 connection.close();
112 for (Socket socket : MyRpcClientImpl.savedSockets) {
113 assertTrue("Socket + " + socket + " is not closed", socket.isClosed());
114 }
115 }
116 }
117