mirror of
https://github.com/vale981/ray
synced 2025-03-06 02:21:39 -05:00
[Datasets] Rename _experimental_lazy() --> experimental_lazy(). (#24321)
The experimental_ prefix should suffice, we shouldn't also need to make it a private method.
This commit is contained in:
parent
0825078a20
commit
14f2729b3a
4 changed files with 22 additions and 15 deletions
|
@ -2884,10 +2884,17 @@ List[str]]]): The names of the columns to use as the features. Can be a list of
|
||||||
"""
|
"""
|
||||||
return self._plan.execute().get_blocks()
|
return self._plan.execute().get_blocks()
|
||||||
|
|
||||||
def _experimental_lazy(self) -> "Dataset[T]":
|
def experimental_lazy(self) -> "Dataset[T]":
|
||||||
"""Enable lazy evaluation (experimental)."""
|
"""EXPERIMENTAL: Enable lazy evaluation.
|
||||||
self._lazy = True
|
|
||||||
return self
|
The returned dataset is a lazy dataset, where all subsequent operations on the
|
||||||
|
dataset won't be executed until the dataset is consumed (e.g. ``.take()``,
|
||||||
|
``.iter_batches()``, ``.to_torch()``, ``.to_tf()``, etc.) or execution is
|
||||||
|
manually triggered via ``.fully_executed()``.
|
||||||
|
"""
|
||||||
|
ds = Dataset(self._plan, self._epoch, lazy=True)
|
||||||
|
ds._set_uuid(self._get_uuid())
|
||||||
|
return ds
|
||||||
|
|
||||||
def has_serializable_lineage(self) -> bool:
|
def has_serializable_lineage(self) -> bool:
|
||||||
"""Whether this dataset's lineage is able to be serialized for storage and
|
"""Whether this dataset's lineage is able to be serialized for storage and
|
||||||
|
|
|
@ -41,7 +41,7 @@ def maybe_pipeline(ds, enabled):
|
||||||
|
|
||||||
def maybe_lazy(ds, enabled):
|
def maybe_lazy(ds, enabled):
|
||||||
if enabled:
|
if enabled:
|
||||||
return ds._experimental_lazy()
|
return ds.experimental_lazy()
|
||||||
else:
|
else:
|
||||||
return ds
|
return ds
|
||||||
|
|
||||||
|
|
|
@ -50,7 +50,7 @@ def test_memory_release_lazy(shutdown_only):
|
||||||
ds = ray.data.range(10)
|
ds = ray.data.range(10)
|
||||||
|
|
||||||
# Should get fused into single stage.
|
# Should get fused into single stage.
|
||||||
ds = ds._experimental_lazy()
|
ds = ds.experimental_lazy()
|
||||||
ds = ds.map(lambda x: np.ones(100 * 1024 * 1024, dtype=np.uint8))
|
ds = ds.map(lambda x: np.ones(100 * 1024 * 1024, dtype=np.uint8))
|
||||||
ds = ds.map(lambda x: np.ones(100 * 1024 * 1024, dtype=np.uint8))
|
ds = ds.map(lambda x: np.ones(100 * 1024 * 1024, dtype=np.uint8))
|
||||||
ds = ds.map(lambda x: np.ones(100 * 1024 * 1024, dtype=np.uint8))
|
ds = ds.map(lambda x: np.ones(100 * 1024 * 1024, dtype=np.uint8))
|
||||||
|
@ -69,7 +69,7 @@ def test_memory_release_lazy_shuffle(shutdown_only):
|
||||||
ds = ray.data.range(10)
|
ds = ray.data.range(10)
|
||||||
|
|
||||||
# Should get fused into single stage.
|
# Should get fused into single stage.
|
||||||
ds = ds._experimental_lazy()
|
ds = ds.experimental_lazy()
|
||||||
ds = ds.map(lambda x: np.ones(100 * 1024 * 1024, dtype=np.uint8))
|
ds = ds.map(lambda x: np.ones(100 * 1024 * 1024, dtype=np.uint8))
|
||||||
ds.random_shuffle().fully_executed()
|
ds.random_shuffle().fully_executed()
|
||||||
meminfo = memory_summary(info.address_info["address"], stats_only=True)
|
meminfo = memory_summary(info.address_info["address"], stats_only=True)
|
||||||
|
@ -84,7 +84,7 @@ def test_memory_release_lazy_shuffle(shutdown_only):
|
||||||
|
|
||||||
|
|
||||||
def test_spread_hint_inherit(ray_start_regular_shared):
|
def test_spread_hint_inherit(ray_start_regular_shared):
|
||||||
ds = ray.data.range(10)._experimental_lazy()
|
ds = ray.data.range(10).experimental_lazy()
|
||||||
ds = ds.map(lambda x: x + 1)
|
ds = ds.map(lambda x: x + 1)
|
||||||
ds = ds.random_shuffle()
|
ds = ds.random_shuffle()
|
||||||
for s in ds._plan._stages_before_snapshot:
|
for s in ds._plan._stages_before_snapshot:
|
||||||
|
@ -97,7 +97,7 @@ def test_spread_hint_inherit(ray_start_regular_shared):
|
||||||
|
|
||||||
|
|
||||||
def test_execution_preserves_original(ray_start_regular_shared):
|
def test_execution_preserves_original(ray_start_regular_shared):
|
||||||
ds = ray.data.range(10).map(lambda x: x + 1)._experimental_lazy()
|
ds = ray.data.range(10).map(lambda x: x + 1).experimental_lazy()
|
||||||
ds1 = ds.map(lambda x: x + 1)
|
ds1 = ds.map(lambda x: x + 1)
|
||||||
assert ds1._plan._snapshot_blocks is not None
|
assert ds1._plan._snapshot_blocks is not None
|
||||||
assert len(ds1._plan._stages_after_snapshot) == 1
|
assert len(ds1._plan._stages_after_snapshot) == 1
|
||||||
|
@ -136,7 +136,7 @@ def test_stage_linking(ray_start_regular_shared):
|
||||||
_assert_has_stages(ds._plan._last_optimized_stages, ["read->map"])
|
_assert_has_stages(ds._plan._last_optimized_stages, ["read->map"])
|
||||||
|
|
||||||
# Test lazy dataset.
|
# Test lazy dataset.
|
||||||
ds = ray.data.range(10)._experimental_lazy()
|
ds = ray.data.range(10).experimental_lazy()
|
||||||
assert len(ds._plan._stages_before_snapshot) == 0
|
assert len(ds._plan._stages_before_snapshot) == 0
|
||||||
assert len(ds._plan._stages_after_snapshot) == 0
|
assert len(ds._plan._stages_after_snapshot) == 0
|
||||||
assert len(ds._plan._last_optimized_stages) == 0
|
assert len(ds._plan._last_optimized_stages) == 0
|
||||||
|
@ -328,7 +328,7 @@ def test_optimize_lazy_reuse_base_data(
|
||||||
ds = ray.data.read_datasource(source, parallelism=4, paths=paths)
|
ds = ray.data.read_datasource(source, parallelism=4, paths=paths)
|
||||||
num_reads = ray.get(counter.get.remote())
|
num_reads = ray.get(counter.get.remote())
|
||||||
assert num_reads == 1, num_reads
|
assert num_reads == 1, num_reads
|
||||||
ds = ds._experimental_lazy()
|
ds = ds.experimental_lazy()
|
||||||
ds = ds.map(lambda x: x)
|
ds = ds.map(lambda x: x)
|
||||||
if with_shuffle:
|
if with_shuffle:
|
||||||
ds = ds.random_shuffle()
|
ds = ds.random_shuffle()
|
||||||
|
|
|
@ -33,8 +33,8 @@ def gen_dataset_func() -> Dataset:
|
||||||
|
|
||||||
|
|
||||||
def test_grid_search():
|
def test_grid_search():
|
||||||
ds1 = gen_dataset_func()._experimental_lazy().map(lambda x: x)
|
ds1 = gen_dataset_func().experimental_lazy().map(lambda x: x)
|
||||||
ds2 = gen_dataset_func()._experimental_lazy().map(lambda x: x)
|
ds2 = gen_dataset_func().experimental_lazy().map(lambda x: x)
|
||||||
assert not ds1._plan._has_final_stage_snapshot()
|
assert not ds1._plan._has_final_stage_snapshot()
|
||||||
assert not ds2._plan._has_final_stage_snapshot()
|
assert not ds2._plan._has_final_stage_snapshot()
|
||||||
param_space = {"train_dataset": tune.grid_search([ds1, ds2])}
|
param_space = {"train_dataset": tune.grid_search([ds1, ds2])}
|
||||||
|
@ -46,8 +46,8 @@ def test_grid_search():
|
||||||
|
|
||||||
|
|
||||||
def test_choice():
|
def test_choice():
|
||||||
ds1 = gen_dataset_func()._experimental_lazy().map(lambda x: x)
|
ds1 = gen_dataset_func().experimental_lazy().map(lambda x: x)
|
||||||
ds2 = gen_dataset_func()._experimental_lazy().map(lambda x: x)
|
ds2 = gen_dataset_func().experimental_lazy().map(lambda x: x)
|
||||||
assert not ds1._plan._has_final_stage_snapshot()
|
assert not ds1._plan._has_final_stage_snapshot()
|
||||||
assert not ds2._plan._has_final_stage_snapshot()
|
assert not ds2._plan._has_final_stage_snapshot()
|
||||||
param_space = {"train_dataset": tune.choice([ds1, ds2])}
|
param_space = {"train_dataset": tune.choice([ds1, ds2])}
|
||||||
|
|
Loading…
Add table
Reference in a new issue