[core][syncer][6.1] Buffering long-polling for efficiency. (#24884)

Compared with pushing based model, long-polling is slower because to send a message, you need to wait until you receive the polling requests. This PR improves this by sending X polling requests so that at most there can be 10 requests flying in the middle and this can improve the perf. Tested with `many_nodes_actor_tests` and no regression:

```
Actor launch time: 1.6540390349998688 (5000 actors)
Actor ready time: 13.953653221999957 (5000 actors)
Total time: 15.607692256999826 (5000 actors)
```

1. [X]  Deprecate old code path for publish node resource change
2. [X]  Move poller and broadcaster into RaySyncer
3. [X]  Deprecate the old pg reporting code path
4. [X]  Remove syncer from gcs cluster resource manager and encapsulate everything in syncer module.
5. [X]  Versioning the report
6. [ ]  Introduce Reporter/Receiver API in the prototype and adaptor rayler and gcs with that.
7. [ ]  Experiment with protocol & communication layer change.
This commit is contained in:
Yi Cheng 2022-05-20 18:26:05 -07:00 committed by GitHub
parent e3f854e34d
commit b95c50caa2
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 26 additions and 25 deletions

View file

@ -373,6 +373,10 @@ RAY_CONFIG(int32_t, gcs_client_check_connection_status_interval_milliseconds, 10
/// Feature flag to use the ray syncer for resource synchronization
RAY_CONFIG(bool, use_ray_syncer, false)
/// The queuing buffer of ray syncer. This indicates how many concurrent
/// requests can run in flight for syncing.
RAY_CONFIG(int64_t, ray_syncer_polling_buffer, 5)
/// The interval at which the gcs client will check if the address of gcs service has
/// changed. When the address changed, we will resubscribe again.
RAY_CONFIG(uint64_t, gcs_service_address_check_interval_milliseconds, 1000)

View file

@ -162,8 +162,8 @@ class ServerSyncConnection : public NodeSyncConnection {
/// After the message being sent, these two fields will be set to be empty again.
/// When the periodical timer wake up, it'll check whether these two fields are set
/// and it'll only send data when these are set.
RaySyncMessages *response_ = nullptr;
grpc::ServerUnaryReactor *unary_reactor_ = nullptr;
std::vector<RaySyncMessages *> responses_;
std::vector<grpc::ServerUnaryReactor *> unary_reactors_;
};
/// SyncConnection for gRPC client side. It has customized logic for sending.
@ -186,9 +186,6 @@ class ClientSyncConnection : public NodeSyncConnection {
/// Stub for this connection.
std::unique_ptr<ray::rpc::syncer::RaySyncer::Stub> stub_;
/// Where the received message is stored.
ray::rpc::syncer::RaySyncMessages in_message_;
/// Dummy request for long-polling.
DummyRequest dummy_;
};

View file

@ -131,7 +131,9 @@ ClientSyncConnection::ClientSyncConnection(
std::shared_ptr<grpc::Channel> channel)
: NodeSyncConnection(io_context, node_id, std::move(message_processor)),
stub_(ray::rpc::syncer::RaySyncer::NewStub(channel)) {
StartLongPolling();
for (int64_t i = 0; i < RayConfig::instance().ray_syncer_polling_buffer(); ++i) {
StartLongPolling();
}
}
void ClientSyncConnection::StartLongPolling() {
@ -139,19 +141,18 @@ void ClientSyncConnection::StartLongPolling() {
// 1. there is a new version of message
// 2. and it has passed X ms since last update.
auto client_context = std::make_shared<grpc::ClientContext>();
auto in_message = std::make_shared<ray::rpc::syncer::RaySyncMessages>();
stub_->async()->LongPolling(
client_context.get(),
&dummy_,
&in_message_,
[this, client_context](grpc::Status status) {
in_message.get(),
[this, client_context, in_message](grpc::Status status) mutable {
if (status.ok()) {
RAY_CHECK(in_message_.GetArena() == nullptr);
io_context_.dispatch(
[this, messages = std::move(in_message_)]() mutable {
[this, messages = std::move(*in_message)]() mutable {
ReceiveUpdate(std::move(messages));
},
"LongPollingCallback");
in_message_.Clear();
// Start the next polling.
StartLongPolling();
}
@ -204,26 +205,25 @@ ServerSyncConnection::ServerSyncConnection(
ServerSyncConnection::~ServerSyncConnection() {
// If there is a pending request, we need to cancel it. Otherwise, rpc will
// hang there forever.
if (unary_reactor_ != nullptr) {
unary_reactor_->Finish(grpc::Status::CANCELLED);
while (!unary_reactors_.empty()) {
unary_reactors_.back()->Finish(grpc::Status::CANCELLED);
unary_reactors_.pop_back();
}
}
void ServerSyncConnection::HandleLongPollingRequest(grpc::ServerUnaryReactor *reactor,
RaySyncMessages *response) {
RAY_CHECK(response_ == nullptr);
RAY_CHECK(unary_reactor_ == nullptr);
unary_reactor_ = reactor;
response_ = response;
unary_reactors_.push_back(reactor);
responses_.push_back(response);
}
void ServerSyncConnection::DoSend() {
// There is no receive request
if (unary_reactor_ == nullptr || sending_buffer_.empty()) {
if (unary_reactors_.empty() || sending_buffer_.empty()) {
return;
}
RAY_CHECK(unary_reactor_ != nullptr && response_ != nullptr);
RAY_CHECK(!responses_.empty());
size_t message_bytes = 0;
auto iter = sending_buffer_.begin();
@ -231,14 +231,14 @@ void ServerSyncConnection::DoSend() {
iter != sending_buffer_.end()) {
message_bytes += iter->second->sync_message().size();
// TODO (iycheng): Use arena allocator for optimization
response_->add_sync_messages()->CopyFrom(*iter->second);
responses_.back()->add_sync_messages()->CopyFrom(*iter->second);
sending_buffer_.erase(iter++);
}
if (response_->sync_messages_size() != 0) {
unary_reactor_->Finish(grpc::Status::OK);
unary_reactor_ = nullptr;
response_ = nullptr;
if (responses_.back()->sync_messages_size() != 0) {
unary_reactors_.back()->Finish(grpc::Status::OK);
responses_.pop_back();
unary_reactors_.pop_back();
}
}