/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CountDownLatch;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
import org.apache.hadoop.hbase.regionserver.ConstantSizeRegionSplitPolicy;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController;
import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
import org.apache.hadoop.hbase.wal.WAL;
import org.junit.Test;
import org.junit.experimental.categories.Category;

import com.google.common.collect.Lists;

/**
 * Test for the case where a regionserver going down has enough cycles to do damage to regions
 * that have actually been assigned elsewhere.
 *
 * <p>If we happen to assign a region before it is fully closed in its old location -- i.e. it is
 * on two servers at the same time -- all can work fine until the region on the dying server
 * decides to compact or otherwise change the region file set. The region in its new location will
 * then get a surprise when it tries to do something with a file that was removed by the region in
 * its old location on the dying server.
 *
 * <p>Making a test for this case is a little tough because even if a file is deleted on the
 * namenode, a client that opened the file before the delete can keep reading it until something
 * invalidates the cached blocks in the already-open dfsclient (the namenode eventually has the
 * datanode clean up the deleted file's blocks).
 *
 * <p>What we do below is an explicit existence check on the files listed by the region after a
 * compaction has removed some of them. This hurries along and makes deterministic what would
 * otherwise be a chance occurrence.
 */
@Category(MediumTests.class)
public class TestIOFencing {
  private static final Log LOG = LogFactory.getLog(TestIOFencing.class);
  static {
    // Uncomment the following lines if more verbosity is needed for
    // debugging (see HBASE-12285 for details).
    //((Log4JLogger)FSNamesystem.LOG).getLogger().setLevel(Level.ALL);
    //((Log4JLogger)DataNode.LOG).getLogger().setLevel(Level.ALL);
    //((Log4JLogger)LeaseManager.LOG).getLogger().setLevel(Level.ALL);
    //((Log4JLogger)LogFactory.getLog("org.apache.hadoop.hdfs.server.namenode.FSNamesystem"))
    //    .getLogger().setLevel(Level.ALL);
    //((Log4JLogger)DFSClient.LOG).getLogger().setLevel(Level.ALL);
  }

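  /**
   * Region subclass that counts compactions and lets the test park an in-flight compaction:
   * {@link #stopCompactions()} arms the latches, the blocking subclasses count down
   * {@code compactionsWaiting} when a compaction reaches its block point, and
   * {@link #allowCompactions()} releases it.
   */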
  public abstract static class CompactionBlockerRegion extends HRegion {
    volatile int compactCount = 0;
    volatile CountDownLatch compactionsBlocked = new CountDownLatch(0);
    volatile CountDownLatch compactionsWaiting = new CountDownLatch(0);

    @SuppressWarnings("deprecation")
    public CompactionBlockerRegion(Path tableDir, WAL log,
        FileSystem fs, Configuration confParam, HRegionInfo info,
        HTableDescriptor htd, RegionServerServices rsServices) {
      super(tableDir, log, fs, confParam, info, htd, rsServices);
    }

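    /**
     * Arm the latches so the next compaction announces itself on {@code compactionsWaiting} and
     * then blocks until {@link #allowCompactions()} is called.
     */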
    public void stopCompactions() {
      compactionsBlocked = new CountDownLatch(1);
      compactionsWaiting = new CountDownLatch(1);
    }

    public void allowCompactions() {
      LOG.debug("allowing compactions");
      compactionsBlocked.countDown();
    }

    public void waitForCompactionToBlock() throws IOException {
      try {
        LOG.debug("waiting for compaction to block");
        compactionsWaiting.await();
        LOG.debug("compaction block reached");
      } catch (InterruptedException ex) {
        throw new IOException(ex);
      }
    }

    @Override
    public boolean compact(CompactionContext compaction, Store store,
        CompactionThroughputController throughputController) throws IOException {
      try {
        return super.compact(compaction, store, throughputController);
      } finally {
        compactCount++;
      }
    }

    @Override
    public boolean compact(CompactionContext compaction, Store store,
        CompactionThroughputController throughputController, User user) throws IOException {
      try {
        return super.compact(compaction, store, throughputController, user);
      } finally {
        compactCount++;
      }
    }

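    /** Returns the total number of store files across all stores of this region. */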
    public int countStoreFiles() {
      int count = 0;
      for (Store store : stores.values()) {
        count += store.getStorefilesCount();
      }
      return count;
    }
  }

  /**
   * An override of HRegion that allows us to park compactions in a holding pattern and
   * then, when appropriate for the test, allow them to proceed again.
   */
  public static class BlockCompactionsInPrepRegion extends CompactionBlockerRegion {

    public BlockCompactionsInPrepRegion(Path tableDir, WAL log,
        FileSystem fs, Configuration confParam, HRegionInfo info,
        HTableDescriptor htd, RegionServerServices rsServices) {
      super(tableDir, log, fs, confParam, info, htd, rsServices);
    }
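
    // Park the compaction before it does any work: signal that we have reached the block point,
    // then wait until the test calls allowCompactions().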
    @Override
    protected void doRegionCompactionPrep() throws IOException {
      compactionsWaiting.countDown();
      try {
        compactionsBlocked.await();
      } catch (InterruptedException ex) {
        throw new IOException(ex);
      }
      super.doRegionCompactionPrep();
    }
  }

  /**
   * An override of HRegion that allows us to park compactions in a holding pattern and
   * then, when appropriate for the test, allow them to proceed again. This allows the compaction
   * entry to go to the WAL before blocking, but blocks afterwards.
   */
  public static class BlockCompactionsInCompletionRegion extends CompactionBlockerRegion {
    public BlockCompactionsInCompletionRegion(Path tableDir, WAL log,
        FileSystem fs, Configuration confParam, HRegionInfo info,
        HTableDescriptor htd, RegionServerServices rsServices) {
      super(tableDir, log, fs, confParam, info, htd, rsServices);
    }
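
    // Swap in a store whose completeCompaction blocks, so the compaction marker can reach the WAL
    // before the compaction pauses.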
    @Override
    protected HStore instantiateHStore(final HColumnDescriptor family) throws IOException {
      return new BlockCompactionsInCompletionHStore(this, family, this.conf);
    }
  }

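  /**
   * HStore whose completeCompaction waits on the owning region's latches before calling super,
   * pausing the compaction after its WAL entry is written but before the replaced store files
   * are cleaned up.
   */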
  public static class BlockCompactionsInCompletionHStore extends HStore {
    CompactionBlockerRegion r;
    protected BlockCompactionsInCompletionHStore(HRegion region, HColumnDescriptor family,
        Configuration confParam) throws IOException {
      super(region, family, confParam);
      r = (CompactionBlockerRegion) region;
    }

    @Override
    protected void completeCompaction(final Collection<StoreFile> compactedFiles,
        boolean removeFiles) throws IOException {
      try {
        r.compactionsWaiting.countDown();
        r.compactionsBlocked.await();
      } catch (InterruptedException ex) {
        throw new IOException(ex);
      }
      super.completeCompaction(compactedFiles, removeFiles);
    }

    @Override
    protected void completeCompaction(Collection<StoreFile> compactedFiles) throws IOException {
      try {
        r.compactionsWaiting.countDown();
        r.compactionsBlocked.await();
      } catch (InterruptedException ex) {
        throw new IOException(ex);
      }
      super.completeCompaction(compactedFiles);
    }
  }

  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
  private final static TableName TABLE_NAME = TableName.valueOf("tabletest");
  private final static byte[] FAMILY = Bytes.toBytes("family");
  private static final int FIRST_BATCH_COUNT = 4000;
  private static final int SECOND_BATCH_COUNT = FIRST_BATCH_COUNT;

  /**
   * Test that puts up a regionserver, starts a compaction on a loaded region, but holds the
   * compaction until after we have killed the server and the region has come up on
   * a new regionserver altogether.  This fakes the double-assignment case where the region in one
   * location changes the files out from underneath the region being served elsewhere.
   */
  @Test
  public void testFencingAroundCompaction() throws Exception {
    doTest(BlockCompactionsInPrepRegion.class);
  }

  /**
   * Test that puts up a regionserver, starts a compaction on a loaded region, but holds the
   * compaction completion until after we have killed the server and the region has come up on
   * a new regionserver altogether.  This fakes the double-assignment case where the region in one
   * location changes the files out from underneath the region being served elsewhere.
   */
  @Test
  public void testFencingAroundCompactionAfterWALSync() throws Exception {
    doTest(BlockCompactionsInCompletionRegion.class);
  }

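  /**
   * Shared scenario for both tests: load a region, park a compaction using the given region
   * implementation, expire the hosting server's ZK session so the region is reassigned, verify
   * the reassigned region's store files still exist, then release the compaction and make sure
   * the region keeps working in its new location.
   */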
  public void doTest(Class<?> regionClass) throws Exception {
    Configuration c = TEST_UTIL.getConfiguration();
    // Insert our custom region
    c.setClass(HConstants.REGION_IMPL, regionClass, HRegion.class);
    c.setBoolean("dfs.support.append", true);
    // Encourage plenty of flushes
    c.setLong("hbase.hregion.memstore.flush.size", 200000);
    c.set(HConstants.HBASE_REGION_SPLIT_POLICY_KEY, ConstantSizeRegionSplitPolicy.class.getName());
    // Only run compaction when we tell it to
    c.setInt("hbase.hstore.compactionThreshold", 1000);
    c.setLong("hbase.hstore.blockingStoreFiles", 1000);
    // Compact quickly after we tell it to!
    c.setInt("hbase.regionserver.thread.splitcompactcheckfrequency", 1000);
    LOG.info("Starting mini cluster");
    TEST_UTIL.startMiniCluster(1);
    CompactionBlockerRegion compactingRegion = null;
    Admin admin = null;
    try {
      LOG.info("Creating admin");
      admin = TEST_UTIL.getConnection().getAdmin();
      LOG.info("Creating table");
      TEST_UTIL.createTable(TABLE_NAME, FAMILY);
      Table table = TEST_UTIL.getConnection().getTable(TABLE_NAME);
      LOG.info("Loading test table");
      // Find the region
      List<HRegion> testRegions = TEST_UTIL.getMiniHBaseCluster().findRegionsForTable(TABLE_NAME);
      assertEquals(1, testRegions.size());
      compactingRegion = (CompactionBlockerRegion)testRegions.get(0);
      LOG.info("Blocking compactions");
      compactingRegion.stopCompactions();
      long lastFlushTime = compactingRegion.getEarliestFlushTimeForAllStores();
      // Load some rows
      TEST_UTIL.loadNumericRows(table, FAMILY, 0, FIRST_BATCH_COUNT);

      // add a compaction from an older (non-existing) region to see whether we successfully skip
      // those entries
      HRegionInfo oldHri = new HRegionInfo(table.getName(),
        HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
      CompactionDescriptor compactionDescriptor = ProtobufUtil.toCompactionDescriptor(oldHri,
        FAMILY, Lists.newArrayList(new Path("/a")), Lists.newArrayList(new Path("/b")),
        new Path("store_dir"));
      WALUtil.writeCompactionMarker(compactingRegion.getWAL(), table.getTableDescriptor(),
        oldHri, compactionDescriptor, compactingRegion.getMVCC());

      // Wait till flush has happened, otherwise there won't be multiple store files
      long startWaitTime = System.currentTimeMillis();
      while (compactingRegion.getEarliestFlushTimeForAllStores() <= lastFlushTime ||
          compactingRegion.countStoreFiles() <= 1) {
        LOG.info("Waiting for the region to flush " +
          compactingRegion.getRegionInfo().getRegionNameAsString());
        Thread.sleep(1000);
        assertTrue("Timed out waiting for the region to flush",
          System.currentTimeMillis() - startWaitTime < 30000);
      }
      assertTrue(compactingRegion.countStoreFiles() > 1);
      final byte[] REGION_NAME = compactingRegion.getRegionInfo().getRegionName();
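      // Request a major compaction; the custom region implementation will park it on the latch
      // armed by stopCompactions().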
      LOG.info("Asking for compaction");
      ((HBaseAdmin)admin).majorCompact(TABLE_NAME.getName());
      LOG.info("Waiting for compaction to be about to start");
      compactingRegion.waitForCompactionToBlock();
      LOG.info("Starting a new server");
      RegionServerThread newServerThread = TEST_UTIL.getMiniHBaseCluster().startRegionServer();
      final HRegionServer newServer = newServerThread.getRegionServer();
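      // Expire the original server's ZooKeeper session. The master treats it as dead and
      // reassigns the region, but the process itself stays up with its compaction still parked.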
      LOG.info("Killing region server ZK lease");
      TEST_UTIL.expireRegionServerSession(0);
      CompactionBlockerRegion newRegion = null;
      startWaitTime = System.currentTimeMillis();
      LOG.info("Waiting for the new server to pick up the region " + Bytes.toString(REGION_NAME));

      // wait for region to be assigned and to go out of log replay if applicable
      Waiter.waitFor(c, 60000, new Waiter.Predicate<Exception>() {
        @Override
        public boolean evaluate() throws Exception {
          Region newRegion = newServer.getOnlineRegion(REGION_NAME);
          return newRegion != null && !newRegion.isRecovering();
        }
      });

      newRegion = (CompactionBlockerRegion)newServer.getOnlineRegion(REGION_NAME);

      // After compaction of old region finishes on the server that was going down, make sure that
      // all the files we expect are still working when region is up in new location.
      FileSystem fs = newRegion.getFilesystem();
      for (String f: newRegion.getStoreFileList(new byte[][] {FAMILY})) {
        assertTrue("After compaction, does not exist: " + f, fs.exists(new Path(f)));
      }
      LOG.info("Allowing compaction to proceed");
      compactingRegion.allowCompactions();
      while (compactingRegion.compactCount == 0) {
        Thread.sleep(1000);
      }
      // The server we killed stays up until the compaction that was started before it was killed
      // completes. In the logs you should see the old regionserver now going down.
      LOG.info("Compaction finished");

      // If we survive the split keep going...
      // Now we make sure that the region isn't totally confused.  Load up more rows.
      TEST_UTIL.loadNumericRows(table, FAMILY, FIRST_BATCH_COUNT,
        FIRST_BATCH_COUNT + SECOND_BATCH_COUNT);
      ((HBaseAdmin)admin).majorCompact(TABLE_NAME.getName());
      startWaitTime = System.currentTimeMillis();
      while (newRegion.compactCount == 0) {
        Thread.sleep(1000);
        assertTrue("New region never compacted",
          System.currentTimeMillis() - startWaitTime < 180000);
      }
      assertEquals(FIRST_BATCH_COUNT + SECOND_BATCH_COUNT, TEST_UTIL.countRows(table));
    } finally {
      if (compactingRegion != null) {
        compactingRegion.allowCompactions();
      }
      if (admin != null) {
        admin.close();
      }
      TEST_UTIL.shutdownMiniCluster();
    }
  }
}