mirror of
https://github.com/vale981/ray
synced 2025-03-06 10:31:39 -05:00
Add test that tasks sent to actor on dead node raise exceptions. (#2626)
* Add actor failure test.
* Minor change.
* Make test harder.
* Change numbers a bit.
* Skip test for non-xray.
This commit is contained in:
parent
06a58016d8
commit
aaf5456b3d
1 changed file with 51 additions and 0 deletions
|
@ -1253,6 +1253,57 @@ class ActorsWithGPUs(unittest.TestCase):
|
|||
assert remaining_ids == [x_id]
|
||||
|
||||
|
||||
@unittest.skipIf(
    os.environ.get("RAY_USE_XRAY") != "1",
    "This test only works with xray.")
class ActorExceptionFailures(unittest.TestCase):
    """Checks that actor tasks fail loudly once the actor's node is gone."""

    def tearDown(self):
        # Every test in this class starts its own cluster, so always stop it.
        ray.shutdown()

    def testExceptionRaisedWhenActorNodeDies(self):
        # Start a local cluster with two local schedulers so that an actor
        # can be placed on a node other than the driver's.
        ray.worker._init(
            start_ray_local=True,
            num_local_schedulers=2,
            num_cpus=1)

        @ray.remote
        class Counter(object):
            def __init__(self):
                self.x = 0

            def local_plasma(self):
                # Report which plasma store the actor's worker is attached
                # to; the driver uses this to tell where the actor lives.
                worker = ray.worker.global_worker
                return worker.plasma_client.store_socket_name

            def inc(self):
                self.x = self.x + 1
                return self.x

        driver_worker = ray.worker.global_worker
        driver_store_socket = driver_worker.plasma_client.store_socket_name

        # Keep creating actors until one is attached to a plasma store other
        # than the driver's, i.e. it is not on the local scheduler.
        remote_actor = Counter.remote()
        while True:
            actor_store_socket = ray.get(remote_actor.local_plasma.remote())
            if actor_store_socket != driver_store_socket:
                break
            remote_actor = Counter.remote()

        # Kill the second plasma store to get rid of the cached objects and
        # trigger the corresponding local scheduler to exit.
        plasma_stores = ray.services.all_processes[
            ray.services.PROCESS_TYPE_PLASMA_STORE]
        store_process = plasma_stores[1]
        store_process.kill()

        # Submit some new actor tasks.
        result_ids = [remote_actor.inc.remote() for _ in range(100)]

        # Make sure that getting the result raises an exception, checking
        # every submitted task on each of several passes.
        for _ in range(10):
            for result_id in result_ids:
                # There is some small chance that ray.get will actually
                # succeed (if the object is transferred before the raylet
                # dies).
                with pytest.raises(ray.worker.RayGetError):
                    ray.get(result_id)

        # Make sure the process has exited.
        store_process.wait()
|
||||
|
||||
|
||||
@unittest.skipIf(
|
||||
os.environ.get("RAY_USE_XRAY") == "1",
|
||||
"This test does not work with xray yet.")
|
||||
|
|
Loading…
Add table
Reference in a new issue