mirror of
https://github.com/vale981/ray
synced 2025-03-06 10:31:39 -05:00
[Core] fix gRPC handlers' unlimited active calls configuration (#25626)
Ray's gRPC server wrapper configures a max active call setting for each handler. When the max active call is -1, the handler is supposed to allow handling unlimited number of requests concurrently. However in practice it is often observed that handlers configured with unlimited active calls are still handling at most 100 requests concurrently. This is a result of the existing logic: At a high level, each gRPC method is associated with a number of ServerCall objects (acting as "tags") in the gRPC completion queue. When there is no tag for a method, gRPC server thread will not be able to poll requests from the method call from the completion queue. After a request is polled from the completion queue, it is processed by the polling gRPC server thread, then queued to an eventloop. When a handler is in the "unlimited" mode, it creates when a new ServerCall object (tag) before actual processing. The problem is that new ServerCalls are created on the eventloop instead of the gRPC server thread. When the event loop runs a callback from the gRPC server, the callback creates a new ServerCall object, and can run the gRPC handler to completion if the handler does not have any async step. So overall, the event loop will not run more callbacks than the initial number of ServerCalls, which is 100 in the "unlimited" mode. The solution is to create a new ServerCall in the gRPC server thread, before sending the ServerCall to the eventloop. Running some night tests to verify the fix does not introduce instabilities: https://buildkite.com/ray-project/release-tests-branch/builds/652 Also, looking into adding gRPC server / client stress tests with large number of concurrent requests.
This commit is contained in:
parent
6b9b1f135b
commit
dcfed617e5
2 changed files with 12 additions and 15 deletions
|
@ -107,10 +107,11 @@ void GrpcServer::Run() {
|
|||
// Create calls for all the server call factories.
|
||||
for (auto &entry : server_call_factories_) {
|
||||
for (int i = 0; i < num_threads_; i++) {
|
||||
// Create a buffer of 100 calls for each RPC handler.
|
||||
// TODO(edoakes): a small buffer should be fine and seems to have better
|
||||
// performance, but we don't currently handle backpressure on the client.
|
||||
int buffer_size = 100;
|
||||
// When there is no max active RPC limit, a call will be added to the completetion
|
||||
// queue before RPC processing starts. In this case, the buffer size only
|
||||
// determines the number of tags in the completion queue, instead of the number of
|
||||
// inflight RPCs being processed.
|
||||
int buffer_size = 128;
|
||||
if (entry->GetMaxActiveRPCs() != -1) {
|
||||
buffer_size = entry->GetMaxActiveRPCs();
|
||||
}
|
||||
|
|
|
@ -162,6 +162,12 @@ class ServerCallImpl : public ServerCall {
|
|||
start_time_ = absl::GetCurrentTimeNanos();
|
||||
ray::stats::STATS_grpc_server_req_handling.Record(1.0, call_name_);
|
||||
if (!io_service_.stopped()) {
|
||||
if (factory_.GetMaxActiveRPCs() == -1) {
|
||||
// Create a new `ServerCall` as completion queue tag before handling the request
|
||||
// when no back pressure limit is set, so that new requests can continue to be
|
||||
// pulled from the completion queue before this request is done.
|
||||
factory_.CreateCall();
|
||||
}
|
||||
io_service_.post([this] { HandleRequestImpl(); }, call_name_);
|
||||
} else {
|
||||
// Handle service for rpc call has stopped, we must handle the call here
|
||||
|
@ -173,16 +179,6 @@ class ServerCallImpl : public ServerCall {
|
|||
|
||||
void HandleRequestImpl() {
|
||||
state_ = ServerCallState::PROCESSING;
|
||||
// NOTE(hchen): This `factory` local variable is needed. Because `SendReply` runs in
|
||||
// a different thread, and will cause `this` to be deleted.
|
||||
const auto &factory = factory_;
|
||||
if (factory.GetMaxActiveRPCs() == -1) {
|
||||
// Create a new `ServerCall` to accept the next incoming request.
|
||||
// We create this before handling the request only when no back pressure limit is
|
||||
// set. So that the it can be populated by the completion queue in the background if
|
||||
// a new request comes in.
|
||||
factory.CreateCall();
|
||||
}
|
||||
(service_handler_.*handle_request_function_)(
|
||||
request_,
|
||||
reply_,
|
||||
|
@ -377,7 +373,7 @@ class ServerCallFactoryImpl : public ServerCallFactory {
|
|||
|
||||
/// Maximum request number to handle at the same time.
|
||||
/// -1 means no limit.
|
||||
uint64_t max_active_rpcs_;
|
||||
int64_t max_active_rpcs_;
|
||||
};
|
||||
|
||||
} // namespace rpc
|
||||
|
|
Loading…
Add table
Reference in a new issue