Support reloading storage data when gcs server restarts (#8650)

This commit is contained in:
fangfengbin 2020-06-04 14:53:20 +08:00 committed by GitHub
parent ea05ebe89e
commit 84a8f2ccb5
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
15 changed files with 244 additions and 59 deletions

View file

@ -454,3 +454,11 @@ py_test(
tags = ["exclusive"],
deps = ["//:ray_lib"],
)
py_test(
name = "test_gcs_fault_tolerance",
size = "medium",
srcs = SRCS + ["test_gcs_fault_tolerance.py"],
tags = ["exclusive"],
deps = ["//:ray_lib"],
)

View file

@ -0,0 +1,45 @@
import sys
import time
import ray
def test_gcs_server_restart():
ray.init()
@ray.remote
class Increase:
def method(self, x):
return x + 2
@ray.remote
def increase(x):
return x + 1
actor1 = Increase.remote()
result = ray.get(actor1.method.remote(1))
assert result == 3
ray.worker._global_node.kill_gcs_server()
ray.worker._global_node.start_gcs_server()
# TODO(ffbin): After gcs server restarts, if an RPC request is sent to
# gcs server immediately, gcs server cannot receive the request,
# but the request will return success. We will fix this in the next pr.
time.sleep(1)
result = ray.get(actor1.method.remote(7))
assert result == 9
actor2 = Increase.remote()
result = ray.get(actor2.method.remote(2))
assert result == 4
result = ray.get(increase.remote(1))
assert result == 2
ray.shutdown()
if __name__ == "__main__":
import pytest
sys.exit(pytest.main(["-v", __file__]))

View file

@ -25,6 +25,9 @@ namespace ray {
namespace gcs {
/// This callback is used to notify when a operation completes.
using EmptyCallback = std::function<void()>;
/// This callback is used to notify when a write/subscribe to GCS completes.
/// \param status Status indicates whether the write/subscribe was successful.
using StatusCallback = std::function<void(Status status)>;

View file

@ -1021,6 +1021,30 @@ TEST_F(ServiceBasedGcsClientTest, TestWorkerTableReSubscribe) {
WaitPendingDone(worker_failure_count, 1);
}
TEST_F(ServiceBasedGcsClientTest, TestGcsTableReload) {
ObjectID object_id = ObjectID::FromRandom();
ClientID node_id = ClientID::FromRandom();
// Register node to GCS.
auto node_info = Mocker::GenNodeInfo();
ASSERT_TRUE(RegisterNode(*node_info));
// Add location of object to GCS.
ASSERT_TRUE(AddLocation(object_id, node_id));
// Restart GCS.
RestartGcsServer();
// Get information of nodes from GCS.
std::vector<rpc::GcsNodeInfo> node_list = GetNodeInfoList();
EXPECT_EQ(node_list.size(), 1);
// Get object's locations from GCS.
auto locations = GetLocations(object_id);
ASSERT_EQ(locations.size(), 1);
ASSERT_EQ(locations.back().manager(), node_id.Binary());
}
TEST_F(ServiceBasedGcsClientTest, TestGcsRedisFailureDetector) {
// Stop redis.
TestSetupUtil::ShutDownRedisServers();

View file

@ -92,7 +92,7 @@ GcsActorManager::GcsActorManager(std::shared_ptr<GcsActorSchedulerInterface> sch
std::shared_ptr<gcs::GcsPubSub> gcs_pub_sub,
const rpc::ClientFactoryFn &worker_client_factory)
: gcs_actor_scheduler_(std::move(scheduler)),
gcs_table_storage_(gcs_table_storage),
gcs_table_storage_(std::move(gcs_table_storage)),
gcs_pub_sub_(std::move(gcs_pub_sub)),
worker_client_factory_(worker_client_factory) {}
@ -679,7 +679,7 @@ void GcsActorManager::OnActorCreationFailed(std::shared_ptr<GcsActor> actor) {
pending_actors_.emplace_back(std::move(actor));
}
void GcsActorManager::OnActorCreationSuccess(std::shared_ptr<GcsActor> actor) {
void GcsActorManager::OnActorCreationSuccess(const std::shared_ptr<GcsActor> &actor) {
auto actor_id = actor->GetActorID();
RAY_CHECK(registered_actors_.count(actor_id) > 0);
actor->UpdateState(rpc::ActorTableData::ALIVE);
@ -721,5 +721,36 @@ void GcsActorManager::SchedulePendingActors() {
}
}
void GcsActorManager::LoadInitialData(const EmptyCallback &done) {
RAY_LOG(INFO) << "Loading initial data.";
auto callback = [this,
done](const std::unordered_map<ActorID, ActorTableData> &result) {
for (auto &item : result) {
if (item.second.state() != ray::rpc::ActorTableData::DEAD) {
auto actor = std::make_shared<GcsActor>(item.second);
registered_actors_.emplace(item.first, actor);
if (actor->IsDetached()) {
named_actors_.emplace(actor->GetName(), actor->GetActorID());
}
created_actors_[actor->GetNodeID()].emplace(actor->GetWorkerID(),
actor->GetActorID());
auto &workers = owners_[actor->GetNodeID()];
auto it = workers.find(actor->GetWorkerID());
if (it == workers.end()) {
std::shared_ptr<rpc::CoreWorkerClientInterface> client =
worker_client_factory_(actor->GetOwnerAddress());
workers.emplace(actor->GetOwnerID(), Owner(std::move(client)));
}
}
}
RAY_LOG(INFO) << "Finished loading initial data.";
done();
};
RAY_CHECK_OK(gcs_table_storage_->ActorTable().GetAll(callback));
}
} // namespace gcs
} // namespace ray

View file

@ -215,7 +215,13 @@ class GcsActorManager : public rpc::ActorInfoHandler {
/// creation task has been scheduled successfully.
///
/// \param actor The actor that has been created.
void OnActorCreationSuccess(std::shared_ptr<GcsActor> actor);
void OnActorCreationSuccess(const std::shared_ptr<GcsActor> &actor);
/// Load initial data from gcs storage to memory cache asynchronously.
/// This should be called when GCS server restarts after a failure.
///
/// \param done Callback that will be called when load is complete.
void LoadInitialData(const EmptyCallback &done);
private:
/// A data structure representing an actor's owner.

View file

@ -21,10 +21,11 @@ namespace ray {
namespace gcs {
GcsNodeManager::NodeFailureDetector::NodeFailureDetector(
boost::asio::io_service &io_service, gcs::NodeInfoAccessor &node_info_accessor,
boost::asio::io_service &io_service,
std::shared_ptr<gcs::GcsTableStorage> gcs_table_storage,
std::shared_ptr<gcs::GcsPubSub> gcs_pub_sub,
std::function<void(const ClientID &)> on_node_death_callback)
: node_info_accessor_(node_info_accessor),
: gcs_table_storage_(std::move(gcs_table_storage)),
on_node_death_callback_(std::move(on_node_death_callback)),
num_heartbeats_timeout_(RayConfig::instance().num_heartbeats_timeout()),
detect_timer_(io_service),
@ -103,31 +104,29 @@ void GcsNodeManager::NodeFailureDetector::ScheduleTick() {
//////////////////////////////////////////////////////////////////////////////////////////
GcsNodeManager::GcsNodeManager(boost::asio::io_service &io_service,
gcs::NodeInfoAccessor &node_info_accessor,
gcs::ErrorInfoAccessor &error_info_accessor,
std::shared_ptr<gcs::GcsPubSub> gcs_pub_sub,
std::shared_ptr<gcs::GcsTableStorage> gcs_table_storage)
: node_info_accessor_(node_info_accessor),
error_info_accessor_(error_info_accessor),
: error_info_accessor_(error_info_accessor),
node_failure_detector_(new NodeFailureDetector(
io_service, node_info_accessor, gcs_pub_sub,
io_service, gcs_table_storage, gcs_pub_sub,
[this](const ClientID &node_id) {
if (auto node = RemoveNode(node_id, /* is_intended = */ false)) {
node->set_state(rpc::GcsNodeInfo::DEAD);
RAY_CHECK(dead_nodes_.emplace(node_id, node).second);
auto on_done = [this, node_id, node](const Status &status) {
RAY_CHECK_OK(gcs_pub_sub_->Publish(NODE_CHANNEL, node_id.Hex(),
node->SerializeAsString(), nullptr));
auto on_done = [this, node_id, node](const Status &status) {
RAY_CHECK_OK(gcs_pub_sub_->Publish(NODE_CHANNEL, node_id.Hex(),
node->SerializeAsString(), nullptr));
};
RAY_CHECK_OK(
gcs_table_storage_->NodeResourceTable().Delete(node_id, on_done));
};
RAY_CHECK_OK(node_info_accessor_.AsyncUnregister(node_id, on_done));
// TODO(Shanly): Remove node resources from resource table.
RAY_CHECK_OK(gcs_table_storage_->NodeTable().Delete(node_id, on_done));
}
})),
gcs_pub_sub_(gcs_pub_sub),
gcs_table_storage_(gcs_table_storage) {
// TODO(Shanly): Load node info list from storage synchronously.
// TODO(Shanly): Load cluster resources from storage synchronously.
}
gcs_table_storage_(gcs_table_storage) {}
void GcsNodeManager::HandleRegisterNode(const rpc::RegisterNodeRequest &request,
rpc::RegisterNodeReply *reply,
@ -156,16 +155,19 @@ void GcsNodeManager::HandleUnregisterNode(const rpc::UnregisterNodeRequest &requ
node->set_state(rpc::GcsNodeInfo::DEAD);
RAY_CHECK(dead_nodes_.emplace(node_id, node).second);
auto on_done = [this, node_id, node, reply, send_reply_callback](Status status) {
RAY_CHECK_OK(status);
RAY_LOG(INFO) << "Finished unregistering node info, node id = " << node_id;
RAY_CHECK_OK(gcs_pub_sub_->Publish(NODE_CHANNEL, node_id.Hex(),
node->SerializeAsString(), nullptr));
GCS_RPC_SEND_REPLY(send_reply_callback, reply, status);
auto on_done = [this, node_id, node, reply,
send_reply_callback](const Status &status) {
auto on_done = [this, node_id, node, reply,
send_reply_callback](const Status &status) {
RAY_CHECK_OK(gcs_pub_sub_->Publish(NODE_CHANNEL, node_id.Hex(),
node->SerializeAsString(), nullptr));
GCS_RPC_SEND_REPLY(send_reply_callback, reply, status);
RAY_LOG(INFO) << "Finished unregistering node info, node id = " << node_id;
};
RAY_CHECK_OK(gcs_table_storage_->NodeResourceTable().Delete(node_id, on_done));
};
// Update node state to DEAD instead of deleting it.
RAY_CHECK_OK(gcs_table_storage_->NodeTable().Put(node_id, *node, on_done));
// TODO(Shanly): Remove node resources from resource table.
}
}
@ -315,7 +317,7 @@ std::shared_ptr<rpc::GcsNodeInfo> GcsNodeManager::RemoveNode(
// Remove from alive nodes.
alive_nodes_.erase(iter);
// Remove from cluster resources.
RAY_CHECK(cluster_resources_.erase(node_id) != 0);
cluster_resources_.erase(node_id);
if (!is_intended) {
// Broadcast a warning to all of the drivers indicating that the node
// has been marked as dead.
@ -338,5 +340,32 @@ std::shared_ptr<rpc::GcsNodeInfo> GcsNodeManager::RemoveNode(
return removed_node;
}
void GcsNodeManager::LoadInitialData(const EmptyCallback &done) {
RAY_LOG(INFO) << "Loading initial data.";
auto get_node_callback = [this, done](
const std::unordered_map<ClientID, GcsNodeInfo> &result) {
for (auto &item : result) {
if (item.second.state() == rpc::GcsNodeInfo::ALIVE) {
alive_nodes_.emplace(item.first, std::make_shared<rpc::GcsNodeInfo>(item.second));
} else if (item.second.state() == rpc::GcsNodeInfo::DEAD) {
dead_nodes_.emplace(item.first, std::make_shared<rpc::GcsNodeInfo>(item.second));
}
}
auto get_node_resource_callback =
[this, done](const std::unordered_map<ClientID, ResourceMap> &result) {
for (auto &item : result) {
cluster_resources_.emplace(item.first, item.second);
}
RAY_LOG(INFO) << "Finished loading initial data.";
done();
};
RAY_CHECK_OK(
gcs_table_storage_->NodeResourceTable().GetAll(get_node_resource_callback));
};
RAY_CHECK_OK(gcs_table_storage_->NodeTable().GetAll(get_node_callback));
}
} // namespace gcs
} // namespace ray

View file

@ -36,13 +36,11 @@ class GcsNodeManager : public rpc::NodeInfoHandler {
/// Create a GcsNodeManager.
///
/// \param io_service The event loop to run the monitor on.
/// \param node_info_accessor The node info accessor.
/// \param error_info_accessor The error info accessor, which is used to report error.
/// \param gcs_pub_sub GCS message pushlisher.
/// \param gcs_pub_sub GCS message publisher.
/// \param gcs_table_storage GCS table external storage accessor.
/// when detecting the death of nodes.
explicit GcsNodeManager(boost::asio::io_service &io_service,
gcs::NodeInfoAccessor &node_info_accessor,
gcs::ErrorInfoAccessor &error_info_accessor,
std::shared_ptr<gcs::GcsPubSub> gcs_pub_sub,
std::shared_ptr<gcs::GcsTableStorage> gcs_table_storage);
@ -127,15 +125,25 @@ class GcsNodeManager : public rpc::NodeInfoHandler {
node_added_listeners_.emplace_back(std::move(listener));
}
/// Load initial data from gcs storage to memory cache asynchronously.
/// This should be called when GCS server restarts after a failure.
///
/// \param done Callback that will be called when load is complete.
void LoadInitialData(const EmptyCallback &done);
protected:
class NodeFailureDetector {
public:
/// Create a NodeFailureDetector.
///
/// \param io_service The event loop to run the monitor on.
/// \param node_info_accessor The node info accessor.
/// \param gcs_table_storage GCS table external storage accessor.
/// \param gcs_pub_sub GCS message publisher.
/// \param on_node_death_callback Callback that will be called when node death is
/// detected.
explicit NodeFailureDetector(
boost::asio::io_service &io_service, gcs::NodeInfoAccessor &node_info_accessor,
boost::asio::io_service &io_service,
std::shared_ptr<gcs::GcsTableStorage> gcs_table_storage,
std::shared_ptr<gcs::GcsPubSub> gcs_pub_sub,
std::function<void(const ClientID &)> on_node_death_callback);
@ -169,8 +177,8 @@ class GcsNodeManager : public rpc::NodeInfoHandler {
void ScheduleTick();
protected:
/// Node info accessor.
gcs::NodeInfoAccessor &node_info_accessor_;
/// Storage for GCS tables.
std::shared_ptr<gcs::GcsTableStorage> gcs_table_storage_;
/// The callback of node death.
std::function<void(const ClientID &)> on_node_death_callback_;
/// The number of heartbeats that can be missed before a node is removed.
@ -187,8 +195,6 @@ class GcsNodeManager : public rpc::NodeInfoHandler {
};
private:
/// Node info accessor.
gcs::NodeInfoAccessor &node_info_accessor_;
/// Error info accessor.
gcs::ErrorInfoAccessor &error_info_accessor_;
/// Detector to detect the failure of node.

View file

@ -266,6 +266,28 @@ std::shared_ptr<ObjectTableDataList> GcsObjectManager::GenObjectTableDataList(
return object_table_data_list;
}
void GcsObjectManager::LoadInitialData(const EmptyCallback &done) {
RAY_LOG(INFO) << "Loading initial data.";
auto callback = [this, done](
const std::unordered_map<ObjectID, ObjectTableDataList> &result) {
absl::flat_hash_map<ClientID, ObjectSet> node_to_objects;
for (auto &item : result) {
auto object_list = item.second;
for (int index = 0; index < object_list.items_size(); ++index) {
node_to_objects[ClientID::FromBinary(object_list.items(index).manager())].insert(
item.first);
}
}
for (auto &item : node_to_objects) {
AddObjectsLocation(item.first, item.second);
}
RAY_LOG(INFO) << "Finished loading initial data.";
done();
};
RAY_CHECK_OK(gcs_table_storage_->ObjectTable().GetAll(callback));
}
} // namespace gcs
} // namespace ray

View file

@ -54,6 +54,12 @@ class GcsObjectManager : public rpc::ObjectInfoHandler {
rpc::RemoveObjectLocationReply *reply,
rpc::SendReplyCallback send_reply_callback) override;
/// Load initial data from gcs storage to memory cache asynchronously.
/// This should be called when GCS server restarts after a failure.
///
/// \param done Callback that will be called when load is complete.
void LoadInitialData(const EmptyCallback &done);
protected:
typedef absl::flat_hash_set<ClientID> LocationSet;

View file

@ -71,9 +71,9 @@ void GcsServer::Start() {
new rpc::NodeInfoGrpcService(main_service_, *gcs_node_manager_));
rpc_server_.RegisterService(*node_info_service_);
object_info_handler_ = InitObjectInfoHandler();
gcs_object_manager_ = InitObjectManager();
object_info_service_.reset(
new rpc::ObjectInfoGrpcService(main_service_, *object_info_handler_));
new rpc::ObjectInfoGrpcService(main_service_, *gcs_object_manager_));
rpc_server_.RegisterService(*object_info_service_);
task_info_handler_ = InitTaskInfoHandler();
@ -95,12 +95,23 @@ void GcsServer::Start() {
new rpc::WorkerInfoGrpcService(main_service_, *worker_info_handler_));
rpc_server_.RegisterService(*worker_info_service_);
// Run rpc server.
rpc_server_.Run();
auto load_completed_count = std::make_shared<int>(0);
int load_count = 3;
auto on_done = [this, load_count, load_completed_count]() {
++(*load_completed_count);
// Store gcs rpc server address in redis.
StoreGcsServerAddressInRedis();
is_started_ = true;
if (*load_completed_count == load_count) {
// Start RPC server when all tables have finished loading initial data.
rpc_server_.Run();
// Store gcs rpc server address in redis.
StoreGcsServerAddressInRedis();
is_started_ = true;
}
};
gcs_actor_manager_->LoadInitialData(on_done);
gcs_object_manager_->LoadInitialData(on_done);
gcs_node_manager_->LoadInitialData(on_done);
// Run the event loop.
// Using boost::asio::io_context::work to avoid ending the event loop when
@ -132,8 +143,7 @@ void GcsServer::InitBackendClient() {
void GcsServer::InitGcsNodeManager() {
RAY_CHECK(redis_gcs_client_ != nullptr);
gcs_node_manager_ = std::make_shared<GcsNodeManager>(
main_service_, redis_gcs_client_->Nodes(), redis_gcs_client_->Errors(),
gcs_pub_sub_, gcs_table_storage_);
main_service_, redis_gcs_client_->Errors(), gcs_pub_sub_, gcs_table_storage_);
}
void GcsServer::InitGcsActorManager() {
@ -198,7 +208,7 @@ std::unique_ptr<rpc::JobInfoHandler> GcsServer::InitJobInfoHandler() {
new rpc::DefaultJobInfoHandler(gcs_table_storage_, gcs_pub_sub_));
}
std::unique_ptr<rpc::ObjectInfoHandler> GcsServer::InitObjectInfoHandler() {
std::unique_ptr<GcsObjectManager> GcsServer::InitObjectManager() {
return std::unique_ptr<GcsObjectManager>(
new GcsObjectManager(gcs_table_storage_, gcs_pub_sub_, *gcs_node_manager_));
}

View file

@ -19,6 +19,7 @@
#include <ray/gcs/redis_gcs_client.h>
#include <ray/rpc/client_call.h>
#include <ray/rpc/gcs_server/gcs_rpc_server.h>
#include "ray/gcs/gcs_server/gcs_object_manager.h"
#include "ray/gcs/gcs_server/gcs_redis_failure_detector.h"
#include "ray/gcs/gcs_server/gcs_table_storage.h"
@ -82,8 +83,8 @@ class GcsServer {
/// The job info handler
virtual std::unique_ptr<rpc::JobInfoHandler> InitJobInfoHandler();
/// The object info handler
virtual std::unique_ptr<rpc::ObjectInfoHandler> InitObjectInfoHandler();
/// The object manager
virtual std::unique_ptr<GcsObjectManager> InitObjectManager();
/// The task info handler
virtual std::unique_ptr<rpc::TaskInfoHandler> InitTaskInfoHandler();
@ -127,7 +128,7 @@ class GcsServer {
/// Node info handler and service
std::unique_ptr<rpc::NodeInfoGrpcService> node_info_service_;
/// Object info handler and service
std::unique_ptr<rpc::ObjectInfoHandler> object_info_handler_;
std::unique_ptr<gcs::GcsObjectManager> gcs_object_manager_;
std::unique_ptr<rpc::ObjectInfoGrpcService> object_info_service_;
/// Task info handler and service
std::unique_ptr<rpc::TaskInfoHandler> task_info_handler_;

View file

@ -28,8 +28,7 @@ class GcsActorSchedulerTest : public ::testing::Test {
gcs_pub_sub_ = std::make_shared<GcsServerMocker::MockGcsPubSub>(redis_client_);
gcs_table_storage_ = std::make_shared<gcs::RedisGcsTableStorage>(redis_client_);
gcs_node_manager_ = std::make_shared<gcs::GcsNodeManager>(
io_service_, node_info_accessor_, error_info_accessor_, gcs_pub_sub_,
gcs_table_storage_);
io_service_, error_info_accessor_, gcs_pub_sub_, gcs_table_storage_);
store_client_ = std::make_shared<gcs::InMemoryStoreClient>(io_service_);
gcs_actor_table_ =
std::make_shared<GcsServerMocker::MockedGcsActorTable>(store_client_);
@ -54,7 +53,6 @@ class GcsActorSchedulerTest : public ::testing::Test {
std::shared_ptr<gcs::StoreClient> store_client_;
std::shared_ptr<GcsServerMocker::MockedGcsActorTable> gcs_actor_table_;
GcsServerMocker::MockedNodeInfoAccessor node_info_accessor_;
GcsServerMocker::MockedErrorInfoAccessor error_info_accessor_;
std::shared_ptr<GcsServerMocker::MockRayletClient> raylet_client_;

View file

@ -27,10 +27,9 @@ class GcsNodeManagerTest : public ::testing::Test {
TEST_F(GcsNodeManagerTest, TestManagement) {
boost::asio::io_service io_service;
auto node_info_accessor = GcsServerMocker::MockedNodeInfoAccessor();
auto error_info_accessor = GcsServerMocker::MockedErrorInfoAccessor();
gcs::GcsNodeManager node_manager(io_service, node_info_accessor, error_info_accessor,
gcs_pub_sub_, gcs_table_storage_);
gcs::GcsNodeManager node_manager(io_service, error_info_accessor, gcs_pub_sub_,
gcs_table_storage_);
// Test Add/Get/Remove functionality.
auto node = Mocker::GenNodeInfo();
auto node_id = ClientID::FromBinary(node->node_id());
@ -44,10 +43,9 @@ TEST_F(GcsNodeManagerTest, TestManagement) {
TEST_F(GcsNodeManagerTest, TestListener) {
boost::asio::io_service io_service;
auto node_info_accessor = GcsServerMocker::MockedNodeInfoAccessor();
auto error_info_accessor = GcsServerMocker::MockedErrorInfoAccessor();
gcs::GcsNodeManager node_manager(io_service, node_info_accessor, error_info_accessor,
gcs_pub_sub_, gcs_table_storage_);
gcs::GcsNodeManager node_manager(io_service, error_info_accessor, gcs_pub_sub_,
gcs_table_storage_);
// Test AddNodeAddedListener.
int node_count = 1000;
std::vector<std::shared_ptr<rpc::GcsNodeInfo>> added_nodes;

View file

@ -54,8 +54,7 @@ class GcsObjectManagerTest : public ::testing::Test {
void SetUp() override {
gcs_table_storage_ = std::make_shared<gcs::InMemoryGcsTableStorage>(io_service_);
gcs_node_manager_ = std::make_shared<gcs::GcsNodeManager>(
io_service_, node_info_accessor_, error_info_accessor_, gcs_pub_sub_,
gcs_table_storage_);
io_service_, error_info_accessor_, gcs_pub_sub_, gcs_table_storage_);
gcs_object_manager_ = std::make_shared<MockedGcsObjectManager>(
gcs_table_storage_, gcs_pub_sub_, *gcs_node_manager_);
GenTestData();
@ -83,7 +82,6 @@ class GcsObjectManagerTest : public ::testing::Test {
protected:
boost::asio::io_service io_service_;
GcsServerMocker::MockedNodeInfoAccessor node_info_accessor_;
GcsServerMocker::MockedErrorInfoAccessor error_info_accessor_;
std::shared_ptr<gcs::GcsNodeManager> gcs_node_manager_;
std::shared_ptr<gcs::RedisGcsClient> gcs_client_;