mirror of
https://github.com/vale981/ray
synced 2025-03-06 10:31:39 -05:00

Currently the release test runner prefers the first successfully version of a cluster env, instead of the last version. But sometimes a cluster env may build successfully on Anyscale but cannot launch cluster successfully (e.g. version 2 here) or new dependencies need to be installed, so a new version needs to be built. The existing logic always picks up the 1st successful build and cannot pick up the new cluster env version. Although this is an edge case (tweaking cluster env versions, with the same Ray wheel or cluster env name), I believe it is possible for others to run into it. Also, avoid running most of the CI tests for changes under release/ray_release/.
307 lines
11 KiB
Python
307 lines
11 KiB
Python
import time
|
|
|
|
from ray_release.exception import (
|
|
ClusterEnvBuildError,
|
|
ClusterEnvBuildTimeout,
|
|
ClusterEnvCreateError,
|
|
ClusterComputeCreateError,
|
|
)
|
|
from ray_release.logger import logger
|
|
from ray_release.cluster_manager.cluster_manager import ClusterManager
|
|
from ray_release.util import format_link, anyscale_cluster_env_build_url
|
|
|
|
REPORT_S = 30.0
|
|
|
|
|
|
class MinimalClusterManager(ClusterManager):
|
|
"""Minimal manager.
|
|
|
|
Builds app config and compute template but does not start or stop session.
|
|
"""
|
|
|
|
def create_cluster_env(self, _repeat: bool = True):
|
|
assert self.cluster_env_id is None
|
|
|
|
if self.cluster_env:
|
|
assert self.cluster_env_name
|
|
|
|
logger.info(
|
|
f"Test uses a cluster env with name "
|
|
f"{self.cluster_env_name}. Looking up existing "
|
|
f"cluster envs with this name."
|
|
)
|
|
|
|
paging_token = None
|
|
while not self.cluster_env_id:
|
|
result = self.sdk.search_cluster_environments(
|
|
dict(
|
|
project_id=self.project_id,
|
|
name=dict(equals=self.cluster_env_name),
|
|
paging=dict(count=50, token=paging_token),
|
|
)
|
|
)
|
|
paging_token = result.metadata.next_paging_token
|
|
|
|
for res in result.results:
|
|
if res.name == self.cluster_env_name:
|
|
self.cluster_env_id = res.id
|
|
logger.info(
|
|
f"Cluster env already exists with ID "
|
|
f"{self.cluster_env_id}"
|
|
)
|
|
break
|
|
|
|
if not paging_token or self.cluster_env_id:
|
|
break
|
|
|
|
if not self.cluster_env_id:
|
|
logger.info("Cluster env not found. Creating new one.")
|
|
try:
|
|
result = self.sdk.create_cluster_environment(
|
|
dict(
|
|
name=self.cluster_env_name,
|
|
project_id=self.project_id,
|
|
config_json=self.cluster_env,
|
|
)
|
|
)
|
|
self.cluster_env_id = result.result.id
|
|
except Exception as e:
|
|
if _repeat:
|
|
logger.warning(
|
|
f"Got exception when trying to create cluster "
|
|
f"env: {e}. Sleeping for 10 seconds and then "
|
|
f"try again once..."
|
|
)
|
|
time.sleep(10)
|
|
return self.create_cluster_env(_repeat=False)
|
|
|
|
raise ClusterEnvCreateError("Could not create cluster env.") from e
|
|
|
|
logger.info(f"Cluster env created with ID {self.cluster_env_id}")
|
|
|
|
def build_cluster_env(self, timeout: float = 600.0):
|
|
assert self.cluster_env_id
|
|
assert self.cluster_env_build_id is None
|
|
|
|
# Fetch build
|
|
build_id = None
|
|
last_status = None
|
|
error_message = None
|
|
config_json = None
|
|
result = self.sdk.list_cluster_environment_builds(self.cluster_env_id)
|
|
if not result or not result.results:
|
|
raise ClusterEnvBuildError(f"No build found for cluster env: {result}")
|
|
|
|
build = sorted(result.results, key=lambda b: b.created_at)[-1]
|
|
build_id = build.id
|
|
last_status = build.status
|
|
error_message = build.error_message
|
|
config_json = build.config_json
|
|
|
|
if last_status == "succeeded":
|
|
logger.info(
|
|
f"Link to succeeded cluster env build: "
|
|
f"{format_link(anyscale_cluster_env_build_url(build_id))}"
|
|
)
|
|
self.cluster_env_build_id = build_id
|
|
return
|
|
|
|
if last_status == "failed":
|
|
logger.info(f"Previous cluster env build failed: {error_message}")
|
|
logger.info("Starting new cluster env build...")
|
|
|
|
# Retry build
|
|
result = self.sdk.create_cluster_environment_build(
|
|
dict(
|
|
cluster_environment_id=self.cluster_env_id, config_json=config_json
|
|
)
|
|
)
|
|
build_id = result.result.id
|
|
|
|
logger.info(
|
|
f"Link to created cluster env build: "
|
|
f"{format_link(anyscale_cluster_env_build_url(build_id))}"
|
|
)
|
|
|
|
# Build found but not failed/finished yet
|
|
completed = False
|
|
start_wait = time.time()
|
|
next_report = start_wait + REPORT_S
|
|
timeout_at = time.monotonic() + timeout
|
|
logger.info(f"Waiting for build {build_id} to finish...")
|
|
logger.info(
|
|
f"Track progress here: "
|
|
f"{format_link(anyscale_cluster_env_build_url(build_id))}"
|
|
)
|
|
while not completed:
|
|
now = time.time()
|
|
if now > next_report:
|
|
logger.info(
|
|
f"... still waiting for build {build_id} to finish "
|
|
f"({int(now - start_wait)} seconds) ..."
|
|
)
|
|
next_report = next_report + REPORT_S
|
|
|
|
result = self.sdk.get_build(build_id)
|
|
build = result.result
|
|
|
|
if build.status == "failed":
|
|
raise ClusterEnvBuildError(
|
|
f"Cluster env build failed. Please see "
|
|
f"{anyscale_cluster_env_build_url(build_id)} for details. "
|
|
f"Error message: {build.error_message}"
|
|
)
|
|
|
|
if build.status == "succeeded":
|
|
logger.info("Build succeeded.")
|
|
self.cluster_env_build_id = build_id
|
|
return
|
|
|
|
completed = build.status not in ["in_progress", "pending"]
|
|
|
|
if completed:
|
|
raise ClusterEnvBuildError(
|
|
f"Unknown build status: {build.status}. Please see "
|
|
f"{anyscale_cluster_env_build_url(build_id)} for details"
|
|
)
|
|
|
|
if time.monotonic() > timeout_at:
|
|
raise ClusterEnvBuildTimeout(
|
|
f"Time out when building cluster env {self.cluster_env_name}"
|
|
)
|
|
|
|
time.sleep(1)
|
|
|
|
self.cluster_env_build_id = build_id
|
|
|
|
def fetch_build_info(self):
|
|
assert self.cluster_env_build_id
|
|
|
|
result = self.sdk.get_cluster_environment_build(self.cluster_env_build_id)
|
|
self.cluster_env = result.result.config_json
|
|
|
|
def create_cluster_compute(self, _repeat: bool = True):
|
|
assert self.cluster_compute_id is None
|
|
|
|
if self.cluster_compute:
|
|
assert self.cluster_compute
|
|
|
|
logger.info(
|
|
f"Tests uses compute template "
|
|
f"with name {self.cluster_compute_name}. "
|
|
f"Looking up existing cluster computes."
|
|
)
|
|
|
|
paging_token = None
|
|
while not self.cluster_compute_id:
|
|
result = self.sdk.search_cluster_computes(
|
|
dict(
|
|
project_id=self.project_id,
|
|
name=dict(equals=self.cluster_compute_name),
|
|
include_anonymous=True,
|
|
paging=dict(token=paging_token),
|
|
)
|
|
)
|
|
paging_token = result.metadata.next_paging_token
|
|
|
|
for res in result.results:
|
|
if res.name == self.cluster_compute_name:
|
|
self.cluster_compute_id = res.id
|
|
logger.info(
|
|
f"Cluster compute already exists "
|
|
f"with ID {self.cluster_compute_id}"
|
|
)
|
|
break
|
|
|
|
if not paging_token:
|
|
break
|
|
|
|
if not self.cluster_compute_id:
|
|
logger.info(
|
|
f"Cluster compute not found. "
|
|
f"Creating with name {self.cluster_compute_name}."
|
|
)
|
|
try:
|
|
result = self.sdk.create_cluster_compute(
|
|
dict(
|
|
name=self.cluster_compute_name,
|
|
project_id=self.project_id,
|
|
config=self.cluster_compute,
|
|
)
|
|
)
|
|
self.cluster_compute_id = result.result.id
|
|
except Exception as e:
|
|
if _repeat:
|
|
logger.warning(
|
|
f"Got exception when trying to create cluster "
|
|
f"compute: {e}. Sleeping for 10 seconds and then "
|
|
f"try again once..."
|
|
)
|
|
time.sleep(10)
|
|
return self.create_cluster_compute(_repeat=False)
|
|
|
|
raise ClusterComputeCreateError(
|
|
"Could not create cluster compute"
|
|
) from e
|
|
|
|
logger.info(
|
|
f"Cluster compute template created with "
|
|
f"name {self.cluster_compute_name} and "
|
|
f"ID {self.cluster_compute_id}"
|
|
)
|
|
|
|
def build_configs(self, timeout: float = 30.0):
|
|
try:
|
|
self.create_cluster_compute()
|
|
except AssertionError as e:
|
|
# If already exists, ignore
|
|
logger.warning(str(e))
|
|
except ClusterComputeCreateError as e:
|
|
raise e
|
|
except Exception as e:
|
|
raise ClusterComputeCreateError(
|
|
f"Unexpected cluster compute build error: {e}"
|
|
) from e
|
|
|
|
try:
|
|
self.create_cluster_env()
|
|
except AssertionError as e:
|
|
# If already exists, ignore
|
|
logger.warning(str(e))
|
|
except ClusterEnvCreateError as e:
|
|
raise e
|
|
except Exception as e:
|
|
raise ClusterEnvCreateError(
|
|
f"Unexpected cluster env create error: {e}"
|
|
) from e
|
|
|
|
try:
|
|
self.build_cluster_env(timeout=timeout)
|
|
except AssertionError as e:
|
|
# If already exists, ignore
|
|
logger.warning(str(e))
|
|
except (ClusterEnvBuildError, ClusterEnvBuildTimeout) as e:
|
|
raise e
|
|
except Exception as e:
|
|
raise ClusterEnvBuildError(
|
|
f"Unexpected cluster env build error: {e}"
|
|
) from e
|
|
|
|
def delete_configs(self):
|
|
if self.cluster_id:
|
|
self.sdk.delete_cluster(self.cluster_id)
|
|
if self.cluster_env_build_id:
|
|
self.sdk.delete_cluster_environment_build(self.cluster_env_build_id)
|
|
if self.cluster_env_id:
|
|
self.sdk.delete_cluster_environment(self.cluster_env_id)
|
|
if self.cluster_compute_id:
|
|
self.sdk.delete_cluster_compute(self.cluster_compute_id)
|
|
|
|
def start_cluster(self, timeout: float = 600.0):
|
|
pass
|
|
|
|
def terminate_cluster(self):
|
|
pass
|
|
|
|
def get_cluster_address(self) -> str:
|
|
return f"anyscale://{self.project_name}/{self.cluster_name}"
|