Revert "Support creating a DatasetPipeline windowed by bytes (#22577)" (#22695)

This reverts commit b5b4460932.
commit ba4f1423c7 (parent 82daf2b041)
SangBin Cho authored 2022-03-01 04:56:12 +09:00; committed by GitHub
5 changed files with 2 additions and 158 deletions


@@ -106,16 +106,6 @@ Tune the throughput vs latency of your pipeline with the ``blocks_per_window`` setting.
 
 .. image:: images/dataset-pipeline-3.svg
 
-You can also specify the size of each window using ``bytes_per_window``. In this mode, Datasets will determine the size of each window based on the target byte size, giving each window at least 1 block but not otherwise exceeding the target bytes per window. This mode can be useful to limit the memory usage of a pipeline. As a rule of thumb, the cluster memory should be at least 2-5x the window size to avoid spilling.
-
-.. code-block:: python
-
-    # Create a DatasetPipeline with up to 10GB of data per window.
-    pipe: DatasetPipeline = ray.data \
-        .read_binary_files("s3://bucket/image-dir") \
-        .window(bytes_per_window=10e9)
-    # -> INFO -- Created DatasetPipeline with 73 windows: 9120MiB min, 9431MiB max, 9287MiB mean
-
 .. _dataset-pipeline-per-epoch-shuffle:
 
 Per-Epoch Shuffle Pipeline
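For context, windowing by a fixed number of blocks is the mode that remains documented after this revert. A minimal sketch of that usage, mirroring the removed example but switched to ``blocks_per_window`` (the S3 path is just the placeholder used in the docs above):

    import ray
    from ray.data.dataset_pipeline import DatasetPipeline

    # Create a DatasetPipeline with a fixed number of blocks per window.
    pipe: DatasetPipeline = ray.data \
        .read_binary_files("s3://bucket/image-dir") \
        .window(blocks_per_window=10)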


@@ -2575,12 +2575,7 @@ Dict[str, List[str]]]): The names of the columns
             "Use .window(blocks_per_window=n) instead of " ".pipeline(parallelism=n)"
         )
 
-    def window(
-        self,
-        *,
-        blocks_per_window: Optional[int] = None,
-        bytes_per_window: Optional[int] = None,
-    ) -> "DatasetPipeline[T]":
+    def window(self, *, blocks_per_window: int = 10) -> "DatasetPipeline[T]":
         """Convert this into a DatasetPipeline by windowing over data blocks.
 
         Transformations prior to the call to ``window()`` are evaluated in
@@ -2626,19 +2621,9 @@ Dict[str, List[str]]]): The names of the columns
                 increases the latency to initial output, since it decreases the
                 length of the pipeline. Setting this to infinity effectively
                 disables pipelining.
-            bytes_per_window: Specify the window size in bytes instead of blocks.
-                This will be treated as an upper bound for the window size, but each
-                window will still include at least one block. This is mutually
-                exclusive with ``blocks_per_window``.
         """
         from ray.data.dataset_pipeline import DatasetPipeline
 
-        if blocks_per_window is not None and bytes_per_window is not None:
-            raise ValueError("Only one windowing scheme can be specified.")
-
-        if blocks_per_window is None:
-            blocks_per_window = 10
-
         # If optimizations are enabled, rewrite the read stage into a OneToOneStage
         # to enable fusion with downstream map stages.
         ctx = DatasetContext.get_current()
@@ -2671,37 +2656,7 @@ Dict[str, List[str]]]): The names of the columns
 
         class Iterable:
             def __init__(self, blocks, epoch):
-                if bytes_per_window:
-                    self._splits = blocks.split_by_bytes(bytes_per_window)
-                else:
-                    self._splits = blocks.split(split_size=blocks_per_window)
-                try:
-                    sizes = [s.size_bytes() for s in self._splits]
-                    assert [s > 0 for s in sizes], sizes
-
-                    def fmt(size_bytes):
-                        if size_bytes > 10 * 1024:
-                            return "{}MiB".format(round(size_bytes / (1024 * 1024), 2))
-                        else:
-                            return "{}b".format(size_bytes)
-
-                    logger.info(
-                        "Created DatasetPipeline with {} windows: "
-                        "{} min, {} max, {} mean".format(
-                            len(self._splits),
-                            fmt(min(sizes)),
-                            fmt(max(sizes)),
-                            fmt(int(np.mean(sizes))),
-                        )
-                    )
-                except Exception as e:
-                    logger.info(
-                        "Created DatasetPipeline with {} windows; "
-                        "error getting sizes: {}".format(
-                            len(self._splits),
-                            e,
-                        )
-                    )
+                self._splits = blocks.split(split_size=blocks_per_window)
                 self._epoch = epoch
 
             def __iter__(self):
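The validation removed above made the two windowing schemes mutually exclusive and fell back to a default of 10 blocks per window. A standalone sketch of that keyword-only argument pattern (illustration only, not Ray code; the function name is made up):

    from typing import Optional


    def make_window_config(
        *,
        blocks_per_window: Optional[int] = None,
        bytes_per_window: Optional[int] = None,
    ) -> str:
        # At most one windowing scheme may be given, mirroring the reverted check.
        if blocks_per_window is not None and bytes_per_window is not None:
            raise ValueError("Only one windowing scheme can be specified.")
        if bytes_per_window is not None:
            return f"split by bytes: {bytes_per_window}"
        if blocks_per_window is None:
            blocks_per_window = 10  # default used when neither argument is given
        return f"split by blocks: {blocks_per_window}"


    print(make_window_config())                                # split by blocks: 10
    print(make_window_config(bytes_per_window=10_000_000_000)) # split by bytes: 10000000000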


@@ -65,48 +65,6 @@ class BlockList:
             output.append(BlockList(b.tolist(), m.tolist()))
         return output
 
-    def split_by_bytes(self, bytes_per_split: int) -> List["BlockList"]:
-        """Split this BlockList into multiple lists.
-
-        Args:
-            bytes_per_split: The max number of bytes per split.
-        """
-        self._check_if_cleared()
-        output = []
-        cur_blocks = []
-        cur_meta = []
-        cur_size = 0
-        for b, m in zip(self._blocks, self._metadata):
-            if m.size_bytes is None:
-                raise RuntimeError(
-                    "Block has unknown size, cannot use split_by_bytes()"
-                )
-            size = m.size_bytes
-            if cur_blocks and cur_size + size > bytes_per_split:
-                output.append(BlockList(cur_blocks, cur_meta))
-                cur_blocks = []
-                cur_meta = []
-                cur_size = 0
-            cur_blocks.append(b)
-            cur_meta.append(m)
-            cur_size += size
-        if cur_blocks:
-            output.append(BlockList(cur_blocks, cur_meta))
-        return output
-
-    def size_bytes(self) -> int:
-        """Returns the total size in bytes of the blocks, or -1 if not known."""
-        size = 0
-        has_size = False
-        for m in self.get_metadata():
-            if m.size_bytes is not None:
-                has_size = True
-                size += m.size_bytes
-        if not has_size:
-            return -1
-        else:
-            return size
-
     def divide(self, block_idx: int) -> ("BlockList", "BlockList"):
         """Divide into two BlockLists by the given block index.


@@ -67,30 +67,6 @@ class LazyBlockList(BlockList):
             output.append(LazyBlockList(c.tolist(), m.tolist(), b.tolist()))
         return output
 
-    # Note: does not force execution prior to splitting.
-    def split_by_bytes(self, bytes_per_split: int) -> List["BlockList"]:
-        self._check_if_cleared()
-        output = []
-        cur_calls, cur_meta, cur_blocks = [], [], []
-        cur_size = 0
-        for c, m, b in zip(self._calls, self._metadata, self._block_partitions):
-            if m.size_bytes is None:
-                raise RuntimeError(
-                    "Block has unknown size, cannot use split_by_bytes()"
-                )
-            size = m.size_bytes
-            if cur_blocks and cur_size + size > bytes_per_split:
-                output.append(LazyBlockList(cur_calls, cur_meta, cur_blocks))
-                cur_calls, cur_meta, cur_blocks = [], [], []
-                cur_size = 0
-            cur_calls.append(c)
-            cur_meta.append(m)
-            cur_blocks.append(b)
-            cur_size += size
-        if cur_blocks:
-            output.append(LazyBlockList(cur_calls, cur_meta, cur_blocks))
-        return output
-
     # Note: does not force execution prior to division.
     def divide(self, part_idx: int) -> ("LazyBlockList", "LazyBlockList"):
         left = LazyBlockList(

@@ -38,41 +38,6 @@ def test_incremental_take(shutdown_only):
     assert pipe.take(1) == [0]
 
 
-def test_window_by_bytes(ray_start_regular_shared):
-    with pytest.raises(ValueError):
-        ray.data.range_arrow(10).window(blocks_per_window=2, bytes_per_window=2)
-
-    pipe = ray.data.range_arrow(10000000, parallelism=100).window(blocks_per_window=2)
-    assert str(pipe) == "DatasetPipeline(num_windows=50, num_stages=1)"
-
-    pipe = ray.data.range_arrow(10000000, parallelism=100).window(
-        bytes_per_window=10 * 1024 * 1024
-    )
-    assert str(pipe) == "DatasetPipeline(num_windows=8, num_stages=1)"
-    dss = list(pipe.iter_datasets())
-    assert len(dss) == 8, dss
-    for ds in dss[:-1]:
-        assert ds.num_blocks() in [12, 13]
-
-    pipe = ray.data.range_arrow(10000000, parallelism=100).window(bytes_per_window=1)
-    assert str(pipe) == "DatasetPipeline(num_windows=100, num_stages=1)"
-    for ds in pipe.iter_datasets():
-        assert ds.num_blocks() == 1
-
-    pipe = ray.data.range_arrow(10000000, parallelism=100).window(bytes_per_window=1e9)
-    assert str(pipe) == "DatasetPipeline(num_windows=1, num_stages=1)"
-    for ds in pipe.iter_datasets():
-        assert ds.num_blocks() == 100
-
-    # Test creating from non-lazy BlockList.
-    pipe = (
-        ray.data.range_arrow(10000000, parallelism=100)
-        .map_batches(lambda x: x)
-        .window(bytes_per_window=10 * 1024 * 1024)
-    )
-    assert str(pipe) == "DatasetPipeline(num_windows=8, num_stages=1)"
-
-
 def test_epoch(ray_start_regular_shared):
     # Test dataset repeat.
     pipe = ray.data.range(5).map(lambda x: x * 2).repeat(3).map(lambda x: x * 2)