[Multi node shuffle] More efficient ray memory --stats-only (#14423)

* Done.

* Fix all the issues.
SangBin Cho 2021-03-01 23:14:06 -08:00 committed by GitHub
parent 58c0959ea7
commit 09fd38ede1
6 changed files with 60 additions and 96 deletions
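In short: before this change, `ray memory --stats-only` still triggered the full per-reference memory collection (the raylet always requested memory info from every core worker) and then discarded most of it; now the stats-only path fetches plasma store statistics alone. A minimal sketch of the new Python-level entry point, assuming a running cluster and the API as it stands after this commit:

import ray
from ray.internal.internal_api import memory_summary

ray.init(address="auto")  # connect to the existing cluster

# Cheap path: plasma store stats only; no per-worker fan-out.
print(memory_summary(stats_only=True))

# Full path: object reference table plus store stats.
print(memory_summary(group_by="NODE_ADDRESS", sort_by="OBJECT_SIZE"))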

dashboard/memory_utils.py

@@ -7,9 +7,7 @@ from typing import List
 import ray
 from ray._raylet import (TaskID, ActorID, JobID)
-from ray.state import GlobalState
-from ray.internal.internal_api import node_stats, store_stats_summary
-from ray.ray_constants import REDIS_DEFAULT_PASSWORD
+from ray.internal.internal_api import node_stats
 import logging
 logger = logging.getLogger(__name__)
@@ -330,8 +328,7 @@ def construct_memory_table(workers_stats: List,
     return memory_table
-def get_memory_summary(redis_address, redis_password, group_by, sort_by,
-                       line_wrap, stats_only) -> str:
+def get_memory_summary(group_by, sort_by, line_wrap) -> str:
     from ray.new_dashboard.modules.stats_collector.stats_collector_head\
         import node_stats_to_dict
@@ -341,13 +338,11 @@ def get_memory_summary(redis_address, redis_password, group_by, sort_by,
     line_wrap_threshold = 137
     # Fetch core memory worker stats, store as a dictionary
-    state = GlobalState()
-    state._initialize_global_state(redis_address, redis_password)
     core_worker_stats = []
-    for raylet in state.node_table():
+    for raylet in ray.nodes():
         stats = node_stats_to_dict(
-            node_stats(raylet["NodeManagerAddress"], raylet["NodeManagerPort"],
-                       (not stats_only)))
+            node_stats(raylet["NodeManagerAddress"],
+                       raylet["NodeManagerPort"]))
         core_worker_stats.extend(stats["coreWorkersStats"])
         assert type(stats) is dict and "coreWorkersStats" in stats
@@ -424,23 +419,7 @@ def get_memory_summary(redis_address, redis_password, group_by, sort_by,
     return mem
-def get_store_stats_summary(redis_address, redis_password) -> str:
-    state = GlobalState()
-    state._initialize_global_state(redis_address, redis_password)
-    raylet = state.node_table()[0]
-    stats = node_stats(raylet["NodeManagerAddress"], raylet["NodeManagerPort"])
-    store_summary = store_stats_summary(stats)
-    return store_summary
-def memory_summary(redis_address,
-                   redis_password=REDIS_DEFAULT_PASSWORD,
-                   group_by="NODE_ADDRESS",
+def memory_summary(group_by="NODE_ADDRESS",
                    sort_by="OBJECT_SIZE",
-                   line_wrap=True,
-                   stats_only=False):
-    if stats_only:
-        return get_store_stats_summary(redis_address, redis_password)
-    return get_memory_summary(redis_address, redis_password, group_by, sort_by,
-                              line_wrap, stats_only) + get_store_stats_summary(
-                                  redis_address, redis_password)
+                   line_wrap=True):
+    return get_memory_summary(group_by, sort_by, line_wrap)
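The dashboard helper thus loses its Redis plumbing: raylets are discovered through `ray.nodes()` on an already-connected driver, so `redis_address`, `redis_password`, and the `stats_only` branch all disappear. A usage sketch of the slimmed-down signature, assuming a cluster is up:

import ray
from ray.new_dashboard.memory_utils import memory_summary

# memory_summary() now assumes ray.init() has already connected to the
# cluster; raylet discovery happens via ray.nodes() internally.
ray.init(address="auto")
print(memory_summary(group_by="STACK_TRACE", sort_by="REFERENCE_TYPE"))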

python/ray/internal/internal_api.py

@@ -1,7 +1,6 @@
 import ray
 import ray.worker
 from ray import profiling
-from ray.ray_constants import REDIS_DEFAULT_PASSWORD
 __all__ = ["free", "global_gc"]
 MAX_MESSAGE_LENGTH = ray._config.max_grpc_message_size()
@@ -14,20 +13,17 @@ def global_gc():
     worker.core_worker.global_gc()
-def memory_summary_wrapper(redis_address,
-                           redis_password=REDIS_DEFAULT_PASSWORD,
-                           group_by="NODE_ADDRESS",
-                           sort_by="OBJECT_SIZE",
-                           line_wrap=True,
-                           stats_only=False):
-    from ray.new_dashboard.memory_utils import memory_summary
-    return memory_summary(redis_address, redis_password, group_by, sort_by,
-                          line_wrap, stats_only)
-def memory_summary(node_manager_address=None,
-                   node_manager_port=None,
+def memory_summary(group_by="NODE_ADDRESS",
+                   sort_by="OBJECT_SIZE",
+                   line_wrap=True,
                    stats_only=False):
+    from ray.new_dashboard.memory_utils import memory_summary
+    if stats_only:
+        return get_store_stats()
+    return memory_summary(group_by, sort_by, line_wrap) + get_store_stats()
+def get_store_stats(node_manager_address=None, node_manager_port=None):
     """Returns a formatted string describing memory usage in the cluster."""
     import grpc
@@ -52,10 +48,10 @@ def memory_summary(node_manager_address=None,
     )
     stub = node_manager_pb2_grpc.NodeManagerServiceStub(channel)
     reply = stub.FormatGlobalMemoryInfo(
-        node_manager_pb2.FormatGlobalMemoryInfoRequest(), timeout=30.0)
-    if stats_only:
-        return store_stats_summary(reply)
-    return reply.memory_summary + "\n" + store_stats_summary(reply, stats_only)
+        node_manager_pb2.FormatGlobalMemoryInfoRequest(
+            include_memory_info=False),
+        timeout=30.0)
+    return store_stats_summary(reply)
 def node_stats(node_manager_address=None,
@@ -68,13 +64,8 @@
     from ray.core.generated import node_manager_pb2_grpc
     # We can ask any Raylet for the global memory info.
-    if (node_manager_address is None or node_manager_port is None):
-        raylet = ray.nodes()[0]
-        raylet_address = "{}:{}".format(raylet["NodeManagerAddress"],
-                                        raylet["NodeManagerPort"])
-    else:
-        raylet_address = "{}:{}".format(node_manager_address,
-                                        node_manager_port)
+    assert (node_manager_address is not None and node_manager_port is not None)
+    raylet_address = "{}:{}".format(node_manager_address, node_manager_port)
     channel = grpc.insecure_channel(
         raylet_address,
         options=[
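Note the inverted contract in `node_stats`: instead of silently falling back to `ray.nodes()[0]`, it now asserts that the caller supplies an explicit raylet address and port. A sketch of the resulting caller-side pattern, mirroring the resolution step `get_memory_summary` performs above:

import ray
from ray.internal.internal_api import node_stats

ray.init(address="auto")
# The caller resolves a raylet first; passing nothing now trips the assert.
raylet = ray.nodes()[0]
stats = node_stats(raylet["NodeManagerAddress"], raylet["NodeManagerPort"])
print(stats)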

python/ray/scripts/scripts.py

@@ -26,7 +26,7 @@ from ray.autoscaler._private.constants import RAY_PROCESSES
 from ray.autoscaler._private.util import DEBUG_AUTOSCALING_ERROR, \
     DEBUG_AUTOSCALING_STATUS
-from ray.internal.internal_api import memory_summary_wrapper
+from ray.internal.internal_api import memory_summary
 import ray.ray_constants as ray_constants
 import ray.utils
@@ -1384,8 +1384,8 @@ def memory(address, redis_password, group_by, sort_by, no_format, stats_only):
     address = services.get_ray_address_to_use_or_die()
     time = datetime.now()
     header = "=" * 8 + f" Object references status: {time} " + "=" * 8
-    mem_stats = memory_summary_wrapper(address, redis_password, group_by,
-                                       sort_by, no_format, stats_only)
+    mem_stats = memory_summary(address, redis_password, group_by, sort_by,
+                               no_format, stats_only)
     print(f"{header}\n{mem_stats}")

python/ray/tests/test_memstat.py

@@ -3,7 +3,7 @@ import time
 import ray
 from ray.cluster_utils import Cluster
-from ray.new_dashboard.memory_utils import memory_summary
+from ray.internal.internal_api import memory_summary
 # Unique strings.
 DRIVER_PID = "Driver"
@@ -59,28 +59,25 @@ def count(memory_str, substr):
 def test_driver_put_ref(ray_start_regular):
-    address = ray_start_regular["redis_address"]
-    info = memory_summary(address)
+    info = memory_summary()
     assert num_objects(info) == 0, info
     x_id = ray.put("HI")
-    info = memory_summary(address)
+    info = memory_summary()
     print(info)
     assert num_objects(info) == 1, info
     assert count(info, DRIVER_PID) == 1, info
     assert count(info, WORKER_PID) == 0, info
     del x_id
-    info = memory_summary(address)
+    info = memory_summary()
     assert num_objects(info) == 0, info
 def test_worker_task_refs(ray_start_regular):
-    address = ray_start_regular["redis_address"]
     @ray.remote
     def f(y):
-        from ray.new_dashboard.memory_utils import memory_summary
         x_id = ray.put("HI")
-        info = memory_summary(address)
+        info = memory_summary()
         del x_id
         return info
@@ -99,7 +96,7 @@ def test_worker_task_refs(ray_start_regular):
     assert count(info, UNKNOWN_SIZE) == 1, info
     print(ray_start_regular)
-    info = memory_summary(address)
+    info = memory_summary()
     print(info)
     assert num_objects(info) == 1, info
     assert count(info, DRIVER_PID) == 1, info
@@ -108,13 +105,11 @@ def test_worker_task_refs(ray_start_regular):
     assert count(info, x_id.hex()) == 1, info
     del x_id
-    info = memory_summary(address)
+    info = memory_summary()
     assert num_objects(info) == 0, info
 def test_actor_task_refs(ray_start_regular):
-    address = ray_start_regular["redis_address"]
     @ray.remote
     class Actor:
         def __init__(self):
@@ -123,7 +118,7 @@ def test_actor_task_refs(ray_start_regular):
         def f(self, x):
             from ray.new_dashboard.memory_utils import memory_summary
             self.refs.append(x)
-            return memory_summary(address)
+            return memory_summary()
     def make_actor():
         return Actor.remote()
@@ -147,7 +142,7 @@ def test_actor_task_refs(ray_start_regular):
     # These should accumulate in the actor.
     for _ in range(5):
         ray.get(actor.f.remote([ray.put(np.zeros(100000))]))
-    info = memory_summary(address)
+    info = memory_summary()
     print(info)
     assert count(info, DESER_ACTOR_TASK_ARG) == 5, info
     assert count(info, ACTOR_TASK_CALL_OBJ) == 1, info
@@ -155,17 +150,16 @@ def test_actor_task_refs(ray_start_regular):
     # Cleanup.
     del actor
     time.sleep(1)
-    info = memory_summary(address)
+    info = memory_summary()
     assert num_objects(info) == 0, info
 def test_nested_object_refs(ray_start_regular):
-    address = ray_start_regular["redis_address"]
     x_id = ray.put(np.zeros(100000))
     y_id = ray.put([x_id])
     z_id = ray.put([y_id])
     del x_id, y_id
-    info = memory_summary(address)
+    info = memory_summary()
     print(info)
     assert num_objects(info) == 3, info
     assert count(info, LOCAL_REF) == 1, info
@@ -174,10 +168,9 @@
 def test_pinned_object_call_site(ray_start_regular):
-    address = ray_start_regular["redis_address"]
     # Local ref only.
     x_id = ray.put(np.zeros(100000))
-    info = memory_summary(address)
+    info = memory_summary()
     print(info)
     assert num_objects(info) == 1, info
     assert count(info, LOCAL_REF) == 1, info
@@ -185,7 +178,7 @@ def test_pinned_object_call_site(ray_start_regular):
     # Local ref + pinned buffer.
     buf = ray.get(x_id)
-    info = memory_summary(address)
+    info = memory_summary()
     print(info)
     assert num_objects(info) == 1, info
     assert count(info, LOCAL_REF) == 0, info
@@ -193,7 +186,7 @@ def test_pinned_object_call_site(ray_start_regular):
     # Just pinned buffer.
     del x_id
-    info = memory_summary(address)
+    info = memory_summary()
     print(info)
     assert num_objects(info) == 1, info
     assert count(info, LOCAL_REF) == 0, info
@@ -201,7 +194,7 @@ def test_pinned_object_call_site(ray_start_regular):
     # Nothing.
     del buf
-    info = memory_summary(address)
+    info = memory_summary()
     print(info)
     assert num_objects(info) == 0, info
@@ -228,24 +221,19 @@ def test_multi_node_stats(shutdown_only):
     ray.get(b.ping.remote())
     # Verify we have collected stats across the nodes.
-    info = memory_summary(cluster.address)
+    info = memory_summary()
     print(info)
     assert count(info, PUT_OBJ) == 2, info
 def test_group_by_sort_by(ray_start_regular):
-    address = ray_start_regular["redis_address"]
     @ray.remote
     def f(y):
-        from ray.new_dashboard.memory_utils import memory_summary
         x_id = ray.put("HI")
         info_a = memory_summary(
-            address, group_by="STACK_TRACE", sort_by="REFERENCE_TYPE")
-        info_b = memory_summary(
-            address, group_by="NODE_ADDRESS", sort_by="OBJECT_SIZE")
-        info_c = memory_summary(
-            address, group_by="NODE_ADDRESS", sort_by="PID")
+            group_by="STACK_TRACE", sort_by="REFERENCE_TYPE")
+        info_b = memory_summary(group_by="NODE_ADDRESS", sort_by="OBJECT_SIZE")
+        info_c = memory_summary(group_by="NODE_ADDRESS", sort_by="PID")
         del x_id
         return info_a, info_b, info_c
@@ -261,12 +249,10 @@ def test_group_by_sort_by(ray_start_regular):
 def test_memory_used_output(ray_start_regular):
-    address = ray_start_regular["redis_address"]
     import numpy as np
     _ = ray.put(np.ones(8 * 1024 * 1024, dtype=np.int8))
-    info = memory_summary(address)
+    info = memory_summary()
     print(info)
     assert count(info, "Plasma memory usage 8 MiB") == 1, info
     assert count(info, "8388861 B") == 2, info

src/ray/protobuf/node_manager.proto

@@ -160,6 +160,11 @@ message GlobalGCReply {
 // Accumulates memory info across all nodes. To access per-node memory info,
 // use GetNodeStats() calls instead.
 message FormatGlobalMemoryInfoRequest {
+  // Whether or not the reply should include the memory summary.
+  // If it is true, it will add extra overhead to the system,
+  // because getting memory info requires pinging every core worker
+  // in the cluster.
+  bool include_memory_info = 1;
 }
 message FormatGlobalMemoryInfoReply {
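With the new field, the cost of `FormatGlobalMemoryInfo` becomes the caller's choice. A sketch of both paths through the generated stubs (the endpoint address here is hypothetical; the Python changes above resolve it from `ray.nodes()`):

import grpc
from ray.core.generated import node_manager_pb2, node_manager_pb2_grpc

# Hypothetical raylet endpoint; real callers resolve it from ray.nodes().
channel = grpc.insecure_channel("127.0.0.1:8076")
stub = node_manager_pb2_grpc.NodeManagerServiceStub(channel)

# include_memory_info=False: store stats only, no core-worker fan-out.
cheap = stub.FormatGlobalMemoryInfo(
    node_manager_pb2.FormatGlobalMemoryInfoRequest(include_memory_info=False),
    timeout=30.0)

# include_memory_info=True: additionally builds the formatted memory summary.
full = stub.FormatGlobalMemoryInfo(
    node_manager_pb2.FormatGlobalMemoryInfoRequest(include_memory_info=True),
    timeout=30.0)
print(full.memory_summary)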

src/ray/raylet/node_manager.cc

@@ -2615,17 +2615,20 @@ void NodeManager::HandleFormatGlobalMemoryInfo(
   auto replies = std::make_shared<std::vector<rpc::GetNodeStatsReply>>();
   auto local_request = std::make_shared<rpc::GetNodeStatsRequest>();
   auto local_reply = std::make_shared<rpc::GetNodeStatsReply>();
-  local_request->set_include_memory_info(true);
+  bool include_memory_info = request.include_memory_info();
+  local_request->set_include_memory_info(include_memory_info);
   unsigned int num_nodes = remote_node_manager_addresses_.size() + 1;
   rpc::GetNodeStatsRequest stats_req;
-  stats_req.set_include_memory_info(true);
+  stats_req.set_include_memory_info(include_memory_info);
-  auto store_reply = [replies, reply, num_nodes,
-                      send_reply_callback](const rpc::GetNodeStatsReply &local_reply) {
+  auto store_reply = [replies, reply, num_nodes, send_reply_callback,
+                      include_memory_info](const rpc::GetNodeStatsReply &local_reply) {
     replies->push_back(local_reply);
     if (replies->size() >= num_nodes) {
-      reply->set_memory_summary(FormatMemoryInfo(*replies));
+      if (include_memory_info) {
+        reply->set_memory_summary(FormatMemoryInfo(*replies));
+      }
       reply->mutable_store_stats()->CopyFrom(AccumulateStoreStats(*replies));
       send_reply_callback(Status::OK(), nullptr, nullptr);
     }
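The handler fans a `GetNodeStats` request out to every node, counts the replies, and emits one aggregated response; the expensive `FormatMemoryInfo` step now runs only when the client asked for it. A toy Python sketch of that reply-counting barrier (all names here are illustrative, not Ray APIs):

def make_store_reply(num_nodes, include_memory_info, send_reply):
    """Collect one reply per node; fire the final callback on the last one."""
    replies = []

    def store_reply(local_reply):
        replies.append(local_reply)
        if len(replies) >= num_nodes:
            # Formatting the summary is the expensive, optional part.
            summary = " ".join(replies) if include_memory_info else ""
            send_reply(summary, len(replies))

    return store_reply


cb = make_store_reply(3, include_memory_info=False,
                      send_reply=lambda s, n: print(f"stats from {n} nodes"))
for node_reply in ["node-a", "node-b", "node-c"]:
    cb(node_reply)  # the final reply is sent only after the third call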