From 32e50b8c6724c47e13a737e628521c992dedbdf0 Mon Sep 17 00:00:00 2001 From: Ian Rodney Date: Wed, 31 Mar 2021 08:41:52 -0700 Subject: [PATCH] [Docker] Run docker stop in parallel (#14901) * first pass at parallel docker stop * real impl * use env var variable * lint fix --- python/ray/autoscaler/_private/commands.py | 27 ++++++++++++++++++--- python/ray/autoscaler/_private/constants.py | 4 +++ 2 files changed, 27 insertions(+), 4 deletions(-) diff --git a/python/ray/autoscaler/_private/commands.py b/python/ray/autoscaler/_private/commands.py index 13e72a799..166966929 100644 --- a/python/ray/autoscaler/_private/commands.py +++ b/python/ray/autoscaler/_private/commands.py @@ -1,4 +1,5 @@ import copy +from concurrent.futures import ThreadPoolExecutor import datetime import hashlib import json @@ -25,7 +26,8 @@ from ray.experimental.internal_kv import _internal_kv_put import ray._private.services as services from ray.autoscaler.node_provider import NodeProvider from ray.autoscaler._private.constants import \ - AUTOSCALER_RESOURCE_REQUEST_CHANNEL + AUTOSCALER_RESOURCE_REQUEST_CHANNEL, \ + MAX_PARALLEL_SHUTDOWN_WORKERS from ray.autoscaler._private.util import validate_config, hash_runtime_conf, \ hash_launch_conf, prepare_config from ray.autoscaler._private.providers import _get_node_provider, \ @@ -406,7 +408,12 @@ def teardown_cluster(config_file: str, yes: bool, workers_only: bool, file_mounts_contents_hash="", is_head_node=False, docker_config=config.get("docker")) - _exec(updater, cmd=f"docker stop {container_name}", run_env="host") + + _exec( + updater, + f"docker stop {container_name}", + with_output=False, + run_env="host") except Exception: cli_logger.warning(f"Docker stop failed on {node}") @@ -416,9 +423,21 @@ def teardown_cluster(config_file: str, yes: bool, workers_only: bool, container_name = config.get("docker", {}).get("container_name") if container_name: - for node in A: - run_docker_stop(node, container_name) + # This is to ensure that the parallel 
SSH calls below do not mess with + the user's terminal. + output_redir = cmd_output_util.is_output_redirected() + cmd_output_util.set_output_redirected(True) + allow_interactive = cmd_output_util.does_allow_interactive() + cmd_output_util.set_allow_interactive(False) + + with ThreadPoolExecutor( + max_workers=MAX_PARALLEL_SHUTDOWN_WORKERS) as executor: + for node in A: + executor.submit( + run_docker_stop, node=node, container_name=container_name) + cmd_output_util.set_output_redirected(output_redir) + cmd_output_util.set_allow_interactive(allow_interactive) with LogTimer("teardown_cluster: done."): while A: provider.terminate_nodes(A) diff --git a/python/ray/autoscaler/_private/constants.py b/python/ray/autoscaler/_private/constants.py index 0a7adebbf..85014279b 100644 --- a/python/ray/autoscaler/_private/constants.py +++ b/python/ray/autoscaler/_private/constants.py @@ -89,3 +89,7 @@ RAY_PROCESSES = [ ["new_dashboard/agent.py", False], ["ray_process_reaper.py", False], ] + +# Max Concurrent SSH Calls to stop Docker +MAX_PARALLEL_SHUTDOWN_WORKERS = env_integer("MAX_PARALLEL_SHUTDOWN_WORKERS", + 50)