2022-02-16 17:35:02 +00:00
|
|
|
import time
|
|
|
|
|
|
|
|
from ray_release.exception import (
|
|
|
|
ClusterEnvBuildError,
|
|
|
|
ClusterEnvBuildTimeout,
|
|
|
|
ClusterEnvCreateError,
|
|
|
|
ClusterComputeCreateError,
|
|
|
|
)
|
|
|
|
from ray_release.logger import logger
|
|
|
|
from ray_release.cluster_manager.cluster_manager import ClusterManager
|
|
|
|
from ray_release.util import format_link, anyscale_cluster_env_build_url
|
|
|
|
|
|
|
|
# Interval, in seconds, between "still waiting" progress log lines emitted
# while MinimalClusterManager.build_cluster_env polls a cluster env build.
REPORT_S: float = 30.0
|
|
|
|
|
|
|
|
|
|
|
|
class MinimalClusterManager(ClusterManager):
    """Minimal manager.

    Builds app config and compute template but does not start or stop session.
    """

    def create_cluster_env(self, _repeat: bool = True):
        """Look up the cluster env by name, creating it if none is found.

        Does nothing when ``self.cluster_env`` is falsy. Otherwise, on return
        ``self.cluster_env_id`` holds the ID of the existing or newly created
        cluster env.

        Args:
            _repeat: Internal flag. If True, a failed creation is retried once
                (after a 10 second sleep) by calling this method again with
                ``_repeat=False``.

        Raises:
            AssertionError: If ``self.cluster_env_id`` is already set, or if
                ``self.cluster_env`` is set but ``self.cluster_env_name`` is not.
            ClusterEnvCreateError: If creation fails on the final attempt.
        """
        assert self.cluster_env_id is None

        if not self.cluster_env:
            return

        assert self.cluster_env_name

        logger.info(
            f"Test uses a cluster env with name "
            f"{self.cluster_env_name}. Looking up existing "
            f"cluster envs with this name."
        )

        paging_token = None
        while not self.cluster_env_id:
            result = self.sdk.search_cluster_environments(
                dict(
                    project_id=self.project_id,
                    name=dict(equals=self.cluster_env_name),
                    paging=dict(count=50, token=paging_token),
                )
            )
            paging_token = result.metadata.next_paging_token

            for res in result.results:
                # Re-check the name on each result before adopting its ID.
                if res.name == self.cluster_env_name:
                    self.cluster_env_id = res.id
                    logger.info(
                        f"Cluster env already exists with ID "
                        f"{self.cluster_env_id}"
                    )
                    break

            # Stop when there are no more pages or a match was found.
            if not paging_token or self.cluster_env_id:
                break

        if self.cluster_env_id:
            return

        logger.info("Cluster env not found. Creating new one.")
        try:
            result = self.sdk.create_cluster_environment(
                dict(
                    name=self.cluster_env_name,
                    project_id=self.project_id,
                    config_json=self.cluster_env,
                )
            )
            self.cluster_env_id = result.result.id
        except Exception as e:
            if _repeat:
                logger.warning(
                    f"Got exception when trying to create cluster "
                    f"env: {e}. Sleeping for 10 seconds and then "
                    f"try again once..."
                )
                time.sleep(10)
                # The retry repeats the full lookup before creating again.
                return self.create_cluster_env(_repeat=False)

            raise ClusterEnvCreateError("Could not create cluster env.") from e

        logger.info(f"Cluster env created with ID {self.cluster_env_id}")

    def build_cluster_env(self, timeout: float = 600.0):
        """Ensure a successful build exists for ``self.cluster_env_id``.

        Existing builds are inspected oldest-first (by ``created_at``):

        * The first succeeded build found is reused.
        * If a build that is neither failed nor succeeded is reached first,
          the method waits on it.
        * If every listed build failed, a new build is started using the most
          recent build's ``config_json``, and the method waits on that.

        While waiting, the build status is polled once per second and a
        progress line is logged every ``REPORT_S`` seconds. On success
        ``self.cluster_env_build_id`` is set.

        Args:
            timeout: Maximum number of seconds to wait for the build once
                waiting starts.

        Raises:
            AssertionError: If ``self.cluster_env_id`` is unset or
                ``self.cluster_env_build_id`` is already set.
            ClusterEnvBuildError: If there are no builds, the build fails, or
                the build reports a status other than failed / succeeded /
                in_progress / pending.
            ClusterEnvBuildTimeout: If the build does not finish in time.
        """
        assert self.cluster_env_id
        assert self.cluster_env_build_id is None

        # Fetch build
        build_id = None
        last_status = None
        error_message = None
        config_json = None
        result = self.sdk.list_cluster_environment_builds(self.cluster_env_id)
        for build in sorted(result.results, key=lambda b: b.created_at):
            build_id = build.id
            last_status = build.status
            error_message = build.error_message
            config_json = build.config_json

            if build.status == "failed":
                continue
            elif build.status == "succeeded":
                logger.info(
                    f"Link to cluster env build: "
                    f"{format_link(anyscale_cluster_env_build_url(build_id))}"
                )
                self.cluster_env_build_id = build_id
                return
            else:
                # If the build is neither failed nor succeeded, it is still
                # going on
                break

        if last_status == "failed":
            # Only reachable when every listed build failed: start a new
            # build from the most recent build's config.
            logger.info(f"Previous cluster env build failed: {error_message}")
            logger.info("Starting new cluster env build...")

            # Retry build
            result = self.sdk.create_cluster_environment_build(
                dict(
                    cluster_environment_id=self.cluster_env_id, config_json=config_json
                )
            )
            build_id = result.result.id

            logger.info(
                f"Link to cluster env build: "
                f"{format_link(anyscale_cluster_env_build_url(build_id))}"
            )

        if not build_id:
            raise ClusterEnvBuildError("No build found for cluster env.")

        # Build found but not failed/finished yet. Use the monotonic clock for
        # both progress reports and the timeout so wall-clock adjustments do
        # not skew either.
        start_wait = time.monotonic()
        next_report = start_wait + REPORT_S
        timeout_at = start_wait + timeout
        logger.info(f"Waiting for build {build_id} to finish...")
        logger.info(
            f"Track progress here: "
            f"{format_link(anyscale_cluster_env_build_url(build_id))}"
        )
        # Every exit from this loop is a ``return`` (success) or a ``raise``.
        while True:
            now = time.monotonic()
            if now > next_report:
                logger.info(
                    f"... still waiting for build {build_id} to finish "
                    f"({int(now - start_wait)} seconds) ..."
                )
                next_report = next_report + REPORT_S

            result = self.sdk.get_build(build_id)
            build = result.result

            if build.status == "failed":
                raise ClusterEnvBuildError(
                    f"Cluster env build failed. Please see "
                    f"{anyscale_cluster_env_build_url(build_id)} for details. "
                    f"Error message: {build.error_message}"
                )

            if build.status == "succeeded":
                logger.info("Build succeeded.")
                self.cluster_env_build_id = build_id
                return

            if build.status not in ["in_progress", "pending"]:
                raise ClusterEnvBuildError(
                    f"Unknown build status: {build.status}. Please see "
                    f"{anyscale_cluster_env_build_url(build_id)} for details"
                )

            if time.monotonic() > timeout_at:
                raise ClusterEnvBuildTimeout(
                    f"Time out when building cluster env {self.cluster_env_name}"
                )

            time.sleep(1)

    def fetch_build_info(self):
        """Load ``self.cluster_env`` from the build ``self.cluster_env_build_id``.

        Raises:
            AssertionError: If ``self.cluster_env_build_id`` is unset.
        """
        assert self.cluster_env_build_id

        result = self.sdk.get_cluster_environment_build(self.cluster_env_build_id)
        self.cluster_env = result.result.config_json

    def create_cluster_compute(self, _repeat: bool = True):
        """Look up the cluster compute by name, creating it if none is found.

        Does nothing when ``self.cluster_compute`` is falsy. Otherwise, on
        return ``self.cluster_compute_id`` holds the ID of the existing or
        newly created cluster compute. The search includes anonymous computes.

        Args:
            _repeat: Internal flag. If True, a failed creation is retried once
                (after a 10 second sleep) by calling this method again with
                ``_repeat=False``.

        Raises:
            AssertionError: If ``self.cluster_compute_id`` is already set, or
                if ``self.cluster_compute`` is set but
                ``self.cluster_compute_name`` is not.
            ClusterComputeCreateError: If creation fails on the final attempt.
        """
        assert self.cluster_compute_id is None

        if not self.cluster_compute:
            return

        # The name is what the lookup and creation below key on (mirrors the
        # ``cluster_env_name`` check in ``create_cluster_env``).
        assert self.cluster_compute_name

        logger.info(
            f"Tests uses compute template "
            f"with name {self.cluster_compute_name}. "
            f"Looking up existing cluster computes."
        )

        paging_token = None
        while not self.cluster_compute_id:
            result = self.sdk.search_cluster_computes(
                dict(
                    project_id=self.project_id,
                    name=dict(equals=self.cluster_compute_name),
                    include_anonymous=True,
                    paging=dict(token=paging_token),
                )
            )
            paging_token = result.metadata.next_paging_token

            for res in result.results:
                # Re-check the name on each result before adopting its ID.
                if res.name == self.cluster_compute_name:
                    self.cluster_compute_id = res.id
                    logger.info(
                        f"Cluster compute already exists "
                        f"with ID {self.cluster_compute_id}"
                    )
                    break

            # No more pages; a found match ends the loop via its condition.
            if not paging_token:
                break

        if self.cluster_compute_id:
            return

        logger.info(
            f"Cluster compute not found. "
            f"Creating with name {self.cluster_compute_name}."
        )
        try:
            result = self.sdk.create_cluster_compute(
                dict(
                    name=self.cluster_compute_name,
                    project_id=self.project_id,
                    config=self.cluster_compute,
                )
            )
            self.cluster_compute_id = result.result.id
        except Exception as e:
            if _repeat:
                logger.warning(
                    f"Got exception when trying to create cluster "
                    f"compute: {e}. Sleeping for 10 seconds and then "
                    f"try again once..."
                )
                time.sleep(10)
                # The retry repeats the full lookup before creating again.
                return self.create_cluster_compute(_repeat=False)

            raise ClusterComputeCreateError(
                "Could not create cluster compute"
            ) from e

        logger.info(
            f"Cluster compute template created with "
            f"name {self.cluster_compute_name} and "
            f"ID {self.cluster_compute_id}"
        )

    def build_configs(self, timeout: float = 30.0):
        """Create the cluster compute and cluster env, then build the env.

        In each of the three steps, an ``AssertionError`` (e.g. the ID is
        already set) is logged and ignored. The step's own exception types are
        re-raised unchanged, and any other exception is wrapped in that
        step's exception type.

        Args:
            timeout: Seconds to wait for the cluster env build.

        Raises:
            ClusterComputeCreateError: If creating the cluster compute fails.
            ClusterEnvCreateError: If creating the cluster env fails.
            ClusterEnvBuildError: If building the cluster env fails.
            ClusterEnvBuildTimeout: If the cluster env build times out.
        """
        try:
            self.create_cluster_compute()
        except AssertionError as e:
            # If already exists, ignore. Bare ``assert``s carry no message,
            # so name the step and include the repr instead of logging "".
            logger.warning(f"Skipping cluster compute creation: {e!r}")
        except ClusterComputeCreateError:
            raise
        except Exception as e:
            raise ClusterComputeCreateError(
                f"Unexpected cluster compute build error: {e}"
            ) from e

        try:
            self.create_cluster_env()
        except AssertionError as e:
            # If already exists, ignore
            logger.warning(f"Skipping cluster env creation: {e!r}")
        except ClusterEnvCreateError:
            raise
        except Exception as e:
            raise ClusterEnvCreateError(
                f"Unexpected cluster env create error: {e}"
            ) from e

        try:
            self.build_cluster_env(timeout=timeout)
        except AssertionError as e:
            # If already exists, ignore
            logger.warning(f"Skipping cluster env build: {e!r}")
        except (ClusterEnvBuildError, ClusterEnvBuildTimeout):
            raise
        except Exception as e:
            raise ClusterEnvBuildError(
                f"Unexpected cluster env build error: {e}"
            ) from e

    def delete_configs(self):
        """Delete the cluster, env build, env and compute whose IDs are set.

        Each resource is deleted only if its ID attribute is truthy.
        """
        if self.cluster_id:
            self.sdk.delete_cluster(self.cluster_id)
        if self.cluster_env_build_id:
            self.sdk.delete_cluster_environment_build(self.cluster_env_build_id)
        if self.cluster_env_id:
            self.sdk.delete_cluster_environment(self.cluster_env_id)
        if self.cluster_compute_id:
            self.sdk.delete_cluster_compute(self.cluster_compute_id)

    def start_cluster(self, timeout: float = 600.0):
        """No-op: this manager does not start a session."""
        pass

    def terminate_cluster(self):
        """No-op: this manager does not stop a session."""
        pass

    def get_cluster_address(self) -> str:
        """Return the ``anyscale://<project_name>/<cluster_name>`` address."""
        return f"anyscale://{self.project_name}/{self.cluster_name}"
|