View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.procedure;
19  
20  import java.io.IOException;
21  import java.util.ArrayList;
22  import java.util.HashMap;
23  import java.util.List;
24  import java.util.concurrent.ThreadPoolExecutor;
25  
26  import org.apache.commons.logging.Log;
27  import org.apache.commons.logging.LogFactory;
28  import org.apache.hadoop.hbase.ServerName;
29  import org.apache.hadoop.hbase.errorhandling.ForeignException;
30  import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
31  import org.apache.hadoop.hbase.master.MasterServices;
32  import org.apache.hadoop.hbase.master.MetricsMaster;
33  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ProcedureDescription;
34  import org.apache.zookeeper.KeeperException;
35  
36  public class SimpleMasterProcedureManager extends MasterProcedureManager {
37  
38    public static final String SIMPLE_SIGNATURE = "simle_test";
39    public static final String SIMPLE_DATA = "simple_test_data";
40  
41    private static final Log LOG = LogFactory.getLog(SimpleMasterProcedureManager.class);
42  
43    private MasterServices master;
44    private ProcedureCoordinator coordinator;
45  
46    private boolean done;
47  
48    @Override
49    public void stop(String why) {
50      LOG.info("stop: " + why);
51    }
52  
53    @Override
54    public boolean isStopped() {
55      return false;
56    }
57  
58    @Override
59    public void initialize(MasterServices master, MetricsMaster metricsMaster)
60        throws KeeperException, IOException, UnsupportedOperationException {
61      this.master = master;
62      this.done = false;
63  
64      // setup the default procedure coordinator
65      String name = master.getServerName().toString();
66      ThreadPoolExecutor tpool = ProcedureCoordinator.defaultPool(name, 1);
67      ProcedureCoordinatorRpcs comms = new ZKProcedureCoordinatorRpcs(
68          master.getZooKeeper(), getProcedureSignature(), name);
69  
70      this.coordinator = new ProcedureCoordinator(comms, tpool);
71    }
72  
73    @Override
74    public String getProcedureSignature() {
75      return SIMPLE_SIGNATURE;
76    }
77  
78    @Override
79    public byte[] execProcedureWithRet(ProcedureDescription desc) throws IOException {
80      this.done = false;
81      // start the process on the RS
82      ForeignExceptionDispatcher monitor = new ForeignExceptionDispatcher(desc.getInstance());
83  
84      List<ServerName> serverNames = master.getServerManager().getOnlineServersList();
85      List<String> servers = new ArrayList<String>();
86      for (ServerName sn : serverNames) {
87        servers.add(sn.toString());
88      }
89      Procedure proc = coordinator.startProcedure(monitor, desc.getInstance(), new byte[0], servers);
90      if (proc == null) {
91        String msg = "Failed to submit distributed procedure for '"
92            + getProcedureSignature() + "'";
93        LOG.error(msg);
94        throw new IOException(msg);
95      }
96  
97      HashMap<String, byte[]> returnData = null;
98      try {
99        // wait for the procedure to complete.  A timer thread is kicked off that should cancel this
100       // if it takes too long.
101       returnData = proc.waitForCompletedWithRet();
102       LOG.info("Done waiting - exec procedure for " + desc.getInstance());
103       this.done = true;
104     } catch (InterruptedException e) {
105       ForeignException ee =
106           new ForeignException("Interrupted while waiting for procdure to finish", e);
107       monitor.receive(ee);
108       Thread.currentThread().interrupt();
109     } catch (ForeignException e) {
110       monitor.receive(e);
111     }
112     // return the first value for testing
113     return returnData.values().iterator().next();
114   }
115 
116   @Override
117   public boolean isProcedureDone(ProcedureDescription desc) throws IOException {
118     return done;
119   }
120 
121 }