[core] Check that port is unused before assigning to worker (#8773)

This commit is contained in:
Ian Rodney 2020-06-10 16:35:38 -07:00 committed by SangBin Cho
parent c3a3b00a57
commit f6034fd12e
6 changed files with 74 additions and 33 deletions

View file

@ -13,6 +13,7 @@
// limitations under the License.
#include "network_util.h"
#include "ray/util/logging.h"
std::string GetValidLocalIp(int port, int64_t timeout_ms) {
@ -71,3 +72,13 @@ bool Ping(const std::string &ip, int port, int64_t timeout_ms) {
bool is_timeout;
return client.Connect(ip, port, timeout_ms, &is_timeout);
}
bool CheckFree(int port) {
boost::asio::io_service io_service;
tcp::socket socket(io_service);
socket.open(boost::asio::ip::tcp::v4());
boost::system::error_code ec;
socket.bind(boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), port), ec);
socket.close();
return !ec.failed();
}

View file

@ -20,6 +20,7 @@
#include <boost/bind.hpp>
#include <boost/date_time/posix_time/posix_time_types.hpp>
#include <boost/system/error_code.hpp>
#include "constants.h"
using boost::asio::deadline_timer;
@ -123,4 +124,5 @@ std::string GetValidLocalIp(int port, int64_t timeout_ms);
/// \return Whether target rpc server is valid.
bool Ping(const std::string &ip, int port, int64_t timeout_ms);
bool CheckFree(int port);
#endif // RAY_COMMON_NETWORK_UTIL_H

View file

@ -116,6 +116,13 @@ RAY_CONFIG(int64_t, max_direct_call_object_size, 100 * 1024)
// limit in Ray to avoid crashing with many small inlined task arguments.
RAY_CONFIG(int64_t, max_grpc_message_size, 100 * 1024 * 1024)
// Number of times to retry creating a gRPC server.
RAY_CONFIG(int64_t, grpc_server_num_retries, 1)
// Retry timeout for trying to create a gRPC server. Only applies if the number
// of retries is non zero.
RAY_CONFIG(int64_t, grpc_server_retry_timeout_milliseconds, 1000)
// The min number of retries for direct actor creation tasks. The actual number
// of creation retries will be MAX(actor_creation_min_retries, max_restarts).
RAY_CONFIG(uint64_t, actor_creation_min_retries, 3)

View file

@ -18,6 +18,7 @@
#include <boost/date_time/posix_time/posix_time.hpp>
#include "ray/common/constants.h"
#include "ray/common/network_util.h"
#include "ray/common/ray_config.h"
#include "ray/common/status.h"
#include "ray/gcs/pb_util.h"
@ -296,17 +297,26 @@ Process WorkerPool::StartProcess(const std::vector<std::string> &worker_command_
}
Status WorkerPool::GetNextFreePort(int *port) {
if (free_ports_) {
if (free_ports_->empty()) {
return Status::Invalid(
"Ran out of ports to allocate to workers. Please specify a wider port range.");
}
if (!free_ports_) {
*port = 0;
return Status::OK();
}
// Try up to the current number of ports.
int current_size = free_ports_->size();
for (int i = 0; i < current_size; i++) {
*port = free_ports_->front();
free_ports_->pop();
} else {
*port = 0;
if (CheckFree(*port)) {
return Status::OK();
}
// Return to pool to check later.
free_ports_->push(*port);
}
return Status::OK();
*port = -1;
return Status::Invalid(
"No available ports. Please specify a wider port range using --min-worker-port and "
"--max-worker-port.");
}
void WorkerPool::MarkPortAsFree(int port) {

View file

@ -261,6 +261,9 @@ class WorkerPool {
/// Get the next unallocated port in the free ports list. If a port range isn't
/// configured, returns 0.
/// NOTE: Ray does not 'reserve' these ports from being used by other services.
/// There is a race condition where another service binds to the port sometime
/// after this function returns and before the Worker/Driver uses the port.
/// \param[out] port The next available port.
Status GetNextFreePort(int *port);

View file

@ -31,32 +31,40 @@ GrpcServer::GrpcServer(std::string name, const uint32_t port, int num_threads)
void GrpcServer::Run() {
uint32_t specified_port = port_;
std::string server_address("0.0.0.0:" + std::to_string(port_));
int num_retries = RayConfig::instance().grpc_server_num_retries();
while (num_retries >= 0) {
grpc::ServerBuilder builder;
// Disable the SO_REUSEPORT option. We don't need it in ray. If the option is enabled
// (default behavior in grpc), we may see multiple workers listen on the same port and
// the requests sent to this port may be handled by any of the workers.
builder.AddChannelArgument(GRPC_ARG_ALLOW_REUSEPORT, 0);
builder.AddChannelArgument(GRPC_ARG_MAX_SEND_MESSAGE_LENGTH,
RayConfig::instance().max_grpc_message_size());
builder.AddChannelArgument(GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH,
RayConfig::instance().max_grpc_message_size());
// TODO(hchen): Add options for authentication.
builder.AddListeningPort(server_address, grpc::InsecureServerCredentials(), &port_);
// Register all the services to this server.
if (services_.empty()) {
RAY_LOG(WARNING) << "No service is found when start grpc server " << name_;
}
for (auto &entry : services_) {
builder.RegisterService(&entry.get());
}
// Get hold of the completion queue used for the asynchronous communication
// with the gRPC runtime.
for (int i = 0; i < num_threads_; i++) {
cqs_[i] = builder.AddCompletionQueue();
}
// Build and start server.
server_ = builder.BuildAndStart();
if (port_ > 0) {
break;
}
usleep(RayConfig::instance().grpc_server_retry_timeout_milliseconds() * 1000);
num_retries--;
}
grpc::ServerBuilder builder;
// Disable the SO_REUSEPORT option. We don't need it in ray. If the option is enabled
// (default behavior in grpc), we may see multiple workers listen on the same port and
// the requests sent to this port may be handled by any of the workers.
builder.AddChannelArgument(GRPC_ARG_ALLOW_REUSEPORT, 0);
builder.AddChannelArgument(GRPC_ARG_MAX_SEND_MESSAGE_LENGTH,
RayConfig::instance().max_grpc_message_size());
builder.AddChannelArgument(GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH,
RayConfig::instance().max_grpc_message_size());
// TODO(hchen): Add options for authentication.
builder.AddListeningPort(server_address, grpc::InsecureServerCredentials(), &port_);
// Register all the services to this server.
if (services_.empty()) {
RAY_LOG(WARNING) << "No service is found when start grpc server " << name_;
}
for (auto &entry : services_) {
builder.RegisterService(&entry.get());
}
// Get hold of the completion queue used for the asynchronous communication
// with the gRPC runtime.
for (int i = 0; i < num_threads_; i++) {
cqs_[i] = builder.AddCompletionQueue();
}
// Build and start server.
server_ = builder.BuildAndStart();
// If the grpc server failed to bind the port, the `port_` will be set to 0.
RAY_CHECK(port_ > 0)
<< "Port " << specified_port