What: Only open (create) CSV files when actually reporting results.
Why: When trials crash before reporting their first result (e.g. during init), they have already created an empty CSV file. When results are written later, the CSV header is then missing.
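Roughly, the lazy-open pattern looks like this (a minimal sketch with illustrative names, not the actual Tune logger code):

```python
import csv


class LazyCSVLogger:
    """Opens the CSV file only once the first result is reported."""

    def __init__(self, path):
        self._path = path
        self._file = None
        self._writer = None

    def log_result(self, result: dict):
        if self._file is None:
            # Defer file creation: trials that crash before reporting
            # never leave behind an empty, header-less CSV.
            self._file = open(self._path, "w", newline="")
            self._writer = csv.DictWriter(self._file, fieldnames=list(result))
            self._writer.writeheader()
        self._writer.writerow(result)
        self._file.flush()

    def close(self):
        if self._file is not None:
            self._file.close()
```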
What: Skips left-over `checkpoint_tmp*` directories when loading experiment analysis. Also loads the iteration number from the metadata file rather than parsing it from the checkpoint directory name.
Why: Sometimes temporary checkpoint directories are not deleted correctly when restoring (e.g. when interrupted). In these cases, they shouldn't be included in experiment analysis. Parsing their iteration number also failed; the iteration should generally be read from the metadata file, not inferred from the directory name.
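A rough sketch of the intended scan (directory and metadata file names are illustrative, not the exact Tune layout):

```python
import json
import os


def collect_checkpoints(trial_dir):
    """Collects (iteration, path) pairs, skipping temporary checkpoint dirs."""
    checkpoints = []
    for name in sorted(os.listdir(trial_dir)):
        path = os.path.join(trial_dir, name)
        if not os.path.isdir(path) or not name.startswith("checkpoint_"):
            continue
        if name.startswith("checkpoint_tmp"):
            # Left-over temporary directory from an interrupted restore.
            continue
        metadata_file = os.path.join(path, "metadata.json")  # illustrative name
        if not os.path.exists(metadata_file):
            continue
        with open(metadata_file) as f:
            metadata = json.load(f)
        # Read the iteration from metadata instead of parsing the dir name.
        checkpoints.append((metadata["iteration"], path))
    return checkpoints
```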
What: This introduces a general utility to synchronize directories between two nodes, derived from the RemoteTaskClient. This implementation uses chunked transfers for more efficient communication.
Why: Transferring files over 2 GB in size leads to superlinear time complexity in some setups (e.g. local MacBooks). This could be due to memory limits, swapping, or gRPC limits, and is explored in a different thread. To overcome this limitation, we use chunked data transfers, which show quasi-linear scaling for larger files.
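The core idea, as a simplified sketch (the chunk size and scheduling details are illustrative; the real utility pins the read task to the source node):

```python
import ray

CHUNK_SIZE = 64 * 1024 * 1024  # illustrative 64 MiB chunks


@ray.remote
def read_chunk(path: str, offset: int, size: int) -> bytes:
    """Reads a single chunk of the file on the source node."""
    with open(path, "rb") as f:
        f.seek(offset)
        return f.read(size)


def pull_file(source_path: str, target_path: str, file_size: int) -> None:
    """Pulls a file chunk by chunk instead of as one multi-GB gRPC payload."""
    with open(target_path, "wb") as out:
        for offset in range(0, file_size, CHUNK_SIZE):
            out.write(ray.get(read_chunk.remote(source_path, offset, CHUNK_SIZE)))
```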
This PR preserves block order when transforming under the actor compute model. Before this PR, we were submitting block transformations in reverse order and creating the output block list in completion order.
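For illustration (not the actual Datasets actor-pool code), the difference between submission-order and completion-order collection looks like this:

```python
import ray


@ray.remote
def transform(block):
    return [x * 2 for x in block]  # placeholder per-block transformation


blocks = [[1, 2], [3, 4], [5, 6]]

# Submit in input order and keep the refs in that same order, so the output
# block list lines up with the input block list.
refs = [transform.remote(b) for b in blocks]
ordered_outputs = ray.get(refs)

# Collecting with ray.wait() instead yields completion order, which can
# scramble the blocks.
remaining, completion_order = list(refs), []
while remaining:
    done, remaining = ray.wait(remaining, num_returns=1)
    completion_order.extend(ray.get(done))
```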
NaN values do not have a well-defined ordering. When sorting metrics to determine the best checkpoint, we should always filter out checkpoints that are associated with NaN values.
Closes #23812
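The filtering idea, as a small sketch (names are illustrative, not the actual checkpoint manager code):

```python
import math


def best_checkpoint(checkpoints, metric, mode="max"):
    """Picks the best checkpoint while ignoring NaN metric values."""
    valid = [
        (ckpt, metrics[metric])
        for ckpt, metrics in checkpoints
        if metric in metrics and not math.isnan(metrics[metric])
    ]
    if not valid:
        return None
    pick = max if mode == "max" else min
    return pick(valid, key=lambda item: item[1])[0]
```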
Takes care of the TODO left for SimpleImputer with the most_frequent strategy by refactoring and optimizing the logic for computing the most frequent value.
Co-authored-by: Clark Zinzow <clarkzinzow@gmail.com>
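Assuming pandas-based blocks, the most frequent value per column can be computed with `value_counts` (a sketch, not necessarily the exact refactored code):

```python
import pandas as pd


def most_frequent_values(df: pd.DataFrame, columns):
    """Computes the most frequent value for each requested column."""
    stats = {}
    for col in columns:
        counts = df[col].value_counts(dropna=True)
        # value_counts() sorts by frequency, so the first index entry is
        # the most frequent value; empty columns yield None.
        stats[col] = counts.index[0] if len(counts) else None
    return stats
```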
The PR https://github.com/ray-project/ray/pull/22820 introduced an API breakage for xlang usage, causing `ray.java_actor_class` to no longer be available from then on.
I'm fixing it in this PR. We should remove these top-level APIs in 2.0 rather than in minor versions.
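For reference, the restored entry point is used roughly like this (the Java class and method names are illustrative, and cluster setup such as the Java code search path is omitted):

```python
import ray

ray.init(address="auto")

# Top-level alias restored by this PR.
counter_class = ray.java_actor_class("io.ray.demo.Counter")
counter = counter_class.remote()
print(ray.get(counter.increment.remote()))
```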
This PR adds an RLTrainer to Ray AIR. It works for both offline and online use cases. In offline training, it leverages the datasets key of the Trainer API to specify a dataset reader input, used e.g. in Behavioral Cloning (BC). In online training, it is a wrapper around the RLlib trainables, making use of the parameter layering enabled by the Trainer API.
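A hypothetical usage sketch; the import paths and constructor arguments below are assumptions and may differ from the final API:

```python
from ray.air.config import ScalingConfig  # assumed import path
from ray.train.rl import RLTrainer  # assumed import path

# Online training: a thin wrapper around the RLlib trainable.
trainer = RLTrainer(
    algorithm="PPO",
    config={"env": "CartPole-v1", "framework": "torch"},
    scaling_config=ScalingConfig(num_workers=2),
)
result = trainer.fit()

# Offline training (e.g. BC) would instead pass a dataset reader input
# through the Trainer API's `datasets` key, e.g. datasets={"train": ...}.
```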
Using the local rank as the device id only works if there is exactly 1 GPU per worker. Instead we should be using ray.get_gpu_ids() to determine which GPU device to use for the worker.
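A sketch of the device selection (not the exact Train fix; how the id maps to a CUDA index depends on how CUDA_VISIBLE_DEVICES is set):

```python
import os

import ray
import torch


def get_worker_device() -> torch.device:
    """Chooses the CUDA device that Ray assigned to this worker."""
    gpu_ids = ray.get_gpu_ids()
    if not gpu_ids:
        return torch.device("cpu")
    gpu_id = str(gpu_ids[0])
    visible = os.environ.get("CUDA_VISIBLE_DEVICES", "")
    if visible:
        # Translate the global GPU id into an index into the visible devices.
        device_index = visible.split(",").index(gpu_id)
    else:
        device_index = int(gpu_id)
    return torch.device(f"cuda:{device_index}")
```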
Ray uses the GCS in-memory store by default instead of Redis, which causes the Gloo group not to work by default.
In this PR, we use Ray's internal KV store for the Gloo group, replacing the RedisStore, so that the Gloo group works by default.
This PR depends on another PR in pygloo: https://github.com/ray-project/pygloo/pull/10
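Conceptually, the replacement store looks like this sketch backed by `ray.experimental.internal_kv` (the exact store interface expected by pygloo may differ):

```python
import time

from ray.experimental import internal_kv


class RayInternalKVStore:
    """Minimal rendezvous-store sketch backed by Ray's internal KV (GCS)."""

    def __init__(self, prefix: str):
        self._prefix = prefix

    def _key(self, key: str) -> str:
        return f"{self._prefix}/{key}"

    def set(self, key: str, data: bytes) -> None:
        internal_kv._internal_kv_put(self._key(key), data, overwrite=True)

    def get(self, key: str) -> bytes:
        return internal_kv._internal_kv_get(self._key(key))

    def wait(self, keys, timeout_s: float = 30.0) -> None:
        deadline = time.time() + timeout_s
        while time.time() < deadline:
            if all(internal_kv._internal_kv_exists(self._key(k)) for k in keys):
                return
            time.sleep(0.1)
        raise TimeoutError(f"Timed out waiting for keys: {keys}")
```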
Replaces FLAML searchers with a dummy class that raises an informative error on init if FLAML is not installed, removes the ConfigSpace import from the BOHB example code, and adds a note to examples that use external dependencies.
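The dummy-class pattern, roughly (FLAML's `BlendSearch` is used as an example; the actual Tune shims may differ in detail):

```python
class _DummyFLAMLSearcher:
    """Stands in for a FLAML searcher and fails with an actionable message."""

    def __init__(self, *args, **kwargs):
        raise ImportError(
            "FLAML must be installed to use this searcher. "
            "Install it with: `pip install flaml`."
        )


try:
    from flaml import BlendSearch  # noqa: F401
except ImportError:
    BlendSearch = _DummyFLAMLSearcher
```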
The `Application` class is stored in `api.py`. The object is relatively standalone and is used as a dependency in other classes, so this change moves `Application` (and `ImmutableDeploymentDict`) to a new file, `application.py`.
When deployments fail to update, [Serve sets their status to UNHEALTHY and logs the error message](46465abd6d/python/ray/serve/deployment_state.py (L1507-L1511)). However, the message lacks a traceback, making it impossible to find what caused it. [For example](https://console.anyscale.com/o/anyscale-internal/projects/prj_2xR6uT6t7jJuu1aCwWMsle/clusters/ses_SfGPJq8WWJUhAvmHHsDgJWUe?command-history-section=command_history):
```
File "/home/ray/anaconda3/lib/python3.7/site-packages/ray/serve/api.py", line 328, in _wait_for_deployment_healthy
raise RuntimeError(f"Deployment {name} is UNHEALTHY: {status.message}")
RuntimeError: Deployment echo is UNHEALTHY: Failed to update deployment:
'>' not supported between instances of 'NoneType' and 'int'.
```
It's not clear where `'>' not supported between instances of 'NoneType' and 'int'.` is being triggered.
The change includes the full traceback for this type of update failure. The new status message is easier to debug:
```
File "/Users/shrekris/Desktop/ray/python/ray/serve/api.py", line 328, in _wait_for_deployment_healthy
raise RuntimeError(f"Deployment {name} is UNHEALTHY: {status.message}")
RuntimeError: Deployment A is UNHEALTHY: Failed to update deployment:
Traceback (most recent call last):
File "/Users/shrekris/Desktop/ray/python/ray/serve/deployment_state.py", line 1503, in update
running_replicas_changed |= self._check_and_update_replicas()
File "/Users/shrekris/Desktop/ray/python/ray/serve/deployment_state.py", line 1396, in _check_and_update_replicas
a = 1/0
ZeroDivisionError: division by zero
```
(I forced a divide-by-zero error to get this traceback).
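The gist of the fix, as a sketch (names are illustrative, not Serve's internals): capture the traceback and embed it in the status message instead of only the exception text.

```python
import traceback


def run_update_step(update_fn) -> str:
    """Returns a status message that includes the full traceback on failure."""
    try:
        update_fn()
        return "HEALTHY"
    except Exception:
        # str(e) alone loses the stack; format_exc() keeps the full context.
        return "Failed to update deployment:\n" + traceback.format_exc()
```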
This change adds new unit tests and an error message to `_store_package_in_gcs()`. In particular, it tests the function's behavior when it fails to connect to the GCS.
What: Raise meaningful exceptions when invalid parameters are passed.
Why: We want to catch invalid parameters and guide users to use the API in the correct way.
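The validation pattern looks roughly like this (the parameter name is illustrative, not the specific API touched by this change):

```python
def validate_num_samples(num_samples):
    """Rejects invalid values with an actionable message."""
    if not isinstance(num_samples, int):
        raise TypeError(
            f"`num_samples` must be an int, got {type(num_samples).__name__}. "
            "Pass e.g. `num_samples=10`."
        )
    if num_samples <= 0:
        raise ValueError(f"`num_samples` must be positive, got {num_samples}.")
```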
Adds a dynamic property to easily obtain the `config` dict from a `Result`, and extends the `ResultGrid.get_best_config` method for parity with `ExperimentAnalysis.get_best_trial` (allows using a mode and metric different from the ones set in the Tuner).
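A usage sketch, assuming the Tuner/ResultGrid API described above:

```python
from ray import tune


def trainable(config):
    tune.report(loss=config["lr"] * 2)


tuner = tune.Tuner(trainable, param_space={"lr": tune.loguniform(1e-4, 1e-1)})
result_grid = tuner.fit()

# New convenience: read the config dict straight off a Result.
print(result_grid[0].config)

# Parity with ExperimentAnalysis.get_best_trial: metric/mode may differ
# from what was configured in the Tuner.
best_config = result_grid.get_best_config(metric="loss", mode="min")
```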
Currently, the Datasets primitives repartition, groupby, sort, and random_shuffle all use different internal shuffle implementations. This PR unifies them on a single internal ShuffleOp class. This class exposes static map and reduce methods which must be implemented by the specific higher-level primitive. The ShuffleOp.execute method then implements a simple pull-based shuffle by submitting one map task per input block and one reduce task per output block.
Closes #23593.
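A skeleton of that structure (a sketch of the shape, not the actual Datasets class):

```python
import ray


class ShuffleOp:
    """Pull-based shuffle skeleton: one map task per input block and one
    reduce task per output block."""

    @staticmethod
    def map(block, output_num_blocks):
        # Split the input block into one piece per output block.
        raise NotImplementedError

    @staticmethod
    def reduce(*mapper_outputs):
        # Combine the pieces destined for a single output block.
        raise NotImplementedError

    def execute(self, input_blocks, output_num_blocks):
        map_remote = ray.remote(num_returns=output_num_blocks)(self.map)
        reduce_remote = ray.remote(self.reduce)
        map_results = [
            map_remote.remote(block, output_num_blocks) for block in input_blocks
        ]
        if output_num_blocks == 1:
            # With num_returns=1 Ray returns a bare ref, not a list of refs.
            map_results = [[ref] for ref in map_results]
        # Each reducer pulls its piece from every mapper.
        return [
            reduce_remote.remote(*[pieces[j] for pieces in map_results])
            for j in range(output_num_blocks)
        ]
```

A concrete primitive (e.g. sort) would subclass this skeleton and implement map and reduce.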
If rsync/ssh is not available (as in Kubernetes setups), Tune previously had no fallback mechanism to synchronize trial directories to the driver. This PR introduces a `RemoteTaskSyncer` trial syncer that uses Ray remote tasks to ship file contents between nodes. The implementation uses tarfile to compress files for transfer, and it only transfers files that differ between the source and target directories to minimize network bandwidth usage.
The trial syncer works as follows:
1. It collects information about existing files in the target directory. This directory could be remote (when syncing up) or local (when syncing down).
2. It then schedules a `pack` task on the source node. This will always be a remote task so the future can be passed to the unpack task. The pack task only packs files that are missing from or differ in the target directory into a tarfile, which is returned as a byte string.
3. An `unpack` task is scheduled on the target node. This will always be a remote task so the future can be awaited in a call to `wait()` (a rough sketch of the pack/unpack flow follows this list).
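A rough sketch of the pack/unpack tasks (names are illustrative; node-affinity scheduling is omitted):

```python
import io
import os
import tarfile

import ray


@ray.remote
def pack(source_dir: str, files_to_sync) -> bytes:
    """Packs only the missing/changed files into an in-memory tarball."""
    buffer = io.BytesIO()
    with tarfile.open(fileobj=buffer, mode="w:gz") as tar:
        for rel_path in files_to_sync:
            tar.add(os.path.join(source_dir, rel_path), arcname=rel_path)
    return buffer.getvalue()


@ray.remote
def unpack(tar_bytes: bytes, target_dir: str) -> bool:
    """Unpacks the tarball into the target directory on the target node."""
    with tarfile.open(fileobj=io.BytesIO(tar_bytes), mode="r:gz") as tar:
        tar.extractall(target_dir)
    return True


# The pack future is passed straight into unpack, so the bytes travel from
# the source node to the target node, and the unpack future can be awaited:
# unpack_ref = unpack.remote(pack.remote(source_dir, files), target_dir)
# ray.wait([unpack_ref])
```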
A test is added to ensure that only modified files are transferred on subsequent sync ups/downs.
Finally, minor changes are made to the `Syncer`/`NodeSyncer` classes to allow passing `(ip, path)` tuples rather than rsync-style remote paths.
Updates the @ray.method error message to match the one for @ray.remote. Since the client-mode version of ray.method is identical to the regular ray.method, this change deletes the client-mode version and drops the client_mode_hook decorator (the client copy was presumably added before client_mode_hook was introduced).
Also fixes what appears to be a bug that prevented num_returns and concurrency_group from being specified at the same time (`assert len(kwargs) == 1`).
Closes #23271
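For reference, the combination that previously hit the assertion looks like this (a small sketch):

```python
import ray


@ray.remote(concurrency_groups={"io": 2})
class Worker:
    # Before the fix, combining these options tripped `assert len(kwargs) == 1`.
    @ray.method(num_returns=2, concurrency_group="io")
    def fetch(self):
        return "data", "metadata"


worker = Worker.remote()
data_ref, meta_ref = worker.fetch.remote()
print(ray.get([data_ref, meta_ref]))
```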
Copied from #22571:
Whenever we spill, we try to spill all spillable objects. We also try to fuse small objects together to reduce total IOPS. If there aren't enough objects in the object store to meet the fusion threshold, we spill the objects anyway to avoid liveness issues. However, currently we spill at most the object fusion size when instead we should be spilling at least the fusion size. Then we use the max number of fused objects as a cap.
This PR fixes the fusion behavior so that we always spill at minimum the fusion size. If we reach the end of the spillable objects while still under the fusion threshold, we only spill the partial batch if no other spills are pending. This gives the pending spills time to finish, and then we can re-evaluate whether it's necessary to spill the remaining objects. Liveness is also preserved.
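The selection logic, sketched in Python for illustration (the real implementation lives in the raylet, in C++):

```python
from collections import namedtuple

SpillableObject = namedtuple("SpillableObject", ["object_id", "size"])


def choose_objects_to_spill(spillable, min_fused_bytes, max_fused_objects,
                            other_spills_pending):
    """Fuses objects until at least the fusion size is reached, capped at
    max_fused_objects; a partial batch is only spilled when nothing else
    is pending, so in-flight spills get a chance to finish first."""
    batch, batch_bytes = [], 0
    for obj in spillable:
        if len(batch) >= max_fused_objects:
            break
        batch.append(obj)
        batch_bytes += obj.size
        if batch_bytes >= min_fused_bytes:
            break
    if batch_bytes < min_fused_bytes and other_spills_pending:
        # Re-evaluate later instead of spilling an under-sized batch.
        return []
    return batch
```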
Increases some test timeouts to allow tests to pass.
Initial draft of the interface for HuggingFaceTorchTrainer.
One alternative for limiting the number of datasets in the datasets dict would be to have the user pass train_dataset and validation_dataset as separate arguments, though that would be inconsistent with TorchTrainer.
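A hypothetical sketch of the drafted interface; every import path and argument below is an assumption, not the settled API:

```python
import ray.data
from ray.air.config import ScalingConfig  # assumed import path
from ray.train.huggingface import HuggingFaceTorchTrainer  # assumed import path


def trainer_init_per_worker(train_dataset, eval_dataset, **config):
    # Build and return a transformers.Trainer here.
    ...


trainer = HuggingFaceTorchTrainer(
    trainer_init_per_worker=trainer_init_per_worker,
    datasets={
        "train": ray.data.from_items([{"text": "a", "label": 0}]),
        "evaluation": ray.data.from_items([{"text": "b", "label": 1}]),
    },
    scaling_config=ScalingConfig(num_workers=2, use_gpu=False),
)
result = trainer.fit()
```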
placement_group_test_5 is flaky. The cause is that the test requests a placement group with exactly the node's object store memory; if the object store already holds an object, placement group scheduling fails.
Also fixes a typo.
* Uniformly distributed tasks among actors to utilize full concurrency (see the sketch after this list)
* Added test to ensure all tasks are launched at the same time
* Applied linting format
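A minimal round-robin sketch of the idea (illustrative, not the changed code itself):

```python
import ray


@ray.remote
class Worker:
    def process(self, item):
        return item * 2


workers = [Worker.remote() for _ in range(4)]

# Round-robin assignment: every actor receives its share of the tasks up
# front, so all actors start working immediately instead of one actor
# absorbing the whole queue.
items = list(range(100))
refs = [workers[i % len(workers)].process.remote(x) for i, x in enumerate(items)]
results = ray.get(refs)
```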