From ecb94b3fe908c7bd59a89787dee50f8ea891cc19 Mon Sep 17 00:00:00 2001
From: Yi Cheng <74173148+iycheng@users.noreply.github.com>
Date: Fri, 2 Apr 2021 12:03:21 -0700
Subject: [PATCH] Add test case to check job conf compatible issue (#15082)

Add tests in which a second driver connects while a first driver is
still running, for three combinations of runtime_env (none/some,
some/none, some/other). Supporting changes:

- run_string_as_driver_nonblocking takes an optional env that is passed
  to subprocess.Popen.
- InitResponse gains `ok` and `msg` fields. The client server reports a
  runtime-env URI mismatch through them instead of raising
  grpc.RpcError, and the client worker logs the message and raises
  IOError when `ok` is false.
- The client worker saves runtime_env.PKG_DIR before pointing it at the
  temporary directory and restores it after the upload.
- connect() forwards connection_retries instead of a hard-coded 3.
---
 python/ray/test_utils.py                |  5 +-
 python/ray/tests/test_runtime_env.py    | 94 ++++++++++++++++++++++---
 python/ray/util/client/server/server.py |  7 +-
 python/ray/util/client/worker.py        |  9 ++-
 python/ray/util/client_connect.py       |  2 +-
 src/ray/protobuf/ray_client.proto       |  2 +
 6 files changed, 101 insertions(+), 18 deletions(-)

diff --git a/python/ray/test_utils.py b/python/ray/test_utils.py
index c63bbff4f..2d6401444 100644
--- a/python/ray/test_utils.py
+++ b/python/ray/test_utils.py
@@ -205,7 +205,7 @@ def run_string_as_driver(driver_script: str, env: Dict = None):
     return out
 
 
-def run_string_as_driver_nonblocking(driver_script):
+def run_string_as_driver_nonblocking(driver_script, env: Dict = None):
     """Start a driver as a separate process and return immediately.
 
     Args:
@@ -225,7 +225,8 @@ def run_string_as_driver_nonblocking(driver_script):
         [sys.executable, "-c", script],
         stdin=subprocess.PIPE,
         stdout=subprocess.PIPE,
-        stderr=subprocess.PIPE)
+        stderr=subprocess.PIPE,
+        env=env)
     proc.stdin.write(driver_script.encode("ascii"))
     proc.stdin.close()
     return proc
diff --git a/python/ray/tests/test_runtime_env.py b/python/ray/tests/test_runtime_env.py
index e9682c08c..fc26ce3e2 100644
--- a/python/ray/tests/test_runtime_env.py
+++ b/python/ray/tests/test_runtime_env.py
@@ -9,11 +9,13 @@ import tempfile
 from unittest import mock
 from pathlib import Path
 import ray
-from ray.test_utils import run_string_as_driver
+from ray.test_utils import (run_string_as_driver,
+                            run_string_as_driver_nonblocking)
 from ray._private.utils import get_conda_env_dir, get_conda_bin_executable
 from ray.job_config import JobConfig
-
+from time import sleep
 driver_script = """
+from time import sleep
 import sys
 import logging
 sys.path.insert(0, "{working_dir}")
@@ -26,12 +28,19 @@ job_config = ray.job_config.JobConfig(
     runtime_env={runtime_env}
 )
 
-if os.environ.get("USE_RAY_CLIENT"):
-    ray.util.connect("{address}", job_config=job_config)
-else:
-    ray.init(address="{address}",
-             job_config=job_config,
-             logging_level=logging.DEBUG)
+if not job_config.runtime_env:
+    job_config=None
+
+try:
+    if os.environ.get("USE_RAY_CLIENT"):
+        ray.util.connect("{address}", job_config=job_config)
+    else:
+        ray.init(address="{address}",
+                 job_config=job_config,
+                 logging_level=logging.DEBUG)
+except Exception:
+    print("ERROR")
+    sys.exit(0)
 
 @ray.remote
 def run_test():
@@ -49,7 +58,6 @@ if os.environ.get("USE_RAY_CLIENT"):
     ray.util.disconnect()
 else:
     ray.shutdown()
-from time import sleep
 sleep(10)
 """
 
@@ -185,6 +193,74 @@ print(sum(ray.get([test_actor.one.remote()] * 1000)))
     assert len(list(Path(PKG_DIR).iterdir())) == 1
 
 
+@unittest.skipIf(sys.platform == "win32", "Fail to create temp dir.")
+def test_jobconfig_compatible_1(ray_start_cluster_head, working_dir):
+    # start job_config=None
+    # start job_config=something
+    cluster = ray_start_cluster_head
+    (address, env, PKG_DIR) = start_client_server(cluster, True)
+    runtime_env = None
+    execute_statement = """
+sleep(600)
+"""
+    script = driver_script.format(**locals())
+    # Have one running with job config = None
+    proc = run_string_as_driver_nonblocking(script, env)
+    # waiting it to be up
+    sleep(5)
+    runtime_env = f"""{{ "working_dir": "{working_dir}" }}"""
+    execute_statement = "print(sum(ray.get([run_test.remote()] * 1000)))"
+    script = driver_script.format(**locals())
+    out = run_string_as_driver(script, env)
+    proc.kill()
+    proc.wait()
+    assert out.strip().split()[-1] == "ERROR"
+
+
+@unittest.skipIf(sys.platform == "win32", "Fail to create temp dir.")
+def test_jobconfig_compatible_2(ray_start_cluster_head, working_dir):
+    # start job_config=something
+    # start job_config=None
+    cluster = ray_start_cluster_head
+    (address, env, PKG_DIR) = start_client_server(cluster, True)
+    runtime_env = """{ "py_modules": [test_module.__path__[0]] }"""
+    execute_statement = """
+sleep(600)
+"""
+    script = driver_script.format(**locals())
+    proc = run_string_as_driver_nonblocking(script, env)
+    sleep(5)
+    runtime_env = None
+    execute_statement = "print('OK')"
+    script = driver_script.format(**locals())
+    out = run_string_as_driver(script, env)
+    proc.kill()
+    proc.wait()
+    assert out.strip().split()[-1] == "OK"
+
+
+@unittest.skipIf(sys.platform == "win32", "Fail to create temp dir.")
+def test_jobconfig_compatible_3(ray_start_cluster_head, working_dir):
+    # start job_config=something
+    # start job_config=something else
+    cluster = ray_start_cluster_head
+    (address, env, PKG_DIR) = start_client_server(cluster, True)
+    runtime_env = """{ "py_modules": [test_module.__path__[0]] }"""
+    execute_statement = """
+sleep(600)
+"""
+    script = driver_script.format(**locals())
+    proc = run_string_as_driver_nonblocking(script, env)
+    sleep(5)
+    runtime_env = f"""{{ "working_dir": test_module.__path__[0] }}"""
+    execute_statement = "print('OK')"
+    script = driver_script.format(**locals())
+    out = run_string_as_driver(script, env)
+    proc.kill()
+    proc.wait()
+    assert out.strip().split()[-1] == "ERROR"
+
+
 @pytest.fixture(scope="session")
 def conda_envs():
     """Creates two copies of current conda env with different tf versions."""
diff --git a/python/ray/util/client/server/server.py b/python/ray/util/client/server/server.py
index 09ef15e6b..fe3b71654 100644
--- a/python/ray/util/client/server/server.py
+++ b/python/ray/util/client/server/server.py
@@ -76,11 +76,12 @@ class RayletServicer(ray_client_pb2_grpc.RayletDriverServicer):
         # runtime env is compatible.
         if current_job_config and set(job_config.runtime_env.uris) != set(
                 current_job_config.runtime_env.uris):
-            raise grpc.RpcError(
-                "Runtime environment doesn't match "
+            return ray_client_pb2.InitResponse(
+                ok=False,
+                msg="Runtime environment doesn't match "
                 f"request one {job_config.runtime_env.uris} "
                 f"current one {current_job_config.runtime_env.uris}")
-        return ray_client_pb2.InitResponse()
+        return ray_client_pb2.InitResponse(ok=True)
 
     def PrepRuntimeEnv(self, request,
                        context=None) -> ray_client_pb2.PrepRuntimeEnvResponse:
diff --git a/python/ray/util/client/worker.py b/python/ray/util/client/worker.py
index 515f4c80d..da4b22bf8 100644
--- a/python/ray/util/client/worker.py
+++ b/python/ray/util/client/worker.py
@@ -449,14 +449,17 @@ class Worker:
             import ray._private.runtime_env as runtime_env
             import tempfile
             with tempfile.TemporaryDirectory() as tmp_dir:
-                if runtime_env.PKG_DIR is None:
-                    runtime_env.PKG_DIR = tmp_dir
+                (old_dir, runtime_env.PKG_DIR) = (runtime_env.PKG_DIR, tmp_dir)
                 # Generate the uri for runtime env
                 runtime_env.rewrite_working_dir_uri(job_config)
                 init_req = ray_client_pb2.InitRequest(
                     job_config=pickle.dumps(job_config))
-                self.data_client.Init(init_req)
+                init_resp = self.data_client.Init(init_req)
+                if not init_resp.ok:
+                    logger.error("Init failed due to: %s", init_resp.msg)
+                    raise IOError(init_resp.msg)
                 runtime_env.upload_runtime_env_package_if_needed(job_config)
+                runtime_env.PKG_DIR = old_dir
                 prep_req = ray_client_pb2.PrepRuntimeEnvRequest()
                 self.data_client.PrepRuntimeEnv(prep_req)
         except grpc.RpcError as e:
diff --git a/python/ray/util/client_connect.py b/python/ray/util/client_connect.py
index 542f539bb..d3733f8bf 100644
--- a/python/ray/util/client_connect.py
+++ b/python/ray/util/client_connect.py
@@ -30,7 +30,7 @@ def connect(conn_str: str,
         job_config=job_config,
         secure=secure,
         metadata=metadata,
-        connection_retries=3,
+        connection_retries=connection_retries,
         ignore_version=ignore_version)
 
 
diff --git a/src/ray/protobuf/ray_client.proto b/src/ray/protobuf/ray_client.proto
index e6863b33f..9b2ccc72d 100644
--- a/src/ray/protobuf/ray_client.proto
+++ b/src/ray/protobuf/ray_client.proto
@@ -243,6 +243,8 @@ message InitRequest {
 }
 
 message InitResponse {
+  bool ok = 1;
+  string msg = 2;
 }
 
 message PrepRuntimeEnvRequest {