This reverts commit f1eedb15b6.
## Why are these changes needed?
Self node should avoid reading any updates from gcs for node resource change since it'll maintain local view by itself.
## Related issue number
#19438
## Why are these changes needed?
This is the first step in migrating Redis pubsub to be GCS pubsub based. Changes include:
- Remove `SubscribeAll()` API for Actor pubsub since it is only used in tests. Supporting both `Subscribe()` and `SubscribeAll()` APIs would be too complex without much return.
- Update `Subscribe()` API to accept a done status callback.
- Implement `SubscribeAll()` / `Unsubscribe()`(from channel) API in Ray pubsub.
- Implement using Ray pubsub for Actor, Job, Node info and Node resource publishing / subscribing.
GCS changes are tested with GCS server test in GCS pubsub mode.
## Related issue number
This reverts commit a907168184.
## Why are these changes needed?
This PR seems to have some huge perf regression on `placement_group_test_2.py`. It took 128s before, and after this PR was merged, it took 315 seconds.
## Related issue number
## Why are these changes needed?
If an actor failover is triggered, but the RPC connection between the caller and the crashed actor instance is not disconnected automatically, subsequent tasks to the new actor instance may not be executed. The root cause is that the sequence numbers of tasks sent to the new actor instance is not starting from 0. Details can be found in #14727.
This PR fixes it by ensuring all inflight actor tasks fail immediately when actor failover is detected (via actor state notifications).
## Related issue number
closes#14727
## Why are these changes needed?
When gcs broad cast node resource change, raylet will use that to update local node as well which will lead to local node instance and nodes_ inconsistent.
1. local node has used all some pg resource
2. gcs broadcast node resources
3. local node now have resources
4. scheduler picks local node
5. local node can't schedule the task
6. since there is only one type of job and local nodes hasn't finished any tasks so it'll go to step 4 ==> hangs
## Related issue number
#19438
## Why are these changes needed?
Prior to this PR, we have:
```cpp
class XxxAccessor {}
class ServiceBasedXxxAccessor : public XxxAccessor{}
class GcsClient {}
class ServiceBasedGcsClient : public GcsClient{}
```
However, XxxAccessor has only one implementation: ServiceBasedXxxAccessor. And GcsClient has only one implementation: ServiceBasedGcsClient.
I think this abstraction is not necessary and will make development hard(I have to modify two files every time).
This PR removes all ServiceBasedXxx and moves its implementations to the base class.
Now we only have:
```cpp
class XxxAccessor {}
class GcsClient {}
```
## Why are these changes needed?
- Since broadcasting is moving to grpc, introducing the option to increase the client side thread number
- For hybrid schedule, ignore the threshold if gcs based actor scheduler is enabled
With these fixing, actor creation rate > 600actor/s vs ~ 140 actor/s
## Related issue number
Why are these changes needed?
The current logic can cause resource leak if AllocateTaskResourceInstances is requested with the custom resources that don't exist in the local node. The original assumption was the caller will free resources when it returns false, but it is an error prone API, and it actually turns out that we don't do this anywhere.
Related issue number
Closes#17044
This PR adds more infrastructure for subscribing to GCS via ray::pubsub instead of Redis.
Most important logic added are
GCS subscriber RPC interface in src/ray/protobuf/gcs_service.proto
GCS subscriber handler in src/ray/gcs/gcs_server/pubsub_handler.{h,cc}
GCS wrapper for ray::pubsub subscriber in src/ray/gcs/pubsub/gcs_pub_sub.{h,cc}
Other files are modified for adding boilerplates, plumbing, removing dead code and cleanups.
This PR can also be reviewed commit by commit. 418f065, 3279430 are cleanups. 028939c is a pure-refactoring of how GCS clients subscribe to GCS updates that should not change behavior yet, similar to [Pubsub] Wrap Redis-based publisher in GCS to allow incrementally switching to the GCS-based publisher #19600. 286161f parameterized gcs_server_test to test GCS pubsub. The rest of commits have new logic added.
All new logic are behind the gcs_grpc_based_pubsub flag, so this PR should not affect Ray's default behavior.
The added subscriber logic was tested by enabling gcs_grpc_based_pubsub in service_based_gcs_client_test.cc and adding basic handling logic for TaskLease. Since TaskLease pubsub will be removed, the change will not be checked in.
Next step is to support SubscribeAll entities for a channel in ray::pubsub, and test migrating more channels.
## Why are these changes needed?
Previously, we don't send requests if there is an in-flight request. But this is actually bad, because it prevent raylet get the latest information. For example, if the request needs 200ms to arrive at the raylet, the raylet will lose one update. In this case, the next request will arrive after 200 + 100 + (in flight time) ms. So we still should send the request.
TODO:
- Push the snapshot to raylet if the message is lost.
- Handle message loss in raylet better.
## Related issue number
#19438
This RPC is from legacy code and not needed anymore (the task spec is already in the actor table), but it adds quite amount of keys to Redis.
The below is the sum of bytes size(? I am not sure if it is bytes size, but I grabbed the length of the value when I queried Redis) of each prefix when running many_ppo. As you can see Task& and Task takes a lot of part although they are not really used.
�[0m ��[12A�[9C�[?7h�[0m�[?12l�[?25h�[?25l�[?7l�[0mb�[?7h�[0m�[?12l�[?25h�[?25l�[?7l�[10D�[0m�[J�[0;38;5;28mIn [�[0;92;1m82�[0;38;5;28m]: �[0mb�[10D�[0m
�[J�[?7h�[0m�[?12l�[?25h�[?2004l�[0m�[?7h�[0;38;5;88mOut[�[0;91;1m82�[0;38;5;88m]: �[0m�[0m
defaultdict(int,
{b'WORKE': 1080864,
b'ACTOR': 1470931,
b'TASK&': 1020646,
b'TASK:': 870551,
b'PROFI': 360000,
b'PLACE': 10107,
b'JOB:\x01': 8,
b'JOB:\x04': 8,
b'NODE:': 99,
b'NODE_': 126,
b'INTER': 44,
b'JOB:\x03': 8,
b'redis': 16,
b'JOB:\x02': 8,
b'JOB:\x05': 8})
## Why are these changes needed?
When ray spill back, it'll check whether the node exists or not through gcs, so there is a race condition and sometimes raylet crashes due to this.
This PR filter out the node that's not available when select the node.
## Related issue number
#19438
## Why are these changes needed?
The most significant change of the PR is the `GcsPublisher` wrapper added to `src/ray/gcs/pubsub/gcs_pub_sub.h`. It forwards publishing to the underlying `GcsPubSub` (Redis-based) or `pubsub::Publisher` (GCS-based) depending on the migration status, so it allows incremental migration by channel.
- Since it was decided that we want to use typed ID and messages for GCS-based publishing, each member function of `GcsPublisher` accepts a typed message.
Most of the modified files are from migrating publishing logic in GCS to use `GcsPublisher` instead of `GcsPubSub`.
Later on, `GcsPublisher` member functions will be migrated to use GCS-based publishing.
This change should make no functionality difference. If this looks ok, a similar change would be made for subscribers in GCS client.
## Related issue number
## Why are these changes needed?
Recently we found that gcs is using a lot of CPU in scheduling actors and it's because the code is not well organized. This PR improved the SelectNodes function. From profiling, for many nodes actor test, 50% of CPU is wasted and could be saved here.
## Related issue number