[Java] Support ActorLifetime (#21074)

We add a enum class ActorLifetime to indicate the lifetime of an actor. In this PR, we also add the necessary API to create an actor with specifying lifetime.
Currently, it has 2 values: detached and default.
This commit is contained in:
Qing Wang 2021-12-23 19:48:56 +08:00 committed by GitHub
parent e653d47533
commit 2df27a5f87
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 131 additions and 2 deletions

View file

@ -1,6 +1,7 @@
package io.ray.api.call;
import io.ray.api.options.ActorCreationOptions;
import io.ray.api.options.ActorLifetime;
import io.ray.api.placementgroup.PlacementGroup;
import java.util.Map;
@ -24,6 +25,11 @@ public class BaseActorCreator<T extends BaseActorCreator> {
return self();
}
public T setLifetime(ActorLifetime lifetime) {
builder.setLifetime(lifetime);
return self();
}
/**
* Set a custom resource requirement to reserve for the lifetime of this actor. This method can be
* called multiple times. If the same resource is set multiple times, the latest quantity will be

View file

@ -11,6 +11,7 @@ import java.util.Map;
/** The options for creating actor. */
public class ActorCreationOptions extends BaseTaskOptions {
public final String name;
public final ActorLifetime lifetime;
public final int maxRestarts;
public final List<String> jvmOptions;
public final int maxConcurrency;
@ -21,6 +22,7 @@ public class ActorCreationOptions extends BaseTaskOptions {
private ActorCreationOptions(
String name,
ActorLifetime lifetime,
Map<String, Double> resources,
int maxRestarts,
List<String> jvmOptions,
@ -31,6 +33,7 @@ public class ActorCreationOptions extends BaseTaskOptions {
int maxPendingCalls) {
super(resources);
this.name = name;
this.lifetime = lifetime;
this.maxRestarts = maxRestarts;
this.jvmOptions = jvmOptions;
this.maxConcurrency = maxConcurrency;
@ -43,6 +46,7 @@ public class ActorCreationOptions extends BaseTaskOptions {
/** The inner class for building ActorCreationOptions. */
public static class Builder {
private String name;
private ActorLifetime lifetime = ActorLifetime.DEFAULT;
private Map<String, Double> resources = new HashMap<>();
private int maxRestarts = 0;
private List<String> jvmOptions = new ArrayList<>();
@ -65,6 +69,12 @@ public class ActorCreationOptions extends BaseTaskOptions {
return this;
}
/** Declare the lifetime of this actor. */
public Builder setLifetime(ActorLifetime lifetime) {
this.lifetime = lifetime;
return this;
}
/**
* Set a custom resource requirement to reserve for the lifetime of this actor. This method can
* be called multiple times. If the same resource is set multiple times, the latest quantity
@ -170,6 +180,7 @@ public class ActorCreationOptions extends BaseTaskOptions {
public ActorCreationOptions build() {
return new ActorCreationOptions(
name,
lifetime,
resources,
maxRestarts,
jvmOptions,

View file

@ -0,0 +1,15 @@
package io.ray.api.options;
/** The enumeration class is used for declaring lifetime of actors. It's non detached by default. */
public enum ActorLifetime {
DEFAULT("DEFAULT", 0),
DETACHED("DETACHED", 1);
private String name;
private int value;
ActorLifetime(String name, int value) {
this.name = name;
this.value = value;
}
}

View file

@ -0,0 +1,73 @@
package io.ray.test;
import io.ray.api.ActorHandle;
import io.ray.api.Ray;
import io.ray.api.options.ActorLifetime;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import org.testng.Assert;
import org.testng.annotations.Test;
@Test(groups = {"cluster"})
public class ActorLifetimeTest {
private static class MyActor {
public String echo(String str) {
return str;
}
}
public void testDetached() throws IOException, InterruptedException {
System.setProperty("ray.job.namespace", "test2");
Ray.init();
startDriver(DriverClassWithDetachedActor.class);
Assert.assertTrue(Ray.getActor("my_actor").isPresent());
Ray.shutdown();
}
public void testNonDetached() throws IOException, InterruptedException {
System.setProperty("ray.job.namespace", "test2");
Ray.init();
startDriver(DriverClassWithNonDetachedActor.class);
Assert.assertFalse(Ray.getActor("my_actor").isPresent());
Ray.shutdown();
}
private static class DriverClassWithDetachedActor {
public static void main(String[] argv) {
System.setProperty("ray.job.long-running", "true");
System.setProperty("ray.job.namespace", "test2");
Ray.init();
ActorHandle<MyActor> myActor =
Ray.actor(MyActor::new).setLifetime(ActorLifetime.DETACHED).setName("my_actor").remote();
myActor.task(MyActor::echo, "hello").remote().get();
Ray.shutdown();
}
}
private static class DriverClassWithNonDetachedActor {
public static void main(String[] argv) {
System.setProperty("ray.job.namespace", "test2");
Ray.init();
ActorHandle<MyActor> myActor = Ray.actor(MyActor::new).setName("my_actor").remote();
myActor.task(MyActor::echo, "hello").remote().get();
Ray.shutdown();
}
}
private static void startDriver(Class<?> driverClass) throws IOException, InterruptedException {
Process driver = null;
try {
ProcessBuilder builder = TestUtils.buildDriver(driverClass, null);
builder.redirectError(ProcessBuilder.Redirect.INHERIT);
driver = builder.start();
// Wait for driver to start.
driver.waitFor();
System.out.println("Driver finished.");
} finally {
if (driver != null) {
driver.waitFor(1, TimeUnit.SECONDS);
}
}
}
}

View file

@ -144,6 +144,7 @@ inline TaskOptions ToTaskOptions(JNIEnv *env, jint numReturns, jobject callOptio
inline ActorCreationOptions ToActorCreationOptions(JNIEnv *env,
jobject actorCreationOptions) {
std::string name = "";
bool is_detached = false;
int64_t max_restarts = 0;
std::unordered_map<std::string, double> resources;
std::vector<std::string> dynamic_worker_options;
@ -158,6 +159,12 @@ inline ActorCreationOptions ToActorCreationOptions(JNIEnv *env,
if (java_name) {
name = JavaStringToNativeString(env, java_name);
}
auto java_actor_lifetime = (jobject)env->GetObjectField(
actorCreationOptions, java_actor_creation_options_lifetime);
RAY_CHECK(java_actor_lifetime != nullptr);
jint actor_lifetime_value =
env->GetIntField(java_actor_lifetime, java_actor_lifetime_value);
is_detached = (actor_lifetime_value == 1);
max_restarts =
env->GetIntField(actorCreationOptions, java_actor_creation_options_max_restarts);
jobject java_resources =
@ -240,7 +247,7 @@ inline ActorCreationOptions ToActorCreationOptions(JNIEnv *env,
resources,
resources,
dynamic_worker_options,
/*is_detached=*/false,
is_detached,
name,
ray_namespace,
/*is_asyncio=*/false,

View file

@ -98,6 +98,7 @@ jfieldID java_call_options_concurrency_group_name;
jclass java_actor_creation_options_class;
jfieldID java_actor_creation_options_name;
jfieldID java_actor_creation_options_lifetime;
jfieldID java_actor_creation_options_max_restarts;
jfieldID java_actor_creation_options_jvm_options;
jfieldID java_actor_creation_options_max_concurrency;
@ -106,6 +107,9 @@ jfieldID java_actor_creation_options_bundle_index;
jfieldID java_actor_creation_options_concurrency_groups;
jfieldID java_actor_creation_options_max_pending_calls;
jclass java_actor_lifetime_class;
jfieldID java_actor_lifetime_value;
jclass java_placement_group_creation_options_class;
jclass java_placement_group_creation_options_strategy_class;
jfieldID java_placement_group_creation_options_name;
@ -312,6 +316,9 @@ jint JNI_OnLoad(JavaVM *vm, void *reserved) {
LoadClass(env, "io/ray/api/options/ActorCreationOptions");
java_actor_creation_options_name =
env->GetFieldID(java_actor_creation_options_class, "name", "Ljava/lang/String;");
java_actor_creation_options_lifetime =
env->GetFieldID(java_actor_creation_options_class, "lifetime",
"Lio/ray/api/options/ActorLifetime;");
java_actor_creation_options_max_restarts =
env->GetFieldID(java_actor_creation_options_class, "maxRestarts", "I");
java_actor_creation_options_jvm_options = env->GetFieldID(
@ -327,6 +334,10 @@ jint JNI_OnLoad(JavaVM *vm, void *reserved) {
java_actor_creation_options_class, "concurrencyGroups", "Ljava/util/List;");
java_actor_creation_options_max_pending_calls =
env->GetFieldID(java_actor_creation_options_class, "maxPendingCalls", "I");
java_actor_lifetime_class = LoadClass(env, "io/ray/api/options/ActorLifetime");
java_actor_lifetime_value = env->GetFieldID(java_actor_lifetime_class, "value", "I");
java_concurrency_group_impl_class =
LoadClass(env, "io/ray/runtime/ConcurrencyGroupImpl");
java_concurrency_group_impl_get_function_descriptors = env->GetMethodID(
@ -400,6 +411,7 @@ void JNI_OnUnload(JavaVM *vm, void *reserved) {
env->DeleteGlobalRef(java_function_arg_class);
env->DeleteGlobalRef(java_base_task_options_class);
env->DeleteGlobalRef(java_actor_creation_options_class);
env->DeleteGlobalRef(java_actor_lifetime_class);
env->DeleteGlobalRef(java_placement_group_creation_options_class);
env->DeleteGlobalRef(java_placement_group_creation_options_strategy_class);
env->DeleteGlobalRef(java_native_ray_object_class);

View file

@ -175,6 +175,8 @@ extern jfieldID java_call_options_concurrency_group_name;
extern jclass java_actor_creation_options_class;
/// name field of ActorCreationOptions class
extern jfieldID java_actor_creation_options_name;
/// lifetime field of ActorCreationOptions class
extern jfieldID java_actor_creation_options_lifetime;
/// maxRestarts field of ActorCreationOptions class
extern jfieldID java_actor_creation_options_max_restarts;
/// jvmOptions field of ActorCreationOptions class
@ -189,7 +191,10 @@ extern jfieldID java_actor_creation_options_bundle_index;
extern jfieldID java_actor_creation_options_concurrency_groups;
/// maxPendingCalls field of ActorCreationOptions class
extern jfieldID java_actor_creation_options_max_pending_calls;
/// ActorCreationOptions class
extern jclass java_actor_lifetime_class;
/// name field of ActorCreationOptions class
extern jfieldID java_actor_lifetime_value;
/// ConcurrencyGroupImpl class
extern jclass java_concurrency_group_impl_class;
/// getFunctionDescriptors method of ConcurrencyGroupImpl class