Revert "Revert "[runtime env] Allow working_dir and py_module to be Path type"" (#20853)

This commit is contained in:
architkulkarni 2021-12-08 11:17:19 -08:00 committed by GitHub
parent 442b1025cd
commit 5593819135
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
8 changed files with 195 additions and 154 deletions

View file

@ -2,6 +2,7 @@ import logging
import os import os
from types import ModuleType from types import ModuleType
from typing import Any, Dict, Optional from typing import Any, Dict, Optional
from pathlib import Path
from ray.experimental.internal_kv import _internal_kv_initialized from ray.experimental.internal_kv import _internal_kv_initialized
from ray._private.runtime_env.context import RuntimeEnvContext from ray._private.runtime_env.context import RuntimeEnvContext
@ -47,6 +48,8 @@ def upload_py_modules_if_needed(
if isinstance(module, str): if isinstance(module, str):
# module_path is a local path or a URI. # module_path is a local path or a URI.
module_path = module module_path = module
elif isinstance(module, Path):
module_path = str(module)
elif isinstance(module, ModuleType): elif isinstance(module, ModuleType):
# NOTE(edoakes): Python allows some installed Python packages to # NOTE(edoakes): Python allows some installed Python packages to
# be split into multiple directories. We could probably handle # be split into multiple directories. We could probably handle

View file

@ -1,6 +1,7 @@
import logging import logging
import os import os
from typing import Any, Dict, Optional from typing import Any, Dict, Optional
from pathlib import Path
from ray._private.runtime_env.utils import RuntimeEnv from ray._private.runtime_env.utils import RuntimeEnv
from ray.experimental.internal_kv import _internal_kv_initialized from ray.experimental.internal_kv import _internal_kv_initialized
@ -24,10 +25,13 @@ def upload_working_dir_if_needed(
if working_dir is None: if working_dir is None:
return runtime_env return runtime_env
if not isinstance(working_dir, str): if not isinstance(working_dir, str) and not isinstance(working_dir, Path):
raise TypeError( raise TypeError(
"working_dir must be a string (either a local path or remote " "working_dir must be a string or Path (either a local path "
f"URI), got {type(working_dir)}.") f"or remote URI), got {type(working_dir)}.")
if isinstance(working_dir, Path):
working_dir = str(working_dir)
# working_dir is already a URI -- just pass it through. # working_dir is already a URI -- just pass it through.
try: try:

View file

@ -8,7 +8,7 @@ import pytest
import requests import requests
from ray import serve from ray import serve
from ray.tests.test_runtime_env_working_dir import tmp_working_dir # noqa: F401, E501 from ray.tests.conftest import tmp_working_dir # noqa: F401, E501
@pytest.fixture @pytest.fixture

View file

@ -249,7 +249,8 @@ py_test_module_list(
files = [ files = [
"test_runtime_env.py", "test_runtime_env.py",
"test_runtime_env_working_dir.py", "test_runtime_env_working_dir.py",
"test_runtime_env_working_dir_2.py" "test_runtime_env_working_dir_2.py",
"test_runtime_env_working_dir_remote_uri.py"
], ],
size = "large", size = "large",
extra_srcs = SRCS, extra_srcs = SRCS,

View file

@ -4,9 +4,11 @@ This file defines the common pytest fixtures used in current directory.
import os import os
from contextlib import contextmanager from contextlib import contextmanager
import pytest import pytest
import tempfile
import subprocess import subprocess
import json import json
import time import time
from pathlib import Path
import ray import ray
from ray.cluster_utils import (Cluster, AutoscalingCluster, from ray.cluster_utils import (Cluster, AutoscalingCluster,
@ -271,6 +273,30 @@ def start_cluster(ray_start_cluster, request):
yield cluster, address yield cluster, address
@pytest.fixture(scope="function")
def tmp_working_dir():
with tempfile.TemporaryDirectory() as tmp_dir:
path = Path(tmp_dir)
hello_file = path / "hello"
with hello_file.open(mode="w") as f:
f.write("world")
module_path = path / "test_module"
module_path.mkdir(parents=True)
test_file = module_path / "test.py"
with test_file.open(mode="w") as f:
f.write("def one():\n")
f.write(" return 1\n")
init_file = module_path / "__init__.py"
with init_file.open(mode="w") as f:
f.write("from test_module.test import one\n")
yield tmp_dir
@pytest.fixture @pytest.fixture
def enable_pickle_debug(): def enable_pickle_debug():
os.environ["RAY_PICKLE_VERBOSE_DEBUG"] = "1" os.environ["RAY_PICKLE_VERBOSE_DEBUG"] = "1"

View file

@ -5,7 +5,6 @@ import sys
import tempfile import tempfile
import pytest import pytest
from pytest_lazyfixture import lazy_fixture
import ray import ray
@ -19,34 +18,12 @@ HTTPS_PACKAGE_URI = ("https://github.com/shrekris-anyscale/"
"test_module/archive/HEAD.zip") "test_module/archive/HEAD.zip")
S3_PACKAGE_URI = "s3://runtime-env-test/test_runtime_env.zip" S3_PACKAGE_URI = "s3://runtime-env-test/test_runtime_env.zip"
GS_PACKAGE_URI = "gs://public-runtime-env-test/test_module.zip" GS_PACKAGE_URI = "gs://public-runtime-env-test/test_module.zip"
REMOTE_URIS = [HTTPS_PACKAGE_URI, S3_PACKAGE_URI]
@pytest.fixture(scope="function") @pytest.mark.parametrize("option", [
def tmp_working_dir(): "failure", "working_dir", "py_modules", "py_modules_path",
with tempfile.TemporaryDirectory() as tmp_dir: "working_dir_path"
path = Path(tmp_dir) ])
hello_file = path / "hello"
with hello_file.open(mode="w") as f:
f.write("world")
module_path = path / "test_module"
module_path.mkdir(parents=True)
test_file = module_path / "test.py"
with test_file.open(mode="w") as f:
f.write("def one():\n")
f.write(" return 1\n")
init_file = module_path / "__init__.py"
with init_file.open(mode="w") as f:
f.write("from test_module.test import one\n")
yield tmp_dir
@pytest.mark.parametrize("option", ["failure", "working_dir", "py_modules"])
@pytest.mark.skipif(sys.platform == "win32", reason="Fail to create temp dir.") @pytest.mark.skipif(sys.platform == "win32", reason="Fail to create temp dir.")
def test_lazy_reads(start_cluster, tmp_working_dir, option: str): def test_lazy_reads(start_cluster, tmp_working_dir, option: str):
"""Tests the case where we lazily read files or import inside a task/actor. """Tests the case where we lazily read files or import inside a task/actor.
@ -62,12 +39,21 @@ def test_lazy_reads(start_cluster, tmp_working_dir, option: str):
ray.init(address) ray.init(address)
elif option == "working_dir": elif option == "working_dir":
ray.init(address, runtime_env={"working_dir": tmp_working_dir}) ray.init(address, runtime_env={"working_dir": tmp_working_dir})
elif option == "working_dir_path":
ray.init(
address, runtime_env={"working_dir": Path(tmp_working_dir)})
elif option == "py_modules": elif option == "py_modules":
ray.init( ray.init(
address, address,
runtime_env={ runtime_env={
"py_modules": [str(Path(tmp_working_dir) / "test_module")] "py_modules": [str(Path(tmp_working_dir) / "test_module")]
}) })
elif option == "py_modules_path":
ray.init(
address,
runtime_env={
"py_modules": [Path(tmp_working_dir) / "test_module"]
})
call_ray_init() call_ray_init()
@ -253,93 +239,6 @@ def test_input_validation(start_cluster, option: str):
ray.init(address, runtime_env={"py_modules": "."}) ray.init(address, runtime_env={"py_modules": "."})
@pytest.mark.skipif(sys.platform == "win32", reason="Fail to create temp dir.")
@pytest.mark.parametrize("remote_uri", REMOTE_URIS)
@pytest.mark.parametrize("option", ["failure", "working_dir", "py_modules"])
@pytest.mark.parametrize("per_task_actor", [True, False])
def test_remote_package_uri(start_cluster, remote_uri, option, per_task_actor):
"""Tests the case where we lazily read files or import inside a task/actor.
In this case, the files come from a remote location.
This tests both that this fails *without* the working_dir and that it
passes with it.
"""
cluster, address = start_cluster
if option == "working_dir":
env = {"working_dir": remote_uri}
elif option == "py_modules":
env = {"py_modules": [remote_uri]}
if option == "failure" or per_task_actor:
ray.init(address)
else:
ray.init(address, runtime_env=env)
@ray.remote
def test_import():
import test_module
return test_module.one()
if option != "failure" and per_task_actor:
test_import = test_import.options(runtime_env=env)
if option == "failure":
with pytest.raises(ImportError):
ray.get(test_import.remote())
else:
assert ray.get(test_import.remote()) == 2
@ray.remote
class Actor:
def test_import(self):
import test_module
return test_module.one()
if option != "failure" and per_task_actor:
Actor = Actor.options(runtime_env=env)
a = Actor.remote()
if option == "failure":
with pytest.raises(ImportError):
assert ray.get(a.test_import.remote()) == 2
else:
assert ray.get(a.test_import.remote()) == 2
@pytest.mark.skipif(sys.platform == "win32", reason="Fail to create temp dir.")
@pytest.mark.parametrize("option", ["working_dir", "py_modules"])
@pytest.mark.parametrize(
"source", [*REMOTE_URIS, lazy_fixture("tmp_working_dir")])
def test_multi_node(start_cluster, option: str, source: str):
"""Tests that the working_dir is propagated across multi-node clusters."""
NUM_NODES = 3
cluster, address = start_cluster
for i in range(NUM_NODES - 1): # Head node already added.
cluster.add_node(
num_cpus=1, runtime_env_dir_name=f"node_{i}_runtime_resources")
if option == "working_dir":
ray.init(address, runtime_env={"working_dir": source})
elif option == "py_modules":
if source not in REMOTE_URIS:
source = str(Path(source) / "test_module")
ray.init(address, runtime_env={"py_modules": [source]})
@ray.remote(num_cpus=1)
class A:
def check_and_get_node_id(self):
import test_module
test_module.one()
return ray.get_runtime_context().node_id
num_cpus = int(ray.available_resources()["CPU"])
actors = [A.remote() for _ in range(num_cpus)]
object_refs = [a.check_and_get_node_id.remote() for a in actors]
assert len(set(ray.get(object_refs))) == NUM_NODES
@pytest.mark.parametrize("option", ["working_dir", "py_modules"]) @pytest.mark.parametrize("option", ["working_dir", "py_modules"])
@pytest.mark.skipif(sys.platform == "win32", reason="Fail to create temp dir.") @pytest.mark.skipif(sys.platform == "win32", reason="Fail to create temp dir.")
def test_exclusion(start_cluster, tmp_working_dir, option): def test_exclusion(start_cluster, tmp_working_dir, option):
@ -492,39 +391,6 @@ cache/
] ]
@pytest.mark.skipif(sys.platform == "win32", reason="Fail to create temp dir.")
@pytest.mark.parametrize(
"working_dir",
[*REMOTE_URIS, lazy_fixture("tmp_working_dir")])
def test_runtime_context(start_cluster, working_dir):
"""Tests that the working_dir is propagated in the runtime_context."""
cluster, address = start_cluster
ray.init(runtime_env={"working_dir": working_dir})
def check():
wd = ray.get_runtime_context().runtime_env["working_dir"]
if working_dir in REMOTE_URIS:
assert wd == working_dir
else:
assert wd.startswith("gcs://_ray_pkg_")
check()
@ray.remote
def task():
check()
ray.get(task.remote())
@ray.remote
class Actor:
def check(self):
check()
a = Actor.remote()
ray.get(a.check.remote())
def test_override_failure(shutdown_only): def test_override_failure(shutdown_only):
"""Tests invalid override behaviors.""" """Tests invalid override behaviors."""
ray.init() ray.init()

View file

@ -9,8 +9,6 @@ from ray._private.test_utils import run_string_as_driver
import ray import ray
import ray.experimental.internal_kv as kv import ray.experimental.internal_kv as kv
from ray.tests.test_runtime_env_working_dir \
import tmp_working_dir # noqa: F401
from ray._private.test_utils import wait_for_condition, chdir from ray._private.test_utils import wait_for_condition, chdir
from ray._private.runtime_env import RAY_WORKER_DEV_EXCLUDES from ray._private.runtime_env import RAY_WORKER_DEV_EXCLUDES
from ray._private.runtime_env.packaging import GCS_STORAGE_MAX_SIZE from ray._private.runtime_env.packaging import GCS_STORAGE_MAX_SIZE

View file

@ -0,0 +1,143 @@
from pathlib import Path
import sys
import pytest
from pytest_lazyfixture import lazy_fixture
import ray
# This test requires you have AWS credentials set up (any AWS credentials will
# do, this test only accesses a public bucket).
# This package contains a subdirectory called `test_module`.
# Calling `test_module.one()` should return `2`.
# If you find that confusing, take it up with @jiaodong...
HTTPS_PACKAGE_URI = ("https://github.com/shrekris-anyscale/"
"test_module/archive/HEAD.zip")
S3_PACKAGE_URI = "s3://runtime-env-test/test_runtime_env.zip"
GS_PACKAGE_URI = "gs://public-runtime-env-test/test_module.zip"
REMOTE_URIS = [HTTPS_PACKAGE_URI, S3_PACKAGE_URI]
@pytest.mark.skipif(sys.platform == "win32", reason="Fail to create temp dir.")
@pytest.mark.parametrize("remote_uri", REMOTE_URIS)
@pytest.mark.parametrize("option", ["failure", "working_dir", "py_modules"])
@pytest.mark.parametrize("per_task_actor", [True, False])
def test_remote_package_uri(start_cluster, remote_uri, option, per_task_actor):
"""Tests the case where we lazily read files or import inside a task/actor.
In this case, the files come from a remote location.
This tests both that this fails *without* the working_dir and that it
passes with it.
"""
cluster, address = start_cluster
if option == "working_dir":
env = {"working_dir": remote_uri}
elif option == "py_modules":
env = {"py_modules": [remote_uri]}
if option == "failure" or per_task_actor:
ray.init(address)
else:
ray.init(address, runtime_env=env)
@ray.remote
def test_import():
import test_module
return test_module.one()
if option != "failure" and per_task_actor:
test_import = test_import.options(runtime_env=env)
if option == "failure":
with pytest.raises(ImportError):
ray.get(test_import.remote())
else:
assert ray.get(test_import.remote()) == 2
@ray.remote
class Actor:
def test_import(self):
import test_module
return test_module.one()
if option != "failure" and per_task_actor:
Actor = Actor.options(runtime_env=env)
a = Actor.remote()
if option == "failure":
with pytest.raises(ImportError):
assert ray.get(a.test_import.remote()) == 2
else:
assert ray.get(a.test_import.remote()) == 2
@pytest.mark.skipif(sys.platform == "win32", reason="Fail to create temp dir.")
@pytest.mark.parametrize("option", ["working_dir", "py_modules"])
@pytest.mark.parametrize(
"source", [*REMOTE_URIS, lazy_fixture("tmp_working_dir")])
def test_multi_node(start_cluster, option: str, source: str):
"""Tests that the working_dir is propagated across multi-node clusters."""
NUM_NODES = 3
cluster, address = start_cluster
for i in range(NUM_NODES - 1): # Head node already added.
cluster.add_node(
num_cpus=1, runtime_env_dir_name=f"node_{i}_runtime_resources")
if option == "working_dir":
ray.init(address, runtime_env={"working_dir": source})
elif option == "py_modules":
if source not in REMOTE_URIS:
source = str(Path(source) / "test_module")
ray.init(address, runtime_env={"py_modules": [source]})
@ray.remote(num_cpus=1)
class A:
def check_and_get_node_id(self):
import test_module
test_module.one()
return ray.get_runtime_context().node_id
num_cpus = int(ray.available_resources()["CPU"])
actors = [A.remote() for _ in range(num_cpus)]
object_refs = [a.check_and_get_node_id.remote() for a in actors]
assert len(set(ray.get(object_refs))) == NUM_NODES
@pytest.mark.skipif(sys.platform == "win32", reason="Fail to create temp dir.")
@pytest.mark.parametrize(
"working_dir",
[*REMOTE_URIS, lazy_fixture("tmp_working_dir")])
def test_runtime_context(start_cluster, working_dir):
"""Tests that the working_dir is propagated in the runtime_context."""
cluster, address = start_cluster
ray.init(runtime_env={"working_dir": working_dir})
def check():
wd = ray.get_runtime_context().runtime_env["working_dir"]
if working_dir in REMOTE_URIS:
assert wd == working_dir
else:
assert wd.startswith("gcs://_ray_pkg_")
check()
@ray.remote
def task():
check()
ray.get(task.remote())
@ray.remote
class Actor:
def check(self):
check()
a = Actor.remote()
ray.get(a.check.remote())
if __name__ == "__main__":
sys.exit(pytest.main(["-sv", __file__]))