[runtime_env] Centralize runtime_env logic into ray._private.runtime_env submodule (#18310)

Edward Oakes 2021-09-03 10:19:00 -05:00 committed by GitHub
parent 6aa8a4eddc
commit 1f6705d35d
22 changed files with 654 additions and 673 deletions

View file

@ -51,7 +51,6 @@ class DashboardAgent(object):
temp_dir=None,
session_dir=None,
runtime_env_dir=None,
runtime_env_setup_hook=None,
log_dir=None,
metrics_export_port=None,
node_manager_port=None,
@ -67,7 +66,6 @@ class DashboardAgent(object):
self.temp_dir = temp_dir
self.session_dir = session_dir
self.runtime_env_dir = runtime_env_dir
self.runtime_env_setup_hook = runtime_env_setup_hook
self.log_dir = log_dir
self.dashboard_agent_port = dashboard_agent_port
self.metrics_export_port = metrics_export_port
@ -323,13 +321,6 @@ if __name__ == "__main__":
type=str,
default=None,
help="Specify the path of the resource directory used by runtime_env.")
parser.add_argument(
"--runtime-env-setup-hook",
required=True,
type=str,
default=None,
help="The module path to a Python function that"
"will be imported and run to set up the runtime env.")
args = parser.parse_args()
try:
@ -360,7 +351,6 @@ if __name__ == "__main__":
temp_dir=args.temp_dir,
session_dir=args.session_dir,
runtime_env_dir=args.runtime_env_dir,
runtime_env_setup_hook=args.runtime_env_setup_hook,
log_dir=args.log_dir,
metrics_export_port=args.metrics_export_port,
node_manager_port=args.node_manager_port,

View file

@ -2,7 +2,8 @@ import asyncio
from dataclasses import dataclass
import json
import logging
from ray._private.ray_logging import setup_component_logger
import os
import time
from typing import Dict
from ray.core.generated import runtime_env_agent_pb2
@ -13,13 +14,17 @@ from ray.experimental.internal_kv import (_initialize_internal_kv,
import ray.new_dashboard.utils as dashboard_utils
import ray.new_dashboard.modules.runtime_env.runtime_env_consts \
as runtime_env_consts
import ray._private.runtime_env as runtime_env_pkg
from ray._private.utils import import_attr
from ray.workers.pluggable_runtime_env import (RuntimeEnvContext,
using_thread_local_logger)
from ray._private.ray_logging import setup_component_logger
from ray._private.runtime_env.conda import setup_conda_or_pip
from ray._private.runtime_env.working_dir import setup_working_dir
from ray._private.runtime_env import RuntimeEnvContext
logger = logging.getLogger(__name__)
# TODO(edoakes): this is used for unit tests. We should replace it with a
# better pluggability mechanism once available.
SLEEP_FOR_TESTING_S = os.environ.get("RAY_RUNTIME_ENV_SLEEP_FOR_TESTING_S")
@dataclass
class CreatedEnvResult:
@ -40,12 +45,9 @@ class RuntimeEnvAgent(dashboard_utils.DashboardAgentModule,
def __init__(self, dashboard_agent):
super().__init__(dashboard_agent)
self._session_dir = dashboard_agent.session_dir
self._runtime_env_dir = dashboard_agent.runtime_env_dir
self._setup = import_attr(dashboard_agent.runtime_env_setup_hook)
self._logging_params = dashboard_agent.logging_params
self._per_job_logger_cache = dict()
runtime_env_pkg.PKG_DIR = dashboard_agent.runtime_env_dir
# Cache the results of creating envs to avoid repeatedly calling into
# conda and other slow calls.
self._env_cache: Dict[str, CreatedEnvResult] = dict()
@ -68,22 +70,17 @@ class RuntimeEnvAgent(dashboard_utils.DashboardAgentModule,
return self._per_job_logger_cache[job_id]
async def CreateRuntimeEnv(self, request, context):
async def _setup_runtime_env(serialized_runtime_env, session_dir):
async def _setup_runtime_env(serialized_runtime_env):
# This function will be run inside a thread
def run_setup_with_logger():
runtime_env: dict = json.loads(serialized_runtime_env or "{}")
per_job_logger = self.get_or_create_logger(request.job_id)
# Here we set the logger context for the setup hook execution.
# The logger needs to be thread local because there can be
# setup hooks ran for arbitrary job in arbitrary threads.
with using_thread_local_logger(per_job_logger):
env_context = self._setup(runtime_env, session_dir)
if "uris" in runtime_env:
working_dir = runtime_env_pkg.ensure_runtime_env_setup(
runtime_env["uris"])
env_context.working_dir = working_dir
return env_context
# Use a separate logger for each job.
per_job_logger = self.get_or_create_logger(request.job_id)
context = RuntimeEnvContext(self._runtime_env_dir)
setup_conda_or_pip(runtime_env, context, logger=per_job_logger)
setup_working_dir(runtime_env, context, logger=per_job_logger)
return context
loop = asyncio.get_event_loop()
return await loop.run_in_executor(None, run_setup_with_logger)
@ -113,13 +110,17 @@ class RuntimeEnvAgent(dashboard_utils.DashboardAgentModule,
status=agent_manager_pb2.AGENT_RPC_STATUS_FAILED,
error_message=error_message)
if SLEEP_FOR_TESTING_S:
logger.info(f"Sleeping for {SLEEP_FOR_TESTING_S}s.")
time.sleep(int(SLEEP_FOR_TESTING_S))
logger.info(f"Creating runtime env: {serialized_env}")
runtime_env_context: RuntimeEnvContext = None
error_message = None
for _ in range(runtime_env_consts.RUNTIME_ENV_RETRY_TIMES):
try:
runtime_env_context = await _setup_runtime_env(
serialized_env, self._session_dir)
serialized_env)
break
except Exception as ex:
logger.exception("Runtime env creation failed.")
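For context, the new setup path above can be condensed into a minimal sketch (the wrapper name create_env is hypothetical; the calls mirror the diff, with the gRPC plumbing and retry loop omitted):

import asyncio
import json
import logging

from ray._private.runtime_env import RuntimeEnvContext
from ray._private.runtime_env.conda import setup_conda_or_pip
from ray._private.runtime_env.working_dir import setup_working_dir

async def create_env(serialized_runtime_env: str,
                     runtime_env_dir: str) -> RuntimeEnvContext:
    # conda/pip installs block, so the setup runs in a worker thread.
    def run_setup():
        runtime_env: dict = json.loads(serialized_runtime_env or "{}")
        logger = logging.getLogger(__name__)  # the agent uses a per-job logger
        context = RuntimeEnvContext(runtime_env_dir)
        setup_conda_or_pip(runtime_env, context, logger=logger)
        setup_working_dir(runtime_env, context, logger=logger)
        return context

    loop = asyncio.get_event_loop()
    return await loop.run_in_executor(None, run_setup)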

View file

@ -76,8 +76,6 @@ class RayParams:
worker_setup_hook to set up the environment for the worker process.
worker_setup_hook (str): The module path to a Python function that will
be imported and run to set up the environment for the worker.
runtime_env_setup_hook (str): The module path to a Python function that
will be imported and run to set up the runtime env in agent.
huge_pages: Boolean flag indicating whether to start the Object
Store with hugetlbfs support. Requires plasma_directory.
include_dashboard: Boolean flag indicating whether to start the web
@ -153,8 +151,6 @@ class RayParams:
worker_path=None,
setup_worker_path=None,
worker_setup_hook=ray_constants.DEFAULT_WORKER_SETUP_HOOK,
runtime_env_setup_hook=ray_constants.
DEFAULT_RUNTIME_ENV_SETUP_HOOK,
huge_pages=False,
include_dashboard=None,
dashboard_host=ray_constants.DEFAULT_DASHBOARD_IP,
@ -206,7 +202,6 @@ class RayParams:
self.worker_path = worker_path
self.setup_worker_path = setup_worker_path
self.worker_setup_hook = worker_setup_hook
self.runtime_env_setup_hook = runtime_env_setup_hook
self.huge_pages = huge_pages
self.include_dashboard = include_dashboard
self.dashboard_host = dashboard_host

View file

@ -0,0 +1,3 @@
from ray._private.runtime_env.context import RuntimeEnvContext # noqa: F401
from ray._private.runtime_env.validation import ( # noqa: F401
override_task_or_actor_runtime_env, RuntimeEnvDict) # noqa: F401

View file

@ -0,0 +1,243 @@
import os
import sys
import logging
import yaml
import hashlib
import subprocess
import runpy
import shutil
from filelock import FileLock
from typing import Optional, List, Dict, Any
from pathlib import Path
import ray
from ray._private.runtime_env import RuntimeEnvContext
from ray._private.runtime_env.conda_utils import get_or_create_conda_env
from ray._private.utils import try_to_create_directory
from ray._private.utils import (get_wheel_filename, get_master_wheel_url,
get_release_wheel_url)
def _resolve_current_ray_path():
# When ray is built from source with pip install -e,
# ray.__file__ returns .../python/ray/__init__.py.
# When ray is installed from a prebuilt binary, it returns
# .../site-packages/ray/__init__.py
return os.path.split(os.path.split(ray.__file__)[0])[0]
def _resolve_install_from_source_ray_dependencies():
"""Find the ray dependencies when Ray is install from source"""
ray_source_python_path = _resolve_current_ray_path()
setup_py_path = os.path.join(ray_source_python_path, "setup.py")
ray_install_requires = runpy.run_path(setup_py_path)[
"setup_spec"].install_requires
return ray_install_requires
def _inject_ray_to_conda_site(conda_path,
logger: Optional[logging.Logger] = None):
"""Write the current Ray site package directory to a new site"""
if logger is None:
logger = logging.getLogger(__name__)
python_binary = os.path.join(conda_path, "bin/python")
site_packages_path = subprocess.check_output(
[python_binary, "-c",
"import site; print(site.getsitepackages()[0])"]).decode().strip()
ray_path = _resolve_current_ray_path()
logger.warning(f"Injecting {ray_path} to environment {conda_path} "
"because _inject_current_ray flag is on.")
maybe_ray_dir = os.path.join(site_packages_path, "ray")
if os.path.isdir(maybe_ray_dir):
logger.warning(f"Replacing existing ray installation with {ray_path}")
shutil.rmtree(maybe_ray_dir)
# See usage of *.pth file at
# https://docs.python.org/3/library/site.html
with open(os.path.join(site_packages_path, "ray.pth"), "w") as f:
f.write(ray_path)
def _current_py_version():
return ".".join(map(str, sys.version_info[:3])) # like 3.6.10
def setup_conda_or_pip(runtime_env: dict,
context: RuntimeEnvContext,
logger: Optional[logging.Logger] = None):
if logger is None:
logger = logging.getLogger(__name__)
if not runtime_env.get("conda") and not runtime_env.get("pip"):
return
logger.debug(f"Setting up conda or pip for runtime_env: {runtime_env}")
conda_dict = get_conda_dict(runtime_env, context.session_dir)
if isinstance(runtime_env.get("conda"), str):
conda_env_name = runtime_env["conda"]
else:
assert conda_dict is not None
ray_pip = current_ray_pip_specifier(logger)
if ray_pip:
extra_pip_dependencies = [ray_pip, "ray[default]"]
elif runtime_env.get("_inject_current_ray"):
extra_pip_dependencies = (
_resolve_install_from_source_ray_dependencies())
else:
extra_pip_dependencies = []
conda_dict = inject_dependencies(conda_dict, _current_py_version(),
extra_pip_dependencies)
logger.info(f"Setting up conda environment with {runtime_env}")
# It is not safe for multiple processes to install conda envs
# concurrently, even if the envs are different, so use a global
# lock for all conda installs.
# See https://github.com/ray-project/ray/issues/17086
file_lock_name = "ray-conda-install.lock"
with FileLock(os.path.join(context.session_dir, file_lock_name)):
conda_dir = os.path.join(context.session_dir, "runtime_resources",
"conda")
try_to_create_directory(conda_dir)
conda_yaml_path = os.path.join(conda_dir, "environment.yml")
with open(conda_yaml_path, "w") as file:
# Sort keys because we hash based on the file contents,
# and we don't want the hash to depend on the order
# of the dependencies.
yaml.dump(conda_dict, file, sort_keys=True)
conda_env_name = get_or_create_conda_env(
conda_yaml_path, conda_dir, logger=logger)
if runtime_env.get("_inject_current_ray"):
conda_path = os.path.join(conda_dir, conda_env_name)
_inject_ray_to_conda_site(conda_path, logger)
context.conda_env_name = conda_env_name
logger.info(f"Finished setting up runtime environment at {conda_env_name}")
def get_conda_dict(runtime_env, runtime_env_dir) -> Optional[Dict[Any, Any]]:
""" Construct a conda dependencies dict from a runtime env.
This function does not inject Ray or Python into the conda dict.
If the runtime env does not specify pip or conda, or if it specifies
the name of a preinstalled conda environment, this function returns
None. If pip is specified, a conda dict is created containing the
pip dependencies. If conda is already given as a dict, this function
is the identity function.
"""
if runtime_env.get("conda"):
if isinstance(runtime_env["conda"], dict):
return runtime_env["conda"]
else:
return None
if runtime_env.get("pip"):
requirements_txt = runtime_env["pip"]
pip_hash = hashlib.sha1(requirements_txt.encode("utf-8")).hexdigest()
pip_hash_str = f"pip-generated-{pip_hash}"
conda_dir = os.path.join(runtime_env_dir, "conda")
requirements_txt_path = os.path.join(
conda_dir, f"requirements-{pip_hash_str}.txt")
conda_dict = {
"name": pip_hash_str,
"dependencies": ["pip", {
"pip": [f"-r {requirements_txt_path}"]
}]
}
file_lock_name = f"ray-{pip_hash_str}.lock"
with FileLock(os.path.join(runtime_env_dir, file_lock_name)):
try_to_create_directory(conda_dir)
with open(requirements_txt_path, "w") as file:
file.write(requirements_txt)
return conda_dict
return None
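For example, a pip-only runtime_env produces a generated conda dict along these lines (hash and paths illustrative; as a side effect, the requirements file is written under the given directory):

conda_dict = get_conda_dict({"pip": "requests\n"},
                            "/tmp/ray/session_example/runtime_resources")
# {
#     "name": "pip-generated-<sha1>",
#     "dependencies": ["pip", {
#         "pip": ["-r .../conda/requirements-pip-generated-<sha1>.txt"]
#     }]
# }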
def current_ray_pip_specifier(
logger: Optional[logging.Logger] = None) -> Optional[str]:
"""The pip requirement specifier for the running version of Ray.
Returns:
A string which can be passed to `pip install` to install the
currently running Ray version, or None if running on a version
built from source locally (likely if you are developing Ray).
Examples:
Returns "https://s3-us-west-2.amazonaws.com/ray-wheels/[..].whl"
if running a stable release, a nightly or a specific commit
"""
if logger is None:
logger = logging.getLogger(__name__)
if os.environ.get("RAY_CI_POST_WHEEL_TESTS"):
# Running in Buildkite CI after the wheel has been built.
# Wheels are in the ray/.whl directory, but use a relative path to
# allow for testing locally if needed.
return os.path.join(
Path(ray.__file__).resolve().parents[2], ".whl",
get_wheel_filename())
elif ray.__commit__ == "{{RAY_COMMIT_SHA}}":
# Running on a version built from source locally.
if os.environ.get("RAY_RUNTIME_ENV_LOCAL_DEV_MODE") != "1":
logger.warning(
"Current Ray version could not be detected, most likely "
"because you have manually built Ray from source. To use "
"runtime_env in this case, set the environment variable "
"RAY_RUNTIME_ENV_LOCAL_DEV_MODE=1.")
return None
elif "dev" in ray.__version__:
# Running on a nightly wheel.
return get_master_wheel_url()
else:
return get_release_wheel_url()
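Upstream in setup_conda_or_pip, the return value is consumed roughly as follows (a sketch; the source-build case additionally falls back to _resolve_install_from_source_ray_dependencies when _inject_current_ray is set):

spec = current_ray_pip_specifier()
if spec is None:
    # Built from source locally; Ray gets injected via _inject_current_ray.
    extra_pip_dependencies = []
else:
    # CI wheel path, nightly master wheel URL, or release wheel URL.
    extra_pip_dependencies = [spec, "ray[default]"]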
def inject_dependencies(
conda_dict: Dict[Any, Any],
py_version: str,
pip_dependencies: Optional[List[str]] = None) -> Dict[Any, Any]:
"""Add Ray, Python and (optionally) extra pip dependencies to a conda dict.
Args:
conda_dict (dict): A dict representing the JSON-serialized conda
environment YAML file. This dict will be modified and returned.
py_version (str): A string representing a Python version to inject
into the conda dependencies, e.g. "3.7.7"
pip_dependencies (List[str]): A list of pip dependencies that
will be prepended to the list of pip dependencies in
the conda dict. If the conda dict does not already have a "pip"
field, one will be created.
Returns:
The modified dict. (Note: the input argument conda_dict is modified
and returned.)
"""
if pip_dependencies is None:
pip_dependencies = []
if conda_dict.get("dependencies") is None:
conda_dict["dependencies"] = []
# Inject Python dependency.
deps = conda_dict["dependencies"]
# Add current python dependency. If the user has already included a
# python version dependency, conda will raise a readable error if the two
# are incompatible, e.g:
# ResolvePackageNotFound: - python[version='3.5.*,>=3.6']
deps.append(f"python={py_version}")
if "pip" not in deps:
deps.append("pip")
# Insert pip dependencies.
found_pip_dict = False
for dep in deps:
if isinstance(dep, dict) and dep.get("pip") and isinstance(
dep["pip"], list):
dep["pip"] = pip_dependencies + dep["pip"]
found_pip_dict = True
break
if not found_pip_dict:
deps.append({"pip": pip_dependencies})
return conda_dict
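A worked example of the injection (values illustrative):

conda_dict = {"dependencies": ["codecov", {"pip": ["torch"]}]}
result = inject_dependencies(conda_dict, "3.7.7", ["ray==1.6.0"])
# result == {
#     "dependencies": [
#         "codecov",
#         {"pip": ["ray==1.6.0", "torch"]},  # pip deps are prepended
#         "python=3.7.7",
#         "pip",
#     ]
# }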

View file

@ -4,12 +4,8 @@ import subprocess
import hashlib
import json
from typing import Optional, List, Union, Tuple
from ray.workers.pluggable_runtime_env import get_hook_logger
"""Utilities for conda. Adapted from https://github.com/mlflow/mlflow."""
logger = logging.getLogger(__name__)
# Name of environment variable indicating a path to a conda installation. Ray
# will default to running "conda" if unset.
RAY_CONDA_HOME = "RAY_CONDA_HOME"
@ -67,7 +63,8 @@ def _get_conda_env_name(conda_env_path: str) -> str:
def get_or_create_conda_env(conda_env_path: str,
base_dir: Optional[str] = None) -> str:
base_dir: Optional[str] = None,
logger: Optional[logging.Logger] = None) -> str:
"""
Given a conda YAML, creates a conda environment containing the required
dependencies if such a conda environment doesn't already exist. Returns the
@ -83,7 +80,8 @@ def get_or_create_conda_env(conda_env_path: str,
In either case, the return value should be valid to pass in to
`conda activate`.
"""
logger = get_hook_logger()
if logger is None:
logger = logging.getLogger(__name__)
conda_path = get_conda_bin_executable("conda")
try:
exec_cmd([conda_path, "--help"], throw_on_error=False)

View file

@ -0,0 +1,20 @@
import json
class RuntimeEnvContext:
"""A context used to describe the created runtime env."""
def __init__(self,
session_dir: str,
conda_env_name: str = None,
working_dir: str = None):
self.conda_env_name: str = conda_env_name
self.session_dir: str = session_dir
self.working_dir: str = working_dir
def serialize(self) -> str:
return json.dumps(self.__dict__)
@staticmethod
def deserialize(json_string):
return RuntimeEnvContext(**json.loads(json_string))
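Since the context crosses process boundaries as JSON, a serialize/deserialize round trip preserves its fields (values illustrative):

from ray._private.runtime_env import RuntimeEnvContext

ctx = RuntimeEnvContext("/tmp/ray/session_example",
                        conda_env_name="pip-generated-abc123")
restored = RuntimeEnvContext.deserialize(ctx.serialize())
assert restored.session_dir == "/tmp/ray/session_example"
assert restored.conda_env_name == "pip-generated-abc123"
assert restored.working_dir is None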

View file

@ -0,0 +1,231 @@
import json
import logging
import os
from pathlib import Path
import sys
from typing import Any, Dict, Optional
import yaml
import ray
# We need to set up this variable before using this module.
PKG_DIR = None
logger = logging.getLogger(__name__)
FILE_SIZE_WARNING = 10 * 1024 * 1024 # 10MiB
# NOTE(edoakes): we should be able to support up to 512 MiB based on the GCS'
# limit, but for some reason that causes failures when downloading.
GCS_STORAGE_MAX_SIZE = 100 * 1024 * 1024 # 100MiB
class RuntimeEnvDict:
"""Parses and validates the runtime env dictionary from the user.
Attributes:
working_dir (Path): Specifies the working directory of the worker.
This can either be a local directory or zip file.
Examples:
"." # cwd
"local_project.zip" # archive is unpacked into directory
py_modules (List[Path]): Similar to working_dir, but specifies python
modules to add to the `sys.path`.
Examples:
["/path/to/other_module", "/other_path/local_project.zip"]
pip (List[str] | str): Either a list of pip packages, or a string
containing the path to a pip requirements.txt file.
conda (dict | str): Either the conda YAML config, the name of a
local conda env (e.g., "pytorch_p36"), or the path to a conda
environment.yaml file.
The Ray dependency will be automatically injected into the conda
env to ensure compatibility with the cluster Ray. The conda name
may be mangled automatically to avoid conflicts between runtime
envs.
This field cannot be specified at the same time as the 'pip' field.
To use pip with conda, please specify your pip dependencies within
the conda YAML config:
https://conda.io/projects/conda/en/latest/user-guide/tasks/manage-e
nvironments.html#create-env-file-manually
Examples:
{"channels": ["defaults"], "dependencies": ["codecov"]}
"pytorch_p36" # Found on DLAMIs
container (dict): Require a given (Docker) container image,
The Ray worker process will run in a container with this image.
The `worker_path` is the default_worker.py path.
The `run_options` list spec is here:
https://docs.docker.com/engine/reference/run/
Examples:
{"image": "anyscale/ray-ml:nightly-py38-cpu",
"worker_path": "/root/python/ray/workers/default_worker.py",
"run_options": ["--cap-drop SYS_ADMIN","--log-level=debug"]}
env_vars (dict): Environment variables to set.
Examples:
{"OMP_NUM_THREADS": "32", "TF_WARNINGS": "none"}
"""
def __init__(self, runtime_env_json: dict):
# Simple dictionary with all options validated. This will always
# contain all supported keys; values will be set to None if
# unspecified. However, if all values are None this is set to {}.
self._dict = dict()
if "working_dir" in runtime_env_json:
self._dict["working_dir"] = runtime_env_json["working_dir"]
if not isinstance(self._dict["working_dir"], str):
raise TypeError("`working_dir` must be a string. Type "
f"{type(self._dict['working_dir'])} received.")
working_dir = Path(self._dict["working_dir"]).absolute()
else:
self._dict["working_dir"] = None
working_dir = None
self._dict["conda"] = None
if "conda" in runtime_env_json:
if sys.platform == "win32":
raise NotImplementedError("The 'conda' field in runtime_env "
"is not currently supported on "
"Windows.")
conda = runtime_env_json["conda"]
if isinstance(conda, str):
yaml_file = Path(conda)
if yaml_file.suffix in (".yaml", ".yml"):
if working_dir and not yaml_file.is_absolute():
yaml_file = working_dir / yaml_file
if not yaml_file.is_file():
raise ValueError(
f"Can't find conda YAML file {yaml_file}")
try:
self._dict["conda"] = yaml.safe_load(
yaml_file.read_text())
except Exception as e:
raise ValueError(
f"Invalid conda file {yaml_file} with error {e}")
else:
logger.info(
f"Using preinstalled conda environment: {conda}")
self._dict["conda"] = conda
elif isinstance(conda, dict):
self._dict["conda"] = conda
elif conda is not None:
raise TypeError("runtime_env['conda'] must be of type str or "
"dict")
self._dict["pip"] = None
if "pip" in runtime_env_json:
if sys.platform == "win32":
raise NotImplementedError("The 'pip' field in runtime_env "
"is not currently supported on "
"Windows.")
if ("conda" in runtime_env_json
and runtime_env_json["conda"] is not None):
raise ValueError(
"The 'pip' field and 'conda' field of "
"runtime_env cannot both be specified.\n"
f"specified pip field: {runtime_env_json['pip']}\n"
f"specified conda field: {runtime_env_json['conda']}\n"
"To use pip with conda, please only set the 'conda' "
"field, and specify your pip dependencies "
"within the conda YAML config dict: see "
"https://conda.io/projects/conda/en/latest/"
"user-guide/tasks/manage-environments.html"
"#create-env-file-manually")
pip = runtime_env_json["pip"]
if isinstance(pip, str):
# We have been given a path to a requirements.txt file.
pip_file = Path(pip)
if working_dir and not pip_file.is_absolute():
pip_file = working_dir / pip_file
if not pip_file.is_file():
raise ValueError(f"{pip_file} is not a valid file")
self._dict["pip"] = pip_file.read_text()
elif isinstance(pip, list) and all(
isinstance(dep, str) for dep in pip):
# Construct valid pip requirements.txt from list of packages.
self._dict["pip"] = "\n".join(pip) + "\n"
else:
raise TypeError("runtime_env['pip'] must be of type str or "
"List[str]")
if "uris" in runtime_env_json:
self._dict["uris"] = runtime_env_json["uris"]
if "container" in runtime_env_json:
self._dict["container"] = runtime_env_json["container"]
self._dict["env_vars"] = None
if "env_vars" in runtime_env_json:
env_vars = runtime_env_json["env_vars"]
self._dict["env_vars"] = env_vars
if not (isinstance(env_vars, dict) and all(
isinstance(k, str) and isinstance(v, str)
for (k, v) in env_vars.items())):
raise TypeError("runtime_env['env_vars'] must be of type"
"Dict[str, str]")
if "_ray_release" in runtime_env_json:
self._dict["_ray_release"] = runtime_env_json["_ray_release"]
if "_ray_commit" in runtime_env_json:
self._dict["_ray_commit"] = runtime_env_json["_ray_commit"]
else:
if self._dict.get("pip") or self._dict.get("conda"):
self._dict["_ray_commit"] = ray.__commit__
# Used for testing wheels that have not yet been merged into master.
# If this is set to True, then we do not inject Ray into the conda
# or pip dependencies.
if os.environ.get("RAY_RUNTIME_ENV_LOCAL_DEV_MODE"):
runtime_env_json["_inject_current_ray"] = True
if "_inject_current_ray" in runtime_env_json:
self._dict["_inject_current_ray"] = runtime_env_json[
"_inject_current_ray"]
# TODO(ekl) we should have better schema validation here.
# TODO(ekl) support py_modules
# TODO(architkulkarni) support docker
# TODO(architkulkarni) This is to make it easy for the worker caching
# code in C++ to check if the env is empty without deserializing and
# parsing it. We should use a less confusing approach here.
if all(val is None for val in self._dict.values()):
self._dict = {}
def get_parsed_dict(self) -> dict:
return self._dict
def serialize(self) -> str:
# Use sort_keys=True because we will use the output as a key to cache
# workers by, so we need the serialization to be independent of the
# dict order.
return json.dumps(self._dict, sort_keys=True)
def set_uris(self, uris):
self._dict["uris"] = uris
def override_task_or_actor_runtime_env(
runtime_env: Optional[Dict[str, Any]],
parent_runtime_env: Dict[str, Any]) -> Dict[str, Any]:
if runtime_env:
if runtime_env.get("working_dir"):
raise NotImplementedError(
"Overriding working_dir for actors is not supported. "
"Please use ray.init(runtime_env={'working_dir': ...}) "
"to configure per-job environment instead.")
# NOTE(edoakes): this is sort of hacky, but we manually add the right
# working_dir here so the relative path to a requirements.txt file
# works. The right solution would be to merge the runtime_env with the
# parent runtime env before validation.
if parent_runtime_env.get("working_dir"):
runtime_env["working_dir"] = parent_runtime_env["working_dir"]
runtime_env_dict = RuntimeEnvDict(runtime_env).get_parsed_dict()
else:
runtime_env_dict = {}
# If per-actor URIs aren't specified, override them with those in the
# job config.
if "uris" not in runtime_env_dict and "uris" in parent_runtime_env:
runtime_env_dict["uris"] = parent_runtime_env.get("uris")
return runtime_env_dict
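For instance, per-actor URIs and working_dir fall back to the job's values when unspecified (URI value illustrative):

from ray._private.runtime_env import override_task_or_actor_runtime_env

parent = {"uris": ["gcs://pkg_example.zip"], "working_dir": "/job/dir"}
child = override_task_or_actor_runtime_env({"env_vars": {"A": "1"}}, parent)
assert child["uris"] == ["gcs://pkg_example.zip"]  # inherited from job config
assert child["working_dir"] == "/job/dir"  # injected so relative paths resolve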

View file

@ -1,30 +1,25 @@
from enum import Enum
from filelock import FileLock
import hashlib
import logging
import json
import yaml
from filelock import FileLock
import os
from pathlib import Path
import sys
from typing import Callable, List, Optional, Tuple
from urllib.parse import urlparse
from zipfile import ZipFile
from ray._private.thirdparty.pathspec import PathSpec
from ray.job_config import JobConfig
from enum import Enum
import ray
from ray.experimental.internal_kv import (_internal_kv_put, _internal_kv_get,
_internal_kv_exists,
_internal_kv_initialized)
from ray.job_config import JobConfig
from ray._private.thirdparty.pathspec import PathSpec
from ray._private.runtime_env import RuntimeEnvContext
from typing import Any, Callable, Dict, List, Optional, Tuple
from urllib.parse import urlparse
import os
import sys
# We need to setup this variable before
# using this module
# We need to set up this variable before using this module.
PKG_DIR = None
logger = logging.getLogger(__name__)
_logger = logging.getLogger(__name__)
FILE_SIZE_WARNING = 10 * 1024 * 1024 # 10MiB
# NOTE(edoakes): we should be able to support up to 512 MiB based on the GCS'
@ -32,190 +27,6 @@ FILE_SIZE_WARNING = 10 * 1024 * 1024 # 10MiB
GCS_STORAGE_MAX_SIZE = 100 * 1024 * 1024 # 100MiB
class RuntimeEnvDict:
"""Parses and validates the runtime env dictionary from the user.
Attributes:
working_dir (Path): Specifies the working directory of the worker.
This can either be a local directory or zip file.
Examples:
"." # cwd
"local_project.zip" # archive is unpacked into directory
py_modules (List[Path]): Similar to working_dir, but specifies python
modules to add to the `sys.path`.
Examples:
["/path/to/other_module", "/other_path/local_project.zip"]
pip (List[str] | str): Either a list of pip packages, or a string
containing the path to a pip requirements.txt file.
conda (dict | str): Either the conda YAML config, the name of a
local conda env (e.g., "pytorch_p36"), or the path to a conda
environment.yaml file.
The Ray dependency will be automatically injected into the conda
env to ensure compatibility with the cluster Ray. The conda name
may be mangled automatically to avoid conflicts between runtime
envs.
This field cannot be specified at the same time as the 'pip' field.
To use pip with conda, please specify your pip dependencies within
the conda YAML config:
https://conda.io/projects/conda/en/latest/user-guide/tasks/manage-e
nvironments.html#create-env-file-manually
Examples:
{"channels": ["defaults"], "dependencies": ["codecov"]}
"pytorch_p36" # Found on DLAMIs
container (dict): Require a given (Docker) container image,
The Ray worker process will run in a container with this image.
The `worker_path` is the default_worker.py path.
The `run_options` list spec is here:
https://docs.docker.com/engine/reference/run/
Examples:
{"image": "anyscale/ray-ml:nightly-py38-cpu",
"worker_path": "/root/python/ray/workers/default_worker.py",
"run_options": ["--cap-drop SYS_ADMIN","--log-level=debug"]}
env_vars (dict): Environment variables to set.
Examples:
{"OMP_NUM_THREADS": "32", "TF_WARNINGS": "none"}
"""
def __init__(self, runtime_env_json: dict):
# Simple dictionary with all options validated. This will always
# contain all supported keys; values will be set to None if
# unspecified. However, if all values are None this is set to {}.
self._dict = dict()
if "working_dir" in runtime_env_json:
self._dict["working_dir"] = runtime_env_json["working_dir"]
if not isinstance(self._dict["working_dir"], str):
raise TypeError("`working_dir` must be a string. Type "
f"{type(self._dict['working_dir'])} received.")
working_dir = Path(self._dict["working_dir"]).absolute()
else:
self._dict["working_dir"] = None
working_dir = None
self._dict["conda"] = None
if "conda" in runtime_env_json:
if sys.platform == "win32":
raise NotImplementedError("The 'conda' field in runtime_env "
"is not currently supported on "
"Windows.")
conda = runtime_env_json["conda"]
if isinstance(conda, str):
yaml_file = Path(conda)
if yaml_file.suffix in (".yaml", ".yml"):
if working_dir and not yaml_file.is_absolute():
yaml_file = working_dir / yaml_file
if not yaml_file.is_file():
raise ValueError(
f"Can't find conda YAML file {yaml_file}")
try:
self._dict["conda"] = yaml.safe_load(
yaml_file.read_text())
except Exception as e:
raise ValueError(
f"Invalid conda file {yaml_file} with error {e}")
else:
logger.info(
f"Using preinstalled conda environment: {conda}")
self._dict["conda"] = conda
elif isinstance(conda, dict):
self._dict["conda"] = conda
elif conda is not None:
raise TypeError("runtime_env['conda'] must be of type str or "
"dict")
self._dict["pip"] = None
if "pip" in runtime_env_json:
if sys.platform == "win32":
raise NotImplementedError("The 'pip' field in runtime_env "
"is not currently supported on "
"Windows.")
if ("conda" in runtime_env_json
and runtime_env_json["conda"] is not None):
raise ValueError(
"The 'pip' field and 'conda' field of "
"runtime_env cannot both be specified.\n"
f"specified pip field: {runtime_env_json['pip']}\n"
f"specified conda field: {runtime_env_json['conda']}\n"
"To use pip with conda, please only set the 'conda' "
"field, and specify your pip dependencies "
"within the conda YAML config dict: see "
"https://conda.io/projects/conda/en/latest/"
"user-guide/tasks/manage-environments.html"
"#create-env-file-manually")
pip = runtime_env_json["pip"]
if isinstance(pip, str):
# We have been given a path to a requirements.txt file.
pip_file = Path(pip)
if working_dir and not pip_file.is_absolute():
pip_file = working_dir / pip_file
if not pip_file.is_file():
raise ValueError(f"{pip_file} is not a valid file")
self._dict["pip"] = pip_file.read_text()
elif isinstance(pip, list) and all(
isinstance(dep, str) for dep in pip):
# Construct valid pip requirements.txt from list of packages.
self._dict["pip"] = "\n".join(pip) + "\n"
else:
raise TypeError("runtime_env['pip'] must be of type str or "
"List[str]")
if "uris" in runtime_env_json:
self._dict["uris"] = runtime_env_json["uris"]
if "container" in runtime_env_json:
self._dict["container"] = runtime_env_json["container"]
self._dict["env_vars"] = None
if "env_vars" in runtime_env_json:
env_vars = runtime_env_json["env_vars"]
self._dict["env_vars"] = env_vars
if not (isinstance(env_vars, dict) and all(
isinstance(k, str) and isinstance(v, str)
for (k, v) in env_vars.items())):
raise TypeError("runtime_env['env_vars'] must be of type"
"Dict[str, str]")
if "_ray_release" in runtime_env_json:
self._dict["_ray_release"] = runtime_env_json["_ray_release"]
if "_ray_commit" in runtime_env_json:
self._dict["_ray_commit"] = runtime_env_json["_ray_commit"]
else:
if self._dict.get("pip") or self._dict.get("conda"):
self._dict["_ray_commit"] = ray.__commit__
# Used for testing wheels that have not yet been merged into master.
# If this is set to True, then we do not inject Ray into the conda
# or pip dependencies.
if os.environ.get("RAY_RUNTIME_ENV_LOCAL_DEV_MODE"):
runtime_env_json["_inject_current_ray"] = True
if "_inject_current_ray" in runtime_env_json:
self._dict["_inject_current_ray"] = runtime_env_json[
"_inject_current_ray"]
# TODO(ekl) we should have better schema validation here.
# TODO(ekl) support py_modules
# TODO(architkulkarni) support docker
# TODO(architkulkarni) This is to make it easy for the worker caching
# code in C++ to check if the env is empty without deserializing and
# parsing it. We should use a less confusing approach here.
if all(val is None for val in self._dict.values()):
self._dict = {}
def get_parsed_dict(self) -> dict:
return self._dict
def serialize(self) -> str:
# Use sort_keys=True because we will use the output as a key to cache
# workers by, so we need the serialization to be independent of the
# dict order.
return json.dumps(self._dict, sort_keys=True)
def set_uris(self, uris):
self._dict["uris"] = uris
class Protocol(Enum):
"""A enum for supported backend storage."""
@ -250,7 +61,7 @@ def _dir_travel(
try:
handler(path)
except Exception as e:
logger.error(f"Issue with path: {path}")
_logger.error(f"Issue with path: {path}")
raise e
if path.is_dir():
for sub_path in path.iterdir():
@ -269,7 +80,7 @@ def _zip_module(root: Path, relative_path: Path, excludes: Optional[Callable],
None) is None or path.is_file():
file_size = path.stat().st_size
if file_size >= FILE_SIZE_WARNING:
logger.warning(
_logger.warning(
f"File {path} is very large ({file_size} bytes). "
"Consider excluding this file from the working directory.")
to_path = path.relative_to(relative_path)
@ -447,7 +258,7 @@ def fetch_package(pkg_uri: str) -> int:
if local_dir.exists():
assert local_dir.is_dir(), f"{local_dir} is not a directory"
return local_dir
logger.debug("Fetch packge")
_logger.debug("Fetch packge")
(protocol, pkg_name) = _parse_uri(pkg_uri)
if protocol in (Protocol.GCS, Protocol.PIN_GCS):
code = _internal_kv_get(pkg_uri)
@ -458,7 +269,7 @@ def fetch_package(pkg_uri: str) -> int:
else:
raise NotImplementedError(f"Protocol {protocol} is not supported")
logger.debug(f"Unpack {pkg_file} to {local_dir}")
_logger.debug(f"Unpack {pkg_file} to {local_dir}")
with ZipFile(str(pkg_file), "r") as zip_ref:
zip_ref.extractall(local_dir)
pkg_file.unlink()
@ -561,14 +372,14 @@ def upload_runtime_env_package_if_needed(job_config: JobConfig):
working_dir = job_config.runtime_env.get("working_dir")
py_modules = job_config.runtime_env.get("py_modules")
excludes = job_config.runtime_env.get("excludes") or []
logger.info(f"{pkg_uri} doesn't exist. Create new package with"
f" {working_dir} and {py_modules}")
_logger.info(f"{pkg_uri} doesn't exist. Create new package with"
f" {working_dir} and {py_modules}")
if not pkg_file.exists():
create_project_package(working_dir, py_modules, excludes,
file_path)
# Push the data to remote storage
pkg_size = push_package(pkg_uri, pkg_file)
logger.info(f"{pkg_uri} has been pushed with {pkg_size} bytes")
_logger.info(f"{pkg_uri} has been pushed with {pkg_size} bytes")
def ensure_runtime_env_setup(pkg_uris: List[str]) -> Optional[str]:
@ -593,33 +404,28 @@ def ensure_runtime_env_setup(pkg_uris: List[str]) -> Optional[str]:
with FileLock(str(pkg_file) + ".lock"):
pkg_dir = fetch_package(pkg_uri)
sys.path.insert(0, str(pkg_dir))
# Right now, multiple pkg_uris are not supported correctly.
# We return the last one as working directory
return str(pkg_dir) if pkg_dir else None
def override_task_or_actor_runtime_env(
runtime_env: Optional[Dict[str, Any]],
parent_runtime_env: Dict[str, Any]) -> Dict[str, Any]:
if runtime_env:
if runtime_env.get("working_dir"):
raise NotImplementedError(
"Overriding working_dir for actors is not supported. "
"Please use ray.init(runtime_env={'working_dir': ...}) "
"to configure per-job environment instead.")
# NOTE(edoakes): this is sort of hacky, but we manually add the right
# working_dir here so the relative path to a requirements.txt file
# works. The right solution would be to merge the runtime_env with the
# parent runtime env before validation.
if parent_runtime_env.get("working_dir"):
runtime_env["working_dir"] = parent_runtime_env["working_dir"]
runtime_env_dict = RuntimeEnvDict(runtime_env).get_parsed_dict()
else:
runtime_env_dict = {}
def setup_working_dir(runtime_env: dict,
context: RuntimeEnvContext,
logger: Optional[logging.Logger] = None):
if not runtime_env.get("uris"):
return
# If per-actor URIs aren't specified, override them with those in the
# job config.
if "uris" not in runtime_env_dict and "uris" in parent_runtime_env:
runtime_env_dict["uris"] = parent_runtime_env.get("uris")
# Overwrite the module-wide logger and PKG_DIR temporarily.
# TODO(edoakes): we should be able to remove this by refactoring the
# working_dir setup code into a class instead of using global vars.
global _logger, PKG_DIR
prev_logger = _logger
prev_pkg_dir = PKG_DIR
_logger = logger
PKG_DIR = context.session_dir
return runtime_env_dict
context.working_dir = ensure_runtime_env_setup(runtime_env["uris"])
PKG_DIR = prev_pkg_dir
_logger = prev_logger
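A usage sketch of the working_dir path (URI and paths illustrative; this assumes a running Ray session where the package URI was already uploaded, e.g. via upload_runtime_env_package_if_needed):

import logging

from ray._private.runtime_env import RuntimeEnvContext
from ray._private.runtime_env.working_dir import setup_working_dir

context = RuntimeEnvContext("/tmp/ray/session_example")
runtime_env = {"uris": ["gcs://pkg_example.zip"]}
setup_working_dir(runtime_env, context, logger=logging.getLogger(__name__))
print(context.working_dir)  # local directory the package was unpacked into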

View file

@ -1335,7 +1335,6 @@ def start_raylet(redis_address,
worker_path,
setup_worker_path,
worker_setup_hook,
runtime_env_setup_hook,
temp_dir,
session_dir,
resource_dir,
@ -1379,8 +1378,6 @@ def start_raylet(redis_address,
worker_setup_hook to set up the environment for the worker process.
worker_setup_hook (str): The module path to a Python function that will
be imported and run to set up the environment for the worker.
runtime_env_setup_hook (str): The module path to a Python function that
will be imported and run to set up the runtime env in agent.
temp_dir (str): The path of the temporary directory Ray will use.
session_dir (str): The path of this session.
resource_dir (str): The path of the resource directory for this session.
@ -1535,7 +1532,6 @@ def start_raylet(redis_address,
f"--temp-dir={temp_dir}",
f"--session-dir={session_dir}",
f"--runtime-env-dir={resource_dir}",
f"--runtime-env-setup-hook={runtime_env_setup_hook}",
f"--log-dir={log_dir}",
f"--logging-rotate-bytes={max_bytes}",
f"--logging-rotate-backup-count={backup_count}",

View file

@ -12,6 +12,7 @@ import timeit
import socket
import math
import traceback
import logging
from typing import Optional, Any, List, Dict
from contextlib import redirect_stdout, redirect_stderr
import yaml
@ -22,8 +23,7 @@ import ray._private.utils
import ray._private.gcs_utils as gcs_utils
from ray.util.queue import Queue, _QueueActor, Empty
from ray.scripts.scripts import main as ray_main
from ray.workers.pluggable_runtime_env import (RuntimeEnvContext,
get_hook_logger)
from ray._private.runtime_env import RuntimeEnvContext
try:
from prometheus_client.parser import text_string_to_metric_families
except (ImportError, ModuleNotFoundError):
@ -625,13 +625,15 @@ def set_setup_func():
runtime_env.VAR = "hello world"
def sleep_setup_runtime_env(runtime_env: dict, session_dir):
logger = get_hook_logger()
def sleep_setup_runtime_env(runtime_env: dict,
context: RuntimeEnvContext,
logger=None):
logger = logging.getLogger(__name__)
logger.info(f"Setting up runtime environment {runtime_env}")
logger.info("Simulating long runtime env setup. Sleeping for 15s...")
time.sleep(15)
logger.info("Finished sleeping for 15s")
return RuntimeEnvContext()
return
class BatchQueue(Queue):

View file

@ -19,7 +19,7 @@ import tempfile
import yaml
import ray
import ray._private.runtime_env as runtime_support
from ray._private.runtime_env import working_dir as working_dir_pkg
def load_package(config_path: str) -> "_RuntimePackage":
@ -68,22 +68,22 @@ def load_package(config_path: str) -> "_RuntimePackage":
# Autofill working directory by uploading to GCS storage.
if "working_dir" not in runtime_env:
pkg_name = runtime_support.get_project_package_name(
pkg_name = working_dir_pkg.get_project_package_name(
working_dir=base_dir, py_modules=[], excludes=[])
pkg_uri = runtime_support.Protocol.GCS.value + "://" + pkg_name
pkg_uri = working_dir_pkg.Protocol.GCS.value + "://" + pkg_name
def do_register_package():
if not runtime_support.package_exists(pkg_uri):
if not working_dir_pkg.package_exists(pkg_uri):
tmp_path = os.path.join(_pkg_tmp(), "_tmp{}".format(pkg_name))
runtime_support.create_project_package(
working_dir_pkg.create_project_package(
working_dir=base_dir,
py_modules=[],
excludes=[],
output_path=tmp_path)
# TODO(ekl) does this get garbage collected correctly with the
# current job id?
runtime_support.push_package(pkg_uri, tmp_path)
if not runtime_support.package_exists(pkg_uri):
working_dir_pkg.push_package(pkg_uri, tmp_path)
if not working_dir_pkg.package_exists(pkg_uri):
raise RuntimeError(
"Failed to upload package {}".format(pkg_uri))

View file

@ -22,6 +22,7 @@ import ray.ray_constants as ray_constants
import ray._private.services
import ray._private.utils
from ray._private.resource_spec import ResourceSpec
from ray._private.runtime_env import working_dir as working_dir_pkg
from ray._private.utils import (try_to_create_directory, try_to_symlink,
open_log)
@ -326,8 +327,7 @@ class Node:
self._resource_dir = os.path.join(self._session_dir,
"runtime_resources")
try_to_create_directory(self._resource_dir)
import ray._private.runtime_env as runtime_env
runtime_env.PKG_DIR = self._resource_dir
working_dir_pkg.PKG_DIR = self._resource_dir
def get_resource_spec(self):
"""Resolve and return the current resource spec for the node."""
@ -810,7 +810,6 @@ class Node:
self._ray_params.worker_path,
self._ray_params.setup_worker_path,
self._ray_params.worker_setup_hook,
self._ray_params.runtime_env_setup_hook,
self._temp_dir,
self._session_dir,
self._resource_dir,

View file

@ -247,10 +247,6 @@ REDIS_DEFAULT_PASSWORD = "5241590000000000"
# The default module path to a Python function that sets up the worker env.
DEFAULT_WORKER_SETUP_HOOK = "ray.workers.setup_runtime_env.setup_worker"
# The default module path to a Python function that sets up runtime envs.
DEFAULT_RUNTIME_ENV_SETUP_HOOK = \
"ray.workers.setup_runtime_env.setup_runtime_env"
# The default ip address to bind to.
NODE_DEFAULT_IP = "127.0.0.1"

View file

@ -475,13 +475,6 @@ def debug(address):
type=str,
help="Module path to the Python function that will be used to set up the "
"environment for the worker process.")
@click.option(
"--runtime-env-setup-hook",
hidden=True,
default=ray_constants.DEFAULT_RUNTIME_ENV_SETUP_HOOK,
type=str,
help="Module path to the Python function that will be used to set up the "
"runtime env in agent.")
@click.option(
"--ray-debugger-external",
is_flag=True,
@ -489,19 +482,18 @@ def debug(address):
help="Make the Ray debugger available externally to the node. This is only"
"safe to activate if the node is behind a firewall.")
@add_click_options(logging_options)
def start(node_ip_address, address, port, redis_password, redis_shard_ports,
object_manager_port, node_manager_port, gcs_server_port,
min_worker_port, max_worker_port, worker_port_list,
ray_client_server_port, memory, object_store_memory,
redis_max_memory, num_cpus, num_gpus, resources, head,
include_dashboard, dashboard_host, dashboard_port,
dashboard_agent_listen_port, block, plasma_directory,
autoscaling_config, no_redirect_worker_output, no_redirect_output,
plasma_store_socket_name, raylet_socket_name, temp_dir,
system_config, lru_evict, enable_object_reconstruction,
metrics_export_port, no_monitor, tracing_startup_hook,
worker_setup_hook, runtime_env_setup_hook, ray_debugger_external,
log_style, log_color, verbose):
def start(
node_ip_address, address, port, redis_password, redis_shard_ports,
object_manager_port, node_manager_port, gcs_server_port,
min_worker_port, max_worker_port, worker_port_list,
ray_client_server_port, memory, object_store_memory, redis_max_memory,
num_cpus, num_gpus, resources, head, include_dashboard, dashboard_host,
dashboard_port, dashboard_agent_listen_port, block, plasma_directory,
autoscaling_config, no_redirect_worker_output, no_redirect_output,
plasma_store_socket_name, raylet_socket_name, temp_dir, system_config,
lru_evict, enable_object_reconstruction, metrics_export_port,
no_monitor, tracing_startup_hook, worker_setup_hook,
ray_debugger_external, log_style, log_color, verbose):
"""Start Ray processes manually on the local machine."""
cli_logger.configure(log_style, log_color, verbose)
if gcs_server_port and not head:
@ -562,7 +554,6 @@ def start(node_ip_address, address, port, redis_password, redis_shard_ports,
no_monitor=no_monitor,
tracing_startup_hook=tracing_startup_hook,
worker_setup_hook=worker_setup_hook,
runtime_env_setup_hook=runtime_env_setup_hook,
ray_debugger_external=ray_debugger_external)
if head:
# Use default if port is none, allocate an available port if port is 0

View file

@ -9,20 +9,23 @@ from pathlib import Path
import ray
from ray.exceptions import RuntimeEnvSetupError
import ray.experimental.internal_kv as kv
from ray._private.test_utils import (
run_string_as_driver, run_string_as_driver_nonblocking, wait_for_condition)
from ray._private.runtime_env import working_dir as working_dir_pkg
from ray._private.utils import (get_wheel_filename, get_master_wheel_url,
get_release_wheel_url)
import ray.experimental.internal_kv as kv
from time import sleep
driver_script = """
from time import sleep
import sys
import logging
import os
import sys
import time
sys.path.insert(0, "{working_dir}")
import ray
import ray.util
import os
try:
import test_module
@ -84,7 +87,7 @@ if os.environ.get("USE_RAY_CLIENT"):
ray.util.disconnect()
else:
ray.shutdown()
sleep(10)
time.sleep(10)
"""
@ -119,7 +122,7 @@ from test_module.test import one
def start_client_server(cluster, client_mode):
from ray._private.runtime_env import PKG_DIR
from ray._private.runtime_env.working_dir import PKG_DIR
if not client_mode:
return (cluster.address, {}, PKG_DIR)
ray.worker._global_node._ray_params.ray_client_server_port = "10003"
@ -174,7 +177,7 @@ def test_travel():
item_num += 1
construct(root)
exclude_spec = ray._private.runtime_env._get_excludes(root, excludes)
exclude_spec = working_dir_pkg._get_excludes(root, excludes)
visited_dir_paths = set()
visited_file_paths = set()
@ -185,7 +188,7 @@ def test_travel():
with open(path) as f:
visited_file_paths.add((str(path), f.read()))
ray._private.runtime_env._dir_travel(root, [exclude_spec], handler)
working_dir_pkg._dir_travel(root, [exclude_spec], handler)
assert file_paths == visited_file_paths
assert dir_paths == visited_dir_paths
@ -264,6 +267,7 @@ def test_single_node(ray_start_cluster_head, working_dir, client_mode):
execute_statement = "print(sum(ray.get([run_test.remote()] * 1000)))"
script = driver_script.format(**locals())
out = run_string_as_driver(script, env)
print(out)
assert out.strip().split()[-1] == "1000"
assert len(list(Path(PKG_DIR).iterdir())) == 1
assert len(kv._internal_kv_list("gcs://")) == 0
@ -461,13 +465,13 @@ print(ray.get_runtime_context().runtime_env["working_dir"])
def test_two_node_uri(two_node_cluster, working_dir, client_mode):
cluster, _ = two_node_cluster
(address, env, PKG_DIR) = start_client_server(cluster, client_mode)
import ray._private.runtime_env as runtime_env
import tempfile
with tempfile.NamedTemporaryFile(suffix="zip") as tmp_file:
pkg_name = runtime_env.get_project_package_name(working_dir, [], [])
pkg_uri = runtime_env.Protocol.PIN_GCS.value + "://" + pkg_name
runtime_env.create_project_package(working_dir, [], [], tmp_file.name)
runtime_env.push_package(pkg_uri, tmp_file.name)
pkg_name = working_dir_pkg.get_project_package_name(
working_dir, [], [])
pkg_uri = working_dir_pkg.Protocol.PIN_GCS.value + "://" + pkg_name
working_dir_pkg.create_project_package(working_dir, [], [],
tmp_file.name)
working_dir_pkg.push_package(pkg_uri, tmp_file.name)
runtime_env = f"""{{ "uris": ["{pkg_uri}"] }}"""
# Execute the following cmd in driver with runtime_env
execute_statement = "print(sum(ray.get([run_test.remote()] * 1000)))"
@ -521,8 +525,7 @@ print(sum(ray.get([test_actor.one.remote()] * 1000)))
test_actor = ray.get_actor("test_actor")
assert sum(ray.get([test_actor.one.remote()] * 1000)) == 1000
ray.kill(test_actor)
from time import sleep
sleep(5)
time.sleep(5)
assert len(list(Path(PKG_DIR).iterdir())) == 1
assert len(kv._internal_kv_list("gcs://")) == 0
@ -536,13 +539,13 @@ def test_jobconfig_compatible_1(ray_start_cluster_head, working_dir):
runtime_env = None
# To make the first one hanging there
execute_statement = """
sleep(600)
time.sleep(600)
"""
script = driver_script.format(**locals())
# Have one running with job config = None
proc = run_string_as_driver_nonblocking(script, env)
# waiting it to be up
sleep(5)
time.sleep(5)
runtime_env = f"""{{ "working_dir": "{working_dir}" }}"""
# Execute the second one which should work because Ray Client servers.
execute_statement = "print(sum(ray.get([run_test.remote()] * 1000)))"
@ -562,11 +565,11 @@ def test_jobconfig_compatible_2(ray_start_cluster_head, working_dir):
runtime_env = """{ "py_modules": [test_module.__path__[0]] }"""
# To make the first one hanging there
execute_statement = """
sleep(600)
time.sleep(600)
"""
script = driver_script.format(**locals())
proc = run_string_as_driver_nonblocking(script, env)
sleep(5)
time.sleep(5)
runtime_env = None
# Execute the following in the second one which should
# succeed
@ -587,11 +590,11 @@ def test_jobconfig_compatible_3(ray_start_cluster_head, working_dir):
runtime_env = """{ "py_modules": [test_module.__path__[0]] }"""
# To make the first one hanging there
execute_statement = """
sleep(600)
time.sleep(600)
"""
script = driver_script.format(**locals())
proc = run_string_as_driver_nonblocking(script, env)
sleep(5)
time.sleep(5)
runtime_env = f"""
{{ "working_dir": test_module.__path__[0] }}""" # noqa: F541
# Execute the following cmd in the second one and ensure that
@ -830,29 +833,12 @@ def test_invalid_conda_env(shutdown_only):
@pytest.mark.skipif(
sys.platform == "win32", reason="runtime_env unsupported on Windows.")
@pytest.mark.parametrize(
"ray_start_cluster", [{
"_system_config": {
"event_stats_print_interval_ms": 100,
"debug_dump_period_milliseconds": 100,
"event_stats": True
}
}],
indirect=True)
def test_no_spurious_worker_startup(ray_start_cluster):
def test_no_spurious_worker_startup(shutdown_only):
"""Test that no extra workers start up during a long env installation."""
cluster = ray_start_cluster
# This hook sleeps for 15 seconds to simulate creating a runtime env.
cluster.add_node(
num_cpus=1,
runtime_env_setup_hook=(
"ray._private.test_utils.sleep_setup_runtime_env"))
# Set a nonempty runtime env so that the runtime env setup hook is called.
runtime_env = {"env_vars": {"a": "b"}}
ray.init(address=cluster.address)
# Causes agent to sleep for 15 seconds to simulate creating a runtime env.
os.environ["RAY_RUNTIME_ENV_SLEEP_FOR_TESTING_S"] = "15"
ray.init(num_cpus=1)
@ray.remote
class Counter(object):
@ -862,6 +848,9 @@ def test_no_spurious_worker_startup(ray_start_cluster):
def get(self):
return self.value
# Set a nonempty runtime env so that the runtime env setup hook is called.
runtime_env = {"env_vars": {"a": "b"}}
# Instantiate an actor that requires the long runtime env installation.
a = Counter.options(runtime_env=runtime_env).remote()
assert ray.get(a.get.remote()) == 0
@ -904,7 +893,7 @@ def test_large_file_boundary(shutdown_only):
os.chdir(tmp_dir)
# Check that packages just under the max size work as expected.
size = ray._private.runtime_env.GCS_STORAGE_MAX_SIZE - 1024 * 1024
size = working_dir_pkg.GCS_STORAGE_MAX_SIZE - 1024 * 1024
with open("test_file", "wb") as f:
f.write(os.urandom(size))
@ -929,7 +918,7 @@ def test_large_file_error(shutdown_only):
# Write to two separate files, each of which is below the threshold to
# make sure the error is for the full package size.
size = ray._private.runtime_env.GCS_STORAGE_MAX_SIZE // 2 + 1
size = working_dir_pkg.GCS_STORAGE_MAX_SIZE // 2 + 1
with open("test_file_1", "wb") as f:
f.write(os.urandom(size))

View file

@ -1,29 +1,27 @@
import json
import os
from contextlib import contextmanager
from typing import List
from ray.workers.setup_runtime_env import inject_dependencies
import pytest
import sys
import unittest
import tempfile
import yaml
import time
import subprocess
from pathlib import Path
from unittest import mock
import pytest
import subprocess
import sys
import tempfile
import time
from typing import List
from unittest import mock, skipIf
import yaml
import ray
from ray._private.utils import get_conda_env_dir, get_conda_bin_executable
from ray._private.runtime_env import RuntimeEnvDict
from ray.workers.setup_runtime_env import (
from ray._private.runtime_env.conda import (
inject_dependencies,
_inject_ray_to_conda_site,
_resolve_install_from_source_ray_dependencies,
_current_py_version,
)
from ray._private.test_utils import (run_string_as_driver,
run_string_as_driver_nonblocking)
from ray._private.utils import get_conda_env_dir, get_conda_bin_executable
if not os.environ.get("CI"):
# This flag turns on the local development mode that links against the current ray
@ -193,7 +191,6 @@ def test_job_config_conda_env(conda_envs, shutdown_only):
def test_get_conda_env_dir(tmp_path):
from pathlib import Path
"""
Typical output of `conda env list`, for context:
@ -474,7 +471,7 @@ def test_conda_input_filepath(use_working_dir, tmp_path):
assert output_conda_dict == conda_dict
@unittest.skipIf(sys.platform == "win32", "Fail to create temp dir.")
@skipIf(sys.platform == "win32", "Fail to create temp dir.")
def test_experimental_package(shutdown_only):
ray.init(num_cpus=2)
pkg = ray.experimental.load_package(
@ -486,7 +483,7 @@ def test_experimental_package(shutdown_only):
assert ray.get(pkg.my_func.remote()) == "hello world"
@unittest.skipIf(sys.platform == "win32", "Fail to create temp dir.")
@skipIf(sys.platform == "win32", "Fail to create temp dir.")
def test_experimental_package_lazy(shutdown_only):
pkg = ray.experimental.load_package(
os.path.join(
@ -498,7 +495,7 @@ def test_experimental_package_lazy(shutdown_only):
assert ray.get(pkg.my_func.remote()) == "hello world"
@unittest.skipIf(sys.platform == "win32", "Fail to create temp dir.")
@skipIf(sys.platform == "win32", "Fail to create temp dir.")
def test_experimental_package_github(shutdown_only):
ray.init(num_cpus=2)
pkg = ray.experimental.load_package(

View file

@ -23,7 +23,7 @@ from ray.util.client.common import (ClientServerHandle,
CLIENT_SERVER_MAX_THREADS, GRPC_OPTIONS)
from ray._private.client_mode_hook import disable_client_hook
from ray._private.parameter import RayParams
import ray._private.runtime_env as runtime_pkg
import ray._private.runtime_env.working_dir as working_dir_pkg
from ray._private.services import ProcessInfo, start_ray_client_server
from ray._private.utils import detect_fate_sharing_support
@ -221,7 +221,7 @@ class ProxyManager():
uris = job_config.get_runtime_env_uris() if job_config else []
if uris:
# Download and set up the working_dir locally.
working_dir = runtime_pkg.ensure_runtime_env_setup(uris)
working_dir = working_dir_pkg.ensure_runtime_env_setup(uris)
# Set PYTHONPATH in the environment variables so the working_dir
# is included in the module search path.
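The PYTHONPATH handling itself is truncated in this hunk; a sketch of what including the working_dir in the module search path amounts to (variable names assumed from the lines above):

import os

env = os.environ.copy()  # environment for the ray client server subprocess
env["PYTHONPATH"] = working_dir + os.pathsep + env.get("PYTHONPATH", "")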

View file

@ -515,14 +515,15 @@ class Worker:
else:
# Generate and upload URIs for the working directory. This
# uses internal_kv to upload to the GCS.
import ray._private.runtime_env as runtime_env
import ray._private.runtime_env.working_dir as working_dir_pkg
with tempfile.TemporaryDirectory() as tmp_dir:
(old_dir, runtime_env.PKG_DIR) = (runtime_env.PKG_DIR,
tmp_dir)
runtime_env.rewrite_runtime_env_uris(job_config)
runtime_env.upload_runtime_env_package_if_needed(
(old_dir,
working_dir_pkg.PKG_DIR) = (working_dir_pkg.PKG_DIR,
tmp_dir)
working_dir_pkg.rewrite_runtime_env_uris(job_config)
working_dir_pkg.upload_runtime_env_package_if_needed(
job_config)
runtime_env.PKG_DIR = old_dir
working_dir_pkg.PKG_DIR = old_dir
serialized_job_config = pickle.dumps(job_config)
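The swap-and-restore of the module-level PKG_DIR above could equally be written as a context manager (not what the diff does, just an equivalent sketch; the try/finally also restores the value if the upload raises):

import contextlib

import ray._private.runtime_env.working_dir as working_dir_pkg

@contextlib.contextmanager
def overridden_pkg_dir(tmp_dir: str):
    old_dir, working_dir_pkg.PKG_DIR = working_dir_pkg.PKG_DIR, tmp_dir
    try:
        yield
    finally:
        working_dir_pkg.PKG_DIR = old_dir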

View file

@ -27,7 +27,7 @@ import ray.remote_function
import ray.serialization as serialization
import ray._private.gcs_utils as gcs_utils
import ray._private.services as services
import ray._private.runtime_env as runtime_env_pkg
from ray._private.runtime_env import working_dir as working_dir_pkg
import ray._private.import_thread as import_thread
from ray.util.tracing.tracing_helper import import_from_string
from ray.util.annotations import PublicAPI, DeveloperAPI, Deprecated
@ -938,7 +938,7 @@ def init(
if driver_mode == SCRIPT_MODE and job_config:
# Rewrite the URI. Note the package isn't uploaded to the URI until
# later in the connect
runtime_env_pkg.rewrite_runtime_env_uris(job_config)
working_dir_pkg.rewrite_runtime_env_uris(job_config)
connect(
_global_node,
@ -1397,7 +1397,7 @@ def connect(node,
# environment here. If it's ray client, the environment will be prepared
# at the server side.
if mode == SCRIPT_MODE and not job_config.client_job:
runtime_env_pkg.upload_runtime_env_package_if_needed(job_config)
working_dir_pkg.upload_runtime_env_package_if_needed(job_config)
# Notify raylet that the core worker is ready.
worker.core_worker.notify_raylet()

View file

@ -1,46 +0,0 @@
import contextlib
import json
import logging
import threading
logger = logging.getLogger(__name__)
thread_local_logger = threading.local()
thread_local_logger.logger = None
def get_hook_logger():
"""Retrieve a logger to be used by the setup hook function. Logs from this
logger will be streamed to the driver.
"""
thread_logger = thread_local_logger.logger
if thread_logger is None:
logger.warning(
"Tried to receive the per job logger in runtime env agent but it "
"hasn't been properly setup, default to dashboard_agent logger.")
thread_logger = logger
return thread_logger
@contextlib.contextmanager
def using_thread_local_logger(new_logger):
"""Configure the logger to be used by the setup hook function. This sets
a logger to be retrieved by get_hook_logger function.
"""
thread_local_logger.logger = new_logger
yield
thread_local_logger.logger = None
class RuntimeEnvContext:
"""A context used to describe the created runtime env."""
def __init__(self, conda_env_name: str = None, working_dir: str = None):
self.conda_env_name = conda_env_name
self.working_dir = working_dir
def serialize(self) -> str:
return json.dumps(self.__dict__)
@staticmethod
def deserialize(json_string):
return RuntimeEnvContext(**json.loads(json_string))
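A quick round-trip sketch against the class as defined above (note this file is deleted by the diff, and the replacement context now lives in ray._private.runtime_env, so treat the constructor signature here as the old one):

ctx = RuntimeEnvContext(conda_env_name="my-env", working_dir="/tmp/pkg")
serialized = ctx.serialize()  # plain JSON of the instance attributes
restored = RuntimeEnvContext.deserialize(serialized)
assert restored.conda_env_name == "my-env"
assert restored.working_dir == "/tmp/pkg"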

View file

@ -3,24 +3,10 @@ import sys
import argparse
import json
import logging
import yaml
import hashlib
import subprocess
import runpy
import shutil
from filelock import FileLock
from typing import Optional, List, Dict, Any
from pathlib import Path
import ray
from ray._private.conda import (get_conda_activate_commands,
get_or_create_conda_env)
from ray._private.utils import try_to_create_directory
from ray._private.utils import (get_wheel_filename, get_master_wheel_url,
get_release_wheel_url)
from ray.workers.pluggable_runtime_env import (RuntimeEnvContext,
get_hook_logger)
from ray._private.runtime_env import RuntimeEnvContext
from ray._private.runtime_env.conda import setup_conda_or_pip
from ray._private.runtime_env.conda_utils import get_conda_activate_commands
logger = logging.getLogger(__name__)
parser = argparse.ArgumentParser()
@ -40,99 +26,6 @@ parser.add_argument(
"--session-dir", type=str, help="the directory for the current session")
def _resolve_current_ray_path():
# When ray is built from source with pip install -e,
# ray.__file__ returns .../python/ray/__init__.py.
# When ray is installed from a prebuilt binary, it returns
# .../site-packages/ray/__init__.py
return os.path.split(os.path.split(ray.__file__)[0])[0]
def _resolve_install_from_source_ray_dependencies():
"""Find the ray dependencies when Ray is install from source"""
ray_source_python_path = _resolve_current_ray_path()
setup_py_path = os.path.join(ray_source_python_path, "setup.py")
ray_install_requires = runpy.run_path(setup_py_path)[
"setup_spec"].install_requires
return ray_install_requires
def _inject_ray_to_conda_site(conda_path):
"""Write the current Ray site package directory to a new site"""
python_binary = os.path.join(conda_path, "bin/python")
site_packages_path = subprocess.check_output(
[python_binary, "-c",
"import site; print(site.getsitepackages()[0])"]).decode().strip()
ray_path = _resolve_current_ray_path()
logger = get_hook_logger()
logger.warning(f"Injecting {ray_path} to environment {conda_path} "
"because _inject_current_ray flag is on.")
maybe_ray_dir = os.path.join(site_packages_path, "ray")
if os.path.isdir(maybe_ray_dir):
logger.warning(f"Replacing existing ray installation with {ray_path}")
shutil.rmtree(maybe_ray_dir)
# See usage of *.pth file at
# https://docs.python.org/3/library/site.html
with open(os.path.join(site_packages_path, "ray.pth"), "w") as f:
f.write(ray_path)
def _current_py_version():
return ".".join(map(str, sys.version_info[:3])) # like 3.6.10
def setup_runtime_env(runtime_env: dict, session_dir):
logger = get_hook_logger()
logger.debug(f"Setting up runtime environment {runtime_env}")
if runtime_env.get("conda") or runtime_env.get("pip"):
conda_dict = get_conda_dict(runtime_env, session_dir)
if isinstance(runtime_env.get("conda"), str):
conda_env_name = runtime_env["conda"]
else:
assert conda_dict is not None
ray_pip = current_ray_pip_specifier()
if ray_pip:
extra_pip_dependencies = [ray_pip, "ray[default]"]
elif runtime_env.get("_inject_current_ray"):
extra_pip_dependencies = (
_resolve_install_from_source_ray_dependencies())
else:
extra_pip_dependencies = []
conda_dict = inject_dependencies(conda_dict, _current_py_version(),
extra_pip_dependencies)
logger.info(f"Setting up conda environment with {runtime_env}")
# It is not safe for multiple processes to install conda envs
# concurrently, even if the envs are different, so use a global
# lock for all conda installs.
# See https://github.com/ray-project/ray/issues/17086
file_lock_name = "ray-conda-install.lock"
with FileLock(os.path.join(session_dir, file_lock_name)):
conda_dir = os.path.join(session_dir, "runtime_resources",
"conda")
try_to_create_directory(conda_dir)
conda_yaml_path = os.path.join(conda_dir, "environment.yml")
with open(conda_yaml_path, "w") as file:
# Sort keys because we hash based on the file contents,
# and we don't want the hash to depend on the order
# of the dependencies.
yaml.dump(conda_dict, file, sort_keys=True)
conda_env_name = get_or_create_conda_env(
conda_yaml_path, conda_dir)
if runtime_env.get("_inject_current_ray"):
conda_path = os.path.join(conda_dir, conda_env_name)
_inject_ray_to_conda_site(conda_path)
logger.info(
f"Finished setting up runtime environment at {conda_env_name}")
return RuntimeEnvContext(conda_env_name)
return RuntimeEnvContext()
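One detail worth calling out from the removed function: dumping with sort_keys=True makes the environment hash a function of the dependency contents rather than the key order the user happened to write. A small illustrative sketch (conda_dict_hash is hypothetical; the real hashing happens inside get_or_create_conda_env):

import hashlib
import yaml

def conda_dict_hash(conda_dict: dict) -> str:
    # Sorting keys normalizes the YAML text, so two dicts with the same
    # contents but different key order hash identically.
    text = yaml.dump(conda_dict, sort_keys=True)
    return hashlib.sha1(text.encode("utf-8")).hexdigest()

assert (conda_dict_hash({"name": "env", "dependencies": ["pip"]}) ==
        conda_dict_hash({"dependencies": ["pip"], "name": "env"}))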
def setup_worker(input_args):
# remaining_args contains the arguments to the original worker command,
# minus the python executable, e.g. default_worker.py --node-ip-address=...
@ -149,8 +42,8 @@ def setup_worker(input_args):
# The Ray client server sets up the runtime env by itself instead of via the agent.
if runtime_env.get("conda") or runtime_env.get("pip"):
if not args.serialized_runtime_env_context:
runtime_env_context = setup_runtime_env(runtime_env,
args.session_dir)
runtime_env_context = RuntimeEnvContext(args.session_dir)
setup_conda_or_pip(runtime_env, runtime_env_context, logger=logger)
if runtime_env_context and runtime_env_context.working_dir is not None:
commands += [f"cd {runtime_env_context.working_dir}"]
@ -196,129 +89,5 @@ def setup_worker(input_args):
os.execvp("bash", ["bash", "-c", command_str])
def get_conda_dict(runtime_env, runtime_env_dir) -> Optional[Dict[Any, Any]]:
""" Construct a conda dependencies dict from a runtime env.
This function does not inject Ray or Python into the conda dict.
If the runtime env does not specify pip or conda, or if it specifies
the name of a preinstalled conda environment, this function returns
None. If pip is specified, a conda dict is created containing the
pip dependencies. If conda is already given as a dict, this function
is the identity function.
"""
if runtime_env.get("conda"):
if isinstance(runtime_env["conda"], dict):
return runtime_env["conda"]
else:
return None
if runtime_env.get("pip"):
requirements_txt = runtime_env["pip"]
pip_hash = hashlib.sha1(requirements_txt.encode("utf-8")).hexdigest()
pip_hash_str = f"pip-generated-{pip_hash}"
conda_dir = os.path.join(runtime_env_dir, "conda")
requirements_txt_path = os.path.join(
conda_dir, f"requirements-{pip_hash_str}.txt")
conda_dict = {
"name": pip_hash_str,
"dependencies": ["pip", {
"pip": [f"-r {requirements_txt_path}"]
}]
}
file_lock_name = f"ray-{pip_hash_str}.lock"
with FileLock(os.path.join(runtime_env_dir, file_lock_name)):
try_to_create_directory(conda_dir)
with open(requirements_txt_path, "w") as file:
file.write(requirements_txt)
return conda_dict
return None
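For illustration, the pip branch above boils down to the following shape (a sketch only; pip_conda_dict_shape is hypothetical and skips the file lock and the requirements file write):

import hashlib
import os

def pip_conda_dict_shape(requirements_txt: str, conda_dir: str) -> dict:
    # Mirrors the dict constructed above: a conda env whose only purpose
    # is to `pip install -r` the hashed requirements file.
    pip_hash = hashlib.sha1(requirements_txt.encode("utf-8")).hexdigest()
    name = f"pip-generated-{pip_hash}"
    path = os.path.join(conda_dir, f"requirements-{name}.txt")
    return {"name": name, "dependencies": ["pip", {"pip": [f"-r {path}"]}]}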
def current_ray_pip_specifier() -> Optional[str]:
"""The pip requirement specifier for the running version of Ray.
Returns:
A string which can be passed to `pip install` to install the
currently running Ray version, or None if running on a version
built from source locally (likely if you are developing Ray).
Examples:
Returns "https://s3-us-west-2.amazonaws.com/ray-wheels/[..].whl"
if running a stable release, a nightly, or a specific commit.
"""
logger = get_hook_logger()
if os.environ.get("RAY_CI_POST_WHEEL_TESTS"):
# Running in Buildkite CI after the wheel has been built.
# Wheels are in the ray/.whl directory, and the present file is
# at ray/python/ray/workers. Use relative paths to allow for
# testing locally if needed.
return os.path.join(
Path(__file__).resolve().parents[3], ".whl", get_wheel_filename())
elif ray.__commit__ == "{{RAY_COMMIT_SHA}}":
# Running on a version built from source locally.
if os.environ.get("RAY_RUNTIME_ENV_LOCAL_DEV_MODE") != "1":
logger.warning(
"Current Ray version could not be detected, most likely "
"because you have manually built Ray from source. To use "
"runtime_env in this case, set the environment variable "
"RAY_RUNTIME_ENV_LOCAL_DEV_MODE=1.")
return None
elif "dev" in ray.__version__:
# Running on a nightly wheel.
return get_master_wheel_url()
else:
return get_release_wheel_url()
def inject_dependencies(
conda_dict: Dict[Any, Any],
py_version: str,
pip_dependencies: Optional[List[str]] = None) -> Dict[Any, Any]:
"""Add Ray, Python and (optionally) extra pip dependencies to a conda dict.
Args:
conda_dict (dict): A dict representing the JSON-serialized conda
environment YAML file. This dict will be modified and returned.
py_version (str): A string representing a Python version to inject
into the conda dependencies, e.g. "3.7.7"
pip_dependencies (List[str]): A list of pip dependencies that
will be prepended to the list of pip dependencies in
the conda dict. If the conda dict does not already have a "pip"
field, one will be created.
Returns:
The modified dict. (Note: the input argument conda_dict is modified
and returned.)
"""
if pip_dependencies is None:
pip_dependencies = []
if conda_dict.get("dependencies") is None:
conda_dict["dependencies"] = []
# Inject Python dependency.
deps = conda_dict["dependencies"]
# Add current python dependency. If the user has already included a
# python version dependency, conda will raise a readable error if the two
# are incompatible, e.g.:
# ResolvePackageNotFound: - python[version='3.5.*,>=3.6']
deps.append(f"python={py_version}")
if "pip" not in deps:
deps.append("pip")
# Insert pip dependencies.
found_pip_dict = False
for dep in deps:
if isinstance(dep, dict) and dep.get("pip") and isinstance(
dep["pip"], list):
dep["pip"] = pip_dependencies + dep["pip"]
found_pip_dict = True
break
if not found_pip_dict:
deps.append({"pip": pip_dependencies})
return conda_dict
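A worked example of the documented behavior, assuming the function as defined above (version strings illustrative):

before = {"dependencies": [{"pip": ["requests"]}]}
after = inject_dependencies(before, "3.7.7", ["ray==1.6.0"])
# after == {"dependencies": [{"pip": ["ray==1.6.0", "requests"]},
#                            "python=3.7.7", "pip"]}
# Note: `before` is mutated in place, as the docstring warns.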
if __name__ == "__main__":
setup_worker(sys.argv[1:])