1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase;
20
21 import java.io.IOException;
22 import java.security.PrivilegedAction;
23 import java.util.ArrayList;
24 import java.util.List;
25
26 import org.apache.commons.logging.Log;
27 import org.apache.commons.logging.LogFactory;
28 import org.apache.hadoop.hbase.classification.InterfaceAudience;
29 import org.apache.hadoop.hbase.classification.InterfaceStability;
30 import org.apache.hadoop.conf.Configuration;
31 import org.apache.hadoop.fs.FileSystem;
32 import org.apache.hadoop.hbase.master.HMaster;
33 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
34 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService;
35 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.MasterService;
36 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStartupResponse;
37 import org.apache.hadoop.hbase.regionserver.HRegion;
38 import org.apache.hadoop.hbase.regionserver.HRegionServer;
39 import org.apache.hadoop.hbase.regionserver.Region;
40 import org.apache.hadoop.hbase.security.User;
41 import org.apache.hadoop.hbase.test.MetricsAssertHelper;
42 import org.apache.hadoop.hbase.util.JVMClusterUtil;
43 import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread;
44 import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
45 import org.apache.hadoop.hbase.util.Threads;
46
47
48
49
50
51
52
53 @InterfaceAudience.Public
54 @InterfaceStability.Evolving
55 public class MiniHBaseCluster extends HBaseCluster {
56 private static final Log LOG = LogFactory.getLog(MiniHBaseCluster.class.getName());
57 public LocalHBaseCluster hbaseCluster;
58 private static int index;
59
60
61
62
63
64
65
66 public MiniHBaseCluster(Configuration conf, int numRegionServers)
67 throws IOException, InterruptedException {
68 this(conf, 1, numRegionServers);
69 }
70
71
72
73
74
75
76
77
78 public MiniHBaseCluster(Configuration conf, int numMasters,
79 int numRegionServers)
80 throws IOException, InterruptedException {
81 this(conf, numMasters, numRegionServers, null, null);
82 }
83
84 public MiniHBaseCluster(Configuration conf, int numMasters, int numRegionServers,
85 Class<? extends HMaster> masterClass,
86 Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> regionserverClass)
87 throws IOException, InterruptedException {
88 super(conf);
89 conf.set(HConstants.MASTER_PORT, "0");
90
91
92 CompatibilityFactory.getInstance(MetricsAssertHelper.class).init();
93
94 init(numMasters, numRegionServers, masterClass, regionserverClass);
95 this.initialClusterStatus = getClusterStatus();
96 }
97
98 public Configuration getConfiguration() {
99 return this.conf;
100 }
101
102
103
104
105
106
107
108 public static class MiniHBaseClusterRegionServer extends HRegionServer {
109 private Thread shutdownThread = null;
110 private User user = null;
111 public static boolean TEST_SKIP_CLOSE = false;
112
113 public MiniHBaseClusterRegionServer(Configuration conf, CoordinatedStateManager cp)
114 throws IOException, InterruptedException {
115 super(conf, cp);
116 this.user = User.getCurrent();
117 }
118
119
120
121
122
123
124
125
126
127 @Override
128 protected void handleReportForDutyResponse(
129 final RegionServerStartupResponse c) throws IOException {
130 super.handleReportForDutyResponse(c);
131
132 this.shutdownThread = new SingleFileSystemShutdownThread(getFileSystem());
133 }
134
135 @Override
136 public void run() {
137 try {
138 this.user.runAs(new PrivilegedAction<Object>(){
139 public Object run() {
140 runRegionServer();
141 return null;
142 }
143 });
144 } catch (Throwable t) {
145 LOG.error("Exception in run", t);
146 } finally {
147
148 if (this.shutdownThread != null) {
149 this.shutdownThread.start();
150 Threads.shutdown(this.shutdownThread, 30000);
151 }
152 }
153 }
154
155 private void runRegionServer() {
156 super.run();
157 }
158
159 @Override
160 public void kill() {
161 super.kill();
162 }
163
164 public void abort(final String reason, final Throwable cause) {
165 this.user.runAs(new PrivilegedAction<Object>() {
166 public Object run() {
167 abortRegionServer(reason, cause);
168 return null;
169 }
170 });
171 }
172
173 private void abortRegionServer(String reason, Throwable cause) {
174 super.abort(reason, cause);
175 }
176 }
177
178
179
180
181
182 static class SingleFileSystemShutdownThread extends Thread {
183 private final FileSystem fs;
184 SingleFileSystemShutdownThread(final FileSystem fs) {
185 super("Shutdown of " + fs);
186 this.fs = fs;
187 }
188 @Override
189 public void run() {
190 try {
191 LOG.info("Hook closing fs=" + this.fs);
192 this.fs.close();
193 } catch (NullPointerException npe) {
194 LOG.debug("Need to fix these: " + npe.toString());
195 } catch (IOException e) {
196 LOG.warn("Running hook", e);
197 }
198 }
199 }
200
201 private void init(final int nMasterNodes, final int nRegionNodes,
202 Class<? extends HMaster> masterClass,
203 Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> regionserverClass)
204 throws IOException, InterruptedException {
205 try {
206 if (masterClass == null){
207 masterClass = HMaster.class;
208 }
209 if (regionserverClass == null){
210 regionserverClass = MiniHBaseCluster.MiniHBaseClusterRegionServer.class;
211 }
212
213
214 hbaseCluster = new LocalHBaseCluster(conf, nMasterNodes, 0,
215 masterClass, regionserverClass);
216
217
218 for (int i=0; i<nRegionNodes; i++) {
219 Configuration rsConf = HBaseConfiguration.create(conf);
220 User user = HBaseTestingUtility.getDifferentUser(rsConf,
221 ".hfs."+index++);
222 hbaseCluster.addRegionServer(rsConf, i, user);
223 }
224
225 hbaseCluster.startup();
226 } catch (IOException e) {
227 shutdown();
228 throw e;
229 } catch (Throwable t) {
230 LOG.error("Error starting cluster", t);
231 shutdown();
232 throw new IOException("Shutting down", t);
233 }
234 }
235
236 @Override
237 public void startRegionServer(String hostname, int port) throws IOException {
238 this.startRegionServer();
239 }
240
241 @Override
242 public void killRegionServer(ServerName serverName) throws IOException {
243 HRegionServer server = getRegionServer(getRegionServerIndex(serverName));
244 if (server instanceof MiniHBaseClusterRegionServer) {
245 LOG.info("Killing " + server.toString());
246 ((MiniHBaseClusterRegionServer) server).kill();
247 } else {
248 abortRegionServer(getRegionServerIndex(serverName));
249 }
250 }
251
252 @Override
253 public void stopRegionServer(ServerName serverName) throws IOException {
254 stopRegionServer(getRegionServerIndex(serverName));
255 }
256
257 @Override
258 public void waitForRegionServerToStop(ServerName serverName, long timeout) throws IOException {
259
260 waitOnRegionServer(getRegionServerIndex(serverName));
261 }
262
263 @Override
264 public void startZkNode(String hostname, int port) throws IOException {
265 LOG.warn("Starting zookeeper nodes on mini cluster is not supported");
266 }
267
268 @Override
269 public void killZkNode(ServerName serverName) throws IOException {
270 LOG.warn("Aborting zookeeper nodes on mini cluster is not supported");
271 }
272
273 @Override
274 public void stopZkNode(ServerName serverName) throws IOException {
275 LOG.warn("Stopping zookeeper nodes on mini cluster is not supported");
276 }
277
278 @Override
279 public void waitForZkNodeToStart(ServerName serverName, long timeout) throws IOException {
280 LOG.warn("Waiting for zookeeper nodes to start on mini cluster is not supported");
281 }
282
283 @Override
284 public void waitForZkNodeToStop(ServerName serverName, long timeout) throws IOException {
285 LOG.warn("Waiting for zookeeper nodes to stop on mini cluster is not supported");
286 }
287
288 @Override
289 public void startDataNode(ServerName serverName) throws IOException {
290 LOG.warn("Starting datanodes on mini cluster is not supported");
291 }
292
293 @Override
294 public void killDataNode(ServerName serverName) throws IOException {
295 LOG.warn("Aborting datanodes on mini cluster is not supported");
296 }
297
298 @Override
299 public void stopDataNode(ServerName serverName) throws IOException {
300 LOG.warn("Stopping datanodes on mini cluster is not supported");
301 }
302
303 @Override
304 public void waitForDataNodeToStart(ServerName serverName, long timeout) throws IOException {
305 LOG.warn("Waiting for datanodes to start on mini cluster is not supported");
306 }
307
308 @Override
309 public void waitForDataNodeToStop(ServerName serverName, long timeout) throws IOException {
310 LOG.warn("Waiting for datanodes to stop on mini cluster is not supported");
311 }
312
313 @Override
314 public void startMaster(String hostname, int port) throws IOException {
315 this.startMaster();
316 }
317
318 @Override
319 public void killMaster(ServerName serverName) throws IOException {
320 abortMaster(getMasterIndex(serverName));
321 }
322
323 @Override
324 public void stopMaster(ServerName serverName) throws IOException {
325 stopMaster(getMasterIndex(serverName));
326 }
327
328 @Override
329 public void waitForMasterToStop(ServerName serverName, long timeout) throws IOException {
330
331 waitOnMaster(getMasterIndex(serverName));
332 }
333
334
335
336
337
338
339
340 public JVMClusterUtil.RegionServerThread startRegionServer()
341 throws IOException {
342 final Configuration newConf = HBaseConfiguration.create(conf);
343 User rsUser =
344 HBaseTestingUtility.getDifferentUser(newConf, ".hfs."+index++);
345 JVMClusterUtil.RegionServerThread t = null;
346 try {
347 t = hbaseCluster.addRegionServer(
348 newConf, hbaseCluster.getRegionServers().size(), rsUser);
349 t.start();
350 t.waitForServerOnline();
351 } catch (InterruptedException ie) {
352 throw new IOException("Interrupted adding regionserver to cluster", ie);
353 }
354 return t;
355 }
356
357
358
359
360
361 public String abortRegionServer(int serverNumber) {
362 HRegionServer server = getRegionServer(serverNumber);
363 LOG.info("Aborting " + server.toString());
364 server.abort("Aborting for tests", new Exception("Trace info"));
365 return server.toString();
366 }
367
368
369
370
371
372
373
374 public JVMClusterUtil.RegionServerThread stopRegionServer(int serverNumber) {
375 return stopRegionServer(serverNumber, true);
376 }
377
378
379
380
381
382
383
384
385
386
387
388 public JVMClusterUtil.RegionServerThread stopRegionServer(int serverNumber,
389 final boolean shutdownFS) {
390 JVMClusterUtil.RegionServerThread server =
391 hbaseCluster.getRegionServers().get(serverNumber);
392 LOG.info("Stopping " + server.toString());
393 server.getRegionServer().stop("Stopping rs " + serverNumber);
394 return server;
395 }
396
397
398
399
400
401
402
403 public String waitOnRegionServer(final int serverNumber) {
404 return this.hbaseCluster.waitOnRegionServer(serverNumber);
405 }
406
407
408
409
410
411
412
413
414 public JVMClusterUtil.MasterThread startMaster() throws IOException {
415 Configuration c = HBaseConfiguration.create(conf);
416 User user =
417 HBaseTestingUtility.getDifferentUser(c, ".hfs."+index++);
418
419 JVMClusterUtil.MasterThread t = null;
420 try {
421 t = hbaseCluster.addMaster(c, hbaseCluster.getMasters().size(), user);
422 t.start();
423 } catch (InterruptedException ie) {
424 throw new IOException("Interrupted adding master to cluster", ie);
425 }
426 return t;
427 }
428
429
430
431
432
433 public MasterService.BlockingInterface getMasterAdminService() {
434 return this.hbaseCluster.getActiveMaster().getMasterRpcServices();
435 }
436
437
438
439
440
441 public HMaster getMaster() {
442 return this.hbaseCluster.getActiveMaster();
443 }
444
445
446
447
448
449 public MasterThread getMasterThread() {
450 for (MasterThread mt: hbaseCluster.getLiveMasters()) {
451 if (mt.getMaster().isActiveMaster()) {
452 return mt;
453 }
454 }
455 return null;
456 }
457
458
459
460
461
462 public HMaster getMaster(final int serverNumber) {
463 return this.hbaseCluster.getMaster(serverNumber);
464 }
465
466
467
468
469
470 public String abortMaster(int serverNumber) {
471 HMaster server = getMaster(serverNumber);
472 LOG.info("Aborting " + server.toString());
473 server.abort("Aborting for tests", new Exception("Trace info"));
474 return server.toString();
475 }
476
477
478
479
480
481
482
483 public JVMClusterUtil.MasterThread stopMaster(int serverNumber) {
484 return stopMaster(serverNumber, true);
485 }
486
487
488
489
490
491
492
493
494
495
496
497 public JVMClusterUtil.MasterThread stopMaster(int serverNumber,
498 final boolean shutdownFS) {
499 JVMClusterUtil.MasterThread server =
500 hbaseCluster.getMasters().get(serverNumber);
501 LOG.info("Stopping " + server.toString());
502 server.getMaster().stop("Stopping master " + serverNumber);
503 return server;
504 }
505
506
507
508
509
510
511
512 public String waitOnMaster(final int serverNumber) {
513 return this.hbaseCluster.waitOnMaster(serverNumber);
514 }
515
516
517
518
519
520
521
522
523
524 public boolean waitForActiveAndReadyMaster(long timeout) throws IOException {
525 List<JVMClusterUtil.MasterThread> mts;
526 long start = System.currentTimeMillis();
527 while (!(mts = getMasterThreads()).isEmpty()
528 && (System.currentTimeMillis() - start) < timeout) {
529 for (JVMClusterUtil.MasterThread mt : mts) {
530 if (mt.getMaster().isActiveMaster() && mt.getMaster().isInitialized()) {
531 return true;
532 }
533 }
534
535 Threads.sleep(100);
536 }
537 return false;
538 }
539
540
541
542
543 public List<JVMClusterUtil.MasterThread> getMasterThreads() {
544 return this.hbaseCluster.getMasters();
545 }
546
547
548
549
550 public List<JVMClusterUtil.MasterThread> getLiveMasterThreads() {
551 return this.hbaseCluster.getLiveMasters();
552 }
553
554
555
556
557 public void join() {
558 this.hbaseCluster.join();
559 }
560
561
562
563
564
565 @SuppressWarnings("deprecation")
566 public void shutdown() throws IOException {
567 if (this.hbaseCluster != null) {
568 this.hbaseCluster.shutdown();
569 }
570 }
571
572 @Override
573 public void close() throws IOException {
574 }
575
576 @Override
577 public ClusterStatus getClusterStatus() throws IOException {
578 HMaster master = getMaster();
579 return master == null ? null : master.getClusterStatus();
580 }
581
582
583
584
585
586 public void flushcache() throws IOException {
587 for (JVMClusterUtil.RegionServerThread t:
588 this.hbaseCluster.getRegionServers()) {
589 for(Region r: t.getRegionServer().getOnlineRegionsLocalContext()) {
590 r.flush(true);
591 }
592 }
593 }
594
595
596
597
598
599 public void flushcache(TableName tableName) throws IOException {
600 for (JVMClusterUtil.RegionServerThread t:
601 this.hbaseCluster.getRegionServers()) {
602 for(Region r: t.getRegionServer().getOnlineRegionsLocalContext()) {
603 if(r.getTableDesc().getTableName().equals(tableName)) {
604 r.flush(true);
605 }
606 }
607 }
608 }
609
610
611
612
613
614 public void compact(boolean major) throws IOException {
615 for (JVMClusterUtil.RegionServerThread t:
616 this.hbaseCluster.getRegionServers()) {
617 for(Region r: t.getRegionServer().getOnlineRegionsLocalContext()) {
618 r.compact(major);
619 }
620 }
621 }
622
623
624
625
626
627 public void compact(TableName tableName, boolean major) throws IOException {
628 for (JVMClusterUtil.RegionServerThread t:
629 this.hbaseCluster.getRegionServers()) {
630 for(Region r: t.getRegionServer().getOnlineRegionsLocalContext()) {
631 if(r.getTableDesc().getTableName().equals(tableName)) {
632 r.compact(major);
633 }
634 }
635 }
636 }
637
638
639
640
641 public List<JVMClusterUtil.RegionServerThread> getRegionServerThreads() {
642 return this.hbaseCluster.getRegionServers();
643 }
644
645
646
647
648 public List<JVMClusterUtil.RegionServerThread> getLiveRegionServerThreads() {
649 return this.hbaseCluster.getLiveRegionServers();
650 }
651
652
653
654
655
656
657 public HRegionServer getRegionServer(int serverNumber) {
658 return hbaseCluster.getRegionServer(serverNumber);
659 }
660
661 public List<HRegion> getRegions(byte[] tableName) {
662 return getRegions(TableName.valueOf(tableName));
663 }
664
665 public List<HRegion> getRegions(TableName tableName) {
666 List<HRegion> ret = new ArrayList<HRegion>();
667 for (JVMClusterUtil.RegionServerThread rst : getRegionServerThreads()) {
668 HRegionServer hrs = rst.getRegionServer();
669 for (Region region : hrs.getOnlineRegionsLocalContext()) {
670 if (region.getTableDesc().getTableName().equals(tableName)) {
671 ret.add((HRegion)region);
672 }
673 }
674 }
675 return ret;
676 }
677
678
679
680
681
682 public int getServerWithMeta() {
683 return getServerWith(HRegionInfo.FIRST_META_REGIONINFO.getRegionName());
684 }
685
686
687
688
689
690
691
692 public int getServerWith(byte[] regionName) {
693 int index = -1;
694 int count = 0;
695 for (JVMClusterUtil.RegionServerThread rst: getRegionServerThreads()) {
696 HRegionServer hrs = rst.getRegionServer();
697 Region region = hrs.getOnlineRegion(regionName);
698 if (region != null) {
699 index = count;
700 break;
701 }
702 count++;
703 }
704 return index;
705 }
706
707 @Override
708 public ServerName getServerHoldingRegion(final TableName tn, byte[] regionName)
709 throws IOException {
710
711
712
713
714 HMaster master = getMaster();
715 Region region = master.getOnlineRegion(regionName);
716 if (region != null) {
717 return master.getServerName();
718 }
719 int index = getServerWith(regionName);
720 if (index < 0) {
721 return null;
722 }
723 return getRegionServer(index).getServerName();
724 }
725
726
727
728
729
730
731
732 public long countServedRegions() {
733 long count = 0;
734 for (JVMClusterUtil.RegionServerThread rst : getLiveRegionServerThreads()) {
735 count += rst.getRegionServer().getNumberOfOnlineRegions();
736 }
737 for (JVMClusterUtil.MasterThread mt : getLiveMasterThreads()) {
738 count += mt.getMaster().getNumberOfOnlineRegions();
739 }
740 return count;
741 }
742
743
744
745
746
747 public void killAll() {
748 for (RegionServerThread rst : getRegionServerThreads()) {
749 rst.getRegionServer().abort("killAll");
750 }
751 for (MasterThread masterThread : getMasterThreads()) {
752 masterThread.getMaster().abort("killAll", new Throwable());
753 }
754 }
755
756 @Override
757 public void waitUntilShutDown() {
758 this.hbaseCluster.join();
759 }
760
761 public List<HRegion> findRegionsForTable(TableName tableName) {
762 ArrayList<HRegion> ret = new ArrayList<HRegion>();
763 for (JVMClusterUtil.RegionServerThread rst : getRegionServerThreads()) {
764 HRegionServer hrs = rst.getRegionServer();
765 for (Region region : hrs.getOnlineRegions(tableName)) {
766 if (region.getTableDesc().getTableName().equals(tableName)) {
767 ret.add((HRegion)region);
768 }
769 }
770 }
771 return ret;
772 }
773
774
775 protected int getRegionServerIndex(ServerName serverName) {
776
777 List<RegionServerThread> servers = getRegionServerThreads();
778 for (int i=0; i < servers.size(); i++) {
779 if (servers.get(i).getRegionServer().getServerName().equals(serverName)) {
780 return i;
781 }
782 }
783 return -1;
784 }
785
786 protected int getMasterIndex(ServerName serverName) {
787 List<MasterThread> masters = getMasterThreads();
788 for (int i = 0; i < masters.size(); i++) {
789 if (masters.get(i).getMaster().getServerName().equals(serverName)) {
790 return i;
791 }
792 }
793 return -1;
794 }
795
796 @Override
797 public AdminService.BlockingInterface getAdminProtocol(ServerName serverName) throws IOException {
798 return getRegionServer(getRegionServerIndex(serverName)).getRSRpcServices();
799 }
800
801 @Override
802 public ClientService.BlockingInterface getClientProtocol(ServerName serverName)
803 throws IOException {
804 return getRegionServer(getRegionServerIndex(serverName)).getRSRpcServices();
805 }
806 }