Revert "[core] Reenable GCS test with redis as backend. (#23506)"

This reverts commit 8ec558dcb9.
This commit is contained in:
Yi Cheng 2022-05-20 14:47:27 -07:00 committed by GitHub
parent 8960afa69c
commit 22b3d2f4ee
12 changed files with 114 additions and 96 deletions

View file

@ -19,7 +19,7 @@ prelude_commands: &prelude_commands |-
epilogue_commands: &epilogue_commands |-
# Cleanup runtime environment to save storage
rm -rf /tmp/ray || true
rm -rf /tmp/ray
# Cleanup local caches (this shouldn't clean up global disk cache)
bazel clean

View file

@ -324,36 +324,6 @@
- bazel test --config=ci $(./ci/run/bazel_export_options)
--test_tag_filters=-kubernetes,medium_size_python_tests_k_to_z
python/ray/tests/...
- label: ":redis: (External Redis) (Small & Client)"
conditions: ["RAY_CI_PYTHON_AFFECTED"]
commands:
- cleanup() { if [ "${BUILDKITE_PULL_REQUEST}" = "false" ]; then ./ci/build/upload_build_info.sh; fi }; trap cleanup EXIT
- bazel test --config=ci $(./scripts/bazel_export_options)
--test_tag_filters=client_tests,small_size_python_tests
--test_env=TEST_EXTERNAL_REDIS=1
-- python/ray/tests/...
- label: ":redis: (External Redis) (Large)"
conditions: ["RAY_CI_PYTHON_AFFECTED"]
parallelism: 3
commands:
- cleanup() { if [ "${BUILDKITE_PULL_REQUEST}" = "false" ]; then ./ci/build/upload_build_info.sh; fi }; trap cleanup EXIT
- TEST_EXTERNAL_REDIS=1 . ./ci/ci.sh test_large
- label: ":redis: (External Redis) (Medium A-J)"
conditions: ["RAY_CI_PYTHON_AFFECTED"]
commands:
- cleanup() { if [ "${BUILDKITE_PULL_REQUEST}" = "false" ]; then ./ci/build/upload_build_info.sh; fi }; trap cleanup EXIT
- bazel test --config=ci $(./scripts/bazel_export_options)
--test_tag_filters=-kubernetes,medium_size_python_tests_a_to_j
--test_env=TEST_EXTERNAL_REDIS=1
-- //python/ray/tests/...
- label: ":redis: (External Redis) (Medium K-Z)"
conditions: ["RAY_CI_PYTHON_AFFECTED"]
commands:
- cleanup() { if [ "${BUILDKITE_PULL_REQUEST}" = "false" ]; then ./ci/build/upload_build_info.sh; fi }; trap cleanup EXIT
- bazel test --config=ci $(./scripts/bazel_export_options)
--test_tag_filters=-kubernetes,medium_size_python_tests_k_to_z
--test_env=TEST_EXTERNAL_REDIS=1
-- //python/ray/tests/...
- label: ":python: Debug Test"
conditions: ["RAY_CI_PYTHON_AFFECTED"]
commands:

View file

@ -2,7 +2,6 @@ load("@bazel_skylib//lib:paths.bzl", "paths")
# py_test_module_list creates a py_test target for each
# Python file in `files`
def py_test_module_list(files, size, deps, extra_srcs, name_suffix="", **kwargs):
for file in files:
# remove .py

View file

@ -64,12 +64,6 @@ def make_global_state_accessor(ray_context):
return global_state_accessor
def test_external_redis():
import os
return os.environ.get("TEST_EXTERNAL_REDIS") == "1"
def _pid_alive(pid):
"""Check if the process with this PID is alive or not.

View file

@ -87,6 +87,7 @@ RAY_PROCESSES = [
["gcs_server", True],
["monitor.py", False],
["ray.util.client.server", False],
["redis-server", False],
["default_worker.py", False], # Python worker.
["setup_worker.py", False], # Python environment setup worker.
# For mac osx, setproctitle doesn't change the process name returned

View file

@ -984,7 +984,19 @@ def stop(force, grace_period):
proc, proc_cmd, proc_args = candidate
corpus = proc_cmd if filter_by_cmd else subprocess.list2cmdline(proc_args)
if keyword in corpus:
# This is a way to avoid killing redis server that's not started by Ray.
# We are using a simple hacky solution here since
# Redis server will anyway removed soon from the ray repository.
# This feature is only supported on MacOS/Linux temporarily until
# Redis is removed from Ray.
if (
keyword == "redis-server"
and sys.platform != "win32"
and "core/src/ray/thirdparty/redis/src/redis-server" not in corpus
):
continue
found.append(candidate)
for proc, proc_cmd, proc_args in found:
proc_string = str(subprocess.list2cmdline(proc_args))
try:

View file

@ -4,7 +4,7 @@ from ray import serve
from ray.serve.config import ReplicaConfig, DeploymentConfig
from ray.serve.utils import msgpack_serialize
from ray.serve.generated.serve_pb2 import JAVA, RequestMetadata, RequestWrapper
from ray.tests.conftest import shutdown_only, maybe_external_redis # noqa: F401
from ray.tests.conftest import shutdown_only # noqa: F401
def test_controller_starts_java_replica(shutdown_only): # noqa: F811

View file

@ -36,7 +36,6 @@ from ray.serve.utils import block_until_http_ready, get_all_node_ids, format_act
# Explicitly importing it here because it is a ray core tests utility (
# not in the tree)
from ray.tests.conftest import ray_start_with_dashboard # noqa: F401
from ray.tests.conftest import maybe_external_redis # noqa: F401
@pytest.fixture

View file

@ -32,11 +32,17 @@ from ray._private.test_utils import (
setup_tls,
teardown_tls,
get_and_run_node_killer,
test_external_redis,
)
from ray.cluster_utils import Cluster, AutoscalingCluster, cluster_not_supported
@pytest.fixture
def shutdown_only():
yield None
# The code after the yield will run as teardown code.
ray.shutdown()
def get_default_fixure_system_config():
system_config = {
"object_timeout_milliseconds": 200,
@ -58,11 +64,10 @@ def get_default_fixture_ray_kwargs():
return ray_kwargs
@contextmanager
def _setup_redis(request):
@pytest.fixture
def external_redis(request, monkeypatch):
# Setup external Redis and env var for initialization.
param = getattr(request, "param", {})
external_redis_ports = param.get("external_redis_ports")
if external_redis_ports is None:
with socket.socket() as s:
@ -83,41 +88,12 @@ def _setup_redis(request):
processes.append(proc)
wait_for_redis_to_start("127.0.0.1", port, ray_constants.REDIS_DEFAULT_PASSWORD)
address_str = ",".join(map(lambda x: f"127.0.0.1:{x}", external_redis_ports))
import os
old_addr = os.environ.get("RAY_REDIS_ADDRESS")
os.environ["RAY_REDIS_ADDRESS"] = address_str
yield
if old_addr is not None:
os.environ["RAY_REDIS_ADDRESS"] = old_addr
else:
del os.environ["RAY_REDIS_ADDRESS"]
monkeypatch.setenv("RAY_REDIS_ADDRESS", address_str)
yield None
for proc in processes:
proc.process.terminate()
@pytest.fixture
def maybe_external_redis(request):
if test_external_redis():
with _setup_redis(request):
yield
else:
yield
@pytest.fixture
def external_redis(request):
with _setup_redis(request):
yield
@pytest.fixture
def shutdown_only(maybe_external_redis):
yield None
# The code after the yield will run as teardown code.
ray.shutdown()
@contextmanager
def _ray_start(**kwargs):
init_kwargs = get_default_fixture_ray_kwargs()
@ -131,7 +107,7 @@ def _ray_start(**kwargs):
@pytest.fixture
def ray_start_with_dashboard(request, maybe_external_redis):
def ray_start_with_dashboard(request):
param = getattr(request, "param", {})
if param.get("num_cpus") is None:
param["num_cpus"] = 1
@ -141,7 +117,7 @@ def ray_start_with_dashboard(request, maybe_external_redis):
# The following fixture will start ray with 0 cpu.
@pytest.fixture
def ray_start_no_cpu(request, maybe_external_redis):
def ray_start_no_cpu(request):
param = getattr(request, "param", {})
with _ray_start(num_cpus=0, **param) as res:
yield res
@ -149,7 +125,7 @@ def ray_start_no_cpu(request, maybe_external_redis):
# The following fixture will start ray with 1 cpu.
@pytest.fixture
def ray_start_regular(request, maybe_external_redis):
def ray_start_regular(request):
param = getattr(request, "param", {})
with _ray_start(**param) as res:
yield res
@ -180,14 +156,14 @@ def ray_start_shared_local_modes(request):
@pytest.fixture
def ray_start_2_cpus(request, maybe_external_redis):
def ray_start_2_cpus(request):
param = getattr(request, "param", {})
with _ray_start(num_cpus=2, **param) as res:
yield res
@pytest.fixture
def ray_start_10_cpus(request, maybe_external_redis):
def ray_start_10_cpus(request):
param = getattr(request, "param", {})
with _ray_start(num_cpus=10, **param) as res:
yield res
@ -230,14 +206,14 @@ def _ray_start_cluster(**kwargs):
# This fixture will start a cluster with empty nodes.
@pytest.fixture
def ray_start_cluster(request, maybe_external_redis):
def ray_start_cluster(request):
param = getattr(request, "param", {})
with _ray_start_cluster(**param) as res:
yield res
@pytest.fixture
def ray_start_cluster_enabled(request, maybe_external_redis):
def ray_start_cluster_enabled(request):
param = getattr(request, "param", {})
param["skip_cluster"] = False
with _ray_start_cluster(**param) as res:
@ -245,14 +221,14 @@ def ray_start_cluster_enabled(request, maybe_external_redis):
@pytest.fixture
def ray_start_cluster_init(request, maybe_external_redis):
def ray_start_cluster_init(request):
param = getattr(request, "param", {})
with _ray_start_cluster(do_init=True, **param) as res:
yield res
@pytest.fixture
def ray_start_cluster_head(request, maybe_external_redis):
def ray_start_cluster_head(request):
param = getattr(request, "param", {})
with _ray_start_cluster(do_init=True, num_nodes=1, **param) as res:
yield res
@ -269,14 +245,14 @@ def ray_start_cluster_head_with_external_redis(request, external_redis):
@pytest.fixture
def ray_start_cluster_2_nodes(request, maybe_external_redis):
def ray_start_cluster_2_nodes(request):
param = getattr(request, "param", {})
with _ray_start_cluster(do_init=True, num_nodes=2, **param) as res:
yield res
@pytest.fixture
def ray_start_object_store_memory(request, maybe_external_redis):
def ray_start_object_store_memory(request):
# Start the Ray processes.
store_size = request.param
system_config = get_default_fixure_system_config()

View file

@ -10,7 +10,6 @@ from unittest.mock import patch
import grpc
import ray
from ray.ray_constants import REDIS_DEFAULT_PASSWORD
import ray.core.generated.ray_client_pb2 as ray_client_pb2
from ray.cloudpickle.compat import pickle
from ray.job_config import JobConfig
@ -19,12 +18,12 @@ from ray._private.test_utils import run_string_as_driver
def start_ray_and_proxy_manager(n_ports=2):
ray_instance = ray.init(_redis_password=REDIS_DEFAULT_PASSWORD)
ray_instance = ray.init(_redis_password="test")
agent_port = ray.worker.global_worker.node.metrics_agent_port
pm = proxier.ProxyManager(
ray_instance["address"],
session_dir=ray_instance["session_dir"],
redis_password=REDIS_DEFAULT_PASSWORD,
redis_password="test",
runtime_env_agent_port=agent_port,
)
free_ports = random.choices(range(45000, 45100), k=n_ports)

View file

@ -16,7 +16,6 @@ from ray.util.multiprocessing import Pool, TimeoutError, JoinableQueue
from ray.util.joblib import register_ray
from joblib import parallel_backend, Parallel, delayed
from ray._private.test_utils import test_external_redis
def teardown_function(function):
@ -69,9 +68,6 @@ def ray_start_4_cpu():
ray.shutdown()
@pytest.mark.skipif(
test_external_redis(), reason="The same Redis is used within the test."
)
def test_ray_init(shutdown_only):
def getpid(args):
return os.getpid()
@ -121,9 +117,6 @@ def test_ray_init(shutdown_only):
],
indirect=True,
)
@pytest.mark.skipif(
test_external_redis(), reason="The same Redis is used within the test."
)
def test_connect_to_ray(ray_start_cluster):
def getpid(args):
return os.getpid()

View file

@ -3,6 +3,7 @@ import sys
import logging
import pytest
import redis
import unittest.mock
import ray
import ray._private.services
@ -15,6 +16,80 @@ from ray.util.client.worker import Worker
import grpc
@pytest.fixture
def password():
random_bytes = os.urandom(128)
if hasattr(random_bytes, "hex"):
return random_bytes.hex() # Python 3
return random_bytes.encode("hex") # Python 2
class TestRedisPassword:
@pytest.mark.skipif(
True, reason="Not valid anymore. To be added back when fixing Redis mode"
)
def test_redis_password(self, password, shutdown_only):
@ray.remote
def f():
return 1
info = ray.init(_redis_password=password)
address = info["redis_address"]
redis_ip, redis_port = address.split(":")
# Check that we can run a task
object_ref = f.remote()
ray.get(object_ref)
# Check that Redis connections require a password
redis_client = redis.StrictRedis(host=redis_ip, port=redis_port, password=None)
with pytest.raises(redis.exceptions.AuthenticationError):
redis_client.ping()
# We want to simulate how this is called by ray.scripts.start().
try:
ray._private.services.wait_for_redis_to_start(
redis_ip, redis_port, password="wrong password"
)
# We catch a generic Exception here in case someone later changes the
# type of the exception.
except Exception as ex:
if not (
isinstance(ex.__cause__, redis.AuthenticationError)
and "invalid password" in str(ex.__cause__)
) and not (
isinstance(ex, redis.ResponseError)
and "WRONGPASS invalid username-password pair" in str(ex)
):
raise
# By contrast, we may be fairly confident the exact string
# 'invalid password' won't go away, because redis-py simply wraps
# the exact error from the Redis library.
# https://github.com/andymccurdy/redis-py/blob/master/
# redis/connection.py#L132
# Except, apparently sometimes redis-py raises a completely
# different *type* of error for a bad password,
# redis.ResponseError, which is not even derived from
# redis.ConnectionError as redis.AuthenticationError is.
# Check that we can connect to Redis using the provided password
redis_client = redis.StrictRedis(
host=redis_ip, port=redis_port, password=password
)
assert redis_client.ping()
def test_redis_password_cluster(self, password, shutdown_only):
@ray.remote
def f():
return 1
node_args = {"redis_password": password}
cluster = Cluster(initialize_head=True, connect=True, head_node_args=node_args)
cluster.add_node(**node_args)
object_ref = f.remote()
ray.get(object_ref)
def test_shutdown_and_reset_global_worker(shutdown_only):
ray.init(job_config=ray.job_config.JobConfig(code_search_path=["a"]))
ray.shutdown()