mirror of
https://github.com/vale981/ray
synced 2025-03-06 10:31:39 -05:00
164 lines
5.1 KiB
Python
164 lines
5.1 KiB
Python
import argparse
|
|
import json
|
|
import logging
|
|
import numpy as np
|
|
import os
|
|
import threading
|
|
import time
|
|
|
|
from ray._private.test_utils import monitor_memory_usage
|
|
from ray.data.impl.progress_bar import ProgressBar
|
|
|
|
from collections import namedtuple
|
|
from queue import Queue
|
|
|
|
import ray
|
|
|
|
logging.basicConfig(level=logging.INFO)
|
|
logger = logging.getLogger(__name__)
|
|
|
|
PiResult = namedtuple("PiResult", ["samples", "pi"])
|
|
|
|
|
|
@ray.remote(num_cpus=0)
|
|
class PiCalculator:
|
|
def __init__(self, metadata):
|
|
# -- Read only variables --
|
|
self.metadata = metadata
|
|
self.sample_batch = 1000000
|
|
# -- Variables that are accessed by mulitple threads --
|
|
self.lock = threading.Lock()
|
|
self.result_queue = Queue()
|
|
self.is_running = False
|
|
|
|
def ready(self):
|
|
pass
|
|
|
|
def run_compute(self):
|
|
self.is_running = True
|
|
sample_cnt = 0
|
|
while self.is_running:
|
|
# Compute pi
|
|
xs = np.random.uniform(low=-1.0, high=1.0, size=self.sample_batch)
|
|
ys = np.random.uniform(low=-1.0, high=1.0, size=self.sample_batch)
|
|
xys = np.stack((xs, ys), axis=-1)
|
|
inside = xs * xs + ys * ys <= 1.0
|
|
xys_inside = xys[inside]
|
|
in_circle = xys_inside.shape[0]
|
|
approx_pi = 4.0 * in_circle / self.sample_batch
|
|
|
|
# Put the result to the queue.
|
|
sample_cnt += self.sample_batch
|
|
with self.lock:
|
|
self.result_queue.put(
|
|
PiResult(samples=sample_cnt, pi=approx_pi))
|
|
|
|
def stop(self):
|
|
self.is_running = False
|
|
|
|
def get_metadata(self):
|
|
return self.metadata
|
|
|
|
def get_pi(self):
|
|
result = None
|
|
while not result:
|
|
with self.lock:
|
|
if not self.result_queue.empty():
|
|
result = self.result_queue.get(block=False)
|
|
time.sleep(1)
|
|
return result
|
|
|
|
|
|
def start_actors(total_num_actors, num_nodes):
|
|
"""Create actors and run the computation loop.
|
|
"""
|
|
total_num_actors = int(total_num_actors)
|
|
actors_per_node = int(total_num_actors / num_nodes)
|
|
start = time.time()
|
|
nodes = []
|
|
# Place an actor per node in round-robin.
|
|
# It is added here to simulate the real user workload.
|
|
while len(nodes) < num_nodes:
|
|
nodes = [
|
|
next((r for r in n["Resources"] if "node" in r), None)
|
|
for n in ray.nodes() if n["Alive"]
|
|
]
|
|
nodes = [n for n in nodes if n is not None]
|
|
pi_actors = [
|
|
PiCalculator.options(resources={
|
|
n: 0.01
|
|
}, max_concurrency=10).remote({
|
|
"meta": 1
|
|
}) for n in nodes for _ in range(actors_per_node)
|
|
]
|
|
ray.get([actor.ready.remote() for actor in pi_actors])
|
|
print(f"Took {time.time() - start} to create {total_num_actors} actors")
|
|
# Start the computation loop.
|
|
for actor in pi_actors:
|
|
actor.run_compute.remote()
|
|
return pi_actors
|
|
|
|
|
|
def parse_script_args():
|
|
parser = argparse.ArgumentParser()
|
|
parser.add_argument("--kill-interval_s", type=float, default=60)
|
|
parser.add_argument("--test-runtime", type=float, default=3000)
|
|
return parser.parse_known_args()
|
|
|
|
|
|
def main():
|
|
"""The test simulates the workload with many threaded actors.
|
|
|
|
Test is doing 4 things for 1 hour.
|
|
|
|
- It first creates actors as many as num_cpus with max_concurrency=10
|
|
- Each actor computes pi and put the result to the queue.
|
|
- Driver keeps getting result & metadata from the actor.
|
|
- Every X seconds, it kills all actors and restarts them.
|
|
"""
|
|
ray.init(address="auto")
|
|
args, unknown = parse_script_args()
|
|
num_cpus = ray.cluster_resources()["CPU"]
|
|
num_nodes = sum(1 for n in ray.nodes() if n["Alive"])
|
|
print(f"Total number of actors: {num_cpus}, nodes: {num_nodes}")
|
|
monitor_actor = monitor_memory_usage()
|
|
|
|
start = time.time()
|
|
while time.time() - start < args.test_runtime:
|
|
# Step 1: Create actors and start computation loop.
|
|
print("Create actors.")
|
|
actors = start_actors(num_cpus, num_nodes)
|
|
|
|
# Step 2: Get the pi result from actors.
|
|
compute_start = time.time()
|
|
print("Start computation.")
|
|
while time.time() - compute_start < args.kill_interval_s:
|
|
# Get the metadata.
|
|
ray.get([actor.get_metadata.remote() for actor in actors])
|
|
# Get the result.
|
|
pb = ProgressBar("Computing Pi", num_cpus)
|
|
results = [actor.get_pi.remote() for actor in actors]
|
|
pb.fetch_until_complete(results)
|
|
pb.close()
|
|
|
|
# Step 3: Kill actors.
|
|
print("Kill all actors.")
|
|
for actor in actors:
|
|
ray.kill(actor)
|
|
|
|
# Report the result.
|
|
print("PASSED.")
|
|
used_gb, usage = ray.get(monitor_actor.get_peak_memory_info.remote())
|
|
print("Memory usage with failures.")
|
|
print(f"Peak memory usage: {round(used_gb, 2)}GB")
|
|
print(f"Peak memory usage per processes:\n {usage}")
|
|
# Report the result.
|
|
ray.get(monitor_actor.stop_run.remote())
|
|
|
|
result = {"success": 0}
|
|
with open(os.environ["TEST_OUTPUT_JSON"], "w") as f:
|
|
f.write(json.dumps(result))
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|