From d392f97331889441625a0a232a17acb7da7cdf0a Mon Sep 17 00:00:00 2001 From: Gagandeep Singh Date: Thu, 13 Jan 2022 14:39:59 +0530 Subject: [PATCH] Unskipped tests in serve: `test_controller_recovery.py` (#21450) --- .../test_controller_recovery.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/python/ray/serve/tests/fault_tolerance_tests/test_controller_recovery.py b/python/ray/serve/tests/fault_tolerance_tests/test_controller_recovery.py index 9ce9dd43e..334e715d1 100644 --- a/python/ray/serve/tests/fault_tolerance_tests/test_controller_recovery.py +++ b/python/ray/serve/tests/fault_tolerance_tests/test_controller_recovery.py @@ -13,7 +13,6 @@ from ray._private.test_utils import SignalActor from ray.serve.utils import get_random_letters -@pytest.mark.skipif(sys.platform == "win32", reason="Failing on Windows.") def test_recover_start_from_replica_actor_names(serve_instance): """Test controller is able to recover starting -> running replicas from actor names. @@ -91,7 +90,6 @@ def test_recover_start_from_replica_actor_names(serve_instance): "recover from actor names") -@pytest.mark.skipif(sys.platform == "win32", reason="Failing on Windows.") def test_recover_rolling_update_from_replica_actor_names(serve_instance): """Test controller is able to recover starting -> updating -> running replicas from actor names, with right replica versions during rolling @@ -130,14 +128,16 @@ def test_recover_rolling_update_from_replica_actor_names(serve_instance): async def __call__(self, request): return await self.handler() - def make_nonblocking_calls(expected, expect_blocking=False): + def make_nonblocking_calls(expected, expect_blocking=False, num_returns=1): # Returns dict[val, set(pid)]. blocking = [] responses = defaultdict(set) start = time.time() - while time.time() - start < 30: + timeout_value = 60 if sys.platform == "win32" else 30 + while time.time() - start < timeout_value: refs = [call.remote(block=False) for _ in range(10)] - ready, not_ready = ray.wait(refs, timeout=5) + ready, not_ready = ray.wait( + refs, timeout=5, num_returns=num_returns) for ref in ready: val, pid = ray.get(ref) responses[val].add(pid) @@ -145,7 +145,7 @@ def test_recover_rolling_update_from_replica_actor_names(serve_instance): blocking.extend(not_ready) if (all( - len(responses[val]) == num + len(responses[val]) >= num for val, num in expected.items()) and (expect_blocking is False or len(blocking) > 0)): break @@ -155,7 +155,7 @@ def test_recover_rolling_update_from_replica_actor_names(serve_instance): return responses, blocking V1.deploy() - responses1, _ = make_nonblocking_calls({"1": 2}) + responses1, _ = make_nonblocking_calls({"1": 2}, num_returns=2) pids1 = responses1["1"] # ref2 will block a single replica until the signal is sent. Check that @@ -190,7 +190,7 @@ def test_recover_rolling_update_from_replica_actor_names(serve_instance): # Now the goal and requests to the new version should complete. # We should have two running replicas of the new version. assert client._wait_for_goal(goal_ref) - make_nonblocking_calls({"2": 2}) + make_nonblocking_calls({"2": 2}, num_returns=2) if __name__ == "__main__":