View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.master.cleaner;
19  
20  import static org.junit.Assert.assertEquals;
21  
22  import java.io.IOException;
23  import java.lang.reflect.Field;
24  import java.net.URLEncoder;
25  import java.util.LinkedList;
26  
27  import org.apache.hadoop.conf.Configuration;
28  import org.apache.hadoop.fs.FileStatus;
29  import org.apache.hadoop.fs.FileSystem;
30  import org.apache.hadoop.fs.Path;
31  import org.apache.hadoop.hbase.ChoreService;
32  import org.apache.hadoop.hbase.CoordinatedStateManager;
33  import org.apache.hadoop.hbase.HBaseTestingUtility;
34  import org.apache.hadoop.hbase.HConstants;
35  import org.apache.hadoop.hbase.testclassification.MediumTests;
36  import org.apache.hadoop.hbase.Server;
37  import org.apache.hadoop.hbase.ServerName;
38  import org.apache.hadoop.hbase.Waiter;
39  import org.apache.hadoop.hbase.client.ClusterConnection;
40  import org.apache.hadoop.hbase.replication.ReplicationFactory;
41  import org.apache.hadoop.hbase.replication.ReplicationQueues;
42  import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
43  import org.apache.hadoop.hbase.replication.master.ReplicationLogCleaner;
44  import org.apache.hadoop.hbase.replication.regionserver.Replication;
45  import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
46  import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
47  import org.junit.AfterClass;
48  import org.junit.BeforeClass;
49  import org.junit.Test;
50  import org.junit.experimental.categories.Category;
51  import org.mockito.Mockito;
52  
53  @Category(MediumTests.class)
54  public class TestLogsCleaner {
55  
56    private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
57  
58    /**
59     * @throws java.lang.Exception
60     */
61    @BeforeClass
62    public static void setUpBeforeClass() throws Exception {
63      TEST_UTIL.startMiniZKCluster();
64    }
65  
66    /**
67     * @throws java.lang.Exception
68     */
69    @AfterClass
70    public static void tearDownAfterClass() throws Exception {
71      TEST_UTIL.shutdownMiniZKCluster();
72    }
73  
74    @Test
75    public void testLogCleaning() throws Exception{
76      Configuration conf = TEST_UTIL.getConfiguration();
77      // set TTL
78      long ttl = 10000;
79      conf.setLong("hbase.master.logcleaner.ttl", ttl);
80      conf.setBoolean(HConstants.REPLICATION_ENABLE_KEY, HConstants.REPLICATION_ENABLE_DEFAULT);
81      Replication.decorateMasterConfiguration(conf);
82      Server server = new DummyServer();
83      ReplicationQueues repQueues =
84          ReplicationFactory.getReplicationQueues(server.getZooKeeper(), conf, server);
85      repQueues.init(server.getServerName().toString());
86      final Path oldLogDir = new Path(TEST_UTIL.getDataTestDir(),
87          HConstants.HREGION_OLDLOGDIR_NAME);
88      String fakeMachineName =
89        URLEncoder.encode(server.getServerName().toString(), "UTF8");
90  
91      final FileSystem fs = FileSystem.get(conf);
92  
93      // Create 2 invalid files, 1 "recent" file, 1 very new file and 30 old files
94      long now = System.currentTimeMillis();
95      fs.delete(oldLogDir, true);
96      fs.mkdirs(oldLogDir);
97      // Case 1: 2 invalid files, which would be deleted directly
98      fs.createNewFile(new Path(oldLogDir, "a"));
99      fs.createNewFile(new Path(oldLogDir, fakeMachineName + "." + "a"));
100     // Case 2: 1 "recent" file, not even deletable for the first log cleaner
101     // (TimeToLiveLogCleaner), so we are not going down the chain
102     System.out.println("Now is: " + now);
103     for (int i = 1; i < 31; i++) {
104       // Case 3: old files which would be deletable for the first log cleaner
105       // (TimeToLiveLogCleaner), and also for the second (ReplicationLogCleaner)
106       Path fileName = new Path(oldLogDir, fakeMachineName + "." + (now - i) );
107       fs.createNewFile(fileName);
108       // Case 4: put 3 old log files in ZK indicating that they are scheduled
109       // for replication so these files would pass the first log cleaner
110       // (TimeToLiveLogCleaner) but would be rejected by the second
111       // (ReplicationLogCleaner)
112       if (i % (30/3) == 1) {
113         repQueues.addLog(fakeMachineName, fileName.getName());
114         System.out.println("Replication log file: " + fileName);
115       }
116     }
117 
118     // sleep for sometime to get newer modifcation time
119     Thread.sleep(ttl);
120     fs.createNewFile(new Path(oldLogDir, fakeMachineName + "." + now));
121 
122     // Case 2: 1 newer file, not even deletable for the first log cleaner
123     // (TimeToLiveLogCleaner), so we are not going down the chain
124     fs.createNewFile(new Path(oldLogDir, fakeMachineName + "." + (now + 10000) ));
125 
126     for (FileStatus stat : fs.listStatus(oldLogDir)) {
127       System.out.println(stat.getPath().toString());
128     }
129 
130     assertEquals(34, fs.listStatus(oldLogDir).length);
131 
132     LogCleaner cleaner  = new LogCleaner(1000, server, conf, fs, oldLogDir);
133 
134     cleaner.chore();
135 
136     // We end up with the current log file, a newer one and the 3 old log
137     // files which are scheduled for replication
138     TEST_UTIL.waitFor(1000, new Waiter.Predicate<Exception>() {
139       @Override
140       public boolean evaluate() throws Exception {
141         return 5 == fs.listStatus(oldLogDir).length;
142       }
143     });
144 
145     for (FileStatus file : fs.listStatus(oldLogDir)) {
146       System.out.println("Kept log files: " + file.getPath().getName());
147     }
148   }
149 
150   @Test(timeout=5000)
151   public void testZnodeCversionChange() throws Exception {
152     Configuration conf = TEST_UTIL.getConfiguration();
153     ReplicationLogCleaner cleaner = new ReplicationLogCleaner();
154     cleaner.setConf(conf);
155 
156     ReplicationQueuesClient rqcMock = Mockito.mock(ReplicationQueuesClient.class);
157     Mockito.when(rqcMock.getQueuesZNodeCversion()).thenReturn(1, 2, 3, 4);
158 
159     Field rqc = ReplicationLogCleaner.class.getDeclaredField("replicationQueues");
160     rqc.setAccessible(true);
161 
162     rqc.set(cleaner, rqcMock);
163 
164     // This should return eventually when cversion stabilizes
165     cleaner.getDeletableFiles(new LinkedList<FileStatus>());
166   }
167 
168   static class DummyServer implements Server {
169 
170     @Override
171     public Configuration getConfiguration() {
172       return TEST_UTIL.getConfiguration();
173     }
174 
175     @Override
176     public ZooKeeperWatcher getZooKeeper() {
177       try {
178         return new ZooKeeperWatcher(getConfiguration(), "dummy server", this);
179       } catch (IOException e) {
180         e.printStackTrace();
181       }
182       return null;
183     }
184 
185     @Override
186     public CoordinatedStateManager getCoordinatedStateManager() {
187       return null;
188     }
189 
190     @Override
191     public ClusterConnection getConnection() {
192       return null;
193     }
194 
195     @Override
196     public MetaTableLocator getMetaTableLocator() {
197       return null;
198     }
199 
200     @Override
201     public ServerName getServerName() {
202       return ServerName.valueOf("regionserver,60020,000000");
203     }
204 
205     @Override
206     public void abort(String why, Throwable e) {}
207 
208     @Override
209     public boolean isAborted() {
210       return false;
211     }
212 
213     @Override
214     public void stop(String why) {}
215 
216     @Override
217     public boolean isStopped() {
218       return false;
219     }
220 
221     @Override
222     public ChoreService getChoreService() {
223       return null;
224     }
225   }
226 }