From 119a303ea04893c0f7f898c6b29d8d89d51c6424 Mon Sep 17 00:00:00 2001 From: Edward Oakes Date: Tue, 10 Mar 2020 16:27:02 -0700 Subject: [PATCH] Remove static concurrency limit from gRPC server (#7544) --- src/ray/rpc/gcs_server/gcs_rpc_server.h | 111 ++++++++---------- src/ray/rpc/grpc_server.cc | 9 +- src/ray/rpc/grpc_server.h | 16 +-- .../rpc/node_manager/node_manager_server.h | 17 ++- .../object_manager/object_manager_server.h | 11 +- src/ray/rpc/server_call.h | 9 +- src/ray/rpc/worker/core_worker_server.h | 25 ++-- 7 files changed, 89 insertions(+), 109 deletions(-) diff --git a/src/ray/rpc/gcs_server/gcs_rpc_server.h b/src/ray/rpc/gcs_server/gcs_rpc_server.h index a6343e2d1..9a131fd22 100644 --- a/src/ray/rpc/gcs_server/gcs_rpc_server.h +++ b/src/ray/rpc/gcs_server/gcs_rpc_server.h @@ -23,31 +23,28 @@ namespace ray { namespace rpc { -#define JOB_INFO_SERVICE_RPC_HANDLER(HANDLER, CONCURRENCY) \ - RPC_SERVICE_HANDLER(JobInfoGcsService, HANDLER, CONCURRENCY) +#define JOB_INFO_SERVICE_RPC_HANDLER(HANDLER) \ + RPC_SERVICE_HANDLER(JobInfoGcsService, HANDLER) -#define ACTOR_INFO_SERVICE_RPC_HANDLER(HANDLER, CONCURRENCY) \ - RPC_SERVICE_HANDLER(ActorInfoGcsService, HANDLER, CONCURRENCY) +#define ACTOR_INFO_SERVICE_RPC_HANDLER(HANDLER) \ + RPC_SERVICE_HANDLER(ActorInfoGcsService, HANDLER) -#define NODE_INFO_SERVICE_RPC_HANDLER(HANDLER, CONCURRENCY) \ - RPC_SERVICE_HANDLER(NodeInfoGcsService, HANDLER, CONCURRENCY) +#define NODE_INFO_SERVICE_RPC_HANDLER(HANDLER) \ + RPC_SERVICE_HANDLER(NodeInfoGcsService, HANDLER) -#define OBJECT_INFO_SERVICE_RPC_HANDLER(HANDLER, CONCURRENCY) \ - RPC_SERVICE_HANDLER(ObjectInfoGcsService, HANDLER, CONCURRENCY) +#define OBJECT_INFO_SERVICE_RPC_HANDLER(HANDLER) \ + RPC_SERVICE_HANDLER(ObjectInfoGcsService, HANDLER) -#define TASK_INFO_SERVICE_RPC_HANDLER(HANDLER, CONCURRENCY) \ - RPC_SERVICE_HANDLER(TaskInfoGcsService, HANDLER, CONCURRENCY) +#define TASK_INFO_SERVICE_RPC_HANDLER(HANDLER) \ + RPC_SERVICE_HANDLER(TaskInfoGcsService, HANDLER) -#define STATS_SERVICE_RPC_HANDLER(HANDLER, CONCURRENCY) \ - RPC_SERVICE_HANDLER(StatsGcsService, HANDLER, CONCURRENCY) +#define STATS_SERVICE_RPC_HANDLER(HANDLER) RPC_SERVICE_HANDLER(StatsGcsService, HANDLER) -#define ERROR_INFO_SERVICE_RPC_HANDLER(HANDLER, CONCURRENCY) \ - RPC_SERVICE_HANDLER(ErrorInfoGcsService, HANDLER, CONCURRENCY) +#define ERROR_INFO_SERVICE_RPC_HANDLER(HANDLER) \ + RPC_SERVICE_HANDLER(ErrorInfoGcsService, HANDLER) -#define WORKER_INFO_SERVICE_RPC_HANDLER(HANDLER, CONCURRENCY) \ - RPC_SERVICE_HANDLER(WorkerInfoGcsService, HANDLER, CONCURRENCY) - -#define SERVER_CALL_CONCURRENCY 9999 +#define WORKER_INFO_SERVICE_RPC_HANDLER(HANDLER) \ + RPC_SERVICE_HANDLER(WorkerInfoGcsService, HANDLER) class JobInfoGcsServiceHandler { public: @@ -76,10 +73,9 @@ class JobInfoGrpcService : public GrpcService { void InitServerCallFactories( const std::unique_ptr &cq, - std::vector, int>> - *server_call_factories_and_concurrencies) override { - JOB_INFO_SERVICE_RPC_HANDLER(AddJob, SERVER_CALL_CONCURRENCY); - JOB_INFO_SERVICE_RPC_HANDLER(MarkJobFinished, SERVER_CALL_CONCURRENCY); + std::vector> *server_call_factories) override { + JOB_INFO_SERVICE_RPC_HANDLER(AddJob); + JOB_INFO_SERVICE_RPC_HANDLER(MarkJobFinished); } private: @@ -133,14 +129,13 @@ class ActorInfoGrpcService : public GrpcService { void InitServerCallFactories( const std::unique_ptr &cq, - std::vector, int>> - *server_call_factories_and_concurrencies) override { - ACTOR_INFO_SERVICE_RPC_HANDLER(GetActorInfo, SERVER_CALL_CONCURRENCY); - ACTOR_INFO_SERVICE_RPC_HANDLER(RegisterActorInfo, SERVER_CALL_CONCURRENCY); - ACTOR_INFO_SERVICE_RPC_HANDLER(UpdateActorInfo, SERVER_CALL_CONCURRENCY); - ACTOR_INFO_SERVICE_RPC_HANDLER(AddActorCheckpoint, SERVER_CALL_CONCURRENCY); - ACTOR_INFO_SERVICE_RPC_HANDLER(GetActorCheckpoint, SERVER_CALL_CONCURRENCY); - ACTOR_INFO_SERVICE_RPC_HANDLER(GetActorCheckpointID, SERVER_CALL_CONCURRENCY); + std::vector> *server_call_factories) override { + ACTOR_INFO_SERVICE_RPC_HANDLER(GetActorInfo); + ACTOR_INFO_SERVICE_RPC_HANDLER(RegisterActorInfo); + ACTOR_INFO_SERVICE_RPC_HANDLER(UpdateActorInfo); + ACTOR_INFO_SERVICE_RPC_HANDLER(AddActorCheckpoint); + ACTOR_INFO_SERVICE_RPC_HANDLER(GetActorCheckpoint); + ACTOR_INFO_SERVICE_RPC_HANDLER(GetActorCheckpointID); } private: @@ -202,16 +197,15 @@ class NodeInfoGrpcService : public GrpcService { void InitServerCallFactories( const std::unique_ptr &cq, - std::vector, int>> - *server_call_factories_and_concurrencies) override { - NODE_INFO_SERVICE_RPC_HANDLER(RegisterNode, SERVER_CALL_CONCURRENCY); - NODE_INFO_SERVICE_RPC_HANDLER(UnregisterNode, SERVER_CALL_CONCURRENCY); - NODE_INFO_SERVICE_RPC_HANDLER(GetAllNodeInfo, SERVER_CALL_CONCURRENCY); - NODE_INFO_SERVICE_RPC_HANDLER(ReportHeartbeat, SERVER_CALL_CONCURRENCY); - NODE_INFO_SERVICE_RPC_HANDLER(ReportBatchHeartbeat, SERVER_CALL_CONCURRENCY); - NODE_INFO_SERVICE_RPC_HANDLER(GetResources, SERVER_CALL_CONCURRENCY); - NODE_INFO_SERVICE_RPC_HANDLER(UpdateResources, SERVER_CALL_CONCURRENCY); - NODE_INFO_SERVICE_RPC_HANDLER(DeleteResources, SERVER_CALL_CONCURRENCY); + std::vector> *server_call_factories) override { + NODE_INFO_SERVICE_RPC_HANDLER(RegisterNode); + NODE_INFO_SERVICE_RPC_HANDLER(UnregisterNode); + NODE_INFO_SERVICE_RPC_HANDLER(GetAllNodeInfo); + NODE_INFO_SERVICE_RPC_HANDLER(ReportHeartbeat); + NODE_INFO_SERVICE_RPC_HANDLER(ReportBatchHeartbeat); + NODE_INFO_SERVICE_RPC_HANDLER(GetResources); + NODE_INFO_SERVICE_RPC_HANDLER(UpdateResources); + NODE_INFO_SERVICE_RPC_HANDLER(DeleteResources); } private: @@ -253,11 +247,10 @@ class ObjectInfoGrpcService : public GrpcService { void InitServerCallFactories( const std::unique_ptr &cq, - std::vector, int>> - *server_call_factories_and_concurrencies) override { - OBJECT_INFO_SERVICE_RPC_HANDLER(GetObjectLocations, SERVER_CALL_CONCURRENCY); - OBJECT_INFO_SERVICE_RPC_HANDLER(AddObjectLocation, SERVER_CALL_CONCURRENCY); - OBJECT_INFO_SERVICE_RPC_HANDLER(RemoveObjectLocation, SERVER_CALL_CONCURRENCY); + std::vector> *server_call_factories) override { + OBJECT_INFO_SERVICE_RPC_HANDLER(GetObjectLocations); + OBJECT_INFO_SERVICE_RPC_HANDLER(AddObjectLocation); + OBJECT_INFO_SERVICE_RPC_HANDLER(RemoveObjectLocation); } private: @@ -305,13 +298,12 @@ class TaskInfoGrpcService : public GrpcService { void InitServerCallFactories( const std::unique_ptr &cq, - std::vector, int>> - *server_call_factories_and_concurrencies) override { - TASK_INFO_SERVICE_RPC_HANDLER(AddTask, SERVER_CALL_CONCURRENCY); - TASK_INFO_SERVICE_RPC_HANDLER(GetTask, SERVER_CALL_CONCURRENCY); - TASK_INFO_SERVICE_RPC_HANDLER(DeleteTasks, SERVER_CALL_CONCURRENCY); - TASK_INFO_SERVICE_RPC_HANDLER(AddTaskLease, SERVER_CALL_CONCURRENCY); - TASK_INFO_SERVICE_RPC_HANDLER(AttemptTaskReconstruction, SERVER_CALL_CONCURRENCY); + std::vector> *server_call_factories) override { + TASK_INFO_SERVICE_RPC_HANDLER(AddTask); + TASK_INFO_SERVICE_RPC_HANDLER(GetTask); + TASK_INFO_SERVICE_RPC_HANDLER(DeleteTasks); + TASK_INFO_SERVICE_RPC_HANDLER(AddTaskLease); + TASK_INFO_SERVICE_RPC_HANDLER(AttemptTaskReconstruction); } private: @@ -345,9 +337,8 @@ class StatsGrpcService : public GrpcService { void InitServerCallFactories( const std::unique_ptr &cq, - std::vector, int>> - *server_call_factories_and_concurrencies) override { - STATS_SERVICE_RPC_HANDLER(AddProfileData, SERVER_CALL_CONCURRENCY); + std::vector> *server_call_factories) override { + STATS_SERVICE_RPC_HANDLER(AddProfileData); } private: @@ -381,9 +372,8 @@ class ErrorInfoGrpcService : public GrpcService { void InitServerCallFactories( const std::unique_ptr &cq, - std::vector, int>> - *server_call_factories_and_concurrencies) override { - ERROR_INFO_SERVICE_RPC_HANDLER(ReportJobError, SERVER_CALL_CONCURRENCY); + std::vector> *server_call_factories) override { + ERROR_INFO_SERVICE_RPC_HANDLER(ReportJobError); } private: @@ -417,9 +407,8 @@ class WorkerInfoGrpcService : public GrpcService { void InitServerCallFactories( const std::unique_ptr &cq, - std::vector, int>> - *server_call_factories_and_concurrencies) override { - WORKER_INFO_SERVICE_RPC_HANDLER(ReportWorkerFailure, SERVER_CALL_CONCURRENCY); + std::vector> *server_call_factories) override { + WORKER_INFO_SERVICE_RPC_HANDLER(ReportWorkerFailure); } private: diff --git a/src/ray/rpc/grpc_server.cc b/src/ray/rpc/grpc_server.cc index 2db634a4c..7c0e35e26 100644 --- a/src/ray/rpc/grpc_server.cc +++ b/src/ray/rpc/grpc_server.cc @@ -64,10 +64,9 @@ void GrpcServer::Run() { RAY_LOG(INFO) << name_ << " server started, listening on port " << port_ << "."; // Create calls for all the server call factories. - for (auto &entry : server_call_factories_and_concurrencies_) { - for (int i = 0; i < entry.second; i++) { - // Create and request calls from the factory. - entry.first->CreateCall(); + for (auto &entry : server_call_factories_) { + for (int i = 0; i < num_threads_; i++) { + entry->CreateCall(); } } // Start threads that polls incoming requests. @@ -82,7 +81,7 @@ void GrpcServer::RegisterService(GrpcService &service) { services_.emplace_back(service.GetGrpcService()); for (int i = 0; i < num_threads_; i++) { - service.InitServerCallFactories(cqs_[i], &server_call_factories_and_concurrencies_); + service.InitServerCallFactories(cqs_[i], &server_call_factories_); } } diff --git a/src/ray/rpc/grpc_server.h b/src/ray/rpc/grpc_server.h index 6e8c7b77d..68537cae0 100644 --- a/src/ray/rpc/grpc_server.h +++ b/src/ray/rpc/grpc_server.h @@ -27,14 +27,13 @@ namespace ray { namespace rpc { -#define RPC_SERVICE_HANDLER(SERVICE, HANDLER, CONCURRENCY) \ +#define RPC_SERVICE_HANDLER(SERVICE, HANDLER) \ std::unique_ptr HANDLER##_call_factory( \ new ServerCallFactoryImpl( \ service_, &SERVICE::AsyncService::Request##HANDLER, service_handler_, \ &SERVICE##Handler::Handle##HANDLER, cq, main_service_)); \ - server_call_factories_and_concurrencies->emplace_back( \ - std::move(HANDLER##_call_factory), CONCURRENCY); + server_call_factories->emplace_back(std::move(HANDLER##_call_factory)); // Define a void RPC client method. #define DECLARE_VOID_RPC_SERVICE_HANDLER_METHOD(METHOD) \ @@ -109,10 +108,8 @@ class GrpcServer { bool is_closed_; /// The `grpc::Service` objects which should be registered to `ServerBuilder`. std::vector> services_; - /// The `ServerCallFactory` objects, and the maximum number of concurrent requests that - /// this gRPC server can handle. - std::vector, int>> - server_call_factories_and_concurrencies_; + /// The `ServerCallFactory` objects. + std::vector> server_call_factories_; /// The number of completion queues the server is polling from. int num_threads_; /// The `ServerCompletionQueue` object used for polling events. @@ -149,12 +146,11 @@ class GrpcService { /// server can handle. /// /// \param[in] cq The grpc completion queue. - /// \param[out] server_call_factories_and_concurrencies The `ServerCallFactory` objects, + /// \param[out] server_call_factories The `ServerCallFactory` objects, /// and the maximum number of concurrent requests that this gRPC server can handle. virtual void InitServerCallFactories( const std::unique_ptr &cq, - std::vector, int>> - *server_call_factories_and_concurrencies) = 0; + std::vector> *server_call_factories) = 0; /// The main event loop, to which the service handler functions will be posted. boost::asio::io_service &main_service_; diff --git a/src/ray/rpc/node_manager/node_manager_server.h b/src/ray/rpc/node_manager/node_manager_server.h index 504ee762d..068b9f6a8 100644 --- a/src/ray/rpc/node_manager/node_manager_server.h +++ b/src/ray/rpc/node_manager/node_manager_server.h @@ -24,13 +24,13 @@ namespace ray { namespace rpc { /// NOTE: See src/ray/core_worker/core_worker.h on how to add a new grpc handler. -#define RAY_NODE_MANAGER_RPC_HANDLERS \ - RPC_SERVICE_HANDLER(NodeManagerService, RequestWorkerLease, 100) \ - RPC_SERVICE_HANDLER(NodeManagerService, ReturnWorker, 100) \ - RPC_SERVICE_HANDLER(NodeManagerService, ForwardTask, 100) \ - RPC_SERVICE_HANDLER(NodeManagerService, PinObjectIDs, 100) \ - RPC_SERVICE_HANDLER(NodeManagerService, GetNodeStats, 1) \ - RPC_SERVICE_HANDLER(NodeManagerService, GlobalGC, 1) +#define RAY_NODE_MANAGER_RPC_HANDLERS \ + RPC_SERVICE_HANDLER(NodeManagerService, RequestWorkerLease) \ + RPC_SERVICE_HANDLER(NodeManagerService, ReturnWorker) \ + RPC_SERVICE_HANDLER(NodeManagerService, ForwardTask) \ + RPC_SERVICE_HANDLER(NodeManagerService, PinObjectIDs) \ + RPC_SERVICE_HANDLER(NodeManagerService, GetNodeStats) \ + RPC_SERVICE_HANDLER(NodeManagerService, GlobalGC) /// Interface of the `NodeManagerService`, see `src/ray/protobuf/node_manager.proto`. class NodeManagerServiceHandler { @@ -86,8 +86,7 @@ class NodeManagerGrpcService : public GrpcService { void InitServerCallFactories( const std::unique_ptr &cq, - std::vector, int>> - *server_call_factories_and_concurrencies) override { + std::vector> *server_call_factories) override { RAY_NODE_MANAGER_RPC_HANDLERS } diff --git a/src/ray/rpc/object_manager/object_manager_server.h b/src/ray/rpc/object_manager/object_manager_server.h index 6164f23f4..f5ede7c64 100644 --- a/src/ray/rpc/object_manager/object_manager_server.h +++ b/src/ray/rpc/object_manager/object_manager_server.h @@ -24,10 +24,10 @@ namespace ray { namespace rpc { -#define RAY_OBJECT_MANAGER_RPC_HANDLERS \ - RPC_SERVICE_HANDLER(ObjectManagerService, Push, 5) \ - RPC_SERVICE_HANDLER(ObjectManagerService, Pull, 5) \ - RPC_SERVICE_HANDLER(ObjectManagerService, FreeObjects, 2) +#define RAY_OBJECT_MANAGER_RPC_HANDLERS \ + RPC_SERVICE_HANDLER(ObjectManagerService, Push) \ + RPC_SERVICE_HANDLER(ObjectManagerService, Pull) \ + RPC_SERVICE_HANDLER(ObjectManagerService, FreeObjects) /// Implementations of the `ObjectManagerGrpcService`, check interface in /// `src/ray/protobuf/object_manager.proto`. @@ -67,8 +67,7 @@ class ObjectManagerGrpcService : public GrpcService { void InitServerCallFactories( const std::unique_ptr &cq, - std::vector, int>> - *server_call_factories_and_concurrencies) override { + std::vector> *server_call_factories) override { RAY_OBJECT_MANAGER_RPC_HANDLERS } diff --git a/src/ray/rpc/server_call.h b/src/ray/rpc/server_call.h index 63aa522b3..36f3354cf 100644 --- a/src/ray/rpc/server_call.h +++ b/src/ray/rpc/server_call.h @@ -92,8 +92,6 @@ class ServerCallFactory { public: /// Create a new `ServerCall` and request gRPC runtime to start accepting the /// corresponding type of requests. - /// - /// \return Pointer to the `ServerCall` object. virtual void CreateCall() const = 0; virtual ~ServerCallFactory() = default; @@ -155,6 +153,10 @@ class ServerCallImpl : public ServerCall { // NOTE(hchen): This `factory` local variable is needed. Because `SendReply` runs in // a different thread, and will cause `this` to be deleted. const auto &factory = factory_; + // Create a new `ServerCall` to accept the next incoming request. + // We create this before handling the request so that the it can be populated by + // the completion queue in the background if a new request comes in. + factory.CreateCall(); (service_handler_.*handle_request_function_)( request_, &reply_, [this](Status status, std::function success, @@ -169,9 +171,6 @@ class ServerCallImpl : public ServerCall { // this server call might be deleted SendReply(status); }); - // We've finished handling this request, - // create a new `ServerCall` to accept the next incoming request. - factory.CreateCall(); } void OnReplySent() override { diff --git a/src/ray/rpc/worker/core_worker_server.h b/src/ray/rpc/worker/core_worker_server.h index 9202957ae..7ea42f8d0 100644 --- a/src/ray/rpc/worker/core_worker_server.h +++ b/src/ray/rpc/worker/core_worker_server.h @@ -28,17 +28,17 @@ class CoreWorker; namespace rpc { /// NOTE: See src/ray/core_worker/core_worker.h on how to add a new grpc handler. -#define RAY_CORE_WORKER_RPC_HANDLERS \ - RPC_SERVICE_HANDLER(CoreWorkerService, AssignTask, 5) \ - RPC_SERVICE_HANDLER(CoreWorkerService, PushTask, 9999) \ - RPC_SERVICE_HANDLER(CoreWorkerService, DirectActorCallArgWaitComplete, 100) \ - RPC_SERVICE_HANDLER(CoreWorkerService, GetObjectStatus, 9999) \ - RPC_SERVICE_HANDLER(CoreWorkerService, WaitForObjectEviction, 9999) \ - RPC_SERVICE_HANDLER(CoreWorkerService, WaitForRefRemoved, 9999) \ - RPC_SERVICE_HANDLER(CoreWorkerService, KillActor, 9999) \ - RPC_SERVICE_HANDLER(CoreWorkerService, GetCoreWorkerStats, 100) \ - RPC_SERVICE_HANDLER(CoreWorkerService, LocalGC, 100) \ - RPC_SERVICE_HANDLER(CoreWorkerService, PlasmaObjectReady, 9999) +#define RAY_CORE_WORKER_RPC_HANDLERS \ + RPC_SERVICE_HANDLER(CoreWorkerService, AssignTask) \ + RPC_SERVICE_HANDLER(CoreWorkerService, PushTask) \ + RPC_SERVICE_HANDLER(CoreWorkerService, DirectActorCallArgWaitComplete) \ + RPC_SERVICE_HANDLER(CoreWorkerService, GetObjectStatus) \ + RPC_SERVICE_HANDLER(CoreWorkerService, WaitForObjectEviction) \ + RPC_SERVICE_HANDLER(CoreWorkerService, WaitForRefRemoved) \ + RPC_SERVICE_HANDLER(CoreWorkerService, KillActor) \ + RPC_SERVICE_HANDLER(CoreWorkerService, GetCoreWorkerStats) \ + RPC_SERVICE_HANDLER(CoreWorkerService, LocalGC) \ + RPC_SERVICE_HANDLER(CoreWorkerService, PlasmaObjectReady) #define RAY_CORE_WORKER_DECLARE_RPC_HANDLERS \ DECLARE_VOID_RPC_SERVICE_HANDLER_METHOD(AssignTask) \ @@ -84,8 +84,7 @@ class CoreWorkerGrpcService : public GrpcService { void InitServerCallFactories( const std::unique_ptr &cq, - std::vector, int>> - *server_call_factories_and_concurrencies) override { + std::vector> *server_call_factories) override { RAY_CORE_WORKER_RPC_HANDLERS }