View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.regionserver;
19  
20  import static org.junit.Assert.assertArrayEquals;
21  import static org.junit.Assert.assertTrue;
22  import static org.mockito.Mockito.doAnswer;
23  import static org.mockito.Mockito.spy;
24  
25  import java.io.IOException;
26  import java.util.Collection;
27  import java.util.Map;
28  
29  import org.apache.commons.lang.mutable.MutableBoolean;
30  import org.apache.commons.logging.Log;
31  import org.apache.commons.logging.LogFactory;
32  import org.apache.hadoop.hbase.DroppedSnapshotException;
33  import org.apache.hadoop.hbase.HBaseTestingUtility;
34  import org.apache.hadoop.hbase.HColumnDescriptor;
35  import org.apache.hadoop.hbase.HConstants;
36  import org.apache.hadoop.hbase.HTableDescriptor;
37  import org.apache.hadoop.hbase.NamespaceDescriptor;
38  import org.apache.hadoop.hbase.TableName;
39  import org.apache.hadoop.hbase.client.Connection;
40  import org.apache.hadoop.hbase.client.Get;
41  import org.apache.hadoop.hbase.client.HBaseAdmin;
42  import org.apache.hadoop.hbase.client.Put;
43  import org.apache.hadoop.hbase.client.Result;
44  import org.apache.hadoop.hbase.client.Table;
45  import org.apache.hadoop.hbase.monitoring.MonitoredTask;
46  import org.apache.hadoop.hbase.regionserver.HRegion.PrepareFlushResult;
47  import org.apache.hadoop.hbase.regionserver.Region.FlushResult;
48  import org.apache.hadoop.hbase.testclassification.MediumTests;
49  import org.apache.hadoop.hbase.util.Bytes;
50  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
51  import org.apache.hadoop.hbase.wal.WAL;
52  import org.junit.After;
53  import org.junit.Before;
54  import org.junit.Test;
55  import org.junit.experimental.categories.Category;
56  import org.mockito.Matchers;
57  import org.mockito.invocation.InvocationOnMock;
58  import org.mockito.stubbing.Answer;
59  
60  /**
61   * Testcase for https://issues.apache.org/jira/browse/HBASE-13811
62   */
63  @Category({ MediumTests.class })
64  public class TestSplitWalDataLoss {
65  
66    private static final Log LOG = LogFactory.getLog(TestSplitWalDataLoss.class);
67  
68    private final HBaseTestingUtility testUtil = new HBaseTestingUtility();
69  
70    private NamespaceDescriptor namespace = NamespaceDescriptor.create(getClass().getSimpleName())
71        .build();
72  
73    private TableName tableName = TableName.valueOf(namespace.getName(), "dataloss");
74  
75    private byte[] family = Bytes.toBytes("f");
76  
77    private byte[] qualifier = Bytes.toBytes("q");
78  
79    @Before
80    public void setUp() throws Exception {
81      testUtil.getConfiguration().setInt("hbase.regionserver.msginterval", 30000);
82      testUtil.getConfiguration().setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false);
83      testUtil.startMiniCluster(2);
84      HBaseAdmin admin = testUtil.getHBaseAdmin();
85      admin.createNamespace(namespace);
86      admin.createTable(new HTableDescriptor(tableName).addFamily(new HColumnDescriptor(family)));
87      testUtil.waitTableAvailable(tableName);
88    }
89  
90    @After
91    public void tearDown() throws Exception {
92      testUtil.shutdownMiniCluster();
93    }
94  
95    @Test
96    public void test() throws IOException, InterruptedException {
97      final HRegionServer rs = testUtil.getRSForFirstRegionInTable(tableName);
98      final HRegion region = (HRegion) rs.getOnlineRegions(tableName).get(0);
99      HRegion spiedRegion = spy(region);
100     final MutableBoolean flushed = new MutableBoolean(false);
101     final MutableBoolean reported = new MutableBoolean(false);
102     doAnswer(new Answer<FlushResult>() {
103       @Override
104       public FlushResult answer(InvocationOnMock invocation) throws Throwable {
105         synchronized (flushed) {
106           flushed.setValue(true);
107           flushed.notifyAll();
108         }
109         synchronized (reported) {
110           while (!reported.booleanValue()) {
111             reported.wait();
112           }
113         }
114         rs.getWAL(region.getRegionInfo()).abortCacheFlush(
115           region.getRegionInfo().getEncodedNameAsBytes());
116         throw new DroppedSnapshotException("testcase");
117       }
118     }).when(spiedRegion).internalFlushCacheAndCommit(Matchers.<WAL> any(),
119       Matchers.<MonitoredTask> any(), Matchers.<PrepareFlushResult> any(),
120       Matchers.<Collection<Store>> any());
121     // Find region key; don't pick up key for hbase:meta by mistake.
122     String key = null;
123     for (Map.Entry<String, Region> entry: rs.onlineRegions.entrySet()) {
124       if (entry.getValue().getRegionInfo().getTable().equals(this.tableName)) {
125         key = entry.getKey();
126         break;
127       }
128     }
129     rs.onlineRegions.put(key, spiedRegion);
130     Connection conn = testUtil.getConnection();
131 
132     try (Table table = conn.getTable(tableName)) {
133       table.put(new Put(Bytes.toBytes("row0")).addColumn(family, qualifier, Bytes.toBytes("val0")));
134     }
135     long oldestSeqIdOfStore = region.getOldestSeqIdOfStore(family);
136     LOG.info("CHANGE OLDEST " + oldestSeqIdOfStore);
137     assertTrue(oldestSeqIdOfStore > HConstants.NO_SEQNUM);
138     rs.cacheFlusher.requestFlush(spiedRegion, false);
139     synchronized (flushed) {
140       while (!flushed.booleanValue()) {
141         flushed.wait();
142       }
143     }
144     try (Table table = conn.getTable(tableName)) {
145       table.put(new Put(Bytes.toBytes("row1")).addColumn(family, qualifier, Bytes.toBytes("val1")));
146     }
147     long now = EnvironmentEdgeManager.currentTime();
148     rs.tryRegionServerReport(now - 500, now);
149     synchronized (reported) {
150       reported.setValue(true);
151       reported.notifyAll();
152     }
153     while (testUtil.getRSForFirstRegionInTable(tableName) == rs) {
154       Thread.sleep(100);
155     }
156     try (Table table = conn.getTable(tableName)) {
157       Result result = table.get(new Get(Bytes.toBytes("row0")));
158       assertArrayEquals(Bytes.toBytes("val0"), result.getValue(family, qualifier));
159     }
160   }
161 }