From 86feccf9c791ff47950c15c621e3bf8835d91b17 Mon Sep 17 00:00:00 2001 From: architkulkarni Date: Thu, 27 May 2021 15:58:47 -0700 Subject: [PATCH] [Core] Allow specifying runtime_env conda and pip via filepath (#16073) --- python/ray/_private/runtime_env.py | 72 +++++++-- .../ray/tests/test_runtime_env_complicated.py | 51 ++++-- python/ray/workers/setup_runtime_env.py | 153 ++++++++++-------- 3 files changed, 182 insertions(+), 94 deletions(-) diff --git a/python/ray/_private/runtime_env.py b/python/ray/_private/runtime_env.py index 6ec5ba0cb..865f6f449 100644 --- a/python/ray/_private/runtime_env.py +++ b/python/ray/_private/runtime_env.py @@ -1,6 +1,7 @@ import hashlib import logging import json +import yaml from filelock import FileLock from pathlib import Path @@ -9,6 +10,7 @@ from ray._private.thirdparty.pathspec import PathSpec from ray.job_config import JobConfig from enum import Enum +import ray from ray.experimental.internal_kv import (_internal_kv_put, _internal_kv_get, _internal_kv_exists, _internal_kv_initialized) @@ -41,12 +43,18 @@ class RuntimeEnvDict: Examples: ["/path/to/other_module", "/other_path/local_project.zip"] pip (List[str] | str): Either a list of pip packages, or a string - containing the contents of a pip requirements.txt file. - conda (dict | str): Either the conda YAML config or the name of a - local conda env (e.g., "pytorch_p36"). The Ray dependency will be - automatically injected into the conda env to ensure compatibility - with the cluster Ray. The conda name may be mangled automatically - to avoid conflicts between runtime envs. + containing the path to a pip requirements.txt file. If a relative + path is specified and working_dir is specified, the path is + interpreted relative to working_dir. + conda (dict | str): Either the conda YAML config, the name of a + local conda env (e.g., "pytorch_p36"), or the path to a conda + environment.yaml file. If a relative path is specified and + working_dir is specified, the path is interpreted relative to + working_dir. + The Ray dependency will be automatically injected into the conda + env to ensure compatibility with the cluster Ray. The conda name + may be mangled automatically to avoid conflicts between runtime + envs. This field cannot be specified at the same time as the 'pip' field. To use pip with conda, please specify your pip dependencies within the conda YAML config: @@ -70,7 +78,15 @@ class RuntimeEnvDict: # Simple dictionary with all options validated. This will always # contain all supported keys; values will be set to None if # unspecified. However, if all values are None this is set to {}. - self._dict = {} + self._dict = dict() + + if "working_dir" in runtime_env_json: + self._dict["working_dir"] = runtime_env_json["working_dir"] + working_dir = Path(self._dict["working_dir"]) + else: + self._dict["working_dir"] = None + working_dir = None + self._dict["conda"] = None if "conda" in runtime_env_json: if sys.platform == "win32": @@ -79,7 +95,22 @@ class RuntimeEnvDict: "Windows.") conda = runtime_env_json["conda"] if isinstance(conda, str): - self._dict["conda"] = conda + yaml_file = Path(conda) + if yaml_file.suffix in (".yaml", ".yml"): + if working_dir and not yaml_file.is_absolute(): + yaml_file = working_dir / yaml_file + if not yaml_file.is_file(): + raise ValueError( + f"Can't find conda YAML file {yaml_file}") + try: + self._dict["conda"] = yaml.load(yaml_file.read_text()) + except Exception as e: + raise ValueError( + f"Invalid conda file {yaml_file} with error {e}") + else: + logger.info( + f"Using preinstalled conda environment: {conda}") + self._dict["conda"] = conda elif isinstance(conda, dict): self._dict["conda"] = conda elif conda is not None: @@ -104,7 +135,13 @@ class RuntimeEnvDict: "#create-env-file-manually") pip = runtime_env_json["pip"] if isinstance(pip, str): - self._dict["pip"] = pip + # We have been given a path to a requirements.txt file. + pip_file = Path(pip) + if working_dir and not pip_file.is_absolute(): + pip_file = working_dir / pip_file + if not pip_file.is_file(): + raise ValueError(f"{pip_file} is not a valid file") + self._dict["pip"] = pip_file.read_text() elif isinstance(pip, list) and all( isinstance(dep, str) for dep in pip): # Construct valid pip requirements.txt from list of packages. @@ -113,17 +150,9 @@ class RuntimeEnvDict: raise TypeError("runtime_env['pip'] must be of type str or " "List[str]") - if "working_dir" in runtime_env_json: - self._dict["working_dir"] = runtime_env_json["working_dir"] - else: - self._dict["working_dir"] = None - if "uris" in runtime_env_json: self._dict["uris"] = runtime_env_json["uris"] - if "_ray_release" in runtime_env_json: - self._dict["_ray_release"] = runtime_env_json["_ray_release"] - self._dict["env_vars"] = None if "env_vars" in runtime_env_json: env_vars = runtime_env_json["env_vars"] @@ -141,6 +170,15 @@ class RuntimeEnvDict: self._dict["env_vars"].update( RAY_RUNTIME_ENV_FILES=self._dict["working_dir"]) + if "_ray_release" in runtime_env_json: + self._dict["_ray_release"] = runtime_env_json["_ray_release"] + + if "_ray_commit" in runtime_env_json: + self._dict["_ray_commit"] = runtime_env_json["_ray_commit"] + else: + if self._dict.get("pip") or self._dict.get("conda"): + self._dict["_ray_commit"] = ray.__commit__ + # TODO(ekl) we should have better schema validation here. # TODO(ekl) support py_modules # TODO(architkulkarni) support docker diff --git a/python/ray/tests/test_runtime_env_complicated.py b/python/ray/tests/test_runtime_env_complicated.py index 0ff48ba0a..435cd0f10 100644 --- a/python/ray/tests/test_runtime_env_complicated.py +++ b/python/ray/tests/test_runtime_env_complicated.py @@ -1,14 +1,16 @@ import os -from ray.workers.setup_runtime_env import inject_ray_and_python +from ray.workers.setup_runtime_env import inject_dependencies import pytest import sys import unittest +import yaml import subprocess from unittest import mock import ray from ray._private.utils import get_conda_env_dir, get_conda_bin_executable +from ray._private.runtime_env import RuntimeEnvDict from ray.job_config import JobConfig from ray.test_utils import run_string_as_driver @@ -319,7 +321,7 @@ def test_conda_create_job_config(shutdown_only): assert ray.get(f.remote()) -def test_inject_ray_and_python(): +def test_inject_dependencies(): num_tests = 4 conda_dicts = [None] * num_tests outputs = [None] * num_tests @@ -349,15 +351,14 @@ def test_inject_ray_and_python(): outputs[3] = { "dependencies": [ "blah", "pip", { - "pip": ["some_pkg", "ray==1.2.3"] + "pip": ["ray==1.2.3", "some_pkg"] }, "python=7.8" ] } for i in range(num_tests): - output = inject_ray_and_python(conda_dicts[i], "ray==1.2.3", "7.8") + output = inject_dependencies(conda_dicts[i], "7.8", ["ray==1.2.3"]) error_msg = (f"failed on input {i}." - f"Input: {conda_dicts[i]} \n" f"Output: {output} \n" f"Expected output: {outputs[i]}") assert (output == outputs[i]), error_msg @@ -416,17 +417,21 @@ def test_conda_create_ray_client(call_ray_start): @pytest.mark.skipif( sys.platform != "linux", reason="This test is only run on Buildkite.") @pytest.mark.parametrize("pip_as_str", [True, False]) -def test_pip_task(shutdown_only, pip_as_str): +def test_pip_task(shutdown_only, pip_as_str, tmp_path): """Tests pip installs in the runtime env specified in the job config.""" ray.init() if pip_as_str: + d = tmp_path / "pip_requirements" + d.mkdir() + p = d / "requirements.txt" requirements_txt = """ pip-install-test==0.5 opentelemetry-api==1.0.0rc1 opentelemetry-sdk==1.0.0rc1 """ - runtime_env = {"pip": requirements_txt} + p.write_text(requirements_txt) + runtime_env = {"pip": str(p)} else: runtime_env = { "pip": [ @@ -455,16 +460,20 @@ def test_pip_task(shutdown_only, pip_as_str): @pytest.mark.skipif( sys.platform != "linux", reason="This test is only run on Buildkite.") @pytest.mark.parametrize("pip_as_str", [True, False]) -def test_pip_job_config(shutdown_only, pip_as_str): +def test_pip_job_config(shutdown_only, pip_as_str, tmp_path): """Tests dynamic installation of pip packages in a task's runtime env.""" if pip_as_str: + d = tmp_path / "pip_requirements" + d.mkdir() + p = d / "requirements.txt" requirements_txt = """ pip-install-test==0.5 opentelemetry-api==1.0.0rc1 opentelemetry-sdk==1.0.0rc1 """ - runtime_env = {"pip": requirements_txt} + p.write_text(requirements_txt) + runtime_env = {"pip": str(p)} else: runtime_env = { "pip": [ @@ -486,6 +495,30 @@ def test_pip_job_config(shutdown_only, pip_as_str): assert ray.get(f.remote()) +@pytest.mark.skipif(sys.platform == "win32", reason="Unsupported on Windows.") +def test_conda_input_filepath(tmp_path): + conda_dict = { + "dependencies": [ + "pip", { + "pip": [ + "pip-install-test==0.5", "opentelemetry-api==1.0.0rc1", + "opentelemetry-sdk==1.0.0rc1" + ] + } + ] + } + d = tmp_path / "pip_requirements" + d.mkdir() + p = d / "environment.yml" + + p.write_text(yaml.dump(conda_dict)) + + runtime_env_dict = RuntimeEnvDict({"conda": str(p)}) + + output_conda_dict = runtime_env_dict.get_parsed_dict().get("conda") + assert output_conda_dict == conda_dict + + @unittest.skipIf(sys.platform == "win32", "Fail to create temp dir.") def test_experimental_package(shutdown_only): ray.init(num_cpus=2) diff --git a/python/ray/workers/setup_runtime_env.py b/python/ray/workers/setup_runtime_env.py index 6b6e33ef7..443bb8c2e 100644 --- a/python/ray/workers/setup_runtime_env.py +++ b/python/ray/workers/setup_runtime_env.py @@ -7,7 +7,7 @@ import yaml import hashlib from filelock import FileLock -from typing import Optional +from typing import Optional, List, Dict, Any from pathlib import Path import ray @@ -39,19 +39,20 @@ def setup(input_args): py_executable: str = sys.executable - if runtime_env.get("conda"): + if runtime_env.get("conda") or runtime_env.get("pip"): + conda_dict = get_conda_dict(runtime_env, args.session_dir) py_executable = "python" - if isinstance(runtime_env["conda"], str): - commands += get_conda_activate_commands(runtime_env["conda"]) - elif isinstance(runtime_env["conda"], dict): + if isinstance(runtime_env.get("conda"), str): + conda_env_name = runtime_env["conda"] + else: + assert conda_dict is not None py_version = ".".join(map(str, sys.version_info[:3])) # like 3.6.10 - conda_dict = inject_ray_and_python(runtime_env["conda"], - current_ray_pip_specifier(), - py_version) + conda_dict = inject_dependencies(conda_dict, py_version, + [current_ray_pip_specifier()]) # Locking to avoid multiple processes installing concurrently conda_hash = hashlib.sha1( - json.dumps(runtime_env["conda"], + json.dumps(conda_dict, sort_keys=True).encode("utf-8")).hexdigest() conda_hash_str = f"conda-generated-{conda_hash}" file_lock_name = f"ray-{conda_hash_str}.lock" @@ -67,44 +68,6 @@ def setup(input_args): yaml.dump(conda_dict, file, sort_keys=True) conda_env_name = get_or_create_conda_env( conda_yaml_path, conda_dir) - commands += get_conda_activate_commands(conda_env_name) - elif runtime_env.get("pip"): - # Install pip requirements into an empty conda env. - py_executable = "python" - requirements_txt = runtime_env["pip"] - pip_hash = hashlib.sha1(requirements_txt.encode("utf-8")).hexdigest() - pip_hash_str = f"pip-generated-{pip_hash}" - - conda_dir = os.path.join(args.session_dir, "runtime_resources", - "conda") - requirements_txt_path = os.path.join( - conda_dir, f"requirements-{pip_hash_str}.txt") - - py_version = ".".join(map(str, sys.version_info[:3])) # E.g. 3.6.13 - conda_dict = { - "name": pip_hash_str, - "dependencies": ["pip", { - "pip": [f"-r {requirements_txt_path}"] - }] - } - - conda_dict = inject_ray_and_python(conda_dict, - current_ray_pip_specifier(), - py_version) - - file_lock_name = f"ray-{pip_hash_str}.lock" - with FileLock(os.path.join(args.session_dir, file_lock_name)): - try_to_create_directory(conda_dir) - conda_yaml_path = os.path.join(conda_dir, - f"env-{pip_hash_str}.yml") - with open(conda_yaml_path, "w") as file: - yaml.dump(conda_dict, file, sort_keys=True) - - with open(requirements_txt_path, "w") as file: - file.write(requirements_txt) - - conda_env_name = get_or_create_conda_env(conda_yaml_path, - conda_dir) commands += get_conda_activate_commands(conda_env_name) @@ -119,6 +82,44 @@ def setup(input_args): os.execvp("bash", ["bash", "-c", command_str]) +def get_conda_dict(runtime_env, session_dir) -> Optional[Dict[Any, Any]]: + """ Construct a conda dependencies dict from a runtime env. + + This function does not inject Ray or Python into the conda dict. + If the runtime env does not specify pip or conda, or if it specifies + the name of a preinstalled conda environment, this function returns + None. If pip is specified, a conda dict is created containing the + pip dependencies. If conda is already given as a dict, this function + is the identity function. + """ + if runtime_env.get("conda"): + if isinstance(runtime_env["conda"], dict): + return runtime_env["conda"] + else: + return None + if runtime_env.get("pip"): + requirements_txt = runtime_env["pip"] + pip_hash = hashlib.sha1(requirements_txt.encode("utf-8")).hexdigest() + pip_hash_str = f"pip-generated-{pip_hash}" + + conda_dir = os.path.join(session_dir, "runtime_resources", "conda") + requirements_txt_path = os.path.join( + conda_dir, f"requirements-{pip_hash_str}.txt") + conda_dict = { + "name": pip_hash_str, + "dependencies": ["pip", { + "pip": [f"-r {requirements_txt_path}"] + }] + } + file_lock_name = f"ray-{pip_hash_str}.lock" + with FileLock(os.path.join(session_dir, file_lock_name)): + try_to_create_directory(conda_dir) + with open(requirements_txt_path, "w") as file: + file.write(requirements_txt) + return conda_dict + return None + + def current_ray_pip_specifier() -> Optional[str]: """The pip requirement specifier for the running version of Ray. @@ -141,6 +142,12 @@ def current_ray_pip_specifier() -> Optional[str]: Path(__file__).resolve().parents[3], ".whl", get_wheel_filename()) elif ray.__commit__ == "{{RAY_COMMIT_SHA}}": # Running on a version built from source locally. + logger.warning( + "Current Ray version could not be detected, most likely " + "because you are using a version of Ray " + "built from source. If you wish to use runtime_env, " + "you can try building a wheel and including the wheel " + "explicitly as a pip dependency.") return None elif "dev" in ray.__version__: # Running on a nightly wheel. @@ -149,8 +156,27 @@ def current_ray_pip_specifier() -> Optional[str]: return f"ray[all]=={ray.__version__}" -def inject_ray_and_python(conda_dict, ray_pip_specifier: Optional[str], - py_version: str) -> None: +def inject_dependencies( + conda_dict: Dict[Any, Any], + py_version: str, + pip_dependencies: Optional[List[str]] = None) -> Dict[Any, Any]: + """Add Ray, Python and (optionally) extra pip dependencies to a conda dict. + + Args: + conda_dict (dict): A dict representing the JSON-serialized conda + environment YAML file. This dict will be modified and returned. + py_version (str): A string representing a Python version to inject + into the conda dependencies, e.g. "3.7.7" + pip_dependencies (List[str]): A list of pip dependencies that + will be prepended to the list of pip dependencies in + the conda dict. If the conda dict does not already have a "pip" + field, one will be created. + Returns: + The modified dict. (Note: the input argument conda_dict is modified + and returned.) + """ + if pip_dependencies is None: + pip_dependencies = [] if conda_dict.get("dependencies") is None: conda_dict["dependencies"] = [] @@ -166,24 +192,15 @@ def inject_ray_and_python(conda_dict, ray_pip_specifier: Optional[str], if "pip" not in deps: deps.append("pip") - # Insert Ray dependency. If the user has already included Ray, conda - # will raise an error only if the two are incompatible. - - if ray_pip_specifier is not None: - found_pip_dict = False - for dep in deps: - if isinstance(dep, dict) and dep.get("pip"): - dep["pip"].append(ray_pip_specifier) - found_pip_dict = True - break - if not found_pip_dict: - deps.append({"pip": [ray_pip_specifier]}) - else: - logger.warning("Current Ray version could not be inserted " - "into conda's pip dependencies, most likely " - "because you are using a version of Ray " - "built from source. If so, you can try " - "building a wheel and including the wheel " - "as a dependency.") + # Insert pip dependencies. + found_pip_dict = False + for dep in deps: + if isinstance(dep, dict) and dep.get("pip") and isinstance( + dep["pip"], list): + dep["pip"] = pip_dependencies + dep["pip"] + found_pip_dict = True + break + if not found_pip_dict: + deps.append({"pip": pip_dependencies}) return conda_dict