ray/benchmarks/distributed/test_many_tasks.py
SangBin Cho 2c2d96eeb1
[Nightly tests] Improve k8s testing (#23108)
This PR improves broken k8s tests.

Use exponential backoff on the unstable HTTP path (getting job status sometimes has broken connection from the server. I couldn't really find the relevant logs to figure out why this is happening, unfortunately).
Fix benchmark tests resource leak check. The existing one was broken because the job submission uses 0.001 node IP resource, which means the cluster_resources can never be the same as available resources. I fixed the issue by not checking node IP resources
K8s infra doesn't support instances < 8 CPUs. I used m5.2xlarge instead of xlarge. It will increase the cost a bit, but it wouldn't be very big.
2022-03-14 03:49:15 -07:00

85 lines
2.6 KiB
Python

import click
import json
import os
import ray
import ray._private.test_utils as test_utils
import time
import tqdm
sleep_time = 300
def test_max_running_tasks(num_tasks):
cpus_per_task = 0.25
@ray.remote(num_cpus=cpus_per_task)
def task():
time.sleep(sleep_time)
refs = [task.remote() for _ in tqdm.trange(num_tasks, desc="Launching tasks")]
max_cpus = ray.cluster_resources()["CPU"]
min_cpus_available = max_cpus
for _ in tqdm.trange(int(sleep_time / 0.1), desc="Waiting"):
try:
cur_cpus = ray.available_resources().get("CPU", 0)
min_cpus_available = min(min_cpus_available, cur_cpus)
except Exception:
# There are race conditions `.get` can fail if a new heartbeat
# comes at the same time.
pass
time.sleep(0.1)
# There are some relevant magic numbers in this check. 10k tasks each
# require 1/4 cpus. Therefore, ideally 2.5k cpus will be used.
err_str = f"Only {max_cpus - min_cpus_available}/{max_cpus} cpus used."
threshold = num_tasks * cpus_per_task * 0.75
assert max_cpus - min_cpus_available > threshold, err_str
for _ in tqdm.trange(num_tasks, desc="Ensuring all tasks have finished"):
done, refs = ray.wait(refs)
assert ray.get(done[0]) is None
def no_resource_leaks():
return test_utils.no_resource_leaks_excluding_node_resources()
@click.command()
@click.option("--num-tasks", required=True, type=int, help="Number of tasks to launch.")
def test(num_tasks):
ray.init(address="auto")
test_utils.wait_for_condition(no_resource_leaks)
monitor_actor = test_utils.monitor_memory_usage()
start_time = time.time()
test_max_running_tasks(num_tasks)
end_time = time.time()
ray.get(monitor_actor.stop_run.remote())
used_gb, usage = ray.get(monitor_actor.get_peak_memory_info.remote())
print(f"Peak memory usage: {round(used_gb, 2)}GB")
print(f"Peak memory usage per processes:\n {usage}")
del monitor_actor
test_utils.wait_for_condition(no_resource_leaks)
rate = num_tasks / (end_time - start_time - sleep_time)
print(
f"Success! Started {num_tasks} tasks in {end_time - start_time}s. "
f"({rate} tasks/s)"
)
if "TEST_OUTPUT_JSON" in os.environ:
out_file = open(os.environ["TEST_OUTPUT_JSON"], "w")
results = {
"tasks_per_second": rate,
"num_tasks": num_tasks,
"time": end_time - start_time,
"success": "1",
"_peak_memory": round(used_gb, 2),
"_peak_process_memory": usage,
}
json.dump(results, out_file)
if __name__ == "__main__":
test()