[gcs] Remove use_gcs_for_bootstrap in core (python) and autoscaler (#23050)

This is part of cleanup PR for Redisless Ray. This PR remove use_gcs_for_bootstrap in core and autoscaler.
This commit is contained in:
Yi Cheng 2022-03-11 14:36:16 -08:00 committed by GitHub
parent 252ba6cecd
commit 4f86b5b523
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
9 changed files with 41 additions and 214 deletions

View file

@ -18,7 +18,6 @@ from typing import Any, Callable
import ray
import ray.dashboard.consts as dashboard_consts
from ray.ray_constants import env_bool
from ray._private.gcs_utils import use_gcs_for_bootstrap
from ray import serve
try:
@ -259,22 +258,11 @@ def init_ray_and_catch_exceptions(connect_to_serve: bool = False) -> Callable:
try:
if not ray.is_initialized():
try:
if use_gcs_for_bootstrap():
address = self._dashboard_head.gcs_address
redis_pw = None
logger.info(f"Connecting to ray with address={address}")
else:
ip, port = self._dashboard_head.redis_address
redis_pw = self._dashboard_head.redis_password
address = f"{ip}:{port}"
logger.info(
f"Connecting to ray with address={address}, "
f"redis_pw={redis_pw}"
)
address = self._dashboard_head.gcs_address
logger.info(f"Connecting to ray with address={address}")
ray.init(
address=address,
namespace=RAY_INTERNAL_DASHBOARD_NAMESPACE,
_redis_password=redis_pw,
)
except Exception as e:
ray.shutdown()

View file

@ -35,12 +35,8 @@ from ray.core.generated import gcs_service_pb2, gcs_service_pb2_grpc
from ray.core.generated import gcs_pb2
import ray.ray_constants as ray_constants
from ray._private.ray_logging import setup_component_logger
from ray._private.gcs_pubsub import gcs_pubsub_enabled, GcsPublisher
from ray._private.gcs_utils import (
GcsClient,
get_gcs_address_from_redis,
use_gcs_for_bootstrap,
)
from ray._private.gcs_pubsub import GcsPublisher
from ray._private.gcs_utils import GcsClient
from ray.experimental.internal_kv import (
_initialize_internal_kv,
_internal_kv_put,
@ -148,19 +144,7 @@ class Monitor:
stop_event: Optional[Event] = None,
retry_on_failure: bool = True,
):
if not use_gcs_for_bootstrap():
# Initialize the Redis clients.
redis_address = address
self.redis = ray._private.services.create_redis_client(
redis_address, password=redis_password
)
(ip, port) = address.split(":")
# Initialize the gcs stub for getting all node resource usage.
gcs_address = get_gcs_address_from_redis(self.redis)
else:
gcs_address = address
redis_address = None
gcs_address = address
options = (("grpc.enable_http_proxy", 0),)
gcs_channel = ray._private.utils.init_grpc_channel(gcs_address, options)
# TODO: Use gcs client for this
@ -170,39 +154,25 @@ class Monitor:
self.gcs_node_info_stub = gcs_service_pb2_grpc.NodeInfoGcsServiceStub(
gcs_channel
)
if redis_password is not None:
logger.warning("redis_password has been deprecated.")
# Set the redis client and mode so _internal_kv works for autoscaler.
worker = ray.worker.global_worker
if use_gcs_for_bootstrap():
gcs_client = GcsClient(address=gcs_address)
else:
worker.redis_client = self.redis
gcs_client = GcsClient.create_from_redis(self.redis)
gcs_client = GcsClient(address=gcs_address)
if monitor_ip:
monitor_addr = f"{monitor_ip}:{AUTOSCALER_METRIC_PORT}"
if use_gcs_for_bootstrap():
gcs_client.internal_kv_put(
b"AutoscalerMetricsAddress", monitor_addr.encode(), True, None
)
else:
self.redis.set("AutoscalerMetricsAddress", monitor_addr)
gcs_client.internal_kv_put(
b"AutoscalerMetricsAddress", monitor_addr.encode(), True, None
)
_initialize_internal_kv(gcs_client)
if monitor_ip:
monitor_addr = f"{monitor_ip}:{AUTOSCALER_METRIC_PORT}"
if use_gcs_for_bootstrap():
gcs_client.internal_kv_put(
b"AutoscalerMetricsAddress", monitor_addr.encode(), True, None
)
else:
self.redis.set("AutoscalerMetricsAddress", monitor_addr)
gcs_client.internal_kv_put(
b"AutoscalerMetricsAddress", monitor_addr.encode(), True, None
)
worker.mode = 0
if use_gcs_for_bootstrap():
head_node_ip = gcs_address.split(":")[0]
else:
head_node_ip = redis_address.split(":")[0]
self.redis_address = redis_address
self.redis_password = redis_password
head_node_ip = gcs_address.split(":")[0]
self.load_metrics = LoadMetrics()
self.last_avail_resources = None
@ -482,26 +452,13 @@ class Monitor:
_internal_kv_put(
ray_constants.DEBUG_AUTOSCALING_ERROR, message, overwrite=True
)
if not use_gcs_for_bootstrap():
redis_client = ray._private.services.create_redis_client(
self.redis_address, password=self.redis_password
)
else:
redis_client = None
gcs_publisher = None
if gcs_pubsub_enabled():
if use_gcs_for_bootstrap():
gcs_publisher = GcsPublisher(address=args.gcs_address)
else:
gcs_publisher = GcsPublisher(
address=get_gcs_address_from_redis(redis_client)
)
gcs_publisher = GcsPublisher(address=args.gcs_address)
from ray._private.utils import publish_error_to_driver
publish_error_to_driver(
ray_constants.MONITOR_DIED_ERROR,
message,
redis_client=redis_client,
redis_client=None,
gcs_publisher=gcs_publisher,
)
@ -637,9 +594,7 @@ if __name__ == "__main__":
else:
autoscaling_config = None
bootstrap_address = (
args.gcs_address if use_gcs_for_bootstrap() else args.redis_address
)
bootstrap_address = args.gcs_address
if bootstrap_address is None:
raise ValueError("One of --gcs-address or --redis-address must be set!")

View file

@ -9,7 +9,6 @@ import time
import ray
import ray._private.services
from ray._private.gcs_utils import use_gcs_for_bootstrap
from ray._private.client_mode_hook import disable_client_hook
from ray import ray_constants
from ray._raylet import GcsClientOptions
@ -149,10 +148,7 @@ class Cluster:
@property
def address(self):
if use_gcs_for_bootstrap():
return self.gcs_address
else:
return self.redis_address
return self.gcs_address
def connect(self, namespace=None):
"""Connect the driver to the cluster."""
@ -208,12 +204,7 @@ class Cluster:
)
self.webui_url = self.head_node.webui_url
# Init global state accessor when creating head node.
if use_gcs_for_bootstrap():
gcs_options = GcsClientOptions.from_gcs_address(node.gcs_address)
else:
gcs_options = GcsClientOptions.from_redis_address(
self.redis_address, self.redis_password
)
gcs_options = GcsClientOptions.from_gcs_address(node.gcs_address)
self.global_state._initialize_global_state(gcs_options)
else:
ray_params.update_if_absent(redis_address=self.redis_address)

View file

@ -5,7 +5,6 @@ import ray._private.profiling as profiling
import ray._private.utils as utils
from ray import ray_constants
from ray.state import GlobalState
from ray._private.gcs_utils import use_gcs_for_bootstrap
from ray._raylet import GcsClientOptions
__all__ = ["free", "global_gc"]
@ -34,10 +33,7 @@ def memory_summary(
address = services.canonicalize_bootstrap_address(address)
state = GlobalState()
if use_gcs_for_bootstrap():
options = GcsClientOptions.from_gcs_address(address)
else:
options = GcsClientOptions.from_redis_address(address, redis_password)
options = GcsClientOptions.from_gcs_address(address)
state._initialize_global_state(options)
if stats_only:
return get_store_stats(state)

View file

@ -23,11 +23,7 @@ import ray
import ray.ray_constants as ray_constants
import ray._private.services
import ray._private.utils
from ray._private.gcs_utils import (
GcsClient,
use_gcs_for_bootstrap,
get_gcs_address_from_redis,
)
from ray._private.gcs_utils import GcsClient
from ray._private.resource_spec import ResourceSpec
from ray._private.utils import try_to_create_directory, try_to_symlink, open_log
import ray._private.usage.usage_lib as ray_usage_lib
@ -324,8 +320,6 @@ class Node:
# Makes sure the Node object has valid addresses after setup.
self.validate_ip_port(self.address)
self.validate_ip_port(self.gcs_address)
if not use_gcs_for_bootstrap():
self.validate_ip_port(self.redis_address)
@staticmethod
def validate_ip_port(ip_port):
@ -384,19 +378,6 @@ class Node:
ray._private.utils.set_sigterm_handler(sigterm_handler)
def _get_gcs_address_from_redis(self):
redis_cli = self.create_redis_client()
error = None
for _ in range(NUM_REDIS_GET_RETRIES):
try:
return get_gcs_address_from_redis(redis_cli)
except Exception as e:
logger.debug(f"Fetch gcs address from redis failed {e}")
error = e
time.sleep(1)
assert error is not None
logger.error(f"Fetch gcs address from redis failed {error}")
def _init_temp(self):
# Create a dictionary to store temp file index.
self._incremental_dict = collections.defaultdict(lambda: 0)
@ -512,26 +493,17 @@ class Node:
`ray start` or `ray.int()` to start worker nodes, that has been
converted to ip:port format.
"""
if use_gcs_for_bootstrap():
return self._gcs_address
return self._redis_address
return self._gcs_address
@property
def gcs_address(self):
"""Get the gcs address."""
if use_gcs_for_bootstrap():
assert self._gcs_address is not None, "Gcs address is not set"
else:
# Always get the address from Redis because GCS address may change
# after restarting. This will be removed later.
self._gcs_address = self._get_gcs_address_from_redis()
assert self._gcs_address is not None, "Gcs address is not set"
return self._gcs_address
@property
def redis_address(self):
"""Get the cluster Redis address."""
if not use_gcs_for_bootstrap():
assert self._redis_address is not None
return self._redis_address
@property
@ -982,8 +954,7 @@ class Node:
# e.g. https://github.com/ray-project/ray/issues/15780
# TODO(mwtian): figure out a way to use 127.0.0.1 for local connection
# when possible.
if use_gcs_for_bootstrap():
self._gcs_address = f"{self._node_ip_address}:" f"{gcs_server_port}"
self._gcs_address = f"{self._node_ip_address}:" f"{gcs_server_port}"
# Initialize gcs client, which also waits for GCS to start running.
self.get_gcs_client()
@ -1110,10 +1081,7 @@ class Node:
assert self._gcs_address is None
assert self._gcs_client is None
if (
not use_gcs_for_bootstrap()
or self._ray_params.external_addresses is not None
):
if self._ray_params.external_addresses is not None:
# This only configures external Redis and does not start local
# Redis, when external Redis address is specified.
# TODO(mwtian): after GCS bootstrapping is default and stable,
@ -1147,14 +1115,9 @@ class Node:
# on this node and spilled objects remain on disk.
if not self.head:
# Get the system config from GCS first if this is a non-head node.
if not use_gcs_for_bootstrap():
gcs_options = ray._raylet.GcsClientOptions.from_redis_address(
self.redis_address, self.redis_password
)
else:
gcs_options = ray._raylet.GcsClientOptions.from_gcs_address(
self.gcs_address
)
gcs_options = ray._raylet.GcsClientOptions.from_gcs_address(
self.gcs_address
)
global_state = ray.state.GlobalState()
global_state._initialize_global_state(gcs_options)
new_config = global_state.get_system_config()

View file

@ -12,11 +12,9 @@ import time
import urllib
import urllib.parse
import yaml
from socket import socket
import ray
import psutil
from ray._private.gcs_utils import use_gcs_for_bootstrap
import ray._private.services as services
import ray.ray_constants as ray_constants
import ray._private.utils
@ -544,7 +542,7 @@ def start(
ray_debugger_external,
):
"""Start Ray processes manually on the local machine."""
if use_gcs_for_bootstrap() and gcs_server_port is not None:
if gcs_server_port is not None:
cli_logger.error(
"`{}` is deprecated and ignored. Use {} to specify "
"GCS server port on head node.",
@ -618,15 +616,7 @@ def start(
# Set bootstrap port.
assert ray_params.redis_port is None
assert ray_params.gcs_server_port is None
if use_gcs_for_bootstrap():
ray_params.gcs_server_port = port
else:
if port == 0:
with socket() as s:
s.bind(("", 0))
port = s.getsockname()[1]
ray_params.redis_port = port
ray_params.gcs_server_port = gcs_server_port
ray_params.gcs_server_port = port
if os.environ.get("RAY_FAKE_CLUSTER"):
ray_params.env_vars = {
@ -847,16 +837,7 @@ def start(
)
raise Exception("Cannot canonicalize address " f"`--address={address}`.")
if use_gcs_for_bootstrap():
ray_params.gcs_address = bootstrap_address
else:
ray_params.redis_address = bootstrap_address
address_ip, address_port = services.extract_ip_port(bootstrap_address)
# Wait for the Redis server to be started. And throw an exception
# if we can't connect to it.
services.wait_for_redis_to_start(
address_ip, address_port, password=redis_password
)
ray_params.gcs_address = bootstrap_address
# Get the node IP address if one is not provided.
ray_params.update_if_absent(
@ -1765,13 +1746,7 @@ def memory(
def status(address, redis_password):
"""Print cluster status, including autoscaling info."""
address = services.canonicalize_bootstrap_address(address)
if use_gcs_for_bootstrap():
gcs_client = ray._private.gcs_utils.GcsClient(address=address)
else:
redis_client = ray._private.services.create_redis_client(
address, redis_password
)
gcs_client = ray._private.gcs_utils.GcsClient.create_from_redis(redis_client)
gcs_client = ray._private.gcs_utils.GcsClient(address=address)
ray.experimental.internal_kv._initialize_internal_kv(gcs_client)
status = ray.experimental.internal_kv._internal_kv_get(
ray_constants.DEBUG_AUTOSCALING_STATUS
@ -2067,16 +2042,7 @@ def healthcheck(address, redis_password, component):
address = services.canonicalize_bootstrap_address(address)
if use_gcs_for_bootstrap():
gcs_address = address
else:
# If client creation or ping fails, this will exit with a non-zero
# exit code.
redis_client = ray._private.services.create_redis_client(
address, redis_password
)
redis_client.ping()
gcs_address = redis_client.get("GcsServerAddress").decode()
gcs_address = address
if not component:
try:

View file

@ -8,7 +8,6 @@ from ray.util.placement_group import placement_group, remove_placement_group
from ray.autoscaler.sdk import request_resources
from ray.autoscaler._private.monitor import Monitor
from ray.cluster_utils import Cluster
from ray._private.gcs_utils import use_gcs_for_bootstrap
from ray._private.test_utils import (
generate_system_config_map,
wait_for_condition,
@ -195,10 +194,7 @@ def test_heartbeats_single(ray_start_cluster_head):
Test proper metrics.
"""
cluster = ray_start_cluster_head
if use_gcs_for_bootstrap():
monitor = setup_monitor(cluster.gcs_address)
else:
monitor = setup_monitor(cluster.address)
monitor = setup_monitor(cluster.gcs_address)
total_cpus = ray.state.cluster_resources()["CPU"]
verify_load_metrics(monitor, ({"CPU": 0.0}, {"CPU": total_cpus}))

View file

@ -28,11 +28,7 @@ from ray.ray_constants import DEBUG_AUTOSCALING_ERROR, DEBUG_AUTOSCALING_STATUS
def get_ray_status_output(address):
if gcs_utils.use_gcs_for_bootstrap():
gcs_client = gcs_utils.GcsClient(address=address)
else:
redis_client = ray._private.services.create_redis_client(address, "")
gcs_client = gcs_utils.GcsClient.create_from_redis(redis_client)
gcs_client = gcs_utils.GcsClient(address=address)
internal_kv._initialize_internal_kv(gcs_client)
status = internal_kv._internal_kv_get(DEBUG_AUTOSCALING_STATUS)
error = internal_kv._internal_kv_get(DEBUG_AUTOSCALING_ERROR)

View file

@ -977,10 +977,7 @@ def init(
logger.info(
"Connecting to existing Ray cluster at address: " f"{bootstrap_address}"
)
if gcs_utils.use_gcs_for_bootstrap():
gcs_address = bootstrap_address
else:
redis_address = bootstrap_address
gcs_address = bootstrap_address
if configure_logging:
setup_logger(logging_level, logging_format)
@ -1538,21 +1535,12 @@ def connect(
# The Redis client can safely be shared between threads. However,
# that is not true of Redis pubsub clients. See the documentation at
# https://github.com/andymccurdy/redis-py#thread-safety.
if not gcs_utils.use_gcs_for_bootstrap():
worker.redis_client = node.create_redis_client()
worker.gcs_client = node.get_gcs_client()
assert worker.gcs_client is not None
_initialize_internal_kv(worker.gcs_client)
if gcs_utils.use_gcs_for_bootstrap():
ray.state.state._initialize_global_state(
ray._raylet.GcsClientOptions.from_gcs_address(node.gcs_address)
)
else:
ray.state.state._initialize_global_state(
ray._raylet.GcsClientOptions.from_redis_address(
node.redis_address, redis_password=node.redis_password
)
)
ray.state.state._initialize_global_state(
ray._raylet.GcsClientOptions.from_gcs_address(node.gcs_address)
)
worker.gcs_pubsub_enabled = gcs_pubsub_enabled()
worker.gcs_publisher = None
if worker.gcs_pubsub_enabled:
@ -1625,19 +1613,7 @@ def connect(
elif not LOCAL_MODE:
raise ValueError("Invalid worker mode. Expected DRIVER, WORKER or LOCAL.")
if gcs_utils.use_gcs_for_bootstrap():
gcs_options = ray._raylet.GcsClientOptions.from_gcs_address(node.gcs_address)
else:
# As the synchronous and the asynchronous context of redis client is
# not used in this gcs client. We would not open connection for it
# by setting `enable_sync_conn` and `enable_async_conn` as false.
gcs_options = ray._raylet.GcsClientOptions.from_redis_address(
node.redis_address,
node.redis_password,
enable_sync_conn=False,
enable_async_conn=False,
enable_subscribe_conn=True,
)
gcs_options = ray._raylet.GcsClientOptions.from_gcs_address(node.gcs_address)
if job_config is None:
job_config = ray.job_config.JobConfig()