From c41976938d363b59f6c32931d65dbd0808e297ec Mon Sep 17 00:00:00 2001 From: fangfengbin <869218239a@zju.edu.cn> Date: Tue, 26 May 2020 14:42:48 +0800 Subject: [PATCH] Add node table subscribe retry when gcs service restart (#8591) --- src/ray/gcs/accessor.h | 6 + .../gcs/gcs_client/service_based_accessor.cc | 105 +++++++++++------- .../gcs/gcs_client/service_based_accessor.h | 8 ++ .../gcs_client/service_based_gcs_client.cc | 1 + .../test/service_based_gcs_client_test.cc | 56 ++++++++-- .../gcs_server/test/gcs_server_test_util.h | 2 + src/ray/gcs/redis_accessor.h | 4 + 7 files changed, 135 insertions(+), 47 deletions(-) diff --git a/src/ray/gcs/accessor.h b/src/ray/gcs/accessor.h index 192da9b00..11335f4bd 100644 --- a/src/ray/gcs/accessor.h +++ b/src/ray/gcs/accessor.h @@ -542,6 +542,12 @@ class NodeInfoAccessor { const ItemCallback &subscribe, const StatusCallback &done) = 0; + /// Reestablish subscription. + /// This should be called when GCS server restarts from a failure. + /// + /// \return Status + virtual Status AsyncReSubscribe() = 0; + protected: NodeInfoAccessor() = default; }; diff --git a/src/ray/gcs/gcs_client/service_based_accessor.cc b/src/ray/gcs/gcs_client/service_based_accessor.cc index b6676f452..9853d0464 100644 --- a/src/ray/gcs/gcs_client/service_based_accessor.cc +++ b/src/ray/gcs/gcs_client/service_based_accessor.cc @@ -514,31 +514,35 @@ Status ServiceBasedNodeInfoAccessor::AsyncSubscribeToNodeChange( RAY_CHECK(node_change_callback_ == nullptr); node_change_callback_ = subscribe; - auto on_subscribe = [this](const std::string &id, const std::string &data) { - GcsNodeInfo node_info; - node_info.ParseFromString(data); - HandleNotification(node_info); - }; - - auto on_done = [this, subscribe, done](const Status &status) { - // Get nodes from GCS Service. - auto callback = [this, subscribe, done]( - const Status &status, - const std::vector &node_info_list) { - for (auto &node_info : node_info_list) { - HandleNotification(node_info); - } - if (done) { - done(status); - } + RAY_CHECK(subscribe != nullptr); + subscribe_node_operation_ = [this, subscribe](const StatusCallback &done) { + auto on_subscribe = [this](const std::string &id, const std::string &data) { + GcsNodeInfo node_info; + node_info.ParseFromString(data); + HandleNotification(node_info); }; - RAY_CHECK_OK(AsyncGetAll(callback)); - }; - auto status = - client_impl_->GetGcsPubSub().SubscribeAll(NODE_CHANNEL, on_subscribe, on_done); - RAY_LOG(DEBUG) << "Finished subscribing node change."; - return status; + auto on_done = [this, subscribe, done](const Status &status) { + // Get nodes from GCS Service. + auto callback = [this, subscribe, done]( + const Status &status, + const std::vector &node_info_list) { + for (auto &node_info : node_info_list) { + HandleNotification(node_info); + } + if (done) { + done(status); + } + }; + RAY_CHECK_OK(AsyncGetAll(callback)); + }; + + auto status = + client_impl_->GetGcsPubSub().SubscribeAll(NODE_CHANNEL, on_subscribe, on_done); + RAY_LOG(DEBUG) << "Finished subscribing node change."; + return status; + }; + return subscribe_node_operation_(done); } boost::optional ServiceBasedNodeInfoAccessor::Get( @@ -641,16 +645,19 @@ Status ServiceBasedNodeInfoAccessor::AsyncSubscribeToResources( RAY_LOG(DEBUG) << "Subscribing node resources change."; RAY_CHECK(subscribe != nullptr); - auto on_subscribe = [subscribe](const std::string &id, const std::string &data) { - rpc::NodeResourceChange node_resource_change; - node_resource_change.ParseFromString(data); - subscribe(node_resource_change); - }; + subscribe_resource_operation_ = [this, subscribe](const StatusCallback &done) { + auto on_subscribe = [subscribe](const std::string &id, const std::string &data) { + rpc::NodeResourceChange node_resource_change; + node_resource_change.ParseFromString(data); + subscribe(node_resource_change); + }; - auto status = client_impl_->GetGcsPubSub().SubscribeAll(NODE_RESOURCE_CHANNEL, - on_subscribe, done); - RAY_LOG(DEBUG) << "Finished subscribing node resources change."; - return status; + auto status = client_impl_->GetGcsPubSub().SubscribeAll(NODE_RESOURCE_CHANNEL, + on_subscribe, done); + RAY_LOG(DEBUG) << "Finished subscribing node resources change."; + return status; + }; + return subscribe_resource_operation_(done); } Status ServiceBasedNodeInfoAccessor::AsyncReportHeartbeat( @@ -690,15 +697,19 @@ Status ServiceBasedNodeInfoAccessor::AsyncSubscribeBatchHeartbeat( const StatusCallback &done) { RAY_LOG(DEBUG) << "Subscribing batch heartbeat."; RAY_CHECK(subscribe != nullptr); - auto on_subscribe = [subscribe](const std::string &id, const std::string &data) { - rpc::HeartbeatBatchTableData heartbeat_batch_table_data; - heartbeat_batch_table_data.ParseFromString(data); - subscribe(heartbeat_batch_table_data); + + subscribe_batch_heartbeat_operation_ = [this, subscribe](const StatusCallback &done) { + auto on_subscribe = [subscribe](const std::string &id, const std::string &data) { + rpc::HeartbeatBatchTableData heartbeat_batch_table_data; + heartbeat_batch_table_data.ParseFromString(data); + subscribe(heartbeat_batch_table_data); + }; + auto status = client_impl_->GetGcsPubSub().Subscribe( + HEARTBEAT_BATCH_CHANNEL, ClientID::Nil().Hex(), on_subscribe, done); + RAY_LOG(DEBUG) << "Finished subscribing batch heartbeat."; + return status; }; - auto status = client_impl_->GetGcsPubSub().Subscribe( - HEARTBEAT_BATCH_CHANNEL, ClientID::Nil().Hex(), on_subscribe, done); - RAY_LOG(DEBUG) << "Finished subscribing batch heartbeat."; - return status; + return subscribe_batch_heartbeat_operation_(done); } void ServiceBasedNodeInfoAccessor::HandleNotification(const GcsNodeInfo &node_info) { @@ -740,6 +751,20 @@ void ServiceBasedNodeInfoAccessor::HandleNotification(const GcsNodeInfo &node_in } } +Status ServiceBasedNodeInfoAccessor::AsyncReSubscribe() { + RAY_LOG(INFO) << "Reestablishing subscription for node info."; + if (subscribe_node_operation_ != nullptr) { + return subscribe_node_operation_(nullptr); + } + if (subscribe_resource_operation_ != nullptr) { + return subscribe_resource_operation_(nullptr); + } + if (subscribe_batch_heartbeat_operation_ != nullptr) { + return subscribe_batch_heartbeat_operation_(nullptr); + } + return Status::OK(); +} + ServiceBasedTaskInfoAccessor::ServiceBasedTaskInfoAccessor( ServiceBasedGcsClient *client_impl) : client_impl_(client_impl) {} diff --git a/src/ray/gcs/gcs_client/service_based_accessor.h b/src/ray/gcs/gcs_client/service_based_accessor.h index ddd2b8d7d..4948ae882 100644 --- a/src/ray/gcs/gcs_client/service_based_accessor.h +++ b/src/ray/gcs/gcs_client/service_based_accessor.h @@ -185,7 +185,15 @@ class ServiceBasedNodeInfoAccessor : public NodeInfoAccessor { const ItemCallback &subscribe, const StatusCallback &done) override; + Status AsyncReSubscribe() override; + private: + /// Save the subscribe operation in this function, so we can call it again when GCS + /// restarts from a failure. + SubscribeOperation subscribe_node_operation_; + SubscribeOperation subscribe_resource_operation_; + SubscribeOperation subscribe_batch_heartbeat_operation_; + void HandleNotification(const GcsNodeInfo &node_info); ServiceBasedGcsClient *client_impl_; diff --git a/src/ray/gcs/gcs_client/service_based_gcs_client.cc b/src/ray/gcs/gcs_client/service_based_gcs_client.cc index 388e939fd..b5d4c82ca 100644 --- a/src/ray/gcs/gcs_client/service_based_gcs_client.cc +++ b/src/ray/gcs/gcs_client/service_based_gcs_client.cc @@ -50,6 +50,7 @@ Status ServiceBasedGcsClient::Connect(boost::asio::io_service &io_service) { auto re_subscribe = [this]() { RAY_CHECK_OK(job_accessor_->AsyncReSubscribe()); RAY_CHECK_OK(actor_accessor_->AsyncReSubscribe()); + RAY_CHECK_OK(node_accessor_->AsyncReSubscribe()); }; // Connect to gcs service. diff --git a/src/ray/gcs/gcs_client/test/service_based_gcs_client_test.cc b/src/ray/gcs/gcs_client/test/service_based_gcs_client_test.cc index 7da0b376b..5782f3a9f 100644 --- a/src/ray/gcs/gcs_client/test/service_based_gcs_client_test.cc +++ b/src/ray/gcs/gcs_client/test/service_based_gcs_client_test.cc @@ -260,9 +260,12 @@ class ServiceBasedGcsClientTest : public RedisServiceManagerForTest { return resource_map; } - bool UpdateResources(const ClientID &node_id, - const gcs::NodeInfoAccessor::ResourceMap &resource_map) { + bool UpdateResources(const ClientID &node_id, const std::string &key) { std::promise promise; + gcs::NodeInfoAccessor::ResourceMap resource_map; + auto resource = std::make_shared(); + resource->set_resource_capacity(1.0); + resource_map[key] = resource; RAY_CHECK_OK(gcs_client_->Nodes().AsyncUpdateResources( node_id, resource_map, [&promise](Status status) { promise.set_value(status.ok()); })); @@ -639,12 +642,8 @@ TEST_F(ServiceBasedGcsClientTest, TestNodeResources) { // Update resources of node in GCS. ClientID node_id = ClientID::FromBinary(node_info->node_id()); - gcs::NodeInfoAccessor::ResourceMap resource_map; std::string key = "CPU"; - auto resource = std::make_shared(); - resource->set_resource_capacity(1.0); - resource_map[key] = resource; - ASSERT_TRUE(UpdateResources(node_id, resource_map)); + ASSERT_TRUE(UpdateResources(node_id, key)); WaitPendingDone(add_count, 1); ASSERT_TRUE(GetResources(node_id).count(key)); @@ -832,6 +831,7 @@ TEST_F(ServiceBasedGcsClientTest, TestJobTableReSubscribe) { } TEST_F(ServiceBasedGcsClientTest, TestActorTableReSubscribe) { + // Test that subscription of the actor table can still work when GCS server restarts. JobID job_id = JobID::FromInt(1); auto actor1_table_data = Mocker::GenActorTableData(job_id); auto actor1_id = ActorID::FromBinary(actor1_table_data->actor_id()); @@ -875,6 +875,48 @@ TEST_F(ServiceBasedGcsClientTest, TestActorTableReSubscribe) { WaitPendingDone(actor2_update_count, 1); } +TEST_F(ServiceBasedGcsClientTest, TestNodeTableReSubscribe) { + // Test that subscription of the node table can still work when GCS server restarts. + // Subscribe to node addition and removal events from GCS and cache those information. + std::atomic node_change_count(0); + auto node_subscribe = [&node_change_count](const ClientID &id, + const rpc::GcsNodeInfo &result) { + ++node_change_count; + }; + ASSERT_TRUE(SubscribeToNodeChange(node_subscribe)); + + // Subscribe to node resource changes. + std::atomic resource_change_count(0); + auto resource_subscribe = + [&resource_change_count](const rpc::NodeResourceChange &result) { + ++resource_change_count; + }; + ASSERT_TRUE(SubscribeToResources(resource_subscribe)); + + // Subscribe batched state of all nodes from GCS. + std::atomic batch_heartbeat_count(0); + auto batch_heartbeat_subscribe = + [&batch_heartbeat_count](const rpc::HeartbeatBatchTableData &result) { + ++batch_heartbeat_count; + }; + ASSERT_TRUE(SubscribeBatchHeartbeat(batch_heartbeat_subscribe)); + + RestartGcsServer(); + + auto node_info = Mocker::GenNodeInfo(1); + ASSERT_TRUE(RegisterNode(*node_info)); + ClientID node_id = ClientID::FromBinary(node_info->node_id()); + std::string key = "CPU"; + ASSERT_TRUE(UpdateResources(node_id, key)); + auto heartbeat = std::make_shared(); + heartbeat->set_client_id(node_info->node_id()); + ASSERT_TRUE(ReportHeartbeat(heartbeat)); + + WaitPendingDone(node_change_count, 1); + WaitPendingDone(resource_change_count, 1); + WaitPendingDone(batch_heartbeat_count, 1); +} + TEST_F(ServiceBasedGcsClientTest, TestGcsRedisFailureDetector) { // Stop redis. TearDownTestCase(); diff --git a/src/ray/gcs/gcs_server/test/gcs_server_test_util.h b/src/ray/gcs/gcs_server/test/gcs_server_test_util.h index 79f105940..a6a65d1d6 100644 --- a/src/ray/gcs/gcs_server/test/gcs_server_test_util.h +++ b/src/ray/gcs/gcs_server/test/gcs_server_test_util.h @@ -350,6 +350,8 @@ struct GcsServerMocker { const gcs::StatusCallback &done) override { return Status::NotImplemented(""); } + + Status AsyncReSubscribe() override { return Status::NotImplemented(""); } }; class MockedErrorInfoAccessor : public gcs::ErrorInfoAccessor { diff --git a/src/ray/gcs/redis_accessor.h b/src/ray/gcs/redis_accessor.h index f1d9dbd78..cf0ab23d5 100644 --- a/src/ray/gcs/redis_accessor.h +++ b/src/ray/gcs/redis_accessor.h @@ -371,6 +371,10 @@ class RedisNodeInfoAccessor : public NodeInfoAccessor { const ItemCallback &subscribe, const StatusCallback &done) override; + Status AsyncReSubscribe() override { + return Status::NotImplemented("AsyncReSubscribe not implemented"); + } + private: RedisGcsClient *client_impl_{nullptr};