[GCS] Remove task info publish as nowhere uses it (#13509)

* Remove task info publish as nowhere uses it

* simplify right publish channel
This commit is contained in:
Tao Wang 2021-01-18 17:15:03 +08:00 committed by GitHub
parent 1e2adb335e
commit 516eb77080
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 1 additions and 148 deletions

View file

@ -202,23 +202,6 @@ class TaskInfoAccessor {
virtual Status AsyncGet(const TaskID &task_id,
const OptionalItemCallback<rpc::TaskTableData> &callback) = 0;
/// Subscribe asynchronously to the event that the given task is added in GCS.
///
/// \param task_id The ID of the task to be subscribed to.
/// \param subscribe Callback that will be called each time when the task is updated.
/// \param done Callback that will be called when subscription is complete.
/// \return Status
virtual Status AsyncSubscribe(
const TaskID &task_id,
const SubscribeCallback<TaskID, rpc::TaskTableData> &subscribe,
const StatusCallback &done) = 0;
/// Cancel subscription to a task asynchronously.
///
/// \param task_id The ID of the task to be unsubscribed to.
/// \return Status
virtual Status AsyncUnsubscribe(const TaskID &task_id) = 0;
/// Add a task lease to GCS asynchronously.
///
/// \param data_ptr The task lease that will be added to GCS.
@ -274,12 +257,6 @@ class TaskInfoAccessor {
/// \param is_pubsub_server_restarted Whether pubsub server is restarted.
virtual void AsyncResubscribe(bool is_pubsub_server_restarted) = 0;
/// Check if the specified task is unsubscribed.
///
/// \param task_id The ID of the task.
/// \return Whether the specified task is unsubscribed.
virtual bool IsTaskUnsubscribed(const TaskID &task_id) = 0;
/// Check if the specified task lease is unsubscribed.
///
/// \param task_id The ID of the task.

View file

@ -887,56 +887,6 @@ Status ServiceBasedTaskInfoAccessor::AsyncGet(
return Status::OK();
}
Status ServiceBasedTaskInfoAccessor::AsyncSubscribe(
const TaskID &task_id, const SubscribeCallback<TaskID, rpc::TaskTableData> &subscribe,
const StatusCallback &done) {
RAY_CHECK(subscribe != nullptr) << "Failed to subscribe task, task id = " << task_id
<< ", job id = " << task_id.JobId();
auto fetch_data_operation = [this, task_id,
subscribe](const StatusCallback &fetch_done) {
auto callback = [task_id, subscribe, fetch_done](
const Status &status,
const boost::optional<rpc::TaskTableData> &result) {
if (result) {
subscribe(task_id, *result);
}
if (fetch_done) {
fetch_done(status);
}
};
RAY_CHECK_OK(AsyncGet(task_id, callback));
};
auto subscribe_operation = [this, task_id,
subscribe](const StatusCallback &subscribe_done) {
auto on_subscribe = [task_id, subscribe](const std::string &id,
const std::string &data) {
TaskTableData task_data;
task_data.ParseFromString(data);
subscribe(task_id, task_data);
};
return client_impl_->GetGcsPubSub().Subscribe(TASK_CHANNEL, task_id.Hex(),
on_subscribe, subscribe_done);
};
subscribe_task_operations_[task_id] = subscribe_operation;
fetch_task_data_operations_[task_id] = fetch_data_operation;
return subscribe_operation(
[fetch_data_operation, done](const Status &status) { fetch_data_operation(done); });
}
Status ServiceBasedTaskInfoAccessor::AsyncUnsubscribe(const TaskID &task_id) {
RAY_LOG(DEBUG) << "Unsubscribing task, task id = " << task_id
<< ", job id = " << task_id.JobId();
auto status = client_impl_->GetGcsPubSub().Unsubscribe(TASK_CHANNEL, task_id.Hex());
subscribe_task_operations_.erase(task_id);
fetch_task_data_operations_.erase(task_id);
RAY_LOG(DEBUG) << "Finished unsubscribing task, task id = " << task_id
<< ", job id = " << task_id.JobId();
return status;
}
Status ServiceBasedTaskInfoAccessor::AsyncAddTaskLease(
const std::shared_ptr<rpc::TaskLeaseData> &data_ptr, const StatusCallback &callback) {
TaskID task_id = TaskID::FromBinary(data_ptr->task_id());
@ -1061,12 +1011,6 @@ void ServiceBasedTaskInfoAccessor::AsyncResubscribe(bool is_pubsub_server_restar
// If the pub-sub server has also restarted, we need to resubscribe to the pub-sub
// server first, then fetch data from the GCS server.
if (is_pubsub_server_restarted) {
for (auto &item : subscribe_task_operations_) {
auto &task_id = item.first;
RAY_CHECK_OK(item.second([this, task_id](const Status &status) {
fetch_task_data_operations_[task_id](nullptr);
}));
}
for (auto &item : subscribe_task_lease_operations_) {
auto &task_id = item.first;
RAY_CHECK_OK(item.second([this, task_id](const Status &status) {
@ -1074,19 +1018,12 @@ void ServiceBasedTaskInfoAccessor::AsyncResubscribe(bool is_pubsub_server_restar
}));
}
} else {
for (auto &item : fetch_task_data_operations_) {
item.second(nullptr);
}
for (auto &item : fetch_task_lease_data_operations_) {
item.second(nullptr);
}
}
}
bool ServiceBasedTaskInfoAccessor::IsTaskUnsubscribed(const TaskID &task_id) {
return client_impl_->GetGcsPubSub().IsUnsubscribed(TASK_CHANNEL, task_id.Hex());
}
bool ServiceBasedTaskInfoAccessor::IsTaskLeaseUnsubscribed(const TaskID &task_id) {
return client_impl_->GetGcsPubSub().IsUnsubscribed(TASK_LEASE_CHANNEL, task_id.Hex());
}

View file

@ -273,12 +273,6 @@ class ServiceBasedTaskInfoAccessor : public TaskInfoAccessor {
Status AsyncGet(const TaskID &task_id,
const OptionalItemCallback<rpc::TaskTableData> &callback) override;
Status AsyncSubscribe(const TaskID &task_id,
const SubscribeCallback<TaskID, rpc::TaskTableData> &subscribe,
const StatusCallback &done) override;
Status AsyncUnsubscribe(const TaskID &task_id) override;
Status AsyncAddTaskLease(const std::shared_ptr<rpc::TaskLeaseData> &data_ptr,
const StatusCallback &callback) override;
@ -299,19 +293,15 @@ class ServiceBasedTaskInfoAccessor : public TaskInfoAccessor {
void AsyncResubscribe(bool is_pubsub_server_restarted) override;
bool IsTaskUnsubscribed(const TaskID &task_id) override;
bool IsTaskLeaseUnsubscribed(const TaskID &task_id) override;
private:
/// Save the subscribe operations, so we can call them again when PubSub
/// server restarts from a failure.
std::unordered_map<TaskID, SubscribeOperation> subscribe_task_operations_;
std::unordered_map<TaskID, SubscribeOperation> subscribe_task_lease_operations_;
/// Save the fetch data operation in this function, so we can call it again when GCS
/// server restarts from a failure.
std::unordered_map<TaskID, FetchDataOperation> fetch_task_data_operations_;
std::unordered_map<TaskID, FetchDataOperation> fetch_task_lease_data_operations_;
ServiceBasedGcsClient *client_impl_;

View file

@ -365,27 +365,6 @@ class ServiceBasedGcsClientTest : public ::testing::Test {
return resources;
}
bool SubscribeTask(
const TaskID &task_id,
const gcs::SubscribeCallback<TaskID, rpc::TaskTableData> &subscribe) {
std::promise<bool> promise;
RAY_CHECK_OK(gcs_client_->Tasks().AsyncSubscribe(
task_id, subscribe,
[&promise](Status status) { promise.set_value(status.ok()); }));
return WaitReady(promise.get_future(), timeout_ms_);
}
void UnsubscribeTask(const TaskID &task_id) {
RAY_CHECK_OK(gcs_client_->Tasks().AsyncUnsubscribe(task_id));
}
void WaitForTaskUnsubscribed(const TaskID &task_id) {
auto condition = [this, task_id]() {
return gcs_client_->Tasks().IsTaskUnsubscribed(task_id);
};
EXPECT_TRUE(WaitForCondition(condition, timeout_ms_.count()));
}
void WaitForTaskLeaseUnsubscribed(const TaskID &task_id) {
auto condition = [this, task_id]() {
return gcs_client_->Tasks().IsTaskLeaseUnsubscribed(task_id);
@ -848,29 +827,12 @@ TEST_F(ServiceBasedGcsClientTest, TestTaskInfo) {
TaskID task_id = TaskID::ForDriverTask(job_id);
auto task_table_data = Mocker::GenTaskTableData(job_id.Binary(), task_id.Binary());
// Subscribe to the event that the given task is added in GCS.
std::atomic<int> task_count(0);
auto task_subscribe = [&task_count](const TaskID &id,
const rpc::TaskTableData &result) { ++task_count; };
ASSERT_TRUE(SubscribeTask(task_id, task_subscribe));
// Add a task to GCS.
ASSERT_TRUE(AddTask(task_table_data));
auto get_task_result = GetTask(task_id);
ASSERT_TRUE(get_task_result.task().task_spec().task_id() == task_id.Binary());
ASSERT_TRUE(get_task_result.task().task_spec().job_id() == job_id.Binary());
// Cancel subscription to a task.
UnsubscribeTask(task_id);
WaitForTaskUnsubscribed(task_id);
// Add a task to GCS again.
ASSERT_TRUE(AddTask(task_table_data));
// Assert unsubscribe succeeded.
std::this_thread::sleep_for(std::chrono::milliseconds(100));
EXPECT_EQ(task_count, 1);
// Subscribe to the event that the given task lease is added in GCS.
std::atomic<int> task_lease_count(0);
auto task_lease_subscribe = [&task_lease_count](
@ -1178,12 +1140,6 @@ TEST_F(ServiceBasedGcsClientTest, TestTaskTableResubscribe) {
TaskID task_id = TaskID::ForDriverTask(job_id);
auto task_table_data = Mocker::GenTaskTableData(job_id.Binary(), task_id.Binary());
// Subscribe to the event that the given task is added in GCS.
std::atomic<int> task_count(0);
auto task_subscribe = [&task_count](const TaskID &task_id,
const gcs::TaskTableData &data) { ++task_count; };
ASSERT_TRUE(SubscribeTask(task_id, task_subscribe));
// Subscribe to the event that the given task lease is added in GCS.
std::atomic<int> task_lease_count(0);
auto task_lease_subscribe = [&task_lease_count](
@ -1199,10 +1155,7 @@ TEST_F(ServiceBasedGcsClientTest, TestTaskTableResubscribe) {
NodeID node_id = NodeID::FromRandom();
auto task_lease = Mocker::GenTaskLeaseData(task_id.Binary(), node_id.Binary());
ASSERT_TRUE(AddTaskLease(task_lease));
WaitForExpectedCount(task_count, 1);
WaitForExpectedCount(task_lease_count, 1);
UnsubscribeTask(task_id);
WaitForTaskUnsubscribed(task_id);
RestartGcsServer();
@ -1210,7 +1163,6 @@ TEST_F(ServiceBasedGcsClientTest, TestTaskTableResubscribe) {
task_lease = Mocker::GenTaskLeaseData(task_id.Binary(), node_id.Binary());
ASSERT_TRUE(AddTaskLease(task_lease));
WaitForExpectedCount(task_lease_count, 3);
WaitForExpectedCount(task_count, 1);
}
TEST_F(ServiceBasedGcsClientTest, TestWorkerTableResubscribe) {

View file

@ -23,14 +23,11 @@ void DefaultTaskInfoHandler::HandleAddTask(const AddTaskRequest &request,
JobID job_id = JobID::FromBinary(request.task_data().task().task_spec().job_id());
TaskID task_id = TaskID::FromBinary(request.task_data().task().task_spec().task_id());
RAY_LOG(DEBUG) << "Adding task, job id = " << job_id << ", task id = " << task_id;
auto on_done = [this, job_id, task_id, request, reply,
send_reply_callback](const Status &status) {
auto on_done = [job_id, task_id, reply, send_reply_callback](const Status &status) {
if (!status.ok()) {
RAY_LOG(ERROR) << "Failed to add task, job id = " << job_id
<< ", task id = " << task_id;
} else {
RAY_CHECK_OK(gcs_pub_sub_->Publish(
TASK_CHANNEL, task_id.Hex(), request.task_data().SerializeAsString(), nullptr));
RAY_LOG(DEBUG) << "Finished adding task, job id = " << job_id
<< ", task id = " << task_id;
GCS_RPC_SEND_REPLY(send_reply_callback, reply, status);