Revert "[data] set iter_batches default batch_size #26869 " (#26938)

This reverts commit b048c6f659.
Author: matthewdeng (2022-07-23 17:46:45 -07:00), committed by GitHub
parent da9581b746
commit bcec60d898
GPG key ID: 4AEE18F83AFDEB23
10 changed files with 32 additions and 58 deletions
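For context, a minimal sketch (not part of this commit) of the behavior the revert restores, assuming a 150-row dataset split across 10 blocks, which matches the numbers in the doc example below:

    import ray

    # Assumed setup: 150 rows in 10 blocks of 15 rows each.
    ds = ray.data.range(150, parallelism=10)

    # Restored default (batch_size=None): one batch per underlying block.
    assert sum(1 for _ in ds.iter_batches()) == 10

    # An explicit batch_size still re-bundles rows across blocks: 150 / 10 = 15 batches.
    assert sum(1 for _ in ds.iter_batches(batch_size=10)) == 15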

View file

@@ -110,12 +110,12 @@ transformed_ds.show()
@ray.remote
def consume(data) -> int:
num_batches = 0
-for batch in data.iter_batches(batch_size=10):
+for batch in data.iter_batches():
num_batches += 1
return num_batches
ray.get(consume.remote(ds))
-# -> 15
+# -> 10
# __data_access_end__
# fmt: on

View file

@@ -31,7 +31,7 @@ class DummyTrainer(DataParallelTrainer):
scaling_config: Optional[ScalingConfig] = None,
num_epochs: int = 1,
prefetch_blocks: int = 1,
-batch_size: Optional[int] = 4096,
+batch_size: Optional[int] = None,
**kwargs
):
if not scaling_config:

View file

@@ -360,10 +360,8 @@ class Dataset(Generic[T]):
fn: The function to apply to each record batch, or a class type
that can be instantiated to create such a callable. Callable classes are
only supported for the actor compute strategy.
-batch_size: The number of rows in each batch, or None to use entire blocks
-as batches (blocks may contain different number of rows).
-The final batch may include fewer than ``batch_size`` rows.
-Defaults to 4096.
+batch_size: Request a specific batch size, or None to use entire
+blocks as batches. Defaults to a system-chosen batch size.
compute: The compute strategy, either "tasks" (default) to use Ray
tasks, or "actors" to use an autoscaling actor pool. If wanting to
configure the min or max size of the autoscaling actor pool, you can
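An illustrative sketch of the two batch_size modes described by the restored map_batches docstring above; the dataset and column name are hypothetical, not taken from the diff:

    import pandas as pd
    import ray

    # Three pandas blocks of six rows each.
    ds = ray.data.from_pandas([pd.DataFrame({"x": range(6)}) for _ in range(3)])

    # batch_size=None: the function sees one whole block per call (three calls here).
    ds.map_batches(lambda df: df.assign(y=df["x"] * 2), batch_size=None, batch_format="pandas")

    # An explicit batch_size re-bundles rows into batches of up to four rows.
    ds.map_batches(lambda df: df.assign(y=df["x"] * 2), batch_size=4, batch_format="pandas")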
@@ -2342,7 +2340,7 @@ class Dataset(Generic[T]):
else "native"
)
for batch in self.iter_batches(
-batch_size=None, prefetch_blocks=prefetch_blocks, batch_format=batch_format
+prefetch_blocks=prefetch_blocks, batch_format=batch_format
):
batch = BlockAccessor.for_block(batch)
for row in batch.iter_rows():
@@ -2352,7 +2350,7 @@ class Dataset(Generic[T]):
self,
*,
prefetch_blocks: int = 0,
-batch_size: Optional[int] = 256,
+batch_size: Optional[int] = None,
batch_format: str = "native",
drop_last: bool = False,
local_shuffle_buffer_size: Optional[int] = None,
@@ -2370,10 +2368,7 @@ class Dataset(Generic[T]):
Args:
prefetch_blocks: The number of blocks to prefetch ahead of the
current block during the scan.
-batch_size: The number of rows in each batch, or None to use entire blocks
-as batches (blocks may contain different number of rows).
-The final batch may include fewer than ``batch_size`` rows if
-``drop_last`` is ``False``. Defaults to 256.
+batch_size: Record batch size, or None to let the system pick.
batch_format: The format in which to return each batch.
Specify "native" to use the native block format (promoting
tables to Pandas and tensors to NumPy), "pandas" to select
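A short sketch of iter_batches with the restored default versus an explicit batch_size plus drop_last, as described in the docstring above (row counts are hypothetical):

    import pandas as pd
    import ray

    # Four pandas blocks of 100 rows each.
    ds = ray.data.from_pandas([pd.DataFrame({"x": range(100)}) for _ in range(4)])

    # Restored default: one batch per block, in the requested format.
    assert sum(1 for _ in ds.iter_batches(batch_format="pandas")) == 4

    # Explicit batch_size with drop_last: 400 // 128 = 3 full batches, remainder dropped.
    assert sum(1 for _ in ds.iter_batches(batch_size=128, drop_last=True)) == 3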
@@ -2413,7 +2408,7 @@ class Dataset(Generic[T]):
self,
*,
prefetch_blocks: int = 0,
-batch_size: Optional[int] = 256,
+batch_size: Optional[int] = None,
dtypes: Optional[Union["torch.dtype", Dict[str, "torch.dtype"]]] = None,
device: Optional[str] = None,
drop_last: bool = False,
@@ -2443,10 +2438,7 @@ class Dataset(Generic[T]):
Args:
prefetch_blocks: The number of blocks to prefetch ahead of the
current block during the scan.
-batch_size: The number of rows in each batch, or None to use entire blocks
-as batches (blocks may contain different number of rows).
-The final batch may include fewer than ``batch_size`` rows if
-``drop_last`` is ``False``. Defaults to 256.
+batch_size: Record batch size, or None to let the system pick.
dtypes: The Torch dtype(s) for the created tensor(s); if None, the dtype
will be inferred from the tensor data.
device: The device on which the tensor should be placed; if None, the Torch
@@ -2487,7 +2479,7 @@ class Dataset(Generic[T]):
self,
*,
prefetch_blocks: int = 0,
-batch_size: Optional[int] = 256,
+batch_size: Optional[int] = None,
dtypes: Optional[Union["tf.dtypes.DType", Dict[str, "tf.dtypes.DType"]]] = None,
drop_last: bool = False,
local_shuffle_buffer_size: Optional[int] = None,
@@ -2517,10 +2509,7 @@ class Dataset(Generic[T]):
Args:
prefetch_blocks: The number of blocks to prefetch ahead of the
current block during the scan.
-batch_size: The number of rows in each batch, or None to use entire blocks
-as batches (blocks may contain different number of rows).
-The final batch may include fewer than ``batch_size`` rows if
-``drop_last`` is ``False``. Defaults to 256.
+batch_size: Record batch size, or None to let the system pick.
dtypes: The TensorFlow dtype(s) for the created tensor(s); if None, the
dtype will be inferred from the tensor data.
drop_last: Whether to drop the last batch if it's incomplete.
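The two hunks above change the Torch and TensorFlow batch iterators in the same way. A hedged sketch, assuming the Torch iterator is Dataset.iter_torch_batches (the method name is not visible in these hunks) and that torch is installed:

    import pandas as pd
    import ray

    ds = ray.data.from_pandas([pd.DataFrame({"x": range(32)})])

    # Assumption: the method changed above is iter_torch_batches.
    # With the restored default, each yielded batch of tensors maps to one block;
    # pass batch_size explicitly to get fixed-size tensor batches instead.
    for batch in ds.iter_torch_batches(batch_size=8, drop_last=True):
        pass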

View file

@@ -135,7 +135,7 @@ class DatasetPipeline(Generic[T]):
self,
*,
prefetch_blocks: int = 0,
-batch_size: Optional[int] = 256,
+batch_size: int = None,
batch_format: str = "native",
drop_last: bool = False,
local_shuffle_buffer_size: Optional[int] = None,
@@ -154,10 +154,7 @@ class DatasetPipeline(Generic[T]):
Args:
prefetch_blocks: The number of blocks to prefetch ahead of the
current block during the scan.
-batch_size: The number of rows in each batch, or None to use entire blocks
-as batches (blocks may contain different number of rows).
-The final batch may include fewer than ``batch_size`` rows if
-``drop_last`` is ``False``. Defaults to 256.
+batch_size: Record batch size, or None to let the system pick.
batch_format: The format in which to return each batch.
Specify "native" to use the current block format (promoting
Arrow to pandas automatically), "pandas" to
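A sketch of the pipeline-side default restored above, mirroring the pipeline test further down in this commit:

    import ray

    # Ten one-row blocks, windowed two blocks at a time.
    pipe = ray.data.range(10, parallelism=10).window(blocks_per_window=2)

    # Restored default: one batch per block, so ten single-row batches.
    assert len(list(pipe.iter_batches())) == 10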
@@ -943,7 +940,7 @@ class DatasetPipeline(Generic[T]):
self,
*,
prefetch_blocks: int = 0,
-batch_size: Optional[int] = 256,
+batch_size: Optional[int] = None,
batch_format: str = "native",
drop_last: bool = False,
local_shuffle_buffer_size: Optional[int] = None,
@@ -965,7 +962,7 @@ class DatasetPipeline(Generic[T]):
self,
*,
prefetch_blocks: int = 0,
-batch_size: Optional[int] = 256,
+batch_size: Optional[int] = None,
batch_format: str = "native",
drop_last: bool = False,
local_shuffle_buffer_size: Optional[int] = None,

View file

@@ -404,7 +404,7 @@ def _get_unique_value_indices(
value_counts = dataset.map_batches(get_pd_value_counts, batch_format="pandas")
final_counters = {col: Counter() for col in columns}
-for batch in value_counts.iter_batches(batch_size=None):
+for batch in value_counts.iter_batches():
for col_value_counts in batch:
for col, value_counts in col_value_counts.items():
final_counters[col] += value_counts

View file

@@ -93,7 +93,7 @@ def _get_most_frequent_values(
value_counts = dataset.map_batches(get_pd_value_counts, batch_format="pandas")
final_counters = {col: Counter() for col in columns}
-for batch in value_counts.iter_batches(batch_size=None):
+for batch in value_counts.iter_batches():
for col_value_counts in batch:
for col, value_counts in col_value_counts.items():
final_counters[col] += value_counts

View file

@@ -100,7 +100,7 @@ class CountVectorizer(Preprocessor):
value_counts = dataset.map_batches(get_pd_value_counts, batch_format="pandas")
total_counts = [Counter() for _ in self.columns]
-for batch in value_counts.iter_batches(batch_size=None):
+for batch in value_counts.iter_batches():
for i, col_value_counts in enumerate(batch):
total_counts[i].update(col_value_counts)

View file

@@ -1531,17 +1531,17 @@ def test_iter_batches_basic(ray_start_regular_shared):
ds = ray.data.from_pandas(dfs)
# Default.
-for batch, df in zip(ds.iter_batches(batch_size=None, batch_format="pandas"), dfs):
+for batch, df in zip(ds.iter_batches(batch_format="pandas"), dfs):
assert isinstance(batch, pd.DataFrame)
assert batch.equals(df)
# pyarrow.Table format.
-for batch, df in zip(ds.iter_batches(batch_size=None, batch_format="pyarrow"), dfs):
+for batch, df in zip(ds.iter_batches(batch_format="pyarrow"), dfs):
assert isinstance(batch, pa.Table)
assert batch.equals(pa.Table.from_pandas(df))
# NumPy format.
-for batch, df in zip(ds.iter_batches(batch_size=None, batch_format="numpy"), dfs):
+for batch, df in zip(ds.iter_batches(batch_format="numpy"), dfs):
assert isinstance(batch, dict)
assert list(batch.keys()) == ["one", "two"]
assert all(isinstance(col, np.ndarray) for col in batch.values())
@@ -1549,14 +1549,14 @@ def test_iter_batches_basic(ray_start_regular_shared):
# Test NumPy format on Arrow blocks.
ds2 = ds.map_batches(lambda b: b, batch_size=None, batch_format="pyarrow")
-for batch, df in zip(ds2.iter_batches(batch_size=None, batch_format="numpy"), dfs):
+for batch, df in zip(ds2.iter_batches(batch_format="numpy"), dfs):
assert isinstance(batch, dict)
assert list(batch.keys()) == ["one", "two"]
assert all(isinstance(col, np.ndarray) for col in batch.values())
pd.testing.assert_frame_equal(pd.DataFrame(batch), df)
# Native format.
-for batch, df in zip(ds.iter_batches(batch_size=None, batch_format="native"), dfs):
+for batch, df in zip(ds.iter_batches(batch_format="native"), dfs):
assert BlockAccessor.for_block(batch).to_pandas().equals(df)
# Batch size.
@@ -1616,9 +1616,7 @@ def test_iter_batches_basic(ray_start_regular_shared):
)
# Prefetch.
-batches = list(
-ds.iter_batches(prefetch_blocks=1, batch_size=None, batch_format="pandas")
-)
+batches = list(ds.iter_batches(prefetch_blocks=1, batch_format="pandas"))
assert len(batches) == len(dfs)
for batch, df in zip(batches, dfs):
assert isinstance(batch, pd.DataFrame)
@@ -1637,11 +1635,7 @@ def test_iter_batches_basic(ray_start_regular_shared):
)
# Prefetch more than number of blocks.
-batches = list(
-ds.iter_batches(
-prefetch_blocks=len(dfs), batch_size=None, batch_format="pandas"
-)
-)
+batches = list(ds.iter_batches(prefetch_blocks=len(dfs), batch_format="pandas"))
assert len(batches) == len(dfs)
for batch, df in zip(batches, dfs):
assert isinstance(batch, pd.DataFrame)
@@ -1650,9 +1644,7 @@ def test_iter_batches_basic(ray_start_regular_shared):
# Prefetch with ray.wait.
context = DatasetContext.get_current()
context.actor_prefetcher_enabled = False
-batches = list(
-ds.iter_batches(prefetch_blocks=1, batch_size=None, batch_format="pandas")
-)
+batches = list(ds.iter_batches(prefetch_blocks=1, batch_format="pandas"))
assert len(batches) == len(dfs)
for batch, df in zip(batches, dfs):
assert isinstance(batch, pd.DataFrame)
@@ -1665,11 +1657,7 @@ def test_iter_batches_local_shuffle(shutdown_only, pipelined, ds_format):
# Input validation.
# Batch size must be given for local shuffle.
with pytest.raises(ValueError):
-list(
-ray.data.range(100).iter_batches(
-batch_size=None, local_shuffle_buffer_size=10
-)
-)
+list(ray.data.range(100).iter_batches(local_shuffle_buffer_size=10))
def range(n, parallelism=200):
if ds_format == "simple":
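A sketch of the constraint this test exercises: local shuffling still needs an explicit batch_size once the default goes back to None. The buffer size of 25 below is an arbitrary illustrative value:

    import pytest
    import ray

    # With the restored default (batch_size=None), local shuffle is rejected.
    with pytest.raises(ValueError):
        list(ray.data.range(100).iter_batches(local_shuffle_buffer_size=10))

    # Supplying a batch size alongside the shuffle buffer works.
    for batch in ray.data.range(100).iter_batches(batch_size=10, local_shuffle_buffer_size=25):
        pass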
@@ -1958,7 +1946,7 @@ def test_iter_batches_grid(ray_start_regular_shared):
def test_lazy_loading_iter_batches_exponential_rampup(ray_start_regular_shared):
ds = ray.data.range(32, parallelism=8)
expected_num_blocks = [1, 2, 4, 4, 8, 8, 8, 8]
-for _, expected in zip(ds.iter_batches(batch_size=None), expected_num_blocks):
+for _, expected in zip(ds.iter_batches(), expected_num_blocks):
assert ds._plan.execute()._num_computed() == expected

View file

@@ -402,7 +402,7 @@ def test_repartition(ray_start_regular_shared):
def test_iter_batches_basic(ray_start_regular_shared):
pipe = ray.data.range(10, parallelism=10).window(blocks_per_window=2)
-batches = list(pipe.iter_batches(batch_size=None))
+batches = list(pipe.iter_batches())
assert len(batches) == 10
assert all(len(e) == 1 for e in batches)

View file

@@ -10,7 +10,7 @@ from ray.tests.conftest import * # noqa
def check_no_spill(ctx, pipe, prefetch_blocks: int = 0):
# Run .iter_batches() for 10 secs, and we expect no object spilling.
end_time = time.time() + 10
-for batch in pipe.iter_batches(batch_size=None, prefetch_blocks=prefetch_blocks):
+for batch in pipe.iter_batches(prefetch_blocks=prefetch_blocks):
if time.time() > end_time:
break
meminfo = memory_summary(ctx.address_info["address"], stats_only=True)
@@ -127,7 +127,7 @@ def test_pipeline_splitting_has_no_spilling(shutdown_only):
@ray.remote
def consume(p):
-for batch in p.iter_batches(batch_size=None):
+for batch in p.iter_batches():
pass
tasks = [consume.remote(p1), consume.remote(p2)]