import json
import os
import re
import subprocess
import time
from subprocess import PIPE

import requests

import ray
from ray import serve
from ray.cluster_utils import Cluster

# Global constants are defined only here, directly after the imports.

# Ray Serve deployment setup constants
NUM_REPLICAS = 7
MAX_BATCH_SIZE = 16

# Cluster setup constants
NUM_REDIS_SHARDS = 1
REDIS_MAX_MEMORY = 10**8
OBJECT_STORE_MEMORY = 10**8
NUM_NODES = 4

# wrk setup constants (might want to make these configurable?)
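# Presumably sized at 75% of peak capacity: the replicas can hold
# NUM_REPLICAS * MAX_BATCH_SIZE = 112 requests in flight, and
# 7 * 16 * 0.75 = 84 connections keeps wrk just below that.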
NUM_CONNECTIONS = int(NUM_REPLICAS * MAX_BATCH_SIZE * 0.75)
NUM_THREADS = 2
# Run each cycle for 5 minutes so results are appended and printed often
# enough for quick status polling as well as time-series plotting.
TIME_PER_CYCLE = "5m"


def update_progress(result):
    """
    Write the test result JSON to /tmp/ (or to the path in TEST_OUTPUT_JSON),
    where it is read by Anyscale product runs in each release test.
    """
    result["last_update"] = time.time()
    test_output_json = os.environ.get("TEST_OUTPUT_JSON",
                                      "/tmp/release_test_output.json")
    with open(test_output_json, "wt") as f:
        json.dump(result, f)
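
# For example, update_progress({"requests/sec": "1410.71"}) writes
# {"requests/sec": "1410.71", "last_update": <current epoch time>} to the file.
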
cluster = Cluster()
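# Simulate a NUM_NODES-machine cluster within this single process; each
# add_node() call below stands in for one machine.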
for i in range(NUM_NODES):
    cluster.add_node(
        # Only the head node (i == 0) hosts the Redis server and its shards.
        redis_port=6379 if i == 0 else None,
        num_redis_shards=NUM_REDIS_SHARDS if i == 0 else None,
        num_cpus=8,
        num_gpus=0,
        resources={str(i): 2},
        object_store_memory=OBJECT_STORE_MEMORY,
        redis_max_memory=REDIS_MAX_MEMORY,
        dashboard_host="0.0.0.0",
    )

ray.init(address=cluster.address, dashboard_host="0.0.0.0")
serve.start()


@serve.deployment(name="echo", num_replicas=NUM_REPLICAS)
class Echo:
    @serve.batch(max_batch_size=MAX_BATCH_SIZE)
    async def handle_batch(self, requests):
        # Simulate 10ms of work for each batch.
        time.sleep(0.01)
        return ["hi" for _ in range(len(requests))]

    async def __call__(self, request):
        return await self.handle_batch(request)


Echo.deploy()
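
# Concurrent requests to /echo each enter __call__, and @serve.batch
# coalesces up to MAX_BATCH_SIZE pending handle_batch calls into one batch.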

print("Warming up")
for _ in range(5):
    resp = requests.get("http://127.0.0.1:8000/echo").text
    print(resp)
    time.sleep(0.5)

print("Started load testing with the following config: ")
|
|
print(f"num_connections: {NUM_CONNECTIONS}")
|
|
print(f"num_threads: {NUM_THREADS}")
|
|
print(f"time_per_cycle: {TIME_PER_CYCLE}")
|
|
|
|
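# wrk flags: -c sets the number of open connections, -t the number of worker
# threads, and -d the duration of each run.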
while True:
    proc = subprocess.Popen(
        [
            "wrk",
            "-c",
            str(NUM_CONNECTIONS),
            "-t",
            str(NUM_THREADS),
            "-d",
            TIME_PER_CYCLE,
            "http://127.0.0.1:8000/echo",
        ],
        stdout=PIPE,
        stderr=PIPE,
    )
    # communicate() waits for wrk to exit while draining stdout and stderr,
    # avoiding the deadlock that wait() can hit once a PIPE buffer fills up.
    out, err = proc.communicate()

    # Sample wrk stdout:
    #
    # Running 10s test @ http://127.0.0.1:8000/echo
    #   2 threads and 84 connections
    #   Thread Stats   Avg      Stdev     Max   +/- Stdev
    #     Latency    59.33ms   13.51ms 113.83ms   64.20%
    #     Req/Sec   709.16     61.73   848.00     78.50%
    #   14133 requests in 10.02s, 2.08MB read
    # Requests/sec:   1410.71
    # Transfer/sec:    212.16KB
    metrics_dict = {}
    for line in out.decode().splitlines():
        parsed = re.split(r"\s+", line.strip())
        if parsed[0] == "Latency":
            metrics_dict["latency_avg"] = parsed[1]
            metrics_dict["latency_stdev"] = parsed[2]
            metrics_dict["latency_max"] = parsed[3]
            metrics_dict["latency_+/-_stdev"] = parsed[4]
        elif parsed[0] == "Req/Sec":
            metrics_dict["req/sec_avg"] = parsed[1]
            metrics_dict["req/sec_stdev"] = parsed[2]
            metrics_dict["req/sec_max"] = parsed[3]
            metrics_dict["req/sec_+/-_stdev"] = parsed[4]
        elif parsed[0] == "Requests/sec:":
            metrics_dict["requests/sec"] = parsed[1]
        elif parsed[0] == "Transfer/sec:":
            metrics_dict["transfer/sec"] = parsed[1]
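    # Parsing the sample output above would yield, e.g.:
    #   {"latency_avg": "59.33ms", "latency_stdev": "13.51ms", ...,
    #    "requests/sec": "1410.71", "transfer/sec": "212.16KB"}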

    print(out.decode())
    print(err.decode())

    update_progress(metrics_dict)