mirror of https://github.com/vale981/ray (synced 2025-03-05 18:11:42 -05:00)

parent 5b44afe9c1
commit ccabba88ae

2 changed files with 15 additions and 12 deletions
@@ -107,11 +107,10 @@ void GrpcServer::Run() {
   // Create calls for all the server call factories.
   for (auto &entry : server_call_factories_) {
     for (int i = 0; i < num_threads_; i++) {
-      // When there is no max active RPC limit, a call will be added to the completetion
-      // queue before RPC processing starts. In this case, the buffer size only
-      // determines the number of tags in the completion queue, instead of the number of
-      // inflight RPCs being processed.
-      int buffer_size = 128;
+      // Create a buffer of 100 calls for each RPC handler.
+      // TODO(edoakes): a small buffer should be fine and seems to have better
+      // performance, but we don't currently handle backpressure on the client.
+      int buffer_size = 100;
       if (entry->GetMaxActiveRPCs() != -1) {
         buffer_size = entry->GetMaxActiveRPCs();
       }
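For orientation, below is a minimal, dependency-free sketch of the buffer-sizing rule in this hunk: each handler gets a fixed buffer of pre-posted calls per polling thread unless it declares a max-active-RPC limit, in which case the buffer shrinks to that limit. `FakeCallFactory`, the limits, and the thread count are hypothetical stand-ins, not Ray's real `ServerCallFactory`; in the real server, `CreateCall()` registers a call with the gRPC completion queue (as the deleted comment describes) rather than bumping a counter.

// Sketch only: models the buffer-sizing loop from GrpcServer::Run() with a
// hypothetical FakeCallFactory instead of Ray's ServerCallFactory.
#include <cstdint>
#include <iostream>
#include <memory>
#include <vector>

class FakeCallFactory {
 public:
  explicit FakeCallFactory(int64_t max_active_rpcs)
      : max_active_rpcs_(max_active_rpcs) {}
  // -1 means "no limit", mirroring GetMaxActiveRPCs() in the diff.
  int64_t GetMaxActiveRPCs() const { return max_active_rpcs_; }
  // Stand-in for posting one call tag to the completion queue.
  void CreateCall() { ++created_; }
  int created_ = 0;

 private:
  int64_t max_active_rpcs_;
};

int main() {
  std::vector<std::unique_ptr<FakeCallFactory>> server_call_factories;
  server_call_factories.push_back(std::make_unique<FakeCallFactory>(-1));  // unlimited
  server_call_factories.push_back(std::make_unique<FakeCallFactory>(8));   // limited to 8

  const int num_threads = 2;
  for (auto &entry : server_call_factories) {
    for (int i = 0; i < num_threads; i++) {
      int buffer_size = 100;
      if (entry->GetMaxActiveRPCs() != -1) {
        buffer_size = entry->GetMaxActiveRPCs();
      }
      // Pre-post buffer_size calls per thread, as the deleted comment implies.
      for (int j = 0; j < buffer_size; j++) {
        entry->CreateCall();
      }
    }
  }
  // Prints 200 for the unlimited handler and 16 for the limited one.
  for (auto &entry : server_call_factories) {
    std::cout << "pre-posted call tags: " << entry->created_ << "\n";
  }
  return 0;
}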
@@ -162,12 +162,6 @@ class ServerCallImpl : public ServerCall {
     start_time_ = absl::GetCurrentTimeNanos();
     ray::stats::STATS_grpc_server_req_handling.Record(1.0, call_name_);
     if (!io_service_.stopped()) {
-      if (factory_.GetMaxActiveRPCs() == -1) {
-        // Create a new `ServerCall` as completion queue tag before handling the request
-        // when no back pressure limit is set, so that new requests can continue to be
-        // pulled from the completion queue before this request is done.
-        factory_.CreateCall();
-      }
       io_service_.post([this] { HandleRequestImpl(); }, call_name_);
     } else {
       // Handle service for rpc call has stopped, we must handle the call here
@@ -179,6 +173,16 @@ class ServerCallImpl : public ServerCall {

   void HandleRequestImpl() {
     state_ = ServerCallState::PROCESSING;
+    // NOTE(hchen): This `factory` local variable is needed. Because `SendReply` runs in
+    // a different thread, and will cause `this` to be deleted.
+    const auto &factory = factory_;
+    if (factory.GetMaxActiveRPCs() == -1) {
+      // Create a new `ServerCall` to accept the next incoming request.
+      // We create this before handling the request only when no back pressure limit is
+      // set. So that the it can be populated by the completion queue in the background if
+      // a new request comes in.
+      factory.CreateCall();
+    }
     (service_handler_.*handle_request_function_)(
         request_,
         reply_,
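The block deleted from HandleRequest() in the previous hunk reappears here inside HandleRequestImpl(), together with the NOTE(hchen) comment about keeping a local `factory` reference because the reply path may delete `this` on another thread. Below is a small self-contained sketch of that idiom using hypothetical `CallFactory`/`Call` classes, not Ray's real `ServerCall` types: the factory reference is copied into a local before anything that can destroy the call object runs, and only the local is used afterwards. The placement of the two `CreateCall()` sites is this sketch's own way of showing the back-pressure idea, not a transcription of Ray's reply path.

// Sketch only: "grab a local reference before `this` can be deleted", the
// pattern described in the NOTE(hchen) comment, with hypothetical types.
#include <cstdint>
#include <functional>
#include <iostream>

struct CallFactory {
  int64_t max_active_rpcs = -1;  // -1: no back-pressure limit
  int64_t GetMaxActiveRPCs() const { return max_active_rpcs; }
  void CreateCall() { std::cout << "posted a tag for the next request\n"; }
};

class Call {
 public:
  explicit Call(CallFactory &factory) : factory_(factory) {}

  void HandleRequestImpl(const std::function<void(Call *)> &send_reply) {
    // Local reference: `send_reply` may delete this Call (possibly from another
    // thread), after which reading the member `factory_` through `this` would
    // be a use-after-free.
    CallFactory &factory = factory_;
    if (factory.GetMaxActiveRPCs() == -1) {
      // No limit: accept the next request before this one is done.
      factory.CreateCall();
    }
    send_reply(this);  // the reply path frees the call here
    if (factory.GetMaxActiveRPCs() != -1) {
      // With a limit, this sketch re-arms only after the reply, which is what
      // keeps the number of active RPCs bounded. Safe: uses the local reference,
      // never `this`.
      factory.CreateCall();
    }
  }

 private:
  CallFactory &factory_;
};

int main() {
  CallFactory unlimited;  // max_active_rpcs == -1
  (new Call(unlimited))->HandleRequestImpl([](Call *c) { delete c; });

  CallFactory limited;
  limited.max_active_rpcs = 1;  // back-pressure: at most one active RPC
  (new Call(limited))->HandleRequestImpl([](Call *c) { delete c; });
  return 0;
}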
@@ -373,7 +377,7 @@ class ServerCallFactoryImpl : public ServerCallFactory {

   /// Maximum request number to handle at the same time.
   /// -1 means no limit.
-  int64_t max_active_rpcs_;
+  uint64_t max_active_rpcs_;
 };

 }  // namespace rpc
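One detail about this last hunk: `max_active_rpcs_` changes from `int64_t` to `uint64_t`, while the "-1 means no limit" doc comment and the `== -1` / `!= -1` checks in the earlier hunks stay. Assuming the getter's return type follows the member (the diff doesn't show it), the sentinel test still behaves as intended because `-1` converts to `UINT64_MAX` on both sides of the comparison, as the standalone check below shows; compilers typically flag the signed/unsigned comparison with -Wsign-compare, though.

// Standalone check of the -1 sentinel with an unsigned 64-bit field. The
// variable name mirrors the diff; everything else is just illustration.
#include <cstdint>
#include <iostream>

int main() {
  uint64_t max_active_rpcs = -1;  // -1 converts (wraps) to UINT64_MAX
  std::cout << std::boolalpha;
  // The int literal -1 is converted to uint64_t before comparing, so this is
  // true; most compilers warn about the signed/unsigned comparison.
  std::cout << (max_active_rpcs == -1) << "\n";          // true
  std::cout << (max_active_rpcs == UINT64_MAX) << "\n";  // true
  return 0;
}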