[docker] Use tmp paths for rsync and fix file_mounts on docker (#10368)

This commit is contained in:
Ian Rodney 2020-09-01 13:14:35 -07:00 committed by GitHub
parent 52a5ec99d0
commit 283f4d1060
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 215 additions and 92 deletions

View file

@ -1,19 +1,20 @@
from getpass import getuser
from shlex import quote
from typing import List, Tuple, Dict
from typing import Any, List, Tuple, Dict, Optional
import click
import hashlib
import json
import logging
import os
import subprocess
import sys
import time
import json
from ray.autoscaler.docker import check_docker_running_cmd, \
from ray.autoscaler.docker import check_bind_mounts_cmd, \
check_docker_running_cmd, \
check_docker_image, \
docker_autoscaler_setup, \
docker_start_cmds, \
DOCKER_MOUNT_PREFIX, \
with_docker_exec
from ray.autoscaler.log_timer import LogTimer
@ -134,7 +135,10 @@ class CommandRunnerInterface:
"""
raise NotImplementedError
def run_rsync_up(self, source: str, target: str) -> None:
def run_rsync_up(self,
source: str,
target: str,
options: Optional[Dict[str, Any]] = None) -> None:
"""Rsync files up to the cluster node.
Args:
@ -143,7 +147,10 @@ class CommandRunnerInterface:
"""
raise NotImplementedError
def run_rsync_down(self, source: str, target: str) -> None:
def run_rsync_down(self,
source: str,
target: str,
options: Optional[Dict[str, Any]] = None) -> None:
"""Rsync files down from the cluster node.
Args:
@ -231,7 +238,7 @@ class KubernetesCommandRunner(CommandRunnerInterface):
else:
raise
def run_rsync_up(self, source, target):
def run_rsync_up(self, source, target, options=None):
if target.startswith("~"):
target = "/root" + target[1:]
@ -257,7 +264,7 @@ class KubernetesCommandRunner(CommandRunnerInterface):
target)
])
def run_rsync_down(self, source, target):
def run_rsync_down(self, source, target, options=None):
if target.startswith("~"):
target = "/root" + target[1:]
@ -531,7 +538,7 @@ class SSHCommandRunner(CommandRunnerInterface):
else:
return self._run_helper(final_cmd, with_output, exit_on_fail)
def run_rsync_up(self, source, target):
def run_rsync_up(self, source, target, options=None):
self._set_ssh_ip_if_required()
command = [
"rsync", "--rsh",
@ -543,7 +550,7 @@ class SSHCommandRunner(CommandRunnerInterface):
cli_logger.verbose("Running `{}`", cf.bold(" ".join(command)))
self._run_helper(command, silent=is_rsync_silent())
def run_rsync_down(self, source, target):
def run_rsync_down(self, source, target, options=None):
self._set_ssh_ip_if_required()
command = [
@ -609,37 +616,41 @@ class DockerCommandRunner(CommandRunnerInterface):
with_output=with_output,
ssh_options_override_ssh_key=ssh_options_override_ssh_key)
def run_rsync_up(self, source, target):
# TODO(ilr) Expose this to before NodeUpdater::sync_file_mounts
protected_path = target
if target.find("/root") == 0:
target = target.replace("/root", "/tmp/root")
def run_rsync_up(self, source, target, options=None):
options = options or {}
host_destination = os.path.join(DOCKER_MOUNT_PREFIX,
target.lstrip("/"))
self.ssh_command_runner.run(
f"mkdir -p {os.path.dirname(target.rstrip('/'))}")
self.ssh_command_runner.run_rsync_up(source, target)
if self._check_container_status():
f"mkdir -p {os.path.dirname(host_destination.rstrip('/'))}")
self.ssh_command_runner.run_rsync_up(
source, host_destination, options=None)
if self._check_container_status() and not options.get(
"file_mount", False):
if os.path.isdir(source):
# Adding a "." means that docker copies the *contents*
# Without it, docker copies the source *into* the target
target += "/."
host_destination += "/."
self.ssh_command_runner.run("docker cp {} {}:{}".format(
target, self.container_name,
self._docker_expand_user(protected_path)))
host_destination, self.container_name,
self._docker_expand_user(target)))
def run_rsync_down(self, source, target):
protected_path = source
if source.find("/root") == 0:
source = source.replace("/root", "/tmp/root")
def run_rsync_down(self, source, target, options=None):
options = options or {}
host_source = os.path.join(DOCKER_MOUNT_PREFIX, source.lstrip("/"))
self.ssh_command_runner.run(
f"mkdir -p {os.path.dirname(source.rstrip('/'))}")
if protected_path[-1] == "/":
protected_path += "."
f"mkdir -p {os.path.dirname(host_source.rstrip('/'))}")
if source[-1] == "/":
source += "."
# Adding a "." means that docker copies the *contents*
# Without it, docker copies the source *into* the target
self.ssh_command_runner.run("docker cp {}:{} {}".format(
self.container_name, self._docker_expand_user(protected_path),
source))
self.ssh_command_runner.run_rsync_down(source, target)
if not options.get("file_mount", False):
self.ssh_command_runner.run("docker cp {}:{} {}".format(
self.container_name, self._docker_expand_user(source),
host_source))
self.ssh_command_runner.run_rsync_down(
host_source, target, options=None)
def remote_shell_command_str(self):
inner_str = self.ssh_command_runner.remote_shell_command_str().replace(
@ -722,9 +733,23 @@ class DockerCommandRunner(CommandRunnerInterface):
logger.error(f"A container with name {self.container_name} " +
f"is running image {running_image} instead " +
f"of {image} (which was provided in the YAML")
# Copy bootstrap config & key over
if as_head:
for copy_cmd in docker_autoscaler_setup(self.container_name):
self.run(copy_cmd, run_env="host")
mounts = self.run(
check_bind_mounts_cmd(self.container_name),
with_output=True,
run_env="host").decode("utf-8").strip()
try:
active_mounts = json.loads(mounts)
active_remote_mounts = [
mnt["Destination"] for mnt in active_mounts
]
for remote, local in file_mounts.items():
remote = self._docker_expand_user(remote)
if remote not in active_remote_mounts:
cli_logger.error(
"Please ray stop & restart cluster to "
f"allow mount {remote}:{local} to take hold")
except json.JSONDecodeError:
cli_logger.verbose(
"Unable to check if file_mounts specified in the YAML "
"differ from those on the running container.")
self.initialized = True

View file

@ -664,13 +664,13 @@ def get_or_create_head_node(config,
cli_logger.print("Prepared bootstrap config")
if restart_only:
init_commands = []
setup_commands = []
ray_start_commands = config["head_start_ray_commands"]
elif no_restart:
init_commands = config["head_setup_commands"]
setup_commands = config["head_setup_commands"]
ray_start_commands = []
else:
init_commands = config["head_setup_commands"]
setup_commands = config["head_setup_commands"]
ray_start_commands = config["head_start_ray_commands"]
if not no_restart:
@ -684,7 +684,7 @@ def get_or_create_head_node(config,
cluster_name=config["cluster_name"],
file_mounts=config["file_mounts"],
initialization_commands=config["initialization_commands"],
setup_commands=init_commands,
setup_commands=setup_commands,
ray_start_commands=ray_start_commands,
process_runner=_runner,
runtime_hash=runtime_hash,

View file

@ -1,4 +1,3 @@
import os
import logging
try: # py3
from shlex import quote
@ -7,6 +6,8 @@ except ImportError: # py2
logger = logging.getLogger(__name__)
DOCKER_MOUNT_PREFIX = "/tmp/ray_tmp_mount"
def validate_docker_config(config):
if "docker" not in config:
@ -46,20 +47,27 @@ def with_docker_exec(cmds,
]
def check_docker_running_cmd(cname):
def _check_helper(cname, template):
return " ".join([
"docker", "inspect", "-f", "'{{.State.Running}}'", cname, "||", "true"
"docker", "inspect", "-f", "'{{" + template + "}}'", cname, "||",
"true"
])
def check_docker_running_cmd(cname):
return _check_helper(cname, ".State.Running")
def check_bind_mounts_cmd(cname):
return _check_helper(cname, "json .Mounts")
def check_docker_image(cname):
return " ".join([
"docker", "inspect", "-f", "'{{.Config.Image}}'", cname, "||", "true"
])
return _check_helper(cname, ".Config.Image")
def docker_start_cmds(user, image, mount_dict, cname, user_options):
mount = {dst: dst for dst in mount_dict}
mount = {f"{DOCKER_MOUNT_PREFIX}/{dst}": dst for dst in mount_dict}
# TODO(ilr) Move away from defaulting to /root/
mount_flags = " ".join([
@ -78,17 +86,3 @@ def docker_start_cmds(user, image, mount_dict, cname, user_options):
mount_flags, env_flags, user_options_str, "--net=host", image, "bash"
]
return " ".join(docker_run)
def docker_autoscaler_setup(cname):
cmds = []
for path in ["~/ray_bootstrap_config.yaml", "~/ray_bootstrap_key.pem"]:
# needed because docker doesn't allow relative paths
base_path = os.path.basename(path)
cmds.append("docker cp {path} {cname}:{dpath}".format(
path=path, dpath=base_path, cname=cname))
cmds.extend(
with_docker_exec(
["cp {} {}".format("/" + base_path, path)],
container_name=cname))
return cmds

View file

@ -181,10 +181,12 @@ class NodeUpdater:
with LogTimer(self.log_prefix +
"Synced {} to {}".format(local_path, remote_path)):
self.cmd_runner.run(
"mkdir -p {}".format(os.path.dirname(remote_path)),
run_env="host")
sync_cmd(local_path, remote_path)
if not isinstance(self.cmd_runner, DockerCommandRunner):
# The DockerCommandRunner handles this internally
self.cmd_runner.run(
"mkdir -p {}".format(os.path.dirname(remote_path)),
run_env="host")
sync_cmd(local_path, remote_path, file_mount=True)
if remote_path not in nolog_paths:
# todo: timed here?
@ -413,19 +415,23 @@ class NodeUpdater:
raise click.ClickException("Start command failed.")
def rsync_up(self, source, target):
def rsync_up(self, source, target, file_mount=False):
cli_logger.old_info(logger, "{}Syncing {} to {}...", self.log_prefix,
source, target)
self.cmd_runner.run_rsync_up(source, target)
options = {}
options["file_mount"] = file_mount
self.cmd_runner.run_rsync_up(source, target, options=options)
cli_logger.verbose("`rsync`ed {} (local) to {} (remote)",
cf.bold(source), cf.bold(target))
def rsync_down(self, source, target):
def rsync_down(self, source, target, file_mount=False):
cli_logger.old_info(logger, "{}Syncing {} from {}...", self.log_prefix,
source, target)
self.cmd_runner.run_rsync_down(source, target)
options = {}
options["file_mount"] = file_mount
self.cmd_runner.run_rsync_down(source, target, options=options)
cli_logger.verbose("`rsync`ed {} (remote) to {} (local)",
cf.bold(source), cf.bold(target))

View file

@ -12,6 +12,7 @@ import ray
import ray.services as services
from ray.autoscaler.util import prepare_config, validate_config
from ray.autoscaler.commands import get_or_create_head_node
from ray.autoscaler.docker import DOCKER_MOUNT_PREFIX
from ray.autoscaler.load_metrics import LoadMetrics
from ray.autoscaler.autoscaler import StandardAutoscaler
from ray.autoscaler.tags import TAG_RAY_NODE_KIND, TAG_RAY_NODE_STATUS, \
@ -42,6 +43,7 @@ class MockProcessRunner:
def __init__(self, fail_cmds=[]):
self.calls = []
self.fail_cmds = fail_cmds
self.call_response = {}
def check_call(self, cmd, *args, **kwargs):
for token in self.fail_cmds:
@ -51,7 +53,18 @@ class MockProcessRunner:
def check_output(self, cmd):
self.check_call(cmd)
return "command-output".encode()
return_string = "command-output"
key_to_delete = None
for pattern, pair in self.call_response.items():
if pattern in str(cmd):
return_string = pair[0]
if pair[1] - 1 == 0:
key_to_delete = pattern
break
if key_to_delete:
del self.call_response[key_to_delete]
return return_string.encode()
def assert_has_call(self, ip, pattern=None, exact=None):
assert pattern or exact, \
@ -95,6 +108,9 @@ class MockProcessRunner:
def clear_history(self):
self.calls = []
def respond_to_call(self, pattern, response, num_times=1):
self.call_response[pattern] = (response, num_times)
class MockProvider(NodeProvider):
def __init__(self, cache_stopped=False):
@ -1144,11 +1160,10 @@ class AutoscalingTest(unittest.TestCase):
autoscaler.update()
for i in [0, 1]:
runner.assert_has_call("172.0.0.{}".format(i), "setup_cmd")
runner.assert_has_call(f"172.0.0.{i}", "setup_cmd")
runner.assert_has_call(
"172.0.0.{}".format(i),
"{}/ ubuntu@172.0.0.{}:/home/test-folder/".format(
file_mount_dir, i))
f"172.0.0.{i}", f"{file_mount_dir}/ ubuntu@172.0.0.{i}:"
f"{DOCKER_MOUNT_PREFIX}/home/test-folder/")
runner.clear_history()
@ -1164,11 +1179,11 @@ class AutoscalingTest(unittest.TestCase):
autoscaler.update()
for i in [0, 1]:
runner.assert_not_has_call("172.0.0.{}".format(i), "setup_cmd")
runner.assert_not_has_call(f"172.0.0.{i}", "setup_cmd")
runner.assert_has_call(
"172.0.0.{}".format(i),
"{}/ ubuntu@172.0.0.{}:/home/test-folder/".format(
file_mount_dir, i))
f"172.0.0.{i}",
f"{file_mount_dir}/ ubuntu@172.0.0.{i}:"
f"{DOCKER_MOUNT_PREFIX}/home/test-folder/")
def testFileMountsNonContinuous(self):
file_mount_dir = tempfile.mkdtemp()
@ -1197,11 +1212,11 @@ class AutoscalingTest(unittest.TestCase):
autoscaler.update()
for i in [0, 1]:
runner.assert_has_call("172.0.0.{}".format(i), "setup_cmd")
runner.assert_has_call(f"172.0.0.{i}", "setup_cmd")
runner.assert_has_call(
"172.0.0.{}".format(i),
"{}/ ubuntu@172.0.0.{}:/home/test-folder/".format(
file_mount_dir, i))
f"172.0.0.{i}",
f"{file_mount_dir}/ ubuntu@172.0.0.{i}:"
f"{DOCKER_MOUNT_PREFIX}/home/test-folder/")
runner.clear_history()
@ -1215,11 +1230,10 @@ class AutoscalingTest(unittest.TestCase):
2, tag_filters={TAG_RAY_NODE_STATUS: STATUS_UP_TO_DATE})
for i in [0, 1]:
runner.assert_not_has_call("172.0.0.{}".format(i), "setup_cmd")
runner.assert_not_has_call(f"172.0.0.{i}", "setup_cmd")
runner.assert_not_has_call(
"172.0.0.{}".format(i),
"{}/ ubuntu@172.0.0.{}:/home/test-folder/".format(
file_mount_dir, i))
f"172.0.0.{i}", f"{file_mount_dir}/ ubuntu@172.0.0.{i}:"
f"{DOCKER_MOUNT_PREFIX}/home/test-folder/")
# Simulate a second `ray up` call
from ray.autoscaler import util
@ -1241,11 +1255,11 @@ class AutoscalingTest(unittest.TestCase):
autoscaler.update()
for i in [0, 1]:
runner.assert_has_call("172.0.0.{}".format(i), "setup_cmd")
runner.assert_has_call(f"172.0.0.{i}", "setup_cmd")
runner.assert_has_call(
"172.0.0.{}".format(i),
"{}/ ubuntu@172.0.0.{}:/home/test-folder/".format(
file_mount_dir, i))
f"172.0.0.{i}",
f"{file_mount_dir}/ ubuntu@172.0.0.{i}:"
f"{DOCKER_MOUNT_PREFIX}/home/test-folder/")
if __name__ == "__main__":

View file

@ -2,6 +2,7 @@ import pytest
from ray.tests.test_autoscaler import MockProvider, MockProcessRunner
from ray.autoscaler.command_runner import SSHCommandRunner, \
_with_environment_variables, DockerCommandRunner
from ray.autoscaler.docker import DOCKER_MOUNT_PREFIX
from getpass import getuser
import hashlib
@ -141,6 +142,88 @@ def test_docker_command_runner():
process_runner.assert_has_call("1.2.3.4", exact=expected)
def test_docker_rsync():
process_runner = MockProcessRunner()
provider = MockProvider()
provider.create_node({}, {}, 1)
cluster_name = "cluster"
docker_config = {"container_name": "container"}
args = {
"log_prefix": "prefix",
"node_id": 0,
"provider": provider,
"auth_config": auth_config,
"cluster_name": cluster_name,
"process_runner": process_runner,
"use_internal_ip": False,
"docker_config": docker_config,
}
cmd_runner = DockerCommandRunner(**args)
local_mount = "/home/ubuntu/base/mount/"
remote_mount = "/root/protected_mount/"
remote_host_mount = f"{DOCKER_MOUNT_PREFIX}{remote_mount}"
local_file = "/home/ubuntu/base-file"
remote_file = "/root/protected-file"
remote_host_file = f"{DOCKER_MOUNT_PREFIX}{remote_file}"
process_runner.respond_to_call("docker inspect -f", "true")
cmd_runner.run_rsync_up(
local_mount, remote_mount, options={"file_mount": True})
# Make sure we do not copy directly to raw destination
process_runner.assert_not_has_call(
"1.2.3.4", pattern=f"-avz {local_mount} ray@1.2.3.4:{remote_mount}")
process_runner.assert_not_has_call(
"1.2.3.4", pattern=f"mkdir -p {remote_mount}")
# No docker cp for file_mounts
process_runner.assert_not_has_call("1.2.3.4", pattern=f"docker cp")
process_runner.assert_has_call(
"1.2.3.4",
pattern=f"-avz {local_mount} ray@1.2.3.4:{remote_host_mount}")
process_runner.clear_history()
##############################
process_runner.respond_to_call("docker inspect -f", "true")
cmd_runner.run_rsync_up(
local_file, remote_file, options={"file_mount": False})
# Make sure we do not copy directly to raw destination
process_runner.assert_not_has_call(
"1.2.3.4", pattern=f"-avz {local_file} ray@1.2.3.4:{remote_file}")
process_runner.assert_not_has_call(
"1.2.3.4", pattern=f"mkdir -p {remote_file}")
process_runner.assert_has_call("1.2.3.4", pattern=f"docker cp")
process_runner.assert_has_call(
"1.2.3.4", pattern=f"-avz {local_file} ray@1.2.3.4:{remote_host_file}")
process_runner.clear_history()
##############################
cmd_runner.run_rsync_down(
remote_mount, local_mount, options={"file_mount": True})
process_runner.assert_not_has_call("1.2.3.4", pattern=f"docker cp")
process_runner.assert_not_has_call(
"1.2.3.4", pattern=f"-avz ray@1.2.3.4:{remote_mount} {local_mount}")
process_runner.assert_has_call(
"1.2.3.4",
pattern=f"-avz ray@1.2.3.4:{remote_host_mount} {local_mount}")
process_runner.clear_history()
##############################
cmd_runner.run_rsync_down(
remote_file, local_file, options={"file_mount": False})
process_runner.assert_has_call("1.2.3.4", pattern=f"docker cp")
process_runner.assert_not_has_call(
"1.2.3.4", pattern=f"-avz ray@1.2.3.4:{remote_file} {local_file}")
process_runner.assert_has_call(
"1.2.3.4", pattern=f"-avz ray@1.2.3.4:{remote_host_file} {local_file}")
if __name__ == "__main__":
import sys
sys.exit(pytest.main(["-v", __file__]))

View file

@ -257,7 +257,8 @@ inline ID JavaByteArrayToId(JNIEnv *env, const jbyteArray &bytes) {
env->GetByteArrayRegion(bytes, 0, ID::Size(),
reinterpret_cast<jbyte *>(&id_str.front()));
auto arr_size = env->GetArrayLength(bytes);
RAY_CHECK(arr_size == ID::Size()) << "ID length should be " << ID::Size() << " instead of " << arr_size;
RAY_CHECK(arr_size == ID::Size())
<< "ID length should be " << ID::Size() << " instead of " << arr_size;
return ID::FromBinary(id_str);
}