[Serve] Add Custom Usage Tags for Reporting (#27061)

This commit is contained in:
Simon Mo 2022-07-27 10:59:41 -07:00 committed by GitHub
parent 2fe0ac11a4
commit 8bb37c3a9e
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 38 additions and 5 deletions

View file

@ -244,10 +244,21 @@ def _put_library_usage(library_usage: str):
class TagKey(Enum):
_TEST1 = auto()
_TEST2 = auto()
# RLlib
# The deep learning framework ("tf", "torch", etc.).
RLLIB_FRAMEWORK = auto()
# The algorithm name (only built-in algorithms).
RLLIB_ALGORITHM = auto()
# The number of workers as a string.
RLLIB_NUM_WORKERS = auto()
# Serve
# The public Python API version ("v1", "v2").
SERVE_API_VERSION = auto()
# The total number of running serve deployments as a string.
SERVE_NUM_DEPLOYMENTS = auto()
def record_extra_usage_tag(key: TagKey, value: str):
"""Record extra kv usage tag.

View file

@ -14,6 +14,10 @@ from typing import Any, Callable, Dict, List, Optional, Tuple
import ray
from ray import ObjectRef, cloudpickle
from ray._private.usage.usage_lib import (
TagKey,
record_extra_usage_tag,
)
from ray.actor import ActorHandle
from ray.exceptions import RayActorError, RayError
from ray.serve._private.autoscaling_metrics import InMemoryMetricsStore
@ -1771,6 +1775,9 @@ class DeploymentStateManager:
self._deployment_states[deployment_name] = self._create_deployment_state(
deployment_name
)
record_extra_usage_tag(
TagKey.SERVE_NUM_DEPLOYMENTS, str(len(self._deployment_states))
)
return self._deployment_states[deployment_name].deploy(deployment_info)
@ -1856,3 +1863,8 @@ class DeploymentStateManager:
for tag in deleted_tags:
del self._deployment_states[tag]
if len(deleted_tags):
record_extra_usage_tag(
TagKey.SERVE_NUM_DEPLOYMENTS, str(len(self._deployment_states))
)

View file

@ -4,6 +4,7 @@ import logging
from typing import Any, Callable, Dict, Optional, Tuple, Union, overload
from fastapi import APIRouter, FastAPI
from ray._private.usage.usage_lib import TagKey, record_extra_usage_tag
from starlette.requests import Request
from uvicorn.config import Config
from uvicorn.lifespan.on import LifespanOn
@ -94,8 +95,12 @@ def start(
dedicated_cpu: Whether to reserve a CPU core for the internal
Serve controller actor. Defaults to False.
"""
client = _private_api.serve_start(detached, http_options, dedicated_cpu, **kwargs)
return _private_api.serve_start(detached, http_options, dedicated_cpu, **kwargs)
# Record after Ray has been started.
record_extra_usage_tag(TagKey.SERVE_API_VERSION, "v1")
return client
@PublicAPI
@ -397,6 +402,7 @@ def get_deployment(name: str) -> Deployment:
Returns:
Deployment
"""
record_extra_usage_tag(TagKey.SERVE_API_VERSION, "v1")
return _private_api.get_deployment(name)
@ -407,7 +413,7 @@ def list_deployments() -> Dict[str, Deployment]:
Dictionary maps deployment name to Deployment objects.
"""
record_extra_usage_tag(TagKey.SERVE_API_VERSION, "v1")
return _private_api.list_deployments()
@ -436,11 +442,13 @@ def run(
RayServeHandle: A regular ray serve handle that can be called by user
to execute the serve DAG.
"""
client = _private_api.serve_start(
detached=True, http_options={"host": host, "port": port}
)
# Record after Ray has been started.
record_extra_usage_tag(TagKey.SERVE_API_VERSION, "v2")
if isinstance(target, Application):
deployments = list(target.deployments.values())
ingress = target.ingress
@ -511,7 +519,6 @@ def build(target: Union[ClassNode, FunctionNode]) -> Application:
The returned Application object can be exported to a dictionary or YAML
config.
"""
if in_interactive_shell():
raise RuntimeError(
"build cannot be called from an interactive shell like "

View file

@ -9,6 +9,7 @@ from typing import (
Tuple,
Union,
)
from ray._private.usage.usage_lib import TagKey, record_extra_usage_tag
from ray.serve.context import get_global_client
from ray.dag.class_node import ClassNode
@ -213,6 +214,7 @@ class Deployment:
init_kwargs: kwargs to pass to the class __init__
method. Not valid if this deployment wraps a function.
"""
record_extra_usage_tag(TagKey.SERVE_API_VERSION, "v1")
self._deploy(*init_args, _blocking=_blocking, **init_kwargs)
# TODO(Sihan) Promote the _deploy to deploy after we fully deprecate the API
@ -276,7 +278,6 @@ class Deployment:
Returns:
ServeHandle
"""
return self._get_handle(sync)
# TODO(Sihan) Promote the _get_handle to get_handle after we fully deprecate the API

View file

@ -955,6 +955,8 @@ provider:
"extra_k1": "extra_v1",
"_test1": "extra_v2",
"_test2": "extra_v3",
"serve_num_deployments": "1",
"serve_api_version": "v1",
}
assert payload["total_num_nodes"] == 1
assert payload["total_num_running_jobs"] == 1