Revert "[Core][GCS] Use port and address flags to configure GCS server / client in GCS bootstrapping mode (#21115)" (#21157)

This reverts commit 0e7c0b491b.
This commit is contained in:
Chen Shen 2021-12-17 11:48:40 -08:00 committed by GitHub
parent ce81ad21f3
commit d99f699e3d
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 101 additions and 237 deletions

View file

@ -206,9 +206,10 @@ class JobSupervisor:
"runtime_env": self._runtime_env,
"metadata": self._metadata,
})
ray_address = ray._private.services.get_ray_address_to_use_or_die()
os.environ[
ray_constants.RAY_ADDRESS_ENVIRONMENT_VARIABLE] = ray_address
ray_redis_address = ray._private.services.find_redis_address_or_die( # noqa: E501
)
os.environ[ray_constants.
RAY_ADDRESS_ENVIRONMENT_VARIABLE] = ray_redis_address
log_path = self._log_client.get_log_file_path(self._job_id)
child_process = self._exec_entrypoint(log_path)

View file

@ -12,9 +12,10 @@ class RayParams:
"""A class used to store the parameters used by Ray.
Attributes:
bootstrap_address (str): The address of Redis / GCS server to connect
to for bootstrapping. If this address is not provided, then this
command will start the Ray cluster.
external_addresses (str): The address of external Redis server to
connect to, in format of "ip1:port1,ip2:port2,...". If this
address is provided, then ray won't start Redis instances in the
head node but use external Redis server(s) instead.
redis_address (str): The address of the Redis server to connect to. If
this address is not provided, then this command will start Redis, a
raylet, a plasma store, a plasma manager, and some workers.
@ -61,10 +62,6 @@ class RayParams:
processes should be redirected to files.
redirect_output (bool): True if stdout and stderr for non-worker
processes should be redirected to files and false otherwise.
external_addresses (str): The address of external Redis server to
connect to, in format of "ip1:port1,ip2:port2,...". If this
address is provided, then ray won't start Redis instances in the
head node but use external Redis server(s) instead.
num_redis_shards: The number of Redis shards to start in addition to
the primary Redis shard.
redis_max_clients: If provided, attempt to configure Redis with this
@ -125,7 +122,7 @@ class RayParams:
"""
def __init__(self,
bootstrap_address=None,
external_addresses=None,
redis_address=None,
num_cpus=None,
num_gpus=None,
@ -148,7 +145,6 @@ class RayParams:
driver_mode=None,
redirect_worker_output=None,
redirect_output=None,
external_addresses=None,
num_redis_shards=None,
redis_max_clients=None,
redis_password=ray_constants.REDIS_DEFAULT_PASSWORD,
@ -177,7 +173,8 @@ class RayParams:
tracing_startup_hook=None,
no_monitor=False,
env_vars=None):
self.bootstrap_address = bootstrap_address
self.object_ref_seed = object_ref_seed
self.external_addresses = external_addresses
self.redis_address = redis_address
self.num_cpus = num_cpus
self.num_gpus = num_gpus
@ -199,7 +196,6 @@ class RayParams:
self.driver_mode = driver_mode
self.redirect_worker_output = redirect_worker_output
self.redirect_output = redirect_output
self.external_addresses = external_addresses
self.num_redis_shards = num_redis_shards
self.redis_max_clients = redis_max_clients
self.redis_password = redis_password
@ -222,7 +218,6 @@ class RayParams:
self.metrics_export_port = metrics_export_port
self.tracing_startup_hook = tracing_startup_hook
self.no_monitor = no_monitor
self.object_ref_seed = object_ref_seed
self.start_initial_python_workers_for_first_job = (
start_initial_python_workers_for_first_job)
self.ray_debugger_external = ray_debugger_external

View file

@ -178,7 +178,7 @@ def new_port(lower_bound=10000, upper_bound=65535, denylist=None):
return port
def find_redis_address():
def find_redis_address(address=None):
"""
Attempts to find all valid Ray redis addresses on this node.
@ -255,6 +255,8 @@ def find_redis_address():
# TODO(ekl): Find a robust solution for locating Redis.
if arg.startswith("--redis-address="):
proc_addr = arg.split("=")[1]
if address is not None and address != proc_addr:
continue
redis_addresses.add(proc_addr)
except psutil.AccessDenied:
pass
@ -263,11 +265,19 @@ def find_redis_address():
return redis_addresses
def _find_redis_address_or_die():
"""Find one Redis address unambiguously, or raise an error.
Callers outside of this module should use get_ray_address_to_use_or_die()
def get_ray_address_to_use_or_die():
"""
Attempts to find an address for an existing Ray cluster if it is not
already specified as an environment variable.
Returns:
A string to pass into `ray.init(address=...)`
"""
return os.environ.get(ray_constants.RAY_ADDRESS_ENVIRONMENT_VARIABLE,
find_redis_address_or_die())
def find_redis_address_or_die():
redis_addresses = find_redis_address()
if len(redis_addresses) > 1:
raise ConnectionError(
@ -281,123 +291,6 @@ def _find_redis_address_or_die():
return redis_addresses.pop()
def find_gcs_address():
    """Attempt to find all Ray GCS addresses advertised on this node.

    Scans every process visible through ``psutil`` and, for each one whose
    executable name contains ``raylet``, collects the value of every
    ``--gcs_address=<value>`` argument on its command line.

    Returns:
        Set of detected GCS address strings (empty if none were found).
    """
    # Currently, this extracts the --gcs_address flag from the command
    # that launched the raylet running on this node, if any. Anyone looking to
    # edit this function should be warned that these commands look like, for
    # example:
    # /usr/local/lib/python3.8/dist-packages/ray/core/src/ray/raylet/raylet
    # --gcs_address=123.456.78.910:6379 --node_ip_address=123.456.78.910
    # --raylet_socket_name=... --store_socket_name=... --object_manager_port=0
    # --min_worker_port=10000 --max_worker_port=19999
    # --node_manager_port=58578 --maximum_startup_concurrency=8
    # --static_resource_list=node:123.456.78.910,1.0,object_store_memory,66
    # --config_list=plasma_store_as_thread,True
    # --python_worker_command=/usr/bin/python
    #     /usr/local/lib/python3.8/dist-packages/ray/workers/default_worker.py
    #     --gcs-address=123.456.78.910:6379
    #     --node-ip-address=123.456.78.910 --node-manager-port=58578
    #     --object-store-name=... --raylet-name=...
    #     --temp-dir=/tmp/ray
    #     --metrics-agent-port=41856
    # --java_worker_command= --cpp_worker_command=
    # --temp_dir=/tmp/ray --session_dir=...
    # --metrics-agent-port=41856 --metrics_export_port=64229
    # --agent_command=/usr/bin/python
    #     -u /usr/local/lib/python3.8/dist-packages/ray/dashboard/agent.py
    #     --gcs-address=123.456.78.910:6379 --metrics-export-port=64229
    #     --dashboard-agent-port=41856 --node-manager-port=58578
    #     --object-store-name=... --raylet-name=... --temp-dir=/tmp/ray
    #     --log-dir=/tmp/ray/session_2020-11-08_14-29-07_199128_278000/logs
    # --object_store_memory=5037192806 --plasma_directory=/tmp
    # Longer arguments are elided with ... but all arguments from this instance
    # are included, to provide a sense of what is in these.
    # Indeed, we had to pull --gcs_address to the front of each call to make
    # this readable.
    # As you can see, this is very long and complex, which is why we can't
    # simply extract all the arguments using regular expressions and
    # present a dict as if we never lost track of these arguments, for
    # example. Picking out --gcs_address below looks like it might grab the
    # wrong thing, but double-checking that we're finding the correct process
    # by checking that the contents look like we expect would probably be prone
    # to choking in unexpected ways.
    flag_prefix = "--gcs_address="
    gcs_addresses = set()
    for pid in psutil.pids():
        try:
            # HACK: Workaround for UNIX idiosyncrasy
            # Normally, cmdline() is supposed to return the argument list.
            # But it in some cases (such as when setproctitle is called),
            # an arbitrary string resembling a command-line is stored in
            # the first argument.
            # Explanation: https://unix.stackexchange.com/a/432681
            # More info: https://github.com/giampaolo/psutil/issues/1179
            cmdline = psutil.Process(pid).cmdline()
        except (psutil.AccessDenied, psutil.NoSuchProcess):
            # Best effort: processes we may not inspect, or that exited
            # between listing and inspection, are simply skipped.
            continue

        # NOTE(kfstorm): To support Windows, we can't use
        # `os.path.basename(cmdline[0]) == "raylet"` here.
        if not cmdline or "raylet" not in os.path.basename(cmdline[0]):
            continue

        for arglist in cmdline:
            # Given we're merely seeking --gcs_address, we just split
            # every argument on spaces for now.
            for arg in arglist.split(" "):
                # TODO(ekl): Find a robust solution for locating GCS.
                if arg.startswith(flag_prefix):
                    # Split only on the first "=" so that a value which
                    # itself contains "=" is kept intact.
                    gcs_addresses.add(arg.split("=", 1)[1])
    return gcs_addresses
def _find_gcs_address_or_die():
    """Find one GCS address unambiguously, or raise an error.

    Callers outside of this module should use get_ray_address_to_use_or_die()

    Returns:
        The single GCS address reported by find_gcs_address().

    Raises:
        ConnectionError: If find_gcs_address() reports more than one
            address, or none at all.
    """
    gcs_addresses = find_gcs_address()
    if len(gcs_addresses) > 1:
        # The raise is the only exit from this branch; the former
        # `sys.exit(1)` that followed it could never execute.
        raise ConnectionError(
            f"Found multiple active Ray instances: {gcs_addresses}. "
            "Please specify the one to connect to by setting `--address` flag "
            "or `RAY_ADDRESS` environment variable.")
    if not gcs_addresses:
        raise ConnectionError(
            "Could not find any running Ray instance. "
            "Please specify the one to connect to by setting `--address` flag "
            "or `RAY_ADDRESS` environment variable.")
    return gcs_addresses.pop()
def bootstrap_with_gcs() -> bool:
    """Report whether the `RAY_bootstrap_with_gcs` environment flag is on.

    The flag counts as off when the variable is unset or is exactly
    "0" or "false"; any other value (including the empty string) is on.
    """
    flag = os.environ.get("RAY_bootstrap_with_gcs")
    if flag is None:
        return False
    return flag != "0" and flag != "false"
def get_ray_address_to_use_or_die():
    """
    Attempts to find an address for an existing Ray cluster if it is not
    already specified as an environment variable.

    The environment variable is consulted first; the process scan only
    runs when it is unset. (Passing the scan as the default argument of
    `os.environ.get` would evaluate it eagerly, so it could raise even
    though an address had been given explicitly.)

    Returns:
        A string to pass into `ray.init(address=...)`

    Raises:
        ConnectionError: If the environment variable is unset and the
            lookup helper cannot find exactly one address.
    """
    address = os.environ.get(ray_constants.RAY_ADDRESS_ENVIRONMENT_VARIABLE)
    if address is not None:
        return address
    if bootstrap_with_gcs():
        return _find_gcs_address_or_die()
    return _find_redis_address_or_die()
def wait_for_node(redis_address,
node_plasma_store_socket_name,
redis_password=None,
@ -474,33 +367,32 @@ def remaining_processes_alive():
return ray.worker._global_node.remaining_processes_alive()
def validate_bootstrap_address(addr):
"""Validates address parameter, and extract the host and IP.
def validate_redis_address(address):
"""Validates address parameter.
Returns:
bootstrap_address: string containing the full <host:port> address for
bootstrapping.
ip: string representing the host portion of the address.
port: integer representing the port portion of the address.
redis_address: string containing the full <host:port> address.
redis_ip: string representing the host portion of the address.
redis_port: integer representing the port portion of the address.
"""
if addr == "auto":
addr = get_ray_address_to_use_or_die()
bootstrap_address = address_to_ip(addr)
if address == "auto":
address = find_redis_address_or_die()
redis_address = address_to_ip(address)
address_parts = bootstrap_address.split(":")
if len(address_parts) != 2:
redis_address_parts = redis_address.split(":")
if len(redis_address_parts) != 2:
raise ValueError("Malformed address. Expected '<host>:<port>'.")
ip = address_parts[0]
redis_ip = redis_address_parts[0]
try:
port = int(address_parts[1])
redis_port = int(redis_address_parts[1])
except ValueError:
raise ValueError("Malformed address port. Must be an integer.")
if port < 1024 or port > 65535:
raise ValueError("Invalid address port. Must be between 1024 and "
"65535 (inclusive).")
if redis_port < 1024 or redis_port > 65535:
raise ValueError("Invalid address port. Must "
"be between 1024 and 65535.")
return bootstrap_address, ip, port
return redis_address, redis_ip, redis_port
def address_to_ip(address):
@ -589,7 +481,7 @@ def create_redis_client(redis_address, password=None):
except Exception:
create_redis_client.instances.pop(redis_address)
_, redis_ip_address, redis_port = validate_bootstrap_address(redis_address)
_, redis_ip_address, redis_port = validate_redis_address(redis_address)
# For this command to work, some other client (on the same machine
# as Redis) must have run "CONFIG SET protected-mode no".
create_redis_client.instances[redis_address] = redis.StrictRedis(

View file

@ -27,8 +27,10 @@ def memory_summary(address=None,
stats_only=False,
num_entries=None):
from ray.dashboard.memory_utils import memory_summary
if not address or address == "auto":
if not address:
address = services.get_ray_address_to_use_or_die()
if address == "auto":
address = services.find_redis_address_or_die()
state = GlobalState()
state._initialize_global_state(

View file

@ -469,11 +469,9 @@ def start(node_ip_address, address, port, redis_password, redis_shard_ports,
metrics_export_port, no_monitor, tracing_startup_hook,
ray_debugger_external):
"""Start Ray processes manually on the local machine."""
if services.bootstrap_with_gcs() and gcs_server_port is not None:
cli_logger.abort("`{}` is deprecated. Specify {} instead.",
cf.bold("--gcs-server-port"), cf.bold("--port"))
if gcs_server_port and not head:
raise ValueError(
"`--gcs-server-port` is deprecated. Specify `--port` instead.")
"gcs_server_port can be only assigned when you specify --head.")
# Convert hostnames to numerical IP address.
if node_ip_address is not None:
@ -513,7 +511,6 @@ def start(node_ip_address, address, port, redis_password, redis_shard_ports,
num_cpus=num_cpus,
num_gpus=num_gpus,
resources=resources,
autoscaling_config=autoscaling_config,
plasma_directory=plasma_directory,
huge_pages=False,
plasma_store_socket_name=plasma_store_socket_name,
@ -530,23 +527,16 @@ def start(node_ip_address, address, port, redis_password, redis_shard_ports,
no_monitor=no_monitor,
tracing_startup_hook=tracing_startup_hook,
ray_debugger_external=ray_debugger_external)
if head:
# Start head node.
# Use default if port is none, allocate an available port if port is 0
if port is None:
port = ray_constants.DEFAULT_PORT
elif port == 0:
if port == 0:
with socket() as s:
s.bind(("", 0))
port = s.getsockname()[1]
# Override GCS port to `--port`.
if services.bootstrap_with_gcs():
assert ray_params.gcs_server_port is None
ray_params.gcs_server_port = port
if os.environ.get("RAY_FAKE_CLUSTER"):
ray_params.env_vars = {
"RAY_OVERRIDE_NODE_ID_FOR_TESTING": FAKE_HEAD_NODE_ID
@ -598,26 +588,24 @@ def start(node_ip_address, address, port, redis_password, redis_shard_ports,
ray_params.update_if_absent(
node_ip_address=services.get_node_ip_address())
cli_logger.labeled_value("Local node IP", ray_params.node_ip_address)
# Initialize Redis settings.
ray_params.update_if_absent(
redis_port=port,
redis_shard_ports=redis_shard_ports,
redis_max_memory=redis_max_memory,
num_redis_shards=num_redis_shards,
redis_max_clients=None,
autoscaling_config=autoscaling_config,
)
# Fail early when starting a new cluster when one is already running
if address is None:
default_address = f"{ray_params.node_ip_address}:{port}"
redis_addresses = services.find_redis_address()
if default_address in redis_addresses:
redis_addresses = services.find_redis_address(default_address)
if len(redis_addresses) > 0:
raise ConnectionError(
f"Ray is trying to start at {default_address}, "
f"but is already running at {redis_addresses}. "
"Please specify a different port using the `--port`"
" command to `ray start`.")
f"Ray is already running at {default_address}. "
f"Please specify a different port using the `--port`"
f" command to `ray start`.")
node = ray.node.Node(
ray_params, head=True, shutdown_at_exit=block, spawn_reaper=block)
@ -682,68 +670,57 @@ def start(node_ip_address, address, port, redis_password, redis_shard_ports,
cli_logger.print("To terminate the Ray runtime, run")
cli_logger.print(cf.bold(" ray stop"))
else:
# Start worker node.
# Ensure `--address` flag is specified.
if address is None:
cli_logger.abort(
"`{}` is a required flag unless starting a head "
"node with `{}`.", cf.bold("--address"), cf.bold("--head"))
raise Exception("`--address` is a required flag unless starting a "
"head node with `--head`.")
# Raise error if any head-only flag are specified.
head_only_flags = {
"--port": port,
"--redis-shard-ports": redis_shard_ports,
"--include-dashboard": include_dashboard
}
for flag, val in head_only_flags.items():
if val is None:
continue
cli_logger.abort(
"`{}` should only be specified when starting head "
"node with `{}`.", cf.bold(flag), cf.bold("--head"))
raise ValueError(
f"{flag} should only be specified when starting head node "
"with `--head`.")
# Start Ray on a non-head node.
bootstrap_address, address_ip, address_port = \
services.validate_bootstrap_address(address)
redis_address = None
if address is not None:
(redis_address, redis_address_ip,
redis_address_port) = services.validate_redis_address(address)
if not (port is None):
cli_logger.abort("`{}` should not be specified without `{}`.",
cf.bold("--port"), cf.bold("--head"))
if bootstrap_address is None:
cli_logger.abort("Cannot extract host IP and port from `{}={}`.",
cf.bold("--address"), cf.bold(address))
raise Exception("Cannot extract host IP and port from "
f"`--address={address}`.")
ray_params.update(bootstrap_address=bootstrap_address)
raise Exception("If --head is not passed in, --port is not "
"allowed.")
if redis_shard_ports is not None:
cli_logger.abort("`{}` should not be specified without `{}`.",
cf.bold("--redis-shard-ports"), cf.bold("--head"))
if services.bootstrap_with_gcs():
raise NotImplementedError(
"Check version info via GCS is not implemented.")
else:
# Wait for the Redis server to be started. And throw an exception
# if we can't connect to it.
services.wait_for_redis_to_start(
address_ip, address_port, password=redis_password)
raise Exception("If --head is not passed in, --redis-shard-ports "
"is not allowed.")
if redis_address is None:
cli_logger.abort("`{}` is required unless starting with `{}`.",
cf.bold("--address"), cf.bold("--head"))
# Create a Redis client.
redis_client = services.create_redis_client(
address_ip, password=redis_password)
raise Exception("If --head is not passed in, --address must "
"be provided.")
if include_dashboard:
cli_logger.abort("`{}` should not be specified without `{}`.",
cf.bold("--include-dashboard"), cf.bold("--head"))
# Check that the version information on this node matches the
# version information that the cluster was started with.
services.check_version_info(redis_client)
raise ValueError(
"If --head is not passed in, the --include-dashboard"
"flag is not relevant.")
ray_params.update(redis_address=bootstrap_address)
# Wait for the Redis server to be started. And throw an exception if we
# can't connect to it.
services.wait_for_redis_to_start(
redis_address_ip, redis_address_port, password=redis_password)
# Create a Redis client.
redis_client = services.create_redis_client(
redis_address, password=redis_password)
# Check that the version information on this node matches the version
# information that the cluster was started with.
services.check_version_info(redis_client)
# Get the node IP address if one is not provided.
ray_params.update_if_absent(
node_ip_address=services.get_node_ip_address(address_ip))
node_ip_address=services.get_node_ip_address(redis_address))
cli_logger.labeled_value("Local node IP", ray_params.node_ip_address)
ray_params.update(redis_address=redis_address)
node = ray.node.Node(
ray_params, head=False, shutdown_at_exit=block, spawn_reaper=block)

View file

@ -841,18 +841,16 @@ def init(
raylet_ip_address = node_ip_address
if address:
bootstrap_address, _, _ = services.validate_bootstrap_address(address)
redis_address, _, _ = services.validate_redis_address(address)
else:
bootstrap_address = None
redis_address = None if services.bootstrap_with_gcs(
) else bootstrap_address
redis_address = None
if configure_logging:
setup_logger(logging_level, logging_format)
if bootstrap_address is not None:
logger.info("Connecting to existing Ray cluster at address: "
f"{bootstrap_address}")
if redis_address is not None:
logger.info(
f"Connecting to existing Ray cluster at address: {redis_address}")
if local_mode:
driver_mode = LOCAL_MODE
@ -875,10 +873,9 @@ def init(
raise TypeError("The _system_config must be a dict.")
global _global_node
if bootstrap_address is None:
if redis_address is None:
# In this case, we need to start a new cluster.
ray_params = ray._private.parameter.RayParams(
bootstrap_address=bootstrap_address,
redis_address=redis_address,
node_ip_address=node_ip_address,
raylet_ip_address=raylet_ip_address,