[serve] Fix ray serve shutdown to properly go through controller (#16524)

Jiao 2021-06-18 15:18:04 -07:00 committed by GitHub
parent 3ba1cb851e
commit 39cc81c633
4 changed files with 88 additions and 9 deletions


@@ -135,7 +135,9 @@ class Client:
         instance.
         """
         if (not self._shutdown) and ray.is_initialized():
-            ray.get(self._controller.shutdown.remote())
+            for goal_id in ray.get(self._controller.shutdown.remote()):
+                self._wait_for_goal(goal_id)
             ray.kill(self._controller, no_restart=True)
             # Wait for the named actor entry to be removed as well.
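The client-side change turns shutdown from fire-and-forget into a blocking call: the controller now returns one GoalId per backend being torn down, and the client waits on each goal before killing the controller actor. The diff does not show the body of _wait_for_goal; below is a minimal polling sketch of such a wait, reusing the goal_manager.check_complete API exercised by the tests in this commit (the helper name, timeout, and poll interval are assumptions):

import time

def wait_for_goal(goal_manager, goal_id, timeout_s=30.0):
    # Hypothetical stand-in for Client._wait_for_goal: poll until the
    # goal is marked complete, failing loudly on timeout.
    deadline = time.time() + timeout_s
    while time.time() < deadline:
        if goal_manager.check_complete(goal_id):
            return
        time.sleep(0.1)
    raise TimeoutError(f"Goal {goal_id} not completed within {timeout_s}s")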


@@ -319,7 +319,7 @@ class BackendReplica(VersionedReplica):
             self._actor.cleanup()
             return True
-        timeout_passed = time.time() > self._shutdown_deadline
+        timeout_passed = time.time() >= self._shutdown_deadline
         if timeout_passed:
             # Grace period has passed; kill it forcefully.
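The > to >= flip is the subtle part of this hunk: the tests drive replicas with a mock timer, so "now" can land exactly on the shutdown deadline, and with a strict comparison the force-kill would never fire at that instant. A tiny sketch of the boundary case (the function is illustrative, not from the diff):

def force_kill_due(now: float, shutdown_deadline: float) -> bool:
    # With `>` the deadline instant itself does not trigger a kill,
    # which stalls a test whose mock clock stops exactly there.
    return now >= shutdown_deadline

assert force_kill_due(now=10.0, shutdown_deadline=10.0)  # `>` would be False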
@@ -513,13 +513,33 @@ class BackendState:
         self._notify_backend_configs_changed()
         self._notify_replica_handles_changed()

-    def shutdown(self) -> None:
-        for replica_dict in self.get_running_replica_handles().values():
-            for replica in replica_dict.values():
-                ray.kill(replica, no_restart=True)
+    def shutdown(self) -> List[GoalId]:
+        """
+        Shut down all running replicas by notifying the controller, and
+        leave it to the controller event loop to take action afterwards.
+
+        Once the shutdown signal is received, no new deployments or
+        replicas can be created. Sending multiple shutdown signals is
+        no different from sending one.
+        """
+        shutdown_goals = []
+        for backend_tag, _ in self._replicas.items():
+            goal = self.delete_backend(backend_tag, force_kill=True)
+            if goal is not None:
+                shutdown_goals.append(goal)
+
+        # TODO(jiaodong): This might not be 100% safe, since we delete
+        # everything without ensuring that all shutdown goals have
+        # completed yet. Needs to be addressed in follow-up PRs.
+        self._kv_store.delete(CHECKPOINT_KEY)
+
+        # TODO(jiaodong): Add logic to prevent new replicas from being
+        # created once the shutdown signal is sent.
+        return shutdown_goals

     def _checkpoint(self) -> None:
         self._kv_store.put(
             CHECKPOINT_KEY,
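Rather than ray.kill-ing replica handles directly, shutdown now funnels through delete_backend(force_kill=True), so teardown rides the same state machine as an ordinary deletion and each backend's removal is observable as a goal. A minimal synchronous sketch of the goal bookkeeping this relies on, assuming the create_goal/check_complete pair visible elsewhere in this diff plus a hypothetical complete_goal counterpart (Serve's real goal manager is asyncio-based, and its GoalId is not a plain int):

import itertools

GoalId = int  # stand-in for the sketch; Serve defines its own GoalId type

class SketchGoalManager:
    # Each goal is pending from create_goal() until complete_goal().
    def __init__(self):
        self._ids = itertools.count()
        self._pending = set()

    def create_goal(self) -> GoalId:
        goal_id = next(self._ids)
        self._pending.add(goal_id)
        return goal_id

    def complete_goal(self, goal_id: GoalId) -> None:
        # Assumed counterpart: invoked when the desired state is reached.
        self._pending.discard(goal_id)

    def check_complete(self, goal_id: GoalId) -> bool:
        return goal_id not in self._pending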
@@ -572,6 +592,15 @@ class BackendState:

     def _set_backend_goal(self, backend_tag: BackendTag,
                           backend_info: Optional[BackendInfo]) -> None:
+        """
+        Set the desired state for a given backend, identified by tag.
+
+        Args:
+            backend_tag (BackendTag): Identifier of a backend.
+            backend_info (Optional[BackendInfo]): Contains the backend and
+                replica config. If None, the target backend is being
+                marked as shutting down.
+        """
         existing_goal_id = self._backend_goals.get(backend_tag)
         new_goal_id = self._goal_manager.create_goal()
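The docstring pins down the convention that backend_info=None expresses deletion as a target state, which is exactly what shutdown leans on. The hunk cuts off after the two goal lookups; a hedged sketch of the goal-swap pattern those two lines suggest (the superseding behavior is an assumption, not shown in the diff):

def set_backend_goal_sketch(backend_goals, goal_manager, backend_tag,
                            backend_info):
    # Each backend keeps at most one outstanding goal; setting a new
    # target state replaces whatever goal was previously pending.
    existing_goal_id = backend_goals.get(backend_tag)
    new_goal_id = goal_manager.create_goal()
    backend_goals[backend_tag] = new_goal_id
    # Assumption: the superseded goal is resolved so old waiters unblock.
    if existing_goal_id is not None:
        goal_manager.complete_goal(existing_goal_id)
    return new_goal_id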


@@ -264,13 +264,15 @@ class ServeController:
         """Return the HTTP proxy configuration."""
         return self.http_state.get_config()

-    async def shutdown(self) -> None:
+    async def shutdown(self) -> List[GoalId]:
         """Shuts down the serve instance completely."""
         async with self.write_lock:
-            self.backend_state.shutdown()
+            goal_ids = self.backend_state.shutdown()
             self.endpoint_state.shutdown()
             self.http_state.shutdown()
+            return goal_ids

     async def deploy(
             self, name: str, backend_config: BackendConfig,
             replica_config: ReplicaConfig, python_methods: List[str],
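With the controller propagating the goals upward, the three pieces compose end to end: BackendState produces the goals, ServeController returns them across the actor boundary, and the client blocks on them before killing the controller. A sketch of the full call flow as a plain function, where controller is a handle to the ServeController actor and wait_for_goal is any one-argument callable, e.g. a functools.partial of the polling helper sketched earlier:

import ray

def shutdown_serve(controller, wait_for_goal):
    # One GoalId per backend the controller is force-deleting.
    goal_ids = ray.get(controller.shutdown.remote())
    for goal_id in goal_ids:
        wait_for_goal(goal_id)
    # Only after every goal resolves is it safe to tear down the
    # controller actor itself.
    ray.kill(controller, no_restart=True)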


@@ -394,7 +394,7 @@ def test_create_delete_single_replica(mock_backend_state):
     backend_state.update()
     assert len(backend_state._replicas) == 0
     assert goal_manager.check_complete(delete_goal)
-    replica._actor.cleaned_up
+    assert replica._actor.cleaned_up


 def test_force_kill(mock_backend_state):
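The one-line test fix above deserves a note: a bare attribute access is evaluated and discarded, so the old line could never fail. A tiny illustration of the trap (names are illustrative):

class FakeActor:
    cleaned_up = False

actor = FakeActor()
actor.cleaned_up          # no-op: the result is silently discarded
assert actor.cleaned_up   # AssertionError: this is the real check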
@@ -1423,6 +1423,52 @@ def test_health_check(mock_backend_state):
     check_counts(backend_state, total=2, by_state=[(ReplicaState.RUNNING, 2)])


+def test_shutdown(mock_backend_state):
+    """
+    Test that shutdown waits for all backends to be deleted and that the
+    backends are force-killed without a grace period.
+    """
+    backend_state, timer, goal_manager = mock_backend_state
+
+    b_info_1 = backend_info()
+    create_goal, updating = backend_state.deploy_backend(TEST_TAG, b_info_1)
+
+    # A single replica should be created.
+    backend_state.update()
+    check_counts(backend_state, total=1, by_state=[(ReplicaState.STARTING, 1)])
+    backend_state._replicas[TEST_TAG].get()[0]._actor.set_ready()
+
+    # Now the replica should be marked running.
+    backend_state.update()
+    check_counts(backend_state, total=1, by_state=[(ReplicaState.RUNNING, 1)])
+
+    # Test the shutdown flow.
+    assert not backend_state._replicas[TEST_TAG].get()[0]._actor.stopped
+
+    shutdown_goal = backend_state.shutdown()[0]
+
+    backend_state.update()
+
+    check_counts(backend_state, total=1, by_state=[(ReplicaState.STOPPING, 1)])
+    assert backend_state._replicas[TEST_TAG].get()[0]._actor.stopped
+    assert backend_state._replicas[TEST_TAG].get()[
+        0]._actor.force_stopped_counter == 1
+    assert not backend_state._replicas[TEST_TAG].get()[0]._actor.cleaned_up
+    assert not goal_manager.check_complete(shutdown_goal)
+
+    # Once it's done stopping, the replica should be removed.
+    replica = backend_state._replicas[TEST_TAG].get()[0]
+    replica._actor.set_done_stopping()
+    backend_state.update()
+    check_counts(backend_state, total=0)
+
+    # TODO(edoakes): can we remove this extra update period for completing it?
+    backend_state.update()
+    assert len(backend_state._replicas) == 0
+    assert goal_manager.check_complete(shutdown_goal)
+    assert replica._actor.cleaned_up
+
+
 if __name__ == "__main__":
     import sys
     sys.exit(pytest.main(["-v", "-s", __file__]))