Revert "[GCS]Open light heartbeat by default (#11689)" (#11861)

This reverts commit 612ddb2dd1.
This commit is contained in:
SangBin Cho 2020-11-06 14:34:59 -08:00 committed by GitHub
parent 871cde989a
commit e0ecf5d79d
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 61 additions and 71 deletions

View file

@ -39,7 +39,7 @@ RAY_CONFIG(int64_t, raylet_heartbeat_timeout_milliseconds, 100)
/// Whether to send heartbeat lightly. When it is enalbed, only changed part,
/// like should_global_gc or changed resources, will be included in the heartbeat,
/// and gcs only broadcast the changed heartbeat.
RAY_CONFIG(bool, light_heartbeat_enabled, true)
RAY_CONFIG(bool, light_heartbeat_enabled, false)
/// If a component has not sent a heartbeat in the last num_heartbeats_timeout
/// heartbeat intervals, the raylet monitor process will report
/// it as dead to the db_client table.

View file

@ -344,7 +344,7 @@ void GcsNodeManager::HandleGetAllHeartbeat(const rpc::GetAllHeartbeatRequest &re
if (!node_heartbeats_.empty()) {
auto batch = std::make_shared<rpc::HeartbeatBatchTableData>();
absl::flat_hash_map<ResourceSet, rpc::ResourceDemand> aggregate_load;
for (const auto &heartbeat : node_heartbeats_) {
for (auto &heartbeat : node_heartbeats_) {
// Aggregate the load reported by each raylet.
auto load = heartbeat.second.resource_load_by_shape();
for (const auto &demand : load.resource_demands()) {
@ -361,13 +361,14 @@ void GcsNodeManager::HandleGetAllHeartbeat(const rpc::GetAllHeartbeatRequest &re
demand.backlog_size());
}
}
heartbeat.second.clear_resource_load_by_shape();
batch->add_batch()->CopyFrom(heartbeat.second);
batch->add_batch()->Swap(&heartbeat.second);
}
for (const auto &demand : aggregate_load) {
for (auto &demand : aggregate_load) {
auto demand_proto = batch->mutable_resource_load_by_shape()->add_resource_demands();
demand_proto->CopyFrom(demand.second);
demand_proto->Swap(&demand.second);
for (const auto &resource_pair : demand.first.GetResourceMap()) {
(*demand_proto->mutable_shape())[resource_pair.first] = resource_pair.second;
}
@ -405,8 +406,6 @@ void GcsNodeManager::UpdateNodeHeartbeat(const NodeID node_id,
if (request.heartbeat().resource_load_changed()) {
(*iter->second.mutable_resource_load()) = request.heartbeat().resource_load();
}
(*iter->second.mutable_resource_load_by_shape()) =
request.heartbeat().resource_load_by_shape();
}
}
@ -525,7 +524,6 @@ void GcsNodeManager::StartNodeFailureDetector() {
void GcsNodeManager::UpdateNodeRealtimeResources(
const NodeID &node_id, const rpc::HeartbeatTableData &heartbeat) {
if (!RayConfig::instance().light_heartbeat_enabled() ||
cluster_realtime_resources_.count(node_id) == 0 ||
heartbeat.resources_available_changed()) {
auto resources_available = MapFromProtobuf(heartbeat.resources_available());
cluster_realtime_resources_[node_id] =
@ -558,6 +556,7 @@ void GcsNodeManager::AddDeadNodeToCache(std::shared_ptr<rpc::GcsNodeInfo> node)
void GcsNodeManager::SendBatchedHeartbeat() {
if (!heartbeat_buffer_.empty()) {
auto batch = std::make_shared<rpc::HeartbeatBatchTableData>();
std::unordered_map<ResourceSet, rpc::ResourceDemand> aggregate_load;
for (auto &heartbeat : heartbeat_buffer_) {
batch->add_batch()->Swap(&heartbeat.second);
}

View file

@ -483,12 +483,10 @@ void NodeManager::Heartbeat() {
}
}
if (!new_scheduler_enabled_) {
// Add resource load by shape. This will be used by the new autoscaler.
auto resource_load = local_queues_.GetResourceLoadByShape(
RayConfig::instance().max_resource_shapes_per_load_report());
heartbeat_data->mutable_resource_load_by_shape()->Swap(&resource_load);
}
// Add resource load by shape. This will be used by the new autoscaler.
auto resource_load = local_queues_.GetResourceLoadByShape(
RayConfig::instance().max_resource_shapes_per_load_report());
heartbeat_data->mutable_resource_load_by_shape()->Swap(&resource_load);
// Set the global gc bit on the outgoing heartbeat message.
if (should_global_gc_) {
@ -886,7 +884,7 @@ void NodeManager::HeartbeatAdded(const NodeID &client_id,
ResourceSet remote_total(MapFromProtobuf(heartbeat_data.resources_total()));
remote_resources.SetTotalResources(std::move(remote_total));
}
if (heartbeat_data.resources_available_changed()) {
if (heartbeat_data.resource_load_changed()) {
ResourceSet remote_available(MapFromProtobuf(heartbeat_data.resources_available()));
remote_resources.SetAvailableResources(std::move(remote_available));
}

View file

@ -150,10 +150,6 @@ class TaskResourceInstances {
/// Total and available capacities of each resource of a node.
class NodeResources {
public:
NodeResources() {}
NodeResources(const NodeResources &other)
: predefined_resources(other.predefined_resources),
custom_resources(other.custom_resources) {}
/// Available and total capacities for predefined resources.
std::vector<ResourceCapacity> predefined_resources;
/// Map containing custom resources. The key of each entry represents the

View file

@ -804,16 +804,17 @@ void ClusterResourceScheduler::FreeLocalTaskResources(
}
void ClusterResourceScheduler::Heartbeat(
bool light_heartbeat_enabled, std::shared_ptr<HeartbeatTableData> heartbeat_data) {
bool light_heartbeat_enabled,
std::shared_ptr<HeartbeatTableData> heartbeat_data) const {
NodeResources resources;
RAY_CHECK(GetNodeResources(local_node_id_, &resources))
<< "Error: Populating heartbeat failed. Please file a bug report: "
"https://github.com/ray-project/ray/issues/new.";
if (light_heartbeat_enabled && last_report_resources_ &&
resources == *last_report_resources_.get()) {
return;
if (light_heartbeat_enabled) {
// TODO
RAY_CHECK(false) << "TODO";
} else {
for (int i = 0; i < PredefinedResources_MAX; i++) {
const auto &label = ResourceEnumToString((PredefinedResources)i);
@ -839,10 +840,6 @@ void ClusterResourceScheduler::Heartbeat(
(*heartbeat_data->mutable_resources_total())[label] = capacity.total.Double();
}
}
heartbeat_data->set_resources_available_changed(true);
if (light_heartbeat_enabled) {
last_report_resources_.reset(new NodeResources(resources));
}
}
}

View file

@ -49,8 +49,6 @@ class ClusterResourceScheduler {
/// Keep the mapping between node and resource IDs in string representation
/// to integer representation. Used for improving map performance.
StringIdMap string_to_int_map_;
/// Cached resources, used to compare with newest one in light heartbeat mode.
std::unique_ptr<NodeResources> last_report_resources_;
/// Set predefined resources.
///
@ -384,7 +382,8 @@ class ClusterResourceScheduler {
/// \param light_heartbeat_enabled Only send changed fields if true.
/// \param Output parameter. `resources_available` and `resources_total` are the only
/// fields used.
void Heartbeat(bool light_heartbeat_enabled, std::shared_ptr<HeartbeatTableData> data);
void Heartbeat(bool light_heartbeat_enabled,
std::shared_ptr<HeartbeatTableData> data) const;
/// Return human-readable string for this scheduler state.
std::string DebugString() const;

View file

@ -229,59 +229,60 @@ bool ClusterTaskManager::CancelTask(const TaskID &task_id) {
void ClusterTaskManager::Heartbeat(bool light_heartbeat_enabled,
std::shared_ptr<HeartbeatTableData> data) const {
// TODO (WangTao): Find a way to check if load changed and combine it with light
// heartbeat. Now we just report it every time.
data->set_resource_load_changed(true);
auto resource_loads = data->mutable_resource_load();
auto resource_load_by_shape =
data->mutable_resource_load_by_shape()->mutable_resource_demands();
// TODO (Alex): Implement the 1-CPU task optimization.
for (const auto &pair : tasks_to_schedule_) {
const auto &scheduling_class = pair.first;
const auto &resources =
TaskSpecification::GetSchedulingClassDescriptor(scheduling_class)
.GetResourceMap();
const auto &queue = pair.second;
const auto &count = queue.size();
if (light_heartbeat_enabled) {
RAY_CHECK(false) << "TODO";
} else {
// TODO (Alex): Implement the 1-CPU task optimization.
for (const auto &pair : tasks_to_schedule_) {
const auto &scheduling_class = pair.first;
const auto &resources =
TaskSpecification::GetSchedulingClassDescriptor(scheduling_class)
.GetResourceMap();
const auto &queue = pair.second;
const auto &count = queue.size();
auto by_shape_entry = resource_load_by_shape->Add();
auto by_shape_entry = resource_load_by_shape->Add();
for (const auto &resource : resources) {
// Add to `resource_loads`.
const auto &label = resource.first;
const auto &quantity = resource.second;
(*resource_loads)[label] += quantity * count;
for (const auto &resource : resources) {
// Add to `resource_loads`.
const auto &label = resource.first;
const auto &quantity = resource.second;
(*resource_loads)[label] += quantity * count;
// Add to `resource_load_by_shape`.
(*by_shape_entry->mutable_shape())[label] = quantity;
// TODO (Alex): Technically being on `tasks_to_schedule` could also mean
// that the entire cluster is utilized.
by_shape_entry->set_num_infeasible_requests_queued(count);
// Add to `resource_load_by_shape`.
(*by_shape_entry->mutable_shape())[label] = quantity;
// TODO (Alex): Technically being on `tasks_to_schedule` could also mean
// that the entire cluster is utilized.
by_shape_entry->set_num_infeasible_requests_queued(count);
}
}
}
for (const auto &pair : tasks_to_dispatch_) {
const auto &scheduling_class = pair.first;
const auto &resources =
TaskSpecification::GetSchedulingClassDescriptor(scheduling_class)
.GetResourceMap();
const auto &queue = pair.second;
const auto &count = queue.size();
for (const auto &pair : tasks_to_dispatch_) {
const auto &scheduling_class = pair.first;
const auto &resources =
TaskSpecification::GetSchedulingClassDescriptor(scheduling_class)
.GetResourceMap();
const auto &queue = pair.second;
const auto &count = queue.size();
auto by_shape_entry = resource_load_by_shape->Add();
auto by_shape_entry = resource_load_by_shape->Add();
for (const auto &resource : resources) {
// Add to `resource_loads`.
const auto &label = resource.first;
const auto &quantity = resource.second;
(*resource_loads)[label] += quantity * count;
for (const auto &resource : resources) {
// Add to `resource_loads`.
const auto &label = resource.first;
const auto &quantity = resource.second;
(*resource_loads)[label] += quantity * count;
// Add to `resource_load_by_shape`.
(*by_shape_entry->mutable_shape())[label] = quantity;
// TODO (Alex): Technically being on `tasks_to_schedule` could also mean
// that the entire cluster is utilized.
by_shape_entry->set_num_ready_requests_queued(count);
// Add to `resource_load_by_shape`.
(*by_shape_entry->mutable_shape())[label] = quantity;
// TODO (Alex): Technically being on `tasks_to_schedule` could also mean
// that the entire cluster is utilized.
by_shape_entry->set_num_ready_requests_queued(count);
}
}
}
}