diff --git a/python/ray/serve/autoscaling_metrics.py b/python/ray/serve/autoscaling_metrics.py
index 4b0d030d7..aae88a56f 100644
--- a/python/ray/serve/autoscaling_metrics.py
+++ b/python/ray/serve/autoscaling_metrics.py
@@ -51,7 +51,7 @@ def start_metrics_pusher(interval_s: float,
     timer = threading.Thread(target=send_forever)
     # Making this a daemon thread so it doesn't leak upon shutdown, and it
-    # doesn't need to block the backend worker's shutdown.
+    # doesn't need to block the replica's shutdown.
     timer.setDaemon(True)
     timer.start()
diff --git a/python/ray/serve/autoscaling_policy.py b/python/ray/serve/autoscaling_policy.py
index 5f448aec4..6959afb3e 100644
--- a/python/ray/serve/autoscaling_policy.py
+++ b/python/ray/serve/autoscaling_policy.py
@@ -71,7 +71,7 @@ class AutoscalingPolicy:
     def get_decision_num_replicas(self,
                                   current_num_ongoing_requests: List[float],
                                   curr_target_num_replicas: int) -> int:
-        """Make a decision to scale backends.
+        """Make a decision to scale replicas.
 
         Arguments:
             current_num_ongoing_requests: List[float]: List of number of
@@ -80,7 +80,7 @@ class AutoscalingPolicy:
                 deployment is currently trying to scale to.
 
         Returns:
-            int: The new number of replicas to scale this backend to.
+            int: The new number of replicas to scale to.
         """
         return curr_target_num_replicas
diff --git a/python/ray/serve/constants.py b/python/ray/serve/constants.py
index 3fb84c95f..7254be89d 100644
--- a/python/ray/serve/constants.py
+++ b/python/ray/serve/constants.py
@@ -52,8 +52,8 @@ DEFAULT_LATENCY_BUCKET_MS = [
     5000,
 ]
 
-#: Name of backend reconfiguration method implemented by user.
-BACKEND_RECONFIGURE_METHOD = "reconfigure"
+#: Name of deployment reconfiguration method implemented by user.
+RECONFIGURE_METHOD = "reconfigure"
 
 SERVE_ROOT_URL_ENV_KEY = "RAY_SERVE_ROOT_URL"
diff --git a/python/ray/serve/handle.py b/python/ray/serve/handle.py
index 7c34bdb0b..d5e10fa4f 100644
--- a/python/ray/serve/handle.py
+++ b/python/ray/serve/handle.py
@@ -127,10 +127,10 @@ class RayServeHandle:
         """Set options for this handle.
 
         Args:
-            method_name(str): The method to invoke on the backend.
+            method_name(str): The method to invoke.
             http_method(str): The HTTP method to use for the request.
             shard_key(str): A string to use to deterministically map this
-                request to a backend if there are multiple for this endpoint.
+                request to a deployment if there are multiple.
         """
         new_options_dict = self.handle_options.__dict__.copy()
         user_modified_options_dict = {
@@ -227,7 +227,7 @@ class RayServeSyncHandle(RayServeHandle):
             request_data(dict, Any): If it's a dictionary, the data will be
                 available in ``request.json()`` or ``request.form()``.
                 If it's a Starlette Request object, it will be passed in to the
-                backend directly, unmodified. Otherwise, the data will be
+                handler directly, unmodified. Otherwise, the data will be
                 available in ``request.data``.
             ``**kwargs``: All keyword arguments will be available in
                 ``request.args``.
diff --git a/python/ray/serve/http_proxy.py b/python/ray/serve/http_proxy.py
index e129f5d60..710f8152b 100644
--- a/python/ray/serve/http_proxy.py
+++ b/python/ray/serve/http_proxy.py
@@ -45,7 +45,7 @@ async def _send_request_to_handle(handle, scope, receive, send):
     del scope["endpoint"]
 
     # NOTE(edoakes): it's important that we defer building the starlette
-    # request until it reaches the backend replica to avoid unnecessary
+    # request until it reaches the replica to avoid unnecessary
    # serialization cost, so we use a simple dataclass here.
     request = HTTPRequestWrapper(scope, http_body_bytes)
     # Perform a pickle here to improve latency. Stdlib pickle for simple
diff --git a/python/ray/serve/replica.py b/python/ray/serve/replica.py
index 0b247bbc2..59215a060 100644
--- a/python/ray/serve/replica.py
+++ b/python/ray/serve/replica.py
@@ -22,7 +22,7 @@ from ray.serve.exceptions import RayServeException
 from ray.util import metrics
 from ray.serve.router import Query, RequestMetadata
 from ray.serve.constants import (
-    BACKEND_RECONFIGURE_METHOD,
+    RECONFIGURE_METHOD,
     DEFAULT_LATENCY_BUCKET_MS,
 )
 from ray.serve.version import DeploymentVersion
@@ -45,21 +45,21 @@ def create_replica_wrapper(name: str, serialized_deployment_def: bytes):
                      init_kwargs, deployment_config_proto_bytes: bytes,
                      version: DeploymentVersion, controller_name: str,
                      detached: bool):
-            backend = cloudpickle.loads(serialized_deployment_def)
+            deployment_def = cloudpickle.loads(serialized_deployment_def)
             deployment_config = DeploymentConfig.from_proto_bytes(
                 deployment_config_proto_bytes)
 
-            if inspect.isfunction(backend):
+            if inspect.isfunction(deployment_def):
                 is_function = True
-            elif inspect.isclass(backend):
+            elif inspect.isclass(deployment_def):
                 is_function = False
             else:
                 assert False, ("deployment_def must be function, class, or "
                                "corresponding import path.")
 
             # Set the controller name so that serve.connect() in the user's
-            # backend code will connect to the instance that this backend is
-            # running in.
+            # code will connect to the instance that this deployment is running
+            # in.
             ray.serve.api._set_internal_replica_context(
                 deployment_name,
                 replica_tag,
@@ -81,13 +81,13 @@ def create_replica_wrapper(name: str, serialized_deployment_def: bytes):
             # for allocation of this replica by using the `is_allocated`
             # method. After that, it calls `reconfigure` to trigger
             # user code initialization.
-            async def initialize_backend():
+            async def initialize_replica():
                 if is_function:
-                    _callable = backend
+                    _callable = deployment_def
                 else:
-                    # This allows backends to define an async __init__ method
-                    # (required for FastAPI backend definition).
-                    _callable = backend.__new__(backend)
+                    # This allows deployments to define an async __init__
+                    # method (required for FastAPI).
+                    _callable = deployment_def.__new__(deployment_def)
                     await sync_to_async(_callable.__init__)(*init_args,
                                                             **init_kwargs)
                 # Setting the context again to update the servable_object.
@@ -97,16 +97,16 @@ def create_replica_wrapper(name: str, serialized_deployment_def: bytes):
                     controller_name,
                     servable_object=_callable)
 
-                self.backend = RayServeReplica(
+                self.replica = RayServeReplica(
                     _callable, deployment_name, replica_tag,
                     deployment_config, deployment_config.user_config, version,
                     is_function, controller_handle)
 
-            # Is it fine that backend is None here?
-            # Should we add a check in all methods that use self.backend
-            # or, alternatively, create an async get_backend() method?
-            self.backend = None
-            self._initialize_backend = initialize_backend
+            # Is it fine that replica is None here?
+            # Should we add a check in all methods that use self.replica
+            # or, alternatively, create an async get_replica() method?
+            self.replica = None
+            self._initialize_replica = initialize_replica
 
             # asyncio.Event used to signal that the replica is shutting down.
             self.shutdown_event = asyncio.Event()
@@ -124,7 +124,7 @@ def create_replica_wrapper(name: str, serialized_deployment_def: bytes):
             # Directly receive input because it might contain an ObjectRef.
             query = Query(request_args, request_kwargs, request_metadata)
-            return await self.backend.handle_request(query)
+            return await self.replica.handle_request(query)
 
         async def is_allocated(self):
             """poke the replica to check whether it's alive.
@@ -139,19 +139,19 @@ def create_replica_wrapper(name: str, serialized_deployment_def: bytes):
         async def reconfigure(self, user_config: Optional[Any] = None
                              ) -> Tuple[DeploymentConfig, DeploymentVersion]:
-            if self.backend is None:
-                await self._initialize_backend()
+            if self.replica is None:
+                await self._initialize_replica()
             if user_config is not None:
-                await self.backend.reconfigure(user_config)
+                await self.replica.reconfigure(user_config)
 
             return self.get_metadata()
 
         def get_metadata(self) -> Tuple[DeploymentConfig, DeploymentVersion]:
-            return self.backend.deployment_config, self.backend.version
+            return self.replica.deployment_config, self.replica.version
 
         async def prepare_for_shutdown(self):
             self.shutdown_event.set()
-            return await self.backend.prepare_for_shutdown()
+            return await self.replica.prepare_for_shutdown()
 
         async def run_forever(self):
             await self.shutdown_event.wait()
@@ -352,12 +352,12 @@ class RayServeReplica:
             if self.is_function:
                 raise ValueError(
                     "deployment_def must be a class to use user_config")
-            elif not hasattr(self.callable, BACKEND_RECONFIGURE_METHOD):
-                raise RayServeException("user_config specified but backend " +
+            elif not hasattr(self.callable, RECONFIGURE_METHOD):
+                raise RayServeException("user_config specified but deployment " +
                                         self.deployment_name + " missing " +
-                                        BACKEND_RECONFIGURE_METHOD + " method")
+                                        RECONFIGURE_METHOD + " method")
             reconfigure_method = sync_to_async(
-                getattr(self.callable, BACKEND_RECONFIGURE_METHOD))
+                getattr(self.callable, RECONFIGURE_METHOD))
             await reconfigure_method(user_config)
 
     async def handle_request(self, request: Query) -> asyncio.Future:
diff --git a/python/ray/serve/router.py b/python/ray/serve/router.py
index 33874ce17..6bda1d98f 100644
--- a/python/ray/serve/router.py
+++ b/python/ray/serve/router.py
@@ -27,7 +27,7 @@ class RequestMetadata:
     http_headers: Dict[str, str] = field(default_factory=dict)
 
     # This flag will be set to true if the input argument is manually pickled
-    # and it needs to be deserialized by the backend worker.
+    # and it needs to be deserialized by the replica.
     http_arg_is_pickled: bool = False
 
     def __post_init__(self):
@@ -139,7 +139,7 @@ class ReplicaSet:
     async def assign_replica(self, query: Query) -> ray.ObjectRef:
         """Given a query, submit it to a replica and return the object ref.
         This method will keep track of the in flight queries for each replicas
-        and only send a query to available replicas (determined by the backend
+        and only send a query to available replicas (determined by the
         max_concurrent_quries value.)
         """
         endpoint = query.metadata.endpoint
@@ -179,7 +179,7 @@ class Router:
             deployment_name: str,
             event_loop: asyncio.BaseEventLoop = None,
     ):
-        """Router process incoming queries: choose backend, and assign replica.
+        """Router process incoming queries: assign a replica.
 
         Args:
             controller_handle(ActorHandle): The controller handle.