diff --git a/java/runtime/src/main/java/io/ray/runtime/runner/RunManager.java b/java/runtime/src/main/java/io/ray/runtime/runner/RunManager.java index b4b13df37..4328a8dac 100644 --- a/java/runtime/src/main/java/io/ray/runtime/runner/RunManager.java +++ b/java/runtime/src/main/java/io/ray/runtime/runner/RunManager.java @@ -69,7 +69,7 @@ public class RunManager { String.format( "import ray;" + " print(ray._private.services.get_address_info_from_redis(" - + "'%s', '%s', redis_password='%s'))", + + "'%s', '%s', redis_password='%s', log_warning=False))", rayConfig.getRedisAddress(), rayConfig.nodeIp, rayConfig.redisPassword); List command = Arrays.asList("python", "-c", script); diff --git a/python/ray/_private/services.py b/python/ray/_private/services.py index 3cba01ffe..f8324b28d 100644 --- a/python/ray/_private/services.py +++ b/python/ray/_private/services.py @@ -280,7 +280,8 @@ def get_address_info_from_redis_helper(redis_address, def get_address_info_from_redis(redis_address, node_ip_address, num_retries=5, - redis_password=None): + redis_password=None, + log_warning=True): counter = 0 while True: try: @@ -289,12 +290,13 @@ def get_address_info_from_redis(redis_address, except Exception: if counter == num_retries: raise + if log_warning: + logger.warning( + "Some processes that the driver needs to connect to have " + "not registered with Redis, so retrying. Have you run " + "'ray start' on this node?") # Some of the information may not be in Redis yet, so wait a little # bit. - logger.warning( - "Some processes that the driver needs to connect to have " - "not registered with Redis, so retrying. Have you run " - "'ray start' on this node?") time.sleep(1) counter += 1 @@ -1318,8 +1320,8 @@ def start_raylet(redis_address, Args: redis_address (str): The address of the primary Redis server. node_ip_address (str): The IP address of this node. - node_manager_port(int): The port to use for the node manager. This must - not be 0. 
+ node_manager_port(int): The port to use for the node manager. If it's + 0, a random port will be used. raylet_name (str): The name of the raylet socket to create. plasma_store_name (str): The name of the plasma store socket to connect to. @@ -1356,9 +1358,7 @@ def start_raylet(redis_address, Returns: ProcessInfo for the process that was started. """ - # The caller must provide a node manager port so that we can correctly - # populate the command to start a worker. - assert node_manager_port is not None and node_manager_port != 0 + assert node_manager_port is not None and type(node_manager_port) == int if use_valgrind and use_profiler: raise ValueError("Cannot use valgrind and profiler at the same time.") @@ -1395,7 +1395,6 @@ def start_raylet(redis_address, java_worker_command = build_java_worker_command( json.loads(java_worker_options) if java_worker_options else [], redis_address, - node_manager_port, plasma_store_name, raylet_name, redis_password, @@ -1409,7 +1408,6 @@ def start_raylet(redis_address, cpp_worker_command = build_cpp_worker_command( "", redis_address, - node_manager_port, plasma_store_name, raylet_name, redis_password, @@ -1423,7 +1421,7 @@ def start_raylet(redis_address, sys.executable, worker_path, f"--node-ip-address={node_ip_address}", - f"--node-manager-port={node_manager_port}", + "--node-manager-port=RAY_NODE_MANAGER_PORT_PLACEHOLDER", f"--object-store-name={plasma_store_name}", f"--raylet-name={raylet_name}", f"--redis-address={redis_address}", @@ -1456,7 +1454,7 @@ def start_raylet(redis_address, f"--redis-address={redis_address}", f"--metrics-export-port={metrics_export_port}", f"--dashboard-agent-port={metrics_agent_port}", - f"--node-manager-port={node_manager_port}", + "--node-manager-port=RAY_NODE_MANAGER_PORT_PLACEHOLDER", f"--object-store-name={plasma_store_name}", f"--raylet-name={raylet_name}", f"--temp-dir={temp_dir}", @@ -1533,10 +1531,15 @@ def get_ray_jars_dir(): return os.path.abspath(os.path.join(current_dir, "jars")) -def 
build_java_worker_command(java_worker_options, redis_address, - node_manager_port, plasma_store_name, - raylet_name, redis_password, session_dir, - node_ip_address): +def build_java_worker_command( + java_worker_options, + redis_address, + plasma_store_name, + raylet_name, + redis_password, + session_dir, + node_ip_address, +): """This method assembles the command used to start a Java worker. Args: @@ -1554,7 +1557,8 @@ def build_java_worker_command(java_worker_options, redis_address, pairs = [] if redis_address is not None: pairs.append(("ray.address", redis_address)) - pairs.append(("ray.raylet.node-manager-port", node_manager_port)) + pairs.append(("ray.raylet.node-manager-port", + "RAY_NODE_MANAGER_PORT_PLACEHOLDER")) if plasma_store_name is not None: pairs.append(("ray.object-store.socket-name", plasma_store_name)) @@ -1603,7 +1607,6 @@ def build_java_worker_command(java_worker_options, redis_address, def build_cpp_worker_command( cpp_worker_options, redis_address, - node_manager_port, plasma_store_name, raylet_name, redis_password, @@ -1625,7 +1628,8 @@ def build_cpp_worker_command( command = [ DEFAULT_WORKER_EXECUTABLE, plasma_store_name, raylet_name, - str(node_manager_port), redis_address, redis_password, session_dir + "RAY_NODE_MANAGER_PORT_PLACEHOLDER", redis_address, redis_password, + session_dir ] return command diff --git a/python/ray/node.py b/python/ray/node.py index 05f3383a5..86c82cb0f 100644 --- a/python/ray/node.py +++ b/python/ray/node.py @@ -216,17 +216,6 @@ class Node: self._webui_url = ( ray._private.services.get_webui_url_from_redis(redis_client)) - if head or not connect_only: - # We need to start a local raylet. - if (self._ray_params.node_manager_port is None - or self._ray_params.node_manager_port == 0): - # No port specified. Pick a random port for the raylet to use. - # NOTE: There is a possible but unlikely race condition where - # the port is bound by another process between now and when the - # raylet starts. 
- self._ray_params.node_manager_port, self._socket = \ - self._get_unused_port(close_on_exit=False) - if not connect_only and spawn_reaper and not self.kernel_fate_share: self.start_reaper_process() @@ -240,6 +229,13 @@ class Node: if not connect_only: self.start_ray_processes() + address_info = (ray._private.services.get_address_info_from_redis( + self.redis_address, + self._raylet_ip_address, + redis_password=self.redis_password, + log_warning=False)) + self._ray_params.node_manager_port = address_info[ + "node_manager_port"] def _register_shutdown_hooks(self): # Register the atexit handler. In this case, we shouldn't call sys.exit diff --git a/python/ray/parameter.py b/python/ray/parameter.py index bdeec7627..c1c0b2f38 100644 --- a/python/ray/parameter.py +++ b/python/ray/parameter.py @@ -119,7 +119,7 @@ class RayParams: redis_port=None, redis_shard_ports=None, object_manager_port=None, - node_manager_port=None, + node_manager_port=0, gcs_server_port=None, node_ip_address=None, raylet_ip_address=None, diff --git a/python/ray/scripts/scripts.py b/python/ray/scripts/scripts.py index ec55beddb..5ca024a63 100644 --- a/python/ray/scripts/scripts.py +++ b/python/ray/scripts/scripts.py @@ -261,6 +261,7 @@ def debug(address): "--node-manager-port", required=False, type=int, + default=0, help="the port to use for starting the node manager") @click.option( "--gcs-server-port", diff --git a/src/ray/common/constants.h b/src/ray/common/constants.h index 31976b080..7bc5c146c 100644 --- a/src/ray/common/constants.h +++ b/src/ray/common/constants.h @@ -37,6 +37,8 @@ constexpr char kTaskTablePrefix[] = "TaskTable"; constexpr char kWorkerDynamicOptionPlaceholder[] = "RAY_WORKER_DYNAMIC_OPTION_PLACEHOLDER"; +constexpr char kNodeManagerPortPlaceholder[] = "RAY_NODE_MANAGER_PORT_PLACEHOLDER"; + /// Public DNS address which is is used to connect and get local IP. 
constexpr char kPublicDNSServerIp[] = "8.8.8.8"; constexpr int kPublicDNSServerPort = 53; diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index a7397601d..610c322b4 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -245,8 +245,18 @@ NodeManager::NodeManager(boost::asio::io_service &io_service, const NodeID &self node_manager_server_.RegisterService(agent_manager_service_); node_manager_server_.Run(); - auto options = - AgentManager::Options({self_node_id, ParseCommandLine(config.agent_command)}); + worker_pool_.SetNodeManagerPort(GetServerPort()); + + auto agent_command_line = ParseCommandLine(config.agent_command); + for (auto &arg : agent_command_line) { + auto node_manager_port_position = arg.find(kNodeManagerPortPlaceholder); + if (node_manager_port_position != std::string::npos) { + arg.replace(node_manager_port_position, strlen(kNodeManagerPortPlaceholder), + std::to_string(GetServerPort())); + } + } + + auto options = AgentManager::Options({self_node_id, agent_command_line}); agent_manager_.reset( new AgentManager(std::move(options), /*delay_executor=*/ diff --git a/src/ray/raylet/worker_pool.cc b/src/ray/raylet/worker_pool.cc index bbe89ecfe..ab0e3baf6 100644 --- a/src/ray/raylet/worker_pool.cc +++ b/src/ray/raylet/worker_pool.cc @@ -128,6 +128,12 @@ WorkerPool::~WorkerPool() { } } +// NOTE(kfstorm): The node manager port cannot be passed via WorkerPool constructor +// because the grpc server is started after the WorkerPool instance is constructed. +void WorkerPool::SetNodeManagerPort(int node_manager_port) { + node_manager_port_ = node_manager_port; +} + Process WorkerPool::StartWorkerProcess( const Language &language, const rpc::WorkerType worker_type, const JobID &job_id, std::vector dynamic_options, @@ -221,6 +227,19 @@ Process WorkerPool::StartWorkerProcess( } continue; } + RAY_CHECK(node_manager_port_ != 0) + << "Node manager port is not set yet. 
This shouldn't happen unless we are trying " + "to start a worker process before node manager server is started. In this " + "case, it's a bug and it should be fixed."; + auto node_manager_port_position = token.find(kNodeManagerPortPlaceholder); + if (node_manager_port_position != std::string::npos) { + auto replaced_token = token; + replaced_token.replace(node_manager_port_position, + strlen(kNodeManagerPortPlaceholder), + std::to_string(node_manager_port_)); + worker_command_args.push_back(replaced_token); + continue; + } worker_command_args.push_back(token); } diff --git a/src/ray/raylet/worker_pool.h b/src/ray/raylet/worker_pool.h index 7a101d55d..4d47aa0cf 100644 --- a/src/ray/raylet/worker_pool.h +++ b/src/ray/raylet/worker_pool.h @@ -123,6 +123,10 @@ class WorkerPool : public WorkerPoolInterface, public IOWorkerPoolInterface { /// Destructor responsible for freeing a set of workers owned by this class. virtual ~WorkerPool(); + /// Set the node manager port. + /// \param node_manager_port The port Raylet uses for listening to incoming connections. + void SetNodeManagerPort(int node_manager_port); + /// Handles the event that a job is started. /// /// \param job_id ID of the started job. @@ -475,6 +479,8 @@ class WorkerPool : public WorkerPoolInterface, public IOWorkerPoolInterface { /// Keeps track of unused ports that newly-created workers can bind on. /// If null, workers will not be passed ports and will choose them randomly. std::unique_ptr> free_ports_; + /// The port Raylet uses for listening to incoming connections. + int node_manager_port_ = 0; /// A client connection to the GCS. std::shared_ptr gcs_client_; /// The callback that will be triggered once it times out to start a worker. 
diff --git a/src/ray/raylet/worker_pool_test.cc b/src/ray/raylet/worker_pool_test.cc index ba5981c16..022ca3e7d 100644 --- a/src/ray/raylet/worker_pool_test.cc +++ b/src/ray/raylet/worker_pool_test.cc @@ -38,7 +38,9 @@ class WorkerPoolMock : public WorkerPool { const WorkerCommandMap &worker_commands) : WorkerPool(io_service, POOL_SIZE_SOFT_LIMIT, 0, MAXIMUM_STARTUP_CONCURRENCY, 0, 0, {}, nullptr, worker_commands, []() {}), - last_worker_process_() {} + last_worker_process_() { + SetNodeManagerPort(1); + } ~WorkerPoolMock() { // Avoid killing real processes