mirror of
https://github.com/vale981/ray
synced 2025-03-06 10:31:39 -05:00
[Core] Set keepalive only at gcs (#18086)
This commit is contained in:
parent
56089ae926
commit
a25cc47399
4 changed files with 21 additions and 6 deletions
|
@ -449,6 +449,11 @@ RAY_CONFIG(bool, worker_resource_limits_enabled, false)
|
||||||
RAY_CONFIG(int64_t, gcs_max_active_rpcs_per_handler, 100)
|
RAY_CONFIG(int64_t, gcs_max_active_rpcs_per_handler, 100)
|
||||||
|
|
||||||
/// grpc keepalive sent interval
|
/// grpc keepalive sent interval
|
||||||
|
/// This is only configured in GCS server now.
|
||||||
|
/// NOTE: It is not ideal for other components because
|
||||||
|
/// they have a failure model that considers network failures as component failures
|
||||||
|
/// and this configuration breaks that assumption. We should apply it to every other component
|
||||||
|
/// after we change this failure assumption from code.
|
||||||
RAY_CONFIG(int64_t, grpc_keepalive_time_ms, 10000);
|
RAY_CONFIG(int64_t, grpc_keepalive_time_ms, 10000);
|
||||||
|
|
||||||
/// grpc keepalive timeout
|
/// grpc keepalive timeout
|
||||||
|
|
|
@ -35,7 +35,8 @@ GcsServer::GcsServer(const ray::gcs::GcsServerConfig &config,
|
||||||
: config_(config),
|
: config_(config),
|
||||||
main_service_(main_service),
|
main_service_(main_service),
|
||||||
rpc_server_(config.grpc_server_name, config.grpc_server_port,
|
rpc_server_(config.grpc_server_name, config.grpc_server_port,
|
||||||
config.grpc_server_thread_num),
|
config.grpc_server_thread_num,
|
||||||
|
/*keepalive_time_ms=*/RayConfig::instance().grpc_keepalive_time_ms()),
|
||||||
client_call_manager_(main_service),
|
client_call_manager_(main_service),
|
||||||
raylet_client_pool_(
|
raylet_client_pool_(
|
||||||
std::make_shared<rpc::NodeManagerClientPool>(client_call_manager_)),
|
std::make_shared<rpc::NodeManagerClientPool>(client_call_manager_)),
|
||||||
|
|
|
@ -35,8 +35,13 @@ DEFINE_stats(grpc_server_req_finished, "Finished request number in grpc server",
|
||||||
namespace ray {
|
namespace ray {
|
||||||
namespace rpc {
|
namespace rpc {
|
||||||
|
|
||||||
GrpcServer::GrpcServer(std::string name, const uint32_t port, int num_threads)
|
GrpcServer::GrpcServer(std::string name, const uint32_t port, int num_threads,
|
||||||
: name_(std::move(name)), port_(port), is_closed_(true), num_threads_(num_threads) {
|
int64_t keepalive_time_ms)
|
||||||
|
: name_(std::move(name)),
|
||||||
|
port_(port),
|
||||||
|
is_closed_(true),
|
||||||
|
num_threads_(num_threads),
|
||||||
|
keepalive_time_ms_(keepalive_time_ms) {
|
||||||
cqs_.resize(num_threads_);
|
cqs_.resize(num_threads_);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -52,8 +57,7 @@ void GrpcServer::Run() {
|
||||||
RayConfig::instance().max_grpc_message_size());
|
RayConfig::instance().max_grpc_message_size());
|
||||||
builder.AddChannelArgument(GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH,
|
builder.AddChannelArgument(GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH,
|
||||||
RayConfig::instance().max_grpc_message_size());
|
RayConfig::instance().max_grpc_message_size());
|
||||||
builder.AddChannelArgument(GRPC_ARG_KEEPALIVE_TIME_MS,
|
builder.AddChannelArgument(GRPC_ARG_KEEPALIVE_TIME_MS, keepalive_time_ms_);
|
||||||
RayConfig::instance().grpc_keepalive_time_ms());
|
|
||||||
builder.AddChannelArgument(GRPC_ARG_KEEPALIVE_TIMEOUT_MS,
|
builder.AddChannelArgument(GRPC_ARG_KEEPALIVE_TIMEOUT_MS,
|
||||||
RayConfig::instance().grpc_keepalive_timeout_ms());
|
RayConfig::instance().grpc_keepalive_timeout_ms());
|
||||||
builder.AddChannelArgument(GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS, 0);
|
builder.AddChannelArgument(GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS, 0);
|
||||||
|
|
|
@ -61,7 +61,8 @@ class GrpcServer {
|
||||||
/// \param[in] name Name of this server, used for logging and debugging purpose.
|
/// \param[in] name Name of this server, used for logging and debugging purpose.
|
||||||
/// \param[in] port The port to bind this server to. If it's 0, a random available port
|
/// \param[in] port The port to bind this server to. If it's 0, a random available port
|
||||||
/// will be chosen.
|
/// will be chosen.
|
||||||
GrpcServer(std::string name, const uint32_t port, int num_threads = 1);
|
GrpcServer(std::string name, const uint32_t port, int num_threads = 1,
|
||||||
|
int64_t keepalive_time_ms = 7200000 /*2 hours, grpc default*/);
|
||||||
|
|
||||||
/// Destruct this gRPC server.
|
/// Destruct this gRPC server.
|
||||||
~GrpcServer() { Shutdown(); }
|
~GrpcServer() { Shutdown(); }
|
||||||
|
@ -120,6 +121,10 @@ class GrpcServer {
|
||||||
std::unique_ptr<grpc::Server> server_;
|
std::unique_ptr<grpc::Server> server_;
|
||||||
/// The polling threads used to check the completion queues.
|
/// The polling threads used to check the completion queues.
|
||||||
std::vector<std::thread> polling_threads_;
|
std::vector<std::thread> polling_threads_;
|
||||||
|
/// The interval at which gRPC keepalive pings are sent from server -> client.
|
||||||
|
/// If the gRPC server cannot get the ping response within the keepalive timeout, it triggers
|
||||||
|
/// the watchdog timer fired error, which will close the connection.
|
||||||
|
const int64_t keepalive_time_ms_;
|
||||||
};
|
};
|
||||||
|
|
||||||
/// Base class that represents an abstract gRPC service.
|
/// Base class that represents an abstract gRPC service.
|
||||||
|
|
Loading…
Add table
Reference in a new issue