Revert "[autoscaler] Create Docker Command Runner" (#8816)

This reverts commit 54189bca5a.
This commit is contained in:
Ian Rodney 2020-06-06 14:21:44 -07:00 committed by GitHub
parent ad695a818b
commit b07b4f2e55
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 16 additions and 86 deletions

View file

@ -321,8 +321,7 @@ class StandardAutoscaler:
self.config["worker_start_ray_commands"]),
runtime_hash=self.runtime_hash,
process_runner=self.process_runner,
use_internal_ip=True,
docker_config=self.config["docker"])
use_internal_ip=True)
updater.start()
self.updaters[node_id] = updater
@ -361,8 +360,7 @@ class StandardAutoscaler:
ray_start_commands=with_head_node_ip(ray_start_commands),
runtime_hash=self.runtime_hash,
process_runner=self.process_runner,
use_internal_ip=True,
docker_config=self.config["docker"])
use_internal_ip=True)
updater.start()
self.updaters[node_id] = updater

View file

@ -147,8 +147,7 @@ def kill_node(config_file, yes, hard, override_cluster_name):
initialization_commands=[],
setup_commands=[],
ray_start_commands=[],
runtime_hash="",
docker_config=config["docker"])
runtime_hash="")
_exec(updater, "ray stop", False, False)
@ -287,7 +286,7 @@ def get_or_create_head_node(config, config_file, no_restart, restart_only, yes,
setup_commands=init_commands,
ray_start_commands=ray_start_commands,
runtime_hash=runtime_hash,
docker_config=config["docker"])
)
updater.start()
updater.join()
@ -408,7 +407,7 @@ def exec_cluster(config_file,
setup_commands=[],
ray_start_commands=[],
runtime_hash="",
docker_config=config["docker"])
)
def wrap_docker(command):
container_name = config["docker"]["container_name"]
@ -530,7 +529,7 @@ def rsync(config_file,
setup_commands=[],
ray_start_commands=[],
runtime_hash="",
docker_config=config["docker"])
)
if down:
rsync = updater.rsync_down
else:

View file

@ -87,13 +87,7 @@ class KubernetesNodeProvider(NodeProvider):
for node_id in node_ids:
self.terminate_node(node_id)
def get_command_runner(self,
log_prefix,
node_id,
auth_config,
cluster_name,
process_runner,
use_internal_ip,
docker_config=None):
def get_command_runner(self, log_prefix, node_id, auth_config,
cluster_name, process_runner, use_internal_ip):
return KubernetesCommandRunner(log_prefix, self.namespace, node_id,
auth_config, process_runner)

View file

@ -3,7 +3,7 @@ import logging
import os
import yaml
from ray.autoscaler.updater import SSHCommandRunner, DockerCommandRunner
from ray.autoscaler.updater import SSHCommandRunner
logger = logging.getLogger(__name__)
@ -211,14 +211,8 @@ class NodeProvider:
"""Clean-up when a Provider is no longer required."""
pass
def get_command_runner(self,
log_prefix,
node_id,
auth_config,
cluster_name,
process_runner,
use_internal_ip,
docker_config=None):
def get_command_runner(self, log_prefix, node_id, auth_config,
cluster_name, process_runner, use_internal_ip):
""" Returns the CommandRunner class used to perform SSH commands.
Args:
@ -232,19 +226,7 @@ class NodeProvider:
in the CommandRunner. E.g., subprocess.
use_internal_ip(bool): whether the node_id belongs to an internal ip
or external ip.
docker_config(dict): If set, the docker information of the docker
container that commands should be run on.
"""
common_args = {
"log_prefix": log_prefix,
"node_id": node_id,
"provider": self,
"auth_config": auth_config,
"cluster_name": cluster_name,
"process_runner": process_runner,
"use_internal_ip": use_internal_ip
}
if docker_config and docker_config["container_name"] != "":
return DockerCommandRunner(docker_config, **common_args)
else:
return SSHCommandRunner(**common_args)
return SSHCommandRunner(log_prefix, node_id, self, auth_config,
cluster_name, process_runner, use_internal_ip)

View file

@ -294,48 +294,6 @@ class SSHCommandRunner:
self.ssh_private_key, self.ssh_user, self.ssh_ip)
class DockerCommandRunner(SSHCommandRunner):
def __init__(self, docker_config, **common_args):
self.ssh_command_runner = SSHCommandRunner(**common_args)
self.docker_name = docker_config["container_name"]
self.docker_config = docker_config
def run(self,
cmd,
timeout=120,
allocate_tty=False,
exit_on_fail=False,
port_forward=None,
with_output=False):
return self.ssh_command_runner.run(cmd, timeout, allocate_tty,
exit_on_fail, port_forward,
with_output)
def run_rsync_up(self, source, target):
self.ssh_command_runner.run_rsync_up(source, target)
self.ssh_command_runner.run("docker cp {} {}:{}".format(
target, self.docker_name, self.docker_expand_user(target)))
def run_rsync_down(self, source, target):
self.ssh_command_runner.run("docker cp {}:{} {}".format(
self.docker_name, self.docker_expand_user(source), source))
self.ssh_command_runner.run_rsync_down(source, target)
def remote_shell_command_str(self):
inner_str = self.ssh_command_runner.remote_shell_command_str().replace(
"ssh", "ssh -tt", 1).strip("\n")
return inner_str + " docker exec -it {} /bin/bash\n".format(
self.docker_name)
def docker_expand_user(self, string):
if string.find("~") == 0:
return string.replace(
"~",
"`docker exec ray_docker env | grep HOME | cut -d'=' -f2`", 1)
else:
return string
class NodeUpdater:
"""A process for syncing files and running init commands on a node."""
@ -351,15 +309,14 @@ class NodeUpdater:
ray_start_commands,
runtime_hash,
process_runner=subprocess,
use_internal_ip=False,
docker_config=None):
use_internal_ip=False):
self.log_prefix = "NodeUpdater: {}: ".format(node_id)
use_internal_ip = (use_internal_ip
or provider_config.get("use_internal_ips", False))
self.cmd_runner = provider.get_command_runner(
self.log_prefix, node_id, auth_config, cluster_name,
process_runner, use_internal_ip, docker_config)
process_runner, use_internal_ip)
self.daemon = True
self.process_runner = process_runner