ray/release/ray_release/glue.py
Kai Fricke 6c5229295e
[ci/release] Support running tests with different python versions (#24843)
OSS release tests currently run with a hardcoded Python 3.7 base. In the future we will want to run tests on different Python versions.
This PR adds support for a new `python` field in the test configuration. The `python` field determines both the base image used for the Buildkite runner Docker container (for Ray Client compatibility) and the base image for the Anyscale cluster environments.

Note that in Buildkite, we will still only wait for the Python 3.7 base image before kicking off tests. That is acceptable, as we can assume that most wheels finish in a similar time, so even if we wait for the 3.7 image and kick off a 3.8 test, that runner will only wait for maybe another 5-10 minutes.
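For illustration, a test entry with the new field might look roughly like the sketch below once parsed into the `Test` dict that `glue.py` consumes. The values are placeholders, not taken from this PR; only the field names that `glue.py` already reads (`name`, `working_dir`, `cluster`, `run`) plus the new `python` key are assumed.

```python
# Hypothetical test entry; values are illustrative only.
test = {
    "name": "example_release_test",
    "working_dir": "example",
    "python": "3.8",  # new field: selects the Python base image (assumed format)
    "cluster": {
        "autosuspend_mins": 10,
    },
    "run": {
        "type": "sdk_command",
        "timeout": 1800,
        "script": "python workloads/example_workload.py",
    },
}
```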
2022-05-17 17:03:12 +01:00

333 lines
12 KiB
Python

import os
import time
from typing import Optional, List
from ray_release.alerts.handle import handle_result
from ray_release.anyscale_util import get_cluster_name
from ray_release.buildkite.output import buildkite_group, buildkite_open_last
from ray_release.cluster_manager.full import FullClusterManager
from ray_release.command_runner.client_runner import ClientRunner
from ray_release.command_runner.job_runner import JobRunner
from ray_release.command_runner.sdk_runner import SDKRunner
from ray_release.config import (
Test,
DEFAULT_BUILD_TIMEOUT,
DEFAULT_CLUSTER_TIMEOUT,
DEFAULT_COMMAND_TIMEOUT,
DEFAULT_WAIT_FOR_NODES_TIMEOUT,
RELEASE_PACKAGE_DIR,
DEFAULT_AUTOSUSPEND_MINS,
validate_test,
)
from ray_release.template import load_test_cluster_env, load_test_cluster_compute
from ray_release.exception import (
ReleaseTestConfigError,
ReleaseTestSetupError,
CommandError,
PrepareCommandError,
CommandTimeout,
PrepareCommandTimeout,
TestCommandError,
TestCommandTimeout,
LocalEnvSetupError,
ClusterEnvCreateError,
)
from ray_release.file_manager.job_file_manager import JobFileManager
from ray_release.file_manager.remote_task import RemoteTaskFileManager
from ray_release.file_manager.session_controller import SessionControllerFileManager
from ray_release.logger import logger
from ray_release.reporter.reporter import Reporter
from ray_release.result import Result, handle_exception
from ray_release.util import (
run_bash_script,
get_pip_packages,
reinstall_anyscale_dependencies,
)
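
# Maps the "type" field of a test's "run" config to a command runner class.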
type_str_to_command_runner = {
"command": SDKRunner,
"sdk_command": SDKRunner,
"job": JobRunner,
"client": ClientRunner,
}
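
# Each command runner is backed by a cluster manager that creates, starts,
# and terminates the Anyscale cluster it runs against.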
command_runner_to_cluster_manager = {
SDKRunner: FullClusterManager,
ClientRunner: FullClusterManager,
JobRunner: FullClusterManager,
}
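
# Optional override: a test can select a file manager explicitly via the
# "file_manager" field in its "run" config.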
file_manager_str_to_file_manager = {
"sdk": SessionControllerFileManager,
"client": RemoteTaskFileManager,
"job": JobFileManager,
}
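
# Default file manager used when the test does not specify one.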
command_runner_to_file_manager = {
SDKRunner: SessionControllerFileManager,
ClientRunner: RemoteTaskFileManager,
    JobRunner: JobFileManager,
}
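
# Known uploader types; no uploader implementations are wired up in this module.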
uploader_str_to_uploader = {"client": None, "s3": None, "command_runner": None}


def run_release_test(
test: Test,
anyscale_project: str,
result: Result,
ray_wheels_url: str,
reporters: Optional[List[Reporter]] = None,
smoke_test: bool = False,
cluster_id: Optional[str] = None,
cluster_env_id: Optional[str] = None,
no_terminate: bool = False,
) -> Result:
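    """Run a single release test end to end.

    This sets up the local environment, builds the cluster environment and
    compute config, starts (or reuses) an Anyscale cluster, runs the optional
    prepare command and the test script, fetches results and logs, terminates
    the cluster (unless ``no_terminate`` is set), and reports the outcome via
    the given reporters. The passed ``result`` is populated in place and also
    returned; any pipeline exception is re-raised after reporting.
    """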
buildkite_group(":spiral_note_pad: Loading test configuration")
validate_test(test)
result.wheels_url = ray_wheels_url
result.stable = test.get("stable", True)
result.smoke_test = smoke_test
buildkite_url = os.getenv("BUILDKITE_BUILD_URL", "")
if buildkite_url:
buildkite_url += "#" + os.getenv("BUILDKITE_JOB_ID", "")
result.buildkite_url = buildkite_url
working_dir = test["working_dir"]
old_wd = os.getcwd()
new_wd = os.path.join(RELEASE_PACKAGE_DIR, working_dir)
os.chdir(new_wd)
start_time = time.monotonic()
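    # Resolve the command runner, cluster manager, and file manager classes
    # from the test's run configuration.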
run_type = test["run"].get("type", "sdk_command")
command_runner_cls = type_str_to_command_runner.get(run_type)
if not command_runner_cls:
raise ReleaseTestConfigError(
f"Unknown command runner type: {run_type}. Must be one of "
f"{list(type_str_to_command_runner.keys())}"
)
cluster_manager_cls = command_runner_to_cluster_manager[command_runner_cls]
file_manager_str = test["run"].get("file_manager", None)
if file_manager_str:
if file_manager_str not in file_manager_str_to_file_manager:
raise ReleaseTestConfigError(
f"Unknown file manager: {file_manager_str}. Must be one of "
f"{list(file_manager_str_to_file_manager.keys())}"
)
file_manager_cls = file_manager_str_to_file_manager[file_manager_str]
else:
file_manager_cls = command_runner_to_file_manager[command_runner_cls]
# Instantiate managers and command runner
try:
cluster_manager = cluster_manager_cls(test["name"], anyscale_project)
file_manager = file_manager_cls(cluster_manager=cluster_manager)
command_runner = command_runner_cls(cluster_manager, file_manager, working_dir)
except Exception as e:
raise ReleaseTestSetupError(f"Error setting up release test: {e}") from e
pipeline_exception = None
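    # Everything below runs inside a try/except so that log collection,
    # cluster termination, and reporting still happen if a stage fails.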
try:
# Load configs
cluster_env = load_test_cluster_env(test, ray_wheels_url=ray_wheels_url)
cluster_compute = load_test_cluster_compute(test)
if cluster_env_id:
try:
cluster_manager.cluster_env_id = cluster_env_id
cluster_manager.build_cluster_env()
cluster_manager.fetch_build_info()
logger.info(
"Using overridden cluster environment with ID "
f"{cluster_env_id} and build ID "
f"{cluster_manager.cluster_env_build_id}"
)
except Exception as e:
raise ClusterEnvCreateError(
f"Could not get existing overridden cluster environment "
f"{cluster_env_id}: {e}"
) from e
else:
cluster_manager.set_cluster_env(cluster_env)
cluster_manager.set_cluster_compute(cluster_compute)
buildkite_group(":nut_and_bolt: Setting up local environment")
driver_setup_script = test.get("driver_setup", None)
if driver_setup_script:
try:
run_bash_script(driver_setup_script)
except Exception as e:
raise LocalEnvSetupError(f"Driver setup script failed: {e}") from e
# Install local dependencies
command_runner.prepare_local_env(ray_wheels_url)
command_timeout = test["run"].get("timeout", DEFAULT_COMMAND_TIMEOUT)
# Re-install anyscale package as local dependencies might have changed
# from local env setup
reinstall_anyscale_dependencies()
# Print installed pip packages
buildkite_group(":bulb: Local environment information")
pip_packages = get_pip_packages()
pip_package_string = "\n".join(pip_packages)
logger.info(f"Installed python packages:\n{pip_package_string}")
# Start cluster
if cluster_id:
buildkite_group(":rocket: Using existing cluster")
# Re-use existing cluster ID for development
cluster_manager.cluster_id = cluster_id
cluster_manager.cluster_name = get_cluster_name(cluster_id)
else:
buildkite_group(":gear: Building cluster environment")
build_timeout = test["run"].get("build_timeout", DEFAULT_BUILD_TIMEOUT)
if cluster_env_id:
cluster_manager.cluster_env_id = cluster_env_id
cluster_manager.build_configs(timeout=build_timeout)
cluster_timeout = test["run"].get(
"session_timeout", DEFAULT_CLUSTER_TIMEOUT
)
autosuspend_mins = test["cluster"].get("autosuspend_mins", None)
if autosuspend_mins:
cluster_manager.autosuspend_minutes = autosuspend_mins
else:
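                # Default: the smaller of the global default and the command
                # timeout (in minutes) plus a 10 minute buffer.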
cluster_manager.autosuspend_minutes = min(
DEFAULT_AUTOSUSPEND_MINS, int(command_timeout / 60) + 10
)
buildkite_group(":rocket: Starting up cluster")
cluster_manager.start_cluster(timeout=cluster_timeout)
result.cluster_url = cluster_manager.get_cluster_url()
# Upload files
buildkite_group(":wrench: Preparing remote environment")
command_runner.prepare_remote_env()
wait_for_nodes = test["run"].get("wait_for_nodes", None)
if wait_for_nodes:
buildkite_group(":stopwatch: Waiting for nodes to come up")
num_nodes = test["run"]["wait_for_nodes"]["num_nodes"]
wait_timeout = test["run"]["wait_for_nodes"].get(
"timeout", DEFAULT_WAIT_FOR_NODES_TIMEOUT
)
command_runner.wait_for_nodes(num_nodes, wait_timeout)
prepare_cmd = test["run"].get("prepare", None)
if prepare_cmd:
prepare_timeout = test["run"].get("prepare_timeout", command_timeout)
try:
command_runner.run_prepare_command(prepare_cmd, timeout=prepare_timeout)
except CommandError as e:
raise PrepareCommandError(e)
except CommandTimeout as e:
raise PrepareCommandTimeout(e)
buildkite_group(":runner: Running test script")
command = test["run"]["script"]
command_env = {}
if smoke_test:
command = f"{command} --smoke-test"
command_env["IS_SMOKE_TEST"] = "1"
is_long_running = test["run"].get("long_running", False)
try:
command_runner.run_command(
command, env=command_env, timeout=command_timeout
)
except CommandError as e:
raise TestCommandError(e)
except CommandTimeout as e:
if not is_long_running:
# Only raise error if command is not long running
raise TestCommandTimeout(e)
buildkite_group(":floppy_disk: Fetching results")
try:
command_results = command_runner.fetch_results()
except Exception as e:
logger.error("Could not fetch results for test command")
logger.exception(e)
command_results = {}
# Postprocess result:
if "last_update" in command_results:
command_results["last_update_diff"] = time.time() - command_results.get(
"last_update", 0.0
)
if smoke_test:
command_results["smoke_test"] = True
result.results = command_results
result.status = "finished"
except Exception as e:
logger.exception(e)
buildkite_open_last()
pipeline_exception = e
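    # Fetch the last logs from the run regardless of success so they can be
    # attached to the result.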
try:
last_logs = command_runner.get_last_logs()
except Exception as e:
logger.error(f"Error fetching logs: {e}")
last_logs = "No logs could be retrieved."
result.last_logs = last_logs
if not no_terminate:
buildkite_group(":earth_africa: Terminating cluster")
try:
cluster_manager.terminate_cluster(wait=False)
except Exception as e:
logger.error(f"Could not terminate cluster: {e}")
time_taken = time.monotonic() - start_time
result.runtime = time_taken
os.chdir(old_wd)
if not pipeline_exception:
buildkite_group(":mag: Interpreting results")
# Only handle results if we didn't run into issues earlier
try:
handle_result(test, result)
except Exception as e:
pipeline_exception = e
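    # Translate any captured exception into an exit code, error type, and
    # (possibly overridden) runtime on the result.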
if pipeline_exception:
buildkite_group(":rotating_light: Handling errors")
exit_code, error_type, runtime = handle_exception(pipeline_exception)
result.return_code = exit_code.value
result.status = error_type
if runtime is not None:
result.runtime = runtime
buildkite_group(":memo: Reporting results", open=True)
reporters = reporters or []
for reporter in reporters:
try:
reporter.report_result(test, result)
except Exception as e:
logger.error(f"Error reporting results via {type(reporter)}: {e}")
if pipeline_exception:
raise pipeline_exception
return result