[Serve] Add ServeHandle metrics (#13640)

This commit is contained in:
architkulkarni 2021-01-28 12:40:47 -08:00 committed by GitHub
parent 4bc257f4fb
commit cb771f263d
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 64 additions and 16 deletions

View file

@ -321,6 +321,10 @@ The following metrics are exposed by Ray Serve:
- The number of HTTP requests processed. - The number of HTTP requests processed.
* - ``serve_num_router_requests`` * - ``serve_num_router_requests``
- The number of requests processed by the router. - The number of requests processed by the router.
* - ``serve_handle_request_counter``
- The number of requests processed by this ServeHandle.
* - ``backend_queued_queries``
- The number of queries for this backend waiting to be assigned to a replica.
To see this in action, run ``ray start --head --metrics-export-port=8080`` in your terminal, and then run the following script: To see this in action, run ``ray start --head --metrics-export-port=8080`` in your terminal, and then run the following script:

View file

@ -4,6 +4,8 @@ from dataclasses import dataclass, field
from typing import Any, Dict, Optional, Union from typing import Any, Dict, Optional, Union
from enum import Enum from enum import Enum
from ray.serve.utils import get_random_letters
from ray.util import metrics
from ray.serve.router import Router from ray.serve.router import Router
@ -47,6 +49,17 @@ class RayServeHandle:
self.router = router self.router = router
self.endpoint_name = endpoint_name self.endpoint_name = endpoint_name
self.handle_options = handle_options or HandleOptions() self.handle_options = handle_options or HandleOptions()
self.handle_tag = f"{self.endpoint_name}#{get_random_letters()}"
self.request_counter = metrics.Count(
"serve_handle_request_counter",
description=("The number of handle.remote() calls that have been "
"made on this handle."),
tag_keys=("handle", "endpoint"))
self.request_counter.set_default_tags({
"handle": self.handle_tag,
"endpoint": self.endpoint_name
})
def options(self, def options(self,
*, *,
@ -92,6 +105,7 @@ class RayServeHandle:
``**kwargs``: All keyword arguments will be available in ``**kwargs``: All keyword arguments will be available in
``request.query_params``. ``request.query_params``.
""" """
self.request_counter.record(1)
return await self.router._remote( return await self.router._remote(
self.endpoint_name, self.handle_options, request_data, kwargs) self.endpoint_name, self.handle_options, request_data, kwargs)
@ -118,6 +132,7 @@ class RayServeSyncHandle(RayServeHandle):
``**kwargs``: All keyword arguments will be available in ``**kwargs``: All keyword arguments will be available in
``request.args``. ``request.args``.
""" """
self.request_counter.record(1)
coro = self.router._remote(self.endpoint_name, self.handle_options, coro = self.router._remote(self.endpoint_name, self.handle_options,
request_data, kwargs) request_data, kwargs)
future: concurrent.futures.Future = asyncio.run_coroutine_threadsafe( future: concurrent.futures.Future = asyncio.run_coroutine_threadsafe(

View file

@ -1,7 +1,6 @@
import asyncio import asyncio
from enum import Enum from enum import Enum
import itertools import itertools
from collections import defaultdict
from dataclasses import dataclass, field from dataclasses import dataclass, field
from typing import Any, ChainMap, Dict, Iterable, List, Optional from typing import Any, ChainMap, Dict, Iterable, List, Optional
@ -49,12 +48,12 @@ class Query:
class ReplicaSet: class ReplicaSet:
"""Data structure representing a set of replica actor handles""" """Data structure representing a set of replica actor handles"""
def __init__(self): def __init__(self, backend_tag):
self.backend_tag = backend_tag
# NOTE(simon): We have to do this because max_concurrent_queries # NOTE(simon): We have to do this because max_concurrent_queries
# and the replica handles come from different long poll keys. # and the replica handles come from different long poll keys.
self.max_concurrent_queries: int = 8 self.max_concurrent_queries: int = 8
self.in_flight_queries: Dict[ActorHandle, set] = dict() self.in_flight_queries: Dict[ActorHandle, set] = dict()
# The iterator used for load balancing among replicas. Using itertools # The iterator used for load balancing among replicas. Using itertools
# cycle, we implements a round-robin policy, skipping overloaded # cycle, we implements a round-robin policy, skipping overloaded
# replicas. # replicas.
@ -64,15 +63,25 @@ class ReplicaSet:
self.replica_iterator = itertools.cycle(self.in_flight_queries.keys()) self.replica_iterator = itertools.cycle(self.in_flight_queries.keys())
# Used to unblock this replica set waiting for free replicas. A newly # Used to unblock this replica set waiting for free replicas. A newly
# added replica or updated max_concurrenty_queries value means the # added replica or updated max_concurrent_queries value means the
# query that waits on a free replica might be unblocked on. # query that waits on a free replica might be unblocked on.
self.config_updated_event = asyncio.Event() self.config_updated_event = asyncio.Event()
self.num_queued_queries = 0
self.num_queued_queries_gauge = metrics.Gauge(
"serve_backend_queued_queries",
description=(
"The current number of queries to this backend waiting"
" to be assigned to a replica."),
tag_keys=("backend", "endpoint"))
self.num_queued_queries_gauge.set_default_tags({
"backend": self.backend_tag
})
def set_max_concurrent_queries(self, new_value): def set_max_concurrent_queries(self, new_value):
if new_value != self.max_concurrent_queries: if new_value != self.max_concurrent_queries:
self.max_concurrent_queries = new_value self.max_concurrent_queries = new_value
logger.debug( logger.debug(
f"ReplicaSet: chaging max_concurrent_queries to {new_value}") f"ReplicaSet: changing max_concurrent_queries to {new_value}")
self.config_updated_event.set() self.config_updated_event.set()
def update_worker_replicas(self, worker_replicas: Iterable[ActorHandle]): def update_worker_replicas(self, worker_replicas: Iterable[ActorHandle]):
@ -92,7 +101,7 @@ class ReplicaSet:
self.config_updated_event.set() self.config_updated_event.set()
def _try_assign_replica(self, query: Query) -> Optional[ray.ObjectRef]: def _try_assign_replica(self, query: Query) -> Optional[ray.ObjectRef]:
"""Try to assign query to a replica, return the object ref is succeeded """Try to assign query to a replica, return the object ref if succeeded
or return None if it can't assign this query to any replicas. or return None if it can't assign this query to any replicas.
""" """
for _ in range(len(self.in_flight_queries.keys())): for _ in range(len(self.in_flight_queries.keys())):
@ -130,6 +139,10 @@ class ReplicaSet:
and only send a query to available replicas (determined by the backend and only send a query to available replicas (determined by the backend
max_concurrent_quries value.) max_concurrent_quries value.)
""" """
endpoint = query.metadata.endpoint
self.num_queued_queries += 1
self.num_queued_queries_gauge.record(
self.num_queued_queries, tags={"endpoint": endpoint})
assigned_ref = self._try_assign_replica(query) assigned_ref = self._try_assign_replica(query)
while assigned_ref is None: # Can't assign a replica right now. while assigned_ref is None: # Can't assign a replica right now.
logger.debug("Failed to assign a replica for " logger.debug("Failed to assign a replica for "
@ -147,8 +160,12 @@ class ReplicaSet:
return_when=asyncio.FIRST_COMPLETED) return_when=asyncio.FIRST_COMPLETED)
if self.config_updated_event.is_set(): if self.config_updated_event.is_set():
self.config_updated_event.clear() self.config_updated_event.clear()
# We are pretty sure a free replica is ready now. # We are pretty sure a free replica is ready now, let's recurse and
# assign this query a replica.
assigned_ref = self._try_assign_replica(query) assigned_ref = self._try_assign_replica(query)
self.num_queued_queries -= 1
self.num_queued_queries_gauge.record(
self.num_queued_queries, tags={"endpoint": endpoint})
return assigned_ref return assigned_ref
@ -168,7 +185,8 @@ class Router:
self.controller = controller_handle self.controller = controller_handle
self.endpoint_policies: Dict[str, EndpointPolicy] = dict() self.endpoint_policies: Dict[str, EndpointPolicy] = dict()
self.backend_replicas: Dict[str, ReplicaSet] = defaultdict(ReplicaSet)
self.backend_replicas: Dict[str, ReplicaSet] = dict()
self._pending_endpoints: Dict[str, asyncio.Future] = dict() self._pending_endpoints: Dict[str, asyncio.Future] = dict()
@ -212,8 +230,8 @@ class Router:
replica_handles) replica_handles)
for backend_tag, replica_handles in ChainMap(added, updated).items(): for backend_tag, replica_handles in ChainMap(added, updated).items():
self.backend_replicas[backend_tag].update_worker_replicas( self._get_or_create_replica_set(
replica_handles) backend_tag).update_worker_replicas(replica_handles)
for backend_tag in removed.keys(): for backend_tag in removed.keys():
if backend_tag in self.backend_replicas: if backend_tag in self.backend_replicas:
@ -223,8 +241,9 @@ class Router:
added, removed, updated = compute_dict_delta(self.backend_replicas, added, removed, updated = compute_dict_delta(self.backend_replicas,
backend_configs) backend_configs)
for backend_tag, config in ChainMap(added, updated).items(): for backend_tag, config in ChainMap(added, updated).items():
self.backend_replicas[backend_tag].set_max_concurrent_queries( self._get_or_create_replica_set(
config.max_concurrent_queries) backend_tag).set_max_concurrent_queries(
config.max_concurrent_queries)
for backend_tag in removed.keys(): for backend_tag in removed.keys():
if backend_tag in self.backend_replicas: if backend_tag in self.backend_replicas:
@ -261,11 +280,17 @@ class Router:
endpoint_policy = self.endpoint_policies[endpoint] endpoint_policy = self.endpoint_policies[endpoint]
chosen_backend, *shadow_backends = endpoint_policy.assign(query) chosen_backend, *shadow_backends = endpoint_policy.assign(query)
result_ref = await self.backend_replicas[chosen_backend result_ref = await self._get_or_create_replica_set(
].assign_replica(query) chosen_backend).assign_replica(query)
for backend in shadow_backends: for backend in shadow_backends:
await self.backend_replicas[backend].assign_replica(query) (await self._get_or_create_replica_set(backend)
.assign_replica(query))
self.num_router_requests.record(1, tags={"endpoint": endpoint}) self.num_router_requests.record(1, tags={"endpoint": endpoint})
return result_ref return result_ref
def _get_or_create_replica_set(self, backend_name):
if backend_name not in self.backend_replicas:
self.backend_replicas[backend_name] = ReplicaSet(backend_name)
return self.backend_replicas[backend_name]

View file

@ -875,6 +875,10 @@ def test_serve_metrics(serve_instance):
# gauge # gauge
"replica_processing_queries", "replica_processing_queries",
"replica_queued_queries", "replica_queued_queries",
# handle
"serve_handle_request_counter",
# ReplicaSet
"backend_queued_queries"
] ]
for metric in expected_metrics: for metric in expected_metrics:
# For the final error round # For the final error round

View file

@ -204,7 +204,7 @@ async def test_replica_set(ray_instance):
return self._num_queries return self._num_queries
# We will test a scenario with two replicas in the replica set. # We will test a scenario with two replicas in the replica set.
rs = ReplicaSet() rs = ReplicaSet("my_backend")
workers = [MockWorker.remote() for _ in range(2)] workers = [MockWorker.remote() for _ in range(2)]
rs.set_max_concurrent_queries(1) rs.set_max_concurrent_queries(1)
rs.update_worker_replicas(workers) rs.update_worker_replicas(workers)