This PR implements ray list tasks and ray list objects APIs.
NOTE: You can ignore the merge conflict for now. It is because the first PR was reverted. There's a fix PR open now.
This PR focuses on updating syncer-related code and comments from this #23660 to reduce the code size.
Update Snapshot/Update -> CreateSyncMessage/ConsumeSyncMessage
Make ray syncer test work even when we add more components in the protobuf
Make ray syncer able to reconnect to a new node.
Several changes to make spread scheduling work better under load:
* When nodes are not available, spread among feasible nodes.
* If grant_or_reject is true, don't spill back if the selected node is not available.
* Don't spill due to waiting for dependencies for spread tasks.
In a1e06f64ae, memory bound was added for each subscribed entity in the publisher. It adds two extra `std::deque` per subscribed entity, which turns out to cost a lot more memory when there are a large number of `ObjectRef`s: https://github.com/ray-project/ray/pull/23853#issuecomment-1098382286
This PR avoids the extra memory usage for entities in channels unlikely to grow too large, i.e. all channels except those for logs and error info. Subscribed entity memory usage no longer shows up in the memory profile when there are 1M object refs. Raw data: [profile006.pb.gz](https://github.com/ray-project/ray/files/8508547/profile006.pb.gz)
A user has reported a crash in GCS client where the client was unable to connect to the GCS server after retries, even when GCS server has always been running. I was not able to reproduce the exact issue, but noticed that the health check logic with socket has unexpected behavior sometimes, e.g. it is much slower to use socket for health check compared to using gRPC (~40s vs < 1s sometimes). The user issue could be related to this slowness, so this PR updates the logic to use gRPC health check.
* Provide a utility to ping a Ray cluster and verify it has the same Ray version. This is useful to check if a Ray cluster is available at a given address, without connecting to the cluster with the more heavyweight ray.init(). This utility is integrated with ray memory to provide a better error message when the Ray cluster is unavailable. There seem to be user demand for exposing this as an API as well.
* Improve the error message when the address provided to Ray does not contain port.
To remove symbols conflict effect on core worker linked different ray versions. This PR extracts an united core worker api (not all) and collect them into a internal library, so native devs can use them anywhere no matter the core worker implementation changes.
- Logically these two rpcs are about notifying the owner about the object location changes, so we should just have one rpc for that purpose. This prevents out-of-order updates seen by the owner (i.e. receiving object removed from object store before spill update). Also by using UpdateObjectLocationBatch, we get batch update for free.
- Maintain a FIFO order for object location updates so we won't have starvation.
During large scale shuffle (number of partitions used >= 1000), driver uses significant amount of memory for storing ObjectRefs. On Intel MacOS, each Reference struct currently takes up 592 bytes. We can reduce per-Reference memory footprint:
- During shuffle, no ObjectRef borrowing or nesting happens. And in this case fields related to borrowing or nesting should not take up memory. This reduces sizeof(Reference) from 592 to 400.
- Fields in the Reference struct can be reordered to enhance packing. This reduces sizeof(Reference) from 400 to 368.
On Intel MacOS running the shuffle benchmark with 1000 partitions and 10MB partition size, RSS at the end of shuffle drops from ~5GB to ~4.5GB.
Related issue number
#23604
Instead of relying on the node-ip custom resource for static task-to-node placement, this PR introduces an explicit NodeAffinitySchedulingStrategy with the following benefits:
1. Specify node using id instead of ip since ip may not be unique for each node.
2. Support soft constraint so the task can be tolerant to node failures.
After this PR, the node-ip custom resource can be deprecated.
This is a rebase version of #11592. As task spec info is only needed when gcs create or start an actor, so we can remove it from actor table and save the serialization time and memory/network cost when gcs clients get actor infos from gcs.
As internal repository varies very much from the community. This pr just add some manual check with simple cherry pick. Welcome to comment first and at the meantime I'll see if there's any test case failed or some points were missed.
Today we have two storage interfaces in Gcs, one is InternalKvInterface which exposes key value interfaces, another is StoreClient which is kv interface with secondary index support.
To make GCS storage pluggable, we need to narrow down and unify the storage interface. This is a try to only use kv store and build index purely in memory.
known limitations:
we need to rebuild index during GCS startup
there might be consistency issues when concurrent change (write/delete) to the same key; but the current redis based solution also suffer from the same issue.
We want to limit the maximum memory used for each subscribed entity, e.g. to avoid having GCS run out of memory if some workers publish a huge amount of logs and subscribers cannot keep up.
After this change, Ray publishers maintain one message buffer for all subscribers of an entity, and one message buffer for all subscribers of all entities in the channel.
The limit can be configured with publisher_entity_buffer_max_bytes. The default value is 10MiB.
As we (@scv119 @raulchen @Chong-Li @WangTaoTheTonic) discussed offline, and @scv119 also mentioned it in (#23323 (comment)). I refactor the interface of ISchedulingPolicy and make it expose only one batch interface., and still provide a SingleSchedulePolicy to be compatible with single scheduler.
Co-authored-by: 黑驰 <senlin.zsl@antgroup.com>
Copied from #22571:
Whenever we spill, we try to spill all spillable objects. We also try to fuse small objects together to reduce total IOPS. If there aren't enough objects in the object store to meet the fusion threshold, we spill the objects anyway to avoid liveness issues. However, currently we spill at most the object fusion size when instead we should be spilling at least the fusion size. Then we use the max number of fused objects as a cap.
This PR fixes the fusion behavior so that we always spill at minimum the fusion size. If we reach the end of the spillable objects, and we are under the fusion threshold, we'll only spill it if we don't have other spills pending too. This gives the pending spills time to finish, and then we can re-evaluate whether it's necessary to spill the remaining objects. Liveness is also preserved.
Increases some test timeouts to allow tests to pass.
placement_group_test_5 is flakey. Reason is requesting PG with exact object store memory as node. If object store has object, then PG scheduling fails.
Also fix bug - typo.
```
src/ray/common/test/ray_syncer_test.cc:495: Failure
| Expected: (s1.GetNumConsumedMessages(s2.syncer->GetLocalNodeID())) < (max_sends * 2 + 3), actual: 5 vs 5
```
This is measuring number of request send. For extreme case, they should equal. This PR fixed this.
Adds some metrics useful for object-intensive workloads:
Per raylet/object manager:
Add num bytes pending restore to spill manager
Add num requests cumulative to PullManager
Num bytes pushed/pulled from other nodes cumulative
Histogram for request latencies in PullManager:
total life time of request, from start to cancel
request satisfaction time, from start to object local
pull time, from object activation to object local
Per-node disk read/write speed, IOPS
"ResourceRequest" now uses 2 containers: a vector for predefined resources, and a map for custom resources.
This was intended to be a perf optimization. However, in practice, this makes the code more complex, and, moreover, prevents optimizations for some methods (e.g., "ResourceIds", "Size").
This PR removes the vector and makes ResourceRequest use only one map for all resources. Also, "ResourceIds" now returns a "boost:range" to allow iterating resource IDs without having to construct temporary sets.
microbenchmark shows a slight perf improvement.
last nightly: `placement group create/removal per second 837.76 +- 16.68`.
this PR: `placement group create/removal per second 895.76 +- 16.99`.
As we (@scv119 @iycheng @raulchen @Chong-Li @WangTaoTheTonic ) discussed offline, the GcsResourceScheduler on the GCS side should be unified to ClusterResourceScheduler.
There is already a big PR( #23268 ) to do this, but in order to make review easy, I will split it to two or mall small PRs.
This is [3/n]:
Move the implementation of all policies from gcs_resource_scheduler to bundle_scheduling_plocy
Delete gcs_resource_scheduler
Refactor gcs_resource_scheduler_test to cluster_resource_scheduler_2_test
BTW: The interface inside ISchedulingPolicy should be refactor in another PR, see the discussion #23323 (comment)
To be clear:
scorer related codes are moved out from gcs_resoruce_scheduler to scorer.h/.cc and no logic changes.
Policy related codes are moved out from gcs_resoruce_scheduler to bundle_scheduling_policy.h/.cc, and a small part of the logic in "GcsResourceScheduler::Schedule" is distributed into each policy.
Some codes inside gcs_placement_group_scheduler.h/.cc are changed to adapt to new data structure (SchedulingResult and SchedulingContext)
## Why are these changes needed?
This PR refactor the resource syncer to decouple it from GCS and raylet. GCS and raylet will use the same module to sync data. The integration will happen in the next PR.
There are several new introduced components:
* RaySyncer: the place where remote and local information sits. It's a coordinator layer.
* NodeState: keeps track of the local status, similar to NodeSyncConnection.
* NodeSyncConnection: keeps track of the sending and receiving information and make sure not sending the information the remote node knows.
The core protocol is that each node will send {what it has} - {what the target has} to the target.
For example, think about node A <-> B. A will send all A has exclude what B has to B.
Whenever when there is new information (from NodeState or NodeSyncConnection), it'll be passed to RaySyncer broadcast message to broadcast.
NodeSyncConnection is for the communication layer. It has two implementations Client and Server:
* Server => Client: client will send a long-polling request and server will response every 100ms if there is data to be sent.
* Client => Server: client will check every 100ms to see whether there is new data to be sent. If there is, just use RPC call to send the data.
Here is one example:
```mermaid
flowchart LR;
A-->B;
B-->C;
B-->D;
```
It means A initialize the connection to B and B initialize the connections to C and D
Now C generate a message M:
1. [C] RaySyncer check whether there is new message generated in C and get M
2. [C] RaySyncer will push M to NodeSyncConnection in local component (B)
3. [C] ServerSyncConnection will wait until B send a long polling and send the data to B
4. [B] B received the message from C and push it to local sync connection (C, A, D)
5. [B] ClientSyncConnection of C will not push it to its local queue since it's received by this channel.
6. [B] ClientSyncConnection of D will send this message to D
7. [B] ServerSyncConnection of A will be used to send this message to A (long-polling here)
8. [B] B will update NodeState (local component) with this message M
9. [D] D's pipelines is similar to 5) (with ServerSyncConnection) and 8)
10. [A] A's pipeline is similar to 5) and 8)
Running Datasets shuffle with 1TB data and 2k partitions sometimes times out due to a failed object fetch. This happens because the object directory notifies the PullManager that the object is already on the local node, even though it isn't. This seems to be a bug in the object directory.
To work around this on the PullManager side, this PR filters out the current node from the list of locations provided by the object directory. @jjyao confirmed that this fixes the issue for Datasets shuffle.
This is the first PR to refactor scheduler data structures (See #22850).
Major changes:
- Hid the implementation details in the `ResourceRequest` and `TaskResourceInstnaces` classes, which expose public methods such as algebra operators and comparison operators.
- Hid the differences between "predefined" and "custom" resources inside these 2 classes. Call sites can simply use the resource ID to access the resource, no matter it is predefined or custom.
- The predefined_resources vector now always has the full length. So no more "resize"s are needed.
- Removed the `ResourceCapacity` class. Now "total" and "available" resources are stored in separate fields in "NodeResources".
- Moved helper functions for FixedPoint vectors from "cluster_resource_data.h" to "fixed_point.h"
- "ResourceID" now has static methods to get the resource ids of predefined resources, e.g. "ResourceID::CPU()".
- Encapsulated unit-instance resource logic to "ResourceID"
Other planned changes that are not included in this PR:
- Rename ResourceRequest to ResourceSet, and move it to its own file.
- Remove the predefined vectors and always use maps.
Co-authored-by: Chong-Li <lc300133@antgroup.com>
This PR adds the API `setRuntimeEnv` for submitting a normal task, for the usage:
```java
RuntimeEnv runtimeEnv =
new RuntimeEnv.Builder()
.addEnvVar("KEY1", "A")
.build();
/// Return `A`
Ray.task(RuntimeEnvTest::getEnvVar, "KEY1").setRuntimeEnv(runtimeEnv).remote().get();
```
Getting or creating a named actor is a common pattern, however it is somewhat esoteric in how to achieve this. Add a utility function and test that it doesn't cause any scary error messages.
Actor.options(name="my_singleton", get_if_exists=True).remote(args)
- Move the URI reference logic from raylet to agent.
- Redefine the runtime env agent RPC to `CreateRuntimeEnvOrGet` and `DeleteRuntimeEnvIfPossible`
- More details https://github.com/ray-project/ray/issues/21695#issuecomment-1032161528
Future works
- We don't remove the `RuntimeEnvUris` from `RuntimeEnv` protobuf in current PR because gcs also uses those URIs to do GC by runtime_env_manager. We should also clear this.
- Ray client server shouldn't interact with agent directly. Or Ray client server should also decrease the reference count.
- Currently, `WorkerPool::HandleJobStarted` will be called multiple times for one job. So we should make sure this function is idempotent. Can we change this logic and make this function be called only once?
* Add new interface to policy for batch scheduling and unify the scheduling result and context
* Remove the dependence of GcsClient on ClusterResourceScheduler
* fix compile error
* fix lint error
Co-authored-by: 黑驰 <senlin.zsl@antgroup.com>