Allow the node manager port and object manager port to be set through ray start. (#3130)

* Allow the node manager port and object manager port to be set through ray start.

* Linting

* Fix Java test

* Address comments.
This commit is contained in:
Robert Nishihara 2018-10-28 17:28:41 -07:00 committed by Philipp Moritz
parent a404401dc6
commit fd854ff090
8 changed files with 104 additions and 32 deletions

View file

@ -179,6 +179,8 @@ public class RunManager {
rayConfig.rayletExecutablePath,
rayConfig.rayletSocketName,
rayConfig.objectStoreSocketName,
"0", // The object manager port.
"0", // The node manager port.
rayConfig.nodeIp,
rayConfig.getRedisIp(),
rayConfig.getRedisPort().toString(),

View file

@ -104,6 +104,11 @@ def cli(logging_level, logging_format):
required=False,
type=int,
help="the port to use for starting the object manager")
@click.option(
"--node-manager-port",
required=False,
type=int,
help="the port to use for starting the node manager")
@click.option(
"--object-store-memory",
required=False,
@ -190,11 +195,11 @@ def cli(logging_level, logging_format):
help="manually specify the root temporary dir of the Ray process")
def start(node_ip_address, redis_address, redis_port, num_redis_shards,
redis_max_clients, redis_password, redis_shard_ports,
object_manager_port, object_store_memory, num_workers, num_cpus,
num_gpus, resources, head, no_ui, block, plasma_directory,
huge_pages, autoscaling_config, no_redirect_worker_output,
no_redirect_output, plasma_store_socket_name, raylet_socket_name,
temp_dir):
object_manager_port, node_manager_port, object_store_memory,
num_workers, num_cpus, num_gpus, resources, head, no_ui, block,
plasma_directory, huge_pages, autoscaling_config,
no_redirect_worker_output, no_redirect_output,
plasma_store_socket_name, raylet_socket_name, temp_dir):
# Convert hostnames to numerical IP address.
if node_ip_address is not None:
node_ip_address = services.address_to_ip(node_ip_address)
@ -243,15 +248,9 @@ def start(node_ip_address, redis_address, redis_port, num_redis_shards,
logger.info("Using IP address {} for this node."
.format(node_ip_address))
address_info = {}
# Use the provided object manager port if there is one.
if object_manager_port is not None:
address_info["object_manager_ports"] = [object_manager_port]
if address_info == {}:
address_info = None
address_info = services.start_ray_head(
address_info=address_info,
object_manager_ports=[object_manager_port],
node_manager_ports=[node_manager_port],
node_ip_address=node_ip_address,
redis_port=redis_port,
redis_shard_ports=redis_shard_ports,
@ -337,6 +336,7 @@ def start(node_ip_address, redis_address, redis_port, num_redis_shards,
node_ip_address=node_ip_address,
redis_address=redis_address,
object_manager_ports=[object_manager_port],
node_manager_ports=[node_manager_port],
num_workers=num_workers,
object_store_memory=object_store_memory,
redis_password=redis_password,

View file

@ -849,6 +849,8 @@ def start_raylet(redis_address,
plasma_store_name,
worker_path,
resources=None,
object_manager_port=None,
node_manager_port=None,
num_workers=0,
use_valgrind=False,
use_profiler=False,
@ -867,6 +869,13 @@ def start_raylet(redis_address,
raylet_name (str): The name of the raylet socket to create.
worker_path (str): The path of the script to use when the local
scheduler starts up new workers.
resources: The resources that this raylet has.
object_manager_port (int): The port to use for the object manager. If
this is not provided, we will use 0 and the object manager will
choose its own port.
node_manager_port (int): The port to use for the node manager. If
this is not provided, we will use 0 and the node manager will
choose its own port.
use_valgrind (bool): True if the raylet should be started inside
of valgrind. If this is True, use_profiler must be False.
use_profiler (bool): True if the raylet should be started inside
@ -915,10 +924,21 @@ def start_raylet(redis_address,
if redis_password:
start_worker_command += " --redis-password {}".format(redis_password)
# If the object manager port is None, then use 0 to cause the object
# manager to choose its own port.
if object_manager_port is None:
object_manager_port = 0
# If the node manager port is None, then use 0 to cause the node manager
# to choose its own port.
if node_manager_port is None:
node_manager_port = 0
command = [
RAYLET_EXECUTABLE,
raylet_name,
plasma_store_name,
str(object_manager_port),
str(node_manager_port),
node_ip_address,
gcs_ip_address,
gcs_port,
@ -1159,6 +1179,8 @@ def start_raylet_monitor(redis_address,
def start_ray_processes(address_info=None,
object_manager_ports=None,
node_manager_ports=None,
node_ip_address="127.0.0.1",
redis_port=None,
redis_shard_ports=None,
@ -1188,6 +1210,12 @@ def start_ray_processes(address_info=None,
address_info (dict): A dictionary with address information for
processes that have already been started. If provided, address_info
will be modified to include processes that are newly started.
object_manager_ports (list): A list of the ports to use for the object
managers. There should be one per object manager being started on
this node (typically just one).
node_manager_ports (list): A list of the ports to use for the node
managers. There should be one per node manager being started on
this node (typically just one).
node_ip_address (str): The IP address of this node.
redis_port (int): The port that the primary Redis shard should listen
to. If None, then a random port will be chosen. If the key
@ -1341,11 +1369,14 @@ def start_ray_processes(address_info=None,
raylet_socket_names = address_info["raylet_socket_names"]
# Get the ports to use for the object managers if any are provided.
object_manager_ports = (address_info["object_manager_ports"] if
"object_manager_ports" in address_info else None)
if not isinstance(object_manager_ports, list):
assert object_manager_ports is None or num_local_schedulers == 1
object_manager_ports = num_local_schedulers * [object_manager_ports]
assert len(object_manager_ports) == num_local_schedulers
if not isinstance(node_manager_ports, list):
assert node_manager_ports is None or num_local_schedulers == 1
node_manager_ports = num_local_schedulers * [node_manager_ports]
assert len(node_manager_ports) == num_local_schedulers
# Start any object stores that do not yet exist.
for i in range(num_local_schedulers - len(object_store_addresses)):
@ -1378,6 +1409,8 @@ def start_ray_processes(address_info=None,
raylet_socket_name or get_raylet_socket_name(),
object_store_addresses[i],
worker_path,
object_manager_port=object_manager_ports[i],
node_manager_port=node_manager_ports[i],
resources=resources[i],
num_workers=workers_per_local_scheduler[i],
stdout_file=raylet_stdout_file,
@ -1402,6 +1435,7 @@ def start_ray_processes(address_info=None,
def start_ray_node(node_ip_address,
redis_address,
object_manager_ports=None,
node_manager_ports=None,
num_workers=0,
num_local_schedulers=1,
object_store_memory=None,
@ -1427,6 +1461,9 @@ def start_ray_node(node_ip_address,
object_manager_ports (list): A list of the ports to use for the object
managers. There should be one per object manager being started on
this node (typically just one).
node_manager_ports (list): A list of the ports to use for the node
managers. There should be one per node manager being started on
this node (typically just one).
num_workers (int): The number of workers to start.
num_local_schedulers (int): The number of local schedulers to start.
This is also the number of plasma stores and plasma managers to
@ -1463,10 +1500,11 @@ def start_ray_node(node_ip_address,
"""
address_info = {
"redis_address": redis_address,
"object_manager_ports": object_manager_ports
}
return start_ray_processes(
address_info=address_info,
object_manager_ports=object_manager_ports,
node_manager_ports=node_manager_ports,
node_ip_address=node_ip_address,
num_workers=num_workers,
num_local_schedulers=num_local_schedulers,
@ -1486,6 +1524,8 @@ def start_ray_node(node_ip_address,
def start_ray_head(address_info=None,
object_manager_ports=None,
node_manager_ports=None,
node_ip_address="127.0.0.1",
redis_port=None,
redis_shard_ports=None,
@ -1514,6 +1554,12 @@ def start_ray_head(address_info=None,
address_info (dict): A dictionary with address information for
processes that have already been started. If provided, address_info
will be modified to include processes that are newly started.
object_manager_ports (list): A list of the ports to use for the object
managers. There should be one per object manager being started on
this node (typically just one).
node_manager_ports (list): A list of the ports to use for the node
managers. There should be one per node manager being started on
this node (typically just one).
node_ip_address (str): The IP address of this node.
redis_port (int): The port that the primary Redis shard should listen
to. If None, then a random port will be chosen. If the key
@ -1570,6 +1616,8 @@ def start_ray_head(address_info=None,
num_redis_shards = 1 if num_redis_shards is None else num_redis_shards
return start_ray_processes(
address_info=address_info,
object_manager_ports=object_manager_ports,
node_manager_ports=node_manager_ports,
node_ip_address=node_ip_address,
redis_port=redis_port,
redis_shard_ports=redis_shard_ports,

View file

@ -29,6 +29,10 @@
namespace ray {
struct ObjectManagerConfig {
/// The port that the object manager should use to listen for connections
/// from other object managers. If this is 0, the object manager will choose
/// its own port.
int object_manager_port;
/// The time in milliseconds to wait before retrying a pull
/// that fails due to client id lookup.
uint pull_timeout_ms;

View file

@ -20,19 +20,21 @@ int main(int argc, char *argv[]) {
ray::RayLogLevel::INFO,
/*log_dir=*/"");
ray::RayLog::InstallFailureSignalHandler();
RAY_CHECK(argc == 11 || argc == 12);
RAY_CHECK(argc == 13 || argc == 14);
const std::string raylet_socket_name = std::string(argv[1]);
const std::string store_socket_name = std::string(argv[2]);
const std::string node_ip_address = std::string(argv[3]);
const std::string redis_address = std::string(argv[4]);
int redis_port = std::stoi(argv[5]);
int num_initial_workers = std::stoi(argv[6]);
int maximum_startup_concurrency = std::stoi(argv[7]);
const std::string static_resource_list = std::string(argv[8]);
const std::string python_worker_command = std::string(argv[9]);
const std::string java_worker_command = std::string(argv[10]);
const std::string redis_password = (argc == 12 ? std::string(argv[11]) : "");
int object_manager_port = std::stoi(argv[3]);
int node_manager_port = std::stoi(argv[4]);
const std::string node_ip_address = std::string(argv[5]);
const std::string redis_address = std::string(argv[6]);
int redis_port = std::stoi(argv[7]);
int num_initial_workers = std::stoi(argv[8]);
int maximum_startup_concurrency = std::stoi(argv[9]);
const std::string static_resource_list = std::string(argv[10]);
const std::string python_worker_command = std::string(argv[11]);
const std::string java_worker_command = std::string(argv[12]);
const std::string redis_password = (argc == 14 ? std::string(argv[13]) : "");
// Configuration for the node manager.
ray::raylet::NodeManagerConfig node_manager_config;
@ -51,6 +53,7 @@ int main(int argc, char *argv[]) {
ray::raylet::ResourceSet(std::move(static_resource_conf));
RAY_LOG(DEBUG) << "Starting raylet with static resource configuration: "
<< node_manager_config.resource_config.ToString();
node_manager_config.node_manager_port = node_manager_port;
node_manager_config.num_initial_workers = num_initial_workers;
node_manager_config.num_workers_per_process =
RayConfig::instance().num_workers_per_process();
@ -76,6 +79,7 @@ int main(int argc, char *argv[]) {
// Configuration for the object manager.
ray::ObjectManagerConfig object_manager_config;
object_manager_config.object_manager_port = object_manager_port;
object_manager_config.store_socket_name = store_socket_name;
object_manager_config.pull_timeout_ms =
RayConfig::instance().object_manager_pull_timeout_ms();

View file

@ -23,15 +23,23 @@ namespace ray {
namespace raylet {
struct NodeManagerConfig {
/// The node's resource configuration.
ResourceSet resource_config;
/// The port to use for listening to incoming connections. If this is 0 then
/// the node manager will choose its own port.
int node_manager_port;
/// The initial number of workers to create.
int num_initial_workers;
/// The number of workers per process.
int num_workers_per_process;
/// The maximum number of workers that can be started concurrently by a
/// worker pool.
int maximum_startup_concurrency;
/// The commands used to start the worker process, grouped by language.
std::unordered_map<Language, std::vector<std::string>> worker_commands;
/// The time between heartbeats in milliseconds.
uint64_t heartbeat_period_ms;
/// The maximum lineage size.
uint64_t max_lineage_size;
/// The store socket name.
std::string store_socket_name;

View file

@ -24,10 +24,13 @@ Raylet::Raylet(boost::asio::io_service &main_service, const std::string &socket_
acceptor_(main_service, boost::asio::local::stream_protocol::endpoint(socket_name)),
socket_(main_service),
object_manager_acceptor_(
main_service, boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), 0)),
main_service,
boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(),
object_manager_config.object_manager_port)),
object_manager_socket_(main_service),
node_manager_acceptor_(
main_service, boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), 0)),
node_manager_acceptor_(main_service, boost::asio::ip::tcp::endpoint(
boost::asio::ip::tcp::v4(),
node_manager_config.node_manager_port)),
node_manager_socket_(main_service) {
// Start listening for clients.
DoAccept();

View file

@ -285,9 +285,12 @@ def test_calling_start_ray_head():
["ray", "start", "--head", "--node-ip-address", "127.0.0.1"])
subprocess.Popen(["ray", "stop"]).wait()
# Test starting Ray with an object manager port specified.
run_and_get_output(
["ray", "start", "--head", "--object-manager-port", "12345"])
# Test starting Ray with the object manager and node manager ports
# specified.
run_and_get_output([
"ray", "start", "--head", "--object-manager-port", "12345",
"--node-manager-port", "54321"
])
subprocess.Popen(["ray", "stop"]).wait()
# Test starting Ray with the number of CPUs specified.