Compared with a push-based model, long polling is slower because, to send a message, the sender has to wait until it receives a polling request. This PR improves this by keeping multiple polling requests in flight, so that at most 10 requests can be outstanding at a time, which improves performance. A sketch of the idea follows the benchmark results. Tested with `many_nodes_actor_tests` with no regression:
```
Actor launch time: 1.6540390349998688 (5000 actors)
Actor ready time: 13.953653221999957 (5000 actors)
Total time: 15.607692256999826 (5000 actors)
```
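To illustrate the approach, here is a minimal sketch of keeping a fixed number of long-poll requests in flight; `send_poll_request` and `handle_message` are hypothetical stand-ins for the real RPC plumbing:
```python
import asyncio

MAX_IN_FLIGHT = 10  # cap on concurrent long-poll requests (per this PR)

async def poll_loop(send_poll_request, handle_message):
    # Keep MAX_IN_FLIGHT long-poll requests outstanding at all times, so
    # the receiver almost always has a request in hand when it wants to
    # push a message back.
    pending = {asyncio.ensure_future(send_poll_request())
               for _ in range(MAX_IN_FLIGHT)}
    while pending:
        done, pending = await asyncio.wait(
            pending, return_when=asyncio.FIRST_COMPLETED)
        for finished in done:
            handle_message(finished.result())
            # Immediately re-arm so the in-flight count stays at the cap.
            pending.add(asyncio.ensure_future(send_poll_request()))
```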
1. [X] Deprecate the old code path for publishing node resource changes
2. [X] Move the poller and broadcaster into RaySyncer
3. [X] Deprecate the old PG reporting code path
4. [X] Remove the syncer from the GCS cluster resource manager and encapsulate everything in the syncer module.
5. [X] Add versioning to the report
6. [ ] Introduce the Reporter/Receiver API in the prototype and adapt the raylet and GCS to it.
7. [ ] Experiment with protocol & communication layer changes.
This test is not running well in Redis mode. Given that the other tests are OK, I'd like to disable only this one instead of reverting the whole commit, to make sure the other tests don't regress.
`linux://python/ray/tests:test_runtime_env_plugin::test_plugin_timeout`
Currently, the Actor Manager subscribes to actor state in eager mode: when worker A passes a `List<ActorHandle>` as an input parameter to another worker B, worker B immediately subscribes to the state of every actor in the list during construction, even if it has not used any of these actors yet.
Suppose a graph job has 1000 actors and each actor holds a list of all of them; the job then has nearly 1,000,000 subscription relationships. When the job goes offline, the 1000 actor processes are killed, and the redis-server instantly receives disconnect events from all 1000 processes; each event triggers 1000 unsubscribe operations in `freeClient`, causing the redis-server to get stuck.
We suggest changing this eager mode to a lazy mode that only initiates the subscription on `SubmitActorTask`, which eliminates many unnecessary subscription relationships. A sketch of the proposed behavior follows.
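Here is a minimal sketch of the lazy mode, with hypothetical names standing in for the real Actor Manager internals:
```python
class ActorManager:
    """Sketch: subscribe to an actor's state on first task submission
    instead of eagerly at handle-deserialization time."""

    def __init__(self):
        self._subscribed = set()

    def submit_actor_task(self, actor_id, task):
        # Lazily establish the subscription the first time this actor
        # is actually used, rather than when its handle was received.
        if actor_id not in self._subscribed:
            self._subscribe_actor_state(actor_id)
            self._subscribed.add(actor_id)
        self._do_submit(actor_id, task)

    def _subscribe_actor_state(self, actor_id):
        ...  # register on the actor's pub/sub state channel

    def _do_submit(self, actor_id, task):
        ...  # actual task submission
```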
The microbenchmark (Left is this PR, Right is master branch)

Python 3.10 fixed a bug where an interactively defined class was tagged with the wrong type during inspection; inspection now throws OSError instead. See python/cpython#27171 for the detailed PR.
We need to handle this case properly; otherwise Ray actor definitions will throw in interactive mode. Please refer to #25026 for a repro.
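A minimal sketch of the kind of guard needed (the surrounding serialization logic is elided):
```python
import inspect

class MyClass:  # imagine this class was defined in an interactive session
    pass

# On Python 3.10+, source lookup for interactively defined classes raises
# OSError instead of silently misreporting the type, so the lookup must
# be guarded.
try:
    source = inspect.getsource(MyClass)
except (OSError, TypeError):
    source = None  # treat as "no source available" and fall back
```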
This PR fixes several features of the GCS actor scheduler:
1. Report pending actor info in GCS so that `HandleGetAllResourceUsage` is able to export the whole picture (including worker nodes and GCS). Otherwise, external features, e.g., the autoscaler, cannot work properly.
2. In `ClusterResourceScheduler`, actors for which the GCS scheduler cannot find an available node should stay in the pending queue of `ClusterTaskManager`.
3. If using the GCS scheduler, the PG's wildcard resources have to be updated **incrementally** when committing bundles, as sketched after this list.
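A rough sketch of the incremental update, using Ray's placement group resource naming convention but otherwise hypothetical names:
```python
# When committing bundle `bundle_index` of placement group `pg_id`, add
# the bundle's resources to the wildcard resource instead of overwriting
# it with the full total up front.
def commit_bundle(node_resources, pg_id, bundle_index, bundle_resources):
    for name, amount in bundle_resources.items():
        indexed = f"{name}_group_{bundle_index}_{pg_id}"   # per-bundle
        wildcard = f"{name}_group_{pg_id}"                 # wildcard
        node_resources[indexed] = amount
        node_resources[wildcard] = node_resources.get(wildcard, 0.0) + amount
```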
In the next PR, we will fix all remaining trivial issues and enable the GCS scheduler to pass the entire testing pipeline.
Co-authored-by: Chong-Li <lc300133@antgroup.com>
Adds file locking to prevent parallel file system operations in Tune/AIR syncing functions.
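A minimal sketch of the idea, assuming the `filelock` package and hypothetical function names:
```python
from filelock import FileLock

def locked_sync(sync_fn, target_dir):
    # Serialize concurrent file system operations on the same directory
    # with an on-disk lock; parallel callers block until the lock frees.
    with FileLock(f"{target_dir}.lock"):
        sync_fn(target_dir)
```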
Co-authored-by: Kai Fricke <krfricke@users.noreply.github.com>
Weights and Biases creates a `wandb` directory to collect intermediate logs and artifacts before uploading them. This directory should live in the respective trial directories. This also means we can re-enable auto-resuming.
When `create_scheduler("pb2", ...)` is run, a `TuneError` exception is raised. See the referenced issue below for details.
In addition to the fix, this PR introduces a new test (`ray/tune/tests/test_api.py::ShimCreationTest.testCreateAllSchedulers`) to confirm that `tune.create_scheduler()` works with all documented schedulers.
Note: `testCreateAllSchedulers` is a superset of what is covered in `testCreateScheduler`. It may be reasonable to retire the latter test.
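For example (the hyperparameter bounds here are illustrative, and PB2 has optional dependencies such as GPy):
```python
from ray import tune

# With this fix, shim creation works for the documented schedulers,
# including PB2, instead of raising TuneError.
scheduler = tune.create_scheduler("pb2", hyperparam_bounds={"lr": [1e-4, 1e-1]})
```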
Per the [Ray docs contributing guide](https://docs.ray.io/en/master/ray-contribute/docs.html), code chunks should be in `.py` files and pulled in via `literalinclude` rather than placed directly in `.rst` files. This PR takes a small step in doing this for the RLlib docs, specifically for the training API doc page.
Note that I had to make some changes to the code itself so that it would run, namely adding missing numpy imports and changing `model.from_batch(...)` to `model(...)` in a couple places.
Co-authored-by: Max Pumperla <max.pumperla@googlemail.com>
This PR makes several improvements to the Datasets' tensor story. See the issues for each item for more details.
- Automatically infer tensor blocks (single-column tables representing a single tensor) when returning NumPy ndarrays from map_batches(), map(), and flat_map().
- Automatically infer tensor columns when building tabular blocks in general.
- Fixes shuffling and sorting for tensor columns
This should improve the UX/efficiency of the following:
- Working with pure-tensor datasets in general.
- Mapping tensor UDFs over pure-tensor datasets, a better foundation for tensor-native preprocessing for end users and AIR.
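A minimal usage sketch of the first item (the exact batch format is elided; treat this as illustrative):
```python
import numpy as np
import ray

ds = ray.data.range(32)

# Returning a NumPy ndarray from map_batches should now automatically
# produce a tensor dataset (a single-column table holding one tensor).
tensor_ds = ds.map_batches(lambda batch: np.ones((len(batch), 2, 2)))
```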
Since Ray supports Redis as a storage backend, we should ensure the code path with Redis as storage is still covered end-to-end.
These tests haven't been run for a while, since we switched to memory mode by default. This PR fixes this and makes them run with every commit.
In the future, if we support more and more storage backends, this should be revised to be more efficient and selective, but for now the cost should be acceptable.
This PR is part of GCS HA testing-related work.
* [runtime env] runtime env inheritance refactor (#22244)
Runtime Environments has been GA since Ray 1.6.0. The latest doc is [here](https://docs.ray.io/en/master/ray-core/handling-dependencies.html#runtime-environments). We already support an [inheritance](https://docs.ray.io/en/master/ray-core/handling-dependencies.html#inheritance) behavior as follows (copied from the doc):
- The runtime_env["env_vars"] field will be merged with the runtime_env["env_vars"] field of the parent. This allows for environment variables set in the parent’s runtime environment to be automatically propagated to the child, even if new environment variables are set in the child’s runtime environment.
- Every other field in the runtime_env will be overridden by the child, not merged. For example, if runtime_env["py_modules"] is specified, it will replace the runtime_env["py_modules"] field of the parent.
We think this runtime env merging logic is too complex and confusing to users, because they can't know the final runtime env before the job runs.
This PR refactors the behavior of Runtime Environments inheritance. The new behavior is:
- **If there is no runtime env option when we create an actor, inherit the parent's runtime env.**
- **Otherwise, use the runtime env passed in the options directly, without any merging.**
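A small sketch of the new semantics (the values are hypothetical):
```python
import ray

@ray.remote
class Child:
    pass

# No runtime_env option: the actor inherits the parent's runtime env.
Child.options().remote()

# Explicit runtime_env: used verbatim, with no per-field merging.
Child.options(runtime_env={"env_vars": {"X": "Y"}}).remote()
```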
We also add a new API, `ray.runtime_env.get_current_runtime_env()`, to get the parent runtime env as a dict you can modify yourself. For example:
```python
runtime_env = ray.runtime_env.get_current_runtime_env()
runtime_env.update({"X": "Y"})
Actor.options(runtime_env=runtime_env)
```
This new API can also be used in the Ray client.
This PR adds precise reason details regarding worker failures. All the information is available via either
- `ray list workers`
- exceptions from actor failures.
Here's an example when an actor is killed by SIGKILL (e.g., by the OOM killer):
```
RayActorError: The actor died unexpectedly before finishing this task.
class_name: G
actor_id: e818d2f0521a334daf03540701000000
pid: 61251
namespace: 674a49b2-5b9b-4fcc-b6e1-5a1d4b9400d2
ip: 127.0.0.1
The actor is dead because its worker process has died. Worker exit type: UNEXPECTED_SYSTEM_EXIT Worker exit detail: Worker unexpectedly exits with a connection error code 2. End of file. There are some potential root causes. (1) The process is killed by SIGKILL by OOM killer due to high memory usage. (2) ray stop --force is called. (3) The worker is crashed unexpectedly due to SIGSEGV or other unexpected errors.
```
## Design
Worker failures are reported by the raylet via two paths:
(1) When the core worker calls `Disconnect`.
(2) When the worker is killed unexpectedly, the socket is closed and the raylet reports the worker failure.
This PR ensures all worker failures are reported through `Disconnect`, while including more detailed information in its metadata.
## Exit types
Previously, the worker exit types were arbitrary and not correctly categorized. This PR reduces the number of worker exit types while including details for each exit type, so that users can easily figure out the root cause of worker crashes.
### Status quo
- SYSTEM ERROR EXIT
  - Failure from the connection (core worker dead)
  - Unexpected exception or exit with exit_code != 0 in the core worker
  - Direct call failure
- INTENDED EXIT
  - Driver shutdown
  - `exit_actor`
  - exit(0)
  - Actor kill request
  - Task cancel request
- UNUSED_RESOURCE_REMOVED
  - Upon GCS restart, bundles that are not registered to the GCS are killed to synchronize state
- PG_REMOVED
  - When a PG is removed, all of its workers fate-share
- CREATION_TASK (INIT ERROR)
  - When actor init has an error
- IDLE
  - When a worker is idle and the number of workers exceeds the soft limit (by default, the number of CPUs)
- NODE DIED
  - Can only be detected when the owner's node is dead (needs improvement)
### New proposal
Remove unnecessary states and add a "details" field. We can categorize failures into 4 types:
- UNEXPECTED_SYSTEM_ERROR_EXIT: the worker crashed unexpectedly.
  - Failure from the connection (core worker dead)
  - Unexpected exception or exit with exit_code != 0 in the core worker
  - Node died
  - Direct call failure
- INTENDED_USER_EXIT: the worker was killed at the user's request. No workflow required; just correctly store the state.
  - Driver shutdown
  - `exit_actor`
  - exit(0)
  - Actor kill request
  - Task cancel request
- INTENDED_SYSTEM_EXIT: the worker was killed by the system, without an explicit user request.
  - Unused resource removed
  - PG removed
  - Idle
- ACTOR_INIT_FAILURE (CREATION_TASK_FAILED): actor init failed; the process fate-shares with the actor.
  - Actor init failed
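For reference, the four consolidated categories as a sketch (the real definition lives in Ray's protobuf/C++ code; this Python enum is only illustrative):
```python
from enum import Enum

class WorkerExitType(Enum):
    UNEXPECTED_SYSTEM_ERROR_EXIT = 1  # unexpected crash
    INTENDED_USER_EXIT = 2            # user-requested kill/exit
    INTENDED_SYSTEM_EXIT = 3          # system-requested kill (no user intent)
    ACTOR_INIT_FAILURE = 4            # creation task failed
```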
## Limitation (Follow up)
Worker failures are not reported under the following circumstances:
- The worker fails before it registers its information with the GCS (this usually indicates a critical system bug and is extremely uncommon).
- The node fails. In this case, we should track the Node ID -> Worker ID mapping at the GCS and record the worker metadata when the node fails.
I will create issues to track these problems.
There is a race condition where the failure message is sent before resubscription has been established; in this case, the message is lost.
This PR fixes the test case only, by making sure resubscription is done before killing the worker.
We are not going to fix this issue for the first version of GCS HA because:
1. Right now, only failed workers are reported, and the reports are used to eagerly kill unnecessary workers. Correctness-wise, this is OK.
2. Worker/task support is not P0 for GCS HA (Ray Serve). Actors don't have this issue, since actor subscriptions are keyed by actor ID and the raylet fetches the actor status when resubscribing.
What's still not working:
- We won't kill workers that are no longer useful, so in a very extreme case (a hung worker) there will be a resource leak.
This fixes two bugs in data locality:
1. When a dependent task was already in the CoreWorker's queue, we ran the data locality policy to choose a raylet before adding the first location for the dependency, so the dependency appeared to be unavailable everywhere.
2. The locality policy did not take spilled locations into account.
Added C++ unit tests and Python tests for the above.
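For illustration, a hedged sketch of the corrected policy; the helper and data shapes are hypothetical, not Ray's actual implementation:
```python
# Pick the raylet with the most dependency bytes local, counting a
# dependency as present on a node whether the copy is in memory or
# spilled (the second bug above ignored spilled copies).
def best_node_for_task(dep_sizes, locations):
    scores = {}
    for dep, size in dep_sizes.items():
        for node in locations.get(dep, ()):  # memory AND spilled locations
            scores[node] = scores.get(node, 0) + size
    return max(scores, key=scores.get) if scores else None
```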
Related issue number
Fixes #24267.
This PR adds a FAQ to Datasets docs.
Docs preview: https://ray--24932.org.readthedocs.build/en/24932/
## Checks
- [x] I've run `scripts/format.sh` to lint the changes in this PR.
- [x] I've included any doc changes needed for https://docs.ray.io/en/master/.
- [x] I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
- Testing Strategy
- [x] Unit tests
- [ ] Release tests
- [ ] This PR is not tested :(
Co-authored-by: Eric Liang <ekhliang@gmail.com>
Previously, we pinned the controller to the head node because the head node was the SPOF. With the GCS HA work, that will no longer be the case, so the controller should be able to run on any node. We still prefer the head node in non-HA cases.
This PR adds a dedicated docs page for examples, and adds a basic e2e tabular data processing example on the NYC taxi dataset.
The goal of this example is to demonstrate basic data reading, inspection, transformations, and shuffling, along with ingestion into dummy model trainers and doing dummy batch inference, for tabular (Parquet) data.
This PR overhauls the "Accessing Datasets" docs, adding proper coverage of each data-consuming method, including the ML framework exchange APIs (`to_torch()` and `to_tf()`).
It seems `_InactiveRpcError` is not saved as the last exception. Raise an explicit error when publishing fails after retries.
For log and error publishing, dropping messages should be tolerable so log the exception instead.
The previous implementation of TorchPredictor failed when the model output an array for each sample (for example, logits). This is because Pandas DataFrames cannot hold NumPy arrays of more than one element. We have to convert the outermost NumPy array returned by the model to a list so that it can be stored in a Pandas DataFrame.
The newly added test fails under the old implementation.
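A small sketch of why the conversion is needed (standalone pandas/NumPy, not the actual TorchPredictor code):
```python
import numpy as np
import pandas as pd

logits = np.array([[0.1, 0.9],
                   [0.8, 0.2]])  # one row of logits per sample

# pd.DataFrame({"predictions": logits}) would fail: a column cannot be
# built directly from a 2D array. Converting the outermost axis to a
# list stores one ndarray per row instead.
df = pd.DataFrame({"predictions": list(logits)})
```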
On Mac and Windows, we default to starting the head node on 127.0.0.1 to avoid security pop-ups, on the assumption that people won't use cluster mode much. However, when people do want to create a Mac/Windows cluster, our connection message is confusing, since workers cannot connect to a head node that's listening on 127.0.0.1. This PR tells people what to do in this case (e.g., use `--node-ip-address`).
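For example, starting a reachable head node explicitly (replace the address with your machine's IP):
```
ray start --head --node-ip-address=<your-node-ip>
```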
This PR adds a fast path for `sync_dir_between_nodes` that gets triggered if both source IP and target IP are the same. It uses simple `shutil` operations instead of packing and unpacking to improve performance.
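A rough sketch of the fast path under the assumptions above (hypothetical signature; the real function also handles the remote case):
```python
import shutil

def sync_dir_between_nodes(source_ip, source_path, target_ip, target_path):
    if source_ip == target_ip:
        # Fast path: same node, so a plain local copy beats packing and
        # unpacking the directory.
        shutil.rmtree(target_path, ignore_errors=True)
        shutil.copytree(source_path, target_path)
    else:
        ...  # slow path: pack, transfer between nodes, unpack
```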
This PR moves all exception classes from the runtime module to the api module. It aims to eliminate confusion about Ray exceptions: after this PR, Ray users no longer need to touch the runtime module when programming against the API.
Note that this should be merged into 2.0.