Unskipped tests in serve: test_controller_recovery.py (#21450)

This commit is contained in:
Gagandeep Singh 2022-01-13 14:39:59 +05:30 committed by GitHub
parent a6e76c2803
commit d392f97331
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -13,7 +13,6 @@ from ray._private.test_utils import SignalActor
from ray.serve.utils import get_random_letters
@pytest.mark.skipif(sys.platform == "win32", reason="Failing on Windows.")
def test_recover_start_from_replica_actor_names(serve_instance):
"""Test controller is able to recover starting -> running replicas from
actor names.
@ -91,7 +90,6 @@ def test_recover_start_from_replica_actor_names(serve_instance):
"recover from actor names")
@pytest.mark.skipif(sys.platform == "win32", reason="Failing on Windows.")
def test_recover_rolling_update_from_replica_actor_names(serve_instance):
"""Test controller is able to recover starting -> updating -> running
replicas from actor names, with right replica versions during rolling
@ -130,14 +128,16 @@ def test_recover_rolling_update_from_replica_actor_names(serve_instance):
async def __call__(self, request):
return await self.handler()
def make_nonblocking_calls(expected, expect_blocking=False):
def make_nonblocking_calls(expected, expect_blocking=False, num_returns=1):
# Returns dict[val, set(pid)].
blocking = []
responses = defaultdict(set)
start = time.time()
while time.time() - start < 30:
timeout_value = 60 if sys.platform == "win32" else 30
while time.time() - start < timeout_value:
refs = [call.remote(block=False) for _ in range(10)]
ready, not_ready = ray.wait(refs, timeout=5)
ready, not_ready = ray.wait(
refs, timeout=5, num_returns=num_returns)
for ref in ready:
val, pid = ray.get(ref)
responses[val].add(pid)
@ -145,7 +145,7 @@ def test_recover_rolling_update_from_replica_actor_names(serve_instance):
blocking.extend(not_ready)
if (all(
len(responses[val]) == num
len(responses[val]) >= num
for val, num in expected.items())
and (expect_blocking is False or len(blocking) > 0)):
break
@ -155,7 +155,7 @@ def test_recover_rolling_update_from_replica_actor_names(serve_instance):
return responses, blocking
V1.deploy()
responses1, _ = make_nonblocking_calls({"1": 2})
responses1, _ = make_nonblocking_calls({"1": 2}, num_returns=2)
pids1 = responses1["1"]
# ref2 will block a single replica until the signal is sent. Check that
@ -190,7 +190,7 @@ def test_recover_rolling_update_from_replica_actor_names(serve_instance):
# Now the goal and requests to the new version should complete.
# We should have two running replicas of the new version.
assert client._wait_for_goal(goal_ref)
make_nonblocking_calls({"2": 2})
make_nonblocking_calls({"2": 2}, num_returns=2)
if __name__ == "__main__":