ray/release/long_running_tests/workloads/serve_failure.py
mwtian 97f7e3d0e6
[e2e] do not terminate in serve_failure smoke test (#21925)
When the script terminates, it will also terminate its cluster including dashboard, which will prevent subsequent job submissions. Other long running e2e tests do not terminate in smoke test mode, so make `serve_failure` behave the same.
2022-01-27 15:36:46 -08:00

159 lines
4.6 KiB
Python

import json
import os
import random
import string
import time
import requests
import ray
from ray import serve
from ray.cluster_utils import Cluster
# Global variables / constants appear only right after imports.
# Ray serve deployment setup constants
NUM_REPLICAS = 7
MAX_BATCH_SIZE = 16
# Cluster setup constants
NUM_REDIS_SHARDS = 1
REDIS_MAX_MEMORY = 10**8
OBJECT_STORE_MEMORY = 10**8
NUM_NODES = 4
# RandomTest setup constants
CPUS_PER_NODE = 10
RAY_UNIT_TEST = "RAY_UNIT_TEST" in os.environ
def update_progress(result):
"""
Write test result json to /tmp/, which will be read from
anyscale product runs in each releaser test
"""
result["last_update"] = time.time()
test_output_json = os.environ.get("TEST_OUTPUT_JSON",
"/tmp/release_test_output.json")
with open(test_output_json, "wt") as f:
json.dump(result, f)
cluster = Cluster()
for i in range(NUM_NODES):
cluster.add_node(
redis_port=6379 if i == 0 else None,
num_redis_shards=NUM_REDIS_SHARDS if i == 0 else None,
num_cpus=16,
num_gpus=0,
resources={str(i): 2},
object_store_memory=OBJECT_STORE_MEMORY,
redis_max_memory=REDIS_MAX_MEMORY,
dashboard_host="0.0.0.0",
)
ray.init(
namespace="serve_failure_test",
address=cluster.address,
dashboard_host="0.0.0.0",
log_to_driver=True,
)
serve.start(detached=True)
@ray.remote
class RandomKiller:
def __init__(self, kill_period_s=1):
self.kill_period_s = kill_period_s
def _get_all_serve_actors(self):
controller = serve.api._get_global_client()._controller
routers = list(ray.get(controller.get_http_proxies.remote()).values())
all_handles = routers + [controller]
worker_handle_dict = ray.get(controller._all_running_replicas.remote())
for _, replica_info_list in worker_handle_dict.items():
for replica_info in replica_info_list:
all_handles.append(replica_info.actor_handle)
return all_handles
def run(self):
while True:
chosen = random.choice(self._get_all_serve_actors())
print(f"Killing {chosen}")
ray.kill(chosen, no_restart=False)
time.sleep(self.kill_period_s)
class RandomTest:
def __init__(self, max_deployments=1):
self.max_deployments = max_deployments
self.weighted_actions = [
(self.create_deployment, 1),
(self.verify_deployment, 4),
]
self.deployments = []
for _ in range(max_deployments):
self.create_deployment()
def create_deployment(self):
if len(self.deployments) == self.max_deployments:
deployment_to_delete = self.deployments.pop()
serve.get_deployment(deployment_to_delete).delete()
new_name = "".join(
[random.choice(string.ascii_letters) for _ in range(10)])
@serve.deployment(name=new_name)
def handler(self, *args):
return new_name
handler.deploy()
self.deployments.append(new_name)
def verify_deployment(self):
deployment = random.choice(self.deployments)
for _ in range(100):
try:
r = requests.get("http://127.0.0.1:8000/" + deployment)
assert r.text == deployment
except Exception:
print("Request to {} failed.".format(deployment))
time.sleep(0.01)
def run(self):
iteration = 0
start_time = time.time()
previous_time = start_time
while True:
for _ in range(20):
actions, weights = zip(*self.weighted_actions)
action_chosen = random.choices(actions, weights=weights)[0]
print(f"Executing {action_chosen}")
action_chosen()
new_time = time.time()
print("Iteration {}:\n"
" - Iteration time: {}.\n"
" - Absolute time: {}.\n"
" - Total elapsed time: {}.".format(
iteration, new_time - previous_time, new_time,
new_time - start_time))
update_progress({
"iteration": iteration,
"iteration_time": new_time - previous_time,
"absolute_time": new_time,
"elapsed_time": new_time - start_time,
})
previous_time = new_time
iteration += 1
if RAY_UNIT_TEST:
break
tester = RandomTest(max_deployments=NUM_NODES * CPUS_PER_NODE)
random_killer = RandomKiller.remote()
random_killer.run.remote()
tester.run()