diff --git a/BUILD.bazel b/BUILD.bazel index 4502adadb..e4a59518c 100644 --- a/BUILD.bazel +++ b/BUILD.bazel @@ -861,6 +861,7 @@ cc_test( copts = COPTS, deps = [ ":ray_util", + "@boost//:filesystem", "@com_google_googletest//:gtest_main", ], ) @@ -1344,7 +1345,9 @@ cc_library( ":sha256", "//src/ray/protobuf:event_cc_proto", "@boost//:asio", + "@boost//:property_tree", "@com_github_google_glog//:glog", + "@com_github_spdlog//:spdlog", "@com_google_absl//absl/synchronization", "@com_google_absl//absl/time", "@com_google_googletest//:gtest_main", diff --git a/bazel/BUILD.spdlog b/bazel/BUILD.spdlog new file mode 100644 index 000000000..a8cfb17d5 --- /dev/null +++ b/bazel/BUILD.spdlog @@ -0,0 +1,24 @@ +COPTS = ["-DSPDLOG_COMPILED_LIB"] +cc_library( + name = "spdlog", + srcs = glob([ + "src/*.cpp", + ]), + hdrs = glob([ + "include/spdlog/*.h", + "include/spdlog/cfg/*.h", + "include/spdlog/details/*.h", + "include/spdlog/fmt/*.h", + "include/spdlog/fmt/bundled/*.h", + "include/spdlog/sinks/*.h", + ]), + includes = [ + ".", + "include/", + ], + strip_include_prefix = 'include', + copts = COPTS, + deps = [ + ], + visibility = ["//visibility:public"], +) diff --git a/bazel/ray_deps_setup.bzl b/bazel/ray_deps_setup.bzl index b69fddf57..84198d416 100644 --- a/bazel/ray_deps_setup.bzl +++ b/bazel/ray_deps_setup.bzl @@ -96,6 +96,13 @@ def ray_deps_setup(): ], ) + auto_http_archive( + name = "com_github_spdlog", + build_file = "//bazel:BUILD.spdlog", + urls = ["https://github.com/gabime/spdlog/archive/v1.7.0.zip"], + sha256 = "c8f1e1103e0b148eb8832275d8e68036f2fdd3975a1199af0e844908c56f6ea5", + ) + auto_http_archive( name = "com_github_tporadowski_redis_bin", build_file = "//bazel:BUILD.redis", diff --git a/src/ray/util/event.cc b/src/ray/util/event.cc index 2701804e6..92572e25e 100644 --- a/src/ray/util/event.cc +++ b/src/ray/util/event.cc @@ -15,6 +15,104 @@ #include "ray/util/event.h" namespace ray { +/// +/// LogEventReporter +/// +LogEventReporter::LogEventReporter(rpc::Event_SourceType source_type, + std::string &log_dir, bool force_flush, + int rotate_max_file_size, int rotate_max_file_num) + : log_dir_(log_dir), + force_flush_(force_flush), + rotate_max_file_size_(rotate_max_file_size), + rotate_max_file_num_(rotate_max_file_num) { + RAY_CHECK(log_dir_ != ""); + if (log_dir_.back() != '/') { + log_dir_ += '/'; + } + + // generate file name, if the soucrce type is RAYLET or GCS, the file name would like + // event_GCS.log, event_RAYLET.log other condition would like + // event_CORE_WOREKER_{pid}.log + file_name_ = "event_" + Event_SourceType_Name(source_type); + if (source_type == rpc::Event_SourceType::Event_SourceType_CORE_WORKER || + source_type == rpc::Event_SourceType::Event_SourceType_COMMON) { + file_name_ += "_" + std::to_string(getpid()); + } + file_name_ += ".log"; + + std::string log_sink_key = GetReporterKey() + log_dir_ + file_name_; + log_sink_ = spdlog::get(log_sink_key); + // If the file size is over {rotate_max_file_size_} MB, this file would be renamed + // for example event_GCS.0.log, event_GCS.1.log, event_GCS.2.log ... + // We alow to rotate for {rotate_max_file_num_} times. + if (log_sink_ == nullptr) { + log_sink_ = + spdlog::rotating_logger_mt(log_sink_key, log_dir_ + file_name_, + 1048576 * rotate_max_file_size_, rotate_max_file_num_); + } + log_sink_->set_pattern("%v"); +} + +LogEventReporter::~LogEventReporter() { Flush(); } + +void LogEventReporter::Flush() { log_sink_->flush(); } + +std::string LogEventReporter::EventToString(const rpc::Event &event) { + std::stringstream result; + + boost::property_tree::ptree pt; + + auto time_stamp = event.timestamp(); + std::stringstream time_stamp_buffer; + char time_buffer[30]; + time_t epoch_time_as_time_t = time_stamp / 1000000; + + struct tm *dt = localtime(&epoch_time_as_time_t); + strftime(time_buffer, sizeof(time_buffer), "%Y-%m-%d %H:%M:%S.", dt); + + time_stamp_buffer << std::string(time_buffer) << std::setw(6) << std::setfill('0') + << time_stamp % 1000000; + + pt.put("time_stamp", time_stamp_buffer.str()); + pt.put("severity", Event_Severity_Name(event.severity())); + pt.put("label", event.label()); + pt.put("event_id", event.event_id()); + pt.put("source_type", Event_SourceType_Name(event.source_type())); + pt.put("host_name", event.source_hostname()); + pt.put("pid", std::to_string(event.source_pid())); + pt.put("message", event.message()); + + boost::property_tree::ptree pt_child; + for (auto &ele : event.custom_fields()) { + pt_child.put(ele.first, ele.second); + } + + pt.add_child("custom_fields", pt_child); + + std::stringstream ss; + boost::property_tree::json_parser::write_json(ss, pt, false); + + // the final string is like: + // {"time_stamp":"2020-08-29 14:18:15.998084","severity":"INFO","label":"label + // 1","event_id":"de150792ceb151c815d359d4b675fcc6266a","source_type":"CORE_WORKER","host_name":"Macbool.local","pid":"20830","message":"send + // message 1","custom_fields":{"task_id":"task 1","job_id":"job 1","node_id":"node 1"}} + + return ss.str(); +} + +void LogEventReporter::Report(const rpc::Event &event) { + RAY_CHECK(Event_SourceType_IsValid(event.source_type())); + RAY_CHECK(Event_Severity_IsValid(event.severity())); + std::string result = EventToString(event); + // Pop the last character from the result string because it is breakline '\n'. + result.pop_back(); + + log_sink_->info(result); + if (force_flush_) { + Flush(); + } +} + /// /// EventManager /// diff --git a/src/ray/util/event.h b/src/ray/util/event.h index 8a9243a6a..065fbbcc4 100644 --- a/src/ray/util/event.h +++ b/src/ray/util/event.h @@ -13,7 +13,10 @@ // limitations under the License. #pragma once +#include #include +#include +#include #include #include #include @@ -23,6 +26,9 @@ #include #include "ray/util/logging.h" #include "ray/util/util.h" +#include "spdlog/sinks/basic_file_sink.h" +#include "spdlog/sinks/rotating_file_sink.h" +#include "spdlog/spdlog.h" #include "src/ray/protobuf/event.pb.h" namespace ray { @@ -41,6 +47,38 @@ class BaseEventReporter { virtual std::string GetReporterKey() = 0; }; +// responsible for writing event to specific file +class LogEventReporter : public BaseEventReporter { + public: + LogEventReporter(rpc::Event_SourceType source_type, std::string &log_dir, + bool force_flush = true, int rotate_max_file_size = 100, + int rotate_max_file_num = 20); + + virtual ~LogEventReporter(); + + private: + virtual std::string EventToString(const rpc::Event &event); + + virtual void Init() override {} + + virtual void Report(const rpc::Event &event) override; + + virtual void Close() override {} + + virtual void Flush(); + + virtual std::string GetReporterKey() override { return "log.event.reporter"; } + + protected: + std::string log_dir_; + bool force_flush_; + int rotate_max_file_size_; // MB + int rotate_max_file_num_; + + std::string file_name_; + + std::shared_ptr log_sink_; +}; // store the reporters, add reporters and clean reporters class EventManager final { @@ -66,7 +104,7 @@ class EventManager final { const EventManager &operator=(const EventManager &manager) = delete; - protected: + private: std::unordered_map> reporter_map_; }; diff --git a/src/ray/util/event_test.cc b/src/ray/util/event_test.cc index f9c356867..a6e22e831 100644 --- a/src/ray/util/event_test.cc +++ b/src/ray/util/event_test.cc @@ -13,6 +13,7 @@ // limitations under the License. #include "ray/util/event.h" +#include #include #include #include @@ -55,11 +56,108 @@ void CheckEventDetail(rpc::Event &event, std::string job_id, std::string node_id EXPECT_EQ(mp.size(), custom_key_num); EXPECT_EQ(rpc::Event_SourceType_Name(event.source_type()), source_type); EXPECT_EQ(rpc::Event_Severity_Name(event.severity()), severity); - EXPECT_EQ(event.label(), label); - EXPECT_EQ(event.message(), message); + + if (label != "NULL") { + EXPECT_EQ(event.label(), label); + } + + if (message != "NULL") { + EXPECT_EQ(event.message(), message); + } + EXPECT_EQ(event.source_pid(), getpid()); } +rpc::Event GetEventFromString(std::string seq) { + std::stringstream ss; + ss << seq << '\n'; + rpc::Event event; + boost::property_tree::ptree pt; + boost::property_tree::read_json(ss, pt); + + std::vector splitArray; + for (auto it = pt.begin(); it != pt.end(); ++it) { + splitArray.push_back(it->first); + splitArray.push_back(it->second.get_value()); + } + + auto pt_custom_fields = pt.get_child_optional("custom_fields"); + for (auto it = pt_custom_fields->begin(); it != pt_custom_fields->end(); ++it) { + splitArray.push_back(it->first); + splitArray.push_back(it->second.get_value()); + } + + EXPECT_EQ(splitArray[2], "severity"); + rpc::Event_Severity severity_ele = + rpc::Event_Severity::Event_Severity_Event_Severity_INT_MIN_SENTINEL_DO_NOT_USE_; + RAY_CHECK(rpc::Event_Severity_Parse(splitArray[3], &severity_ele)); + event.set_severity(severity_ele); + EXPECT_EQ(splitArray[4], "label"); + event.set_label(splitArray[5]); + EXPECT_EQ(splitArray[6], "event_id"); + event.set_event_id(splitArray[7]); + EXPECT_EQ(splitArray[8], "source_type"); + rpc::Event_SourceType source_type_ele = rpc::Event_SourceType:: + Event_SourceType_Event_SourceType_INT_MIN_SENTINEL_DO_NOT_USE_; + RAY_CHECK(rpc::Event_SourceType_Parse(splitArray[9], &source_type_ele)); + event.set_source_type(source_type_ele); + EXPECT_EQ(splitArray[10], "host_name"); + event.set_source_hostname(splitArray[11]); + EXPECT_EQ(splitArray[12], "pid"); + event.set_source_pid(std::stoi(splitArray[13].c_str())); + EXPECT_EQ(splitArray[14], "message"); + event.set_message(splitArray[15]); + EXPECT_EQ(splitArray[16], "custom_fields"); + std::unordered_map custom_fields; + for (int i = 18, len = splitArray.size(); i < len; i += 2) { + custom_fields[splitArray[i]] = splitArray[i + 1]; + } + event.mutable_custom_fields()->insert(custom_fields.begin(), custom_fields.end()); + return event; +} + +void ParallelRunning(int nthreads, int loop_times, + std::function event_context_init, + std::function loop_function) { + if (nthreads > 1) { + std::vector threads(nthreads); + for (int t = 0; t < nthreads; t++) { + threads[t] = std::thread(std::bind( + [&](const int bi, const int ei, const int t) { + event_context_init(); + for (int loop_i = bi; loop_i < ei; loop_i++) { + loop_function(loop_i); + } + }, + t * loop_times / nthreads, + (t + 1) == nthreads ? loop_times : (t + 1) * loop_times / nthreads, t)); + } + std::for_each(threads.begin(), threads.end(), [](std::thread &x) { x.join(); }); + } else { + event_context_init(); + for (int loop_i = 0; loop_i < loop_times; loop_i++) { + loop_function(loop_i); + } + } +} + +void ReadEventFromFile(std::vector &vc, std::string log_file) { + std::string line; + std::ifstream read_file; + read_file.open(log_file, std::ios::binary); + while (std::getline(read_file, line)) { + vc.push_back(line); + } + read_file.close(); +} + +std::string GenerateLogDir() { + std::string log_dir_generate = std::string(5, ' '); + FillRandom(&log_dir_generate); + std::string log_dir = "event" + StringToHex(log_dir_generate); + return log_dir; +} + TEST(EVENT_TEST, TEST_BASIC) { TestEventReporter::event_list.clear(); ray::EventManager::Instance().ClearReporters(); @@ -107,6 +205,112 @@ TEST(EVENT_TEST, TEST_BASIC) { CheckEventDetail(result[3], "", "", "", "GCS", "FATAL", "", ""); } +TEST(EVENT_TEST, LOG_ONE_THREAD) { + std::string log_dir = GenerateLogDir(); + + ray::EventManager::Instance().ClearReporters(); + ray::RayEventContext::Instance().SetEventContext( + rpc::Event_SourceType::Event_SourceType_RAYLET, + std::unordered_map( + {{"node_id", "node 1"}, {"job_id", "job 1"}, {"task_id", "task 1"}})); + + ray::EventManager::Instance().AddReporter(std::make_shared( + rpc::Event_SourceType::Event_SourceType_RAYLET, log_dir)); + + int print_times = 1000; + for (int i = 1; i <= print_times; ++i) { + RAY_EVENT(INFO, "label " + std::to_string(i)) << "send message " + std::to_string(i); + } + + std::vector vc; + ReadEventFromFile(vc, log_dir + "/event_RAYLET.log"); + + EXPECT_EQ((int)vc.size(), 1000); + + for (int i = 0, len = vc.size(); i < print_times; ++i) { + rpc::Event ele = GetEventFromString(vc[len - print_times + i]); + CheckEventDetail(ele, "job 1", "node 1", "task 1", "RAYLET", "INFO", + "label " + std::to_string(i + 1), + "send message " + std::to_string(i + 1)); + } + + boost::filesystem::remove_all(log_dir.c_str()); +} + +TEST(EVENT_TEST, LOG_MULTI_THREAD) { + std::string log_dir = GenerateLogDir(); + + ray::EventManager::Instance().ClearReporters(); + + ray::EventManager::Instance().AddReporter(std::make_shared( + rpc::Event_SourceType::Event_SourceType_GCS, log_dir)); + int nthreads = 80; + int print_times = 1000; + + ParallelRunning( + nthreads, print_times, + []() { + ray::RayEventContext::Instance().SetEventContext( + rpc::Event_SourceType::Event_SourceType_GCS, + std::unordered_map( + {{"node_id", "node 2"}, {"job_id", "job 2"}, {"task_id", "task 2"}})); + }, + [](int loop_i) { + RAY_EVENT(WARNING, "label " + std::to_string(loop_i)) + << "send message " + std::to_string(loop_i); + }); + + std::vector vc; + ReadEventFromFile(vc, log_dir + "/event_GCS.log"); + + std::set label_set; + std::set message_set; + + for (int i = 0, len = vc.size(); i < print_times; ++i) { + rpc::Event ele = GetEventFromString(vc[len - print_times + i]); + CheckEventDetail(ele, "job 2", "node 2", "task 2", "GCS", "WARNING", "NULL", "NULL"); + message_set.insert(ele.message()); + label_set.insert(ele.label()); + } + + EXPECT_EQ(message_set.size(), print_times); + EXPECT_EQ(*(message_set.begin()), "send message 0"); + EXPECT_EQ(*(--message_set.end()), "send message " + std::to_string(print_times - 1)); + EXPECT_EQ(label_set.size(), print_times); + EXPECT_EQ(*(label_set.begin()), "label 0"); + EXPECT_EQ(*(--label_set.end()), "label " + std::to_string(print_times - 1)); + + boost::filesystem::remove_all(log_dir.c_str()); +} + +TEST(EVENT_TEST, LOG_ROTATE) { + std::string log_dir = GenerateLogDir(); + + ray::EventManager::Instance().ClearReporters(); + ray::RayEventContext::Instance().SetEventContext( + rpc::Event_SourceType::Event_SourceType_RAYLET, + std::unordered_map( + {{"node_id", "node 1"}, {"job_id", "job 1"}, {"task_id", "task 1"}})); + + ray::EventManager::Instance().AddReporter(std::make_shared( + rpc::Event_SourceType::Event_SourceType_RAYLET, log_dir, true, 1, 20)); + + int print_times = 100000; + for (int i = 1; i <= print_times; ++i) { + RAY_EVENT(INFO, "label " + std::to_string(i)) << "send message " + std::to_string(i); + } + + int cnt = 0; + for (auto &entry : + boost::make_iterator_range(boost::filesystem::directory_iterator(log_dir), {})) { + if (entry.path().string().find("event_RAYLET") != std::string::npos) { + cnt++; + } + } + + EXPECT_EQ(cnt, 21); +} + } // namespace ray int main(int argc, char **argv) {