Unreverts #24812, skipping the memory-releasing tests that are already flaky. A separate issue tracks re-enabling these tests once we find a more reliable way to test them.
* Revert "Revert "Revert "Revert "[Datasets] [Tensor Story - 1/2] Automatically provide tensor views to UDFs and infer tensor blocks for pure-tensor datasets."" (#25031)" (#25057)"
This reverts commit fb2933a78f.
* Skip shuffle memory release test.
**Update**: This PR is now part 3 of a three-PR group to consolidate checkpoint management.
1. Part 1 adds the common checkpoint management class #24771
2. Part 2 adds the integration for Ray Train #24772
3. This PR builds on #24772 and includes all of its changes. It moves the Ray Tune integration to use the new common checkpoint manager class.
Old PR description:
This PR consolidates the Ray Train and Tune checkpoint managers. These components previously implemented very similar logic in separate modules. To simplify future maintenance, we've consolidated the common core.
- This PR keeps full compatibility with the previous interfaces and implementations. This means that, for now, Train and Tune will have separate CheckpointManagers that both extend the common core (see the sketch below)
- This PR prepares Tune to move to a CheckpointStrategy object
- In follow-up PRs, we can further unify interfacing with the common core, possibly removing any Train- or Tune-specific adjustments (e.g., moving to setup on init rather than at runtime for Ray Train)
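To make the structure concrete, here is a rough sketch of the consolidated shape, assuming a minimal common core; the class and attribute names below (e.g., `_CommonCheckpointManager`, `num_to_keep`) are illustrative, not the actual Ray interfaces.
```python
# Illustrative sketch only; names and methods are assumptions, not Ray's API.
class _CommonCheckpointManager:
    """Shared bookkeeping: registering, ranking, and deleting checkpoints."""

    def __init__(self, checkpoint_strategy):
        self._checkpoint_strategy = checkpoint_strategy
        self._checkpoints = []

    def register_checkpoint(self, checkpoint):
        self._checkpoints.append(checkpoint)
        self._delete_stale_checkpoints()

    def _delete_stale_checkpoints(self):
        # Keep only the most recent `num_to_keep` checkpoints (hypothetical field).
        keep = self._checkpoint_strategy.num_to_keep
        if keep is not None:
            self._checkpoints = self._checkpoints[-keep:]


class TrainCheckpointManager(_CommonCheckpointManager):
    """Ray Train-specific behavior layered on top of the common core."""


class TuneCheckpointManager(_CommonCheckpointManager):
    """Ray Tune-specific behavior layered on top of the common core."""
```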
Co-authored-by: Antoni Baum <antoni.baum@protonmail.com>
Follow-up on our last discussion about supporting AIR users in a piecemeal fashion.
This only covers TensorFlow for now; I want to collect feedback on API naming, package structure, etc., and then I will add the others.
Currently, each function decorated with `@ray.remote` is typed as a `RemoteFunction` class (used only for type annotations, autocompletion, inline errors, etc.). That class takes several *type parameters* and uses them to type the extended `func.remote()` method.
But with the current type annotations, any unused type parameters are marked as `None`. This means that when calling the `.remote()` method, the first (actual) arguments are checked and the rest are typed as `None`, so the type checker considers it "correct" to pass extra `None` arguments even though that would not actually be valid. So this doesn't show an error, but it should:
<img width="371" alt="Screenshot 2022-06-07 at 05 38 48" src="https://user-images.githubusercontent.com/1326112/172360355-9b344220-7824-4b5c-87da-038f5b53fe04.png">
...those 2 extra `None` values should be marked as invalid.
After this PR, those invalid extra arguments would be marked as invalid:
<img width="588" alt="Screenshot 2022-06-07 at 05 42 10" src="https://user-images.githubusercontent.com/1326112/172360956-424b40d4-8197-4663-8298-617a1df37658.png">
And:
<img width="687" alt="Screenshot 2022-06-07 at 05 42 50" src="https://user-images.githubusercontent.com/1326112/172361140-eb93c675-f5d6-4e0c-b9b2-83c4801bb450.png">
## More context
I also tried the new `TypeVarTuple`. It might simplify these type annotations in the future, but it's not supported by mypy yet, and it's a very recent addition to the language (and `typing_extensions`), so it's probably too early to adopt it.
This adds the following options to DatasetConfig, which can be used to enable streaming ingest.
```python
# Whether the dataset should be streamed into memory using pipelined reads.
# When enabled, get_dataset_shard() returns DatasetPipeline instead of Dataset.
# The amount of memory to use is controlled by `stream_window_size`.
# False by default for all datasets.
use_stream_api: Optional[bool] = None
# Configure the streaming window size in bytes. A typical value is something like
# 20% of object store memory. If set to -1, then an infinite window size will be
# used (similar to bulk ingest). This only has an effect if use_stream_api is set.
# Set to 1.0 GiB by default.
stream_window_size: Optional[float] = None
# Whether to enable global shuffle (per pipeline window in streaming mode). Note
# that this is an expensive all-to-all operation, and most likely you want to use
# local shuffle instead.
# False by default for all datasets.
global_shuffle: Optional[bool] = None
```
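As a hedged usage sketch (the import path and exact wiring are assumptions based on this description, not confirmed API), these options could be set per dataset like this:
```python
from ray.air.config import DatasetConfig  # import path is an assumption

# Illustrative only: stream the "train" dataset with a ~2 GiB window and no
# global shuffle; other datasets keep the default bulk ingest behavior.
dataset_config = {
    "train": DatasetConfig(
        use_stream_api=True,
        stream_window_size=2 * 1024 ** 3,  # bytes
        global_shuffle=False,
    ),
}
```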
When running an experiment, for example in the cloud while syncing to a bucket, the logdir path stored in the trials changes when working with the checkpoints in the bucket. There are some workarounds, but the simpler solution is to also add a rel_logdir containing the relative path to the trials/checkpoints, which can handle any change in the location of the experiment results.
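A minimal sketch of the idea, with made-up paths: the relative logdir stays valid even if the experiment directory moves, e.g., after syncing to and downloading from a bucket.
```python
import os

# Hypothetical paths for illustration.
local_experiment_dir = "/home/user/ray_results/my_experiment"
trial_logdir = "/home/user/ray_results/my_experiment/trial_0001"

# The relative path survives relocation of the experiment directory.
rel_logdir = os.path.relpath(trial_logdir, local_experiment_dir)  # "trial_0001"
bucket_logdir = os.path.join("s3://my-bucket/my_experiment", rel_logdir)
```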
As discussed with @Yard1 and @krfricke
Co-authored-by: Antoni Baum <antoni.baum@protonmail.com>
Co-authored-by: Kai Fricke <kai@anyscale.com>
Add visibility into the following to help Ray users and developers debug performance and OOM issues:
- Raylet memory usage, broken down by USS vs. remaining RSS.
- Total worker count, CPU percentage usage, and memory usage.
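For context, a sketch of how USS vs. remaining RSS can be measured for a process with `psutil`; this is illustrative, not the reporting code added by this PR (`raylet_pid` is a placeholder).
```python
import psutil

raylet_pid = 12345  # placeholder PID for illustration
mem = psutil.Process(raylet_pid).memory_full_info()

uss = mem.uss                  # memory unique to the process
remaining_rss = mem.rss - uss  # shared pages still counted in RSS
```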
`Dataset.to_tf` and `TensorflowPredictor` attempt to convert Pandas dataframes to NumPy arrays by calling `DataFrame.values`. However, `DataFrame.values` fails if the dataframe contains multidimensional arrays.
This PR solves the problem by introducing a function, `convert_pandas_to_tf_tensor`. Its implementation is based on that of `convert_pandas_to_torch_tensor`.
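A minimal sketch of the problem and the idea behind the fix (not the actual implementation):
```python
import numpy as np
import pandas as pd
import tensorflow as tf

# A column holding multidimensional arrays yields an object-dtype array from
# DataFrame.values, which TensorFlow cannot convert directly.
df = pd.DataFrame({"image": [np.zeros((2, 2)), np.ones((2, 2))]})

# Stacking the per-row arrays explicitly produces a proper dense tensor.
tensor = tf.convert_to_tensor(np.stack(df["image"].to_list()))
print(tensor.shape)  # (2, 2, 2)
```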
This PR implements the server side of `ray log`. It continues from #24068.
The PR supports two endpoints:
```
/api/v0/logs  # List the logs of the given node id, filtered by the given glob.
/api/v0/logs/{file|stream}?filename&pid&actor_id&task_id&interval&lines  # Stream the requested log file. The filename can be inferred from pid/actor_id/task_id.
```
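As a hypothetical illustration of calling these endpoints (host, port, and parameter names are assumptions based on the description above):
```python
import requests

base = "http://127.0.0.1:8265"  # dashboard address; assumption
node_id = "<node-id>"

# List logs on a node, filtered by a glob.
listing = requests.get(f"{base}/api/v0/logs", params={"node_id": node_id, "glob": "raylet*"})

# Stream the last 100 lines of a log file; the file can also be inferred
# from pid / actor_id / task_id instead of an explicit filename.
tail = requests.get(
    f"{base}/api/v0/logs/file",
    params={"node_id": node_id, "filename": "raylet.out", "lines": 100},
)
```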
Some tests need to be rewritten; I will do that soon.
As a follow-up to this PR, there will be two PRs:
1. A PR to add the actual CLI
2. A PR to remove in-memory cached logs and do on-demand queries for actor/worker logs
test_modin.py is flaky right now. It complains that some modules can't be imported. This seems like an init issue where client mode and non-client mode are mixed. This change makes the test close the cluster for each run; it slows the test a little, but it's more stable.
This PR adds filtering support. The filtering is done on the API server side (not on the source side). Writing an elegant solution for source-side filtering is a bit complicated, so we will handle it in the future (no optimization for alpha APIs).
We will also support a limited set of columns for each API.
The API is as follows:
`ray list [resources] --filter [key] [value]` => filters data where key == value.
In the future, we can also support more complicated filtering such as `!=`, `AND`, `OR`, etc.
This adds a per-dataset config object to DataParallelTrainer. These configs define how the Dataset should be read into the DataParallelTrainer, configuring the preprocessing, splitting, and ingest strategy per dataset. DataParallelTrainers declare default DatasetConfigs for each dataset passed in the ``datasets`` argument. Users have the opportunity to selectively override these configs by passing the ``dataset_config`` argument (see the sketch after the list below). Trainers can also restrict which values are user-customizable (e.g., XGBoostTrainer doesn't support streaming ingest).
This PR adds the minimal support for dataset configs. Future PRs will:
- Add support for streaming ingest
- Move this config from DataParallelTrainer to ml.Trainer
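As a hedged sketch of the override flow (the field names and import path here are assumptions, not confirmed API):
```python
from ray.air.config import DatasetConfig  # import path is an assumption

# Illustrative only: override the trainer's defaults for selected datasets;
# datasets not listed here keep the trainer's default DatasetConfig.
dataset_config = {
    "train": DatasetConfig(split=True),           # shard across workers
    "side_data": DatasetConfig(transform=False),  # skip preprocessing
}

# This dict would then be passed to the trainer via the `dataset_config`
# argument alongside `datasets={...}`.
```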
This PR uses a task/actor launch hook to generate better error messages for nested Tune tasks/actors in the case where no extra resources are reserved for them. The idea is that the Tune trial runner actor can set a hook prior to executing the user code. If the user code launches a task and the placement group for the trial cannot possibly fit it, we raise a TuneError right away to warn the user.
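A rough sketch of the check the hook performs, assuming a simplified interface (the function signature here is made up; `TuneError` is Ray Tune's existing exception type):
```python
from ray.tune.error import TuneError

def pre_launch_check(task_resources: dict, trial_pg_resources: dict) -> None:
    # If any single resource request can never be satisfied by the trial's
    # placement group, fail fast with an actionable error instead of hanging.
    for resource, amount in task_resources.items():
        if amount > trial_pg_resources.get(resource, 0):
            raise TuneError(
                f"A nested task requested {amount} {resource}, which the "
                "trial's placement group can never provide. Reserve extra "
                "resources for nested tasks/actors in this trial."
            )
```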
The package "ml" should be renamed to "air".
Main question: should we keep an `ml.py` with `from ray.air import *` for some level of backwards compatibility?
I'd go for no, to force people to use the new structure.
The TrialExecutor base class was a stub and was deprecated long ago; direct inheritance was disabled. This PR removes the base class and moves the remaining functionality into RayTrialExecutor.
The main issue with this test is that the worker tries to connect to the raylet, but the raylet has exited, in which case the worker hangs there. This happens before the periodic check runs, so the worker won't exit either.
This fix moves the hanging part to after the periodic check starts.
Another issue is the pubsub timeout. The default is 60s, and we need to adjust it to a smaller value so the test completes within 60s.
It looks like test_logging fails when the syncer is enabled. However, I found the test was badly written, and the failure might be a side effect of the syncer (I am not sure why; maybe the syncer slows down ray.init()?)
The failing test is `test_log_monitor_backpressure` in ray/python/ray/tests/test_logging.py (line 228 at f75ede1).
Anyway, it seems like the test will fail if there's a delay after the log monitor is started.
Testing this directly is not trivial. Instead, I made log_monitor unit-testable and added full unit tests.
This also adds a better exception message for another flaky test, test_log_rotation. I need more data before actually fixing that issue.