Improve actor pool support in Datasets (#22574)

Eric Liang 2022-02-24 12:01:36 -08:00 committed by GitHub
parent 02cb974c6c
commit 533a0440a6
6 changed files with 71 additions and 33 deletions


@ -152,11 +152,14 @@ since your map function can return an output batch of any size.
# -> [{'value': 0}, {'value': 2}, ...]
By default, transformations are executed using Ray tasks.
For transformations that require setup, specify ``compute="actors"`` and Ray will use an autoscaling actor pool to execute your transforms instead.
For transformations that require setup, specify ``compute=ray.data.ActorPoolStrategy(min, max)`` and Ray will use an autoscaling actor pool of ``min`` to ``max`` actors to execute your transforms.
For a fixed-size actor pool, specify ``ActorPoolStrategy(n, n)``.
The following is an end-to-end example of reading, transforming, and saving batch inference results using Ray Data:
.. code-block:: python
from ray.data import ActorPoolStrategy
# Example of GPU batch inference on an ImageNet model.
def preprocess(image: bytes) -> bytes:
return image
@ -175,7 +178,9 @@ The following is an end-to-end example of reading, transforming, and saving batc
# Apply GPU batch inference with actors, and assign each actor a GPU using
# ``num_gpus=1`` (any Ray remote decorator argument can be used here).
ds = ds.map_batches(BatchInferModel, compute="actors", batch_size=256, num_gpus=1)
ds = ds.map_batches(
BatchInferModel, compute=ActorPoolStrategy(10, 20),
batch_size=256, num_gpus=1)
# -> Map Progress (16 actors 4 pending): 100%|██████| 200/200 [00:07, 27.60it/s]
# Save the results.
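
For the fixed-size form mentioned above, the call looks the same with min equal to max. A minimal, CPU-only sketch (no GPUs assumed, and a stateless lambda standing in for a model) that pins the pool at exactly four actors:

    import ray
    from ray.data import ActorPoolStrategy

    ray.init()

    ds = ray.data.range(100)
    # min_size == max_size == 4, so the pool never grows or shrinks.
    out = ds.map(lambda x: x + 1, compute=ActorPoolStrategy(4, 4))
    assert sorted(out.take(100)) == list(range(1, 101))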


@ -23,6 +23,7 @@ from ray.data.read_api import (
from ray.data.datasource import Datasource, ReadTask
from ray.data.dataset import Dataset
from ray.data.impl.progress_bar import set_progress_bars
from ray.data.impl.compute import ActorPoolStrategy
# Module-level cached global functions (for impl/compute). It cannot be defined
# in impl/compute since it has to be process-global across cloudpickled funcs.
@ -30,6 +31,7 @@ _cached_fn = None
_cached_cls = None
__all__ = [
"ActorPoolStrategy",
"Dataset",
"Datasource",
"ReadTask",


@ -154,9 +154,9 @@ class Dataset(Generic[T]):
... return self.model(batch)
>>> # Apply the transform in parallel on GPUs. Since
>>> # compute="actors", the transform will be applied on an
>>> # autoscaling pool of Ray actors, each allocated 1 GPU by Ray.
>>> ds.map(CachedModel, compute="actors", num_gpus=1)
>>> # compute=ActorPoolStrategy(2, 8), the transform will be applied on an
>>> # autoscaling pool of 2-8 Ray actors, each allocated 1 GPU by Ray.
>>> ds.map(CachedModel, compute=ActorPoolStrategy(2, 8), num_gpus=1)
Time complexity: O(dataset size / parallelism)
@ -164,7 +164,7 @@ class Dataset(Generic[T]):
fn: The function to apply to each record, or a class type
that can be instantiated to create such a callable.
compute: The compute strategy, either "tasks" (default) to use Ray
tasks, or "actors" to use an autoscaling Ray actor pool.
tasks, or ActorPoolStrategy(min, max) to use an autoscaling actor pool.
ray_remote_args: Additional resource requirements to request from
ray (e.g., num_gpus=1 to request GPUs for the map tasks).
"""
@ -215,11 +215,11 @@ class Dataset(Generic[T]):
... return self.model(item)
>>> # Apply the transform in parallel on GPUs. Since
>>> # compute="actors", the transform will be applied on an
>>> # autoscaling pool of Ray actors, each allocated 1 GPU by Ray.
>>> # compute=ActorPoolStrategy(2, 8), the transform will be applied on an
>>> # autoscaling pool of 2-8 Ray actors, each allocated 1 GPU by Ray.
>>> ds.map_batches(
... CachedModel,
... batch_size=256, compute="actors", num_gpus=1)
... batch_size=256, compute=ActorPoolStrategy(2, 8), num_gpus=1)
Time complexity: O(dataset size / parallelism)
@ -229,7 +229,7 @@ class Dataset(Generic[T]):
batch_size: Request a specific batch size, or None to use entire
blocks as batches. Defaults to a system-chosen batch size.
compute: The compute strategy, either "tasks" (default) to use Ray
tasks, or "actors" to use an autoscaling Ray actor pool.
tasks, or ActorPoolStrategy(min, max) to use an autoscaling actor pool.
batch_format: Specify "native" to use the native block format
(promotes Arrow to pandas), "pandas" to select
``pandas.DataFrame`` as the batch format,
@ -330,7 +330,7 @@ class Dataset(Generic[T]):
fn: Map function generating the column values given a batch of
records in pandas format.
compute: The compute strategy, either "tasks" (default) to use Ray
tasks, or "actors" to use an autoscaling Ray actor pool.
tasks, or ActorPoolStrategy(min, max) to use an autoscaling actor pool.
ray_remote_args: Additional resource requirements to request from
ray (e.g., num_gpus=1 to request GPUs for the map tasks).
"""
@ -367,7 +367,7 @@ class Dataset(Generic[T]):
fn: The function to apply to each record, or a class type
that can be instantiated to create such a callable.
compute: The compute strategy, either "tasks" (default) to use Ray
tasks, or "actors" to use an autoscaling Ray actor pool.
tasks, or ActorPoolStrategy(min, max) to use an autoscaling actor pool.
ray_remote_args: Additional resource requirements to request from
ray (e.g., num_gpus=1 to request GPUs for the map tasks).
"""
@ -414,7 +414,7 @@ class Dataset(Generic[T]):
fn: The predicate to apply to each record, or a class type
that can be instantiated to create such a callable.
compute: The compute strategy, either "tasks" (default) to use Ray
tasks, or "actors" to use an autoscaling Ray actor pool.
tasks, or ActorPoolStrategy(min, max) to use an autoscaling actor pool.
ray_remote_args: Additional resource requirements to request from
ray (e.g., num_gpus=1 to request GPUs for the map tasks).
"""


@ -1,6 +1,7 @@
from typing import TypeVar, Any, Union, Callable, List, Tuple
from typing import TypeVar, Any, Union, Callable, List, Tuple, Optional
import ray
from ray.util.annotations import PublicAPI
from ray.data.block import (
Block,
BlockAccessor,
@ -22,7 +23,7 @@ CallableClass = type
class ComputeStrategy:
def apply(self, fn: Any, blocks: BlockList, clear_input_blocks: bool) -> BlockList:
def _apply(self, fn: Any, blocks: BlockList, clear_input_blocks: bool) -> BlockList:
raise NotImplementedError
@ -58,8 +59,8 @@ def _map_block_nosplit(
)
class TaskPool(ComputeStrategy):
def apply(
class TaskPoolStrategy(ComputeStrategy):
def _apply(
self,
fn: Any,
remote_args: dict,
@ -121,21 +122,33 @@ class TaskPool(ComputeStrategy):
return BlockList(list(new_blocks), list(new_metadata))
class ActorPool(ComputeStrategy):
def __init__(self):
self.workers = []
@PublicAPI
class ActorPoolStrategy(ComputeStrategy):
"""Specify the compute strategy for a Dataset transform.
def __del__(self):
for w in self.workers:
w.__ray_terminate__.remote()
ActorPoolStrategy specifies that an autoscaling pool of actors should be used for a
given Dataset transform. This is useful for stateful setup of callable classes.
def apply(
To autoscale from ``m`` to ``n`` actors, specify ``compute=ActorPoolStrategy(m, n)``.
For a fixed-size pool of size ``n``, specify ``compute=ActorPoolStrategy(n, n)``.
"""
def __init__(self, min_size: int = 1, max_size: Optional[int] = None):
if min_size < 1:
raise ValueError("min_size must be > 1", min_size)
if max_size is not None and min_size > max_size:
raise ValueError("min_size must be <= max_size", min_size, max_size)
self.min_size = min_size
self.max_size = max_size or float("inf")
def _apply(
self,
fn: Any,
remote_args: dict,
block_list: BlockList,
clear_input_blocks: bool,
) -> BlockList:
"""Note: this is not part of the Dataset public API."""
context = DatasetContext.get_current()
blocks_in = block_list.get_blocks_with_metadata()
@ -168,8 +181,8 @@ class ActorPool(ComputeStrategy):
BlockWorker = ray.remote(**remote_args)(BlockWorker)
self.workers = [BlockWorker.remote()]
tasks = {w.ready.remote(): w for w in self.workers}
workers = [BlockWorker.remote() for _ in range(self.min_size)]
tasks = {w.ready.remote(): w for w in workers}
metadata_mapping = {}
ready_workers = set()
@ -178,13 +191,16 @@ class ActorPool(ComputeStrategy):
list(tasks), timeout=0.01, num_returns=1, fetch_local=False
)
if not ready:
if len(ready_workers) / len(self.workers) > 0.8:
if (
len(workers) < self.max_size
and len(ready_workers) / len(workers) > 0.8
):
w = BlockWorker.remote()
self.workers.append(w)
workers.append(w)
tasks[w.ready.remote()] = w
map_bar.set_description(
"Map Progress ({} actors {} pending)".format(
len(ready_workers), len(self.workers) - len(ready_workers)
len(ready_workers), len(workers) - len(ready_workers)
)
)
continue
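
Read in isolation, the scale-up rule above is: only add an actor when the pool is still below max_size and more than 80% of the actors launched so far have completed their ready() handshake (and no task result arrived within the wait timeout). A pure-Python paraphrase of just that predicate, for illustration only (not Ray code, names are made up):

    def should_add_worker(num_workers: int, num_ready: int, max_size: float) -> bool:
        # Grow only while below the cap and most existing workers have started.
        return num_workers < max_size and num_ready / num_workers > 0.8

    assert should_add_worker(num_workers=10, num_ready=9, max_size=20)       # grows
    assert not should_add_worker(num_workers=10, num_ready=5, max_size=20)   # startup lagging
    assert not should_add_worker(num_workers=20, num_ready=20, max_size=20)  # at the cap
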
@ -199,6 +215,11 @@ class ActorPool(ComputeStrategy):
map_bar.update(1)
else:
ready_workers.add(worker)
map_bar.set_description(
"Map Progress ({} actors {} pending)".format(
len(ready_workers), len(workers) - len(ready_workers)
)
)
# Schedule a new task.
if blocks_in:
@ -253,10 +274,10 @@ def cache_wrapper(
def get_compute(compute_spec: Union[str, ComputeStrategy]) -> ComputeStrategy:
if not compute_spec or compute_spec == "tasks":
return TaskPool()
return TaskPoolStrategy()
elif compute_spec == "actors":
return ActorPool()
return ActorPoolStrategy()
elif isinstance(compute_spec, ComputeStrategy):
return compute_spec
else:
raise ValueError("compute must be one of [`tasks`, `actors`]")
raise ValueError("compute must be one of [`tasks`, `actors`, ComputeStrategy]")


@ -284,7 +284,7 @@ class OneToOneStage(Stage):
self, blocks: BlockList, clear_input_blocks: bool
) -> Tuple[BlockList, dict]:
compute = get_compute(self.compute)
blocks = compute.apply(
blocks = compute._apply(
self.block_fn, self.ray_remote_args, blocks, clear_input_blocks
)
assert isinstance(blocks, BlockList), blocks
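
The stage now goes through the underscore-prefixed _apply, underlining that the strategy objects are public while the execution hook is not. Since get_compute passes any ComputeStrategy instance through, a custom strategy would in principle only need to implement _apply with the signature used here. The skeleton below is hypothetical and leans on internal paths (ray.data.impl.compute, and BlockList assumed to live in ray.data.impl.block_list), so treat it as a sketch rather than a supported extension point:

    from typing import Any

    from ray.data.impl.block_list import BlockList  # assumed internal path
    from ray.data.impl.compute import ComputeStrategy, TaskPoolStrategy

    class LoggingStrategy(ComputeStrategy):
        # Hypothetical strategy: log the call, then delegate to the task pool.
        def __init__(self):
            self._inner = TaskPoolStrategy()

        def _apply(
            self,
            fn: Any,
            remote_args: dict,
            block_list: BlockList,
            clear_input_blocks: bool,
        ) -> BlockList:
            num_blocks = len(block_list.get_blocks_with_metadata())
            print(f"Applying {fn} over {num_blocks} blocks")
            return self._inner._apply(
                fn, remote_args, block_list, clear_input_blocks
            )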


@ -81,6 +81,16 @@ def test_basic_actors(shutdown_only, pipelined):
range(1, n + 1)
)
# Should still work even if num actors > num cpus.
ds = ray.data.range(n)
ds = maybe_pipeline(ds, pipelined)
assert sorted(
ds.map(lambda x: x + 1, compute=ray.data.ActorPoolStrategy(4, 4)).take()
) == list(range(1, n + 1))
with pytest.raises(ValueError):
ray.data.range(10).map(lambda x: x, compute=ray.data.ActorPoolStrategy(8, 4))
@pytest.mark.parametrize("pipelined", [False, True])
def test_avoid_placement_group_capture(shutdown_only, pipelined):