Automatically create custom node id resource (#5882)

* node id

* comment

* comments

* fix tests
This commit is contained in:
Eric Liang 2019-10-15 21:31:11 -07:00 committed by GitHub
parent c52bb0621d
commit 6843a01a7f
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 73 additions and 1 deletions

View file

@ -13,6 +13,10 @@ import ray.ray_constants as ray_constants
logger = logging.getLogger(__name__)
# Prefix for the node id resource that is automatically added to each node.
# For example, a node may have id `node:172.23.42.1`.
NODE_ID_PREFIX = "node:"
class ResourceSpec(
namedtuple("ResourceSpec", [
@ -127,6 +131,10 @@ class ResourceSpec(
assert "memory" not in resources, resources
assert "object_store_memory" not in resources, resources
# Automatically create a node id resource on each node. This is
# queryable with ray.state.node_ids() and ray.state.current_node_id().
resources[NODE_ID_PREFIX + ray.services.get_node_ip_address()] = 1.0
num_cpus = self.num_cpus
if num_cpus is None:
num_cpus = multiprocessing.cpu_count()

View file

@ -1083,6 +1083,38 @@ def nodes():
return state.client_table()
def current_node_id():
"""Return the node id of the current node.
For example, "node:172.10.5.34". This can be used as a custom resource,
e.g., {node_id: 1} to reserve the whole node, or {node_id: 0.001} to
just force placement on the node.
Returns:
Id of the current node.
"""
return ray.resource_spec.NODE_ID_PREFIX + ray.services.get_node_ip_address(
)
def node_ids():
"""Get a list of the node ids in the cluster.
For example, ["node:172.10.5.34", "node:172.42.3.77"]. These can be used
as custom resources, e.g., {node_id: 1} to reserve the whole node, or
{node_id: 0.001} to just force placement on the node.
Returns:
List of the node resource ids.
"""
node_ids = []
for node in nodes():
for k, v in node["Resources"].items():
if k.startswith(ray.resource_spec.NODE_ID_PREFIX):
node_ids.append(k)
return node_ids
def tasks(task_id=None):
"""Fetch and parse the task table information for one or more task IDs.

View file

@ -2290,6 +2290,26 @@ def test_custom_resources(ray_start_cluster):
ray.get([h.remote() for _ in range(5)])
def test_node_id_resource(ray_start_cluster):
cluster = ray_start_cluster
cluster.add_node(num_cpus=3)
cluster.add_node(num_cpus=3)
ray.init(address=cluster.address)
local_node = ray.state.current_node_id()
# Note that these will have the same IP in the test cluster
assert len(ray.state.node_ids()) == 2
assert local_node in ray.state.node_ids()
@ray.remote(resources={local_node: 1})
def f():
return ray.state.current_node_id()
# Check the node id resource is automatically usable for scheduling.
assert ray.get(f.remote()) == ray.state.current_node_id()
def test_two_custom_resources(ray_start_cluster):
cluster = ray_start_cluster
cluster.add_node(
@ -2393,6 +2413,9 @@ def test_zero_capacity_deletion_semantics(shutdown_only):
del resources["memory"]
del resources["object_store_memory"]
for key in list(resources.keys()):
if key.startswith("node:"):
del resources[key]
while resources and retry_count < MAX_RETRY_ATTEMPTS:
time.sleep(0.1)
@ -2401,7 +2424,7 @@ def test_zero_capacity_deletion_semantics(shutdown_only):
if retry_count >= MAX_RETRY_ATTEMPTS:
raise RuntimeError(
"Resources were available even after five retries.")
"Resources were available even after five retries.", resources)
return resources

View file

@ -82,6 +82,12 @@ def verify_load_metrics(monitor, expected_resource_usage=None, timeout=10):
del resource_usage[2]["memory"]
if "object_store_memory" in resource_usage[2]:
del resource_usage[2]["object_store_memory"]
for key in list(resource_usage[1].keys()):
if key.startswith("node:"):
del resource_usage[1][key]
for key in list(resource_usage[2].keys()):
if key.startswith("node:"):
del resource_usage[2][key]
if expected_resource_usage is None:
if all(x for x in resource_usage[1:]):

View file

@ -496,6 +496,7 @@ class RayTrialExecutor(TrialExecutor):
self._committed_resources.get_res_total(name),
self._avail_resources.get_res_total(name), name)
for name in self._avail_resources.custom_resources
if not name.startswith(ray.resource_spec.NODE_ID_PREFIX)
])
if customs:
status += " ({})".format(customs)

View file

@ -9,6 +9,7 @@ from numbers import Number
# For compatibility under py2 to consider unicode as str
from six import string_types
import ray
from ray.tune import TuneError
logger = logging.getLogger(__name__)
@ -111,6 +112,7 @@ class Resources(
custom_summary = ", ".join([
"{} {}".format(self.get_res_total(res), res)
for res in self.custom_resources
if not res.startswith(ray.resource_spec.NODE_ID_PREFIX)
])
if custom_summary:
summary += " ({})".format(custom_summary)