Revert "[core] Remove gcs addr updater in core worker. (#24747)" (#25375)

Turns out https://github.com/ray-project/ray/pull/25342 wasn't the root cause of the Ray shutdown flakiness. I realized there's another PR that could affect this test suite, so let's try reverting it and see if things get better.
SangBin Cho 2022-06-02 07:12:33 +09:00 committed by GitHub
parent b9874f5bd9
commit 49efcab4fe
19 changed files with 236 additions and 56 deletions

View file

@@ -1459,7 +1459,6 @@ cc_test(
deps = [
":ray_util",
"@boost//:asio",
"@boost//:process",
"@com_google_googletest//:gtest_main",
],
)

View file

@@ -246,19 +246,13 @@ class Cluster:
)
if self.head_node == node:
# We have to wait to prevent the raylet from becoming a zombie,
# which would keep the worker from exiting
self.head_node.kill_all_processes(
check_alive=False, allow_graceful=allow_graceful, wait=True
check_alive=False, allow_graceful=allow_graceful
)
self.head_node = None
# TODO(rliaw): Do we need to kill all worker processes?
else:
# We have to wait to prevent the raylet from becoming a zombie,
# which would keep the worker from exiting
node.kill_all_processes(
check_alive=False, allow_graceful=allow_graceful, wait=True
)
node.kill_all_processes(check_alive=False, allow_graceful=allow_graceful)
self.worker_nodes.remove(node)
assert (

View file

@@ -1341,7 +1341,7 @@ class Node:
ray_constants.PROCESS_TYPE_REAPER, check_alive=check_alive
)
def kill_all_processes(self, check_alive=True, allow_graceful=False, wait=False):
def kill_all_processes(self, check_alive=True, allow_graceful=False):
"""Kill all of the processes.
Note that this is slower than necessary because it calls kill, wait,
@@ -1350,8 +1350,6 @@
Args:
check_alive: Raise an exception if any of the processes were
already dead.
wait: If true, then this method will not return until the
process in question has exited.
"""
# Kill the raylet first. This is important for suppressing errors at
# shutdown because we give the raylet a chance to exit gracefully and
@@ -1363,7 +1361,6 @@
ray_constants.PROCESS_TYPE_RAYLET,
check_alive=check_alive,
allow_graceful=allow_graceful,
wait=wait,
)
if ray_constants.PROCESS_TYPE_GCS_SERVER in self.all_processes:
@@ -1371,7 +1368,6 @@
ray_constants.PROCESS_TYPE_GCS_SERVER,
check_alive=check_alive,
allow_graceful=allow_graceful,
wait=wait,
)
# We call "list" to copy the keys because we are modifying the
@@ -1381,10 +1377,7 @@
# while cleaning up.
if process_type != ray_constants.PROCESS_TYPE_REAPER:
self._kill_process_type(
process_type,
check_alive=check_alive,
allow_graceful=allow_graceful,
wait=wait,
process_type, check_alive=check_alive, allow_graceful=allow_graceful
)
if ray_constants.PROCESS_TYPE_REAPER in self.all_processes:
@@ -1392,7 +1385,6 @@
ray_constants.PROCESS_TYPE_REAPER,
check_alive=check_alive,
allow_graceful=allow_graceful,
wait=wait,
)
def live_processes(self):
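
The wait=True flag removed in this revert existed so the test cluster would
reap killed raylet processes instead of leaving them as zombies that block
worker exit. A minimal POSIX C++ sketch of the underlying kill-then-reap
issue (illustrative only, not Ray code):

#include <csignal>
#include <iostream>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

int main() {
  pid_t child = fork();
  if (child == 0) {
    pause();   // Child: block until a signal arrives.
    _exit(0);
  }
  kill(child, SIGKILL);        // Without a reap, the child lingers as a zombie.
  int status = 0;
  waitpid(child, &status, 0);  // Reaping lets the kernel release the pid entry.
  std::cout << "child reaped; killed by signal " << WTERMSIG(status) << "\n";
  return 0;
}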

View file

@@ -255,10 +255,7 @@ def test_placement_group_scheduling_strategy(ray_start_cluster, connect_to_clien
@pytest.mark.parametrize("connect_to_client", [True, False])
def test_node_affinity_scheduling_strategy(
monkeypatch, ray_start_cluster, connect_to_client
):
monkeypatch.setenv("RAY_num_heartbeats_timeout", "4")
def test_node_affinity_scheduling_strategy(ray_start_cluster, connect_to_client):
cluster = ray_start_cluster
cluster.add_node(num_cpus=8, resources={"head": 1})
ray.init(address=cluster.address)
@@ -519,10 +516,7 @@ def test_spread_scheduling_strategy(ray_start_cluster, connect_to_client):
@pytest.mark.skipif(
platform.system() == "Windows", reason="FakeAutoscaler doesn't work on Windows"
)
def test_demand_report_for_node_affinity_scheduling_strategy(
monkeypatch, shutdown_only
):
monkeypatch.setenv("RAY_num_heartbeats_timeout", "4")
def test_demand_report_for_node_affinity_scheduling_strategy(shutdown_only):
from ray.cluster_utils import AutoscalingCluster
cluster = AutoscalingCluster(

View file

@@ -198,6 +198,10 @@ class MockRayletClientInterface : public RayletClientInterface {
GetSystemConfig,
(const rpc::ClientCallback<rpc::GetSystemConfigReply> &callback),
(override));
MOCK_METHOD(void,
GetGcsServerAddress,
(const rpc::ClientCallback<rpc::GetGcsServerAddressReply> &callback),
(override));
MOCK_METHOD(void,
UpdateResourceUsage,
(std::string & serialized_resource_usage_batch,

View file

@@ -180,6 +180,16 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_
<< rpc_address_.port() << ", worker ID " << worker_context_.GetWorkerID()
<< ", raylet " << local_raylet_id;
// Start fetching the GCS server address from the raylet.
gcs_server_address_updater_ = std::make_unique<GcsServerAddressUpdater>(
options_.raylet_ip_address,
options_.node_manager_port,
[this](std::string ip, int port) {
absl::MutexLock lock(&gcs_server_address_mutex_);
gcs_server_address_.first = ip;
gcs_server_address_.second = port;
});
gcs_client_ = std::make_shared<gcs::GcsClient>(options_.gcs_options);
RAY_CHECK_OK(gcs_client_->Connect(io_service_));
@@ -236,7 +246,8 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_
return std::shared_ptr<rpc::CoreWorkerClient>(
new rpc::CoreWorkerClient(addr, *client_call_manager_));
});
if (options_.worker_type != WorkerType::DRIVER) {
if (options_.worker_type == WorkerType::WORKER) {
periodical_runner_.RunFnPeriodically(
[this] { ExitIfParentRayletDies(); },
RayConfig::instance().raylet_death_check_interval_milliseconds());
@@ -748,14 +759,16 @@ void CoreWorker::RegisterToGcs() {
}
void CoreWorker::ExitIfParentRayletDies() {
RAY_CHECK(options_.worker_type == WorkerType::WORKER);
RAY_CHECK(!RayConfig::instance().RAYLET_PID().empty());
static auto raylet_pid =
static_cast<pid_t>(std::stoi(RayConfig::instance().RAYLET_PID()));
auto raylet_pid = static_cast<pid_t>(std::stoi(RayConfig::instance().RAYLET_PID()));
bool should_shutdown = !IsProcessAlive(raylet_pid);
if (should_shutdown) {
RAY_LOG(WARNING) << "Shutting down the core worker because the local raylet failed. "
<< "Check out the raylet.out log file. Raylet pid: " << raylet_pid;
QuickExit();
std::ostringstream stream;
stream << "Shutting down the core worker because the local raylet failed. "
<< "Check out the raylet.out log file. Raylet pid: " << raylet_pid;
RAY_LOG(WARNING) << stream.str();
task_execution_service_.post([this]() { Shutdown(); }, "CoreWorker.Shutdown");
}
}
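
The restored ExitIfParentRayletDies posts a graceful Shutdown() onto the
worker's task execution service instead of calling QuickExit(), so queued
work can wind down first. A simplified, dependency-free sketch of that
pattern (the Executor alias and function names here are stand-ins, not
Ray's types):

#include <csignal>
#include <functional>
#include <iostream>
#include <queue>
#include <sys/types.h>
#include <unistd.h>

using Executor = std::queue<std::function<void()>>;  // like task_execution_service_

void ExitIfRayletDies(pid_t raylet_pid, Executor &exec) {
  bool alive = (kill(raylet_pid, 0) == 0);  // Signal 0: existence probe only.
  if (!alive) {
    std::cerr << "Local raylet " << raylet_pid << " has died; shutting down\n";
    // Graceful path: shutdown runs on the worker's own loop, after any
    // already-queued callbacks, rather than exiting on the spot.
    exec.push([] { std::cout << "Shutdown()\n"; });
  }
}

int main() {
  Executor exec;
  ExitIfRayletDies(getpid() + 99999, exec);  // Almost certainly a dead pid.
  while (!exec.empty()) { exec.front()(); exec.pop(); }
  return 0;
}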

View file

@@ -26,6 +26,7 @@
#include "ray/core_worker/core_worker_options.h"
#include "ray/core_worker/core_worker_process.h"
#include "ray/core_worker/future_resolver.h"
#include "ray/core_worker/gcs_server_address_updater.h"
#include "ray/core_worker/lease_policy.h"
#include "ray/core_worker/object_recovery_manager.h"
#include "ray/core_worker/profiling.h"
@@ -1112,6 +1113,12 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler {
// Client to the GCS shared by core worker interfaces.
std::shared_ptr<gcs::GcsClient> gcs_client_;
std::pair<std::string, int> gcs_server_address_ GUARDED_BY(gcs_server_address_mutex_) =
std::make_pair<std::string, int>("", 0);
/// To protect accessing the `gcs_server_address_`.
absl::Mutex gcs_server_address_mutex_;
std::unique_ptr<GcsServerAddressUpdater> gcs_server_address_updater_;
// Client to the raylet shared by core worker interfaces. This needs to be a
// shared_ptr for direct calls because we can lease multiple workers through
// one client, and we need to keep the connection alive until we return all
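
These fields restore a standard publish/read pattern: the updater thread
writes the cached GCS address under a mutex and readers copy it out. A
minimal sketch with standard-library types only (CachedAddress is an
illustrative name, not a Ray class):

#include <mutex>
#include <string>
#include <utility>

class CachedAddress {
 public:
  void Set(std::string ip, int port) {
    std::lock_guard<std::mutex> lock(mu_);
    address_ = {std::move(ip), port};
  }
  std::pair<std::string, int> Get() const {
    std::lock_guard<std::mutex> lock(mu_);
    return address_;  // Copy out so callers never hold the lock.
  }

 private:
  mutable std::mutex mu_;
  std::pair<std::string, int> address_{"", 0};
};

Returning by value keeps the critical section tiny, which is why the core
worker stores a plain std::pair rather than handing out references.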

View file

@@ -0,0 +1,96 @@
// Copyright 2017 The Ray Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "ray/core_worker/gcs_server_address_updater.h"
namespace ray {
namespace core {
GcsServerAddressUpdater::GcsServerAddressUpdater(
const std::string raylet_ip_address,
const int port,
std::function<void(std::string, int)> update_func)
: client_call_manager_(updater_io_service_),
raylet_client_(rpc::NodeManagerWorkerClient::make(
raylet_ip_address, port, client_call_manager_)),
update_func_(update_func),
updater_runner_(updater_io_service_),
updater_thread_([this] {
SetThreadName("gcs_addr_updater");
std::thread::id this_id = std::this_thread::get_id();
RAY_LOG(INFO) << "GCS Server updater thread id: " << this_id;
/// The asio work to keep io_service_ alive.
boost::asio::io_service::work io_service_work_(updater_io_service_);
updater_io_service_.run();
}) {
// Start updating gcs server address.
updater_runner_.RunFnPeriodically(
[this] { UpdateGcsServerAddress(); },
RayConfig::instance().gcs_service_address_check_interval_milliseconds(),
"GcsServerAddressUpdater.UpdateGcsServerAddress");
}
GcsServerAddressUpdater::~GcsServerAddressUpdater() {
updater_io_service_.stop();
if (updater_thread_.joinable()) {
updater_thread_.join();
} else {
RAY_LOG(WARNING)
<< "Could not join updater thread. This can cause segfault upon destruction.";
}
RAY_LOG(DEBUG) << "GcsServerAddressUpdater is destructed";
}
void GcsServerAddressUpdater::UpdateGcsServerAddress() {
raylet_client_.GetGcsServerAddress([this](const Status &status,
const rpc::GetGcsServerAddressReply &reply) {
const int64_t max_retries =
RayConfig::instance().gcs_rpc_server_reconnect_timeout_s() * 1000 /
RayConfig::instance().gcs_service_address_check_interval_milliseconds();
if (!status.ok()) {
failed_ping_count_ += 1;
auto warning_threshold = max_retries / 2;
RAY_LOG_EVERY_N(WARNING, warning_threshold)
<< "Failed to get the gcs server address from raylet " << failed_ping_count_
<< " times in a row. If it keeps failing to obtain the address, "
"the worker might crash. Connection status "
<< status;
if (failed_ping_count_ >= max_retries) {
std::stringstream os;
os << "Failed to receive the GCS address for " << failed_ping_count_
<< " times without success. The worker will exit ungracefully. It is because ";
if (status.IsGrpcUnavailable()) {
RAY_LOG(WARNING) << os.str()
<< "raylet has died, and it couldn't obtain the GCS address "
"from the raylet anymore. Please check the log from "
"raylet.err on this address.";
} else {
RAY_LOG(ERROR)
<< os.str()
<< "GCS has died. It could be because there was an issue that "
"kills GCS, such as high memory usage triggering OOM killer "
"to kill GCS. Cluster will be highly likely unavailable if you see "
"this log. Please check the log from gcs_server.err.";
}
QuickExit();
}
} else {
failed_ping_count_ = 0;
update_func_(reply.ip(), reply.port());
}
});
}
} // namespace core
} // namespace ray
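
Stripped of the asio plumbing and the RPC client, the loop above reduces
to: poll for the address on a fixed interval, reset the failure counter on
any success, and give up once a budget of consecutive failures is spent. A
condensed sketch under those assumptions (RunUpdater and its parameters are
hypothetical, not Ray APIs):

#include <chrono>
#include <cstdlib>
#include <functional>
#include <iostream>
#include <optional>
#include <string>
#include <thread>
#include <utility>

using Address = std::pair<std::string, int>;

void RunUpdater(const std::function<std::optional<Address>()> &fetch,
                const std::function<void(const Address &)> &update,
                int max_retries, std::chrono::milliseconds interval) {
  int failed = 0;
  while (true) {
    if (auto addr = fetch()) {
      failed = 0;    // Any success resets the retry budget.
      update(*addr);
    } else if (++failed >= max_retries) {
      std::cerr << "No GCS address after " << failed << " attempts; exiting\n";
      std::exit(1);  // The real code calls QuickExit() here.
    }
    std::this_thread::sleep_for(interval);
  }
}

int main() {
  // Demo with a fetch that always fails, so the retry budget is exhausted.
  RunUpdater([] { return std::optional<Address>(); },
             [](const Address &) {}, /*max_retries=*/3,
             std::chrono::milliseconds(10));
}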

View file

@@ -0,0 +1,52 @@
// Copyright 2017 The Ray Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include "ray/common/asio/instrumented_io_context.h"
#include "ray/common/asio/periodical_runner.h"
#include "ray/raylet_client/raylet_client.h"
namespace ray {
namespace core {
class GcsServerAddressUpdater {
public:
/// Create an updater for the GCS server address.
///
/// \param raylet_ip_address Raylet IP address.
/// \param port Port used to connect to the raylet.
/// \param update_func Callback that stores the fetched GCS server address.
GcsServerAddressUpdater(const std::string raylet_ip_address,
const int port,
std::function<void(std::string, int)> update_func);
~GcsServerAddressUpdater();
private:
/// Update gcs server address.
void UpdateGcsServerAddress();
instrumented_io_context updater_io_service_;
rpc::ClientCallManager client_call_manager_;
/// A client connection to the raylet.
raylet::RayletClient raylet_client_;
std::function<void(std::string, int)> update_func_;
PeriodicalRunner updater_runner_;
std::thread updater_thread_;
int32_t failed_ping_count_ = 0;
};
} // namespace core
} // namespace ray
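
The destructor above pairs updater_io_service_.stop() with
updater_thread_.join() so no callback can fire against a half-destroyed
object. A simplified sketch of that stop-then-join ownership pattern
(PollerThread is a stand-in, not a Ray class):

#include <atomic>
#include <chrono>
#include <thread>

class PollerThread {
 public:
  PollerThread()
      : thread_([this] {
          while (!stopped_) {
            std::this_thread::sleep_for(std::chrono::milliseconds(50));
          }
        }) {}
  ~PollerThread() {
    stopped_ = true;  // Equivalent of updater_io_service_.stop().
    if (thread_.joinable()) {
      thread_.join();  // Join before any member is destroyed.
    }
  }

 private:
  std::atomic<bool> stopped_{false};  // Declared first: ready before thread_ runs.
  std::thread thread_;
};

int main() { PollerThread poller; }  // Destruction stops and joins cleanly.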

View file

@@ -268,6 +268,10 @@ struct GcsServerMocker {
void GetSystemConfig(const ray::rpc::ClientCallback<ray::rpc::GetSystemConfigReply>
&callback) override {}
void GetGcsServerAddress(
const ray::rpc::ClientCallback<ray::rpc::GetGcsServerAddressReply> &callback)
override {}
/// ResourceUsageInterface
void RequestResourceReport(
const rpc::ClientCallback<rpc::RequestResourceReportReply> &callback) override {

View file

@@ -293,6 +293,14 @@ message UpdateResourceUsageRequest {
message UpdateResourceUsageReply {
}
message GetGcsServerAddressRequest {
}
message GetGcsServerAddressReply {
string ip = 1;
int32 port = 2;
}
message GetTasksInfoRequest {}
message GetTasksInfoReply {
@@ -378,6 +386,8 @@ service NodeManagerService {
returns (ReleaseUnusedBundlesReply);
// Get the system config.
rpc GetSystemConfig(GetSystemConfigRequest) returns (GetSystemConfigReply);
// Get gcs server address.
rpc GetGcsServerAddress(GetGcsServerAddressRequest) returns (GetGcsServerAddressReply);
// [State API] Get all the task information of the node.
rpc GetTasksInfo(GetTasksInfoRequest) returns (GetTasksInfoReply);
// [State API] Get all the object information of the node.
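
GetGcsServerAddress is a plain unary RPC. A rough sketch of the client-side
shapes involved, with stand-in structs in place of the protobuf-generated
types and ray::Status:

#include <functional>
#include <iostream>
#include <string>

struct GetGcsServerAddressReply {
  std::string ip;  // field 1 in the proto
  int port = 0;    // field 2 in the proto
};
struct Status {
  bool ok = true;
};

// Mirrors the shape of rpc::ClientCallback<GetGcsServerAddressReply>.
using ClientCallback =
    std::function<void(const Status &, const GetGcsServerAddressReply &)>;

int main() {
  ClientCallback cb = [](const Status &s, const GetGcsServerAddressReply &reply) {
    if (s.ok) std::cout << reply.ip << ":" << reply.port << "\n";
  };
  cb(Status{}, GetGcsServerAddressReply{"10.0.0.1", 6379});  // hypothetical values
  return 0;
}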

View file

@@ -2409,6 +2409,16 @@ void NodeManager::HandleGetSystemConfig(const rpc::GetSystemConfigRequest &reque
send_reply_callback(Status::OK(), nullptr, nullptr);
}
void NodeManager::HandleGetGcsServerAddress(
const rpc::GetGcsServerAddressRequest &request,
rpc::GetGcsServerAddressReply *reply,
rpc::SendReplyCallback send_reply_callback) {
auto address = gcs_client_->GetGcsServerAddress();
reply->set_ip(address.first);
reply->set_port(address.second);
send_reply_callback(Status::OK(), nullptr, nullptr);
}
void NodeManager::HandleGetNodeStats(const rpc::GetNodeStatsRequest &node_stats_request,
rpc::GetNodeStatsReply *reply,
rpc::SendReplyCallback send_reply_callback) {

View file

@@ -588,6 +588,11 @@ class NodeManager : public rpc::NodeManagerServiceHandler,
rpc::GetSystemConfigReply *reply,
rpc::SendReplyCallback send_reply_callback) override;
/// Handle a `GetGcsServerAddress` request.
void HandleGetGcsServerAddress(const rpc::GetGcsServerAddressRequest &request,
rpc::GetGcsServerAddressReply *reply,
rpc::SendReplyCallback send_reply_callback) override;
/// Handle a `GetTasksInfo` request.
void HandleGetTasksInfo(const rpc::GetTasksInfoRequest &request,
rpc::GetTasksInfoReply *reply,

View file

@@ -542,4 +542,10 @@ void raylet::RayletClient::GetSystemConfig(
grpc_client_->GetSystemConfig(request, callback);
}
void raylet::RayletClient::GetGcsServerAddress(
const rpc::ClientCallback<rpc::GetGcsServerAddressReply> &callback) {
rpc::GetGcsServerAddressRequest request;
grpc_client_->GetGcsServerAddress(request, callback);
}
} // namespace ray

View file

@@ -185,6 +185,9 @@ class RayletClientInterface : public PinObjectsInterface,
virtual void GetSystemConfig(
const rpc::ClientCallback<rpc::GetSystemConfigReply> &callback) = 0;
virtual void GetGcsServerAddress(
const rpc::ClientCallback<rpc::GetGcsServerAddressReply> &callback) = 0;
virtual void NotifyGCSRestart(
const rpc::ClientCallback<rpc::NotifyGCSRestartReply> &callback) = 0;
@@ -458,6 +461,9 @@ class RayletClient : public RayletClientInterface {
void GetSystemConfig(
const rpc::ClientCallback<rpc::GetSystemConfigReply> &callback) override;
void GetGcsServerAddress(
const rpc::ClientCallback<rpc::GetGcsServerAddressReply> &callback) override;
void GlobalGC(const rpc::ClientCallback<rpc::GlobalGCReply> &callback);
void UpdateResourceUsage(

View file

@@ -187,6 +187,12 @@ class NodeManagerWorkerClient
grpc_client_,
/*method_timeout_ms*/ -1, )
/// Get gcs server address.
VOID_RPC_CLIENT_METHOD(NodeManagerService,
GetGcsServerAddress,
grpc_client_,
/*method_timeout_ms*/ -1, )
/// Get all the task information from the node.
VOID_RPC_CLIENT_METHOD(NodeManagerService,
GetTasksInfo,

View file

@@ -44,6 +44,7 @@ namespace rpc {
RPC_SERVICE_HANDLER(NodeManagerService, RequestObjectSpillage, -1) \
RPC_SERVICE_HANDLER(NodeManagerService, ReleaseUnusedBundles, -1) \
RPC_SERVICE_HANDLER(NodeManagerService, GetSystemConfig, -1) \
RPC_SERVICE_HANDLER(NodeManagerService, GetGcsServerAddress, -1) \
RPC_SERVICE_HANDLER(NodeManagerService, ShutdownRaylet, -1) \
RPC_SERVICE_HANDLER(NodeManagerService, GetTasksInfo, -1) \
RPC_SERVICE_HANDLER(NodeManagerService, GetObjectsInfo, -1)
@@ -146,10 +147,12 @@ class NodeManagerServiceHandler {
GetSystemConfigReply *reply,
SendReplyCallback send_reply_callback) = 0;
virtual void HandleGetGcsServerAddress(const GetGcsServerAddressRequest &request,
GetGcsServerAddressReply *reply,
SendReplyCallback send_reply_callback) = 0;
virtual void HandleGetTasksInfo(const GetTasksInfoRequest &request,
GetTasksInfoReply *reply,
SendReplyCallback send_reply_callback) = 0;
virtual void HandleGetObjectsInfo(const GetObjectsInfoRequest &request,
GetObjectsInfoReply *reply,
SendReplyCallback send_reply_callback) = 0;

View file

@@ -602,7 +602,7 @@ pid_t GetPID() {
bool IsParentProcessAlive() { return GetParentPID() != 1; }
bool IsProcessAlive(pid_t pid) {
#if defined _WIN32
#ifdef _WIN32
if (HANDLE handle =
OpenProcess(PROCESS_QUERY_INFORMATION, FALSE, static_cast<DWORD>(pid))) {
DWORD exit_code;
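
The hunk above only swaps "#if defined _WIN32" for the equivalent
"#ifdef _WIN32"; behavior is unchanged. For reference, a self-contained
sketch of a cross-platform liveness probe in the same spirit (StillRunning
is a hypothetical name, not Ray's function):

#include <cerrno>

#ifdef _WIN32
#include <windows.h>
bool StillRunning(DWORD pid) {
  HANDLE handle = OpenProcess(PROCESS_QUERY_INFORMATION, FALSE, pid);
  if (!handle) return false;
  DWORD exit_code = 0;
  bool alive = GetExitCodeProcess(handle, &exit_code) && exit_code == STILL_ACTIVE;
  CloseHandle(handle);
  return alive;
}
#else
#include <csignal>
#include <sys/types.h>
bool StillRunning(pid_t pid) {
  // Signal 0 checks existence/permissions without delivering a signal.
  return kill(pid, 0) == 0 || errno == EPERM;
}
#endif

int main() { return StillRunning(1) ? 0 : 1; }  // pid 1 is alive on POSIX.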

View file

@@ -17,15 +17,9 @@
#include <stdio.h>
#include <boost/asio/generic/basic_endpoint.hpp>
#include <boost/process/child.hpp>
#include <chrono>
#include <thread>
#include "gtest/gtest.h"
#include "ray/util/logging.h"
#include "ray/util/process.h"
using namespace std::chrono_literals;
static const char *argv0 = NULL;
@@ -190,21 +184,6 @@ TEST(UtilTest, CreateCommandLineTest) {
}
}
TEST(UtilTest, IsProcessAlive) {
namespace bp = boost::process;
bp::child c("bash");
auto pid = c.id();
c.join();
for (int i = 0; i < 5; ++i) {
if (IsProcessAlive(pid)) {
std::this_thread::sleep_for(1s);
} else {
break;
}
}
RAY_CHECK(!IsProcessAlive(pid));
}
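
The removed test was the only user of boost::process, which is why the
@boost//:process dependency also disappears from the BUILD file at the top
of this diff. A POSIX-only sketch of the same liveness check without that
dependency (illustrative, not the committed test):

#include <cassert>
#include <cerrno>
#include <csignal>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

int main() {
  pid_t pid = fork();
  if (pid == 0) _exit(0);    // Child exits immediately.
  waitpid(pid, nullptr, 0);  // Reap it so the pid really no longer exists.
  assert(kill(pid, 0) == -1 && errno == ESRCH);  // i.e., !IsProcessAlive(pid)
  return 0;
}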
} // namespace ray
int main(int argc, char **argv) {