Capture output for commands run by the autoscaler (#7381)

This commit is contained in:
Allen 2020-03-03 10:19:22 -08:00 committed by GitHub
parent 4d42664b2a
commit b74eb5fce6
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 31 additions and 10 deletions

View file

@ -351,7 +351,8 @@ def exec_cluster(config_file,
stop=False, stop=False,
start=False, start=False,
override_cluster_name=None, override_cluster_name=None,
port_forward=None): port_forward=None,
with_output=False):
"""Runs a command on the specified cluster. """Runs a command on the specified cluster.
Arguments: Arguments:
@ -408,7 +409,13 @@ def exec_cluster(config_file,
shutdown_cmd = wrap_docker(shutdown_cmd) shutdown_cmd = wrap_docker(shutdown_cmd)
cmd += ("; {}; sudo shutdown -h now".format(shutdown_cmd)) cmd += ("; {}; sudo shutdown -h now".format(shutdown_cmd))
_exec(updater, cmd, screen, tmux, port_forward=port_forward) result = _exec(
updater,
cmd,
screen,
tmux,
port_forward=port_forward,
with_output=with_output)
if tmux or screen: if tmux or screen:
attach_command_parts = ["ray attach", config_file] attach_command_parts = ["ray attach", config_file]
@ -424,11 +431,12 @@ def exec_cluster(config_file,
attach_info = "Use `{}` to check on command status.".format( attach_info = "Use `{}` to check on command status.".format(
attach_command) attach_command)
logger.info(attach_info) logger.info(attach_info)
return result
finally: finally:
provider.cleanup() provider.cleanup()
def _exec(updater, cmd, screen, tmux, port_forward=None): def _exec(updater, cmd, screen, tmux, port_forward=None, with_output=False):
if cmd: if cmd:
if screen: if screen:
cmd = [ cmd = [
@ -443,8 +451,12 @@ def _exec(updater, cmd, screen, tmux, port_forward=None):
quote(cmd + "; exec bash") quote(cmd + "; exec bash")
] ]
cmd = " ".join(cmd) cmd = " ".join(cmd)
updater.cmd_runner.run( return updater.cmd_runner.run(
cmd, allocate_tty=True, exit_on_fail=True, port_forward=port_forward) cmd,
allocate_tty=True,
exit_on_fail=True,
port_forward=port_forward,
with_output=with_output)
def rsync(config_file, def rsync(config_file,

View file

@ -48,7 +48,8 @@ class KubernetesCommandRunner:
timeout=120, timeout=120,
allocate_tty=False, allocate_tty=False,
exit_on_fail=False, exit_on_fail=False,
port_forward=None): port_forward=None,
with_output=False):
if cmd and port_forward: if cmd and port_forward:
raise Exception( raise Exception(
"exec with Kubernetes can't forward ports and execute" "exec with Kubernetes can't forward ports and execute"
@ -82,7 +83,12 @@ class KubernetesCommandRunner:
"--", "--",
] + with_interactive(cmd) ] + with_interactive(cmd)
try: try:
self.process_runner.check_call(" ".join(final_cmd), shell=True) if with_output:
return self.process_runner.check_output(
" ".join(final_cmd), shell=True)
else:
self.process_runner.check_call(
" ".join(final_cmd), shell=True)
except subprocess.CalledProcessError: except subprocess.CalledProcessError:
if exit_on_fail: if exit_on_fail:
quoted_cmd = " ".join(final_cmd[:-1] + quoted_cmd = " ".join(final_cmd[:-1] +
@ -232,7 +238,8 @@ class SSHCommandRunner:
timeout=120, timeout=120,
allocate_tty=False, allocate_tty=False,
exit_on_fail=False, exit_on_fail=False,
port_forward=None): port_forward=None,
with_output=False):
self.set_ssh_ip_if_required() self.set_ssh_ip_if_required()
@ -260,7 +267,10 @@ class SSHCommandRunner:
# still create an interactive shell in some ssh versions. # still create an interactive shell in some ssh versions.
final_cmd.append("while true; do sleep 86400; done") final_cmd.append("while true; do sleep 86400; done")
try: try:
self.process_runner.check_call(final_cmd) if with_output:
return self.process_runner.check_output(final_cmd)
else:
self.process_runner.check_call(final_cmd)
except subprocess.CalledProcessError: except subprocess.CalledProcessError:
if exit_on_fail: if exit_on_fail:
quoted_cmd = " ".join(final_cmd[:-1] + [quote(final_cmd[-1])]) quoted_cmd = " ".join(final_cmd[:-1] + [quote(final_cmd[-1])])
@ -402,7 +412,6 @@ class NodeUpdater:
def do_update(self): def do_update(self):
self.provider.set_node_tags( self.provider.set_node_tags(
self.node_id, {TAG_RAY_NODE_STATUS: STATUS_WAITING_FOR_SSH}) self.node_id, {TAG_RAY_NODE_STATUS: STATUS_WAITING_FOR_SSH})
deadline = time.time() + NODE_START_WAIT_S deadline = time.time() + NODE_START_WAIT_S
self.wait_ready(deadline) self.wait_ready(deadline)