[Test] Add various fixes to the nightly dashboard to improve signals (#17351)

* Add various fixes to the nightly dashboard to improve signals

* Fix issues
This commit is contained in:
SangBin Cho 2021-07-27 12:37:11 -07:00 committed by GitHub
parent 5879e3132e
commit e1cd8580a0
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 129 additions and 31 deletions

View file

@ -254,6 +254,22 @@ class ReleaseTestTimeoutError(RuntimeError):
pass
class SessionTimeoutError(ReleaseTestTimeoutError):
pass
class FileSyncTimeoutError(ReleaseTestTimeoutError):
pass
class CommandTimeoutError(ReleaseTestTimeoutError):
pass
class PrepareCommandTimeoutError(ReleaseTestTimeoutError):
pass
class State:
def __init__(self, state: str, timestamp: float, data: Any):
self.state = state
@ -345,9 +361,22 @@ def find_ray_wheels(repo: str, branch: str, version: str):
return url
def _check_stop(stop_event: multiprocessing.Event):
def _check_stop(stop_event: multiprocessing.Event, timeout_type: str):
if stop_event.is_set():
raise ReleaseTestTimeoutError("Process timed out.")
if timeout_type == "prepare_command":
raise PrepareCommandTimeoutError(
"Process timed out in the prepare command stage.")
if timeout_type == "command":
raise CommandTimeoutError(
"Process timed out while running a command.")
elif timeout_type == "file_sync":
raise FileSyncTimeoutError(
"Process timed out while syncing files.")
elif timeout_type == "session":
raise SessionTimeoutError(
"Process timed out while starting a session.")
else:
assert False, "Unexpected timeout type."
def _deep_update(d, u):
@ -752,7 +781,7 @@ def create_and_wait_for_session(
start_wait = time.time()
next_report = start_wait + REPORT_S
while not completed:
_check_stop(stop_event)
_check_stop(stop_event, "session")
now = time.time()
if now > next_report:
logger.info(f"... still waiting for session {session_name} "
@ -771,11 +800,9 @@ def create_and_wait_for_session(
def run_session_command(sdk: AnyscaleSDK,
session_id: str,
cmd_to_run: str,
stop_event: multiprocessing.Event,
result_queue: multiprocessing.Queue,
env_vars: Dict[str, str],
state_str: str = "CMD_RUN",
kick_off_only: bool = False) -> Tuple[str, int]:
state_str: str = "CMD_RUN") -> Tuple[str, int]:
full_cmd = " ".join(f"{k}={v}"
for k, v in env_vars.items()) + " " + cmd_to_run
@ -788,15 +815,23 @@ def run_session_command(sdk: AnyscaleSDK,
dict(session_id=session_id, shell_command=full_cmd))
scd_id = result.result.id
return scd_id, result
def wait_for_session_command_to_complete(create_session_command_result,
sdk: AnyscaleSDK,
scd_id: str,
stop_event: multiprocessing.Event,
state_str: str = "CMD_RUN"):
result = create_session_command_result
completed = result.result.finished_at is not None
if kick_off_only:
return scd_id, 0
start_wait = time.time()
next_report = start_wait + REPORT_S
while not completed:
_check_stop(stop_event)
if state_str == "CMD_RUN":
_check_stop(stop_event, "command")
elif state_str == "CMD_PREPARE":
_check_stop(stop_event, "prepare_command")
now = time.time()
if now > next_report:
@ -809,12 +844,13 @@ def run_session_command(sdk: AnyscaleSDK,
time.sleep(1)
status_code = result.result.status_code
runtime = time.time() - start_wait
if status_code != 0:
raise RuntimeError(
f"Command returned non-success status: {status_code}")
return scd_id, status_code
return status_code, runtime
def get_command_logs(session_controller: SessionController,
@ -972,6 +1008,8 @@ def run_test_config(
project_id: str,
test_name: str,
test_config: Dict[Any, Any],
commit_url: str,
session_name: str = None,
smoke_test: bool = False,
no_terminate: bool = False,
kick_off_only: bool = False,
@ -1004,7 +1042,8 @@ def run_test_config(
stop_event = multiprocessing.Event()
result_queue = multiprocessing.Queue()
session_name = f"{test_name}_{int(time.time())}"
if not session_name:
session_name = f"{test_name}_{int(time.time())}"
temp_dir = tempfile.mkdtemp()
@ -1067,7 +1106,10 @@ def run_test_config(
def _process_finished_command(session_controller: SessionController,
scd_id: str,
results: Optional[Dict] = None):
results: Optional[Dict] = None,
runtime: int = None,
commit_url: str = None,
session_url: str = None):
logger.info("Command finished successfully.")
if results_json:
results = results or get_remote_json_content(
@ -1103,6 +1145,10 @@ def run_test_config(
logger.info("Usually I would have fetched the results and "
"artifacts and stored them on S3.")
# Add these metadata here to avoid changing SQL schema.
results["_runtime"] = runtime
results["_session_url"] = session_url
results["_commit_url"] = commit_url
result_queue.put(
State(
"END",
@ -1140,7 +1186,7 @@ def run_test_config(
"passed": int(returncode == 0),
}
results["returncode"]: returncode
results["returncode"] = returncode
_update_results(results)
@ -1255,20 +1301,28 @@ def run_test_config(
all_nodes=False,
)
_check_stop(stop_event)
session_url = anyscale_session_url(
project_id=GLOBAL_CONFIG["ANYSCALE_PROJECT"],
session_id=session_id)
_check_stop(stop_event, "file_sync")
# Optionally run preparation command
prepare_command = test_config["run"].get("prepare")
if prepare_command:
logger.info(f"Running preparation command: {prepare_command}")
run_session_command(
scd_id, result = run_session_command(
sdk=sdk,
session_id=session_id,
cmd_to_run=prepare_command,
stop_event=stop_event,
result_queue=result_queue,
env_vars=env_vars,
state_str="CMD_PREPARE")
_, _ = wait_for_session_command_to_complete(
result,
sdk=sdk,
scd_id=scd_id,
stop_event=stop_event,
state_str="CMD_PREPARE")
# Run release test command
cmd_to_run = test_config["run"]["script"] + " "
@ -1280,19 +1334,27 @@ def run_test_config(
if smoke_test:
cmd_to_run += " --smoke-test"
scd_id, status_code = run_session_command(
scd_id, result = run_session_command(
sdk=sdk,
session_id=session_id,
cmd_to_run=cmd_to_run,
stop_event=stop_event,
result_queue=result_queue,
env_vars=env_vars,
state_str="CMD_RUN",
kick_off_only=kick_off_only)
state_str="CMD_RUN")
if not kick_off_only:
_, runtime = wait_for_session_command_to_complete(
result,
sdk=sdk,
scd_id=scd_id,
stop_event=stop_event,
state_str="CMD_RUN")
_process_finished_command(
session_controller=session_controller, scd_id=scd_id)
session_controller=session_controller,
scd_id=scd_id,
runtime=runtime,
session_url=session_url,
commit_url=commit_url)
else:
result_queue.put(
State("END", time.time(), {
@ -1306,8 +1368,9 @@ def run_test_config(
logs = str(e)
if scd_id is not None:
try:
logs = get_command_logs(session_controller, scd_id,
test_config.get("log_lines", 50))
logs = logs + "; Command logs:" + get_command_logs(
session_controller, scd_id,
test_config.get("log_lines", 50))
except Exception as e2:
logger.error(e2, exc_info=True)
@ -1317,11 +1380,35 @@ def run_test_config(
_process_finished_command(
session_controller=session_controller, scd_id=scd_id)
else:
timeout_type = ""
runtime = None
if isinstance(e, CommandTimeoutError):
timeout_type = "timeout"
runtime = 0
elif (isinstance(e, PrepareCommandTimeoutError)
or isinstance(e, FileSyncTimeoutError)
or isinstance(e, SessionTimeoutError)):
timeout_type = "infra_timeout"
runtime = None
elif isinstance(e, RuntimeError):
timeout_type = "runtime_error"
runtime = 0
else:
timeout_type = "unknown timeout"
runtime = None
# Add these metadata here to avoid changing SQL schema.
results = {}
results["_runtime"] = runtime
results["_session_url"] = session_url
results["_commit_url"] = commit_url
result_queue.put(
State("END", time.time(), {
"status": "timeout",
"last_logs": logs
}))
State(
"END", time.time(), {
"status": timeout_type,
"last_logs": logs,
"results": results
}))
finally:
if no_terminate:
logger.warning(
@ -1540,12 +1627,14 @@ def run_test_config(
def run_test(test_config_file: str,
test_name: str,
project_id: str,
commit_url: str,
category: str = "unspecified",
smoke_test: bool = False,
no_terminate: bool = False,
kick_off_only: bool = False,
check_progress=False,
report=True):
report=True,
session_name=None):
with open(test_config_file, "rt") as f:
test_configs = yaml.load(f, Loader=yaml.FullLoader)
@ -1586,6 +1675,8 @@ def run_test(test_config_file: str,
project_id,
test_name,
test_config,
commit_url,
session_name=session_name,
smoke_test=smoke_test,
no_terminate=no_terminate,
kick_off_only=kick_off_only,
@ -1673,6 +1764,11 @@ if __name__ == "__main__":
help="Category name, e.g. `release-1.3.0` (will be saved in database)")
parser.add_argument(
"--smoke-test", action="store_true", help="Finish quickly for testing")
parser.add_argument(
"--session-name",
required=False,
type=str,
help="Name of the session to run this test.")
args, _ = parser.parse_known_args()
if not GLOBAL_CONFIG["ANYSCALE_PROJECT"]:
@ -1700,10 +1796,12 @@ if __name__ == "__main__":
test_config_file=test_config_file,
test_name=args.test_name,
project_id=GLOBAL_CONFIG["ANYSCALE_PROJECT"],
commit_url=url,
category=args.category,
smoke_test=args.smoke_test,
no_terminate=args.no_terminate or args.kick_off_only,
kick_off_only=args.kick_off_only,
check_progress=args.check,
report=not args.no_report,
session_name=args.session_name,
)

View file

@ -11,4 +11,4 @@
timeout: 600
prepare: sleep 0
script: python inference.py