From f6be6dbcdca20d8d5edeaed21b3880669d929e42 Mon Sep 17 00:00:00 2001 From: Edward Oakes Date: Fri, 14 May 2021 15:12:32 -0500 Subject: [PATCH] [Serve] batch slow warning for multiple replicas (#15798) --- python/ray/serve/backend_state.py | 83 +++++++++++++++++++------------ 1 file changed, 51 insertions(+), 32 deletions(-) diff --git a/python/ray/serve/backend_state.py b/python/ray/serve/backend_state.py index b656f7b84..9d3fb6e4f 100644 --- a/python/ray/serve/backend_state.py +++ b/python/ray/serve/backend_state.py @@ -114,20 +114,9 @@ class ActorReplicaWrapper: ready, _ = ray.wait([self._startup_obj_ref], timeout=0) return len(ready) == 1 - def resource_requirements( - self) -> Tuple[Dict[str, float], Dict[str, float]]: - """Returns required and currently available resources. - - Only resources with nonzero requirements will be included in the - required dict and only resources in the required dict will be - included in the available dict (filtered for relevance). - """ - required = {k: v for k, v in self._actor_resources.items() if v > 0} - available = { - k: v - for k, v in ray.available_resources().items() if k in required - } - return required, available + @property + def actor_resources(self) -> Dict[str, float]: + return self._actor_resources def graceful_stop(self) -> None: """Request the actor to exit gracefully.""" @@ -254,35 +243,25 @@ class BackendReplica(VersionedReplica): self._prev_slow_startup_warning_time = time.time() self._state = ReplicaState.STARTING - def check_started(self) -> bool: + def check_started(self) -> Tuple[bool, bool]: """Check if the replica has started. If so, transition to RUNNING. Should handle the case where the replica has already stopped. + + Returns: + (ready, past_slow_startup_threshold) """ if self._state == ReplicaState.RUNNING: - return True + return True, False assert self._state == ReplicaState.STARTING, ( f"State must be {ReplicaState.STARTING}, *not* {self._state}") if self._actor.check_ready(): self._state = ReplicaState.RUNNING - return True + return True, False time_since_start = time.time() - self._start_time - if (time_since_start > SLOW_STARTUP_WARNING_S - and time.time() - self._prev_slow_startup_warning_time > - SLOW_STARTUP_WARNING_PERIOD_S): - required, available = self._actor.resource_requirements() - logger.warning( - f"Replica '{self._replica_tag}' for backend " - f"'{self._backend_tag}' has taken more than " - f"{time_since_start:.0f}s to start up. This may be " - "caused by waiting for the cluster to auto-scale or " - "because the backend constructor is slow. Resources required: " - f"{required}, resources available: {available}.") - self._prev_slow_startup_warning_time = time.time() - - return False + return False, time_since_start > SLOW_STARTUP_WARNING_S def set_should_stop(self, graceful_shutdown_timeout_s: Duration) -> None: """Mark the replica to be stopped in the future. @@ -348,6 +327,24 @@ class BackendReplica(VersionedReplica): """ return self._actor.check_health() + def resource_requirements( + self) -> Tuple[Dict[str, float], Dict[str, float]]: + """Returns required and currently available resources. + + Only resources with nonzero requirements will be included in the + required dict and only resources in the required dict will be + included in the available dict (filtered for relevance). + """ + required = { + k: v + for k, v in self._actor.actor_resources.items() if v > 0 + } + available = { + k: v + for k, v in ray.available_resources().items() if k in required + } + return required, available + class ReplicaStateContainer: """Container for mapping ReplicaStates to lists of BackendReplicas.""" @@ -490,6 +487,8 @@ class BackendState: self._target_replicas: Dict[BackendTag, int] = defaultdict(int) self._backend_goals: Dict[BackendTag, GoalId] = dict() self._target_versions: Dict[BackendTag, str] = dict() + self._prev_startup_warnings: Dict[BackendTag, float] = defaultdict( + float) checkpoint = self._kv_store.get(CHECKPOINT_KEY) if checkpoint is not None: @@ -867,14 +866,34 @@ class BackendState: replica.stop() replicas.add(ReplicaState.STOPPING, replica) + slow_start_replicas = [] for replica in replicas.pop(states=[ReplicaState.STARTING]): - if replica.check_started(): + ready, slow_start = replica.check_started() + if ready: # This replica should be now be added to handle's replica # set. replicas.add(ReplicaState.RUNNING, replica) transitioned_backend_tags.add(backend_tag) else: replicas.add(ReplicaState.STARTING, replica) + if slow_start: + slow_start_replicas.append(replica) + + if (len(slow_start_replicas) + and time.time() - self._prev_startup_warnings[backend_tag] + > SLOW_STARTUP_WARNING_PERIOD_S): + required, available = slow_start_replicas[ + 0].resource_requirements() + logger.warning( + f"Backend '{backend_tag}' has {len(slow_start_replicas)} " + "replicas that have taken more than " + f"{SLOW_STARTUP_WARNING_S}s to start up. This may be " + "caused by waiting for the cluster to auto-scale or " + "because the constructor is slow. Resources required " + f"for each replica: {required}, resources available: " + f"{available}.") + + self._prev_startup_warnings[backend_tag] = time.time() for replica in replicas.pop(states=[ReplicaState.STOPPING]): if not replica.check_stopped():