[serve] Fix serve_failure test (#20268)

This commit is contained in:
Edward Oakes 2021-11-11 19:19:34 -08:00 committed by GitHub
parent eb6449b21b
commit 7c9881b73d
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 22 additions and 8 deletions

View file

@ -703,9 +703,9 @@ class DeploymentState:
"Target state should be recovered successfully first before "
"recovering current state from replica actor names.")
logger.info("Recovering current state for deployment "
f"{self._name} from {len(replica_actor_names)} actors in "
"current ray cluster..")
logger.debug("Recovering current state for deployment "
f"{self._name} from {len(replica_actor_names)} actors in "
"current ray cluster..")
# All current states use default value, only attach running replicas.
for replica_actor_name in replica_actor_names:
replica_name: ReplicaName = ReplicaName.from_str(

View file

@ -151,7 +151,8 @@ def create_replica_wrapper(name: str, serialized_deployment_def: bytes):
async def prepare_for_shutdown(self):
self.shutdown_event.set()
return await self.replica.prepare_for_shutdown()
if self.replica is not None:
return await self.replica.prepare_for_shutdown()
async def run_forever(self):
await self.shutdown_event.wait()

View file

@ -18,6 +18,14 @@ def test_validation():
DeploymentVersion(123, {"set": set()})
def test_other_type_equality():
v = DeploymentVersion("1", None)
assert v is not None
assert v != "1"
assert v != None # noqa: E711
def test_code_version():
v1 = DeploymentVersion("1", None)
v2 = DeploymentVersion("1", None)

View file

@ -30,6 +30,8 @@ class DeploymentVersion:
return self._hash
def __eq__(self, other: Any) -> bool:
if not isinstance(other, DeploymentVersion):
return False
return self._hash == other._hash

View file

@ -54,7 +54,7 @@ ray.init(
namespace="serve_failure_test",
address=cluster.address,
dashboard_host="0.0.0.0",
log_to_driver=False)
log_to_driver=True)
serve.start(detached=True)
@ -76,8 +76,9 @@ class RandomKiller:
def run(self):
while True:
ray.kill(
random.choice(self._get_all_serve_actors()), no_restart=False)
chosen = random.choice(self._get_all_serve_actors())
print(f"Killing {chosen}")
ray.kill(chosen, no_restart=False)
time.sleep(self.kill_period_s)
@ -125,7 +126,9 @@ class RandomTest:
while True:
for _ in range(100):
actions, weights = zip(*self.weighted_actions)
random.choices(actions, weights=weights)[0]()
action_chosen = random.choices(actions, weights=weights)[0]
print(f"Executing {action_chosen}")
action_chosen()
new_time = time.time()
print("Iteration {}:\n"