1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.regionserver;
20
21 import static org.apache.hadoop.hbase.HBaseTestingUtility.START_KEY_BYTES;
22 import static org.apache.hadoop.hbase.HBaseTestingUtility.fam1;
23 import static org.apache.hadoop.hbase.HBaseTestingUtility.fam2;
24 import static org.junit.Assert.assertEquals;
25 import static org.junit.Assert.assertFalse;
26 import static org.junit.Assert.assertNotNull;
27 import static org.junit.Assert.assertTrue;
28 import static org.junit.Assert.fail;
29
30 import java.io.IOException;
31 import java.util.ArrayList;
32 import java.util.List;
33
34 import org.apache.commons.logging.Log;
35 import org.apache.commons.logging.LogFactory;
36 import org.apache.hadoop.hbase.CategoryBasedTimeout;
37 import org.apache.hadoop.hbase.Cell;
38 import org.apache.hadoop.hbase.CellUtil;
39 import org.apache.hadoop.hbase.HBaseTestCase;
40 import org.apache.hadoop.hbase.HBaseTestCase.HRegionIncommon;
41 import org.apache.hadoop.hbase.HBaseTestCase.ScannerIncommon;
42 import org.apache.hadoop.hbase.HBaseTestingUtility;
43 import org.apache.hadoop.hbase.HColumnDescriptor;
44 import org.apache.hadoop.hbase.HConstants;
45 import org.apache.hadoop.hbase.HRegionInfo;
46 import org.apache.hadoop.hbase.HTableDescriptor;
47 import org.apache.hadoop.hbase.TableName;
48 import org.apache.hadoop.hbase.UnknownScannerException;
49 import org.apache.hadoop.hbase.client.Delete;
50 import org.apache.hadoop.hbase.client.Get;
51 import org.apache.hadoop.hbase.client.Put;
52 import org.apache.hadoop.hbase.client.Result;
53 import org.apache.hadoop.hbase.client.Scan;
54 import org.apache.hadoop.hbase.filter.Filter;
55 import org.apache.hadoop.hbase.filter.InclusiveStopFilter;
56 import org.apache.hadoop.hbase.filter.PrefixFilter;
57 import org.apache.hadoop.hbase.filter.WhileMatchFilter;
58 import org.apache.hadoop.hbase.testclassification.SmallTests;
59 import org.apache.hadoop.hbase.util.Bytes;
60 import org.junit.Rule;
61 import org.junit.Test;
62 import org.junit.experimental.categories.Category;
63 import org.junit.rules.TestName;
64 import org.junit.rules.TestRule;
65
66
67
68
69
70 @Category(SmallTests.class)
71 public class TestScanner {
72 @Rule public TestName name = new TestName();
73 @Rule public final TestRule timeout = CategoryBasedTimeout.builder().
74 withTimeout(this.getClass()).withLookingForStuckThread(true).build();
75
76 private static final Log LOG = LogFactory.getLog(TestScanner.class);
77 private final static HBaseTestingUtility TEST_UTIL = HBaseTestingUtility.createLocalHTU();
78
79 private static final byte [] FIRST_ROW = HConstants.EMPTY_START_ROW;
80 private static final byte [][] COLS = { HConstants.CATALOG_FAMILY };
81 private static final byte [][] EXPLICIT_COLS = {
82 HConstants.REGIONINFO_QUALIFIER, HConstants.SERVER_QUALIFIER,
83
84
85 };
86
87 static final HTableDescriptor TESTTABLEDESC =
88 new HTableDescriptor(TableName.valueOf("testscanner"));
89 static {
90 TESTTABLEDESC.addFamily(
91 new HColumnDescriptor(HConstants.CATALOG_FAMILY)
92
93 .setMaxVersions(10)
94 .setBlockCacheEnabled(false)
95 .setBlocksize(8 * 1024)
96 );
97 }
98
99 public static final HRegionInfo REGION_INFO =
100 new HRegionInfo(TESTTABLEDESC.getTableName(), HConstants.EMPTY_BYTE_ARRAY,
101 HConstants.EMPTY_BYTE_ARRAY);
102
103 private static final byte [] ROW_KEY = REGION_INFO.getRegionName();
104
105 private static final long START_CODE = Long.MAX_VALUE;
106
107 private HRegion r;
108 private HRegionIncommon region;
109
110 private byte[] firstRowBytes, secondRowBytes, thirdRowBytes;
111 final private byte[] col1, col2;
112
113 public TestScanner() {
114 super();
115
116 firstRowBytes = START_KEY_BYTES;
117 secondRowBytes = START_KEY_BYTES.clone();
118
119 secondRowBytes[START_KEY_BYTES.length - 1]++;
120 thirdRowBytes = START_KEY_BYTES.clone();
121 thirdRowBytes[START_KEY_BYTES.length - 1] += 2;
122 col1 = Bytes.toBytes("column1");
123 col2 = Bytes.toBytes("column2");
124 }
125
126
127
128
129
130 @Test
131 public void testStopRow() throws Exception {
132 byte [] startrow = Bytes.toBytes("bbb");
133 byte [] stoprow = Bytes.toBytes("ccc");
134 try {
135 this.r = TEST_UTIL.createLocalHRegion(TESTTABLEDESC, null, null);
136 HBaseTestCase.addContent(this.r, HConstants.CATALOG_FAMILY);
137 List<Cell> results = new ArrayList<Cell>();
138
139 Scan scan = new Scan(Bytes.toBytes("abc"), Bytes.toBytes("abd"));
140 scan.addFamily(HConstants.CATALOG_FAMILY);
141
142 InternalScanner s = r.getScanner(scan);
143 int count = 0;
144 while (s.next(results)) {
145 count++;
146 }
147 s.close();
148 assertEquals(0, count);
149
150 scan = new Scan(startrow, stoprow);
151 scan.addFamily(HConstants.CATALOG_FAMILY);
152
153 s = r.getScanner(scan);
154 count = 0;
155 Cell kv = null;
156 results = new ArrayList<Cell>();
157 for (boolean first = true; s.next(results);) {
158 kv = results.get(0);
159 if (first) {
160 assertTrue(CellUtil.matchingRow(kv, startrow));
161 first = false;
162 }
163 count++;
164 }
165 assertTrue(Bytes.BYTES_COMPARATOR.compare(stoprow, CellUtil.cloneRow(kv)) > 0);
166
167 assertTrue(count > 10);
168 s.close();
169 } finally {
170 HRegion.closeHRegion(this.r);
171 }
172 }
173
174 void rowPrefixFilter(Scan scan) throws IOException {
175 List<Cell> results = new ArrayList<Cell>();
176 scan.addFamily(HConstants.CATALOG_FAMILY);
177 InternalScanner s = r.getScanner(scan);
178 boolean hasMore = true;
179 while (hasMore) {
180 hasMore = s.next(results);
181 for (Cell kv : results) {
182 assertEquals((byte)'a', CellUtil.cloneRow(kv)[0]);
183 assertEquals((byte)'b', CellUtil.cloneRow(kv)[1]);
184 }
185 results.clear();
186 }
187 s.close();
188 }
189
190 void rowInclusiveStopFilter(Scan scan, byte[] stopRow) throws IOException {
191 List<Cell> results = new ArrayList<Cell>();
192 scan.addFamily(HConstants.CATALOG_FAMILY);
193 InternalScanner s = r.getScanner(scan);
194 boolean hasMore = true;
195 while (hasMore) {
196 hasMore = s.next(results);
197 for (Cell kv : results) {
198 assertTrue(Bytes.compareTo(CellUtil.cloneRow(kv), stopRow) <= 0);
199 }
200 results.clear();
201 }
202 s.close();
203 }
204
205 @Test
206 public void testFilters() throws IOException {
207 try {
208 this.r = TEST_UTIL.createLocalHRegion(TESTTABLEDESC, null, null);
209 HBaseTestCase.addContent(this.r, HConstants.CATALOG_FAMILY);
210 byte [] prefix = Bytes.toBytes("ab");
211 Filter newFilter = new PrefixFilter(prefix);
212 Scan scan = new Scan();
213 scan.setFilter(newFilter);
214 rowPrefixFilter(scan);
215
216 byte[] stopRow = Bytes.toBytes("bbc");
217 newFilter = new WhileMatchFilter(new InclusiveStopFilter(stopRow));
218 scan = new Scan();
219 scan.setFilter(newFilter);
220 rowInclusiveStopFilter(scan, stopRow);
221
222 } finally {
223 HRegion.closeHRegion(this.r);
224 }
225 }
226
227
228
229
230
231
232 @Test
233 public void testRaceBetweenClientAndTimeout() throws Exception {
234 try {
235 this.r = TEST_UTIL.createLocalHRegion(TESTTABLEDESC, null, null);
236 HBaseTestCase.addContent(this.r, HConstants.CATALOG_FAMILY);
237 Scan scan = new Scan();
238 InternalScanner s = r.getScanner(scan);
239 List<Cell> results = new ArrayList<Cell>();
240 try {
241 s.next(results);
242 s.close();
243 s.next(results);
244 fail("We don't want anything more, we should be failing");
245 } catch (UnknownScannerException ex) {
246
247 return;
248 }
249 } finally {
250 HRegion.closeHRegion(this.r);
251 }
252 }
253
254
255
256
257 @Test
258 public void testScanner() throws IOException {
259 try {
260 r = TEST_UTIL.createLocalHRegion(TESTTABLEDESC, null, null);
261 region = new HRegionIncommon(r);
262
263
264
265 Put put = new Put(ROW_KEY, System.currentTimeMillis());
266
267 put.add(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER,
268 REGION_INFO.toByteArray());
269 region.put(put);
270
271
272
273
274 scan(false, null);
275 getRegionInfo();
276
277
278
279 ((HRegion)r).close();
280 r = HRegion.openHRegion(r, null);
281 region = new HRegionIncommon(r);
282
283
284
285 scan(false, null);
286 getRegionInfo();
287
288
289
290 String address = HConstants.LOCALHOST_IP + ":" + HBaseTestingUtility.randomFreePort();
291
292 put = new Put(ROW_KEY, System.currentTimeMillis());
293 put.add(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER,
294 Bytes.toBytes(address));
295
296
297
298 region.put(put);
299
300
301
302
303 scan(true, address.toString());
304 getRegionInfo();
305
306
307
308 region.flushcache();
309
310
311
312 scan(true, address.toString());
313 getRegionInfo();
314
315
316
317 ((HRegion)r).close();
318 r = HRegion.openHRegion(r,null);
319 region = new HRegionIncommon(r);
320
321
322
323 scan(true, address.toString());
324 getRegionInfo();
325
326
327
328 address = "bar.foo.com:4321";
329
330 put = new Put(ROW_KEY, System.currentTimeMillis());
331
332 put.add(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER,
333 Bytes.toBytes(address));
334 region.put(put);
335
336
337
338 scan(true, address.toString());
339 getRegionInfo();
340
341
342
343 region.flushcache();
344
345
346
347 scan(true, address.toString());
348 getRegionInfo();
349
350
351
352 ((HRegion)r).close();
353 r = HRegion.openHRegion(r,null);
354 region = new HRegionIncommon(r);
355
356
357
358 scan(true, address.toString());
359 getRegionInfo();
360
361 } finally {
362
363 HRegion.closeHRegion(r);
364 }
365 }
366
367
368 private void validateRegionInfo(byte [] regionBytes) throws IOException {
369 HRegionInfo info = HRegionInfo.parseFromOrNull(regionBytes);
370
371 assertEquals(REGION_INFO.getRegionId(), info.getRegionId());
372 assertEquals(0, info.getStartKey().length);
373 assertEquals(0, info.getEndKey().length);
374 assertEquals(0, Bytes.compareTo(info.getRegionName(), REGION_INFO.getRegionName()));
375
376 }
377
378
379 private void scan(boolean validateStartcode, String serverName)
380 throws IOException {
381 InternalScanner scanner = null;
382 Scan scan = null;
383 List<Cell> results = new ArrayList<Cell>();
384 byte [][][] scanColumns = {
385 COLS,
386 EXPLICIT_COLS
387 };
388
389 for(int i = 0; i < scanColumns.length; i++) {
390 try {
391 scan = new Scan(FIRST_ROW);
392 for (int ii = 0; ii < EXPLICIT_COLS.length; ii++) {
393 scan.addColumn(COLS[0], EXPLICIT_COLS[ii]);
394 }
395 scanner = r.getScanner(scan);
396 while (scanner.next(results)) {
397 assertTrue(hasColumn(results, HConstants.CATALOG_FAMILY,
398 HConstants.REGIONINFO_QUALIFIER));
399 byte [] val = CellUtil.cloneValue(getColumn(results, HConstants.CATALOG_FAMILY,
400 HConstants.REGIONINFO_QUALIFIER));
401 validateRegionInfo(val);
402 if(validateStartcode) {
403
404
405
406
407 assertNotNull(val);
408 assertFalse(val.length == 0);
409 long startCode = Bytes.toLong(val);
410 assertEquals(START_CODE, startCode);
411 }
412
413 if(serverName != null) {
414 assertTrue(hasColumn(results, HConstants.CATALOG_FAMILY,
415 HConstants.SERVER_QUALIFIER));
416 val = CellUtil.cloneValue(getColumn(results, HConstants.CATALOG_FAMILY,
417 HConstants.SERVER_QUALIFIER));
418 assertNotNull(val);
419 assertFalse(val.length == 0);
420 String server = Bytes.toString(val);
421 assertEquals(0, server.compareTo(serverName));
422 }
423 }
424 } finally {
425 InternalScanner s = scanner;
426 scanner = null;
427 if(s != null) {
428 s.close();
429 }
430 }
431 }
432 }
433
434 private boolean hasColumn(final List<Cell> kvs, final byte [] family,
435 final byte [] qualifier) {
436 for (Cell kv: kvs) {
437 if (CellUtil.matchingFamily(kv, family) && CellUtil.matchingQualifier(kv, qualifier)) {
438 return true;
439 }
440 }
441 return false;
442 }
443
444 private Cell getColumn(final List<Cell> kvs, final byte [] family,
445 final byte [] qualifier) {
446 for (Cell kv: kvs) {
447 if (CellUtil.matchingFamily(kv, family) && CellUtil.matchingQualifier(kv, qualifier)) {
448 return kv;
449 }
450 }
451 return null;
452 }
453
454
455
456 private void getRegionInfo() throws IOException {
457 Get get = new Get(ROW_KEY);
458 get.addColumn(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER);
459 Result result = region.get(get);
460 byte [] bytes = result.value();
461 validateRegionInfo(bytes);
462 }
463
464
465
466
467
468
469
470 @Test
471 public void testScanAndSyncFlush() throws Exception {
472 this.r = TEST_UTIL.createLocalHRegion(TESTTABLEDESC, null, null);
473 HRegionIncommon hri = new HRegionIncommon(r);
474 try {
475 LOG.info("Added: " + HBaseTestCase.addContent(hri, Bytes.toString(HConstants.CATALOG_FAMILY),
476 Bytes.toString(HConstants.REGIONINFO_QUALIFIER)));
477 int count = count(hri, -1, false);
478 assertEquals(count, count(hri, 100, false));
479 } catch (Exception e) {
480 LOG.error("Failed", e);
481 throw e;
482 } finally {
483 HRegion.closeHRegion(this.r);
484 }
485 }
486
487
488
489
490
491
492
493 @Test
494 public void testScanAndRealConcurrentFlush() throws Exception {
495 this.r = TEST_UTIL.createLocalHRegion(TESTTABLEDESC, null, null);
496 HRegionIncommon hri = new HRegionIncommon(r);
497 try {
498 LOG.info("Added: " + HBaseTestCase.addContent(hri, Bytes.toString(HConstants.CATALOG_FAMILY),
499 Bytes.toString(HConstants.REGIONINFO_QUALIFIER)));
500 int count = count(hri, -1, false);
501 assertEquals(count, count(hri, 100, true));
502 } catch (Exception e) {
503 LOG.error("Failed", e);
504 throw e;
505 } finally {
506 HRegion.closeHRegion(this.r);
507 }
508 }
509
510
511
512
513
514
515
516 @Test
517 @SuppressWarnings("deprecation")
518 public void testScanAndConcurrentMajorCompact() throws Exception {
519 HTableDescriptor htd = TEST_UTIL.createTableDescriptor(name.getMethodName());
520 this.r = TEST_UTIL.createLocalHRegion(htd, null, null);
521 HRegionIncommon hri = new HRegionIncommon(r);
522
523 try {
524 HBaseTestCase.addContent(hri, Bytes.toString(fam1), Bytes.toString(col1),
525 firstRowBytes, secondRowBytes);
526 HBaseTestCase.addContent(hri, Bytes.toString(fam2), Bytes.toString(col1),
527 firstRowBytes, secondRowBytes);
528
529 Delete dc = new Delete(firstRowBytes);
530
531 dc.deleteColumns(fam1, col1);
532 r.delete(dc);
533 r.flush(true);
534
535 HBaseTestCase.addContent(hri, Bytes.toString(fam1), Bytes.toString(col1),
536 secondRowBytes, thirdRowBytes);
537 HBaseTestCase.addContent(hri, Bytes.toString(fam2), Bytes.toString(col1),
538 secondRowBytes, thirdRowBytes);
539 r.flush(true);
540
541 InternalScanner s = r.getScanner(new Scan());
542
543 r.compact(true);
544
545 List<Cell> results = new ArrayList<Cell>();
546 s.next(results);
547
548
549 assertTrue("result is not correct, keyValues : " + results,
550 results.size() == 1);
551 assertTrue(CellUtil.matchingRow(results.get(0), firstRowBytes));
552 assertTrue(CellUtil.matchingFamily(results.get(0), fam2));
553
554 results = new ArrayList<Cell>();
555 s.next(results);
556
557
558 assertTrue(results.size() == 2);
559 assertTrue(CellUtil.matchingRow(results.get(0), secondRowBytes));
560 assertTrue(CellUtil.matchingFamily(results.get(0), fam1));
561 assertTrue(CellUtil.matchingFamily(results.get(1), fam2));
562 } finally {
563 HRegion.closeHRegion(this.r);
564 }
565 }
566
567
568
569
570
571
572
573
574
575 private int count(final HRegionIncommon hri, final int flushIndex,
576 boolean concurrent)
577 throws IOException {
578 LOG.info("Taking out counting scan");
579 ScannerIncommon s = hri.getScanner(HConstants.CATALOG_FAMILY, EXPLICIT_COLS,
580 HConstants.EMPTY_START_ROW, HConstants.LATEST_TIMESTAMP);
581 List<Cell> values = new ArrayList<Cell>();
582 int count = 0;
583 boolean justFlushed = false;
584 while (s.next(values)) {
585 if (justFlushed) {
586 LOG.info("after next() just after next flush");
587 justFlushed=false;
588 }
589 count++;
590 if (flushIndex == count) {
591 LOG.info("Starting flush at flush index " + flushIndex);
592 Thread t = new Thread() {
593 public void run() {
594 try {
595 hri.flushcache();
596 LOG.info("Finishing flush");
597 } catch (IOException e) {
598 LOG.info("Failed flush cache");
599 }
600 }
601 };
602 if (concurrent) {
603 t.start();
604 } else {
605 t.run();
606 }
607 LOG.info("Continuing on after kicking off background flush");
608 justFlushed = true;
609 }
610 }
611 s.close();
612 LOG.info("Found " + count + " items");
613 return count;
614 }
615
616 }