[ci/release] Always use full cluster address (#23067)
Using the short cluster address (without the project name) is deprecated and breaks Job usage for uploads/downloads: https://buildkite.com/ray-project/release-tests-branch/builds/135#2a03e47b-6a9a-42ff-9346-905725eb8d09
Parent: 07372927cc
Commit: a8bed94ed6
8 changed files with 32 additions and 27 deletions
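
For context, the two address forms at issue (names below are illustrative, not from the diff): the short form omits the project and is deprecated; this commit standardizes on the full form everywhere.

    # Illustrative sketch only; project and cluster names are made up.
    project_name = "release-tests"
    cluster_name = "nightly-cluster-1"

    full_address = f"anyscale://{project_name}/{cluster_name}"  # kept
    short_address = f"anyscale://{cluster_name}"                # deprecated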
@@ -1,4 +1,4 @@
-cloud_id: cld_17WvYIBBkdgLwEUNcLeRAE
+cloud_id: {{env["ANYSCALE_CLOUD_ID"]}}
 region: us-west-2
 
 aws:
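
A minimal sketch, assuming jinja2, of how a templated cloud_id like the one above resolves when the config is rendered (the cloud ID value here is made up):

    import jinja2

    env = {"ANYSCALE_CLOUD_ID": "cld_example123"}  # assumed example value
    content = 'cloud_id: {{env["ANYSCALE_CLOUD_ID"]}}'
    print(jinja2.Template(content).render(env=env))
    # cloud_id: cld_example123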
@@ -1,4 +1,5 @@
 import copy
+import os
 from typing import Optional, Dict
 
 from ray_release.buildkite.concurrency import CONCURRENY_GROUPS, get_concurrency_group
@@ -47,7 +48,11 @@ def get_step(
     step = copy.deepcopy(DEFAULT_STEP_TEMPLATE)
 
-    cmd = f"./release/run_release_test.sh \"{test['name']}\" --report"
+    cmd = f"./release/run_release_test.sh \"{test['name']}\" "
+
+    if not bool(int(os.environ.get("NO_REPORT_OVERRIDE", "0"))):
+        cmd += " --report"
 
     if smoke_test:
         cmd += " --smoke-test"
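
A runnable sketch of the new reporting gate introduced above: --report is still appended by default, but setting NO_REPORT_OVERRIDE=1 now suppresses it (the test name is a placeholder):

    import os

    cmd = './release/run_release_test.sh "example_test" '  # placeholder test name
    if not bool(int(os.environ.get("NO_REPORT_OVERRIDE", "0"))):
        cmd += " --report"
    print(cmd)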
@@ -70,7 +70,7 @@ class ClusterManager(abc.ABC):
     def terminate_cluster(self):
         raise NotImplementedError
 
-    def get_cluster_address(self, full: bool = True) -> str:
+    def get_cluster_address(self) -> str:
         raise NotImplementedError
 
     def get_cluster_url(self) -> Optional[str]:
@@ -125,9 +125,3 @@ class FullClusterManager(MinimalClusterManager):
         while result.result.state != "Terminated":
             time.sleep(1)
             result = self.sdk.get_cluster(self.cluster_id)
-
-    def get_cluster_address(self, full: bool = True) -> str:
-        if full:
-            return f"anyscale://{self.project_name}/{self.cluster_name}"
-        else:
-            return f"anyscale://{self.cluster_name}"
@@ -287,5 +287,5 @@ class MinimalClusterManager(ClusterManager):
     def terminate_cluster(self):
         pass
 
-    def get_cluster_address(self, full: bool = True) -> str:
-        return f"anyscale://{self.cluster_name}"
+    def get_cluster_address(self) -> str:
+        return f"anyscale://{self.project_name}/{self.cluster_name}"
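
A minimal sketch of the consolidated behavior after this change, using the method and attribute names from the hunks above (the constructor is added here only to make the sketch self-contained):

    class MinimalClusterManager:
        def __init__(self, project_name: str, cluster_name: str):
            self.project_name = project_name
            self.cluster_name = cluster_name

        def get_cluster_address(self) -> str:
            # Always the full form; the `full` flag is gone.
            return f"anyscale://{self.project_name}/{self.cluster_name}"

    print(MinimalClusterManager("prj", "cls").get_cluster_address())
    # anyscale://prj/cls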
@@ -178,7 +178,7 @@ def load_and_render_yaml_template(
     render_env.update(env)
 
     try:
-        content = jinja2.Template(content).render(env=env)
+        content = jinja2.Template(content).render(env=render_env)
         return yaml.safe_load(content)
     except Exception as e:
         raise ReleaseTestConfigError(
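
Why this one-identifier fix matters: render_env is built from a base environment (not shown in this hunk) and then updated with the caller's env, so rendering with the bare env silently ignored everything else in render_env. A sketch with made-up values:

    import jinja2

    base_env = {"ANYSCALE_CLOUD_ID": "cld_default"}  # assumed base values
    env = {"REGION": "us-west-2"}

    render_env = base_env.copy()
    render_env.update(env)

    template = '{{env["ANYSCALE_CLOUD_ID"]}} / {{env["REGION"]}}'
    print(jinja2.Template(template).render(env=render_env))
    # cld_default / us-west-2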
@@ -19,17 +19,26 @@ class JobManager:
         self.job_client = None
         self.last_job_id = None
 
+    def _get_job_client(self) -> JobSubmissionClient:
+        if not self.job_client:
+            self.job_client = JobSubmissionClient(
+                self.cluster_manager.get_cluster_address()
+            )
+        return self.job_client
+
     def _run_job(self, cmd_to_run, env_vars) -> int:
         self.counter += 1
         command_id = self.counter
         env = os.environ.copy()
-        env["RAY_ADDRESS"] = self.cluster_manager.get_cluster_address(full=False)
+        env["RAY_ADDRESS"] = self.cluster_manager.get_cluster_address()
         env.setdefault("ANYSCALE_HOST", ANYSCALE_HOST)
 
         full_cmd = " ".join(f"{k}={v}" for k, v in env_vars.items()) + " " + cmd_to_run
         logger.info(f"Executing {cmd_to_run} with {env_vars} via ray job submit")
 
-        job_id = self.job_client.submit_job(
+        job_client = self._get_job_client()
+
+        job_id = job_client.submit_job(
             # Entrypoint shell command to execute
             entrypoint=full_cmd,
         )
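
The new _get_job_client centralizes lazy construction of the job submission client, replacing the ad-hoc `if not self.job_client` blocks removed further down. A self-contained sketch of the pattern (stand-in client class; the real code uses Ray's JobSubmissionClient and the full cluster address):

    class FakeJobClient:
        """Stand-in for the real JobSubmissionClient."""
        def __init__(self, address: str):
            self.address = address

    class JobManagerSketch:
        def __init__(self, cluster_address: str):
            self.cluster_address = cluster_address
            self.job_client = None

        def _get_job_client(self) -> "FakeJobClient":
            # Construct once, on first use; reuse afterwards.
            if not self.job_client:
                self.job_client = FakeJobClient(self.cluster_address)
            return self.job_client

    mgr = JobManagerSketch("anyscale://prj/cls")
    assert mgr._get_job_client() is mgr._get_job_client()  # same instance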
@@ -39,6 +48,8 @@ class JobManager:
         return command_id
 
     def _wait_job(self, command_id: int, timeout: int):
+        job_client = self._get_job_client()
+
         start_time = time.monotonic()
         timeout_at = start_time + timeout
         next_status = start_time + 30
@@ -56,11 +67,11 @@ class JobManager:
                     f"({int(now - start_time)} seconds) ..."
                 )
                 next_status += 30
-            status = self.job_client.get_job_status(self.job_id_pool[command_id])
+            status = job_client.get_job_status(self.job_id_pool[command_id])
             if status in {JobStatus.SUCCEEDED, JobStatus.STOPPED, JobStatus.FAILED}:
                 break
             time.sleep(1)
-        status = self.job_client.get_job_status(self.job_id_pool[command_id])
+        status = job_client.get_job_status(self.job_id_pool[command_id])
         # TODO(sang): Propagate JobInfo.error_type
         if status == JobStatus.SUCCEEDED:
             retcode = 0
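
Stepping back from the diff, the loop in _wait_job follows a common polling pattern: check status once a second, log progress every 30 seconds, and exit on a terminal state or timeout. A standalone sketch (check_status stands in for job_client.get_job_status):

    import time

    def wait_for_terminal(check_status, timeout: int = 120) -> str:
        start_time = time.monotonic()
        timeout_at = start_time + timeout
        next_status = start_time + 30
        status = check_status()
        while time.monotonic() < timeout_at:
            now = time.monotonic()
            if now >= next_status:
                print(f"... still waiting ({int(now - start_time)} seconds) ...")
                next_status += 30
            status = check_status()
            if status in {"SUCCEEDED", "STOPPED", "FAILED"}:
                break
            time.sleep(1)
        return status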
@@ -69,18 +80,13 @@ class JobManager:
         duration = time.time() - self.start_time[command_id]
         return retcode, duration
 
-    def run_and_wait(self, cmd_to_run, env_vars, timeout: int = 120) -> Tuple[int, int]:
-        if not self.job_client:
-            self.job_client = JobSubmissionClient(
-                self.cluster_manager.get_cluster_address(full=False)
-            )
+    def run_and_wait(
+        self, cmd_to_run, env_vars, timeout: int = 120
+    ) -> Tuple[int, float]:
         cid = self._run_job(cmd_to_run, env_vars)
         return self._wait_job(cid, timeout)
 
     def get_last_logs(self):
         # return None
-        if not self.job_client:
-            self.job_client = JobSubmissionClient(
-                self.cluster_manager.get_cluster_address(full=False)
-            )
-        return self.job_client.get_job_logs(self.last_job_id)
+        job_client = self._get_job_client()
+        return job_client.get_job_logs(self.last_job_id)
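
Note the corrected return annotation above: run_and_wait now advertises Tuple[int, float], because the duration it returns is a time.time() delta, which is a float:

    import time

    start = time.time()
    time.sleep(0.01)
    duration = time.time() - start
    print(type(duration))  # <class 'float'>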
@@ -2423,7 +2423,7 @@
   file_manager: job
 
   smoke_test:
-    frequency: nightly
+    frequency: multi
 
     cluster:
       cluster_env: dask_on_ray/large_scale_dask_on_ray_app_config.yaml