ray/rllib/utils/actors.py
Balaji Veeramani 7f1bacc7dc
[CI] Format Python code with Black (#21975)
See #21316 and #21311 for the motivation behind these changes.
2022-01-29 18:41:57 -08:00

271 lines
10 KiB
Python

from collections import defaultdict, deque
import logging
import platform
from typing import Any, Dict, List, Optional, Sequence, Tuple, Type
import ray
from ray.actor import ActorClass, ActorHandle
from ray.rllib.utils.deprecation import Deprecated
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
class TaskPool:
    """Bookkeeping helper for a set of in-flight actor tasks.

    Each registered task is keyed by a single object ref and remembers both
    the worker it was submitted to and the full object (single ref or list of
    refs) that was handed to `add()`.
    """

    def __init__(self):
        # Key object ref -> worker the task was submitted to.
        self._tasks = {}
        # Key object ref -> the value originally passed to `add()`.
        self._objects = {}
        # FIFO of (worker, objects) pairs that `completed()` already reported
        # but `completed_prefetch()` has not handed out yet.
        self._fetching = deque()

    def add(self, worker, all_obj_refs):
        """Register a pending task for `worker`.

        Args:
            worker: The worker the task belongs to.
            all_obj_refs: Either a single object ref or a list of refs. For a
                list, its first element is used as the tracking key.
        """
        key = all_obj_refs[0] if isinstance(all_obj_refs, list) else all_obj_refs
        self._tasks[key] = worker
        self._objects[key] = all_obj_refs

    def completed(self, blocking_wait=False):
        """Yield `(worker, objects)` for every tracked task that is ready.

        Yielded tasks are removed from the pool.

        Args:
            blocking_wait: If True and the initial zero-timeout poll finds
                nothing, wait up to 10 seconds for one task to become ready.
        """
        pending = list(self._tasks)
        if not pending:
            return

        # Poll (timeout=0) for everything that is ready right now.
        ready, _ = ray.wait(pending, num_returns=len(pending), timeout=0)
        if blocking_wait and not ready:
            ready, _ = ray.wait(pending, num_returns=1, timeout=10.0)

        for obj_ref in ready:
            yield (self._tasks.pop(obj_ref), self._objects.pop(obj_ref))

    def completed_prefetch(self, blocking_wait=False, max_yield=999):
        """Yield up to `max_yield` completed `(worker, objects)` pairs.

        All results of `completed()` are first moved into an internal FIFO
        queue; items are then handed out from the front of that queue, so
        leftovers are served by later calls.

        Args:
            blocking_wait: Forwarded to `completed()`.
            max_yield: Upper bound on the number of items yielded per call.
        """
        self._fetching.extend(self.completed(blocking_wait=blocking_wait))

        for _ in range(max_yield):
            if self._fetching:
                yield self._fetching.popleft()
            else:
                return

    def reset_workers(self, workers):
        """Drop all bookkeeping for workers that are not in `workers`.

        Args:
            workers: The collection of workers that are still valid.
        """
        stale_refs = [
            ref for ref, owner in self._tasks.items() if owner not in workers
        ]
        for ref in stale_refs:
            del self._tasks[ref]
            del self._objects[ref]

        # Filter the queue in place (rotate once through it) rather than
        # rebinding `self._fetching`, so generators that are still in flight
        # keep operating on the same deque object.
        for _ in range(len(self._fetching)):
            owner, objects = self._fetching.popleft()
            if owner in workers:
                self._fetching.append((owner, objects))

    @property
    def count(self):
        """Number of tasks currently tracked as pending."""
        return len(self._tasks)
def create_colocated_actors(
    actor_specs: Sequence[Tuple[Type, Any, Any, int]],
    node: Optional[str] = "localhost",
    max_attempts: int = 10,
) -> List[List[ActorHandle]]:
    """Create co-located actors of any type(s) on any node.

    Args:
        actor_specs: Tuple/list with tuples consisting of: 1) The
            (already @ray.remote) class(es) to construct, 2) c'tor args,
            3) c'tor kwargs, and 4) the number of actors of that class with
            given args/kwargs to construct. None is accepted for 2) and 3).
        node: The node to co-locate the actors on. By default ("localhost"),
            place the actors on the node the caller of this function is
            located on. Use None for indicating that any (resource fulfilling)
            node in the cluster may be used.
        max_attempts: The maximum number of co-location attempts to
            perform before throwing an error.

    Returns:
        A list with one entry per item in `actor_specs`, in the same order.
        Entry `i` is the list of the `count` co-located ActorHandles created
        for spec `i`.

    Raises:
        Exception: If, after `max_attempts` attempts, at least one spec still
            has fewer co-located actors than requested.
    """
    if node == "localhost":
        node = platform.node()

    # Maps each entry in `actor_specs` to lists of already co-located actors.
    ok = [[] for _ in actor_specs]

    # Try n times to co-locate all given actor types (`actor_specs`).
    # With each (failed) attempt, increase the number of actors we try to
    # create (on the same node), then kill the ones that have been created in
    # excess.
    for attempt in range(max_attempts):
        # If any attempt to co-locate fails, set this to False and we'll do
        # another attempt.
        all_good = True
        # Process all `actor_specs` in sequence.
        for i, (typ, args, kwargs, count) in enumerate(actor_specs):
            args = args or []  # Allow None.
            kwargs = kwargs or {}  # Allow None.
            # We don't have enough actors yet of this spec co-located on
            # the desired node.
            if len(ok[i]) < count:
                co_located = try_create_colocated(
                    cls=typ,
                    args=args,
                    kwargs=kwargs,
                    count=count * (attempt + 1),
                    node=node,
                )
                # If node did not matter (None), from here on, use the host
                # that the first actor(s) are already co-located on.
                if node is None:
                    node = ray.get(co_located[0].get_host.remote())
                # Add the newly co-located actors to the `ok` list.
                ok[i].extend(co_located)
                # If we still don't have enough -> We'll have to do another
                # attempt.
                if len(ok[i]) < count:
                    all_good = False
            # We created too many actors for this spec -> Kill/truncate
            # the excess ones.
            if len(ok[i]) > count:
                for a in ok[i][count:]:
                    a.__ray_terminate__.remote()
                ok[i] = ok[i][:count]

        # All `actor_specs` have been fulfilled, return lists of
        # co-located actors.
        if all_good:
            return ok

    raise Exception("Unable to create enough colocated actors -> aborting.")
def try_create_colocated(
    cls: Type[ActorClass],
    args: List[Any],
    count: int,
    kwargs: Optional[Dict[str, Any]] = None,
    node: Optional[str] = "localhost",
) -> List[ActorHandle]:
    """Tries to co-locate (same node) a set of Actors of the same type.

    Creates `count` actors via `cls.remote(*args, **kwargs)`, determines
    (through `split_colocated`) which of them ended up co-located, terminates
    the ones that did not, and returns the co-located ones.

    Args:
        cls: The Actor class to use (already @ray.remote "converted").
        args: Positional args for the Actor's constructor. The same args are
            passed to every one of the `count` constructor calls. None is
            treated as "no positional args".
        count: Number of actors of the given `cls` to construct.
        kwargs: Optional keyword args for the Actor's constructor. The same
            kwargs are passed to every one of the `count` constructor calls.
        node: The node to co-locate the actors on. By default ("localhost"),
            place the actors on the node the caller of this function is
            located on. If None, will try to co-locate all actors on
            any available node.

    Returns:
        List containing all successfully co-located actor handles.
    """
    if node == "localhost":
        node = platform.node()

    # Both are splatted into the constructor call below, so None must be
    # normalized to an empty container first.
    args = [] if args is None else args
    kwargs = kwargs or {}

    actors = [cls.remote(*args, **kwargs) for _ in range(count)]
    co_located, non_co_located = split_colocated(actors, node=node)
    logger.info("Got {} colocated actors of {}".format(len(co_located), count))
    # Actors that landed elsewhere are of no use to the caller.
    for a in non_co_located:
        a.__ray_terminate__.remote()
    return co_located
def split_colocated(
    actors: List[ActorHandle],
    node: Optional[str] = "localhost",
) -> Tuple[List[ActorHandle], List[ActorHandle]]:
    """Partition actor handles into "co-located" and "not co-located".

    What counts as co-located depends on `node`:
    - A node name (or the default "localhost", which is resolved to
      `platform.node()`): actors whose `get_host()` result equals that node.
    - None: the largest group of actors that share a host, whichever host
      that is.

    Args:
        actors: The actor handles to partition. Each actor is queried via
            its remote `get_host` method.
        node: The node defining the co-location criterion, or None to use
            the largest same-host subset of `actors`.

    Returns:
        Tuple of two lists: 1) Co-located ActorHandles, 2) non co-located
        ActorHandles.
    """
    if node == "localhost":
        node = platform.node()

    # Ask every actor which host it is running on.
    hosts = ray.get([a.get_host.remote() for a in actors])

    # Explicit node: an actor is co-located iff it reports exactly that host.
    if node is not None:
        co_located = []
        non_co_located = []
        for host, a in zip(hosts, actors):
            (co_located if host == node else non_co_located).append(a)
        return co_located, non_co_located

    # No node given: bucket the actors by host ...
    by_host = defaultdict(set)
    for host, actor in zip(hosts, actors):
        by_host[host].add(actor)

    # ... find the host with the most actors (first one wins on ties) ...
    largest_host = None
    largest_size = -1
    for host, members in by_host.items():
        if largest_size < len(members):
            largest_size = len(members)
            largest_host = host

    # ... and treat everything on any other host as not co-located.
    non_co_located = []
    for host, members in by_host.items():
        if host != largest_host:
            non_co_located.extend(list(members))

    return list(by_host[largest_host]), non_co_located
@Deprecated(new="create_colocated_actors", error=False)
def create_colocated(cls, arg, count):
    """Deprecated: create `count` actors of `cls` co-located on this host.

    Thin wrapper around `create_colocated_actors` with a single actor spec.

    Args:
        cls: The (already @ray.remote) actor class to construct.
        arg: Positional constructor args, passed on as the spec's args.
        count: Number of actors to create.

    Returns:
        The list of co-located ActorHandles for the single spec.
    """
    kwargs = {}
    args = arg

    # `create_colocated_actors` returns one list per spec, in spec order.
    # There is exactly one spec here, so its result sits at index 0
    # (indexing the returned list with `cls` would raise a TypeError).
    return create_colocated_actors(
        actor_specs=[(cls, args, kwargs, count)],
        node=platform.node(),  # force on localhost
    )[0]
@Deprecated(error=False)
def drop_colocated(actors: List[ActorHandle]) -> List[ActorHandle]:
    """Deprecated: terminate the actors on this host, return the others.

    Uses `split_colocated` with its default node ("localhost") to decide
    which actors are co-located with the caller.

    Args:
        actors: The actor handles to filter.

    Returns:
        The handles of the actors that were not co-located (and hence not
        terminated).
    """
    on_this_node, elsewhere = split_colocated(actors)
    for handle in on_this_node:
        handle.__ray_terminate__.remote()
    return elsewhere