Add task reconstruction function to task info handler (#6711)

This commit is contained in:
fangfengbin 2020-01-09 15:37:42 +08:00 committed by Hao Chen
parent 3673835f30
commit ca454c5c1b
6 changed files with 73 additions and 0 deletions

View file

@ -94,5 +94,34 @@ void DefaultTaskInfoHandler::HandleAddTaskLease(const AddTaskLeaseRequest &reque
<< ", node id = " << node_id;
}
void DefaultTaskInfoHandler::HandleAttemptTaskReconstruction(
const AttemptTaskReconstructionRequest &request,
AttemptTaskReconstructionReply *reply, SendReplyCallback send_reply_callback) {
ClientID node_id =
ClientID::FromBinary(request.task_reconstruction().node_manager_id());
RAY_LOG(DEBUG) << "Reconstructing task, reconstructions num = "
<< request.task_reconstruction().num_reconstructions()
<< ", node id = " << node_id;
auto task_reconstruction_data = std::make_shared<TaskReconstructionData>();
task_reconstruction_data->CopyFrom(request.task_reconstruction());
auto on_done = [node_id, request, send_reply_callback](Status status) {
if (!status.ok()) {
RAY_LOG(ERROR) << "Failed to reconstruct task, reconstructions num = "
<< request.task_reconstruction().num_reconstructions()
<< ", node id = " << node_id;
}
send_reply_callback(status, nullptr, nullptr);
};
Status status =
gcs_client_.Tasks().AttemptTaskReconstruction(task_reconstruction_data, on_done);
if (!status.ok()) {
on_done(status);
}
RAY_LOG(DEBUG) << "Finished reconstructing task, reconstructions num = "
<< request.task_reconstruction().num_reconstructions()
<< ", node id = " << node_id;
}
} // namespace rpc
} // namespace ray

View file

@ -25,6 +25,10 @@ class DefaultTaskInfoHandler : public rpc::TaskInfoHandler {
void HandleAddTaskLease(const AddTaskLeaseRequest &request, AddTaskLeaseReply *reply,
SendReplyCallback send_reply_callback) override;
void HandleAttemptTaskReconstruction(const AttemptTaskReconstructionRequest &request,
AttemptTaskReconstructionReply *reply,
SendReplyCallback send_reply_callback) override;
private:
gcs::RedisGcsClient &gcs_client_;
};

View file

@ -334,6 +334,17 @@ class GcsServerTest : public RedisServiceManagerForTest {
return WaitReady(promise.get_future(), timeout_ms_);
}
bool AttemptTaskReconstruction(const rpc::AttemptTaskReconstructionRequest &request) {
std::promise<bool> promise;
client_->AttemptTaskReconstruction(
request, [&promise](const Status &status,
const rpc::AttemptTaskReconstructionReply &reply) {
RAY_CHECK_OK(status);
promise.set_value(true);
});
return WaitReady(promise.get_future(), timeout_ms_);
}
bool WaitReady(const std::future<bool> &future, uint64_t timeout_ms) {
auto status = future.wait_for(std::chrono::milliseconds(timeout_ms));
return status == std::future_status::ready;
@ -572,6 +583,16 @@ TEST_F(GcsServerTest, TestTaskInfo) {
rpc::AddTaskLeaseRequest add_task_lease_request;
add_task_lease_request.mutable_task_lease_data()->CopyFrom(task_lease_data);
ASSERT_TRUE(AddTaskLease(add_task_lease_request));
// Attempt task reconstruction
rpc::AttemptTaskReconstructionRequest attempt_task_reconstruction_request;
rpc::TaskReconstructionData task_reconstruction_data;
task_reconstruction_data.set_task_id(task_id.Binary());
task_reconstruction_data.set_node_manager_id(node_id.Binary());
task_reconstruction_data.set_num_reconstructions(0);
attempt_task_reconstruction_request.mutable_task_reconstruction()->CopyFrom(
task_reconstruction_data);
ASSERT_TRUE(AttemptTaskReconstruction(attempt_task_reconstruction_request));
}
} // namespace ray

View file

@ -248,6 +248,13 @@ message AddTaskLeaseRequest {
message AddTaskLeaseReply {
}
message AttemptTaskReconstructionRequest {
TaskReconstructionData task_reconstruction = 1;
}
message AttemptTaskReconstructionReply {
}
// Service for task info access.
service TaskInfoGcsService {
// Add a task to GCS Service.
@ -258,4 +265,7 @@ service TaskInfoGcsService {
rpc DeleteTasks(DeleteTasksRequest) returns (DeleteTasksReply);
// Add a task lease to GCS Service.
rpc AddTaskLease(AddTaskLeaseRequest) returns (AddTaskLeaseReply);
// Attempt task reconstruction to GCS Service.
rpc AttemptTaskReconstruction(AttemptTaskReconstructionRequest)
returns (AttemptTaskReconstructionReply);
}

View file

@ -105,6 +105,10 @@ class GcsRpcClient {
/// Add a task lease to GCS Service.
VOID_RPC_CLIENT_METHOD(TaskInfoGcsService, AddTaskLease, task_info_grpc_client_, )
/// Attempt task reconstruction to GCS Service.
VOID_RPC_CLIENT_METHOD(TaskInfoGcsService, AttemptTaskReconstruction,
task_info_grpc_client_, )
private:
/// The gRPC-generated stub.
std::unique_ptr<GrpcClient<JobInfoGcsService>> job_info_grpc_client_;

View file

@ -259,6 +259,10 @@ class TaskInfoGcsServiceHandler {
virtual void HandleAddTaskLease(const AddTaskLeaseRequest &request,
AddTaskLeaseReply *reply,
SendReplyCallback send_reply_callback) = 0;
virtual void HandleAttemptTaskReconstruction(
const AttemptTaskReconstructionRequest &request,
AttemptTaskReconstructionReply *reply, SendReplyCallback send_reply_callback) = 0;
};
/// The `GrpcService` for `TaskInfoGcsService`.
@ -282,6 +286,7 @@ class TaskInfoGrpcService : public GrpcService {
TASK_INFO_SERVICE_RPC_HANDLER(GetTask, 1);
TASK_INFO_SERVICE_RPC_HANDLER(DeleteTasks, 1);
TASK_INFO_SERVICE_RPC_HANDLER(AddTaskLease, 1);
TASK_INFO_SERVICE_RPC_HANDLER(AttemptTaskReconstruction, 1);
}
private: