[Ray][xlang]Setting async flag for Python actor actor in Java (#28149)

It's important that setting async flag for Python actor in Java for us.
So we added the API which is named "PyActorCreator setAsync(boolean enabled)" based on PyActorCreator,
To avoid misuse for user, we check the flag before the ActorCreationTask is executed.
This commit is contained in:
XiaodongLv 2022-09-03 11:09:19 +08:00 committed by GitHub
parent 3b7346ab50
commit a31be7cef1
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
8 changed files with 105 additions and 5 deletions

View file

@ -14,6 +14,11 @@ public class PyActorCreator extends BaseActorCreator<PyActorCreator> {
this.args = args;
}
public PyActorCreator setAsync(boolean isAsync) {
builder.setAsync(isAsync);
return this;
}
/**
* Create a python actor remotely and return a handle to the created actor.
*

View file

@ -25,6 +25,7 @@ public class ActorCreationOptions extends BaseTaskOptions {
public final String serializedRuntimeEnv;
public final String namespace;
public final int maxPendingCalls;
public final boolean isAsync;
private ActorCreationOptions(
String name,
@ -38,7 +39,8 @@ public class ActorCreationOptions extends BaseTaskOptions {
List<ConcurrencyGroup> concurrencyGroups,
String serializedRuntimeEnv,
String namespace,
int maxPendingCalls) {
int maxPendingCalls,
boolean isAsync) {
super(resources);
this.name = name;
this.lifetime = lifetime;
@ -51,6 +53,7 @@ public class ActorCreationOptions extends BaseTaskOptions {
this.serializedRuntimeEnv = serializedRuntimeEnv;
this.namespace = namespace;
this.maxPendingCalls = maxPendingCalls;
this.isAsync = isAsync;
}
/** The inner class for building ActorCreationOptions. */
@ -67,6 +70,7 @@ public class ActorCreationOptions extends BaseTaskOptions {
private RuntimeEnv runtimeEnv = null;
private String namespace = null;
private int maxPendingCalls = -1;
private boolean isAsync = false;
/**
* Set the actor name of a named actor. This named actor is accessible in this namespace by this
@ -176,6 +180,17 @@ public class ActorCreationOptions extends BaseTaskOptions {
return this;
}
/**
* Mark the creating actor as async. If the Python actor is/is not async but it's marked
* async/not async in Java, it will result in RayValueError errors
*
* @return self
*/
public Builder setAsync(boolean isAsync) {
this.isAsync = isAsync;
return this;
}
/**
* Set the placement group to place this actor in.
*
@ -202,7 +217,8 @@ public class ActorCreationOptions extends BaseTaskOptions {
concurrencyGroups,
runtimeEnv != null ? runtimeEnv.serializeToRuntimeEnvInfo() : "",
namespace,
maxPendingCalls);
maxPendingCalls,
isAsync);
}
/** Set the concurrency groups for this actor. */

View file

@ -23,6 +23,7 @@ import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.function.Supplier;
import org.apache.commons.io.FileUtils;
import org.testng.Assert;
import org.testng.annotations.BeforeClass;
@ -172,6 +173,38 @@ public class CrossLanguageInvocationTest extends BaseTest {
Assert.assertEquals(res.get(), "2".getBytes());
}
@Test
public void testCallingPythonAsyncActor() {
{
PyActorHandle actor =
Ray.actor(PyActorClass.of(PYTHON_MODULE, "AsyncCounter"), "1".getBytes())
.setAsync(true)
.remote();
actor.task(PyActorMethod.of("block_task", byte[].class)).remote();
ObjectRef<byte[]> res =
actor.task(PyActorMethod.of("increase", byte[].class), "1".getBytes()).remote();
Assert.assertEquals(res.get(), "2".getBytes());
}
{
PyActorHandle actor =
Ray.actor(PyActorClass.of(PYTHON_MODULE, "SyncCounter"), "1".getBytes())
.setAsync(false)
.remote();
actor.task(PyActorMethod.of("block_task", byte[].class)).remote();
ObjectRef<byte[]> res =
actor.task(PyActorMethod.of("increase", byte[].class), "1".getBytes()).remote();
Supplier<Boolean> getValue =
() -> {
if (equals(res.get() == "2".getBytes())) {
return true;
} else {
return false;
}
};
Assert.assertFalse(TestUtils.waitForCondition(getValue, 30000));
}
}
@Test
public void testCallingCppFunction() {
// Test calling a simple C++ function.

View file

@ -1,6 +1,8 @@
# This file is used by CrossLanguageInvocationTest.java to test cross-language
# invocation.
import asyncio
import ray
@ -128,6 +130,35 @@ class Counter(object):
return str(self.value).encode("utf-8")
@ray.remote
class AsyncCounter(object):
async def __init__(self, value):
self.value = int(value)
self.event = asyncio.Event()
async def block_task(self):
self.event.wait()
async def increase(self, delta):
self.value += int(delta)
self.event.set()
return str(self.value).encode("utf-8")
@ray.remote
class SyncCounter(object):
def __init__(self, value):
self.value = int(value)
self.event = asyncio.Event()
def block_task(self):
self.event.wait()
def increase(self, delta):
self.value += int(delta)
return str(self.value).encode("utf-8")
@ray.remote
def py_func_create_named_actor():
counter = Counter.options(name="py_named_actor", lifetime="detached").remote(100)

View file

@ -634,6 +634,13 @@ cdef execute_task(
function = execution_info.function
if core_worker.current_actor_is_asyncio():
if len(inspect.getmembers(
actor.__class__,
predicate=inspect.iscoroutinefunction)) == 0:
raise RayActorError(
f"Failed to create the actor {core_worker.get_actor_id()}. "
"The failure reason is that you set the async flag, "
"but the actor has no any coroutine function.")
# Increase recursion limit if necessary. In asyncio mode,
# we have many parallel callstacks (represented in fibers)
# that's suspended for execution. Python interpreter will

View file

@ -168,6 +168,7 @@ inline ActorCreationOptions ToActorCreationOptions(JNIEnv *env,
std::string serialized_runtime_env = "";
std::string ray_namespace = "";
int32_t max_pending_calls = -1;
bool is_async = false;
if (actorCreationOptions) {
auto java_name = (jstring)env->GetObjectField(actorCreationOptions,
@ -259,6 +260,8 @@ inline ActorCreationOptions ToActorCreationOptions(JNIEnv *env,
max_pending_calls = static_cast<int32_t>(env->GetIntField(
actorCreationOptions, java_actor_creation_options_max_pending_calls));
is_async = (bool)env->GetBooleanField(actorCreationOptions,
java_actor_creation_options_is_async);
}
rpc::SchedulingStrategy scheduling_strategy;
@ -282,7 +285,7 @@ inline ActorCreationOptions ToActorCreationOptions(JNIEnv *env,
is_detached,
name,
ray_namespace,
/*is_asyncio=*/false,
is_async,
/*scheduling_strategy=*/scheduling_strategy,
serialized_runtime_env,
concurrency_groups,

View file

@ -114,6 +114,7 @@ jfieldID java_actor_creation_options_concurrency_groups;
jfieldID java_actor_creation_options_serialized_runtime_env;
jfieldID java_actor_creation_options_namespace;
jfieldID java_actor_creation_options_max_pending_calls;
jfieldID java_actor_creation_options_is_async;
jclass java_actor_lifetime_class;
int DETACHED_LIFETIME_ORDINAL_VALUE;
@ -367,6 +368,8 @@ jint JNI_OnLoad(JavaVM *vm, void *reserved) {
java_actor_creation_options_class, "namespace", "Ljava/lang/String;");
java_actor_creation_options_max_pending_calls =
env->GetFieldID(java_actor_creation_options_class, "maxPendingCalls", "I");
java_actor_creation_options_is_async =
env->GetFieldID(java_actor_creation_options_class, "isAsync", "Z");
java_actor_lifetime_class = LoadClass(env, "io/ray/api/options/ActorLifetime");
java_actor_lifetime_ordinal =

View file

@ -205,6 +205,8 @@ extern jfieldID java_actor_creation_options_serialized_runtime_env;
extern jfieldID java_actor_creation_options_namespace;
/// maxPendingCalls field of ActorCreationOptions class
extern jfieldID java_actor_creation_options_max_pending_calls;
/// isAsync field of ActorCreationOptions class
extern jfieldID java_actor_creation_options_is_async;
/// ActorLifetime enum class
extern jclass java_actor_lifetime_class;
/// ordinal method of ActorLifetime class