Remove static concurrency limit from gRPC server (#7544)

This commit is contained in:
Edward Oakes 2020-03-10 16:27:02 -07:00 committed by GitHub
parent dbbf0c0e70
commit 119a303ea0
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 89 additions and 109 deletions

View file

@ -23,31 +23,28 @@
namespace ray {
namespace rpc {
#define JOB_INFO_SERVICE_RPC_HANDLER(HANDLER, CONCURRENCY) \
RPC_SERVICE_HANDLER(JobInfoGcsService, HANDLER, CONCURRENCY)
#define JOB_INFO_SERVICE_RPC_HANDLER(HANDLER) \
RPC_SERVICE_HANDLER(JobInfoGcsService, HANDLER)
#define ACTOR_INFO_SERVICE_RPC_HANDLER(HANDLER, CONCURRENCY) \
RPC_SERVICE_HANDLER(ActorInfoGcsService, HANDLER, CONCURRENCY)
#define ACTOR_INFO_SERVICE_RPC_HANDLER(HANDLER) \
RPC_SERVICE_HANDLER(ActorInfoGcsService, HANDLER)
#define NODE_INFO_SERVICE_RPC_HANDLER(HANDLER, CONCURRENCY) \
RPC_SERVICE_HANDLER(NodeInfoGcsService, HANDLER, CONCURRENCY)
#define NODE_INFO_SERVICE_RPC_HANDLER(HANDLER) \
RPC_SERVICE_HANDLER(NodeInfoGcsService, HANDLER)
#define OBJECT_INFO_SERVICE_RPC_HANDLER(HANDLER, CONCURRENCY) \
RPC_SERVICE_HANDLER(ObjectInfoGcsService, HANDLER, CONCURRENCY)
#define OBJECT_INFO_SERVICE_RPC_HANDLER(HANDLER) \
RPC_SERVICE_HANDLER(ObjectInfoGcsService, HANDLER)
#define TASK_INFO_SERVICE_RPC_HANDLER(HANDLER, CONCURRENCY) \
RPC_SERVICE_HANDLER(TaskInfoGcsService, HANDLER, CONCURRENCY)
#define TASK_INFO_SERVICE_RPC_HANDLER(HANDLER) \
RPC_SERVICE_HANDLER(TaskInfoGcsService, HANDLER)
#define STATS_SERVICE_RPC_HANDLER(HANDLER, CONCURRENCY) \
RPC_SERVICE_HANDLER(StatsGcsService, HANDLER, CONCURRENCY)
#define STATS_SERVICE_RPC_HANDLER(HANDLER) RPC_SERVICE_HANDLER(StatsGcsService, HANDLER)
#define ERROR_INFO_SERVICE_RPC_HANDLER(HANDLER, CONCURRENCY) \
RPC_SERVICE_HANDLER(ErrorInfoGcsService, HANDLER, CONCURRENCY)
#define ERROR_INFO_SERVICE_RPC_HANDLER(HANDLER) \
RPC_SERVICE_HANDLER(ErrorInfoGcsService, HANDLER)
#define WORKER_INFO_SERVICE_RPC_HANDLER(HANDLER, CONCURRENCY) \
RPC_SERVICE_HANDLER(WorkerInfoGcsService, HANDLER, CONCURRENCY)
#define SERVER_CALL_CONCURRENCY 9999
#define WORKER_INFO_SERVICE_RPC_HANDLER(HANDLER) \
RPC_SERVICE_HANDLER(WorkerInfoGcsService, HANDLER)
class JobInfoGcsServiceHandler {
public:
@ -76,10 +73,9 @@ class JobInfoGrpcService : public GrpcService {
void InitServerCallFactories(
const std::unique_ptr<grpc::ServerCompletionQueue> &cq,
std::vector<std::pair<std::unique_ptr<ServerCallFactory>, int>>
*server_call_factories_and_concurrencies) override {
JOB_INFO_SERVICE_RPC_HANDLER(AddJob, SERVER_CALL_CONCURRENCY);
JOB_INFO_SERVICE_RPC_HANDLER(MarkJobFinished, SERVER_CALL_CONCURRENCY);
std::vector<std::unique_ptr<ServerCallFactory>> *server_call_factories) override {
JOB_INFO_SERVICE_RPC_HANDLER(AddJob);
JOB_INFO_SERVICE_RPC_HANDLER(MarkJobFinished);
}
private:
@ -133,14 +129,13 @@ class ActorInfoGrpcService : public GrpcService {
void InitServerCallFactories(
const std::unique_ptr<grpc::ServerCompletionQueue> &cq,
std::vector<std::pair<std::unique_ptr<ServerCallFactory>, int>>
*server_call_factories_and_concurrencies) override {
ACTOR_INFO_SERVICE_RPC_HANDLER(GetActorInfo, SERVER_CALL_CONCURRENCY);
ACTOR_INFO_SERVICE_RPC_HANDLER(RegisterActorInfo, SERVER_CALL_CONCURRENCY);
ACTOR_INFO_SERVICE_RPC_HANDLER(UpdateActorInfo, SERVER_CALL_CONCURRENCY);
ACTOR_INFO_SERVICE_RPC_HANDLER(AddActorCheckpoint, SERVER_CALL_CONCURRENCY);
ACTOR_INFO_SERVICE_RPC_HANDLER(GetActorCheckpoint, SERVER_CALL_CONCURRENCY);
ACTOR_INFO_SERVICE_RPC_HANDLER(GetActorCheckpointID, SERVER_CALL_CONCURRENCY);
std::vector<std::unique_ptr<ServerCallFactory>> *server_call_factories) override {
ACTOR_INFO_SERVICE_RPC_HANDLER(GetActorInfo);
ACTOR_INFO_SERVICE_RPC_HANDLER(RegisterActorInfo);
ACTOR_INFO_SERVICE_RPC_HANDLER(UpdateActorInfo);
ACTOR_INFO_SERVICE_RPC_HANDLER(AddActorCheckpoint);
ACTOR_INFO_SERVICE_RPC_HANDLER(GetActorCheckpoint);
ACTOR_INFO_SERVICE_RPC_HANDLER(GetActorCheckpointID);
}
private:
@ -202,16 +197,15 @@ class NodeInfoGrpcService : public GrpcService {
void InitServerCallFactories(
const std::unique_ptr<grpc::ServerCompletionQueue> &cq,
std::vector<std::pair<std::unique_ptr<ServerCallFactory>, int>>
*server_call_factories_and_concurrencies) override {
NODE_INFO_SERVICE_RPC_HANDLER(RegisterNode, SERVER_CALL_CONCURRENCY);
NODE_INFO_SERVICE_RPC_HANDLER(UnregisterNode, SERVER_CALL_CONCURRENCY);
NODE_INFO_SERVICE_RPC_HANDLER(GetAllNodeInfo, SERVER_CALL_CONCURRENCY);
NODE_INFO_SERVICE_RPC_HANDLER(ReportHeartbeat, SERVER_CALL_CONCURRENCY);
NODE_INFO_SERVICE_RPC_HANDLER(ReportBatchHeartbeat, SERVER_CALL_CONCURRENCY);
NODE_INFO_SERVICE_RPC_HANDLER(GetResources, SERVER_CALL_CONCURRENCY);
NODE_INFO_SERVICE_RPC_HANDLER(UpdateResources, SERVER_CALL_CONCURRENCY);
NODE_INFO_SERVICE_RPC_HANDLER(DeleteResources, SERVER_CALL_CONCURRENCY);
std::vector<std::unique_ptr<ServerCallFactory>> *server_call_factories) override {
NODE_INFO_SERVICE_RPC_HANDLER(RegisterNode);
NODE_INFO_SERVICE_RPC_HANDLER(UnregisterNode);
NODE_INFO_SERVICE_RPC_HANDLER(GetAllNodeInfo);
NODE_INFO_SERVICE_RPC_HANDLER(ReportHeartbeat);
NODE_INFO_SERVICE_RPC_HANDLER(ReportBatchHeartbeat);
NODE_INFO_SERVICE_RPC_HANDLER(GetResources);
NODE_INFO_SERVICE_RPC_HANDLER(UpdateResources);
NODE_INFO_SERVICE_RPC_HANDLER(DeleteResources);
}
private:
@ -253,11 +247,10 @@ class ObjectInfoGrpcService : public GrpcService {
void InitServerCallFactories(
const std::unique_ptr<grpc::ServerCompletionQueue> &cq,
std::vector<std::pair<std::unique_ptr<ServerCallFactory>, int>>
*server_call_factories_and_concurrencies) override {
OBJECT_INFO_SERVICE_RPC_HANDLER(GetObjectLocations, SERVER_CALL_CONCURRENCY);
OBJECT_INFO_SERVICE_RPC_HANDLER(AddObjectLocation, SERVER_CALL_CONCURRENCY);
OBJECT_INFO_SERVICE_RPC_HANDLER(RemoveObjectLocation, SERVER_CALL_CONCURRENCY);
std::vector<std::unique_ptr<ServerCallFactory>> *server_call_factories) override {
OBJECT_INFO_SERVICE_RPC_HANDLER(GetObjectLocations);
OBJECT_INFO_SERVICE_RPC_HANDLER(AddObjectLocation);
OBJECT_INFO_SERVICE_RPC_HANDLER(RemoveObjectLocation);
}
private:
@ -305,13 +298,12 @@ class TaskInfoGrpcService : public GrpcService {
void InitServerCallFactories(
const std::unique_ptr<grpc::ServerCompletionQueue> &cq,
std::vector<std::pair<std::unique_ptr<ServerCallFactory>, int>>
*server_call_factories_and_concurrencies) override {
TASK_INFO_SERVICE_RPC_HANDLER(AddTask, SERVER_CALL_CONCURRENCY);
TASK_INFO_SERVICE_RPC_HANDLER(GetTask, SERVER_CALL_CONCURRENCY);
TASK_INFO_SERVICE_RPC_HANDLER(DeleteTasks, SERVER_CALL_CONCURRENCY);
TASK_INFO_SERVICE_RPC_HANDLER(AddTaskLease, SERVER_CALL_CONCURRENCY);
TASK_INFO_SERVICE_RPC_HANDLER(AttemptTaskReconstruction, SERVER_CALL_CONCURRENCY);
std::vector<std::unique_ptr<ServerCallFactory>> *server_call_factories) override {
TASK_INFO_SERVICE_RPC_HANDLER(AddTask);
TASK_INFO_SERVICE_RPC_HANDLER(GetTask);
TASK_INFO_SERVICE_RPC_HANDLER(DeleteTasks);
TASK_INFO_SERVICE_RPC_HANDLER(AddTaskLease);
TASK_INFO_SERVICE_RPC_HANDLER(AttemptTaskReconstruction);
}
private:
@ -345,9 +337,8 @@ class StatsGrpcService : public GrpcService {
void InitServerCallFactories(
const std::unique_ptr<grpc::ServerCompletionQueue> &cq,
std::vector<std::pair<std::unique_ptr<ServerCallFactory>, int>>
*server_call_factories_and_concurrencies) override {
STATS_SERVICE_RPC_HANDLER(AddProfileData, SERVER_CALL_CONCURRENCY);
std::vector<std::unique_ptr<ServerCallFactory>> *server_call_factories) override {
STATS_SERVICE_RPC_HANDLER(AddProfileData);
}
private:
@ -381,9 +372,8 @@ class ErrorInfoGrpcService : public GrpcService {
void InitServerCallFactories(
const std::unique_ptr<grpc::ServerCompletionQueue> &cq,
std::vector<std::pair<std::unique_ptr<ServerCallFactory>, int>>
*server_call_factories_and_concurrencies) override {
ERROR_INFO_SERVICE_RPC_HANDLER(ReportJobError, SERVER_CALL_CONCURRENCY);
std::vector<std::unique_ptr<ServerCallFactory>> *server_call_factories) override {
ERROR_INFO_SERVICE_RPC_HANDLER(ReportJobError);
}
private:
@ -417,9 +407,8 @@ class WorkerInfoGrpcService : public GrpcService {
void InitServerCallFactories(
const std::unique_ptr<grpc::ServerCompletionQueue> &cq,
std::vector<std::pair<std::unique_ptr<ServerCallFactory>, int>>
*server_call_factories_and_concurrencies) override {
WORKER_INFO_SERVICE_RPC_HANDLER(ReportWorkerFailure, SERVER_CALL_CONCURRENCY);
std::vector<std::unique_ptr<ServerCallFactory>> *server_call_factories) override {
WORKER_INFO_SERVICE_RPC_HANDLER(ReportWorkerFailure);
}
private:

View file

@ -64,10 +64,9 @@ void GrpcServer::Run() {
RAY_LOG(INFO) << name_ << " server started, listening on port " << port_ << ".";
// Create calls for all the server call factories.
for (auto &entry : server_call_factories_and_concurrencies_) {
for (int i = 0; i < entry.second; i++) {
// Create and request calls from the factory.
entry.first->CreateCall();
for (auto &entry : server_call_factories_) {
for (int i = 0; i < num_threads_; i++) {
entry->CreateCall();
}
}
// Start threads that poll incoming requests.
@ -82,7 +81,7 @@ void GrpcServer::RegisterService(GrpcService &service) {
services_.emplace_back(service.GetGrpcService());
for (int i = 0; i < num_threads_; i++) {
service.InitServerCallFactories(cqs_[i], &server_call_factories_and_concurrencies_);
service.InitServerCallFactories(cqs_[i], &server_call_factories_);
}
}

View file

@ -27,14 +27,13 @@
namespace ray {
namespace rpc {
#define RPC_SERVICE_HANDLER(SERVICE, HANDLER, CONCURRENCY) \
#define RPC_SERVICE_HANDLER(SERVICE, HANDLER) \
std::unique_ptr<ServerCallFactory> HANDLER##_call_factory( \
new ServerCallFactoryImpl<SERVICE, SERVICE##Handler, HANDLER##Request, \
HANDLER##Reply>( \
service_, &SERVICE::AsyncService::Request##HANDLER, service_handler_, \
&SERVICE##Handler::Handle##HANDLER, cq, main_service_)); \
server_call_factories_and_concurrencies->emplace_back( \
std::move(HANDLER##_call_factory), CONCURRENCY);
server_call_factories->emplace_back(std::move(HANDLER##_call_factory));
// Define a void RPC client method.
#define DECLARE_VOID_RPC_SERVICE_HANDLER_METHOD(METHOD) \
@ -109,10 +108,8 @@ class GrpcServer {
bool is_closed_;
/// The `grpc::Service` objects which should be registered to `ServerBuilder`.
std::vector<std::reference_wrapper<grpc::Service>> services_;
/// The `ServerCallFactory` objects, and the maximum number of concurrent requests that
/// this gRPC server can handle.
std::vector<std::pair<std::unique_ptr<ServerCallFactory>, int>>
server_call_factories_and_concurrencies_;
/// The `ServerCallFactory` objects.
std::vector<std::unique_ptr<ServerCallFactory>> server_call_factories_;
/// The number of completion queues the server is polling from.
int num_threads_;
/// The `ServerCompletionQueue` object used for polling events.
@ -149,12 +146,11 @@ class GrpcService {
/// server can handle.
///
/// \param[in] cq The grpc completion queue.
/// \param[out] server_call_factories_and_concurrencies The `ServerCallFactory` objects,
/// \param[out] server_call_factories The `ServerCallFactory` objects created for
/// this service; each one is appended to this vector.
virtual void InitServerCallFactories(
const std::unique_ptr<grpc::ServerCompletionQueue> &cq,
std::vector<std::pair<std::unique_ptr<ServerCallFactory>, int>>
*server_call_factories_and_concurrencies) = 0;
std::vector<std::unique_ptr<ServerCallFactory>> *server_call_factories) = 0;
/// The main event loop, to which the service handler functions will be posted.
boost::asio::io_service &main_service_;

View file

@ -25,12 +25,12 @@ namespace rpc {
/// NOTE: See src/ray/core_worker/core_worker.h on how to add a new grpc handler.
#define RAY_NODE_MANAGER_RPC_HANDLERS \
RPC_SERVICE_HANDLER(NodeManagerService, RequestWorkerLease, 100) \
RPC_SERVICE_HANDLER(NodeManagerService, ReturnWorker, 100) \
RPC_SERVICE_HANDLER(NodeManagerService, ForwardTask, 100) \
RPC_SERVICE_HANDLER(NodeManagerService, PinObjectIDs, 100) \
RPC_SERVICE_HANDLER(NodeManagerService, GetNodeStats, 1) \
RPC_SERVICE_HANDLER(NodeManagerService, GlobalGC, 1)
RPC_SERVICE_HANDLER(NodeManagerService, RequestWorkerLease) \
RPC_SERVICE_HANDLER(NodeManagerService, ReturnWorker) \
RPC_SERVICE_HANDLER(NodeManagerService, ForwardTask) \
RPC_SERVICE_HANDLER(NodeManagerService, PinObjectIDs) \
RPC_SERVICE_HANDLER(NodeManagerService, GetNodeStats) \
RPC_SERVICE_HANDLER(NodeManagerService, GlobalGC)
/// Interface of the `NodeManagerService`, see `src/ray/protobuf/node_manager.proto`.
class NodeManagerServiceHandler {
@ -86,8 +86,7 @@ class NodeManagerGrpcService : public GrpcService {
void InitServerCallFactories(
const std::unique_ptr<grpc::ServerCompletionQueue> &cq,
std::vector<std::pair<std::unique_ptr<ServerCallFactory>, int>>
*server_call_factories_and_concurrencies) override {
std::vector<std::unique_ptr<ServerCallFactory>> *server_call_factories) override {
RAY_NODE_MANAGER_RPC_HANDLERS
}

View file

@ -25,9 +25,9 @@ namespace ray {
namespace rpc {
#define RAY_OBJECT_MANAGER_RPC_HANDLERS \
RPC_SERVICE_HANDLER(ObjectManagerService, Push, 5) \
RPC_SERVICE_HANDLER(ObjectManagerService, Pull, 5) \
RPC_SERVICE_HANDLER(ObjectManagerService, FreeObjects, 2)
RPC_SERVICE_HANDLER(ObjectManagerService, Push) \
RPC_SERVICE_HANDLER(ObjectManagerService, Pull) \
RPC_SERVICE_HANDLER(ObjectManagerService, FreeObjects)
/// Implementations of the `ObjectManagerGrpcService`, check interface in
/// `src/ray/protobuf/object_manager.proto`.
@ -67,8 +67,7 @@ class ObjectManagerGrpcService : public GrpcService {
void InitServerCallFactories(
const std::unique_ptr<grpc::ServerCompletionQueue> &cq,
std::vector<std::pair<std::unique_ptr<ServerCallFactory>, int>>
*server_call_factories_and_concurrencies) override {
std::vector<std::unique_ptr<ServerCallFactory>> *server_call_factories) override {
RAY_OBJECT_MANAGER_RPC_HANDLERS
}

View file

@ -92,8 +92,6 @@ class ServerCallFactory {
public:
/// Create a new `ServerCall` and request gRPC runtime to start accepting the
/// corresponding type of requests.
///
/// \return Pointer to the `ServerCall` object.
virtual void CreateCall() const = 0;
virtual ~ServerCallFactory() = default;
@ -155,6 +153,10 @@ class ServerCallImpl : public ServerCall {
// NOTE(hchen): This `factory` local variable is needed. Because `SendReply` runs in
// a different thread, and will cause `this` to be deleted.
const auto &factory = factory_;
// Create a new `ServerCall` to accept the next incoming request.
// We create this before handling the request so that it can be populated by
// the completion queue in the background if a new request comes in.
factory.CreateCall();
(service_handler_.*handle_request_function_)(
request_, &reply_,
[this](Status status, std::function<void()> success,
@ -169,9 +171,6 @@ class ServerCallImpl : public ServerCall {
// this server call might be deleted
SendReply(status);
});
// We've finished handling this request,
// create a new `ServerCall` to accept the next incoming request.
factory.CreateCall();
}
void OnReplySent() override {

View file

@ -29,16 +29,16 @@ namespace rpc {
/// NOTE: See src/ray/core_worker/core_worker.h on how to add a new grpc handler.
#define RAY_CORE_WORKER_RPC_HANDLERS \
RPC_SERVICE_HANDLER(CoreWorkerService, AssignTask, 5) \
RPC_SERVICE_HANDLER(CoreWorkerService, PushTask, 9999) \
RPC_SERVICE_HANDLER(CoreWorkerService, DirectActorCallArgWaitComplete, 100) \
RPC_SERVICE_HANDLER(CoreWorkerService, GetObjectStatus, 9999) \
RPC_SERVICE_HANDLER(CoreWorkerService, WaitForObjectEviction, 9999) \
RPC_SERVICE_HANDLER(CoreWorkerService, WaitForRefRemoved, 9999) \
RPC_SERVICE_HANDLER(CoreWorkerService, KillActor, 9999) \
RPC_SERVICE_HANDLER(CoreWorkerService, GetCoreWorkerStats, 100) \
RPC_SERVICE_HANDLER(CoreWorkerService, LocalGC, 100) \
RPC_SERVICE_HANDLER(CoreWorkerService, PlasmaObjectReady, 9999)
RPC_SERVICE_HANDLER(CoreWorkerService, AssignTask) \
RPC_SERVICE_HANDLER(CoreWorkerService, PushTask) \
RPC_SERVICE_HANDLER(CoreWorkerService, DirectActorCallArgWaitComplete) \
RPC_SERVICE_HANDLER(CoreWorkerService, GetObjectStatus) \
RPC_SERVICE_HANDLER(CoreWorkerService, WaitForObjectEviction) \
RPC_SERVICE_HANDLER(CoreWorkerService, WaitForRefRemoved) \
RPC_SERVICE_HANDLER(CoreWorkerService, KillActor) \
RPC_SERVICE_HANDLER(CoreWorkerService, GetCoreWorkerStats) \
RPC_SERVICE_HANDLER(CoreWorkerService, LocalGC) \
RPC_SERVICE_HANDLER(CoreWorkerService, PlasmaObjectReady)
#define RAY_CORE_WORKER_DECLARE_RPC_HANDLERS \
DECLARE_VOID_RPC_SERVICE_HANDLER_METHOD(AssignTask) \
@ -84,8 +84,7 @@ class CoreWorkerGrpcService : public GrpcService {
void InitServerCallFactories(
const std::unique_ptr<grpc::ServerCompletionQueue> &cq,
std::vector<std::pair<std::unique_ptr<ServerCallFactory>, int>>
*server_call_factories_and_concurrencies) override {
std::vector<std::unique_ptr<ServerCallFactory>> *server_call_factories) override {
RAY_CORE_WORKER_RPC_HANDLERS
}