When loading actor data from GCS, detached actors were treated the same as normal actors. But a detached actor lives beyond its job's scope and should be loaded even after the job has finished. This PR fixes that.
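For context, a detached actor is created with a name and a `detached` lifetime, so it survives the job that created it; it must therefore be restorable from GCS even after the creating job exits:

```python
import ray

ray.init(address="auto")

@ray.remote
class Counter:
    def __init__(self):
        self.value = 0

    def increment(self):
        self.value += 1
        return self.value

# This actor outlives the current job, so it must still be loadable
# from GCS after the job finishes (the case this PR fixes).
counter = Counter.options(name="global_counter", lifetime="detached").remote()
```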
This fixes two bugs in Datasets push-based shuffle:
1. The scheduling strategy specified by the caller was not getting propagated correctly to the map stage. This is because the map and reduce stages shared the same `ray.remote` options dict, and deleting the caller-specified scheduling strategy from the reduce stage's options (so that we could specify a `NodeAffinitySchedulingStrategy` instead) also removed it from the map stage's options.
2. We were only reporting partial stats for the merge stage.
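A minimal sketch of the aliasing bug behind issue 1 (names are illustrative, not the actual Datasets internals):

```python
# Both stages alias one options dict instead of owning copies.
shared_options = {"scheduling_strategy": "SPREAD", "num_cpus": 1}
map_options = shared_options
reduce_options = shared_options

# Swapping in a NodeAffinitySchedulingStrategy for the reduce stage
# by deleting the caller's strategy...
del reduce_options["scheduling_strategy"]

# ...silently drops it from the map stage as well.
assert "scheduling_strategy" not in map_options

# The fix pattern: copy the dict so each stage owns its options.
# reduce_options = dict(shared_options)
```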
Related issue number
Fixing issue 1 is necessary for performance at large scale (#24480).
This makes it possible to use an NFS file system shared across a cluster for runtime_env working directories.
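For example, assuming `/mnt/nfs/shared_project` is an NFS mount visible on every node (a minimal sketch; the exact way to opt into a shared file system may differ):

```python
import ray

# Point the runtime_env working directory at a path on the shared
# NFS mount, so every node sees the same files.
ray.init(runtime_env={"working_dir": "/mnt/nfs/shared_project"})
```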
Co-authored-by: shrekris-anyscale <92341594+shrekris-anyscale@users.noreply.github.com>
Co-authored-by: Eric Liang <ekhliang@gmail.com>
Closes #24475
The current deployment graph has significant performance issues compared with using plain deployment handles, mostly because of the overhead of the `DAGNode` traversal mechanism. We need this mechanism to power the DAG API, especially for deeply nested objects in args where we rely on pickling; but it means each execution re-creates and replaces every `DAGNode` instance involved, which incurs overhead.
Some overhead from pickling and executing `DAGNode` Python code is inevitable, but it can be kept quite minimal. As I profiled earlier, pickling itself is quite fast for our benchmarks, on the order of microseconds.
The elephant in the room is that `DeploymentNode` and its relatives do far more work in their constructors than necessary, slowing everything down. So the fix is as simple as:
1) Introduce a new set of executor DAG node types that carry only the minimal information needed: the DAG structure, the traversal mechanism, and the ability to call the relevant deployment handles.
2) Add a simple new pass in our `build()` that generates executor nodes and replaces the originals with them, producing a final executor DAG used to run the graph (see the sketch below).
The current Ray DAG -> Serve DAG transformation mixes in a lot of logic related to deployment generation and init args. In the longer term we should remove that, but since our correctness currently depends on it, I'd rather leave it for a separate PR.
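To illustrate the direction, here is a minimal sketch of the executor pass; all class and attribute names below are hypothetical, not the actual Serve internals:

```python
class ExecutorDAGNode:
    """Carries only what execution needs: the children (preserving the
    DAG structure) and a deployment handle to call."""

    def __init__(self, handle, children):
        self._handle = handle
        self._children = children

    def execute(self, *args):
        # Resolve upstream nodes first, then invoke this node's handle
        # on their results (or on the input args at the leaves).
        if self._children:
            inputs = [child.execute(*args) for child in self._children]
        else:
            inputs = args
        return self._handle.remote(*inputs)


def build_executor_dag(node):
    """Extra pass in build(): replace each heavyweight DeploymentNode
    with a minimal ExecutorDAGNode, preserving only the DAG structure."""
    children = [build_executor_dag(child) for child in node.children]
    return ExecutorDAGNode(node.handle, children)
```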
### Current 10-node chain with deployment graph `.bind()`
```
chain_length: 10, num_clients: 1
latency_mean_ms: 41.05, latency_std_ms: 15.18
throughput_mean_tps: 27.5, throughput_std_tps: 3.2
```
### Using a raw deployment handle without DAG overhead
```
chain_length: 10, num_clients: 1
latency_mean_ms: 20.39, latency_std_ms: 4.57
throughput_mean_tps: 51.9, throughput_std_tps: 1.04
```
### After this PR:
```
chain_length: 10, num_clients: 1
latency_mean_ms: 20.35, latency_std_ms: 0.87
throughput_mean_tps: 48.4, throughput_std_tps: 1.43
```
This PR consolidates the Ray Train and Tune checkpoint managers. These components previously did very similar things in different modules. To simplify future maintenance, we've consolidated the common core (see the sketch below).
- This PR keeps full compatibility with the previous interfaces and implementations. This means that, for now, Train and Tune will have separate `CheckpointManager`s that both extend the common core.
- This PR prepares Tune to move to a `CheckpointStrategy` object.
- In follow-up PRs, we can further unify interfacing with the common core, possibly removing any Train- or Tune-specific adjustments (e.g. moving setup to init rather than to runtime for Ray Train).
The consolidation is split into three PRs:
1. This PR - adds a common checkpoint manager class.
2. #24772 - based on this PR, adds the integration for Ray Train.
3. #24430 - based on #24772, adds the integration for Ray Tune.
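For illustration, the shape of the consolidation might look like this (a minimal sketch with hypothetical names, not the actual Train/Tune interfaces):

```python
from dataclasses import dataclass

@dataclass
class CheckpointStrategy:
    """Illustrative strategy object: how many checkpoints to keep."""
    num_to_keep: int = 2

class _CommonCheckpointManager:
    """Shared core used by both Train and Tune."""

    def __init__(self, strategy: CheckpointStrategy):
        self._strategy = strategy
        self._checkpoints = []

    def register_checkpoint(self, checkpoint):
        self._checkpoints.append(checkpoint)
        # Keep only the most recent num_to_keep checkpoints.
        excess = len(self._checkpoints) - self._strategy.num_to_keep
        if excess > 0:
            del self._checkpoints[:excess]

class TrainCheckpointManager(_CommonCheckpointManager):
    """Train-specific adjustments layered on the common core."""

class TuneCheckpointManager(_CommonCheckpointManager):
    """Tune-specific adjustments layered on the common core."""
```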
Currently, the release test runner prefers the first successfully built version of a cluster env instead of the latest one. But sometimes a cluster env builds successfully on Anyscale yet cannot launch a cluster (e.g. version 2 here), or new dependencies need to be installed, so a new version needs to be built. The existing logic always picks up the first successful build and can never pick up the new cluster env version.
Although this is an edge case (tweaking cluster env versions while keeping the same Ray wheel or cluster env name), I believe it is possible for others to run into it.
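The intended selection logic is roughly the following (a sketch with hypothetical names, not the actual release-tooling code):

```python
def pick_cluster_env_build(builds):
    """Pick the most recent successful build instead of the first one.

    `builds` is assumed ordered oldest to newest; each entry is a dict
    with a "status" field (hypothetical shape).
    """
    successful = [b for b in builds if b["status"] == "succeeded"]
    if not successful:
        return None
    # Old behavior: successful[0] — the first success, which can be a
    # stale version that no longer launches clusters.
    # New behavior: the latest success picks up newly built versions.
    return successful[-1]
```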
Also, avoid running most of the CI tests for changes under release/ray_release/.
Follow-up: #24017
Briefly, the wandb service is still in an experimental stage and is not ready to be released as an integration without extensive testing. Hence, we are rolling back the recent update to the integration until this feature is ready to ship.
When the primary copy of an object is lost, the owner will try to pin a secondary copy. In the meantime, the secondary copy might be evicted. In this case, the `PinObjectIDs` RPC call should return an error to let the owner know that the pin failed; otherwise, the owner will mistakenly think the secondary copy is pinned.
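A sketch of the intended behavior, in Python pseudocode (the real handler lives in the raylet's C++ code, and all names here are illustrative):

```python
from dataclasses import dataclass
from typing import Optional

@dataclass
class Reply:
    status: str
    object_id: Optional[str] = None

def handle_pin_object_ids(object_ids, local_store):
    """Pin each requested object, failing fast if a copy was evicted."""
    for object_id in object_ids:
        if local_store.get(object_id) is None:
            # The secondary copy was evicted before we could pin it.
            # Return an error so the owner knows the pin failed,
            # rather than mistakenly assuming the copy is pinned.
            return Reply(status="OBJECT_NOT_FOUND", object_id=object_id)
        local_store.pin(object_id)
    return Reply(status="OK")
```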
This is a follow-up PR to https://github.com/ray-project/ray/pull/24813 and https://github.com/ray-project/ray/pull/24628.
Unlike the change in the C++ layer, where resubscription is done by GCS broadcasting a request to the raylet/core_worker and the client side then resubscribing, in the Python layer we detect the failure on the client side.
In case of a failure, the protocol is (sketched below):
1. Call subscribe.
2. If resubscription times out, throw an exception; this will crash the system. This is OK because when GCS has been down longer than expected, we expect the Ray cluster to be down as well.
3. Once subscribe succeeds, continue to poll.
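A minimal sketch of this protocol, with hypothetical names (the actual subscriber lives in Ray's Python pubsub code and may differ):

```python
import time

RESUBSCRIBE_TIMEOUT_S = 60  # assumed value; the real timeout may differ

class GcsUnavailableError(Exception):
    """Illustrative stand-in for the error raised when GCS is unreachable."""

def poll_loop(subscriber, handle_message):
    """Step 3: keep long-polling; on failure, fall back to resubscribing."""
    while True:
        try:
            handle_message(subscriber.poll())  # long-polling call
        except GcsUnavailableError:
            resubscribe(subscriber)

def resubscribe(subscriber):
    """Step 1: retry subscribe; step 2: crash if it times out."""
    deadline = time.monotonic() + RESUBSCRIBE_TIMEOUT_S
    while time.monotonic() < deadline:
        try:
            subscriber.subscribe()
            return  # resubscribed; the caller resumes polling
        except GcsUnavailableError:
            time.sleep(1)
    # GCS has been down longer than expected; by design the whole
    # cluster is assumed dead, so crashing here is acceptable.
    raise RuntimeError("Timed out resubscribing to GCS.")
```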
However, there is an extreme case where things might break: the client might miss detecting a failure. This can happen if the long-poll has returned and the Python layer is doing its own work; before it sends another long-poll, GCS restarts and recovers.
Here we are not going to handle this case because:
1. GCS usually takes several seconds to come back up, while the Python layer's work between polls is simply pushing data into a queue (in the sync version). The async version is only used in the Dashboard, which is not a critical component.
2. Pubsub in the Python layer is not doing critical work: it handles logs/errors for Ray jobs.
3. The Dashboard can simply be restarted to fix the issue.
A known issue here is that we might miss logs in case of a GCS failure, for the following reasons:
- The Python-layer pubsub only does best-effort publishing. If publishing fails too many times, it skips the message (messages are lost on the producer side).
- If a message is pushed to GCS but the worker hasn't resubscribed yet, the pushed message will be lost (messages are lost on the consumer side).
We think this is reasonable and valid behavior, given that logs are not defined to be a critical component and we'd like to keep the design of pubsub in GCS simple.
Another thing is `run_functions_on_all_workers`. We plan to stop using it within Ray core and to deprecate it in the longer term. It won't cause a problem for the current cases because:
1. It's only set from the driver, and we don't support creating a new driver while GCS is down.
2. While GCS is down, we don't support starting new Ray workers.
And `run_functions_on_all_workers` is only used when initializing the driver/workers.