Fail calling Cluster() on windows, skip tests appropriately (#20172)

This commit is contained in:
Matti Picus 2021-11-24 23:40:59 +02:00 committed by GitHub
parent c07d8c4c22
commit 8299f94ab8
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
20 changed files with 53 additions and 20 deletions

View file

@ -14,6 +14,8 @@ from ray import ray_constants
logger = logging.getLogger(__name__)
cluster_not_supported = (os.name == "nt")
class AutoscalingCluster:
"""Create a local autoscaling cluster for testing.
@ -102,6 +104,11 @@ class Cluster:
shutdown_at_exit (bool): If True, registers an exit hook
for shutting down all started processes.
"""
if cluster_not_supported:
logger.warning(
"Ray cluster mode is currently experimental and untested on "
"Windows. If you are using it and running into issues please "
"file a report at https://github.com/ray-project/ray/issues.")
self.head_node = None
self.worker_nodes = set()
self.redis_address = None

View file

@ -9,7 +9,8 @@ import json
import time
import ray
from ray.cluster_utils import Cluster, AutoscalingCluster
from ray.cluster_utils import (Cluster, AutoscalingCluster,
cluster_not_supported)
from ray._private.services import REDIS_EXECUTABLE, _start_redis_instance
from ray._private.test_utils import (init_error_pubsub, init_log_pubsub,
setup_tls, teardown_tls,
@ -117,6 +118,8 @@ def ray_start_10_cpus(request):
@contextmanager
def _ray_start_cluster(**kwargs):
if cluster_not_supported:
pytest.skip("Cluster not supported")
init_kwargs = get_default_fixture_ray_kwargs()
num_nodes = 0
do_init = False
@ -290,6 +293,8 @@ def two_node_cluster():
"object_timeout_milliseconds": 200,
"num_heartbeats_timeout": 10,
}
if cluster_not_supported:
pytest.skip("Cluster not supported")
cluster = ray.cluster_utils.Cluster(
head_node_args={"_system_config": system_config})
for _ in range(2):

View file

@ -371,6 +371,7 @@ def test_illegal_api_calls(ray_start_regular):
@pytest.mark.skipif(
client_test_enabled(), reason="grpc interaction with releasing resources")
@pytest.mark.skipif(sys.platform == "win32", reason="Time out on Windows")
def test_multithreading(ray_start_2_cpus):
# This test requires at least 2 CPUs to finish since the worker does not
# release resources when joining the threads.

View file

@ -6,9 +6,10 @@ import pytest
import numpy as np
import ray
from ray.cluster_utils import Cluster
from ray.cluster_utils import Cluster, cluster_not_supported
@pytest.mark.xfail(cluster_not_supported, reason="cluster not supported")
@pytest.mark.asyncio
async def test_asyncio_cluster_wait():
cluster = Cluster()

View file

@ -12,7 +12,7 @@ import pytest
from unittest.mock import MagicMock, patch
import ray.cluster_utils
from ray.cluster_utils import Cluster, cluster_not_supported
from ray._private.test_utils import client_test_enabled
from ray.tests.client_test_utils import create_remote_signal_actor
from ray.exceptions import GetTimeoutError
@ -324,10 +324,11 @@ def test_call_chain(ray_start_cluster):
assert ray.get(x) == 100
@pytest.mark.xfail(cluster_not_supported, reason="cluster not supported")
@pytest.mark.skipif(client_test_enabled(), reason="init issue")
def test_system_config_when_connecting(ray_start_cluster):
config = {"object_timeout_milliseconds": 200}
cluster = ray.cluster_utils.Cluster()
cluster = Cluster()
cluster.add_node(
_system_config=config, object_store_memory=100 * 1024 * 1024)
cluster.wait_for_nodes()

View file

@ -38,6 +38,7 @@ from testfixtures.popen import MockPopen, PopenBehaviour
import ray
import ray.autoscaler._private.aws.config as aws_config
from ray.cluster_utils import Cluster, cluster_not_supported
import ray.scripts.scripts as scripts
from ray._private.test_utils import wait_for_condition
@ -494,8 +495,8 @@ def test_ray_status():
ray.shutdown()
@pytest.mark.xfail(cluster_not_supported, reason="cluster not supported")
def test_ray_status_multinode():
from ray.cluster_utils import Cluster
cluster = Cluster()
for _ in range(4):
cluster.add_node(num_cpus=2)

View file

@ -43,7 +43,6 @@ def test_client(address):
assert builder.address == address.replace("ray://", "")
@pytest.mark.skipif(sys.platform == "win32", reason="Flaky on Windows.")
def test_namespace(ray_start_cluster):
"""
Most of the "checks" in this test case rely on the fact that

View file

@ -11,6 +11,7 @@ import ray.util.client.server.server as ray_client_server
import ray.core.generated.ray_client_pb2 as ray_client_pb2
from ray.util.client import _ClientContext, CURRENT_PROTOCOL_VERSION
from ray.cluster_utils import cluster_not_supported
import ray
@ -40,6 +41,7 @@ class C:
return self.val
@pytest.mark.xfail(cluster_not_supported, reason="cluster not supported")
@pytest.fixture
def init_and_serve_lazy():
cluster = ray.cluster_utils.Cluster()

View file

@ -7,7 +7,7 @@ import pytest
import ray
import ray.ray_constants as ray_constants
from ray.cluster_utils import Cluster
from ray.cluster_utils import Cluster, cluster_not_supported
from ray._private.test_utils import (
RayTestTimeoutException,
get_other_nodes,
@ -17,6 +17,7 @@ from ray._private.test_utils import (
SIGKILL = signal.SIGKILL if sys.platform != "win32" else signal.SIGTERM
@pytest.mark.xfail(cluster_not_supported, reason="cluster not supported")
@pytest.fixture(params=[(1, 4), (4, 4)])
def ray_start_workers_separate_multinode(request):
num_nodes = request.param[0]

View file

@ -14,7 +14,7 @@ from ray.autoscaler._private.util import DEBUG_AUTOSCALING_ERROR
import ray._private.utils
from ray.util.placement_group import placement_group
import ray.ray_constants as ray_constants
from ray.cluster_utils import Cluster
from ray.cluster_utils import Cluster, cluster_not_supported
from ray._private.test_utils import (init_error_pubsub, get_error_message,
get_log_batch, Semaphore,
wait_for_condition)
@ -377,6 +377,7 @@ def test_serialized_id(ray_start_cluster):
ray.get(get.remote([obj], True))
@pytest.mark.xfail(cluster_not_supported, reason="cluster not supported")
@pytest.mark.parametrize("use_actors,node_failure",
[(False, False), (False, True), (True, False),
(True, True)])

View file

@ -8,7 +8,7 @@ import psutil
import ray.ray_constants as ray_constants
from ray.cluster_utils import Cluster
from ray.cluster_utils import Cluster, cluster_not_supported
from ray import NodeID
from ray.core.generated import node_manager_pb2
from ray.core.generated import node_manager_pb2_grpc
@ -94,6 +94,7 @@ def test_retry_application_level_error(ray_start_regular):
ray.get(r3)
@pytest.mark.xfail(cluster_not_supported, reason="cluster not supported")
def test_connect_with_disconnected_node(shutdown_only):
config = {
"num_heartbeats_timeout": 50,

View file

@ -58,6 +58,8 @@ def test_auto_local_gc(shutdown_only):
gc.enable()
@pytest.mark.xfail(
ray.cluster_utils.cluster_not_supported, reason="cluster not supported")
def test_global_gc(shutdown_only):
cluster = ray.cluster_utils.Cluster()
cluster.add_node(
@ -108,6 +110,8 @@ def test_global_gc(shutdown_only):
gc.enable()
@pytest.mark.xfail(
ray.cluster_utils.cluster_not_supported, reason="cluster not supported")
def test_global_gc_when_full(shutdown_only):
cluster = ray.cluster_utils.Cluster()
for _ in range(2):

View file

@ -4,7 +4,7 @@ import time
import pytest
import ray
from ray.cluster_utils import Cluster
from ray.cluster_utils import Cluster, cluster_not_supported
from ray.internal.internal_api import memory_summary
# RayConfig to enable recording call sites during ObjectRej creations.
@ -233,6 +233,7 @@ def test_pinned_object_call_site(ray_start_regular):
assert num_objects(info) == 0, info
@pytest.mark.xfail(cluster_not_supported, reason="cluster not supported")
def test_multi_node_stats(shutdown_only):
# NOTE(mwtian): using env var only enables the feature on workers, while
# using head_node_args={"_system_config": ray_config} only enables the

View file

@ -7,12 +7,13 @@ import pytest
import ray
import ray.ray_constants as ray_constants
from ray.cluster_utils import Cluster
from ray.cluster_utils import Cluster, cluster_not_supported
from ray._private.test_utils import RayTestTimeoutException, get_other_nodes
SIGKILL = signal.SIGKILL if sys.platform != "win32" else signal.SIGTERM
@pytest.mark.xfail(cluster_not_supported, reason="cluster not supported")
@pytest.fixture(params=[(1, 4), (4, 4)])
def ray_start_workers_separate_multinode(request):
num_nodes = request.param[0]

View file

@ -6,7 +6,7 @@ import time
import warnings
import ray
from ray.cluster_utils import Cluster
from ray.cluster_utils import Cluster, cluster_not_supported
from ray.exceptions import GetTimeoutError
if (multiprocessing.cpu_count() < 40
@ -296,6 +296,7 @@ def test_pull_request_retry(shutdown_only):
ray.get(driver.remote())
@pytest.mark.xfail(cluster_not_supported, reason="cluster not supported")
def test_pull_bundles_admission_control(shutdown_only):
cluster = Cluster()
object_size = int(6e6)
@ -329,6 +330,7 @@ def test_pull_bundles_admission_control(shutdown_only):
ray.get(tasks)
@pytest.mark.xfail(cluster_not_supported, reason="cluster not supported")
def test_pull_bundles_pinning(shutdown_only):
cluster = Cluster()
object_size = int(50e6)
@ -353,6 +355,7 @@ def test_pull_bundles_pinning(shutdown_only):
ray.get(foo.remote(*task_args))
@pytest.mark.xfail(cluster_not_supported, reason="cluster not supported")
def test_pull_bundles_admission_control_dynamic(shutdown_only):
# This test is the same as test_pull_bundles_admission_control, except that
# the object store's capacity starts off higher and is later consumed
@ -399,6 +402,7 @@ def test_pull_bundles_admission_control_dynamic(shutdown_only):
del allocated
@pytest.mark.xfail(cluster_not_supported, reason="cluster not supported")
def test_max_pinned_args_memory(shutdown_only):
cluster = Cluster()
cluster.add_node(
@ -431,6 +435,7 @@ def test_max_pinned_args_memory(shutdown_only):
ray.get(large_arg.remote(ref))
@pytest.mark.xfail(cluster_not_supported, reason="cluster not supported")
def test_ray_get_task_args_deadlock(shutdown_only):
cluster = Cluster()
object_size = int(6e6)

View file

@ -15,7 +15,7 @@ from ray.tests.conftest import (file_system_object_spilling_config,
from ray.external_storage import (create_url_with_offset,
parse_url_with_offset)
from ray._private.test_utils import wait_for_condition
from ray.cluster_utils import Cluster
from ray.cluster_utils import Cluster, cluster_not_supported
from ray.internal.internal_api import memory_summary
@ -601,8 +601,7 @@ def test_pull_spilled_object_failure(object_spilling_config,
assert hash_value == hash_value1
@pytest.mark.skipif(
platform.system() == "Windows", reason="Failing on Windows.")
@pytest.mark.xfail(cluster_not_supported, reason="cluster not supported")
def test_spill_dir_cleanup_on_raylet_start(object_spilling_config):
object_spilling_config, temp_folder = object_spilling_config
cluster = Cluster()

View file

@ -9,7 +9,7 @@ import pexpect
import pytest
import ray
from ray.cluster_utils import Cluster
from ray.cluster_utils import Cluster, cluster_not_supported
from ray import ray_constants
from ray._private.test_utils import run_string_as_driver, wait_for_condition
from ray._private import services
@ -241,8 +241,7 @@ def test_ray_debugger_public(shutdown_only, call_ray_stop_only,
ray.get(result)
@pytest.mark.skipif(
platform.system() == "Windows", reason="Failing on Windows.")
@pytest.mark.xfail(cluster_not_supported, reason="cluster not supported")
@pytest.mark.parametrize("ray_debugger_external", [False, True])
def test_ray_debugger_public_multi_node(shutdown_only, ray_debugger_external):
c = Cluster(

View file

@ -475,6 +475,8 @@ def test_pull_manager_at_capacity_reports(ray_start_cluster):
wait_for_condition(lambda: not fetches_queued())
@pytest.mark.xfail(
ray.cluster_utils.cluster_not_supported, reason="cluster not supported")
def build_cluster(num_cpu_nodes, num_gpu_nodes):
cluster = ray.cluster_utils.Cluster()
gpu_ids = [

View file

@ -3,9 +3,10 @@ import pytest
import time
import ray
from ray.cluster_utils import Cluster
from ray.cluster_utils import Cluster, cluster_not_supported
@pytest.mark.xfail(cluster_not_supported, reason="cluster not supported")
@pytest.fixture(params=[(1, 4), (4, 4)])
def ray_start_combination(request):
num_nodes = request.param[0]

View file

@ -4,11 +4,12 @@ import sys
import time
import ray
from ray.cluster_utils import Cluster
from ray.cluster_utils import Cluster, cluster_not_supported
import ray.ray_constants as ray_constants
from ray._private.test_utils import get_error_message
@pytest.mark.xfail(cluster_not_supported, reason="cluster not supported")
@pytest.fixture(params=[1, 4])
def ray_start_reconstruction(request):
num_nodes = request.param