/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase;

import java.io.File;
import java.io.IOException;
import java.util.Map;

import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.HBaseClusterManager.CommandProvider.Operation;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.RetryCounter;
import org.apache.hadoop.hbase.util.RetryCounter.RetryConfig;
import org.apache.hadoop.hbase.util.RetryCounterFactory;
import org.apache.hadoop.util.Shell;

/**
 * A default cluster manager for HBase integration tests. Uses SSH and the service shell
 * scripts (hbase-daemon.sh, hadoop-daemon.sh, zkServer.sh) to start, stop, restart, kill,
 * suspend and resume services on remote hosts. Assumes Unix-like utilities (ps, grep, kill)
 * are available on the target machines and that the SSH user is allowed to sudo to the user
 * each service runs as.
 */
@InterfaceAudience.Private
public class HBaseClusterManager extends Configured implements ClusterManager {
  private static final String SIGKILL = "SIGKILL";
  private static final String SIGSTOP = "SIGSTOP";
  private static final String SIGCONT = "SIGCONT";

  protected static final Log LOG = LogFactory.getLog(HBaseClusterManager.class);
  private String sshUserName;
  private String sshOptions;

  /**
   * The command format used to run a command on a remote host over SSH. Format arguments:
   * 1: SSH options, 2: SSH user name, 3: "@" if a user name is set, 4: host,
   * 5: the command to run remotely, 6: the service user to sudo to.
   */
  private static final String DEFAULT_TUNNEL_CMD =
      "/usr/bin/ssh %1$s %2$s%3$s%4$s \"sudo -u %6$s %5$s\"";
  private String tunnelCmd;

  private static final String RETRY_ATTEMPTS_KEY = "hbase.it.clustermanager.retry.attempts";
  private static final int DEFAULT_RETRY_ATTEMPTS = 5;

  private static final String RETRY_SLEEP_INTERVAL_KEY =
      "hbase.it.clustermanager.retry.sleep.interval";
  private static final int DEFAULT_RETRY_SLEEP_INTERVAL = 1000;

  protected RetryCounterFactory retryCounterFactory;

  @Override
  public void setConf(Configuration conf) {
    super.setConf(conf);
    if (conf == null) {
      // Configured passes a null conf before the real configuration is set; nothing to do yet.
      return;
    }
    sshUserName = conf.get("hbase.it.clustermanager.ssh.user", "");
    String extraSshOptions = conf.get("hbase.it.clustermanager.ssh.opts", "");
    sshOptions = System.getenv("HBASE_SSH_OPTS");
    if (!extraSshOptions.isEmpty()) {
      sshOptions = StringUtils.join(new Object[] { sshOptions, extraSshOptions }, " ");
    }
    sshOptions = (sshOptions == null) ? "" : sshOptions;
    tunnelCmd = conf.get("hbase.it.clustermanager.ssh.cmd", DEFAULT_TUNNEL_CMD);

    if ((sshUserName != null && sshUserName.length() > 0) ||
        (sshOptions != null && sshOptions.length() > 0)) {
      LOG.info("Running with SSH user [" + sshUserName + "] and options [" + sshOptions + "]");
    }

    this.retryCounterFactory = new RetryCounterFactory(new RetryConfig()
        .setMaxAttempts(conf.getInt(RETRY_ATTEMPTS_KEY, DEFAULT_RETRY_ATTEMPTS))
        .setSleepInterval(conf.getLong(RETRY_SLEEP_INTERVAL_KEY, DEFAULT_RETRY_SLEEP_INTERVAL)));
  }

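  /**
   * Returns the user the given service runs as on the remote hosts; remote commands are
   * executed with "sudo -u" as that user.
   */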
  private String getServiceUser(ServiceType service) {
    Configuration conf = getConf();
    switch (service) {
      case HADOOP_DATANODE:
        return conf.get("hbase.it.clustermanager.hadoop.hdfs.user", "hdfs");
      case ZOOKEEPER_SERVER:
        return conf.get("hbase.it.clustermanager.zookeeper.user", "zookeeper");
      default:
        return conf.get("hbase.it.clustermanager.hbase.user", "hbase");
    }
  }

  /**
   * Executes commands on a remote host over SSH, wrapping them in the configured tunnel command.
   */
  protected class RemoteShell extends Shell.ShellCommandExecutor {
    private String hostname;
    private String user;

    public RemoteShell(String hostname, String[] execString, File dir, Map<String, String> env,
        long timeout) {
      super(execString, dir, env, timeout);
      this.hostname = hostname;
    }

    public RemoteShell(String hostname, String[] execString, File dir, Map<String, String> env) {
      super(execString, dir, env);
      this.hostname = hostname;
    }

    public RemoteShell(String hostname, String[] execString, File dir) {
      super(execString, dir);
      this.hostname = hostname;
    }

    public RemoteShell(String hostname, String[] execString) {
      super(execString);
      this.hostname = hostname;
    }

    public RemoteShell(String hostname, String user, String[] execString) {
      super(execString);
      this.hostname = hostname;
      this.user = user;
    }

    @Override
    public String[] getExecString() {
      String at = sshUserName.isEmpty() ? "" : "@";
      String remoteCmd = StringUtils.join(super.getExecString(), " ");
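      // Build the full SSH tunnel command; see DEFAULT_TUNNEL_CMD for the argument order.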
      String cmd = String.format(tunnelCmd, sshOptions, sshUserName, at, hostname, remoteCmd, user);
      LOG.info("Executing full command [" + cmd + "]");
      return new String[] { "/usr/bin/env", "bash", "-c", cmd };
    }

    @Override
    public void execute() throws IOException {
      super.execute();
    }
  }

  /**
   * Provides the shell command strings used to start, stop and restart a service, to find its
   * process ids, and to send it a signal.
   */
  static abstract class CommandProvider {

    enum Operation {
      START, STOP, RESTART
    }

    public abstract String getCommand(ServiceType service, Operation op);

    public String isRunningCommand(ServiceType service) {
      return findPidCommand(service);
    }

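    // Finds the pids of the service's processes by grepping "ps" output for the
    // proc_<service> marker that the hbase/hadoop daemon scripts put on the java command line.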
    protected String findPidCommand(ServiceType service) {
      return String.format("ps ux | grep proc_%s | grep -v grep | tr -s ' ' | cut -d ' ' -f2",
          service);
    }

    public String signalCommand(ServiceType service, String signal) {
      return String.format("%s | xargs kill -s %s", findPidCommand(service), signal);
    }
  }

  /**
   * CommandProvider that manages a service through the bin/hbase-daemon.sh script.
   */
  static class HBaseShellCommandProvider extends CommandProvider {
    private final String hbaseHome;
    private final String confDir;

    HBaseShellCommandProvider(Configuration conf) {
      hbaseHome = conf.get("hbase.it.clustermanager.hbase.home",
          System.getenv("HBASE_HOME"));
      String tmp = conf.get("hbase.it.clustermanager.hbase.conf.dir",
          System.getenv("HBASE_CONF_DIR"));
      if (tmp != null) {
        confDir = String.format("--config %s", tmp);
      } else {
        confDir = "";
      }
    }

    @Override
    public String getCommand(ServiceType service, Operation op) {
      return String.format("%s/bin/hbase-daemon.sh %s %s %s", hbaseHome, confDir,
          op.toString().toLowerCase(), service);
    }
  }

  /**
   * CommandProvider that manages a service through the sbin/hadoop-daemon.sh script.
   */
  static class HadoopShellCommandProvider extends CommandProvider {
    private final String hadoopHome;
    private final String confDir;

    HadoopShellCommandProvider(Configuration conf) throws IOException {
      hadoopHome = conf.get("hbase.it.clustermanager.hadoop.home",
          System.getenv("HADOOP_HOME"));
      String tmp = conf.get("hbase.it.clustermanager.hadoop.conf.dir",
          System.getenv("HADOOP_CONF_DIR"));
      if (hadoopHome == null) {
        throw new IOException("Hadoop home is not set; configure " +
            "'hbase.it.clustermanager.hadoop.home' or the HADOOP_HOME environment variable.");
      }
      if (tmp != null) {
        confDir = String.format("--config %s", tmp);
      } else {
        confDir = "";
      }
    }

    @Override
    public String getCommand(ServiceType service, Operation op) {
      return String.format("%s/sbin/hadoop-daemon.sh %s %s %s", hadoopHome, confDir,
          op.toString().toLowerCase(), service);
    }
  }

  /**
   * CommandProvider that manages a ZooKeeper server through the bin/zkServer.sh script.
   */
  static class ZookeeperShellCommandProvider extends CommandProvider {
    private final String zookeeperHome;
    private final String confDir;

    ZookeeperShellCommandProvider(Configuration conf) throws IOException {
      zookeeperHome = conf.get("hbase.it.clustermanager.zookeeper.home",
          System.getenv("ZOOBINDIR"));
      String tmp = conf.get("hbase.it.clustermanager.zookeeper.conf.dir",
          System.getenv("ZOOCFGDIR"));
      if (zookeeperHome == null) {
        throw new IOException("ZooKeeper home is not set; configure " +
            "'hbase.it.clustermanager.zookeeper.home' or the ZOOBINDIR environment variable.");
      }
      if (tmp != null) {
        confDir = String.format("--config %s", tmp);
      } else {
        confDir = "";
      }
    }

    @Override
    public String getCommand(ServiceType service, Operation op) {
      return String.format("%s/bin/zkServer.sh %s", zookeeperHome, op.toString().toLowerCase());
    }

    @Override
    protected String findPidCommand(ServiceType service) {
      return String.format("ps ux | grep %s | grep -v grep | tr -s ' ' | cut -d ' ' -f2",
          service);
    }
  }

  public HBaseClusterManager() {
  }

  protected CommandProvider getCommandProvider(ServiceType service) throws IOException {
    switch (service) {
      case HADOOP_DATANODE:
        return new HadoopShellCommandProvider(getConf());
      case ZOOKEEPER_SERVER:
        return new ZookeeperShellCommandProvider(getConf());
      default:
        return new HBaseShellCommandProvider(getConf());
    }
  }

  /**
   * Execute the given command on the host using SSH.
   * @return pair of exit code and command output
   * @throws IOException if something goes wrong
   */
  private Pair<Integer, String> exec(String hostname, ServiceType service, String... cmd)
      throws IOException {
    LOG.info("Executing remote command: " + StringUtils.join(cmd, " ") + " , hostname:" + hostname);

    RemoteShell shell = new RemoteShell(hostname, getServiceUser(service), cmd);
    try {
      shell.execute();
    } catch (Shell.ExitCodeException ex) {
      // Capture the command's stdout as well, so the caller can see both streams.
      String output = shell.getOutput();

      // Rethrow with the output appended; the ExitCodeException message only carries stderr.
      throw new Shell.ExitCodeException(ex.getExitCode(), "stderr: " + ex.getMessage()
          + ", stdout: " + output);
    }

    LOG.info("Executed remote command, exit code:" + shell.getExitCode()
        + " , output:" + shell.getOutput());

    return new Pair<Integer, String>(shell.getExitCode(), shell.getOutput());
  }

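  /**
   * Runs the remote command via exec(), retrying on IOException up to the configured
   * number of attempts, sleeping between retries.
   */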
  private Pair<Integer, String> execWithRetries(String hostname, ServiceType service, String... cmd)
      throws IOException {
    RetryCounter retryCounter = retryCounterFactory.create();
    while (true) {
      try {
        return exec(hostname, service, cmd);
      } catch (IOException e) {
        retryOrThrow(retryCounter, e, hostname, cmd);
      }
      try {
        retryCounter.sleepUntilNextRetry();
      } catch (InterruptedException ex) {
        // Ignore the interrupt and retry after logging it.
        LOG.warn("Sleep Interrupted:" + ex);
      }
    }
  }

  private <E extends Exception> void retryOrThrow(RetryCounter retryCounter, E ex,
      String hostname, String[] cmd) throws E {
    if (retryCounter.shouldRetry()) {
      LOG.warn("Remote command: " + StringUtils.join(cmd, " ") + " , hostname:" + hostname
          + " failed at attempt " + retryCounter.getAttemptTimes() + ". Retrying until maxAttempts: "
          + retryCounter.getMaxAttempts() + ". Exception: " + ex.getMessage());
      return;
    }
    throw ex;
  }

  private void exec(String hostname, ServiceType service, Operation op) throws IOException {
    execWithRetries(hostname, service, getCommandProvider(service).getCommand(service, op));
  }

  @Override
  public void start(ServiceType service, String hostname, int port) throws IOException {
    exec(hostname, service, Operation.START);
  }

  @Override
  public void stop(ServiceType service, String hostname, int port) throws IOException {
    exec(hostname, service, Operation.STOP);
  }

  @Override
  public void restart(ServiceType service, String hostname, int port) throws IOException {
    exec(hostname, service, Operation.RESTART);
  }

  public void signal(ServiceType service, String signal, String hostname) throws IOException {
    execWithRetries(hostname, service, getCommandProvider(service).signalCommand(service, signal));
  }

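  // A service is considered running if the pid-finding command produces any output.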
  @Override
  public boolean isRunning(ServiceType service, String hostname, int port) throws IOException {
    String ret = execWithRetries(hostname, service,
        getCommandProvider(service).isRunningCommand(service)).getSecond();
    return ret.length() > 0;
  }

  @Override
  public void kill(ServiceType service, String hostname, int port) throws IOException {
    signal(service, SIGKILL, hostname);
  }

  @Override
  public void suspend(ServiceType service, String hostname, int port) throws IOException {
    signal(service, SIGSTOP, hostname);
  }

  @Override
  public void resume(ServiceType service, String hostname, int port) throws IOException {
    signal(service, SIGCONT, hostname);
  }
}