diff --git a/java/runtime/src/main/java/io/ray/runtime/gcs/GcsClient.java b/java/runtime/src/main/java/io/ray/runtime/gcs/GcsClient.java index 4bebeb444..9332da6c6 100644 --- a/java/runtime/src/main/java/io/ray/runtime/gcs/GcsClient.java +++ b/java/runtime/src/main/java/io/ray/runtime/gcs/GcsClient.java @@ -163,6 +163,20 @@ public class GcsClient { return nodeInfo; } + public byte[] getActorAddress(ActorId actorId) { + byte[] serializedActorInfo = globalStateAccessor.getActorInfo(actorId); + if (serializedActorInfo == null) { + return null; + } + + try { + Gcs.ActorTableData actorTableData = Gcs.ActorTableData.parseFrom(serializedActorInfo); + return actorTableData.getAddress().toByteArray(); + } catch (InvalidProtocolBufferException e) { + throw new RuntimeException("Received invalid protobuf data from GCS."); + } + } + /** Destroy global state accessor when ray native runtime will be shutdown. */ public void destroy() { // Only ray shutdown should call gcs client destroy. diff --git a/java/runtime/src/main/java/io/ray/runtime/object/NativeObjectStore.java b/java/runtime/src/main/java/io/ray/runtime/object/NativeObjectStore.java index 136712c09..bf99e6f2a 100644 --- a/java/runtime/src/main/java/io/ray/runtime/object/NativeObjectStore.java +++ b/java/runtime/src/main/java/io/ray/runtime/object/NativeObjectStore.java @@ -1,10 +1,12 @@ package io.ray.runtime.object; import com.google.protobuf.InvalidProtocolBufferException; +import io.ray.api.Ray; import io.ray.api.id.ActorId; import io.ray.api.id.BaseId; import io.ray.api.id.ObjectId; import io.ray.api.id.UniqueId; +import io.ray.runtime.RayRuntimeInternal; import io.ray.runtime.context.WorkerContext; import io.ray.runtime.generated.Common.Address; import java.util.HashMap; @@ -37,7 +39,9 @@ public class NativeObjectStore extends ObjectStore { @Override public ObjectId putRaw(NativeRayObject obj, ActorId ownerActorId) { - return new ObjectId(nativePut(obj, ownerActorId.getBytes())); + byte[] serializedOwnerAddressBytes = + ((RayRuntimeInternal) Ray.internal()).getGcsClient().getActorAddress(ownerActorId); + return new ObjectId(nativePut(obj, serializedOwnerAddressBytes)); } @Override @@ -113,7 +117,7 @@ public class NativeObjectStore extends ObjectStore { return ids.stream().map(BaseId::getBytes).collect(Collectors.toList()); } - private static native byte[] nativePut(NativeRayObject obj, byte[] ownerActorIdBytes); + private static native byte[] nativePut(NativeRayObject obj, byte[] serializedOwnerAddressBytes); private static native void nativePut(byte[] objectId, NativeRayObject obj); diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index edcaf9613..531951e4a 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -3095,8 +3095,6 @@ void CoreWorker::SetActorTitle(const std::string &title) { const rpc::JobConfig &CoreWorker::GetJobConfig() const { return *job_config_; } -std::shared_ptr CoreWorker::GetGcsClient() const { return gcs_client_; } - bool CoreWorker::IsExiting() const { return exiting_; } std::unordered_map> CoreWorker::GetActorCallStats() diff --git a/src/ray/core_worker/core_worker.h b/src/ray/core_worker/core_worker.h index 5c80459f5..d3c5ff4d7 100644 --- a/src/ray/core_worker/core_worker.h +++ b/src/ray/core_worker/core_worker.h @@ -769,9 +769,6 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler { // Get serialized job configuration. const rpc::JobConfig &GetJobConfig() const; - // Get gcs_client - std::shared_ptr GetGcsClient() const; - /// Return true if the core worker is in the exit process. bool IsExiting() const; diff --git a/src/ray/core_worker/lib/java/io_ray_runtime_object_NativeObjectStore.cc b/src/ray/core_worker/lib/java/io_ray_runtime_object_NativeObjectStore.cc index c883b401a..887966ee7 100644 --- a/src/ray/core_worker/lib/java/io_ray_runtime_object_NativeObjectStore.cc +++ b/src/ray/core_worker/lib/java/io_ray_runtime_object_NativeObjectStore.cc @@ -76,31 +76,13 @@ extern "C" { JNIEXPORT jbyteArray JNICALL Java_io_ray_runtime_object_NativeObjectStore_nativePut__Lio_ray_runtime_object_NativeRayObject_2_3B( - JNIEnv *env, jclass, jobject obj, jbyteArray owner_actor_id_bytes) { + JNIEnv *env, jclass, jobject obj, jbyteArray serialized_owner_actor_address_bytes) { ObjectID object_id; std::unique_ptr owner_address = nullptr; - if (owner_actor_id_bytes) { - rpc::ActorTableData actor_table_data; - { - /// Get actor info from GCS synchronously. - std::unique_ptr serialized_actor_table_data; - std::promise promise; - auto gcs_client = CoreWorkerProcess::GetCoreWorker().GetGcsClient(); - RAY_CHECK_OK(gcs_client->Actors().AsyncGet( - ActorID::FromBinary(JavaByteArrayToNativeString(env, owner_actor_id_bytes)), - [&serialized_actor_table_data, &promise]( - const Status &status, const boost::optional &result) { - RAY_CHECK_OK(status); - if (result) { - serialized_actor_table_data.reset( - new std::string(result->SerializeAsString())); - } - promise.set_value(true); - })); - promise.get_future().get(); - actor_table_data.ParseFromString(*serialized_actor_table_data); - } - owner_address = std::make_unique(actor_table_data.address()); + if (serialized_owner_actor_address_bytes != nullptr) { + owner_address = std::make_unique(); + owner_address->ParseFromString( + JavaByteArrayToNativeString(env, serialized_owner_actor_address_bytes)); } auto status = PutSerializedObject(env, obj, /*object_id=*/ObjectID::Nil(), /*out_object_id=*/&object_id, /*pin_object=*/true,