diff --git a/src/ray/gcs/accessor.h b/src/ray/gcs/accessor.h index 19dc7f1de..5c9cef3e7 100644 --- a/src/ray/gcs/accessor.h +++ b/src/ray/gcs/accessor.h @@ -202,23 +202,6 @@ class TaskInfoAccessor { virtual Status AsyncGet(const TaskID &task_id, const OptionalItemCallback &callback) = 0; - /// Subscribe asynchronously to the event that the given task is added in GCS. - /// - /// \param task_id The ID of the task to be subscribed to. - /// \param subscribe Callback that will be called each time when the task is updated. - /// \param done Callback that will be called when subscription is complete. - /// \return Status - virtual Status AsyncSubscribe( - const TaskID &task_id, - const SubscribeCallback &subscribe, - const StatusCallback &done) = 0; - - /// Cancel subscription to a task asynchronously. - /// - /// \param task_id The ID of the task to be unsubscribed to. - /// \return Status - virtual Status AsyncUnsubscribe(const TaskID &task_id) = 0; - /// Add a task lease to GCS asynchronously. /// /// \param data_ptr The task lease that will be added to GCS. @@ -274,12 +257,6 @@ class TaskInfoAccessor { /// \param is_pubsub_server_restarted Whether pubsub server is restarted. virtual void AsyncResubscribe(bool is_pubsub_server_restarted) = 0; - /// Check if the specified task is unsubscribed. - /// - /// \param task_id The ID of the task. - /// \return Whether the specified task is unsubscribed. - virtual bool IsTaskUnsubscribed(const TaskID &task_id) = 0; - /// Check if the specified task lease is unsubscribed. /// /// \param task_id The ID of the task. diff --git a/src/ray/gcs/gcs_client/service_based_accessor.cc b/src/ray/gcs/gcs_client/service_based_accessor.cc index ea2425caa..50379f58a 100644 --- a/src/ray/gcs/gcs_client/service_based_accessor.cc +++ b/src/ray/gcs/gcs_client/service_based_accessor.cc @@ -887,56 +887,6 @@ Status ServiceBasedTaskInfoAccessor::AsyncGet( return Status::OK(); } -Status ServiceBasedTaskInfoAccessor::AsyncSubscribe( - const TaskID &task_id, const SubscribeCallback &subscribe, - const StatusCallback &done) { - RAY_CHECK(subscribe != nullptr) << "Failed to subscribe task, task id = " << task_id - << ", job id = " << task_id.JobId(); - - auto fetch_data_operation = [this, task_id, - subscribe](const StatusCallback &fetch_done) { - auto callback = [task_id, subscribe, fetch_done]( - const Status &status, - const boost::optional &result) { - if (result) { - subscribe(task_id, *result); - } - if (fetch_done) { - fetch_done(status); - } - }; - RAY_CHECK_OK(AsyncGet(task_id, callback)); - }; - - auto subscribe_operation = [this, task_id, - subscribe](const StatusCallback &subscribe_done) { - auto on_subscribe = [task_id, subscribe](const std::string &id, - const std::string &data) { - TaskTableData task_data; - task_data.ParseFromString(data); - subscribe(task_id, task_data); - }; - return client_impl_->GetGcsPubSub().Subscribe(TASK_CHANNEL, task_id.Hex(), - on_subscribe, subscribe_done); - }; - - subscribe_task_operations_[task_id] = subscribe_operation; - fetch_task_data_operations_[task_id] = fetch_data_operation; - return subscribe_operation( - [fetch_data_operation, done](const Status &status) { fetch_data_operation(done); }); -} - -Status ServiceBasedTaskInfoAccessor::AsyncUnsubscribe(const TaskID &task_id) { - RAY_LOG(DEBUG) << "Unsubscribing task, task id = " << task_id - << ", job id = " << task_id.JobId(); - auto status = client_impl_->GetGcsPubSub().Unsubscribe(TASK_CHANNEL, task_id.Hex()); - subscribe_task_operations_.erase(task_id); - fetch_task_data_operations_.erase(task_id); - RAY_LOG(DEBUG) << "Finished unsubscribing task, task id = " << task_id - << ", job id = " << task_id.JobId(); - return status; -} - Status ServiceBasedTaskInfoAccessor::AsyncAddTaskLease( const std::shared_ptr &data_ptr, const StatusCallback &callback) { TaskID task_id = TaskID::FromBinary(data_ptr->task_id()); @@ -1061,12 +1011,6 @@ void ServiceBasedTaskInfoAccessor::AsyncResubscribe(bool is_pubsub_server_restar // If the pub-sub server has also restarted, we need to resubscribe to the pub-sub // server first, then fetch data from the GCS server. if (is_pubsub_server_restarted) { - for (auto &item : subscribe_task_operations_) { - auto &task_id = item.first; - RAY_CHECK_OK(item.second([this, task_id](const Status &status) { - fetch_task_data_operations_[task_id](nullptr); - })); - } for (auto &item : subscribe_task_lease_operations_) { auto &task_id = item.first; RAY_CHECK_OK(item.second([this, task_id](const Status &status) { @@ -1074,19 +1018,12 @@ void ServiceBasedTaskInfoAccessor::AsyncResubscribe(bool is_pubsub_server_restar })); } } else { - for (auto &item : fetch_task_data_operations_) { - item.second(nullptr); - } for (auto &item : fetch_task_lease_data_operations_) { item.second(nullptr); } } } -bool ServiceBasedTaskInfoAccessor::IsTaskUnsubscribed(const TaskID &task_id) { - return client_impl_->GetGcsPubSub().IsUnsubscribed(TASK_CHANNEL, task_id.Hex()); -} - bool ServiceBasedTaskInfoAccessor::IsTaskLeaseUnsubscribed(const TaskID &task_id) { return client_impl_->GetGcsPubSub().IsUnsubscribed(TASK_LEASE_CHANNEL, task_id.Hex()); } diff --git a/src/ray/gcs/gcs_client/service_based_accessor.h b/src/ray/gcs/gcs_client/service_based_accessor.h index 85f579b96..eca23dd11 100644 --- a/src/ray/gcs/gcs_client/service_based_accessor.h +++ b/src/ray/gcs/gcs_client/service_based_accessor.h @@ -273,12 +273,6 @@ class ServiceBasedTaskInfoAccessor : public TaskInfoAccessor { Status AsyncGet(const TaskID &task_id, const OptionalItemCallback &callback) override; - Status AsyncSubscribe(const TaskID &task_id, - const SubscribeCallback &subscribe, - const StatusCallback &done) override; - - Status AsyncUnsubscribe(const TaskID &task_id) override; - Status AsyncAddTaskLease(const std::shared_ptr &data_ptr, const StatusCallback &callback) override; @@ -299,19 +293,15 @@ class ServiceBasedTaskInfoAccessor : public TaskInfoAccessor { void AsyncResubscribe(bool is_pubsub_server_restarted) override; - bool IsTaskUnsubscribed(const TaskID &task_id) override; - bool IsTaskLeaseUnsubscribed(const TaskID &task_id) override; private: /// Save the subscribe operations, so we can call them again when PubSub /// server restarts from a failure. - std::unordered_map subscribe_task_operations_; std::unordered_map subscribe_task_lease_operations_; /// Save the fetch data operation in this function, so we can call it again when GCS /// server restarts from a failure. - std::unordered_map fetch_task_data_operations_; std::unordered_map fetch_task_lease_data_operations_; ServiceBasedGcsClient *client_impl_; diff --git a/src/ray/gcs/gcs_client/test/service_based_gcs_client_test.cc b/src/ray/gcs/gcs_client/test/service_based_gcs_client_test.cc index 4a4868c5d..64093880f 100644 --- a/src/ray/gcs/gcs_client/test/service_based_gcs_client_test.cc +++ b/src/ray/gcs/gcs_client/test/service_based_gcs_client_test.cc @@ -365,27 +365,6 @@ class ServiceBasedGcsClientTest : public ::testing::Test { return resources; } - bool SubscribeTask( - const TaskID &task_id, - const gcs::SubscribeCallback &subscribe) { - std::promise promise; - RAY_CHECK_OK(gcs_client_->Tasks().AsyncSubscribe( - task_id, subscribe, - [&promise](Status status) { promise.set_value(status.ok()); })); - return WaitReady(promise.get_future(), timeout_ms_); - } - - void UnsubscribeTask(const TaskID &task_id) { - RAY_CHECK_OK(gcs_client_->Tasks().AsyncUnsubscribe(task_id)); - } - - void WaitForTaskUnsubscribed(const TaskID &task_id) { - auto condition = [this, task_id]() { - return gcs_client_->Tasks().IsTaskUnsubscribed(task_id); - }; - EXPECT_TRUE(WaitForCondition(condition, timeout_ms_.count())); - } - void WaitForTaskLeaseUnsubscribed(const TaskID &task_id) { auto condition = [this, task_id]() { return gcs_client_->Tasks().IsTaskLeaseUnsubscribed(task_id); @@ -848,29 +827,12 @@ TEST_F(ServiceBasedGcsClientTest, TestTaskInfo) { TaskID task_id = TaskID::ForDriverTask(job_id); auto task_table_data = Mocker::GenTaskTableData(job_id.Binary(), task_id.Binary()); - // Subscribe to the event that the given task is added in GCS. - std::atomic task_count(0); - auto task_subscribe = [&task_count](const TaskID &id, - const rpc::TaskTableData &result) { ++task_count; }; - ASSERT_TRUE(SubscribeTask(task_id, task_subscribe)); - // Add a task to GCS. ASSERT_TRUE(AddTask(task_table_data)); auto get_task_result = GetTask(task_id); ASSERT_TRUE(get_task_result.task().task_spec().task_id() == task_id.Binary()); ASSERT_TRUE(get_task_result.task().task_spec().job_id() == job_id.Binary()); - // Cancel subscription to a task. - UnsubscribeTask(task_id); - WaitForTaskUnsubscribed(task_id); - - // Add a task to GCS again. - ASSERT_TRUE(AddTask(task_table_data)); - - // Assert unsubscribe succeeded. - std::this_thread::sleep_for(std::chrono::milliseconds(100)); - EXPECT_EQ(task_count, 1); - // Subscribe to the event that the given task lease is added in GCS. std::atomic task_lease_count(0); auto task_lease_subscribe = [&task_lease_count]( @@ -1178,12 +1140,6 @@ TEST_F(ServiceBasedGcsClientTest, TestTaskTableResubscribe) { TaskID task_id = TaskID::ForDriverTask(job_id); auto task_table_data = Mocker::GenTaskTableData(job_id.Binary(), task_id.Binary()); - // Subscribe to the event that the given task is added in GCS. - std::atomic task_count(0); - auto task_subscribe = [&task_count](const TaskID &task_id, - const gcs::TaskTableData &data) { ++task_count; }; - ASSERT_TRUE(SubscribeTask(task_id, task_subscribe)); - // Subscribe to the event that the given task lease is added in GCS. std::atomic task_lease_count(0); auto task_lease_subscribe = [&task_lease_count]( @@ -1199,10 +1155,7 @@ TEST_F(ServiceBasedGcsClientTest, TestTaskTableResubscribe) { NodeID node_id = NodeID::FromRandom(); auto task_lease = Mocker::GenTaskLeaseData(task_id.Binary(), node_id.Binary()); ASSERT_TRUE(AddTaskLease(task_lease)); - WaitForExpectedCount(task_count, 1); WaitForExpectedCount(task_lease_count, 1); - UnsubscribeTask(task_id); - WaitForTaskUnsubscribed(task_id); RestartGcsServer(); @@ -1210,7 +1163,6 @@ TEST_F(ServiceBasedGcsClientTest, TestTaskTableResubscribe) { task_lease = Mocker::GenTaskLeaseData(task_id.Binary(), node_id.Binary()); ASSERT_TRUE(AddTaskLease(task_lease)); WaitForExpectedCount(task_lease_count, 3); - WaitForExpectedCount(task_count, 1); } TEST_F(ServiceBasedGcsClientTest, TestWorkerTableResubscribe) { diff --git a/src/ray/gcs/gcs_server/task_info_handler_impl.cc b/src/ray/gcs/gcs_server/task_info_handler_impl.cc index b47ab7cef..70cc80f0e 100644 --- a/src/ray/gcs/gcs_server/task_info_handler_impl.cc +++ b/src/ray/gcs/gcs_server/task_info_handler_impl.cc @@ -23,14 +23,11 @@ void DefaultTaskInfoHandler::HandleAddTask(const AddTaskRequest &request, JobID job_id = JobID::FromBinary(request.task_data().task().task_spec().job_id()); TaskID task_id = TaskID::FromBinary(request.task_data().task().task_spec().task_id()); RAY_LOG(DEBUG) << "Adding task, job id = " << job_id << ", task id = " << task_id; - auto on_done = [this, job_id, task_id, request, reply, - send_reply_callback](const Status &status) { + auto on_done = [job_id, task_id, reply, send_reply_callback](const Status &status) { if (!status.ok()) { RAY_LOG(ERROR) << "Failed to add task, job id = " << job_id << ", task id = " << task_id; } else { - RAY_CHECK_OK(gcs_pub_sub_->Publish( - TASK_CHANNEL, task_id.Hex(), request.task_data().SerializeAsString(), nullptr)); RAY_LOG(DEBUG) << "Finished adding task, job id = " << job_id << ", task id = " << task_id; GCS_RPC_SEND_REPLY(send_reply_callback, reply, status);