[client] Add support for protocol (ray://, local://, custom://) to ray.init (#16946)

This commit is contained in:
Chris K. W 2021-07-14 21:45:46 -07:00 committed by GitHub
parent ac54164e73
commit bd9d7bbbaa
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
9 changed files with 277 additions and 24 deletions

View file

@ -434,7 +434,7 @@ Runtime Environments (Experimental)
On Mac OS and Linux, Ray 1.4+ supports dynamically setting the runtime environment of tasks, actors, and jobs so that they can depend on different Python libraries (e.g., conda environments, pip dependencies) while all running on the same Ray cluster.
The ``runtime_env`` is a (JSON-serializable) dictionary that can be passed as an option to tasks and actors, and can also be passed to ``ray.init()`` and ``ray.client().connect()``.
The ``runtime_env`` is a (JSON-serializable) dictionary that can be passed as an option to tasks and actors, and can also be passed to ``ray.init()``.
The runtime environment defines the dependencies required for your workload.
You can specify a runtime environment for your whole job using ``ray.init()`` or Ray Client...
@ -449,7 +449,8 @@ You can specify a runtime environment for your whole job using ``ray.init()`` or
.. code-block:: python
ray.client("localhost:10001").env(runtime_env).connect()
# Using Ray Client
ray.init("ray://localhost:10001", runtime_env=runtime_env)
...or specify per-actor or per-task in the ``@ray.remote()`` decorator or by using ``.options()``:

View file

@ -89,11 +89,11 @@ Deploying an application
------------------------
The recommended way of connecting to a Ray cluster is to use the
``ray.client(...).connect()`` API and connect via the Ray Client.
``ray.init("ray://<host>:<port>")`` API and connect via the Ray Client.
.. note::
Using ``ray.client(...).connect()`` is generally a best practice because it allows
Using ``ray.init("ray://<host>:<port>")`` is generally a best practice because it allows
you to test your code locally, and deploy to a cluster with **no code
changes**.

View file

@ -8,7 +8,7 @@ Ray Client
The Ray Client is an API that connects a python script to a Ray cluster. Effectively, it allows you to leverage a remote Ray cluster just like you would with Ray running on your local machine.
By changing ``ray.init()`` to ``ray.client(...).connect()``, you can connect to a remote cluster and scale out your Ray code, while maintaining the ability to develop interactively in a python shell.
By changing ``ray.init()`` to ``ray.init("ray://<host>:<port>")``, you can connect to a remote cluster and scale out your Ray code, while maintaining the ability to develop interactively in a python shell.
.. code-block:: python
@ -17,7 +17,7 @@ By changing ``ray.init()`` to ``ray.client(...).connect()``, you can connect to
import ray
# Starting the Ray client. This connects to a remote Ray cluster.
ray.client("<head_node_host>:10001").connect()
ray.init("ray://<head_node_host>:10001").connect()
# Normal Ray code follows
@ray.remote
@ -28,6 +28,26 @@ By changing ``ray.init()`` to ``ray.client(...).connect()``, you can connect to
#....
You can also connect using the ClientBuilder API, but this will eventually be deprecated.
.. code-block:: python
# You can run this code outside of the Ray cluster!
import ray
# Starting the Ray client. This connects to a remote Ray cluster.
# `ray.client` will be deprecated in future releases, so we recommend
# using `ray.init("ray://<head_node_host>:10001")` instead.
ray.client("<head_node_host>:10001").connect()
# Normal Ray code follows
@ray.remote
def do_work(x):
return x ** x
do_work.remote(2)
#....
How do you use the Ray client?
------------------------------
@ -62,7 +82,7 @@ Now, connect to the Ray Cluster with the following and then use Ray like you nor
import ray
# replace with the appropriate host and port
ray.client("<head_node_host>:10001").connect()
ray.init("ray://<head_node_host>:10001")
# Normal Ray code follows
@ray.remote

View file

@ -18,7 +18,7 @@ There are three ways of starting the Ray runtime:
* Explicitly via CLI (:ref:`start-ray-cli`)
* Explicitly via the cluster launcher (:ref:`start-ray-up`)
You can also connect to an existing Ray runtime using the `Ray Client <ray-client.html>`
You can also connect to an existing Ray runtime using the `Ray Client <cluster/ray-client.html>`__
.. _start-ray-init:

View file

@ -1,12 +1,15 @@
import os
import importlib
import json
import logging
from dataclasses import dataclass
import sys
from typing import Any, Dict, Optional, Tuple
from ray.ray_constants import RAY_ADDRESS_ENVIRONMENT_VARIABLE
from ray.ray_constants import (RAY_ADDRESS_ENVIRONMENT_VARIABLE,
RAY_NAMESPACE_ENVIRONMENT_VARIABLE,
RAY_RUNTIME_ENV_ENVIRONMENT_VARIABLE)
from ray.job_config import JobConfig
import ray.util.client_connect
@ -63,6 +66,7 @@ class ClientBuilder:
def __init__(self, address: Optional[str]) -> None:
self.address = address
self._job_config = JobConfig()
self._fill_defaults_from_env()
def env(self, env: Dict[str, Any]) -> "ClientBuilder":
"""
@ -105,14 +109,87 @@ class ClientBuilder:
protocol_version=client_info_dict["protocol_version"],
_num_clients=client_info_dict["num_clients"])
def _fill_defaults_from_env(self):
# Check environment variables for default values
namespace_env_var = os.environ.get(RAY_NAMESPACE_ENVIRONMENT_VARIABLE)
if namespace_env_var and self._job_config.ray_namespace is None:
self.namespace(namespace_env_var)
runtime_env_var = os.environ.get(RAY_RUNTIME_ENV_ENVIRONMENT_VARIABLE)
if runtime_env_var and self._job_config.runtime_env is None:
self.env(json.loads(runtime_env_var))
def _init_args(self, **kwargs) -> "ClientBuilder":
"""
When a client builder is constructed through ray.init, for example
`ray.init(ray://..., namespace=...)`, all of the
arguments passed into ray.init are passed again into this method.
Custom client builders can override this method to do their own
handling/validation of arguments.
"""
# Use namespace and runtime_env from ray.init call
if kwargs.get("namespace") is not None:
self.namespace(kwargs["namespace"])
del kwargs["namespace"]
if kwargs.get("runtime_env") is not None:
self.env(kwargs["runtime_env"])
del kwargs["runtime_env"]
if not kwargs:
return self
unknown = ", ".join(kwargs)
raise RuntimeError(
f"Unexpected keyword argument(s) for Ray Client: {unknown}")
class _LocalClientBuilder(ClientBuilder):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._init_args_dict = {}
self._internal_config = {}
def _init_args(self, **kwargs) -> "ClientBuilder":
for argument in kwargs:
if argument.startswith("_"):
raise RuntimeError(
f"Unexpected keyword argument: {argument}. Unstable "
"parameters using the local client should be passed "
"through the `internal_config` dict, e.g. ray.init"
"(local://..., internal_config={{argument: value}}")
if kwargs.get("job_config") is not None:
self._job_config = kwargs.pop("job_config")
if kwargs.get("namespace") is not None:
self.namespace(kwargs.pop("namespace"))
if kwargs.get("runtime_env") is not None:
self.env(kwargs.pop("runtime_env"))
self._internal_config = kwargs.pop("internal_config", {})
# Prefix internal_config names with underscores
arg_names = list(self._internal_config)
for name in arg_names:
if name.startswith("_"):
stripped_name = name.lstrip("_")
raise RuntimeError(
f"Found internal_config argument "
f"`{name}`. Arguments passed in internal_config should "
"not include the underscore prefix. Use "
f"`{stripped_name}` instead.")
self._internal_config["_" + name] = self._internal_config[name]
del self._internal_config[name]
self._fill_defaults_from_env()
self._init_args_dict = kwargs
return self
def connect(self) -> ClientContext:
"""
Begin a connection to the address passed in via ray.client(...).
Begin a connection to the address passed in via ray.client(...) or
ray.init("local://...")
"""
connection_dict = ray.init(
address=self.address, job_config=self._job_config)
address=self.address,
job_config=self._job_config,
**self._init_args_dict,
**self._internal_config)
return ClientContext(
dashboard_url=connection_dict["webui_url"],
python_version="{}.{}.{}".format(
@ -152,6 +229,8 @@ def _get_builder_from_address(address: Optional[str]) -> ClientBuilder:
pass
return _LocalClientBuilder(address)
module_string, inner_address = _split_address(address)
if module_string == "local":
return _LocalClientBuilder(inner_address or None)
try:
module = importlib.import_module(module_string)
except Exception:

View file

@ -49,6 +49,8 @@ REQUIRE_SHM_SIZE_THRESHOLD = 10**10
DEFAULT_PORT = 6379
RAY_ADDRESS_ENVIRONMENT_VARIABLE = "RAY_ADDRESS"
RAY_NAMESPACE_ENVIRONMENT_VARIABLE = "RAY_NAMESPACE"
RAY_RUNTIME_ENV_ENVIRONMENT_VARIABLE = "RAY_RUNTIME_ENV"
DEFAULT_DASHBOARD_IP = "127.0.0.1"
DEFAULT_DASHBOARD_PORT = 8265

View file

@ -64,6 +64,7 @@ py_test_module_list(
"test_multinode_failures_2.py",
"test_multiprocessing.py",
"test_output.py",
"test_ray_init.py",
"test_reconstruction.py",
"test_reference_counting.py",
"test_resource_demand_scheduler.py",
@ -109,7 +110,6 @@ py_test_module_list(
"test_numba.py",
"test_queue.py",
"test_ray_debugger.py",
"test_ray_init.py",
"test_unhandled_error.py",
"test_top_level_api.py",
"test_list_actors.py",

View file

@ -3,9 +3,11 @@ import sys
import pytest
import redis
import unittest.mock
import ray
import ray._private.services
from ray.util.client.ray_client_helpers import ray_start_client_server
from ray.client_builder import ClientContext
from ray.cluster_utils import Cluster
@ -145,6 +147,94 @@ def test_ray_init_from_workers(ray_start_cluster):
assert node_info.node_manager_port == node2.node_manager_port
def test_ray_init_local(shutdown_only):
with ray.init("local://", dashboard_port=22222) as context:
assert context.dashboard_url.split(":")[-1] == "22222"
def test_ray_init_namespace(shutdown_only):
with ray.init("local://", namespace="abcdefg"):
assert ray.get_runtime_context().namespace == "abcdefg"
def test_ray_init_invalid_keyword(shutdown_only):
with pytest.raises(RuntimeError) as excinfo:
ray.init("localhost", logginglevel="<- missing underscore")
assert "logginglevel" in str(excinfo.value)
def test_ray_init_invalid_keyword_with_client(shutdown_only):
with pytest.raises(RuntimeError) as excinfo:
ray.init("ray://127.0.0.0", logginglevel="<- missing underscore")
assert "logginglevel" in str(excinfo.value)
def test_ray_init_valid_keyword_with_client(shutdown_only):
with pytest.raises(RuntimeError) as excinfo:
# num_cpus is a valid argument for regular ray.init, but not for
# init(ray://)
ray.init("ray://127.0.0.0", num_cpus=1)
assert "num_cpus" in str(excinfo.value)
def test_ray_init_local_with_unstable_parameter(shutdown_only):
with pytest.raises(RuntimeError) as excinfo:
# _redis_password is a valid init argument, but should be passed as
# internal_config={"_redis_password": "1234"} for local.
ray.init("local://", _redis_password="1234")
assert "_redis_password" in str(excinfo.value)
with pytest.raises(RuntimeError) as excinfo:
# Passing an invalid unstable parameter through internal_config
# should error
ray.init("local://", internal_config={"asdfasd": "1234"})
assert "asdfasd" in str(excinfo.value)
with pytest.raises(RuntimeError) as excinfo:
# Error if internal_config has valid parameter but with underscore
# still included
ray.init("local://", internal_config={"_node_ip_address": "0.0.0.0"})
assert "_node_ip_address" in str(excinfo.value)
# Make sure local:// works when unstables passed correctly
ray.init("local://", internal_config={"node_ip_address": "0.0.0.0"})
def test_env_var_override():
with unittest.mock.patch.dict(os.environ, {"RAY_NAMESPACE": "envName"}), \
ray_start_client_server() as given_connection:
given_connection.disconnect()
with ray.init("ray://localhost:50051"):
assert ray.get_runtime_context().namespace == "envName"
def test_env_var_no_override():
# init() argument has precedence over environment variables
with unittest.mock.patch.dict(os.environ, {"RAY_NAMESPACE": "envName"}), \
ray_start_client_server() as given_connection:
given_connection.disconnect()
with ray.init("ray://localhost:50051", namespace="argumentName"):
assert ray.get_runtime_context().namespace == "argumentName"
@pytest.mark.parametrize("input", [None, "auto"])
def test_ray_address(input, call_ray_start):
address = call_ray_start
with unittest.mock.patch.dict(os.environ, {"RAY_ADDRESS": address}):
res = ray.init(input)
# Ensure this is not a client.connect()
assert not isinstance(res, ClientContext)
ray.shutdown()
addr = "localhost:{}".format(address.split(":")[-1])
with unittest.mock.patch.dict(os.environ, {"RAY_ADDRESS": addr}):
res = ray.init(input)
# Ensure this is not a client.connect()
assert not isinstance(res, ClientContext)
if __name__ == "__main__":
import pytest
import sys

View file

@ -28,7 +28,7 @@ import ray.ray_constants as ray_constants
import ray.remote_function
import ray.serialization as serialization
import ray._private.services as services
import ray._private.runtime_env as runtime_env
import ray._private.runtime_env as runtime_env_pkg
import ray._private.import_thread as import_thread
from ray.util.tracing.tracing_helper import import_from_string
import ray
@ -573,6 +573,8 @@ def init(
logging_format=ray_constants.LOGGER_FORMAT,
log_to_driver=True,
namespace=None,
runtime_env=None,
internal_config=None,
# The following are unstable parameters and their use is discouraged.
_enable_object_reconstruction=False,
_redis_max_memory=None,
@ -585,7 +587,8 @@ def init(
_lru_evict=False,
_metrics_export_port=None,
_system_config=None,
_tracing_startup_hook=None):
_tracing_startup_hook=None,
**kwargs):
"""
Connect to an existing Ray cluster or start one and connect to it.
@ -620,6 +623,10 @@ def init(
specify a specific node address. If the environment variable
`RAY_ADDRESS` is defined and the address is None or "auto", Ray
will set `address` to `RAY_ADDRESS`.
Addresses can be prefixed with a protocol to connect using Ray
Client. For example, passing in the address
"ray://123.45.67.89:50005" will connect to the cluster at the
given address using the Ray client.
num_cpus (int): Number of CPUs the user wishes to assign to each
raylet. By default, this is set based on virtual cores.
num_gpus (int): Number of GPUs the user wishes to assign to each
@ -656,6 +663,13 @@ def init(
is true.
log_to_driver (bool): If true, the output from all of the worker
processes on all nodes will be directed to the driver.
namespace (str): Namespace to use
runtime_env (dict): The runtime environment to use
internal_config (dict): Dictionary mapping names of a unstable
parameters to values, e.g. {"redis_password": "1234"}. This is
only used for initializing a local client (ray.init(local://...)).
Values in this dictionary will be used to configure the local
cluster. Parameter names should exclude the underscore prefix.
_enable_object_reconstruction (bool): If True, when an object stored in
the distributed plasma store is lost due to node failure, Ray will
attempt to reconstruct the object by re-executing the task that
@ -685,13 +699,60 @@ def init(
and the API is subject to change.
Returns:
Address information about the started processes.
If the provided address included a protocol (e.g. "ray://1.2.3.4")
then a ClientContext is returned with information such as settings,
server versions for ray and python, and the dashboard_url.
Otherwise, returns address information about the started processes.
Raises:
Exception: An exception is raised if an inappropriate combination of
arguments is passed in.
"""
# If available, use RAY_ADDRESS to override if the address was left
# unspecified, or set to "auto" in the call to init
address_env_var = os.environ.get(
ray_constants.RAY_ADDRESS_ENVIRONMENT_VARIABLE)
if address_env_var:
if address is None or address == "auto":
address = address_env_var
logger.info(
f"Using address {address_env_var} set in the environment "
f"variable {ray_constants.RAY_ADDRESS_ENVIRONMENT_VARIABLE}")
if address is not None and "://" in address:
# Address specified a protocol, use ray client
builder = ray.client(address)
# Forward any keyword arguments that were changed from their default
# values to the builder
init_sig = inspect.signature(init)
passed_kwargs = {}
for argument_name, param_obj in init_sig.parameters.items():
if argument_name in {"kwargs", "address"}:
# kwargs and address are handled separately
continue
default_value = param_obj.default
passed_value = locals()[argument_name]
if passed_value != default_value:
# passed value is different than default, pass to the client
# builder
passed_kwargs[argument_name] = passed_value
passed_kwargs.update(kwargs)
builder._init_args(**passed_kwargs)
return builder.connect()
if kwargs:
# User passed in extra keyword arguments but isn't connecting through
# ray client. Raise an error, since most likely a typo in keyword
unknown = ", ".join(kwargs)
raise RuntimeError(f"Unknown keyword argument(s): {unknown}")
if internal_config is not None:
# Should only be used with local client
raise RuntimeError("`internal_config` should only be used with "
"ray.init(local://...)")
# Try to increase the file descriptor limit, which is too low by
# default for Ray: https://github.com/ray-project/ray/issues/11239
try:
@ -717,11 +778,11 @@ def init(
logger.debug("Could not import resource module (on Windows)")
pass
address_env_var = os.environ.get(
ray_constants.RAY_ADDRESS_ENVIRONMENT_VARIABLE)
if address_env_var:
if address is None or address == "auto":
address = address_env_var
if runtime_env:
if job_config is None:
job_config = ray.job_config.JobConfig()
job_config.set_runtime_env(runtime_env)
# Convert hostnames to numerical IP address.
if _node_ip_address is not None:
node_ip_address = services.address_to_ip(_node_ip_address)
@ -850,7 +911,7 @@ def init(
if driver_mode == SCRIPT_MODE and job_config:
# Rewrite the URI. Note the package isn't uploaded to the URI until
# later in the connect
runtime_env.rewrite_runtime_env_uris(job_config)
runtime_env_pkg.rewrite_runtime_env_uris(job_config)
connect(
_global_node,
@ -1278,14 +1339,14 @@ def connect(node,
# environment here. If it's ray client, the environmen will be prepared
# at the server side.
if mode == SCRIPT_MODE and not job_config.client_job:
runtime_env.upload_runtime_env_package_if_needed(job_config)
runtime_env_pkg.upload_runtime_env_package_if_needed(job_config)
elif mode == WORKER_MODE:
# TODO(ekl) get rid of the env var hack and get runtime env from the
# task spec and/or job config only.
uris = os.environ.get("RAY_PACKAGING_URI")
uris = [uris] if uris else \
worker.core_worker.get_job_config().runtime_env.uris
working_dir = runtime_env.ensure_runtime_env_setup(uris)
working_dir = runtime_env_pkg.ensure_runtime_env_setup(uris)
if working_dir is not None:
os.chdir(working_dir)