Add task table subscribe retry when gcs service restart (#8601)

This commit is contained in:
fangfengbin 2020-05-26 17:47:03 +08:00 committed by GitHub
parent baa053496a
commit 01f4a6eca0
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
8 changed files with 132 additions and 51 deletions

View file

@ -309,6 +309,12 @@ class TaskInfoAccessor {
const std::shared_ptr<rpc::TaskReconstructionData> &data_ptr,
const StatusCallback &callback) = 0;
/// Reestablish subscription.
/// This should be called when GCS server restarts from a failure.
///
/// \return Status
virtual Status AsyncReSubscribe() = 0;
protected:
TaskInfoAccessor() = default;
};

View file

@ -830,38 +830,44 @@ Status ServiceBasedTaskInfoAccessor::AsyncSubscribe(
const StatusCallback &done) {
RAY_LOG(DEBUG) << "Subscribing task, task id = " << task_id;
RAY_CHECK(subscribe != nullptr) << "Failed to subscribe task, task id = " << task_id;
auto on_subscribe = [task_id, subscribe](const std::string &id,
const std::string &data) {
TaskTableData task_data;
task_data.ParseFromString(data);
subscribe(task_id, task_data);
auto subscribe_operation = [this, task_id, subscribe](const StatusCallback &done) {
auto on_subscribe = [task_id, subscribe](const std::string &id,
const std::string &data) {
TaskTableData task_data;
task_data.ParseFromString(data);
subscribe(task_id, task_data);
};
auto on_done = [this, task_id, subscribe, done](const Status &status) {
if (status.ok()) {
auto callback = [task_id, subscribe, done](
const Status &status,
const boost::optional<rpc::TaskTableData> &result) {
if (result) {
subscribe(task_id, *result);
}
if (done) {
done(status);
}
};
RAY_CHECK_OK(AsyncGet(task_id, callback));
} else if (done) {
done(status);
}
};
auto status = client_impl_->GetGcsPubSub().Subscribe(TASK_CHANNEL, task_id.Hex(),
on_subscribe, on_done);
RAY_LOG(DEBUG) << "Finished subscribing task, task id = " << task_id;
return status;
};
auto on_done = [this, task_id, subscribe, done](const Status &status) {
if (status.ok()) {
auto callback = [task_id, subscribe, done](
const Status &status,
const boost::optional<rpc::TaskTableData> &result) {
if (result) {
subscribe(task_id, *result);
}
if (done) {
done(status);
}
};
RAY_CHECK_OK(AsyncGet(task_id, callback));
} else if (done) {
done(status);
}
};
auto status = client_impl_->GetGcsPubSub().Subscribe(TASK_CHANNEL, task_id.Hex(),
on_subscribe, on_done);
RAY_LOG(DEBUG) << "Finished subscribing task, task id = " << task_id;
return status;
subscribe_task_operations_[task_id] = subscribe_operation;
return subscribe_operation(done);
}
Status ServiceBasedTaskInfoAccessor::AsyncUnsubscribe(const TaskID &task_id) {
RAY_LOG(DEBUG) << "Unsubscribing task, task id = " << task_id;
auto status = client_impl_->GetGcsPubSub().Unsubscribe(TASK_CHANNEL, task_id.Hex());
subscribe_task_operations_.erase(task_id);
RAY_LOG(DEBUG) << "Finished unsubscribing task, task id = " << task_id;
return status;
}
@ -912,37 +918,43 @@ Status ServiceBasedTaskInfoAccessor::AsyncSubscribeTaskLease(
RAY_LOG(DEBUG) << "Subscribing task lease, task id = " << task_id;
RAY_CHECK(subscribe != nullptr)
<< "Failed to subscribe task lease, task id = " << task_id;
auto on_subscribe = [task_id, subscribe](const std::string &id,
const std::string &data) {
TaskLeaseData task_lease_data;
task_lease_data.ParseFromString(data);
subscribe(task_id, task_lease_data);
auto subscribe_operation = [this, task_id, subscribe](const StatusCallback &done) {
auto on_subscribe = [task_id, subscribe](const std::string &id,
const std::string &data) {
TaskLeaseData task_lease_data;
task_lease_data.ParseFromString(data);
subscribe(task_id, task_lease_data);
};
auto on_done = [this, task_id, subscribe, done](const Status &status) {
if (status.ok()) {
auto callback = [task_id, subscribe, done](
const Status &status,
const boost::optional<rpc::TaskLeaseData> &result) {
subscribe(task_id, result);
if (done) {
done(status);
}
};
RAY_CHECK_OK(AsyncGetTaskLease(task_id, callback));
} else if (done) {
done(status);
}
};
auto status = client_impl_->GetGcsPubSub().Subscribe(
TASK_LEASE_CHANNEL, task_id.Hex(), on_subscribe, on_done);
RAY_LOG(DEBUG) << "Finished subscribing task lease, task id = " << task_id;
return status;
};
auto on_done = [this, task_id, subscribe, done](const Status &status) {
if (status.ok()) {
auto callback = [task_id, subscribe, done](
const Status &status,
const boost::optional<rpc::TaskLeaseData> &result) {
subscribe(task_id, result);
if (done) {
done(status);
}
};
RAY_CHECK_OK(AsyncGetTaskLease(task_id, callback));
} else if (done) {
done(status);
}
};
auto status = client_impl_->GetGcsPubSub().Subscribe(TASK_LEASE_CHANNEL, task_id.Hex(),
on_subscribe, on_done);
RAY_LOG(DEBUG) << "Finished subscribing task lease, task id = " << task_id;
return status;
subscribe_task_lease_operations_[task_id] = subscribe_operation;
return subscribe_operation(done);
}
Status ServiceBasedTaskInfoAccessor::AsyncUnsubscribeTaskLease(const TaskID &task_id) {
RAY_LOG(DEBUG) << "Unsubscribing task lease, task id = " << task_id;
auto status =
client_impl_->GetGcsPubSub().Unsubscribe(TASK_LEASE_CHANNEL, task_id.Hex());
subscribe_task_lease_operations_.erase(task_id);
RAY_LOG(DEBUG) << "Finished unsubscribing task lease, task id = " << task_id;
return status;
}
@ -969,6 +981,17 @@ Status ServiceBasedTaskInfoAccessor::AttemptTaskReconstruction(
return Status::OK();
}
Status ServiceBasedTaskInfoAccessor::AsyncReSubscribe() {
RAY_LOG(INFO) << "Reestablishing subscription for task info.";
for (auto &item : subscribe_task_operations_) {
RAY_CHECK_OK(item.second(nullptr));
}
for (auto &item : subscribe_task_lease_operations_) {
RAY_CHECK_OK(item.second(nullptr));
}
return Status::OK();
}
ServiceBasedObjectInfoAccessor::ServiceBasedObjectInfoAccessor(
ServiceBasedGcsClient *client_impl)
: client_impl_(client_impl) {}

View file

@ -257,7 +257,14 @@ class ServiceBasedTaskInfoAccessor : public TaskInfoAccessor {
const std::shared_ptr<rpc::TaskReconstructionData> &data_ptr,
const StatusCallback &callback) override;
Status AsyncReSubscribe() override;
private:
/// Save the subscribe operation in this function, so we can call it again when GCS
/// restarts from a failure.
std::unordered_map<TaskID, SubscribeOperation> subscribe_task_operations_;
std::unordered_map<TaskID, SubscribeOperation> subscribe_task_lease_operations_;
ServiceBasedGcsClient *client_impl_;
};

View file

@ -51,6 +51,7 @@ Status ServiceBasedGcsClient::Connect(boost::asio::io_service &io_service) {
RAY_CHECK_OK(job_accessor_->AsyncReSubscribe());
RAY_CHECK_OK(actor_accessor_->AsyncReSubscribe());
RAY_CHECK_OK(node_accessor_->AsyncReSubscribe());
RAY_CHECK_OK(task_accessor_->AsyncReSubscribe());
};
// Connect to gcs service.

View file

@ -917,6 +917,45 @@ TEST_F(ServiceBasedGcsClientTest, TestNodeTableReSubscribe) {
WaitPendingDone(batch_heartbeat_count, 1);
}
TEST_F(ServiceBasedGcsClientTest, TestTaskTableReSubscribe) {
JobID job_id = JobID::FromInt(6);
TaskID task_id = TaskID::ForDriverTask(job_id);
auto task_table_data = Mocker::GenTaskTableData(job_id.Binary(), task_id.Binary());
// Subscribe to the event that the given task is added in GCS.
std::atomic<int> task_count(0);
auto task_subscribe = [&task_count](const TaskID &task_id,
const gcs::TaskTableData &data) { ++task_count; };
ASSERT_TRUE(SubscribeTask(task_id, task_subscribe));
// Subscribe to the event that the given task lease is added in GCS.
std::atomic<int> task_lease_count(0);
auto task_lease_subscribe = [&task_lease_count](
const TaskID &task_id,
const boost::optional<rpc::TaskLeaseData> &data) {
if (data) {
++task_lease_count;
}
};
ASSERT_TRUE(SubscribeTaskLease(task_id, task_lease_subscribe));
ASSERT_TRUE(AddTask(task_table_data));
ClientID node_id = ClientID::FromRandom();
auto task_lease = Mocker::GenTaskLeaseData(task_id.Binary(), node_id.Binary());
ASSERT_TRUE(AddTaskLease(task_lease));
WaitPendingDone(task_count, 1);
WaitPendingDone(task_lease_count, 1);
UnsubscribeTask(task_id);
RestartGcsServer();
node_id = ClientID::FromRandom();
task_lease = Mocker::GenTaskLeaseData(task_id.Binary(), node_id.Binary());
ASSERT_TRUE(AddTaskLease(task_lease));
WaitPendingDone(task_lease_count, 3);
WaitPendingDone(task_count, 1);
}
TEST_F(ServiceBasedGcsClientTest, TestGcsRedisFailureDetector) {
// Stop redis.
TearDownTestCase();

View file

@ -443,7 +443,7 @@ Status RedisTaskInfoAccessor::AsyncGetTaskLease(
auto on_failure = [callback](RedisGcsClient *client, const TaskID &task_id) {
boost::optional<TaskLeaseData> result;
callback(Status::Invalid("Task not exist."), result);
callback(Status::Invalid("Task lease not exist."), result);
};
TaskLeaseTable &task_lease_table = client_impl_->task_lease_table();

View file

@ -243,6 +243,10 @@ class RedisTaskInfoAccessor : public TaskInfoAccessor {
const std::shared_ptr<TaskReconstructionData> &data_ptr,
const StatusCallback &callback) override;
Status AsyncReSubscribe() override {
return Status::NotImplemented("AsyncReSubscribe not implemented");
}
private:
RedisGcsClient *client_impl_{nullptr};
// Use a random ClientID for task subscription. Because:

View file

@ -110,6 +110,7 @@ struct Mocker {
auto task_lease_data = std::make_shared<rpc::TaskLeaseData>();
task_lease_data->set_task_id(task_id);
task_lease_data->set_node_manager_id(node_id);
task_lease_data->set_timeout(9999);
return task_lease_data;
}