This exposes a low-cost way to perform a pseudo global shuffle.
For extremely large datasets that span multiple nodes, contiguous blocks are often colocated on the same node. This leads to hot spots during iteration over the dataset, in which individual nodes (1) must send a lot of data over the network, and (2) perform many disk reads if the dataset has been spilled to disk.
This allows the workload to be spread across the nodes on which the dataset blocks reside.
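For illustration, a minimal usage sketch, assuming the block-order randomization is exposed as a Dataset method named `randomize_block_order()` (the exact API name here is an assumption):
```
import ray

# Reordering blocks is much cheaper than a full row-level shuffle because no
# rows move between blocks.
ds = ray.data.range(100_000_000)

# Assumed API name; shuffles the block order so that subsequent reads are
# spread across the nodes that hold the blocks.
ds = ds.randomize_block_order()

for batch in ds.iter_batches(batch_size=4096):
    ...  # consume batches in pseudo-shuffled order
```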
Currently, team:ml spans all ML (Tune, Train, AIR) tests as well as the RLlib tests. The RLlib tests are much more flaky, and it would be good to track them separately in the flaky test tracker. This PR changes the RLlib tests from team:ml to team:rllib to enable this separation.
Co-authored-by: Antoni Baum <antoni.baum@protonmail.com>
Unreverts #24812, skipping the memory-releasing tests that are already flaky. We have a separate issue tracking the unskipping of these memory-releasing tests once we find a more reliable way to test them.
* Revert "Revert "Revert "Revert "[Datasets] [Tensor Story - 1/2] Automatically provide tensor views to UDFs and infer tensor blocks for pure-tensor datasets."" (#25031)" (#25057)"
This reverts commit fb2933a78f.
* Skip shuffle memory release test.
**Update**: This PR is now part 3 of a three-PR group to consolidate the checkpoint managers.
1. Part 1 adds the common checkpoint management class #24771
2. Part 2 adds the integration for Ray Train #24772
3. This PR builds on #24772 and includes all of its changes. It moves the Ray Tune integration over to the new common checkpoint manager class.
Old PR description:
This PR consolidates the Ray Train and Tune checkpoint managers. These two managers previously implemented very similar logic in separate modules. To simplify future maintenance, we've consolidated the common core.
- This PR keeps full compatibility with the previous interfaces and implementations. This means that, for now, Train and Tune will have separate CheckpointManagers that both extend the common core (a rough sketch of this structure follows the list below).
- This PR prepares Tune to move to a CheckpointStrategy object.
- In follow-up PRs, we can further unify the interfacing with the common core, possibly removing any Train- or Tune-specific adjustments (e.g., moving setup to init time rather than to runtime for Ray Train).
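For illustration, here is a minimal sketch of the consolidation pattern described above, assuming a shared core class that the Train- and Tune-specific managers extend (all class and method names below are illustrative, not the actual interfaces):
```
from dataclasses import dataclass
from typing import Any, List, Optional


@dataclass
class _TrackedCheckpoint:
    """A checkpoint plus the metric used to rank it."""
    data: Any
    metric: Optional[float] = None


class _CommonCheckpointManager:
    """Shared core: keeps the N best checkpoints by a target metric."""

    def __init__(self, num_to_keep: Optional[int] = None):
        self._num_to_keep = num_to_keep
        self._checkpoints: List[_TrackedCheckpoint] = []

    def register_checkpoint(self, checkpoint: _TrackedCheckpoint) -> None:
        self._checkpoints.append(checkpoint)
        self._checkpoints.sort(key=lambda c: c.metric or 0.0, reverse=True)
        if self._num_to_keep is not None:
            del self._checkpoints[self._num_to_keep:]


class _TrainCheckpointManager(_CommonCheckpointManager):
    """Train-specific behavior (e.g., worker-rank handling) goes here."""


class _TuneCheckpointManager(_CommonCheckpointManager):
    """Tune-specific behavior (e.g., trial-level bookkeeping) goes here."""
```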
Co-authored-by: Antoni Baum <antoni.baum@protonmail.com>
Follow-up on our last discussion about supporting AIR users in a piecemeal fashion.
This only covers TensorFlow for now; I want to collect some feedback on the API naming, package structure, etc., and then I will add the other frameworks.
This adds the following options to DatasetConfig, which can be used to enable streaming ingest.
```
# Whether the dataset should be streamed into memory using pipelined reads.
# When enabled, get_dataset_shard() returns DatasetPipeline instead of Dataset.
# The amount of memory to use is controlled by `stream_window_size`.
# False by default for all datasets.
use_stream_api: Optional[bool] = None
# Configure the streaming window size in bytes. A typical value is something like
# 20% of object store memory. If set to -1, then an infinite window size will be
# used (similar to bulk ingest). This only has an effect if use_stream_api is set.
# Set to 1.0 GiB by default.
stream_window_size: Optional[float] = None
# Whether to enable global shuffle (per pipeline window in streaming mode). Note
# that this is an expensive all-to-all operation, and most likely you want to use
# local shuffle instead.
# False by default for all datasets.
global_shuffle: Optional[bool] = None
```
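For illustration, a hedged sketch of how these options might be used from a training loop, assuming `get_dataset_shard()` behaves as described in the comments above (the import paths are assumptions for this version):
```
from ray import train                     # assumed import path
from ray.air.config import DatasetConfig  # assumed import path

# Enable streaming ingest with a ~2 GiB window and no global shuffle.
# This config would be passed to the trainer via its `dataset_config` argument.
stream_config = DatasetConfig(
    use_stream_api=True,
    stream_window_size=2 * 1024 ** 3,
    global_shuffle=False,
)


def train_loop_per_worker(config):
    # With use_stream_api=True this returns a DatasetPipeline; otherwise a Dataset.
    shard = train.get_dataset_shard("train")
    for batch in shard.iter_batches(batch_size=1024):
        ...  # one training step per batch
```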
This adds a per-dataset config object to DataParallelTrainer. These configs define how each Dataset should be read into the DataParallelTrainer: they configure the preprocessing, splitting, and ingest strategy per dataset. DataParallelTrainers declare default DatasetConfigs for each dataset passed in the ``datasets`` argument. Users can selectively override these configs by passing the ``dataset_config`` argument (see the sketch after the list below). Trainers can also define which values are user-customizable (e.g., XGBoostTrainer doesn't support streaming ingest).
This PR adds minimal support for dataset configs. Future PRs will:
- Add support for streaming ingest
- Move this config from DataParallelTrainer to ml.Trainer
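A hedged usage sketch of the override mechanism: the trainer declares defaults for every dataset in ``datasets``, and the user overrides individual entries via ``dataset_config``. The import paths, the `TorchTrainer` choice, and the `split` field are assumptions for illustration.
```
import ray
from ray.air.config import DatasetConfig, ScalingConfig  # assumed import paths
from ray.train.torch import TorchTrainer                 # assumed import path

train_ds = ray.data.read_csv("example://iris.csv")  # hypothetical input data

trainer = TorchTrainer(
    train_loop_per_worker=lambda config: None,  # placeholder training loop
    scaling_config=ScalingConfig(num_workers=2),
    datasets={"train": train_ds},
    # Selectively override the trainer-declared default DatasetConfig for the
    # "train" dataset; datasets not listed here keep the trainer defaults.
    dataset_config={"train": DatasetConfig(split=True)},
)
```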
The package "ml" should be renamed to "air".
Main question: keep an `ml.py` with `from ray.air import *` for some level of backwards compatibility?
I'd go for no, to force people to use the new structure.
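If we did keep a compatibility shim, a minimal sketch could look like the following (the warning text and deprecation approach are assumptions, not part of this proposal):
```
# ray/ml.py -- hypothetical backwards-compatibility shim
import warnings

warnings.warn(
    "`ray.ml` has been renamed to `ray.air`; please update your imports.",
    DeprecationWarning,
    stacklevel=2,
)

from ray.air import *  # noqa: E402,F401,F403  re-export the new package
```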
The TrialExecutor base class was a stub and was deprecated long ago; direct inheritance from it was disabled. This PR removes the base class and moves the remaining functionality into RayTrialExecutor.
The Datasets UX assessment showed that users had difficulty writing UDFs: what the input/output types are, how to structure the function, etc.
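For context, a typical batch UDF of the kind the assessment refers to might look like this (a sketch using the public `map_batches` API; the dataset path and column names are made up):
```
import pandas as pd
import ray

ds = ray.data.read_csv("example://iris.csv")  # hypothetical input dataset

# With batch_format="pandas", the UDF receives a pandas.DataFrame per batch and
# must return a pandas.DataFrame -- exactly the input/output-type question
# users reported struggling with.
def add_length_ratio(batch: pd.DataFrame) -> pd.DataFrame:
    batch["length_ratio"] = batch["sepal.length"] / batch["petal.length"]
    return batch

ds = ds.map_batches(add_length_ratio, batch_format="pandas")
```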
Co-authored-by: Ubuntu <ubuntu@ip-172-31-32-136.us-west-2.compute.internal>
Duplicate of #25247.
Adds a fix for Dask-on-Ray. Previously, for tasks with multiple return values, we implicitly allowed returning a dict with the return index as the key. Dask-on-Ray relied on this, but it was never documented behavior, and we now require task return values to be iterable instead.
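For illustration, a small sketch of the now-required iterable return form for tasks with multiple return values (the task itself is made up):
```
import ray

@ray.remote(num_returns=2)
def split(items):
    # Previously, Dask-on-Ray relied on the undocumented dict form
    #   return {0: first_half, 1: second_half}
    # which is no longer accepted; returns must be iterable (e.g., a tuple).
    mid = len(items) // 2
    return items[:mid], items[mid:]

left_ref, right_ref = split.remote(list(range(10)))
print(ray.get(left_ref), ray.get(right_ref))
```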