diff --git a/src/ray/common/ray_config_def.h b/src/ray/common/ray_config_def.h
index 246e07ec9..a2fd66228 100644
--- a/src/ray/common/ray_config_def.h
+++ b/src/ray/common/ray_config_def.h
@@ -39,7 +39,7 @@ RAY_CONFIG(int64_t, raylet_heartbeat_timeout_milliseconds, 100)
 /// Whether to send heartbeat lightly. When it is enabled, only changed part,
 /// like should_global_gc or changed resources, will be included in the heartbeat,
 /// and gcs only broadcast the changed heartbeat.
-RAY_CONFIG(bool, light_heartbeat_enabled, true)
+RAY_CONFIG(bool, light_heartbeat_enabled, false)
 /// If a component has not sent a heartbeat in the last num_heartbeats_timeout
 /// heartbeat intervals, the raylet monitor process will report
 /// it as dead to the db_client table.
diff --git a/src/ray/gcs/gcs_server/gcs_node_manager.cc b/src/ray/gcs/gcs_server/gcs_node_manager.cc
index d88c0afbe..cf23df8a6 100644
--- a/src/ray/gcs/gcs_server/gcs_node_manager.cc
+++ b/src/ray/gcs/gcs_server/gcs_node_manager.cc
@@ -344,7 +344,7 @@ void GcsNodeManager::HandleGetAllHeartbeat(const rpc::GetAllHeartbeatRequest &re
   if (!node_heartbeats_.empty()) {
     auto batch = std::make_shared<rpc::HeartbeatBatchTableData>();
     absl::flat_hash_map<ResourceSet, rpc::ResourceDemand> aggregate_load;
-    for (const auto &heartbeat : node_heartbeats_) {
+    for (auto &heartbeat : node_heartbeats_) {
       // Aggregate the load reported by each raylet.
       auto load = heartbeat.second.resource_load_by_shape();
       for (const auto &demand : load.resource_demands()) {
@@ -361,13 +361,14 @@ void GcsNodeManager::HandleGetAllHeartbeat(const rpc::GetAllHeartbeatRequest &re
                                            demand.backlog_size());
         }
       }
+      heartbeat.second.clear_resource_load_by_shape();
-      batch->add_batch()->CopyFrom(heartbeat.second);
+      batch->add_batch()->Swap(&heartbeat.second);
     }
-    for (const auto &demand : aggregate_load) {
+    for (auto &demand : aggregate_load) {
       auto demand_proto = batch->mutable_resource_load_by_shape()->add_resource_demands();
-      demand_proto->CopyFrom(demand.second);
+      demand_proto->Swap(&demand.second);
       for (const auto &resource_pair : demand.first.GetResourceMap()) {
         (*demand_proto->mutable_shape())[resource_pair.first] = resource_pair.second;
       }
@@ -405,8 +406,6 @@ void GcsNodeManager::UpdateNodeHeartbeat(const NodeID node_id,
     if (request.heartbeat().resource_load_changed()) {
       (*iter->second.mutable_resource_load()) = request.heartbeat().resource_load();
     }
-    (*iter->second.mutable_resource_load_by_shape()) =
-        request.heartbeat().resource_load_by_shape();
   }
 }
@@ -525,7 +524,6 @@ void GcsNodeManager::StartNodeFailureDetector() {
 void GcsNodeManager::UpdateNodeRealtimeResources(
     const NodeID &node_id, const rpc::HeartbeatTableData &heartbeat) {
   if (!RayConfig::instance().light_heartbeat_enabled() ||
-      cluster_realtime_resources_.count(node_id) == 0 ||
       heartbeat.resources_available_changed()) {
     auto resources_available = MapFromProtobuf(heartbeat.resources_available());
     cluster_realtime_resources_[node_id] =
@@ -558,6 +556,7 @@ void GcsNodeManager::AddDeadNodeToCache(std::shared_ptr<rpc::GcsNodeInfo> node)
 void GcsNodeManager::SendBatchedHeartbeat() {
   if (!heartbeat_buffer_.empty()) {
     auto batch = std::make_shared<rpc::HeartbeatBatchTableData>();
+    std::unordered_map<ResourceSet, rpc::ResourceDemand> aggregate_load;
     for (auto &heartbeat : heartbeat_buffer_) {
       batch->add_batch()->Swap(&heartbeat.second);
     }
diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc
index ebc66aa21..753e56cb9 100644
--- a/src/ray/raylet/node_manager.cc
+++ b/src/ray/raylet/node_manager.cc
@@ -483,12 +483,10 @@ void NodeManager::Heartbeat() {
     }
   }
-  if (!new_scheduler_enabled_) {
-    // Add resource load by shape. This will be used by the new autoscaler.
-    auto resource_load = local_queues_.GetResourceLoadByShape(
-        RayConfig::instance().max_resource_shapes_per_load_report());
-    heartbeat_data->mutable_resource_load_by_shape()->Swap(&resource_load);
-  }
+  // Add resource load by shape. This will be used by the new autoscaler.
+  auto resource_load = local_queues_.GetResourceLoadByShape(
+      RayConfig::instance().max_resource_shapes_per_load_report());
+  heartbeat_data->mutable_resource_load_by_shape()->Swap(&resource_load);
   // Set the global gc bit on the outgoing heartbeat message.
   if (should_global_gc_) {
@@ -886,7 +884,7 @@ void NodeManager::HeartbeatAdded(const NodeID &client_id,
     ResourceSet remote_total(MapFromProtobuf(heartbeat_data.resources_total()));
     remote_resources.SetTotalResources(std::move(remote_total));
   }
-  if (heartbeat_data.resources_available_changed()) {
+  if (heartbeat_data.resource_load_changed()) {
     ResourceSet remote_available(MapFromProtobuf(heartbeat_data.resources_available()));
     remote_resources.SetAvailableResources(std::move(remote_available));
   }
diff --git a/src/ray/raylet/scheduling/cluster_resource_data.h b/src/ray/raylet/scheduling/cluster_resource_data.h
index 9c769ecf4..1d1f8c592 100644
--- a/src/ray/raylet/scheduling/cluster_resource_data.h
+++ b/src/ray/raylet/scheduling/cluster_resource_data.h
@@ -150,10 +150,6 @@ class TaskResourceInstances {
 /// Total and available capacities of each resource of a node.
 class NodeResources {
  public:
-  NodeResources() {}
-  NodeResources(const NodeResources &other)
-      : predefined_resources(other.predefined_resources),
-        custom_resources(other.custom_resources) {}
   /// Available and total capacities for predefined resources.
   std::vector<ResourceCapacity> predefined_resources;
   /// Map containing custom resources. The key of each entry represents the
diff --git a/src/ray/raylet/scheduling/cluster_resource_scheduler.cc b/src/ray/raylet/scheduling/cluster_resource_scheduler.cc
index 86ffb2e82..3646fabae 100644
--- a/src/ray/raylet/scheduling/cluster_resource_scheduler.cc
+++ b/src/ray/raylet/scheduling/cluster_resource_scheduler.cc
@@ -804,16 +804,17 @@ void ClusterResourceScheduler::FreeLocalTaskResources(
 }
 void ClusterResourceScheduler::Heartbeat(
-    bool light_heartbeat_enabled, std::shared_ptr<rpc::HeartbeatTableData> heartbeat_data) {
+    bool light_heartbeat_enabled,
+    std::shared_ptr<rpc::HeartbeatTableData> heartbeat_data) const {
   NodeResources resources;
   RAY_CHECK(GetNodeResources(local_node_id_, &resources))
       << "Error: Populating heartbeat failed. Please file a bug report: "
         "https://github.com/ray-project/ray/issues/new.";
-  if (light_heartbeat_enabled && last_report_resources_ &&
-      resources == *last_report_resources_.get()) {
-    return;
+  if (light_heartbeat_enabled) {
+    // TODO
+    RAY_CHECK(false) << "TODO";
   } else {
     for (int i = 0; i < PredefinedResources_MAX; i++) {
       const auto &label = ResourceEnumToString((PredefinedResources)i);
@@ -839,10 +840,6 @@ void ClusterResourceScheduler::Heartbeat(
         (*heartbeat_data->mutable_resources_total())[label] = capacity.total.Double();
       }
     }
-    heartbeat_data->set_resources_available_changed(true);
-    if (light_heartbeat_enabled) {
-      last_report_resources_.reset(new NodeResources(resources));
-    }
   }
 }
diff --git a/src/ray/raylet/scheduling/cluster_resource_scheduler.h b/src/ray/raylet/scheduling/cluster_resource_scheduler.h
index 15375bd10..c71859f3e 100644
--- a/src/ray/raylet/scheduling/cluster_resource_scheduler.h
+++ b/src/ray/raylet/scheduling/cluster_resource_scheduler.h
@@ -49,8 +49,6 @@ class ClusterResourceScheduler {
   /// Keep the mapping between node and resource IDs in string representation
   /// to integer representation. Used for improving map performance.
   StringIdMap string_to_int_map_;
-  /// Cached resources, used to compare with newest one in light heartbeat mode.
-  std::unique_ptr<NodeResources> last_report_resources_;
   /// Set predefined resources.
   ///
@@ -384,7 +382,8 @@ class ClusterResourceScheduler {
   /// \param light_heartbeat_enabled Only send changed fields if true.
   /// \param Output parameter. `resources_available` and `resources_total` are the only
   /// fields used.
-  void Heartbeat(bool light_heartbeat_enabled, std::shared_ptr<rpc::HeartbeatTableData> data);
+  void Heartbeat(bool light_heartbeat_enabled,
+                 std::shared_ptr<rpc::HeartbeatTableData> data) const;
   /// Return human-readable string for this scheduler state.
   std::string DebugString() const;
diff --git a/src/ray/raylet/scheduling/cluster_task_manager.cc b/src/ray/raylet/scheduling/cluster_task_manager.cc
index 482cf6eb4..e0487cbdd 100644
--- a/src/ray/raylet/scheduling/cluster_task_manager.cc
+++ b/src/ray/raylet/scheduling/cluster_task_manager.cc
@@ -229,59 +229,60 @@ bool ClusterTaskManager::CancelTask(const TaskID &task_id) {
 void ClusterTaskManager::Heartbeat(bool light_heartbeat_enabled,
                                    std::shared_ptr<rpc::HeartbeatTableData> data) const {
-  // TODO (WangTao): Find a way to check if load changed and combine it with light
-  // heartbeat. Now we just report it every time.
-  data->set_resource_load_changed(true);
   auto resource_loads = data->mutable_resource_load();
   auto resource_load_by_shape =
       data->mutable_resource_load_by_shape()->mutable_resource_demands();
-  // TODO (Alex): Implement the 1-CPU task optimization.
-  for (const auto &pair : tasks_to_schedule_) {
-    const auto &scheduling_class = pair.first;
-    const auto &resources =
-        TaskSpecification::GetSchedulingClassDescriptor(scheduling_class)
-            .GetResourceMap();
-    const auto &queue = pair.second;
-    const auto &count = queue.size();
+  if (light_heartbeat_enabled) {
+    RAY_CHECK(false) << "TODO";
+  } else {
+    // TODO (Alex): Implement the 1-CPU task optimization.
+    for (const auto &pair : tasks_to_schedule_) {
+      const auto &scheduling_class = pair.first;
+      const auto &resources =
+          TaskSpecification::GetSchedulingClassDescriptor(scheduling_class)
+              .GetResourceMap();
+      const auto &queue = pair.second;
+      const auto &count = queue.size();
-    auto by_shape_entry = resource_load_by_shape->Add();
+      auto by_shape_entry = resource_load_by_shape->Add();
-    for (const auto &resource : resources) {
-      // Add to `resource_loads`.
-      const auto &label = resource.first;
-      const auto &quantity = resource.second;
-      (*resource_loads)[label] += quantity * count;
+      for (const auto &resource : resources) {
+        // Add to `resource_loads`.
+        const auto &label = resource.first;
+        const auto &quantity = resource.second;
+        (*resource_loads)[label] += quantity * count;
-      // Add to `resource_load_by_shape`.
-      (*by_shape_entry->mutable_shape())[label] = quantity;
-      // TODO (Alex): Technically being on `tasks_to_schedule` could also mean
-      // that the entire cluster is utilized.
-      by_shape_entry->set_num_infeasible_requests_queued(count);
+        // Add to `resource_load_by_shape`.
+        (*by_shape_entry->mutable_shape())[label] = quantity;
+        // TODO (Alex): Technically being on `tasks_to_schedule` could also mean
+        // that the entire cluster is utilized.
+        by_shape_entry->set_num_infeasible_requests_queued(count);
+      }
     }
-  for (const auto &pair : tasks_to_dispatch_) {
-    const auto &scheduling_class = pair.first;
-    const auto &resources =
-        TaskSpecification::GetSchedulingClassDescriptor(scheduling_class)
-            .GetResourceMap();
-    const auto &queue = pair.second;
-    const auto &count = queue.size();
+    for (const auto &pair : tasks_to_dispatch_) {
+      const auto &scheduling_class = pair.first;
+      const auto &resources =
+          TaskSpecification::GetSchedulingClassDescriptor(scheduling_class)
+              .GetResourceMap();
+      const auto &queue = pair.second;
+      const auto &count = queue.size();
-    auto by_shape_entry = resource_load_by_shape->Add();
+      auto by_shape_entry = resource_load_by_shape->Add();
-    for (const auto &resource : resources) {
-      // Add to `resource_loads`.
-      const auto &label = resource.first;
-      const auto &quantity = resource.second;
-      (*resource_loads)[label] += quantity * count;
+      for (const auto &resource : resources) {
+        // Add to `resource_loads`.
+        const auto &label = resource.first;
+        const auto &quantity = resource.second;
+        (*resource_loads)[label] += quantity * count;
-      // Add to `resource_load_by_shape`.
-      (*by_shape_entry->mutable_shape())[label] = quantity;
-      // TODO (Alex): Technically being on `tasks_to_schedule` could also mean
-      // that the entire cluster is utilized.
-      by_shape_entry->set_num_ready_requests_queued(count);
+        // Add to `resource_load_by_shape`.
+        (*by_shape_entry->mutable_shape())[label] = quantity;
+        // TODO (Alex): Technically being on `tasks_to_schedule` could also mean
+        // that the entire cluster is utilized.
+        by_shape_entry->set_num_ready_requests_queued(count);
+      }
     }
   }
 }
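
Note on the light heartbeat path that this diff stubs out: the removed last_report_resources_ logic in ClusterResourceScheduler::Heartbeat is the essence of light_heartbeat_enabled, i.e. cache what was last reported and only resend the resource fields when they differ. The standalone C++ sketch below illustrates that caching-and-compare pattern under simplified, assumed types; HeartbeatReport and ResourceReporter are illustrative stand-ins, not Ray's rpc::HeartbeatTableData or the scheduler classes touched above.

// A minimal, self-contained sketch (not Ray code) of delta-based heartbeat reporting:
// the reporter caches the last resources it sent and, in light mode, skips the
// available-resources payload when nothing changed since the previous report.
#include <iostream>
#include <map>
#include <memory>
#include <string>

struct HeartbeatReport {
  bool resources_available_changed = false;
  std::map<std::string, double> resources_available;
};

class ResourceReporter {
 public:
  explicit ResourceReporter(bool light_heartbeat_enabled)
      : light_heartbeat_enabled_(light_heartbeat_enabled) {}

  // Fill `report` from the current view of local resources. In light mode, leave the
  // payload empty (and the changed bit false) when the resources match the cached copy,
  // mirroring the last_report_resources_ comparison removed in this diff.
  void Fill(const std::map<std::string, double> &current, HeartbeatReport *report) {
    if (light_heartbeat_enabled_ && last_reported_ && current == *last_reported_) {
      return;  // Nothing changed; send a (mostly) empty heartbeat.
    }
    report->resources_available = current;
    report->resources_available_changed = true;
    if (light_heartbeat_enabled_) {
      // Remember what was sent so the next heartbeat can be compared against it.
      last_reported_ = std::make_unique<std::map<std::string, double>>(current);
    }
  }

 private:
  const bool light_heartbeat_enabled_;
  std::unique_ptr<std::map<std::string, double>> last_reported_;
};

int main() {
  ResourceReporter reporter(/*light_heartbeat_enabled=*/true);
  std::map<std::string, double> resources = {{"CPU", 4.0}, {"GPU", 1.0}};

  HeartbeatReport first, second, third;
  reporter.Fill(resources, &first);   // Changed: the first report always carries the payload.
  reporter.Fill(resources, &second);  // Unchanged: light mode leaves the payload empty.
  resources["CPU"] = 2.0;             // A task consumed 2 CPUs.
  reporter.Fill(resources, &third);   // Changed again: the payload is re-sent.

  std::cout << first.resources_available_changed << " "
            << second.resources_available_changed << " "
            << third.resources_available_changed << std::endl;  // Prints: 1 0 1
  return 0;
}

Compiled with any C++14 compiler, the example prints 1 0 1: the unchanged second heartbeat carries no resource payload, which is the bandwidth saving the light heartbeat mode is after.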