[Serve] Change ReplicaName to use internal prefix (#20067)

This commit is contained in:
Simon Mo 2021-11-11 14:21:34 -08:00 committed by GitHub
parent 992ab3e098
commit fca851eef5
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 82 additions and 26 deletions

View file

@ -214,6 +214,14 @@ py_test(
deps = [":serve_lib"],
)
py_test(
name = "test_standalone2",
size = "medium",
srcs = serve_tests_srcs,
tags = ["exclusive", "team:serve"],
deps = [":serve_lib"],
)
py_test(
name = "test_cluster",
size = "small",

View file

@ -1,9 +1,8 @@
import ray
from dataclasses import dataclass
from typing import Optional
from uuid import UUID
import ray
from ray.actor import ActorClass, ActorHandle
from ray.serve.config import DeploymentConfig, ReplicaConfig
from ray.serve.autoscaling_policy import AutoscalingPolicy
@ -49,24 +48,28 @@ class ReplicaName:
replica_suffix: str
replica_tag: ReplicaTag = ""
delimiter: str = "#"
prefix: str = "SERVE_REPLICA::"
def __init__(self, deployment_tag: str, replica_suffix: str):
self.deployment_tag = deployment_tag
self.replica_suffix = replica_suffix
self.replica_tag = f"{deployment_tag}{self.delimiter}{replica_suffix}"
@staticmethod
def is_replica_name(actor_name: str) -> bool:
return actor_name.startswith(ReplicaName.prefix)
@classmethod
def from_str(self, replica_name):
parsed = replica_name.split(self.delimiter)
def from_str(cls, actor_name):
assert ReplicaName.is_replica_name(actor_name)
# TODO(simon): this currently conforms the tag and suffix logic. We
# can try to keep the internal name always hard coded with the prefix.
replica_name = actor_name.replace(cls.prefix, "")
parsed = replica_name.split(cls.delimiter)
assert len(parsed) == 2, (
f"Given replica name {replica_name} didn't match pattern, please "
f"ensure it has exactly two fields with delimiter {self.delimiter}"
)
self.deployment_tag = parsed[0]
self.replica_suffix = parsed[1]
self.replica_tag = replica_name
return self
f"ensure it has exactly two fields with delimiter {cls.delimiter}")
return cls(deployment_tag=parsed[0], replica_suffix=parsed[1])
def __str__(self):
return self.replica_tag

View file

@ -13,9 +13,9 @@ from ray.serve.async_goal_manager import AsyncGoalManager
from ray.serve.common import (DeploymentInfo, Duration, GoalId, ReplicaTag,
ReplicaName, RunningReplicaInfo)
from ray.serve.config import DeploymentConfig
from ray.serve.constants import (
CONTROLLER_STARTUP_GRACE_PERIOD_S, SERVE_CONTROLLER_NAME, SERVE_PROXY_NAME,
MAX_DEPLOYMENT_CONSTRUCTOR_RETRY_COUNT, MAX_NUM_DELETED_DEPLOYMENTS)
from ray.serve.constants import (CONTROLLER_STARTUP_GRACE_PERIOD_S,
MAX_DEPLOYMENT_CONSTRUCTOR_RETRY_COUNT,
MAX_NUM_DELETED_DEPLOYMENTS)
from ray.serve.storage.kv_store import KVStoreBase
from ray.serve.long_poll import LongPollHost, LongPollNamespace
from ray.serve.utils import format_actor_name, get_random_letters, logger
@ -364,8 +364,8 @@ class DeploymentReplica(VersionedReplica):
replica_tag: ReplicaTag, deployment_name: str,
version: DeploymentVersion):
self._actor = ActorReplicaWrapper(
format_actor_name(replica_tag), detached, controller_name,
replica_tag, deployment_name)
f"{ReplicaName.prefix}{format_actor_name(replica_tag)}", detached,
controller_name, replica_tag, deployment_name)
self._controller_name = controller_name
self._deployment_name = deployment_name
self._replica_tag = replica_tag
@ -1262,8 +1262,7 @@ class DeploymentStateManager:
"""
all_replica_names = [
actor_name for actor_name in all_current_actor_names
if (SERVE_CONTROLLER_NAME not in actor_name
and SERVE_PROXY_NAME not in actor_name)
if ReplicaName.is_replica_name(actor_name)
]
deployment_to_current_replicas = defaultdict(list)
if len(all_replica_names) > 0:

View file

@ -15,20 +15,34 @@ def test_replica_tag_formatting():
def test_replica_name_from_str():
replica_suffix = get_random_letters()
replica_tag = f"DeploymentA#{replica_suffix}"
actor_name = f"{ReplicaName.prefix}DeploymentA#{replica_suffix}"
replica_name = ReplicaName.from_str(replica_tag)
assert replica_name.replica_tag == replica_tag
assert str(replica_tag) == replica_tag
replica_name = ReplicaName.from_str(actor_name)
assert str(replica_name) == replica_name.replica_tag == actor_name.replace(
ReplicaName.prefix, "")
def test_invalid_name_from_str():
replica_suffix = get_random_letters()
replica_tag = f"DeploymentA##{replica_suffix}"
replica_tag = f"DeploymentA##{replica_suffix}"
with pytest.raises(AssertionError):
ReplicaName.from_str(replica_tag)
# No prefix
replica_tag = f"DeploymentA#{replica_suffix}"
with pytest.raises(AssertionError):
ReplicaName.from_str(replica_tag)
def test_is_replica_name():
replica_suffix = get_random_letters()
assert not ReplicaName.is_replica_name(f"DeploymentA##{replica_suffix}")
assert not ReplicaName.is_replica_name(f"DeploymentA#{replica_suffix}")
assert ReplicaName.is_replica_name(
f"{ReplicaName.prefix}DeploymentA#{replica_suffix}")
if __name__ == "__main__":
import sys

View file

@ -13,6 +13,7 @@ from ray.serve.common import (
str,
ReplicaConfig,
ReplicaTag,
ReplicaName,
)
from ray.serve.deployment_state import (
DeploymentState,
@ -1882,7 +1883,7 @@ def test_resume_deployment_state_from_replica_tags(
# Step 3: Create new deployment_state by resuming from passed in replicas
deployment_state_manager._recover_from_checkpoint(
[mocked_replica.replica_tag])
[ReplicaName.prefix + mocked_replica.replica_tag])
# Step 4: Ensure new deployment_state is correct
# deployment state behind "test" is re-created in recovery flow

View file

@ -513,7 +513,7 @@ A.deploy()"""
@pytest.mark.skipif(sys.platform == "win32", reason="Failing on Windows.")
def test_local_store_recovery():
def test_local_store_recovery(ray_shutdown):
_, tmp_path = mkstemp()
@serve.deployment
@ -551,7 +551,8 @@ def test_local_store_recovery():
"num_cpus": 4
}], indirect=True)
def test_snapshot_always_written_to_internal_kv(
ray_start_with_dashboard): # noqa: F811
ray_start_with_dashboard, # noqa: F811
ray_shutdown):
# https://github.com/ray-project/ray/issues/19752
_, tmp_path = mkstemp()

View file

@ -0,0 +1,30 @@
import sys
import pytest
import ray
from ray import serve
def test_standalone_actor_outside_serve():
# https://github.com/ray-project/ray/issues/20066
ray.init(num_cpus=8, namespace="serve")
@ray.remote
class MyActor:
def ready(self):
return
a = MyActor.options(name="my_actor").remote()
ray.get(a.ready.remote())
serve.start()
serve.shutdown()
ray.get(a.ready.remote())
ray.shutdown()
if __name__ == "__main__":
sys.exit(pytest.main(["-v", "-s", __file__]))