From 5ea47474485458d4f89ee5561533c462e7d13a5f Mon Sep 17 00:00:00 2001 From: Ricky Xu Date: Thu, 11 Aug 2022 07:01:01 -0700 Subject: [PATCH] [Core][State Observability] Nightly release test for state API (#26610) * Initial * Correctness test skeleton * Added limit for listing * Updated grpc config * no more waiting * metrics * Updated constant and add test * renamed * actors * actors * actors * dada * actor dead? * Script * correct test name * limit * Added timeout * release test /2 * Merged * format+doc * wip Signed-off-by: rickyyx * revert packag-lock Signed-off-by: rickyyx * wip * results Signed-off-by: rickyx Signed-off-by: rickyyx Signed-off-by: rickyyx Signed-off-by: rickyx Co-authored-by: rickyyx --- python/ray/_private/state_api_test_utils.py | 294 ++++++++++++ .../stress_tests/test_state_api_scale.py | 423 ++++++++++++++++++ .../test_state_api_with_other_tests.py | 215 +++++++++ release/release_tests.yaml | 53 +++ 4 files changed, 985 insertions(+) create mode 100644 python/ray/_private/state_api_test_utils.py create mode 100644 release/nightly_tests/stress_tests/test_state_api_scale.py create mode 100644 release/nightly_tests/stress_tests/test_state_api_with_other_tests.py diff --git a/python/ray/_private/state_api_test_utils.py b/python/ray/_private/state_api_test_utils.py new file mode 100644 index 000000000..7824eaa11 --- /dev/null +++ b/python/ray/_private/state_api_test_utils.py @@ -0,0 +1,294 @@ +import asyncio +from copy import deepcopy +from collections import defaultdict +import concurrent.futures +from dataclasses import dataclass, field +import logging +import numpy as np +import pprint +import time +import traceback +from typing import Callable, Dict, List, Optional +import ray +from ray.actor import ActorHandle + + +@dataclass +class StateAPIMetric: + latency_sec: float + result_size: int + + +@dataclass +class StateAPICallSpec: + api: Callable + verify_cb: Callable + kwargs: Dict = field(default_factory=dict) + + +@dataclass +class StateAPIStats: + pending_calls: int = 0 + total_calls: int = 0 + calls: Dict = field(default_factory=lambda: defaultdict(list)) + + +GLOBAL_STATE_STATS = StateAPIStats() + +STATE_LIST_LIMIT = int(1e6) # 1m +STATE_LIST_TIMEOUT = 600 # 10min + + +def invoke_state_api( + verify_cb: Callable, + state_api_fn: Callable, + state_stats: StateAPIStats = GLOBAL_STATE_STATS, + key_suffix: Optional[str] = None, + **kwargs, +): + """Invoke a State API + + Args: + - verify_cb: Callback that takes in the response from `state_api_fn` and + returns a boolean, indicating the correctness of the results. + - state_api_fn: Function of the state API + - state_stats: Stats + - kwargs: Keyword arguments to be forwarded to the `state_api_fn` + """ + if "timeout" not in kwargs: + kwargs["timeout"] = STATE_LIST_TIMEOUT + + # Suppress missing output warning + kwargs["raise_on_missing_output"] = False + + res = None + try: + state_stats.total_calls += 1 + state_stats.pending_calls += 1 + + t_start = time.perf_counter() + res = state_api_fn(**kwargs) + t_end = time.perf_counter() + + metric = StateAPIMetric(t_end - t_start, len(res)) + if key_suffix: + key = f"{state_api_fn.__name__}_{key_suffix}" + else: + key = state_api_fn.__name__ + state_stats.calls[key].append(metric) + assert verify_cb(res), f"Calling State API failed. len(res)=({len(res)}): {res}" + except Exception as e: + traceback.print_exc() + assert ( + False + ), f"Calling {state_api_fn.__name__}({kwargs}) failed with {repr(e)}." 
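+        # An assertion failure propagates to the caller (after the `finally`
+        # clause below restores `pending_calls`), so incorrect results are not
+        # silently swallowed.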
+    finally:
+        state_stats.pending_calls -= 1
+
+    return res
+
+
+def aggregate_perf_results(state_stats: StateAPIStats = GLOBAL_STATE_STATS):
+    """Aggregate stats of state API calls
+
+    Returns:
+        A dict with the following fields:
+        - max_{api_key_name}_latency_sec:
+            Max latency of calls to {api_key_name}
+        - {api_key_name}_result_size_with_max_latency:
+            The size of the result (or the number of bytes for the get_log API)
+            for the max-latency invocation
+        - avg/p99/p95/p50_{api_key_name}_latency_sec:
+            Percentile latency stats
+        - avg_state_api_latency_sec:
+            The average latency across all tracked state APIs
+    """
+    # Deep-copy so in-flight calls don't mutate the stats while we iterate.
+    state_stats = deepcopy(state_stats)
+    perf_result = {}
+    for api_key_name, metrics in state_stats.calls.items():
+        # Per-api aggregation
+        # Max latency
+        latency_key = f"max_{api_key_name}_latency_sec"
+        size_key = f"{api_key_name}_result_size_with_max_latency"
+        metric = max(metrics, key=lambda metric: metric.latency_sec)
+
+        perf_result[latency_key] = metric.latency_sec
+        perf_result[size_key] = metric.result_size
+
+        latency_list = np.array([metric.latency_sec for metric in metrics])
+        # avg latency
+        key = f"avg_{api_key_name}_latency_sec"
+        perf_result[key] = np.average(latency_list)
+
+        # p99 latency
+        key = f"p99_{api_key_name}_latency_sec"
+        perf_result[key] = np.percentile(latency_list, 99)
+
+        # p95 latency
+        key = f"p95_{api_key_name}_latency_sec"
+        perf_result[key] = np.percentile(latency_list, 95)
+
+        # p50 latency
+        key = f"p50_{api_key_name}_latency_sec"
+        perf_result[key] = np.percentile(latency_list, 50)
+
+    all_state_api_latency = sum(
+        metric.latency_sec
+        for metric_samples in state_stats.calls.values()
+        for metric in metric_samples
+    )
+
+    perf_result["avg_state_api_latency_sec"] = (
+        (all_state_api_latency / state_stats.total_calls)
+        if state_stats.total_calls != 0
+        else -1
+    )
+
+    return perf_result
+
+
+@ray.remote
+class StateAPIGeneratorActor:
+    def __init__(
+        self,
+        apis: List[StateAPICallSpec],
+        call_interval_s: float = 5.0,
+        print_interval_s: float = 20.0,
+        wait_after_stop: bool = True,
+    ) -> None:
+        """An actor that periodically issues state API calls
+
+        Args:
+            - apis: List of StateAPICallSpec
+            - call_interval_s: Each state API in `apis` will be issued
+                every `call_interval_s` seconds.
+            - print_interval_s: How frequently the state API stats are dumped.
+            - wait_after_stop: When true, `ray.get(actor.stop.remote())`
+                waits for all pending state API calls to return.
+                Setting it to `False` might miss some long-running state API calls.
+ """ + # Configs + self._apis = apis + self._call_interval_s = call_interval_s + self._print_interval_s = print_interval_s + self._wait_after_cancel = wait_after_stop + self._logger = logging.getLogger(self.__class__.__name__) + + # States + self._tasks = None + self._fut_queue = None + self._executor = None + self._loop = None + self._stopping = False + self._stopped = False + self._stats = StateAPIStats() + + async def start(self): + # Run the periodic api generator + self._fut_queue = asyncio.Queue() + self._executor = concurrent.futures.ThreadPoolExecutor() + + self._tasks = [ + asyncio.ensure_future(awt) + for awt in [ + self._run_generator(), + self._run_result_waiter(), + self._run_stats_reporter(), + ] + ] + await asyncio.gather(*self._tasks) + + def call(self, fn, verify_cb, **kwargs): + def run_fn(): + try: + self._logger.debug(f"calling {fn.__name__}({kwargs})") + return invoke_state_api( + verify_cb, fn, state_stats=self._stats, **kwargs + ) + except Exception as e: + self._logger.warning(f"{fn.__name__}({kwargs}) failed with: {repr(e)}") + return None + + fut = asyncio.get_running_loop().run_in_executor(self._executor, run_fn) + return fut + + async def _run_stats_reporter(self): + while not self._stopped: + # Keep the reporter running until all pending apis finish and the bool + # `self._stopped` is then True + self._logger.info(pprint.pprint(aggregate_perf_results(self._stats))) + try: + await asyncio.sleep(self._print_interval_s) + except asyncio.CancelledError: + self._logger.info( + "_run_stats_reporter cancelled, " + f"waiting for all api {self._stats.pending_calls}calls to return..." + ) + + async def _run_generator(self): + try: + while not self._stopping: + # Run the state API in another thread + for api_spec in self._apis: + fut = self.call(api_spec.api, api_spec.verify_cb, **api_spec.kwargs) + self._fut_queue.put_nowait(fut) + + await asyncio.sleep(self._call_interval_s) + except asyncio.CancelledError: + # Stop running + self._logger.info("_run_generator cancelled, now stopping...") + return + + async def _run_result_waiter(self): + try: + while not self._stopping: + fut = await self._fut_queue.get() + await fut + except asyncio.CancelledError: + self._logger.info( + f"_run_result_waiter cancelled, cancelling {self._fut_queue.qsize()} " + "pending futures..." + ) + while not self._fut_queue.empty(): + fut = self._fut_queue.get_nowait() + if self._wait_after_cancel: + await fut + else: + # Ignore the queue futures if we are not + # waiting on them after stop() called + fut.cancel() + return + + def get_stats(self): + # deep copy to prevent race between reporting and modifying stats + return aggregate_perf_results(self._stats) + + def ready(self): + pass + + def stop(self): + self._stopping = True + self._logger.debug(f"calling stop, canceling {len(self._tasks)} tasks") + for task in self._tasks: + task.cancel() + + # This will block the stop() function until all futures are cancelled + # if _wait_after_cancel=True. When _wait_after_cancel=False, it will still + # wait for any in-progress futures. + # See: https://docs.python.org/3.8/library/concurrent.futures.html + self._executor.shutdown(wait=self._wait_after_cancel) + self._stopped = True + + +def periodic_invoke_state_apis_with_actor(*args, **kwargs) -> ActorHandle: + current_node_ip = ray._private.worker.global_worker.node_ip_address + # Schedule the actor on the current node. 
+ actor = StateAPIGeneratorActor.options( + resources={f"node:{current_node_ip}": 0.001} + ).remote(*args, **kwargs) + print("Waiting for state api actor to be ready...") + ray.get(actor.ready.remote()) + print("State api actor is ready now.") + actor.start.remote() + return actor diff --git a/release/nightly_tests/stress_tests/test_state_api_scale.py b/release/nightly_tests/stress_tests/test_state_api_scale.py new file mode 100644 index 000000000..9b2a87c39 --- /dev/null +++ b/release/nightly_tests/stress_tests/test_state_api_scale.py @@ -0,0 +1,423 @@ +import click +import json +import ray +from ray._private.ray_constants import LOG_PREFIX_ACTOR_NAME +from ray._private.state_api_test_utils import ( + STATE_LIST_LIMIT, + StateAPIMetric, + aggregate_perf_results, + invoke_state_api, + GLOBAL_STATE_STATS, +) + +import ray._private.test_utils as test_utils +import tqdm +import asyncio +import time +import os + +from ray.experimental.state.api import ( + get_log, + list_actors, + list_objects, + list_tasks, +) + +GiB = 1024 * 1024 * 1024 +MiB = 1024 * 1024 + + +# We set num_cpus to zero because this actor will mostly just block on I/O. +@ray.remote(num_cpus=0) +class SignalActor: + def __init__(self): + self.ready_event = asyncio.Event() + + def send(self, clear=False): + self.ready_event.set() + if clear: + self.ready_event.clear() + + async def wait(self, should_wait=True): + if should_wait: + await self.ready_event.wait() + + +def invoke_state_api_n(*args, **kwargs): + NUM_API_CALL_SAMPLES = 10 + for _ in range(NUM_API_CALL_SAMPLES): + invoke_state_api(*args, **kwargs) + + +def test_many_tasks(num_tasks: int): + if num_tasks == 0: + print("Skipping test with no tasks") + return + # No running tasks + invoke_state_api( + lambda res: len(res) == 0, + list_tasks, + filters=[("name", "=", "pi4_sample()"), ("scheduling_state", "=", "RUNNING")], + key_suffix="0", + limit=STATE_LIST_LIMIT, + ) + + # Task definition adopted from: + # https://docs.ray.io/en/master/ray-core/examples/highly_parallel.html + from random import random + + SAMPLES = 100 + + @ray.remote + def pi4_sample(signal): + in_count = 0 + for _ in range(SAMPLES): + x, y = random(), random() + if x * x + y * y <= 1: + in_count += 1 + # Block on signal + ray.get(signal.wait.remote()) + return in_count + + results = [] + signal = SignalActor.remote() + for _ in tqdm.trange(num_tasks, desc="Launching tasks"): + results.append(pi4_sample.remote(signal)) + + invoke_state_api_n( + lambda res: len(res) == num_tasks, + list_tasks, + filters=[("name", "=", "pi4_sample()")], + key_suffix=f"{num_tasks}", + limit=STATE_LIST_LIMIT, + ) + + print("Waiting for tasks to finish...") + ray.get(signal.send.remote()) + ray.get(results) + + # Clean up + # All compute tasks done other than the signal actor + invoke_state_api( + lambda res: len(res) == 0, + list_tasks, + filters=[("name", "=", "pi4_sample()"), ("scheduling_state", "=", "RUNNING")], + key_suffix="0", + limit=STATE_LIST_LIMIT, + ) + + del signal + + +def test_many_actors(num_actors: int): + if num_actors == 0: + print("Skipping test with no actors") + return + + @ray.remote + class TestActor: + def running(self): + return True + + def exit(self): + ray.actor.exit_actor() + + actor_class_name = TestActor.__ray_metadata__.class_name + + invoke_state_api( + lambda res: len(res) == 0, + list_actors, + filters=[("state", "=", "ALIVE"), ("class_name", "=", actor_class_name)], + key_suffix="0", + limit=STATE_LIST_LIMIT, + ) + + actors = [ + TestActor.remote() for _ in tqdm.trange(num_actors, 
desc="Launching actors...") + ] + + waiting_actors = [actor.running.remote() for actor in actors] + print("Waiting for actors to finish...") + ray.get(waiting_actors) + + invoke_state_api_n( + lambda res: len(res) == num_actors, + list_actors, + filters=[("state", "=", "ALIVE"), ("class_name", "=", actor_class_name)], + key_suffix=f"{num_actors}", + limit=STATE_LIST_LIMIT, + ) + + exiting_actors = [actor.exit.remote() for actor in actors] + for _ in tqdm.trange(len(actors), desc="Destroying actors..."): + _exitted, exiting_actors = ray.wait(exiting_actors) + + invoke_state_api( + lambda res: len(res) == 0, + list_actors, + filters=[("state", "=", "ALIVE"), ("class_name", "=", actor_class_name)], + key_suffix="0", + limit=STATE_LIST_LIMIT, + ) + + +def test_many_objects(num_objects, num_actors): + if num_objects == 0: + print("Skipping test with no objects") + return + + @ray.remote(num_cpus=0.1) + class ObjectActor: + def __init__(self): + self.objs = [] + + def create_objs(self, num_objects): + import os + + for _ in range(num_objects): + # Object size shouldn't matter here. + self.objs.append(ray.put(bytearray(os.urandom(1024)))) + + return self.objs + + def exit(self): + ray.actor.exit_actor() + + actors = [ + ObjectActor.remote() for _ in tqdm.trange(num_actors, desc="Creating actors...") + ] + + # Splitting objects to multiple actors for creation, + # credit: https://stackoverflow.com/a/2135920 + def _split(a, n): + k, m = divmod(len(a), n) + return (a[i * k + min(i, m) : (i + 1) * k + min(i + 1, m)] for i in range(n)) + + num_objs_per_actor = [len(objs) for objs in _split(range(num_objects), num_actors)] + + waiting_actors = [ + actor.create_objs.remote(num_objs) + for actor, num_objs in zip(actors, num_objs_per_actor) + ] + + total_objs_created = 0 + for _ in tqdm.trange(num_actors, desc="Waiting actors to create objects..."): + objs, waiting_actors = ray.wait(waiting_actors) + total_objs_created += len(ray.get(*objs)) + + assert ( + total_objs_created == num_objects + ), "Expect correct number of objects created." 
+ + invoke_state_api_n( + lambda res: len(res) == num_objects, + list_objects, + filters=[ + ("reference_type", "=", "LOCAL_REFERENCE"), + ("type", "=", "Worker"), + ], + key_suffix=f"{num_objects}", + limit=STATE_LIST_LIMIT, + ) + + exiting_actors = [actor.exit.remote() for actor in actors] + for _ in tqdm.trange(len(actors), desc="Destroying actors..."): + _exitted, exiting_actors = ray.wait(exiting_actors) + + +def test_large_log_file(log_file_size_byte: int): + if log_file_size_byte == 0: + print("Skipping test with 0 log file size") + return + + import sys + import string + import random + import hashlib + + @ray.remote + class LogActor: + def write_log(self, log_file_size_byte: int): + ctx = hashlib.md5() + prefix = f"{LOG_PREFIX_ACTOR_NAME}LogActor\n" + ctx.update(prefix.encode()) + while log_file_size_byte > 0: + n = min(log_file_size_byte, 4 * MiB) + chunk = "".join(random.choices(string.ascii_letters, k=n)) + sys.stdout.writelines([chunk]) + ctx.update(chunk.encode()) + log_file_size_byte -= n + + sys.stdout.flush() + return ctx.hexdigest(), ray.get_runtime_context().node_id.hex() + + actor = LogActor.remote() + expected_hash, node_id = ray.get( + actor.write_log.remote(log_file_size_byte=log_file_size_byte) + ) + assert expected_hash is not None, "Empty checksum from the log actor" + assert node_id is not None, "Empty node id from the log actor" + + # Retrieve the log and compare the checksum + ctx = hashlib.md5() + + time_taken = 0 + t_start = time.perf_counter() + for s in get_log(actor_id=actor._actor_id.hex(), tail=-1): + t_end = time.perf_counter() + time_taken += t_end - t_start + # Not including this time + ctx.update(s.encode()) + # Only time the iterator's performance + t_start = time.perf_counter() + + assert expected_hash == ctx.hexdigest(), "Mismatch log file" + + metric = StateAPIMetric(time_taken, log_file_size_byte) + GLOBAL_STATE_STATS.calls["get_log"].append(metric) + + +def _parse_input( + num_tasks_str: str, num_actors_str: str, num_objects_str: str, log_file_sizes: str +): + def _split_to_int(s): + tokens = s.split(",") + return [int(token) for token in tokens] + + return ( + _split_to_int(num_tasks_str), + _split_to_int(num_actors_str), + _split_to_int(num_objects_str), + _split_to_int(log_file_sizes), + ) + + +def no_resource_leaks(): + return test_utils.no_resource_leaks_excluding_node_resources() + + +@click.command() +@click.option( + "--num-tasks", + required=False, + default="1,100,1000,10000", + type=str, + help="Number of tasks to launch.", +) +@click.option( + "--num-actors", + required=False, + default="1,100,1000,5000", + type=str, + help="Number of actors to launch.", +) +@click.option( + "--num-objects", + required=False, + default="100,1000,10000,50000", + type=str, + help="Number of actors to launch.", +) +@click.option( + "--num-actors-for-objects", + required=False, + default=16, + type=int, + help="Number of actors to use for object creation.", +) +@click.option( + "--log-file-size-byte", + required=False, + default=f"{256*MiB},{1*GiB},{4*GiB}", + type=str, + help="Number of actors to launch.", +) +@click.option( + "--smoke-test", + is_flag=True, + type=bool, + default=False, + help="If set, it's a smoke test", +) +def test( + num_tasks, + num_actors, + num_objects, + num_actors_for_objects, + log_file_size_byte, + smoke_test, +): + ray.init(address="auto", log_to_driver=False) + + if smoke_test: + num_tasks = "100" + num_actors = "10" + num_objects = "100" + log_file_size_byte = f"{16*MiB}" + + # Parse the input + num_tasks_arr, 
num_actors_arr, num_objects_arr, log_file_size_arr = _parse_input( + num_tasks, num_actors, num_objects, log_file_size_byte + ) + + test_utils.wait_for_condition(no_resource_leaks) + monitor_actor = test_utils.monitor_memory_usage() + start_time = time.perf_counter() + # Run some long-running tasks + for n in num_tasks_arr: + print(f"\nRunning with many tasks={n}") + test_many_tasks(num_tasks=n) + print(f"\ntest_many_tasks({n}) PASS") + + # Run many actors + for n in num_actors_arr: + print(f"\nRunning with many actors={n}") + test_many_actors(num_actors=n) + print(f"\ntest_many_actors({n}) PASS") + + # Create many objects + for n in num_objects_arr: + print(f"\nRunning with many objects={n}") + test_many_objects(num_objects=n, num_actors=num_actors_for_objects) + print(f"\ntest_many_objects({n}) PASS") + + # Create large logs + for n in log_file_size_arr: + print(f"\nRunning with large file={n} bytes") + test_large_log_file(log_file_size_byte=n) + print(f"\ntest_large_log_file({n} bytes) PASS") + + print("\n\nPASS") + end_time = time.perf_counter() + + # Collect mem usage + ray.get(monitor_actor.stop_run.remote()) + used_gb, usage = ray.get(monitor_actor.get_peak_memory_info.remote()) + print(f"Peak memory usage: {round(used_gb, 2)}GB") + print(f"Peak memory usage per processes:\n {usage}") + del monitor_actor + + state_perf_result = aggregate_perf_results() + results = { + "time": end_time - start_time, + "success": "1", + "_peak_memory": round(used_gb, 2), + "_peak_process_memory": usage, + "perf_metrics": [ + { + "perf_metric_name": "avg_state_api_latency_sec", + "perf_metric_value": state_perf_result["avg_state_api_latency_sec"], + "perf_metric_type": "LATENCY", + } + ], + } + if "TEST_OUTPUT_JSON" in os.environ: + out_file = open(os.environ["TEST_OUTPUT_JSON"], "w") + json.dump(results, out_file) + + results.update(state_perf_result) + print(json.dumps(results, indent=2)) + + +if __name__ == "__main__": + test() diff --git a/release/nightly_tests/stress_tests/test_state_api_with_other_tests.py b/release/nightly_tests/stress_tests/test_state_api_with_other_tests.py new file mode 100644 index 000000000..be8c396a7 --- /dev/null +++ b/release/nightly_tests/stress_tests/test_state_api_with_other_tests.py @@ -0,0 +1,215 @@ +import time +from typing import Dict, List +import click +import json +import os + +import ray + +from ray.experimental.state.api import ( + list_actors, + list_nodes, + list_objects, + list_tasks, + summarize_actors, + summarize_objects, + summarize_tasks, +) + +import ray._private.test_utils as test_utils + +from ray._private.state_api_test_utils import ( + StateAPICallSpec, + periodic_invoke_state_apis_with_actor, + STATE_LIST_LIMIT, +) + + +def download_release_test(test_file_path: str) -> None: + """Download the release test file from github. + + It is currently assumed individual release test is independent from each other, + and isolated in its own file. + + This always downloads the file into current working directory so that this + python script could invoke the release test w/o path imports. + + Args: + test_file_path: File path relevant to the `/release` folder. + + Return: + Basename (file name) of the test file path if download successfully. 
+ """ + import urllib.request as rq + import urllib.parse as parse + + RAW_RAY_GITHUB_URL = ( + "https://raw.githubusercontent.com/ray-project/ray/master/release/" + ) + file_name = os.path.basename(test_file_path) + try: + rq.urlretrieve(parse.urljoin(RAW_RAY_GITHUB_URL, test_file_path), file_name) + return file_name + except Exception as e: + print(f"Failed to retrieve :{test_file_path} :\n{e}") + return None + + +def cleanup_release_test(test_file_name: str) -> bool: + try: + os.remove(test_file_name) + return True + except Exception as e: + print(f"Failed to remove file: {test_file_name}: \n{e}") + return False + + +def run_release_test_in_subprocess(test_file: str, args: List[str]) -> bool: + import subprocess as sp + + # Run the test in subprocess + cmds = ["python", test_file, *args] + + print(f"Running: {' '.join(cmds)}") + proc = None + try: + proc = sp.run(cmds, check=True, text=True, capture_output=True) + proc.check_returncode() + return True + except sp.CalledProcessError as e: + print(f"Failed to run :{' '.join(cmds)}") + print(e) + print(e.stdout) + print(e.stderr) + return False + + +def run_test(test_name: str, test_args: List[str]): + + monitor_actor = test_utils.monitor_memory_usage() + + start = time.perf_counter() + run_release_test_in_subprocess(test_name, test_args) + end = time.perf_counter() + # Collect mem usage + ray.get(monitor_actor.stop_run.remote()) + used_gb, usage = ray.get(monitor_actor.get_peak_memory_info.remote()) + + results = { + "duration": end - start, + "peak_memory": round(used_gb, 2), + "peak_process_memory": usage, + } + return results + + +def run_test_with_state_api( + test_name: str, + test_args: List[str], + apis: List[StateAPICallSpec], + call_interval_s: int = 3, + print_interval_s: int = 15, +) -> Dict: + + start_time = time.perf_counter() + + # Stage 1: Run with state APIs + api_caller = periodic_invoke_state_apis_with_actor( + apis=apis, call_interval_s=call_interval_s, print_interval_s=print_interval_s + ) + + stats_with_state_apis = run_test(test_name, test_args) + ray.get(api_caller.stop.remote()) + print(json.dumps(ray.get(api_caller.get_stats.remote()), indent=2)) + + # Stage 2: Run without API generator + stats_without_state_apis = run_test(test_name, test_args) + end_time = time.perf_counter() + + # Dumping results + results = { + "time": end_time - start_time, + "success": "1", + "perf_metrics": [ + { + "perf_metric_name": "state_api_extra_latency_sec", + "perf_metric_value": stats_with_state_apis["duration"] + - stats_without_state_apis["duration"], + "perf_metric_type": "LATENCY", + }, + { + "perf_metric_name": "state_api_extra_latency_sec_percentage", + "perf_metric_value": ( + stats_with_state_apis["duration"] + / stats_without_state_apis["duration"] + - 1 + ) + * 100, + "perf_metric_type": "LATENCY", + }, + { + "perf_metric_name": "state_api_extra_mem", + "perf_metric_value": stats_with_state_apis["peak_memory"] + - stats_without_state_apis["peak_memory"], + "perf_metric_type": "MEMORY", + }, + ], + } + + return results + + +@click.command() +@click.argument( + "test_path", +) +@click.option( + "--test-args", + type=str, +) +@click.option( + "--call-interval-s", type=int, default=3, help="interval of state api calls" +) +def test( + test_path, + test_args, + call_interval_s, +): + + # Set up state API calling methods + def not_none(res): + return res is not None + + apis = [ + StateAPICallSpec(list_nodes, not_none, {"limit": STATE_LIST_LIMIT}), + StateAPICallSpec(list_objects, not_none, {"limit": STATE_LIST_LIMIT}), + 
StateAPICallSpec(list_tasks, not_none, {"limit": STATE_LIST_LIMIT}), + StateAPICallSpec(list_actors, not_none, {"limit": STATE_LIST_LIMIT}), + StateAPICallSpec(summarize_tasks, not_none), + StateAPICallSpec(summarize_actors, not_none), + StateAPICallSpec(summarize_objects, not_none), + ] + + # Set up benchmark test by downloading the release test file directly + test_name = download_release_test(test_path) + assert test_name is not None, f"Failed to retrieve release test: {test_path}" + + ray.init() + results = run_test_with_state_api( + test_name, + test_args.split(), + apis, + call_interval_s=call_interval_s, + ) + + if "TEST_OUTPUT_JSON" in os.environ: + # This will overwrite all other release tests result + out_file = open(os.environ["TEST_OUTPUT_JSON"], "w") + json.dump(results, out_file) + print(json.dumps(results, indent=2)) + + assert cleanup_release_test(test_name) + + +if __name__ == "__main__": + test() diff --git a/release/release_tests.yaml b/release/release_tests.yaml index 5df023d34..5f67615f8 100644 --- a/release/release_tests.yaml +++ b/release/release_tests.yaml @@ -3572,6 +3572,59 @@ wait_for_nodes: num_nodes: 5 +- name: stress_test_state_api_scale + group: core-daily-test + working_dir: nightly_tests + legacy: + test_name: stress_test_state_api_scale + test_suite: nightly_tests + stable: false + + frequency: nightly + team: core + cluster: + cluster_env: stress_tests/stress_tests_app_config.yaml + cluster_compute: stress_tests/stress_tests_compute.yaml + + run: + timeout: 3600 + script: python stress_tests/test_state_api_scale.py + type: sdk_command + file_manager: sdk + + smoke_test: + frequency: multi + cluster: + app_config: stress_tests/stress_tests_app_config.yaml + cluster_compute: stress_tests/smoke_test_compute.yaml + + run: + timeout: 3600 + script: python stress_tests/test_state_api_scale.py --smoke-test + + +- name: shuffle_20gb_with_state_api + group: core-daily-test + working_dir: nightly_tests + legacy: + test_name: shuffle_20gb_with_state_api + test_suite: nightly_tests + stable: false + + frequency: nightly + team: core + cluster: + cluster_env: shuffle/shuffle_app_config.yaml + cluster_compute: shuffle/shuffle_compute_single.yaml + + run: + timeout: 1000 + script: python stress_tests/test_state_api_with_other_tests.py + nightly_tests/shuffle/shuffle_test.py --test-args="--num-partitions=100 --partition-size=200e6" + + type: sdk_command + file_manager: sdk + - name: stress_test_many_tasks group: core-daily-test working_dir: nightly_tests
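The two new release tests drive the helpers above in the same way. Reduced to a minimal sketch (assuming a running Ray cluster reachable via address="auto" and only the modules added in this patch; the workload in the middle is a placeholder), the pattern is:

import ray
from ray.experimental.state.api import list_actors, summarize_tasks
from ray._private.state_api_test_utils import (
    STATE_LIST_LIMIT,
    StateAPICallSpec,
    periodic_invoke_state_apis_with_actor,
)

ray.init(address="auto")

# Issue these state APIs in the background every 5 seconds.
specs = [
    StateAPICallSpec(list_actors, lambda res: res is not None, {"limit": STATE_LIST_LIMIT}),
    StateAPICallSpec(summarize_tasks, lambda res: res is not None),
]
caller = periodic_invoke_state_apis_with_actor(
    apis=specs, call_interval_s=5, print_interval_s=30
)

# ... run the workload under test here ...

ray.get(caller.stop.remote())  # waits for in-flight state API calls by default
print(ray.get(caller.get_stats.remote()))  # aggregated latency/percentile stats

Because `wait_after_stop` defaults to True, `stop()` blocks until in-flight calls finish, so the stats returned by `get_stats()` cover every call that was issued.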