diff --git a/doc/source/serve/advanced.rst b/doc/source/serve/advanced.rst index 3ac191f1b..542a3ce18 100644 --- a/doc/source/serve/advanced.rst +++ b/doc/source/serve/advanced.rst @@ -321,6 +321,10 @@ The following metrics are exposed by Ray Serve: - The number of HTTP requests processed. * - ``serve_num_router_requests`` - The number of requests processed by the router. + * - ``serve_handle_request_counter`` + - The number of requests processed by this ServeHandle. + * - ``serve_backend_queued_queries`` + - The number of queries for this backend waiting to be assigned to a replica. To see this in action, run ``ray start --head --metrics-export-port=8080`` in your terminal, and then run the following script: diff --git a/python/ray/serve/handle.py b/python/ray/serve/handle.py index c6951c638..475f64556 100644 --- a/python/ray/serve/handle.py +++ b/python/ray/serve/handle.py @@ -4,6 +4,8 @@ from dataclasses import dataclass, field from typing import Any, Dict, Optional, Union from enum import Enum +from ray.serve.utils import get_random_letters +from ray.util import metrics from ray.serve.router import Router @@ -47,6 +49,17 @@ class RayServeHandle: self.router = router self.endpoint_name = endpoint_name self.handle_options = handle_options or HandleOptions() + self.handle_tag = f"{self.endpoint_name}#{get_random_letters()}" + + self.request_counter = metrics.Count( + "serve_handle_request_counter", + description=("The number of handle.remote() calls that have been " + "made on this handle."), + tag_keys=("handle", "endpoint")) + self.request_counter.set_default_tags({ + "handle": self.handle_tag, + "endpoint": self.endpoint_name + }) def options(self, *, @@ -92,6 +105,7 @@ class RayServeHandle: ``**kwargs``: All keyword arguments will be available in ``request.query_params``. 
""" + self.request_counter.record(1) return await self.router._remote( self.endpoint_name, self.handle_options, request_data, kwargs) @@ -118,6 +132,7 @@ class RayServeSyncHandle(RayServeHandle): ``**kwargs``: All keyword arguments will be available in ``request.args``. """ + self.request_counter.record(1) coro = self.router._remote(self.endpoint_name, self.handle_options, request_data, kwargs) future: concurrent.futures.Future = asyncio.run_coroutine_threadsafe( diff --git a/python/ray/serve/router.py b/python/ray/serve/router.py index c4a87b49b..ec887d006 100644 --- a/python/ray/serve/router.py +++ b/python/ray/serve/router.py @@ -1,7 +1,6 @@ import asyncio from enum import Enum import itertools -from collections import defaultdict from dataclasses import dataclass, field from typing import Any, ChainMap, Dict, Iterable, List, Optional @@ -49,12 +48,12 @@ class Query: class ReplicaSet: """Data structure representing a set of replica actor handles""" - def __init__(self): + def __init__(self, backend_tag): + self.backend_tag = backend_tag # NOTE(simon): We have to do this because max_concurrent_queries # and the replica handles come from different long poll keys. self.max_concurrent_queries: int = 8 self.in_flight_queries: Dict[ActorHandle, set] = dict() - # The iterator used for load balancing among replicas. Using itertools # cycle, we implements a round-robin policy, skipping overloaded # replicas. @@ -64,15 +63,25 @@ class ReplicaSet: self.replica_iterator = itertools.cycle(self.in_flight_queries.keys()) # Used to unblock this replica set waiting for free replicas. A newly - # added replica or updated max_concurrenty_queries value means the + # added replica or updated max_concurrent_queries value means the # query that waits on a free replica might be unblocked on. 
self.config_updated_event = asyncio.Event() + self.num_queued_queries = 0 + self.num_queued_queries_gauge = metrics.Gauge( + "serve_backend_queued_queries", + description=( + "The current number of queries to this backend waiting" + " to be assigned to a replica."), + tag_keys=("backend", "endpoint")) + self.num_queued_queries_gauge.set_default_tags({ + "backend": self.backend_tag + }) def set_max_concurrent_queries(self, new_value): if new_value != self.max_concurrent_queries: self.max_concurrent_queries = new_value logger.debug( - f"ReplicaSet: chaging max_concurrent_queries to {new_value}") + f"ReplicaSet: changing max_concurrent_queries to {new_value}") self.config_updated_event.set() def update_worker_replicas(self, worker_replicas: Iterable[ActorHandle]): @@ -92,7 +101,7 @@ class ReplicaSet: self.config_updated_event.set() def _try_assign_replica(self, query: Query) -> Optional[ray.ObjectRef]: - """Try to assign query to a replica, return the object ref is succeeded + """Try to assign query to a replica, return the object ref if succeeded or return None if it can't assign this query to any replicas. """ for _ in range(len(self.in_flight_queries.keys())): @@ -130,6 +139,10 @@ class ReplicaSet: and only send a query to available replicas (determined by the backend max_concurrent_quries value.) """ + endpoint = query.metadata.endpoint + self.num_queued_queries += 1 + self.num_queued_queries_gauge.record( + self.num_queued_queries, tags={"endpoint": endpoint}) assigned_ref = self._try_assign_replica(query) while assigned_ref is None: # Can't assign a replica right now. logger.debug("Failed to assign a replica for " @@ -147,8 +160,12 @@ class ReplicaSet: return_when=asyncio.FIRST_COMPLETED) if self.config_updated_event.is_set(): self.config_updated_event.clear() - # We are pretty sure a free replica is ready now. + # We are pretty sure a free replica is ready now, let's retry and + # assign this query a replica. 
assigned_ref = self._try_assign_replica(query) + self.num_queued_queries -= 1 + self.num_queued_queries_gauge.record( + self.num_queued_queries, tags={"endpoint": endpoint}) return assigned_ref @@ -168,7 +185,8 @@ class Router: self.controller = controller_handle self.endpoint_policies: Dict[str, EndpointPolicy] = dict() - self.backend_replicas: Dict[str, ReplicaSet] = defaultdict(ReplicaSet) + + self.backend_replicas: Dict[str, ReplicaSet] = dict() self._pending_endpoints: Dict[str, asyncio.Future] = dict() @@ -212,8 +230,8 @@ class Router: replica_handles) for backend_tag, replica_handles in ChainMap(added, updated).items(): - self.backend_replicas[backend_tag].update_worker_replicas( - replica_handles) + self._get_or_create_replica_set( + backend_tag).update_worker_replicas(replica_handles) for backend_tag in removed.keys(): if backend_tag in self.backend_replicas: @@ -223,8 +241,9 @@ class Router: added, removed, updated = compute_dict_delta(self.backend_replicas, backend_configs) for backend_tag, config in ChainMap(added, updated).items(): - self.backend_replicas[backend_tag].set_max_concurrent_queries( - config.max_concurrent_queries) + self._get_or_create_replica_set( + backend_tag).set_max_concurrent_queries( + config.max_concurrent_queries) for backend_tag in removed.keys(): if backend_tag in self.backend_replicas: @@ -261,11 +280,17 @@ class Router: endpoint_policy = self.endpoint_policies[endpoint] chosen_backend, *shadow_backends = endpoint_policy.assign(query) - result_ref = await self.backend_replicas[chosen_backend - ].assign_replica(query) + result_ref = await self._get_or_create_replica_set( + chosen_backend).assign_replica(query) for backend in shadow_backends: - await self.backend_replicas[backend].assign_replica(query) + (await self._get_or_create_replica_set(backend) + .assign_replica(query)) self.num_router_requests.record(1, tags={"endpoint": endpoint}) return result_ref + + def _get_or_create_replica_set(self, backend_name): + if 
backend_name not in self.backend_replicas: + self.backend_replicas[backend_name] = ReplicaSet(backend_name) + return self.backend_replicas[backend_name] diff --git a/python/ray/serve/tests/test_api.py b/python/ray/serve/tests/test_api.py index a35f7e54b..62f239f78 100644 --- a/python/ray/serve/tests/test_api.py +++ b/python/ray/serve/tests/test_api.py @@ -875,6 +875,10 @@ def test_serve_metrics(serve_instance): # gauge "replica_processing_queries", "replica_queued_queries", + # handle + "serve_handle_request_counter", + # ReplicaSet + "backend_queued_queries" ] for metric in expected_metrics: # For the final error round diff --git a/python/ray/serve/tests/test_router.py b/python/ray/serve/tests/test_router.py index 231ac11a5..9b8eb5548 100644 --- a/python/ray/serve/tests/test_router.py +++ b/python/ray/serve/tests/test_router.py @@ -204,7 +204,7 @@ async def test_replica_set(ray_instance): return self._num_queries # We will test a scenario with two replicas in the replica set. - rs = ReplicaSet() + rs = ReplicaSet("my_backend") workers = [MockWorker.remote() for _ in range(2)] rs.set_max_concurrent_queries(1) rs.update_worker_replicas(workers)