[cli][ray] ray start should error by default if there's already an instance running (#10826)

This commit is contained in:
Keqiu Hu 2020-09-24 08:29:59 -07:00 committed by GitHub
parent 842861b4fc
commit 46a560e876
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 64 additions and 36 deletions

View file

@ -10,6 +10,7 @@ import time
import urllib
import urllib.parse
import yaml
from socket import socket
import ray
import psutil
@ -160,11 +161,11 @@ def dashboard(cluster_config_file, cluster_name, port, remote_port):
"--address", required=False, type=str, help="the address to use for Ray")
@click.option(
"--port",
type=int,
required=False,
type=str,
help="the port of the head ray process. If not provided, tries to use "
"{0}, falling back to a random port if {0} is "
"not available".format(ray_constants.DEFAULT_PORT))
help=f"the port of the head ray process. If not provided, defaults to "
f"{ray_constants.DEFAULT_PORT}; if port is set to 0, we will"
f" allocate an available port.")
@click.option(
"--redis-password",
required=False,
@ -368,7 +369,6 @@ def start(node_ip_address, address, port, redis_password, redis_shard_ports,
log_color, verbose):
"""Start Ray processes manually on the local machine."""
cli_logger.configure(log_style, log_color, verbose)
if gcs_server_port and not head:
raise ValueError(
"gcs_server_port can be only assigned when you specify --head.")
@ -381,7 +381,6 @@ def start(node_ip_address, address, port, redis_password, redis_shard_ports,
if address is not None:
(redis_address, redis_address_ip,
redis_address_port) = services.validate_redis_address(address)
try:
resources = json.loads(resources)
except Exception:
@ -430,6 +429,15 @@ def start(node_ip_address, address, port, redis_password, redis_shard_ports,
enable_object_reconstruction=enable_object_reconstruction,
metrics_export_port=metrics_export_port)
if head:
# Use default if port is none, allocate an available port if port is 0
if port is None:
port = ray_constants.DEFAULT_PORT
if port == 0:
with socket() as s:
s.bind(("", 0))
port = s.getsockname()[1]
num_redis_shards = None
# Start Ray on the head node.
if redis_shard_ports is not None:
@ -447,9 +455,10 @@ def start(node_ip_address, address, port, redis_password, redis_shard_ports,
"started, so a Redis address should not be "
"provided.")
node_ip_address = services.get_node_ip_address()
# Get the node IP address if one is not provided.
ray_params.update_if_absent(
node_ip_address=services.get_node_ip_address())
ray_params.update_if_absent(node_ip_address=node_ip_address)
cli_logger.labeled_value("Local node IP", ray_params.node_ip_address)
cli_logger.old_info(logger, "Using IP address {} for this node.",
ray_params.node_ip_address)
@ -462,6 +471,16 @@ def start(node_ip_address, address, port, redis_password, redis_shard_ports,
autoscaling_config=autoscaling_config,
)
# Fail early when starting a new cluster when one is already running
if address is None:
default_address = f"{node_ip_address}:{port}"
redis_addresses = services.find_redis_address(default_address)
if len(redis_addresses) > 0:
raise ConnectionError(
f"Ray is already running at {default_address}. "
f"Please specify a different port using the `--port`"
f" command to `ray start`.")
node = ray.node.Node(
ray_params, head=True, shutdown_at_exit=block, spawn_reaper=block)
redis_address = node.redis_address

View file

@ -110,7 +110,7 @@ def new_port():
return random.randint(10000, 65535)
def find_redis_address_or_die():
def find_redis_address(address=None):
pids = psutil.pids()
redis_addresses = set()
for pid in pids:
@ -129,12 +129,19 @@ def find_redis_address_or_die():
for arg in arglist.split(" "):
# TODO(ekl): Find a robust solution for locating Redis.
if arg.startswith("--redis-address="):
addr = arg.split("=")[1]
redis_addresses.add(addr)
proc_addr = arg.split("=")[1]
if address is not None and address != proc_addr:
continue
redis_addresses.add(proc_addr)
except psutil.AccessDenied:
pass
except psutil.NoSuchProcess:
pass
return redis_addresses
def find_redis_address_or_die():
redis_addresses = find_redis_address()
if len(redis_addresses) > 1:
raise ConnectionError(
f"Found multiple active Ray instances: {redis_addresses}. "

View file

@ -178,9 +178,8 @@ def ray_start_object_store_memory(request):
@pytest.fixture
def call_ray_start(request):
parameter = getattr(
request, "param",
"ray start --head --num-cpus=1 --min-worker-port=0 --max-worker-port=0"
)
request, "param", "ray start --head --num-cpus=1 --min-worker-port=0 "
"--max-worker-port=0 --port 0")
command_args = parameter.split(" ")
out = ray.utils.decode(
subprocess.check_output(command_args, stderr=subprocess.STDOUT))

View file

@ -158,9 +158,9 @@ DEFAULT_TEST_CONFIG_PATH = str(
reason=("Mac builds don't provide proper locale support"))
def test_ray_start(configure_lang):
runner = CliRunner()
result = runner.invoke(
scripts.start,
["--head", "--log-style=pretty", "--log-color", "False"])
result = runner.invoke(scripts.start, [
"--head", "--log-style=pretty", "--log-color", "False", "--port", "0"
])
_die_on_error(runner.invoke(scripts.stop))
_check_output_via_pattern("test_ray_start.txt", result)

View file

@ -301,7 +301,7 @@ ray.get([a.log.remote(), f.remote()])
@pytest.mark.parametrize(
"call_ray_start", [
"ray start --head --num-cpus=1 --num-gpus=1 " +
"--min-worker-port=0 --max-worker-port=0"
"--min-worker-port=0 --max-worker-port=0 --port 0"
],
indirect=True)
def test_drivers_release_resources(call_ray_start):
@ -369,26 +369,24 @@ print("success")
def test_calling_start_ray_head(call_ray_stop_only):
# Test that we can call ray start with various command line
# parameters. TODO(rkn): This test only tests the --head code path. We
# should also test the non-head node code path.
# Test starting Ray with no arguments.
check_call_ray(["start", "--head"])
check_call_ray(["stop"])
# Test starting Ray with a redis port specified.
check_call_ray(["start", "--head"])
check_call_ray(["start", "--head", "--port", "0"])
check_call_ray(["stop"])
# Test starting Ray with a node IP address specified.
check_call_ray(["start", "--head", "--node-ip-address", "127.0.0.1"])
check_call_ray(
["start", "--head", "--node-ip-address", "127.0.0.1", "--port", "0"])
check_call_ray(["stop"])
# Test starting Ray with a system config parameter set.
check_call_ray([
"start", "--head", "--system-config",
"{\"metrics_report_interval_ms\":100}"
"{\"metrics_report_interval_ms\":100}", "--port", "0"
])
check_call_ray(["stop"])
@ -396,45 +394,49 @@ def test_calling_start_ray_head(call_ray_stop_only):
# specified.
check_call_ray([
"start", "--head", "--object-manager-port", "12345",
"--node-manager-port", "54321"
"--node-manager-port", "54321", "--port", "0"
])
check_call_ray(["stop"])
# Test starting Ray with the worker port range specified.
check_call_ray([
"start", "--head", "--min-worker-port", "50000", "--max-worker-port",
"51000"
"51000", "--port", "0"
])
check_call_ray(["stop"])
# Test starting Ray with the number of CPUs specified.
check_call_ray(["start", "--head", "--num-cpus", "2"])
check_call_ray(["start", "--head", "--num-cpus", "2", "--port", "0"])
check_call_ray(["stop"])
# Test starting Ray with the number of GPUs specified.
check_call_ray(["start", "--head", "--num-gpus", "100"])
check_call_ray(["start", "--head", "--num-gpus", "100", "--port", "0"])
check_call_ray(["stop"])
# Test starting Ray with redis shard ports specified.
check_call_ray(
["start", "--head", "--redis-shard-ports", "6380,6381,6382"])
check_call_ray([
"start", "--head", "--redis-shard-ports", "6380,6381,6382", "--port",
"0"
])
check_call_ray(["stop"])
# Test starting Ray with all arguments specified.
check_call_ray([
"start", "--head", "--redis-shard-ports", "6380,6381,6382",
"--object-manager-port", "12345", "--num-cpus", "2", "--num-gpus", "0",
"--resources", "{\"Custom\": 1}"
"--resources", "{\"Custom\": 1}", "--port", "0"
])
check_call_ray(["stop"])
# Test starting Ray with invalid arguments.
with pytest.raises(subprocess.CalledProcessError):
check_call_ray(["start", "--head", "--address", "127.0.0.1:6379"])
check_call_ray(
["start", "--head", "--address", "127.0.0.1:6379", "--port", "0"])
check_call_ray(["stop"])
# Test --block. Killing a child process should cause the command to exit.
blocked = subprocess.Popen(["ray", "start", "--head", "--block"])
blocked = subprocess.Popen(
["ray", "start", "--head", "--block", "--port", "0"])
wait_for_children_of_pid(blocked.pid, num_children=7, timeout=30)
@ -447,7 +449,8 @@ def test_calling_start_ray_head(call_ray_stop_only):
assert blocked.returncode != 0, "ray start shouldn't return 0 on bad exit"
# Test --block. Killing the command should clean up all child processes.
blocked = subprocess.Popen(["ray", "start", "--head", "--block"])
blocked = subprocess.Popen(
["ray", "start", "--head", "--block", "--port", "0"])
blocked.poll()
assert blocked.returncode is None

View file

@ -42,7 +42,7 @@ def test_tempdir_commandline():
shutil.rmtree(ray.utils.get_ray_temp_dir(), ignore_errors=True)
check_call_ray([
"start", "--head", "--temp-dir=" + os.path.join(
ray.utils.get_user_temp_dir(), "i_am_a_temp_dir2")
ray.utils.get_user_temp_dir(), "i_am_a_temp_dir2"), "--port", "0"
])
assert os.path.exists(
os.path.join(ray.utils.get_user_temp_dir(),