GCS Server add SIGTERM signal handler (#8795)

This commit is contained in:
fangfengbin 2020-06-08 17:26:36 +08:00 committed by GitHub
parent d04953ab3c
commit 68718b33b4
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 98 additions and 56 deletions

View file

@ -910,6 +910,12 @@ class Node:
check_alive=check_alive,
allow_graceful=allow_graceful)
if ray_constants.PROCESS_TYPE_GCS_SERVER in self.all_processes:
self._kill_process_type(
ray_constants.PROCESS_TYPE_GCS_SERVER,
check_alive=check_alive,
allow_graceful=allow_graceful)
# We call "list" to copy the keys because we are modifying the
# dictionary while iterating over it.
for process_type in list(self.all_processes.keys()):

View file

@ -35,8 +35,10 @@ class GlobalStateAccessorTest : public ::testing::Test {
config.redis_address = "127.0.0.1";
config.is_test = true;
config.redis_port = TEST_REDIS_SERVER_PORTS.front();
gcs_server_.reset(new gcs::GcsServer(config));
io_service_.reset(new boost::asio::io_service());
gcs_server_.reset(new gcs::GcsServer(config, *io_service_));
gcs_server_->Start();
thread_io_service_.reset(new std::thread([this] {
std::unique_ptr<boost::asio::io_service::work> work(
@ -44,8 +46,6 @@ class GlobalStateAccessorTest : public ::testing::Test {
io_service_->run();
}));
thread_gcs_server_.reset(new std::thread([this] { gcs_server_->Start(); }));
// Wait until server starts listening.
while (!gcs_server_->IsStarted()) {
std::this_thread::sleep_for(std::chrono::milliseconds(10));
@ -67,8 +67,9 @@ class GlobalStateAccessorTest : public ::testing::Test {
void TearDown() override {
gcs_server_->Stop();
io_service_->stop();
gcs_server_.reset();
thread_io_service_->join();
thread_gcs_server_->join();
gcs_client_->Disconnect();
global_state_->Disconnect();
global_state_.reset();
@ -84,7 +85,6 @@ class GlobalStateAccessorTest : public ::testing::Test {
gcs::GcsServerConfig config;
std::unique_ptr<gcs::GcsServer> gcs_server_;
std::unique_ptr<std::thread> thread_io_service_;
std::unique_ptr<std::thread> thread_gcs_server_;
std::unique_ptr<boost::asio::io_service> io_service_;
// GCS client.

View file

@ -30,22 +30,28 @@ class ServiceBasedGcsClientTest : public ::testing::Test {
protected:
void SetUp() override {
config.grpc_server_port = 0;
config.grpc_server_name = "MockedGcsServer";
config.grpc_server_thread_num = 1;
config.redis_address = "127.0.0.1";
config.is_test = true;
config.redis_port = TEST_REDIS_SERVER_PORTS.front();
gcs_server_.reset(new gcs::GcsServer(config));
io_service_.reset(new boost::asio::io_service());
config_.grpc_server_port = 0;
config_.grpc_server_name = "MockedGcsServer";
config_.grpc_server_thread_num = 1;
config_.redis_address = "127.0.0.1";
config_.is_test = true;
config_.redis_port = TEST_REDIS_SERVER_PORTS.front();
thread_io_service_.reset(new std::thread([this] {
client_io_service_.reset(new boost::asio::io_service());
client_io_service_thread_.reset(new std::thread([this] {
std::unique_ptr<boost::asio::io_service::work> work(
new boost::asio::io_service::work(*io_service_));
io_service_->run();
new boost::asio::io_service::work(*client_io_service_));
client_io_service_->run();
}));
thread_gcs_server_.reset(new std::thread([this] { gcs_server_->Start(); }));
server_io_service_.reset(new boost::asio::io_service());
gcs_server_.reset(new gcs::GcsServer(config_, *server_io_service_));
gcs_server_->Start();
server_io_service_thread_.reset(new std::thread([this] {
std::unique_ptr<boost::asio::io_service::work> work(
new boost::asio::io_service::work(*server_io_service_));
server_io_service_->run();
}));
// Wait until server starts listening.
while (!gcs_server_->IsStarted()) {
@ -53,28 +59,40 @@ class ServiceBasedGcsClientTest : public ::testing::Test {
}
// Create GCS client.
gcs::GcsClientOptions options(config.redis_address, config.redis_port,
config.redis_password, config.is_test);
gcs::GcsClientOptions options(config_.redis_address, config_.redis_port,
config_.redis_password, config_.is_test);
gcs_client_.reset(new gcs::ServiceBasedGcsClient(options));
RAY_CHECK_OK(gcs_client_->Connect(*io_service_));
RAY_CHECK_OK(gcs_client_->Connect(*client_io_service_));
}
void TearDown() override {
io_service_->stop();
gcs_server_->Stop();
thread_io_service_->join();
thread_gcs_server_->join();
client_io_service_->stop();
gcs_client_->Disconnect();
gcs_server_->Stop();
server_io_service_->stop();
gcs_server_.reset();
server_io_service_thread_->join();
TestSetupUtil::FlushAllRedisServers();
client_io_service_thread_->join();
}
void RestartGcsServer() {
RAY_LOG(INFO) << "Stopping GCS service, port = " << gcs_server_->GetPort();
gcs_server_->Stop();
thread_gcs_server_->join();
server_io_service_->stop();
gcs_server_.reset();
server_io_service_thread_->join();
RAY_LOG(INFO) << "Finished stopping GCS service.";
gcs_server_.reset(new gcs::GcsServer(config));
thread_gcs_server_.reset(new std::thread([this] { gcs_server_->Start(); }));
server_io_service_.reset(new boost::asio::io_service());
gcs_server_.reset(new gcs::GcsServer(config_, *server_io_service_));
gcs_server_->Start();
server_io_service_thread_.reset(new std::thread([this] {
std::unique_ptr<boost::asio::io_service::work> work(
new boost::asio::io_service::work(*server_io_service_));
server_io_service_->run();
}));
// Wait until server starts listening.
while (gcs_server_->GetPort() == 0) {
@ -466,14 +484,15 @@ class ServiceBasedGcsClientTest : public ::testing::Test {
}
// GCS server.
gcs::GcsServerConfig config;
gcs::GcsServerConfig config_;
std::unique_ptr<gcs::GcsServer> gcs_server_;
std::unique_ptr<std::thread> thread_io_service_;
std::unique_ptr<std::thread> thread_gcs_server_;
std::unique_ptr<boost::asio::io_service> io_service_;
std::unique_ptr<std::thread> server_io_service_thread_;
std::unique_ptr<boost::asio::io_service> server_io_service_;
// GCS client.
std::unique_ptr<gcs::GcsClient> gcs_client_;
std::unique_ptr<std::thread> client_io_service_thread_;
std::unique_ptr<boost::asio::io_service> client_io_service_;
// Timeout waiting for GCS server reply, default is 2s.
const std::chrono::milliseconds timeout_ms_{2000};
@ -878,6 +897,7 @@ TEST_F(ServiceBasedGcsClientTest, TestActorTableReSubscribe) {
ASSERT_TRUE(UpdateActor(actor2_id, actor2_table_data));
WaitPendingDone(actor1_update_count, 3);
WaitPendingDone(actor2_update_count, 1);
UnsubscribeActor(actor1_id);
}
TEST_F(ServiceBasedGcsClientTest, TestObjectTableReSubscribe) {

View file

@ -28,11 +28,13 @@
namespace ray {
namespace gcs {
GcsServer::GcsServer(const ray::gcs::GcsServerConfig &config)
GcsServer::GcsServer(const ray::gcs::GcsServerConfig &config,
boost::asio::io_service &main_service)
: config_(config),
main_service_(main_service),
rpc_server_(config.grpc_server_name, config.grpc_server_port,
config.grpc_server_thread_num),
client_call_manager_(main_service_) {}
client_call_manager_(main_service) {}
GcsServer::~GcsServer() { Stop(); }
@ -112,24 +114,17 @@ void GcsServer::Start() {
gcs_actor_manager_->LoadInitialData(on_done);
gcs_object_manager_->LoadInitialData(on_done);
gcs_node_manager_->LoadInitialData(on_done);
// Run the event loop.
// Using boost::asio::io_context::work to avoid ending the event loop when
// there are no events to handle.
boost::asio::io_context::work worker(main_service_);
main_service_.run();
}
void GcsServer::Stop() {
RAY_LOG(INFO) << "Stopping gcs server.";
if (!is_stopped_) {
RAY_LOG(INFO) << "Stopping GCS server.";
// Shutdown the rpc server
rpc_server_.Shutdown();
// Stop the event loop.
main_service_.stop();
is_stopped_ = true;
RAY_LOG(INFO) << "Finished stopping gcs server.";
RAY_LOG(INFO) << "GCS server stopped.";
}
}
void GcsServer::InitBackendClient() {

View file

@ -48,7 +48,8 @@ class GcsActorManager;
/// https://docs.google.com/document/d/1d-9qBlsh2UQHo-AWMWR0GptI_Ajwu4SKx0Q0LHKPpeI/edit#heading=h.csi0gaglj2pv
class GcsServer {
public:
explicit GcsServer(const GcsServerConfig &config);
explicit GcsServer(const GcsServerConfig &config,
boost::asio::io_service &main_service);
virtual ~GcsServer();
/// Start gcs server.
@ -108,10 +109,10 @@ class GcsServer {
/// Gcs server configuration
GcsServerConfig config_;
/// The main io service to drive event posted from grpc threads.
boost::asio::io_context &main_service_;
/// The grpc server
rpc::GrpcServer rpc_server_;
/// The main io service to drive event posted from grpc threads.
boost::asio::io_context main_service_;
/// The `ClientCallManager` object that is shared by all `NodeManagerWorkerClient`s.
rpc::ClientCallManager client_call_manager_;
/// The gcs node manager.

View file

@ -54,6 +54,8 @@ int main(int argc, char *argv[]) {
RayConfig::instance().initialize(config_map);
boost::asio::io_service main_service;
ray::gcs::GcsServerConfig gcs_server_config;
gcs_server_config.grpc_server_name = "GcsServer";
gcs_server_config.grpc_server_port = 0;
@ -62,6 +64,26 @@ int main(int argc, char *argv[]) {
gcs_server_config.redis_port = redis_port;
gcs_server_config.redis_password = redis_password;
gcs_server_config.retry_redis = retry_redis;
ray::gcs::GcsServer gcs_server(gcs_server_config);
ray::gcs::GcsServer gcs_server(gcs_server_config, main_service);
// Destroy the GCS server on a SIGTERM. The pointer to main_service is
// guaranteed to be valid since this function will run the event loop
// instead of returning immediately.
auto handler = [&main_service, &gcs_server](const boost::system::error_code &error,
int signal_number) {
RAY_LOG(INFO) << "GCS server received SIGTERM, shutting down...";
gcs_server.Stop();
main_service.stop();
};
boost::asio::signal_set signals(main_service);
#ifdef _WIN32
signals.add(SIGBREAK);
#else
signals.add(SIGTERM);
#endif
signals.async_wait(handler);
gcs_server.Start();
main_service.run();
}

View file

@ -34,7 +34,8 @@ class GcsServerTest : public ::testing::Test {
config.redis_address = "127.0.0.1";
config.is_test = true;
config.redis_port = TEST_REDIS_SERVER_PORTS.front();
gcs_server_.reset(new gcs::GcsServer(config));
gcs_server_.reset(new gcs::GcsServer(config, io_service_));
gcs_server_->Start();
thread_io_service_.reset(new std::thread([this] {
std::unique_ptr<boost::asio::io_service::work> work(
@ -42,8 +43,6 @@ class GcsServerTest : public ::testing::Test {
io_service_.run();
}));
thread_gcs_server_.reset(new std::thread([this] { gcs_server_->Start(); }));
// Wait until server starts listening.
while (gcs_server_->GetPort() == 0) {
std::this_thread::sleep_for(std::chrono::milliseconds(10));
@ -58,8 +57,8 @@ class GcsServerTest : public ::testing::Test {
void TearDown() override {
gcs_server_->Stop();
io_service_.stop();
gcs_server_.reset();
thread_io_service_->join();
thread_gcs_server_->join();
}
bool AddJob(const rpc::AddJobRequest &request) {
@ -391,7 +390,6 @@ class GcsServerTest : public ::testing::Test {
// Gcs server
std::unique_ptr<gcs::GcsServer> gcs_server_;
std::unique_ptr<std::thread> thread_io_service_;
std::unique_ptr<std::thread> thread_gcs_server_;
boost::asio::io_service io_service_;
// Gcs client