[Nightly test] Chaos test fixture (#20277)

This PR mostly implements a "fixture" for the nightly chaos tests: a setup script deploys a detached node-killer actor during the release test's prepare step, and the workload driver later retrieves and starts it. Note that the current fixture implementation is not that great; we can probably improve it in the future after refactoring e2e.py.
SangBin Cho 2021-11-25 10:13:29 +09:00 committed by GitHub
parent 5ce79d0a46
commit cd7a32f1a5
5 changed files with 66 additions and 18 deletions
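
For orientation before the diffs: the fixture works in two steps. Below is a minimal sketch assembled from the changes in this commit; the actor name `node_killer` and the namespace `release_test_namespace` come straight from this PR.

```python
# Step 1: the release test's `prepare` step deploys a detached node killer
# without starting it (see setup_chaos.py at the end of this diff):
#   python setup_chaos.py --no-start

# Step 2: the workload driver looks up the detached actor by name and
# starts the killing loop once it is ready to tolerate failures.
import ray

ray.init(address="auto")
node_killer = ray.get_actor("node_killer", namespace="release_test_namespace")
node_killer.run.remote()  # kill a node every --node-kill-interval seconds
```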

python/ray/_private/test_utils.py

@@ -930,7 +930,10 @@ def teardown_tls(key_filepath, cert_filepath, temp_dir):
     del os.environ["RAY_TLS_CA_CERT"]


-def get_and_run_node_killer(node_kill_interval_s):
+def get_and_run_node_killer(node_kill_interval_s,
+                            namespace=None,
+                            lifetime=None,
+                            no_start=False):
     assert ray.is_initialized(), (
         "The API is only available when Ray is initialized.")
@@ -999,12 +1002,17 @@ def get_and_run_node_killer(node_kill_interval_s):
     head_node_ip = ray.worker.global_worker.node_ip_address
     head_node_id = ray.worker.global_worker.current_node_id.hex()
     # Schedule the actor on the current node.
-    node_killer = NodeKillerActor.options(resources={
-        f"node:{head_node_ip}": 0.001
-    }).remote(
-        head_node_id, node_kill_interval_s=node_kill_interval_s)
+    node_killer = NodeKillerActor.options(
+        resources={
+            f"node:{head_node_ip}": 0.001
+        },
+        namespace=namespace,
+        name="node_killer",
+        lifetime=lifetime).remote(
+            head_node_id, node_kill_interval_s=node_kill_interval_s)
     print("Waiting for node killer actor to be ready...")
     ray.get(node_killer.ready.remote())
     print("Node killer actor is ready now.")
-    node_killer.run.remote()
+    if not no_start:
+        node_killer.run.remote()
     return node_killer
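
The new keyword arguments make the helper usable as a deploy-only fixture. A hedged sketch of a call using the new options, mirroring how setup_chaos.py (new file below) invokes it:

```python
from ray._private.test_utils import get_and_run_node_killer

# Deploy the killer as a named, detached actor but do not start it yet
# (no_start=True); a driver in the same namespace can fetch it later with
# ray.get_actor("node_killer", namespace="release_test_namespace").
node_killer = get_and_run_node_killer(
    node_kill_interval_s=60,
    namespace="release_test_namespace",
    lifetime="detached",
    no_start=True)
```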

python/ray/tests/test_chaos.py

@@ -31,11 +31,11 @@ def assert_no_system_failure(p, timeout):
         "worker_node_types": {
             "cpu_node": {
                 "resources": {
-                    "CPU": 8,
+                    "CPU": 4,
                 },
                 "node_config": {},
                 "min_workers": 0,
-                "max_workers": 4,
+                "max_workers": 3,
             },
         },
     }],
@@ -58,7 +58,7 @@ def test_chaos_task_retry(ray_start_chaos_cluster):
         return ray.get(task.remote())

     # 50MB of return values.
-    TOTAL_TASKS = 300
+    TOTAL_TASKS = 100

     pb = ProgressBar("Chaos test sanity check", TOTAL_TASKS)
     results = [invoke_nested_task.remote() for _ in range(TOTAL_TASKS)]
@@ -82,7 +82,7 @@ def test_chaos_task_retry(ray_start_chaos_cluster):
         "worker_node_types": {
             "cpu_node": {
                 "resources": {
-                    "CPU": 8,
+                    "CPU": 4,
                 },
                 "node_config": {},
                 "min_workers": 0,
@@ -104,7 +104,7 @@ def test_chaos_actor_retry(ray_start_chaos_cluster):
         def get(self):
             return self.letter_dict

-    NUM_CPUS = 32
+    NUM_CPUS = 16
     TOTAL_TASKS = 300

     pb = ProgressBar("Chaos test sanity check", TOTAL_TASKS * NUM_CPUS)

release/nightly_tests/nightly_tests.yaml

@@ -10,7 +10,7 @@
   run:
     timeout: 3600
-    prepare: python wait_cluster.py 10 600
+    prepare: python wait_cluster.py 10 600; python setup_chaos.py --no-start
     script: python chaos_test/test_chaos_basic.py --workload=tasks

   stable: false
@@ -21,7 +21,7 @@
   run:
     timeout: 3600
-    prepare: python wait_cluster.py 10 600
+    prepare: python wait_cluster.py 10 600; python setup_chaos.py --no-start
     script: python chaos_test/test_chaos_basic.py --workload=actors

   stable: false

release/nightly_tests/chaos_test/test_chaos_basic.py

@@ -10,8 +10,7 @@ import numpy as np

 import ray
 from ray.data.impl.progress_bar import ProgressBar
-from ray._private.test_utils import (monitor_memory_usage, wait_for_condition,
-                                     get_and_run_node_killer)
+from ray._private.test_utils import (monitor_memory_usage, wait_for_condition)


 def run_task_workload(total_num_cpus, smoke):
@@ -33,7 +32,7 @@ def run_task_workload(total_num_cpus, smoke):
         time.sleep(0.8)
         return ray.get(task.remote())

-    multiplier = 25
+    multiplier = 75
     # For smoke mode, run less number of tasks
     if smoke:
         multiplier = 1
@@ -198,9 +197,10 @@ def main():
     # Step 3
     print("Running with failures")
-    node_killer = get_and_run_node_killer(
-        node_kill_interval_s=args.node_kill_interval)
     start = time.time()
+    node_killer = ray.get_actor(
+        "node_killer", namespace="release_test_namespace")
+    node_killer.run.remote()
     workload(total_num_cpus, args.smoke)
     print(f"Runtime when there are many failures: {time.time() - start}")
     print(f"Total node failures: "
@@ -213,6 +213,8 @@ def main():
     # Report the result.
     ray.get(monitor_actor.stop_run.remote())
+    print("Total number of killed nodes: "
+          f"{ray.get(node_killer.get_total_killed_nodes.remote())}")
     with open(os.environ["TEST_OUTPUT_JSON"], "w") as f:
         f.write(
             json.dumps({

release/nightly_tests/setup_chaos.py (new file)

@@ -0,0 +1,38 @@
+import argparse
+
+import ray
+from ray._private.test_utils import get_and_run_node_killer
+
+
+def parse_script_args():
+    parser = argparse.ArgumentParser()
+    parser.add_argument("--node-kill-interval", type=int, default=60)
+    parser.add_argument(
+        "--no-start",
+        action="store_true",
+        default=False,
+        help=("If set, node killer won't be starting to kill nodes when "
+              "the script is done. Driver needs to manually "
+              "obtain the node killer handle and invoke run method to "
+              "start killing nodes. If not set, as soon as "
+              "the script is done, nodes will be killed every "
+              "--node-kill-interval seconds."))
+    return parser.parse_known_args()
+
+
+def main():
+    """Start the chaos testing.
+
+    Currently chaos testing only covers random node failures.
+    """
+    args, _ = parse_script_args()
+    ray.init(address="auto")
+    get_and_run_node_killer(
+        args.node_kill_interval,
+        namespace="release_test_namespace",
+        lifetime="detached",
+        no_start=args.no_start)
+    print("Successfully deployed a node killer.")
+
+
+main()
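
Taken together with the driver changes above, end-to-end usage looks roughly like this. A sketch only: the flag, actor name, namespace, and method names are all from this PR, while the surrounding driver code is abbreviated.

```python
# After `python setup_chaos.py --no-start` has run in the prepare step,
# any driver in the same namespace can fetch the detached killer, start
# it, and read back the kill count for the final report.
import ray

ray.init(address="auto")
killer = ray.get_actor("node_killer", namespace="release_test_namespace")
killer.run.remote()  # start injecting node failures
# ... chaos workload runs here ...
print("Total number of killed nodes: "
      f"{ray.get(killer.get_total_killed_nodes.remote())}")
```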