1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.regionserver;
19
20 import static org.junit.Assert.assertEquals;
21 import static org.junit.Assert.assertNotNull;
22 import static org.junit.Assert.assertNull;
23 import static org.junit.Assert.assertTrue;
24
25 import java.io.IOException;
26 import java.util.Arrays;
27 import java.util.List;
28 import java.util.Random;
29
30 import org.apache.commons.logging.Log;
31 import org.apache.commons.logging.LogFactory;
32 import org.apache.hadoop.conf.Configuration;
33 import org.apache.hadoop.fs.Path;
34 import org.apache.hadoop.hbase.HBaseConfiguration;
35 import org.apache.hadoop.hbase.HBaseTestingUtility;
36 import org.apache.hadoop.hbase.HColumnDescriptor;
37 import org.apache.hadoop.hbase.HConstants;
38 import org.apache.hadoop.hbase.HRegionInfo;
39 import org.apache.hadoop.hbase.HTableDescriptor;
40 import org.apache.hadoop.hbase.MiniHBaseCluster;
41 import org.apache.hadoop.hbase.NamespaceDescriptor;
42 import org.apache.hadoop.hbase.TableName;
43 import org.apache.hadoop.hbase.Waiter;
44 import org.apache.hadoop.hbase.client.Admin;
45 import org.apache.hadoop.hbase.client.Connection;
46 import org.apache.hadoop.hbase.client.ConnectionFactory;
47 import org.apache.hadoop.hbase.client.Get;
48 import org.apache.hadoop.hbase.client.HTable;
49 import org.apache.hadoop.hbase.client.Put;
50 import org.apache.hadoop.hbase.client.Result;
51 import org.apache.hadoop.hbase.client.Table;
52 import org.apache.hadoop.hbase.regionserver.wal.FSHLog;
53 import org.apache.hadoop.hbase.testclassification.LargeTests;
54 import org.apache.hadoop.hbase.util.Bytes;
55 import org.apache.hadoop.hbase.util.JVMClusterUtil;
56 import org.apache.hadoop.hbase.util.Pair;
57 import org.apache.hadoop.hbase.wal.WAL;
58 import org.junit.Ignore;
59 import org.junit.Test;
60 import org.junit.experimental.categories.Category;
61
62 import com.google.common.hash.Hashing;
63
64
65
66
67 @Category(LargeTests.class)
68 public class TestPerColumnFamilyFlush {
69 private static final Log LOG = LogFactory.getLog(TestPerColumnFamilyFlush.class);
70
71 Region region = null;
72
73 private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
74
75 private static final Path DIR = TEST_UTIL.getDataTestDir("TestHRegion");
76
77 public static final TableName TABLENAME = TableName.valueOf("TestPerColumnFamilyFlush", "t1");
78
79 public static final byte[][] FAMILIES = { Bytes.toBytes("f1"), Bytes.toBytes("f2"),
80 Bytes.toBytes("f3"), Bytes.toBytes("f4"), Bytes.toBytes("f5") };
81
82 public static final byte[] FAMILY1 = FAMILIES[0];
83
84 public static final byte[] FAMILY2 = FAMILIES[1];
85
86 public static final byte[] FAMILY3 = FAMILIES[2];
87
88 private void initHRegion(String callingMethod, Configuration conf) throws IOException {
89 HTableDescriptor htd = new HTableDescriptor(TABLENAME);
90 for (byte[] family : FAMILIES) {
91 htd.addFamily(new HColumnDescriptor(family));
92 }
93 HRegionInfo info = new HRegionInfo(TABLENAME, null, null, false);
94 Path path = new Path(DIR, callingMethod);
95 region = HRegion.createHRegion(info, path, conf, htd);
96 }
97
98
99 private Put createPut(int familyNum, int putNum) {
100 byte[] qf = Bytes.toBytes("q" + familyNum);
101 byte[] row = Bytes.toBytes("row" + familyNum + "-" + putNum);
102 byte[] val = Bytes.toBytes("val" + familyNum + "-" + putNum);
103 Put p = new Put(row);
104 p.addColumn(FAMILIES[familyNum - 1], qf, val);
105 return p;
106 }
107
108
109 private Get createGet(int familyNum, int putNum) {
110 byte[] row = Bytes.toBytes("row" + familyNum + "-" + putNum);
111 return new Get(row);
112 }
113
114
115 void verifyEdit(int familyNum, int putNum, HTable table) throws IOException {
116 Result r = table.get(createGet(familyNum, putNum));
117 byte[] family = FAMILIES[familyNum - 1];
118 byte[] qf = Bytes.toBytes("q" + familyNum);
119 byte[] val = Bytes.toBytes("val" + familyNum + "-" + putNum);
120 assertNotNull(("Missing Put#" + putNum + " for CF# " + familyNum), r.getFamilyMap(family));
121 assertNotNull(("Missing Put#" + putNum + " for CF# " + familyNum),
122 r.getFamilyMap(family).get(qf));
123 assertTrue(("Incorrect value for Put#" + putNum + " for CF# " + familyNum),
124 Arrays.equals(r.getFamilyMap(family).get(qf), val));
125 }
126
127 @Test(timeout = 180000)
128 public void testSelectiveFlushWhenEnabled() throws IOException {
129
130 Configuration conf = HBaseConfiguration.create();
131 conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 200 * 1024);
132 conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, FlushLargeStoresPolicy.class.getName());
133 conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND, 100 * 1024);
134
135 initHRegion("testSelectiveFlushWhenEnabled", conf);
136
137 for (int i = 1; i <= 1200; i++) {
138 region.put(createPut(1, i));
139
140 if (i <= 100) {
141 region.put(createPut(2, i));
142 if (i <= 50) {
143 region.put(createPut(3, i));
144 }
145 }
146 }
147
148 long totalMemstoreSize = region.getMemstoreSize();
149
150
151 long smallestSeqCF1 = region.getOldestSeqIdOfStore(FAMILY1);
152 long smallestSeqCF2 = region.getOldestSeqIdOfStore(FAMILY2);
153 long smallestSeqCF3 = region.getOldestSeqIdOfStore(FAMILY3);
154
155
156 long cf1MemstoreSize = region.getStore(FAMILY1).getMemStoreSize();
157 long cf2MemstoreSize = region.getStore(FAMILY2).getMemStoreSize();
158 long cf3MemstoreSize = region.getStore(FAMILY3).getMemStoreSize();
159
160
161 long smallestSeqInRegionCurrentMemstore = getWAL(region)
162 .getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
163
164
165
166 assertEquals(smallestSeqCF1, smallestSeqInRegionCurrentMemstore);
167
168
169 assertTrue(smallestSeqCF1 < smallestSeqCF2);
170 assertTrue(smallestSeqCF2 < smallestSeqCF3);
171 assertTrue(cf1MemstoreSize > 0);
172 assertTrue(cf2MemstoreSize > 0);
173 assertTrue(cf3MemstoreSize > 0);
174
175
176
177 assertEquals(totalMemstoreSize + 3 * DefaultMemStore.DEEP_OVERHEAD, cf1MemstoreSize
178 + cf2MemstoreSize + cf3MemstoreSize);
179
180
181 region.flush(false);
182
183
184 long oldCF2MemstoreSize = cf2MemstoreSize;
185 long oldCF3MemstoreSize = cf3MemstoreSize;
186
187
188 cf1MemstoreSize = region.getStore(FAMILY1).getMemStoreSize();
189 cf2MemstoreSize = region.getStore(FAMILY2).getMemStoreSize();
190 cf3MemstoreSize = region.getStore(FAMILY3).getMemStoreSize();
191 totalMemstoreSize = region.getMemstoreSize();
192 smallestSeqInRegionCurrentMemstore = getWAL(region)
193 .getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
194
195
196
197 assertEquals(DefaultMemStore.DEEP_OVERHEAD, cf1MemstoreSize);
198
199 assertEquals(cf2MemstoreSize, oldCF2MemstoreSize);
200
201 assertEquals(cf3MemstoreSize, oldCF3MemstoreSize);
202
203
204 assertEquals(smallestSeqInRegionCurrentMemstore, smallestSeqCF2);
205
206 assertEquals(totalMemstoreSize + 2 * DefaultMemStore.DEEP_OVERHEAD, cf2MemstoreSize
207 + cf3MemstoreSize);
208
209
210 for (int i = 1200; i < 2400; i++) {
211 region.put(createPut(2, i));
212
213
214 if (i - 1200 < 100) {
215 region.put(createPut(3, i));
216 }
217 }
218
219
220 oldCF3MemstoreSize = region.getStore(FAMILY3).getMemStoreSize();
221
222
223 region.flush(false);
224
225
226 cf1MemstoreSize = region.getStore(FAMILY1).getMemStoreSize();
227 cf2MemstoreSize = region.getStore(FAMILY2).getMemStoreSize();
228 cf3MemstoreSize = region.getStore(FAMILY3).getMemStoreSize();
229 totalMemstoreSize = region.getMemstoreSize();
230 smallestSeqInRegionCurrentMemstore = getWAL(region)
231 .getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
232
233
234 assertEquals(DefaultMemStore.DEEP_OVERHEAD, cf1MemstoreSize);
235 assertEquals(DefaultMemStore.DEEP_OVERHEAD, cf2MemstoreSize);
236
237 assertEquals(cf3MemstoreSize, oldCF3MemstoreSize);
238 assertEquals(totalMemstoreSize + DefaultMemStore.DEEP_OVERHEAD, cf3MemstoreSize);
239 assertEquals(smallestSeqInRegionCurrentMemstore, smallestSeqCF3);
240
241
242
243
244
245
246 region.flush(true);
247
248
249
250
251 for (int i = 1; i <= 300; i++) {
252 region.put(createPut(1, i));
253 region.put(createPut(2, i));
254 region.put(createPut(3, i));
255 region.put(createPut(4, i));
256 region.put(createPut(5, i));
257 }
258
259 region.flush(false);
260
261
262
263 assertEquals(0, region.getMemstoreSize());
264 }
265
266 @Test(timeout = 180000)
267 public void testSelectiveFlushWhenNotEnabled() throws IOException {
268
269 Configuration conf = HBaseConfiguration.create();
270 conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 200 * 1024);
271 conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, FlushAllStoresPolicy.class.getName());
272
273
274 initHRegion("testSelectiveFlushWhenNotEnabled", conf);
275
276 for (int i = 1; i <= 1200; i++) {
277 region.put(createPut(1, i));
278 if (i <= 100) {
279 region.put(createPut(2, i));
280 if (i <= 50) {
281 region.put(createPut(3, i));
282 }
283 }
284 }
285
286 long totalMemstoreSize = region.getMemstoreSize();
287
288
289 long cf1MemstoreSize = region.getStore(FAMILY1).getMemStoreSize();
290 long cf2MemstoreSize = region.getStore(FAMILY2).getMemStoreSize();
291 long cf3MemstoreSize = region.getStore(FAMILY3).getMemStoreSize();
292
293
294 assertTrue(cf1MemstoreSize > 0);
295 assertTrue(cf2MemstoreSize > 0);
296 assertTrue(cf3MemstoreSize > 0);
297
298
299
300 assertEquals(totalMemstoreSize + 3 * DefaultMemStore.DEEP_OVERHEAD, cf1MemstoreSize
301 + cf2MemstoreSize + cf3MemstoreSize);
302
303
304 region.flush(false);
305
306 cf1MemstoreSize = region.getStore(FAMILY1).getMemStoreSize();
307 cf2MemstoreSize = region.getStore(FAMILY2).getMemStoreSize();
308 cf3MemstoreSize = region.getStore(FAMILY3).getMemStoreSize();
309 totalMemstoreSize = region.getMemstoreSize();
310 long smallestSeqInRegionCurrentMemstore = ((HRegion)region).getWAL()
311 .getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
312
313
314 assertEquals(DefaultMemStore.DEEP_OVERHEAD, cf1MemstoreSize);
315 assertEquals(DefaultMemStore.DEEP_OVERHEAD, cf2MemstoreSize);
316 assertEquals(DefaultMemStore.DEEP_OVERHEAD, cf3MemstoreSize);
317 assertEquals(0, totalMemstoreSize);
318 assertEquals(HConstants.NO_SEQNUM, smallestSeqInRegionCurrentMemstore);
319 }
320
321
322 private static Pair<Region, HRegionServer> getRegionWithName(TableName tableName) {
323 MiniHBaseCluster cluster = TEST_UTIL.getMiniHBaseCluster();
324 List<JVMClusterUtil.RegionServerThread> rsts = cluster.getRegionServerThreads();
325 for (int i = 0; i < cluster.getRegionServerThreads().size(); i++) {
326 HRegionServer hrs = rsts.get(i).getRegionServer();
327 for (Region region : hrs.getOnlineRegions(tableName)) {
328 return Pair.newPair(region, hrs);
329 }
330 }
331 return null;
332 }
333
334 private void doTestLogReplay() throws Exception {
335 Configuration conf = TEST_UTIL.getConfiguration();
336 conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 20000);
337
338 conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, FlushLargeStoresPolicy.class.getName());
339 conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND, 10000);
340 final int numRegionServers = 4;
341 try {
342 TEST_UTIL.startMiniCluster(numRegionServers);
343 TEST_UTIL.getHBaseAdmin().createNamespace(
344 NamespaceDescriptor.create(TABLENAME.getNamespaceAsString()).build());
345 HTable table = TEST_UTIL.createTable(TABLENAME, FAMILIES);
346 HTableDescriptor htd = table.getTableDescriptor();
347
348 for (byte[] family : FAMILIES) {
349 if (!htd.hasFamily(family)) {
350 htd.addFamily(new HColumnDescriptor(family));
351 }
352 }
353
354
355
356 for (int i = 1; i <= 80; i++) {
357 table.put(createPut(1, i));
358 if (i <= 10) {
359 table.put(createPut(2, i));
360 table.put(createPut(3, i));
361 }
362 }
363 table.flushCommits();
364 Thread.sleep(1000);
365
366 Pair<Region, HRegionServer> desiredRegionAndServer = getRegionWithName(TABLENAME);
367 Region desiredRegion = desiredRegionAndServer.getFirst();
368 assertTrue("Could not find a region which hosts the new region.", desiredRegion != null);
369
370
371 desiredRegion.flush(false);
372
373 long totalMemstoreSize;
374 long cf1MemstoreSize, cf2MemstoreSize, cf3MemstoreSize;
375 totalMemstoreSize = desiredRegion.getMemstoreSize();
376
377
378 cf1MemstoreSize = desiredRegion.getStore(FAMILY1).getMemStoreSize();
379 cf2MemstoreSize = desiredRegion.getStore(FAMILY2).getMemStoreSize();
380 cf3MemstoreSize = desiredRegion.getStore(FAMILY3).getMemStoreSize();
381
382
383 assertEquals(DefaultMemStore.DEEP_OVERHEAD, cf1MemstoreSize);
384
385 assertTrue(cf2MemstoreSize > 0);
386 assertTrue(cf3MemstoreSize > 0);
387 assertEquals(totalMemstoreSize + 2 * DefaultMemStore.DEEP_OVERHEAD, cf2MemstoreSize
388 + cf3MemstoreSize);
389
390
391
392
393
394 Thread.sleep(2000);
395
396
397 HRegionServer rs = desiredRegionAndServer.getSecond();
398 rs.abort("testing");
399
400
401
402
403
404
405 for (int i = 1; i <= 80; i++) {
406 verifyEdit(1, i, table);
407 if (i <= 10) {
408 verifyEdit(2, i, table);
409 verifyEdit(3, i, table);
410 }
411 }
412 } finally {
413 TEST_UTIL.shutdownMiniCluster();
414 }
415 }
416
417
418
419
420
421 @Ignore("DLR is broken by HBASE-12751") @Test(timeout = 180000)
422 public void testLogReplayWithDistributedReplay() throws Exception {
423 TEST_UTIL.getConfiguration().setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true);
424 doTestLogReplay();
425 }
426
427
428 @Test(timeout = 180000)
429 public void testLogReplayWithDistributedLogSplit() throws Exception {
430 TEST_UTIL.getConfiguration().setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false);
431 doTestLogReplay();
432 }
433
434 private WAL getWAL(Region region) {
435 return ((HRegion)region).getWAL();
436 }
437
438 private int getNumRolledLogFiles(Region region) {
439 return ((FSHLog)getWAL(region)).getNumRolledLogFiles();
440 }
441
442
443
444
445
446
447
448 @Test(timeout = 180000)
449 public void testFlushingWhenLogRolling() throws Exception {
450 TableName tableName = TableName.valueOf("testFlushingWhenLogRolling");
451 Configuration conf = TEST_UTIL.getConfiguration();
452 conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 128 * 1024 * 1024);
453 conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, FlushLargeStoresPolicy.class.getName());
454 long cfFlushSizeLowerBound = 2048;
455 conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND,
456 cfFlushSizeLowerBound);
457
458
459 conf.setLong("hbase.regionserver.logroll.period", 60L * 60 * 1000);
460
461 conf.setLong("hbase.regionserver.hlog.blocksize", 128L * 1024 * 1024);
462
463 final int maxLogs = 10;
464 conf.setInt("hbase.regionserver.maxlogs", maxLogs);
465
466 final int numRegionServers = 1;
467 TEST_UTIL.startMiniCluster(numRegionServers);
468 try {
469 HTable table = null;
470 table = TEST_UTIL.createTable(tableName, FAMILIES);
471
472
473
474 try (Admin admin = TEST_UTIL.getConnection().getAdmin()) {
475 admin.flush(TableName.NAMESPACE_TABLE_NAME);
476 }
477 Pair<Region, HRegionServer> desiredRegionAndServer = getRegionWithName(tableName);
478 final Region desiredRegion = desiredRegionAndServer.getFirst();
479 assertTrue("Could not find a region which hosts the new region.", desiredRegion != null);
480 LOG.info("Writing to region=" + desiredRegion);
481
482
483 for (int i = 1; i <= 3; i++) {
484 table.put(createPut(i, 0));
485 }
486
487
488 for (int i = 0; i < maxLogs; i++) {
489 for (int j = 0; j < 100; j++) {
490 table.put(createPut(1, i * 100 + j));
491 }
492 table.flushCommits();
493
494 int currentNumRolledLogFiles = getNumRolledLogFiles(desiredRegion);
495 assertNull(getWAL(desiredRegion).rollWriter());
496 while (getNumRolledLogFiles(desiredRegion) <= currentNumRolledLogFiles) {
497 Thread.sleep(100);
498 }
499 }
500 table.close();
501 assertEquals(maxLogs, getNumRolledLogFiles(desiredRegion));
502 assertTrue(desiredRegion.getStore(FAMILY1).getMemStoreSize() > cfFlushSizeLowerBound);
503 assertTrue(desiredRegion.getStore(FAMILY2).getMemStoreSize() < cfFlushSizeLowerBound);
504 assertTrue(desiredRegion.getStore(FAMILY3).getMemStoreSize() < cfFlushSizeLowerBound);
505 table.put(createPut(1, 12345678));
506 table.flushCommits();
507
508 desiredRegionAndServer.getSecond().walRoller.requestRollAll();
509
510 TEST_UTIL.waitFor(30000, new Waiter.ExplainingPredicate<Exception>() {
511
512 @Override
513 public boolean evaluate() throws Exception {
514 return desiredRegion.getMemstoreSize() == 0;
515 }
516
517 @Override
518 public String explainFailure() throws Exception {
519 long memstoreSize = desiredRegion.getMemstoreSize();
520 if (memstoreSize > 0) {
521 return "Still have unflushed entries in memstore, memstore size is " + memstoreSize;
522 }
523 return "Unknown";
524 }
525 });
526 LOG.info("Finished waiting on flush after too many WALs...");
527
528 assertEquals(DefaultMemStore.DEEP_OVERHEAD,
529 desiredRegion.getStore(FAMILY1).getMemStoreSize());
530 assertEquals(DefaultMemStore.DEEP_OVERHEAD,
531 desiredRegion.getStore(FAMILY2).getMemStoreSize());
532 assertEquals(DefaultMemStore.DEEP_OVERHEAD,
533 desiredRegion.getStore(FAMILY3).getMemStoreSize());
534
535 assertNull(getWAL(desiredRegion).rollWriter(true));
536 assertTrue(getNumRolledLogFiles(desiredRegion) < maxLogs);
537 } finally {
538 TEST_UTIL.shutdownMiniCluster();
539 }
540 }
541
542 private void doPut(Table table, long memstoreFlushSize) throws IOException, InterruptedException {
543 Region region = getRegionWithName(table.getName()).getFirst();
544
545 byte[] qf = Bytes.toBytes("qf");
546 Random rand = new Random();
547 byte[] value1 = new byte[100];
548 byte[] value2 = new byte[200];
549 byte[] value3 = new byte[400];
550 for (int i = 0; i < 10000; i++) {
551 Put put = new Put(Bytes.toBytes("row-" + i));
552 rand.setSeed(i);
553 rand.nextBytes(value1);
554 rand.nextBytes(value2);
555 rand.nextBytes(value3);
556 put.addColumn(FAMILY1, qf, value1);
557 put.addColumn(FAMILY2, qf, value2);
558 put.addColumn(FAMILY3, qf, value3);
559 table.put(put);
560
561 while (region.getMemstoreSize() > memstoreFlushSize) {
562 Thread.sleep(100);
563 }
564 }
565 }
566
567
568
569 @Test(timeout = 180000)
570 public void testCompareStoreFileCount() throws Exception {
571 long memstoreFlushSize = 1024L * 1024;
572 Configuration conf = TEST_UTIL.getConfiguration();
573 conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, memstoreFlushSize);
574 conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, FlushAllStoresPolicy.class.getName());
575 conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND, 400 * 1024);
576 conf.setInt(HStore.BLOCKING_STOREFILES_KEY, 10000);
577 conf.set(HConstants.HBASE_REGION_SPLIT_POLICY_KEY,
578 ConstantSizeRegionSplitPolicy.class.getName());
579
580 HTableDescriptor htd = new HTableDescriptor(TABLENAME);
581 htd.setCompactionEnabled(false);
582 htd.addFamily(new HColumnDescriptor(FAMILY1));
583 htd.addFamily(new HColumnDescriptor(FAMILY2));
584 htd.addFamily(new HColumnDescriptor(FAMILY3));
585
586 LOG.info("==============Test with selective flush disabled===============");
587 int cf1StoreFileCount = -1;
588 int cf2StoreFileCount = -1;
589 int cf3StoreFileCount = -1;
590 int cf1StoreFileCount1 = -1;
591 int cf2StoreFileCount1 = -1;
592 int cf3StoreFileCount1 = -1;
593 try {
594 TEST_UTIL.startMiniCluster(1);
595 TEST_UTIL.getHBaseAdmin().createNamespace(
596 NamespaceDescriptor.create(TABLENAME.getNamespaceAsString()).build());
597 TEST_UTIL.getHBaseAdmin().createTable(htd);
598 TEST_UTIL.waitTableAvailable(TABLENAME);
599 Connection conn = ConnectionFactory.createConnection(conf);
600 Table table = conn.getTable(TABLENAME);
601 doPut(table, memstoreFlushSize);
602 table.close();
603 conn.close();
604
605 Region region = getRegionWithName(TABLENAME).getFirst();
606 cf1StoreFileCount = region.getStore(FAMILY1).getStorefilesCount();
607 cf2StoreFileCount = region.getStore(FAMILY2).getStorefilesCount();
608 cf3StoreFileCount = region.getStore(FAMILY3).getStorefilesCount();
609 } finally {
610 TEST_UTIL.shutdownMiniCluster();
611 }
612
613 LOG.info("==============Test with selective flush enabled===============");
614 conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, FlushLargeStoresPolicy.class.getName());
615 try {
616 TEST_UTIL.startMiniCluster(1);
617 TEST_UTIL.getHBaseAdmin().createNamespace(
618 NamespaceDescriptor.create(TABLENAME.getNamespaceAsString()).build());
619 TEST_UTIL.getHBaseAdmin().createTable(htd);
620 Connection conn = ConnectionFactory.createConnection(conf);
621 Table table = conn.getTable(TABLENAME);
622 doPut(table, memstoreFlushSize);
623 table.close();
624 conn.close();
625
626 region = getRegionWithName(TABLENAME).getFirst();
627 cf1StoreFileCount1 = region.getStore(FAMILY1).getStorefilesCount();
628 cf2StoreFileCount1 = region.getStore(FAMILY2).getStorefilesCount();
629 cf3StoreFileCount1 = region.getStore(FAMILY3).getStorefilesCount();
630 } finally {
631 TEST_UTIL.shutdownMiniCluster();
632 }
633
634 LOG.info("disable selective flush: " + Bytes.toString(FAMILY1) + "=>" + cf1StoreFileCount
635 + ", " + Bytes.toString(FAMILY2) + "=>" + cf2StoreFileCount + ", "
636 + Bytes.toString(FAMILY3) + "=>" + cf3StoreFileCount);
637 LOG.info("enable selective flush: " + Bytes.toString(FAMILY1) + "=>" + cf1StoreFileCount1
638 + ", " + Bytes.toString(FAMILY2) + "=>" + cf2StoreFileCount1 + ", "
639 + Bytes.toString(FAMILY3) + "=>" + cf3StoreFileCount1);
640
641 assertTrue(cf1StoreFileCount1 < cf1StoreFileCount);
642 assertTrue(cf2StoreFileCount1 < cf2StoreFileCount);
643 }
644
645 public static void main(String[] args) throws Exception {
646 int numRegions = Integer.parseInt(args[0]);
647 long numRows = Long.parseLong(args[1]);
648
649 HTableDescriptor htd = new HTableDescriptor(TABLENAME);
650 htd.setMaxFileSize(10L * 1024 * 1024 * 1024);
651 htd.setValue(HTableDescriptor.SPLIT_POLICY, ConstantSizeRegionSplitPolicy.class.getName());
652 htd.addFamily(new HColumnDescriptor(FAMILY1));
653 htd.addFamily(new HColumnDescriptor(FAMILY2));
654 htd.addFamily(new HColumnDescriptor(FAMILY3));
655
656 Configuration conf = HBaseConfiguration.create();
657 Connection conn = ConnectionFactory.createConnection(conf);
658 Admin admin = conn.getAdmin();
659 if (admin.tableExists(TABLENAME)) {
660 admin.disableTable(TABLENAME);
661 admin.deleteTable(TABLENAME);
662 }
663 if (numRegions >= 3) {
664 byte[] startKey = new byte[16];
665 byte[] endKey = new byte[16];
666 Arrays.fill(endKey, (byte) 0xFF);
667 admin.createTable(htd, startKey, endKey, numRegions);
668 } else {
669 admin.createTable(htd);
670 }
671 admin.close();
672
673 Table table = conn.getTable(TABLENAME);
674 byte[] qf = Bytes.toBytes("qf");
675 Random rand = new Random();
676 byte[] value1 = new byte[16];
677 byte[] value2 = new byte[256];
678 byte[] value3 = new byte[4096];
679 for (long i = 0; i < numRows; i++) {
680 Put put = new Put(Hashing.md5().hashLong(i).asBytes());
681 rand.setSeed(i);
682 rand.nextBytes(value1);
683 rand.nextBytes(value2);
684 rand.nextBytes(value3);
685 put.addColumn(FAMILY1, qf, value1);
686 put.addColumn(FAMILY2, qf, value2);
687 put.addColumn(FAMILY3, qf, value3);
688 table.put(put);
689 if (i % 10000 == 0) {
690 LOG.info(i + " rows put");
691 }
692 }
693 table.close();
694 conn.close();
695 }
696 }