diff --git a/python/ray/node.py b/python/ray/node.py index 23e5623a2..7f76a5031 100644 --- a/python/ray/node.py +++ b/python/ray/node.py @@ -910,6 +910,12 @@ class Node: check_alive=check_alive, allow_graceful=allow_graceful) + if ray_constants.PROCESS_TYPE_GCS_SERVER in self.all_processes: + self._kill_process_type( + ray_constants.PROCESS_TYPE_GCS_SERVER, + check_alive=check_alive, + allow_graceful=allow_graceful) + # We call "list" to copy the keys because we are modifying the # dictionary while iterating over it. for process_type in list(self.all_processes.keys()): diff --git a/src/ray/gcs/gcs_client/test/global_state_accessor_test.cc b/src/ray/gcs/gcs_client/test/global_state_accessor_test.cc index ff459f3a6..71309f030 100644 --- a/src/ray/gcs/gcs_client/test/global_state_accessor_test.cc +++ b/src/ray/gcs/gcs_client/test/global_state_accessor_test.cc @@ -35,8 +35,10 @@ class GlobalStateAccessorTest : public ::testing::Test { config.redis_address = "127.0.0.1"; config.is_test = true; config.redis_port = TEST_REDIS_SERVER_PORTS.front(); - gcs_server_.reset(new gcs::GcsServer(config)); + io_service_.reset(new boost::asio::io_service()); + gcs_server_.reset(new gcs::GcsServer(config, *io_service_)); + gcs_server_->Start(); thread_io_service_.reset(new std::thread([this] { std::unique_ptr work( @@ -44,8 +46,6 @@ class GlobalStateAccessorTest : public ::testing::Test { io_service_->run(); })); - thread_gcs_server_.reset(new std::thread([this] { gcs_server_->Start(); })); - // Wait until server starts listening. while (!gcs_server_->IsStarted()) { std::this_thread::sleep_for(std::chrono::milliseconds(10)); @@ -67,8 +67,9 @@ class GlobalStateAccessorTest : public ::testing::Test { void TearDown() override { gcs_server_->Stop(); io_service_->stop(); + gcs_server_.reset(); thread_io_service_->join(); - thread_gcs_server_->join(); + gcs_client_->Disconnect(); global_state_->Disconnect(); global_state_.reset(); @@ -84,7 +85,6 @@ class GlobalStateAccessorTest : public ::testing::Test { gcs::GcsServerConfig config; std::unique_ptr gcs_server_; std::unique_ptr thread_io_service_; - std::unique_ptr thread_gcs_server_; std::unique_ptr io_service_; // GCS client. diff --git a/src/ray/gcs/gcs_client/test/service_based_gcs_client_test.cc b/src/ray/gcs/gcs_client/test/service_based_gcs_client_test.cc index 4024d2c27..f25466912 100644 --- a/src/ray/gcs/gcs_client/test/service_based_gcs_client_test.cc +++ b/src/ray/gcs/gcs_client/test/service_based_gcs_client_test.cc @@ -30,22 +30,28 @@ class ServiceBasedGcsClientTest : public ::testing::Test { protected: void SetUp() override { - config.grpc_server_port = 0; - config.grpc_server_name = "MockedGcsServer"; - config.grpc_server_thread_num = 1; - config.redis_address = "127.0.0.1"; - config.is_test = true; - config.redis_port = TEST_REDIS_SERVER_PORTS.front(); - gcs_server_.reset(new gcs::GcsServer(config)); - io_service_.reset(new boost::asio::io_service()); + config_.grpc_server_port = 0; + config_.grpc_server_name = "MockedGcsServer"; + config_.grpc_server_thread_num = 1; + config_.redis_address = "127.0.0.1"; + config_.is_test = true; + config_.redis_port = TEST_REDIS_SERVER_PORTS.front(); - thread_io_service_.reset(new std::thread([this] { + client_io_service_.reset(new boost::asio::io_service()); + client_io_service_thread_.reset(new std::thread([this] { std::unique_ptr work( - new boost::asio::io_service::work(*io_service_)); - io_service_->run(); + new boost::asio::io_service::work(*client_io_service_)); + client_io_service_->run(); })); - thread_gcs_server_.reset(new std::thread([this] { gcs_server_->Start(); })); + server_io_service_.reset(new boost::asio::io_service()); + gcs_server_.reset(new gcs::GcsServer(config_, *server_io_service_)); + gcs_server_->Start(); + server_io_service_thread_.reset(new std::thread([this] { + std::unique_ptr work( + new boost::asio::io_service::work(*server_io_service_)); + server_io_service_->run(); + })); // Wait until server starts listening. while (!gcs_server_->IsStarted()) { @@ -53,28 +59,40 @@ class ServiceBasedGcsClientTest : public ::testing::Test { } // Create GCS client. - gcs::GcsClientOptions options(config.redis_address, config.redis_port, - config.redis_password, config.is_test); + gcs::GcsClientOptions options(config_.redis_address, config_.redis_port, + config_.redis_password, config_.is_test); gcs_client_.reset(new gcs::ServiceBasedGcsClient(options)); - RAY_CHECK_OK(gcs_client_->Connect(*io_service_)); + RAY_CHECK_OK(gcs_client_->Connect(*client_io_service_)); } void TearDown() override { - io_service_->stop(); - gcs_server_->Stop(); - thread_io_service_->join(); - thread_gcs_server_->join(); + client_io_service_->stop(); gcs_client_->Disconnect(); + + gcs_server_->Stop(); + server_io_service_->stop(); + gcs_server_.reset(); + server_io_service_thread_->join(); TestSetupUtil::FlushAllRedisServers(); + client_io_service_thread_->join(); } void RestartGcsServer() { RAY_LOG(INFO) << "Stopping GCS service, port = " << gcs_server_->GetPort(); gcs_server_->Stop(); - thread_gcs_server_->join(); + server_io_service_->stop(); + gcs_server_.reset(); + server_io_service_thread_->join(); + RAY_LOG(INFO) << "Finished stopping GCS service."; - gcs_server_.reset(new gcs::GcsServer(config)); - thread_gcs_server_.reset(new std::thread([this] { gcs_server_->Start(); })); + server_io_service_.reset(new boost::asio::io_service()); + gcs_server_.reset(new gcs::GcsServer(config_, *server_io_service_)); + gcs_server_->Start(); + server_io_service_thread_.reset(new std::thread([this] { + std::unique_ptr work( + new boost::asio::io_service::work(*server_io_service_)); + server_io_service_->run(); + })); // Wait until server starts listening. while (gcs_server_->GetPort() == 0) { @@ -466,14 +484,15 @@ class ServiceBasedGcsClientTest : public ::testing::Test { } // GCS server. - gcs::GcsServerConfig config; + gcs::GcsServerConfig config_; std::unique_ptr gcs_server_; - std::unique_ptr thread_io_service_; - std::unique_ptr thread_gcs_server_; - std::unique_ptr io_service_; + std::unique_ptr server_io_service_thread_; + std::unique_ptr server_io_service_; // GCS client. std::unique_ptr gcs_client_; + std::unique_ptr client_io_service_thread_; + std::unique_ptr client_io_service_; // Timeout waiting for GCS server reply, default is 2s. const std::chrono::milliseconds timeout_ms_{2000}; @@ -878,6 +897,7 @@ TEST_F(ServiceBasedGcsClientTest, TestActorTableReSubscribe) { ASSERT_TRUE(UpdateActor(actor2_id, actor2_table_data)); WaitPendingDone(actor1_update_count, 3); WaitPendingDone(actor2_update_count, 1); + UnsubscribeActor(actor1_id); } TEST_F(ServiceBasedGcsClientTest, TestObjectTableReSubscribe) { diff --git a/src/ray/gcs/gcs_server/gcs_server.cc b/src/ray/gcs/gcs_server/gcs_server.cc index edd6022a1..4a22d4efa 100644 --- a/src/ray/gcs/gcs_server/gcs_server.cc +++ b/src/ray/gcs/gcs_server/gcs_server.cc @@ -28,11 +28,13 @@ namespace ray { namespace gcs { -GcsServer::GcsServer(const ray::gcs::GcsServerConfig &config) +GcsServer::GcsServer(const ray::gcs::GcsServerConfig &config, + boost::asio::io_service &main_service) : config_(config), + main_service_(main_service), rpc_server_(config.grpc_server_name, config.grpc_server_port, config.grpc_server_thread_num), - client_call_manager_(main_service_) {} + client_call_manager_(main_service) {} GcsServer::~GcsServer() { Stop(); } @@ -112,24 +114,17 @@ void GcsServer::Start() { gcs_actor_manager_->LoadInitialData(on_done); gcs_object_manager_->LoadInitialData(on_done); gcs_node_manager_->LoadInitialData(on_done); - - // Run the event loop. - // Using boost::asio::io_context::work to avoid ending the event loop when - // there are no events to handle. - boost::asio::io_context::work worker(main_service_); - main_service_.run(); } void GcsServer::Stop() { - RAY_LOG(INFO) << "Stopping gcs server."; - // Shutdown the rpc server - rpc_server_.Shutdown(); + if (!is_stopped_) { + RAY_LOG(INFO) << "Stopping GCS server."; + // Shutdown the rpc server + rpc_server_.Shutdown(); - // Stop the event loop. - main_service_.stop(); - - is_stopped_ = true; - RAY_LOG(INFO) << "Finished stopping gcs server."; + is_stopped_ = true; + RAY_LOG(INFO) << "GCS server stopped."; + } } void GcsServer::InitBackendClient() { diff --git a/src/ray/gcs/gcs_server/gcs_server.h b/src/ray/gcs/gcs_server/gcs_server.h index e510b8df3..3e607c785 100644 --- a/src/ray/gcs/gcs_server/gcs_server.h +++ b/src/ray/gcs/gcs_server/gcs_server.h @@ -48,7 +48,8 @@ class GcsActorManager; /// https://docs.google.com/document/d/1d-9qBlsh2UQHo-AWMWR0GptI_Ajwu4SKx0Q0LHKPpeI/edit#heading=h.csi0gaglj2pv class GcsServer { public: - explicit GcsServer(const GcsServerConfig &config); + explicit GcsServer(const GcsServerConfig &config, + boost::asio::io_service &main_service); virtual ~GcsServer(); /// Start gcs server. @@ -108,10 +109,10 @@ class GcsServer { /// Gcs server configuration GcsServerConfig config_; + /// The main io service to drive event posted from grpc threads. + boost::asio::io_context &main_service_; /// The grpc server rpc::GrpcServer rpc_server_; - /// The main io service to drive event posted from grpc threads. - boost::asio::io_context main_service_; /// The `ClientCallManager` object that is shared by all `NodeManagerWorkerClient`s. rpc::ClientCallManager client_call_manager_; /// The gcs node manager. diff --git a/src/ray/gcs/gcs_server/gcs_server_main.cc b/src/ray/gcs/gcs_server/gcs_server_main.cc index 493599fb1..2e9140e7c 100644 --- a/src/ray/gcs/gcs_server/gcs_server_main.cc +++ b/src/ray/gcs/gcs_server/gcs_server_main.cc @@ -54,6 +54,8 @@ int main(int argc, char *argv[]) { RayConfig::instance().initialize(config_map); + boost::asio::io_service main_service; + ray::gcs::GcsServerConfig gcs_server_config; gcs_server_config.grpc_server_name = "GcsServer"; gcs_server_config.grpc_server_port = 0; @@ -62,6 +64,26 @@ int main(int argc, char *argv[]) { gcs_server_config.redis_port = redis_port; gcs_server_config.redis_password = redis_password; gcs_server_config.retry_redis = retry_redis; - ray::gcs::GcsServer gcs_server(gcs_server_config); + ray::gcs::GcsServer gcs_server(gcs_server_config, main_service); + + // Destroy the GCS server on a SIGTERM. The pointer to main_service is + // guaranteed to be valid since this function will run the event loop + // instead of returning immediately. + auto handler = [&main_service, &gcs_server](const boost::system::error_code &error, + int signal_number) { + RAY_LOG(INFO) << "GCS server received SIGTERM, shutting down..."; + gcs_server.Stop(); + main_service.stop(); + }; + boost::asio::signal_set signals(main_service); +#ifdef _WIN32 + signals.add(SIGBREAK); +#else + signals.add(SIGTERM); +#endif + signals.async_wait(handler); + gcs_server.Start(); + + main_service.run(); } diff --git a/src/ray/gcs/gcs_server/test/gcs_server_rpc_test.cc b/src/ray/gcs/gcs_server/test/gcs_server_rpc_test.cc index 196e3b328..ae2bc05ed 100644 --- a/src/ray/gcs/gcs_server/test/gcs_server_rpc_test.cc +++ b/src/ray/gcs/gcs_server/test/gcs_server_rpc_test.cc @@ -34,7 +34,8 @@ class GcsServerTest : public ::testing::Test { config.redis_address = "127.0.0.1"; config.is_test = true; config.redis_port = TEST_REDIS_SERVER_PORTS.front(); - gcs_server_.reset(new gcs::GcsServer(config)); + gcs_server_.reset(new gcs::GcsServer(config, io_service_)); + gcs_server_->Start(); thread_io_service_.reset(new std::thread([this] { std::unique_ptr work( @@ -42,8 +43,6 @@ class GcsServerTest : public ::testing::Test { io_service_.run(); })); - thread_gcs_server_.reset(new std::thread([this] { gcs_server_->Start(); })); - // Wait until server starts listening. while (gcs_server_->GetPort() == 0) { std::this_thread::sleep_for(std::chrono::milliseconds(10)); @@ -58,8 +57,8 @@ class GcsServerTest : public ::testing::Test { void TearDown() override { gcs_server_->Stop(); io_service_.stop(); + gcs_server_.reset(); thread_io_service_->join(); - thread_gcs_server_->join(); } bool AddJob(const rpc::AddJobRequest &request) { @@ -391,7 +390,6 @@ class GcsServerTest : public ::testing::Test { // Gcs server std::unique_ptr gcs_server_; std::unique_ptr thread_io_service_; - std::unique_ptr thread_gcs_server_; boost::asio::io_service io_service_; // Gcs client