From a1f02f68b7c12fc92e2419cc333349450341fa3e Mon Sep 17 00:00:00 2001 From: Yi Cheng <74173148+iycheng@users.noreply.github.com> Date: Thu, 23 Jun 2022 01:02:29 +0000 Subject: [PATCH] [core][gcs] Make GCS client working with timeout_ms. (#25975) In [PR](https://github.com/ray-project/ray/pull/24764) we moved the reconnection logic to GcsRpcClient. In case of a GCS failure, we queue the requests and resend them once GCS is back. This actually breaks requests with a timeout, because such a request is now queued and never gets a response. This PR fixes it. All queued requests are stored ordered by the time at which they are supposed to time out. When GCS is down, we check the queued requests, and any request that has timed out is replied to immediately with a Timeout error message. --- BUILD.bazel | 1 + python/ray/tests/test_gcs_fault_tolerance.py | 45 ++++++++++++++++- .../test/gcs_client_reconnection_test.cc | 26 ++++++++++ src/ray/rpc/gcs_server/gcs_rpc_client.h | 50 +++++++++++++++---- 4 files changed, 110 insertions(+), 12 deletions(-) diff --git a/BUILD.bazel b/BUILD.bazel index a99d03aff..d5bfeae45 100644 --- a/BUILD.bazel +++ b/BUILD.bazel @@ -111,6 +111,7 @@ cc_library( ":ray_common", "@boost//:asio", "@com_github_grpc_grpc//:grpc++", + "@com_google_absl//absl/container:btree", ], ) diff --git a/python/ray/tests/test_gcs_fault_tolerance.py b/python/ray/tests/test_gcs_fault_tolerance.py index 89aa3b2b3..430618d66 100644 --- a/python/ray/tests/test_gcs_fault_tolerance.py +++ b/python/ray/tests/test_gcs_fault_tolerance.py @@ -469,6 +469,48 @@ assert ray.get(a.r.remote(10)) == 10 ) +@pytest.mark.parametrize( + "ray_start_regular_with_external_redis", + [ + { + **generate_system_config_map( + num_heartbeats_timeout=20, + gcs_rpc_server_reconnect_timeout_s=3600, + gcs_server_request_timeout_seconds=10, + ), + "namespace": "actor", + } + ], + indirect=True, +) +def test_named_actor_workloads(ray_start_regular_with_external_redis): + """This test cover the case to create actor while gcs is 
down + and also make sure existing actor continue to work even when + GCS is down. + """ + + @ray.remote + class Counter: + def r(self, v): + return v + + c = Counter.options(name="c", lifetime="detached").remote() + r = ray.get(c.r.remote(10)) + assert r == 10 + + print("GCS is killed") + ray.worker._global_node.kill_gcs_server() + + print("Start to create a new actor") + with pytest.raises(ray.exceptions.GetTimeoutError): + cc = Counter.options(name="cc", lifetime="detached").remote() + + assert ray.get(c.r.remote(10)) == 10 + ray.worker._global_node.start_gcs_server() + cc = Counter.options(name="cc", lifetime="detached").remote() + assert ray.get(cc.r.remote(10)) == 10 + + @pytest.mark.parametrize( "ray_start_regular_with_external_redis", [ @@ -516,9 +558,10 @@ def test_pg_actor_workloads(ray_start_regular_with_external_redis): if __name__ == "__main__": - import pytest import os + import pytest + if os.environ.get("PARALLEL_CI"): sys.exit(pytest.main(["-n", "auto", "--boxed", "-vs", __file__])) else: diff --git a/src/ray/gcs/gcs_client/test/gcs_client_reconnection_test.cc b/src/ray/gcs/gcs_client/test/gcs_client_reconnection_test.cc index 52da54dd5..e81cc30f0 100644 --- a/src/ray/gcs/gcs_client/test/gcs_client_reconnection_test.cc +++ b/src/ray/gcs/gcs_client/test/gcs_client_reconnection_test.cc @@ -326,6 +326,32 @@ TEST_F(GcsClientReconnectionTest, QueueingAndBlocking) { ASSERT_EQ(std::future_status::ready, f3.wait_for(5s)); } +TEST_F(GcsClientReconnectionTest, Timeout) { + RayConfig::instance().initialize( + R"( +{ + "gcs_rpc_server_reconnect_timeout_s": 60, + "gcs_storage": "redis", + "gcs_grpc_max_request_queued_max_bytes": 10, + "gcs_server_request_timeout_seconds": 3 +} + )"); + StartGCS(); + auto client = CreateGCSClient(); + bool added = false; + ASSERT_TRUE(client->InternalKV().Put("", "A", "B", false, added).ok()); + ASSERT_TRUE(added); + + ShutdownGCS(); + + std::vector values; + ASSERT_TRUE(client->InternalKV().Keys("", "A", values).IsTimedOut()); 
+ ASSERT_TRUE(values.empty()); + StartGCS(); + ASSERT_TRUE(client->InternalKV().Keys("", "A", values).ok()); + ASSERT_EQ(std::vector{"A"}, values); +} + int main(int argc, char **argv) { InitShutdownRAII ray_log_shutdown_raii(ray::RayLog::StartRayLog, ray::RayLog::ShutDownRayLog, diff --git a/src/ray/rpc/gcs_server/gcs_rpc_client.h b/src/ray/rpc/gcs_server/gcs_rpc_client.h index cda0e6b7d..5f1a2f4fc 100644 --- a/src/ray/rpc/gcs_server/gcs_rpc_client.h +++ b/src/ray/rpc/gcs_server/gcs_rpc_client.h @@ -19,6 +19,7 @@ #include #include +#include "absl/container/btree_map.h" #include "ray/common/network_util.h" #include "ray/rpc/grpc_client.h" #include "src/ray/protobuf/gcs_service.grpc.pb.h" @@ -32,21 +33,26 @@ class GcsRpcClient; /// Executor saves operation and support retries. class Executor { public: - explicit Executor(GcsRpcClient *gcs_rpc_client) : gcs_rpc_client_(gcs_rpc_client) {} + Executor(GcsRpcClient *gcs_rpc_client, + std::function abort_callback) + : gcs_rpc_client_(gcs_rpc_client), abort_callback_(std::move(abort_callback)) {} /// This function is used to execute the given operation. /// /// \param operation The operation to be executed. - void Execute(const std::function &operation) { - operation_ = operation; - operation(gcs_rpc_client_); + void Execute(std::function operation) { + operation_ = std::move(operation); + operation_(gcs_rpc_client_); } /// This function is used to retry the given operation. 
void Retry() { operation_(gcs_rpc_client_); } + void Abort(const ray::Status &status) { abort_callback_(status); } + private: GcsRpcClient *gcs_rpc_client_; + std::function abort_callback_; std::function operation_; }; @@ -88,8 +94,10 @@ class Executor { void METHOD(const METHOD##Request &request, \ const ClientCallback &callback, \ const int64_t timeout_ms = method_timeout_ms) SPECS { \ - auto executor = new Executor(this); \ - auto operation_callback = [this, request, callback, executor]( \ + auto executor = new Executor(this, [callback](const ray::Status &status) { \ + callback(status, METHOD##Reply()); \ + }); \ + auto operation_callback = [this, request, callback, executor, timeout_ms]( \ const ray::Status &status, \ const METHOD##Reply &reply) { \ if (status.IsTimedOut()) { \ @@ -125,7 +133,10 @@ class Executor { } \ } else { \ pending_requests_bytes_ += request_bytes; \ - pending_requests_.emplace_back(executor); \ + auto timeout = timeout_ms == -1 \ + ? absl::InfiniteFuture() \ + : absl::Now() + absl::Milliseconds(timeout_ms); \ + pending_requests_.emplace(timeout, std::make_pair(executor, request_bytes)); \ } \ } \ }; \ @@ -138,7 +149,7 @@ class Executor { gcs_rpc_client->grpc_client, \ timeout_ms)); \ }; \ - executor->Execute(operation); \ + executor->Execute(std::move(operation)); \ } \ ray::Status Sync##METHOD(const METHOD##Request &request, \ METHOD##Reply *reply_in, \ @@ -496,10 +507,27 @@ class GcsRpcClient { if (shutdown_) { return; } + auto status = channel_->GetState(false); // https://grpc.github.io/grpc/core/md_doc_connectivity-semantics-and-api.html // https://grpc.github.io/grpc/core/connectivity__state_8h_source.html RAY_LOG(DEBUG) << "GCS channel status: " << status; + + // We need to clean up all the pending requests that have timed out. 
+ auto now = absl::Now(); + while (!pending_requests_.empty()) { + auto iter = pending_requests_.begin(); + if (iter->first > now) { + break; + } + auto [executor, request_bytes] = iter->second; + executor->Abort( + ray::Status::TimedOut("Timed out while waiting for GCS to become available.")); + pending_requests_bytes_ -= request_bytes; + delete executor; + pending_requests_.erase(iter); + } + switch (status) { case GRPC_CHANNEL_TRANSIENT_FAILURE: case GRPC_CHANNEL_CONNECTING: @@ -521,8 +549,8 @@ class GcsRpcClient { gcs_is_down_ = false; // Retry the one queued. while (!pending_requests_.empty()) { - pending_requests_.back()->Retry(); - pending_requests_.pop_back(); + pending_requests_.begin()->second.first->Retry(); + pending_requests_.erase(pending_requests_.begin()); } pending_requests_bytes_ = 0; break; @@ -560,7 +588,7 @@ class GcsRpcClient { absl::Time gcs_last_alive_time_ = absl::Now(); std::atomic shutdown_ = false; - std::vector pending_requests_; + absl::btree_multimap> pending_requests_; size_t pending_requests_bytes_ = 0; friend class GcsClientReconnectionTest;