GCS adapts to task lease table pub sub (#8299)

This commit is contained in:
fangfengbin 2020-05-05 10:16:56 +08:00 committed by GitHub
parent cc7bd6650a
commit 14d03a0869
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
12 changed files with 143 additions and 15 deletions

View file

@ -241,6 +241,15 @@ class TaskInfoAccessor {
virtual Status AsyncAddTaskLease(const std::shared_ptr<rpc::TaskLeaseData> &data_ptr,
const StatusCallback &callback) = 0;
/// Get task lease information from GCS asynchronously.
///
/// \param task_id The ID of the task to look up in GCS.
/// \param callback Callback that is called after the lookup has finished.
/// \return Status
virtual Status AsyncGetTaskLease(
const TaskID &task_id,
const OptionalItemCallback<rpc::TaskLeaseData> &callback) = 0;
/// Subscribe asynchronously to the event that the given task lease is added in GCS.
///
/// \param task_id The ID of the task to be subscribed to.

View file

@ -569,9 +569,7 @@ Status ServiceBasedNodeInfoAccessor::AsyncSubscribeBatchHeartbeat(
ServiceBasedTaskInfoAccessor::ServiceBasedTaskInfoAccessor(
ServiceBasedGcsClient *client_impl)
: client_impl_(client_impl),
subscribe_id_(ClientID::FromRandom()),
task_lease_sub_executor_(client_impl->GetRedisGcsClient().task_lease_table()) {}
: client_impl_(client_impl) {}
Status ServiceBasedTaskInfoAccessor::AsyncAdd(
const std::shared_ptr<rpc::TaskTableData> &data_ptr, const StatusCallback &callback) {
@ -690,6 +688,25 @@ Status ServiceBasedTaskInfoAccessor::AsyncAddTaskLease(
return Status::OK();
}
/// Fetch the lease of a task from the GCS service asynchronously.
///
/// Sends a `GetTaskLease` RPC. When the reply arrives, `callback` receives the
/// RPC status together with the lease carried by the reply, or `boost::none`
/// when the reply holds no lease data.
///
/// \param task_id The ID of the task whose lease is requested.
/// \param callback Invoked once with the RPC status and the optional lease.
/// \return Always `Status::OK()`; the outcome of the RPC is delivered through `callback`.
Status ServiceBasedTaskInfoAccessor::AsyncGetTaskLease(
    const TaskID &task_id, const OptionalItemCallback<rpc::TaskLeaseData> &callback) {
  RAY_LOG(DEBUG) << "Getting task lease, task id = " << task_id;

  rpc::GetTaskLeaseRequest lease_request;
  lease_request.set_task_id(task_id.Binary());

  // The handler owns copies of the task id and the user callback because it
  // runs after this function has returned.
  auto on_reply = [task_id, callback](const Status &status,
                                      const rpc::GetTaskLeaseReply &reply) {
    if (!reply.has_task_lease_data()) {
      callback(status, boost::none);
    } else {
      callback(status, reply.task_lease_data());
    }
    RAY_LOG(DEBUG) << "Finished getting task lease, status = " << status
                   << ", task id = " << task_id;
  };

  client_impl_->GetGcsRpcClient().GetTaskLease(lease_request, on_reply);
  return Status::OK();
}
Status ServiceBasedTaskInfoAccessor::AsyncSubscribeTaskLease(
const TaskID &task_id,
const SubscribeCallback<TaskID, boost::optional<rpc::TaskLeaseData>> &subscribe,
@ -697,8 +714,29 @@ Status ServiceBasedTaskInfoAccessor::AsyncSubscribeTaskLease(
RAY_LOG(DEBUG) << "Subscribing task lease, task id = " << task_id;
RAY_CHECK(subscribe != nullptr)
<< "Failed to subscribe task lease, task id = " << task_id;
auto status =
task_lease_sub_executor_.AsyncSubscribe(subscribe_id_, task_id, subscribe, done);
auto on_subscribe = [task_id, subscribe](const std::string &id,
const std::string &data) {
TaskLeaseData task_lease_data;
task_lease_data.ParseFromString(data);
subscribe(task_id, task_lease_data);
};
auto on_done = [this, task_id, subscribe, done](const Status &status) {
if (status.ok()) {
auto callback = [task_id, subscribe, done](
const Status &status,
const boost::optional<rpc::TaskLeaseData> &result) {
subscribe(task_id, result);
if (done) {
done(status);
}
};
RAY_CHECK_OK(AsyncGetTaskLease(task_id, callback));
} else if (done) {
done(status);
}
};
auto status = client_impl_->GetGcsPubSub().Subscribe(TASK_LEASE_CHANNEL, task_id.Hex(),
on_subscribe, on_done);
RAY_LOG(DEBUG) << "Finished subscribing task lease, task id = " << task_id;
return status;
}
@ -706,7 +744,7 @@ Status ServiceBasedTaskInfoAccessor::AsyncSubscribeTaskLease(
Status ServiceBasedTaskInfoAccessor::AsyncUnsubscribeTaskLease(const TaskID &task_id) {
RAY_LOG(DEBUG) << "Unsubscribing task lease, task id = " << task_id;
auto status =
task_lease_sub_executor_.AsyncUnsubscribe(subscribe_id_, task_id, nullptr);
client_impl_->GetGcsPubSub().Unsubscribe(TASK_LEASE_CHANNEL, task_id.Hex());
RAY_LOG(DEBUG) << "Finished unsubscribing task lease, task id = " << task_id;
return status;
}

View file

@ -217,6 +217,10 @@ class ServiceBasedTaskInfoAccessor : public TaskInfoAccessor {
Status AsyncAddTaskLease(const std::shared_ptr<rpc::TaskLeaseData> &data_ptr,
const StatusCallback &callback) override;
Status AsyncGetTaskLease(
const TaskID &task_id,
const OptionalItemCallback<rpc::TaskLeaseData> &callback) override;
Status AsyncSubscribeTaskLease(
const TaskID &task_id,
const SubscribeCallback<TaskID, boost::optional<rpc::TaskLeaseData>> &subscribe,
@ -230,12 +234,6 @@ class ServiceBasedTaskInfoAccessor : public TaskInfoAccessor {
private:
ServiceBasedGcsClient *client_impl_;
ClientID subscribe_id_;
typedef SubscriptionExecutor<TaskID, boost::optional<TaskLeaseData>, TaskLeaseTable>
TaskLeaseSubscriptionExecutor;
TaskLeaseSubscriptionExecutor task_lease_sub_executor_;
};
/// \class ServiceBasedObjectInfoAccessor

View file

@ -100,10 +100,16 @@ void DefaultTaskInfoHandler::HandleAddTaskLease(const AddTaskLeaseRequest &reque
<< ", task id = " << task_id << ", node id = " << node_id;
auto task_lease_data = std::make_shared<TaskLeaseData>();
task_lease_data->CopyFrom(request.task_lease_data());
auto on_done = [task_id, node_id, request, reply, send_reply_callback](Status status) {
auto on_done = [this, task_id, node_id, task_lease_data, request, reply,
send_reply_callback](const Status &status) {
if (!status.ok()) {
RAY_LOG(ERROR) << "Failed to add task lease, job id = " << task_id.JobId()
<< ", task id = " << task_id << ", node id = " << node_id;
} else {
RAY_CHECK_OK(gcs_pub_sub_->Publish(TASK_LEASE_CHANNEL, task_id.Hex(),
task_lease_data->SerializeAsString(), nullptr));
RAY_LOG(DEBUG) << "Finished adding task lease, job id = " << task_id.JobId()
<< ", task id = " << task_id << ", node id = " << node_id;
}
GCS_RPC_SEND_REPLY(send_reply_callback, reply, status);
};
@ -112,8 +118,29 @@ void DefaultTaskInfoHandler::HandleAddTaskLease(const AddTaskLeaseRequest &reque
if (!status.ok()) {
on_done(status);
}
RAY_LOG(DEBUG) << "Finished adding task lease, job id = " << task_id.JobId()
<< ", task id = " << task_id << ", node id = " << node_id;
}
/// Handle a `GetTaskLease` RPC: look up the lease of the requested task and
/// place it in the reply when one is available.
///
/// \param request The request carrying the binary ID of the task to look up.
/// \param reply The reply to fill in; `task_lease_data` is set only when the
/// lookup succeeded and produced a lease.
/// \param send_reply_callback Callback used to send the reply to the caller.
void DefaultTaskInfoHandler::HandleGetTaskLease(const GetTaskLeaseRequest &request,
                                                GetTaskLeaseReply *reply,
                                                SendReplyCallback send_reply_callback) {
  TaskID task_id = TaskID::FromBinary(request.task_id());
  RAY_LOG(DEBUG) << "Getting task lease, job id = " << task_id.JobId()
                 << ", task id = " << task_id;
  // `request` is intentionally not captured: the completion handler only needs
  // the task id, so copying the whole request message would be wasted work.
  auto on_done = [task_id, reply, send_reply_callback](
                     const Status &status, const boost::optional<TaskLeaseData> &result) {
    // Dereference `result` only when it actually holds a value. The callback
    // signature permits a successful status paired with an empty optional, and
    // a debug-only check would not protect release builds from that case.
    if (status.ok() && result) {
      reply->mutable_task_lease_data()->CopyFrom(*result);
    }
    RAY_LOG(DEBUG) << "Finished getting task lease, job id = " << task_id.JobId()
                   << ", task id = " << task_id << ", status = " << status.ToString();
    // The reply status is always OK; a lease that could not be fetched is
    // conveyed by leaving `task_lease_data` unset in the reply.
    GCS_RPC_SEND_REPLY(send_reply_callback, reply, Status::OK());
  };

  Status status = gcs_client_.Tasks().AsyncGetTaskLease(task_id, on_done);
  if (!status.ok()) {
    // The lookup could not even be started, so complete the RPC right away.
    on_done(status, boost::none);
  }
}
void DefaultTaskInfoHandler::HandleAttemptTaskReconstruction(

View file

@ -41,6 +41,9 @@ class DefaultTaskInfoHandler : public rpc::TaskInfoHandler {
void HandleAddTaskLease(const AddTaskLeaseRequest &request, AddTaskLeaseReply *reply,
SendReplyCallback send_reply_callback) override;
void HandleGetTaskLease(const GetTaskLeaseRequest &request, GetTaskLeaseReply *reply,
SendReplyCallback send_reply_callback) override;
void HandleAttemptTaskReconstruction(const AttemptTaskReconstructionRequest &request,
AttemptTaskReconstructionReply *reply,
SendReplyCallback send_reply_callback) override;

View file

@ -29,6 +29,7 @@ namespace gcs {
#define WORKER_FAILURE_CHANNEL "WORKER_FAILURE"
#define OBJECT_CHANNEL "OBJECT"
#define TASK_CHANNEL "TASK"
#define TASK_LEASE_CHANNEL "TASK_LEASE"
/// \class GcsPubSub
///

View file

@ -405,6 +405,24 @@ Status RedisTaskInfoAccessor::AsyncAddTaskLease(
return task_lease_table.Add(task_id.JobId(), task_id, data_ptr, on_done);
}
/// Look up the lease of a task in the task lease table asynchronously.
///
/// \param task_id The ID of the task whose lease is requested.
/// \param callback Must be non-null. Receives `Status::OK()` and the lease when
/// the table lookup reports success, or an `Invalid` status and an empty
/// optional when the table lookup reports failure.
/// \return The status returned by the table lookup call.
Status RedisTaskInfoAccessor::AsyncGetTaskLease(
    const TaskID &task_id, const OptionalItemCallback<TaskLeaseData> &callback) {
  RAY_CHECK(callback != nullptr);

  // Success path of the lookup: forward the lease wrapped in an optional.
  auto lease_found = [callback](RedisGcsClient *, const TaskID &,
                                const TaskLeaseData &lease) {
    callback(Status::OK(), boost::optional<TaskLeaseData>(lease));
  };

  // Failure path of the lookup: report an error with no lease attached.
  auto lease_missing = [callback](RedisGcsClient *, const TaskID &) {
    callback(Status::Invalid("Task not exist."), boost::optional<TaskLeaseData>());
  };

  return client_impl_->task_lease_table().Lookup(task_id.JobId(), task_id, lease_found,
                                                 lease_missing);
}
Status RedisTaskInfoAccessor::AsyncSubscribeTaskLease(
const TaskID &task_id,
const SubscribeCallback<TaskID, boost::optional<TaskLeaseData>> &subscribe,

View file

@ -200,6 +200,9 @@ class RedisTaskInfoAccessor : public TaskInfoAccessor {
Status AsyncAddTaskLease(const std::shared_ptr<TaskLeaseData> &data_ptr,
const StatusCallback &callback) override;
Status AsyncGetTaskLease(const TaskID &task_id,
const OptionalItemCallback<TaskLeaseData> &callback) override;
Status AsyncSubscribeTaskLease(
const TaskID &task_id,
const SubscribeCallback<TaskID, boost::optional<TaskLeaseData>> &subscribe,

View file

@ -289,6 +289,15 @@ message AddTaskLeaseReply {
GcsStatus status = 1;
}
// Request message of the `GetTaskLease` RPC.
message GetTaskLeaseRequest {
// Binary ID of the task whose lease is being requested.
bytes task_id = 1;
}
// Reply message of the `GetTaskLease` RPC.
message GetTaskLeaseReply {
// Status of the request as reported by the GCS service.
GcsStatus status = 1;
// Lease of the requested task. Readers check for its presence before use,
// because the server sets it only when a lease was retrieved.
TaskLeaseData task_lease_data = 2;
}
message AttemptTaskReconstructionRequest {
TaskReconstructionData task_reconstruction = 1;
}
@ -307,6 +316,8 @@ service TaskInfoGcsService {
rpc DeleteTasks(DeleteTasksRequest) returns (DeleteTasksReply);
// Add a task lease to GCS Service.
rpc AddTaskLease(AddTaskLeaseRequest) returns (AddTaskLeaseReply);
// Get task lease information from GCS Service.
rpc GetTaskLease(GetTaskLeaseRequest) returns (GetTaskLeaseReply);
// Attempt task reconstruction to GCS Service.
rpc AttemptTaskReconstruction(AttemptTaskReconstructionRequest)
returns (AttemptTaskReconstructionReply);

View file

@ -144,6 +144,18 @@ class MockTaskInfoAccessor : public gcs::RedisTaskInfoAccessor {
return Status::OK();
}
/// Mock lookup of a task lease backed by the in-memory `task_lease_table_`.
///
/// The callback is invoked synchronously with `Status::OK()` and either the
/// stored lease or `boost::none` when the table has no entry for `task_id`.
///
/// \param task_id The ID of the task whose lease is requested.
/// \param callback Receives the status and the optional lease.
/// \return Always `Status::OK()`.
Status AsyncGetTaskLease(
    const TaskID &task_id,
    const gcs::OptionalItemCallback<rpc::TaskLeaseData> &callback) override {
  const auto entry = task_lease_table_.find(task_id);
  if (entry == task_lease_table_.end()) {
    // Nothing recorded for this task: report success with an empty result.
    callback(Status::OK(), boost::none);
    return Status::OK();
  }
  callback(Status::OK(), *entry->second);
  return Status::OK();
}
Status AttemptTaskReconstruction(
const std::shared_ptr<TaskReconstructionData> &task_data,
const gcs::StatusCallback &done) override {

View file

@ -170,6 +170,9 @@ class GcsRpcClient {
/// Add a task lease to GCS Service.
VOID_GCS_RPC_CLIENT_METHOD(TaskInfoGcsService, AddTaskLease, task_info_grpc_client_, )
/// Get task lease information from GCS Service.
VOID_GCS_RPC_CLIENT_METHOD(TaskInfoGcsService, GetTaskLease, task_info_grpc_client_, )
/// Attempt task reconstruction to GCS Service.
VOID_GCS_RPC_CLIENT_METHOD(TaskInfoGcsService, AttemptTaskReconstruction,
task_info_grpc_client_, )

View file

@ -283,6 +283,10 @@ class TaskInfoGcsServiceHandler {
AddTaskLeaseReply *reply,
SendReplyCallback send_reply_callback) = 0;
virtual void HandleGetTaskLease(const GetTaskLeaseRequest &request,
GetTaskLeaseReply *reply,
SendReplyCallback send_reply_callback) = 0;
virtual void HandleAttemptTaskReconstruction(
const AttemptTaskReconstructionRequest &request,
AttemptTaskReconstructionReply *reply, SendReplyCallback send_reply_callback) = 0;
@ -308,6 +312,7 @@ class TaskInfoGrpcService : public GrpcService {
TASK_INFO_SERVICE_RPC_HANDLER(GetTask);
TASK_INFO_SERVICE_RPC_HANDLER(DeleteTasks);
TASK_INFO_SERVICE_RPC_HANDLER(AddTaskLease);
TASK_INFO_SERVICE_RPC_HANDLER(GetTaskLease);
TASK_INFO_SERVICE_RPC_HANDLER(AttemptTaskReconstruction);
}