[serve] Rename backend -> deployment in replica.py (#20020)
parent ede9d0ed76
commit 91c730efd0
7 changed files with 39 additions and 39 deletions
@@ -51,7 +51,7 @@ def start_metrics_pusher(interval_s: float,
     timer = threading.Thread(target=send_forever)
     # Making this a daemon thread so it doesn't leak upon shutdown, and it
-    # doesn't need to block the backend worker's shutdown.
+    # doesn't need to block the replica's shutdown.
     timer.setDaemon(True)
     timer.start()
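
The comment above concerns a background metrics-pushing loop running on a daemon thread. A minimal, self-contained sketch of that pattern follows; the `push_metrics` callback and the interval are illustrative, not part of this change:

import threading
import time


def start_metrics_pusher(interval_s: float, push_metrics):
    """Sketch: call `push_metrics` every `interval_s` seconds in the background."""

    def send_forever():
        while True:
            push_metrics()
            time.sleep(interval_s)

    timer = threading.Thread(target=send_forever)
    # Daemon thread: it won't keep the process alive, so it doesn't need to
    # block the replica's shutdown.
    timer.setDaemon(True)
    timer.start()
    return timer


start_metrics_pusher(1.0, lambda: print("pushing metrics"))
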
@@ -71,7 +71,7 @@ class AutoscalingPolicy:
     def get_decision_num_replicas(self,
                                   current_num_ongoing_requests: List[float],
                                   curr_target_num_replicas: int) -> int:
-        """Make a decision to scale backends.
+        """Make a decision to scale replicas.
 
         Arguments:
             current_num_ongoing_requests: List[float]: List of number of
@@ -80,7 +80,7 @@ class AutoscalingPolicy:
                 deployment is currently trying to scale to.
 
         Returns:
-            int: The new number of replicas to scale this backend to.
+            int: The new number of replicas to scale to.
         """
         return curr_target_num_replicas
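
For a sense of the interface whose docstring is edited above, a toy policy could target a fixed number of ongoing requests per replica. This is only a sketch of the method signature, not Ray's actual autoscaling policy:

import math
from typing import List


class FixedTargetPolicy:
    """Toy policy: keep roughly `target` ongoing requests per replica."""

    def __init__(self, target: float = 2.0):
        self.target = target

    def get_decision_num_replicas(self,
                                  current_num_ongoing_requests: List[float],
                                  curr_target_num_replicas: int) -> int:
        if not current_num_ongoing_requests:
            return curr_target_num_replicas
        total = sum(current_num_ongoing_requests)
        # Enough replicas to keep per-replica load near the target, never zero.
        return max(1, math.ceil(total / self.target))


policy = FixedTargetPolicy()
print(policy.get_decision_num_replicas([3.0, 5.0], curr_target_num_replicas=2))  # 4
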
@@ -52,8 +52,8 @@ DEFAULT_LATENCY_BUCKET_MS = [
     5000,
 ]
 
-#: Name of backend reconfiguration method implemented by user.
-BACKEND_RECONFIGURE_METHOD = "reconfigure"
+#: Name of deployment reconfiguration method implemented by user.
+RECONFIGURE_METHOD = "reconfigure"
 
 SERVE_ROOT_URL_ENV_KEY = "RAY_SERVE_ROOT_URL"
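
The renamed constant names the user-implemented hook that Serve invokes with a deployment's `user_config`. A small sketch of the lookup-by-name pattern, using a hypothetical `Classifier` deployment class:

RECONFIGURE_METHOD = "reconfigure"  # mirrors the constant above


class Classifier:
    def __init__(self):
        self.threshold = 0.0

    def reconfigure(self, config: dict):
        # Serve calls this with the deployment's user_config, e.g. on updates.
        self.threshold = config["threshold"]


# Roughly what the replica does: look the hook up by name and call it.
callable_ = Classifier()
getattr(callable_, RECONFIGURE_METHOD)({"threshold": 0.5})
print(callable_.threshold)  # 0.5
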
@@ -127,10 +127,10 @@ class RayServeHandle:
         """Set options for this handle.
 
         Args:
-            method_name(str): The method to invoke on the backend.
+            method_name(str): The method to invoke.
             http_method(str): The HTTP method to use for the request.
             shard_key(str): A string to use to deterministically map this
-                request to a backend if there are multiple for this endpoint.
+                request to a deployment if there are multiple.
         """
         new_options_dict = self.handle_options.__dict__.copy()
         user_modified_options_dict = {
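
For reference, the options documented above are used roughly like this; the deployment name and method name are made up, and a deployment exposing them is assumed to already be running:

import ray
from ray import serve

# Assumes a deployment named "MyDeployment" with an `other_method` is deployed.
handle = serve.get_deployment("MyDeployment").get_handle()
# Route this request to `other_method` instead of the default __call__.
ref = handle.options(method_name="other_method").remote("some input")
print(ray.get(ref))
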
@@ -227,7 +227,7 @@ class RayServeSyncHandle(RayServeHandle):
             request_data(dict, Any): If it's a dictionary, the data will be
                 available in ``request.json()`` or ``request.form()``.
                 If it's a Starlette Request object, it will be passed in to the
-                backend directly, unmodified. Otherwise, the data will be
+                handler directly, unmodified. Otherwise, the data will be
                 available in ``request.data``.
             ``**kwargs``: All keyword arguments will be available in
                 ``request.args``.
@@ -45,7 +45,7 @@ async def _send_request_to_handle(handle, scope, receive, send):
     del scope["endpoint"]
 
     # NOTE(edoakes): it's important that we defer building the starlette
-    # request until it reaches the backend replica to avoid unnecessary
+    # request until it reaches the replica to avoid unnecessary
    # serialization cost, so we use a simple dataclass here.
     request = HTTPRequestWrapper(scope, http_body_bytes)
     # Perform a pickle here to improve latency. Stdlib pickle for simple
@@ -22,7 +22,7 @@ from ray.serve.exceptions import RayServeException
 from ray.util import metrics
 from ray.serve.router import Query, RequestMetadata
 from ray.serve.constants import (
-    BACKEND_RECONFIGURE_METHOD,
+    RECONFIGURE_METHOD,
     DEFAULT_LATENCY_BUCKET_MS,
 )
 from ray.serve.version import DeploymentVersion
@@ -45,21 +45,21 @@ def create_replica_wrapper(name: str, serialized_deployment_def: bytes):
                           init_kwargs, deployment_config_proto_bytes: bytes,
                           version: DeploymentVersion, controller_name: str,
                           detached: bool):
-            backend = cloudpickle.loads(serialized_deployment_def)
+            deployment_def = cloudpickle.loads(serialized_deployment_def)
             deployment_config = DeploymentConfig.from_proto_bytes(
                 deployment_config_proto_bytes)
 
-            if inspect.isfunction(backend):
+            if inspect.isfunction(deployment_def):
                 is_function = True
-            elif inspect.isclass(backend):
+            elif inspect.isclass(deployment_def):
                 is_function = False
             else:
                 assert False, ("deployment_def must be function, class, or "
                                "corresponding import path.")
 
             # Set the controller name so that serve.connect() in the user's
-            # backend code will connect to the instance that this backend is
-            # running in.
+            # code will connect to the instance that this deployment is running
+            # in.
             ray.serve.api._set_internal_replica_context(
                 deployment_name,
                 replica_tag,
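
The renamed `deployment_def` is whatever the user passed to `@serve.deployment` (a function or a class), shipped to the replica actor as cloudpickled bytes. A small sketch of that round trip and the function/class check, using an invented `MyDeployment` class:

import inspect

from ray import cloudpickle


class MyDeployment:
    def __call__(self, request):
        return "hello"


serialized_deployment_def = cloudpickle.dumps(MyDeployment)

# Roughly what the replica constructor does with the bytes it receives:
deployment_def = cloudpickle.loads(serialized_deployment_def)
if inspect.isfunction(deployment_def):
    is_function = True
elif inspect.isclass(deployment_def):
    is_function = False
else:
    raise TypeError("deployment_def must be a function or a class")
print(is_function)  # False
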
@@ -81,13 +81,13 @@ def create_replica_wrapper(name: str, serialized_deployment_def: bytes):
             # for allocation of this replica by using the `is_allocated`
             # method. After that, it calls `reconfigure` to trigger
             # user code initialization.
-            async def initialize_backend():
+            async def initialize_replica():
                 if is_function:
-                    _callable = backend
+                    _callable = deployment_def
                 else:
-                    # This allows backends to define an async __init__ method
-                    # (required for FastAPI backend definition).
-                    _callable = backend.__new__(backend)
+                    # This allows deployments to define an async __init__
+                    # method (required for FastAPI).
+                    _callable = deployment_def.__new__(deployment_def)
                     await sync_to_async(_callable.__init__)(*init_args,
                                                             **init_kwargs)
                 # Setting the context again to update the servable_object.
@@ -97,16 +97,16 @@ def create_replica_wrapper(name: str, serialized_deployment_def: bytes):
                     controller_name,
                     servable_object=_callable)
 
-                self.backend = RayServeReplica(
+                self.replica = RayServeReplica(
                     _callable, deployment_name, replica_tag, deployment_config,
                     deployment_config.user_config, version, is_function,
                     controller_handle)
 
-            # Is it fine that backend is None here?
-            # Should we add a check in all methods that use self.backend
-            # or, alternatively, create an async get_backend() method?
-            self.backend = None
-            self._initialize_backend = initialize_backend
+            # Is it fine that replica is None here?
+            # Should we add a check in all methods that use self.replica
+            # or, alternatively, create an async get_replica() method?
+            self.replica = None
+            self._initialize_replica = initialize_replica
 
             # asyncio.Event used to signal that the replica is shutting down.
             self.shutdown_event = asyncio.Event()
@@ -124,7 +124,7 @@ def create_replica_wrapper(name: str, serialized_deployment_def: bytes):
 
             # Directly receive input because it might contain an ObjectRef.
             query = Query(request_args, request_kwargs, request_metadata)
-            return await self.backend.handle_request(query)
+            return await self.replica.handle_request(query)
 
         async def is_allocated(self):
             """poke the replica to check whether it's alive.
@@ -139,19 +139,19 @@ def create_replica_wrapper(name: str, serialized_deployment_def: bytes):
 
         async def reconfigure(self, user_config: Optional[Any] = None
                               ) -> Tuple[DeploymentConfig, DeploymentVersion]:
-            if self.backend is None:
-                await self._initialize_backend()
+            if self.replica is None:
+                await self._initialize_replica()
             if user_config is not None:
-                await self.backend.reconfigure(user_config)
+                await self.replica.reconfigure(user_config)
 
             return self.get_metadata()
 
         def get_metadata(self) -> Tuple[DeploymentConfig, DeploymentVersion]:
-            return self.backend.deployment_config, self.backend.version
+            return self.replica.deployment_config, self.replica.version
 
         async def prepare_for_shutdown(self):
             self.shutdown_event.set()
-            return await self.backend.prepare_for_shutdown()
+            return await self.replica.prepare_for_shutdown()
 
         async def run_forever(self):
             await self.shutdown_event.wait()
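
The control flow above is lazy initialization: the wrapper stores an async initializer and runs it on the first `reconfigure` call. A stripped-down sketch of that pattern, independent of Ray:

import asyncio


class LazyWrapper:
    """Defers constructing the (possibly expensive) replica until first use."""

    def __init__(self):
        async def initialize_replica():
            await asyncio.sleep(0)   # stand-in for running user __init__ code
            self.replica = object()  # stand-in for RayServeReplica

        self.replica = None
        self._initialize_replica = initialize_replica

    async def reconfigure(self, user_config=None):
        if self.replica is None:
            await self._initialize_replica()
        # A real wrapper would forward user_config to the replica here.
        return self.replica


print(asyncio.run(LazyWrapper().reconfigure()))
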
@@ -352,12 +352,12 @@ class RayServeReplica:
         if self.is_function:
             raise ValueError(
                 "deployment_def must be a class to use user_config")
-        elif not hasattr(self.callable, BACKEND_RECONFIGURE_METHOD):
-            raise RayServeException("user_config specified but backend " +
+        elif not hasattr(self.callable, RECONFIGURE_METHOD):
+            raise RayServeException("user_config specified but deployment " +
                                     self.deployment_name + " missing " +
-                                    BACKEND_RECONFIGURE_METHOD + " method")
+                                    RECONFIGURE_METHOD + " method")
         reconfigure_method = sync_to_async(
-            getattr(self.callable, BACKEND_RECONFIGURE_METHOD))
+            getattr(self.callable, RECONFIGURE_METHOD))
         await reconfigure_method(user_config)
 
     async def handle_request(self, request: Query) -> asyncio.Future:
@@ -27,7 +27,7 @@ class RequestMetadata:
     http_headers: Dict[str, str] = field(default_factory=dict)
 
     # This flag will be set to true if the input argument is manually pickled
-    # and it needs to be deserialized by the backend worker.
+    # and it needs to be deserialized by the replica.
     http_arg_is_pickled: bool = False
 
     def __post_init__(self):
@@ -139,7 +139,7 @@ class ReplicaSet:
     async def assign_replica(self, query: Query) -> ray.ObjectRef:
         """Given a query, submit it to a replica and return the object ref.
         This method will keep track of the in flight queries for each replicas
-        and only send a query to available replicas (determined by the backend
+        and only send a query to available replicas (determined by the
         max_concurrent_quries value.)
         """
         endpoint = query.metadata.endpoint
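
The docstring above is about per-replica in-flight bookkeeping. A toy illustration of handing queries only to replicas under a `max_concurrent_queries` cap follows; the data structures and names are invented for the sketch:

from collections import defaultdict


class ToyReplicaSet:
    def __init__(self, replica_tags, max_concurrent_queries: int = 2):
        self.replica_tags = list(replica_tags)
        self.max_concurrent_queries = max_concurrent_queries
        self.in_flight = defaultdict(int)

    def pick_replica(self):
        # Only consider replicas whose in-flight count is under the cap.
        available = [
            tag for tag in self.replica_tags
            if self.in_flight[tag] < self.max_concurrent_queries
        ]
        if not available:
            return None  # caller would wait for an in-flight query to finish
        tag = min(available, key=lambda t: self.in_flight[t])
        self.in_flight[tag] += 1
        return tag


rs = ToyReplicaSet(["replica-1", "replica-2"])
print([rs.pick_replica() for _ in range(5)])
# ['replica-1', 'replica-2', 'replica-1', 'replica-2', None]
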
@@ -179,7 +179,7 @@ class Router:
             deployment_name: str,
             event_loop: asyncio.BaseEventLoop = None,
     ):
-        """Router process incoming queries: choose backend, and assign replica.
+        """Router process incoming queries: assign a replica.
 
         Args:
             controller_handle(ActorHandle): The controller handle.