[runtime env] [test] Enable runtime env nightly test with working_dir reconnection (#19906)

This commit is contained in:
architkulkarni 2021-10-31 08:48:48 -07:00 committed by GitHub
parent de8a9b5151
commit 702bffe072
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 51 additions and 25 deletions

View file

@ -72,17 +72,24 @@ def test_lazy_reads(start_cluster, tmp_working_dir, option: str):
"""
cluster, address = start_cluster
if option == "failure":
# Don't pass the files at all, so it should fail!
ray.init(address)
elif option == "working_dir":
ray.init(address, runtime_env={"working_dir": tmp_working_dir})
elif option == "py_modules":
ray.init(
address,
runtime_env={
"py_modules": [str(Path(tmp_working_dir) / "test_module")]
})
def call_ray_init():
if option == "failure":
# Don't pass the files at all, so it should fail!
ray.init(address)
elif option == "working_dir":
ray.init(address, runtime_env={"working_dir": tmp_working_dir})
elif option == "py_modules":
ray.init(
address,
runtime_env={
"py_modules": [str(Path(tmp_working_dir) / "test_module")]
})
call_ray_init()
def reinit():
ray.shutdown()
call_ray_init()
@ray.remote
def test_import():
@ -95,6 +102,8 @@ def test_lazy_reads(start_cluster, tmp_working_dir, option: str):
else:
assert ray.get(test_import.remote()) == 1
reinit()
@ray.remote
def test_read():
return open("hello").read()
@ -105,6 +114,8 @@ def test_lazy_reads(start_cluster, tmp_working_dir, option: str):
elif option == "working_dir":
assert ray.get(test_read.remote()) == "world"
reinit()
@ray.remote
class Actor:
def test_import(self):
@ -135,17 +146,26 @@ def test_captured_import(start_cluster, tmp_working_dir, option: str):
"""
cluster, address = start_cluster
if option == "failure":
# Don't pass the files at all, so it should fail!
ray.init(address)
elif option == "working_dir":
ray.init(address, runtime_env={"working_dir": tmp_working_dir})
elif option == "py_modules":
ray.init(
address,
runtime_env={
"py_modules": [os.path.join(tmp_working_dir, "test_module")]
})
def call_ray_init():
if option == "failure":
# Don't pass the files at all, so it should fail!
ray.init(address)
elif option == "working_dir":
ray.init(address, runtime_env={"working_dir": tmp_working_dir})
elif option == "py_modules":
ray.init(
address,
runtime_env={
"py_modules": [
os.path.join(tmp_working_dir, "test_module")
]
})
call_ray_init()
def reinit():
ray.shutdown()
call_ray_init()
# Import in the driver.
sys.path.insert(0, tmp_working_dir)
@ -161,6 +181,8 @@ def test_captured_import(start_cluster, tmp_working_dir, option: str):
else:
assert ray.get(test_import.remote()) == 1
reinit()
@ray.remote
class Actor:
def test_import(self):
@ -197,6 +219,10 @@ def test_empty_working_dir(start_cluster):
a = A.remote()
assert len(ray.get(a.listdir.remote())) == 0
# Test that we can reconnect with no errors
ray.shutdown()
ray.init(address, runtime_env={"working_dir": working_dir})
@pytest.mark.parametrize("option", ["working_dir", "py_modules"])
@pytest.mark.skipif(sys.platform == "win32", reason="Fail to create temp dir.")

View file

@ -189,8 +189,7 @@ NIGHTLY_TESTS = {
"serve_cluster_fault_tolerance",
],
"~/ray/release/runtime_env_tests/runtime_env_tests.yaml": [
"rte_many_tasks_actors",
"wheel_urls",
"rte_many_tasks_actors", "wheel_urls", "rte_ray_client"
],
}

View file

@ -69,7 +69,8 @@ if __name__ == "__main__":
addr = os.environ.get("RAY_ADDRESS")
job_name = os.environ.get("RAY_JOB_NAME", "rte_ray_client")
for use_working_dir in [True, False]:
# Test reconnecting to the same cluster multiple times.
for use_working_dir in [True, True, False, False]:
with tempfile.TemporaryDirectory() as tmpdir:
runtime_env = {"working_dir": tmpdir} if use_working_dir else None
print("Testing with use_working_dir=" + str(use_working_dir))