This PR adds support for vectorized global and grouped aggregations, porting the built-in aggregations to vectorized block aggregations for tabular datasets.
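For illustration, a minimal sketch of the user-facing behavior this enables (standard Dataset aggregation calls; the column names here are made up):

```python
import ray

ray.init()

# A small tabular dataset; aggregations now run as vectorized block
# aggregations over the underlying tabular (Arrow/pandas) blocks.
ds = ray.data.from_items([{"group": i % 3, "value": i} for i in range(100)])

# Global aggregation over the whole dataset.
total = ds.sum("value")

# Grouped aggregation: one output row per group.
per_group = ds.groupby("group").sum("value")
print(total, per_group.take())
```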
This PR refactors `LazyBlockList` in service of out-of-band serialization (see [mono-PR](https://github.com/ray-project/ray/pull/22616)) and is a precursor to an execution plan refactor (PR #2) and adding the actual out-of-band serialization APIs (PR #3). The following is included in this refactor:
1. `ReadTask`s are now a first-class concept, replacing calls;
2. read stage progress tracking is consolidated into `LazyBlockList._get_blocks_with_metadata()`, and more of the read task complexity, e.g. the read remote function, is pushed into `LazyBlockList` to make `ray.data.read_datasource()` simpler;
3. we are a bit smarter about how we progressively launch tasks and fetch and cache metadata: the metadata for read tasks is now fetched in `.iter_blocks_with_metadata()` instead of relying on the (less accurate) pre-read task metadata, and we also fix some small bugs in the lazy ramp-up around progressive metadata fetching.
(1) is the most important item for supporting out-of-band serialization and fundamentally changes the `LazyBlockList` data model. This is required since we need to be able to reference the underlying read tasks when rewriting read stages during optimization and when serializing the lineage of the Dataset. See the [mono-PR](https://github.com/ray-project/ray/pull/22616) for more context.
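A rough, illustrative sketch of the data-model change (not Ray's actual internal classes): each read task is kept as an object bundling the read function with its pre-read metadata, so the block list can re-execute it, rewrite the read stage, or serialize the lineage later.

```python
from dataclasses import dataclass
from typing import Callable, List

# Illustrative stand-ins for block types; Ray uses Arrow/pandas blocks.
Block = list

@dataclass
class BlockMetadata:
    num_rows: int

@dataclass
class ReadTask:
    """A first-class read task: the read function plus its estimated metadata."""
    read_fn: Callable[[], List[Block]]
    metadata: BlockMetadata

    def __call__(self) -> List[Block]:
        return self.read_fn()

# A lazy block list can now hold the tasks themselves, which makes it possible
# to rewrite the read stage during optimization or serialize the lineage.
tasks = [
    ReadTask(lambda start=start: [list(range(start, start + 10))], BlockMetadata(10))
    for start in (0, 10, 20)
]
blocks = [task() for task in tasks]  # in practice launched lazily/progressively
```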
Other changes:
1. Changed the stats actor to a global named actor singleton to avoid having to serialize the actor handle with the Dataset stats; without this, we were hitting serialization failures.
This PR adds a `Checkpoint.as_directory()` context manager that either returns the local path (if the checkpoint is already a directory checkpoint) or a temporary directory path containing the checkpoint data, which is cleaned up after use. The path should be treated as a read-only source for loading data from the checkpoint.
A common use case for processing checkpoint data is to convert it into a directory with `Checkpoint.to_directory()` and then do some read-only processing (e.g. restoring an ML model).
This process has two flaws: first, `to_directory()` creates a temporary directory that the user has to clean up explicitly after use; second, if the checkpoint is already a directory checkpoint, its data is copied, which is inefficient for large checkpoints (e.g. Hugging Face models) and even more prone to unwanted side effects if not cleaned up properly.
With this context manager, which effectively returns a directory to be used as a read-only data source, we can avoid manual cleanup and unnecessary data copies (and avoid internal inspection as e.g. in https://github.com/ray-project/ray/pull/23876/files#diff-47db2f054ca359879f77306e7b054dd8b780aab994961e3b4911330ae15eeae3R57-R60).
See also discussion in https://github.com/ray-project/ray/pull/23850/files#r850036905
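A minimal usage sketch (the `Checkpoint` import path may differ across Ray versions):

```python
import os
from ray.ml.checkpoint import Checkpoint  # import path may differ by Ray version

checkpoint = Checkpoint.from_directory("/tmp/my_model_dir")

# If the checkpoint is already backed by a local directory, its path is
# returned directly; otherwise the data is materialized into a temporary
# directory that is cleaned up when the block exits. Treat the path as
# read-only.
with checkpoint.as_directory() as checkpoint_dir:
    print(os.listdir(checkpoint_dir))  # e.g. load an ML model from these files
```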
Add a `BatchMapper` preprocessor.
Update the semantics of `Preprocessor.fit()` to allow fitting multiple times, following scikit-learn's example.
Introduce `FitStatus` to explicitly handle the `Chain` case.
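A minimal usage sketch, assuming the preprocessor lives alongside the other AIR preprocessors (the import path and the example column are illustrative):

```python
import ray
from ray.ml.preprocessors import BatchMapper  # import path may differ by Ray version

ray.init()

# Apply a vectorized, batch-wise transformation to a tabular dataset;
# the fn receives a pandas DataFrame batch.
preprocessor = BatchMapper(fn=lambda df: df.assign(value=df["value"] * 2))

ds = ray.data.from_items([{"value": i} for i in range(8)])
transformed = preprocessor.fit_transform(ds)  # fit() can now be called repeatedly
print(transformed.take())
```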
Following #23862, there was an uncaught bug when comparing NaN-priority checkpoints. This is because `float("nan") <= float("nan")` always evaluates to `False`.
This PR fixes this bug and adds a new test to ensure correct behavior.
Changes the logic in `CheckpointManager` so that checkpoints whose metric value is NaN are considered the worst, meaning they will be deleted first if `keep_checkpoints_num` is set.
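A minimal, standalone sketch of the NaN-as-worst ordering (plain Python, not the actual `CheckpointManager` code):

```python
import math

def nan_aware_key(metric_value: float) -> tuple:
    # NaN compares False with everything, so it must be handled explicitly:
    # rank NaN checkpoints strictly below all real-valued checkpoints.
    is_nan = math.isnan(metric_value)
    return (0 if is_nan else 1, 0.0 if is_nan else metric_value)

checkpoints = [("ckpt_a", 0.7), ("ckpt_b", float("nan")), ("ckpt_c", 0.9)]
# Ascending sort puts NaN checkpoints first, i.e. they are the first to be
# evicted when keep_checkpoints_num is exceeded.
checkpoints.sort(key=lambda item: nan_aware_key(item[1]))
print(checkpoints)  # [('ckpt_b', nan), ('ckpt_a', 0.7), ('ckpt_c', 0.9)]
```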
Serve gets actors using the current Ray namespace. However, the Ray namespace and the controller namespace may not match when using the `_override_controller_namespace` argument in `serve.start()`. This change ensures that the `get_actor()` calls in `ActorReplicaWrapper` use the controller namespace. This also allows `num_replicas` to be scaled up and down properly when using `_override_controller_namespace`.
Clean up the ci/ directory. This means getting rid of the travis/ path completely and moving the files into sensible subdirectories.
Details:
- Moves everything under ci/travis into subdirectories, e.g. ci/build, ci/lint, etc.
- Minor adjustments to some scripts (variable renames)
- Removes the outdated (unused) asan tests
What: This PR adds a generic `BatchPredictor` class that offers an interface to run batch inference on Ray Datasets. It takes a `Predictor` class and a checkpoint as input, and provides a `predict(dataset)` method to run scalable scoring inference.
Why: Currently users have to implement scorers themselves. This is mostly boilerplate and prone to errors, so we should provide a simple solution instead.
Note that this predictor also implements the Predictor interface.
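A usage sketch under assumed import paths and an assumed XGBoost checkpoint (the checkpoint directory and dataset path are placeholders):

```python
import ray
from ray.ml.batch_predictor import BatchPredictor  # import paths may differ by Ray version
from ray.ml.checkpoint import Checkpoint
from ray.ml.predictors.integrations.xgboost import XGBoostPredictor  # any Predictor subclass works

# A checkpoint from a prior training run (assumed to contain a trained
# XGBoost model).
checkpoint = Checkpoint.from_directory("/path/to/trained_model")

# Build the batch predictor from the checkpoint and a Predictor class, then
# run scalable scoring over a Ray Dataset.
batch_predictor = BatchPredictor.from_checkpoint(checkpoint, XGBoostPredictor)
predictions = batch_predictor.predict(ray.data.read_parquet("s3://bucket/features/"))
```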
Instead of relying on the node-ip custom resource for static task-to-node placement, this PR introduces an explicit `NodeAffinitySchedulingStrategy` with the following benefits:
1. The node is specified by ID instead of IP, since an IP may not be unique per node.
2. Soft constraints are supported, so the task can tolerate node failures.
After this PR, the node-ip custom resource can be deprecated.
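A minimal usage sketch of the new strategy (the way the target node ID is obtained here is just one option):

```python
import ray
from ray.util.scheduling_strategies import NodeAffinitySchedulingStrategy

ray.init()

@ray.remote
def pinned_task() -> str:
    return "ran on the requested node"

# Pin the task to the current node by node ID; soft=True lets the task be
# rescheduled elsewhere if that node dies instead of failing outright.
target_node_id = ray.get_runtime_context().node_id.hex()
ref = pinned_task.options(
    scheduling_strategy=NodeAffinitySchedulingStrategy(node_id=target_node_id, soft=True)
).remote()
print(ray.get(ref))
```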
`ray.data.from_numpy()` currently expects to be given a list of ndarray futures, instead of handling concrete ndarrays, as expected (and as allowed by other `from_*` APIs, e.g. `from_pandas`). This PR renames the existing `from_numpy` API to `from_numpy_refs`, and exposes `ray.data.from_numpy`, which takes concrete ndarrays (not object references).
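A short sketch of the resulting API split:

```python
import numpy as np
import ray

ray.init()

arr = np.arange(16).reshape(4, 4)

# New behavior: from_numpy() takes concrete ndarrays directly.
ds = ray.data.from_numpy(arr)

# The previous behavior lives on under an explicit name: from_numpy_refs()
# takes object references to ndarrays.
ds_refs = ray.data.from_numpy_refs([ray.put(arr)])
```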
In some cases, the UDF for `map_groups()` may return values of different types, which should be disallowed.
This PR adds a unit test to make sure we raise an error when that happens.
What: Only open (create) CSV files when actually reporting results.
Why: When trials crash before their first report (e.g. on init), they leave behind an empty CSV file. When results are subsequently written, the CSV header is then missing.
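The pattern, sketched in plain Python (not the actual Tune logger code): defer file creation until the first result arrives, so the header is always written.

```python
import csv

class LazyCsvLogger:
    """Only create the CSV file (and write its header) on the first result."""

    def __init__(self, path: str):
        self._path = path
        self._file = None
        self._writer = None

    def log_result(self, result: dict) -> None:
        if self._writer is None:
            # The file is created here, so crashed trials that never report
            # leave no empty, header-less CSV behind.
            self._file = open(self._path, "w", newline="")
            self._writer = csv.DictWriter(self._file, fieldnames=list(result))
            self._writer.writeheader()
        self._writer.writerow(result)
        self._file.flush()
```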
What: Skips left-over checkpoint_tmp* directories when loading experiment analysis. Also loads iteration number from metadata file rather than parsing the checkpoint directory name.
Why: Sometimes temporary checkpoint directories are not deleted correctly when restoring (e.g. when interrupted). In these cases, they shouldn't be included in experiment analysis. Parsing their iteration number also failed, and should generally be done by reading the metadata file, not by inferring it from the directory name.
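An illustrative sketch of the loading logic (the metadata file name and key below are hypothetical, not Tune's actual format):

```python
import json
import os

def list_checkpoints(trial_dir: str):
    """Yield (path, iteration) for completed checkpoints only."""
    for name in sorted(os.listdir(trial_dir)):
        # Leftover temporary directories from interrupted restores are skipped.
        if name.startswith("checkpoint_tmp"):
            continue
        if not name.startswith("checkpoint_"):
            continue
        path = os.path.join(trial_dir, name)
        # Read the iteration from the metadata file instead of parsing the
        # directory name (hypothetical metadata file name and key).
        with open(os.path.join(path, "metadata.json")) as f:
            iteration = json.load(f)["iteration"]
        yield path, iteration
```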
What: This introduces a general utility to synchronize directories between two nodes, derived from the RemoteTaskClient. This implementation uses chunked transfers for more efficient communication.
Why: Transferring files over 2GB in size leads to superlinear time complexity in some setups (e.g. local MacBooks). This could be due to memory limits, swapping, or gRPC limits, and is explored in a different thread. To overcome this limitation, we use chunked data transfers, which show quasi-linear scalability for larger files.
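An illustrative sketch of the chunked-transfer idea (not the actual implementation; the actor interface and chunk size are assumptions):

```python
import ray

CHUNK_SIZE = 64 * 1024 * 1024  # 64 MiB chunks keep each transfer well under 2 GB

@ray.remote
class FileReceiver:
    """Receives a file as a stream of chunks on the destination node."""

    def open(self, path: str) -> None:
        self._f = open(path, "wb")

    def write_chunk(self, chunk: bytes) -> None:
        self._f.write(chunk)

    def close(self) -> None:
        self._f.close()

def sync_file(src_path: str, dst_path: str, receiver) -> None:
    """Ship src_path to the node hosting the receiver actor, chunk by chunk."""
    ray.get(receiver.open.remote(dst_path))
    with open(src_path, "rb") as f:
        while chunk := f.read(CHUNK_SIZE):
            # Each chunk is shipped as its own object, avoiding the
            # superlinear slowdown seen with single multi-GB transfers.
            ray.get(receiver.write_chunk.remote(chunk))
    ray.get(receiver.close.remote())
```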
This PR preserves block order when transforming under the actor compute model. Before this PR, we were submitting block transformations in reverse order and creating the output block list in completion order.
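A small standalone sketch of the difference (generic Ray tasks, not the actor compute code itself): collecting results in completion order scrambles blocks, while indexing the original refs preserves submission order.

```python
import random
import time
import ray

ray.init()

@ray.remote
def transform(block_id: int) -> int:
    time.sleep(random.random())  # simulate uneven per-block work
    return block_id

refs = [transform.remote(i) for i in range(8)]

# Completion order (what the old code effectively used): non-deterministic.
completion_order, pending = [], list(refs)
while pending:
    [ready], pending = ray.wait(pending, num_returns=1)
    completion_order.append(ray.get(ready))

# Submission order (what this PR preserves): index the original refs.
submission_order = ray.get(refs)
print(completion_order, submission_order)
```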
NaN values do not have a well-defined ordering. When sorting metrics to determine the best checkpoint, we should always filter out checkpoints that are associated with NaN values.
Closes #23812
Takes care of the TODO left for `SimpleImputer` with the `most_frequent` strategy by refactoring and optimizing the logic for computing the most frequent value.
Co-authored-by: Clark Zinzow <clarkzinzow@gmail.com>
PR https://github.com/ray-project/ray/pull/22820 introduced an API breakage for xlang usage: `ray.java_actor_class` has no longer been available since then.
This PR fixes it. We should remove these top-level APIs in 2.0 rather than in minor versions.
This PR adds an `RLTrainer` to Ray AIR. It works for both offline and online use cases. In offline training, it leverages the `datasets` key of the Trainer API to specify a dataset reader input, used e.g. in Behavioral Cloning (BC). In online training, it is a wrapper around the RLlib trainables, making use of the parameter layering enabled by the Trainer API.
Using the local rank as the device ID only works if there is exactly 1 GPU per worker. Instead, we should use `ray.get_gpu_ids()` to determine which GPU device to use for the worker.
Ray now uses the GCS in-memory store by default instead of Redis, which breaks gloo groups out of the box.
In this PR, we use the Ray internal KV store as the default store for gloo groups, replacing the `RedisStore`, so that gloo groups work by default.
This PR depends on another PR in pygloo https://github.com/ray-project/pygloo/pull/10