mirror of
https://github.com/vale981/ray
synced 2025-03-06 02:21:39 -05:00
[core] Disallow empty string namespace + add validation tests (#18048)
This commit is contained in:
parent
451c501bbd
commit
406dc0b458
10 changed files with 72 additions and 24 deletions
|
@ -1096,3 +1096,11 @@ def get_release_wheel_url(
|
|||
# e.g. https://ray-wheels.s3-us-west-2.amazonaws.com/releases/1.4.0rc1/e7c7
|
||||
# f6371a69eb727fa469e4cd6f4fbefd143b4c/ray-1.4.0rc1-cp36-cp36m-manylinux201
|
||||
# 4_x86_64.whl
|
||||
|
||||
|
||||
def validate_namespace(namespace: str):
|
||||
if not isinstance(namespace, str):
|
||||
raise TypeError("namespace must be None or a string.")
|
||||
elif namespace == "":
|
||||
raise ValueError("\"\" is not a valid namespace. "
|
||||
"Pass None to not specify a namespace.")
|
||||
|
|
|
@ -619,11 +619,7 @@ class ActorClass:
|
|||
elif name == "":
|
||||
raise ValueError("Actor name cannot be an empty string.")
|
||||
if namespace is not None:
|
||||
if not isinstance(namespace, str):
|
||||
raise TypeError(f"namespace must be None or a string, "
|
||||
f"got: '{type(namespace)}'.")
|
||||
elif namespace == "":
|
||||
raise ValueError("Actor namespace cannot be an empty string.")
|
||||
ray._private.utils.validate_namespace(namespace)
|
||||
|
||||
# Check whether the name is already taken.
|
||||
# TODO(edoakes): this check has a race condition because two drivers
|
||||
|
|
|
@ -777,7 +777,7 @@ def test_detached_actor_cleanup(ray_start_regular):
|
|||
import ray
|
||||
import ray._private.gcs_utils as gcs_utils
|
||||
import time
|
||||
ray.init(address="{}", namespace="")
|
||||
ray.init(address="{}", namespace="default_test_namespace")
|
||||
|
||||
@ray.remote
|
||||
class DetachedActor:
|
||||
|
|
|
@ -128,7 +128,8 @@ def test_delete_refs_on_disconnect(ray_start_cluster):
|
|||
|
||||
# Connect to the real ray again, since we disconnected
|
||||
# upon num_clients = 0.
|
||||
real_ray.init(address=cluster.address, namespace="")
|
||||
real_ray.init(
|
||||
address=cluster.address, namespace="default_test_namespace")
|
||||
|
||||
def test_cond():
|
||||
return object_memory_usage() == 0
|
||||
|
@ -190,7 +191,8 @@ def test_delete_actor_on_disconnect(ray_start_cluster):
|
|||
|
||||
# Connect to the real ray again, since we disconnected
|
||||
# upon num_clients = 0.
|
||||
real_ray.init(address=cluster.address, namespace="")
|
||||
real_ray.init(
|
||||
address=cluster.address, namespace="default_test_namespace")
|
||||
|
||||
wait_for_condition(test_cond, timeout=10)
|
||||
|
||||
|
@ -258,7 +260,7 @@ def test_named_actor_refcount(ray_start_regular):
|
|||
|
||||
def connect_api():
|
||||
api = _ClientContext()
|
||||
api.connect("localhost:50051", namespace="")
|
||||
api.connect("localhost:50051", namespace="default_test_namespace")
|
||||
api.get_actor("actor")
|
||||
return api
|
||||
|
||||
|
|
|
@ -56,11 +56,11 @@ _ = Actor.remote()
|
|||
def test_job_gc_with_detached_actor(call_ray_start):
|
||||
address = call_ray_start
|
||||
|
||||
ray.init(address=address, namespace="")
|
||||
ray.init(address=address, namespace="test")
|
||||
driver = """
|
||||
import ray
|
||||
|
||||
ray.init(address="{}", namespace="")
|
||||
ray.init(address="{}", namespace="test")
|
||||
|
||||
@ray.remote
|
||||
class Actor:
|
||||
|
|
|
@ -235,13 +235,13 @@ def test_drivers_named_actors(call_ray_start):
|
|||
# named actor.
|
||||
address = call_ray_start
|
||||
|
||||
ray.init(address=address, namespace="")
|
||||
ray.init(address=address, namespace="test")
|
||||
|
||||
# Define a driver that creates a named actor then sleeps for a while.
|
||||
driver_script1 = """
|
||||
import ray
|
||||
import time
|
||||
ray.init(address="{}", namespace="")
|
||||
ray.init(address="{}", namespace="test")
|
||||
@ray.remote
|
||||
class Counter:
|
||||
def __init__(self):
|
||||
|
@ -257,7 +257,7 @@ time.sleep(100)
|
|||
driver_script2 = """
|
||||
import ray
|
||||
import time
|
||||
ray.init(address="{}", namespace="")
|
||||
ray.init(address="{}", namespace="test")
|
||||
while True:
|
||||
try:
|
||||
counter = ray.get_actor("Counter")
|
||||
|
|
|
@ -167,7 +167,7 @@ def train_func(config, reporter): # add a reporter arg
|
|||
reporter(timesteps_total=i, mean_accuracy=i+97) # report metrics
|
||||
|
||||
os.environ["TUNE_RESUME_PROMPT_OFF"] = "True"
|
||||
ray.init(address="{}", namespace="")
|
||||
ray.init(address="{}", namespace="default_test_namespace")
|
||||
ray.tune.register_trainable("train_func", train_func)
|
||||
|
||||
tune.run_experiments({{
|
||||
|
|
|
@ -205,9 +205,9 @@ print("Done!!!")
|
|||
|
||||
print(
|
||||
run_string_as_driver(
|
||||
template.format(address="localhost:8080", namespace="")))
|
||||
template.format(address="localhost:8080", namespace="test")))
|
||||
|
||||
ray.util.connect("localhost:8080", namespace="")
|
||||
ray.util.connect("localhost:8080", namespace="test")
|
||||
|
||||
pinger = ray.get_actor("Pinger")
|
||||
assert ray.get(pinger.ping.remote()) == "pong from other job"
|
||||
|
@ -225,5 +225,39 @@ def test_runtime_context(shutdown_only):
|
|||
assert namespace == ray.get_runtime_context().get()["namespace"]
|
||||
|
||||
|
||||
def test_namespace_validation(shutdown_only):
|
||||
with pytest.raises(TypeError):
|
||||
ray.init(namespace=123)
|
||||
|
||||
ray.shutdown()
|
||||
|
||||
with pytest.raises(ValueError):
|
||||
ray.init(namespace="")
|
||||
|
||||
ray.shutdown()
|
||||
|
||||
ray.init(namespace="abc")
|
||||
|
||||
@ray.remote
|
||||
class A:
|
||||
pass
|
||||
|
||||
with pytest.raises(TypeError):
|
||||
A.options(namespace=123).remote()
|
||||
|
||||
with pytest.raises(ValueError):
|
||||
A.options(namespace="").remote()
|
||||
|
||||
A.options(name="a", namespace="test", lifetime="detached").remote()
|
||||
|
||||
with pytest.raises(TypeError):
|
||||
ray.get_actor("a", namespace=123)
|
||||
|
||||
with pytest.raises(ValueError):
|
||||
ray.get_actor("a", namespace="")
|
||||
|
||||
ray.get_actor("a", namespace="test")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
sys.exit(pytest.main(["-v", __file__]))
|
||||
|
|
|
@ -551,7 +551,7 @@ def test_placement_group_reschedule_when_node_dead(ray_start_cluster,
|
|||
cluster.add_node(num_cpus=4)
|
||||
cluster.add_node(num_cpus=4)
|
||||
cluster.wait_for_nodes()
|
||||
ray.init(address=cluster.address, namespace="")
|
||||
ray.init(address=cluster.address, namespace="default_test_namespace")
|
||||
|
||||
# Make sure both head and worker node are alive.
|
||||
nodes = ray.nodes()
|
||||
|
@ -1196,14 +1196,15 @@ def test_automatic_cleanup_detached_actors(ray_start_cluster):
|
|||
cluster.add_node(num_cpus=num_cpu_per_node)
|
||||
cluster.wait_for_nodes()
|
||||
|
||||
info = ray.init(address=cluster.address, namespace="")
|
||||
info = ray.init(
|
||||
address=cluster.address, namespace="default_test_namespace")
|
||||
available_cpus = ray.available_resources()["CPU"]
|
||||
assert available_cpus == num_nodes * num_cpu_per_node
|
||||
|
||||
driver_code = f"""
|
||||
import ray
|
||||
|
||||
ray.init(address="{info["redis_address"]}", namespace="")
|
||||
ray.init(address="{info["redis_address"]}", namespace="default_test_namespace")
|
||||
|
||||
def create_pg():
|
||||
pg = ray.util.placement_group(
|
||||
|
@ -1541,14 +1542,15 @@ def test_named_placement_group(ray_start_cluster):
|
|||
for _ in range(2):
|
||||
cluster.add_node(num_cpus=3)
|
||||
cluster.wait_for_nodes()
|
||||
info = ray.init(address=cluster.address, namespace="")
|
||||
info = ray.init(
|
||||
address=cluster.address, namespace="default_test_namespace")
|
||||
global_placement_group_name = "named_placement_group"
|
||||
|
||||
# Create a detached placement group with name.
|
||||
driver_code = f"""
|
||||
import ray
|
||||
|
||||
ray.init(address="{info["redis_address"]}", namespace="")
|
||||
ray.init(address="{info["redis_address"]}", namespace="default_test_namespace")
|
||||
|
||||
pg = ray.util.placement_group(
|
||||
[{{"CPU": 1}} for _ in range(2)],
|
||||
|
|
|
@ -12,7 +12,7 @@ import sys
|
|||
import threading
|
||||
import time
|
||||
import traceback
|
||||
from typing import Any, Dict, List, Iterator
|
||||
from typing import Any, Dict, List, Optional, Iterator
|
||||
|
||||
# Ray modules
|
||||
from ray.autoscaler._private.constants import AUTOSCALER_EVENTS
|
||||
|
@ -1352,6 +1352,8 @@ def connect(node,
|
|||
job_config = ray.job_config.JobConfig()
|
||||
|
||||
if namespace is not None:
|
||||
ray._private.utils.validate_namespace(namespace)
|
||||
|
||||
# The namespace field of job config may have already been set in code
|
||||
# paths such as the client.
|
||||
job_config.set_ray_namespace(namespace)
|
||||
|
@ -1796,7 +1798,7 @@ def wait(object_refs, *, num_returns=1, timeout=None, fetch_local=True):
|
|||
|
||||
@PublicAPI
|
||||
@client_mode_hook
|
||||
def get_actor(name: str, namespace: str = None):
|
||||
def get_actor(name: str, namespace: Optional[str] = None):
|
||||
"""Get a handle to a named actor.
|
||||
|
||||
Gets a handle to an actor with the given name. The actor must
|
||||
|
@ -1816,6 +1818,10 @@ def get_actor(name: str, namespace: str = None):
|
|||
"""
|
||||
if not name:
|
||||
raise ValueError("Please supply a non-empty value to get_actor")
|
||||
|
||||
if namespace is not None:
|
||||
ray._private.utils.validate_namespace(namespace)
|
||||
|
||||
worker = global_worker
|
||||
worker.check_connected()
|
||||
return worker.core_worker.get_named_actor_handle(name, namespace or "")
|
||||
|
|
Loading…
Add table
Reference in a new issue