[Serve] batch slow warning for multiple replicas (#15798)

This commit is contained in:
Edward Oakes 2021-05-14 15:12:32 -05:00 committed by GitHub
parent 00c913cbc6
commit f6be6dbcdc
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -114,20 +114,9 @@ class ActorReplicaWrapper:
ready, _ = ray.wait([self._startup_obj_ref], timeout=0)
return len(ready) == 1
def resource_requirements(
self) -> Tuple[Dict[str, float], Dict[str, float]]:
"""Returns required and currently available resources.
Only resources with nonzero requirements will be included in the
required dict and only resources in the required dict will be
included in the available dict (filtered for relevance).
"""
required = {k: v for k, v in self._actor_resources.items() if v > 0}
available = {
k: v
for k, v in ray.available_resources().items() if k in required
}
return required, available
@property
def actor_resources(self) -> Dict[str, float]:
return self._actor_resources
def graceful_stop(self) -> None:
"""Request the actor to exit gracefully."""
@ -254,35 +243,25 @@ class BackendReplica(VersionedReplica):
self._prev_slow_startup_warning_time = time.time()
self._state = ReplicaState.STARTING
def check_started(self) -> bool:
def check_started(self) -> Tuple[bool, bool]:
"""Check if the replica has started. If so, transition to RUNNING.
Should handle the case where the replica has already stopped.
Returns:
(ready, past_slow_startup_threshold)
"""
if self._state == ReplicaState.RUNNING:
return True
return True, False
assert self._state == ReplicaState.STARTING, (
f"State must be {ReplicaState.STARTING}, *not* {self._state}")
if self._actor.check_ready():
self._state = ReplicaState.RUNNING
return True
return True, False
time_since_start = time.time() - self._start_time
if (time_since_start > SLOW_STARTUP_WARNING_S
and time.time() - self._prev_slow_startup_warning_time >
SLOW_STARTUP_WARNING_PERIOD_S):
required, available = self._actor.resource_requirements()
logger.warning(
f"Replica '{self._replica_tag}' for backend "
f"'{self._backend_tag}' has taken more than "
f"{time_since_start:.0f}s to start up. This may be "
"caused by waiting for the cluster to auto-scale or "
"because the backend constructor is slow. Resources required: "
f"{required}, resources available: {available}.")
self._prev_slow_startup_warning_time = time.time()
return False
return False, time_since_start > SLOW_STARTUP_WARNING_S
def set_should_stop(self, graceful_shutdown_timeout_s: Duration) -> None:
"""Mark the replica to be stopped in the future.
@ -348,6 +327,24 @@ class BackendReplica(VersionedReplica):
"""
return self._actor.check_health()
def resource_requirements(
self) -> Tuple[Dict[str, float], Dict[str, float]]:
"""Returns required and currently available resources.
Only resources with nonzero requirements will be included in the
required dict and only resources in the required dict will be
included in the available dict (filtered for relevance).
"""
required = {
k: v
for k, v in self._actor.actor_resources.items() if v > 0
}
available = {
k: v
for k, v in ray.available_resources().items() if k in required
}
return required, available
class ReplicaStateContainer:
"""Container for mapping ReplicaStates to lists of BackendReplicas."""
@ -490,6 +487,8 @@ class BackendState:
self._target_replicas: Dict[BackendTag, int] = defaultdict(int)
self._backend_goals: Dict[BackendTag, GoalId] = dict()
self._target_versions: Dict[BackendTag, str] = dict()
self._prev_startup_warnings: Dict[BackendTag, float] = defaultdict(
float)
checkpoint = self._kv_store.get(CHECKPOINT_KEY)
if checkpoint is not None:
@ -867,14 +866,34 @@ class BackendState:
replica.stop()
replicas.add(ReplicaState.STOPPING, replica)
slow_start_replicas = []
for replica in replicas.pop(states=[ReplicaState.STARTING]):
if replica.check_started():
ready, slow_start = replica.check_started()
if ready:
# This replica should be now be added to handle's replica
# set.
replicas.add(ReplicaState.RUNNING, replica)
transitioned_backend_tags.add(backend_tag)
else:
replicas.add(ReplicaState.STARTING, replica)
if slow_start:
slow_start_replicas.append(replica)
if (len(slow_start_replicas)
and time.time() - self._prev_startup_warnings[backend_tag]
> SLOW_STARTUP_WARNING_PERIOD_S):
required, available = slow_start_replicas[
0].resource_requirements()
logger.warning(
f"Backend '{backend_tag}' has {len(slow_start_replicas)} "
"replicas that have taken more than "
f"{SLOW_STARTUP_WARNING_S}s to start up. This may be "
"caused by waiting for the cluster to auto-scale or "
"because the constructor is slow. Resources required "
f"for each replica: {required}, resources available: "
f"{available}.")
self._prev_startup_warnings[backend_tag] = time.time()
for replica in replicas.pop(states=[ReplicaState.STOPPING]):
if not replica.check_stopped():