From 351839bf698dec8dd8592ef70cd063ceab778d0e Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Fri, 22 May 2020 19:16:43 -0700 Subject: [PATCH] Revert "GCS server task info handler use storage instead of redis accessor (#8531)" (#8562) This reverts commit 9823e1531187f908fad34861a4c3e02654a6a300. --- src/ray/gcs/gcs_server/gcs_server.cc | 2 +- .../gcs/gcs_server/task_info_handler_impl.cc | 39 +++++++++++-------- .../gcs/gcs_server/task_info_handler_impl.h | 7 ++-- .../gcs/store_client/redis_store_client.cc | 17 ++++---- src/ray/gcs/store_client/redis_store_client.h | 3 +- 5 files changed, 34 insertions(+), 34 deletions(-) diff --git a/src/ray/gcs/gcs_server/gcs_server.cc b/src/ray/gcs/gcs_server/gcs_server.cc index 2a1f6533e..b17e5d336 100644 --- a/src/ray/gcs/gcs_server/gcs_server.cc +++ b/src/ray/gcs/gcs_server/gcs_server.cc @@ -224,7 +224,7 @@ void GcsServer::StoreGcsServerAddressInRedis() { std::unique_ptr GcsServer::InitTaskInfoHandler() { return std::unique_ptr( - new rpc::DefaultTaskInfoHandler(gcs_table_storage_, gcs_pub_sub_)); + new rpc::DefaultTaskInfoHandler(*redis_gcs_client_, gcs_pub_sub_)); } std::unique_ptr GcsServer::InitStatsHandler() { diff --git a/src/ray/gcs/gcs_server/task_info_handler_impl.cc b/src/ray/gcs/gcs_server/task_info_handler_impl.cc index 6f634390b..c0d250834 100644 --- a/src/ray/gcs/gcs_server/task_info_handler_impl.cc +++ b/src/ray/gcs/gcs_server/task_info_handler_impl.cc @@ -23,22 +23,23 @@ void DefaultTaskInfoHandler::HandleAddTask(const AddTaskRequest &request, JobID job_id = JobID::FromBinary(request.task_data().task().task_spec().job_id()); TaskID task_id = TaskID::FromBinary(request.task_data().task().task_spec().task_id()); RAY_LOG(DEBUG) << "Adding task, job id = " << job_id << ", task id = " << task_id; - auto on_done = [this, job_id, task_id, request, reply, + auto task_table_data = std::make_shared(); + task_table_data->CopyFrom(request.task_data()); + auto on_done = [this, job_id, task_id, task_table_data, request, reply, send_reply_callback](const Status &status) { if (!status.ok()) { RAY_LOG(ERROR) << "Failed to add task, job id = " << job_id << ", task id = " << task_id; } else { - RAY_CHECK_OK(gcs_pub_sub_->Publish( - TASK_CHANNEL, task_id.Hex(), request.task_data().SerializeAsString(), nullptr)); + RAY_CHECK_OK(gcs_pub_sub_->Publish(TASK_CHANNEL, task_id.Hex(), + task_table_data->SerializeAsString(), nullptr)); RAY_LOG(DEBUG) << "Finished adding task, job id = " << job_id << ", task id = " << task_id; GCS_RPC_SEND_REPLY(send_reply_callback, reply, status); } }; - Status status = - gcs_table_storage_->TaskTable().Put(task_id, request.task_data(), on_done); + Status status = gcs_client_.Tasks().AsyncAdd(task_table_data, on_done); if (!status.ok()) { on_done(status); } @@ -52,7 +53,8 @@ void DefaultTaskInfoHandler::HandleGetTask(const GetTaskRequest &request, << ", task id = " << task_id; auto on_done = [task_id, request, reply, send_reply_callback]( const Status &status, const boost::optional &result) { - if (status.ok() && result) { + if (status.ok()) { + RAY_DCHECK(result); reply->mutable_task_data()->CopyFrom(*result); } RAY_LOG(DEBUG) << "Finished getting task, job id = " << task_id.JobId() @@ -60,7 +62,7 @@ void DefaultTaskInfoHandler::HandleGetTask(const GetTaskRequest &request, GCS_RPC_SEND_REPLY(send_reply_callback, reply, Status::OK()); }; - Status status = gcs_table_storage_->TaskTable().Get(task_id, on_done); + Status status = gcs_client_.Tasks().AsyncGet(task_id, on_done); if (!status.ok()) { on_done(status, boost::none); } @@ -81,7 +83,7 @@ void DefaultTaskInfoHandler::HandleDeleteTasks(const DeleteTasksRequest &request GCS_RPC_SEND_REPLY(send_reply_callback, reply, status); }; - Status status = gcs_table_storage_->TaskTable().BatchDelete(task_ids, on_done); + Status status = gcs_client_.Tasks().AsyncDelete(task_ids, on_done); if (!status.ok()) { on_done(status); } @@ -96,23 +98,23 @@ void DefaultTaskInfoHandler::HandleAddTaskLease(const AddTaskLeaseRequest &reque ClientID node_id = ClientID::FromBinary(request.task_lease_data().node_manager_id()); RAY_LOG(DEBUG) << "Adding task lease, job id = " << task_id.JobId() << ", task id = " << task_id << ", node id = " << node_id; - auto on_done = [this, task_id, node_id, request, reply, + auto task_lease_data = std::make_shared(); + task_lease_data->CopyFrom(request.task_lease_data()); + auto on_done = [this, task_id, node_id, task_lease_data, request, reply, send_reply_callback](const Status &status) { if (!status.ok()) { RAY_LOG(ERROR) << "Failed to add task lease, job id = " << task_id.JobId() << ", task id = " << task_id << ", node id = " << node_id; } else { RAY_CHECK_OK(gcs_pub_sub_->Publish(TASK_LEASE_CHANNEL, task_id.Hex(), - request.task_lease_data().SerializeAsString(), - nullptr)); + task_lease_data->SerializeAsString(), nullptr)); RAY_LOG(DEBUG) << "Finished adding task lease, job id = " << task_id.JobId() << ", task id = " << task_id << ", node id = " << node_id; } GCS_RPC_SEND_REPLY(send_reply_callback, reply, status); }; - Status status = gcs_table_storage_->TaskLeaseTable().Put( - task_id, request.task_lease_data(), on_done); + Status status = gcs_client_.Tasks().AsyncAddTaskLease(task_lease_data, on_done); if (!status.ok()) { on_done(status); } @@ -126,7 +128,8 @@ void DefaultTaskInfoHandler::HandleGetTaskLease(const GetTaskLeaseRequest &reque << ", task id = " << task_id; auto on_done = [task_id, request, reply, send_reply_callback]( const Status &status, const boost::optional &result) { - if (status.ok() && result) { + if (status.ok()) { + RAY_DCHECK(result); reply->mutable_task_lease_data()->CopyFrom(*result); } RAY_LOG(DEBUG) << "Finished getting task lease, job id = " << task_id.JobId() @@ -134,7 +137,7 @@ void DefaultTaskInfoHandler::HandleGetTaskLease(const GetTaskLeaseRequest &reque GCS_RPC_SEND_REPLY(send_reply_callback, reply, Status::OK()); }; - Status status = gcs_table_storage_->TaskLeaseTable().Get(task_id, on_done); + Status status = gcs_client_.Tasks().AsyncGetTaskLease(task_id, on_done); if (!status.ok()) { on_done(status, boost::none); } @@ -150,6 +153,8 @@ void DefaultTaskInfoHandler::HandleAttemptTaskReconstruction( << ", task id = " << task_id << ", reconstructions num = " << request.task_reconstruction().num_reconstructions() << ", node id = " << node_id; + auto task_reconstruction_data = std::make_shared(); + task_reconstruction_data->CopyFrom(request.task_reconstruction()); auto on_done = [task_id, node_id, request, reply, send_reply_callback](const Status &status) { if (!status.ok()) { @@ -166,8 +171,8 @@ void DefaultTaskInfoHandler::HandleAttemptTaskReconstruction( GCS_RPC_SEND_REPLY(send_reply_callback, reply, status); }; - Status status = gcs_table_storage_->TaskReconstructionTable().Put( - task_id, request.task_reconstruction(), on_done); + Status status = + gcs_client_.Tasks().AttemptTaskReconstruction(task_reconstruction_data, on_done); if (!status.ok()) { on_done(status); } diff --git a/src/ray/gcs/gcs_server/task_info_handler_impl.h b/src/ray/gcs/gcs_server/task_info_handler_impl.h index 29fb8b9ff..410a1fd1d 100644 --- a/src/ray/gcs/gcs_server/task_info_handler_impl.h +++ b/src/ray/gcs/gcs_server/task_info_handler_impl.h @@ -15,7 +15,6 @@ #ifndef RAY_GCS_TASK_INFO_HANDLER_IMPL_H #define RAY_GCS_TASK_INFO_HANDLER_IMPL_H -#include "gcs_table_storage.h" #include "ray/gcs/pubsub/gcs_pub_sub.h" #include "ray/gcs/redis_gcs_client.h" #include "ray/rpc/gcs_server/gcs_rpc_server.h" @@ -26,9 +25,9 @@ namespace rpc { /// This implementation class of `TaskInfoHandler`. class DefaultTaskInfoHandler : public rpc::TaskInfoHandler { public: - explicit DefaultTaskInfoHandler(std::shared_ptr gcs_table_storage, + explicit DefaultTaskInfoHandler(gcs::RedisGcsClient &gcs_client, std::shared_ptr &gcs_pub_sub) - : gcs_table_storage_(gcs_table_storage), gcs_pub_sub_(gcs_pub_sub) {} + : gcs_client_(gcs_client), gcs_pub_sub_(gcs_pub_sub) {} void HandleAddTask(const AddTaskRequest &request, AddTaskReply *reply, SendReplyCallback send_reply_callback) override; @@ -50,7 +49,7 @@ class DefaultTaskInfoHandler : public rpc::TaskInfoHandler { SendReplyCallback send_reply_callback) override; private: - std::shared_ptr gcs_table_storage_; + gcs::RedisGcsClient &gcs_client_; std::shared_ptr &gcs_pub_sub_; }; diff --git a/src/ray/gcs/store_client/redis_store_client.cc b/src/ray/gcs/store_client/redis_store_client.cc index cd77251bc..35533f233 100644 --- a/src/ray/gcs/store_client/redis_store_client.cc +++ b/src/ray/gcs/store_client/redis_store_client.cc @@ -23,8 +23,7 @@ namespace ray { namespace gcs { -std::string RedisStoreClient::table_separator_ = ":"; -std::string RedisStoreClient::index_table_separator_ = "&"; +std::string RedisStoreClient::separator_ = ":"; Status RedisStoreClient::AsyncPut(const std::string &table_name, const std::string &key, const std::string &data, @@ -208,7 +207,7 @@ RedisStoreClient::GenCommandsByShards(const std::shared_ptr &redis_ std::string RedisStoreClient::GenRedisKey(const std::string &table_name, const std::string &key) { std::stringstream ss; - ss << table_name << table_separator_ << key; + ss << table_name << separator_ << key; return ss.str(); } @@ -216,35 +215,33 @@ std::string RedisStoreClient::GenRedisKey(const std::string &table_name, const std::string &key, const std::string &index_key) { std::stringstream ss; - ss << table_name << index_table_separator_ << index_key << index_table_separator_ - << key; + ss << table_name << separator_ << index_key << separator_ << key; return ss.str(); } std::string RedisStoreClient::GenRedisMatchPattern(const std::string &table_name) { std::stringstream ss; - ss << table_name << table_separator_ << "*"; + ss << table_name << separator_ << "*"; return ss.str(); } std::string RedisStoreClient::GenRedisMatchPattern(const std::string &table_name, const std::string &index_key) { std::stringstream ss; - ss << table_name << index_table_separator_ << index_key << index_table_separator_ - << "*"; + ss << table_name << separator_ << index_key << separator_ << "*"; return ss.str(); } std::string RedisStoreClient::GetKeyFromRedisKey(const std::string &redis_key, const std::string &table_name) { - auto pos = table_name.size() + table_separator_.size(); + auto pos = table_name.size() + separator_.size(); return redis_key.substr(pos, redis_key.size() - pos); } std::string RedisStoreClient::GetKeyFromRedisKey(const std::string &redis_key, const std::string &table_name, const std::string &index_key) { - auto pos = table_name.size() + index_table_separator_.size() * 2 + index_key.size(); + auto pos = table_name.size() + separator_.size() * 2 + index_key.size(); return redis_key.substr(pos, redis_key.size() - pos); } diff --git a/src/ray/gcs/store_client/redis_store_client.h b/src/ray/gcs/store_client/redis_store_client.h index b2cada940..67f1c5f80 100644 --- a/src/ray/gcs/store_client/redis_store_client.h +++ b/src/ray/gcs/store_client/redis_store_client.h @@ -112,8 +112,7 @@ class RedisStoreClient : public StoreClient { const std::vector &keys); /// The separator is used when building redis key. - static std::string table_separator_; - static std::string index_table_separator_; + static std::string separator_; static std::string GenRedisKey(const std::string &table_name, const std::string &key);