Add a script for benchmarking performance for Ray developers. (#5472)

This commit is contained in:
Robert Nishihara 2019-08-19 23:41:24 -07:00 committed by Philipp Moritz
parent 99a2f9fab3
commit 851c5b2dae
16 changed files with 287 additions and 606 deletions

View file

@ -1,25 +0,0 @@
#!/usr/bin/env bash
# Show explicitly which commands are currently running.
set -ex
ROOT_DIR=$(cd "$(dirname "${BASH_SOURCE:-$0}")"; pwd)
pushd "$ROOT_DIR"
python -m pip install pytest-benchmark
pip install -U https://ray-wheels.s3-us-west-2.amazonaws.com/latest/ray-0.8.0.dev3-cp36-cp36m-manylinux1_x86_64.whl
python -m pytest --benchmark-autosave --benchmark-min-rounds=10 --benchmark-columns="min, max, mean" $ROOT_DIR/../../../python/ray/tests/perf_integration_tests/test_perf_integration.py
pushd $ROOT_DIR/../../../python
python -m pip install -e .
popd
python -m pytest --benchmark-compare --benchmark-min-rounds=10 --benchmark-compare-fail=min:5% --benchmark-columns="min, max, mean" $ROOT_DIR/../../../python/ray/tests/perf_integration_tests/test_perf_integration.py
# This is how Modin stores the values in an S3 bucket
#sha_tag=`git rev-parse --verify --short HEAD`
# save the results to S3
#aws s3 cp .benchmarks/*/*.json s3://modin-jenkins-result/${sha_tag}-perf-${BUCKET_SUFFIX}/ --acl public-read
#rm -rf .benchmarks

View file

@ -1,27 +0,0 @@
#!/usr/bin/env bash
# Cause the script to exit if a single command fails.
set -e
# Show explicitly which commands are currently running.
set -x
BUCKET_NAME=ray-integration-testing/ASV
COMMIT=$(cat /ray/git-rev)
ASV_RESULTS_DIR=/ray/python/ASV_RESULTS
pip install awscli
# Install Ray fork of ASV
git clone https://github.com/ray-project/asv.git /tmp/asv/ || true
cd /tmp/asv/
pip install -e .
cd /ray/python/
asv machine --machine jenkins
mkdir $ASV_RESULTS_DIR || true
aws s3 cp s3://$BUCKET_NAME/ASV_RESULTS/benchmarks.json $ASV_RESULTS_DIR/benchmarks.json || true
asv run --show-stderr --python=same --force-record-commit=$COMMIT
aws s3 cp $ASV_RESULTS_DIR/benchmarks.json s3://$BUCKET_NAME/ASV_RESULTS/benchmarks_$COMMIT.json
aws s3 sync $ASV_RESULTS_DIR/ s3://$BUCKET_NAME/ASV_RESULTS/

View file

@ -1,29 +0,0 @@
#!/usr/bin/env bash
# Cause the script to exit if a single command fails.
set -e
# Show explicitly which commands are currently running.
set -x
BUCKET_NAME=ray-integration-testing/ASV
COMMIT=$(cat /ray/git-rev)
RLLIB_RESULTS=RLLIB_RESULTS
RLLIB_RESULTS_DIR=/ray/python/ray/rllib/RLLIB_RESULTS
pip install awscli
# Install Ray fork of ASV
git clone https://github.com/ray-project/asv.git /tmp/asv/ || true
cd /tmp/asv/
pip install -e .
cd /ray/python/ray/rllib/
asv machine --machine jenkins
mkdir $RLLIB_RESULTS_DIR || true
aws s3 cp s3://$BUCKET_NAME/RLLIB_RESULTS/benchmarks.json $RLLIB_RESULTS_DIR/benchmarks.json || true
./tuned_examples/generate_regression_tests.py
asv run --show-stderr --python=same --force-record-commit=$COMMIT
aws s3 cp $RLLIB_RESULTS_DIR/benchmarks.json s3://$BUCKET_NAME/RLLIB_RESULTS/benchmarks_$COMMIT.json
aws s3 sync $RLLIB_RESULTS_DIR/ s3://$BUCKET_NAME/RLLIB_RESULTS/

0
ci/jenkins_tests/run_rllib_tests.sh Normal file → Executable file
View file

View file

@ -14,10 +14,10 @@ To run the workloads, first edit the config.yaml and replace
./start_workloads.sh
This will start one EC2 instance per workload and will start
the workloads running (one per instance). Running the ``./start_workloads.sh``
script again will clean up any state from the previous runs and will start the
workloads again.
This will start one EC2 instance per workload and will start the workloads
running (one per instance). Running the ``./start_workloads.sh`` script again
will clean up any state from the previous runs and will start the workloads
again.
Check Workload Statuses
-----------------------

View file

@ -0,0 +1,31 @@
Performance Tests
=================
This directory contains scripts for running performance benchmarks. These
benchmarks are intended to be used by Ray developers to check if a given pull
request introduces a performance regression.
To check if a pull request introduces a performance regression, it is necessary
to run these benchmarks on the codebase before and after the change.
Running the Workloads
---------------------
To run the workload on a single machine, do the following.
.. code-block:: bash
python test_performance.py --num-nodes=3
This will start simulate a 3 node cluster on your local machine, attach to it,
and run the benchmarks. To run the benchmarks on an existing cluster, do the
following.
.. code-block:: bash
python test_performance.py --num-nodes=3 --redis-address=<redis-address>
The ``--num-nodes`` flag must match the number of nodes in the cluster. The
nodes in the cluster must be configured with the appropriate resource labels. In
particular, the ith node in the cluster must have a resource named ``"i"``
with quantity ``500``.

View file

@ -0,0 +1,252 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import argparse
import logging
import numpy as np
import time
import ray
from ray.tests.cluster_utils import Cluster
logger = logging.getLogger(__name__)
parser = argparse.ArgumentParser(
description="Parse arguments for running the performance tests.")
parser.add_argument(
"--num-nodes",
required=True,
type=int,
help="The number of nodes to simulate in the cluster.")
parser.add_argument(
"--skip-object-store-warmup",
default=False,
action="store_true",
help="True if the object store should not be warmed up. This could cause "
"the benchmarks to appear slower than usual.")
parser.add_argument(
"--redis-address",
required=False,
type=str,
help="The address of the cluster to connect to. If this is ommitted, then "
"a cluster will be started locally (on a single machine).")
def start_local_cluster(num_nodes, object_store_memory):
"""Start a local Ray cluster.
The ith node in the cluster will have a resource named "i".
Args:
num_nodes: The number of nodes to start in the cluster.
Returns:
The cluster object.
"""
num_redis_shards = 2
redis_max_memory = 10**8
cluster = Cluster()
for i in range(num_nodes):
cluster.add_node(
redis_port=6379 if i == 0 else None,
num_redis_shards=num_redis_shards if i == 0 else None,
num_cpus=8 if i == 0 else 2,
num_gpus=0,
resources={str(i): 500},
object_store_memory=object_store_memory,
redis_max_memory=redis_max_memory)
ray.init(redis_address=cluster.redis_address)
return cluster
def wait_for_and_check_cluster_configuration(num_nodes):
"""Check that the cluster's custom resources are properly configured.
The ith node should have a resource labeled 'i' with quantity 500.
Args:
num_nodes: The number of nodes that we expect to be in the cluster.
Raises:
RuntimeError: This exception is raised if the cluster is not configured
properly for this test.
"""
logger.warning("Waiting for cluster to have %s nodes.", num_nodes)
while True:
nodes = ray.nodes()
if len(nodes) == num_nodes:
break
if len(nodes) > num_nodes:
raise RuntimeError(
"The cluster has %s nodes, but it should "
"only have %s.", len(nodes), num_nodes)
if not ([set(node["Resources"].keys())
for node in ray.nodes()] == [{str(i), "CPU"}
for i in range(num_nodes)]):
raise RuntimeError(
"The ith node in the cluster should have a "
"custom resource called 'i' with quantity "
"500. The nodes are\n%s", ray.nodes())
if not ([[
resource_quantity
for resource_name, resource_quantity in node["Resources"].items()
if resource_name != "CPU"
] for node in ray.nodes()] == num_nodes * [[500.0]]):
raise RuntimeError(
"The ith node in the cluster should have a "
"custom resource called 'i' with quantity "
"500. The nodes are\n%s", ray.nodes())
for node in ray.nodes():
if ("0" in node["Resources"] and node["ObjectStoreSocketName"] !=
ray.worker.global_worker.plasma_client.store_socket_name):
raise RuntimeError("The node that this driver is connected to "
"must have a custom resource labeled '0'.")
@ray.remote
def create_array(size):
return np.zeros(shape=size, dtype=np.uint8)
@ray.remote
def no_op(*values):
# The reason that this function takes *values is so that we can pass in
# an arbitrary number of object IDs to create task dependencies.
return 1
@ray.remote
class Actor():
def ping(self, *values):
pass
def warm_up_cluster(num_nodes, object_store_memory):
"""Warm up the cluster.
This will allocate enough objects in each object store to cause eviction
because the first time a driver or worker touches a region of memory in the
object store, it may be slower.
Note that remote functions are exported lazily, so the first invocation of
a given remote function will be slower.
"""
logger.warning("Warming up the object store.")
size = object_store_memory * 2 // 5
num_objects = 2
while size > 0:
object_ids = []
for i in range(num_nodes):
for _ in range(num_objects):
object_ids += [
create_array._remote(args=[size], resources={str(i): 1})
]
size = size // 2
num_objects = min(num_objects * 2, 1000)
for object_id in object_ids:
ray.get(object_id)
logger.warning("Finished warming up the object store.")
# Invoke all of the remote functions once so that the definitions are
# broadcast to the workers.
ray.get(no_op.remote())
ray.get(Actor.remote().ping.remote())
def run_multiple_trials(f, num_trials):
durations = []
for _ in range(num_trials):
start = time.time()
f()
durations.append(time.time() - start)
return durations
def test_tasks(num_nodes):
def one_thousand_serial_tasks_local_node():
for _ in range(1000):
ray.get(no_op._remote(resources={"0": 1}))
durations = run_multiple_trials(one_thousand_serial_tasks_local_node, 10)
logger.warning(
"one_thousand_serial_tasks_local_node \n"
" min: %.2gs\n"
" mean: %.2gs\n"
" std: %.2gs", np.min(durations), np.mean(durations),
np.std(durations))
def one_thousand_serial_tasks_remote_node():
for _ in range(1000):
ray.get(no_op._remote(resources={"1": 1}))
durations = run_multiple_trials(one_thousand_serial_tasks_remote_node, 10)
logger.warning(
"one_thousand_serial_tasks_remote_node \n"
" min: %.2gs\n"
" mean: %.2gs\n"
" std: %.2gs", np.min(durations), np.mean(durations),
np.std(durations))
def ten_thousand_parallel_tasks_local():
ray.get([no_op._remote(resources={"0": 1}) for _ in range(10000)])
durations = run_multiple_trials(ten_thousand_parallel_tasks_local, 5)
logger.warning(
"ten_thousand_parallel_tasks_local \n"
" min: %.2gs\n"
" mean: %.2gs\n"
" std: %.2gs", np.min(durations), np.mean(durations),
np.std(durations))
def ten_thousand_parallel_tasks_load_balanced():
ray.get([
no_op._remote(resources={str(i % num_nodes): 1})
for i in range(10000)
])
durations = run_multiple_trials(ten_thousand_parallel_tasks_load_balanced,
5)
logger.warning(
"ten_thousand_parallel_tasks_load_balanced \n"
" min: %.2gs\n"
" mean: %.2gs\n"
" std: %.2gs", np.min(durations), np.mean(durations),
np.std(durations))
if __name__ == "__main__":
args = parser.parse_args()
num_nodes = args.num_nodes
object_store_memory = 10**8
# Configure the cluster or check that it is properly configured.
if num_nodes < 2:
raise ValueError("The --num-nodes argument must be at least 2.")
if args.redis_address:
ray.init(redis_address=args.redis_address)
wait_for_and_check_cluster_configuration(num_nodes)
logger.warning(
"Running performance benchmarks on the cluster with "
"address %s.", args.redis_address)
else:
logger.warning(
"Running performance benchmarks on a simulated cluster "
"of %s nodes.", num_nodes)
cluster = start_local_cluster(num_nodes, object_store_memory)
if not args.skip_object_store_warmup:
warm_up_cluster(num_nodes, object_store_memory)
# Run the benchmarks.
test_tasks(num_nodes)
# TODO(rkn): Test actors, test object transfers, test tasks with many
# dependencies.

View file

@ -1,38 +0,0 @@
Running the benchmarks
======================
You can run the benchmark suite by doing the following:
1. Install https://github.com/ray-project/asv: ``cd asv; pip install -e .``
2. Run ``asv dev`` in this directory.
To run ASV inside docker, you can use the following command:
``docker run --rm --shm-size=10G --memory=10G $DOCKER_SHA bash -c '/ray/test/jenkins_tests/run_asv.sh'``
``docker run --rm --shm-size=10G --memory=10G $DOCKER_SHA bash -c '/ray/test/jenkins_tests/run_rllib_asv.sh'``
Visualizing Benchmarks
======================
For visualizing regular Ray benchmarks, you must copy the S3 bucket down to `$RAY_DIR/python`.
.. code-block::
cd $RAY_DIR/python
aws s3 sync s3://$BUCKET/ASV/ .
For rllib, you must sync a _particular_ folder down to `$RLLIB_DIR (ray/python/ray/rllib)`.
.. code-block::
cd $RAY_DIR/python/ray/rllib
aws s3 sync s3://$BUCKET/RLLIB_RESULTS/ ./RLLIB_RESULTS
Then, in the directory, you can run:
.. code-block::
asv publish --no-pull
asv preview
This creates the directory and then launches a server at which you can visualize results.

View file

@ -1,85 +0,0 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import ray
NUM_WORKERS = 4
def setup():
if not hasattr(setup, "is_initialized"):
ray.init(num_cpus=4)
setup.is_initialized = True
@ray.remote
class MyActor(object):
def __init__(self):
self.x = None
def get_x(self):
return self.x
def set_x(self, x):
self.x = x
class ActorInstantiationSuite(object):
def instantiate_actor(self):
actor = MyActor.remote()
# Block to make sure actor is instantiated
ray.get(actor.get_x.remote())
def instantiate_many_actors(self):
actors = [MyActor.remote() for _ in range(NUM_WORKERS + 10)]
ray.get([actor.get_x.remote() for actor in actors])
def time_instantiate_actor(self):
self.instantiate_actor()
def peakmem_instantiate_actor(self):
self.instantiate_actor()
def time_instantiate_many_actors(self):
self.instantiate_many_actors()
def peakmem_instantiate_many_actors(self):
self.instantiate_many_actors()
class ActorMethodSuite(object):
def setup(self):
self.actor = MyActor.remote()
# Block to make sure actor is instantiated
ray.get(self.actor.get_x.remote())
def time_call_method(self):
ray.get(self.actor.get_x.remote())
def peakmem_call_method(self):
ray.get(self.actor.get_x.remote())
class ActorCheckpointSuite(object):
def checkpoint_and_restore(self):
actor = MyActor.remote()
actor.__ray_checkpoint__.remote()
assert ray.get(actor.__ray_checkpoint_restore__.remote())
def save_checkpoint(self):
actor = MyActor.remote()
checkpoint = ray.get(actor.__ray_save_checkpoint__.remote())
return checkpoint
def time_checkpoint_and_restore(self):
self.checkpoint_and_restore()
def peakmem_checkpoint_and_restore(self):
self.checkpoint_and_restore()
def time_save_checkpoint(self):
self.save_checkpoint()
def mem_save_checkpoint(self):
return self.save_checkpoint()

View file

@ -1,108 +0,0 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import numpy as np
import ray
def setup():
if not hasattr(setup, "is_initialized"):
ray.init(num_cpus=4)
setup.is_initialized = True
def square(x):
return x * x
class Foo(object):
def bar(self):
return 42
class GetBase(object):
def setup(self):
self.oid = ray.put(None)
def time_get(self):
ray.get(self.oid)
def peakmem_get(self):
ray.get(self.oid)
class GetBoolSuite(GetBase):
def setup(self):
self.oid = ray.put(True)
class GetIntSuite(GetBase):
def setup(self):
self.oid = ray.put(42)
class GetFloatSuite(GetBase):
def setup(self):
self.oid = ray.put(4.2)
class GetComplexSuite(GetBase):
def setup(self):
self.oid = ray.put(4 + 2j)
class GetNoneSuite(GetBase):
def setup(self):
self.oid = ray.put(None)
class GetStringSuite(GetBase):
def setup(self):
self.oid = ray.put("forty-two")
class GetBytesSuite(GetBase):
def setup(self):
self.oid = ray.put(b"forty-two")
class GetListSuite(GetBase):
def setup(self):
self.oid = ray.put([i for i in range(100)])
class GetSetSuite(GetBase):
def setup(self):
self.oid = ray.put({i for i in range(100)})
class GetTupleSuite(GetBase):
def setup(self):
self.oid = ray.put(tuple(range(100)))
class GetDictSuite(GetBase):
def setup(self):
self.oid = ray.put({i: i for i in range(100)})
class GetFunctionSuite(GetBase):
def setup(self):
self.oid = ray.put(square)
class GetClassSuite(GetBase):
def setup(self):
self.oid = ray.put(Foo)
class GetClassInstanceSuite(GetBase):
def setup(self):
self.oid = ray.put(Foo())
class GetArraySuite(GetBase):
def setup(self):
self.oid = ray.put(np.random.random((100, 100, 100)))

View file

@ -1,108 +0,0 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import numpy as np
import ray
def setup():
if not hasattr(setup, "is_initialized"):
ray.init(num_cpus=0)
setup.is_initialized = True
def square(x):
return x * x
class Foo(object):
def bar(self):
return 42
class PutBase(object):
def setup(self):
self.object = None
def time_put(self):
ray.put(self.object)
def peakmem_put(self):
ray.put(self.object)
class PutBoolSuite(PutBase):
def setup(self):
self.object = True
class PutIntSuite(PutBase):
def setup(self):
self.object = 42
class PutFloatSuite(PutBase):
def setup(self):
self.object = 4.2
class PutComplexSuite(PutBase):
def setup(self):
self.object = 4 + 2j
class PutNoneSuite(PutBase):
def setup(self):
self.object = None
class PutStringSuite(PutBase):
def setup(self):
self.object = "forty-two"
class PutBytesSuite(PutBase):
def setup(self):
self.object = b"forty-two"
class PutListSuite(PutBase):
def setup(self):
self.object = [i for i in range(100)]
class PutSetSuite(PutBase):
def setup(self):
self.object = {i for i in range(100)}
class PutTupleSuite(PutBase):
def setup(self):
self.object = tuple(range(100))
class PutDictSuite(PutBase):
def setup(self):
self.object = {i: i for i in range(100)}
class PutFunctionSuite(PutBase):
def setup(self):
self.object = square
class PutClassSuite(PutBase):
def setup(self):
self.object = Foo
class PutClassInstanceSuite(PutBase):
def setup(self):
self.object = Foo()
class PutArraySuite(PutBase):
def setup(self):
self.object = np.random.random((100, 100, 100))

View file

@ -1,31 +0,0 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import ray
from ray.experimental.queue import Queue
def setup():
if not hasattr(setup, "is_initialized"):
ray.init(num_cpus=4)
setup.is_initialized = True
class QueueSuite(object):
def time_put(self):
queue = Queue(1000)
for i in range(1000):
queue.put(i)
def time_get(self):
queue = Queue()
for i in range(1000):
queue.put(i)
for _ in range(1000):
queue.get()
def time_qsize(self):
queue = Queue()
for _ in range(1000):
queue.qsize()

View file

@ -1,61 +0,0 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import ray
def setup():
if not hasattr(setup, "is_initialized"):
ray.init(num_cpus=10, resources={"foo": 1})
setup.is_initialized = True
def square(x):
return x * x
class TaskSuite(object):
timeout = 10
def setup(self):
self.square = ray.remote(square)
def run_many_tasks(self):
ray.get([self.square.remote(i) for i in range(100)])
def run_task_dependency(self):
first_oid = self.square.remote(2)
second_oid = self.square.remote(first_oid)
ray.get(second_oid)
def time_submit_task(self):
self.square.remote(2)
def time_task_lifecycle(self):
ray.get(self.square.remote(2))
def peakmem_task_lifecycle(self):
ray.get(self.square.remote(2))
def time_run_many_tasks(self):
self.run_many_tasks()
def peakmem_run_many_tasks(self):
self.run_many_tasks()
def time_task_dependency(self):
self.run_task_dependency()
def peakmem_task_dependency(self):
self.run_task_dependency()
class CPUTaskSuite(TaskSuite):
def setup(self):
self.square = ray.remote(num_cpus=1)(square)
class CustomResourceTaskSuite(TaskSuite):
def setup(self):
self.square = ray.remote(resources={"foo": 1})(square)

View file

@ -1,39 +0,0 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import time
import ray
def setup(*args):
if not hasattr(setup, "is_initialized"):
ray.init(num_cpus=4)
setup.is_initialized = True
@ray.remote
def sleep(x):
time.sleep(x)
class WaitSuite(object):
timeout = 0.01
timer = time.time
def time_wait_task(self):
ray.wait([sleep.remote(0.1)])
def time_wait_many_tasks(self, num_returns):
tasks = [sleep.remote(i / 5) for i in range(4)]
ray.wait(tasks, num_returns=num_returns)
time_wait_many_tasks.params = list(range(1, 4))
time_wait_many_tasks.param_names = ["num_returns"]
def time_wait_timeout(self, timeout):
ray.wait([sleep.remote(0.5)], timeout=timeout)
time_wait_timeout.params = [0.2, 0.8]
time_wait_timeout.param_names = ["timeout"]

View file

@ -1,51 +0,0 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import ray
def setup():
if not hasattr(setup, "is_initialized"):
ray.init(num_cpus=4)
setup.is_initialized = True
@ray.remote
def trivial_function():
return 1
class TimeSuite(object):
"""An example benchmark."""
def setup(self):
self.d = {}
for x in range(500):
self.d[x] = None
def time_keys(self):
for key in self.d.keys():
pass
def time_range(self):
d = self.d
for key in range(500):
d[key]
class MemSuite(object):
def mem_list(self):
return [0] * 256
class MicroBenchmarkSuite(object):
def time_submit(self):
trivial_function.remote()
def time_submit_and_get(self):
x = trivial_function.remote()
ray.get(x)
def time_put(self):
ray.put(1)