From 1d06e025aea831326c8b1d30af52f06bf5bf1a4b Mon Sep 17 00:00:00 2001 From: fangfengbin <869218239a@zju.edu.cn> Date: Sun, 20 Sep 2020 16:51:14 +0800 Subject: [PATCH] [Java]Add actor id when throw RayActorException (#10886) --- .../api/src/main/java/io/ray/api/id/TaskId.java | 2 +- .../runtime/exception/RayActorException.java | 10 ++++++++++ .../io/ray/runtime/object/ObjectSerializer.java | 3 ++- .../main/java/io/ray/runtime/util/IdUtil.java | 17 +++++++++++++++++ .../src/main/java/io/ray/test/FailureTest.java | 1 + 5 files changed, 31 insertions(+), 2 deletions(-) diff --git a/java/api/src/main/java/io/ray/api/id/TaskId.java b/java/api/src/main/java/io/ray/api/id/TaskId.java index 06c6547c9..4f5b0c5ff 100644 --- a/java/api/src/main/java/io/ray/api/id/TaskId.java +++ b/java/api/src/main/java/io/ray/api/id/TaskId.java @@ -9,7 +9,7 @@ import java.util.Arrays; */ public class TaskId extends BaseId implements Serializable { - private static final int UNIQUE_BYTES_LENGTH = 8; + public static final int UNIQUE_BYTES_LENGTH = 8; public static final int LENGTH = ActorId.LENGTH + UNIQUE_BYTES_LENGTH; diff --git a/java/runtime/src/main/java/io/ray/runtime/exception/RayActorException.java b/java/runtime/src/main/java/io/ray/runtime/exception/RayActorException.java index 4765be5cb..1bb4b67cd 100644 --- a/java/runtime/src/main/java/io/ray/runtime/exception/RayActorException.java +++ b/java/runtime/src/main/java/io/ray/runtime/exception/RayActorException.java @@ -1,5 +1,7 @@ package io.ray.runtime.exception; +import io.ray.api.id.ActorId; + /** * Indicates that the actor died unexpectedly before finishing a task. * @@ -8,10 +10,18 @@ package io.ray.runtime.exception; */ public class RayActorException extends RayException { + public ActorId actorId; + public RayActorException() { super("The actor died unexpectedly before finishing this task."); } + public RayActorException(ActorId actorId) { + super(String.format( + "The actor %s died unexpectedly before finishing this task.", actorId)); + this.actorId = actorId; + } + public RayActorException(String message) { super(message); } diff --git a/java/runtime/src/main/java/io/ray/runtime/object/ObjectSerializer.java b/java/runtime/src/main/java/io/ray/runtime/object/ObjectSerializer.java index 6e9dd6c05..f26b20b68 100644 --- a/java/runtime/src/main/java/io/ray/runtime/object/ObjectSerializer.java +++ b/java/runtime/src/main/java/io/ray/runtime/object/ObjectSerializer.java @@ -9,6 +9,7 @@ import io.ray.runtime.exception.RayWorkerException; import io.ray.runtime.exception.UnreconstructableException; import io.ray.runtime.generated.Common.ErrorType; import io.ray.runtime.serializer.Serializer; +import io.ray.runtime.util.IdUtil; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; @@ -74,7 +75,7 @@ public class ObjectSerializer { } else if (Arrays.equals(meta, WORKER_EXCEPTION_META)) { return new RayWorkerException(); } else if (Arrays.equals(meta, ACTOR_EXCEPTION_META)) { - return new RayActorException(); + return new RayActorException(IdUtil.getActorIdFromObjectId(objectId)); } else if (Arrays.equals(meta, UNRECONSTRUCTABLE_EXCEPTION_META)) { return new UnreconstructableException(objectId); } else if (Arrays.equals(meta, TASK_EXECUTION_EXCEPTION_META)) { diff --git a/java/runtime/src/main/java/io/ray/runtime/util/IdUtil.java b/java/runtime/src/main/java/io/ray/runtime/util/IdUtil.java index 2dbe2e8b4..eca2860af 100644 --- a/java/runtime/src/main/java/io/ray/runtime/util/IdUtil.java +++ b/java/runtime/src/main/java/io/ray/runtime/util/IdUtil.java @@ -1,6 +1,9 @@ package io.ray.runtime.util; +import io.ray.api.id.ActorId; import io.ray.api.id.BaseId; +import io.ray.api.id.ObjectId; +import io.ray.api.id.TaskId; /** * Helper method for different Ids. Note: any changes to these methods must be synced with C++ @@ -75,4 +78,18 @@ public class IdUtil { return h; } + + /** + * Compute the actor ID of the task which created this object. + * @return The actor ID of the task which created this object. + */ + public static ActorId getActorIdFromObjectId(ObjectId objectId) { + byte[] taskIdBytes = new byte[TaskId.LENGTH]; + System.arraycopy(objectId.getBytes(), 0, taskIdBytes, 0, TaskId.LENGTH); + TaskId taskId = TaskId.fromBytes(taskIdBytes); + byte[] actorIdBytes = new byte[ActorId.LENGTH]; + System.arraycopy(taskId.getBytes(), TaskId.UNIQUE_BYTES_LENGTH, + actorIdBytes, 0, ActorId.LENGTH); + return ActorId.fromBytes(actorIdBytes); + } } diff --git a/java/test/src/main/java/io/ray/test/FailureTest.java b/java/test/src/main/java/io/ray/test/FailureTest.java index 3f779bf23..6529b1beb 100644 --- a/java/test/src/main/java/io/ray/test/FailureTest.java +++ b/java/test/src/main/java/io/ray/test/FailureTest.java @@ -121,6 +121,7 @@ public class FailureTest extends BaseTest { } catch (RayActorException e) { // When the actor process dies while executing a task, we should receive an // RayActorException. + Assert.assertEquals(e.actorId, actor.getId()); } try { actor.task(BadActor::badMethod).remote().get();