[runtime_env] Support specifying the runtime_resources directory for testing (#20257)

This commit is contained in:
Edward Oakes 2021-11-11 21:50:42 -08:00 committed by GitHub
parent 33f680095d
commit ee4e4f4036
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 14 additions and 8 deletions

View file

@ -97,6 +97,8 @@ class RayParams:
used by the raylet process. used by the raylet process.
temp_dir (str): If provided, it will specify the root temporary temp_dir (str): If provided, it will specify the root temporary
directory for the Ray process. directory for the Ray process.
runtime_env_dir_name (str): If provided, specifies the directory that
will be created in the session dir to hold runtime_env files.
include_log_monitor (bool): If True, then start a log monitor to include_log_monitor (bool): If True, then start a log monitor to
monitor the log files for all processes on this node and push their monitor the log files for all processes on this node and push their
contents to Redis. contents to Redis.
@ -159,6 +161,7 @@ class RayParams:
plasma_store_socket_name=None, plasma_store_socket_name=None,
raylet_socket_name=None, raylet_socket_name=None,
temp_dir=None, temp_dir=None,
runtime_env_dir_name=None,
include_log_monitor=None, include_log_monitor=None,
autoscaling_config=None, autoscaling_config=None,
start_initial_python_workers_for_first_job=False, start_initial_python_workers_for_first_job=False,
@ -207,6 +210,8 @@ class RayParams:
self.plasma_store_socket_name = plasma_store_socket_name self.plasma_store_socket_name = plasma_store_socket_name
self.raylet_socket_name = raylet_socket_name self.raylet_socket_name = raylet_socket_name
self.temp_dir = temp_dir self.temp_dir = temp_dir
self.runtime_env_dir_name = (
runtime_env_dir_name or ray_constants.DEFAULT_RUNTIME_ENV_DIR_NAME)
self.include_log_monitor = include_log_monitor self.include_log_monitor = include_log_monitor
self.autoscaling_config = autoscaling_config self.autoscaling_config = autoscaling_config
self.metrics_agent_port = metrics_agent_port self.metrics_agent_port = metrics_agent_port

View file

@ -300,8 +300,8 @@ class Node:
old_logs_dir = os.path.join(self._logs_dir, "old") old_logs_dir = os.path.join(self._logs_dir, "old")
try_to_create_directory(old_logs_dir) try_to_create_directory(old_logs_dir)
# Create a directory to be used for runtime environment. # Create a directory to be used for runtime environment.
self._runtime_env_dir = os.path.join(self._session_dir, self._runtime_env_dir = os.path.join(
"runtime_resources") self._session_dir, self._ray_params.runtime_env_dir_name)
try_to_create_directory(self._runtime_env_dir) try_to_create_directory(self._runtime_env_dir)
def get_resource_spec(self): def get_resource_spec(self):

View file

@ -271,6 +271,9 @@ HEALTHCHECK_EXPIRATION_S = os.environ.get("RAY_HEALTHCHECK_EXPIRATION_S", 10)
# src/ray/common/constants.h. # src/ray/common/constants.h.
SETUP_WORKER_FILENAME = "setup_worker.py" SETUP_WORKER_FILENAME = "setup_worker.py"
# Directory name where runtime_env resources will be created & cached.
DEFAULT_RUNTIME_ENV_DIR_NAME = "runtime_resources"
# Used to separate lines when formatting the call stack where an ObjectRef was # Used to separate lines when formatting the call stack where an ObjectRef was
# created. # created.
CALL_STACK_LINE_DELIMITER = " | " CALL_STACK_LINE_DELIMITER = " | "

View file

@ -308,13 +308,11 @@ def test_s3_uri(start_cluster, option, per_task_actor):
"source", [S3_PACKAGE_URI, lazy_fixture("tmp_working_dir")]) "source", [S3_PACKAGE_URI, lazy_fixture("tmp_working_dir")])
def test_multi_node(start_cluster, option: str, source: str): def test_multi_node(start_cluster, option: str, source: str):
"""Tests that the working_dir is propagated across multi-node clusters.""" """Tests that the working_dir is propagated across multi-node clusters."""
# TODO(architkulkarni): Currently all nodes in cluster_utils share the same NUM_NODES = 2
# session directory, which isn't the case for real world clusters. Once
# this is fixed, we should test GC with NUM_NODES > 1 here.
NUM_NODES = 1
cluster, address = start_cluster cluster, address = start_cluster
for _ in range(NUM_NODES - 1): # Head node already added. for i in range(NUM_NODES - 1): # Head node already added.
cluster.add_node(num_cpus=1) cluster.add_node(
num_cpus=1, runtime_env_dir_name=f"node_{i}_runtime_resources")
if option == "working_dir": if option == "working_dir":
ray.init(address, runtime_env={"working_dir": source}) ray.init(address, runtime_env={"working_dir": source})