ray memory should collect statistics from all nodes (#7721)

This commit is contained in:
Eric Liang 2020-03-25 16:31:31 -07:00 committed by GitHub
parent 46404d8a0b
commit 23b6fdcda1
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 66 additions and 18 deletions

View file

@ -221,7 +221,7 @@ py_test(
py_test(
name = "test_global_gc",
size = "small",
size = "medium",
srcs = ["test_global_gc.py"],
tags = ["exclusive"],
deps = ["//:ray_lib"],

View file

@ -1,6 +1,8 @@
import ray
import numpy as np
import time
import ray
from ray.cluster_utils import Cluster
from ray.internal.internal_api import memory_summary
# Unique strings.
@ -189,6 +191,33 @@ def test_pinned_object_call_site(ray_start_regular):
assert num_objects(info) == 0, info
def test_multi_node_stats(shutdown_only):
cluster = Cluster()
for _ in range(2):
cluster.add_node(num_cpus=1)
ray.init(address=cluster.address)
@ray.remote(num_cpus=1)
class Actor:
def __init__(self):
self.ref = ray.put(np.zeros(100000))
def ping(self):
pass
# Each actor will be on a different node.
a = Actor.remote()
b = Actor.remote()
ray.get(a.ping.remote())
ray.get(b.ping.remote())
# Verify we have collected stats across the nodes.
info = memory_summary()
print(info)
assert count(info, PUT_OBJ) == 2, info
if __name__ == "__main__":
import pytest
import sys

View file

@ -3380,12 +3380,11 @@ void NodeManager::HandleGetNodeStats(const rpc::GetNodeStatsRequest &node_stats_
}
}
std::string FormatMemoryInfo(
std::vector<std::shared_ptr<rpc::GetNodeStatsReply>> node_stats) {
std::string FormatMemoryInfo(std::vector<rpc::GetNodeStatsReply> node_stats) {
// First pass to compute object sizes.
absl::flat_hash_map<ObjectID, int64_t> object_sizes;
for (const auto &reply : node_stats) {
for (const auto &worker_stats : reply->workers_stats()) {
for (const auto &worker_stats : reply.workers_stats()) {
for (const auto &object_ref : worker_stats.core_worker_stats().object_refs()) {
auto obj_id = ObjectID::FromBinary(object_ref.object_id());
if (object_ref.object_size() > 0) {
@ -3408,7 +3407,7 @@ std::string FormatMemoryInfo(
// Second pass builds the summary string for each node.
for (const auto &reply : node_stats) {
for (const auto &worker_stats : reply->workers_stats()) {
for (const auto &worker_stats : reply.workers_stats()) {
bool pid_printed = false;
for (const auto &object_ref : worker_stats.core_worker_stats().object_refs()) {
if (!object_ref.pinned_in_memory() && object_ref.local_ref_count() == 0 &&
@ -3459,23 +3458,43 @@ std::string FormatMemoryInfo(
void NodeManager::HandleFormatGlobalMemoryInfo(
const rpc::FormatGlobalMemoryInfoRequest &request,
rpc::FormatGlobalMemoryInfoReply *reply, rpc::SendReplyCallback send_reply_callback) {
std::vector<std::shared_ptr<rpc::GetNodeStatsReply>> replies;
auto replies = std::make_shared<std::vector<rpc::GetNodeStatsReply>>();
auto local_request = std::make_shared<rpc::GetNodeStatsRequest>();
auto local_reply = std::make_shared<rpc::GetNodeStatsReply>();
local_request->set_include_memory_info(true);
// TODO(ekl): for (const auto &entry : remote_node_manager_clients_) {}
// to handle remote nodes
unsigned int num_nodes = remote_node_manager_clients_.size() + 1;
rpc::GetNodeStatsRequest stats_req;
stats_req.set_include_memory_info(true);
HandleGetNodeStats(*local_request, local_reply.get(),
[local_request, local_reply, replies, reply, send_reply_callback](
Status status, std::function<void()> success,
std::function<void()> failure) mutable {
replies.push_back(local_reply);
reply->set_memory_summary(FormatMemoryInfo(replies));
send_reply_callback(Status::OK(), nullptr, nullptr);
});
auto store_reply = [replies, reply, num_nodes,
send_reply_callback](const rpc::GetNodeStatsReply &local_reply) {
replies->push_back(local_reply);
if (replies->size() >= num_nodes) {
reply->set_memory_summary(FormatMemoryInfo(*replies));
send_reply_callback(Status::OK(), nullptr, nullptr);
}
};
// Fetch from remote nodes.
for (const auto &entry : remote_node_manager_clients_) {
entry.second->GetNodeStats(
stats_req, [replies, store_reply](const ray::Status &status,
const rpc::GetNodeStatsReply &r) {
if (!status.ok()) {
RAY_LOG(ERROR) << "Failed to get remote node stats: " << status.ToString();
}
store_reply(r);
});
}
// Fetch from the local node.
HandleGetNodeStats(
stats_req, local_reply.get(),
[local_reply, store_reply](Status status, std::function<void()> success,
std::function<void()> failure) mutable {
store_reply(*local_reply);
});
}
void NodeManager::HandleGlobalGC(const rpc::GlobalGCRequest &request,