1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.client;
19
20 import static org.apache.hadoop.hbase.HBaseTestingUtility.fam1;
21 import static org.junit.Assert.assertEquals;
22 import static org.junit.Assert.assertNotNull;
23
24 import java.io.IOException;
25 import java.util.List;
26 import java.util.concurrent.atomic.AtomicInteger;
27
28 import org.apache.hadoop.conf.Configuration;
29 import org.apache.hadoop.hbase.CellScannable;
30 import org.apache.hadoop.hbase.CellScanner;
31 import org.apache.hadoop.hbase.HBaseTestingUtility;
32 import org.apache.hadoop.hbase.HConstants;
33 import org.apache.hadoop.hbase.testclassification.MediumTests;
34 import org.apache.hadoop.hbase.TableName;
35 import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
36 import org.apache.hadoop.hbase.coprocessor.ProtobufCoprocessorService;
37 import org.apache.hadoop.hbase.ipc.DelegatingPayloadCarryingRpcController;
38 import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
39 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
40 import org.apache.hadoop.hbase.util.Bytes;
41 import org.junit.AfterClass;
42 import org.junit.BeforeClass;
43 import org.junit.Test;
44 import org.junit.experimental.categories.Category;
45
46 import com.google.common.collect.Lists;
47
48 @Category(MediumTests.class)
49 public class TestRpcControllerFactory {
50
51 public static class StaticRpcControllerFactory extends RpcControllerFactory {
52
53 public StaticRpcControllerFactory(Configuration conf) {
54 super(conf);
55 }
56
57 @Override
58 public PayloadCarryingRpcController newController() {
59 return new CountingRpcController(super.newController());
60 }
61
62 @Override
63 public PayloadCarryingRpcController newController(final CellScanner cellScanner) {
64 return new CountingRpcController(super.newController(cellScanner));
65 }
66
67 @Override
68 public PayloadCarryingRpcController newController(final List<CellScannable> cellIterables) {
69 return new CountingRpcController(super.newController(cellIterables));
70 }
71 }
72
73 public static class CountingRpcController extends DelegatingPayloadCarryingRpcController {
74
75 private static AtomicInteger INT_PRIORITY = new AtomicInteger();
76 private static AtomicInteger TABLE_PRIORITY = new AtomicInteger();
77
78 public CountingRpcController(PayloadCarryingRpcController delegate) {
79 super(delegate);
80 }
81
82 @Override
83 public void setPriority(int priority) {
84 super.setPriority(priority);
85 INT_PRIORITY.incrementAndGet();
86 }
87
88 @Override
89 public void setPriority(TableName tn) {
90 super.setPriority(tn);
91
92
93 if (!tn.isSystemTable()) {
94 TABLE_PRIORITY.incrementAndGet();
95 }
96
97 }
98 }
99
100 private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
101
102 @BeforeClass
103 public static void setup() throws Exception {
104
105
106 Configuration conf = UTIL.getConfiguration();
107 conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
108 ProtobufCoprocessorService.class.getName());
109
110 UTIL.startMiniCluster();
111 }
112
113 @AfterClass
114 public static void teardown() throws Exception {
115 UTIL.shutdownMiniCluster();
116 }
117
118
119
120
121
122
123
124 @Test
125 public void testCountController() throws Exception {
126 Configuration conf = new Configuration(UTIL.getConfiguration());
127
128 conf.set(RpcControllerFactory.CUSTOM_CONTROLLER_CONF_KEY,
129 StaticRpcControllerFactory.class.getName());
130
131 TableName name = TableName.valueOf("testcustomcontroller");
132 UTIL.createTable(name, fam1).close();
133
134
135 conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT + 1);
136
137 Connection connection = ConnectionFactory.createConnection(conf);
138 Table table = connection.getTable(name);
139 byte[] row = Bytes.toBytes("row");
140 Put p = new Put(row);
141 p.add(fam1, fam1, Bytes.toBytes("val0"));
142 table.put(p);
143
144 Integer counter = 1;
145 counter = verifyCount(counter);
146
147 Delete d = new Delete(row);
148 d.addColumn(fam1, fam1);
149 table.delete(d);
150 counter = verifyCount(counter);
151
152 Put p2 = new Put(row);
153 p2.add(fam1, Bytes.toBytes("qual"), Bytes.toBytes("val1"));
154 table.batch(Lists.newArrayList(p, p2), new Object[2]);
155
156 counter = verifyCount(counter);
157
158 Append append = new Append(row);
159 append.add(fam1, fam1, Bytes.toBytes("val2"));
160 table.append(append);
161 counter = verifyCount(counter);
162
163
164 Get g = new Get(row);
165 table.get(g);
166 counter = verifyCount(counter);
167
168 ResultScanner scan = table.getScanner(fam1);
169 scan.next();
170 scan.close();
171 counter = verifyCount(counter);
172
173 Get g2 = new Get(row);
174 table.get(Lists.newArrayList(g, g2));
175
176 counter = verifyCount(counter);
177
178
179 Scan scanInfo = new Scan(row);
180
181 scanInfo.setSmall(true);
182 counter = doScan(table, scanInfo, counter);
183
184
185 scanInfo.setReversed(true);
186 counter = doScan(table, scanInfo, counter);
187
188
189 scanInfo.setSmall(false);
190 counter = doScan(table, scanInfo, counter);
191
192 table.close();
193 }
194
195 int doScan(Table table, Scan scan, int expectedCount) throws IOException {
196 ResultScanner results = table.getScanner(scan);
197 results.next();
198 results.close();
199 return verifyCount(expectedCount);
200 }
201
202 int verifyCount(Integer counter) {
203 assertEquals(counter.intValue(), CountingRpcController.TABLE_PRIORITY.get());
204 assertEquals(0, CountingRpcController.INT_PRIORITY.get());
205 return counter + 1;
206 }
207
208 @Test
209 public void testFallbackToDefaultRpcControllerFactory() {
210 Configuration conf = new Configuration(UTIL.getConfiguration());
211 conf.set(RpcControllerFactory.CUSTOM_CONTROLLER_CONF_KEY, "foo.bar.Baz");
212
213
214 RpcControllerFactory factory = RpcControllerFactory.instantiate(conf);
215 assertNotNull(factory);
216 assertEquals(factory.getClass(), RpcControllerFactory.class);
217 }
218 }