1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.wal;
20
21 import static org.junit.Assert.assertArrayEquals;
22 import static org.junit.Assert.assertEquals;
23 import static org.junit.Assert.assertFalse;
24 import static org.junit.Assert.assertNotNull;
25 import static org.junit.Assert.assertNull;
26 import static org.junit.Assert.assertTrue;
27 import static org.junit.Assert.fail;
28
29 import java.io.IOException;
30 import java.lang.reflect.Method;
31 import java.net.BindException;
32 import java.util.List;
33
34 import org.apache.commons.logging.Log;
35 import org.apache.commons.logging.LogFactory;
36 import org.apache.hadoop.conf.Configuration;
37 import org.apache.hadoop.fs.FSDataInputStream;
38 import org.apache.hadoop.fs.FSDataOutputStream;
39 import org.apache.hadoop.fs.FileStatus;
40 import org.apache.hadoop.fs.FileSystem;
41 import org.apache.hadoop.fs.Path;
42 import org.apache.hadoop.hbase.Cell;
43 import org.apache.hadoop.hbase.Coprocessor;
44 import org.apache.hadoop.hbase.HBaseTestingUtility;
45 import org.apache.hadoop.hbase.HColumnDescriptor;
46 import org.apache.hadoop.hbase.HConstants;
47 import org.apache.hadoop.hbase.HRegionInfo;
48 import org.apache.hadoop.hbase.HTableDescriptor;
49 import org.apache.hadoop.hbase.KeyValue;
50 import org.apache.hadoop.hbase.TableName;
51 import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
52 import org.apache.hadoop.hbase.coprocessor.SampleRegionWALObserver;
53 import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
54 import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
55 import org.apache.hadoop.hbase.regionserver.wal.SequenceFileLogReader;
56 import org.apache.hadoop.hbase.regionserver.wal.SequenceFileLogWriter;
57 import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
58 import org.apache.hadoop.hbase.regionserver.wal.WALCoprocessorHost;
59 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
60 import org.apache.hadoop.hbase.testclassification.MediumTests;
61 import org.apache.hadoop.hbase.util.Bytes;
62 import org.apache.hadoop.hbase.util.FSUtils;
63 import org.apache.hadoop.hbase.util.Threads;
64 import org.apache.hadoop.hdfs.DistributedFileSystem;
65 import org.apache.hadoop.hdfs.MiniDFSCluster;
66 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
67 import org.junit.After;
68 import org.junit.AfterClass;
69 import org.junit.Before;
70 import org.junit.BeforeClass;
71 import org.junit.Rule;
72 import org.junit.Test;
73 import org.junit.experimental.categories.Category;
74 import org.junit.rules.TestName;
75
76
77
78
79 @Category(MediumTests.class)
80 public class TestWALFactory {
81 private static final Log LOG = LogFactory.getLog(TestWALFactory.class);
82
83 protected static Configuration conf;
84 private static MiniDFSCluster cluster;
85 protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
86 protected static Path hbaseDir;
87
88 protected FileSystem fs;
89 protected Path dir;
90 protected WALFactory wals;
91
92 @Rule
93 public final TestName currentTest = new TestName();
94
95 @Before
96 public void setUp() throws Exception {
97 fs = cluster.getFileSystem();
98 dir = new Path(hbaseDir, currentTest.getMethodName());
99 wals = new WALFactory(conf, null, currentTest.getMethodName());
100 }
101
102 @After
103 public void tearDown() throws Exception {
104
105 try {
106 wals.close();
107 } catch (IOException exception) {
108 LOG.warn("Encountered exception while closing wal factory. If you have other errors, this" +
109 " may be the cause. Message: " + exception);
110 LOG.debug("Exception details for failure to close wal factory.", exception);
111 }
112 FileStatus[] entries = fs.listStatus(new Path("/"));
113 for (FileStatus dir : entries) {
114 fs.delete(dir.getPath(), true);
115 }
116 }
117
118 @BeforeClass
119 public static void setUpBeforeClass() throws Exception {
120
121 TEST_UTIL.getConfiguration().setInt("dfs.blocksize", 1024 * 1024);
122
123 TEST_UTIL.getConfiguration().setBoolean("dfs.support.broken.append", true);
124 TEST_UTIL.getConfiguration().setBoolean("dfs.support.append", true);
125
126 TEST_UTIL.getConfiguration().setInt("dfs.namenode.heartbeat.recheck-interval", 5000);
127 TEST_UTIL.getConfiguration().setInt("dfs.heartbeat.interval", 1);
128 TEST_UTIL.getConfiguration().setInt("dfs.client.socket-timeout", 5000);
129
130
131 TEST_UTIL.getConfiguration()
132 .setInt("hbase.ipc.client.connect.max.retries", 1);
133 TEST_UTIL.getConfiguration().setInt(
134 "dfs.client.block.recovery.retries", 1);
135 TEST_UTIL.getConfiguration().setInt(
136 "hbase.ipc.client.connection.maxidletime", 500);
137 TEST_UTIL.getConfiguration().set(CoprocessorHost.WAL_COPROCESSOR_CONF_KEY,
138 SampleRegionWALObserver.class.getName());
139 TEST_UTIL.startMiniDFSCluster(3);
140
141 conf = TEST_UTIL.getConfiguration();
142 cluster = TEST_UTIL.getDFSCluster();
143
144 hbaseDir = TEST_UTIL.createRootDir();
145 }
146
147 @AfterClass
148 public static void tearDownAfterClass() throws Exception {
149 TEST_UTIL.shutdownMiniCluster();
150 }
151
152 @Test
153 public void canCloseSingleton() throws IOException {
154 WALFactory.getInstance(conf).close();
155 }
156
157
158
159
160
161
162 @Test
163 public void testSplit() throws IOException {
164 final TableName tableName = TableName.valueOf(currentTest.getMethodName());
165 final byte [] rowName = tableName.getName();
166 final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(1);
167 final Path logdir = new Path(hbaseDir,
168 DefaultWALProvider.getWALDirectoryName(currentTest.getMethodName()));
169 Path oldLogDir = new Path(hbaseDir, HConstants.HREGION_OLDLOGDIR_NAME);
170 final int howmany = 3;
171 HRegionInfo[] infos = new HRegionInfo[3];
172 Path tabledir = FSUtils.getTableDir(hbaseDir, tableName);
173 fs.mkdirs(tabledir);
174 for(int i = 0; i < howmany; i++) {
175 infos[i] = new HRegionInfo(tableName,
176 Bytes.toBytes("" + i), Bytes.toBytes("" + (i+1)), false);
177 fs.mkdirs(new Path(tabledir, infos[i].getEncodedName()));
178 LOG.info("allo " + new Path(tabledir, infos[i].getEncodedName()).toString());
179 }
180 HTableDescriptor htd = new HTableDescriptor(tableName);
181 htd.addFamily(new HColumnDescriptor("column"));
182
183
184 for (int ii = 0; ii < howmany; ii++) {
185 for (int i = 0; i < howmany; i++) {
186 final WAL log = wals.getWAL(infos[i].getEncodedNameAsBytes());
187 for (int j = 0; j < howmany; j++) {
188 WALEdit edit = new WALEdit();
189 byte [] family = Bytes.toBytes("column");
190 byte [] qualifier = Bytes.toBytes(Integer.toString(j));
191 byte [] column = Bytes.toBytes("column:" + Integer.toString(j));
192 edit.add(new KeyValue(rowName, family, qualifier,
193 System.currentTimeMillis(), column));
194 LOG.info("Region " + i + ": " + edit);
195 WALKey walKey = new WALKey(infos[i].getEncodedNameAsBytes(), tableName,
196 System.currentTimeMillis(), mvcc);
197 log.append(htd, infos[i], walKey, edit, true);
198 walKey.getWriteEntry();
199 }
200 log.sync();
201 log.rollWriter(true);
202 }
203 }
204 wals.shutdown();
205 List<Path> splits = WALSplitter.split(hbaseDir, logdir, oldLogDir, fs, conf, wals);
206 verifySplits(splits, howmany);
207 }
208
209
210
211
212
213 @Test
214 public void Broken_testSync() throws Exception {
215 TableName tableName = TableName.valueOf(currentTest.getMethodName());
216 MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(1);
217
218 Path p = new Path(dir, currentTest.getMethodName() + ".fsdos");
219 FSDataOutputStream out = fs.create(p);
220 out.write(tableName.getName());
221 Method syncMethod = null;
222 try {
223 syncMethod = out.getClass().getMethod("hflush", new Class<?> []{});
224 } catch (NoSuchMethodException e) {
225 try {
226 syncMethod = out.getClass().getMethod("sync", new Class<?> []{});
227 } catch (NoSuchMethodException ex) {
228 fail("This version of Hadoop supports neither Syncable.sync() " +
229 "nor Syncable.hflush().");
230 }
231 }
232 syncMethod.invoke(out, new Object[]{});
233 FSDataInputStream in = fs.open(p);
234 assertTrue(in.available() > 0);
235 byte [] buffer = new byte [1024];
236 int read = in.read(buffer);
237 assertEquals(tableName.getName().length, read);
238 out.close();
239 in.close();
240
241 final int total = 20;
242 WAL.Reader reader = null;
243
244 try {
245 HRegionInfo info = new HRegionInfo(tableName,
246 null,null, false);
247 HTableDescriptor htd = new HTableDescriptor();
248 htd.addFamily(new HColumnDescriptor(tableName.getName()));
249 final WAL wal = wals.getWAL(info.getEncodedNameAsBytes());
250
251 for (int i = 0; i < total; i++) {
252 WALEdit kvs = new WALEdit();
253 kvs.add(new KeyValue(Bytes.toBytes(i), tableName.getName(), tableName.getName()));
254 wal.append(htd, info, new WALKey(info.getEncodedNameAsBytes(), tableName,
255 System.currentTimeMillis(), mvcc), kvs, true);
256 }
257
258
259 wal.sync();
260
261 Path walPath = DefaultWALProvider.getCurrentFileName(wal);
262 reader = wals.createReader(fs, walPath);
263 int count = 0;
264 WAL.Entry entry = new WAL.Entry();
265 while ((entry = reader.next(entry)) != null) count++;
266 assertEquals(total, count);
267 reader.close();
268
269
270 for (int i = 0; i < total; i++) {
271 WALEdit kvs = new WALEdit();
272 kvs.add(new KeyValue(Bytes.toBytes(i), tableName.getName(), tableName.getName()));
273 wal.append(htd, info, new WALKey(info.getEncodedNameAsBytes(), tableName,
274 System.currentTimeMillis(), mvcc), kvs, true);
275 }
276 wal.sync();
277 reader = wals.createReader(fs, walPath);
278 count = 0;
279 while((entry = reader.next(entry)) != null) count++;
280 assertTrue(count >= total);
281 reader.close();
282
283 wal.sync();
284 reader = wals.createReader(fs, walPath);
285 count = 0;
286 while((entry = reader.next(entry)) != null) count++;
287 assertEquals(total * 2, count);
288 reader.close();
289
290
291 final byte [] value = new byte[1025 * 1024];
292 for (int i = 0; i < total; i++) {
293 WALEdit kvs = new WALEdit();
294 kvs.add(new KeyValue(Bytes.toBytes(i), tableName.getName(), value));
295 wal.append(htd, info, new WALKey(info.getEncodedNameAsBytes(), tableName,
296 System.currentTimeMillis(), mvcc), kvs, true);
297 }
298
299 wal.sync();
300 reader = wals.createReader(fs, walPath);
301 count = 0;
302 while((entry = reader.next(entry)) != null) count++;
303 assertEquals(total * 3, count);
304 reader.close();
305
306 wal.shutdown();
307 reader = wals.createReader(fs, walPath);
308 count = 0;
309 while((entry = reader.next(entry)) != null) count++;
310 assertEquals(total * 3, count);
311 reader.close();
312 } finally {
313 if (reader != null) reader.close();
314 }
315 }
316
317 private void verifySplits(final List<Path> splits, final int howmany)
318 throws IOException {
319 assertEquals(howmany * howmany, splits.size());
320 for (int i = 0; i < splits.size(); i++) {
321 LOG.info("Verifying=" + splits.get(i));
322 WAL.Reader reader = wals.createReader(fs, splits.get(i));
323 try {
324 int count = 0;
325 String previousRegion = null;
326 long seqno = -1;
327 WAL.Entry entry = new WAL.Entry();
328 while((entry = reader.next(entry)) != null) {
329 WALKey key = entry.getKey();
330 String region = Bytes.toString(key.getEncodedRegionName());
331
332 if (previousRegion != null) {
333 assertEquals(previousRegion, region);
334 }
335 LOG.info("oldseqno=" + seqno + ", newseqno=" + key.getLogSeqNum());
336 assertTrue(seqno < key.getLogSeqNum());
337 seqno = key.getLogSeqNum();
338 previousRegion = region;
339 count++;
340 }
341 assertEquals(howmany, count);
342 } finally {
343 reader.close();
344 }
345 }
346 }
347
348
349
350
351
352
353
354
355
356
357 @Test (timeout=300000)
358 public void testAppendClose() throws Exception {
359 TableName tableName =
360 TableName.valueOf(currentTest.getMethodName());
361 HRegionInfo regioninfo = new HRegionInfo(tableName,
362 HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW, false);
363
364 final WAL wal = wals.getWAL(regioninfo.getEncodedNameAsBytes());
365 final int total = 20;
366
367 HTableDescriptor htd = new HTableDescriptor();
368 htd.addFamily(new HColumnDescriptor(tableName.getName()));
369
370 for (int i = 0; i < total; i++) {
371 WALEdit kvs = new WALEdit();
372 kvs.add(new KeyValue(Bytes.toBytes(i), tableName.getName(), tableName.getName()));
373 wal.append(htd, regioninfo, new WALKey(regioninfo.getEncodedNameAsBytes(), tableName,
374 System.currentTimeMillis()), kvs, true);
375 }
376
377 wal.sync();
378 int namenodePort = cluster.getNameNodePort();
379 final Path walPath = DefaultWALProvider.getCurrentFileName(wal);
380
381
382
383 try {
384 DistributedFileSystem dfs = (DistributedFileSystem) cluster.getFileSystem();
385 dfs.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_ENTER);
386 TEST_UTIL.shutdownMiniDFSCluster();
387 try {
388
389
390 wal.shutdown();
391 } catch (IOException e) {
392 LOG.info(e);
393 }
394 fs.close();
395 LOG.info("STOPPED first instance of the cluster");
396 } finally {
397
398 while (cluster.isClusterUp()){
399 LOG.error("Waiting for cluster to go down");
400 Thread.sleep(1000);
401 }
402 assertFalse(cluster.isClusterUp());
403 cluster = null;
404 for (int i = 0; i < 100; i++) {
405 try {
406 cluster = TEST_UTIL.startMiniDFSClusterForTestWAL(namenodePort);
407 break;
408 } catch (BindException e) {
409 LOG.info("Sleeping. BindException bringing up new cluster");
410 Threads.sleep(1000);
411 }
412 }
413 cluster.waitActive();
414 fs = cluster.getFileSystem();
415 LOG.info("STARTED second instance.");
416 }
417
418
419
420 Method setLeasePeriod = cluster.getClass()
421 .getDeclaredMethod("setLeasePeriod", new Class[]{Long.TYPE, Long.TYPE});
422 setLeasePeriod.setAccessible(true);
423 setLeasePeriod.invoke(cluster, 1000L, 1000L);
424 try {
425 Thread.sleep(1000);
426 } catch (InterruptedException e) {
427 LOG.info(e);
428 }
429
430
431 final FileSystem recoveredFs = fs;
432 final Configuration rlConf = conf;
433
434 class RecoverLogThread extends Thread {
435 public Exception exception = null;
436 public void run() {
437 try {
438 FSUtils.getInstance(fs, rlConf)
439 .recoverFileLease(recoveredFs, walPath, rlConf, null);
440 } catch (IOException e) {
441 exception = e;
442 }
443 }
444 }
445
446 RecoverLogThread t = new RecoverLogThread();
447 t.start();
448
449 t.join(60 * 1000);
450 if(t.isAlive()) {
451 t.interrupt();
452 throw new Exception("Timed out waiting for WAL.recoverLog()");
453 }
454
455 if (t.exception != null)
456 throw t.exception;
457
458
459 WAL.Reader reader = wals.createReader(fs, walPath);
460 int count = 0;
461 WAL.Entry entry = new WAL.Entry();
462 while (reader.next(entry) != null) {
463 count++;
464 assertTrue("Should be one KeyValue per WALEdit",
465 entry.getEdit().getCells().size() == 1);
466 }
467 assertEquals(total, count);
468 reader.close();
469
470
471 setLeasePeriod.invoke(cluster, new Object[]{new Long(60000), new Long(3600000)});
472 }
473
474
475
476
477
/**
 * Appends one {@link WALEdit} holding {@code COL_COUNT} cells for a single row, then reads
 * the first entry back from the WAL file and checks its key and its first cell.
 * <p>
 * A missing entry fails the test; it is not silently skipped.
 */
@Test
public void testEditAdd() throws IOException {
  final int COL_COUNT = 10;
  final HTableDescriptor htd =
      new HTableDescriptor(TableName.valueOf("tablename")).addFamily(new HColumnDescriptor(
          "column"));
  final byte [] row = Bytes.toBytes("row");
  WAL.Reader reader = null;
  try {
    final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(1);

    // All cells share one row and one timestamp; the i-th cell's value is the digit i.
    long timestamp = System.currentTimeMillis();
    WALEdit cols = new WALEdit();
    for (int i = 0; i < COL_COUNT; i++) {
      cols.add(new KeyValue(row, Bytes.toBytes("column"),
          Bytes.toBytes(Integer.toString(i)),
          timestamp, new byte[] { (byte)(i + '0') }));
    }
    HRegionInfo info = new HRegionInfo(htd.getTableName(),
        row, Bytes.toBytes(Bytes.toString(row) + "1"), false);
    final WAL log = wals.getWAL(info.getEncodedNameAsBytes());

    final long txid = log.append(htd, info,
        new WALKey(info.getEncodedNameAsBytes(), htd.getTableName(), System.currentTimeMillis(),
            mvcc),
        cols, true);
    log.sync(txid);
    log.startCacheFlush(info.getEncodedNameAsBytes(), htd.getFamiliesKeys());
    log.completeCacheFlush(info.getEncodedNameAsBytes());
    log.shutdown();
    Path filename = DefaultWALProvider.getCurrentFileName(log);

    reader = wals.createReader(fs, filename);
    // A single append was made above, so the first entry read must be that edit.
    WAL.Entry entry = reader.next(null);
    assertNotNull("Expected to read back the appended edit from " + filename, entry);
    WALKey key = entry.getKey();
    WALEdit val = entry.getEdit();
    assertTrue(Bytes.equals(info.getEncodedNameAsBytes(), key.getEncodedRegionName()));
    assertTrue(htd.getTableName().equals(key.getTablename()));
    Cell cell = val.getCells().get(0);
    assertTrue(Bytes.equals(row, cell.getRow()));
    assertEquals((byte)(0 + '0'), cell.getValue()[0]);
    System.out.println(key + " " + val);
  } finally {
    if (reader != null) {
      reader.close();
    }
  }
}
533
534
535
536
537 @Test
538 public void testAppend() throws IOException {
539 final int COL_COUNT = 10;
540 final HTableDescriptor htd =
541 new HTableDescriptor(TableName.valueOf("tablename")).addFamily(new HColumnDescriptor(
542 "column"));
543 final byte [] row = Bytes.toBytes("row");
544 WAL.Reader reader = null;
545 final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(1);
546 try {
547
548
549 long timestamp = System.currentTimeMillis();
550 WALEdit cols = new WALEdit();
551 for (int i = 0; i < COL_COUNT; i++) {
552 cols.add(new KeyValue(row, Bytes.toBytes("column"),
553 Bytes.toBytes(Integer.toString(i)),
554 timestamp, new byte[] { (byte)(i + '0') }));
555 }
556 HRegionInfo hri = new HRegionInfo(htd.getTableName(),
557 HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
558 final WAL log = wals.getWAL(hri.getEncodedNameAsBytes());
559 final long txid = log.append(htd, hri,
560 new WALKey(hri.getEncodedNameAsBytes(), htd.getTableName(), System.currentTimeMillis(),
561 mvcc),
562 cols, true);
563 log.sync(txid);
564 log.startCacheFlush(hri.getEncodedNameAsBytes(), htd.getFamiliesKeys());
565 log.completeCacheFlush(hri.getEncodedNameAsBytes());
566 log.shutdown();
567 Path filename = DefaultWALProvider.getCurrentFileName(log);
568
569 reader = wals.createReader(fs, filename);
570 WAL.Entry entry = reader.next();
571 assertEquals(COL_COUNT, entry.getEdit().size());
572 int idx = 0;
573 for (Cell val : entry.getEdit().getCells()) {
574 assertTrue(Bytes.equals(hri.getEncodedNameAsBytes(),
575 entry.getKey().getEncodedRegionName()));
576 assertTrue(htd.getTableName().equals(entry.getKey().getTablename()));
577 assertTrue(Bytes.equals(row, val.getRow()));
578 assertEquals((byte)(idx + '0'), val.getValue()[0]);
579 System.out.println(entry.getKey() + " " + val);
580 idx++;
581 }
582 } finally {
583 if (reader != null) {
584 reader.close();
585 }
586 }
587 }
588
589
590
591
592
593 @Test
594 public void testVisitors() throws Exception {
595 final int COL_COUNT = 10;
596 final TableName tableName =
597 TableName.valueOf("tablename");
598 final byte [] row = Bytes.toBytes("row");
599 final DumbWALActionsListener visitor = new DumbWALActionsListener();
600 final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(1);
601 long timestamp = System.currentTimeMillis();
602 HTableDescriptor htd = new HTableDescriptor();
603 htd.addFamily(new HColumnDescriptor("column"));
604
605 HRegionInfo hri = new HRegionInfo(tableName,
606 HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
607 final WAL log = wals.getWAL(hri.getEncodedNameAsBytes());
608 log.registerWALActionsListener(visitor);
609 for (int i = 0; i < COL_COUNT; i++) {
610 WALEdit cols = new WALEdit();
611 cols.add(new KeyValue(row, Bytes.toBytes("column"),
612 Bytes.toBytes(Integer.toString(i)),
613 timestamp, new byte[]{(byte) (i + '0')}));
614 log.append(htd, hri, new WALKey(hri.getEncodedNameAsBytes(), tableName,
615 System.currentTimeMillis(), mvcc), cols, true);
616 }
617 log.sync();
618 assertEquals(COL_COUNT, visitor.increments);
619 log.unregisterWALActionsListener(visitor);
620 WALEdit cols = new WALEdit();
621 cols.add(new KeyValue(row, Bytes.toBytes("column"),
622 Bytes.toBytes(Integer.toString(11)),
623 timestamp, new byte[]{(byte) (11 + '0')}));
624 log.append(htd, hri, new WALKey(hri.getEncodedNameAsBytes(), tableName,
625 System.currentTimeMillis(), mvcc), cols, true);
626 log.sync();
627 assertEquals(COL_COUNT, visitor.increments);
628 }
629
630
631
632
633 @Test
634 public void testWALCoprocessorLoaded() throws Exception {
635
636 WALCoprocessorHost host = wals.getWAL(UNSPECIFIED_REGION).getCoprocessorHost();
637 Coprocessor c = host.findCoprocessor(SampleRegionWALObserver.class.getName());
638 assertNotNull(c);
639 }
640
641
642
643
644 @Test
645 public void testReadLegacyLog() throws IOException {
646 final int columnCount = 5;
647 final int recordCount = 5;
648 final TableName tableName =
649 TableName.valueOf("tablename");
650 final byte[] row = Bytes.toBytes("row");
651 long timestamp = System.currentTimeMillis();
652 Path path = new Path(dir, "tempwal");
653 SequenceFileLogWriter sflw = null;
654 WAL.Reader reader = null;
655 try {
656 HRegionInfo hri = new HRegionInfo(tableName,
657 HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
658 HTableDescriptor htd = new HTableDescriptor(tableName);
659 fs.mkdirs(dir);
660
661 sflw = new SequenceFileLogWriter();
662 sflw.init(fs, path, conf, false);
663 for (int i = 0; i < recordCount; ++i) {
664 WALKey key = new HLogKey(
665 hri.getEncodedNameAsBytes(), tableName, i, timestamp, HConstants.DEFAULT_CLUSTER_ID);
666 WALEdit edit = new WALEdit();
667 for (int j = 0; j < columnCount; ++j) {
668 if (i == 0) {
669 htd.addFamily(new HColumnDescriptor("column" + j));
670 }
671 String value = i + "" + j;
672 edit.add(new KeyValue(row, row, row, timestamp, Bytes.toBytes(value)));
673 }
674 sflw.append(new WAL.Entry(key, edit));
675 }
676 sflw.sync();
677 sflw.close();
678
679
680 reader = wals.createReader(fs, path);
681 assertTrue(reader instanceof SequenceFileLogReader);
682 for (int i = 0; i < recordCount; ++i) {
683 WAL.Entry entry = reader.next();
684 assertNotNull(entry);
685 assertEquals(columnCount, entry.getEdit().size());
686 assertArrayEquals(hri.getEncodedNameAsBytes(), entry.getKey().getEncodedRegionName());
687 assertEquals(tableName, entry.getKey().getTablename());
688 int idx = 0;
689 for (Cell val : entry.getEdit().getCells()) {
690 assertTrue(Bytes.equals(row, val.getRow()));
691 String value = i + "" + idx;
692 assertArrayEquals(Bytes.toBytes(value), val.getValue());
693 idx++;
694 }
695 }
696 WAL.Entry entry = reader.next();
697 assertNull(entry);
698 } finally {
699 if (sflw != null) {
700 sflw.close();
701 }
702 if (reader != null) {
703 reader.close();
704 }
705 }
706 }
707
708 static class DumbWALActionsListener extends WALActionsListener.Base {
709 int increments = 0;
710
711 @Override
712 public void visitLogEntryBeforeWrite(HRegionInfo info, WALKey logKey,
713 WALEdit logEdit) {
714 increments++;
715 }
716
717 @Override
718 public void visitLogEntryBeforeWrite(HTableDescriptor htd, WALKey logKey, WALEdit logEdit) {
719
720 increments++;
721 }
722 }
723
724 private static final byte[] UNSPECIFIED_REGION = new byte[]{};
725
726 }