ray/release/nightly_tests/stress_tests/test_threaded_actors.py

163 lines
5.1 KiB
Python

import argparse
import json
import logging
import numpy as np
import os
import threading
import time
from ray._private.test_utils import monitor_memory_usage
from ray.data._internal.progress_bar import ProgressBar
from collections import namedtuple
from queue import Queue
import ray
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
PiResult = namedtuple("PiResult", ["samples", "pi"])
@ray.remote(num_cpus=0)
class PiCalculator:
def __init__(self, metadata):
# -- Read only variables --
self.metadata = metadata
self.sample_batch = 1000000
# -- Variables that are accessed by mulitple threads --
self.lock = threading.Lock()
self.result_queue = Queue()
self.is_running = False
def ready(self):
pass
def run_compute(self):
self.is_running = True
sample_cnt = 0
while self.is_running:
# Compute pi
xs = np.random.uniform(low=-1.0, high=1.0, size=self.sample_batch)
ys = np.random.uniform(low=-1.0, high=1.0, size=self.sample_batch)
xys = np.stack((xs, ys), axis=-1)
inside = xs * xs + ys * ys <= 1.0
xys_inside = xys[inside]
in_circle = xys_inside.shape[0]
approx_pi = 4.0 * in_circle / self.sample_batch
# Put the result to the queue.
sample_cnt += self.sample_batch
with self.lock:
self.result_queue.put(PiResult(samples=sample_cnt, pi=approx_pi))
def stop(self):
self.is_running = False
def get_metadata(self):
return self.metadata
def get_pi(self):
result = None
while not result:
with self.lock:
if not self.result_queue.empty():
result = self.result_queue.get(block=False)
time.sleep(1)
return result
def start_actors(total_num_actors, num_nodes):
"""Create actors and run the computation loop."""
total_num_actors = int(total_num_actors)
actors_per_node = int(total_num_actors / num_nodes)
start = time.time()
nodes = []
# Place an actor per node in round-robin.
# It is added here to simulate the real user workload.
while len(nodes) < num_nodes:
nodes = [
next((r for r in n["Resources"] if "node" in r), None)
for n in ray.nodes()
if n["Alive"]
]
nodes = [n for n in nodes if n is not None]
pi_actors = [
PiCalculator.options(resources={n: 0.01}, max_concurrency=10).remote(
{"meta": 1}
)
for n in nodes
for _ in range(actors_per_node)
]
ray.get([actor.ready.remote() for actor in pi_actors])
print(f"Took {time.time() - start} to create {total_num_actors} actors")
# Start the computation loop.
for actor in pi_actors:
actor.run_compute.remote()
return pi_actors
def parse_script_args():
parser = argparse.ArgumentParser()
parser.add_argument("--kill-interval_s", type=float, default=60)
parser.add_argument("--test-runtime", type=float, default=3000)
return parser.parse_known_args()
def main():
"""The test simulates the workload with many threaded actors.
Test is doing 4 things for 1 hour.
- It first creates actors as many as num_cpus with max_concurrency=10
- Each actor computes pi and put the result to the queue.
- Driver keeps getting result & metadata from the actor.
- Every X seconds, it kills all actors and restarts them.
"""
ray.init(address="auto")
args, unknown = parse_script_args()
num_cpus = ray.cluster_resources()["CPU"]
num_nodes = sum(1 for n in ray.nodes() if n["Alive"])
print(f"Total number of actors: {num_cpus}, nodes: {num_nodes}")
monitor_actor = monitor_memory_usage()
start = time.time()
while time.time() - start < args.test_runtime:
# Step 1: Create actors and start computation loop.
print("Create actors.")
actors = start_actors(num_cpus, num_nodes)
# Step 2: Get the pi result from actors.
compute_start = time.time()
print("Start computation.")
while time.time() - compute_start < args.kill_interval_s:
# Get the metadata.
ray.get([actor.get_metadata.remote() for actor in actors])
# Get the result.
pb = ProgressBar("Computing Pi", num_cpus)
results = [actor.get_pi.remote() for actor in actors]
pb.fetch_until_complete(results)
pb.close()
# Step 3: Kill actors.
print("Kill all actors.")
for actor in actors:
ray.kill(actor)
# Report the result.
print("PASSED.")
used_gb, usage = ray.get(monitor_actor.get_peak_memory_info.remote())
print("Memory usage with failures.")
print(f"Peak memory usage: {round(used_gb, 2)}GB")
print(f"Peak memory usage per processes:\n {usage}")
# Report the result.
ray.get(monitor_actor.stop_run.remote())
result = {"success": 0}
with open(os.environ["TEST_OUTPUT_JSON"], "w") as f:
f.write(json.dumps(result))
if __name__ == "__main__":
main()