From baf4be245d444d286ab060d05e46df82bc43109c Mon Sep 17 00:00:00 2001 From: Stephanie Wang Date: Thu, 16 Jul 2020 10:48:33 -0700 Subject: [PATCH] Fix flaky test_actor_failures::test_actor_restart (#9509) * Fix flaky test * os exit --- python/ray/tests/test_actor_failures.py | 80 +++++++++++++------------ 1 file changed, 41 insertions(+), 39 deletions(-) diff --git a/python/ray/tests/test_actor_failures.py b/python/ray/tests/test_actor_failures.py index ecebd919d..997fc2c34 100644 --- a/python/ray/tests/test_actor_failures.py +++ b/python/ray/tests/test_actor_failures.py @@ -147,15 +147,16 @@ def test_actor_eviction(ray_start_regular): def test_actor_restart(ray_init_with_task_retry_delay): """Test actor restart when actor process is killed.""" - @ray.remote(max_restarts=1, max_task_retries=-1) + @ray.remote(max_restarts=1) class RestartableActor: """An actor that will be restarted at most once.""" def __init__(self): self.value = 0 - def increase(self, delay=0): - time.sleep(delay) + def increase(self, exit=False): + if exit: + os._exit(-1) self.value += 1 return self.value @@ -163,56 +164,57 @@ def test_actor_restart(ray_init_with_task_retry_delay): return os.getpid() actor = RestartableActor.remote() - pid = ray.get(actor.get_pid.remote()) - results = [actor.increase.remote() for _ in range(100)] - # Kill actor process, while the above task is still being executed. - os.kill(pid, SIGKILL) - wait_for_pid_to_exit(pid) + # Submit some tasks and kill on a task midway through. + results = [actor.increase.remote(exit=(i == 100)) for i in range(200)] # Make sure that all tasks were executed in order before the actor's death. - res = results.pop(0) i = 1 - while True: + while results: + res = results[0] try: r = ray.get(res) if r != i: - # Actor restarted without any failed tasks. + # Actor restarted at this task without any failed tasks in + # between. break - res = results.pop(0) + results.pop(0) i += 1 except ray.exceptions.RayActorError: - # Actor restarted. break - # Find the first task to execute after the actor was restarted. - while True: + # Skip any tasks that errored. + while results: try: - r = ray.get(res) - break + ray.get(results[0]) except ray.exceptions.RayActorError: - res = results.pop(0) - pass - # Make sure that all tasks were executed in order after the actor's death. - i = 1 - while True: - r = ray.get(res) - assert r == i - if results: - res = results.pop(0) + results.pop(0) + # Check all tasks that executed after the restart. + if results: + # The actor executed some tasks after the restart. + i = 1 + while results: + r = ray.get(results.pop(0)) + assert r == i i += 1 - else: - break - # Check that we can still call the actor. - result = actor.increase.remote() - assert ray.get(result) == r + 1 + # Check that we can still call the actor. + result = actor.increase.remote() + assert ray.get(result) == r + 1 + else: + # Wait for the actor to restart. + def ping(): + try: + ray.get(actor.increase.remote()) + return True + except ray.exceptions.RayActorError: + return False - # kill actor process one more time. - results = [actor.increase.remote() for _ in range(100)] - pid = ray.get(actor.get_pid.remote()) - os.kill(pid, SIGKILL) - wait_for_pid_to_exit(pid) - # The actor has exceeded max restarts, and this task should fail. - with pytest.raises(ray.exceptions.RayActorError): - ray.get(actor.increase.remote()) + wait_for_condition(ping) + + # The actor has restarted. Kill actor process one more time. + actor.increase.remote(exit=True) + # The actor has exceeded max restarts. All tasks should fail. + for _ in range(100): + with pytest.raises(ray.exceptions.RayActorError): + ray.get(actor.increase.remote()) # Create another actor. actor = RestartableActor.remote()