mirror of
https://github.com/vale981/ray
synced 2025-03-05 10:01:43 -05:00
[Datasets] Autodetect dataset parallelism based on available resources and data size (#25883)
This PR defaults the parallelism of Dataset reads to `-1`. In this case, the parallelism is determined according to the following rules:

- The number of available CPUs is estimated. If in a placement group, the number of CPUs in the cluster is scaled by the size of the placement group compared to the cluster size. If not in a placement group, this is the number of CPUs in the cluster. If the estimated CPUs is less than 8, it is set to 8.
- The parallelism is set to the estimated number of CPUs multiplied by 2.
- The in-memory data size is estimated. If the parallelism would create in-memory blocks larger than the target block size (512MiB), the parallelism is increased until the blocks are < 512MiB in size.

These rules fix two common user problems:

1. Insufficient parallelism in a large cluster, or too much parallelism on a small cluster.
2. Overly large block sizes leading to OOMs when processing a single block.

TODO:
- [x] Unit tests
- [x] Docs update

Supersedes part of: https://github.com/ray-project/ray/pull/25708

Co-authored-by: Ubuntu <ubuntu@ip-172-31-32-136.us-west-2.compute.internal>
Parent: 14800e5ac7
Commit: 9de1add073
14 changed files with 616 additions and 331 deletions
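For reference, a minimal sketch of the rule described in the commit message (illustrative only; the function and argument names here are assumptions, not code from this diff):

```python
def autodetect_parallelism(
    num_files: int,
    estimated_cpus: int,
    estimated_inmemory_bytes: int,
    target_block_size: int = 512 * 1024 * 1024,
    min_parallelism: int = 8,
) -> int:
    # Baseline: 2x the estimated CPUs, with a minimum floor.
    parallelism = max(min_parallelism, estimated_cpus * 2)
    # Raise parallelism until the average block fits under the target block size.
    min_safe_parallelism = max(1, estimated_inmemory_bytes // target_block_size)
    parallelism = max(parallelism, min_safe_parallelism)
    # A file-based read cannot create more read tasks than there are files.
    return min(num_files, parallelism)
```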
@@ -288,8 +288,13 @@ Each of these APIs take a path or list of paths to files or directories. Any dir
 provided will be walked in order to obtain concrete file paths, at which point all files
 will be read in parallel.

-Datasets uses a default parallelism of 200, truncated by the number of files being read:
-``parallelism = min(num_files, 200)``. ``parallelism`` parallel read tasks will be
+Datasets automatically selects the read ``parallelism`` according to the following procedure:
+1. The number of available CPUs is estimated. If in a placement group, the number of CPUs in the cluster is scaled by the size of the placement group compared to the cluster size. If not in a placement group, this is the number of CPUs in the cluster.
+2. The parallelism is set to the estimated number of CPUs multiplied by 2. If the parallelism is less than 8, it is set to 8.
+3. The in-memory data size is estimated. If the parallelism would create in-memory blocks that are larger on average than the target block size (512MiB), the parallelism is increased until the blocks are < 512MiB in size.
+4. The parallelism is truncated to ``min(num_files, parallelism)``.
+
+To perform the read, ``parallelism`` parallel read tasks will be
 launched, each reading one or more files and each creating a single block of data.
 When reading from remote datasources, these parallel read tasks will be spread across
 the nodes in your Ray cluster, creating the distributed collection of blocks that makes
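For illustration (not part of this diff; the bucket path below is hypothetical), the autodetected default can simply be relied on, while passing ``parallelism`` explicitly still overrides it:

.. code-block:: python

    import ray

    # Read parallelism is autodetected from cluster resources and data size.
    ds = ray.data.read_parquet("s3://example-bucket/data/")

    # An explicit value overrides autodetection.
    ds = ray.data.read_parquet("s3://example-bucket/data/", parallelism=1000)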
@@ -61,7 +61,7 @@ This page overviews the execution model of Datasets, which may be useful for und
 Reading Data
 ============

-Datasets uses Ray tasks to read data from remote storage. When reading from a file-based datasource (e.g., S3, GCS), it creates a number of read tasks equal to the specified read parallelism (200 by default). One or more files will be assigned to each read task. Each read task reads its assigned files and produces one or more output blocks (Ray objects):
+Datasets uses Ray tasks to read data from remote storage. When reading from a file-based datasource (e.g., S3, GCS), it creates a number of read tasks equal to the specified read parallelism (autodetected by default). One or more files will be assigned to each read task. Each read task reads its assigned files and produces one or more output blocks (Ray objects):

 .. image:: images/dataset-read.svg
   :width: 650px
@@ -109,13 +109,13 @@ This can be used in conjunction with column pruning when appropriate to get the
 Tuning Read Parallelism
 ~~~~~~~~~~~~~~~~~~~~~~~

-By default, Ray requests 0.5 CPUs per read task, which means two read tasks can concurrently execute per CPU.
+By default, Ray requests 1 CPU per read task, which means one read task per CPU can execute concurrently.
 For data sources that can benefit from higher degrees of I/O parallelism, you can specify a lower ``num_cpus`` value for the read function via the ``ray_remote_args`` parameter.
 For example, use ``ray.data.read_parquet(path, ray_remote_args={"num_cpus": 0.25})`` to allow up to four read tasks per CPU.

-The number of read tasks can also be increased by increasing the ``parallelism`` parameter.
-For example, use ``ray.data.read_parquet(path, parallelism=1000)`` to create up to 1000 read tasks.
-Typically, increasing the number of read tasks only helps if you have more cluster CPUs than the default parallelism.
+By default, Datasets automatically selects the read parallelism based on the current cluster size and dataset size.
+However, the number of read tasks can also be increased manually via the ``parallelism`` parameter.
+For example, use ``ray.data.read_parquet(path, parallelism=1000)`` to force up to 1000 read tasks to be created.

 Tuning Max Block Size
 ~~~~~~~~~~~~~~~~~~~~~
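As a usage sketch (illustrative, not part of this diff), the number of blocks a read actually produced can be inspected with ``Dataset.num_blocks()``, as the new unit tests in this change do:

.. code-block:: python

    import ray

    ds = ray.data.range_tensor(10_000, shape=(100,))  # parallelism autodetected
    print(ds.num_blocks())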
@ -87,7 +87,7 @@ class ProgressBar:
|
|||
self._bar.set_description(name)
|
||||
|
||||
def update(self, i: int) -> None:
|
||||
if self._bar:
|
||||
if self._bar and i != 0:
|
||||
self._bar.update(i)
|
||||
|
||||
def close(self):
|
||||
|
|
|
@ -11,7 +11,7 @@ _default_context: "Optional[DatasetContext]" = None
|
|||
_context_lock = threading.Lock()
|
||||
|
||||
# The max target block size in bytes for reads and transformations.
|
||||
DEFAULT_TARGET_MAX_BLOCK_SIZE = 2048 * 1024 * 1024
|
||||
DEFAULT_TARGET_MAX_BLOCK_SIZE = 512 * 1024 * 1024
|
||||
|
||||
# Whether block splitting is on by default
|
||||
DEFAULT_BLOCK_SPLITTING_ENABLED = False
|
||||
|
@ -33,6 +33,9 @@ DEFAULT_OPTIMIZE_FUSE_READ_STAGES = True
|
|||
# Whether to furthermore fuse prior map tasks with shuffle stages.
|
||||
DEFAULT_OPTIMIZE_FUSE_SHUFFLE_STAGES = True
|
||||
|
||||
# Minimum amount of parallelism to auto-detect for a dataset.
|
||||
DEFAULT_MIN_PARALLELISM = 8
|
||||
|
||||
# Whether to use actor-based block prefetcher.
|
||||
DEFAULT_ACTOR_PREFETCHER_ENABLED = True
|
||||
|
||||
|
@ -71,6 +74,7 @@ class DatasetContext:
|
|||
pipeline_push_based_shuffle_reduce_tasks: bool,
|
||||
scheduling_strategy: SchedulingStrategyT,
|
||||
use_polars: bool,
|
||||
min_parallelism: int,
|
||||
):
|
||||
"""Private constructor (use get_current() instead)."""
|
||||
self.block_owner = block_owner
|
||||
|
@ -88,6 +92,7 @@ class DatasetContext:
|
|||
)
|
||||
self.scheduling_strategy = scheduling_strategy
|
||||
self.use_polars = use_polars
|
||||
self.min_parallelism = min_parallelism
|
||||
|
||||
@staticmethod
|
||||
def get_current() -> "DatasetContext":
|
||||
|
@ -118,6 +123,7 @@ class DatasetContext:
|
|||
pipeline_push_based_shuffle_reduce_tasks=True,
|
||||
scheduling_strategy=DEFAULT_SCHEDULING_STRATEGY,
|
||||
use_polars=DEFAULT_USE_POLARS,
|
||||
min_parallelism=DEFAULT_MIN_PARALLELISM,
|
||||
)
|
||||
|
||||
if (
|
||||
|
|
|
@ -6,6 +6,7 @@ from ray.data.datasource.datasource import (
|
|||
RandomIntRowDatasource,
|
||||
RangeDatasource,
|
||||
ReadTask,
|
||||
Reader,
|
||||
WriteResult,
|
||||
)
|
||||
from ray.data.datasource.file_based_datasource import (
|
||||
|
@ -64,6 +65,7 @@ __all__ = [
|
|||
"RandomIntRowDatasource",
|
||||
"RangeDatasource",
|
||||
"ReadTask",
|
||||
"Reader",
|
||||
"SimpleTensorFlowDatasource",
|
||||
"SimpleTorchDatasource",
|
||||
"WriteResult",
|
||||
|
|
|
@ -1,29 +1,29 @@
|
|||
import builtins
|
||||
from typing import Any, Generic, List, Dict, Callable, Union, Tuple, Iterable
|
||||
from typing import Any, Callable, Dict, Generic, Iterable, List, Optional, Tuple, Union
|
||||
|
||||
import numpy as np
|
||||
|
||||
import ray
|
||||
from ray.types import ObjectRef
|
||||
from ray.data._internal.arrow_block import ArrowRow
|
||||
from ray.data._internal.delegating_block_builder import DelegatingBlockBuilder
|
||||
from ray.data._internal.util import _check_pyarrow_version
|
||||
from ray.data.block import (
|
||||
Block,
|
||||
BlockAccessor,
|
||||
BlockMetadata,
|
||||
T,
|
||||
BlockPartition,
|
||||
BlockPartitionMetadata,
|
||||
MaybeBlockPartition,
|
||||
T,
|
||||
)
|
||||
from ray.data.context import DatasetContext
|
||||
from ray.data._internal.arrow_block import ArrowRow
|
||||
from ray.data._internal.delegating_block_builder import DelegatingBlockBuilder
|
||||
from ray.data._internal.util import _check_pyarrow_version
|
||||
from ray.util.annotations import DeveloperAPI, PublicAPI
|
||||
from ray.types import ObjectRef
|
||||
from ray.util.annotations import Deprecated, DeveloperAPI, PublicAPI
|
||||
|
||||
WriteResult = Any
|
||||
|
||||
|
||||
@DeveloperAPI
|
||||
@PublicAPI
|
||||
class Datasource(Generic[T]):
|
||||
"""Interface for defining a custom ``ray.data.Dataset`` datasource.
|
||||
|
||||
|
@ -33,22 +33,24 @@ class Datasource(Generic[T]):
|
|||
See ``RangeDatasource`` and ``DummyOutputDatasource`` for examples
|
||||
of how to implement readable and writable datasources.
|
||||
|
||||
Datasource instances must be serializable, since ``prepare_read()`` and
|
||||
Datasource instances must be serializable, since ``create_reader()`` and
|
||||
``do_write()`` are called in remote tasks.
|
||||
"""
|
||||
|
||||
def prepare_read(self, parallelism: int, **read_args) -> List["ReadTask[T]"]:
|
||||
"""Return the list of tasks needed to perform a read.
|
||||
def create_reader(self, **read_args) -> "Reader[T]":
|
||||
"""Return a Reader for the given read arguments.
|
||||
|
||||
The reader object will be responsible for querying the read metadata, and
|
||||
generating the actual read tasks to retrieve the data blocks upon request.
|
||||
|
||||
Args:
|
||||
parallelism: The requested read parallelism. The number of read
|
||||
tasks should be as close to this value as possible.
|
||||
read_args: Additional kwargs to pass to the datasource impl.
|
||||
|
||||
Returns:
|
||||
A list of read tasks that can be executed to read blocks from the
|
||||
datasource in parallel.
|
||||
"""
|
||||
return _LegacyDatasourceReader(self, **read_args)
|
||||
|
||||
@Deprecated
|
||||
def prepare_read(self, parallelism: int, **read_args) -> List["ReadTask[T]"]:
|
||||
"""Deprecated: Please implement create_reader() instead."""
|
||||
raise NotImplementedError
|
||||
|
||||
def do_write(
|
||||
|
@ -100,11 +102,54 @@ class Datasource(Generic[T]):
|
|||
pass
|
||||
|
||||
|
||||
@PublicAPI
|
||||
class Reader(Generic[T]):
|
||||
"""A bound read operation for a datasource.
|
||||
|
||||
This is a stateful class so that reads can be prepared in multiple stages.
|
||||
For example, it is useful for Datasets to know the in-memory size of the read
|
||||
prior to executing it.
|
||||
"""
|
||||
|
||||
def estimate_inmemory_data_size(self) -> Optional[int]:
|
||||
"""Return an estimate of the in-memory data size, or None if unknown.
|
||||
|
||||
Note that the in-memory data size may be larger than the on-disk data size.
|
||||
"""
|
||||
raise NotImplementedError
|
||||
|
||||
def get_read_tasks(self, parallelism: int) -> List["ReadTask[T]"]:
|
||||
"""Execute the read and return read tasks.
|
||||
|
||||
Args:
|
||||
parallelism: The requested read parallelism. The number of read
|
||||
tasks should be equal to this value if possible.
|
||||
read_args: Additional kwargs to pass to the datasource impl.
|
||||
|
||||
Returns:
|
||||
A list of read tasks that can be executed to read blocks from the
|
||||
datasource in parallel.
|
||||
"""
|
||||
raise NotImplementedError
|
||||
|
||||
|
||||
class _LegacyDatasourceReader(Reader):
|
||||
def __init__(self, datasource: Datasource, **read_args):
|
||||
self._datasource = datasource
|
||||
self._read_args = read_args
|
||||
|
||||
def estimate_inmemory_data_size(self) -> Optional[int]:
|
||||
return None
|
||||
|
||||
def get_read_tasks(self, parallelism: int) -> List["ReadTask[T]"]:
|
||||
return self._datasource.prepare_read(parallelism, **self._read_args)
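# Illustrative sketch (not part of this diff) of how read_datasource() drives the
# new Reader API, mirroring the code later in this change:
#
#     reader = datasource.create_reader(**read_args)
#     mem_size = reader.estimate_inmemory_data_size()    # may be None
#     read_tasks = reader.get_read_tasks(parallelism)    # ~parallelism ReadTasks
#     blocks = [read_task() for read_task in read_tasks] # each call yields block(s)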
|
||||
|
||||
|
||||
@DeveloperAPI
|
||||
class ReadTask(Callable[[], BlockPartition]):
|
||||
"""A function used to read blocks from the dataset.
|
||||
|
||||
Read tasks are generated by ``datasource.prepare_read()``, and return
|
||||
Read tasks are generated by ``reader.get_read_tasks()``, and return
|
||||
a list of ``ray.data.Block`` when called. Initial metadata about the read
|
||||
operation can be retrieved via ``get_metadata()`` prior to executing the
|
||||
read. Final metadata is returned after the read along with the blocks.
|
||||
|
@ -171,14 +216,36 @@ class RangeDatasource(Datasource[Union[ArrowRow, int]]):
|
|||
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
|
||||
"""
|
||||
|
||||
def prepare_read(
|
||||
def create_reader(
|
||||
self,
|
||||
parallelism: int,
|
||||
n: int,
|
||||
block_format: str = "list",
|
||||
tensor_shape: Tuple = (1,),
|
||||
) -> List[ReadTask]:
|
||||
return _RangeDatasourceReader(n, block_format, tensor_shape)
|
||||
|
||||
|
||||
class _RangeDatasourceReader(Reader):
|
||||
def __init__(self, n: int, block_format: str = "list", tensor_shape: Tuple = (1,)):
|
||||
self._n = n
|
||||
self._block_format = block_format
|
||||
self._tensor_shape = tensor_shape
|
||||
|
||||
def estimate_inmemory_data_size(self) -> Optional[int]:
|
||||
if self._block_format == "tensor":
|
||||
element_size = np.product(self._tensor_shape)
|
||||
else:
|
||||
element_size = 1
|
||||
return 8 * self._n * element_size
|
||||
|
||||
def get_read_tasks(
|
||||
self,
|
||||
parallelism: int,
|
||||
) -> List[ReadTask]:
|
||||
read_tasks: List[ReadTask] = []
|
||||
n = self._n
|
||||
block_format = self._block_format
|
||||
tensor_shape = self._tensor_shape
|
||||
block_size = max(1, n // parallelism)
|
||||
|
||||
# Example of a read task. In a real datasource, this would pull data
|
||||
|
@ -316,13 +383,32 @@ class RandomIntRowDatasource(Datasource[ArrowRow]):
|
|||
{'c_0': 4983608804013926748, 'c_1': 1160140066899844087}
|
||||
"""
|
||||
|
||||
def prepare_read(
|
||||
self, parallelism: int, n: int, num_columns: int
|
||||
def create_reader(
|
||||
self,
|
||||
n: int,
|
||||
num_columns: int,
|
||||
) -> List[ReadTask]:
|
||||
return _RandomIntRowDatasourceReader(n, num_columns)
|
||||
|
||||
|
||||
class _RandomIntRowDatasourceReader(Reader):
|
||||
def __init__(self, n: int, num_columns: int):
|
||||
self._n = n
|
||||
self._num_columns = num_columns
|
||||
|
||||
def estimate_inmemory_data_size(self) -> Optional[int]:
|
||||
return self._n * self._num_columns * 8
|
||||
|
||||
def get_read_tasks(
|
||||
self,
|
||||
parallelism: int,
|
||||
) -> List[ReadTask]:
|
||||
_check_pyarrow_version()
|
||||
import pyarrow
|
||||
|
||||
read_tasks: List[ReadTask] = []
|
||||
n = self._n
|
||||
num_columns = self._num_columns
|
||||
block_size = max(1, n // parallelism)
|
||||
|
||||
def make_block(count: int, num_columns: int) -> Block:
|
||||
|
|
|
@ -25,7 +25,7 @@ from ray.data._internal.remote_fn import cached_remote_fn
|
|||
from ray.data._internal.util import _check_pyarrow_version
|
||||
from ray.data.block import Block, BlockAccessor
|
||||
from ray.data.context import DatasetContext
|
||||
from ray.data.datasource.datasource import Datasource, ReadTask, WriteResult
|
||||
from ray.data.datasource.datasource import Datasource, Reader, ReadTask, WriteResult
|
||||
from ray.data.datasource.file_meta_provider import (
|
||||
BaseFileMetadataProvider,
|
||||
DefaultFileMetadataProvider,
|
||||
|
@ -187,119 +187,23 @@ class FileBasedDatasource(Datasource[Union[ArrowRow, Any]]):
|
|||
|
||||
_FILE_EXTENSION: Optional[Union[str, List[str]]] = None
|
||||
|
||||
def prepare_read(
|
||||
def _open_input_source(
|
||||
self,
|
||||
parallelism: int,
|
||||
paths: Union[str, List[str]],
|
||||
filesystem: Optional["pyarrow.fs.FileSystem"] = None,
|
||||
schema: Optional[Union[type, "pyarrow.lib.Schema"]] = None,
|
||||
open_stream_args: Optional[Dict[str, Any]] = None,
|
||||
meta_provider: BaseFileMetadataProvider = DefaultFileMetadataProvider(),
|
||||
partition_filter: PathPartitionFilter = None,
|
||||
# TODO(ekl) deprecate this once read fusion is available.
|
||||
_block_udf: Optional[Callable[[Block], Block]] = None,
|
||||
**reader_args,
|
||||
) -> List[ReadTask]:
|
||||
"""Creates and returns read tasks for a file-based datasource."""
|
||||
_check_pyarrow_version()
|
||||
import numpy as np
|
||||
filesystem: "pyarrow.fs.FileSystem",
|
||||
path: str,
|
||||
**open_args,
|
||||
) -> "pyarrow.NativeFile":
|
||||
"""Opens a source path for reading and returns the associated Arrow NativeFile.
|
||||
|
||||
paths, filesystem = _resolve_paths_and_filesystem(paths, filesystem)
|
||||
paths, file_sizes = meta_provider.expand_paths(paths, filesystem)
|
||||
if partition_filter is not None:
|
||||
filtered_paths = partition_filter(paths)
|
||||
if not filtered_paths:
|
||||
raise ValueError(
|
||||
"All provided and expanded paths have been filtered out by "
|
||||
"the path filter; please change the provided paths or the "
|
||||
f"path filter.\nPaths: {paths}\nFilter: {partition_filter}"
|
||||
)
|
||||
paths = filtered_paths
|
||||
The default implementation opens the source path as a sequential input stream.
|
||||
|
||||
read_stream = self._read_stream
|
||||
Implementations that do not support streaming reads (e.g. that require random
|
||||
access) should override this method.
|
||||
"""
|
||||
return filesystem.open_input_stream(path, **open_args)
|
||||
|
||||
filesystem = _wrap_s3_serialization_workaround(filesystem)
|
||||
read_options = reader_args.get("read_options")
|
||||
if read_options is not None:
|
||||
import pyarrow.json as pajson
|
||||
|
||||
if isinstance(read_options, pajson.ReadOptions):
|
||||
_register_arrow_json_readoptions_serializer()
|
||||
|
||||
if open_stream_args is None:
|
||||
open_stream_args = {}
|
||||
|
||||
def read_files(
|
||||
read_paths: List[str],
|
||||
fs: Union["pyarrow.fs.FileSystem", _S3FileSystemWrapper],
|
||||
) -> Iterable[Block]:
|
||||
logger.debug(f"Reading {len(read_paths)} files.")
|
||||
if isinstance(fs, _S3FileSystemWrapper):
|
||||
fs = fs.unwrap()
|
||||
ctx = DatasetContext.get_current()
|
||||
output_buffer = BlockOutputBuffer(
|
||||
block_udf=_block_udf, target_max_block_size=ctx.target_max_block_size
|
||||
)
|
||||
for read_path in read_paths:
|
||||
compression = open_stream_args.pop("compression", None)
|
||||
if compression is None:
|
||||
import pyarrow as pa
|
||||
|
||||
try:
|
||||
# If no compression manually given, try to detect
|
||||
# compression codec from path.
|
||||
compression = pa.Codec.detect(read_path).name
|
||||
except (ValueError, TypeError):
|
||||
# Arrow's compression inference on the file path
|
||||
# doesn't work for Snappy, so we double-check ourselves.
|
||||
import pathlib
|
||||
|
||||
suffix = pathlib.Path(read_path).suffix
|
||||
if suffix and suffix[1:] == "snappy":
|
||||
compression = "snappy"
|
||||
else:
|
||||
compression = None
|
||||
if compression == "snappy":
|
||||
# Pass Snappy compression as a reader arg, so datasource subclasses
|
||||
# can manually handle streaming decompression in
|
||||
# self._read_stream().
|
||||
reader_args["compression"] = compression
|
||||
reader_args["filesystem"] = fs
|
||||
elif compression is not None:
|
||||
# Non-Snappy compression, pass as open_input_stream() arg so Arrow
|
||||
# can take care of streaming decompression for us.
|
||||
open_stream_args["compression"] = compression
|
||||
with self._open_input_source(fs, read_path, **open_stream_args) as f:
|
||||
for data in read_stream(f, read_path, **reader_args):
|
||||
output_buffer.add_block(data)
|
||||
if output_buffer.has_next():
|
||||
yield output_buffer.next()
|
||||
output_buffer.finalize()
|
||||
if output_buffer.has_next():
|
||||
yield output_buffer.next()
|
||||
|
||||
# fix https://github.com/ray-project/ray/issues/24296
|
||||
parallelism = min(parallelism, len(paths))
|
||||
|
||||
read_tasks = []
|
||||
for read_paths, file_sizes in zip(
|
||||
np.array_split(paths, parallelism), np.array_split(file_sizes, parallelism)
|
||||
):
|
||||
if len(read_paths) <= 0:
|
||||
continue
|
||||
|
||||
meta = meta_provider(
|
||||
read_paths,
|
||||
schema,
|
||||
rows_per_file=self._rows_per_file(),
|
||||
file_sizes=file_sizes,
|
||||
)
|
||||
read_task = ReadTask(
|
||||
lambda read_paths=read_paths: read_files(read_paths, filesystem), meta
|
||||
)
|
||||
read_tasks.append(read_task)
|
||||
|
||||
return read_tasks
|
||||
def create_reader(self, **kwargs):
|
||||
return _FileBasedDatasourceReader(self, **kwargs)
|
||||
|
||||
def _rows_per_file(self):
|
||||
"""Returns the number of rows per file, or None if unknown."""
|
||||
|
@ -323,21 +227,6 @@ class FileBasedDatasource(Datasource[Union[ArrowRow, Any]]):
|
|||
"Subclasses of FileBasedDatasource must implement _read_file()."
|
||||
)
|
||||
|
||||
def _open_input_source(
|
||||
self,
|
||||
filesystem: "pyarrow.fs.FileSystem",
|
||||
path: str,
|
||||
**open_args,
|
||||
) -> "pyarrow.NativeFile":
|
||||
"""Opens a source path for reading and returns the associated Arrow NativeFile.
|
||||
|
||||
The default implementation opens the source path as a sequential input stream.
|
||||
|
||||
Implementations that do not support streaming reads (e.g. that require random
|
||||
access) should override this method.
|
||||
"""
|
||||
return filesystem.open_input_stream(path, **open_args)
|
||||
|
||||
def do_write(
|
||||
self,
|
||||
blocks: List[ObjectRef[Block]],
|
||||
|
@ -429,6 +318,138 @@ class FileBasedDatasource(Datasource[Union[ArrowRow, Any]]):
|
|||
return FileExtensionFilter(cls._FILE_EXTENSION)
|
||||
|
||||
|
||||
class _FileBasedDatasourceReader(Reader):
|
||||
def __init__(
|
||||
self,
|
||||
delegate: FileBasedDatasource,
|
||||
paths: Union[str, List[str]],
|
||||
filesystem: Optional["pyarrow.fs.FileSystem"] = None,
|
||||
schema: Optional[Union[type, "pyarrow.lib.Schema"]] = None,
|
||||
open_stream_args: Optional[Dict[str, Any]] = None,
|
||||
meta_provider: BaseFileMetadataProvider = DefaultFileMetadataProvider(),
|
||||
partition_filter: PathPartitionFilter = None,
|
||||
# TODO(ekl) deprecate this once read fusion is available.
|
||||
_block_udf: Optional[Callable[[Block], Block]] = None,
|
||||
**reader_args,
|
||||
):
|
||||
_check_pyarrow_version()
|
||||
self._delegate = delegate
|
||||
self._schema = schema
|
||||
self._open_stream_args = open_stream_args
|
||||
self._meta_provider = meta_provider
|
||||
self._partition_filter = partition_filter
|
||||
self._block_udf = _block_udf
|
||||
self._reader_args = reader_args
|
||||
paths, self._filesystem = _resolve_paths_and_filesystem(paths, filesystem)
|
||||
self._paths, self._file_sizes = meta_provider.expand_paths(
|
||||
paths, self._filesystem
|
||||
)
|
||||
|
||||
def estimate_inmemory_data_size(self) -> Optional[int]:
|
||||
total_size = 0
|
||||
for sz in self._file_sizes:
|
||||
if sz is not None:
|
||||
total_size += sz
|
||||
return total_size
|
||||
|
||||
def get_read_tasks(self, parallelism: int) -> List[ReadTask]:
|
||||
import numpy as np
|
||||
|
||||
open_stream_args = self._open_stream_args
|
||||
reader_args = self._reader_args
|
||||
_block_udf = self._block_udf
|
||||
|
||||
paths, file_sizes = self._paths, self._file_sizes
|
||||
if self._partition_filter is not None:
|
||||
paths = self._partition_filter(paths)
|
||||
|
||||
read_stream = self._delegate._read_stream
|
||||
filesystem = _wrap_s3_serialization_workaround(self._filesystem)
|
||||
read_options = reader_args.get("read_options")
|
||||
if read_options is not None:
|
||||
import pyarrow.json as pajson
|
||||
|
||||
if isinstance(read_options, pajson.ReadOptions):
|
||||
_register_arrow_json_readoptions_serializer()
|
||||
|
||||
if open_stream_args is None:
|
||||
open_stream_args = {}
|
||||
|
||||
open_input_source = self._delegate._open_input_source
|
||||
|
||||
def read_files(
|
||||
read_paths: List[str],
|
||||
fs: Union["pyarrow.fs.FileSystem", _S3FileSystemWrapper],
|
||||
) -> Iterable[Block]:
|
||||
logger.debug(f"Reading {len(read_paths)} files.")
|
||||
if isinstance(fs, _S3FileSystemWrapper):
|
||||
fs = fs.unwrap()
|
||||
ctx = DatasetContext.get_current()
|
||||
output_buffer = BlockOutputBuffer(
|
||||
block_udf=_block_udf, target_max_block_size=ctx.target_max_block_size
|
||||
)
|
||||
for read_path in read_paths:
|
||||
compression = open_stream_args.pop("compression", None)
|
||||
if compression is None:
|
||||
import pyarrow as pa
|
||||
|
||||
try:
|
||||
# If no compression manually given, try to detect
|
||||
# compression codec from path.
|
||||
compression = pa.Codec.detect(read_path).name
|
||||
except (ValueError, TypeError):
|
||||
# Arrow's compression inference on the file path
|
||||
# doesn't work for Snappy, so we double-check ourselves.
|
||||
import pathlib
|
||||
|
||||
suffix = pathlib.Path(read_path).suffix
|
||||
if suffix and suffix[1:] == "snappy":
|
||||
compression = "snappy"
|
||||
else:
|
||||
compression = None
|
||||
if compression == "snappy":
|
||||
# Pass Snappy compression as a reader arg, so datasource subclasses
|
||||
# can manually handle streaming decompression in
|
||||
# self._delegate._read_stream().
|
||||
reader_args["compression"] = compression
|
||||
reader_args["filesystem"] = fs
|
||||
elif compression is not None:
|
||||
# Non-Snappy compression, pass as open_input_stream() arg so Arrow
|
||||
# can take care of streaming decompression for us.
|
||||
open_stream_args["compression"] = compression
|
||||
with open_input_source(fs, read_path, **open_stream_args) as f:
|
||||
for data in read_stream(f, read_path, **reader_args):
|
||||
output_buffer.add_block(data)
|
||||
if output_buffer.has_next():
|
||||
yield output_buffer.next()
|
||||
output_buffer.finalize()
|
||||
if output_buffer.has_next():
|
||||
yield output_buffer.next()
|
||||
|
||||
# fix https://github.com/ray-project/ray/issues/24296
|
||||
parallelism = min(parallelism, len(paths))
|
||||
|
||||
read_tasks = []
|
||||
for read_paths, file_sizes in zip(
|
||||
np.array_split(paths, parallelism), np.array_split(file_sizes, parallelism)
|
||||
):
|
||||
if len(read_paths) <= 0:
|
||||
continue
|
||||
|
||||
meta = self._meta_provider(
|
||||
read_paths,
|
||||
self._schema,
|
||||
rows_per_file=self._delegate._rows_per_file(),
|
||||
file_sizes=file_sizes,
|
||||
)
|
||||
read_task = ReadTask(
|
||||
lambda read_paths=read_paths: read_files(read_paths, filesystem), meta
|
||||
)
|
||||
read_tasks.append(read_task)
|
||||
|
||||
return read_tasks
|
||||
|
||||
|
||||
# TODO(Clark): Add unit test coverage of _resolve_paths_and_filesystem and
|
||||
# _expand_paths.
|
||||
|
||||
|
|
|
@ -1,28 +1,29 @@
|
|||
import logging
|
||||
import itertools
|
||||
from typing import Callable, Optional, List, Union, Iterator, TYPE_CHECKING
|
||||
import logging
|
||||
from typing import TYPE_CHECKING, Callable, Iterator, List, Optional, Union
|
||||
|
||||
import numpy as np
|
||||
|
||||
if TYPE_CHECKING:
|
||||
import pyarrow
|
||||
|
||||
import ray
|
||||
from ray.types import ObjectRef
|
||||
from ray.data.block import Block
|
||||
from ray.data.context import DatasetContext
|
||||
from ray.data.datasource.datasource import ReadTask
|
||||
from ray.data.datasource.file_based_datasource import _resolve_paths_and_filesystem
|
||||
from ray.data.datasource.parquet_base_datasource import ParquetBaseDatasource
|
||||
from ray.data.datasource.file_meta_provider import (
|
||||
ParquetMetadataProvider,
|
||||
DefaultParquetMetadataProvider,
|
||||
)
|
||||
from ray.data._internal.output_buffer import BlockOutputBuffer
|
||||
from ray.data._internal.progress_bar import ProgressBar
|
||||
from ray.data._internal.remote_fn import cached_remote_fn
|
||||
from ray.data._internal.util import _check_pyarrow_version
|
||||
from ray.data.block import Block
|
||||
from ray.data.context import DatasetContext
|
||||
from ray.data.datasource.datasource import Reader, ReadTask
|
||||
from ray.data.datasource.file_based_datasource import _resolve_paths_and_filesystem
|
||||
from ray.data.datasource.file_meta_provider import (
|
||||
DefaultParquetMetadataProvider,
|
||||
ParquetMetadataProvider,
|
||||
)
|
||||
from ray.data.datasource.parquet_base_datasource import ParquetBaseDatasource
|
||||
from ray.types import ObjectRef
|
||||
from ray.util.annotations import PublicAPI
|
||||
import ray.cloudpickle as cloudpickle
|
||||
|
||||
if TYPE_CHECKING:
|
||||
import pyarrow
|
||||
from pyarrow.dataset import ParquetFileFragment
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
@ -36,38 +37,30 @@ PARQUET_READER_ROW_BATCH_SIZE = 100000
|
|||
FILE_READING_RETRY = 8
|
||||
|
||||
|
||||
def _register_parquet_file_fragment_serialization():
|
||||
from pyarrow.dataset import ParquetFileFragment
|
||||
# TODO(ekl) this is a workaround for a pyarrow serialization bug, where serializing a
|
||||
# raw pyarrow file fragment causes S3 network calls.
|
||||
class _SerializedPiece:
|
||||
def __init__(self, frag: "ParquetFileFragment"):
|
||||
self._data = cloudpickle.dumps(
|
||||
(frag.format, frag.path, frag.filesystem, frag.partition_expression)
|
||||
)
|
||||
|
||||
def serialize(frag):
|
||||
return (frag.format, frag.path, frag.filesystem, frag.partition_expression)
|
||||
def deserialize(self) -> "ParquetFileFragment":
|
||||
# Implicitly trigger S3 subsystem initialization by importing
|
||||
# pyarrow.fs.
|
||||
import pyarrow.fs # noqa: F401
|
||||
|
||||
def deserialize(obj):
|
||||
file_format, path, filesystem, partition_expression = obj
|
||||
(file_format, path, filesystem, partition_expression) = cloudpickle.loads(
|
||||
self._data
|
||||
)
|
||||
return file_format.make_fragment(path, filesystem, partition_expression)
|
||||
|
||||
ray.util.register_serializer(
|
||||
ParquetFileFragment, serializer=serialize, deserializer=deserialize
|
||||
)
|
||||
|
||||
|
||||
def _deregister_parquet_file_fragment_serialization():
|
||||
from pyarrow.dataset import ParquetFileFragment
|
||||
|
||||
ray.util.deregister_serializer(ParquetFileFragment)
|
||||
|
||||
|
||||
# This is the bare bone deserializing function with no retry
|
||||
# easier to mock its behavior for testing when isolated from retry logic
|
||||
# Visible for test mocking.
|
||||
def _deserialize_pieces(
|
||||
serialized_pieces: str,
|
||||
serialized_pieces: List[_SerializedPiece],
|
||||
) -> List["pyarrow._dataset.ParquetFileFragment"]:
|
||||
from ray import cloudpickle
|
||||
|
||||
pieces: List["pyarrow._dataset.ParquetFileFragment"] = cloudpickle.loads(
|
||||
serialized_pieces
|
||||
)
|
||||
return pieces
|
||||
return [p.deserialize() for p in serialized_pieces]
|
||||
|
||||
|
||||
# This retry helps when the upstream datasource is not able to handle
|
||||
|
@ -78,7 +71,7 @@ def _deserialize_pieces(
|
|||
# with ray.data parallelism setting at high value like the default 200
|
||||
# Such connection failure can be restored with some waiting and retry.
|
||||
def _deserialize_pieces_with_retry(
|
||||
serialized_pieces: str,
|
||||
serialized_pieces: List[_SerializedPiece],
|
||||
) -> List["pyarrow._dataset.ParquetFileFragment"]:
|
||||
min_interval = 0
|
||||
final_exception = None
|
||||
|
@ -86,8 +79,8 @@ def _deserialize_pieces_with_retry(
|
|||
try:
|
||||
return _deserialize_pieces(serialized_pieces)
|
||||
except Exception as e:
|
||||
import time
|
||||
import random
|
||||
import time
|
||||
|
||||
retry_timing = (
|
||||
""
|
||||
|
@ -138,9 +131,13 @@ class ParquetDatasource(ParquetBaseDatasource):
|
|||
[{"a": 1, "b": "foo"}, ...]
|
||||
"""
|
||||
|
||||
def prepare_read(
|
||||
def create_reader(self, **kwargs):
|
||||
return _ParquetDatasourceReader(**kwargs)
|
||||
|
||||
|
||||
class _ParquetDatasourceReader(Reader):
|
||||
def __init__(
|
||||
self,
|
||||
parallelism: int,
|
||||
paths: Union[str, List[str]],
|
||||
filesystem: Optional["pyarrow.fs.FileSystem"] = None,
|
||||
columns: Optional[List[str]] = None,
|
||||
|
@ -148,17 +145,10 @@ class ParquetDatasource(ParquetBaseDatasource):
|
|||
meta_provider: ParquetMetadataProvider = DefaultParquetMetadataProvider(),
|
||||
_block_udf: Optional[Callable[[Block], Block]] = None,
|
||||
**reader_args,
|
||||
) -> List[ReadTask]:
|
||||
"""Creates and returns read tasks for a Parquet file-based datasource."""
|
||||
# NOTE: We override the base class FileBasedDatasource.prepare_read
|
||||
# method in order to leverage pyarrow's ParquetDataset abstraction,
|
||||
# which simplifies partitioning logic. We still use
|
||||
# FileBasedDatasource's write side (do_write), however.
|
||||
):
|
||||
_check_pyarrow_version()
|
||||
from ray import cloudpickle
|
||||
import pyarrow as pa
|
||||
import pyarrow.parquet as pq
|
||||
import numpy as np
|
||||
|
||||
paths, filesystem = _resolve_paths_and_filesystem(paths, filesystem)
|
||||
if len(paths) == 1:
|
||||
|
@ -175,28 +165,98 @@ class ParquetDatasource(ParquetBaseDatasource):
|
|||
[schema.field(column) for column in columns], schema.metadata
|
||||
)
|
||||
|
||||
def read_pieces(serialized_pieces: str) -> Iterator[pa.Table]:
|
||||
# Implicitly trigger S3 subsystem initialization by importing
|
||||
# pyarrow.fs.
|
||||
import pyarrow.fs # noqa: F401
|
||||
|
||||
# Deserialize after loading the filesystem class.
|
||||
if _block_udf is not None:
|
||||
# Try to infer dataset schema by passing dummy table through UDF.
|
||||
dummy_table = schema.empty_table()
|
||||
try:
|
||||
_register_parquet_file_fragment_serialization()
|
||||
inferred_schema = _block_udf(dummy_table).schema
|
||||
inferred_schema = inferred_schema.with_metadata(schema.metadata)
|
||||
except Exception:
|
||||
logger.debug(
|
||||
"Failed to infer schema of dataset by passing dummy table "
|
||||
"through UDF due to the following exception:",
|
||||
exc_info=True,
|
||||
)
|
||||
inferred_schema = schema
|
||||
else:
|
||||
inferred_schema = schema
|
||||
self._metadata = meta_provider.prefetch_file_metadata(pq_ds.pieces) or []
|
||||
self._pq_ds = pq_ds
|
||||
self._meta_provider = meta_provider
|
||||
self._inferred_schema = inferred_schema
|
||||
self._block_udf = _block_udf
|
||||
self._reader_args = reader_args
|
||||
self._columns = columns
|
||||
self._schema = schema
|
||||
|
||||
def estimate_inmemory_data_size(self) -> Optional[int]:
|
||||
# TODO(ekl) better estimate the in-memory size here.
|
||||
PARQUET_DECOMPRESSION_MULTIPLIER = 5
|
||||
total_size = 0
|
||||
for meta in self._metadata:
|
||||
total_size += meta.serialized_size
|
||||
return total_size * PARQUET_DECOMPRESSION_MULTIPLIER
|
||||
|
||||
def get_read_tasks(self, parallelism: int) -> List[ReadTask]:
|
||||
# NOTE: We override the base class FileBasedDatasource.get_read_tasks()
|
||||
# method in order to leverage pyarrow's ParquetDataset abstraction,
|
||||
# which simplifies partitioning logic. We still use
|
||||
# FileBasedDatasource's write side (do_write), however.
|
||||
read_tasks = []
|
||||
for pieces, metadata in zip(
|
||||
np.array_split(self._pq_ds.pieces, parallelism),
|
||||
np.array_split(self._metadata, parallelism),
|
||||
):
|
||||
if len(pieces) <= 0:
|
||||
continue
|
||||
serialized_pieces = [_SerializedPiece(p) for p in pieces]
|
||||
input_files = [p.path for p in pieces]
|
||||
meta = self._meta_provider(
|
||||
input_files,
|
||||
self._inferred_schema,
|
||||
pieces=pieces,
|
||||
prefetched_metadata=metadata,
|
||||
)
|
||||
block_udf, reader_args, columns, schema = (
|
||||
self._block_udf,
|
||||
self._reader_args,
|
||||
self._columns,
|
||||
self._schema,
|
||||
)
|
||||
read_tasks.append(
|
||||
ReadTask(
|
||||
lambda p=serialized_pieces: _read_pieces(
|
||||
block_udf,
|
||||
reader_args,
|
||||
columns,
|
||||
schema,
|
||||
p,
|
||||
),
|
||||
meta,
|
||||
)
|
||||
)
|
||||
|
||||
return read_tasks
|
||||
|
||||
|
||||
def _read_pieces(
|
||||
block_udf, reader_args, columns, schema, serialized_pieces: List[_SerializedPiece]
|
||||
) -> Iterator["pyarrow.Table"]:
|
||||
# Deserialize after loading the filesystem class.
|
||||
pieces: List[
|
||||
"pyarrow._dataset.ParquetFileFragment"
|
||||
] = _deserialize_pieces_with_retry(serialized_pieces)
|
||||
finally:
|
||||
_deregister_parquet_file_fragment_serialization()
|
||||
|
||||
# Ensure that we're reading at least one dataset fragment.
|
||||
assert len(pieces) > 0
|
||||
|
||||
import pyarrow as pa
|
||||
from pyarrow.dataset import _get_partition_keys
|
||||
|
||||
ctx = DatasetContext.get_current()
|
||||
output_buffer = BlockOutputBuffer(
|
||||
block_udf=_block_udf, target_max_block_size=ctx.target_max_block_size
|
||||
block_udf=block_udf,
|
||||
target_max_block_size=ctx.target_max_block_size,
|
||||
)
|
||||
|
||||
logger.debug(f"Reading {len(pieces)} parquet pieces")
|
||||
|
@ -211,7 +271,7 @@ class ParquetDatasource(ParquetBaseDatasource):
|
|||
**reader_args,
|
||||
)
|
||||
for batch in batches:
|
||||
table = pyarrow.Table.from_batches([batch], schema=schema)
|
||||
table = pa.Table.from_batches([batch], schema=schema)
|
||||
if part:
|
||||
for col, value in part.items():
|
||||
table = table.set_column(
|
||||
|
@ -228,65 +288,19 @@ class ParquetDatasource(ParquetBaseDatasource):
|
|||
if output_buffer.has_next():
|
||||
yield output_buffer.next()
|
||||
|
||||
if _block_udf is not None:
|
||||
# Try to infer dataset schema by passing dummy table through UDF.
|
||||
dummy_table = schema.empty_table()
|
||||
try:
|
||||
inferred_schema = _block_udf(dummy_table).schema
|
||||
inferred_schema = inferred_schema.with_metadata(schema.metadata)
|
||||
except Exception:
|
||||
logger.debug(
|
||||
"Failed to infer schema of dataset by passing dummy table "
|
||||
"through UDF due to the following exception:",
|
||||
exc_info=True,
|
||||
)
|
||||
inferred_schema = schema
|
||||
else:
|
||||
inferred_schema = schema
|
||||
read_tasks = []
|
||||
metadata = meta_provider.prefetch_file_metadata(pq_ds.pieces) or []
|
||||
try:
|
||||
_register_parquet_file_fragment_serialization()
|
||||
for pieces, metadata in zip(
|
||||
np.array_split(pq_ds.pieces, parallelism),
|
||||
np.array_split(metadata, parallelism),
|
||||
):
|
||||
if len(pieces) <= 0:
|
||||
continue
|
||||
serialized_pieces = cloudpickle.dumps(pieces)
|
||||
input_files = [p.path for p in pieces]
|
||||
meta = meta_provider(
|
||||
input_files,
|
||||
inferred_schema,
|
||||
pieces=pieces,
|
||||
prefetched_metadata=metadata,
|
||||
)
|
||||
read_tasks.append(
|
||||
ReadTask(lambda p=serialized_pieces: read_pieces(p), meta)
|
||||
)
|
||||
finally:
|
||||
_deregister_parquet_file_fragment_serialization()
|
||||
|
||||
return read_tasks
|
||||
|
||||
|
||||
def _fetch_metadata_remotely(
|
||||
pieces: List["pyarrow._dataset.ParquetFileFragment"],
|
||||
) -> List[ObjectRef["pyarrow.parquet.FileMetaData"]]:
|
||||
from ray import cloudpickle
|
||||
|
||||
remote_fetch_metadata = cached_remote_fn(_fetch_metadata_serialization_wrapper)
|
||||
metas = []
|
||||
parallelism = min(len(pieces) // PIECES_PER_META_FETCH, 100)
|
||||
meta_fetch_bar = ProgressBar("Metadata Fetch Progress", total=parallelism)
|
||||
try:
|
||||
_register_parquet_file_fragment_serialization()
|
||||
for pcs in np.array_split(pieces, parallelism):
|
||||
if len(pcs) == 0:
|
||||
continue
|
||||
metas.append(remote_fetch_metadata.remote(cloudpickle.dumps(pcs)))
|
||||
finally:
|
||||
_deregister_parquet_file_fragment_serialization()
|
||||
metas.append(remote_fetch_metadata.remote([_SerializedPiece(p) for p in pcs]))
|
||||
metas = meta_fetch_bar.fetch_until_complete(metas)
|
||||
return list(itertools.chain.from_iterable(metas))
|
||||
|
||||
|
@ -294,18 +308,10 @@ def _fetch_metadata_remotely(
|
|||
def _fetch_metadata_serialization_wrapper(
|
||||
pieces: str,
|
||||
) -> List["pyarrow.parquet.FileMetaData"]:
|
||||
# Implicitly trigger S3 subsystem initialization by importing
|
||||
# pyarrow.fs.
|
||||
import pyarrow.fs # noqa: F401
|
||||
|
||||
# Deserialize after loading the filesystem class.
|
||||
try:
|
||||
_register_parquet_file_fragment_serialization()
|
||||
pieces: List[
|
||||
"pyarrow._dataset.ParquetFileFragment"
|
||||
] = _deserialize_pieces_with_retry(pieces)
|
||||
finally:
|
||||
_deregister_parquet_file_fragment_serialization()
|
||||
|
||||
return _fetch_metadata(pieces)
|
||||
|
||||
|
|
|
@ -31,6 +31,7 @@ from ray.data.datasource import (
|
|||
ParquetMetadataProvider,
|
||||
PathPartitionFilter,
|
||||
RangeDatasource,
|
||||
Reader,
|
||||
ReadTask,
|
||||
)
|
||||
from ray.data.datasource.file_based_datasource import (
|
||||
|
@ -39,6 +40,7 @@ from ray.data.datasource.file_based_datasource import (
|
|||
)
|
||||
from ray.types import ObjectRef
|
||||
from ray.util.annotations import Deprecated, DeveloperAPI, PublicAPI
|
||||
from ray.util.placement_group import PlacementGroup
|
||||
|
||||
if TYPE_CHECKING:
|
||||
import dask
|
||||
|
@ -56,7 +58,7 @@ logger = logging.getLogger(__name__)
|
|||
|
||||
|
||||
@PublicAPI
|
||||
def from_items(items: List[Any], *, parallelism: int = 200) -> Dataset[Any]:
|
||||
def from_items(items: List[Any], *, parallelism: int = -1) -> Dataset[Any]:
|
||||
"""Create a dataset from a list of local Python objects.
|
||||
|
||||
Examples:
|
||||
|
@ -75,7 +77,14 @@ def from_items(items: List[Any], *, parallelism: int = 200) -> Dataset[Any]:
|
|||
Returns:
|
||||
Dataset holding the items.
|
||||
"""
|
||||
block_size = max(1, len(items) // parallelism)
|
||||
|
||||
detected_parallelism, _ = _autodetect_parallelism(
|
||||
parallelism, ray.util.get_current_placement_group()
|
||||
)
|
||||
block_size = max(
|
||||
1,
|
||||
len(items) // detected_parallelism,
|
||||
)
|
||||
|
||||
blocks: List[ObjectRef[Block]] = []
|
||||
metadata: List[BlockMetadata] = []
|
||||
|
@ -105,7 +114,7 @@ def from_items(items: List[Any], *, parallelism: int = 200) -> Dataset[Any]:
|
|||
|
||||
|
||||
@PublicAPI
|
||||
def range(n: int, *, parallelism: int = 200) -> Dataset[int]:
|
||||
def range(n: int, *, parallelism: int = -1) -> Dataset[int]:
|
||||
"""Create a dataset from a range of integers [0..n).
|
||||
|
||||
Examples:
|
||||
|
@ -130,7 +139,7 @@ def range(n: int, *, parallelism: int = 200) -> Dataset[int]:
|
|||
|
||||
|
||||
@PublicAPI
|
||||
def range_table(n: int, *, parallelism: int = 200) -> Dataset[ArrowRow]:
|
||||
def range_table(n: int, *, parallelism: int = -1) -> Dataset[ArrowRow]:
|
||||
"""Create a tabular dataset from a range of integers [0..n).
|
||||
|
||||
Examples:
|
||||
|
@ -164,7 +173,7 @@ def range_arrow(*args, **kwargs):
|
|||
|
||||
@PublicAPI
|
||||
def range_tensor(
|
||||
n: int, *, shape: Tuple = (1,), parallelism: int = 200
|
||||
n: int, *, shape: Tuple = (1,), parallelism: int = -1
|
||||
) -> Dataset[ArrowRow]:
|
||||
"""Create a Tensor dataset from a range of integers [0..n).
|
||||
|
||||
|
@ -208,7 +217,7 @@ def range_tensor(
|
|||
def read_datasource(
|
||||
datasource: Datasource[T],
|
||||
*,
|
||||
parallelism: int = 200,
|
||||
parallelism: int = -1,
|
||||
ray_remote_args: Dict[str, Any] = None,
|
||||
**read_args,
|
||||
) -> Dataset[T]:
|
||||
|
@ -217,7 +226,9 @@ def read_datasource(
|
|||
Args:
|
||||
datasource: The datasource to read data from.
|
||||
parallelism: The requested parallelism of the read. Parallelism may be
|
||||
limited by the available partitioning of the datasource.
|
||||
limited by the available partitioning of the datasource. If set to -1,
|
||||
parallelism will be automatically chosen based on the available cluster
|
||||
resources and estimated in-memory data size.
|
||||
read_args: Additional kwargs to pass to the datasource impl.
|
||||
ray_remote_args: kwargs passed to ray.remote in the read tasks.
|
||||
|
||||
|
@ -227,6 +238,7 @@ def read_datasource(
|
|||
ctx = DatasetContext.get_current()
|
||||
# TODO(ekl) remove this feature flag.
|
||||
force_local = "RAY_DATASET_FORCE_LOCAL_METADATA" in os.environ
|
||||
cur_pg = ray.util.get_current_placement_group()
|
||||
pa_ds = _lazy_import_pyarrow_dataset()
|
||||
if pa_ds:
|
||||
partitioning = read_args.get("dataset_kwargs", {}).get("partitioning", None)
|
||||
|
@ -238,23 +250,37 @@ def read_datasource(
|
|||
force_local = True
|
||||
|
||||
if force_local:
|
||||
read_tasks = datasource.prepare_read(parallelism, **read_args)
|
||||
requested_parallelism, min_safe_parallelism, read_tasks = _get_read_tasks(
|
||||
datasource, ctx, cur_pg, parallelism, read_args
|
||||
)
|
||||
else:
|
||||
# Prepare read in a remote task so that in Ray client mode, we aren't
|
||||
# attempting metadata resolution from the client machine.
|
||||
prepare_read = cached_remote_fn(
|
||||
_prepare_read, retry_exceptions=False, num_cpus=0
|
||||
get_read_tasks = cached_remote_fn(
|
||||
_get_read_tasks, retry_exceptions=False, num_cpus=0
|
||||
)
|
||||
read_tasks = ray.get(
|
||||
prepare_read.remote(
|
||||
|
||||
requested_parallelism, min_safe_parallelism, read_tasks = ray.get(
|
||||
get_read_tasks.remote(
|
||||
datasource,
|
||||
ctx,
|
||||
cur_pg,
|
||||
parallelism,
|
||||
_wrap_and_register_arrow_serialization_workaround(read_args),
|
||||
)
|
||||
)
|
||||
|
||||
if len(read_tasks) < parallelism and (
|
||||
if read_tasks and len(read_tasks) < min_safe_parallelism * 0.7:
|
||||
perc = 1 + round((min_safe_parallelism - len(read_tasks)) / len(read_tasks), 1)
|
||||
logger.warning(
|
||||
f"The blocks of this dataset are estimated to be {perc}x larger than the "
|
||||
"target block size "
|
||||
f"of {int(ctx.target_max_block_size / 1024 / 1024)} MiB. This may lead to "
|
||||
"out-of-memory errors during processing. Consider reducing the size of "
|
||||
"input files or using `.repartition(n)` to increase the number of "
|
||||
"dataset blocks."
|
||||
)
|
||||
elif len(read_tasks) < requested_parallelism and (
|
||||
len(read_tasks) < ray.available_resources().get("CPU", 1) // 2
|
||||
):
|
||||
logger.warning(
|
||||
|
@ -289,7 +315,7 @@ def read_parquet(
|
|||
*,
|
||||
filesystem: Optional["pyarrow.fs.FileSystem"] = None,
|
||||
columns: Optional[List[str]] = None,
|
||||
parallelism: int = 200,
|
||||
parallelism: int = -1,
|
||||
ray_remote_args: Dict[str, Any] = None,
|
||||
tensor_column_schema: Optional[Dict[str, Tuple[np.dtype, Tuple[int, ...]]]] = None,
|
||||
meta_provider: ParquetMetadataProvider = DefaultParquetMetadataProvider(),
|
||||
|
@ -348,7 +374,7 @@ def read_parquet_bulk(
|
|||
*,
|
||||
filesystem: Optional["pyarrow.fs.FileSystem"] = None,
|
||||
columns: Optional[List[str]] = None,
|
||||
parallelism: int = 200,
|
||||
parallelism: int = -1,
|
||||
ray_remote_args: Dict[str, Any] = None,
|
||||
arrow_open_file_args: Optional[Dict[str, Any]] = None,
|
||||
tensor_column_schema: Optional[Dict[str, Tuple[np.dtype, Tuple[int, ...]]]] = None,
|
||||
|
@ -443,7 +469,7 @@ def read_json(
|
|||
paths: Union[str, List[str]],
|
||||
*,
|
||||
filesystem: Optional["pyarrow.fs.FileSystem"] = None,
|
||||
parallelism: int = 200,
|
||||
parallelism: int = -1,
|
||||
ray_remote_args: Dict[str, Any] = None,
|
||||
arrow_open_stream_args: Optional[Dict[str, Any]] = None,
|
||||
meta_provider: BaseFileMetadataProvider = DefaultFileMetadataProvider(),
|
||||
|
@ -504,7 +530,7 @@ def read_csv(
|
|||
paths: Union[str, List[str]],
|
||||
*,
|
||||
filesystem: Optional["pyarrow.fs.FileSystem"] = None,
|
||||
parallelism: int = 200,
|
||||
parallelism: int = -1,
|
||||
ray_remote_args: Dict[str, Any] = None,
|
||||
arrow_open_stream_args: Optional[Dict[str, Any]] = None,
|
||||
meta_provider: BaseFileMetadataProvider = DefaultFileMetadataProvider(),
|
||||
|
@ -568,7 +594,7 @@ def read_text(
|
|||
errors: str = "ignore",
|
||||
drop_empty_lines: bool = True,
|
||||
filesystem: Optional["pyarrow.fs.FileSystem"] = None,
|
||||
parallelism: int = 200,
|
||||
parallelism: int = -1,
|
||||
ray_remote_args: Optional[Dict[str, Any]] = None,
|
||||
arrow_open_stream_args: Optional[Dict[str, Any]] = None,
|
||||
meta_provider: BaseFileMetadataProvider = DefaultFileMetadataProvider(),
|
||||
|
@ -630,7 +656,7 @@ def read_numpy(
|
|||
paths: Union[str, List[str]],
|
||||
*,
|
||||
filesystem: Optional["pyarrow.fs.FileSystem"] = None,
|
||||
parallelism: int = 200,
|
||||
parallelism: int = -1,
|
||||
arrow_open_stream_args: Optional[Dict[str, Any]] = None,
|
||||
meta_provider: BaseFileMetadataProvider = DefaultFileMetadataProvider(),
|
||||
partition_filter: Optional[
|
||||
|
@ -688,7 +714,7 @@ def read_binary_files(
|
|||
*,
|
||||
include_paths: bool = False,
|
||||
filesystem: Optional["pyarrow.fs.FileSystem"] = None,
|
||||
parallelism: int = 200,
|
||||
parallelism: int = -1,
|
||||
ray_remote_args: Dict[str, Any] = None,
|
||||
arrow_open_stream_args: Optional[Dict[str, Any]] = None,
|
||||
meta_provider: BaseFileMetadataProvider = DefaultFileMetadataProvider(),
|
||||
|
@ -1072,12 +1098,92 @@ def _get_metadata(table: Union["pyarrow.Table", "pandas.DataFrame"]) -> BlockMet
|
|||
)
|
||||
|
||||
|
||||
def _prepare_read(
|
||||
ds: Datasource, ctx: DatasetContext, parallelism: int, kwargs: dict
|
||||
) -> List[ReadTask]:
|
||||
def _get_read_tasks(
|
||||
ds: Datasource,
|
||||
ctx: DatasetContext,
|
||||
cur_pg: Optional[PlacementGroup],
|
||||
parallelism: int,
|
||||
kwargs: dict,
|
||||
) -> (int, int, List[ReadTask]):
|
||||
"""Generates read tasks.
|
||||
|
||||
Args:
|
||||
ds: Datasource to read from.
|
||||
ctx: Dataset config to use.
|
||||
cur_pg: The current placement group, if any.
|
||||
parallelism: The user-requested parallelism, or -1 for autodetection.
|
||||
kwargs: Additional kwargs to pass to the reader.
|
||||
|
||||
Returns:
|
||||
Requested parallelism from the datasource, the min safe parallelism to avoid
|
||||
OOM, and the list of read tasks generated.
|
||||
"""
|
||||
kwargs = _unwrap_arrow_serialization_workaround(kwargs)
|
||||
DatasetContext._set_current(ctx)
|
||||
return ds.prepare_read(parallelism, **kwargs)
|
||||
reader = ds.create_reader(**kwargs)
|
||||
requested_parallelism, min_safe_parallelism = _autodetect_parallelism(
|
||||
parallelism, cur_pg, reader
|
||||
)
|
||||
return (
|
||||
requested_parallelism,
|
||||
min_safe_parallelism,
|
||||
reader.get_read_tasks(requested_parallelism),
|
||||
)
|
||||
|
||||
|
||||
def _autodetect_parallelism(
|
||||
parallelism: int, cur_pg: Optional[PlacementGroup], reader: Optional[Reader] = None
|
||||
) -> (int, int):
|
||||
"""Returns parallelism to use and the min safe parallelism to avoid OOMs."""
|
||||
# Autodetect parallelism requested. The heuristics here are that we should try
|
||||
# to create as many blocks as needed to saturate available resources, and also keep
|
||||
# block sizes below the target memory size, but no more. Creating too many
|
||||
# blocks is inefficient.
|
||||
min_safe_parallelism = 1
|
||||
ctx = DatasetContext.get_current()
|
||||
if reader:
|
||||
mem_size = reader.estimate_inmemory_data_size()
|
||||
if mem_size is not None:
|
||||
min_safe_parallelism = max(1, int(mem_size / ctx.target_max_block_size))
|
||||
else:
|
||||
mem_size = None
|
||||
if parallelism < 0:
|
||||
if parallelism != -1:
|
||||
raise ValueError("`parallelism` must either be -1 or a positive integer.")
|
||||
# Start with 2x the number of cores as a baseline, with a min floor.
|
||||
avail_cpus = _estimate_avail_cpus(cur_pg)
|
||||
parallelism = max(ctx.min_parallelism, min_safe_parallelism, avail_cpus * 2)
|
||||
logger.debug(
|
||||
f"Autodetected parallelism={parallelism} based on "
|
||||
f"estimated_available_cpus={avail_cpus} and "
|
||||
f"estimated_data_size={mem_size}."
|
||||
)
|
||||
return parallelism, min_safe_parallelism
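# Worked example (assumed numbers, not from this diff): with 16 estimated CPUs,
# min_parallelism=8, and an estimated in-memory size of 20 GiB with a 512 MiB
# target block size:
#   min_safe_parallelism = 20 GiB / 512 MiB = 40
#   parallelism = max(8, 40, 16 * 2) = 40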
|
||||
|
||||
|
||||
def _estimate_avail_cpus(cur_pg: Optional[PlacementGroup]) -> int:
|
||||
cluster_cpus = int(ray.cluster_resources().get("CPU", 1))
|
||||
cluster_gpus = int(ray.cluster_resources().get("GPU", 0))
|
||||
|
||||
# If we're in a placement group, we shouldn't assume the entire cluster's
|
||||
# resources are available for us to use. Estimate an upper bound on what's
|
||||
# reasonable to assume is available for datasets to use.
|
||||
if cur_pg:
|
||||
pg_cpus = 0
|
||||
for bundle in cur_pg.bundle_specs:
|
||||
# Calculate the proportion of the cluster this placement group "takes up".
|
||||
# Then scale our cluster_cpus proportionally to avoid over-parallelizing
|
||||
# if there are many parallel Tune trials using the cluster.
|
||||
cpu_fraction = bundle.get("CPU", 0) / max(1, cluster_cpus)
|
||||
gpu_fraction = bundle.get("GPU", 0) / max(1, cluster_gpus)
|
||||
max_fraction = max(cpu_fraction, gpu_fraction)
|
||||
# Over-parallelize by up to a factor of 2, but no more than that. It's
|
||||
# preferable to over-estimate than under-estimate.
|
||||
pg_cpus += 2 * int(max_fraction * cluster_cpus)
|
||||
|
||||
return min(cluster_cpus, pg_cpus)
|
||||
|
||||
return cluster_cpus
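# Worked example (assumed numbers, matching the new tests): with 16 cluster CPUs
# and a placement group of one {"CPU": 2} bundle, cpu_fraction = 2/16, so
# pg_cpus = 2 * int(2/16 * 16) = 4 and the estimate is min(16, 4) = 4 CPUs,
# which the caller then doubles to a parallelism of 8.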
|
||||
|
||||
|
||||
def _resolve_parquet_args(
|
||||
|
|
python/ray/data/tests/test_auto_parallelism.py (new file, 53 lines)
|
@ -0,0 +1,53 @@
|
|||
import pytest
|
||||
|
||||
import ray
|
||||
from ray.data.context import DatasetContext
|
||||
from ray.tests.conftest import * # noqa
|
||||
|
||||
|
||||
def test_auto_parallelism_basic(shutdown_only):
|
||||
ray.init(num_cpus=8)
|
||||
context = DatasetContext.get_current()
|
||||
context.min_parallelism = 1
|
||||
# Datasource bound.
|
||||
ds = ray.data.range_tensor(5, shape=(100,), parallelism=-1)
|
||||
assert ds.num_blocks() == 5, ds
|
||||
# CPU bound. TODO(ekl) we should fix range datasource to respect parallelism more
|
||||
# properly, currently it can go a little over.
|
||||
ds = ray.data.range_tensor(10000, shape=(100,), parallelism=-1)
|
||||
assert ds.num_blocks() == 16, ds
|
||||
# Block size bound.
|
||||
ds = ray.data.range_tensor(100000000, shape=(100,), parallelism=-1)
|
||||
assert ds.num_blocks() == 150, ds
|
||||
|
||||
|
||||
def test_auto_parallelism_placement_group(shutdown_only):
|
||||
ray.init(num_cpus=16, num_gpus=8)
|
||||
|
||||
@ray.remote
|
||||
def run():
|
||||
context = DatasetContext.get_current()
|
||||
context.min_parallelism = 1
|
||||
ds = ray.data.range_tensor(10000, shape=(100,), parallelism=-1)
|
||||
return ds.num_blocks()
|
||||
|
||||
# 1/16 * 4 * 16 = 4
|
||||
pg = ray.util.placement_group([{"CPU": 1}])
|
||||
num_blocks = ray.get(run.options(placement_group=pg).remote())
|
||||
assert num_blocks == 4, num_blocks
|
||||
|
||||
# 2/16 * 4 * 16 = 8
|
||||
pg = ray.util.placement_group([{"CPU": 2}])
|
||||
num_blocks = ray.get(run.options(placement_group=pg).remote())
|
||||
assert num_blocks == 8, num_blocks
|
||||
|
||||
# 1/8 * 4 * 16 = 8
|
||||
pg = ray.util.placement_group([{"CPU": 1, "GPU": 1}])
|
||||
num_blocks = ray.get(run.options(placement_group=pg).remote())
|
||||
assert num_blocks == 8, num_blocks
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
import sys
|
||||
|
||||
sys.exit(pytest.main(["-v", __file__]))
|
|
@ -377,7 +377,7 @@ def test_zip_arrow(ray_start_regular_shared):
|
|||
def test_batch_tensors(ray_start_regular_shared):
|
||||
import torch
|
||||
|
||||
ds = ray.data.from_items([torch.tensor([0, 0]) for _ in range(40)])
|
||||
ds = ray.data.from_items([torch.tensor([0, 0]) for _ in range(40)], parallelism=40)
|
||||
res = "Dataset(num_blocks=40, num_rows=40, schema=<class 'torch.Tensor'>)"
|
||||
assert str(ds) == res, str(ds)
|
||||
with pytest.raises(pa.lib.ArrowInvalid):
|
||||
|
|
|
@ -36,6 +36,7 @@ from ray.data.datasource import (
|
|||
from ray.data.datasource.file_based_datasource import _unwrap_protocol
|
||||
from ray.data.datasource.parquet_datasource import (
|
||||
PARALLELIZE_META_FETCH_THRESHOLD,
|
||||
_SerializedPiece,
|
||||
_deserialize_pieces_with_retry,
|
||||
)
|
||||
from ray.data.tests.conftest import * # noqa
|
||||
|
@ -360,7 +361,6 @@ def test_read_pandas_data_array_column(ray_start_regular_shared):
|
|||
def test_parquet_deserialize_pieces_with_retry(
|
||||
ray_start_regular_shared, fs, data_path, monkeypatch
|
||||
):
|
||||
from ray import cloudpickle
|
||||
|
||||
setup_data_path = _unwrap_protocol(data_path)
|
||||
df1 = pd.DataFrame({"one": [1, 2, 3], "two": ["a", "b", "c"]})
|
||||
|
@ -376,7 +376,7 @@ def test_parquet_deserialize_pieces_with_retry(
|
|||
pq_ds = pq.ParquetDataset(
|
||||
data_path, **dataset_kwargs, filesystem=fs, use_legacy_dataset=False
|
||||
)
|
||||
serialized_pieces = cloudpickle.dumps(pq_ds.pieces)
|
||||
serialized_pieces = [_SerializedPiece(p) for p in pq_ds.pieces]
|
||||
|
||||
# test 1st attempt succeed
|
||||
pieces = _deserialize_pieces_with_retry(serialized_pieces)
|
||||
|
|
|
@ -161,7 +161,7 @@ def test_split_read_parquet(ray_start_regular_shared, tmp_path):
|
|||
ray.data.range(200000, parallelism=1).map(
|
||||
lambda _: uuid.uuid4().hex
|
||||
).write_parquet(path)
|
||||
return ray.data.read_parquet(path)
|
||||
return ray.data.read_parquet(path, parallelism=200)
|
||||
|
||||
# 20MiB
|
||||
ctx.target_max_block_size = 20_000_000
|
||||
|
|