mirror of
https://github.com/vale981/ray
synced 2025-03-06 02:21:39 -05:00
Add node table subscribe retry when gcs service restart (#8591)
This commit is contained in:
parent
7e5b3dc0d9
commit
c41976938d
7 changed files with 135 additions and 47 deletions
|
@ -542,6 +542,12 @@ class NodeInfoAccessor {
|
|||
const ItemCallback<rpc::HeartbeatBatchTableData> &subscribe,
|
||||
const StatusCallback &done) = 0;
|
||||
|
||||
/// Reestablish subscription.
|
||||
/// This should be called when GCS server restarts from a failure.
|
||||
///
|
||||
/// \return Status
|
||||
virtual Status AsyncReSubscribe() = 0;
|
||||
|
||||
protected:
|
||||
NodeInfoAccessor() = default;
|
||||
};
|
||||
|
|
|
@ -514,31 +514,35 @@ Status ServiceBasedNodeInfoAccessor::AsyncSubscribeToNodeChange(
|
|||
RAY_CHECK(node_change_callback_ == nullptr);
|
||||
node_change_callback_ = subscribe;
|
||||
|
||||
auto on_subscribe = [this](const std::string &id, const std::string &data) {
|
||||
GcsNodeInfo node_info;
|
||||
node_info.ParseFromString(data);
|
||||
HandleNotification(node_info);
|
||||
};
|
||||
|
||||
auto on_done = [this, subscribe, done](const Status &status) {
|
||||
// Get nodes from GCS Service.
|
||||
auto callback = [this, subscribe, done](
|
||||
const Status &status,
|
||||
const std::vector<GcsNodeInfo> &node_info_list) {
|
||||
for (auto &node_info : node_info_list) {
|
||||
HandleNotification(node_info);
|
||||
}
|
||||
if (done) {
|
||||
done(status);
|
||||
}
|
||||
RAY_CHECK(subscribe != nullptr);
|
||||
subscribe_node_operation_ = [this, subscribe](const StatusCallback &done) {
|
||||
auto on_subscribe = [this](const std::string &id, const std::string &data) {
|
||||
GcsNodeInfo node_info;
|
||||
node_info.ParseFromString(data);
|
||||
HandleNotification(node_info);
|
||||
};
|
||||
RAY_CHECK_OK(AsyncGetAll(callback));
|
||||
};
|
||||
|
||||
auto status =
|
||||
client_impl_->GetGcsPubSub().SubscribeAll(NODE_CHANNEL, on_subscribe, on_done);
|
||||
RAY_LOG(DEBUG) << "Finished subscribing node change.";
|
||||
return status;
|
||||
auto on_done = [this, subscribe, done](const Status &status) {
|
||||
// Get nodes from GCS Service.
|
||||
auto callback = [this, subscribe, done](
|
||||
const Status &status,
|
||||
const std::vector<GcsNodeInfo> &node_info_list) {
|
||||
for (auto &node_info : node_info_list) {
|
||||
HandleNotification(node_info);
|
||||
}
|
||||
if (done) {
|
||||
done(status);
|
||||
}
|
||||
};
|
||||
RAY_CHECK_OK(AsyncGetAll(callback));
|
||||
};
|
||||
|
||||
auto status =
|
||||
client_impl_->GetGcsPubSub().SubscribeAll(NODE_CHANNEL, on_subscribe, on_done);
|
||||
RAY_LOG(DEBUG) << "Finished subscribing node change.";
|
||||
return status;
|
||||
};
|
||||
return subscribe_node_operation_(done);
|
||||
}
|
||||
|
||||
boost::optional<GcsNodeInfo> ServiceBasedNodeInfoAccessor::Get(
|
||||
|
@ -641,16 +645,19 @@ Status ServiceBasedNodeInfoAccessor::AsyncSubscribeToResources(
|
|||
RAY_LOG(DEBUG) << "Subscribing node resources change.";
|
||||
RAY_CHECK(subscribe != nullptr);
|
||||
|
||||
auto on_subscribe = [subscribe](const std::string &id, const std::string &data) {
|
||||
rpc::NodeResourceChange node_resource_change;
|
||||
node_resource_change.ParseFromString(data);
|
||||
subscribe(node_resource_change);
|
||||
};
|
||||
subscribe_resource_operation_ = [this, subscribe](const StatusCallback &done) {
|
||||
auto on_subscribe = [subscribe](const std::string &id, const std::string &data) {
|
||||
rpc::NodeResourceChange node_resource_change;
|
||||
node_resource_change.ParseFromString(data);
|
||||
subscribe(node_resource_change);
|
||||
};
|
||||
|
||||
auto status = client_impl_->GetGcsPubSub().SubscribeAll(NODE_RESOURCE_CHANNEL,
|
||||
on_subscribe, done);
|
||||
RAY_LOG(DEBUG) << "Finished subscribing node resources change.";
|
||||
return status;
|
||||
auto status = client_impl_->GetGcsPubSub().SubscribeAll(NODE_RESOURCE_CHANNEL,
|
||||
on_subscribe, done);
|
||||
RAY_LOG(DEBUG) << "Finished subscribing node resources change.";
|
||||
return status;
|
||||
};
|
||||
return subscribe_resource_operation_(done);
|
||||
}
|
||||
|
||||
Status ServiceBasedNodeInfoAccessor::AsyncReportHeartbeat(
|
||||
|
@ -690,15 +697,19 @@ Status ServiceBasedNodeInfoAccessor::AsyncSubscribeBatchHeartbeat(
|
|||
const StatusCallback &done) {
|
||||
RAY_LOG(DEBUG) << "Subscribing batch heartbeat.";
|
||||
RAY_CHECK(subscribe != nullptr);
|
||||
auto on_subscribe = [subscribe](const std::string &id, const std::string &data) {
|
||||
rpc::HeartbeatBatchTableData heartbeat_batch_table_data;
|
||||
heartbeat_batch_table_data.ParseFromString(data);
|
||||
subscribe(heartbeat_batch_table_data);
|
||||
|
||||
subscribe_batch_heartbeat_operation_ = [this, subscribe](const StatusCallback &done) {
|
||||
auto on_subscribe = [subscribe](const std::string &id, const std::string &data) {
|
||||
rpc::HeartbeatBatchTableData heartbeat_batch_table_data;
|
||||
heartbeat_batch_table_data.ParseFromString(data);
|
||||
subscribe(heartbeat_batch_table_data);
|
||||
};
|
||||
auto status = client_impl_->GetGcsPubSub().Subscribe(
|
||||
HEARTBEAT_BATCH_CHANNEL, ClientID::Nil().Hex(), on_subscribe, done);
|
||||
RAY_LOG(DEBUG) << "Finished subscribing batch heartbeat.";
|
||||
return status;
|
||||
};
|
||||
auto status = client_impl_->GetGcsPubSub().Subscribe(
|
||||
HEARTBEAT_BATCH_CHANNEL, ClientID::Nil().Hex(), on_subscribe, done);
|
||||
RAY_LOG(DEBUG) << "Finished subscribing batch heartbeat.";
|
||||
return status;
|
||||
return subscribe_batch_heartbeat_operation_(done);
|
||||
}
|
||||
|
||||
void ServiceBasedNodeInfoAccessor::HandleNotification(const GcsNodeInfo &node_info) {
|
||||
|
@ -740,6 +751,20 @@ void ServiceBasedNodeInfoAccessor::HandleNotification(const GcsNodeInfo &node_in
|
|||
}
|
||||
}
|
||||
|
||||
Status ServiceBasedNodeInfoAccessor::AsyncReSubscribe() {
|
||||
RAY_LOG(INFO) << "Reestablishing subscription for node info.";
|
||||
if (subscribe_node_operation_ != nullptr) {
|
||||
return subscribe_node_operation_(nullptr);
|
||||
}
|
||||
if (subscribe_resource_operation_ != nullptr) {
|
||||
return subscribe_resource_operation_(nullptr);
|
||||
}
|
||||
if (subscribe_batch_heartbeat_operation_ != nullptr) {
|
||||
return subscribe_batch_heartbeat_operation_(nullptr);
|
||||
}
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
ServiceBasedTaskInfoAccessor::ServiceBasedTaskInfoAccessor(
|
||||
ServiceBasedGcsClient *client_impl)
|
||||
: client_impl_(client_impl) {}
|
||||
|
|
|
@ -185,7 +185,15 @@ class ServiceBasedNodeInfoAccessor : public NodeInfoAccessor {
|
|||
const ItemCallback<rpc::HeartbeatBatchTableData> &subscribe,
|
||||
const StatusCallback &done) override;
|
||||
|
||||
Status AsyncReSubscribe() override;
|
||||
|
||||
private:
|
||||
/// Save the subscribe operation in this function, so we can call it again when GCS
|
||||
/// restarts from a failure.
|
||||
SubscribeOperation subscribe_node_operation_;
|
||||
SubscribeOperation subscribe_resource_operation_;
|
||||
SubscribeOperation subscribe_batch_heartbeat_operation_;
|
||||
|
||||
void HandleNotification(const GcsNodeInfo &node_info);
|
||||
|
||||
ServiceBasedGcsClient *client_impl_;
|
||||
|
|
|
@ -50,6 +50,7 @@ Status ServiceBasedGcsClient::Connect(boost::asio::io_service &io_service) {
|
|||
auto re_subscribe = [this]() {
|
||||
RAY_CHECK_OK(job_accessor_->AsyncReSubscribe());
|
||||
RAY_CHECK_OK(actor_accessor_->AsyncReSubscribe());
|
||||
RAY_CHECK_OK(node_accessor_->AsyncReSubscribe());
|
||||
};
|
||||
|
||||
// Connect to gcs service.
|
||||
|
|
|
@ -260,9 +260,12 @@ class ServiceBasedGcsClientTest : public RedisServiceManagerForTest {
|
|||
return resource_map;
|
||||
}
|
||||
|
||||
bool UpdateResources(const ClientID &node_id,
|
||||
const gcs::NodeInfoAccessor::ResourceMap &resource_map) {
|
||||
bool UpdateResources(const ClientID &node_id, const std::string &key) {
|
||||
std::promise<bool> promise;
|
||||
gcs::NodeInfoAccessor::ResourceMap resource_map;
|
||||
auto resource = std::make_shared<rpc::ResourceTableData>();
|
||||
resource->set_resource_capacity(1.0);
|
||||
resource_map[key] = resource;
|
||||
RAY_CHECK_OK(gcs_client_->Nodes().AsyncUpdateResources(
|
||||
node_id, resource_map,
|
||||
[&promise](Status status) { promise.set_value(status.ok()); }));
|
||||
|
@ -639,12 +642,8 @@ TEST_F(ServiceBasedGcsClientTest, TestNodeResources) {
|
|||
|
||||
// Update resources of node in GCS.
|
||||
ClientID node_id = ClientID::FromBinary(node_info->node_id());
|
||||
gcs::NodeInfoAccessor::ResourceMap resource_map;
|
||||
std::string key = "CPU";
|
||||
auto resource = std::make_shared<rpc::ResourceTableData>();
|
||||
resource->set_resource_capacity(1.0);
|
||||
resource_map[key] = resource;
|
||||
ASSERT_TRUE(UpdateResources(node_id, resource_map));
|
||||
ASSERT_TRUE(UpdateResources(node_id, key));
|
||||
WaitPendingDone(add_count, 1);
|
||||
ASSERT_TRUE(GetResources(node_id).count(key));
|
||||
|
||||
|
@ -832,6 +831,7 @@ TEST_F(ServiceBasedGcsClientTest, TestJobTableReSubscribe) {
|
|||
}
|
||||
|
||||
TEST_F(ServiceBasedGcsClientTest, TestActorTableReSubscribe) {
|
||||
// Test that subscription of the actor table can still work when GCS server restarts.
|
||||
JobID job_id = JobID::FromInt(1);
|
||||
auto actor1_table_data = Mocker::GenActorTableData(job_id);
|
||||
auto actor1_id = ActorID::FromBinary(actor1_table_data->actor_id());
|
||||
|
@ -875,6 +875,48 @@ TEST_F(ServiceBasedGcsClientTest, TestActorTableReSubscribe) {
|
|||
WaitPendingDone(actor2_update_count, 1);
|
||||
}
|
||||
|
||||
TEST_F(ServiceBasedGcsClientTest, TestNodeTableReSubscribe) {
|
||||
// Test that subscription of the node table can still work when GCS server restarts.
|
||||
// Subscribe to node addition and removal events from GCS and cache those information.
|
||||
std::atomic<int> node_change_count(0);
|
||||
auto node_subscribe = [&node_change_count](const ClientID &id,
|
||||
const rpc::GcsNodeInfo &result) {
|
||||
++node_change_count;
|
||||
};
|
||||
ASSERT_TRUE(SubscribeToNodeChange(node_subscribe));
|
||||
|
||||
// Subscribe to node resource changes.
|
||||
std::atomic<int> resource_change_count(0);
|
||||
auto resource_subscribe =
|
||||
[&resource_change_count](const rpc::NodeResourceChange &result) {
|
||||
++resource_change_count;
|
||||
};
|
||||
ASSERT_TRUE(SubscribeToResources(resource_subscribe));
|
||||
|
||||
// Subscribe batched state of all nodes from GCS.
|
||||
std::atomic<int> batch_heartbeat_count(0);
|
||||
auto batch_heartbeat_subscribe =
|
||||
[&batch_heartbeat_count](const rpc::HeartbeatBatchTableData &result) {
|
||||
++batch_heartbeat_count;
|
||||
};
|
||||
ASSERT_TRUE(SubscribeBatchHeartbeat(batch_heartbeat_subscribe));
|
||||
|
||||
RestartGcsServer();
|
||||
|
||||
auto node_info = Mocker::GenNodeInfo(1);
|
||||
ASSERT_TRUE(RegisterNode(*node_info));
|
||||
ClientID node_id = ClientID::FromBinary(node_info->node_id());
|
||||
std::string key = "CPU";
|
||||
ASSERT_TRUE(UpdateResources(node_id, key));
|
||||
auto heartbeat = std::make_shared<rpc::HeartbeatTableData>();
|
||||
heartbeat->set_client_id(node_info->node_id());
|
||||
ASSERT_TRUE(ReportHeartbeat(heartbeat));
|
||||
|
||||
WaitPendingDone(node_change_count, 1);
|
||||
WaitPendingDone(resource_change_count, 1);
|
||||
WaitPendingDone(batch_heartbeat_count, 1);
|
||||
}
|
||||
|
||||
TEST_F(ServiceBasedGcsClientTest, TestGcsRedisFailureDetector) {
|
||||
// Stop redis.
|
||||
TearDownTestCase();
|
||||
|
|
|
@ -350,6 +350,8 @@ struct GcsServerMocker {
|
|||
const gcs::StatusCallback &done) override {
|
||||
return Status::NotImplemented("");
|
||||
}
|
||||
|
||||
Status AsyncReSubscribe() override { return Status::NotImplemented(""); }
|
||||
};
|
||||
|
||||
class MockedErrorInfoAccessor : public gcs::ErrorInfoAccessor {
|
||||
|
|
|
@ -371,6 +371,10 @@ class RedisNodeInfoAccessor : public NodeInfoAccessor {
|
|||
const ItemCallback<HeartbeatBatchTableData> &subscribe,
|
||||
const StatusCallback &done) override;
|
||||
|
||||
Status AsyncReSubscribe() override {
|
||||
return Status::NotImplemented("AsyncReSubscribe not implemented");
|
||||
}
|
||||
|
||||
private:
|
||||
RedisGcsClient *client_impl_{nullptr};
|
||||
|
||||
|
|
Loading…
Add table
Reference in a new issue