[Java] Support wasCurrentActorRestarted in actor task. (#13120)

* Remove check.

* Add test

* fix lint

* lint

* Fix spotless lint

* Address comments.

* Fix lint

Co-authored-by: Qing Wang <jovany.wq@antgroup.com>
This commit is contained in:
Qing Wang 2021-01-02 11:31:08 +08:00 committed by GitHub
parent 710615c228
commit d3dd5b87ce
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 29 additions and 27 deletions

View file

@ -17,11 +17,7 @@ public interface RuntimeContext {
*/ */
ActorId getCurrentActorId(); ActorId getCurrentActorId();
/** /** Returns true if the current actor was restarted, otherwise false. */
* Returns true if the current actor was restarted, false if it's created for the first time.
*
* <p>Note, this method should only be called from an actor creation task.
*/
boolean wasCurrentActorRestarted(); boolean wasCurrentActorRestarted();
/** /**

View file

@ -7,7 +7,6 @@ import io.ray.api.runtimecontext.NodeInfo;
import io.ray.api.runtimecontext.RuntimeContext; import io.ray.api.runtimecontext.RuntimeContext;
import io.ray.runtime.RayRuntimeInternal; import io.ray.runtime.RayRuntimeInternal;
import io.ray.runtime.config.RunMode; import io.ray.runtime.config.RunMode;
import io.ray.runtime.generated.Common.TaskType;
import java.util.List; import java.util.List;
public class RuntimeContextImpl implements RuntimeContext { public class RuntimeContextImpl implements RuntimeContext {
@ -33,14 +32,9 @@ public class RuntimeContextImpl implements RuntimeContext {
@Override @Override
public boolean wasCurrentActorRestarted() { public boolean wasCurrentActorRestarted() {
TaskType currentTaskType = runtime.getWorkerContext().getCurrentTaskType();
Preconditions.checkState(
currentTaskType == TaskType.ACTOR_CREATION_TASK,
"This method can only be called from an actor creation task.");
if (isSingleProcess()) { if (isSingleProcess()) {
return false; return false;
} }
return runtime.getGcsClient().wasCurrentActorRestarted(getCurrentActorId()); return runtime.getGcsClient().wasCurrentActorRestarted(getCurrentActorId());
} }

View file

@ -22,7 +22,7 @@ public class ActorRestartTest extends BaseTest {
wasCurrentActorRestarted = Ray.getRuntimeContext().wasCurrentActorRestarted(); wasCurrentActorRestarted = Ray.getRuntimeContext().wasCurrentActorRestarted();
} }
public boolean wasCurrentActorRestarted() { public boolean checkWasCurrentActorRestartedInActorCreationTask() {
return wasCurrentActorRestarted; return wasCurrentActorRestarted;
} }
@ -31,6 +31,10 @@ public class ActorRestartTest extends BaseTest {
return value; return value;
} }
public boolean checkWasCurrentActorRestartedInActorTask() {
return Ray.getRuntimeContext().wasCurrentActorRestarted();
}
public int getPid() { public int getPid() {
return SystemUtil.pid(); return SystemUtil.pid();
} }
@ -43,30 +47,38 @@ public class ActorRestartTest extends BaseTest {
actor.task(Counter::increase).remote().get(); actor.task(Counter::increase).remote().get();
} }
Assert.assertFalse(actor.task(Counter::wasCurrentActorRestarted).remote().get()); // Check if actor was restarted.
Assert.assertFalse(
actor.task(Counter::checkWasCurrentActorRestartedInActorCreationTask).remote().get());
Assert.assertFalse(
actor.task(Counter::checkWasCurrentActorRestartedInActorTask).remote().get());
// Kill the actor process. // Kill the actor process.
int pid = actor.task(Counter::getPid).remote().get(); killActorProcess(actor);
Runtime.getRuntime().exec("kill -9 " + pid);
// Wait for the actor to be killed.
TimeUnit.SECONDS.sleep(1);
int value = actor.task(Counter::increase).remote().get(); int value = actor.task(Counter::increase).remote().get();
Assert.assertEquals(value, 1); Assert.assertEquals(value, 1);
Assert.assertTrue(actor.task(Counter::wasCurrentActorRestarted).remote().get()); // Check if actor was restarted again.
Assert.assertTrue(
actor.task(Counter::checkWasCurrentActorRestartedInActorCreationTask).remote().get());
Assert.assertTrue(actor.task(Counter::checkWasCurrentActorRestartedInActorTask).remote().get());
// Kill the actor process again. // Kill the actor process again.
pid = actor.task(Counter::getPid).remote().get(); killActorProcess(actor);
Runtime.getRuntime().exec("kill -9 " + pid);
TimeUnit.SECONDS.sleep(1);
// Try calling increase on this actor again and this should fail. // Try calling increase on this actor again and this should fail.
try { Assert.assertThrows(
actor.task(Counter::increase).remote().get(); RayActorException.class, () -> actor.task(Counter::increase).remote().get());
Assert.fail("The above task didn't fail.");
} catch (RayActorException e) {
// We should receive a RayActorException because the actor is dead.
} }
/** The helper to kill a counter actor. */
private static void killActorProcess(ActorHandle<Counter> actor)
throws IOException, InterruptedException {
// Kill the actor process.
int pid = actor.task(Counter::getPid).remote().get();
Process p = Runtime.getRuntime().exec("kill -9 " + pid);
// Wait for the actor to be killed.
TimeUnit.SECONDS.sleep(1);
} }
} }