Add test cases to check job config compatibility issues (#15082)

This commit is contained in:
Yi Cheng 2021-04-02 12:03:21 -07:00 committed by GitHub
parent 42565d5bbe
commit ecb94b3fe9
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 101 additions and 18 deletions

View file

@ -205,7 +205,7 @@ def run_string_as_driver(driver_script: str, env: Dict = None):
return out return out
def run_string_as_driver_nonblocking(driver_script): def run_string_as_driver_nonblocking(driver_script, env: Dict = None):
"""Start a driver as a separate process and return immediately. """Start a driver as a separate process and return immediately.
Args: Args:
@ -225,7 +225,8 @@ def run_string_as_driver_nonblocking(driver_script):
[sys.executable, "-c", script], [sys.executable, "-c", script],
stdin=subprocess.PIPE, stdin=subprocess.PIPE,
stdout=subprocess.PIPE, stdout=subprocess.PIPE,
stderr=subprocess.PIPE) stderr=subprocess.PIPE,
env=env)
proc.stdin.write(driver_script.encode("ascii")) proc.stdin.write(driver_script.encode("ascii"))
proc.stdin.close() proc.stdin.close()
return proc return proc

View file

@ -9,11 +9,13 @@ import tempfile
from unittest import mock from unittest import mock
from pathlib import Path from pathlib import Path
import ray import ray
from ray.test_utils import run_string_as_driver from ray.test_utils import (run_string_as_driver,
run_string_as_driver_nonblocking)
from ray._private.utils import get_conda_env_dir, get_conda_bin_executable from ray._private.utils import get_conda_env_dir, get_conda_bin_executable
from ray.job_config import JobConfig from ray.job_config import JobConfig
from time import sleep
driver_script = """ driver_script = """
from time import sleep
import sys import sys
import logging import logging
sys.path.insert(0, "{working_dir}") sys.path.insert(0, "{working_dir}")
@ -26,12 +28,19 @@ job_config = ray.job_config.JobConfig(
runtime_env={runtime_env} runtime_env={runtime_env}
) )
if os.environ.get("USE_RAY_CLIENT"): if not job_config.runtime_env:
ray.util.connect("{address}", job_config=job_config) job_config=None
else:
ray.init(address="{address}", try:
job_config=job_config, if os.environ.get("USE_RAY_CLIENT"):
logging_level=logging.DEBUG) ray.util.connect("{address}", job_config=job_config)
else:
ray.init(address="{address}",
job_config=job_config,
logging_level=logging.DEBUG)
except:
print("ERROR")
sys.exit(0)
@ray.remote @ray.remote
def run_test(): def run_test():
@ -49,7 +58,6 @@ if os.environ.get("USE_RAY_CLIENT"):
ray.util.disconnect() ray.util.disconnect()
else: else:
ray.shutdown() ray.shutdown()
from time import sleep
sleep(10) sleep(10)
""" """
@ -185,6 +193,74 @@ print(sum(ray.get([test_actor.one.remote()] * 1000)))
assert len(list(Path(PKG_DIR).iterdir())) == 1 assert len(list(Path(PKG_DIR).iterdir())) == 1
@unittest.skipIf(sys.platform == "win32", "Fail to create temp dir.")
def test_jobconfig_compatible_1(ray_start_cluster_head, working_dir):
    """Start a driver with no runtime env, then one with a runtime env.

    A long-running background driver is started with ``runtime_env = None``.
    A second driver is then started with a ``working_dir`` runtime env, and
    the test expects the last token of its output to be ``"ERROR"``.

    Args:
        ray_start_cluster_head: Fixture providing the cluster that is handed
            to ``start_client_server``.
        working_dir: Fixture providing the directory used as the second
            driver's ``working_dir``.
    """
    cluster = ray_start_cluster_head
    # NOTE: ``driver_script`` is rendered with ``format(**locals())``, so the
    # local variable names in this function are part of the template
    # contract and must not be renamed.
    (address, env, PKG_DIR) = start_client_server(cluster, True)
    runtime_env = None
    execute_statement = """
sleep(600)
"""
    script = driver_script.format(**locals())
    # Have one driver running with job config = None.
    proc = run_string_as_driver_nonblocking(script, env)
    try:
        # Give the background driver time to come up.
        sleep(5)
        runtime_env = f"""{{ "working_dir": "{working_dir}" }}"""
        execute_statement = "print(sum(ray.get([run_test.remote()] * 1000)))"
        script = driver_script.format(**locals())
        out = run_string_as_driver(script, env)
        assert out.strip().split()[-1] == "ERROR"
    finally:
        # Always reap the background driver, even when the assertion or the
        # second driver fails; otherwise it keeps sleeping for 600 seconds.
        proc.kill()
        proc.wait()
@unittest.skipIf(sys.platform == "win32", "Fail to create temp dir.")
def test_jobconfig_compatible_2(ray_start_cluster_head, working_dir):
    """Start a driver with a runtime env, then one with no runtime env.

    A long-running background driver is started with a ``py_modules``
    runtime env. A second driver is then started with ``runtime_env = None``,
    and the test expects the last token of its output to be ``"OK"``.

    Args:
        ray_start_cluster_head: Fixture providing the cluster that is handed
            to ``start_client_server``.
        working_dir: Fixture value; it is not referenced directly here but is
            available to the ``driver_script`` template through ``locals()``.
    """
    cluster = ray_start_cluster_head
    # NOTE: ``driver_script`` is rendered with ``format(**locals())``, so the
    # local variable names in this function are part of the template
    # contract and must not be renamed.
    (address, env, PKG_DIR) = start_client_server(cluster, True)
    runtime_env = """{ "py_modules": [test_module.__path__[0]] }"""
    execute_statement = """
sleep(600)
"""
    script = driver_script.format(**locals())
    # Have one driver running with a non-empty runtime env.
    proc = run_string_as_driver_nonblocking(script, env)
    try:
        # Give the background driver time to come up.
        sleep(5)
        runtime_env = None
        execute_statement = "print('OK')"
        script = driver_script.format(**locals())
        out = run_string_as_driver(script, env)
        assert out.strip().split()[-1] == "OK"
    finally:
        # Always reap the background driver, even when the assertion or the
        # second driver fails; otherwise it keeps sleeping for 600 seconds.
        proc.kill()
        proc.wait()
@unittest.skipIf(sys.platform == "win32", "Fail to create temp dir.")
def test_jobconfig_compatible_3(ray_start_cluster_head, working_dir):
    """Start a driver with one runtime env, then one with a different env.

    A long-running background driver is started with a ``py_modules``
    runtime env. A second driver is then started with a ``working_dir``
    runtime env, and the test expects the last token of its output to be
    ``"ERROR"``.

    Args:
        ray_start_cluster_head: Fixture providing the cluster that is handed
            to ``start_client_server``.
        working_dir: Fixture value; it is not referenced directly here but is
            available to the ``driver_script`` template through ``locals()``.
    """
    cluster = ray_start_cluster_head
    # NOTE: ``driver_script`` is rendered with ``format(**locals())``, so the
    # local variable names in this function are part of the template
    # contract and must not be renamed.
    (address, env, PKG_DIR) = start_client_server(cluster, True)
    runtime_env = """{ "py_modules": [test_module.__path__[0]] }"""
    execute_statement = """
sleep(600)
"""
    script = driver_script.format(**locals())
    # Have one driver running with the first runtime env.
    proc = run_string_as_driver_nonblocking(script, env)
    try:
        # Give the background driver time to come up.
        sleep(5)
        # Plain string: there is nothing to interpolate, the expression is
        # evaluated inside the driver script itself.
        runtime_env = """{ "working_dir": test_module.__path__[0] }"""
        execute_statement = "print('OK')"
        script = driver_script.format(**locals())
        out = run_string_as_driver(script, env)
    finally:
        # Always reap the background driver, even if the second driver
        # raises; otherwise it keeps sleeping for 600 seconds.
        proc.kill()
        proc.wait()
    assert out.strip().split()[-1] == "ERROR"
@pytest.fixture(scope="session") @pytest.fixture(scope="session")
def conda_envs(): def conda_envs():
"""Creates two copies of current conda env with different tf versions.""" """Creates two copies of current conda env with different tf versions."""

View file

@ -76,11 +76,12 @@ class RayletServicer(ray_client_pb2_grpc.RayletDriverServicer):
# runtime env is compatible. # runtime env is compatible.
if current_job_config and set(job_config.runtime_env.uris) != set( if current_job_config and set(job_config.runtime_env.uris) != set(
current_job_config.runtime_env.uris): current_job_config.runtime_env.uris):
raise grpc.RpcError( return ray_client_pb2.InitResponse(
"Runtime environment doesn't match " ok=False,
msg="Runtime environment doesn't match "
f"request one {job_config.runtime_env.uris} " f"request one {job_config.runtime_env.uris} "
f"current one {current_job_config.runtime_env.uris}") f"current one {current_job_config.runtime_env.uris}")
return ray_client_pb2.InitResponse() return ray_client_pb2.InitResponse(ok=True)
def PrepRuntimeEnv(self, request, def PrepRuntimeEnv(self, request,
context=None) -> ray_client_pb2.PrepRuntimeEnvResponse: context=None) -> ray_client_pb2.PrepRuntimeEnvResponse:

View file

@ -449,14 +449,17 @@ class Worker:
import ray._private.runtime_env as runtime_env import ray._private.runtime_env as runtime_env
import tempfile import tempfile
with tempfile.TemporaryDirectory() as tmp_dir: with tempfile.TemporaryDirectory() as tmp_dir:
if runtime_env.PKG_DIR is None: (old_dir, runtime_env.PKG_DIR) = (runtime_env.PKG_DIR, tmp_dir)
runtime_env.PKG_DIR = tmp_dir
# Generate the uri for runtime env # Generate the uri for runtime env
runtime_env.rewrite_working_dir_uri(job_config) runtime_env.rewrite_working_dir_uri(job_config)
init_req = ray_client_pb2.InitRequest( init_req = ray_client_pb2.InitRequest(
job_config=pickle.dumps(job_config)) job_config=pickle.dumps(job_config))
self.data_client.Init(init_req) init_resp = self.data_client.Init(init_req)
if not init_resp.ok:
logger.error("Init failed due to: ", init_resp.msg)
raise IOError(init_resp.msg)
runtime_env.upload_runtime_env_package_if_needed(job_config) runtime_env.upload_runtime_env_package_if_needed(job_config)
runtime_env.PKG_DIR = old_dir
prep_req = ray_client_pb2.PrepRuntimeEnvRequest() prep_req = ray_client_pb2.PrepRuntimeEnvRequest()
self.data_client.PrepRuntimeEnv(prep_req) self.data_client.PrepRuntimeEnv(prep_req)
except grpc.RpcError as e: except grpc.RpcError as e:

View file

@ -30,7 +30,7 @@ def connect(conn_str: str,
job_config=job_config, job_config=job_config,
secure=secure, secure=secure,
metadata=metadata, metadata=metadata,
connection_retries=3, connection_retries=connection_retries,
ignore_version=ignore_version) ignore_version=ignore_version)

View file

@ -243,6 +243,8 @@ message InitRequest {
} }
message InitResponse { message InitResponse {
bool ok = 1;
string msg = 2;
} }
message PrepRuntimeEnvRequest { message PrepRuntimeEnvRequest {