[serve] split ReplicaStartupState.PENDING into PENDING_ALLOCATION and PENDING_INITIALIZATION (#19431)

iasoon 2021-11-02 23:08:52 +01:00 committed by GitHub
parent f1eedb15b6
commit 7e6ea9e3df
6 changed files with 245 additions and 56 deletions


@ -230,6 +230,15 @@ py_test(
deps = [":serve_lib"],
)
py_test(
name = "test_warnings",
size = "small",
srcs = serve_tests_srcs,
tags = ["exclusive", "team:serve"],
deps = [":serve_lib"],
)
py_test(
name = "test_fastapi",
size = "medium",


@ -31,8 +31,8 @@ class ReplicaState(Enum):
class ReplicaStartupStatus(Enum):
PENDING = 1
PENDING_SLOW_START = 2
PENDING_ALLOCATION = 1
PENDING_INITIALIZATION = 2
SUCCEEDED = 3
FAILED = 4
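
For reference, a recap of how the enum reads after this change (the inline glosses are added here for clarity and are not part of the diff):

from enum import Enum

class ReplicaStartupStatus(Enum):
    PENDING_ALLOCATION = 1       # waiting for a worker to be scheduled
    PENDING_INITIALIZATION = 2   # allocated; user __init__ / reconfigure still running
    SUCCEEDED = 3
    FAILED = 4
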
@ -74,8 +74,10 @@ class ActorReplicaWrapper:
self._replica_tag = replica_tag
self._backend_tag = backend_tag
# Populated in self.start().
# Populated in either self.start() or self.recover()
self._allocated_obj_ref: ObjectRef = None
self._ready_obj_ref: ObjectRef = None
self._actor_resources: Dict[str, float] = None
self._max_concurrent_queries: int = None
self._graceful_shutdown_timeout_s: float = 0.0
@ -180,6 +182,7 @@ class ActorReplicaWrapper:
backend_info.deployment_config.to_proto_bytes(), version,
self._controller_name, self._detached)
self._allocated_obj_ref = self._actor_handle.is_allocated.remote()
self._ready_obj_ref = self._actor_handle.reconfigure.remote(
backend_info.deployment_config.user_config)
@ -210,6 +213,9 @@ class ActorReplicaWrapper:
self._placement_group = self.get_placement_group(
self._placement_group_name)
# Re-fetch allocation proof
self._allocated_obj_ref = self._actor_handle.is_allocated.remote()
# The running actor handle already has all the info needed, so a successful
# start simply means retrieving the replica version hash from the actor
self._ready_obj_ref = self._actor_handle.get_metadata.remote()
@ -222,7 +228,9 @@ class ActorReplicaWrapper:
Returns:
state (ReplicaStartupStatus):
PENDING:
PENDING_ALLOCATION:
- replica is waiting for a worker to start
PENDING_INITIALIZATION:
- replica reconfigure() hasn't returned yet.
FAILED:
- replica __init__() failed.
@ -235,11 +243,18 @@ class ActorReplicaWrapper:
version:
- replica __init__() and reconfigure() succeeded.
"""
# check whether the replica has been allocated
ready, _ = ray.wait([self._allocated_obj_ref], timeout=0)
if len(ready) == 0:
return ReplicaStartupStatus.PENDING_ALLOCATION, None
# check whether replica initialization has completed
ready, _ = ray.wait([self._ready_obj_ref], timeout=0)
# In case of a deployment constructor failure, ray.get will surface
# the exception to each update() cycle.
if len(ready) == 0:
return ReplicaStartupStatus.PENDING, None
return ReplicaStartupStatus.PENDING_INITIALIZATION, None
elif len(ready) > 0:
try:
deployment_config, version = ray.get(ready)[0]
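
To make the two-stage poll above concrete, here is a minimal self-contained sketch of the same idea outside of Serve (all names are illustrative and not taken from this diff): keep the actor's __init__ cheap, probe allocation with a no-op method, and defer the slow setup into a second call.

import time
import ray

ray.init()

@ray.remote
class ToyReplica:
    # __init__ stays cheap so the actor can be probed as soon as it is
    # allocated; the slow setup is deferred into reconfigure().
    def is_allocated(self):
        pass

    def reconfigure(self):
        time.sleep(5)  # stand-in for a slow user __init__ / reconfigure
        return "version-1"

handle = ToyReplica.remote()
allocated_ref = handle.is_allocated.remote()
ready_ref = handle.reconfigure.remote()

def check_ready():
    # ray.wait(timeout=0) never blocks; it only reports which refs are done.
    if not ray.wait([allocated_ref], timeout=0)[0]:
        return "PENDING_ALLOCATION"      # no worker scheduled yet
    if not ray.wait([ready_ref], timeout=0)[0]:
        return "PENDING_INITIALIZATION"  # allocated, still initializing
    return "SUCCEEDED"

while check_ready() != "SUCCEEDED":
    time.sleep(1)
print(ray.get(ready_ref))
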
@ -406,11 +421,8 @@ class BackendReplica(VersionedReplica):
"""
status, version = self._actor.check_ready()
if status == ReplicaStartupStatus.PENDING:
if time.time() - self._start_time > SLOW_STARTUP_WARNING_S:
status = ReplicaStartupStatus.PENDING_SLOW_START
elif status == ReplicaStartupStatus.SUCCEEDED:
# Re-assign DeploymentVersion if start / update / recover succeeded
if status == ReplicaStartupStatus.SUCCEEDED:
# Re-assign BackendVersion if start / update / recover succeeded
# by reading re-computed version in RayServeReplica
if version is not None:
self._version = version
@ -1003,10 +1015,9 @@ class BackendState:
return GoalStatus.PENDING
def _check_startup_replicas(self,
original_state: ReplicaState,
stop_on_slow=False
) -> Tuple[List[BackendReplica], bool]:
def _check_startup_replicas(
self, original_state: ReplicaState, stop_on_slow=False
) -> Tuple[List[Tuple[BackendReplica, ReplicaStartupStatus]], bool]:
"""
Common helper for tracking startup actions and status transitions:
STARTING, UPDATING, and RECOVERING.
@ -1032,18 +1043,24 @@ class BackendState:
replica.stop(graceful=False)
self._replicas.add(ReplicaState.STOPPING, replica)
elif start_status == ReplicaStartupStatus.PENDING:
# Not done yet, remain at same state
self._replicas.add(original_state, replica)
else:
# Slow start, remain at same state but also add to
# slow start replicas.
if not stop_on_slow:
self._replicas.add(original_state, replica)
else:
elif start_status in [
ReplicaStartupStatus.PENDING_ALLOCATION,
ReplicaStartupStatus.PENDING_INITIALIZATION,
]:
is_slow = time.time(
) - replica._start_time > SLOW_STARTUP_WARNING_S
if is_slow:
slow_replicas.append((replica, start_status))
# Does it make sense to stop replicas in PENDING_ALLOCATION
# state?
if is_slow and stop_on_slow:
replica.stop(graceful=False)
self._replicas.add(ReplicaState.STOPPING, replica)
slow_replicas.append(replica)
else:
self._replicas.add(original_state, replica)
return slow_replicas, transitioned_to_running
@ -1086,17 +1103,38 @@ class BackendState:
if (len(slow_start_replicas)
and time.time() - self._prev_startup_warning >
SLOW_STARTUP_WARNING_PERIOD_S):
required, available = slow_start_replicas[
0].resource_requirements()
logger.warning(
f"Deployment '{self._name}' has "
f"{len(slow_start_replicas)} replicas that have taken "
f"more than {SLOW_STARTUP_WARNING_S}s to start up. This "
"may be caused by waiting for the cluster to auto-scale, "
"waiting for a runtime environment to install, or a slow "
"constructor. Resources required "
f"for each replica: {required}, resources available: "
f"{available}. component=serve deployment={self._name}")
pending_allocation = []
pending_initialization = []
for replica, startup_status in slow_start_replicas:
if startup_status == ReplicaStartupStatus.PENDING_ALLOCATION:
pending_allocation.append(replica)
if startup_status \
== ReplicaStartupStatus.PENDING_INITIALIZATION:
pending_initialization.append(replica)
if len(pending_allocation) > 0:
required, available = slow_start_replicas[0][
0].resource_requirements()
logger.warning(
f"Deployment '{self._name}' has "
f"{len(pending_allocation)} replicas that have taken "
f"more than {SLOW_STARTUP_WARNING_S}s to be scheduled. "
f"This may be caused by waiting for the cluster to "
f"auto-scale, or waiting for a runtime environment "
f"to install. "
f"Resources required for each replica: {required}, "
f"resources available: {available}. "
f"component=serve deployment={self._name}")
if len(pending_initialization) > 0:
logger.warning(
f"Deployment '{self._name}' has "
f"{len(pending_initialization)} replicas that have taken "
f"more than {SLOW_STARTUP_WARNING_S}s to initialize. This "
f"may be caused by a slow __init__ or reconfigure method."
f"component=serve deployment={self._name}")
self._prev_startup_warning = time.time()


@ -65,30 +65,48 @@ def create_replica_wrapper(name: str, serialized_deployment_def: bytes):
replica_tag,
controller_name,
servable_object=None)
if is_function:
_callable = backend
else:
# This allows backends to define an async __init__ method
# (required for FastAPI backend definition).
_callable = backend.__new__(backend)
await sync_to_async(_callable.__init__)(*init_args,
**init_kwargs)
# Setting the context again to update the servable_object.
ray.serve.api._set_internal_replica_context(
backend_tag,
replica_tag,
controller_name,
servable_object=_callable)
assert controller_name, "Must provide a valid controller_name"
controller_namespace = ray.serve.api._get_controller_namespace(
detached)
controller_handle = ray.get_actor(
controller_name, namespace=controller_namespace)
self.backend = RayServeReplica(
_callable, backend_tag, replica_tag, deployment_config,
deployment_config.user_config, version, is_function,
controller_handle)
# This closure initializes user code and finalizes replica
# startup. By splitting the initialization step like this,
# we can already access this actor before the user code
# has finished initializing.
# The supervising state manager can then wait
# for allocation of this replica by using the `is_allocated`
# method. After that, it calls `reconfigure` to trigger
# user code initialization.
async def initialize_backend():
if is_function:
_callable = backend
else:
# This allows backends to define an async __init__ method
# (required for FastAPI backend definition).
_callable = backend.__new__(backend)
await sync_to_async(_callable.__init__)(*init_args,
**init_kwargs)
# Setting the context again to update the servable_object.
ray.serve.api._set_internal_replica_context(
backend_tag,
replica_tag,
controller_name,
servable_object=_callable)
self.backend = RayServeReplica(
_callable, backend_tag, replica_tag, deployment_config,
deployment_config.user_config, version, is_function,
controller_handle)
# Is it fine that backend is None here?
# Should we add a check in all methods that use self.backend
# or, alternatively, create an async get_backend() method?
self.backend = None
self._initialize_backend = initialize_backend
# asyncio.Event used to signal that the replica is shutting down.
self.shutdown_event = asyncio.Event()
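
The deferred-initialization pattern above can be hard to follow inside a diff, so here is a minimal standalone sketch of the same idea (hypothetical names, not the actual RayServeWrappedReplica code): construction is cheap, and the expensive setup only runs when the first reconfigure() call awaits the stored closure.

import asyncio

class LazyReplica:
    def __init__(self):
        self.backend = None

        async def initialize_backend():
            await asyncio.sleep(1)   # stand-in for a slow user __init__
            self.backend = object()  # stand-in for RayServeReplica

        self._initialize_backend = initialize_backend

    async def reconfigure(self, user_config=None):
        if self.backend is None:
            await self._initialize_backend()
        return self.backend

asyncio.run(LazyReplica().reconfigure())
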
@ -108,8 +126,21 @@ def create_replica_wrapper(name: str, serialized_deployment_def: bytes):
query = Query(request_args, request_kwargs, request_metadata)
return await self.backend.handle_request(query)
async def is_allocated(self):
"""poke the replica to check whether it's alive.
When calling this method on an ActorHandle, it will complete as
soon as the actor has started running. We use this mechanism to
detect when a replica has been allocated a worker slot.
At this time, the replica can transition from PENDING_ALLOCATION
to PENDING_INITIALIZATION startup state.
"""
pass
async def reconfigure(self, user_config: Optional[Any] = None
) -> Tuple[DeploymentConfig, DeploymentVersion]:
if self.backend is None:
await self._initialize_backend()
if user_config is not None:
await self.backend.reconfigure(user_config)


@ -42,8 +42,8 @@ class MockReplicaActorWrapper:
self.recovering = False
# Will be set when `start()` is called.
self.version = None
# Expected to be set in the test.
self.ready = False
# Initial state for a replica is PENDING_ALLOCATION.
self.ready = ReplicaStartupStatus.PENDING_ALLOCATION
# Will be set when `graceful_stop()` is called.
self.stopped = False
# Expected to be set in the test.
@ -106,7 +106,7 @@ class MockReplicaActorWrapper:
def check_ready(self) -> ReplicaStartupStatus:
ready = self.ready
self.ready = ReplicaStartupStatus.PENDING
self.ready = ReplicaStartupStatus.PENDING_INITIALIZATION
if ready == ReplicaStartupStatus.SUCCEEDED and self.recovering:
self.recovering = False
self.started = True


@ -9,6 +9,9 @@ import ray
from ray import serve
from ray.cluster_utils import Cluster
from ray.serve.backend_state import ReplicaStartupStatus, ReplicaState
from ray._private.test_utils import SignalActor, wait_for_condition
@pytest.fixture
def ray_cluster():
@ -113,5 +116,49 @@ def test_node_failure(ray_cluster):
assert pids3.issubset(pids4)
@pytest.mark.skipif(sys.platform == "win32", reason="Flaky on Windows.")
def test_replica_startup_status_transitions(ray_cluster):
cluster = ray_cluster
cluster.add_node(num_cpus=1)
cluster.connect(namespace="serve")
serve_instance = serve.start()
signal = SignalActor.remote()
@serve.deployment(version="1", ray_actor_options={"num_cpus": 2})
class D:
def __init__(self):
ray.get(signal.wait.remote())
D.deploy(_blocking=False)
def get_replicas(replica_state):
controller = serve_instance._controller
replicas = ray.get(
controller._dump_replica_states_for_testing.remote(D.name))
return replicas.get([replica_state])
# wait for serve to start the replica, and catch a reference to it.
wait_for_condition(lambda: len(get_replicas(ReplicaState.STARTING)) > 0)
replica = get_replicas(ReplicaState.STARTING)[0]
# declare shorthands as yapf doesn't like long lambdas
PENDING_ALLOCATION = ReplicaStartupStatus.PENDING_ALLOCATION
PENDING_INITIALIZATION = ReplicaStartupStatus.PENDING_INITIALIZATION
SUCCEEDED = ReplicaStartupStatus.SUCCEEDED
# currently there are not enough resources to allocate the replica
assert replica.check_started() == PENDING_ALLOCATION
# add the necessary resources to allocate the replica
cluster.add_node(num_cpus=4)
wait_for_condition(
lambda: (replica.check_started() == PENDING_INITIALIZATION))
# send signal to complete replica initialization
signal.send.remote()
wait_for_condition(lambda: replica.check_started() == SUCCEEDED)
if __name__ == "__main__":
sys.exit(pytest.main(["-v", "-s", __file__]))


@ -0,0 +1,64 @@
import time
from ray import serve
from ray.serve.backend_state import (SLOW_STARTUP_WARNING_S,
SLOW_STARTUP_WARNING_PERIOD_S)
def test_slow_allocation_warning(serve_instance, capsys):
# this deployment can never be scheduled
@serve.deployment(ray_actor_options={"num_cpus": 99999})
class D:
def __init__(self):
pass
num_replicas = 2
D.options(num_replicas=num_replicas).deploy(_blocking=False)
expected_warning = (f"Deployment '{D.name}' has "
f"{num_replicas} replicas that have taken "
f"more than {SLOW_STARTUP_WARNING_S}s "
f"to be scheduled.")
# wait long enough for the warning to be printed
# with a small grace period
time.sleep(SLOW_STARTUP_WARNING_PERIOD_S * 1.5)
captured = capsys.readouterr()
print(captured.err)
assert expected_warning in captured.err
# make sure that exactly one warning was printed
# for this deployment
assert captured.err.count(expected_warning) == 1
def test_slow_initialization_warning(serve_instance, capsys):
# this deployment will take a while to initialize
@serve.deployment
class D:
def __init__(self):
time.sleep(99999)
num_replicas = 4
D.options(num_replicas=num_replicas).deploy(_blocking=False)
expected_warning = (f"Deployment '{D.name}' has "
f"{num_replicas} replicas that have taken "
f"more than {SLOW_STARTUP_WARNING_S}s "
f"to initialize.")
# wait long enough for the warning to be printed
# with a small grace period
time.sleep(SLOW_STARTUP_WARNING_PERIOD_S * 1.5)
captured = capsys.readouterr()
assert expected_warning in captured.err
# make sure that exactly one warning was printed
# for this deployment
assert captured.err.count(expected_warning) == 1