Support the ability to specify a default lifetime for actors whose lifetime is not specified at creation time. This is a job-level configuration item.
#### API Change
The Python API looks like:
```python
ray.init(job_config=JobConfig(default_actor_lifetime="detached"))
```
The Java API looks like:
```java
System.setProperty("ray.job.default-actor-lifetime", defaultActorLifetime.name());
Ray.init();
```
One example usage is:
```python
ray.init(job_config=JobConfig(default_actor_lifetime="detached"))
a1 = A.options(lifetime="non_detached").remote() # a1 is a non-detached actor.
a2 = A.remote() # a2 is a detached actor because the job default lifetime is "detached".
```
Co-authored-by: Kai Yang <kfstorm@outlook.com>
Co-authored-by: Qing Wang <jovany.wq@antgroup.com>
The C++ API needs to be able to call Python and Java workers; this PR adds support for calling Python workers. Calling a Python worker is similar to calling a C++ worker: the user passes PyFunction, PyActorClass, and PyActorMethod.
## Calling a Python normal task
```python
#test_cross_language_invocation.py
import ray
@ray.remote
def py_return_input(v):
    return v
```
Calling the Python function from the C++ API:
```c++
auto py_obj1 = ray::Task(ray::PyFunction</*ReturnType*/int>{/*module_name=*/"test_cross_language_invocation",
/*function_name=*/"py_return_input"})
.Remote(42);
EXPECT_EQ(42, *py_obj1.Get());
```
The user needs to fill in the Python module name and function name, then pass the arguments to `Remote`.
The user also needs to specify the return type and argument types of the Python function; these are used for static type checking and to retrieve the result.
## Calling a Python actor task
```python
#test_cross_language_invocation.py
@ray.remote
class Counter(object):
    def __init__(self, value):
        self.value = int(value)

    def increase(self, delta):
        self.value += int(delta)
        return str(self.value)
```
Calling the Python actor from the C++ API:
```c++
// Create python actor
auto py_actor_handle =
ray::Actor(ray::PyActorClass{/*module_name=*/"test_cross_language_invocation", /*class_name=*/"Counter"})
.Remote(1);
EXPECT_TRUE(!py_actor_handle.ID().empty());
// Call python actor task
auto py_actor_ret =
py_actor_handle.Task(ray::PyActorMethod</*ReturnType*/std::string>{/*actor_function_name=*/"increase"}).Remote(1);
EXPECT_EQ("2", *py_actor_ret.Get());
```
The user needs to fill in the Python module name and class name when creating the Python actor.
`PyActorMethod` only needs the function name to be filled in.
This is similar to calling a C++ actor task and likewise has compile-time type checking.
Previously, ObjectRef arguments were handled incorrectly: the object ref itself was serialized instead of the resolved RayObject being passed as the argument buffer to the user function.
CoreWorker is the component responsible for ensuring that all ObjectReferences are resolved and serialized into `RayObject`s by the time the `task_execution_callback` is invoked, not any component downstream of the callback.
This resulted in the following error for large objects, which are not inlined into `TaskArg::value` because they are over 100KB.
```
C++ exception with description "Invalid: invalid arguments: std::bad_cast" thrown in the test body.
```
This was not caught due to a lack of testing for large objects; such a test has now been added.
This PR moves the internal KV namespace logic into C++ to reduce the amount of logic in Python, for the following reasons:
- Internal KV is used cross-language, so it has to live in C++ so that all languages can benefit.
- For https://github.com/ray-project/ray/issues/8822 we need to delete resources in GCS when a job finishes.
One extra field is also added to the Del request so that we can delete by prefix instead of only by an exact key.
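A rough sketch of what the prefix delete enables, using the Python internal KV wrapper. The exact wrapper signature, in particular the `del_by_prefix` keyword, is an assumption here and may differ from the final API:
```python
import ray
from ray.experimental import internal_kv

ray.init()

# Two keys sharing a common prefix in the internal KV store.
internal_kv._internal_kv_put(b"my_prefix:config", b"v1")
internal_kv._internal_kv_put(b"my_prefix:state", b"v2")

# With the extra field on the Del request, one call removes every key
# starting with the given prefix instead of a single exact key.
internal_kv._internal_kv_del(b"my_prefix:", del_by_prefix=True)
```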
With some compilers, the static Ray runtime in the runtime holder may be a new, uninitialized instance inside a dynamic library,
so we need to initialize the runtime holder in the dynamic library to make sure the new instance is valid.
Previously, GcsClient only accepted a Redis address. To make it work without Redis, we need to be able to pass the GCS address to the GCS client as well.
In this PR, we add the GCS-related information to GcsClientOptions so that we can connect to the GCS directly with a GCS address.
This PR is part of GCS bootstrap. In a follow-up PR, we'll add functionality to set the correct GcsClientOptions based on flags.
Resubmit the PR https://github.com/ray-project/ray/pull/19936
I've figured out that the test case `//rllib:tests/test_gpus::test_gpus_in_local_mode` failed due to a deadlock in local mode.
In local mode, if the user code submits another task during the execution of the current task, `CoreWorker::actor_task_mutex_` may cause a deadlock.
The solution is quite simple: release the lock before executing the task in local mode.
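Roughly the pattern that triggered the deadlock, as a minimal sketch; the actual regression test is `test_local_mode_deadlock`, and the actor/task names below are illustrative:
```python
import ray

ray.init(local_mode=True)

@ray.remote
def nested_task():
    return 1

@ray.remote
class Submitter:
    def run(self):
        # Submitting and waiting on another task while this actor task is
        # still executing used to deadlock in local mode, because the actor
        # task mutex was held across the nested submission.
        return ray.get(nested_task.remote())

s = Submitter.remote()
assert ray.get(s.run.remote()) == 1
```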
In the commit 7c2f61c76c:
1. Release the lock in local mode to fix the bug. @scv119
2. `test_local_mode_deadlock` added to cover the case. @rkooo567
3. Left a trivial change in `rllib/tests/test_gpus.py` to make `RAY_CI_RLLIB_DIRECTLY_AFFECTED` take effect.
A worker can crash right after putting its return values into the object store. Then, the owner will receive the worker crashed error, but the return objects will still be in the remote object store. Later, if the task is retried, the worker will crash on [this line](https://github.com/ray-project/ray/blob/master/src/ray/core_worker/transport/direct_actor_transport.cc#L105) because the object already exists.
Another way this can happen is if a task has multiple return values, and one of those return values is transferred to another node. If the task is later re-executed on that node, the task will fail because of the same error.
This PR fixes the crash so that:
1. If an object already exists, we try to pin that copy. Ideally, we should destroy the old copy and create the new one to make sure that metadata like the owner address is in sync, but this is pretty complicated to do right now.
2. If the pinning fails, we store an OBJECT_LOST error to throw to the application.
3. On the raylet, we check whether we already have the object pinned, and only subscribe to the owner's eviction message if the object is not pinned.
4. Also fixes bugs in the analogous case for `ray.put` (previously this would hang, now the application will receive an error if a `ray.put` object already exists).
This PR adds a per-task/actor scheduling strategy; currently the only strategies are PlacementGroupSchedulingStrategy and DefaultSchedulingStrategy.
Going forward, people should use `scheduling_strategy=PlacementGroupSchedulingStrategy(...)` to specify the placement group for an actor/task. The old way will be deprecated.
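A minimal sketch of the new-style usage in Python, assuming the strategy class is exposed as `ray.util.scheduling_strategies.PlacementGroupSchedulingStrategy`:
```python
import ray
from ray.util.placement_group import placement_group
from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy

ray.init(num_cpus=2)

# Reserve a bundle and wait until it is ready.
pg = placement_group([{"CPU": 1}])
ray.get(pg.ready())

@ray.remote(num_cpus=1)
def f():
    return "scheduled inside the placement group"

# New style: pass the placement group through scheduling_strategy
# instead of the old placement_group= option.
ref = f.options(
    scheduling_strategy=PlacementGroupSchedulingStrategy(placement_group=pg)
).remote()
print(ray.get(ref))
```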
## Why are these changes needed?
This is a part of Redis removal. This PR removes the Redis KV usage in the function table.
The rpush-related code is not updated in this PR.
## Related issue number
This PR removes global named actors and global PGs.
I believe these APIs are not widely used in OSS.
CPP part is not included in this PR.
@kfstorm @clay4444 @raulchen Please take a look if this change is reasonable.
IMPORTANT NOTE: This is a Java API change and will introduce backward incompatibility in Java global named actor and global PG usage.
This PR includes:
- Remove `setGlobalName()` and `getGlobalActor()` APIs.
- Remove `getGlobalPlacementGroup()` and `setGlobalPG`.
- Add `getActor(name, namespace)` API.
- Add `getPlacementGroup(name, namespace)` API.
- Update doc pages.
## Why are these changes needed?
This is part of the Redis removal project. In this PR, all direct usage of Redis is removed except for the function table.
The function table will be migrated in the next PR.
## Related issue number
#19443
## Why are these changes needed?
## Related issue number
Final part of #13984
## Checks
- [x] I've run `scripts/format.sh` to lint the changes in this PR.
- [x] I've included any doc changes needed for https://docs.ray.io/en/master/.
- [x] I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
- Testing Strategy
- [x] Unit tests
- [ ] Release tests
- [ ] This PR is not tested :(
## Why are these changes needed?
Currently, `WorkerContext::GetCurrentTaskID()` returns a random task ID in user-created threads, and the returned task ID doesn't include the job ID. In this case, subsequent non-actor tasks, their return values, and objects created by `ray.put()` don't include the job ID either. This makes it hard to find the correct job ID from a task or object ID.
This PR updates the task ID generation code to always encode the job ID.
A side effect of this PR is a change in the probability of task ID collisions in user-created threads, because the job ID part is now fixed. Without this PR: `sqrt(pi * 256 ^ 12 / 2)` ~= 352 trillion tasks before a likely collision. With this PR: `sqrt(pi * 256 ^ 8 / 2)` ~= 5 billion tasks. This should be OK because the job ID part of task IDs in non-user-created threads is always fixed, so user-created threads won't be worse off than non-user-created threads.
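For reference, these estimates follow from the standard birthday-bound approximation `sqrt(pi * N / 2)`, where N is the number of possible random task IDs (12 random bytes without this PR, 8 with it); a quick check:
```python
import math

# Expected number of IDs before a likely collision, by the birthday bound.
def collision_threshold(random_bytes):
    n = 256 ** random_bytes
    return math.sqrt(math.pi * n / 2)

print(f"{collision_threshold(12):.3e}")  # ~3.5e14 (~352 trillion) w/o this PR
print(f"{collision_threshold(8):.3e}")   # ~5.4e9  (~5 billion)    w/ this PR
```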
## Related issue number
## Checks
- [ ] I've run `scripts/format.sh` to lint the changes in this PR.
- [ ] I've included any doc changes needed for https://docs.ray.io/en/master/.
- [ ] I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
- Testing Strategy
- [ ] Unit tests
- [ ] Release tests
- [ ] This PR is not tested :(
## Why are these changes needed?
This PR ports the concurrency groups functionality to asyncio actors in Python.
### API
```python
import ray

@ray.remote(concurrency_groups={"io": 2, "compute": 4})
class AsyncActor:
    def __init__(self):
        pass

    @ray.method(concurrency_group="io")
    async def f1(self):
        pass

    @ray.method(concurrency_group="io")
    def f2(self):
        pass

    @ray.method(concurrency_group="compute")
    def f3(self):
        pass

    @ray.method(concurrency_group="compute")
    def f4(self):
        pass

    def f5(self):
        pass
```
The decorator on the actor class `AsyncActor` declares that this actor has two concurrency groups with the given maximum concurrencies, plus a default concurrency group. Every concurrency group has its own async event loop and Python thread to execute the methods defined in it.
Method `f1` will be invoked in the `io` concurrency group, `f2` in `io`, `f3` in `compute`, and so on.
Note that `f5` and `__init__` will be invoked in the default concurrency group.
In the following call, `f2` will be invoked in the `compute` concurrency group instead, since specifying the group at call time has higher priority:
```python
a.f2.options(concurrency_group="compute").remote()
```
### Implementation
The implementation details are straightforward:
- Before, an asyncio actor had one event loop bound to one Python thread. Now we create one event loop bound to one Python thread for every concurrency group of the asyncio actor.
- Before, we had one fiber state per caller in the asyncio actor. Now we create a FiberStateManager per caller, and the FiberStateManager manages the fiber states for the concurrency groups.
## Related issue number
#16047
* Update build rules and patches for darwin_arm64 platform.
Changes include:
Update the nelhage/rules_boost package from the current version (08/5/2020) to the 5/27/2021 version.
Remove rules_boost-undefine-boost_fallthrough.patch, since BOOST_FALLTHROUGH seems to be defined now.
Minor changes to rules_boost-windows-linkopts.patch to use the default condition to add the -lpthread flag on all platforms.
Add darwin_arm64 config to BUILD files for the civetweb library pulled in via the prometheus dependency.
* Upgrade boost to 1.74.0 from 1.71.0 to match the updated build file for windows.
* Fix ray_cpp_pkg
* Use boost/bind/bind.hpp
boost/bind.hpp and global namespace placeholders are deprecated.
* lint
* Use absl::bind_front when possible. Otherwise, NOLINT
* lint
* lint
* lint
* lint
* more lint
* final lint
* trigger build