ray/release/nightly_tests/stress_tests/test_state_api_scale.py

import asyncio
import json
import os
import time

import click
import tqdm

import ray
import ray._private.test_utils as test_utils
from ray._private.ray_constants import LOG_PREFIX_ACTOR_NAME
from ray._private.state_api_test_utils import (
    STATE_LIST_LIMIT,
    StateAPIMetric,
    aggregate_perf_results,
    invoke_state_api,
    GLOBAL_STATE_STATS,
)
from ray.experimental.state.api import (
    get_log,
    list_actors,
    list_objects,
    list_tasks,
)

GiB = 1024 * 1024 * 1024
MiB = 1024 * 1024


# We set num_cpus to zero because this actor will mostly just block on I/O.
@ray.remote(num_cpus=0)
class SignalActor:
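    """Simple async signal: callers block in wait() until send() sets the event."""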
def __init__(self):
self.ready_event = asyncio.Event()
def send(self, clear=False):
self.ready_event.set()
if clear:
self.ready_event.clear()
async def wait(self, should_wait=True):
if should_wait:
            await self.ready_event.wait()


def invoke_state_api_n(*args, **kwargs):
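    """Call the state API NUM_API_CALL_SAMPLES times to collect latency samples."""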
NUM_API_CALL_SAMPLES = 10
for _ in range(NUM_API_CALL_SAMPLES):
        invoke_state_api(*args, **kwargs)


def test_many_tasks(num_tasks: int):
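    """Launch ``num_tasks`` sampling tasks blocked on a SignalActor, verify
    list_tasks() reports them, then release the signal and verify no
    pi4_sample task is left RUNNING.
    """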
if num_tasks == 0:
print("Skipping test with no tasks")
return
# No running tasks
invoke_state_api(
lambda res: len(res) == 0,
list_tasks,
filters=[("name", "=", "pi4_sample()"), ("scheduling_state", "=", "RUNNING")],
key_suffix="0",
limit=STATE_LIST_LIMIT,
)
# Task definition adopted from:
# https://docs.ray.io/en/master/ray-core/examples/highly_parallel.html
from random import random
SAMPLES = 100
@ray.remote
def pi4_sample(signal):
in_count = 0
for _ in range(SAMPLES):
x, y = random(), random()
if x * x + y * y <= 1:
in_count += 1
# Block on signal
ray.get(signal.wait.remote())
return in_count
results = []
signal = SignalActor.remote()
for _ in tqdm.trange(num_tasks, desc="Launching tasks"):
results.append(pi4_sample.remote(signal))
invoke_state_api_n(
lambda res: len(res) == num_tasks,
list_tasks,
filters=[("name", "=", "pi4_sample()")],
key_suffix=f"{num_tasks}",
limit=STATE_LIST_LIMIT,
)
print("Waiting for tasks to finish...")
ray.get(signal.send.remote())
ray.get(results)
    # Clean up: all sampling tasks are done; only the signal actor remains.
invoke_state_api(
lambda res: len(res) == 0,
list_tasks,
filters=[("name", "=", "pi4_sample()"), ("scheduling_state", "=", "RUNNING")],
key_suffix="0",
limit=STATE_LIST_LIMIT,
)
    del signal


def test_many_actors(num_actors: int):
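    """Launch ``num_actors`` TestActor instances, verify list_actors() reports
    them as ALIVE, then terminate them and verify none remain ALIVE.
    """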
if num_actors == 0:
print("Skipping test with no actors")
return
@ray.remote
class TestActor:
def running(self):
return True
def exit(self):
ray.actor.exit_actor()
actor_class_name = TestActor.__ray_metadata__.class_name
invoke_state_api(
lambda res: len(res) == 0,
list_actors,
filters=[("state", "=", "ALIVE"), ("class_name", "=", actor_class_name)],
key_suffix="0",
limit=STATE_LIST_LIMIT,
)
actors = [
TestActor.remote() for _ in tqdm.trange(num_actors, desc="Launching actors...")
]
waiting_actors = [actor.running.remote() for actor in actors]
print("Waiting for actors to finish...")
ray.get(waiting_actors)
invoke_state_api_n(
lambda res: len(res) == num_actors,
list_actors,
filters=[("state", "=", "ALIVE"), ("class_name", "=", actor_class_name)],
key_suffix=f"{num_actors}",
limit=STATE_LIST_LIMIT,
)
exiting_actors = [actor.exit.remote() for actor in actors]
for _ in tqdm.trange(len(actors), desc="Destroying actors..."):
        _exited, exiting_actors = ray.wait(exiting_actors)
invoke_state_api(
lambda res: len(res) == 0,
list_actors,
filters=[("state", "=", "ALIVE"), ("class_name", "=", actor_class_name)],
key_suffix="0",
limit=STATE_LIST_LIMIT,
    )


def test_many_objects(num_objects, num_actors):
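    """Create ``num_objects`` small objects spread across ``num_actors`` actors
    and verify list_objects() reports the expected number of LOCAL_REFERENCE
    entries before tearing the actors down.
    """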
if num_objects == 0:
print("Skipping test with no objects")
return
@ray.remote(num_cpus=0.1)
class ObjectActor:
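        """Actor that creates and holds small objects via ray.put()."""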
def __init__(self):
self.objs = []
def create_objs(self, num_objects):
import os
for _ in range(num_objects):
# Object size shouldn't matter here.
self.objs.append(ray.put(bytearray(os.urandom(1024))))
return self.objs
def exit(self):
ray.actor.exit_actor()
actors = [
ObjectActor.remote() for _ in tqdm.trange(num_actors, desc="Creating actors...")
]
    # Split object creation across multiple actors,
    # credit: https://stackoverflow.com/a/2135920
def _split(a, n):
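        # Split ``a`` into ``n`` contiguous chunks whose sizes differ by at most one.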
k, m = divmod(len(a), n)
return (a[i * k + min(i, m) : (i + 1) * k + min(i + 1, m)] for i in range(n))
num_objs_per_actor = [len(objs) for objs in _split(range(num_objects), num_actors)]
waiting_actors = [
actor.create_objs.remote(num_objs)
for actor, num_objs in zip(actors, num_objs_per_actor)
]
total_objs_created = 0
    for _ in tqdm.trange(num_actors, desc="Waiting for actors to create objects..."):
objs, waiting_actors = ray.wait(waiting_actors)
total_objs_created += len(ray.get(*objs))
assert (
total_objs_created == num_objects
), "Expect correct number of objects created."
invoke_state_api_n(
lambda res: len(res) == num_objects,
list_objects,
filters=[
("reference_type", "=", "LOCAL_REFERENCE"),
("type", "=", "Worker"),
],
key_suffix=f"{num_objects}",
limit=STATE_LIST_LIMIT,
)
exiting_actors = [actor.exit.remote() for actor in actors]
for _ in tqdm.trange(len(actors), desc="Destroying actors..."):
        _exited, exiting_actors = ray.wait(exiting_actors)


def test_large_log_file(log_file_size_byte: int):
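    """Have a LogActor write ``log_file_size_byte`` bytes of random characters to
    stdout, then stream the log back with get_log() and compare MD5 checksums.
    """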
if log_file_size_byte == 0:
print("Skipping test with 0 log file size")
return
import sys
import string
import random
import hashlib
@ray.remote
class LogActor:
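        """Write random characters to stdout; return the MD5 checksum and node id."""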
def write_log(self, log_file_size_byte: int):
ctx = hashlib.md5()
prefix = f"{LOG_PREFIX_ACTOR_NAME}LogActor\n"
ctx.update(prefix.encode())
while log_file_size_byte > 0:
n = min(log_file_size_byte, 4 * MiB)
chunk = "".join(random.choices(string.ascii_letters, k=n))
sys.stdout.writelines([chunk])
ctx.update(chunk.encode())
log_file_size_byte -= n
sys.stdout.flush()
return ctx.hexdigest(), ray.get_runtime_context().node_id.hex()
actor = LogActor.remote()
expected_hash, node_id = ray.get(
actor.write_log.remote(log_file_size_byte=log_file_size_byte)
)
assert expected_hash is not None, "Empty checksum from the log actor"
assert node_id is not None, "Empty node id from the log actor"
# Retrieve the log and compare the checksum
ctx = hashlib.md5()
time_taken = 0
t_start = time.perf_counter()
for s in get_log(actor_id=actor._actor_id.hex(), tail=-1):
t_end = time.perf_counter()
time_taken += t_end - t_start
        # Exclude the checksum update from the timing; only the get_log
        # iterator's performance is measured.
        ctx.update(s.encode())
        t_start = time.perf_counter()
    assert expected_hash == ctx.hexdigest(), "Log file checksum mismatch"
metric = StateAPIMetric(time_taken, log_file_size_byte)
    GLOBAL_STATE_STATS.calls["get_log"].append(metric)


def _parse_input(
num_tasks_str: str, num_actors_str: str, num_objects_str: str, log_file_sizes: str
):
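    """Parse the comma-separated CLI strings into lists of integers."""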
def _split_to_int(s):
tokens = s.split(",")
return [int(token) for token in tokens]
return (
_split_to_int(num_tasks_str),
_split_to_int(num_actors_str),
_split_to_int(num_objects_str),
_split_to_int(log_file_sizes),
    )


def no_resource_leaks():
    return test_utils.no_resource_leaks_excluding_node_resources()


@click.command()
@click.option(
"--num-tasks",
required=False,
default="1,100,1000,10000",
type=str,
help="Number of tasks to launch.",
)
@click.option(
"--num-actors",
required=False,
default="1,100,1000,5000",
type=str,
help="Number of actors to launch.",
)
@click.option(
"--num-objects",
required=False,
default="100,1000,10000,50000",
type=str,
help="Number of actors to launch.",
)
@click.option(
"--num-actors-for-objects",
required=False,
default=16,
type=int,
help="Number of actors to use for object creation.",
)
@click.option(
"--log-file-size-byte",
required=False,
default=f"{256*MiB},{1*GiB},{4*GiB}",
type=str,
help="Number of actors to launch.",
)
@click.option(
"--smoke-test",
is_flag=True,
type=bool,
default=False,
help="If set, it's a smoke test",
)
def test(
num_tasks,
num_actors,
num_objects,
num_actors_for_objects,
log_file_size_byte,
smoke_test,
):
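    """Run the task, actor, object, and log-file scale scenarios and report
    state API latency and peak memory usage.
    """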
ray.init(address="auto", log_to_driver=False)
if smoke_test:
num_tasks = "100"
num_actors = "10"
num_objects = "100"
log_file_size_byte = f"{16*MiB}"
# Parse the input
num_tasks_arr, num_actors_arr, num_objects_arr, log_file_size_arr = _parse_input(
num_tasks, num_actors, num_objects, log_file_size_byte
)
test_utils.wait_for_condition(no_resource_leaks)
monitor_actor = test_utils.monitor_memory_usage()
start_time = time.perf_counter()
# Run some long-running tasks
for n in num_tasks_arr:
print(f"\nRunning with many tasks={n}")
test_many_tasks(num_tasks=n)
print(f"\ntest_many_tasks({n}) PASS")
# Run many actors
for n in num_actors_arr:
print(f"\nRunning with many actors={n}")
test_many_actors(num_actors=n)
print(f"\ntest_many_actors({n}) PASS")
# Create many objects
for n in num_objects_arr:
print(f"\nRunning with many objects={n}")
test_many_objects(num_objects=n, num_actors=num_actors_for_objects)
print(f"\ntest_many_objects({n}) PASS")
# Create large logs
for n in log_file_size_arr:
print(f"\nRunning with large file={n} bytes")
test_large_log_file(log_file_size_byte=n)
print(f"\ntest_large_log_file({n} bytes) PASS")
print("\n\nPASS")
end_time = time.perf_counter()
# Collect mem usage
ray.get(monitor_actor.stop_run.remote())
used_gb, usage = ray.get(monitor_actor.get_peak_memory_info.remote())
print(f"Peak memory usage: {round(used_gb, 2)}GB")
print(f"Peak memory usage per processes:\n {usage}")
del monitor_actor
state_perf_result = aggregate_perf_results()
results = {
"time": end_time - start_time,
"success": "1",
"_peak_memory": round(used_gb, 2),
"_peak_process_memory": usage,
"perf_metrics": [
{
"perf_metric_name": "avg_state_api_latency_sec",
"perf_metric_value": state_perf_result["avg_state_api_latency_sec"],
"perf_metric_type": "LATENCY",
}
],
}
if "TEST_OUTPUT_JSON" in os.environ:
        with open(os.environ["TEST_OUTPUT_JSON"], "w") as out_file:
            json.dump(results, out_file)
results.update(state_perf_result)
    print(json.dumps(results, indent=2))


if __name__ == "__main__":
test()