1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.master.procedure;
20
21 import java.io.IOException;
22 import java.util.concurrent.atomic.AtomicInteger;
23 import java.util.concurrent.CountDownLatch;
24
25 import org.apache.commons.logging.Log;
26 import org.apache.commons.logging.LogFactory;
27 import org.apache.hadoop.conf.Configuration;
28 import org.apache.hadoop.fs.FileSystem;
29 import org.apache.hadoop.fs.Path;
30 import org.apache.hadoop.hbase.HBaseTestingUtility;
31 import org.apache.hadoop.hbase.HRegionInfo;
32 import org.apache.hadoop.hbase.HTableDescriptor;
33 import org.apache.hadoop.hbase.MiniHBaseCluster;
34 import org.apache.hadoop.hbase.TableName;
35 import org.apache.hadoop.hbase.master.HMaster;
36 import org.apache.hadoop.hbase.procedure2.Procedure;
37 import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
38 import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
39 import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.TestProcedure;
40 import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
41 import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
42 import org.apache.hadoop.hbase.protobuf.generated.MasterProcedureProtos.CreateTableState;
43 import org.apache.hadoop.hbase.protobuf.generated.MasterProcedureProtos.DeleteTableState;
44 import org.apache.hadoop.hbase.protobuf.generated.MasterProcedureProtos.DisableTableState;
45 import org.apache.hadoop.hbase.protobuf.generated.MasterProcedureProtos.EnableTableState;
46 import org.apache.hadoop.hbase.protobuf.generated.MasterProcedureProtos.TruncateTableState;
47 import org.apache.hadoop.hbase.testclassification.LargeTests;
48 import org.apache.hadoop.hbase.util.Bytes;
49 import org.apache.hadoop.hbase.util.FSUtils;
50 import org.apache.hadoop.hbase.util.ModifyRegionUtils;
51 import org.apache.hadoop.hbase.util.Threads;
52 import org.apache.hadoop.hdfs.MiniDFSCluster;
53 import org.apache.hadoop.hdfs.server.datanode.DataNode;
54
55 import org.junit.After;
56 import org.junit.Before;
57 import org.junit.Ignore;
58 import org.junit.Test;
59 import org.junit.experimental.categories.Category;
60 import org.mockito.Mockito;
61
62 import static org.junit.Assert.assertEquals;
63 import static org.junit.Assert.assertFalse;
64 import static org.junit.Assert.assertTrue;
65 import static org.junit.Assert.fail;
66
67 @Category(LargeTests.class)
68 public class TestWALProcedureStoreOnHDFS {
69 private static final Log LOG = LogFactory.getLog(TestWALProcedureStoreOnHDFS.class);
70
71 protected static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
72
73 private WALProcedureStore store;
74
75 private ProcedureStore.ProcedureStoreListener stopProcedureListener = new ProcedureStore.ProcedureStoreListener() {
76 @Override
77 public void postSync() {}
78
79 @Override
80 public void abortProcess() {
81 LOG.fatal("Abort the Procedure Store");
82 store.stop(true);
83 }
84 };
85
86 private static void initConfig(Configuration conf) {
87 conf.setInt("dfs.replication", 3);
88 conf.setInt("dfs.namenode.replication.min", 3);
89
90
91 conf.setInt("hbase.procedure.store.wal.wait.before.roll", 1000);
92 conf.setInt("hbase.procedure.store.wal.max.roll.retries", 10);
93 conf.setInt("hbase.procedure.store.wal.sync.failure.roll.max", 10);
94 }
95
96 public void setup() throws Exception {
97 MiniDFSCluster dfs = UTIL.startMiniDFSCluster(3);
98
99 Path logDir = new Path(new Path(dfs.getFileSystem().getUri()), "/test-logs");
100 store = ProcedureTestingUtility.createWalStore(UTIL.getConfiguration(), dfs.getFileSystem(), logDir);
101 store.registerListener(stopProcedureListener);
102 store.start(8);
103 store.recoverLease();
104 }
105
106 public void tearDown() throws Exception {
107 store.stop(false);
108 UTIL.getDFSCluster().getFileSystem().delete(store.getLogDir(), true);
109
110 try {
111 UTIL.shutdownMiniCluster();
112 } catch (Exception e) {
113 LOG.warn("failure shutting down cluster", e);
114 }
115 }
116
117 @Test(timeout=60000, expected=RuntimeException.class)
118 public void testWalAbortOnLowReplication() throws Exception {
119 initConfig(UTIL.getConfiguration());
120 setup();
121 try {
122 assertEquals(3, UTIL.getDFSCluster().getDataNodes().size());
123
124 LOG.info("Stop DataNode");
125 UTIL.getDFSCluster().stopDataNode(0);
126 assertEquals(2, UTIL.getDFSCluster().getDataNodes().size());
127
128 store.insert(new TestProcedure(1, -1), null);
129 for (long i = 2; store.isRunning(); ++i) {
130 assertEquals(2, UTIL.getDFSCluster().getDataNodes().size());
131 store.insert(new TestProcedure(i, -1), null);
132 Thread.sleep(100);
133 }
134 assertFalse(store.isRunning());
135 fail("The store.insert() should throw an exeption");
136 } finally {
137 tearDown();
138 }
139 }
140
141 @Test(timeout=60000)
142 public void testWalAbortOnLowReplicationWithQueuedWriters() throws Exception {
143 initConfig(UTIL.getConfiguration());
144 setup();
145 try {
146 assertEquals(3, UTIL.getDFSCluster().getDataNodes().size());
147 store.registerListener(new ProcedureStore.ProcedureStoreListener() {
148 @Override
149 public void postSync() {
150 Threads.sleepWithoutInterrupt(2000);
151 }
152
153 @Override
154 public void abortProcess() {}
155 });
156
157 final AtomicInteger reCount = new AtomicInteger(0);
158 Thread[] thread = new Thread[store.getNumThreads() * 2 + 1];
159 for (int i = 0; i < thread.length; ++i) {
160 final long procId = i + 1;
161 thread[i] = new Thread() {
162 public void run() {
163 try {
164 LOG.debug("[S] INSERT " + procId);
165 store.insert(new TestProcedure(procId, -1), null);
166 LOG.debug("[E] INSERT " + procId);
167 } catch (RuntimeException e) {
168 reCount.incrementAndGet();
169 LOG.debug("[F] INSERT " + procId + ": " + e.getMessage());
170 }
171 }
172 };
173 thread[i].start();
174 }
175
176 Thread.sleep(1000);
177 LOG.info("Stop DataNode");
178 UTIL.getDFSCluster().stopDataNode(0);
179 assertEquals(2, UTIL.getDFSCluster().getDataNodes().size());
180
181 for (int i = 0; i < thread.length; ++i) {
182 thread[i].join();
183 }
184
185 assertFalse(store.isRunning());
186 assertTrue(reCount.toString(), reCount.get() >= store.getNumThreads() &&
187 reCount.get() < thread.length);
188 } finally {
189 tearDown();
190 }
191 }
192
193 @Test(timeout=60000)
194 public void testWalRollOnLowReplication() throws Exception {
195 initConfig(UTIL.getConfiguration());
196 UTIL.getConfiguration().setInt("dfs.namenode.replication.min", 1);
197 setup();
198 try {
199 int dnCount = 0;
200 store.insert(new TestProcedure(1, -1), null);
201 UTIL.getDFSCluster().restartDataNode(dnCount);
202 for (long i = 2; i < 100; ++i) {
203 store.insert(new TestProcedure(i, -1), null);
204 waitForNumReplicas(3);
205 Thread.sleep(100);
206 if ((i % 30) == 0) {
207 LOG.info("Restart Data Node");
208 UTIL.getDFSCluster().restartDataNode(++dnCount % 3);
209 }
210 }
211 assertTrue(store.isRunning());
212 } finally {
213 tearDown();
214 }
215 }
216
217 public void waitForNumReplicas(int numReplicas) throws Exception {
218 while (UTIL.getDFSCluster().getDataNodes().size() < numReplicas) {
219 Thread.sleep(100);
220 }
221
222 for (int i = 0; i < numReplicas; ++i) {
223 for (DataNode dn: UTIL.getDFSCluster().getDataNodes()) {
224 while (!dn.isDatanodeFullyStarted()) {
225 Thread.sleep(100);
226 }
227 }
228 }
229 }
230 }