This PR is pre-work before actually fixing a thread-safety bug within shutdown.
It does the following:
- Add better logging upon core worker shutdown.
- Improve documentation around core worker shutdown.
- Remove unnecessary pointer usage from the periodical runner for a clean destruction order.
- Remove the unnecessary `WaitForShutdown` API and fold it into a single `Shutdown` API (see the sketch below).
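A minimal sketch of the combined call, with a toy worker class standing in for the real CoreWorker (names here are hypothetical, not the actual API):

```python
import threading

class Worker:
    """Toy stand-in for a core worker; not the actual CoreWorker API."""

    def __init__(self):
        self._shutdown_complete = threading.Event()

    def _do_shutdown(self):
        # ... stop IO services, flush logs, disconnect clients ...
        self._shutdown_complete.set()

    def shutdown(self):
        """Single entry point: triggers shutdown and blocks until it finishes,
        replacing a separate WaitForShutdown() call."""
        threading.Thread(target=self._do_shutdown, daemon=True).start()
        self._shutdown_complete.wait()

Worker().shutdown()
```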
When a node dies, the reference table should remove the locations of objects stored on that node. Otherwise, locality-aware scheduling will keep scheduling tasks onto the dead node.
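A conceptual sketch of the intended behavior in plain Python (not the actual reference table code):

```python
# object_locations maps object_id -> set of node_ids holding a copy.
object_locations = {
    "obj_a": {"node_1", "node_2"},
    "obj_b": {"node_2"},
}

def on_node_dead(dead_node_id):
    """Drop the dead node from every object's location set so the
    locality-aware scheduler never targets it."""
    for locations in object_locations.values():
        locations.discard(dead_node_id)

on_node_dead("node_2")
assert object_locations == {"obj_a": {"node_1"}, "obj_b": set()}
```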
When GCS runs as an individual component, its crashes can cause other components to fail.
Here are two main cases covered in this patch:
1. monitor.py will raise an exception when disconnected from GCS.
2. When GCS becomes available later than other components, the missing GCS address key in the KV store can cause those components to fail to start.
This patch fixes these two issues and also increases the Redis connection timeout, which was too small.
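For the second case, a hedged sketch of the retry idea in Python; `get_gcs_address_from_kv` is a placeholder, not the real helper:

```python
import time

def wait_for_gcs_address(get_gcs_address_from_kv, timeout_s=60, interval_s=1):
    """Poll the KV store until the GCS address shows up instead of failing
    immediately when GCS starts later than the other components."""
    deadline = time.time() + timeout_s
    while time.time() < deadline:
        address = get_gcs_address_from_kv()
        if address:
            return address
        time.sleep(interval_s)
    raise TimeoutError("GCS address not available in KV store")

# Example usage with a fake KV lookup that succeeds on the third attempt:
attempts = {"n": 0}
def fake_lookup():
    attempts["n"] += 1
    return "127.0.0.1:6379" if attempts["n"] >= 3 else None

print(wait_for_gcs_address(fake_lookup, timeout_s=10, interval_s=0.01))
```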
Co-authored-by: Mingwei Tian <mwtian@anyscale.com>
An object can get created/pinned twice if the original worker fails mid-task, or when lineage reconstruction is enabled. This can cause inconsistencies in the LocalObjectManager if the second creation races with object spilling and/or object free. For example:
1. Object X gets created, then is pending spill.
2. Object X is freed by the original owner because it goes out of scope.
3. The task that created X gets re-executed due to a failure.
4. The task recreates X, which can now get spilled again while the original copy is also being spilled/freed.
This PR better enforces the state machine for objects managed by the LocalObjectManager. An object can be in one of three states: pinned, pending spill, or spilled. If we receive a free message from the owner, we do not delete the object metadata until all shared-memory and spilled copies of the object are removed.
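A toy Python model of that state machine, just to illustrate the rule that metadata survives until every copy is gone (the real implementation lives in the C++ LocalObjectManager):

```python
from enum import Enum

class ObjectState(Enum):
    PINNED = "pinned"
    PENDING_SPILL = "pending_spill"
    SPILLED = "spilled"

class ManagedObject:
    """Toy model: metadata is only dropped once no copy of the object remains."""

    def __init__(self):
        self.state = ObjectState.PINNED
        self.freed_by_owner = False
        self.has_memory_copy = True
        self.has_spilled_copy = False

    def on_free_message(self):
        # Record the owner's intent, but keep metadata until all copies are gone.
        self.freed_by_owner = True
        self._maybe_delete_metadata()

    def on_spill_complete(self):
        self.state = ObjectState.SPILLED
        self.has_spilled_copy = True

    def on_memory_copy_evicted(self):
        self.has_memory_copy = False
        self._maybe_delete_metadata()

    def on_spilled_copy_deleted(self):
        self.has_spilled_copy = False
        self._maybe_delete_metadata()

    def _maybe_delete_metadata(self):
        if self.freed_by_owner and not self.has_memory_copy and not self.has_spilled_copy:
            print("metadata deleted")

obj = ManagedObject()
obj.on_spill_complete()
obj.on_free_message()            # metadata kept: copies still exist
obj.on_memory_copy_evicted()     # metadata kept: spilled copy remains
obj.on_spilled_copy_deleted()    # now prints "metadata deleted"
```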
Use a separate event loop for pubsub work to provide some isolation from other workloads. There is no benchmark result, but the downside, if any, should not be large.
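The isolation idea, illustrated with Python's asyncio; the actual change is in the C++ core, so this is only an analogy:

```python
import asyncio
import threading

# Dedicated event loop for pubsub callbacks, isolated from the main loop.
pubsub_loop = asyncio.new_event_loop()
threading.Thread(target=pubsub_loop.run_forever, daemon=True).start()

async def handle_pubsub_message(msg):
    print("got pubsub message:", msg)

# Work submitted here cannot be starved by tasks on the main event loop.
future = asyncio.run_coroutine_threadsafe(
    handle_pubsub_message("object location update"), pubsub_loop)
future.result()
```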
This PR changes the enum value `ActorLifetime.DEFAULT` to `ActorLifetime.NON_DETACHED`. In our release versions (<= 1.9.2), `ActorLifetime` had not yet been introduced.
Co-authored-by: Qing Wang <jovany.wq@antgroup.com>
Previously, ObjectRef arguments were handled incorrectly: the ObjectRef itself was serialized and passed as the args buffer to the user function, instead of the resolved RayObject.
That's because CoreWorker is the component responsible for ensuring that all ObjectReferences are resolved and serialized into `RayObject`s at the time of the `task_execution_callback` invocation, not any component downstream of the callback.
This resulted in the following error for large objects, which are not placed into `TaskArg::value` because they exceed 100 KB.
```
C++ exception with description "Invalid: invalid arguments: std::bad_cast" thrown in the test body.
```
This was not caught due to a lack of testing for large objects; such tests have now been added.
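A plain-Python illustration of the difference between passing the ref through and resolving it first (the real resolution happens in the C++ CoreWorker; names here are illustrative):

```python
class ObjectRef:
    def __init__(self, object_id):
        self.object_id = object_id

def resolve(ref):
    """Stand-in for the core worker fetching the underlying value (RayObject)."""
    object_store = {"id-1": b"large payload over 100KB ..."}
    return object_store[ref.object_id]

def build_args_buffer(args):
    # Buggy behavior: pass the serialized ref through unresolved.
    # Fixed behavior: the core worker resolves every ObjectRef into its value
    # before handing the args buffer to the user function.
    return [resolve(a) if isinstance(a, ObjectRef) else a for a in args]

print(build_args_buffer([ObjectRef("id-1"), 42]))
```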
When cleaning up the function table, we use a key prefix to delete the data. But right now the prefix contains binary data, which does not work well with Redis KEYS/SCAN patterns that use `*`.
For example, when the job id increases to 41, cleanup can end up deleting the keys for job 1, which causes new workers to fail to import the function.
This PR uses the hex representation of the job id to avoid this.
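A small illustration of why hex helps; the key layout shown is only an example, not the exact function-table schema:

```python
# Binary job ids can contain bytes such as b'*' or b'[' that are glob
# metacharacters in Redis KEYS/SCAN patterns; a hex prefix avoids that.
binary_job_id = bytes([0x2A])          # b'*': as a pattern, matches everything
hex_job_id = binary_job_id.hex()       # '2a': safe to embed in a pattern

unsafe_pattern = b"RemoteFunction:" + binary_job_id + b":*"
safe_pattern = f"RemoteFunction:{hex_job_id}:*"
print(unsafe_pattern, safe_pattern)
```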
Fix a dlmalloc allocation bug; details in #21310.
* fix dlmalloc bug
* make lint happy
* fix by comment
* use _check_spilled_mb
* add cpp UT
If a task is re-executed on failure, it will deterministically generate the same IDs for any ray.put or .remote task calls because it uses its own task ID as a seed. This can cause problems if those objects conflict with previous versions that still exist in the cluster.
This PR adds the execution attempt number to the current task ID seed. This avoids collisions with any ObjectIDs generated by the previous execution attempt of the task.
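A hedged sketch of the seeding idea; the hashing and ID format here are illustrative, not Ray's actual ID layout:

```python
import hashlib

def deterministic_object_id(task_id, attempt_number, put_index):
    """Derive a deterministic ID: the same task attempt regenerates the same IDs,
    but different execution attempts never collide."""
    seed = f"{task_id}:{attempt_number}:{put_index}".encode()
    return hashlib.sha1(seed).hexdigest()[:28]

first = deterministic_object_id("task-abc", attempt_number=0, put_index=1)
retry = deterministic_object_id("task-abc", attempt_number=1, put_index=1)
assert first != retry  # re-execution no longer reuses the old object ID
```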
Co-authored-by: Clark Zinzow <clarkzinzow@gmail.com>
In Ray, functions are exported to the function table at runtime, but they are not cleaned up after use. This PR garbage collects the resource when there is no job or detached actor referencing it.
Ideally, the function table import/export feature should move into core, so a GCS function manager is introduced; currently it is used for reference counting only.
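A toy sketch of the reference-counting idea (class and method names are hypothetical, not the actual GCS function manager API):

```python
class FunctionManager:
    """Toy reference counter: function-table entries for a job are removed
    once no job or detached actor references them."""

    def __init__(self):
        self._ref_counts = {}   # job_id -> number of referencing jobs/detached actors

    def add_reference(self, job_id):
        self._ref_counts[job_id] = self._ref_counts.get(job_id, 0) + 1

    def remove_reference(self, job_id):
        self._ref_counts[job_id] -= 1
        if self._ref_counts[job_id] == 0:
            del self._ref_counts[job_id]
            self._remove_exported_functions(job_id)

    def _remove_exported_functions(self, job_id):
        print(f"cleaning up function table entries for job {job_id}")

fm = FunctionManager()
fm.add_reference("job_1")
fm.add_reference("job_1")     # e.g. a detached actor created by the same job
fm.remove_reference("job_1")  # still referenced, nothing cleaned
fm.remove_reference("job_1")  # prints the cleanup message
```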
External Redis should still be supported with GCS bootstrapping, to avoid breaking users.
In GCS mode, some logic is removed for external Redis:
- Printing external Redis addresses to the terminal: hard to implement across `ray start`, `ray.init()`, and the Ray cluster util.
- Starting local Redis if external Redis is unavailable: failing loudly here seems more appropriate.
Also, re-enable a few tests that restart GCS in GCS bootstrapping mode, by using external Redis for KV storage.
In some cases, a task added to `running_tasks` is never removed and introduces wait time for all subsequent tasks due to the worker cap. One such case is lease request cancellation: the request is cancelled after `PopWorker` is called, and the task is never removed from `running_tasks`.
I tried reproducing the placement group mini integration failure from this PR: https://github.com/ray-project/ray/pull/21216, but I failed to do so. (This was the only test that became flaky when we turned on the flag last time.)
I tried:
- Running `tests:test_placement_group_mini_integration` 5 times instead of 3 (the default).
- Re-running the PR 3 times.
So I think it is worth trying to re-enable it again.
CoreWorker hangs before exiting if GCS exits first, due to incorrect destruction ordering. This PR fixes that: it stops the GCS client first and then joins the thread.
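A rough Python analogy of the fixed ordering, with a queue-backed thread standing in for the worker's IO service:

```python
import queue
import threading

work_queue = queue.Queue()

def io_service():
    # Stand-in for the worker's IO thread: drain work until a stop sentinel.
    while work_queue.get() is not None:
        pass

io_thread = threading.Thread(target=io_service)
io_thread.start()

def shutdown():
    # Stop the GCS client first so nothing new is posted to the IO service,
    # then signal the IO thread and join it. Joining before stopping the
    # GCS client is what could hang when GCS had already exited.
    # gcs_client.Disconnect()  # placeholder, not the real call
    work_queue.put(None)
    io_thread.join()

shutdown()
```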
This PR moves the internal KV namespace logic into C++ to reduce the logic in Python, for the following reasons:
- Internal KV is used cross-language, so moving it to C++ lets all languages benefit.
- For https://github.com/ray-project/ray/issues/8822 we need to delete resources in GCS when a job finishes.
One extra field is also added to the del request so that we can delete by prefix instead of only by an exact key (see the sketch below).
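A toy model of the prefix-delete behavior the new field enables (not the actual C++ API):

```python
def kv_del(store, key, del_by_prefix=False):
    """Delete one key, or every key that starts with `key` when del_by_prefix is set."""
    if del_by_prefix:
        for k in [k for k in store if k.startswith(key)]:
            del store[k]
    else:
        store.pop(key, None)

store = {"job:1:a": 1, "job:1:b": 2, "job:2:a": 3}
kv_del(store, "job:1:", del_by_prefix=True)
assert store == {"job:2:a": 3}
```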
This PR turns worker capping on by default. Note that it uncovers a couple of faulty tests, which are fixed here.
Co-authored-by: Alex Wu <alex@anyscale.com>
- This PR moves the `ObjectManager::Wait` related logic to a separate WaitManager class.
- Fix the wait hang issue by not relying on async object location notifications; instead, check whether the wait is complete when the local object is added, at which point the object is guaranteed to be local (see the sketch below).
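A toy Python model of the approach: completion is checked when an object becomes local rather than on location notifications (the real WaitManager is C++; names here are illustrative):

```python
class WaitRequest:
    """Toy wait request: completes once `num_required` of its objects are local."""

    def __init__(self, object_ids, num_required, on_complete):
        self.pending = set(object_ids)
        self.ready = set()
        self.num_required = num_required
        self.on_complete = on_complete

class WaitManager:
    def __init__(self):
        self.requests = []

    def wait(self, object_ids, num_required, on_complete):
        self.requests.append(WaitRequest(object_ids, num_required, on_complete))

    def handle_object_local(self, object_id):
        # Completion is checked here, when the object is added locally and is
        # guaranteed to be local, instead of on an async location notification.
        for req in list(self.requests):
            if object_id in req.pending:
                req.pending.discard(object_id)
                req.ready.add(object_id)
                if len(req.ready) >= req.num_required:
                    self.requests.remove(req)
                    req.on_complete(req.ready)

wm = WaitManager()
wm.wait({"x", "y"}, num_required=1, on_complete=print)
wm.handle_object_local("x")   # prints {'x'}
```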
We need more than just the ray_namespace config of a job. In this PR, we cache the job_configs instead of ray_namespaces so that other PRs can use them (for example, #21249 needs the num_java_worker_pre_process item).
Also, before this PR the ray_namespaces_ cache was never cleared; this PR clears it.
`PublisherClient` is a more reasonable name than `SubscriberClient` since XClient means ‘client used to access X’, like GcsClient.
Besides, in the current codebase we already call this client `publisher_client` (lines 329/333), while the actual class name is `SubscriberClient`, which is inconsistent.
a8d7897a56/src/ray/pubsub/subscriber.cc (L326-L339)
This PR handles this comment: https://github.com/ray-project/ray/pull/20903#discussion_r772635662
The PR
- Unifies the multiple actor-died errors into a single schema. (Runtime env and creation task exceptions cannot be unified.)
- Improves each actor error message to include more metadata.
- Includes actor information in the actor death cause.
After this change, in GCS bootstrapping mode Redis no longer starts and `address` is treated as the GCS address of the Ray cluster.
Co-authored-by: Yi Cheng <chengyidna@gmail.com>
Co-authored-by: Yi Cheng <74173148+iycheng@users.noreply.github.com>
It seems that the `StealTasks` RPC is no different from other common RPC methods and should be implemented with the `VOID_RPC_CLIENT_METHOD` macro. We found this when merging the code into our internal codebase.
We add an enum class `ActorLifetime` to indicate the lifetime of an actor. In this PR, we also add the necessary API to create an actor with a specified lifetime.
Currently, it has 2 values: detached and default.
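For illustration, specifying the lifetime at actor creation time through the existing Python surface (`lifetime="detached"`); the PR itself adds the equivalent cross-language API:

```python
import ray

ray.init()

@ray.remote
class Counter:
    def __init__(self):
        self.n = 0

    def incr(self):
        self.n += 1
        return self.n

# Default lifetime: the actor is tied to the job that created it.
default_actor = Counter.remote()

# Detached lifetime: the actor outlives the job that created it.
detached_actor = Counter.options(name="counter", lifetime="detached").remote()
```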
These dependencies are widely used:
- com.google.common
- com.google.protobuf
- com.google.thirdparty
So we need to shade them to avoid conflicts with jars introduced by users.
In this PR, we introduce a `bazel_jar_jar` rule for doing this and also shade them in the Maven pom files.
This is part of #21129
This PR covers the cpp/ray part of the bootstrap. Some updates:
- Remove unused functions/tests.
- Some API updates.
Co-authored-by: mwtian <81660174+mwtian@users.noreply.github.com>