[Core] Allow specifying runtime_env conda and pip via filepath (#16073)

This commit is contained in:
architkulkarni 2021-05-27 15:58:47 -07:00 committed by Dmitri Gekhtman
parent a05b755aee
commit 86feccf9c7
3 changed files with 182 additions and 94 deletions

View file

@ -1,6 +1,7 @@
import hashlib import hashlib
import logging import logging
import json import json
import yaml
from filelock import FileLock from filelock import FileLock
from pathlib import Path from pathlib import Path
@ -9,6 +10,7 @@ from ray._private.thirdparty.pathspec import PathSpec
from ray.job_config import JobConfig from ray.job_config import JobConfig
from enum import Enum from enum import Enum
import ray
from ray.experimental.internal_kv import (_internal_kv_put, _internal_kv_get, from ray.experimental.internal_kv import (_internal_kv_put, _internal_kv_get,
_internal_kv_exists, _internal_kv_exists,
_internal_kv_initialized) _internal_kv_initialized)
@ -41,12 +43,18 @@ class RuntimeEnvDict:
Examples: Examples:
["/path/to/other_module", "/other_path/local_project.zip"] ["/path/to/other_module", "/other_path/local_project.zip"]
pip (List[str] | str): Either a list of pip packages, or a string pip (List[str] | str): Either a list of pip packages, or a string
containing the contents of a pip requirements.txt file. containing the path to a pip requirements.txt file. If a relative
conda (dict | str): Either the conda YAML config or the name of a path is specified and working_dir is specified, the path is
local conda env (e.g., "pytorch_p36"). The Ray dependency will be interpreted relative to working_dir.
automatically injected into the conda env to ensure compatibility conda (dict | str): Either the conda YAML config, the name of a
with the cluster Ray. The conda name may be mangled automatically local conda env (e.g., "pytorch_p36"), or the path to a conda
to avoid conflicts between runtime envs. environment.yaml file. If a relative path is specified and
working_dir is specified, the path is interpreted relative to
working_dir.
The Ray dependency will be automatically injected into the conda
env to ensure compatibility with the cluster Ray. The conda name
may be mangled automatically to avoid conflicts between runtime
envs.
This field cannot be specified at the same time as the 'pip' field. This field cannot be specified at the same time as the 'pip' field.
To use pip with conda, please specify your pip dependencies within To use pip with conda, please specify your pip dependencies within
the conda YAML config: the conda YAML config:
@ -70,7 +78,15 @@ class RuntimeEnvDict:
# Simple dictionary with all options validated. This will always # Simple dictionary with all options validated. This will always
# contain all supported keys; values will be set to None if # contain all supported keys; values will be set to None if
# unspecified. However, if all values are None this is set to {}. # unspecified. However, if all values are None this is set to {}.
self._dict = {} self._dict = dict()
if "working_dir" in runtime_env_json:
self._dict["working_dir"] = runtime_env_json["working_dir"]
working_dir = Path(self._dict["working_dir"])
else:
self._dict["working_dir"] = None
working_dir = None
self._dict["conda"] = None self._dict["conda"] = None
if "conda" in runtime_env_json: if "conda" in runtime_env_json:
if sys.platform == "win32": if sys.platform == "win32":
@ -79,7 +95,22 @@ class RuntimeEnvDict:
"Windows.") "Windows.")
conda = runtime_env_json["conda"] conda = runtime_env_json["conda"]
if isinstance(conda, str): if isinstance(conda, str):
self._dict["conda"] = conda yaml_file = Path(conda)
if yaml_file.suffix in (".yaml", ".yml"):
if working_dir and not yaml_file.is_absolute():
yaml_file = working_dir / yaml_file
if not yaml_file.is_file():
raise ValueError(
f"Can't find conda YAML file {yaml_file}")
try:
self._dict["conda"] = yaml.load(yaml_file.read_text())
except Exception as e:
raise ValueError(
f"Invalid conda file {yaml_file} with error {e}")
else:
logger.info(
f"Using preinstalled conda environment: {conda}")
self._dict["conda"] = conda
elif isinstance(conda, dict): elif isinstance(conda, dict):
self._dict["conda"] = conda self._dict["conda"] = conda
elif conda is not None: elif conda is not None:
@ -104,7 +135,13 @@ class RuntimeEnvDict:
"#create-env-file-manually") "#create-env-file-manually")
pip = runtime_env_json["pip"] pip = runtime_env_json["pip"]
if isinstance(pip, str): if isinstance(pip, str):
self._dict["pip"] = pip # We have been given a path to a requirements.txt file.
pip_file = Path(pip)
if working_dir and not pip_file.is_absolute():
pip_file = working_dir / pip_file
if not pip_file.is_file():
raise ValueError(f"{pip_file} is not a valid file")
self._dict["pip"] = pip_file.read_text()
elif isinstance(pip, list) and all( elif isinstance(pip, list) and all(
isinstance(dep, str) for dep in pip): isinstance(dep, str) for dep in pip):
# Construct valid pip requirements.txt from list of packages. # Construct valid pip requirements.txt from list of packages.
@ -113,17 +150,9 @@ class RuntimeEnvDict:
raise TypeError("runtime_env['pip'] must be of type str or " raise TypeError("runtime_env['pip'] must be of type str or "
"List[str]") "List[str]")
if "working_dir" in runtime_env_json:
self._dict["working_dir"] = runtime_env_json["working_dir"]
else:
self._dict["working_dir"] = None
if "uris" in runtime_env_json: if "uris" in runtime_env_json:
self._dict["uris"] = runtime_env_json["uris"] self._dict["uris"] = runtime_env_json["uris"]
if "_ray_release" in runtime_env_json:
self._dict["_ray_release"] = runtime_env_json["_ray_release"]
self._dict["env_vars"] = None self._dict["env_vars"] = None
if "env_vars" in runtime_env_json: if "env_vars" in runtime_env_json:
env_vars = runtime_env_json["env_vars"] env_vars = runtime_env_json["env_vars"]
@ -141,6 +170,15 @@ class RuntimeEnvDict:
self._dict["env_vars"].update( self._dict["env_vars"].update(
RAY_RUNTIME_ENV_FILES=self._dict["working_dir"]) RAY_RUNTIME_ENV_FILES=self._dict["working_dir"])
if "_ray_release" in runtime_env_json:
self._dict["_ray_release"] = runtime_env_json["_ray_release"]
if "_ray_commit" in runtime_env_json:
self._dict["_ray_commit"] = runtime_env_json["_ray_commit"]
else:
if self._dict.get("pip") or self._dict.get("conda"):
self._dict["_ray_commit"] = ray.__commit__
# TODO(ekl) we should have better schema validation here. # TODO(ekl) we should have better schema validation here.
# TODO(ekl) support py_modules # TODO(ekl) support py_modules
# TODO(architkulkarni) support docker # TODO(architkulkarni) support docker

View file

@ -1,14 +1,16 @@
import os import os
from ray.workers.setup_runtime_env import inject_ray_and_python from ray.workers.setup_runtime_env import inject_dependencies
import pytest import pytest
import sys import sys
import unittest import unittest
import yaml
import subprocess import subprocess
from unittest import mock from unittest import mock
import ray import ray
from ray._private.utils import get_conda_env_dir, get_conda_bin_executable from ray._private.utils import get_conda_env_dir, get_conda_bin_executable
from ray._private.runtime_env import RuntimeEnvDict
from ray.job_config import JobConfig from ray.job_config import JobConfig
from ray.test_utils import run_string_as_driver from ray.test_utils import run_string_as_driver
@ -319,7 +321,7 @@ def test_conda_create_job_config(shutdown_only):
assert ray.get(f.remote()) assert ray.get(f.remote())
def test_inject_ray_and_python(): def test_inject_dependencies():
num_tests = 4 num_tests = 4
conda_dicts = [None] * num_tests conda_dicts = [None] * num_tests
outputs = [None] * num_tests outputs = [None] * num_tests
@ -349,15 +351,14 @@ def test_inject_ray_and_python():
outputs[3] = { outputs[3] = {
"dependencies": [ "dependencies": [
"blah", "pip", { "blah", "pip", {
"pip": ["some_pkg", "ray==1.2.3"] "pip": ["ray==1.2.3", "some_pkg"]
}, "python=7.8" }, "python=7.8"
] ]
} }
for i in range(num_tests): for i in range(num_tests):
output = inject_ray_and_python(conda_dicts[i], "ray==1.2.3", "7.8") output = inject_dependencies(conda_dicts[i], "7.8", ["ray==1.2.3"])
error_msg = (f"failed on input {i}." error_msg = (f"failed on input {i}."
f"Input: {conda_dicts[i]} \n"
f"Output: {output} \n" f"Output: {output} \n"
f"Expected output: {outputs[i]}") f"Expected output: {outputs[i]}")
assert (output == outputs[i]), error_msg assert (output == outputs[i]), error_msg
@ -416,17 +417,21 @@ def test_conda_create_ray_client(call_ray_start):
@pytest.mark.skipif( @pytest.mark.skipif(
sys.platform != "linux", reason="This test is only run on Buildkite.") sys.platform != "linux", reason="This test is only run on Buildkite.")
@pytest.mark.parametrize("pip_as_str", [True, False]) @pytest.mark.parametrize("pip_as_str", [True, False])
def test_pip_task(shutdown_only, pip_as_str): def test_pip_task(shutdown_only, pip_as_str, tmp_path):
"""Tests pip installs in the runtime env specified in the job config.""" """Tests pip installs in the runtime env specified in the job config."""
ray.init() ray.init()
if pip_as_str: if pip_as_str:
d = tmp_path / "pip_requirements"
d.mkdir()
p = d / "requirements.txt"
requirements_txt = """ requirements_txt = """
pip-install-test==0.5 pip-install-test==0.5
opentelemetry-api==1.0.0rc1 opentelemetry-api==1.0.0rc1
opentelemetry-sdk==1.0.0rc1 opentelemetry-sdk==1.0.0rc1
""" """
runtime_env = {"pip": requirements_txt} p.write_text(requirements_txt)
runtime_env = {"pip": str(p)}
else: else:
runtime_env = { runtime_env = {
"pip": [ "pip": [
@ -455,16 +460,20 @@ def test_pip_task(shutdown_only, pip_as_str):
@pytest.mark.skipif( @pytest.mark.skipif(
sys.platform != "linux", reason="This test is only run on Buildkite.") sys.platform != "linux", reason="This test is only run on Buildkite.")
@pytest.mark.parametrize("pip_as_str", [True, False]) @pytest.mark.parametrize("pip_as_str", [True, False])
def test_pip_job_config(shutdown_only, pip_as_str): def test_pip_job_config(shutdown_only, pip_as_str, tmp_path):
"""Tests dynamic installation of pip packages in a task's runtime env.""" """Tests dynamic installation of pip packages in a task's runtime env."""
if pip_as_str: if pip_as_str:
d = tmp_path / "pip_requirements"
d.mkdir()
p = d / "requirements.txt"
requirements_txt = """ requirements_txt = """
pip-install-test==0.5 pip-install-test==0.5
opentelemetry-api==1.0.0rc1 opentelemetry-api==1.0.0rc1
opentelemetry-sdk==1.0.0rc1 opentelemetry-sdk==1.0.0rc1
""" """
runtime_env = {"pip": requirements_txt} p.write_text(requirements_txt)
runtime_env = {"pip": str(p)}
else: else:
runtime_env = { runtime_env = {
"pip": [ "pip": [
@ -486,6 +495,30 @@ def test_pip_job_config(shutdown_only, pip_as_str):
assert ray.get(f.remote()) assert ray.get(f.remote())
@pytest.mark.skipif(sys.platform == "win32", reason="Unsupported on Windows.")
def test_conda_input_filepath(tmp_path):
    """A conda YAML file given by path is parsed back into the same dict."""
    pip_packages = [
        "pip-install-test==0.5",
        "opentelemetry-api==1.0.0rc1",
        "opentelemetry-sdk==1.0.0rc1",
    ]
    expected_conda_dict = {"dependencies": ["pip", {"pip": pip_packages}]}

    # Serialize the expected environment to a YAML file on disk.
    env_dir = tmp_path / "pip_requirements"
    env_dir.mkdir()
    env_file = env_dir / "environment.yml"
    env_file.write_text(yaml.dump(expected_conda_dict))

    # Passing the file path as the "conda" field should yield the parsed dict.
    parsed = RuntimeEnvDict({"conda": str(env_file)}).get_parsed_dict()
    assert parsed.get("conda") == expected_conda_dict
@unittest.skipIf(sys.platform == "win32", "Fail to create temp dir.") @unittest.skipIf(sys.platform == "win32", "Fail to create temp dir.")
def test_experimental_package(shutdown_only): def test_experimental_package(shutdown_only):
ray.init(num_cpus=2) ray.init(num_cpus=2)

View file

@ -7,7 +7,7 @@ import yaml
import hashlib import hashlib
from filelock import FileLock from filelock import FileLock
from typing import Optional from typing import Optional, List, Dict, Any
from pathlib import Path from pathlib import Path
import ray import ray
@ -39,19 +39,20 @@ def setup(input_args):
py_executable: str = sys.executable py_executable: str = sys.executable
if runtime_env.get("conda"): if runtime_env.get("conda") or runtime_env.get("pip"):
conda_dict = get_conda_dict(runtime_env, args.session_dir)
py_executable = "python" py_executable = "python"
if isinstance(runtime_env["conda"], str): if isinstance(runtime_env.get("conda"), str):
commands += get_conda_activate_commands(runtime_env["conda"]) conda_env_name = runtime_env["conda"]
elif isinstance(runtime_env["conda"], dict): else:
assert conda_dict is not None
py_version = ".".join(map(str, py_version = ".".join(map(str,
sys.version_info[:3])) # like 3.6.10 sys.version_info[:3])) # like 3.6.10
conda_dict = inject_ray_and_python(runtime_env["conda"], conda_dict = inject_dependencies(conda_dict, py_version,
current_ray_pip_specifier(), [current_ray_pip_specifier()])
py_version)
# Locking to avoid multiple processes installing concurrently # Locking to avoid multiple processes installing concurrently
conda_hash = hashlib.sha1( conda_hash = hashlib.sha1(
json.dumps(runtime_env["conda"], json.dumps(conda_dict,
sort_keys=True).encode("utf-8")).hexdigest() sort_keys=True).encode("utf-8")).hexdigest()
conda_hash_str = f"conda-generated-{conda_hash}" conda_hash_str = f"conda-generated-{conda_hash}"
file_lock_name = f"ray-{conda_hash_str}.lock" file_lock_name = f"ray-{conda_hash_str}.lock"
@ -67,44 +68,6 @@ def setup(input_args):
yaml.dump(conda_dict, file, sort_keys=True) yaml.dump(conda_dict, file, sort_keys=True)
conda_env_name = get_or_create_conda_env( conda_env_name = get_or_create_conda_env(
conda_yaml_path, conda_dir) conda_yaml_path, conda_dir)
commands += get_conda_activate_commands(conda_env_name)
elif runtime_env.get("pip"):
# Install pip requirements into an empty conda env.
py_executable = "python"
requirements_txt = runtime_env["pip"]
pip_hash = hashlib.sha1(requirements_txt.encode("utf-8")).hexdigest()
pip_hash_str = f"pip-generated-{pip_hash}"
conda_dir = os.path.join(args.session_dir, "runtime_resources",
"conda")
requirements_txt_path = os.path.join(
conda_dir, f"requirements-{pip_hash_str}.txt")
py_version = ".".join(map(str, sys.version_info[:3])) # E.g. 3.6.13
conda_dict = {
"name": pip_hash_str,
"dependencies": ["pip", {
"pip": [f"-r {requirements_txt_path}"]
}]
}
conda_dict = inject_ray_and_python(conda_dict,
current_ray_pip_specifier(),
py_version)
file_lock_name = f"ray-{pip_hash_str}.lock"
with FileLock(os.path.join(args.session_dir, file_lock_name)):
try_to_create_directory(conda_dir)
conda_yaml_path = os.path.join(conda_dir,
f"env-{pip_hash_str}.yml")
with open(conda_yaml_path, "w") as file:
yaml.dump(conda_dict, file, sort_keys=True)
with open(requirements_txt_path, "w") as file:
file.write(requirements_txt)
conda_env_name = get_or_create_conda_env(conda_yaml_path,
conda_dir)
commands += get_conda_activate_commands(conda_env_name) commands += get_conda_activate_commands(conda_env_name)
@ -119,6 +82,44 @@ def setup(input_args):
os.execvp("bash", ["bash", "-c", command_str]) os.execvp("bash", ["bash", "-c", command_str])
def get_conda_dict(runtime_env, session_dir) -> Optional[Dict[Any, Any]]:
    """Construct a conda dependencies dict from a runtime env.

    This function does not inject Ray or Python into the conda dict.

    Args:
        runtime_env: Mapping queried with ``.get("conda")`` and
            ``.get("pip")``. A truthy "conda" entry takes precedence over
            "pip". The "pip" value is presumably the full text of a
            requirements.txt file (it is encoded and written verbatim) --
            confirm against the caller.
        session_dir: Directory under which the ``runtime_resources/conda``
            folder and the lock file are created.

    Returns:
        * The "conda" value itself (the same object, not a copy) when it is
          a dict.
        * None when "conda" is truthy but not a dict (e.g. the name of a
          preinstalled conda environment), or when neither "conda" nor
          "pip" is set.
        * Otherwise a new conda dict whose only pip dependency is
          ``-r <requirements file>``; the requirements file is written to
          disk as a side effect.
    """
    conda = runtime_env.get("conda")
    if conda:
        return conda if isinstance(conda, dict) else None

    requirements_txt = runtime_env.get("pip")
    if not requirements_txt:
        return None

    # The env name and file names are derived from the content hash, so
    # identical requirements map to the same paths.
    pip_hash = hashlib.sha1(requirements_txt.encode("utf-8")).hexdigest()
    pip_hash_str = f"pip-generated-{pip_hash}"
    conda_dir = os.path.join(session_dir, "runtime_resources", "conda")
    requirements_txt_path = os.path.join(
        conda_dir, f"requirements-{pip_hash_str}.txt")
    conda_dict = {
        "name": pip_hash_str,
        "dependencies": ["pip", {
            "pip": [f"-r {requirements_txt_path}"]
        }]
    }

    # Hold the lock while creating the directory and writing the file.
    file_lock_name = f"ray-{pip_hash_str}.lock"
    with FileLock(os.path.join(session_dir, file_lock_name)):
        try_to_create_directory(conda_dir)
        # Write with the same encoding used for hashing above; the platform
        # default may differ and could corrupt or reject non-ASCII content.
        with open(requirements_txt_path, "w", encoding="utf-8") as file:
            file.write(requirements_txt)
    return conda_dict
def current_ray_pip_specifier() -> Optional[str]: def current_ray_pip_specifier() -> Optional[str]:
"""The pip requirement specifier for the running version of Ray. """The pip requirement specifier for the running version of Ray.
@ -141,6 +142,12 @@ def current_ray_pip_specifier() -> Optional[str]:
Path(__file__).resolve().parents[3], ".whl", get_wheel_filename()) Path(__file__).resolve().parents[3], ".whl", get_wheel_filename())
elif ray.__commit__ == "{{RAY_COMMIT_SHA}}": elif ray.__commit__ == "{{RAY_COMMIT_SHA}}":
# Running on a version built from source locally. # Running on a version built from source locally.
logger.warning(
"Current Ray version could not be detected, most likely "
"because you are using a version of Ray "
"built from source. If you wish to use runtime_env, "
"you can try building a wheel and including the wheel "
"explicitly as a pip dependency.")
return None return None
elif "dev" in ray.__version__: elif "dev" in ray.__version__:
# Running on a nightly wheel. # Running on a nightly wheel.
@ -149,8 +156,27 @@ def current_ray_pip_specifier() -> Optional[str]:
return f"ray[all]=={ray.__version__}" return f"ray[all]=={ray.__version__}"
def inject_ray_and_python(conda_dict, ray_pip_specifier: Optional[str], def inject_dependencies(
py_version: str) -> None: conda_dict: Dict[Any, Any],
py_version: str,
pip_dependencies: Optional[List[str]] = None) -> Dict[Any, Any]:
"""Add Ray, Python and (optionally) extra pip dependencies to a conda dict.
Args:
conda_dict (dict): A dict representing the JSON-serialized conda
environment YAML file. This dict will be modified and returned.
py_version (str): A string representing a Python version to inject
into the conda dependencies, e.g. "3.7.7"
pip_dependencies (List[str]): A list of pip dependencies that
will be prepended to the list of pip dependencies in
the conda dict. If the conda dict does not already have a "pip"
field, one will be created.
Returns:
The modified dict. (Note: the input argument conda_dict is modified
and returned.)
"""
if pip_dependencies is None:
pip_dependencies = []
if conda_dict.get("dependencies") is None: if conda_dict.get("dependencies") is None:
conda_dict["dependencies"] = [] conda_dict["dependencies"] = []
@ -166,24 +192,15 @@ def inject_ray_and_python(conda_dict, ray_pip_specifier: Optional[str],
if "pip" not in deps: if "pip" not in deps:
deps.append("pip") deps.append("pip")
# Insert Ray dependency. If the user has already included Ray, conda # Insert pip dependencies.
# will raise an error only if the two are incompatible. found_pip_dict = False
for dep in deps:
if ray_pip_specifier is not None: if isinstance(dep, dict) and dep.get("pip") and isinstance(
found_pip_dict = False dep["pip"], list):
for dep in deps: dep["pip"] = pip_dependencies + dep["pip"]
if isinstance(dep, dict) and dep.get("pip"): found_pip_dict = True
dep["pip"].append(ray_pip_specifier) break
found_pip_dict = True if not found_pip_dict:
break deps.append({"pip": pip_dependencies})
if not found_pip_dict:
deps.append({"pip": [ray_pip_specifier]})
else:
logger.warning("Current Ray version could not be inserted "
"into conda's pip dependencies, most likely "
"because you are using a version of Ray "
"built from source. If so, you can try "
"building a wheel and including the wheel "
"as a dependency.")
return conda_dict return conda_dict