In some cases, a task that is added to `running_tasks` is never removed, and because of the worker cap it introduces wait time for all subsequent tasks. One such case is lease request cancellation: the request is cancelled after `PopWorker` is called, so the task is never removed from `running_tasks`.
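A toy sketch of the bookkeeping (all names here are illustrative, based on the description rather than Ray's actual worker pool code): the cancellation path must erase the task, or the worker-cap slot leaks.

```cpp
#include <string>
#include <unordered_set>

class WorkerPoolSketch {
 public:
  void PopWorker(const std::string &task_id) {
    // The task starts counting against the worker cap here.
    running_tasks_.insert(task_id);
    // ... hand the task to a worker asynchronously ...
  }

  // The needed fix: the lease cancellation path must also erase the
  // task. Otherwise the cancelled task holds a worker-cap slot forever
  // and every task queued behind it waits.
  void OnLeaseRequestCancelled(const std::string &task_id) {
    running_tasks_.erase(task_id);
  }

 private:
  std::unordered_set<std::string> running_tasks_;
};
```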
I tried reproducing the placement group mini integration test failure from this PR: https://github.com/ray-project/ray/pull/21216, but I failed to do so. (This was the only test that became flaky when we turned on the flag last time.)
I tried:
- Running tests:test_placement_group_mini_integration 5 times instead of 3 (the default).
- Re-running the PR 3 times.
So I think it is worth trying to re-enable it.
CoreWorker hangs before exiting if the GCS exits first, due to incorrect destruction ordering. This PR fixes that: it stops the GCS client first and then joins the thread.
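A minimal sketch of the shutdown ordering described (type and member names are stand-ins, not the actual CoreWorker API):

```cpp
#include <memory>
#include <thread>

// Minimal stand-in; the real GcsClient type differs.
struct GcsClientStub {
  void Disconnect() { /* stop issuing and serving GCS RPCs */ }
};

class CoreWorkerSketch {
 public:
  ~CoreWorkerSketch() {
    // Stop the GCS client first so the IO thread can't block on an
    // RPC to a GCS that has already exited...
    gcs_client_->Disconnect();
    // ...and only then join the thread. Joining before disconnecting
    // is the incorrect ordering that caused the hang.
    if (io_thread_.joinable()) {
      io_thread_.join();
    }
  }

 private:
  std::unique_ptr<GcsClientStub> gcs_client_ = std::make_unique<GcsClientStub>();
  std::thread io_thread_;
};
```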
This PR moves the internal KV namespace logic into C++ to reduce the amount of logic in Python, for the following reasons:
- Internal KV is used cross-language, so we have to move it to C++ so that all languages can benefit.
- For https://github.com/ray-project/ray/issues/8822, we need to delete resources in the GCS when a job finishes.
One extra field is also added to the delete operation, so that we can delete by prefix instead of only by exact key.
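A sketch of what the extended delete interface might look like (the exact C++ signature is an assumption; the point is the added delete-by-prefix flag):

```cpp
#include <string>

// Illustrative interface only; the actual declaration may differ.
class InternalKV {
 public:
  // `del_by_prefix` is the extra field described above: when true,
  // every key in `ns` starting with `key` is deleted; when false,
  // only the exact key is.
  virtual void Del(const std::string &ns,
                   const std::string &key,
                   bool del_by_prefix) = 0;
  virtual ~InternalKV() = default;
};
```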
This PR turns worker capping on by default. Note that there are a couple of faulty tests that this uncovers, which are fixed here.
Co-authored-by: Alex Wu <alex@anyscale.com>
- This PR moves the `ObjectManager::Wait`-related logic into a separate WaitManager class.
- Fixes the wait hang issue by not relying on the async object location notification; instead, we check whether a wait is complete when a local object is added, at which point the object is guaranteed to be local.
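A sketch of the idea (the class surface here is assumed from the description, not the real WaitManager):

```cpp
#include <string>
#include <unordered_set>

class WaitManagerSketch {
 public:
  // Called whenever an object becomes local. At this moment the object
  // is guaranteed to be local, so completing waits here cannot race
  // with, or miss, an async location notification.
  void HandleObjectLocal(const std::string &object_id) {
    local_objects_.insert(object_id);
    CheckWaitsComplete();
  }

 private:
  void CheckWaitsComplete() {
    // Fire the callback of every pending Wait whose required number
    // of objects is now local.
  }

  std::unordered_set<std::string> local_objects_;
};
```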
We need more than just the ray_namespace config of a job. In this PR, we cache the job_configs instead of the ray_namespaces, so that other PRs can use them (for example, PR #21249 needs the num_java_worker_per_process item).
Also, before this PR the ray_namespaces_ cache was never cleared; this PR clears the cache.
`PublisherClient` is a more reasonable name than `SubscriberClient`, since `XClient` means "client used to access X", like `GcsClient`.
Besides, the current codebase already calls this client `publisher_client` (lines 329/333) while the actual class name is `SubscriberClient`, which is inconsistent:
a8d7897a56/src/ray/pubsub/subscriber.cc (L326-L339)
This PR handles this comment: https://github.com/ray-project/ray/pull/20903#discussion_r772635662
This PR:
- Unifies the multiple actor-died errors into a single schema. (The runtime env and creation task exception errors cannot be unified.)
- Improves each actor error message to include more metadata.
- Includes actor information in the actor death cause.
After this change, in GCS bootstrapping mode Redis no longer starts and `address` is treated as the GCS address of the Ray cluster.
Co-authored-by: Yi Cheng <chengyidna@gmail.com>
Co-authored-by: Yi Cheng <74173148+iycheng@users.noreply.github.com>
It seems that the `StealTasks` RPC is no different from other common RPC methods and should be implemented with the `VOID_RPC_CLIENT_METHOD` macro. We found this while merging the code into our internal codebase.
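A toy, self-contained illustration of the pattern (this is not Ray's actual macro definition, which lives in the RPC client headers): a macro stamps out uniform "void" client methods instead of each one being hand-written.

```cpp
#include <functional>
#include <iostream>
#include <string>

// Toy version of the pattern only.
#define VOID_RPC_CLIENT_METHOD(METHOD)                      \
  void METHOD(const std::string &request,                   \
              const std::function<void(bool)> &callback) {  \
    SendRpc(#METHOD, request, callback);                    \
  }

class RpcClientSketch {
 public:
  VOID_RPC_CLIENT_METHOD(StealTasks)    // the method in question
  VOID_RPC_CLIENT_METHOD(ReturnWorker)  // generated the same way

 private:
  void SendRpc(const std::string &method,
               const std::string &request,
               const std::function<void(bool)> &callback) {
    std::cout << "rpc: " << method << "(" << request << ")\n";
    callback(/*ok=*/true);
  }
};
```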
We add an enum class `ActorLifetime` to indicate the lifetime of an actor. In this PR, we also add the necessary API to create an actor with a specified lifetime.
Currently, it has two values: detached and default.
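A minimal sketch of the shape of such an API (the exact surface is an assumption; the two values follow the description):

```cpp
// Illustrative only.
enum class ActorLifetime {
  DEFAULT,   // the actor's lifetime is tied to its owner
  DETACHED,  // the actor outlives its creator until explicitly killed
};

struct ActorCreationOptionsSketch {
  ActorLifetime lifetime = ActorLifetime::DEFAULT;
};
```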
These dependencies are widely used:
- com.google.common
- com.google.protobuf
- com.google.thirdparty
So we need to shade them to avoid conflicts with jars introduced by users.
In this PR, we introduce a `bazel_jar_jar` rule to do this, and also shade them in the Maven POM files.
This is part of #21129
This PR covers the cpp/ray part of the bootstrap, with some updates there:
- remove unused functions/tests
- some API updates
Co-authored-by: mwtian <81660174+mwtian@users.noreply.github.com>
Currently, the URI reference logic in the raylet is:
- For the job level, add a URI reference when the job starts and remove it when the job finishes.
- For the actor level, add and remove URI references for detached actors only.
In this PR, the logic is optimized to:
- For the job level, first check whether the runtime env should be installed eagerly; if so, add or remove the URI reference.
- For the actor level:
  * First, add a URI reference when starting the worker process, to avoid the runtime env being GC'd before the worker registers.
  * Second, add a URI reference for each worker of the worker process. We remove the reference when the worker disconnects.
- Besides, we move the instance of `RuntimeEnvManager` from `node_manager` to `worker_pool`.
- Enable the `test_actor_level_gc` test and add some tests in Python and in the worker pool tests.
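A toy reference counter mirroring the add/remove pairing described above (illustrative names; the real logic lives in `RuntimeEnvManager`, now owned by the worker pool rather than the node manager):

```cpp
#include <string>
#include <unordered_map>

class UriRefCounterSketch {
 public:
  // Taken before the worker process starts, so the runtime env can't
  // be GC'd before the worker registers.
  void AddReference(const std::string &uri) { ++refs_[uri]; }

  // Released when a worker disconnects; the URI becomes collectible
  // once no process or worker references it.
  void RemoveReference(const std::string &uri) {
    auto it = refs_.find(uri);
    if (it != refs_.end() && --it->second == 0) {
      refs_.erase(it);
      // ... URI now unreferenced; eligible for garbage collection ...
    }
  }

 private:
  std::unordered_map<std::string, int> refs_;
};
```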
Previously, GcsClient accepted only Redis. To make it work without Redis, we need to be able to pass the GCS address to the GCS client as well.
In this PR, we add GCS-related info into GcsClientOptions so that we can connect to the GCS directly with a GCS address.
This PR is part of GCS bootstrap. In a following PR, we'll add functionality to set the correct GcsClientOptions based on flags.
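An illustrative sketch of the extended options (the actual `GcsClientOptions` fields may differ): alongside the legacy Redis fields, a direct GCS address can now be supplied.

```cpp
#include <string>

// Illustrative shape only.
struct GcsClientOptionsSketch {
  // Legacy bootstrap path: reach the GCS through Redis.
  std::string redis_address;
  int redis_port = 0;

  // New path added here: connect to the GCS directly, no Redis.
  std::string gcs_address;
  int gcs_port = 0;
};
```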
The current resource reporting is broken in OSS; revert the change. For example, for a 10GB-memory object store it reported:
InitialConfigResources: {node:172.31.45.118: 1.000000}, {object_store_memory: 468605759.960938 GiB}
This PR implements gRPC timeouts for various blocking RPCs.
Previously, the timeout with a promise didn't work properly because the client didn't cancel the timed-out RPCs. This PR implements the RPC timeout properly.
This PR supports:
- Blocking RPCs for core APIs: creating / getting / removing actors + PGs.
- Internal KV ops.
The global state accessor also has infinite blocking calls which we need to fix, but fixing them requires a huge refactoring, so that will be done in a separate PR.
The same goes for the placement group calls (they will be done in a separate PR).
Also, this means we can have scenarios where the client receives a DEADLINE_EXCEEDED error but the handler is still invoked. Right now, this is not handled correctly in Ray; we should start thinking about how to handle these scenarios better.
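For reference, this is the core client-side mechanism gRPC offers for timeouts (a generic sketch, not Ray's exact implementation): set a deadline on the `ClientContext` so the call fails with DEADLINE_EXCEEDED instead of blocking forever.

```cpp
#include <chrono>

#include <grpcpp/grpcpp.h>

// Set a per-call deadline before issuing the RPC. After the deadline
// passes, the client sees DEADLINE_EXCEEDED; as noted above, the
// server-side handler may still run even though the client timed out.
void SetRpcTimeout(grpc::ClientContext &context, int64_t timeout_ms) {
  context.set_deadline(std::chrono::system_clock::now() +
                       std::chrono::milliseconds(timeout_ms));
}
```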
Uses a direct `pip install` instead of creating a conda env to make pip installs incremental to the cluster environment.
Separates the handling of `pip` and `conda` dependencies.
The new `pip` approach still works if only the base Ray is installed on the cluster and the user specifies libraries like "ray[serve]" in the `pip` field. The mechanism is as follows:
- We don't actually want to reinstall ray via pip, since this could lead to version mismatch issues. Instead, we want to use the Ray that's already installed in the cluster.
- So if "ray" was included by the user in the pip list, remove it
- If a library "ray[serve]" or "ray[tune, rllib]" was included in the pip list, remove it and replace it by its dependencies (e.g. "uvicorn", "requests", ..)
Co-authored-by: architkulkarni <arkulkar@gmail.com>
Co-authored-by: architkulkarni <architkulkarni@users.noreply.github.com>
Before this PR, GcsActorManager::CreateActor() would replace an actor's namespace with the
actor owner job's namespace, even if the actor was created by the user with an explicitly
specified namespace. But in named_actors_, the actor is registered under the user-specified
namespace by GcsActorManager::RegisterActor before CreateActor() is called. So
GcsActorManager::DestroyActor failed to find the actor in named_actors_ under the owner
job's namespace to remove it; hence reusing an actor name in the same namespace failed,
because the actor of the same name was never removed from named_actors_ by
GcsActorManager::DestroyActor.
Issue: #20611
Currently, when a GCS RPC fails with a gRPC unavailable error because the GCS is dead, it retries forever.
b3a9d4d87d/src/ray/rpc/gcs_server/gcs_rpc_client.h (L57)
It takes about 10 minutes to detect GCS server failure, meaning that if the GCS is dead, users will only notice after 10 minutes.
This can easily create the confusing impression that the cluster is hanging (since users are not that patient). Also, since the GCS is not fault tolerant in OSS now, 10 minutes is too long a timeout for detecting GCS death.
This PR changes the value to 60 seconds, which I believe is much more reasonable (since this is the same value as our blocking RPC call timeout).
Resubmit of PR https://github.com/ray-project/ray/pull/19936.
I've figured out that the test case `//rllib:tests/test_gpus::test_gpus_in_local_mode` failed due to a deadlock in local mode.
In local mode, if the user code submits another task during the execution of the current task, `CoreWorker::actor_task_mutex_` may cause a deadlock.
The solution is quite simple: release the lock before executing the task in local mode.
In commit 7c2f61c76c:
1. Released the lock in local mode to fix the bug. @scv119
2. Added `test_local_mode_deadlock` to cover the case. @rkooo567
3. Left a trivial change in `rllib/tests/test_gpus.py` to make `RAY_CI_RLLIB_DIRECTLY_AFFECTED` take effect.
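A minimal sketch of the fix pattern (names are stand-ins for the CoreWorker members):

```cpp
#include <mutex>

std::mutex actor_task_mutex_;  // stand-in for CoreWorker::actor_task_mutex_

void ExecuteTaskLocalMode() {
  // Runs user code inline; that code may submit another actor task,
  // which re-enters SubmitActorTask below.
}

void SubmitActorTask() {
  std::unique_lock<std::mutex> lock(actor_task_mutex_);
  // ... bookkeeping that genuinely needs the lock ...

  // The fix: drop the lock before executing inline. Holding it here
  // deadlocks when the task's user code submits another task.
  lock.unlock();
  ExecuteTaskLocalMode();
}
```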
PR #19014 introduced the idea of a StartupToken to uniquely identify a worker via a counter. This PR:
- Returns both the Process and the StartupToken from StartWorkerProcess (previously only the Process was returned).
- Changes the starting_workers_to_tasks map to index via the StartupToken, which seems to fix the Windows failures.
- Unskips the Windows tests in test_basic_2.py.
It seems once a fix to PR #18167 goes in, the starting_workers_to_tasks map will be removed, which should remove the need for the changes to StartWorkerProcess made in this PR.
Currently, if a local lease request fails due to raylet death, direct_task_transport.cc will retry forever for the driver.
With this PR, we treat gRPC unavailable as a non-retryable error (the assumption is that the local gRPC connection is always reliable, so a gRPC unavailable error indicates that the server is dead) and just fail the task.
Note: this PR doesn't try to address the bigger problem of not crashing the driver when the local raylet dies. We have multiple places in the code that assume the local raylet never fails and have CHECK_STATUS_OK for that. All these places need to be changed so we can properly propagate failures to the user.
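A sketch of the error classification described (the helper name is hypothetical):

```cpp
#include <grpcpp/grpcpp.h>

// For the connection to the *local* raylet, UNAVAILABLE is assumed to
// mean the server process is dead, so the task is failed instead of
// retried; every other status stays retryable.
bool IsLocalLeaseErrorRetryable(const grpc::Status &status) {
  return status.error_code() != grpc::StatusCode::UNAVAILABLE;
}
```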
We erase elements from object_id_refs_ in the method `RemoveLocalReferenceInternal()`, which may cause an iterator invalidation issue.
Note that normally a flat map does not trigger any iterator invalidation on erase unless it triggers a `rehash()`. But in this case, we may remove other elements (not only the one the current iterator points to), so there is still a risk. A defensive pattern is sketched below.
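The pattern, with `std::unordered_map` standing in for the flat map: when visiting one entry can delete *other* entries, defer all erases until the iteration has finished.

```cpp
#include <string>
#include <unordered_map>
#include <vector>

void DecrementAll(std::unordered_map<std::string, int> &object_id_refs) {
  std::vector<std::string> to_erase;
  for (auto &[id, count] : object_id_refs) {
    if (--count == 0) {
      to_erase.push_back(id);  // never erase while iterating
    }
  }
  // Safe: the loop above has finished, so no live iterator can be
  // invalidated by these erases.
  for (const auto &id : to_erase) {
    object_id_refs.erase(id);
  }
}
```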
A worker can crash right after putting its return values into the object store. Then, the owner will receive the worker crashed error, but the return objects will still be in the remote object store. Later, if the task is retried, the worker will crash on [this line](https://github.com/ray-project/ray/blob/master/src/ray/core_worker/transport/direct_actor_transport.cc#L105) because the object already exists.
Another way this can happen is if a task has multiple return values, and one of those return values is transferred to another node. If the task is later re-executed on that node, the task will fail because of the same error.
This PR fixes the crash so that:
1. If an object already exists, we try to pin that copy. Ideally, we should destroy the old copy and create the new one to make sure that metadata like the owner address is in sync, but this is pretty complicated to do right now.
2. If the pinning fails, we store an OBJECT_LOST error to throw to the application.
3. On the raylet, we check whether we already have the object pinned, and only subscribe to the owner's eviction message if the object is not pinned.
4. Also fixes bugs in the analogous case for `ray.put` (previously this would hang, now the application will receive an error if a `ray.put` object already exists).
This is part of Redis removal. In this PR, if `RAY_gcs_storage=memory`, the GCS uses an in-memory table instead of a Redis table.
The config setup has to be moved into GcsServer because with the memory table it's transient.
For the actor channel, GCS clients subscribe to a single actor, but the dashboard subscribes to all actors. This change makes supporting both possible.
Most of the added code is in `integration_test.cc`, which tests the publisher and subscriber together.
Also, adds basic support for dashboard reporter pubsub.
Share the same code for RecordMetrics & DebugString in the cluster task manager.
Both require almost identical (and expensive) operations. This PR makes them share the same `UpdateState` code, which stores the stats in a struct.
Note that we don't update the state when metrics are recorded, because the debug string is consistently called anyway and the state gets updated there.
Ideally, we should update the stats dynamically.
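A sketch of that sharing (all names assumed): one `UpdateState()` fills a stats struct that both the metrics path and the debug string read, instead of each recomputing the aggregates.

```cpp
#include <cstddef>
#include <sstream>
#include <string>

class ClusterTaskManagerSketch {
 public:
  // Reads the stats last stored by UpdateState(); per the note above,
  // it does not refresh them itself.
  void RecordMetrics() const {
    // e.g. emit state_.num_waiting_tasks as a gauge
  }

  std::string DebugString() {
    UpdateState();  // the consistently-called path refreshes the stats
    std::ostringstream out;
    out << "waiting: " << state_.num_waiting_tasks;
    return out.str();
  }

 private:
  struct State {
    size_t num_waiting_tasks = 0;
    // ... other expensive-to-compute aggregates ...
  };

  void UpdateState() { /* walk the queues once and fill state_ */ }

  State state_;
};
```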