[Serve] Serve Autoscaling Release tests (#21208)

shrekris-anyscale 2022-01-21 12:08:25 -08:00 committed by GitHub
parent 2cd3045b16
commit 75b3080834
14 changed files with 928 additions and 14 deletions

View file

@@ -234,6 +234,7 @@
commands:
- cleanup() { if [ "${BUILDKITE_PULL_REQUEST}" = "false" ]; then ./ci/travis/upload_build_info.sh; fi }; trap cleanup EXIT
- TORCH_VERSION=1.6 ./ci/travis/install-dependencies.sh
- 'git clone https://github.com/wg/wrk.git /tmp/wrk && pushd /tmp/wrk && make -j && sudo cp wrk /usr/local/bin && popd'
- ./dashboard/tests/run_ui_tests.sh
- bazel test --config=ci $(./scripts/bazel_export_options) python/ray/dashboard/...
- bazel test --config=ci $(./scripts/bazel_export_options)
@@ -342,6 +343,7 @@
commands:
- cleanup() { if [ "${BUILDKITE_PULL_REQUEST}" = "false" ]; then ./ci/travis/upload_build_info.sh; fi }; trap cleanup EXIT
- TORCH_VERSION=1.6 ./ci/travis/install-dependencies.sh
- 'git clone https://github.com/wg/wrk.git /tmp/wrk && pushd /tmp/wrk && make -j && sudo cp wrk /usr/local/bin && popd'
- ./dashboard/tests/run_ui_tests.sh
- bazel test --config=ci $(./scripts/bazel_export_options)
--test_env=RAY_gcs_grpc_based_pubsub=1
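The wrk benchmarking tool installed by the new CI command above is what the Serve release workloads shell out to for load generation. As a rough, hypothetical illustration (not part of this change), a small Python wrapper could drive wrk against a locally running Serve endpoint; the URL, duration, and connection counts below are placeholders, and it assumes wrk is on PATH.

import subprocess

def run_wrk(url: str, duration: str = "15s", connections: int = 100,
            threads: int = 4) -> str:
    # -t threads, -c open connections, -d duration; --latency prints the
    # latency distribution that the release tests aggregate.
    result = subprocess.run(
        ["wrk", "-t", str(threads), "-c", str(connections), "-d", duration,
         "--latency", url],
        capture_output=True, text=True, check=True)
    return result.stdout

if __name__ == "__main__":
    print(run_wrk("http://127.0.0.1:8000/echo"))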

View file

@@ -273,7 +273,7 @@ py_test(
py_test(
name = "test_autoscaling_policy",
size = "small",
size = "medium",
srcs = serve_tests_srcs,
tags = ["exclusive", "team:serve"],
deps = [":serve_lib"],

View file

@@ -112,6 +112,10 @@ class ServeController:
return self.deployment_state_manager._deployment_states[
deployment_name]._replicas
def _stop_one_running_replica_for_testing(self, deployment_name):
self.deployment_state_manager._deployment_states[
deployment_name]._stop_one_running_replica_for_testing()
async def wait_for_goal(self, goal_id: GoalId) -> Optional[Exception]:
return await self.goal_manager.wait_for_goal(goal_id)
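These hooks are testing-only helpers meant to be called on the controller actor handle from a test process. A minimal, hypothetical sketch of how a test could exercise them (the serve_instance fixture and the deployment name "A" mirror the tests added later in this change; the ReplicaState import path is an assumption):

import ray
from ray.serve.common import ReplicaState  # import path assumed for this sketch

controller = serve_instance._controller
replicas = ray.get(
    controller._dump_replica_states_for_testing.remote("A"))
print(len(replicas.get([ReplicaState.RUNNING])), "replicas running")

# Force one replica into STOPPING to exercise the controller's recovery path.
ray.get(controller._stop_one_running_replica_for_testing.remote("A"))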

View file

@@ -1207,6 +1207,14 @@ class DeploymentState:
return status == GoalStatus.SUCCESSFULLY_DELETED
def _stop_one_running_replica_for_testing(self):
running_replicas = self._replicas.pop(states=[ReplicaState.RUNNING])
replica_to_stop = running_replicas.pop()
replica_to_stop.stop(graceful=False)
self._replicas.add(ReplicaState.STOPPING, replica_to_stop)
for replica in running_replicas:
self._replicas.add(ReplicaState.RUNNING, replica)
class DeploymentStateManager:
"""Manages all state for deployments in the system.

View file

@@ -1,5 +1,11 @@
import sys
import time
import pytest
from unittest import mock
from typing import List, Iterable
from ray._private.test_utils import SignalActor, wait_for_condition
from ray.serve.autoscaling_policy import (BasicAutoscalingPolicy,
calculate_desired_num_replicas)
@@ -84,15 +90,60 @@ class TestCalculateDesiredNumReplicas:
assert 5 <= desired_num_replicas <= 8 # 10 + 0.5 * (2.5 - 10) = 6.25
def get_num_running_replicas(controller: ServeController,
deployment: Deployment) -> int:
""" Get the amount of replicas currently running for given deployment """
def get_running_replicas(controller: ServeController,
deployment: Deployment) -> List:
""" Get the replicas currently running for given deployment """
replicas = ray.get(
controller._dump_replica_states_for_testing.remote(deployment.name))
running_replicas = replicas.get([ReplicaState.RUNNING])
return running_replicas
def get_running_replica_tags(controller: ServeController,
deployment: Deployment) -> List:
""" Get the replica tags of running replicas for given deployment """
running_replicas = get_running_replicas(controller, deployment)
return [replica.replica_tag for replica in running_replicas]
def get_num_running_replicas(controller: ServeController,
deployment: Deployment) -> int:
""" Get the amount of replicas currently running for given deployment """
running_replicas = get_running_replicas(controller, deployment)
return len(running_replicas)
def assert_no_replicas_deprovisioned(replica_tags_1: Iterable[str],
replica_tags_2: Iterable[str]) -> None:
"""
Checks whether any replica tags from replica_tags_1 are absent from
replica_tags_2. Assumes that this indicates replicas were de-provisioned.
replica_tags_1: Replica tags of running replicas at the first timestep
replica_tags_2: Replica tags of running replicas at the second timestep
"""
replica_tags_1, replica_tags_2 = set(replica_tags_1), set(replica_tags_2)
num_matching_replicas = len(replica_tags_1.intersection(replica_tags_2))
print(f"{num_matching_replicas} replica(s) stayed provisioned between "
f"both deployments. All {len(replica_tags_1)} replica(s) were "
f"expected to stay provisioned. "
f"{len(replica_tags_1) - num_matching_replicas} replica(s) were "
f"de-provisioned.")
assert len(replica_tags_1) == num_matching_replicas
def test_assert_no_replicas_deprovisioned():
replica_tags_1 = ["a", "b", "c"]
replica_tags_2 = ["a", "b", "c", "d", "e"]
assert_no_replicas_deprovisioned(replica_tags_1, replica_tags_2)
with pytest.raises(AssertionError):
assert_no_replicas_deprovisioned(replica_tags_2, replica_tags_1)
def get_deployment_start_time(controller: ServeController,
deployment: Deployment):
""" Return start time for given deployment """
@@ -289,6 +340,384 @@ def test_replicas_delayed_startup():
assert new_num_replicas == 123
@pytest.mark.parametrize("delay_s", [30.0, 0.0])
def test_fluctuating_ongoing_requests(delay_s):
"""
Simulates a workload that switches between too many and too few
ongoing requests.
"""
config = AutoscalingConfig(
min_replicas=1,
max_replicas=10,
target_num_ongoing_requests_per_replica=50,
upscale_delay_s=delay_s,
downscale_delay_s=delay_s)
policy = BasicAutoscalingPolicy(config)
if delay_s > 0:
wait_periods = int(delay_s / CONTROL_LOOP_PERIOD_S)
assert wait_periods > 1
underload_requests, overload_requests = [20, 20], [100]
trials = 1000
new_num_replicas = None
for trial in range(trials):
if trial % 2 == 0:
new_num_replicas = policy.get_decision_num_replicas(
current_num_ongoing_requests=overload_requests,
curr_target_num_replicas=1)
if delay_s > 0:
assert new_num_replicas == 1, trial
else:
assert new_num_replicas == 2, trial
else:
new_num_replicas = policy.get_decision_num_replicas(
current_num_ongoing_requests=underload_requests,
curr_target_num_replicas=2)
if delay_s > 0:
assert new_num_replicas == 2, trial
else:
assert new_num_replicas == 1, trial
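The delay_s > 0 branch works because upscale_delay_s and downscale_delay_s are converted into a number of consecutive control-loop periods that must all agree before the decision changes, so a single overloaded or underloaded reading has no effect. A rough back-of-the-envelope, assuming CONTROL_LOOP_PERIOD_S is around 0.1 s (the real constant lives in Serve's source, so treat this as illustrative only):

CONTROL_LOOP_PERIOD_S = 0.1  # assumed value for this sketch
delay_s = 30.0
wait_periods = int(delay_s / CONTROL_LOOP_PERIOD_S)
print(wait_periods)  # ~300 consecutive readings before scaling kicks in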
@pytest.mark.parametrize(
"ongoing_requests",
[[7, 1, 8, 4], [8, 1, 8, 4], [6, 1, 8, 4], [0, 1, 8, 4]])
def test_imbalanced_replicas(ongoing_requests):
config = AutoscalingConfig(
min_replicas=1,
max_replicas=10,
target_num_ongoing_requests_per_replica=5,
upscale_delay_s=0.0,
downscale_delay_s=0.0)
policy = BasicAutoscalingPolicy(config)
# Check that as long as the average number of ongoing requests equals
# the target_num_ongoing_requests_per_replica, the number of replicas
# stays the same
if (sum(ongoing_requests) / len(ongoing_requests) ==
config.target_num_ongoing_requests_per_replica):
new_num_replicas = policy.get_decision_num_replicas(
current_num_ongoing_requests=ongoing_requests,
curr_target_num_replicas=4)
assert new_num_replicas == 4
# Check downscaling behavior when average number of requests
# is lower than target_num_ongoing_requests_per_replica
elif (sum(ongoing_requests) / len(ongoing_requests) <
config.target_num_ongoing_requests_per_replica):
new_num_replicas = policy.get_decision_num_replicas(
current_num_ongoing_requests=ongoing_requests,
curr_target_num_replicas=4)
if config.target_num_ongoing_requests_per_replica - sum(
ongoing_requests) / len(ongoing_requests) <= 1:
# Autoscaling uses a ceiling operator, which means a slightly low
# current_num_ongoing_requests value is insufficient to downscale
assert new_num_replicas == 4
else:
assert new_num_replicas == 3
# Check upscaling behavior when average number of requests
# is higher than target_num_ongoing_requests_per_replica
else:
new_num_replicas = policy.get_decision_num_replicas(
current_num_ongoing_requests=ongoing_requests,
curr_target_num_replicas=4)
assert new_num_replicas == 5
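The three branches above follow from how the desired replica count is derived: roughly the current target multiplied by the ratio of average ongoing requests to the per-replica target, rounded up. A simplified model (ignoring min/max bounds, smoothing, and delays, so not the exact library implementation) reproduces the expected values for these parametrized inputs:

import math

def naive_desired_replicas(ongoing_requests, curr_target_num_replicas,
                           target_per_replica):
    # Simplified sketch of the scaling math, for intuition only.
    avg = sum(ongoing_requests) / len(ongoing_requests)
    return math.ceil(curr_target_num_replicas * avg / target_per_replica)

print(naive_desired_replicas([7, 1, 8, 4], 4, 5))  # avg == target -> stays 4
print(naive_desired_replicas([6, 1, 8, 4], 4, 5))  # slightly low -> still 4
print(naive_desired_replicas([0, 1, 8, 4], 4, 5))  # clearly low -> 3
print(naive_desired_replicas([8, 1, 8, 4], 4, 5))  # slightly high -> 5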
@pytest.mark.parametrize("ongoing_requests",
[[20, 0, 0, 0], [100, 0, 0, 0], [10, 0, 0, 0]])
def test_single_replica_receives_all_requests(ongoing_requests):
target_requests = 5
config = AutoscalingConfig(
min_replicas=1,
max_replicas=50,
target_num_ongoing_requests_per_replica=target_requests,
upscale_delay_s=0.0,
downscale_delay_s=0.0)
policy = BasicAutoscalingPolicy(config)
new_num_replicas = policy.get_decision_num_replicas(
current_num_ongoing_requests=ongoing_requests,
curr_target_num_replicas=4)
assert new_num_replicas == sum(ongoing_requests) / target_requests
@pytest.mark.skipif(sys.platform == "win32", reason="Failing on Windows.")
def test_e2e_bursty(serve_instance):
"""
Sends 100 requests in bursts. Uses delays for smooth provisioning.
"""
signal = SignalActor.remote()
@serve.deployment(
_autoscaling_config={
"metrics_interval_s": 0.1,
"min_replicas": 1,
"max_replicas": 2,
"look_back_period_s": 0.2,
"downscale_delay_s": 0.2,
"upscale_delay_s": 0.2
},
# We will send over a lot of queries. This will make sure replicas are
# killed quickly during cleanup.
_graceful_shutdown_timeout_s=1,
max_concurrent_queries=1000,
version="v1")
class A:
def __call__(self):
ray.get(signal.wait.remote())
A.deploy()
controller = serve_instance._controller
start_time = get_deployment_start_time(controller, A)
handle = A.get_handle()
[handle.remote() for _ in range(100)]
wait_for_condition(lambda: get_num_running_replicas(controller, A) >= 2)
num_replicas = get_num_running_replicas(controller, A)
signal.send.remote()
# Execute a bursty workload that issues 100 requests every 0.05 seconds.
# The SignalActor allows all requests in a burst to be queued before they
# are all executed, which drives the number of in-flight requests per
# replica above the target. Then the send method will bring it back to 0.
# This bursty behavior should be smoothed out by the delay parameters.
for _ in range(5):
time.sleep(0.05)
assert get_num_running_replicas(controller, A) == num_replicas
[handle.remote() for _ in range(100)]
signal.send.remote()
# As the queue is drained, we should scale back down.
wait_for_condition(lambda: get_num_running_replicas(controller, A) <= 1)
# Make sure start time did not change for the deployment
assert get_deployment_start_time(controller, A) == start_time
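The bursty pattern above hinges on SignalActor: every __call__ blocks on signal.wait until signal.send fires, so a whole burst queues up before any request completes. A minimal round trip with the same utility outside of Serve (SignalActor is an internal test helper, not public API) looks roughly like this:

import ray
from ray._private.test_utils import SignalActor

ray.init(ignore_reinit_error=True)
signal = SignalActor.remote()
waiter = signal.wait.remote()   # blocks until send() is called
signal.send.remote()            # release every pending waiter
ray.get(waiter)
signal.send.remote(clear=True)  # re-arm the signal for the next burst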
@pytest.mark.skipif(sys.platform == "win32", reason="Failing on Windows.")
def test_e2e_intermediate_downscaling(serve_instance):
"""
Scales up, then down, and up again.
"""
signal = SignalActor.remote()
@serve.deployment(
_autoscaling_config={
"metrics_interval_s": 0.1,
"min_replicas": 1,
"max_replicas": 20,
"look_back_period_s": 0.2,
"downscale_delay_s": 0.2,
"upscale_delay_s": 0.2
},
# We will send over a lot of queries. This will make sure replicas are
# killed quickly during cleanup.
_graceful_shutdown_timeout_s=1,
max_concurrent_queries=1000,
version="v1")
class A:
def __call__(self):
ray.get(signal.wait.remote())
A.deploy()
controller = serve_instance._controller
start_time = get_deployment_start_time(controller, A)
handle = A.get_handle()
[handle.remote() for _ in range(50)]
wait_for_condition(
lambda: get_num_running_replicas(controller, A) >= 20, timeout=30)
signal.send.remote()
wait_for_condition(
lambda: get_num_running_replicas(controller, A) <= 1, timeout=30)
signal.send.remote(clear=True)
[handle.remote() for _ in range(50)]
wait_for_condition(
lambda: get_num_running_replicas(controller, A) >= 20, timeout=30)
signal.send.remote()
# As the queue is drained, we should scale back down.
wait_for_condition(
lambda: get_num_running_replicas(controller, A) <= 1, timeout=30)
# Make sure start time did not change for the deployment
assert get_deployment_start_time(controller, A) == start_time
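Both e2e tests lean heavily on wait_for_condition, which simply polls a zero-argument predicate until it returns True or the timeout elapses (failing the test if it never does). A trivial standalone example of its semantics:

import time
from ray._private.test_utils import wait_for_condition

start = time.time()
# Passes once at least one second has elapsed; raises after 5 s otherwise.
wait_for_condition(lambda: time.time() - start > 1, timeout=5)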
@pytest.mark.skipif(sys.platform == "win32", reason="Failing on Windows.")
@pytest.mark.skip(reason="Currently failing with undefined behavior")
def test_e2e_update_autoscaling_deployment(serve_instance):
# See https://github.com/ray-project/ray/issues/21017 for details
signal = SignalActor.remote()
@serve.deployment(
_autoscaling_config={
"metrics_interval_s": 0.1,
"min_replicas": 1,
"max_replicas": 10,
"look_back_period_s": 0.2,
"downscale_delay_s": 0.2,
"upscale_delay_s": 0.2
},
# We will send over a lot of queries. This will make sure replicas are
# killed quickly during cleanup.
_graceful_shutdown_timeout_s=1,
max_concurrent_queries=1000,
version="v1")
class A:
def __call__(self):
ray.get(signal.wait.remote())
A.deploy()
print("Deployed A with min_replicas 1 and max_replicas 10.")
controller = serve_instance._controller
start_time = get_deployment_start_time(controller, A)
assert get_num_running_replicas(controller, A) == 1
handle = A.get_handle()
[handle.remote() for _ in range(400)]
print("Issued 400 requests.")
wait_for_condition(lambda: get_num_running_replicas(controller, A) >= 10)
print("Scaled to 10 replicas.")
first_deployment_replicas = get_running_replica_tags(controller, A)
assert get_num_running_replicas(controller, A) < 20
[handle.remote() for _ in range(458)]
time.sleep(3)
print("Issued 458 requests. Request routing in-progress.")
A.options(
_autoscaling_config={
"metrics_interval_s": 0.1,
"min_replicas": 2,
"max_replicas": 20,
"look_back_period_s": 0.2,
"downscale_delay_s": 0.2,
"upscale_delay_s": 0.2
},
version="v1").deploy()
print("Redeployed A.")
wait_for_condition(lambda: get_num_running_replicas(controller, A) >= 20)
print("Scaled up to 20 requests.")
second_deployment_replicas = get_running_replica_tags(controller, A)
# Confirm that none of the original replicas were de-provisioned
assert_no_replicas_deprovisioned(first_deployment_replicas,
second_deployment_replicas)
signal.send.remote()
# As the queue is drained, we should scale back down.
wait_for_condition(lambda: get_num_running_replicas(controller, A) <= 2)
assert get_num_running_replicas(controller, A) > 1
# Make sure start time did not change for the deployment
assert get_deployment_start_time(controller, A) == start_time
@pytest.mark.skipif(sys.platform == "win32", reason="Failing on Windows.")
def test_e2e_raise_min_replicas(serve_instance):
signal = SignalActor.remote()
@serve.deployment(
_autoscaling_config={
"metrics_interval_s": 0.1,
"min_replicas": 1,
"max_replicas": 10,
"look_back_period_s": 0.2,
"downscale_delay_s": 0.2,
"upscale_delay_s": 0.2
},
# We will send over a lot of queries. This will make sure replicas are
# killed quickly during cleanup.
_graceful_shutdown_timeout_s=1,
max_concurrent_queries=1000,
version="v1")
class A:
def __call__(self):
ray.get(signal.wait.remote())
A.deploy()
print("Deployed A.")
controller = serve_instance._controller
start_time = get_deployment_start_time(controller, A)
handle = A.get_handle()
[handle.remote() for _ in range(1)]
print("Issued one request.")
time.sleep(2)
assert get_num_running_replicas(controller, A) == 1
print("Stayed at 1 replica.")
first_deployment_replicas = get_running_replica_tags(controller, A)
A.options(
_autoscaling_config={
"metrics_interval_s": 0.1,
"min_replicas": 2,
"max_replicas": 10,
"look_back_period_s": 0.2,
"downscale_delay_s": 0.2,
"upscale_delay_s": 0.2
},
_graceful_shutdown_timeout_s=1,
max_concurrent_queries=1000,
version="v1").deploy()
print("Redeployed A with min_replicas set to 2.")
wait_for_condition(lambda: get_num_running_replicas(controller, A) >= 2)
time.sleep(5)
# Confirm that autoscaler doesn't scale above 2 even after waiting
assert get_num_running_replicas(controller, A) == 2
print("Autoscaled to 2 without issuing any new requests.")
second_deployment_replicas = get_running_replica_tags(controller, A)
# Confirm that none of the original replicas were de-provisioned
assert_no_replicas_deprovisioned(first_deployment_replicas,
second_deployment_replicas)
signal.send.remote()
time.sleep(1)
print("Completed request.")
# As the queue is drained, we should scale back down.
wait_for_condition(lambda: get_num_running_replicas(controller, A) <= 2)
assert get_num_running_replicas(controller, A) > 1
print("Stayed at 2 replicas.")
# Make sure start time did not change for the deployment
assert get_deployment_start_time(controller, A) == start_time
if __name__ == "__main__":
import sys
import pytest

View file

@@ -117,6 +117,8 @@ SERVE_NIGHTLY_TESTS = {
"~/ray/release/serve_tests/serve_tests.yaml": [
"single_deployment_1k_noop_replica",
"multi_deployment_1k_noop_replica",
"autoscaling_single_deployment",
"autoscaling_multi_deployment",
"serve_micro_benchmark",
"serve_micro_benchmark_k8s",
"serve_cluster_fault_tolerance",

View file

@@ -19,3 +19,39 @@ py_test(
"//python/ray/serve:serve_lib",
],
)
py_test(
name = "autoscaling_single_deployment_smoke_test",
size = "medium",
srcs = test_srcs,
env = {
"IS_SMOKE_TEST": "1",
},
main = "autoscaling_single_deployment.py",
tags = [
"exclusive",
"team:serve",
],
deps = [
"//:ray_lib",
"//python/ray/serve:serve_lib",
],
)
py_test(
name = "autoscaling_multi_deployment_smoke_test",
size = "medium",
srcs = test_srcs,
env = {
"IS_SMOKE_TEST": "1",
},
main = "autoscaling_multi_deployment.py",
tags = [
"exclusive",
"team:serve",
],
deps = [
"//:ray_lib",
"//python/ray/serve:serve_lib",
],
)

View file

@@ -0,0 +1,28 @@
cloud_id: {{env["ANYSCALE_CLOUD_ID"]}}
region: us-west-2
# 1k max replicas (1000 / 8 = 125 containers needed)
max_workers: 130
head_node_type:
name: head_node
# 8 cpus, x86, 32G mem, 10Gb NIC, $0.384/hr on demand
instance_type: m5.2xlarge
worker_node_types:
- name: worker_node
# 8 cpus, x86, 32G mem, 10Gb NIC, $0.384/hr on demand
instance_type: m5.2xlarge
min_workers: 1
# 1k max replicas
max_workers: 130
use_spot: false
aws:
TagSpecifications:
- ResourceType: "instance"
Tags:
- Key: anyscale-user
Value: '{{env["ANYSCALE_USER"]}}'
- Key: anyscale-expiration
Value: '{{env["EXPIRATION_1D"]}}'
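The max_workers value follows from the 1k-replica target and the 8 vCPUs per m5.2xlarge; the extra slack on top of the 125 nodes from the comment is a judgment call in the template rather than something derived here. A quick sanity check of the arithmetic:

import math
replicas, cpus_per_node = 1000, 8
print(math.ceil(replicas / cpus_per_node))  # 125 worker nodes for 1k replicas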

View file

@@ -26,6 +26,32 @@
smoke_test:
timeout: 600
- name: autoscaling_single_deployment
cluster:
app_config: app_config.yaml
compute_template: compute_tpl_8_cpu_autoscaling.yaml
run:
timeout: 7200
long_running: False
script: python workloads/autoscaling_single_deployment.py
smoke_test:
timeout: 600
- name: autoscaling_multi_deployment
cluster:
app_config: app_config.yaml
compute_template: compute_tpl_8_cpu_autoscaling.yaml
run:
timeout: 7200
long_running: False
script: python workloads/autoscaling_multi_deployment.py
smoke_test:
timeout: 600
- name: serve_micro_benchmark
team: serve
cluster:

View file

@@ -0,0 +1,204 @@
#!/usr/bin/env python3
"""
Benchmark test for multi deployment with autoscaling at up to 1k no-op replica
scale.
1) Start with a single head node.
2) Start 10 deployments each with up to 100 no-op replicas
3) Launch wrk on each running node to simulate load-balanced requests
4) Recursively send queries to random deployments, up to depth=5
5) Run a 10-minute wrk trial on each node, aggregate results.
Report:
per_thread_latency_avg_ms
per_thread_latency_max_ms
per_thread_avg_tps
per_thread_max_tps
per_node_avg_tps
per_node_avg_transfer_per_sec_KB
cluster_total_throughput
cluster_total_transfer_KB
cluster_max_P50_latency_ms
cluster_max_P75_latency_ms
cluster_max_P90_latency_ms
cluster_max_P99_latency_ms
"""
import click
import math
import os
import random
import ray
from ray import serve
from ray.serve.utils import logger
from serve_test_utils import (
aggregate_all_metrics,
run_wrk_on_all_nodes,
save_test_results,
)
from serve_test_cluster_utils import (
setup_local_single_node_cluster,
setup_anyscale_cluster,
warm_up_one_cluster,
NUM_CPU_PER_NODE,
NUM_CONNECTIONS,
)
from typing import Optional
# Experiment configs
DEFAULT_SMOKE_TEST_MIN_NUM_REPLICA = 1
DEFAULT_SMOKE_TEST_MAX_NUM_REPLICA = 8
DEFAULT_SMOKE_TEST_NUM_DEPLOYMENTS = 4 # 2 replicas each
# TODO(jiaodong): We should investigate and change this back to 1k.
# For now, we won't get valid latency numbers from wrk at 1k replicas,
# likely due to request timeouts.
DEFAULT_FULL_TEST_MIN_NUM_REPLICA = 1
DEFAULT_FULL_TEST_MAX_NUM_REPLICA = 1000
# TODO(simon): we should change this back to 100. But due to long poll issue
# we temporarily downscoped this test.
# https://github.com/ray-project/ray/pull/20270
DEFAULT_FULL_TEST_NUM_DEPLOYMENTS = 10 # 100 replicas each
# Experiment configs - wrk specific
DEFAULT_SMOKE_TEST_TRIAL_LENGTH = "15s"
DEFAULT_FULL_TEST_TRIAL_LENGTH = "10m"
def setup_multi_deployment_replicas(min_replicas, max_replicas,
num_deployments):
max_replicas_per_deployment = max_replicas // num_deployments
all_deployment_names = [f"Echo_{i+1}" for i in range(num_deployments)]
@serve.deployment(
_autoscaling_config={
"metrics_interval_s": 0.1,
"min_replicas": min_replicas,
"max_replicas": max_replicas_per_deployment,
"look_back_period_s": 0.2,
"downscale_delay_s": 0.2,
"upscale_delay_s": 0.2
},
version="v1")
class Echo:
def __init__(self):
self.all_deployment_async_handles = []
def get_random_async_handle(self):
# get_handle() is called synchronously here and is expected to run only
# a few times during deployment warmup, so that each deployment holds
# references to all other handles for recursive inference calls.
if len(self.all_deployment_async_handles) < len(
all_deployment_names):
deployments = list(serve.list_deployments().values())
self.all_deployment_async_handles = [
deployment.get_handle(sync=False)
for deployment in deployments
]
return random.choice(self.all_deployment_async_handles)
async def handle_request(self, request, depth: int):
# Max recursive call depth reached
if depth > 4:
return "hi"
next_async_handle = self.get_random_async_handle()
obj_ref = await next_async_handle.handle_request.remote(
request, depth + 1)
return await obj_ref
async def __call__(self, request):
return await self.handle_request(request, 0)
for deployment in all_deployment_names:
Echo.options(name=deployment).deploy()
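Each deployment is exposed over HTTP under its name (Echo_1 through Echo_N), and a single request fans out through up to five recursive handle calls across randomly chosen deployments before returning "hi". A hedged local check, assuming Serve's default HTTP address and that setup_multi_deployment_replicas() has already run:

import requests

resp = requests.get("http://127.0.0.1:8000/Echo_1", timeout=30)
print(resp.text)  # expected: "hi" once the recursive chain completes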
@click.command()
@click.option("--min-replicas", "-min", type=int)
@click.option("--max-replicas", "-max", type=int)
@click.option("--num-deployments", "-nd", type=int)
@click.option("--trial-length", "-tl", type=str)
def main(min_replicas: Optional[int], max_replicas: Optional[int],
num_deployments: Optional[int], trial_length: Optional[str]):
# Give default cluster parameter values based on the smoke_test config;
# if the user provided values explicitly, use them instead.
# IS_SMOKE_TEST is set by the args of the releaser's e2e.py.
smoke_test = os.environ.get("IS_SMOKE_TEST", "1")
if smoke_test == "1":
min_replicas = min_replicas or DEFAULT_SMOKE_TEST_MIN_NUM_REPLICA
max_replicas = max_replicas or DEFAULT_SMOKE_TEST_MAX_NUM_REPLICA
num_deployments = num_deployments or DEFAULT_SMOKE_TEST_NUM_DEPLOYMENTS
trial_length = trial_length or DEFAULT_SMOKE_TEST_TRIAL_LENGTH
logger.info(f"Running smoke test with min {min_replicas} and max "
f"{max_replicas} replicas, {num_deployments} deployments "
f".. \n")
# Choose cluster setup based on user config. The local test uses Cluster()
# to mock a multi-node cluster, which requires the number of nodes to be
# specified, but the Ray client setup doesn't need this.
num_nodes = int(math.ceil(max_replicas / NUM_CPU_PER_NODE))
logger.info(
f"Setting up local ray cluster with {num_nodes} nodes .. \n")
serve_client = setup_local_single_node_cluster(num_nodes)[0]
else:
min_replicas = min_replicas or DEFAULT_FULL_TEST_MIN_NUM_REPLICA
max_replicas = max_replicas or DEFAULT_FULL_TEST_MAX_NUM_REPLICA
num_deployments = num_deployments or DEFAULT_FULL_TEST_NUM_DEPLOYMENTS
trial_length = trial_length or DEFAULT_FULL_TEST_TRIAL_LENGTH
logger.info(f"Running full test with min {min_replicas} and max "
f"{max_replicas} replicas, {num_deployments} deployments "
f".. \n")
logger.info("Setting up anyscale ray cluster .. \n")
serve_client = setup_anyscale_cluster()
http_host = str(serve_client._http_config.host)
http_port = str(serve_client._http_config.port)
logger.info(f"Ray serve http_host: {http_host}, http_port: {http_port}")
logger.info(f"Deploying with min {min_replicas} and max {max_replicas}"
f"target replicas ....\n")
setup_multi_deployment_replicas(min_replicas, max_replicas,
num_deployments)
logger.info("Warming up cluster ....\n")
endpoint_refs = []
all_endpoints = list(serve.list_deployments().keys())
for endpoint in all_endpoints:
endpoint_refs.append(
warm_up_one_cluster.options(num_cpus=0).remote(
10, http_host, http_port, endpoint))
for endpoint in ray.get(endpoint_refs):
logger.info(f"Finished warming up {endpoint}")
logger.info(f"Starting wrk trial on all nodes for {trial_length} ....\n")
# For detailed discussion, see https://github.com/wg/wrk/issues/205
# TODO(jiaodong): What's the best number to use here?
all_metrics, all_wrk_stdout = run_wrk_on_all_nodes(
trial_length,
NUM_CONNECTIONS,
http_host,
http_port,
all_endpoints=all_endpoints)
aggregated_metrics = aggregate_all_metrics(all_metrics)
logger.info("Wrk stdout on each node: ")
for wrk_stdout in all_wrk_stdout:
logger.info(wrk_stdout)
logger.info("Final aggregated metrics: ")
for key, val in aggregated_metrics.items():
logger.info(f"{key}: {val}")
save_test_results(
aggregated_metrics,
default_output_file="/tmp/autoscaling_multi_deployment.json")
if __name__ == "__main__":
main()
import pytest
import sys
sys.exit(pytest.main(["-v", "-s", __file__]))

View file

@@ -0,0 +1,166 @@
#!/usr/bin/env python3
"""
Benchmark test for single deployment at 1k no-op replica scale with
autoscaling.
1) Start with a single head node.
2) Autoscale up to 1k no-op replicas over N nodes.
3) Launch wrk on each running node to simulate load-balanced requests
4) Run a 10-minute wrk trial on each node, aggregate results.
Report:
per_thread_latency_avg_ms
per_thread_latency_max_ms
per_thread_avg_tps
per_thread_max_tps
per_node_avg_tps
per_node_avg_transfer_per_sec_KB
cluster_total_throughput
cluster_total_transfer_KB
cluster_max_P50_latency_ms
cluster_max_P75_latency_ms
cluster_max_P90_latency_ms
cluster_max_P99_latency_ms
"""
import click
import json
import math
import os
from ray import serve
from ray.serve.utils import logger
from serve_test_utils import (
aggregate_all_metrics,
run_wrk_on_all_nodes,
save_test_results,
)
from serve_test_cluster_utils import (
setup_local_single_node_cluster,
setup_anyscale_cluster,
warm_up_one_cluster,
NUM_CPU_PER_NODE,
NUM_CONNECTIONS,
)
from typing import Optional
# Experiment configs
DEFAULT_SMOKE_TEST_MIN_NUM_REPLICA = 1
DEFAULT_SMOKE_TEST_MAX_NUM_REPLICA = 4
DEFAULT_FULL_TEST_MIN_NUM_REPLICA = 1
DEFAULT_FULL_TEST_MAX_NUM_REPLICA = 1000
# Deployment configs
DEFAULT_MAX_BATCH_SIZE = 16
# Experiment configs - wrk specific
DEFAULT_SMOKE_TEST_TRIAL_LENGTH = "15s"
DEFAULT_FULL_TEST_TRIAL_LENGTH = "10m"
def deploy_replicas(min_replicas, max_replicas, max_batch_size):
@serve.deployment(
name="echo",
_autoscaling_config={
"metrics_interval_s": 0.1,
"min_replicas": min_replicas,
"max_replicas": max_replicas,
"look_back_period_s": 0.2,
"downscale_delay_s": 0.2,
"upscale_delay_s": 0.2
},
version="v1")
class Echo:
@serve.batch(max_batch_size=max_batch_size)
async def handle_batch(self, requests):
return ["hi" for _ in range(len(requests))]
async def __call__(self, request):
return await self.handle_batch(request)
Echo.deploy()
def save_results(final_result, default_name):
test_output_json = os.environ.get(
"TEST_OUTPUT_JSON", "/tmp/single_deployment_1k_noop_replica.json")
with open(test_output_json, "wt") as f:
json.dump(final_result, f)
@click.command()
@click.option("--min-replicas", "-min", type=int)
@click.option("--max-replicas", "-max", type=int)
@click.option("--trial-length", "-tl", type=str)
@click.option("--max-batch-size", type=int, default=DEFAULT_MAX_BATCH_SIZE)
def main(min_replicas: Optional[int], max_replicas: Optional[int],
trial_length: Optional[str], max_batch_size: Optional[int]):
# Give default cluster parameter values based on the smoke_test config;
# if the user provided values explicitly, use them instead.
# IS_SMOKE_TEST is set by the args of the releaser's e2e.py.
smoke_test = os.environ.get("IS_SMOKE_TEST", "1")
if smoke_test == "1":
min_replicas = min_replicas or DEFAULT_SMOKE_TEST_MIN_NUM_REPLICA
max_replicas = max_replicas or DEFAULT_SMOKE_TEST_MAX_NUM_REPLICA
trial_length = trial_length or DEFAULT_SMOKE_TEST_TRIAL_LENGTH
logger.info(
f"Running local / smoke test with min {min_replicas} and max "
f"{max_replicas} replicas ..\n")
# Choose cluster setup based on user config. The local test uses Cluster()
# to mock a multi-node cluster, which requires the number of nodes to be
# specified, but the Ray client setup doesn't need this.
num_nodes = int(math.ceil(max_replicas / NUM_CPU_PER_NODE))
logger.info(
f"Setting up local ray cluster with {num_nodes} nodes ..\n")
serve_client = setup_local_single_node_cluster(num_nodes)[0]
else:
min_replicas = min_replicas or DEFAULT_FULL_TEST_MIN_NUM_REPLICA
max_replicas = max_replicas or DEFAULT_FULL_TEST_MAX_NUM_REPLICA
trial_length = trial_length or DEFAULT_FULL_TEST_TRIAL_LENGTH
logger.info(f"Running full test with min {min_replicas} and max "
f"{max_replicas} replicas ..\n")
logger.info("Setting up anyscale ray cluster .. \n")
serve_client = setup_anyscale_cluster()
http_host = str(serve_client._http_config.host)
http_port = str(serve_client._http_config.port)
logger.info(f"Ray serve http_host: {http_host}, http_port: {http_port}")
logger.info(f"Deploying with min {min_replicas} and max {max_replicas} "
f"target replicas ....\n")
deploy_replicas(min_replicas, max_replicas, max_batch_size)
logger.info("Warming up cluster ....\n")
warm_up_one_cluster.remote(10, http_host, http_port, "echo")
logger.info(f"Starting wrk trial on all nodes for {trial_length} ....\n")
# For detailed discussion, see https://github.com/wg/wrk/issues/205
# TODO(jiaodong): What's the best number to use here?
all_endpoints = list(serve.list_deployments().keys())
all_metrics, all_wrk_stdout = run_wrk_on_all_nodes(
trial_length,
NUM_CONNECTIONS,
http_host,
http_port,
all_endpoints=all_endpoints)
aggregated_metrics = aggregate_all_metrics(all_metrics)
logger.info("Wrk stdout on each node: ")
for wrk_stdout in all_wrk_stdout:
logger.info(wrk_stdout)
logger.info("Final aggregated metrics: ")
for key, val in aggregated_metrics.items():
logger.info(f"{key}: {val}")
save_test_results(
aggregated_metrics,
default_output_file="/tmp/autoscaling_single_deployment.json")
if __name__ == "__main__":
main()
import pytest
import sys
sys.exit(pytest.main(["-v", "-s", __file__]))

View file

@@ -128,7 +128,7 @@ def main(num_replicas: Optional[int], num_deployments: Optional[int],
num_nodes = int(math.ceil(num_replicas / NUM_CPU_PER_NODE))
logger.info(
f"Setting up local ray cluster with {num_nodes} nodes .. \n")
serve_client = setup_local_single_node_cluster(num_nodes)
serve_client = setup_local_single_node_cluster(num_nodes)[0]
else:
num_replicas = num_replicas or DEFAULT_FULL_TEST_NUM_REPLICA
num_deployments = num_deployments or DEFAULT_FULL_TEST_NUM_DEPLOYMENTS

View file

@@ -63,14 +63,23 @@ def setup_anyscale_cluster(checkpoint_path: str = DEFAULT_CHECKPOINT_PATH):
@ray.remote
def warm_up_one_cluster(
num_warmup_iterations: int,
http_host: str,
http_port: str,
endpoint: str,
) -> None:
def warm_up_one_cluster(num_warmup_iterations: int,
http_host: str,
http_port: str,
endpoint: str,
nonblocking: bool = False) -> None:
# Specifying a low timeout effectively makes requests.get() nonblocking
timeout = 0.0001 if nonblocking else None
logger.info(f"Warming up {endpoint} ..")
for _ in range(num_warmup_iterations):
resp = requests.get(f"http://{http_host}:{http_port}/{endpoint}").text
logger.info(resp)
try:
resp = requests.get(
f"http://{http_host}:{http_port}/{endpoint}",
timeout=timeout).text
logger.info(resp)
except requests.exceptions.ReadTimeout:
# This exception only gets raised if a timeout is specified in the
# requests.get() call.
logger.info("Issued nonblocking HTTP request.")
return endpoint
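The new nonblocking flag fires warm-up requests without waiting on responses by using a very short client-side timeout and swallowing the resulting ReadTimeout. Neither call site in this change passes it yet; a hypothetical invocation (host, port, and endpoint are placeholders) could look like:

import ray

ref = warm_up_one_cluster.options(num_cpus=0).remote(
    10, "127.0.0.1", "8000", "echo", nonblocking=True)
print(ray.get(ref))  # returns the endpoint name once all requests are issued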

View file

@@ -99,7 +99,7 @@ def main(num_replicas: Optional[int], trial_length: Optional[str],
num_nodes = int(math.ceil(num_replicas / NUM_CPU_PER_NODE))
logger.info(
f"Setting up local ray cluster with {num_nodes} nodes ..\n")
serve_client = setup_local_single_node_cluster(num_nodes)
serve_client = setup_local_single_node_cluster(num_nodes)[0]
else:
num_replicas = num_replicas or DEFAULT_FULL_TEST_NUM_REPLICA
trial_length = trial_length or DEFAULT_FULL_TEST_TRIAL_LENGTH