Redo the agent-id changes from #24968. The original PR is in the first commit; the second commit fixes a fatal flaw when RAY_BACKEND_LOG_LEVEL=debug is used, which caused the "Ray C++, Java" tests to fail on macOS.
Each node is supposed to report bytes used by its primary in-memory object copies. During spilling, this number is set incorrectly to 1 (see #26639 for more details).
This PR changes the reporting to include objects that are currently being spilled. It also splits out the "return nonzero number if there are local spilled objects" logic into a separate method.
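As a rough illustration of the accounting change (plain Python, not the actual C++ local object manager; all names here are made up), objects currently being spilled now count toward the primary-copy bytes, and the spilled-object check lives in its own helper:
```python
# Illustrative sketch only; the real logic lives in Ray's C++ local object manager.
class LocalObjectAccounting:
    def __init__(self):
        self.pinned_bytes = 0          # primary copies pinned in the object store
        self.pending_spill_bytes = 0   # primary copies currently being spilled
        self.num_spilled_objects = 0   # objects already spilled to external storage

    def primary_copy_bytes(self) -> int:
        # Objects being spilled are still primary in-memory copies, so they are
        # included instead of reporting a placeholder value such as 1.
        return self.pinned_bytes + self.pending_spill_bytes

    def has_locally_spilled_objects(self) -> bool:
        # Split out from the byte accounting: answers only "are there local
        # spilled objects?" so callers no longer overload the byte count for it.
        return self.num_spilled_objects > 0
```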
Related issue number
Closes #26639.
Signed-off-by: Stephanie Wang <swang@cs.berkeley.edu>
The PR adds a new experimental flag to the placement group API to avoid placement groups taking all the CPUs on each node. It is used internally by AIR to prevent placement groups created by Tune from using all the CPU resources, which are needed by Datasets.
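A sketch of how such a flag might look from the Python API; the PR text does not name the flag, so the parameter name `_max_cpu_fraction_per_node` below is my guess and should be treated as illustrative:
```python
# Hypothetical usage sketch; the actual flag name and signature may differ.
import ray
from ray.util.placement_group import placement_group

ray.init()

# Reserve bundles for Tune trials, but leave some CPUs free on each node
# (e.g., for Dataset tasks) by capping the fraction of CPUs the placement
# group may take per node.
pg = placement_group(
    bundles=[{"CPU": 2}] * 4,
    strategy="SPREAD",
    _max_cpu_fraction_per_node=0.8,  # assumed experimental flag
)
ray.get(pg.ready())
```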
Actor tasks are sometimes failed while their dependencies are still being resolved. This can cause hanging or crashes when we resolve the dependencies for a task that has already been canceled. It can lead to a crash from the ref counter when, for the same actor, actor task 1 depends on actor task 2. The sequence is:
1. Actor tasks 1 and 2 are queued; task 1 depends on task 2.
2. Fail actor task 1. We clear its refs, including its dependency on task 2.
3. Fail actor task 2. We store an error as its return value. Since task 1 depends on it, we inline the dependency and try to clear task 1's refs again, causing a ref counting error because we already cleared them in step 2.
This PR fixes the issue by canceling dependency resolution for tasks before failing them. This involves some refactoring of the LocalDependencyResolver. Most of the changes are for testing (split out the unit tests for LocalDependencyResolver into their own suite).
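A minimal sketch of the ordering change (plain Python, not the actual C++ `LocalDependencyResolver`; all names are illustrative):
```python
# Illustrative only: cancel dependency resolution before failing a task so the
# resolver can never touch the refs of an already-failed task again.
class DependencyResolverSketch:
    def __init__(self):
        self.pending = {}  # task_id -> set of object refs still being resolved

    def resolve(self, task_id, deps):
        self.pending[task_id] = set(deps)

    def cancel(self, task_id):
        # Drop any in-flight resolution state for this task.
        self.pending.pop(task_id, None)

def fail_task(resolver, task_id, clear_refs):
    # The fix: cancel resolution first, then clear refs and fail, so inlining a
    # dependency later cannot clear the same refs twice.
    resolver.cancel(task_id)
    clear_refs(task_id)
```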
Related issue number
Closes #18908.
The job-level ray_namespace is set on the task spec when an actor is created without an explicitly specified namespace. Therefore, in the GCS actor manager, the ray_namespace in the task spec should never be empty.
This PR removes the unreachable code path that was used to get the namespace from a local cache.
When Ray is under memory pressure, the pull manager might cancel an ongoing pull request and retry it later. There is a race condition where a pull request is initiated and canceled, and a pull request for the same object is retried by the pull manager shortly after. When this happens, the pusher (where the object is being pulled from) ignores the second pull request if it's still sending the object for the first pull request; instead it continues sending the remaining chunks. This leads to the puller receiving incomplete data chunks (since some chunks were already received and then canceled), and the puller has to wait for a 10-second timeout and retry the pull request.
To fix the problem, we simply always resend all chunks when a pull request is received. Since we always send chunks in order, we implement the resend logic by simply resetting the remaining number of chunks to send, treating the chunks as a ring buffer.
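A small sketch of that resend logic under the stated assumptions (chunks always go out in ascending order and wrap around like a ring buffer; this is plain Python, not the actual object manager code):
```python
# Illustrative sketch of "restart the send by resetting the remaining count".
class ChunkSender:
    def __init__(self, total_chunks: int):
        self.total_chunks = total_chunks
        self.next_chunk = 0        # index of the next chunk to send
        self.remaining = 0         # chunks left to send for the current request

    def on_pull_request(self):
        # Always resend every chunk, even if a previous send is still in flight.
        # Because chunks go out in order, it is enough to bump the remaining
        # count back to the full object size; the chunk index keeps advancing
        # modulo total_chunks, i.e. a ring buffer.
        self.remaining = self.total_chunks

    def send_next_chunk(self, send):
        if self.remaining == 0:
            return False
        send(self.next_chunk)
        self.next_chunk = (self.next_chunk + 1) % self.total_chunks
        self.remaining -= 1
        return True
```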
CheckAlive in GCS currently only checks GCS's liveness, but we also need to check the liveness of raylets.
In KubeRay, we can check liveness directly by monitoring the raylet process. But that's not good enough, given that the raylet process being alive does not directly mean the raylet is alive from the cluster's point of view.
For example, during a network partition the raylet is unable to connect to GCS, and GCS marks the raylet as dead. So although the raylet process is still alive, it can't be treated as alive, because GCS has told all the nodes that it's dead.
So KubeRay also needs to ask GCS whether the raylet is alive or not.
This PR extends the CheckAlive API to include raylet addresses so that we can query GCS for the cluster status directly.
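Conceptually, an external controller could use the extended check roughly like the sketch below (plain Python; the client method and field names are assumptions, not the real proto or client API):
```python
# Conceptual sketch only; method and field names here are made up.
def check_cluster_alive(gcs_client, raylet_addresses):
    # Ask GCS, which holds the authoritative view of node liveness, instead of
    # trusting the raylet process state alone.
    alive_flags = gcs_client.check_alive(node_addresses=raylet_addresses)
    return dict(zip(raylet_addresses, alive_flags))  # address -> True/False
```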
I'm seeing these errors
(raylet, ip=172.31.58.175) [2022-06-28 03:48:42,324 E 702775 702805] (raylet) file_system_monitor.cc:105: /mnt/data0/ray is over 0.95% full, available space: 50637901824. Object creation will fail if spilling is required.
The message should say 95% instead of 0.95%.
To enable one storage backend to be shared by multiple Ray clusters, a special prefix is added to isolate the data between clusters: "<EXTERNAL_STORAGE_NAMESPACE>@"
The namespace is given by an OS environment variable, `RAY_external_storage_namespace`, when starting the head node: `RAY_external_storage_namespace=1234 ray start --head`
This flag is very important in an HA GCS environment. For example, when the Ray Serve operator tries to bring up a new cluster, it's hard to just start a new DB, but it's relatively easy to generate a new cluster ID.
Another example is that the user might only be able to maintain one HA Redis DB, and the namespace enables the user to start multiple Ray clusters that share the same DB.
This config should be moved to storage config in the future once we build that.
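A minimal sketch of the isolation idea (plain Python, illustrative only; the real prefixing happens inside Ray's GCS storage layer): every key written to the shared external storage is prefixed with `<EXTERNAL_STORAGE_NAMESPACE>@`, so two clusters pointed at the same Redis DB never collide:
```python
import os

# Illustrative only.
namespace = os.environ.get("RAY_external_storage_namespace", "default")

def namespaced_key(key: str) -> str:
    # "<EXTERNAL_STORAGE_NAMESPACE>@<key>" keeps clusters sharing one Redis DB
    # from reading or overwriting each other's data.
    return f"{namespace}@{key}"

print(namespaced_key("JobCounter"))   # e.g. "1234@JobCounter"
```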
This PR adds support for specifying an exception allowlist (List[Exception]) as the retry_exceptions argument, such that an application-level exception will only be retried if it is in the allowlist.
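A usage sketch based on the description above (exact retry behavior may differ slightly):
```python
import random
import ray

ray.init()

class TransientError(Exception):
    pass

# Retry the task only if it raises TransientError; any other application-level
# exception fails the task immediately instead of being retried.
@ray.remote(max_retries=3, retry_exceptions=[TransientError])
def flaky():
    if random.random() < 0.5:
        raise TransientError("try again")
    return "ok"

print(ray.get(flaky.remote()))
```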
## Why are these changes needed?
1. Now, bundle resources are deducted from the cluster resources on the `GCS` side only after all Commit requests sent by `GCS` to `Raylet` have returned. Instead, the bundle resources should be deducted before `GCS` sends `PrepareResources` to `Raylet`, so that GCS-based actor scheduling can use fresher resources (see the sketch after this list). BTW, putting the deduction before `PrepareResources` or after the reply of all `CommitResources` has no impact on `Raylet` scheduling.
2. The `GcsResourceManager::UpdateResources` and `GcsResourceManager::DeleteResources` could be deleted to simplify `GcsResourceManager`.
- `GcsResourceManager::UpdateResources` is only used by `GcsPlacementGroupScheduler::CommitAllBundles`; we could update the node resources (commit bundle resources) in `GcsPlacementGroupScheduler` directly, and I think it's unnecessary to put these resources into storage (the resources can be replayed by the placement group).
- `GcsResourceManager::DeleteResources` is only used by `GcsPlacementGroupScheduler::CancelResourceReserve`, which is invoked by `GcsPlacementGroupScheduler::DestroyPlacementGroupPreparedBundleResources` and `GcsPlacementGroupScheduler::DestroyPlacementGroupCommittedBundleResources`. In fact, `GcsPlacementGroupScheduler::ReturnBundleResources` is called wherever these two functions are used, so I think `GcsResourceManager::DeleteResources` is redundant. BTW, I think it's unnecessary to put the change of resources into storage (the resources can be replayed by the placement group).
3. The `gcs_table_storage_` is useless once both `GcsResourceManager::UpdateResources` and `GcsResourceManager::DeleteResources` are removed, so it can be removed too.
4. The `ray_gcs_new_resource_creation_latency_ms_sum` could be removed too as the `GcsResourceManager::UpdateResources` is removed.
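A rough sketch of the reordered flow mentioned in item 1 (illustrative Python, not the actual GCS code; function names are made up):
```python
# Illustrative only: deduct bundle resources before PrepareResources so that
# GCS-side actor scheduling sees the reservation right away.
def reserve_bundles(cluster_resources, bundles, send_prepare, send_commit):
    for bundle in bundles:
        cluster_resources.deduct(bundle)        # 1. deduct up front
    try:
        send_prepare(bundles)                   # 2. two-phase reservation
        send_commit(bundles)                    #    against the raylets
    except Exception:
        for bundle in bundles:
            cluster_resources.add_back(bundle)  # roll back on failure
        raise
```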
Co-authored-by: 黑驰 <senlin.zsl@antgroup.com>
Add an API to get the node ID of this worker; see usage:
```java
UniqueId currNodeId = Ray.getRuntimeContext().getCurrentNodeId();
```
This is needed for a requirement from Ray Serve.
## Why are these changes needed?
This PR adds data truncation when there are more than N entries. The policy is as follows:
- By default, we return at most 100 entries. Users can adjust this value, but it cannot be increased beyond 10K.
- By default, all internal RPCs truncate data if it exceeds 10K entries.
- For distributed sources, we query each source with a 10K limit and apply the limit again at the end.
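A sketch of the distributed truncation policy under these assumptions (plain Python, names illustrative):
```python
# Illustrative sketch: cap per-source queries and cap the merged result again.
DEFAULT_LIMIT = 100          # default number of entries returned to the user
MAX_LIMIT = 10_000           # hard cap; also the internal per-RPC truncation limit

def list_entries(sources, user_limit=DEFAULT_LIMIT):
    limit = min(user_limit, MAX_LIMIT)
    merged = []
    for query in sources:
        # Each distributed source is asked for at most MAX_LIMIT entries.
        merged.extend(query(limit=MAX_LIMIT))
    # Apply the user-facing limit once more after merging.
    return merged[:limit]
```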
## Related issue number
Closes https://github.com/ray-project/ray/issues/25984#issue-1279280673
Part of https://github.com/ray-project/ray/issues/25718#issue-1268968400
Allow starting actors in a different namespace instead of the driver's namespace.
Usage is simple:
```java
// Java has no named arguments; the driver's namespace is configured via a
// system property before Ray.init().
System.setProperty("ray.job.namespace", "a");
Ray.init();
// The named actor "myActor" will start in namespace "b" instead of the driver's.
ActorHandle<A> a = Ray.actor(A::new).setName("myActor", "b").remote();
```
Co-authored-by: Hao Chen <chenh1024@gmail.com>
Why are these changes needed?
This is to address false alarms about subprocesses exiting when they are killed by `ray stop` with SIGTERM.
What has been changed?
Added signal handlers for some of the subprocesses:
- dashboard (head)
- log monitor
- ray client server
Changed the --block semantics and prompt messages.
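The signal-handler part might look roughly like this in each Python subprocess (a sketch, not the exact code):
```python
import signal
import sys

def sigterm_handler(signum, frame):
    # `ray stop` sends SIGTERM; exit cleanly so this is not reported as an
    # unexpected subprocess death.
    sys.exit(0)

signal.signal(signal.SIGTERM, sigterm_handler)
```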
Related issue number
Closes #25518
A detached Java actor is not working (the actor dies after the driver exits) when creating a Java actor with the ActorLifetime.DETACHED option.
Co-authored-by: sunkunjian1 <sunkunjian1@jd.com>
Why are these changes needed?
Fixes the check failure:
| 2022-06-21 19:14:10,718 WARNING worker.py:1737 -- A worker died or was killed while executing a task by an unexpected system error. To troubleshoot the problem, check the logs for the dead worker. RayTask ID: ffffffffffffffff7cc1d49b6d4812ea954ca19a01000000 Worker ID: 9fb0f63d84689c6a9e5257309a6346170c827aa7f970c0ee45e79a8b Node ID: 2d493b4f39f0c382a5dc28137ba73af78b0327696117e9981bd2425c Worker IP address: 172.18.0.3 Worker port: 35883 Worker PID: 31945 Worker exit type: SYSTEM_ERROR Worker exit detail: Worker unexpectedly exits with a connection error code 2. End of file. There are some potential root causes. (1) The process is killed by SIGKILL by OOM killer due to high memory usage. (2) ray stop --force is called. (3) The worker is crashed unexpectedly due to SIGSEGV or other unexpected errors.
| (HTTPProxyActor pid=31945) [2022-06-21 19:14:10,710 C 31945 31971] pb_util.h:202: Check failed: death_cause.context_case() == ContextCase::kActorDiedErrorContext
| (HTTPProxyActor pid=31945) *** StackTrace Information ***
| (HTTPProxyActor pid=31945) ray::SpdLogMessage::Flush()
| (HTTPProxyActor pid=31945) ray::RayLog::~RayLog()
| (HTTPProxyActor pid=31945) ray::core::CoreWorker::HandleKillActor()
| (HTTPProxyActor pid=31945) std::_Function_handler<>::_M_invoke()
| (HTTPProxyActor pid=31945) EventTracker::RecordExecution()
| (HTTPProxyActor pid=31945) std::_Function_handler<>::_M_invoke()
| (HTTPProxyActor pid=31945) boost::asio::detail::completion_handler<>::do_complete()
| (HTTPProxyActor pid=31945) boost::asio::detail::scheduler::do_run_one()
| (HTTPProxyActor pid=31945) boost::asio::detail::scheduler::run()
| (HTTPProxyActor pid=31945) boost::asio::io_context::run()
| (HTTPProxyActor pid=31945) ray::core::CoreWorker::RunIOService()
| (HTTPProxyActor pid=31945) execute_native_thread_routine
| (HTTPProxyActor pid=31945)
| (HTTPProxyActor pid=31982) INFO: Started server process [31982]
NOTE: This is a temporary fix. The root cause is that there's a path that doesn't properly report the death cause (when this RPC is triggered by gcs_actor_scheduler). This should be addressed separately to improve exit observability.
Since this is intended to be picked for 1.13.1, I only added the minimal fix.
In [PR](https://github.com/ray-project/ray/pull/24764) we moved the reconnection logic to GcsRPCClient. In case of a GCS failure, we queue the requests and resend them once GCS is back.
This actually breaks requests with a timeout, because now the request will be queued and never get a response. This PR fixes it.
Every request is stored together with the time at which it's supposed to time out. When GCS is down, we check the queued requests, and if one has timed out, we reply immediately with a Timeout error message.
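A minimal sketch of that deadline bookkeeping (illustrative Python, not the actual C++ GcsRPCClient):
```python
import heapq
import itertools
import time

# Illustrative only: pending requests are kept with their absolute deadline so
# that, while GCS is down, anything past its deadline gets a Timeout reply
# instead of sitting in the retry queue forever.
class PendingRequests:
    def __init__(self):
        self._heap = []
        self._counter = itertools.count()  # tie-breaker for equal deadlines

    def add(self, request, timeout_s, reply_callback):
        deadline = time.monotonic() + timeout_s
        heapq.heappush(self._heap, (deadline, next(self._counter), request, reply_callback))

    def expire_timed_out(self):
        # Called periodically while GCS is down (and again on reconnect).
        now = time.monotonic()
        while self._heap and self._heap[0][0] <= now:
            _, _, request, reply_callback = heapq.heappop(self._heap)
            reply_callback("Timeout: GCS is unavailable")
```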
Ray (on K8s) fails silently when running out of disk space.
Today, when running a script that has a large amount of object spilling, if the disk runs out of space then Kubernetes will silently terminate the node. Autoscaling will kick in and replace the dead node. There is no indication that there was a failure due to disk space.
Instead, we should fail tasks with a good error message when the disk is full.
We monitor the disk usage; when the node's disk usage grows over a predefined capacity (e.g., 90%), we fail new tasks/actors/object puts that allocate new objects.
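A rough sketch of such a check (plain Python with `shutil.disk_usage`; the 0.90 threshold and the error text are illustrative, not Ray's actual defaults):
```python
import shutil

CAPACITY_THRESHOLD = 0.90  # assumed threshold; configurable in practice

def disk_usage_fraction(path: str) -> float:
    usage = shutil.disk_usage(path)
    return usage.used / usage.total

def check_can_create_object(spill_path: str) -> None:
    frac = disk_usage_fraction(spill_path)
    if frac > CAPACITY_THRESHOLD:
        # Fail the allocation with an actionable message instead of letting
        # the node silently run out of disk and get terminated.
        raise RuntimeError(
            f"{spill_path} is {frac:.0%} full (over {CAPACITY_THRESHOLD:.0%}); "
            "object creation will fail if spilling is required."
        )
```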
Task/actor/object summary
Tasks: group by the function name. In the future, we will also allow grouping by task_group.
Actors: group by the actor class name. In the future, we will also allow grouping by actor_group.
Objects: group by callsite. In the future, we will allow grouping by reference type or task state.
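The grouping itself is straightforward; for example (illustrative Python over hypothetical state records, not the actual API output):
```python
from collections import Counter

# Hypothetical task records as returned by the state API.
tasks = [
    {"func_name": "train_step", "state": "RUNNING"},
    {"func_name": "train_step", "state": "FINISHED"},
    {"func_name": "preprocess", "state": "RUNNING"},
]

# Task summary: group by function name (actors by class name, objects by callsite).
summary = Counter(task["func_name"] for task in tasks)
print(summary)  # Counter({'train_step': 2, 'preprocess': 1})
```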
Enable checking of the ray core module, excluding serve, workflows, and tune, in ./ci/lint/check_api_annotations.py. This required moving many files to ray._private and associated fixes.
This PR ensures that GCS keeps the IP and PID information about a job so that it can be used to find the job's logs in the dashboard after the job terminates.
@alanwguo will handle any dashboard work in a separate PR.
Co-authored-by: Alex Wu <alex@anyscale.com>
When objects that are spilled or being spilled are freed, we queue a request to delete them. We only clear this queue when additional objects get freed, so GC for spilled objects can fall behind when there is a lot of concurrent spilling and deletion (as in Datasets push-based shuffle). This PR fixes this by clearing the deletion queue whenever the backlog is greater than the configured batch size. It also fixes some accounting bugs for how many bytes are pinned vs. pending spill.
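A minimal sketch of the flush condition (illustrative Python; the real logic is in the C++ local object manager):
```python
# Illustrative only.
def delete_spilled_files(urls):
    print(f"deleting {len(urls)} spilled files")  # stand-in for the real deletion RPC

class SpilledObjectDeleter:
    def __init__(self, batch_size: int):
        self.batch_size = batch_size
        self.deletion_queue = []

    def on_object_freed(self, spilled_url: str):
        self.deletion_queue.append(spilled_url)
        # Previously the queue was only drained when more objects were freed,
        # so it could fall arbitrarily far behind; now a large enough backlog
        # triggers a flush on its own.
        if len(self.deletion_queue) >= self.batch_size:
            self.flush()

    def flush(self):
        batch, self.deletion_queue = self.deletion_queue, []
        delete_spilled_files(batch)
```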
Related issue number
Closes#25467.
When GCS restarts, it recovers the placement groups and makes sure no resources are leaked. The protocol is now:
- GCS sends the committed placement groups to the raylets.
- Each raylet checks whether any worker is using placement group resources that are not in this committed set.
- If there is such a worker, the raylet kills it.
Right now there is a bug that also kills workers whose bundle index equals -1.
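An illustrative sketch of the intended check (my reading of it, in plain Python; this is not the raylet code, and all names are made up):
```python
# Illustrative only.
WILDCARD_BUNDLE_INDEX = -1  # worker uses the PG's wildcard resources, not a specific bundle

def should_kill_worker(worker_pg_id, worker_bundle_index, committed_bundles):
    # committed_bundles: set of (pg_id, bundle_index) pairs reported by GCS.
    if worker_pg_id is None:
        return False  # worker does not use placement group resources at all
    if worker_bundle_index == WILDCARD_BUNDLE_INDEX:
        # The buggy path: a wildcard worker should only be killed if its
        # placement group has no committed bundles at all.
        return not any(pg_id == worker_pg_id for pg_id, _ in committed_bundles)
    return (worker_pg_id, worker_bundle_index) not in committed_bundles
```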
## Why are these changes needed?
When scheduling actors on a placement group, instead of iterating over all nodes in the cluster resources, this optimization directly queries the corresponding nodes by looking at the placement group location index.
This optimization reduces the complexity of the algorithm from O(N) to O(1), where N is the number of nodes. In particular, the more nodes there are in a large-scale cluster, the greater the benefit.
**This PR only optimizes GCS-based scheduling; I will submit a PR for raylet scheduling later.**
At Ant Group, we have already applied this optimization in the GCS scheduling mode and obtained the following performance test results:
1. The average time of selecting nodes is reduced from 330us to 30us, an improvement of about 11x.
2. The total time of creating and executing 12,000 actors drops from 271s to 225s on average, a 17% reduction in time consumption.
More detailed solution information is in the issue.
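A minimal sketch of the idea (illustrative Python; the real index lives in the C++ GCS scheduler): keep an index from placement group ID and bundle index to the node hosting it, so scheduling a PG-bound actor becomes a dictionary lookup instead of a scan over all nodes:
```python
# Illustrative only.
class PgLocationIndex:
    def __init__(self):
        # (pg_id, bundle_index) -> node_id
        self._bundle_to_node = {}

    def on_bundle_committed(self, pg_id, bundle_index, node_id):
        self._bundle_to_node[(pg_id, bundle_index)] = node_id

    def node_for_bundle(self, pg_id, bundle_index):
        # O(1) lookup instead of iterating every node in the cluster.
        return self._bundle_to_node.get((pg_id, bundle_index))

    def nodes_for_pg(self, pg_id):
        # Scans only this PG's bundles, independent of the number of nodes.
        return {n for (pid, _), n in self._bundle_to_node.items() if pid == pg_id}
```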
## Related issue number
[Core/PG/Schedule]Optimize the scheduling performance of actors/tasks with PG specified #23881
Ray's gRPC server wrapper configures a max active call setting for each handler. When the max active call is -1, the handler is supposed to handle an unlimited number of requests concurrently. However, in practice it is often observed that handlers configured with unlimited active calls still handle at most 100 requests concurrently.
This is a result of the existing logic:
At a high level, each gRPC method is associated with a number of ServerCall objects (acting as "tags") in the gRPC completion queue. When there is no tag for a method, the gRPC server threads cannot poll requests for that method from the completion queue. After a request is polled from the completion queue, it is processed by the polling gRPC server thread and then queued to an eventloop.
When a handler is in the "unlimited" mode, it creates a new ServerCall object (tag) before the actual processing. The problem is that new ServerCalls are created on the eventloop instead of the gRPC server thread. When the event loop runs a callback from the gRPC server, the callback creates a new ServerCall object, and it can run the gRPC handler to completion if the handler does not have any async step. So overall, the event loop will not run more callbacks than the initial number of ServerCalls, which is 100 in the "unlimited" mode.
The solution is to create a new ServerCall in the gRPC server thread, before sending the ServerCall to the eventloop.
Running some nightly tests to verify the fix does not introduce instabilities: https://buildkite.com/ray-project/release-tests-branch/builds/652
Also, looking into adding gRPC server / client stress tests with a large number of concurrent requests.
This is the PR to implement ray log to the server side. The PR is continued from #24068.
The PR supports two endpoints:
/api/v0/logs # list logs of the node id filtered by the given glob.
/api/v0/logs/{[file | stream]}?filename&pid&actor_id&task_id&interval&lines # Stream the requested log file. The filename can be inferred from pid/actor_id/task_id.
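For example, the endpoints could be exercised against the dashboard address roughly as below (illustrative only; the parameter spellings follow the list above but may not match the final API exactly, and `<node-id>` is a placeholder):
```python
import requests  # illustrative HTTP client for the endpoints listed above

DASHBOARD = "http://127.0.0.1:8265"

# List log files on a node, filtered by a glob.
resp = requests.get(f"{DASHBOARD}/api/v0/logs",
                    params={"node_id": "<node-id>", "glob": "*.err"})
print(resp.json())

# Stream the last lines of a worker's log, identified by pid.
resp = requests.get(f"{DASHBOARD}/api/v0/logs/file",
                    params={"node_id": "<node-id>", "pid": 1234, "lines": 100})
print(resp.text)
```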
Some tests need to be rewritten; I will do that soon.
As a follow-up to this PR, there will be two PRs:
- A PR to add the actual CLI
- A PR to remove in-memory cached logs and do on-demand queries for actor/worker logs
This PR removes the thread-local core worker instance, which is further architecture cleanup toward removing support for multiple workers in one process.
It also removes the unnecessary `workerId` parameter from JNI.