mirror of
https://github.com/vale981/ray
synced 2025-03-06 18:41:40 -05:00
[serve] Add tests for backend_state versioning (#14748)
This commit is contained in:
parent
971855a353
commit
de598149d1
1 changed files with 224 additions and 102 deletions
|
@ -16,70 +16,61 @@ from ray.serve.common import (
|
|||
from ray.serve.backend_state import BackendState, ReplicaState
|
||||
|
||||
|
||||
def mock_replica_factory(mock_replicas):
|
||||
class MockReplicaActorWrapper:
|
||||
def __init__(self, actor_name: str, detached: bool,
|
||||
controller_name: str, replica_tag: ReplicaTag,
|
||||
backend_tag: BackendTag):
|
||||
self._actor_name = actor_name
|
||||
self._replica_tag = replica_tag
|
||||
self._backend_tag = backend_tag
|
||||
self._state = ReplicaState.SHOULD_START
|
||||
class MockReplicaActorWrapper:
|
||||
def __init__(self, actor_name: str, detached: bool, controller_name: str,
|
||||
replica_tag: ReplicaTag, backend_tag: BackendTag):
|
||||
self._actor_name = actor_name
|
||||
self._replica_tag = replica_tag
|
||||
self._backend_tag = backend_tag
|
||||
self._state = ReplicaState.SHOULD_START
|
||||
|
||||
mock_replicas.append(self)
|
||||
# Will be set when `start()` is called.
|
||||
self.started = False
|
||||
# Expected to be set in the test.
|
||||
self.ready = False
|
||||
# Will be set when `graceful_stop()` is called.
|
||||
self.stopped = False
|
||||
# Expected to be set in the test.
|
||||
self.done_stopping = False
|
||||
# Will be set when `force_stop()` is called.
|
||||
self.force_stopped_counter = 0
|
||||
# Will be cleaned up when `cleanup()` is called.
|
||||
self.cleaned_up = False
|
||||
|
||||
# Will be set when `start()` is called.
|
||||
self.started = False
|
||||
# Expected to be set in the test.
|
||||
self.ready = False
|
||||
# Will be set when `graceful_stop()` is called.
|
||||
self.stopped = False
|
||||
# Expected to be set in the test.
|
||||
self.done_stopping = False
|
||||
# Will be set when `force_stop()` is called.
|
||||
self.force_stopped_counter = 0
|
||||
# Will be cleaned up when `cleanup()` is called.
|
||||
self.cleaned_up = False
|
||||
@property
|
||||
def actor_handle(self) -> ActorHandle:
|
||||
return None
|
||||
|
||||
def __del__(self):
|
||||
mock_replicas.remove(self)
|
||||
def set_ready(self):
|
||||
self.ready = True
|
||||
|
||||
@property
|
||||
def actor_handle(self) -> ActorHandle:
|
||||
return None
|
||||
def set_done_stopping(self):
|
||||
self.done_stopping = True
|
||||
|
||||
def set_ready(self):
|
||||
self.ready = True
|
||||
def start(self, backend_info: BackendInfo):
|
||||
self.started = True
|
||||
|
||||
def set_done_stopping(self):
|
||||
self.done_stopping = True
|
||||
def check_ready(self) -> bool:
|
||||
assert self.started
|
||||
return self.ready
|
||||
|
||||
def start(self, backend_info: BackendInfo):
|
||||
self.started = True
|
||||
def resource_requirements(
|
||||
self) -> Tuple[Dict[str, float], Dict[str, float]]:
|
||||
assert self.started
|
||||
return {"REQUIRED_RESOURCE": 1.0}, {"AVAILABLE_RESOURCE": 1.0}
|
||||
|
||||
def check_ready(self) -> bool:
|
||||
assert self.started
|
||||
return self.ready
|
||||
def graceful_stop(self) -> None:
|
||||
assert self.started
|
||||
self.stopped = True
|
||||
|
||||
def resource_requirements(
|
||||
self) -> Tuple[Dict[str, float], Dict[str, float]]:
|
||||
assert self.started
|
||||
return {"REQUIRED_RESOURCE": 1.0}, {"AVAILABLE_RESOURCE": 1.0}
|
||||
def check_stopped(self) -> bool:
|
||||
return self.done_stopping
|
||||
|
||||
def graceful_stop(self) -> None:
|
||||
assert self.started
|
||||
self.stopped = True
|
||||
def force_stop(self):
|
||||
self.force_stopped_counter += 1
|
||||
|
||||
def check_stopped(self) -> bool:
|
||||
return self.done_stopping
|
||||
|
||||
def force_stop(self):
|
||||
self.force_stopped_counter += 1
|
||||
|
||||
def cleanup(self):
|
||||
self.cleaned_up = True
|
||||
|
||||
return MockReplicaActorWrapper
|
||||
def cleanup(self):
|
||||
self.cleaned_up = True
|
||||
|
||||
|
||||
def generate_configs(num_replicas: Optional[int] = 1
|
||||
|
@ -103,10 +94,9 @@ class MockTimer:
|
|||
@pytest.fixture
|
||||
def mock_backend_state() -> Tuple[BackendState, Mock, Mock]:
|
||||
timer = MockTimer()
|
||||
mock_replicas = []
|
||||
with patch(
|
||||
"ray.serve.backend_state.ActorReplicaWrapper",
|
||||
new=mock_replica_factory(mock_replicas)), patch(
|
||||
new=MockReplicaActorWrapper), patch(
|
||||
"time.time",
|
||||
new=timer.time), patch("ray.serve.kv_store.RayInternalKVStore"
|
||||
) as mock_kv_store, patch(
|
||||
|
@ -119,22 +109,22 @@ def mock_backend_state() -> Tuple[BackendState, Mock, Mock]:
|
|||
backend_state = BackendState("name", True, mock_kv_store,
|
||||
mock_long_poll, goal_manager)
|
||||
mock_checkpoint.return_value = None
|
||||
yield backend_state, timer, mock_replicas, goal_manager
|
||||
yield backend_state, timer, goal_manager
|
||||
|
||||
|
||||
def replica_count(backend_state, backend=None, states=None):
|
||||
total = 0
|
||||
def replicas(backend_state, backend=None, states=None):
|
||||
replicas = []
|
||||
for backend_tag, state_dict in backend_state._replicas.items():
|
||||
if backend is None or backend_tag == backend:
|
||||
for state, replica_list in state_dict.items():
|
||||
if states is None or state in states:
|
||||
total += len(replica_list)
|
||||
replicas.extend(replica_list)
|
||||
|
||||
return total
|
||||
return replicas
|
||||
|
||||
|
||||
def test_override_goals(mock_backend_state):
|
||||
backend_state, _, _, goal_manager = mock_backend_state
|
||||
backend_state, _, goal_manager = mock_backend_state
|
||||
|
||||
b_config_1, r_config_1 = generate_configs()
|
||||
initial_goal = backend_state.create_backend("tag1", b_config_1, r_config_1)
|
||||
|
@ -147,7 +137,7 @@ def test_override_goals(mock_backend_state):
|
|||
|
||||
|
||||
def test_return_existing_goal(mock_backend_state):
|
||||
backend_state, _, _, goal_manager = mock_backend_state
|
||||
backend_state, _, goal_manager = mock_backend_state
|
||||
|
||||
b_config_1, r_config_1 = generate_configs()
|
||||
initial_goal = backend_state.create_backend("tag1", b_config_1, r_config_1)
|
||||
|
@ -159,30 +149,30 @@ def test_return_existing_goal(mock_backend_state):
|
|||
|
||||
|
||||
def test_create_delete_single_replica(mock_backend_state):
|
||||
backend_state, timer, mock_replicas, goal_manager = mock_backend_state
|
||||
backend_state, timer, goal_manager = mock_backend_state
|
||||
|
||||
assert replica_count(backend_state) == 0
|
||||
assert len(replicas(backend_state)) == 0
|
||||
|
||||
b_config_1, r_config_1 = generate_configs()
|
||||
create_goal = backend_state.create_backend("tag1", b_config_1, r_config_1)
|
||||
|
||||
# Single replica should be created.
|
||||
backend_state.update()
|
||||
assert replica_count(backend_state) == 1
|
||||
assert replica_count(backend_state, states=[ReplicaState.STARTING]) == 1
|
||||
assert mock_replicas[0].started
|
||||
assert len(replicas(backend_state)) == 1
|
||||
assert len(replicas(backend_state, states=[ReplicaState.STARTING])) == 1
|
||||
assert replicas(backend_state)[0]._actor.started
|
||||
|
||||
# update() should not transition the state if the replica isn't ready.
|
||||
backend_state.update()
|
||||
assert replica_count(backend_state) == 1
|
||||
assert replica_count(backend_state, states=[ReplicaState.STARTING]) == 1
|
||||
mock_replicas[0].set_ready()
|
||||
assert len(replicas(backend_state)) == 1
|
||||
assert len(replicas(backend_state, states=[ReplicaState.STARTING])) == 1
|
||||
replicas(backend_state)[0]._actor.set_ready()
|
||||
assert not goal_manager.check_complete(create_goal)
|
||||
|
||||
# Now the replica should be marked running.
|
||||
backend_state.update()
|
||||
assert replica_count(backend_state) == 1
|
||||
assert replica_count(backend_state, states=[ReplicaState.RUNNING]) == 1
|
||||
assert len(replicas(backend_state)) == 1
|
||||
assert len(replicas(backend_state, states=[ReplicaState.RUNNING])) == 1
|
||||
|
||||
# TODO(edoakes): can we remove this extra update period for completing it?
|
||||
backend_state.update()
|
||||
|
@ -191,28 +181,28 @@ def test_create_delete_single_replica(mock_backend_state):
|
|||
# Removing the replica should transition it to stopping.
|
||||
delete_goal = backend_state.delete_backend("tag1")
|
||||
backend_state.update()
|
||||
assert replica_count(backend_state) == 1
|
||||
assert replica_count(backend_state, states=[ReplicaState.STOPPING]) == 1
|
||||
assert mock_replicas[0].stopped
|
||||
assert not mock_replicas[0].cleaned_up
|
||||
assert len(replicas(backend_state)) == 1
|
||||
assert len(replicas(backend_state, states=[ReplicaState.STOPPING])) == 1
|
||||
assert replicas(backend_state)[0]._actor.stopped
|
||||
assert not replicas(backend_state)[0]._actor.cleaned_up
|
||||
assert not goal_manager.check_complete(delete_goal)
|
||||
|
||||
# Once it's done stopping, replica should be removed.
|
||||
mock_replicas[0].set_done_stopping()
|
||||
replica = mock_replicas[0]
|
||||
replica = replicas(backend_state)[0]
|
||||
replica._actor.set_done_stopping()
|
||||
backend_state.update()
|
||||
assert replica.cleaned_up
|
||||
assert replica_count(backend_state) == 0
|
||||
assert len(replicas(backend_state)) == 0
|
||||
|
||||
# TODO(edoakes): can we remove this extra update period for completing it?
|
||||
backend_state.update()
|
||||
assert goal_manager.check_complete(delete_goal)
|
||||
replica._actor.cleaned_up
|
||||
|
||||
|
||||
def test_force_kill(mock_backend_state):
|
||||
backend_state, timer, mock_replicas, goal_manager = mock_backend_state
|
||||
backend_state, timer, goal_manager = mock_backend_state
|
||||
|
||||
assert replica_count(backend_state) == 0
|
||||
assert len(replicas(backend_state)) == 0
|
||||
|
||||
grace_period_s = 10
|
||||
b_config_1, r_config_1 = generate_configs()
|
||||
|
@ -221,52 +211,184 @@ def test_force_kill(mock_backend_state):
|
|||
# Create and delete the backend.
|
||||
backend_state.create_backend("tag1", b_config_1, r_config_1)
|
||||
backend_state.update()
|
||||
mock_replicas[0].set_ready()
|
||||
replicas(backend_state)[0]._actor.set_ready()
|
||||
backend_state.update()
|
||||
delete_goal = backend_state.delete_backend("tag1")
|
||||
backend_state.update()
|
||||
|
||||
# Replica should remain in STOPPING until it finishes.
|
||||
assert replica_count(backend_state) == 1
|
||||
assert replica_count(backend_state, states=[ReplicaState.STOPPING]) == 1
|
||||
assert mock_replicas[0].stopped
|
||||
assert len(replicas(backend_state)) == 1
|
||||
assert len(replicas(backend_state, states=[ReplicaState.STOPPING])) == 1
|
||||
assert replicas(backend_state)[0]._actor.stopped
|
||||
|
||||
backend_state.update()
|
||||
backend_state.update()
|
||||
|
||||
# force_stop shouldn't be called until after the timer.
|
||||
assert not mock_replicas[0].force_stopped_counter
|
||||
assert not mock_replicas[0].cleaned_up
|
||||
assert replica_count(backend_state) == 1
|
||||
assert replica_count(backend_state, states=[ReplicaState.STOPPING]) == 1
|
||||
assert not replicas(backend_state)[0]._actor.force_stopped_counter
|
||||
assert not replicas(backend_state)[0]._actor.cleaned_up
|
||||
assert len(replicas(backend_state)) == 1
|
||||
assert len(replicas(backend_state, states=[ReplicaState.STOPPING])) == 1
|
||||
|
||||
# Advance the timer, now the replica should be force stopped.
|
||||
timer.advance(grace_period_s + 0.1)
|
||||
backend_state.update()
|
||||
assert mock_replicas[0].force_stopped_counter == 1
|
||||
assert not mock_replicas[0].cleaned_up
|
||||
assert replica_count(backend_state) == 1
|
||||
assert replica_count(backend_state, states=[ReplicaState.STOPPING]) == 1
|
||||
assert replicas(backend_state)[0]._actor.force_stopped_counter == 1
|
||||
assert not replicas(backend_state)[0]._actor.cleaned_up
|
||||
assert len(replicas(backend_state)) == 1
|
||||
assert len(replicas(backend_state, states=[ReplicaState.STOPPING])) == 1
|
||||
assert not goal_manager.check_complete(delete_goal)
|
||||
|
||||
# Force stop should be called repeatedly until the replica stops.
|
||||
backend_state.update()
|
||||
assert mock_replicas[0].force_stopped_counter == 2
|
||||
assert not mock_replicas[0].cleaned_up
|
||||
assert replica_count(backend_state) == 1
|
||||
assert replica_count(backend_state, states=[ReplicaState.STOPPING]) == 1
|
||||
assert replicas(backend_state)[0]._actor.force_stopped_counter == 2
|
||||
assert not replicas(backend_state)[0]._actor.cleaned_up
|
||||
assert len(replicas(backend_state)) == 1
|
||||
assert len(replicas(backend_state, states=[ReplicaState.STOPPING])) == 1
|
||||
assert not goal_manager.check_complete(delete_goal)
|
||||
|
||||
# Once the replica is done stopping, it should be removed.
|
||||
mock_replicas[0].set_done_stopping()
|
||||
replica = mock_replicas[0]
|
||||
replica = replicas(backend_state)[0]
|
||||
replica._actor.set_done_stopping()
|
||||
backend_state.update()
|
||||
assert replica_count(backend_state) == 0
|
||||
assert len(replicas(backend_state)) == 0
|
||||
|
||||
# TODO(edoakes): can we remove this extra update period for completing it?
|
||||
assert replica.cleaned_up
|
||||
backend_state.update()
|
||||
assert goal_manager.check_complete(delete_goal)
|
||||
assert replica._actor.cleaned_up
|
||||
|
||||
|
||||
def test_redeploy_same_version(mock_backend_state):
|
||||
# Redeploying with the same version and code should do nothing.
|
||||
backend_state, timer, goal_manager = mock_backend_state
|
||||
|
||||
assert len(replicas(backend_state)) == 0
|
||||
|
||||
b_config_1, r_config_1 = generate_configs()
|
||||
goal_1 = backend_state.create_backend(
|
||||
"tag1", b_config_1, r_config_1, version="1")
|
||||
|
||||
backend_state.update()
|
||||
assert len(replicas(backend_state)) == 1
|
||||
assert len(replicas(backend_state, states=[ReplicaState.STARTING])) == 1
|
||||
assert replicas(backend_state)[0].version == "1"
|
||||
assert not goal_manager.check_complete(goal_1)
|
||||
|
||||
# Test redeploying while the initial deployment is still pending.
|
||||
_, r_config_2 = generate_configs()
|
||||
goal_2 = backend_state.create_backend(
|
||||
"tag1", b_config_1, r_config_2, version="1")
|
||||
assert goal_1 == goal_2
|
||||
assert not goal_manager.check_complete(goal_1)
|
||||
|
||||
backend_state.update()
|
||||
assert len(replicas(backend_state)) == 1
|
||||
assert len(replicas(backend_state, states=[ReplicaState.STARTING])) == 1
|
||||
assert replicas(backend_state)[0].version == "1"
|
||||
|
||||
# Mark the replica ready. After this, the initial goal should be complete.
|
||||
replicas(backend_state)[0]._actor.set_ready()
|
||||
backend_state.update()
|
||||
assert len(replicas(backend_state)) == 1
|
||||
assert len(replicas(backend_state, states=[ReplicaState.RUNNING])) == 1
|
||||
assert replicas(backend_state)[0].version == "1"
|
||||
|
||||
backend_state.update()
|
||||
assert goal_manager.check_complete(goal_1)
|
||||
|
||||
# Test redeploying after the initial deployment has finished.
|
||||
same_version_goal = backend_state.create_backend(
|
||||
"tag1", b_config_1, r_config_1, version="1")
|
||||
assert goal_manager.check_complete(same_version_goal)
|
||||
assert len(replicas(backend_state)) == 1
|
||||
assert len(replicas(backend_state, states=[ReplicaState.RUNNING])) == 1
|
||||
assert replicas(backend_state)[0].version == "1"
|
||||
assert goal_manager.check_complete(goal_2)
|
||||
|
||||
|
||||
def test_redeploy_new_version(mock_backend_state):
|
||||
# Redeploying with a new version should start a new replica.
|
||||
backend_state, timer, goal_manager = mock_backend_state
|
||||
|
||||
assert len(replicas(backend_state)) == 0
|
||||
|
||||
b_config_1, r_config_1 = generate_configs()
|
||||
goal_1 = backend_state.create_backend(
|
||||
"tag1", b_config_1, r_config_1, version="1")
|
||||
|
||||
backend_state.update()
|
||||
assert len(replicas(backend_state)) == 1
|
||||
assert len(replicas(backend_state, states=[ReplicaState.STARTING])) == 1
|
||||
assert replicas(backend_state)[0].version == "1"
|
||||
assert not goal_manager.check_complete(goal_1)
|
||||
|
||||
# Test redeploying while the initial deployment is still pending.
|
||||
_, r_config_2 = generate_configs()
|
||||
goal_2 = backend_state.create_backend(
|
||||
"tag1", b_config_1, r_config_2, version="2")
|
||||
assert goal_1 != goal_2
|
||||
assert goal_manager.check_complete(goal_1)
|
||||
assert not goal_manager.check_complete(goal_2)
|
||||
|
||||
# The initial replica should be stopping and the new replica starting.
|
||||
backend_state.update()
|
||||
assert len(replicas(backend_state)) == 2
|
||||
assert len(replicas(backend_state, states=[ReplicaState.STARTING])) == 1
|
||||
assert len(replicas(backend_state, states=[ReplicaState.STOPPING])) == 1
|
||||
assert replicas(
|
||||
backend_state, states=[ReplicaState.STOPPING])[0].version == "1"
|
||||
assert replicas(
|
||||
backend_state, states=[ReplicaState.STARTING])[0].version == "2"
|
||||
|
||||
# The initial replica should be gone and the new replica running.
|
||||
replicas(
|
||||
backend_state,
|
||||
states=[ReplicaState.STOPPING])[0]._actor.set_done_stopping()
|
||||
replicas(
|
||||
backend_state, states=[ReplicaState.STARTING])[0]._actor.set_ready()
|
||||
backend_state.update()
|
||||
assert len(replicas(backend_state)) == 1
|
||||
assert len(replicas(backend_state, states=[ReplicaState.RUNNING])) == 1
|
||||
assert replicas(
|
||||
backend_state, states=[ReplicaState.RUNNING])[0].version == "2"
|
||||
|
||||
backend_state.update()
|
||||
assert goal_manager.check_complete(goal_2)
|
||||
|
||||
# Now deploy a third version after the transition has finished.
|
||||
_, r_config_3 = generate_configs()
|
||||
goal_3 = backend_state.create_backend(
|
||||
"tag1", b_config_1, r_config_3, version="3")
|
||||
assert not goal_manager.check_complete(goal_3)
|
||||
|
||||
backend_state.update()
|
||||
assert len(replicas(backend_state)) == 2
|
||||
assert len(replicas(backend_state, states=[ReplicaState.STOPPING])) == 1
|
||||
assert len(replicas(backend_state, states=[ReplicaState.STARTING])) == 1
|
||||
assert replicas(
|
||||
backend_state, states=[ReplicaState.STOPPING])[0].version == "2"
|
||||
assert replicas(
|
||||
backend_state, states=[ReplicaState.STARTING])[0].version == "3"
|
||||
|
||||
replicas(
|
||||
backend_state,
|
||||
states=[ReplicaState.STOPPING])[0]._actor.set_done_stopping()
|
||||
replicas(
|
||||
backend_state, states=[ReplicaState.STARTING])[0]._actor.set_ready()
|
||||
backend_state.update()
|
||||
assert len(replicas(backend_state)) == 1
|
||||
assert len(replicas(backend_state, states=[ReplicaState.RUNNING])) == 1
|
||||
assert replicas(
|
||||
backend_state, states=[ReplicaState.RUNNING])[0].version == "3"
|
||||
|
||||
backend_state.update()
|
||||
assert goal_manager.check_complete(goal_3)
|
||||
|
||||
|
||||
def test_redeploy_new_config():
|
||||
# TODO(edoakes): we should test the behavior when only the config changes.
|
||||
pass
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
|
Loading…
Add table
Reference in a new issue