mirror of
https://github.com/vale981/ray
synced 2025-03-06 10:31:39 -05:00
[GCS] Remove unused ReportBatchHeartbeat/SubscribeHeartbeat (#11567)
* Remove unused message ReportBatchHeartbeat * add up
This commit is contained in:
parent
11f1bbf03c
commit
0fbee4da0c
8 changed files with 0 additions and 105 deletions
|
@ -574,24 +574,6 @@ class NodeInfoAccessor {
|
|||
/// Resend heartbeat when GCS restarts from a failure.
|
||||
virtual void AsyncReReportHeartbeat() = 0;
|
||||
|
||||
/// Subscribe to the heartbeat of each node from GCS.
|
||||
///
|
||||
/// \param subscribe Callback that will be called each time when heartbeat is updated.
|
||||
/// \param done Callback that will be called when subscription is complete.
|
||||
/// \return Status
|
||||
virtual Status AsyncSubscribeHeartbeat(
|
||||
const SubscribeCallback<NodeID, rpc::HeartbeatTableData> &subscribe,
|
||||
const StatusCallback &done) = 0;
|
||||
|
||||
/// Report state of all nodes to GCS asynchronously.
|
||||
///
|
||||
/// \param data_ptr The heartbeats that will be reported to GCS.
|
||||
/// \param callback Callback that will be called after report finishes.
|
||||
/// \return Status
|
||||
virtual Status AsyncReportBatchHeartbeat(
|
||||
const std::shared_ptr<rpc::HeartbeatBatchTableData> &data_ptr,
|
||||
const StatusCallback &callback) = 0;
|
||||
|
||||
/// Subscribe batched state of all nodes from GCS.
|
||||
///
|
||||
/// \param subscribe Callback that will be called each time when batch heartbeat is
|
||||
|
|
|
@ -706,24 +706,6 @@ void ServiceBasedNodeInfoAccessor::AsyncReReportHeartbeat() {
|
|||
}
|
||||
}
|
||||
|
||||
Status ServiceBasedNodeInfoAccessor::AsyncSubscribeHeartbeat(
|
||||
const SubscribeCallback<NodeID, rpc::HeartbeatTableData> &subscribe,
|
||||
const StatusCallback &done) {
|
||||
const std::string error_msg =
|
||||
"Unsupported method of AsyncSubscribeHeartbeat in ServiceBasedNodeInfoAccessor.";
|
||||
RAY_LOG(FATAL) << error_msg;
|
||||
return Status::Invalid(error_msg);
|
||||
}
|
||||
|
||||
Status ServiceBasedNodeInfoAccessor::AsyncReportBatchHeartbeat(
|
||||
const std::shared_ptr<rpc::HeartbeatBatchTableData> &data_ptr,
|
||||
const StatusCallback &callback) {
|
||||
const std::string error_msg =
|
||||
"Unsupported method of AsyncReportBatchHeartbeat in ServiceBasedNodeInfoAccessor.";
|
||||
RAY_LOG(FATAL) << error_msg;
|
||||
return Status::Invalid(error_msg);
|
||||
}
|
||||
|
||||
Status ServiceBasedNodeInfoAccessor::AsyncSubscribeBatchHeartbeat(
|
||||
const ItemCallback<rpc::HeartbeatBatchTableData> &subscribe,
|
||||
const StatusCallback &done) {
|
||||
|
|
|
@ -193,14 +193,6 @@ class ServiceBasedNodeInfoAccessor : public NodeInfoAccessor {
|
|||
|
||||
void AsyncReReportHeartbeat() override;
|
||||
|
||||
Status AsyncSubscribeHeartbeat(
|
||||
const SubscribeCallback<NodeID, rpc::HeartbeatTableData> &subscribe,
|
||||
const StatusCallback &done) override;
|
||||
|
||||
Status AsyncReportBatchHeartbeat(
|
||||
const std::shared_ptr<rpc::HeartbeatBatchTableData> &data_ptr,
|
||||
const StatusCallback &callback) override;
|
||||
|
||||
Status AsyncSubscribeBatchHeartbeat(
|
||||
const ItemCallback<rpc::HeartbeatBatchTableData> &subscribe,
|
||||
const StatusCallback &done) override;
|
||||
|
|
|
@ -380,21 +380,6 @@ struct GcsServerMocker {
|
|||
return Status::NotImplemented("");
|
||||
}
|
||||
|
||||
Status AsyncSubscribeHeartbeat(
|
||||
const gcs::SubscribeCallback<NodeID, rpc::HeartbeatTableData> &subscribe,
|
||||
const gcs::StatusCallback &done) override {
|
||||
return Status::NotImplemented("");
|
||||
}
|
||||
|
||||
Status AsyncReportBatchHeartbeat(
|
||||
const std::shared_ptr<rpc::HeartbeatBatchTableData> &data_ptr,
|
||||
const gcs::StatusCallback &callback) override {
|
||||
if (callback) {
|
||||
callback(Status::OK());
|
||||
}
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status AsyncSubscribeBatchHeartbeat(
|
||||
const gcs::ItemCallback<rpc::HeartbeatBatchTableData> &subscribe,
|
||||
const gcs::StatusCallback &done) override {
|
||||
|
|
|
@ -495,7 +495,6 @@ Status RedisObjectInfoAccessor::AsyncUnsubscribeToLocations(const ObjectID &obje
|
|||
RedisNodeInfoAccessor::RedisNodeInfoAccessor(RedisGcsClient *client_impl)
|
||||
: client_impl_(client_impl),
|
||||
resource_sub_executor_(client_impl_->resource_table()),
|
||||
heartbeat_sub_executor_(client_impl->heartbeat_table()),
|
||||
heartbeat_batch_sub_executor_(client_impl->heartbeat_batch_table()) {}
|
||||
|
||||
Status RedisNodeInfoAccessor::RegisterSelf(const GcsNodeInfo &local_node_info) {
|
||||
|
@ -600,30 +599,6 @@ Status RedisNodeInfoAccessor::AsyncReportHeartbeat(
|
|||
|
||||
void RedisNodeInfoAccessor::AsyncReReportHeartbeat() {}
|
||||
|
||||
Status RedisNodeInfoAccessor::AsyncSubscribeHeartbeat(
|
||||
const SubscribeCallback<NodeID, HeartbeatTableData> &subscribe,
|
||||
const StatusCallback &done) {
|
||||
RAY_CHECK(subscribe != nullptr);
|
||||
auto on_subscribe = [subscribe](const NodeID &node_id, const HeartbeatTableData &data) {
|
||||
subscribe(node_id, data);
|
||||
};
|
||||
|
||||
return heartbeat_sub_executor_.AsyncSubscribeAll(NodeID::Nil(), on_subscribe, done);
|
||||
}
|
||||
|
||||
Status RedisNodeInfoAccessor::AsyncReportBatchHeartbeat(
|
||||
const std::shared_ptr<HeartbeatBatchTableData> &data_ptr,
|
||||
const StatusCallback &callback) {
|
||||
HeartbeatBatchTable::WriteCallback on_done = nullptr;
|
||||
if (callback != nullptr) {
|
||||
on_done = [callback](RedisGcsClient *client, const NodeID &node_id,
|
||||
const HeartbeatBatchTableData &data) { callback(Status::OK()); };
|
||||
}
|
||||
|
||||
HeartbeatBatchTable &hb_batch_table = client_impl_->heartbeat_batch_table();
|
||||
return hb_batch_table.Add(JobID::Nil(), NodeID::Nil(), data_ptr, on_done);
|
||||
}
|
||||
|
||||
Status RedisNodeInfoAccessor::AsyncSubscribeBatchHeartbeat(
|
||||
const ItemCallback<HeartbeatBatchTableData> &subscribe, const StatusCallback &done) {
|
||||
RAY_CHECK(subscribe != nullptr);
|
||||
|
|
|
@ -369,14 +369,6 @@ class RedisNodeInfoAccessor : public NodeInfoAccessor {
|
|||
|
||||
void AsyncReReportHeartbeat() override;
|
||||
|
||||
Status AsyncSubscribeHeartbeat(
|
||||
const SubscribeCallback<NodeID, HeartbeatTableData> &subscribe,
|
||||
const StatusCallback &done) override;
|
||||
|
||||
Status AsyncReportBatchHeartbeat(
|
||||
const std::shared_ptr<HeartbeatBatchTableData> &data_ptr,
|
||||
const StatusCallback &callback) override;
|
||||
|
||||
Status AsyncSubscribeBatchHeartbeat(
|
||||
const ItemCallback<HeartbeatBatchTableData> &subscribe,
|
||||
const StatusCallback &done) override;
|
||||
|
@ -401,10 +393,6 @@ class RedisNodeInfoAccessor : public NodeInfoAccessor {
|
|||
DynamicResourceSubscriptionExecutor;
|
||||
DynamicResourceSubscriptionExecutor resource_sub_executor_;
|
||||
|
||||
typedef SubscriptionExecutor<NodeID, HeartbeatTableData, HeartbeatTable>
|
||||
HeartbeatSubscriptionExecutor;
|
||||
HeartbeatSubscriptionExecutor heartbeat_sub_executor_;
|
||||
|
||||
typedef SubscriptionExecutor<NodeID, HeartbeatBatchTableData, HeartbeatBatchTable>
|
||||
HeartbeatBatchSubscriptionExecutor;
|
||||
HeartbeatBatchSubscriptionExecutor heartbeat_batch_sub_executor_;
|
||||
|
|
|
@ -207,7 +207,6 @@ template class SubscriptionExecutor<TaskID, boost::optional<TaskLeaseData>,
|
|||
TaskLeaseTable>;
|
||||
template class SubscriptionExecutor<NodeID, ResourceChangeNotification,
|
||||
DynamicResourceTable>;
|
||||
template class SubscriptionExecutor<NodeID, HeartbeatTableData, HeartbeatTable>;
|
||||
template class SubscriptionExecutor<NodeID, HeartbeatBatchTableData, HeartbeatBatchTable>;
|
||||
template class SubscriptionExecutor<WorkerID, WorkerTableData, WorkerTable>;
|
||||
|
||||
|
|
|
@ -198,14 +198,6 @@ message ReportHeartbeatReply {
|
|||
GcsStatus status = 1;
|
||||
}
|
||||
|
||||
message ReportBatchHeartbeatRequest {
|
||||
HeartbeatBatchTableData heartbeat_batch = 1;
|
||||
}
|
||||
|
||||
message ReportBatchHeartbeatReply {
|
||||
GcsStatus status = 1;
|
||||
}
|
||||
|
||||
message GetResourcesRequest {
|
||||
bytes node_id = 1;
|
||||
}
|
||||
|
|
Loading…
Add table
Reference in a new issue