From df9f89141651a5f07a62e6d3669fc99a7576831e Mon Sep 17 00:00:00 2001 From: Simon Mo Date: Thu, 14 Jul 2022 20:10:56 -0700 Subject: [PATCH] [Serve] Use custom class name for replica class (#26574) --- python/ray/serve/replica.py | 17 ++++++++++++++--- python/ray/serve/tests/test_cluster.py | 2 +- python/ray/serve/tests/test_metrics.py | 14 ++++++++++++++ 3 files changed, 29 insertions(+), 4 deletions(-) diff --git a/python/ray/serve/replica.py b/python/ray/serve/replica.py index 1c442f303..ae387baff 100644 --- a/python/ray/serve/replica.py +++ b/python/ray/serve/replica.py @@ -38,6 +38,10 @@ from ray.serve.version import DeploymentVersion logger = logging.getLogger(SERVE_LOGGER_NAME) +def _format_replica_actor_name(deployment_name: str): + return f"ServeReplica:{deployment_name}" + + def create_replica_wrapper(name: str): """Creates a replica class wrapping the provided function or class. @@ -211,8 +215,13 @@ def create_replica_wrapper(name: str): async def check_health(self): await self.replica.check_health() - RayServeWrappedReplica.__name__ = name - return RayServeWrappedReplica + # Dynamically create a new class with custom name here so Ray picks it up + # correctly in actor metadata table and observability stack. 
+ return type( + _format_replica_actor_name(name), + (RayServeWrappedReplica,), + dict(RayServeWrappedReplica.__dict__), + ) class RayServeReplica: @@ -333,7 +342,9 @@ class RayServeReplica: def _get_handle_request_stats(self) -> Optional[Dict[str, int]]: actor_stats = ray.runtime_context.get_runtime_context()._get_actor_call_stats() - method_stat = actor_stats.get("RayServeWrappedReplica.handle_request") + method_stat = actor_stats.get( + f"{_format_replica_actor_name(self.deployment_name)}.handle_request" + ) return method_stat def _collect_autoscaling_metrics(self): diff --git a/python/ray/serve/tests/test_cluster.py b/python/ray/serve/tests/test_cluster.py index e5222f6ad..31b12567b 100644 --- a/python/ray/serve/tests/test_cluster.py +++ b/python/ray/serve/tests/test_cluster.py @@ -181,7 +181,7 @@ def test_intelligent_scale_down(ray_cluster): actors = ray._private.state.actors() node_to_actors = defaultdict(list) for actor in actors.values(): - if "RayServeWrappedReplica" not in actor["ActorClassName"]: + if "ServeReplica" not in actor["ActorClassName"]: continue if actor["State"] != "ALIVE": continue diff --git a/python/ray/serve/tests/test_metrics.py b/python/ray/serve/tests/test_metrics.py index 5a2f828ae..cd26e8cf7 100644 --- a/python/ray/serve/tests/test_metrics.py +++ b/python/ray/serve/tests/test_metrics.py @@ -7,6 +7,7 @@ import ray from ray import serve from ray._private.test_utils import wait_for_condition from ray.serve.utils import block_until_http_ready +import ray.experimental.state.api as state_api def test_serve_metrics_for_successful_connection(serve_instance): @@ -142,6 +143,19 @@ def test_http_metrics(serve_instance): verify_error_count(do_assert=True) +def test_actor_summary(serve_instance): + @serve.deployment + def f(): + pass + + serve.run(f.bind()) + actors = state_api.list_actors() + class_names = {actor["class_name"] for actor in actors} + assert class_names.issuperset( + {"ServeController", "HTTPProxyActor", "ServeReplica:f"} + ) + + 
if __name__ == "__main__": import sys