ray/release/runtime_env_tests/workloads/rte_ray_client.py

Ignoring revisions in .git-blame-ignore-revs. Click here to bypass and see the normal blame view.

93 lines
2.9 KiB
Python
Raw Normal View History

"""Runtime env test on Ray Client
This test installs runtime environments on a remote cluster using local
pip requirements.txt files. It is intended to be run using Anyscale connect.
This complements existing per-commit tests in CI, for which we don't have
access to a physical remote cluster.
Test owner: architkulkarni
Acceptance criteria: Should run through and print "PASSED"
"""
import argparse
import json
import os
import tempfile
import time
from pathlib import Path
import ray
def test_pip_requirements_files(tmpdir: str):
    """Exercise pip ``requirements.txt`` runtime envs on tasks and actors.

    Two requirements files pinning different ``requests`` versions are
    written into ``tmpdir``. One is attached through the ``@ray.remote``
    decorator and the other is supplied through ``.options``; the remote
    side must report the version requested through ``.options``.

    Args:
        tmpdir: Directory in which the requirements files are created.

    Raises:
        AssertionError: If a remote task or actor reports a ``requests``
            version other than the one requested via ``.options``.
    """
    requirements_dir = Path(tmpdir)

    def write_pip_env(file_name: str, requirement: str) -> dict:
        """Write one requirements file and return a runtime env pointing at it."""
        requirements_file = requirements_dir / file_name
        requirements_file.write_text(requirement)
        return {"pip": str(requirements_file)}

    env_18 = write_pip_env("runtime_env_pip_18.txt", "requests==2.18.0")
    env_16 = write_pip_env("runtime_env_pip_16.txt", "requests==2.16.0")

    # --- Tasks: decorator carries env_16, .options switches to env_18. ---
    @ray.remote(runtime_env=env_16)
    def get_version():
        # Imported inside the task so it resolves in the task's runtime env.
        import requests

        return requests.__version__

    # TODO(architkulkarni): Uncomment after #19002 is fixed
    # assert ray.get(get_version.remote()) == "2.16.0"
    task_version_ref = get_version.options(runtime_env=env_18).remote()
    assert ray.get(task_version_ref) == "2.18.0"

    # --- Actors: decorator carries env_18, .options switches to env_16. ---
    @ray.remote(runtime_env=env_18)
    class VersionActor:
        def get_version(self):
            import requests

            return requests.__version__

    # TODO(architkulkarni): Uncomment after #19002 is fixed
    # actor_18 = VersionActor.remote()
    # assert ray.get(actor_18.get_version.remote()) == "2.18.0"
    actor_16 = VersionActor.options(runtime_env=env_16).remote()
    actor_version_ref = actor_16.get_version.remote()
    assert ray.get(actor_version_ref) == "2.16.0"
if __name__ == "__main__":
    # Script entry point: run the pip runtime-env test against a cluster four
    # times (twice with a working_dir, twice without), then record the elapsed
    # wall-clock time as JSON and print "PASSED".
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--smoke-test", action="store_true", help="Finish quickly for testing."
    )
    # NOTE: the parsed flag is not read anywhere below; parsing still
    # validates the command line and keeps --smoke-test an accepted option.
    args = parser.parse_args()

    start = time.time()

    addr = os.environ.get("RAY_ADDRESS")
    job_name = os.environ.get("RAY_JOB_NAME", "rte_ray_client")

    # Test reconnecting to the same cluster multiple times.
    for use_working_dir in [True, True, False, False]:
        with tempfile.TemporaryDirectory() as tmpdir:
            runtime_env = {"working_dir": tmpdir} if use_working_dir else None
            print(f"Testing with use_working_dir={use_working_dir}")
            if addr is not None and addr.startswith("anyscale://"):
                ray.init(address=addr, job_name=job_name, runtime_env=runtime_env)
            else:
                ray.init(address="auto", runtime_env=runtime_env)

            # Always disconnect, even when the test fails, so the connection
            # is closed before the temporary directory is removed.
            try:
                test_pip_requirements_files(tmpdir)
            finally:
                ray.shutdown()

    taken = time.time() - start
    result = {
        "time_taken": taken,
    }

    # Output path comes from TEST_OUTPUT_JSON, with a /tmp fallback.
    test_output_json = os.environ.get("TEST_OUTPUT_JSON", "/tmp/rte_ray_client.json")
    with open(test_output_json, "wt") as f:
        json.dump(result, f)

    print("PASSED")