[Core] Raylet to pick the node manager port (#14349)

This commit is contained in:
Kai Yang 2021-02-27 20:27:09 +08:00 committed by GitHub
parent 8cfaea5fc5
commit e0e8918d60
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
10 changed files with 77 additions and 37 deletions

View file

@ -69,7 +69,7 @@ public class RunManager {
String.format(
"import ray;"
+ " print(ray._private.services.get_address_info_from_redis("
+ "'%s', '%s', redis_password='%s'))",
+ "'%s', '%s', redis_password='%s', log_warning=False))",
rayConfig.getRedisAddress(), rayConfig.nodeIp, rayConfig.redisPassword);
List<String> command = Arrays.asList("python", "-c", script);

View file

@ -280,7 +280,8 @@ def get_address_info_from_redis_helper(redis_address,
def get_address_info_from_redis(redis_address,
node_ip_address,
num_retries=5,
redis_password=None):
redis_password=None,
log_warning=True):
counter = 0
while True:
try:
@ -289,12 +290,13 @@ def get_address_info_from_redis(redis_address,
except Exception:
if counter == num_retries:
raise
if log_warning:
logger.warning(
"Some processes that the driver needs to connect to have "
"not registered with Redis, so retrying. Have you run "
"'ray start' on this node?")
# Some of the information may not be in Redis yet, so wait a little
# bit.
logger.warning(
"Some processes that the driver needs to connect to have "
"not registered with Redis, so retrying. Have you run "
"'ray start' on this node?")
time.sleep(1)
counter += 1
@ -1318,8 +1320,8 @@ def start_raylet(redis_address,
Args:
redis_address (str): The address of the primary Redis server.
node_ip_address (str): The IP address of this node.
node_manager_port(int): The port to use for the node manager. This must
not be 0.
node_manager_port(int): The port to use for the node manager. If it's
0, a random port will be used.
raylet_name (str): The name of the raylet socket to create.
plasma_store_name (str): The name of the plasma store socket to connect
to.
@ -1356,9 +1358,7 @@ def start_raylet(redis_address,
Returns:
ProcessInfo for the process that was started.
"""
# The caller must provide a node manager port so that we can correctly
# populate the command to start a worker.
assert node_manager_port is not None and node_manager_port != 0
assert node_manager_port is not None and type(node_manager_port) == int
if use_valgrind and use_profiler:
raise ValueError("Cannot use valgrind and profiler at the same time.")
@ -1395,7 +1395,6 @@ def start_raylet(redis_address,
java_worker_command = build_java_worker_command(
json.loads(java_worker_options) if java_worker_options else [],
redis_address,
node_manager_port,
plasma_store_name,
raylet_name,
redis_password,
@ -1409,7 +1408,6 @@ def start_raylet(redis_address,
cpp_worker_command = build_cpp_worker_command(
"",
redis_address,
node_manager_port,
plasma_store_name,
raylet_name,
redis_password,
@ -1423,7 +1421,7 @@ def start_raylet(redis_address,
sys.executable,
worker_path,
f"--node-ip-address={node_ip_address}",
f"--node-manager-port={node_manager_port}",
"--node-manager-port=RAY_NODE_MANAGER_PORT_PLACEHOLDER",
f"--object-store-name={plasma_store_name}",
f"--raylet-name={raylet_name}",
f"--redis-address={redis_address}",
@ -1456,7 +1454,7 @@ def start_raylet(redis_address,
f"--redis-address={redis_address}",
f"--metrics-export-port={metrics_export_port}",
f"--dashboard-agent-port={metrics_agent_port}",
f"--node-manager-port={node_manager_port}",
"--node-manager-port=RAY_NODE_MANAGER_PORT_PLACEHOLDER",
f"--object-store-name={plasma_store_name}",
f"--raylet-name={raylet_name}",
f"--temp-dir={temp_dir}",
@ -1533,10 +1531,15 @@ def get_ray_jars_dir():
return os.path.abspath(os.path.join(current_dir, "jars"))
def build_java_worker_command(java_worker_options, redis_address,
node_manager_port, plasma_store_name,
raylet_name, redis_password, session_dir,
node_ip_address):
def build_java_worker_command(
java_worker_options,
redis_address,
plasma_store_name,
raylet_name,
redis_password,
session_dir,
node_ip_address,
):
"""This method assembles the command used to start a Java worker.
Args:
@ -1554,7 +1557,8 @@ def build_java_worker_command(java_worker_options, redis_address,
pairs = []
if redis_address is not None:
pairs.append(("ray.address", redis_address))
pairs.append(("ray.raylet.node-manager-port", node_manager_port))
pairs.append(("ray.raylet.node-manager-port",
"RAY_NODE_MANAGER_PORT_PLACEHOLDER"))
if plasma_store_name is not None:
pairs.append(("ray.object-store.socket-name", plasma_store_name))
@ -1603,7 +1607,6 @@ def build_java_worker_command(java_worker_options, redis_address,
def build_cpp_worker_command(
cpp_worker_options,
redis_address,
node_manager_port,
plasma_store_name,
raylet_name,
redis_password,
@ -1625,7 +1628,8 @@ def build_cpp_worker_command(
command = [
DEFAULT_WORKER_EXECUTABLE, plasma_store_name, raylet_name,
str(node_manager_port), redis_address, redis_password, session_dir
"RAY_NODE_MANAGER_PORT_PLACEHOLDER", redis_address, redis_password,
session_dir
]
return command

View file

@ -216,17 +216,6 @@ class Node:
self._webui_url = (
ray._private.services.get_webui_url_from_redis(redis_client))
if head or not connect_only:
# We need to start a local raylet.
if (self._ray_params.node_manager_port is None
or self._ray_params.node_manager_port == 0):
# No port specified. Pick a random port for the raylet to use.
# NOTE: There is a possible but unlikely race condition where
# the port is bound by another process between now and when the
# raylet starts.
self._ray_params.node_manager_port, self._socket = \
self._get_unused_port(close_on_exit=False)
if not connect_only and spawn_reaper and not self.kernel_fate_share:
self.start_reaper_process()
@ -240,6 +229,13 @@ class Node:
if not connect_only:
self.start_ray_processes()
address_info = (ray._private.services.get_address_info_from_redis(
self.redis_address,
self._raylet_ip_address,
redis_password=self.redis_password,
log_warning=False))
self._ray_params.node_manager_port = address_info[
"node_manager_port"]
def _register_shutdown_hooks(self):
# Register the atexit handler. In this case, we shouldn't call sys.exit

View file

@ -119,7 +119,7 @@ class RayParams:
redis_port=None,
redis_shard_ports=None,
object_manager_port=None,
node_manager_port=None,
node_manager_port=0,
gcs_server_port=None,
node_ip_address=None,
raylet_ip_address=None,

View file

@ -261,6 +261,7 @@ def debug(address):
"--node-manager-port",
required=False,
type=int,
default=0,
help="the port to use for starting the node manager")
@click.option(
"--gcs-server-port",

View file

@ -37,6 +37,8 @@ constexpr char kTaskTablePrefix[] = "TaskTable";
constexpr char kWorkerDynamicOptionPlaceholder[] =
"RAY_WORKER_DYNAMIC_OPTION_PLACEHOLDER";
constexpr char kNodeManagerPortPlaceholder[] = "RAY_NODE_MANAGER_PORT_PLACEHOLDER";
/// Public DNS address which is used to connect and get local IP.
constexpr char kPublicDNSServerIp[] = "8.8.8.8";
constexpr int kPublicDNSServerPort = 53;

View file

@ -245,8 +245,18 @@ NodeManager::NodeManager(boost::asio::io_service &io_service, const NodeID &self
node_manager_server_.RegisterService(agent_manager_service_);
node_manager_server_.Run();
auto options =
AgentManager::Options({self_node_id, ParseCommandLine(config.agent_command)});
worker_pool_.SetNodeManagerPort(GetServerPort());
auto agent_command_line = ParseCommandLine(config.agent_command);
for (auto &arg : agent_command_line) {
auto node_manager_port_position = arg.find(kNodeManagerPortPlaceholder);
if (node_manager_port_position != std::string::npos) {
arg.replace(node_manager_port_position, strlen(kNodeManagerPortPlaceholder),
std::to_string(GetServerPort()));
}
}
auto options = AgentManager::Options({self_node_id, agent_command_line});
agent_manager_.reset(
new AgentManager(std::move(options),
/*delay_executor=*/

View file

@ -128,6 +128,12 @@ WorkerPool::~WorkerPool() {
}
}
// NOTE(kfstorm): The node manager port cannot be passed via the WorkerPool constructor
// because the gRPC server is started after the WorkerPool instance is constructed.
// Stores the port so that StartWorkerProcess can substitute it for
// kNodeManagerPortPlaceholder in worker commands; StartWorkerProcess checks that the
// stored value is non-zero before it launches a worker.
void WorkerPool::SetNodeManagerPort(int node_manager_port) {
node_manager_port_ = node_manager_port;
}
Process WorkerPool::StartWorkerProcess(
const Language &language, const rpc::WorkerType worker_type, const JobID &job_id,
std::vector<std::string> dynamic_options,
@ -221,6 +227,19 @@ Process WorkerPool::StartWorkerProcess(
}
continue;
}
RAY_CHECK(node_manager_port_ != 0)
<< "Node manager port is not set yet. This shouldn't happen unless we are trying "
"to start a worker process before node manager server is started. In this "
"case, it's a bug and it should be fixed.";
auto node_manager_port_position = token.find(kNodeManagerPortPlaceholder);
if (node_manager_port_position != std::string::npos) {
auto replaced_token = token;
replaced_token.replace(node_manager_port_position,
strlen(kNodeManagerPortPlaceholder),
std::to_string(node_manager_port_));
worker_command_args.push_back(replaced_token);
continue;
}
worker_command_args.push_back(token);
}

View file

@ -123,6 +123,10 @@ class WorkerPool : public WorkerPoolInterface, public IOWorkerPoolInterface {
/// Destructor responsible for freeing a set of workers owned by this class.
virtual ~WorkerPool();
/// Set the node manager port.
/// \param node_manager_port The port Raylet uses for listening to incoming connections.
void SetNodeManagerPort(int node_manager_port);
/// Handles the event that a job is started.
///
/// \param job_id ID of the started job.
@ -475,6 +479,8 @@ class WorkerPool : public WorkerPoolInterface, public IOWorkerPoolInterface {
/// Keeps track of unused ports that newly-created workers can bind on.
/// If null, workers will not be passed ports and will choose them randomly.
std::unique_ptr<std::queue<int>> free_ports_;
/// The port Raylet uses for listening to incoming connections.
int node_manager_port_ = 0;
/// A client connection to the GCS.
std::shared_ptr<gcs::GcsClient> gcs_client_;
/// The callback that will be triggered once it times out to start a worker.

View file

@ -38,7 +38,9 @@ class WorkerPoolMock : public WorkerPool {
const WorkerCommandMap &worker_commands)
: WorkerPool(io_service, POOL_SIZE_SOFT_LIMIT, 0, MAXIMUM_STARTUP_CONCURRENCY, 0, 0,
{}, nullptr, worker_commands, []() {}),
last_worker_process_() {}
last_worker_process_() {
SetNodeManagerPort(1);
}
~WorkerPoolMock() {
// Avoid killing real processes