diff --git a/python/ray/serve/api.py b/python/ray/serve/api.py index 895a847cb..06bacdf2b 100644 --- a/python/ray/serve/api.py +++ b/python/ray/serve/api.py @@ -1016,6 +1016,11 @@ def deployment( Deployment """ + if num_replicas is not None \ + and _autoscaling_config is not None: + raise ValueError("Manually setting num_replicas is not allowed when " + "_autoscaling_config is provided.") + config = BackendConfig() if num_replicas is not None: config.num_replicas = num_replicas diff --git a/python/ray/serve/backend_state.py b/python/ray/serve/backend_state.py index fd9019ea8..fcdedc0b7 100644 --- a/python/ray/serve/backend_state.py +++ b/python/ray/serve/backend_state.py @@ -727,6 +727,7 @@ class BackendState: if backend_info is not None: self._target_info = backend_info self._target_replicas = backend_info.backend_config.num_replicas + self._target_version = BackendVersion( backend_info.version, user_config=backend_info.backend_config.user_config) diff --git a/python/ray/serve/controller.py b/python/ray/serve/controller.py index fab281768..a8a51bcd4 100644 --- a/python/ray/serve/controller.py +++ b/python/ray/serve/controller.py @@ -5,6 +5,7 @@ from collections import defaultdict import os from typing import Dict, List, Optional, Tuple, Any from ray.serve.autoscaling_policy import BasicAutoscalingPolicy +from copy import copy import ray from ray.actor import ActorHandle @@ -169,17 +170,11 @@ class ServeController: current_num_ongoing_requests, len(running_replicas))) new_backend_config.num_replicas = decision_num_replicas - replica_config = backend_info.replica_config - deployer_job_id = backend_info.deployer_job_id - backend_config_proto_bytes = new_backend_config.to_proto_bytes() - goal_id, updating = self.deploy( - deployment_name, - backend_config_proto_bytes, - replica_config, - version=backend_info.version, - prev_version=backend_info.version, - route_prefix=route_prefix, - deployer_job_id=deployer_job_id) + new_backend_info = copy(backend_info) + 
new_backend_info.backend_config = new_backend_config + + goal_id, updating = self.backend_state_manager.deploy_backend( + deployment_name, new_backend_info) async def run_control_loop(self) -> None: while True: @@ -304,6 +299,9 @@ class ServeController: autoscaling_config = backend_config.autoscaling_config if autoscaling_config is not None: + # TODO: is this the desired behaviour? Should this be a setting? + backend_config.num_replicas = autoscaling_config.min_replicas + autoscaling_policy = BasicAutoscalingPolicy(autoscaling_config) else: autoscaling_policy = None @@ -320,8 +318,7 @@ class ServeController: autoscaling_policy=autoscaling_policy) # TODO(architkulkarni): When a deployment is redeployed, even if # the only change was num_replicas, the start_time_ms is refreshed. - # This is probably not the desired behavior for an autoscaling - # deployment, which redeploys very often to change num_replicas. + # Is this the desired behaviour? goal_id, updating = self.backend_state_manager.deploy_backend( name, backend_info) diff --git a/python/ray/serve/tests/test_autoscaling_policy.py b/python/ray/serve/tests/test_autoscaling_policy.py index daa0f04bf..9277d2ee7 100644 --- a/python/ray/serve/tests/test_autoscaling_policy.py +++ b/python/ray/serve/tests/test_autoscaling_policy.py @@ -2,12 +2,16 @@ import sys import time import pytest +from unittest import mock + from ray._private.test_utils import SignalActor, wait_for_condition from ray.serve.autoscaling_policy import (BasicAutoscalingPolicy, calculate_desired_num_replicas) from ray.serve.backend_state import ReplicaState from ray.serve.config import AutoscalingConfig from ray.serve.constants import CONTROL_LOOP_PERIOD_S +from ray.serve.controller import ServeController +from ray.serve.api import Deployment import ray from ray import serve @@ -84,6 +88,23 @@ class TestCalculateDesiredNumReplicas: assert 5 <= desired_num_replicas <= 8 # 10 + 0.5 * (2.5 - 10) = 6.25 +def get_num_running_replicas(controller: 
ServeController, + deployment: Deployment) -> int: + """Return the number of running replicas for the given deployment.""" + replicas = ray.get( + controller._dump_replica_states_for_testing.remote(deployment.name)) + running_replicas = replicas.get([ReplicaState.RUNNING]) + return len(running_replicas) + + +def get_deployment_start_time(controller: ServeController, + deployment: Deployment): + """Return the start time for the given deployment.""" + deployments = ray.get(controller.list_deployments.remote()) + backend_info, _route_prefix = deployments[deployment.name] + return backend_info.start_time_ms + + @pytest.mark.skipif(sys.platform == "win32", reason="Failing on Windows.") def test_e2e_basic_scale_up_down(serve_instance): """Send 100 requests and check that we autoscale up, and then back down.""" @@ -109,22 +130,46 @@ def test_e2e_basic_scale_up_down(serve_instance): ray.get(signal.wait.remote()) A.deploy() + + controller = serve_instance._controller + start_time = get_deployment_start_time(controller, A) + handle = A.get_handle() [handle.remote() for _ in range(100)] - controller = serve_instance._controller - - def get_num_running_replicas(): - replicas = ray.get( - controller._dump_replica_states_for_testing.remote("A")) - running_replicas = replicas.get([ReplicaState.RUNNING]) - return len(running_replicas) - - wait_for_condition(lambda: get_num_running_replicas() >= 2) + wait_for_condition(lambda: get_num_running_replicas(controller, A) >= 2) signal.send.remote() # As the queue is drained, we should scale back down.
- wait_for_condition(lambda: get_num_running_replicas() <= 1) + wait_for_condition(lambda: get_num_running_replicas(controller, A) <= 1) + + # Make sure start time did not change for the deployment + assert get_deployment_start_time(controller, A) == start_time + + +@mock.patch.object(ServeController, "autoscale") +def test_initial_num_replicas(mock, serve_instance): + """Assert that the initial number of replicas a deployment is launched with + respects the bounds set by autoscaling_config. + + For this test we mock out the autoscaling loop to make sure the number of + replicas is set correctly before we hit the autoscaling procedure. + """ + + @serve.deployment( + _autoscaling_config={ + "min_replicas": 2, + "max_replicas": 4, + }, + version="v1") + class A: + def __call__(self): + return "ok!" + + A.deploy() + + controller = serve_instance._controller + assert get_num_running_replicas(controller, A) == 2 class MockTimer: