1. Dataset pipeline is advanced usage of Ray Dataset, which should not jam into the Getting Started page
2. We already have a separate/dedicated page called Pipelining Compute to cover the same content
- Adds links to Job Submission from existing library tutorials where `ray submit` is used. When Jobs becomes GA, we should fully replace the uses of `ray submit` with Ray job submission and ensure this is tested.
- Adds docstrings for the Jobs SDK, which automatically show up in the API reference
- Improve the Job Submission main page
- Add a "Deployment Guide" landing page explaining when to use Ray Client vs Ray Jobs
Co-authored-by: Edward Oakes <ed.nmi.oakes@gmail.com>
This PR adds experimental support for random access to datasets. A Dataset can be random access enabled by calling `ds.to_random_access_dataset(key, num_workers=N)`. This creates a RandomAccessDataset.
RandomAccessDataset partitions the dataset across the cluster by the given sort key, providing efficient random access to records via binary search. A number of worker actors are created, each of which has zero-copy access to the underlying sorted data blocks of the Dataset.
Performance-wise, you can expect each worker to provide ~3000 records / second via ``get_async()``, and ~10000 records / second via ``multiget()``.
Since Ray actor calls go direct from worker->worker, throughput scales linearly with the number of workers.
This PR properly exposes `TableRow` as a public API (API docs + the "Public" tag), since it's already exposed to the user in our row-based ops. In addition, the following changes are made:
1. During row-based ops, we also choose a batch format that lines up with the current dataset format in order to eliminate unnecessary copies and type conversions.
2. `TableRow` now derives from `collections.abc.Mapping`, which lets `TableRow` better interop with code expecting a mapping, and includes a few helpful mixins so we only have to implement `__getitem__`, `__iter__`, and `__len__`.
Preview: [docs](https://ray--21931.org.readthedocs.build/en/21931/data/dataset.html)
The Ray Data project's docs now have a clearer structure and have partly been rewritten/modified. In particular we have
- [x] A Getting Started Guide
- [x] An explicit User / How-To Guide
- [x] A dedicated Key Concepts page
- [x] A consistent naming convention in `Ray Data` whenever is is referred to the project.
This surfaces quite clearly that, apart from the "Getting Started" sections, we really only have one real example. Once we have more, we can create an "Example" section like many other sub-projects have. This will be addressed in https://github.com/ray-project/ray/issues/21838.
This PR consolidates both #21667 and #21759 (look there for features), but improves on them in the following way:
- [x] we reverted renaming of existing projects `tune`, `rllib`, `train`, `cluster`, `serve`, `raysgd` and `data` so that links won't break. I think my consolidation efforts with the `ray-` prefix were a little overeager in that regard. It's better like this. Only the creation of `ray-core` was a necessity, and some files moved into the `rllib` folder, so that should be relatively benign.
- [x] Additionally, we added Algolia `docsearch`, screenshot below. This is _much_ better than our current search. Caveat: there's a sphinx dependency that needs to be replaced (`sphinx-tabs`) by another, newer one (`sphinx-panels`), as the former prevents loading of the `algolia.js` library. Will follow-up in the next PR (hoping this one doesn't get re-re-re-re-reverted).
I updated this version compatibility table on the release branch but didn't update it on master. This is my mistake, the process is to make a PR to master and then cherry pick that commit to the release branch.
This PR finishes most of the stats todos for dataset. The main thing punted for future work is instrumentation of split(), which is particularly tricky since only certain blocks are transformed.
Co-authored-by: Clark Zinzow <clarkzinzow@gmail.com>
Dask default's to a disk-based shuffle even thought we're using a distributed scheduler, which appears to be resulting in dropped data since the filesystem isn't shared across nodes. Dask Distributed manually sets the shuffle algorithm in the global config to the task-based shuffle, which the Dask-on-Ray scheduler should probably do as well.
This PR adds a Dask config helper, `enable_dask_on_ray`, that sets Dask-on-Ray as the default scheduler along with changing the default shuffle to a task-based shuffle. The shuffle method can still be overridden by the user by manually specifying `df.set_index(shuffle="disk")`.
This adds an initial Dataset.stats() framework for debugging dataset performance. At a high level, execution stats for tasks (e.g., CPU time) are attached to block metadata objects. Datasets have stats objects that hold references to these stats and parent dataset stats (this avoids stats holding references to parent datasets, allowing them to be gc'ed). Similarly, DatasetPipelines hold stats from recently computed datasets.
Currently only basic ops like map / map_batches are instrumented. TODO placeholders are left for future PRs.
Datasets docs for last-mile preprocessing, particularly geared towards ML ingest. This gives groupby, aggregations, and random shuffling examples in the overview page (not present previously), adds some concreteness to our last-mile preprocessing positioning, and provides some preprocessing recipes for a few common transformations.
block splitting and makes it off by default. This makes it easier to debug problems potentially related to this feature. Criteria for enabling by default:
- We're confident all nightly tests pass (currently, there may be an issue with large-scale groupby with block splitting).
- We're confident lineage-based reconstruction can work with block splitting.
The default block size of 500MiB seems too low for some common workloads, e.g. shuffling 500GB. This creates 1000 blocks which means 1 million intermediate shuffle objects until we implement #20500.
This PR adds support for automatic block splitting on read and map transforms, to keep block size bounded to ~500MiB. This avoids potential OOM situations where a map task may consume too much intermediate Python heap memory, or too much object store shared memory for one block.
## Why are these changes needed?
- Since broadcasting is moving to grpc, introducing the option to increase the client side thread number
- For hybrid schedule, ignore the threshold if gcs based actor scheduler is enabled
With these fixing, actor creation rate > 600actor/s vs ~ 140 actor/s
## Related issue number