Refactor pytest fixtures for ray core (#4390)

Yuhong Guo 2019-03-20 11:48:32 +08:00 committed by Hao Chen
parent c6f15a0057
commit 8ce7565530
22 changed files with 378 additions and 681 deletions

View file

@ -0,0 +1,163 @@
"""
This file defines the common pytest fixtures used in the current directory.
"""
from contextlib import contextmanager
import json
import pytest
import subprocess
import ray
from ray.tests.cluster_utils import Cluster
from ray.tests.utils import run_and_get_output
@pytest.fixture
def shutdown_only():
yield None
# The code after the yield will run as teardown code.
ray.shutdown()
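# For illustration only (hypothetical test, not part of this file): a test
# that needs custom ray.init arguments can depend on shutdown_only, call
# ray.init itself, and rely on the fixture's teardown for cleanup.
def example_test_puts_with_custom_init(shutdown_only):
    ray.init(num_cpus=1)
    assert ray.get(ray.put(1)) == 1
    # No explicit cleanup: the fixture's ray.shutdown() runs after the test.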
def generate_internal_config_map(**kwargs):
internal_config = json.dumps(kwargs)
ray_kwargs = {
"_internal_config": internal_config,
}
return ray_kwargs
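# For example, generate_internal_config_map(num_heartbeats_timeout=20)
# returns {'_internal_config': '{"num_heartbeats_timeout": 20}'}, which can
# be passed straight to ray.init or used as a parametrize value below.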
def get_default_fixture_internal_config():
internal_config = json.dumps({
"initial_reconstruction_timeout_milliseconds": 200,
"num_heartbeats_timeout": 10,
})
return internal_config
def get_default_fixture_ray_kwargs():
internal_config = get_default_fixture_internal_config()
ray_kwargs = {
"num_cpus": 1,
"object_store_memory": 10**8,
"_internal_config": internal_config,
}
return ray_kwargs
@contextmanager
def _ray_start(**kwargs):
init_kwargs = get_default_fixture_ray_kwargs()
init_kwargs.update(kwargs)
# Start the Ray processes.
address_info = ray.init(**init_kwargs)
yield address_info
# The code after the yield will run as teardown code.
ray.shutdown()
# The following fixture will start Ray with 1 CPU.
@pytest.fixture
def ray_start_regular(request):
param = getattr(request, "param", {})
with _ray_start(**param) as res:
yield res
@pytest.fixture
def ray_start_2_cpus(request):
param = getattr(request, "param", {})
with _ray_start(num_cpus=2, **param) as res:
yield res
@pytest.fixture
def ray_start_10_cpus(request):
param = getattr(request, "param", {})
with _ray_start(num_cpus=10, **param) as res:
yield res
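# The fixtures above forward request.param straight to ray.init, so tests
# can override the defaults through pytest's indirect parametrization.
# A sketch of the pattern (hypothetical test; the parametrization style
# mirrors tests updated in this commit):
@pytest.mark.parametrize(
    "ray_start_regular", [{"num_cpus": 4}], indirect=True)
def example_test_with_four_cpus(ray_start_regular):
    assert ray.global_state.cluster_resources()["CPU"] == 4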
@contextmanager
def _ray_start_cluster(**kwargs):
init_kwargs = get_default_fixture_ray_kwargs()
num_nodes = 0
do_init = False
# num_nodes & do_init are not arguments for ray.init, so delete them.
if "num_nodes" in kwargs:
num_nodes = kwargs["num_nodes"]
del kwargs["num_nodes"]
if "do_init" in kwargs:
do_init = kwargs["do_init"]
del kwargs["do_init"]
elif num_nodes > 0:
do_init = True
init_kwargs.update(kwargs)
cluster = Cluster()
remote_nodes = []
for _ in range(num_nodes):
remote_nodes.append(cluster.add_node(**init_kwargs))
if do_init:
ray.init(redis_address=cluster.redis_address)
yield cluster
# The code after the yield will run as teardown code.
ray.shutdown()
cluster.shutdown()
# This fixture will start an empty cluster with no nodes added.
@pytest.fixture
def ray_start_cluster(request):
param = getattr(request, "param", {})
with _ray_start_cluster(**param) as res:
yield res
@pytest.fixture
def ray_start_cluster_head(request):
param = getattr(request, "param", {})
with _ray_start_cluster(do_init=True, num_nodes=1, **param) as res:
yield res
@pytest.fixture
def ray_start_cluster_2_nodes(request):
param = getattr(request, "param", {})
with _ray_start_cluster(do_init=True, num_nodes=2, **param) as res:
yield res
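# Because num_nodes and do_init are consumed by _ray_start_cluster itself,
# a test can request a pre-populated, connected cluster in one line. A
# sketch (hypothetical test) mirroring the parametrization used later in
# this commit:
@pytest.mark.parametrize(
    "ray_start_cluster", [{"num_cpus": 4, "num_nodes": 3, "do_init": True}],
    indirect=True)
def example_test_on_three_nodes(ray_start_cluster):
    cluster = ray_start_cluster
    cluster.wait_for_nodes()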
@pytest.fixture
def ray_start_object_store_memory(request):
# Start the Ray processes.
store_size = request.param
internal_config = get_default_fixture_internal_config()
init_kwargs = {
"num_cpus": 1,
"_internal_config": internal_config,
"object_store_memory": store_size,
}
ray.init(**init_kwargs)
yield store_size
# The code after the yield will run as teardown code.
ray.shutdown()
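# store_size comes from request.param with no default, so this fixture must
# always be parametrized indirectly; a sketch of the intended usage
# (hypothetical test):
@pytest.mark.parametrize(
    "ray_start_object_store_memory", [10**6], indirect=True)
def example_test_with_small_store(ray_start_object_store_memory):
    assert ray_start_object_store_memory == 10**6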
@pytest.fixture
def call_ray_start(request):
parameter = getattr(request, "param", "ray start --head --num-cpus=1")
command_args = parameter.split(" ")
out = run_and_get_output(command_args)
# Get the redis address from the output.
redis_substring_prefix = "redis_address=\""
redis_address_location = (
out.find(redis_substring_prefix) + len(redis_substring_prefix))
redis_address = out[redis_address_location:]
redis_address = redis_address.split("\"")[0]
yield redis_address
# Disconnect from the Ray cluster.
ray.shutdown()
# Kill the Ray cluster.
subprocess.Popen(["ray", "stop"]).wait()
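# The default parameter starts a single-CPU head node; a test that needs a
# different "ray start" command supplies its own string, as in this sketch
# (hypothetical test mirroring the GPU variant used later in this commit):
@pytest.mark.parametrize(
    "call_ray_start", ["ray start --head --num-cpus=1 --num-gpus=1"],
    indirect=True)
def example_test_against_gpu_head(call_ray_start):
    ray.init(redis_address=call_ray_start)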

View file

@ -16,82 +16,13 @@ import ray
import ray.ray_constants as ray_constants
import ray.tests.utils
import ray.tests.cluster_utils
from ray.tests.conftest import generate_internal_config_map
from ray.tests.utils import (
    relevant_errors,
    wait_for_errors,
)
@pytest.fixture
def ray_start_regular(request):
internal_config = {
"initial_reconstruction_timeout_milliseconds": 200,
"num_heartbeats_timeout": 10,
}
internal_config.update(getattr(request, "param", {}))
# Start the Ray processes.
ray.init(
num_cpus=1,
_internal_config=json.dumps(internal_config),
)
yield None
# The code after the yield will run as teardown code.
ray.shutdown()
@pytest.fixture
def shutdown_only():
yield None
# The code after the yield will run as teardown code.
ray.shutdown()
@pytest.fixture()
def ray_start_cluster():
cluster = ray.tests.cluster_utils.Cluster()
yield cluster
# The code after the yield will run as teardown code.
ray.shutdown()
cluster.shutdown()
@pytest.fixture()
def two_node_cluster():
internal_config = json.dumps({
"initial_reconstruction_timeout_milliseconds": 200,
"num_heartbeats_timeout": 10,
})
cluster = ray.tests.cluster_utils.Cluster()
for _ in range(2):
remote_node = cluster.add_node(
num_cpus=1, _internal_config=internal_config)
ray.init(redis_address=cluster.redis_address)
yield cluster, remote_node
# The code after the yield will run as teardown code.
ray.shutdown()
cluster.shutdown()
@pytest.fixture
def head_node_cluster(request):
timeout = getattr(request, "param", 200)
cluster = ray.tests.cluster_utils.Cluster(
initialize_head=True,
connect=True,
head_node_args={
"_internal_config": json.dumps({
"initial_reconstruction_timeout_milliseconds": timeout,
"num_heartbeats_timeout": 10,
})
})
yield cluster
# The code after the yield will run as teardown code.
ray.shutdown()
cluster.shutdown()
@pytest.fixture
def ray_checkpointable_actor_cls(request):
checkpoint_dir = "/tmp/ray_temp_checkpoint_dir/"
@ -610,16 +541,7 @@ def test_multiple_actors(ray_start_regular):
assert v == num_actors * [j + 1]
@pytest.fixture
def ray_start_bigger():
# Start the Ray processes.
ray.init(num_cpus=10)
yield None
# The code after the yield will run as teardown code.
ray.shutdown()
def test_remote_function_within_actor(ray_start_bigger):
def test_remote_function_within_actor(ray_start_10_cpus):
# Make sure we can use remote functions within actors.
# Create some values to close over.
@ -667,7 +589,7 @@ def test_remote_function_within_actor(ray_start_bigger):
range(1, 6))
def test_define_actor_within_actor(ray_start_bigger):
def test_define_actor_within_actor(ray_start_10_cpus):
# Make sure we can use remote functions within actors.
@ray.remote
@ -694,7 +616,7 @@ def test_define_actor_within_actor(ray_start_bigger):
assert ray.get(actor1.get_values.remote(5)) == (3, 5)
def test_use_actor_within_actor(ray_start_bigger):
def test_use_actor_within_actor(ray_start_10_cpus):
# Make sure we can use actors within actors.
@ray.remote
@ -718,7 +640,7 @@ def test_use_actor_within_actor(ray_start_bigger):
assert ray.get(actor2.get_values.remote(5)) == (3, 4)
def test_define_actor_within_remote_function(ray_start_bigger):
def test_define_actor_within_remote_function(ray_start_10_cpus):
# Make sure we can define actors within remote functions.
@ray.remote
@ -739,7 +661,7 @@ def test_define_actor_within_remote_function(ray_start_bigger):
[f.remote(i, 20) for i in range(10)]) == [20 * [i] for i in range(10)]
def test_use_actor_within_remote_function(ray_start_bigger):
def test_use_actor_within_remote_function(ray_start_10_cpus):
# Make sure we can create and use actors within remote functions.
@ray.remote
@ -758,7 +680,7 @@ def test_use_actor_within_remote_function(ray_start_bigger):
assert ray.get(f.remote(3)) == 3
def test_actor_import_counter(ray_start_bigger):
def test_actor_import_counter(ray_start_10_cpus):
# This is mostly a test of the export counters to make sure that when
# an actor is imported, all of the necessary remote functions have been
# imported.
@ -1379,8 +1301,9 @@ def test_blocking_actor_task(shutdown_only):
assert remaining_ids == [x_id]
def test_exception_raised_when_actor_node_dies(head_node_cluster):
remote_node = head_node_cluster.add_node()
def test_exception_raised_when_actor_node_dies(ray_start_cluster_head):
cluster = ray_start_cluster_head
remote_node = cluster.add_node()
@ray.remote
class Counter(object):
@ -1401,7 +1324,7 @@ def test_exception_raised_when_actor_node_dies(head_node_cluster):
actor = Counter.remote()
# Kill the second node.
head_node_cluster.remove_node(remote_node)
cluster.remove_node(remote_node)
# Submit some new actor tasks both before and after the node failure is
# detected. Make sure that getting the result raises an exception.
@ -1419,8 +1342,9 @@ def test_exception_raised_when_actor_node_dies(head_node_cluster):
@pytest.mark.skipif(
os.environ.get("RAY_USE_NEW_GCS") == "on",
reason="Hanging with new GCS API.")
def test_actor_init_fails(head_node_cluster):
remote_node = head_node_cluster.add_node()
def test_actor_init_fails(ray_start_cluster_head):
cluster = ray_start_cluster_head
remote_node = cluster.add_node()
@ray.remote(max_reconstructions=1)
class Counter(object):
@ -1436,16 +1360,17 @@ def test_actor_init_fails(head_node_cluster):
# Allow some time to forward the actor creation tasks to the other node.
time.sleep(0.1)
# Kill the second node.
head_node_cluster.remove_node(remote_node)
cluster.remove_node(remote_node)
# Get all of the results
results = ray.get([actor.inc.remote() for actor in actors])
assert results == [1 for actor in actors]
def test_reconstruction_suppression(head_node_cluster):
def test_reconstruction_suppression(ray_start_cluster_head):
cluster = ray_start_cluster_head
num_nodes = 10
worker_nodes = [head_node_cluster.add_node() for _ in range(num_nodes)]
worker_nodes = [cluster.add_node() for _ in range(num_nodes)]
@ray.remote(max_reconstructions=1)
class Counter(object):
@ -1465,7 +1390,7 @@ def test_reconstruction_suppression(head_node_cluster):
ray.get([actor.inc.remote() for actor in actors])
# Kill a node.
head_node_cluster.remove_node(worker_nodes[0])
cluster.remove_node(worker_nodes[0])
# Submit several tasks per actor. These should be randomly scheduled to the
# nodes, so that multiple nodes will detect and try to reconstruct the
@ -1537,8 +1462,8 @@ def setup_counter_actor(test_checkpoint=False,
@pytest.mark.skip("Fork/join consistency not yet implemented.")
def test_distributed_handle(two_node_cluster):
cluster = two_node_cluster
def test_distributed_handle(ray_start_cluster_2_nodes):
cluster = ray_start_cluster_2_nodes
counter, ids = setup_counter_actor(test_checkpoint=False)
@ray.remote
@ -1575,8 +1500,8 @@ def test_distributed_handle(two_node_cluster):
@pytest.mark.skipif(
os.environ.get("RAY_USE_NEW_GCS") == "on",
reason="Hanging with new GCS API.")
def test_remote_checkpoint_distributed_handle(two_node_cluster):
cluster = two_node_cluster
def test_remote_checkpoint_distributed_handle(ray_start_cluster_2_nodes):
cluster = ray_start_cluster_2_nodes
counter, ids = setup_counter_actor(test_checkpoint=True)
@ray.remote
@ -1616,8 +1541,8 @@ def test_remote_checkpoint_distributed_handle(two_node_cluster):
@pytest.mark.skip("Fork/join consistency not yet implemented.")
def test_checkpoint_distributed_handle(two_node_cluster):
cluster = two_node_cluster
def test_checkpoint_distributed_handle(ray_start_cluster_2_nodes):
cluster = ray_start_cluster_2_nodes
counter, ids = setup_counter_actor(test_checkpoint=True)
@ray.remote
@ -1719,16 +1644,17 @@ def _test_nondeterministic_reconstruction(
@pytest.mark.skipif(
os.environ.get("RAY_USE_NEW_GCS") == "on",
reason="Currently doesn't work with the new GCS.")
def test_nondeterministic_reconstruction(two_node_cluster):
cluster = two_node_cluster
def test_nondeterministic_reconstruction(ray_start_cluster_2_nodes):
cluster = ray_start_cluster_2_nodes
_test_nondeterministic_reconstruction(cluster, 10, 100, 10)
@pytest.mark.skip("Nondeterministic reconstruction currently not supported "
"when there are concurrent forks that didn't finish "
"initial execution.")
def test_nondeterministic_reconstruction_concurrent_forks(two_node_cluster):
cluster = two_node_cluster
def test_nondeterministic_reconstruction_concurrent_forks(
ray_start_cluster_2_nodes):
cluster = ray_start_cluster_2_nodes
_test_nondeterministic_reconstruction(cluster, 10, 100, 1)
@ -2098,7 +2024,11 @@ def test_creating_more_actors_than_resources(shutdown_only):
ray.get(results)
def test_actor_eviction(shutdown_only):
@pytest.mark.parametrize(
"ray_start_object_store_memory", [10**8], indirect=True)
def test_actor_eviction(ray_start_object_store_memory):
object_store_memory = ray_start_object_store_memory
@ray.remote
class Actor(object):
def __init__(self):
@ -2107,13 +2037,6 @@ def test_actor_eviction(shutdown_only):
def create_object(self, size):
return np.random.rand(size)
object_store_memory = 10**8
ray.init(
object_store_memory=object_store_memory,
_internal_config=json.dumps({
"initial_reconstruction_timeout_milliseconds": 200
}))
a = Actor.remote()
# Submit enough methods on the actor so that they exceed the size of the
# object store.
@ -2189,9 +2112,9 @@ def test_actor_reconstruction(ray_start_regular):
ray.get(actor.increase.remote())
def test_actor_reconstruction_on_node_failure(head_node_cluster):
def test_actor_reconstruction_on_node_failure(ray_start_cluster_head):
"""Test actor reconstruction when node dies unexpectedly."""
cluster = head_node_cluster
cluster = ray_start_cluster_head
max_reconstructions = 3
# Add a few nodes to the cluster.
# Use custom resource to make sure the actor is only created on worker
@ -2253,8 +2176,14 @@ def test_actor_reconstruction_on_node_failure(head_node_cluster):
# this test. If this value is too small, spurious task reconstruction
# may happen and cause the test to fail. If the value is too large, this test
# could be very slow. We can remove this once we support dynamic timeout.
@pytest.mark.parametrize("head_node_cluster", [1000], indirect=True)
def test_multiple_actor_reconstruction(head_node_cluster):
@pytest.mark.parametrize(
"ray_start_cluster_head", [
generate_internal_config_map(
initial_reconstruction_timeout_milliseconds=1000)
],
indirect=True)
def test_multiple_actor_reconstruction(ray_start_cluster_head):
cluster = ray_start_cluster_head
# This test can be made more stressful by increasing the numbers below.
# The total number of actors created will be
# num_actors_at_a_time * num_nodes.
@ -2263,7 +2192,7 @@ def test_multiple_actor_reconstruction(head_node_cluster):
num_function_calls_at_a_time = 10
worker_nodes = [
head_node_cluster.add_node(
cluster.add_node(
num_cpus=3,
_internal_config=json.dumps({
"initial_reconstruction_timeout_milliseconds": 200,
@ -2303,7 +2232,7 @@ def test_multiple_actor_reconstruction(head_node_cluster):
for _ in range(num_function_calls_at_a_time):
result_ids[actor].append(actor.inc.remote(j**2 * 0.000001))
# Kill a node.
head_node_cluster.remove_node(node)
cluster.remove_node(node)
# Run some more methods.
for j in range(len(actors)):
@ -2401,15 +2330,16 @@ def test_remote_checkpointing(ray_start_regular, ray_checkpointable_actor_cls):
assert ray.get(actor.was_resumed_from_checkpoint.remote()) is True
def test_checkpointing_on_node_failure(two_node_cluster,
def test_checkpointing_on_node_failure(ray_start_cluster_2_nodes,
ray_checkpointable_actor_cls):
"""Test actor checkpointing on a remote node."""
# Place the actor on the remote node.
cluster, remote_node = two_node_cluster
cluster = ray_start_cluster_2_nodes
remote_node = list(cluster.worker_nodes)
actor_cls = ray.remote(max_reconstructions=1)(ray_checkpointable_actor_cls)
actor = actor_cls.remote()
while (ray.get(actor.local_plasma.remote()) !=
remote_node.plasma_store_socket_name):
remote_node[0].plasma_store_socket_name):
actor = actor_cls.remote()
# Call increase several times.
@ -2420,7 +2350,7 @@ def test_checkpointing_on_node_failure(two_node_cluster,
# Assert that the actor wasn't resumed from a checkpoint.
assert ray.get(actor.was_resumed_from_checkpoint.remote()) is False
# Kill actor process.
cluster.remove_node(remote_node)
cluster.remove_node(remote_node[0])
# Assert that the actor was resumed from a checkpoint and its value is
# still correct.
assert ray.get(actor.get.remote()) == expected
@ -2519,9 +2449,7 @@ def test_checkpointing_load_exception(ray_start_regular,
"ray_start_regular",
# This overwrite currently isn't effective,
# see https://github.com/ray-project/ray/issues/3926.
[{
"num_actor_checkpoints_to_keep": 20
}],
[generate_internal_config_map(num_actor_checkpoints_to_keep=20)],
indirect=True,
)
def test_deleting_actor_checkpoint(ray_start_regular):

View file

@ -17,19 +17,12 @@ if sys.version_info >= (3, 0):
@pytest.fixture
def ray_start_regular():
for module in [
ra.core, ra.random, ra.linalg, da.core, da.random, da.linalg
]:
reload(module)
# Start the Ray processes.
ray.init(num_cpus=2)
yield None
# The code after the yield will run as teardown code.
ray.shutdown()
def reload_modules():
modules = [ra.core, ra.random, ra.linalg, da.core, da.random, da.linalg]
for module in modules:
    reload(module)
def test_remote_array_methods(ray_start_regular):
def test_remote_array_methods(ray_start_2_cpus, reload_modules):
# test eye
object_id = ra.eye.remote(3)
val = ray.get(object_id)
@ -56,7 +49,7 @@ def test_remote_array_methods(ray_start_regular):
assert_almost_equal(np.dot(q_val, r_val), a_val)
def test_distributed_array_assemble(ray_start_regular):
def test_distributed_array_assemble(ray_start_2_cpus, reload_modules):
a = ra.ones.remote([da.BLOCK_SIZE, da.BLOCK_SIZE])
b = ra.zeros.remote([da.BLOCK_SIZE, da.BLOCK_SIZE])
x = da.DistArray([2 * da.BLOCK_SIZE, da.BLOCK_SIZE], np.array([[a], [b]]))
@ -68,25 +61,7 @@ def test_distributed_array_assemble(ray_start_regular):
]))
@pytest.fixture
def ray_start_two_nodes():
for module in [
ra.core, ra.random, ra.linalg, da.core, da.random, da.linalg
]:
reload(module)
# Start the Ray processes.
cluster = ray.tests.cluster_utils.Cluster()
for _ in range(2):
cluster.add_node(num_cpus=10)
ray.init(redis_address=cluster.redis_address)
yield None
# The code after the yield will run as teardown code.
ray.shutdown()
cluster.shutdown()
def test_distributed_array_methods(ray_start_two_nodes):
def test_distributed_array_methods(ray_start_cluster_2_nodes, reload_modules):
x = da.zeros.remote([9, 25, 51], "float")
assert_equal(ray.get(da.assemble.remote(x)), np.zeros([9, 25, 51]))

View file

@ -33,23 +33,7 @@ from ray.utils import _random_string
logger = logging.getLogger(__name__)
@pytest.fixture
def ray_start():
# Start the Ray processes.
ray.init(num_cpus=1)
yield None
# The code after the yield will run as teardown code.
ray.shutdown()
@pytest.fixture
def shutdown_only():
yield None
# The code after the yield will run as teardown code.
ray.shutdown()
def test_simple_serialization(ray_start):
def test_simple_serialization(ray_start_regular):
primitive_objects = [
# Various primitive types.
0,
@ -116,7 +100,7 @@ def test_simple_serialization(ray_start):
assert type(obj) == type(new_obj_2)
def test_complex_serialization(ray_start):
def test_complex_serialization(ray_start_regular):
def assert_equal(obj1, obj2):
module_numpy = (type(obj1).__module__ == np.__name__
or type(obj2).__module__ == np.__name__)
@ -319,7 +303,7 @@ def test_complex_serialization(ray_start):
assert_equal(obj, ray.get(ray.put(obj)))
def test_ray_recursive_objects(ray_start):
def test_ray_recursive_objects(ray_start_regular):
class ClassA(object):
pass
@ -347,7 +331,7 @@ def test_ray_recursive_objects(ray_start):
ray.put(obj)
def test_passing_arguments_by_value_out_of_the_box(ray_start):
def test_passing_arguments_by_value_out_of_the_box(ray_start_regular):
@ray.remote
def f(x):
return x
@ -379,7 +363,7 @@ def test_passing_arguments_by_value_out_of_the_box(ray_start):
ray.get(ray.put(Foo))
def test_putting_object_that_closes_over_object_id(ray_start):
def test_putting_object_that_closes_over_object_id(ray_start_regular):
# This test is here to prevent a regression of
# https://github.com/ray-project/ray/issues/1317.
@ -422,9 +406,7 @@ def test_put_get(shutdown_only):
assert value_before == value_after
def test_custom_serializers(shutdown_only):
ray.init(num_cpus=1)
def test_custom_serializers(ray_start_regular):
class Foo(object):
def __init__(self):
self.x = 3
@ -454,7 +436,7 @@ def test_custom_serializers(shutdown_only):
assert ray.get(f.remote()) == ((3, "string1", Bar.__name__), "string2")
def test_serialization_final_fallback(ray_start):
def test_serialization_final_fallback(ray_start_regular):
pytest.importorskip("catboost")
# This test will only run when "catboost" is installed.
from catboost import CatBoostClassifier
@ -471,9 +453,7 @@ def test_serialization_final_fallback(ray_start):
reconstructed_model.get_params().items())
def test_register_class(shutdown_only):
ray.init(num_cpus=2)
def test_register_class(ray_start_2_cpus):
# Check that putting an object of a class that has not been registered
# throws an exception.
class TempClass(object):
@ -616,7 +596,7 @@ def test_register_class(shutdown_only):
assert not hasattr(c2, "method1")
def test_keyword_args(shutdown_only):
def test_keyword_args(ray_start_regular):
@ray.remote
def keyword_fct1(a, b="hello"):
return "{} {}".format(a, b)
@ -629,8 +609,6 @@ def test_keyword_args(shutdown_only):
def keyword_fct3(a, b, c="hello", d="world"):
return "{} {} {} {}".format(a, b, c, d)
ray.init(num_cpus=1)
x = keyword_fct1.remote(1)
assert ray.get(x) == "1 hello"
x = keyword_fct1.remote(1, "hi")
@ -886,8 +864,7 @@ def test_submit_api(shutdown_only):
assert ray.get([id1, id2, id3, id4]) == [0, 1, "test", 2]
def test_get_multiple(shutdown_only):
ray.init(num_cpus=1)
def test_get_multiple(ray_start_regular):
object_ids = [ray.put(i) for i in range(10)]
assert ray.get(object_ids) == list(range(10))
@ -898,8 +875,7 @@ def test_get_multiple(shutdown_only):
assert results == indices
def test_get_multiple_experimental(shutdown_only):
ray.init(num_cpus=1)
def test_get_multiple_experimental(ray_start_regular):
object_ids = [ray.put(i) for i in range(10)]
object_ids_tuple = tuple(object_ids)
@ -909,8 +885,7 @@ def test_get_multiple_experimental(shutdown_only):
assert ray.experimental.get(object_ids_nparray) == list(range(10))
def test_get_dict(shutdown_only):
ray.init(num_cpus=1)
def test_get_dict(ray_start_regular):
d = {str(i): ray.put(i) for i in range(5)}
for i in range(5, 10):
d[str(i)] = i
@ -919,9 +894,7 @@ def test_get_dict(shutdown_only):
assert result == expected
def test_wait(shutdown_only):
ray.init(num_cpus=1)
def test_wait(ray_start_regular):
@ray.remote
def f(delay):
time.sleep(delay)
@ -976,9 +949,7 @@ def test_wait(shutdown_only):
ray.wait([1])
def test_wait_iterables(shutdown_only):
ray.init(num_cpus=1)
def test_wait_iterables(ray_start_regular):
@ray.remote
def f(delay):
time.sleep(delay)
@ -1075,9 +1046,7 @@ def test_caching_functions_to_run(shutdown_only):
ray.worker.global_worker.run_function_on_all_workers(f)
def test_running_function_on_all_workers(shutdown_only):
ray.init(num_cpus=1)
def test_running_function_on_all_workers(ray_start_regular):
def f(worker_info):
sys.path.append("fake_directory")
@ -1104,9 +1073,7 @@ def test_running_function_on_all_workers(shutdown_only):
assert "fake_directory" not in ray.get(get_path2.remote())
def test_profiling_api(shutdown_only):
ray.init(num_cpus=2)
def test_profiling_api(ray_start_2_cpus):
@ray.remote
def f():
with ray.profile(
@ -1150,16 +1117,6 @@ def test_profiling_api(shutdown_only):
break
@pytest.fixture()
def ray_start_cluster():
cluster = ray.tests.cluster_utils.Cluster()
yield cluster
# The code after the yield will run as teardown code.
ray.shutdown()
cluster.shutdown()
def test_wait_cluster(ray_start_cluster):
cluster = ray_start_cluster
cluster.add_node(num_cpus=1, resources={"RemoteResource": 1})
@ -1227,10 +1184,9 @@ def test_object_transfer_dump(ray_start_cluster):
}) == num_nodes
def test_identical_function_names(shutdown_only):
def test_identical_function_names(ray_start_regular):
# Define a bunch of remote functions and make sure that we don't
# accidentally call an older version.
ray.init(num_cpus=1)
num_calls = 200
@ -1294,8 +1250,7 @@ def test_identical_function_names(shutdown_only):
assert result_values == num_calls * [5]
def test_illegal_api_calls(shutdown_only):
ray.init(num_cpus=1)
def test_illegal_api_calls(ray_start_regular):
# Verify that we cannot call put on an ObjectID.
x = ray.put(1)
@ -1310,10 +1265,9 @@ def test_illegal_api_calls(shutdown_only):
# because plasma client isn't thread-safe. This needs to be fixed from the
# Arrow side. See #4107 for relevant discussions.
@pytest.mark.skipif(six.PY2, reason="Doesn't work in Python 2.")
def test_multithreading(shutdown_only):
def test_multithreading(ray_start_2_cpus):
# This test requires at least 2 CPUs to finish since the worker does not
# release resources when joining the threads.
ray.init(num_cpus=2)
def run_test_in_multi_threads(test_case, num_threads=10, num_repeats=25):
"""A helper function that runs test cases in multiple threads."""
@ -2273,9 +2227,7 @@ def test_specific_gpus(save_gpu_ids_shutdown_only):
ray.get([g.remote() for _ in range(100)])
def test_blocking_tasks(shutdown_only):
ray.init(num_cpus=1)
def test_blocking_tasks(ray_start_regular):
@ray.remote
def f(i, j):
return (i, j)
@ -2310,9 +2262,7 @@ def test_blocking_tasks(shutdown_only):
ray.get(sleep.remote())
def test_max_call_tasks(shutdown_only):
ray.init(num_cpus=1)
def test_max_call_tasks(ray_start_regular):
@ray.remote(max_calls=1)
def f():
return os.getpid()
@ -2692,9 +2642,7 @@ def test_wait_reconstruction(shutdown_only):
assert len(ready_ids) == 1
def test_ray_setproctitle(shutdown_only):
ray.init(num_cpus=2)
def test_ray_setproctitle(ray_start_2_cpus):
@ray.remote
class UniqueName(object):
def __init__(self):
@ -2739,9 +2687,7 @@ def test_duplicate_error_messages(shutdown_only):
@pytest.mark.skipif(
os.getenv("TRAVIS") is None,
reason="This test should only be run on Travis.")
def test_ray_stack(shutdown_only):
ray.init(num_cpus=2)
def test_ray_stack(ray_start_2_cpus):
def unique_name_1():
time.sleep(1000)
@ -2797,9 +2743,7 @@ def test_socket_dir_not_existing(shutdown_only):
ray.init(num_cpus=1, raylet_socket_name=temp_raylet_socket_name)
def test_raylet_is_robust_to_random_messages(shutdown_only):
ray.init(num_cpus=1)
def test_raylet_is_robust_to_random_messages(ray_start_regular):
node_manager_address = None
node_manager_port = None
for client in ray.global_state.client_table():
@ -2820,7 +2764,7 @@ def test_raylet_is_robust_to_random_messages(shutdown_only):
assert ray.get(f.remote()) == 1
def test_non_ascii_comment(ray_start):
def test_non_ascii_comment(ray_start_regular):
@ray.remote
def f():
# 日本語 Japanese comment

View file

@ -2,22 +2,12 @@ from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import pytest
import time
import ray
from ray.experimental.streaming.batched_queue import BatchedQueue
@pytest.fixture
def ray_start():
# Start the Ray processes.
ray.init(num_cpus=2)
yield None
# The code after the yield will run as teardown code.
ray.shutdown()
@ray.remote
class Reader(object):
def __init__(self, queue):
@ -36,7 +26,7 @@ class Reader(object):
time.sleep(0.001)
def test_batched_queue(ray_start):
def test_batched_queue(ray_start_regular):
# Batched queue parameters
max_queue_size = 10000 # Max number of batches in queue
max_batch_size = 1000 # Max number of elements per batch

View file

@ -3,7 +3,6 @@ from __future__ import division
from __future__ import print_function
import os
import json
import signal
import sys
import time
@ -17,43 +16,12 @@ from ray.tests.cluster_utils import Cluster
from ray.tests.utils import run_string_as_driver_nonblocking
@pytest.fixture
def shutdown_only():
yield None
# The code after the yield will run as teardown code.
ray.shutdown()
@pytest.fixture
def ray_start_cluster():
node_args = {
"num_cpus": 4,
"_internal_config": json.dumps({
"initial_reconstruction_timeout_milliseconds": 1000,
"num_heartbeats_timeout": 10
})
}
# Start with 3 worker nodes and 4 cores each.
cluster = Cluster(
initialize_head=True, connect=True, head_node_args=node_args)
workers = []
for _ in range(3):
workers.append(cluster.add_node(**node_args))
cluster.wait_for_nodes()
yield cluster
ray.shutdown()
cluster.shutdown()
# This test checks that when a worker dies in the middle of a get, the plasma
# store and raylet will not die.
@pytest.mark.skipif(
os.environ.get("RAY_USE_NEW_GCS") == "on",
reason="Not working with new GCS API.")
def test_dying_worker_get(shutdown_only):
# Start the Ray processes.
ray.init(num_cpus=2)
def test_dying_worker_get(ray_start_2_cpus):
@ray.remote
def sleep_forever():
time.sleep(10**6)
@ -100,9 +68,9 @@ def test_dying_worker_get(shutdown_only):
@pytest.mark.skipif(
os.environ.get("RAY_USE_NEW_GCS") == "on",
reason="Not working with new GCS API.")
def test_dying_driver_get(shutdown_only):
def test_dying_driver_get(ray_start_regular):
# Start the Ray processes.
address_info = ray.init(num_cpus=1)
address_info = ray_start_regular
@ray.remote
def sleep_forever():
@ -143,9 +111,7 @@ ray.get(ray.ObjectID(ray.utils.hex_to_binary("{}")))
@pytest.mark.skipif(
os.environ.get("RAY_USE_NEW_GCS") == "on",
reason="Not working with new GCS API.")
def test_dying_worker_wait(shutdown_only):
ray.init(num_cpus=2)
def test_dying_worker_wait(ray_start_2_cpus):
@ray.remote
def sleep_forever():
time.sleep(10**6)
@ -185,9 +151,9 @@ def test_dying_worker_wait(shutdown_only):
@pytest.mark.skipif(
os.environ.get("RAY_USE_NEW_GCS") == "on",
reason="Not working with new GCS API.")
def test_dying_driver_wait(shutdown_only):
def test_dying_driver_wait(ray_start_regular):
# Start the Ray processes.
address_info = ray.init(num_cpus=1)
address_info = ray_start_regular
@ray.remote
def sleep_forever():
@ -283,28 +249,6 @@ def test_worker_failed(ray_start_workers_separate_multinode):
pass
@pytest.fixture
def ray_initialize_cluster():
# Start with 4 workers and 4 cores.
num_nodes = 4
num_workers_per_scheduler = 8
cluster = Cluster()
for _ in range(num_nodes):
cluster.add_node(
num_cpus=num_workers_per_scheduler,
_internal_config=json.dumps({
"initial_reconstruction_timeout_milliseconds": 1000,
"num_heartbeats_timeout": 10,
}))
ray.init(redis_address=cluster.redis_address)
yield cluster
ray.shutdown()
cluster.shutdown()
def _test_component_failed(cluster, component_type):
"""Kill a component on all worker nodes and check workload succeeds."""
# Submit many tasks with many dependencies.
@ -364,8 +308,13 @@ def check_components_alive(cluster, component_type, check_component_alive):
assert process.poll() is not None
def test_raylet_failed(ray_initialize_cluster):
cluster = ray_initialize_cluster
@pytest.mark.parametrize(
"ray_start_cluster", [{
"num_cpus": 8,
"num_nodes": 4
}], indirect=True)
def test_raylet_failed(ray_start_cluster):
cluster = ray_start_cluster
# Kill all local schedulers on worker nodes.
_test_component_failed(cluster, ray_constants.PROCESS_TYPE_RAYLET)
@ -377,8 +326,13 @@ def test_raylet_failed(ray_initialize_cluster):
@pytest.mark.skipif(
os.environ.get("RAY_USE_NEW_GCS") == "on",
reason="Hanging with new GCS API.")
def test_plasma_store_failed(ray_initialize_cluster):
cluster = ray_initialize_cluster
@pytest.mark.parametrize(
"ray_start_cluster", [{
"num_cpus": 8,
"num_nodes": 4
}], indirect=True)
def test_plasma_store_failed(ray_start_cluster):
cluster = ray_start_cluster
# Kill all plasma stores on worker nodes.
_test_component_failed(cluster, ray_constants.PROCESS_TYPE_PLASMA_STORE)
@ -388,6 +342,13 @@ def test_plasma_store_failed(ray_initialize_cluster):
check_components_alive(cluster, ray_constants.PROCESS_TYPE_RAYLET, False)
@pytest.mark.parametrize(
"ray_start_cluster", [{
"num_cpus": 4,
"num_nodes": 3,
"do_init": True
}],
indirect=True)
def test_actor_creation_node_failure(ray_start_cluster):
# TODO(swang): Refactor test_raylet_failed, etc to reuse the below code.
cluster = ray_start_cluster
@ -434,8 +395,7 @@ def test_actor_creation_node_failure(ray_start_cluster):
@pytest.mark.skipif(
os.environ.get("RAY_USE_NEW_GCS") == "on",
reason="Hanging with new GCS API.")
def test_driver_lives_sequential(shutdown_only):
ray.init(num_cpus=1)
def test_driver_lives_sequential(ray_start_regular):
ray.worker._global_node.kill_raylet()
ray.worker._global_node.kill_plasma_store()
ray.worker._global_node.kill_log_monitor()
@ -448,8 +408,7 @@ def test_driver_lives_sequential(shutdown_only):
@pytest.mark.skipif(
os.environ.get("RAY_USE_NEW_GCS") == "on",
reason="Hanging with new GCS API.")
def test_driver_lives_parallel(shutdown_only):
ray.init(num_cpus=1)
def test_driver_lives_parallel(ray_start_regular):
all_processes = ray.worker._global_node.all_processes
process_infos = (all_processes[ray_constants.PROCESS_TYPE_PLASMA_STORE] +
all_processes[ray_constants.PROCESS_TYPE_RAYLET] +

View file

@ -17,23 +17,10 @@ import ray
import ray.ray_constants as ray_constants
from ray.utils import _random_string
from ray.tests.cluster_utils import Cluster
from ray.tests.utils import (relevant_errors, wait_for_errors)
@pytest.fixture
def ray_start_regular():
# Start the Ray processes.
ray.init(num_cpus=2)
yield None
# The code after the yield will run as teardown code.
ray.shutdown()
@pytest.fixture
def shutdown_only():
yield None
# The code after the yield will run as teardown code.
ray.shutdown()
from ray.tests.utils import (
relevant_errors,
wait_for_errors,
)
def test_failed_task(ray_start_regular):
@ -89,7 +76,7 @@ def test_failed_task(ray_start_regular):
assert False
def test_fail_importing_remote_function(ray_start_regular):
def test_fail_importing_remote_function(ray_start_2_cpus):
# Create the contents of a temporary Python file.
temporary_python_file = """
def temporary_helper_function():
@ -129,7 +116,7 @@ def temporary_helper_function():
sys.path.pop(-1)
def test_failed_function_to_run(ray_start_regular):
def test_failed_function_to_run(ray_start_2_cpus):
def f(worker):
if ray.worker.global_worker.mode == ray.WORKER_MODE:
raise Exception("Function to run failed.")
@ -386,17 +373,9 @@ def test_actor_scope_or_intentionally_killed_message(ray_start_regular):
"Should not have propogated an error - {}".format(ray.error_info()))
@pytest.fixture
def ray_start_object_store_memory():
# Start the Ray processes.
store_size = 10**6
ray.init(num_cpus=1, object_store_memory=store_size)
yield None
# The code after the yield will run as teardown code.
ray.shutdown()
@pytest.mark.skip("This test does not work yet.")
@pytest.mark.parametrize(
"ray_start_object_store_memory", [10**6], indirect=True)
def test_put_error1(ray_start_object_store_memory):
num_objects = 3
object_size = 4 * 10**5
@ -439,6 +418,8 @@ def test_put_error1(ray_start_object_store_memory):
@pytest.mark.skip("This test does not work yet.")
@pytest.mark.parametrize(
"ray_start_object_store_memory", [10**6], indirect=True)
def test_put_error2(ray_start_object_store_memory):
# This is the same as the previous test, but it calls ray.put directly.
num_objects = 3
@ -608,8 +589,8 @@ def test_warning_for_too_many_nested_tasks(shutdown_only):
wait_for_errors(ray_constants.WORKER_POOL_LARGE_ERROR, 1)
def test_redis_module_failure(shutdown_only):
address_info = ray.init(num_cpus=1)
def test_redis_module_failure(ray_start_regular):
address_info = ray_start_regular
redis_address = address_info["redis_address"]
redis_address = redis_address.split(":")
assert len(redis_address) == 2
@ -653,28 +634,10 @@ def test_redis_module_failure(shutdown_only):
run_one_command("RAY.SET_ADD", 1, 1, 3, 1)
@pytest.fixture
def ray_start_two_nodes():
# Start the Ray processes.
cluster = Cluster()
for _ in range(2):
cluster.add_node(
num_cpus=0,
_internal_config=json.dumps({
"num_heartbeats_timeout": 40
}))
ray.init(redis_address=cluster.redis_address)
yield cluster
# The code after the yield will run as teardown code.
ray.shutdown()
cluster.shutdown()
# Note that this test will take at least 10 seconds because it must wait for
# the monitor to detect enough missed heartbeats.
def test_warning_for_dead_node(ray_start_two_nodes):
cluster = ray_start_two_nodes
def test_warning_for_dead_node(ray_start_cluster_2_nodes):
cluster = ray_start_cluster_2_nodes
cluster.wait_for_nodes()
client_ids = {item["ClientID"] for item in ray.global_state.client_table()}

View file

@ -2,7 +2,6 @@ from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import json
import pytest
try:
import pytest_timeout
@ -11,33 +10,6 @@ except ImportError:
import time
import ray
from ray.tests.cluster_utils import Cluster
@pytest.fixture
def ray_start():
# Start the Ray processes.
ray.init(num_cpus=1)
yield None
# The code after the yield will run as teardown code.
ray.shutdown()
@pytest.fixture
def cluster_start():
# Start the Ray processes.
cluster = Cluster(
initialize_head=True,
connect=True,
head_node_args={
"num_cpus": 1,
"_internal_config": json.dumps({
"num_heartbeats_timeout": 10
})
})
yield cluster
ray.shutdown()
cluster.shutdown()
# TODO(rliaw): The proper way to do this is to have the pytest config setup.
@ -45,7 +17,7 @@ def cluster_start():
pytest_timeout is None,
reason="Timeout package not installed; skipping test that may hang.")
@pytest.mark.timeout(10)
def test_replenish_resources(ray_start):
def test_replenish_resources(ray_start_regular):
cluster_resources = ray.global_state.cluster_resources()
available_resources = ray.global_state.available_resources()
assert cluster_resources == available_resources
@ -67,7 +39,7 @@ def test_replenish_resources(ray_start):
pytest_timeout is None,
reason="Timeout package not installed; skipping test that may hang.")
@pytest.mark.timeout(10)
def test_uses_resources(ray_start):
def test_uses_resources(ray_start_regular):
cluster_resources = ray.global_state.cluster_resources()
@ray.remote
@ -89,9 +61,9 @@ def test_uses_resources(ray_start):
pytest_timeout is None,
reason="Timeout package not installed; skipping test that may hang.")
@pytest.mark.timeout(20)
def test_add_remove_cluster_resources(cluster_start):
def test_add_remove_cluster_resources(ray_start_cluster_head):
"""Tests that Global State API is consistent with actual cluster."""
cluster = cluster_start
cluster = ray_start_cluster_head
assert ray.global_state.cluster_resources()["CPU"] == 1
nodes = []
nodes += [cluster.add_node(num_cpus=1)]

View file

@ -2,20 +2,12 @@ from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import pytest
import os
import ray
import pytest
import time
import numpy as np
@pytest.fixture
def ray_start_regular():
# Start the Ray processes.
ray.init(num_cpus=3)
yield None
# The code after the yield will run as teardown code.
ray.shutdown()
import ray
def test_timing(ray_start_regular):
@ -96,6 +88,7 @@ def test_timing(ray_start_regular):
# average_elapsed_time should be about 0.00087.
@pytest.mark.parametrize("ray_start_regular", [{"num_cpus": 4}], indirect=True)
def test_cache(ray_start_regular):
A = np.random.rand(1, 1000000)
v = np.random.rand(1000000)

View file

@ -2,22 +2,12 @@ from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import pytest
import ray
test_values = [1, 1.0, "test", b"test", (0, 1), [0, 1], {0: 1}]
@pytest.fixture
def ray_start():
# Start the Ray processes.
ray.init(num_cpus=1)
yield None
# The code after the yield will run as teardown code.
ray.shutdown()
def test_basic_task_api(ray_start):
def test_basic_task_api(ray_start_regular):
# Test a simple function.
@ -50,7 +40,7 @@ def test_basic_task_api(ray_start):
# Test keyword arguments.
def test_put_api(ray_start):
def test_put_api(ray_start_regular):
for obj in test_values:
assert ray.get(ray.put(obj)) == obj
@ -61,7 +51,7 @@ def test_put_api(ray_start):
assert ray.get(ray.put(obj)) == obj
def test_actor_api(ray_start):
def test_actor_api(ray_start_regular):
@ray.remote
class Foo(object):
def __init__(self, val):

View file

@ -7,13 +7,6 @@ import pytest
import ray
@pytest.fixture
def shutdown_only():
yield None
# The code after the yield will run as teardown code.
ray.shutdown()
def test_modin_import_with_ray_init(shutdown_only):
ray.init(num_cpus=1)
import modin.pandas as pd

View file

@ -12,26 +12,8 @@ from ray.tests.utils import (run_and_get_output, run_string_as_driver,
run_string_as_driver_nonblocking)
@pytest.fixture
def ray_start_head():
out = run_and_get_output(["ray", "start", "--head", "--num-cpus=2"])
# Get the redis address from the output.
redis_substring_prefix = "redis_address=\""
redis_address_location = (
out.find(redis_substring_prefix) + len(redis_substring_prefix))
redis_address = out[redis_address_location:]
redis_address = redis_address.split("\"")[0]
yield redis_address
# Disconnect from the Ray cluster.
ray.shutdown()
# Kill the Ray cluster.
subprocess.Popen(["ray", "stop"]).wait()
def test_error_isolation(ray_start_head):
redis_address = ray_start_head
def test_error_isolation(call_ray_start):
redis_address = call_ray_start
# Connect a driver to the Ray cluster.
ray.init(redis_address=redis_address)
@ -99,10 +81,10 @@ print("success")
assert error_string1 in ray.error_info()[0]["message"]
def test_remote_function_isolation(ray_start_head):
def test_remote_function_isolation(call_ray_start):
# This test will run multiple remote functions with the same names in
# two different drivers. Connect a driver to the Ray cluster.
redis_address = ray_start_head
redis_address = call_ray_start
ray.init(redis_address=redis_address)
@ -142,10 +124,10 @@ print("success")
assert "success" in out
def test_driver_exiting_quickly(ray_start_head):
def test_driver_exiting_quickly(call_ray_start):
# This test will create some drivers that submit some tasks and then
# exit without waiting for the tasks to complete.
redis_address = ray_start_head
redis_address = call_ray_start
ray.init(redis_address=redis_address)
@ -218,25 +200,11 @@ ray.get([a.log.remote(), f.remote()])
assert out.count(log_message) == 4
@pytest.fixture
def ray_start_head_with_resources():
out = run_and_get_output(
["ray", "start", "--head", "--num-cpus=1", "--num-gpus=1"])
# Get the redis address from the output.
redis_substring_prefix = "redis_address=\""
redis_address_location = (
out.find(redis_substring_prefix) + len(redis_substring_prefix))
redis_address = out[redis_address_location:]
redis_address = redis_address.split("\"")[0]
yield redis_address
# Kill the Ray cluster.
subprocess.Popen(["ray", "stop"]).wait()
def test_drivers_release_resources(ray_start_head_with_resources):
redis_address = ray_start_head_with_resources
@pytest.mark.parametrize(
"call_ray_start", ["ray start --head --num-cpus=1 --num-gpus=1"],
indirect=True)
def test_drivers_release_resources(call_ray_start):
redis_address = call_ray_start
# Define a driver that creates an actor and exits.
driver_script1 = """
@ -359,23 +327,13 @@ def test_calling_start_ray_head():
subprocess.Popen(["ray", "stop"]).wait()
@pytest.fixture
def ray_start_head_local():
# Start the Ray processes on this machine.
run_and_get_output([
"ray", "start", "--head", "--node-ip-address=localhost",
"--redis-port=6379"
])
yield None
# Disconnect from the Ray cluster.
ray.shutdown()
# Kill the Ray cluster.
subprocess.Popen(["ray", "stop"]).wait()
def test_using_hostnames(ray_start_head_local):
@pytest.mark.parametrize(
"call_ray_start", [
"ray start --head --num-cpus=1 " +
"--node-ip-address=localhost --redis-port=6379"
],
indirect=True)
def test_using_hostnames(call_ray_start):
ray.init(node_ip_address="localhost", redis_address="localhost:6379")
@ray.remote
@ -385,15 +343,6 @@ def test_using_hostnames(ray_start_head_local):
assert ray.get(f.remote()) == 1
@pytest.fixture
def ray_start_regular():
# Start the Ray processes.
address_info = ray.init(num_cpus=1)
yield address_info
# The code after the yield will run as teardown code.
ray.shutdown()
def test_connecting_in_local_case(ray_start_regular):
address_info = ray_start_regular
@ -453,10 +402,10 @@ print("success")
assert "success" in out
def test_driver_exiting_when_worker_blocked(ray_start_head):
def test_driver_exiting_when_worker_blocked(call_ray_start):
# This test will create some drivers that submit some tasks and then
# exit without waiting for the tasks to complete.
redis_address = ray_start_head
redis_address = call_ray_start
ray.init(redis_address=redis_address)

View file

@ -2,7 +2,6 @@ from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import json
import logging
import pytest
import time
@ -10,46 +9,11 @@ import time
import ray
import ray.ray_constants as ray_constants
from ray.tests.cluster_utils import Cluster
from ray.tests.conftest import generate_internal_config_map
logger = logging.getLogger(__name__)
@pytest.fixture
def start_connected_cluster():
# Start the Ray processes.
g = Cluster(
initialize_head=True,
connect=True,
head_node_args={
"num_cpus": 1,
"_internal_config": json.dumps({
"num_heartbeats_timeout": 10
})
})
yield g
# The code after the yield will run as teardown code.
ray.shutdown()
g.shutdown()
@pytest.fixture
def start_connected_longer_cluster():
"""Creates a cluster with a longer timeout."""
g = Cluster(
initialize_head=True,
connect=True,
head_node_args={
"num_cpus": 1,
"_internal_config": json.dumps({
"num_heartbeats_timeout": 20
})
})
yield g
# The code after the yield will run as teardown code.
ray.shutdown()
g.shutdown()
def test_cluster():
"""Basic test for adding and removing nodes in cluster."""
g = Cluster(initialize_head=False)
@ -70,7 +34,11 @@ def test_shutdown():
assert not any(n.any_processes_alive() for n in [node, node2])
def test_internal_config(start_connected_longer_cluster):
@pytest.mark.parametrize(
"ray_start_cluster_head",
[generate_internal_config_map(num_heartbeats_timeout=20)],
indirect=True)
def test_internal_config(ray_start_cluster_head):
"""Checks that the internal configuration setting works.
We set the cluster to time out nodes after 2 seconds of missed heartbeats. We
@ -78,7 +46,7 @@ def test_internal_config(start_connected_longer_cluster):
of sync, then wait another 2 seconds (giving 1 second of leeway) to check
that the client has timed out.
"""
cluster = start_connected_longer_cluster
cluster = ray_start_cluster_head
worker = cluster.add_node()
cluster.wait_for_nodes()
@ -90,13 +58,13 @@ def test_internal_config(start_connected_longer_cluster):
assert ray.global_state.cluster_resources()["CPU"] == 1
def test_wait_for_nodes(start_connected_cluster):
def test_wait_for_nodes(ray_start_cluster_head):
"""Unit test for `Cluster.wait_for_nodes`.
Adds 4 workers, waits, then removes 4 workers, waits,
then adds 1 worker, waits, and removes 1 worker, waits.
"""
cluster = start_connected_cluster
cluster = ray_start_cluster_head
workers = [cluster.add_node() for i in range(4)]
cluster.wait_for_nodes()
[cluster.remove_node(w) for w in workers]
@ -110,8 +78,8 @@ def test_wait_for_nodes(start_connected_cluster):
assert ray.global_state.cluster_resources()["CPU"] == 1
def test_worker_plasma_store_failure(start_connected_cluster):
cluster = start_connected_cluster
def test_worker_plasma_store_failure(ray_start_cluster_head):
cluster = ray_start_cluster_head
worker = cluster.add_node()
cluster.wait_for_nodes()
# Log monitor doesn't die for some reason

View file

@ -2,27 +2,14 @@ from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import pytest
import ray
from ray.tests.cluster_utils import Cluster
from ray.tests.utils import run_string_as_driver
@pytest.fixture()
def ray_start_empty_cluster():
cluster = Cluster()
yield cluster
# The code after the yield will run as teardown code.
ray.shutdown()
cluster.shutdown()
# This tests the queue transitions for infeasible tasks. This has been an issue
# in the past, e.g., https://github.com/ray-project/ray/issues/3275.
def test_infeasible_tasks(ray_start_empty_cluster):
cluster = ray_start_empty_cluster
def test_infeasible_tasks(ray_start_cluster):
cluster = ray_start_cluster
@ray.remote
def f():

View file

@ -33,7 +33,7 @@ def create_cluster(num_nodes):
@pytest.fixture()
def ray_start_cluster():
def ray_start_cluster_with_resource():
num_nodes = 5
cluster = create_cluster(num_nodes)
yield cluster, num_nodes
@ -43,20 +43,10 @@ def ray_start_cluster():
cluster.shutdown()
@pytest.fixture()
def ray_start_empty_cluster():
cluster = Cluster()
yield cluster
# The code after the yield will run as teardown code.
ray.shutdown()
cluster.shutdown()
# This test is here to make sure that when we broadcast an object to a bunch of
# machines, we don't have too many excess object transfers.
def test_object_broadcast(ray_start_cluster):
cluster, num_nodes = ray_start_cluster
def test_object_broadcast(ray_start_cluster_with_resource):
cluster, num_nodes = ray_start_cluster_with_resource
@ray.remote
def f(x):
@ -137,8 +127,8 @@ def test_object_broadcast(ray_start_cluster):
# to the actor's object manager. However, in the past we did not deduplicate
# the pushes and so the same object could get shipped to the same object
# manager many times. This test checks that that isn't happening.
def test_actor_broadcast(ray_start_cluster):
cluster, num_nodes = ray_start_cluster
def test_actor_broadcast(ray_start_cluster_with_resource):
cluster, num_nodes = ray_start_cluster_with_resource
@ray.remote
class Actor(object):
@ -212,8 +202,8 @@ def test_actor_broadcast(ray_start_cluster):
# The purpose of this test is to make sure that an object that has already been
# transferred to a node can be transferred again.
def test_object_transfer_retry(ray_start_empty_cluster):
cluster = ray_start_empty_cluster
def test_object_transfer_retry(ray_start_cluster):
cluster = ray_start_cluster
repeated_push_delay = 4
@ -300,8 +290,8 @@ def test_object_transfer_retry(ray_start_empty_cluster):
# The purpose of this test is to make sure we can transfer many objects. In the
# past, this has caused failures in which object managers create too many open
# files and run out of resources.
def test_many_small_transfers(ray_start_cluster):
cluster, num_nodes = ray_start_cluster
def test_many_small_transfers(ray_start_cluster_with_resource):
cluster, num_nodes = ray_start_cluster_with_resource
@ray.remote
def f(*args):

View file

@ -9,16 +9,7 @@ import ray
from ray.experimental.queue import Queue, Empty, Full
@pytest.fixture
def ray_start():
# Start the Ray process.
ray.init(num_cpus=1)
yield None
# The code after the yield will run as teardown code.
ray.shutdown()
def test_queue(ray_start):
def test_queue(ray_start_regular):
@ray.remote
def get_async(queue, block, timeout, sleep):
time.sleep(sleep)

View file

@ -18,13 +18,6 @@ def password():
return random_bytes.encode("hex") # Python 2
@pytest.fixture
def shutdown_only():
yield None
# The code after the yield will run as teardown code.
ray.shutdown()
class TestRedisPassword(object):
@pytest.mark.skipif(
os.environ.get("RAY_USE_NEW_GCS") == "on",

View file

@ -6,23 +6,9 @@ from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import pytest
import ray
@pytest.fixture
def ray_start():
# Start ray instance
ray.init(num_cpus=1)
# Run test using this fixture
yield None
# Shutdown ray instance
ray.shutdown()
@ray.remote
def factorial(n):
if n == 0:
@ -30,7 +16,7 @@ def factorial(n):
return n * ray.get(factorial.remote(n - 1))
def test_recursion(ray_start):
def test_recursion(ray_start_regular):
assert ray.get(factorial.remote(0)) == 1
assert ray.get(factorial.remote(1)) == 1
assert ray.get(factorial.remote(2)) == 2

View file

@ -10,16 +10,7 @@ import ray.experimental.no_return
import ray.worker
@pytest.fixture
def ray_start():
# Start the Ray processes.
ray.init(num_cpus=1)
yield None
# The code after the yield will run as teardown code.
ray.shutdown()
def test_set_single_output(ray_start):
def test_set_single_output(ray_start_regular):
@ray.remote
def f():
return_object_ids = ray.worker.global_worker._current_task.returns()
@ -29,7 +20,7 @@ def test_set_single_output(ray_start):
assert ray.get(f.remote()) == 123
def test_set_multiple_outputs(ray_start):
def test_set_multiple_outputs(ray_start_regular):
@ray.remote(num_return_vals=3)
def f(set_out0, set_out1, set_out2):
returns = []
@ -51,7 +42,7 @@ def test_set_multiple_outputs(ray_start):
]
def test_set_actor_method(ray_start):
def test_set_actor_method(ray_start_regular):
@ray.remote
class Actor(object):
def __init__(self):
@ -67,7 +58,7 @@ def test_set_actor_method(ray_start):
assert ray.get(actor.ping.remote()) == 123
def test_exception(ray_start):
def test_exception(ray_start_regular):
@ray.remote(num_return_vals=2)
def f():
return_object_ids = ray.worker.global_worker._current_task.returns()
@ -84,7 +75,7 @@ def test_exception(ray_start):
ray.get(exception_id)
def test_no_set_and_no_return(ray_start):
def test_no_set_and_no_return(ray_start_regular):
@ray.remote
def f():
return ray.experimental.no_return.NoReturn

View file

@ -1,4 +1,3 @@
import pytest
import time
import ray
@ -10,15 +9,6 @@ class UserSignal(signal.Signal):
self.value = value
@pytest.fixture
def ray_start():
# Start the Ray processes.
ray.init(num_cpus=4)
yield None
# The code after the yield will run as teardown code.
ray.shutdown()
def receive_all_signals(sources, timeout):
# Get all signals from sources, until there is no signal for a time
# period of timeout.
@ -32,7 +22,7 @@ def receive_all_signals(sources, timeout):
results.extend(r)
def test_task_to_driver(ray_start):
def test_task_to_driver(ray_start_regular):
# Send a signal from a task to the driver.
@ray.remote
@ -47,7 +37,7 @@ def test_task_to_driver(ray_start):
assert len(result_list) == 1
def test_send_signal_from_actor_to_driver(ray_start):
def test_send_signal_from_actor_to_driver(ray_start_regular):
# Send several signals from an actor, and receive them in the driver.
@ray.remote
@ -70,7 +60,7 @@ def test_send_signal_from_actor_to_driver(ray_start):
assert signal_value + str(i) == result_list[i][1].value
def test_send_signals_from_actor_to_driver(ray_start):
def test_send_signals_from_actor_to_driver(ray_start_regular):
# Send "count" signal at intervals from an actor and get
# these signals in the driver.
@ -96,7 +86,7 @@ def test_send_signals_from_actor_to_driver(ray_start):
assert True
def test_task_crash(ray_start):
def test_task_crash(ray_start_regular):
# Get an error when ray.get() is called on the return of a failed task.
@ray.remote
@ -114,7 +104,7 @@ def test_task_crash(ray_start):
assert type(result_list[0][1]) == signal.ErrorSignal
def test_task_crash_without_get(ray_start):
def test_task_crash_without_get(ray_start_regular):
# Get an error when task failed.
@ray.remote
@ -127,7 +117,7 @@ def test_task_crash_without_get(ray_start):
assert type(result_list[0][1]) == signal.ErrorSignal
def test_actor_crash(ray_start):
def test_actor_crash(ray_start_regular):
# Get an error when ray.get() is called on a return parameter
# of a method that failed.
@ -150,7 +140,7 @@ def test_actor_crash(ray_start):
assert type(result_list[0][1]) == signal.ErrorSignal
def test_actor_crash_init(ray_start):
def test_actor_crash_init(ray_start_regular):
# Get an error when an actor's __init__ failed.
@ray.remote
@ -168,7 +158,7 @@ def test_actor_crash_init(ray_start):
assert type(result_list[0][1]) == signal.ErrorSignal
def test_actor_crash_init2(ray_start):
def test_actor_crash_init2(ray_start_regular):
# Get errors when (1) __init__ fails, and (2) subsequently when
# ray.get() is called on the return parameter of another method
# of the actor.
@ -192,7 +182,7 @@ def test_actor_crash_init2(ray_start):
assert type(result_list[0][1]) == signal.ErrorSignal
def test_actor_crash_init3(ray_start):
def test_actor_crash_init3(ray_start_regular):
# Get errors when (1) __init__ fails, and (2) subsequently when
# another method of the actor is invoked.
@ -213,7 +203,7 @@ def test_actor_crash_init3(ray_start):
assert type(result_list[0][1]) == signal.ErrorSignal
def test_send_signals_from_actor_to_actor(ray_start):
def test_send_signals_from_actor_to_actor(ray_start_regular):
# Send "count" signal at intervals of 100ms from two actors and get
# these signals in another actor.
@ -260,7 +250,7 @@ def test_send_signals_from_actor_to_actor(ray_start):
assert received_count == 2 * count
def test_forget(ray_start):
def test_forget(ray_start_regular):
# Send "count" signals on behalf of an actor, then ignore all these
# signals, and then send another "count" signals on behalf of the same
# actor. Then show that the driver only gets the last "count" signals.
@ -284,7 +274,7 @@ def test_forget(ray_start):
assert len(result_list) == count
def test_send_signal_from_two_tasks_to_driver(ray_start):
def test_send_signal_from_two_tasks_to_driver(ray_start_regular):
# Define a remote function that sends a user-defined signal.
@ray.remote
def send_signal(value):
@ -302,7 +292,7 @@ def test_send_signal_from_two_tasks_to_driver(ray_start):
assert len(result_list) == 1
def test_receiving_on_two_returns(ray_start):
def test_receiving_on_two_returns(ray_start_regular):
@ray.remote(num_return_vals=2)
def send_signal(value):
signal.send(UserSignal(value))
@ -318,7 +308,7 @@ def test_receiving_on_two_returns(ray_start):
or (x == results[1][0] and y == results[0][0]))
def test_serial_tasks_reading_same_signal(ray_start):
def test_serial_tasks_reading_same_signal(ray_start_regular):
@ray.remote
def send_signal(value):
signal.send(UserSignal(value))

View file

@ -473,21 +473,13 @@ def test_nondeterministic_task(ray_start_reconstruction):
assert cluster.remaining_processes_alive()
@pytest.fixture
def ray_start_driver_put_errors():
plasma_store_memory = 10**9
# Start the Ray processes.
ray.init(num_cpus=1, object_store_memory=plasma_store_memory)
yield plasma_store_memory
# The code after the yield will run as teardown code.
ray.shutdown()
@pytest.mark.skipif(
os.environ.get("RAY_USE_NEW_GCS") == "on",
reason="Failing with new GCS API on Linux.")
def test_driver_put_errors(ray_start_driver_put_errors):
plasma_store_memory = ray_start_driver_put_errors
@pytest.mark.parametrize(
"ray_start_object_store_memory", [10**9], indirect=True)
def test_driver_put_errors(ray_start_object_store_memory):
plasma_store_memory = ray_start_object_store_memory
# Define the size of one task's return argument so that the combined
# sum of all objects' sizes is at least twice the plasma stores'
# combined allotted memory.

View file

@ -3,7 +3,6 @@ from __future__ import division
from __future__ import print_function
from numpy.testing import assert_almost_equal
import pytest
import tensorflow as tf
import ray
@ -96,16 +95,7 @@ class TrainActor(object):
return self.values[1].get_weights()
@pytest.fixture
def ray_start_regular():
# Start the Ray processes.
ray.init(num_cpus=2)
yield None
# The code after the yield will run as teardown code.
ray.shutdown()
def test_tensorflow_variables(ray_start_regular):
def test_tensorflow_variables(ray_start_2_cpus):
sess = tf.Session()
loss, init, _, _ = make_linear_network()
sess.run(init)
@ -144,7 +134,7 @@ def test_tensorflow_variables(ray_start_regular):
# Test that the variable names for the two different nets are not
# modified by TensorFlow to be unique (i.e., they should already
# be unique because of the variable prefix).
def test_variable_name_collision(ray_start_regular):
def test_variable_name_collision(ray_start_2_cpus):
net1 = NetActor()
net2 = NetActor()
@ -155,7 +145,7 @@ def test_variable_name_collision(ray_start_regular):
# Test that TensorFlowVariables can take in addition variables through
# input_variables arg and with no loss.
def test_additional_variables_no_loss(ray_start_regular):
def test_additional_variables_no_loss(ray_start_2_cpus):
net = LossActor(use_loss=False)
assert len(net.values[0].variables.items()) == 1
assert len(net.values[0].placeholders.items()) == 1
@ -165,7 +155,7 @@ def test_additional_variables_no_loss(ray_start_regular):
# Test that TensorFlowVariables can take in addition variables through
# input_variables arg and with a loss.
def test_additional_variables_with_loss(ray_start_regular):
def test_additional_variables_with_loss(ray_start_2_cpus):
net = LossActor()
assert len(net.values[0].variables.items()) == 3
assert len(net.values[0].placeholders.items()) == 3
@ -175,7 +165,7 @@ def test_additional_variables_with_loss(ray_start_regular):
# Test that different networks on the same worker are independent and
# we can get/set their weights without any interaction.
def test_networks_independent(ray_start_regular):
def test_networks_independent(ray_start_2_cpus):
# Note we use only one worker to ensure that all of the remote
# functions run on the same worker.
net1 = NetActor()
@ -204,7 +194,7 @@ def test_networks_independent(ray_start_regular):
# This test creates an additional network on the driver so that the
# tensorflow variables on the driver and the worker differ.
def test_network_driver_worker_independent(ray_start_regular):
def test_network_driver_worker_independent(ray_start_2_cpus):
# Create a network on the driver locally.
sess1 = tf.Session()
loss1, init1, _, _ = make_linear_network()
@ -219,7 +209,7 @@ def test_network_driver_worker_independent(ray_start_regular):
assert weights2 == new_weights2
def test_variables_control_dependencies(ray_start_regular):
def test_variables_control_dependencies(ray_start_2_cpus):
# Creates a network and appends a momentum optimizer.
sess = tf.Session()
loss, init, _, _ = make_linear_network()
@ -232,12 +222,12 @@ def test_variables_control_dependencies(ray_start_regular):
assert len(net_vars.variables.items()) == 4
def test_remote_training_step(ray_start_regular):
def test_remote_training_step(ray_start_2_cpus):
net = ray.remote(TrainActor).remote()
ray.get(net.training_step.remote(net.get_weights.remote()))
def test_remote_training_loss(ray_start_regular):
def test_remote_training_loss(ray_start_2_cpus):
net = ray.remote(TrainActor).remote()
net_values = TrainActor().values
loss, variables, _, sess, grads, train, placeholders = net_values