mirror of
https://github.com/vale981/ray
synced 2025-03-06 02:21:39 -05:00
Raise the dataset block size limit to 2GiB (#20551)
The default block size of 500MiB seems too low for some common workloads, e.g. shuffling 500GB. This creates 1000 blocks which means 1 million intermediate shuffle objects until we implement #20500.
This commit is contained in:
parent
2d50bf1302
commit
65a8698e82
2 changed files with 6 additions and 4 deletions
|
@ -15,7 +15,7 @@ Datasets uses Ray tasks to read data from remote storage. When reading from a fi
|
|||
..
|
||||
https://docs.google.com/drawings/d/15B4TB8b5xN15Q9S8-s0MjW6iIvo_PrH7JtV1fL123pU/edit
|
||||
|
||||
In the common case, each read task produces a single output block. Read tasks may split the output into multiple blocks if the data exceeds the target max block size (500MiB by default). This automatic block splitting avoids out-of-memory errors when reading very large single files (e.g., a 100-gigabyte CSV file). All of the built-in datasources except for JSON currently support automatic block splitting.
|
||||
In the common case, each read task produces a single output block. Read tasks may split the output into multiple blocks if the data exceeds the target max block size (2GiB by default). This automatic block splitting avoids out-of-memory errors when reading very large single files (e.g., a 100-gigabyte CSV file). All of the built-in datasources except for JSON currently support automatic block splitting.
|
||||
|
||||
Deferred Read Task Execution
|
||||
~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||
|
@ -26,7 +26,7 @@ When a Dataset is created using ``ray.data.read_*``, only the first read task wi
|
|||
Dataset Transforms
|
||||
------------------
|
||||
|
||||
Datasets use either Ray tasks or Ray actors to transform datasets (i.e., for ``.map``, ``.flat_map``, or ``.map_batches``). By default, tasks are used (``compute="tasks"``). Actors can be specified with ``compute="actors"``, in which case an autoscaling pool of Ray actors will be used to apply transformations. Using actors allows for expensive state initialization (e.g., for GPU-based tasks) to be re-used. Whichever compute strategy is used, each map task generally takes in one block and produces one or more output blocks. The output block splitting rule is the same as for file reads (blocks are split after hitting the target max block size of 500MiB):
|
||||
Datasets use either Ray tasks or Ray actors to transform datasets (i.e., for ``.map``, ``.flat_map``, or ``.map_batches``). By default, tasks are used (``compute="tasks"``). Actors can be specified with ``compute="actors"``, in which case an autoscaling pool of Ray actors will be used to apply transformations. Using actors allows for expensive state initialization (e.g., for GPU-based tasks) to be re-used. Whichever compute strategy is used, each map task generally takes in one block and produces one or more output blocks. The output block splitting rule is the same as for file reads (blocks are split after hitting the target max block size of 2GiB):
|
||||
|
||||
.. image:: dataset-map.svg
|
||||
:width: 650px
|
||||
|
@ -105,6 +105,8 @@ The number of read tasks can also be increased by increasing the ``parallelism``
|
|||
Tuning Max Block Size
|
||||
~~~~~~~~~~~~~~~~~~~~~
|
||||
|
||||
The max target block size can be adjusted via the Dataset context API. For example, to configure a max target block size of 64MB, run ``ray.data.context.DatasetContext.get_current().target_max_block_size = 64_000_000`` prior to creating the Dataset. Lower block sizes reduce the max amount of object store and Python heap memory required during execution. However, having too many blocks may introduce task scheduling overheads. We do not recommend adjusting this value for most workloads.
|
||||
The max target block size can be adjusted via the Dataset context API. For example, to configure a max target block size of 8GiB, run ``ray.data.context.DatasetContext.get_current().target_max_block_size = 8192 * 1024 * 1024`` prior to creating the Dataset. Lower block sizes reduce the max amount of object store and Python heap memory required during execution. However, having too many blocks may introduce task scheduling overheads.
|
||||
|
||||
We do not recommend adjusting this value for most workloads. However, if shuffling a large amount of data, increasing the block size limit reduces the number of intermediate blocks (as a rule of thumb, shuffle creates ``O(num_blocks**2)`` intermediate blocks). Alternatively, you can ``.repartition()`` the dataset to reduce the number of blocks prior to shuffle/groupby operations. If you're seeing out of memory errors during map tasks, reducing the max block size may also be worth trying.
|
||||
|
||||
Note that the number of blocks a Dataset created from ``ray.data.read_*`` contains is not fully known until all read tasks are fully executed. The number of blocks printed in the Dataset's string representation is initially set to the number of read tasks generated. To view the actual number of blocks created after block splitting, use ``len(ds.get_internal_block_refs())``, which will block until all data has been read.
|
||||
|
|
|
@ -9,7 +9,7 @@ _default_context: "Optional[DatasetContext]" = None
|
|||
_context_lock = threading.Lock()
|
||||
|
||||
# The max target block size in bytes for reads and transformations.
|
||||
DEFAULT_TARGET_MAX_BLOCK_SIZE = 500 * 1024 * 1024
|
||||
DEFAULT_TARGET_MAX_BLOCK_SIZE = 2048 * 1024 * 1024
|
||||
|
||||
|
||||
@DeveloperAPI
|
||||
|
|
Loading…
Add table
Reference in a new issue