[GCS] support external Redis in GCS bootstrapping mode (#21436)

External Redis should still be supported with GCS bootstrapping, to avoid breaking users.
In GCS bootstrapping mode, some logic is removed for external Redis:
- Printing external Redis addresses to the terminal: hard to implement consistently across `ray start`, `ray.init()` and the Ray cluster util.
- Starting local Redis if external Redis is unavailable: failing loudly here seems more appropriate.

Also, re-enable a few tests that restart GCS in GCS bootstrapping mode, by using external Redis for KV storage.
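
A rough usage sketch (not part of this diff; it assumes the `RAY_REDIS_ADDRESS` handling added here and a Redis server already running at the given address):

import os
import ray

# Point Ray at an existing external Redis instance. With GCS bootstrapping,
# Ray no longer falls back to starting a local Redis if this address is
# unreachable; it fails loudly instead.
os.environ["RAY_REDIS_ADDRESS"] = "127.0.0.1:6379"

# Start a head node / local cluster; GCS uses the external Redis for KV storage.
ray.init()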
mwtian 2022-01-13 16:01:11 -08:00 committed by GitHub
parent d6dbf3b8bf
commit 30968a9358
18 changed files with 246 additions and 134 deletions


@ -365,8 +365,7 @@
--test_env=RAY_bootstrap_with_gcs=1
--test_env=RAY_gcs_storage=memory
-- //python/ray/tests/...
-//python/ray/tests:test_component_failures_3 -//python/ray/tests:test_healthcheck
-//python/ray/tests:test_gcs_fault_tolerance
-//python/ray/tests:test_healthcheck
- label: ":redis: HA GCS (Medium K-Z)"
conditions: ["RAY_CI_PYTHON_AFFECTED"]
commands:
@ -377,8 +376,7 @@
--test_env=RAY_bootstrap_with_gcs=1
--test_env=RAY_gcs_storage=memory
-- //python/ray/tests/...
-//python/ray/tests:test_ray_debugger -//python/ray/tests:test_multi_node_3
-//python/ray/tests:test_placement_group_2 -//python/ray/tests:test_placement_group_3
-//python/ray/tests:test_ray_debugger
- label: ":octopus: Tune soft imports test"
conditions: ["RAY_CI_TUNE_AFFECTED"]


@ -208,7 +208,7 @@ test_large_gcs() {
--test_env=CONDA_SHLVL --test_env=CONDA_PREFIX --test_env=CONDA_DEFAULT_ENV --test_env=CONDA_PROMPT_MODIFIER \
--test_env=CI --test_tag_filters="large_size_python_tests_shard_${BUILDKITE_PARALLEL_JOB}" \
--test_env=RAY_gcs_grpc_based_pubsub=1 --test_env=RAY_bootstrap_with_gcs=1 --test_env=RAY_gcs_storage=memory \
-- python/ray/tests/... -//python/ray/tests:test_failure_2
-- python/ray/tests/...
}
test_cpp() {


@ -548,23 +548,23 @@ def create_redis_client(redis_address, password=None):
"""
if not hasattr(create_redis_client, "instances"):
create_redis_client.instances = {}
else:
for _ in range(ray_constants.START_REDIS_WAIT_RETRIES):
cli = create_redis_client.instances.get(redis_address)
if cli is not None:
try:
cli.ping()
return cli
except Exception:
create_redis_client.instances.pop(redis_address)
if cli is None:
redis_ip_address, redis_port = extract_ip_port(
canonicalize_bootstrap_address(redis_address))
cli = redis.StrictRedis(
host=redis_ip_address, port=int(redis_port), password=password)
create_redis_client.instances[redis_address] = cli
try:
cli.ping()
return cli
except Exception:
create_redis_client.instances.pop(redis_address)
time.sleep(2)
redis_ip_address, redis_port = extract_ip_port(
canonicalize_bootstrap_address(redis_address))
# For this command to work, some other client (on the same machine
# as Redis) must have run "CONFIG SET protected-mode no".
create_redis_client.instances[redis_address] = redis.StrictRedis(
host=redis_ip_address, port=int(redis_port), password=password)
return create_redis_client.instances[redis_address]
raise RuntimeError(f"Unable to connect to Redis at {redis_address}")
def start_ray_process(command,


@ -169,14 +169,16 @@ def wait_for_children_names_of_pid(pid, children_names, timeout=20):
def wait_for_children_of_pid(pid, num_children=1, timeout=20):
p = psutil.Process(pid)
start_time = time.time()
alive = []
while time.time() - start_time < timeout:
num_alive = len(p.children(recursive=False))
alive = p.children(recursive=False)
num_alive = len(alive)
if num_alive >= num_children:
return
time.sleep(0.1)
raise RayTestTimeoutException(
"Timed out while waiting for process {} children to start "
"({}/{} started).".format(pid, num_alive, num_children))
f"Timed out while waiting for process {pid} children to start "
f"({num_alive}/{num_children} started: {alive}).")
def wait_for_children_of_pid_to_exit(pid, timeout=20):


@ -106,7 +106,7 @@ class Cluster:
initialize_head (bool): Automatically start a Ray cluster
by initializing the head node. Defaults to False.
connect (bool): If `initialize_head=True` and `connect=True`,
ray.init will be called with the redis address of this cluster
ray.init will be called with the address of this cluster
passed in.
head_node_args (dict): Arguments to be passed into
`start_ray_head` via `self.add_node`.


@ -2,6 +2,7 @@ import atexit
import collections
import datetime
import errno
import grpc
import json
import logging
import os
@ -83,6 +84,23 @@ class Node:
self.all_processes = {}
self.removal_lock = threading.Lock()
# Set up external Redis when `RAY_REDIS_ADDRESS` is specified.
redis_address_env = os.environ.get("RAY_REDIS_ADDRESS")
if ray_params.external_addresses is None and \
redis_address_env is not None:
external_redis = redis_address_env.split(",")
# Reuse primary Redis as Redis shard when there's only one
# instance provided.
if len(external_redis) == 1:
external_redis.append(external_redis[0])
[primary_redis_ip, port] = external_redis[0].split(":")
ray._private.services.wait_for_redis_to_start(
primary_redis_ip, port, password=ray_params.redis_password)
ray_params.external_addresses = external_redis
ray_params.num_redis_shards = len(external_redis) - 1
# Try to get node IP address with the parameters.
if ray_params.node_ip_address:
node_ip_address = ray_params.node_ip_address
@ -801,8 +819,8 @@ class Node:
process_info,
]
def start_redis(self):
"""Start the Redis server."""
def start_or_configure_redis(self):
"""Starts local Redis or configures external Redis."""
assert self._redis_address is None
redis_log_files = []
if self._ray_params.external_addresses is None:
@ -1020,15 +1038,12 @@ class Node:
]
def _write_cluster_info_to_kv(self):
client = self.get_gcs_client()
# Version info.
# Write Version info.
ray_version, python_version = self._compute_version_info()
version_info = json.dumps((ray_version, python_version))
client.internal_kv_put(
self._internal_kv_put_with_retry(
b"VERSION_INFO",
version_info.encode(),
overwrite=True,
namespace=ray_constants.KV_NAMESPACE_CLUSTER)
def start_head_processes(self):
@ -1039,10 +1054,15 @@ class Node:
assert self._gcs_address is None
assert self._gcs_client is None
# If this is the head node, start the relevant head node processes.
if not use_gcs_for_bootstrap():
self.start_redis()
assert self._redis_address is not None
if not use_gcs_for_bootstrap() or \
self._ray_params.external_addresses is not None:
# This only configures external Redis and does not start local
# Redis, when external Redis address is specified.
# TODO(mwtian): after GCS bootstrapping is default and stable,
# only keep external Redis configuration logic in the function.
self.start_or_configure_redis()
# Wait for Redis to become available.
self.create_redis_client()
self.start_gcs_server()
assert self._gcs_client is not None
@ -1447,5 +1467,22 @@ class Node:
time.sleep(2)
if not result:
raise RuntimeError(f"Could not read '{key}' from GCS (redis). "
"Has redis started correctly on the head node?")
"If using Redis, did Redis start successfully?")
return result
def _internal_kv_put_with_retry(self,
key,
value,
namespace,
num_retries=NUM_REDIS_GET_RETRIES):
if isinstance(key, str):
key = key.encode()
for i in range(num_retries):
try:
return self.get_gcs_client().internal_kv_put(
key, value, overwrite=True, namespace=namespace)
except grpc.RpcError:
logger.exception("Internal KV Put failed")
time.sleep(2)
# Reraise the last grpc.RpcError.
raise


@ -157,7 +157,8 @@ RESOURCES_ENVIRONMENT_VARIABLE = "RAY_OVERRIDE_RESOURCES"
REPORTER_UPDATE_INTERVAL_MS = env_integer("REPORTER_UPDATE_INTERVAL_MS", 2500)
# Number of attempts to ping the Redis server. See
# `services.py:wait_for_redis_to_start`.
# `services.py::wait_for_redis_to_start()` and
# `services.py::create_redis_client()`
START_REDIS_WAIT_RETRIES = env_integer("RAY_START_REDIS_WAIT_RETRIES", 16)
LOGGER_FORMAT = (


@ -554,16 +554,17 @@ def start(node_ip_address, address, port, redis_password, redis_shard_ports,
# not provided.
num_redis_shards = len(redis_shard_ports)
address_env = os.environ.get("RAY_REDIS_ADDRESS")
underlying_address = \
address_env if address_env is not None else address
if underlying_address is not None:
# This logic is deprecated and will be removed later.
if address is not None:
cli_logger.warning(
"Specifying {} for external Redis address is deprecated. "
"Please specify environment variable {}={} instead.",
cf.bold("--address"), cf.bold("RAY_REDIS_ADDRESS"), address)
cli_logger.print(
"Will use `{}` as external Redis server address(es). "
"If the primary one is not reachable, we starts new one(s) "
"with `{}` in local.", cf.bold(underlying_address),
cf.bold("--port"))
external_addresses = underlying_address.split(",")
"with `{}` in local.", cf.bold(address), cf.bold("--port"))
external_addresses = address.split(",")
# We reuse primary redis as sharding when there's only one
# instance provided.


@ -5,15 +5,18 @@ import os
from contextlib import contextmanager
import pytest
import tempfile
import socket
import subprocess
import json
import time
from pathlib import Path
import ray
import ray.ray_constants as ray_constants
from ray.cluster_utils import (Cluster, AutoscalingCluster,
cluster_not_supported)
from ray._private.services import REDIS_EXECUTABLE, _start_redis_instance
from ray._private.services import REDIS_EXECUTABLE, _start_redis_instance, \
wait_for_redis_to_start
from ray._private.test_utils import (init_error_pubsub, init_log_pubsub,
setup_tls, teardown_tls,
get_and_run_node_killer)
@ -48,6 +51,37 @@ def get_default_fixture_ray_kwargs():
return ray_kwargs
@pytest.fixture
def external_redis(request, monkeypatch):
# Setup external Redis and env var for initialization.
param = getattr(request, "param", {})
external_redis_ports = param.get("external_redis_ports")
if external_redis_ports is None:
with socket.socket() as s:
s.bind(("", 0))
port = s.getsockname()[1]
external_redis_ports = [port]
else:
del param["external_redis_ports"]
processes = []
for port in external_redis_ports:
temp_dir = ray._private.utils.get_ray_temp_dir()
port, proc = _start_redis_instance(
REDIS_EXECUTABLE,
temp_dir,
port,
password=ray_constants.REDIS_DEFAULT_PASSWORD)
processes.append(proc)
wait_for_redis_to_start("127.0.0.1", port,
ray_constants.REDIS_DEFAULT_PASSWORD)
address_str = ",".join(
map(lambda x: f"127.0.0.1:{x}", external_redis_ports))
monkeypatch.setenv("RAY_REDIS_ADDRESS", address_str)
yield None
for proc in processes:
proc.process.terminate()
@contextmanager
def _ray_start(**kwargs):
init_kwargs = get_default_fixture_ray_kwargs()
@ -85,6 +119,16 @@ def ray_start_regular(request):
yield res
# We can compose external_redis and ray_start_regular instead of creating this
# separate fixture, if there is a good way to ensure external_redis runs before
# ray_start_regular.
@pytest.fixture
def ray_start_regular_with_external_redis(request, external_redis):
param = getattr(request, "param", {})
with _ray_start(**param) as res:
yield res
@pytest.fixture(scope="module")
def ray_start_regular_shared(request):
param = getattr(request, "param", {})
@ -174,6 +218,16 @@ def ray_start_cluster_head(request):
yield res
# We can compose external_redis and ray_start_cluster_head instead of creating
# this separate fixture, if there is a good way to ensure external_redis runs
# before ray_start_cluster_head.
@pytest.fixture
def ray_start_cluster_head_with_external_redis(request, external_redis):
param = getattr(request, "param", {})
with _ray_start_cluster(do_init=True, num_nodes=1, **param) as res:
yield res
@pytest.fixture
def ray_start_cluster_2_nodes(request):
param = getattr(request, "param", {})


@ -274,7 +274,6 @@ def test_actor_class_name(ray_start_regular):
pass
Foo.remote()
# TODO: redis-removal kv
g = ray.worker.global_worker.gcs_client
actor_keys = g.internal_kv_keys(b"ActorClass",
ray_constants.KV_NAMESPACE_FUNCTION_TABLE)


@ -6,6 +6,7 @@ import pytest
import ray
import ray.ray_constants as ray_constants
from ray._private.gcs_utils import use_gcs_for_bootstrap
from ray._private.test_utils import get_other_nodes
@ -78,7 +79,8 @@ def test_driver_lives_sequential(ray_start_regular):
ray.worker._global_node.kill_raylet()
ray.worker._global_node.kill_log_monitor()
ray.worker._global_node.kill_monitor()
ray.worker._global_node.kill_gcs_server()
if not use_gcs_for_bootstrap():
ray.worker._global_node.kill_gcs_server()
# If the driver can reach the tearDown method, then it is still alive.
@ -86,11 +88,11 @@ def test_driver_lives_sequential(ray_start_regular):
def test_driver_lives_parallel(ray_start_regular):
all_processes = ray.worker._global_node.all_processes
process_infos = (all_processes[ray_constants.PROCESS_TYPE_GCS_SERVER] +
all_processes[ray_constants.PROCESS_TYPE_RAYLET] +
process_infos = (all_processes[ray_constants.PROCESS_TYPE_RAYLET] +
all_processes[ray_constants.PROCESS_TYPE_LOG_MONITOR] +
all_processes[ray_constants.PROCESS_TYPE_MONITOR])
assert len(process_infos) == 4
if not use_gcs_for_bootstrap():
process_infos += all_processes[ray_constants.PROCESS_TYPE_GCS_SERVER]
# Kill all the components in parallel.
for process_info in process_infos:


@ -21,12 +21,12 @@ def increase(x):
@pytest.mark.parametrize(
"ray_start_regular", [
"ray_start_regular_with_external_redis", [
generate_system_config_map(
num_heartbeats_timeout=20, gcs_rpc_server_reconnect_timeout_s=60)
],
indirect=True)
def test_gcs_server_restart(ray_start_regular):
def test_gcs_server_restart(ray_start_regular_with_external_redis):
actor1 = Increase.remote()
result = ray.get(actor1.method.remote(1))
assert result == 3
@ -49,7 +49,7 @@ def test_gcs_server_restart(ray_start_regular):
@pytest.mark.parametrize(
"ray_start_regular", [
"ray_start_regular_with_external_redis", [
generate_system_config_map(
num_heartbeats_timeout=20, gcs_rpc_server_reconnect_timeout_s=60)
],
@ -58,7 +58,8 @@ def test_gcs_server_restart(ray_start_regular):
gcs_pubsub.gcs_pubsub_enabled(),
reason="GCS pubsub may lose messages after GCS restarts. Need to "
"implement re-fetching state in GCS client.")
def test_gcs_server_restart_during_actor_creation(ray_start_regular):
def test_gcs_server_restart_during_actor_creation(
ray_start_regular_with_external_redis):
ids = []
# We reduce the number of actors because there are too many actors created
# and `Too many open files` error will be thrown.
@ -79,19 +80,20 @@ def test_gcs_server_restart_during_actor_creation(ray_start_regular):
@pytest.mark.parametrize(
"ray_start_cluster_head", [
"ray_start_cluster_head_with_external_redis", [
generate_system_config_map(
num_heartbeats_timeout=2, gcs_rpc_server_reconnect_timeout_s=60)
],
indirect=True)
def test_node_failure_detector_when_gcs_server_restart(ray_start_cluster_head):
def test_node_failure_detector_when_gcs_server_restart(
ray_start_cluster_head_with_external_redis):
"""Checks that the node failure detector is correct when gcs server restart.
We set the cluster to timeout nodes after 2 seconds of heartbeats. We then
kill gcs server and remove the worker node and restart gcs server again to
check that the removed node will die finally.
"""
cluster = ray_start_cluster_head
cluster = ray_start_cluster_head_with_external_redis
worker = cluster.add_node()
cluster.wait_for_nodes()
@ -139,12 +141,13 @@ def test_node_failure_detector_when_gcs_server_restart(ray_start_cluster_head):
@pytest.mark.parametrize(
"ray_start_regular", [
"ray_start_regular_with_external_redis", [
generate_system_config_map(
num_heartbeats_timeout=20, gcs_rpc_server_reconnect_timeout_s=60)
],
indirect=True)
def test_del_actor_after_gcs_server_restart(ray_start_regular):
def test_del_actor_after_gcs_server_restart(
ray_start_regular_with_external_redis):
actor = Increase.options(name="abc").remote()
result = ray.get(actor.method.remote(1))
assert result == 3
@ -173,11 +176,11 @@ def test_del_actor_after_gcs_server_restart(ray_start_regular):
@pytest.mark.parametrize("auto_reconnect", [True, False])
def test_gcs_client_reconnect(ray_start_regular, auto_reconnect):
redis_client = ray.worker.global_worker.redis_client
channel = gcs_utils.GcsChannel(redis_client=redis_client)
gcs_client = gcs_utils.GcsClient(channel) if auto_reconnect \
else gcs_utils.GcsClient(channel, nums_reconnect_retry=0)
def test_gcs_client_reconnect(ray_start_regular_with_external_redis,
auto_reconnect):
gcs_address = ray.worker.global_worker.gcs_client.address
gcs_client = gcs_utils.GcsClient(address=gcs_address) if auto_reconnect \
else gcs_utils.GcsClient(address=gcs_address, nums_reconnect_retry=0)
gcs_client.internal_kv_put(b"a", b"b", True, None)
gcs_client.internal_kv_get(b"a", None) == b"b"


@ -4,6 +4,10 @@ import subprocess
import sys
import ray
import ray.ray_constants as ray_constants
from ray._private.gcs_utils import use_gcs_for_bootstrap
from ray._private.services import REDIS_EXECUTABLE, _start_redis_instance
from ray._private.utils import detect_fate_sharing_support
from ray._private.test_utils import (
check_call_ray, run_string_as_driver, run_string_as_driver_nonblocking,
wait_for_children_of_pid, wait_for_children_of_pid_to_exit,
@ -83,16 +87,28 @@ def test_calling_start_ray_head(call_ray_stop_only):
])
check_call_ray(["stop"])
# Test starting Ray with invalid external address.
# It will fall back to creating a new one.
check_call_ray(
["start", "--head", "--address", "127.0.0.1:6379", "--port", "0"])
check_call_ray(["stop"])
temp_dir = ray._private.utils.get_ray_temp_dir()
if not use_gcs_for_bootstrap():
# Test starting Ray with --address flag (deprecated).
_, proc = _start_redis_instance(
REDIS_EXECUTABLE,
temp_dir,
7777,
password=ray_constants.REDIS_DEFAULT_PASSWORD)
check_call_ray(["start", "--head", "--address", "127.0.0.1:7777"])
check_call_ray(["stop"])
proc.process.terminate()
# Test starting Ray with RAY_REDIS_ADDRESS env.
os.environ["RAY_REDIS_ADDRESS"] = "127.0.0.1:6379"
check_call_ray(["start", "--head", "--port", "0"])
_, proc = _start_redis_instance(
REDIS_EXECUTABLE,
temp_dir,
8888,
password=ray_constants.REDIS_DEFAULT_PASSWORD)
os.environ["RAY_REDIS_ADDRESS"] = "127.0.0.1:8888"
check_call_ray(["start", "--head"])
check_call_ray(["stop"])
proc.process.terminate()
del os.environ["RAY_REDIS_ADDRESS"]
# Test --block. Killing a child process should cause the command to exit.
@ -136,7 +152,18 @@ for i in range(0, 5):
blocked.poll()
assert blocked.returncode is None
wait_for_children_of_pid(blocked.pid, num_children=7, timeout=30)
# Include GCS, autoscaler monitor, client server, dashboard, raylet and
# log_monitor.py
num_children = 6
if not use_gcs_for_bootstrap():
# Account for Redis
num_children += 1
if not detect_fate_sharing_support():
# Account for ray_process_reaper.py
num_children += 1
# Check a set of child process commands & scripts instead?
wait_for_children_of_pid(
blocked.pid, num_children=num_children, timeout=30)
blocked.terminate()
wait_for_children_of_pid_to_exit(blocked.pid, timeout=30)


@ -122,40 +122,6 @@ def test_actor_creation_node_failure(ray_start_cluster):
cluster.remove_node(get_other_nodes(cluster, True)[-1])
def test_driver_lives_sequential(ray_start_regular):
ray.worker._global_node.kill_raylet()
ray.worker._global_node.kill_log_monitor()
if not sys.platform.startswith("win"):
# fails on windows.
ray.worker._global_node.kill_monitor()
ray.worker._global_node.kill_gcs_server()
# If the driver can reach the tearDown method, then it is still alive.
def test_driver_lives_parallel(ray_start_regular):
all_processes = ray.worker._global_node.all_processes
process_infos = (all_processes[ray_constants.PROCESS_TYPE_GCS_SERVER] +
all_processes[ray_constants.PROCESS_TYPE_RAYLET] +
all_processes[ray_constants.PROCESS_TYPE_LOG_MONITOR] +
all_processes[ray_constants.PROCESS_TYPE_MONITOR])
assert len(process_infos) == 4
# Kill all the components in parallel.
for process_info in process_infos:
process_info.process.terminate()
time.sleep(0.1)
for process_info in process_infos:
process_info.process.kill()
for process_info in process_infos:
process_info.process.wait()
# If the driver can reach the tearDown method, then it is still alive.
if __name__ == "__main__":
import pytest
sys.exit(pytest.main(["-v", __file__]))


@ -697,14 +697,14 @@ ray.shutdown()
@pytest.mark.parametrize(
"ray_start_cluster_head", [
"ray_start_cluster_head_with_external_redis", [
generate_system_config_map(
num_heartbeats_timeout=10, gcs_rpc_server_reconnect_timeout_s=60)
],
indirect=True)
def test_create_placement_group_after_gcs_server_restart(
ray_start_cluster_head):
cluster = ray_start_cluster_head
ray_start_cluster_head_with_external_redis):
cluster = ray_start_cluster_head_with_external_redis
cluster.add_node(num_cpus=2)
cluster.add_node(num_cpus=2)
cluster.wait_for_nodes()
@ -735,14 +735,14 @@ def test_create_placement_group_after_gcs_server_restart(
@pytest.mark.parametrize(
"ray_start_cluster_head", [
"ray_start_cluster_head_with_external_redis", [
generate_system_config_map(
num_heartbeats_timeout=10, gcs_rpc_server_reconnect_timeout_s=60)
],
indirect=True)
def test_create_actor_with_placement_group_after_gcs_server_restart(
ray_start_cluster_head):
cluster = ray_start_cluster_head
ray_start_cluster_head_with_external_redis):
cluster = ray_start_cluster_head_with_external_redis
cluster.add_node(num_cpus=2)
cluster.wait_for_nodes()
@ -759,14 +759,14 @@ def test_create_actor_with_placement_group_after_gcs_server_restart(
@pytest.mark.parametrize(
"ray_start_cluster_head", [
"ray_start_cluster_head_with_external_redis", [
generate_system_config_map(
num_heartbeats_timeout=10, gcs_rpc_server_reconnect_timeout_s=60)
],
indirect=True)
def test_bundle_recreated_when_raylet_fo_after_gcs_server_restart(
ray_start_cluster_head):
cluster = ray_start_cluster_head
ray_start_cluster_head_with_external_redis):
cluster = ray_start_cluster_head_with_external_redis
cluster.add_node(num_cpus=2)
cluster.wait_for_nodes()


@ -25,8 +25,11 @@ from ray.autoscaler._private.util import DEBUG_AUTOSCALING_ERROR, \
def get_ray_status_output(address):
redis_client = ray._private.services.create_redis_client(address, "")
gcs_client = gcs_utils.GcsClient.create_from_redis(redis_client)
if gcs_utils.use_gcs_for_bootstrap():
gcs_client = gcs_utils.GcsClient(address=address)
else:
redis_client = ray._private.services.create_redis_client(address, "")
gcs_client = gcs_utils.GcsClient.create_from_redis(redis_client)
internal_kv._initialize_internal_kv(gcs_client)
status = internal_kv._internal_kv_get(DEBUG_AUTOSCALING_STATUS)
error = internal_kv._internal_kv_get(DEBUG_AUTOSCALING_ERROR)
@ -39,14 +42,14 @@ def get_ray_status_output(address):
@pytest.mark.parametrize(
"ray_start_cluster_head", [
"ray_start_cluster_head_with_external_redis", [
generate_system_config_map(
num_heartbeats_timeout=10, gcs_rpc_server_reconnect_timeout_s=60)
],
indirect=True)
def test_create_placement_group_during_gcs_server_restart(
ray_start_cluster_head):
cluster = ray_start_cluster_head
ray_start_cluster_head_with_external_redis):
cluster = ray_start_cluster_head_with_external_redis
cluster.add_node(num_cpus=200)
cluster.wait_for_nodes()
@ -64,13 +67,13 @@ def test_create_placement_group_during_gcs_server_restart(
@pytest.mark.parametrize(
"ray_start_cluster_head", [
"ray_start_cluster_head_with_external_redis", [
generate_system_config_map(
num_heartbeats_timeout=10, gcs_rpc_server_reconnect_timeout_s=60)
],
indirect=True)
def test_placement_group_wait_api(ray_start_cluster_head):
cluster = ray_start_cluster_head
def test_placement_group_wait_api(ray_start_cluster_head_with_external_redis):
cluster = ray_start_cluster_head_with_external_redis
cluster.add_node(num_cpus=2)
cluster.add_node(num_cpus=2)
cluster.wait_for_nodes()


@ -34,6 +34,7 @@ namespace gcs {
GcsServer::GcsServer(const ray::gcs::GcsServerConfig &config,
instrumented_io_context &main_service)
: config_(config),
storage_type_(StorageType()),
main_service_(main_service),
rpc_server_(config.grpc_server_name, config.grpc_server_port,
config.node_ip_address == "127.0.0.1", config.grpc_server_thread_num,
@ -46,19 +47,17 @@ GcsServer::GcsServer(const ray::gcs::GcsServerConfig &config,
periodical_runner_(main_service),
is_started_(false),
is_stopped_(false) {
// Init gcs table storage.
RAY_LOG(INFO) << "GCS Storage is set to " << RayConfig::instance().gcs_storage();
// Init GCS table storage.
RAY_LOG(INFO) << "GCS storage type is " << storage_type_;
RAY_LOG(INFO) << "gRPC based pubsub is"
<< (RayConfig::instance().gcs_grpc_based_pubsub() ? " " : " not ")
<< "enabled";
if (RayConfig::instance().gcs_storage() == "redis") {
if (storage_type_ == "redis") {
gcs_table_storage_ = std::make_shared<gcs::RedisGcsTableStorage>(GetOrConnectRedis());
} else if (RayConfig::instance().gcs_storage() == "memory") {
} else if (storage_type_ == "memory") {
RAY_CHECK(RayConfig::instance().gcs_grpc_based_pubsub())
<< " grpc pubsub has to be enabled when using storage other than redis";
gcs_table_storage_ = std::make_shared<InMemoryGcsTableStorage>(main_service_);
} else {
RAY_LOG(FATAL) << "Unsupported gcs storage: " << RayConfig::instance().gcs_storage();
}
auto on_done = [this](const ray::Status &status) {
@ -368,6 +367,23 @@ void GcsServer::InitGcsPlacementGroupManager(const GcsInitData &gcs_init_data) {
rpc_server_.RegisterService(*placement_group_info_service_);
}
std::string GcsServer::StorageType() const {
if (RayConfig::instance().gcs_storage() == "memory") {
if (!config_.redis_address.empty()) {
RAY_LOG(INFO) << "Using external Redis for KV storage: " << config_.redis_address;
return "redis";
}
return "memory";
}
if (RayConfig::instance().gcs_storage() == "redis") {
RAY_CHECK(!config_.redis_address.empty());
return "redis";
}
RAY_LOG(FATAL) << "Unsupported GCS storage type: "
<< RayConfig::instance().gcs_storage();
return RayConfig::instance().gcs_storage();
}
void GcsServer::StoreGcsServerAddressInRedis() {
std::string ip = config_.node_ip_address;
if (ip.empty()) {
@ -419,12 +435,10 @@ void GcsServer::InitStatsHandler() {
void GcsServer::InitKVManager() {
std::unique_ptr<InternalKVInterface> instance;
// TODO (yic): Use a factory with configs
if (RayConfig::instance().gcs_storage() == "redis") {
if (storage_type_ == "redis") {
instance = std::make_unique<RedisInternalKV>(GetRedisClientOptions());
} else if (RayConfig::instance().gcs_storage() == "memory") {
} else if (storage_type_ == "memory") {
instance = std::make_unique<MemoryInternalKV>(main_service_);
} else {
RAY_LOG(FATAL) << "Unsupported gcs storage: " << RayConfig::instance().gcs_storage();
}
kv_manager_ = std::make_unique<GcsInternalKVManager>(std::move(instance));


@ -137,6 +137,9 @@ class GcsServer {
void InstallEventListeners();
private:
/// Gets the type of KV storage to use from config.
std::string StorageType() const;
/// Store the address of GCS server in Redis.
///
/// Clients will look up this address in Redis and use it to connect to GCS server.
@ -160,7 +163,9 @@ class GcsServer {
std::shared_ptr<RedisClient> GetOrConnectRedis();
/// Gcs server configuration.
GcsServerConfig config_;
const GcsServerConfig config_;
// Type of storage to use.
const std::string storage_type_;
/// The main io service to drive event posted from grpc threads.
instrumented_io_context &main_service_;
/// The io service used by heartbeat manager in case of node failure detector being