Mirror of https://github.com/vale981/ray, synced 2025-03-06 02:21:39 -05:00
[xray] Re-enable some stress tests and convert stress_tests to pytest. (#2285)
* Fix one of the stress tests, fix ray.global_state.client_table when called early on.
* Re-enable testWait.
* Convert stress_tests.py to pytest.
* Fix
This commit is contained in:
parent 3a972893ee
commit e3534c46df
3 changed files with 493 additions and 491 deletions
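The conversion below follows pytest's yield-fixture pattern: the unittest-style setup and teardown logic and the ad-hoc ray.init() calls become fixtures that start Ray, hand control to the test, and clean up after the yield, and CI now runs the file via "python -m pytest test/stress_tests.py". A minimal sketch of that shape, mirroring the ray_start_regular fixture introduced in this diff (test_example is an illustrative name, not part of the change):

import pytest

import ray


@pytest.fixture
def ray_start_regular():
    # Setup: start the Ray processes before the test body runs.
    ray.init(num_cpus=10)
    yield None
    # Teardown: code after the yield runs once the test finishes.
    ray.worker.cleanup()


def test_example(ray_start_regular):
    # Requesting the fixture by name gives the test a running Ray instance.
    @ray.remote
    def f(x):
        return x

    assert ray.get(f.remote(1)) == 1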
@@ -135,7 +135,7 @@ matrix:
- python test/tensorflow_test.py
- python test/failure_test.py
- python test/microbenchmarks.py
- python test/stress_tests.py
- python -m pytest test/stress_tests.py
# - python test/component_failures_test.py
- python test/multi_node_test.py
- python test/recursion_test.py

@@ -194,7 +194,7 @@ script:
- python test/tensorflow_test.py
- python test/failure_test.py
- python test/microbenchmarks.py
- python test/stress_tests.py
- python -m pytest test/stress_tests.py
- python test/component_failures_test.py
- python test/multi_node_test.py
- python test/recursion_test.py

@@ -500,6 +500,12 @@ class GlobalState(object):
message = self.redis_client.execute_command(
"RAY.TABLE_LOOKUP", ray.gcs_utils.TablePrefix.CLIENT, "",
NIL_CLIENT_ID)

# Handle the case where no clients are returned. This should only
# occur potentially immediately after the cluster is started.
if message is None:
return []

node_info = []
gcs_entry = ray.gcs_utils.GcsTableEntry.GetRootAsGcsTableEntry(
message, 0)

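The added None check above is the fix the commit message refers to for ray.global_state.client_table when it is called early: a lookup that races with cluster startup now yields an empty result instead of failing on a missing table entry. A hedged usage sketch (assuming a driver that has just called ray.init(); the empty-result behavior follows directly from the guard above):

import ray

ray.init()

# Immediately after startup the GCS client table may not be populated yet;
# with the guard above this call simply returns an empty result instead of
# raising.
clients = ray.global_state.client_table()
print("clients registered so far:", len(clients))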
@@ -2,29 +2,40 @@ from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import unittest
import os
import ray
import numpy as np
import os
import pytest
import time

import ray
import ray.ray_constants as ray_constants


class TaskTests(unittest.TestCase):
@unittest.skipIf(
os.environ.get("RAY_USE_XRAY") == "1",
"This test does not work with xray yet.")
def testSubmittingTasks(self):
for num_local_schedulers in [1, 4]:
for num_workers_per_scheduler in [4]:
num_workers = num_local_schedulers * num_workers_per_scheduler
@pytest.fixture
def ray_start_regular():
# Start the Ray processes.
ray.init(num_cpus=10)
yield None
# The code after the yield will run as teardown code.
ray.worker.cleanup()


@pytest.fixture(params=[(1, 4), (4, 4)])
def ray_start_combination(request):
num_local_schedulers = request.param[0]
num_workers_per_scheduler = request.param[1]
# Start the Ray processes.
ray.worker._init(
start_ray_local=True,
num_workers=num_workers,
num_workers=num_workers_per_scheduler,
num_local_schedulers=num_local_schedulers,
num_cpus=100)
num_cpus=10)
yield num_local_schedulers, num_workers_per_scheduler
# The code after the yield will run as teardown code.
ray.worker.cleanup()


def test_submitting_tasks(ray_start_combination):
@ray.remote
def f(x):
return x

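ray_start_combination above is a parametrized fixture: pytest runs every test that requests it once per entry in params, which replaces the old nested "for num_local_schedulers" / "for num_workers_per_scheduler" loops inside each test. A generic illustration of the mechanism (the fixture and test names here are illustrative, not from the diff):

import pytest


@pytest.fixture(params=[(1, 4), (4, 4)])
def cluster_shape(request):
    # Each test that uses this fixture runs twice, once per tuple in params.
    return request.param


def test_shape(cluster_shape):
    num_local_schedulers, num_workers_per_scheduler = cluster_shape
    assert num_local_schedulers in (1, 4)
    assert num_workers_per_scheduler == 4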
@@ -41,22 +52,13 @@ class TaskTests(unittest.TestCase):
for _ in range(1000):
ray.get([f.remote(1) for _ in range(1)])

self.assertTrue(ray.services.all_processes_alive())
ray.worker.cleanup()
assert ray.services.all_processes_alive()

@unittest.skipIf(

@pytest.mark.skipif(
os.environ.get("RAY_USE_XRAY") == "1",
"This test does not work with xray yet.")
def testDependencies(self):
for num_local_schedulers in [1, 4]:
for num_workers_per_scheduler in [4]:
num_workers = num_local_schedulers * num_workers_per_scheduler
ray.worker._init(
start_ray_local=True,
num_workers=num_workers,
num_local_schedulers=num_local_schedulers,
num_cpus=100)

reason="This test does not work with xray yet.")
def test_dependencies(ray_start_combination):
@ray.remote
def f(x):
return x

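This hunk also shows the decorator change applied throughout the file: unittest.skipIf(condition, message) becomes pytest.mark.skipif(condition, reason=...), with the skip message passed as the reason keyword. A small standalone example of the converted form (the test name is illustrative; the environment variable and message are the ones used in this file):

import os

import pytest


@pytest.mark.skipif(
    os.environ.get("RAY_USE_XRAY") == "1",
    reason="This test does not work with xray yet.")
def test_skipped_under_xray():
    assert True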
@@ -76,15 +78,13 @@ class TaskTests(unittest.TestCase):
xs.append(g.remote(1))
ray.get(xs)

self.assertTrue(ray.services.all_processes_alive())
ray.worker.cleanup()
assert ray.services.all_processes_alive()

@unittest.skipIf(

@pytest.mark.skipif(
os.environ.get("RAY_USE_XRAY") == "1",
"This test does not work with xray yet.")
def testSubmittingManyTasks(self):
ray.init()

reason="This test does not work with xray yet.")
def test_submitting_many_tasks(ray_start_regular):
@ray.remote
def f(x):
return 1

@@ -96,12 +96,10 @@ class TaskTests(unittest.TestCase):
return x

ray.get([g(1000) for _ in range(100)])
self.assertTrue(ray.services.all_processes_alive())
ray.worker.cleanup()
assert ray.services.all_processes_alive()

def testGettingAndPutting(self):
ray.init(num_workers=1)

def test_getting_and_putting(ray_start_regular):
for n in range(8):
x = np.zeros(10**n)


@@ -112,35 +110,24 @@ class TaskTests(unittest.TestCase):
for _ in range(1000):
ray.get(x_id)

self.assertTrue(ray.services.all_processes_alive())
ray.worker.cleanup()
assert ray.services.all_processes_alive()

def testGettingManyObjects(self):
ray.init()

def test_getting_many_objects(ray_start_regular):
@ray.remote
def f():
return 1

n = 10**4 # TODO(pcm): replace by 10 ** 5 once this is faster.
lst = ray.get([f.remote() for _ in range(n)])
self.assertEqual(lst, n * [1])
assert lst == n * [1]

self.assertTrue(ray.services.all_processes_alive())
ray.worker.cleanup()
assert ray.services.all_processes_alive()

@unittest.skipIf(
os.environ.get("RAY_USE_XRAY") == "1",
"This test does not work with xray yet.")
def testWait(self):
for num_local_schedulers in [1, 4]:
for num_workers_per_scheduler in [4]:

def test_wait(ray_start_combination):
num_local_schedulers, num_workers_per_scheduler = ray_start_combination
num_workers = num_local_schedulers * num_workers_per_scheduler
ray.worker._init(
start_ray_local=True,
num_workers=num_workers,
num_local_schedulers=num_local_schedulers,
num_cpus=100)

@ray.remote
def f(x):

@@ -158,41 +145,33 @@ class TaskTests(unittest.TestCase):

for i in range(1, 5):
x_ids = [
g.remote(np.random.uniform(0, i))
for _ in range(2 * num_workers)
g.remote(np.random.uniform(0, i)) for _ in range(2 * num_workers)
]
ray.wait(x_ids, num_returns=len(x_ids))

self.assertTrue(ray.services.all_processes_alive())
ray.worker.cleanup()
assert ray.services.all_processes_alive()


@unittest.skipIf(
os.environ.get("RAY_USE_XRAY") == "1",
"This test does not work with xray yet.")
class ReconstructionTests(unittest.TestCase):
@pytest.fixture(params=[1, 4])
def ray_start_reconstruction(request):
num_local_schedulers = request.param

num_local_schedulers = 1

def setUp(self):
# Start the Redis global state store.
node_ip_address = "127.0.0.1"
redis_address, redis_shards = ray.services.start_redis(node_ip_address)
self.redis_ip_address = ray.services.get_ip_address(redis_address)
self.redis_port = ray.services.get_port(redis_address)
redis_ip_address = ray.services.get_ip_address(redis_address)
redis_port = ray.services.get_port(redis_address)
time.sleep(0.1)

# Start the Plasma store instances with a total of 1GB memory.
self.plasma_store_memory = 10**9
plasma_store_memory = 10**9
plasma_addresses = []
objstore_memory = (
self.plasma_store_memory // self.num_local_schedulers)
for i in range(self.num_local_schedulers):
objstore_memory = plasma_store_memory // num_local_schedulers
for i in range(num_local_schedulers):
store_stdout_file, store_stderr_file = ray.services.new_log_files(
"plasma_store_{}".format(i), True)
manager_stdout_file, manager_stderr_file = (
ray.services.new_log_files("plasma_manager_{}".format(i),
True))
manager_stdout_file, manager_stderr_file = (ray.services.new_log_files(
"plasma_manager_{}".format(i), True))
plasma_addresses.append(
ray.services.start_objstore(
node_ip_address,

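Here the old setUp/tearDown pair of ReconstructionTests becomes a single parametrized yield fixture: everything before the yield is setup, the yielded tuple is what each test unpacks, and everything after the yield is the old tearDown logic. A stripped-down sketch of that shape (the literal values below are placeholders, not the real cluster bring-up):

import pytest


@pytest.fixture(params=[1, 4])
def ray_start_reconstruction(request):
    num_local_schedulers = request.param
    # Setup would start Redis, the plasma stores, and Ray here.
    plasma_store_memory = 10**9
    yield ("127.0.0.1", 6379, plasma_store_memory, num_local_schedulers)
    # Teardown: liveness checks and cleanup run after the test completes.


def test_uses_cluster(ray_start_reconstruction):
    _, _, plasma_store_memory, num_local_schedulers = ray_start_reconstruction
    assert plasma_store_memory > 0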
@@ -213,19 +192,21 @@ class ReconstructionTests(unittest.TestCase):
address_info=address_info,
start_ray_local=True,
num_workers=1,
num_local_schedulers=self.num_local_schedulers,
num_cpus=[1] * self.num_local_schedulers,
redirect_output=True,
driver_mode=ray.SILENT_MODE)
num_local_schedulers=num_local_schedulers,
num_cpus=[1] * num_local_schedulers,
redirect_output=True)

def tearDown(self):
self.assertTrue(ray.services.all_processes_alive())
yield (redis_ip_address, redis_port, plasma_store_memory,
num_local_schedulers)

# The code after the yield will run as teardown code.
assert ray.services.all_processes_alive()

# Determine the IDs of all local schedulers that had a task scheduled
# or submitted.
state = ray.experimental.state.GlobalState()
state._initialize_global_state(self.redis_ip_address, self.redis_port)
if os.environ.get('RAY_USE_NEW_GCS', False):
state._initialize_global_state(redis_ip_address, redis_port)
if os.environ.get("RAY_USE_NEW_GCS") == "on":
tasks = state.task_table()
local_scheduler_ids = {
task["LocalSchedulerID"]

@@ -239,22 +220,26 @@ class ReconstructionTests(unittest.TestCase):
# NIL_LOCAL_SCHEDULER_ID. This is the local scheduler ID associated
# with the driver task, since it is not scheduled by a particular local
# scheduler.
if os.environ.get('RAY_USE_NEW_GCS', False):
self.assertEqual(
len(local_scheduler_ids), self.num_local_schedulers + 1)
if os.environ.get("RAY_USE_NEW_GCS") == "on":
assert len(local_scheduler_ids) == num_local_schedulers + 1

# Clean up the Ray cluster.
ray.worker.cleanup()

@unittest.skipIf(
os.environ.get('RAY_USE_NEW_GCS', False),
"Failing with new GCS API on Linux.")
def testSimple(self):

@pytest.mark.skipif(
os.environ.get("RAY_USE_XRAY") == "1",
reason="This test does not work with xray yet.")
@pytest.mark.skipif(
os.environ.get("RAY_USE_NEW_GCS") == "on",
reason="Failing with new GCS API on Linux.")
def test_simple(ray_start_reconstruction):
_, _, plasma_store_memory, num_local_schedulers = ray_start_reconstruction
# Define the size of one task's return argument so that the combined
# sum of all objects' sizes is at least twice the plasma stores'
# combined allotted memory.
num_objects = 1000
size = int(self.plasma_store_memory * 1.5 / (num_objects * 8))
size = int(plasma_store_memory * 1.5 / (num_objects * 8))

# Define a remote task with no dependencies, which returns a numpy
# array of the given size.

@@ -273,26 +258,32 @@ class ReconstructionTests(unittest.TestCase):
# gets, old values should be evicted.
for i in range(num_objects):
value = ray.get(args[i])
self.assertEqual(value[0], i)
assert value[0] == i
# Get each value again to force reconstruction.
for i in range(num_objects):
value = ray.get(args[i])
self.assertEqual(value[0], i)
assert value[0] == i
# Get values sequentially, in chunks.
num_chunks = 4 * self.num_local_schedulers
num_chunks = 4 * num_local_schedulers
chunk = num_objects // num_chunks
for i in range(num_chunks):
values = ray.get(args[i * chunk:(i + 1) * chunk])
del values

@unittest.skipIf(
os.environ.get('RAY_USE_NEW_GCS', False), "Failing with new GCS API.")
def testRecursive(self):

@pytest.mark.skipif(
os.environ.get("RAY_USE_XRAY") == "1",
reason="This test does not work with xray yet.")
@pytest.mark.skipif(
os.environ.get("RAY_USE_NEW_GCS") == "on",
reason="Failing with new GCS API on Linux.")
def test_recursive(ray_start_reconstruction):
_, _, plasma_store_memory, num_local_schedulers = ray_start_reconstruction
# Define the size of one task's return argument so that the combined
# sum of all objects' sizes is at least twice the plasma stores'
# combined allotted memory.
num_objects = 1000
size = int(self.plasma_store_memory * 1.5 / (num_objects * 8))
size = int(plasma_store_memory * 1.5 / (num_objects * 8))

# Define a root task with no dependencies, which returns a numpy array
# of the given size.

@@ -321,31 +312,37 @@ class ReconstructionTests(unittest.TestCase):
# gets, old values should be evicted.
for i in range(num_objects):
value = ray.get(args[i])
self.assertEqual(value[0], i)
assert value[0] == i
# Get each value again to force reconstruction.
for i in range(num_objects):
value = ray.get(args[i])
self.assertEqual(value[0], i)
assert value[0] == i
# Get 10 values randomly.
for _ in range(10):
i = np.random.randint(num_objects)
value = ray.get(args[i])
self.assertEqual(value[0], i)
assert value[0] == i
# Get values sequentially, in chunks.
num_chunks = 4 * self.num_local_schedulers
num_chunks = 4 * num_local_schedulers
chunk = num_objects // num_chunks
for i in range(num_chunks):
values = ray.get(args[i * chunk:(i + 1) * chunk])
del values

@unittest.skipIf(
os.environ.get('RAY_USE_NEW_GCS', False), "Failing with new GCS API.")
def testMultipleRecursive(self):

@pytest.mark.skipif(
os.environ.get("RAY_USE_XRAY") == "1",
reason="This test does not work with xray yet.")
@pytest.mark.skipif(
os.environ.get("RAY_USE_NEW_GCS") == "on",
reason="Failing with new GCS API on Linux.")
def test_multiple_recursive(ray_start_reconstruction):
_, _, plasma_store_memory, _ = ray_start_reconstruction
# Define the size of one task's return argument so that the combined
# sum of all objects' sizes is at least twice the plasma stores'
# combined allotted memory.
num_objects = 1000
size = self.plasma_store_memory * 2 // (num_objects * 8)
size = plasma_store_memory * 2 // (num_objects * 8)

# Define a root task with no dependencies, which returns a numpy array
# of the given size.

@@ -378,18 +375,19 @@ class ReconstructionTests(unittest.TestCase):
args = args[num_args:]
for i in range(num_objects):
value = ray.get(args[i])
self.assertEqual(value[0], i)
assert value[0] == i
# Get each value again to force reconstruction.
for i in range(num_objects):
value = ray.get(args[i])
self.assertEqual(value[0], i)
assert value[0] == i
# Get 10 values randomly.
for _ in range(10):
i = np.random.randint(num_objects)
value = ray.get(args[i])
self.assertEqual(value[0], i)
assert value[0] == i

def wait_for_errors(self, error_check):

def wait_for_errors(error_check):
# Wait for errors from all the nondeterministic tasks.
errors = []
time_left = 100

@@ -401,17 +399,23 @@ class ReconstructionTests(unittest.TestCase):
time.sleep(1)

# Make sure that enough errors came through.
self.assertTrue(error_check(errors))
assert error_check(errors)
return errors

@unittest.skipIf(
os.environ.get('RAY_USE_NEW_GCS', False), "Hanging with new GCS API.")
def testNondeterministicTask(self):

@pytest.mark.skipif(
os.environ.get("RAY_USE_XRAY") == "1",
reason="This test does not work with xray yet.")
@pytest.mark.skipif(
os.environ.get("RAY_USE_NEW_GCS") == "on",
reason="Failing with new GCS API on Linux.")
def test_nondeterministic_task(ray_start_reconstruction):
_, _, plasma_store_memory, num_local_schedulers = ray_start_reconstruction
# Define the size of one task's return argument so that the combined
# sum of all objects' sizes is at least twice the plasma stores'
# combined allotted memory.
num_objects = 1000
size = self.plasma_store_memory * 2 // (num_objects * 8)
size = plasma_store_memory * 2 // (num_objects * 8)

# Define a nondeterministic remote task with no dependencies, which
# returns a random numpy array of the given size. This task should

@@ -443,14 +447,14 @@ class ReconstructionTests(unittest.TestCase):
# gets, old values should be evicted.
for i in range(num_objects):
value = ray.get(args[i])
self.assertEqual(value[0], i)
assert value[0] == i
# Get each value again to force reconstruction.
for i in range(num_objects):
value = ray.get(args[i])
self.assertEqual(value[0], i)
assert value[0] == i

def error_check(errors):
if self.num_local_schedulers == 1:
if num_local_schedulers == 1:
# In a single-node setting, each object is evicted and
# reconstructed exactly once, so exactly half the objects will
# produce an error during reconstruction.

@@ -462,20 +466,25 @@ class ReconstructionTests(unittest.TestCase):
min_errors = 1
return len(errors) >= min_errors

errors = self.wait_for_errors(error_check)
errors = wait_for_errors(error_check)
# Make sure all the errors have the correct type.
self.assertTrue(
all(error["type"] == ray_constants.HASH_MISMATCH_PUSH_ERROR
for error in errors))
assert all(error["type"] == ray_constants.HASH_MISMATCH_PUSH_ERROR
for error in errors)

@unittest.skipIf(
os.environ.get('RAY_USE_NEW_GCS', False), "Hanging with new GCS API.")
def testDriverPutErrors(self):

@pytest.mark.skipif(
os.environ.get("RAY_USE_XRAY") == "1",
reason="This test does not work with xray yet.")
@pytest.mark.skipif(
os.environ.get("RAY_USE_NEW_GCS") == "on",
reason="Failing with new GCS API on Linux.")
def test_driver_put_errors(ray_start_reconstruction):
_, _, plasma_store_memory, _ = ray_start_reconstruction
# Define the size of one task's return argument so that the combined
# sum of all objects' sizes is at least twice the plasma stores'
# combined allotted memory.
num_objects = 1000
size = self.plasma_store_memory * 2 // (num_objects * 8)
size = plasma_store_memory * 2 // (num_objects * 8)

# Define a task with a single dependency, a numpy array, that returns
# another array.

@@ -497,7 +506,7 @@ class ReconstructionTests(unittest.TestCase):
# gets, old values should be evicted.
for i in range(num_objects):
value = ray.get(args[i])
self.assertEqual(value[0], i)
assert value[0] == i

# Get each value starting from the beginning to force reconstruction.
# Currently, since we're not able to reconstruct `ray.put` objects that

@@ -510,23 +519,13 @@ class ReconstructionTests(unittest.TestCase):
def error_check(errors):
return len(errors) > 1

errors = self.wait_for_errors(error_check)
self.assertTrue(
all(error["type"] == ray_constants.PUT_RECONSTRUCTION_PUSH_ERROR
for error in errors))


@unittest.skipIf(
os.environ.get("RAY_USE_XRAY") == "1",
"This test does not work with xray yet.")
class ReconstructionTestsMultinode(ReconstructionTests):

# Run the same tests as the single-node suite, but with 4 local schedulers,
# one worker each.
num_local_schedulers = 4
errors = wait_for_errors(error_check)
assert all(error["type"] == ray_constants.PUT_RECONSTRUCTION_PUSH_ERROR
for error in errors)


# NOTE(swang): This test tries to launch 1000 workers and breaks.
# TODO(rkn): This test needs to be updated to use pytest.
# class WorkerPoolTests(unittest.TestCase):
#
# def tearDown(self):

@@ -547,6 +546,3 @@ class ReconstructionTestsMultinode(ReconstructionTests):
# ray.init(num_workers=1)
# ray.get([g.remote(i) for i in range(1000)])
# ray.worker.cleanup()

if __name__ == "__main__":
unittest.main(verbosity=2)