[runtime_env] Clean up working dir tests, add more test cases (#19597)

Edward Oakes 2021-10-22 12:35:27 -05:00 committed by GitHub
parent 4f05bac8fb
commit 0760fe869d
5 changed files with 698 additions and 755 deletions

View file

@@ -6,7 +6,7 @@ from ray.experimental.internal_kv import _internal_kv_initialized
from ray._private.runtime_env.context import RuntimeEnvContext
from ray._private.runtime_env.packaging import (
download_and_unpack_package, delete_package, get_uri_for_directory,
parse_uri, upload_package_if_needed)
parse_uri, Protocol, upload_package_if_needed)
default_logger = logging.getLogger(__name__)
@@ -28,10 +28,14 @@ def upload_working_dir_if_needed(runtime_env: Dict[str, Any],
# working_dir is already a URI -- just pass it through.
try:
parse_uri(working_dir)
return runtime_env
protocol, path = parse_uri(working_dir)
except ValueError:
pass
protocol, path = None, None
if protocol is not None:
if protocol == Protocol.S3 and not path.endswith(".zip"):
raise ValueError("Only .zip files supported for S3 URIs.")
return runtime_env
# Remove excludes, it isn't relevant after the upload step.
excludes = runtime_env.pop("excludes", None)
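For illustration, a minimal standalone sketch of the check added above (not the Ray implementation; the `Protocol` enum below is a hypothetical stand-in for the one imported from `packaging`):

from enum import Enum
from urllib.parse import urlparse

class Protocol(Enum):
    # Assumed subset of the real enum in ray._private.runtime_env.packaging.
    GCS = "gcs"
    S3 = "s3"

def validate_remote_working_dir(uri: str) -> Protocol:
    """Raise ValueError for unsupported remote URIs, mirroring the check above."""
    protocol = Protocol(urlparse(uri).scheme)  # ValueError for plain local paths
    if protocol == Protocol.S3 and not uri.endswith(".zip"):
        raise ValueError("Only .zip files supported for S3 URIs.")
    return protocol

validate_remote_working_dir("s3://runtime-env-test/remote_runtime_env.zip")  # ok
# validate_remote_working_dir("s3://bucket/package")  # raises ValueError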

View file

@@ -252,7 +252,8 @@ py_test_module_list(
py_test_module_list(
files = [
"test_runtime_env.py"
"test_runtime_env.py",
"test_runtime_env_working_dir.py"
],
size = "large",
extra_srcs = SRCS,

View file

@@ -1,658 +1,16 @@
import os
import pytest
import sys
import tempfile
import time
import requests
from pathlib import Path
from pytest_lazyfixture import lazy_fixture
import ray
from ray.exceptions import RuntimeEnvSetupError
import ray.experimental.internal_kv as kv
from ray._private.test_utils import (run_string_as_driver, wait_for_condition)
from ray._private.runtime_env.packaging import GCS_STORAGE_MAX_SIZE, parse_uri
from ray._private.test_utils import wait_for_condition
from ray._private.utils import (get_wheel_filename, get_master_wheel_url,
get_release_wheel_url)
S3_PACKAGE_URI = "s3://runtime-env-test/remote_runtime_env.zip"
driver_script = """
import logging
import os
import sys
import time
import traceback
import ray
import ray.util
# Define test_module for py_module tests
try:
import test_module
except:
pass
try:
job_config = ray.job_config.JobConfig(
runtime_env={runtime_env}
)
if not job_config.runtime_env:
job_config=None
if os.environ.get("USE_RAY_CLIENT"):
ray.client("{address}").env({runtime_env}).namespace("default_test_namespace").connect()
else:
ray.init(address="{address}",
job_config=job_config,
logging_level=logging.DEBUG,
namespace="default_test_namespace"
)
except ValueError:
print("ValueError:", traceback.format_exc())
sys.exit(0)
except TypeError:
print("TypeError:", traceback.format_exc())
sys.exit(0)
except Exception:
print("ERROR:", traceback.format_exc())
sys.exit(0)
if os.environ.get("EXIT_AFTER_INIT"):
sys.exit(0)
# Schedule a dummy task to kick off runtime env agent's working_dir setup()
@ray.remote
def dummy_task():
return "dummy task scheduled"
ray.get([dummy_task.remote()])
# Insert working_dir path with unzipped files
sys.path.insert(0, "{working_dir}")
# Actual import of test_module after working_dir is set up
import test_module
@ray.remote
def run_test():
return test_module.one()
@ray.remote
def check_file(name):
try:
with open(name) as f:
return f.read()
except:
return "FAILED"
@ray.remote
class TestActor(object):
@ray.method(num_returns=1)
def one(self):
return test_module.one()
{execute_statement}
if os.environ.get("USE_RAY_CLIENT"):
ray.util.disconnect()
else:
ray.shutdown()
"""
def create_file(p):
if not p.parent.exists():
p.parent.mkdir()
with p.open("w") as f:
f.write("Test")
@pytest.fixture(scope="function")
def working_dir():
"""Regular local_working_dir test setup for existing tests"""
with tempfile.TemporaryDirectory() as tmp_dir:
path = Path(tmp_dir)
module_path = path / "test_module"
module_path.mkdir(parents=True)
init_file = module_path / "__init__.py"
test_file = module_path / "test.py"
with test_file.open(mode="w") as f:
f.write("""
def one():
return 1
""")
with init_file.open(mode="w") as f:
f.write("""
from test_module.test import one
""")
old_dir = os.getcwd()
os.chdir(tmp_dir)
yield tmp_dir
os.chdir(old_dir)
@pytest.fixture(scope="function")
def local_working_dir():
"""Parametrized local_working_dir test setup"""
with tempfile.TemporaryDirectory() as tmp_dir:
path = Path(tmp_dir)
module_path = path / "test_module"
module_path.mkdir(parents=True)
# There are "test.py" file with same module name and function
# signature, but different return value. Regular runtime env
# working_dir uses existing file and should return 1 on each
# call to one(); While s3 remote runtime env with same file
# names will return 2.
init_file = module_path / "__init__.py"
test_file = module_path / "test.py"
with test_file.open(mode="w") as f:
f.write("""
def one():
return 1
""")
with init_file.open(mode="w") as f:
f.write("""
from test_module.test import one
""")
old_dir = os.getcwd()
os.chdir(tmp_dir)
runtime_env = f"""{{ "working_dir": "{tmp_dir}" }}"""
# local working_dir's one() returns 1 for each call
yield tmp_dir, runtime_env, "1000"
os.chdir(old_dir)
@pytest.fixture(scope="function")
def s3_working_dir():
"""Parametrized s3_working_dir test setup"""
with tempfile.TemporaryDirectory() as tmp_dir:
old_dir = os.getcwd()
os.chdir(tmp_dir)
# There are "test.py" file with same module name and function
# signature, but different return value. Regular runtime env
# working_dir uses existing file and should return 1 on each
# call to one(); While s3 remote runtime env with same file
# names will return 2.
runtime_env = f"""{{ "working_dir": "{S3_PACKAGE_URI}" }}"""
_, pkg_name = parse_uri(S3_PACKAGE_URI)
runtime_env_dir = ray.worker._global_node.get_runtime_env_dir_path()
working_dir = Path(os.path.join(runtime_env_dir,
pkg_name)).with_suffix("")
# s3 working_dir's one() returns 2 for each call
yield working_dir, runtime_env, "2000"
os.chdir(old_dir)
@pytest.fixture(
scope="function",
params=[lazy_fixture("local_working_dir"),
lazy_fixture("s3_working_dir")])
def working_dir_parametrized(request):
return request.param
def start_client_server(cluster, client_mode):
env = {}
if client_mode:
ray.worker._global_node._ray_params.ray_client_server_port = "10003"
ray.worker._global_node.start_ray_client_server()
address = "localhost:10003"
env["USE_RAY_CLIENT"] = "1"
else:
address = cluster.address
runtime_env_dir = ray.worker._global_node.get_runtime_env_dir_path()
return address, env, runtime_env_dir
"""
The following test cases are related to the runtime env. They follow these steps:
1) Create a temporary dir with the working_dir fixture.
2) Use the driver_script template defined globally.
3) Overwrite runtime_env and execute_statement in the template.
4) Execute it as a separate driver and check the result.
"""
@pytest.mark.skipif(sys.platform == "win32", reason="Fail to create temp dir.")
@pytest.mark.parametrize("client_mode", [True, False])
def test_empty_working_dir(ray_start_cluster_head, client_mode):
cluster = ray_start_cluster_head
address, env, runtime_env_dir = start_client_server(cluster, client_mode)
env["EXIT_AFTER_INIT"] = "1"
with tempfile.TemporaryDirectory() as working_dir:
runtime_env = f"""{{
"working_dir": r"{working_dir}"
}}"""
# Execute the following cmd in driver with runtime_env
execute_statement = "sys.exit(0)"
script = driver_script.format(**locals())
out = run_string_as_driver(script, env)
assert not out.startswith("ERROR:")
@pytest.mark.skip("py_modules not supported yet.")
@pytest.mark.skipif(sys.platform == "win32", reason="Fail to create temp dir.")
@pytest.mark.parametrize("client_mode", [True, False])
def test_empty_py_modules(ray_start_cluster_head, client_mode):
cluster = ray_start_cluster_head
address, env, runtime_env_dir = start_client_server(cluster, client_mode)
env["EXIT_AFTER_INIT"] = "1"
with tempfile.TemporaryDirectory() as working_dir:
runtime_env = f"""{{
"py_modules": [r"{working_dir}"]
}}"""
# Execute the following cmd in driver with runtime_env
execute_statement = "sys.exit(0)"
script = driver_script.format(**locals())
out = run_string_as_driver(script, env)
assert not out.startswith("ERROR:")
@pytest.mark.skipif(sys.platform == "win32", reason="Fail to create temp dir.")
@pytest.mark.parametrize("client_mode", [True, False])
def test_invalid_working_dir(ray_start_cluster_head, working_dir, client_mode):
cluster = ray_start_cluster_head
address, env, runtime_env_dir = start_client_server(cluster, client_mode)
env["EXIT_AFTER_INIT"] = "1"
runtime_env = "{ 'working_dir': 10 }"
# Execute the following cmd in driver with runtime_env
execute_statement = ""
script = driver_script.format(**locals())
out = run_string_as_driver(script, env)
assert out.strip().splitlines()[-1].startswith("TypeError"), out
runtime_env = f"{{ 'working_dir': os.path.join(r'{working_dir}', 'na') }}"
# Execute the following cmd in driver with runtime_env
execute_statement = ""
script = driver_script.format(**locals())
out = run_string_as_driver(script, env)
assert out.strip().splitlines()[-1].startswith("ValueError"), out
runtime_env = "{ 'working_dir': 's3://bucket/package' }"
# Execute the following cmd in driver with runtime_env
execute_statement = ""
script = driver_script.format(**locals())
out = run_string_as_driver(script, env)
assert out.strip().splitlines()[-1].startswith("ValueError"), out
@pytest.mark.skip("py_modules not supported yet.")
@pytest.mark.skipif(sys.platform == "win32", reason="Fail to create temp dir.")
@pytest.mark.parametrize("client_mode", [True, False])
def test_invalid_py_modules(ray_start_cluster_head, working_dir, client_mode):
cluster = ray_start_cluster_head
address, env, runtime_env_dir = start_client_server(cluster, client_mode)
env["EXIT_AFTER_INIT"] = "1"
runtime_env = "{ 'py_modules': [10] }"
# Execute the following cmd in driver with runtime_env
execute_statement = ""
script = driver_script.format(**locals())
out = run_string_as_driver(script, env)
assert out.strip().splitlines()[-1].startswith("TypeError"), out
runtime_env = f"{{ 'py_modules': [os.path.join(r'{working_dir}', 'na')] }}"
# Execute the following cmd in driver with runtime_env
execute_statement = ""
script = driver_script.format(**locals())
out = run_string_as_driver(script, env)
assert out.strip().splitlines()[-1].startswith("ValueError"), out
runtime_env = "{ 'py_modules': ['s3://bucket/package'] }"
# Execute the following cmd in driver with runtime_env
execute_statement = ""
script = driver_script.format(**locals())
out = run_string_as_driver(script, env)
assert out.strip().splitlines()[-1].startswith("ValueError"), out
@pytest.mark.skipif(sys.platform == "win32", reason="Fail to create temp dir.")
@pytest.mark.parametrize("client_mode", [True, False])
def test_single_node(ray_start_cluster_head, working_dir_parametrized,
client_mode):
cluster = ray_start_cluster_head
address, env, runtime_env_dir = start_client_server(cluster, client_mode)
# Unpack the lazy fixture tuple to override "working_dir" so the values
# are picked up when the driver_script template is filled via locals()
print(working_dir_parametrized)
working_dir, runtime_env, expected = working_dir_parametrized
print(working_dir, runtime_env, expected)
# Execute the following cmd in driver with runtime_env
execute_statement = "print(sum(ray.get([run_test.remote()] * 1000)))"
script = driver_script.format(**locals())
with tempfile.TemporaryDirectory() as tmp_dir:
# Execute driver script in brand new, empty directory
os.chdir(tmp_dir)
out = run_string_as_driver(script, env)
assert out.strip().split()[-1] == expected, out
assert len(list(Path(runtime_env_dir).iterdir())) == 1
assert len(kv._internal_kv_list("gcs://")) == 0
# The working_dir fixture will take care of changing back to the original
# test folder
@pytest.mark.skipif(sys.platform == "win32", reason="Fail to create temp dir.")
@pytest.mark.parametrize("client_mode", [True, False])
def test_two_node(two_node_cluster, working_dir_parametrized, client_mode):
cluster, _ = two_node_cluster
address, env, runtime_env_dir = start_client_server(cluster, client_mode)
# Unpack the lazy fixture tuple to override "working_dir" so the values
# are picked up when the driver_script template is filled via locals()
working_dir, runtime_env, expected = working_dir_parametrized
# Execute the following cmd in driver with runtime_env
execute_statement = "print(sum(ray.get([run_test.remote()] * 1000)))"
script = driver_script.format(**locals())
with tempfile.TemporaryDirectory() as tmp_dir:
# Execute driver script in brand new, empty directory
os.chdir(tmp_dir)
out = run_string_as_driver(script, env)
assert out.strip().split()[-1] == expected, out
assert len(list(Path(runtime_env_dir).iterdir())) == 1
assert len(kv._internal_kv_list("gcs://")) == 0
# The working_dir fixture will take care of changing back to the original
# test folder
@pytest.mark.skip("py_modules not supported yet.")
@pytest.mark.skipif(sys.platform == "win32", reason="Fail to create temp dir.")
@pytest.mark.parametrize("client_mode", [True, False])
def test_two_node_module(two_node_cluster, working_dir, client_mode):
cluster, _ = two_node_cluster
address, env, runtime_env_dir = start_client_server(cluster, client_mode)
# Test runtime_env with py_modules
runtime_env = """{ "py_modules": [test_module.__path__[0]] }"""
# Execute the following cmd in driver with runtime_env
execute_statement = "print(sum(ray.get([run_test.remote()] * 1000)))"
script = driver_script.format(**locals())
out = run_string_as_driver(script, env)
assert out.strip().split()[-1] == "1000", out
assert len(list(Path(runtime_env_dir).iterdir())) == 1
@pytest.mark.skipif(sys.platform == "win32", reason="Fail to create temp dir.")
@pytest.mark.parametrize("client_mode", [True, False])
def test_two_node_local_file(two_node_cluster, working_dir, client_mode):
with open(os.path.join(working_dir, "test_file"), "w") as f:
f.write("1")
cluster, _ = two_node_cluster
address, env, runtime_env_dir = start_client_server(cluster, client_mode)
# Test runtime_env with working_dir
runtime_env = f"""{{ "working_dir": "{working_dir}" }}"""
# Execute the following cmd in driver with runtime_env
execute_statement = """
vals = ray.get([check_file.remote('test_file')] * 1000)
print(sum([int(v) for v in vals]))
"""
script = driver_script.format(**locals())
out = run_string_as_driver(script, env)
assert out.strip().split()[-1] == "1000", out
assert len(list(Path(runtime_env_dir).iterdir())) == 1
assert len(kv._internal_kv_list("gcs://")) == 0
@pytest.mark.skipif(sys.platform == "win32", reason="Fail to create temp dir.")
@pytest.mark.parametrize("client_mode", [True, False])
def test_exclusion(ray_start_cluster_head, working_dir, client_mode):
cluster = ray_start_cluster_head
address, env, runtime_env_dir = start_client_server(cluster, client_mode)
working_path = Path(working_dir)
create_file(working_path / "tmp_dir" / "test_1")
create_file(working_path / "tmp_dir" / "test_2")
create_file(working_path / "tmp_dir" / "test_3")
create_file(working_path / "tmp_dir" / "sub_dir" / "test_1")
create_file(working_path / "tmp_dir" / "sub_dir" / "test_2")
create_file(working_path / "test1")
create_file(working_path / "test2")
create_file(working_path / "test3")
tmp_dir_test_3 = str((working_path / "tmp_dir" / "test_3").absolute())
runtime_env = f"""{{
"working_dir": r"{working_dir}",
}}"""
execute_statement = """
vals = ray.get([
check_file.remote('test1'),
check_file.remote('test2'),
check_file.remote('test3'),
check_file.remote(os.path.join('tmp_dir', 'test_1')),
check_file.remote(os.path.join('tmp_dir', 'test_2')),
check_file.remote(os.path.join('tmp_dir', 'test_3')),
check_file.remote(os.path.join('tmp_dir', 'sub_dir', 'test_1')),
check_file.remote(os.path.join('tmp_dir', 'sub_dir', 'test_2')),
])
print(','.join(vals))
"""
script = driver_script.format(**locals())
out = run_string_as_driver(script, env)
# Test that it works without excludes first
assert out.strip().split("\n")[-1] == \
"Test,Test,Test,Test,Test,Test,Test,Test", out
runtime_env = f"""{{
"working_dir": r"{working_dir}",
"excludes": [
# exclude by relative path
r"test2",
# exclude by dir
r"{str(Path("tmp_dir") / "sub_dir")}",
# exclude part of the dir
r"{str(Path("tmp_dir") / "test_1")}",
# exclude part of the dir
r"{str(Path("tmp_dir") / "test_2")}",
]
}}"""
script = driver_script.format(**locals())
out = run_string_as_driver(script, env)
assert out.strip().split("\n")[-1] == \
"Test,FAILED,Test,FAILED,FAILED,Test,FAILED,FAILED", out
# Test excluding all files using gitignore pattern matching syntax
runtime_env = f"""{{
"working_dir": r"{working_dir}",
"excludes": ["*"]
}}"""
script = driver_script.format(**locals())
out = run_string_as_driver(script, env)
assert out.strip().split("\n")[-1] == \
"FAILED,FAILED,FAILED,FAILED,FAILED,FAILED,FAILED,FAILED", out
@pytest.mark.skipif(sys.platform == "win32", reason="Fail to create temp dir.")
@pytest.mark.parametrize("client_mode", [True, False])
def test_exclusion_2(ray_start_cluster_head, working_dir, client_mode):
cluster = ray_start_cluster_head
address, env, runtime_env_dir = start_client_server(cluster, client_mode)
working_path = Path(working_dir)
def create_file(p):
if not p.parent.exists():
p.parent.mkdir(parents=True)
with p.open("w") as f:
f.write("Test")
create_file(working_path / "tmp_dir" / "test_1")
create_file(working_path / "tmp_dir" / "test_2")
create_file(working_path / "tmp_dir" / "test_3")
create_file(working_path / "tmp_dir" / "sub_dir" / "test_1")
create_file(working_path / "tmp_dir" / "sub_dir" / "test_2")
create_file(working_path / "test1")
create_file(working_path / "test2")
create_file(working_path / "test3")
create_file(working_path / "cache" / "test_1")
create_file(working_path / "tmp_dir" / "cache" / "test_1")
create_file(working_path / "another_dir" / "cache" / "test_1")
tmp_dir_test_3 = str((working_path / "tmp_dir" / "test_3").absolute())
runtime_env = f"""{{
"working_dir": r"{working_dir}",
}}"""
execute_statement = """
vals = ray.get([
check_file.remote('test1'),
check_file.remote('test2'),
check_file.remote('test3'),
check_file.remote(os.path.join('tmp_dir', 'test_1')),
check_file.remote(os.path.join('tmp_dir', 'test_2')),
check_file.remote(os.path.join('tmp_dir', 'test_3')),
check_file.remote(os.path.join('tmp_dir', 'sub_dir', 'test_1')),
check_file.remote(os.path.join('tmp_dir', 'sub_dir', 'test_2')),
check_file.remote(os.path.join("cache", "test_1")),
check_file.remote(os.path.join("tmp_dir", "cache", "test_1")),
check_file.remote(os.path.join("another_dir", "cache", "test_1")),
])
print(','.join(vals))
"""
script = driver_script.format(**locals())
out = run_string_as_driver(script, env)
# Test that it works without a .gitignore file first
assert out.strip().split("\n")[-1] == \
"Test,Test,Test,Test,Test,Test,Test,Test,Test,Test,Test", out
with open(f"{working_dir}/.gitignore", "w") as f:
f.write("""
# Comment
test_[12]
/test1
!/tmp_dir/sub_dir/test_1
cache/
""")
script = driver_script.format(**locals())
out = run_string_as_driver(script, env)
t = out.strip().split("\n")[-1]
assert out.strip().split("\n")[-1] == \
"FAILED,Test,Test,FAILED,FAILED,Test,Test,FAILED,FAILED,FAILED,FAILED"
@pytest.mark.skipif(sys.platform == "win32", reason="Fail to create temp dir.")
@pytest.mark.parametrize("client_mode", [True, False])
def test_runtime_env_getter(ray_start_cluster_head, working_dir, client_mode):
cluster = ray_start_cluster_head
address, env, runtime_env_dir = start_client_server(cluster, client_mode)
runtime_env = f"""{{ "working_dir": "{working_dir}" }}"""
# Execute the following cmd in driver with runtime_env
execute_statement = """
print(ray.get_runtime_context().runtime_env["working_dir"])
"""
script = driver_script.format(**locals())
out = run_string_as_driver(script, env)
working_dir_uri = out.strip().split()[-1]
assert working_dir_uri.startswith("gcs://_ray_pkg_")
assert working_dir_uri.endswith(".zip")
@pytest.mark.skipif(sys.platform == "win32", reason="Fail to create temp dir.")
@pytest.mark.parametrize("client_mode", [True, False])
def test_regular_actors(ray_start_cluster_head, working_dir, client_mode):
cluster = ray_start_cluster_head
address, env, runtime_env_dir = start_client_server(cluster, client_mode)
runtime_env = f"""{{ "working_dir": "{working_dir}" }}"""
# Execute the following cmd in driver with runtime_env
execute_statement = """
test_actor = TestActor.options(name="test_actor").remote()
print(sum(ray.get([test_actor.one.remote()] * 1000)))
"""
script = driver_script.format(**locals())
out = run_string_as_driver(script, env)
assert out.strip().split()[-1] == "1000", out
assert len(list(Path(runtime_env_dir).iterdir())) == 1
assert len(kv._internal_kv_list("gcs://")) == 0
@pytest.mark.skipif(sys.platform == "win32", reason="Fail to create temp dir.")
@pytest.mark.parametrize("client_mode", [True, False])
def test_detached_actors(ray_start_cluster_head, working_dir, client_mode):
cluster = ray_start_cluster_head
address, env, runtime_env_dir = start_client_server(cluster, client_mode)
runtime_env = f"""{{ "working_dir": "{working_dir}" }}"""
# Execute the following cmd in driver with runtime_env
execute_statement = """
test_actor = TestActor.options(name="test_actor", lifetime="detached").remote()
print(sum(ray.get([test_actor.one.remote()] * 1000)))
"""
script = driver_script.format(**locals())
out = run_string_as_driver(script, env)
assert out.strip().split()[-1] == "1000", out
# It's a detached actor, so it should still be there
assert len(kv._internal_kv_list("gcs://")) == 1
assert len(list(Path(runtime_env_dir).iterdir())) == 2
pkg_dir = [f for f in Path(runtime_env_dir).glob("*") if f.is_dir()][0]
sys.path.insert(0, str(pkg_dir))
test_actor = ray.get_actor("test_actor")
assert sum(ray.get([test_actor.one.remote()] * 1000)) == 1000
ray.kill(test_actor)
time.sleep(5)
assert len(list(Path(runtime_env_dir).iterdir())) == 1
assert len(kv._internal_kv_list("gcs://")) == 0
@pytest.mark.skipif(sys.platform == "win32", reason="Fail to create temp dir.")
def test_util_without_job_config(shutdown_only):
from ray.cluster_utils import Cluster
with tempfile.TemporaryDirectory() as tmp_dir:
with (Path(tmp_dir) / "lib.py").open("w") as f:
f.write("""
def one():
return 1
""")
old_dir = os.getcwd()
os.chdir(tmp_dir)
cluster = Cluster()
cluster.add_node(num_cpus=1)
ray.init(address=cluster.address)
address, env, runtime_env_dir = start_client_server(cluster, True)
script = f"""
import ray
import ray.util
import os
ray.util.connect("{address}", job_config=None)
@ray.remote
def run():
from lib import one
return one()
print(ray.get([run.remote()])[0])
"""
out = run_string_as_driver(script, env)
print(out)
os.chdir(old_dir)
@pytest.mark.skipif(sys.platform == "win32", reason="Fail to create temp dir.")
def test_init(shutdown_only):
with tempfile.TemporaryDirectory() as tmp_dir:
old_dir = os.getcwd()
os.chdir(tmp_dir)
with open("hello", "w") as f:
f.write("world")
ray.init(runtime_env={"working_dir": "."})
@ray.remote
class Test:
def test(self):
with open("hello") as f:
return f.read()
t = Test.remote()
assert ray.get(t.test.remote()) == "world"
os.chdir(old_dir)
def test_get_wheel_filename():
ray_version = "2.0.0.dev0"
@@ -763,36 +121,6 @@ def test_container_option_serialize():
assert job_config_serialized.count(b"image") == 1
def test_working_dir_override_failure(shutdown_only):
ray.init()
with pytest.raises(ValueError):
@ray.remote(runtime_env={"working_dir": "."})
def f():
pass
@ray.remote
def g():
pass
with pytest.raises(ValueError):
g.options(runtime_env={"working_dir": "."})
with pytest.raises(ValueError):
@ray.remote(runtime_env={"working_dir": "."})
class A:
pass
@ray.remote
class B:
pass
with pytest.raises(ValueError):
B.options(runtime_env={"working_dir": "."})
@pytest.mark.skipif(
sys.platform == "win32", reason="runtime_env unsupported on Windows.")
def test_invalid_conda_env(shutdown_only):
@@ -881,51 +209,6 @@ def test_no_spurious_worker_startup(shutdown_only):
assert got_num_workers, "failed to read num workers for 10 seconds"
@pytest.mark.skipif(sys.platform == "win32", reason="Fail to create temp dir.")
def test_large_file_boundary(shutdown_only):
with tempfile.TemporaryDirectory() as tmp_dir:
old_dir = os.getcwd()
os.chdir(tmp_dir)
# Check that packages just under the max size work as expected.
size = GCS_STORAGE_MAX_SIZE - 1024 * 1024
with open("test_file", "wb") as f:
f.write(os.urandom(size))
ray.init(runtime_env={"working_dir": "."})
@ray.remote
class Test:
def get_size(self):
with open("test_file", "rb") as f:
return len(f.read())
t = Test.remote()
assert ray.get(t.get_size.remote()) == size
os.chdir(old_dir)
@pytest.mark.skipif(sys.platform == "win32", reason="Fail to create temp dir.")
def test_large_file_error(shutdown_only):
with tempfile.TemporaryDirectory() as tmp_dir:
old_dir = os.getcwd()
os.chdir(tmp_dir)
# Write to two separate files, each of which is below the threshold to
# make sure the error is for the full package size.
size = GCS_STORAGE_MAX_SIZE // 2 + 1
with open("test_file_1", "wb") as f:
f.write(os.urandom(size))
with open("test_file_2", "wb") as f:
f.write(os.urandom(size))
with pytest.raises(RuntimeError):
ray.init(runtime_env={"working_dir": "."})
os.chdir(old_dir)
if __name__ == "__main__":
import sys
sys.exit(pytest.main(["-sv", __file__]))

View file

@@ -920,38 +920,6 @@ def test_runtime_env_override(call_ray_start):
ray.shutdown()
@pytest.mark.skipif(
os.environ.get("CI") and sys.platform != "linux",
reason="This test is only run on linux CI machines.")
def test_runtime_env_inheritance_regression(shutdown_only):
# https://github.com/ray-project/ray/issues/16479
with tempfile.TemporaryDirectory() as tmpdir, chdir(tmpdir):
with open("hello", "w") as f:
f.write("world")
ray.init(runtime_env={"working_dir": "."})
# Make sure we aren't reading the original file.
os.unlink("hello")
@ray.remote
class Test:
def f(self):
return open("hello").read()
# Passing working_dir URI through directly should work.
env1 = ray.get_runtime_context().runtime_env
assert "working_dir" in env1
t = Test.options(runtime_env=env1).remote()
assert ray.get(t.f.remote()) == "world"
# Passing a local directory should not work.
env2 = ray.get_runtime_context().runtime_env
env2["working_dir"] = "."
with pytest.raises(ValueError):
t = Test.options(runtime_env=env2).remote()
@pytest.mark.skipif(
os.environ.get("CI") and sys.platform != "linux",
reason="This test is only run on linux CI machines.")

View file

@@ -0,0 +1,687 @@
from contextlib import contextmanager
import os
from pathlib import Path
import sys
import tempfile
import pytest
from pytest_lazyfixture import lazy_fixture
import ray
import ray.experimental.internal_kv as kv
from ray._private.test_utils import wait_for_condition
from ray._private.runtime_env.packaging import GCS_STORAGE_MAX_SIZE
S3_PACKAGE_URI = "s3://runtime-env-test/remote_runtime_env.zip"
@pytest.fixture(scope="function", params=["ray_client", "no_ray_client"])
def start_cluster(ray_start_cluster, request):
assert request.param in {"ray_client", "no_ray_client"}
use_ray_client: bool = request.param == "ray_client"
cluster = ray_start_cluster
cluster.add_node(num_cpus=4)
if use_ray_client:
cluster.head_node._ray_params.ray_client_server_port = "10003"
cluster.head_node.start_ray_client_server()
address = "ray://localhost:10003"
else:
address = cluster.address
yield cluster, address
@pytest.fixture(scope="function")
def tmp_working_dir():
with tempfile.TemporaryDirectory() as tmp_dir:
path = Path(tmp_dir)
hello_file = path / "hello"
with hello_file.open(mode="w") as f:
f.write("world")
module_path = path / "test_module"
module_path.mkdir(parents=True)
test_file = module_path / "test.py"
with test_file.open(mode="w") as f:
f.write("def one():\n")
f.write(" return 1\n")
init_file = module_path / "__init__.py"
with init_file.open(mode="w") as f:
f.write("from test_module.test import one\n")
yield tmp_dir
@pytest.mark.parametrize("test_failure", [True, False])
@pytest.mark.skipif(sys.platform == "win32", reason="Fail to create temp dir.")
def test_lazy_reads(start_cluster, tmp_working_dir, test_failure):
"""Tests the case where we lazily read files or import inside a task/actor.
This tests both that this fails *without* the working_dir and that it
passes with it.
"""
cluster, address = start_cluster
if test_failure:
# Don't pass working_dir, so it should fail!
ray.init(address)
else:
ray.init(address, runtime_env={"working_dir": tmp_working_dir})
@ray.remote
def test_import():
import test_module
return test_module.one()
if test_failure:
with pytest.raises(ImportError):
ray.get(test_import.remote())
else:
assert ray.get(test_import.remote()) == 1
@ray.remote
def test_read():
return open("hello").read()
if test_failure:
with pytest.raises(FileNotFoundError):
ray.get(test_read.remote())
else:
assert ray.get(test_read.remote()) == "world"
@ray.remote
class Actor:
def test_import(self):
import test_module
return test_module.one()
def test_read(self):
return open("hello").read()
a = Actor.remote()
if test_failure:
with pytest.raises(ImportError):
assert ray.get(a.test_import.remote()) == 1
with pytest.raises(FileNotFoundError):
assert ray.get(a.test_read.remote()) == "world"
else:
assert ray.get(a.test_import.remote()) == 1
assert ray.get(a.test_read.remote()) == "world"
@pytest.mark.parametrize("test_failure", [True, False])
@pytest.mark.skipif(sys.platform == "win32", reason="Fail to create temp dir.")
def test_captured_import(start_cluster, tmp_working_dir, test_failure):
"""Tests importing a module in the driver and capturing it in a task/actor.
This tests both that this fails *without* the working_dir and that it
passes with it.
"""
cluster, address = start_cluster
if test_failure:
# Don't pass working_dir, so it should fail!
ray.init(address)
else:
ray.init(address, runtime_env={"working_dir": tmp_working_dir})
# Import in the driver.
sys.path.insert(0, tmp_working_dir)
import test_module
@ray.remote
def test_import():
return test_module.one()
if test_failure:
with pytest.raises(Exception):
ray.get(test_import.remote())
else:
assert ray.get(test_import.remote()) == 1
@ray.remote
class Actor:
def test_import(self):
return test_module.one()
def test_read(self):
return open("hello").read()
if test_failure:
with pytest.raises(Exception):
a = Actor.remote()
assert ray.get(a.test_import.remote()) == 1
else:
a = Actor.remote()
assert ray.get(a.test_import.remote()) == 1
@pytest.mark.skipif(sys.platform == "win32", reason="Fail to create temp dir.")
def test_empty_working_dir(start_cluster):
"""Tests the case where we pass an empty directory as the working_dir."""
cluster, address = start_cluster
with tempfile.TemporaryDirectory() as working_dir:
ray.init(address, runtime_env={"working_dir": working_dir})
@ray.remote
def listdir():
return os.listdir()
assert len(ray.get(listdir.remote())) == 0
@ray.remote
class A:
def listdir(self):
return os.listdir()
a = A.remote()
assert len(ray.get(a.listdir.remote())) == 0
@pytest.mark.skipif(sys.platform == "win32", reason="Fail to create temp dir.")
def test_invalid_working_dir(start_cluster):
"""Tests input validation for the working_dir."""
cluster, address = start_cluster
with pytest.raises(TypeError):
ray.init(address, runtime_env={"working_dir": 10})
ray.shutdown()
with pytest.raises(ValueError):
ray.init(address, runtime_env={"working_dir": "/does/not/exist"})
ray.shutdown()
with pytest.raises(ValueError):
ray.init(address, runtime_env={"working_dir": "does_not_exist"})
ray.shutdown()
with pytest.raises(ValueError):
ray.init(address, runtime_env={"working_dir": "s3://no_dot_zip"})
@pytest.mark.skipif(sys.platform == "win32", reason="Fail to create temp dir.")
@pytest.mark.parametrize("test_failure", [True, False])
@pytest.mark.parametrize("per_task_actor", [True, False])
def test_s3_uri(start_cluster, test_failure, per_task_actor):
"""Tests the case where we lazily read files or import inside a task/actor.
In this case, the files come from an S3 URI.
This tests both that this fails *without* the working_dir and that it
passes with it.
"""
cluster, address = start_cluster
env = {"working_dir": S3_PACKAGE_URI}
if test_failure or per_task_actor:
ray.init(address)
else:
ray.init(address, runtime_env=env)
@ray.remote
def test_import():
import test_module
return test_module.one()
if not test_failure and per_task_actor:
test_import = test_import.options(runtime_env=env)
if test_failure:
with pytest.raises(ImportError):
ray.get(test_import.remote())
else:
assert ray.get(test_import.remote()) == 2
@ray.remote
class Actor:
def test_import(self):
import test_module
return test_module.one()
if not test_failure and per_task_actor:
Actor = Actor.options(runtime_env=env)
a = Actor.remote()
if test_failure:
with pytest.raises(ImportError):
assert ray.get(a.test_import.remote()) == 2
else:
assert ray.get(a.test_import.remote()) == 2
@pytest.mark.skipif(sys.platform == "win32", reason="Fail to create temp dir.")
@pytest.mark.parametrize(
"working_dir",
[S3_PACKAGE_URI, lazy_fixture("tmp_working_dir")])
def test_multi_node(start_cluster, working_dir):
"""Tests that the working_dir is propagated across multi-node clusters."""
NUM_NODES = 3
cluster, address = start_cluster
for _ in range(NUM_NODES - 1): # Head node already added.
cluster.add_node(num_cpus=1)
ray.init(address, runtime_env={"working_dir": working_dir})
@ray.remote
def check_and_get_node_id():
import test_module
test_module.one()
return ray.get_runtime_context().node_id
object_refs = [check_and_get_node_id.remote() for _ in range(10000)]
assert len(set(ray.get(object_refs))) == NUM_NODES
def check_internal_kv_gced():
return len(kv._internal_kv_list("gcs://")) == 0
def check_local_files_gced(cluster):
for node in cluster.list_all_nodes():
all_files = os.listdir(node.get_runtime_env_dir_path())
# Check that there are no files remaining except for .lock files.
# TODO(edoakes): the lock files should get cleaned up too!
if len(list(filter(lambda f: not f.endswith(".lock"), all_files))) > 0:
return False
return True
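These two predicates are polled with wait_for_condition (imported above), which retries a callable until it returns True, raising if a timeout elapses first. A minimal usage sketch, mirroring the pattern in the GC tests below:

# After ray.shutdown(), both the GCS packages and the per-node local
# working_dir directories should eventually be cleaned up.
wait_for_condition(check_internal_kv_gced)
wait_for_condition(lambda: check_local_files_gced(cluster))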
@pytest.mark.skipif(sys.platform == "win32", reason="Fail to create temp dir.")
@pytest.mark.parametrize(
"working_dir",
[S3_PACKAGE_URI, lazy_fixture("tmp_working_dir")])
def test_job_level_gc(start_cluster, working_dir):
"""Tests that job-level working_dir is GC'd when the job exits."""
NUM_NODES = 3
cluster, address = start_cluster
for _ in range(NUM_NODES - 1): # Head node already added.
cluster.add_node(num_cpus=1)
ray.init(address, runtime_env={"working_dir": working_dir})
# For a local directory, the package should be in the GCS.
# For an S3 URI, there should be nothing in the GCS because
# it will be downloaded from S3 directly on each node.
if working_dir == S3_PACKAGE_URI:
assert check_internal_kv_gced()
else:
assert not check_internal_kv_gced()
@ray.remote
class A:
def test_import(self):
import test_module
test_module.one()
actors = [A.remote() for _ in range(5)]
ray.get([a.test_import.remote() for a in actors])
@ray.remote
def test_import():
import test_module
test_module.one()
ray.get([test_import.remote() for _ in range(10000)])
if working_dir == S3_PACKAGE_URI:
assert check_internal_kv_gced()
else:
assert not check_internal_kv_gced()
assert not check_local_files_gced(cluster)
ray.shutdown()
# Need to re-connect to use internal_kv.
ray.init(address=address)
wait_for_condition(check_internal_kv_gced)
wait_for_condition(lambda: check_local_files_gced(cluster))
# TODO(edoakes): fix this bug and enable test.
@pytest.mark.skip("Currently failing.")
@pytest.mark.skipif(sys.platform == "win32", reason="Fail to create temp dir.")
def test_actor_level_gc(start_cluster):
"""Tests that actor-level working_dir is GC'd when the actor exits."""
NUM_NODES = 3
cluster, address = start_cluster
for _ in range(NUM_NODES - 1): # Head node already added.
cluster.add_node(num_cpus=1)
ray.init(address)
@ray.remote
class A:
def check(self):
assert "test_module" in os.listdir()
# TODO(edoakes): this doesn't work in decorator with ray client.
A = A.options(runtime_env={"working_dir": S3_PACKAGE_URI})
actors = [A.remote() for _ in range(5)]
ray.get([a.check.remote() for a in actors])
assert not check_local_files_gced(cluster)
[ray.kill(a) for a in actors]
wait_for_condition(lambda: check_local_files_gced(cluster))
@pytest.mark.skipif(sys.platform == "win32", reason="Fail to create temp dir.")
@pytest.mark.parametrize(
"working_dir",
[S3_PACKAGE_URI, lazy_fixture("tmp_working_dir")])
def test_detached_actor_gc(start_cluster, working_dir):
"""Tests that URIs for detached actors are GC'd only when they exit."""
cluster, address = start_cluster
ray.init(
address, namespace="test", runtime_env={"working_dir": working_dir})
# For a local directory, the package should be in the GCS.
# For an S3 URI, there should be nothing in the GCS because
# it will be downloaded from S3 directly on each node.
if working_dir == S3_PACKAGE_URI:
assert check_internal_kv_gced()
else:
assert not check_internal_kv_gced()
@ray.remote
class A:
def test_import(self):
import test_module
test_module.one()
a = A.options(name="test", lifetime="detached").remote()
ray.get(a.test_import.remote())
if working_dir == S3_PACKAGE_URI:
assert check_internal_kv_gced()
else:
assert not check_internal_kv_gced()
assert not check_local_files_gced(cluster)
ray.shutdown()
ray.init(address, namespace="test")
if working_dir == S3_PACKAGE_URI:
assert check_internal_kv_gced()
else:
assert not check_internal_kv_gced()
assert not check_local_files_gced(cluster)
a = ray.get_actor("test")
ray.get(a.test_import.remote())
ray.kill(a)
wait_for_condition(check_internal_kv_gced)
wait_for_condition(lambda: check_local_files_gced(cluster))
@pytest.mark.skipif(sys.platform == "win32", reason="Fail to create temp dir.")
def test_exclusion(start_cluster, tmp_working_dir):
"""Tests various forms of the 'excludes' parameter."""
cluster, address = start_cluster
def create_file(p):
if not p.parent.exists():
p.parent.mkdir(parents=True)
with p.open("w") as f:
f.write("Test")
working_path = Path(tmp_working_dir)
create_file(working_path / "test1")
create_file(working_path / "test2")
create_file(working_path / "test3")
create_file(working_path / "tmp_dir" / "test_1")
create_file(working_path / "tmp_dir" / "test_2")
create_file(working_path / "tmp_dir" / "test_3")
create_file(working_path / "tmp_dir" / "sub_dir" / "test_1")
create_file(working_path / "tmp_dir" / "sub_dir" / "test_2")
create_file(working_path / "cache" / "test_1")
create_file(working_path / "tmp_dir" / "cache" / "test_1")
create_file(working_path / "another_dir" / "cache" / "test_1")
# Test that all files are present without excluding.
ray.init(address, runtime_env={"working_dir": tmp_working_dir})
@ray.remote
def check_file(name):
try:
with open(name) as f:
return f.read()
except Exception:
return "FAILED"
def get_all():
return ray.get([
check_file.remote("test1"),
check_file.remote("test2"),
check_file.remote("test3"),
check_file.remote(os.path.join("tmp_dir", "test_1")),
check_file.remote(os.path.join("tmp_dir", "test_2")),
check_file.remote(os.path.join("tmp_dir", "test_3")),
check_file.remote(os.path.join("tmp_dir", "sub_dir", "test_1")),
check_file.remote(os.path.join("tmp_dir", "sub_dir", "test_2")),
check_file.remote(os.path.join("cache", "test_1")),
check_file.remote(os.path.join("tmp_dir", "cache", "test_1")),
check_file.remote(os.path.join("another_dir", "cache", "test_1")),
])
assert get_all() == [
"Test", "Test", "Test", "Test", "Test", "Test", "Test", "Test", "Test",
"Test", "Test"
]
ray.shutdown()
# Test various exclusion methods.
ray.init(
address,
runtime_env={
"working_dir": tmp_working_dir,
"excludes": [
# exclude by relative path
"test2",
# exclude by dir
str(Path("tmp_dir") / "sub_dir"),
# exclude part of the dir
str(Path("tmp_dir") / "test_1"),
# exclude part of the dir
str(Path("tmp_dir") / "test_2"),
]
})
assert get_all() == [
"Test", "FAILED", "Test", "FAILED", "FAILED", "Test", "FAILED",
"FAILED", "Test", "Test", "Test"
]
ray.shutdown()
# Test excluding all files using gitignore pattern matching syntax
ray.init(
address,
runtime_env={
"working_dir": tmp_working_dir,
"excludes": ["*"]
})
assert get_all() == [
"FAILED", "FAILED", "FAILED", "FAILED", "FAILED", "FAILED", "FAILED",
"FAILED", "FAILED", "FAILED", "FAILED"
]
ray.shutdown()
# Test excluding with a .gitignore file.
with open(f"{tmp_working_dir}/.gitignore", "w") as f:
f.write("""
# Comment
test_[12]
/test1
!/tmp_dir/sub_dir/test_1
cache/
""")
ray.init(
address, runtime_env={
"working_dir": tmp_working_dir,
})
assert get_all() == [
"FAILED", "Test", "Test", "FAILED", "FAILED", "Test", "Test", "FAILED",
"FAILED", "FAILED", "FAILED"
]
@pytest.mark.skipif(sys.platform == "win32", reason="Fail to create temp dir.")
@pytest.mark.parametrize(
"working_dir",
[S3_PACKAGE_URI, lazy_fixture("tmp_working_dir")])
def test_runtime_context(start_cluster, working_dir):
"""Tests that the working_dir is propagated in the runtime_context."""
cluster, address = start_cluster
ray.init(runtime_env={"working_dir": working_dir})
def check():
wd = ray.get_runtime_context().runtime_env["working_dir"]
if working_dir == S3_PACKAGE_URI:
assert wd == S3_PACKAGE_URI
else:
assert wd.startswith("gcs://_ray_pkg_")
check()
@ray.remote
def task():
check()
ray.get(task.remote())
@ray.remote
class Actor:
def check(self):
check()
a = Actor.remote()
ray.get(a.check.remote())
def test_override_failure(shutdown_only):
"""Tests invalid override behaviors."""
ray.init()
with pytest.raises(ValueError):
@ray.remote(runtime_env={"working_dir": "."})
def f():
pass
@ray.remote
def g():
pass
with pytest.raises(ValueError):
g.options(runtime_env={"working_dir": "."})
with pytest.raises(ValueError):
@ray.remote(runtime_env={"working_dir": "."})
class A:
pass
@ray.remote
class B:
pass
with pytest.raises(ValueError):
B.options(runtime_env={"working_dir": "."})
@contextmanager
def chdir(d: str):
old_dir = os.getcwd()
os.chdir(d)
yield
os.chdir(old_dir)
@pytest.mark.skipif(sys.platform == "win32", reason="Fail to create temp dir.")
def test_inheritance(start_cluster):
"""Tests that child tasks/actors inherit URIs properly."""
cluster, address = start_cluster
with tempfile.TemporaryDirectory() as tmpdir, chdir(tmpdir):
with open("hello", "w") as f:
f.write("world")
ray.init(address, runtime_env={"working_dir": "."})
# Make sure we aren't reading the original file.
os.unlink("hello")
@ray.remote
class Test:
def f(self):
return open("hello").read()
# Passing working_dir URI through directly should work.
env1 = ray.get_runtime_context().runtime_env
assert "working_dir" in env1
t = Test.options(runtime_env=env1).remote()
assert ray.get(t.f.remote()) == "world"
# Passing a local directory should not work.
env2 = ray.get_runtime_context().runtime_env
env2["working_dir"] = "."
with pytest.raises(ValueError):
t = Test.options(runtime_env=env2).remote()
@pytest.mark.skipif(sys.platform == "win32", reason="Fail to create temp dir.")
def test_large_file_boundary(shutdown_only):
"""Check that packages just under the max size work as expected."""
with tempfile.TemporaryDirectory() as tmp_dir, chdir(tmp_dir):
size = GCS_STORAGE_MAX_SIZE - 1024 * 1024
with open("test_file", "wb") as f:
f.write(os.urandom(size))
ray.init(runtime_env={"working_dir": "."})
@ray.remote
class Test:
def get_size(self):
with open("test_file", "rb") as f:
return len(f.read())
t = Test.remote()
assert ray.get(t.get_size.remote()) == size
@pytest.mark.skipif(sys.platform == "win32", reason="Fail to create temp dir.")
def test_large_file_error(shutdown_only):
with tempfile.TemporaryDirectory() as tmp_dir, chdir(tmp_dir):
# Write to two separate files, each of which is below the threshold to
# make sure the error is for the full package size.
size = GCS_STORAGE_MAX_SIZE // 2 + 1
with open("test_file_1", "wb") as f:
f.write(os.urandom(size))
with open("test_file_2", "wb") as f:
f.write(os.urandom(size))
with pytest.raises(RuntimeError):
ray.init(runtime_env={"working_dir": "."})
if __name__ == "__main__":
sys.exit(pytest.main(["-sv", __file__]))