From 4d91f516ca59abd2f4880f36e17630ad3effee11 Mon Sep 17 00:00:00 2001
From: Yi Cheng <74173148+iycheng@users.noreply.github.com>
Date: Mon, 29 Aug 2022 23:55:36 +0000
Subject: [PATCH] [nightly] Add serve ha chaos test into nightly test. (#27413)

This PR adds a serve HA chaos test. The flow of the test is:
1. Check the KubeRay setup.
2. Start the Ray service.
3. Warm up the cluster.
4. Start killing nodes.
5. Get the stats and make sure they are good.
---
 release/k8s_tests/app_config.yaml          |   2 +
 release/k8s_tests/compute_tpl.yaml         |   5 -
 release/k8s_tests/locust-run.yaml          |  19 +
 release/k8s_tests/locustfile.py            |  33 ++
 .../ray_v1alpha1_rayservice_template.yaml  | 309 +++++++++++++++
 release/k8s_tests/run_gcs_ft_on_k8s.py     | 375 +++++++++++++++++-
 release/k8s_tests/solution.py              |  50 +++
 release/release_tests.yaml                 |   2 +-
 8 files changed, 782 insertions(+), 13 deletions(-)
 create mode 100644 release/k8s_tests/locust-run.yaml
 create mode 100644 release/k8s_tests/locustfile.py
 create mode 100644 release/k8s_tests/ray_v1alpha1_rayservice_template.yaml
 create mode 100644 release/k8s_tests/solution.py

diff --git a/release/k8s_tests/app_config.yaml b/release/k8s_tests/app_config.yaml
index 1283b8a19..d97693f09 100644
--- a/release/k8s_tests/app_config.yaml
+++ b/release/k8s_tests/app_config.yaml
@@ -20,3 +20,5 @@ post_build_cmds:
   - curl https://packages.cloud.google.com/apt/doc/apt-key.gpg | sudo apt-key --keyring /usr/share/keyrings/cloud.google.gpg add -
   - sudo apt-get update && sudo apt-get install google-cloud-cli
   - sudo apt-get install google-cloud-sdk-gke-gcloud-auth-plugin
+  - curl https://raw.githubusercontent.com/helm/helm/main/scripts/get-helm-3 | bash
+  - helm repo add deliveryhero https://charts.deliveryhero.io/
diff --git a/release/k8s_tests/compute_tpl.yaml b/release/k8s_tests/compute_tpl.yaml
index db0dd6ea4..1045aa894 100644
--- a/release/k8s_tests/compute_tpl.yaml
+++ b/release/k8s_tests/compute_tpl.yaml
@@ -22,8 +22,3 @@ aws:
           Value: '{{env["ANYSCALE_USER"]}}'
         - Key: anyscale-expiration
           Value: '{{env["EXPIRATION_2D"]}}'
-
-  BlockDeviceMappings:
-    - DeviceName: /dev/sda1
-      Ebs:
-        VolumeSize: 200
\ No newline at end of file
diff --git a/release/k8s_tests/locust-run.yaml b/release/k8s_tests/locust-run.yaml
new file mode 100644
index 000000000..a9c9e1505
--- /dev/null
+++ b/release/k8s_tests/locust-run.yaml
@@ -0,0 +1,19 @@
+loadtest:
+  name: serve_ha_{cluster_id}
+  locust_host: "http://service-{cluster_id}-serve-svc:8000"
+  locust_locustfile_configmap: locusttest-{cluster_id}
+  loadtest.locust_lib_configmap: locusttest-{cluster_id}
+  locust_locustfile: locustfile.py
+worker:
+  replicas: 4
+master:
+  args:
+    - "--spawn-rate=20"
+    - "--users={users}"
+    - "--autostart"
+    - "--run-time={duration}s"
+    - "--csv=test_result_{cluster_id}"
+    - "--print-stats"
+    - "--csv-full-history"
+    - "--reset-stats"
+    - "--enable-rebalancing"
diff --git a/release/k8s_tests/locustfile.py b/release/k8s_tests/locustfile.py
new file mode 100644
index 000000000..003b86535
--- /dev/null
+++ b/release/k8s_tests/locustfile.py
@@ -0,0 +1,33 @@
+import locust
+from locust import task, HttpUser
+import logging
+from requests.adapters import HTTPAdapter
+from requests.packages.urllib3.util.retry import Retry
+
+logger = logging.getLogger(__name__)
+
+
+class TimeoutHTTPAdapter(HTTPAdapter):
+    def __init__(self, timeout, *args, **kwargs):
+        self.timeout = timeout
+        super().__init__(*args, **kwargs)
+
+    def send(self, request, **kwargs):
+        timeout = kwargs.get("timeout")
+        if timeout is None:
+            kwargs["timeout"] = self.timeout
+        return super().send(request, **kwargs)
+
+
+class ServeTest(HttpUser):
+    wait_time = locust.wait_time.constant_throughput(10)
+
+    def on_start(self):
+        retries = Retry(
+            total=3, backoff_factor=0.5, status_forcelist=[429, 500, 502, 503, 504]
+        )
+        self.client.mount("http://", TimeoutHTTPAdapter(max_retries=retries, timeout=1))
+
+    @task
+    def index(self):
+        self.client.get("/?val=3")
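
The load level implied by this Locust file is easy to estimate: constant_throughput(10) caps each simulated user at 10 requests per second, so the aggregate target is roughly users x 10. A minimal sketch of that arithmetic, using the user count that run_gcs_ft_on_k8s.py later passes to Locust (nothing below is part of the patch itself):

# Rough throughput expectation for the Locust settings above (sketch only).
USERS = 60          # mirrors `users = 60` in run_gcs_ft_on_k8s.py's main()
PER_USER_RPS = 10   # mirrors locust.wait_time.constant_throughput(10)

target_qps = USERS * PER_USER_RPS   # ~600 requests/second if nothing throttles
qps_floor = target_qps * 0.8        # the test later asserts qps > users * 10 * 0.8
print(f"target ~{target_qps} rps, failure threshold at {qps_floor:.0f} rps")
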
diff --git a/release/k8s_tests/ray_v1alpha1_rayservice_template.yaml b/release/k8s_tests/ray_v1alpha1_rayservice_template.yaml
new file mode 100644
index 000000000..1a252cfa2
--- /dev/null
+++ b/release/k8s_tests/ray_v1alpha1_rayservice_template.yaml
@@ -0,0 +1,309 @@
+kind: ConfigMap
+apiVersion: v1
+metadata:
+  name: locusttest-{cluster_id}
+data:
+  locustfile.py: |
+{locustfile}
+---
+kind: ConfigMap
+apiVersion: v1
+metadata:
+  name: script-{cluster_id}
+data:
+  solution.py: |
+{solution}
+---
+kind: ConfigMap
+apiVersion: v1
+metadata:
+  name: redis-config-{cluster_id}
+  labels:
+    app: redis
+data:
+  redis.conf: |-
+    dir /data
+    port 6379
+    bind 0.0.0.0
+    appendonly yes
+    protected-mode no
+    requirepass 5241590000000000
+    pidfile /data/redis-6379.pid
+---
+apiVersion: v1
+kind: Service
+metadata:
+  name: redis-{cluster_id}
+  labels:
+    app: redis
+spec:
+  type: ClusterIP
+  ports:
+    - name: redis
+      port: 6379
+  selector:
+    app: redis
+---
+apiVersion: apps/v1
+kind: Deployment
+metadata:
+  name: redis-{cluster_id}
+  labels:
+    app: redis
+spec:
+  replicas: 1
+  selector:
+    matchLabels:
+      app: redis
+  template:
+    metadata:
+      labels:
+        app: redis
+    spec:
+      containers:
+        - name: redis
+          image: redis:5.0.8
+          command:
+            - "sh"
+            - "-c"
+            - "redis-server /usr/local/etc/redis/redis.conf"
+          ports:
+            - containerPort: 6379
+          volumeMounts:
+            - name: config
+              mountPath: /usr/local/etc/redis/redis.conf
+              subPath: redis.conf
+      volumes:
+        - name: config
+          configMap:
+            name: redis-config-{cluster_id}
+---
+apiVersion: ray.io/v1alpha1
+kind: RayService
+metadata:
+  name: service-{cluster_id}
+  annotations:
+    ray.io/ft-enabled: "true"
+spec:
+  serviceUnhealthySecondThreshold: 300
+  deploymentUnhealthySecondThreshold: 300
+  serveConfig:
+    importPath: solution.serve_entrypoint
+    runtimeEnv: |
+      env_vars:
+        PYTHONPATH: "/tmp/testing/"
+    deployments:
+      - name: a
+        numReplicas: 6
+        rayActorOptions:
+          numCpus: 1
+      - name: b
+        numReplicas: 6
+        rayActorOptions:
+          numCpus: 1
+      - name: c
+        numReplicas: 6
+        rayActorOptions:
+          numCpus: 1
+      - name: d
+        numReplicas: 6
+        rayActorOptions:
+          numCpus: 1
+      - name: e
+        numReplicas: 6
+        rayActorOptions:
+          numCpus: 1
+      - name: DAGDriver
+        numReplicas: 6
+        rayActorOptions:
+          numCpus: 1
+  rayClusterConfig:
+    rayVersion: '3.0.0.dev0' # should match the Ray version in the image of the containers
+    ######################headGroupSpecs#################################
+    # head group template and specs, (perhaps 'group' is not needed in the name)
+    headGroupSpec:
+      # Kubernetes Service Type, valid values are 'ClusterIP', 'NodePort' and 'LoadBalancer'
+      serviceType: ClusterIP
+      # the pod replicas in this group typed head (assuming there could be more than 1 in the future)
+      replicas: 1
+      # logical group name, for this called head-group, also can be functional
+      # pod type head or worker
+      # rayNodeType: head # Not needed since it is under the headgroup
+      # the following params are used to complete the ray start: ray start --head --block --redis-port=6379 ...
+      rayStartParams:
+        port: '6379' # should match container port named gcs-server
+        object-store-memory: '100000000'
+        dashboard-host: '0.0.0.0'
+        num-cpus: '0' # can be auto-completed from the limits
+        node-ip-address: $MY_POD_IP # auto-completed as the head pod IP
+        block: 'true'
+      #pod template
+      template:
+        metadata:
+          labels:
+            rayCluster: cluster-{cluster_id}
+            rayNodeType: head # will be injected if missing, must be head or worker
+            groupName: headgroup # will be injected if missing
+          # annotations for pod
+          annotations:
+            key: value
+        spec:
+          volumes:
+            - name: script
+              configMap:
+                name: script-{cluster_id}
+            - name: log-volume
+              emptyDir: {{}}
+          containers:
+            - name: ray-head
+              image: {ray_image}
+              imagePullPolicy: Always
+              env:
+                - name: MY_POD_IP
+                  valueFrom:
+                    fieldRef:
+                      fieldPath: status.podIP
+                - name: RAY_REDIS_ADDRESS
+                  value: redis-{cluster_id}:6379
+                - name: RAY_gcs_rpc_server_reconnect_timeout_s
+                  value: "600"
+                - name: RAY_num_heartbeats_timeout
+                  value: "120"
+                - name: RAY_gcs_failover_worker_reconnect_timeout
+                  value: "600"
+              resources:
+                limits:
+                  cpu: 2
+                requests:
+                  cpu: 2
+              ports:
+                - containerPort: 6379
+                  name: gcs-server
+                - containerPort: 8265 # Ray dashboard
+                  name: dashboard
+                - containerPort: 10001
+                  name: client
+                - containerPort: 8000
+                  name: serve
+              volumeMounts:
+                - name: script
+                  mountPath: /tmp/testing/solution.py
+                  subPath: solution.py
+                - mountPath: /tmp/ray/
+                  name: log-volume
+    workerGroupSpecs:
+      # the pod replicas in this group typed worker
+      - replicas: 12
+        minReplicas: 12
+        maxReplicas: 12
+        # logical group name, for this called small-group, also can be functional
+        groupName: small-group
+        # if worker pods need to be added, we can simply increment the replicas
+        # if worker pods need to be removed, we decrement the replicas, and populate the podsToDelete list
+        # the operator will remove pods from the list until the number of replicas is satisfied
+        # when a pod is confirmed to be deleted, its name will be removed from the list below
+        #scaleStrategy:
+        #  workersToDelete:
+        #  - raycluster-complete-worker-small-group-bdtwh
+        #  - raycluster-complete-worker-small-group-hv457
+        #  - raycluster-complete-worker-small-group-k8tj7
+        # the following params are used to complete the ray start: ray start --block --node-ip-address= ...
+        rayStartParams:
+          node-ip-address: $MY_POD_IP
+          block: 'true'
+          num-cpus: '4' # can be auto-completed from the limits
+        #pod template
+        template:
+          metadata:
+            labels:
+              key: value
+              rayCluster: cluster-{cluster_id}
+            # annotations for pod
+            annotations:
+              key: value
+          spec:
+            initContainers:
+              # the env var $RAY_IP is set by the operator if missing, with the value of the head service name
+              - name: init-myservice
+                image: busybox:1.28
+                command: ['sh', '-c', "until nslookup $RAY_IP.$(cat /var/run/secrets/kubernetes.io/serviceaccount/namespace).svc.cluster.local; do echo waiting for myservice; sleep 2; done"]
+            volumes:
+              - name: script
+                configMap:
+                  name: script-{cluster_id}
+              - name: log-volume
+                emptyDir: {{}}
+            containers:
+              - name: machine-learning # must consist of lower case alphanumeric characters or '-', and must start and end with an alphanumeric character (e.g. 'my-name', or '123-abc')
+                image: {ray_image}
+                imagePullPolicy: Always
+                livenessProbe:
+                  initialDelaySeconds: 30
+                  periodSeconds: 5
+                  timeoutSeconds: 10
+                readinessProbe:
+                  initialDelaySeconds: 30
+                  periodSeconds: 5
+                  timeoutSeconds: 10
+
+                # environment variables to set in the container. Optional.
+                # Refer to https://kubernetes.io/docs/tasks/inject-data-application/define-environment-variable-container/
+                env:
+                  - name: RAY_DISABLE_DOCKER_CPU_WARNING
+                    value: "1"
+                  - name: TYPE
+                    value: "worker"
+                  - name: CPU_REQUEST
+                    valueFrom:
+                      resourceFieldRef:
+                        containerName: machine-learning
+                        resource: requests.cpu
+                  - name: CPU_LIMITS
+                    valueFrom:
+                      resourceFieldRef:
+                        containerName: machine-learning
+                        resource: limits.cpu
+                  - name: MEMORY_LIMITS
+                    valueFrom:
+                      resourceFieldRef:
+                        containerName: machine-learning
+                        resource: limits.memory
+                  - name: MEMORY_REQUESTS
+                    valueFrom:
+                      resourceFieldRef:
+                        containerName: machine-learning
+                        resource: requests.memory
+                  - name: MY_POD_NAME
+                    valueFrom:
+                      fieldRef:
+                        fieldPath: metadata.name
+                  - name: MY_POD_IP
+                    valueFrom:
+                      fieldRef:
+                        fieldPath: status.podIP
+                  - name: RAY_gcs_rpc_server_reconnect_timeout_s
+                    value: "600"
+                  - name: RAY_num_heartbeats_timeout
+                    value: "120"
+                  - name: RAY_gcs_failover_worker_reconnect_timeout
+                    value: "600"
+                  - name: RAY_gcs_server_request_timeout_seconds
+                    value: "5"
+                ports:
+                  - containerPort: 80
+                    name: client
+                lifecycle:
+                  preStop:
+                    exec:
+                      command: ["/bin/sh","-c","ray stop"]
+                resources:
+                  limits:
+                    cpu: "2"
+                  requests:
+                    cpu: "2"
+                volumeMounts:
+                  - name: script
+                    mountPath: /tmp/testing/solution.py
+                    subPath: solution.py
+                  - mountPath: /tmp/ray/
+                    name: log-volume
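
The template above is rendered with Python's str.format, which is why literal YAML braces are doubled (emptyDir: {{}}) and why the embedded scripts must be re-indented to sit under the ConfigMap block scalars. A rough sketch of that rendering step, mirroring what start_rayservice() below does; the cluster id, image tag, and the 4-space indent are illustrative assumptions, not values fixed by the patch:

# Sketch: fill in the template's placeholders the same way start_rayservice() does.
import pathlib

cluster_id = "abc123"                 # normally the first segment of a uuid4
ray_image = "rayproject/ray:nightly"  # normally derived from the Ray version/commit

def indent_for_configmap(path, spaces=4):
    # Each embedded script line needs a fixed indent to stay inside `locustfile.py: |`.
    text = pathlib.Path(path).read_text()
    return "\n".join(" " * spaces + line for line in text.splitlines())

rendered = pathlib.Path("ray_v1alpha1_rayservice_template.yaml").read_text().format(
    cluster_id=cluster_id,
    ray_image=ray_image,
    solution=indent_for_configmap("solution.py"),
    locustfile=indent_for_configmap("locustfile.py"),
)
# After .format(), the escaped `emptyDir: {{}}` becomes a literal `emptyDir: {}`.
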
pathlib.Path("./locustfile.py").read_text().splitlines() + ] + ) + template = ( + pathlib.Path("ray_v1alpha1_rayservice_template.yaml") + .read_text() + .format( + cluster_id=cluster_id, + ray_image=ray_image, + solution=solution, + locustfile=locustfile, + ) + ) + + print("=== YamlFile ===") + print(template) + tmp_yaml = pathlib.Path("/tmp/ray_v1alpha1_rayservice.yaml") + tmp_yaml.write_text(template) + + print("=== Get Pods from ray-system ===") + print( + subprocess.check_output( + ["kubectl", "get", "pods", "--namespace", "ray-system", "--no-headers"] + ).decode() + ) + + # step-2: create the cluter + print(f"Creating cluster with id: {cluster_id}") + print(subprocess.check_output(["kubectl", "create", "-f", str(tmp_yaml)]).decode()) + + # step-3: make sure the ray cluster is up + w = watch.Watch() + start_time = time.time() + head_pod_name = None + for event in w.stream( + func=cli.list_namespaced_pod, + namespace="default", + label_selector=f"rayCluster={ray_cluster_name},ray.io/node-type=head", + timeout_seconds=60, + ): + if event["object"].status.phase == "Running": + assert event["object"].kind == "Pod" + head_pod_name = event["object"].metadata.name + end_time = time.time() + print(f"{cluster_id} started in {end_time-start_time} sec") + print(f"head pod {head_pod_name}") + break + assert head_pod_name is not None + # step-4: e2e check it's alive + cmd = """ +import requests +print(requests.get('http://localhost:8000/?val=123').text) +""" + while True: + try: + resp = ( + subprocess.check_output( + f'kubectl exec {head_pod_name} -- python -c "{cmd}"', shell=True + ) + .decode() + .strip() + ) + if resp == "375": + print("Service is up now!") + break + else: + print(f"Failed with msg {resp}") + except Exception as e: + print("Error", e) + time.sleep(2) + + +def start_port_forward(): + proc = None + proc = subprocess.Popen( + [ + "kubectl", + "port-forward", + f"svc/{ray_service_name}-serve-svc", + "8000:8000", + "--address=0.0.0.0", + ] + ) + + while True: + try: + resp = requests.get( + "http://localhost:8000/", + timeout=1, + params={ + "val": 10, + }, + ) + if resp.status_code == 200: + print("The ray service is ready!!!") + break + except requests.exceptions.Timeout: + pass + except requests.exceptions.ConnectionError: + pass + + print("Waiting for the proxy to be alive") + time.sleep(1) + + return proc + + +def warmup_cluster(num_reqs): + for _ in range(num_reqs): + resp = requests.get( + "http://localhost:8000/", + timeout=1, + params={ + "val": 10, + }, + ) + assert resp.status_code == 200 + + +def start_sending_traffics(duration, users): + print("=== Install locust by helm ===") + yaml_config = ( + pathlib.Path("locust-run.yaml") + .read_text() + .format(users=users, cluster_id=cluster_id, duration=int(duration)) + ) + print("=== Locust YAML ===") + print(yaml_config) + + pathlib.Path("/tmp/locust-run-config.yaml").write_text(yaml_config) + helm_install_logs = subprocess.check_output( + [ + "helm", + "install", + locust_id, + "deliveryhero/locust", + "-f", + "/tmp/locust-run-config.yaml", + ] + ) + print(helm_install_logs) + + proc = subprocess.Popen( + [ + "kubectl", + "port-forward", + f"svc/ray-locust-{cluster_id}", + "8080:8089", + "--address=0.0.0.0", + ] + ) + return proc + + +def dump_pods_actors(pod_name): + print( + subprocess.run( + f"kubectl exec {pod_name} -- ps -ef | grep ::", + shell=True, + capture_output=True, + ).stdout.decode() + ) + + +def kill_header(): + pods = cli.list_namespaced_pod( + "default", + 
label_selector=f"rayCluster={ray_cluster_name},ray.io/node-type=head", + ) + if pods.items[0].status.phase == "Running": + print(f"Killing header {pods.items[0].metadata.name}") + dump_pods_actors(pods.items[0].metadata.name) + cli.delete_namespaced_pod(pods.items[0].metadata.name, "default") + + +def kill_worker(): + pods = cli.list_namespaced_pod( + "default", + label_selector=f"rayCluster={ray_cluster_name},ray.io/node-type=worker", + ) + alive_pods = [ + (p.status.start_time, p.metadata.name) + for p in pods.items + if p.status.phase == "Running" + ] + # sorted(alive_pods) + # We kill the oldest nodes for now given the memory leak in serve. + # to_be_killed = alive_pods[-1][1] + + to_be_killed = random.choice(alive_pods)[1] + print(f"Killing worker {to_be_killed}") + dump_pods_actors(pods.items[0].metadata.name) + cli.delete_namespaced_pod(to_be_killed, "default") + + +def start_killing_nodes(duration, kill_interval, kill_head_every_n): + """Kill the nodes in ray cluster. + + duration: How long does we run the test (seconds) + kill_interval: The interval between two kills (seconds) + kill_head_every_n: For every n kills, we kill a head node + """ + + for kill_idx in range(1, int(duration / kill_interval)): + while True: + try: + # kill + if kill_idx % kill_head_every_n == 0: + kill_header() + else: + kill_worker() + break + except Exception as e: + from time import sleep + + print(f"Fail to kill node, retry in 5 seconds: {e}") + sleep(5) + + time.sleep(kill_interval) + + +def get_stats(): + labels = [ + f"app.kubernetes.io/instance=ray-locust-{cluster_id}", + "app.kubernetes.io/name=locust,component=master", + ] + pods = cli.list_namespaced_pod("default", label_selector=",".join(labels)) + assert len(pods.items) == 1 + pod_name = pods.items[0].metadata.name + subprocess.check_output( + [ + "kubectl", + "cp", + f"{pod_name}:/home/locust/test_result_{cluster_id}_stats_history.csv", + "./stats_history.csv", + ] + ) + data = [] + with open("stats_history.csv") as f: + import csv + + reader = csv.reader(f) + for d in reader: + data.append(d) + # The first 5mins is for warming up + offset = 300 + start_time = int(data[offset][0]) + end_time = int(data[-1][0]) + # 17 is the index for total requests + # 18 is the index for total failed requests + total = float(data[-1][17]) - float(data[offset][17]) + failures = float(data[-1][18]) - float(data[offset][18]) + + # Available, through put + return (total - failures) / total, total / (end_time - start_time) + + +def main(): + procs = [] + try: + check_kuberay_installed() + start_rayservice() + procs.append(start_port_forward()) + warmup_cluster(200) + users = 60 + duration = 5 * 60 * 60 + procs.append(start_sending_traffics(duration * 1.1, users)) + start_killing_nodes(duration, 60, 6) + rate, qps = get_stats() + + print("Result:", rate, qps) + assert rate > 0.9995 + assert qps > users * 10 * 0.8 + + except Exception as e: + print("Experiment failed") + raise e + finally: + print("=== Cleanup ===") + + subprocess.run( + ["kubectl", "delete", "-f", str(yaml_path)], + capture_output=True, + ) + + subprocess.run( + ["helm", "uninstall", locust_id], + capture_output=True, + ) + + print("Kill processes") + for p in procs: + p.kill() + + +if __name__ == "__main__": + try: + # Connect to ray so that the auto suspense + # will not start. + ray.init("auto") + except Exception: + # It doesnt' matter if it failed. 
+ pass + main() diff --git a/release/k8s_tests/solution.py b/release/k8s_tests/solution.py new file mode 100644 index 000000000..e323b1c22 --- /dev/null +++ b/release/k8s_tests/solution.py @@ -0,0 +1,50 @@ +from ray import serve +from ray.serve.drivers import DAGDriver +from ray.dag.input_node import InputNode + +""" +We are building a DAG like this: +A -> B ----> C + \-> D --/ + \-> E -/ +""" + + +@serve.deployment +def a(val: int): + return val + + +@serve.deployment +def b(val: int): + return val + 1 + + +@serve.deployment +def c(v1: int, v2: int, v3: int): + return sum([v1, v2, v3]) + + +@serve.deployment +def d(val): + return val + 2 + + +@serve.deployment +def e(val): + return val + 3 + + +with InputNode() as user_input: + oa = a.bind(user_input) + ob = b.bind(oa) + od = d.bind(oa) + oe = e.bind(oa) + oc = c.bind(ob, od, oe) + + +def input_adapter(val: int): + return val + + +serve_entrypoint = DAGDriver.bind(oc, http_adapter=input_adapter) diff --git a/release/release_tests.yaml b/release/release_tests.yaml index 87f1c0587..d7c38391b 100644 --- a/release/release_tests.yaml +++ b/release/release_tests.yaml @@ -4643,7 +4643,7 @@ cluster_compute: compute_tpl.yaml run: - timeout: 1800 + timeout: 28800 # 8h prepare: bash prepare.sh script: python run_gcs_ft_on_k8s.py type: sdk_command
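
The Serve DAG in solution.py collapses to simple arithmetic, which makes the hard-coded expectations in run_gcs_ft_on_k8s.py easy to verify: for an input v, deployment c returns (v + 1) + (v + 2) + (v + 3) = 3v + 6. A small pure-Python sketch of that check (no Serve required; the function name is illustrative):

# Mirror of the DAG's math: a feeds b, d, and e; c sums their outputs.
def dag_result(val: int) -> int:
    a = val
    return (a + 1) + (a + 2) + (a + 3)  # b + d + e, i.e. 3 * val + 6

assert dag_result(123) == 375  # start_rayservice() waits for "375" from /?val=123
print(dag_result(3), dag_result(10))  # 15 and 36 for the /?val=3 and /?val=10 requests
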