Export Metrics in OpenCensus Protobuf Format (#10080)

Simon Mo 2020-08-18 11:32:42 -07:00 committed by GitHub
parent 8d06e30a06
commit bedc2c24c8
13 changed files with 329 additions and 453 deletions


@@ -1902,17 +1902,11 @@ genrule(
fi
# NOTE(hchen): Protobuf doesn't allow specifying Python package name. So we use this `sed`
# command to change the import path in the generated file.
files=(
python/ray/core/generated/gcs_pb2.py
python/ray/core/generated/common_pb2.py
python/ray/core/generated/node_manager_pb2.py
python/ray/core/generated/node_manager_pb2_grpc.py
python/ray/core/generated/reporter_pb2.py
python/ray/core/generated/reporter_pb2_grpc.py
python/ray/core/generated/core_worker_pb2.py
python/ray/core/generated/core_worker_pb2_grpc.py
)
# shellcheck disable=SC2006
files=(`ls python/ray/core/generated/*_pb2*.py`)
sed -i -E 's/from src.ray.protobuf/from ./' "$${files[@]}"
sed -i -E 's/from opencensus.proto.metrics.v1 import/from . import/' "$${files[@]}"
sed -i -E 's/from opencensus.proto.resource.v1 import/from . import/' "$${files[@]}"
echo "$${PWD}" > $@
""",
local = 1,
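For illustration, here is a minimal Python sketch (not part of the build rule) of the rewrite those three `sed` commands perform on the generated `*_pb2*.py` files; the sample import line is hypothetical:

import re

# Hypothetical import line as protoc emits it into a generated _pb2 module.
line = "from opencensus.proto.metrics.v1 import metrics_pb2"

# The genrule's sed commands make such imports package-relative so the
# modules resolve from within python/ray/core/generated/.
line = re.sub(r"from src\.ray\.protobuf", "from .", line)
line = re.sub(r"from opencensus\.proto\.metrics\.v1 import", "from . import", line)
line = re.sub(r"from opencensus\.proto\.resource\.v1 import", "from . import", line)

print(line)  # from . import metrics_pb2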


@@ -264,3 +264,10 @@ def ray_deps_setup():
"//thirdparty/patches:msgpack-windows-iovec.patch",
],
)
http_archive(
name = "io_opencensus_proto",
strip_prefix = "opencensus-proto-0.3.0/src",
urls = ["https://github.com/census-instrumentation/opencensus-proto/archive/v0.3.0.tar.gz"],
sha256 = "b7e13f0b4259e80c3070b583c2f39e53153085a6918718b1c710caf7037572b0",
)


@@ -5,91 +5,35 @@ import threading
import time
import traceback
from collections import defaultdict
from typing import List
from opencensus.stats import aggregation
from opencensus.stats import measure as measure_module
from opencensus.stats.measurement_map import MeasurementMap
from opencensus.stats import stats as stats_module
from opencensus.tags import tag_key as tag_key_module
from opencensus.tags import tag_map as tag_map_module
from opencensus.tags import tag_value as tag_value_module
from opencensus.stats import view
from opencensus.stats.view import View
from opencensus.stats.view_data import ViewData
from opencensus.stats.aggregation_data import (CountAggregationData,
DistributionAggregationData,
LastValueAggregationData)
from opencensus.metrics.export.value import ValueDouble
import ray
from ray import prometheus_exporter
from ray.core.generated.common_pb2 import MetricPoint
from ray.core.generated.metrics_pb2 import Metric
logger = logging.getLogger(__name__)
# We don't need counter, histogram, or sum because the reporter just needs to
# collect point-in-time (gauge) values that are already counted, sampled
# (histograms, for example), or summed inside the cpp processes.
class Gauge(view.View):
def __init__(self, name, description, unit,
tags: List[tag_key_module.TagKey]):
self._measure = measure_module.MeasureInt(name, description, unit)
self._view = view.View(name, description, tags, self.measure,
aggregation.LastValueAggregation())
@property
def measure(self):
return self._measure
@property
def view(self):
return self._view
@property
def name(self):
return self.measure.name
@property
def description(self):
return self.measure.description
@property
def units(self):
return self.measure.unit
@property
def tags(self):
return self.view.columns
def __dict__(self):
return {
"name": self.measure.name,
"description": self.measure.description,
"units": self.measure.unit,
"tags": self.view.columns,
}
def __str__(self):
return self.__repr__()
def __repr__(self):
return str(self.__dict__())
class MetricsAgent:
def __init__(self, metrics_export_port):
assert metrics_export_port is not None
# OpenCensus classes.
self.view_manager = stats_module.stats.view_manager
self.stats_recorder = stats_module.stats.stats_recorder
# Port where we will expose metrics.
self.metrics_export_port = metrics_export_port
# metric name(str) -> view (view.View)
self._registry = defaultdict(lambda: None)
# Lock required because gRPC server uses
# multiple threads to process requests.
self._lock = threading.Lock()
# Whether or not there are metrics that are missing description and
# units information. This is used to dynamically update registry.
self._missing_information = False
# Configure exporter. (We currently only support prometheus).
self.view_manager.register_exporter(
@@ -97,96 +41,73 @@ class MetricsAgent:
prometheus_exporter.Options(
namespace="ray", port=metrics_export_port)))
@property
def registry(self):
"""Return metric definition registry.
Metrics definition registry is dynamically updated
by metrics reported by Ray processes.
"""
return self._registry
def record_metrics_points(self, metrics_points: List[MetricPoint]):
    with self._lock:
        measurement_map = self.stats_recorder.new_measurement_map()
        for metric_point in metrics_points:
            self._register_if_needed(metric_point)
            self._record(metric_point, measurement_map)
        return self._missing_information

def _record(self, metric_point: MetricPoint,
            measurement_map: MeasurementMap):
    """Record a single metric point to export.

    NOTE: When this method is called, the caller should acquire a lock.

    Args:
        metric_point(MetricPoint): metric point defined in common.proto
        measurement_map(MeasurementMap): Measurement map to record metrics.
    """
    metric_name = metric_point.metric_name
    tags = metric_point.tags

    metric = self._registry.get(metric_name)
    # Metrics should always be registered dynamically.
    assert metric

    tag_map = tag_map_module.TagMap()
    for key, value in tags.items():
        tag_key = tag_key_module.TagKey(key)
        tag_value = tag_value_module.TagValue(value)
        tag_map.insert(tag_key, tag_value)

    metric_value = metric_point.value
    measurement_map.measure_float_put(metric.measure, metric_value)
    # NOTE: When we record this metric, the timestamp will be renewed.
    measurement_map.record(tag_map)

def _register_if_needed(self, metric_point: MetricPoint):
    """Register metrics if they are not registered.

    NOTE: When this method is called, the caller should acquire a lock.

    Unseen metrics:
        Registered as Gauge type metrics. Note that all metrics in
        the agent are gauges because sampling is already done
        within the cpp processes.
    Metrics that are missing description & units:
        In this case, we notify the cpp processes that we need this
        information. Cpp processes will then report the description
        and units of all metrics they have.

    Args:
        metric_point: metric point defined in common.proto
    Return:
        True if the given metrics are missing description and units,
        False otherwise.
    """
    metric_name = metric_point.metric_name
    metric_description = metric_point.description
    metric_units = metric_point.units
    if self._registry[metric_name] is None:
        tags = metric_point.tags
        metric_tags = []
        for tag_key in tags:
            metric_tags.append(tag_key_module.TagKey(tag_key))
        metric = Gauge(metric_name, metric_description, metric_units,
                       metric_tags)
        self._registry[metric_name] = metric
        self.view_manager.register_view(metric.view)

    # If description & unit information is missing,
    # we should notify the cpp processes that we need it.
    if not metric_description or not metric_units:
        self._missing_information = True

    if metric_description and metric_units:
        self._registry[metric_name].view._description = metric_description
        self._registry[
            metric_name].view.measure._description = metric_description
        self._registry[metric_name].view.measure._unit = metric_units
        self._missing_information = False

def record_metric_points_from_protobuf(self, metrics: List[Metric]):
    """Record metrics from the OpenCensus protobuf."""
    with self._lock:
        self._record_metrics(metrics)

def _record_metrics(self, metrics):
    # The list of view data is what we are going to use for the
    # final export to the exporter.
    view_data_changed: List[ViewData] = []

    # Walk the protobufs and convert them to ViewData.
    for metric in metrics:
        descriptor = metric.metric_descriptor
        timeseries = metric.timeseries

        if len(timeseries) == 0:
            continue

        columns = [label_key.key for label_key in descriptor.label_keys]
        start_time = timeseries[0].start_timestamp.seconds

        # Create the view and view_data.
        measure = measure_module.BaseMeasure(
            descriptor.name, descriptor.description, descriptor.unit)
        view = self.view_manager.measure_to_view_map.get_view(
            descriptor.name, None)
        if not view:
            view = View(
                descriptor.name,
                descriptor.description,
                columns,
                measure,
                aggregation=None)
            self.view_manager.measure_to_view_map.register_view(
                view, start_time)
        view_data = (self.view_manager.measure_to_view_map.
                     _measure_to_view_data_list_map[measure.name][-1])
        view_data_changed.append(view_data)

        # Create the aggregation data and fill in our stats.
        for series in timeseries:
            tag_vals = tuple(val.value for val in series.label_values)
            for point in series.points:
                if point.HasField("int64_value"):
                    data = CountAggregationData(point.int64_value)
                elif point.HasField("double_value"):
                    data = LastValueAggregationData(
                        ValueDouble, point.double_value)
                elif point.HasField("distribution_value"):
                    dist_value = point.distribution_value
                    counts_per_bucket = [
                        bucket.count for bucket in dist_value.buckets
                    ]
                    bucket_bounds = (
                        dist_value.bucket_options.explicit.bounds)
                    data = DistributionAggregationData(
                        dist_value.sum / dist_value.count,
                        dist_value.count,
                        dist_value.sum_of_squared_deviation,
                        counts_per_bucket, bucket_bounds)
                else:
                    raise ValueError("Summary is not supported")

                view_data.tag_value_aggregation_data_map[tag_vals] = data

    # Finally, export all the values.
    self.view_manager.measure_to_view_map.export(view_data_changed)
class PrometheusServiceDiscoveryWriter(threading.Thread):
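To make the new agent entry point concrete, here is a minimal usage sketch. It is illustrative only: the extra message constructors come from the generated OpenCensus `metrics_pb2` module, and the port value is arbitrary.

from ray.core.generated.metrics_pb2 import (Metric, MetricDescriptor,
                                            TimeSeries, Point, LabelKey,
                                            LabelValue)
from ray.metrics_agent import MetricsAgent

# An arbitrary free port for the embedded Prometheus exporter.
agent = MetricsAgent(metrics_export_port=8888)

metric = Metric(
    metric_descriptor=MetricDescriptor(
        name="test_counter",
        description="desc",
        unit="unit",
        label_keys=[LabelKey(key="Component")]),
    timeseries=[
        TimeSeries(
            label_values=[LabelValue(value="core_worker")],
            points=[Point(double_value=1.0)])
    ])

# The metric is converted into an OpenCensus View/ViewData pair and
# re-exported through the registered Prometheus exporter.
agent.record_metric_points_from_protobuf([metric])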


@@ -58,23 +58,14 @@ class ReporterServer(reporter_pb2_grpc.ReporterServiceServicer):
return reporter_pb2.GetProfilingStatsReply(
profiling_stats=profiling_stats, std_out=stdout, std_err=stderr)
def ReportMetrics(self, request, context):
    try:
        metrcs_description_required = (
            self.metrics_agent.record_metrics_points(
                request.metrics_points))
    except Exception as e:
        logger.error(e)
    # If the metrics description is missing, we should notify the cpp
    # processes that we need it. Cpp processes will then report it here.
    # We need it when (1) a new metric is reported (application metric),
    # (2) a reporter goes down and is restarted (currently not implemented).
    return reporter_pb2.ReportMetricsReply(
        metrcs_description_required=metrcs_description_required)

def ReportOCMetrics(self, request, context):
    # NOTE: Exceptions are not propagated properly
    # when we don't catch them here.
    try:
        self.metrics_agent.record_metric_points_from_protobuf(
            request.metrics)
    except Exception:
        logger.error(traceback.format_exc())
    return reporter_pb2.ReportOCMetricsReply()
def recursive_asdict(o):


@@ -1,190 +1,16 @@
import asyncio
import json
import time
from collections import defaultdict
from pprint import pformat
import requests
import pytest
from opencensus.tags import tag_key as tag_key_module
from prometheus_client.parser import text_string_to_metric_families
import ray
from ray.core.generated.common_pb2 import MetricPoint
from ray.dashboard.util import get_unused_port
from ray.metrics_agent import (Gauge, MetricsAgent,
PrometheusServiceDiscoveryWriter)
from ray.metrics_agent import PrometheusServiceDiscoveryWriter
from ray.experimental.metrics import Count, Histogram
from ray.test_utils import wait_for_condition, SignalActor
def generate_metrics_point(name: str,
value: float,
timestamp: int,
tags: dict,
description: str = None,
units: str = None):
return MetricPoint(
metric_name=name,
timestamp=timestamp,
value=value,
tags=tags,
description=description,
units=units)
# NOTE: OpenCensus metrics are a singleton per process.
# That said, we should reuse the same agent for all tests.
# Please be careful when you add new tests here: if tests
# don't use distinct metrics, they can conflict with each other.
metrics_agent = None
@pytest.fixture
def cleanup_agent():
global metrics_agent
if not metrics_agent:
metrics_agent = MetricsAgent(get_unused_port())
yield
metrics_agent._registry = defaultdict(lambda: None)
def test_gauge():
tags = [tag_key_module.TagKey(str(i)) for i in range(10)]
name = "name"
description = "description"
units = "units"
gauge = Gauge(name, description, units, tags)
assert gauge.__dict__()["name"] == name
assert gauge.__dict__()["description"] == description
assert gauge.__dict__()["units"] == units
assert gauge.__dict__()["tags"] == tags
def test_basic_e2e(cleanup_agent):
# Test the basic end-to-end workflow. This includes:
# - Metrics are reported.
# - Metrics are dynamically registered to the registry.
# - Metrics are accessible from Prometheus.
POINTS_DEF = [0, 1, 2]
tag = {"TAG_KEY": "TAG_VALUE"}
metrics_points = [
generate_metrics_point(
str(i), float(i), i, tag, description=str(i), units=str(i))
for i in POINTS_DEF
]
metrics_points_dict = {
metric_point.metric_name: metric_point
for metric_point in metrics_points
}
assert metrics_agent.record_metrics_points(metrics_points) is False
# Make sure all metrics are registered.
for i, metric_entry in zip(POINTS_DEF, metrics_agent.registry.items()):
metric_name, metric_entry = metric_entry
assert metric_name == metric_entry.name
assert metric_entry.name == str(i)
assert metric_entry.description == str(i)
assert metric_entry.units == str(i)
assert metric_entry.tags == [tag_key_module.TagKey(key) for key in tag]
# Make sure all metrics are available through a port.
response = requests.get("http://localhost:{}".format(
metrics_agent.metrics_export_port))
response.raise_for_status()
for line in response.text.split("\n"):
for family in text_string_to_metric_families(line):
metric_name = family.name
if metric_name not in metrics_points_dict:
continue
if line.startswith("# HELP"):
# description
assert (family.documentation == metrics_points_dict[
metric_name].description)
else:
for sample in family.samples:
assert (metrics_points_dict[metric_name].value ==
sample.value)
def test_missing_def(cleanup_agent):
# Make sure when metrics with description and units are reported,
# agent updates its registry to include them.
POINTS_DEF = [4, 5, 6]
tag = {"TAG_KEY": "TAG_VALUE"}
metrics_points = [
generate_metrics_point(
str(i),
float(i),
i,
tag,
) for i in POINTS_DEF
]
# At first, metrics shouldn't have description and units.
assert metrics_agent.record_metrics_points(metrics_points) is True
for i, metric_entry in zip(POINTS_DEF, metrics_agent.registry.items()):
metric_name, metric_entry = metric_entry
assert metric_name == metric_entry.name
assert metric_entry.name == str(i)
assert metric_entry.description == ""
assert metric_entry.units == ""
assert metric_entry.tags == [tag_key_module.TagKey(key) for key in tag]
# The points are coming again with description and units.
# Make sure they are updated.
metrics_points = [
generate_metrics_point(
str(i), float(i), i, tag, description=str(i), units=str(i))
for i in POINTS_DEF
]
assert metrics_agent.record_metrics_points(metrics_points) is False
for i, metric_entry in zip(POINTS_DEF, metrics_agent.registry.items()):
metric_name, metric_entry = metric_entry
assert metric_name == metric_entry.name
assert metric_entry.name == str(i)
assert metric_entry.description == str(i)
assert metric_entry.units == str(i)
assert metric_entry.tags == [tag_key_module.TagKey(key) for key in tag]
def test_multiple_record(cleanup_agent):
# Make sure Prometheus exports data properly when multiple points
# with the same name are reported.
TOTAL_POINTS = 10
NAME = "TEST"
values = list(range(TOTAL_POINTS))
tags = [{"TAG_KEY": str(i)} for i in range(TOTAL_POINTS)]
timestamps = list(range(TOTAL_POINTS))
points = []
for i in range(TOTAL_POINTS):
points.append(
generate_metrics_point(
name=NAME,
value=values[i],
timestamp=timestamps[i],
tags=tags[i]))
for point in points:
metrics_agent.record_metrics_points([point])
# Make sure data is available at prometheus.
response = requests.get("http://localhost:{}".format(
metrics_agent.metrics_export_port))
response.raise_for_status()
sample_values = []
for line in response.text.split("\n"):
for family in text_string_to_metric_families(line):
metric_name = family.name
name_without_prefix = metric_name.split("_")[1]
if name_without_prefix != NAME:
continue
# Lines for recorded metrics values.
for sample in family.samples:
sample_values.append(sample.value)
assert sample_values == [point.value for point in points]
def test_prometheus_file_based_service_discovery(ray_start_cluster):
# Make sure Prometheus service discovery file is correctly written
# when number of nodes are dynamically changed.
@@ -217,8 +43,8 @@ def test_prometheus_file_based_service_discovery(ray_start_cluster):
loaded_json_data["targets"]))
@pytest.mark.skip("This test is flaky right now. Will be fixed in #10080")
def test_metrics_export_end_to_end(ray_start_cluster):
@pytest.fixture
def _setup_cluster_for_test(ray_start_cluster):
NUM_NODES = 2
cluster = ray_start_cluster
# Add a head node.
@@ -231,36 +57,25 @@ def test_metrics_export_end_to_end(ray_start_cluster):
cluster.wait_for_nodes()
ray.init(address=cluster.address)
signal = SignalActor.remote()
worker_should_exit = SignalActor.remote()
# Generate some metrics around actor & tasks.
# Generate some metrics from actor & tasks.
@ray.remote
def f():
counter = Count("test_counter", "desc", "unit", [])
ray.get(signal.send.remote())
while True:
counter.record(1, {})
time.sleep(0.1)
counter = Count("test_counter", "desc", "unit", [])
counter.record(1, {})
ray.get(worker_should_exit.wait.remote())
@ray.remote
class A:
async def ready(self):
pass
async def ping(self):
histogram = Histogram("test_histogram", "desc", "unit", [0, 1, 2],
histogram = Histogram("test_histogram", "desc", "unit", [0.1, 1.6],
[])
while True:
histogram.record(1, {})
await asyncio.sleep(0.1)
histogram.record(1.5, {})
ray.get(worker_should_exit.wait.remote())
obj_refs = [f.remote() for _ in range(30)]
a = A.remote()
obj_refs.append(a.ping.remote())
# Make sure both histogram and counter are created
ray.get(a.ready.remote())
ray.get(signal.wait.remote())
obj_refs = [f.remote(), a.ping.remote()]
node_info_list = ray.nodes()
prom_addresses = []
@@ -269,62 +84,104 @@ def test_metrics_export_end_to_end(ray_start_cluster):
addr = node_info["NodeManagerAddress"]
prom_addresses.append(f"{addr}:{metrics_export_port}")
yield prom_addresses
ray.get(worker_should_exit.send.remote())
ray.get(obj_refs)
ray.shutdown()
cluster.shutdown()
def test_metrics_export_end_to_end(_setup_cluster_for_test):
TEST_TIMEOUT_S = 20
prom_addresses = _setup_cluster_for_test
# Make sure we can ping Prometheus endpoints.
def fetch_prometheus(prom_addresses):
components_dict = {}
metric_names = set()
metric_samples = []
for address in prom_addresses:
if address not in components_dict:
components_dict[address] = set()
try:
response = requests.get(
"http://localhost:{}".format(metrics_export_port))
response = requests.get(f"http://{address}/metrics")
except requests.exceptions.ConnectionError:
return components_dict, metric_names
continue
for line in response.text.split("\n"):
for family in text_string_to_metric_families(line):
for sample in family.samples:
metric_names.add(sample.name)
metric_samples.append(sample)
if "Component" in sample.labels:
components_dict[address].add(
sample.labels["Component"])
return components_dict, metric_names
return components_dict, metric_names, metric_samples
def test_prometheus_endpoint():
# TODO(Simon): Add a gcs_server after fixing metrics.
components_dict, metric_names = fetch_prometheus(prom_addresses)
def test_cases():
components_dict, metric_names, metric_samples = fetch_prometheus(
prom_addresses)
# Raylet should be on every node
expected_components = {"raylet"}
components_found = all(
expected_components.issubset(components)
for components in components_dict.values())
assert all(
"raylet" in components for components in components_dict.values())
# Core worker should be on at least one node
components_found = components_found and any(
"core_worker" in components
for components in components_dict.values())
# GCS server should be on one node
assert any("gcs_server" in components
for components in components_dict.values())
expected_metric_names = {"ray_test_counter", "ray_test_histogram_max"}
metric_names_found = expected_metric_names.issubset(metric_names)
# Core worker should be on at least one node
assert any("core_worker" in components
for components in components_dict.values())
return components_found and metric_names_found
# Make sure our user defined metrics exist
for metric_name in ["test_counter", "test_histogram"]:
assert any(metric_name in full_name for full_name in metric_names)
# Make sure the numeric value is correct
test_counter_sample = [
m for m in metric_samples if "test_counter" in m.name
][0]
assert test_counter_sample.value == 1.0
# Make sure the histogram values are correct
test_histogram_samples = [
m for m in metric_samples if "test_histogram" in m.name
]
buckets = {
m.labels["le"]: m.value
for m in test_histogram_samples if "_bucket" in m.name
}
# We recorded the value 1.5 for the histogram. In the Prometheus data
# model the histogram is cumulative, so we expect the count to appear
# in the le=1.6 and le=+Inf buckets.
assert buckets == {"0.1": 0.0, "1.6": 1.0, "+Inf": 1.0}
hist_count = [m for m in test_histogram_samples
if "_count" in m.name][0].value
hist_sum = [m for m in test_histogram_samples
if "_sum" in m.name][0].value
assert hist_count == 1
assert hist_sum == 1.5
def wrap_test_case_for_retry():
try:
test_cases()
return True
except AssertionError:
return False
try:
wait_for_condition(
test_prometheus_endpoint,
timeout=20,
wrap_test_case_for_retry,
timeout=TEST_TIMEOUT_S,
retry_interval_ms=1000, # Yield resource for other processes
)
except RuntimeError:
# This is for debugging when the test fails.
raise RuntimeError(
"All components were not visible to "
"prometheus endpoints on time. "
f"The compoenents are {fetch_prometheus(prom_addresses)}")
ray.shutdown()
print(
f"The components are {pformat(fetch_prometheus(prom_addresses))}")
test_cases() # Should fail assert
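The cumulative-bucket expectation asserted above can be checked standalone. This is a small illustrative sketch, not part of the test suite, assuming the boundaries [0.1, 1.6] used in the test:

def cumulative_buckets(values, bounds):
    # Prometheus buckets are cumulative: each le="b" bucket counts all
    # observations <= b, and +Inf counts everything.
    counts = {str(b): float(sum(1 for v in values if v <= b)) for b in bounds}
    counts["+Inf"] = float(len(values))
    return counts

# One observation of 1.5 lands in the 1.6 and +Inf buckets, not 0.1.
assert cumulative_buckets([1.5], [0.1, 1.6]) == {
    "0.1": 0.0,
    "1.6": 1.0,
    "+Inf": 1.0,
}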
if __name__ == "__main__":


@@ -56,7 +56,10 @@ python_grpc_compile(
proto_library(
name = "reporter_proto",
srcs = ["reporter.proto"],
deps = [":common_proto"],
deps = [
":common_proto",
"@io_opencensus_proto//opencensus/proto/metrics/v1:metrics_proto"
],
)
cc_proto_library(


@@ -16,6 +16,8 @@ syntax = "proto3";
package ray.rpc;
import "opencensus/proto/metrics/v1/metrics.proto";
import "src/ray/protobuf/common.proto";
message GetProfilingStatsRequest {
@@ -42,10 +44,19 @@ message ReportMetricsReply {
bool metrcs_description_required = 1;
}
message ReportOCMetricsRequest {
repeated opencensus.proto.metrics.v1.Metric metrics = 1;
}
message ReportOCMetricsReply {
}
// Service for communicating with the reporter.py process on a remote node.
service ReporterService {
// Get the profiling stats.
rpc GetProfilingStats(GetProfilingStatsRequest) returns (GetProfilingStatsReply);
// Report metrics to the local metrics agents.
rpc ReportMetrics(ReportMetricsRequest) returns (ReportMetricsReply);
// Report OpenCensus metrics to the local metrics agent.
rpc ReportOCMetrics(ReportOCMetricsRequest) returns (ReportOCMetricsReply);
}
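For reference, a hedged sketch of exercising the new RPC directly through the generated Python stubs; the module paths follow the imports shown earlier in this commit, and the agent address and port are assumptions:

import grpc

from ray.core.generated import reporter_pb2, reporter_pb2_grpc
from ray.core.generated.metrics_pb2 import Metric, MetricDescriptor

# Connect to a local metrics agent (address/port assumed for illustration).
channel = grpc.insecure_channel("127.0.0.1:8888")
stub = reporter_pb2_grpc.ReporterServiceStub(channel)

request = reporter_pb2.ReportOCMetricsRequest(
    metrics=[Metric(metric_descriptor=MetricDescriptor(name="test_counter"))])
reply = stub.ReportOCMetrics(request)  # Empty ReportOCMetricsReply on success.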


@@ -47,6 +47,12 @@ class MetricsAgentClient {
/// \param[in] callback The callback function that handles reply.
VOID_RPC_CLIENT_METHOD(ReporterService, ReportMetrics, grpc_client_, )
/// Report OpenCensus protobuf metrics to the metrics agent.
///
/// \param[in] request The request message.
/// \param[in] callback The callback function that handles reply.
VOID_RPC_CLIENT_METHOD(ReporterService, ReportOCMetrics, grpc_client_, )
private:
/// The RPC client.
std::unique_ptr<GrpcClient<ReporterService>> grpc_client_;


@@ -20,7 +20,7 @@ namespace ray {
namespace stats {
template <>
void MetricExporter::ExportToPoints(
void MetricPointExporter::ExportToPoints(
const opencensus::stats::ViewData::DataMap<opencensus::stats::Distribution>
&view_data,
const opencensus::stats::MeasureDescriptor &measure_descriptor,
@@ -75,7 +75,7 @@ void MetricExporter::ExportToPoints(
}
}
void MetricExporter::ExportViewData(
void MetricPointExporter::ExportViewData(
const std::vector<std::pair<opencensus::stats::ViewDescriptor,
opencensus::stats::ViewData>> &data) {
std::vector<MetricPoint> points;
@@ -110,5 +110,108 @@ void MetricExporter::ExportViewData(
metric_exporter_client_->ReportMetrics(points);
}
OpenCensusProtoExporter::OpenCensusProtoExporter(const int port,
boost::asio::io_service &io_service,
const std::string address)
: client_call_manager_(io_service) {
client_.reset(new rpc::MetricsAgentClient(address, port, client_call_manager_));
};
void OpenCensusProtoExporter::ExportViewData(
const std::vector<std::pair<opencensus::stats::ViewDescriptor,
opencensus::stats::ViewData>> &data) {
// Start converting OpenCensus data into its protobuf format.
// The format definition can be found here:
// https://github.com/census-instrumentation/opencensus-proto/blob/master/src/opencensus/proto/metrics/v1/metrics.proto
rpc::ReportOCMetricsRequest request_proto;
for (const auto &datum : data) {
// Unpack the fields we need from the in-memory data structure.
auto &view_descriptor = datum.first;
auto &view_data = datum.second;
auto &measure_descriptor = view_descriptor.measure_descriptor();
// Add one protobuf `Metric` to the request for this view.
auto request_point_proto = request_proto.add_metrics();
// Write the `MetricDescriptor`.
auto metric_descriptor_proto = request_point_proto->mutable_metric_descriptor();
metric_descriptor_proto->set_name(measure_descriptor.name());
metric_descriptor_proto->set_description(measure_descriptor.description());
metric_descriptor_proto->set_unit(measure_descriptor.units());
for (const auto &tag_key : view_descriptor.columns()) {
metric_descriptor_proto->add_label_keys()->set_key(tag_key.name());
};
// Helpers for writing the actual `TimeSeries`.
auto start_time = absl::ToUnixSeconds(view_data.start_time());
auto end_time = absl::ToUnixSeconds(view_data.end_time());
auto make_new_data_point_proto = [&request_point_proto, start_time, end_time](
const std::vector<std::string> &tag_values) {
auto metric_timeseries_proto = request_point_proto->add_timeseries();
metric_timeseries_proto->mutable_start_timestamp()->set_seconds(start_time);
for (const auto &value : tag_values) {
metric_timeseries_proto->add_label_values()->set_value(value);
};
auto point_proto = metric_timeseries_proto->add_points();
point_proto->mutable_timestamp()->set_seconds(end_time);
return point_proto;
};
// Write the `TimeSeries` for the given aggregated data type.
switch (view_data.type()) {
case opencensus::stats::ViewData::Type::kDouble:
for (const auto &row : view_data.double_data()) {
auto point_proto = make_new_data_point_proto(row.first /*tag_values*/);
point_proto->set_double_value(row.second);
}
break;
case opencensus::stats::ViewData::Type::kInt64:
for (const auto &row : view_data.int_data()) {
auto point_proto = make_new_data_point_proto(row.first /*tag_values*/);
point_proto->set_int64_value(row.second);
}
break;
case opencensus::stats::ViewData::Type::kDistribution:
for (const auto &row : view_data.distribution_data()) {
opencensus::stats::Distribution dist_value = row.second;
auto point_proto = make_new_data_point_proto(row.first /*tag_values*/);
// Copy the in-memory data into the `DistributionValue` protobuf.
auto distribution_proto = point_proto->mutable_distribution_value();
distribution_proto->set_count(dist_value.count());
distribution_proto->set_sum(dist_value.count() * dist_value.mean());
distribution_proto->set_sum_of_squared_deviation(
dist_value.sum_of_squared_deviation());
// Write the `BucketOption` and `Bucket` data.
auto bucket_opt_proto =
distribution_proto->mutable_bucket_options()->mutable_explicit_();
for (const auto &bound : dist_value.bucket_boundaries().lower_boundaries()) {
bucket_opt_proto->add_bounds(bound);
}
for (const auto &count : dist_value.bucket_counts()) {
distribution_proto->add_buckets()->set_count(count);
}
}
break;
default:
RAY_LOG(FATAL) << "Unknown view data type.";
break;
}
}
client_->ReportOCMetrics(
request_proto, [](const Status &status, const rpc::ReportOCMetricsReply &reply) {
RAY_UNUSED(reply);
if (!status.ok()) {
RAY_LOG(WARNING) << "Export metrics to agent failed: " << status;
}
});
}
} // namespace stats
} // namespace ray
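One detail worth noting above: the exporter writes `sum` as `count * mean` because the OpenCensus C++ `Distribution` exposes a mean rather than a sum, and the Python agent later divides `sum / count` to recover the mean for `DistributionAggregationData`. A tiny illustrative check of that round trip (values are arbitrary):

# The C++ side writes sum = count * mean; the Python agent recovers
# the mean as sum / count. Verify the round trip is lossless.
count, mean = 4, 1.5
proto_sum = count * mean            # what OpenCensusProtoExporter writes
recovered_mean = proto_sum / count  # what _record_metrics computes
assert recovered_mean == mean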


@@ -13,9 +13,11 @@
// limitations under the License.
#pragma once
#include <boost/asio.hpp>
#include "absl/memory/memory.h"
#include "opencensus/stats/stats.h"
#include "opencensus/tags/tag_key.h"
#include "ray/rpc/client_call.h"
#include "ray/stats/metric.h"
#include "ray/stats/metric_exporter_client.h"
#include "ray/util/logging.h"
@@ -28,19 +30,21 @@ namespace stats {
/// opencensus data view, and sends it to a remote endpoint (for example,
/// it sends metrics to dashboard agents through RPC). To use it, register the
/// metrics exporter after the main thread has launched.
class MetricExporter final : public opencensus::stats::StatsExporter::Handler {
class MetricPointExporter final : public opencensus::stats::StatsExporter::Handler {
public:
explicit MetricExporter(std::shared_ptr<MetricExporterClient> metric_exporter_client,
size_t report_batch_size = kDefaultBatchSize)
explicit MetricPointExporter(
std::shared_ptr<MetricExporterClient> metric_exporter_client,
size_t report_batch_size = kDefaultBatchSize)
: metric_exporter_client_(metric_exporter_client),
report_batch_size_(report_batch_size) {}
~MetricExporter() = default;
~MetricPointExporter() = default;
static void Register(std::shared_ptr<MetricExporterClient> metric_exporter_client,
size_t report_batch_size) {
opencensus::stats::StatsExporter::RegisterPushHandler(
absl::make_unique<MetricExporter>(metric_exporter_client, report_batch_size));
absl::make_unique<MetricPointExporter>(metric_exporter_client,
report_batch_size));
}
void ExportViewData(
@@ -84,5 +88,29 @@ class MetricExporter final : public opencensus::stats::StatsExporter::Handler {
size_t report_batch_size_;
};
class OpenCensusProtoExporter final : public opencensus::stats::StatsExporter::Handler {
public:
OpenCensusProtoExporter(const int port, boost::asio::io_service &io_service,
const std::string address);
~OpenCensusProtoExporter() = default;
static void Register(const int port, boost::asio::io_service &io_service,
const std::string address) {
opencensus::stats::StatsExporter::RegisterPushHandler(
absl::make_unique<OpenCensusProtoExporter>(port, io_service, address));
}
void ExportViewData(
const std::vector<std::pair<opencensus::stats::ViewDescriptor,
opencensus::stats::ViewData>> &data) override;
private:
/// Call Manager for gRPC client.
rpc::ClientCallManager client_call_manager_;
/// Client to call a metrics agent gRPC server.
std::unique_ptr<rpc::MetricsAgentClient> client_;
};
} // namespace stats
} // namespace ray


@@ -42,45 +42,11 @@ void MetricExporterDecorator::ReportMetrics(const std::vector<MetricPoint> &poin
///
/// Metrics Agent Exporter
///
MetricsAgentExporter::MetricsAgentExporter(std::shared_ptr<MetricExporterClient> exporter,
const int port,
boost::asio::io_service &io_service,
const std::string address)
: MetricExporterDecorator(exporter), client_call_manager_(io_service) {
client_.reset(new rpc::MetricsAgentClient(address, port, client_call_manager_));
}
MetricsAgentExporter::MetricsAgentExporter(std::shared_ptr<MetricExporterClient> exporter)
: MetricExporterDecorator(exporter) {}
void MetricsAgentExporter::ReportMetrics(const std::vector<MetricPoint> &points) {
MetricExporterDecorator::ReportMetrics(points);
rpc::ReportMetricsRequest request;
for (auto point : points) {
auto metric_point = request.add_metrics_points();
metric_point->set_metric_name(point.metric_name);
metric_point->set_timestamp(point.timestamp);
metric_point->set_value(point.value);
auto mutable_tags = metric_point->mutable_tags();
for (auto &tag : point.tags) {
(*mutable_tags)[tag.first] = tag.second;
}
// If description and units information is requested from
// the metrics agent, append the information.
// TODO(sang): This can be inefficient if there are lots of newly registered
// metrics. We should make it more efficient if there are compelling use cases.
if (should_update_description_) {
metric_point->set_description(point.measure_descriptor.description());
metric_point->set_units(point.measure_descriptor.units());
}
}
should_update_description_ = false;
// TODO(sang): Should retry metrics report if it fails.
client_->ReportMetrics(
request, [this](const Status &status, const rpc::ReportMetricsReply &reply) {
if (!status.ok()) {
RAY_LOG(WARNING) << "ReportMetrics failed with status " << status;
}
should_update_description_ = reply.metrcs_description_required();
});
}
} // namespace stats


@@ -14,9 +14,6 @@
#pragma once
#include <boost/asio.hpp>
#include "ray/rpc/client_call.h"
#include "ray/rpc/metrics_agent_client.h"
#include "ray/stats/metric.h"
@@ -57,20 +54,11 @@ class MetricExporterDecorator : public MetricExporterClient {
class MetricsAgentExporter : public MetricExporterDecorator {
public:
MetricsAgentExporter(std::shared_ptr<MetricExporterClient> exporter, const int port,
boost::asio::io_service &io_service, const std::string address);
MetricsAgentExporter(std::shared_ptr<MetricExporterClient> exporter);
~MetricsAgentExporter() {}
void ReportMetrics(const std::vector<MetricPoint> &points) override;
private:
/// Client to call a metrics agent gRPC server.
std::unique_ptr<rpc::MetricsAgentClient> client_;
/// Call Manager for gRPC client.
rpc::ClientCallManager client_call_manager_;
/// Whether or not description and units information for metrics should be updated.
bool should_update_description_ = true;
};
} // namespace stats


@@ -83,8 +83,7 @@ static inline void Init(const TagsType &global_tags, const int metrics_agent_por
// Default exporter is a metrics agent exporter.
if (exporter_to_use == nullptr) {
std::shared_ptr<MetricExporterClient> stdout_exporter(new StdoutExporterClient());
exporter.reset(new MetricsAgentExporter(stdout_exporter, metrics_agent_port,
(*metrics_io_service), "127.0.0.1"));
exporter.reset(new MetricsAgentExporter(stdout_exporter));
} else {
exporter = exporter_to_use;
}
@@ -96,7 +95,9 @@ static inline void Init(const TagsType &global_tags, const int metrics_agent_por
absl::Milliseconds(std::max(RayConfig::instance().metrics_report_interval_ms() / 2,
static_cast<uint64_t>(500))));
MetricExporter::Register(exporter, metrics_report_batch_size);
MetricPointExporter::Register(exporter, metrics_report_batch_size);
OpenCensusProtoExporter::Register(metrics_agent_port, (*metrics_io_service),
"127.0.0.1");
opencensus::stats::StatsExporter::SetInterval(
StatsConfig::instance().GetReportInterval());
opencensus::stats::DeltaProducer::Get()->SetHarvestInterval(