diff --git a/python/ray/tests/test_global_state.py b/python/ray/tests/test_global_state.py index 3dcd64c1e..7522039ec 100644 --- a/python/ray/tests/test_global_state.py +++ b/python/ray/tests/test_global_state.py @@ -7,6 +7,7 @@ import time import ray import ray.ray_constants +import ray.services import ray.test_utils from ray._raylet import GlobalStateAccessor @@ -332,6 +333,31 @@ def test_backlog_report(shutdown_only): global_state_accessor.disconnect() +def test_heartbeat_ip(shutdown_only): + cluster = ray.init( + num_cpus=1, _system_config={ + "report_worker_backlog": True, + }) + global_state_accessor = GlobalStateAccessor( + cluster["redis_address"], ray.ray_constants.REDIS_DEFAULT_PASSWORD) + global_state_accessor.connect() + + self_ip = ray.services.get_node_ip_address() + + def self_ip_is_set(): + message = global_state_accessor.get_all_resource_usage() + if message is None: + return False + + resource_usage = ray.gcs_utils.ResourceUsageBatchData.FromString( + message) + resources_data = resource_usage.batch[0] + return resources_data.node_manager_address == self_ip + + ray.test_utils.wait_for_condition(self_ip_is_set, timeout=2) + global_state_accessor.disconnect() + + if __name__ == "__main__": import pytest import sys diff --git a/src/ray/protobuf/gcs.proto b/src/ray/protobuf/gcs.proto index 902c29cb7..a56bffbe1 100644 --- a/src/ray/protobuf/gcs.proto +++ b/src/ray/protobuf/gcs.proto @@ -327,6 +327,8 @@ message ResourcesData { ResourceLoad resource_load_by_shape = 7; // Whether this node manager is requesting global GC. bool should_global_gc = 8; + // IP address of the node. + string node_manager_address = 9; } message ResourceUsageBatchData { diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index cbe287ef7..e784758b1 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -456,6 +456,7 @@ void NodeManager::Heartbeat() { void NodeManager::ReportResourceUsage() { auto resources_data = std::make_shared(); resources_data->set_node_id(self_node_id_.Binary()); + resources_data->set_node_manager_address(initial_config_.node_manager_address); // Update local chche from gcs remote cache, this is needed when gcs restart. // We should always keep the cache view consistent. cluster_resource_scheduler_->UpdateLastResourceUsage(