View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.procedure2.store.wal;
20  
21  import java.io.FileNotFoundException;
22  import java.io.IOException;
23  import java.io.InputStream;
24  import java.io.OutputStream;
25  import java.util.concurrent.atomic.AtomicInteger;
26  import java.util.ArrayList;
27  import java.util.Arrays;
28  import java.util.Comparator;
29  import java.util.Iterator;
30  import java.util.HashSet;
31  import java.util.Set;
32  import java.util.Random;
33  import java.util.concurrent.atomic.AtomicLong;
34  
35  import org.apache.commons.logging.Log;
36  import org.apache.commons.logging.LogFactory;
37  import org.apache.hadoop.fs.FileSystem;
38  import org.apache.hadoop.fs.FileStatus;
39  import org.apache.hadoop.fs.Path;
40  import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
41  import org.apache.hadoop.hbase.procedure2.Procedure;
42  import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
43  import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.TestProcedure;
44  import org.apache.hadoop.hbase.procedure2.SequentialProcedure;
45  import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
46  import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureIterator;
47  import org.apache.hadoop.hbase.testclassification.SmallTests;
48  import org.apache.hadoop.hbase.util.Bytes;
49  import org.apache.hadoop.io.IOUtils;
50  
51  import org.junit.After;
52  import org.junit.Before;
53  import org.junit.Assert;
54  import org.junit.Test;
55  import org.junit.experimental.categories.Category;
56  
57  import static org.junit.Assert.assertEquals;
58  import static org.junit.Assert.assertFalse;
59  import static org.junit.Assert.assertTrue;
60  import static org.junit.Assert.fail;
61  
62  @Category(SmallTests.class)
63  public class TestWALProcedureStore {
  // Logger shared by the test methods, the helpers and the nested LoadCounter.
  private static final Log LOG = LogFactory.getLog(TestWALProcedureStore.class);

  // Value passed to procStore.start() in setUp() and storeRestart().
  private static final int PROCEDURE_STORE_SLOTS = 1;
  // NOTE(review): not referenced anywhere else in this class.
  private static final Procedure NULL_PROC = null;

  // Store under test; created in setUp() (testInsertUpdateDelete replaces it with its own).
  private WALProcedureStore procStore;

  // Testing utility that supplies the configuration and the data test directory.
  private HBaseCommonTestingUtility htu;
  // File system obtained from testDir in setUp().
  private FileSystem fs;
  // Test root directory, taken from htu.getDataTestDir().
  private Path testDir;
  // "proc-logs" directory under testDir; handed to the store and deleted in tearDown().
  private Path logDir;
75  
76    @Before
77    public void setUp() throws IOException {
78      htu = new HBaseCommonTestingUtility();
79      testDir = htu.getDataTestDir();
80      fs = testDir.getFileSystem(htu.getConfiguration());
81      assertTrue(testDir.depth() > 1);
82  
83      logDir = new Path(testDir, "proc-logs");
84      procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), fs, logDir);
85      procStore.start(PROCEDURE_STORE_SLOTS);
86      procStore.recoverLease();
87      procStore.load(new LoadCounter());
88    }
89  
90    @After
91    public void tearDown() throws IOException {
92      procStore.stop(false);
93      fs.delete(logDir, true);
94    }
95  
96    private void storeRestart(ProcedureStore.ProcedureLoader loader) throws Exception {
97      procStore.stop(false);
98      procStore.start(PROCEDURE_STORE_SLOTS);
99      procStore.recoverLease();
100     procStore.load(loader);
101   }
102 
103   @Test
104   public void testEmptyRoll() throws Exception {
105     for (int i = 0; i < 10; ++i) {
106       procStore.periodicRollForTesting();
107     }
108     FileStatus[] status = fs.listStatus(logDir);
109     assertEquals(1, status.length);
110   }
111 
112   @Test
113   public void testEmptyLogLoad() throws Exception {
114     LoadCounter loader = new LoadCounter();
115     storeRestart(loader);
116     assertEquals(0, loader.getMaxProcId());
117     assertEquals(0, loader.getLoadedCount());
118     assertEquals(0, loader.getCorruptedCount());
119   }
120 
121   @Test
122   public void testLoad() throws Exception {
123     Set<Long> procIds = new HashSet<>();
124 
125     // Insert something in the log
126     Procedure proc1 = new TestSequentialProcedure();
127     procIds.add(proc1.getProcId());
128     procStore.insert(proc1, null);
129 
130     Procedure proc2 = new TestSequentialProcedure();
131     Procedure[] child2 = new Procedure[2];
132     child2[0] = new TestSequentialProcedure();
133     child2[1] = new TestSequentialProcedure();
134 
135     procIds.add(proc2.getProcId());
136     procIds.add(child2[0].getProcId());
137     procIds.add(child2[1].getProcId());
138     procStore.insert(proc2, child2);
139 
140     // Verify that everything is there
141     verifyProcIdsOnRestart(procIds);
142 
143     // Update and delete something
144     procStore.update(proc1);
145     procStore.update(child2[1]);
146     procStore.delete(child2[1].getProcId());
147     procIds.remove(child2[1].getProcId());
148 
149     // Verify that everything is there
150     verifyProcIdsOnRestart(procIds);
151 
152     // Remove 4 byte from the trailers
153     procStore.stop(false);
154     FileStatus[] logs = fs.listStatus(logDir);
155     assertEquals(3, logs.length);
156     for (int i = 0; i < logs.length; ++i) {
157       corruptLog(logs[i], 4);
158     }
159     verifyProcIdsOnRestart(procIds);
160   }
161 
162   @Test
163   public void testNoTrailerDoubleRestart() throws Exception {
164     // log-0001: proc 0, 1 and 2 are inserted
165     Procedure proc0 = new TestSequentialProcedure();
166     procStore.insert(proc0, null);
167     Procedure proc1 = new TestSequentialProcedure();
168     procStore.insert(proc1, null);
169     Procedure proc2 = new TestSequentialProcedure();
170     procStore.insert(proc2, null);
171     procStore.rollWriterForTesting();
172 
173     // log-0002: proc 1 deleted
174     procStore.delete(proc1.getProcId());
175     procStore.rollWriterForTesting();
176 
177     // log-0003: proc 2 is update
178     procStore.update(proc2);
179     procStore.rollWriterForTesting();
180 
181     // log-0004: proc 2 deleted
182     procStore.delete(proc2.getProcId());
183 
184     // stop the store and remove the trailer
185     procStore.stop(false);
186     FileStatus[] logs = fs.listStatus(logDir);
187     assertEquals(4, logs.length);
188     for (int i = 0; i < logs.length; ++i) {
189       corruptLog(logs[i], 4);
190     }
191 
192     // Test Load 1
193     LoadCounter loader = new LoadCounter();
194     storeRestart(loader);
195     assertEquals(1, loader.getLoadedCount());
196     assertEquals(0, loader.getCorruptedCount());
197 
198     // Test Load 2
199     assertEquals(5, fs.listStatus(logDir).length);
200     loader = new LoadCounter();
201     storeRestart(loader);
202     assertEquals(1, loader.getLoadedCount());
203     assertEquals(0, loader.getCorruptedCount());
204 
205     // remove proc-0
206     procStore.delete(proc0.getProcId());
207     procStore.periodicRollForTesting();
208     assertEquals(1, fs.listStatus(logDir).length);
209     storeRestart(loader);
210   }
211 
212   @Test
213   public void testCorruptedTrailer() throws Exception {
214     // Insert something
215     for (int i = 0; i < 100; ++i) {
216       procStore.insert(new TestSequentialProcedure(), null);
217     }
218 
219     // Stop the store
220     procStore.stop(false);
221 
222     // Remove 4 byte from the trailer
223     FileStatus[] logs = fs.listStatus(logDir);
224     assertEquals(1, logs.length);
225     corruptLog(logs[0], 4);
226 
227     LoadCounter loader = new LoadCounter();
228     storeRestart(loader);
229     assertEquals(100, loader.getLoadedCount());
230     assertEquals(0, loader.getCorruptedCount());
231   }
232 
233   @Test
234   public void testCorruptedEntries() throws Exception {
235     // Insert something
236     for (int i = 0; i < 100; ++i) {
237       procStore.insert(new TestSequentialProcedure(), null);
238     }
239 
240     // Stop the store
241     procStore.stop(false);
242 
243     // Remove some byte from the log
244     // (enough to cut the trailer and corrupt some entries)
245     FileStatus[] logs = fs.listStatus(logDir);
246     assertEquals(1, logs.length);
247     corruptLog(logs[0], 1823);
248 
249     LoadCounter loader = new LoadCounter();
250     storeRestart(loader);
251     assertTrue(procStore.getCorruptedLogs() != null);
252     assertEquals(1, procStore.getCorruptedLogs().size());
253     assertEquals(85, loader.getLoadedCount());
254     assertEquals(0, loader.getCorruptedCount());
255   }
256 
257   @Test
258   public void testCorruptedProcedures() throws Exception {
259     // Insert root-procedures
260     TestProcedure[] rootProcs = new TestProcedure[10];
261     for (int i = 1; i <= rootProcs.length; i++) {
262       rootProcs[i-1] = new TestProcedure(i, 0);
263       procStore.insert(rootProcs[i-1], null);
264       rootProcs[i-1].addStackId(0);
265       procStore.update(rootProcs[i-1]);
266     }
267     // insert root-child txn
268     procStore.rollWriterForTesting();
269     for (int i = 1; i <= rootProcs.length; i++) {
270       TestProcedure b = new TestProcedure(rootProcs.length + i, i);
271       rootProcs[i-1].addStackId(1);
272       procStore.insert(rootProcs[i-1], new Procedure[] { b });
273     }
274     // insert child updates
275     procStore.rollWriterForTesting();
276     for (int i = 1; i <= rootProcs.length; i++) {
277       procStore.update(new TestProcedure(rootProcs.length + i, i));
278     }
279 
280     // Stop the store
281     procStore.stop(false);
282 
283     // the first log was removed,
284     // we have insert-txn and updates in the others so everything is fine
285     FileStatus[] logs = fs.listStatus(logDir);
286     assertEquals(Arrays.toString(logs), 2, logs.length);
287     Arrays.sort(logs, new Comparator<FileStatus>() {
288       @Override
289       public int compare(FileStatus o1, FileStatus o2) {
290         return o1.getPath().getName().compareTo(o2.getPath().getName());
291       }
292     });
293 
294     LoadCounter loader = new LoadCounter();
295     storeRestart(loader);
296     assertEquals(rootProcs.length * 2, loader.getLoadedCount());
297     assertEquals(0, loader.getCorruptedCount());
298 
299     // Remove the second log, we have lost all the root/parent references
300     fs.delete(logs[0].getPath(), false);
301     loader.reset();
302     storeRestart(loader);
303     assertEquals(0, loader.getLoadedCount());
304     assertEquals(rootProcs.length, loader.getCorruptedCount());
305     for (Procedure proc: loader.getCorrupted()) {
306       assertTrue(proc.toString(), proc.getParentProcId() <= rootProcs.length);
307       assertTrue(proc.toString(),
308                   proc.getProcId() > rootProcs.length &&
309                   proc.getProcId() <= (rootProcs.length * 2));
310     }
311   }
312 
313   @Test(timeout=60000)
314   public void testWalReplayOrder_AB_A() throws Exception {
315     /*
316      * | A B | -> | A |
317      */
318     TestProcedure a = new TestProcedure(1, 0);
319     TestProcedure b = new TestProcedure(2, 1);
320 
321     procStore.insert(a, null);
322     a.addStackId(0);
323     procStore.update(a);
324 
325     procStore.insert(a, new Procedure[] { b });
326     b.addStackId(1);
327     procStore.update(b);
328 
329     procStore.rollWriterForTesting();
330 
331     a.addStackId(2);
332     procStore.update(a);
333 
334     storeRestart(new ProcedureStore.ProcedureLoader() {
335       @Override
336       public void setMaxProcId(long maxProcId) {
337         assertEquals(2, maxProcId);
338       }
339 
340       @Override
341       public void load(ProcedureIterator procIter) throws IOException {
342         assertTrue(procIter.hasNext());
343         assertEquals(1, procIter.nextAsProcedureInfo().getProcId());
344         assertTrue(procIter.hasNext());
345         assertEquals(2, procIter.nextAsProcedureInfo().getProcId());
346         assertFalse(procIter.hasNext());
347       }
348 
349       @Override
350       public void handleCorrupted(ProcedureIterator procIter) throws IOException {
351         assertFalse(procIter.hasNext());
352       }
353     });
354   }
355 
356   @Test(timeout=60000)
357   public void testWalReplayOrder_ABC_BAD() throws Exception {
358     /*
359      * | A B C | -> | B A D |
360      */
361     TestProcedure a = new TestProcedure(1, 0);
362     TestProcedure b = new TestProcedure(2, 1);
363     TestProcedure c = new TestProcedure(3, 2);
364     TestProcedure d = new TestProcedure(4, 0);
365 
366     procStore.insert(a, null);
367     a.addStackId(0);
368     procStore.update(a);
369 
370     procStore.insert(a, new Procedure[] { b });
371     b.addStackId(1);
372     procStore.update(b);
373 
374     procStore.insert(b, new Procedure[] { c });
375     b.addStackId(2);
376     procStore.update(b);
377 
378     procStore.rollWriterForTesting();
379 
380     b.addStackId(3);
381     procStore.update(b);
382 
383     a.addStackId(4);
384     procStore.update(a);
385 
386     procStore.insert(d, null);
387     d.addStackId(0);
388     procStore.update(d);
389 
390     storeRestart(new ProcedureStore.ProcedureLoader() {
391       @Override
392       public void setMaxProcId(long maxProcId) {
393         assertEquals(4, maxProcId);
394       }
395 
396       @Override
397       public void load(ProcedureIterator procIter) throws IOException {
398         assertTrue(procIter.hasNext());
399         assertEquals(4, procIter.nextAsProcedureInfo().getProcId());
400         // TODO: This will be multiple call once we do fast-start
401         //assertFalse(procIter.hasNext());
402 
403         assertTrue(procIter.hasNext());
404         assertEquals(1, procIter.nextAsProcedureInfo().getProcId());
405         assertTrue(procIter.hasNext());
406         assertEquals(2, procIter.nextAsProcedureInfo().getProcId());
407         assertTrue(procIter.hasNext());
408         assertEquals(3, procIter.nextAsProcedureInfo().getProcId());
409         assertFalse(procIter.hasNext());
410       }
411 
412       @Override
413       public void handleCorrupted(ProcedureIterator procIter) throws IOException {
414         assertFalse(procIter.hasNext());
415       }
416     });
417   }
418 
419   @Test
420   public void testInsertUpdateDelete() throws Exception {
421     final int NTHREAD = 2;
422 
423     procStore.stop(false);
424     fs.delete(logDir, true);
425 
426     org.apache.hadoop.conf.Configuration conf =
427       new org.apache.hadoop.conf.Configuration(htu.getConfiguration());
428     conf.setBoolean("hbase.procedure.store.wal.use.hsync", false);
429     conf.setInt("hbase.procedure.store.wal.periodic.roll.msec", 10000);
430     conf.setInt("hbase.procedure.store.wal.roll.threshold", 128 * 1024);
431 
432     fs.mkdirs(logDir);
433     procStore = ProcedureTestingUtility.createWalStore(conf, fs, logDir);
434     procStore.start(NTHREAD);
435     procStore.recoverLease();
436 
437     LoadCounter loader = new LoadCounter();
438     procStore.load(loader);
439     assertEquals(0, loader.getMaxProcId());
440     assertEquals(0, loader.getLoadedCount());
441     assertEquals(0, loader.getCorruptedCount());
442 
443     final long LAST_PROC_ID = 9999;
444     final Thread[] thread = new Thread[NTHREAD];
445     final AtomicLong procCounter = new AtomicLong((long)Math.round(Math.random() * 100));
446     for (int i = 0; i < thread.length; ++i) {
447       thread[i] = new Thread() {
448         @Override
449         public void run() {
450           Random rand = new Random();
451           TestProcedure proc;
452           do {
453             proc = new TestProcedure(procCounter.addAndGet(1));
454             // Insert
455             procStore.insert(proc, null);
456             // Update
457             for (int i = 0, nupdates = rand.nextInt(10); i <= nupdates; ++i) {
458               try { Thread.sleep(0, rand.nextInt(15)); } catch (InterruptedException e) {}
459               procStore.update(proc);
460             }
461             // Delete
462             procStore.delete(proc.getProcId());
463           } while (proc.getProcId() < LAST_PROC_ID);
464         }
465       };
466       thread[i].start();
467     }
468 
469     for (int i = 0; i < thread.length; ++i) {
470       thread[i].join();
471     }
472 
473     procStore.getStoreTracker().dump();
474     assertTrue(procCounter.get() >= LAST_PROC_ID);
475     assertTrue(procStore.getStoreTracker().isEmpty());
476     assertEquals(1, procStore.getActiveLogs().size());
477   }
478 
479   @Test
480   public void testRollAndRemove() throws IOException {
481     // Insert something in the log
482     Procedure proc1 = new TestSequentialProcedure();
483     procStore.insert(proc1, null);
484 
485     Procedure proc2 = new TestSequentialProcedure();
486     procStore.insert(proc2, null);
487 
488     // roll the log, now we have 2
489     procStore.rollWriterForTesting();
490     assertEquals(2, procStore.getActiveLogs().size());
491 
492     // everything will be up to date in the second log
493     // so we can remove the first one
494     procStore.update(proc1);
495     procStore.update(proc2);
496     assertEquals(1, procStore.getActiveLogs().size());
497 
498     // roll the log, now we have 2
499     procStore.rollWriterForTesting();
500     assertEquals(2, procStore.getActiveLogs().size());
501 
502     // remove everything active
503     // so we can remove all the logs
504     procStore.delete(proc1.getProcId());
505     procStore.delete(proc2.getProcId());
506     assertEquals(1, procStore.getActiveLogs().size());
507   }
508 
509   private void corruptLog(final FileStatus logFile, final long dropBytes)
510       throws IOException {
511     assertTrue(logFile.getLen() > dropBytes);
512     LOG.debug("corrupt log " + logFile.getPath() +
513               " size=" + logFile.getLen() + " drop=" + dropBytes);
514     Path tmpPath = new Path(testDir, "corrupted.log");
515     InputStream in = fs.open(logFile.getPath());
516     OutputStream out =  fs.create(tmpPath);
517     IOUtils.copyBytes(in, out, logFile.getLen() - dropBytes, true);
518     if (!fs.rename(tmpPath, logFile.getPath())) {
519       throw new IOException("Unable to rename");
520     }
521   }
522 
523   private void verifyProcIdsOnRestart(final Set<Long> procIds) throws Exception {
524     LOG.debug("expected: " + procIds);
525     LoadCounter loader = new LoadCounter();
526     storeRestart(loader);
527     assertEquals(procIds.size(), loader.getLoadedCount());
528     assertEquals(0, loader.getCorruptedCount());
529   }
530 
531   private void assertEmptyLogDir() {
532     try {
533       FileStatus[] status = fs.listStatus(logDir);
534       assertTrue("expected empty state-log dir", status == null || status.length == 0);
535     } catch (FileNotFoundException e) {
536       fail("expected the state-log dir to be present: " + logDir);
537     } catch (IOException e) {
538       fail("got en exception on state-log dir list: " + e.getMessage());
539     }
540   }
541 
542   public static class TestSequentialProcedure extends SequentialProcedure<Void> {
543     private static long seqid = 0;
544 
545     public TestSequentialProcedure() {
546       setProcId(++seqid);
547     }
548 
549     @Override
550     protected Procedure[] execute(Void env) { return null; }
551 
552     @Override
553     protected void rollback(Void env) { }
554 
555     @Override
556     protected boolean abort(Void env) { return false; }
557 
558     @Override
559     protected void serializeStateData(final OutputStream stream) throws IOException {
560       long procId = getProcId();
561       if (procId % 2 == 0) {
562         stream.write(Bytes.toBytes(procId));
563       }
564     }
565 
566     @Override
567     protected void deserializeStateData(InputStream stream) throws IOException {
568       long procId = getProcId();
569       if (procId % 2 == 0) {
570         byte[] bProcId = new byte[8];
571         assertEquals(8, stream.read(bProcId));
572         assertEquals(procId, Bytes.toLong(bProcId));
573       } else {
574         assertEquals(0, stream.available());
575       }
576     }
577   }
578 
579   private class LoadCounter implements ProcedureStore.ProcedureLoader {
580     private final ArrayList<Procedure> corrupted = new ArrayList<Procedure>();
581     private final ArrayList<Procedure> loaded = new ArrayList<Procedure>();
582 
583     private Set<Long> procIds;
584     private long maxProcId = 0;
585 
586     public LoadCounter() {
587       this(null);
588     }
589 
590     public LoadCounter(final Set<Long> procIds) {
591       this.procIds = procIds;
592     }
593 
594     public void reset() {
595       reset(null);
596     }
597 
598     public void reset(final Set<Long> procIds) {
599       corrupted.clear();
600       loaded.clear();
601       this.procIds = procIds;
602       this.maxProcId = 0;
603     }
604 
605     public long getMaxProcId() {
606       return maxProcId;
607     }
608 
609     public ArrayList<Procedure> getLoaded() {
610       return loaded;
611     }
612 
613     public int getLoadedCount() {
614       return loaded.size();
615     }
616 
617     public ArrayList<Procedure> getCorrupted() {
618       return corrupted;
619     }
620 
621     public int getCorruptedCount() {
622       return corrupted.size();
623     }
624 
625     @Override
626     public void setMaxProcId(long maxProcId) {
627       maxProcId = maxProcId;
628     }
629 
630     @Override
631     public void load(ProcedureIterator procIter) throws IOException {
632       while (procIter.hasNext()) {
633         Procedure proc = procIter.nextAsProcedure();
634         LOG.debug("loading procId=" + proc.getProcId() + ": " + proc);
635         if (procIds != null) {
636           assertTrue("procId=" + proc.getProcId() + " unexpected",
637                      procIds.contains(proc.getProcId()));
638         }
639         loaded.add(proc);
640       }
641     }
642 
643     @Override
644     public void handleCorrupted(ProcedureIterator procIter) throws IOException {
645       while (procIter.hasNext()) {
646         Procedure proc = procIter.nextAsProcedure();
647         LOG.debug("corrupted procId=" + proc.getProcId() + ": " + proc);
648         corrupted.add(proc);
649       }
650     }
651   }
652 }