Listen to 127.0.0.1 by default on mac osx (#18904)

This commit is contained in:
Jiajun Yao 2021-09-29 11:40:19 -07:00 committed by GitHub
parent 3665c99896
commit ed9118393c
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
18 changed files with 74 additions and 40 deletions

View file

@ -12,9 +12,10 @@
// See the License for the specific language governing permissions and
// limitations under the License.
#include "process_helper.h"
#include <boost/algorithm/string.hpp>
#include "process_helper.h"
#include "ray/util/process.h"
#include "ray/util/util.h"
#include "src/ray/protobuf/gcs.pb.h"
@ -27,9 +28,9 @@ using ray::core::WorkerType;
void ProcessHelper::StartRayNode(const int redis_port, const std::string redis_password,
const std::vector<std::string> &head_args) {
std::vector<std::string> cmdargs({"ray", "start", "--head", "--port",
std::to_string(redis_port), "--redis-password",
redis_password});
std::vector<std::string> cmdargs(
{"ray", "start", "--head", "--port", std::to_string(redis_port), "--redis-password",
redis_password, "--node-ip-address", GetNodeIpAddress()});
if (!head_args.empty()) {
cmdargs.insert(cmdargs.end(), head_args.begin(), head_args.end());
}

View file

@ -398,6 +398,11 @@ def node_ip_address_from_perspective(address):
def get_node_ip_address(address="8.8.8.8:53"):
if ray.worker._global_node is not None:
return ray.worker._global_node.node_ip_address
if sys.platform == "darwin":
# Due to the mac osx firewall,
# we use loopback ip as the ip address
# to prevent security popups.
return "127.0.0.1"
return node_ip_address_from_perspective(address)
@ -866,7 +871,8 @@ def start_redis(node_ip_address,
stdout_file=redis_stdout_file,
stderr_file=redis_stderr_file,
fate_share=fate_share,
port_denylist=port_denylist)
port_denylist=port_denylist,
listen_to_localhost_only=(node_ip_address == "127.0.0.1"))
processes.append(p)
redis_address = address(node_ip_address, port)
primary_redis_client = redis.StrictRedis(
@ -922,7 +928,8 @@ def start_redis(node_ip_address,
stdout_file=redis_stdout_file,
stderr_file=redis_stderr_file,
fate_share=fate_share,
port_denylist=port_denylist)
port_denylist=port_denylist,
listen_to_localhost_only=(node_ip_address == "127.0.0.1"))
processes.append(p)
shard_address = address(node_ip_address, redis_shard_port)
@ -944,7 +951,8 @@ def _start_redis_instance(executable,
password=None,
redis_max_memory=None,
fate_share=None,
port_denylist=None):
port_denylist=None,
listen_to_localhost_only=False):
"""Start a single Redis server.
Notes:
@ -970,6 +978,9 @@ def _start_redis_instance(executable,
will start LRU eviction of entries.
port_denylist (set): A set of denylist ports that shouldn't
be used when allocating a new port.
listen_to_localhost_only (bool): Redis server only listens to
localhost (127.0.0.1) if it's true,
otherwise it listens to all network interfaces.
Returns:
A tuple of the port used by Redis and ProcessInfo for the process that
@ -990,6 +1001,8 @@ def _start_redis_instance(executable,
raise ValueError("Spaces not permitted in redis password.")
command += ["--requirepass", password]
command += (["--port", str(port), "--loglevel", "warning"])
if listen_to_localhost_only:
command += ["--bind", "127.0.0.1"]
process_info = start_ray_process(
command,
ray_constants.PROCESS_TYPE_REDIS_SERVER,

View file

@ -599,10 +599,9 @@ def start(node_ip_address, address, port, redis_password, redis_shard_ports,
"password.", cf.bold("--redis-password"),
cf.bold("--address"))
node_ip_address = services.get_node_ip_address()
# Get the node IP address if one is not provided.
ray_params.update_if_absent(node_ip_address=node_ip_address)
ray_params.update_if_absent(
node_ip_address=services.get_node_ip_address())
cli_logger.labeled_value("Local node IP", ray_params.node_ip_address)
ray_params.update_if_absent(
redis_port=port,
@ -615,7 +614,7 @@ def start(node_ip_address, address, port, redis_password, redis_shard_ports,
# Fail early when starting a new cluster when one is already running
if address is None:
default_address = f"{node_ip_address}:{port}"
default_address = f"{ray_params.node_ip_address}:{port}"
redis_addresses = services.find_redis_address(default_address)
if len(redis_addresses) > 0:
raise ConnectionError(

View file

@ -102,7 +102,7 @@ def test_detached_deployment(ray_cluster):
# https://github.com/ray-project/ray/issues/11437
cluster = ray_cluster
head_node = cluster.add_node(node_ip_address="127.0.0.1", num_cpus=6)
head_node = cluster.add_node(num_cpus=6)
# Create first job, check we can run a simple serve endpoint
ray.init(head_node.address, namespace="serve")

View file

@ -11,6 +11,7 @@ import pytest
import ray
from ray.cluster_utils import Cluster
from ray._private.test_utils import run_string_as_driver, wait_for_condition
from ray._private import services
def test_ray_debugger_breakpoint(shutdown_only):
@ -217,7 +218,7 @@ def test_ray_debugger_public(shutdown_only, call_ray_stop_only,
host, port = session["pdb_address"].split(":")
if ray_debugger_external:
assert host not in ["localhost", "127.0.0.1"], host
assert host == services.get_node_ip_address(), host
else:
assert host == "localhost", host
@ -267,13 +268,13 @@ def test_ray_debugger_public_multi_node(shutdown_only, ray_debugger_external):
host1, port1 = session1["pdb_address"].split(":")
if ray_debugger_external:
assert host1 not in ["localhost", "127.0.0.1"], host1
assert host1 == services.get_node_ip_address(), host1
else:
assert host1 == "localhost", host1
host2, port2 = session2["pdb_address"].split(":")
if ray_debugger_external:
assert host2 not in ["localhost", "127.0.0.1"], host2
assert host2 == services.get_node_ip_address(), host2
else:
assert host2 == "localhost", host2

View file

@ -217,6 +217,7 @@ def test_ray_address(input, call_ray_start):
res = ray.init(input)
# Ensure this is not a client.connect()
assert not isinstance(res, ClientContext)
ray.shutdown()
class Credentials(grpc.ChannelCredentials):

View file

@ -428,7 +428,8 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_
// Start RPC server after all the task receivers are properly initialized and we have
// our assigned port from the raylet.
core_worker_server_ = std::make_unique<rpc::GrpcServer>(
WorkerTypeString(options_.worker_type), assigned_port);
WorkerTypeString(options_.worker_type), assigned_port,
options_.node_ip_address == "127.0.0.1");
core_worker_server_->RegisterService(grpc_service_);
core_worker_server_->Run();

View file

@ -36,6 +36,7 @@ class GlobalStateAccessorTest : public ::testing::Test {
config.grpc_server_name = "MockedGcsServer";
config.grpc_server_thread_num = 1;
config.redis_address = "127.0.0.1";
config.node_ip_address = "127.0.0.1";
config.enable_sharding_conn = false;
config.redis_port = TEST_REDIS_SERVER_PORTS.front();

View file

@ -47,6 +47,7 @@ class ServiceBasedGcsClientTest : public ::testing::Test {
config_.grpc_server_name = "MockedGcsServer";
config_.grpc_server_thread_num = 1;
config_.redis_address = "127.0.0.1";
config_.node_ip_address = "127.0.0.1";
config_.enable_sharding_conn = false;
config_.redis_port = TEST_REDIS_SERVER_PORTS.front();
// Tests legacy code paths. The poller and broadcaster have their own dedicated unit

View file

@ -35,7 +35,7 @@ GcsServer::GcsServer(const ray::gcs::GcsServerConfig &config,
: config_(config),
main_service_(main_service),
rpc_server_(config.grpc_server_name, config.grpc_server_port,
config.grpc_server_thread_num,
config.node_ip_address == "127.0.0.1", config.grpc_server_thread_num,
/*keepalive_time_ms=*/RayConfig::instance().grpc_keepalive_time_ms()),
client_call_manager_(main_service),
raylet_client_pool_(

View file

@ -33,6 +33,7 @@ class GcsServerTest : public ::testing::Test {
config.grpc_server_name = "MockedGcsServer";
config.grpc_server_thread_num = 1;
config.redis_address = "127.0.0.1";
config.node_ip_address = "127.0.0.1";
config.enable_sharding_conn = false;
config.redis_port = TEST_REDIS_SERVER_PORTS.front();
gcs_server_.reset(new gcs::GcsServer(config, io_service_));

View file

@ -88,6 +88,7 @@ ObjectManager::ObjectManager(
buffer_pool_(config_.store_socket_name, config_.object_chunk_size),
rpc_work_(rpc_service_),
object_manager_server_("ObjectManager", config_.object_manager_port,
config_.object_manager_address == "127.0.0.1",
config_.rpc_service_threads_number),
object_manager_service_(rpc_service_, *this),
client_call_manager_(main_service, config_.rpc_service_threads_number),
@ -441,17 +442,18 @@ void ObjectManager::PushObjectInternal(const ObjectID &object_id, const NodeID &
[=]() {
// Post to the multithreaded RPC event loop so that data is copied
// off of the main thread.
SendObjectChunk(push_id, object_id, node_id, chunk_id, rpc_client,
[=](const Status &status) {
// Post back to the main event loop because the
// PushManager is thread-safe.
main_service_->post(
[this, node_id, object_id]() {
push_manager_->OnChunkComplete(node_id, object_id);
},
"ObjectManager.Push");
},
std::move(chunk_reader));
SendObjectChunk(
push_id, object_id, node_id, chunk_id, rpc_client,
[=](const Status &status) {
// Post back to the main event loop because the
// PushManager is thread-safe.
main_service_->post(
[this, node_id, object_id]() {
push_manager_->OnChunkComplete(node_id, object_id);
},
"ObjectManager.Push");
},
chunk_reader);
},
"ObjectManager.Push");
});

View file

@ -49,6 +49,8 @@
namespace ray {
struct ObjectManagerConfig {
/// The IP address this object manager is running on.
std::string object_manager_address;
/// The port that the object manager should use to listen for connections
/// from other object managers. If this is 0, the object manager will choose
/// its own port.

View file

@ -212,6 +212,7 @@ int main(int argc, char *argv[]) {
// Configuration for the object manager.
ray::ObjectManagerConfig object_manager_config;
object_manager_config.object_manager_address = node_ip_address;
object_manager_config.object_manager_port = object_manager_port;
object_manager_config.store_socket_name = store_socket_name;

View file

@ -252,7 +252,8 @@ NodeManager::NodeManager(instrumented_io_context &io_service, const NodeID &self
temp_dir_(config.temp_dir),
initial_config_(config),
dependency_manager_(object_manager_),
node_manager_server_("NodeManager", config.node_manager_port),
node_manager_server_("NodeManager", config.node_manager_port,
config.node_manager_address == "127.0.0.1"),
node_manager_service_(io_service, *this),
agent_manager_service_handler_(
new DefaultAgentManagerServiceHandler(agent_manager_)),

View file

@ -35,10 +35,12 @@ DEFINE_stats(grpc_server_req_finished, "Finished request number in grpc server",
namespace ray {
namespace rpc {
GrpcServer::GrpcServer(std::string name, const uint32_t port, int num_threads,
GrpcServer::GrpcServer(std::string name, const uint32_t port,
bool listen_to_localhost_only, int num_threads,
int64_t keepalive_time_ms)
: name_(std::move(name)),
port_(port),
listen_to_localhost_only_(listen_to_localhost_only),
is_closed_(true),
num_threads_(num_threads),
keepalive_time_ms_(keepalive_time_ms) {
@ -47,7 +49,8 @@ GrpcServer::GrpcServer(std::string name, const uint32_t port, int num_threads,
void GrpcServer::Run() {
uint32_t specified_port = port_;
std::string server_address("0.0.0.0:" + std::to_string(port_));
std::string server_address((listen_to_localhost_only_ ? "127.0.0.1:" : "0.0.0.0:") +
std::to_string(port_));
grpc::ServerBuilder builder;
// Disable the SO_REUSEPORT option. We don't need it in ray. If the option is enabled
// (default behavior in grpc), we may see multiple workers listen on the same port and

View file

@ -61,7 +61,8 @@ class GrpcServer {
/// \param[in] name Name of this server, used for logging and debugging purpose.
/// \param[in] port The port to bind this server to. If it's 0, a random available port
/// will be chosen.
GrpcServer(std::string name, const uint32_t port, int num_threads = 1,
GrpcServer(std::string name, const uint32_t port, bool listen_to_localhost_only,
int num_threads = 1,
int64_t keepalive_time_ms = 7200000 /*2 hours, grpc default*/);
/// Destruct this gRPC server.
@ -108,6 +109,9 @@ class GrpcServer {
const std::string name_;
/// Port of this server.
int port_;
/// Listen to localhost (127.0.0.1) only if it's true, otherwise listen to all network
/// interfaces (0.0.0.0)
const bool listen_to_localhost_only_;
/// Indicates whether this server has been closed.
bool is_closed_;
/// The `grpc::Service` objects which should be registered to `ServerBuilder`.

View file

@ -13,6 +13,7 @@
// limitations under the License.
#include <chrono>
#include "gtest/gtest.h"
#include "ray/rpc/grpc_client.h"
#include "ray/rpc/grpc_server.h"
@ -35,13 +36,14 @@ class TestServiceHandler {
RAY_LOG(INFO) << "No reply!";
return;
}
send_reply_callback(ray::Status::OK(),
/*reply_success=*/[]() { RAY_LOG(INFO) << "Reply success."; },
/*reply_failure=*/
[this]() {
RAY_LOG(INFO) << "Reply failed.";
reply_failure_count++;
});
send_reply_callback(
ray::Status::OK(),
/*reply_success=*/[]() { RAY_LOG(INFO) << "Reply success."; },
/*reply_failure=*/
[this]() {
RAY_LOG(INFO) << "Reply failed.";
reply_failure_count++;
});
}
std::atomic<int> request_count{0};
std::atomic<int> reply_failure_count{0};
@ -83,7 +85,7 @@ class TestGrpcServerClientFixture : public ::testing::Test {
handler_io_service_.run();
});
test_service_.reset(new TestGrpcService(handler_io_service_, test_service_handler_));
grpc_server_.reset(new GrpcServer("test", 0));
grpc_server_.reset(new GrpcServer("test", 0, true));
grpc_server_->RegisterService(*test_service_);
grpc_server_->Run();