Fixes checkpoints not being recorded in Tune's checkpoint manager if the first checkpoint has None value. This also deflakes `test_checkpoint_manager.py::CheckpointManagerTest`.
**TL;DR:** Don't clear for eager, clear all but non-lazy input blocks if lazy, clear everything if pipelining.
This PR provides more efficient and intuitive block clearing semantics for eager mode, lazy mode, and pipelining, while still supporting multiple operations applied to the same base dataset, i.e. fan-out. For example, two different map operations are applied to the same base `ds` in this example:
```python
ds = ray.data.range(10).map(lambda x: x+1)
ds1 = ds.map(lambda x: 2*x)
ds2 = ds.map(lambda x: 3*x)
```
If naively clear the blocks when executing the map to produce `ds1`, the map producing `ds2` will fail.
### Desired Semantics
- **Eager mode** - don’t clear input blocks, thereby supporting fan-out from cached data at any point in the stage chain without triggering unexpected recomputation.
- **Lazy mode** - if lazy datasource, clear the input blocks for every stage, relying on recomputing via stage lineage if fan-out occurs; if non-lazy datasource, do not clear source blocks for execution plan when executing first stage, but do clear input blocks for every subsequent stage.
- **Pipelines** - Same as lazy mode, although the only fan-out that can occur is from the pipeline source blocks when repeating a dataset/pipeline, so unintended intermediate recomputation will never happen.
#17581 introduced a warning about excess queuing for actors. Unfortunately since Ray 1.10.0, the metric used became wrong for async actors, resulting in bogus warnings when they are called more than 5000 times, even though there are not 5000 pending tasks.
The difference between 1.9.2 and 1.10.0 is that async actors tasks skip the queue in CoreWorkerClient::PushActorTask. However CoreWorkerClient::ClientProcessedUpToSeqno uses max_finished_seq_no_ which is never updated when the queue is skipped.
I think that a better metric for the amount of tasks that are pending submissions is the size of the internal queue CoreWorkerDirectActorTaskSubmitter::inflight_task_callbacks.
Currently nightly tests are unable to finish in a day because of concurrency group limit on `large` tests. This is an attempt to adjust the limits so buildkite can run / finish more tests. I will observe which tests fall into the `enormous` group and adjust the test resource / concurrency group limits again.
Fix CQL getting stuck when deprecated timesteps_per_iteration is used (use min_train_timesteps_per_reporting instead).
CQL does not perform sampling timesteps and the deprecated timesteps_per_iteration is automatically translated into the new min_sample_timesteps_per_reporting, but should be translated (only for CQL and other purely offline RL algos) into min_train_timesteps_per_reporting.
If timesteps_per_iteration, CQL lever leaves the first iteration as it thinks it's not done yet (sample timesteps always remain at 0).
For debugging client environments, it is helpful to print the installed pip packages.
Additionally, a fix for the environment of the ml_user_tune_rllib_connect_test is added. Additionally, anyscale import errors are reported verbosely to help debug missing packages.
The prefetch_blocks implementation doesn't work as expected. Due to ray.wait() doesn't given us fine grained control, today we block waiting any of the block returns. As I read the code, it may or may not actually fetching all the blocks.
A better way to ensure prefetching not blocking is to use ray remote function call, which is not blocking and ensures the blocks are fetched eventually.
Lint was still failing (but only caught with doctest):
```
File "../../python/ray/rllib/utils/numpy.py", line ?, in default
Failed example:
tree.traverse(make_action_immutable, d, top_down=False)
Exception raised:
Traceback (most recent call last):
File "/opt/miniconda/lib/python3.6/doctest.py", line 1330, in __run
compileflags, 1), test.globs)
File "<doctest default[4]>", line 1, in <module>
tree.traverse(make_action_immutable, d, top_down=False)
NameError: name 'make_action_immutable' is not defined
```
Adds a fast file metadata provider that trades comprehensive file metadata collection for speed of metadata collection, and which also disabled directory path expansion which can be very slow on some cloud storage service providers. This PR also refactors the Parquet datasource to be able to take advantage of both these changes and the content-type agnostic partitioning support from #23624.
This is the second PR of a series originally proposed in #23179.
Adds a Categorizer preprocessor to automatically set the Categorical dtype on a dataset. This is useful for eg. LightGBM, which has build-in support for features with that dtype.
Depends on #24144.
After https://github.com/ray-project/ray/pull/24066, some release tests are running into:
```
ModuleNotFoundError: No module named 'ray.train.impl'
```
This PR simply adds a `__init__.py` file to resolve this.
We also add a 5 wecond delay for client runners in release test to give clusters a bit of slack to come up (and avoid ray client connection errors)
#24311 added the `test_update_num_replicas_anonymous_namespace` unit test to check for replica leaks in anonymous namespaces. This change adds this test to the master branch.
Closes https://github.com/ray-project/ray/issues/24300
Adds a field to the job submission snapshot that matches the job name in the existing snapshot. Before this PR, the job submission name was camelcased because all snapshot keys are automatically camelcased. This PR allows jobs from the old job field to be linked to ones in the new job submission snapshot.
The postprocess checkpoint method was introduced to be able to add data to function runner checkpoint directories before they are uploaded to external (cloud) storage. Instead, we should just use the existing separation of `save_checkpoint()` and `save()`.
The simple shuffle currently implemented in Datasets does not reliably scale past 1000+ partitions due to metadata and I/O overhead.
This PR adds an experimental shuffle implementation for a "push-based shuffle", as described in this paper draft. This algorithm should see better performance at larger data scales. The algorithm works by merging intermediate map outputs at the reducer side while other map tasks are executing. Then, a final reduce task merges these merged outputs.
Currently, the PR exposes this option through the DatasetContext. It can also be set through a hidden OS environment variable (RAY_DATASET_PUSH_BASED_SHUFFLE). Once we have more comprehensive benchmarks, we can better document this option and allow the algorithm to be chosen at run time.
Redo for #23758 to fix CI.
For tasks with node affinity scheduling strategy, the resource demands shouldn't create new nodes. This PR achieves this by not reporting demand to autoscaler. In the future, we will explore sending scheduling strategy information to autoscaler.
To add basic plotting feature for Ray DAGs.
`ray.experimental.dag.plot(dag: DAGNode, to_file=None)`
### Behavior
1. dump the dag plot (Dot) to file.
2. also render the image whenever possible. E.g. if running in Jupyter notebook, the image will not only be saved, but also rendered in the notebook.
3. when to_file is not set (i.e. None), it will be saved to a tempfile for rendering purpose only. This is common when users plot DAGs in notebook env to explore the DAG structure without wanting to save it to a file.
Refactors _get_unique_value_indices (used in Encoder preprocessors) for much improved performance with multiple columns. Also uses the same, more robust intermediary dataset format in _get_most_frequent_values (Imputers).
The existing unit tests pass, and no functionality has been changed.