from collections import defaultdict, deque
import logging
import platform
from typing import Any, Dict, List, Optional, Sequence, Tuple, Type

import ray
from ray.actor import ActorClass, ActorHandle
from ray.rllib.utils.deprecation import Deprecated

logger = logging.getLogger(__name__)

class TaskPool:
    """Helper class for tracking the status of many in-flight actor tasks."""

    def __init__(self):
        self._tasks = {}
        self._objects = {}
        self._fetching = deque()

    def add(self, worker, all_obj_refs):
        """Adds a task produced by `worker` to the pool.

        `all_obj_refs` may be a single object ref or a list of refs; in the
        latter case, the first ref is used as the tracking key.
        """
        if isinstance(all_obj_refs, list):
            obj_ref = all_obj_refs[0]
        else:
            obj_ref = all_obj_refs
        self._tasks[obj_ref] = worker
        self._objects[obj_ref] = all_obj_refs

    def completed(self, blocking_wait=False):
        """Yields (worker, obj_refs) pairs for all completed tasks.

        If `blocking_wait` is True and no task has completed yet, waits up
        to 10s for at least one task to finish.
        """
        pending = list(self._tasks)
        if pending:
            ready, _ = ray.wait(pending, num_returns=len(pending), timeout=0)
            if not ready and blocking_wait:
                ready, _ = ray.wait(pending, num_returns=1, timeout=10.0)
            for obj_ref in ready:
                yield (self._tasks.pop(obj_ref), self._objects.pop(obj_ref))

    def completed_prefetch(self, blocking_wait=False, max_yield=999):
        """Same as `completed()`, but queues results and yields at most
        `max_yield` of them per call; the rest are kept for future calls.

        Assumes each task was added with a single object ref (not a list).
        """
        for worker, obj_ref in self.completed(blocking_wait=blocking_wait):
            self._fetching.append((worker, obj_ref))

        for _ in range(max_yield):
            if not self._fetching:
                break
            yield self._fetching.popleft()

    def reset_workers(self, workers):
        """Notifies the pool that only the given workers remain valid.

        Tasks belonging to any other (removed) worker are dropped.
        """
        for obj_ref, ev in self._tasks.copy().items():
            if ev not in workers:
                del self._tasks[obj_ref]
                del self._objects[obj_ref]

        # Keep the same deque reference so that generators still in flight
        # don't suffer from stale references.
        for _ in range(len(self._fetching)):
            ev, obj_ref = self._fetching.popleft()
            if ev in workers:
                # Re-queue items that are still valid.
                self._fetching.append((ev, obj_ref))

    @property
    def count(self):
        """Number of tasks currently tracked by the pool."""
        return len(self._tasks)
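

# A minimal usage sketch for `TaskPool` (illustrative only; `workers` and
# the `sample.remote()` call are assumptions, not part of this module):
#
#     pool = TaskPool()
#     for worker in workers:
#         pool.add(worker, worker.sample.remote())
#     # Later: harvest finished tasks and keep the workers busy.
#     for worker, obj_ref in pool.completed(blocking_wait=True):
#         result = ray.get(obj_ref)
#         pool.add(worker, worker.sample.remote())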


def create_colocated_actors(
    actor_specs: Sequence[Tuple[Type, Any, Any, int]],
    node: Optional[str] = "localhost",
    max_attempts: int = 10,
) -> Dict[Type, List[ActorHandle]]:
    """Creates co-located actors of any type(s) on any node.

    Args:
        actor_specs: Sequence of tuples consisting of: 1) The
            (already @ray.remote) class(es) to construct, 2) c'tor args,
            3) c'tor kwargs, and 4) the number of actors of that class with
            the given args/kwargs to construct.
        node: The node to co-locate the actors on. By default ("localhost"),
            place the actors on the node the caller of this function is
            located on. Use None to indicate that any node (fulfilling the
            resource requirements) in the cluster may be used.
        max_attempts: The maximum number of co-location attempts to
            perform before raising an error.

    Returns:
        A dict mapping the created types to the list of n ActorHandles
        created (and co-located) for that type.
    """
    if node == "localhost":
        node = platform.node()

    # Maps each entry in `actor_specs` to a list of already co-located actors.
    ok = [[] for _ in range(len(actor_specs))]

    # Try n times to co-locate all given actor types (`actor_specs`).
    # With each (failed) attempt, increase the number of actors we try to
    # create (on the same node), then kill the ones that have been created
    # in excess.
    for attempt in range(max_attempts):
        # If any attempt to co-locate fails, set this to False and do
        # another attempt.
        all_good = True
        # Process all `actor_specs` in sequence.
        for i, (typ, args, kwargs, count) in enumerate(actor_specs):
            args = args or []  # Allow None.
            kwargs = kwargs or {}  # Allow None.
            # We don't have enough actors yet of this spec co-located on
            # the desired node.
            if len(ok[i]) < count:
                co_located = try_create_colocated(
                    cls=typ,
                    args=args,
                    kwargs=kwargs,
                    count=count * (attempt + 1),
                    node=node,
                )
                # If the node did not matter (None), from here on, use the
                # host that the first actor(s) have already been co-located
                # on.
                if node is None:
                    node = ray.get(co_located[0].get_host.remote())
                # Add the newly co-located actors to the `ok` list.
                ok[i].extend(co_located)
                # If we still don't have enough, we'll have to do another
                # attempt.
                if len(ok[i]) < count:
                    all_good = False
            # We created too many actors for this spec -> Kill/truncate
            # the excess ones.
            if len(ok[i]) > count:
                for a in ok[i][count:]:
                    a.__ray_terminate__.remote()
                ok[i] = ok[i][:count]

        # All `actor_specs` have been fulfilled -> Return the co-located
        # actors keyed by their class, as promised by the docstring (note:
        # if several specs share the same class, later entries overwrite
        # earlier ones).
        if all_good:
            return {spec[0]: actors for spec, actors in zip(actor_specs, ok)}

    raise Exception("Unable to create enough colocated actors -> aborting.")
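
# Illustrative usage sketch (not part of this module): co-locate two
# hypothetical @ray.remote classes, `PolicyServer` and `ReplayBuffer`, on
# whichever node Ray picks first. Both classes must expose a `get_host()`
# method (see the note after `try_create_colocated` below).
#
#     created = create_colocated_actors(
#         actor_specs=[
#             (PolicyServer, None, None, 1),
#             (ReplayBuffer, None, {"capacity": 50000}, 4),
#         ],
#         node=None,  # any node, but all 5 actors end up on the same one
#     )
#     server = created[PolicyServer][0]
#     buffers = created[ReplayBuffer]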


def try_create_colocated(
    cls: Type[ActorClass],
    args: List[Any],
    count: int,
    kwargs: Optional[Dict[str, Any]] = None,
    node: Optional[str] = "localhost",
) -> List[ActorHandle]:
    """Tries to co-locate (on the same node) a set of Actors of the same type.

    Returns a list of successfully co-located actors. All actors that could
    not be co-located (with the others on the given node) will not be in
    this list.

    Creates each actor via its remote() constructor and then checks whether
    it has been co-located (on the same node) with the other (already
    created) ones. If not, terminates the just-created actor.

    Args:
        cls: The Actor class to use (already @ray.remote "converted").
        args: Args to pass to each Actor's constructor.
        count: Number of actors of the given `cls` to construct.
        kwargs: Optional kwargs to pass to each Actor's constructor.
        node: The node to co-locate the actors on. By default ("localhost"),
            place the actors on the node the caller of this function is
            located on. If None, will try to co-locate all actors on
            any available node.

    Returns:
        List containing all successfully co-located actor handles.
    """
    if node == "localhost":
        node = platform.node()

    kwargs = kwargs or {}
    actors = [cls.remote(*args, **kwargs) for _ in range(count)]
    co_located, non_co_located = split_colocated(actors, node=node)
    logger.info("Got {} colocated actors of {}".format(len(co_located), count))
    for a in non_co_located:
        a.__ray_terminate__.remote()
    return co_located
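
# Note: `try_create_colocated` and `split_colocated` call
# `actor.get_host.remote()`, so any actor class used with these helpers
# must define such a method. A minimal sketch of a compatible actor
# (`_Colocatable` is a hypothetical name, not part of this module):
#
#     @ray.remote
#     class _Colocatable:
#         def get_host(self) -> str:
#             # Must use the same naming scheme as `platform.node()` above.
#             return platform.node()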


def split_colocated(
    actors: List[ActorHandle],
    node: Optional[str] = "localhost",
) -> Tuple[List[ActorHandle], List[ActorHandle]]:
    """Splits given actors into co-located (on same node) and non-co-located.

    The co-location criterion depends on the given `node`:
    If given (or default: platform.node()): Consider all actors that are on
    that node "colocated".
    If None: Consider the largest subset of actors that are all located on
    the same node (whatever that node is) as "colocated".

    Args:
        actors: The list of actor handles to split into "colocated" and
            "non-colocated".
        node: The node defining the "colocation" criterion. If provided,
            consider those actors "colocated" that sit on this node. If
            None, use the largest subset within `actors` that are sitting
            on the same (any) node.

    Returns:
        Tuple of two lists: 1) co-located ActorHandles, 2) non-co-located
        ActorHandles.
    """
    if node == "localhost":
        node = platform.node()

    # Get the nodes of all created actors.
    hosts = ray.get([a.get_host.remote() for a in actors])

    # If `node` was not provided, use the largest group of actors that sit
    # on the same node, regardless of what that node is.
    if node is None:
        node_groups = defaultdict(set)
        for host, actor in zip(hosts, actors):
            node_groups[host].add(actor)
        max_ = -1
        largest_group = None
        for host in node_groups:
            if max_ < len(node_groups[host]):
                max_ = len(node_groups[host])
                largest_group = host
        non_co_located = []
        for host in node_groups:
            if host != largest_group:
                non_co_located.extend(list(node_groups[host]))
        return list(node_groups[largest_group]), non_co_located
    # Node provided (or default: localhost): Consider those actors
    # "colocated" that were placed on `node`.
    else:
        # Split into co-located (on `node`) and non-co-located (not on
        # `node`).
        co_located = []
        non_co_located = []
        for host, a in zip(hosts, actors):
            # This actor has been placed on the correct node.
            if host == node:
                co_located.append(a)
            # This actor has been placed on a different node.
            else:
                non_co_located.append(a)
        return co_located, non_co_located
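
# Illustrative sketch of the two `split_colocated` modes (assumes `actors`
# holds handles of a class defining `get_host()`, e.g. the `_Colocatable`
# sketch above; "node-1" is a made-up hostname):
#
#     # Explicit node: "colocated" means "on this node".
#     on_node, off_node = split_colocated(actors, node="node-1")
#
#     # node=None: "colocated" means "in the largest same-node group".
#     majority, minority = split_colocated(actors, node=None)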


@Deprecated(new="create_colocated_actors", error=False)
def create_colocated(cls, arg, count):
    kwargs = {}
    args = arg

    return create_colocated_actors(
        actor_specs=[(cls, args, kwargs, count)],
        node=platform.node(),  # force on localhost
    )[cls]


@Deprecated(error=False)
def drop_colocated(actors: List[ActorHandle]) -> List[ActorHandle]:
    colocated, non_colocated = split_colocated(actors)
    for a in colocated:
        a.__ray_terminate__.remote()
    return non_colocated