[EVENT] add log reporter (#10419)

This commit is contained in:
Basasuya 2020-09-16 11:54:05 +08:00 committed by GitHub
parent da1d171c39
commit 5e030db8a5
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 377 additions and 3 deletions

View file

@ -861,6 +861,7 @@ cc_test(
copts = COPTS,
deps = [
":ray_util",
"@boost//:filesystem",
"@com_google_googletest//:gtest_main",
],
)
@ -1344,7 +1345,9 @@ cc_library(
":sha256",
"//src/ray/protobuf:event_cc_proto",
"@boost//:asio",
"@boost//:property_tree",
"@com_github_google_glog//:glog",
"@com_github_spdlog//:spdlog",
"@com_google_absl//absl/synchronization",
"@com_google_absl//absl/time",
"@com_google_googletest//:gtest_main",

24
bazel/BUILD.spdlog Normal file
View file

@ -0,0 +1,24 @@
# Bazel target for the spdlog logging library.
#
# The library sources under src/*.cpp are compiled with SPDLOG_COMPILED_LIB
# defined. NOTE(review): the define is passed through `copts`, so it applies to
# this target only and is not propagated to dependents -- confirm that is the
# intended setup.
COPTS = ["-DSPDLOG_COMPILED_LIB"]

cc_library(
    name = "spdlog",
    srcs = glob(["src/*.cpp"]),
    # Public headers, listed one directory at a time.
    hdrs = glob([
        "include/spdlog/*.h",
        "include/spdlog/cfg/*.h",
        "include/spdlog/details/*.h",
        "include/spdlog/fmt/*.h",
        "include/spdlog/fmt/bundled/*.h",
        "include/spdlog/sinks/*.h",
    ]),
    copts = COPTS,
    includes = [
        ".",
        "include/",
    ],
    # Lets dependents write `#include "spdlog/spdlog.h"`.
    strip_include_prefix = "include",
    visibility = ["//visibility:public"],
    deps = [],
)

View file

@ -96,6 +96,13 @@ def ray_deps_setup():
],
)
# spdlog v1.7.0 source archive, built with the project's own BUILD file
# (//bazel:BUILD.spdlog). The sha256 pins the exact archive contents and has to
# be updated together with the URL.
auto_http_archive(
name = "com_github_spdlog",
build_file = "//bazel:BUILD.spdlog",
urls = ["https://github.com/gabime/spdlog/archive/v1.7.0.zip"],
sha256 = "c8f1e1103e0b148eb8832275d8e68036f2fdd3975a1199af0e844908c56f6ea5",
)
auto_http_archive(
name = "com_github_tporadowski_redis_bin",
build_file = "//bazel:BUILD.redis",

View file

@ -15,6 +15,104 @@
#include "ray/util/event.h"
namespace ray {
///
/// LogEventReporter
///
/// Create a reporter that writes events to a rotating log file inside \p log_dir.
///
/// \param source_type Component emitting the events; it determines the file name.
/// \param log_dir Directory that receives the log file. Must not be empty; a trailing
/// '/' is appended when it is missing.
/// \param force_flush Whether Report() flushes the sink after every event.
/// \param rotate_max_file_size Size threshold of a single log file, in MB.
/// \param rotate_max_file_num Maximum number of rotated files passed to spdlog.
LogEventReporter::LogEventReporter(rpc::Event_SourceType source_type,
                                   std::string &log_dir, bool force_flush,
                                   int rotate_max_file_size, int rotate_max_file_num)
    : log_dir_(log_dir),
      force_flush_(force_flush),
      rotate_max_file_size_(rotate_max_file_size),
      rotate_max_file_num_(rotate_max_file_num) {
  RAY_CHECK(log_dir_ != "");
  if (log_dir_.back() != '/') {
    log_dir_ += '/';
  }

  // Generate the file name. For source types such as RAYLET or GCS the file is named
  // event_RAYLET.log / event_GCS.log. For CORE_WORKER and COMMON the pid is appended
  // as well, e.g. event_CORE_WORKER_{pid}.log.
  file_name_ = "event_" + Event_SourceType_Name(source_type);
  if (source_type == rpc::Event_SourceType::Event_SourceType_CORE_WORKER ||
      source_type == rpc::Event_SourceType::Event_SourceType_COMMON) {
    file_name_ += "_" + std::to_string(getpid());
  }
  file_name_ += ".log";

  // Loggers are registered in spdlog under a key derived from the full file path, so
  // a logger that already exists for this path is reused instead of created twice.
  std::string log_sink_key = GetReporterKey() + log_dir_ + file_name_;
  log_sink_ = spdlog::get(log_sink_key);

  // Once the file grows beyond {rotate_max_file_size_} MB it is rotated; rotated files
  // carry an index before the extension (e.g. event_GCS.1.log, event_GCS.2.log, ...).
  // At most {rotate_max_file_num_} rotated files are allowed.
  if (log_sink_ == nullptr) {
    // Widen to size_t before multiplying: in plain int arithmetic the byte count
    // overflows as soon as the limit reaches 2048 MB.
    const size_t max_file_bytes =
        static_cast<size_t>(1048576) * rotate_max_file_size_;
    log_sink_ = spdlog::rotating_logger_mt(log_sink_key, log_dir_ + file_name_,
                                           max_file_bytes, rotate_max_file_num_);
  }
  // "%v" emits only the message text; the event JSON already carries its own metadata.
  log_sink_->set_pattern("%v");
}
/// Flushes the log sink when the reporter is destroyed.
LogEventReporter::~LogEventReporter() {
  Flush();
}
/// Forwards the flush request to the underlying spdlog logger.
void LogEventReporter::Flush() {
  log_sink_->flush();
}
/// Serialize \p event into a compact (single-line) JSON document.
///
/// The timestamp is split with a factor of 1000000 into whole seconds and a six-digit
/// remainder, and rendered in local time as "YYYY-MM-DD HH:MM:SS.uuuuuu". The custom
/// fields of the event are nested under the "custom_fields" key.
///
/// \param event The event to serialize.
/// \return The JSON text exactly as produced by boost's write_json.
std::string LogEventReporter::EventToString(const rpc::Event &event) {
  boost::property_tree::ptree pt;

  const auto time_stamp = event.timestamp();
  const time_t epoch_time_as_time_t = time_stamp / 1000000;
  // localtime() returns a pointer to shared static storage, which is a data race when
  // several threads report events concurrently (and it may return NULL). Use the
  // re-entrant variants that fill a caller-owned struct instead.
  struct tm local_time = {};
#if defined(_WIN32)
  localtime_s(&local_time, &epoch_time_as_time_t);
#else
  localtime_r(&epoch_time_as_time_t, &local_time);
#endif
  char time_buffer[30] = {0};
  strftime(time_buffer, sizeof(time_buffer), "%Y-%m-%d %H:%M:%S.", &local_time);

  // Append the sub-second part, zero-padded to six digits.
  std::stringstream time_stamp_buffer;
  time_stamp_buffer << std::string(time_buffer) << std::setw(6) << std::setfill('0')
                    << time_stamp % 1000000;

  pt.put("time_stamp", time_stamp_buffer.str());
  pt.put("severity", Event_Severity_Name(event.severity()));
  pt.put("label", event.label());
  pt.put("event_id", event.event_id());
  pt.put("source_type", Event_SourceType_Name(event.source_type()));
  pt.put("host_name", event.source_hostname());
  pt.put("pid", std::to_string(event.source_pid()));
  pt.put("message", event.message());

  boost::property_tree::ptree pt_child;
  for (auto &ele : event.custom_fields()) {
    pt_child.put(ele.first, ele.second);
  }
  pt.add_child("custom_fields", pt_child);

  std::stringstream ss;
  boost::property_tree::json_parser::write_json(ss, pt, false);
  // the final string is like:
  // {"time_stamp":"2020-08-29 14:18:15.998084","severity":"INFO","label":"label
  // 1","event_id":"de150792ceb151c815d359d4b675fcc6266a","source_type":"CORE_WORKER","host_name":"Macbool.local","pid":"20830","message":"send
  // message 1","custom_fields":{"task_id":"task 1","job_id":"job 1","node_id":"node 1"}}
  return ss.str();
}
/// Serialize \p event and write it as one record to the log sink.
///
/// Aborts (RAY_CHECK) when the source type or severity of the event is not a valid
/// enum value. Flushes after the write when force_flush_ is set.
///
/// \param event The event to write.
void LogEventReporter::Report(const rpc::Event &event) {
  RAY_CHECK(Event_SourceType_IsValid(event.source_type()));
  RAY_CHECK(Event_Severity_IsValid(event.severity()));

  std::string result = EventToString(event);
  // Strip the trailing line break left by the JSON writer, if there is one.
  // EventToString() is virtual, so the text may be empty or may not end in '\n';
  // popping unconditionally would be undefined on an empty string and would otherwise
  // cut off the closing brace.
  // NOTE(review): some Boost releases appear not to emit the newline in compact
  // mode at all -- confirm against the Boost version in use.
  if (!result.empty() && result.back() == '\n') {
    result.pop_back();
  }

  log_sink_->info(result);
  if (force_flush_) {
    Flush();
  }
}
///
/// EventManager
///

View file

@ -13,7 +13,10 @@
// limitations under the License.
#pragma once
#include <boost/asio.hpp>
#include <boost/asio/ip/host_name.hpp>
#include <boost/property_tree/json_parser.hpp>
#include <boost/property_tree/ptree.hpp>
#include <cmath>
#include <cstring>
#include <iomanip>
@ -23,6 +26,9 @@
#include <vector>
#include "ray/util/logging.h"
#include "ray/util/util.h"
#include "spdlog/sinks/basic_file_sink.h"
#include "spdlog/sinks/rotating_file_sink.h"
#include "spdlog/spdlog.h"
#include "src/ray/protobuf/event.pb.h"
namespace ray {
@ -41,6 +47,38 @@ class BaseEventReporter {
virtual std::string GetReporterKey() = 0;
};
// responsible for writing event to specific file
class LogEventReporter : public BaseEventReporter {
public:
LogEventReporter(rpc::Event_SourceType source_type, std::string &log_dir,
bool force_flush = true, int rotate_max_file_size = 100,
int rotate_max_file_num = 20);
virtual ~LogEventReporter();
private:
virtual std::string EventToString(const rpc::Event &event);
virtual void Init() override {}
virtual void Report(const rpc::Event &event) override;
virtual void Close() override {}
virtual void Flush();
virtual std::string GetReporterKey() override { return "log.event.reporter"; }
protected:
std::string log_dir_;
bool force_flush_;
int rotate_max_file_size_; // MB
int rotate_max_file_num_;
std::string file_name_;
std::shared_ptr<spdlog::logger> log_sink_;
};
// store the reporters, add reporters and clean reporters
class EventManager final {
@ -66,7 +104,7 @@ class EventManager final {
const EventManager &operator=(const EventManager &manager) = delete;
protected:
private:
std::unordered_map<std::string, std::shared_ptr<BaseEventReporter>> reporter_map_;
};

View file

@ -13,6 +13,7 @@
// limitations under the License.
#include "ray/util/event.h"
#include <boost/filesystem.hpp>
#include <fstream>
#include <set>
#include <thread>
@ -55,11 +56,108 @@ void CheckEventDetail(rpc::Event &event, std::string job_id, std::string node_id
EXPECT_EQ(mp.size(), custom_key_num);
EXPECT_EQ(rpc::Event_SourceType_Name(event.source_type()), source_type);
EXPECT_EQ(rpc::Event_Severity_Name(event.severity()), severity);
EXPECT_EQ(event.label(), label);
EXPECT_EQ(event.message(), message);
if (label != "NULL") {
EXPECT_EQ(event.label(), label);
}
if (message != "NULL") {
EXPECT_EQ(event.message(), message);
}
EXPECT_EQ(event.source_pid(), getpid());
}
/// Rebuild an rpc::Event from one JSON line of an event log file.
///
/// The line is expected to hold exactly nine top-level entries in this order:
/// time_stamp, severity, label, event_id, source_type, host_name, pid, message,
/// custom_fields. The time stamp is not restored into the returned event.
///
/// \param seq One JSON document, without the trailing line break.
/// \return The event reconstructed from the document.
rpc::Event GetEventFromString(std::string seq) {
  std::stringstream ss;
  ss << seq << '\n';
  rpc::Event event;
  boost::property_tree::ptree pt;
  boost::property_tree::read_json(ss, pt);

  // Flatten the top-level entries into [key0, value0, key1, value1, ...], keeping the
  // document order so the positions below can be verified.
  std::vector<std::string> splitArray;
  for (const auto &entry : pt) {
    splitArray.push_back(entry.first);
    splitArray.push_back(entry.second.get_value<std::string>());
  }
  // Nine entries => 18 slots. Fail loudly instead of indexing out of range when the
  // line is truncated or has an unexpected layout.
  RAY_CHECK(splitArray.size() == 18);

  auto pt_custom_fields = pt.get_child_optional("custom_fields");
  // The optional is empty when the key is missing; dereferencing it would be invalid.
  RAY_CHECK(static_cast<bool>(pt_custom_fields));

  EXPECT_EQ(splitArray[2], "severity");
  rpc::Event_Severity severity_ele =
      rpc::Event_Severity::Event_Severity_Event_Severity_INT_MIN_SENTINEL_DO_NOT_USE_;
  RAY_CHECK(rpc::Event_Severity_Parse(splitArray[3], &severity_ele));
  event.set_severity(severity_ele);

  EXPECT_EQ(splitArray[4], "label");
  event.set_label(splitArray[5]);

  EXPECT_EQ(splitArray[6], "event_id");
  event.set_event_id(splitArray[7]);

  EXPECT_EQ(splitArray[8], "source_type");
  rpc::Event_SourceType source_type_ele = rpc::Event_SourceType::
      Event_SourceType_Event_SourceType_INT_MIN_SENTINEL_DO_NOT_USE_;
  RAY_CHECK(rpc::Event_SourceType_Parse(splitArray[9], &source_type_ele));
  event.set_source_type(source_type_ele);

  EXPECT_EQ(splitArray[10], "host_name");
  event.set_source_hostname(splitArray[11]);

  EXPECT_EQ(splitArray[12], "pid");
  event.set_source_pid(std::stoi(splitArray[13]));

  EXPECT_EQ(splitArray[14], "message");
  event.set_message(splitArray[15]);

  EXPECT_EQ(splitArray[16], "custom_fields");
  // Read the custom fields straight from the child tree.
  std::unordered_map<std::string, std::string> custom_fields;
  for (const auto &field : *pt_custom_fields) {
    custom_fields[field.first] = field.second.get_value<std::string>();
  }
  event.mutable_custom_fields()->insert(custom_fields.begin(), custom_fields.end());
  return event;
}
/// Run loop_function(i) for every i in [0, loop_times), split across nthreads threads.
///
/// With nthreads > 1 the index range is cut into nthreads contiguous slices, one per
/// thread; each thread calls event_context_init() once before working through its
/// slice, and all threads are joined before returning. Otherwise everything runs on
/// the calling thread: one event_context_init() call followed by the whole range.
void ParallelRunning(int nthreads, int loop_times,
                     std::function<void()> event_context_init,
                     std::function<void(int)> loop_function) {
  if (nthreads <= 1) {
    event_context_init();
    for (int iteration = 0; iteration < loop_times; iteration++) {
      loop_function(iteration);
    }
    return;
  }

  std::vector<std::thread> workers;
  workers.reserve(nthreads);
  for (int worker = 0; worker < nthreads; worker++) {
    const int first = worker * loop_times / nthreads;
    // The last slice always ends at loop_times.
    const int last =
        (worker + 1) == nthreads ? loop_times : (worker + 1) * loop_times / nthreads;
    // Capturing the callbacks by reference is fine: every worker is joined below,
    // before this function returns.
    workers.emplace_back([&event_context_init, &loop_function, first, last]() {
      event_context_init();
      for (int iteration = first; iteration < last; iteration++) {
        loop_function(iteration);
      }
    });
  }
  for (std::thread &worker : workers) {
    worker.join();
  }
}
/// Append every line of \p log_file to \p vc, without the line breaks.
///
/// The file is opened in binary mode. When it cannot be opened nothing is appended.
void ReadEventFromFile(std::vector<std::string> &vc, std::string log_file) {
  std::ifstream input(log_file, std::ios::binary);
  for (std::string record; std::getline(input, record);) {
    vc.push_back(record);
  }
  input.close();
}
/// Build a directory name for a test run: "event" followed by the StringToHex()
/// form of a 5-character buffer that was passed through FillRandom().
std::string GenerateLogDir() {
  std::string random_part(5, ' ');
  FillRandom(&random_part);
  return "event" + StringToHex(random_part);
}
TEST(EVENT_TEST, TEST_BASIC) {
TestEventReporter::event_list.clear();
ray::EventManager::Instance().ClearReporters();
@ -107,6 +205,112 @@ TEST(EVENT_TEST, TEST_BASIC) {
CheckEventDetail(result[3], "", "", "", "GCS", "FATAL", "", "");
}
// Reports events from a single thread and checks that the log file holds exactly one
// line per event, in order, with the expected content.
TEST(EVENT_TEST, LOG_ONE_THREAD) {
  std::string log_dir = GenerateLogDir();

  ray::EventManager::Instance().ClearReporters();
  ray::RayEventContext::Instance().SetEventContext(
      rpc::Event_SourceType::Event_SourceType_RAYLET,
      std::unordered_map<std::string, std::string>(
          {{"node_id", "node 1"}, {"job_id", "job 1"}, {"task_id", "task 1"}}));

  ray::EventManager::Instance().AddReporter(std::make_shared<LogEventReporter>(
      rpc::Event_SourceType::Event_SourceType_RAYLET, log_dir));

  const int print_times = 1000;
  for (int i = 1; i <= print_times; ++i) {
    RAY_EVENT(INFO, "label " + std::to_string(i)) << "send message " + std::to_string(i);
  }

  std::vector<std::string> vc;
  ReadEventFromFile(vc, log_dir + "/event_RAYLET.log");

  // Fatal on purpose: the loop below indexes vc for every expected line, so carrying
  // on with a short file would read out of range.
  ASSERT_EQ(static_cast<int>(vc.size()), print_times);

  for (int i = 0; i < print_times; ++i) {
    rpc::Event ele = GetEventFromString(vc[i]);
    CheckEventDetail(ele, "job 1", "node 1", "task 1", "RAYLET", "INFO",
                     "label " + std::to_string(i + 1),
                     "send message " + std::to_string(i + 1));
  }

  boost::filesystem::remove_all(log_dir.c_str());
}
// Reports events from many threads at once and checks that every event ends up in the
// log file exactly once. The order of the lines is not checked.
TEST(EVENT_TEST, LOG_MULTI_THREAD) {
  std::string log_dir = GenerateLogDir();

  ray::EventManager::Instance().ClearReporters();

  ray::EventManager::Instance().AddReporter(std::make_shared<LogEventReporter>(
      rpc::Event_SourceType::Event_SourceType_GCS, log_dir));
  const int nthreads = 80;
  const int print_times = 1000;

  ParallelRunning(
      nthreads, print_times,
      []() {
        ray::RayEventContext::Instance().SetEventContext(
            rpc::Event_SourceType::Event_SourceType_GCS,
            std::unordered_map<std::string, std::string>(
                {{"node_id", "node 2"}, {"job_id", "job 2"}, {"task_id", "task 2"}}));
      },
      [](int loop_i) {
        RAY_EVENT(WARNING, "label " + std::to_string(loop_i))
            << "send message " + std::to_string(loop_i);
      });

  std::vector<std::string> vc;
  ReadEventFromFile(vc, log_dir + "/event_GCS.log");

  // Fatal on purpose: the loop below reads the last print_times lines, which would be
  // out of range if fewer lines were written.
  const size_t expected_lines = static_cast<size_t>(print_times);
  ASSERT_GE(vc.size(), expected_lines);
  const size_t first_line = vc.size() - expected_lines;

  std::set<std::string> label_set;
  std::set<std::string> message_set;
  for (size_t i = 0; i < expected_lines; ++i) {
    rpc::Event ele = GetEventFromString(vc[first_line + i]);
    // "NULL" makes CheckEventDetail skip label and message: their order across
    // threads is not deterministic.
    CheckEventDetail(ele, "job 2", "node 2", "task 2", "GCS", "WARNING", "NULL", "NULL");
    message_set.insert(ele.message());
    label_set.insert(ele.label());
  }

  // The sets are ordered lexicographically. The first/last checks work because "0" is
  // the smallest suffix and print_times - 1 (999) is the largest one as a string.
  EXPECT_EQ(message_set.size(), expected_lines);
  EXPECT_EQ(*(message_set.begin()), "send message 0");
  EXPECT_EQ(*(--message_set.end()), "send message " + std::to_string(print_times - 1));
  EXPECT_EQ(label_set.size(), expected_lines);
  EXPECT_EQ(*(label_set.begin()), "label 0");
  EXPECT_EQ(*(--label_set.end()), "label " + std::to_string(print_times - 1));

  boost::filesystem::remove_all(log_dir.c_str());
}
// Writes enough events to exceed the rotation limits and checks how many log files
// are left in the directory afterwards.
TEST(EVENT_TEST, LOG_ROTATE) {
  std::string log_dir = GenerateLogDir();

  ray::EventManager::Instance().ClearReporters();
  ray::RayEventContext::Instance().SetEventContext(
      rpc::Event_SourceType::Event_SourceType_RAYLET,
      std::unordered_map<std::string, std::string>(
          {{"node_id", "node 1"}, {"job_id", "job 1"}, {"task_id", "task 1"}}));

  // Rotate at 1 MB per file and keep at most 20 rotated files.
  const int rotate_max_file_size = 1;
  const int rotate_max_file_num = 20;
  ray::EventManager::Instance().AddReporter(std::make_shared<LogEventReporter>(
      rpc::Event_SourceType::Event_SourceType_RAYLET, log_dir, true,
      rotate_max_file_size, rotate_max_file_num));

  const int print_times = 100000;
  for (int i = 1; i <= print_times; ++i) {
    RAY_EVENT(INFO, "label " + std::to_string(i)) << "send message " + std::to_string(i);
  }

  int cnt = 0;
  for (auto &entry :
       boost::make_iterator_range(boost::filesystem::directory_iterator(log_dir), {})) {
    if (entry.path().string().find("event_RAYLET") != std::string::npos) {
      cnt++;
    }
  }
  // The file currently being written plus the rotated ones.
  EXPECT_EQ(cnt, rotate_max_file_num + 1);

  // Remove the generated files, as the other log tests do; without this every run
  // leaves the rotated log files behind.
  boost::filesystem::remove_all(log_dir.c_str());
}
} // namespace ray
int main(int argc, char **argv) {