I have seen errors where the prometheus view data dictionary is changed when iterating over it. So make a copy (which is atomic) before iterating.
Also, use a separate thread for GCS client heartbeat RPC. This avoids the issue of missing heartbeat when raylet is too busy.
When an object is under reconstruction, pull manager keeps the bundle request active with no timeout, which may block the next bundle request that's needed for the object reconstruction. As a result, we have deadlock.
For example, task 1 takes object A as argument and returns object B, task 2 takes object B as argument. When we run task 2, pull manager will add B to the queue and then B is lost. In this case, task 1 is re-submitted and A is added the the pull manager queue after B (assuming both tasks are scheduled to the same node). Due to limited available object store memory, A cannot be activated until B is pulled but B cannot be pulled until A is pulled and B is reconstructed.
The solution is that if an active pull request has pending-creation objects, pull manager will deactivates it until creation is done. This way, we will free object store memory occupied by the current active pull request so that next requests can proceed and potentially unblock the object creation.
This is the 1st PR to remove the code path of multiple core workers in one process. This PR is aiming to remove the flags and APIs related to `num_workers`.
After this PR checking in, we needn't to consider the multiple core workers any longer.
The further following PRs are related to the deeper logic refactor, like eliminating the gap between core worker and core worker process, removing the logic related to multiple workers from workerpool, gcs and etc.
**BREAK CHANGE**
This PR removes these APIs:
- Ray.wrapRunnable();
- Ray.wrapCallable();
- Ray.setAsyncContext();
- Ray.getAsyncContext();
And the following APIs are not allowed to invoke in a user-created thread in local mode:
- Ray.getRuntimeContext().getCurrentActorId();
- Ray.getRuntimeContext().getCurrentTaskId()
Note that this PR shouldn't be merged to 1.x.
A follow-up PR from this one: https://github.com/ray-project/ray/pull/24628
In the previous PR, it fixed the resubscribing issue for raylet. But there is also core worker which needs to do resubscribing.
There are two ways of doing resubscribe:
1. When the client-side detects any failure, it'll do resubscribing.
2. Server side will ask the client to do resubscribing.
1) is a cleaner and better solution. However, it's a little bit hard due to the following reasons:
- We are using long-polling, so for some extreme cases, we won't be able to detect the failure. For example, the client-side received the message, but before it sends another request, the server-side restarts, and the client will miss the opportunity of detecting the failure. This could happen if we have a standby GCS that starts very fast and somehow the client-side has a lot of traffic and runs very slow.
- The current gRPC framework doesn't give the user a way to handle failure which might need some refactoring on this one.
We can go with this way once we have gRPC streaming.
This PR is implementing 2) which includes three parts:
- raylet: (https://github.com/ray-project/ray/pull/24628)
- core worker: (this pr)
- python
Correctness: whenever when a worker started, it'll register to raylet immediately (sync call) before connecting to GCS. So, we just need to send all restart rpcs to registered workers and it should work because:
- if the worker just started and hasn't registered with the raylet: it's ok, because the worker hasn't connected with GCS yet, so no need to do resubscribing.
- if the worker has registered with the rayelt: it's covered by the code path here.
This PR adds 2 more states into TaskStatus
enum TaskStatus {
// The task is scheduled properly and waiting for execution.
// It includes time to deliver the task to the remote worker + queueing time
// from the execution side.
WAITING_FOR_EXECUTION = 5;
// The task that is running.
RUNNING = 6;
}
## Why are these changes needed?
<!-- Please give a short summary of the change and the problem this solves. -->
This PR fixes the path to resubscribe to GCS when GCS restarts.
When GCS restarts, it'll lose all subscription information since everything is stored in memory. Then in the runtime, we need to tell GCS what's currently being subscribed.
The previous method:
- We'll have a thread in core worker/raylet to check whether the GCS restarted or not.
- If it restarted, we'll send resubscribe request to GCS.
However, this is not working in these cases:
- GCS restarts happen so fast so the checker in raylet/core worker missed them.
- GCS doesn't restart, but just being lag due to network issues then, the resubscribe is not necessary.
Actually, GCS knows when a resubscribe is needed: when it restarts. So the PR here is to send a resubscribe request from GCS -> Raylet and Raylet will do the resubscription.
There are two parts for this to work:
- [x] raylet send resubscription
- [ ] raylet ask core worker to send resubscription
Currently, when an actor has `max_restarts` > 0 and has crashed, the actor will enter RESTARTING state and then ALIVE. Imagine this scenario: an online service provides HTTP service and the proxy actor receives requests, forwards them to worker actors, and replies to clients with the execution results from worker actors.
```
-> Worker A (actor)
/
/
HTTP requests -------> Proxy (actor with HTTP server) ---> Worker B (actor)
\
\
-> ...
```
For each HTTP request, the proxy picks one worker (e.g. worker A) based on some algorithm, sends the request to it, and calls `ray.get()` to wait for the result. If for some reason the picked worker crashed, Ray will restart the actor, and `ray.get()` will throw an error. The proxy may pick another worker (e.g. worker B) and re-send the request to it. This is OK.
But new requests keep coming. The proxy may pick worker A again. But because worker A is still in RESTARTING state, it's not ready to serve requests. `ray.get()` on subsequent requests sent to worker A will hang until worker A is back online (ALIVE state). The proxy won't be able to reschedule these requests to another worker because currently there's no way to know if worker A is alive or not before sending a request. We can't say worker A is not alive just based on whether `ray.get()` hangs either.
To solve this issue, we change the semantics of `max_task_retries`.
* When max_task_retries is 0 (which is the default value), if the callee actor is in the RESTARTING state, subsequently submitted tasks will fail immediately with a RayActorError. Users can catch the RayActorError and implement their own fallback strategies to improve service availability and mitigate service outages.
* When max_task_retries is not 0, subsequently submitted tasks will be queued on the caller side and we only send them to the callee when the callee actor is back to the ALIVE state.
TODO
- [x] Add test cases.
- [ ] Update docs.
- [x] API change review.
Aiming to:
1. addressing the bug about concurrency group, see #19593
2. improving the stability of the ray call latency perf in online applications.
we're proposing using async post instead of `PostBlocking` in threadpool.
Note that since we have already had back pressure in the caller side, I believe this change is safe to merge and it doesn't break any behavior.
This was a holdover from when local resources for URIs were deleted directly from the runtime env agent, and the URI name itself needed to store the information of which plugin it corresponded to so the appropriate plugin's `delete()` function could be called. After the URI reference refactor, I don't think this is needed anymore.
This PR supports setting the jars for an actor in Ray API. The API looks like:
```java
class A {
public boolean findClass(String className) {
try {
Class.forName(className);
} catch (ClassNotFoundException e) {
return false;
}
return true;
}
}
RuntimeEnv runtimeEnv = new RuntimeEnv.Builder()
.addJars(ImmutableList.of("https://github.com/ray-project/test_packages/raw/main/raw_resources/java-1.0-SNAPSHOT.jar"))
.build();
ActorHandle<A> actor1 = Ray.actor(A::new).setRuntimeEnv(runtimeEnv).remote();
boolean ret = actor1.task(A::findClass, "io.testpackages.Foo").remote().get();
System.out.println(ret); // true
```
This PR support reconnection of the GCS client in gRPC channel layer. Previously this is implemented in the application layer:
- Health check is in the application layer by starting a new channel.
- Monitor the GCS address change and do resubscribe.
- Always retry the failed request and do reconnection in case of a failure.
However, there are several issues with this approach:
- We need service to discover for GCS address change.
- Monitor is too heavy since it always creates a channel.
- Reconnection is a blocking call that prevents the code from running.
This new approach moves the reconnection to gRPC layer directly to fix these issues.
- DNS name resolution is done by gRPC so we don't need to write this.
- Health check is done by checking the channel state.
- Queue the failed call and retry once GCS is up so that it's not a blocking call.
Under heap memory pressure, the raylet is often killed by the OS OOM killer. This is bad because it can cause whole system crashes and it is difficult to find the error afterwards. This PR adjusts the OOM score for any workers that the raylet spawns so that the OOM killer will hopefully prioritize killing those instead of the raylet.
Adds support for chunking large schedule calls. Needed to support ray.remote calls with more than 2GiB of arguments.
Deprecates the args and kwargs fields of ClientTask and replaces them with a data field that contains a tuple of the serialized args and kwargs fields, which can be chunked and reassembled more easily using the same logic as PutRequest's.
Found many log messages about Not enough memory to create requested object ... when running shuffle tests, even when object store memory is far from full.
It seems when ObjectBufferPool::AbortCreate() is called, Raylet logs Not enough memory to create requested object .... However, ObjectBufferPool::AbortCreate() is called under 3 different codepaths:
ObjectManager::ReceiveObjectChunk()
PullManager::UpdatePullsBasedOnAvailableMemory() -> cancel_pull_request_
PullManager::CancelPull() -> cancel_pull_request_
Only codepath (2) is due to having not enough object store memory. So the logging in ObjectBufferPool::AbortCreate() is moved to the callsites instead, which have more context of the situation and can log with more accurate messages.
Also change codepath (3) to be DEBUG, because it is an expected behavior and can be quite spammy when running shuffle / sort workload.
As discussed in #24322, rename so the function name matches its signature for PinObjectID(). Also rename the RPC request/reply/method names, to keep them consistent.
According to https://grpc.io/docs/guides/performance/ we should: Always re-use stubs and channels when possible.
This PR share channels between different services.
During investigations for #24176, it is found that the majority of memory used by Raylet and core workers are due to gRPC client (core worker) and server (raylet) data structures for inflight PinObjectIDs RPCs. Instead of buffering the requests in gRPC, this PR changes to buffer ObjectIDs that need to be pinned inside RayletClient instead. This shows significant reduction in raylet's memory usage outside of object stores.
Also made minor cleanups in Raylet client:
Move aborting object creation error from ObjectBufferPool::AbortCreate() to callsites, with hopefully more accurate reasons.
C++ style cleanups.
We have several issues if DisconnectClient happens before AnnounceWorkerPort:
- Check failure for removing io worker from registered_io_workers since the io worker is only added to that set after AnnounceWorkerPort.
- num_starting_(io)_workers is not decremented.
This PR fixes a bug: when the task is pushed to a core worker but hasn't been scheduled to run cancel is not called which will lead to the get request hanging forever.
The fix is to call the `Cancel`.
This PR allows for Ray to disable metrics collection. It was possible with RAY_enable_metrics_collection, but it didn't fully disable collection because there was a metrics collection happening from agent that wasn't properly disabled. This PR also adds tests.
#17581 introduced a warning about excess queuing for actors. Unfortunately since Ray 1.10.0, the metric used became wrong for async actors, resulting in bogus warnings when they are called more than 5000 times, even though there are not 5000 pending tasks.
The difference between 1.9.2 and 1.10.0 is that async actors tasks skip the queue in CoreWorkerClient::PushActorTask. However CoreWorkerClient::ClientProcessedUpToSeqno uses max_finished_seq_no_ which is never updated when the queue is skipped.
I think that a better metric for the amount of tasks that are pending submissions is the size of the internal queue CoreWorkerDirectActorTaskSubmitter::inflight_task_callbacks.
For tasks with node affinity scheduling strategy, the resource demands shouldn't create new nodes. This PR achieves this by not reporting demand to autoscaler. In the future, we will explore sending scheduling strategy information to autoscaler.
Resource load has different pattern compared with resource reporting. It's a simple rpc to gcs and gcs will do aggregation and later autoscaler will call gcs rpc to get the aggregated result.
This PR moves the load to another rpc which will simplify the syncer api.
This PR depends on #23754.
#23754 removes the need for index in the StoreClient interface.
This PR unifies InternalKVInterface and StoreClient. Specifically, we implement an InternalKVInterface which wraps around StoreClient.
When reconnecting GCS client to GCS, the client can attempt to re-subscribe existing keys and channels. One mitigation is to unsubscribe first before re-subscribing, but it is complicated to implement and only done for actors. Instead, we can ignore subscribing to the same key in the GCS client.
This PR implements ray list tasks and ray list objects APIs.
NOTE: You can ignore the merge conflict for now. It is because the first PR was reverted. There's a fix PR open now.
This PR focuses on updating syncer-related code and comments from this #23660 to reduce the code size.
Update Snapshot/Update -> CreateSyncMessage/ConsumeSyncMessage
Make ray syncer test work even when we add more components in the protobuf
Make ray syncer able to reconnect to a new node.
Several changes to make spread scheduling work better under load:
* When nodes are not available, spread among feasible nodes.
* If grant_or_reject is true, don't spill back if the selected node is not available.
* Don't spill due to waiting for dependencies for spread tasks.