From 46b941b70284259222d3db431df2f5c4a5aa06ec Mon Sep 17 00:00:00 2001 From: Lingxuan Zuo Date: Wed, 8 Sep 2021 14:36:00 +0800 Subject: [PATCH] [Streaming] Support streaming metric reporter (#17981) * Streaming support metric reporter * fix lint * fix bazel format lint * fix lint * metric deps lint * lint * and comments for runtime reporter * unordered_map instead * comments * fix visibility flag * deps local .so target * make stats public visibility * stats lib in public * add antgroup team tag --- BUILD.bazel | 2 + src/ray/ray_exported_symbols.lds | 3 + src/ray/ray_version_script.lds | 3 + streaming/BUILD.bazel | 37 ++++++ streaming/src/config/streaming_config.h | 15 +++ streaming/src/data_writer.h | 14 ++ streaming/src/metrics/stats_reporter.cc | 117 +++++++++++++++++ streaming/src/metrics/stats_reporter.h | 88 +++++++++++++ .../src/metrics/streaming_perf_metric.cc | 97 ++++++++++++++ streaming/src/metrics/streaming_perf_metric.h | 97 ++++++++++++++ streaming/src/runtime_context.cc | 107 +++++++++++++++- streaming/src/runtime_context.h | 49 +++++++ streaming/src/test/streaming_perf_tests.cc | 120 ++++++++++++++++++ 13 files changed, 748 insertions(+), 1 deletion(-) create mode 100644 streaming/src/metrics/stats_reporter.cc create mode 100644 streaming/src/metrics/stats_reporter.h create mode 100644 streaming/src/metrics/streaming_perf_metric.cc create mode 100644 streaming/src/metrics/streaming_perf_metric.h create mode 100644 streaming/src/test/streaming_perf_tests.cc diff --git a/BUILD.bazel b/BUILD.bazel index d86247c91..412e4e4fe 100644 --- a/BUILD.bazel +++ b/BUILD.bazel @@ -588,6 +588,7 @@ cc_library( "src/ray/stats/tag_defs.h", ], strip_include_prefix = "src", + visibility = ["//visibility:public"], deps = [ ":ray_util", "@com_github_jupp0r_prometheus_cpp//pull", @@ -623,6 +624,7 @@ cc_library( ], }), strip_include_prefix = "src", + visibility = ["//visibility:public"], deps = [ ":reporter_rpc", ":stats_metric", diff --git 
a/src/ray/ray_exported_symbols.lds b/src/ray/ray_exported_symbols.lds index 1410390db..d7eee5264 100644 --- a/src/ray/ray_exported_symbols.lds +++ b/src/ray/ray_exported_symbols.lds @@ -23,6 +23,9 @@ *ray*ObjectID* *ray*ObjectReference* # Others +*ray*metrics* +*opencensus* +*ray*stats* *ray*CoreWorker* *PyInit* *init_raylet* diff --git a/src/ray/ray_version_script.lds b/src/ray/ray_version_script.lds index 6d27e40b1..7ebfa967b 100644 --- a/src/ray/ray_version_script.lds +++ b/src/ray/ray_version_script.lds @@ -25,6 +25,9 @@ VERSION_1.0 { *ray*ObjectID*; *ray*ObjectReference*; # Others + *ray*metrics*; + *opencensus*; + *ray*stats*; *ray*CoreWorker*; *PyInit*; *init_raylet*; diff --git a/streaming/BUILD.bazel b/streaming/BUILD.bazel index 29eaa1c76..dea81f199 100644 --- a/streaming/BUILD.bazel +++ b/streaming/BUILD.bazel @@ -60,6 +60,14 @@ cc_binary( deps = ["//:ray_common"], ) +cc_binary( + name = "stats_lib.so", + copts = COPTS, + linkshared = 1, + visibility = ["//visibility:public"], + deps = ["//:stats_lib"], +) + cc_binary( name = "core_worker_lib.so", copts = COPTS, @@ -93,6 +101,24 @@ cc_library( ], ) +cc_library( + name = "streaming_metrics", + srcs = glob([ + "src/metrics/*.cc", + ]), + hdrs = glob([ + "src/metrics/*.h", + ]), + copts = COPTS, + strip_include_prefix = "src", + visibility = ["//visibility:public"], + deps = [ + "stats_lib.so", + ":streaming_config", + ":streaming_util", + ], +) + cc_library( name = "streaming_config", srcs = glob([ @@ -235,6 +261,7 @@ cc_library( ":streaming_common", ":streaming_config", ":streaming_message", + ":streaming_metrics", ":streaming_queue", ":streaming_reliability", ":streaming_util", @@ -319,6 +346,16 @@ cc_test( deps = test_common_deps, ) +cc_test( + name = "streaming_perf_tests", + srcs = [ + "src/test/streaming_perf_tests.cc", + ], + copts = COPTS, + tags = ["team:ant-group"], + deps = test_common_deps, +) + cc_test( name = "event_service_tests", srcs = [ diff --git a/streaming/src/config/streaming_config.h 
b/streaming/src/config/streaming_config.h index 784f7b094..1485e8b8c 100644 --- a/streaming/src/config/streaming_config.h +++ b/streaming/src/config/streaming_config.h @@ -16,6 +16,19 @@ using StreamingRole = proto::NodeType; TYPE Get##NAME() const { return VALUE; } \ void Set##NAME(TYPE value) { VALUE = value; } +using TagsMap = std::unordered_map; +class StreamingMetricsConfig { + public: + DECL_GET_SET_PROPERTY(const std::string &, MetricsServiceName, metrics_service_name_); + DECL_GET_SET_PROPERTY(uint32_t, MetricsReportInterval, metrics_report_interval_); + DECL_GET_SET_PROPERTY(const TagsMap, MetricsGlobalTags, global_tags); + + private: + std::string metrics_service_name_ = "streaming"; + uint32_t metrics_report_interval_ = 10; + std::unordered_map global_tags; +}; + class StreamingConfig { public: static uint64_t TIME_WAIT_UINT; @@ -50,6 +63,7 @@ class StreamingConfig { ReliabilityLevel streaming_strategy_ = ReliabilityLevel::EXACTLY_ONCE; StreamingRole streaming_role = StreamingRole::TRANSFORM; + bool metrics_enable = true; public: void FromProto(const uint8_t *, uint32_t size); @@ -74,6 +88,7 @@ class StreamingConfig { event_driven_flow_control_interval_) DECL_GET_SET_PROPERTY(StreamingRole, StreamingRole, streaming_role) DECL_GET_SET_PROPERTY(ReliabilityLevel, ReliabilityLevel, streaming_strategy_) + DECL_GET_SET_PROPERTY(bool, MetricsEnable, metrics_enable) uint32_t GetRingBufferCapacity() const; /// Note(lingxuan.zlx), RingBufferCapacity's valid range is from 1 to diff --git a/streaming/src/data_writer.h b/streaming/src/data_writer.h index 07a7abb4d..2b1582d90 100644 --- a/streaming/src/data_writer.h +++ b/streaming/src/data_writer.h @@ -1,3 +1,17 @@ +// Copyright 2021 The Ray Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + #pragma once #include diff --git a/streaming/src/metrics/stats_reporter.cc b/streaming/src/metrics/stats_reporter.cc new file mode 100644 index 000000000..286b8a382 --- /dev/null +++ b/streaming/src/metrics/stats_reporter.cc @@ -0,0 +1,117 @@ +#include "metrics/stats_reporter.h" +#include "util/streaming_logging.h" + +namespace ray { +namespace streaming { + +std::shared_ptr StatsReporter::GetMetricByName( + const std::string &metric_name) { + std::unique_lock lock(metric_mutex_); + auto metric = metric_map_.find(metric_name); + if (metric != metric_map_.end()) { + return metric->second; + } + return nullptr; +} + +void StatsReporter::MetricRegister(const std::string &metric_name, + std::shared_ptr metric) { + std::unique_lock lock(metric_mutex_); + metric_map_[metric_name] = metric; +} + +void StatsReporter::UnregisterAllMetrics() { + std::unique_lock lock(metric_mutex_); + metric_map_.clear(); +} + +bool StatsReporter::Start(const StreamingMetricsConfig &conf) { + global_tags_ = conf.GetMetricsGlobalTags(); + service_name_ = conf.GetMetricsServiceName(); + STREAMING_LOG(INFO) << "Start stats reporter, service name " << service_name_ + << ", global tags size : " << global_tags_.size() + << ", stats disabled : " + << stats::StatsConfig::instance().IsStatsDisabled(); + for (auto &tag : global_tags_) { + global_tag_key_list_.push_back(stats::TagKeyType::Register(tag.first)); + } + return true; +} + +bool StatsReporter::Start(const std::string &json_string) { return true; } + +StatsReporter::~StatsReporter() { + STREAMING_LOG(WARNING) << "stats 
client shutdown"; + Shutdown(); +}; + +void StatsReporter::Shutdown() { UnregisterAllMetrics(); } + +void StatsReporter::UpdateCounter(const std::string &domain, + const std::string &group_name, + const std::string &short_name, double value) { + const std::string merged_metric_name = + METRIC_GROUP_JOIN(domain, group_name, short_name); +} + +void StatsReporter::UpdateCounter( + const std::string &metric_name, + const std::unordered_map &tags, double value) { + STREAMING_LOG(DEBUG) << "Report counter metric " << metric_name << " , value " << value; +} + +void StatsReporter::UpdateGauge(const std::string &domain, const std::string &group_name, + const std::string &short_name, double value, + bool is_reset) { + const std::string merged_metric_name = + service_name_ + "." + METRIC_GROUP_JOIN(domain, group_name, short_name); + STREAMING_LOG(DEBUG) << "Report gauge metric " << merged_metric_name << " , value " + << value; + auto metric = GetMetricByName(merged_metric_name); + if (nullptr == metric) { + metric = std::shared_ptr( + new ray::stats::Gauge(merged_metric_name, "", "", global_tag_key_list_)); + MetricRegister(merged_metric_name, metric); + } + metric->Record(value, global_tags_); +} + +void StatsReporter::UpdateGauge(const std::string &metric_name, + const std::unordered_map &tags, + double value, bool is_reset) { + const std::string merged_metric_name = service_name_ + "." + metric_name; + STREAMING_LOG(DEBUG) << "Report gauge metric " << merged_metric_name << " , value " + << value; + // Look up under the service-prefixed name used at registration so the gauge is reused; + // a new gauge is created and registered only the first time a name is reported. + auto metric = GetMetricByName(merged_metric_name); + if (nullptr == metric) { + // Register tag key for all tags. 
+ std::vector tag_key_list(global_tag_key_list_.begin(), + global_tag_key_list_.end()); + for (auto &tag : tags) { + tag_key_list.push_back(stats::TagKeyType::Register(tag.first)); + } + metric = std::shared_ptr( + new ray::stats::Gauge(merged_metric_name, "", "", tag_key_list)); + MetricRegister(merged_metric_name, metric); + } + auto merged_tags = MergeGlobalTags(tags); + metric->Record(value, merged_tags); +} + +void StatsReporter::UpdateHistogram(const std::string &domain, + const std::string &group_name, + const std::string &short_name, double value, + double min_value, double max_value) {} + +void StatsReporter::UpdateHistogram( + const std::string &metric_name, + const std::unordered_map &tags, double value, + double min_value, double max_value) {} + +void StatsReporter::UpdateQPS(const std::string &metric_name, + const std::unordered_map &tags, + double value) {} +} // namespace streaming +} // namespace ray diff --git a/streaming/src/metrics/stats_reporter.h b/streaming/src/metrics/stats_reporter.h new file mode 100644 index 000000000..2577a1fbf --- /dev/null +++ b/streaming/src/metrics/stats_reporter.h @@ -0,0 +1,88 @@ +// Copyright 2021 The Ray Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +#pragma once + +#include + +#include "ray/stats/metric.h" +#include "streaming_perf_metric.h" + +namespace ray { +namespace streaming { + +class StatsReporter : public StreamingReporterInterface { + public: + virtual ~StatsReporter(); + + bool Start(const StreamingMetricsConfig &conf) override; + + bool Start(const std::string &json_string); + + void Shutdown() override; + + void UpdateCounter(const std::string &domain, const std::string &group_name, + const std::string &short_name, double value) override; + + void UpdateGauge(const std::string &domain, const std::string &group_name, + const std::string &short_name, double value, + bool is_reset = true) override; + + void UpdateHistogram(const std::string &domain, const std::string &group_name, + const std::string &short_name, double value, double min_value, + double max_value) override; + + void UpdateCounter(const std::string &metric_name, + const std::unordered_map &tags, + double value) override; + + void UpdateGauge(const std::string &metric_name, + const std::unordered_map &tags, double value, + bool is_rest = true) override; + + void UpdateHistogram(const std::string &metric_name, + const std::unordered_map &tags, + double value, double min_value, double max_value) override; + + void UpdateQPS(const std::string &metric_name, + const std::unordered_map &tags, + double value) override; + + protected: + std::shared_ptr GetMetricByName(const std::string &metric_name); + void MetricRegister(const std::string &metric_name, + std::shared_ptr metric); + void UnregisterAllMetrics(); + + private: + inline std::unordered_map MergeGlobalTags( + const std::unordered_map &tags) { + std::unordered_map merged_tags; + merged_tags.insert(global_tags_.begin(), global_tags_.end()); + for (auto &item : tags) { + merged_tags.emplace(item.first, item.second); + } + return merged_tags; + } + + private: + std::mutex metric_mutex_; + std::unordered_map> metric_map_; + std::unordered_map global_tags_; + std::vector 
global_tag_key_list_; + std::string service_name_; +}; + +} // namespace streaming +} // namespace ray diff --git a/streaming/src/metrics/streaming_perf_metric.cc b/streaming/src/metrics/streaming_perf_metric.cc new file mode 100644 index 000000000..c8a48191a --- /dev/null +++ b/streaming/src/metrics/streaming_perf_metric.cc @@ -0,0 +1,97 @@ +#include + +#include "metrics/stats_reporter.h" +#include "metrics/streaming_perf_metric.h" +#include "util/streaming_logging.h" + +namespace ray { +namespace streaming { + +bool StreamingReporter::Start(const StreamingMetricsConfig &conf) { + if (impl_) { + STREAMING_LOG(WARNING) << "Streaming perf is active"; + } else { + impl_.reset(new StatsReporter()); + return impl_->Start(conf); + } + return false; +} + +void StreamingReporter::Shutdown() { + if (impl_) { + impl_->Shutdown(); + impl_.reset(); + } else { + STREAMING_LOG(WARNING) << "No active perf instance will be shutdown"; + } +} +void StreamingReporter::UpdateCounter(const std::string &domain, + const std::string &group_name, + const std::string &short_name, double value) { + if (impl_) { + impl_->UpdateCounter(domain, group_name, short_name, value); + } else { + STREAMING_LOG(WARNING) << "No active perf instance"; + } +} + +void StreamingReporter::UpdateGauge(const std::string &domain, + const std::string &group_name, + const std::string &short_name, double value, + bool is_reset) { + if (impl_) { + impl_->UpdateGauge(domain, group_name, short_name, value, is_reset); + } else { + STREAMING_LOG(WARNING) << "No active perf instance"; + } +} + +void StreamingReporter::UpdateHistogram(const std::string &domain, + const std::string &group_name, + const std::string &short_name, double value, + double min_value, double max_value) { + if (impl_) { + impl_->UpdateHistogram(domain, group_name, short_name, value, min_value, max_value); + } else { + STREAMING_LOG(WARNING) << "No active perf instance"; + } +} +void StreamingReporter::UpdateQPS( + const std::string &metric_name, + 
const std::unordered_map &tags, double value) { + if (impl_) { + impl_->UpdateQPS(metric_name, tags, value); + } else { + STREAMING_LOG(WARNING) << "No active perf instance"; + } +} + +StreamingReporter::~StreamingReporter() { + if (impl_) { + STREAMING_LOG(INFO) << "Destory streamimg perf => " << impl_.get(); + Shutdown(); + } +} + +void StreamingReporter::UpdateCounter( + const std::string &metric_name, + const std::unordered_map &tags, double value) { + if (impl_) { + impl_->UpdateCounter(metric_name, tags, value); + } +} +void StreamingReporter::UpdateGauge( + const std::string &metric_name, + const std::unordered_map &tags, double value, + bool is_rest) { + if (impl_) { + impl_->UpdateGauge(metric_name, tags, value, is_rest); + } +} +void StreamingReporter::UpdateHistogram( + const std::string &metric_name, + const std::unordered_map &tags, double value, + double min_value, double max_value) {} + +} // namespace streaming +} // namespace ray diff --git a/streaming/src/metrics/streaming_perf_metric.h b/streaming/src/metrics/streaming_perf_metric.h new file mode 100644 index 000000000..dbb8c850a --- /dev/null +++ b/streaming/src/metrics/streaming_perf_metric.h @@ -0,0 +1,97 @@ +// Copyright 2021 The Ray Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once +#include "config/streaming_config.h" + +namespace ray { +namespace streaming { +#define METRIC_GROUP_JOIN(a, b, c) (a + "." + b + "." 
+ c) + +class StreamingReporterInterface { + public: + virtual ~StreamingReporterInterface() = default; + virtual bool Start(const StreamingMetricsConfig &conf) = 0; + + virtual void Shutdown() = 0; + + virtual void UpdateCounter(const std::string &domain, const std::string &group_name, + const std::string &short_name, double value) = 0; + + virtual void UpdateGauge(const std::string &domain, const std::string &group_name, + const std::string &short_name, double value, + bool is_reset) = 0; + + virtual void UpdateHistogram(const std::string &domain, const std::string &group_name, + const std::string &short_name, double value, + double min_value, double max_value) = 0; + + virtual void UpdateCounter(const std::string &metric_name, + const std::unordered_map &tags, + double value) = 0; + + virtual void UpdateGauge(const std::string &metric_name, + const std::unordered_map &tags, + double value, bool is_rest) = 0; + + virtual void UpdateHistogram(const std::string &metric_name, + const std::unordered_map &tags, + double value, double min_value, double max_value) = 0; + + virtual void UpdateQPS(const std::string &metric_name, + const std::unordered_map &tags, + double value) = 0; +}; + +/// Streaming perf is a reporter instance based on multiple backends. +/// Other modules can report gauge/histogram/counter/qps measurement to metric server +/// side. 
+class StreamingReporter : public StreamingReporterInterface { + public: + StreamingReporter(){}; + virtual ~StreamingReporter(); + bool Start(const StreamingMetricsConfig &conf) override; + void Shutdown() override; + void UpdateCounter(const std::string &domain, const std::string &group_name, + const std::string &short_name, double value) override; + void UpdateGauge(const std::string &domain, const std::string &group_name, + const std::string &short_name, double value, + bool is_reset = true) override; + + void UpdateHistogram(const std::string &domain, const std::string &group_name, + const std::string &short_name, double value, double min_value, + double max_value) override; + + void UpdateCounter(const std::string &metric_name, + const std::unordered_map &tags, + double value) override; + + void UpdateGauge(const std::string &metric_name, + const std::unordered_map &tags, double value, + bool is_rest = true) override; + + void UpdateHistogram(const std::string &metric_name, + const std::unordered_map &tags, + double value, double min_value, double max_value) override; + + void UpdateQPS(const std::string &metric_name, + const std::unordered_map &tags, + double value) override; + + private: + std::unique_ptr impl_; +}; +} // namespace streaming + +} // namespace ray diff --git a/streaming/src/runtime_context.cc b/streaming/src/runtime_context.cc index 52c0184d0..782d6f3d2 100644 --- a/streaming/src/runtime_context.cc +++ b/streaming/src/runtime_context.cc @@ -4,6 +4,7 @@ #include "ray/util/util.h" #include "src/ray/protobuf/common.pb.h" #include "util/streaming_logging.h" +#include "util/streaming_util.h" namespace ray { namespace streaming { @@ -26,7 +27,111 @@ void RuntimeContext::SetConfig(const uint8_t *data, uint32_t size) { RuntimeContext::~RuntimeContext() {} -RuntimeContext::RuntimeContext() : runtime_status_(RuntimeStatus::Init) {} +RuntimeContext::RuntimeContext() + : enable_timer_service_(false), runtime_status_(RuntimeStatus::Init) {} + +void 
RuntimeContext::InitMetricsReporter() { + STREAMING_LOG(INFO) << "init metrics"; + if (!config_.GetMetricsEnable()) { + STREAMING_LOG(WARNING) << "metrics is disable"; + return; + } + perf_metrics_reporter_.reset(new StreamingReporter()); + + std::unordered_map default_tag_map = { + {"role", NodeType_Name(config_.GetNodeType())}, + {"op_name", config_.GetOpName()}, + {"worker_name", config_.GetWorkerName()}}; + metrics_config_.SetMetricsGlobalTags(default_tag_map); + + perf_metrics_reporter_->Start(metrics_config_); +} + +void RuntimeContext::ReportMetrics( + const std::string &metric_name, double value, + const std::unordered_map &tags) { + if (config_.GetMetricsEnable() && perf_metrics_reporter_) { + perf_metrics_reporter_->UpdateGauge(metric_name, tags, value); + } +} + +void RuntimeContext::RunTimer() { + AutoSpinLock lock(report_flag_); + if (runtime_status_ != RuntimeStatus::Running) { + STREAMING_LOG(WARNING) << "Run timer failed in state " + << static_cast(runtime_status_); + return; + } + STREAMING_LOG(INFO) << "Streaming metric timer called, interval=" + << metrics_config_.GetMetricsReportInterval(); + if (async_io_.stopped()) { + STREAMING_LOG(INFO) << "Async io stopped, return from timer reporting."; + return; + } + this->report_timer_handler_(); + boost::posix_time::seconds interval(metrics_config_.GetMetricsReportInterval()); + metrics_timer_->expires_from_now(interval); + metrics_timer_->async_wait([this](const boost::system::error_code &e) { + if (boost::asio::error::operation_aborted == e) { + return; + } + this->RunTimer(); + }); +} + +void RuntimeContext::EnableTimer(std::function report_timer_handler) { + if (!config_.GetMetricsEnable()) { + STREAMING_LOG(WARNING) << "Streaming metrics disabled."; + return; + } + if (enable_timer_service_) { + STREAMING_LOG(INFO) << "Timer service already enabled"; + return; + } + this->report_timer_handler_ = report_timer_handler; + STREAMING_LOG(INFO) << "Streaming metric timer enabled"; + // We new a thread for timer if timer is not 
alive currently. + if (!timer_thread_) { + async_io_.reset(); + boost::posix_time::seconds interval(metrics_config_.GetMetricsReportInterval()); + metrics_timer_.reset(new boost::asio::deadline_timer(async_io_, interval)); + metrics_timer_->async_wait( + [this](const boost::system::error_code & /*e*/) { this->RunTimer(); }); + timer_thread_ = std::make_shared([this]() { + STREAMING_LOG(INFO) << "Async io running."; + async_io_.run(); + }); + STREAMING_LOG(INFO) << "New thread " << timer_thread_->get_id(); + } + enable_timer_service_ = true; +} + +void RuntimeContext::ShutdownTimer() { + { + AutoSpinLock lock(report_flag_); + if (!config_.GetMetricsEnable()) { + STREAMING_LOG(WARNING) << "Streaming metrics disabled"; + return; + } + if (!enable_timer_service_) { + STREAMING_LOG(INFO) << "Timer service already disabled"; + return; + } + STREAMING_LOG(INFO) << "Timer server shutdown"; + enable_timer_service_ = false; + STREAMING_LOG(INFO) << "Cancel metrics timer."; + metrics_timer_->cancel(); + } + STREAMING_LOG(INFO) << "Wake up all reporting conditions."; + if (timer_thread_) { + STREAMING_LOG(INFO) << "Join and reset timer thread."; + if (timer_thread_->joinable()) { + timer_thread_->join(); + } + timer_thread_.reset(); + metrics_timer_.reset(); + } +} } // namespace streaming } // namespace ray diff --git a/streaming/src/runtime_context.h b/streaming/src/runtime_context.h index a86ebbcd1..5a49d82b2 100644 --- a/streaming/src/runtime_context.h +++ b/streaming/src/runtime_context.h @@ -1,9 +1,26 @@ +// Copyright 2021 The Ray Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + #pragma once +#include +#include #include #include "common/status.h" #include "config/streaming_config.h" +#include "metrics/streaming_perf_metric.h" namespace ray { namespace streaming { @@ -30,9 +47,41 @@ class RuntimeContext { inline void MarkMockTest() { is_mock_test_ = true; } inline bool IsMockTest() { return is_mock_test_; } + void InitMetricsReporter(); + + /// It's periodic reporter entry for all runtime modules. + /// \param metric_name, metric name + /// \param value, metric value + /// \param tags, metric tag map + void ReportMetrics(const std::string &metric_name, double value, + const std::unordered_map &tags = {}); + + /// Enable and register a specific reporting timer for updating all of metrics. + /// \param reporter_timer_handler + void EnableTimer(std::function report_timer_handler); + + /// Halt the timer invoking from now on. 
+ void ShutdownTimer(); + private: + void RunTimer(); + + protected: + std::unique_ptr perf_metrics_reporter_; + std::function report_timer_handler_; + + boost::asio::io_service async_io_; + + private: + bool enable_timer_service_; + + std::unique_ptr metrics_timer_; + std::shared_ptr timer_thread_; + std::atomic_flag report_flag_ = ATOMIC_FLAG_INIT; + StreamingConfig config_; RuntimeStatus runtime_status_; + StreamingMetricsConfig metrics_config_; bool is_mock_test_ = false; }; diff --git a/streaming/src/test/streaming_perf_tests.cc b/streaming/src/test/streaming_perf_tests.cc new file mode 100644 index 000000000..3c4752b0f --- /dev/null +++ b/streaming/src/test/streaming_perf_tests.cc @@ -0,0 +1,120 @@ +#include +#include +#include +#include + +#include "opencensus/stats/internal/delta_producer.h" +#include "opencensus/stats/internal/stats_exporter_impl.h" +#include "ray/stats/stats.h" + +#include "config/streaming_config.h" +#include "gtest/gtest.h" +#include "metrics/streaming_perf_metric.h" + +using namespace ray::streaming; +using namespace ray; + +class StreamingReporterCounterTest : public ::testing::Test { + public: + using UpdateFunc = std::function; + + void SetUp() { + uint32_t kReportFlushInterval = 100; + absl::Duration report_interval = absl::Milliseconds(kReportFlushInterval); + absl::Duration harvest_interval = absl::Milliseconds(kReportFlushInterval / 2); + ray::stats::StatsConfig::instance().SetReportInterval(report_interval); + ray::stats::StatsConfig::instance().SetHarvestInterval(harvest_interval); + const stats::TagsType global_tags = {{stats::ResourceNameKey, "CPU"}}; + std::shared_ptr exporter( + new stats::StdoutExporterClient()); + ray::stats::Init(global_tags, 10054, exporter); + + setenv("STREAMING_METRICS_MODE", "DEV", 1); + setenv("ENABLE_RAY_STATS", "ON", 1); + setenv("STREAMING_ENABLE_METRICS", "ON", 1); + perf_counter_.reset(new StreamingReporter()); + + const std::unordered_map default_tags = { + {"app", "s_test"}, {"cluster", 
"kmon-dev"}}; + metrics_conf_.SetMetricsGlobalTags(default_tags); + perf_counter_->Start(metrics_conf_); + } + + void TearDown() { + opencensus::stats::DeltaProducer::Get()->Flush(); + opencensus::stats::StatsExporterImpl::Get()->Export(); + perf_counter_->Shutdown(); + ray::stats::Shutdown(); + } + + void RegisterAndRun(UpdateFunc update_handler) { + auto stat_time_handler = [this](size_t thread_index, UpdateFunc update_handler) { + auto start = std::chrono::high_resolution_clock::now(); + + for (size_t loop_index = 0; loop_index < loop_update_times_; ++loop_index) { + update_handler(loop_index); + } + + auto end = std::chrono::high_resolution_clock::now(); + std::chrono::duration elapsed = end - start; + + std::string info = "Thread=" + std::to_string(thread_index) + + ", times=" + std::to_string(loop_update_times_) + + ", cost=" + std::to_string(elapsed.count()) + "us."; + std::cout << info << std::endl; + }; + + for (size_t thread_index = 0; thread_index < op_thread_count_; ++thread_index) { + thread_pool_.emplace_back( + std::bind(stat_time_handler, thread_index, update_handler)); + } + + for (auto &thread : thread_pool_) { + thread.join(); + } + } + + protected: + size_t op_thread_count_{4}; + size_t loop_update_times_{10}; + std::vector thread_pool_; + + StreamingMetricsConfig metrics_conf_; + std::unique_ptr perf_counter_; +}; + +TEST_F(StreamingReporterCounterTest, UpdateCounterWithOneKeyTest) { + RegisterAndRun([this](size_t loop_index) { + perf_counter_->UpdateCounter("domaina", "groupa", "a", loop_index); + }); +} + +TEST_F(StreamingReporterCounterTest, UpdateCounterTest) { + RegisterAndRun([this](size_t loop_index) { + auto loop_index_str = std::to_string(loop_index % 10); + perf_counter_->UpdateCounter("domaina" + loop_index_str, "groupa" + loop_index_str, + "a" + loop_index_str, loop_index); + }); +} + +TEST_F(StreamingReporterCounterTest, UpdateGaugeWithOneKeyTest) { + RegisterAndRun([this](size_t loop_index) { + std::unordered_map tags; + 
tags["tag1"] = "tag1"; + tags["tag2"] = std::to_string(loop_index); + perf_counter_->UpdateGauge("streaming.test.gauge", tags, loop_index); + }); +} + +TEST_F(StreamingReporterCounterTest, UpdateGaugeTest) { + RegisterAndRun([this](size_t loop_index) { + auto loop_index_str = std::to_string(loop_index % 10); + perf_counter_->UpdateGauge("domaina" + loop_index_str, "groupa" + loop_index_str, + "a" + loop_index_str, loop_index); + }); +} + +int main(int argc, char **argv) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +}