From dcfed617e5d08cca55743b772c3682bc2412e15a Mon Sep 17 00:00:00 2001 From: mwtian <81660174+mwtian@users.noreply.github.com> Date: Fri, 10 Jun 2022 11:28:41 -0700 Subject: [PATCH] [Core] fix gRPC handlers' unlimited active calls configuration (#25626) Ray's gRPC server wrapper configures a max active call setting for each handler. When the max active call is -1, the handler is supposed to allow handling unlimited number of requests concurrently. However in practice it is often observed that handlers configured with unlimited active calls are still handling at most 100 requests concurrently. This is a result of the existing logic: At a high level, each gRPC method is associated with a number of ServerCall objects (acting as "tags") in the gRPC completion queue. When there is no tag for a method, gRPC server thread will not be able to poll requests from the method call from the completion queue. After a request is polled from the completion queue, it is processed by the polling gRPC server thread, then queued to an eventloop. When a handler is in the "unlimited" mode, it creates when a new ServerCall object (tag) before actual processing. The problem is that new ServerCalls are created on the eventloop instead of the gRPC server thread. When the event loop runs a callback from the gRPC server, the callback creates a new ServerCall object, and can run the gRPC handler to completion if the handler does not have any async step. So overall, the event loop will not run more callbacks than the initial number of ServerCalls, which is 100 in the "unlimited" mode. The solution is to create a new ServerCall in the gRPC server thread, before sending the ServerCall to the eventloop. Running some night tests to verify the fix does not introduce instabilities: https://buildkite.com/ray-project/release-tests-branch/builds/652 Also, looking into adding gRPC server / client stress tests with large number of concurrent requests. --- src/ray/rpc/grpc_server.cc | 9 +++++---- src/ray/rpc/server_call.h | 18 +++++++----------- 2 files changed, 12 insertions(+), 15 deletions(-) diff --git a/src/ray/rpc/grpc_server.cc b/src/ray/rpc/grpc_server.cc index c55422569..522ee317c 100644 --- a/src/ray/rpc/grpc_server.cc +++ b/src/ray/rpc/grpc_server.cc @@ -107,10 +107,11 @@ void GrpcServer::Run() { // Create calls for all the server call factories. for (auto &entry : server_call_factories_) { for (int i = 0; i < num_threads_; i++) { - // Create a buffer of 100 calls for each RPC handler. - // TODO(edoakes): a small buffer should be fine and seems to have better - // performance, but we don't currently handle backpressure on the client. - int buffer_size = 100; + // When there is no max active RPC limit, a call will be added to the completetion + // queue before RPC processing starts. In this case, the buffer size only + // determines the number of tags in the completion queue, instead of the number of + // inflight RPCs being processed. + int buffer_size = 128; if (entry->GetMaxActiveRPCs() != -1) { buffer_size = entry->GetMaxActiveRPCs(); } diff --git a/src/ray/rpc/server_call.h b/src/ray/rpc/server_call.h index 7b42c5ac8..5c7e6e576 100644 --- a/src/ray/rpc/server_call.h +++ b/src/ray/rpc/server_call.h @@ -162,6 +162,12 @@ class ServerCallImpl : public ServerCall { start_time_ = absl::GetCurrentTimeNanos(); ray::stats::STATS_grpc_server_req_handling.Record(1.0, call_name_); if (!io_service_.stopped()) { + if (factory_.GetMaxActiveRPCs() == -1) { + // Create a new `ServerCall` as completion queue tag before handling the request + // when no back pressure limit is set, so that new requests can continue to be + // pulled from the completion queue before this request is done. + factory_.CreateCall(); + } io_service_.post([this] { HandleRequestImpl(); }, call_name_); } else { // Handle service for rpc call has stopped, we must handle the call here @@ -173,16 +179,6 @@ class ServerCallImpl : public ServerCall { void HandleRequestImpl() { state_ = ServerCallState::PROCESSING; - // NOTE(hchen): This `factory` local variable is needed. Because `SendReply` runs in - // a different thread, and will cause `this` to be deleted. - const auto &factory = factory_; - if (factory.GetMaxActiveRPCs() == -1) { - // Create a new `ServerCall` to accept the next incoming request. - // We create this before handling the request only when no back pressure limit is - // set. So that the it can be populated by the completion queue in the background if - // a new request comes in. - factory.CreateCall(); - } (service_handler_.*handle_request_function_)( request_, reply_, @@ -377,7 +373,7 @@ class ServerCallFactoryImpl : public ServerCallFactory { /// Maximum request number to handle at the same time. /// -1 means no limit. - uint64_t max_active_rpcs_; + int64_t max_active_rpcs_; }; } // namespace rpc