This PR adds the API `setRuntimeEnv` for submitting a normal task, for the usage:
```java
RuntimeEnv runtimeEnv =
new RuntimeEnv.Builder()
.addEnvVar("KEY1", "A")
.build();
/// Return `A`
Ray.task(RuntimeEnvTest::getEnvVar, "KEY1").setRuntimeEnv(runtimeEnv).remote().get();
```
1. Support setting environment variables in runtime env for a job, like:
```yaml
ray : {
job : {
runtime-env: {
// Environment variables to be set on worker processes in current job.
"env-vars": {
// key1: "value11"
// key2: "value22"
}
}
}
}
```
It could be set by system properties before `Ray.init()` as well:
```java
System.setProperty("ray.job.runtime-env.env-vars.KEY1", "A");
System.setProperty("ray.job.runtime-env.env-vars.KEY2", "B");
Ray.init();
```
2. Setting environment variables for an actor will overwrite and merge to the environment variables of job.
```java
System.setProperty("ray.job.runtime-env.env-vars.KEY1", "A");
System.setProperty("ray.job.runtime-env.env-vars.KEY2", "B");
Ray.init();
RuntimeEnv runtimeEnv = new RuntimeEnv.Builder().addEnvVar("KEY1", "C").build();
/// actor1 has the env vars: {"KEY1" : "C", "KEY2" : "B"}
ActorHandle<A> actor1 = Ray.actor(A::new).setRuntimeEnv(runtimeEnv).remote();
/// actor2 has the env vars: {"KEY1" : "A", "KEY2" : "B"}
ActorHandle<A> actor2 = Ray.actor(A::new).remote();
```
This PR supports setting actor level env vars for Java worker in runtime env.
General API looks like:
```java
RuntimeEnv runtimeEnv = new RuntimeEnv.Builder()
.addEnvVar("KEY1", "A")
.addEnvVar("KEY2", "B")
.addEnvVar("KEY1", "C") // This overwrites "KEY1" to "C"
.build();
ActorHandle<A> actor1 = Ray.actor(A::new).setRuntimeEnv(runtimeEnv).remote();
```
If `num-java-workers-per-process` > 1, it will never reuse the worker process except they have the same runtime envs.
Co-authored-by: Qing Wang <jovany.wq@antgroup.com>
When a Ray program first creates an ObjectRef (via ray.put or task call), we add it with a ref count of 0 in the C++ backend because the language frontend will increment the initial local ref once we return the allocated ObjectID, then delete the local ref once the ObjectRef goes out of scope. Thus, there is a brief window where the object ref will appear to be out of scope.
This can cause problems with async protocols that check whether the object is in scope or not, such as the previous bug fixed in #19910. Now that we plan to enable lineage reconstruction to automatically recover lost objects, this race condition can also be problematic because we use the ref count to decide whether an object needs to be recovered or not.
This PR avoids these race conditions by incrementing the local ref count in the C++ backend when executing ray.put() and task calls. The frontend is then responsible for skipping the initial local ref increment when creating the ObjectRef. This is the same fix used in #19910, but generalized to all initial ObjectRefs.
This is a re-merge for #21719 with a fix for removing the owned object ref if creation fails.
When a Ray program first creates an ObjectRef (via ray.put or task call), we add it with a ref count of 0 in the C++ backend because the language frontend will increment the initial local ref once we return the allocated ObjectID, then delete the local ref once the ObjectRef goes out of scope. Thus, there is a brief window where the object ref will appear to be out of scope.
This can cause problems with async protocols that check whether the object is in scope or not, such as the previous bug fixed in #19910. Now that we plan to enable lineage reconstruction to automatically recover lost objects, this race condition can also be problematic because we use the ref count to decide whether an object needs to be recovered or not.
This PR avoids these race conditions by incrementing the local ref count in the C++ backend when executing ray.put() and task calls. The frontend is then responsible for skipping the initial local ref increment when creating the ObjectRef. This is the same fix used in #19910, but generalized to all initial ObjectRefs.
Support the ability to specify a default lifetime for actors which are not specified lifetime when creating. This is a job level configuration item.
#### API Change
The Python API looks like:
```python
ray.init(job_config=JobConfig(default_actor_lifetime="detached"))
```
Java API looks like:
```java
System.setProperty("ray.job.default-actor-lifetime", defaultActorLifetime.name());
Ray.init();
```
One example usage is:
```python
ray.init(job_config=JobConfig(default_actor_lifetime="detached"))
a1 = A.options(lifetime="non_detached").remote() # a1 is a non-detached actor.
a2 = A.remote() # a2 is a non-detached actor.
```
Co-authored-by: Kai Yang <kfstorm@outlook.com>
Co-authored-by: Qing Wang <jovany.wq@antgroup.com>
This is the second last PR to improve `ActorDiedError` exception.
This propagates Actor death cause metadata to the ray error object. In this way, we can raise a better actor died error exception.
After this PR is merged, I will add more metadata to each error message and write a documentation that explains when each error happens.
TODO
- [x] Fix test failures
- [x] Add unit tests
- [x] Fix Java/cpp cases
Follow up PRs
- Not allowing nullptr for RayErrorInfo input.
To make other system or internal project reuse ray deps bazel function, we need change this local accessing style to global accessing with ray-project namespace.
Co-authored-by: 林濯 <lingxuzn.zlx@antgroup.com>
In python or C++, we can specify the bundle index as -1 to use any available bundle in the placement group. We should also enable it in Java to keep the API consistent across all languages.
This PR changes the enum value `ActorLifetime.DEFAULT` to `ActorLifetime.NON_DETACHED`. In our release versions, `ActorLifetime` was not introduced <= 1.9.2
Co-authored-by: Qing Wang <jovany.wq@antgroup.com>
This PR introduces statically defining ConcurrencyGroup APIs in Java.
We introduce 2 APIs:
1. Introducing `@DefConcurrencyGroup` annotation for an actor class to define a concurrency group statically.
2. Introducing `@UseConcurrencyGroup` annotation for actor methods to define the concurrency group to be used in the method.
Examples are below:
```java
@DefConcurrencyGroup(name = "io", maxConcurrency = 2)
@DefConcurrencyGroup(name = "compute", maxConcurrency = 4)
private static class MyActor {
@UseConcurrencyGroup(name = "io")
public long f1() { }
@UseConcurrencyGroup(name = "io")
public long f2() { }
@UseConcurrencyGroup(name = "compute")
public long f3(int a, int b) { }
@UseConcurrencyGroup(name = "compute")
public long f4() { }
}
ActorHandle<> myActor = Ray.actor(MyActor::new).remote();
myActor.task(MyActor::f1).remote();
myActor.task(MyActor::f2).remote();
myActor.task(MyActor::f3).remote();
myActor.task(MyActor::f4).remote();
```
`MyActor` has 3 concurrency groups: `io` with 2 concurrency, `compute` with 4 concurrency and `default` with 1 concurrency.
f1 and f2 will be executed in `io`, f3 and f4 will be executed in `compute`.
In Xlang(Python call Java), a Java method which overrides a `default` method of the super class is not able to be invoked successfully, due to we treat it as overloaded method instead of overrided method. This PR correctly handle it at the case it overrides a `default` method.
Before this PR, the following usage is not able to be invoked from Python -> Java.
```Java
public interface ExampleInterface {
default String echo(String inp) {
return inp;
}
}
public class ExampleImpl implements ExampleInterface {
@Override
public String echo(String inp) {
return inp + " echo";
}
}
```
```python
/// Invoke it in Python.
cls = ray.java_actor_class("io.ray.serve.util.ExampleImpl")
handle = cls.remote()
print(ray.get(handle.echo.remote("hi")))
```
This PR moves the internal kv namespace logic into cpp to reduce logic in python for the following reasons:
- internal kv is used in x-lang so we have to move it to cpp so that all langs can benefit.
- for https://github.com/ray-project/ray/issues/8822 we need to delete resource when job finished in gcs
One extra field about del is also added so that when delete, we are able to delete by prefix instead of just a key
There's a redis connection in gcs client, but most time the gcs client is never used in worker. We can make the initialization lazy to reduce redis connections.
After that, the number of redis connections reduces from 2 to 1 in one core worker.
This PR povided universal native memory access support in java worker mentioned in #21234, which will also be the foundation for later zero-copy and serialization.
The main changes include:
* Native memory operations based on `sun.misc.Unsafe`
* Little-Endian based Native memory buffer.
* Native memory based IO operations:
* InputStream/OutputStream
* ReadChannel/WriteChannel
* MockReadChannel/MockWriteChannel
We add a enum class ActorLifetime to indicate the lifetime of an actor. In this PR, we also add the necessary API to create an actor with specifying lifetime.
Currently, it has 2 values: detached and default.
These dependencies are widely used:
- com.google.common
- com.google.protobuf
- com.google.thirdparty
So that we need to shade them to avoid being conflict with jars introduced by user.
In this PR, we introduce a `bazel_jar_jar` rule for doing these and also shade them in maven pom files.
Resubmit the PR https://github.com/ray-project/ray/pull/19936
I've figure out that the test case `//rllib:tests/test_gpus::test_gpus_in_local_mode` failed due to deadlock in local mode.
In local mode, if the user code submits another task during the executing of current task, the `CoreWorker::actor_task_mutex_` may cause deadlock.
The solution is quite simple, release the lock before executing task in local mode.
In the commit 7c2f61c76c:
1. Release the lock in local mode to fix the bug. @scv119
2. `test_local_mode_deadlock` added to cover the case. @rkooo567
3. Left a trivial change in `rllib/tests/test_gpus.py` to make the `RAY_CI_RLLIB_DIRECTLY_AFFECTED ` to take effect.
Fix Remote code injection in Log4j
Log4j versions prior to 2.15.0 are subject to a remote code execution vulnerability via the ldap JNDI parser.
Check this refer: [CVE-2021-44228](https://github.com/advisories/GHSA-jfh8-c2jp-5v3q)