From 7e6ea9e3df87fc3992f326b47da5d1f0557f6235 Mon Sep 17 00:00:00 2001 From: iasoon Date: Tue, 2 Nov 2021 23:08:52 +0100 Subject: [PATCH] [serve] split ReplicaStartupState.PENDING into PENDING_ALLOCATION and PENDING_INITIALIZATION (#19431) --- python/ray/serve/BUILD | 9 ++ python/ray/serve/backend_state.py | 108 +++++++++++++------ python/ray/serve/replica.py | 67 ++++++++---- python/ray/serve/tests/test_backend_state.py | 6 +- python/ray/serve/tests/test_cluster.py | 47 ++++++++ python/ray/serve/tests/test_warnings.py | 64 +++++++++++ 6 files changed, 245 insertions(+), 56 deletions(-) create mode 100644 python/ray/serve/tests/test_warnings.py diff --git a/python/ray/serve/BUILD b/python/ray/serve/BUILD index fdcac60eb..4bdfa3287 100644 --- a/python/ray/serve/BUILD +++ b/python/ray/serve/BUILD @@ -230,6 +230,15 @@ py_test( deps = [":serve_lib"], ) +py_test( + name = "test_warnings", + size = "small", + srcs = serve_tests_srcs, + tags = ["exclusive", "team:serve"], + deps = [":serve_lib"], +) + + py_test( name = "test_fastapi", size = "medium", diff --git a/python/ray/serve/backend_state.py b/python/ray/serve/backend_state.py index 7b00235f3..25fb40361 100644 --- a/python/ray/serve/backend_state.py +++ b/python/ray/serve/backend_state.py @@ -31,8 +31,8 @@ class ReplicaState(Enum): class ReplicaStartupStatus(Enum): - PENDING = 1 - PENDING_SLOW_START = 2 + PENDING_ALLOCATION = 1 + PENDING_INITIALIZATION = 2 SUCCEEDED = 3 FAILED = 4 @@ -74,8 +74,10 @@ class ActorReplicaWrapper: self._replica_tag = replica_tag self._backend_tag = backend_tag - # Populated in self.start(). + # Populated in either self.start() or self.recover() + self._allocated_obj_ref: ObjectRef = None self._ready_obj_ref: ObjectRef = None + self._actor_resources: Dict[str, float] = None self._max_concurrent_queries: int = None self._graceful_shutdown_timeout_s: float = 0.0 @@ -180,6 +182,7 @@ class ActorReplicaWrapper: backend_info.deployment_config.to_proto_bytes(), version, self._controller_name, self._detached) + self._allocated_obj_ref = self._actor_handle.is_allocated.remote() self._ready_obj_ref = self._actor_handle.reconfigure.remote( backend_info.deployment_config.user_config) @@ -210,6 +213,9 @@ class ActorReplicaWrapper: self._placement_group = self.get_placement_group( self._placement_group_name) + # Re-fetch initialization proof + self._allocated_obj_ref = self._actor_handle.is_allocated.remote() + # Running actor handle already has all info needed, thus successful # starting simply means retrieving replica version hash from actor self._ready_obj_ref = self._actor_handle.get_metadata.remote() @@ -222,7 +228,9 @@ class ActorReplicaWrapper: Returns: state (ReplicaStartupStatus): - PENDING: + PENDING_ALLOCATION: + - replica is waiting for a worker to start + PENDING_INITIALIZATION - replica reconfigure() haven't returned. FAILED: - replica __init__() failed. @@ -235,11 +243,18 @@ class ActorReplicaWrapper: version: - replica __init__() and reconfigure() succeeded. """ + + # check whether the replica has been allocated + ready, _ = ray.wait([self._allocated_obj_ref], timeout=0) + if len(ready) == 0: + return ReplicaStartupStatus.PENDING_ALLOCATION, None + + # check whether relica initialization has completed ready, _ = ray.wait([self._ready_obj_ref], timeout=0) # In case of deployment constructor failure, ray.get will help to # surface exception to each update() cycle. if len(ready) == 0: - return ReplicaStartupStatus.PENDING, None + return ReplicaStartupStatus.PENDING_INITIALIZATION, None elif len(ready) > 0: try: deployment_config, version = ray.get(ready)[0] @@ -406,11 +421,8 @@ class BackendReplica(VersionedReplica): """ status, version = self._actor.check_ready() - if status == ReplicaStartupStatus.PENDING: - if time.time() - self._start_time > SLOW_STARTUP_WARNING_S: - status = ReplicaStartupStatus.PENDING_SLOW_START - elif status == ReplicaStartupStatus.SUCCEEDED: - # Re-assign DeploymentVersion if start / update / recover succeeded + if status == ReplicaStartupStatus.SUCCEEDED: + # Re-assign BackendVersion if start / update / recover succeeded # by reading re-computed version in RayServeReplica if version is not None: self._version = version @@ -1003,10 +1015,9 @@ class BackendState: return GoalStatus.PENDING - def _check_startup_replicas(self, - original_state: ReplicaState, - stop_on_slow=False - ) -> Tuple[List[BackendReplica], bool]: + def _check_startup_replicas( + self, original_state: ReplicaState, stop_on_slow=False + ) -> Tuple[List[Tuple[BackendReplica, ReplicaStartupStatus]], bool]: """ Common helper function for startup actions tracking and status transition: STARTING, UPDATING and RECOVERING. @@ -1032,18 +1043,24 @@ class BackendState: replica.stop(graceful=False) self._replicas.add(ReplicaState.STOPPING, replica) - elif start_status == ReplicaStartupStatus.PENDING: - # Not done yet, remain at same state - self._replicas.add(original_state, replica) - else: - # Slow start, remain at same state but also add to - # slow start replicas. - if not stop_on_slow: - self._replicas.add(original_state, replica) - else: + elif start_status in [ + ReplicaStartupStatus.PENDING_ALLOCATION, + ReplicaStartupStatus.PENDING_INITIALIZATION, + ]: + + is_slow = time.time( + ) - replica._start_time > SLOW_STARTUP_WARNING_S + + if is_slow: + slow_replicas.append((replica, start_status)) + + # Does it make sense to stop replicas in PENDING_ALLOCATION + # state? + if is_slow and stop_on_slow: replica.stop(graceful=False) self._replicas.add(ReplicaState.STOPPING, replica) - slow_replicas.append(replica) + else: + self._replicas.add(original_state, replica) return slow_replicas, transitioned_to_running @@ -1086,17 +1103,38 @@ class BackendState: if (len(slow_start_replicas) and time.time() - self._prev_startup_warning > SLOW_STARTUP_WARNING_PERIOD_S): - required, available = slow_start_replicas[ - 0].resource_requirements() - logger.warning( - f"Deployment '{self._name}' has " - f"{len(slow_start_replicas)} replicas that have taken " - f"more than {SLOW_STARTUP_WARNING_S}s to start up. This " - "may be caused by waiting for the cluster to auto-scale, " - "waiting for a runtime environment to install, or a slow " - "constructor. Resources required " - f"for each replica: {required}, resources available: " - f"{available}. component=serve deployment={self._name}") + + pending_allocation = [] + pending_initialization = [] + + for replica, startup_status in slow_start_replicas: + if startup_status == ReplicaStartupStatus.PENDING_ALLOCATION: + pending_allocation.append(replica) + if startup_status \ + == ReplicaStartupStatus.PENDING_INITIALIZATION: + pending_initialization.append(replica) + + if len(pending_allocation) > 0: + required, available = slow_start_replicas[0][ + 0].resource_requirements() + logger.warning( + f"Deployment '{self._name}' has " + f"{len(pending_allocation)} replicas that have taken " + f"more than {SLOW_STARTUP_WARNING_S}s to be scheduled. " + f"This may be caused by waiting for the cluster to " + f"auto-scale, or waiting for a runtime environment " + f"to install. " + f"Resources required for each replica: {required}, " + f"resources available: {available}. " + f"component=serve deployment={self._name}") + + if len(pending_initialization) > 0: + logger.warning( + f"Deployment '{self._name}' has " + f"{len(pending_initialization)} replicas that have taken " + f"more than {SLOW_STARTUP_WARNING_S}s to initialize. This " + f"may be caused by a slow __init__ or reconfigure method." + f"component=serve deployment={self._name}") self._prev_startup_warning = time.time() diff --git a/python/ray/serve/replica.py b/python/ray/serve/replica.py index 45db31610..61160d969 100644 --- a/python/ray/serve/replica.py +++ b/python/ray/serve/replica.py @@ -65,30 +65,48 @@ def create_replica_wrapper(name: str, serialized_deployment_def: bytes): replica_tag, controller_name, servable_object=None) - if is_function: - _callable = backend - else: - # This allows backends to define an async __init__ method - # (required for FastAPI backend definition). - _callable = backend.__new__(backend) - await sync_to_async(_callable.__init__)(*init_args, - **init_kwargs) - # Setting the context again to update the servable_object. - ray.serve.api._set_internal_replica_context( - backend_tag, - replica_tag, - controller_name, - servable_object=_callable) assert controller_name, "Must provide a valid controller_name" + controller_namespace = ray.serve.api._get_controller_namespace( detached) controller_handle = ray.get_actor( controller_name, namespace=controller_namespace) - self.backend = RayServeReplica( - _callable, backend_tag, replica_tag, deployment_config, - deployment_config.user_config, version, is_function, - controller_handle) + + # This closure initializes user code and finalizes replica + # startup. By splitting the initialization step like this, + # we can already access this actor before the user code + # has finished initializing. + # The supervising state manager can then wait + # for allocation of this replica by using the `is_allocated` + # method. After that, it calls `reconfigure` to trigger + # user code initialization. + async def initialize_backend(): + if is_function: + _callable = backend + else: + # This allows backends to define an async __init__ method + # (required for FastAPI backend definition). + _callable = backend.__new__(backend) + await sync_to_async(_callable.__init__)(*init_args, + **init_kwargs) + # Setting the context again to update the servable_object. + ray.serve.api._set_internal_replica_context( + backend_tag, + replica_tag, + controller_name, + servable_object=_callable) + + self.backend = RayServeReplica( + _callable, backend_tag, replica_tag, deployment_config, + deployment_config.user_config, version, is_function, + controller_handle) + + # Is it fine that backend is None here? + # Should we add a check in all methods that use self.backend + # or, alternatively, create an async get_backend() method? + self.backend = None + self._initialize_backend = initialize_backend # asyncio.Event used to signal that the replica is shutting down. self.shutdown_event = asyncio.Event() @@ -108,8 +126,21 @@ def create_replica_wrapper(name: str, serialized_deployment_def: bytes): query = Query(request_args, request_kwargs, request_metadata) return await self.backend.handle_request(query) + async def is_allocated(self): + """poke the replica to check whether it's alive. + + When calling this method on an ActorHandle, it will complete as + soon as the actor has started running. We use this mechanism to + detect when a replica has been allocated a worker slot. + At this time, the replica can transition from PENDING_ALLOCATION + to PENDING_INITIALIZATION startup state. + """ + pass + async def reconfigure(self, user_config: Optional[Any] = None ) -> Tuple[DeploymentConfig, DeploymentVersion]: + if self.backend is None: + await self._initialize_backend() if user_config is not None: await self.backend.reconfigure(user_config) diff --git a/python/ray/serve/tests/test_backend_state.py b/python/ray/serve/tests/test_backend_state.py index 687382945..82afdf06b 100644 --- a/python/ray/serve/tests/test_backend_state.py +++ b/python/ray/serve/tests/test_backend_state.py @@ -42,8 +42,8 @@ class MockReplicaActorWrapper: self.recovering = False # Will be set when `start()` is called. self.version = None - # Expected to be set in the test. - self.ready = False + # Initial state for a replica is PENDING_ALLOCATION. + self.ready = ReplicaStartupStatus.PENDING_ALLOCATION # Will be set when `graceful_stop()` is called. self.stopped = False # Expected to be set in the test. @@ -106,7 +106,7 @@ class MockReplicaActorWrapper: def check_ready(self) -> ReplicaStartupStatus: ready = self.ready - self.ready = ReplicaStartupStatus.PENDING + self.ready = ReplicaStartupStatus.PENDING_INITIALIZATION if ready == ReplicaStartupStatus.SUCCEEDED and self.recovering: self.recovering = False self.started = True diff --git a/python/ray/serve/tests/test_cluster.py b/python/ray/serve/tests/test_cluster.py index 48290f75c..1ab633a39 100644 --- a/python/ray/serve/tests/test_cluster.py +++ b/python/ray/serve/tests/test_cluster.py @@ -9,6 +9,9 @@ import ray from ray import serve from ray.cluster_utils import Cluster +from ray.serve.backend_state import ReplicaStartupStatus, ReplicaState +from ray._private.test_utils import SignalActor, wait_for_condition + @pytest.fixture def ray_cluster(): @@ -113,5 +116,49 @@ def test_node_failure(ray_cluster): assert pids3.issubset(pids4) +@pytest.mark.skipif(sys.platform == "win32", reason="Flaky on Windows.") +def test_replica_startup_status_transitions(ray_cluster): + cluster = ray_cluster + cluster.add_node(num_cpus=1) + cluster.connect(namespace="serve") + serve_instance = serve.start() + + signal = SignalActor.remote() + + @serve.deployment(version="1", ray_actor_options={"num_cpus": 2}) + class D: + def __init__(self): + ray.get(signal.wait.remote()) + + D.deploy(_blocking=False) + + def get_replicas(replica_state): + controller = serve_instance._controller + replicas = ray.get( + controller._dump_replica_states_for_testing.remote(D.name)) + return replicas.get([replica_state]) + + # wait for serve to start the replica, and catch a reference to it. + wait_for_condition(lambda: len(get_replicas(ReplicaState.STARTING)) > 0) + replica = get_replicas(ReplicaState.STARTING)[0] + + # declare shorthands as yapf doesn't like long lambdas + PENDING_ALLOCATION = ReplicaStartupStatus.PENDING_ALLOCATION + PENDING_INITIALIZATION = ReplicaStartupStatus.PENDING_INITIALIZATION + SUCCEEDED = ReplicaStartupStatus.SUCCEEDED + + # currently there are no resources to allocate the replica + assert replica.check_started() == PENDING_ALLOCATION + + # add the necessary resources to allocate the replica + cluster.add_node(num_cpus=4) + wait_for_condition( + lambda: (replica.check_started() == PENDING_INITIALIZATION)) + + # send signal to complete replica intialization + signal.send.remote() + wait_for_condition(lambda: replica.check_started() == SUCCEEDED) + + if __name__ == "__main__": sys.exit(pytest.main(["-v", "-s", __file__])) diff --git a/python/ray/serve/tests/test_warnings.py b/python/ray/serve/tests/test_warnings.py new file mode 100644 index 000000000..26b63c4b0 --- /dev/null +++ b/python/ray/serve/tests/test_warnings.py @@ -0,0 +1,64 @@ +import time + +from ray import serve +from ray.serve.backend_state import (SLOW_STARTUP_WARNING_S, + SLOW_STARTUP_WARNING_PERIOD_S) + + +def test_slow_allocation_warning(serve_instance, capsys): + # this deployment can never be scheduled + @serve.deployment(ray_actor_options={"num_cpus": 99999}) + class D: + def __init__(self): + pass + + num_replicas = 2 + D.options(num_replicas=num_replicas).deploy(_blocking=False) + + expected_warning = (f"Deployment '{D.name}' has " + f"{num_replicas} replicas that have taken " + f"more than {SLOW_STARTUP_WARNING_S}s " + f"to be scheduled.") + + # wait long enough for the warning to be printed + # with a small grace period + time.sleep(SLOW_STARTUP_WARNING_PERIOD_S * 1.5) + + captured = capsys.readouterr() + + print(captured.err) + + assert expected_warning in captured.err + + # make sure that exactly one warning was printed + # for this deployment + assert captured.err.count(expected_warning) == 1 + + +def test_slow_initialization_warning(serve_instance, capsys): + # this deployment will take a while to allocate + + @serve.deployment + class D: + def __init__(self): + time.sleep(99999) + + num_replicas = 4 + D.options(num_replicas=num_replicas).deploy(_blocking=False) + + expected_warning = (f"Deployment '{D.name}' has " + f"{num_replicas} replicas that have taken " + f"more than {SLOW_STARTUP_WARNING_S}s " + f"to initialize.") + + # wait long enough for the warning to be printed + # with a small grace period + time.sleep(SLOW_STARTUP_WARNING_PERIOD_S * 1.5) + + captured = capsys.readouterr() + + assert expected_warning in captured.err + + # make sure that exactly one warning was printed + # for this deployment + assert captured.err.count(expected_warning) == 1