View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  
20  
21  package org.apache.hadoop.hbase.client;
22  
23  import org.apache.commons.logging.Log;
24  import org.apache.commons.logging.LogFactory;
25  import org.apache.hadoop.conf.Configuration;
26  import org.apache.hadoop.fs.Path;
27  import org.apache.hadoop.hbase.Cell;
28  import org.apache.hadoop.hbase.HBaseConfiguration;
29  import org.apache.hadoop.hbase.HBaseTestingUtility;
30  import org.apache.hadoop.hbase.HColumnDescriptor;
31  import org.apache.hadoop.hbase.HConstants;
32  import org.apache.hadoop.hbase.HTableDescriptor;
33  import org.apache.hadoop.hbase.testclassification.MediumTests;
34  import org.apache.hadoop.hbase.Waiter;
35  import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
36  import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
37  import org.apache.hadoop.hbase.coprocessor.ObserverContext;
38  import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
39  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
40  import org.apache.hadoop.hbase.protobuf.RequestConverter;
41  import org.apache.hadoop.hbase.regionserver.StorefileRefresherChore;
42  import org.apache.hadoop.hbase.regionserver.TestHRegionServerBulkLoad;
43  import org.apache.hadoop.hbase.util.Bytes;
44  import org.apache.hadoop.hbase.util.Pair;
45  import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
46  import org.junit.AfterClass;
47  import org.junit.Assert;
48  import org.junit.BeforeClass;
49  import org.junit.Test;
50  import org.junit.experimental.categories.Category;
51  
52  import java.io.IOException;
53  import java.util.ArrayList;
54  import java.util.Arrays;
55  import java.util.List;
56  import java.util.concurrent.CountDownLatch;
57  import java.util.concurrent.TimeUnit;
58  import java.util.concurrent.atomic.AtomicLong;
59  import java.util.concurrent.atomic.AtomicReference;
60  
61  @Category(MediumTests.class)
62  public class TestReplicaWithCluster {
63    private static final Log LOG = LogFactory.getLog(TestReplicaWithCluster.class);
64  
65    private static final int NB_SERVERS = 2;
66    private static final byte[] row = TestReplicaWithCluster.class.getName().getBytes();
67    private static final HBaseTestingUtility HTU = new HBaseTestingUtility();
68  
69    // second minicluster used in testing of replication
70    private static HBaseTestingUtility HTU2;
71    private static final byte[] f = HConstants.CATALOG_FAMILY;
72  
73    private final static int REFRESH_PERIOD = 1000;
74  
75    /**
76     * This copro is used to synchronize the tests.
77     */
78    public static class SlowMeCopro extends BaseRegionObserver {
79      static final AtomicLong sleepTime = new AtomicLong(0);
80      static final AtomicReference<CountDownLatch> cdl =
81          new AtomicReference<CountDownLatch>(new CountDownLatch(0));
82  
83      public SlowMeCopro() {
84      }
85  
86      @Override
87      public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> e,
88                           final Get get, final List<Cell> results) throws IOException {
89  
90        if (e.getEnvironment().getRegion().getRegionInfo().getReplicaId() == 0) {
91          CountDownLatch latch = cdl.get();
92          try {
93            if (sleepTime.get() > 0) {
94              LOG.info("Sleeping for " + sleepTime.get() + " ms");
95              Thread.sleep(sleepTime.get());
96            } else if (latch.getCount() > 0) {
97              LOG.info("Waiting for the counterCountDownLatch");
98              latch.await(2, TimeUnit.MINUTES); // To help the tests to finish.
99              if (latch.getCount() > 0) {
100               throw new RuntimeException("Can't wait more");
101             }
102           }
103         } catch (InterruptedException e1) {
104           LOG.error(e1);
105         }
106       } else {
107         LOG.info("We're not the primary replicas.");
108       }
109     }
110   }
111 
112   @BeforeClass
113   public static void beforeClass() throws Exception {
114     // enable store file refreshing
115     HTU.getConfiguration().setInt(
116         StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD, REFRESH_PERIOD);
117 
118     HTU.getConfiguration().setFloat("hbase.regionserver.logroll.multiplier", 0.0001f);
119     HTU.getConfiguration().setInt("replication.source.size.capacity", 10240);
120     HTU.getConfiguration().setLong("replication.source.sleepforretries", 100);
121     HTU.getConfiguration().setInt("hbase.regionserver.maxlogs", 2);
122     HTU.getConfiguration().setLong("hbase.master.logcleaner.ttl", 10);
123     HTU.getConfiguration().setInt("zookeeper.recovery.retry", 1);
124     HTU.getConfiguration().setInt("zookeeper.recovery.retry.intervalmill", 10);
125 
126     HTU.startMiniCluster(NB_SERVERS);
127     HTU.getHBaseCluster().startMaster();
128   }
129 
130   @AfterClass
131   public static void afterClass() throws Exception {
132     HTU2.shutdownMiniCluster();
133     HTU.shutdownMiniCluster();
134   }
135 
136   @Test (timeout=30000)
137   public void testCreateDeleteTable() throws IOException {
138     // Create table then get the single region for our new table.
139     HTableDescriptor hdt = HTU.createTableDescriptor("testCreateDeleteTable");
140     hdt.setRegionReplication(NB_SERVERS);
141     hdt.addCoprocessor(SlowMeCopro.class.getName());
142     Table table = HTU.createTable(hdt, new byte[][]{f}, HTU.getConfiguration());
143 
144     Put p = new Put(row);
145     p.add(f, row, row);
146     table.put(p);
147 
148     Get g = new Get(row);
149     Result r = table.get(g);
150     Assert.assertFalse(r.isStale());
151 
152     try {
153       // But if we ask for stale we will get it
154       SlowMeCopro.cdl.set(new CountDownLatch(1));
155       g = new Get(row);
156       g.setConsistency(Consistency.TIMELINE);
157       r = table.get(g);
158       Assert.assertTrue(r.isStale());
159       SlowMeCopro.cdl.get().countDown();
160     } finally {
161       SlowMeCopro.cdl.get().countDown();
162       SlowMeCopro.sleepTime.set(0);
163     }
164 
165     HTU.getHBaseAdmin().disableTable(hdt.getTableName());
166     HTU.deleteTable(hdt.getTableName());
167   }
168 
169   @Test (timeout=120000)
170   public void testChangeTable() throws Exception {
171     HTableDescriptor hdt = HTU.createTableDescriptor("testChangeTable");
172     hdt.setRegionReplication(NB_SERVERS);
173     hdt.addCoprocessor(SlowMeCopro.class.getName());
174     Table table = HTU.createTable(hdt, new byte[][]{f}, HTU.getConfiguration());
175 
176     // basic test: it should work.
177     Put p = new Put(row);
178     p.add(f, row, row);
179     table.put(p);
180 
181     Get g = new Get(row);
182     Result r = table.get(g);
183     Assert.assertFalse(r.isStale());
184 
185     // Add a CF, it should work.
186     HTableDescriptor bHdt = HTU.getHBaseAdmin().getTableDescriptor(hdt.getTableName());
187     HColumnDescriptor hcd = new HColumnDescriptor(row);
188     hdt.addFamily(hcd);
189     HTU.getHBaseAdmin().disableTable(hdt.getTableName());
190     HTU.getHBaseAdmin().modifyTable(hdt.getTableName(), hdt);
191     HTU.getHBaseAdmin().enableTable(hdt.getTableName());
192     HTableDescriptor nHdt = HTU.getHBaseAdmin().getTableDescriptor(hdt.getTableName());
193     Assert.assertEquals("fams=" + Arrays.toString(nHdt.getColumnFamilies()),
194         bHdt.getColumnFamilies().length + 1, nHdt.getColumnFamilies().length);
195 
196     p = new Put(row);
197     p.add(row, row, row);
198     table.put(p);
199 
200     g = new Get(row);
201     r = table.get(g);
202     Assert.assertFalse(r.isStale());
203 
204     try {
205       SlowMeCopro.cdl.set(new CountDownLatch(1));
206       g = new Get(row);
207       g.setConsistency(Consistency.TIMELINE);
208       r = table.get(g);
209       Assert.assertTrue(r.isStale());
210     } finally {
211       SlowMeCopro.cdl.get().countDown();
212       SlowMeCopro.sleepTime.set(0);
213     }
214 
215     HTU.getHBaseCluster().stopMaster(0);
216     Admin admin = new HBaseAdmin(HTU.getConfiguration());
217     nHdt =admin.getTableDescriptor(hdt.getTableName());
218     Assert.assertEquals("fams=" + Arrays.toString(nHdt.getColumnFamilies()),
219         bHdt.getColumnFamilies().length + 1, nHdt.getColumnFamilies().length);
220 
221     admin.disableTable(hdt.getTableName());
222     admin.deleteTable(hdt.getTableName());
223     HTU.getHBaseCluster().startMaster();
224     admin.close();
225   }
226 
227   @SuppressWarnings("deprecation")
228   @Test (timeout=300000)
229   public void testReplicaAndReplication() throws Exception {
230     HTableDescriptor hdt = HTU.createTableDescriptor("testReplicaAndReplication");
231     hdt.setRegionReplication(NB_SERVERS);
232 
233     HColumnDescriptor fam = new HColumnDescriptor(row);
234     fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
235     hdt.addFamily(fam);
236 
237     hdt.addCoprocessor(SlowMeCopro.class.getName());
238     HTU.getHBaseAdmin().createTable(hdt, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);
239 
240     Configuration conf2 = HBaseConfiguration.create(HTU.getConfiguration());
241     conf2.set(HConstants.HBASE_CLIENT_INSTANCE_ID, String.valueOf(-1));
242     conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2");
243     MiniZooKeeperCluster miniZK = HTU.getZkCluster();
244 
245     HTU2 = new HBaseTestingUtility(conf2);
246     HTU2.setZkCluster(miniZK);
247     HTU2.startMiniCluster(NB_SERVERS);
248     LOG.info("Setup second Zk");
249     HTU2.getHBaseAdmin().createTable(hdt, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);
250 
251     ReplicationAdmin admin = new ReplicationAdmin(HTU.getConfiguration());
252     admin.addPeer("2", HTU2.getClusterKey());
253     admin.close();
254 
255     Put p = new Put(row);
256     p.add(row, row, row);
257     final Table table = new HTable(HTU.getConfiguration(), hdt.getTableName());
258     table.put(p);
259 
260     HTU.getHBaseAdmin().flush(table.getName());
261     LOG.info("Put & flush done on the first cluster. Now doing a get on the same cluster.");
262 
263     Waiter.waitFor(HTU.getConfiguration(), 1000, new Waiter.Predicate<Exception>() {
264       @Override
265       public boolean evaluate() throws Exception {
266         try {
267           SlowMeCopro.cdl.set(new CountDownLatch(1));
268           Get g = new Get(row);
269           g.setConsistency(Consistency.TIMELINE);
270           Result r = table.get(g);
271           Assert.assertTrue(r.isStale());
272           return  !r.isEmpty();
273         } finally {
274           SlowMeCopro.cdl.get().countDown();
275           SlowMeCopro.sleepTime.set(0);
276         }
277       }});
278     table.close();
279     LOG.info("stale get on the first cluster done. Now for the second.");
280 
281     final Table table2 = new HTable(HTU.getConfiguration(), hdt.getTableName());
282     Waiter.waitFor(HTU.getConfiguration(), 1000, new Waiter.Predicate<Exception>() {
283       @Override
284       public boolean evaluate() throws Exception {
285         try {
286           SlowMeCopro.cdl.set(new CountDownLatch(1));
287           Get g = new Get(row);
288           g.setConsistency(Consistency.TIMELINE);
289           Result r = table2.get(g);
290           Assert.assertTrue(r.isStale());
291           return  !r.isEmpty();
292         } finally {
293           SlowMeCopro.cdl.get().countDown();
294           SlowMeCopro.sleepTime.set(0);
295         }
296       }});
297     table2.close();
298 
299     HTU.getHBaseAdmin().disableTable(hdt.getTableName());
300     HTU.deleteTable(hdt.getTableName());
301 
302     HTU2.getHBaseAdmin().disableTable(hdt.getTableName());
303     HTU2.deleteTable(hdt.getTableName());
304 
305     // We shutdown HTU2 minicluster later, in afterClass(), as shutting down
306     // the minicluster has negative impact of deleting all HConnections in JVM.
307   }
308 
309   @Test (timeout=30000)
310   public void testBulkLoad() throws IOException {
311     // Create table then get the single region for our new table.
312     LOG.debug("Creating test table");
313     HTableDescriptor hdt = HTU.createTableDescriptor("testBulkLoad");
314     hdt.setRegionReplication(NB_SERVERS);
315     hdt.addCoprocessor(SlowMeCopro.class.getName());
316     Table table = HTU.createTable(hdt, new byte[][]{f}, HTU.getConfiguration());
317 
318     // create hfiles to load.
319     LOG.debug("Creating test data");
320     Path dir = HTU.getDataTestDirOnTestFS("testBulkLoad");
321     final int numRows = 10;
322     final byte[] qual = Bytes.toBytes("qual");
323     final byte[] val  = Bytes.toBytes("val");
324     final List<Pair<byte[], String>> famPaths = new ArrayList<Pair<byte[], String>>();
325     for (HColumnDescriptor col : hdt.getColumnFamilies()) {
326       Path hfile = new Path(dir, col.getNameAsString());
327       TestHRegionServerBulkLoad.createHFile(HTU.getTestFileSystem(), hfile, col.getName(),
328         qual, val, numRows);
329       famPaths.add(new Pair<byte[], String>(col.getName(), hfile.toString()));
330     }
331 
332     // bulk load HFiles
333     LOG.debug("Loading test data");
334     @SuppressWarnings("deprecation")
335     final HConnection conn = HTU.getHBaseAdmin().getConnection();
336     RegionServerCallable<Void> callable = new RegionServerCallable<Void>(
337       conn, hdt.getTableName(), TestHRegionServerBulkLoad.rowkey(0)) {
338         @Override
339         public Void call(int timeout) throws Exception {
340           LOG.debug("Going to connect to server " + getLocation() + " for row "
341             + Bytes.toStringBinary(getRow()));
342           byte[] regionName = getLocation().getRegionInfo().getRegionName();
343           BulkLoadHFileRequest request =
344             RequestConverter.buildBulkLoadHFileRequest(famPaths, regionName, true);
345           getStub().bulkLoadHFile(null, request);
346           return null;
347         }
348       };
349     RpcRetryingCallerFactory factory = new RpcRetryingCallerFactory(HTU.getConfiguration());
350     RpcRetryingCaller<Void> caller = factory.<Void> newCaller();
351     caller.callWithRetries(callable, 10000);
352 
353     // verify we can read them from the primary
354     LOG.debug("Verifying data load");
355     for (int i = 0; i < numRows; i++) {
356       byte[] row = TestHRegionServerBulkLoad.rowkey(i);
357       Get g = new Get(row);
358       Result r = table.get(g);
359       Assert.assertFalse(r.isStale());
360     }
361 
362     // verify we can read them from the replica
363     LOG.debug("Verifying replica queries");
364     try {
365       SlowMeCopro.cdl.set(new CountDownLatch(1));
366       for (int i = 0; i < numRows; i++) {
367         byte[] row = TestHRegionServerBulkLoad.rowkey(i);
368         Get g = new Get(row);
369         g.setConsistency(Consistency.TIMELINE);
370         Result r = table.get(g);
371         Assert.assertTrue(r.isStale());
372       }
373       SlowMeCopro.cdl.get().countDown();
374     } finally {
375       SlowMeCopro.cdl.get().countDown();
376       SlowMeCopro.sleepTime.set(0);
377     }
378 
379     HTU.getHBaseAdmin().disableTable(hdt.getTableName());
380     HTU.deleteTable(hdt.getTableName());
381   }
382 }