this starts breaking Mac java build with new errors; I think it is the same issue as before why we reverted this PR
…ment from Java. …" (#27945)"
This reverts commit af488e1.
We have encountered `java.lang.ClassNotFoundException` when deploying Java Ray Serve deployments. The property `ray.job.code-search-path` which specifies the search path of user's classes is not working. The reason is that `ray.job.code-search-path` is loaded in an independent classloader in Ray context, but Serve Replica initialized user class with `AppClassLoader`. We need to change the classloader used to construct user classes to the one in Ray context.
In the previously merged pr(https://github.com/ray-project/ray/pull/22726/commits), java serve's support for python deployment was not implemented. This PR is used to implement this feature.
Co-authored-by: nanqi.yxf <nanqi.yxf@antgroup.com>
If compile Ray in debug mode,
* run `MetricsTest:: testAddHistogram` will crash with below error message:
```
BucketBoundaries::Explicit called with non-monotonic boundary list.
java: external/io_opencensus_cpp/opencensus/stats/internal/bucket_boundaries.cc:64: opencensus::stats::BucketBoundaries::Explicit(std::__debug::vector<double>)::<lambda()>: Assertion `false && "0"' failed.
```
* run `NamespaceTest::testIsolationInTheSameNamespaces` can fail with great possibility with below error message:
```
java.util.NoSuchElementException: No value present
at java.util.Optional.get(Optional.java:135)
at io.ray.test.NamespaceTest.lambda$testIsolationInTheSameNamespaces$2(NamespaceTest.java:39)
at io.ray.test.NamespaceTest.testIsolation(NamespaceTest.java:116)
at io.ray.test.NamespaceTest.testIsolationInTheSameNamespaces(NamespaceTest.java:36)
```
Add an API to get the node id of this worker, see usage:
```java
UniqueId currNodeId = Ray.getRuntimeContext().getCurrentNodeId();
```
for the requirement from Ray Serve.
Allow you start actors in different namespace instead of the driver namespace.
Usage is simple:
```java
Ray.init(namespace="a");
/// Named actor a will starts in namespace `b`
ActorHandle<A> a = Ray.actor(A::new).setName("myActor", "b").remote();
```
Co-authored-by: Hao Chen <chenh1024@gmail.com>
Now we can run custom java tests by:
0. `cp testng_custom_template.xml testng_custom.xml`
1. Specify test class/method in `testng_custom.xml`
2. `bazel test //java:custom_test --test_output=streamed`
We removed the thread local core worker instance in this PR, which is the further arch cleaning stuff for removing multiple workers in one process.
It also removes the unnecessary parameter `workerId` from JNI.
* [runtime env] runtime env inheritance refactor (#22244)
Runtime Environments is already GA in Ray 1.6.0. The latest doc is [here](https://docs.ray.io/en/master/ray-core/handling-dependencies.html#runtime-environments). And now, we already supported a [inheritance](https://docs.ray.io/en/master/ray-core/handling-dependencies.html#inheritance) behavior as follows (copied from the doc):
- The runtime_env["env_vars"] field will be merged with the runtime_env["env_vars"] field of the parent. This allows for environment variables set in the parent’s runtime environment to be automatically propagated to the child, even if new environment variables are set in the child’s runtime environment.
- Every other field in the runtime_env will be overridden by the child, not merged. For example, if runtime_env["py_modules"] is specified, it will replace the runtime_env["py_modules"] field of the parent.
We think this runtime env merging logic is so complex and confusing to users because users can't know the final runtime env before the jobs are run.
Current PR tries to do a refactor and change the behavior of Runtime Environments inheritance. Here is the new behavior:
- **If there is no runtime env option when we create actor, inherit the parent runtime env.**
- **Otherwise, use the optional runtime env directly and don't do the merging.**
Add a new API named `ray.runtime_env.get_current_runtime_env()` to get the parent runtime env and modify this dict by yourself. Like:
```Actor.options(runtime_env=ray.runtime_env.get_current_runtime_env().update({"X": "Y"}))```
This new API also can be used in ray client.
This PR moves all exception classes from runtime module to api module. It's aiming to eliminate the confusion about ray exceptions. It means that Ray users don't need to touch runtime module when API programming after this PR.
Note that this should be merged onto 2.0.
This is the 1st PR to remove the code path of multiple core workers in one process. This PR is aiming to remove the flags and APIs related to `num_workers`.
After this PR checking in, we needn't to consider the multiple core workers any longer.
The further following PRs are related to the deeper logic refactor, like eliminating the gap between core worker and core worker process, removing the logic related to multiple workers from workerpool, gcs and etc.
**BREAK CHANGE**
This PR removes these APIs:
- Ray.wrapRunnable();
- Ray.wrapCallable();
- Ray.setAsyncContext();
- Ray.getAsyncContext();
And the following APIs are not allowed to invoke in a user-created thread in local mode:
- Ray.getRuntimeContext().getCurrentActorId();
- Ray.getRuntimeContext().getCurrentTaskId()
Note that this PR shouldn't be merged to 1.x.
This PR supports specifying the jars(or zip packages) for a job, which are used for all workers for this job.
You can specify jars or zips in the config file of your job:
```yml
ray {
job {
runtime-env: {
"jars": [
"https://my_host/a.jar",
"https://my_host/b.jar"
]
}
}
}
```
or via system properties:
```java
System.setProperty("ray.job.runtime-env.jars.0", "https://my_host/a.jar");
System.setProperty("ray.job.runtime-env.jars.1", "https://my_host/a.jar");
Ray.init();
// all workers of this job will add a.jar and b.jar into the classpath.
```
Currently, when an actor has `max_restarts` > 0 and has crashed, the actor will enter RESTARTING state and then ALIVE. Imagine this scenario: an online service provides HTTP service and the proxy actor receives requests, forwards them to worker actors, and replies to clients with the execution results from worker actors.
```
-> Worker A (actor)
/
/
HTTP requests -------> Proxy (actor with HTTP server) ---> Worker B (actor)
\
\
-> ...
```
For each HTTP request, the proxy picks one worker (e.g. worker A) based on some algorithm, sends the request to it, and calls `ray.get()` to wait for the result. If for some reason the picked worker crashed, Ray will restart the actor, and `ray.get()` will throw an error. The proxy may pick another worker (e.g. worker B) and re-send the request to it. This is OK.
But new requests keep coming. The proxy may pick worker A again. But because worker A is still in RESTARTING state, it's not ready to serve requests. `ray.get()` on subsequent requests sent to worker A will hang until worker A is back online (ALIVE state). The proxy won't be able to reschedule these requests to another worker because currently there's no way to know if worker A is alive or not before sending a request. We can't say worker A is not alive just based on whether `ray.get()` hangs either.
To solve this issue, we change the semantics of `max_task_retries`.
* When max_task_retries is 0 (which is the default value), if the callee actor is in the RESTARTING state, subsequently submitted tasks will fail immediately with a RayActorError. Users can catch the RayActorError and implement their own fallback strategies to improve service availability and mitigate service outages.
* When max_task_retries is not 0, subsequently submitted tasks will be queued on the caller side and we only send them to the callee when the callee actor is back to the ALIVE state.
TODO
- [x] Add test cases.
- [ ] Update docs.
- [x] API change review.
Aiming to:
1. addressing the bug about concurrency group, see #19593
2. improving the stability of the ray call latency perf in online applications.
we're proposing using async post instead of `PostBlocking` in threadpool.
Note that since we have already had back pressure in the caller side, I believe this change is safe to merge and it doesn't break any behavior.
This PR supports setting the jars for an actor in Ray API. The API looks like:
```java
class A {
public boolean findClass(String className) {
try {
Class.forName(className);
} catch (ClassNotFoundException e) {
return false;
}
return true;
}
}
RuntimeEnv runtimeEnv = new RuntimeEnv.Builder()
.addJars(ImmutableList.of("https://github.com/ray-project/test_packages/raw/main/raw_resources/java-1.0-SNAPSHOT.jar"))
.build();
ActorHandle<A> actor1 = Ray.actor(A::new).setRuntimeEnv(runtimeEnv).remote();
boolean ret = actor1.task(A::findClass, "io.testpackages.Foo").remote().get();
System.out.println(ret); // true
```
Jackson is a widely-used utility. User from Ant reports the jackson class is conflicted between Ray jar and user's jar.
This PR shade the jackson in Ray jar to avoid the conflict.
Co-authored-by: Kai Yang <kfstorm@outlook.com>
For the purpose to provide an alternative option for running multiple actor instances in a Java worker process, and the eventual goal is to remove the original multi-worker-instances in one worker process implementation. we're proposing supporting parallel actor concept in Java. This feature enables that users could define some homogeneous parallel execution instances in an actor, and all instances hold one thread as the execution backend.
### Introduction
For the following example, we define a parallel actor with 10 parallelism. The backend actor has 10 concurrency groups for the parallel executions, it also means there're 10 threads for that.
We can access the instance by the instance handle, like:
```java
ParallelActorHandle<A> actor = ParallelActor.actor(A::new).setParallelism(10).remote();
ParallelInstance<A> instance = actor.getInstance(/*index=*/ 2);
Preconditions.checkNotNull(instance);
Ray.get(instance.task(A::incr, 1000000).remote()); // print 1000000
instance = actor.getInstance(/*index=*/ 2);
Preconditions.checkNotNull(instance);
Ray.get(instance.task(A::incr, 2000000).remote().get()); // print 3000000
instance = actor.getInstance(/*index=*/ 3);
Preconditions.checkNotNull(instance);
Ray.get(instance.task(A::incr, 2000000).remote().get()); // print 2000000
```
### Limitation
- It doesn't support concurrency group on a parallel actor yet.
Co-authored-by: Kai Yang <kfstorm@outlook.com>