Move ray.data out of experimental (#17560)

Author: Eric Liang, 2021-08-04 13:31:10 -07:00 (committed by GitHub)
parent 63708468df
commit d4f9d3620e
44 changed files with 221 additions and 230 deletions
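For context, a minimal sketch (not part of the commit itself) of the user-facing rename, assuming a Ray build that includes this change: dataset creation that previously went through the alpha `ray.experimental.data` module is now reached via the top-level `ray.data` module, with the same API surface.

```python
import ray

ray.init()

# Before this commit, the Datasets API lived under ray.experimental.data:
#   ds = ray.experimental.data.range(10000)

# After this commit, the same calls live at ray.data:
ds = ray.data.range(10000)
print(ds.map(lambda x: x * 2).take(5))  # -> [0, 2, 4, 6, 8]
```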


@@ -353,7 +353,7 @@
   commands:
     - cleanup() { if [ "${BUILDKITE_PULL_REQUEST}" = "false" ]; then ./ci/travis/upload_build_info.sh; fi }; trap cleanup EXIT
     - DATA_PROCESSING_TESTING=1 ./ci/travis/install-dependencies.sh
-    - bazel test --config=ci $(./scripts/bazel_export_options) python/ray/experimental/workflow/... python/ray/experimental/data/...
+    - bazel test --config=ci $(./scripts/bazel_export_options) python/ray/experimental/workflow/... python/ray/data/...
 - label: ":book: Doc tests and examples"
   conditions:


@@ -5,7 +5,7 @@ Datasets: Distributed Arrow on Ray
 .. tip::
-    Ray Datasets is available in early preview at ``ray.experimental.data``.
+    Ray Datasets is available in early preview at ``ray.data``.
 Ray Datasets are the standard way to load and exchange data in Ray libraries and applications. Datasets provide basic distributed data transformations such as ``map``, ``filter``, and ``repartition``, and are compatible with a variety of file formats, datasources, and distributed frameworks.
@@ -57,6 +57,9 @@ Datasource Compatibility Matrices
    * - Binary Files
      - ``ray.data.read_binary_files()``
      - ✅
+   * - Python Objects
+     - ``ray.data.from_items()``
+     - ✅
    * - Spark Dataframe
      - ``ray.data.from_spark()``
      - (todo)
@@ -219,17 +222,17 @@ Datasets can be transformed in parallel using ``.map()``. Transformations are ex
     ds = ray.data.range(10000)
     ds = ds.map(lambda x: x * 2)
-    # -> Map Progress: 100%|█████████████████████████| 200/200 [00:00<00:00, 1123.54it/s]
+    # -> Map Progress: 100%|████████████████████| 200/200 [00:00<00:00, 1123.54it/s]
     # -> Dataset(num_blocks=200, num_rows=10000, schema=<class 'int'>)
     ds.take(5)
     # -> [0, 2, 4, 6, 8]
     ds.filter(lambda x: x > 5).take(5)
-    # -> Map Progress: 100%|█████████████████████████| 200/200 [00:00<00:00, 1859.63it/s]
+    # -> Map Progress: 100%|████████████████████| 200/200 [00:00<00:00, 1859.63it/s]
     # -> [6, 8, 10, 12, 14]
     ds.flat_map(lambda x: [x, -x]).take(5)
-    # -> Map Progress: 100%|█████████████████████████| 200/200 [00:00<00:00, 1568.10it/s]
+    # -> Map Progress: 100%|████████████████████| 200/200 [00:00<00:00, 1568.10it/s]
     # -> [0, 0, 2, -2, 4]
 To take advantage of vectorized functions, use ``.map_batches()``. Note that you can also implement ``filter`` and ``flat_map`` using ``.map_batches()``, since your map function can return an output batch of any size.
@@ -237,8 +240,9 @@ To take advantage of vectorized functions, use ``.map_batches()``. Note that you
 .. code-block:: python
     ds = ray.data.range_arrow(10000)
-    ds = ds.map_batches(lambda df: df.applymap(lambda x: x * 2), batch_format="pandas")
-    # -> Map Progress: 100%|█████████████████████████| 200/200 [00:00<00:00, 1927.62it/s]
+    ds = ds.map_batches(
+        lambda df: df.applymap(lambda x: x * 2), batch_format="pandas")
+    # -> Map Progress: 100%|████████████████████| 200/200 [00:00<00:00, 1927.62it/s]
     ds.take(5)
     # -> [ArrowRow({'value': 0}), ArrowRow({'value': 2}), ...]
@@ -260,12 +264,12 @@ By default, transformations are executed using Ray tasks. For transformations th
     # Preprocess the data.
     ds = ds.map(preprocess)
-    # -> Map Progress: 100%|█████████████████████████| 200/200 [00:00<00:00, 1123.54it/s]
+    # -> Map Progress: 100%|████████████████████| 200/200 [00:00<00:00, 1123.54it/s]
     # Apply GPU batch inference with actors, and assign each actor a GPU using
     # ``num_gpus=1`` (any Ray remote decorator argument can be used here).
     ds = ds.map_batches(BatchInferModel, compute="actors", batch_size=256, num_gpus=1)
-    # -> Map Progress (16 actors 4 pending): 100%|█████| 200/200 [00:07<00:00, 27.60it/s]
+    # -> Map Progress (16 actors 4 pending): 100%|█████| 200/200 [00:07, 27.60it/s]
     # Save the results.
     ds.repartition(1).write_json("s3://bucket/inference-results")
@@ -324,7 +328,8 @@ Datasets support tensor-typed values, which are represented in-memory as Arrow t
     # Create a Dataset of tensor-typed values.
     ds = ray.data.range_tensor(10000, shape=(3, 5))
-    # -> Dataset(num_blocks=200, num_rows=10000, schema=<Tensor: shape=(None, 3, 5), dtype=int64>)
+    # -> Dataset(num_blocks=200, num_rows=10000,
+    #            schema=<Tensor: shape=(None, 3, 5), dtype=int64>)
     ds.map_batches(lambda t: t + 2).show(2)
     # -> [[2 2 2 2 2]
@@ -339,7 +344,8 @@ Datasets support tensor-typed values, which are represented in-memory as Arrow t
     # Read from storage.
     ray.data.read_numpy("/tmp/tensor_out")
-    # -> Dataset(num_blocks=200, num_rows=?, schema=<Tensor: shape=(None, 3, 5), dtype=int64>)
+    # -> Dataset(num_blocks=200, num_rows=?,
+    #            schema=<Tensor: shape=(None, 3, 5), dtype=int64>)
 Tensor datasets are also created whenever an array type is returned from a map function:
@@ -351,7 +357,8 @@ Tensor datasets are also created whenever an array type is returned from a map f
     # It is now converted into a Tensor dataset.
     ds = ds.map_batches(lambda x: np.array(x))
-    # -> Dataset(num_blocks=10, num_rows=10, schema=<Tensor: shape=(None,), dtype=int64>)
+    # -> Dataset(num_blocks=10, num_rows=10,
+    #            schema=<Tensor: shape=(None,), dtype=int64>)
 Limitations: currently tensor-typed values cannot be nested in tabular records (e.g., as in TFRecord / Petastorm format). This is planned for development.


@@ -13,6 +13,7 @@ Creating a Dataset
 .. autofunction:: ray.data.read_text
 .. autofunction:: ray.data.read_binary_files
 .. autofunction:: ray.data.read_datasource
+.. autofunction:: ray.data.from_items
 .. autofunction:: ray.data.from_arrow
 .. autofunction:: ray.data.from_spark
 .. autofunction:: ray.data.from_dask
@@ -29,7 +30,7 @@ Dataset API
 DatasetPipeline API
 -------------------
-.. autoclass:: ray.experimental.data.dataset_pipeline.DatasetPipeline
+.. autoclass:: ray.data.dataset_pipeline.DatasetPipeline
     :members:
 Custom Datasource API


@@ -98,6 +98,7 @@ import ray.actor # noqa: E402,F401
 from ray.actor import method # noqa: E402
 from ray.cross_language import java_function, java_actor_class # noqa: E402
 from ray.runtime_context import get_runtime_context # noqa: E402
+from ray import data # noqa: E402,F401
 from ray import util # noqa: E402
 # We import ClientBuilder so that modules can inherit from `ray.ClientBuilder`.
 from ray.client_builder import client, ClientBuilder # noqa: E402
@@ -112,6 +113,7 @@ __all__ = [
     "client",
     "ClientBuilder",
     "cluster_resources",
+    "data"
     "get",
     "get_actor",
     "get_gpu_ids",
@@ -151,11 +153,6 @@ __all__ += [
     "PlacementGroupID",
 ]
-# Add an alias so we can point to the final location in docs.
-# TODO(ekl) remove this once datasets is out of alpha.
-from ray.experimental import data # noqa
-__all__.append(data)
 # Add an alias so we can point to the final location in docs.
 # TODO(yic) remove this once workflow is out of alpha.
 from ray.experimental import workflow # noqa


@@ -1,5 +1,5 @@
 # --------------------------------------------------------------------
-# Tests from the python/ray/experimental/data/tests directory.
+# Tests from the python/ray/data/tests directory.
 # Covers all tests starting with `test_`.
 # Please keep these sorted alphabetically.
 # --------------------------------------------------------------------


@@ -1,10 +1,10 @@
-from ray.experimental.data.read_api import from_items, range, range_arrow, \
+from ray.data.read_api import from_items, range, range_arrow, \
     range_tensor, read_parquet, read_json, read_csv, read_binary_files, \
     from_dask, from_modin, from_mars, from_pandas, from_arrow, from_spark, \
     read_datasource, read_numpy, read_text
-from ray.experimental.data.datasource import Datasource, ReadTask, WriteTask
-from ray.experimental.data.dataset import Dataset
-from ray.experimental.data.impl.progress_bar import set_progress_bars
+from ray.data.datasource import Datasource, ReadTask, WriteTask
+from ray.data.dataset import Dataset
+from ray.data.impl.progress_bar import set_progress_bars
 # Module-level cached global functions (for impl/compute). It cannot be defined
 # in impl/compute since it has to be process-global across cloudpickled funcs.


@ -6,7 +6,7 @@ import numpy as np
if TYPE_CHECKING: if TYPE_CHECKING:
import pandas import pandas
import pyarrow import pyarrow
from ray.experimental.data.impl.block_builder import BlockBuilder from ray.data.impl.block_builder import BlockBuilder
from ray.util.annotations import DeveloperAPI from ray.util.annotations import DeveloperAPI
@ -115,15 +115,15 @@ class BlockAccessor(Generic[T]):
import pyarrow import pyarrow
if isinstance(block, pyarrow.Table): if isinstance(block, pyarrow.Table):
from ray.experimental.data.impl.arrow_block import \ from ray.data.impl.arrow_block import \
ArrowBlockAccessor ArrowBlockAccessor
return ArrowBlockAccessor(block) return ArrowBlockAccessor(block)
elif isinstance(block, list): elif isinstance(block, list):
from ray.experimental.data.impl.simple_block import \ from ray.data.impl.simple_block import \
SimpleBlockAccessor SimpleBlockAccessor
return SimpleBlockAccessor(block) return SimpleBlockAccessor(block)
elif isinstance(block, np.ndarray): elif isinstance(block, np.ndarray):
from ray.experimental.data.impl.tensor_block import \ from ray.data.impl.tensor_block import \
TensorBlockAccessor TensorBlockAccessor
return TensorBlockAccessor(block) return TensorBlockAccessor(block)
else: else:


@ -14,7 +14,7 @@ if TYPE_CHECKING:
import ray.util.sgd import ray.util.sgd
import torch import torch
import tensorflow as tf import tensorflow as tf
from ray.experimental.data.dataset_pipeline import DatasetPipeline from ray.data.dataset_pipeline import DatasetPipeline
import collections import collections
import itertools import itertools
@ -23,17 +23,17 @@ import numpy as np
import ray import ray
from ray.types import ObjectRef from ray.types import ObjectRef
from ray.util.annotations import DeveloperAPI, PublicAPI from ray.util.annotations import DeveloperAPI, PublicAPI
from ray.experimental.data.block import Block, BlockAccessor, BlockMetadata from ray.data.block import Block, BlockAccessor, BlockMetadata
from ray.experimental.data.datasource import Datasource, WriteTask from ray.data.datasource import Datasource, WriteTask
from ray.experimental.data.impl.remote_fn import cached_remote_fn from ray.data.impl.remote_fn import cached_remote_fn
from ray.experimental.data.impl.batcher import Batcher from ray.data.impl.batcher import Batcher
from ray.experimental.data.impl.compute import get_compute, cache_wrapper, \ from ray.data.impl.compute import get_compute, cache_wrapper, \
CallableClass CallableClass
from ray.experimental.data.impl.progress_bar import ProgressBar from ray.data.impl.progress_bar import ProgressBar
from ray.experimental.data.impl.shuffle import simple_shuffle from ray.data.impl.shuffle import simple_shuffle
from ray.experimental.data.impl.sort import sort_impl from ray.data.impl.sort import sort_impl
from ray.experimental.data.impl.block_list import BlockList from ray.data.impl.block_list import BlockList
from ray.experimental.data.impl.arrow_block import DelegatingArrowBlockBuilder from ray.data.impl.arrow_block import DelegatingArrowBlockBuilder
T = TypeVar("T") T = TypeVar("T")
U = TypeVar("U") U = TypeVar("U")
@ -1073,7 +1073,7 @@ class Dataset(Generic[T]):
""" """
import torch import torch
from ray.experimental.data.impl.torch_iterable_dataset import \ from ray.data.impl.torch_iterable_dataset import \
TorchIterableDataset TorchIterableDataset
if feature_columns and feature_column_dtypes: if feature_columns and feature_column_dtypes:
@ -1303,7 +1303,7 @@ class Dataset(Generic[T]):
times: The number of times to loop over this dataset, or None times: The number of times to loop over this dataset, or None
to repeat indefinitely. to repeat indefinitely.
""" """
from ray.experimental.data.dataset_pipeline import DatasetPipeline from ray.data.dataset_pipeline import DatasetPipeline
if times is not None and times < 1: if times is not None and times < 1:
raise ValueError("`times` must be >= 1, got {}".format(times)) raise ValueError("`times` must be >= 1, got {}".format(times))
@ -1375,7 +1375,7 @@ class Dataset(Generic[T]):
length of the pipeline. Setting this to infinity effectively length of the pipeline. Setting this to infinity effectively
disables pipelining. disables pipelining.
""" """
from ray.experimental.data.dataset_pipeline import DatasetPipeline from ray.data.dataset_pipeline import DatasetPipeline
class Iterator: class Iterator:
def __init__(self, splits): def __init__(self, splits):


@ -4,10 +4,10 @@ from typing import Any, Callable, List, Iterator, Iterable, Generic, Union, \
TYPE_CHECKING TYPE_CHECKING
import ray import ray
from ray.experimental.data.dataset import Dataset, T, U, BatchType from ray.data.dataset import Dataset, T, U, BatchType
from ray.experimental.data.impl.pipeline_executor import PipelineExecutor, \ from ray.data.impl.pipeline_executor import PipelineExecutor, \
PipelineSplitExecutorCoordinator PipelineSplitExecutorCoordinator
from ray.experimental.data.impl import progress_bar from ray.data.impl import progress_bar
from ray.util.annotations import PublicAPI, DeveloperAPI from ray.util.annotations import PublicAPI, DeveloperAPI
if TYPE_CHECKING: if TYPE_CHECKING:


@ -0,0 +1,24 @@
from ray.data.datasource.datasource import (
Datasource, RangeDatasource, DummyOutputDatasource, ReadTask, WriteTask)
from ray.data.datasource.json_datasource import JSONDatasource
from ray.data.datasource.csv_datasource import CSVDatasource
from ray.data.datasource.numpy_datasource import NumpyDatasource
from ray.data.datasource.parquet_datasource import (ParquetDatasource)
from ray.data.datasource.binary_datasource import BinaryDatasource
from ray.data.datasource.file_based_datasource import (FileBasedDatasource,
_S3FileSystemWrapper)
__all__ = [
"JSONDatasource",
"CSVDatasource",
"NumpyDatasource",
"ParquetDatasource",
"BinaryDatasource",
"FileBasedDatasource",
"_S3FileSystemWrapper",
"Datasource",
"RangeDatasource",
"DummyOutputDatasource",
"ReadTask",
"WriteTask",
]


@@ -3,8 +3,7 @@ from typing import TYPE_CHECKING
 if TYPE_CHECKING:
     import pyarrow
-from ray.experimental.data.datasource.file_based_datasource import (
-    FileBasedDatasource)
+from ray.data.datasource.file_based_datasource import (FileBasedDatasource)
 class BinaryDatasource(FileBasedDatasource):


@@ -3,8 +3,7 @@ from typing import TYPE_CHECKING
 if TYPE_CHECKING:
     import pyarrow
-from ray.experimental.data.datasource.file_based_datasource import (
-    FileBasedDatasource)
+from ray.data.datasource.file_based_datasource import (FileBasedDatasource)
 class CSVDatasource(FileBasedDatasource):


@ -5,9 +5,9 @@ import numpy as np
import ray import ray
from ray.types import ObjectRef from ray.types import ObjectRef
from ray.experimental.data.block import Block, BlockAccessor, \ from ray.data.block import Block, BlockAccessor, \
BlockMetadata, T BlockMetadata, T
from ray.experimental.data.impl.arrow_block import ArrowRow from ray.data.impl.arrow_block import ArrowRow
from ray.util.annotations import PublicAPI from ray.util.annotations import PublicAPI
WriteResult = Any WriteResult = Any


@@ -5,10 +5,9 @@ from urllib.parse import urlparse
 if TYPE_CHECKING:
     import pyarrow
-from ray.experimental.data.impl.arrow_block import (
-    ArrowRow, DelegatingArrowBlockBuilder)
-from ray.experimental.data.impl.block_list import BlockMetadata
-from ray.experimental.data.datasource.datasource import Datasource, ReadTask
+from ray.data.impl.arrow_block import (ArrowRow, DelegatingArrowBlockBuilder)
+from ray.data.impl.block_list import BlockMetadata
+from ray.data.datasource.datasource import Datasource, ReadTask
 from ray.util.annotations import DeveloperAPI
 logger = logging.getLogger(__name__)


@@ -3,8 +3,7 @@ from typing import TYPE_CHECKING
 if TYPE_CHECKING:
     import pyarrow
-from ray.experimental.data.datasource.file_based_datasource import (
-    FileBasedDatasource)
+from ray.data.datasource.file_based_datasource import (FileBasedDatasource)
 class JSONDatasource(FileBasedDatasource):


@@ -6,8 +6,7 @@ import numpy as np
 if TYPE_CHECKING:
     import pyarrow
-from ray.experimental.data.datasource.file_based_datasource import (
-    FileBasedDatasource)
+from ray.data.datasource.file_based_datasource import (FileBasedDatasource)
 class NumpyDatasource(FileBasedDatasource):


@ -4,10 +4,10 @@ from typing import Optional, List, Union, TYPE_CHECKING
if TYPE_CHECKING: if TYPE_CHECKING:
import pyarrow import pyarrow
from ray.experimental.data.impl.arrow_block import ArrowRow from ray.data.impl.arrow_block import ArrowRow
from ray.experimental.data.impl.block_list import BlockMetadata from ray.data.impl.block_list import BlockMetadata
from ray.experimental.data.datasource.datasource import Datasource, ReadTask from ray.data.datasource.datasource import Datasource, ReadTask
from ray.experimental.data.datasource.file_based_datasource import ( from ray.data.datasource.file_based_datasource import (
_resolve_paths_and_filesystem) _resolve_paths_and_filesystem)
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)


@@ -3,7 +3,7 @@ import time
 ray.init(num_gpus=2)
-ds = ray.experimental.data.range(100)
+ds = ray.data.range(100)
 def preprocess(x):


@@ -3,7 +3,7 @@ import ray
 ray.init()
-ds = ray.experimental.data.from_items(range(200))
+ds = ray.data.from_items(range(200))
 def slow(x):


@@ -2,5 +2,5 @@ import ray
 ray.init()
-ds = ray.experimental.data.range(100000000)
+ds = ray.data.range(100000000)
 ds.repartition(1000)


@ -10,10 +10,10 @@ try:
except ImportError: except ImportError:
pyarrow = None pyarrow = None
from ray.experimental.data.block import Block, BlockAccessor, BlockMetadata from ray.data.block import Block, BlockAccessor, BlockMetadata
from ray.experimental.data.impl.block_builder import BlockBuilder from ray.data.impl.block_builder import BlockBuilder
from ray.experimental.data.impl.simple_block import SimpleBlockBuilder from ray.data.impl.simple_block import SimpleBlockBuilder
from ray.experimental.data.impl.tensor_block import TensorBlockBuilder from ray.data.impl.tensor_block import TensorBlockBuilder
if TYPE_CHECKING: if TYPE_CHECKING:
import pandas import pandas


@ -1,7 +1,7 @@
from typing import Optional from typing import Optional
from ray.experimental.data.block import Block, BlockAccessor from ray.data.block import Block, BlockAccessor
from ray.experimental.data.impl.arrow_block import DelegatingArrowBlockBuilder from ray.data.impl.arrow_block import DelegatingArrowBlockBuilder
class Batcher: class Batcher:


@ -1,6 +1,6 @@
from typing import Generic from typing import Generic
from ray.experimental.data.block import Block, T from ray.data.block import Block, T
class BlockBuilder(Generic[T]): class BlockBuilder(Generic[T]):


@ -4,7 +4,7 @@ from typing import Iterable, List
import numpy as np import numpy as np
from ray.types import ObjectRef from ray.types import ObjectRef
from ray.experimental.data.block import Block, BlockMetadata from ray.data.block import Block, BlockMetadata
class BlockList(Iterable[ObjectRef[Block]]): class BlockList(Iterable[ObjectRef[Block]]):


@ -2,10 +2,10 @@ from typing import TypeVar, Iterable, Any, Union, Callable
import ray import ray
from ray.types import ObjectRef from ray.types import ObjectRef
from ray.experimental.data.block import Block, BlockAccessor, BlockMetadata from ray.data.block import Block, BlockAccessor, BlockMetadata
from ray.experimental.data.impl.block_list import BlockList from ray.data.impl.block_list import BlockList
from ray.experimental.data.impl.progress_bar import ProgressBar from ray.data.impl.progress_bar import ProgressBar
from ray.experimental.data.impl.remote_fn import cached_remote_fn from ray.data.impl.remote_fn import cached_remote_fn
T = TypeVar("T") T = TypeVar("T")
U = TypeVar("U") U = TypeVar("U")


@ -4,8 +4,8 @@ from typing import Callable, List
import numpy as np import numpy as np
from ray.types import ObjectRef from ray.types import ObjectRef
from ray.experimental.data.block import Block, BlockMetadata, T from ray.data.block import Block, BlockMetadata, T
from ray.experimental.data.impl.block_list import BlockList from ray.data.impl.block_list import BlockList
class LazyBlockList(BlockList[T]): class LazyBlockList(BlockList[T]):


@ -1,13 +1,13 @@
from typing import Any, Callable, List, Optional, TYPE_CHECKING from typing import Any, Callable, List, Optional, TYPE_CHECKING
import ray import ray
from ray.experimental.data.dataset import Dataset, T from ray.data.dataset import Dataset, T
from ray.experimental.data.impl.progress_bar import ProgressBar, \ from ray.data.impl.progress_bar import ProgressBar, \
set_progress_bars set_progress_bars
from ray.types import ObjectRef from ray.types import ObjectRef
if TYPE_CHECKING: if TYPE_CHECKING:
from ray.experimental.data.dataset_pipeline import DatasetPipeline from ray.data.dataset_pipeline import DatasetPipeline
@ray.remote @ray.remote


@ -4,11 +4,11 @@ from typing import TypeVar, List, Optional
import numpy as np import numpy as np
import ray import ray
from ray.experimental.data.block import Block, BlockAccessor, BlockMetadata from ray.data.block import Block, BlockAccessor, BlockMetadata
from ray.experimental.data.impl.progress_bar import ProgressBar from ray.data.impl.progress_bar import ProgressBar
from ray.experimental.data.impl.block_list import BlockList from ray.data.impl.block_list import BlockList
from ray.experimental.data.impl.arrow_block import DelegatingArrowBlockBuilder from ray.data.impl.arrow_block import DelegatingArrowBlockBuilder
from ray.experimental.data.impl.remote_fn import cached_remote_fn from ray.data.impl.remote_fn import cached_remote_fn
T = TypeVar("T") T = TypeVar("T")


@ -9,8 +9,8 @@ if TYPE_CHECKING:
import pandas import pandas
import pyarrow import pyarrow
from ray.experimental.data.impl.block_builder import BlockBuilder from ray.data.impl.block_builder import BlockBuilder
from ray.experimental.data.block import Block, BlockAccessor, BlockMetadata, T from ray.data.block import Block, BlockAccessor, BlockMetadata, T
# A simple block can be sorted by value (None) or a lambda function (Callable). # A simple block can be sorted by value (None) or a lambda function (Callable).
SortKeyT = Union[None, Callable[[T], Any]] SortKeyT = Union[None, Callable[[T], Any]]


@ -20,10 +20,10 @@ from typing import List, Any, Callable, TypeVar, Tuple, Union
import numpy as np import numpy as np
import ray import ray
from ray.experimental.data.block import Block, BlockAccessor from ray.data.block import Block, BlockAccessor
from ray.experimental.data.impl.block_list import BlockList from ray.data.impl.block_list import BlockList
from ray.experimental.data.impl.progress_bar import ProgressBar from ray.data.impl.progress_bar import ProgressBar
from ray.experimental.data.impl.remote_fn import cached_remote_fn from ray.data.impl.remote_fn import cached_remote_fn
T = TypeVar("T") T = TypeVar("T")


@ -6,8 +6,8 @@ if TYPE_CHECKING:
import pandas import pandas
import pyarrow import pyarrow
from ray.experimental.data.block import Block, BlockAccessor from ray.data.block import Block, BlockAccessor
from ray.experimental.data.impl.block_builder import BlockBuilder from ray.data.impl.block_builder import BlockBuilder
T = TypeVar("T") T = TypeVar("T")


@ -14,16 +14,16 @@ if TYPE_CHECKING:
import ray import ray
from ray.types import ObjectRef from ray.types import ObjectRef
from ray.util.annotations import PublicAPI from ray.util.annotations import PublicAPI
from ray.experimental.data.block import Block, BlockAccessor, BlockMetadata from ray.data.block import Block, BlockAccessor, BlockMetadata
from ray.experimental.data.dataset import Dataset from ray.data.dataset import Dataset
from ray.experimental.data.datasource import Datasource, RangeDatasource, \ from ray.data.datasource import Datasource, RangeDatasource, \
JSONDatasource, CSVDatasource, ParquetDatasource, BinaryDatasource, \ JSONDatasource, CSVDatasource, ParquetDatasource, BinaryDatasource, \
NumpyDatasource, ReadTask NumpyDatasource, ReadTask
from ray.experimental.data.impl.arrow_block import ArrowRow, \ from ray.data.impl.arrow_block import ArrowRow, \
DelegatingArrowBlockBuilder DelegatingArrowBlockBuilder
from ray.experimental.data.impl.block_list import BlockList from ray.data.impl.block_list import BlockList
from ray.experimental.data.impl.lazy_block_list import LazyBlockList from ray.data.impl.lazy_block_list import LazyBlockList
from ray.experimental.data.impl.remote_fn import cached_remote_fn from ray.data.impl.remote_fn import cached_remote_fn
T = TypeVar("T") T = TypeVar("T")


@ -16,9 +16,9 @@ from fsspec.implementations.local import LocalFileSystem
import ray import ray
from ray.tests.conftest import * # noqa from ray.tests.conftest import * # noqa
from ray.experimental.data.datasource import DummyOutputDatasource from ray.data.datasource import DummyOutputDatasource
from ray.experimental.data.block import BlockAccessor from ray.data.block import BlockAccessor
import ray.experimental.data.tests.util as util import ray.data.tests.util as util
def maybe_pipeline(ds, enabled): def maybe_pipeline(ds, enabled):
@ -31,7 +31,7 @@ def maybe_pipeline(ds, enabled):
@pytest.mark.parametrize("pipelined", [False, True]) @pytest.mark.parametrize("pipelined", [False, True])
def test_basic_actors(shutdown_only, pipelined): def test_basic_actors(shutdown_only, pipelined):
ray.init(num_cpus=2) ray.init(num_cpus=2)
ds = ray.experimental.data.range(5) ds = ray.data.range(5)
ds = maybe_pipeline(ds, pipelined) ds = maybe_pipeline(ds, pipelined)
assert sorted(ds.map(lambda x: x + 1, assert sorted(ds.map(lambda x: x + 1,
compute="actors").take()) == [1, 2, 3, 4, 5] compute="actors").take()) == [1, 2, 3, 4, 5]
@ -64,7 +64,7 @@ def test_equal_split(shutdown_only, pipelined):
def test_callable_classes(shutdown_only): def test_callable_classes(shutdown_only):
ray.init(num_cpus=1) ray.init(num_cpus=1)
ds = ray.experimental.data.range(10) ds = ray.data.range(10)
class StatefulFn: class StatefulFn:
def __init__(self): def __init__(self):
@ -120,7 +120,7 @@ def test_callable_classes(shutdown_only):
@pytest.mark.parametrize("pipelined", [False, True]) @pytest.mark.parametrize("pipelined", [False, True])
def test_basic(ray_start_regular_shared, pipelined): def test_basic(ray_start_regular_shared, pipelined):
ds = ray.experimental.data.range(5) ds = ray.data.range(5)
ds = maybe_pipeline(ds, pipelined) ds = maybe_pipeline(ds, pipelined)
assert sorted(ds.map(lambda x: x + 1).take()) == [1, 2, 3, 4, 5] assert sorted(ds.map(lambda x: x + 1).take()) == [1, 2, 3, 4, 5]
assert ds.count() == 5 assert ds.count() == 5
@ -132,7 +132,7 @@ def test_basic(ray_start_regular_shared, pipelined):
# def test_avoid_placement_group_capture(ray_start_regular_shared, pipelined): # def test_avoid_placement_group_capture(ray_start_regular_shared, pipelined):
# @ray.remote # @ray.remote
# def run(): # def run():
# ds = ray.experimental.data.range(5) # ds = ray.data.range(5)
# ds = maybe_pipeline(ds, pipelined) # ds = maybe_pipeline(ds, pipelined)
# assert sorted(ds.map(lambda x: x + 1).take()) == [1, 2, 3, 4, 5] # assert sorted(ds.map(lambda x: x + 1).take()) == [1, 2, 3, 4, 5]
# assert ds.count() == 5 # assert ds.count() == 5
@@ -144,8 +144,7 @@ def test_basic(ray_start_regular_shared, pipelined):
 def test_batch_tensors(ray_start_regular_shared):
     import torch
-    ds = ray.experimental.data.from_items(
-        [torch.tensor([0, 0]) for _ in range(40)])
+    ds = ray.data.from_items([torch.tensor([0, 0]) for _ in range(40)])
     res = "Dataset(num_blocks=40, num_rows=40, schema=<class 'torch.Tensor'>)"
     assert str(ds) == res, str(ds)
     with pytest.raises(pa.lib.ArrowInvalid):
@ -156,7 +155,7 @@ def test_batch_tensors(ray_start_regular_shared):
def test_tensors(ray_start_regular_shared): def test_tensors(ray_start_regular_shared):
# Create directly. # Create directly.
ds = ray.experimental.data.range_tensor(5, shape=(3, 5)) ds = ray.data.range_tensor(5, shape=(3, 5))
assert str(ds) == ("Dataset(num_blocks=5, num_rows=5, " assert str(ds) == ("Dataset(num_blocks=5, num_rows=5, "
"schema=<Tensor: shape=(None, 3, 5), dtype=int64>)") "schema=<Tensor: shape=(None, 3, 5), dtype=int64>)")
@ -228,7 +227,7 @@ def test_read_text(ray_start_regular_shared, tmp_path):
@pytest.mark.parametrize("pipelined", [False, True]) @pytest.mark.parametrize("pipelined", [False, True])
def test_write_datasource(ray_start_regular_shared, pipelined): def test_write_datasource(ray_start_regular_shared, pipelined):
output = DummyOutputDatasource() output = DummyOutputDatasource()
ds = ray.experimental.data.range(10, parallelism=2) ds = ray.data.range(10, parallelism=2)
ds = maybe_pipeline(ds, pipelined) ds = maybe_pipeline(ds, pipelined)
ds.write_datasource(output) ds.write_datasource(output)
if pipelined: if pipelined:
@ -250,20 +249,20 @@ def test_write_datasource(ray_start_regular_shared, pipelined):
def test_empty_dataset(ray_start_regular_shared): def test_empty_dataset(ray_start_regular_shared):
ds = ray.experimental.data.range(0) ds = ray.data.range(0)
assert ds.count() == 0 assert ds.count() == 0
assert ds.size_bytes() is None assert ds.size_bytes() is None
assert ds.schema() is None assert ds.schema() is None
ds = ray.experimental.data.range(1) ds = ray.data.range(1)
ds = ds.filter(lambda x: x > 1) ds = ds.filter(lambda x: x > 1)
assert str(ds) == \ assert str(ds) == \
"Dataset(num_blocks=1, num_rows=0, schema=Unknown schema)" "Dataset(num_blocks=1, num_rows=0, schema=Unknown schema)"
def test_schema(ray_start_regular_shared): def test_schema(ray_start_regular_shared):
ds = ray.experimental.data.range(10) ds = ray.data.range(10)
ds2 = ray.experimental.data.range_arrow(10) ds2 = ray.data.range_arrow(10)
ds3 = ds2.repartition(5) ds3 = ds2.repartition(5)
ds4 = ds3.map(lambda x: {"a": "hi", "b": 1.0}).limit(5).repartition(1) ds4 = ds3.map(lambda x: {"a": "hi", "b": 1.0}).limit(5).repartition(1)
assert str(ds) == \ assert str(ds) == \
@ -277,7 +276,7 @@ def test_schema(ray_start_regular_shared):
def test_lazy_loading_exponential_rampup(ray_start_regular_shared): def test_lazy_loading_exponential_rampup(ray_start_regular_shared):
ds = ray.experimental.data.range(100, parallelism=20) ds = ray.data.range(100, parallelism=20)
assert len(ds._blocks._blocks) == 1 assert len(ds._blocks._blocks) == 1
assert ds.take(10) == list(range(10)) assert ds.take(10) == list(range(10))
assert len(ds._blocks._blocks) == 2 assert len(ds._blocks._blocks) == 2
@ -292,18 +291,18 @@ def test_lazy_loading_exponential_rampup(ray_start_regular_shared):
def test_limit(ray_start_regular_shared): def test_limit(ray_start_regular_shared):
ds = ray.experimental.data.range(100, parallelism=20) ds = ray.data.range(100, parallelism=20)
for i in range(100): for i in range(100):
assert ds.limit(i).take(200) == list(range(i)) assert ds.limit(i).take(200) == list(range(i))
def test_convert_types(ray_start_regular_shared): def test_convert_types(ray_start_regular_shared):
plain_ds = ray.experimental.data.range(1) plain_ds = ray.data.range(1)
arrow_ds = plain_ds.map(lambda x: {"a": x}) arrow_ds = plain_ds.map(lambda x: {"a": x})
assert arrow_ds.take() == [{"a": 0}] assert arrow_ds.take() == [{"a": 0}]
assert "ArrowRow" in arrow_ds.map(lambda x: str(x)).take()[0] assert "ArrowRow" in arrow_ds.map(lambda x: str(x)).take()[0]
arrow_ds = ray.experimental.data.range_arrow(1) arrow_ds = ray.data.range_arrow(1)
assert arrow_ds.map(lambda x: "plain_{}".format(x["value"])).take() \ assert arrow_ds.map(lambda x: "plain_{}".format(x["value"])).take() \
== ["plain_0"] == ["plain_0"]
assert arrow_ds.map(lambda x: {"a": (x["value"],)}).take() == \ assert arrow_ds.map(lambda x: {"a": (x["value"],)}).take() == \
@ -311,12 +310,12 @@ def test_convert_types(ray_start_regular_shared):
def test_from_items(ray_start_regular_shared): def test_from_items(ray_start_regular_shared):
ds = ray.experimental.data.from_items(["hello", "world"]) ds = ray.data.from_items(["hello", "world"])
assert ds.take() == ["hello", "world"] assert ds.take() == ["hello", "world"]
def test_repartition(ray_start_regular_shared): def test_repartition(ray_start_regular_shared):
ds = ray.experimental.data.range(20, parallelism=10) ds = ray.data.range(20, parallelism=10)
assert ds.num_blocks() == 10 assert ds.num_blocks() == 10
assert ds.sum() == 190 assert ds.sum() == 190
assert ds._block_sizes() == [2] * 10 assert ds._block_sizes() == [2] * 10
@ -332,13 +331,13 @@ def test_repartition(ray_start_regular_shared):
assert ds3.sum() == 190 assert ds3.sum() == 190
ds2._block_sizes() == [2] * 10 + [0] * 10 ds2._block_sizes() == [2] * 10 + [0] * 10
large = ray.experimental.data.range(10000, parallelism=10) large = ray.data.range(10000, parallelism=10)
large = large.repartition(20) large = large.repartition(20)
assert large._block_sizes() == [500] * 20 assert large._block_sizes() == [500] * 20
def test_repartition_arrow(ray_start_regular_shared): def test_repartition_arrow(ray_start_regular_shared):
ds = ray.experimental.data.range_arrow(20, parallelism=10) ds = ray.data.range_arrow(20, parallelism=10)
assert ds.num_blocks() == 10 assert ds.num_blocks() == 10
assert ds.count() == 20 assert ds.count() == 20
assert ds._block_sizes() == [2] * 10 assert ds._block_sizes() == [2] * 10
@ -353,7 +352,7 @@ def test_repartition_arrow(ray_start_regular_shared):
assert ds3.count() == 20 assert ds3.count() == 20
ds2._block_sizes() == [2] * 10 + [0] * 10 ds2._block_sizes() == [2] * 10 + [0] * 10
large = ray.experimental.data.range_arrow(10000, parallelism=10) large = ray.data.range_arrow(10000, parallelism=10)
large = large.repartition(20) large = large.repartition(20)
assert large._block_sizes() == [500] * 20 assert large._block_sizes() == [500] * 20
@ -361,7 +360,7 @@ def test_repartition_arrow(ray_start_regular_shared):
def test_from_pandas(ray_start_regular_shared): def test_from_pandas(ray_start_regular_shared):
df1 = pd.DataFrame({"one": [1, 2, 3], "two": ["a", "b", "c"]}) df1 = pd.DataFrame({"one": [1, 2, 3], "two": ["a", "b", "c"]})
df2 = pd.DataFrame({"one": [4, 5, 6], "two": ["e", "f", "g"]}) df2 = pd.DataFrame({"one": [4, 5, 6], "two": ["e", "f", "g"]})
ds = ray.experimental.data.from_pandas([ray.put(df1), ray.put(df2)]) ds = ray.data.from_pandas([ray.put(df1), ray.put(df2)])
values = [(r["one"], r["two"]) for r in ds.take(6)] values = [(r["one"], r["two"]) for r in ds.take(6)]
rows = [(r.one, r.two) for _, r in pd.concat([df1, df2]).iterrows()] rows = [(r.one, r.two) for _, r in pd.concat([df1, df2]).iterrows()]
assert values == rows assert values == rows
@ -370,7 +369,7 @@ def test_from_pandas(ray_start_regular_shared):
def test_from_arrow(ray_start_regular_shared): def test_from_arrow(ray_start_regular_shared):
df1 = pd.DataFrame({"one": [1, 2, 3], "two": ["a", "b", "c"]}) df1 = pd.DataFrame({"one": [1, 2, 3], "two": ["a", "b", "c"]})
df2 = pd.DataFrame({"one": [4, 5, 6], "two": ["e", "f", "g"]}) df2 = pd.DataFrame({"one": [4, 5, 6], "two": ["e", "f", "g"]})
ds = ray.experimental.data.from_arrow([ ds = ray.data.from_arrow([
ray.put(pa.Table.from_pandas(df1)), ray.put(pa.Table.from_pandas(df1)),
ray.put(pa.Table.from_pandas(df2)) ray.put(pa.Table.from_pandas(df2))
]) ])
@ -382,7 +381,7 @@ def test_from_arrow(ray_start_regular_shared):
def test_to_pandas(ray_start_regular_shared): def test_to_pandas(ray_start_regular_shared):
n = 5 n = 5
df = pd.DataFrame({"value": list(range(n))}) df = pd.DataFrame({"value": list(range(n))})
ds = ray.experimental.data.range_arrow(n) ds = ray.data.range_arrow(n)
dfds = pd.concat(ray.get(ds.to_pandas()), ignore_index=True) dfds = pd.concat(ray.get(ds.to_pandas()), ignore_index=True)
assert df.equals(dfds) assert df.equals(dfds)
@ -392,21 +391,21 @@ def test_to_arrow(ray_start_regular_shared):
# Zero-copy. # Zero-copy.
df = pd.DataFrame({"value": list(range(n))}) df = pd.DataFrame({"value": list(range(n))})
ds = ray.experimental.data.range_arrow(n) ds = ray.data.range_arrow(n)
dfds = pd.concat( dfds = pd.concat(
[t.to_pandas() for t in ray.get(ds.to_arrow())], ignore_index=True) [t.to_pandas() for t in ray.get(ds.to_arrow())], ignore_index=True)
assert df.equals(dfds) assert df.equals(dfds)
# Conversion. # Conversion.
df = pd.DataFrame({0: list(range(n))}) df = pd.DataFrame({0: list(range(n))})
ds = ray.experimental.data.range(n) ds = ray.data.range(n)
dfds = pd.concat( dfds = pd.concat(
[t.to_pandas() for t in ray.get(ds.to_arrow())], ignore_index=True) [t.to_pandas() for t in ray.get(ds.to_arrow())], ignore_index=True)
assert df.equals(dfds) assert df.equals(dfds)
def test_get_blocks(ray_start_regular_shared): def test_get_blocks(ray_start_regular_shared):
blocks = ray.experimental.data.range(10).get_blocks() blocks = ray.data.range(10).get_blocks()
assert len(blocks) == 10 assert len(blocks) == 10
out = [] out = []
for b in ray.get(blocks): for b in ray.get(blocks):
@ -418,7 +417,7 @@ def test_get_blocks(ray_start_regular_shared):
def test_pandas_roundtrip(ray_start_regular_shared, tmp_path): def test_pandas_roundtrip(ray_start_regular_shared, tmp_path):
df1 = pd.DataFrame({"one": [1, 2, 3], "two": ["a", "b", "c"]}) df1 = pd.DataFrame({"one": [1, 2, 3], "two": ["a", "b", "c"]})
df2 = pd.DataFrame({"one": [4, 5, 6], "two": ["e", "f", "g"]}) df2 = pd.DataFrame({"one": [4, 5, 6], "two": ["e", "f", "g"]})
ds = ray.experimental.data.from_pandas([ray.put(df1), ray.put(df2)]) ds = ray.data.from_pandas([ray.put(df1), ray.put(df2)])
dfds = pd.concat(ray.get(ds.to_pandas())) dfds = pd.concat(ray.get(ds.to_pandas()))
assert pd.concat([df1, df2]).equals(dfds) assert pd.concat([df1, df2]).equals(dfds)
@ -441,7 +440,7 @@ def test_fsspec_filesystem(ray_start_regular_shared, tmp_path):
fs = LocalFileSystem() fs = LocalFileSystem()
ds = ray.experimental.data.read_parquet([path1, path2], filesystem=fs) ds = ray.data.read_parquet([path1, path2], filesystem=fs)
# Test metadata-only parquet ops. # Test metadata-only parquet ops.
assert len(ds._blocks._blocks) == 1 assert len(ds._blocks._blocks) == 1
@ -456,7 +455,7 @@ def test_parquet_read(ray_start_regular_shared, tmp_path):
table = pa.Table.from_pandas(df2) table = pa.Table.from_pandas(df2)
pq.write_table(table, os.path.join(str(tmp_path), "test2.parquet")) pq.write_table(table, os.path.join(str(tmp_path), "test2.parquet"))
ds = ray.experimental.data.read_parquet(str(tmp_path)) ds = ray.data.read_parquet(str(tmp_path))
# Test metadata-only parquet ops. # Test metadata-only parquet ops.
assert len(ds._blocks._blocks) == 1 assert len(ds._blocks._blocks) == 1
@ -482,7 +481,7 @@ def test_parquet_read(ray_start_regular_shared, tmp_path):
[6, "g"]] [6, "g"]]
# Test column selection. # Test column selection.
ds = ray.experimental.data.read_parquet(str(tmp_path), columns=["one"]) ds = ray.data.read_parquet(str(tmp_path), columns=["one"])
values = [s["one"] for s in ds.take()] values = [s["one"] for s in ds.take()]
assert sorted(values) == [1, 2, 3, 4, 5, 6] assert sorted(values) == [1, 2, 3, 4, 5, 6]
@ -491,7 +490,7 @@ def test_parquet_write(ray_start_regular_shared, tmp_path):
df1 = pd.DataFrame({"one": [1, 2, 3], "two": ["a", "b", "c"]}) df1 = pd.DataFrame({"one": [1, 2, 3], "two": ["a", "b", "c"]})
df2 = pd.DataFrame({"one": [4, 5, 6], "two": ["e", "f", "g"]}) df2 = pd.DataFrame({"one": [4, 5, 6], "two": ["e", "f", "g"]})
df = pd.concat([df1, df2]) df = pd.concat([df1, df2])
ds = ray.experimental.data.from_pandas([ray.put(df1), ray.put(df2)]) ds = ray.data.from_pandas([ray.put(df1), ray.put(df2)])
path = os.path.join(tmp_path, "test_parquet_dir") path = os.path.join(tmp_path, "test_parquet_dir")
os.mkdir(path) os.mkdir(path)
ds._set_uuid("data") ds._set_uuid("data")
@ -503,16 +502,16 @@ def test_parquet_write(ray_start_regular_shared, tmp_path):
def test_convert_to_pyarrow(ray_start_regular_shared, tmp_path): def test_convert_to_pyarrow(ray_start_regular_shared, tmp_path):
ds = ray.experimental.data.range(100) ds = ray.data.range(100)
assert ds.to_dask().sum().compute()[0] == 4950 assert ds.to_dask().sum().compute()[0] == 4950
path = os.path.join(tmp_path, "test_parquet_dir") path = os.path.join(tmp_path, "test_parquet_dir")
os.mkdir(path) os.mkdir(path)
ds.write_parquet(path) ds.write_parquet(path)
assert ray.experimental.data.read_parquet(path).count() == 100 assert ray.data.read_parquet(path).count() == 100
def test_pyarrow(ray_start_regular_shared): def test_pyarrow(ray_start_regular_shared):
ds = ray.experimental.data.range_arrow(5) ds = ray.data.range_arrow(5)
assert ds.map(lambda x: {"b": x["value"] + 2}).take() == \ assert ds.map(lambda x: {"b": x["value"] + 2}).take() == \
[{"b": 2}, {"b": 3}, {"b": 4}, {"b": 5}, {"b": 6}] [{"b": 2}, {"b": 3}, {"b": 4}, {"b": 5}, {"b": 6}]
assert ds.map(lambda x: {"b": x["value"] + 2}) \ assert ds.map(lambda x: {"b": x["value"] + 2}) \
@ -525,7 +524,7 @@ def test_pyarrow(ray_start_regular_shared):
def test_read_binary_files(ray_start_regular_shared): def test_read_binary_files(ray_start_regular_shared):
with util.gen_bin_files(10) as (_, paths): with util.gen_bin_files(10) as (_, paths):
ds = ray.experimental.data.read_binary_files(paths, parallelism=10) ds = ray.data.read_binary_files(paths, parallelism=10)
for i, item in enumerate(ds.iter_rows()): for i, item in enumerate(ds.iter_rows()):
expected = open(paths[i], "rb").read() expected = open(paths[i], "rb").read()
assert expected == item assert expected == item
@@ -539,8 +538,7 @@ def test_read_binary_files_with_fs(ray_start_regular_shared):
     with util.gen_bin_files(10) as (tempdir, paths):
         # All the paths are absolute, so we want the root file system.
         fs, _ = pa.fs.FileSystem.from_uri("/")
-        ds = ray.experimental.data.read_binary_files(
-            paths, filesystem=fs, parallelism=10)
+        ds = ray.data.read_binary_files(paths, filesystem=fs, parallelism=10)
         for i, item in enumerate(ds.iter_rows()):
             expected = open(paths[i], "rb").read()
             assert expected == item
@ -548,7 +546,7 @@ def test_read_binary_files_with_fs(ray_start_regular_shared):
def test_read_binary_files_with_paths(ray_start_regular_shared): def test_read_binary_files_with_paths(ray_start_regular_shared):
with util.gen_bin_files(10) as (_, paths): with util.gen_bin_files(10) as (_, paths):
ds = ray.experimental.data.read_binary_files( ds = ray.data.read_binary_files(
paths, include_paths=True, parallelism=10) paths, include_paths=True, parallelism=10)
for i, (path, item) in enumerate(ds.iter_rows()): for i, (path, item) in enumerate(ds.iter_rows()):
assert path == paths[i] assert path == paths[i]
@@ -560,8 +558,7 @@ def test_read_binary_files_with_paths(ray_start_regular_shared):
 # credentials issue, unskip this test once that's fixed or once ported to moto.
 @pytest.mark.skip(reason="Shouldn't hit S3 in CI")
 def test_read_binary_files_s3(ray_start_regular_shared):
-    ds = ray.experimental.data.read_binary_files(
-        ["s3://anyscale-data/small-files/0.dat"])
+    ds = ray.data.read_binary_files(["s3://anyscale-data/small-files/0.dat"])
     item = ds.take(1).pop()
     expected = requests.get(
         "https://anyscale-data.s3.us-west-2.amazonaws.com/small-files/0.dat"
@ -575,7 +572,7 @@ def test_iter_batches_basic(ray_start_regular_shared):
df3 = pd.DataFrame({"one": [7, 8, 9], "two": [8, 9, 10]}) df3 = pd.DataFrame({"one": [7, 8, 9], "two": [8, 9, 10]})
df4 = pd.DataFrame({"one": [10, 11, 12], "two": [11, 12, 13]}) df4 = pd.DataFrame({"one": [10, 11, 12], "two": [11, 12, 13]})
dfs = [df1, df2, df3, df4] dfs = [df1, df2, df3, df4]
ds = ray.experimental.data.from_pandas( ds = ray.data.from_pandas(
[ray.put(df1), ray.put(df2), [ray.put(df1), ray.put(df2),
ray.put(df3), ray.put(df4)]) ray.put(df3), ray.put(df4)])
@ -677,7 +674,7 @@ def test_iter_batches_grid(ray_start_regular_shared):
})) }))
running_size += block_size running_size += block_size
num_rows = running_size num_rows = running_size
ds = ray.experimental.data.from_pandas([ray.put(df) for df in dfs]) ds = ray.data.from_pandas([ray.put(df) for df in dfs])
for batch_size in np.random.randint( for batch_size in np.random.randint(
1, num_rows + 1, size=batch_size_samples): 1, num_rows + 1, size=batch_size_samples):
for drop_last in (False, True): for drop_last in (False, True):
@ -720,7 +717,7 @@ def test_iter_batches_grid(ray_start_regular_shared):
def test_lazy_loading_iter_batches_exponential_rampup( def test_lazy_loading_iter_batches_exponential_rampup(
ray_start_regular_shared): ray_start_regular_shared):
ds = ray.experimental.data.range(32, parallelism=8) ds = ray.data.range(32, parallelism=8)
expected_num_blocks = [1, 2, 4, 4, 8, 8, 8, 8] expected_num_blocks = [1, 2, 4, 4, 8, 8, 8, 8]
for _, expected in zip(ds.iter_batches(), expected_num_blocks): for _, expected in zip(ds.iter_batches(), expected_num_blocks):
assert len(ds._blocks._blocks) == expected assert len(ds._blocks._blocks) == expected
@ -728,7 +725,7 @@ def test_lazy_loading_iter_batches_exponential_rampup(
def test_map_batch(ray_start_regular_shared, tmp_path): def test_map_batch(ray_start_regular_shared, tmp_path):
# Test input validation # Test input validation
ds = ray.experimental.data.range(5) ds = ray.data.range(5)
with pytest.raises(ValueError): with pytest.raises(ValueError):
ds.map_batches( ds.map_batches(
lambda x: x + 1, batch_format="pyarrow", batch_size=-1).take() lambda x: x + 1, batch_format="pyarrow", batch_size=-1).take()
@ -737,7 +734,7 @@ def test_map_batch(ray_start_regular_shared, tmp_path):
df = pd.DataFrame({"one": [1, 2, 3], "two": [2, 3, 4]}) df = pd.DataFrame({"one": [1, 2, 3], "two": [2, 3, 4]})
table = pa.Table.from_pandas(df) table = pa.Table.from_pandas(df)
pq.write_table(table, os.path.join(tmp_path, "test1.parquet")) pq.write_table(table, os.path.join(tmp_path, "test1.parquet"))
ds = ray.experimental.data.read_parquet(str(tmp_path)) ds = ray.data.read_parquet(str(tmp_path))
ds_list = ds.map_batches( ds_list = ds.map_batches(
lambda df: df + 1, batch_size=1, batch_format="pandas").take() lambda df: df + 1, batch_size=1, batch_format="pandas").take()
values = [s["one"] for s in ds_list] values = [s["one"] for s in ds_list]
@ -746,7 +743,7 @@ def test_map_batch(ray_start_regular_shared, tmp_path):
assert values == [3, 4, 5] assert values == [3, 4, 5]
# Test Pyarrow # Test Pyarrow
ds = ray.experimental.data.read_parquet(str(tmp_path)) ds = ray.data.read_parquet(str(tmp_path))
ds_list = ds.map_batches( ds_list = ds.map_batches(
lambda pa: pa, batch_size=1, batch_format="pyarrow").take() lambda pa: pa, batch_size=1, batch_format="pyarrow").take()
values = [s["one"] for s in ds_list] values = [s["one"] for s in ds_list]
@ -756,7 +753,7 @@ def test_map_batch(ray_start_regular_shared, tmp_path):
# Test batch # Test batch
size = 300 size = 300
ds = ray.experimental.data.range(size) ds = ray.data.range(size)
ds_list = ds.map_batches( ds_list = ds.map_batches(
lambda df: df + 1, batch_size=17, lambda df: df + 1, batch_size=17,
batch_format="pandas").take(limit=size) batch_format="pandas").take(limit=size)
@ -769,27 +766,27 @@ def test_map_batch(ray_start_regular_shared, tmp_path):
# Test the lambda returns different types than the batch_format # Test the lambda returns different types than the batch_format
# pandas => list block # pandas => list block
ds = ray.experimental.data.read_parquet(str(tmp_path)) ds = ray.data.read_parquet(str(tmp_path))
ds_list = ds.map_batches(lambda df: [1], batch_size=1).take() ds_list = ds.map_batches(lambda df: [1], batch_size=1).take()
assert ds_list == [1, 1, 1] assert ds_list == [1, 1, 1]
assert ds.count() == 3 assert ds.count() == 3
# pyarrow => list block # pyarrow => list block
ds = ray.experimental.data.read_parquet(str(tmp_path)) ds = ray.data.read_parquet(str(tmp_path))
ds_list = ds.map_batches( ds_list = ds.map_batches(
lambda df: [1], batch_size=1, batch_format="pyarrow").take() lambda df: [1], batch_size=1, batch_format="pyarrow").take()
assert ds_list == [1, 1, 1] assert ds_list == [1, 1, 1]
assert ds.count() == 3 assert ds.count() == 3
# Test the wrong return value raises an exception. # Test the wrong return value raises an exception.
ds = ray.experimental.data.read_parquet(str(tmp_path)) ds = ray.data.read_parquet(str(tmp_path))
with pytest.raises(ValueError): with pytest.raises(ValueError):
ds_list = ds.map_batches( ds_list = ds.map_batches(
lambda df: 1, batch_size=2, batch_format="pyarrow").take() lambda df: 1, batch_size=2, batch_format="pyarrow").take()
def test_split(ray_start_regular_shared): def test_split(ray_start_regular_shared):
ds = ray.experimental.data.range(20, parallelism=10) ds = ray.data.range(20, parallelism=10)
assert ds.num_blocks() == 10 assert ds.num_blocks() == 10
assert ds.sum() == 190 assert ds.sum() == 190
assert ds._block_sizes() == [2] * 10 assert ds._block_sizes() == [2] * 10
@ -839,7 +836,7 @@ def test_split_hints(ray_start_regular_shared):
datasets[1] contains block 2. datasets[1] contains block 2.
""" """
num_blocks = len(block_node_ids) num_blocks = len(block_node_ids)
ds = ray.experimental.data.range(num_blocks, parallelism=num_blocks) ds = ray.data.range(num_blocks, parallelism=num_blocks)
blocks = list(ds._blocks) blocks = list(ds._blocks)
assert len(block_node_ids) == len(blocks) assert len(block_node_ids) == len(blocks)
actors = [Actor.remote() for i in range(len(actor_node_ids))] actors = [Actor.remote() for i in range(len(actor_node_ids))]
@ -921,7 +918,7 @@ def test_from_dask(ray_start_regular_shared):
import dask.dataframe as dd import dask.dataframe as dd
df = pd.DataFrame({"one": list(range(100)), "two": list(range(100))}) df = pd.DataFrame({"one": list(range(100)), "two": list(range(100))})
ddf = dd.from_pandas(df, npartitions=10) ddf = dd.from_pandas(df, npartitions=10)
ds = ray.experimental.data.from_dask(ddf) ds = ray.data.from_dask(ddf)
dfds = pd.concat(ray.get(ds.to_pandas())) dfds = pd.concat(ray.get(ds.to_pandas()))
assert df.equals(dfds) assert df.equals(dfds)
@ -931,7 +928,7 @@ def test_to_dask(ray_start_regular_shared):
df1 = pd.DataFrame({"one": [1, 2, 3], "two": ["a", "b", "c"]}) df1 = pd.DataFrame({"one": [1, 2, 3], "two": ["a", "b", "c"]})
df2 = pd.DataFrame({"one": [4, 5, 6], "two": ["e", "f", "g"]}) df2 = pd.DataFrame({"one": [4, 5, 6], "two": ["e", "f", "g"]})
df = pd.concat([df1, df2]) df = pd.concat([df1, df2])
ds = ray.experimental.data.from_pandas([ray.put(df1), ray.put(df2)]) ds = ray.data.from_pandas([ray.put(df1), ray.put(df2)])
ddf = ds.to_dask() ddf = ds.to_dask()
# Explicit Dask-on-Ray # Explicit Dask-on-Ray
assert df.equals(ddf.compute(scheduler=ray_dask_get)) assert df.equals(ddf.compute(scheduler=ray_dask_get))
@@ -954,8 +951,7 @@ def test_to_tf(ray_start_regular_shared, pipelined):
     })
     df3 = pd.DataFrame({"one": [7, 8], "two": [7.0, 8.0], "label": [7.0, 8.0]})
     df = pd.concat([df1, df2, df3])
-    ds = ray.experimental.data.from_pandas(
-        [ray.put(df1), ray.put(df2), ray.put(df3)])
+    ds = ray.data.from_pandas([ray.put(df1), ray.put(df2), ray.put(df3)])
     ds = maybe_pipeline(ds, pipelined)
     tfd = ds.to_tf(
         label_column="label",
@@ -983,8 +979,7 @@ def test_to_tf_feature_columns(ray_start_regular_shared):
     })
     df3 = pd.DataFrame({"one": [7, 8], "two": [7.0, 8.0], "label": [7.0, 8.0]})
     df = pd.concat([df1, df2, df3]).drop("two", axis=1)
-    ds = ray.experimental.data.from_pandas(
-        [ray.put(df1), ray.put(df2), ray.put(df3)])
+    ds = ray.data.from_pandas([ray.put(df1), ray.put(df2), ray.put(df3)])
     tfd = ds.to_tf(
         label_column="label",
         feature_columns=["one"],
@ -1013,8 +1008,7 @@ def test_to_torch(ray_start_regular_shared, pipelined):
}) })
df3 = pd.DataFrame({"one": [7, 8], "two": [7.0, 8.0], "label": [7.0, 8.0]}) df3 = pd.DataFrame({"one": [7, 8], "two": [7.0, 8.0], "label": [7.0, 8.0]})
df = pd.concat([df1, df2, df3]) df = pd.concat([df1, df2, df3])
ds = ray.experimental.data.from_pandas( ds = ray.data.from_pandas([ray.put(df1), ray.put(df2), ray.put(df3)])
[ray.put(df1), ray.put(df2), ray.put(df3)])
ds = maybe_pipeline(ds, pipelined) ds = maybe_pipeline(ds, pipelined)
torchd = ds.to_torch(label_column="label", batch_size=3) torchd = ds.to_torch(label_column="label", batch_size=3)
@ -1041,8 +1035,7 @@ def test_to_torch_feature_columns(ray_start_regular_shared):
}) })
df3 = pd.DataFrame({"one": [7, 8], "two": [7.0, 8.0], "label": [7.0, 8.0]}) df3 = pd.DataFrame({"one": [7, 8], "two": [7.0, 8.0], "label": [7.0, 8.0]})
df = pd.concat([df1, df2, df3]).drop("two", axis=1) df = pd.concat([df1, df2, df3]).drop("two", axis=1)
ds = ray.experimental.data.from_pandas( ds = ray.data.from_pandas([ray.put(df1), ray.put(df2), ray.put(df3)])
[ray.put(df1), ray.put(df2), ray.put(df3)])
torchd = ds.to_torch( torchd = ds.to_torch(
label_column="label", feature_columns=["one"], batch_size=3) label_column="label", feature_columns=["one"], batch_size=3)
iterations = [] iterations = []
@ -1058,7 +1051,7 @@ def test_json_read(ray_start_regular_shared, tmp_path):
df1 = pd.DataFrame({"one": [1, 2, 3], "two": ["a", "b", "c"]}) df1 = pd.DataFrame({"one": [1, 2, 3], "two": ["a", "b", "c"]})
path1 = os.path.join(tmp_path, "test1.json") path1 = os.path.join(tmp_path, "test1.json")
df1.to_json(path1, orient="records", lines=True) df1.to_json(path1, orient="records", lines=True)
ds = ray.experimental.data.read_json(path1) ds = ray.data.read_json(path1)
assert df1.equals(ray.get(ds.to_pandas())[0]) assert df1.equals(ray.get(ds.to_pandas())[0])
# Test metadata ops. # Test metadata ops.
assert ds.count() == 3 assert ds.count() == 3
@ -1069,7 +1062,7 @@ def test_json_read(ray_start_regular_shared, tmp_path):
df2 = pd.DataFrame({"one": [4, 5, 6], "two": ["e", "f", "g"]}) df2 = pd.DataFrame({"one": [4, 5, 6], "two": ["e", "f", "g"]})
path2 = os.path.join(tmp_path, "test2.json") path2 = os.path.join(tmp_path, "test2.json")
df2.to_json(path2, orient="records", lines=True) df2.to_json(path2, orient="records", lines=True)
ds = ray.experimental.data.read_json([path1, path2], parallelism=2) ds = ray.data.read_json([path1, path2], parallelism=2)
dsdf = pd.concat(ray.get(ds.to_pandas())) dsdf = pd.concat(ray.get(ds.to_pandas()))
assert pd.concat([df1, df2]).equals(dsdf) assert pd.concat([df1, df2]).equals(dsdf)
# Test metadata ops. # Test metadata ops.
@ -1081,7 +1074,7 @@ def test_json_read(ray_start_regular_shared, tmp_path):
path3 = os.path.join(tmp_path, "test3.json") path3 = os.path.join(tmp_path, "test3.json")
df3.to_json(path3, orient="records", lines=True) df3.to_json(path3, orient="records", lines=True)
df = pd.concat([df1, df2, df3], ignore_index=True) df = pd.concat([df1, df2, df3], ignore_index=True)
ds = ray.experimental.data.read_json([path1, path2, path3], parallelism=2) ds = ray.data.read_json([path1, path2, path3], parallelism=2)
dsdf = pd.concat(ray.get(ds.to_pandas()), ignore_index=True) dsdf = pd.concat(ray.get(ds.to_pandas()), ignore_index=True)
assert df.equals(dsdf) assert df.equals(dsdf)
@ -1094,7 +1087,7 @@ def test_json_read(ray_start_regular_shared, tmp_path):
df2 = pd.DataFrame({"one": [4, 5, 6], "two": ["e", "f", "g"]}) df2 = pd.DataFrame({"one": [4, 5, 6], "two": ["e", "f", "g"]})
path2 = os.path.join(path, "data1.json") path2 = os.path.join(path, "data1.json")
df2.to_json(path2, orient="records", lines=True) df2.to_json(path2, orient="records", lines=True)
ds = ray.experimental.data.read_json(path) ds = ray.data.read_json(path)
df = pd.concat([df1, df2]) df = pd.concat([df1, df2])
dsdf = pd.concat(ray.get(ds.to_pandas())) dsdf = pd.concat(ray.get(ds.to_pandas()))
assert df.equals(dsdf) assert df.equals(dsdf)
@ -1114,7 +1107,7 @@ def test_json_read(ray_start_regular_shared, tmp_path):
df3 = pd.DataFrame({"one": [7, 8, 9], "two": ["h", "i", "j"]}) df3 = pd.DataFrame({"one": [7, 8, 9], "two": ["h", "i", "j"]})
file_path3 = os.path.join(path2, "data2.json") file_path3 = os.path.join(path2, "data2.json")
df3.to_json(file_path3, orient="records", lines=True) df3.to_json(file_path3, orient="records", lines=True)
ds = ray.experimental.data.read_json([path1, path2]) ds = ray.data.read_json([path1, path2])
df = pd.concat([df1, df2, df3]) df = pd.concat([df1, df2, df3])
dsdf = pd.concat(ray.get(ds.to_pandas())) dsdf = pd.concat(ray.get(ds.to_pandas()))
assert df.equals(dsdf) assert df.equals(dsdf)
@ -1130,7 +1123,7 @@ def test_json_read(ray_start_regular_shared, tmp_path):
df2 = pd.DataFrame({"one": [4, 5, 6], "two": ["e", "f", "g"]}) df2 = pd.DataFrame({"one": [4, 5, 6], "two": ["e", "f", "g"]})
path2 = os.path.join(tmp_path, "data1.json") path2 = os.path.join(tmp_path, "data1.json")
df2.to_json(path2, orient="records", lines=True) df2.to_json(path2, orient="records", lines=True)
ds = ray.experimental.data.read_json([dir_path, path2]) ds = ray.data.read_json([dir_path, path2])
df = pd.concat([df1, df2]) df = pd.concat([df1, df2])
dsdf = pd.concat(ray.get(ds.to_pandas())) dsdf = pd.concat(ray.get(ds.to_pandas()))
assert df.equals(dsdf) assert df.equals(dsdf)
@ -1142,7 +1135,7 @@ def test_zipped_json_read(ray_start_regular_shared, tmp_path):
df1 = pd.DataFrame({"one": [1, 2, 3], "two": ["a", "b", "c"]}) df1 = pd.DataFrame({"one": [1, 2, 3], "two": ["a", "b", "c"]})
path1 = os.path.join(tmp_path, "test1.json.gz") path1 = os.path.join(tmp_path, "test1.json.gz")
df1.to_json(path1, compression="gzip", orient="records", lines=True) df1.to_json(path1, compression="gzip", orient="records", lines=True)
ds = ray.experimental.data.read_json(path1) ds = ray.data.read_json(path1)
assert df1.equals(ray.get(ds.to_pandas())[0]) assert df1.equals(ray.get(ds.to_pandas())[0])
# Test metadata ops. # Test metadata ops.
assert ds.count() == 3 assert ds.count() == 3
@ -1152,7 +1145,7 @@ def test_zipped_json_read(ray_start_regular_shared, tmp_path):
df2 = pd.DataFrame({"one": [4, 5, 6], "two": ["e", "f", "g"]}) df2 = pd.DataFrame({"one": [4, 5, 6], "two": ["e", "f", "g"]})
path2 = os.path.join(tmp_path, "test2.json.gz") path2 = os.path.join(tmp_path, "test2.json.gz")
df2.to_json(path2, compression="gzip", orient="records", lines=True) df2.to_json(path2, compression="gzip", orient="records", lines=True)
ds = ray.experimental.data.read_json([path1, path2], parallelism=2) ds = ray.data.read_json([path1, path2], parallelism=2)
dsdf = pd.concat(ray.get(ds.to_pandas())) dsdf = pd.concat(ray.get(ds.to_pandas()))
assert pd.concat([df1, df2]).equals(dsdf) assert pd.concat([df1, df2]).equals(dsdf)
# Test metadata ops. # Test metadata ops.
@ -1168,7 +1161,7 @@ def test_zipped_json_read(ray_start_regular_shared, tmp_path):
df2 = pd.DataFrame({"one": [4, 5, 6], "two": ["e", "f", "g"]}) df2 = pd.DataFrame({"one": [4, 5, 6], "two": ["e", "f", "g"]})
path2 = os.path.join(tmp_path, "data1.json.gz") path2 = os.path.join(tmp_path, "data1.json.gz")
df2.to_json(path2, compression="gzip", orient="records", lines=True) df2.to_json(path2, compression="gzip", orient="records", lines=True)
ds = ray.experimental.data.read_json([dir_path, path2]) ds = ray.data.read_json([dir_path, path2])
df = pd.concat([df1, df2]) df = pd.concat([df1, df2])
dsdf = pd.concat(ray.get(ds.to_pandas())) dsdf = pd.concat(ray.get(ds.to_pandas()))
assert df.equals(dsdf) assert df.equals(dsdf)
@ -1181,7 +1174,7 @@ def test_json_write(ray_start_regular_shared, tmp_path):
# Single block. # Single block.
os.mkdir(path) os.mkdir(path)
df = pd.DataFrame({"one": [1, 2, 3], "two": ["a", "b", "c"]}) df = pd.DataFrame({"one": [1, 2, 3], "two": ["a", "b", "c"]})
ds = ray.experimental.data.from_pandas([ray.put(df)]) ds = ray.data.from_pandas([ray.put(df)])
ds._set_uuid("data") ds._set_uuid("data")
ds.write_json(path) ds.write_json(path)
file_path = os.path.join(path, "data_000000.json") file_path = os.path.join(path, "data_000000.json")
@ -1191,7 +1184,7 @@ def test_json_write(ray_start_regular_shared, tmp_path):
# Two blocks. # Two blocks.
os.mkdir(path) os.mkdir(path)
df2 = pd.DataFrame({"one": [4, 5, 6], "two": ["e", "f", "g"]}) df2 = pd.DataFrame({"one": [4, 5, 6], "two": ["e", "f", "g"]})
ds = ray.experimental.data.from_pandas([ray.put(df), ray.put(df2)]) ds = ray.data.from_pandas([ray.put(df), ray.put(df2)])
ds._set_uuid("data") ds._set_uuid("data")
ds.write_json(path) ds.write_json(path)
file_path2 = os.path.join(path, "data_000001.json") file_path2 = os.path.join(path, "data_000001.json")
@ -1206,7 +1199,7 @@ def test_csv_read(ray_start_regular_shared, tmp_path):
df1 = pd.DataFrame({"one": [1, 2, 3], "two": ["a", "b", "c"]}) df1 = pd.DataFrame({"one": [1, 2, 3], "two": ["a", "b", "c"]})
path1 = os.path.join(tmp_path, "test1.csv") path1 = os.path.join(tmp_path, "test1.csv")
df1.to_csv(path1, index=False) df1.to_csv(path1, index=False)
ds = ray.experimental.data.read_csv(path1) ds = ray.data.read_csv(path1)
dsdf = ray.get(ds.to_pandas())[0] dsdf = ray.get(ds.to_pandas())[0]
assert df1.equals(dsdf) assert df1.equals(dsdf)
# Test metadata ops. # Test metadata ops.
@ -1218,7 +1211,7 @@ def test_csv_read(ray_start_regular_shared, tmp_path):
df2 = pd.DataFrame({"one": [4, 5, 6], "two": ["e", "f", "g"]}) df2 = pd.DataFrame({"one": [4, 5, 6], "two": ["e", "f", "g"]})
path2 = os.path.join(tmp_path, "test2.csv") path2 = os.path.join(tmp_path, "test2.csv")
df2.to_csv(path2, index=False) df2.to_csv(path2, index=False)
ds = ray.experimental.data.read_csv([path1, path2], parallelism=2) ds = ray.data.read_csv([path1, path2], parallelism=2)
dsdf = pd.concat(ray.get(ds.to_pandas())) dsdf = pd.concat(ray.get(ds.to_pandas()))
df = pd.concat([df1, df2]) df = pd.concat([df1, df2])
assert df.equals(dsdf) assert df.equals(dsdf)
@ -1230,7 +1223,7 @@ def test_csv_read(ray_start_regular_shared, tmp_path):
df3 = pd.DataFrame({"one": [7, 8, 9], "two": ["h", "i", "j"]}) df3 = pd.DataFrame({"one": [7, 8, 9], "two": ["h", "i", "j"]})
path3 = os.path.join(tmp_path, "test3.csv") path3 = os.path.join(tmp_path, "test3.csv")
df3.to_csv(path3, index=False) df3.to_csv(path3, index=False)
ds = ray.experimental.data.read_csv([path1, path2, path3], parallelism=2) ds = ray.data.read_csv([path1, path2, path3], parallelism=2)
df = pd.concat([df1, df2, df3], ignore_index=True) df = pd.concat([df1, df2, df3], ignore_index=True)
dsdf = pd.concat(ray.get(ds.to_pandas()), ignore_index=True) dsdf = pd.concat(ray.get(ds.to_pandas()), ignore_index=True)
assert df.equals(dsdf) assert df.equals(dsdf)
@ -1244,7 +1237,7 @@ def test_csv_read(ray_start_regular_shared, tmp_path):
df2 = pd.DataFrame({"one": [4, 5, 6], "two": ["e", "f", "g"]}) df2 = pd.DataFrame({"one": [4, 5, 6], "two": ["e", "f", "g"]})
path2 = os.path.join(path, "data1.csv") path2 = os.path.join(path, "data1.csv")
df2.to_csv(path2, index=False) df2.to_csv(path2, index=False)
ds = ray.experimental.data.read_csv(path) ds = ray.data.read_csv(path)
df = pd.concat([df1, df2]) df = pd.concat([df1, df2])
dsdf = pd.concat(ray.get(ds.to_pandas())) dsdf = pd.concat(ray.get(ds.to_pandas()))
assert df.equals(dsdf) assert df.equals(dsdf)
@ -1264,7 +1257,7 @@ def test_csv_read(ray_start_regular_shared, tmp_path):
df3 = pd.DataFrame({"one": [7, 8, 9], "two": ["h", "i", "j"]}) df3 = pd.DataFrame({"one": [7, 8, 9], "two": ["h", "i", "j"]})
file_path3 = os.path.join(path2, "data2.csv") file_path3 = os.path.join(path2, "data2.csv")
df3.to_csv(file_path3, index=False) df3.to_csv(file_path3, index=False)
ds = ray.experimental.data.read_csv([path1, path2]) ds = ray.data.read_csv([path1, path2])
df = pd.concat([df1, df2, df3]) df = pd.concat([df1, df2, df3])
dsdf = pd.concat(ray.get(ds.to_pandas())) dsdf = pd.concat(ray.get(ds.to_pandas()))
assert df.equals(dsdf) assert df.equals(dsdf)
@ -1280,7 +1273,7 @@ def test_csv_read(ray_start_regular_shared, tmp_path):
df2 = pd.DataFrame({"one": [4, 5, 6], "two": ["e", "f", "g"]}) df2 = pd.DataFrame({"one": [4, 5, 6], "two": ["e", "f", "g"]})
path2 = os.path.join(tmp_path, "data1.csv") path2 = os.path.join(tmp_path, "data1.csv")
df2.to_csv(path2, index=False) df2.to_csv(path2, index=False)
ds = ray.experimental.data.read_csv([dir_path, path2]) ds = ray.data.read_csv([dir_path, path2])
df = pd.concat([df1, df2]) df = pd.concat([df1, df2])
dsdf = pd.concat(ray.get(ds.to_pandas())) dsdf = pd.concat(ray.get(ds.to_pandas()))
assert df.equals(dsdf) assert df.equals(dsdf)
@ -1293,7 +1286,7 @@ def test_csv_write(ray_start_regular_shared, tmp_path):
# Single block. # Single block.
os.mkdir(path) os.mkdir(path)
df = pd.DataFrame({"one": [1, 2, 3], "two": ["a", "b", "c"]}) df = pd.DataFrame({"one": [1, 2, 3], "two": ["a", "b", "c"]})
ds = ray.experimental.data.from_pandas([ray.put(df)]) ds = ray.data.from_pandas([ray.put(df)])
ds._set_uuid("data") ds._set_uuid("data")
ds.write_csv(path) ds.write_csv(path)
file_path = os.path.join(path, "data_000000.csv") file_path = os.path.join(path, "data_000000.csv")
@ -1303,7 +1296,7 @@ def test_csv_write(ray_start_regular_shared, tmp_path):
# Two blocks. # Two blocks.
os.mkdir(path) os.mkdir(path)
df2 = pd.DataFrame({"one": [4, 5, 6], "two": ["e", "f", "g"]}) df2 = pd.DataFrame({"one": [4, 5, 6], "two": ["e", "f", "g"]})
ds = ray.experimental.data.from_pandas([ray.put(df), ray.put(df2)]) ds = ray.data.from_pandas([ray.put(df), ray.put(df2)])
ds._set_uuid("data") ds._set_uuid("data")
ds.write_csv(path) ds.write_csv(path)
file_path2 = os.path.join(path, "data_000001.csv") file_path2 = os.path.join(path, "data_000001.csv")
@ -1318,7 +1311,7 @@ def test_sort_simple(ray_start_regular_shared):
parallelism = 4 parallelism = 4
xs = list(range(num_items)) xs = list(range(num_items))
random.shuffle(xs) random.shuffle(xs)
ds = ray.experimental.data.from_items(xs, parallelism=parallelism) ds = ray.data.from_items(xs, parallelism=parallelism)
assert ds.sort().take(num_items) == list(range(num_items)) assert ds.sort().take(num_items) == list(range(num_items))
assert ds.sort(descending=True).take(num_items) == list( assert ds.sort(descending=True).take(num_items) == list(
reversed(range(num_items))) reversed(range(num_items)))
@ -1380,7 +1373,7 @@ def test_sort_arrow(ray_start_regular_shared, num_items, parallelism):
offset += shard offset += shard
if offset < num_items: if offset < num_items:
dfs.append(pd.DataFrame({"a": a[offset:], "b": b[offset:]})) dfs.append(pd.DataFrame({"a": a[offset:], "b": b[offset:]}))
ds = ray.experimental.data.from_pandas([ray.put(df) for df in dfs]) ds = ray.data.from_pandas([ray.put(df) for df in dfs])
def assert_sorted(sorted_ds, expected_rows): def assert_sorted(sorted_ds, expected_rows):
assert [tuple(row.values()) assert [tuple(row.values())
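The test churn above is the mechanical rename from ``ray.experimental.data`` to ``ray.data``; the call signatures themselves are unchanged. A minimal sketch of the pattern these tests exercise under the new path, assuming Ray and pandas are installed:

import pandas as pd

import ray

ray.init()

df1 = pd.DataFrame({"one": [1, 2, 3], "two": ["a", "b", "c"]})
df2 = pd.DataFrame({"one": [4, 5, 6], "two": ["e", "f", "g"]})

# from_pandas takes object refs to pandas blocks; to_pandas returns refs back.
ds = ray.data.from_pandas([ray.put(df1), ray.put(df2)])
assert pd.concat([df1, df2]).equals(pd.concat(ray.get(ds.to_pandas())))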


@ -5,14 +5,14 @@ import pytest
import pandas as pd import pandas as pd
import ray import ray
from ray.experimental.data.dataset_pipeline import DatasetPipeline from ray.data.dataset_pipeline import DatasetPipeline
from ray.tests.conftest import * # noqa from ray.tests.conftest import * # noqa
def test_pipeline_actors(shutdown_only): def test_pipeline_actors(shutdown_only):
ray.init(num_cpus=2, num_gpus=1) ray.init(num_cpus=2, num_gpus=1)
pipe = ray.experimental.data.range(3) \ pipe = ray.data.range(3) \
.repeat(10) \ .repeat(10) \
.map(lambda x: x + 1) \ .map(lambda x: x + 1) \
.map(lambda x: x + 1, compute="actors", num_gpus=1) .map(lambda x: x + 1, compute="actors", num_gpus=1)
@ -29,13 +29,13 @@ def test_incremental_take(shutdown_only):
time.sleep(999999) time.sleep(999999)
return x return x
pipe = ray.experimental.data.range(2).pipeline(parallelism=1) pipe = ray.data.range(2).pipeline(parallelism=1)
pipe = pipe.map(block_on_ones) pipe = pipe.map(block_on_ones)
assert pipe.take(1) == [0] assert pipe.take(1) == [0]
def test_basic_pipeline(ray_start_regular_shared): def test_basic_pipeline(ray_start_regular_shared):
ds = ray.experimental.data.range(10) ds = ray.data.range(10)
pipe = ds.pipeline(parallelism=1) pipe = ds.pipeline(parallelism=1)
assert str(pipe) == "DatasetPipeline(length=10, num_stages=1)" assert str(pipe) == "DatasetPipeline(length=10, num_stages=1)"
@ -64,7 +64,7 @@ def test_from_iterable(ray_start_regular_shared):
def test_repeat_forever(ray_start_regular_shared): def test_repeat_forever(ray_start_regular_shared):
ds = ray.experimental.data.range(10) ds = ray.data.range(10)
pipe = ds.repeat() pipe = ds.repeat()
assert str(pipe) == "DatasetPipeline(length=None, num_stages=1)" assert str(pipe) == "DatasetPipeline(length=None, num_stages=1)"
for i, v in enumerate(pipe.iter_rows()): for i, v in enumerate(pipe.iter_rows()):
@ -74,42 +74,42 @@ def test_repeat_forever(ray_start_regular_shared):
def test_repartition(ray_start_regular_shared): def test_repartition(ray_start_regular_shared):
pipe = ray.experimental.data.range(10).repeat(10) pipe = ray.data.range(10).repeat(10)
assert pipe.repartition(1).sum() == 450 assert pipe.repartition(1).sum() == 450
assert pipe.repartition(10).sum() == 450 assert pipe.repartition(10).sum() == 450
assert pipe.repartition(100).sum() == 450 assert pipe.repartition(100).sum() == 450
def test_iter_batches(ray_start_regular_shared): def test_iter_batches(ray_start_regular_shared):
pipe = ray.experimental.data.range(10).pipeline(parallelism=2) pipe = ray.data.range(10).pipeline(parallelism=2)
batches = list(pipe.iter_batches()) batches = list(pipe.iter_batches())
assert len(batches) == 10 assert len(batches) == 10
assert all(len(e) == 1 for e in batches) assert all(len(e) == 1 for e in batches)
def test_iter_datasets(ray_start_regular_shared): def test_iter_datasets(ray_start_regular_shared):
pipe = ray.experimental.data.range(10).pipeline(parallelism=2) pipe = ray.data.range(10).pipeline(parallelism=2)
ds = list(pipe.iter_datasets()) ds = list(pipe.iter_datasets())
assert len(ds) == 5 assert len(ds) == 5
pipe = ray.experimental.data.range(10).pipeline(parallelism=5) pipe = ray.data.range(10).pipeline(parallelism=5)
ds = list(pipe.iter_datasets()) ds = list(pipe.iter_datasets())
assert len(ds) == 2 assert len(ds) == 2
def test_foreach_dataset(ray_start_regular_shared): def test_foreach_dataset(ray_start_regular_shared):
pipe = ray.experimental.data.range(5).pipeline(parallelism=2) pipe = ray.data.range(5).pipeline(parallelism=2)
pipe = pipe.foreach_dataset(lambda ds: ds.map(lambda x: x * 2)) pipe = pipe.foreach_dataset(lambda ds: ds.map(lambda x: x * 2))
assert pipe.take() == [0, 2, 4, 6, 8] assert pipe.take() == [0, 2, 4, 6, 8]
def test_schema(ray_start_regular_shared): def test_schema(ray_start_regular_shared):
pipe = ray.experimental.data.range(5).pipeline(parallelism=2) pipe = ray.data.range(5).pipeline(parallelism=2)
assert pipe.schema() == int assert pipe.schema() == int
def test_split(ray_start_regular_shared): def test_split(ray_start_regular_shared):
pipe = ray.experimental.data.range(3) \ pipe = ray.data.range(3) \
.map(lambda x: x + 1) \ .map(lambda x: x + 1) \
.repeat(10) .repeat(10)
@ -130,7 +130,7 @@ def test_parquet_write(ray_start_regular_shared, tmp_path):
df1 = pd.DataFrame({"one": [1, 2, 3], "two": ["a", "b", "c"]}) df1 = pd.DataFrame({"one": [1, 2, 3], "two": ["a", "b", "c"]})
df2 = pd.DataFrame({"one": [4, 5, 6], "two": ["e", "f", "g"]}) df2 = pd.DataFrame({"one": [4, 5, 6], "two": ["e", "f", "g"]})
df = pd.concat([df1, df2]) df = pd.concat([df1, df2])
ds = ray.experimental.data.from_pandas([ray.put(df1), ray.put(df2)]) ds = ray.data.from_pandas([ray.put(df1), ray.put(df2)])
ds = ds.pipeline(parallelism=1) ds = ds.pipeline(parallelism=1)
path = os.path.join(tmp_path, "test_parquet_dir") path = os.path.join(tmp_path, "test_parquet_dir")
os.mkdir(path) os.mkdir(path)
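The pipeline tests follow the same rename, with ``DatasetPipeline`` imported from ``ray.data.dataset_pipeline``. A minimal sketch of the pipelined pattern they exercise, assuming a local Ray instance:

import ray

ray.init()

# Pipelined execution over the renamed module, mirroring test_foreach_dataset.
pipe = ray.data.range(5).pipeline(parallelism=2)
pipe = pipe.foreach_dataset(lambda ds: ds.map(lambda x: x * 2))
assert pipe.take() == [0, 2, 4, 6, 8]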


@ -1,5 +1,4 @@
from ray.experimental.dynamic_resources import set_resource from ray.experimental.dynamic_resources import set_resource
from ray.experimental.packaging.load_package import load_package from ray.experimental.packaging.load_package import load_package
from ray.experimental.locations import get_object_locations from ray.experimental.locations import get_object_locations
from ray.experimental import data __all__ = ["get_object_locations", "set_resource", "load_package"]
__all__ = ["get_object_locations", "set_resource", "load_package", "data"]


@ -1,25 +0,0 @@
from ray.experimental.data.datasource.datasource import (
Datasource, RangeDatasource, DummyOutputDatasource, ReadTask, WriteTask)
from ray.experimental.data.datasource.json_datasource import JSONDatasource
from ray.experimental.data.datasource.csv_datasource import CSVDatasource
from ray.experimental.data.datasource.numpy_datasource import NumpyDatasource
from ray.experimental.data.datasource.parquet_datasource import (
ParquetDatasource)
from ray.experimental.data.datasource.binary_datasource import BinaryDatasource
from ray.experimental.data.datasource.file_based_datasource import (
FileBasedDatasource, _S3FileSystemWrapper)
__all__ = [
"JSONDatasource",
"CSVDatasource",
"NumpyDatasource",
"ParquetDatasource",
"BinaryDatasource",
"FileBasedDatasource",
"_S3FileSystemWrapper",
"Datasource",
"RangeDatasource",
"DummyOutputDatasource",
"ReadTask",
"WriteTask",
]
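The datasource re-exports deleted here are presumably relocated rather than dropped; the new home is not shown in this hunk, but under the new layout it would be expected at ``ray.data.datasource``. A hedged sketch of the updated import, assuming the public names carry over unchanged:

# Assumed new import location after the move out of ray.experimental.
from ray.data.datasource import (
    Datasource, RangeDatasource, DummyOutputDatasource, ReadTask, WriteTask)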


@ -78,7 +78,7 @@ ray.init()
start_time = time.time() start_time = time.time()
print("Downloading...") print("Downloading...")
ds = ray.experimental.data.read_binary_files( ds = ray.data.read_binary_files(
"s3://anyscale-data/small-images/", "s3://anyscale-data/small-images/",
parallelism=1000, parallelism=1000,
ray_remote_args={"num_cpus": 0.5}) ray_remote_args={"num_cpus": 0.5})