diff --git a/python/ray/autoscaler/commands.py b/python/ray/autoscaler/commands.py index 2364811d8..f377a5fe7 100644 --- a/python/ray/autoscaler/commands.py +++ b/python/ray/autoscaler/commands.py @@ -351,7 +351,8 @@ def exec_cluster(config_file, stop=False, start=False, override_cluster_name=None, - port_forward=None): + port_forward=None, + with_output=False): """Runs a command on the specified cluster. Arguments: @@ -408,7 +409,13 @@ def exec_cluster(config_file, shutdown_cmd = wrap_docker(shutdown_cmd) cmd += ("; {}; sudo shutdown -h now".format(shutdown_cmd)) - _exec(updater, cmd, screen, tmux, port_forward=port_forward) + result = _exec( + updater, + cmd, + screen, + tmux, + port_forward=port_forward, + with_output=with_output) if tmux or screen: attach_command_parts = ["ray attach", config_file] @@ -424,11 +431,12 @@ def exec_cluster(config_file, attach_info = "Use `{}` to check on command status.".format( attach_command) logger.info(attach_info) + return result finally: provider.cleanup() -def _exec(updater, cmd, screen, tmux, port_forward=None): +def _exec(updater, cmd, screen, tmux, port_forward=None, with_output=False): if cmd: if screen: cmd = [ @@ -443,8 +451,12 @@ def _exec(updater, cmd, screen, tmux, port_forward=None): quote(cmd + "; exec bash") ] cmd = " ".join(cmd) - updater.cmd_runner.run( - cmd, allocate_tty=True, exit_on_fail=True, port_forward=port_forward) + return updater.cmd_runner.run( + cmd, + allocate_tty=True, + exit_on_fail=True, + port_forward=port_forward, + with_output=with_output) def rsync(config_file, diff --git a/python/ray/autoscaler/updater.py b/python/ray/autoscaler/updater.py index 664df34ba..1e1d3f52c 100644 --- a/python/ray/autoscaler/updater.py +++ b/python/ray/autoscaler/updater.py @@ -48,7 +48,8 @@ class KubernetesCommandRunner: timeout=120, allocate_tty=False, exit_on_fail=False, - port_forward=None): + port_forward=None, + with_output=False): if cmd and port_forward: raise Exception( "exec with Kubernetes can't forward ports and execute" @@ -82,7 +83,12 @@ class KubernetesCommandRunner: "--", ] + with_interactive(cmd) try: - self.process_runner.check_call(" ".join(final_cmd), shell=True) + if with_output: + return self.process_runner.check_output( + " ".join(final_cmd), shell=True) + else: + self.process_runner.check_call( + " ".join(final_cmd), shell=True) except subprocess.CalledProcessError: if exit_on_fail: quoted_cmd = " ".join(final_cmd[:-1] + @@ -232,7 +238,8 @@ class SSHCommandRunner: timeout=120, allocate_tty=False, exit_on_fail=False, - port_forward=None): + port_forward=None, + with_output=False): self.set_ssh_ip_if_required() @@ -260,7 +267,10 @@ class SSHCommandRunner: # still create an interactive shell in some ssh versions. final_cmd.append("while true; do sleep 86400; done") try: - self.process_runner.check_call(final_cmd) + if with_output: + return self.process_runner.check_output(final_cmd) + else: + self.process_runner.check_call(final_cmd) except subprocess.CalledProcessError: if exit_on_fail: quoted_cmd = " ".join(final_cmd[:-1] + [quote(final_cmd[-1])]) @@ -402,7 +412,6 @@ class NodeUpdater: def do_update(self): self.provider.set_node_tags( self.node_id, {TAG_RAY_NODE_STATUS: STATUS_WAITING_FOR_SSH}) - deadline = time.time() + NODE_START_WAIT_S self.wait_ready(deadline)