1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase;
20
21 import java.io.IOException;
22 import java.util.List;
23 import java.util.Random;
24 import java.util.concurrent.atomic.AtomicLong;
25
26 import org.apache.commons.logging.Log;
27 import org.apache.commons.logging.LogFactory;
28 import org.apache.hadoop.conf.Configuration;
29 import org.apache.hadoop.hbase.MultithreadedTestUtil.RepeatingTestThread;
30 import org.apache.hadoop.hbase.MultithreadedTestUtil.TestContext;
31 import org.apache.hadoop.hbase.client.Get;
32 import org.apache.hadoop.hbase.client.HBaseAdmin;
33 import org.apache.hadoop.hbase.client.HTable;
34 import org.apache.hadoop.hbase.client.Put;
35 import org.apache.hadoop.hbase.client.Result;
36 import org.apache.hadoop.hbase.client.ResultScanner;
37 import org.apache.hadoop.hbase.client.Scan;
38 import org.apache.hadoop.hbase.client.Table;
39 import org.apache.hadoop.hbase.regionserver.ConstantSizeRegionSplitPolicy;
40 import org.apache.hadoop.hbase.testclassification.MediumTests;
41 import org.apache.hadoop.hbase.util.Bytes;
42 import org.apache.hadoop.util.StringUtils;
43 import org.apache.hadoop.util.Tool;
44 import org.apache.hadoop.util.ToolRunner;
45 import org.junit.Test;
46 import org.junit.experimental.categories.Category;
47
48 import com.google.common.collect.Lists;
49
50
51
52
53
54
55
56
57 @Category(MediumTests.class)
58 public class TestAcidGuarantees implements Tool {
59 protected static final Log LOG = LogFactory.getLog(TestAcidGuarantees.class);
60 public static final TableName TABLE_NAME = TableName.valueOf("TestAcidGuarantees");
61 public static final byte [] FAMILY_A = Bytes.toBytes("A");
62 public static final byte [] FAMILY_B = Bytes.toBytes("B");
63 public static final byte [] FAMILY_C = Bytes.toBytes("C");
64 public static final byte [] QUALIFIER_NAME = Bytes.toBytes("data");
65
66 public static final byte[][] FAMILIES = new byte[][] {
67 FAMILY_A, FAMILY_B, FAMILY_C };
68
69 private HBaseTestingUtility util;
70
71 public static int NUM_COLS_TO_CHECK = 50;
72
73
74 private Configuration conf;
75
76 private void createTableIfMissing()
77 throws IOException {
78 try {
79 util.createTable(TABLE_NAME, FAMILIES);
80 } catch (TableExistsException tee) {
81 }
82 }
83
84 public TestAcidGuarantees() {
85
86 Configuration conf = HBaseConfiguration.create();
87 conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(128*1024));
88
89 conf.set(HConstants.HBASE_REGION_SPLIT_POLICY_KEY,
90 ConstantSizeRegionSplitPolicy.class.getName());
91 util = new HBaseTestingUtility(conf);
92 }
93
94 public void setHBaseTestingUtil(HBaseTestingUtility util) {
95 this.util = util;
96 }
97
98
99
100
101 public static class AtomicityWriter extends RepeatingTestThread {
102 Random rand = new Random();
103 byte data[] = new byte[10];
104 byte targetRows[][];
105 byte targetFamilies[][];
106 Table table;
107 AtomicLong numWritten = new AtomicLong();
108
109 public AtomicityWriter(TestContext ctx, byte targetRows[][],
110 byte targetFamilies[][]) throws IOException {
111 super(ctx);
112 this.targetRows = targetRows;
113 this.targetFamilies = targetFamilies;
114 table = new HTable(ctx.getConf(), TABLE_NAME);
115 }
116 public void doAnAction() throws Exception {
117
118 byte[] targetRow = targetRows[rand.nextInt(targetRows.length)];
119 Put p = new Put(targetRow);
120 rand.nextBytes(data);
121
122 for (byte[] family : targetFamilies) {
123 for (int i = 0; i < NUM_COLS_TO_CHECK; i++) {
124 byte qualifier[] = Bytes.toBytes("col" + i);
125 p.add(family, qualifier, data);
126 }
127 }
128 table.put(p);
129 numWritten.getAndIncrement();
130 }
131 }
132
133
134
135
136
137 public static class AtomicGetReader extends RepeatingTestThread {
138 byte targetRow[];
139 byte targetFamilies[][];
140 Table table;
141 int numVerified = 0;
142 AtomicLong numRead = new AtomicLong();
143
144 public AtomicGetReader(TestContext ctx, byte targetRow[],
145 byte targetFamilies[][]) throws IOException {
146 super(ctx);
147 this.targetRow = targetRow;
148 this.targetFamilies = targetFamilies;
149 table = new HTable(ctx.getConf(), TABLE_NAME);
150 }
151
152 public void doAnAction() throws Exception {
153 Get g = new Get(targetRow);
154 Result res = table.get(g);
155 byte[] gotValue = null;
156 if (res.getRow() == null) {
157
158
159
160 return;
161 }
162
163 for (byte[] family : targetFamilies) {
164 for (int i = 0; i < NUM_COLS_TO_CHECK; i++) {
165 byte qualifier[] = Bytes.toBytes("col" + i);
166 byte thisValue[] = res.getValue(family, qualifier);
167 if (gotValue != null && !Bytes.equals(gotValue, thisValue)) {
168 gotFailure(gotValue, res);
169 }
170 numVerified++;
171 gotValue = thisValue;
172 }
173 }
174 numRead.getAndIncrement();
175 }
176
177 private void gotFailure(byte[] expected, Result res) {
178 StringBuilder msg = new StringBuilder();
179 msg.append("Failed after ").append(numVerified).append("!");
180 msg.append("Expected=").append(Bytes.toStringBinary(expected));
181 msg.append("Got:\n");
182 for (Cell kv : res.listCells()) {
183 msg.append(kv.toString());
184 msg.append(" val= ");
185 msg.append(Bytes.toStringBinary(CellUtil.cloneValue(kv)));
186 msg.append("\n");
187 }
188 throw new RuntimeException(msg.toString());
189 }
190 }
191
192
193
194
195
196 public static class AtomicScanReader extends RepeatingTestThread {
197 byte targetFamilies[][];
198 Table table;
199 AtomicLong numScans = new AtomicLong();
200 AtomicLong numRowsScanned = new AtomicLong();
201
202 public AtomicScanReader(TestContext ctx,
203 byte targetFamilies[][]) throws IOException {
204 super(ctx);
205 this.targetFamilies = targetFamilies;
206 table = new HTable(ctx.getConf(), TABLE_NAME);
207 }
208
209 public void doAnAction() throws Exception {
210 Scan s = new Scan();
211 for (byte[] family : targetFamilies) {
212 s.addFamily(family);
213 }
214 ResultScanner scanner = table.getScanner(s);
215
216 for (Result res : scanner) {
217 byte[] gotValue = null;
218
219 for (byte[] family : targetFamilies) {
220 for (int i = 0; i < NUM_COLS_TO_CHECK; i++) {
221 byte qualifier[] = Bytes.toBytes("col" + i);
222 byte thisValue[] = res.getValue(family, qualifier);
223 if (gotValue != null && !Bytes.equals(gotValue, thisValue)) {
224 gotFailure(gotValue, res);
225 }
226 gotValue = thisValue;
227 }
228 }
229 numRowsScanned.getAndIncrement();
230 }
231 numScans.getAndIncrement();
232 }
233
234 private void gotFailure(byte[] expected, Result res) {
235 StringBuilder msg = new StringBuilder();
236 msg.append("Failed after ").append(numRowsScanned).append("!");
237 msg.append("Expected=").append(Bytes.toStringBinary(expected));
238 msg.append("Got:\n");
239 for (Cell kv : res.listCells()) {
240 msg.append(kv.toString());
241 msg.append(" val= ");
242 msg.append(Bytes.toStringBinary(CellUtil.cloneValue(kv)));
243 msg.append("\n");
244 }
245 throw new RuntimeException(msg.toString());
246 }
247 }
248
249 public void runTestAtomicity(long millisToRun,
250 int numWriters,
251 int numGetters,
252 int numScanners,
253 int numUniqueRows) throws Exception {
254 runTestAtomicity(millisToRun, numWriters, numGetters, numScanners, numUniqueRows, false);
255 }
256
257 public void runTestAtomicity(long millisToRun,
258 int numWriters,
259 int numGetters,
260 int numScanners,
261 int numUniqueRows,
262 final boolean systemTest) throws Exception {
263 createTableIfMissing();
264 TestContext ctx = new TestContext(util.getConfiguration());
265
266 byte rows[][] = new byte[numUniqueRows][];
267 for (int i = 0; i < numUniqueRows; i++) {
268 rows[i] = Bytes.toBytes("test_row_" + i);
269 }
270
271 List<AtomicityWriter> writers = Lists.newArrayList();
272 for (int i = 0; i < numWriters; i++) {
273 AtomicityWriter writer = new AtomicityWriter(
274 ctx, rows, FAMILIES);
275 writers.add(writer);
276 ctx.addThread(writer);
277 }
278
279 ctx.addThread(new RepeatingTestThread(ctx) {
280 HBaseAdmin admin = util.getHBaseAdmin();
281 public void doAnAction() throws Exception {
282 try {
283 admin.flush(TABLE_NAME);
284 } catch(IOException ioe) {
285 LOG.warn("Ignoring exception while flushing: " + StringUtils.stringifyException(ioe));
286 }
287
288
289
290
291
292
293
294
295 if (systemTest) Thread.sleep(60000);
296 }
297 });
298
299 List<AtomicGetReader> getters = Lists.newArrayList();
300 for (int i = 0; i < numGetters; i++) {
301 AtomicGetReader getter = new AtomicGetReader(
302 ctx, rows[i % numUniqueRows], FAMILIES);
303 getters.add(getter);
304 ctx.addThread(getter);
305 }
306
307 List<AtomicScanReader> scanners = Lists.newArrayList();
308 for (int i = 0; i < numScanners; i++) {
309 AtomicScanReader scanner = new AtomicScanReader(ctx, FAMILIES);
310 scanners.add(scanner);
311 ctx.addThread(scanner);
312 }
313
314 ctx.startThreads();
315 ctx.waitFor(millisToRun);
316 ctx.stop();
317
318 LOG.info("Finished test. Writers:");
319 for (AtomicityWriter writer : writers) {
320 LOG.info(" wrote " + writer.numWritten.get());
321 }
322 LOG.info("Readers:");
323 for (AtomicGetReader reader : getters) {
324 LOG.info(" read " + reader.numRead.get());
325 }
326 LOG.info("Scanners:");
327 for (AtomicScanReader scanner : scanners) {
328 LOG.info(" scanned " + scanner.numScans.get());
329 LOG.info(" verified " + scanner.numRowsScanned.get() + " rows");
330 }
331 }
332
333 @Test
334 public void testGetAtomicity() throws Exception {
335 util.startMiniCluster(1);
336 try {
337 runTestAtomicity(20000, 5, 5, 0, 3);
338 } finally {
339 util.shutdownMiniCluster();
340 }
341 }
342
343 @Test
344 public void testScanAtomicity() throws Exception {
345 util.startMiniCluster(1);
346 try {
347 runTestAtomicity(20000, 5, 0, 5, 3);
348 } finally {
349 util.shutdownMiniCluster();
350 }
351 }
352
353 @Test
354 public void testMixedAtomicity() throws Exception {
355 util.startMiniCluster(1);
356 try {
357 runTestAtomicity(20000, 5, 2, 2, 3);
358 } finally {
359 util.shutdownMiniCluster();
360 }
361 }
362
363
364
365
366 @Override
367 public Configuration getConf() {
368 return conf;
369 }
370
371 @Override
372 public void setConf(Configuration c) {
373 this.conf = c;
374 this.util = new HBaseTestingUtility(c);
375 }
376
377 @Override
378 public int run(String[] arg0) throws Exception {
379 Configuration c = getConf();
380 int millis = c.getInt("millis", 5000);
381 int numWriters = c.getInt("numWriters", 50);
382 int numGetters = c.getInt("numGetters", 2);
383 int numScanners = c.getInt("numScanners", 2);
384 int numUniqueRows = c.getInt("numUniqueRows", 3);
385 runTestAtomicity(millis, numWriters, numGetters, numScanners, numUniqueRows, true);
386 return 0;
387 }
388
389 public static void main(String args[]) throws Exception {
390 Configuration c = HBaseConfiguration.create();
391 int status;
392 try {
393 TestAcidGuarantees test = new TestAcidGuarantees();
394 status = ToolRunner.run(c, test, args);
395 } catch (Exception e) {
396 LOG.error("Exiting due to error", e);
397 status = -1;
398 }
399 System.exit(status);
400 }
401
402
403 }
404