[Java] Support exchange ObjectRef between processes (#10729)

This commit is contained in:
Xianyang Liu 2020-09-13 11:54:45 +08:00 committed by GitHub
parent 9f9b53e624
commit 8166d71bde
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
8 changed files with 175 additions and 1 deletions

View file

@ -105,4 +105,14 @@ public class LocalModeObjectStore extends ObjectStore {
@Override
public void removeLocalReference(UniqueId workerId, ObjectId objectId) {
}
@Override
public byte[] promoteAndGetOwnershipInfo(ObjectId objectId) {
return new byte[0];
}
@Override
public void registerOwnershipInfoAndResolveFuture(
ObjectId objectId, ObjectId outerObjectId, byte[] ownerAddress) {
}
}

View file

@ -68,6 +68,21 @@ public class NativeObjectStore extends ObjectStore {
}
}
@Override
public byte[] promoteAndGetOwnershipInfo(ObjectId objectId) {
return nativePromoteAndGetOwnershipInfo(objectId.getBytes());
}
@Override
public void registerOwnershipInfoAndResolveFuture(ObjectId objectId, ObjectId outerObjectId,
byte[] ownerAddress) {
byte[] outer = null;
if (outerObjectId != null) {
outer = outerObjectId.getBytes();
}
nativeRegisterOwnershipInfoAndResolveFuture(objectId.getBytes(), outer, ownerAddress);
}
public Map<ObjectId, long[]> getAllReferenceCounts() {
Map<ObjectId, long[]> referenceCounts = new HashMap<>();
for (Map.Entry<byte[], long[]> entry :
@ -98,4 +113,9 @@ public class NativeObjectStore extends ObjectStore {
private static native void nativeRemoveLocalReference(byte[] workerId, byte[] objectId);
private static native Map<byte[], long[]> nativeGetAllReferenceCounts();
private static native byte[] nativePromoteAndGetOwnershipInfo(byte[] objectId);
private static native void nativeRegisterOwnershipInfoAndResolveFuture(byte[] objectId,
byte[] outerObjectId, byte[] ownerAddress);
}

View file

@ -65,6 +65,10 @@ public final class ObjectRefImpl<T> implements ObjectRef<T>, Externalizable {
public void writeExternal(ObjectOutput out) throws IOException {
out.writeObject(this.getId());
out.writeObject(this.getType());
RayRuntimeInternal runtime = (RayRuntimeInternal) Ray.internal();
byte[] ownerAddress = runtime.getObjectStore().promoteAndGetOwnershipInfo(this.getId());
out.writeInt(ownerAddress.length);
out.write(ownerAddress);
ObjectSerializer.addContainedObjectId(this.getId());
}
@ -72,7 +76,13 @@ public final class ObjectRefImpl<T> implements ObjectRef<T>, Externalizable {
public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
this.id = (ObjectId) in.readObject();
this.type = (Class<T>) in.readObject();
int len = in.readInt();
byte[] ownerAddress = new byte[len];
in.readFully(ownerAddress);
addLocalReference();
RayRuntimeInternal runtime = (RayRuntimeInternal) Ray.internal();
runtime.getObjectStore().registerOwnershipInfoAndResolveFuture(
this.id, ObjectSerializer.getOuterObjectId(), ownerAddress);
}
private void addLocalReference() {

View file

@ -47,6 +47,8 @@ public class ObjectSerializer {
// field will contain all the nested object IDs.
static ThreadLocal<Set<ObjectId>> containedObjectIds = ThreadLocal.withInitial(HashSet::new);
static ThreadLocal<ObjectId> outerObjectId = ThreadLocal.withInitial(() -> null);
/**
* Deserialize an object from an {@link NativeRayObject} instance.
*
@ -170,4 +172,16 @@ public class ObjectSerializer {
containedObjectIds.get().clear();
return ids;
}
static void setOuterObjectId(ObjectId objectId) {
outerObjectId.set(objectId);
}
static ObjectId getOuterObjectId() {
return outerObjectId.get();
}
static void resetOuterObjectId() {
outerObjectId.set(null);
}
}

View file

@ -96,8 +96,13 @@ public abstract class ObjectStore {
NativeRayObject dataAndMeta = dataAndMetaList.get(i);
Object object = null;
if (dataAndMeta != null) {
object = ObjectSerializer
try {
ObjectSerializer.setOuterObjectId(ids.get(i));
object = ObjectSerializer
.deserialize(dataAndMeta, ids.get(i), elementType);
} finally {
ObjectSerializer.resetOuterObjectId();
}
}
if (object instanceof RayException) {
// If the object is a `RayException`, it means that an error occurred during task
@ -181,4 +186,27 @@ public abstract class ObjectStore {
* @param objectId The object ID to decrease the reference count for.
*/
public abstract void removeLocalReference(UniqueId workerId, ObjectId objectId);
/**
* Promote the given object to the underlying object store, and get the ownership info.
*
* @param objectId The ID of the object to promote
* @return the serialized ownership address
*/
public abstract byte[] promoteAndGetOwnershipInfo(ObjectId objectId);
/**
* Add a reference to an ObjectID that will deserialized. This will also start the process to
* resolve the future. Specifically, we will periodically contact the owner, until we learn that
* the object has been created or the owner is no longer reachable. This will then unblock any
* Gets or submissions of tasks dependent on the object.
*
* @param objectId The object ID to deserialize.
* @param outerObjectId The object ID that contained objectId, if any. This may be nil if the
* object ID was inlined directly in a task spec or if it was passed
* out-of-band by the application (deserialized from a byte string).
* @param ownerAddress The address of the object's owner.
*/
public abstract void registerOwnershipInfoAndResolveFuture(ObjectId objectId,
ObjectId outerObjectId, byte[] ownerAddress);
}

View file

@ -0,0 +1,45 @@
package io.ray.test;
import io.ray.api.ActorHandle;
import io.ray.api.ObjectRef;
import io.ray.api.Ray;
import java.util.ArrayList;
import java.util.List;
import org.testng.Assert;
import org.testng.annotations.Test;
public class ObjectRefTransferTest extends BaseTest {
@Test
public void testObjectTransfer() {
ObjectRef<String> objectRef = Ray.put("test");
List<ObjectRef<String>> data = new ArrayList<>();
data.add(objectRef);
ActorHandle<RemoteActor> handle = Ray.actor(RemoteActor::new).remote();
String result = handle.task(RemoteActor::get, data).remote().get();
Assert.assertEquals(result, "test");
}
@Test
public void testNestedObjectId() {
ObjectRef<String> inner = Ray.put("inner");
ObjectRef<ObjectRef<String>> outer = Ray.put(inner);
List<ObjectRef<ObjectRef<String>>> data = new ArrayList<>();
data.add(outer);
ActorHandle<RemoteActor> handle = Ray.actor(RemoteActor::new).remote();
String result = handle.task(RemoteActor::getNested, data).remote().get();
Assert.assertEquals(result, "inner");
}
public static class RemoteActor {
public String get(List<ObjectRef<String>> value) {
return Ray.get(value.get(0));
}
public String getNested(List<ObjectRef<ObjectRef<String>>> value) {
return Ray.get(value.get(0).get());
}
}
}

View file

@ -174,6 +174,34 @@ Java_io_ray_runtime_object_NativeObjectStore_nativeGetAllReferenceCounts(JNIEnv
});
}
JNIEXPORT jbyteArray JNICALL
Java_io_ray_runtime_object_NativeObjectStore_nativePromoteAndGetOwnershipInfo(
JNIEnv *env, jclass, jbyteArray objectId) {
auto object_id = JavaByteArrayToId<ray::ObjectID>(env, objectId);
ray::CoreWorkerProcess::GetCoreWorker().PromoteObjectToPlasma(object_id);
ray::rpc::Address address;
ray::CoreWorkerProcess::GetCoreWorker().GetOwnershipInfo(object_id, &address);
auto address_str = address.SerializeAsString();
auto arr = NativeStringToJavaByteArray(env, address_str);
return arr;
}
JNIEXPORT void JNICALL
Java_io_ray_runtime_object_NativeObjectStore_nativeRegisterOwnershipInfoAndResolveFuture(
JNIEnv *env, jclass, jbyteArray objectId, jbyteArray outerObjectId,
jbyteArray ownerAddress) {
auto object_id = JavaByteArrayToId<ray::ObjectID>(env, objectId);
auto outer_objectId = ray::ObjectID::Nil();
if (outerObjectId != NULL) {
outer_objectId = JavaByteArrayToId<ray::ObjectID>(env, outerObjectId);
}
auto ownerAddressStr = JavaByteArrayToNativeString(env, ownerAddress);
ray::rpc::Address address;
address.ParseFromString(ownerAddressStr);
ray::CoreWorkerProcess::GetCoreWorker().RegisterOwnershipInfoAndResolveFuture(
object_id, outer_objectId, address);
}
#ifdef __cplusplus
}
#endif

View file

@ -94,6 +94,25 @@ JNIEXPORT jobject JNICALL
Java_io_ray_runtime_object_NativeObjectStore_nativeGetAllReferenceCounts(JNIEnv *,
jclass);
/*
* Class: io_ray_runtime_object_NativeObjectStore
* Method: nativePromoteAndGetOwnershipInfo
* Signature: ([B)[B
*/
JNIEXPORT jbyteArray JNICALL
Java_io_ray_runtime_object_NativeObjectStore_nativePromoteAndGetOwnershipInfo(JNIEnv *,
jclass,
jbyteArray);
/*
* Class: io_ray_runtime_object_NativeObjectStore
* Method: nativeRegisterOwnershipInfoAndResolveFuture
* Signature: ([B[B[B)V
*/
JNIEXPORT void JNICALL
Java_io_ray_runtime_object_NativeObjectStore_nativeRegisterOwnershipInfoAndResolveFuture(
JNIEnv *, jclass, jbyteArray, jbyteArray, jbyteArray);
#ifdef __cplusplus
}
#endif