[docs] Cleanup the Datasets key concept docs (#26908)

Clean up the Datasets key concept doc to be suitable for consumption by a beginner level user and improving the diagrams.
This commit is contained in:
Eric Liang 2022-07-22 23:30:54 -07:00 committed by GitHub
parent 042450d319
commit 63a6c1dfac
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 88 additions and 145 deletions

View file

@ -83,7 +83,7 @@ Advanced users can refer directly to the Ray Datasets :ref:`API reference <data_
Understand the key concepts behind Ray Datasets.
Learn what :ref:`Datasets<dataset_concept>` and :ref:`Dataset Pipelines<dataset_pipeline_concept>` are
and :ref:`how they get executed<dataset_execution_concept>` in Ray Datasets.
and how they get executed in Ray Datasets.
+++
.. link-button:: data_key_concepts

View file

@ -12,11 +12,10 @@ def objective(*args):
# Create a cluster with 4 CPU slots available.
ray.init(num_cpus=4)
# This runs, since Tune schedules one trial on 1 CPU, leaving 3 spare CPUs in the
# cluster for Dataset execution. However, deadlock can occur if you set num_samples=4,
# which would leave no extra CPUs for Datasets! To resolve these issues, see the
# "Inside Trial Placement Group" example tab.
tune.run(objective, num_samples=1, resources_per_trial={"cpu": 1})
# By setting `max_concurrent_trials=3`, this ensures the cluster will always
# have a sparse CPU for Datasets. Try setting `max_concurrent_trials=4` here,
# and notice that the experiment will appear to hang.
tune.run(objective, num_samples=4, max_concurrent_trials=3)
# __resource_allocation_1_end__
# fmt: on

File diff suppressed because one or more lines are too long

Before

Width:  |  Height:  |  Size: 62 KiB

After

Width:  |  Height:  |  Size: 155 KiB

File diff suppressed because one or more lines are too long

Before

Width:  |  Height:  |  Size: 84 KiB

After

Width:  |  Height:  |  Size: 139 KiB

File diff suppressed because one or more lines are too long

Before

Width:  |  Height:  |  Size: 102 KiB

After

Width:  |  Height:  |  Size: 102 KiB

View file

@ -4,23 +4,16 @@
Key Concepts
============
To work with Ray Datasets, you need to understand how Datasets and Dataset Pipelines work.
You might also be interested to learn about the execution model of Ray Datasets operations.
.. _dataset_concept:
--------
Datasets
--------
Ray Datasets implements `Distributed Arrow <https://arrow.apache.org/>`__.
A Dataset consists of a list of Ray object references to *blocks*.
Each block holds a set of items in either an `Arrow table <https://arrow.apache.org/docs/python/data.html#tables>`__
(when creating from or transforming to tabular or tensor data), a `Pandas DataFrame <https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html>`__
(when creating from or transforming to Pandas data), or a Python list (otherwise).
Having multiple blocks in a dataset allows for parallel transformation and ingest of the data
(e.g., into :ref:`Ray Train <train-docs>` for ML training).
Each block holds a set of items in either `Arrow table format <https://arrow.apache.org/docs/python/data.html#tables>`__
or a Python list (for non-tabular data).
Having multiple blocks in a dataset allows for parallel transformation and ingest of the data.
The following figure visualizes a Dataset that has three Arrow table blocks, each block holding 1000 rows each:
@ -33,11 +26,57 @@ Since a Dataset is just a list of Ray object references, it can be freely passed
actors, and libraries like any other object reference.
This flexibility is a unique characteristic of Ray Datasets.
Compared to `Spark RDDs <https://spark.apache.org/docs/latest/rdd-programming-guide.html>`__
and `Dask Bags <https://docs.dask.org/en/latest/bag.html>`__, Ray Datasets offers a more basic set of features,
and executes operations eagerly for simplicity.
It is intended that users cast Datasets into more feature-rich dataframe types (e.g.,
:meth:`ds.to_dask() <ray.data.Dataset.to_dask>`) for advanced operations.
Reading Data
============
Datasets uses Ray tasks to read data from remote storage. When reading from a file-based datasource (e.g., S3, GCS), it creates a number of read tasks proportional to the number of CPUs in the cluster. Each read task reads its assigned files and produces an output block:
.. image:: images/dataset-read.svg
:align: center
..
https://docs.google.com/drawings/d/15B4TB8b5xN15Q9S8-s0MjW6iIvo_PrH7JtV1fL123pU/edit
The parallelism can also be manually specified, but the final parallelism for a read is always capped by the number of files in the underlying dataset. See the :ref:`Creating Datasets Guide <creating_datasets>` for an in-depth guide
on creating datasets.
Dataset Transforms
==================
Datasets can use either Ray tasks or Ray actors to transform datasets. By default, tasks are used. Actors can be specified using ``compute=ActorPoolStrategy()``, which creates an autoscaling pool of Ray actors to process transformations. Using actors allows for expensive state initialization (e.g., for GPU-based tasks) to be cached:
.. image:: images/dataset-map.svg
:align: center
..
https://docs.google.com/drawings/d/12STHGV0meGWfdWyBlJMUgw7a-JcFPu9BwSOn5BjRw9k/edit
See the :ref:`Transforming Datasets Guide <transforming_datasets>` for an in-depth guide
on transforming datasets.
Shuffling Data
==============
Certain operations like *sort* or *groupby* require data blocks to be partitioned by value, or *shuffled*. Datasets uses tasks to implement distributed shuffles in a map-reduce style, using map tasks to partition blocks by value, and then reduce tasks to merge co-partitioned blocks together.
You can also change just the number of blocks of a Dataset using :meth:`ds.repartition() <ray.data.Dataset.repartition>`. Repartition has two modes, ``shuffle=False``, which performs the minimal data movement needed to equalize block sizes, and ``shuffle=True``, which performs a full distributed shuffle:
.. image:: images/dataset-shuffle.svg
:align: center
..
https://docs.google.com/drawings/d/132jhE3KXZsf29ho1yUdPrCHB9uheHBWHJhDQMXqIVPA/edit
Datasets shuffle can scale to processing hundreds of terabytes of data. See the :ref:`Performance Tips Guide <shuffle_performance_tips>` for an in-depth guide on shuffle performance.
Fault tolerance
===============
Datasets relies on :ref:`task-based fault tolerance <task-fault-tolerance>` in Ray core. Specifically, a Dataset will be automatically recovered by Ray in case of failures. This works through *lineage reconstruction*: a Dataset is a collection of Ray objects stored in shared memory, and if any of these objects are lost, then Ray will recreate them by re-executing the tasks that created them.
There are a few cases that are not currently supported:
* If the original worker process that created the Dataset dies. This is because the creator stores the metadata for the :ref:`objects <object-fault-tolerance>` that comprise the Dataset.
* When ``compute=ActorPoolStrategy()`` is specified for transformations.
.. _dataset_pipeline_concept:
@ -45,141 +84,35 @@ It is intended that users cast Datasets into more feature-rich dataframe types (
Dataset Pipelines
-----------------
Dataset pipelines allow Dataset transformations to be executed incrementally on *windows* of the base data, instead of on all of the data at once. This can be used for streaming data loading into ML training, or to execute batch transformations on large datasets without needing to load the entire dataset into cluster memory.
Datasets execute their transformations synchronously in blocking calls. However, it can be useful to overlap dataset computations with output. This can be done with a `DatasetPipeline <package-ref.html#datasetpipeline-api>`__.
See the :ref:`Dataset Pipelines Guide <pipelining_datasets>` for an in-depth guide on pipelining compute.
A DatasetPipeline is an unified iterator over a (potentially infinite) sequence of Ray Datasets, each of which represents a *window* over the original data. Conceptually it is similar to a `Spark DStream <https://spark.apache.org/docs/latest/streaming-programming-guide.html#discretized-streams-dstreams>`__, but manages execution over a bounded amount of source data instead of an unbounded stream. Ray computes each dataset window on-demand and stitches their output together into a single logical data iterator. DatasetPipeline implements most of the same transformation and output methods as Datasets (e.g., map, filter, split, iter_rows, to_torch, etc.).
----------------------------
Datasets and Other Libraries
----------------------------
.. _dataset_execution_concept:
------------------------
Datasets Execution Model
------------------------
This page overviews the execution model of Datasets, which may be useful for understanding and tuning performance.
Reading Data
============
Datasets uses Ray tasks to read data from remote storage. When reading from a file-based datasource (e.g., S3, GCS), it creates a number of read tasks equal to the specified read parallelism (autodetected by default). One or more files will be assigned to each read task. Each read task reads its assigned files and produces one or more output blocks (Ray objects):
.. image:: images/dataset-read.svg
:width: 650px
:align: center
..
https://docs.google.com/drawings/d/15B4TB8b5xN15Q9S8-s0MjW6iIvo_PrH7JtV1fL123pU/edit
In the common case, each read task produces a single output block. Read tasks may split the output into multiple blocks if the data exceeds the target max block size (2GiB by default). This automatic block splitting avoids out-of-memory errors when reading very large single files (e.g., a 100-gigabyte CSV file). All of the built-in datasources except for JSON currently support automatic block splitting.
.. note::
Block splitting is off by default. See the :ref:`performance section <data_performance_tips>` on how to enable block splitting (beta).
.. _dataset_defeferred_reading:
Deferred Read Task Execution
~~~~~~~~~~~~~~~~~~~~~~~~~~~~
When a Dataset is created using ``ray.data.read_*``, only the first read task will be
executed initially. This avoids blocking Dataset creation on the reading of all data
files, enabling inspection functions like :meth:`ds.schema() <ray.data.Dataset.schema>``
and :meth:`ds.show() <ray.data.Dataset.show>` to be used right away. Executing further
transformations on the Dataset will trigger execution of all read tasks.
See the :ref:`Creating Datasets guide <creating_datasets>` for details on how to read
data into datasets.
Dataset Transforms
==================
Datasets use either Ray tasks or Ray actors to transform datasets (i.e., for
:meth:`ds.map_batches() <ray.data.Dataset.map_batches>`,
:meth:`ds.map() <ray.data.Dataset.map>`, or
:meth:`ds.flat_map() <ray.data.Dataset.flat_map>`). By default, tasks are used (``compute="tasks"``). Actors can be specified with ``compute="actors"``, in which case an autoscaling pool of Ray actors will be used to apply transformations. Using actors allows for expensive state initialization (e.g., for GPU-based tasks) to be re-used. Whichever compute strategy is used, each map task generally takes in one block and produces one or more output blocks. The output block splitting rule is the same as for file reads (blocks are split after hitting the target max block size of 2GiB):
.. image:: images/dataset-map.svg
:width: 650px
:align: center
..
https://docs.google.com/drawings/d/1MGlGsPyTOgBXswJyLZemqJO1Mf7d-WiEFptIulvcfWE/edit
See the :ref:`Transforming Datasets guide <transforming_datasets>` for an in-depth guide
on transforming datasets.
Shuffling Data
==============
Certain operations like :meth:`ds.sort() <ray.data.Dataset.sort>` and
:meth:`ds.groupby() <ray.data.Dataset.groupby>` require data blocks to be partitioned by value. Datasets executes this in three phases. First, a wave of sampling tasks determines suitable partition boundaries based on a random sample of data. Second, map tasks divide each input block into a number of output blocks equal to the number of reduce tasks. Third, reduce tasks take assigned output blocks from each map task and combines them into one block. Overall, this strategy generates ``O(n^2)`` intermediate objects where ``n`` is the number of input blocks.
You can also change the partitioning of a Dataset using :meth:`ds.random_shuffle()
<ray.data.Dataset.random_shuffle>` or
:meth:`ds.repartition() <ray.data.Dataset.repartition>`. The former should be used if you want to randomize the order of elements in the dataset. The second should be used if you only want to equalize the size of the Dataset blocks (e.g., after a read or transformation that may skew the distribution of block sizes). Note that repartition has two modes, ``shuffle=False``, which performs the minimal data movement needed to equalize block sizes, and ``shuffle=True``, which performs a full (non-random) distributed shuffle:
.. image:: images/dataset-shuffle.svg
:width: 650px
:align: center
..
https://docs.google.com/drawings/d/132jhE3KXZsf29ho1yUdPrCHB9uheHBWHJhDQMXqIVPA/edit
Fault tolerance
===============
Datasets relies on :ref:`task-based fault tolerance <task-fault-tolerance>` in Ray core. Specifically, a ``Dataset`` will be automatically recovered by Ray in case of failures. This works through **lineage reconstruction**: a Dataset is a collection of Ray objects stored in shared memory, and if any of these objects are lost, then Ray will recreate them by re-executing the task(s) that created them.
There are a few cases that are not currently supported:
1. If the original creator of the ``Dataset`` dies. This is because the creator stores the metadata for the :ref:`objects <object-fault-tolerance>` that comprise the ``Dataset``.
2. For a :meth:`DatasetPipeline.split() <ray.data.DatasetPipeline.split>`, we do not support recovery for a consumer failure. When there are multiple consumers, they must all read the split pipeline in lockstep. To recover from this case, the pipeline and all consumers must be restarted together.
3. The ``compute=actors`` option for transformations.
Execution and Memory Management
===============================
See :ref:`Execution and Memory Management <data_advanced>` for more details about how Datasets manages memory and optimizations such as lazy vs eager execution.
-------------------------
Resource Allocation Model
-------------------------
Unlike libraries like Tune and Train, Datasets does not use placement groups to allocate
resources for execution (its tasks and actor pools). Instead, Datasets makes plain
CPU/GPU resource requests to the cluster, *ignoring placement groups by default*. This
can be thought of as Datasets requesting resources from the margins of the cluster,
outside of those ML library placement groups.
To avoid hangs or CPU starvation of Datasets when used with Tune or Train, you can
exclude a fraction of CPUs from placement group scheduling, using the
``_max_cpu_fraction_per_node`` placement group option (Experimental).
Example: Datasets in Tune
=========================
When using Datasets in conjunction with other Ray libraries, it is important to ensure there are enough free CPUs for Datasets to run on. Libraries such as Tune by default try to fully utilize cluster CPUs. This can prevent Datasets from scheduling tasks, reducing performance or causing workloads to hang.
.. _datasets_tune:
Here's an example of how you can configure Datasets to run within Tune trials, which
is the typical case of when you'd encounter placement groups with Datasets. Two
scenarios are shown: running outside the trial group using spare resources, and running with reserved resources.
As an example, the following shows two ways to use Datasets together with the Ray Tune library:
.. tabbed:: Using Spare Cluster Resources
.. tabbed:: Limiting Tune Concurrency
By default, Dataset tasks escape the trial placement group. This means they will use
spare cluster resources for execution, which can be problematic since the availability
of such resources is not guaranteed.
By limiting the number of concurrent Tune trials, we ensure CPU resources are always available for Datasets execution.
This can be done using the ``max_concurrent_trials`` Tune option.
.. literalinclude:: ./doc_code/key_concepts.py
:language: python
:start-after: __resource_allocation_1_begin__
:end-before: __resource_allocation_1_end__
.. tabbed:: Using Reserved CPUs (Experimental)
.. tabbed:: Reserving CPUs (Experimental)
The ``_max_cpu_fraction_per_node`` option can be used to exclude CPUs from placement
group scheduling. In the below example, setting this parameter to ``0.8`` enables Tune
trials to run smoothly without risk of deadlock by reserving 20% of node CPUs for
Dataset execution.
Alternatively, we can tell Tune to set aside CPU resources for other libraries.
This can be done by setting ``_max_cpu_fraction_per_node=0.8``, which reserves
20% of node CPUs for Dataset execution.
.. literalinclude:: ./doc_code/key_concepts.py
:language: python
@ -188,5 +121,5 @@ scenarios are shown: running outside the trial group using spare resources, and
.. warning::
``_max_cpu_fraction_per_node`` is experimental and not currently recommended for use with
This option is experimental and not currently recommended for use with
autoscaling clusters (scale-up will not trigger properly).

View file

@ -136,6 +136,17 @@ Note that the number of blocks a Dataset created from ``ray.data.read_*`` contai
The number of blocks printed in the Dataset's string representation is initially set to the number of read tasks generated.
To view the actual number of blocks created after block splitting, use ``len(ds.get_internal_block_refs())``, which will block until all data has been read.
Datasets and Placement Groups
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
By default, Datasets configures its tasks and actors to use the cluster-default scheduling strategy ("DEFAULT"). You can inspect this configuration variable here:
``ray.data.context.DatasetContext.get_current().scheduling_strategy``. This scheduling strategy will schedule these tasks and actors outside any present
placement group. If you want to force Datasets to schedule tasks within the current placement group (i.e., to use current placement group resources specifically for Datasets), you can set ``ray.data.context.DatasetContext.get_current().scheduling_strategy = None``.
This should be considered for advanced use cases to improve performance predictability only. We generally recommend letting Datasets run outside placement groups as documented in the :ref:`Datasets and Other Libraries <datasets_tune>` section.
.. _shuffle_performance_tips:
Improving shuffle performance
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~