[Serve] Implement metric interface (#5852)

* Implement metric interface

* Address comment: made actor_handles a dict

* Fix iteration

* Lint

* Mark lightweight actors as num_cpus=0 to prevent resource starvation

* Be more explicit about the readiness condition

* Make task_runner non-blocking

* Lint
This commit is contained in:
Simon Mo 2019-10-07 09:29:26 -07:00 committed by GitHub
parent 25dde48607
commit 9bb3633cd9
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
10 changed files with 347 additions and 12 deletions

View file

@ -2,11 +2,11 @@ import sys
if sys.version_info < (3, 0):
raise ImportError("serve is Python 3 only.")
from ray.experimental.serve.api import (init, create_backend, create_endpoint,
link, split, rollback, get_handle,
global_state, scale) # noqa: E402
from ray.experimental.serve.api import (
init, create_backend, create_endpoint, link, split, rollback, get_handle,
global_state, stat, scale) # noqa: E402
__all__ = [
"init", "create_backend", "create_endpoint", "link", "split", "rollback",
"get_handle", "global_state", "scale"
"get_handle", "global_state", "stat", "scale"
]

View file

@ -10,7 +10,7 @@ from ray.experimental.serve.global_state import GlobalState
global_state = GlobalState()
def init(blocking=False, object_store_memory=int(1e8)):
def init(blocking=False, object_store_memory=int(1e8), gc_window_seconds=3600):
"""Initialize a serve cluster.
Calling `ray.init` before `serve.init` is optional. When there is not a ray
@ -19,22 +19,30 @@ def init(blocking=False, object_store_memory=int(1e8)):
Args:
blocking (bool): If true, the function will wait for the HTTP server to
be healthy before returns.
be healthy, and other components to be ready before returns.
object_store_memory (int): Allocated shared memory size in bytes. The
default is 100MiB. The default is kept low for latency stability
reason.
gc_window_seconds(int): How long will we keep the metric data in
memory. Data older than the gc_window will be deleted. The default
is 3600 seconds, which is 1 hour.
"""
if not ray.is_initialized():
ray.init(object_store_memory=object_store_memory)
# NOTE(simon): Currently the initialization order is fixed.
# HTTP server depends on the API server.
# Metric monitor depends on the router.
global_state.init_api_server()
global_state.init_router()
global_state.init_http_server()
global_state.init_metric_monitor()
if blocking:
global_state.wait_until_http_ready()
ray.get(global_state.router_actor_handle.is_ready.remote())
ray.get(global_state.kv_store_actor_handle.is_ready.remote())
ray.get(global_state.metric_monitor_handle.is_ready.remote())
def create_endpoint(endpoint_name, route_expression, blocking=True):
@ -103,6 +111,7 @@ def _start_replica(backend_tag):
runner._ray_serve_main_loop.remote(runner)
global_state.backend_replicas[backend_tag].append(runner)
global_state.metric_monitor_handle.add_target.remote(runner)
def _remove_replica(backend_tag):
@ -114,6 +123,9 @@ def _remove_replica(backend_tag):
replicas = global_state.backend_replicas[backend_tag]
oldest_replica_handle = replicas.popleft()
global_state.metric_monitor_handle.remove_target.remote(
oldest_replica_handle)
# explicitly terminate that actor
del oldest_replica_handle
@ -236,3 +248,19 @@ def get_handle(endpoint_name):
from ray.experimental.serve.handle import RayServeHandle
return RayServeHandle(global_state.router_actor_handle, endpoint_name)
def stat(percentiles=[50, 90, 95],
agg_windows_seconds=[10, 60, 300, 600, 3600]):
"""Retrieve metric statistics about ray serve system.
Args:
percentiles(List[int]): The percentiles for aggregation operations.
Default is 50th, 90th, 95th percentile.
agg_windows_seconds(List[int]): The aggregation windows in seconds.
The longest aggregation window must be shorter or equal to the
gc_window_seconds.
"""
return ray.get(
global_state.metric_monitor_handle.collect.remote(
percentiles, agg_windows_seconds))

View file

@ -4,6 +4,7 @@ Full example of ray.serve module
import ray
import ray.experimental.serve as serve
from ray.experimental.serve.utils import pformat_color_json
import requests
import time
@ -56,3 +57,6 @@ for _ in range(10):
# You can also scale each backend independently.
serve.scale("echo:v1", 2)
serve.scale("echo:v2", 2)
# As well as retrieving relevant system metrics
print(pformat_color_json(serve.stat()))

View file

@ -8,6 +8,8 @@ from ray.experimental.serve.kv_store_service import KVStoreProxyActor
from ray.experimental.serve.queues import CentralizedQueuesActor
from ray.experimental.serve.utils import logger
from ray.experimental.serve.server import HTTPActor
from ray.experimental.serve.metric import (MetricMonitor,
start_metric_monitor_loop)
# TODO(simon): Global state currently is designed to resides in the driver
# process. In the next iteration, we will move all mutable states into
@ -53,6 +55,9 @@ class GlobalState:
# use random/available port in a pre-defined port range. TODO(simon)
self.http_address = ""
#: Metric monitor handle
self.metric_monitor_handle = None
def init_api_server(self):
logger.info(LOG_PREFIX + "Initalizing routing table")
self.kv_store_actor_handle = KVStoreProxyActor.remote()
@ -72,6 +77,12 @@ class GlobalState:
self.router_actor_handle.register_self_handle.remote(
self.router_actor_handle)
def init_metric_monitor(self, gc_window_seconds=3600):
logger.info(LOG_PREFIX + "Initializing metric monitor")
self.metric_monitor_handle = MetricMonitor.remote(gc_window_seconds)
start_metric_monitor_loop.remote(self.metric_monitor_handle)
self.metric_monitor_handle.add_target.remote(self.router_actor_handle)
def wait_until_http_ready(self, num_retries=5, backoff_time_s=1):
http_is_ready = False
retries = num_retries

View file

@ -167,7 +167,10 @@ class KVStoreProxy:
return self.request_count
@ray.remote
@ray.remote(num_cpus=0)
class KVStoreProxyActor(KVStoreProxy):
def __init__(self, kv_class=RayInternalKVStore):
super().__init__(kv_class=kv_class)
def is_ready(self):
return True

View file

@ -0,0 +1,155 @@
import time
import ray
import numpy as np
import pandas as pd
@ray.remote(num_cpus=0)
class MetricMonitor:
def __init__(self, gc_window_seconds=3600):
"""Metric monitor scrapes metrics from ray serve actors
and allow windowed query operations.
Args:
gc_window_seconds(int): How long will we keep the metric data in
memory. Data older than the gc_window will be deleted.
"""
#: Mapping actor ID (hex) -> actor handle
self.actor_handles = dict()
self.data_entries = []
self.gc_window_seconds = gc_window_seconds
self.latest_gc_time = time.time()
def is_ready(self):
return True
def add_target(self, target_handle):
hex_id = target_handle._ray_actor_id.hex()
self.actor_handles[hex_id] = target_handle
def remove_target(self, target_handle):
hex_id = target_handle._ray_actor_id.hex()
self.actor_handles.pop(hex_id)
def scrape(self):
# If expected gc time has passed, we will perform metric value GC.
expected_gc_time = self.latest_gc_time + self.gc_window_seconds
if expected_gc_time < time.time():
self._perform_gc()
self.latest_gc_time = time.time()
curr_time = time.time()
result = [
handle._serve_metric.remote()
for handle in self.actor_handles.values()
]
for handle_result in ray.get(result):
for metric_name, metric_info in handle_result.items():
data_entry = {
"retrieved_at": curr_time,
"name": metric_name,
"type": metric_info["type"],
}
if metric_info["type"] == "counter":
data_entry["value"] = metric_info["value"]
self.data_entries.append(data_entry)
elif metric_info["type"] == "list":
for metric_value in metric_info["value"]:
new_entry = data_entry.copy()
new_entry["value"] = metric_value
self.data_entries.append(new_entry)
def _perform_gc(self):
curr_time = time.time()
earliest_time_allowed = curr_time - self.gc_window_seconds
# If we don"t have any data at hand, no need to gc.
if len(self.data_entries) == 0:
return
df = pd.DataFrame(self.data_entries)
df = df[df["retrieved_at"] >= earliest_time_allowed]
self.data_entries = df.to_dict(orient="record")
def _get_dataframe(self):
return pd.DataFrame(self.data_entries)
def collect(self,
percentiles=[50, 90, 95],
agg_windows_seconds=[10, 60, 300, 600, 3600]):
"""Collect and perform aggregation on all metrics.
Args:
percentiles(List[int]): The percentiles for aggregation operations.
Default is 50th, 90th, 95th percentile.
agg_windows_seconds(List[int]): The aggregation windows in seconds.
The longest aggregation window must be shorter or equal to the
gc_window_seconds.
"""
result = {}
df = pd.DataFrame(self.data_entries)
if len(df) == 0: # no metric to report
return {}
# Retrieve the {metric_name -> metric_type} mapping
metric_types = df[["name",
"type"]].set_index("name").squeeze().to_dict()
for metric_name, metric_type in metric_types.items():
if metric_type == "counter":
result[metric_name] = df.loc[df["name"] == metric_name,
"value"].tolist()[-1]
if metric_type == "list":
result.update(
self._aggregate(metric_name, percentiles,
agg_windows_seconds))
return result
def _aggregate(self, metric_name, percentiles, agg_windows_seconds):
"""Perform aggregation over a metric.
Note:
This metric must have type `list`.
"""
assert max(agg_windows_seconds) <= self.gc_window_seconds, (
"Aggregation window exceeds gc window. You should set a longer gc "
"window or shorter aggregation window.")
curr_time = time.time()
df = pd.DataFrame(self.data_entries)
filtered_df = df[df["name"] == metric_name]
if len(filtered_df) == 0:
return dict()
data_types = filtered_df["type"].unique().tolist()
assert data_types == [
"list"
], ("Can't aggreagte over non-list type. {} has type {}".format(
metric_name, data_types))
aggregated_metric = {}
for window in agg_windows_seconds:
earliest_time = curr_time - window
windowed_df = filtered_df[
filtered_df["retrieved_at"] > earliest_time]
percentile_values = np.percentile(windowed_df["value"],
percentiles)
for percentile, value in zip(percentiles, percentile_values):
result_key = "{name}_{perc}th_perc_{window}_window".format(
name=metric_name, perc=percentile, window=window)
aggregated_metric[result_key] = value
return aggregated_metric
@ray.remote(num_cpus=0)
def start_metric_monitor_loop(monitor_handle, duration_s=5):
while True:
ray.get(monitor_handle.scrape.remote())
time.sleep(duration_s)

View file

@ -76,6 +76,18 @@ class CentralizedQueues:
# backend_name -> worker queue
self.workers = defaultdict(deque)
def is_ready(self):
return True
def _serve_metric(self):
return {
"service_{}_queue_size".format(service_name): {
"value": len(queue),
"type": "counter",
}
for service_name, queue in self.queues.items()
}
def enqueue_request(self, service, request_args, request_kwargs,
request_context):
query = Query(request_args, request_kwargs, request_context)

View file

@ -1,4 +1,6 @@
import traceback
import time
import ray
from ray.experimental.serve import context as serve_context
from ray.experimental.serve.context import TaskContext, FakeFlaskQuest
@ -50,6 +52,33 @@ class RayServeMixin:
_ray_serve_setup_completed = False
_ray_serve_dequeue_requestr_name = None
# Work token can be unfullfilled from last iteration.
# This cache will be used to determine whether or not we should
# work on the same task as previous iteration or we are ready to
# move on.
_ray_serve_cached_work_token = None
_serve_metric_error_counter = 0
_serve_metric_latency_list = []
def _serve_metric(self):
# Make a copy of the latency list and clear current list
latency_lst = self._serve_metric_latency_list[:]
self._serve_metric_latency_list = []
my_name = self._ray_serve_dequeue_requestr_name
return {
"{}_error_counter".format(my_name): {
"value": self._serve_metric_error_counter,
"type": "counter",
},
"{}_latency_s".format(my_name): {
"value": latency_lst,
"type": "list",
},
}
def _ray_serve_setup(self, my_name, _ray_serve_router_handle):
self._ray_serve_dequeue_requestr_name = my_name
self._ray_serve_router_handle = _ray_serve_router_handle
@ -59,10 +88,24 @@ class RayServeMixin:
assert self._ray_serve_setup_completed
self._ray_serve_self_handle = my_handle
work_token = ray.get(
self._ray_serve_router_handle.dequeue_request.remote(
self._ray_serve_dequeue_requestr_name))
work_item = ray.get(ray.ObjectID(work_token))
# Only retrieve the next task if we have completed previous task.
if self._ray_serve_cached_work_token is None:
work_token = ray.get(
self._ray_serve_router_handle.dequeue_request.remote(
self._ray_serve_dequeue_requestr_name))
else:
work_token = self._ray_serve_cached_work_token
work_token_id = ray.ObjectID(work_token)
ready, not_ready = ray.wait(
[work_token_id], num_returns=1, timeout=0.5)
if len(ready) == 1:
work_item = ray.get(work_token_id)
self._ray_serve_cached_work_token = None
else:
self._ray_serve_cached_work_token = work_token
self._ray_serve_self_handle._ray_serve_main_loop.remote(my_handle)
return
if work_item.request_context == TaskContext.Web:
serve_context.web = True
@ -77,13 +120,16 @@ class RayServeMixin:
result_object_id = work_item.result_object_id
start_timestamp = time.time()
try:
result = self.__call__(*args, **kwargs)
ray.worker.global_worker.put_object(result_object_id, result)
except Exception as e:
wrapped_exception = wrap_to_ray_error(e)
self._serve_metric_error_counter += 1
ray.worker.global_worker.put_object(result_object_id,
wrapped_exception)
self._serve_metric_latency_list.append(time.time() - start_timestamp)
serve_context.web = False
# The worker finished one unit of work.

View file

@ -0,0 +1,76 @@
import numpy as np
import pytest
import ray
from ray.experimental.serve.metric import MetricMonitor
@pytest.fixture(scope="session")
def start_target_actor(ray_instance):
@ray.remote
class Target():
def __init__(self):
self.counter_value = 0
def _serve_metric(self):
self.counter_value += 1
return {
"latency_list": {
"type": "list",
# Generate 0 to 100 inclusive.
# This means total of 101 items.
"value": np.arange(101).tolist()
},
"counter": {
"type": "counter",
"value": self.counter_value
}
}
def get_counter_value(self):
return self.counter_value
yield Target.remote()
def test_metric_gc(ray_instance, start_target_actor):
target_actor = start_target_actor
# this means when new scrapes are invoked, the
metric_monitor = MetricMonitor.remote(gc_window_seconds=0)
metric_monitor.add_target.remote(target_actor)
ray.get(metric_monitor.scrape.remote())
df = ray.get(metric_monitor._get_dataframe.remote())
print(df)
assert len(df) == 102
# Old metric sould be cleared. So only 1 counter + 101 list values left.
ray.get(metric_monitor.scrape.remote())
df = ray.get(metric_monitor._get_dataframe.remote())
assert len(df) == 102
def test_metric_system(ray_instance, start_target_actor):
target_actor = start_target_actor
metric_monitor = MetricMonitor.remote()
metric_monitor.add_target.remote(target_actor)
# Scrape once
metric_monitor.scrape.remote()
percentiles = [50, 90, 95]
agg_windows_seconds = [60]
result = ray.get(
metric_monitor.collect.remote(percentiles, agg_windows_seconds))
real_counter_value = ray.get(target_actor.get_counter_value.remote())
expected_result = {
"counter": real_counter_value,
"latency_list_50th_perc_60_window": 50.0,
"latency_list_90th_perc_60_window": 90.0,
"latency_list_95th_perc_60_window": 95.0,
}
assert result == expected_result

View file

@ -77,7 +77,7 @@ extras = {
],
"debug": ["psutil", "setproctitle", "py-spy >= 0.2.0"],
"dashboard": ["aiohttp", "psutil", "setproctitle"],
"serve": ["uvicorn", "pygments", "werkzeug", "flask"],
"serve": ["uvicorn", "pygments", "werkzeug", "flask", "pandas"],
}