Mirror of https://github.com/vale981/ray (synced 2025-03-06 02:21:39 -05:00)
Add --worker-port-list option to ray start (#11481)
This commit is contained in:
Parent: da2d3fbcfc
Commit: 5d7f271e7d
11 changed files with 76 additions and 10 deletions
|
@ -1156,6 +1156,7 @@ def start_raylet(redis_address,
|
|||
object_store_memory,
|
||||
min_worker_port=None,
|
||||
max_worker_port=None,
|
||||
worker_port_list=None,
|
||||
object_manager_port=None,
|
||||
redis_password=None,
|
||||
metrics_agent_port=None,
|
||||
|
@ -1354,6 +1355,8 @@ def start_raylet(redis_address,
|
|||
f"--metrics-agent-port={metrics_agent_port}",
|
||||
f"--metrics_export_port={metrics_export_port}",
|
||||
]
|
||||
if worker_port_list is not None:
|
||||
command.append(f"--worker_port_list={worker_port_list}")
|
||||
if start_initial_python_workers_for_first_job:
|
||||
command.append("--num_initial_python_workers_for_first_job={}".format(
|
||||
resource_spec.num_cpus))
|
||||
|
|
|
@ -720,6 +720,7 @@ class Node:
|
|||
object_store_memory,
|
||||
min_worker_port=self._ray_params.min_worker_port,
|
||||
max_worker_port=self._ray_params.max_worker_port,
|
||||
worker_port_list=self._ray_params.worker_port_list,
|
||||
object_manager_port=self._ray_params.object_manager_port,
|
||||
redis_password=self._ray_params.redis_password,
|
||||
metrics_agent_port=self._ray_params.metrics_agent_port,
|
||||
|
|
|
@ -41,6 +41,9 @@ class RayParams:
|
|||
on. If not set or set to 0, random ports will be chosen.
|
||||
max_worker_port (int): The highest port number that workers will bind
|
||||
on. If set, min_worker_port must also be set.
|
||||
worker_port_list (str): An explicit list of ports to be used for
|
||||
workers (comma-separated). Overrides min_worker_port and
|
||||
max_worker_port.
|
||||
object_ref_seed (int): Used to seed the deterministic generation of
|
||||
object refs. The same value can be used across multiple runs of the
|
||||
same job in order to generate the object refs in a consistent
|
||||
|
@ -116,6 +119,7 @@ class RayParams:
|
|||
raylet_ip_address=None,
|
||||
min_worker_port=None,
|
||||
max_worker_port=None,
|
||||
worker_port_list=None,
|
||||
object_ref_seed=None,
|
||||
driver_mode=None,
|
||||
redirect_worker_output=None,
|
||||
|
@ -163,6 +167,7 @@ class RayParams:
|
|||
self.raylet_ip_address = raylet_ip_address
|
||||
self.min_worker_port = min_worker_port
|
||||
self.max_worker_port = max_worker_port
|
||||
self.worker_port_list = worker_port_list
|
||||
self.driver_mode = driver_mode
|
||||
self.redirect_worker_output = redirect_worker_output
|
||||
self.redirect_output = redirect_output
|
||||
|
@ -252,6 +257,20 @@ class RayParams:
|
|||
self._check_usage()
|
||||
|
||||
def _check_usage(self):
|
||||
if self.worker_port_list is not None:
|
||||
for port_str in self.worker_port_list.split(","):
|
||||
try:
|
||||
port = int(port_str)
|
||||
except ValueError as e:
|
||||
raise ValueError(
|
||||
"worker_port_list must be a comma-separated " +
|
||||
"list of integers: {}".format(e)) from None
|
||||
|
||||
if port < 1024 or port > 65535:
|
||||
raise ValueError(
|
||||
"Ports in worker_port_list must be "
|
||||
"between 1024 and 65535. Got: {}".format(port))
|
||||
|
||||
# Used primarily for testing.
|
||||
if os.environ.get("RAY_USE_RANDOM_PORTS", False):
|
||||
if self.min_worker_port is None and self.min_worker_port is None:
|
||||
|
|
|
@ -207,6 +207,11 @@ def dashboard(cluster_config_file, cluster_name, port, remote_port):
|
|||
default=10999,
|
||||
help="the highest port number that workers will bind on. If set, "
|
||||
"'--min-worker-port' must also be set.")
|
||||
@click.option(
|
||||
"--worker-port-list",
|
||||
required=False,
|
||||
help="a comma-separated list of open ports for workers to bind on. "
|
||||
"Overrides '--min-worker-port' and '--max-worker-port'.")
|
||||
@click.option(
|
||||
"--memory",
|
||||
required=False,
|
||||
|
@ -357,9 +362,9 @@ def dashboard(cluster_config_file, cluster_name, port, remote_port):
|
|||
@add_click_options(logging_options)
|
||||
def start(node_ip_address, address, port, redis_password, redis_shard_ports,
|
||||
object_manager_port, node_manager_port, gcs_server_port,
|
||||
min_worker_port, max_worker_port, memory, object_store_memory,
|
||||
redis_max_memory, num_cpus, num_gpus, resources, head,
|
||||
include_dashboard, dashboard_host, dashboard_port, block,
|
||||
min_worker_port, max_worker_port, worker_port_list, memory,
|
||||
object_store_memory, redis_max_memory, num_cpus, num_gpus, resources,
|
||||
head, include_dashboard, dashboard_host, dashboard_port, block,
|
||||
plasma_directory, autoscaling_config, no_redirect_worker_output,
|
||||
no_redirect_output, plasma_store_socket_name, raylet_socket_name,
|
||||
temp_dir, java_worker_options, load_code_from_local,
|
||||
|
@ -401,6 +406,7 @@ def start(node_ip_address, address, port, redis_password, redis_shard_ports,
|
|||
node_ip_address=node_ip_address,
|
||||
min_worker_port=min_worker_port,
|
||||
max_worker_port=max_worker_port,
|
||||
worker_port_list=worker_port_list,
|
||||
object_manager_port=object_manager_port,
|
||||
node_manager_port=node_manager_port,
|
||||
gcs_server_port=gcs_server_port,
|
||||
|
|
|
@ -405,6 +405,20 @@ def test_calling_start_ray_head(call_ray_stop_only):
|
|||
])
|
||||
check_call_ray(["stop"])
|
||||
|
||||
# Test starting Ray with a worker port list.
|
||||
check_call_ray(["start", "--head", "--worker-port-list", "10000,10001"])
|
||||
check_call_ray(["stop"])
|
||||
|
||||
# Test starting Ray with a non-int in the worker port list.
|
||||
with pytest.raises(subprocess.CalledProcessError):
|
||||
check_call_ray(["start", "--head", "--worker-port-list", "10000,a"])
|
||||
check_call_ray(["stop"])
|
||||
|
||||
# Test starting Ray with an invalid port in the worker port list.
|
||||
with pytest.raises(subprocess.CalledProcessError):
|
||||
check_call_ray(["start", "--head", "--worker-port-list", "100"])
|
||||
check_call_ray(["stop"])
|
||||
|
||||
# Test starting Ray with the number of CPUs specified.
|
||||
check_call_ray(["start", "--head", "--num-cpus", "2", "--port", "0"])
|
||||
check_call_ray(["stop"])
|
||||
|
|
|
@ -36,6 +36,8 @@ DEFINE_int32(min_worker_port, 0,
|
|||
"The lowest port that workers' gRPC servers will bind on.");
|
||||
DEFINE_int32(max_worker_port, 0,
|
||||
"The highest port that workers' gRPC servers will bind on.");
|
||||
DEFINE_string(worker_port_list, "",
|
||||
"An explicit list of ports that workers' gRPC servers will bind on.");
|
||||
DEFINE_int32(num_initial_workers, 0, "Number of initial workers.");
|
||||
DEFINE_int32(num_initial_python_workers_for_first_job, 0,
|
||||
"Number of initial Python workers for the first job.");
|
||||
|
@ -75,6 +77,7 @@ int main(int argc, char *argv[]) {
|
|||
const int redis_port = static_cast<int>(FLAGS_redis_port);
|
||||
const int min_worker_port = static_cast<int>(FLAGS_min_worker_port);
|
||||
const int max_worker_port = static_cast<int>(FLAGS_max_worker_port);
|
||||
const std::string worker_port_list = FLAGS_worker_port_list;
|
||||
const int num_initial_workers = static_cast<int>(FLAGS_num_initial_workers);
|
||||
const int num_initial_python_workers_for_first_job =
|
||||
static_cast<int>(FLAGS_num_initial_python_workers_for_first_job);
|
||||
|
@ -150,6 +153,15 @@ int main(int argc, char *argv[]) {
|
|||
|
||||
RayConfig::instance().initialize(raylet_config);
|
||||
|
||||
// Parse the worker port list.
|
||||
std::istringstream worker_port_list_string(worker_port_list);
|
||||
std::string worker_port;
|
||||
std::vector<int> worker_ports;
|
||||
|
||||
while (std::getline(worker_port_list_string, worker_port, ',')) {
|
||||
worker_ports.push_back(std::stoi(worker_port));
|
||||
}
|
||||
|
||||
// Parse the resource list.
|
||||
std::istringstream resource_string(static_resource_list);
|
||||
std::string resource_name;
|
||||
|
@ -157,8 +169,6 @@ int main(int argc, char *argv[]) {
|
|||
|
||||
while (std::getline(resource_string, resource_name, ',')) {
|
||||
RAY_CHECK(std::getline(resource_string, resource_quantity, ','));
|
||||
// TODO(rkn): The line below could throw an exception. What should we do
|
||||
// about this?
|
||||
static_resource_conf[resource_name] = std::stod(resource_quantity);
|
||||
}
|
||||
auto num_cpus_it = static_resource_conf.find("CPU");
|
||||
|
@ -180,6 +190,7 @@ int main(int argc, char *argv[]) {
|
|||
node_manager_config.maximum_startup_concurrency = maximum_startup_concurrency;
|
||||
node_manager_config.min_worker_port = min_worker_port;
|
||||
node_manager_config.max_worker_port = max_worker_port;
|
||||
node_manager_config.worker_ports = worker_ports;
|
||||
|
||||
if (!python_worker_command.empty()) {
|
||||
node_manager_config.worker_commands.emplace(
|
||||
|
|
|
@ -142,8 +142,8 @@ NodeManager::NodeManager(boost::asio::io_service &io_service, const NodeID &self
|
|||
io_service, config.num_initial_workers, config.num_workers_soft_limit,
|
||||
config.num_initial_python_workers_for_first_job,
|
||||
config.maximum_startup_concurrency, config.min_worker_port,
|
||||
config.max_worker_port, gcs_client_, config.worker_commands,
|
||||
config.raylet_config,
|
||||
config.max_worker_port, config.worker_ports, gcs_client_,
|
||||
config.worker_commands, config.raylet_config,
|
||||
/*starting_worker_timeout_callback=*/
|
||||
[this]() { this->DispatchTasks(this->local_queues_.GetReadyTasksByClass()); }),
|
||||
scheduling_policy_(local_queues_),
|
||||
|
|
|
@ -65,6 +65,9 @@ struct NodeManagerConfig {
|
|||
/// The highest port number that workers started will bind on.
|
||||
/// If this is not set to 0, min_worker_port must also not be set to 0.
|
||||
int max_worker_port;
|
||||
/// An explicit list of open ports that workers started will bind
|
||||
/// on. This takes precedence over min_worker_port and max_worker_port.
|
||||
std::vector<int> worker_ports;
|
||||
/// The initial number of workers to create.
|
||||
int num_initial_workers;
|
||||
/// The soft limit of the number of workers.
|
||||
|
|
|
@ -58,7 +58,8 @@ WorkerPool::WorkerPool(boost::asio::io_service &io_service, int num_workers,
|
|||
int num_workers_soft_limit,
|
||||
int num_initial_python_workers_for_first_job,
|
||||
int maximum_startup_concurrency, int min_worker_port,
|
||||
int max_worker_port, std::shared_ptr<gcs::GcsClient> gcs_client,
|
||||
int max_worker_port, const std::vector<int> &worker_ports,
|
||||
std::shared_ptr<gcs::GcsClient> gcs_client,
|
||||
const WorkerCommandMap &worker_commands,
|
||||
const std::unordered_map<std::string, std::string> &raylet_config,
|
||||
std::function<void()> starting_worker_timeout_callback)
|
||||
|
@ -114,7 +115,12 @@ WorkerPool::WorkerPool(boost::asio::io_service &io_service, int num_workers,
|
|||
RAY_CHECK(!state.worker_command.empty()) << "Worker command must not be empty.";
|
||||
}
|
||||
// Initialize free ports list with all ports in the specified range.
|
||||
if (min_worker_port != 0) {
|
||||
if (!worker_ports.empty()) {
|
||||
free_ports_ = std::unique_ptr<std::queue<int>>(new std::queue<int>());
|
||||
for (int port : worker_ports) {
|
||||
free_ports_->push(port);
|
||||
}
|
||||
} else if (min_worker_port != 0) {
|
||||
if (max_worker_port == 0) {
|
||||
max_worker_port = 65535; // Maximum valid port number.
|
||||
}
|
||||
|
|
|
@ -83,6 +83,8 @@ class WorkerPool : public WorkerPoolInterface {
|
|||
/// If this is set to 0, workers will bind on random ports.
|
||||
/// \param max_worker_port The highest port number that workers started will bind on.
|
||||
/// If this is not set to 0, min_worker_port must also not be set to 0.
|
||||
/// \param worker_ports An explicit list of open ports that workers started will bind
|
||||
/// on. This takes precedence over min_worker_port and max_worker_port.
|
||||
/// \param worker_commands The commands used to start the worker process, grouped by
|
||||
/// language.
|
||||
/// \param raylet_config The raylet config list of this node.
|
||||
|
@ -91,6 +93,7 @@ class WorkerPool : public WorkerPoolInterface {
|
|||
WorkerPool(boost::asio::io_service &io_service, int num_workers,
|
||||
int num_workers_soft_limit, int num_initial_python_workers_for_first_job,
|
||||
int maximum_startup_concurrency, int min_worker_port, int max_worker_port,
|
||||
const std::vector<int> &worker_ports,
|
||||
std::shared_ptr<gcs::GcsClient> gcs_client,
|
||||
const WorkerCommandMap &worker_commands,
|
||||
const std::unordered_map<std::string, std::string> &raylet_config,
|
||||
|
|
|
@ -34,7 +34,7 @@ class WorkerPoolMock : public WorkerPool {
|
|||
public:
|
||||
explicit WorkerPoolMock(boost::asio::io_service &io_service,
|
||||
const WorkerCommandMap &worker_commands)
|
||||
: WorkerPool(io_service, 0, 0, 0, MAXIMUM_STARTUP_CONCURRENCY, 0, 0, nullptr,
|
||||
: WorkerPool(io_service, 0, 0, 0, MAXIMUM_STARTUP_CONCURRENCY, 0, 0, {}, nullptr,
|
||||
worker_commands, {}, []() {}),
|
||||
last_worker_process_() {
|
||||
states_by_lang_[ray::Language::JAVA].num_workers_per_process =
|
||||
|
|
Loading…
Add table
Reference in a new issue