mirror of
https://github.com/vale981/ray
synced 2025-03-09 12:56:46 -04:00
[Docker] Run docker stop in parallel (#14901)
* first pass at parallel docker stop * real impl * use env var variable * lint fix
This commit is contained in:
parent
107effb370
commit
32e50b8c67
2 changed files with 27 additions and 4 deletions
|
@ -1,4 +1,5 @@
|
||||||
import copy
|
import copy
|
||||||
|
from concurrent.futures import ThreadPoolExecutor
|
||||||
import datetime
|
import datetime
|
||||||
import hashlib
|
import hashlib
|
||||||
import json
|
import json
|
||||||
|
@ -25,7 +26,8 @@ from ray.experimental.internal_kv import _internal_kv_put
|
||||||
import ray._private.services as services
|
import ray._private.services as services
|
||||||
from ray.autoscaler.node_provider import NodeProvider
|
from ray.autoscaler.node_provider import NodeProvider
|
||||||
from ray.autoscaler._private.constants import \
|
from ray.autoscaler._private.constants import \
|
||||||
AUTOSCALER_RESOURCE_REQUEST_CHANNEL
|
AUTOSCALER_RESOURCE_REQUEST_CHANNEL, \
|
||||||
|
MAX_PARALLEL_SHUTDOWN_WORKERS
|
||||||
from ray.autoscaler._private.util import validate_config, hash_runtime_conf, \
|
from ray.autoscaler._private.util import validate_config, hash_runtime_conf, \
|
||||||
hash_launch_conf, prepare_config
|
hash_launch_conf, prepare_config
|
||||||
from ray.autoscaler._private.providers import _get_node_provider, \
|
from ray.autoscaler._private.providers import _get_node_provider, \
|
||||||
|
@ -406,7 +408,12 @@ def teardown_cluster(config_file: str, yes: bool, workers_only: bool,
|
||||||
file_mounts_contents_hash="",
|
file_mounts_contents_hash="",
|
||||||
is_head_node=False,
|
is_head_node=False,
|
||||||
docker_config=config.get("docker"))
|
docker_config=config.get("docker"))
|
||||||
_exec(updater, cmd=f"docker stop {container_name}", run_env="host")
|
|
||||||
|
_exec(
|
||||||
|
updater,
|
||||||
|
f"docker stop {container_name}",
|
||||||
|
with_output=False,
|
||||||
|
run_env="host")
|
||||||
except Exception:
|
except Exception:
|
||||||
cli_logger.warning(f"Docker stop failed on {node}")
|
cli_logger.warning(f"Docker stop failed on {node}")
|
||||||
|
|
||||||
|
@ -416,9 +423,21 @@ def teardown_cluster(config_file: str, yes: bool, workers_only: bool,
|
||||||
|
|
||||||
container_name = config.get("docker", {}).get("container_name")
|
container_name = config.get("docker", {}).get("container_name")
|
||||||
if container_name:
|
if container_name:
|
||||||
for node in A:
|
|
||||||
run_docker_stop(node, container_name)
|
|
||||||
|
|
||||||
|
# This is to ensure that the parallel SSH calls below do not mess with
|
||||||
|
# the users terminal.
|
||||||
|
output_redir = cmd_output_util.is_output_redirected()
|
||||||
|
cmd_output_util.set_output_redirected(True)
|
||||||
|
allow_interactive = cmd_output_util.does_allow_interactive()
|
||||||
|
cmd_output_util.set_allow_interactive(False)
|
||||||
|
|
||||||
|
with ThreadPoolExecutor(
|
||||||
|
max_workers=MAX_PARALLEL_SHUTDOWN_WORKERS) as executor:
|
||||||
|
for node in A:
|
||||||
|
executor.submit(
|
||||||
|
run_docker_stop, node=node, container_name=container_name)
|
||||||
|
cmd_output_util.set_output_redirected(output_redir)
|
||||||
|
cmd_output_util.set_allow_interactive(allow_interactive)
|
||||||
with LogTimer("teardown_cluster: done."):
|
with LogTimer("teardown_cluster: done."):
|
||||||
while A:
|
while A:
|
||||||
provider.terminate_nodes(A)
|
provider.terminate_nodes(A)
|
||||||
|
|
|
@ -89,3 +89,7 @@ RAY_PROCESSES = [
|
||||||
["new_dashboard/agent.py", False],
|
["new_dashboard/agent.py", False],
|
||||||
["ray_process_reaper.py", False],
|
["ray_process_reaper.py", False],
|
||||||
]
|
]
|
||||||
|
|
||||||
|
# Max Concurrent SSH Calls to stop Docker
|
||||||
|
MAX_PARALLEL_SHUTDOWN_WORKERS = env_integer("MAX_PARALLEL_SHUTDOWN_WORKERS",
|
||||||
|
50)
|
||||||
|
|
Loading…
Add table
Reference in a new issue