mirror of
https://github.com/vale981/ray
synced 2025-03-06 02:21:39 -05:00
[serve] Remove unnecessary STOPPED ReplicaState (#16783)
This commit is contained in:
parent
0841b6d2de
commit
691f0a1dd1
1 changed files with 7 additions and 24 deletions
|
@ -30,7 +30,6 @@ class ReplicaState(Enum):
|
|||
RUNNING = 3
|
||||
SHOULD_STOP = 4
|
||||
STOPPING = 5
|
||||
STOPPED = 6
|
||||
|
||||
|
||||
ALL_REPLICA_STATES = list(ReplicaState)
|
||||
|
@ -57,7 +56,6 @@ class ActorReplicaWrapper:
|
|||
|
||||
self._startup_obj_ref = None
|
||||
self._drain_obj_ref = None
|
||||
self._stopped = False
|
||||
self._actor_resources = None
|
||||
self._health_check_ref = None
|
||||
|
||||
|
@ -126,32 +124,24 @@ class ActorReplicaWrapper:
|
|||
|
||||
def graceful_stop(self) -> None:
|
||||
"""Request the actor to exit gracefully."""
|
||||
# NOTE: the replicas may already be stopped if we failed
|
||||
# after stopping them but before writing a checkpoint.
|
||||
if self._stopped:
|
||||
return
|
||||
|
||||
try:
|
||||
handle = ray.get_actor(self._actor_name)
|
||||
self._drain_obj_ref = handle.drain_pending_queries.remote()
|
||||
except ValueError:
|
||||
self._stopped = True
|
||||
pass
|
||||
|
||||
def check_stopped(self) -> bool:
|
||||
"""Check if the actor has exited."""
|
||||
if self._stopped:
|
||||
return True
|
||||
|
||||
try:
|
||||
handle = ray.get_actor(self._actor_name)
|
||||
ready, _ = ray.wait([self._drain_obj_ref], timeout=0)
|
||||
self._stopped = len(ready) == 1
|
||||
if self._stopped:
|
||||
stopped = len(ready) == 1
|
||||
if stopped:
|
||||
ray.kill(handle, no_restart=True)
|
||||
except ValueError:
|
||||
self._stopped = True
|
||||
stopped = True
|
||||
|
||||
return self._stopped
|
||||
return stopped
|
||||
|
||||
def check_health(self) -> bool:
|
||||
"""Check if the actor is healthy."""
|
||||
|
@ -303,18 +293,11 @@ class BackendReplica(VersionedReplica):
|
|||
) + self._graceful_shutdown_timeout_s
|
||||
|
||||
def check_stopped(self) -> bool:
|
||||
"""Check if the replica has stopped. If so, transition to STOPPED.
|
||||
|
||||
Should handle the case where the replica has already stopped.
|
||||
"""
|
||||
if self._state == ReplicaState.STOPPED:
|
||||
return True
|
||||
"""Check if the replica has finished stopping."""
|
||||
assert self._state == ReplicaState.STOPPING, (
|
||||
f"State must be {ReplicaState.STOPPING}, *not* {self._state}")
|
||||
|
||||
stopped = self._actor.check_stopped()
|
||||
if stopped:
|
||||
self._state = ReplicaState.STOPPED
|
||||
if self._actor.check_stopped():
|
||||
# Clean up any associated resources (e.g., placement group).
|
||||
self._actor.cleanup()
|
||||
return True
|
||||
|
|
Loading…
Add table
Reference in a new issue