ray/ci/performance_tests/test_performance.py
2019-10-10 19:09:25 -07:00

252 lines
8 KiB
Python

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import argparse
import logging
import numpy as np
import time
import ray
from ray.tests.cluster_utils import Cluster
logger = logging.getLogger(__name__)

# Command-line interface for the benchmark script. Only --num-nodes is
# mandatory; the other flags tune how the cluster is obtained and prepared.
parser = argparse.ArgumentParser(
    description="Parse arguments for running the performance tests.")
parser.add_argument(
    "--num-nodes",
    required=True,
    type=int,
    help="The number of nodes to simulate in the cluster.")
parser.add_argument(
    "--skip-object-store-warmup",
    default=False,
    action="store_true",
    help="True if the object store should not be warmed up. This could cause "
    "the benchmarks to appear slower than usual.")
parser.add_argument(
    "--address",
    required=False,
    type=str,
    help="The address of the cluster to connect to. If this is omitted, then "
    "a cluster will be started locally (on a single machine).")
def start_local_cluster(num_nodes, object_store_memory):
    """Start a local Ray cluster and initialize Ray against it.

    The ith node in the cluster will have a resource named "i" with
    quantity 500. The first node is given Redis port 6379, two Redis shards
    and 8 CPUs; every other node gets 2 CPUs and leaves the Redis settings
    as None. No node is given any GPUs.

    Args:
        num_nodes: The number of nodes to start in the cluster.
        object_store_memory: Passed to every node as its
            ``object_store_memory`` setting.

    Returns:
        The cluster object.
    """
    first_node_settings = {
        "redis_port": 6379,
        "num_redis_shards": 2,
        "num_cpus": 8,
    }
    other_node_settings = {
        "redis_port": None,
        "num_redis_shards": None,
        "num_cpus": 2,
    }

    cluster = Cluster()
    for index in range(num_nodes):
        if index == 0:
            settings = first_node_settings
        else:
            settings = other_node_settings
        cluster.add_node(
            num_gpus=0,
            resources={str(index): 500},
            object_store_memory=object_store_memory,
            redis_max_memory=10**8,
            **settings)

    ray.init(address=cluster.address)
    return cluster
def wait_for_and_check_cluster_configuration(num_nodes):
    """Wait for the cluster to reach ``num_nodes`` nodes, then validate it.

    The ith node should have a resource labeled 'i' with quantity 500, and
    the node carrying resource '0' must be the one this driver is attached
    to (its object store socket must match the driver's plasma client).

    All checks are run against the single ``ray.nodes()`` snapshot that
    satisfied the node-count condition, so the reported nodes are exactly
    the ones that were checked.

    Args:
        num_nodes: The number of nodes that we expect to be in the cluster.

    Raises:
        RuntimeError: This exception is raised if the cluster is not
            configured properly for this test.
    """
    logger.warning("Waiting for cluster to have %s nodes.", num_nodes)
    while True:
        nodes = ray.nodes()
        if len(nodes) == num_nodes:
            break
        if len(nodes) > num_nodes:
            # Exceptions do not %-interpolate extra positional arguments,
            # so the message has to be formatted explicitly.
            raise RuntimeError(
                "The cluster has %s nodes, but it should "
                "only have %s." % (len(nodes), num_nodes))
        # Pause between polls instead of spinning while nodes join.
        time.sleep(0.1)

    misconfigured_message = (
        "The ith node in the cluster should have a "
        "custom resource called 'i' with quantity "
        "500. The nodes are\n%s" % (nodes,))

    # NOTE(review): this comparison is positional, i.e. it expects the
    # nodes to be listed in the order 0, 1, ..., num_nodes - 1 -- confirm
    # that ray.nodes() guarantees this ordering.
    actual_names = [set(node["Resources"].keys()) for node in nodes]
    expected_names = [{str(i), "CPU"} for i in range(num_nodes)]
    if actual_names != expected_names:
        raise RuntimeError(misconfigured_message)

    # Every non-CPU resource on every node must have quantity exactly 500.
    actual_quantities = [[
        resource_quantity
        for resource_name, resource_quantity in node["Resources"].items()
        if resource_name != "CPU"
    ] for node in nodes]
    if actual_quantities != num_nodes * [[500.0]]:
        raise RuntimeError(misconfigured_message)

    driver_socket_name = (
        ray.worker.global_worker.plasma_client.store_socket_name)
    for node in nodes:
        if ("0" in node["Resources"]
                and node["ObjectStoreSocketName"] != driver_socket_name):
            raise RuntimeError("The node that this driver is connected to "
                               "must have a custom resource labeled '0'.")
@ray.remote
def create_array(size):
    """Return a zero-filled uint8 numpy array with the given shape."""
    zeros = np.zeros(size, dtype=np.uint8)
    return zeros
@ray.remote
def no_op(*values):
    """Ignore every argument and return 1.

    The variadic signature exists so that callers can pass in an arbitrary
    number of object IDs purely to create task dependencies.
    """
    return 1
@ray.remote
class Actor(object):
    """Minimal actor whose single method does no work."""

    def ping(self, *values):
        """Accept any number of arguments and return None."""
        return None
def warm_up_cluster(num_nodes, object_store_memory):
    """Warm up the object stores and export the remote definitions.

    Enough objects are allocated in each object store to cause eviction,
    because the first time a driver or worker touches a region of memory in
    the object store, it may be slower. Each round creates arrays on every
    node, starting at two fifths of ``object_store_memory`` with two arrays
    per node; the array size is then halved and the per-node count doubled
    (capped at 1000) until the size reaches zero.

    Remote functions are exported lazily, so the first invocation of a given
    remote function will be slower. ``no_op`` and ``Actor.ping`` are
    therefore each invoked once at the end.

    Args:
        num_nodes: The number of nodes in the cluster; node ``i`` is
            targeted through its custom resource named ``str(i)``.
        object_store_memory: The object store size used to derive the
            initial array size.
    """
    logger.warning("Warming up the object store.")

    array_size = object_store_memory * 2 // 5
    arrays_per_node = 2
    while array_size > 0:
        pending = [
            create_array._remote(
                args=[array_size], resources={str(node): 1})
            for node in range(num_nodes)
            for _ in range(arrays_per_node)
        ]
        array_size //= 2
        arrays_per_node = min(arrays_per_node * 2, 1000)
        # Fetch this round's objects one at a time before starting the
        # next round.
        for pending_id in pending:
            ray.get(pending_id)

    logger.warning("Finished warming up the object store.")

    # Invoke all of the remote functions once so that the definitions are
    # broadcast to the workers.
    ray.get(no_op.remote())
    ray.get(Actor.remote().ping.remote())
def run_multiple_trials(f, num_trials):
    """Call ``f`` repeatedly and time each call.

    Args:
        f: A callable taking no arguments.
        num_trials: How many times to call ``f``.

    Returns:
        A list with one wall-clock duration in seconds (measured with
        ``time.time``) per call, in call order.
    """

    def time_single_call():
        began = time.time()
        f()
        return time.time() - began

    return [time_single_call() for _ in range(num_trials)]
def test_tasks(num_nodes):
    """Run the task-submission benchmarks and log their timings.

    Four benchmarks are run in order, each for several trials, and the
    minimum, mean and standard deviation of the trial durations are logged:

    * 1000 serial no-op tasks requiring resource "0" (10 trials),
    * 1000 serial no-op tasks requiring resource "1" (10 trials),
    * 10000 parallel no-op tasks requiring resource "0" (5 trials),
    * 10000 parallel no-op tasks spread round-robin over the resources
      "0" through ``str(num_nodes - 1)`` (5 trials).

    Args:
        num_nodes: The number of nodes in the cluster, used to spread the
            load-balanced benchmark over all per-node resources.
    """

    def one_thousand_serial_tasks_local_node():
        for _ in range(1000):
            ray.get(no_op._remote(resources={"0": 1}))

    def one_thousand_serial_tasks_remote_node():
        for _ in range(1000):
            ray.get(no_op._remote(resources={"1": 1}))

    def ten_thousand_parallel_tasks_local():
        ray.get([no_op._remote(resources={"0": 1}) for _ in range(10000)])

    def ten_thousand_parallel_tasks_load_balanced():
        ray.get([
            no_op._remote(resources={str(i % num_nodes): 1})
            for i in range(10000)
        ])

    # (benchmark, number of trials), in execution order.
    benchmarks = [
        (one_thousand_serial_tasks_local_node, 10),
        (one_thousand_serial_tasks_remote_node, 10),
        (ten_thousand_parallel_tasks_local, 5),
        (ten_thousand_parallel_tasks_load_balanced, 5),
    ]

    # Appended to the benchmark's name to form the log message template.
    summary_template = (" \n"
                        " min: %.2gs\n"
                        " mean: %.2gs\n"
                        " std: %.2gs")

    for benchmark, num_trials in benchmarks:
        durations = run_multiple_trials(benchmark, num_trials)
        logger.warning(benchmark.__name__ + summary_template,
                       np.min(durations), np.mean(durations),
                       np.std(durations))
if __name__ == "__main__":
    args = parser.parse_args()
    num_nodes = args.num_nodes
    object_store_memory = 10**8

    # The benchmarks target the per-node resources "0" and "1", so at least
    # two nodes are needed.
    if num_nodes < 2:
        raise ValueError("The --num-nodes argument must be at least 2.")

    # Configure the cluster or check that it is properly configured.
    if not args.address:
        logger.warning(
            "Running performance benchmarks on a simulated cluster "
            "of %s nodes.", num_nodes)
        cluster = start_local_cluster(num_nodes, object_store_memory)
    else:
        ray.init(address=args.address)
        wait_for_and_check_cluster_configuration(num_nodes)
        logger.warning(
            "Running performance benchmarks on the cluster with "
            "address %s.", args.address)

    warm_up_requested = not args.skip_object_store_warmup
    if warm_up_requested:
        warm_up_cluster(num_nodes, object_store_memory)

    # Run the benchmarks.
    test_tasks(num_nodes)

    # TODO(rkn): Test actors, test object transfers, test tasks with many
    # dependencies.