From 84a8f2ccb5ac076d30f10c838baca784389b7fba Mon Sep 17 00:00:00 2001
From: fangfengbin <869218239a@zju.edu.cn>
Date: Thu, 4 Jun 2020 14:53:20 +0800
Subject: [PATCH] Support reloading storage data when gcs server restarts (#8650)

---
 python/ray/tests/BUILD                        |  8 ++
 python/ray/tests/test_gcs_fault_tolerance.py  | 45 ++++++++++++
 src/ray/gcs/callback.h                        |  3 +
 .../test/service_based_gcs_client_test.cc     | 24 ++++++
 src/ray/gcs/gcs_server/gcs_actor_manager.cc   | 35 ++++++++-
 src/ray/gcs/gcs_server/gcs_actor_manager.h    |  8 +-
 src/ray/gcs/gcs_server/gcs_node_manager.cc    | 73 +++++++++++++------
 src/ray/gcs/gcs_server/gcs_node_manager.h     | 24 +++---
 src/ray/gcs/gcs_server/gcs_object_manager.cc  | 22 ++++++
 src/ray/gcs/gcs_server/gcs_object_manager.h   |  6 ++
 src/ray/gcs/gcs_server/gcs_server.cc          | 30 +++++---
 src/ray/gcs/gcs_server/gcs_server.h           |  7 +-
 .../test/gcs_actor_scheduler_test.cc          |  4 +-
 .../gcs_server/test/gcs_node_manager_test.cc  | 10 +--
 .../test/gcs_object_manager_test.cc           |  4 +-
 15 files changed, 244 insertions(+), 59 deletions(-)
 create mode 100644 python/ray/tests/test_gcs_fault_tolerance.py

diff --git a/python/ray/tests/BUILD b/python/ray/tests/BUILD
index 70111a1bd..0531796b2 100644
--- a/python/ray/tests/BUILD
+++ b/python/ray/tests/BUILD
@@ -454,3 +454,11 @@ py_test(
     tags = ["exclusive"],
     deps = ["//:ray_lib"],
 )
+
+py_test(
+    name = "test_gcs_fault_tolerance",
+    size = "medium",
+    srcs = SRCS + ["test_gcs_fault_tolerance.py"],
+    tags = ["exclusive"],
+    deps = ["//:ray_lib"],
+)
diff --git a/python/ray/tests/test_gcs_fault_tolerance.py b/python/ray/tests/test_gcs_fault_tolerance.py
new file mode 100644
index 000000000..c8e1eb64d
--- /dev/null
+++ b/python/ray/tests/test_gcs_fault_tolerance.py
@@ -0,0 +1,45 @@
+import sys
+import time
+
+import ray
+
+
+def test_gcs_server_restart():
+    ray.init()
+
+    @ray.remote
+    class Increase:
+        def method(self, x):
+            return x + 2
+
+    @ray.remote
+    def increase(x):
+        return x + 1
+
+    actor1 = Increase.remote()
+    result = ray.get(actor1.method.remote(1))
+    assert result == 3
+
+    ray.worker._global_node.kill_gcs_server()
+    ray.worker._global_node.start_gcs_server()
+
+    # TODO(ffbin): After the gcs server restarts, an RPC request sent to it
+    # immediately may not be received by the gcs server, yet the request
+    # still returns success. We will fix this in the next PR.
+    time.sleep(1)
+
+    result = ray.get(actor1.method.remote(7))
+    assert result == 9
+
+    actor2 = Increase.remote()
+    result = ray.get(actor2.method.remote(2))
+    assert result == 4
+
+    result = ray.get(increase.remote(1))
+    assert result == 2
+    ray.shutdown()
+
+
+if __name__ == "__main__":
+    import pytest
+    sys.exit(pytest.main(["-v", __file__]))
diff --git a/src/ray/gcs/callback.h b/src/ray/gcs/callback.h
index 62e8073ad..80ea7a280 100644
--- a/src/ray/gcs/callback.h
+++ b/src/ray/gcs/callback.h
@@ -25,6 +25,9 @@ namespace ray {
 
 namespace gcs {
 
+/// This callback is used to notify when an operation completes.
+using EmptyCallback = std::function<void()>;
+
 /// This callback is used to notify when a write/subscribe to GCS completes.
 /// \param status Status indicates whether the write/subscribe was successful.
 using StatusCallback = std::function<void(Status status)>;
diff --git a/src/ray/gcs/gcs_client/test/service_based_gcs_client_test.cc b/src/ray/gcs/gcs_client/test/service_based_gcs_client_test.cc
index e32f7f247..4024d2c27 100644
--- a/src/ray/gcs/gcs_client/test/service_based_gcs_client_test.cc
+++ b/src/ray/gcs/gcs_client/test/service_based_gcs_client_test.cc
@@ -1021,6 +1021,30 @@ TEST_F(ServiceBasedGcsClientTest, TestWorkerTableReSubscribe) {
   WaitPendingDone(worker_failure_count, 1);
 }
 
+TEST_F(ServiceBasedGcsClientTest, TestGcsTableReload) {
+  ObjectID object_id = ObjectID::FromRandom();
+  ClientID node_id = ClientID::FromRandom();
+
+  // Register node to GCS.
+  auto node_info = Mocker::GenNodeInfo();
+  ASSERT_TRUE(RegisterNode(*node_info));
+
+  // Add location of object to GCS.
+  ASSERT_TRUE(AddLocation(object_id, node_id));
+
+  // Restart GCS.
+  RestartGcsServer();
+
+  // Get information of nodes from GCS.
+  std::vector<rpc::GcsNodeInfo> node_list = GetNodeInfoList();
+  EXPECT_EQ(node_list.size(), 1);
+
+  // Get object's locations from GCS.
+  auto locations = GetLocations(object_id);
+  ASSERT_EQ(locations.size(), 1);
+  ASSERT_EQ(locations.back().manager(), node_id.Binary());
+}
+
 TEST_F(ServiceBasedGcsClientTest, TestGcsRedisFailureDetector) {
   // Stop redis.
   TestSetupUtil::ShutDownRedisServers();
diff --git a/src/ray/gcs/gcs_server/gcs_actor_manager.cc b/src/ray/gcs/gcs_server/gcs_actor_manager.cc
index 0c4089867..cda42c0de 100644
--- a/src/ray/gcs/gcs_server/gcs_actor_manager.cc
+++ b/src/ray/gcs/gcs_server/gcs_actor_manager.cc
@@ -92,7 +92,7 @@ GcsActorManager::GcsActorManager(std::shared_ptr<GcsActorScheduler> scheduler,
                                  std::shared_ptr<gcs::GcsPubSub> gcs_pub_sub,
                                  const rpc::ClientFactoryFn &worker_client_factory)
     : gcs_actor_scheduler_(std::move(scheduler)),
-      gcs_table_storage_(gcs_table_storage),
+      gcs_table_storage_(std::move(gcs_table_storage)),
       gcs_pub_sub_(std::move(gcs_pub_sub)),
       worker_client_factory_(worker_client_factory) {}
 
@@ -679,7 +679,7 @@ void GcsActorManager::OnActorCreationFailed(std::shared_ptr<GcsActor> actor) {
   pending_actors_.emplace_back(std::move(actor));
 }
 
-void GcsActorManager::OnActorCreationSuccess(std::shared_ptr<GcsActor> actor) {
+void GcsActorManager::OnActorCreationSuccess(const std::shared_ptr<GcsActor> &actor) {
   auto actor_id = actor->GetActorID();
   RAY_CHECK(registered_actors_.count(actor_id) > 0);
   actor->UpdateState(rpc::ActorTableData::ALIVE);
@@ -721,5 +721,36 @@ void GcsActorManager::SchedulePendingActors() {
   }
 }
 
+void GcsActorManager::LoadInitialData(const EmptyCallback &done) {
+  RAY_LOG(INFO) << "Loading initial data.";
+  auto callback = [this,
+                   done](const std::unordered_map<ActorID, rpc::ActorTableData> &result) {
+    for (auto &item : result) {
+      if (item.second.state() != ray::rpc::ActorTableData::DEAD) {
+        auto actor = std::make_shared<GcsActor>(item.second);
+        registered_actors_.emplace(item.first, actor);
+
+        if (actor->IsDetached()) {
+          named_actors_.emplace(actor->GetName(), actor->GetActorID());
+        }
+
+        created_actors_[actor->GetNodeID()].emplace(actor->GetWorkerID(),
+                                                    actor->GetActorID());
+
+        auto &workers = owners_[actor->GetNodeID()];
+        auto it = workers.find(actor->GetWorkerID());
+        if (it == workers.end()) {
+          std::shared_ptr<rpc::CoreWorkerClientInterface> client =
+              worker_client_factory_(actor->GetOwnerAddress());
+          workers.emplace(actor->GetOwnerID(), Owner(std::move(client)));
+        }
+      }
+    }
+    RAY_LOG(INFO) << "Finished loading initial data.";
+    done();
+  };
+  RAY_CHECK_OK(gcs_table_storage_->ActorTable().GetAll(callback));
+}
+
 }  // namespace gcs
 }  // namespace ray
diff --git a/src/ray/gcs/gcs_server/gcs_actor_manager.h b/src/ray/gcs/gcs_server/gcs_actor_manager.h
index 065613193..dafcbc8cc 100644
--- a/src/ray/gcs/gcs_server/gcs_actor_manager.h +++ b/src/ray/gcs/gcs_server/gcs_actor_manager.h @@ -215,7 +215,13 @@ class GcsActorManager : public rpc::ActorInfoHandler { /// creation task has been scheduled successfully. /// /// \param actor The actor that has been created. - void OnActorCreationSuccess(std::shared_ptr actor); + void OnActorCreationSuccess(const std::shared_ptr &actor); + + /// Load initial data from gcs storage to memory cache asynchronously. + /// This should be called when GCS server restarts after a failure. + /// + /// \param done Callback that will be called when load is complete. + void LoadInitialData(const EmptyCallback &done); private: /// A data structure representing an actor's owner. diff --git a/src/ray/gcs/gcs_server/gcs_node_manager.cc b/src/ray/gcs/gcs_server/gcs_node_manager.cc index f2383bf16..52e6cfcfc 100644 --- a/src/ray/gcs/gcs_server/gcs_node_manager.cc +++ b/src/ray/gcs/gcs_server/gcs_node_manager.cc @@ -21,10 +21,11 @@ namespace ray { namespace gcs { GcsNodeManager::NodeFailureDetector::NodeFailureDetector( - boost::asio::io_service &io_service, gcs::NodeInfoAccessor &node_info_accessor, + boost::asio::io_service &io_service, + std::shared_ptr gcs_table_storage, std::shared_ptr gcs_pub_sub, std::function on_node_death_callback) - : node_info_accessor_(node_info_accessor), + : gcs_table_storage_(std::move(gcs_table_storage)), on_node_death_callback_(std::move(on_node_death_callback)), num_heartbeats_timeout_(RayConfig::instance().num_heartbeats_timeout()), detect_timer_(io_service), @@ -103,31 +104,29 @@ void GcsNodeManager::NodeFailureDetector::ScheduleTick() { ////////////////////////////////////////////////////////////////////////////////////////// GcsNodeManager::GcsNodeManager(boost::asio::io_service &io_service, - gcs::NodeInfoAccessor &node_info_accessor, gcs::ErrorInfoAccessor &error_info_accessor, std::shared_ptr gcs_pub_sub, std::shared_ptr gcs_table_storage) - : node_info_accessor_(node_info_accessor), - error_info_accessor_(error_info_accessor), + : error_info_accessor_(error_info_accessor), node_failure_detector_(new NodeFailureDetector( - io_service, node_info_accessor, gcs_pub_sub, + io_service, gcs_table_storage, gcs_pub_sub, [this](const ClientID &node_id) { if (auto node = RemoveNode(node_id, /* is_intended = */ false)) { node->set_state(rpc::GcsNodeInfo::DEAD); RAY_CHECK(dead_nodes_.emplace(node_id, node).second); auto on_done = [this, node_id, node](const Status &status) { - RAY_CHECK_OK(gcs_pub_sub_->Publish(NODE_CHANNEL, node_id.Hex(), - node->SerializeAsString(), nullptr)); + auto on_done = [this, node_id, node](const Status &status) { + RAY_CHECK_OK(gcs_pub_sub_->Publish(NODE_CHANNEL, node_id.Hex(), + node->SerializeAsString(), nullptr)); + }; + RAY_CHECK_OK( + gcs_table_storage_->NodeResourceTable().Delete(node_id, on_done)); }; - RAY_CHECK_OK(node_info_accessor_.AsyncUnregister(node_id, on_done)); - // TODO(Shanly): Remove node resources from resource table. + RAY_CHECK_OK(gcs_table_storage_->NodeTable().Delete(node_id, on_done)); } })), gcs_pub_sub_(gcs_pub_sub), - gcs_table_storage_(gcs_table_storage) { - // TODO(Shanly): Load node info list from storage synchronously. - // TODO(Shanly): Load cluster resources from storage synchronously. 
-} + gcs_table_storage_(gcs_table_storage) {} void GcsNodeManager::HandleRegisterNode(const rpc::RegisterNodeRequest &request, rpc::RegisterNodeReply *reply, @@ -156,16 +155,19 @@ void GcsNodeManager::HandleUnregisterNode(const rpc::UnregisterNodeRequest &requ node->set_state(rpc::GcsNodeInfo::DEAD); RAY_CHECK(dead_nodes_.emplace(node_id, node).second); - auto on_done = [this, node_id, node, reply, send_reply_callback](Status status) { - RAY_CHECK_OK(status); - RAY_LOG(INFO) << "Finished unregistering node info, node id = " << node_id; - RAY_CHECK_OK(gcs_pub_sub_->Publish(NODE_CHANNEL, node_id.Hex(), - node->SerializeAsString(), nullptr)); - GCS_RPC_SEND_REPLY(send_reply_callback, reply, status); + auto on_done = [this, node_id, node, reply, + send_reply_callback](const Status &status) { + auto on_done = [this, node_id, node, reply, + send_reply_callback](const Status &status) { + RAY_CHECK_OK(gcs_pub_sub_->Publish(NODE_CHANNEL, node_id.Hex(), + node->SerializeAsString(), nullptr)); + GCS_RPC_SEND_REPLY(send_reply_callback, reply, status); + RAY_LOG(INFO) << "Finished unregistering node info, node id = " << node_id; + }; + RAY_CHECK_OK(gcs_table_storage_->NodeResourceTable().Delete(node_id, on_done)); }; // Update node state to DEAD instead of deleting it. RAY_CHECK_OK(gcs_table_storage_->NodeTable().Put(node_id, *node, on_done)); - // TODO(Shanly): Remove node resources from resource table. } } @@ -315,7 +317,7 @@ std::shared_ptr GcsNodeManager::RemoveNode( // Remove from alive nodes. alive_nodes_.erase(iter); // Remove from cluster resources. - RAY_CHECK(cluster_resources_.erase(node_id) != 0); + cluster_resources_.erase(node_id); if (!is_intended) { // Broadcast a warning to all of the drivers indicating that the node // has been marked as dead. @@ -338,5 +340,32 @@ std::shared_ptr GcsNodeManager::RemoveNode( return removed_node; } +void GcsNodeManager::LoadInitialData(const EmptyCallback &done) { + RAY_LOG(INFO) << "Loading initial data."; + + auto get_node_callback = [this, done]( + const std::unordered_map &result) { + for (auto &item : result) { + if (item.second.state() == rpc::GcsNodeInfo::ALIVE) { + alive_nodes_.emplace(item.first, std::make_shared(item.second)); + } else if (item.second.state() == rpc::GcsNodeInfo::DEAD) { + dead_nodes_.emplace(item.first, std::make_shared(item.second)); + } + } + + auto get_node_resource_callback = + [this, done](const std::unordered_map &result) { + for (auto &item : result) { + cluster_resources_.emplace(item.first, item.second); + } + RAY_LOG(INFO) << "Finished loading initial data."; + done(); + }; + RAY_CHECK_OK( + gcs_table_storage_->NodeResourceTable().GetAll(get_node_resource_callback)); + }; + RAY_CHECK_OK(gcs_table_storage_->NodeTable().GetAll(get_node_callback)); +} + } // namespace gcs } // namespace ray diff --git a/src/ray/gcs/gcs_server/gcs_node_manager.h b/src/ray/gcs/gcs_server/gcs_node_manager.h index 8c889b8ec..dbcec1f30 100644 --- a/src/ray/gcs/gcs_server/gcs_node_manager.h +++ b/src/ray/gcs/gcs_server/gcs_node_manager.h @@ -36,13 +36,11 @@ class GcsNodeManager : public rpc::NodeInfoHandler { /// Create a GcsNodeManager. /// /// \param io_service The event loop to run the monitor on. - /// \param node_info_accessor The node info accessor. /// \param error_info_accessor The error info accessor, which is used to report error. - /// \param gcs_pub_sub GCS message pushlisher. + /// \param gcs_pub_sub GCS message publisher. /// \param gcs_table_storage GCS table external storage accessor. /// when detecting the death of nodes. 
explicit GcsNodeManager(boost::asio::io_service &io_service, - gcs::NodeInfoAccessor &node_info_accessor, gcs::ErrorInfoAccessor &error_info_accessor, std::shared_ptr gcs_pub_sub, std::shared_ptr gcs_table_storage); @@ -127,15 +125,25 @@ class GcsNodeManager : public rpc::NodeInfoHandler { node_added_listeners_.emplace_back(std::move(listener)); } + /// Load initial data from gcs storage to memory cache asynchronously. + /// This should be called when GCS server restarts after a failure. + /// + /// \param done Callback that will be called when load is complete. + void LoadInitialData(const EmptyCallback &done); + protected: class NodeFailureDetector { public: /// Create a NodeFailureDetector. /// /// \param io_service The event loop to run the monitor on. - /// \param node_info_accessor The node info accessor. + /// \param gcs_table_storage GCS table external storage accessor. + /// \param gcs_pub_sub GCS message publisher. + /// \param on_node_death_callback Callback that will be called when node death is + /// detected. explicit NodeFailureDetector( - boost::asio::io_service &io_service, gcs::NodeInfoAccessor &node_info_accessor, + boost::asio::io_service &io_service, + std::shared_ptr gcs_table_storage, std::shared_ptr gcs_pub_sub, std::function on_node_death_callback); @@ -169,8 +177,8 @@ class GcsNodeManager : public rpc::NodeInfoHandler { void ScheduleTick(); protected: - /// Node info accessor. - gcs::NodeInfoAccessor &node_info_accessor_; + /// Storage for GCS tables. + std::shared_ptr gcs_table_storage_; /// The callback of node death. std::function on_node_death_callback_; /// The number of heartbeats that can be missed before a node is removed. @@ -187,8 +195,6 @@ class GcsNodeManager : public rpc::NodeInfoHandler { }; private: - /// Node info accessor. - gcs::NodeInfoAccessor &node_info_accessor_; /// Error info accessor. gcs::ErrorInfoAccessor &error_info_accessor_; /// Detector to detect the failure of node. diff --git a/src/ray/gcs/gcs_server/gcs_object_manager.cc b/src/ray/gcs/gcs_server/gcs_object_manager.cc index 770ce6cfd..ff162c168 100644 --- a/src/ray/gcs/gcs_server/gcs_object_manager.cc +++ b/src/ray/gcs/gcs_server/gcs_object_manager.cc @@ -266,6 +266,28 @@ std::shared_ptr GcsObjectManager::GenObjectTableDataList( return object_table_data_list; } +void GcsObjectManager::LoadInitialData(const EmptyCallback &done) { + RAY_LOG(INFO) << "Loading initial data."; + auto callback = [this, done]( + const std::unordered_map &result) { + absl::flat_hash_map node_to_objects; + for (auto &item : result) { + auto object_list = item.second; + for (int index = 0; index < object_list.items_size(); ++index) { + node_to_objects[ClientID::FromBinary(object_list.items(index).manager())].insert( + item.first); + } + } + + for (auto &item : node_to_objects) { + AddObjectsLocation(item.first, item.second); + } + RAY_LOG(INFO) << "Finished loading initial data."; + done(); + }; + RAY_CHECK_OK(gcs_table_storage_->ObjectTable().GetAll(callback)); +} + } // namespace gcs } // namespace ray diff --git a/src/ray/gcs/gcs_server/gcs_object_manager.h b/src/ray/gcs/gcs_server/gcs_object_manager.h index 9fce7a0d0..f6d55b09c 100644 --- a/src/ray/gcs/gcs_server/gcs_object_manager.h +++ b/src/ray/gcs/gcs_server/gcs_object_manager.h @@ -54,6 +54,12 @@ class GcsObjectManager : public rpc::ObjectInfoHandler { rpc::RemoveObjectLocationReply *reply, rpc::SendReplyCallback send_reply_callback) override; + /// Load initial data from gcs storage to memory cache asynchronously. 
+ /// This should be called when GCS server restarts after a failure. + /// + /// \param done Callback that will be called when load is complete. + void LoadInitialData(const EmptyCallback &done); + protected: typedef absl::flat_hash_set LocationSet; diff --git a/src/ray/gcs/gcs_server/gcs_server.cc b/src/ray/gcs/gcs_server/gcs_server.cc index f3d5a4617..edd6022a1 100644 --- a/src/ray/gcs/gcs_server/gcs_server.cc +++ b/src/ray/gcs/gcs_server/gcs_server.cc @@ -71,9 +71,9 @@ void GcsServer::Start() { new rpc::NodeInfoGrpcService(main_service_, *gcs_node_manager_)); rpc_server_.RegisterService(*node_info_service_); - object_info_handler_ = InitObjectInfoHandler(); + gcs_object_manager_ = InitObjectManager(); object_info_service_.reset( - new rpc::ObjectInfoGrpcService(main_service_, *object_info_handler_)); + new rpc::ObjectInfoGrpcService(main_service_, *gcs_object_manager_)); rpc_server_.RegisterService(*object_info_service_); task_info_handler_ = InitTaskInfoHandler(); @@ -95,12 +95,23 @@ void GcsServer::Start() { new rpc::WorkerInfoGrpcService(main_service_, *worker_info_handler_)); rpc_server_.RegisterService(*worker_info_service_); - // Run rpc server. - rpc_server_.Run(); + auto load_completed_count = std::make_shared(0); + int load_count = 3; + auto on_done = [this, load_count, load_completed_count]() { + ++(*load_completed_count); - // Store gcs rpc server address in redis. - StoreGcsServerAddressInRedis(); - is_started_ = true; + if (*load_completed_count == load_count) { + // Start RPC server when all tables have finished loading initial data. + rpc_server_.Run(); + + // Store gcs rpc server address in redis. + StoreGcsServerAddressInRedis(); + is_started_ = true; + } + }; + gcs_actor_manager_->LoadInitialData(on_done); + gcs_object_manager_->LoadInitialData(on_done); + gcs_node_manager_->LoadInitialData(on_done); // Run the event loop. 
// Using boost::asio::io_context::work to avoid ending the event loop when @@ -132,8 +143,7 @@ void GcsServer::InitBackendClient() { void GcsServer::InitGcsNodeManager() { RAY_CHECK(redis_gcs_client_ != nullptr); gcs_node_manager_ = std::make_shared( - main_service_, redis_gcs_client_->Nodes(), redis_gcs_client_->Errors(), - gcs_pub_sub_, gcs_table_storage_); + main_service_, redis_gcs_client_->Errors(), gcs_pub_sub_, gcs_table_storage_); } void GcsServer::InitGcsActorManager() { @@ -198,7 +208,7 @@ std::unique_ptr GcsServer::InitJobInfoHandler() { new rpc::DefaultJobInfoHandler(gcs_table_storage_, gcs_pub_sub_)); } -std::unique_ptr GcsServer::InitObjectInfoHandler() { +std::unique_ptr GcsServer::InitObjectManager() { return std::unique_ptr( new GcsObjectManager(gcs_table_storage_, gcs_pub_sub_, *gcs_node_manager_)); } diff --git a/src/ray/gcs/gcs_server/gcs_server.h b/src/ray/gcs/gcs_server/gcs_server.h index 68caaf7c1..e510b8df3 100644 --- a/src/ray/gcs/gcs_server/gcs_server.h +++ b/src/ray/gcs/gcs_server/gcs_server.h @@ -19,6 +19,7 @@ #include #include #include +#include "ray/gcs/gcs_server/gcs_object_manager.h" #include "ray/gcs/gcs_server/gcs_redis_failure_detector.h" #include "ray/gcs/gcs_server/gcs_table_storage.h" @@ -82,8 +83,8 @@ class GcsServer { /// The job info handler virtual std::unique_ptr InitJobInfoHandler(); - /// The object info handler - virtual std::unique_ptr InitObjectInfoHandler(); + /// The object manager + virtual std::unique_ptr InitObjectManager(); /// The task info handler virtual std::unique_ptr InitTaskInfoHandler(); @@ -127,7 +128,7 @@ class GcsServer { /// Node info handler and service std::unique_ptr node_info_service_; /// Object info handler and service - std::unique_ptr object_info_handler_; + std::unique_ptr gcs_object_manager_; std::unique_ptr object_info_service_; /// Task info handler and service std::unique_ptr task_info_handler_; diff --git a/src/ray/gcs/gcs_server/test/gcs_actor_scheduler_test.cc b/src/ray/gcs/gcs_server/test/gcs_actor_scheduler_test.cc index c5670c0d4..82211d37d 100644 --- a/src/ray/gcs/gcs_server/test/gcs_actor_scheduler_test.cc +++ b/src/ray/gcs/gcs_server/test/gcs_actor_scheduler_test.cc @@ -28,8 +28,7 @@ class GcsActorSchedulerTest : public ::testing::Test { gcs_pub_sub_ = std::make_shared(redis_client_); gcs_table_storage_ = std::make_shared(redis_client_); gcs_node_manager_ = std::make_shared( - io_service_, node_info_accessor_, error_info_accessor_, gcs_pub_sub_, - gcs_table_storage_); + io_service_, error_info_accessor_, gcs_pub_sub_, gcs_table_storage_); store_client_ = std::make_shared(io_service_); gcs_actor_table_ = std::make_shared(store_client_); @@ -54,7 +53,6 @@ class GcsActorSchedulerTest : public ::testing::Test { std::shared_ptr store_client_; std::shared_ptr gcs_actor_table_; - GcsServerMocker::MockedNodeInfoAccessor node_info_accessor_; GcsServerMocker::MockedErrorInfoAccessor error_info_accessor_; std::shared_ptr raylet_client_; diff --git a/src/ray/gcs/gcs_server/test/gcs_node_manager_test.cc b/src/ray/gcs/gcs_server/test/gcs_node_manager_test.cc index 79c18a2d1..5ea44dfd5 100644 --- a/src/ray/gcs/gcs_server/test/gcs_node_manager_test.cc +++ b/src/ray/gcs/gcs_server/test/gcs_node_manager_test.cc @@ -27,10 +27,9 @@ class GcsNodeManagerTest : public ::testing::Test { TEST_F(GcsNodeManagerTest, TestManagement) { boost::asio::io_service io_service; - auto node_info_accessor = GcsServerMocker::MockedNodeInfoAccessor(); auto error_info_accessor = GcsServerMocker::MockedErrorInfoAccessor(); - gcs::GcsNodeManager 
node_manager(io_service, node_info_accessor, error_info_accessor, - gcs_pub_sub_, gcs_table_storage_); + gcs::GcsNodeManager node_manager(io_service, error_info_accessor, gcs_pub_sub_, + gcs_table_storage_); // Test Add/Get/Remove functionality. auto node = Mocker::GenNodeInfo(); auto node_id = ClientID::FromBinary(node->node_id()); @@ -44,10 +43,9 @@ TEST_F(GcsNodeManagerTest, TestManagement) { TEST_F(GcsNodeManagerTest, TestListener) { boost::asio::io_service io_service; - auto node_info_accessor = GcsServerMocker::MockedNodeInfoAccessor(); auto error_info_accessor = GcsServerMocker::MockedErrorInfoAccessor(); - gcs::GcsNodeManager node_manager(io_service, node_info_accessor, error_info_accessor, - gcs_pub_sub_, gcs_table_storage_); + gcs::GcsNodeManager node_manager(io_service, error_info_accessor, gcs_pub_sub_, + gcs_table_storage_); // Test AddNodeAddedListener. int node_count = 1000; std::vector> added_nodes; diff --git a/src/ray/gcs/gcs_server/test/gcs_object_manager_test.cc b/src/ray/gcs/gcs_server/test/gcs_object_manager_test.cc index ee90631af..ee91db7de 100644 --- a/src/ray/gcs/gcs_server/test/gcs_object_manager_test.cc +++ b/src/ray/gcs/gcs_server/test/gcs_object_manager_test.cc @@ -54,8 +54,7 @@ class GcsObjectManagerTest : public ::testing::Test { void SetUp() override { gcs_table_storage_ = std::make_shared(io_service_); gcs_node_manager_ = std::make_shared( - io_service_, node_info_accessor_, error_info_accessor_, gcs_pub_sub_, - gcs_table_storage_); + io_service_, error_info_accessor_, gcs_pub_sub_, gcs_table_storage_); gcs_object_manager_ = std::make_shared( gcs_table_storage_, gcs_pub_sub_, *gcs_node_manager_); GenTestData(); @@ -83,7 +82,6 @@ class GcsObjectManagerTest : public ::testing::Test { protected: boost::asio::io_service io_service_; - GcsServerMocker::MockedNodeInfoAccessor node_info_accessor_; GcsServerMocker::MockedErrorInfoAccessor error_info_accessor_; std::shared_ptr gcs_node_manager_; std::shared_ptr gcs_client_;
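
The GcsServer::Start hunk in this patch defers rpc_server_.Run() and StoreGcsServerAddressInRedis() until the actor, object, and node managers have all finished their asynchronous LoadInitialData calls, by bumping a shared counter in each completion callback. The stand-alone C++ sketch below illustrates only that completion-counting pattern; the names start_server and on_done and the direct synchronous invocations are illustrative, not part of the patch, and it assumes (as the patch does) that all callbacks run on the single-threaded GCS event loop, so a plain int needs no locking.

#include <functional>
#include <iostream>
#include <memory>

int main() {
  // One pending load per manager: actor, object, and node.
  const int load_count = 3;
  auto load_completed_count = std::make_shared<int>(0);

  // Stand-in for starting the RPC server and storing its address in Redis.
  auto start_server = []() { std::cout << "All tables loaded; starting RPC server\n"; };

  // Each manager would invoke this once its GetAll() round-trip to storage
  // finishes; only the last completion triggers the server start.
  std::function<void()> on_done = [load_completed_count, load_count, start_server]() {
    if (++(*load_completed_count) == load_count) {
      start_server();
    }
  };

  // In the real server these would be the three LoadInitialData(on_done) calls
  // completing asynchronously; here they are invoked directly for illustration.
  on_done();
  on_done();
  on_done();
  return 0;
}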