[Java] Simplify Ray.wrapXxx() (#14895)

Co-authored-by: Qing Wang <jovany.wq@antgroup.com>
This commit is contained in:
Qing Wang 2021-03-25 10:58:12 +08:00 committed by GitHub
parent 03afaed6e1
commit 493d15e05b
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -219,26 +219,12 @@ public abstract class AbstractRayRuntime implements RayRuntimeInternal {
isContextSet.set(true); isContextSet.set(true);
} }
// TODO (kfstorm): Simplify the duplicate code in wrap*** methods.
@Override @Override
public final Runnable wrapRunnable(Runnable runnable) { public final Runnable wrapRunnable(Runnable runnable) {
Object asyncContext = getAsyncContext(); Object asyncContext = getAsyncContext();
return () -> { return () -> {
boolean oldIsContextSet = isContextSet.get(); try (RayAsyncContextUpdater updater = new RayAsyncContextUpdater(asyncContext, this)) {
Object oldAsyncContext = null;
if (oldIsContextSet) {
oldAsyncContext = getAsyncContext();
}
setAsyncContext(asyncContext);
try {
runnable.run(); runnable.run();
} finally {
if (oldIsContextSet) {
setAsyncContext(oldAsyncContext);
} else {
setIsContextSet(false);
}
} }
}; };
} }
@ -247,20 +233,8 @@ public abstract class AbstractRayRuntime implements RayRuntimeInternal {
public final <T> Callable<T> wrapCallable(Callable<T> callable) { public final <T> Callable<T> wrapCallable(Callable<T> callable) {
Object asyncContext = getAsyncContext(); Object asyncContext = getAsyncContext();
return () -> { return () -> {
boolean oldIsContextSet = isContextSet.get(); try (RayAsyncContextUpdater updater = new RayAsyncContextUpdater(asyncContext, this)) {
Object oldAsyncContext = null;
if (oldIsContextSet) {
oldAsyncContext = getAsyncContext();
}
setAsyncContext(asyncContext);
try {
return callable.call(); return callable.call();
} finally {
if (oldIsContextSet) {
setAsyncContext(oldAsyncContext);
} else {
setIsContextSet(false);
}
} }
}; };
} }
@ -328,6 +302,34 @@ public abstract class AbstractRayRuntime implements RayRuntimeInternal {
return actor; return actor;
} }
/// An auto closable class that is used for updating the async context when invoking Ray APIs.
private static final class RayAsyncContextUpdater implements AutoCloseable {
private AbstractRayRuntime runtime;
private boolean oldIsContextSet;
private Object oldAsyncContext = null;
public RayAsyncContextUpdater(Object asyncContext, AbstractRayRuntime runtime) {
this.runtime = runtime;
oldIsContextSet = runtime.isContextSet.get();
if (oldIsContextSet) {
oldAsyncContext = runtime.getAsyncContext();
}
runtime.setAsyncContext(asyncContext);
}
@Override
public void close() {
if (oldIsContextSet) {
runtime.setAsyncContext(oldAsyncContext);
} else {
runtime.setIsContextSet(false);
}
}
}
@Override @Override
public WorkerContext getWorkerContext() { public WorkerContext getWorkerContext() {
return workerContext; return workerContext;