[Streaming] Support streaming metric reporter (#17981)

* Streaming support metric reporter

* fix lint

* fix bazel format lint

* fix lint

* metric deps lint

* lint

* and comments for runtime reporter

* unordered_map instead

* comments

* fix visibility flag

* deps local .so target

* make stats public visibility

* stats lib in public

* add antgroup team tag
This commit is contained in:
Lingxuan Zuo 2021-09-08 14:36:00 +08:00 committed by GitHub
parent df9c6aa863
commit 46b941b702
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
13 changed files with 748 additions and 1 deletions

View file

@ -588,6 +588,7 @@ cc_library(
"src/ray/stats/tag_defs.h",
],
strip_include_prefix = "src",
visibility = ["//visibility:public"],
deps = [
":ray_util",
"@com_github_jupp0r_prometheus_cpp//pull",
@ -623,6 +624,7 @@ cc_library(
],
}),
strip_include_prefix = "src",
visibility = ["//visibility:public"],
deps = [
":reporter_rpc",
":stats_metric",

View file

@ -23,6 +23,9 @@
*ray*ObjectID*
*ray*ObjectReference*
# Others
*ray*metrics*
*opencensus*
*ray*stats*
*ray*CoreWorker*
*PyInit*
*init_raylet*

View file

@ -25,6 +25,9 @@ VERSION_1.0 {
*ray*ObjectID*;
*ray*ObjectReference*;
# Others
*ray*metrics*;
*opencensus*;
*ray*stats*;
*ray*CoreWorker*;
*PyInit*;
*init_raylet*;

View file

@ -60,6 +60,14 @@ cc_binary(
deps = ["//:ray_common"],
)
cc_binary(
name = "stats_lib.so",
copts = COPTS,
linkshared = 1,
visibility = ["//visibility:public"],
deps = ["//:stats_lib"],
)
cc_binary(
name = "core_worker_lib.so",
copts = COPTS,
@ -93,6 +101,24 @@ cc_library(
],
)
cc_library(
name = "streaming_metrics",
srcs = glob([
"src/metrics/*.cc",
]),
hdrs = glob([
"src/metrics/*.h",
]),
copts = COPTS,
strip_include_prefix = "src",
visibility = ["//visibility:public"],
deps = [
"stats_lib.so",
":streaming_config",
":streaming_util",
],
)
cc_library(
name = "streaming_config",
srcs = glob([
@ -235,6 +261,7 @@ cc_library(
":streaming_common",
":streaming_config",
":streaming_message",
":streaming_metrics",
":streaming_queue",
":streaming_reliability",
":streaming_util",
@ -319,6 +346,16 @@ cc_test(
deps = test_common_deps,
)
cc_test(
name = "streaming_perf_tests",
srcs = [
"src/test/streaming_perf_tests.cc",
],
copts = COPTS,
tags = ["team:ant-group"],
deps = test_common_deps,
)
cc_test(
name = "event_service_tests",
srcs = [

View file

@ -16,6 +16,19 @@ using StreamingRole = proto::NodeType;
TYPE Get##NAME() const { return VALUE; } \
void Set##NAME(TYPE value) { VALUE = value; }
using TagsMap = std::unordered_map<std::string, std::string>;
class StreamingMetricsConfig {
public:
DECL_GET_SET_PROPERTY(const std::string &, MetricsServiceName, metrics_service_name_);
DECL_GET_SET_PROPERTY(uint32_t, MetricsReportInterval, metrics_report_interval_);
DECL_GET_SET_PROPERTY(const TagsMap, MetricsGlobalTags, global_tags);
private:
std::string metrics_service_name_ = "streaming";
uint32_t metrics_report_interval_ = 10;
std::unordered_map<std::string, std::string> global_tags;
};
class StreamingConfig {
public:
static uint64_t TIME_WAIT_UINT;
@ -50,6 +63,7 @@ class StreamingConfig {
ReliabilityLevel streaming_strategy_ = ReliabilityLevel::EXACTLY_ONCE;
StreamingRole streaming_role = StreamingRole::TRANSFORM;
bool metrics_enable = true;
public:
void FromProto(const uint8_t *, uint32_t size);
@ -74,6 +88,7 @@ class StreamingConfig {
event_driven_flow_control_interval_)
DECL_GET_SET_PROPERTY(StreamingRole, StreamingRole, streaming_role)
DECL_GET_SET_PROPERTY(ReliabilityLevel, ReliabilityLevel, streaming_strategy_)
DECL_GET_SET_PROPERTY(bool, MetricsEnable, metrics_enable)
uint32_t GetRingBufferCapacity() const;
/// Note(lingxuan.zlx), RingBufferCapacity's valid range is from 1 to

View file

@ -1,3 +1,17 @@
// Copyright 2021 The Ray Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include <cstring>

View file

@ -0,0 +1,117 @@
#include "metrics/stats_reporter.h"
#include "util/streaming_logging.h"
namespace ray {
namespace streaming {
std::shared_ptr<ray::stats::Metric> StatsReporter::GetMetricByName(
const std::string &metric_name) {
std::unique_lock<std::mutex> lock(metric_mutex_);
auto metric = metric_map_.find(metric_name);
if (metric != metric_map_.end()) {
return metric->second;
}
return nullptr;
}
void StatsReporter::MetricRegister(const std::string &metric_name,
std::shared_ptr<ray::stats::Metric> metric) {
std::unique_lock<std::mutex> lock(metric_mutex_);
metric_map_[metric_name] = metric;
}
void StatsReporter::UnregisterAllMetrics() {
std::unique_lock<std::mutex> lock(metric_mutex_);
metric_map_.clear();
}
bool StatsReporter::Start(const StreamingMetricsConfig &conf) {
global_tags_ = conf.GetMetricsGlobalTags();
service_name_ = conf.GetMetricsServiceName();
STREAMING_LOG(INFO) << "Start stats reporter, service name " << service_name_
<< ", global tags size : " << global_tags_.size()
<< ", stats disabled : "
<< stats::StatsConfig::instance().IsStatsDisabled();
for (auto &tag : global_tags_) {
global_tag_key_list_.push_back(stats::TagKeyType::Register(tag.first));
}
return true;
}
bool StatsReporter::Start(const std::string &json_string) { return true; }
StatsReporter::~StatsReporter() {
STREAMING_LOG(WARNING) << "stats client shutdown";
Shutdown();
};
void StatsReporter::Shutdown() { UnregisterAllMetrics(); }
void StatsReporter::UpdateCounter(const std::string &domain,
const std::string &group_name,
const std::string &short_name, double value) {
const std::string merged_metric_name =
METRIC_GROUP_JOIN(domain, group_name, short_name);
}
void StatsReporter::UpdateCounter(
const std::string &metric_name,
const std::unordered_map<std::string, std::string> &tags, double value) {
STREAMING_LOG(DEBUG) << "Report counter metric " << metric_name << " , value " << value;
}
void StatsReporter::UpdateGauge(const std::string &domain, const std::string &group_name,
const std::string &short_name, double value,
bool is_reset) {
const std::string merged_metric_name =
service_name_ + "." + METRIC_GROUP_JOIN(domain, group_name, short_name);
STREAMING_LOG(DEBUG) << "Report gauge metric " << merged_metric_name << " , value "
<< value;
auto metric = GetMetricByName(merged_metric_name);
if (nullptr == metric) {
metric = std::shared_ptr<ray::stats::Metric>(
new ray::stats::Gauge(merged_metric_name, "", "", global_tag_key_list_));
MetricRegister(merged_metric_name, metric);
}
metric->Record(value, global_tags_);
}
void StatsReporter::UpdateGauge(const std::string &metric_name,
const std::unordered_map<std::string, std::string> &tags,
double value, bool is_reset) {
const std::string merged_metric_name = service_name_ + "." + metric_name;
STREAMING_LOG(DEBUG) << "Report gauge metric " << merged_metric_name << " , value "
<< value;
// Get metric from registered map, create a new one item if no such metric can be found
// in map.
auto metric = GetMetricByName(metric_name);
if (nullptr == metric) {
// Register tag key for all tags.
std::vector<stats::TagKeyType> tag_key_list(global_tag_key_list_.begin(),
global_tag_key_list_.end());
for (auto &tag : tags) {
tag_key_list.push_back(stats::TagKeyType::Register(tag.first));
}
metric = std::shared_ptr<ray::stats::Metric>(
new ray::stats::Gauge(merged_metric_name, "", "", tag_key_list));
MetricRegister(merged_metric_name, metric);
}
auto merged_tags = MergeGlobalTags(tags);
metric->Record(value, merged_tags);
}
void StatsReporter::UpdateHistogram(const std::string &domain,
const std::string &group_name,
const std::string &short_name, double value,
double min_value, double max_value) {}
void StatsReporter::UpdateHistogram(
const std::string &metric_name,
const std::unordered_map<std::string, std::string> &tags, double value,
double min_value, double max_value) {}
void StatsReporter::UpdateQPS(const std::string &metric_name,
const std::unordered_map<std::string, std::string> &tags,
double value) {}
} // namespace streaming
} // namespace ray

View file

@ -0,0 +1,88 @@
// Copyright 2021 The Ray Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include <mutex>
#include "ray/stats/metric.h"
#include "streaming_perf_metric.h"
namespace ray {
namespace streaming {
class StatsReporter : public StreamingReporterInterface {
public:
virtual ~StatsReporter();
bool Start(const StreamingMetricsConfig &conf) override;
bool Start(const std::string &json_string);
void Shutdown() override;
void UpdateCounter(const std::string &domain, const std::string &group_name,
const std::string &short_name, double value) override;
void UpdateGauge(const std::string &domain, const std::string &group_name,
const std::string &short_name, double value,
bool is_reset = true) override;
void UpdateHistogram(const std::string &domain, const std::string &group_name,
const std::string &short_name, double value, double min_value,
double max_value) override;
void UpdateCounter(const std::string &metric_name,
const std::unordered_map<std::string, std::string> &tags,
double value) override;
void UpdateGauge(const std::string &metric_name,
const std::unordered_map<std::string, std::string> &tags, double value,
bool is_rest = true) override;
void UpdateHistogram(const std::string &metric_name,
const std::unordered_map<std::string, std::string> &tags,
double value, double min_value, double max_value) override;
void UpdateQPS(const std::string &metric_name,
const std::unordered_map<std::string, std::string> &tags,
double value) override;
protected:
std::shared_ptr<ray::stats::Metric> GetMetricByName(const std::string &metric_name);
void MetricRegister(const std::string &metric_name,
std::shared_ptr<ray::stats::Metric> metric);
void UnregisterAllMetrics();
private:
inline std::unordered_map<std::string, std::string> MergeGlobalTags(
const std::unordered_map<std::string, std::string> &tags) {
std::unordered_map<std::string, std::string> merged_tags;
merged_tags.insert(global_tags_.begin(), global_tags_.end());
for (auto &item : tags) {
merged_tags.emplace(item.first, item.second);
}
return merged_tags;
}
private:
std::mutex metric_mutex_;
std::unordered_map<std::string, std::shared_ptr<ray::stats::Metric>> metric_map_;
std::unordered_map<std::string, std::string> global_tags_;
std::vector<stats::TagKeyType> global_tag_key_list_;
std::string service_name_;
};
} // namespace streaming
} // namespace ray

View file

@ -0,0 +1,97 @@
#include <sstream>
#include "metrics/stats_reporter.h"
#include "metrics/streaming_perf_metric.h"
#include "util/streaming_logging.h"
namespace ray {
namespace streaming {
bool StreamingReporter::Start(const StreamingMetricsConfig &conf) {
if (impl_) {
STREAMING_LOG(WARNING) << "Streaming perf is active";
} else {
impl_.reset(new StatsReporter());
return impl_->Start(conf);
}
return false;
}
void StreamingReporter::Shutdown() {
if (impl_) {
impl_->Shutdown();
impl_.reset();
} else {
STREAMING_LOG(WARNING) << "No active perf instance will be shutdown";
}
}
void StreamingReporter::UpdateCounter(const std::string &domain,
const std::string &group_name,
const std::string &short_name, double value) {
if (impl_) {
impl_->UpdateCounter(domain, group_name, short_name, value);
} else {
STREAMING_LOG(WARNING) << "No active perf instance";
}
}
void StreamingReporter::UpdateGauge(const std::string &domain,
const std::string &group_name,
const std::string &short_name, double value,
bool is_reset) {
if (impl_) {
impl_->UpdateGauge(domain, group_name, short_name, value, is_reset);
} else {
STREAMING_LOG(WARNING) << "No active perf instance";
}
}
void StreamingReporter::UpdateHistogram(const std::string &domain,
const std::string &group_name,
const std::string &short_name, double value,
double min_value, double max_value) {
if (impl_) {
impl_->UpdateHistogram(domain, group_name, short_name, value, min_value, max_value);
} else {
STREAMING_LOG(WARNING) << "No active perf instance";
}
}
void StreamingReporter::UpdateQPS(
const std::string &metric_name,
const std::unordered_map<std::string, std::string> &tags, double value) {
if (impl_) {
impl_->UpdateQPS(metric_name, tags, value);
} else {
STREAMING_LOG(WARNING) << "No active perf instance";
}
}
StreamingReporter::~StreamingReporter() {
if (impl_) {
STREAMING_LOG(INFO) << "Destory streamimg perf => " << impl_.get();
Shutdown();
}
}
void StreamingReporter::UpdateCounter(
const std::string &metric_name,
const std::unordered_map<std::string, std::string> &tags, double value) {
if (impl_) {
impl_->UpdateCounter(metric_name, tags, value);
}
}
void StreamingReporter::UpdateGauge(
const std::string &metric_name,
const std::unordered_map<std::string, std::string> &tags, double value,
bool is_rest) {
if (impl_) {
impl_->UpdateGauge(metric_name, tags, value, is_rest);
}
}
void StreamingReporter::UpdateHistogram(
const std::string &metric_name,
const std::unordered_map<std::string, std::string> &tags, double value,
double min_value, double max_value) {}
} // namespace streaming
} // namespace ray

View file

@ -0,0 +1,97 @@
// Copyright 2021 The Ray Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include "config/streaming_config.h"
namespace ray {
namespace streaming {
#define METRIC_GROUP_JOIN(a, b, c) (a + "." + b + "." + c)
class StreamingReporterInterface {
public:
virtual ~StreamingReporterInterface() = default;
virtual bool Start(const StreamingMetricsConfig &conf) = 0;
virtual void Shutdown() = 0;
virtual void UpdateCounter(const std::string &domain, const std::string &group_name,
const std::string &short_name, double value) = 0;
virtual void UpdateGauge(const std::string &domain, const std::string &group_name,
const std::string &short_name, double value,
bool is_reset) = 0;
virtual void UpdateHistogram(const std::string &domain, const std::string &group_name,
const std::string &short_name, double value,
double min_value, double max_value) = 0;
virtual void UpdateCounter(const std::string &metric_name,
const std::unordered_map<std::string, std::string> &tags,
double value) = 0;
virtual void UpdateGauge(const std::string &metric_name,
const std::unordered_map<std::string, std::string> &tags,
double value, bool is_rest) = 0;
virtual void UpdateHistogram(const std::string &metric_name,
const std::unordered_map<std::string, std::string> &tags,
double value, double min_value, double max_value) = 0;
virtual void UpdateQPS(const std::string &metric_name,
const std::unordered_map<std::string, std::string> &tags,
double value) = 0;
};
/// Streaming perf is a reporter instance based multiple backend.
/// Other modules can report gauge/histogram/counter/qps measurement to meteric server
/// side.
class StreamingReporter : public StreamingReporterInterface {
public:
StreamingReporter(){};
virtual ~StreamingReporter();
bool Start(const StreamingMetricsConfig &conf) override;
void Shutdown() override;
void UpdateCounter(const std::string &domain, const std::string &group_name,
const std::string &short_name, double value) override;
void UpdateGauge(const std::string &domain, const std::string &group_name,
const std::string &short_name, double value,
bool is_reset = true) override;
void UpdateHistogram(const std::string &domain, const std::string &group_name,
const std::string &short_name, double value, double min_value,
double max_value) override;
void UpdateCounter(const std::string &metric_name,
const std::unordered_map<std::string, std::string> &tags,
double value) override;
void UpdateGauge(const std::string &metric_name,
const std::unordered_map<std::string, std::string> &tags, double value,
bool is_rest = true) override;
void UpdateHistogram(const std::string &metric_name,
const std::unordered_map<std::string, std::string> &tags,
double value, double min_value, double max_value) override;
void UpdateQPS(const std::string &metric_name,
const std::unordered_map<std::string, std::string> &tags,
double value) override;
private:
std::unique_ptr<StreamingReporterInterface> impl_;
};
} // namespace streaming
} // namespace ray

View file

@ -4,6 +4,7 @@
#include "ray/util/util.h"
#include "src/ray/protobuf/common.pb.h"
#include "util/streaming_logging.h"
#include "util/streaming_util.h"
namespace ray {
namespace streaming {
@ -26,7 +27,111 @@ void RuntimeContext::SetConfig(const uint8_t *data, uint32_t size) {
RuntimeContext::~RuntimeContext() {}
RuntimeContext::RuntimeContext() : runtime_status_(RuntimeStatus::Init) {}
RuntimeContext::RuntimeContext()
: enable_timer_service_(false), runtime_status_(RuntimeStatus::Init) {}
void RuntimeContext::InitMetricsReporter() {
STREAMING_LOG(INFO) << "init metrics";
if (!config_.GetMetricsEnable()) {
STREAMING_LOG(WARNING) << "metrics is disable";
return;
}
perf_metrics_reporter_.reset(new StreamingReporter());
std::unordered_map<std::string, std::string> default_tag_map = {
{"role", NodeType_Name(config_.GetNodeType())},
{"op_name", config_.GetOpName()},
{"worker_name", config_.GetWorkerName()}};
metrics_config_.SetMetricsGlobalTags(default_tag_map);
perf_metrics_reporter_->Start(metrics_config_);
}
void RuntimeContext::ReportMetrics(
const std::string &metric_name, double value,
const std::unordered_map<std::string, std::string> &tags) {
if (config_.GetMetricsEnable()) {
perf_metrics_reporter_->UpdateGauge(metric_name, tags, value);
}
}
void RuntimeContext::RunTimer() {
AutoSpinLock lock(report_flag_);
if (runtime_status_ != RuntimeStatus::Running) {
STREAMING_LOG(WARNING) << "Run timer failed in state "
<< static_cast<uint8_t>(runtime_status_);
return;
}
STREAMING_LOG(INFO) << "Streaming metric timer called, interval="
<< metrics_config_.GetMetricsReportInterval();
if (async_io_.stopped()) {
STREAMING_LOG(INFO) << "Async io stopped, return from timer reporting.";
return;
}
this->report_timer_handler_();
boost::posix_time::seconds interval(metrics_config_.GetMetricsReportInterval());
metrics_timer_->expires_from_now(interval);
metrics_timer_->async_wait([this](const boost::system::error_code &e) {
if (boost::asio::error::operation_aborted == e) {
return;
}
this->RunTimer();
});
}
void RuntimeContext::EnableTimer(std::function<void()> report_timer_handler) {
if (!config_.GetMetricsEnable()) {
STREAMING_LOG(WARNING) << "Streaming metrics disabled.";
return;
}
if (enable_timer_service_) {
STREAMING_LOG(INFO) << "Timer service already enabled";
return;
}
this->report_timer_handler_ = report_timer_handler;
STREAMING_LOG(INFO) << "Streaming metric timer enabled";
// We new a thread for timer if timer is not alive currently.
if (!timer_thread_) {
async_io_.reset();
boost::posix_time::seconds interval(metrics_config_.GetMetricsReportInterval());
metrics_timer_.reset(new boost::asio::deadline_timer(async_io_, interval));
metrics_timer_->async_wait(
[this](const boost::system::error_code & /*e*/) { this->RunTimer(); });
timer_thread_ = std::make_shared<std::thread>([this]() {
STREAMING_LOG(INFO) << "Async io running.";
async_io_.run();
});
STREAMING_LOG(INFO) << "New thread " << timer_thread_->get_id();
}
enable_timer_service_ = true;
}
void RuntimeContext::ShutdownTimer() {
{
AutoSpinLock lock(report_flag_);
if (!config_.GetMetricsEnable()) {
STREAMING_LOG(WARNING) << "Streaming metrics disabled";
return;
}
if (!enable_timer_service_) {
STREAMING_LOG(INFO) << "Timer service already disabled";
return;
}
STREAMING_LOG(INFO) << "Timer server shutdown";
enable_timer_service_ = false;
STREAMING_LOG(INFO) << "Cancel metrics timer.";
metrics_timer_->cancel();
}
STREAMING_LOG(INFO) << "Wake up all reporting conditions.";
if (timer_thread_) {
STREAMING_LOG(INFO) << "Join and reset timer thread.";
if (timer_thread_->joinable()) {
timer_thread_->join();
}
timer_thread_.reset();
metrics_timer_.reset();
}
}
} // namespace streaming
} // namespace ray

View file

@ -1,9 +1,26 @@
// Copyright 2021 The Ray Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include <boost/asio.hpp>
#include <boost/thread/thread.hpp>
#include <string>
#include "common/status.h"
#include "config/streaming_config.h"
#include "metrics/streaming_perf_metric.h"
namespace ray {
namespace streaming {
@ -30,9 +47,41 @@ class RuntimeContext {
inline void MarkMockTest() { is_mock_test_ = true; }
inline bool IsMockTest() { return is_mock_test_; }
void InitMetricsReporter();
/// It's periodic reporter entry for all runtime modules.
/// \param metric_name, metric name
/// \param value, metric value
/// \param tags, metric tag map
void ReportMetrics(const std::string &metric_name, double value,
const std::unordered_map<std::string, std::string> &tags = {});
/// Enable and register a specific reporting timer for updating all of metrics.
/// \param reporter_timer_handler
void EnableTimer(std::function<void()> report_timer_handler);
/// Halt the timer invoking from now on.
void ShutdownTimer();
private:
void RunTimer();
protected:
std::unique_ptr<StreamingReporter> perf_metrics_reporter_;
std::function<void()> report_timer_handler_;
boost::asio::io_service async_io_;
private:
bool enable_timer_service_;
std::unique_ptr<boost::asio::deadline_timer> metrics_timer_;
std::shared_ptr<std::thread> timer_thread_;
std::atomic_flag report_flag_ = ATOMIC_FLAG_INIT;
StreamingConfig config_;
RuntimeStatus runtime_status_;
StreamingMetricsConfig metrics_config_;
bool is_mock_test_ = false;
};

View file

@ -0,0 +1,120 @@
#include <chrono>
#include <cstdlib>
#include <string>
#include <thread>
#include "opencensus/stats/internal/delta_producer.h"
#include "opencensus/stats/internal/stats_exporter_impl.h"
#include "ray/stats/stats.h"
#include "config/streaming_config.h"
#include "gtest/gtest.h"
#include "metrics/streaming_perf_metric.h"
using namespace ray::streaming;
using namespace ray;
class StreamingReporterCounterTest : public ::testing::Test {
public:
using UpdateFunc = std::function<void(size_t)>;
void SetUp() {
uint32_t kReportFlushInterval = 100;
absl::Duration report_interval = absl::Milliseconds(kReportFlushInterval);
absl::Duration harvest_interval = absl::Milliseconds(kReportFlushInterval / 2);
ray::stats::StatsConfig::instance().SetReportInterval(report_interval);
ray::stats::StatsConfig::instance().SetHarvestInterval(harvest_interval);
const stats::TagsType global_tags = {{stats::ResourceNameKey, "CPU"}};
std::shared_ptr<stats::MetricExporterClient> exporter(
new stats::StdoutExporterClient());
ray::stats::Init(global_tags, 10054, exporter);
setenv("STREAMING_METRICS_MODE", "DEV", 1);
setenv("ENABLE_RAY_STATS", "ON", 1);
setenv("STREAMING_ENABLE_METRICS", "ON", 1);
perf_counter_.reset(new StreamingReporter());
const std::unordered_map<std::string, std::string> default_tags = {
{"app", "s_test"}, {"cluster", "kmon-dev"}};
metrics_conf_.SetMetricsGlobalTags(default_tags);
perf_counter_->Start(metrics_conf_);
}
void TearDown() {
opencensus::stats::DeltaProducer::Get()->Flush();
opencensus::stats::StatsExporterImpl::Get()->Export();
perf_counter_->Shutdown();
ray::stats::Shutdown();
}
void RegisterAndRun(UpdateFunc update_handler) {
auto stat_time_handler = [this](size_t thread_index, UpdateFunc update_handler) {
auto start = std::chrono::high_resolution_clock::now();
for (size_t loop_index = 0; loop_index < loop_update_times_; ++loop_index) {
update_handler(loop_index);
}
auto end = std::chrono::high_resolution_clock::now();
std::chrono::duration<double, std::micro> elapsed = end - start;
std::string info = "Thread=" + std::to_string(thread_index) +
", times=" + std::to_string(loop_update_times_) +
", cost=" + std::to_string(elapsed.count()) + "us.";
std::cout << info << std::endl;
};
for (size_t thread_index = 0; thread_index < op_thread_count_; ++thread_index) {
thread_pool_.emplace_back(
std::bind(stat_time_handler, thread_index, update_handler));
}
for (auto &thread : thread_pool_) {
thread.join();
}
}
protected:
size_t op_thread_count_{4};
size_t loop_update_times_{10};
std::vector<std::thread> thread_pool_;
StreamingMetricsConfig metrics_conf_;
std::unique_ptr<StreamingReporter> perf_counter_;
};
TEST_F(StreamingReporterCounterTest, UpdateCounterWithOneKeyTest) {
RegisterAndRun([this](size_t loop_index) {
perf_counter_->UpdateCounter("domaina", "groupa", "a", loop_index);
});
}
TEST_F(StreamingReporterCounterTest, UpdateCounterTest) {
RegisterAndRun([this](size_t loop_index) {
auto loop_index_str = std::to_string(loop_index % 10);
perf_counter_->UpdateCounter("domaina" + loop_index_str, "groupa" + loop_index_str,
"a" + loop_index_str, loop_index);
});
}
TEST_F(StreamingReporterCounterTest, UpdateGaugeWithOneKeyTest) {
RegisterAndRun([this](size_t loop_index) {
std::unordered_map<std::string, std::string> tags;
tags["tag1"] = "tag1";
tags["tag2"] = std::to_string(loop_index);
perf_counter_->UpdateGauge("streaming.test.gauge", tags, loop_index);
});
}
TEST_F(StreamingReporterCounterTest, UpdateGaugeTest) {
RegisterAndRun([this](size_t loop_index) {
auto loop_index_str = std::to_string(loop_index % 10);
perf_counter_->UpdateGauge("domaina" + loop_index_str, "groupa" + loop_index_str,
"a" + loop_index_str, loop_index);
});
}
int main(int argc, char **argv) {
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}