diff --git a/dashboard/agent.py b/dashboard/agent.py index e3b84533f..a66358fd9 100644 --- a/dashboard/agent.py +++ b/dashboard/agent.py @@ -51,7 +51,6 @@ class DashboardAgent(object): temp_dir=None, session_dir=None, runtime_env_dir=None, - runtime_env_setup_hook=None, log_dir=None, metrics_export_port=None, node_manager_port=None, @@ -67,7 +66,6 @@ class DashboardAgent(object): self.temp_dir = temp_dir self.session_dir = session_dir self.runtime_env_dir = runtime_env_dir - self.runtime_env_setup_hook = runtime_env_setup_hook self.log_dir = log_dir self.dashboard_agent_port = dashboard_agent_port self.metrics_export_port = metrics_export_port @@ -323,13 +321,6 @@ if __name__ == "__main__": type=str, default=None, help="Specify the path of the resource directory used by runtime_env.") - parser.add_argument( - "--runtime-env-setup-hook", - required=True, - type=str, - default=None, - help="The module path to a Python function that" - "will be imported and run to set up the runtime env.") args = parser.parse_args() try: @@ -360,7 +351,6 @@ if __name__ == "__main__": temp_dir=args.temp_dir, session_dir=args.session_dir, runtime_env_dir=args.runtime_env_dir, - runtime_env_setup_hook=args.runtime_env_setup_hook, log_dir=args.log_dir, metrics_export_port=args.metrics_export_port, node_manager_port=args.node_manager_port, diff --git a/dashboard/modules/runtime_env/runtime_env_agent.py b/dashboard/modules/runtime_env/runtime_env_agent.py index cb418ee35..76514801b 100644 --- a/dashboard/modules/runtime_env/runtime_env_agent.py +++ b/dashboard/modules/runtime_env/runtime_env_agent.py @@ -2,7 +2,8 @@ import asyncio from dataclasses import dataclass import json import logging -from ray._private.ray_logging import setup_component_logger +import os +import time from typing import Dict from ray.core.generated import runtime_env_agent_pb2 @@ -13,13 +14,17 @@ from ray.experimental.internal_kv import (_initialize_internal_kv, import ray.new_dashboard.utils as dashboard_utils import ray.new_dashboard.modules.runtime_env.runtime_env_consts \ as runtime_env_consts -import ray._private.runtime_env as runtime_env_pkg -from ray._private.utils import import_attr -from ray.workers.pluggable_runtime_env import (RuntimeEnvContext, - using_thread_local_logger) +from ray._private.ray_logging import setup_component_logger +from ray._private.runtime_env.conda import setup_conda_or_pip +from ray._private.runtime_env.working_dir import setup_working_dir +from ray._private.runtime_env import RuntimeEnvContext logger = logging.getLogger(__name__) +# TODO(edoakes): this is used for unit tests. We should replace it with a +# better pluggability mechanism once available. +SLEEP_FOR_TESTING_S = os.environ.get("RAY_RUNTIME_ENV_SLEEP_FOR_TESTING_S") + @dataclass class CreatedEnvResult: @@ -40,12 +45,9 @@ class RuntimeEnvAgent(dashboard_utils.DashboardAgentModule, def __init__(self, dashboard_agent): super().__init__(dashboard_agent) - self._session_dir = dashboard_agent.session_dir self._runtime_env_dir = dashboard_agent.runtime_env_dir - self._setup = import_attr(dashboard_agent.runtime_env_setup_hook) self._logging_params = dashboard_agent.logging_params self._per_job_logger_cache = dict() - runtime_env_pkg.PKG_DIR = dashboard_agent.runtime_env_dir # Cache the results of creating envs to avoid repeatedly calling into # conda and other slow calls. self._env_cache: Dict[str, CreatedEnvResult] = dict() @@ -68,22 +70,17 @@ class RuntimeEnvAgent(dashboard_utils.DashboardAgentModule, return self._per_job_logger_cache[job_id] async def CreateRuntimeEnv(self, request, context): - async def _setup_runtime_env(serialized_runtime_env, session_dir): + async def _setup_runtime_env(serialized_runtime_env): # This function will be ran inside a thread def run_setup_with_logger(): runtime_env: dict = json.loads(serialized_runtime_env or "{}") - per_job_logger = self.get_or_create_logger(request.job_id) - # Here we set the logger context for the setup hook execution. - # The logger needs to be thread local because there can be - # setup hooks ran for arbitrary job in arbitrary threads. - with using_thread_local_logger(per_job_logger): - env_context = self._setup(runtime_env, session_dir) - if "uris" in runtime_env: - working_dir = runtime_env_pkg.ensure_runtime_env_setup( - runtime_env["uris"]) - env_context.working_dir = working_dir - return env_context + # Use a separate logger for each job. + per_job_logger = self.get_or_create_logger(request.job_id) + context = RuntimeEnvContext(self._runtime_env_dir) + setup_conda_or_pip(runtime_env, context, logger=per_job_logger) + setup_working_dir(runtime_env, context, logger=per_job_logger) + return context loop = asyncio.get_event_loop() return await loop.run_in_executor(None, run_setup_with_logger) @@ -113,13 +110,17 @@ class RuntimeEnvAgent(dashboard_utils.DashboardAgentModule, status=agent_manager_pb2.AGENT_RPC_STATUS_FAILED, error_message=error_message) + if SLEEP_FOR_TESTING_S: + logger.info(f"Sleeping for {SLEEP_FOR_TESTING_S}s.") + time.sleep(int(SLEEP_FOR_TESTING_S)) + logger.info(f"Creating runtime env: {serialized_env}") runtime_env_context: RuntimeEnvContext = None error_message = None for _ in range(runtime_env_consts.RUNTIME_ENV_RETRY_TIMES): try: runtime_env_context = await _setup_runtime_env( - serialized_env, self._session_dir) + serialized_env) break except Exception as ex: logger.exception("Runtime env creation failed.") diff --git a/python/ray/_private/parameter.py b/python/ray/_private/parameter.py index 9dd345117..c281ead90 100644 --- a/python/ray/_private/parameter.py +++ b/python/ray/_private/parameter.py @@ -76,8 +76,6 @@ class RayParams: worker_setup_hook to set up the environment for the worker process. worker_setup_hook (str): The module path to a Python function that will be imported and run to set up the environment for the worker. - runtime_env_setup_hook (str): The module path to a Python function that - will be imported and run to set up the runtime env in agent. huge_pages: Boolean flag indicating whether to start the Object Store with hugetlbfs support. Requires plasma_directory. include_dashboard: Boolean flag indicating whether to start the web @@ -153,8 +151,6 @@ class RayParams: worker_path=None, setup_worker_path=None, worker_setup_hook=ray_constants.DEFAULT_WORKER_SETUP_HOOK, - runtime_env_setup_hook=ray_constants. - DEFAULT_RUNTIME_ENV_SETUP_HOOK, huge_pages=False, include_dashboard=None, dashboard_host=ray_constants.DEFAULT_DASHBOARD_IP, @@ -206,7 +202,6 @@ class RayParams: self.worker_path = worker_path self.setup_worker_path = setup_worker_path self.worker_setup_hook = worker_setup_hook - self.runtime_env_setup_hook = runtime_env_setup_hook self.huge_pages = huge_pages self.include_dashboard = include_dashboard self.dashboard_host = dashboard_host diff --git a/python/ray/_private/runtime_env/__init__.py b/python/ray/_private/runtime_env/__init__.py new file mode 100644 index 000000000..20401cb96 --- /dev/null +++ b/python/ray/_private/runtime_env/__init__.py @@ -0,0 +1,3 @@ +from ray._private.runtime_env.context import RuntimeEnvContext # noqa: F401 +from ray._private.runtime_env.validation import ( # noqa: F401 + override_task_or_actor_runtime_env, RuntimeEnvDict) # noqa: F401 diff --git a/python/ray/_private/runtime_env/conda.py b/python/ray/_private/runtime_env/conda.py new file mode 100644 index 000000000..454f145f7 --- /dev/null +++ b/python/ray/_private/runtime_env/conda.py @@ -0,0 +1,243 @@ +import os +import sys +import logging +import yaml +import hashlib +import subprocess +import runpy +import shutil + +from filelock import FileLock +from typing import Optional, List, Dict, Any +from pathlib import Path + +import ray +from ray._private.runtime_env import RuntimeEnvContext +from ray._private.runtime_env.conda_utils import get_or_create_conda_env +from ray._private.utils import try_to_create_directory +from ray._private.utils import (get_wheel_filename, get_master_wheel_url, + get_release_wheel_url) + + +def _resolve_current_ray_path(): + # When ray is built from source with pip install -e, + # ray.__file__ returns .../python/ray/__init__.py. + # When ray is installed from a prebuilt binary, it returns + # .../site-packages/ray/__init__.py + return os.path.split(os.path.split(ray.__file__)[0])[0] + + +def _resolve_install_from_source_ray_dependencies(): + """Find the ray dependencies when Ray is install from source""" + ray_source_python_path = _resolve_current_ray_path() + setup_py_path = os.path.join(ray_source_python_path, "setup.py") + ray_install_requires = runpy.run_path(setup_py_path)[ + "setup_spec"].install_requires + return ray_install_requires + + +def _inject_ray_to_conda_site(conda_path, + logger: Optional[logging.Logger] = None): + """Write the current Ray site package directory to a new site""" + if logger is None: + logger = logging.getLogger(__name__) + python_binary = os.path.join(conda_path, "bin/python") + site_packages_path = subprocess.check_output( + [python_binary, "-c", + "import site; print(site.getsitepackages()[0])"]).decode().strip() + + ray_path = _resolve_current_ray_path() + logger.warning(f"Injecting {ray_path} to environment {conda_path} " + "because _inject_current_ray flag is on.") + + maybe_ray_dir = os.path.join(site_packages_path, "ray") + if os.path.isdir(maybe_ray_dir): + logger.warning(f"Replacing existing ray installation with {ray_path}") + shutil.rmtree(maybe_ray_dir) + + # See usage of *.pth file at + # https://docs.python.org/3/library/site.html + with open(os.path.join(site_packages_path, "ray.pth"), "w") as f: + f.write(ray_path) + + +def _current_py_version(): + return ".".join(map(str, sys.version_info[:3])) # like 3.6.10 + + +def setup_conda_or_pip(runtime_env: dict, + context: RuntimeEnvContext, + logger: Optional[logging.Logger] = None): + if logger is None: + logger = logging.getLogger(__name__) + + if not runtime_env.get("conda") and not runtime_env.get("pip"): + return + + logger.debug(f"Setting up conda or pip for runtime_env: {runtime_env}") + conda_dict = get_conda_dict(runtime_env, context.session_dir) + if isinstance(runtime_env.get("conda"), str): + conda_env_name = runtime_env["conda"] + else: + assert conda_dict is not None + ray_pip = current_ray_pip_specifier(logger) + if ray_pip: + extra_pip_dependencies = [ray_pip, "ray[default]"] + elif runtime_env.get("_inject_current_ray"): + extra_pip_dependencies = ( + _resolve_install_from_source_ray_dependencies()) + else: + extra_pip_dependencies = [] + conda_dict = inject_dependencies(conda_dict, _current_py_version(), + extra_pip_dependencies) + logger.info(f"Setting up conda environment with {runtime_env}") + # It is not safe for multiple processes to install conda envs + # concurrently, even if the envs are different, so use a global + # lock for all conda installs. + # See https://github.com/ray-project/ray/issues/17086 + file_lock_name = "ray-conda-install.lock" + with FileLock(os.path.join(context.session_dir, file_lock_name)): + conda_dir = os.path.join(context.session_dir, "runtime_resources", + "conda") + try_to_create_directory(conda_dir) + conda_yaml_path = os.path.join(conda_dir, "environment.yml") + with open(conda_yaml_path, "w") as file: + # Sort keys because we hash based on the file contents, + # and we don't want the hash to depend on the order + # of the dependencies. + yaml.dump(conda_dict, file, sort_keys=True) + conda_env_name = get_or_create_conda_env( + conda_yaml_path, conda_dir, logger=logger) + + if runtime_env.get("_inject_current_ray"): + conda_path = os.path.join(conda_dir, conda_env_name) + _inject_ray_to_conda_site(conda_path, logger) + + context.conda_env_name = conda_env_name + logger.info(f"Finished setting up runtime environment at {conda_env_name}") + + +def get_conda_dict(runtime_env, runtime_env_dir) -> Optional[Dict[Any, Any]]: + """ Construct a conda dependencies dict from a runtime env. + + This function does not inject Ray or Python into the conda dict. + If the runtime env does not specify pip or conda, or if it specifies + the name of a preinstalled conda environment, this function returns + None. If pip is specified, a conda dict is created containing the + pip dependencies. If conda is already given as a dict, this function + is the identity function. + """ + if runtime_env.get("conda"): + if isinstance(runtime_env["conda"], dict): + return runtime_env["conda"] + else: + return None + if runtime_env.get("pip"): + requirements_txt = runtime_env["pip"] + pip_hash = hashlib.sha1(requirements_txt.encode("utf-8")).hexdigest() + pip_hash_str = f"pip-generated-{pip_hash}" + + conda_dir = os.path.join(runtime_env_dir, "conda") + requirements_txt_path = os.path.join( + conda_dir, f"requirements-{pip_hash_str}.txt") + conda_dict = { + "name": pip_hash_str, + "dependencies": ["pip", { + "pip": [f"-r {requirements_txt_path}"] + }] + } + file_lock_name = f"ray-{pip_hash_str}.lock" + with FileLock(os.path.join(runtime_env_dir, file_lock_name)): + try_to_create_directory(conda_dir) + with open(requirements_txt_path, "w") as file: + file.write(requirements_txt) + return conda_dict + return None + + +def current_ray_pip_specifier( + logger: Optional[logging.Logger] = None) -> Optional[str]: + """The pip requirement specifier for the running version of Ray. + + Returns: + A string which can be passed to `pip install` to install the + currently running Ray version, or None if running on a version + built from source locally (likely if you are developing Ray). + + Examples: + Returns "https://s3-us-west-2.amazonaws.com/ray-wheels/[..].whl" + if running a stable release, a nightly or a specific commit + """ + if logger is None: + logger = logging.getLogger(__name__) + if os.environ.get("RAY_CI_POST_WHEEL_TESTS"): + # Running in Buildkite CI after the wheel has been built. + # Wheels are at in the ray/.whl directory, but use relative path to + # allow for testing locally if needed. + return os.path.join( + Path(ray.__file__).resolve().parents[2], ".whl", + get_wheel_filename()) + elif ray.__commit__ == "{{RAY_COMMIT_SHA}}": + # Running on a version built from source locally. + if os.environ.get("RAY_RUNTIME_ENV_LOCAL_DEV_MODE") != "1": + logger.warning( + "Current Ray version could not be detected, most likely " + "because you have manually built Ray from source. To use " + "runtime_env in this case, set the environment variable " + "RAY_RUNTIME_ENV_LOCAL_DEV_MODE=1.") + return None + elif "dev" in ray.__version__: + # Running on a nightly wheel. + return get_master_wheel_url() + else: + return get_release_wheel_url() + + +def inject_dependencies( + conda_dict: Dict[Any, Any], + py_version: str, + pip_dependencies: Optional[List[str]] = None) -> Dict[Any, Any]: + """Add Ray, Python and (optionally) extra pip dependencies to a conda dict. + + Args: + conda_dict (dict): A dict representing the JSON-serialized conda + environment YAML file. This dict will be modified and returned. + py_version (str): A string representing a Python version to inject + into the conda dependencies, e.g. "3.7.7" + pip_dependencies (List[str]): A list of pip dependencies that + will be prepended to the list of pip dependencies in + the conda dict. If the conda dict does not already have a "pip" + field, one will be created. + Returns: + The modified dict. (Note: the input argument conda_dict is modified + and returned.) + """ + if pip_dependencies is None: + pip_dependencies = [] + if conda_dict.get("dependencies") is None: + conda_dict["dependencies"] = [] + + # Inject Python dependency. + deps = conda_dict["dependencies"] + + # Add current python dependency. If the user has already included a + # python version dependency, conda will raise a readable error if the two + # are incompatible, e.g: + # ResolvePackageNotFound: - python[version='3.5.*,>=3.6'] + deps.append(f"python={py_version}") + + if "pip" not in deps: + deps.append("pip") + + # Insert pip dependencies. + found_pip_dict = False + for dep in deps: + if isinstance(dep, dict) and dep.get("pip") and isinstance( + dep["pip"], list): + dep["pip"] = pip_dependencies + dep["pip"] + found_pip_dict = True + break + if not found_pip_dict: + deps.append({"pip": pip_dependencies}) + + return conda_dict diff --git a/python/ray/_private/conda.py b/python/ray/_private/runtime_env/conda_utils.py similarity index 96% rename from python/ray/_private/conda.py rename to python/ray/_private/runtime_env/conda_utils.py index 484052f32..f4dc73cc0 100644 --- a/python/ray/_private/conda.py +++ b/python/ray/_private/runtime_env/conda_utils.py @@ -4,12 +4,8 @@ import subprocess import hashlib import json from typing import Optional, List, Union, Tuple - -from ray.workers.pluggable_runtime_env import get_hook_logger """Utilities for conda. Adapted from https://github.com/mlflow/mlflow.""" -logger = logging.getLogger(__name__) - # Name of environment variable indicating a path to a conda installation. Ray # will default to running "conda" if unset. RAY_CONDA_HOME = "RAY_CONDA_HOME" @@ -67,7 +63,8 @@ def _get_conda_env_name(conda_env_path: str) -> str: def get_or_create_conda_env(conda_env_path: str, - base_dir: Optional[str] = None) -> str: + base_dir: Optional[str] = None, + logger: Optional[logging.Logger] = None) -> str: """ Given a conda YAML, creates a conda environment containing the required dependencies if such a conda environment doesn't already exist. Returns the @@ -83,7 +80,8 @@ def get_or_create_conda_env(conda_env_path: str, In either case, the return value should be valid to pass in to `conda activate`. """ - logger = get_hook_logger() + if logger is None: + logger = logging.getLogger(__name__) conda_path = get_conda_bin_executable("conda") try: exec_cmd([conda_path, "--help"], throw_on_error=False) diff --git a/python/ray/_private/runtime_env/context.py b/python/ray/_private/runtime_env/context.py new file mode 100644 index 000000000..9f301c930 --- /dev/null +++ b/python/ray/_private/runtime_env/context.py @@ -0,0 +1,20 @@ +import json + + +class RuntimeEnvContext: + """A context used to describe the created runtime env.""" + + def __init__(self, + session_dir: str, + conda_env_name: str = None, + working_dir: str = None): + self.conda_env_name: str = conda_env_name + self.session_dir: str = session_dir + self.working_dir: str = working_dir + + def serialize(self) -> str: + return json.dumps(self.__dict__) + + @staticmethod + def deserialize(json_string): + return RuntimeEnvContext(**json.loads(json_string)) diff --git a/python/ray/_private/runtime_env/validation.py b/python/ray/_private/runtime_env/validation.py new file mode 100644 index 000000000..5924e1401 --- /dev/null +++ b/python/ray/_private/runtime_env/validation.py @@ -0,0 +1,231 @@ +import json +import logging +import os +from pathlib import Path +import sys +from typing import Any, Dict, Optional +import yaml + +import ray + +# We need to setup this variable before +# using this module +PKG_DIR = None + +logger = logging.getLogger(__name__) + +FILE_SIZE_WARNING = 10 * 1024 * 1024 # 10MiB +# NOTE(edoakes): we should be able to support up to 512 MiB based on the GCS' +# limit, but for some reason that causes failures when downloading. +GCS_STORAGE_MAX_SIZE = 100 * 1024 * 1024 # 100MiB + + +class RuntimeEnvDict: + """Parses and validates the runtime env dictionary from the user. + + Attributes: + working_dir (Path): Specifies the working directory of the worker. + This can either be a local directory or zip file. + Examples: + "." # cwd + "local_project.zip" # archive is unpacked into directory + py_modules (List[Path]): Similar to working_dir, but specifies python + modules to add to the `sys.path`. + Examples: + ["/path/to/other_module", "/other_path/local_project.zip"] + pip (List[str] | str): Either a list of pip packages, or a string + containing the path to a pip requirements.txt file. + conda (dict | str): Either the conda YAML config, the name of a + local conda env (e.g., "pytorch_p36"), or the path to a conda + environment.yaml file. + The Ray dependency will be automatically injected into the conda + env to ensure compatibility with the cluster Ray. The conda name + may be mangled automatically to avoid conflicts between runtime + envs. + This field cannot be specified at the same time as the 'pip' field. + To use pip with conda, please specify your pip dependencies within + the conda YAML config: + https://conda.io/projects/conda/en/latest/user-guide/tasks/manage-e + nvironments.html#create-env-file-manually + Examples: + {"channels": ["defaults"], "dependencies": ["codecov"]} + "pytorch_p36" # Found on DLAMIs + container (dict): Require a given (Docker) container image, + The Ray worker process will run in a container with this image. + The `worker_path` is the default_worker.py path. + The `run_options` list spec is here: + https://docs.docker.com/engine/reference/run/ + Examples: + {"image": "anyscale/ray-ml:nightly-py38-cpu", + "worker_path": "/root/python/ray/workers/default_worker.py", + "run_options": ["--cap-drop SYS_ADMIN","--log-level=debug"]} + env_vars (dict): Environment variables to set. + Examples: + {"OMP_NUM_THREADS": "32", "TF_WARNINGS": "none"} + """ + + def __init__(self, runtime_env_json: dict): + # Simple dictionary with all options validated. This will always + # contain all supported keys; values will be set to None if + # unspecified. However, if all values are None this is set to {}. + self._dict = dict() + + if "working_dir" in runtime_env_json: + self._dict["working_dir"] = runtime_env_json["working_dir"] + if not isinstance(self._dict["working_dir"], str): + raise TypeError("`working_dir` must be a string. Type " + f"{type(self._dict['working_dir'])} received.") + working_dir = Path(self._dict["working_dir"]).absolute() + else: + self._dict["working_dir"] = None + working_dir = None + + self._dict["conda"] = None + if "conda" in runtime_env_json: + if sys.platform == "win32": + raise NotImplementedError("The 'conda' field in runtime_env " + "is not currently supported on " + "Windows.") + conda = runtime_env_json["conda"] + if isinstance(conda, str): + yaml_file = Path(conda) + if yaml_file.suffix in (".yaml", ".yml"): + if working_dir and not yaml_file.is_absolute(): + yaml_file = working_dir / yaml_file + if not yaml_file.is_file(): + raise ValueError( + f"Can't find conda YAML file {yaml_file}") + try: + self._dict["conda"] = yaml.safe_load( + yaml_file.read_text()) + except Exception as e: + raise ValueError( + f"Invalid conda file {yaml_file} with error {e}") + else: + logger.info( + f"Using preinstalled conda environment: {conda}") + self._dict["conda"] = conda + elif isinstance(conda, dict): + self._dict["conda"] = conda + elif conda is not None: + raise TypeError("runtime_env['conda'] must be of type str or " + "dict") + + self._dict["pip"] = None + if "pip" in runtime_env_json: + if sys.platform == "win32": + raise NotImplementedError("The 'pip' field in runtime_env " + "is not currently supported on " + "Windows.") + if ("conda" in runtime_env_json + and runtime_env_json["conda"] is not None): + raise ValueError( + "The 'pip' field and 'conda' field of " + "runtime_env cannot both be specified.\n" + f"specified pip field: {runtime_env_json['pip']}\n" + f"specified conda field: {runtime_env_json['conda']}\n" + "To use pip with conda, please only set the 'conda' " + "field, and specify your pip dependencies " + "within the conda YAML config dict: see " + "https://conda.io/projects/conda/en/latest/" + "user-guide/tasks/manage-environments.html" + "#create-env-file-manually") + pip = runtime_env_json["pip"] + if isinstance(pip, str): + # We have been given a path to a requirements.txt file. + pip_file = Path(pip) + if working_dir and not pip_file.is_absolute(): + pip_file = working_dir / pip_file + if not pip_file.is_file(): + raise ValueError(f"{pip_file} is not a valid file") + self._dict["pip"] = pip_file.read_text() + elif isinstance(pip, list) and all( + isinstance(dep, str) for dep in pip): + # Construct valid pip requirements.txt from list of packages. + self._dict["pip"] = "\n".join(pip) + "\n" + else: + raise TypeError("runtime_env['pip'] must be of type str or " + "List[str]") + + if "uris" in runtime_env_json: + self._dict["uris"] = runtime_env_json["uris"] + + if "container" in runtime_env_json: + self._dict["container"] = runtime_env_json["container"] + + self._dict["env_vars"] = None + if "env_vars" in runtime_env_json: + env_vars = runtime_env_json["env_vars"] + self._dict["env_vars"] = env_vars + if not (isinstance(env_vars, dict) and all( + isinstance(k, str) and isinstance(v, str) + for (k, v) in env_vars.items())): + raise TypeError("runtime_env['env_vars'] must be of type" + "Dict[str, str]") + + if "_ray_release" in runtime_env_json: + self._dict["_ray_release"] = runtime_env_json["_ray_release"] + + if "_ray_commit" in runtime_env_json: + self._dict["_ray_commit"] = runtime_env_json["_ray_commit"] + else: + if self._dict.get("pip") or self._dict.get("conda"): + self._dict["_ray_commit"] = ray.__commit__ + + # Used for testing wheels that have not yet been merged into master. + # If this is set to True, then we do not inject Ray into the conda + # or pip dependencies. + if os.environ.get("RAY_RUNTIME_ENV_LOCAL_DEV_MODE"): + runtime_env_json["_inject_current_ray"] = True + if "_inject_current_ray" in runtime_env_json: + self._dict["_inject_current_ray"] = runtime_env_json[ + "_inject_current_ray"] + + # TODO(ekl) we should have better schema validation here. + # TODO(ekl) support py_modules + # TODO(architkulkarni) support docker + + # TODO(architkulkarni) This is to make it easy for the worker caching + # code in C++ to check if the env is empty without deserializing and + # parsing it. We should use a less confusing approach here. + if all(val is None for val in self._dict.values()): + self._dict = {} + + def get_parsed_dict(self) -> dict: + return self._dict + + def serialize(self) -> str: + # Use sort_keys=True because we will use the output as a key to cache + # workers by, so we need the serialization to be independent of the + # dict order. + return json.dumps(self._dict, sort_keys=True) + + def set_uris(self, uris): + self._dict["uris"] = uris + + +def override_task_or_actor_runtime_env( + runtime_env: Optional[Dict[str, Any]], + parent_runtime_env: Dict[str, Any]) -> Dict[str, Any]: + if runtime_env: + if runtime_env.get("working_dir"): + raise NotImplementedError( + "Overriding working_dir for actors is not supported. " + "Please use ray.init(runtime_env={'working_dir': ...}) " + "to configure per-job environment instead.") + # NOTE(edoakes): this is sort of hacky, but we manually add the right + # working_dir here so the relative path to a requirements.txt file + # works. The right solution would be to merge the runtime_env with the + # parent runtime env before validation. + if parent_runtime_env.get("working_dir"): + runtime_env["working_dir"] = parent_runtime_env["working_dir"] + runtime_env_dict = RuntimeEnvDict(runtime_env).get_parsed_dict() + else: + runtime_env_dict = {} + + # If per-actor URIs aren't specified, override them with those in the + # job config. + if "uris" not in runtime_env_dict and "uris" in parent_runtime_env: + runtime_env_dict["uris"] = parent_runtime_env.get("uris") + + return runtime_env_dict diff --git a/python/ray/_private/runtime_env.py b/python/ray/_private/runtime_env/working_dir.py similarity index 56% rename from python/ray/_private/runtime_env.py rename to python/ray/_private/runtime_env/working_dir.py index 35be92629..e48cbb135 100644 --- a/python/ray/_private/runtime_env.py +++ b/python/ray/_private/runtime_env/working_dir.py @@ -1,30 +1,25 @@ +from enum import Enum +from filelock import FileLock import hashlib import logging -import json -import yaml - -from filelock import FileLock +import os from pathlib import Path +import sys +from typing import Callable, List, Optional, Tuple +from urllib.parse import urlparse from zipfile import ZipFile -from ray._private.thirdparty.pathspec import PathSpec -from ray.job_config import JobConfig -from enum import Enum -import ray from ray.experimental.internal_kv import (_internal_kv_put, _internal_kv_get, _internal_kv_exists, _internal_kv_initialized) +from ray.job_config import JobConfig +from ray._private.thirdparty.pathspec import PathSpec +from ray._private.runtime_env import RuntimeEnvContext -from typing import Any, Callable, Dict, List, Optional, Tuple -from urllib.parse import urlparse -import os -import sys - -# We need to setup this variable before -# using this module +# We need to setup this variable before using this module. PKG_DIR = None -logger = logging.getLogger(__name__) +_logger = logging.getLogger(__name__) FILE_SIZE_WARNING = 10 * 1024 * 1024 # 10MiB # NOTE(edoakes): we should be able to support up to 512 MiB based on the GCS' @@ -32,190 +27,6 @@ FILE_SIZE_WARNING = 10 * 1024 * 1024 # 10MiB GCS_STORAGE_MAX_SIZE = 100 * 1024 * 1024 # 100MiB -class RuntimeEnvDict: - """Parses and validates the runtime env dictionary from the user. - - Attributes: - working_dir (Path): Specifies the working directory of the worker. - This can either be a local directory or zip file. - Examples: - "." # cwd - "local_project.zip" # archive is unpacked into directory - py_modules (List[Path]): Similar to working_dir, but specifies python - modules to add to the `sys.path`. - Examples: - ["/path/to/other_module", "/other_path/local_project.zip"] - pip (List[str] | str): Either a list of pip packages, or a string - containing the path to a pip requirements.txt file. - conda (dict | str): Either the conda YAML config, the name of a - local conda env (e.g., "pytorch_p36"), or the path to a conda - environment.yaml file. - The Ray dependency will be automatically injected into the conda - env to ensure compatibility with the cluster Ray. The conda name - may be mangled automatically to avoid conflicts between runtime - envs. - This field cannot be specified at the same time as the 'pip' field. - To use pip with conda, please specify your pip dependencies within - the conda YAML config: - https://conda.io/projects/conda/en/latest/user-guide/tasks/manage-e - nvironments.html#create-env-file-manually - Examples: - {"channels": ["defaults"], "dependencies": ["codecov"]} - "pytorch_p36" # Found on DLAMIs - container (dict): Require a given (Docker) container image, - The Ray worker process will run in a container with this image. - The `worker_path` is the default_worker.py path. - The `run_options` list spec is here: - https://docs.docker.com/engine/reference/run/ - Examples: - {"image": "anyscale/ray-ml:nightly-py38-cpu", - "worker_path": "/root/python/ray/workers/default_worker.py", - "run_options": ["--cap-drop SYS_ADMIN","--log-level=debug"]} - env_vars (dict): Environment variables to set. - Examples: - {"OMP_NUM_THREADS": "32", "TF_WARNINGS": "none"} - """ - - def __init__(self, runtime_env_json: dict): - # Simple dictionary with all options validated. This will always - # contain all supported keys; values will be set to None if - # unspecified. However, if all values are None this is set to {}. - self._dict = dict() - - if "working_dir" in runtime_env_json: - self._dict["working_dir"] = runtime_env_json["working_dir"] - if not isinstance(self._dict["working_dir"], str): - raise TypeError("`working_dir` must be a string. Type " - f"{type(self._dict['working_dir'])} received.") - working_dir = Path(self._dict["working_dir"]).absolute() - else: - self._dict["working_dir"] = None - working_dir = None - - self._dict["conda"] = None - if "conda" in runtime_env_json: - if sys.platform == "win32": - raise NotImplementedError("The 'conda' field in runtime_env " - "is not currently supported on " - "Windows.") - conda = runtime_env_json["conda"] - if isinstance(conda, str): - yaml_file = Path(conda) - if yaml_file.suffix in (".yaml", ".yml"): - if working_dir and not yaml_file.is_absolute(): - yaml_file = working_dir / yaml_file - if not yaml_file.is_file(): - raise ValueError( - f"Can't find conda YAML file {yaml_file}") - try: - self._dict["conda"] = yaml.safe_load( - yaml_file.read_text()) - except Exception as e: - raise ValueError( - f"Invalid conda file {yaml_file} with error {e}") - else: - logger.info( - f"Using preinstalled conda environment: {conda}") - self._dict["conda"] = conda - elif isinstance(conda, dict): - self._dict["conda"] = conda - elif conda is not None: - raise TypeError("runtime_env['conda'] must be of type str or " - "dict") - - self._dict["pip"] = None - if "pip" in runtime_env_json: - if sys.platform == "win32": - raise NotImplementedError("The 'pip' field in runtime_env " - "is not currently supported on " - "Windows.") - if ("conda" in runtime_env_json - and runtime_env_json["conda"] is not None): - raise ValueError( - "The 'pip' field and 'conda' field of " - "runtime_env cannot both be specified.\n" - f"specified pip field: {runtime_env_json['pip']}\n" - f"specified conda field: {runtime_env_json['conda']}\n" - "To use pip with conda, please only set the 'conda' " - "field, and specify your pip dependencies " - "within the conda YAML config dict: see " - "https://conda.io/projects/conda/en/latest/" - "user-guide/tasks/manage-environments.html" - "#create-env-file-manually") - pip = runtime_env_json["pip"] - if isinstance(pip, str): - # We have been given a path to a requirements.txt file. - pip_file = Path(pip) - if working_dir and not pip_file.is_absolute(): - pip_file = working_dir / pip_file - if not pip_file.is_file(): - raise ValueError(f"{pip_file} is not a valid file") - self._dict["pip"] = pip_file.read_text() - elif isinstance(pip, list) and all( - isinstance(dep, str) for dep in pip): - # Construct valid pip requirements.txt from list of packages. - self._dict["pip"] = "\n".join(pip) + "\n" - else: - raise TypeError("runtime_env['pip'] must be of type str or " - "List[str]") - - if "uris" in runtime_env_json: - self._dict["uris"] = runtime_env_json["uris"] - - if "container" in runtime_env_json: - self._dict["container"] = runtime_env_json["container"] - - self._dict["env_vars"] = None - if "env_vars" in runtime_env_json: - env_vars = runtime_env_json["env_vars"] - self._dict["env_vars"] = env_vars - if not (isinstance(env_vars, dict) and all( - isinstance(k, str) and isinstance(v, str) - for (k, v) in env_vars.items())): - raise TypeError("runtime_env['env_vars'] must be of type" - "Dict[str, str]") - - if "_ray_release" in runtime_env_json: - self._dict["_ray_release"] = runtime_env_json["_ray_release"] - - if "_ray_commit" in runtime_env_json: - self._dict["_ray_commit"] = runtime_env_json["_ray_commit"] - else: - if self._dict.get("pip") or self._dict.get("conda"): - self._dict["_ray_commit"] = ray.__commit__ - - # Used for testing wheels that have not yet been merged into master. - # If this is set to True, then we do not inject Ray into the conda - # or pip dependencies. - if os.environ.get("RAY_RUNTIME_ENV_LOCAL_DEV_MODE"): - runtime_env_json["_inject_current_ray"] = True - if "_inject_current_ray" in runtime_env_json: - self._dict["_inject_current_ray"] = runtime_env_json[ - "_inject_current_ray"] - - # TODO(ekl) we should have better schema validation here. - # TODO(ekl) support py_modules - # TODO(architkulkarni) support docker - - # TODO(architkulkarni) This is to make it easy for the worker caching - # code in C++ to check if the env is empty without deserializing and - # parsing it. We should use a less confusing approach here. - if all(val is None for val in self._dict.values()): - self._dict = {} - - def get_parsed_dict(self) -> dict: - return self._dict - - def serialize(self) -> str: - # Use sort_keys=True because we will use the output as a key to cache - # workers by, so we need the serialization to be independent of the - # dict order. - return json.dumps(self._dict, sort_keys=True) - - def set_uris(self, uris): - self._dict["uris"] = uris - - class Protocol(Enum): """A enum for supported backend storage.""" @@ -250,7 +61,7 @@ def _dir_travel( try: handler(path) except Exception as e: - logger.error(f"Issue with path: {path}") + _logger.error(f"Issue with path: {path}") raise e if path.is_dir(): for sub_path in path.iterdir(): @@ -269,7 +80,7 @@ def _zip_module(root: Path, relative_path: Path, excludes: Optional[Callable], None) is None or path.is_file(): file_size = path.stat().st_size if file_size >= FILE_SIZE_WARNING: - logger.warning( + _logger.warning( f"File {path} is very large ({file_size} bytes). " "Consider excluding this file from the working directory.") to_path = path.relative_to(relative_path) @@ -447,7 +258,7 @@ def fetch_package(pkg_uri: str) -> int: if local_dir.exists(): assert local_dir.is_dir(), f"{local_dir} is not a directory" return local_dir - logger.debug("Fetch packge") + _logger.debug("Fetch packge") (protocol, pkg_name) = _parse_uri(pkg_uri) if protocol in (Protocol.GCS, Protocol.PIN_GCS): code = _internal_kv_get(pkg_uri) @@ -458,7 +269,7 @@ def fetch_package(pkg_uri: str) -> int: else: raise NotImplementedError(f"Protocol {protocol} is not supported") - logger.debug(f"Unpack {pkg_file} to {local_dir}") + _logger.debug(f"Unpack {pkg_file} to {local_dir}") with ZipFile(str(pkg_file), "r") as zip_ref: zip_ref.extractall(local_dir) pkg_file.unlink() @@ -561,14 +372,14 @@ def upload_runtime_env_package_if_needed(job_config: JobConfig): working_dir = job_config.runtime_env.get("working_dir") py_modules = job_config.runtime_env.get("py_modules") excludes = job_config.runtime_env.get("excludes") or [] - logger.info(f"{pkg_uri} doesn't exist. Create new package with" - f" {working_dir} and {py_modules}") + _logger.info(f"{pkg_uri} doesn't exist. Create new package with" + f" {working_dir} and {py_modules}") if not pkg_file.exists(): create_project_package(working_dir, py_modules, excludes, file_path) # Push the data to remote storage pkg_size = push_package(pkg_uri, pkg_file) - logger.info(f"{pkg_uri} has been pushed with {pkg_size} bytes") + _logger.info(f"{pkg_uri} has been pushed with {pkg_size} bytes") def ensure_runtime_env_setup(pkg_uris: List[str]) -> Optional[str]: @@ -593,33 +404,28 @@ def ensure_runtime_env_setup(pkg_uris: List[str]) -> Optional[str]: with FileLock(str(pkg_file) + ".lock"): pkg_dir = fetch_package(pkg_uri) sys.path.insert(0, str(pkg_dir)) + # Right now, multiple pkg_uris are not supported correctly. # We return the last one as working directory return str(pkg_dir) if pkg_dir else None -def override_task_or_actor_runtime_env( - runtime_env: Optional[Dict[str, Any]], - parent_runtime_env: Dict[str, Any]) -> Dict[str, Any]: - if runtime_env: - if runtime_env.get("working_dir"): - raise NotImplementedError( - "Overriding working_dir for actors is not supported. " - "Please use ray.init(runtime_env={'working_dir': ...}) " - "to configure per-job environment instead.") - # NOTE(edoakes): this is sort of hacky, but we manually add the right - # working_dir here so the relative path to a requirements.txt file - # works. The right solution would be to merge the runtime_env with the - # parent runtime env before validation. - if parent_runtime_env.get("working_dir"): - runtime_env["working_dir"] = parent_runtime_env["working_dir"] - runtime_env_dict = RuntimeEnvDict(runtime_env).get_parsed_dict() - else: - runtime_env_dict = {} +def setup_working_dir(runtime_env: dict, + context: RuntimeEnvContext, + logger: Optional[logging.Logger] = None): + if not runtime_env.get("uris"): + return - # If per-actor URIs aren't specified, override them with those in the - # job config. - if "uris" not in runtime_env_dict and "uris" in parent_runtime_env: - runtime_env_dict["uris"] = parent_runtime_env.get("uris") + # Overwrite the module-wide logger and PKG_DIR temporarily. + # TODO(edoakes): we should be able to remove this by refactoring the + # working_dir setup code into a class instead of using global vars. + global _logger, PKG_DIR + prev_logger = _logger + prev_pkg_dir = PKG_DIR + _logger = logger + PKG_DIR = context.session_dir - return runtime_env_dict + context.working_dir = ensure_runtime_env_setup(runtime_env["uris"]) + + PKG_DIR = prev_pkg_dir + _logger = prev_logger diff --git a/python/ray/_private/services.py b/python/ray/_private/services.py index 497bcf17c..2d5e0ae97 100644 --- a/python/ray/_private/services.py +++ b/python/ray/_private/services.py @@ -1335,7 +1335,6 @@ def start_raylet(redis_address, worker_path, setup_worker_path, worker_setup_hook, - runtime_env_setup_hook, temp_dir, session_dir, resource_dir, @@ -1379,8 +1378,6 @@ def start_raylet(redis_address, worker_setup_hook to set up the environment for the worker process. worker_setup_hook (str): The module path to a Python function that will be imported and run to set up the environment for the worker. - runtime_env_setup_hook (str): The module path to a Python function that - will be imported and run to set up the runtime env in agent. temp_dir (str): The path of the temporary directory Ray will use. session_dir (str): The path of this session. resource_dir(str): The path of resource of this session . @@ -1535,7 +1532,6 @@ def start_raylet(redis_address, f"--temp-dir={temp_dir}", f"--session-dir={session_dir}", f"--runtime-env-dir={resource_dir}", - f"--runtime-env-setup-hook={runtime_env_setup_hook}", f"--log-dir={log_dir}", f"--logging-rotate-bytes={max_bytes}", f"--logging-rotate-backup-count={backup_count}", diff --git a/python/ray/_private/test_utils.py b/python/ray/_private/test_utils.py index a958864ff..f8ae53d43 100644 --- a/python/ray/_private/test_utils.py +++ b/python/ray/_private/test_utils.py @@ -12,6 +12,7 @@ import timeit import socket import math import traceback +import logging from typing import Optional, Any, List, Dict from contextlib import redirect_stdout, redirect_stderr import yaml @@ -22,8 +23,7 @@ import ray._private.utils import ray._private.gcs_utils as gcs_utils from ray.util.queue import Queue, _QueueActor, Empty from ray.scripts.scripts import main as ray_main -from ray.workers.pluggable_runtime_env import (RuntimeEnvContext, - get_hook_logger) +from ray._private.runtime_env import RuntimeEnvContext try: from prometheus_client.parser import text_string_to_metric_families except (ImportError, ModuleNotFoundError): @@ -625,13 +625,15 @@ def set_setup_func(): runtime_env.VAR = "hello world" -def sleep_setup_runtime_env(runtime_env: dict, session_dir): - logger = get_hook_logger() +def sleep_setup_runtime_env(runtime_env: dict, + context: RuntimeEnvContext, + logger=None): + logger = logging.getLogger(__name__) logger.info(f"Setting up runtime environment {runtime_env}") logger.info("Simulating long runtime env setup. Sleeping for 15s...") time.sleep(15) logger.info("Finished sleeping for 15s") - return RuntimeEnvContext() + return class BatchQueue(Queue): diff --git a/python/ray/experimental/packaging/load_package.py b/python/ray/experimental/packaging/load_package.py index f1ffda74f..36d9e984f 100644 --- a/python/ray/experimental/packaging/load_package.py +++ b/python/ray/experimental/packaging/load_package.py @@ -19,7 +19,7 @@ import tempfile import yaml import ray -import ray._private.runtime_env as runtime_support +from ray._private.runtime_env import working_dir as working_dir_pkg def load_package(config_path: str) -> "_RuntimePackage": @@ -68,22 +68,22 @@ def load_package(config_path: str) -> "_RuntimePackage": # Autofill working directory by uploading to GCS storage. if "working_dir" not in runtime_env: - pkg_name = runtime_support.get_project_package_name( + pkg_name = working_dir_pkg.get_project_package_name( working_dir=base_dir, py_modules=[], excludes=[]) - pkg_uri = runtime_support.Protocol.GCS.value + "://" + pkg_name + pkg_uri = working_dir_pkg.Protocol.GCS.value + "://" + pkg_name def do_register_package(): - if not runtime_support.package_exists(pkg_uri): + if not working_dir_pkg.package_exists(pkg_uri): tmp_path = os.path.join(_pkg_tmp(), "_tmp{}".format(pkg_name)) - runtime_support.create_project_package( + working_dir_pkg.create_project_package( working_dir=base_dir, py_modules=[], excludes=[], output_path=tmp_path) # TODO(ekl) does this get garbage collected correctly with the # current job id? - runtime_support.push_package(pkg_uri, tmp_path) - if not runtime_support.package_exists(pkg_uri): + working_dir_pkg.push_package(pkg_uri, tmp_path) + if not working_dir_pkg.package_exists(pkg_uri): raise RuntimeError( "Failed to upload package {}".format(pkg_uri)) diff --git a/python/ray/node.py b/python/ray/node.py index be1da7a01..e061e62b9 100644 --- a/python/ray/node.py +++ b/python/ray/node.py @@ -22,6 +22,7 @@ import ray.ray_constants as ray_constants import ray._private.services import ray._private.utils from ray._private.resource_spec import ResourceSpec +from ray._private.runtime_env import working_dir as working_dir_pkg from ray._private.utils import (try_to_create_directory, try_to_symlink, open_log) @@ -326,8 +327,7 @@ class Node: self._resource_dir = os.path.join(self._session_dir, "runtime_resources") try_to_create_directory(self._resource_dir) - import ray._private.runtime_env as runtime_env - runtime_env.PKG_DIR = self._resource_dir + working_dir_pkg.PKG_DIR = self._resource_dir def get_resource_spec(self): """Resolve and return the current resource spec for the node.""" @@ -810,7 +810,6 @@ class Node: self._ray_params.worker_path, self._ray_params.setup_worker_path, self._ray_params.worker_setup_hook, - self._ray_params.runtime_env_setup_hook, self._temp_dir, self._session_dir, self._resource_dir, diff --git a/python/ray/ray_constants.py b/python/ray/ray_constants.py index 7a1229a83..31a882019 100644 --- a/python/ray/ray_constants.py +++ b/python/ray/ray_constants.py @@ -247,10 +247,6 @@ REDIS_DEFAULT_PASSWORD = "5241590000000000" # The default module path to a Python function that sets up the worker env. DEFAULT_WORKER_SETUP_HOOK = "ray.workers.setup_runtime_env.setup_worker" -# The default module path to a Python function that sets up runtime envs. -DEFAULT_RUNTIME_ENV_SETUP_HOOK = \ - "ray.workers.setup_runtime_env.setup_runtime_env" - # The default ip address to bind to. NODE_DEFAULT_IP = "127.0.0.1" diff --git a/python/ray/scripts/scripts.py b/python/ray/scripts/scripts.py index 50086583c..66f9cd1b8 100644 --- a/python/ray/scripts/scripts.py +++ b/python/ray/scripts/scripts.py @@ -475,13 +475,6 @@ def debug(address): type=str, help="Module path to the Python function that will be used to set up the " "environment for the worker process.") -@click.option( - "--runtime-env-setup-hook", - hidden=True, - default=ray_constants.DEFAULT_RUNTIME_ENV_SETUP_HOOK, - type=str, - help="Module path to the Python function that will be used to set up the " - "runtime env in agent.") @click.option( "--ray-debugger-external", is_flag=True, @@ -489,19 +482,18 @@ def debug(address): help="Make the Ray debugger available externally to the node. This is only" "safe to activate if the node is behind a firewall.") @add_click_options(logging_options) -def start(node_ip_address, address, port, redis_password, redis_shard_ports, - object_manager_port, node_manager_port, gcs_server_port, - min_worker_port, max_worker_port, worker_port_list, - ray_client_server_port, memory, object_store_memory, - redis_max_memory, num_cpus, num_gpus, resources, head, - include_dashboard, dashboard_host, dashboard_port, - dashboard_agent_listen_port, block, plasma_directory, - autoscaling_config, no_redirect_worker_output, no_redirect_output, - plasma_store_socket_name, raylet_socket_name, temp_dir, - system_config, lru_evict, enable_object_reconstruction, - metrics_export_port, no_monitor, tracing_startup_hook, - worker_setup_hook, runtime_env_setup_hook, ray_debugger_external, - log_style, log_color, verbose): +def start( + node_ip_address, address, port, redis_password, redis_shard_ports, + object_manager_port, node_manager_port, gcs_server_port, + min_worker_port, max_worker_port, worker_port_list, + ray_client_server_port, memory, object_store_memory, redis_max_memory, + num_cpus, num_gpus, resources, head, include_dashboard, dashboard_host, + dashboard_port, dashboard_agent_listen_port, block, plasma_directory, + autoscaling_config, no_redirect_worker_output, no_redirect_output, + plasma_store_socket_name, raylet_socket_name, temp_dir, system_config, + lru_evict, enable_object_reconstruction, metrics_export_port, + no_monitor, tracing_startup_hook, worker_setup_hook, + ray_debugger_external, log_style, log_color, verbose): """Start Ray processes manually on the local machine.""" cli_logger.configure(log_style, log_color, verbose) if gcs_server_port and not head: @@ -562,7 +554,6 @@ def start(node_ip_address, address, port, redis_password, redis_shard_ports, no_monitor=no_monitor, tracing_startup_hook=tracing_startup_hook, worker_setup_hook=worker_setup_hook, - runtime_env_setup_hook=runtime_env_setup_hook, ray_debugger_external=ray_debugger_external) if head: # Use default if port is none, allocate an available port if port is 0 diff --git a/python/ray/tests/test_runtime_env.py b/python/ray/tests/test_runtime_env.py index 555b04bfc..5117e5444 100644 --- a/python/ray/tests/test_runtime_env.py +++ b/python/ray/tests/test_runtime_env.py @@ -9,20 +9,23 @@ from pathlib import Path import ray from ray.exceptions import RuntimeEnvSetupError +import ray.experimental.internal_kv as kv from ray._private.test_utils import ( run_string_as_driver, run_string_as_driver_nonblocking, wait_for_condition) +from ray._private.runtime_env import working_dir as working_dir_pkg from ray._private.utils import (get_wheel_filename, get_master_wheel_url, get_release_wheel_url) -import ray.experimental.internal_kv as kv -from time import sleep + driver_script = """ -from time import sleep -import sys import logging +import os +import sys +import time + sys.path.insert(0, "{working_dir}") + import ray import ray.util -import os try: import test_module @@ -84,7 +87,7 @@ if os.environ.get("USE_RAY_CLIENT"): ray.util.disconnect() else: ray.shutdown() -sleep(10) +time.sleep(10) """ @@ -119,7 +122,7 @@ from test_module.test import one def start_client_server(cluster, client_mode): - from ray._private.runtime_env import PKG_DIR + from ray._private.runtime_env.working_dir import PKG_DIR if not client_mode: return (cluster.address, {}, PKG_DIR) ray.worker._global_node._ray_params.ray_client_server_port = "10003" @@ -174,7 +177,7 @@ def test_travel(): item_num += 1 construct(root) - exclude_spec = ray._private.runtime_env._get_excludes(root, excludes) + exclude_spec = working_dir_pkg._get_excludes(root, excludes) visited_dir_paths = set() visited_file_paths = set() @@ -185,7 +188,7 @@ def test_travel(): with open(path) as f: visited_file_paths.add((str(path), f.read())) - ray._private.runtime_env._dir_travel(root, [exclude_spec], handler) + working_dir_pkg._dir_travel(root, [exclude_spec], handler) assert file_paths == visited_file_paths assert dir_paths == visited_dir_paths @@ -264,6 +267,7 @@ def test_single_node(ray_start_cluster_head, working_dir, client_mode): execute_statement = "print(sum(ray.get([run_test.remote()] * 1000)))" script = driver_script.format(**locals()) out = run_string_as_driver(script, env) + print(out) assert out.strip().split()[-1] == "1000" assert len(list(Path(PKG_DIR).iterdir())) == 1 assert len(kv._internal_kv_list("gcs://")) == 0 @@ -461,13 +465,13 @@ print(ray.get_runtime_context().runtime_env["working_dir"]) def test_two_node_uri(two_node_cluster, working_dir, client_mode): cluster, _ = two_node_cluster (address, env, PKG_DIR) = start_client_server(cluster, client_mode) - import ray._private.runtime_env as runtime_env - import tempfile with tempfile.NamedTemporaryFile(suffix="zip") as tmp_file: - pkg_name = runtime_env.get_project_package_name(working_dir, [], []) - pkg_uri = runtime_env.Protocol.PIN_GCS.value + "://" + pkg_name - runtime_env.create_project_package(working_dir, [], [], tmp_file.name) - runtime_env.push_package(pkg_uri, tmp_file.name) + pkg_name = working_dir_pkg.get_project_package_name( + working_dir, [], []) + pkg_uri = working_dir_pkg.Protocol.PIN_GCS.value + "://" + pkg_name + working_dir_pkg.create_project_package(working_dir, [], [], + tmp_file.name) + working_dir_pkg.push_package(pkg_uri, tmp_file.name) runtime_env = f"""{{ "uris": ["{pkg_uri}"] }}""" # Execute the following cmd in driver with runtime_env execute_statement = "print(sum(ray.get([run_test.remote()] * 1000)))" @@ -521,8 +525,7 @@ print(sum(ray.get([test_actor.one.remote()] * 1000))) test_actor = ray.get_actor("test_actor") assert sum(ray.get([test_actor.one.remote()] * 1000)) == 1000 ray.kill(test_actor) - from time import sleep - sleep(5) + time.sleep(5) assert len(list(Path(PKG_DIR).iterdir())) == 1 assert len(kv._internal_kv_list("gcs://")) == 0 @@ -536,13 +539,13 @@ def test_jobconfig_compatible_1(ray_start_cluster_head, working_dir): runtime_env = None # To make the first one hanging there execute_statement = """ -sleep(600) +time.sleep(600) """ script = driver_script.format(**locals()) # Have one running with job config = None proc = run_string_as_driver_nonblocking(script, env) # waiting it to be up - sleep(5) + time.sleep(5) runtime_env = f"""{{ "working_dir": "{working_dir}" }}""" # Execute the second one which should work because Ray Client servers. execute_statement = "print(sum(ray.get([run_test.remote()] * 1000)))" @@ -562,11 +565,11 @@ def test_jobconfig_compatible_2(ray_start_cluster_head, working_dir): runtime_env = """{ "py_modules": [test_module.__path__[0]] }""" # To make the first one hanging there execute_statement = """ -sleep(600) +time.sleep(600) """ script = driver_script.format(**locals()) proc = run_string_as_driver_nonblocking(script, env) - sleep(5) + time.sleep(5) runtime_env = None # Execute the following in the second one which should # succeed @@ -587,11 +590,11 @@ def test_jobconfig_compatible_3(ray_start_cluster_head, working_dir): runtime_env = """{ "py_modules": [test_module.__path__[0]] }""" # To make the first one hanging ther execute_statement = """ -sleep(600) +time.sleep(600) """ script = driver_script.format(**locals()) proc = run_string_as_driver_nonblocking(script, env) - sleep(5) + time.sleep(5) runtime_env = f""" {{ "working_dir": test_module.__path__[0] }}""" # noqa: F541 # Execute the following cmd in the second one and ensure that @@ -830,29 +833,12 @@ def test_invalid_conda_env(shutdown_only): @pytest.mark.skipif( sys.platform == "win32", reason="runtime_env unsupported on Windows.") -@pytest.mark.parametrize( - "ray_start_cluster", [{ - "_system_config": { - "event_stats_print_interval_ms": 100, - "debug_dump_period_milliseconds": 100, - "event_stats": True - } - }], - indirect=True) -def test_no_spurious_worker_startup(ray_start_cluster): +def test_no_spurious_worker_startup(shutdown_only): """Test that no extra workers start up during a long env installation.""" - cluster = ray_start_cluster - - # This hook sleeps for 15 seconds to simulate creating a runtime env. - cluster.add_node( - num_cpus=1, - runtime_env_setup_hook=( - "ray._private.test_utils.sleep_setup_runtime_env")) - - # Set a nonempty runtime env so that the runtime env setup hook is called. - runtime_env = {"env_vars": {"a": "b"}} - ray.init(address=cluster.address) + # Causes agent to sleep for 15 seconds to simulate creating a runtime env. + os.environ["RAY_RUNTIME_ENV_SLEEP_FOR_TESTING_S"] = "15" + ray.init(num_cpus=1) @ray.remote class Counter(object): @@ -862,6 +848,9 @@ def test_no_spurious_worker_startup(ray_start_cluster): def get(self): return self.value + # Set a nonempty runtime env so that the runtime env setup hook is called. + runtime_env = {"env_vars": {"a": "b"}} + # Instantiate an actor that requires the long runtime env installation. a = Counter.options(runtime_env=runtime_env).remote() assert ray.get(a.get.remote()) == 0 @@ -904,7 +893,7 @@ def test_large_file_boundary(shutdown_only): os.chdir(tmp_dir) # Check that packages just under the max size work as expected. - size = ray._private.runtime_env.GCS_STORAGE_MAX_SIZE - 1024 * 1024 + size = working_dir_pkg.GCS_STORAGE_MAX_SIZE - 1024 * 1024 with open("test_file", "wb") as f: f.write(os.urandom(size)) @@ -929,7 +918,7 @@ def test_large_file_error(shutdown_only): # Write to two separate files, each of which is below the threshold to # make sure the error is for the full package size. - size = ray._private.runtime_env.GCS_STORAGE_MAX_SIZE // 2 + 1 + size = working_dir_pkg.GCS_STORAGE_MAX_SIZE // 2 + 1 with open("test_file_1", "wb") as f: f.write(os.urandom(size)) diff --git a/python/ray/tests/test_runtime_env_complicated.py b/python/ray/tests/test_runtime_env_complicated.py index c19877477..8782aae1a 100644 --- a/python/ray/tests/test_runtime_env_complicated.py +++ b/python/ray/tests/test_runtime_env_complicated.py @@ -1,29 +1,27 @@ import json import os from contextlib import contextmanager -from typing import List -from ray.workers.setup_runtime_env import inject_dependencies -import pytest -import sys -import unittest -import tempfile -import yaml -import time - -import subprocess - from pathlib import Path -from unittest import mock +import pytest +import subprocess +import sys +import tempfile +import time +from typing import List +from unittest import mock, skipIf +import yaml + import ray -from ray._private.utils import get_conda_env_dir, get_conda_bin_executable from ray._private.runtime_env import RuntimeEnvDict -from ray.workers.setup_runtime_env import ( +from ray._private.runtime_env.conda import ( + inject_dependencies, _inject_ray_to_conda_site, _resolve_install_from_source_ray_dependencies, _current_py_version, ) from ray._private.test_utils import (run_string_as_driver, run_string_as_driver_nonblocking) +from ray._private.utils import get_conda_env_dir, get_conda_bin_executable if not os.environ.get("CI"): # This flags turns on the local development that link against current ray @@ -193,7 +191,6 @@ def test_job_config_conda_env(conda_envs, shutdown_only): def test_get_conda_env_dir(tmp_path): - from pathlib import Path """ Typical output of `conda env list`, for context: @@ -474,7 +471,7 @@ def test_conda_input_filepath(use_working_dir, tmp_path): assert output_conda_dict == conda_dict -@unittest.skipIf(sys.platform == "win32", "Fail to create temp dir.") +@skipIf(sys.platform == "win32", "Fail to create temp dir.") def test_experimental_package(shutdown_only): ray.init(num_cpus=2) pkg = ray.experimental.load_package( @@ -486,7 +483,7 @@ def test_experimental_package(shutdown_only): assert ray.get(pkg.my_func.remote()) == "hello world" -@unittest.skipIf(sys.platform == "win32", "Fail to create temp dir.") +@skipIf(sys.platform == "win32", "Fail to create temp dir.") def test_experimental_package_lazy(shutdown_only): pkg = ray.experimental.load_package( os.path.join( @@ -498,7 +495,7 @@ def test_experimental_package_lazy(shutdown_only): assert ray.get(pkg.my_func.remote()) == "hello world" -@unittest.skipIf(sys.platform == "win32", "Fail to create temp dir.") +@skipIf(sys.platform == "win32", "Fail to create temp dir.") def test_experimental_package_github(shutdown_only): ray.init(num_cpus=2) pkg = ray.experimental.load_package( diff --git a/python/ray/util/client/server/proxier.py b/python/ray/util/client/server/proxier.py index e242fe3e5..f91438d23 100644 --- a/python/ray/util/client/server/proxier.py +++ b/python/ray/util/client/server/proxier.py @@ -23,7 +23,7 @@ from ray.util.client.common import (ClientServerHandle, CLIENT_SERVER_MAX_THREADS, GRPC_OPTIONS) from ray._private.client_mode_hook import disable_client_hook from ray._private.parameter import RayParams -import ray._private.runtime_env as runtime_pkg +import ray._private.runtime_env.working_dir as working_dir_pkg from ray._private.services import ProcessInfo, start_ray_client_server from ray._private.utils import detect_fate_sharing_support @@ -221,7 +221,7 @@ class ProxyManager(): uris = job_config.get_runtime_env_uris() if job_config else [] if uris: # Download and set up the working_dir locally. - working_dir = runtime_pkg.ensure_runtime_env_setup(uris) + working_dir = working_dir_pkg.ensure_runtime_env_setup(uris) # Set PYTHONPATH in the environment variables so the working_dir # is included in the module search path. diff --git a/python/ray/util/client/worker.py b/python/ray/util/client/worker.py index 67fa098db..9ee90ac8c 100644 --- a/python/ray/util/client/worker.py +++ b/python/ray/util/client/worker.py @@ -515,14 +515,15 @@ class Worker: else: # Generate and upload URIs for the working directory. This # uses internal_kv to upload to the GCS. - import ray._private.runtime_env as runtime_env + import ray._private.runtime_env.working_dir as working_dir_pkg with tempfile.TemporaryDirectory() as tmp_dir: - (old_dir, runtime_env.PKG_DIR) = (runtime_env.PKG_DIR, - tmp_dir) - runtime_env.rewrite_runtime_env_uris(job_config) - runtime_env.upload_runtime_env_package_if_needed( + (old_dir, + working_dir_pkg.PKG_DIR) = (working_dir_pkg.PKG_DIR, + tmp_dir) + working_dir_pkg.rewrite_runtime_env_uris(job_config) + working_dir_pkg.upload_runtime_env_package_if_needed( job_config) - runtime_env.PKG_DIR = old_dir + working_dir_pkg.PKG_DIR = old_dir serialized_job_config = pickle.dumps(job_config) diff --git a/python/ray/worker.py b/python/ray/worker.py index 9d48087ca..ae820fbb1 100644 --- a/python/ray/worker.py +++ b/python/ray/worker.py @@ -27,7 +27,7 @@ import ray.remote_function import ray.serialization as serialization import ray._private.gcs_utils as gcs_utils import ray._private.services as services -import ray._private.runtime_env as runtime_env_pkg +from ray._private.runtime_env import working_dir as working_dir_pkg import ray._private.import_thread as import_thread from ray.util.tracing.tracing_helper import import_from_string from ray.util.annotations import PublicAPI, DeveloperAPI, Deprecated @@ -938,7 +938,7 @@ def init( if driver_mode == SCRIPT_MODE and job_config: # Rewrite the URI. Note the package isn't uploaded to the URI until # later in the connect - runtime_env_pkg.rewrite_runtime_env_uris(job_config) + working_dir_pkg.rewrite_runtime_env_uris(job_config) connect( _global_node, @@ -1397,7 +1397,7 @@ def connect(node, # environment here. If it's ray client, the environmen will be prepared # at the server side. if mode == SCRIPT_MODE and not job_config.client_job: - runtime_env_pkg.upload_runtime_env_package_if_needed(job_config) + working_dir_pkg.upload_runtime_env_package_if_needed(job_config) # Notify raylet that the core worker is ready. worker.core_worker.notify_raylet() diff --git a/python/ray/workers/pluggable_runtime_env.py b/python/ray/workers/pluggable_runtime_env.py deleted file mode 100644 index c2c18b0fc..000000000 --- a/python/ray/workers/pluggable_runtime_env.py +++ /dev/null @@ -1,46 +0,0 @@ -import contextlib -import json -import logging -import threading - -logger = logging.getLogger(__name__) -thread_local_logger = threading.local() -thread_local_logger.logger = None - - -def get_hook_logger(): - """Retrieve a logger to be used by the setup hook function. Logs from this - logger will be streamed to driver. - """ - thread_logger = thread_local_logger.logger - if thread_logger is None: - logger.warning( - "Tried to receive the per job logger in runtime env agent but it " - "hasn't been properly setup, default to dashboard_agent logger.") - thread_logger = logger - return thread_logger - - -@contextlib.contextmanager -def using_thread_local_logger(new_logger): - """Configure the logger to be used by the setup hook function. This sets - a logger to be retrieved by get_hook_logger function. - """ - thread_local_logger.logger = new_logger - yield - thread_local_logger.logger = None - - -class RuntimeEnvContext: - """A context used to describe the created runtime env.""" - - def __init__(self, conda_env_name: str = None, working_dir: str = None): - self.conda_env_name = conda_env_name - self.working_dir = working_dir - - def serialize(self) -> str: - return json.dumps(self.__dict__) - - @staticmethod - def deserialize(json_string): - return RuntimeEnvContext(**json.loads(json_string)) diff --git a/python/ray/workers/setup_runtime_env.py b/python/ray/workers/setup_runtime_env.py index ee6b3bb0a..34e0d531f 100644 --- a/python/ray/workers/setup_runtime_env.py +++ b/python/ray/workers/setup_runtime_env.py @@ -3,24 +3,10 @@ import sys import argparse import json import logging -import yaml -import hashlib -import subprocess -import runpy -import shutil -from filelock import FileLock -from typing import Optional, List, Dict, Any -from pathlib import Path - -import ray -from ray._private.conda import (get_conda_activate_commands, - get_or_create_conda_env) -from ray._private.utils import try_to_create_directory -from ray._private.utils import (get_wheel_filename, get_master_wheel_url, - get_release_wheel_url) -from ray.workers.pluggable_runtime_env import (RuntimeEnvContext, - get_hook_logger) +from ray._private.runtime_env import RuntimeEnvContext +from ray._private.runtime_env.conda import setup_conda_or_pip +from ray._private.runtime_env.conda_utils import get_conda_activate_commands logger = logging.getLogger(__name__) parser = argparse.ArgumentParser() @@ -40,99 +26,6 @@ parser.add_argument( "--session-dir", type=str, help="the directory for the current session") -def _resolve_current_ray_path(): - # When ray is built from source with pip install -e, - # ray.__file__ returns .../python/ray/__init__.py. - # When ray is installed from a prebuilt binary, it returns - # .../site-packages/ray/__init__.py - return os.path.split(os.path.split(ray.__file__)[0])[0] - - -def _resolve_install_from_source_ray_dependencies(): - """Find the ray dependencies when Ray is install from source""" - ray_source_python_path = _resolve_current_ray_path() - setup_py_path = os.path.join(ray_source_python_path, "setup.py") - ray_install_requires = runpy.run_path(setup_py_path)[ - "setup_spec"].install_requires - return ray_install_requires - - -def _inject_ray_to_conda_site(conda_path): - """Write the current Ray site package directory to a new site""" - python_binary = os.path.join(conda_path, "bin/python") - site_packages_path = subprocess.check_output( - [python_binary, "-c", - "import site; print(site.getsitepackages()[0])"]).decode().strip() - - ray_path = _resolve_current_ray_path() - logger = get_hook_logger() - logger.warning(f"Injecting {ray_path} to environment {conda_path} " - "because _inject_current_ray flag is on.") - - maybe_ray_dir = os.path.join(site_packages_path, "ray") - if os.path.isdir(maybe_ray_dir): - logger.warning(f"Replacing existing ray installation with {ray_path}") - shutil.rmtree(maybe_ray_dir) - - # See usage of *.pth file at - # https://docs.python.org/3/library/site.html - with open(os.path.join(site_packages_path, "ray.pth"), "w") as f: - f.write(ray_path) - - -def _current_py_version(): - return ".".join(map(str, sys.version_info[:3])) # like 3.6.10 - - -def setup_runtime_env(runtime_env: dict, session_dir): - logger = get_hook_logger() - logger.debug(f"Setting up runtime environment {runtime_env}") - if runtime_env.get("conda") or runtime_env.get("pip"): - conda_dict = get_conda_dict(runtime_env, session_dir) - if isinstance(runtime_env.get("conda"), str): - conda_env_name = runtime_env["conda"] - else: - assert conda_dict is not None - ray_pip = current_ray_pip_specifier() - if ray_pip: - extra_pip_dependencies = [ray_pip, "ray[default]"] - elif runtime_env.get("_inject_current_ray"): - extra_pip_dependencies = ( - _resolve_install_from_source_ray_dependencies()) - else: - extra_pip_dependencies = [] - conda_dict = inject_dependencies(conda_dict, _current_py_version(), - extra_pip_dependencies) - logger.info(f"Setting up conda environment with {runtime_env}") - # It is not safe for multiple processes to install conda envs - # concurrently, even if the envs are different, so use a global - # lock for all conda installs. - # See https://github.com/ray-project/ray/issues/17086 - file_lock_name = "ray-conda-install.lock" - with FileLock(os.path.join(session_dir, file_lock_name)): - conda_dir = os.path.join(session_dir, "runtime_resources", - "conda") - try_to_create_directory(conda_dir) - conda_yaml_path = os.path.join(conda_dir, "environment.yml") - with open(conda_yaml_path, "w") as file: - # Sort keys because we hash based on the file contents, - # and we don't want the hash to depend on the order - # of the dependencies. - yaml.dump(conda_dict, file, sort_keys=True) - conda_env_name = get_or_create_conda_env( - conda_yaml_path, conda_dir) - - if runtime_env.get("_inject_current_ray"): - conda_path = os.path.join(conda_dir, conda_env_name) - _inject_ray_to_conda_site(conda_path) - logger.info( - f"Finished setting up runtime environment at {conda_env_name}") - - return RuntimeEnvContext(conda_env_name) - - return RuntimeEnvContext() - - def setup_worker(input_args): # remaining_args contains the arguments to the original worker command, # minus the python executable, e.g. default_worker.py --node-ip-address=... @@ -149,8 +42,8 @@ def setup_worker(input_args): # Ray client server setups runtime env by itself instead of agent. if runtime_env.get("conda") or runtime_env.get("pip"): if not args.serialized_runtime_env_context: - runtime_env_context = setup_runtime_env(runtime_env, - args.session_dir) + runtime_env_context = RuntimeEnvContext(args.session_dir) + setup_conda_or_pip(runtime_env, runtime_env_context, logger=logger) if runtime_env_context and runtime_env_context.working_dir is not None: commands += [f"cd {runtime_env_context.working_dir}"] @@ -196,129 +89,5 @@ def setup_worker(input_args): os.execvp("bash", ["bash", "-c", command_str]) -def get_conda_dict(runtime_env, runtime_env_dir) -> Optional[Dict[Any, Any]]: - """ Construct a conda dependencies dict from a runtime env. - - This function does not inject Ray or Python into the conda dict. - If the runtime env does not specify pip or conda, or if it specifies - the name of a preinstalled conda environment, this function returns - None. If pip is specified, a conda dict is created containing the - pip dependencies. If conda is already given as a dict, this function - is the identity function. - """ - if runtime_env.get("conda"): - if isinstance(runtime_env["conda"], dict): - return runtime_env["conda"] - else: - return None - if runtime_env.get("pip"): - requirements_txt = runtime_env["pip"] - pip_hash = hashlib.sha1(requirements_txt.encode("utf-8")).hexdigest() - pip_hash_str = f"pip-generated-{pip_hash}" - - conda_dir = os.path.join(runtime_env_dir, "conda") - requirements_txt_path = os.path.join( - conda_dir, f"requirements-{pip_hash_str}.txt") - conda_dict = { - "name": pip_hash_str, - "dependencies": ["pip", { - "pip": [f"-r {requirements_txt_path}"] - }] - } - file_lock_name = f"ray-{pip_hash_str}.lock" - with FileLock(os.path.join(runtime_env_dir, file_lock_name)): - try_to_create_directory(conda_dir) - with open(requirements_txt_path, "w") as file: - file.write(requirements_txt) - return conda_dict - return None - - -def current_ray_pip_specifier() -> Optional[str]: - """The pip requirement specifier for the running version of Ray. - - Returns: - A string which can be passed to `pip install` to install the - currently running Ray version, or None if running on a version - built from source locally (likely if you are developing Ray). - - Examples: - Returns "https://s3-us-west-2.amazonaws.com/ray-wheels/[..].whl" - if running a stable release, a nightly or a specific commit - """ - logger = get_hook_logger() - if os.environ.get("RAY_CI_POST_WHEEL_TESTS"): - # Running in Buildkite CI after the wheel has been built. - # Wheels are at in the ray/.whl directory, and the present file is - # at ray/python/ray/workers. Use relative paths to allow for - # testing locally if needed. - return os.path.join( - Path(__file__).resolve().parents[3], ".whl", get_wheel_filename()) - elif ray.__commit__ == "{{RAY_COMMIT_SHA}}": - # Running on a version built from source locally. - if os.environ.get("RAY_RUNTIME_ENV_LOCAL_DEV_MODE") != "1": - logger.warning( - "Current Ray version could not be detected, most likely " - "because you have manually built Ray from source. To use " - "runtime_env in this case, set the environment variable " - "RAY_RUNTIME_ENV_LOCAL_DEV_MODE=1.") - return None - elif "dev" in ray.__version__: - # Running on a nightly wheel. - return get_master_wheel_url() - else: - return get_release_wheel_url() - - -def inject_dependencies( - conda_dict: Dict[Any, Any], - py_version: str, - pip_dependencies: Optional[List[str]] = None) -> Dict[Any, Any]: - """Add Ray, Python and (optionally) extra pip dependencies to a conda dict. - - Args: - conda_dict (dict): A dict representing the JSON-serialized conda - environment YAML file. This dict will be modified and returned. - py_version (str): A string representing a Python version to inject - into the conda dependencies, e.g. "3.7.7" - pip_dependencies (List[str]): A list of pip dependencies that - will be prepended to the list of pip dependencies in - the conda dict. If the conda dict does not already have a "pip" - field, one will be created. - Returns: - The modified dict. (Note: the input argument conda_dict is modified - and returned.) - """ - if pip_dependencies is None: - pip_dependencies = [] - if conda_dict.get("dependencies") is None: - conda_dict["dependencies"] = [] - - # Inject Python dependency. - deps = conda_dict["dependencies"] - - # Add current python dependency. If the user has already included a - # python version dependency, conda will raise a readable error if the two - # are incompatible, e.g: - # ResolvePackageNotFound: - python[version='3.5.*,>=3.6'] - deps.append(f"python={py_version}") - - if "pip" not in deps: - deps.append("pip") - - # Insert pip dependencies. - found_pip_dict = False - for dep in deps: - if isinstance(dep, dict) and dep.get("pip") and isinstance( - dep["pip"], list): - dep["pip"] = pip_dependencies + dep["pip"] - found_pip_dict = True - break - if not found_pip_dict: - deps.append({"pip": pip_dependencies}) - - return conda_dict - - if __name__ == "__main__": setup_worker(sys.argv[1:])