1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.regionserver.wal;
20
21 import static org.junit.Assert.assertEquals;
22 import static org.junit.Assert.assertNotNull;
23 import static org.junit.Assert.assertTrue;
24 import static org.junit.Assert.fail;
25 import static org.mockito.Matchers.any;
26 import static org.mockito.Matchers.eq;
27 import static org.mockito.Mockito.doAnswer;
28 import static org.mockito.Mockito.spy;
29 import static org.mockito.Mockito.when;
30
31 import java.io.FilterInputStream;
32 import java.io.IOException;
33 import java.lang.reflect.Field;
34 import java.security.PrivilegedExceptionAction;
35 import java.util.ArrayList;
36 import java.util.Collection;
37 import java.util.HashSet;
38 import java.util.List;
39 import java.util.Set;
40 import java.util.concurrent.atomic.AtomicBoolean;
41 import java.util.concurrent.atomic.AtomicInteger;
42
43 import org.apache.commons.logging.Log;
44 import org.apache.commons.logging.LogFactory;
45 import org.apache.hadoop.conf.Configuration;
46 import org.apache.hadoop.fs.FSDataInputStream;
47 import org.apache.hadoop.fs.FileStatus;
48 import org.apache.hadoop.fs.FileSystem;
49 import org.apache.hadoop.fs.Path;
50 import org.apache.hadoop.fs.PathFilter;
51 import org.apache.hadoop.hbase.Cell;
52 import org.apache.hadoop.hbase.HBaseConfiguration;
53 import org.apache.hadoop.hbase.HBaseTestingUtility;
54 import org.apache.hadoop.hbase.HColumnDescriptor;
55 import org.apache.hadoop.hbase.HConstants;
56 import org.apache.hadoop.hbase.HRegionInfo;
57 import org.apache.hadoop.hbase.HTableDescriptor;
58 import org.apache.hadoop.hbase.KeyValue;
59 import org.apache.hadoop.hbase.MasterNotRunningException;
60 import org.apache.hadoop.hbase.testclassification.MediumTests;
61 import org.apache.hadoop.hbase.MiniHBaseCluster;
62 import org.apache.hadoop.hbase.ServerName;
63 import org.apache.hadoop.hbase.TableName;
64 import org.apache.hadoop.hbase.ZooKeeperConnectionException;
65 import org.apache.hadoop.hbase.client.Delete;
66 import org.apache.hadoop.hbase.client.Get;
67 import org.apache.hadoop.hbase.client.HTable;
68 import org.apache.hadoop.hbase.client.Put;
69 import org.apache.hadoop.hbase.client.Result;
70 import org.apache.hadoop.hbase.client.ResultScanner;
71 import org.apache.hadoop.hbase.client.Scan;
72 import org.apache.hadoop.hbase.client.Table;
73 import org.apache.hadoop.hbase.master.HMaster;
74 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
75 import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
76 import org.apache.hadoop.hbase.regionserver.*;
77 import org.apache.hadoop.hbase.security.User;
78 import org.apache.hadoop.hbase.util.Bytes;
79 import org.apache.hadoop.hbase.util.EnvironmentEdge;
80 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
81 import org.apache.hadoop.hbase.util.FSUtils;
82 import org.apache.hadoop.hbase.util.HFileTestUtil;
83 import org.apache.hadoop.hbase.util.Pair;
84 import org.apache.hadoop.hbase.wal.DefaultWALProvider;
85 import org.apache.hadoop.hbase.wal.WAL;
86 import org.apache.hadoop.hbase.wal.WALFactory;
87 import org.apache.hadoop.hbase.wal.WALKey;
88 import org.apache.hadoop.hbase.wal.WALSplitter;
89 import org.apache.hadoop.hdfs.DFSInputStream;
90 import org.junit.After;
91 import org.junit.AfterClass;
92 import org.junit.Before;
93 import org.junit.BeforeClass;
94 import org.junit.Rule;
95 import org.junit.Test;
96 import org.junit.experimental.categories.Category;
97 import org.junit.rules.TestName;
98 import org.mockito.Mockito;
99 import org.mockito.invocation.InvocationOnMock;
100 import org.mockito.stubbing.Answer;
101
102
103
104
105 @Category(MediumTests.class)
106 public class TestWALReplay {
107 private static final Log LOG = LogFactory.getLog(TestWALReplay.class);
108 static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
109 private final EnvironmentEdge ee = EnvironmentEdgeManager.getDelegate();
110 private Path hbaseRootDir = null;
111 private String logName;
112 private Path oldLogDir;
113 private Path logDir;
114 private FileSystem fs;
115 private Configuration conf;
116 private RecoveryMode mode;
117 private WALFactory wals;
118
119 @Rule
120 public final TestName currentTest = new TestName();
121
122
123 @BeforeClass
124 public static void setUpBeforeClass() throws Exception {
125 Configuration conf = TEST_UTIL.getConfiguration();
126 conf.setBoolean("dfs.support.append", true);
127
128 conf.setInt("dfs.client.block.recovery.retries", 2);
129 TEST_UTIL.startMiniCluster(3);
130 Path hbaseRootDir =
131 TEST_UTIL.getDFSCluster().getFileSystem().makeQualified(new Path("/hbase"));
132 LOG.info("hbase.rootdir=" + hbaseRootDir);
133 FSUtils.setRootDir(conf, hbaseRootDir);
134 }
135
136 @AfterClass
137 public static void tearDownAfterClass() throws Exception {
138 TEST_UTIL.shutdownMiniCluster();
139 }
140
141 @Before
142 public void setUp() throws Exception {
143 this.conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
144 this.fs = TEST_UTIL.getDFSCluster().getFileSystem();
145 this.hbaseRootDir = FSUtils.getRootDir(this.conf);
146 this.oldLogDir = new Path(this.hbaseRootDir, HConstants.HREGION_OLDLOGDIR_NAME);
147 this.logName = DefaultWALProvider.getWALDirectoryName(currentTest.getMethodName() + "-manual");
148 this.logDir = new Path(this.hbaseRootDir, logName);
149 if (TEST_UTIL.getDFSCluster().getFileSystem().exists(this.hbaseRootDir)) {
150 TEST_UTIL.getDFSCluster().getFileSystem().delete(this.hbaseRootDir, true);
151 }
152 this.mode = (conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false) ?
153 RecoveryMode.LOG_REPLAY : RecoveryMode.LOG_SPLITTING);
154 this.wals = new WALFactory(conf, null, currentTest.getMethodName());
155 }
156
157 @After
158 public void tearDown() throws Exception {
159 this.wals.close();
160 TEST_UTIL.getDFSCluster().getFileSystem().delete(this.hbaseRootDir, true);
161 }
162
163
164
165
166 private void deleteDir(final Path p) throws IOException {
167 if (this.fs.exists(p)) {
168 if (!this.fs.delete(p, true)) {
169 throw new IOException("Failed remove of " + p);
170 }
171 }
172 }
173
174
175
176
177
178 @Test
179 public void testReplayEditsAfterRegionMovedWithMultiCF() throws Exception {
180 final TableName tableName =
181 TableName.valueOf("testReplayEditsAfterRegionMovedWithMultiCF");
182 byte[] family1 = Bytes.toBytes("cf1");
183 byte[] family2 = Bytes.toBytes("cf2");
184 byte[] qualifier = Bytes.toBytes("q");
185 byte[] value = Bytes.toBytes("testV");
186 byte[][] familys = { family1, family2 };
187 TEST_UTIL.createTable(tableName, familys);
188 Table htable = new HTable(TEST_UTIL.getConfiguration(), tableName);
189 Put put = new Put(Bytes.toBytes("r1"));
190 put.add(family1, qualifier, value);
191 htable.put(put);
192 ResultScanner resultScanner = htable.getScanner(new Scan());
193 int count = 0;
194 while (resultScanner.next() != null) {
195 count++;
196 }
197 resultScanner.close();
198 assertEquals(1, count);
199
200 MiniHBaseCluster hbaseCluster = TEST_UTIL.getMiniHBaseCluster();
201 List<HRegion> regions = hbaseCluster.getRegions(tableName);
202 assertEquals(1, regions.size());
203
204
205 Region destRegion = regions.get(0);
206 int originServerNum = hbaseCluster
207 .getServerWith(destRegion.getRegionInfo().getRegionName());
208 assertTrue("Please start more than 1 regionserver", hbaseCluster
209 .getRegionServerThreads().size() > 1);
210 int destServerNum = 0;
211 while (destServerNum == originServerNum) {
212 destServerNum++;
213 }
214 HRegionServer originServer = hbaseCluster.getRegionServer(originServerNum);
215 HRegionServer destServer = hbaseCluster.getRegionServer(destServerNum);
216
217 moveRegionAndWait(destRegion, destServer);
218
219
220 Delete del = new Delete(Bytes.toBytes("r1"));
221 htable.delete(del);
222 resultScanner = htable.getScanner(new Scan());
223 count = 0;
224 while (resultScanner.next() != null) {
225 count++;
226 }
227 resultScanner.close();
228 assertEquals(0, count);
229
230
231 Region region = destServer.getOnlineRegion(destRegion.getRegionInfo().getRegionName());
232 region.flush(true);
233
234 for (Store store : region.getStores()) {
235 store.triggerMajorCompaction();
236 }
237 region.compact(true);
238
239
240 moveRegionAndWait(destRegion, originServer);
241
242 originServer.abort("testing");
243
244
245 Result result = htable.get(new Get(Bytes.toBytes("r1")));
246 if (result != null) {
247 assertTrue("Row is deleted, but we get" + result.toString(),
248 (result == null) || result.isEmpty());
249 }
250 resultScanner.close();
251 }
252
253 private void moveRegionAndWait(Region destRegion, HRegionServer destServer)
254 throws InterruptedException, MasterNotRunningException,
255 ZooKeeperConnectionException, IOException {
256 HMaster master = TEST_UTIL.getMiniHBaseCluster().getMaster();
257 TEST_UTIL.getHBaseAdmin().move(
258 destRegion.getRegionInfo().getEncodedNameAsBytes(),
259 Bytes.toBytes(destServer.getServerName().getServerName()));
260 while (true) {
261 ServerName serverName = master.getAssignmentManager()
262 .getRegionStates().getRegionServerOfRegion(destRegion.getRegionInfo());
263 if (serverName != null && serverName.equals(destServer.getServerName())) {
264 TEST_UTIL.assertRegionOnServer(
265 destRegion.getRegionInfo(), serverName, 200);
266 break;
267 }
268 Thread.sleep(10);
269 }
270 }
271
272
273
274
275
276
277 @Test
278 public void test2727() throws Exception {
279
280
281 final TableName tableName =
282 TableName.valueOf("test2727");
283
284 MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
285 HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
286 Path basedir = FSUtils.getTableDir(hbaseRootDir, tableName);
287 deleteDir(basedir);
288
289 HTableDescriptor htd = createBasic3FamilyHTD(tableName);
290 HRegion region2 = HRegion.createHRegion(hri, hbaseRootDir, this.conf, htd);
291 HRegion.closeHRegion(region2);
292 final byte [] rowName = tableName.getName();
293
294 WAL wal1 = createWAL(this.conf);
295
296 final int countPerFamily = 1000;
297
298 for (HColumnDescriptor hcd: htd.getFamilies()) {
299 addWALEdits(tableName, hri, rowName, hcd.getName(), countPerFamily, ee,
300 wal1, htd, mvcc);
301 }
302 wal1.shutdown();
303 runWALSplit(this.conf);
304
305 WAL wal2 = createWAL(this.conf);
306
307 for (HColumnDescriptor hcd: htd.getFamilies()) {
308 addWALEdits(tableName, hri, rowName, hcd.getName(), countPerFamily,
309 ee, wal2, htd, mvcc);
310 }
311 wal2.shutdown();
312 runWALSplit(this.conf);
313
314 WAL wal3 = createWAL(this.conf);
315 try {
316 HRegion region = HRegion.openHRegion(this.conf, this.fs, hbaseRootDir, hri, htd, wal3);
317 long seqid = region.getOpenSeqNum();
318
319
320 assertTrue(seqid > mvcc.getWritePoint());
321 assertEquals(seqid - 1, mvcc.getWritePoint());
322 LOG.debug("region.getOpenSeqNum(): " + region.getOpenSeqNum() + ", wal3.id: "
323 + mvcc.getReadPoint());
324
325
326 region.close();
327 } finally {
328 wal3.close();
329 }
330 }
331
332
333
334
335
336
337
338
339
340
341 @Test
342 public void testRegionMadeOfBulkLoadedFilesOnly()
343 throws IOException, SecurityException, IllegalArgumentException,
344 NoSuchFieldException, IllegalAccessException, InterruptedException {
345 final TableName tableName =
346 TableName.valueOf("testRegionMadeOfBulkLoadedFilesOnly");
347 final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
348 final Path basedir = new Path(this.hbaseRootDir, tableName.getNameAsString());
349 deleteDir(basedir);
350 final HTableDescriptor htd = createBasic3FamilyHTD(tableName);
351 HRegion region2 = HRegion.createHRegion(hri, hbaseRootDir, this.conf, htd);
352 HRegion.closeHRegion(region2);
353 WAL wal = createWAL(this.conf);
354 HRegion region = HRegion.openHRegion(hri, htd, wal, this.conf);
355
356 byte [] family = htd.getFamilies().iterator().next().getName();
357 Path f = new Path(basedir, "hfile");
358 HFileTestUtil.createHFile(this.conf, fs, f, family, family, Bytes.toBytes(""),
359 Bytes.toBytes("z"), 10);
360 List <Pair<byte[],String>> hfs= new ArrayList<Pair<byte[],String>>(1);
361 hfs.add(Pair.newPair(family, f.toString()));
362 region.bulkLoadHFiles(hfs, true, null);
363
364
365 byte [] row = tableName.getName();
366 region.put((new Put(row)).add(family, family, family));
367 wal.sync();
368 final int rowsInsertedCount = 11;
369
370 assertEquals(rowsInsertedCount, getScannedCount(region.getScanner(new Scan())));
371
372
373 final Configuration newConf = HBaseConfiguration.create(this.conf);
374 User user = HBaseTestingUtility.getDifferentUser(newConf,
375 tableName.getNameAsString());
376 user.runAs(new PrivilegedExceptionAction() {
377 @Override
378 public Object run() throws Exception {
379 runWALSplit(newConf);
380 WAL wal2 = createWAL(newConf);
381
382 HRegion region2 = HRegion.openHRegion(newConf, FileSystem.get(newConf),
383 hbaseRootDir, hri, htd, wal2);
384 long seqid2 = region2.getOpenSeqNum();
385 assertTrue(seqid2 > -1);
386 assertEquals(rowsInsertedCount, getScannedCount(region2.getScanner(new Scan())));
387
388
389 region2.close();
390 wal2.close();
391 return null;
392 }
393 });
394 }
395
396
397
398
399
400
401
402
403
404
405
406
407 @Test
408 public void testCompactedBulkLoadedFiles()
409 throws IOException, SecurityException, IllegalArgumentException,
410 NoSuchFieldException, IllegalAccessException, InterruptedException {
411 final TableName tableName =
412 TableName.valueOf("testCompactedBulkLoadedFiles");
413 final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
414 final Path basedir = new Path(this.hbaseRootDir, tableName.getNameAsString());
415 deleteDir(basedir);
416 final HTableDescriptor htd = createBasic3FamilyHTD(tableName);
417 HRegion region2 = HRegion.createHRegion(hri,
418 hbaseRootDir, this.conf, htd);
419 HRegion.closeHRegion(region2);
420 WAL wal = createWAL(this.conf);
421 HRegion region = HRegion.openHRegion(hri, htd, wal, this.conf);
422
423
424 byte [] row = tableName.getName();
425 byte [] family = htd.getFamilies().iterator().next().getName();
426 region.put((new Put(row)).add(family, family, family));
427 wal.sync();
428
429 List <Pair<byte[],String>> hfs= new ArrayList<Pair<byte[],String>>(1);
430 for (int i = 0; i < 3; i++) {
431 Path f = new Path(basedir, "hfile"+i);
432 HFileTestUtil.createHFile(this.conf, fs, f, family, family, Bytes.toBytes(i + "00"),
433 Bytes.toBytes(i + "50"), 10);
434 hfs.add(Pair.newPair(family, f.toString()));
435 }
436 region.bulkLoadHFiles(hfs, true, null);
437 final int rowsInsertedCount = 31;
438 assertEquals(rowsInsertedCount, getScannedCount(region.getScanner(new Scan())));
439
440
441 region.compact(true);
442 assertEquals(rowsInsertedCount, getScannedCount(region.getScanner(new Scan())));
443
444
445 final Configuration newConf = HBaseConfiguration.create(this.conf);
446 User user = HBaseTestingUtility.getDifferentUser(newConf,
447 tableName.getNameAsString());
448 user.runAs(new PrivilegedExceptionAction() {
449 @Override
450 public Object run() throws Exception {
451 runWALSplit(newConf);
452 WAL wal2 = createWAL(newConf);
453
454 HRegion region2 = HRegion.openHRegion(newConf, FileSystem.get(newConf),
455 hbaseRootDir, hri, htd, wal2);
456 long seqid2 = region2.getOpenSeqNum();
457 assertTrue(seqid2 > -1);
458 assertEquals(rowsInsertedCount, getScannedCount(region2.getScanner(new Scan())));
459
460
461 region2.close();
462 wal2.close();
463 return null;
464 }
465 });
466 }
467
468
469
470
471
472
473
474
475
476
477
478 @Test
479 public void testReplayEditsWrittenViaHRegion()
480 throws IOException, SecurityException, IllegalArgumentException,
481 NoSuchFieldException, IllegalAccessException, InterruptedException {
482 final TableName tableName =
483 TableName.valueOf("testReplayEditsWrittenViaHRegion");
484 final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
485 final Path basedir = FSUtils.getTableDir(this.hbaseRootDir, tableName);
486 deleteDir(basedir);
487 final byte[] rowName = tableName.getName();
488 final int countPerFamily = 10;
489 final HTableDescriptor htd = createBasic3FamilyHTD(tableName);
490 HRegion region3 = HRegion.createHRegion(hri,
491 hbaseRootDir, this.conf, htd);
492 HRegion.closeHRegion(region3);
493
494
495
496 WAL wal = createWAL(this.conf);
497 HRegion region = HRegion.openHRegion(this.conf, this.fs, hbaseRootDir, hri, htd, wal);
498 long seqid = region.getOpenSeqNum();
499 boolean first = true;
500 for (HColumnDescriptor hcd: htd.getFamilies()) {
501 addRegionEdits(rowName, hcd.getName(), countPerFamily, this.ee, region, "x");
502 if (first) {
503
504 region.flush(true);
505 first = false;
506 }
507 }
508
509 final Get g = new Get(rowName);
510 Result result = region.get(g);
511 assertEquals(countPerFamily * htd.getFamilies().size(),
512 result.size());
513
514
515
516 region.close(true);
517 wal.shutdown();
518 runWALSplit(this.conf);
519 WAL wal2 = createWAL(this.conf);
520 HRegion region2 = HRegion.openHRegion(conf, this.fs, hbaseRootDir, hri, htd, wal2);
521 long seqid2 = region2.getOpenSeqNum();
522 assertTrue(seqid + result.size() < seqid2);
523 final Result result1b = region2.get(g);
524 assertEquals(result.size(), result1b.size());
525
526
527
528
529 for (HColumnDescriptor hcd: htd.getFamilies()) {
530 addRegionEdits(rowName, hcd.getName(), countPerFamily, this.ee, region2, "y");
531 }
532
533 final Result result2 = region2.get(g);
534 assertEquals(2 * result.size(), result2.size());
535 wal2.sync();
536 final Configuration newConf = HBaseConfiguration.create(this.conf);
537 User user = HBaseTestingUtility.getDifferentUser(newConf,
538 tableName.getNameAsString());
539 user.runAs(new PrivilegedExceptionAction() {
540 @Override
541 public Object run() throws Exception {
542 runWALSplit(newConf);
543 FileSystem newFS = FileSystem.get(newConf);
544
545 WAL wal3 = createWAL(newConf);
546 final AtomicInteger countOfRestoredEdits = new AtomicInteger(0);
547 HRegion region3 = new HRegion(basedir, wal3, newFS, newConf, hri, htd, null) {
548 @Override
549 protected boolean restoreEdit(Store s, Cell cell) {
550 boolean b = super.restoreEdit(s, cell);
551 countOfRestoredEdits.incrementAndGet();
552 return b;
553 }
554 };
555 long seqid3 = region3.initialize();
556 Result result3 = region3.get(g);
557
558 assertEquals(result2.size(), result3.size());
559 assertEquals(htd.getFamilies().size() * countPerFamily,
560 countOfRestoredEdits.get());
561
562
563 region3.close();
564 wal3.close();
565 return null;
566 }
567 });
568 }
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588 @Test
589 public void testReplayEditsAfterPartialFlush()
590 throws IOException, SecurityException, IllegalArgumentException,
591 NoSuchFieldException, IllegalAccessException, InterruptedException {
592 final TableName tableName =
593 TableName.valueOf("testReplayEditsWrittenViaHRegion");
594 final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
595 final Path basedir = FSUtils.getTableDir(this.hbaseRootDir, tableName);
596 deleteDir(basedir);
597 final byte[] rowName = tableName.getName();
598 final int countPerFamily = 10;
599 final HTableDescriptor htd = createBasic3FamilyHTD(tableName);
600 HRegion region3 = HRegion.createHRegion(hri,
601 hbaseRootDir, this.conf, htd);
602 HRegion.closeHRegion(region3);
603
604
605
606 WAL wal = createWAL(this.conf);
607 HRegion region = HRegion.openHRegion(this.conf, this.fs, hbaseRootDir, hri, htd, wal);
608 long seqid = region.getOpenSeqNum();
609 for (HColumnDescriptor hcd: htd.getFamilies()) {
610 addRegionEdits(rowName, hcd.getName(), countPerFamily, this.ee, region, "x");
611 }
612
613
614 final Get g = new Get(rowName);
615 Result result = region.get(g);
616 assertEquals(countPerFamily * htd.getFamilies().size(),
617 result.size());
618
619
620 region.flush(true);
621 region.close(true);
622 wal.shutdown();
623
624
625
626
627
628 int cf_count = 0;
629 for (HColumnDescriptor hcd: htd.getFamilies()) {
630 cf_count++;
631 if (cf_count == 2) {
632 region.getRegionFileSystem().deleteFamily(hcd.getNameAsString());
633 }
634 }
635
636
637
638 runWALSplit(this.conf);
639 WAL wal2 = createWAL(this.conf);
640 HRegion region2 = HRegion.openHRegion(this.conf, this.fs, hbaseRootDir, hri, htd, wal2);
641 long seqid2 = region2.getOpenSeqNum();
642 assertTrue(seqid + result.size() < seqid2);
643
644 final Result result1b = region2.get(g);
645 assertEquals(result.size(), result1b.size());
646 }
647
648
649
650
651 public static class CustomStoreFlusher extends DefaultStoreFlusher {
652
653 static final AtomicBoolean throwExceptionWhenFlushing = new AtomicBoolean(false);
654
655 public CustomStoreFlusher(Configuration conf, Store store) {
656 super(conf, store);
657 }
658 @Override
659 public List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushId,
660 MonitoredTask status) throws IOException {
661 if (throwExceptionWhenFlushing.get()) {
662 throw new IOException("Simulated exception by tests");
663 }
664 return super.flushSnapshot(snapshot, cacheFlushId, status);
665 }
666
667 };
668
669
670
671
672
673
674
675 @Test
676 public void testReplayEditsAfterAbortingFlush() throws IOException {
677 final TableName tableName =
678 TableName.valueOf("testReplayEditsAfterAbortingFlush");
679 final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
680 final Path basedir = FSUtils.getTableDir(this.hbaseRootDir, tableName);
681 deleteDir(basedir);
682 final HTableDescriptor htd = createBasic3FamilyHTD(tableName);
683 HRegion region3 = HRegion.createHRegion(hri, hbaseRootDir, this.conf, htd);
684 region3.close();
685 region3.getWAL().close();
686
687
688
689 WAL wal = createWAL(this.conf);
690 RegionServerServices rsServices = Mockito.mock(RegionServerServices.class);
691 Mockito.doReturn(false).when(rsServices).isAborted();
692 when(rsServices.getServerName()).thenReturn(ServerName.valueOf("foo", 10, 10));
693 Configuration customConf = new Configuration(this.conf);
694 customConf.set(DefaultStoreEngine.DEFAULT_STORE_FLUSHER_CLASS_KEY,
695 CustomStoreFlusher.class.getName());
696 HRegion region =
697 HRegion.openHRegion(this.hbaseRootDir, hri, htd, wal, customConf, rsServices, null);
698 int writtenRowCount = 10;
699 List<HColumnDescriptor> families = new ArrayList<HColumnDescriptor>(
700 htd.getFamilies());
701 for (int i = 0; i < writtenRowCount; i++) {
702 Put put = new Put(Bytes.toBytes(tableName + Integer.toString(i)));
703 put.add(families.get(i % families.size()).getName(), Bytes.toBytes("q"),
704 Bytes.toBytes("val"));
705 region.put(put);
706 }
707
708
709 RegionScanner scanner = region.getScanner(new Scan());
710 assertEquals(writtenRowCount, getScannedCount(scanner));
711
712
713 CustomStoreFlusher.throwExceptionWhenFlushing.set(true);
714 try {
715 region.flush(true);
716 fail("Injected exception hasn't been thrown");
717 } catch (Throwable t) {
718 LOG.info("Expected simulated exception when flushing region,"
719 + t.getMessage());
720
721 Mockito.doReturn(true).when(rsServices).isAborted();
722 region.setClosing(false);
723
724 }
725
726 int moreRow = 10;
727 for (int i = writtenRowCount; i < writtenRowCount + moreRow; i++) {
728 Put put = new Put(Bytes.toBytes(tableName + Integer.toString(i)));
729 put.add(families.get(i % families.size()).getName(), Bytes.toBytes("q"),
730 Bytes.toBytes("val"));
731 region.put(put);
732 }
733 writtenRowCount += moreRow;
734
735 CustomStoreFlusher.throwExceptionWhenFlushing.set(false);
736 try {
737 region.flush(true);
738 } catch (IOException t) {
739 LOG.info("Expected exception when flushing region because server is stopped,"
740 + t.getMessage());
741 }
742
743 region.close(true);
744 wal.shutdown();
745
746
747 runWALSplit(this.conf);
748 WAL wal2 = createWAL(this.conf);
749 Mockito.doReturn(false).when(rsServices).isAborted();
750 HRegion region2 =
751 HRegion.openHRegion(this.hbaseRootDir, hri, htd, wal2, this.conf, rsServices, null);
752 scanner = region2.getScanner(new Scan());
753 assertEquals(writtenRowCount, getScannedCount(scanner));
754 }
755
756 private int getScannedCount(RegionScanner scanner) throws IOException {
757 int scannedCount = 0;
758 List<Cell> results = new ArrayList<Cell>();
759 while (true) {
760 boolean existMore = scanner.next(results);
761 if (!results.isEmpty())
762 scannedCount++;
763 if (!existMore)
764 break;
765 results.clear();
766 }
767 return scannedCount;
768 }
769
770
771
772
773
774
775 @Test
776 public void testReplayEditsWrittenIntoWAL() throws Exception {
777 final TableName tableName =
778 TableName.valueOf("testReplayEditsWrittenIntoWAL");
779 final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
780 final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
781 final Path basedir = FSUtils.getTableDir(hbaseRootDir, tableName);
782 deleteDir(basedir);
783
784 final HTableDescriptor htd = createBasic3FamilyHTD(tableName);
785 HRegion region2 = HRegion.createHRegion(hri,
786 hbaseRootDir, this.conf, htd);
787 HRegion.closeHRegion(region2);
788 final WAL wal = createWAL(this.conf);
789 final byte[] rowName = tableName.getName();
790 final byte[] regionName = hri.getEncodedNameAsBytes();
791
792
793 final int countPerFamily = 1000;
794 Set<byte[]> familyNames = new HashSet<byte[]>();
795 for (HColumnDescriptor hcd: htd.getFamilies()) {
796 addWALEdits(tableName, hri, rowName, hcd.getName(), countPerFamily,
797 ee, wal, htd, mvcc);
798 familyNames.add(hcd.getName());
799 }
800
801
802 wal.startCacheFlush(regionName, familyNames);
803 wal.completeCacheFlush(regionName);
804
805
806 WALEdit edit = new WALEdit();
807 long now = ee.currentTime();
808 edit.add(new KeyValue(rowName, Bytes.toBytes("another family"), rowName,
809 now, rowName));
810 wal.append(htd, hri, new WALKey(hri.getEncodedNameAsBytes(), tableName, now, mvcc), edit, true);
811
812
813 edit = new WALEdit();
814 now = ee.currentTime();
815 edit.add(new KeyValue(rowName, Bytes.toBytes("c"), null, now, KeyValue.Type.DeleteFamily));
816 wal.append(htd, hri, new WALKey(hri.getEncodedNameAsBytes(), tableName, now, mvcc), edit, true);
817
818
819 wal.sync();
820
821
822 final Configuration newConf = HBaseConfiguration.create(this.conf);
823 User user = HBaseTestingUtility.getDifferentUser(newConf,
824 ".replay.wal.secondtime");
825 user.runAs(new PrivilegedExceptionAction<Void>() {
826 @Override
827 public Void run() throws Exception {
828 runWALSplit(newConf);
829 FileSystem newFS = FileSystem.get(newConf);
830
831 newConf.setInt(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 1024 * 100);
832
833 WAL newWal = createWAL(newConf);
834 final AtomicInteger flushcount = new AtomicInteger(0);
835 try {
836 final HRegion region =
837 new HRegion(basedir, newWal, newFS, newConf, hri, htd, null) {
838 @Override
839 protected FlushResult internalFlushcache(final WAL wal, final long myseqid,
840 final Collection<Store> storesToFlush, MonitoredTask status,
841 boolean writeFlushWalMarker)
842 throws IOException {
843 LOG.info("InternalFlushCache Invoked");
844 FlushResult fs = super.internalFlushcache(wal, myseqid, storesToFlush,
845 Mockito.mock(MonitoredTask.class), writeFlushWalMarker);
846 flushcount.incrementAndGet();
847 return fs;
848 }
849 };
850
851 long seqid = region.initialize();
852
853
854 long writePoint = mvcc.getWritePoint();
855
856
857 assertTrue("Flushcount=" + flushcount.get(), flushcount.get() > 0);
858 assertTrue((seqid - 1) == writePoint);
859
860 Get get = new Get(rowName);
861 Result result = region.get(get);
862
863 assertEquals(countPerFamily * (htd.getFamilies().size() - 1),
864 result.size());
865 region.close();
866 } finally {
867 newWal.close();
868 }
869 return null;
870 }
871 });
872 }
873
874 @Test
875
876 public void testSequentialEditLogSeqNum() throws IOException {
877 final TableName tableName = TableName.valueOf(currentTest.getMethodName());
878 final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
879 final Path basedir =
880 FSUtils.getTableDir(this.hbaseRootDir, tableName);
881 deleteDir(basedir);
882 final byte[] rowName = tableName.getName();
883 final int countPerFamily = 10;
884 final HTableDescriptor htd = createBasic1FamilyHTD(tableName);
885
886
887 MockWAL wal = createMockWAL();
888
889 HRegion region = HRegion.openHRegion(this.conf, this.fs, hbaseRootDir, hri, htd, wal);
890 for (HColumnDescriptor hcd : htd.getFamilies()) {
891 addRegionEdits(rowName, hcd.getName(), countPerFamily, this.ee, region, "x");
892 }
893
894
895
896 region.flush(true);
897 for (HColumnDescriptor hcd : htd.getFamilies()) {
898 addRegionEdits(rowName, hcd.getName(), 5, this.ee, region, "x");
899 }
900 long lastestSeqNumber = region.getSequenceId();
901
902 wal.doCompleteCacheFlush = true;
903
904
905 wal.completeCacheFlush(hri.getEncodedNameAsBytes());
906 wal.shutdown();
907 FileStatus[] listStatus = wal.getFiles();
908 assertNotNull(listStatus);
909 assertTrue(listStatus.length > 0);
910 WALSplitter.splitLogFile(hbaseRootDir, listStatus[0],
911 this.fs, this.conf, null, null, null, mode, wals);
912 FileStatus[] listStatus1 = this.fs.listStatus(
913 new Path(FSUtils.getTableDir(hbaseRootDir, tableName), new Path(hri.getEncodedName(),
914 "recovered.edits")), new PathFilter() {
915 @Override
916 public boolean accept(Path p) {
917 if (WALSplitter.isSequenceIdFile(p)) {
918 return false;
919 }
920 return true;
921 }
922 });
923 int editCount = 0;
924 for (FileStatus fileStatus : listStatus1) {
925 editCount = Integer.parseInt(fileStatus.getPath().getName());
926 }
927
928 assertEquals(
929 "The sequence number of the recoverd.edits and the current edit seq should be same",
930 lastestSeqNumber, editCount);
931 }
932
933
934
935
936 @Test
937 public void testDatalossWhenInputError() throws IOException, InstantiationException,
938 IllegalAccessException {
939 final TableName tableName = TableName.valueOf("testDatalossWhenInputError");
940 final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
941 final Path basedir = FSUtils.getTableDir(this.hbaseRootDir, tableName);
942 deleteDir(basedir);
943 final byte[] rowName = tableName.getName();
944 final int countPerFamily = 10;
945 final HTableDescriptor htd = createBasic1FamilyHTD(tableName);
946 HRegion region1 = HBaseTestingUtility.createRegionAndWAL(hri, hbaseRootDir, this.conf, htd);
947 Path regionDir = region1.getRegionFileSystem().getRegionDir();
948 HBaseTestingUtility.closeRegionAndWAL(region1);
949
950 WAL wal = createWAL(this.conf);
951 HRegion region = HRegion.openHRegion(this.conf, this.fs, hbaseRootDir, hri, htd, wal);
952 for (HColumnDescriptor hcd : htd.getFamilies()) {
953 addRegionEdits(rowName, hcd.getName(), countPerFamily, this.ee, region, "x");
954 }
955
956 final Get g = new Get(rowName);
957 Result result = region.get(g);
958 assertEquals(countPerFamily * htd.getFamilies().size(), result.size());
959
960
961 region.close(true);
962 wal.shutdown();
963
964 runWALSplit(this.conf);
965
966
967 Path editFile = WALSplitter.getSplitEditFilesSorted(this.fs, regionDir).first();
968 FSDataInputStream stream = fs.open(editFile);
969 stream.seek(ProtobufLogReader.PB_WAL_MAGIC.length);
970 Class<? extends DefaultWALProvider.Reader> logReaderClass =
971 conf.getClass("hbase.regionserver.hlog.reader.impl", ProtobufLogReader.class,
972 DefaultWALProvider.Reader.class);
973 DefaultWALProvider.Reader reader = logReaderClass.newInstance();
974 reader.init(this.fs, editFile, conf, stream);
975 final long headerLength = stream.getPos();
976 reader.close();
977 FileSystem spyFs = spy(this.fs);
978 doAnswer(new Answer<FSDataInputStream>() {
979
980 @Override
981 public FSDataInputStream answer(InvocationOnMock invocation) throws Throwable {
982 FSDataInputStream stream = (FSDataInputStream) invocation.callRealMethod();
983 Field field = FilterInputStream.class.getDeclaredField("in");
984 field.setAccessible(true);
985 final DFSInputStream in = (DFSInputStream) field.get(stream);
986 DFSInputStream spyIn = spy(in);
987 doAnswer(new Answer<Integer>() {
988
989 private long pos;
990
991 @Override
992 public Integer answer(InvocationOnMock invocation) throws Throwable {
993 if (pos >= headerLength) {
994 throw new IOException("read over limit");
995 }
996 int b = (Integer) invocation.callRealMethod();
997 if (b > 0) {
998 pos += b;
999 }
1000 return b;
1001 }
1002 }).when(spyIn).read(any(byte[].class), any(int.class), any(int.class));
1003 doAnswer(new Answer<Void>() {
1004
1005 @Override
1006 public Void answer(InvocationOnMock invocation) throws Throwable {
1007 invocation.callRealMethod();
1008 in.close();
1009 return null;
1010 }
1011 }).when(spyIn).close();
1012 field.set(stream, spyIn);
1013 return stream;
1014 }
1015 }).when(spyFs).open(eq(editFile));
1016
1017 WAL wal2 = createWAL(this.conf);
1018 HRegion region2;
1019 try {
1020
1021 region2 = HRegion.openHRegion(conf, spyFs, hbaseRootDir, hri, htd, wal2);
1022 assertEquals(result.size(), region2.get(g).size());
1023 } catch (IOException e) {
1024 assertEquals("read over limit", e.getMessage());
1025 }
1026 region2 = HRegion.openHRegion(conf, fs, hbaseRootDir, hri, htd, wal2);
1027 assertEquals(result.size(), region2.get(g).size());
1028 }
1029
1030 static class MockWAL extends FSHLog {
1031 boolean doCompleteCacheFlush = false;
1032
1033 public MockWAL(FileSystem fs, Path rootDir, String logName, Configuration conf)
1034 throws IOException {
1035 super(fs, rootDir, logName, HConstants.HREGION_OLDLOGDIR_NAME, conf, null, true, null, null);
1036 }
1037
1038 @Override
1039 public void completeCacheFlush(byte[] encodedRegionName) {
1040 if (!doCompleteCacheFlush) {
1041 return;
1042 }
1043 super.completeCacheFlush(encodedRegionName);
1044 }
1045 }
1046
1047 private HTableDescriptor createBasic1FamilyHTD(final TableName tableName) {
1048 HTableDescriptor htd = new HTableDescriptor(tableName);
1049 HColumnDescriptor a = new HColumnDescriptor(Bytes.toBytes("a"));
1050 htd.addFamily(a);
1051 return htd;
1052 }
1053
1054 private MockWAL createMockWAL() throws IOException {
1055 MockWAL wal = new MockWAL(fs, hbaseRootDir, logName, conf);
1056
1057
1058 HBaseTestingUtility.setMaxRecoveryErrorCount(wal.getOutputStream(), 1);
1059 return wal;
1060 }
1061
1062
1063
1064 class TestFlusher implements FlushRequester {
1065 private HRegion r;
1066
1067 @Override
1068 public void requestFlush(Region region, boolean force) {
1069 try {
1070 r.flush(force);
1071 } catch (IOException e) {
1072 throw new RuntimeException("Exception flushing", e);
1073 }
1074 }
1075
1076 @Override
1077 public void requestDelayedFlush(Region region, long when, boolean forceFlushAllStores) {
1078
1079
1080 }
1081
1082 @Override
1083 public void registerFlushRequestListener(FlushRequestListener listener) {
1084
1085 }
1086
1087 @Override
1088 public boolean unregisterFlushRequestListener(FlushRequestListener listener) {
1089 return false;
1090 }
1091
1092 @Override
1093 public void setGlobalMemstoreLimit(long globalMemStoreSize) {
1094
1095 }
1096 }
1097
1098 private void addWALEdits(final TableName tableName, final HRegionInfo hri, final byte[] rowName,
1099 final byte[] family, final int count, EnvironmentEdge ee, final WAL wal,
1100 final HTableDescriptor htd, final MultiVersionConcurrencyControl mvcc)
1101 throws IOException {
1102 String familyStr = Bytes.toString(family);
1103 for (int j = 0; j < count; j++) {
1104 byte[] qualifierBytes = Bytes.toBytes(Integer.toString(j));
1105 byte[] columnBytes = Bytes.toBytes(familyStr + ":" + Integer.toString(j));
1106 WALEdit edit = new WALEdit();
1107 edit.add(new KeyValue(rowName, family, qualifierBytes,
1108 ee.currentTime(), columnBytes));
1109 wal.append(htd, hri, new WALKey(hri.getEncodedNameAsBytes(), tableName,999, mvcc),
1110 edit, true);
1111 }
1112 wal.sync();
1113 }
1114
1115 static List<Put> addRegionEdits (final byte [] rowName, final byte [] family,
1116 final int count, EnvironmentEdge ee, final Region r,
1117 final String qualifierPrefix)
1118 throws IOException {
1119 List<Put> puts = new ArrayList<Put>();
1120 for (int j = 0; j < count; j++) {
1121 byte[] qualifier = Bytes.toBytes(qualifierPrefix + Integer.toString(j));
1122 Put p = new Put(rowName);
1123 p.add(family, qualifier, ee.currentTime(), rowName);
1124 r.put(p);
1125 puts.add(p);
1126 }
1127 return puts;
1128 }
1129
1130
1131
1132
1133
1134
1135 private HRegionInfo createBasic3FamilyHRegionInfo(final TableName tableName) {
1136 return new HRegionInfo(tableName, null, null, false);
1137 }
1138
1139
1140
1141
1142
1143
1144
1145 private Path runWALSplit(final Configuration c) throws IOException {
1146 List<Path> splits = WALSplitter.split(
1147 hbaseRootDir, logDir, oldLogDir, FileSystem.get(c), c, wals);
1148
1149 assertEquals("splits=" + splits, 1, splits.size());
1150
1151 assertTrue(fs.exists(splits.get(0)));
1152 LOG.info("Split file=" + splits.get(0));
1153 return splits.get(0);
1154 }
1155
1156
1157
1158
1159
1160
1161 private WAL createWAL(final Configuration c) throws IOException {
1162 FSHLog wal = new FSHLog(FileSystem.get(c), hbaseRootDir, logName, c);
1163
1164
1165 HBaseTestingUtility.setMaxRecoveryErrorCount(wal.getOutputStream(), 1);
1166 return wal;
1167 }
1168
1169 private HTableDescriptor createBasic3FamilyHTD(final TableName tableName) {
1170 HTableDescriptor htd = new HTableDescriptor(tableName);
1171 HColumnDescriptor a = new HColumnDescriptor(Bytes.toBytes("a"));
1172 htd.addFamily(a);
1173 HColumnDescriptor b = new HColumnDescriptor(Bytes.toBytes("b"));
1174 htd.addFamily(b);
1175 HColumnDescriptor c = new HColumnDescriptor(Bytes.toBytes("c"));
1176 htd.addFamily(c);
1177 return htd;
1178 }
1179 }