From 09fd38ede18e403b5acf739ad7f88d50d02e5e24 Mon Sep 17 00:00:00 2001 From: SangBin Cho Date: Mon, 1 Mar 2021 23:14:06 -0800 Subject: [PATCH] [Multi node shuffle] More efficient ray memory --stats-only (#14423) * Done. * Fix all the issues. --- dashboard/memory_utils.py | 37 +++++--------------- python/ray/internal/internal_api.py | 41 +++++++++------------- python/ray/scripts/scripts.py | 6 ++-- python/ray/tests/test_memstat.py | 54 +++++++++++------------------ src/ray/protobuf/node_manager.proto | 5 +++ src/ray/raylet/node_manager.cc | 13 ++++--- 6 files changed, 60 insertions(+), 96 deletions(-) diff --git a/dashboard/memory_utils.py b/dashboard/memory_utils.py index fed651b19..431088599 100644 --- a/dashboard/memory_utils.py +++ b/dashboard/memory_utils.py @@ -7,9 +7,7 @@ from typing import List import ray from ray._raylet import (TaskID, ActorID, JobID) -from ray.state import GlobalState -from ray.internal.internal_api import node_stats, store_stats_summary -from ray.ray_constants import REDIS_DEFAULT_PASSWORD +from ray.internal.internal_api import node_stats import logging logger = logging.getLogger(__name__) @@ -330,8 +328,7 @@ def construct_memory_table(workers_stats: List, return memory_table -def get_memory_summary(redis_address, redis_password, group_by, sort_by, - line_wrap, stats_only) -> str: +def get_memory_summary(group_by, sort_by, line_wrap) -> str: from ray.new_dashboard.modules.stats_collector.stats_collector_head\ import node_stats_to_dict @@ -341,13 +338,11 @@ def get_memory_summary(redis_address, redis_password, group_by, sort_by, line_wrap_threshold = 137 # Fetch core memory worker stats, store as a dictionary - state = GlobalState() - state._initialize_global_state(redis_address, redis_password) core_worker_stats = [] - for raylet in state.node_table(): + for raylet in ray.nodes(): stats = node_stats_to_dict( - node_stats(raylet["NodeManagerAddress"], raylet["NodeManagerPort"], - (not stats_only))) + node_stats(raylet["NodeManagerAddress"], + raylet["NodeManagerPort"])) core_worker_stats.extend(stats["coreWorkersStats"]) assert type(stats) is dict and "coreWorkersStats" in stats @@ -424,23 +419,7 @@ def get_memory_summary(redis_address, redis_password, group_by, sort_by, return mem -def get_store_stats_summary(redis_address, redis_password) -> str: - state = GlobalState() - state._initialize_global_state(redis_address, redis_password) - raylet = state.node_table()[0] - stats = node_stats(raylet["NodeManagerAddress"], raylet["NodeManagerPort"]) - store_summary = store_stats_summary(stats) - return store_summary - - -def memory_summary(redis_address, - redis_password=REDIS_DEFAULT_PASSWORD, - group_by="NODE_ADDRESS", +def memory_summary(group_by="NODE_ADDRESS", sort_by="OBJECT_SIZE", - line_wrap=True, - stats_only=False): - if stats_only: - return get_store_stats_summary(redis_address, redis_password) - return get_memory_summary(redis_address, redis_password, group_by, sort_by, - line_wrap, stats_only) + get_store_stats_summary( - redis_address, redis_password) + line_wrap=True): + return get_memory_summary(group_by, sort_by, line_wrap) diff --git a/python/ray/internal/internal_api.py b/python/ray/internal/internal_api.py index 36f5389cc..ac01e0be0 100644 --- a/python/ray/internal/internal_api.py +++ b/python/ray/internal/internal_api.py @@ -1,7 +1,6 @@ import ray import ray.worker from ray import profiling -from ray.ray_constants import REDIS_DEFAULT_PASSWORD __all__ = ["free", "global_gc"] MAX_MESSAGE_LENGTH = ray._config.max_grpc_message_size() @@ -14,20 +13,17 @@ def global_gc(): worker.core_worker.global_gc() -def memory_summary_wrapper(redis_address, - redis_password=REDIS_DEFAULT_PASSWORD, - group_by="NODE_ADDRESS", - sort_by="OBJECT_SIZE", - line_wrap=True, - stats_only=False): - from ray.new_dashboard.memory_utils import memory_summary - return memory_summary(redis_address, redis_password, group_by, sort_by, - line_wrap, stats_only) - - -def memory_summary(node_manager_address=None, - node_manager_port=None, +def memory_summary(group_by="NODE_ADDRESS", + sort_by="OBJECT_SIZE", + line_wrap=True, stats_only=False): + from ray.new_dashboard.memory_utils import memory_summary + if stats_only: + return get_store_stats() + return memory_summary(group_by, sort_by, line_wrap) + get_store_stats() + + +def get_store_stats(node_manager_address=None, node_manager_port=None): """Returns a formatted string describing memory usage in the cluster.""" import grpc @@ -52,10 +48,10 @@ def memory_summary(node_manager_address=None, ) stub = node_manager_pb2_grpc.NodeManagerServiceStub(channel) reply = stub.FormatGlobalMemoryInfo( - node_manager_pb2.FormatGlobalMemoryInfoRequest(), timeout=30.0) - if stats_only: - return store_stats_summary(reply) - return reply.memory_summary + "\n" + store_stats_summary(reply, stats_only) + node_manager_pb2.FormatGlobalMemoryInfoRequest( + include_memory_info=False), + timeout=30.0) + return store_stats_summary(reply) def node_stats(node_manager_address=None, @@ -68,13 +64,8 @@ def node_stats(node_manager_address=None, from ray.core.generated import node_manager_pb2_grpc # We can ask any Raylet for the global memory info. - if (node_manager_address is None or node_manager_port is None): - raylet = ray.nodes()[0] - raylet_address = "{}:{}".format(raylet["NodeManagerAddress"], - raylet["NodeManagerPort"]) - else: - raylet_address = "{}:{}".format(node_manager_address, - node_manager_port) + assert (node_manager_address is not None and node_manager_port is not None) + raylet_address = "{}:{}".format(node_manager_address, node_manager_port) channel = grpc.insecure_channel( raylet_address, options=[ diff --git a/python/ray/scripts/scripts.py b/python/ray/scripts/scripts.py index 996cc253b..9fe4802c7 100644 --- a/python/ray/scripts/scripts.py +++ b/python/ray/scripts/scripts.py @@ -26,7 +26,7 @@ from ray.autoscaler._private.constants import RAY_PROCESSES from ray.autoscaler._private.util import DEBUG_AUTOSCALING_ERROR, \ DEBUG_AUTOSCALING_STATUS -from ray.internal.internal_api import memory_summary_wrapper +from ray.internal.internal_api import memory_summary import ray.ray_constants as ray_constants import ray.utils @@ -1384,8 +1384,8 @@ def memory(address, redis_password, group_by, sort_by, no_format, stats_only): address = services.get_ray_address_to_use_or_die() time = datetime.now() header = "=" * 8 + f" Object references status: {time} " + "=" * 8 - mem_stats = memory_summary_wrapper(address, redis_password, group_by, - sort_by, no_format, stats_only) + mem_stats = memory_summary(address, redis_password, group_by, sort_by, + no_format, stats_only) print(f"{header}\n{mem_stats}") diff --git a/python/ray/tests/test_memstat.py b/python/ray/tests/test_memstat.py index 0edc958df..8e3b4ea8f 100644 --- a/python/ray/tests/test_memstat.py +++ b/python/ray/tests/test_memstat.py @@ -3,7 +3,7 @@ import time import ray from ray.cluster_utils import Cluster -from ray.new_dashboard.memory_utils import memory_summary +from ray.internal.internal_api import memory_summary # Unique strings. DRIVER_PID = "Driver" @@ -59,28 +59,25 @@ def count(memory_str, substr): def test_driver_put_ref(ray_start_regular): - address = ray_start_regular["redis_address"] - info = memory_summary(address) + info = memory_summary() assert num_objects(info) == 0, info x_id = ray.put("HI") - info = memory_summary(address) + info = memory_summary() print(info) assert num_objects(info) == 1, info assert count(info, DRIVER_PID) == 1, info assert count(info, WORKER_PID) == 0, info del x_id - info = memory_summary(address) + info = memory_summary() assert num_objects(info) == 0, info def test_worker_task_refs(ray_start_regular): - address = ray_start_regular["redis_address"] - @ray.remote def f(y): from ray.new_dashboard.memory_utils import memory_summary x_id = ray.put("HI") - info = memory_summary(address) + info = memory_summary() del x_id return info @@ -99,7 +96,7 @@ def test_worker_task_refs(ray_start_regular): assert count(info, UNKNOWN_SIZE) == 1, info print(ray_start_regular) - info = memory_summary(address) + info = memory_summary() print(info) assert num_objects(info) == 1, info assert count(info, DRIVER_PID) == 1, info @@ -108,13 +105,11 @@ def test_worker_task_refs(ray_start_regular): assert count(info, x_id.hex()) == 1, info del x_id - info = memory_summary(address) + info = memory_summary() assert num_objects(info) == 0, info def test_actor_task_refs(ray_start_regular): - address = ray_start_regular["redis_address"] - @ray.remote class Actor: def __init__(self): @@ -123,7 +118,7 @@ def test_actor_task_refs(ray_start_regular): def f(self, x): from ray.new_dashboard.memory_utils import memory_summary self.refs.append(x) - return memory_summary(address) + return memory_summary() def make_actor(): return Actor.remote() @@ -147,7 +142,7 @@ def test_actor_task_refs(ray_start_regular): # These should accumulate in the actor. for _ in range(5): ray.get(actor.f.remote([ray.put(np.zeros(100000))])) - info = memory_summary(address) + info = memory_summary() print(info) assert count(info, DESER_ACTOR_TASK_ARG) == 5, info assert count(info, ACTOR_TASK_CALL_OBJ) == 1, info @@ -155,17 +150,16 @@ def test_actor_task_refs(ray_start_regular): # Cleanup. del actor time.sleep(1) - info = memory_summary(address) + info = memory_summary() assert num_objects(info) == 0, info def test_nested_object_refs(ray_start_regular): - address = ray_start_regular["redis_address"] x_id = ray.put(np.zeros(100000)) y_id = ray.put([x_id]) z_id = ray.put([y_id]) del x_id, y_id - info = memory_summary(address) + info = memory_summary() print(info) assert num_objects(info) == 3, info assert count(info, LOCAL_REF) == 1, info @@ -174,10 +168,9 @@ def test_nested_object_refs(ray_start_regular): def test_pinned_object_call_site(ray_start_regular): - address = ray_start_regular["redis_address"] # Local ref only. x_id = ray.put(np.zeros(100000)) - info = memory_summary(address) + info = memory_summary() print(info) assert num_objects(info) == 1, info assert count(info, LOCAL_REF) == 1, info @@ -185,7 +178,7 @@ def test_pinned_object_call_site(ray_start_regular): # Local ref + pinned buffer. buf = ray.get(x_id) - info = memory_summary(address) + info = memory_summary() print(info) assert num_objects(info) == 1, info assert count(info, LOCAL_REF) == 0, info @@ -193,7 +186,7 @@ def test_pinned_object_call_site(ray_start_regular): # Just pinned buffer. del x_id - info = memory_summary(address) + info = memory_summary() print(info) assert num_objects(info) == 1, info assert count(info, LOCAL_REF) == 0, info @@ -201,7 +194,7 @@ def test_pinned_object_call_site(ray_start_regular): # Nothing. del buf - info = memory_summary(address) + info = memory_summary() print(info) assert num_objects(info) == 0, info @@ -228,24 +221,19 @@ def test_multi_node_stats(shutdown_only): ray.get(b.ping.remote()) # Verify we have collected stats across the nodes. - info = memory_summary(cluster.address) + info = memory_summary() print(info) assert count(info, PUT_OBJ) == 2, info def test_group_by_sort_by(ray_start_regular): - address = ray_start_regular["redis_address"] - @ray.remote def f(y): - from ray.new_dashboard.memory_utils import memory_summary x_id = ray.put("HI") info_a = memory_summary( - address, group_by="STACK_TRACE", sort_by="REFERENCE_TYPE") - info_b = memory_summary( - address, group_by="NODE_ADDRESS", sort_by="OBJECT_SIZE") - info_c = memory_summary( - address, group_by="NODE_ADDRESS", sort_by="PID") + group_by="STACK_TRACE", sort_by="REFERENCE_TYPE") + info_b = memory_summary(group_by="NODE_ADDRESS", sort_by="OBJECT_SIZE") + info_c = memory_summary(group_by="NODE_ADDRESS", sort_by="PID") del x_id return info_a, info_b, info_c @@ -261,12 +249,10 @@ def test_group_by_sort_by(ray_start_regular): def test_memory_used_output(ray_start_regular): - address = ray_start_regular["redis_address"] - import numpy as np _ = ray.put(np.ones(8 * 1024 * 1024, dtype=np.int8)) - info = memory_summary(address) + info = memory_summary() print(info) assert count(info, "Plasma memory usage 8 MiB") == 1, info assert count(info, "8388861 B") == 2, info diff --git a/src/ray/protobuf/node_manager.proto b/src/ray/protobuf/node_manager.proto index 9273665f3..0bc425d62 100644 --- a/src/ray/protobuf/node_manager.proto +++ b/src/ray/protobuf/node_manager.proto @@ -160,6 +160,11 @@ message GlobalGCReply { // Accumulates memory info across all nodes. To access per-node memory info, // use GetNodeStats() calls instead. message FormatGlobalMemoryInfoRequest { + // Whether or not the reply should include memory summary. + // If it is true, it will add extra overhead to the system + // because getting memory info requires to ping every core worker + // in the cluster. + bool include_memory_info = 1; } message FormatGlobalMemoryInfoReply { diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index a17917906..32b5f8612 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -2615,17 +2615,20 @@ void NodeManager::HandleFormatGlobalMemoryInfo( auto replies = std::make_shared>(); auto local_request = std::make_shared(); auto local_reply = std::make_shared(); - local_request->set_include_memory_info(true); + bool include_memory_info = request.include_memory_info(); + local_request->set_include_memory_info(include_memory_info); unsigned int num_nodes = remote_node_manager_addresses_.size() + 1; rpc::GetNodeStatsRequest stats_req; - stats_req.set_include_memory_info(true); + stats_req.set_include_memory_info(include_memory_info); - auto store_reply = [replies, reply, num_nodes, - send_reply_callback](const rpc::GetNodeStatsReply &local_reply) { + auto store_reply = [replies, reply, num_nodes, send_reply_callback, + include_memory_info](const rpc::GetNodeStatsReply &local_reply) { replies->push_back(local_reply); if (replies->size() >= num_nodes) { - reply->set_memory_summary(FormatMemoryInfo(*replies)); + if (include_memory_info) { + reply->set_memory_summary(FormatMemoryInfo(*replies)); + } reply->mutable_store_stats()->CopyFrom(AccumulateStoreStats(*replies)); send_reply_callback(Status::OK(), nullptr, nullptr); }