[Stats] Improve Stats::Init & Add it to GCS server (#9563)

This commit is contained in:
SangBin Cho 2020-07-25 10:42:08 -07:00 committed by GitHub
parent 28d5f9696d
commit d49b19c24c
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
17 changed files with 232 additions and 48 deletions

View file

@ -545,6 +545,7 @@ cc_binary(
visibility = ["//java:__subpackages__"],
deps = [
":gcs_server_lib",
":stats_lib",
"@com_github_gflags_gflags//:gflags",
],
)

View file

@ -204,6 +204,7 @@ def ray_deps_setup():
sha256 = "6592e07672e7f7980687f6c1abda81974d8d379e273fea3b54b6c4d855489b9d",
patches = [
"//thirdparty/patches:opencensus-cpp-harvest-interval.patch",
"//thirdparty/patches:opencensus-cpp-shutdown-api.patch",
]
)

View file

@ -635,7 +635,8 @@ class Node:
redis_password=self._ray_params.redis_password,
config=self._config,
fate_share=self.kernel_fate_share,
gcs_server_port=self._ray_params.gcs_server_port)
gcs_server_port=self._ray_params.gcs_server_port,
metrics_agent_port=self._ray_params.metrics_agent_port)
assert (
ray_constants.PROCESS_TYPE_GCS_SERVER not in self.all_processes)
self.all_processes[ray_constants.PROCESS_TYPE_GCS_SERVER] = [

View file

@ -1203,7 +1203,8 @@ def start_gcs_server(redis_address,
redis_password=None,
config=None,
fate_share=None,
gcs_server_port=None):
gcs_server_port=None,
metrics_agent_port=None):
"""Start a gcs server.
Args:
redis_address (str): The address that the Redis server is listening on.
@ -1215,6 +1216,7 @@ def start_gcs_server(redis_address,
config (dict|None): Optional configuration that will
override defaults in RayConfig.
gcs_server_port (int): Port number of the gcs server.
metrics_agent_port(int): The port where metrics agent is bound to.
Returns:
ProcessInfo for the process that was started.
"""
@ -1230,6 +1232,7 @@ def start_gcs_server(redis_address,
"--redis_port={}".format(gcs_port),
"--config_list={}".format(config_str),
"--gcs_server_port={}".format(gcs_server_port),
"--metrics-agent-port={}".format(metrics_agent_port),
]
if redis_password:
command += ["--redis_password={}".format(redis_password)]

View file

@ -220,7 +220,7 @@ class CoreWorkerDirectActorTaskSubmitter
/// Factory for producing new core worker clients.
rpc::ClientFactoryFn client_factory_;
/// Mutex to proect the various maps below.
/// Mutex to protect the various maps below.
mutable absl::Mutex mu_;
absl::flat_hash_map<ActorID, ClientQueue> client_queues_ GUARDED_BY(mu_);

View file

@ -17,11 +17,13 @@
#include "gflags/gflags.h"
#include "ray/common/ray_config.h"
#include "ray/gcs/gcs_server/gcs_server.h"
#include "ray/stats/stats.h"
#include "ray/util/util.h"
DEFINE_string(redis_address, "", "The ip address of redis.");
DEFINE_int32(redis_port, -1, "The port of redis.");
DEFINE_int32(gcs_server_port, -1, "The port of gcs server.");
DEFINE_int32(metrics_agent_port, -1, "The port of metrics agent.");
DEFINE_string(config_list, "", "The config list of raylet.");
DEFINE_string(redis_password, "", "The password of redis.");
DEFINE_bool(retry_redis, false, "Whether we retry to connect to the redis.");
@ -36,6 +38,7 @@ int main(int argc, char *argv[]) {
const std::string redis_address = FLAGS_redis_address;
const int redis_port = static_cast<int>(FLAGS_redis_port);
const int gcs_server_port = static_cast<int>(FLAGS_gcs_server_port);
const int metrics_agent_port = static_cast<int>(FLAGS_metrics_agent_port);
const std::string config_list = FLAGS_config_list;
const std::string redis_password = FLAGS_redis_password;
const bool retry_redis = FLAGS_retry_redis;
@ -54,6 +57,9 @@ int main(int argc, char *argv[]) {
}
RayConfig::instance().initialize(config_map);
const ray::stats::TagsType global_tags = {{ray::stats::JobNameKey, "gcs_server"},
{ray::stats::VersionKey, "0.9.0.dev0"}};
ray::stats::Init(global_tags, metrics_agent_port);
boost::asio::io_service main_service;
@ -74,6 +80,7 @@ int main(int argc, char *argv[]) {
int signal_number) {
RAY_LOG(INFO) << "GCS server received SIGTERM, shutting down...";
gcs_server.Stop();
ray::stats::Shutdown();
main_service.stop();
};
boost::asio::signal_set signals(main_service);

View file

@ -229,7 +229,7 @@ int main(int argc, char *argv[]) {
{ray::stats::JobNameKey, "raylet"},
{ray::stats::VersionKey, "0.9.0.dev0"},
{ray::stats::NodeAddressKey, node_ip_address}};
ray::stats::Init(global_tags, metrics_agent_port, main_service);
ray::stats::Init(global_tags, metrics_agent_port);
// Destroy the Raylet on a SIGTERM. The pointer to main_service is
// guaranteed to be valid since this function will run the event loop
@ -240,6 +240,7 @@ int main(int argc, char *argv[]) {
RAY_LOG(INFO) << "Raylet received SIGTERM, shutting down...";
server->Stop();
gcs_client->Disconnect();
ray::stats::Shutdown();
main_service.stop();
remove(raylet_socket_name.c_str());
};

View file

@ -3583,9 +3583,6 @@ void NodeManager::FlushObjectsToFree() {
void NodeManager::HandleGetNodeStats(const rpc::GetNodeStatsRequest &node_stats_request,
rpc::GetNodeStatsReply *reply,
rpc::SendReplyCallback send_reply_callback) {
// NOTE(sang): Currently reporting only infeasible/ready ActorCreationTask
// because Ray dashboard only renders actorCreationTask as of Feb 3 2020.
// TODO(sang): Support dashboard for non-ActorCreationTask.
for (const auto task : local_queues_.GetTasks(TaskState::INFEASIBLE)) {
if (task.GetTaskSpecification().IsActorCreationTask()) {
auto infeasible_task = reply->add_infeasible_tasks();
@ -3594,8 +3591,6 @@ void NodeManager::HandleGetNodeStats(const rpc::GetNodeStatsRequest &node_stats_
}
// Report tasks that are not scheduled because
// resources are occupied by other actors/tasks.
// NOTE(sang): This solution is a workaround. It can be replaced by creating a new state
// like PENDING_UNTIL_RESOURCE_AVAILABLE.
for (const auto task : local_queues_.GetTasks(TaskState::READY)) {
if (task.GetTaskSpecification().IsActorCreationTask()) {
auto ready_task = reply->add_ready_tasks();

View file

@ -67,6 +67,10 @@ const absl::Duration &StatsConfig::GetHarvestInterval() const {
return harvest_interval_;
}
void StatsConfig::SetIsInitialized(bool initialized) { is_initialized_ = initialized; }
bool StatsConfig::IsInitialized() const { return is_initialized_; }
void Metric::Record(double value, const TagsType &tags) {
if (StatsConfig::instance().IsStatsDisabled()) {
return;

View file

@ -29,30 +29,42 @@ namespace stats {
/// Include tag_defs.h to define tag items
#include "ray/stats/tag_defs.h"
/// StatsConfig per process.
/// Note that this is not thread-safe. Don't modify its internal values
/// outside stats::Init() or stats::Shutdown() method.
class StatsConfig final {
public:
static StatsConfig &instance();
/// Set the global tags that will be appended to all metrics in this process.
void SetGlobalTags(const TagsType &global_tags);
/// Get the current global tags.
const TagsType &GetGlobalTags() const;
/// Set if the stats are enabled in this process.
void SetIsDisableStats(bool disable_stats);
/// Get whether or not stats are enabled.
bool IsStatsDisabled() const;
void SetReportInterval(const absl::Duration interval);
const absl::Duration &GetReportInterval() const;
void SetHarvestInterval(const absl::Duration interval);
const absl::Duration &GetHarvestInterval() const;
bool IsInitialized() const;
///
/// Functions that should be used only inside stats::Init()
/// NOTE: StatsConfig is not thread-safe. If you use these functions
/// in multi threaded environment, it can cause problems.
///
/// Set the stats have been initialized.
void SetIsInitialized(bool initialized);
/// Set the interval where metrics are harvetsed.
void SetHarvestInterval(const absl::Duration interval);
/// Set the interval where metrics are reported to data sinks.
void SetReportInterval(const absl::Duration interval);
/// Set if the stats are enabled in this process.
void SetIsDisableStats(bool disable_stats);
/// Set the global tags that will be appended to all metrics in this process.
void SetGlobalTags(const TagsType &global_tags);
private:
StatsConfig() = default;
~StatsConfig() = default;
@ -70,6 +82,8 @@ class StatsConfig final {
// report interval. So harvest interval is suggusted to be half of report
// interval.
absl::Duration harvest_interval_ = absl::Seconds(5);
// Whether or not if the stats has been initialized.
bool is_initialized_ = false;
};
/// A thin wrapper that wraps the `opencensus::tag::measure` for using it simply.

View file

@ -24,6 +24,16 @@
///
/// You can follow these examples to define your metrics.
///
/// Common
///
static Histogram RedisLatency("redis_latency", "The latency of a Redis operation.", "us",
{100, 200, 300, 400, 500, 600, 700, 800, 900, 1000},
{CustomKey});
///
/// Raylet Metrics
///
static Gauge CurrentWorker("current_worker",
"This metric is used for reporting states of workers."
"Through this, we can see the worker's state on dashboard.",
@ -36,10 +46,6 @@ static Gauge CurrentDriver("current_driver",
static Count TaskCountReceived("task_count_received",
"Number of tasks received by raylet.", "pcs", {});
static Histogram RedisLatency("redis_latency", "The latency of a Redis operation.", "us",
{100, 200, 300, 400, 500, 600, 700, 800, 900, 1000},
{CustomKey});
static Gauge LocalAvailableResource("local_available_resource",
"The available resources on this node.", "pcs",
{ResourceNameKey});

View file

@ -29,6 +29,7 @@ void MetricExporter::ExportToPoints(
if (view_data.size() == 0) {
return;
}
// NOTE(lingxuan.zlx): No sampling in histogram data, so all points all be filled in.
std::unordered_map<std::string, std::string> tags;
for (size_t i = 0; i < view_data.begin()->first.size(); ++i) {
@ -104,5 +105,6 @@ void MetricExporter::ExportViewData(
RAY_LOG(DEBUG) << "Point size : " << points.size();
metric_exporter_client_->ReportMetrics(points);
}
} // namespace stats
} // namespace ray

View file

@ -31,6 +31,7 @@ namespace ray {
using namespace stats;
const size_t kMockReportBatchSize = 10;
const int MetricsAgentPort = 10054;
class MockExporterClient1 : public MetricExporterDecorator {
public:
@ -123,17 +124,14 @@ class MetricExporterClientTest : public ::testing::Test {
exporter.reset(new stats::StdoutExporterClient());
mock1.reset(new MockExporterClient1(exporter));
mock2.reset(new MockExporterClient2(mock1));
ray::stats::Init(global_tags, 10054, io_service_, mock2, kMockReportBatchSize);
ray::stats::Init(global_tags, MetricsAgentPort, mock2, kMockReportBatchSize);
}
virtual void TearDown() override { Shutdown(); }
void Shutdown() {
opencensus::stats::StatsExporterImpl::Get()->ClearHandlersForTesting();
}
void Shutdown() { ray::stats::Shutdown(); }
protected:
boost::asio::io_service io_service_;
std::shared_ptr<MetricExporterClient> exporter;
std::shared_ptr<MockExporterClient1> mock1;
std::shared_ptr<MockExporterClient2> mock2;

View file

@ -18,8 +18,8 @@
#include <string>
#include <unordered_map>
#include "opencensus/exporters/stats/prometheus/prometheus_exporter.h"
#include "opencensus/exporters/stats/stdout/stdout_exporter.h"
#include "absl/synchronization/mutex.h"
#include "opencensus/stats/internal/delta_producer.h"
#include "opencensus/stats/stats.h"
#include "opencensus/tags/tag_key.h"
@ -27,6 +27,7 @@
#include "ray/stats/metric.h"
#include "ray/stats/metric_exporter.h"
#include "ray/stats/metric_exporter_client.h"
#include "ray/util/io_service_pool.h"
#include "ray/util/logging.h"
namespace ray {
@ -38,38 +39,75 @@ namespace stats {
/// Include metric_defs.h to define measure items.
#include "ray/stats/metric_defs.h"
/// Initialize stats.
static inline void Init(
const TagsType &global_tags, const int metrics_agent_port,
boost::asio::io_service &io_service,
std::shared_ptr<MetricExporterClient> exporter_to_use = nullptr,
int64_t metrics_report_batch_size = RayConfig::instance().metrics_report_batch_size(),
bool disable_stats = !RayConfig::instance().enable_metrics_collection()) {
// TODO(sang) Put all states and logic into a singleton class Stats.
static std::shared_ptr<IOServicePool> metrics_io_service_pool;
static std::shared_ptr<MetricExporterClient> exporter;
static absl::Mutex stats_mutex;
/// Initialize stats for a process.
/// NOTE:
/// - stats::Init should be called only once per PROCESS. Redundant calls will be just
/// ignored.
/// - If you want to reinitialize, you should call stats::Shutdown().
/// - It is thread-safe.
/// We recommend you to use this only once inside a main script and add Shutdown() method
/// to any signal handler.
/// \param global_tags[in] Tags that will be appended to all metrics in this process.
/// \param metrics_agent_port[in] The port to export metrics at each node.
/// \param exporter_to_use[in] The exporter client you will use for this process' metrics.
static inline void Init(const TagsType &global_tags, const int metrics_agent_port,
std::shared_ptr<MetricExporterClient> exporter_to_use = nullptr,
int64_t metrics_report_batch_size =
RayConfig::instance().metrics_report_batch_size()) {
absl::MutexLock lock(&stats_mutex);
if (StatsConfig::instance().IsInitialized()) {
RAY_CHECK(metrics_io_service_pool != nullptr);
RAY_CHECK(exporter != nullptr);
return;
}
RAY_CHECK(metrics_io_service_pool == nullptr);
RAY_CHECK(exporter == nullptr);
bool disable_stats = !RayConfig::instance().enable_metrics_collection();
StatsConfig::instance().SetIsDisableStats(disable_stats);
if (disable_stats) {
RAY_LOG(INFO) << "Disabled stats.";
return;
}
// Force to have a singleton exporter.
static std::shared_ptr<MetricExporterClient> exporter;
// Default exporter is metrics agent exporter.
metrics_io_service_pool = std::make_shared<IOServicePool>(1);
metrics_io_service_pool->Run();
boost::asio::io_service *metrics_io_service = metrics_io_service_pool->Get();
RAY_CHECK(metrics_io_service != nullptr);
// Default exporter is a metrics agent exporter.
if (exporter_to_use == nullptr) {
std::shared_ptr<MetricExporterClient> stdout_exporter(new StdoutExporterClient());
exporter.reset(new MetricsAgentExporter(stdout_exporter, metrics_agent_port,
io_service, "127.0.0.1"));
(*metrics_io_service), "127.0.0.1"));
} else {
exporter = exporter_to_use;
}
// TODO(sang): Currently, we don't do any cleanup. This can lead us to lose last 10
// seconds data before we exit the main script.
MetricExporter::Register(exporter, metrics_report_batch_size);
opencensus::stats::StatsExporter::SetInterval(
StatsConfig::instance().GetReportInterval());
opencensus::stats::DeltaProducer::Get()->SetHarvestInterval(
StatsConfig::instance().GetHarvestInterval());
StatsConfig::instance().SetGlobalTags(global_tags);
StatsConfig::instance().SetIsInitialized(true);
}
/// Shutdown the initialized stats library.
/// This cleans up various threads and metadata for stats library.
static inline void Shutdown() {
// TODO(sang): Harvest thread is not currently cleaned up.
absl::MutexLock lock(&stats_mutex);
metrics_io_service_pool->Stop();
opencensus::stats::StatsExporter::Shutdown();
metrics_io_service_pool = nullptr;
exporter = nullptr;
StatsConfig::instance().SetIsInitialized(false);
}
} // namespace stats

View file

@ -25,6 +25,8 @@
namespace ray {
const int MetricsAgentPort = 10054;
class MockExporter : public opencensus::stats::StatsExporter::Handler {
public:
static void Register() {
@ -62,7 +64,7 @@ uint32_t kReportFlushInterval = 500;
class StatsTest : public ::testing::Test {
public:
void SetUp() {
void SetUp() override {
absl::Duration report_interval = absl::Milliseconds(kReportFlushInterval);
absl::Duration harvest_interval = absl::Milliseconds(kReportFlushInterval / 2);
ray::stats::StatsConfig::instance().SetReportInterval(report_interval);
@ -71,14 +73,13 @@ class StatsTest : public ::testing::Test {
{stats::WorkerPidKey, "1000"}};
std::shared_ptr<stats::MetricExporterClient> exporter(
new stats::StdoutExporterClient());
ray::stats::Init(global_tags, 10054, io_service_, exporter);
ray::stats::Init(global_tags, MetricsAgentPort, exporter);
MockExporter::Register();
}
void Shutdown() {}
virtual void TearDown() override { Shutdown(); }
private:
boost::asio::io_service io_service_;
void Shutdown() { ray::stats::Shutdown(); }
};
TEST_F(StatsTest, F) {
@ -88,6 +89,36 @@ TEST_F(StatsTest, F) {
}
}
TEST_F(StatsTest, InitializationTest) {
// Do initialization multiple times and make sure only the first initialization
// was applied.
ASSERT_TRUE(ray::stats::StatsConfig::instance().IsInitialized());
auto test_tag_value_that_shouldnt_be_applied = "TEST";
for (size_t i = 0; i < 20; ++i) {
std::shared_ptr<stats::MetricExporterClient> exporter(
new stats::StdoutExporterClient());
ray::stats::Init({{stats::LanguageKey, test_tag_value_that_shouldnt_be_applied}},
MetricsAgentPort, exporter);
}
auto &first_tag = ray::stats::StatsConfig::instance().GetGlobalTags()[0];
ASSERT_TRUE(first_tag.second != test_tag_value_that_shouldnt_be_applied);
ray::stats::Shutdown();
ASSERT_FALSE(ray::stats::StatsConfig::instance().IsInitialized());
// Reinitialize. It should be initialized now.
const stats::TagsType global_tags = {
{stats::LanguageKey, test_tag_value_that_shouldnt_be_applied}};
std::shared_ptr<stats::MetricExporterClient> exporter(
new stats::StdoutExporterClient());
ray::stats::Init(global_tags, MetricsAgentPort, exporter);
ASSERT_TRUE(ray::stats::StatsConfig::instance().IsInitialized());
auto &new_first_tag = ray::stats::StatsConfig::instance().GetGlobalTags()[0];
ASSERT_TRUE(new_first_tag.second == test_tag_value_that_shouldnt_be_applied);
}
} // namespace ray
int main(int argc, char **argv) {

View file

@ -37,3 +37,5 @@ static const TagKeyType DriverPidKey = TagKeyType::Register("DriverPid");
static const TagKeyType ResourceNameKey = TagKeyType::Register("ResourceName");
static const TagKeyType ValueTypeKey = TagKeyType::Register("ValueType");
static const TagKeyType ActorIdKey = TagKeyType::Register("ActorId");

View file

@ -0,0 +1,80 @@
diff --git opencensus/stats/internal/stats_exporter.cc b/opencensus/stats/internal/stats_exporter.cc
--- opencensus/stats/internal/stats_exporter.cc
+++ opencensus/stats/internal/stats_exporter.cc
@@ -95,25 +95,51 @@ void StatsExporterImpl::ClearHandlersForTesting() {
}
void StatsExporterImpl::StartExportThread() EXCLUSIVE_LOCKS_REQUIRED(mu_) {
- t_ = std::thread(&StatsExporterImpl::RunWorkerLoop, this);
thread_started_ = true;
+ t_ = std::thread(&StatsExporterImpl::RunWorkerLoop, this);
+}
+
+void StatsExporterImpl::Shutdown() {
+ absl::MutexLock l(&mu_);
+ if (!thread_started_) {
+ return;
+ }
+ thread_started_ = false;
+ // Join loop thread when shutdown.
+ if (t_.joinable()) {
+ t_.join();
+ }
}
void StatsExporterImpl::RunWorkerLoop() {
absl::Time next_export_time = GetNextExportTime();
- while (true) {
+ bool thread_started = false;
+ {
+ absl::MutexLock l(&mu_);
+ bool thread_started = thread_started_;
+ }
+ while (thread_started) {
// SleepFor() returns immediately when given a negative duration.
absl::SleepFor(next_export_time - absl::Now());
// In case the last export took longer than the export interval, we
// calculate the next time from now.
next_export_time = GetNextExportTime();
Export();
+ {
+ absl::MutexLock l(&mu_);
+ thread_started = thread_started_;
+ }
}
}
// StatsExporter
// -------------
+void StatsExporter::Shutdown() {
+ StatsExporterImpl::Get()->Shutdown();
+ StatsExporterImpl::Get()->ClearHandlersForTesting();
+}
+
// static
void StatsExporter::SetInterval(absl::Duration interval) {
StatsExporterImpl::Get()->SetInterval(interval);
diff --git opencensus/stats/internal/stats_exporter_impl.h b/opencensus/stats/internal/stats_exporter_impl.h
--- opencensus/stats/internal/stats_exporter_impl.h
+++ opencensus/stats/internal/stats_exporter_impl.h
@@ -35,6 +35,7 @@ class StatsExporterImpl {
static StatsExporterImpl* Get();
void SetInterval(absl::Duration interval);
absl::Time GetNextExportTime() const;
+ void Shutdown();
void AddView(const ViewDescriptor& view);
void RemoveView(absl::string_view name);
diff --git opencensus/stats/stats_exporter.h b/opencensus/stats/stats_exporter.h
--- opencensus/stats/stats_exporter.h
+++ opencensus/stats/stats_exporter.h
@@ -45,6 +45,8 @@ class StatsExporter final {
// Removes the view with 'name' from the registry, if one is registered.
static void RemoveView(absl::string_view name);
+ static void Shutdown();
+
// StatsExporter::Handler is the interface for push exporters that export
// recorded data for registered views. The exporter should provide a static
// Register() method that takes any arguments needed by the exporter (e.g. a