1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.replication.regionserver;
20
21 import static org.junit.Assert.assertEquals;
22 import static org.junit.Assert.assertNotEquals;
23 import static org.junit.Assert.assertNotNull;
24 import static org.junit.Assert.assertNull;
25 import static org.junit.Assert.fail;
26
27 import java.io.IOException;
28 import java.util.ArrayList;
29 import java.util.Collection;
30 import java.util.List;
31
32 import org.apache.hadoop.conf.Configuration;
33 import org.apache.hadoop.fs.FileSystem;
34 import org.apache.hadoop.fs.Path;
35 import org.apache.hadoop.hbase.HBaseTestingUtility;
36 import org.apache.hadoop.hbase.HConstants;
37 import org.apache.hadoop.hbase.HRegionInfo;
38 import org.apache.hadoop.hbase.HTableDescriptor;
39 import org.apache.hadoop.hbase.KeyValue;
40 import org.apache.hadoop.hbase.TableName;
41 import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
42 import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
43 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
44 import org.apache.hadoop.hbase.testclassification.LargeTests;
45 import org.apache.hadoop.hbase.util.Bytes;
46 import org.apache.hadoop.hbase.wal.WAL;
47 import org.apache.hadoop.hbase.wal.WALFactory;
48 import org.apache.hadoop.hbase.wal.WALKey;
49 import org.apache.hadoop.hdfs.MiniDFSCluster;
50 import org.junit.After;
51 import org.junit.AfterClass;
52 import org.junit.Before;
53 import org.junit.BeforeClass;
54 import org.junit.Rule;
55 import org.junit.Test;
56 import org.junit.experimental.categories.Category;
57 import org.junit.rules.TestName;
58 import org.junit.runner.RunWith;
59 import org.junit.runners.Parameterized;
60 import org.junit.runners.Parameterized.Parameters;
61 @Category({LargeTests.class})
62 @RunWith(Parameterized.class)
63 public class TestReplicationWALReaderManager {
64
65 private static HBaseTestingUtility TEST_UTIL;
66 private static Configuration conf;
67 private static FileSystem fs;
68 private static MiniDFSCluster cluster;
69 private static final TableName tableName = TableName.valueOf("tablename");
70 private static final byte [] family = Bytes.toBytes("column");
71 private static final byte [] qualifier = Bytes.toBytes("qualifier");
72 private static final HRegionInfo info = new HRegionInfo(tableName,
73 HConstants.EMPTY_START_ROW, HConstants.LAST_ROW, false);
74 private static final HTableDescriptor htd = new HTableDescriptor(tableName);
75
76 private WAL log;
77 private ReplicationWALReaderManager logManager;
78 private PathWatcher pathWatcher;
79 private int nbRows;
80 private int walEditKVs;
81 @Rule public TestName tn = new TestName();
82 private final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
83
84 @Parameters
85 public static Collection<Object[]> parameters() {
86
87 int[] NB_ROWS = { 1500, 60000 };
88 int[] NB_KVS = { 1, 100 };
89
90 Boolean[] BOOL_VALS = { false, true };
91 List<Object[]> parameters = new ArrayList<Object[]>();
92 for (int nbRows : NB_ROWS) {
93 for (int walEditKVs : NB_KVS) {
94 for (boolean b : BOOL_VALS) {
95 Object[] arr = new Object[3];
96 arr[0] = nbRows;
97 arr[1] = walEditKVs;
98 arr[2] = b;
99 parameters.add(arr);
100 }
101 }
102 }
103 return parameters;
104 }
105
106 public TestReplicationWALReaderManager(int nbRows, int walEditKVs, boolean enableCompression) {
107 this.nbRows = nbRows;
108 this.walEditKVs = walEditKVs;
109 TEST_UTIL.getConfiguration().setBoolean(HConstants.ENABLE_WAL_COMPRESSION,
110 enableCompression);
111 mvcc.advanceTo(1);
112 }
113
114 @BeforeClass
115 public static void setUpBeforeClass() throws Exception {
116 TEST_UTIL = new HBaseTestingUtility();
117 conf = TEST_UTIL.getConfiguration();
118 TEST_UTIL.startMiniDFSCluster(3);
119
120 cluster = TEST_UTIL.getDFSCluster();
121 fs = cluster.getFileSystem();
122 }
123
124 @AfterClass
125 public static void tearDownAfterClass() throws Exception {
126 TEST_UTIL.shutdownMiniCluster();
127 }
128
129 @Before
130 public void setUp() throws Exception {
131 logManager = new ReplicationWALReaderManager(fs, conf);
132 List<WALActionsListener> listeners = new ArrayList<WALActionsListener>();
133 pathWatcher = new PathWatcher();
134 listeners.add(pathWatcher);
135 final WALFactory wals = new WALFactory(conf, listeners, tn.getMethodName());
136 log = wals.getWAL(info.getEncodedNameAsBytes());
137 }
138
139 @After
140 public void tearDown() throws Exception {
141 log.close();
142 }
143
144 @Test
145 public void test() throws Exception {
146
147 Path path = pathWatcher.currentPath;
148
149 assertEquals(0, logManager.getPosition());
150
151 appendToLog();
152
153
154 assertNotNull(logManager.openReader(path));
155 logManager.seek();
156 WAL.Entry entry = logManager.readNextAndSetPosition();
157 assertNotNull(entry);
158 entry = logManager.readNextAndSetPosition();
159 assertNull(entry);
160 logManager.closeReader();
161 long oldPos = logManager.getPosition();
162
163 appendToLog();
164
165
166 assertNotNull(logManager.openReader(path));
167 logManager.seek();
168 entry = logManager.readNextAndSetPosition();
169 assertNotEquals(oldPos, logManager.getPosition());
170 assertNotNull(entry);
171 logManager.closeReader();
172 oldPos = logManager.getPosition();
173
174 log.rollWriter();
175
176
177 assertNotNull(logManager.openReader(path));
178 logManager.seek();
179 entry = logManager.readNextAndSetPosition();
180 assertEquals(oldPos, logManager.getPosition());
181 assertNull(entry);
182 logManager.finishCurrentFile();
183
184 path = pathWatcher.currentPath;
185
186 for (int i = 0; i < nbRows; i++) { appendToLogPlus(walEditKVs); }
187 log.rollWriter();
188 logManager.openReader(path);
189 logManager.seek();
190 for (int i = 0; i < nbRows; i++) {
191 WAL.Entry e = logManager.readNextAndSetPosition();
192 if (e == null) {
193 fail("Should have enough entries");
194 }
195 }
196 }
197
198 private void appendToLog() throws IOException {
199 appendToLogPlus(1);
200 }
201
202 private void appendToLogPlus(int count) throws IOException {
203 final long txid = log.append(htd, info,
204 new WALKey(info.getEncodedNameAsBytes(), tableName, System.currentTimeMillis(), mvcc),
205 getWALEdits(count), true);
206 log.sync(txid);
207 }
208
209 private WALEdit getWALEdits(int count) {
210 WALEdit edit = new WALEdit();
211 for (int i = 0; i < count; i++) {
212 edit.add(new KeyValue(Bytes.toBytes(System.currentTimeMillis()), family, qualifier,
213 System.currentTimeMillis(), qualifier));
214 }
215 return edit;
216 }
217
218 class PathWatcher extends WALActionsListener.Base {
219
220 Path currentPath;
221
222 @Override
223 public void preLogRoll(Path oldPath, Path newPath) throws IOException {
224 currentPath = newPath;
225 }
226 }
227 }