GCS client add fetch operation before subscribe (#9564)

This commit is contained in:
fangfengbin 2020-07-20 10:16:42 +08:00 committed by GitHub
parent 2554a1a997
commit 0cee75c86a
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 19 additions and 5 deletions

View file

@ -89,9 +89,18 @@ Status ServiceBasedJobInfoAccessor::AsyncSubscribeAll(
void ServiceBasedJobInfoAccessor::AsyncResubscribe(bool is_pubsub_server_restarted) { void ServiceBasedJobInfoAccessor::AsyncResubscribe(bool is_pubsub_server_restarted) {
RAY_LOG(INFO) << "Reestablishing subscription for job info."; RAY_LOG(INFO) << "Reestablishing subscription for job info.";
// If the pub-sub server has restarted, we need to resubscribe to the pub-sub server. // If only the GCS sever has restarted, we only need to fetch data from the GCS server.
if (subscribe_operation_ != nullptr && is_pubsub_server_restarted) { // If the pub-sub server has also restarted, we need to resubscribe to the pub-sub
RAY_CHECK_OK(subscribe_operation_(nullptr)); // server first, then fetch data from the GCS server.
if (is_pubsub_server_restarted) {
if (subscribe_operation_ != nullptr) {
RAY_CHECK_OK(subscribe_operation_(
[this](const Status &status) { fetch_all_data_operation_(nullptr); }));
}
} else {
if (fetch_all_data_operation_ != nullptr) {
fetch_all_data_operation_(nullptr);
}
} }
} }

View file

@ -868,11 +868,16 @@ TEST_F(ServiceBasedGcsClientTest, TestJobTableResubscribe) {
}; };
ASSERT_TRUE(SubscribeToAllJobs(subscribe)); ASSERT_TRUE(SubscribeToAllJobs(subscribe));
ASSERT_TRUE(AddJob(job_table_data));
WaitPendingDone(job_update_count, 1);
RestartGcsServer(); RestartGcsServer();
ASSERT_TRUE(AddJob(job_table_data)); // The GCS client will fetch data from the GCS server after the GCS server is restarted,
ASSERT_TRUE(MarkJobFinished(job_id)); // and the GCS server keeps a job record, so `job_update_count` plus one.
WaitPendingDone(job_update_count, 2); WaitPendingDone(job_update_count, 2);
ASSERT_TRUE(MarkJobFinished(job_id));
WaitPendingDone(job_update_count, 3);
} }
TEST_F(ServiceBasedGcsClientTest, TestActorTableResubscribe) { TEST_F(ServiceBasedGcsClientTest, TestActorTableResubscribe) {