From ed9118393ca142d74c380e5e9fe43fb6ea026815 Mon Sep 17 00:00:00 2001 From: Jiajun Yao Date: Wed, 29 Sep 2021 11:40:19 -0700 Subject: [PATCH] Listen to 127.0.0.1 by default on mac osx (#18904) --- cpp/src/ray/util/process_helper.cc | 9 +++---- python/ray/_private/services.py | 19 ++++++++++++--- python/ray/scripts/scripts.py | 7 +++--- python/ray/serve/tests/test_standalone.py | 2 +- python/ray/tests/test_ray_debugger.py | 7 +++--- python/ray/tests/test_ray_init.py | 1 + src/ray/core_worker/core_worker.cc | 3 ++- .../test/global_state_accessor_test.cc | 1 + .../test/service_based_gcs_client_test.cc | 1 + src/ray/gcs/gcs_server/gcs_server.cc | 2 +- .../gcs_server/test/gcs_server_rpc_test.cc | 1 + src/ray/object_manager/object_manager.cc | 24 ++++++++++--------- src/ray/object_manager/object_manager.h | 2 ++ src/ray/raylet/main.cc | 1 + src/ray/raylet/node_manager.cc | 3 ++- src/ray/rpc/grpc_server.cc | 7 ++++-- src/ray/rpc/grpc_server.h | 6 ++++- src/ray/rpc/test/grpc_server_client_test.cc | 18 +++++++------- 18 files changed, 74 insertions(+), 40 deletions(-) diff --git a/cpp/src/ray/util/process_helper.cc b/cpp/src/ray/util/process_helper.cc index 40f115e64..cd60f3da1 100644 --- a/cpp/src/ray/util/process_helper.cc +++ b/cpp/src/ray/util/process_helper.cc @@ -12,9 +12,10 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include "process_helper.h" + #include -#include "process_helper.h" #include "ray/util/process.h" #include "ray/util/util.h" #include "src/ray/protobuf/gcs.pb.h" @@ -27,9 +28,9 @@ using ray::core::WorkerType; void ProcessHelper::StartRayNode(const int redis_port, const std::string redis_password, const std::vector &head_args) { - std::vector cmdargs({"ray", "start", "--head", "--port", - std::to_string(redis_port), "--redis-password", - redis_password}); + std::vector cmdargs( + {"ray", "start", "--head", "--port", std::to_string(redis_port), "--redis-password", + redis_password, "--node-ip-address", GetNodeIpAddress()}); if (!head_args.empty()) { cmdargs.insert(cmdargs.end(), head_args.begin(), head_args.end()); } diff --git a/python/ray/_private/services.py b/python/ray/_private/services.py index 8d135129b..1bfcc235f 100644 --- a/python/ray/_private/services.py +++ b/python/ray/_private/services.py @@ -398,6 +398,11 @@ def node_ip_address_from_perspective(address): def get_node_ip_address(address="8.8.8.8:53"): if ray.worker._global_node is not None: return ray.worker._global_node.node_ip_address + if sys.platform == "darwin": + # Due to the mac osx firewall, + # we use loopback ip as the ip address + # to prevent security popups. + return "127.0.0.1" return node_ip_address_from_perspective(address) @@ -866,7 +871,8 @@ def start_redis(node_ip_address, stdout_file=redis_stdout_file, stderr_file=redis_stderr_file, fate_share=fate_share, - port_denylist=port_denylist) + port_denylist=port_denylist, + listen_to_localhost_only=(node_ip_address == "127.0.0.1")) processes.append(p) redis_address = address(node_ip_address, port) primary_redis_client = redis.StrictRedis( @@ -922,7 +928,8 @@ def start_redis(node_ip_address, stdout_file=redis_stdout_file, stderr_file=redis_stderr_file, fate_share=fate_share, - port_denylist=port_denylist) + port_denylist=port_denylist, + listen_to_localhost_only=(node_ip_address == "127.0.0.1")) processes.append(p) shard_address = address(node_ip_address, redis_shard_port) @@ -944,7 +951,8 @@ def _start_redis_instance(executable, password=None, redis_max_memory=None, fate_share=None, - port_denylist=None): + port_denylist=None, + listen_to_localhost_only=False): """Start a single Redis server. Notes: @@ -970,6 +978,9 @@ def _start_redis_instance(executable, will start LRU eviction of entries. port_denylist (set): A set of denylist ports that shouldn't be used when allocating a new port. + listen_to_localhost_only (bool): Redis server only listens to + localhost (127.0.0.1) if it's true, + otherwise it listens to all network interfaces. Returns: A tuple of the port used by Redis and ProcessInfo for the process that @@ -990,6 +1001,8 @@ def _start_redis_instance(executable, raise ValueError("Spaces not permitted in redis password.") command += ["--requirepass", password] command += (["--port", str(port), "--loglevel", "warning"]) + if listen_to_localhost_only: + command += ["--bind", "127.0.0.1"] process_info = start_ray_process( command, ray_constants.PROCESS_TYPE_REDIS_SERVER, diff --git a/python/ray/scripts/scripts.py b/python/ray/scripts/scripts.py index 40c44a22f..b6f20092b 100644 --- a/python/ray/scripts/scripts.py +++ b/python/ray/scripts/scripts.py @@ -599,10 +599,9 @@ def start(node_ip_address, address, port, redis_password, redis_shard_ports, "password.", cf.bold("--redis-password"), cf.bold("--address")) - node_ip_address = services.get_node_ip_address() - # Get the node IP address if one is not provided. - ray_params.update_if_absent(node_ip_address=node_ip_address) + ray_params.update_if_absent( + node_ip_address=services.get_node_ip_address()) cli_logger.labeled_value("Local node IP", ray_params.node_ip_address) ray_params.update_if_absent( redis_port=port, @@ -615,7 +614,7 @@ def start(node_ip_address, address, port, redis_password, redis_shard_ports, # Fail early when starting a new cluster when one is already running if address is None: - default_address = f"{node_ip_address}:{port}" + default_address = f"{ray_params.node_ip_address}:{port}" redis_addresses = services.find_redis_address(default_address) if len(redis_addresses) > 0: raise ConnectionError( diff --git a/python/ray/serve/tests/test_standalone.py b/python/ray/serve/tests/test_standalone.py index ce8183b2d..3248e0278 100644 --- a/python/ray/serve/tests/test_standalone.py +++ b/python/ray/serve/tests/test_standalone.py @@ -102,7 +102,7 @@ def test_detached_deployment(ray_cluster): # https://github.com/ray-project/ray/issues/11437 cluster = ray_cluster - head_node = cluster.add_node(node_ip_address="127.0.0.1", num_cpus=6) + head_node = cluster.add_node(num_cpus=6) # Create first job, check we can run a simple serve endpoint ray.init(head_node.address, namespace="serve") diff --git a/python/ray/tests/test_ray_debugger.py b/python/ray/tests/test_ray_debugger.py index fdc6c56da..3cc980bb1 100644 --- a/python/ray/tests/test_ray_debugger.py +++ b/python/ray/tests/test_ray_debugger.py @@ -11,6 +11,7 @@ import pytest import ray from ray.cluster_utils import Cluster from ray._private.test_utils import run_string_as_driver, wait_for_condition +from ray._private import services def test_ray_debugger_breakpoint(shutdown_only): @@ -217,7 +218,7 @@ def test_ray_debugger_public(shutdown_only, call_ray_stop_only, host, port = session["pdb_address"].split(":") if ray_debugger_external: - assert host not in ["localhost", "127.0.0.1"], host + assert host == services.get_node_ip_address(), host else: assert host == "localhost", host @@ -267,13 +268,13 @@ def test_ray_debugger_public_multi_node(shutdown_only, ray_debugger_external): host1, port1 = session1["pdb_address"].split(":") if ray_debugger_external: - assert host1 not in ["localhost", "127.0.0.1"], host1 + assert host1 == services.get_node_ip_address(), host1 else: assert host1 == "localhost", host1 host2, port2 = session2["pdb_address"].split(":") if ray_debugger_external: - assert host2 not in ["localhost", "127.0.0.1"], host2 + assert host2 == services.get_node_ip_address(), host2 else: assert host2 == "localhost", host2 diff --git a/python/ray/tests/test_ray_init.py b/python/ray/tests/test_ray_init.py index ceeab926f..3fdb6a6ea 100644 --- a/python/ray/tests/test_ray_init.py +++ b/python/ray/tests/test_ray_init.py @@ -217,6 +217,7 @@ def test_ray_address(input, call_ray_start): res = ray.init(input) # Ensure this is not a client.connect() assert not isinstance(res, ClientContext) + ray.shutdown() class Credentials(grpc.ChannelCredentials): diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index 9549c13e2..157e3c6eb 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -428,7 +428,8 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_ // Start RPC server after all the task receivers are properly initialized and we have // our assigned port from the raylet. core_worker_server_ = std::make_unique( - WorkerTypeString(options_.worker_type), assigned_port); + WorkerTypeString(options_.worker_type), assigned_port, + options_.node_ip_address == "127.0.0.1"); core_worker_server_->RegisterService(grpc_service_); core_worker_server_->Run(); diff --git a/src/ray/gcs/gcs_client/test/global_state_accessor_test.cc b/src/ray/gcs/gcs_client/test/global_state_accessor_test.cc index 223ee7ca7..a4950fabb 100644 --- a/src/ray/gcs/gcs_client/test/global_state_accessor_test.cc +++ b/src/ray/gcs/gcs_client/test/global_state_accessor_test.cc @@ -36,6 +36,7 @@ class GlobalStateAccessorTest : public ::testing::Test { config.grpc_server_name = "MockedGcsServer"; config.grpc_server_thread_num = 1; config.redis_address = "127.0.0.1"; + config.node_ip_address = "127.0.0.1"; config.enable_sharding_conn = false; config.redis_port = TEST_REDIS_SERVER_PORTS.front(); diff --git a/src/ray/gcs/gcs_client/test/service_based_gcs_client_test.cc b/src/ray/gcs/gcs_client/test/service_based_gcs_client_test.cc index 0e51ca7b8..0adf74b5c 100644 --- a/src/ray/gcs/gcs_client/test/service_based_gcs_client_test.cc +++ b/src/ray/gcs/gcs_client/test/service_based_gcs_client_test.cc @@ -47,6 +47,7 @@ class ServiceBasedGcsClientTest : public ::testing::Test { config_.grpc_server_name = "MockedGcsServer"; config_.grpc_server_thread_num = 1; config_.redis_address = "127.0.0.1"; + config_.node_ip_address = "127.0.0.1"; config_.enable_sharding_conn = false; config_.redis_port = TEST_REDIS_SERVER_PORTS.front(); // Tests legacy code paths. The poller and broadcaster have their own dedicated unit diff --git a/src/ray/gcs/gcs_server/gcs_server.cc b/src/ray/gcs/gcs_server/gcs_server.cc index 1bdb2bb47..84821c6af 100644 --- a/src/ray/gcs/gcs_server/gcs_server.cc +++ b/src/ray/gcs/gcs_server/gcs_server.cc @@ -35,7 +35,7 @@ GcsServer::GcsServer(const ray::gcs::GcsServerConfig &config, : config_(config), main_service_(main_service), rpc_server_(config.grpc_server_name, config.grpc_server_port, - config.grpc_server_thread_num, + config.node_ip_address == "127.0.0.1", config.grpc_server_thread_num, /*keepalive_time_ms=*/RayConfig::instance().grpc_keepalive_time_ms()), client_call_manager_(main_service), raylet_client_pool_( diff --git a/src/ray/gcs/gcs_server/test/gcs_server_rpc_test.cc b/src/ray/gcs/gcs_server/test/gcs_server_rpc_test.cc index cbe1ba784..5d265ac1b 100644 --- a/src/ray/gcs/gcs_server/test/gcs_server_rpc_test.cc +++ b/src/ray/gcs/gcs_server/test/gcs_server_rpc_test.cc @@ -33,6 +33,7 @@ class GcsServerTest : public ::testing::Test { config.grpc_server_name = "MockedGcsServer"; config.grpc_server_thread_num = 1; config.redis_address = "127.0.0.1"; + config.node_ip_address = "127.0.0.1"; config.enable_sharding_conn = false; config.redis_port = TEST_REDIS_SERVER_PORTS.front(); gcs_server_.reset(new gcs::GcsServer(config, io_service_)); diff --git a/src/ray/object_manager/object_manager.cc b/src/ray/object_manager/object_manager.cc index 8e4dd703b..3ee951d75 100644 --- a/src/ray/object_manager/object_manager.cc +++ b/src/ray/object_manager/object_manager.cc @@ -88,6 +88,7 @@ ObjectManager::ObjectManager( buffer_pool_(config_.store_socket_name, config_.object_chunk_size), rpc_work_(rpc_service_), object_manager_server_("ObjectManager", config_.object_manager_port, + config_.object_manager_address == "127.0.0.1", config_.rpc_service_threads_number), object_manager_service_(rpc_service_, *this), client_call_manager_(main_service, config_.rpc_service_threads_number), @@ -441,17 +442,18 @@ void ObjectManager::PushObjectInternal(const ObjectID &object_id, const NodeID & [=]() { // Post to the multithreaded RPC event loop so that data is copied // off of the main thread. - SendObjectChunk(push_id, object_id, node_id, chunk_id, rpc_client, - [=](const Status &status) { - // Post back to the main event loop because the - // PushManager is thread-safe. - main_service_->post( - [this, node_id, object_id]() { - push_manager_->OnChunkComplete(node_id, object_id); - }, - "ObjectManager.Push"); - }, - std::move(chunk_reader)); + SendObjectChunk( + push_id, object_id, node_id, chunk_id, rpc_client, + [=](const Status &status) { + // Post back to the main event loop because the + // PushManager is thread-safe. + main_service_->post( + [this, node_id, object_id]() { + push_manager_->OnChunkComplete(node_id, object_id); + }, + "ObjectManager.Push"); + }, + chunk_reader); }, "ObjectManager.Push"); }); diff --git a/src/ray/object_manager/object_manager.h b/src/ray/object_manager/object_manager.h index 3aaa847f0..688c22ef0 100644 --- a/src/ray/object_manager/object_manager.h +++ b/src/ray/object_manager/object_manager.h @@ -49,6 +49,8 @@ namespace ray { struct ObjectManagerConfig { + /// The IP address this object manager is running on. + std::string object_manager_address; /// The port that the object manager should use to listen for connections /// from other object managers. If this is 0, the object manager will choose /// its own port. diff --git a/src/ray/raylet/main.cc b/src/ray/raylet/main.cc index aa096b3f1..bdceb344c 100644 --- a/src/ray/raylet/main.cc +++ b/src/ray/raylet/main.cc @@ -212,6 +212,7 @@ int main(int argc, char *argv[]) { // Configuration for the object manager. ray::ObjectManagerConfig object_manager_config; + object_manager_config.object_manager_address = node_ip_address; object_manager_config.object_manager_port = object_manager_port; object_manager_config.store_socket_name = store_socket_name; diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index 708ea6665..b988f9e2f 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -252,7 +252,8 @@ NodeManager::NodeManager(instrumented_io_context &io_service, const NodeID &self temp_dir_(config.temp_dir), initial_config_(config), dependency_manager_(object_manager_), - node_manager_server_("NodeManager", config.node_manager_port), + node_manager_server_("NodeManager", config.node_manager_port, + config.node_manager_address == "127.0.0.1"), node_manager_service_(io_service, *this), agent_manager_service_handler_( new DefaultAgentManagerServiceHandler(agent_manager_)), diff --git a/src/ray/rpc/grpc_server.cc b/src/ray/rpc/grpc_server.cc index 3c12f9944..470f1851d 100644 --- a/src/ray/rpc/grpc_server.cc +++ b/src/ray/rpc/grpc_server.cc @@ -35,10 +35,12 @@ DEFINE_stats(grpc_server_req_finished, "Finished request number in grpc server", namespace ray { namespace rpc { -GrpcServer::GrpcServer(std::string name, const uint32_t port, int num_threads, +GrpcServer::GrpcServer(std::string name, const uint32_t port, + bool listen_to_localhost_only, int num_threads, int64_t keepalive_time_ms) : name_(std::move(name)), port_(port), + listen_to_localhost_only_(listen_to_localhost_only), is_closed_(true), num_threads_(num_threads), keepalive_time_ms_(keepalive_time_ms) { @@ -47,7 +49,8 @@ GrpcServer::GrpcServer(std::string name, const uint32_t port, int num_threads, void GrpcServer::Run() { uint32_t specified_port = port_; - std::string server_address("0.0.0.0:" + std::to_string(port_)); + std::string server_address((listen_to_localhost_only_ ? "127.0.0.1:" : "0.0.0.0:") + + std::to_string(port_)); grpc::ServerBuilder builder; // Disable the SO_REUSEPORT option. We don't need it in ray. If the option is enabled // (default behavior in grpc), we may see multiple workers listen on the same port and diff --git a/src/ray/rpc/grpc_server.h b/src/ray/rpc/grpc_server.h index 460a27ba6..843c0acba 100644 --- a/src/ray/rpc/grpc_server.h +++ b/src/ray/rpc/grpc_server.h @@ -61,7 +61,8 @@ class GrpcServer { /// \param[in] name Name of this server, used for logging and debugging purpose. /// \param[in] port The port to bind this server to. If it's 0, a random available port /// will be chosen. - GrpcServer(std::string name, const uint32_t port, int num_threads = 1, + GrpcServer(std::string name, const uint32_t port, bool listen_to_localhost_only, + int num_threads = 1, int64_t keepalive_time_ms = 7200000 /*2 hours, grpc default*/); /// Destruct this gRPC server. @@ -108,6 +109,9 @@ class GrpcServer { const std::string name_; /// Port of this server. int port_; + /// Listen to localhost (127.0.0.1) only if it's true, otherwise listen to all network + /// interfaces (0.0.0.0) + const bool listen_to_localhost_only_; /// Indicates whether this server has been closed. bool is_closed_; /// The `grpc::Service` objects which should be registered to `ServerBuilder`. diff --git a/src/ray/rpc/test/grpc_server_client_test.cc b/src/ray/rpc/test/grpc_server_client_test.cc index e7b602e6b..3bd86f5a2 100644 --- a/src/ray/rpc/test/grpc_server_client_test.cc +++ b/src/ray/rpc/test/grpc_server_client_test.cc @@ -13,6 +13,7 @@ // limitations under the License. #include + #include "gtest/gtest.h" #include "ray/rpc/grpc_client.h" #include "ray/rpc/grpc_server.h" @@ -35,13 +36,14 @@ class TestServiceHandler { RAY_LOG(INFO) << "No reply!"; return; } - send_reply_callback(ray::Status::OK(), - /*reply_success=*/[]() { RAY_LOG(INFO) << "Reply success."; }, - /*reply_failure=*/ - [this]() { - RAY_LOG(INFO) << "Reply failed."; - reply_failure_count++; - }); + send_reply_callback( + ray::Status::OK(), + /*reply_success=*/[]() { RAY_LOG(INFO) << "Reply success."; }, + /*reply_failure=*/ + [this]() { + RAY_LOG(INFO) << "Reply failed."; + reply_failure_count++; + }); } std::atomic request_count{0}; std::atomic reply_failure_count{0}; @@ -83,7 +85,7 @@ class TestGrpcServerClientFixture : public ::testing::Test { handler_io_service_.run(); }); test_service_.reset(new TestGrpcService(handler_io_service_, test_service_handler_)); - grpc_server_.reset(new GrpcServer("test", 0)); + grpc_server_.reset(new GrpcServer("test", 0, true)); grpc_server_->RegisterService(*test_service_); grpc_server_->Run();