diff --git a/release/nightly_tests/nightly_tests.yaml b/release/nightly_tests/nightly_tests.yaml
index ee176c191..ddf94f0c5 100644
--- a/release/nightly_tests/nightly_tests.yaml
+++ b/release/nightly_tests/nightly_tests.yaml
@@ -192,7 +192,12 @@
 
   smoke_test:
     cluster:
-      compute_template: shuffle/shuffle_compute_smoke.yaml # Does not exist yet
+      app_config: stress_tests/stress_tests_app_config.yaml
+      compute_template: stress_tests/smoke_test_compute.yaml
+
+    run:
+      timeout: 3600
+      script: python stress_tests/test_many_tasks.py --num-nodes=4 --smoke-test
 
 # Stress tests with dead actors
 - name: stress_test_dead_actors
@@ -206,7 +211,12 @@
 
   smoke_test:
     cluster:
-      compute_template: shuffle/shuffle_compute_smoke.yaml # Does not exist yet
+      app_config: stress_tests/stress_tests_app_config.yaml
+      compute_template: stress_tests/smoke_test_compute.yaml
+
+    run:
+      timeout: 3600
+      script: python stress_tests/test_dead_actors.py --num-nodes=4 --num-parents=3 --num-children=3
 
 # Stress tests with placement groups
 - name: stress_test_placement_group
diff --git a/release/nightly_tests/stress_tests/smoke_test_compute.yaml b/release/nightly_tests/stress_tests/smoke_test_compute.yaml
new file mode 100644
index 000000000..ebded9eea
--- /dev/null
+++ b/release/nightly_tests/stress_tests/smoke_test_compute.yaml
@@ -0,0 +1,23 @@
+cloud_id: {{env["ANYSCALE_CLOUD_ID"]}}
+region: us-west-2
+
+max_workers: 4
+
+aws:
+  BlockDeviceMappings:
+    - DeviceName: /dev/sda1
+      Ebs:
+        VolumeSize: 500
+
+head_node_type:
+  name: head_node
+  instance_type: m4.4xlarge
+
+worker_node_types:
+  - name: worker_node
+    instance_type: m4.large
+    min_workers: 4
+    max_workers: 4
+    use_spot: false
+    resources:
+      cpu: 2
diff --git a/release/nightly_tests/stress_tests/test_dead_actors.py b/release/nightly_tests/stress_tests/test_dead_actors.py
index 0150f7c24..529b4e0b0 100644
--- a/release/nightly_tests/stress_tests/test_dead_actors.py
+++ b/release/nightly_tests/stress_tests/test_dead_actors.py
@@ -1,5 +1,6 @@
 #!/usr/bin/env python
 
+import argparse
 import json
 import logging
 import numpy as np
@@ -14,13 +15,6 @@ logger = logging.getLogger(__name__)
 
 ray.init(address="auto")
 
-# These numbers need to correspond with the autoscaler config file.
-# The number of remote nodes in the autoscaler should upper bound
-# these because sometimes nodes fail to update.
-NUM_REMOTE_NODES = 100
-HEAD_NODE_CPUS = 2
-NUM_REMOTE_CPU = NUM_REMOTE_NODES * HEAD_NODE_CPUS
-
 
 @ray.remote
 class Child(object):
@@ -59,24 +53,38 @@ class Parent(object):
         ray.get([child.__ray_terminate__.remote() for child in self.children])
 
 
+def parse_script_args():
+    parser = argparse.ArgumentParser()
+    parser.add_argument("--num-nodes", type=int, default=100)
+    parser.add_argument("--num-parents", type=int, default=10)
+    parser.add_argument("--num-children", type=int, default=10)
+    parser.add_argument("--death-probability", type=float, default=0.95)
+    return parser.parse_known_args()
+
+
 if __name__ == "__main__":
+    args, unknown = parse_script_args()
     result = {"success": 0}
+    # These numbers need to correspond with the autoscaler config file.
+    # The number of remote nodes in the autoscaler should upper bound
+    # these because sometimes nodes fail to update.
+    num_remote_nodes = args.num_nodes
+    num_parents = args.num_parents
+    num_children = args.num_children
+    death_probability = args.death_probability
+
     try:
         # Wait until the expected number of nodes have joined the cluster.
         while True:
             num_nodes = len(ray.nodes())
             logger.info("Waiting for nodes {}/{}".format(
-                num_nodes, NUM_REMOTE_NODES + 1))
-            if num_nodes >= NUM_REMOTE_NODES + 1:
+                num_nodes, num_remote_nodes + 1))
+            if num_nodes >= num_remote_nodes + 1:
                 break
             time.sleep(5)
         logger.info("Nodes have all joined. There are %s resources.",
                     ray.cluster_resources())
 
-        num_parents = 10
-        num_children = 10
-        death_probability = 0.95
-
         parents = [
             Parent.remote(num_children, death_probability)
             for _ in range(num_parents)
diff --git a/release/nightly_tests/stress_tests/test_many_tasks.py b/release/nightly_tests/stress_tests/test_many_tasks.py
index 12ca1ae0e..550f8d50e 100644
--- a/release/nightly_tests/stress_tests/test_many_tasks.py
+++ b/release/nightly_tests/stress_tests/test_many_tasks.py
@@ -1,5 +1,6 @@
 #!/usr/bin/env python
 
+import argparse
 from collections import defaultdict
 import numpy as np
 import json
@@ -14,13 +15,6 @@ logger = logging.getLogger(__name__)
 
 ray.init(address="auto")
 
-# These numbers need to correspond with the autoscaler config file.
-# The number of remote nodes in the autoscaler should upper bound
-# these because sometimes nodes fail to update.
-NUM_REMOTE_NODES = 100
-HEAD_NODE_CPUS = 2
-NUM_REMOTE_CPUS = NUM_REMOTE_NODES * HEAD_NODE_CPUS
-
 
 @ray.remote(num_cpus=1)
 def f(size, *xs):
@@ -34,28 +28,40 @@ class Actor(object):
 
 
 # Stage 0: Submit a bunch of small tasks with large returns.
-def stage0():
+def stage0(smoke=False):
+    num_tasks = 1000
+    size = 1000000
+
+    if smoke:
+        num_tasks //= 25
+        size //= 25
+
     stage_0_iterations = []
     start_time = time.time()
     logger.info("Submitting many tasks with large returns.")
     for i in range(10):
         iteration_start = time.time()
         logger.info("Iteration %s", i)
-        ray.get([f.remote(1000000) for _ in range(1000)])
+        ray.get([f.remote(size) for _ in range(num_tasks)])
         stage_0_iterations.append(time.time() - iteration_start)
 
     return time.time() - start_time
 
 
 # Stage 1: Launch a bunch of tasks.
-def stage1():
+def stage1(smoke=False):
+    num_tasks = 100000
+
+    if smoke:
+        num_tasks //= 25
+
     stage_1_iterations = []
     start_time = time.time()
     logger.info("Submitting many tasks.")
     for i in range(10):
         iteration_start = time.time()
         logger.info("Iteration %s", i)
-        ray.get([f.remote(0) for _ in range(100000)])
+        ray.get([f.remote(0) for _ in range(num_tasks)])
         stage_1_iterations.append(time.time() - iteration_start)
 
     return time.time() - start_time, stage_1_iterations
@@ -64,7 +70,12 @@ def stage1():
 # Launch a bunch of tasks, each with a bunch of dependencies. TODO(rkn): This
 # test starts to fail if we increase the number of tasks in the inner loop from
 # 500 to 1000. (approximately 615 seconds)
-def stage2():
+def stage2(smoke=False):
+    num_tasks_per_iteration = 500
+
+    if smoke:
+        num_tasks_per_iteration //= 25
+
     stage_2_iterations = []
     start_time = time.time()
     logger.info("Submitting tasks with many dependencies.")
@@ -74,7 +85,9 @@ def stage2():
         for i in range(20):
             logger.info("Iteration %s. Cumulative time %s seconds", i,
                         time.time() - start_time)
-            x_ids = [f.remote(0, *x_ids) for _ in range(500)]
+            x_ids = [
+                f.remote(0, *x_ids) for _ in range(num_tasks_per_iteration)
+            ]
             ray.get(x_ids)
         stage_2_iterations.append(time.time() - iteration_start)
     logger.info("Finished after %s seconds.", time.time() - start_time)
@@ -82,18 +95,23 @@
 
 
 # Create a bunch of actors.
-def stage3():
+def stage3(total_num_remote_cpus, smoke=False):
     start_time = time.time()
-    logger.info("Creating %s actors.", NUM_REMOTE_CPUS)
-    actors = [Actor.remote() for _ in range(NUM_REMOTE_CPUS)]
+    logger.info("Creating %s actors.", total_num_remote_cpus)
+    actors = [Actor.remote() for _ in range(total_num_remote_cpus)]
     stage_3_creation_time = time.time() - start_time
     logger.info("Finished stage 3 actor creation in %s seconds.",
                 stage_3_creation_time)
 
+    num_tasks = 1000
+
+    if smoke:
+        num_tasks //= 25
+
     # Submit a bunch of small tasks to each actor. (approximately 1070 seconds)
     start_time = time.time()
     logger.info("Submitting many small actor tasks.")
-    for N in [1000, 100000]:
+    for N in [num_tasks, num_tasks * 100]:
         x_ids = []
         for i in range(N):
             x_ids = [a.method.remote(0) for a in actors]
@@ -143,25 +161,41 @@ def stage4():
     return avg_spread
 
 
+def parse_script_args():
+    parser = argparse.ArgumentParser()
+    parser.add_argument("--num-nodes", type=int, default=100)
+    parser.add_argument("--smoke-test", action="store_true")
+    return parser.parse_known_args()
+
+
 if __name__ == "__main__":
+    args, unknown = parse_script_args()
+    # These numbers need to correspond with the autoscaler config file.
+    # The number of remote nodes in the autoscaler should upper bound
+    # these because sometimes nodes fail to update.
+    num_remote_nodes = args.num_nodes
+    num_remote_cpus = 2
+    total_num_remote_cpus = num_remote_nodes * num_remote_cpus
+    is_smoke_test = args.smoke_test
+
     result = {"success": 0}
     try:
         # Wait until the expected number of nodes have joined the cluster.
         while True:
             num_nodes = len(ray.nodes())
             logger.info("Waiting for nodes {}/{}".format(
-                num_nodes, NUM_REMOTE_NODES + 1))
-            if num_nodes >= NUM_REMOTE_NODES + 1:
+                num_nodes, num_remote_nodes + 1))
+            if num_nodes >= num_remote_nodes + 1:
                 break
             time.sleep(5)
         logger.info("Nodes have all joined. There are %s resources.",
                     ray.cluster_resources())
 
-        stage_0_time = stage0()
+        stage_0_time = stage0(smoke=is_smoke_test)
        logger.info("Finished stage 0 after %s seconds.", stage_0_time)
         result["stage_0_time"] = stage_0_time
 
-        stage_1_time, stage_1_iterations = stage1()
+        stage_1_time, stage_1_iterations = stage1(smoke=is_smoke_test)
         logger.info("Finished stage 1 after %s seconds.", stage_1_time)
         result["stage_1_time"] = stage_1_time
         result["stage_1_avg_iteration_time"] = sum(stage_1_iterations) / len(
@@ -169,7 +203,7 @@ if __name__ == "__main__":
         result["stage_1_max_iteration_time"] = max(stage_1_iterations)
         result["stage_1_min_iteration_time"] = min(stage_1_iterations)
 
-        stage_2_time, stage_2_iterations = stage2()
+        stage_2_time, stage_2_iterations = stage2(smoke=is_smoke_test)
         logger.info("Finished stage 2 after %s seconds.", stage_2_time)
         result["stage_2_time"] = stage_2_time
         result["stage_2_avg_iteration_time"] = sum(stage_2_iterations) / len(
@@ -177,7 +211,8 @@ if __name__ == "__main__":
         result["stage_2_max_iteration_time"] = max(stage_2_iterations)
         result["stage_2_min_iteration_time"] = min(stage_2_iterations)
 
-        stage_3_time, stage_3_creation_time = stage3()
+        stage_3_time, stage_3_creation_time = stage3(
+            total_num_remote_cpus, smoke=is_smoke_test)
         logger.info("Finished stage 3 in %s seconds.", stage_3_time)
         result["stage_3_creation_time"] = stage_3_creation_time
         result["stage_3_time"] = stage_3_time
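
Note (not part of the patch): both scripts switch from module-level constants to `parse_known_args()` rather than `parse_args()`, presumably so that extra arguments appended by the release harness do not crash the script; unrecognized flags are collected into `unknown` and ignored. A minimal standalone sketch of that behavior, reusing the flags the smoke-test entries above pass (the `--some-harness-flag` is a hypothetical unknown flag, not something the harness is known to send):

```python
# Sketch of the parse_known_args() pattern used by both stress-test scripts.
import argparse

parser = argparse.ArgumentParser()
parser.add_argument("--num-nodes", type=int, default=100)
parser.add_argument("--smoke-test", action="store_true")

# Unknown flags are tolerated instead of raising an error.
args, unknown = parser.parse_known_args(
    ["--num-nodes=4", "--smoke-test", "--some-harness-flag=1"])

print(args.num_nodes)   # 4
print(args.smoke_test)  # True
print(unknown)          # ['--some-harness-flag=1']
```

Also note that `--num-nodes=4` in the smoke-test commands has to match `min_workers`/`max_workers` in `smoke_test_compute.yaml`, since both scripts block until `len(ray.nodes()) >= num_nodes + 1` (workers plus the head node).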