From b95c50caa21c0287dfcaf166d09305308e623766 Mon Sep 17 00:00:00 2001 From: Yi Cheng <74173148+iycheng@users.noreply.github.com> Date: Fri, 20 May 2022 18:26:05 -0700 Subject: [PATCH] [core][syncer][6.1] Buffering long-polling for efficiency. (#24884) Compared with a push-based model, long-polling is slower because, to send a message, the sender has to wait until it receives a polling request. This PR improves this by keeping up to `ray_syncer_polling_buffer` polling requests (5 by default) in flight at once, so that a pending request is usually already available when there is a message to send, which improves performance. Tested with `many_nodes_actor_tests`; no regression observed: ``` Actor launch time: 1.6540390349998688 (5000 actors) Actor ready time: 13.953653221999957 (5000 actors) Total time: 15.607692256999826 (5000 actors) ``` 1. [X] Deprecate old code path for publish node resource change 2. [X] Move poller and broadcaster into RaySyncer 3. [X] Deprecate the old pg reporting code path 4. [X] Remove syncer from gcs cluster resource manager and encapsulate everything in syncer module. 5. [X] Versioning the report 6. [ ] Introduce Reporter/Receiver API in the prototype and adapt raylet and gcs to it. 7. [ ] Experiment with protocol & communication layer change. --- src/ray/common/ray_config_def.h | 4 +++ src/ray/common/ray_syncer/ray_syncer-inl.h | 7 ++-- src/ray/common/ray_syncer/ray_syncer.cc | 40 +++++++++++----------- 3 files changed, 26 insertions(+), 25 deletions(-) diff --git a/src/ray/common/ray_config_def.h b/src/ray/common/ray_config_def.h index f419a95f8..fe7067758 100644 --- a/src/ray/common/ray_config_def.h +++ b/src/ray/common/ray_config_def.h @@ -373,6 +373,10 @@ RAY_CONFIG(int32_t, gcs_client_check_connection_status_interval_milliseconds, 10 /// Feature flag to use the ray syncer for resource synchronization RAY_CONFIG(bool, use_ray_syncer, false) +/// The queuing buffer of ray syncer. This indicates how many concurrent +/// requests can run in flight for syncing. 
+RAY_CONFIG(int64_t, ray_syncer_polling_buffer, 5) + /// The interval at which the gcs client will check if the address of gcs service has /// changed. When the address changed, we will resubscribe again. RAY_CONFIG(uint64_t, gcs_service_address_check_interval_milliseconds, 1000) diff --git a/src/ray/common/ray_syncer/ray_syncer-inl.h b/src/ray/common/ray_syncer/ray_syncer-inl.h index 17948622f..506ab14b6 100644 --- a/src/ray/common/ray_syncer/ray_syncer-inl.h +++ b/src/ray/common/ray_syncer/ray_syncer-inl.h @@ -162,8 +162,8 @@ class ServerSyncConnection : public NodeSyncConnection { /// After the message being sent, these two fields will be set to be empty again. /// When the periodical timer wake up, it'll check whether these two fields are set /// and it'll only send data when these are set. - RaySyncMessages *response_ = nullptr; - grpc::ServerUnaryReactor *unary_reactor_ = nullptr; + std::vector<RaySyncMessages *> responses_; + std::vector<grpc::ServerUnaryReactor *> unary_reactors_; }; /// SyncConnection for gRPC client side. It has customized logic for sending. @@ -186,9 +186,6 @@ class ClientSyncConnection : public NodeSyncConnection { /// Stub for this connection. std::unique_ptr<ray::rpc::syncer::RaySyncer::Stub> stub_; - /// Where the received message is stored. - ray::rpc::syncer::RaySyncMessages in_message_; - /// Dummy request for long-polling. 
DummyRequest dummy_; }; diff --git a/src/ray/common/ray_syncer/ray_syncer.cc b/src/ray/common/ray_syncer/ray_syncer.cc index 3972efd58..3c9ae2e17 100644 --- a/src/ray/common/ray_syncer/ray_syncer.cc +++ b/src/ray/common/ray_syncer/ray_syncer.cc @@ -131,7 +131,9 @@ ClientSyncConnection::ClientSyncConnection( std::shared_ptr<grpc::Channel> channel) : NodeSyncConnection(io_context, node_id, std::move(message_processor)), stub_(ray::rpc::syncer::RaySyncer::NewStub(channel)) { - StartLongPolling(); + for (int64_t i = 0; i < RayConfig::instance().ray_syncer_polling_buffer(); ++i) { + StartLongPolling(); + } } void ClientSyncConnection::StartLongPolling() { @@ -139,19 +141,18 @@ void ClientSyncConnection::StartLongPolling() { // 1. there is a new version of message // 2. and it has passed X ms since last update. auto client_context = std::make_shared<grpc::ClientContext>(); + auto in_message = std::make_shared<ray::rpc::syncer::RaySyncMessages>(); stub_->async()->LongPolling( client_context.get(), &dummy_, - &in_message_, - [this, client_context](grpc::Status status) { + in_message.get(), + [this, client_context, in_message](grpc::Status status) mutable { if (status.ok()) { - RAY_CHECK(in_message_.GetArena() == nullptr); io_context_.dispatch( - [this, messages = std::move(in_message_)]() mutable { + [this, messages = std::move(*in_message)]() mutable { ReceiveUpdate(std::move(messages)); }, "LongPollingCallback"); - in_message_.Clear(); // Start the next polling. StartLongPolling(); } @@ -204,26 +205,25 @@ ServerSyncConnection::ServerSyncConnection( ServerSyncConnection::~ServerSyncConnection() { // If there is a pending request, we need to cancel it. Otherwise, rpc will // hang there forever. 
- if (unary_reactor_ != nullptr) { - unary_reactor_->Finish(grpc::Status::CANCELLED); + while (!unary_reactors_.empty()) { + unary_reactors_.back()->Finish(grpc::Status::CANCELLED); + unary_reactors_.pop_back(); } } void ServerSyncConnection::HandleLongPollingRequest(grpc::ServerUnaryReactor *reactor, RaySyncMessages *response) { - RAY_CHECK(response_ == nullptr); - RAY_CHECK(unary_reactor_ == nullptr); - - unary_reactor_ = reactor; - response_ = response; + unary_reactors_.push_back(reactor); + responses_.push_back(response); } void ServerSyncConnection::DoSend() { // There is no receive request - if (unary_reactor_ == nullptr || sending_buffer_.empty()) { + if (unary_reactors_.empty() || sending_buffer_.empty()) { return; } - RAY_CHECK(unary_reactor_ != nullptr && response_ != nullptr); + + RAY_CHECK(!responses_.empty()); size_t message_bytes = 0; auto iter = sending_buffer_.begin(); @@ -231,14 +231,14 @@ void ServerSyncConnection::DoSend() { iter != sending_buffer_.end()) { message_bytes += iter->second->sync_message().size(); // TODO (iycheng): Use arena allocator for optimization - response_->add_sync_messages()->CopyFrom(*iter->second); + responses_.back()->add_sync_messages()->CopyFrom(*iter->second); sending_buffer_.erase(iter++); } - if (response_->sync_messages_size() != 0) { - unary_reactor_->Finish(grpc::Status::OK); - unary_reactor_ = nullptr; - response_ = nullptr; + if (responses_.back()->sync_messages_size() != 0) { + unary_reactors_.back()->Finish(grpc::Status::OK); + responses_.pop_back(); + unary_reactors_.pop_back(); } }