mirror of https://github.com/vale981/ray (synced 2025-03-06 10:31:39 -05:00)
Fix flaky test_actor_failures::test_actor_restart (#9509)
* Fix flaky test
* os exit
This commit is contained in:
parent 1e46d4e29f
commit baf4be245d

1 changed file with 41 additions and 39 deletions
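The heart of the fix: the old test killed the actor from the outside with SIGKILL, which races against the stream of queued increase tasks, so the number of tasks completed before the actor's death was nondeterministic. The new test kills the actor in-band, by having one designated task call os._exit(-1), so the death lands at a known point in the task order. Below is a minimal, self-contained sketch of that pattern, not the test itself; the Counter class, the exit point at i == 5, and the bare ray.init() are illustrative assumptions, while the decorator and RayActorError handling follow Ray's public actor API. With that pattern in mind, the diff below reads straightforwardly.

    import os

    import ray

    ray.init()


    @ray.remote(max_restarts=1)
    class Counter:
        """Illustrative actor; dies deterministically inside a task."""

        def __init__(self):
            self.value = 0

        def increase(self, exit=False):
            # Exiting here, inside the task, guarantees that every task
            # submitted before this one has already run; an external
            # SIGKILL gives no such ordering guarantee.
            if exit:
                os._exit(-1)
            self.value += 1
            return self.value


    counter = Counter.remote()
    refs = [counter.increase.remote(exit=(i == 5)) for i in range(10)]
    for ref in refs:
        try:
            print(ray.get(ref))
        except ray.exceptions.RayActorError:
            # The exit task and any tasks caught by the death fail; the
            # actor then restarts (up to max_restarts) with fresh state.
            print("task failed: actor died")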
@@ -147,15 +147,16 @@ def test_actor_eviction(ray_start_regular):
 def test_actor_restart(ray_init_with_task_retry_delay):
     """Test actor restart when actor process is killed."""

-    @ray.remote(max_restarts=1, max_task_retries=-1)
+    @ray.remote(max_restarts=1)
     class RestartableActor:
         """An actor that will be restarted at most once."""

         def __init__(self):
             self.value = 0

-        def increase(self, delay=0):
-            time.sleep(delay)
+        def increase(self, exit=False):
+            if exit:
+                os._exit(-1)
             self.value += 1
             return self.value

@@ -163,54 +164,55 @@ def test_actor_restart(ray_init_with_task_retry_delay):
         return os.getpid()

     actor = RestartableActor.remote()
-    pid = ray.get(actor.get_pid.remote())
-    results = [actor.increase.remote() for _ in range(100)]
-    # Kill actor process, while the above task is still being executed.
-    os.kill(pid, SIGKILL)
-    wait_for_pid_to_exit(pid)
+    # Submit some tasks and kill on a task midway through.
+    results = [actor.increase.remote(exit=(i == 100)) for i in range(200)]
     # Make sure that all tasks were executed in order before the actor's death.
-    res = results.pop(0)
     i = 1
-    while True:
+    while results:
+        res = results[0]
         try:
             r = ray.get(res)
             if r != i:
-                # Actor restarted without any failed tasks.
+                # Actor restarted at this task without any failed tasks in
+                # between.
                 break
-            res = results.pop(0)
+            results.pop(0)
             i += 1
         except ray.exceptions.RayActorError:
             # Actor restarted.
             break
-    # Find the first task to execute after the actor was restarted.
-    while True:
+    # Skip any tasks that errored.
+    while results:
         try:
-            r = ray.get(res)
+            ray.get(results[0])
             break
         except ray.exceptions.RayActorError:
-            res = results.pop(0)
-    # Make sure that all tasks were executed in order after the actor's death.
-    i = 1
-    while True:
-        r = ray.get(res)
-        assert r == i
-        if results:
-            res = results.pop(0)
+            pass
+        results.pop(0)
+    # Check all tasks that executed after the restart.
+    if results:
+        # The actor executed some tasks after the restart.
+        i = 1
+        while results:
+            r = ray.get(results.pop(0))
+            assert r == i
             i += 1
-        else:
-            break

-    # Check that we can still call the actor.
-    result = actor.increase.remote()
-    assert ray.get(result) == r + 1
+        # Check that we can still call the actor.
+        result = actor.increase.remote()
+        assert ray.get(result) == r + 1
+    else:
+        # Wait for the actor to restart.
+        def ping():
+            try:
+                ray.get(actor.increase.remote())
+                return True
+            except ray.exceptions.RayActorError:
+                return False

-    # kill actor process one more time.
-    results = [actor.increase.remote() for _ in range(100)]
-    pid = ray.get(actor.get_pid.remote())
-    os.kill(pid, SIGKILL)
-    wait_for_pid_to_exit(pid)
-    # The actor has exceeded max restarts, and this task should fail.
+        wait_for_condition(ping)
+
+    # The actor has restarted. Kill actor process one more time.
+    actor.increase.remote(exit=True)
+    # The actor has exceeded max restarts. All tasks should fail.
     for _ in range(100):
         with pytest.raises(ray.exceptions.RayActorError):
             ray.get(actor.increase.remote())
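One note on the new code path: when the actor dies without executing any task past the restart point, the test polls until the restarted actor answers. wait_for_condition is a Ray test helper imported from Ray's test utilities; a hypothetical stand-in with the same poll-until-true shape, shown here only as an illustration of the contract the test relies on:

    import time


    def wait_for_condition(condition, timeout=30, retry_interval_ms=100):
        # Hypothetical reimplementation for illustration: re-evaluate the
        # zero-argument `condition` until it returns True or `timeout`
        # seconds elapse, then fail loudly.
        deadline = time.time() + timeout
        while time.time() < deadline:
            if condition():
                return True
            time.sleep(retry_interval_ms / 1000)
        raise RuntimeError("The condition wasn't met before the timeout.")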