memory monitor (#27017)

Signed-off-by: Clarence Ng clarence.wyng@gmail.com

Why are these changes needed?
This PR adds a memory monitor in cpp that runs periodically to check if the node memory usage is above a certain threshold. The caller may provide a callback to the monitor to execute at each interval to determine whether an action should be taken.

This PR is a no-op since the monitor is disabled by default. Another PR based on this will implement the monitor to take action when memory is running low
This commit is contained in:
clarng 2022-08-01 10:40:46 -07:00 committed by GitHub
parent 1d5fef2004
commit 57adde3f7d
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 448 additions and 0 deletions

View file

@ -1571,6 +1571,23 @@ cc_test(
], ],
) )
cc_test(
name = "memory_monitor_test",
size = "small",
srcs = [
"src/ray/common/test/memory_monitor_test.cc",
],
copts = COPTS,
tags = ["team:core"],
target_compatible_with = [
"@platforms//os:linux",
],
deps = [
":ray_common",
"@com_google_googletest//:gtest_main",
],
)
cc_test( cc_test(
name = "pubsub_integration_test", name = "pubsub_integration_test",
timeout = "short", timeout = "short",

View file

@ -0,0 +1,208 @@
// Copyright 2022 The Ray Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "ray/common/memory_monitor.h"
#include <filesystem>
#include <fstream> // std::ifstream
#include <tuple>
#include "ray/common/ray_config.h"
#include "ray/util/logging.h"
namespace ray {
MemoryMonitor::MemoryMonitor(float usage_threshold,
uint64_t monitor_interval_ms,
MemoryUsageRefreshCallback monitor_callback)
: usage_threshold_(usage_threshold),
monitor_callback_(monitor_callback),
io_context_(),
monitor_thread_([this] {
boost::asio::io_service::work io_service_work_(io_context_);
io_context_.run();
}),
runner_(io_context_) {
RAY_CHECK(monitor_callback_ != nullptr);
RAY_CHECK_GE(usage_threshold_, 0);
RAY_CHECK_LE(usage_threshold_, 1);
if (monitor_interval_ms > 0) {
#ifdef __linux__
runner_.RunFnPeriodically(
[this] {
bool is_usage_above_threshold = IsUsageAboveThreshold();
monitor_callback_(is_usage_above_threshold);
},
monitor_interval_ms,
"MemoryMonitor.CheckIsMemoryUsageAboveThreshold");
RAY_LOG(INFO) << "MemoryMonitor initialized";
#else
RAY_LOG(WARNING) << "Not running MemoryMonitor. It is currently supported "
<< "only on Linux.";
#endif
} else {
RAY_LOG(INFO) << "MemoryMonitor disabled. Specify "
<< "`memory_monitor_interval_ms` > 0 to enable the monitor.";
}
}
bool MemoryMonitor::IsUsageAboveThreshold() {
auto [used_memory_bytes, total_memory_bytes] = GetMemoryBytes();
if (total_memory_bytes == kNull || used_memory_bytes == kNull) {
RAY_LOG_EVERY_MS(WARNING, kLogIntervalMs)
<< "Unable to capture node memory. Monitor will not be able "
<< "to detect memory usage above threshold.";
return false;
}
auto usage_fraction = static_cast<float>(used_memory_bytes) / total_memory_bytes;
bool is_usage_above_threshold = usage_fraction > usage_threshold_;
if (is_usage_above_threshold) {
RAY_LOG_EVERY_MS(INFO, kLogIntervalMs)
<< "Node memory usage above threshold, used: " << used_memory_bytes
<< ", total: " << total_memory_bytes << ", usage fraction: " << usage_fraction
<< ", threshold: " << usage_threshold_;
}
return is_usage_above_threshold;
}
std::tuple<int64_t, int64_t> MemoryMonitor::GetMemoryBytes() {
auto [cgroup_used_bytes, cgroup_total_bytes] = GetCGroupMemoryBytes();
#ifndef __linux__
RAY_CHECK(false) << "Memory monitor currently supports only linux";
#endif
auto [system_used_bytes, system_total_bytes] = GetLinuxMemoryBytes();
/// cgroup memory limit can be higher than system memory limit when it is
/// not used. We take its value only when it is less than or equal to system memory
/// limit. TODO(clarng): find a better way to detect cgroup memory limit is used.
system_total_bytes = NullableMin(system_total_bytes, cgroup_total_bytes);
/// This assumes cgroup total bytes will look different than system (meminfo)
if (system_total_bytes == cgroup_total_bytes) {
system_used_bytes = cgroup_used_bytes;
}
return std::tuple(system_used_bytes, system_total_bytes);
}
std::tuple<int64_t, int64_t> MemoryMonitor::GetCGroupMemoryBytes() {
int64_t total_bytes = kNull;
if (std::filesystem::exists(kCgroupsV2MemoryMaxPath)) {
std::ifstream mem_file(kCgroupsV2MemoryMaxPath, std::ios::in | std::ios::binary);
mem_file >> total_bytes;
} else if (std::filesystem::exists(kCgroupsV1MemoryMaxPath)) {
std::ifstream mem_file(kCgroupsV1MemoryMaxPath, std::ios::in | std::ios::binary);
mem_file >> total_bytes;
}
int64_t used_bytes = kNull;
if (std::filesystem::exists(kCgroupsV2MemoryUsagePath)) {
std::ifstream mem_file(kCgroupsV2MemoryUsagePath, std::ios::in | std::ios::binary);
mem_file >> used_bytes;
} else if (std::filesystem::exists(kCgroupsV1MemoryUsagePath)) {
std::ifstream mem_file(kCgroupsV1MemoryUsagePath, std::ios::in | std::ios::binary);
mem_file >> used_bytes;
}
RAY_CHECK((total_bytes == kNull && used_bytes == kNull) ||
(total_bytes != kNull && used_bytes != kNull));
if (total_bytes != kNull) {
RAY_CHECK_GT(used_bytes, 0);
RAY_CHECK_GT(total_bytes, used_bytes);
}
return {used_bytes, total_bytes};
}
std::tuple<int64_t, int64_t> MemoryMonitor::GetLinuxMemoryBytes() {
std::string meminfo_path = "/proc/meminfo";
std::ifstream meminfo_ifs(meminfo_path, std::ios::in | std::ios::binary);
if (!meminfo_ifs.is_open()) {
RAY_LOG_EVERY_MS(ERROR, kLogIntervalMs) << " file not found: " << meminfo_path;
return {kNull, kNull};
}
std::string line;
std::string title;
uint64_t value;
std::string unit;
int64_t mem_total_bytes = kNull;
int64_t mem_available_bytes = kNull;
int64_t mem_free_bytes = kNull;
int64_t cached_bytes = kNull;
int64_t buffer_bytes = kNull;
while (std::getline(meminfo_ifs, line)) {
std::istringstream iss(line);
iss >> title >> value >> unit;
/// Linux reports them as kiB
RAY_CHECK(unit == "kB");
value = value * 1024;
if (title == "MemAvailable:") {
mem_available_bytes = value;
} else if (title == "MemFree:") {
mem_free_bytes = value;
} else if (title == "Cached:") {
cached_bytes = value;
} else if (title == "Buffers:") {
buffer_bytes = value;
} else if (title == "MemTotal:") {
mem_total_bytes = value;
}
}
if (mem_total_bytes == kNull) {
RAY_LOG_EVERY_MS(ERROR, kLogIntervalMs)
<< "Unable to determine total bytes . Will return null";
return {kNull, kNull};
}
int64_t available_bytes = kNull;
/// Follows logic from psutil
if (mem_available_bytes > 0) {
available_bytes = mem_available_bytes;
} else if (mem_free_bytes != kNull && cached_bytes != kNull && buffer_bytes != kNull) {
available_bytes = mem_free_bytes + cached_bytes + buffer_bytes;
}
if (available_bytes == kNull) {
RAY_LOG_EVERY_MS(ERROR, kLogIntervalMs)
<< "Unable to determine available bytes. Will return null";
return {kNull, kNull};
}
if (mem_total_bytes < available_bytes) {
RAY_LOG_EVERY_MS(ERROR, kLogIntervalMs)
<< "Total bytes less than available bytes. Will return null";
return {kNull, kNull};
}
auto used_bytes = mem_total_bytes - available_bytes;
return {used_bytes, mem_total_bytes};
}
int64_t MemoryMonitor::NullableMin(int64_t left, int64_t right) {
RAY_CHECK_GE(left, kNull);
RAY_CHECK_GE(right, kNull);
if (left == kNull) {
return right;
} else if (right == kNull) {
return left;
} else {
return std::min(left, right);
}
}
MemoryMonitor::~MemoryMonitor() {
io_context_.stop();
if (monitor_thread_.joinable()) {
monitor_thread_.join();
}
}
} // namespace ray

View file

@ -0,0 +1,91 @@
// Copyright 2022 The Ray Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include <gtest/gtest_prod.h>
#include <boost/filesystem.hpp>
#include "ray/common/asio/instrumented_io_context.h"
#include "ray/common/asio/periodical_runner.h"
namespace ray {
/// Callback that runs at each monitoring interval.
///
/// \param is_usage_above_threshold true if memory usage is above the usage
/// threshold at this instant.
using MemoryUsageRefreshCallback = std::function<void(bool is_usage_above_threshold)>;
/// Monitors the memory usage of the node.
/// It checks the memory usage p
/// This class is thread safe.
class MemoryMonitor {
public:
/// Constructor.
///
/// \param usage_threshold a value in [0-1] to indicate the max usage.
/// \param monitor_interval_ms the frequency to update the usage. 0 disables the
/// the monitor and callbacks won't fire.
/// \param monitor_callback function to execute on a dedicated thread owned by this
/// monitor when the usage is refreshed.
MemoryMonitor(float usage_threshold,
uint64_t monitor_interval_ms,
MemoryUsageRefreshCallback monitor_callback);
~MemoryMonitor();
private:
static constexpr char kCgroupsV1MemoryMaxPath[] =
"/sys/fs/cgroup/memory/memory.limit_in_bytes";
static constexpr char kCgroupsV1MemoryUsagePath[] =
"/sys/fs/cgroup/memory/memory.usage_in_bytes";
static constexpr char kCgroupsV2MemoryMaxPath[] = "/sys/fs/cgroup/memory.max";
static constexpr char kCgroupsV2MemoryUsagePath[] = "/sys/fs/cgroup/memory.current";
static constexpr uint32_t kLogIntervalMs = 5000;
static constexpr int64_t kNull = -1;
/// Returns true if the memory usage of this node is above the threshold.
bool IsUsageAboveThreshold();
/// Returns the used and total memory in bytes.
std::tuple<int64_t, int64_t> GetMemoryBytes();
/// Returns the used and total memory in bytes from Cgroup.
std::tuple<int64_t, int64_t> GetCGroupMemoryBytes();
/// Returns the used and total memory in bytes for linux OS.
std::tuple<int64_t, int64_t> GetLinuxMemoryBytes();
/// Returns the smaller of the two integers, kNull if both are kNull,
/// or one of the values if the other is kNull.
static int64_t NullableMin(int64_t left, int64_t right);
private:
FRIEND_TEST(MemoryMonitorTest, TestThresholdZeroMonitorAlwaysAboveThreshold);
FRIEND_TEST(MemoryMonitorTest, TestThresholdOneMonitorAlwaysBelowThreshold);
FRIEND_TEST(MemoryMonitorTest, TestGetNodeAvailableMemoryAlwaysPositive);
FRIEND_TEST(MemoryMonitorTest, TestGetNodeTotalMemoryEqualsFreeOrCGroup);
/// Memory usage fraction between [0, 1]
const double usage_threshold_;
/// Callback function that executes at each monitoring interval,
/// on a dedicated thread managed by this class.
const MemoryUsageRefreshCallback monitor_callback_;
instrumented_io_context io_context_;
std::thread monitor_thread_;
PeriodicalRunner runner_;
};
} // namespace ray

View file

@ -76,6 +76,17 @@ RAY_CONFIG(uint64_t, raylet_report_resources_period_milliseconds, 100)
/// The duration between raylet check memory pressure and send gc request /// The duration between raylet check memory pressure and send gc request
RAY_CONFIG(uint64_t, raylet_check_gc_period_milliseconds, 100) RAY_CONFIG(uint64_t, raylet_check_gc_period_milliseconds, 100)
/// Threshold when the node is beyond the memory capacity.
/// Ranging from [0, 1]
RAY_CONFIG(float, memory_usage_threshold_fraction, 0.9)
/// The interval between runs of the memory usage monitor.
/// Monitor is disabled when this value is 0.
RAY_CONFIG(uint64_t, memory_monitor_interval_ms, 0)
/// If the raylet fails to get agent info, we will retry after this interval.
RAY_CONFIG(uint64_t, raylet_get_agent_info_interval_ms, 1)
/// For a raylet, if the last resource report was sent more than this many /// For a raylet, if the last resource report was sent more than this many
/// report periods ago, then a warning will be logged that the report /// report periods ago, then a warning will be logged that the report
/// handler is drifting. /// handler is drifting.

View file

@ -0,0 +1,99 @@
// Copyright 2022 The Ray Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "ray/common/memory_monitor.h"
#include <sys/sysinfo.h>
#include "gtest/gtest.h"
#include "ray/util/process.h"
namespace ray {
class MemoryMonitorTest : public ::testing::Test {};
TEST_F(MemoryMonitorTest, TestThresholdZeroMonitorAlwaysAboveThreshold) {
MemoryMonitor monitor(
0 /*usage_threshold*/,
0 /*refresh_interval_ms*/,
[](bool is_usage_above_threshold) { FAIL() << "Expected monitor to not run"; });
ASSERT_TRUE(monitor.IsUsageAboveThreshold());
}
TEST_F(MemoryMonitorTest, TestThresholdOneMonitorAlwaysBelowThreshold) {
MemoryMonitor monitor(
1 /*usage_threshold*/,
0 /*refresh_interval_ms*/,
[](bool is_usage_above_threshold) { FAIL() << "Expected monitor to not run"; });
ASSERT_FALSE(monitor.IsUsageAboveThreshold());
}
TEST_F(MemoryMonitorTest, TestGetNodeAvailableMemoryAlwaysPositive) {
{
MemoryMonitor monitor(
0 /*usage_threshold*/,
0 /*refresh_interval_ms*/,
[](bool is_usage_above_threshold) { FAIL() << "Expected monitor to not run"; });
auto [used_bytes, total_bytes] = monitor.GetMemoryBytes();
ASSERT_GT(total_bytes, 0);
ASSERT_GT(total_bytes, used_bytes);
}
}
TEST_F(MemoryMonitorTest, TestGetNodeTotalMemoryEqualsFreeOrCGroup) {
{
MemoryMonitor monitor(
0 /*usage_threshold*/,
0 /*refresh_interval_ms*/,
[](bool is_usage_above_threshold) { FAIL() << "Expected monitor to not run"; });
auto [used_bytes, total_bytes] = monitor.GetMemoryBytes();
auto [cgroup_used_bytes, cgroup_total_bytes] = monitor.GetCGroupMemoryBytes();
auto cmd_out = Process::Exec("free -b");
std::string title;
std::string total;
std::string used;
std::string free;
std::string shared;
std::string cache;
std::string available;
std::istringstream cmd_out_ss(cmd_out);
cmd_out_ss >> total >> used >> free >> shared >> cache >> available;
cmd_out_ss >> title >> total >> used >> free >> shared >> cache >> available;
int64_t free_total_bytes;
std::istringstream total_ss(total);
total_ss >> free_total_bytes;
ASSERT_TRUE(total_bytes == free_total_bytes || total_bytes == cgroup_total_bytes);
}
}
TEST_F(MemoryMonitorTest, TestMonitorPeriodSetCallbackExecuted) {
std::condition_variable callback_ran;
std::mutex callback_ran_mutex;
MemoryMonitor monitor(
1 /*usage_threshold*/,
1 /*refresh_interval_ms*/,
[&callback_ran](bool is_usage_above_threshold) { callback_ran.notify_all(); });
std::unique_lock<std::mutex> callback_ran_mutex_lock(callback_ran_mutex);
callback_ran.wait(callback_ran_mutex_lock);
}
} // namespace ray
int main(int argc, char **argv) {
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}

View file

@ -375,6 +375,23 @@ std::error_code Process::Call(const std::vector<std::string> &args,
return ec; return ec;
} }
std::string Process::Exec(const std::string command) {
/// Based on answer in
/// https://stackoverflow.com/questions/478898/how-do-i-execute-a-command-and-get-the-output-of-the-command-within-c-using-po
std::array<char, 128> buffer;
std::string result;
#ifdef _WIN32
std::unique_ptr<FILE, decltype(&_pclose)> pipe(_popen(command.c_str(), "r"), _pclose);
#else
std::unique_ptr<FILE, decltype(&pclose)> pipe(popen(command.c_str(), "r"), pclose);
#endif
RAY_CHECK(pipe) << "popen() failed for command: " + command;
while (fgets(buffer.data(), buffer.size(), pipe.get()) != nullptr) {
result += buffer.data();
}
return result;
}
Process Process::CreateNewDummy() { Process Process::CreateNewDummy() {
pid_t pid = -1; pid_t pid = -1;
Process result(pid); Process result(pid);

View file

@ -81,6 +81,11 @@ class Process {
/// Convenience function to run the given command line and wait for it to finish. /// Convenience function to run the given command line and wait for it to finish.
static std::error_code Call(const std::vector<std::string> &args, static std::error_code Call(const std::vector<std::string> &args,
const ProcessEnvironment &env = {}); const ProcessEnvironment &env = {});
/// Executes command line operation.
///
/// \param[in] argv The command line command to execute.
/// \return The output from the command.
static std::string Exec(const std::string command);
static Process CreateNewDummy(); static Process CreateNewDummy();
static Process FromPid(pid_t pid); static Process FromPid(pid_t pid);
pid_t GetId() const; pid_t GetId() const;