Revert "[core] Move reconnection to RPC layer for GCS client. (#24330)" (#24762)

This reverts commit c427bc54e7.
This commit is contained in:
Chen Shen 2022-05-13 00:07:21 -07:00 committed by GitHub
parent 82cdb0d8f1
commit 30f370bf1f
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
12 changed files with 262 additions and 593 deletions

View file

@ -2064,29 +2064,6 @@ cc_test(
],
)
cc_test(
name = "gcs_client_reconnection_test",
srcs = [
"src/ray/gcs/gcs_client/test/gcs_client_reconnection_test.cc",
],
args = [
"$(location redis-server)",
"$(location redis-cli)",
],
copts = COPTS,
data = [
"//:redis-cli",
"//:redis-server",
],
tags = ["team:core"],
deps = [
":gcs_client_lib",
":gcs_server_lib",
":gcs_test_util_lib",
"@com_google_googletest//:gtest_main",
],
)
cc_library(
name = "object_manager",
srcs = glob([

View file

@ -579,17 +579,17 @@ def create_redis_client(redis_address, password=None):
try:
cli.ping()
return cli
except Exception as e:
except Exception:
create_redis_client.instances.pop(redis_address)
if i >= num_retries - 1:
raise RuntimeError(
f"Unable to connect to Redis at {redis_address}: {e}"
)
break
# Wait a little bit.
time.sleep(delay)
# Make sure the retry interval doesn't increase too large.
delay = min(1, delay * 2)
raise RuntimeError(f"Unable to connect to Redis at {redis_address}")
def start_ray_process(
command,

View file

@ -3,7 +3,6 @@ import sys
import ray
import ray._private.gcs_utils as gcs_utils
import pytest
from time import sleep
from ray._private.test_utils import (
generate_system_config_map,
wait_for_condition,
@ -215,77 +214,6 @@ def test_gcs_client_reconnect(ray_start_regular_with_external_redis, auto_reconn
assert gcs_client.internal_kv_get(b"a", None) == b"b"
@pytest.mark.parametrize(
"ray_start_regular_with_external_redis",
[
{
**generate_system_config_map(
num_heartbeats_timeout=20, gcs_rpc_server_reconnect_timeout_s=3600
),
"namespace": "actor",
}
],
indirect=True,
)
def test_actor_workloads(ray_start_regular_with_external_redis):
"""This test cover the case to create actor while gcs is down
and also make sure existing actor continue to work even when
GCS is down.
"""
@ray.remote
class Counter:
def r(self, v):
return v
c = Counter.remote()
r = ray.get(c.r.remote(10))
assert r == 10
print("GCS is killed")
ray.worker._global_node.kill_gcs_server()
print("Start to create a new actor")
cc = Counter.remote()
with pytest.raises(ray.exceptions.GetTimeoutError):
ray.get(cc.r.remote(10), timeout=5)
assert ray.get(c.r.remote(10)) == 10
ray.worker._global_node.start_gcs_server()
import threading
def f():
assert ray.get(cc.r.remote(10)) == 10
t = threading.Thread(target=f)
t.start()
t.join()
c = Counter.options(lifetime="detached", name="C").remote()
assert ray.get(c.r.remote(10)) == 10
ray.worker._global_node.kill_gcs_server()
sleep(2)
assert ray.get(c.r.remote(10)) == 10
ray.worker._global_node.start_gcs_server()
from ray._private.test_utils import run_string_as_driver
run_string_as_driver(
"""
import ray
ray.init('auto', namespace='actor')
a = ray.get_actor("C")
assert ray.get(a.r.remote(10)) == 10
"""
)
if __name__ == "__main__":
import pytest

View file

@ -33,7 +33,7 @@ class MockGcsClient : public GcsClient {
public:
MOCK_METHOD(Status, Connect, (instrumented_io_context & io_service), (override));
MOCK_METHOD(void, Disconnect, (), (override));
MOCK_METHOD((std::pair<std::string, int>), GetGcsServerAddress, (), (const, override));
MOCK_METHOD((std::pair<std::string, int>), GetGcsServerAddress, (), (override));
MOCK_METHOD(std::string, DebugString, (), (const, override));
MockGcsClient() {

View file

@ -350,26 +350,9 @@ RAY_CONFIG(bool, support_fork, false)
/// Each reconnection ping will be retried every 1 second.
RAY_CONFIG(int32_t, gcs_rpc_server_reconnect_timeout_s, 60)
/// The timeout for GCS connection in seconds
RAY_CONFIG(int32_t, gcs_rpc_server_connect_timeout_s, 5)
/// Minimum interval between reconnecting gcs rpc server when gcs server restarts.
RAY_CONFIG(int32_t, minimum_gcs_reconnect_interval_milliseconds, 5000)
/// gRPC channel reconnection related configs to GCS.
/// Check https://grpc.github.io/grpc/core/group__grpc__arg__keys.html for details
RAY_CONFIG(int32_t, gcs_grpc_max_reconnect_backoff_ms, 2000)
RAY_CONFIG(int32_t, gcs_grpc_min_reconnect_backoff_ms, 100)
RAY_CONFIG(int32_t, gcs_grpc_initial_reconnect_backoff_ms, 100)
/// Maximum bytes of request queued when RPC failed due to GCS is down.
/// If reach the limit, the core worker will hang until GCS is reconnected.
/// By default, the value if 5GB.
RAY_CONFIG(uint64_t, gcs_grpc_max_request_queued_max_bytes, 1024UL * 1024 * 1024 * 5)
/// The duration between two checks for grpc status.
RAY_CONFIG(int32_t, gcs_client_check_connection_status_interval_milliseconds, 1000)
/// Feature flag to use the ray syncer for resource synchronization
RAY_CONFIG(bool, use_ray_syncer, false)

View file

@ -186,7 +186,16 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_
gcs_server_address_.second = port;
});
gcs_client_ = std::make_shared<gcs::GcsClient>(options_.gcs_options);
gcs_client_ = std::make_shared<gcs::GcsClient>(
options_.gcs_options, [this](std::pair<std::string, int> *address) {
absl::MutexLock lock(&gcs_server_address_mutex_);
if (gcs_server_address_.second != 0) {
address->first = gcs_server_address_.first;
address->second = gcs_server_address_.second;
return true;
}
return false;
});
RAY_CHECK_OK(gcs_client_->Connect(io_service_));
RegisterToGcs();

View file

@ -75,17 +75,54 @@ void GcsSubscriberClient::PubsubCommandBatch(
} // namespace
GcsClient::GcsClient(const GcsClientOptions &options) : options_(options) {}
GcsClient::GcsClient(
const GcsClientOptions &options,
std::function<bool(std::pair<std::string, int> *)> get_gcs_server_address_func)
: options_(options),
get_server_address_func_(std::move(get_gcs_server_address_func)),
last_reconnect_timestamp_ms_(0),
last_reconnect_address_(std::make_pair("", -1)) {}
Status GcsClient::Connect(instrumented_io_context &io_service) {
RAY_CHECK(!is_connected_);
// Setup gcs server address fetcher
if (get_server_address_func_ == nullptr) {
get_server_address_func_ = [this](std::pair<std::string, int> *addr) {
*addr = std::make_pair(options_.gcs_address_, options_.gcs_port_);
return true;
};
}
// Get gcs address
int i = 0;
while (current_gcs_server_address_.first.empty() &&
i < RayConfig::instance().gcs_service_connect_retries()) {
std::this_thread::sleep_for(std::chrono::milliseconds(
RayConfig::instance().internal_gcs_service_connect_wait_milliseconds()));
get_server_address_func_(&current_gcs_server_address_);
i++;
}
resubscribe_func_ = [this]() {
job_accessor_->AsyncResubscribe();
actor_accessor_->AsyncResubscribe();
node_accessor_->AsyncResubscribe();
node_resource_accessor_->AsyncResubscribe();
worker_accessor_->AsyncResubscribe();
};
// Connect to gcs service.
client_call_manager_ = std::make_unique<rpc::ClientCallManager>(io_service);
gcs_rpc_client_ = std::make_shared<rpc::GcsRpcClient>(
options_.gcs_address_, options_.gcs_port_, *client_call_manager_);
current_gcs_server_address_.first,
current_gcs_server_address_.second,
*client_call_manager_,
[this](rpc::GcsServiceFailureType type) { current_connection_failure_ = type; });
rpc::Address gcs_address;
gcs_address.set_ip_address(options_.gcs_address_);
gcs_address.set_port(options_.gcs_port_);
gcs_address.set_ip_address(current_gcs_server_address_.first);
gcs_address.set_port(current_gcs_server_address_.second);
/// TODO(mwtian): refactor pubsub::Subscriber to avoid faking worker ID.
gcs_address.set_worker_id(UniqueID::FromRandom().Binary());
@ -116,19 +153,154 @@ Status GcsClient::Connect(instrumented_io_context &io_service) {
worker_accessor_ = std::make_unique<WorkerInfoAccessor>(this);
placement_group_accessor_ = std::make_unique<PlacementGroupInfoAccessor>(this);
internal_kv_accessor_ = std::make_unique<InternalKVAccessor>(this);
// Init gcs service address check timer.
periodical_runner_ = std::make_unique<PeriodicalRunner>(io_service);
periodical_runner_->RunFnPeriodically(
[this] { PeriodicallyCheckGcsConnection(); },
RayConfig::instance().gcs_service_address_check_interval_milliseconds(),
"GcsClient.deadline_timer.check_gcs_connection");
is_connected_ = true;
RAY_LOG(DEBUG) << "GcsClient connected.";
return Status::OK();
}
void GcsClient::Disconnect() {
if (gcs_rpc_client_) {
gcs_rpc_client_->Shutdown();
if (!is_connected_) {
RAY_LOG(WARNING) << "GcsClient has been disconnected.";
return;
}
is_connected_ = false;
disconnected_ = true;
RAY_LOG(DEBUG) << "GcsClient Disconnected.";
}
std::pair<std::string, int> GcsClient::GetGcsServerAddress() {
return current_gcs_server_address_;
}
/// Checks whether GCS at the specified address is healthy.
bool GcsClient::CheckHealth(const std::string &ip, int port, int64_t timeout_ms) {
// Health checking currently needs to be blocking. So it cannot use the event loop
// in the GcsClient. Otherwise there can be deadlocks.
auto channel = grpc::CreateChannel(absl::StrCat(ip, ":", port),
grpc::InsecureChannelCredentials());
std::unique_ptr<rpc::HeartbeatInfoGcsService::Stub> stub =
rpc::HeartbeatInfoGcsService::NewStub(std::move(channel));
grpc::ClientContext context;
context.set_deadline(std::chrono::system_clock::now() +
std::chrono::milliseconds(timeout_ms));
const rpc::CheckAliveRequest request;
rpc::CheckAliveReply reply;
auto status = stub->CheckAlive(&context, request, &reply);
if (!status.ok()) {
RAY_LOG(WARNING) << "Unable to reach GCS at " << ip << ":" << port
<< ". Failure: " << status.error_code() << " "
<< status.error_message();
return false;
}
return true;
}
// TODO(iycheng, mwtian): rework the reconnection logic for GCS HA.
void GcsClient::PeriodicallyCheckGcsConnection() {
if (disconnected_) {
return;
}
// Check if current connection has failed.
if (current_connection_failure_.has_value()) {
GcsServiceFailureDetected(*current_connection_failure_);
current_connection_failure_.reset();
return;
}
// Check if GCS address has changed because of restarting.
std::pair<std::string, int> address;
if (get_server_address_func_(&address)) {
if (address != current_gcs_server_address_) {
current_gcs_server_address_ = address;
GcsServiceFailureDetected(rpc::GcsServiceFailureType::GCS_SERVER_RESTART);
}
}
}
std::pair<std::string, int> GcsClient::GetGcsServerAddress() const {
return gcs_rpc_client_->GetAddress();
void GcsClient::GcsServiceFailureDetected(rpc::GcsServiceFailureType type) {
if (disconnected_) {
return;
}
switch (type) {
case rpc::GcsServiceFailureType::RPC_DISCONNECT:
// If the GCS server address does not change, reconnect to GCS server.
ReconnectGcsServer();
break;
case rpc::GcsServiceFailureType::GCS_SERVER_RESTART:
// If GCS sever address has changed, reconnect to GCS server and redo
// subscription.
ReconnectGcsServer();
// If using GCS server for pubsub, resubscribe to GCS publishers.
resubscribe_func_();
// Resend resource usage after reconnected, needed by resource view in GCS.
node_resource_accessor_->AsyncReReportResourceUsage();
break;
default:
RAY_LOG(FATAL) << "Unsupported failure type: " << type;
break;
}
}
void GcsClient::ReconnectGcsServer() {
std::pair<std::string, int> address;
auto timeout_s =
absl::Seconds(RayConfig::instance().gcs_rpc_server_reconnect_timeout_s());
auto start = absl::Now();
auto reconnected = false;
while (absl::Now() - start < timeout_s) {
if (disconnected_) {
return;
}
if (get_server_address_func_(&address)) {
// After GCS is restarted, the gcs client will reestablish the connection. At
// present, every failed RPC request will trigger `ReconnectGcsServer`. In order to
// avoid repeated connections in a short period of time, we add a protection
// mechanism: if the address does not change (meaning gcs server doesn't restart),
// the connection can be made at most once in
// `minimum_gcs_reconnect_interval_milliseconds` milliseconds.
if (last_reconnect_address_ == address &&
(current_sys_time_ms() - last_reconnect_timestamp_ms_) <
RayConfig::instance().minimum_gcs_reconnect_interval_milliseconds()) {
RAY_LOG(DEBUG)
<< "Repeated reconnection in "
<< RayConfig::instance().minimum_gcs_reconnect_interval_milliseconds()
<< " milliseconds, return directly.";
return;
}
RAY_LOG(DEBUG) << "Attemptting to reconnect to GCS server: " << address.first << ":"
<< address.second;
if (CheckHealth(address.first, address.second, /*timeout_ms=*/2000)) {
// If `last_reconnect_address_` port is -1, it means that this is the first
// connection and no log will be printed.
if (last_reconnect_address_.second != -1) {
RAY_LOG(INFO) << "Reconnected to GCS server: " << address.first << ":"
<< address.second;
}
reconnected = true;
break;
}
}
std::this_thread::sleep_for(
std::chrono::milliseconds(kGCSReconnectionRetryIntervalMs));
}
if (reconnected) {
gcs_rpc_client_->Reset(address.first, address.second, *client_call_manager_);
last_reconnect_address_ = address;
last_reconnect_timestamp_ms_ = current_sys_time_ms();
} else {
RAY_LOG(FATAL) << "Couldn't reconnect to GCS server. The last attempted GCS "
"server address was "
<< address.first << ":" << address.second;
}
}
} // namespace gcs

View file

@ -14,8 +14,6 @@
#pragma once
#include <gtest/gtest_prod.h>
#include <boost/asio.hpp>
#include <memory>
#include <string>
@ -33,6 +31,7 @@
#include "ray/util/logging.h"
namespace ray {
namespace gcs {
/// \class GcsClientOptions
@ -70,9 +69,11 @@ class RAY_EXPORT GcsClient : public std::enable_shared_from_this<GcsClient> {
///
/// \param options Options for client.
/// \param get_gcs_server_address_func Function to get GCS server address.
explicit GcsClient(const GcsClientOptions &options);
explicit GcsClient(const GcsClientOptions &options,
std::function<bool(std::pair<std::string, int> *)>
get_gcs_server_address_func = nullptr);
virtual ~GcsClient() { Disconnect(); };
virtual ~GcsClient() = default;
/// Connect to GCS Service. Non-thread safe.
/// This function must be called before calling other functions.
@ -83,7 +84,7 @@ class RAY_EXPORT GcsClient : public std::enable_shared_from_this<GcsClient> {
/// Disconnect with GCS Service. Non-thread safe.
virtual void Disconnect();
virtual std::pair<std::string, int> GetGcsServerAddress() const;
virtual std::pair<std::string, int> GetGcsServerAddress();
/// Return client information for debug.
virtual std::string DebugString() const { return ""; }
@ -155,6 +156,9 @@ class RAY_EXPORT GcsClient : public std::enable_shared_from_this<GcsClient> {
protected:
GcsClientOptions options_;
/// Whether this client is connected to GCS.
std::atomic<bool> is_connected_{false};
std::unique_ptr<ActorInfoAccessor> actor_accessor_;
std::unique_ptr<JobInfoAccessor> job_accessor_;
std::unique_ptr<NodeInfoAccessor> node_accessor_;
@ -166,6 +170,21 @@ class RAY_EXPORT GcsClient : public std::enable_shared_from_this<GcsClient> {
std::unique_ptr<InternalKVAccessor> internal_kv_accessor_;
private:
/// Checks whether GCS at the specified address is healthy.
bool CheckHealth(const std::string &ip, int port, int64_t timeout_ms);
/// Fires a periodic timer to check if the connection to GCS is healthy.
void PeriodicallyCheckGcsConnection();
/// This function is used to redo subscription and reconnect to GCS RPC server when gcs
/// service failure is detected.
///
/// \param type The type of GCS service failure.
void GcsServiceFailureDetected(rpc::GcsServiceFailureType type);
/// Reconnect to GCS RPC server.
void ReconnectGcsServer();
const UniqueID gcs_client_id_ = UniqueID::FromRandom();
std::unique_ptr<GcsSubscriber> gcs_subscriber_;
@ -173,6 +192,19 @@ class RAY_EXPORT GcsClient : public std::enable_shared_from_this<GcsClient> {
// Gcs rpc client
std::shared_ptr<rpc::GcsRpcClient> gcs_rpc_client_;
std::unique_ptr<rpc::ClientCallManager> client_call_manager_;
std::atomic<bool> disconnected_ = false;
// The runner to run function periodically.
std::unique_ptr<PeriodicalRunner> periodical_runner_;
std::function<bool(std::pair<std::string, int> *)> get_server_address_func_;
std::function<void()> resubscribe_func_;
std::pair<std::string, int> current_gcs_server_address_;
int64_t last_reconnect_timestamp_ms_;
std::pair<std::string, int> last_reconnect_address_;
std::optional<rpc::GcsServiceFailureType> current_connection_failure_;
/// Retry interval to reconnect to a GCS server.
const int64_t kGCSReconnectionRetryIntervalMs = 1000;
};
} // namespace gcs

View file

@ -1,315 +0,0 @@
// Copyright 2017 The Ray Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <boost/asio/ip/tcp.hpp>
#include <chrono>
#include "absl/strings/substitute.h"
#include "gtest/gtest.h"
#include "ray/common/asio/instrumented_io_context.h"
#include "ray/common/test_util.h"
#include "ray/gcs/gcs_client/accessor.h"
#include "ray/gcs/gcs_client/gcs_client.h"
#include "ray/gcs/gcs_server/gcs_server.h"
#include "ray/gcs/test/gcs_test_util.h"
#include "ray/rpc/gcs_server/gcs_rpc_client.h"
#include "ray/util/util.h"
using namespace std::chrono_literals;
using namespace ray;
class GcsClientReconnectionTest : public ::testing::Test {
public:
GcsClientReconnectionTest() { TestSetupUtil::StartUpRedisServers(std::vector<int>()); }
~GcsClientReconnectionTest() { TestSetupUtil::ShutDownRedisServers(); }
void StartGCS() {
RAY_CHECK(gcs_server_ == nullptr);
server_io_service_ = std::make_unique<instrumented_io_context>();
gcs_server_ = std::make_unique<gcs::GcsServer>(config_, *server_io_service_);
gcs_server_->Start();
server_io_service_thread_ = std::make_unique<std::thread>([this] {
std::unique_ptr<boost::asio::io_service::work> work(
new boost::asio::io_service::work(*server_io_service_));
server_io_service_->run();
});
// Wait until server starts listening.
while (!gcs_server_->IsStarted()) {
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
}
void ShutdownGCS() {
if (!gcs_server_) {
return;
}
server_io_service_->stop();
server_io_service_thread_->join();
gcs_server_->Stop();
gcs_server_.reset();
}
gcs::GcsClient *CreateGCSClient() {
RAY_CHECK(gcs_client_ == nullptr);
client_io_service_ = std::make_unique<instrumented_io_context>();
client_io_service_thread_ = std::make_unique<std::thread>([this] {
std::unique_ptr<boost::asio::io_service::work> work(
new boost::asio::io_service::work(*client_io_service_));
client_io_service_->run();
});
gcs::GcsClientOptions options("127.0.0.1:" +
std::to_string(config_.grpc_server_port));
gcs_client_ = std::make_unique<gcs::GcsClient>(options);
RAY_CHECK_OK(gcs_client_->Connect(*client_io_service_));
return gcs_client_.get();
}
void CloseGCSClient() {
if (!gcs_client_) {
return;
}
client_io_service_->stop();
client_io_service_thread_->join();
gcs_client_->Disconnect();
gcs_client_.reset();
}
bool WaitUntil(std::function<bool()> predicate, std::chrono::duration<int64_t> time_s) {
using namespace std::chrono;
auto start = steady_clock::now();
while (steady_clock::now() - start <= time_s) {
if (predicate()) {
return true;
}
std::this_thread::sleep_for(10ms);
}
return false;
}
protected:
unsigned short GetFreePort() {
using namespace boost::asio;
io_service service;
ip::tcp::acceptor acceptor(service, ip::tcp::endpoint(ip::tcp::v4(), 0));
unsigned short port = acceptor.local_endpoint().port();
return port;
}
void SetUp() override {
config_.redis_address = "127.0.0.1";
config_.enable_sharding_conn = false;
config_.redis_port = TEST_REDIS_SERVER_PORTS.front();
config_.grpc_server_port = GetFreePort();
config_.grpc_server_name = "MockedGcsServer";
config_.grpc_server_thread_num = 1;
config_.node_ip_address = "127.0.0.1";
config_.enable_sharding_conn = false;
}
void TearDown() override {
ShutdownGCS();
CloseGCSClient();
TestSetupUtil::FlushAllRedisServers();
}
// GCS server.
gcs::GcsServerConfig config_;
std::unique_ptr<gcs::GcsServer> gcs_server_;
std::unique_ptr<std::thread> server_io_service_thread_;
std::unique_ptr<instrumented_io_context> server_io_service_;
// GCS client.
std::unique_ptr<std::thread> client_io_service_thread_;
std::unique_ptr<instrumented_io_context> client_io_service_;
std::unique_ptr<gcs::GcsClient> gcs_client_;
// Timeout waiting for GCS server reply, default is 2s.
const std::chrono::milliseconds timeout_ms_{2000};
};
TEST_F(GcsClientReconnectionTest, ReconnectionBasic) {
RayConfig::instance().initialize(
R"(
{
"gcs_rpc_server_reconnect_timeout_s": 60,
"gcs_storage": "redis"
}
)");
// Start GCS server
StartGCS();
// Create client and send KV request
auto client = CreateGCSClient();
std::promise<void> p0;
auto f0 = p0.get_future();
RAY_UNUSED(client->InternalKV().AsyncInternalKVPut(
"", "A", "B", false, [&p0](auto status, auto) {
ASSERT_TRUE(status.ok()) << status.ToString();
p0.set_value();
}));
f0.get();
// Shutdown GCS server
ShutdownGCS();
// Send get request
std::promise<std::string> p1;
auto f1 = p1.get_future();
RAY_UNUSED(client->InternalKV().AsyncInternalKVGet("", "A", [&p1](auto status, auto p) {
ASSERT_TRUE(status.ok()) << status.ToString();
p1.set_value(*p);
}));
ASSERT_EQ(f1.wait_for(1s), std::future_status::timeout);
// Make sure io context is not blocked
std::promise<void> p2;
client_io_service_->post([&p2]() { p2.set_value(); }, "");
auto f2 = p2.get_future();
f2.wait();
// Resume GCS server
StartGCS();
// Make sure the request is executed
ASSERT_EQ(f1.get(), "B");
}
TEST_F(GcsClientReconnectionTest, ReconnectionBackoff) {
// This test is to ensure that during reconnection, we got the right status
// of the channel and also very basic test to verify gRPC's backoff is working.
RayConfig::instance().initialize(
R"(
{
"gcs_rpc_server_reconnect_timeout_s": 60,
"gcs_storage": "redis",
"gcs_grpc_initial_reconnect_backoff_ms": 5000
}
)");
StartGCS();
auto client = CreateGCSClient();
std::promise<void> p1;
auto f1 = p1.get_future();
RAY_UNUSED(client->InternalKV().AsyncInternalKVPut(
"", "A", "B", false, [&p1](auto status, auto) {
ASSERT_TRUE(status.ok()) << status.ToString();
p1.set_value();
}));
ASSERT_NE(f1.wait_for(1s), std::future_status::timeout);
auto channel = client->GetGcsRpcClient().GetChannel();
ASSERT_EQ(GRPC_CHANNEL_READY, channel->GetState(false));
ShutdownGCS();
RAY_UNUSED(
client->InternalKV().AsyncInternalKVPut("", "A", "B", false, [](auto, auto) {}));
ASSERT_TRUE(WaitUntil(
[channel]() {
auto status = channel->GetState(false);
return status == GRPC_CHANNEL_TRANSIENT_FAILURE;
},
1s));
StartGCS();
// For 2s, there is no reconnection
ASSERT_FALSE(WaitUntil(
[channel]() {
auto status = channel->GetState(false);
return status != GRPC_CHANNEL_TRANSIENT_FAILURE;
},
2s));
// Then there is reconnection
ASSERT_TRUE(WaitUntil(
[channel]() {
auto status = channel->GetState(false);
return status != GRPC_CHANNEL_TRANSIENT_FAILURE;
},
5s));
// Eventually it should be ready.
ASSERT_FALSE(WaitUntil(
[channel]() {
auto status = channel->GetState(false);
return status != GRPC_CHANNEL_READY;
},
1s));
}
TEST_F(GcsClientReconnectionTest, QueueingAndBlocking) {
RayConfig::instance().initialize(
R"(
{
"gcs_rpc_server_reconnect_timeout_s": 60,
"gcs_storage": "redis",
"gcs_grpc_max_request_queued_max_bytes": 10
}
)");
StartGCS();
auto client = CreateGCSClient();
std::promise<void> p1;
auto f1 = p1.get_future();
RAY_UNUSED(client->InternalKV().AsyncInternalKVPut(
"", "A", "B", false, [&p1](auto status, auto) {
ASSERT_TRUE(status.ok()) << status.ToString();
p1.set_value();
}));
f1.get();
ShutdownGCS();
// Send one request which should fail
RAY_UNUSED(client->InternalKV().AsyncInternalKVPut(
"", "A", "B", false, [](auto status, auto) {}));
// Make sure it's not blocking
std::promise<void> p2;
client_io_service_->post([&p2]() { p2.set_value(); }, "");
auto f2 = p2.get_future();
ASSERT_EQ(std::future_status::ready, f2.wait_for(1s));
// Send the second one and it should block the thread
RAY_UNUSED(client->InternalKV().AsyncInternalKVPut(
"", "A", "B", false, [](auto status, auto) {}));
std::this_thread::sleep_for(1s);
std::promise<void> p3;
client_io_service_->post([&p3]() { p3.set_value(); }, "");
auto f3 = p3.get_future();
ASSERT_EQ(std::future_status::timeout, f3.wait_for(1s));
// Resume GCS server and it should unblock
StartGCS();
ASSERT_EQ(std::future_status::ready, f3.wait_for(2s));
}
int main(int argc, char **argv) {
InitShutdownRAII ray_log_shutdown_raii(ray::RayLog::StartRayLog,
ray::RayLog::ShutDownRayLog,
argv[0],
ray::RayLogLevel::INFO,
/*log_dir=*/"");
::testing::InitGoogleTest(&argc, argv);
RAY_CHECK(argc == 3);
ray::TEST_REDIS_SERVER_EXEC_PATH = argv[1];
ray::TEST_REDIS_CLIENT_EXEC_PATH = argv[2];
return RUN_ALL_TESTS();
}

View file

@ -258,9 +258,6 @@ class ClientCallManager {
return call;
}
/// Get the main service of this rpc.
instrumented_io_context &GetMainService() { return main_service_; }
private:
/// This function runs in a background thread. It keeps polling events from the
/// `CompletionQueue`, and dispatches the event to the callbacks via the `ClientCall`

View file

@ -14,11 +14,6 @@
#pragma once
#include <gtest/gtest_prod.h>
#include <chrono>
#include <thread>
#include "ray/common/network_util.h"
#include "ray/rpc/grpc_client.h"
#include "src/ray/protobuf/gcs_service.grpc.pb.h"
@ -103,22 +98,8 @@ class Executor {
callback(status, reply); \
delete executor; \
} else { \
/* In case of GCS failure, we queue the request and these requets will be */ \
/* executed once GCS is back. */ \
gcs_is_down_ = true; \
pending_requests_.emplace_back(executor); \
pending_requests_bytes_ += request.ByteSizeLong(); \
if (pending_requests_bytes_ > \
::RayConfig::instance().gcs_grpc_max_request_queued_max_bytes()) { \
RAY_LOG(WARNING) << "Pending queue for failed GCS request has reached the " \
<< "limit. Blocking the current thread until GCS is back"; \
while (gcs_is_down_ && !shutdown_) { \
CheckChannelStatus(); \
std::this_thread::sleep_for(std::chrono::milliseconds( \
::RayConfig::instance() \
.gcs_client_check_connection_status_interval_milliseconds())); \
} \
} \
gcs_service_failure_detected_(GcsServiceFailureType::RPC_DISCONNECT); \
executor->Retry(); \
} \
}; \
auto operation = \
@ -149,75 +130,48 @@ class Executor {
/// Client used for communicating with gcs server.
class GcsRpcClient {
public:
/// Constructor. GcsRpcClient is not thread safe.
/// Constructor.
///
/// \param[in] address Address of gcs server.
/// \param[in] port Port of the gcs server.
/// \param[in] client_call_manager The `ClientCallManager` used for managing requests.
/// \param[in] gcs_service_failure_detected The function is used to redo subscription
/// and reconnect to GCS RPC server when gcs service failure is detected.
/// \param[in] reconnection_callback The callback function when the channel get
/// reconnected due to some error.
GcsRpcClient(const std::string &address,
const int port,
ClientCallManager &client_call_manager)
: gcs_address_(address),
gcs_port_(port),
io_context_(&client_call_manager.GetMainService()),
periodical_runner_(std::make_unique<PeriodicalRunner>(*io_context_)) {
grpc::ChannelArguments arguments;
arguments.SetInt(GRPC_ARG_MAX_RECONNECT_BACKOFF_MS,
::RayConfig::instance().gcs_grpc_max_reconnect_backoff_ms());
arguments.SetInt(GRPC_ARG_MIN_RECONNECT_BACKOFF_MS,
::RayConfig::instance().gcs_grpc_min_reconnect_backoff_ms());
arguments.SetInt(GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS,
::RayConfig::instance().gcs_grpc_initial_reconnect_backoff_ms());
channel_ = BuildChannel(address, port, arguments);
// If not the reconnection will continue to work.
auto deadline =
std::chrono::system_clock::now() +
std::chrono::seconds(::RayConfig::instance().gcs_rpc_server_connect_timeout_s());
if (!channel_->WaitForConnected(deadline)) {
RAY_LOG(ERROR) << "Failed to connect to GCS at address " << address << ":" << port
<< " within "
<< ::RayConfig::instance().gcs_rpc_server_connect_timeout_s()
<< " seconds.";
gcs_is_down_ = true;
} else {
gcs_is_down_ = false;
}
GcsRpcClient(
const std::string &address,
const int port,
ClientCallManager &client_call_manager,
std::function<void(GcsServiceFailureType)> gcs_service_failure_detected = nullptr)
: gcs_service_failure_detected_(std::move(gcs_service_failure_detected)) {
Reset(address, port, client_call_manager);
};
void Reset(const std::string &address,
const int port,
ClientCallManager &client_call_manager) {
auto channel = BuildChannel(address, port);
job_info_grpc_client_ =
std::make_unique<GrpcClient<JobInfoGcsService>>(channel_, client_call_manager);
std::make_unique<GrpcClient<JobInfoGcsService>>(channel, client_call_manager);
actor_info_grpc_client_ =
std::make_unique<GrpcClient<ActorInfoGcsService>>(channel_, client_call_manager);
std::make_unique<GrpcClient<ActorInfoGcsService>>(channel, client_call_manager);
node_info_grpc_client_ =
std::make_unique<GrpcClient<NodeInfoGcsService>>(channel_, client_call_manager);
std::make_unique<GrpcClient<NodeInfoGcsService>>(channel, client_call_manager);
node_resource_info_grpc_client_ =
std::make_unique<GrpcClient<NodeResourceInfoGcsService>>(channel_,
std::make_unique<GrpcClient<NodeResourceInfoGcsService>>(channel,
client_call_manager);
heartbeat_info_grpc_client_ = std::make_unique<GrpcClient<HeartbeatInfoGcsService>>(
channel_, client_call_manager);
channel, client_call_manager);
stats_grpc_client_ =
std::make_unique<GrpcClient<StatsGcsService>>(channel_, client_call_manager);
std::make_unique<GrpcClient<StatsGcsService>>(channel, client_call_manager);
worker_info_grpc_client_ =
std::make_unique<GrpcClient<WorkerInfoGcsService>>(channel_, client_call_manager);
std::make_unique<GrpcClient<WorkerInfoGcsService>>(channel, client_call_manager);
placement_group_info_grpc_client_ =
std::make_unique<GrpcClient<PlacementGroupInfoGcsService>>(channel_,
std::make_unique<GrpcClient<PlacementGroupInfoGcsService>>(channel,
client_call_manager);
internal_kv_grpc_client_ =
std::make_unique<GrpcClient<InternalKVGcsService>>(channel_, client_call_manager);
std::make_unique<GrpcClient<InternalKVGcsService>>(channel, client_call_manager);
internal_pubsub_grpc_client_ = std::make_unique<GrpcClient<InternalPubSubGcsService>>(
channel_, client_call_manager);
// Setup monitor for gRPC channel status.
// TODO(iycheng): Push this into ClientCallManager with CQ by using async call.
periodical_runner_->RunFnPeriodically(
[this] { CheckChannelStatus(); },
::RayConfig::instance()
.gcs_client_check_connection_status_interval_milliseconds());
channel, client_call_manager);
}
/// Add job info to GCS Service.
@ -458,63 +412,8 @@ class GcsRpcClient {
GcsSubscriberCommandBatch,
internal_pubsub_grpc_client_,
/*method_timeout_ms*/ -1, )
void Shutdown() {
bool is_shutdown = false;
if (!shutdown_.compare_exchange_strong(is_shutdown, true)) {
RAY_LOG(DEBUG) << "GCS client has already been shutdown.";
}
}
std::pair<std::string, int64_t> GetAddress() const {
return std::make_pair(gcs_address_, gcs_port_);
}
std::shared_ptr<grpc::Channel> GetChannel() const { return channel_; }
private:
void CheckChannelStatus() {
if (shutdown_) {
return;
}
auto status = channel_->GetState(false);
// https://grpc.github.io/grpc/core/md_doc_connectivity-semantics-and-api.html
// https://grpc.github.io/grpc/core/connectivity__state_8h_source.html
RAY_LOG(DEBUG) << "GCS channel status: " << status;
switch (status) {
case GRPC_CHANNEL_TRANSIENT_FAILURE:
case GRPC_CHANNEL_CONNECTING:
if (!gcs_is_down_) {
gcs_is_down_ = true;
} else {
RAY_CHECK(absl::ToInt64Seconds(absl::Now() - gcs_last_alive_time_) <
::RayConfig::instance().gcs_rpc_server_reconnect_timeout_s())
<< "Failed to connect to GCS within "
<< ::RayConfig::instance().gcs_rpc_server_reconnect_timeout_s() << " seconds";
}
break;
case GRPC_CHANNEL_SHUTDOWN:
RAY_CHECK(shutdown_) << "Channel shoud never go to this status.";
break;
case GRPC_CHANNEL_READY:
case GRPC_CHANNEL_IDLE:
gcs_last_alive_time_ = absl::Now();
gcs_is_down_ = false;
// Retry the one queued.
while (!pending_requests_.empty()) {
pending_requests_.back()->Retry();
pending_requests_.pop_back();
}
pending_requests_bytes_ = 0;
break;
default:
RAY_CHECK(false) << "Not covered status: " << status;
}
}
std::string gcs_address_;
int64_t gcs_port_;
instrumented_io_context *io_context_;
std::function<void(GcsServiceFailureType)> gcs_service_failure_detected_;
/// The gRPC-generated stub.
std::unique_ptr<GrpcClient<JobInfoGcsService>> job_info_grpc_client_;
@ -528,18 +427,6 @@ class GcsRpcClient {
placement_group_info_grpc_client_;
std::unique_ptr<GrpcClient<InternalKVGcsService>> internal_kv_grpc_client_;
std::unique_ptr<GrpcClient<InternalPubSubGcsService>> internal_pubsub_grpc_client_;
std::shared_ptr<grpc::Channel> channel_;
bool gcs_is_down_ = false;
absl::Time gcs_last_alive_time_ = absl::Now();
std::atomic<bool> shutdown_ = false;
std::unique_ptr<PeriodicalRunner> periodical_runner_;
std::vector<Executor *> pending_requests_;
size_t pending_requests_bytes_ = 0;
friend class GcsClientReconnectionTest;
FRIEND_TEST(GcsClientReconnectionTest, ReconnectionBackoff);
};
} // namespace rpc

View file

@ -51,13 +51,12 @@ inline std::shared_ptr<grpc::Channel> BuildChannel(
std::optional<grpc::ChannelArguments> arguments = std::nullopt) {
if (!arguments.has_value()) {
arguments = grpc::ChannelArguments();
arguments->SetInt(GRPC_ARG_ENABLE_HTTP_PROXY,
::RayConfig::instance().grpc_enable_http_proxy() ? 1 : 0);
arguments->SetMaxSendMessageSize(::RayConfig::instance().max_grpc_message_size());
arguments->SetMaxReceiveMessageSize(::RayConfig::instance().max_grpc_message_size());
}
arguments->SetInt(GRPC_ARG_ENABLE_HTTP_PROXY,
::RayConfig::instance().grpc_enable_http_proxy() ? 1 : 0);
arguments->SetMaxSendMessageSize(::RayConfig::instance().max_grpc_message_size());
arguments->SetMaxReceiveMessageSize(::RayConfig::instance().max_grpc_message_size());
std::shared_ptr<grpc::Channel> channel;
if (::RayConfig::instance().USE_TLS()) {
std::string server_cert_file = std::string(::RayConfig::instance().TLS_SERVER_CERT());