Better checking that ray.init() has been called. (#10261)

This commit is contained in:
Robert Nishihara 2020-08-25 17:13:11 -07:00 committed by GitHub
parent d4537ac1ce
commit 79eefbf357
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 92 additions and 60 deletions

View file

@ -98,23 +98,14 @@ __commit__ = "{{RAY_COMMIT_SHA}}"
__version__ = "0.9.0.dev0"
__all__ = [
"jobs",
"nodes",
"actors",
"objects",
"timeline",
"object_transfer_timeline",
"cluster_resources",
"available_resources",
"LOCAL_MODE",
"PYTHON_MODE",
"SCRIPT_MODE",
"WORKER_MODE",
"__version__",
"_config",
"_get_runtime_context",
"actor",
"actors",
"available_resources",
"cancel",
"cluster_resources",
"connect",
"disconnect",
"get",
@ -125,20 +116,29 @@ __all__ = [
"init",
"internal",
"is_initialized",
"java_actor_class",
"java_function",
"jobs",
"kill",
"Language",
"method",
"nodes",
"objects",
"object_transfer_timeline",
"profile",
"projects",
"put",
"kill",
"register_custom_serializer",
"remote",
"shutdown",
"show_in_webui",
"wait",
"Language",
"java_function",
"java_actor_class",
"timeline",
"util",
"wait",
"LOCAL_MODE",
"PYTHON_MODE",
"SCRIPT_MODE",
"WORKER_MODE",
]
# ID types

View file

@ -472,9 +472,7 @@ class ActorClass:
raise ValueError("max_concurrency must be >= 1")
worker = ray.worker.global_worker
if worker.mode is None:
raise RuntimeError("Actors cannot be created before ray.init() "
"has been called.")
worker.check_connected()
if detached:
logger.warning("The detached flag is deprecated. To create a "

View file

@ -46,17 +46,11 @@ class GlobalState:
RuntimeError: An exception is raised if ray.init() has not been
called yet.
"""
if self.redis_client is None:
raise RuntimeError("The ray global state API cannot be used "
"before ray.init has been called.")
if self.redis_clients is None:
raise RuntimeError("The ray global state API cannot be used "
"before ray.init has been called.")
if self.global_state_accessor is None:
raise RuntimeError("The ray global state API cannot be used "
"before ray.init has been called.")
if (self.redis_client is None or self.redis_clients is None
or self.global_state_accessor is None):
raise ray.exceptions.RayConnectionError(
"Ray has not been started yet. You can start Ray with "
"'ray.init()'.")
def disconnect(self):
"""Disconnect global state from GCS."""

View file

@ -26,6 +26,7 @@ py_test_module_list(
"test_component_failures_2.py",
"test_component_failures_3.py",
"test_dynres.py",
"test_error_ray_not_initialized.py",
"test_gcs_fault_tolerance.py",
"test_global_gc.py",
"test_iter.py",

View file

@ -123,21 +123,6 @@ def wait_for_num_objects(num_objects, timeout=10):
def test_global_state_api(shutdown_only):
error_message = ("The ray global state API cannot be used "
"before ray.init has been called.")
with pytest.raises(Exception, match=error_message):
ray.objects()
with pytest.raises(Exception, match=error_message):
ray.actors()
with pytest.raises(Exception, match=error_message):
ray.nodes()
with pytest.raises(Exception, match=error_message):
ray.jobs()
ray.init(num_cpus=5, num_gpus=3, resources={"CustomResource": 1})
assert ray.cluster_resources()["CPU"] == 5
@ -462,15 +447,6 @@ def test_non_ascii_comment(ray_start_regular):
assert ray.get(f.remote()) == 1
def test_shutdown_disconnect_global_state():
ray.init(num_cpus=0)
ray.shutdown()
with pytest.raises(Exception) as e:
ray.objects()
assert str(e.value).endswith("ray.init has been called.")
@pytest.mark.parametrize(
"ray_start_object_store_memory", [150 * 1024 * 1024], indirect=True)
def test_put_pins_object(ray_start_object_store_memory):

View file

@ -0,0 +1,54 @@
import sys
import pytest
import ray
def test_errors_before_initializing_ray():
@ray.remote
def f():
pass
@ray.remote
class Foo:
pass
api_methods = [
f.remote,
Foo.remote,
ray.actors,
lambda: ray.cancel(None), # Not valid API usage.
lambda: ray.get([]),
lambda: ray.get_actor("name"),
ray.get_gpu_ids,
ray.get_resource_ids,
ray.get_webui_url,
ray.jobs,
lambda: ray.kill(None), # Not valid API usage.
ray.nodes,
ray.objects,
lambda: ray.put(1),
lambda: ray.wait([])
]
def test_exceptions_raised():
for api_method in api_methods:
print(api_method)
with pytest.raises(
ray.exceptions.RayConnectionError,
match="Ray has not been started yet."):
api_method()
test_exceptions_raised()
# Make sure that the exceptions are still raised after Ray has been
# started and shutdown.
ray.init(num_cpus=0)
ray.shutdown()
test_exceptions_raised()
if __name__ == "__main__":
sys.exit(pytest.main(["-v", __file__]))

View file

@ -385,6 +385,8 @@ def get_gpu_ids(as_str=False):
Returns:
A list of GPU IDs.
"""
worker = global_worker
worker.check_connected()
# TODO(ilr) Handle inserting resources in local mode
all_resource_ids = global_worker.core_worker.resource_ids()
@ -427,6 +429,9 @@ def get_resource_ids():
each pair consists of the ID of a resource and the fraction of that
resource reserved for this worker.
"""
worker = global_worker
worker.check_connected()
if _mode() == LOCAL_MODE:
raise RuntimeError("ray.get_resource_ids() currently does not work in "
"local_mode.")
@ -442,8 +447,8 @@ def get_webui_url():
Returns:
The URL of the web UI as a string.
"""
if _global_node is None:
raise RuntimeError("Ray has not been initialized/connected.")
worker = global_worker
worker.check_connected()
return _global_node.webui_url
@ -1630,6 +1635,7 @@ def wait(object_refs, num_returns=1, timeout=None):
IDs.
"""
worker = global_worker
worker.check_connected()
if hasattr(worker,
"core_worker") and worker.core_worker.current_actor_is_asyncio(
@ -1701,6 +1707,9 @@ def get_actor(name):
Raises:
ValueError if the named actor does not exist.
"""
worker = global_worker
worker.check_connected()
return ray.util.named_actors._get_actor(name)
@ -1722,11 +1731,11 @@ def kill(actor, no_restart=True):
no_restart (bool): Whether or not this actor should be restarted if
it's a restartable actor.
"""
worker = global_worker
worker.check_connected()
if not isinstance(actor, ray.actor.ActorHandle):
raise ValueError("ray.kill() only supported for actors. "
f"Got: {type(actor)}.")
worker = ray.worker.global_worker
worker.check_connected()
"Got: {}.".format(type(actor)))
worker.core_worker.kill_actor(actor._ray_actor_id, no_restart)