[core][gcs] Make GCS client work with timeout_ms. (#25975)

In [PR](https://github.com/ray-project/ray/pull/24764) we moved the reconnection logic into GcsRpcClient. In case of a GCS failure, we queue the requests and resend them once GCS is back.
This breaks requests that carry a timeout, because such a request is now queued and never gets a response. This PR fixes that.

Each queued request is now stored keyed by the time at which it is supposed to time out. While GCS is down, we check the queued requests and, for any request whose deadline has passed, reply immediately with a Timeout error.
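
To make the scheme concrete, here is a minimal, self-contained sketch of the idea. It is illustrative only, not the actual Ray code: `PendingQueue`, `Add`, and `AbortExpired` are made-up names for this example. Requests are keyed by their absolute deadline in an ordered multimap, so the periodic check while GCS is down only needs to look at the front of the map; a `timeout_ms` of -1 maps to an infinite deadline and never expires.

```cpp
#include <cstddef>
#include <cstdint>
#include <functional>
#include <string>
#include <utility>

#include "absl/container/btree_map.h"
#include "absl/time/clock.h"
#include "absl/time/time.h"

// Stand-in for Ray's Executor: all the queue needs here is a way to reply to
// the caller with an error once the request has timed out.
struct Executor {
  std::function<void(const std::string &error)> abort_callback;
};

class PendingQueue {
 public:
  // Queue a request keyed by its absolute deadline. timeout_ms == -1 means
  // "no timeout" and maps to an infinite deadline that can never expire.
  void Add(Executor executor, size_t request_bytes, int64_t timeout_ms) {
    auto deadline = timeout_ms == -1
                        ? absl::InfiniteFuture()
                        : absl::Now() + absl::Milliseconds(timeout_ms);
    pending_bytes_ += request_bytes;
    pending_.emplace(deadline, std::make_pair(std::move(executor), request_bytes));
  }

  // Called periodically while GCS is down. The multimap is ordered by
  // deadline, so every expired request sits at the front: reply to each of
  // them with a timeout error and stop at the first one that is still live.
  void AbortExpired(absl::Time now) {
    while (!pending_.empty()) {
      auto it = pending_.begin();
      if (it->first > now) break;  // earliest remaining deadline not reached
      it->second.first.abort_callback(
          "Timed out while waiting for GCS to become available.");
      pending_bytes_ -= it->second.second;
      pending_.erase(it);
    }
  }

 private:
  absl::btree_multimap<absl::Time, std::pair<Executor, size_t>> pending_;
  size_t pending_bytes_ = 0;
};
```

The real change keeps raw `Executor *` pointers and runs this sweep inside `GcsRpcClient`'s periodic channel check, as shown in the last file of the diff below.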
Yi Cheng 2022-06-23 01:02:29 +00:00 committed by GitHub
parent 4d8a82bdf6
commit a1f02f68b7
4 changed files with 110 additions and 12 deletions

@@ -111,6 +111,7 @@ cc_library(
        ":ray_common",
        "@boost//:asio",
        "@com_github_grpc_grpc//:grpc++",
        "@com_google_absl//absl/container:btree",
    ],
)

@@ -469,6 +469,48 @@ assert ray.get(a.r.remote(10)) == 10
)
@pytest.mark.parametrize(
    "ray_start_regular_with_external_redis",
    [
        {
            **generate_system_config_map(
                num_heartbeats_timeout=20,
                gcs_rpc_server_reconnect_timeout_s=3600,
                gcs_server_request_timeout_seconds=10,
            ),
            "namespace": "actor",
        }
    ],
    indirect=True,
)
def test_named_actor_workloads(ray_start_regular_with_external_redis):
    """This test covers creating an actor while GCS is down,
    and also makes sure existing actors continue to work
    even when GCS is down.
    """
    @ray.remote
    class Counter:
        def r(self, v):
            return v
    c = Counter.options(name="c", lifetime="detached").remote()
    r = ray.get(c.r.remote(10))
    assert r == 10
    print("GCS is killed")
    ray.worker._global_node.kill_gcs_server()
    print("Start to create a new actor")
    with pytest.raises(ray.exceptions.GetTimeoutError):
        cc = Counter.options(name="cc", lifetime="detached").remote()
    assert ray.get(c.r.remote(10)) == 10
    ray.worker._global_node.start_gcs_server()
    cc = Counter.options(name="cc", lifetime="detached").remote()
    assert ray.get(cc.r.remote(10)) == 10
@pytest.mark.parametrize(
    "ray_start_regular_with_external_redis",
    [
@@ -516,9 +558,10 @@ def test_pg_actor_workloads(ray_start_regular_with_external_redis):
if __name__ == "__main__":
    import pytest
    import os
    import pytest
    if os.environ.get("PARALLEL_CI"):
        sys.exit(pytest.main(["-n", "auto", "--boxed", "-vs", __file__]))
    else:

@@ -326,6 +326,32 @@ TEST_F(GcsClientReconnectionTest, QueueingAndBlocking) {
  ASSERT_EQ(std::future_status::ready, f3.wait_for(5s));
}
TEST_F(GcsClientReconnectionTest, Timeout) {
  RayConfig::instance().initialize(
      R"(
{
  "gcs_rpc_server_reconnect_timeout_s": 60,
  "gcs_storage": "redis",
  "gcs_grpc_max_request_queued_max_bytes": 10,
  "gcs_server_request_timeout_seconds": 3
}
  )");
  StartGCS();
  auto client = CreateGCSClient();
  bool added = false;
  ASSERT_TRUE(client->InternalKV().Put("", "A", "B", false, added).ok());
  ASSERT_TRUE(added);
  ShutdownGCS();
  std::vector<std::string> values;
  ASSERT_TRUE(client->InternalKV().Keys("", "A", values).IsTimedOut());
  ASSERT_TRUE(values.empty());
  StartGCS();
  ASSERT_TRUE(client->InternalKV().Keys("", "A", values).ok());
  ASSERT_EQ(std::vector<std::string>{"A"}, values);
}
int main(int argc, char **argv) {
  InitShutdownRAII ray_log_shutdown_raii(ray::RayLog::StartRayLog,
                                         ray::RayLog::ShutDownRayLog,
int main(int argc, char **argv) {
InitShutdownRAII ray_log_shutdown_raii(ray::RayLog::StartRayLog,
ray::RayLog::ShutDownRayLog,

@@ -19,6 +19,7 @@
#include <chrono>
#include <thread>
#include "absl/container/btree_map.h"
#include "ray/common/network_util.h"
#include "ray/rpc/grpc_client.h"
#include "src/ray/protobuf/gcs_service.grpc.pb.h"
@@ -32,21 +33,26 @@ class GcsRpcClient;
/// Executor saves operation and support retries.
class Executor {
 public:
  explicit Executor(GcsRpcClient *gcs_rpc_client) : gcs_rpc_client_(gcs_rpc_client) {}
  Executor(GcsRpcClient *gcs_rpc_client,
           std::function<void(const ray::Status &)> abort_callback)
      : gcs_rpc_client_(gcs_rpc_client), abort_callback_(std::move(abort_callback)) {}
  /// This function is used to execute the given operation.
  ///
  /// \param operation The operation to be executed.
  void Execute(const std::function<void(GcsRpcClient *gcs_rpc_client)> &operation) {
    operation_ = operation;
    operation(gcs_rpc_client_);
  void Execute(std::function<void(GcsRpcClient *gcs_rpc_client)> operation) {
    operation_ = std::move(operation);
    operation_(gcs_rpc_client_);
  }
  /// This function is used to retry the given operation.
  void Retry() { operation_(gcs_rpc_client_); }
  void Abort(const ray::Status &status) { abort_callback_(status); }
 private:
  GcsRpcClient *gcs_rpc_client_;
  std::function<void(ray::Status)> abort_callback_;
  std::function<void(GcsRpcClient *gcs_rpc_client)> operation_;
};
@@ -88,8 +94,10 @@ class Executor {
  void METHOD(const METHOD##Request &request, \
              const ClientCallback<METHOD##Reply> &callback, \
              const int64_t timeout_ms = method_timeout_ms) SPECS { \
    auto executor = new Executor(this); \
    auto operation_callback = [this, request, callback, executor]( \
    auto executor = new Executor(this, [callback](const ray::Status &status) { \
      callback(status, METHOD##Reply()); \
    }); \
    auto operation_callback = [this, request, callback, executor, timeout_ms]( \
                                  const ray::Status &status, \
                                  const METHOD##Reply &reply) { \
      if (status.IsTimedOut()) { \
@@ -125,7 +133,10 @@ class Executor {
          } \
        } else { \
          pending_requests_bytes_ += request_bytes; \
          pending_requests_.emplace_back(executor); \
          auto timeout = timeout_ms == -1 \
                             ? absl::InfiniteFuture() \
                             : absl::Now() + absl::Milliseconds(timeout_ms); \
          pending_requests_.emplace(timeout, std::make_pair(executor, request_bytes)); \
        } \
      } \
    }; \
@@ -138,7 +149,7 @@ class Executor {
                                     gcs_rpc_client->grpc_client, \
                                     timeout_ms)); \
        }; \
    executor->Execute(operation); \
    executor->Execute(std::move(operation)); \
  } \
  ray::Status Sync##METHOD(const METHOD##Request &request, \
                           METHOD##Reply *reply_in, \
@@ -496,10 +507,27 @@ class GcsRpcClient {
    if (shutdown_) {
      return;
    }
    auto status = channel_->GetState(false);
    // https://grpc.github.io/grpc/core/md_doc_connectivity-semantics-and-api.html
    // https://grpc.github.io/grpc/core/connectivity__state_8h_source.html
    RAY_LOG(DEBUG) << "GCS channel status: " << status;
    // We need to clean up all the pending requests that have timed out.
    auto now = absl::Now();
    while (!pending_requests_.empty()) {
      auto iter = pending_requests_.begin();
      if (iter->first > now) {
        break;
      }
      auto [executor, request_bytes] = iter->second;
      executor->Abort(
          ray::Status::TimedOut("Timed out while waiting for GCS to become available."));
      pending_requests_bytes_ -= request_bytes;
      delete executor;
      pending_requests_.erase(iter);
    }
    switch (status) {
      case GRPC_CHANNEL_TRANSIENT_FAILURE:
      case GRPC_CHANNEL_CONNECTING:
@@ -521,8 +549,8 @@ class GcsRpcClient {
        gcs_is_down_ = false;
        // Retry the ones queued.
        while (!pending_requests_.empty()) {
          pending_requests_.back()->Retry();
          pending_requests_.pop_back();
          pending_requests_.begin()->second.first->Retry();
          pending_requests_.erase(pending_requests_.begin());
        }
        pending_requests_bytes_ = 0;
        break;
@@ -560,7 +588,7 @@ class GcsRpcClient {
  absl::Time gcs_last_alive_time_ = absl::Now();
  std::atomic<bool> shutdown_ = false;
  std::vector<Executor *> pending_requests_;
  absl::btree_multimap<absl::Time, std::pair<Executor *, size_t>> pending_requests_;
  size_t pending_requests_bytes_ = 0;
  friend class GcsClientReconnectionTest;