mirror of
https://github.com/vale981/ray
synced 2025-03-06 02:21:39 -05:00
[serve] Correctly set num_replicas when deploying autoscaling deployment (#19520)
This commit is contained in:
parent
c937950910
commit
b5158ca0ab
4 changed files with 71 additions and 23 deletions
|
@ -1016,6 +1016,11 @@ def deployment(
|
|||
Deployment
|
||||
"""
|
||||
|
||||
if num_replicas is not None \
|
||||
and _autoscaling_config is not None:
|
||||
raise ValueError("Manually setting num_replicas is not allowed when "
|
||||
"_autoscaling_config is provided.")
|
||||
|
||||
config = BackendConfig()
|
||||
if num_replicas is not None:
|
||||
config.num_replicas = num_replicas
|
||||
|
|
|
@ -727,6 +727,7 @@ class BackendState:
|
|||
if backend_info is not None:
|
||||
self._target_info = backend_info
|
||||
self._target_replicas = backend_info.backend_config.num_replicas
|
||||
|
||||
self._target_version = BackendVersion(
|
||||
backend_info.version,
|
||||
user_config=backend_info.backend_config.user_config)
|
||||
|
|
|
@ -5,6 +5,7 @@ from collections import defaultdict
|
|||
import os
|
||||
from typing import Dict, List, Optional, Tuple, Any
|
||||
from ray.serve.autoscaling_policy import BasicAutoscalingPolicy
|
||||
from copy import copy
|
||||
|
||||
import ray
|
||||
from ray.actor import ActorHandle
|
||||
|
@ -169,17 +170,11 @@ class ServeController:
|
|||
current_num_ongoing_requests, len(running_replicas)))
|
||||
new_backend_config.num_replicas = decision_num_replicas
|
||||
|
||||
replica_config = backend_info.replica_config
|
||||
deployer_job_id = backend_info.deployer_job_id
|
||||
backend_config_proto_bytes = new_backend_config.to_proto_bytes()
|
||||
goal_id, updating = self.deploy(
|
||||
deployment_name,
|
||||
backend_config_proto_bytes,
|
||||
replica_config,
|
||||
version=backend_info.version,
|
||||
prev_version=backend_info.version,
|
||||
route_prefix=route_prefix,
|
||||
deployer_job_id=deployer_job_id)
|
||||
new_backend_info = copy(backend_info)
|
||||
new_backend_info.backend_config = new_backend_config
|
||||
|
||||
goal_id, updating = self.backend_state_manager.deploy_backend(
|
||||
deployment_name, new_backend_info)
|
||||
|
||||
async def run_control_loop(self) -> None:
|
||||
while True:
|
||||
|
@ -304,6 +299,9 @@ class ServeController:
|
|||
|
||||
autoscaling_config = backend_config.autoscaling_config
|
||||
if autoscaling_config is not None:
|
||||
# TODO: is this the desired behaviour? Should this be a setting?
|
||||
backend_config.num_replicas = autoscaling_config.min_replicas
|
||||
|
||||
autoscaling_policy = BasicAutoscalingPolicy(autoscaling_config)
|
||||
else:
|
||||
autoscaling_policy = None
|
||||
|
@ -320,8 +318,7 @@ class ServeController:
|
|||
autoscaling_policy=autoscaling_policy)
|
||||
# TODO(architkulkarni): When a deployment is redeployed, even if
|
||||
# the only change was num_replicas, the start_time_ms is refreshed.
|
||||
# This is probably not the desired behavior for an autoscaling
|
||||
# deployment, which redeploys very often to change num_replicas.
|
||||
# Is this the desired behaviour?
|
||||
|
||||
goal_id, updating = self.backend_state_manager.deploy_backend(
|
||||
name, backend_info)
|
||||
|
|
|
@ -2,12 +2,16 @@ import sys
|
|||
import time
|
||||
|
||||
import pytest
|
||||
from unittest import mock
|
||||
|
||||
from ray._private.test_utils import SignalActor, wait_for_condition
|
||||
from ray.serve.autoscaling_policy import (BasicAutoscalingPolicy,
|
||||
calculate_desired_num_replicas)
|
||||
from ray.serve.backend_state import ReplicaState
|
||||
from ray.serve.config import AutoscalingConfig
|
||||
from ray.serve.constants import CONTROL_LOOP_PERIOD_S
|
||||
from ray.serve.controller import ServeController
|
||||
from ray.serve.api import Deployment
|
||||
|
||||
import ray
|
||||
from ray import serve
|
||||
|
@ -84,6 +88,23 @@ class TestCalculateDesiredNumReplicas:
|
|||
assert 5 <= desired_num_replicas <= 8 # 10 + 0.5 * (2.5 - 10) = 6.25
|
||||
|
||||
|
||||
def get_num_running_replicas(controller: ServeController,
|
||||
deployment: Deployment) -> int:
|
||||
""" Get the number of replicas currently running for the given deployment """
|
||||
replicas = ray.get(
|
||||
controller._dump_replica_states_for_testing.remote(deployment.name))
|
||||
running_replicas = replicas.get([ReplicaState.RUNNING])
|
||||
return len(running_replicas)
|
||||
|
||||
|
||||
def get_deployment_start_time(controller: ServeController,
|
||||
deployment: Deployment):
|
||||
""" Return start time for given deployment """
|
||||
deployments = ray.get(controller.list_deployments.remote())
|
||||
backend_info, _route_prefix = deployments[deployment.name]
|
||||
return backend_info.start_time_ms
|
||||
|
||||
|
||||
@pytest.mark.skipif(sys.platform == "win32", reason="Failing on Windows.")
|
||||
def test_e2e_basic_scale_up_down(serve_instance):
|
||||
"""Send 100 requests and check that we autoscale up, and then back down."""
|
||||
|
@ -109,22 +130,46 @@ def test_e2e_basic_scale_up_down(serve_instance):
|
|||
ray.get(signal.wait.remote())
|
||||
|
||||
A.deploy()
|
||||
|
||||
controller = serve_instance._controller
|
||||
start_time = get_deployment_start_time(controller, A)
|
||||
|
||||
handle = A.get_handle()
|
||||
[handle.remote() for _ in range(100)]
|
||||
|
||||
controller = serve_instance._controller
|
||||
|
||||
def get_num_running_replicas():
|
||||
replicas = ray.get(
|
||||
controller._dump_replica_states_for_testing.remote("A"))
|
||||
running_replicas = replicas.get([ReplicaState.RUNNING])
|
||||
return len(running_replicas)
|
||||
|
||||
wait_for_condition(lambda: get_num_running_replicas() >= 2)
|
||||
wait_for_condition(lambda: get_num_running_replicas(controller, A) >= 2)
|
||||
signal.send.remote()
|
||||
|
||||
# As the queue is drained, we should scale back down.
|
||||
wait_for_condition(lambda: get_num_running_replicas() <= 1)
|
||||
wait_for_condition(lambda: get_num_running_replicas(controller, A) <= 1)
|
||||
|
||||
# Make sure start time did not change for the deployment
|
||||
assert get_deployment_start_time(controller, A) == start_time
|
||||
|
||||
|
||||
@mock.patch.object(ServeController, "autoscale")
|
||||
def test_initial_num_replicas(mock, serve_instance):
|
||||
""" assert that the initial number of replicas a deployment is launched with
|
||||
respects the bounds set by autoscaling_config.
|
||||
|
||||
For this test we mock out the autoscaling loop, make sure the number of
|
||||
replicas is set correctly before we hit the autoscaling procedure.
|
||||
"""
|
||||
|
||||
@serve.deployment(
|
||||
_autoscaling_config={
|
||||
"min_replicas": 2,
|
||||
"max_replicas": 4,
|
||||
},
|
||||
version="v1")
|
||||
class A:
|
||||
def __call__(self):
|
||||
return "ok!"
|
||||
|
||||
A.deploy()
|
||||
|
||||
controller = serve_instance._controller
|
||||
assert get_num_running_replicas(controller, A) == 2
|
||||
|
||||
|
||||
class MockTimer:
|
||||
|
|
Loading…
Add table
Reference in a new issue