This exposes a low-cost way to perform a pseudo global shuffle.
For extremely large datasets that span multiple nodes, contiguous blocks are often colocated on the same node. This leads to hot spots during dataset iteration, where a single node must (1) send a lot of data over the network and (2) perform many disk reads if the dataset has been spilled to disk.
This change spreads that workload across the nodes that hold the dataset blocks.
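A rough sketch of the pattern this enables, assuming Ray Datasets' `randomize_block_order()` together with a local shuffle during iteration (the API calls here are assumptions for illustration, not part of this PR's diff):

```python
import ray

ds = ray.data.range(100_000)

# Randomizing the block order is a metadata-only operation, so it is cheap
# compared to a full random_shuffle(); combined with a small local shuffle
# buffer during iteration it approximates a global shuffle while spreading
# reads across the nodes that hold the blocks.
ds = ds.randomize_block_order()
for batch in ds.iter_batches(batch_size=1024, local_shuffle_buffer_size=10_000):
    ...  # consume the batch
```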
Stats construction for from_arrow() and from_numpy() (and for from_pandas() with Pandas block support disabled) is currently broken: we weren't resolving the block metadata before passing it to the stats, so subsequent ds.stats() calls fail. This PR fixes this and adds some test coverage (a minimal repro sketch follows the drivebys below).
Drivebys:
- Adds stats for from_pandas() zero-copy path (metadata fetch only).
- Changes "from_numpy" stats stage name to "from_numpy_refs", to be consistent with stats for other from_*() APIs.
Currently, team:ml spans all ML (Tune, Train, AIR) tests as well as RLlib tests. RLlib tests are much flakier, and it would be good to track them separately in the flaky test tracker. This PR changes RLlib tests from team:ml to team:rllib to enable this separation.
Co-authored-by: Antoni Baum <antoni.baum@protonmail.com>
The current inheritance behavior for runtime_envs enables the following workflow for Jobs: A working_dir can be set in the Jobs API, and then inside the driver script, if a new per-task runtime_env is defined, it will automatically inherit the driver's working_dir.
There is an ongoing discussion about the best approach for runtime_env inheritance going forward: https://github.com/ray-project/ray/issues/25484, in which we noted that there were no tests covering this behavior.
This PR adds integration tests for the above behavior (a sketch of the behavior under test follows the list below). If we ultimately decide to abandon the current inheritance behavior and instead have child runtime envs completely overwrite the parent runtime env, this test will fail, reminding us to do the following:
- Update the internal runtime_env usage in Ray Tune to use the `ray.get_runtime_context().runtime_env.update` API
- Update the Ray Jobs documentation to tell users to use `ray.get_runtime_context().runtime_env.update`, and update this test
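A minimal sketch of the inheritance behavior under test, assuming the driver was launched through the Jobs API with a `working_dir` set (the task body and env vars below are illustrative):

```python
import ray

ray.init()

@ray.remote(runtime_env={"env_vars": {"FOO": "bar"}})
def get_env():
    # Under the current inheritance behavior, the per-task runtime_env is merged
    # with the driver's, so the working_dir set via the Jobs API is still visible.
    return dict(ray.get_runtime_context().runtime_env)

env = ray.get(get_env.remote())
assert "working_dir" in env  # fails if child envs fully overwrite the parent
```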
Breaks the hard dependency on Preprocessor imports for type hints in AIR, in preparation for moving Preprocessors to `ray.data`.
Trainer still has a hard dependency due to an `isinstance` check.
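A hedged sketch of the pattern used to drop the hard dependency, with the import happening only under type checking (the import path and function below are illustrative):

```python
from typing import TYPE_CHECKING, Optional

if TYPE_CHECKING:
    # Only evaluated by type checkers; no runtime import of the Preprocessor module.
    from ray.data.preprocessor import Preprocessor


def fit_transform(preprocessor: Optional["Preprocessor"] = None) -> None:
    # The string annotation keeps the hint usable without a runtime import.
    ...
```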
This test was flaky because actor tasks can fail if they are submitted while the actor process is dead or restarting. This PR makes the test more stressful so that the error is easier to reproduce, and changes the max_retries parameter to -1 so that the actor tasks eventually succeed.
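For context, a rough sketch of how unlimited retries are expressed for Ray actors and their tasks (the actor here is illustrative, and the exact parameter used in the test may differ):

```python
import ray

# Restart the actor on failure and retry its tasks indefinitely, so tasks
# submitted while the actor process is dead or restarting still succeed.
@ray.remote(max_restarts=-1, max_task_retries=-1)
class Counter:
    def __init__(self):
        self.count = 0

    def inc(self):
        self.count += 1
        return self.count


counter = Counter.remote()
assert ray.get(counter.inc.remote()) == 1
```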
Related issue number
Closes #24942.
Unreverts #24812, skipping the memory-releasing tests that are already flaky. We have a separate issue tracking un-skipping these memory-releasing tests once we find a more reliable way to test them.
* Revert "Revert "Revert "Revert "[Datasets] [Tensor Story - 1/2] Automatically provide tensor views to UDFs and infer tensor blocks for pure-tensor datasets."" (#25031)" (#25057)"
This reverts commit fb2933a78f.
* Skip shuffle memory release test.
**Update**: This PR is now part 3 of a three PR group to consolidate the checkpoints.
1. Part 1 adds the common checkpoint management class #24771
2. Part 2 adds the integration for Ray Train #24772
3. This PR builds on #24772 and includes all of its changes. It moves the Ray Tune integration to the new common checkpoint manager class.
Old PR description:
This PR consolidates the Ray Train and Tune checkpoint managers. These managers previously did very similar things, but in different modules. To simplify future maintenance, we've consolidated the common core.
- This PR keeps full compatibility with the previous interfaces and implementations. This means that for now, Train and Tune will have separate CheckpointManagers that both extend the common core
- This PR prepares Tune to move to a CheckpointStrategy object (a rough sketch follows this list)
- In follow-up PRs, we can further unify interfacing with the common core, possibly removing any Train- or Tune-specific adjustments (e.g., moving to setup on init rather than at runtime for Ray Train)
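A hedged sketch of what such a CheckpointStrategy-style configuration could look like; the field names mirror Ray Train's existing options but are illustrative rather than the final interface:

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class CheckpointStrategy:
    # How many checkpoints to retain; None keeps all of them.
    num_to_keep: Optional[int] = None
    # Metric used to rank checkpoints when deciding which ones to keep.
    checkpoint_score_attribute: Optional[str] = None
    # Whether higher ("max") or lower ("min") metric values are better.
    checkpoint_score_order: str = "max"
```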
Co-authored-by: Antoni Baum <antoni.baum@protonmail.com>
Follow-up on our last discussion about supporting AIR users in a piecemeal fashion.
This only covers TensorFlow for now; I want to collect some feedback on API naming, package structure, etc., and then I will add the others.
Currently, each function decorated with `@ray.remote` is typed as a `RemoteFunction` class (used only for type annotations, autocompletion, inline errors, etc.). That class takes several *type parameters*, which it then uses in the extended `func.remote()` method.
But with the current type annotations, any unused type parameters are marked as `None`. This means that when calling `.remote()`, the first (actual) arguments are checked while the remaining parameters are typed as `None`, so the type checker considers it "correct" to pass extra `None` arguments even though that would not actually be valid. So this doesn't show an error, but it should:
<img width="371" alt="Screenshot 2022-06-07 at 05 38 48" src="https://user-images.githubusercontent.com/1326112/172360355-9b344220-7824-4b5c-87da-038f5b53fe04.png">
...those 2 extra `None` values should be marked as invalid.
After this PR, those extra arguments are correctly flagged as invalid:
<img width="588" alt="Screenshot 2022-06-07 at 05 42 10" src="https://user-images.githubusercontent.com/1326112/172360956-424b40d4-8197-4663-8298-617a1df37658.png">
And:
<img width="687" alt="Screenshot 2022-06-07 at 05 42 50" src="https://user-images.githubusercontent.com/1326112/172361140-eb93c675-f5d6-4e0c-b9b2-83c4801bb450.png">
## More context
I also tried the new `TypeVarTuple`; it might simplify these type annotations in the future, but it isn't supported by mypy yet. It's a very recent addition to the language (and to `typing_extensions`), so it's probably too early to adopt it.
This adds the following options to DatasetConfig, which can be used to enable streaming ingest.
```
# Whether the dataset should be streamed into memory using pipelined reads.
# When enabled, get_dataset_shard() returns DatasetPipeline instead of Dataset.
# The amount of memory to use is controlled by `stream_window_size`.
# False by default for all datasets.
use_stream_api: Optional[bool] = None
# Configure the streaming window size in bytes. A typical value is something like
# 20% of object store memory. If set to -1, then an infinite window size will be
# used (similar to bulk ingest). This only has an effect if use_stream_api is set.
# Set to 1.0 GiB by default.
stream_window_size: Optional[float] = None
# Whether to enable global shuffle (per pipeline window in streaming mode). Note
# that this is an expensive all-to-all operation, and most likely you want to use
# local shuffle instead.
# False by default for all datasets.
global_shuffle: Optional[bool] = None
```
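A hedged usage sketch for these options; the import path and wiring below are assumptions and may differ from the final API:

```python
from ray.air.config import DatasetConfig

# Stream the "train" dataset through a ~2 GiB window instead of bulk-loading it.
# With use_stream_api enabled, get_dataset_shard("train") returns a
# DatasetPipeline instead of a Dataset.
dataset_config = {
    "train": DatasetConfig(use_stream_api=True, stream_window_size=2e9),
}
```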