[ci/release] Start cluster before connecting via anyscale connect (#18878)

This commit is contained in:
Kai Fricke 2021-09-24 16:17:06 +01:00 committed by GitHub
parent d52203ee03
commit e08d4253cf
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 69 additions and 66 deletions

View file

@ -802,9 +802,7 @@ def run_job(cluster_name: str, compute_tpl_name: str, cluster_env_name: str,
script_args: List[str], env_vars: Dict[str, str],
autosuspend: int) -> Tuple[int, str]:
# Start cluster and job
address = f"anyscale://{cluster_name}?cluster_compute={compute_tpl_name}" \
f"&cluster_env={cluster_env_name}&autosuspend={autosuspend}" \
"&&update=True"
address = f"anyscale://{cluster_name}?autosuspend={autosuspend}"
logger.info(f"Starting job {job_name} with Ray address: {address}")
env = copy.deepcopy(os.environ)
env.update(GLOBAL_CONFIG)
@ -1304,6 +1302,7 @@ def run_test_config(
session_url = None
runtime = None
anyscale.conf.CLI_TOKEN = GLOBAL_CONFIG["ANYSCALE_CLI_TOKEN"]
test_uses_ray_connect = test_config["run"].get("use_connect")
session_id = None
scd_id = None
@ -1360,17 +1359,71 @@ def run_test_config(
session_options["build_id"] = build_id
session_options["uses_app_config"] = True
if not test_config["run"].get("use_connect"):
session_id = create_and_wait_for_session(
sdk=sdk,
stop_event=stop_event,
session_name=session_name,
session_options=session_options,
)
# Start session
session_id = create_and_wait_for_session(
sdk=sdk,
stop_event=stop_event,
session_name=session_name,
session_options=session_options,
)
if test_config["run"].get("use_connect"):
assert compute_tpl_name, "Compute template must exist."
assert app_config_name, "Cluster environment must exist."
prepare_command = test_config["run"].get("prepare")
# Write test state json
test_state_file = os.path.join(local_dir, "test_state.json")
with open(test_state_file, "wt") as f:
json.dump({
"start_time": time.time(),
"test_name": test_name
}, f)
if prepare_command or not test_uses_ray_connect:
if test_uses_ray_connect:
logger.info("Found a prepare command, so pushing it "
"to the session.")
# Rsync up
logger.info("Syncing files to session...")
session_controller.push(
session_name=session_name,
source=None,
target=None,
config=None,
all_nodes=False,
)
logger.info("Syncing test state to session...")
session_controller.push(
session_name=session_name,
source=test_state_file,
target=state_json,
config=None,
all_nodes=False,
)
session_url = anyscale_session_url(
project_id=GLOBAL_CONFIG["ANYSCALE_PROJECT"],
session_id=session_id)
_check_stop(stop_event, "file_sync")
# Optionally run preparation command
if prepare_command:
logger.info(
f"Running preparation command: {prepare_command}")
scd_id, result = run_session_command(
sdk=sdk,
session_id=session_id,
cmd_to_run=prepare_command,
result_queue=result_queue,
env_vars=env_vars,
state_str="CMD_PREPARE")
_, _ = wait_for_session_command_to_complete(
result,
sdk=sdk,
scd_id=scd_id,
stop_event=stop_event,
state_str="CMD_PREPARE")
if test_uses_ray_connect:
script_args = test_config["run"].get("args", [])
if smoke_test:
script_args += ["--smoke-test"]
@ -1380,7 +1433,7 @@ def run_test_config(
# Build completed, use job timeout
result_queue.put(State("CMD_RUN", time.time(), None))
returncode, logs = run_job(
cluster_name=test_name,
cluster_name=session_name,
compute_tpl_name=compute_tpl_name,
cluster_env_name=app_config_name,
job_name=session_name,
@ -1392,56 +1445,6 @@ def run_test_config(
_process_finished_client_command(returncode, logs)
return
# Write test state json
test_state_file = os.path.join(local_dir, "test_state.json")
with open(test_state_file, "wt") as f:
json.dump({
"start_time": time.time(),
"test_name": test_name
}, f)
# Rsync up
logger.info("Syncing files to session...")
session_controller.push(
session_name=session_name,
source=None,
target=None,
config=None,
all_nodes=False,
)
logger.info("Syncing test state to session...")
session_controller.push(
session_name=session_name,
source=test_state_file,
target=state_json,
config=None,
all_nodes=False,
)
session_url = anyscale_session_url(
project_id=GLOBAL_CONFIG["ANYSCALE_PROJECT"],
session_id=session_id)
_check_stop(stop_event, "file_sync")
# Optionally run preparation command
prepare_command = test_config["run"].get("prepare")
if prepare_command:
logger.info(f"Running preparation command: {prepare_command}")
scd_id, result = run_session_command(
sdk=sdk,
session_id=session_id,
cmd_to_run=prepare_command,
result_queue=result_queue,
env_vars=env_vars,
state_str="CMD_PREPARE")
_, _ = wait_for_session_command_to_complete(
result,
sdk=sdk,
scd_id=scd_id,
stop_event=stop_event,
state_str="CMD_PREPARE")
# Run release test command
cmd_to_run = test_config["run"]["script"] + " "

View file

@ -4,8 +4,8 @@
compute_template: tpl_cpu_small.yaml
run:
# use_connect: True
# autosuspend_mins: 10
use_connect: True
autosuspend_mins: 10
timeout: 600
prepare: python wait_cluster.py 4 600
script: python workloads/train_small.py