Serve stores context state, including the `_INTERNAL_REPLICA_CONTEXT` and the `_global_client` in `api.py`. However, these data structures are referenced throughout the codebase, causing circular dependencies. This change introduces two new files:
* `context.py`
* Intended to expose process-wide state to internal Serve code as well as `api.py`
* Stores the `_INTERNAL_REPLICA_CONTEXT` and the `_global_client` global variables
* `client.py`
* Stores the definition for the Serve `Client` object, now called the `ServeControllerClient`
- Closes#23874 by fixing a typo ("num_gpus" -> "num-gpus").
- Adds end-to-end test logic confirming the fix.
- Adds end-to-end test logic confirming autoscaling with custom resources works.
- Slightly refines developer instructions.
- Deflakes test logic a bit by allowing for the event that the head pod changes its identity as the Ray cluster starts up.
Since remote calls provide no ordering guarantees, it could happen that `reconfigure` gets called before `is_allocated` Since `reconfigure` then runs the user initialization code, the replica actor could get blocked and never provide its allocation check.
This PR ensures that the allocation proof has been received before we run the replica initialization.
See dag layering summary in https://github.com/ray-project/ray/issues/24061
We need to cleanup and set right ray dag -> serve dag layering where `.bind()` can be called on `@serve.deployment` decorated class or func, but only returns raw Ray DAGNode type, executable by ray core and serve_dag is only available after serve-specific transformations.
Thus this PR removes exposed serve DAGNode type such as DeploymentNode.
It also removes the syntax of `class.bind().bind()` to return a `DeploymentMethodNode` that defaults to `__call__` to match same behavior in ray dag building.
When a `Trainer` is initialized with a run config and then passed into a `Tuner`, it is currently silently discarded and a default RunConfig is used. Instead we should use the run config in trainer if not overridden.
Add example for distributed pytorch geometric (graph learning) with Ray AIR
This only showcases distributed training, but with data small enough that it can be loaded in by each training worker individually. Distributed data ingest is out of scope for this PR.
Co-authored-by: matthewdeng <matthew.j.deng@gmail.com>
Ray Tune currently gracefully stops training on SIGINT. However, the Ray core worker prevents SIGINT (and SIGTERM) to be processed by child tasks, which means that Ray Tune runs that are started in remote tasks (e.g. via Ray client) cannot be gracefully interrupted.
In k8s-based cloud tests that used the Ray client to kick off a Ray Tune run, this lead to test flakiness, as final experiment state could not be gracefully persisted to cloud storage.
This PR adds support for SIGUSR1 in addition to SIGINT to interrupt training gracefully.
`test_cluster: test_replica_startup_status_transitions` is periodically flaky with the replica hanging in `PENDING_ALLOCATION`. This could be because there is no ordering guarantee on async actor calls, so the `reconfigure` method might execute first and block the asyncio loop (due to `ray.get`), not allowing the `is_allocated` call to run.
Closes#23503
We are fixing two issue here:
1. The unified controller API used pickle to pack the init args, we are changing it to cloudpickle for now. (this is something I missed during code review)
2. The checkpoint state functionality in controller uses pickle to prevent ray cluster specific state written to checkpoint and unable to recover in a fresh new cluster. However, this recover from new cluster is not good UX and we should prefer an end to end solution like resubmitting via REST API.
As a corollary, the deployment state manager should not care about deserializing replica config and init args. Rather, it should just pass the protobuf directly to replica. I can do that either here or as a follow up.
`set_start_time()` was not implemented for the progress reporter base class, but it's called in `tune.run()`.
Instead of adding new methods to set runtime arguments, this PR moves to a singular and forward-compatible `setup()` method that defaults to no-op. This way custom reporters can make use of runtime information passed to the reporter, but can choose to ignore it per default.
Previously we have double dump behavior that makes json serde not human readable or friendly, but it's required given `DAGDriver` takes `dag_node_json` as first arg and it will appear in YAML.
This PR removes extra `json.dumps()` in encoder path, eliminated and simplified most encoder / object_hooks that are not needed in the first place to make everything simpler again.
Sample YAML now for a complex DAG: https://gist.github.com/jiaodong/32991771e9d78c35767eb24ed73f8236
We're pretty close to have a better minimal JSON representation of the whole dag after this. I might include in this PR or separate one.
`gcsfs` complains about an invalid `create_parents` argument when using google cloud storage with cloud checkpoints. Thus we should use an alternative fs spec handler that omits this argument for gs.
The root issue will be fixed here: https://github.com/fsspec/gcsfs/pull/471
Implements `SklearnTrainer` and `SklearnPredictor`. Full parallelism with joblib + support for GPU enabled estimators like cuML.
Interface has been modified slightly by addition of several arguments, which were required for full functionality.
I haven't tested cuML yet, will do it later.
Depends on https://github.com/ray-project/ray/pull/23889
Co-authored-by: Kai Fricke <kai@anyscale.com>
Adds a `ScalingConfigDataClass.validate_config` classmethod to allow for a generic way of validating ScalingConfigs by allowing only certain keys.
Co-authored-by: Kai Fricke <kai@anyscale.com>
The ray.timeline command currently only shows task for task events, which isn't very useful if your program has multiple types of tasks. This PR adds "::<function name>" to the string, similar to what we do for process names, to distinguish between different tasks.
A legacy K8s test fails due to incorrect usage of @ray.method which only started raising errors after the Ray 1.12.0 branch cut.
This PR removes the use of @ray.method in the test.
Some context in #23271 and #23471
In addition, I noticed some of the test were flakey due to out-of-memory issues. For that reason, I've doubled the memory request and limits in the legacy operator's example files.
I've also added CPU limits in an example file that was missing them -- it makes the most sense for consistency with Ray's resource model to use CPU limits in K8s configs.
Finally, I added an extra note to the instructions for running the tests.
This change sets `"memory"`'s default to `0` in the `resource_dict` but keeps the default as `None` in `ray_actor_options`. It adds logic to both problematic lines to handle `None` in case of future settings updates. It also adds unit tests to prevent regressions.
* Provide a utility to ping a Ray cluster and verify it has the same Ray version. This is useful to check if a Ray cluster is available at a given address, without connecting to the cluster with the more heavyweight ray.init(). This utility is integrated with ray memory to provide a better error message when the Ray cluster is unavailable. There seem to be user demand for exposing this as an API as well.
* Improve the error message when the address provided to Ray does not contain port.
Memory limit determination test is not relevant for Windows; we already skip the CPU limit determination tests for Windows, so we skip for the memory limit determination as well.
People use models that accept dictionaries as input. For example, a model might take the following dictionary as input:
{
"value1": [[7], [8]],
"value2": [[10], [15]],
}
To facilitate using Ray Datasets with such models and to provide feature parity with to_torch, we should support more feature_columns types.
This PR adds support for vectorized global and grouped aggregations, porting the built-in aggregations to vectorized block aggregations for tabular datasets.
This PR refactors `LazyBlockList` in service of out-of-band serialization (see [mono-PR](https://github.com/ray-project/ray/pull/22616)) and is a precursor to an execution plan refactor (PR #2) and adding the actual out-of-band serialization APIs (PR #3). The following is included in this refactor:
1. `ReadTask`s are now a first-class concept, replacing calls;
2. read stage progress tracking is consolidated into `LazyBlockList._get_blocks_with_metadta()` and more of the read task complexity, e.g. the read remote function, was pushed into `LazyBlockList` to make `ray.data.read_datasource()` simpler;
3. we are a bit smarter with how we progressively launch tasks and fetch and cache metadata, including fetching the metadata for read tasks in `.iter_blocks_with_metadata()` instead of relying on the pre-read task metadata (which will be less accurate), and we also fix some small bugs in the lazy ramp-up around progressive metadata fetching.
(1) is the most important item for supporting out-of-band serialization and fundamentally changes the `LazyBlockList` data model. This is required since we need to be able to reference the underlying read tasks when rewriting read stages during optimization and when serializing the lineage of the Dataset. See the [mono-PR](https://github.com/ray-project/ray/pull/22616) for more context.
Other changes:
1. Changed stats actor to a global named actor singleton in order to obviate the need for serializing the actor handle with the Dataset stats; without this, we were encountering serialization failures.
This PR adds a `Checkpoint_as_directory()` context manager that either returns the local path (if checkpoint is already a directory) or a temporary directory path containing the checkpoint data, which is cleaned up after use. The path should be considered as a read-only source for loading data from the checkpoint.
A common use case for processing checkpoint data is to convert it into a directory with `Checkpoint.to_directory()` and then do some read-only processing (e.g. restoring a ML model).
This process has two flaws: First, `to_directory()` creates a temporary directory that has to be explicitly cleaned up by the user after use. Secondly, if the checkpoint is already a directory checkpoint, it is copied over, which is inefficient for large checkpoints (e.g. huggingface models) and then even more prone to unwanted side effects if not cleaned up properly.
With this context manager that effectively returns a directory that is to be used as a read-only data source, we can avoid manual cleaning up and unnecessary data copies (or avoid internal inspection as e.g. in https://github.com/ray-project/ray/pull/23876/files#diff-47db2f054ca359879f77306e7b054dd8b780aab994961e3b4911330ae15eeae3R57-R60)
See also discussion in https://github.com/ray-project/ray/pull/23850/files#r850036905