[GCS][Bootstrap n/n] Do not start Redis in GCS bootstrapping mode (#21232)

After this change, Redis no longer starts in GCS bootstrapping mode, and `address` is treated as the GCS address of the Ray cluster.

Co-authored-by: Yi Cheng <chengyidna@gmail.com>
Co-authored-by: Yi Cheng <74173148+iycheng@users.noreply.github.com>
This commit is contained in:
mwtian 2022-01-04 23:06:44 -08:00 committed by GitHub
parent 8884cf0f4f
commit 70db5c5592
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
14 changed files with 153 additions and 102 deletions

View file

@ -325,16 +325,10 @@
- cleanup() { if [ "${BUILDKITE_PULL_REQUEST}" = "false" ]; then ./ci/travis/upload_build_info.sh; fi }; trap cleanup EXIT
- TORCH_VERSION=1.6 ./ci/travis/install-dependencies.sh
- ./dashboard/tests/run_ui_tests.sh
# Enable job related tests after ray.init can accept gcs address
- bazel test --config=ci $(./scripts/bazel_export_options)
--test_env=RAY_gcs_grpc_based_pubsub=1
--test_env=RAY_bootstrap_with_gcs=1
--test_env=RAY_gcs_storage=memory -- //python/ray/dashboard/...
-//python/ray/dashboard:test_actor
-//python/ray/dashboard:test_job_manager
-//python/ray/dashboard:test_cli_integration
-//python/ray/dashboard:test_http_job_server
-//python/ray/dashboard:test_job_submission
--test_env=RAY_gcs_storage=memory -- //python/ray/dashboard/... -//python/ray/dashboard:test_job_manager
- bazel test --config=ci $(./scripts/bazel_export_options)
--test_tag_filters=-post_wheel_build
--test_env=RAY_gcs_grpc_based_pubsub=1

View file

@ -32,7 +32,7 @@ class Pinger:
a = Pinger.options(lifetime={lifetime}, name={name}).remote()
ray.get(a.ping.remote())
"""
address = ray_start_with_dashboard["redis_address"]
address = ray_start_with_dashboard["address"]
detached_driver = driver_template.format(
address=address, lifetime="'detached'", name="'abc'")
named_driver = driver_template.format(
@ -84,7 +84,7 @@ import ray
from ray import serve
ray.init(
address="{ray_start_with_dashboard['redis_address']}",
address="{ray_start_with_dashboard['address']}",
namespace="serve")
serve.start(detached=True)

View file

@ -270,7 +270,10 @@ def _find_address_from_flag(flag: str):
# TODO(ekl): Find a robust solution for locating Redis.
if arg.startswith(flag):
proc_addr = arg.split("=")[1]
addresses.add(proc_addr)
# TODO(mwtian): remove this workaround after Ray
# no longer sets --redis-address to None.
if proc_addr != "" and proc_addr != "None":
addresses.add(proc_addr)
except psutil.AccessDenied:
pass
except psutil.NoSuchProcess:
@ -282,9 +285,15 @@ def find_redis_address():
return _find_address_from_flag("--redis-address")
def find_gcs_address():
    """Look up GCS addresses via the ``--gcs-address`` flag.

    Returns whatever ``_find_address_from_flag`` yields for that flag.
    """
    gcs_flag = "--gcs-address"
    return _find_address_from_flag(gcs_flag)
def find_bootstrap_address():
# TODO(mwtian): add find_gcs_address()
return find_redis_address()
if use_gcs_for_bootstrap():
return find_gcs_address()
else:
return find_redis_address()
def _find_redis_address_or_die():
@ -306,6 +315,26 @@ def _find_redis_address_or_die():
return redis_addresses.pop()
def _find_gcs_address_or_die():
    """Find one GCS address unambiguously, or raise an error.

    Callers outside of this module should use get_ray_address_to_use_or_die()

    Returns:
        The single address found via the ``--gcs-address`` flag.

    Raises:
        ConnectionError: If no address, or more than one address, is found.
    """
    gcs_addresses = _find_address_from_flag("--gcs-address")
    # Ambiguous: refuse to guess which instance the caller meant.
    if len(gcs_addresses) > 1:
        raise ConnectionError(
            f"Found multiple active Ray instances: {gcs_addresses}. "
            "Please specify the one to connect to by setting `--address` flag "
            "or `RAY_ADDRESS` environment variable.")
    if not gcs_addresses:
        raise ConnectionError(
            "Could not find any running Ray instance. "
            "Please specify the one to connect to by setting `--address` flag "
            "or `RAY_ADDRESS` environment variable.")
    return gcs_addresses.pop()
def get_ray_address_from_environment():
"""
Attempts to find the address of Ray cluster to use, first from
@ -316,8 +345,10 @@ def get_ray_address_from_environment():
"""
addr = os.environ.get(ray_constants.RAY_ADDRESS_ENVIRONMENT_VARIABLE)
if addr is None or addr == "auto":
# TODO(mwtian): support _find_gcs_address_or_die()
addr = _find_redis_address_or_die()
if use_gcs_for_bootstrap():
addr = _find_gcs_address_or_die()
else:
addr = _find_redis_address_or_die()
return addr
@ -424,13 +455,12 @@ def canonicalize_bootstrap_address(addr: str):
def extract_ip_port(bootstrap_address: str):
address_parts = bootstrap_address.split(":")
if len(address_parts) != 2:
if ":" not in bootstrap_address:
raise ValueError(f"Malformed address {bootstrap_address}. "
"Expected '<host>:<port>'.")
ip = address_parts[0]
f"Expected '<host>:<port>'.")
ip, _, port = bootstrap_address.rpartition(":")
try:
port = int(address_parts[1])
port = int(port)
except ValueError:
raise ValueError(f"Malformed address port {port}. Must be an integer.")
if port < 1024 or port > 65535:
@ -453,6 +483,8 @@ def address_to_ip(address: str):
The same address but with the hostname replaced by a numerical IP
address.
"""
if not address:
raise ValueError(f"Malformed address: {address}")
address_parts = address.split(":")
ip_address = socket.gethostbyname(address_parts[0])
# Make sure localhost isn't resolved to the loopback ip
@ -1519,9 +1551,6 @@ def start_raylet(redis_address,
resource_argument = ",".join(
["{},{}".format(*kv) for kv in static_resources.items()])
# TODO (iycheng): remove redis_ip_address after redis removal
redis_ip_address, redis_port = redis_address.split(":")
has_java_command = False
if shutil.which("java") is not None:
has_java_command = True
@ -1621,15 +1650,12 @@ def start_raylet(redis_address,
command = [
RAYLET_EXECUTABLE,
f"--raylet_socket_name={raylet_name}",
f"--gcs-address={gcs_address}",
f"--store_socket_name={plasma_store_name}",
f"--object_manager_port={object_manager_port}",
f"--min_worker_port={min_worker_port}",
f"--max_worker_port={max_worker_port}",
f"--node_manager_port={node_manager_port}",
f"--node_ip_address={node_ip_address}",
f"--redis_address={redis_ip_address}",
f"--redis_port={redis_port}",
f"--maximum_startup_concurrency={maximum_startup_concurrency}",
f"--static_resource_list={resource_argument}",
f"--python_worker_command={subprocess.list2cmdline(start_worker_command)}", # noqa
@ -1647,6 +1673,15 @@ def start_raylet(redis_address,
f"--plasma_directory={plasma_directory}",
f"--ray-debugger-external={1 if ray_debugger_external else 0}",
]
if use_gcs_for_bootstrap():
command.append(f"--gcs-address={gcs_address}")
else:
# TODO (iycheng): remove redis_ip_address after redis removal
redis_ip_address, redis_port = redis_address.split(":")
command.extend([
f"--redis_address={redis_ip_address}", f"--redis_port={redis_port}"
])
if worker_port_list is not None:
command.append(f"--worker_port_list={worker_port_list}")
if start_initial_python_workers_for_first_job:

View file

@ -96,6 +96,8 @@ class FakeMultiNodeProvider(NodeProvider):
resources=resources,
redis_address="{}:6379".format(
ray._private.services.get_node_ip_address()),
gcs_address="{}:6379".format(
ray._private.services.get_node_ip_address()),
env_vars={
"RAY_OVERRIDE_NODE_ID_FOR_TESTING": next_id,
"RAY_OVERRIDE_RESOURCES": json.dumps(resources),

View file

@ -142,8 +142,10 @@ class Cluster:
@property
def address(self):
# TODO(mwtian): use self.gcs_address when use_gcs_for_bootstrap():
return self.redis_address
if use_gcs_for_bootstrap():
return self.gcs_address
else:
return self.redis_address
def connect(self, namespace=None):
"""Connect the driver to the cluster."""

View file

@ -5,6 +5,7 @@ import ray._private.profiling as profiling
import ray._private.utils as utils
from ray import ray_constants
from ray.state import GlobalState
from ray._private.gcs_utils import use_gcs_for_bootstrap
from ray._raylet import GcsClientOptions
__all__ = ["free", "global_gc"]
@ -30,8 +31,11 @@ def memory_summary(address=None,
address = services.canonicalize_bootstrap_address(address)
state = GlobalState()
state._initialize_global_state(
GcsClientOptions.from_redis_address(address, redis_password))
if use_gcs_for_bootstrap():
options = GcsClientOptions.from_gcs_address(address)
else:
options = GcsClientOptions.from_redis_address(address, redis_password)
state._initialize_global_state(options)
if stats_only:
return get_store_stats(state)
return (memory_summary(state, group_by, sort_by, line_wrap, units,

View file

@ -144,10 +144,6 @@ class Node:
self._gcs_client = None
if not self.head:
# TODO(mwtian): remove after supporting bootstrapping with GCS
# address.
if self._redis_address and not self._gcs_address:
self._gcs_address = self._get_gcs_address_from_redis()
self.validate_ip_port(self.address)
self.get_gcs_client()
@ -459,6 +455,8 @@ class Node:
`ray start` or `ray.init()` to start worker nodes, that has been
converted to ip:port format.
"""
if use_gcs_for_bootstrap():
return self._gcs_address
return self._redis_address
@property
@ -1042,9 +1040,9 @@ class Node:
assert self._gcs_client is None
# If this is the head node, start the relevant head node processes.
# TODO(mwtian): guard with `if not use_gcs_for_bootstrap():`
self.start_redis()
assert self._redis_address is not None
if not use_gcs_for_bootstrap():
self.start_redis()
assert self._redis_address is not None
self.start_gcs_server()
assert self._gcs_client is not None

View file

@ -527,18 +527,18 @@ def start(node_ip_address, address, port, redis_password, redis_shard_ports,
if port is None:
port = ray_constants.DEFAULT_PORT
# TODO(mwtian): use a more robust mechanism to avoid collision,
# e.g. node._get_cached_port()
if port == 0:
with socket() as s:
s.bind(("", 0))
port = s.getsockname()[1]
# Set bootstrap port.
# TODO(mwtian): set bootstrap port as GCS server port.
ray_params.redis_port = port
if not use_gcs_for_bootstrap():
assert ray_params.redis_port is None
assert ray_params.gcs_server_port is None
if use_gcs_for_bootstrap():
ray_params.gcs_server_port = port
else:
if port == 0:
with socket() as s:
s.bind(("", 0))
port = s.getsockname()[1]
ray_params.redis_port = port
ray_params.gcs_server_port = gcs_server_port
if os.environ.get("RAY_FAKE_CLUSTER"):
@ -715,10 +715,10 @@ def start(node_ip_address, address, port, redis_password, redis_shard_ports,
raise Exception("Cannot canonicalize address "
f"`--address={address}`.")
# TODO(mwtian): set ray_params.gcs_address in GCS bootstrap mode.
ray_params.redis_address = bootstrap_address
if not use_gcs_for_bootstrap():
if use_gcs_for_bootstrap():
ray_params.gcs_address = bootstrap_address
else:
ray_params.redis_address = bootstrap_address
address_ip, address_port = services.extract_ip_port(
bootstrap_address)
# Wait for the Redis server to be started. And throw an exception
@ -1485,10 +1485,13 @@ def memory(address, redis_password, group_by, sort_by, units, no_format,
def status(address, redis_password):
"""Print cluster status, including autoscaling info."""
address = services.canonicalize_bootstrap_address(address)
redis_client = ray._private.services.create_redis_client(
address, redis_password)
gcs_client = ray._private.gcs_utils.GcsClient.create_from_redis(
redis_client)
if use_gcs_for_bootstrap():
gcs_client = ray._private.gcs_utils.GcsClient(address=address)
else:
redis_client = ray._private.services.create_redis_client(
address, redis_password)
gcs_client = ray._private.gcs_utils.GcsClient.create_from_redis(
redis_client)
ray.experimental.internal_kv._initialize_internal_kv(gcs_client)
status = ray.experimental.internal_kv._internal_kv_get(
DEBUG_AUTOSCALING_STATUS)
@ -1737,17 +1740,19 @@ def healthcheck(address, redis_password, component):
"""
address = services.canonicalize_bootstrap_address(address)
redis_client = ray._private.services.create_redis_client(
address, redis_password)
if use_gcs_for_bootstrap():
gcs_address = address
else:
# If client creation or ping fails, this will exit with a non-zero
# exit code.
redis_client = ray._private.services.create_redis_client(
address, redis_password)
redis_client.ping()
gcs_address = redis_client.get("GcsServerAddress").decode()
if not component:
# If no component is specified, we are health checking the core. If
# client creation or ping fails, we will still exit with a non-zero
# exit code.
redis_client.ping()
try:
# TODO: add feature to ray._private.GcsClient to share channel
gcs_address = redis_client.get("GcsServerAddress").decode()
options = (("grpc.enable_http_proxy", 0), )
channel = ray._private.utils.init_grpc_channel(
gcs_address, options)
@ -1760,8 +1765,8 @@ def healthcheck(address, redis_password, component):
except Exception:
pass
sys.exit(1)
gcs_client = ray._private.gcs_utils.GcsClient.create_from_redis(
redis_client)
gcs_client = ray._private.gcs_utils.GcsClient(address=gcs_address)
ray.experimental.internal_kv._initialize_internal_kv(gcs_client)
report_str = ray.experimental.internal_kv._internal_kv_get(
component, namespace=ray_constants.KV_NAMESPACE_HEALTHCHECK)

View file

@ -38,7 +38,7 @@ from testfixtures.popen import MockPopen, PopenBehaviour
import ray
import ray.autoscaler._private.aws.config as aws_config
from ray.cluster_utils import Cluster, cluster_not_supported
from ray.cluster_utils import cluster_not_supported
import ray.scripts.scripts as scripts
from ray._private.test_utils import wait_for_condition
@ -467,7 +467,7 @@ def test_ray_submit(configure_lang, configure_aws, _unlink_test_ssh_key):
_check_output_via_pattern("test_ray_submit.txt", result)
def test_ray_status():
def test_ray_status(shutdown_only, monkeypatch):
import ray
address = ray.init(num_cpus=3).get("address")
runner = CliRunner()
@ -475,7 +475,10 @@ def test_ray_status():
def output_ready():
result = runner.invoke(scripts.status)
result.stdout
return not result.exception and "memory" in result.output
if not result.exception and "memory" in result.output:
return True
raise RuntimeError(f"result.exception={result.exception} "
f"result.output={result.output}")
wait_for_condition(output_ready)
@ -486,18 +489,18 @@ def test_ray_status():
_check_output_via_pattern("test_ray_status.txt", result_arg)
# Try to check status with RAY_ADDRESS set
os.environ["RAY_ADDRESS"] = address
monkeypatch.setenv("RAY_ADDRESS", address)
result_env = runner.invoke(scripts.status)
_check_output_via_pattern("test_ray_status.txt", result_env)
result_env_arg = runner.invoke(scripts.status, ["--address", address])
_check_output_via_pattern("test_ray_status.txt", result_env_arg)
ray.shutdown()
@pytest.mark.xfail(cluster_not_supported, reason="cluster not supported")
def test_ray_status_multinode():
cluster = Cluster()
@pytest.mark.xfail(
cluster_not_supported, reason="cluster not supported on Windows")
def test_ray_status_multinode(ray_start_cluster):
cluster = ray_start_cluster
for _ in range(4):
cluster.add_node(num_cpus=2)
runner = CliRunner()
@ -514,8 +517,6 @@ def test_ray_status_multinode():
result = runner.invoke(scripts.status, [])
_check_output_via_pattern("test_ray_status_multinode.txt", result)
ray.shutdown()
cluster.shutdown()
@pytest.mark.skipif(

View file

@ -156,7 +156,7 @@ while True:
p4 = run_string_as_driver_nonblocking(blocking_noaddr_script)
wait_for_condition(
lambda: len(ray._private.services.find_redis_address()) == 4,
lambda: len(ray._private.services.find_bootstrap_address()) == 4,
retry_interval_ms=1000)
p1.kill()
@ -174,13 +174,13 @@ while True:
run_string_as_driver("""
import ray
ray.client().connect()
assert len(ray._private.services.find_redis_address()) == 1
assert len(ray._private.services.find_bootstrap_address()) == 1
""")
# ray.client("local").connect() should always create a new cluster even if
# there's one running.
p1 = run_string_as_driver_nonblocking(blocking_local_script)
wait_for_condition(
lambda: len(ray._private.services.find_redis_address()) == 2,
lambda: len(ray._private.services.find_bootstrap_address()) == 2,
retry_interval_ms=1000)
p1.kill()
subprocess.check_output("ray stop --force", shell=True)

View file

@ -29,7 +29,7 @@ from ray._private.parameter import RayParams
from ray._private.runtime_env.context import RuntimeEnvContext
from ray._private.services import ProcessInfo, start_ray_client_server
from ray._private.tls_utils import add_port_to_grpc_server
from ray._private.gcs_utils import GcsClient
from ray._private.gcs_utils import GcsClient, use_gcs_for_bootstrap
from ray._private.utils import (detect_fate_sharing_support,
check_dashboard_dependencies_installed)
@ -172,10 +172,12 @@ class ProxyManager():
"""
if self._node:
return self._node
ray_params = RayParams(redis_address=self.address)
if self._redis_password:
ray_params.redis_password = self._redis_password
if use_gcs_for_bootstrap():
ray_params = RayParams(gcs_address=self.address)
else:
ray_params = RayParams(redis_address=self.address)
if self._redis_password:
ray_params.redis_password = self._redis_password
self._node = ray.node.Node(
ray_params,
@ -746,10 +748,15 @@ def serve_proxier(connection_str: str,
# before calling ray.init within the RayletServicers.
# NOTE(edoakes): redis_address and redis_password should only be None in
# tests.
if address is not None and redis_password is not None:
gcs_cli = GcsClient.connect_to_gcs_by_redis_address(
address, redis_password)
ray.experimental.internal_kv._initialize_internal_kv(gcs_cli)
if use_gcs_for_bootstrap():
if address is not None:
gcs_cli = GcsClient(address=address)
ray.experimental.internal_kv._initialize_internal_kv(gcs_cli)
else:
if address is not None and redis_password is not None:
gcs_cli = GcsClient.connect_to_gcs_by_redis_address(
address, redis_password)
ray.experimental.internal_kv._initialize_internal_kv(gcs_cli)
server = grpc.server(
futures.ThreadPoolExecutor(max_workers=CLIENT_SERVER_MAX_THREADS),

View file

@ -36,6 +36,7 @@ from ray.util.client.server.server_stubs import current_server
from ray.ray_constants import env_integer
from ray._private.client_mode_hook import disable_client_hook
from ray._private.tls_utils import add_port_to_grpc_server
from ray._private.gcs_utils import use_gcs_for_bootstrap
logger = logging.getLogger(__name__)
@ -812,11 +813,14 @@ def main():
try:
if not ray.experimental.internal_kv._internal_kv_initialized():
# TODO(mwtian): bootstrap with GCS address
redis_client = try_create_redis_client(
args.address, args.redis_password)
gcs_client = (ray._private.gcs_utils.GcsClient.
create_from_redis(redis_client))
if use_gcs_for_bootstrap():
gcs_client = ray._private.gcs_utils.GcsClient(
address=args.address)
else:
redis_client = try_create_redis_client(
args.address, args.redis_password)
gcs_client = (ray._private.gcs_utils.GcsClient.
create_from_redis(redis_client))
ray.experimental.internal_kv._initialize_internal_kv(
gcs_client)
ray.experimental.internal_kv._internal_kv_put(

View file

@ -844,10 +844,12 @@ def init(
if address:
bootstrap_address = services.canonicalize_bootstrap_address(address)
assert bootstrap_address is not None
# TODO(mwtian): bootstrap with GCS address
redis_address = bootstrap_address
logger.info("Connecting to existing Ray cluster at address: "
f"{bootstrap_address}")
if gcs_utils.use_gcs_for_bootstrap():
gcs_address = bootstrap_address
else:
redis_address = bootstrap_address
if configure_logging:
setup_logger(logging_level, logging_format)
@ -1369,8 +1371,8 @@ def connect(node,
# The Redis client can safely be shared between threads. However,
# that is not true of Redis pubsub clients. See the documentation at
# https://github.com/andymccurdy/redis-py#thread-safety.
# TODO(mwtian): do not create Redis client when bootstrapping with GCS
worker.redis_client = node.create_redis_client()
if not gcs_utils.use_gcs_for_bootstrap():
worker.redis_client = node.create_redis_client()
worker.gcs_client = node.get_gcs_client()
assert worker.gcs_client is not None
_initialize_internal_kv(worker.gcs_client)

View file

@ -169,13 +169,10 @@ void GcsServer::DoStart(const GcsInitData &gcs_init_data) {
// data.
rpc_server_.Run();
// Store gcs rpc server address in redis.
// TODO (iycheng): Don't write once redis removal is done
// if (!RayConfig::instance().bootstrap_with_gcs()) {
// // Store gcs rpc server address in redis.
// StoreGcsServerAddressInRedis();
// }
StoreGcsServerAddressInRedis();
if (!RayConfig::instance().bootstrap_with_gcs()) {
StoreGcsServerAddressInRedis();
}
// Only after the rpc_server_ is running can the heartbeat manager
// be run. Otherwise the node failure detector will mistake
// some living nodes as dead as the timer inside node failure