From de598149d1a8a7dba38580cc26a46ac1e73df9d0 Mon Sep 17 00:00:00 2001 From: Edward Oakes Date: Thu, 18 Mar 2021 11:08:45 -0500 Subject: [PATCH] [serve] Add tests for backend_state versioning (#14748) --- python/ray/serve/tests/test_backend_state.py | 326 +++++++++++++------ 1 file changed, 224 insertions(+), 102 deletions(-) diff --git a/python/ray/serve/tests/test_backend_state.py b/python/ray/serve/tests/test_backend_state.py index 9aad5ece7..68a0d6f33 100644 --- a/python/ray/serve/tests/test_backend_state.py +++ b/python/ray/serve/tests/test_backend_state.py @@ -16,70 +16,61 @@ from ray.serve.common import ( from ray.serve.backend_state import BackendState, ReplicaState -def mock_replica_factory(mock_replicas): - class MockReplicaActorWrapper: - def __init__(self, actor_name: str, detached: bool, - controller_name: str, replica_tag: ReplicaTag, - backend_tag: BackendTag): - self._actor_name = actor_name - self._replica_tag = replica_tag - self._backend_tag = backend_tag - self._state = ReplicaState.SHOULD_START +class MockReplicaActorWrapper: + def __init__(self, actor_name: str, detached: bool, controller_name: str, + replica_tag: ReplicaTag, backend_tag: BackendTag): + self._actor_name = actor_name + self._replica_tag = replica_tag + self._backend_tag = backend_tag + self._state = ReplicaState.SHOULD_START - mock_replicas.append(self) + # Will be set when `start()` is called. + self.started = False + # Expected to be set in the test. + self.ready = False + # Will be set when `graceful_stop()` is called. + self.stopped = False + # Expected to be set in the test. + self.done_stopping = False + # Will be set when `force_stop()` is called. + self.force_stopped_counter = 0 + # Will be cleaned up when `cleanup()` is called. + self.cleaned_up = False - # Will be set when `start()` is called. - self.started = False - # Expected to be set in the test. - self.ready = False - # Will be set when `graceful_stop()` is called. - self.stopped = False - # Expected to be set in the test. - self.done_stopping = False - # Will be set when `force_stop()` is called. - self.force_stopped_counter = 0 - # Will be cleaned up when `cleanup()` is called. - self.cleaned_up = False + @property + def actor_handle(self) -> ActorHandle: + return None - def __del__(self): - mock_replicas.remove(self) + def set_ready(self): + self.ready = True - @property - def actor_handle(self) -> ActorHandle: - return None + def set_done_stopping(self): + self.done_stopping = True - def set_ready(self): - self.ready = True + def start(self, backend_info: BackendInfo): + self.started = True - def set_done_stopping(self): - self.done_stopping = True + def check_ready(self) -> bool: + assert self.started + return self.ready - def start(self, backend_info: BackendInfo): - self.started = True + def resource_requirements( + self) -> Tuple[Dict[str, float], Dict[str, float]]: + assert self.started + return {"REQUIRED_RESOURCE": 1.0}, {"AVAILABLE_RESOURCE": 1.0} - def check_ready(self) -> bool: - assert self.started - return self.ready + def graceful_stop(self) -> None: + assert self.started + self.stopped = True - def resource_requirements( - self) -> Tuple[Dict[str, float], Dict[str, float]]: - assert self.started - return {"REQUIRED_RESOURCE": 1.0}, {"AVAILABLE_RESOURCE": 1.0} + def check_stopped(self) -> bool: + return self.done_stopping - def graceful_stop(self) -> None: - assert self.started - self.stopped = True + def force_stop(self): + self.force_stopped_counter += 1 - def check_stopped(self) -> bool: - return self.done_stopping - - def force_stop(self): - self.force_stopped_counter += 1 - - def cleanup(self): - self.cleaned_up = True - - return MockReplicaActorWrapper + def cleanup(self): + self.cleaned_up = True def generate_configs(num_replicas: Optional[int] = 1 @@ -103,10 +94,9 @@ class MockTimer: @pytest.fixture def mock_backend_state() -> Tuple[BackendState, Mock, Mock]: timer = MockTimer() - mock_replicas = [] with patch( "ray.serve.backend_state.ActorReplicaWrapper", - new=mock_replica_factory(mock_replicas)), patch( + new=MockReplicaActorWrapper), patch( "time.time", new=timer.time), patch("ray.serve.kv_store.RayInternalKVStore" ) as mock_kv_store, patch( @@ -119,22 +109,22 @@ def mock_backend_state() -> Tuple[BackendState, Mock, Mock]: backend_state = BackendState("name", True, mock_kv_store, mock_long_poll, goal_manager) mock_checkpoint.return_value = None - yield backend_state, timer, mock_replicas, goal_manager + yield backend_state, timer, goal_manager -def replica_count(backend_state, backend=None, states=None): - total = 0 +def replicas(backend_state, backend=None, states=None): + replicas = [] for backend_tag, state_dict in backend_state._replicas.items(): if backend is None or backend_tag == backend: for state, replica_list in state_dict.items(): if states is None or state in states: - total += len(replica_list) + replicas.extend(replica_list) - return total + return replicas def test_override_goals(mock_backend_state): - backend_state, _, _, goal_manager = mock_backend_state + backend_state, _, goal_manager = mock_backend_state b_config_1, r_config_1 = generate_configs() initial_goal = backend_state.create_backend("tag1", b_config_1, r_config_1) @@ -147,7 +137,7 @@ def test_override_goals(mock_backend_state): def test_return_existing_goal(mock_backend_state): - backend_state, _, _, goal_manager = mock_backend_state + backend_state, _, goal_manager = mock_backend_state b_config_1, r_config_1 = generate_configs() initial_goal = backend_state.create_backend("tag1", b_config_1, r_config_1) @@ -159,30 +149,30 @@ def test_return_existing_goal(mock_backend_state): def test_create_delete_single_replica(mock_backend_state): - backend_state, timer, mock_replicas, goal_manager = mock_backend_state + backend_state, timer, goal_manager = mock_backend_state - assert replica_count(backend_state) == 0 + assert len(replicas(backend_state)) == 0 b_config_1, r_config_1 = generate_configs() create_goal = backend_state.create_backend("tag1", b_config_1, r_config_1) # Single replica should be created. backend_state.update() - assert replica_count(backend_state) == 1 - assert replica_count(backend_state, states=[ReplicaState.STARTING]) == 1 - assert mock_replicas[0].started + assert len(replicas(backend_state)) == 1 + assert len(replicas(backend_state, states=[ReplicaState.STARTING])) == 1 + assert replicas(backend_state)[0]._actor.started # update() should not transition the state if the replica isn't ready. backend_state.update() - assert replica_count(backend_state) == 1 - assert replica_count(backend_state, states=[ReplicaState.STARTING]) == 1 - mock_replicas[0].set_ready() + assert len(replicas(backend_state)) == 1 + assert len(replicas(backend_state, states=[ReplicaState.STARTING])) == 1 + replicas(backend_state)[0]._actor.set_ready() assert not goal_manager.check_complete(create_goal) # Now the replica should be marked running. backend_state.update() - assert replica_count(backend_state) == 1 - assert replica_count(backend_state, states=[ReplicaState.RUNNING]) == 1 + assert len(replicas(backend_state)) == 1 + assert len(replicas(backend_state, states=[ReplicaState.RUNNING])) == 1 # TODO(edoakes): can we remove this extra update period for completing it? backend_state.update() @@ -191,28 +181,28 @@ def test_create_delete_single_replica(mock_backend_state): # Removing the replica should transition it to stopping. delete_goal = backend_state.delete_backend("tag1") backend_state.update() - assert replica_count(backend_state) == 1 - assert replica_count(backend_state, states=[ReplicaState.STOPPING]) == 1 - assert mock_replicas[0].stopped - assert not mock_replicas[0].cleaned_up + assert len(replicas(backend_state)) == 1 + assert len(replicas(backend_state, states=[ReplicaState.STOPPING])) == 1 + assert replicas(backend_state)[0]._actor.stopped + assert not replicas(backend_state)[0]._actor.cleaned_up assert not goal_manager.check_complete(delete_goal) # Once it's done stopping, replica should be removed. - mock_replicas[0].set_done_stopping() - replica = mock_replicas[0] + replica = replicas(backend_state)[0] + replica._actor.set_done_stopping() backend_state.update() - assert replica.cleaned_up - assert replica_count(backend_state) == 0 + assert len(replicas(backend_state)) == 0 # TODO(edoakes): can we remove this extra update period for completing it? backend_state.update() assert goal_manager.check_complete(delete_goal) + replica._actor.cleaned_up def test_force_kill(mock_backend_state): - backend_state, timer, mock_replicas, goal_manager = mock_backend_state + backend_state, timer, goal_manager = mock_backend_state - assert replica_count(backend_state) == 0 + assert len(replicas(backend_state)) == 0 grace_period_s = 10 b_config_1, r_config_1 = generate_configs() @@ -221,52 +211,184 @@ def test_force_kill(mock_backend_state): # Create and delete the backend. backend_state.create_backend("tag1", b_config_1, r_config_1) backend_state.update() - mock_replicas[0].set_ready() + replicas(backend_state)[0]._actor.set_ready() backend_state.update() delete_goal = backend_state.delete_backend("tag1") backend_state.update() # Replica should remain in STOPPING until it finishes. - assert replica_count(backend_state) == 1 - assert replica_count(backend_state, states=[ReplicaState.STOPPING]) == 1 - assert mock_replicas[0].stopped + assert len(replicas(backend_state)) == 1 + assert len(replicas(backend_state, states=[ReplicaState.STOPPING])) == 1 + assert replicas(backend_state)[0]._actor.stopped backend_state.update() backend_state.update() # force_stop shouldn't be called until after the timer. - assert not mock_replicas[0].force_stopped_counter - assert not mock_replicas[0].cleaned_up - assert replica_count(backend_state) == 1 - assert replica_count(backend_state, states=[ReplicaState.STOPPING]) == 1 + assert not replicas(backend_state)[0]._actor.force_stopped_counter + assert not replicas(backend_state)[0]._actor.cleaned_up + assert len(replicas(backend_state)) == 1 + assert len(replicas(backend_state, states=[ReplicaState.STOPPING])) == 1 # Advance the timer, now the replica should be force stopped. timer.advance(grace_period_s + 0.1) backend_state.update() - assert mock_replicas[0].force_stopped_counter == 1 - assert not mock_replicas[0].cleaned_up - assert replica_count(backend_state) == 1 - assert replica_count(backend_state, states=[ReplicaState.STOPPING]) == 1 + assert replicas(backend_state)[0]._actor.force_stopped_counter == 1 + assert not replicas(backend_state)[0]._actor.cleaned_up + assert len(replicas(backend_state)) == 1 + assert len(replicas(backend_state, states=[ReplicaState.STOPPING])) == 1 assert not goal_manager.check_complete(delete_goal) # Force stop should be called repeatedly until the replica stops. backend_state.update() - assert mock_replicas[0].force_stopped_counter == 2 - assert not mock_replicas[0].cleaned_up - assert replica_count(backend_state) == 1 - assert replica_count(backend_state, states=[ReplicaState.STOPPING]) == 1 + assert replicas(backend_state)[0]._actor.force_stopped_counter == 2 + assert not replicas(backend_state)[0]._actor.cleaned_up + assert len(replicas(backend_state)) == 1 + assert len(replicas(backend_state, states=[ReplicaState.STOPPING])) == 1 assert not goal_manager.check_complete(delete_goal) # Once the replica is done stopping, it should be removed. - mock_replicas[0].set_done_stopping() - replica = mock_replicas[0] + replica = replicas(backend_state)[0] + replica._actor.set_done_stopping() backend_state.update() - assert replica_count(backend_state) == 0 + assert len(replicas(backend_state)) == 0 # TODO(edoakes): can we remove this extra update period for completing it? - assert replica.cleaned_up backend_state.update() assert goal_manager.check_complete(delete_goal) + assert replica._actor.cleaned_up + + +def test_redeploy_same_version(mock_backend_state): + # Redeploying with the same version and code should do nothing. + backend_state, timer, goal_manager = mock_backend_state + + assert len(replicas(backend_state)) == 0 + + b_config_1, r_config_1 = generate_configs() + goal_1 = backend_state.create_backend( + "tag1", b_config_1, r_config_1, version="1") + + backend_state.update() + assert len(replicas(backend_state)) == 1 + assert len(replicas(backend_state, states=[ReplicaState.STARTING])) == 1 + assert replicas(backend_state)[0].version == "1" + assert not goal_manager.check_complete(goal_1) + + # Test redeploying while the initial deployment is still pending. + _, r_config_2 = generate_configs() + goal_2 = backend_state.create_backend( + "tag1", b_config_1, r_config_2, version="1") + assert goal_1 == goal_2 + assert not goal_manager.check_complete(goal_1) + + backend_state.update() + assert len(replicas(backend_state)) == 1 + assert len(replicas(backend_state, states=[ReplicaState.STARTING])) == 1 + assert replicas(backend_state)[0].version == "1" + + # Mark the replica ready. After this, the initial goal should be complete. + replicas(backend_state)[0]._actor.set_ready() + backend_state.update() + assert len(replicas(backend_state)) == 1 + assert len(replicas(backend_state, states=[ReplicaState.RUNNING])) == 1 + assert replicas(backend_state)[0].version == "1" + + backend_state.update() + assert goal_manager.check_complete(goal_1) + + # Test redeploying after the initial deployment has finished. + same_version_goal = backend_state.create_backend( + "tag1", b_config_1, r_config_1, version="1") + assert goal_manager.check_complete(same_version_goal) + assert len(replicas(backend_state)) == 1 + assert len(replicas(backend_state, states=[ReplicaState.RUNNING])) == 1 + assert replicas(backend_state)[0].version == "1" + assert goal_manager.check_complete(goal_2) + + +def test_redeploy_new_version(mock_backend_state): + # Redeploying with a new version should start a new replica. + backend_state, timer, goal_manager = mock_backend_state + + assert len(replicas(backend_state)) == 0 + + b_config_1, r_config_1 = generate_configs() + goal_1 = backend_state.create_backend( + "tag1", b_config_1, r_config_1, version="1") + + backend_state.update() + assert len(replicas(backend_state)) == 1 + assert len(replicas(backend_state, states=[ReplicaState.STARTING])) == 1 + assert replicas(backend_state)[0].version == "1" + assert not goal_manager.check_complete(goal_1) + + # Test redeploying while the initial deployment is still pending. + _, r_config_2 = generate_configs() + goal_2 = backend_state.create_backend( + "tag1", b_config_1, r_config_2, version="2") + assert goal_1 != goal_2 + assert goal_manager.check_complete(goal_1) + assert not goal_manager.check_complete(goal_2) + + # The initial replica should be stopping and the new replica starting. + backend_state.update() + assert len(replicas(backend_state)) == 2 + assert len(replicas(backend_state, states=[ReplicaState.STARTING])) == 1 + assert len(replicas(backend_state, states=[ReplicaState.STOPPING])) == 1 + assert replicas( + backend_state, states=[ReplicaState.STOPPING])[0].version == "1" + assert replicas( + backend_state, states=[ReplicaState.STARTING])[0].version == "2" + + # The initial replica should be gone and the new replica running. + replicas( + backend_state, + states=[ReplicaState.STOPPING])[0]._actor.set_done_stopping() + replicas( + backend_state, states=[ReplicaState.STARTING])[0]._actor.set_ready() + backend_state.update() + assert len(replicas(backend_state)) == 1 + assert len(replicas(backend_state, states=[ReplicaState.RUNNING])) == 1 + assert replicas( + backend_state, states=[ReplicaState.RUNNING])[0].version == "2" + + backend_state.update() + assert goal_manager.check_complete(goal_2) + + # Now deploy a third version after the transition has finished. + _, r_config_3 = generate_configs() + goal_3 = backend_state.create_backend( + "tag1", b_config_1, r_config_3, version="3") + assert not goal_manager.check_complete(goal_3) + + backend_state.update() + assert len(replicas(backend_state)) == 2 + assert len(replicas(backend_state, states=[ReplicaState.STOPPING])) == 1 + assert len(replicas(backend_state, states=[ReplicaState.STARTING])) == 1 + assert replicas( + backend_state, states=[ReplicaState.STOPPING])[0].version == "2" + assert replicas( + backend_state, states=[ReplicaState.STARTING])[0].version == "3" + + replicas( + backend_state, + states=[ReplicaState.STOPPING])[0]._actor.set_done_stopping() + replicas( + backend_state, states=[ReplicaState.STARTING])[0]._actor.set_ready() + backend_state.update() + assert len(replicas(backend_state)) == 1 + assert len(replicas(backend_state, states=[ReplicaState.RUNNING])) == 1 + assert replicas( + backend_state, states=[ReplicaState.RUNNING])[0].version == "3" + + backend_state.update() + assert goal_manager.check_complete(goal_3) + + +def test_redeploy_new_config(): + # TODO(edoakes): we should test the behavior when only the config changes. + pass if __name__ == "__main__":