From 0cee75c86a8599955b7cfdc7fa3a0dfc591b3de6 Mon Sep 17 00:00:00 2001 From: fangfengbin <869218239a@zju.edu.cn> Date: Mon, 20 Jul 2020 10:16:42 +0800 Subject: [PATCH] GCS client add fetch operation before subscribe (#9564) --- src/ray/gcs/gcs_client/service_based_accessor.cc | 15 ++++++++++++--- .../test/service_based_gcs_client_test.cc | 9 +++++++-- 2 files changed, 19 insertions(+), 5 deletions(-) diff --git a/src/ray/gcs/gcs_client/service_based_accessor.cc b/src/ray/gcs/gcs_client/service_based_accessor.cc index 3cf17b91b..16140672f 100644 --- a/src/ray/gcs/gcs_client/service_based_accessor.cc +++ b/src/ray/gcs/gcs_client/service_based_accessor.cc @@ -89,9 +89,18 @@ Status ServiceBasedJobInfoAccessor::AsyncSubscribeAll( void ServiceBasedJobInfoAccessor::AsyncResubscribe(bool is_pubsub_server_restarted) { RAY_LOG(INFO) << "Reestablishing subscription for job info."; - // If the pub-sub server has restarted, we need to resubscribe to the pub-sub server. - if (subscribe_operation_ != nullptr && is_pubsub_server_restarted) { - RAY_CHECK_OK(subscribe_operation_(nullptr)); + // If only the GCS server has restarted, we only need to fetch data from the GCS server. + // If the pub-sub server has also restarted, we need to resubscribe to the pub-sub + // server first, then fetch data from the GCS server. 
+ if (is_pubsub_server_restarted) { + if (subscribe_operation_ != nullptr) { + RAY_CHECK_OK(subscribe_operation_( + [this](const Status &status) { fetch_all_data_operation_(nullptr); })); + } + } else { + if (fetch_all_data_operation_ != nullptr) { + fetch_all_data_operation_(nullptr); + } } } diff --git a/src/ray/gcs/gcs_client/test/service_based_gcs_client_test.cc b/src/ray/gcs/gcs_client/test/service_based_gcs_client_test.cc index 21edd04ec..fd5bff5d6 100644 --- a/src/ray/gcs/gcs_client/test/service_based_gcs_client_test.cc +++ b/src/ray/gcs/gcs_client/test/service_based_gcs_client_test.cc @@ -868,11 +868,16 @@ TEST_F(ServiceBasedGcsClientTest, TestJobTableResubscribe) { }; ASSERT_TRUE(SubscribeToAllJobs(subscribe)); + ASSERT_TRUE(AddJob(job_table_data)); + WaitPendingDone(job_update_count, 1); RestartGcsServer(); - ASSERT_TRUE(AddJob(job_table_data)); - ASSERT_TRUE(MarkJobFinished(job_id)); + // The GCS client will fetch data from the GCS server after the GCS server is restarted, + // and the GCS server keeps a job record, so `job_update_count` increases by one. WaitPendingDone(job_update_count, 2); + + ASSERT_TRUE(MarkJobFinished(job_id)); + WaitPendingDone(job_update_count, 3); } TEST_F(ServiceBasedGcsClientTest, TestActorTableResubscribe) {