[Datasets] [Out-of-Band Serialization: 2/3] Refactor ExecutionPlan to maintain complete lineage and eagerly unlink block references. (#23931)

This PR refactors ExecutionPlan to maintain complete stage lineage, even for eagerly computed datasets, while ensuring that block references are unlinked as early as possible so that block memory can be released eagerly. This PR is the final precursor to adding the actual out-of-band serialization APIs (PR 3/3).

The full lineage has to be maintained, even for eagerly computed datasets, since the lineage is needed for out-of-band serialization of datasets.
Clark Zinzow 2022-04-22 16:07:24 -07:00 committed by GitHub
parent 73ed67e9e6
commit 9ee24530ab
5 changed files with 490 additions and 252 deletions
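To illustrate the new lineage/snapshot model, here is a minimal usage sketch based on the internal plan attributes exercised by the tests in this PR (these are implementation details, not public API):

    import ray

    # An eagerly computed dataset executes each stage immediately, but the plan now
    # records the full stage lineage alongside a cached "snapshot" of the output blocks.
    ds = ray.data.range(10).map(lambda x: x + 1)
    print(ds._plan._stages_before_snapshot)  # lineage of the executed prefix (the map stage)
    print(ds._plan._snapshot_blocks)         # cached output blocks of that prefix

    # A lazy dataset accumulates stages after the snapshot; fully_executed() deep-copies
    # the plan and forces the read, leaving the original dataset's lineage untouched.
    lazy_ds = ray.data.range(10)._experimental_lazy().map(lambda x: x + 1)
    materialized = lazy_ds.fully_executed()
    assert materialized.take() == list(range(1, 11))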


@@ -2606,22 +2606,22 @@ List[str]]]): The names of the columns to use as the features. Can be a list of
to repeat indefinitely.
"""
from ray.data.dataset_pipeline import DatasetPipeline
from ray.data.impl.plan import _rewrite_read_stage
# If optimizations are enabled, rewrite the read stage into a OneToOneStage
# to enable fusion with downstream map stages.
ctx = DatasetContext.get_current()
if self._plan._is_read_stage() and ctx.optimize_fuse_read_stages:
self._plan._in_blocks.clear()
blocks, read_stage = self._plan._rewrite_read_stage()
outer_stats = DatasetStats(stages={}, parent=None)
if self._plan.is_read_stage() and ctx.optimize_fuse_read_stages:
blocks, _ = self._plan._get_source_blocks()
blocks.clear()
blocks, outer_stats, read_stage = _rewrite_read_stage(blocks)
else:
blocks = self._plan.execute()
read_stage = None
outer_stats = self._plan.stats()
read_stage = None
uuid = self._get_uuid()
outer_stats.dataset_uuid = uuid
if times is not None and times < 1:
raise ValueError("`times` must be >= 1, got {}".format(times))
uuid = self._get_uuid()
class Iterator:
def __init__(self, blocks):
@@ -2719,6 +2719,7 @@ List[str]]]): The names of the columns to use as the features. Can be a list of
exclusive with ``blocks_per_window``.
"""
from ray.data.dataset_pipeline import DatasetPipeline
from ray.data.impl.plan import _rewrite_read_stage
if blocks_per_window is not None and bytes_per_window is not None:
raise ValueError("Only one windowing scheme can be specified.")
@@ -2726,17 +2727,15 @@ List[str]]]): The names of the columns to use as the features. Can be a list of
if blocks_per_window is None:
blocks_per_window = 10
# If optimizations are enabled, rewrite the read stage into a OneToOneStage
# to enable fusion with downstream map stages.
ctx = DatasetContext.get_current()
if self._plan._is_read_stage() and ctx.optimize_fuse_read_stages:
self._plan._in_blocks.clear()
blocks, read_stage = self._plan._rewrite_read_stage()
outer_stats = DatasetStats(stages={}, parent=None)
if self._plan.is_read_stage() and ctx.optimize_fuse_read_stages:
blocks, _ = self._plan._get_source_blocks()
blocks.clear()
blocks, outer_stats, read_stage = _rewrite_read_stage(blocks)
else:
blocks = self._plan.execute()
read_stage = None
outer_stats = self._plan.stats()
read_stage = None
class Iterator:
def __init__(self, splits, epoch):
@@ -2814,19 +2813,9 @@ List[str]]]): The names of the columns to use as the features. Can be a list of
Returns:
A Dataset with all blocks fully materialized in memory.
"""
blocks, metadata = [], []
for b, m in self._plan.execute().get_blocks_with_metadata():
blocks.append(b)
metadata.append(m)
ds = Dataset(
ExecutionPlan(
BlockList(blocks, metadata),
self._plan.stats(),
dataset_uuid=self._get_uuid(),
),
self._epoch,
lazy=False,
)
plan = self._plan.deep_copy(preserve_uuid=True)
plan.execute(force_read=True)
ds = Dataset(plan, self._epoch, lazy=False)
ds._set_uuid(self._get_uuid())
return ds


@@ -752,16 +752,21 @@ class DatasetPipeline(Generic[T]):
self._optimized_stages = self._stages
return
# This dummy dataset will be used to get a set of optimized stages.
dummy_ds = Dataset(
ExecutionPlan(BlockList([], []), DatasetStats(stages={}, parent=None)),
0,
True,
)
# Apply all pipeline operations to the dummy dataset.
for stage in self._stages:
dummy_ds = stage(dummy_ds)
dummy_ds._plan._optimize()
# Get the optimized stages.
_, _, stages = dummy_ds._plan._optimize()
# Apply these optimized stages to the datasets underlying the pipeline.
# These optimized stages will be executed by the PipelineExecutor.
optimized_stages = []
for stage in dummy_ds._plan._stages:
for stage in stages:
optimized_stages.append(
lambda ds, stage=stage: Dataset(
ds._plan.with_stage(stage), ds._epoch, True


@@ -261,6 +261,11 @@ class LazyBlockList(BlockList):
self._cached_metadata = metadata
return block_refs, metadata
def compute_to_blocklist(self) -> BlockList:
"""Launch all tasks and return a concrete BlockList."""
blocks, metadata = self._get_blocks_with_metadata()
return BlockList(blocks, metadata)
def compute_first_block(self):
"""Kick off computation for the first block in the list.


@@ -1,4 +1,14 @@
from typing import Callable, Tuple, Optional, Union, Iterable, Iterator, TYPE_CHECKING
import copy
from typing import (
Callable,
List,
Tuple,
Optional,
Union,
Iterator,
Iterable,
TYPE_CHECKING,
)
import uuid
if TYPE_CHECKING:
@@ -16,217 +26,6 @@ from ray.data.impl.lazy_block_list import LazyBlockList
INHERITABLE_REMOTE_ARGS = ["scheduling_strategy"]
class ExecutionPlan:
"""A lazy execution plan for a Dataset."""
def __init__(self, in_blocks: BlockList, stats: DatasetStats, dataset_uuid=None):
"""Create a plan with no transformation stages.
Args:
in_blocks: Base list of blocks.
stats: Stats for the base blocks.
"""
self._in_blocks = in_blocks
self._out_blocks = None
self._in_stats = stats
self._out_stats = None
self._stages = []
self._dataset_uuid = dataset_uuid or uuid.uuid4().hex
if not stats.dataset_uuid:
stats.dataset_uuid = self._dataset_uuid
def with_stage(self, stage: "Stage") -> "ExecutionPlan":
"""Return a copy of this plan with the given stage appended.
Args:
stage: The stage to append.
Returns:
A new ExecutionPlan with this stage appended.
"""
if self._out_blocks:
copy = ExecutionPlan(self._out_blocks, self._out_stats)
copy._stages = [stage]
else:
copy = ExecutionPlan(self._in_blocks, self._in_stats)
copy._stages = self._stages.copy()
copy._stages.append(stage)
return copy
def initial_num_blocks(self) -> int:
"""Get the estimated number of blocks after applying all plan stages."""
if self._out_blocks:
return self._out_blocks.initial_num_blocks()
for stage in self._stages[::-1]:
if stage.num_blocks is not None:
return stage.num_blocks
return self._in_blocks.initial_num_blocks()
def schema(
self, fetch_if_missing: bool = False
) -> Union[type, "pyarrow.lib.Schema"]:
"""Get the schema after applying all plan stages.
Args:
fetch_if_missing: Whether to execute the plan to fetch the schema.
Returns:
The schema of the output dataset.
"""
if self._stages:
if fetch_if_missing:
self.execute()
blocks = self._out_blocks
else:
blocks = self._in_blocks
if blocks:
# Don't force fetching in case it's a lazy block list, in which case we
# don't want to trigger full execution for a schema read. If we want to
# trigger execution to get schema, we'll trigger read tasks progressively
# until a viable schema is available, below.
metadata = blocks.get_metadata(fetch_if_missing=False)
else:
metadata = []
# Some blocks could be empty, in which case we cannot get their schema.
# TODO(ekl) validate schema is the same across different blocks.
for m in metadata:
if m.schema is not None and (m.num_rows is None or m.num_rows > 0):
return m.schema
if not fetch_if_missing:
return None
# Synchronously fetch the schema.
# For lazy block lists, this launches read tasks and fetches block metadata
# until we find valid block schema.
for _, m in blocks.iter_blocks_with_metadata():
if m.schema is not None and (m.num_rows is None or m.num_rows > 0):
return m.schema
return None
def meta_count(self) -> Optional[int]:
"""Get the number of rows after applying all plan stages if possible.
This method will never trigger any computation.
Returns:
The number of records of the result Dataset, or None.
"""
if self._stages:
blocks = self._out_blocks
else:
blocks = self._in_blocks
metadata = blocks.get_metadata() if blocks else None
if metadata and all(m.num_rows is not None for m in metadata):
return sum(m.num_rows for m in metadata)
else:
return None
def execute(self, clear_input_blocks: bool = True) -> BlockList:
"""Execute this plan.
Args:
clear_input_blocks: Whether to assume ownership of the input blocks,
allowing them to be dropped from memory during execution.
Returns:
The blocks of the output dataset.
"""
if self._out_blocks is None:
self._optimize()
blocks = self._in_blocks
stats = self._in_stats
for stage in self._stages:
stats_builder = stats.child_builder(stage.name)
blocks, stage_info = stage(blocks, clear_input_blocks)
if stage_info:
stats = stats_builder.build_multistage(stage_info)
else:
stats = stats_builder.build(blocks)
stats.dataset_uuid = uuid.uuid4().hex
self._out_blocks = blocks
self._out_stats = stats
self._out_stats.dataset_uuid = self._dataset_uuid
return self._out_blocks
def clear(self) -> None:
"""Clear all cached block references of this plan, including input blocks.
This will render the plan un-executable unless the root is a LazyBlockList."""
self._in_blocks.clear()
self._out_blocks = None
self._out_stats = None
def stats(self) -> DatasetStats:
"""Return stats for this plan, forcing execution if needed."""
self.execute()
return self._out_stats
def _optimize(self) -> None:
"""Apply stage fusion optimizations, updating this plan."""
context = DatasetContext.get_current()
if context.optimize_fuse_stages:
if context.optimize_fuse_read_stages:
self._rewrite_read_stages()
self._fuse_one_to_one_stages()
def _rewrite_read_stages(self) -> None:
"""Rewrites read stages into one-to-one stages."""
if self._stages and self._has_read_stage():
block_list, stage = self._rewrite_read_stage()
self._in_blocks = block_list
self._in_stats = DatasetStats(stages={}, parent=None)
self._stages.insert(0, stage)
def _has_read_stage(self) -> bool:
"""Whether this plan has a read stage for its input."""
return isinstance(self._in_blocks, LazyBlockList)
def _is_read_stage(self) -> bool:
"""Whether this plan is a bare read stage."""
return self._has_read_stage() and not self._stages
def _rewrite_read_stage(self) -> Tuple[BlockList, "Stage"]:
"""Rewrite the read stage to a OneToOne stage over read tasks as input.
For example, suppose the plan was [Read -> MapBatches(Fn)]. These stages cannot
be fused, since read stages are handled specially.
After rewriting to [GetReadTasks -> MapBatches(DoRead) -> MapBatches(Fn)],
now we can fuse the latter two MapBatches stages into a single OneToOne stage:
[GetReadTasks -> MapBatches(DoRead -> Fn)].
"""
# Generate the "GetReadTasks" stage blocks.
remote_args = self._in_blocks._remote_args
blocks = []
metadata = []
for read_task in self._in_blocks._tasks:
blocks.append(ray.put(read_task._read_fn))
metadata.append(read_task.get_metadata())
block_list = BlockList(blocks, metadata)
def block_fn(read_fn: Callable[[], Iterator[Block]]) -> Iterator[Block]:
for block in read_fn():
yield block
return block_list, OneToOneStage("read", block_fn, "tasks", remote_args)
def _fuse_one_to_one_stages(self) -> None:
"""Fuses compatible one-to-one stages."""
optimized_stages = []
prev_stage = None
for stage in self._stages:
if prev_stage is None:
prev_stage = stage
elif stage.can_fuse(prev_stage):
prev_stage = stage.fuse(prev_stage)
else:
optimized_stages.append(prev_stage)
prev_stage = stage
if prev_stage:
optimized_stages.append(prev_stage)
prev_stage = None
self._stages = optimized_stages
class Stage:
"""Represents a Dataset transform stage (e.g., map or shuffle)."""
@@ -249,6 +48,296 @@ class Stage:
raise NotImplementedError
class ExecutionPlan:
"""A lazy execution plan for a Dataset."""
# Implementation Notes:
#
# This lazy execution plan takes in an input block list and builds up a chain of
# BlockList --> BlockList stages. When execution is triggered, it tries to fuse
# together stages in order to reduce Ray task overhead and data copies.
#
# Internally, the execution plan holds two block lists:
# * _in_blocks: The (possibly lazy) input block list.
# * _snapshot_blocks: A snapshot of a computed block list, where this snapshot
# is the cached output of executing some prefix in the stage chain.
#
# The stages in this execution plan are partitioned into two subchains: before the
# snapshot and after the snapshot. When the snapshot exists from a previous
# execution, any future executions will only have to execute the "after the
# snapshot" subchain, using the snapshot as the input to that subchain.
def __init__(self, in_blocks: BlockList, stats: DatasetStats, dataset_uuid=None):
"""Create a plan with no transformation stages.
Args:
in_blocks: Base list of blocks.
stats: Stats for the base blocks.
dataset_uuid: Dataset's UUID.
"""
self._in_blocks = in_blocks
self._in_stats = stats
# A computed snapshot of some prefix of stages.
self._snapshot_blocks = None
self._snapshot_stats = None
# Chains of stages.
self._stages_before_snapshot = []
self._stages_after_snapshot = []
# Cache of optimized stages.
self._last_optimized_stages = None
self._dataset_uuid = dataset_uuid or uuid.uuid4().hex
if not stats.dataset_uuid:
stats.dataset_uuid = self._dataset_uuid
def with_stage(self, stage: "Stage") -> "ExecutionPlan":
"""Return a copy of this plan with the given stage appended.
Args:
stage: The stage to append.
Returns:
A new ExecutionPlan with this stage appended.
"""
copy = self.copy()
copy._stages_after_snapshot.append(stage)
return copy
def copy(self) -> "ExecutionPlan":
"""Create a shallow copy of this execution plan.
This copy can be executed without mutating the original, but clearing the copy
will also clear the original.
Returns:
A shallow copy of this execution plan.
"""
plan_copy = ExecutionPlan(self._in_blocks, self._in_stats)
if self._snapshot_blocks is not None:
# Copy over the existing snapshot.
plan_copy._snapshot_blocks = self._snapshot_blocks
plan_copy._snapshot_stats = self._snapshot_stats
plan_copy._stages_before_snapshot = self._stages_before_snapshot.copy()
plan_copy._stages_after_snapshot = self._stages_after_snapshot.copy()
return plan_copy
def deep_copy(self, preserve_uuid: bool = False) -> "ExecutionPlan":
"""Create a deep copy of this execution plan.
This copy can be executed AND cleared without mutating the original.
Args:
preserve_uuid: Whether to preserve the original UUID in the copy.
Returns:
A deep copy of this execution plan.
"""
dataset_uuid = None
if preserve_uuid:
dataset_uuid = self._dataset_uuid
in_blocks = self._in_blocks
if isinstance(in_blocks, BlockList):
in_blocks = in_blocks.copy()
plan_copy = ExecutionPlan(
in_blocks, copy.copy(self._in_stats), dataset_uuid=dataset_uuid
)
if self._snapshot_blocks:
# Copy over the existing snapshot.
plan_copy._snapshot_blocks = self._snapshot_blocks.copy()
plan_copy._snapshot_stats = copy.copy(self._snapshot_stats)
plan_copy._stages_before_snapshot = self._stages_before_snapshot.copy()
plan_copy._stages_after_snapshot = self._stages_after_snapshot.copy()
return plan_copy
def initial_num_blocks(self) -> int:
"""Get the estimated number of blocks after applying all plan stages."""
if self.has_computed_output():
return self._snapshot_blocks.initial_num_blocks()
for stage in self._stages_after_snapshot[::-1]:
if stage.num_blocks is not None:
return stage.num_blocks
if self._snapshot_blocks is not None:
return self._snapshot_blocks.initial_num_blocks()
for stage in self._stages_before_snapshot[::-1]:
if stage.num_blocks is not None:
return stage.num_blocks
if self._in_blocks is not None:
return self._in_blocks.initial_num_blocks()
return None
def schema(
self, fetch_if_missing: bool = False
) -> Union[type, "pyarrow.lib.Schema"]:
"""Get the schema after applying all plan stages.
Args:
fetch_if_missing: Whether to execute the plan to fetch the schema.
Returns:
The schema of the output dataset.
"""
if self._stages_after_snapshot:
if fetch_if_missing:
self.execute()
else:
return None
# Snapshot is now guaranteed to be the output of the final stage or None.
blocks = self._snapshot_blocks
if not blocks:
return None
# Don't force fetching in case it's a lazy block list, in which case we
# don't want to trigger full execution for a schema read. If we want to
# trigger execution to get schema, we'll trigger read tasks progressively
# until a viable schema is available, below.
metadata = blocks.get_metadata(fetch_if_missing=False)
# Some blocks could be empty, in which case we cannot get their schema.
# TODO(ekl) validate schema is the same across different blocks.
for m in metadata:
if m.schema is not None and (m.num_rows is None or m.num_rows > 0):
return m.schema
if not fetch_if_missing:
return None
# Synchronously fetch the schema.
# For lazy block lists, this launches read tasks and fetches block metadata
# until we find valid block schema.
for _, m in blocks.iter_blocks_with_metadata():
if m.schema is not None and (m.num_rows is None or m.num_rows > 0):
return m.schema
return None
def meta_count(self) -> Optional[int]:
"""Get the number of rows after applying all plan stages if possible.
This method will never trigger any computation.
Returns:
The number of records of the result Dataset, or None.
"""
if self._stages_after_snapshot:
return None
# Snapshot is now guaranteed to be the output of the final stage or None.
blocks = self._snapshot_blocks
metadata = blocks.get_metadata() if blocks else None
if metadata and all(m.num_rows is not None for m in metadata):
return sum(m.num_rows for m in metadata)
else:
return None
def execute(
self, clear_input_blocks: bool = True, force_read: bool = False
) -> BlockList:
"""Execute this plan.
Args:
clear_input_blocks: Whether to assume ownership of the input blocks,
allowing them to be dropped from memory during execution.
force_read: Whether to force the read stage to fully execute.
Returns:
The blocks of the output dataset.
"""
if not self.has_computed_output():
blocks, stats, stages = self._optimize()
for stage in stages:
stats_builder = stats.child_builder(stage.name)
blocks, stage_info = stage(blocks, clear_input_blocks)
if stage_info:
stats = stats_builder.build_multistage(stage_info)
else:
stats = stats_builder.build(blocks)
stats.dataset_uuid = uuid.uuid4().hex
# Set the snapshot to the output of the final stage.
self._snapshot_blocks = blocks
self._snapshot_stats = stats
self._snapshot_stats.dataset_uuid = self._dataset_uuid
self._stages_before_snapshot += self._stages_after_snapshot
self._stages_after_snapshot = []
if _is_lazy(self._snapshot_blocks) and force_read:
self._snapshot_blocks = self._snapshot_blocks.compute_to_blocklist()
return self._snapshot_blocks
def clear(self) -> None:
"""Clear all cached block references of this plan, including input blocks.
This will render the plan un-executable unless the root is a LazyBlockList."""
self._in_blocks.clear()
self._snapshot_blocks = None
self._snapshot_stats = None
# We're erasing the snapshot, so put all stages into the "after snapshot"
# bucket.
self._stages_after_snapshot = (
self._stages_before_snapshot + self._stages_after_snapshot
)
self._stages_before_snapshot = []
def stats(self) -> DatasetStats:
"""Return stats for this plan, forcing execution if needed."""
self.execute()
return self._snapshot_stats
def _optimize(self) -> Tuple[BlockList, DatasetStats, List[Stage]]:
"""Apply stage fusion optimizations, returning an updated source block list and
associated stats, and a set of optimized stages.
"""
context = DatasetContext.get_current()
blocks, stats = self._get_source_blocks()
stages = self._stages_after_snapshot.copy()
if context.optimize_fuse_stages:
if context.optimize_fuse_read_stages:
# If using a lazy datasource, rewrite read stage into one-to-one stage
# so it can be fused into downstream stages.
blocks, stats, stages = _rewrite_read_stages(
blocks, stats, stages, self._dataset_uuid
)
stages = _fuse_one_to_one_stages(stages)
self._last_optimized_stages = stages
return blocks, stats, stages
def _get_source_blocks(self) -> Tuple[BlockList, DatasetStats]:
"""Get the source blocks (and corresponding stats) for plan execution.
If a computed snapshot exists, return the snapshot blocks and stats; otherwise,
return the input blocks and stats that the plan was created with.
"""
if self._snapshot_blocks is not None:
# If snapshot exists, we only have to execute the plan from the
# snapshot.
blocks = self._snapshot_blocks
stats = self._snapshot_stats
# Unlink the snapshot blocks from the plan so we can eagerly reclaim the
# snapshot block memory after the first stage is done executing.
self._snapshot_blocks = None
else:
# If no snapshot exists, we have to execute the full plan from the
# beginning.
blocks = self._in_blocks
stats = self._in_stats
if not self.has_lazy_input():
# If not a lazy datasource, unlink the input blocks from the plan so we
# can eagerly reclaim the input block memory after the first stage is
# done executing.
self._in_blocks = None
return blocks, stats
def has_lazy_input(self) -> bool:
"""Return whether this plan has lazy input blocks."""
return _is_lazy(self._in_blocks)
def is_read_stage(self) -> bool:
"""Return whether this plan only consists of a read stage."""
return (
self.has_lazy_input()
and not self._stages_before_snapshot
and not self._stages_after_snapshot
)
def has_computed_output(self) -> bool:
"""Whether this plan has a computed snapshot for the final stage, i.e. for the
output of this plan.
"""
return self._snapshot_blocks is not None and not self._stages_after_snapshot
class OneToOneStage(Stage):
"""A stage that transforms blocks independently (e.g., map or filter)."""
@@ -269,15 +358,15 @@ class OneToOneStage(Stage):
return False
if prev.compute != self.compute:
return False
for key in INHERITABLE_REMOTE_ARGS:
remote_args = self.ray_remote_args.copy()
if key in prev.ray_remote_args:
remote_args[key] = prev.ray_remote_args[key]
if prev.ray_remote_args != remote_args:
if not _are_remote_args_compatible(prev.ray_remote_args, self.ray_remote_args):
return False
return True
def fuse(self, prev: Stage):
if not self.can_fuse(prev):
raise ValueError(
f"Tried to fuse {prev} with {self}, but these are not fusable."
)
name = prev.name + "->" + self.name
fn1 = prev.block_fn
fn2 = self.block_fn
@@ -334,6 +423,10 @@ class AllToAllStage(Stage):
return True
def fuse(self, prev: Stage):
if not self.can_fuse(prev):
raise ValueError(
f"Tried to fuse {prev} with {self}, but these are not fusable."
)
assert self.supports_block_udf
name = prev.name + "->" + self.name
return AllToAllStage(
@@ -348,3 +441,93 @@
)
assert isinstance(blocks, BlockList), blocks
return blocks, stage_info
def _rewrite_read_stages(
blocks: BlockList,
stats: DatasetStats,
stages: List[Stage],
dataset_uuid: str,
) -> Tuple[BlockList, DatasetStats, List[Stage]]:
"""Rewrites read stages into one-to-one stages, if needed."""
if _is_lazy(blocks) and stages:
blocks, stats, stage = _rewrite_read_stage(blocks)
stats.dataset_uuid = dataset_uuid
stages.insert(0, stage)
return blocks, stats, stages
def _rewrite_read_stage(
in_blocks: LazyBlockList,
) -> Tuple[BlockList, DatasetStats, Stage]:
"""Rewrite the read stage to a OneToOne stage over read tasks as input.
For example, suppose the plan was [Read -> MapBatches(Fn)]. These stages cannot
be fused, since read stages are handled specially.
After rewriting to [GetReadTasks -> MapBatches(DoRead) -> MapBatches(Fn)],
now we can fuse the latter two MapBatches stages into a single OneToOne stage:
[GetReadTasks -> MapBatches(DoRead -> Fn)].
Args:
in_blocks: Lazy block list representing the read stage.
Returns:
Non-lazy block list containing read tasks for not-yet-read block partitions,
new stats for the block list, and the new one-to-one read stage.
"""
# Generate the "GetReadTasks" stage blocks.
remote_args = in_blocks._remote_args
blocks, metadata = [], []
for read_task in in_blocks._tasks:
blocks.append(ray.put(read_task._read_fn))
metadata.append(read_task.get_metadata())
block_list = BlockList(blocks, metadata)
def block_fn(read_fn: Callable[[], Iterator[Block]]) -> Iterator[Block]:
for block in read_fn():
yield block
stage = OneToOneStage("read", block_fn, "tasks", remote_args)
stats = DatasetStats(stages={}, parent=None)
return block_list, stats, stage
def _fuse_one_to_one_stages(stages: List[Stage]) -> List[Stage]:
"""Fuses compatible one-to-one stages.
Args:
stages: Stages to try to fuse.
Returns:
Fused stages.
"""
fused_stages = []
prev_stage = None
for idx, stage in enumerate(stages):
if prev_stage is None:
prev_stage = stage
elif stage.can_fuse(prev_stage):
prev_stage = stage.fuse(prev_stage)
else:
fused_stages.append(prev_stage)
prev_stage = stage
if prev_stage:
fused_stages.append(prev_stage)
prev_stage = None
return fused_stages
def _are_remote_args_compatible(prev_args, next_args):
"""Check if Ray remote arguments are compatible for merging."""
remote_args = next_args.copy()
for key in INHERITABLE_REMOTE_ARGS:
if key in prev_args:
remote_args[key] = prev_args[key]
if prev_args != remote_args:
return False
return True
def _is_lazy(blocks: BlockList) -> bool:
"""Whether the provided block list is lazy."""
return isinstance(blocks, LazyBlockList)


@@ -87,11 +87,67 @@ def test_spread_hint_inherit(ray_start_regular_shared):
ds = ray.data.range(10)._experimental_lazy()
ds = ds.map(lambda x: x + 1)
ds = ds.random_shuffle()
for s in ds._plan._stages:
for s in ds._plan._stages_before_snapshot:
assert s.ray_remote_args == {}, s.ray_remote_args
ds._plan._optimize()
assert len(ds._plan._stages) == 1, ds._plan._stages
assert ds._plan._stages[0].ray_remote_args == {"scheduling_strategy": "SPREAD"}
for s in ds._plan._stages_after_snapshot:
assert s.ray_remote_args == {}, s.ray_remote_args
_, _, optimized_stages = ds._plan._optimize()
assert len(optimized_stages) == 1, optimized_stages
assert optimized_stages[0].ray_remote_args == {"scheduling_strategy": "SPREAD"}
def test_execution_preserves_original(ray_start_regular_shared):
ds = ray.data.range(10).map(lambda x: x + 1)._experimental_lazy()
ds1 = ds.map(lambda x: x + 1)
assert ds1._plan._snapshot_blocks is not None
assert len(ds1._plan._stages_after_snapshot) == 1
ds2 = ds1.fully_executed()
# Confirm that snapshot blocks and stages on original lazy dataset have not changed.
assert ds1._plan._snapshot_blocks is not None
assert len(ds1._plan._stages_after_snapshot) == 1
# Confirm that UUID on executed dataset is properly set.
assert ds2._get_uuid() == ds1._get_uuid()
# Check content.
assert ds2.take() == list(range(2, 12))
# Check original lazy dataset content.
assert ds1.take() == list(range(2, 12))
# Check source lazy dataset content.
# TODO(Clark): Uncomment this when we add new block clearing semantics.
# assert ds.take() == list(range(1, 11))
def _assert_has_stages(stages, stage_names):
assert len(stages) == len(stage_names)
for stage, name in zip(stages, stage_names):
assert stage.name == name
def test_stage_linking(ray_start_regular_shared):
# NOTE: This tests the internals of `ExecutionPlan`, which is bad practice. Remove
# this test once we have proper unit testing of `ExecutionPlan`.
# Test eager dataset.
ds = ray.data.range(10)
assert len(ds._plan._stages_before_snapshot) == 0
assert len(ds._plan._stages_after_snapshot) == 0
assert len(ds._plan._last_optimized_stages) == 0
ds = ds.map(lambda x: x + 1)
_assert_has_stages(ds._plan._stages_before_snapshot, ["map"])
assert len(ds._plan._stages_after_snapshot) == 0
_assert_has_stages(ds._plan._last_optimized_stages, ["read->map"])
# Test lazy dataset.
ds = ray.data.range(10)._experimental_lazy()
assert len(ds._plan._stages_before_snapshot) == 0
assert len(ds._plan._stages_after_snapshot) == 0
assert len(ds._plan._last_optimized_stages) == 0
ds = ds.map(lambda x: x + 1)
assert len(ds._plan._stages_before_snapshot) == 0
_assert_has_stages(ds._plan._stages_after_snapshot, ["map"])
assert ds._plan._last_optimized_stages is None
ds = ds.fully_executed()
_assert_has_stages(ds._plan._stages_before_snapshot, ["map"])
assert len(ds._plan._stages_after_snapshot) == 0
_assert_has_stages(ds._plan._last_optimized_stages, ["read->map"])
def test_optimize_fuse(ray_start_regular_shared):
@@ -220,7 +276,7 @@ class MySource(CSVDatasource):
def _read_stream(self, f, path: str, **reader_args):
count = self.counter.increment.remote()
ray.get(count)
for block in CSVDatasource._read_stream(self, f, path, **reader_args):
for block in super()._read_stream(f, path, **reader_args):
yield block