diff --git a/BUILD.bazel b/BUILD.bazel index 934b6e430..d05c87a25 100644 --- a/BUILD.bazel +++ b/BUILD.bazel @@ -1571,6 +1571,23 @@ cc_test( ], ) +cc_test( + name = "memory_monitor_test", + size = "small", + srcs = [ + "src/ray/common/test/memory_monitor_test.cc", + ], + copts = COPTS, + tags = ["team:core"], + target_compatible_with = [ + "@platforms//os:linux", + ], + deps = [ + ":ray_common", + "@com_google_googletest//:gtest_main", + ], +) + cc_test( name = "pubsub_integration_test", timeout = "short", diff --git a/src/ray/common/memory_monitor.cc b/src/ray/common/memory_monitor.cc new file mode 100644 index 000000000..3fc2ad2c4 --- /dev/null +++ b/src/ray/common/memory_monitor.cc @@ -0,0 +1,208 @@ +// Copyright 2022 The Ray Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +#include "ray/common/memory_monitor.h" + +#include +#include // std::ifstream +#include + +#include "ray/common/ray_config.h" +#include "ray/util/logging.h" + +namespace ray { + +MemoryMonitor::MemoryMonitor(float usage_threshold, + uint64_t monitor_interval_ms, + MemoryUsageRefreshCallback monitor_callback) + : usage_threshold_(usage_threshold), + monitor_callback_(monitor_callback), + io_context_(), + monitor_thread_([this] { + boost::asio::io_service::work io_service_work_(io_context_); + io_context_.run(); + }), + runner_(io_context_) { + RAY_CHECK(monitor_callback_ != nullptr); + RAY_CHECK_GE(usage_threshold_, 0); + RAY_CHECK_LE(usage_threshold_, 1); + if (monitor_interval_ms > 0) { +#ifdef __linux__ + runner_.RunFnPeriodically( + [this] { + bool is_usage_above_threshold = IsUsageAboveThreshold(); + monitor_callback_(is_usage_above_threshold); + }, + monitor_interval_ms, + "MemoryMonitor.CheckIsMemoryUsageAboveThreshold"); + RAY_LOG(INFO) << "MemoryMonitor initialized"; +#else + RAY_LOG(WARNING) << "Not running MemoryMonitor. It is currently supported " + << "only on Linux."; +#endif + } else { + RAY_LOG(INFO) << "MemoryMonitor disabled. Specify " + << "`memory_monitor_interval_ms` > 0 to enable the monitor."; + } +} + +bool MemoryMonitor::IsUsageAboveThreshold() { + auto [used_memory_bytes, total_memory_bytes] = GetMemoryBytes(); + if (total_memory_bytes == kNull || used_memory_bytes == kNull) { + RAY_LOG_EVERY_MS(WARNING, kLogIntervalMs) + << "Unable to capture node memory. 
Monitor will not be able " + << "to detect memory usage above threshold."; + return false; + } + auto usage_fraction = static_cast(used_memory_bytes) / total_memory_bytes; + bool is_usage_above_threshold = usage_fraction > usage_threshold_; + if (is_usage_above_threshold) { + RAY_LOG_EVERY_MS(INFO, kLogIntervalMs) + << "Node memory usage above threshold, used: " << used_memory_bytes + << ", total: " << total_memory_bytes << ", usage fraction: " << usage_fraction + << ", threshold: " << usage_threshold_; + } + return is_usage_above_threshold; +} + +std::tuple MemoryMonitor::GetMemoryBytes() { + auto [cgroup_used_bytes, cgroup_total_bytes] = GetCGroupMemoryBytes(); +#ifndef __linux__ + RAY_CHECK(false) << "Memory monitor currently supports only linux"; +#endif + auto [system_used_bytes, system_total_bytes] = GetLinuxMemoryBytes(); + /// cgroup memory limit can be higher than system memory limit when it is + /// not used. We take its value only when it is less than or equal to system memory + /// limit. TODO(clarng): find a better way to detect cgroup memory limit is used. 
+ system_total_bytes = NullableMin(system_total_bytes, cgroup_total_bytes); + /// This assumes cgroup total bytes will look different than system (meminfo) + if (system_total_bytes == cgroup_total_bytes) { + system_used_bytes = cgroup_used_bytes; + } + return std::tuple(system_used_bytes, system_total_bytes); +} + +std::tuple MemoryMonitor::GetCGroupMemoryBytes() { + int64_t total_bytes = kNull; + if (std::filesystem::exists(kCgroupsV2MemoryMaxPath)) { + std::ifstream mem_file(kCgroupsV2MemoryMaxPath, std::ios::in | std::ios::binary); + mem_file >> total_bytes; + } else if (std::filesystem::exists(kCgroupsV1MemoryMaxPath)) { + std::ifstream mem_file(kCgroupsV1MemoryMaxPath, std::ios::in | std::ios::binary); + mem_file >> total_bytes; + } + + int64_t used_bytes = kNull; + if (std::filesystem::exists(kCgroupsV2MemoryUsagePath)) { + std::ifstream mem_file(kCgroupsV2MemoryUsagePath, std::ios::in | std::ios::binary); + mem_file >> used_bytes; + } else if (std::filesystem::exists(kCgroupsV1MemoryUsagePath)) { + std::ifstream mem_file(kCgroupsV1MemoryUsagePath, std::ios::in | std::ios::binary); + mem_file >> used_bytes; + } + + RAY_CHECK((total_bytes == kNull && used_bytes == kNull) || + (total_bytes != kNull && used_bytes != kNull)); + if (total_bytes != kNull) { + RAY_CHECK_GT(used_bytes, 0); + RAY_CHECK_GT(total_bytes, used_bytes); + } + + return {used_bytes, total_bytes}; +} + +std::tuple MemoryMonitor::GetLinuxMemoryBytes() { + std::string meminfo_path = "/proc/meminfo"; + std::ifstream meminfo_ifs(meminfo_path, std::ios::in | std::ios::binary); + if (!meminfo_ifs.is_open()) { + RAY_LOG_EVERY_MS(ERROR, kLogIntervalMs) << " file not found: " << meminfo_path; + return {kNull, kNull}; + } + std::string line; + std::string title; + uint64_t value; + std::string unit; + + int64_t mem_total_bytes = kNull; + int64_t mem_available_bytes = kNull; + int64_t mem_free_bytes = kNull; + int64_t cached_bytes = kNull; + int64_t buffer_bytes = kNull; + while 
(std::getline(meminfo_ifs, line)) { + std::istringstream iss(line); + iss >> title >> value >> unit; + /// Linux reports them as kiB + RAY_CHECK(unit == "kB"); + value = value * 1024; + if (title == "MemAvailable:") { + mem_available_bytes = value; + } else if (title == "MemFree:") { + mem_free_bytes = value; + } else if (title == "Cached:") { + cached_bytes = value; + } else if (title == "Buffers:") { + buffer_bytes = value; + } else if (title == "MemTotal:") { + mem_total_bytes = value; + } + } + if (mem_total_bytes == kNull) { + RAY_LOG_EVERY_MS(ERROR, kLogIntervalMs) + << "Unable to determine total bytes . Will return null"; + return {kNull, kNull}; + } + + int64_t available_bytes = kNull; + /// Follows logic from psutil + if (mem_available_bytes > 0) { + available_bytes = mem_available_bytes; + } else if (mem_free_bytes != kNull && cached_bytes != kNull && buffer_bytes != kNull) { + available_bytes = mem_free_bytes + cached_bytes + buffer_bytes; + } + + if (available_bytes == kNull) { + RAY_LOG_EVERY_MS(ERROR, kLogIntervalMs) + << "Unable to determine available bytes. Will return null"; + return {kNull, kNull}; + } + if (mem_total_bytes < available_bytes) { + RAY_LOG_EVERY_MS(ERROR, kLogIntervalMs) + << "Total bytes less than available bytes. 
Will return null"; + return {kNull, kNull}; + } + auto used_bytes = mem_total_bytes - available_bytes; + return {used_bytes, mem_total_bytes}; +} + +int64_t MemoryMonitor::NullableMin(int64_t left, int64_t right) { + RAY_CHECK_GE(left, kNull); + RAY_CHECK_GE(right, kNull); + + if (left == kNull) { + return right; + } else if (right == kNull) { + return left; + } else { + return std::min(left, right); + } +} + +MemoryMonitor::~MemoryMonitor() { + io_context_.stop(); + if (monitor_thread_.joinable()) { + monitor_thread_.join(); + } +} + +} // namespace ray diff --git a/src/ray/common/memory_monitor.h b/src/ray/common/memory_monitor.h new file mode 100644 index 000000000..35afa86f6 --- /dev/null +++ b/src/ray/common/memory_monitor.h @@ -0,0 +1,91 @@ +// Copyright 2022 The Ray Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include + +#include + +#include "ray/common/asio/instrumented_io_context.h" +#include "ray/common/asio/periodical_runner.h" + +namespace ray { +/// Callback that runs at each monitoring interval. +/// +/// \param is_usage_above_threshold true if memory usage is above the usage +/// threshold at this instant. +using MemoryUsageRefreshCallback = std::function; + +/// Monitors the memory usage of the node. +/// It checks the memory usage periodically and reports the result via the callback. +/// This class is thread safe. +class MemoryMonitor { + public: + /// Constructor. + /// + /// \param usage_threshold a value in [0-1] to indicate the max usage. 
+ /// \param monitor_interval_ms the interval in milliseconds between usage updates. 0 disables the + /// monitor and callbacks won't fire. + /// \param monitor_callback function to execute on a dedicated thread owned by this + /// monitor when the usage is refreshed. + MemoryMonitor(float usage_threshold, + uint64_t monitor_interval_ms, + MemoryUsageRefreshCallback monitor_callback); + + ~MemoryMonitor(); + + private: + static constexpr char kCgroupsV1MemoryMaxPath[] = + "/sys/fs/cgroup/memory/memory.limit_in_bytes"; + static constexpr char kCgroupsV1MemoryUsagePath[] = + "/sys/fs/cgroup/memory/memory.usage_in_bytes"; + static constexpr char kCgroupsV2MemoryMaxPath[] = "/sys/fs/cgroup/memory.max"; + static constexpr char kCgroupsV2MemoryUsagePath[] = "/sys/fs/cgroup/memory.current"; + static constexpr uint32_t kLogIntervalMs = 5000; + static constexpr int64_t kNull = -1; + + /// Returns true if the memory usage of this node is above the threshold. + bool IsUsageAboveThreshold(); + + /// Returns the used and total memory in bytes. + std::tuple GetMemoryBytes(); + + /// Returns the used and total memory in bytes from Cgroup. + std::tuple GetCGroupMemoryBytes(); + + /// Returns the used and total memory in bytes for linux OS. + std::tuple GetLinuxMemoryBytes(); + + /// Returns the smaller of the two integers, kNull if both are kNull, + /// or one of the values if the other is kNull. + static int64_t NullableMin(int64_t left, int64_t right); + + private: + FRIEND_TEST(MemoryMonitorTest, TestThresholdZeroMonitorAlwaysAboveThreshold); + FRIEND_TEST(MemoryMonitorTest, TestThresholdOneMonitorAlwaysBelowThreshold); + FRIEND_TEST(MemoryMonitorTest, TestGetNodeAvailableMemoryAlwaysPositive); + FRIEND_TEST(MemoryMonitorTest, TestGetNodeTotalMemoryEqualsFreeOrCGroup); + + /// Memory usage fraction between [0, 1] + const double usage_threshold_; + /// Callback function that executes at each monitoring interval, + /// on a dedicated thread managed by this class. 
+ const MemoryUsageRefreshCallback monitor_callback_; + instrumented_io_context io_context_; + std::thread monitor_thread_; + PeriodicalRunner runner_; +}; + +} // namespace ray diff --git a/src/ray/common/ray_config_def.h b/src/ray/common/ray_config_def.h index 2c37bed4e..8bff0ed3f 100644 --- a/src/ray/common/ray_config_def.h +++ b/src/ray/common/ray_config_def.h @@ -76,6 +76,17 @@ RAY_CONFIG(uint64_t, raylet_report_resources_period_milliseconds, 100) /// The duration between raylet check memory pressure and send gc request RAY_CONFIG(uint64_t, raylet_check_gc_period_milliseconds, 100) +/// Threshold when the node is beyond the memory capacity. +/// Ranging from [0, 1] +RAY_CONFIG(float, memory_usage_threshold_fraction, 0.9) + +/// The interval between runs of the memory usage monitor. +/// Monitor is disabled when this value is 0. +RAY_CONFIG(uint64_t, memory_monitor_interval_ms, 0) + +/// If the raylet fails to get agent info, we will retry after this interval. +RAY_CONFIG(uint64_t, raylet_get_agent_info_interval_ms, 1) + /// For a raylet, if the last resource report was sent more than this many /// report periods ago, then a warning will be logged that the report /// handler is drifting. diff --git a/src/ray/common/test/memory_monitor_test.cc b/src/ray/common/test/memory_monitor_test.cc new file mode 100644 index 000000000..33d232162 --- /dev/null +++ b/src/ray/common/test/memory_monitor_test.cc @@ -0,0 +1,99 @@ +// Copyright 2022 The Ray Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. + +#include "ray/common/memory_monitor.h" + +#include + +#include "gtest/gtest.h" +#include "ray/util/process.h" + +namespace ray { +class MemoryMonitorTest : public ::testing::Test {}; + +TEST_F(MemoryMonitorTest, TestThresholdZeroMonitorAlwaysAboveThreshold) { + MemoryMonitor monitor( + 0 /*usage_threshold*/, + 0 /*refresh_interval_ms*/, + [](bool is_usage_above_threshold) { FAIL() << "Expected monitor to not run"; }); + ASSERT_TRUE(monitor.IsUsageAboveThreshold()); +} + +TEST_F(MemoryMonitorTest, TestThresholdOneMonitorAlwaysBelowThreshold) { + MemoryMonitor monitor( + 1 /*usage_threshold*/, + 0 /*refresh_interval_ms*/, + [](bool is_usage_above_threshold) { FAIL() << "Expected monitor to not run"; }); + ASSERT_FALSE(monitor.IsUsageAboveThreshold()); +} + +TEST_F(MemoryMonitorTest, TestGetNodeAvailableMemoryAlwaysPositive) { + { + MemoryMonitor monitor( + 0 /*usage_threshold*/, + 0 /*refresh_interval_ms*/, + [](bool is_usage_above_threshold) { FAIL() << "Expected monitor to not run"; }); + auto [used_bytes, total_bytes] = monitor.GetMemoryBytes(); + ASSERT_GT(total_bytes, 0); + ASSERT_GT(total_bytes, used_bytes); + } +} + +TEST_F(MemoryMonitorTest, TestGetNodeTotalMemoryEqualsFreeOrCGroup) { + { + MemoryMonitor monitor( + 0 /*usage_threshold*/, + 0 /*refresh_interval_ms*/, + [](bool is_usage_above_threshold) { FAIL() << "Expected monitor to not run"; }); + auto [used_bytes, total_bytes] = monitor.GetMemoryBytes(); + auto [cgroup_used_bytes, cgroup_total_bytes] = monitor.GetCGroupMemoryBytes(); + + auto cmd_out = Process::Exec("free -b"); + std::string title; + std::string total; + std::string used; + std::string free; + std::string shared; + std::string cache; + std::string available; + std::istringstream cmd_out_ss(cmd_out); + cmd_out_ss >> total >> used >> free >> shared >> cache >> available; + cmd_out_ss >> title >> total >> used >> free >> shared >> 
cache >> available; + + int64_t free_total_bytes; + std::istringstream total_ss(total); + total_ss >> free_total_bytes; + + ASSERT_TRUE(total_bytes == free_total_bytes || total_bytes == cgroup_total_bytes); + } +} + +TEST_F(MemoryMonitorTest, TestMonitorPeriodSetCallbackExecuted) { + std::condition_variable callback_ran; + std::mutex callback_ran_mutex; + + MemoryMonitor monitor( + 1 /*usage_threshold*/, + 1 /*refresh_interval_ms*/, + [&callback_ran](bool is_usage_above_threshold) { callback_ran.notify_all(); }); + std::unique_lock callback_ran_mutex_lock(callback_ran_mutex); + callback_ran.wait(callback_ran_mutex_lock); +} + +} // namespace ray + +int main(int argc, char **argv) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/src/ray/util/process.cc b/src/ray/util/process.cc index 4ccb9c9fa..82be9ba34 100644 --- a/src/ray/util/process.cc +++ b/src/ray/util/process.cc @@ -375,6 +375,23 @@ std::error_code Process::Call(const std::vector &args, return ec; } +std::string Process::Exec(const std::string command) { + /// Based on answer in + /// https://stackoverflow.com/questions/478898/how-do-i-execute-a-command-and-get-the-output-of-the-command-within-c-using-po + std::array buffer; + std::string result; +#ifdef _WIN32 + std::unique_ptr pipe(_popen(command.c_str(), "r"), _pclose); +#else + std::unique_ptr pipe(popen(command.c_str(), "r"), pclose); +#endif + RAY_CHECK(pipe) << "popen() failed for command: " + command; + while (fgets(buffer.data(), buffer.size(), pipe.get()) != nullptr) { + result += buffer.data(); + } + return result; +} + Process Process::CreateNewDummy() { pid_t pid = -1; Process result(pid); diff --git a/src/ray/util/process.h b/src/ray/util/process.h index 8771326e5..2e79df06d 100644 --- a/src/ray/util/process.h +++ b/src/ray/util/process.h @@ -81,6 +81,11 @@ class Process { /// Convenience function to run the given command line and wait for it to finish. 
static std::error_code Call(const std::vector &args, const ProcessEnvironment &env = {}); + /// Executes the given command line and captures its output. + /// + /// \param[in] command The command line to execute. + /// \return The standard output produced by the command. + static std::string Exec(const std::string command); static Process CreateNewDummy(); static Process FromPid(pid_t pid); pid_t GetId() const;