[serve] Make replica state management unit-testable (#14797)

This commit is contained in:
Edward Oakes 2021-03-19 14:06:23 -05:00 committed by GitHub
parent 160519d47f
commit 75dfae84e4
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 495 additions and 241 deletions

View file

@ -1,5 +1,7 @@
from abc import ABC
from collections import defaultdict
from enum import Enum
import math
import time
from typing import Any, Dict, List, Optional, Tuple
@ -35,6 +37,9 @@ class ReplicaState(Enum):
STOPPED = 6
ALL_REPLICA_STATES = list(ReplicaState)
class ActorReplicaWrapper:
"""Wraps a Ray actor for a backend replica.
@ -175,7 +180,13 @@ class ActorReplicaWrapper:
pass
class BackendReplica:
class VersionedReplica(ABC):
    """Base class for objects that expose a ``version`` attribute.

    Subclasses are expected to override the ``version`` property and return
    the version identifier of the replica they represent.
    """

    @property
    def version(self):
        """Version identifier of this replica.

        Raises:
            NotImplementedError: if the subclass does not override this
                property.
        """
        # Returning `self.version` here would re-enter this same property
        # and recurse until a RecursionError; fail with a clear error instead.
        raise NotImplementedError(
            f"{type(self).__name__} must override the `version` property.")
class BackendReplica(VersionedReplica):
"""Manages state transitions for backend replicas.
This is basically a checkpointable lightweight state machine.
@ -328,6 +339,101 @@ class BackendReplica:
return False
class ReplicaStateContainer:
    """Container for mapping ReplicaStates to lists of replicas.

    Replicas are stored per state in insertion order. Every query method
    takes an optional list of states; when it is omitted, all states in
    ALL_REPLICA_STATES are considered, in that order.
    """

    def __init__(self):
        # `add` accepts any VersionedReplica, so that is the element type.
        self._replicas: Dict[ReplicaState, List[VersionedReplica]] = (
            defaultdict(list))

    def add(self, state: ReplicaState, replica: VersionedReplica):
        """Add the provided replica under the provided state.

        Args:
            state (ReplicaState): state to add the replica under.
            replica (VersionedReplica): replica to add.
        """
        assert isinstance(state, ReplicaState)
        assert isinstance(replica, VersionedReplica)
        self._replicas[state].append(replica)

    def get(self, states: Optional[List[ReplicaState]] = None
            ) -> List[VersionedReplica]:
        """Get all replicas of the given states.

        This does not remove them from the container. Replicas are returned
        in order of state as passed in.

        Args:
            states (List[ReplicaState]): states to consider. If not
                specified, all replicas are considered.

        Returns:
            A new list holding the matching replicas.
        """
        if states is None:
            states = ALL_REPLICA_STATES

        assert isinstance(states, list)

        # Flatten in a single pass; `sum(lists, [])` copies the accumulator
        # on every step.
        return [
            replica for state in states for replica in self._replicas[state]
        ]

    def pop(self,
            exclude_version: Optional[str] = None,
            states: Optional[List[ReplicaState]] = None,
            max_replicas: Optional[int] = math.inf
            ) -> List[VersionedReplica]:
        """Get and remove all replicas of the given states.

        This removes the replicas from the container. Replicas are returned
        in order of state as passed in.

        Args:
            exclude_version (str): if specified, replicas of the provided
                version will *not* be removed.
            states (List[ReplicaState]): states to consider. If not
                specified, all replicas are considered.
            max_replicas (int): max number of replicas to return. If not
                specified (or None), will pop all replicas matching the
                criteria. A value <= 0 pops nothing.

        Returns:
            The list of replicas removed from the container.
        """
        if states is None:
            states = ALL_REPLICA_STATES
        if max_replicas is None:
            max_replicas = math.inf

        assert exclude_version is None or isinstance(exclude_version, str)
        assert isinstance(states, list)

        popped = []
        for state in states:
            remaining = []
            for replica in self._replicas[state]:
                # `>=` (not `==`) so that a negative or non-integer cap
                # still bounds the number of replicas removed.
                if len(popped) >= max_replicas or (
                        exclude_version is not None
                        and replica.version == exclude_version):
                    remaining.append(replica)
                else:
                    popped.append(replica)
            self._replicas[state] = remaining

        return popped

    def count(self, states: Optional[List[ReplicaState]] = None) -> int:
        """Get the total count of replicas of the given states.

        Args:
            states (List[ReplicaState]): states to consider. If not
                specified, all replicas are considered.
        """
        if states is None:
            states = ALL_REPLICA_STATES
        assert isinstance(states, list)
        return sum(len(self._replicas[state]) for state in states)

    def __str__(self):
        return str(self._replicas)

    def __repr__(self):
        return repr(self._replicas)
class BackendState:
"""Manages all state for backends in the system.
@ -344,9 +450,7 @@ class BackendState:
self._kv_store = kv_store
self._long_poll_host = long_poll_host
self._goal_manager = goal_manager
self._replicas: Dict[BackendTag, Dict[ReplicaState, List[
BackendReplica]]] = defaultdict(lambda: defaultdict(list))
self._replicas: Dict[BackendTag, ReplicaStateContainer] = dict()
self._backend_metadata: Dict[BackendTag, BackendInfo] = dict()
self._target_replicas: Dict[BackendTag, int] = defaultdict(int)
self._backend_goals: Dict[BackendTag, GoalId] = dict()
@ -387,11 +491,10 @@ class BackendState:
self) -> Dict[BackendTag, Dict[ReplicaTag, ActorHandle]]:
return {
backend_tag: {
backend_replica._replica_tag: backend_replica.actor_handle
for backend_replica in state_to_replica_dict[
ReplicaState.RUNNING]
r.replica_tag: r.actor_handle
for r in replicas.get(states=[ReplicaState.RUNNING])
}
for backend_tag, state_to_replica_dict in self._replicas.items()
for backend_tag, replicas in self._replicas.items()
}
def get_backend_configs(self) -> Dict[BackendTag, BackendConfig]:
@ -440,6 +543,9 @@ class BackendState:
and self._target_versions[backend_tag] == version):
return self._backend_goals.get(backend_tag, None)
if backend_tag not in self._replicas:
self._replicas[backend_tag] = ReplicaStateContainer()
backend_replica_class = create_backend_replica(
replica_config.backend_def)
@ -485,31 +591,28 @@ class BackendState:
def _stop_wrong_version_replicas(
self, backend_tag: BackendTag, version: str,
graceful_shutdown_timeout_s: float) -> int:
# NOTE(edoakes): this short-circuits when using the legacy
# `create_backend` codepath -- it can be removed once we deprecate
# that as the version should never be None.
if version is None:
return
# TODO(edoakes): to implement rolling upgrade, all we should need to
# do is cap the number of old version replicas that are stopped here.
num_stopped = 0
for target_state in [
replicas_to_stop = self._replicas[backend_tag].pop(
exclude_version=version,
states=[
ReplicaState.SHOULD_START, ReplicaState.STARTING,
ReplicaState.RUNNING
]:
target_version = []
wrong_version = []
for replica in self._replicas[backend_tag][target_state]:
if replica.version == version:
target_version.append(replica)
else:
wrong_version.append(replica)
])
self._replicas[backend_tag][target_state] = target_version
for replica in wrong_version:
replica.set_should_stop(graceful_shutdown_timeout_s)
self._replicas[backend_tag][ReplicaState.SHOULD_STOP].append(
replica)
num_stopped += 1
if len(replicas_to_stop) > 0:
logger.info(f"Stopping {len(replicas_to_stop)} replicas of "
f"backend '{backend_tag}' with outdated versions.")
if num_stopped > 0:
logger.info(f"Stopping {num_stopped} replicas of backend "
f"'{backend_tag}' with outdated versions.")
for replica in replicas_to_stop:
replica.set_should_stop(graceful_shutdown_timeout_s)
self._replicas[backend_tag].add(ReplicaState.SHOULD_STOP, replica)
def _scale_backend_replicas(
self,
@ -541,10 +644,9 @@ class BackendState:
self._stop_wrong_version_replicas(backend_tag, version,
graceful_shutdown_timeout_s)
current_num_replicas = sum([
len(self._replicas[backend_tag][ReplicaState.SHOULD_START]),
len(self._replicas[backend_tag][ReplicaState.STARTING]),
len(self._replicas[backend_tag][ReplicaState.RUNNING]),
current_num_replicas = self._replicas[backend_tag].count(states=[
ReplicaState.SHOULD_START, ReplicaState.STARTING,
ReplicaState.RUNNING
])
delta_num_replicas = num_replicas - current_num_replicas
@ -557,7 +659,8 @@ class BackendState:
delta_num_replicas, backend_tag))
for _ in range(delta_num_replicas):
replica_tag = "{}#{}".format(backend_tag, get_random_letters())
self._replicas[backend_tag][ReplicaState.SHOULD_START].append(
self._replicas[backend_tag].add(
ReplicaState.SHOULD_START,
BackendReplica(self._controller_name, self._detached,
replica_tag, backend_tag, version))
@ -567,17 +670,17 @@ class BackendState:
assert self._target_replicas[backend_tag] >= delta_num_replicas
for _ in range(-delta_num_replicas):
replica_state_dict = self._replicas[backend_tag]
list_to_use = replica_state_dict[ReplicaState.SHOULD_START] \
or replica_state_dict[ReplicaState.STARTING] \
or replica_state_dict[ReplicaState.RUNNING]
replicas_to_stop = self._replicas[backend_tag].pop(
states=[
ReplicaState.SHOULD_START, ReplicaState.STARTING,
ReplicaState.RUNNING
],
max_replicas=-delta_num_replicas)
assert len(list_to_use), replica_state_dict
replica_to_stop = list_to_use.pop()
replica_to_stop.set_should_stop(graceful_shutdown_timeout_s)
self._replicas[backend_tag][ReplicaState.SHOULD_STOP].append(
replica_to_stop)
for replica in replicas_to_stop:
replica.set_should_stop(graceful_shutdown_timeout_s)
self._replicas[backend_tag].add(ReplicaState.SHOULD_STOP,
replica)
return True
@ -586,57 +689,49 @@ class BackendState:
for backend_tag, num_replicas in list(self._target_replicas.items()):
checkpoint_needed |= self._scale_backend_replicas(
backend_tag, num_replicas, self._target_versions[backend_tag])
if num_replicas == 0:
del self._backend_metadata[backend_tag]
del self._target_replicas[backend_tag]
del self._target_versions[backend_tag]
if checkpoint_needed:
self._checkpoint()
def _pop_replicas_of_state(self, state: ReplicaState
) -> List[Tuple[ReplicaState, BackendTag]]:
replicas = []
for backend_tag, state_to_replica_dict in self._replicas.items():
if state in state_to_replica_dict:
replicas.extend(
(replica, backend_tag)
for replica in state_to_replica_dict.pop(state))
return replicas
def _completed_goals(self) -> List[GoalId]:
completed_goals = []
all_tags = set(self._replicas.keys()).union(
set(self._backend_metadata.keys()))
for backend_tag in all_tags:
desired_num_replicas = self._target_replicas.get(backend_tag)
state_dict = self._replicas.get(backend_tag, {})
existing_info = state_dict.get(ReplicaState.RUNNING, [])
deleted_backends = []
for backend_tag in self._replicas:
target_count = self._target_replicas.get(backend_tag, 0)
# If we have pending ops, the current goal is *not* ready.
if (state_dict.get(ReplicaState.SHOULD_START)
or state_dict.get(ReplicaState.STARTING)
or state_dict.get(ReplicaState.SHOULD_STOP)
or state_dict.get(ReplicaState.STOPPING)):
if (self._replicas[backend_tag].count(states=[
ReplicaState.SHOULD_START,
ReplicaState.STARTING,
ReplicaState.SHOULD_STOP,
ReplicaState.STOPPING,
]) > 0):
continue
running = self._replicas[backend_tag].get(
states=[ReplicaState.RUNNING])
running_count = len(running)
# Check for deleting.
if (not desired_num_replicas or
desired_num_replicas == 0) and \
(not existing_info or len(existing_info) == 0):
if target_count == 0 and running_count == 0:
deleted_backends.append(backend_tag)
completed_goals.append(
self._backend_goals.pop(backend_tag, None))
# Check for a non-zero number of backends.
if (desired_num_replicas and existing_info) \
and desired_num_replicas == len(existing_info):
elif target_count == running_count:
# Check that all running replicas are the target version.
target_version = self._target_versions[backend_tag]
if all(r.version == target_version for r in existing_info):
if all(r.version == target_version for r in running):
completed_goals.append(
self._backend_goals.pop(backend_tag, None))
for backend_tag in deleted_backends:
del self._replicas[backend_tag]
del self._backend_metadata[backend_tag]
del self._target_replicas[backend_tag]
del self._target_versions[backend_tag]
return [goal for goal in completed_goals if goal]
def update(self) -> bool:
@ -647,43 +742,28 @@ class BackendState:
for goal_id in self._completed_goals():
self._goal_manager.complete_goal(goal_id)
for replica_state, backend_tag in self._pop_replicas_of_state(
ReplicaState.SHOULD_START):
replica_state.start(self._backend_metadata[backend_tag])
self._replicas[backend_tag][ReplicaState.STARTING].append(
replica_state)
for replica_state, backend_tag in self._pop_replicas_of_state(
ReplicaState.SHOULD_STOP):
replica_state.stop()
self._replicas[backend_tag][ReplicaState.STOPPING].append(
replica_state)
transition_triggered = False
for backend_tag, replicas in self._replicas.items():
for replica in replicas.pop(states=[ReplicaState.SHOULD_START]):
replica.start(self._backend_metadata[backend_tag])
replicas.add(ReplicaState.STARTING, replica)
for replica_state, backend_tag in self._pop_replicas_of_state(
ReplicaState.STARTING):
if replica_state.check_started():
self._replicas[backend_tag][ReplicaState.RUNNING].append(
replica_state)
transition_triggered = True
else:
self._replicas[backend_tag][ReplicaState.STARTING].append(
replica_state)
for replica in replicas.pop(states=[ReplicaState.SHOULD_STOP]):
replica.stop()
replicas.add(ReplicaState.STOPPING, replica)
for replica_state, backend_tag in self._pop_replicas_of_state(
ReplicaState.STOPPING):
if replica_state.check_stopped():
transition_triggered = True
else:
self._replicas[backend_tag][ReplicaState.STOPPING].append(
replica_state)
for replica in replicas.pop(states=[ReplicaState.STARTING]):
if replica.check_started():
replicas.add(ReplicaState.RUNNING, replica)
transition_triggered = True
else:
replicas.add(ReplicaState.STARTING, replica)
for backend_tag in list(self._replicas.keys()):
if not any(self._replicas[backend_tag]):
del self._replicas[backend_tag]
del self._backend_metadata[backend_tag]
del self._target_replicas[backend_tag]
for replica in replicas.pop(states=[ReplicaState.STOPPING]):
if replica.check_stopped():
transition_triggered = True
else:
replicas.add(ReplicaState.STOPPING, replica)
if transition_triggered:
self._checkpoint()

View file

@ -13,7 +13,12 @@ from ray.serve.common import (
ReplicaConfig,
ReplicaTag,
)
from ray.serve.backend_state import BackendState, ReplicaState
from ray.serve.backend_state import (
BackendState,
ReplicaState,
ReplicaStateContainer,
VersionedReplica,
)
class MockReplicaActorWrapper:
@ -115,26 +120,159 @@ def mock_backend_state() -> Tuple[BackendState, Mock, Mock]:
yield backend_state, timer, goal_manager
def replicas(backend_state, backend=None, states=None):
replicas = []
for backend_tag, state_dict in backend_state._replicas.items():
if backend is None or backend_tag == backend:
for state, replica_list in state_dict.items():
if states is None or state in states:
replicas.extend(replica_list)
def replica(version: Optional[str] = None) -> VersionedReplica:
class MockVersionedReplica(VersionedReplica):
def __init__(self, version):
self._version = version
return replicas
@property
def version(self):
return self._version
return MockVersionedReplica(version)
def test_replica_state_container_count():
    """count() tallies replicas over all states or a chosen subset."""
    container = ReplicaStateContainer()
    placements = (
        (ReplicaState.STARTING, replica()),
        (ReplicaState.STARTING, replica()),
        (ReplicaState.STOPPING, replica()),
    )
    for state, member in placements:
        container.add(state, member)

    populated = [ReplicaState.STARTING, ReplicaState.STOPPING]
    total = container.count()
    assert total == 3
    assert container.count() == container.count(populated)
    assert container.count([ReplicaState.STARTING]) == 2
    assert container.count([ReplicaState.STOPPING]) == 1

    # States that hold no replicas must report zero.
    unpopulated = [ReplicaState.SHOULD_START, ReplicaState.SHOULD_STOP]
    assert not container.count(unpopulated[:1])
    assert not container.count(unpopulated)
def test_replica_state_container_get():
    """get() returns replicas in state order without removing them."""
    container = ReplicaStateContainer()
    first, second, third = replica(), replica(), replica()
    for state, member in (
        (ReplicaState.STARTING, first),
        (ReplicaState.STARTING, second),
        (ReplicaState.STOPPING, third),
    ):
        container.add(state, member)

    populated = [ReplicaState.STARTING, ReplicaState.STOPPING]
    assert container.get() == [first, second, third]
    assert container.get() == container.get(populated)
    assert container.get([ReplicaState.STARTING]) == [first, second]
    assert container.get([ReplicaState.STOPPING]) == [third]

    # States that hold no replicas yield an empty result.
    unpopulated = [ReplicaState.SHOULD_START, ReplicaState.SHOULD_STOP]
    assert not container.get(unpopulated[:1])
    assert not container.get(unpopulated)
def test_replica_state_container_pop_basic():
    """pop() with no arguments drains every replica exactly once."""
    container = ReplicaStateContainer()
    first, second, third = replica(), replica(), replica()
    for state, member in (
        (ReplicaState.STARTING, first),
        (ReplicaState.STARTING, second),
        (ReplicaState.STOPPING, third),
    ):
        container.add(state, member)

    drained = container.pop()
    assert drained == [first, second, third]

    # A second pop finds the container empty.
    leftover = container.pop()
    assert not leftover
def test_replica_state_container_pop_exclude_version():
    """pop(exclude_version=...) leaves replicas of that version in place."""
    container = ReplicaStateContainer()
    old_a, old_b, new = replica("1"), replica("1"), replica("2")
    for member in (old_a, old_b, new):
        container.add(ReplicaState.STARTING, member)

    # Excluding "1" removes only the version "2" replica, and only once.
    assert container.pop(exclude_version="1") == [new]
    assert not container.pop(exclude_version="1")

    # Excluding "2" then removes the two version "1" replicas.
    assert container.pop(exclude_version="2") == [old_a, old_b]
    assert not container.pop(exclude_version="2")

    # Nothing is left afterwards.
    assert not container.pop()
def test_replica_state_container_pop_max_replicas():
    """pop(max_replicas=...) never returns more than the requested cap."""
    container = ReplicaStateContainer()
    first, second, third = replica(), replica(), replica()

    def fill():
        # Two STARTING replicas followed by one STOPPING replica.
        container.add(ReplicaState.STARTING, first)
        container.add(ReplicaState.STARTING, second)
        container.add(ReplicaState.STOPPING, third)

    fill()
    assert not container.pop(max_replicas=0)
    one = container.pop(max_replicas=1)
    assert len(one) == 1
    two = container.pop(max_replicas=2)
    assert len(two) == 2

    # A cap larger than the population pops everything that is there.
    fill()
    everything = container.pop(max_replicas=10)
    assert len(everything) == 3
def test_replica_state_container_pop_states():
    """pop(states=...) only touches, and orders by, the listed states."""
    container = ReplicaStateContainer()
    stopping, starting, stop_a, stop_b = (replica(), replica(), replica(),
                                          replica())

    def fill():
        container.add(ReplicaState.STOPPING, stopping)
        container.add(ReplicaState.STARTING, starting)
        container.add(ReplicaState.SHOULD_STOP, stop_a)
        container.add(ReplicaState.SHOULD_STOP, stop_b)

    # Check popping single state.
    fill()
    only_starting = [ReplicaState.STARTING]
    only_stopping = [ReplicaState.STOPPING]
    only_should_stop = [ReplicaState.SHOULD_STOP]
    assert container.pop(states=only_starting) == [starting]
    assert not container.pop(states=only_starting)
    assert container.pop(states=only_stopping) == [stopping]
    assert not container.pop(states=only_stopping)
    assert container.pop(states=only_should_stop) == [stop_a, stop_b]
    assert not container.pop(states=only_should_stop)

    # Check popping multiple states. Ordering of states should be preserved.
    fill()
    stop_states = [ReplicaState.SHOULD_STOP, ReplicaState.STOPPING]
    assert container.pop(states=stop_states) == [stop_a, stop_b, stopping]
    assert not container.pop(states=stop_states)
    assert container.pop(states=only_starting) == [starting]
    assert not container.pop(states=only_starting)
    assert not container.pop()
def test_replica_state_container_pop_integration():
    """pop() honours exclude_version, states and max_replicas together."""
    container = ReplicaStateContainer()
    v1, v2_a, v2_b, v3 = (replica("1"), replica("2"), replica("2"),
                          replica("3"))
    container.add(ReplicaState.STOPPING, v1)
    container.add(ReplicaState.STARTING, v2_a)
    container.add(ReplicaState.SHOULD_STOP, v2_b)
    container.add(ReplicaState.SHOULD_STOP, v3)

    should_stop = [ReplicaState.SHOULD_STOP]

    # The only STOPPING replica has the excluded version, so it stays put.
    assert not container.pop(
        exclude_version="1", states=[ReplicaState.STOPPING])

    # With a cap of one, SHOULD_STOP replicas come out one at a time.
    for expected in (v2_b, v3):
        single = container.pop(
            exclude_version="1", states=should_stop, max_replicas=1)
        assert single == [expected]

    container.add(ReplicaState.SHOULD_STOP, v2_b)
    container.add(ReplicaState.SHOULD_STOP, v3)
    assert container.pop(
        exclude_version="1", states=should_stop) == [v2_b, v3]
    assert container.pop(
        exclude_version="1", states=[ReplicaState.STARTING]) == [v2_a]

    # Results follow the order of the requested states.
    container.add(ReplicaState.STARTING, v2_a)
    container.add(ReplicaState.SHOULD_STOP, v2_b)
    container.add(ReplicaState.SHOULD_STOP, v3)
    mixed = [ReplicaState.SHOULD_STOP, ReplicaState.STARTING]
    assert container.pop(
        exclude_version="1", states=mixed) == [v2_b, v3, v2_a]

    # An exclusion that matches nothing finally releases the v1 replica.
    assert container.pop(
        exclude_version="nonsense", states=[ReplicaState.STOPPING]) == [v1]
def test_override_goals(mock_backend_state):
backend_state, _, goal_manager = mock_backend_state
tag = "tag"
b_config_1, r_config_1 = generate_configs()
initial_goal = backend_state.deploy_backend("tag1", b_config_1, r_config_1)
initial_goal = backend_state.deploy_backend(tag, b_config_1, r_config_1)
assert not goal_manager.check_complete(initial_goal)
b_config_2, r_config_2 = generate_configs(num_replicas=2)
new_goal = backend_state.deploy_backend("tag1", b_config_2, r_config_2)
new_goal = backend_state.deploy_backend(tag, b_config_2, r_config_2)
assert goal_manager.check_complete(initial_goal)
assert not goal_manager.check_complete(new_goal)
@ -142,11 +280,12 @@ def test_override_goals(mock_backend_state):
def test_return_existing_goal(mock_backend_state):
backend_state, _, goal_manager = mock_backend_state
tag = "tag"
b_config_1, r_config_1 = generate_configs()
initial_goal = backend_state.deploy_backend("tag1", b_config_1, r_config_1)
initial_goal = backend_state.deploy_backend(tag, b_config_1, r_config_1)
assert not goal_manager.check_complete(initial_goal)
new_goal = backend_state.deploy_backend("tag1", b_config_1, r_config_1)
new_goal = backend_state.deploy_backend(tag, b_config_1, r_config_1)
assert initial_goal == new_goal
assert not goal_manager.check_complete(initial_goal)
@ -154,50 +293,56 @@ def test_return_existing_goal(mock_backend_state):
def test_create_delete_single_replica(mock_backend_state):
backend_state, timer, goal_manager = mock_backend_state
assert len(replicas(backend_state)) == 0
assert len(backend_state._replicas) == 0
tag = "tag"
b_config_1, r_config_1 = generate_configs()
create_goal = backend_state.deploy_backend("tag1", b_config_1, r_config_1)
create_goal = backend_state.deploy_backend(tag, b_config_1, r_config_1)
# Single replica should be created.
backend_state.update()
assert len(replicas(backend_state)) == 1
assert len(replicas(backend_state, states=[ReplicaState.STARTING])) == 1
assert replicas(backend_state)[0]._actor.started
assert len(backend_state._replicas[tag].get()) == 1
assert backend_state._replicas[tag].count(
states=[ReplicaState.STARTING]) == 1
assert backend_state._replicas[tag].count() == 1
# update() should not transition the state if the replica isn't ready.
backend_state.update()
assert len(replicas(backend_state)) == 1
assert len(replicas(backend_state, states=[ReplicaState.STARTING])) == 1
replicas(backend_state)[0]._actor.set_ready()
assert len(backend_state._replicas[tag].get()) == 1
assert backend_state._replicas[tag].count(
states=[ReplicaState.STARTING]) == 1
backend_state._replicas[tag].get()[0]._actor.set_ready()
assert not goal_manager.check_complete(create_goal)
# Now the replica should be marked running.
backend_state.update()
assert len(replicas(backend_state)) == 1
assert len(replicas(backend_state, states=[ReplicaState.RUNNING])) == 1
assert len(backend_state._replicas[tag].get()) == 1
assert len(
backend_state._replicas[tag].get(states=[ReplicaState.RUNNING])) == 1
# TODO(edoakes): can we remove this extra update period for completing it?
backend_state.update()
assert goal_manager.check_complete(create_goal)
# Removing the replica should transition it to stopping.
delete_goal = backend_state.delete_backend("tag1")
delete_goal = backend_state.delete_backend(tag)
backend_state.update()
assert len(replicas(backend_state)) == 1
assert len(replicas(backend_state, states=[ReplicaState.STOPPING])) == 1
assert replicas(backend_state)[0]._actor.stopped
assert not replicas(backend_state)[0]._actor.cleaned_up
assert len(backend_state._replicas[tag].get()) == 1
assert len(
backend_state._replicas[tag].get(states=[ReplicaState.STOPPING])) == 1
assert backend_state._replicas[tag].get()[0]._actor.stopped
assert not backend_state._replicas[tag].get()[0]._actor.cleaned_up
assert not goal_manager.check_complete(delete_goal)
# Once it's done stopping, replica should be removed.
replica = replicas(backend_state)[0]
replica = backend_state._replicas[tag].get()[0]
replica._actor.set_done_stopping()
backend_state.update()
assert len(replicas(backend_state)) == 0
assert len(backend_state._replicas[tag].get()) == 0
# TODO(edoakes): can we remove this extra update period for completing it?
backend_state.update()
assert len(backend_state._replicas) == 0
assert goal_manager.check_complete(delete_goal)
replica._actor.cleaned_up
@ -205,59 +350,68 @@ def test_create_delete_single_replica(mock_backend_state):
def test_force_kill(mock_backend_state):
backend_state, timer, goal_manager = mock_backend_state
assert len(replicas(backend_state)) == 0
assert len(backend_state._replicas) == 0
grace_period_s = 10
b_config_1, r_config_1 = generate_configs()
b_config_1.experimental_graceful_shutdown_timeout_s = grace_period_s
# Create and delete the backend.
backend_state.deploy_backend("tag1", b_config_1, r_config_1)
tag = "tag"
backend_state.deploy_backend(tag, b_config_1, r_config_1)
backend_state.update()
replicas(backend_state)[0]._actor.set_ready()
backend_state._replicas[tag].get()[0]._actor.set_ready()
backend_state.update()
delete_goal = backend_state.delete_backend("tag1")
delete_goal = backend_state.delete_backend(tag)
backend_state.update()
# Replica should remain in STOPPING until it finishes.
assert len(replicas(backend_state)) == 1
assert len(replicas(backend_state, states=[ReplicaState.STOPPING])) == 1
assert replicas(backend_state)[0]._actor.stopped
assert len(backend_state._replicas[tag].get()) == 1
assert len(
backend_state._replicas[tag].get(states=[ReplicaState.STOPPING])) == 1
assert backend_state._replicas[tag].get()[0]._actor.stopped
backend_state.update()
backend_state.update()
# force_stop shouldn't be called until after the timer.
assert not replicas(backend_state)[0]._actor.force_stopped_counter
assert not replicas(backend_state)[0]._actor.cleaned_up
assert len(replicas(backend_state)) == 1
assert len(replicas(backend_state, states=[ReplicaState.STOPPING])) == 1
assert not backend_state._replicas[tag].get(
)[0]._actor.force_stopped_counter
assert not backend_state._replicas[tag].get()[0]._actor.cleaned_up
assert len(backend_state._replicas[tag].get()) == 1
assert len(
backend_state._replicas[tag].get(states=[ReplicaState.STOPPING])) == 1
# Advance the timer, now the replica should be force stopped.
timer.advance(grace_period_s + 0.1)
backend_state.update()
assert replicas(backend_state)[0]._actor.force_stopped_counter == 1
assert not replicas(backend_state)[0]._actor.cleaned_up
assert len(replicas(backend_state)) == 1
assert len(replicas(backend_state, states=[ReplicaState.STOPPING])) == 1
assert backend_state._replicas[tag].get()[
0]._actor.force_stopped_counter == 1
assert not backend_state._replicas[tag].get()[0]._actor.cleaned_up
assert len(backend_state._replicas[tag].get()) == 1
assert len(
backend_state._replicas[tag].get(states=[ReplicaState.STOPPING])) == 1
assert not goal_manager.check_complete(delete_goal)
# Force stop should be called repeatedly until the replica stops.
backend_state.update()
assert replicas(backend_state)[0]._actor.force_stopped_counter == 2
assert not replicas(backend_state)[0]._actor.cleaned_up
assert len(replicas(backend_state)) == 1
assert len(replicas(backend_state, states=[ReplicaState.STOPPING])) == 1
assert backend_state._replicas[tag].get()[
0]._actor.force_stopped_counter == 2
assert not backend_state._replicas[tag].get()[0]._actor.cleaned_up
assert len(backend_state._replicas[tag].get()) == 1
assert len(
backend_state._replicas[tag].get(states=[ReplicaState.STOPPING])) == 1
assert not goal_manager.check_complete(delete_goal)
# Once the replica is done stopping, it should be removed.
replica = replicas(backend_state)[0]
replica = backend_state._replicas[tag].get()[0]
replica._actor.set_done_stopping()
backend_state.update()
assert len(replicas(backend_state)) == 0
assert len(backend_state._replicas[tag].get()) == 0
# TODO(edoakes): can we remove this extra update period for completing it?
backend_state.update()
assert len(backend_state._replicas) == 0
assert goal_manager.check_complete(delete_goal)
assert replica._actor.cleaned_up
@ -266,95 +420,105 @@ def test_redeploy_same_version(mock_backend_state):
# Redeploying with the same version and code should do nothing.
backend_state, timer, goal_manager = mock_backend_state
assert len(replicas(backend_state)) == 0
assert len(backend_state._replicas) == 0
tag = "tag"
b_config_1, r_config_1 = generate_configs()
goal_1 = backend_state.deploy_backend(
"tag1", b_config_1, r_config_1, version="1")
tag, b_config_1, r_config_1, version="1")
backend_state.update()
assert len(replicas(backend_state)) == 1
assert len(replicas(backend_state, states=[ReplicaState.STARTING])) == 1
assert replicas(backend_state)[0].version == "1"
assert len(backend_state._replicas[tag].get()) == 1
assert backend_state._replicas[tag].count(
states=[ReplicaState.STARTING]) == 1
assert backend_state._replicas[tag].get()[0].version == "1"
assert not goal_manager.check_complete(goal_1)
# Test redeploying while the initial deployment is still pending.
_, r_config_2 = generate_configs()
goal_2 = backend_state.deploy_backend(
"tag1", b_config_1, r_config_2, version="1")
tag, b_config_1, r_config_2, version="1")
assert goal_1 == goal_2
assert not goal_manager.check_complete(goal_1)
backend_state.update()
assert len(replicas(backend_state)) == 1
assert len(replicas(backend_state, states=[ReplicaState.STARTING])) == 1
assert replicas(backend_state)[0].version == "1"
assert len(backend_state._replicas[tag].get()) == 1
assert backend_state._replicas[tag].count(
states=[ReplicaState.STARTING]) == 1
assert backend_state._replicas[tag].get()[0].version == "1"
# Mark the replica ready. After this, the initial goal should be complete.
replicas(backend_state)[0]._actor.set_ready()
backend_state._replicas[tag].get()[0]._actor.set_ready()
backend_state.update()
assert len(replicas(backend_state)) == 1
assert len(replicas(backend_state, states=[ReplicaState.RUNNING])) == 1
assert replicas(backend_state)[0].version == "1"
assert len(backend_state._replicas[tag].get()) == 1
assert backend_state._replicas[tag].count(
states=[ReplicaState.RUNNING]) == 1
assert backend_state._replicas[tag].get()[0].version == "1"
backend_state.update()
assert goal_manager.check_complete(goal_1)
# Test redeploying after the initial deployment has finished.
same_version_goal = backend_state.deploy_backend(
"tag1", b_config_1, r_config_1, version="1")
tag, b_config_1, r_config_1, version="1")
assert goal_manager.check_complete(same_version_goal)
assert len(replicas(backend_state)) == 1
assert len(replicas(backend_state, states=[ReplicaState.RUNNING])) == 1
assert replicas(backend_state)[0].version == "1"
assert len(backend_state._replicas[tag].get()) == 1
assert backend_state._replicas[tag].count(
states=[ReplicaState.RUNNING]) == 1
assert backend_state._replicas[tag].get()[0].version == "1"
assert goal_manager.check_complete(goal_2)
assert len(backend_state._replicas) == 1
def test_redeploy_new_version(mock_backend_state):
# Redeploying with a new version should start a new replica.
backend_state, timer, goal_manager = mock_backend_state
assert len(replicas(backend_state)) == 0
assert len(backend_state._replicas) == 0
tag = "tag"
b_config_1, r_config_1 = generate_configs()
goal_1 = backend_state.deploy_backend(
"tag1", b_config_1, r_config_1, version="1")
tag, b_config_1, r_config_1, version="1")
backend_state.update()
assert len(replicas(backend_state)) == 1
assert len(replicas(backend_state, states=[ReplicaState.STARTING])) == 1
assert replicas(backend_state)[0].version == "1"
assert len(backend_state._replicas[tag].get()) == 1
assert backend_state._replicas[tag].count(
states=[ReplicaState.STARTING]) == 1
assert backend_state._replicas[tag].get()[0].version == "1"
assert not goal_manager.check_complete(goal_1)
# Test redeploying while the initial deployment is still pending.
_, r_config_2 = generate_configs()
goal_2 = backend_state.deploy_backend(
"tag1", b_config_1, r_config_2, version="2")
tag, b_config_1, r_config_2, version="2")
assert goal_1 != goal_2
assert goal_manager.check_complete(goal_1)
assert not goal_manager.check_complete(goal_2)
# The initial replica should be stopping and the new replica starting.
backend_state.update()
assert len(replicas(backend_state)) == 2
assert len(replicas(backend_state, states=[ReplicaState.STARTING])) == 1
assert len(replicas(backend_state, states=[ReplicaState.STOPPING])) == 1
assert replicas(
backend_state, states=[ReplicaState.STOPPING])[0].version == "1"
assert replicas(
backend_state, states=[ReplicaState.STARTING])[0].version == "2"
assert len(backend_state._replicas[tag].get()) == 2
assert backend_state._replicas[tag].count(
states=[ReplicaState.STOPPING]) == 1
assert backend_state._replicas[tag].count(
states=[ReplicaState.STARTING]) == 1
assert backend_state._replicas[tag].get(
states=[ReplicaState.STOPPING])[0].version == "1"
assert backend_state._replicas[tag].get(
states=[ReplicaState.STARTING])[0].version == "2"
# The initial replica should be gone and the new replica running.
replicas(
backend_state,
backend_state._replicas[tag].get(
states=[ReplicaState.STOPPING])[0]._actor.set_done_stopping()
replicas(
backend_state, states=[ReplicaState.STARTING])[0]._actor.set_ready()
backend_state._replicas[tag].get(
states=[ReplicaState.STARTING])[0]._actor.set_ready()
backend_state.update()
assert len(replicas(backend_state)) == 1
assert len(replicas(backend_state, states=[ReplicaState.RUNNING])) == 1
assert replicas(
backend_state, states=[ReplicaState.RUNNING])[0].version == "2"
assert len(backend_state._replicas[tag].get()) == 1
assert backend_state._replicas[tag].count(
states=[ReplicaState.RUNNING]) == 1
assert backend_state._replicas[tag].get(
states=[ReplicaState.RUNNING])[0].version == "2"
backend_state.update()
assert goal_manager.check_complete(goal_2)
@ -362,31 +526,34 @@ def test_redeploy_new_version(mock_backend_state):
# Now deploy a third version after the transition has finished.
_, r_config_3 = generate_configs()
goal_3 = backend_state.deploy_backend(
"tag1", b_config_1, r_config_3, version="3")
tag, b_config_1, r_config_3, version="3")
assert not goal_manager.check_complete(goal_3)
backend_state.update()
assert len(replicas(backend_state)) == 2
assert len(replicas(backend_state, states=[ReplicaState.STOPPING])) == 1
assert len(replicas(backend_state, states=[ReplicaState.STARTING])) == 1
assert replicas(
backend_state, states=[ReplicaState.STOPPING])[0].version == "2"
assert replicas(
backend_state, states=[ReplicaState.STARTING])[0].version == "3"
assert len(backend_state._replicas[tag].get()) == 2
assert backend_state._replicas[tag].count(
states=[ReplicaState.STOPPING]) == 1
assert backend_state._replicas[tag].count(
states=[ReplicaState.STARTING]) == 1
assert backend_state._replicas[tag].get(
states=[ReplicaState.STOPPING])[0].version == "2"
assert backend_state._replicas[tag].get(
states=[ReplicaState.STARTING])[0].version == "3"
replicas(
backend_state,
backend_state._replicas[tag].get(
states=[ReplicaState.STOPPING])[0]._actor.set_done_stopping()
replicas(
backend_state, states=[ReplicaState.STARTING])[0]._actor.set_ready()
backend_state._replicas[tag].get(
states=[ReplicaState.STARTING])[0]._actor.set_ready()
backend_state.update()
assert len(replicas(backend_state)) == 1
assert len(replicas(backend_state, states=[ReplicaState.RUNNING])) == 1
assert replicas(
backend_state, states=[ReplicaState.RUNNING])[0].version == "3"
assert len(backend_state._replicas[tag].get()) == 1
assert backend_state._replicas[tag].count(
states=[ReplicaState.RUNNING]) == 1
assert backend_state._replicas[tag].get(
states=[ReplicaState.RUNNING])[0].version == "3"
backend_state.update()
assert goal_manager.check_complete(goal_3)
assert len(backend_state._replicas) == 1
def test_deploy_new_config_same_version(mock_backend_state):
@ -394,28 +561,31 @@ def test_deploy_new_config_same_version(mock_backend_state):
# replica.
backend_state, timer, goal_manager = mock_backend_state
assert len(replicas(backend_state)) == 0
assert len(backend_state._replicas) == 0
tag = "tag"
b_config_1, r_config_1 = generate_configs()
create_goal = backend_state.deploy_backend(
"tag1", b_config_1, r_config_1, version="1")
tag, b_config_1, r_config_1, version="1")
# Create the replica initially.
backend_state.update()
replicas(backend_state)[0]._actor.set_ready()
backend_state._replicas[tag].get()[0]._actor.set_ready()
backend_state.update()
assert len(replicas(backend_state)) == 1
assert len(replicas(backend_state, states=[ReplicaState.RUNNING])) == 1
assert len(backend_state._replicas[tag].get()) == 1
assert backend_state._replicas[tag].count(
states=[ReplicaState.RUNNING]) == 1
backend_state.update()
assert goal_manager.check_complete(create_goal)
# Update to a new config without changing the version.
b_config_2, _ = generate_configs(user_config={"hello": "world"})
update_goal = backend_state.deploy_backend(
"tag1", b_config_2, r_config_1, version="1")
tag, b_config_2, r_config_1, version="1")
backend_state.update()
assert len(replicas(backend_state)) == 1
assert len(replicas(backend_state, states=[ReplicaState.RUNNING])) == 1
assert len(backend_state._replicas[tag].get()) == 1
assert backend_state._replicas[tag].count(
states=[ReplicaState.RUNNING]) == 1
backend_state.update()
assert goal_manager.check_complete(update_goal)
@ -424,45 +594,49 @@ def test_deploy_new_config_new_version(mock_backend_state):
# Deploying a new config with a new version should deploy a new replica.
backend_state, timer, goal_manager = mock_backend_state
assert len(replicas(backend_state)) == 0
assert len(backend_state._replicas) == 0
tag = "tag"
b_config_1, r_config_1 = generate_configs()
create_goal = backend_state.deploy_backend(
"tag1", b_config_1, r_config_1, version="1")
tag, b_config_1, r_config_1, version="1")
# Create the replica initially.
backend_state.update()
replicas(backend_state)[0]._actor.set_ready()
backend_state._replicas[tag].get()[0]._actor.set_ready()
backend_state.update()
assert len(replicas(backend_state)) == 1
assert len(replicas(backend_state, states=[ReplicaState.RUNNING])) == 1
assert len(backend_state._replicas[tag].get()) == 1
assert backend_state._replicas[tag].count(
states=[ReplicaState.RUNNING]) == 1
backend_state.update()
assert goal_manager.check_complete(create_goal)
# Update to a new config and a new version.
b_config_2, _ = generate_configs(user_config={"hello": "world"})
update_goal = backend_state.deploy_backend(
"tag1", b_config_2, r_config_1, version="2")
tag, b_config_2, r_config_1, version="2")
backend_state.update()
assert len(replicas(backend_state)) == 2
assert len(replicas(backend_state, states=[ReplicaState.STOPPING])) == 1
assert len(replicas(backend_state, states=[ReplicaState.STARTING])) == 1
assert replicas(
backend_state, states=[ReplicaState.STOPPING])[0].version == "1"
assert replicas(
backend_state, states=[ReplicaState.STARTING])[0].version == "2"
assert len(backend_state._replicas[tag].get()) == 2
assert backend_state._replicas[tag].count(
states=[ReplicaState.STOPPING]) == 1
assert backend_state._replicas[tag].count(
states=[ReplicaState.STARTING]) == 1
assert backend_state._replicas[tag].get(
states=[ReplicaState.STOPPING])[0].version == "1"
assert backend_state._replicas[tag].get(
states=[ReplicaState.STARTING])[0].version == "2"
replicas(
backend_state,
backend_state._replicas[tag].get(
states=[ReplicaState.STOPPING])[0]._actor.set_done_stopping()
replicas(
backend_state, states=[ReplicaState.STARTING])[0]._actor.set_ready()
backend_state._replicas[tag].get(
states=[ReplicaState.STARTING])[0]._actor.set_ready()
backend_state.update()
assert len(replicas(backend_state)) == 1
assert len(replicas(backend_state, states=[ReplicaState.RUNNING])) == 1
assert replicas(
backend_state, states=[ReplicaState.RUNNING])[0].version == "2"
assert len(backend_state._replicas[tag].get()) == 1
assert backend_state._replicas[tag].count(
states=[ReplicaState.RUNNING]) == 1
assert backend_state._replicas[tag].get(
states=[ReplicaState.RUNNING])[0].version == "2"
backend_state.update()
assert goal_manager.check_complete(update_goal)