diff --git a/python/ray/serve/backend_state.py b/python/ray/serve/backend_state.py index cf50c0155..a3e5f2134 100644 --- a/python/ray/serve/backend_state.py +++ b/python/ray/serve/backend_state.py @@ -1,5 +1,7 @@ +from abc import ABC from collections import defaultdict from enum import Enum +import math import time from typing import Any, Dict, List, Optional, Tuple @@ -35,6 +37,9 @@ class ReplicaState(Enum): STOPPED = 6 +ALL_REPLICA_STATES = list(ReplicaState) + + class ActorReplicaWrapper: """Wraps a Ray actor for a backend replica. @@ -175,7 +180,13 @@ class ActorReplicaWrapper: pass -class BackendReplica: +class VersionedReplica(ABC): + @property + def version(self): + return self.version + + +class BackendReplica(VersionedReplica): """Manages state transitions for backend replicas. This is basically a checkpointable lightweight state machine. @@ -328,6 +339,101 @@ class BackendReplica: return False +class ReplicaStateContainer: + """Container for mapping ReplicaStates to lists of BackendReplicas.""" + + def __init__(self): + self._replicas: Dict[ReplicaState, List[BackendReplica]] = defaultdict( + list) + + def add(self, state: ReplicaState, replica: VersionedReplica): + """Add the provided replica under the provided state. + + Args: + state (ReplicaState): state to add the replica under. + replica (VersionedReplica): replica to add. + """ + assert isinstance(state, ReplicaState) + assert isinstance(replica, VersionedReplica) + self._replicas[state].append(replica) + + def get(self, states: Optional[List[ReplicaState]] = None + ) -> List[VersionedReplica]: + """Get all replicas of the given states. + + This does not remove them from the container. Replicas are returned + in order of state as passed in. + + Args: + states (str): states to consider. If not specified, all replicas + are considered. + """ + if states is None: + states = ALL_REPLICA_STATES + + assert isinstance(states, list) + + return sum((self._replicas[state] for state in states), []) + + def pop(self, + exclude_version: Optional[str] = None, + states: Optional[List[ReplicaState]] = None, + max_replicas: Optional[int] = math.inf) -> List[VersionedReplica]: + """Get and remove all replicas of the given states. + + This removes the replicas from the container. Replicas are returned + in order of state as passed in. + + Args: + exclude_version (str): if specified, replicas of the provided + version will *not* be removed. + states (str): states to consider. If not specified, all replicas + are considered. + max_replicas (int): max number of replicas to return. If not + specified, will pop all replicas matching the criteria. + """ + if states is None: + states = ALL_REPLICA_STATES + + assert exclude_version is None or isinstance(exclude_version, str) + assert isinstance(states, list) + + replicas = [] + for state in states: + popped = [] + remaining = [] + for replica in self._replicas[state]: + if len(replicas) + len(popped) == max_replicas: + remaining.append(replica) + elif (exclude_version is not None + and replica.version == exclude_version): + remaining.append(replica) + else: + popped.append(replica) + self._replicas[state] = remaining + replicas.extend(popped) + + return replicas + + def count(self, states: Optional[List[ReplicaState]] = None): + """Get the total count of replicas of the given states. + + Args: + states (str): states to consider. If not specified, all replicas + are considered. + """ + if states is None: + states = ALL_REPLICA_STATES + assert isinstance(states, list) + return sum(len(self._replicas[state]) for state in states) + + def __str__(self): + return str(self._replicas) + + def __repr__(self): + return repr(self._replicas) + + class BackendState: """Manages all state for backends in the system. @@ -344,9 +450,7 @@ class BackendState: self._kv_store = kv_store self._long_poll_host = long_poll_host self._goal_manager = goal_manager - - self._replicas: Dict[BackendTag, Dict[ReplicaState, List[ - BackendReplica]]] = defaultdict(lambda: defaultdict(list)) + self._replicas: Dict[BackendTag, ReplicaStateContainer] = dict() self._backend_metadata: Dict[BackendTag, BackendInfo] = dict() self._target_replicas: Dict[BackendTag, int] = defaultdict(int) self._backend_goals: Dict[BackendTag, GoalId] = dict() @@ -387,11 +491,10 @@ class BackendState: self) -> Dict[BackendTag, Dict[ReplicaTag, ActorHandle]]: return { backend_tag: { - backend_replica._replica_tag: backend_replica.actor_handle - for backend_replica in state_to_replica_dict[ - ReplicaState.RUNNING] + r.replica_tag: r.actor_handle + for r in replicas.get(states=[ReplicaState.RUNNING]) } - for backend_tag, state_to_replica_dict in self._replicas.items() + for backend_tag, replicas in self._replicas.items() } def get_backend_configs(self) -> Dict[BackendTag, BackendConfig]: @@ -440,6 +543,9 @@ class BackendState: and self._target_versions[backend_tag] == version): return self._backend_goals.get(backend_tag, None) + if backend_tag not in self._replicas: + self._replicas[backend_tag] = ReplicaStateContainer() + backend_replica_class = create_backend_replica( replica_config.backend_def) @@ -485,31 +591,28 @@ class BackendState: def _stop_wrong_version_replicas( self, backend_tag: BackendTag, version: str, graceful_shutdown_timeout_s: float) -> int: + # NOTE(edoakes): this short-circuits when using the legacy + # `create_backend` codepath -- it can be removed once we deprecate + # that as the version should never be None. + if version is None: + return + # TODO(edoakes): to implement rolling upgrade, all we should need to # do is cap the number of old version replicas that are stopped here. - num_stopped = 0 - for target_state in [ + replicas_to_stop = self._replicas[backend_tag].pop( + exclude_version=version, + states=[ ReplicaState.SHOULD_START, ReplicaState.STARTING, ReplicaState.RUNNING - ]: - target_version = [] - wrong_version = [] - for replica in self._replicas[backend_tag][target_state]: - if replica.version == version: - target_version.append(replica) - else: - wrong_version.append(replica) + ]) - self._replicas[backend_tag][target_state] = target_version - for replica in wrong_version: - replica.set_should_stop(graceful_shutdown_timeout_s) - self._replicas[backend_tag][ReplicaState.SHOULD_STOP].append( - replica) - num_stopped += 1 + if len(replicas_to_stop) > 0: + logger.info(f"Stopping {len(replicas_to_stop)} replicas of " + f"backend '{backend_tag}' with outdated versions.") - if num_stopped > 0: - logger.info(f"Stopping {num_stopped} replicas of backend " - f"'{backend_tag}' with outdated versions.") + for replica in replicas_to_stop: + replica.set_should_stop(graceful_shutdown_timeout_s) + self._replicas[backend_tag].add(ReplicaState.SHOULD_STOP, replica) def _scale_backend_replicas( self, @@ -541,10 +644,9 @@ class BackendState: self._stop_wrong_version_replicas(backend_tag, version, graceful_shutdown_timeout_s) - current_num_replicas = sum([ - len(self._replicas[backend_tag][ReplicaState.SHOULD_START]), - len(self._replicas[backend_tag][ReplicaState.STARTING]), - len(self._replicas[backend_tag][ReplicaState.RUNNING]), + current_num_replicas = self._replicas[backend_tag].count(states=[ + ReplicaState.SHOULD_START, ReplicaState.STARTING, + ReplicaState.RUNNING ]) delta_num_replicas = num_replicas - current_num_replicas @@ -557,7 +659,8 @@ class BackendState: delta_num_replicas, backend_tag)) for _ in range(delta_num_replicas): replica_tag = "{}#{}".format(backend_tag, get_random_letters()) - self._replicas[backend_tag][ReplicaState.SHOULD_START].append( + self._replicas[backend_tag].add( + ReplicaState.SHOULD_START, BackendReplica(self._controller_name, self._detached, replica_tag, backend_tag, version)) @@ -567,17 +670,17 @@ class BackendState: assert self._target_replicas[backend_tag] >= delta_num_replicas for _ in range(-delta_num_replicas): - replica_state_dict = self._replicas[backend_tag] - list_to_use = replica_state_dict[ReplicaState.SHOULD_START] \ - or replica_state_dict[ReplicaState.STARTING] \ - or replica_state_dict[ReplicaState.RUNNING] + replicas_to_stop = self._replicas[backend_tag].pop( + states=[ + ReplicaState.SHOULD_START, ReplicaState.STARTING, + ReplicaState.RUNNING + ], + max_replicas=-delta_num_replicas) - assert len(list_to_use), replica_state_dict - replica_to_stop = list_to_use.pop() - - replica_to_stop.set_should_stop(graceful_shutdown_timeout_s) - self._replicas[backend_tag][ReplicaState.SHOULD_STOP].append( - replica_to_stop) + for replica in replicas_to_stop: + replica.set_should_stop(graceful_shutdown_timeout_s) + self._replicas[backend_tag].add(ReplicaState.SHOULD_STOP, + replica) return True @@ -586,57 +689,49 @@ class BackendState: for backend_tag, num_replicas in list(self._target_replicas.items()): checkpoint_needed |= self._scale_backend_replicas( backend_tag, num_replicas, self._target_versions[backend_tag]) - if num_replicas == 0: - del self._backend_metadata[backend_tag] - del self._target_replicas[backend_tag] - del self._target_versions[backend_tag] if checkpoint_needed: self._checkpoint() - def _pop_replicas_of_state(self, state: ReplicaState - ) -> List[Tuple[ReplicaState, BackendTag]]: - replicas = [] - for backend_tag, state_to_replica_dict in self._replicas.items(): - if state in state_to_replica_dict: - replicas.extend( - (replica, backend_tag) - for replica in state_to_replica_dict.pop(state)) - - return replicas - def _completed_goals(self) -> List[GoalId]: completed_goals = [] - all_tags = set(self._replicas.keys()).union( - set(self._backend_metadata.keys())) - - for backend_tag in all_tags: - desired_num_replicas = self._target_replicas.get(backend_tag) - state_dict = self._replicas.get(backend_tag, {}) - existing_info = state_dict.get(ReplicaState.RUNNING, []) + deleted_backends = [] + for backend_tag in self._replicas: + target_count = self._target_replicas.get(backend_tag, 0) # If we have pending ops, the current goal is *not* ready. - if (state_dict.get(ReplicaState.SHOULD_START) - or state_dict.get(ReplicaState.STARTING) - or state_dict.get(ReplicaState.SHOULD_STOP) - or state_dict.get(ReplicaState.STOPPING)): + if (self._replicas[backend_tag].count(states=[ + ReplicaState.SHOULD_START, + ReplicaState.STARTING, + ReplicaState.SHOULD_STOP, + ReplicaState.STOPPING, + ]) > 0): continue + running = self._replicas[backend_tag].get( + states=[ReplicaState.RUNNING]) + running_count = len(running) + # Check for deleting. - if (not desired_num_replicas or - desired_num_replicas == 0) and \ - (not existing_info or len(existing_info) == 0): + if target_count == 0 and running_count == 0: + deleted_backends.append(backend_tag) completed_goals.append( self._backend_goals.pop(backend_tag, None)) # Check for a non-zero number of backends. - if (desired_num_replicas and existing_info) \ - and desired_num_replicas == len(existing_info): + elif target_count == running_count: # Check that all running replicas are the target version. target_version = self._target_versions[backend_tag] - if all(r.version == target_version for r in existing_info): + if all(r.version == target_version for r in running): completed_goals.append( self._backend_goals.pop(backend_tag, None)) + + for backend_tag in deleted_backends: + del self._replicas[backend_tag] + del self._backend_metadata[backend_tag] + del self._target_replicas[backend_tag] + del self._target_versions[backend_tag] + return [goal for goal in completed_goals if goal] def update(self) -> bool: @@ -647,43 +742,28 @@ class BackendState: for goal_id in self._completed_goals(): self._goal_manager.complete_goal(goal_id) - for replica_state, backend_tag in self._pop_replicas_of_state( - ReplicaState.SHOULD_START): - replica_state.start(self._backend_metadata[backend_tag]) - self._replicas[backend_tag][ReplicaState.STARTING].append( - replica_state) - - for replica_state, backend_tag in self._pop_replicas_of_state( - ReplicaState.SHOULD_STOP): - replica_state.stop() - self._replicas[backend_tag][ReplicaState.STOPPING].append( - replica_state) - transition_triggered = False + for backend_tag, replicas in self._replicas.items(): + for replica in replicas.pop(states=[ReplicaState.SHOULD_START]): + replica.start(self._backend_metadata[backend_tag]) + replicas.add(ReplicaState.STARTING, replica) - for replica_state, backend_tag in self._pop_replicas_of_state( - ReplicaState.STARTING): - if replica_state.check_started(): - self._replicas[backend_tag][ReplicaState.RUNNING].append( - replica_state) - transition_triggered = True - else: - self._replicas[backend_tag][ReplicaState.STARTING].append( - replica_state) + for replica in replicas.pop(states=[ReplicaState.SHOULD_STOP]): + replica.stop() + replicas.add(ReplicaState.STOPPING, replica) - for replica_state, backend_tag in self._pop_replicas_of_state( - ReplicaState.STOPPING): - if replica_state.check_stopped(): - transition_triggered = True - else: - self._replicas[backend_tag][ReplicaState.STOPPING].append( - replica_state) + for replica in replicas.pop(states=[ReplicaState.STARTING]): + if replica.check_started(): + replicas.add(ReplicaState.RUNNING, replica) + transition_triggered = True + else: + replicas.add(ReplicaState.STARTING, replica) - for backend_tag in list(self._replicas.keys()): - if not any(self._replicas[backend_tag]): - del self._replicas[backend_tag] - del self._backend_metadata[backend_tag] - del self._target_replicas[backend_tag] + for replica in replicas.pop(states=[ReplicaState.STOPPING]): + if replica.check_stopped(): + transition_triggered = True + else: + replicas.add(ReplicaState.STOPPING, replica) if transition_triggered: self._checkpoint() diff --git a/python/ray/serve/tests/test_backend_state.py b/python/ray/serve/tests/test_backend_state.py index 87066b5f2..46ed602a7 100644 --- a/python/ray/serve/tests/test_backend_state.py +++ b/python/ray/serve/tests/test_backend_state.py @@ -13,7 +13,12 @@ from ray.serve.common import ( ReplicaConfig, ReplicaTag, ) -from ray.serve.backend_state import BackendState, ReplicaState +from ray.serve.backend_state import ( + BackendState, + ReplicaState, + ReplicaStateContainer, + VersionedReplica, +) class MockReplicaActorWrapper: @@ -115,26 +120,159 @@ def mock_backend_state() -> Tuple[BackendState, Mock, Mock]: yield backend_state, timer, goal_manager -def replicas(backend_state, backend=None, states=None): - replicas = [] - for backend_tag, state_dict in backend_state._replicas.items(): - if backend is None or backend_tag == backend: - for state, replica_list in state_dict.items(): - if states is None or state in states: - replicas.extend(replica_list) +def replica(version: Optional[str] = None) -> VersionedReplica: + class MockVersionedReplica(VersionedReplica): + def __init__(self, version): + self._version = version - return replicas + @property + def version(self): + return self._version + + return MockVersionedReplica(version) + + +def test_replica_state_container_count(): + c = ReplicaStateContainer() + r1, r2, r3 = replica(), replica(), replica() + c.add(ReplicaState.STARTING, r1) + c.add(ReplicaState.STARTING, r2) + c.add(ReplicaState.STOPPING, r3) + assert c.count() == 3 + assert c.count() == c.count([ReplicaState.STARTING, ReplicaState.STOPPING]) + assert c.count([ReplicaState.STARTING]) == 2 + assert c.count([ReplicaState.STOPPING]) == 1 + assert not c.count([ReplicaState.SHOULD_START]) + assert not c.count([ReplicaState.SHOULD_START, ReplicaState.SHOULD_STOP]) + + +def test_replica_state_container_get(): + c = ReplicaStateContainer() + r1, r2, r3 = replica(), replica(), replica() + + c.add(ReplicaState.STARTING, r1) + c.add(ReplicaState.STARTING, r2) + c.add(ReplicaState.STOPPING, r3) + assert c.get() == [r1, r2, r3] + assert c.get() == c.get([ReplicaState.STARTING, ReplicaState.STOPPING]) + assert c.get([ReplicaState.STARTING]) == [r1, r2] + assert c.get([ReplicaState.STOPPING]) == [r3] + assert not c.get([ReplicaState.SHOULD_START]) + assert not c.get([ReplicaState.SHOULD_START, ReplicaState.SHOULD_STOP]) + + +def test_replica_state_container_pop_basic(): + c = ReplicaStateContainer() + r1, r2, r3 = replica(), replica(), replica() + + c.add(ReplicaState.STARTING, r1) + c.add(ReplicaState.STARTING, r2) + c.add(ReplicaState.STOPPING, r3) + assert c.pop() == [r1, r2, r3] + assert not c.pop() + + +def test_replica_state_container_pop_exclude_version(): + c = ReplicaStateContainer() + r1, r2, r3 = replica("1"), replica("1"), replica("2") + + c.add(ReplicaState.STARTING, r1) + c.add(ReplicaState.STARTING, r2) + c.add(ReplicaState.STARTING, r3) + assert c.pop(exclude_version="1") == [r3] + assert not c.pop(exclude_version="1") + assert c.pop(exclude_version="2") == [r1, r2] + assert not c.pop(exclude_version="2") + assert not c.pop() + + +def test_replica_state_container_pop_max_replicas(): + c = ReplicaStateContainer() + r1, r2, r3 = replica(), replica(), replica() + + c.add(ReplicaState.STARTING, r1) + c.add(ReplicaState.STARTING, r2) + c.add(ReplicaState.STOPPING, r3) + assert not c.pop(max_replicas=0) + assert len(c.pop(max_replicas=1)) == 1 + assert len(c.pop(max_replicas=2)) == 2 + c.add(ReplicaState.STARTING, r1) + c.add(ReplicaState.STARTING, r2) + c.add(ReplicaState.STOPPING, r3) + assert len(c.pop(max_replicas=10)) == 3 + + +def test_replica_state_container_pop_states(): + c = ReplicaStateContainer() + r1, r2, r3, r4 = replica(), replica(), replica(), replica() + + # Check popping single state. + c.add(ReplicaState.STOPPING, r1) + c.add(ReplicaState.STARTING, r2) + c.add(ReplicaState.SHOULD_STOP, r3) + c.add(ReplicaState.SHOULD_STOP, r4) + assert c.pop(states=[ReplicaState.STARTING]) == [r2] + assert not c.pop(states=[ReplicaState.STARTING]) + assert c.pop(states=[ReplicaState.STOPPING]) == [r1] + assert not c.pop(states=[ReplicaState.STOPPING]) + assert c.pop(states=[ReplicaState.SHOULD_STOP]) == [r3, r4] + assert not c.pop(states=[ReplicaState.SHOULD_STOP]) + + # Check popping multiple states. Ordering of states should be preserved. + c.add(ReplicaState.STOPPING, r1) + c.add(ReplicaState.STARTING, r2) + c.add(ReplicaState.SHOULD_STOP, r3) + c.add(ReplicaState.SHOULD_STOP, r4) + assert c.pop(states=[ReplicaState.SHOULD_STOP, ReplicaState.STOPPING]) == [ + r3, r4, r1 + ] + assert not c.pop(states=[ReplicaState.SHOULD_STOP, ReplicaState.STOPPING]) + assert c.pop(states=[ReplicaState.STARTING]) == [r2] + assert not c.pop(states=[ReplicaState.STARTING]) + assert not c.pop() + + +def test_replica_state_container_pop_integration(): + c = ReplicaStateContainer() + r1, r2, r3, r4 = replica("1"), replica("2"), replica("2"), replica("3") + + c.add(ReplicaState.STOPPING, r1) + c.add(ReplicaState.STARTING, r2) + c.add(ReplicaState.SHOULD_STOP, r3) + c.add(ReplicaState.SHOULD_STOP, r4) + assert not c.pop(exclude_version="1", states=[ReplicaState.STOPPING]) + assert c.pop( + exclude_version="1", states=[ReplicaState.SHOULD_STOP], + max_replicas=1) == [r3] + assert c.pop( + exclude_version="1", states=[ReplicaState.SHOULD_STOP], + max_replicas=1) == [r4] + c.add(ReplicaState.SHOULD_STOP, r3) + c.add(ReplicaState.SHOULD_STOP, r4) + assert c.pop( + exclude_version="1", states=[ReplicaState.SHOULD_STOP]) == [r3, r4] + assert c.pop(exclude_version="1", states=[ReplicaState.STARTING]) == [r2] + c.add(ReplicaState.STARTING, r2) + c.add(ReplicaState.SHOULD_STOP, r3) + c.add(ReplicaState.SHOULD_STOP, r4) + assert c.pop( + exclude_version="1", + states=[ReplicaState.SHOULD_STOP, + ReplicaState.STARTING]) == [r3, r4, r2] + assert c.pop( + exclude_version="nonsense", states=[ReplicaState.STOPPING]) == [r1] def test_override_goals(mock_backend_state): backend_state, _, goal_manager = mock_backend_state + tag = "tag" b_config_1, r_config_1 = generate_configs() - initial_goal = backend_state.deploy_backend("tag1", b_config_1, r_config_1) + initial_goal = backend_state.deploy_backend(tag, b_config_1, r_config_1) assert not goal_manager.check_complete(initial_goal) b_config_2, r_config_2 = generate_configs(num_replicas=2) - new_goal = backend_state.deploy_backend("tag1", b_config_2, r_config_2) + new_goal = backend_state.deploy_backend(tag, b_config_2, r_config_2) assert goal_manager.check_complete(initial_goal) assert not goal_manager.check_complete(new_goal) @@ -142,11 +280,12 @@ def test_override_goals(mock_backend_state): def test_return_existing_goal(mock_backend_state): backend_state, _, goal_manager = mock_backend_state + tag = "tag" b_config_1, r_config_1 = generate_configs() - initial_goal = backend_state.deploy_backend("tag1", b_config_1, r_config_1) + initial_goal = backend_state.deploy_backend(tag, b_config_1, r_config_1) assert not goal_manager.check_complete(initial_goal) - new_goal = backend_state.deploy_backend("tag1", b_config_1, r_config_1) + new_goal = backend_state.deploy_backend(tag, b_config_1, r_config_1) assert initial_goal == new_goal assert not goal_manager.check_complete(initial_goal) @@ -154,50 +293,56 @@ def test_return_existing_goal(mock_backend_state): def test_create_delete_single_replica(mock_backend_state): backend_state, timer, goal_manager = mock_backend_state - assert len(replicas(backend_state)) == 0 + assert len(backend_state._replicas) == 0 + tag = "tag" b_config_1, r_config_1 = generate_configs() - create_goal = backend_state.deploy_backend("tag1", b_config_1, r_config_1) + create_goal = backend_state.deploy_backend(tag, b_config_1, r_config_1) # Single replica should be created. backend_state.update() - assert len(replicas(backend_state)) == 1 - assert len(replicas(backend_state, states=[ReplicaState.STARTING])) == 1 - assert replicas(backend_state)[0]._actor.started + assert len(backend_state._replicas[tag].get()) == 1 + assert backend_state._replicas[tag].count( + states=[ReplicaState.STARTING]) == 1 + assert backend_state._replicas[tag].count() == 1 # update() should not transition the state if the replica isn't ready. backend_state.update() - assert len(replicas(backend_state)) == 1 - assert len(replicas(backend_state, states=[ReplicaState.STARTING])) == 1 - replicas(backend_state)[0]._actor.set_ready() + assert len(backend_state._replicas[tag].get()) == 1 + assert backend_state._replicas[tag].count( + states=[ReplicaState.STARTING]) == 1 + backend_state._replicas[tag].get()[0]._actor.set_ready() assert not goal_manager.check_complete(create_goal) # Now the replica should be marked running. backend_state.update() - assert len(replicas(backend_state)) == 1 - assert len(replicas(backend_state, states=[ReplicaState.RUNNING])) == 1 + assert len(backend_state._replicas[tag].get()) == 1 + assert len( + backend_state._replicas[tag].get(states=[ReplicaState.RUNNING])) == 1 # TODO(edoakes): can we remove this extra update period for completing it? backend_state.update() assert goal_manager.check_complete(create_goal) # Removing the replica should transition it to stopping. - delete_goal = backend_state.delete_backend("tag1") + delete_goal = backend_state.delete_backend(tag) backend_state.update() - assert len(replicas(backend_state)) == 1 - assert len(replicas(backend_state, states=[ReplicaState.STOPPING])) == 1 - assert replicas(backend_state)[0]._actor.stopped - assert not replicas(backend_state)[0]._actor.cleaned_up + assert len(backend_state._replicas[tag].get()) == 1 + assert len( + backend_state._replicas[tag].get(states=[ReplicaState.STOPPING])) == 1 + assert backend_state._replicas[tag].get()[0]._actor.stopped + assert not backend_state._replicas[tag].get()[0]._actor.cleaned_up assert not goal_manager.check_complete(delete_goal) # Once it's done stopping, replica should be removed. - replica = replicas(backend_state)[0] + replica = backend_state._replicas[tag].get()[0] replica._actor.set_done_stopping() backend_state.update() - assert len(replicas(backend_state)) == 0 + assert len(backend_state._replicas[tag].get()) == 0 # TODO(edoakes): can we remove this extra update period for completing it? backend_state.update() + assert len(backend_state._replicas) == 0 assert goal_manager.check_complete(delete_goal) replica._actor.cleaned_up @@ -205,59 +350,68 @@ def test_create_delete_single_replica(mock_backend_state): def test_force_kill(mock_backend_state): backend_state, timer, goal_manager = mock_backend_state - assert len(replicas(backend_state)) == 0 + assert len(backend_state._replicas) == 0 grace_period_s = 10 b_config_1, r_config_1 = generate_configs() b_config_1.experimental_graceful_shutdown_timeout_s = grace_period_s # Create and delete the backend. - backend_state.deploy_backend("tag1", b_config_1, r_config_1) + tag = "tag" + backend_state.deploy_backend(tag, b_config_1, r_config_1) backend_state.update() - replicas(backend_state)[0]._actor.set_ready() + backend_state._replicas[tag].get()[0]._actor.set_ready() backend_state.update() - delete_goal = backend_state.delete_backend("tag1") + delete_goal = backend_state.delete_backend(tag) backend_state.update() # Replica should remain in STOPPING until it finishes. - assert len(replicas(backend_state)) == 1 - assert len(replicas(backend_state, states=[ReplicaState.STOPPING])) == 1 - assert replicas(backend_state)[0]._actor.stopped + assert len(backend_state._replicas[tag].get()) == 1 + assert len( + backend_state._replicas[tag].get(states=[ReplicaState.STOPPING])) == 1 + assert backend_state._replicas[tag].get()[0]._actor.stopped backend_state.update() backend_state.update() # force_stop shouldn't be called until after the timer. - assert not replicas(backend_state)[0]._actor.force_stopped_counter - assert not replicas(backend_state)[0]._actor.cleaned_up - assert len(replicas(backend_state)) == 1 - assert len(replicas(backend_state, states=[ReplicaState.STOPPING])) == 1 + assert not backend_state._replicas[tag].get( + )[0]._actor.force_stopped_counter + assert not backend_state._replicas[tag].get()[0]._actor.cleaned_up + assert len(backend_state._replicas[tag].get()) == 1 + assert len( + backend_state._replicas[tag].get(states=[ReplicaState.STOPPING])) == 1 # Advance the timer, now the replica should be force stopped. timer.advance(grace_period_s + 0.1) backend_state.update() - assert replicas(backend_state)[0]._actor.force_stopped_counter == 1 - assert not replicas(backend_state)[0]._actor.cleaned_up - assert len(replicas(backend_state)) == 1 - assert len(replicas(backend_state, states=[ReplicaState.STOPPING])) == 1 + assert backend_state._replicas[tag].get()[ + 0]._actor.force_stopped_counter == 1 + assert not backend_state._replicas[tag].get()[0]._actor.cleaned_up + assert len(backend_state._replicas[tag].get()) == 1 + assert len( + backend_state._replicas[tag].get(states=[ReplicaState.STOPPING])) == 1 assert not goal_manager.check_complete(delete_goal) # Force stop should be called repeatedly until the replica stops. backend_state.update() - assert replicas(backend_state)[0]._actor.force_stopped_counter == 2 - assert not replicas(backend_state)[0]._actor.cleaned_up - assert len(replicas(backend_state)) == 1 - assert len(replicas(backend_state, states=[ReplicaState.STOPPING])) == 1 + assert backend_state._replicas[tag].get()[ + 0]._actor.force_stopped_counter == 2 + assert not backend_state._replicas[tag].get()[0]._actor.cleaned_up + assert len(backend_state._replicas[tag].get()) == 1 + assert len( + backend_state._replicas[tag].get(states=[ReplicaState.STOPPING])) == 1 assert not goal_manager.check_complete(delete_goal) # Once the replica is done stopping, it should be removed. - replica = replicas(backend_state)[0] + replica = backend_state._replicas[tag].get()[0] replica._actor.set_done_stopping() backend_state.update() - assert len(replicas(backend_state)) == 0 + assert len(backend_state._replicas[tag].get()) == 0 # TODO(edoakes): can we remove this extra update period for completing it? backend_state.update() + assert len(backend_state._replicas) == 0 assert goal_manager.check_complete(delete_goal) assert replica._actor.cleaned_up @@ -266,95 +420,105 @@ def test_redeploy_same_version(mock_backend_state): # Redeploying with the same version and code should do nothing. backend_state, timer, goal_manager = mock_backend_state - assert len(replicas(backend_state)) == 0 + assert len(backend_state._replicas) == 0 + tag = "tag" b_config_1, r_config_1 = generate_configs() goal_1 = backend_state.deploy_backend( - "tag1", b_config_1, r_config_1, version="1") + tag, b_config_1, r_config_1, version="1") backend_state.update() - assert len(replicas(backend_state)) == 1 - assert len(replicas(backend_state, states=[ReplicaState.STARTING])) == 1 - assert replicas(backend_state)[0].version == "1" + assert len(backend_state._replicas[tag].get()) == 1 + assert backend_state._replicas[tag].count( + states=[ReplicaState.STARTING]) == 1 + assert backend_state._replicas[tag].get()[0].version == "1" assert not goal_manager.check_complete(goal_1) # Test redeploying while the initial deployment is still pending. _, r_config_2 = generate_configs() goal_2 = backend_state.deploy_backend( - "tag1", b_config_1, r_config_2, version="1") + tag, b_config_1, r_config_2, version="1") assert goal_1 == goal_2 assert not goal_manager.check_complete(goal_1) backend_state.update() - assert len(replicas(backend_state)) == 1 - assert len(replicas(backend_state, states=[ReplicaState.STARTING])) == 1 - assert replicas(backend_state)[0].version == "1" + assert len(backend_state._replicas[tag].get()) == 1 + assert backend_state._replicas[tag].count( + states=[ReplicaState.STARTING]) == 1 + assert backend_state._replicas[tag].get()[0].version == "1" # Mark the replica ready. After this, the initial goal should be complete. - replicas(backend_state)[0]._actor.set_ready() + backend_state._replicas[tag].get()[0]._actor.set_ready() backend_state.update() - assert len(replicas(backend_state)) == 1 - assert len(replicas(backend_state, states=[ReplicaState.RUNNING])) == 1 - assert replicas(backend_state)[0].version == "1" + assert len(backend_state._replicas[tag].get()) == 1 + assert backend_state._replicas[tag].count( + states=[ReplicaState.RUNNING]) == 1 + assert backend_state._replicas[tag].get()[0].version == "1" backend_state.update() assert goal_manager.check_complete(goal_1) # Test redeploying after the initial deployment has finished. same_version_goal = backend_state.deploy_backend( - "tag1", b_config_1, r_config_1, version="1") + tag, b_config_1, r_config_1, version="1") assert goal_manager.check_complete(same_version_goal) - assert len(replicas(backend_state)) == 1 - assert len(replicas(backend_state, states=[ReplicaState.RUNNING])) == 1 - assert replicas(backend_state)[0].version == "1" + assert len(backend_state._replicas[tag].get()) == 1 + assert backend_state._replicas[tag].count( + states=[ReplicaState.RUNNING]) == 1 + assert backend_state._replicas[tag].get()[0].version == "1" assert goal_manager.check_complete(goal_2) + assert len(backend_state._replicas) == 1 def test_redeploy_new_version(mock_backend_state): # Redeploying with a new version should start a new replica. backend_state, timer, goal_manager = mock_backend_state - assert len(replicas(backend_state)) == 0 + assert len(backend_state._replicas) == 0 + tag = "tag" b_config_1, r_config_1 = generate_configs() goal_1 = backend_state.deploy_backend( - "tag1", b_config_1, r_config_1, version="1") + tag, b_config_1, r_config_1, version="1") backend_state.update() - assert len(replicas(backend_state)) == 1 - assert len(replicas(backend_state, states=[ReplicaState.STARTING])) == 1 - assert replicas(backend_state)[0].version == "1" + assert len(backend_state._replicas[tag].get()) == 1 + assert backend_state._replicas[tag].count( + states=[ReplicaState.STARTING]) == 1 + assert backend_state._replicas[tag].get()[0].version == "1" assert not goal_manager.check_complete(goal_1) # Test redeploying while the initial deployment is still pending. _, r_config_2 = generate_configs() goal_2 = backend_state.deploy_backend( - "tag1", b_config_1, r_config_2, version="2") + tag, b_config_1, r_config_2, version="2") assert goal_1 != goal_2 assert goal_manager.check_complete(goal_1) assert not goal_manager.check_complete(goal_2) # The initial replica should be stopping and the new replica starting. backend_state.update() - assert len(replicas(backend_state)) == 2 - assert len(replicas(backend_state, states=[ReplicaState.STARTING])) == 1 - assert len(replicas(backend_state, states=[ReplicaState.STOPPING])) == 1 - assert replicas( - backend_state, states=[ReplicaState.STOPPING])[0].version == "1" - assert replicas( - backend_state, states=[ReplicaState.STARTING])[0].version == "2" + assert len(backend_state._replicas[tag].get()) == 2 + assert backend_state._replicas[tag].count( + states=[ReplicaState.STOPPING]) == 1 + assert backend_state._replicas[tag].count( + states=[ReplicaState.STARTING]) == 1 + assert backend_state._replicas[tag].get( + states=[ReplicaState.STOPPING])[0].version == "1" + assert backend_state._replicas[tag].get( + states=[ReplicaState.STARTING])[0].version == "2" # The initial replica should be gone and the new replica running. - replicas( - backend_state, + backend_state._replicas[tag].get( states=[ReplicaState.STOPPING])[0]._actor.set_done_stopping() - replicas( - backend_state, states=[ReplicaState.STARTING])[0]._actor.set_ready() + backend_state._replicas[tag].get( + states=[ReplicaState.STARTING])[0]._actor.set_ready() backend_state.update() - assert len(replicas(backend_state)) == 1 - assert len(replicas(backend_state, states=[ReplicaState.RUNNING])) == 1 - assert replicas( - backend_state, states=[ReplicaState.RUNNING])[0].version == "2" + assert len(backend_state._replicas[tag].get()) == 1 + assert backend_state._replicas[tag].count( + states=[ReplicaState.RUNNING]) == 1 + assert backend_state._replicas[tag].get( + states=[ReplicaState.RUNNING])[0].version == "2" backend_state.update() assert goal_manager.check_complete(goal_2) @@ -362,31 +526,34 @@ def test_redeploy_new_version(mock_backend_state): # Now deploy a third version after the transition has finished. _, r_config_3 = generate_configs() goal_3 = backend_state.deploy_backend( - "tag1", b_config_1, r_config_3, version="3") + tag, b_config_1, r_config_3, version="3") assert not goal_manager.check_complete(goal_3) backend_state.update() - assert len(replicas(backend_state)) == 2 - assert len(replicas(backend_state, states=[ReplicaState.STOPPING])) == 1 - assert len(replicas(backend_state, states=[ReplicaState.STARTING])) == 1 - assert replicas( - backend_state, states=[ReplicaState.STOPPING])[0].version == "2" - assert replicas( - backend_state, states=[ReplicaState.STARTING])[0].version == "3" + assert len(backend_state._replicas[tag].get()) == 2 + assert backend_state._replicas[tag].count( + states=[ReplicaState.STOPPING]) == 1 + assert backend_state._replicas[tag].count( + states=[ReplicaState.STARTING]) == 1 + assert backend_state._replicas[tag].get( + states=[ReplicaState.STOPPING])[0].version == "2" + assert backend_state._replicas[tag].get( + states=[ReplicaState.STARTING])[0].version == "3" - replicas( - backend_state, + backend_state._replicas[tag].get( states=[ReplicaState.STOPPING])[0]._actor.set_done_stopping() - replicas( - backend_state, states=[ReplicaState.STARTING])[0]._actor.set_ready() + backend_state._replicas[tag].get( + states=[ReplicaState.STARTING])[0]._actor.set_ready() backend_state.update() - assert len(replicas(backend_state)) == 1 - assert len(replicas(backend_state, states=[ReplicaState.RUNNING])) == 1 - assert replicas( - backend_state, states=[ReplicaState.RUNNING])[0].version == "3" + assert len(backend_state._replicas[tag].get()) == 1 + assert backend_state._replicas[tag].count( + states=[ReplicaState.RUNNING]) == 1 + assert backend_state._replicas[tag].get( + states=[ReplicaState.RUNNING])[0].version == "3" backend_state.update() assert goal_manager.check_complete(goal_3) + assert len(backend_state._replicas) == 1 def test_deploy_new_config_same_version(mock_backend_state): @@ -394,28 +561,31 @@ def test_deploy_new_config_same_version(mock_backend_state): # replica. backend_state, timer, goal_manager = mock_backend_state - assert len(replicas(backend_state)) == 0 + assert len(backend_state._replicas) == 0 + tag = "tag" b_config_1, r_config_1 = generate_configs() create_goal = backend_state.deploy_backend( - "tag1", b_config_1, r_config_1, version="1") + tag, b_config_1, r_config_1, version="1") # Create the replica initially. backend_state.update() - replicas(backend_state)[0]._actor.set_ready() + backend_state._replicas[tag].get()[0]._actor.set_ready() backend_state.update() - assert len(replicas(backend_state)) == 1 - assert len(replicas(backend_state, states=[ReplicaState.RUNNING])) == 1 + assert len(backend_state._replicas[tag].get()) == 1 + assert backend_state._replicas[tag].count( + states=[ReplicaState.RUNNING]) == 1 backend_state.update() assert goal_manager.check_complete(create_goal) # Update to a new config without changing the version. b_config_2, _ = generate_configs(user_config={"hello": "world"}) update_goal = backend_state.deploy_backend( - "tag1", b_config_2, r_config_1, version="1") + tag, b_config_2, r_config_1, version="1") backend_state.update() - assert len(replicas(backend_state)) == 1 - assert len(replicas(backend_state, states=[ReplicaState.RUNNING])) == 1 + assert len(backend_state._replicas[tag].get()) == 1 + assert backend_state._replicas[tag].count( + states=[ReplicaState.RUNNING]) == 1 backend_state.update() assert goal_manager.check_complete(update_goal) @@ -424,45 +594,49 @@ def test_deploy_new_config_new_version(mock_backend_state): # Deploying a new config with a new version should deploy a new replica. backend_state, timer, goal_manager = mock_backend_state - assert len(replicas(backend_state)) == 0 + assert len(backend_state._replicas) == 0 + tag = "tag" b_config_1, r_config_1 = generate_configs() create_goal = backend_state.deploy_backend( - "tag1", b_config_1, r_config_1, version="1") + tag, b_config_1, r_config_1, version="1") # Create the replica initially. backend_state.update() - replicas(backend_state)[0]._actor.set_ready() + backend_state._replicas[tag].get()[0]._actor.set_ready() backend_state.update() - assert len(replicas(backend_state)) == 1 - assert len(replicas(backend_state, states=[ReplicaState.RUNNING])) == 1 + assert len(backend_state._replicas[tag].get()) == 1 + assert backend_state._replicas[tag].count( + states=[ReplicaState.RUNNING]) == 1 backend_state.update() assert goal_manager.check_complete(create_goal) # Update to a new config and a new version. b_config_2, _ = generate_configs(user_config={"hello": "world"}) update_goal = backend_state.deploy_backend( - "tag1", b_config_2, r_config_1, version="2") + tag, b_config_2, r_config_1, version="2") backend_state.update() - assert len(replicas(backend_state)) == 2 - assert len(replicas(backend_state, states=[ReplicaState.STOPPING])) == 1 - assert len(replicas(backend_state, states=[ReplicaState.STARTING])) == 1 - assert replicas( - backend_state, states=[ReplicaState.STOPPING])[0].version == "1" - assert replicas( - backend_state, states=[ReplicaState.STARTING])[0].version == "2" + assert len(backend_state._replicas[tag].get()) == 2 + assert backend_state._replicas[tag].count( + states=[ReplicaState.STOPPING]) == 1 + assert backend_state._replicas[tag].count( + states=[ReplicaState.STARTING]) == 1 + assert backend_state._replicas[tag].get( + states=[ReplicaState.STOPPING])[0].version == "1" + assert backend_state._replicas[tag].get( + states=[ReplicaState.STARTING])[0].version == "2" - replicas( - backend_state, + backend_state._replicas[tag].get( states=[ReplicaState.STOPPING])[0]._actor.set_done_stopping() - replicas( - backend_state, states=[ReplicaState.STARTING])[0]._actor.set_ready() + backend_state._replicas[tag].get( + states=[ReplicaState.STARTING])[0]._actor.set_ready() backend_state.update() - assert len(replicas(backend_state)) == 1 - assert len(replicas(backend_state, states=[ReplicaState.RUNNING])) == 1 - assert replicas( - backend_state, states=[ReplicaState.RUNNING])[0].version == "2" + assert len(backend_state._replicas[tag].get()) == 1 + assert backend_state._replicas[tag].count( + states=[ReplicaState.RUNNING]) == 1 + assert backend_state._replicas[tag].get( + states=[ReplicaState.RUNNING])[0].version == "2" backend_state.update() assert goal_manager.check_complete(update_goal)