diff --git a/python/ray/scripts/scripts.py b/python/ray/scripts/scripts.py index 31fbd28fd..f744382bc 100644 --- a/python/ray/scripts/scripts.py +++ b/python/ray/scripts/scripts.py @@ -10,6 +10,7 @@ import time import urllib import urllib.parse import yaml +from socket import socket import ray import psutil @@ -160,11 +161,11 @@ def dashboard(cluster_config_file, cluster_name, port, remote_port): "--address", required=False, type=str, help="the address to use for Ray") @click.option( "--port", + type=int, required=False, - type=str, - help="the port of the head ray process. If not provided, tries to use " - "{0}, falling back to a random port if {0} is " - "not available".format(ray_constants.DEFAULT_PORT)) + help=f"the port of the head ray process. If not provided, defaults to " + f"{ray_constants.DEFAULT_PORT}; if port is set to 0, we will" + f" allocate an available port.") @click.option( "--redis-password", required=False, @@ -368,7 +369,6 @@ def start(node_ip_address, address, port, redis_password, redis_shard_ports, log_color, verbose): """Start Ray processes manually on the local machine.""" cli_logger.configure(log_style, log_color, verbose) - if gcs_server_port and not head: raise ValueError( "gcs_server_port can be only assigned when you specify --head.") @@ -381,7 +381,6 @@ def start(node_ip_address, address, port, redis_password, redis_shard_ports, if address is not None: (redis_address, redis_address_ip, redis_address_port) = services.validate_redis_address(address) - try: resources = json.loads(resources) except Exception: @@ -430,6 +429,15 @@ def start(node_ip_address, address, port, redis_password, redis_shard_ports, enable_object_reconstruction=enable_object_reconstruction, metrics_export_port=metrics_export_port) if head: + # Use default if port is none, allocate an available port if port is 0 + if port is None: + port = ray_constants.DEFAULT_PORT + + if port == 0: + with socket() as s: + s.bind(("", 0)) + port = s.getsockname()[1] + num_redis_shards = None # Start Ray on the head node. if redis_shard_ports is not None: @@ -447,9 +455,10 @@ def start(node_ip_address, address, port, redis_password, redis_shard_ports, "started, so a Redis address should not be " "provided.") + node_ip_address = services.get_node_ip_address() + # Get the node IP address if one is not provided. - ray_params.update_if_absent( - node_ip_address=services.get_node_ip_address()) + ray_params.update_if_absent(node_ip_address=node_ip_address) cli_logger.labeled_value("Local node IP", ray_params.node_ip_address) cli_logger.old_info(logger, "Using IP address {} for this node.", ray_params.node_ip_address) @@ -462,6 +471,16 @@ def start(node_ip_address, address, port, redis_password, redis_shard_ports, autoscaling_config=autoscaling_config, ) + # Fail early when starting a new cluster when one is already running + if address is None: + default_address = f"{node_ip_address}:{port}" + redis_addresses = services.find_redis_address(default_address) + if len(redis_addresses) > 0: + raise ConnectionError( + f"Ray is already running at {default_address}. " + f"Please specify a different port using the `--port`" + f" command to `ray start`.") + node = ray.node.Node( ray_params, head=True, shutdown_at_exit=block, spawn_reaper=block) redis_address = node.redis_address diff --git a/python/ray/services.py b/python/ray/services.py index 9094b1e4b..67ffdc92c 100644 --- a/python/ray/services.py +++ b/python/ray/services.py @@ -110,7 +110,7 @@ def new_port(): return random.randint(10000, 65535) -def find_redis_address_or_die(): +def find_redis_address(address=None): pids = psutil.pids() redis_addresses = set() for pid in pids: @@ -129,12 +129,19 @@ def find_redis_address_or_die(): for arg in arglist.split(" "): # TODO(ekl): Find a robust solution for locating Redis. if arg.startswith("--redis-address="): - addr = arg.split("=")[1] - redis_addresses.add(addr) + proc_addr = arg.split("=")[1] + if address is not None and address != proc_addr: + continue + redis_addresses.add(proc_addr) except psutil.AccessDenied: pass except psutil.NoSuchProcess: pass + return redis_addresses + + +def find_redis_address_or_die(): + redis_addresses = find_redis_address() if len(redis_addresses) > 1: raise ConnectionError( f"Found multiple active Ray instances: {redis_addresses}. " diff --git a/python/ray/tests/conftest.py b/python/ray/tests/conftest.py index f4938cbb8..3d9a429e7 100644 --- a/python/ray/tests/conftest.py +++ b/python/ray/tests/conftest.py @@ -178,9 +178,8 @@ def ray_start_object_store_memory(request): @pytest.fixture def call_ray_start(request): parameter = getattr( - request, "param", - "ray start --head --num-cpus=1 --min-worker-port=0 --max-worker-port=0" - ) + request, "param", "ray start --head --num-cpus=1 --min-worker-port=0 " + "--max-worker-port=0 --port 0") command_args = parameter.split(" ") out = ray.utils.decode( subprocess.check_output(command_args, stderr=subprocess.STDOUT)) diff --git a/python/ray/tests/test_cli.py b/python/ray/tests/test_cli.py index 21ee42662..4f128f91e 100644 --- a/python/ray/tests/test_cli.py +++ b/python/ray/tests/test_cli.py @@ -158,9 +158,9 @@ DEFAULT_TEST_CONFIG_PATH = str( reason=("Mac builds don't provide proper locale support")) def test_ray_start(configure_lang): runner = CliRunner() - result = runner.invoke( - scripts.start, - ["--head", "--log-style=pretty", "--log-color", "False"]) + result = runner.invoke(scripts.start, [ + "--head", "--log-style=pretty", "--log-color", "False", "--port", "0" + ]) _die_on_error(runner.invoke(scripts.stop)) _check_output_via_pattern("test_ray_start.txt", result) diff --git a/python/ray/tests/test_multi_node.py b/python/ray/tests/test_multi_node.py index bc3a91ed5..c9cb3cfbd 100644 --- a/python/ray/tests/test_multi_node.py +++ b/python/ray/tests/test_multi_node.py @@ -301,7 +301,7 @@ ray.get([a.log.remote(), f.remote()]) @pytest.mark.parametrize( "call_ray_start", [ "ray start --head --num-cpus=1 --num-gpus=1 " + - "--min-worker-port=0 --max-worker-port=0" + "--min-worker-port=0 --max-worker-port=0 --port 0" ], indirect=True) def test_drivers_release_resources(call_ray_start): @@ -369,26 +369,24 @@ print("success") def test_calling_start_ray_head(call_ray_stop_only): + # Test that we can call ray start with various command line # parameters. TODO(rkn): This test only tests the --head code path. We # should also test the non-head node code path. - # Test starting Ray with no arguments. - check_call_ray(["start", "--head"]) - check_call_ray(["stop"]) - # Test starting Ray with a redis port specified. - check_call_ray(["start", "--head"]) + check_call_ray(["start", "--head", "--port", "0"]) check_call_ray(["stop"]) # Test starting Ray with a node IP address specified. - check_call_ray(["start", "--head", "--node-ip-address", "127.0.0.1"]) + check_call_ray( + ["start", "--head", "--node-ip-address", "127.0.0.1", "--port", "0"]) check_call_ray(["stop"]) # Test starting Ray with a system config parameter set. check_call_ray([ "start", "--head", "--system-config", - "{\"metrics_report_interval_ms\":100}" + "{\"metrics_report_interval_ms\":100}", "--port", "0" ]) check_call_ray(["stop"]) @@ -396,45 +394,49 @@ def test_calling_start_ray_head(call_ray_stop_only): # specified. check_call_ray([ "start", "--head", "--object-manager-port", "12345", - "--node-manager-port", "54321" + "--node-manager-port", "54321", "--port", "0" ]) check_call_ray(["stop"]) # Test starting Ray with the worker port range specified. check_call_ray([ "start", "--head", "--min-worker-port", "50000", "--max-worker-port", - "51000" + "51000", "--port", "0" ]) check_call_ray(["stop"]) # Test starting Ray with the number of CPUs specified. - check_call_ray(["start", "--head", "--num-cpus", "2"]) + check_call_ray(["start", "--head", "--num-cpus", "2", "--port", "0"]) check_call_ray(["stop"]) # Test starting Ray with the number of GPUs specified. - check_call_ray(["start", "--head", "--num-gpus", "100"]) + check_call_ray(["start", "--head", "--num-gpus", "100", "--port", "0"]) check_call_ray(["stop"]) # Test starting Ray with redis shard ports specified. - check_call_ray( - ["start", "--head", "--redis-shard-ports", "6380,6381,6382"]) + check_call_ray([ + "start", "--head", "--redis-shard-ports", "6380,6381,6382", "--port", + "0" + ]) check_call_ray(["stop"]) # Test starting Ray with all arguments specified. check_call_ray([ "start", "--head", "--redis-shard-ports", "6380,6381,6382", "--object-manager-port", "12345", "--num-cpus", "2", "--num-gpus", "0", - "--resources", "{\"Custom\": 1}" + "--resources", "{\"Custom\": 1}", "--port", "0" ]) check_call_ray(["stop"]) # Test starting Ray with invalid arguments. with pytest.raises(subprocess.CalledProcessError): - check_call_ray(["start", "--head", "--address", "127.0.0.1:6379"]) + check_call_ray( + ["start", "--head", "--address", "127.0.0.1:6379", "--port", "0"]) check_call_ray(["stop"]) # Test --block. Killing a child process should cause the command to exit. - blocked = subprocess.Popen(["ray", "start", "--head", "--block"]) + blocked = subprocess.Popen( + ["ray", "start", "--head", "--block", "--port", "0"]) wait_for_children_of_pid(blocked.pid, num_children=7, timeout=30) @@ -447,7 +449,8 @@ def test_calling_start_ray_head(call_ray_stop_only): assert blocked.returncode != 0, "ray start shouldn't return 0 on bad exit" # Test --block. Killing the command should clean up all child processes. - blocked = subprocess.Popen(["ray", "start", "--head", "--block"]) + blocked = subprocess.Popen( + ["ray", "start", "--head", "--block", "--port", "0"]) blocked.poll() assert blocked.returncode is None diff --git a/python/ray/tests/test_tempfile.py b/python/ray/tests/test_tempfile.py index 8875499d9..2de26a739 100644 --- a/python/ray/tests/test_tempfile.py +++ b/python/ray/tests/test_tempfile.py @@ -42,7 +42,7 @@ def test_tempdir_commandline(): shutil.rmtree(ray.utils.get_ray_temp_dir(), ignore_errors=True) check_call_ray([ "start", "--head", "--temp-dir=" + os.path.join( - ray.utils.get_user_temp_dir(), "i_am_a_temp_dir2") + ray.utils.get_user_temp_dir(), "i_am_a_temp_dir2"), "--port", "0" ]) assert os.path.exists( os.path.join(ray.utils.get_user_temp_dir(),