Revert "GCS server task info handler use storage instead of redis accessor (#8531)" (#8562)

This reverts commit 9823e15311.
This commit is contained in:
Eric Liang 2020-05-22 19:16:43 -07:00 committed by GitHub
parent 2e5e789294
commit 351839bf69
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 34 additions and 34 deletions

View file

@ -224,7 +224,7 @@ void GcsServer::StoreGcsServerAddressInRedis() {
std::unique_ptr<rpc::TaskInfoHandler> GcsServer::InitTaskInfoHandler() { std::unique_ptr<rpc::TaskInfoHandler> GcsServer::InitTaskInfoHandler() {
return std::unique_ptr<rpc::DefaultTaskInfoHandler>( return std::unique_ptr<rpc::DefaultTaskInfoHandler>(
new rpc::DefaultTaskInfoHandler(gcs_table_storage_, gcs_pub_sub_)); new rpc::DefaultTaskInfoHandler(*redis_gcs_client_, gcs_pub_sub_));
} }
std::unique_ptr<rpc::StatsHandler> GcsServer::InitStatsHandler() { std::unique_ptr<rpc::StatsHandler> GcsServer::InitStatsHandler() {

View file

@ -23,22 +23,23 @@ void DefaultTaskInfoHandler::HandleAddTask(const AddTaskRequest &request,
JobID job_id = JobID::FromBinary(request.task_data().task().task_spec().job_id()); JobID job_id = JobID::FromBinary(request.task_data().task().task_spec().job_id());
TaskID task_id = TaskID::FromBinary(request.task_data().task().task_spec().task_id()); TaskID task_id = TaskID::FromBinary(request.task_data().task().task_spec().task_id());
RAY_LOG(DEBUG) << "Adding task, job id = " << job_id << ", task id = " << task_id; RAY_LOG(DEBUG) << "Adding task, job id = " << job_id << ", task id = " << task_id;
auto on_done = [this, job_id, task_id, request, reply, auto task_table_data = std::make_shared<TaskTableData>();
task_table_data->CopyFrom(request.task_data());
auto on_done = [this, job_id, task_id, task_table_data, request, reply,
send_reply_callback](const Status &status) { send_reply_callback](const Status &status) {
if (!status.ok()) { if (!status.ok()) {
RAY_LOG(ERROR) << "Failed to add task, job id = " << job_id RAY_LOG(ERROR) << "Failed to add task, job id = " << job_id
<< ", task id = " << task_id; << ", task id = " << task_id;
} else { } else {
RAY_CHECK_OK(gcs_pub_sub_->Publish( RAY_CHECK_OK(gcs_pub_sub_->Publish(TASK_CHANNEL, task_id.Hex(),
TASK_CHANNEL, task_id.Hex(), request.task_data().SerializeAsString(), nullptr)); task_table_data->SerializeAsString(), nullptr));
RAY_LOG(DEBUG) << "Finished adding task, job id = " << job_id RAY_LOG(DEBUG) << "Finished adding task, job id = " << job_id
<< ", task id = " << task_id; << ", task id = " << task_id;
GCS_RPC_SEND_REPLY(send_reply_callback, reply, status); GCS_RPC_SEND_REPLY(send_reply_callback, reply, status);
} }
}; };
Status status = Status status = gcs_client_.Tasks().AsyncAdd(task_table_data, on_done);
gcs_table_storage_->TaskTable().Put(task_id, request.task_data(), on_done);
if (!status.ok()) { if (!status.ok()) {
on_done(status); on_done(status);
} }
@ -52,7 +53,8 @@ void DefaultTaskInfoHandler::HandleGetTask(const GetTaskRequest &request,
<< ", task id = " << task_id; << ", task id = " << task_id;
auto on_done = [task_id, request, reply, send_reply_callback]( auto on_done = [task_id, request, reply, send_reply_callback](
const Status &status, const boost::optional<TaskTableData> &result) { const Status &status, const boost::optional<TaskTableData> &result) {
if (status.ok() && result) { if (status.ok()) {
RAY_DCHECK(result);
reply->mutable_task_data()->CopyFrom(*result); reply->mutable_task_data()->CopyFrom(*result);
} }
RAY_LOG(DEBUG) << "Finished getting task, job id = " << task_id.JobId() RAY_LOG(DEBUG) << "Finished getting task, job id = " << task_id.JobId()
@ -60,7 +62,7 @@ void DefaultTaskInfoHandler::HandleGetTask(const GetTaskRequest &request,
GCS_RPC_SEND_REPLY(send_reply_callback, reply, Status::OK()); GCS_RPC_SEND_REPLY(send_reply_callback, reply, Status::OK());
}; };
Status status = gcs_table_storage_->TaskTable().Get(task_id, on_done); Status status = gcs_client_.Tasks().AsyncGet(task_id, on_done);
if (!status.ok()) { if (!status.ok()) {
on_done(status, boost::none); on_done(status, boost::none);
} }
@ -81,7 +83,7 @@ void DefaultTaskInfoHandler::HandleDeleteTasks(const DeleteTasksRequest &request
GCS_RPC_SEND_REPLY(send_reply_callback, reply, status); GCS_RPC_SEND_REPLY(send_reply_callback, reply, status);
}; };
Status status = gcs_table_storage_->TaskTable().BatchDelete(task_ids, on_done); Status status = gcs_client_.Tasks().AsyncDelete(task_ids, on_done);
if (!status.ok()) { if (!status.ok()) {
on_done(status); on_done(status);
} }
@ -96,23 +98,23 @@ void DefaultTaskInfoHandler::HandleAddTaskLease(const AddTaskLeaseRequest &reque
ClientID node_id = ClientID::FromBinary(request.task_lease_data().node_manager_id()); ClientID node_id = ClientID::FromBinary(request.task_lease_data().node_manager_id());
RAY_LOG(DEBUG) << "Adding task lease, job id = " << task_id.JobId() RAY_LOG(DEBUG) << "Adding task lease, job id = " << task_id.JobId()
<< ", task id = " << task_id << ", node id = " << node_id; << ", task id = " << task_id << ", node id = " << node_id;
auto on_done = [this, task_id, node_id, request, reply, auto task_lease_data = std::make_shared<TaskLeaseData>();
task_lease_data->CopyFrom(request.task_lease_data());
auto on_done = [this, task_id, node_id, task_lease_data, request, reply,
send_reply_callback](const Status &status) { send_reply_callback](const Status &status) {
if (!status.ok()) { if (!status.ok()) {
RAY_LOG(ERROR) << "Failed to add task lease, job id = " << task_id.JobId() RAY_LOG(ERROR) << "Failed to add task lease, job id = " << task_id.JobId()
<< ", task id = " << task_id << ", node id = " << node_id; << ", task id = " << task_id << ", node id = " << node_id;
} else { } else {
RAY_CHECK_OK(gcs_pub_sub_->Publish(TASK_LEASE_CHANNEL, task_id.Hex(), RAY_CHECK_OK(gcs_pub_sub_->Publish(TASK_LEASE_CHANNEL, task_id.Hex(),
request.task_lease_data().SerializeAsString(), task_lease_data->SerializeAsString(), nullptr));
nullptr));
RAY_LOG(DEBUG) << "Finished adding task lease, job id = " << task_id.JobId() RAY_LOG(DEBUG) << "Finished adding task lease, job id = " << task_id.JobId()
<< ", task id = " << task_id << ", node id = " << node_id; << ", task id = " << task_id << ", node id = " << node_id;
} }
GCS_RPC_SEND_REPLY(send_reply_callback, reply, status); GCS_RPC_SEND_REPLY(send_reply_callback, reply, status);
}; };
Status status = gcs_table_storage_->TaskLeaseTable().Put( Status status = gcs_client_.Tasks().AsyncAddTaskLease(task_lease_data, on_done);
task_id, request.task_lease_data(), on_done);
if (!status.ok()) { if (!status.ok()) {
on_done(status); on_done(status);
} }
@ -126,7 +128,8 @@ void DefaultTaskInfoHandler::HandleGetTaskLease(const GetTaskLeaseRequest &reque
<< ", task id = " << task_id; << ", task id = " << task_id;
auto on_done = [task_id, request, reply, send_reply_callback]( auto on_done = [task_id, request, reply, send_reply_callback](
const Status &status, const boost::optional<TaskLeaseData> &result) { const Status &status, const boost::optional<TaskLeaseData> &result) {
if (status.ok() && result) { if (status.ok()) {
RAY_DCHECK(result);
reply->mutable_task_lease_data()->CopyFrom(*result); reply->mutable_task_lease_data()->CopyFrom(*result);
} }
RAY_LOG(DEBUG) << "Finished getting task lease, job id = " << task_id.JobId() RAY_LOG(DEBUG) << "Finished getting task lease, job id = " << task_id.JobId()
@ -134,7 +137,7 @@ void DefaultTaskInfoHandler::HandleGetTaskLease(const GetTaskLeaseRequest &reque
GCS_RPC_SEND_REPLY(send_reply_callback, reply, Status::OK()); GCS_RPC_SEND_REPLY(send_reply_callback, reply, Status::OK());
}; };
Status status = gcs_table_storage_->TaskLeaseTable().Get(task_id, on_done); Status status = gcs_client_.Tasks().AsyncGetTaskLease(task_id, on_done);
if (!status.ok()) { if (!status.ok()) {
on_done(status, boost::none); on_done(status, boost::none);
} }
@ -150,6 +153,8 @@ void DefaultTaskInfoHandler::HandleAttemptTaskReconstruction(
<< ", task id = " << task_id << ", reconstructions num = " << ", task id = " << task_id << ", reconstructions num = "
<< request.task_reconstruction().num_reconstructions() << request.task_reconstruction().num_reconstructions()
<< ", node id = " << node_id; << ", node id = " << node_id;
auto task_reconstruction_data = std::make_shared<TaskReconstructionData>();
task_reconstruction_data->CopyFrom(request.task_reconstruction());
auto on_done = [task_id, node_id, request, reply, auto on_done = [task_id, node_id, request, reply,
send_reply_callback](const Status &status) { send_reply_callback](const Status &status) {
if (!status.ok()) { if (!status.ok()) {
@ -166,8 +171,8 @@ void DefaultTaskInfoHandler::HandleAttemptTaskReconstruction(
GCS_RPC_SEND_REPLY(send_reply_callback, reply, status); GCS_RPC_SEND_REPLY(send_reply_callback, reply, status);
}; };
Status status = gcs_table_storage_->TaskReconstructionTable().Put( Status status =
task_id, request.task_reconstruction(), on_done); gcs_client_.Tasks().AttemptTaskReconstruction(task_reconstruction_data, on_done);
if (!status.ok()) { if (!status.ok()) {
on_done(status); on_done(status);
} }

View file

@ -15,7 +15,6 @@
#ifndef RAY_GCS_TASK_INFO_HANDLER_IMPL_H #ifndef RAY_GCS_TASK_INFO_HANDLER_IMPL_H
#define RAY_GCS_TASK_INFO_HANDLER_IMPL_H #define RAY_GCS_TASK_INFO_HANDLER_IMPL_H
#include "gcs_table_storage.h"
#include "ray/gcs/pubsub/gcs_pub_sub.h" #include "ray/gcs/pubsub/gcs_pub_sub.h"
#include "ray/gcs/redis_gcs_client.h" #include "ray/gcs/redis_gcs_client.h"
#include "ray/rpc/gcs_server/gcs_rpc_server.h" #include "ray/rpc/gcs_server/gcs_rpc_server.h"
@ -26,9 +25,9 @@ namespace rpc {
/// This implementation class of `TaskInfoHandler`. /// This implementation class of `TaskInfoHandler`.
class DefaultTaskInfoHandler : public rpc::TaskInfoHandler { class DefaultTaskInfoHandler : public rpc::TaskInfoHandler {
public: public:
explicit DefaultTaskInfoHandler(std::shared_ptr<gcs::GcsTableStorage> gcs_table_storage, explicit DefaultTaskInfoHandler(gcs::RedisGcsClient &gcs_client,
std::shared_ptr<gcs::GcsPubSub> &gcs_pub_sub) std::shared_ptr<gcs::GcsPubSub> &gcs_pub_sub)
: gcs_table_storage_(gcs_table_storage), gcs_pub_sub_(gcs_pub_sub) {} : gcs_client_(gcs_client), gcs_pub_sub_(gcs_pub_sub) {}
void HandleAddTask(const AddTaskRequest &request, AddTaskReply *reply, void HandleAddTask(const AddTaskRequest &request, AddTaskReply *reply,
SendReplyCallback send_reply_callback) override; SendReplyCallback send_reply_callback) override;
@ -50,7 +49,7 @@ class DefaultTaskInfoHandler : public rpc::TaskInfoHandler {
SendReplyCallback send_reply_callback) override; SendReplyCallback send_reply_callback) override;
private: private:
std::shared_ptr<gcs::GcsTableStorage> gcs_table_storage_; gcs::RedisGcsClient &gcs_client_;
std::shared_ptr<gcs::GcsPubSub> &gcs_pub_sub_; std::shared_ptr<gcs::GcsPubSub> &gcs_pub_sub_;
}; };

View file

@ -23,8 +23,7 @@ namespace ray {
namespace gcs { namespace gcs {
std::string RedisStoreClient::table_separator_ = ":"; std::string RedisStoreClient::separator_ = ":";
std::string RedisStoreClient::index_table_separator_ = "&";
Status RedisStoreClient::AsyncPut(const std::string &table_name, const std::string &key, Status RedisStoreClient::AsyncPut(const std::string &table_name, const std::string &key,
const std::string &data, const std::string &data,
@ -208,7 +207,7 @@ RedisStoreClient::GenCommandsByShards(const std::shared_ptr<RedisClient> &redis_
std::string RedisStoreClient::GenRedisKey(const std::string &table_name, std::string RedisStoreClient::GenRedisKey(const std::string &table_name,
const std::string &key) { const std::string &key) {
std::stringstream ss; std::stringstream ss;
ss << table_name << table_separator_ << key; ss << table_name << separator_ << key;
return ss.str(); return ss.str();
} }
@ -216,35 +215,33 @@ std::string RedisStoreClient::GenRedisKey(const std::string &table_name,
const std::string &key, const std::string &key,
const std::string &index_key) { const std::string &index_key) {
std::stringstream ss; std::stringstream ss;
ss << table_name << index_table_separator_ << index_key << index_table_separator_ ss << table_name << separator_ << index_key << separator_ << key;
<< key;
return ss.str(); return ss.str();
} }
std::string RedisStoreClient::GenRedisMatchPattern(const std::string &table_name) { std::string RedisStoreClient::GenRedisMatchPattern(const std::string &table_name) {
std::stringstream ss; std::stringstream ss;
ss << table_name << table_separator_ << "*"; ss << table_name << separator_ << "*";
return ss.str(); return ss.str();
} }
std::string RedisStoreClient::GenRedisMatchPattern(const std::string &table_name, std::string RedisStoreClient::GenRedisMatchPattern(const std::string &table_name,
const std::string &index_key) { const std::string &index_key) {
std::stringstream ss; std::stringstream ss;
ss << table_name << index_table_separator_ << index_key << index_table_separator_ ss << table_name << separator_ << index_key << separator_ << "*";
<< "*";
return ss.str(); return ss.str();
} }
std::string RedisStoreClient::GetKeyFromRedisKey(const std::string &redis_key, std::string RedisStoreClient::GetKeyFromRedisKey(const std::string &redis_key,
const std::string &table_name) { const std::string &table_name) {
auto pos = table_name.size() + table_separator_.size(); auto pos = table_name.size() + separator_.size();
return redis_key.substr(pos, redis_key.size() - pos); return redis_key.substr(pos, redis_key.size() - pos);
} }
std::string RedisStoreClient::GetKeyFromRedisKey(const std::string &redis_key, std::string RedisStoreClient::GetKeyFromRedisKey(const std::string &redis_key,
const std::string &table_name, const std::string &table_name,
const std::string &index_key) { const std::string &index_key) {
auto pos = table_name.size() + index_table_separator_.size() * 2 + index_key.size(); auto pos = table_name.size() + separator_.size() * 2 + index_key.size();
return redis_key.substr(pos, redis_key.size() - pos); return redis_key.substr(pos, redis_key.size() - pos);
} }

View file

@ -112,8 +112,7 @@ class RedisStoreClient : public StoreClient {
const std::vector<std::string> &keys); const std::vector<std::string> &keys);
/// The separator is used when building redis key. /// The separator is used when building redis key.
static std::string table_separator_; static std::string separator_;
static std::string index_table_separator_;
static std::string GenRedisKey(const std::string &table_name, const std::string &key); static std::string GenRedisKey(const std::string &table_name, const std::string &key);