ray/benchmarks/single_node/test_single_node.py
Balaji Veeramani 7f1bacc7dc
[CI] Format Python code with Black (#21975)
See #21316 and #21311 for the motivation behind these changes.
2022-01-29 18:41:57 -08:00

209 lines
5.1 KiB
Python

import numpy as np
import time
import ray
import ray.autoscaler.sdk
from ray._private.test_utils import Semaphore
import json
import os
from time import perf_counter
from tqdm import trange, tqdm
# Size limits exercised by the benchmarks below.
MAX_ARGS = 10_000  # number of arguments passed to a single task
MAX_RETURNS = 3_000  # number of return values from a single task
MAX_RAY_GET_ARGS = 10_000  # number of object refs fetched by one ray.get call
MAX_QUEUED_TASKS = 1_000_000  # number of tasks queued at the same time
MAX_RAY_GET_SIZE = 100 * 1024 ** 3  # element count of the large int8 array (100 GiB)
def assert_no_leaks():
    """Assert that the cluster's available resources equal its total resources.

    The "memory" and "object_store_memory" entries are excluded from the
    comparison; every other resource reported by ``ray.cluster_resources()``
    must match ``ray.available_resources()`` exactly.

    Raises:
        AssertionError: if any compared resource differs. The message is the
            ``(total, current)`` pair of filtered mappings.
    """
    ignored = ("memory", "object_store_memory")

    # Filter instead of ``dict.pop(key)``: a key missing from either mapping
    # must not raise KeyError and mask the actual leak check below.
    total = {
        name: amount
        for name, amount in ray.cluster_resources().items()
        if name not in ignored
    }
    current = {
        name: amount
        for name, amount in ray.available_resources().items()
        if name not in ignored
    }
    assert total == current, (total, current)
def test_many_args():
    """Pass MAX_ARGS list arguments to one remote task and check their sum.

    Each argument is a list of 10000 ones, so the task must return
    ``MAX_ARGS * 10000``.
    """

    @ray.remote
    def sum_args(*args):
        # Add up the per-argument sums one argument at a time.
        grand_total = 0
        for arg in args:
            grand_total += sum(arg)
        return grand_total

    ones_per_arg = 10000
    payload = [[1] * ones_per_arg for _ in range(MAX_ARGS)]
    result_ref = sum_args.remote(*payload)
    result = ray.get(result_ref)
    assert result == MAX_ARGS * ones_per_arg
def test_many_returns():
    """Run one remote task with MAX_RETURNS return values and verify each one.

    Every returned object must equal ``list(range(10000))``.
    """

    @ray.remote(num_returns=MAX_RETURNS)
    def f():
        # Build a fresh list per return slot and hand them back as a tuple.
        return tuple(list(range(10000)) for _ in range(MAX_RETURNS))

    returned_refs = f.remote()
    assert len(returned_refs) == MAX_RETURNS

    expected = list(range(10000))
    for ref in returned_refs:
        fetched = ray.get(ref)
        assert fetched == expected
def test_ray_get_args():
    """Fetch MAX_RAY_GET_ARGS objects with a single ``ray.get`` call.

    Runs two passes: one storing Python lists, then one storing NumPy arrays
    (named "zero copy" by the original author). Each pass puts the objects,
    gets them all at once, and checks every result.
    """

    def with_dese():
        # Pass over plain Python lists ("dese" presumably means deserialization).
        print("Putting test objects:")
        refs = [ray.put(list(range(10000))) for _ in trange(MAX_RAY_GET_ARGS)]

        print("Getting objects")
        results = ray.get(refs)
        assert len(results) == MAX_RAY_GET_ARGS

        print("Asserting correctness")
        expected = list(range(10000))
        for obj in tqdm(results):
            assert obj == expected

    def with_zero_copy():
        # Pass over NumPy arrays.
        print("Putting test objects:")
        refs = [ray.put(np.arange(10000)) for _ in trange(MAX_RAY_GET_ARGS)]

        print("Getting objects")
        results = ray.get(refs)
        assert len(results) == MAX_RAY_GET_ARGS

        print("Asserting correctness")
        expected = np.arange(10000)
        for obj in tqdm(results):
            assert (obj == expected).all()

    with_dese()
    print("Done with dese")

    with_zero_copy()
    print("Done with zero copy")
def test_many_queued_tasks():
    """Queue MAX_QUEUED_TASKS no-op tasks behind blocked tasks, then drain them.

    One blocking task per CPU is submitted first; each waits on a shared
    ``Semaphore`` actor. The no-op tasks are then submitted, the semaphore is
    released once per CPU, and every no-op task result is fetched and checked
    to be ``None``.
    """
    sema = Semaphore.remote(0)

    @ray.remote(num_cpus=1)
    def block():
        # Wait until the driver releases the semaphore.
        ray.get(sema.acquire.remote())

    @ray.remote(num_cpus=1)
    def f():
        pass

    num_cpus = int(ray.cluster_resources()["CPU"])

    # Keep the refs of the blocking tasks alive for the duration of the test.
    blocked_tasks = [block.remote() for _ in range(num_cpus)]

    print("Submitting many tasks")
    pending_tasks = [f.remote() for _ in trange(MAX_QUEUED_TASKS)]

    # Make sure all the tasks can actually run.
    for _ in range(num_cpus):
        sema.release.remote()

    print("Unblocking tasks")
    for ref in tqdm(pending_tasks):
        outcome = ray.get(ref)
        assert outcome is None
def test_large_object():
    """Put and get a single int8 zero array of MAX_RAY_GET_SIZE elements.

    The local array is deleted after the put so only the stored copy is
    referenced; the first and last elements of the fetched array are checked.
    """
    print("Generating object")
    payload = np.zeros(shape=MAX_RAY_GET_SIZE, dtype=np.int8)

    print("Putting object")
    payload_ref = ray.put(payload)
    # Drop the driver's own reference to the array before fetching it back.
    del payload

    print("Getting object")
    fetched = ray.get(payload_ref)
    first, last = 0, -1
    assert fetched[first] == 0
    assert fetched[last] == 0
# Driver: connect to an existing cluster, run each benchmark in turn, and
# report wall-clock timings. After every benchmark the script sleeps for five
# seconds and then checks that no resources are still held.
ray.init(address="auto")

args_start = perf_counter()
test_many_args()
args_end = perf_counter()

time.sleep(5)
assert_no_leaks()
print("Finished many args")

returns_start = perf_counter()
test_many_returns()
returns_end = perf_counter()

time.sleep(5)
assert_no_leaks()
print("Finished many returns")

get_start = perf_counter()
test_ray_get_args()
get_end = perf_counter()

time.sleep(5)
assert_no_leaks()
print("Finished ray.get on many objects")

queued_start = perf_counter()
test_many_queued_tasks()
queued_end = perf_counter()

time.sleep(5)
assert_no_leaks()
print("Finished queueing many tasks")

large_object_start = perf_counter()
test_large_object()
large_object_end = perf_counter()

time.sleep(5)
assert_no_leaks()
print("Done")

# Elapsed seconds per benchmark (sleep and leak-check time excluded).
args_time = args_end - args_start
returns_time = returns_end - returns_start
get_time = get_end - get_start
queued_time = queued_end - queued_start
large_object_time = large_object_end - large_object_start

print(f"Many args time: {args_time} ({MAX_ARGS} args)")
print(f"Many returns time: {returns_time} ({MAX_RETURNS} returns)")
print(f"Ray.get time: {get_time} ({MAX_RAY_GET_ARGS} args)")
print(f"Queued task time: {queued_time} ({MAX_QUEUED_TASKS} tasks)")
print(
    f"Ray.get large object time: {large_object_time} "
    f"({MAX_RAY_GET_SIZE} bytes)"
)

# When TEST_OUTPUT_JSON names a file, write the results there as JSON.
if "TEST_OUTPUT_JSON" in os.environ:
    results = {
        "args_time": args_time,
        "num_args": MAX_ARGS,
        "returns_time": returns_time,
        "num_returns": MAX_RETURNS,
        "get_time": get_time,
        "num_get_args": MAX_RAY_GET_ARGS,
        "queued_time": queued_time,
        "num_queued": MAX_QUEUED_TASKS,
        "large_object_time": large_object_time,
        "large_object_size": MAX_RAY_GET_SIZE,
        "success": "1",
    }
    # Use a context manager so the file is flushed and closed even if
    # serialization fails; the original left the handle open.
    with open(os.environ["TEST_OUTPUT_JSON"], "w") as out_file:
        json.dump(results, out_file)