[Java] Add killActor API in Java (#6728)

* Add killActor API in Java

* fix javadoc

* update test case

* Address comments
This commit is contained in:
Kai Yang 2020-01-14 17:12:00 +08:00 committed by Qing Wang
parent 2bcf72e306
commit ddd4c42fe5
9 changed files with 109 additions and 2 deletions

View file

@ -130,6 +130,7 @@ public final class Ray extends RayCall {
/**
* Set the async context for the current thread.
*
* @param asyncContext The async context to set.
*/
public static void setAsyncContext(Object asyncContext) {
@ -180,6 +181,16 @@ public final class Ray extends RayCall {
runtime.setResource(resourceName, capacity, UniqueId.NIL);
}
/**
* Kill the actor immediately. This will cause any outstanding tasks submitted to the actor to
* fail and the actor to exit in the same way as if it crashed.
*
* @param actor The actor to be killed.
*/
public static void killActor(RayActor<?> actor) {
runtime.killActor(actor);
}
/**
* Get the runtime context.
*/

View file

@ -76,6 +76,13 @@ public interface RayRuntime {
*/
void setResource(String resourceName, double capacity, UniqueId nodeId);
/**
* Kill the actor immediately.
*
* @param actor The actor to be killed.
*/
void killActor(RayActor<?> actor);
/**
* Invoke a remote function.
*

View file

@ -2,6 +2,7 @@ package org.ray.runtime;
import java.util.concurrent.atomic.AtomicInteger;
import org.ray.api.RayActor;
import org.ray.api.id.JobId;
import org.ray.api.id.UniqueId;
import org.ray.runtime.config.RayConfig;
@ -47,6 +48,11 @@ public class RayDevRuntime extends AbstractRayRuntime {
LOGGER.error("Not implemented under SINGLE_PROCESS mode.");
}
@Override
public void killActor(RayActor<?> actor) {
throw new UnsupportedOperationException();
}
@Override
public Object getAsyncContext() {
return null;

View file

@ -140,6 +140,11 @@ public class RayMultiWorkerNativeRuntime implements RayRuntime {
getCurrentRuntime().setResource(resourceName, capacity, nodeId);
}
@Override
public void killActor(RayActor<?> actor) {
getCurrentRuntime().killActor(actor);
}
@Override
public RayObject call(RayFunc func, Object[] args, CallOptions options) {
return getCurrentRuntime().call(func, args, options);

View file

@ -6,8 +6,10 @@ import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.io.FileUtils;
import org.ray.api.RayActor;
import org.ray.api.id.JobId;
import org.ray.api.id.UniqueId;
import org.ray.runtime.actor.NativeRayActor;
import org.ray.runtime.config.RayConfig;
import org.ray.runtime.context.NativeWorkerContext;
import org.ray.runtime.functionmanager.FunctionManager;
@ -127,6 +129,14 @@ public final class RayNativeRuntime extends AbstractRayRuntime {
nativeSetResource(nativeCoreWorkerPointer, resourceName, capacity, nodeId.getBytes());
}
@Override
public void killActor(RayActor<?> actor) {
if (!((NativeRayActor) actor).isDirectCallActor()) {
throw new UnsupportedOperationException("Only direct call actors can be killed.");
}
nativeKillActor(nativeCoreWorkerPointer, actor.getId().getBytes());
}
@Override
public Object getAsyncContext() {
return null;
@ -184,4 +194,6 @@ public final class RayNativeRuntime extends AbstractRayRuntime {
private static native void nativeSetResource(long conn, String resourceName, double capacity,
byte[] nodeId);
private static native void nativeKillActor(long nativeCoreWorkerPointer, byte[] actorId);
}

View file

@ -28,8 +28,17 @@ public class TestUtils {
}
public static void skipTestIfDirectActorCallEnabled() {
if (ActorCreationOptions.DEFAULT_USE_DIRECT_CALL) {
throw new SkipException("This test doesn't work when direct actor call is enabled.");
skipTestIfDirectActorCallEnabled(true);
}
public static void skipTestIfDirectActorCallDisabled() {
skipTestIfDirectActorCallEnabled(false);
}
private static void skipTestIfDirectActorCallEnabled(boolean enabled) {
if (enabled == ActorCreationOptions.DEFAULT_USE_DIRECT_CALL) {
throw new SkipException(String.format("This test doesn't work when direct actor call is %s.",
enabled ? "enabled" : "disabled"));
}
}

View file

@ -0,0 +1,40 @@
package org.ray.api.test;
import com.google.common.collect.ImmutableList;
import org.ray.api.Ray;
import org.ray.api.RayActor;
import org.ray.api.RayObject;
import org.ray.api.TestUtils;
import org.ray.api.annotation.RayRemote;
import org.ray.api.exception.RayActorException;
import org.testng.Assert;
import org.testng.annotations.Test;
@Test(groups = { "directCall" })
public class KillActorTest extends BaseTest {
@RayRemote
public static class HangActor {
public boolean alive() {
return true;
}
public boolean hang() throws InterruptedException {
while (true) {
Thread.sleep(1000);
}
}
}
public void testKillActor() {
TestUtils.skipTestUnderSingleProcess();
TestUtils.skipTestIfDirectActorCallDisabled();
RayActor<HangActor> actor = Ray.createActor(HangActor::new);
Assert.assertTrue(Ray.call(HangActor::alive, actor).get());
RayObject<Boolean> result = Ray.call(HangActor::hang, actor);
Assert.assertEquals(0, Ray.wait(ImmutableList.of(result), 1, 500).getReady().size());
Ray.killActor(actor);
Assert.expectThrows(RayActorException.class, result::get);
}
}

View file

@ -133,6 +133,13 @@ JNIEXPORT void JNICALL Java_org_ray_runtime_RayNativeRuntime_nativeSetResource(
THROW_EXCEPTION_AND_RETURN_IF_NOT_OK(env, status, (void)0);
}
JNIEXPORT void JNICALL Java_org_ray_runtime_RayNativeRuntime_nativeKillActor(
JNIEnv *env, jclass, jlong nativeCoreWorkerPointer, jbyteArray actorId) {
auto core_worker = reinterpret_cast<ray::CoreWorker *>(nativeCoreWorkerPointer);
auto status = core_worker->KillActor(JavaByteArrayToId<ActorID>(env, actorId));
THROW_EXCEPTION_AND_RETURN_IF_NOT_OK(env, status, (void)0);
}
#ifdef __cplusplus
}
#endif

View file

@ -56,6 +56,16 @@ JNIEXPORT void JNICALL Java_org_ray_runtime_RayNativeRuntime_nativeShutdownHook(
JNIEXPORT void JNICALL Java_org_ray_runtime_RayNativeRuntime_nativeSetResource(
JNIEnv *, jclass, jlong, jstring, jdouble, jbyteArray);
/*
* Class: org_ray_runtime_RayNativeRuntime
* Method: nativeKillActor
* Signature: (J[B)V
*/
JNIEXPORT void JNICALL Java_org_ray_runtime_RayNativeRuntime_nativeKillActor(JNIEnv *,
jclass,
jlong,
jbyteArray);
#ifdef __cplusplus
}
#endif