diff --git a/python/ray/_private/runtime_env/working_dir.py b/python/ray/_private/runtime_env/working_dir.py index 08b8375a3..e38f2cd9d 100644 --- a/python/ray/_private/runtime_env/working_dir.py +++ b/python/ray/_private/runtime_env/working_dir.py @@ -6,7 +6,7 @@ from ray.experimental.internal_kv import _internal_kv_initialized from ray._private.runtime_env.context import RuntimeEnvContext from ray._private.runtime_env.packaging import ( download_and_unpack_package, delete_package, get_uri_for_directory, - parse_uri, upload_package_if_needed) + parse_uri, Protocol, upload_package_if_needed) default_logger = logging.getLogger(__name__) @@ -28,10 +28,14 @@ def upload_working_dir_if_needed(runtime_env: Dict[str, Any], # working_dir is already a URI -- just pass it through. try: - parse_uri(working_dir) - return runtime_env + protocol, path = parse_uri(working_dir) except ValueError: - pass + protocol, path = None, None + + if protocol is not None: + if protocol == Protocol.S3 and not path.endswith(".zip"): + raise ValueError("Only .zip files supported for S3 URIs.") + return runtime_env # Remove excludes, it isn't relevant after the upload step. excludes = runtime_env.pop("excludes", None) diff --git a/python/ray/tests/BUILD b/python/ray/tests/BUILD index c6b690a71..9430bffd7 100644 --- a/python/ray/tests/BUILD +++ b/python/ray/tests/BUILD @@ -252,7 +252,8 @@ py_test_module_list( py_test_module_list( files = [ - "test_runtime_env.py" + "test_runtime_env.py", + "test_runtime_env_working_dir.py" ], size = "large", extra_srcs = SRCS, diff --git a/python/ray/tests/test_runtime_env.py b/python/ray/tests/test_runtime_env.py index 3389f84f7..d040655b0 100644 --- a/python/ray/tests/test_runtime_env.py +++ b/python/ray/tests/test_runtime_env.py @@ -1,658 +1,16 @@ import os import pytest import sys -import tempfile import time import requests from pathlib import Path -from pytest_lazyfixture import lazy_fixture import ray from ray.exceptions import RuntimeEnvSetupError -import ray.experimental.internal_kv as kv -from ray._private.test_utils import (run_string_as_driver, wait_for_condition) -from ray._private.runtime_env.packaging import GCS_STORAGE_MAX_SIZE, parse_uri +from ray._private.test_utils import wait_for_condition from ray._private.utils import (get_wheel_filename, get_master_wheel_url, get_release_wheel_url) -S3_PACKAGE_URI = "s3://runtime-env-test/remote_runtime_env.zip" - -driver_script = """ -import logging -import os -import sys -import time -import traceback - -import ray -import ray.util - -# Define test_module for py_module tests -try: - import test_module -except: - pass - -try: - job_config = ray.job_config.JobConfig( - runtime_env={runtime_env} - ) - - if not job_config.runtime_env: - job_config=None - - - if os.environ.get("USE_RAY_CLIENT"): - ray.client("{address}").env({runtime_env}).namespace("default_test_namespace").connect() - else: - ray.init(address="{address}", - job_config=job_config, - logging_level=logging.DEBUG, - namespace="default_test_namespace" -) -except ValueError: - print("ValueError:", traceback.format_exc()) - sys.exit(0) -except TypeError: - print("TypeError:", traceback.format_exc()) - sys.exit(0) -except Exception: - print("ERROR:", traceback.format_exc()) - sys.exit(0) - - -if os.environ.get("EXIT_AFTER_INIT"): - sys.exit(0) - -# Schedule a dummy task to kick off runtime env agent's working_dir setup() -@ray.remote -def dummy_task(): - return "dummy task scheduled" - -ray.get([dummy_task.remote()]) - -# Insert working_dir path with unzipped files -sys.path.insert(0, "{working_dir}") - -# Actual import of test_module after working_dir is setup -import test_module - -@ray.remote -def run_test(): - return test_module.one() - -@ray.remote -def check_file(name): - try: - with open(name) as f: - return f.read() - except: - return "FAILED" - -@ray.remote -class TestActor(object): - @ray.method(num_returns=1) - def one(self): - return test_module.one() - -{execute_statement} - -if os.environ.get("USE_RAY_CLIENT"): - ray.util.disconnect() -else: - ray.shutdown() -""" - - -def create_file(p): - if not p.parent.exists(): - p.parent.mkdir() - with p.open("w") as f: - f.write("Test") - - -@pytest.fixture(scope="function") -def working_dir(): - """Regular local_working_dir test setup for existing tests""" - with tempfile.TemporaryDirectory() as tmp_dir: - path = Path(tmp_dir) - module_path = path / "test_module" - module_path.mkdir(parents=True) - - init_file = module_path / "__init__.py" - test_file = module_path / "test.py" - with test_file.open(mode="w") as f: - f.write(""" -def one(): - return 1 -""") - with init_file.open(mode="w") as f: - f.write(""" -from test_module.test import one -""") - - old_dir = os.getcwd() - os.chdir(tmp_dir) - yield tmp_dir - os.chdir(old_dir) - - -@pytest.fixture(scope="function") -def local_working_dir(): - """Parametrized local_working_dir test setup""" - with tempfile.TemporaryDirectory() as tmp_dir: - path = Path(tmp_dir) - module_path = path / "test_module" - module_path.mkdir(parents=True) - - # There are "test.py" file with same module name and function - # signature, but different return value. Regular runtime env - # working_dir uses existing file and should return 1 on each - # call to one(); While s3 remote runtime env with same file - # names will return 2. - - init_file = module_path / "__init__.py" - test_file = module_path / "test.py" - with test_file.open(mode="w") as f: - f.write(""" -def one(): - return 1 -""") - with init_file.open(mode="w") as f: - f.write(""" -from test_module.test import one -""") - - old_dir = os.getcwd() - os.chdir(tmp_dir) - runtime_env = f"""{{ "working_dir": "{tmp_dir}" }}""" - # local working_dir's one() return 1 for each call - yield tmp_dir, runtime_env, "1000" - os.chdir(old_dir) - - -@pytest.fixture(scope="function") -def s3_working_dir(): - """Parametrized s3_working_dir test setup""" - with tempfile.TemporaryDirectory() as tmp_dir: - old_dir = os.getcwd() - os.chdir(tmp_dir) - - # There are "test.py" file with same module name and function - # signature, but different return value. Regular runtime env - # working_dir uses existing file and should return 1 on each - # call to one(); While s3 remote runtime env with same file - # names will return 2. - - runtime_env = f"""{{ "working_dir": "{S3_PACKAGE_URI}" }}""" - _, pkg_name = parse_uri(S3_PACKAGE_URI) - runtime_env_dir = ray.worker._global_node.get_runtime_env_dir_path() - working_dir = Path(os.path.join(runtime_env_dir, - pkg_name)).with_suffix("") - # s3 working_dir's one() return 2 for each call - yield working_dir, runtime_env, "2000" - os.chdir(old_dir) - - -@pytest.fixture( - scope="function", - params=[lazy_fixture("local_working_dir"), - lazy_fixture("s3_working_dir")]) -def working_dir_parametrized(request): - return request.param - - -def start_client_server(cluster, client_mode): - env = {} - if client_mode: - ray.worker._global_node._ray_params.ray_client_server_port = "10003" - ray.worker._global_node.start_ray_client_server() - address = "localhost:10003" - env["USE_RAY_CLIENT"] = "1" - else: - address = cluster.address - - runtime_env_dir = ray.worker._global_node.get_runtime_env_dir_path() - - return address, env, runtime_env_dir - - -""" -The following test cases are related with runtime env. It following these steps - 1) Creating a temporary dir with fixture working_dir - 2) Using a template named driver_script defined globally - 3) Overwrite runtime_env and execute_statement in the template - 4) Execute it as a separate driver and return the result -""" - - -@pytest.mark.skipif(sys.platform == "win32", reason="Fail to create temp dir.") -@pytest.mark.parametrize("client_mode", [True, False]) -def test_empty_working_dir(ray_start_cluster_head, client_mode): - cluster = ray_start_cluster_head - address, env, runtime_env_dir = start_client_server(cluster, client_mode) - env["EXIT_AFTER_INIT"] = "1" - with tempfile.TemporaryDirectory() as working_dir: - runtime_env = f"""{{ - "working_dir": r"{working_dir}" -}}""" - # Execute the following cmd in driver with runtime_env - execute_statement = "sys.exit(0)" - script = driver_script.format(**locals()) - out = run_string_as_driver(script, env) - assert not out.startswith("ERROR:") - - -@pytest.mark.skip("py_modules not supported yet.") -@pytest.mark.skipif(sys.platform == "win32", reason="Fail to create temp dir.") -@pytest.mark.parametrize("client_mode", [True, False]) -def test_empty_py_modules(ray_start_cluster_head, client_mode): - cluster = ray_start_cluster_head - address, env, runtime_env_dir = start_client_server(cluster, client_mode) - env["EXIT_AFTER_INIT"] = "1" - with tempfile.TemporaryDirectory() as working_dir: - runtime_env = f"""{{ - "py_modules": [r"{working_dir}"] -}}""" - # Execute the following cmd in driver with runtime_env - execute_statement = "sys.exit(0)" - script = driver_script.format(**locals()) - out = run_string_as_driver(script, env) - assert not out.startswith("ERROR:") - - -@pytest.mark.skipif(sys.platform == "win32", reason="Fail to create temp dir.") -@pytest.mark.parametrize("client_mode", [True, False]) -def test_invalid_working_dir(ray_start_cluster_head, working_dir, client_mode): - cluster = ray_start_cluster_head - address, env, runtime_env_dir = start_client_server(cluster, client_mode) - env["EXIT_AFTER_INIT"] = "1" - - runtime_env = "{ 'working_dir': 10 }" - # Execute the following cmd in driver with runtime_env - execute_statement = "" - script = driver_script.format(**locals()) - out = run_string_as_driver(script, env) - assert out.strip().splitlines()[-1].startswith("TypeError"), out - - runtime_env = f"{{ 'working_dir': os.path.join(r'{working_dir}', 'na') }}" - # Execute the following cmd in driver with runtime_env - execute_statement = "" - script = driver_script.format(**locals()) - out = run_string_as_driver(script, env) - assert out.strip().splitlines()[-1].startswith("ValueError"), out - - runtime_env = "{ 'working_dir': 's3://bucket/package' }" - # Execute the following cmd in driver with runtime_env - execute_statement = "" - script = driver_script.format(**locals()) - out = run_string_as_driver(script, env) - assert out.strip().splitlines()[-1].startswith("ValueError"), out - - -@pytest.mark.skip("py_modules not supported yet.") -@pytest.mark.skipif(sys.platform == "win32", reason="Fail to create temp dir.") -@pytest.mark.parametrize("client_mode", [True, False]) -def test_invalid_py_modules(ray_start_cluster_head, working_dir, client_mode): - cluster = ray_start_cluster_head - address, env, runtime_env_dir = start_client_server(cluster, client_mode) - env["EXIT_AFTER_INIT"] = "1" - - runtime_env = "{ 'py_modules': [10] }" - # Execute the following cmd in driver with runtime_env - execute_statement = "" - script = driver_script.format(**locals()) - out = run_string_as_driver(script, env) - assert out.strip().splitlines()[-1].startswith("TypeError"), out - - runtime_env = f"{{ 'py_modules': [os.path.join(r'{working_dir}', 'na')] }}" - # Execute the following cmd in driver with runtime_env - execute_statement = "" - script = driver_script.format(**locals()) - out = run_string_as_driver(script, env) - assert out.strip().splitlines()[-1].startswith("ValueError"), out - - runtime_env = "{ 'py_modules': ['s3://bucket/package'] }" - # Execute the following cmd in driver with runtime_env - execute_statement = "" - script = driver_script.format(**locals()) - out = run_string_as_driver(script, env) - assert out.strip().splitlines()[-1].startswith("ValueError"), out - - -@pytest.mark.skipif(sys.platform == "win32", reason="Fail to create temp dir.") -@pytest.mark.parametrize("client_mode", [True, False]) -def test_single_node(ray_start_cluster_head, working_dir_parametrized, - client_mode): - cluster = ray_start_cluster_head - address, env, runtime_env_dir = start_client_server(cluster, client_mode) - - # Unpack lazy fixture tuple to override "working_dir" to fill up - # execute_statement locals() - print(working_dir_parametrized) - working_dir, runtime_env, expected = working_dir_parametrized - print(working_dir, runtime_env, expected) - - # Execute the following cmd in driver with runtime_env - execute_statement = "print(sum(ray.get([run_test.remote()] * 1000)))" - script = driver_script.format(**locals()) - - with tempfile.TemporaryDirectory() as tmp_dir: - # Execute driver script in brand new, empty directory - os.chdir(tmp_dir) - out = run_string_as_driver(script, env) - assert out.strip().split()[-1] == expected, out - assert len(list(Path(runtime_env_dir).iterdir())) == 1 - assert len(kv._internal_kv_list("gcs://")) == 0 - # working_dir fixture will take care of going back to original test - # folder - - -@pytest.mark.skipif(sys.platform == "win32", reason="Fail to create temp dir.") -@pytest.mark.parametrize("client_mode", [True, False]) -def test_two_node(two_node_cluster, working_dir_parametrized, client_mode): - cluster, _ = two_node_cluster - address, env, runtime_env_dir = start_client_server(cluster, client_mode) - # Unpack lazy fixture tuple to override "working_dir" to fill up - # execute_statement locals() - working_dir, runtime_env, expected = working_dir_parametrized - # Execute the following cmd in driver with runtime_env - execute_statement = "print(sum(ray.get([run_test.remote()] * 1000)))" - script = driver_script.format(**locals()) - with tempfile.TemporaryDirectory() as tmp_dir: - # Execute driver script in brand new, empty directory - os.chdir(tmp_dir) - out = run_string_as_driver(script, env) - assert out.strip().split()[-1] == expected, out - assert len(list(Path(runtime_env_dir).iterdir())) == 1 - assert len(kv._internal_kv_list("gcs://")) == 0 - # working_dir fixture will take care of going back to original test - # folder - - -@pytest.mark.skip("py_modules not supported yet.") -@pytest.mark.skipif(sys.platform == "win32", reason="Fail to create temp dir.") -@pytest.mark.parametrize("client_mode", [True, False]) -def test_two_node_module(two_node_cluster, working_dir, client_mode): - cluster, _ = two_node_cluster - address, env, runtime_env_dir = start_client_server(cluster, client_mode) - # test runtime_env iwth py_modules - runtime_env = """{ "py_modules": [test_module.__path__[0]] }""" - # Execute the following cmd in driver with runtime_env - execute_statement = "print(sum(ray.get([run_test.remote()] * 1000)))" - script = driver_script.format(**locals()) - out = run_string_as_driver(script, env) - assert out.strip().split()[-1] == "1000", out - assert len(list(Path(runtime_env_dir).iterdir())) == 1 - - -@pytest.mark.skipif(sys.platform == "win32", reason="Fail to create temp dir.") -@pytest.mark.parametrize("client_mode", [True, False]) -def test_two_node_local_file(two_node_cluster, working_dir, client_mode): - with open(os.path.join(working_dir, "test_file"), "w") as f: - f.write("1") - cluster, _ = two_node_cluster - address, env, runtime_env_dir = start_client_server(cluster, client_mode) - # test runtime_env iwth working_dir - runtime_env = f"""{{ "working_dir": "{working_dir}" }}""" - # Execute the following cmd in driver with runtime_env - execute_statement = """ -vals = ray.get([check_file.remote('test_file')] * 1000) -print(sum([int(v) for v in vals])) -""" - script = driver_script.format(**locals()) - out = run_string_as_driver(script, env) - assert out.strip().split()[-1] == "1000", out - assert len(list(Path(runtime_env_dir).iterdir())) == 1 - assert len(kv._internal_kv_list("gcs://")) == 0 - - -@pytest.mark.skipif(sys.platform == "win32", reason="Fail to create temp dir.") -@pytest.mark.parametrize("client_mode", [True, False]) -def test_exclusion(ray_start_cluster_head, working_dir, client_mode): - cluster = ray_start_cluster_head - address, env, runtime_env_dir = start_client_server(cluster, client_mode) - working_path = Path(working_dir) - - create_file(working_path / "tmp_dir" / "test_1") - create_file(working_path / "tmp_dir" / "test_2") - create_file(working_path / "tmp_dir" / "test_3") - create_file(working_path / "tmp_dir" / "sub_dir" / "test_1") - create_file(working_path / "tmp_dir" / "sub_dir" / "test_2") - create_file(working_path / "test1") - create_file(working_path / "test2") - create_file(working_path / "test3") - tmp_dir_test_3 = str((working_path / "tmp_dir" / "test_3").absolute()) - runtime_env = f"""{{ - "working_dir": r"{working_dir}", - }}""" - execute_statement = """ - vals = ray.get([ - check_file.remote('test1'), - check_file.remote('test2'), - check_file.remote('test3'), - check_file.remote(os.path.join('tmp_dir', 'test_1')), - check_file.remote(os.path.join('tmp_dir', 'test_2')), - check_file.remote(os.path.join('tmp_dir', 'test_3')), - check_file.remote(os.path.join('tmp_dir', 'sub_dir', 'test_1')), - check_file.remote(os.path.join('tmp_dir', 'sub_dir', 'test_2')), - ]) - print(','.join(vals)) -""" - script = driver_script.format(**locals()) - out = run_string_as_driver(script, env) - # Test it works before - assert out.strip().split("\n")[-1] == \ - "Test,Test,Test,Test,Test,Test,Test,Test", out - runtime_env = f"""{{ - "working_dir": r"{working_dir}", - "excludes": [ - # exclude by relative path - r"test2", - # exclude by dir - r"{str(Path("tmp_dir") / "sub_dir")}", - # exclude part of the dir - r"{str(Path("tmp_dir") / "test_1")}", - # exclude part of the dir - r"{str(Path("tmp_dir") / "test_2")}", - ] - }}""" - script = driver_script.format(**locals()) - out = run_string_as_driver(script, env) - assert out.strip().split("\n")[-1] == \ - "Test,FAILED,Test,FAILED,FAILED,Test,FAILED,FAILED", out - # Test excluding all files using gitignore pattern matching syntax - runtime_env = f"""{{ - "working_dir": r"{working_dir}", - "excludes": ["*"] - }}""" - script = driver_script.format(**locals()) - out = run_string_as_driver(script, env) - assert out.strip().split("\n")[-1] == \ - "FAILED,FAILED,FAILED,FAILED,FAILED,FAILED,FAILED,FAILED", out - - -@pytest.mark.skipif(sys.platform == "win32", reason="Fail to create temp dir.") -@pytest.mark.parametrize("client_mode", [True, False]) -def test_exclusion_2(ray_start_cluster_head, working_dir, client_mode): - cluster = ray_start_cluster_head - address, env, runtime_env_dir = start_client_server(cluster, client_mode) - working_path = Path(working_dir) - - def create_file(p): - if not p.parent.exists(): - p.parent.mkdir(parents=True) - with p.open("w") as f: - f.write("Test") - - create_file(working_path / "tmp_dir" / "test_1") - create_file(working_path / "tmp_dir" / "test_2") - create_file(working_path / "tmp_dir" / "test_3") - create_file(working_path / "tmp_dir" / "sub_dir" / "test_1") - create_file(working_path / "tmp_dir" / "sub_dir" / "test_2") - create_file(working_path / "test1") - create_file(working_path / "test2") - create_file(working_path / "test3") - create_file(working_path / "cache" / "test_1") - create_file(working_path / "tmp_dir" / "cache" / "test_1") - create_file(working_path / "another_dir" / "cache" / "test_1") - tmp_dir_test_3 = str((working_path / "tmp_dir" / "test_3").absolute()) - runtime_env = f"""{{ - "working_dir": r"{working_dir}", - }}""" - execute_statement = """ - vals = ray.get([ - check_file.remote('test1'), - check_file.remote('test2'), - check_file.remote('test3'), - check_file.remote(os.path.join('tmp_dir', 'test_1')), - check_file.remote(os.path.join('tmp_dir', 'test_2')), - check_file.remote(os.path.join('tmp_dir', 'test_3')), - check_file.remote(os.path.join('tmp_dir', 'sub_dir', 'test_1')), - check_file.remote(os.path.join('tmp_dir', 'sub_dir', 'test_2')), - check_file.remote(os.path.join("cache", "test_1")), - check_file.remote(os.path.join("tmp_dir", "cache", "test_1")), - check_file.remote(os.path.join("another_dir", "cache", "test_1")), - ]) - print(','.join(vals)) -""" - script = driver_script.format(**locals()) - out = run_string_as_driver(script, env) - # Test it works before - assert out.strip().split("\n")[-1] == \ - "Test,Test,Test,Test,Test,Test,Test,Test,Test,Test,Test", out - with open(f"{working_dir}/.gitignore", "w") as f: - f.write(""" -# Comment -test_[12] -/test1 -!/tmp_dir/sub_dir/test_1 -cache/ -""") - script = driver_script.format(**locals()) - out = run_string_as_driver(script, env) - t = out.strip().split("\n")[-1] - assert out.strip().split("\n")[-1] == \ - "FAILED,Test,Test,FAILED,FAILED,Test,Test,FAILED,FAILED,FAILED,FAILED" - - -@pytest.mark.skipif(sys.platform == "win32", reason="Fail to create temp dir.") -@pytest.mark.parametrize("client_mode", [True, False]) -def test_runtime_env_getter(ray_start_cluster_head, working_dir, client_mode): - cluster = ray_start_cluster_head - address, env, runtime_env_dir = start_client_server(cluster, client_mode) - runtime_env = f"""{{ "working_dir": "{working_dir}" }}""" - # Execute the following cmd in driver with runtime_env - execute_statement = """ -print(ray.get_runtime_context().runtime_env["working_dir"]) -""" - script = driver_script.format(**locals()) - out = run_string_as_driver(script, env) - working_dir_uri = out.strip().split()[-1] - assert working_dir_uri.startswith("gcs://_ray_pkg_") - assert working_dir_uri.endswith(".zip") - - -@pytest.mark.skipif(sys.platform == "win32", reason="Fail to create temp dir.") -@pytest.mark.parametrize("client_mode", [True, False]) -def test_regular_actors(ray_start_cluster_head, working_dir, client_mode): - cluster = ray_start_cluster_head - address, env, runtime_env_dir = start_client_server(cluster, client_mode) - runtime_env = f"""{{ "working_dir": "{working_dir}" }}""" - # Execute the following cmd in driver with runtime_env - execute_statement = """ -test_actor = TestActor.options(name="test_actor").remote() -print(sum(ray.get([test_actor.one.remote()] * 1000))) -""" - script = driver_script.format(**locals()) - out = run_string_as_driver(script, env) - assert out.strip().split()[-1] == "1000", out - assert len(list(Path(runtime_env_dir).iterdir())) == 1 - assert len(kv._internal_kv_list("gcs://")) == 0 - - -@pytest.mark.skipif(sys.platform == "win32", reason="Fail to create temp dir.") -@pytest.mark.parametrize("client_mode", [True, False]) -def test_detached_actors(ray_start_cluster_head, working_dir, client_mode): - cluster = ray_start_cluster_head - address, env, runtime_env_dir = start_client_server(cluster, client_mode) - runtime_env = f"""{{ "working_dir": "{working_dir}" }}""" - # Execute the following cmd in driver with runtime_env - execute_statement = """ -test_actor = TestActor.options(name="test_actor", lifetime="detached").remote() -print(sum(ray.get([test_actor.one.remote()] * 1000))) -""" - script = driver_script.format(**locals()) - out = run_string_as_driver(script, env) - assert out.strip().split()[-1] == "1000", out - # It's a detached actors, so it should still be there - assert len(kv._internal_kv_list("gcs://")) == 1 - assert len(list(Path(runtime_env_dir).iterdir())) == 2 - pkg_dir = [f for f in Path(runtime_env_dir).glob("*") if f.is_dir()][0] - sys.path.insert(0, str(pkg_dir)) - test_actor = ray.get_actor("test_actor") - assert sum(ray.get([test_actor.one.remote()] * 1000)) == 1000 - ray.kill(test_actor) - time.sleep(5) - assert len(list(Path(runtime_env_dir).iterdir())) == 1 - assert len(kv._internal_kv_list("gcs://")) == 0 - - -@pytest.mark.skipif(sys.platform == "win32", reason="Fail to create temp dir.") -def test_util_without_job_config(shutdown_only): - from ray.cluster_utils import Cluster - - with tempfile.TemporaryDirectory() as tmp_dir: - with (Path(tmp_dir) / "lib.py").open("w") as f: - f.write(""" -def one(): - return 1 - """) - old_dir = os.getcwd() - os.chdir(tmp_dir) - cluster = Cluster() - cluster.add_node(num_cpus=1) - ray.init(address=cluster.address) - address, env, runtime_env_dir = start_client_server(cluster, True) - script = f""" -import ray -import ray.util -import os - - -ray.util.connect("{address}", job_config=None) - -@ray.remote -def run(): - from lib import one - return one() - -print(ray.get([run.remote()])[0]) -""" - out = run_string_as_driver(script, env) - print(out) - os.chdir(old_dir) - - -@pytest.mark.skipif(sys.platform == "win32", reason="Fail to create temp dir.") -def test_init(shutdown_only): - with tempfile.TemporaryDirectory() as tmp_dir: - old_dir = os.getcwd() - os.chdir(tmp_dir) - with open("hello", "w") as f: - f.write("world") - ray.init(runtime_env={"working_dir": "."}) - - @ray.remote - class Test: - def test(self): - with open("hello") as f: - return f.read() - - t = Test.remote() - assert ray.get(t.test.remote()) == "world" - os.chdir(old_dir) - def test_get_wheel_filename(): ray_version = "2.0.0.dev0" @@ -763,36 +121,6 @@ def test_container_option_serialize(): assert job_config_serialized.count(b"image") == 1 -def test_working_dir_override_failure(shutdown_only): - ray.init() - - with pytest.raises(ValueError): - - @ray.remote(runtime_env={"working_dir": "."}) - def f(): - pass - - @ray.remote - def g(): - pass - - with pytest.raises(ValueError): - g.options(runtime_env={"working_dir": "."}) - - with pytest.raises(ValueError): - - @ray.remote(runtime_env={"working_dir": "."}) - class A: - pass - - @ray.remote - class B: - pass - - with pytest.raises(ValueError): - B.options(runtime_env={"working_dir": "."}) - - @pytest.mark.skipif( sys.platform == "win32", reason="runtime_env unsupported on Windows.") def test_invalid_conda_env(shutdown_only): @@ -881,51 +209,6 @@ def test_no_spurious_worker_startup(shutdown_only): assert got_num_workers, "failed to read num workers for 10 seconds" -@pytest.mark.skipif(sys.platform == "win32", reason="Fail to create temp dir.") -def test_large_file_boundary(shutdown_only): - with tempfile.TemporaryDirectory() as tmp_dir: - old_dir = os.getcwd() - os.chdir(tmp_dir) - - # Check that packages just under the max size work as expected. - size = GCS_STORAGE_MAX_SIZE - 1024 * 1024 - with open("test_file", "wb") as f: - f.write(os.urandom(size)) - - ray.init(runtime_env={"working_dir": "."}) - - @ray.remote - class Test: - def get_size(self): - with open("test_file", "rb") as f: - return len(f.read()) - - t = Test.remote() - assert ray.get(t.get_size.remote()) == size - os.chdir(old_dir) - - -@pytest.mark.skipif(sys.platform == "win32", reason="Fail to create temp dir.") -def test_large_file_error(shutdown_only): - with tempfile.TemporaryDirectory() as tmp_dir: - old_dir = os.getcwd() - os.chdir(tmp_dir) - - # Write to two separate files, each of which is below the threshold to - # make sure the error is for the full package size. - size = GCS_STORAGE_MAX_SIZE // 2 + 1 - with open("test_file_1", "wb") as f: - f.write(os.urandom(size)) - - with open("test_file_2", "wb") as f: - f.write(os.urandom(size)) - - with pytest.raises(RuntimeError): - ray.init(runtime_env={"working_dir": "."}) - - os.chdir(old_dir) - - if __name__ == "__main__": import sys sys.exit(pytest.main(["-sv", __file__])) diff --git a/python/ray/tests/test_runtime_env_complicated.py b/python/ray/tests/test_runtime_env_complicated.py index 4ae7a1d9e..a7db638ba 100644 --- a/python/ray/tests/test_runtime_env_complicated.py +++ b/python/ray/tests/test_runtime_env_complicated.py @@ -920,38 +920,6 @@ def test_runtime_env_override(call_ray_start): ray.shutdown() -@pytest.mark.skipif( - os.environ.get("CI") and sys.platform != "linux", - reason="This test is only run on linux CI machines.") -def test_runtime_env_inheritance_regression(shutdown_only): - # https://github.com/ray-project/ray/issues/16479 - with tempfile.TemporaryDirectory() as tmpdir, chdir(tmpdir): - with open("hello", "w") as f: - f.write("world") - - ray.init(runtime_env={"working_dir": "."}) - - # Make sure we aren't reading the original file. - os.unlink("hello") - - @ray.remote - class Test: - def f(self): - return open("hello").read() - - # Passing working_dir URI through directly should work. - env1 = ray.get_runtime_context().runtime_env - assert "working_dir" in env1 - t = Test.options(runtime_env=env1).remote() - assert ray.get(t.f.remote()) == "world" - - # Passing a local directory should not work. - env2 = ray.get_runtime_context().runtime_env - env2["working_dir"] = "." - with pytest.raises(ValueError): - t = Test.options(runtime_env=env2).remote() - - @pytest.mark.skipif( os.environ.get("CI") and sys.platform != "linux", reason="This test is only run on linux CI machines.") diff --git a/python/ray/tests/test_runtime_env_working_dir.py b/python/ray/tests/test_runtime_env_working_dir.py new file mode 100644 index 000000000..1f769e4ad --- /dev/null +++ b/python/ray/tests/test_runtime_env_working_dir.py @@ -0,0 +1,687 @@ +from contextlib import contextmanager +import os +from pathlib import Path +import sys +import tempfile + +import pytest +from pytest_lazyfixture import lazy_fixture + +import ray +import ray.experimental.internal_kv as kv +from ray._private.test_utils import wait_for_condition +from ray._private.runtime_env.packaging import GCS_STORAGE_MAX_SIZE + +S3_PACKAGE_URI = "s3://runtime-env-test/remote_runtime_env.zip" + + +@pytest.fixture(scope="function", params=["ray_client", "no_ray_client"]) +def start_cluster(ray_start_cluster, request): + assert request.param in {"ray_client", "no_ray_client"} + use_ray_client: bool = request.param == "ray_client" + + cluster = ray_start_cluster + cluster.add_node(num_cpus=4) + if use_ray_client: + cluster.head_node._ray_params.ray_client_server_port = "10003" + cluster.head_node.start_ray_client_server() + address = "ray://localhost:10003" + else: + address = cluster.address + + yield cluster, address + + +@pytest.fixture(scope="function") +def tmp_working_dir(): + with tempfile.TemporaryDirectory() as tmp_dir: + path = Path(tmp_dir) + + hello_file = path / "hello" + with hello_file.open(mode="w") as f: + f.write("world") + + module_path = path / "test_module" + module_path.mkdir(parents=True) + + test_file = module_path / "test.py" + with test_file.open(mode="w") as f: + f.write("def one():\n") + f.write(" return 1\n") + + init_file = module_path / "__init__.py" + with init_file.open(mode="w") as f: + f.write("from test_module.test import one\n") + + yield tmp_dir + + +@pytest.mark.parametrize("test_failure", [True, False]) +@pytest.mark.skipif(sys.platform == "win32", reason="Fail to create temp dir.") +def test_lazy_reads(start_cluster, tmp_working_dir, test_failure): + """Tests the case where we lazily read files or import inside a task/actor. + + This tests both that this fails *without* the working_dir and that it + passes with it. + """ + cluster, address = start_cluster + + if test_failure: + # Don't pass working_dir, so it should fail! + ray.init(address) + else: + ray.init(address, runtime_env={"working_dir": tmp_working_dir}) + + @ray.remote + def test_import(): + import test_module + return test_module.one() + + if test_failure: + with pytest.raises(ImportError): + ray.get(test_import.remote()) + else: + assert ray.get(test_import.remote()) == 1 + + @ray.remote + def test_read(): + return open("hello").read() + + if test_failure: + with pytest.raises(FileNotFoundError): + ray.get(test_read.remote()) + else: + assert ray.get(test_read.remote()) == "world" + + @ray.remote + class Actor: + def test_import(self): + import test_module + return test_module.one() + + def test_read(self): + return open("hello").read() + + a = Actor.remote() + if test_failure: + with pytest.raises(ImportError): + assert ray.get(a.test_import.remote()) == 1 + with pytest.raises(FileNotFoundError): + assert ray.get(a.test_read.remote()) == "world" + else: + assert ray.get(a.test_import.remote()) == 1 + assert ray.get(a.test_read.remote()) == "world" + + +@pytest.mark.parametrize("test_failure", [True, False]) +@pytest.mark.skipif(sys.platform == "win32", reason="Fail to create temp dir.") +def test_captured_import(start_cluster, tmp_working_dir, test_failure): + """Tests importing a module in the driver and capturing it in a task/actor. + + This tests both that this fails *without* the working_dir and that it + passes with it. + """ + cluster, address = start_cluster + + if test_failure: + # Don't pass working_dir, so it should fail! + ray.init(address) + else: + ray.init(address, runtime_env={"working_dir": tmp_working_dir}) + + # Import in the driver. + sys.path.insert(0, tmp_working_dir) + import test_module + + @ray.remote + def test_import(): + return test_module.one() + + if test_failure: + with pytest.raises(Exception): + ray.get(test_import.remote()) + else: + assert ray.get(test_import.remote()) == 1 + + @ray.remote + class Actor: + def test_import(self): + return test_module.one() + + def test_read(self): + return open("hello").read() + + if test_failure: + with pytest.raises(Exception): + a = Actor.remote() + assert ray.get(a.test_import.remote()) == 1 + else: + a = Actor.remote() + assert ray.get(a.test_import.remote()) == 1 + + +@pytest.mark.skipif(sys.platform == "win32", reason="Fail to create temp dir.") +def test_empty_working_dir(start_cluster): + """Tests the case where we pass an empty directory as the working_dir.""" + cluster, address = start_cluster + with tempfile.TemporaryDirectory() as working_dir: + ray.init(address, runtime_env={"working_dir": working_dir}) + + @ray.remote + def listdir(): + return os.listdir() + + assert len(ray.get(listdir.remote())) == 0 + + @ray.remote + class A: + def listdir(self): + return os.listdir() + pass + + a = A.remote() + assert len(ray.get(a.listdir.remote())) == 0 + + +@pytest.mark.skipif(sys.platform == "win32", reason="Fail to create temp dir.") +def test_invalid_working_dir(start_cluster): + """Tests input validation for the working_dir.""" + cluster, address = start_cluster + + with pytest.raises(TypeError): + ray.init(address, runtime_env={"working_dir": 10}) + + ray.shutdown() + + with pytest.raises(ValueError): + ray.init(address, runtime_env={"working_dir": "/does/not/exist"}) + + ray.shutdown() + + with pytest.raises(ValueError): + ray.init(address, runtime_env={"working_dir": "does_not_exist"}) + + ray.shutdown() + + with pytest.raises(ValueError): + ray.init(address, runtime_env={"working_dir": "s3://no_dot_zip"}) + + +@pytest.mark.skipif(sys.platform == "win32", reason="Fail to create temp dir.") +@pytest.mark.parametrize("test_failure", [True, False]) +@pytest.mark.parametrize("per_task_actor", [True, False]) +def test_s3_uri(start_cluster, test_failure, per_task_actor): + """Tests the case where we lazily read files or import inside a task/actor. + + In this case, the files come from an S3 URI. + + This tests both that this fails *without* the working_dir and that it + passes with it. + """ + cluster, address = start_cluster + + env = {"working_dir": S3_PACKAGE_URI} + if test_failure or per_task_actor: + ray.init(address) + else: + ray.init(address, runtime_env=env) + + @ray.remote + def test_import(): + import test_module + return test_module.one() + + if not test_failure and per_task_actor: + test_import = test_import.options(runtime_env=env) + + if test_failure: + with pytest.raises(ImportError): + ray.get(test_import.remote()) + else: + assert ray.get(test_import.remote()) == 2 + + @ray.remote + class Actor: + def test_import(self): + import test_module + return test_module.one() + + if not test_failure and per_task_actor: + Actor = Actor.options(runtime_env=env) + + a = Actor.remote() + if test_failure: + with pytest.raises(ImportError): + assert ray.get(a.test_import.remote()) == 2 + else: + assert ray.get(a.test_import.remote()) == 2 + + +@pytest.mark.skipif(sys.platform == "win32", reason="Fail to create temp dir.") +@pytest.mark.parametrize( + "working_dir", + [S3_PACKAGE_URI, lazy_fixture("tmp_working_dir")]) +def test_multi_node(start_cluster, working_dir): + """Tests that the working_dir is propagated across multi-node clusters.""" + NUM_NODES = 3 + cluster, address = start_cluster + for _ in range(NUM_NODES - 1): # Head node already added. + cluster.add_node(num_cpus=1) + + ray.init(address, runtime_env={"working_dir": working_dir}) + + @ray.remote + def check_and_get_node_id(): + import test_module + test_module.one() + return ray.get_runtime_context().node_id + + object_refs = [check_and_get_node_id.remote() for _ in range(10000)] + assert len(set(ray.get(object_refs))) == NUM_NODES + + +def check_internal_kv_gced(): + return len(kv._internal_kv_list("gcs://")) == 0 + + +def check_local_files_gced(cluster): + for node in cluster.list_all_nodes(): + all_files = os.listdir(node.get_runtime_env_dir_path()) + # Check that there are no files remaining except for .lock files. + # TODO(edoakes): the lock files should get cleaned up too! + if len(list(filter(lambda f: not f.endswith(".lock"), all_files))) > 0: + return False + + return True + + +@pytest.mark.skipif(sys.platform == "win32", reason="Fail to create temp dir.") +@pytest.mark.parametrize( + "working_dir", + [S3_PACKAGE_URI, lazy_fixture("tmp_working_dir")]) +def test_job_level_gc(start_cluster, working_dir): + """Tests that job-level working_dir is GC'd when the job exits.""" + NUM_NODES = 3 + cluster, address = start_cluster + for _ in range(NUM_NODES - 1): # Head node already added. + cluster.add_node(num_cpus=1) + + ray.init(address, runtime_env={"working_dir": working_dir}) + + # For a local directory, the package should be in the GCS. + # For an S3 URI, there should be nothing in the GCS because + # it will be downloaded from S3 directly on each node. + if working_dir == S3_PACKAGE_URI: + assert check_internal_kv_gced() + else: + assert not check_internal_kv_gced() + + @ray.remote + class A: + def test_import(self): + import test_module + test_module.one() + + actors = [A.remote() for _ in range(5)] + ray.get([a.test_import.remote() for a in actors]) + + @ray.remote + def test_import(): + import test_module + test_module.one() + + ray.get([test_import.remote() for _ in range(10000)]) + + if working_dir == S3_PACKAGE_URI: + assert check_internal_kv_gced() + else: + assert not check_internal_kv_gced() + assert not check_local_files_gced(cluster) + + ray.shutdown() + + # Need to re-connect to use internal_kv. + ray.init(address=address) + wait_for_condition(check_internal_kv_gced) + wait_for_condition(lambda: check_local_files_gced(cluster)) + + +# TODO(edoakes): fix this bug and enable test. +@pytest.mark.skip("Currently failing.") +@pytest.mark.skipif(sys.platform == "win32", reason="Fail to create temp dir.") +def test_actor_level_gc(start_cluster): + """Tests that actor-level working_dir is GC'd when the actor exits.""" + NUM_NODES = 3 + cluster, address = start_cluster + for _ in range(NUM_NODES - 1): # Head node already added. + cluster.add_node(num_cpus=1) + + ray.init(address) + + @ray.remote + class A: + def check(self): + assert "test_module" in os.listdir() + + # TODO(edoakes): this doesn't work in decorator with ray client. + A = A.options(runtime_env={"working_dir": S3_PACKAGE_URI}) + + actors = [A.remote() for _ in range(5)] + ray.get([a.check.remote() for a in actors]) + + assert not check_local_files_gced(cluster) + + [ray.kill(a) for a in actors] + + wait_for_condition(lambda: check_local_files_gced(cluster)) + + +@pytest.mark.skipif(sys.platform == "win32", reason="Fail to create temp dir.") +@pytest.mark.parametrize( + "working_dir", + [S3_PACKAGE_URI, lazy_fixture("tmp_working_dir")]) +def test_detached_actor_gc(start_cluster, working_dir): + """Tests that URIs for detached actors are GC'd only when they exit.""" + cluster, address = start_cluster + ray.init( + address, namespace="test", runtime_env={"working_dir": working_dir}) + + # For a local directory, the package should be in the GCS. + # For an S3 URI, there should be nothing in the GCS because + # it will be downloaded from S3 directly on each node. + if working_dir == S3_PACKAGE_URI: + assert check_internal_kv_gced() + else: + assert not check_internal_kv_gced() + + @ray.remote + class A: + def test_import(self): + import test_module + test_module.one() + + a = A.options(name="test", lifetime="detached").remote() + ray.get(a.test_import.remote()) + + if working_dir == S3_PACKAGE_URI: + assert check_internal_kv_gced() + else: + assert not check_internal_kv_gced() + assert not check_local_files_gced(cluster) + + ray.shutdown() + + ray.init(address, namespace="test") + + if working_dir == S3_PACKAGE_URI: + assert check_internal_kv_gced() + else: + assert not check_internal_kv_gced() + assert not check_local_files_gced(cluster) + + a = ray.get_actor("test") + ray.get(a.test_import.remote()) + + ray.kill(a) + wait_for_condition(check_internal_kv_gced) + wait_for_condition(lambda: check_local_files_gced(cluster)) + + +@pytest.mark.skipif(sys.platform == "win32", reason="Fail to create temp dir.") +def test_exclusion(start_cluster, tmp_working_dir): + """Tests various forms of the 'excludes' parameter.""" + cluster, address = start_cluster + + def create_file(p): + if not p.parent.exists(): + p.parent.mkdir(parents=True) + with p.open("w") as f: + f.write("Test") + + working_path = Path(tmp_working_dir) + create_file(working_path / "test1") + create_file(working_path / "test2") + create_file(working_path / "test3") + create_file(working_path / "tmp_dir" / "test_1") + create_file(working_path / "tmp_dir" / "test_2") + create_file(working_path / "tmp_dir" / "test_3") + create_file(working_path / "tmp_dir" / "sub_dir" / "test_1") + create_file(working_path / "tmp_dir" / "sub_dir" / "test_2") + create_file(working_path / "cache" / "test_1") + create_file(working_path / "tmp_dir" / "cache" / "test_1") + create_file(working_path / "another_dir" / "cache" / "test_1") + + # Test that all files are present without excluding. + ray.init(address, runtime_env={"working_dir": tmp_working_dir}) + + @ray.remote + def check_file(name): + try: + with open(name) as f: + return f.read() + except Exception: + return "FAILED" + + def get_all(): + return ray.get([ + check_file.remote("test1"), + check_file.remote("test2"), + check_file.remote("test3"), + check_file.remote(os.path.join("tmp_dir", "test_1")), + check_file.remote(os.path.join("tmp_dir", "test_2")), + check_file.remote(os.path.join("tmp_dir", "test_3")), + check_file.remote(os.path.join("tmp_dir", "sub_dir", "test_1")), + check_file.remote(os.path.join("tmp_dir", "sub_dir", "test_2")), + check_file.remote(os.path.join("cache", "test_1")), + check_file.remote(os.path.join("tmp_dir", "cache", "test_1")), + check_file.remote(os.path.join("another_dir", "cache", "test_1")), + ]) + + assert get_all() == [ + "Test", "Test", "Test", "Test", "Test", "Test", "Test", "Test", "Test", + "Test", "Test" + ] + + ray.shutdown() + + # Test various exclusion methods. + ray.init( + address, + runtime_env={ + "working_dir": tmp_working_dir, + "excludes": [ + # exclude by relative path + "test2", + # exclude by dir + str(Path("tmp_dir") / "sub_dir"), + # exclude part of the dir + str(Path("tmp_dir") / "test_1"), + # exclude part of the dir + str(Path("tmp_dir") / "test_2"), + ] + }) + + assert get_all() == [ + "Test", "FAILED", "Test", "FAILED", "FAILED", "Test", "FAILED", + "FAILED", "Test", "Test", "Test" + ] + + ray.shutdown() + + # Test excluding all files using gitignore pattern matching syntax + ray.init( + address, + runtime_env={ + "working_dir": tmp_working_dir, + "excludes": ["*"] + }) + + assert get_all() == [ + "FAILED", "FAILED", "FAILED", "FAILED", "FAILED", "FAILED", "FAILED", + "FAILED", "FAILED", "FAILED", "FAILED" + ] + + ray.shutdown() + + # Test excluding with a .gitignore file. + with open(f"{tmp_working_dir}/.gitignore", "w") as f: + f.write(""" +# Comment +test_[12] +/test1 +!/tmp_dir/sub_dir/test_1 +cache/ +""") + + ray.init( + address, runtime_env={ + "working_dir": tmp_working_dir, + }) + + assert get_all() == [ + "FAILED", "Test", "Test", "FAILED", "FAILED", "Test", "Test", "FAILED", + "FAILED", "FAILED", "FAILED" + ] + + +@pytest.mark.skipif(sys.platform == "win32", reason="Fail to create temp dir.") +@pytest.mark.parametrize( + "working_dir", + [S3_PACKAGE_URI, lazy_fixture("tmp_working_dir")]) +def test_runtime_context(start_cluster, working_dir): + """Tests that the working_dir is propagated in the runtime_context.""" + cluster, address = start_cluster + ray.init(runtime_env={"working_dir": working_dir}) + + def check(): + wd = ray.get_runtime_context().runtime_env["working_dir"] + if working_dir == S3_PACKAGE_URI: + assert wd == S3_PACKAGE_URI + else: + assert wd.startswith("gcs://_ray_pkg_") + + check() + + @ray.remote + def task(): + check() + + ray.get(task.remote()) + + @ray.remote + class Actor: + def check(self): + check() + + a = Actor.remote() + ray.get(a.check.remote()) + + +def test_override_failure(shutdown_only): + """Tests invalid override behaviors.""" + ray.init() + + with pytest.raises(ValueError): + + @ray.remote(runtime_env={"working_dir": "."}) + def f(): + pass + + @ray.remote + def g(): + pass + + with pytest.raises(ValueError): + g.options(runtime_env={"working_dir": "."}) + + with pytest.raises(ValueError): + + @ray.remote(runtime_env={"working_dir": "."}) + class A: + pass + + @ray.remote + class B: + pass + + with pytest.raises(ValueError): + B.options(runtime_env={"working_dir": "."}) + + +@contextmanager +def chdir(d: str): + old_dir = os.getcwd() + os.chdir(d) + yield + os.chdir(old_dir) + + +@pytest.mark.skipif(sys.platform == "win32", reason="Fail to create temp dir.") +def test_inheritance(start_cluster): + """Tests that child tasks/actors inherit URIs properly.""" + cluster, address = start_cluster + with tempfile.TemporaryDirectory() as tmpdir, chdir(tmpdir): + with open("hello", "w") as f: + f.write("world") + + ray.init(address, runtime_env={"working_dir": "."}) + + # Make sure we aren't reading the original file. + os.unlink("hello") + + @ray.remote + class Test: + def f(self): + return open("hello").read() + + # Passing working_dir URI through directly should work. + env1 = ray.get_runtime_context().runtime_env + assert "working_dir" in env1 + t = Test.options(runtime_env=env1).remote() + assert ray.get(t.f.remote()) == "world" + + # Passing a local directory should not work. + env2 = ray.get_runtime_context().runtime_env + env2["working_dir"] = "." + with pytest.raises(ValueError): + t = Test.options(runtime_env=env2).remote() + + +@pytest.mark.skipif(sys.platform == "win32", reason="Fail to create temp dir.") +def test_large_file_boundary(shutdown_only): + """Check that packages just under the max size work as expected.""" + with tempfile.TemporaryDirectory() as tmp_dir, chdir(tmp_dir): + size = GCS_STORAGE_MAX_SIZE - 1024 * 1024 + with open("test_file", "wb") as f: + f.write(os.urandom(size)) + + ray.init(runtime_env={"working_dir": "."}) + + @ray.remote + class Test: + def get_size(self): + with open("test_file", "rb") as f: + return len(f.read()) + + t = Test.remote() + assert ray.get(t.get_size.remote()) == size + + +@pytest.mark.skipif(sys.platform == "win32", reason="Fail to create temp dir.") +def test_large_file_error(shutdown_only): + with tempfile.TemporaryDirectory() as tmp_dir, chdir(tmp_dir): + # Write to two separate files, each of which is below the threshold to + # make sure the error is for the full package size. + size = GCS_STORAGE_MAX_SIZE // 2 + 1 + with open("test_file_1", "wb") as f: + f.write(os.urandom(size)) + + with open("test_file_2", "wb") as f: + f.write(os.urandom(size)) + + with pytest.raises(RuntimeError): + ray.init(runtime_env={"working_dir": "."}) + + +if __name__ == "__main__": + sys.exit(pytest.main(["-sv", __file__]))