diff --git a/benchmarks/README.md b/benchmarks/README.md
new file mode 100644
index 000000000..216715165
--- /dev/null
+++ b/benchmarks/README.md
@@ -0,0 +1,35 @@
+# Ray Scalability Envelope
+
+### Note: This document is a WIP. This is not a scalability guarantee (yet).
+
+## Distributed Benchmarks
+
+All distributed tests are run on 64 nodes with 64 cores/node. The maximum number of nodes is reached by adding 4-core (m5.xlarge) nodes on top of this base cluster.
+
+| Dimension | Quantity |
+| --------- | -------- |
+| # nodes in cluster (with trivial task workload) | 250+ |
+| # actors in cluster (with trivial workload) | 10k+ |
+| # simultaneously running tasks | 10k+ |
+| # simultaneously running placement groups | 1k+ |
+
+## Object Store Benchmarks
+
+| Dimension | Quantity |
+| --------- | -------- |
+| 1 GiB object broadcast (# of nodes) | 50+ |
+
+
+## Single Node Benchmarks
+
+All single node benchmarks are run on a single m4.16xlarge instance.
+
+| Dimension | Quantity |
+| --------- | -------- |
+| # of object arguments to a single task | 10,000+ |
+| # of objects returned from a single task | 3,000+ |
+| # of plasma objects in a single `ray.get` call | 10,000+ |
+| # of tasks queued on a single node | 1,000,000+ |
+| Maximum `ray.get` numpy object size | 100 GiB+ |
+
+
diff --git a/benchmarks/distributed/config.yaml b/benchmarks/distributed/config.yaml
new file mode 100644
index 000000000..630de0eef
--- /dev/null
+++ b/benchmarks/distributed/config.yaml
@@ -0,0 +1,58 @@
+cluster_name: distributed-benchmarks
+min_workers: 0
+max_workers: 999999
+
+upscaling_speed: 9999999
+
+provider:
+    type: aws
+    region: us-west-2
+    availability_zone: us-west-2a, us-west-2b, us-west-2c, us-west-2d
+
+auth:
+    ssh_user: ubuntu
+
+available_node_types:
+    head_node:
+        node_config:
+            InstanceType: m5.16xlarge
+            ImageId: ami-098555c9b343eb09c
+        resources:
+            node: 1
+            small: 1
+        max_workers: 999999
+    worker_node:
+        node_config:
+            InstanceType: m5.16xlarge
+            ImageId: ami-098555c9b343eb09c
+        resources:
+            node: 1
+        min_workers: 63
+        max_workers: 63
+    small_worker_node:
+        node_config:
+            InstanceType: m5.xlarge
+            ImageId: ami-098555c9b343eb09c
+        resources:
+            node: 1
+        max_workers: 999999
+
+head_node_type: head_node
+
+worker_default_node_type: worker_node
+
+setup_commands:
+    - pip install -U https://s3-us-west-2.amazonaws.com/ray-wheels/latest/ray-2.0.0.dev0-cp37-cp37m-manylinux2014_x86_64.whl
+    - pip install tqdm
+    - sudo bash -c 'rm -rf /etc/security/limits.d; echo "* soft nofile 65535" >> /etc/security/limits.conf; echo "* hard nofile 65535" >> /etc/security/limits.conf;'
+
+idle_timeout_minutes: 1
+
+head_start_ray_commands:
+    - ray stop
+    - ulimit -n 65535; ray start --head --port=6379 --object-manager-port=8076 --autoscaling-config=~/ray_bootstrap_config.yaml
+
+# Command to start ray on worker nodes. You don't need to change this.
+worker_start_ray_commands:
+    - ray stop
+    - ulimit -n 65535; ray start --address=$RAY_HEAD_IP:6379 --object-manager-port=8076
diff --git a/benchmarks/distributed/test_distributed.py b/benchmarks/distributed/test_distributed.py
new file mode 100644
index 000000000..c929cdba8
--- /dev/null
+++ b/benchmarks/distributed/test_distributed.py
@@ -0,0 +1,204 @@
+import ray
+import ray.autoscaler.sdk
+from ray.test_utils import Semaphore
+from ray.util.placement_group import placement_group, remove_placement_group
+
+from time import sleep, perf_counter
+from tqdm import tqdm, trange
+
+TEST_NUM_NODES = 64
+MAX_ACTORS_IN_CLUSTER = 10000
+MAX_RUNNING_TASKS_IN_CLUSTER = 10000
+MAX_PLACEMENT_GROUPS = 1000
+MAX_NUM_NODES = 250
+
+
+def num_alive_nodes():
+    n = 0
+    for node in ray.nodes():
+        if node["Alive"]:
+            n += 1
+    return n
+
+
+def scale_to(target):
+    # Ask the autoscaler for `target` nodes and wait until they are all alive.
+    while num_alive_nodes() != target:
+        ray.autoscaler.sdk.request_resources(bundles=[{"node": 1}] * target)
+        print(f"Current # nodes: {num_alive_nodes()}, target: {target}")
+        print("Waiting ...")
+        sleep(5)
+
+
+def test_nodes():
+    scale_to(MAX_NUM_NODES)
+    assert num_alive_nodes() == MAX_NUM_NODES
+    # Use the trivial task workload to ensure the nodes are all functioning.
+    test_max_running_tasks()
+
+
+def test_max_actors():
+    # TODO (Alex): Dynamically set this based on number of cores
+    cpus_per_actor = 0.25
+
+    @ray.remote(num_cpus=cpus_per_actor)
+    class Actor:
+        def foo(self):
+            pass
+
+    actors = [
+        Actor.remote()
+        for _ in trange(MAX_ACTORS_IN_CLUSTER, desc="Launching actors")
+    ]
+
+    for actor in tqdm(actors, desc="Ensuring actors have started"):
+        assert ray.get(actor.foo.remote()) is None
+
+
+def test_max_running_tasks():
+    counter = Semaphore.remote(0)
+    blocker = Semaphore.remote(0)
+
+    @ray.remote(num_cpus=0.25)
+    def task(counter, blocker):
+        sleep(300)
+
+    refs = [
+        task.remote(counter, blocker)
+        for _ in trange(MAX_RUNNING_TASKS_IN_CLUSTER, desc="Launching tasks")
+    ]
+
+    max_cpus = ray.cluster_resources()["CPU"]
+    min_cpus_available = max_cpus
+    # Sample available CPUs every 100ms for the 300s the tasks sleep.
+    for _ in trange(int(300 / 0.1), desc="Waiting"):
+        try:
+            cur_cpus = ray.available_resources().get("CPU", 0)
+            min_cpus_available = min(min_cpus_available, cur_cpus)
+        except Exception:
+            # There is a race condition: `.get` can fail if a new heartbeat
+            # arrives at the same time.
+            pass
+        sleep(0.1)
+
+    # There are some relevant magic numbers in this check: the 10k tasks each
+    # require 1/4 CPU, so ideally 2.5k CPUs will be in use. Assert that at
+    # least 2k of them were.
+    err_str = f"Only {max_cpus - min_cpus_available}/{max_cpus} cpus used."
+    assert max_cpus - min_cpus_available > 2000, err_str
+
+    for _ in trange(
+            MAX_RUNNING_TASKS_IN_CLUSTER,
+            desc="Ensuring all tasks have finished"):
+        done, refs = ray.wait(refs)
+        assert ray.get(done[0]) is None
+
+
+def test_many_placement_groups():
+    @ray.remote(num_cpus=1, resources={"node": 0.02})
+    def f1():
+        sleep(10)
+
+    @ray.remote(num_cpus=1)
+    def f2():
+        sleep(10)
+
+    @ray.remote(resources={"node": 0.02})
+    def f3():
+        sleep(10)
+
+    bundle1 = {"node": 0.02, "CPU": 1}
+    bundle2 = {"CPU": 1}
+    bundle3 = {"node": 0.02}
+
+    pgs = []
+    for _ in trange(MAX_PLACEMENT_GROUPS, desc="Creating pgs"):
+        pg = placement_group(bundles=[bundle1, bundle2, bundle3])
+        pgs.append(pg)
+
+    for pg in tqdm(pgs, desc="Waiting for pgs to be ready"):
+        ray.get(pg.ready())
+
+    refs = []
+    for pg in tqdm(pgs, desc="Scheduling tasks"):
+        ref1 = f1.options(placement_group=pg).remote()
+        ref2 = f2.options(placement_group=pg).remote()
+        ref3 = f3.options(placement_group=pg).remote()
+        refs.extend([ref1, ref2, ref3])
+
+    for _ in trange(10, desc="Waiting"):
+        sleep(1)
+
+    with tqdm() as p_bar:
+        while refs:
+            done, refs = ray.wait(refs)
+            p_bar.update()
+
+    for pg in tqdm(pgs, desc="Cleaning up pgs"):
+        remove_placement_group(pg)
+
+
+ray.init(address="auto")
+
+scale_to(TEST_NUM_NODES)
+assert num_alive_nodes() == TEST_NUM_NODES, (
+    f"Wrong number of nodes in cluster: {len(ray.nodes())}")
+
+cluster_resources = ray.cluster_resources()
+
+available_resources = ray.available_resources()
+assert available_resources == cluster_resources, (
+    f"{available_resources} != {cluster_resources}")
+print("Done launching nodes")
+
+actor_start = perf_counter()
+test_max_actors()
+actor_end = perf_counter()
+
+sleep(1)
+assert num_alive_nodes() == TEST_NUM_NODES, (
+    f"Wrong number of nodes in cluster: {len(ray.nodes())}")
+# Re-read available resources to check that the test released all of them.
+available_resources = ray.available_resources()
+assert available_resources == cluster_resources, (
+    f"{available_resources} != {cluster_resources}")
+print("Done testing actors")
+
+task_start = perf_counter()
+test_max_running_tasks()
+task_end = perf_counter()
+
+sleep(1)
+assert num_alive_nodes() == TEST_NUM_NODES, (
+    f"Wrong number of nodes in cluster: {len(ray.nodes())}")
+available_resources = ray.available_resources()
+assert available_resources == cluster_resources, (
+    f"{available_resources} != {cluster_resources}")
+print("Done testing tasks")
+
+pg_start = perf_counter()
+test_many_placement_groups()
+pg_end = perf_counter()
+
+sleep(1)
+assert num_alive_nodes() == TEST_NUM_NODES, (
+    f"Wrong number of nodes in cluster: {len(ray.nodes())}")
+available_resources = ray.available_resources()
+assert available_resources == cluster_resources, (
+    f"{available_resources} != {cluster_resources}")
+print("Done testing placement groups")
+
+launch_start = perf_counter()
+test_nodes()
+launch_end = perf_counter()
+
+sleep(1)
+assert num_alive_nodes() == MAX_NUM_NODES, (
+    f"Wrong number of nodes in cluster: {len(ray.nodes())}")
+print("Done.")
+
+actor_time = actor_end - actor_start
+task_time = task_end - task_start
+pg_time = pg_end - pg_start
+launch_time = launch_end - launch_start
+
+print(f"Actor time: {actor_time} ({MAX_ACTORS_IN_CLUSTER} actors)")
+print(f"Task time: {task_time} ({MAX_RUNNING_TASKS_IN_CLUSTER} tasks)")
+print(f"PG time: {pg_time} ({MAX_PLACEMENT_GROUPS} placement groups)")
+print(f"Node launch time: {launch_time} ({MAX_NUM_NODES} nodes)")
diff --git a/benchmarks/object_store/config.yaml b/benchmarks/object_store/config.yaml
new file mode 100644
index 000000000..5ea3ce835
--- /dev/null
+++ b/benchmarks/object_store/config.yaml
@@ -0,0 +1,48 @@
+cluster_name: object-store-benchmarks
+min_workers: 0
+max_workers: 999999
+
+upscaling_speed: 9999999
+
+provider:
+    type: aws
+    region: us-west-2
+    availability_zone: us-west-2a
+
+auth:
+    ssh_user: ubuntu
+
+available_node_types:
+    head_node:
+        node_config:
+            InstanceType: m4.4xlarge
+            ImageId: ami-098555c9b343eb09c
+        resources:
+            node: 1
+        max_workers: 999999
+    worker_node:
+        node_config:
+            InstanceType: m4.xlarge
+            ImageId: ami-098555c9b343eb09c
+        resources:
+            node: 1
+        max_workers: 999999
+
+head_node_type: head_node
+
+worker_default_node_type: worker_node
+
+setup_commands:
+    - pip install -U https://s3-us-west-2.amazonaws.com/ray-wheels/latest/ray-1.2.0.dev0-cp37-cp37m-manylinux2014_x86_64.whl
+    - pip install tqdm numpy
+
+idle_timeout_minutes: 5
+
+head_start_ray_commands:
+    - ray stop
+    - ulimit -n 1000000; ray start --head --port=6379 --object-manager-port=8076 --autoscaling-config=~/ray_bootstrap_config.yaml
+
+# Command to start ray on worker nodes. You don't need to change this.
+worker_start_ray_commands:
+    - ray stop
+    - ulimit -n 1000000; ray start --address=$RAY_HEAD_IP:6379 --object-manager-port=8076
diff --git a/benchmarks/object_store/test_object_store.py b/benchmarks/object_store/test_object_store.py
new file mode 100644
index 000000000..83312fddd
--- /dev/null
+++ b/benchmarks/object_store/test_object_store.py
@@ -0,0 +1,61 @@
+import numpy as np
+
+import ray
+import ray.autoscaler.sdk
+
+from time import sleep, perf_counter
+from tqdm import tqdm
+
+NUM_NODES = 50
+OBJECT_SIZE = 2**30
+
+
+def num_alive_nodes():
+    n = 0
+    for node in ray.nodes():
+        if node["Alive"]:
+            n += 1
+    return n
+
+
+def scale_to(target):
+    while num_alive_nodes() != target:
+        ray.autoscaler.sdk.request_resources(bundles=[{"node": 1}] * target)
+        print(f"Current # nodes: {num_alive_nodes()}, target: {target}")
+        print("Waiting ...")
+        sleep(5)
+
+
+def test_object_broadcast():
+    scale_to(NUM_NODES)
+
+    @ray.remote(num_cpus=1, resources={"node": 1})
+    class Actor:
+        def foo(self):
+            pass
+
+        def sum(self, arr):
+            return np.sum(arr)
+
+    actors = [Actor.remote() for _ in range(NUM_NODES)]
+
+    arr = np.ones(OBJECT_SIZE, dtype=np.uint8)
+    ref = ray.put(arr)
+
+    for actor in tqdm(actors, desc="Ensure all actors have started."):
+        ray.get(actor.foo.remote())
+
+    result_refs = []
+    for actor in tqdm(actors, desc="Broadcasting objects"):
+        result_refs.append(actor.sum.remote(ref))
+
+    results = ray.get(result_refs)
+    for result in results:
+        assert result == OBJECT_SIZE
+
+
+ray.init(address="auto")
+start = perf_counter()
+test_object_broadcast()
+end = perf_counter()
+print(f"Broadcast time: {end - start} ({OBJECT_SIZE} B x {NUM_NODES} nodes)")
diff --git a/benchmarks/single_node/config.yaml b/benchmarks/single_node/config.yaml
new file mode 100644
index 000000000..e5798541f
--- /dev/null
+++ b/benchmarks/single_node/config.yaml
@@ -0,0 +1,41 @@
+cluster_name: single-node-benchmarks
+min_workers: 0
+max_workers: 0
+
+upscaling_speed: 9999999
+
+provider:
+    type: aws
+    region: us-west-2
+    availability_zone: us-west-2a
+
+auth:
+    ssh_user: ubuntu
+
+available_node_types:
+    head_node:
+        node_config:
+            InstanceType: m4.16xlarge
+            ImageId: ami-098555c9b343eb09c
+        resources:
+            node: 1
+        max_workers: 999999
+    worker_node:
+        node_config:
+            InstanceType: m4.xlarge
+            ImageId: ami-098555c9b343eb09c
+
+head_node_type: head_node
+
+worker_default_node_type: worker_node
+
+setup_commands:
+    - pip install -U https://s3-us-west-2.amazonaws.com/ray-wheels/latest/ray-1.2.0.dev0-cp37-cp37m-manylinux2014_x86_64.whl
+    - pip install numpy tqdm
+    - sudo bash -c 'rm -rf /etc/security/limits.d; echo "* soft nofile 1000000" >> /etc/security/limits.conf; echo "* hard nofile 1000000" >> /etc/security/limits.conf;'
+
+idle_timeout_minutes: 5
+
+head_start_ray_commands:
+    - ray stop
+    - ulimit -n 1000000; ray start --head --port=6379 --object-manager-port=8076 --object-store-memory=128000000000 --autoscaling-config=~/ray_bootstrap_config.yaml
diff --git a/benchmarks/single_node/test_single_node.py b/benchmarks/single_node/test_single_node.py
new file mode 100644
index 000000000..75d783124
--- /dev/null
+++ b/benchmarks/single_node/test_single_node.py
@@ -0,0 +1,175 @@
+import numpy as np
+import ray
+import ray.autoscaler.sdk
+from ray.test_utils import Semaphore
+
+from time import perf_counter
+from tqdm import trange, tqdm
+
+MAX_ARGS = 10000
+MAX_RETURNS = 3000
+MAX_RAY_GET_ARGS = 10000
+MAX_QUEUED_TASKS = 1_000_000
+MAX_RAY_GET_SIZE = 100 * 2**30
+
+
+def test_many_args():
+    @ray.remote
+    def sum_args(*args):
+        return sum(sum(arg) for arg in args)
+
+    args = [[1 for _ in range(10000)] for _ in range(MAX_ARGS)]
+    result = ray.get(sum_args.remote(*args))
+    assert result == MAX_ARGS * 10000
+
+
+def test_many_returns():
+    @ray.remote(num_returns=MAX_RETURNS)
+    def f():
+        to_return = []
+        for _ in range(MAX_RETURNS):
+            obj = list(range(10000))
+            to_return.append(obj)
+
+        return tuple(to_return)
+
+    returned_refs = f.remote()
+    assert len(returned_refs) == MAX_RETURNS
+
+    for ref in returned_refs:
+        expected = list(range(10000))
+        obj = ray.get(ref)
+        assert obj == expected
+
+
+def test_ray_get_args():
+    def with_dese():
+        # Python lists must be deserialized on `ray.get`.
+        print("Putting test objects:")
+        refs = []
+        for _ in trange(MAX_RAY_GET_ARGS):
+            obj = list(range(10000))
+            refs.append(ray.put(obj))
+
+        print("Getting objects")
+        results = ray.get(refs)
+        assert len(results) == MAX_RAY_GET_ARGS
+
+        print("Asserting correctness")
+        for obj in tqdm(results):
+            expected = list(range(10000))
+            assert obj == expected
+
+    def with_zero_copy():
+        # Numpy arrays can be read zero-copy out of the object store.
+        print("Putting test objects:")
+        refs = []
+        for _ in trange(MAX_RAY_GET_ARGS):
+            obj = np.arange(10000)
+            refs.append(ray.put(obj))
+
+        print("Getting objects")
+        results = ray.get(refs)
+        assert len(results) == MAX_RAY_GET_ARGS
+
+        print("Asserting correctness")
+        for obj in tqdm(results):
+            expected = np.arange(10000)
+            assert (obj == expected).all()
+
+    with_dese()
+    print("Done with dese")
+    with_zero_copy()
+    print("Done with zero copy")
+
+
+def test_many_queued_tasks():
+    sema = Semaphore.remote(0)
+
+    @ray.remote(num_cpus=1)
+    def block():
+        ray.get(sema.acquire.remote())
+
+    @ray.remote(num_cpus=1)
+    def f():
+        pass
+
+    # Fill every CPU with a blocked task so the queued tasks cannot start yet.
+    num_cpus = int(ray.cluster_resources()["CPU"])
+    blocked_tasks = []
+    for _ in range(num_cpus):
+        blocked_tasks.append(block.remote())
+
+    print("Submitting many tasks")
+    pending_tasks = []
+    for _ in trange(MAX_QUEUED_TASKS):
+        pending_tasks.append(f.remote())
+
+    # Make sure all the tasks can actually run.
+    for _ in range(num_cpus):
+        sema.release.remote()
+
+    print("Unblocking tasks")
+    for ref in tqdm(pending_tasks):
+        assert ray.get(ref) is None
+
+
+def test_large_object():
+    print("Generating object")
+    obj = np.zeros(MAX_RAY_GET_SIZE, dtype=np.int8)
+    print("Putting object")
+    ref = ray.put(obj)
+    del obj
+    print("Getting object")
+    big_obj = ray.get(ref)
+
+    assert big_obj[0] == 0
+    assert big_obj[-1] == 0
+
+
+ray.init(address="auto")
+
+args_start = perf_counter()
+test_many_args()
+args_end = perf_counter()
+
+assert ray.cluster_resources() == ray.available_resources()
+print("Finished many args")
+
+returns_start = perf_counter()
+test_many_returns()
+returns_end = perf_counter()
+
+assert ray.cluster_resources() == ray.available_resources()
+print("Finished many returns")
+
+get_start = perf_counter()
+test_ray_get_args()
+get_end = perf_counter()
+
+assert ray.cluster_resources() == ray.available_resources()
+print("Finished ray.get on many objects")
+
+queued_start = perf_counter()
+test_many_queued_tasks()
+queued_end = perf_counter()
+
+assert ray.cluster_resources() == ray.available_resources()
+print("Finished queueing many tasks")
+
+large_object_start = perf_counter()
+test_large_object()
+large_object_end = perf_counter()
+
+assert ray.cluster_resources() == ray.available_resources()
+print("Done")
+
+args_time = args_end - args_start
+returns_time = returns_end - returns_start
+get_time = get_end - get_start
+queued_time = queued_end - queued_start
+large_object_time = large_object_end - large_object_start
+
+print(f"Many args time: {args_time} ({MAX_ARGS} args)")
+print(f"Many returns time: {returns_time} ({MAX_RETURNS} returns)")
+print(f"Ray.get time: {get_time} ({MAX_RAY_GET_ARGS} args)")
+print(f"Queued task time: {queued_time} ({MAX_QUEUED_TASKS} tasks)")
+print(f"Ray.get large object time: {large_object_time} "
+      f"({MAX_RAY_GET_SIZE} bytes)")
diff --git a/release/RELEASE_PROCESS.rst b/release/RELEASE_PROCESS.rst
index c60e1c4aa..018f56bdf 100644
--- a/release/RELEASE_PROCESS.rst
+++ b/release/RELEASE_PROCESS.rst
@@ -134,7 +134,13 @@ is generally the easiest way to run release tests.
 The summaries printed by each test should be checked in under
 ``release_logs/`` on the **master** branch (make a pull request).
 
-5. **ASAN tests**
+5. **Scalability envelope tests**
+
+   - Run the tests in ``benchmarks/`` (with ``ray submit --start config.yaml <test file>``; see the example commands below).
+   - Record the times each test prints.
+   - Whether the results are acceptable is a judgement call; compare against the envelope recorded in ``benchmarks/README.md``.
+
+6. **ASAN tests**
 
 Run the ``ci/asan_tests`` with the commit. This will enable ASAN build and run
 the whole Python tests to detect memory leaks.
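
For step 5 above, a possible set of invocations is sketched below. This is only a sketch: the paths and file names (``config.yaml``, ``test_*.py`` under each ``benchmarks/`` subdirectory) are taken from the files added in this diff and assume the commands are run from the repository root; adjust them if the layout changes. Each ``ray submit --start`` call launches the corresponding cluster, runs the test script on it, and prints the timing summary to record.

    # Sketch only; paths assumed from this diff's layout.
    ray submit --start benchmarks/single_node/config.yaml benchmarks/single_node/test_single_node.py
    ray submit --start benchmarks/object_store/config.yaml benchmarks/object_store/test_object_store.py
    ray submit --start benchmarks/distributed/config.yaml benchmarks/distributed/test_distributed.py

    # Tear the clusters down once the times have been recorded.
    ray down benchmarks/single_node/config.yaml
    ray down benchmarks/object_store/config.yaml
    ray down benchmarks/distributed/config.yaml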