View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.regionserver.wal;
20  
21  import static org.junit.Assert.assertEquals;
22  import static org.junit.Assert.assertNotNull;
23  import static org.junit.Assert.assertTrue;
24  import static org.junit.Assert.fail;
25  import static org.mockito.Matchers.any;
26  import static org.mockito.Matchers.eq;
27  import static org.mockito.Mockito.doAnswer;
28  import static org.mockito.Mockito.spy;
29  import static org.mockito.Mockito.when;
30  
31  import java.io.FilterInputStream;
32  import java.io.IOException;
33  import java.lang.reflect.Field;
34  import java.security.PrivilegedExceptionAction;
35  import java.util.ArrayList;
36  import java.util.Collection;
37  import java.util.HashSet;
38  import java.util.List;
39  import java.util.Set;
40  import java.util.concurrent.atomic.AtomicBoolean;
41  import java.util.concurrent.atomic.AtomicInteger;
42  
43  import org.apache.commons.logging.Log;
44  import org.apache.commons.logging.LogFactory;
45  import org.apache.hadoop.conf.Configuration;
46  import org.apache.hadoop.fs.FSDataInputStream;
47  import org.apache.hadoop.fs.FileStatus;
48  import org.apache.hadoop.fs.FileSystem;
49  import org.apache.hadoop.fs.Path;
50  import org.apache.hadoop.fs.PathFilter;
51  import org.apache.hadoop.hbase.Cell;
52  import org.apache.hadoop.hbase.HBaseConfiguration;
53  import org.apache.hadoop.hbase.HBaseTestingUtility;
54  import org.apache.hadoop.hbase.HColumnDescriptor;
55  import org.apache.hadoop.hbase.HConstants;
56  import org.apache.hadoop.hbase.HRegionInfo;
57  import org.apache.hadoop.hbase.HTableDescriptor;
58  import org.apache.hadoop.hbase.KeyValue;
59  import org.apache.hadoop.hbase.MasterNotRunningException;
60  import org.apache.hadoop.hbase.testclassification.MediumTests;
61  import org.apache.hadoop.hbase.MiniHBaseCluster;
62  import org.apache.hadoop.hbase.ServerName;
63  import org.apache.hadoop.hbase.TableName;
64  import org.apache.hadoop.hbase.ZooKeeperConnectionException;
65  import org.apache.hadoop.hbase.client.Delete;
66  import org.apache.hadoop.hbase.client.Get;
67  import org.apache.hadoop.hbase.client.HTable;
68  import org.apache.hadoop.hbase.client.Put;
69  import org.apache.hadoop.hbase.client.Result;
70  import org.apache.hadoop.hbase.client.ResultScanner;
71  import org.apache.hadoop.hbase.client.Scan;
72  import org.apache.hadoop.hbase.client.Table;
73  import org.apache.hadoop.hbase.master.HMaster;
74  import org.apache.hadoop.hbase.monitoring.MonitoredTask;
75  import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
76  import org.apache.hadoop.hbase.regionserver.*;
77  import org.apache.hadoop.hbase.security.User;
78  import org.apache.hadoop.hbase.util.Bytes;
79  import org.apache.hadoop.hbase.util.EnvironmentEdge;
80  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
81  import org.apache.hadoop.hbase.util.FSUtils;
82  import org.apache.hadoop.hbase.util.HFileTestUtil;
83  import org.apache.hadoop.hbase.util.Pair;
84  import org.apache.hadoop.hbase.wal.DefaultWALProvider;
85  import org.apache.hadoop.hbase.wal.WAL;
86  import org.apache.hadoop.hbase.wal.WALFactory;
87  import org.apache.hadoop.hbase.wal.WALKey;
88  import org.apache.hadoop.hbase.wal.WALSplitter;
89  import org.apache.hadoop.hdfs.DFSInputStream;
90  import org.junit.After;
91  import org.junit.AfterClass;
92  import org.junit.Before;
93  import org.junit.BeforeClass;
94  import org.junit.Rule;
95  import org.junit.Test;
96  import org.junit.experimental.categories.Category;
97  import org.junit.rules.TestName;
98  import org.mockito.Mockito;
99  import org.mockito.invocation.InvocationOnMock;
100 import org.mockito.stubbing.Answer;
101 
102 /**
103  * Test replay of edits out of a WAL split.
104  */
105 @Category(MediumTests.class)
106 public class TestWALReplay {
107   private static final Log LOG = LogFactory.getLog(TestWALReplay.class);
108   static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
109   private final EnvironmentEdge ee = EnvironmentEdgeManager.getDelegate();
110   private Path hbaseRootDir = null;
111   private String logName;
112   private Path oldLogDir;
113   private Path logDir;
114   private FileSystem fs;
115   private Configuration conf;
116   private RecoveryMode mode;
117   private WALFactory wals;
118 
119   @Rule
120   public final TestName currentTest = new TestName();
121 
122 
123   @BeforeClass
124   public static void setUpBeforeClass() throws Exception {
125     Configuration conf = TEST_UTIL.getConfiguration();
126     conf.setBoolean("dfs.support.append", true);
127     // The below config supported by 0.20-append and CDH3b2
128     conf.setInt("dfs.client.block.recovery.retries", 2);
129     TEST_UTIL.startMiniCluster(3);
130     Path hbaseRootDir =
131       TEST_UTIL.getDFSCluster().getFileSystem().makeQualified(new Path("/hbase"));
132     LOG.info("hbase.rootdir=" + hbaseRootDir);
133     FSUtils.setRootDir(conf, hbaseRootDir);
134   }
135 
136   @AfterClass
137   public static void tearDownAfterClass() throws Exception {
138     TEST_UTIL.shutdownMiniCluster();
139   }
140 
141   @Before
142   public void setUp() throws Exception {
143     this.conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
144     this.fs = TEST_UTIL.getDFSCluster().getFileSystem();
145     this.hbaseRootDir = FSUtils.getRootDir(this.conf);
146     this.oldLogDir = new Path(this.hbaseRootDir, HConstants.HREGION_OLDLOGDIR_NAME);
147     this.logName = DefaultWALProvider.getWALDirectoryName(currentTest.getMethodName() + "-manual");
148     this.logDir = new Path(this.hbaseRootDir, logName);
149     if (TEST_UTIL.getDFSCluster().getFileSystem().exists(this.hbaseRootDir)) {
150       TEST_UTIL.getDFSCluster().getFileSystem().delete(this.hbaseRootDir, true);
151     }
152     this.mode = (conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false) ?
153         RecoveryMode.LOG_REPLAY : RecoveryMode.LOG_SPLITTING);
154     this.wals = new WALFactory(conf, null, currentTest.getMethodName());
155   }
156 
157   @After
158   public void tearDown() throws Exception {
159     this.wals.close();
160     TEST_UTIL.getDFSCluster().getFileSystem().delete(this.hbaseRootDir, true);
161   }
162 
163   /*
164    * @param p Directory to cleanup
165    */
166   private void deleteDir(final Path p) throws IOException {
167     if (this.fs.exists(p)) {
168       if (!this.fs.delete(p, true)) {
169         throw new IOException("Failed remove of " + p);
170       }
171     }
172   }
173 
174   /**
175    *
176    * @throws Exception
177    */
178   @Test
179   public void testReplayEditsAfterRegionMovedWithMultiCF() throws Exception {
180     final TableName tableName =
181         TableName.valueOf("testReplayEditsAfterRegionMovedWithMultiCF");
182     byte[] family1 = Bytes.toBytes("cf1");
183     byte[] family2 = Bytes.toBytes("cf2");
184     byte[] qualifier = Bytes.toBytes("q");
185     byte[] value = Bytes.toBytes("testV");
186     byte[][] familys = { family1, family2 };
187     TEST_UTIL.createTable(tableName, familys);
188     Table htable = new HTable(TEST_UTIL.getConfiguration(), tableName);
189     Put put = new Put(Bytes.toBytes("r1"));
190     put.add(family1, qualifier, value);
191     htable.put(put);
192     ResultScanner resultScanner = htable.getScanner(new Scan());
193     int count = 0;
194     while (resultScanner.next() != null) {
195       count++;
196     }
197     resultScanner.close();
198     assertEquals(1, count);
199 
200     MiniHBaseCluster hbaseCluster = TEST_UTIL.getMiniHBaseCluster();
201     List<HRegion> regions = hbaseCluster.getRegions(tableName);
202     assertEquals(1, regions.size());
203 
204     // move region to another regionserver
205     Region destRegion = regions.get(0);
206     int originServerNum = hbaseCluster
207         .getServerWith(destRegion.getRegionInfo().getRegionName());
208     assertTrue("Please start more than 1 regionserver", hbaseCluster
209         .getRegionServerThreads().size() > 1);
210     int destServerNum = 0;
211     while (destServerNum == originServerNum) {
212       destServerNum++;
213     }
214     HRegionServer originServer = hbaseCluster.getRegionServer(originServerNum);
215     HRegionServer destServer = hbaseCluster.getRegionServer(destServerNum);
216     // move region to destination regionserver
217     moveRegionAndWait(destRegion, destServer);
218 
219     // delete the row
220     Delete del = new Delete(Bytes.toBytes("r1"));
221     htable.delete(del);
222     resultScanner = htable.getScanner(new Scan());
223     count = 0;
224     while (resultScanner.next() != null) {
225       count++;
226     }
227     resultScanner.close();
228     assertEquals(0, count);
229 
230     // flush region and make major compaction
231     Region region =  destServer.getOnlineRegion(destRegion.getRegionInfo().getRegionName());
232     region.flush(true);
233     // wait to complete major compaction
234     for (Store store : region.getStores()) {
235       store.triggerMajorCompaction();
236     }
237     region.compact(true);
238 
239     // move region to origin regionserver
240     moveRegionAndWait(destRegion, originServer);
241     // abort the origin regionserver
242     originServer.abort("testing");
243 
244     // see what we get
245     Result result = htable.get(new Get(Bytes.toBytes("r1")));
246     if (result != null) {
247       assertTrue("Row is deleted, but we get" + result.toString(),
248           (result == null) || result.isEmpty());
249     }
250     resultScanner.close();
251   }
252 
253   private void moveRegionAndWait(Region destRegion, HRegionServer destServer)
254       throws InterruptedException, MasterNotRunningException,
255       ZooKeeperConnectionException, IOException {
256     HMaster master = TEST_UTIL.getMiniHBaseCluster().getMaster();
257     TEST_UTIL.getHBaseAdmin().move(
258         destRegion.getRegionInfo().getEncodedNameAsBytes(),
259         Bytes.toBytes(destServer.getServerName().getServerName()));
260     while (true) {
261       ServerName serverName = master.getAssignmentManager()
262         .getRegionStates().getRegionServerOfRegion(destRegion.getRegionInfo());
263       if (serverName != null && serverName.equals(destServer.getServerName())) {
264         TEST_UTIL.assertRegionOnServer(
265           destRegion.getRegionInfo(), serverName, 200);
266         break;
267       }
268       Thread.sleep(10);
269     }
270   }
271 
272   /**
273    * Tests for hbase-2727.
274    * @throws Exception
275    * @see https://issues.apache.org/jira/browse/HBASE-2727
276    */
277   @Test
278   public void test2727() throws Exception {
279     // Test being able to have > 1 set of edits in the recovered.edits directory.
280     // Ensure edits are replayed properly.
281     final TableName tableName =
282         TableName.valueOf("test2727");
283 
284     MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
285     HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
286     Path basedir = FSUtils.getTableDir(hbaseRootDir, tableName);
287     deleteDir(basedir);
288 
289     HTableDescriptor htd = createBasic3FamilyHTD(tableName);
290     HRegion region2 = HRegion.createHRegion(hri, hbaseRootDir, this.conf, htd);
291     HRegion.closeHRegion(region2);
292     final byte [] rowName = tableName.getName();
293 
294     WAL wal1 = createWAL(this.conf);
295     // Add 1k to each family.
296     final int countPerFamily = 1000;
297 
298     for (HColumnDescriptor hcd: htd.getFamilies()) {
299       addWALEdits(tableName, hri, rowName, hcd.getName(), countPerFamily, ee,
300           wal1, htd, mvcc);
301     }
302     wal1.shutdown();
303     runWALSplit(this.conf);
304 
305     WAL wal2 = createWAL(this.conf);
306     // Add 1k to each family.
307     for (HColumnDescriptor hcd: htd.getFamilies()) {
308       addWALEdits(tableName, hri, rowName, hcd.getName(), countPerFamily,
309           ee, wal2, htd, mvcc);
310     }
311     wal2.shutdown();
312     runWALSplit(this.conf);
313 
314     WAL wal3 = createWAL(this.conf);
315     try {
316       HRegion region = HRegion.openHRegion(this.conf, this.fs, hbaseRootDir, hri, htd, wal3);
317       long seqid = region.getOpenSeqNum();
318       // The regions opens with sequenceId as 1. With 6k edits, its sequence number reaches 6k + 1.
319       // When opened, this region would apply 6k edits, and increment the sequenceId by 1
320       assertTrue(seqid > mvcc.getWritePoint());
321       assertEquals(seqid - 1, mvcc.getWritePoint());
322       LOG.debug("region.getOpenSeqNum(): " + region.getOpenSeqNum() + ", wal3.id: "
323           + mvcc.getReadPoint());
324 
325       // TODO: Scan all.
326       region.close();
327     } finally {
328       wal3.close();
329     }
330   }
331 
332   /**
333    * Test case of HRegion that is only made out of bulk loaded files.  Assert
334    * that we don't 'crash'.
335    * @throws IOException
336    * @throws IllegalAccessException
337    * @throws NoSuchFieldException
338    * @throws IllegalArgumentException
339    * @throws SecurityException
340    */
341   @Test
342   public void testRegionMadeOfBulkLoadedFilesOnly()
343   throws IOException, SecurityException, IllegalArgumentException,
344       NoSuchFieldException, IllegalAccessException, InterruptedException {
345     final TableName tableName =
346         TableName.valueOf("testRegionMadeOfBulkLoadedFilesOnly");
347     final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
348     final Path basedir = new Path(this.hbaseRootDir, tableName.getNameAsString());
349     deleteDir(basedir);
350     final HTableDescriptor htd = createBasic3FamilyHTD(tableName);
351     HRegion region2 = HRegion.createHRegion(hri, hbaseRootDir, this.conf, htd);
352     HRegion.closeHRegion(region2);
353     WAL wal = createWAL(this.conf);
354     HRegion region = HRegion.openHRegion(hri, htd, wal, this.conf);
355 
356     byte [] family = htd.getFamilies().iterator().next().getName();
357     Path f =  new Path(basedir, "hfile");
358     HFileTestUtil.createHFile(this.conf, fs, f, family, family, Bytes.toBytes(""),
359         Bytes.toBytes("z"), 10);
360     List <Pair<byte[],String>>  hfs= new ArrayList<Pair<byte[],String>>(1);
361     hfs.add(Pair.newPair(family, f.toString()));
362     region.bulkLoadHFiles(hfs, true, null);
363 
364     // Add an edit so something in the WAL
365     byte [] row = tableName.getName();
366     region.put((new Put(row)).add(family, family, family));
367     wal.sync();
368     final int rowsInsertedCount = 11;
369 
370     assertEquals(rowsInsertedCount, getScannedCount(region.getScanner(new Scan())));
371 
372     // Now 'crash' the region by stealing its wal
373     final Configuration newConf = HBaseConfiguration.create(this.conf);
374     User user = HBaseTestingUtility.getDifferentUser(newConf,
375         tableName.getNameAsString());
376     user.runAs(new PrivilegedExceptionAction() {
377       @Override
378       public Object run() throws Exception {
379         runWALSplit(newConf);
380         WAL wal2 = createWAL(newConf);
381 
382         HRegion region2 = HRegion.openHRegion(newConf, FileSystem.get(newConf),
383           hbaseRootDir, hri, htd, wal2);
384         long seqid2 = region2.getOpenSeqNum();
385         assertTrue(seqid2 > -1);
386         assertEquals(rowsInsertedCount, getScannedCount(region2.getScanner(new Scan())));
387 
388         // I can't close wal1.  Its been appropriated when we split.
389         region2.close();
390         wal2.close();
391         return null;
392       }
393     });
394   }
395 
396   /**
397    * HRegion test case that is made of a major compacted HFile (created with three bulk loaded
398    * files) and an edit in the memstore.
399    * This is for HBASE-10958 "[dataloss] Bulk loading with seqids can prevent some log entries
400    * from being replayed"
401    * @throws IOException
402    * @throws IllegalAccessException
403    * @throws NoSuchFieldException
404    * @throws IllegalArgumentException
405    * @throws SecurityException
406    */
407   @Test
408   public void testCompactedBulkLoadedFiles()
409       throws IOException, SecurityException, IllegalArgumentException,
410       NoSuchFieldException, IllegalAccessException, InterruptedException {
411     final TableName tableName =
412         TableName.valueOf("testCompactedBulkLoadedFiles");
413     final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
414     final Path basedir = new Path(this.hbaseRootDir, tableName.getNameAsString());
415     deleteDir(basedir);
416     final HTableDescriptor htd = createBasic3FamilyHTD(tableName);
417     HRegion region2 = HRegion.createHRegion(hri,
418         hbaseRootDir, this.conf, htd);
419     HRegion.closeHRegion(region2);
420     WAL wal = createWAL(this.conf);
421     HRegion region = HRegion.openHRegion(hri, htd, wal, this.conf);
422 
423     // Add an edit so something in the WAL
424     byte [] row = tableName.getName();
425     byte [] family = htd.getFamilies().iterator().next().getName();
426     region.put((new Put(row)).add(family, family, family));
427     wal.sync();
428 
429     List <Pair<byte[],String>>  hfs= new ArrayList<Pair<byte[],String>>(1);
430     for (int i = 0; i < 3; i++) {
431       Path f = new Path(basedir, "hfile"+i);
432       HFileTestUtil.createHFile(this.conf, fs, f, family, family, Bytes.toBytes(i + "00"),
433           Bytes.toBytes(i + "50"), 10);
434       hfs.add(Pair.newPair(family, f.toString()));
435     }
436     region.bulkLoadHFiles(hfs, true, null);
437     final int rowsInsertedCount = 31;
438     assertEquals(rowsInsertedCount, getScannedCount(region.getScanner(new Scan())));
439 
440     // major compact to turn all the bulk loaded files into one normal file
441     region.compact(true);
442     assertEquals(rowsInsertedCount, getScannedCount(region.getScanner(new Scan())));
443 
444     // Now 'crash' the region by stealing its wal
445     final Configuration newConf = HBaseConfiguration.create(this.conf);
446     User user = HBaseTestingUtility.getDifferentUser(newConf,
447         tableName.getNameAsString());
448     user.runAs(new PrivilegedExceptionAction() {
449       @Override
450       public Object run() throws Exception {
451         runWALSplit(newConf);
452         WAL wal2 = createWAL(newConf);
453 
454         HRegion region2 = HRegion.openHRegion(newConf, FileSystem.get(newConf),
455             hbaseRootDir, hri, htd, wal2);
456         long seqid2 = region2.getOpenSeqNum();
457         assertTrue(seqid2 > -1);
458         assertEquals(rowsInsertedCount, getScannedCount(region2.getScanner(new Scan())));
459 
460         // I can't close wal1.  Its been appropriated when we split.
461         region2.close();
462         wal2.close();
463         return null;
464       }
465     });
466   }
467 
468 
469   /**
470    * Test writing edits into an HRegion, closing it, splitting logs, opening
471    * Region again.  Verify seqids.
472    * @throws IOException
473    * @throws IllegalAccessException
474    * @throws NoSuchFieldException
475    * @throws IllegalArgumentException
476    * @throws SecurityException
477    */
478   @Test
479   public void testReplayEditsWrittenViaHRegion()
480   throws IOException, SecurityException, IllegalArgumentException,
481       NoSuchFieldException, IllegalAccessException, InterruptedException {
482     final TableName tableName =
483         TableName.valueOf("testReplayEditsWrittenViaHRegion");
484     final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
485     final Path basedir = FSUtils.getTableDir(this.hbaseRootDir, tableName);
486     deleteDir(basedir);
487     final byte[] rowName = tableName.getName();
488     final int countPerFamily = 10;
489     final HTableDescriptor htd = createBasic3FamilyHTD(tableName);
490     HRegion region3 = HRegion.createHRegion(hri,
491             hbaseRootDir, this.conf, htd);
492     HRegion.closeHRegion(region3);
493     // Write countPerFamily edits into the three families.  Do a flush on one
494     // of the families during the load of edits so its seqid is not same as
495     // others to test we do right thing when different seqids.
496     WAL wal = createWAL(this.conf);
497     HRegion region = HRegion.openHRegion(this.conf, this.fs, hbaseRootDir, hri, htd, wal);
498     long seqid = region.getOpenSeqNum();
499     boolean first = true;
500     for (HColumnDescriptor hcd: htd.getFamilies()) {
501       addRegionEdits(rowName, hcd.getName(), countPerFamily, this.ee, region, "x");
502       if (first) {
503         // If first, so we have at least one family w/ different seqid to rest.
504         region.flush(true);
505         first = false;
506       }
507     }
508     // Now assert edits made it in.
509     final Get g = new Get(rowName);
510     Result result = region.get(g);
511     assertEquals(countPerFamily * htd.getFamilies().size(),
512       result.size());
513     // Now close the region (without flush), split the log, reopen the region and assert that
514     // replay of log has the correct effect, that our seqids are calculated correctly so
515     // all edits in logs are seen as 'stale'/old.
516     region.close(true);
517     wal.shutdown();
518     runWALSplit(this.conf);
519     WAL wal2 = createWAL(this.conf);
520     HRegion region2 = HRegion.openHRegion(conf, this.fs, hbaseRootDir, hri, htd, wal2);
521     long seqid2 = region2.getOpenSeqNum();
522     assertTrue(seqid + result.size() < seqid2);
523     final Result result1b = region2.get(g);
524     assertEquals(result.size(), result1b.size());
525 
526     // Next test.  Add more edits, then 'crash' this region by stealing its wal
527     // out from under it and assert that replay of the log adds the edits back
528     // correctly when region is opened again.
529     for (HColumnDescriptor hcd: htd.getFamilies()) {
530       addRegionEdits(rowName, hcd.getName(), countPerFamily, this.ee, region2, "y");
531     }
532     // Get count of edits.
533     final Result result2 = region2.get(g);
534     assertEquals(2 * result.size(), result2.size());
535     wal2.sync();
536     final Configuration newConf = HBaseConfiguration.create(this.conf);
537     User user = HBaseTestingUtility.getDifferentUser(newConf,
538       tableName.getNameAsString());
539     user.runAs(new PrivilegedExceptionAction() {
540       @Override
541       public Object run() throws Exception {
542         runWALSplit(newConf);
543         FileSystem newFS = FileSystem.get(newConf);
544         // Make a new wal for new region open.
545         WAL wal3 = createWAL(newConf);
546         final AtomicInteger countOfRestoredEdits = new AtomicInteger(0);
547         HRegion region3 = new HRegion(basedir, wal3, newFS, newConf, hri, htd, null) {
548           @Override
549           protected boolean restoreEdit(Store s, Cell cell) {
550             boolean b = super.restoreEdit(s, cell);
551             countOfRestoredEdits.incrementAndGet();
552             return b;
553           }
554         };
555         long seqid3 = region3.initialize();
556         Result result3 = region3.get(g);
557         // Assert that count of cells is same as before crash.
558         assertEquals(result2.size(), result3.size());
559         assertEquals(htd.getFamilies().size() * countPerFamily,
560           countOfRestoredEdits.get());
561 
562         // I can't close wal1.  Its been appropriated when we split.
563         region3.close();
564         wal3.close();
565         return null;
566       }
567     });
568   }
569 
570   /**
571    * Test that we recover correctly when there is a failure in between the
572    * flushes. i.e. Some stores got flushed but others did not.
573    *
574    * Unfortunately, there is no easy hook to flush at a store level. The way
575    * we get around this is by flushing at the region level, and then deleting
576    * the recently flushed store file for one of the Stores. This would put us
577    * back in the situation where all but that store got flushed and the region
578    * died.
579    *
580    * We restart Region again, and verify that the edits were replayed.
581    *
582    * @throws IOException
583    * @throws IllegalAccessException
584    * @throws NoSuchFieldException
585    * @throws IllegalArgumentException
586    * @throws SecurityException
587    */
588   @Test
589   public void testReplayEditsAfterPartialFlush()
590   throws IOException, SecurityException, IllegalArgumentException,
591       NoSuchFieldException, IllegalAccessException, InterruptedException {
592     final TableName tableName =
593         TableName.valueOf("testReplayEditsWrittenViaHRegion");
594     final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
595     final Path basedir = FSUtils.getTableDir(this.hbaseRootDir, tableName);
596     deleteDir(basedir);
597     final byte[] rowName = tableName.getName();
598     final int countPerFamily = 10;
599     final HTableDescriptor htd = createBasic3FamilyHTD(tableName);
600     HRegion region3 = HRegion.createHRegion(hri,
601             hbaseRootDir, this.conf, htd);
602     HRegion.closeHRegion(region3);
603     // Write countPerFamily edits into the three families.  Do a flush on one
604     // of the families during the load of edits so its seqid is not same as
605     // others to test we do right thing when different seqids.
606     WAL wal = createWAL(this.conf);
607     HRegion region = HRegion.openHRegion(this.conf, this.fs, hbaseRootDir, hri, htd, wal);
608     long seqid = region.getOpenSeqNum();
609     for (HColumnDescriptor hcd: htd.getFamilies()) {
610       addRegionEdits(rowName, hcd.getName(), countPerFamily, this.ee, region, "x");
611     }
612 
613     // Now assert edits made it in.
614     final Get g = new Get(rowName);
615     Result result = region.get(g);
616     assertEquals(countPerFamily * htd.getFamilies().size(),
617       result.size());
618 
619     // Let us flush the region
620     region.flush(true);
621     region.close(true);
622     wal.shutdown();
623 
624     // delete the store files in the second column family to simulate a failure
625     // in between the flushcache();
626     // we have 3 families. killing the middle one ensures that taking the maximum
627     // will make us fail.
628     int cf_count = 0;
629     for (HColumnDescriptor hcd: htd.getFamilies()) {
630       cf_count++;
631       if (cf_count == 2) {
632         region.getRegionFileSystem().deleteFamily(hcd.getNameAsString());
633       }
634     }
635 
636 
637     // Let us try to split and recover
638     runWALSplit(this.conf);
639     WAL wal2 = createWAL(this.conf);
640     HRegion region2 = HRegion.openHRegion(this.conf, this.fs, hbaseRootDir, hri, htd, wal2);
641     long seqid2 = region2.getOpenSeqNum();
642     assertTrue(seqid + result.size() < seqid2);
643 
644     final Result result1b = region2.get(g);
645     assertEquals(result.size(), result1b.size());
646   }
647 
648 
649   // StoreFlusher implementation used in testReplayEditsAfterAbortingFlush.
650   // Only throws exception if throwExceptionWhenFlushing is set true.
651   public static class CustomStoreFlusher extends DefaultStoreFlusher {
652     // Switch between throw and not throw exception in flush
653     static final AtomicBoolean throwExceptionWhenFlushing = new AtomicBoolean(false);
654 
655     public CustomStoreFlusher(Configuration conf, Store store) {
656       super(conf, store);
657     }
658     @Override
659     public List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushId,
660         MonitoredTask status) throws IOException {
661       if (throwExceptionWhenFlushing.get()) {
662         throw new IOException("Simulated exception by tests");
663       }
664       return super.flushSnapshot(snapshot, cacheFlushId, status);
665     }
666 
667   };
668 
669   /**
670    * Test that we could recover the data correctly after aborting flush. In the
671    * test, first we abort flush after writing some data, then writing more data
672    * and flush again, at last verify the data.
673    * @throws IOException
674    */
675   @Test
676   public void testReplayEditsAfterAbortingFlush() throws IOException {
677     final TableName tableName =
678         TableName.valueOf("testReplayEditsAfterAbortingFlush");
679     final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
680     final Path basedir = FSUtils.getTableDir(this.hbaseRootDir, tableName);
681     deleteDir(basedir);
682     final HTableDescriptor htd = createBasic3FamilyHTD(tableName);
683     HRegion region3 = HRegion.createHRegion(hri, hbaseRootDir, this.conf, htd);
684     region3.close();
685     region3.getWAL().close();
686     // Write countPerFamily edits into the three families. Do a flush on one
687     // of the families during the load of edits so its seqid is not same as
688     // others to test we do right thing when different seqids.
689     WAL wal = createWAL(this.conf);
690     RegionServerServices rsServices = Mockito.mock(RegionServerServices.class);
691     Mockito.doReturn(false).when(rsServices).isAborted();
692     when(rsServices.getServerName()).thenReturn(ServerName.valueOf("foo", 10, 10));
693     Configuration customConf = new Configuration(this.conf);
694     customConf.set(DefaultStoreEngine.DEFAULT_STORE_FLUSHER_CLASS_KEY,
695         CustomStoreFlusher.class.getName());
696     HRegion region =
697       HRegion.openHRegion(this.hbaseRootDir, hri, htd, wal, customConf, rsServices, null);
698     int writtenRowCount = 10;
699     List<HColumnDescriptor> families = new ArrayList<HColumnDescriptor>(
700         htd.getFamilies());
701     for (int i = 0; i < writtenRowCount; i++) {
702       Put put = new Put(Bytes.toBytes(tableName + Integer.toString(i)));
703       put.add(families.get(i % families.size()).getName(), Bytes.toBytes("q"),
704           Bytes.toBytes("val"));
705       region.put(put);
706     }
707 
708     // Now assert edits made it in.
709     RegionScanner scanner = region.getScanner(new Scan());
710     assertEquals(writtenRowCount, getScannedCount(scanner));
711 
712     // Let us flush the region
713     CustomStoreFlusher.throwExceptionWhenFlushing.set(true);
714     try {
715       region.flush(true);
716       fail("Injected exception hasn't been thrown");
717     } catch (Throwable t) {
718       LOG.info("Expected simulated exception when flushing region,"
719           + t.getMessage());
720       // simulated to abort server
721       Mockito.doReturn(true).when(rsServices).isAborted();
722       region.setClosing(false); // region normally does not accept writes after
723       // DroppedSnapshotException. We mock around it for this test.
724     }
725     // writing more data
726     int moreRow = 10;
727     for (int i = writtenRowCount; i < writtenRowCount + moreRow; i++) {
728       Put put = new Put(Bytes.toBytes(tableName + Integer.toString(i)));
729       put.add(families.get(i % families.size()).getName(), Bytes.toBytes("q"),
730           Bytes.toBytes("val"));
731       region.put(put);
732     }
733     writtenRowCount += moreRow;
734     // call flush again
735     CustomStoreFlusher.throwExceptionWhenFlushing.set(false);
736     try {
737       region.flush(true);
738     } catch (IOException t) {
739       LOG.info("Expected exception when flushing region because server is stopped,"
740           + t.getMessage());
741     }
742 
743     region.close(true);
744     wal.shutdown();
745 
746     // Let us try to split and recover
747     runWALSplit(this.conf);
748     WAL wal2 = createWAL(this.conf);
749     Mockito.doReturn(false).when(rsServices).isAborted();
750     HRegion region2 =
751       HRegion.openHRegion(this.hbaseRootDir, hri, htd, wal2, this.conf, rsServices, null);
752     scanner = region2.getScanner(new Scan());
753     assertEquals(writtenRowCount, getScannedCount(scanner));
754   }
755 
756   private int getScannedCount(RegionScanner scanner) throws IOException {
757     int scannedCount = 0;
758     List<Cell> results = new ArrayList<Cell>();
759     while (true) {
760       boolean existMore = scanner.next(results);
761       if (!results.isEmpty())
762         scannedCount++;
763       if (!existMore)
764         break;
765       results.clear();
766     }
767     return scannedCount;
768   }
769 
770   /**
771    * Create an HRegion with the result of a WAL split and test we only see the
772    * good edits
773    * @throws Exception
774    */
775   @Test
776   public void testReplayEditsWrittenIntoWAL() throws Exception {
777     final TableName tableName =
778         TableName.valueOf("testReplayEditsWrittenIntoWAL");
779     final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
780     final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
781     final Path basedir = FSUtils.getTableDir(hbaseRootDir, tableName);
782     deleteDir(basedir);
783 
784     final HTableDescriptor htd = createBasic3FamilyHTD(tableName);
785     HRegion region2 = HRegion.createHRegion(hri,
786             hbaseRootDir, this.conf, htd);
787     HRegion.closeHRegion(region2);
788     final WAL wal = createWAL(this.conf);
789     final byte[] rowName = tableName.getName();
790     final byte[] regionName = hri.getEncodedNameAsBytes();
791 
792     // Add 1k to each family.
793     final int countPerFamily = 1000;
794     Set<byte[]> familyNames = new HashSet<byte[]>();
795     for (HColumnDescriptor hcd: htd.getFamilies()) {
796       addWALEdits(tableName, hri, rowName, hcd.getName(), countPerFamily,
797           ee, wal, htd, mvcc);
798       familyNames.add(hcd.getName());
799     }
800 
801     // Add a cache flush, shouldn't have any effect
802     wal.startCacheFlush(regionName, familyNames);
803     wal.completeCacheFlush(regionName);
804 
805     // Add an edit to another family, should be skipped.
806     WALEdit edit = new WALEdit();
807     long now = ee.currentTime();
808     edit.add(new KeyValue(rowName, Bytes.toBytes("another family"), rowName,
809       now, rowName));
810     wal.append(htd, hri, new WALKey(hri.getEncodedNameAsBytes(), tableName, now, mvcc), edit, true);
811 
812     // Delete the c family to verify deletes make it over.
813     edit = new WALEdit();
814     now = ee.currentTime();
815     edit.add(new KeyValue(rowName, Bytes.toBytes("c"), null, now, KeyValue.Type.DeleteFamily));
816     wal.append(htd, hri, new WALKey(hri.getEncodedNameAsBytes(), tableName, now, mvcc), edit, true);
817 
818     // Sync.
819     wal.sync();
820     // Make a new conf and a new fs for the splitter to run on so we can take
821     // over old wal.
822     final Configuration newConf = HBaseConfiguration.create(this.conf);
823     User user = HBaseTestingUtility.getDifferentUser(newConf,
824       ".replay.wal.secondtime");
825     user.runAs(new PrivilegedExceptionAction<Void>() {
826       @Override
827       public Void run() throws Exception {
828         runWALSplit(newConf);
829         FileSystem newFS = FileSystem.get(newConf);
830         // 100k seems to make for about 4 flushes during HRegion#initialize.
831         newConf.setInt(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 1024 * 100);
832         // Make a new wal for new region.
833         WAL newWal = createWAL(newConf);
834         final AtomicInteger flushcount = new AtomicInteger(0);
835         try {
836           final HRegion region =
837               new HRegion(basedir, newWal, newFS, newConf, hri, htd, null) {
838             @Override
839             protected FlushResult internalFlushcache(final WAL wal, final long myseqid,
840                 final Collection<Store> storesToFlush, MonitoredTask status,
841                 boolean writeFlushWalMarker)
842                     throws IOException {
843               LOG.info("InternalFlushCache Invoked");
844               FlushResult fs = super.internalFlushcache(wal, myseqid, storesToFlush,
845                   Mockito.mock(MonitoredTask.class), writeFlushWalMarker);
846               flushcount.incrementAndGet();
847               return fs;
848             }
849           };
850           // The seq id this region has opened up with
851           long seqid = region.initialize();
852 
853           // The mvcc readpoint of from inserting data.
854           long writePoint = mvcc.getWritePoint();
855 
856           // We flushed during init.
857           assertTrue("Flushcount=" + flushcount.get(), flushcount.get() > 0);
858           assertTrue((seqid - 1) == writePoint);
859 
860           Get get = new Get(rowName);
861           Result result = region.get(get);
862           // Make sure we only see the good edits
863           assertEquals(countPerFamily * (htd.getFamilies().size() - 1),
864             result.size());
865           region.close();
866         } finally {
867           newWal.close();
868         }
869         return null;
870       }
871     });
872   }
873 
874   @Test
875   // the following test is for HBASE-6065
876   public void testSequentialEditLogSeqNum() throws IOException {
877     final TableName tableName = TableName.valueOf(currentTest.getMethodName());
878     final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
879     final Path basedir =
880         FSUtils.getTableDir(this.hbaseRootDir, tableName);
881     deleteDir(basedir);
882     final byte[] rowName = tableName.getName();
883     final int countPerFamily = 10;
884     final HTableDescriptor htd = createBasic1FamilyHTD(tableName);
885 
886     // Mock the WAL
887     MockWAL wal = createMockWAL();
888 
889     HRegion region = HRegion.openHRegion(this.conf, this.fs, hbaseRootDir, hri, htd, wal);
890     for (HColumnDescriptor hcd : htd.getFamilies()) {
891       addRegionEdits(rowName, hcd.getName(), countPerFamily, this.ee, region, "x");
892     }
893 
894     // Let us flush the region
895     // But this time completeflushcache is not yet done
896     region.flush(true);
897     for (HColumnDescriptor hcd : htd.getFamilies()) {
898       addRegionEdits(rowName, hcd.getName(), 5, this.ee, region, "x");
899     }
900     long lastestSeqNumber = region.getSequenceId();
901     // get the current seq no
902     wal.doCompleteCacheFlush = true;
903     // allow complete cache flush with the previous seq number got after first
904     // set of edits.
905     wal.completeCacheFlush(hri.getEncodedNameAsBytes());
906     wal.shutdown();
907     FileStatus[] listStatus = wal.getFiles();
908     assertNotNull(listStatus);
909     assertTrue(listStatus.length > 0);
910     WALSplitter.splitLogFile(hbaseRootDir, listStatus[0],
911         this.fs, this.conf, null, null, null, mode, wals);
912     FileStatus[] listStatus1 = this.fs.listStatus(
913       new Path(FSUtils.getTableDir(hbaseRootDir, tableName), new Path(hri.getEncodedName(),
914           "recovered.edits")), new PathFilter() {
915         @Override
916         public boolean accept(Path p) {
917           if (WALSplitter.isSequenceIdFile(p)) {
918             return false;
919           }
920           return true;
921         }
922       });
923     int editCount = 0;
924     for (FileStatus fileStatus : listStatus1) {
925       editCount = Integer.parseInt(fileStatus.getPath().getName());
926     }
927     // The sequence number should be same
928     assertEquals(
929         "The sequence number of the recoverd.edits and the current edit seq should be same",
930         lastestSeqNumber, editCount);
931   }
932 
933   /**
934    * testcase for https://issues.apache.org/jira/browse/HBASE-15252
935    */
936   @Test
937   public void testDatalossWhenInputError() throws IOException, InstantiationException,
938       IllegalAccessException {
939     final TableName tableName = TableName.valueOf("testDatalossWhenInputError");
940     final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
941     final Path basedir = FSUtils.getTableDir(this.hbaseRootDir, tableName);
942     deleteDir(basedir);
943     final byte[] rowName = tableName.getName();
944     final int countPerFamily = 10;
945     final HTableDescriptor htd = createBasic1FamilyHTD(tableName);
946     HRegion region1 = HBaseTestingUtility.createRegionAndWAL(hri, hbaseRootDir, this.conf, htd);
947     Path regionDir = region1.getRegionFileSystem().getRegionDir();
948     HBaseTestingUtility.closeRegionAndWAL(region1);
949 
950     WAL wal = createWAL(this.conf);
951     HRegion region = HRegion.openHRegion(this.conf, this.fs, hbaseRootDir, hri, htd, wal);
952     for (HColumnDescriptor hcd : htd.getFamilies()) {
953       addRegionEdits(rowName, hcd.getName(), countPerFamily, this.ee, region, "x");
954     }
955     // Now assert edits made it in.
956     final Get g = new Get(rowName);
957     Result result = region.get(g);
958     assertEquals(countPerFamily * htd.getFamilies().size(), result.size());
959     // Now close the region (without flush), split the log, reopen the region and assert that
960     // replay of log has the correct effect.
961     region.close(true);
962     wal.shutdown();
963 
964     runWALSplit(this.conf);
965 
966     // here we let the DFSInputStream throw an IOException just after the WALHeader.
967     Path editFile = WALSplitter.getSplitEditFilesSorted(this.fs, regionDir).first();
968     FSDataInputStream stream = fs.open(editFile);
969     stream.seek(ProtobufLogReader.PB_WAL_MAGIC.length);
970     Class<? extends DefaultWALProvider.Reader> logReaderClass =
971         conf.getClass("hbase.regionserver.hlog.reader.impl", ProtobufLogReader.class,
972           DefaultWALProvider.Reader.class);
973     DefaultWALProvider.Reader reader = logReaderClass.newInstance();
974     reader.init(this.fs, editFile, conf, stream);
975     final long headerLength = stream.getPos();
976     reader.close();
977     FileSystem spyFs = spy(this.fs);
978     doAnswer(new Answer<FSDataInputStream>() {
979 
980       @Override
981       public FSDataInputStream answer(InvocationOnMock invocation) throws Throwable {
982         FSDataInputStream stream = (FSDataInputStream) invocation.callRealMethod();
983         Field field = FilterInputStream.class.getDeclaredField("in");
984         field.setAccessible(true);
985         final DFSInputStream in = (DFSInputStream) field.get(stream);
986         DFSInputStream spyIn = spy(in);
987         doAnswer(new Answer<Integer>() {
988 
989           private long pos;
990 
991           @Override
992           public Integer answer(InvocationOnMock invocation) throws Throwable {
993             if (pos >= headerLength) {
994               throw new IOException("read over limit");
995             }
996             int b = (Integer) invocation.callRealMethod();
997             if (b > 0) {
998               pos += b;
999             }
1000             return b;
1001           }
1002         }).when(spyIn).read(any(byte[].class), any(int.class), any(int.class));
1003         doAnswer(new Answer<Void>() {
1004 
1005           @Override
1006           public Void answer(InvocationOnMock invocation) throws Throwable {
1007             invocation.callRealMethod();
1008             in.close();
1009             return null;
1010           }
1011         }).when(spyIn).close();
1012         field.set(stream, spyIn);
1013         return stream;
1014       }
1015     }).when(spyFs).open(eq(editFile));
1016 
1017     WAL wal2 = createWAL(this.conf);
1018     HRegion region2;
1019     try {
1020       // log replay should fail due to the IOException, otherwise we may lose data.
1021       region2 = HRegion.openHRegion(conf, spyFs, hbaseRootDir, hri, htd, wal2);
1022       assertEquals(result.size(), region2.get(g).size());
1023     } catch (IOException e) {
1024       assertEquals("read over limit", e.getMessage());
1025     }
1026     region2 = HRegion.openHRegion(conf, fs, hbaseRootDir, hri, htd, wal2);
1027     assertEquals(result.size(), region2.get(g).size());
1028   }
1029 
1030   static class MockWAL extends FSHLog {
1031     boolean doCompleteCacheFlush = false;
1032 
1033     public MockWAL(FileSystem fs, Path rootDir, String logName, Configuration conf)
1034         throws IOException {
1035       super(fs, rootDir, logName, HConstants.HREGION_OLDLOGDIR_NAME, conf, null, true, null, null);
1036     }
1037 
1038     @Override
1039     public void completeCacheFlush(byte[] encodedRegionName) {
1040       if (!doCompleteCacheFlush) {
1041         return;
1042       }
1043       super.completeCacheFlush(encodedRegionName);
1044     }
1045   }
1046 
1047   private HTableDescriptor createBasic1FamilyHTD(final TableName tableName) {
1048     HTableDescriptor htd = new HTableDescriptor(tableName);
1049     HColumnDescriptor a = new HColumnDescriptor(Bytes.toBytes("a"));
1050     htd.addFamily(a);
1051     return htd;
1052   }
1053 
1054   private MockWAL createMockWAL() throws IOException {
1055     MockWAL wal = new MockWAL(fs, hbaseRootDir, logName, conf);
1056     // Set down maximum recovery so we dfsclient doesn't linger retrying something
1057     // long gone.
1058     HBaseTestingUtility.setMaxRecoveryErrorCount(wal.getOutputStream(), 1);
1059     return wal;
1060   }
1061 
1062   // Flusher used in this test.  Keep count of how often we are called and
1063   // actually run the flush inside here.
1064   class TestFlusher implements FlushRequester {
1065     private HRegion r;
1066 
1067     @Override
1068     public void requestFlush(Region region, boolean force) {
1069       try {
1070         r.flush(force);
1071       } catch (IOException e) {
1072         throw new RuntimeException("Exception flushing", e);
1073       }
1074     }
1075 
1076     @Override
1077     public void requestDelayedFlush(Region region, long when, boolean forceFlushAllStores) {
1078       // TODO Auto-generated method stub
1079 
1080     }
1081 
1082     @Override
1083     public void registerFlushRequestListener(FlushRequestListener listener) {
1084 
1085     }
1086 
1087     @Override
1088     public boolean unregisterFlushRequestListener(FlushRequestListener listener) {
1089       return false;
1090     }
1091 
1092     @Override
1093     public void setGlobalMemstoreLimit(long globalMemStoreSize) {
1094 
1095     }
1096   }
1097 
1098   private void addWALEdits(final TableName tableName, final HRegionInfo hri, final byte[] rowName,
1099       final byte[] family, final int count, EnvironmentEdge ee, final WAL wal,
1100       final HTableDescriptor htd, final MultiVersionConcurrencyControl mvcc)
1101   throws IOException {
1102     String familyStr = Bytes.toString(family);
1103     for (int j = 0; j < count; j++) {
1104       byte[] qualifierBytes = Bytes.toBytes(Integer.toString(j));
1105       byte[] columnBytes = Bytes.toBytes(familyStr + ":" + Integer.toString(j));
1106       WALEdit edit = new WALEdit();
1107       edit.add(new KeyValue(rowName, family, qualifierBytes,
1108         ee.currentTime(), columnBytes));
1109       wal.append(htd, hri, new WALKey(hri.getEncodedNameAsBytes(), tableName,999, mvcc),
1110           edit, true);
1111     }
1112     wal.sync();
1113   }
1114 
1115   static List<Put> addRegionEdits (final byte [] rowName, final byte [] family,
1116       final int count, EnvironmentEdge ee, final Region r,
1117       final String qualifierPrefix)
1118   throws IOException {
1119     List<Put> puts = new ArrayList<Put>();
1120     for (int j = 0; j < count; j++) {
1121       byte[] qualifier = Bytes.toBytes(qualifierPrefix + Integer.toString(j));
1122       Put p = new Put(rowName);
1123       p.add(family, qualifier, ee.currentTime(), rowName);
1124       r.put(p);
1125       puts.add(p);
1126     }
1127     return puts;
1128   }
1129 
1130   /*
1131    * Creates an HRI around an HTD that has <code>tableName</code> and three
1132    * column families named 'a','b', and 'c'.
1133    * @param tableName Name of table to use when we create HTableDescriptor.
1134    */
1135    private HRegionInfo createBasic3FamilyHRegionInfo(final TableName tableName) {
1136     return new HRegionInfo(tableName, null, null, false);
1137    }
1138 
1139   /*
1140    * Run the split.  Verify only single split file made.
1141    * @param c
1142    * @return The single split file made
1143    * @throws IOException
1144    */
1145   private Path runWALSplit(final Configuration c) throws IOException {
1146     List<Path> splits = WALSplitter.split(
1147       hbaseRootDir, logDir, oldLogDir, FileSystem.get(c), c, wals);
1148     // Split should generate only 1 file since there's only 1 region
1149     assertEquals("splits=" + splits, 1, splits.size());
1150     // Make sure the file exists
1151     assertTrue(fs.exists(splits.get(0)));
1152     LOG.info("Split file=" + splits.get(0));
1153     return splits.get(0);
1154   }
1155 
1156   /*
1157    * @param c
1158    * @return WAL with retries set down from 5 to 1 only.
1159    * @throws IOException
1160    */
1161   private WAL createWAL(final Configuration c) throws IOException {
1162     FSHLog wal = new FSHLog(FileSystem.get(c), hbaseRootDir, logName, c);
1163     // Set down maximum recovery so we dfsclient doesn't linger retrying something
1164     // long gone.
1165     HBaseTestingUtility.setMaxRecoveryErrorCount(wal.getOutputStream(), 1);
1166     return wal;
1167   }
1168 
1169   private HTableDescriptor createBasic3FamilyHTD(final TableName tableName) {
1170     HTableDescriptor htd = new HTableDescriptor(tableName);
1171     HColumnDescriptor a = new HColumnDescriptor(Bytes.toBytes("a"));
1172     htd.addFamily(a);
1173     HColumnDescriptor b = new HColumnDescriptor(Bytes.toBytes("b"));
1174     htd.addFamily(b);
1175     HColumnDescriptor c = new HColumnDescriptor(Bytes.toBytes("c"));
1176     htd.addFamily(c);
1177     return htd;
1178   }
1179 }