diff --git a/python/ray/serve/backend_state.py b/python/ray/serve/backend_state.py
index 650646b8f..3458b361a 100644
--- a/python/ray/serve/backend_state.py
+++ b/python/ray/serve/backend_state.py
@@ -3,6 +3,7 @@ import time
 from abc import ABC
 from collections import defaultdict
 from enum import Enum
+import os
 from typing import Any, Dict, List, Optional, Tuple
 
 import ray.cloudpickle as pickle
@@ -33,6 +34,7 @@ class ReplicaState(Enum):
 
 
 ALL_REPLICA_STATES = list(ReplicaState)
+USE_PLACEMENT_GROUP = os.environ.get("SERVE_USE_PLACEMENT_GROUP", "0") == "1"
 
 
 class ActorReplicaWrapper:
@@ -82,17 +84,21 @@ class ActorReplicaWrapper:
 
     def start(self, backend_info: BackendInfo):
         self._actor_resources = backend_info.replica_config.resource_dict
-        try:
-            self._placement_group = ray.util.get_placement_group(
-                self._placement_group_name)
-        except ValueError:
-            logger.debug(
-                "Creating placement group '{}' for backend '{}'".format(
-                    self._placement_group_name, self._backend_tag))
-            self._placement_group = ray.util.placement_group(
-                [self._actor_resources],
-                lifetime="detached" if self._detached else None,
-                name=self._placement_group_name)
+        # Feature-flagged because placement groups don't handle newly
+        # added nodes.
+        # https://github.com/ray-project/ray/issues/15801
+        if USE_PLACEMENT_GROUP:
+            try:
+                self._placement_group = ray.util.get_placement_group(
+                    self._placement_group_name)
+            except ValueError:
+                logger.debug(
+                    "Creating placement group '{}' for backend '{}'".format(
+                        self._placement_group_name, self._backend_tag))
+                self._placement_group = ray.util.placement_group(
+                    [self._actor_resources],
+                    lifetime="detached" if self._detached else None,
+                    name=self._placement_group_name)
 
         try:
             self._actor_handle = ray.get_actor(self._actor_name)
@@ -137,9 +143,11 @@ class ActorReplicaWrapper:
             return True
 
         try:
-            ray.get_actor(self._actor_name)
+            handle = ray.get_actor(self._actor_name)
             ready, _ = ray.wait([self._drain_obj_ref], timeout=0)
             self._stopped = len(ready) == 1
+            if self._stopped:
+                ray.kill(handle, no_restart=True)
         except ValueError:
             self._stopped = True
 
@@ -165,6 +173,9 @@ class ActorReplicaWrapper:
 
         Currently, this just removes the placement group.
         """
+        if not USE_PLACEMENT_GROUP:
+            return
+
         try:
             ray.util.remove_placement_group(
                 ray.util.get_placement_group(self._placement_group_name))
diff --git a/python/ray/serve/backend_worker.py b/python/ray/serve/backend_worker.py
index d897cb01a..c312d76c5 100644
--- a/python/ray/serve/backend_worker.py
+++ b/python/ray/serve/backend_worker.py
@@ -305,5 +305,3 @@ class RayServeReplica:
                 f"Waiting for an additional {sleep_time}s to shut down "
                 f"because there are {self.num_ongoing_requests} "
                 "ongoing requests.")
-
-        ray.actor.exit_actor()
diff --git a/python/ray/serve/tests/test_cluster.py b/python/ray/serve/tests/test_cluster.py
index 8bba10c52..3ee91ebde 100644
--- a/python/ray/serve/tests/test_cluster.py
+++ b/python/ray/serve/tests/test_cluster.py
@@ -75,7 +75,7 @@ def test_node_failure(ray_cluster):
 
     worker_node = cluster.add_node(num_cpus=2)
 
-    @serve.deployment(version="1", num_replicas=3)
+    @serve.deployment(version="1", num_replicas=5)
     def D(*args):
         return os.getpid()
 
@@ -92,24 +92,24 @@ def test_node_failure(ray_cluster):
 
     print("Initial deploy.")
     D.deploy()
-    pids1 = get_pids(3)
+    pids1 = get_pids(5)
 
-    # Remove the node. There should still be one replica running.
+    # Remove the node. There should still be three replicas running.
     print("Kill node.")
     cluster.remove_node(worker_node)
-    pids2 = get_pids(1)
+    pids2 = get_pids(3)
     assert pids2.issubset(pids1)
 
     # Add a worker node back. One replica should get placed.
     print("Add back first node.")
     cluster.add_node(num_cpus=1)
-    pids3 = get_pids(2)
+    pids3 = get_pids(4)
     assert pids2.issubset(pids3)
 
     # Add another worker node. One more replica should get placed.
     print("Add back second node.")
     cluster.add_node(num_cpus=1)
-    pids4 = get_pids(3)
+    pids4 = get_pids(5)
     assert pids3.issubset(pids4)