1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.regionserver.wal;
20
21 import static org.junit.Assert.assertEquals;
22 import static org.junit.Assert.assertFalse;
23 import static org.junit.Assert.assertNotNull;
24 import static org.junit.Assert.assertTrue;
25
26 import java.io.EOFException;
27 import java.io.IOException;
28 import java.util.ArrayList;
29 import java.util.HashSet;
30 import java.util.List;
31 import java.util.Set;
32 import java.util.concurrent.atomic.AtomicBoolean;
33
34 import org.apache.commons.logging.Log;
35 import org.apache.commons.logging.LogFactory;
36 import org.apache.hadoop.conf.Configuration;
37 import org.apache.hadoop.fs.FileSystem;
38 import org.apache.hadoop.fs.Path;
39 import org.apache.hadoop.hbase.Cell;
40 import org.apache.hadoop.hbase.HBaseTestingUtility;
41 import org.apache.hadoop.hbase.HColumnDescriptor;
42 import org.apache.hadoop.hbase.HConstants;
43 import org.apache.hadoop.hbase.HTableDescriptor;
44 import org.apache.hadoop.hbase.testclassification.LargeTests;
45 import org.apache.hadoop.hbase.MiniHBaseCluster;
46 import org.apache.hadoop.hbase.ServerName;
47 import org.apache.hadoop.hbase.TableName;
48 import org.apache.hadoop.hbase.client.Admin;
49 import org.apache.hadoop.hbase.client.Get;
50 import org.apache.hadoop.hbase.client.HTable;
51 import org.apache.hadoop.hbase.client.Put;
52 import org.apache.hadoop.hbase.client.Result;
53 import org.apache.hadoop.hbase.client.ResultScanner;
54 import org.apache.hadoop.hbase.client.Scan;
55 import org.apache.hadoop.hbase.client.Table;
56 import org.apache.hadoop.hbase.fs.HFileSystem;
57 import org.apache.hadoop.hbase.regionserver.HRegionServer;
58 import org.apache.hadoop.hbase.regionserver.Region;
59 import org.apache.hadoop.hbase.regionserver.Store;
60 import org.apache.hadoop.hbase.util.Bytes;
61 import org.apache.hadoop.hbase.util.FSUtils;
62 import org.apache.hadoop.hbase.util.JVMClusterUtil;
63 import org.apache.hadoop.hbase.util.Threads;
64 import org.apache.hadoop.hbase.wal.DefaultWALProvider;
65 import org.apache.hadoop.hbase.wal.WAL;
66 import org.apache.hadoop.hbase.wal.WALFactory;
67 import org.apache.hadoop.hdfs.MiniDFSCluster;
68 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
69 import org.apache.hadoop.hdfs.server.datanode.DataNode;
70 import org.junit.After;
71 import org.junit.Assert;
72 import org.junit.Before;
73 import org.junit.BeforeClass;
74 import org.junit.Test;
75 import org.junit.experimental.categories.Category;
76
77
78
79
80 @Category(LargeTests.class)
81 public class TestLogRolling {
82 private static final Log LOG = LogFactory.getLog(TestLogRolling.class);
83 private HRegionServer server;
84 private String tableName;
85 private byte[] value;
86 private FileSystem fs;
87 private MiniDFSCluster dfsCluster;
88 private Admin admin;
89 private MiniHBaseCluster cluster;
90 private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
91
92 public TestLogRolling() {
93 this.server = null;
94 this.tableName = null;
95
96 String className = this.getClass().getName();
97 StringBuilder v = new StringBuilder(className);
98 while (v.length() < 1000) {
99 v.append(className);
100 }
101 this.value = Bytes.toBytes(v.toString());
102 }
103
104
105
106 @BeforeClass
107 public static void setUpBeforeClass() throws Exception {
108
109
110 System.setProperty("hbase.tests.use.shortcircuit.reads", "false");
111
112
113
114 TEST_UTIL.getConfiguration().setLong(HConstants.HREGION_MAX_FILESIZE, 768L * 1024L);
115
116
117 TEST_UTIL.getConfiguration().setInt("hbase.regionserver.maxlogentries", 32);
118
119 TEST_UTIL.getConfiguration().setInt("hbase.regionserver.logroll.errors.tolerated", 2);
120 TEST_UTIL.getConfiguration().setInt("hbase.rpc.timeout", 10 * 1000);
121
122
123 TEST_UTIL.getConfiguration().setInt("hbase.hregion.memstore.optionalflushcount", 2);
124
125
126 TEST_UTIL.getConfiguration().setInt(
127 HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 8192);
128
129
130 TEST_UTIL.getConfiguration().setLong("hbase.client.pause", 10 * 1000);
131
132
133
134 TEST_UTIL.getConfiguration().setInt(HConstants.THREAD_WAKE_FREQUENCY, 2 * 1000);
135
136
137
138 TEST_UTIL.getConfiguration().setBoolean("dfs.support.append", true);
139
140
141 TEST_UTIL.getConfiguration().setInt("dfs.namenode.heartbeat.recheck-interval", 5000);
142 TEST_UTIL.getConfiguration().setInt("dfs.heartbeat.interval", 1);
143
144
145 TEST_UTIL.getConfiguration().setInt("dfs.client.block.write.retries", 30);
146 TEST_UTIL.getConfiguration().setInt("hbase.regionserver.hlog.tolerable.lowreplication", 2);
147 TEST_UTIL.getConfiguration().setInt("hbase.regionserver.hlog.lowreplication.rolllimit", 3);
148 }
149
150 @Before
151 public void setUp() throws Exception {
152 TEST_UTIL.startMiniCluster(1, 1, 2);
153
154 cluster = TEST_UTIL.getHBaseCluster();
155 dfsCluster = TEST_UTIL.getDFSCluster();
156 fs = TEST_UTIL.getTestFileSystem();
157 admin = TEST_UTIL.getHBaseAdmin();
158
159
160 cluster.getMaster().balanceSwitch(false);
161 }
162
163 @After
164 public void tearDown() throws Exception {
165 TEST_UTIL.shutdownMiniCluster();
166 }
167
168 private void startAndWriteData() throws IOException, InterruptedException {
169
170 new HTable(TEST_UTIL.getConfiguration(), TableName.META_TABLE_NAME);
171 this.server = cluster.getRegionServerThreads().get(0).getRegionServer();
172
173 Table table = createTestTable(this.tableName);
174
175 server = TEST_UTIL.getRSForFirstRegionInTable(table.getName());
176 for (int i = 1; i <= 256; i++) {
177 doPut(table, i);
178 if (i % 32 == 0) {
179
180 try {
181 Thread.sleep(2000);
182 } catch (InterruptedException e) {
183
184 }
185 }
186 }
187 }
188
189
190
191
192 @Test(timeout=120000)
193 public void testLogRollOnNothingWritten() throws Exception {
194 final Configuration conf = TEST_UTIL.getConfiguration();
195 final WALFactory wals = new WALFactory(conf, null,
196 ServerName.valueOf("test.com",8080, 1).toString());
197 final WAL newLog = wals.getWAL(new byte[]{});
198 try {
199
200 newLog.rollWriter(true);
201 } finally {
202 wals.close();
203 }
204 }
205
206
207
208
209
210
211 @Test
212 public void testLogRolling() throws Exception {
213 this.tableName = getName();
214
215 startAndWriteData();
216 final WAL log = server.getWAL(null);
217 LOG.info("after writing there are " + DefaultWALProvider.getNumRolledLogFiles(log) +
218 " log files");
219
220
221 for (Region r: server.getOnlineRegionsLocalContext()) {
222 r.flush(true);
223 }
224
225
226 log.rollWriter();
227
228 int count = DefaultWALProvider.getNumRolledLogFiles(log);
229 LOG.info("after flushing all regions and rolling logs there are " + count + " log files");
230 assertTrue(("actual count: " + count), count <= 2);
231 }
232
233 private static String getName() {
234 return "TestLogRolling";
235 }
236
237 void writeData(Table table, int rownum) throws IOException {
238 doPut(table, rownum);
239
240
241 try {
242 Thread.sleep(2000);
243 } catch (InterruptedException e) {
244
245 }
246 }
247
248 void validateData(Table table, int rownum) throws IOException {
249 String row = "row" + String.format("%1$04d", rownum);
250 Get get = new Get(Bytes.toBytes(row));
251 get.addFamily(HConstants.CATALOG_FAMILY);
252 Result result = table.get(get);
253 assertTrue(result.size() == 1);
254 assertTrue(Bytes.equals(value,
255 result.getValue(HConstants.CATALOG_FAMILY, null)));
256 LOG.info("Validated row " + row);
257 }
258
259 void batchWriteAndWait(Table table, final FSHLog log, int start, boolean expect, int timeout)
260 throws IOException {
261 for (int i = 0; i < 10; i++) {
262 Put put = new Put(Bytes.toBytes("row"
263 + String.format("%1$04d", (start + i))));
264 put.add(HConstants.CATALOG_FAMILY, null, value);
265 table.put(put);
266 }
267 Put tmpPut = new Put(Bytes.toBytes("tmprow"));
268 tmpPut.add(HConstants.CATALOG_FAMILY, null, value);
269 long startTime = System.currentTimeMillis();
270 long remaining = timeout;
271 while (remaining > 0) {
272 if (log.isLowReplicationRollEnabled() == expect) {
273 break;
274 } else {
275
276 table.put(tmpPut);
277 try {
278 Thread.sleep(200);
279 } catch (InterruptedException e) {
280
281 }
282 remaining = timeout - (System.currentTimeMillis() - startTime);
283 }
284 }
285 }
286
287
288
289
290
291 @Test
292 public void testLogRollOnDatanodeDeath() throws Exception {
293 TEST_UTIL.ensureSomeRegionServersAvailable(2);
294 assertTrue("This test requires WAL file replication set to 2.",
295 fs.getDefaultReplication(TEST_UTIL.getDataTestDirOnTestFS()) == 2);
296 LOG.info("Replication=" +
297 fs.getDefaultReplication(TEST_UTIL.getDataTestDirOnTestFS()));
298
299 this.server = cluster.getRegionServer(0);
300
301
302 HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(getName()));
303 desc.addFamily(new HColumnDescriptor(HConstants.CATALOG_FAMILY));
304
305 admin.createTable(desc);
306 Table table = TEST_UTIL.getConnection().getTable(desc.getTableName());
307 assertTrue(((HTable) table).isAutoFlush());
308
309 server = TEST_UTIL.getRSForFirstRegionInTable(desc.getTableName());
310 final FSHLog log = (FSHLog) server.getWAL(null);
311 final AtomicBoolean lowReplicationHookCalled = new AtomicBoolean(false);
312
313 log.registerWALActionsListener(new WALActionsListener.Base() {
314 @Override
315 public void logRollRequested(boolean lowReplication) {
316 if (lowReplication) {
317 lowReplicationHookCalled.lazySet(true);
318 }
319 }
320 });
321
322
323 assertTrue("Need append support for this test", FSUtils
324 .isAppendSupported(TEST_UTIL.getConfiguration()));
325
326
327
328
329
330 List<DataNode> existingNodes = dfsCluster.getDataNodes();
331 int numDataNodes = 3;
332 dfsCluster.startDataNodes(TEST_UTIL.getConfiguration(), numDataNodes, true,
333 null, null);
334 List<DataNode> allNodes = dfsCluster.getDataNodes();
335 for (int i = allNodes.size()-1; i >= 0; i--) {
336 if (existingNodes.contains(allNodes.get(i))) {
337 dfsCluster.stopDataNode( i );
338 }
339 }
340
341 assertTrue("DataNodes " + dfsCluster.getDataNodes().size() +
342 " default replication " +
343 fs.getDefaultReplication(TEST_UTIL.getDataTestDirOnTestFS()),
344 dfsCluster.getDataNodes().size() >=
345 fs.getDefaultReplication(TEST_UTIL.getDataTestDirOnTestFS()) + 1);
346
347 writeData(table, 2);
348
349 long curTime = System.currentTimeMillis();
350 LOG.info("log.getCurrentFileName(): " + log.getCurrentFileName());
351 long oldFilenum = DefaultWALProvider.extractFileNumFromWAL(log);
352 assertTrue("Log should have a timestamp older than now",
353 curTime > oldFilenum && oldFilenum != -1);
354
355 assertTrue("The log shouldn't have rolled yet",
356 oldFilenum == DefaultWALProvider.extractFileNumFromWAL(log));
357 final DatanodeInfo[] pipeline = log.getPipeLine();
358 assertTrue(pipeline.length ==
359 fs.getDefaultReplication(TEST_UTIL.getDataTestDirOnTestFS()));
360
361
362
363 assertTrue(dfsCluster.stopDataNode(pipeline[0].getName()) != null);
364
365
366 writeData(table, 2);
367 long newFilenum = DefaultWALProvider.extractFileNumFromWAL(log);
368
369 assertTrue("Missing datanode should've triggered a log roll",
370 newFilenum > oldFilenum && newFilenum > curTime);
371
372 assertTrue("The log rolling hook should have been called with the low replication flag",
373 lowReplicationHookCalled.get());
374
375
376 writeData(table, 3);
377 assertTrue("The log should not roll again.",
378 DefaultWALProvider.extractFileNumFromWAL(log) == newFilenum);
379
380
381 assertTrue(dfsCluster.stopDataNode(pipeline[1].getName()) != null);
382
383 batchWriteAndWait(table, log, 3, false, 14000);
384 int replication = log.getLogReplication();
385 assertTrue("LowReplication Roller should've been disabled, current replication="
386 + replication, !log.isLowReplicationRollEnabled());
387
388 dfsCluster
389 .startDataNodes(TEST_UTIL.getConfiguration(), 1, true, null, null);
390
391
392
393 log.rollWriter(true);
394 batchWriteAndWait(table, log, 13, true, 10000);
395 replication = log.getLogReplication();
396 assertTrue("New log file should have the default replication instead of " +
397 replication,
398 replication == fs.getDefaultReplication(TEST_UTIL.getDataTestDirOnTestFS()));
399 assertTrue("LowReplication Roller should've been enabled", log.isLowReplicationRollEnabled());
400 }
401
402
403
404
405
406
407 @Test
408 public void testLogRollOnPipelineRestart() throws Exception {
409 LOG.info("Starting testLogRollOnPipelineRestart");
410 assertTrue("This test requires WAL file replication.",
411 fs.getDefaultReplication(TEST_UTIL.getDataTestDirOnTestFS()) > 1);
412 LOG.info("Replication=" +
413 fs.getDefaultReplication(TEST_UTIL.getDataTestDirOnTestFS()));
414
415 Table t = new HTable(TEST_UTIL.getConfiguration(), TableName.META_TABLE_NAME);
416 try {
417 this.server = cluster.getRegionServer(0);
418
419
420 HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(getName()));
421 desc.addFamily(new HColumnDescriptor(HConstants.CATALOG_FAMILY));
422
423 admin.createTable(desc);
424 Table table = new HTable(TEST_UTIL.getConfiguration(), desc.getTableName());
425
426 server = TEST_UTIL.getRSForFirstRegionInTable(desc.getTableName());
427 final WAL log = server.getWAL(null);
428 final List<Path> paths = new ArrayList<Path>();
429 final List<Integer> preLogRolledCalled = new ArrayList<Integer>();
430
431 paths.add(DefaultWALProvider.getCurrentFileName(log));
432 log.registerWALActionsListener(new WALActionsListener.Base() {
433
434 @Override
435 public void preLogRoll(Path oldFile, Path newFile) {
436 LOG.debug("preLogRoll: oldFile="+oldFile+" newFile="+newFile);
437 preLogRolledCalled.add(new Integer(1));
438 }
439 @Override
440 public void postLogRoll(Path oldFile, Path newFile) {
441 paths.add(newFile);
442 }
443 });
444
445
446 assertTrue("Need append support for this test", FSUtils
447 .isAppendSupported(TEST_UTIL.getConfiguration()));
448
449 writeData(table, 1002);
450
451 long curTime = System.currentTimeMillis();
452 LOG.info("log.getCurrentFileName()): " + DefaultWALProvider.getCurrentFileName(log));
453 long oldFilenum = DefaultWALProvider.extractFileNumFromWAL(log);
454 assertTrue("Log should have a timestamp older than now",
455 curTime > oldFilenum && oldFilenum != -1);
456
457 assertTrue("The log shouldn't have rolled yet", oldFilenum ==
458 DefaultWALProvider.extractFileNumFromWAL(log));
459
460
461 dfsCluster.restartDataNodes();
462 Thread.sleep(1000);
463 dfsCluster.waitActive();
464 LOG.info("Data Nodes restarted");
465 validateData(table, 1002);
466
467
468 writeData(table, 1003);
469 long newFilenum = DefaultWALProvider.extractFileNumFromWAL(log);
470
471 assertTrue("Missing datanode should've triggered a log roll",
472 newFilenum > oldFilenum && newFilenum > curTime);
473 validateData(table, 1003);
474
475 writeData(table, 1004);
476
477
478 dfsCluster.restartDataNodes();
479 Thread.sleep(1000);
480 dfsCluster.waitActive();
481 LOG.info("Data Nodes restarted");
482 validateData(table, 1004);
483
484
485 writeData(table, 1005);
486
487
488 log.rollWriter(true);
489 assertTrue("preLogRolledCalled has size of " + preLogRolledCalled.size(),
490 preLogRolledCalled.size() >= 1);
491
492
493 Set<String> loggedRows = new HashSet<String>();
494 FSUtils fsUtils = FSUtils.getInstance(fs, TEST_UTIL.getConfiguration());
495 for (Path p : paths) {
496 LOG.debug("recovering lease for " + p);
497 fsUtils.recoverFileLease(((HFileSystem)fs).getBackingFs(), p,
498 TEST_UTIL.getConfiguration(), null);
499
500 LOG.debug("Reading WAL "+FSUtils.getPath(p));
501 WAL.Reader reader = null;
502 try {
503 reader = WALFactory.createReader(fs, p, TEST_UTIL.getConfiguration());
504 WAL.Entry entry;
505 while ((entry = reader.next()) != null) {
506 LOG.debug("#"+entry.getKey().getLogSeqNum()+": "+entry.getEdit().getCells());
507 for (Cell cell : entry.getEdit().getCells()) {
508 loggedRows.add(Bytes.toStringBinary(cell.getRow()));
509 }
510 }
511 } catch (EOFException e) {
512 LOG.debug("EOF reading file "+FSUtils.getPath(p));
513 } finally {
514 if (reader != null) reader.close();
515 }
516 }
517
518
519 assertTrue(loggedRows.contains("row1002"));
520 assertTrue(loggedRows.contains("row1003"));
521 assertTrue(loggedRows.contains("row1004"));
522 assertTrue(loggedRows.contains("row1005"));
523
524
525 for (Region r: server.getOnlineRegionsLocalContext()) {
526 try {
527 r.flush(true);
528 } catch (Exception e) {
529
530
531
532
533
534 LOG.info(e);
535 }
536 }
537
538 ResultScanner scanner = table.getScanner(new Scan());
539 try {
540 for (int i=2; i<=5; i++) {
541 Result r = scanner.next();
542 assertNotNull(r);
543 assertFalse(r.isEmpty());
544 assertEquals("row100"+i, Bytes.toString(r.getRow()));
545 }
546 } finally {
547 scanner.close();
548 }
549
550
551 for (JVMClusterUtil.RegionServerThread rsThread:
552 TEST_UTIL.getHBaseCluster().getRegionServerThreads()) {
553 assertFalse(rsThread.getRegionServer().isAborted());
554 }
555 } finally {
556 if (t != null) t.close();
557 }
558 }
559
560
561
562
563
564 @Test
565 public void testCompactionRecordDoesntBlockRolling() throws Exception {
566 Table table = null;
567 Table table2 = null;
568
569
570 Table t = new HTable(TEST_UTIL.getConfiguration(), TableName.META_TABLE_NAME);
571 try {
572 table = createTestTable(getName());
573 table2 = createTestTable(getName() + "1");
574
575 server = TEST_UTIL.getRSForFirstRegionInTable(table.getName());
576 final WAL log = server.getWAL(null);
577 Region region = server.getOnlineRegions(table2.getName()).get(0);
578 Store s = region.getStore(HConstants.CATALOG_FAMILY);
579
580
581 admin.flush(TableName.NAMESPACE_TABLE_NAME);
582
583
584 for (int i = 1; i <= 2; ++i) {
585 doPut(table2, i);
586 admin.flush(table2.getName());
587 }
588 doPut(table2, 3);
589 assertEquals("Should have no WAL after initial writes", 0,
590 DefaultWALProvider.getNumRolledLogFiles(log));
591 assertEquals(2, s.getStorefilesCount());
592
593
594 log.rollWriter();
595 assertEquals("Should have WAL; one table is not flushed", 1,
596 DefaultWALProvider.getNumRolledLogFiles(log));
597 admin.flush(table2.getName());
598 region.compact(false);
599
600 Assert.assertNotNull(s);
601 for (int waitTime = 3000; s.getStorefilesCount() > 1 && waitTime > 0; waitTime -= 200) {
602 Threads.sleepWithoutInterrupt(200);
603 }
604 assertEquals("Compaction didn't happen", 1, s.getStorefilesCount());
605
606
607 doPut(table, 0);
608 log.rollWriter();
609 assertEquals("Should have WAL; one table is not flushed", 1,
610 DefaultWALProvider.getNumRolledLogFiles(log));
611
612
613 admin.flush(table.getName());
614 doPut(table, 1);
615 log.rollWriter();
616 assertEquals("Should have 1 WALs at the end", 1,
617 DefaultWALProvider.getNumRolledLogFiles(log));
618 } finally {
619 if (t != null) t.close();
620 if (table != null) table.close();
621 if (table2 != null) table2.close();
622 }
623 }
624
625 private void doPut(Table table, int i) throws IOException {
626 Put put = new Put(Bytes.toBytes("row" + String.format("%1$04d", i)));
627 put.add(HConstants.CATALOG_FAMILY, null, value);
628 table.put(put);
629 }
630
631 private Table createTestTable(String tableName) throws IOException {
632
633 HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(tableName));
634 desc.addFamily(new HColumnDescriptor(HConstants.CATALOG_FAMILY));
635 admin.createTable(desc);
636 return new HTable(TEST_UTIL.getConfiguration(), desc.getTableName());
637 }
638 }
639