Next move of #19220. This pr replace unordered_map to flat_hash_map in most GCS code and some util & common modules.
The placement group part, which exposes user interfaces in Java/Python, is exclusive as it's a little bit complicated.
The follow-up PRs would be migrating in core worker, placement group and others.
To hidden symbols of thirdparty library, this pull request reuses internal namespace that can be imported by any external native projects without side effects.
Besides, we suggest all of contributors to make sure it'd better use thirdparty library in ray scopes/namspaces and only ray::internal should be exported.
More details in https://github.com/ray-project/ray/pull/22526
Mobius has applied this change in https://github.com/ray-project/mobius/pull/28.
Co-authored-by: 林濯 <lingxuan.zlx@antgroup.com>
Currently object transfers assume that the object size is fixed. This is a bad assumption during failures, especially with lineage reconstruction enabled and tasks with nondeterministic outputs.
This PR cleans up the handling and hopefully guards against two cases where the object size may change during a transfer:
1. The object manager's size information does not match the object in the local plasma store (due to async notifications). --> the object manager overwrites its own information if it finds that the physical object has a different size.
2. The receiver's created buffer size does not match the sender's object size. --> the receiver destroys the previous buffer and creates a new buffer with the correct size. This might cause some transient errors but eventually object transfer should succeed.
Unfortunately I couldn't trigger this from Python because it depends on some pretty specific timing conditions. However, I did add some unit tests for case 2 (this is the majority of the PR).
In Ray, functions are exported to the function table during runtime. But it's not cleaned up after use. This PR garbage collects the resource when there is no job/detached actor referencing the resource.
Ideally, we should move the function table imports/exports feature to core, so gcs function manager is introduced, and currently, it's for reference counting only.
- This PR moves the `ObjectManager::Wait` related logic to a separate WaitManager class.
- Fix the wait hang issue by not relying on the async object location notification, but checking if wait is complete when the local object is added, at that time the object is guaranteed to be local.
We need to get not only ray_namespace config of a job. In this PR, we cache the job_configs instead of ray_namespaces, so that we can use it for other PR(For example, this PR #21249 needs the num_java_worker_pre_process item).
Also, before this PR, ray_namespaces_ cache will not be cleared, and we clear the cache in this PR.
For actor channel, GCS clients subscribe to a single actor but dashboard subscribes to all actors. This change makes supporting this possible.
Most of the added code is in `integration_test.cc`, which tests the publisher and subscriber together.
Also, add the basic support for dashboard reporter pubsub.
This PR centralizes all existing metrics to `metric_defs.h`.
Previously, each file relies on implicit import of metric_def.h within the stats module. After this PR we only precisely import `metric_defs.h` for each file.
This is part work of redis removal. In this PR we introduced a new mode for internal kv, memory mode.
There are two ways to address this:
- Update store client and use store client in internal kv
- Add memory table into internal kv directly.
The former one actually is a better choice since it put everything related to storage into a lowerlevel. But it's pretty hard to do this now, since internal kv use hset/hget and redis store client use set/get, so the data will not be compatible and it'll be a brake change.
So the easier way to do this is 2) and it's what this PR doing.
Next: use the flag for store client
Here we met a crash in line 446's RAY_CHECK
d26c9e67e8/src/ray/object_manager/ownership_based_object_directory.cc (L441-L450)
And we found out that it's because we didn't set the node_id for dead nodes. If there are dead nodes and we are trying to LookupRemoteConnectionInfo in it. This crash will happen.
This PR fixes this crash.
Right now in ray, a lot of edge cases related to grpc are not tested. This PR is just a simple try to give the developer some way to delay grpc request. It could be used with manual testing and also e2e test since it's supporting delay for specific grpc method.
To use this feature, just simple set os env `RAY_TESTING_ASIO_DELAY_US="method1=10:20,method2=20:30,*=200:200"`
This means, for `method1` it'll delay 10-20us, for method2 it'll delay 20-30us. For all the rest, it'll delay 200us.
## Why are these changes needed?
ThreadPoolManager and FiberStateManager have the same functionality and logic. This PR aims to remove the duplicate implementations of them.
Add a ConcurrencyGroupExecutor class to do that logic. `ConcurrencyGroupExecutor<FiberState>` is used as FiberStateManager, `ConcurrencyGroupExecutor<BoundedExecutor>` is used as ThreadPoolManager.
Why are these changes needed?
This is the third PR in the stack that supports out or order execution for threaded/async actors. Previous PR #20149 Next PR #20160
At a high level, threaded actor/async actor already don't guarantee execution order, and the current "sequential" order implementation has caused some confusion and inconvenience. Please refer to #19822 for detailed discussion.
In this PR, we implemented the out-of-order of queue that supports out of order execution. Conceptually it's very simple: it sends the requests as soon as the dependency is resolved.
* [Core]Make convertion between ray/grpc status more specific
* per comments
* lint
* per comments
* use ABORT instead of UNKNOWN, add some tests
* lint
* lint
To avoid exporting thrirdparty library symbol globally, these absl/grpc libs have been applied in _streaming.so.
Side-effect:
Static variables might be uninitialized if core worker lib and streaming lib both use them.
## Why are these changes needed?
Prior to this PR, we have:
```cpp
class XxxAccessor {}
class ServiceBasedXxxAccessor : public XxxAccessor{}
class GcsClient {}
class ServiceBasedGcsClient : public GcsClient{}
```
However, XxxAccessor has only one implementation: ServiceBasedXxxAccessor. And GcsClient has only one implementation: ServiceBasedGcsClient.
I think this abstraction is not necessary and will make development hard(I have to modify two files every time).
This PR removes all ServiceBasedXxx and moves its implementations to the base class.
Now we only have:
```cpp
class XxxAccessor {}
class GcsClient {}
```
This PR adds more infrastructure for subscribing to GCS via ray::pubsub instead of Redis.
Most important logic added are
GCS subscriber RPC interface in src/ray/protobuf/gcs_service.proto
GCS subscriber handler in src/ray/gcs/gcs_server/pubsub_handler.{h,cc}
GCS wrapper for ray::pubsub subscriber in src/ray/gcs/pubsub/gcs_pub_sub.{h,cc}
Other files are modified for adding boilerplates, plumbing, removing dead code and cleanups.
This PR can also be reviewed commit by commit. 418f065, 3279430 are cleanups. 028939c is a pure-refactoring of how GCS clients subscribe to GCS updates that should not change behavior yet, similar to [Pubsub] Wrap Redis-based publisher in GCS to allow incrementally switching to the GCS-based publisher #19600. 286161f parameterized gcs_server_test to test GCS pubsub. The rest of commits have new logic added.
All new logic are behind the gcs_grpc_based_pubsub flag, so this PR should not affect Ray's default behavior.
The added subscriber logic was tested by enabling gcs_grpc_based_pubsub in service_based_gcs_client_test.cc and adding basic handling logic for TaskLease. Since TaskLease pubsub will be removed, the change will not be checked in.
Next step is to support SubscribeAll entities for a channel in ray::pubsub, and test migrating more channels.
## Why are these changes needed?
When ray spill back, it'll check whether the node exists or not through gcs, so there is a race condition and sometimes raylet crashes due to this.
This PR filter out the node that's not available when select the node.
## Related issue number
#19438
## Why are these changes needed?
The most significant change of the PR is the `GcsPublisher` wrapper added to `src/ray/gcs/pubsub/gcs_pub_sub.h`. It forwards publishing to the underlying `GcsPubSub` (Redis-based) or `pubsub::Publisher` (GCS-based) depending on the migration status, so it allows incremental migration by channel.
- Since it was decided that we want to use typed ID and messages for GCS-based publishing, each member function of `GcsPublisher` accepts a typed message.
Most of the modified files are from migrating publishing logic in GCS to use `GcsPublisher` instead of `GcsPubSub`.
Later on, `GcsPublisher` member functions will be migrated to use GCS-based publishing.
This change should make no functionality difference. If this looks ok, a similar change would be made for subscribers in GCS client.
## Related issue number
## Why are these changes needed?
There are some issues left from previous PRs.
- Put the gcs_actor_scheduler_mock_test back
- Add comment for named actor creation behavior
- Fix the comment for some flags.
## Related issue number
* exp backoff
* up
* format
* up
* up
* up
* up
* up
* format
* fix
* up
* format
* adjust ordering
* up
* Revert "[tune] Cache unstaged placement groups for potential re-use (#18706)"
This reverts commit 2e99fb215f.
* up
* update
* format
* up
* format
* fix
* Revert "Revert "[tune] Cache unstaged placement groups for potential re-use (#18706)""
This reverts commit 93425fdb986059e53699623a0fc8590c062e139b.
* up
* format
* fix lint
* up
* up
* up
* up
* check
* add test1
* format
* up
* add test
* up
* up
* up
* fix
* up
* up
* up
* add test
* format
* up
* up
* fix lint
* format
* fix
* format
* fix
* up