[Test] Support stress test smoke test (#16827)

* Support smoke test

* lint
SangBin Cho 2021-07-02 09:50:26 -07:00 committed by GitHub
parent ee9a1b022e
commit 7bd3138227
4 changed files with 114 additions and 38 deletions
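Every script change in this commit follows the same pattern: hard-coded workload constants become argparse-driven values, and a --smoke-test flag (or explicit size arguments) shrinks the workload by a factor of 25. A minimal sketch of that pattern, distilled from the diff below (the names and the factor of 25 come from the diff itself):

import argparse


def parse_script_args():
    # parse_known_args() returns (known, leftover) instead of erroring
    # out on flags it does not recognize.
    parser = argparse.ArgumentParser()
    parser.add_argument("--num-nodes", type=int, default=100)
    parser.add_argument("--smoke-test", action="store_true")
    return parser.parse_known_args()


args, unknown = parse_script_args()
num_tasks = 100000
if args.smoke_test:
    num_tasks //= 25  # the same 25x reduction the stress tests apply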

Changed file: release test definitions YAML

@@ -192,7 +192,12 @@
   smoke_test:
     cluster:
-      compute_template: shuffle/shuffle_compute_smoke.yaml # Does not exist yet
+      app_config: stress_tests/stress_tests_app_config.yaml
+      compute_template: stress_tests/smoke_test_compute.yaml
+
+    run:
+      timeout: 3600
+      script: python stress_tests/test_many_tasks.py --num-nodes=4 --smoke-test
 
 # Stress tests with dead actors
 - name: stress_test_dead_actors
@@ -206,7 +211,12 @@
   smoke_test:
     cluster:
-      compute_template: shuffle/shuffle_compute_smoke.yaml # Does not exist yet
+      app_config: stress_tests/stress_tests_app_config.yaml
+      compute_template: stress_tests/smoke_test_compute.yaml
+
+    run:
+      timeout: 3600
+      script: python stress_tests/test_dead_actors.py --num-nodes=4 --num-parents=3 --num-children=3
 
 # Stress tests with placement groups
 - name: stress_test_placement_group
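Both smoke_test blocks above used to point at shuffle/shuffle_compute_smoke.yaml, a template the old comment admits did not exist; they now reference the real compute template added below plus a scaled-down run command. As a rough sketch of the intent, assuming the release runner merges a smoke_test block over its base test field by field (the merge function and the base timeout here are hypothetical illustrations, not part of this diff):

def apply_smoke_overrides(base, smoke):
    # Hypothetical merge: fields present in the smoke_test block replace
    # the matching fields of the base test definition.
    merged = dict(base)
    for key, value in smoke.items():
        if isinstance(value, dict) and isinstance(base.get(key), dict):
            merged[key] = {**base[key], **value}
        else:
            merged[key] = value
    return merged


base = {"run": {"timeout": 18000,  # illustrative full-scale timeout
                "script": "python stress_tests/test_many_tasks.py"}}
smoke = {"run": {"timeout": 3600,
                 "script": "python stress_tests/test_many_tasks.py "
                           "--num-nodes=4 --smoke-test"}}
print(apply_smoke_overrides(base, smoke)["run"]["timeout"])  # 3600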

New file: stress_tests/smoke_test_compute.yaml

@@ -0,0 +1,23 @@
+cloud_id: {{env["ANYSCALE_CLOUD_ID"]}}
+region: us-west-2
+
+max_workers: 4
+
+aws:
+  BlockDeviceMappings:
+    - DeviceName: /dev/sda1
+      Ebs:
+        VolumeSize: 500
+
+head_node_type:
+  name: head_node
+  instance_type: m4.4xlarge
+
+worker_node_types:
+  - name: worker_node
+    instance_type: m4.large
+    min_workers: 4
+    max_workers: 4
+    use_spot: false
+    resources:
+      cpu: 2
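The numbers in this template line up with the scripts that consume it: --num-nodes=4 in the run commands matches max_workers: 4 here, and the per-worker resources: cpu: 2 matches the num_remote_cpus = 2 constant in test_many_tasks.py. A small worked example of what the smoke cluster implies, reusing the script's variable names:

num_remote_nodes = 4   # one per worker node; passed as --num-nodes=4
num_remote_cpus = 2    # each worker advertises resources: cpu: 2
total_num_remote_cpus = num_remote_nodes * num_remote_cpus
expected_nodes = num_remote_nodes + 1  # the scripts wait for workers + head
print(total_num_remote_cpus, expected_nodes)  # 8 5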

Changed file: stress_tests/test_dead_actors.py

@@ -1,5 +1,6 @@
 #!/usr/bin/env python
+import argparse
 import json
 import logging
 import numpy as np
@@ -14,13 +15,6 @@ logger = logging.getLogger(__name__)
 ray.init(address="auto")
 
-# These numbers need to correspond with the autoscaler config file.
-# The number of remote nodes in the autoscaler should upper bound
-# these because sometimes nodes fail to update.
-NUM_REMOTE_NODES = 100
-HEAD_NODE_CPUS = 2
-NUM_REMOTE_CPU = NUM_REMOTE_NODES * HEAD_NODE_CPUS
 
 @ray.remote
 class Child(object):
@@ -59,24 +53,38 @@ class Parent(object):
         ray.get([child.__ray_terminate__.remote() for child in self.children])
 
 
+def parse_script_args():
+    parser = argparse.ArgumentParser()
+    parser.add_argument("--num-nodes", type=int, default=100)
+    parser.add_argument("--num-parents", type=int, default=10)
+    parser.add_argument("--num-children", type=int, default=10)
+    parser.add_argument("--death-probability", type=float, default=0.95)
+    return parser.parse_known_args()
+
+
 if __name__ == "__main__":
+    args, unknown = parse_script_args()
     result = {"success": 0}
+    # These numbers need to correspond with the autoscaler config file.
+    # The number of remote nodes in the autoscaler should upper bound
+    # these because sometimes nodes fail to update.
+    num_remote_nodes = args.num_nodes
+    num_parents = args.num_parents
+    num_children = args.num_children
+    death_probability = args.death_probability
     try:
         # Wait until the expected number of nodes have joined the cluster.
         while True:
             num_nodes = len(ray.nodes())
             logger.info("Waiting for nodes {}/{}".format(
-                num_nodes, NUM_REMOTE_NODES + 1))
-            if num_nodes >= NUM_REMOTE_NODES + 1:
+                num_nodes, num_remote_nodes + 1))
+            if num_nodes >= num_remote_nodes + 1:
                 break
             time.sleep(5)
 
         logger.info("Nodes have all joined. There are %s resources.",
                     ray.cluster_resources())
 
-        num_parents = 10
-        num_children = 10
-        death_probability = 0.95
         parents = [
             Parent.remote(num_children, death_probability)
             for _ in range(num_parents)
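With the module-level constants gone, test_dead_actors.py takes its cluster shape from the command line (the smoke run passes --num-nodes=4 --num-parents=3 --num-children=3) and parses with parse_known_args() rather than parse_args(). A self-contained illustration of why that choice is forgiving here; --some-wrapper-flag is a made-up stand-in for any extra flag a harness might append:

import argparse

parser = argparse.ArgumentParser()
parser.add_argument("--num-parents", type=int, default=10)
# parse_known_args() splits recognized flags from leftovers instead of
# exiting with an error on the unknown flag.
args, unknown = parser.parse_known_args(
    ["--num-parents", "3", "--some-wrapper-flag"])
print(args.num_parents)  # 3
print(unknown)           # ['--some-wrapper-flag']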

Changed file: stress_tests/test_many_tasks.py

@@ -1,5 +1,6 @@
 #!/usr/bin/env python
+import argparse
 from collections import defaultdict
 import numpy as np
 import json
@@ -14,13 +15,6 @@ logger = logging.getLogger(__name__)
 ray.init(address="auto")
 
-# These numbers need to correspond with the autoscaler config file.
-# The number of remote nodes in the autoscaler should upper bound
-# these because sometimes nodes fail to update.
-NUM_REMOTE_NODES = 100
-HEAD_NODE_CPUS = 2
-NUM_REMOTE_CPUS = NUM_REMOTE_NODES * HEAD_NODE_CPUS
 
 @ray.remote(num_cpus=1)
 def f(size, *xs):
@@ -34,28 +28,40 @@ class Actor(object):
 
 # Stage 0: Submit a bunch of small tasks with large returns.
-def stage0():
+def stage0(smoke=False):
+    num_tasks = 1000
+    size = 1000000
+
+    if smoke:
+        num_tasks //= 25
+        size //= 25
+
     stage_0_iterations = []
     start_time = time.time()
     logger.info("Submitting many tasks with large returns.")
     for i in range(10):
         iteration_start = time.time()
         logger.info("Iteration %s", i)
-        ray.get([f.remote(1000000) for _ in range(1000)])
+        ray.get([f.remote(size) for _ in range(num_tasks)])
         stage_0_iterations.append(time.time() - iteration_start)
     return time.time() - start_time
 
 
 # Stage 1: Launch a bunch of tasks.
-def stage1():
+def stage1(smoke=False):
+    num_tasks = 100000
+
+    if smoke:
+        num_tasks //= 25
+
     stage_1_iterations = []
     start_time = time.time()
     logger.info("Submitting many tasks.")
     for i in range(10):
         iteration_start = time.time()
         logger.info("Iteration %s", i)
-        ray.get([f.remote(0) for _ in range(100000)])
+        ray.get([f.remote(0) for _ in range(num_tasks)])
         stage_1_iterations.append(time.time() - iteration_start)
     return time.time() - start_time, stage_1_iterations
@@ -64,7 +70,12 @@ def stage1():
 
 # Launch a bunch of tasks, each with a bunch of dependencies. TODO(rkn): This
 # test starts to fail if we increase the number of tasks in the inner loop from
 # 500 to 1000. (approximately 615 seconds)
-def stage2():
+def stage2(smoke=False):
+    num_tasks_per_iteration = 500
+
+    if smoke:
+        num_tasks_per_iteration //= 25
+
     stage_2_iterations = []
     start_time = time.time()
     logger.info("Submitting tasks with many dependencies.")
@@ -74,7 +85,9 @@ def stage2():
     for i in range(20):
         logger.info("Iteration %s. Cumulative time %s seconds", i,
                     time.time() - start_time)
-        x_ids = [f.remote(0, *x_ids) for _ in range(500)]
+        x_ids = [
+            f.remote(0, *x_ids) for _ in range(num_tasks_per_iteration)
+        ]
         ray.get(x_ids)
         stage_2_iterations.append(time.time() - iteration_start)
     logger.info("Finished after %s seconds.", time.time() - start_time)
@@ -82,18 +95,23 @@ def stage2():
 
 # Create a bunch of actors.
-def stage3():
+def stage3(total_num_remote_cpus, smoke=False):
     start_time = time.time()
-    logger.info("Creating %s actors.", NUM_REMOTE_CPUS)
-    actors = [Actor.remote() for _ in range(NUM_REMOTE_CPUS)]
+    logger.info("Creating %s actors.", total_num_remote_cpus)
+    actors = [Actor.remote() for _ in range(total_num_remote_cpus)]
     stage_3_creation_time = time.time() - start_time
     logger.info("Finished stage 3 actor creation in %s seconds.",
                 stage_3_creation_time)
 
+    num_tasks = 1000
+
+    if smoke:
+        num_tasks //= 25
+
     # Submit a bunch of small tasks to each actor. (approximately 1070 seconds)
     start_time = time.time()
     logger.info("Submitting many small actor tasks.")
-    for N in [1000, 100000]:
+    for N in [num_tasks, num_tasks * 100]:
         x_ids = []
         for i in range(N):
             x_ids = [a.method.remote(0) for a in actors]
@@ -143,25 +161,41 @@ def stage4():
     return avg_spread
 
 
+def parse_script_args():
+    parser = argparse.ArgumentParser()
+    parser.add_argument("--num-nodes", type=int, default=100)
+    parser.add_argument("--smoke-test", action="store_true")
+    return parser.parse_known_args()
+
+
 if __name__ == "__main__":
+    args, unknown = parse_script_args()
+    # These numbers need to correspond with the autoscaler config file.
+    # The number of remote nodes in the autoscaler should upper bound
+    # these because sometimes nodes fail to update.
+    num_remote_nodes = args.num_nodes
+    num_remote_cpus = 2
+    total_num_remote_cpus = num_remote_nodes * num_remote_cpus
+    is_smoke_test = args.smoke_test
+
     result = {"success": 0}
     try:
         # Wait until the expected number of nodes have joined the cluster.
         while True:
             num_nodes = len(ray.nodes())
             logger.info("Waiting for nodes {}/{}".format(
-                num_nodes, NUM_REMOTE_NODES + 1))
-            if num_nodes >= NUM_REMOTE_NODES + 1:
+                num_nodes, num_remote_nodes + 1))
+            if num_nodes >= num_remote_nodes + 1:
                 break
             time.sleep(5)
 
         logger.info("Nodes have all joined. There are %s resources.",
                     ray.cluster_resources())
 
-        stage_0_time = stage0()
+        stage_0_time = stage0(smoke=is_smoke_test)
         logger.info("Finished stage 0 after %s seconds.", stage_0_time)
         result["stage_0_time"] = stage_0_time
 
-        stage_1_time, stage_1_iterations = stage1()
+        stage_1_time, stage_1_iterations = stage1(smoke=is_smoke_test)
         logger.info("Finished stage 1 after %s seconds.", stage_1_time)
         result["stage_1_time"] = stage_1_time
         result["stage_1_avg_iteration_time"] = sum(stage_1_iterations) / len(
@@ -169,7 +203,7 @@ if __name__ == "__main__":
         result["stage_1_max_iteration_time"] = max(stage_1_iterations)
         result["stage_1_min_iteration_time"] = min(stage_1_iterations)
 
-        stage_2_time, stage_2_iterations = stage2()
+        stage_2_time, stage_2_iterations = stage2(smoke=is_smoke_test)
         logger.info("Finished stage 2 after %s seconds.", stage_2_time)
         result["stage_2_time"] = stage_2_time
         result["stage_2_avg_iteration_time"] = sum(stage_2_iterations) / len(
@@ -177,7 +211,8 @@ if __name__ == "__main__":
         result["stage_2_max_iteration_time"] = max(stage_2_iterations)
         result["stage_2_min_iteration_time"] = min(stage_2_iterations)
 
-        stage_3_time, stage_3_creation_time = stage3()
+        stage_3_time, stage_3_creation_time = stage3(
+            total_num_remote_cpus, smoke=is_smoke_test)
         logger.info("Finished stage 3 in %s seconds.", stage_3_time)
         result["stage_3_creation_time"] = stage_3_creation_time
         result["stage_3_time"] = stage_3_time