1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.regionserver;
20
21 import static org.apache.hadoop.hbase.regionserver.TestRegionServerNoMaster.*;
22 import java.io.IOException;
23 import java.util.Random;
24 import java.util.concurrent.ExecutorService;
25 import java.util.concurrent.Executors;
26 import java.util.concurrent.TimeUnit;
27 import java.util.concurrent.atomic.AtomicBoolean;
28 import java.util.concurrent.atomic.AtomicReference;
29
30 import org.apache.commons.logging.Log;
31 import org.apache.commons.logging.LogFactory;
32 import org.apache.hadoop.hbase.Cell;
33 import org.apache.hadoop.hbase.HBaseTestingUtility;
34 import org.apache.hadoop.hbase.HConstants;
35 import org.apache.hadoop.hbase.HRegionInfo;
36 import org.apache.hadoop.hbase.testclassification.MediumTests;
37 import org.apache.hadoop.hbase.TableName;
38 import org.apache.hadoop.hbase.TestMetaTableAccessor;
39 import org.apache.hadoop.hbase.client.Consistency;
40 import org.apache.hadoop.hbase.client.Get;
41 import org.apache.hadoop.hbase.client.HTable;
42 import org.apache.hadoop.hbase.client.Put;
43 import org.apache.hadoop.hbase.client.Result;
44 import org.apache.hadoop.hbase.client.Table;
45 import org.apache.hadoop.hbase.io.hfile.HFileScanner;
46 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
47 import org.apache.hadoop.hbase.protobuf.RequestConverter;
48 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
49 import org.apache.hadoop.hbase.util.Bytes;
50 import org.apache.hadoop.hbase.util.Threads;
51 import org.apache.hadoop.hbase.zookeeper.ZKAssign;
52 import org.apache.hadoop.hdfs.DFSConfigKeys;
53 import org.apache.hadoop.util.StringUtils;
54 import org.junit.After;
55 import org.junit.AfterClass;
56 import org.junit.Assert;
57 import org.junit.BeforeClass;
58 import org.junit.Test;
59 import org.junit.experimental.categories.Category;
60
61 import com.google.protobuf.ServiceException;
62
63
64
65
66
67 @Category(MediumTests.class)
68 public class TestRegionReplicas {
69 private static final Log LOG = LogFactory.getLog(TestRegionReplicas.class);
70
71 private static final int NB_SERVERS = 1;
72 private static HTable table;
73 private static final byte[] row = "TestRegionReplicas".getBytes();
74
75 private static HRegionInfo hriPrimary;
76 private static HRegionInfo hriSecondary;
77
78 private static final HBaseTestingUtility HTU = new HBaseTestingUtility();
79 private static final byte[] f = HConstants.CATALOG_FAMILY;
80
81 @BeforeClass
82 public static void before() throws Exception {
83
84
85 HTU.getConfiguration().setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 8192);
86 HTU.getConfiguration().setInt(DFSConfigKeys.DFS_CLIENT_READ_PREFETCH_SIZE_KEY, 1);
87 HTU.getConfiguration().setInt(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 128 * 1024 * 1024);
88
89 HTU.startMiniCluster(NB_SERVERS);
90 final TableName tableName = TableName.valueOf(TestRegionReplicas.class.getSimpleName());
91
92
93 table = HTU.createTable(tableName, f);
94
95 hriPrimary = table.getRegionLocation(row, false).getRegionInfo();
96
97
98 hriSecondary = new HRegionInfo(hriPrimary.getTable(), hriPrimary.getStartKey(),
99 hriPrimary.getEndKey(), hriPrimary.isSplit(), hriPrimary.getRegionId(), 1);
100
101
102 TestRegionServerNoMaster.stopMasterAndAssignMeta(HTU);
103 }
104
105 @AfterClass
106 public static void afterClass() throws Exception {
107 table.close();
108 HTU.shutdownMiniCluster();
109 }
110
111 @After
112 public void after() throws Exception {
113
114
115
116 ZKAssign.deleteNodeFailSilent(HTU.getZooKeeperWatcher(), hriPrimary);
117 }
118
119 private HRegionServer getRS() {
120 return HTU.getMiniHBaseCluster().getRegionServer(0);
121 }
122
123 @Test(timeout = 60000)
124 public void testOpenRegionReplica() throws Exception {
125 openRegion(HTU, getRS(), hriSecondary);
126 try {
127
128 HTU.loadNumericRows(table, f, 0, 1000);
129
130
131 Assert.assertEquals(1000, HTU.countRows(table));
132 } finally {
133 HTU.deleteNumericRows(table, f, 0, 1000);
134 closeRegion(HTU, getRS(), hriSecondary);
135 }
136 }
137
138
139 @Test(timeout = 60000)
140 public void testRegionReplicaUpdatesMetaLocation() throws Exception {
141 openRegion(HTU, getRS(), hriSecondary);
142 Table meta = null;
143 try {
144 meta = HTU.getConnection().getTable(TableName.META_TABLE_NAME);
145 TestMetaTableAccessor.assertMetaLocation(meta, hriPrimary.getRegionName()
146 , getRS().getServerName(), -1, 1, false);
147 } finally {
148 if (meta != null ) meta.close();
149 closeRegion(HTU, getRS(), hriSecondary);
150 }
151 }
152
153 @Test(timeout = 60000)
154 public void testRegionReplicaGets() throws Exception {
155 try {
156
157 HTU.loadNumericRows(table, f, 0, 1000);
158
159 Assert.assertEquals(1000, HTU.countRows(table));
160
161 Region region = getRS().getRegionByEncodedName(hriPrimary.getEncodedName());
162 region.flush(true);
163
164 openRegion(HTU, getRS(), hriSecondary);
165
166
167 region = getRS().getFromOnlineRegions(hriSecondary.getEncodedName());
168 assertGet(region, 42, true);
169
170 assertGetRpc(hriSecondary, 42, true);
171 } finally {
172 HTU.deleteNumericRows(table, HConstants.CATALOG_FAMILY, 0, 1000);
173 closeRegion(HTU, getRS(), hriSecondary);
174 }
175 }
176
177 @Test(timeout = 60000)
178 public void testGetOnTargetRegionReplica() throws Exception {
179 try {
180
181 HTU.loadNumericRows(table, f, 0, 1000);
182
183 Assert.assertEquals(1000, HTU.countRows(table));
184
185 Region region = getRS().getRegionByEncodedName(hriPrimary.getEncodedName());
186 region.flush(true);
187
188 openRegion(HTU, getRS(), hriSecondary);
189
190
191 byte[] row = Bytes.toBytes(String.valueOf(42));
192 Get get = new Get(row);
193 get.setConsistency(Consistency.TIMELINE);
194 get.setReplicaId(1);
195 Result result = table.get(get);
196 Assert.assertArrayEquals(row, result.getValue(f, null));
197 } finally {
198 HTU.deleteNumericRows(table, HConstants.CATALOG_FAMILY, 0, 1000);
199 closeRegion(HTU, getRS(), hriSecondary);
200 }
201 }
202
203 private void assertGet(Region region, int value, boolean expect) throws IOException {
204 byte[] row = Bytes.toBytes(String.valueOf(value));
205 Get get = new Get(row);
206 Result result = region.get(get);
207 if (expect) {
208 Assert.assertArrayEquals(row, result.getValue(f, null));
209 } else {
210 result.isEmpty();
211 }
212 }
213
214
215 private void assertGetRpc(HRegionInfo info, int value, boolean expect)
216 throws IOException, ServiceException {
217 byte[] row = Bytes.toBytes(String.valueOf(value));
218 Get get = new Get(row);
219 ClientProtos.GetRequest getReq = RequestConverter.buildGetRequest(info.getRegionName(), get);
220 ClientProtos.GetResponse getResp = getRS().getRSRpcServices().get(null, getReq);
221 Result result = ProtobufUtil.toResult(getResp.getResult());
222 if (expect) {
223 Assert.assertArrayEquals(row, result.getValue(f, null));
224 } else {
225 result.isEmpty();
226 }
227 }
228
229 private void restartRegionServer() throws Exception {
230 afterClass();
231 before();
232 }
233
234 @Test(timeout = 300000)
235 public void testRefreshStoreFiles() throws Exception {
236
237 final int refreshPeriod = 2000;
238 HTU.getConfiguration().setInt("hbase.hstore.compactionThreshold", 100);
239 HTU.getConfiguration().setInt(StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD,
240 refreshPeriod);
241
242 restartRegionServer();
243
244 try {
245 LOG.info("Opening the secondary region " + hriSecondary.getEncodedName());
246 openRegion(HTU, getRS(), hriSecondary);
247
248
249 LOG.info("Loading data to primary region");
250 HTU.loadNumericRows(table, f, 0, 1000);
251
252 Assert.assertEquals(1000, HTU.countRows(table));
253
254 LOG.info("Flushing primary region");
255 Region region = getRS().getRegionByEncodedName(hriPrimary.getEncodedName());
256 region.flush(true);
257
258
259 LOG.info("Sleeping for " + (4 * refreshPeriod));
260 Threads.sleep(4 * refreshPeriod);
261
262 LOG.info("Checking results from secondary region replica");
263 Region secondaryRegion = getRS().getFromOnlineRegions(hriSecondary.getEncodedName());
264 Assert.assertEquals(1, secondaryRegion.getStore(f).getStorefilesCount());
265
266 assertGet(secondaryRegion, 42, true);
267 assertGetRpc(hriSecondary, 42, true);
268 assertGetRpc(hriSecondary, 1042, false);
269
270
271 HTU.loadNumericRows(table, f, 1000, 1100);
272 region = getRS().getRegionByEncodedName(hriPrimary.getEncodedName());
273 region.flush(true);
274
275 HTU.loadNumericRows(table, f, 2000, 2100);
276 region = getRS().getRegionByEncodedName(hriPrimary.getEncodedName());
277 region.flush(true);
278
279
280 Threads.sleep(4 * refreshPeriod);
281
282 assertGetRpc(hriSecondary, 42, true);
283 assertGetRpc(hriSecondary, 1042, true);
284 assertGetRpc(hriSecondary, 2042, true);
285
286
287 Assert.assertEquals(3, secondaryRegion.getStore(f).getStorefilesCount());
288
289
290 HTU.compact(table.getName(), true);
291
292 long wakeUpTime = System.currentTimeMillis() + 4 * refreshPeriod;
293 while (System.currentTimeMillis() < wakeUpTime) {
294 assertGetRpc(hriSecondary, 42, true);
295 assertGetRpc(hriSecondary, 1042, true);
296 assertGetRpc(hriSecondary, 2042, true);
297 Threads.sleep(10);
298 }
299
300
301 Assert.assertEquals(1, secondaryRegion.getStore(f).getStorefilesCount());
302
303 } finally {
304 HTU.deleteNumericRows(table, HConstants.CATALOG_FAMILY, 0, 1000);
305 closeRegion(HTU, getRS(), hriSecondary);
306 }
307 }
308
309 @Test(timeout = 300000)
310 public void testFlushAndCompactionsInPrimary() throws Exception {
311
312 long runtime = 30 * 1000;
313
314 final int refreshPeriod = 100;
315 HTU.getConfiguration().setInt("hbase.hstore.compactionThreshold", 3);
316 HTU.getConfiguration().setInt(StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD, refreshPeriod);
317
318 restartRegionServer();
319 final int startKey = 0, endKey = 1000;
320
321 try {
322 openRegion(HTU, getRS(), hriSecondary);
323
324
325 HTU.loadNumericRows(table, f, startKey, endKey);
326 TestRegionServerNoMaster.flushRegion(HTU, hriPrimary);
327
328 Threads.sleep(2 * refreshPeriod);
329
330 final AtomicBoolean running = new AtomicBoolean(true);
331 @SuppressWarnings("unchecked")
332 final AtomicReference<Exception>[] exceptions = new AtomicReference[3];
333 for (int i=0; i < exceptions.length; i++) {
334 exceptions[i] = new AtomicReference<Exception>();
335 }
336
337 Runnable writer = new Runnable() {
338 int key = startKey;
339 @Override
340 public void run() {
341 try {
342 while (running.get()) {
343 byte[] data = Bytes.toBytes(String.valueOf(key));
344 Put put = new Put(data);
345 put.add(f, null, data);
346 table.put(put);
347 key++;
348 if (key == endKey) key = startKey;
349 }
350 } catch (Exception ex) {
351 LOG.warn(ex);
352 exceptions[0].compareAndSet(null, ex);
353 }
354 }
355 };
356
357 Runnable flusherCompactor = new Runnable() {
358 Random random = new Random();
359 @Override
360 public void run() {
361 try {
362 while (running.get()) {
363
364 if (random.nextBoolean()) {
365 TestRegionServerNoMaster.flushRegion(HTU, hriPrimary);
366 } else {
367 HTU.compact(table.getName(), random.nextBoolean());
368 }
369 }
370 } catch (Exception ex) {
371 LOG.warn(ex);
372 exceptions[1].compareAndSet(null, ex);
373 }
374 }
375 };
376
377 Runnable reader = new Runnable() {
378 Random random = new Random();
379 @Override
380 public void run() {
381 try {
382 while (running.get()) {
383
384 if (random.nextInt(10) == 0) {
385 try {
386 closeRegion(HTU, getRS(), hriSecondary);
387 } catch (Exception ex) {
388 LOG.warn("Failed closing the region " + hriSecondary + " " + StringUtils.stringifyException(ex));
389 exceptions[2].compareAndSet(null, ex);
390 }
391 try {
392 openRegion(HTU, getRS(), hriSecondary);
393 } catch (Exception ex) {
394 LOG.warn("Failed opening the region " + hriSecondary + " " + StringUtils.stringifyException(ex));
395 exceptions[2].compareAndSet(null, ex);
396 }
397 }
398
399 int key = random.nextInt(endKey - startKey) + startKey;
400 assertGetRpc(hriSecondary, key, true);
401 }
402 } catch (Exception ex) {
403 LOG.warn("Failed getting the value in the region " + hriSecondary + " " + StringUtils.stringifyException(ex));
404 exceptions[2].compareAndSet(null, ex);
405 }
406 }
407 };
408
409 LOG.info("Starting writer and reader");
410 ExecutorService executor = Executors.newFixedThreadPool(3);
411 executor.submit(writer);
412 executor.submit(flusherCompactor);
413 executor.submit(reader);
414
415
416 Threads.sleep(runtime);
417 running.set(false);
418 executor.shutdown();
419 executor.awaitTermination(30, TimeUnit.SECONDS);
420
421 for (AtomicReference<Exception> exRef : exceptions) {
422 Assert.assertNull(exRef.get());
423 }
424 } finally {
425 HTU.deleteNumericRows(table, HConstants.CATALOG_FAMILY, startKey, endKey);
426 closeRegion(HTU, getRS(), hriSecondary);
427 }
428 }
429
430 @Test(timeout = 300000)
431 public void testVerifySecondaryAbilityToReadWithOnFiles() throws Exception {
432
433 HTU.getConfiguration().setInt(StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD, 0);
434 restartRegionServer();
435
436 try {
437 LOG.info("Opening the secondary region " + hriSecondary.getEncodedName());
438 openRegion(HTU, getRS(), hriSecondary);
439
440
441 LOG.info("Loading data to primary region");
442 for (int i = 0; i < 3; ++i) {
443 HTU.loadNumericRows(table, f, i * 1000, (i + 1) * 1000);
444 Region region = getRS().getRegionByEncodedName(hriPrimary.getEncodedName());
445 region.flush(true);
446 }
447
448 Region primaryRegion = getRS().getFromOnlineRegions(hriPrimary.getEncodedName());
449 Assert.assertEquals(3, primaryRegion.getStore(f).getStorefilesCount());
450
451
452 Region secondaryRegion = getRS().getFromOnlineRegions(hriSecondary.getEncodedName());
453 secondaryRegion.getStore(f).refreshStoreFiles();
454 Assert.assertEquals(3, secondaryRegion.getStore(f).getStorefilesCount());
455
456
457 LOG.info("Force Major compaction on primary region " + hriPrimary);
458 primaryRegion.compact(true);
459 Assert.assertEquals(1, primaryRegion.getStore(f).getStorefilesCount());
460
461
462
463
464
465 int keys = 0;
466 int sum = 0;
467 for (StoreFile sf: secondaryRegion.getStore(f).getStorefiles()) {
468
469 LOG.debug(getRS().getFileSystem().exists(sf.getPath()));
470 Assert.assertFalse(getRS().getFileSystem().exists(sf.getPath()));
471
472 HFileScanner scanner = sf.getReader().getScanner(false, false);
473 scanner.seekTo();
474 do {
475 keys++;
476
477 Cell cell = scanner.getKeyValue();
478 sum += Integer.parseInt(Bytes.toString(cell.getRowArray(),
479 cell.getRowOffset(), cell.getRowLength()));
480 } while (scanner.next());
481 }
482 Assert.assertEquals(3000, keys);
483 Assert.assertEquals(4498500, sum);
484 } finally {
485 HTU.deleteNumericRows(table, HConstants.CATALOG_FAMILY, 0, 1000);
486 closeRegion(HTU, getRS(), hriSecondary);
487 }
488 }
489 }