ray/release/k8s_tests/run_gcs_ft_on_k8s.py
Yi Cheng 4d91f516ca
[nightly] Add serve ha chaos test into nightly test. (#27413)
This PR adds a Serve HA test. The flow of the test is:

1. check that KubeRay is installed
2. start the Ray service
3. warm up the cluster
4. start killing nodes
5. get the stats and make sure they look good
2022-08-29 16:55:36 -07:00


import csv
import os
import pathlib
import random
import subprocess
import time
import uuid

import ray
import requests
from kubernetes import client, config, watch

# Global variables for the cluster information
cluster_id = str(uuid.uuid4()).split("-")[0]
ray_cluster_name = "cluster-" + cluster_id
ray_service_name = "service-" + cluster_id
locust_id = "ray-locust-" + cluster_id
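
# Pick the Ray image under test: an explicit RAY_IMAGE override, the matching
# release image, the nightly image, or the image built from the current commit.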
if os.environ.get("RAY_IMAGE") is not None:
    ray_image = os.environ.get("RAY_IMAGE")
elif ray.__version__ != "3.0.0.dev0":
    ray_image = f"rayproject/ray:{ray.__version__}"
elif ray.__commit__ == "{{RAY_COMMIT_SHA}}":
    ray_image = "rayproject/ray:nightly"
else:
    ray_image = f"rayproject/ray:{ray.__commit__[:6]}"

config.load_kube_config()
cli = client.CoreV1Api()
yaml_path = pathlib.Path("/tmp/ray_v1alpha1_rayservice.yaml")
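

# Check that the KubeRay operator is installed by applying its manifests and
# verifying that pods exist in the ray-system namespace.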
def check_kuberay_installed():
    # Make sure the ray namespace exists
    KUBERAY_VERSION = "v0.3.0"
    uri = (
        "github.com/ray-project/kuberay/manifests"
        f"/base?ref={KUBERAY_VERSION}&timeout=90s"
    )
    print(
        subprocess.check_output(
            [
                "kubectl",
                "apply",
                "-k",
                uri,
            ]
        ).decode()
    )
    pods = subprocess.check_output(
        ["kubectl", "get", "pods", "--namespace", "ray-system", "--no-headers"]
    ).decode()
    # At least one KubeRay operator pod should be listed.
    assert pods.strip() != ""
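

# Render the RayService YAML from the template, create it, wait for the head
# pod to be running, and then poll the Serve endpoint until it answers.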
def start_rayservice():
    # step-1: generate the yaml file
    print(f"Using ray image: {ray_image}")
    solution = "\n".join(
        [
            f" {line}"
            for line in pathlib.Path("./solution.py").read_text().splitlines()
        ]
    )
    locustfile = "\n".join(
        [
            f" {line}"
            for line in pathlib.Path("./locustfile.py").read_text().splitlines()
        ]
    )
    template = (
        pathlib.Path("ray_v1alpha1_rayservice_template.yaml")
        .read_text()
        .format(
            cluster_id=cluster_id,
            ray_image=ray_image,
            solution=solution,
            locustfile=locustfile,
        )
    )
    print("=== YamlFile ===")
    print(template)
    tmp_yaml = pathlib.Path("/tmp/ray_v1alpha1_rayservice.yaml")
    tmp_yaml.write_text(template)
    print("=== Get Pods from ray-system ===")
    print(
        subprocess.check_output(
            ["kubectl", "get", "pods", "--namespace", "ray-system", "--no-headers"]
        ).decode()
    )

    # step-2: create the cluster
    print(f"Creating cluster with id: {cluster_id}")
    print(subprocess.check_output(["kubectl", "create", "-f", str(tmp_yaml)]).decode())

    # step-3: make sure the ray cluster is up
    w = watch.Watch()
    start_time = time.time()
    head_pod_name = None
    for event in w.stream(
        func=cli.list_namespaced_pod,
        namespace="default",
        label_selector=f"rayCluster={ray_cluster_name},ray.io/node-type=head",
        timeout_seconds=60,
    ):
        if event["object"].status.phase == "Running":
            assert event["object"].kind == "Pod"
            head_pod_name = event["object"].metadata.name
            end_time = time.time()
            print(f"{cluster_id} started in {end_time - start_time} sec")
            print(f"head pod {head_pod_name}")
            break
    assert head_pod_name is not None

    # step-4: e2e check it's alive
    cmd = """
import requests
print(requests.get('http://localhost:8000/?val=123').text)
"""
    while True:
        try:
            resp = (
                subprocess.check_output(
                    f'kubectl exec {head_pod_name} -- python -c "{cmd}"', shell=True
                )
                .decode()
                .strip()
            )
            if resp == "375":
                # "375" is the response the deployed app is expected to
                # return for val=123.
                print("Service is up now!")
                break
            else:
                print(f"Failed with msg {resp}")
        except Exception as e:
            print("Error", e)
        time.sleep(2)
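

# Port-forward the Serve service to localhost:8000 and block until it responds.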
def start_port_forward():
    proc = subprocess.Popen(
        [
            "kubectl",
            "port-forward",
            f"svc/{ray_service_name}-serve-svc",
            "8000:8000",
            "--address=0.0.0.0",
        ]
    )
    while True:
        try:
            resp = requests.get(
                "http://localhost:8000/",
                timeout=1,
                params={
                    "val": 10,
                },
            )
            if resp.status_code == 200:
                print("The ray service is ready!!!")
                break
        except requests.exceptions.Timeout:
            pass
        except requests.exceptions.ConnectionError:
            pass
        print("Waiting for the proxy to be alive")
        time.sleep(1)
    return proc
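

# Warm up the cluster by sending a batch of requests before the chaos starts.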
def warmup_cluster(num_reqs):
    for _ in range(num_reqs):
        resp = requests.get(
            "http://localhost:8000/",
            timeout=1,
            params={
                "val": 10,
            },
        )
        assert resp.status_code == 200
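

# Install a Locust load generator via the deliveryhero/locust Helm chart and
# port-forward its web UI to localhost:8080.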
def start_sending_traffics(duration, users):
    print("=== Install locust by helm ===")
    yaml_config = (
        pathlib.Path("locust-run.yaml")
        .read_text()
        .format(users=users, cluster_id=cluster_id, duration=int(duration))
    )
    print("=== Locust YAML ===")
    print(yaml_config)
    pathlib.Path("/tmp/locust-run-config.yaml").write_text(yaml_config)
    helm_install_logs = subprocess.check_output(
        [
            "helm",
            "install",
            locust_id,
            "deliveryhero/locust",
            "-f",
            "/tmp/locust-run-config.yaml",
        ]
    )
    print(helm_install_logs.decode())

    proc = subprocess.Popen(
        [
            "kubectl",
            "port-forward",
            f"svc/ray-locust-{cluster_id}",
            "8080:8089",
            "--address=0.0.0.0",
        ]
    )
    return proc
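

# Print the Ray worker processes running in a pod (Ray sets process titles
# containing "::").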
def dump_pods_actors(pod_name):
    print(
        subprocess.run(
            f"kubectl exec {pod_name} -- ps -ef | grep ::",
            shell=True,
            capture_output=True,
        ).stdout.decode()
    )
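

# Kill the head pod; with GCS fault tolerance the service should survive it.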
def kill_header():
    pods = cli.list_namespaced_pod(
        "default",
        label_selector=f"rayCluster={ray_cluster_name},ray.io/node-type=head",
    )
    if pods.items[0].status.phase == "Running":
        print(f"Killing head pod {pods.items[0].metadata.name}")
        dump_pods_actors(pods.items[0].metadata.name)
        cli.delete_namespaced_pod(pods.items[0].metadata.name, "default")
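

# Kill one randomly chosen running worker pod.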
def kill_worker():
    pods = cli.list_namespaced_pod(
        "default",
        label_selector=f"rayCluster={ray_cluster_name},ray.io/node-type=worker",
    )
    alive_pods = [
        (p.status.start_time, p.metadata.name)
        for p in pods.items
        if p.status.phase == "Running"
    ]
    # Previously we killed the oldest node (given the memory leak in Serve):
    #   sorted(alive_pods)
    #   to_be_killed = alive_pods[-1][1]
    # Now we kill a random running worker instead.
    to_be_killed = random.choice(alive_pods)[1]
    print(f"Killing worker {to_be_killed}")
    dump_pods_actors(to_be_killed)
    cli.delete_namespaced_pod(to_be_killed, "default")
def start_killing_nodes(duration, kill_interval, kill_head_every_n):
    """Kill the nodes in the ray cluster.

    duration: How long to run the test (seconds)
    kill_interval: The interval between two kills (seconds)
    kill_head_every_n: For every n kills, kill a head node
    """
    for kill_idx in range(1, int(duration / kill_interval)):
        while True:
            try:
                # Kill a head node every `kill_head_every_n` kills,
                # otherwise kill a worker.
                if kill_idx % kill_head_every_n == 0:
                    kill_header()
                else:
                    kill_worker()
                break
            except Exception as e:
                print(f"Fail to kill node, retry in 5 seconds: {e}")
                time.sleep(5)
        time.sleep(kill_interval)
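

# Copy the Locust stats-history CSV out of the master pod and compute the
# availability and throughput over the run, excluding the warm-up window.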
def get_stats():
    labels = [
        f"app.kubernetes.io/instance=ray-locust-{cluster_id}",
        "app.kubernetes.io/name=locust,component=master",
    ]
    pods = cli.list_namespaced_pod("default", label_selector=",".join(labels))
    assert len(pods.items) == 1
    pod_name = pods.items[0].metadata.name
    subprocess.check_output(
        [
            "kubectl",
            "cp",
            f"{pod_name}:/home/locust/test_result_{cluster_id}_stats_history.csv",
            "./stats_history.csv",
        ]
    )
    data = []
    with open("stats_history.csv") as f:
        reader = csv.reader(f)
        for d in reader:
            data.append(d)
    # The first 5 minutes (300 rows) are the warm-up period; skip them.
    offset = 300
    start_time = int(data[offset][0])
    end_time = int(data[-1][0])
    # Column 17 is the cumulative request count and column 18 is the
    # cumulative failed request count.
    total = float(data[-1][17]) - float(data[offset][17])
    failures = float(data[-1][18]) - float(data[offset][18])
    # Return (availability, throughput).
    return (total - failures) / total, total / (end_time - start_time)
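

# End-to-end flow: verify KubeRay, start the RayService, warm it up, run
# Locust traffic while killing nodes, then assert on availability and QPS.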
def main():
    procs = []
    try:
        check_kuberay_installed()
        start_rayservice()
        procs.append(start_port_forward())
        warmup_cluster(200)
        users = 60
        duration = 5 * 60 * 60
        procs.append(start_sending_traffics(duration * 1.1, users))

        start_killing_nodes(duration, 60, 6)

        rate, qps = get_stats()
        print("Result:", rate, qps)
        # Expect >99.95% availability and QPS above 80% of users * 10.
        assert rate > 0.9995
        assert qps > users * 10 * 0.8
    except Exception:
        print("Experiment failed")
        raise
    finally:
        print("=== Cleanup ===")
        subprocess.run(
            ["kubectl", "delete", "-f", str(yaml_path)],
            capture_output=True,
        )
        subprocess.run(
            ["helm", "uninstall", locust_id],
            capture_output=True,
        )

        print("Kill processes")
        for p in procs:
            p.kill()

if __name__ == "__main__":
    try:
        # Connect to Ray so that cluster auto-suspension does not kick in.
        ray.init("auto")
    except Exception:
        # It doesn't matter if this fails.
        pass
    main()