[Java] Remove RayRuntimeInternal class (#25016)

Due to we have already removed the multiple workers in one process, remove RayRuntimeInternal for purpose.
This commit is contained in:
Qing Wang 2022-05-24 09:22:48 +08:00 committed by GitHub
parent 7cf4233858
commit 4026b38b09
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
21 changed files with 49 additions and 97 deletions

View file

@ -21,6 +21,7 @@ import io.ray.api.options.CallOptions;
import io.ray.api.options.PlacementGroupCreationOptions;
import io.ray.api.parallelactor.ParallelActorContext;
import io.ray.api.placementgroup.PlacementGroup;
import io.ray.api.runtime.RayRuntime;
import io.ray.api.runtimecontext.RuntimeContext;
import io.ray.api.runtimeenv.RuntimeEnv;
import io.ray.runtime.config.RayConfig;
@ -31,6 +32,7 @@ import io.ray.runtime.functionmanager.FunctionDescriptor;
import io.ray.runtime.functionmanager.FunctionManager;
import io.ray.runtime.functionmanager.PyFunctionDescriptor;
import io.ray.runtime.functionmanager.RayFunction;
import io.ray.runtime.gcs.GcsClient;
import io.ray.runtime.generated.Common.Language;
import io.ray.runtime.object.ObjectRefImpl;
import io.ray.runtime.object.ObjectStore;
@ -50,7 +52,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** Core functionality to implement Ray APIs. */
public abstract class AbstractRayRuntime implements RayRuntimeInternal {
public abstract class AbstractRayRuntime implements RayRuntime {
private static final Logger LOGGER = LoggerFactory.getLogger(AbstractRayRuntime.class);
public static final String PYTHON_INIT_METHOD_NAME = "__init__";
@ -82,6 +84,12 @@ public abstract class AbstractRayRuntime implements RayRuntimeInternal {
/*skipAddingLocalRef=*/ true);
}
public abstract GcsClient getGcsClient();
public abstract void start();
public abstract void run();
@Override
public <T> ObjectRef<T> put(T obj, BaseActorHandle ownerActor) {
if (LOGGER.isDebugEnabled()) {
@ -355,27 +363,22 @@ public abstract class AbstractRayRuntime implements RayRuntimeInternal {
abstract List<ObjectId> getCurrentReturnIds(int numReturns, ActorId actorId);
@Override
public WorkerContext getWorkerContext() {
return workerContext;
}
@Override
public ObjectStore getObjectStore() {
return objectStore;
}
@Override
public TaskExecutor getTaskExecutor() {
return taskExecutor;
}
@Override
public FunctionManager getFunctionManager() {
return functionManager;
}
@Override
public RayConfig getRayConfig() {
return rayConfig;
}

View file

@ -24,7 +24,7 @@ public class ConcurrencyGroupImpl implements ConcurrencyGroup {
funcs.forEach(
func -> {
RayFunction rayFunc =
((RayRuntimeInternal) Ray.internal()).getFunctionManager().getFunction(func);
((AbstractRayRuntime) Ray.internal()).getFunctionManager().getFunction(func);
functionDescriptors.add(rayFunc.getFunctionDescriptor());
});
}

View file

@ -28,11 +28,10 @@ public class DefaultRayRuntimeFactory implements RayRuntimeFactory {
try {
logger.debug("Initializing runtime with config: {}", rayConfig);
AbstractRayRuntime innerRuntime =
AbstractRayRuntime runtime =
rayConfig.runMode == RunMode.LOCAL
? new RayDevRuntime(rayConfig)
: new RayNativeRuntime(rayConfig);
RayRuntimeInternal runtime = innerRuntime;
runtime.start();
return runtime;
} catch (Exception e) {

View file

@ -288,8 +288,6 @@ public final class RayNativeRuntime extends AbstractRayRuntime {
private static native byte[] nativeGetActorIdOfNamedActor(String actorName, String namespace);
private static native void nativeSetCoreWorker(byte[] workerId);
private static native Map<String, List<ResourceValue>> nativeGetResourceIds();
private static native String nativeGetNamespace();

View file

@ -1,30 +0,0 @@
package io.ray.runtime;
import io.ray.api.runtime.RayRuntime;
import io.ray.runtime.config.RayConfig;
import io.ray.runtime.context.WorkerContext;
import io.ray.runtime.functionmanager.FunctionManager;
import io.ray.runtime.gcs.GcsClient;
import io.ray.runtime.object.ObjectStore;
import io.ray.runtime.task.TaskExecutor;
/** This interface is required to make {@link RayRuntimeProxy} work. */
public interface RayRuntimeInternal extends RayRuntime {
/** Start runtime. */
void start();
WorkerContext getWorkerContext();
ObjectStore getObjectStore();
TaskExecutor getTaskExecutor();
FunctionManager getFunctionManager();
RayConfig getRayConfig();
GcsClient getGcsClient();
void run();
}

View file

@ -8,7 +8,7 @@ import io.ray.api.BaseActorHandle;
import io.ray.api.Ray;
import io.ray.api.id.ActorId;
import io.ray.api.id.ObjectId;
import io.ray.runtime.RayRuntimeInternal;
import io.ray.runtime.AbstractRayRuntime;
import io.ray.runtime.generated.Common.Language;
import java.io.Externalizable;
import java.io.IOException;
@ -122,7 +122,7 @@ public abstract class NativeActorHandle implements BaseActorHandle, Externalizab
public NativeActorHandleReference(NativeActorHandle handle) {
super(handle, REFERENCE_QUEUE);
this.actorId = handle.actorId;
RayRuntimeInternal runtime = (RayRuntimeInternal) Ray.internal();
AbstractRayRuntime runtime = (AbstractRayRuntime) Ray.internal();
this.workerId = runtime.getWorkerContext().getCurrentWorkerId().getBytes();
this.removed = new AtomicBoolean(false);
REFERENCES.add(this);

View file

@ -8,7 +8,7 @@ import io.ray.api.id.TaskId;
import io.ray.api.runtimecontext.NodeInfo;
import io.ray.api.runtimecontext.ResourceValue;
import io.ray.api.runtimecontext.RuntimeContext;
import io.ray.runtime.RayRuntimeInternal;
import io.ray.runtime.AbstractRayRuntime;
import io.ray.runtime.config.RunMode;
import io.ray.runtime.util.ResourceUtil;
import java.util.ArrayList;
@ -21,9 +21,9 @@ import java.util.stream.Collectors;
public class RuntimeContextImpl implements RuntimeContext {
private RayRuntimeInternal runtime;
private AbstractRayRuntime runtime;
public RuntimeContextImpl(RayRuntimeInternal runtime) {
public RuntimeContextImpl(AbstractRayRuntime runtime) {
this.runtime = runtime;
}

View file

@ -6,7 +6,7 @@ import io.ray.api.id.ActorId;
import io.ray.api.id.BaseId;
import io.ray.api.id.ObjectId;
import io.ray.api.id.UniqueId;
import io.ray.runtime.RayRuntimeInternal;
import io.ray.runtime.AbstractRayRuntime;
import io.ray.runtime.context.WorkerContext;
import io.ray.runtime.generated.Common.Address;
import java.util.HashMap;
@ -40,7 +40,7 @@ public class NativeObjectStore extends ObjectStore {
@Override
public ObjectId putRaw(NativeRayObject obj, ActorId ownerActorId) {
byte[] serializedOwnerAddressBytes =
((RayRuntimeInternal) Ray.internal()).getGcsClient().getActorAddress(ownerActorId);
((AbstractRayRuntime) Ray.internal()).getGcsClient().getActorAddress(ownerActorId);
return new ObjectId(nativePut(obj, serializedOwnerAddressBytes));
}

View file

@ -8,7 +8,7 @@ import io.ray.api.ObjectRef;
import io.ray.api.Ray;
import io.ray.api.id.ObjectId;
import io.ray.api.id.UniqueId;
import io.ray.runtime.RayRuntimeInternal;
import io.ray.runtime.AbstractRayRuntime;
import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
@ -60,7 +60,7 @@ public final class ObjectRefImpl<T> implements ObjectRef<T>, Externalizable {
public void init(ObjectId id, Class<?> type, boolean skipAddingLocalRef) {
this.id = id;
this.type = (Class<T>) type;
RayRuntimeInternal runtime = (RayRuntimeInternal) Ray.internal();
AbstractRayRuntime runtime = (AbstractRayRuntime) Ray.internal();
Preconditions.checkState(workerId == null);
workerId = runtime.getWorkerContext().getCurrentWorkerId();
@ -106,7 +106,7 @@ public final class ObjectRefImpl<T> implements ObjectRef<T>, Externalizable {
public void writeExternal(ObjectOutput out) throws IOException {
out.writeObject(this.getId());
out.writeObject(this.getType());
RayRuntimeInternal runtime = (RayRuntimeInternal) Ray.internal();
AbstractRayRuntime runtime = (AbstractRayRuntime) Ray.internal();
byte[] ownerAddress = runtime.getObjectStore().getOwnershipInfo(this.getId());
out.writeInt(ownerAddress.length);
out.write(ownerAddress);
@ -121,7 +121,7 @@ public final class ObjectRefImpl<T> implements ObjectRef<T>, Externalizable {
byte[] ownerAddress = new byte[len];
in.readFully(ownerAddress);
RayRuntimeInternal runtime = (RayRuntimeInternal) Ray.internal();
AbstractRayRuntime runtime = (AbstractRayRuntime) Ray.internal();
Preconditions.checkState(workerId == null);
workerId = runtime.getWorkerContext().getCurrentWorkerId();
runtime.getObjectStore().addLocalReference(workerId, id);
@ -156,7 +156,7 @@ public final class ObjectRefImpl<T> implements ObjectRef<T>, Externalizable {
REFERENCES.remove(this);
// It's possible that GC is executed after the runtime is shutdown.
if (Ray.isInitialized()) {
((RayRuntimeInternal) (Ray.internal()))
((AbstractRayRuntime) (Ray.internal()))
.getObjectStore()
.removeLocalReference(workerId, objectId);
allObjects.remove(objectId);

View file

@ -1,7 +1,7 @@
package io.ray.runtime.runner.worker;
import io.ray.api.Ray;
import io.ray.runtime.RayRuntimeInternal;
import io.ray.runtime.AbstractRayRuntime;
/** Default implementation of the worker process. */
public class DefaultWorker {
@ -12,6 +12,6 @@ public class DefaultWorker {
System.setProperty("ray.run-mode", "CLUSTER");
System.setProperty("ray.worker.mode", "WORKER");
Ray.init();
((RayRuntimeInternal) Ray.internal()).run();
((AbstractRayRuntime) Ray.internal()).run();
}
}

View file

@ -5,7 +5,7 @@ import com.google.common.primitives.Bytes;
import io.ray.api.ObjectRef;
import io.ray.api.Ray;
import io.ray.api.id.ObjectId;
import io.ray.runtime.RayRuntimeInternal;
import io.ray.runtime.AbstractRayRuntime;
import io.ray.runtime.generated.Common.Address;
import io.ray.runtime.generated.Common.Language;
import io.ray.runtime.object.NativeRayObject;
@ -41,7 +41,7 @@ public class ArgumentsBuilder {
if (arg instanceof ObjectRef) {
Preconditions.checkState(arg instanceof ObjectRefImpl);
id = ((ObjectRefImpl<?>) arg).getId();
address = ((RayRuntimeInternal) Ray.internal()).getObjectStore().getOwnerAddress(id);
address = ((AbstractRayRuntime) Ray.internal()).getObjectStore().getOwnerAddress(id);
} else {
value = ObjectSerializer.serialize(arg);
if (language != Language.JAVA) {
@ -60,8 +60,8 @@ public class ArgumentsBuilder {
}
}
if (value.data.length > LARGEST_SIZE_PASS_BY_VALUE) {
id = ((RayRuntimeInternal) Ray.internal()).getObjectStore().putRaw(value);
address = ((RayRuntimeInternal) Ray.internal()).getWorkerContext().getRpcAddress();
id = ((AbstractRayRuntime) Ray.internal()).getObjectStore().putRaw(value);
address = ((AbstractRayRuntime) Ray.internal()).getWorkerContext().getRpcAddress();
value = null;
}
}

View file

@ -1,7 +1,7 @@
package io.ray.runtime.task;
import io.ray.api.id.UniqueId;
import io.ray.runtime.RayRuntimeInternal;
import io.ray.runtime.AbstractRayRuntime;
/** Task executor for local mode. */
public class LocalModeTaskExecutor extends TaskExecutor<LocalModeTaskExecutor.LocalActorContext> {
@ -20,7 +20,7 @@ public class LocalModeTaskExecutor extends TaskExecutor<LocalModeTaskExecutor.Lo
}
}
public LocalModeTaskExecutor(RayRuntimeInternal runtime) {
public LocalModeTaskExecutor(AbstractRayRuntime runtime) {
super(runtime);
}

View file

@ -14,8 +14,8 @@ import io.ray.api.options.ActorCreationOptions;
import io.ray.api.options.CallOptions;
import io.ray.api.options.PlacementGroupCreationOptions;
import io.ray.api.placementgroup.PlacementGroup;
import io.ray.runtime.AbstractRayRuntime;
import io.ray.runtime.ConcurrencyGroupImpl;
import io.ray.runtime.RayRuntimeInternal;
import io.ray.runtime.actor.LocalModeActorHandle;
import io.ray.runtime.context.LocalModeWorkerContext;
import io.ray.runtime.functionmanager.FunctionDescriptor;
@ -59,7 +59,7 @@ public class LocalModeTaskSubmitter implements TaskSubmitter {
private final Map<ObjectId, Set<TaskSpec>> waitingTasks = new HashMap<>();
private final Object taskAndObjectLock = new Object();
private final RayRuntimeInternal runtime;
private final AbstractRayRuntime runtime;
private final TaskExecutor taskExecutor;
private final LocalModeObjectStore objectStore;
@ -169,7 +169,7 @@ public class LocalModeTaskSubmitter implements TaskSubmitter {
}
public LocalModeTaskSubmitter(
RayRuntimeInternal runtime, TaskExecutor taskExecutor, LocalModeObjectStore objectStore) {
AbstractRayRuntime runtime, TaskExecutor taskExecutor, LocalModeObjectStore objectStore) {
this.runtime = runtime;
this.taskExecutor = taskExecutor;
this.objectStore = objectStore;

View file

@ -1,14 +1,14 @@
package io.ray.runtime.task;
import io.ray.api.id.UniqueId;
import io.ray.runtime.RayRuntimeInternal;
import io.ray.runtime.AbstractRayRuntime;
/** Task executor for cluster mode. */
public class NativeTaskExecutor extends TaskExecutor<NativeTaskExecutor.NativeActorContext> {
static class NativeActorContext extends TaskExecutor.ActorContext {}
public NativeTaskExecutor(RayRuntimeInternal runtime) {
public NativeTaskExecutor(AbstractRayRuntime runtime) {
super(runtime);
}

View file

@ -8,7 +8,7 @@ import io.ray.api.exception.RayTaskException;
import io.ray.api.id.JobId;
import io.ray.api.id.TaskId;
import io.ray.api.id.UniqueId;
import io.ray.runtime.RayRuntimeInternal;
import io.ray.runtime.AbstractRayRuntime;
import io.ray.runtime.functionmanager.JavaFunctionDescriptor;
import io.ray.runtime.functionmanager.RayFunction;
import io.ray.runtime.generated.Common.TaskType;
@ -32,7 +32,7 @@ public abstract class TaskExecutor<T extends TaskExecutor.ActorContext> {
private static final Logger LOGGER = LoggerFactory.getLogger(TaskExecutor.class);
protected final RayRuntimeInternal runtime;
protected final AbstractRayRuntime runtime;
// TODO(qwang): Use actorContext instead later.
private final ConcurrentHashMap<UniqueId, T> actorContextMap = new ConcurrentHashMap<>();
@ -44,7 +44,7 @@ public abstract class TaskExecutor<T extends TaskExecutor.ActorContext> {
Object currentActor = null;
}
TaskExecutor(RayRuntimeInternal runtime) {
TaskExecutor(AbstractRayRuntime runtime) {
this.runtime = runtime;
}

View file

@ -1,7 +1,7 @@
package io.ray.runtime.util;
import io.ray.api.Ray;
import io.ray.runtime.RayRuntimeInternal;
import io.ray.runtime.AbstractRayRuntime;
import java.lang.reflect.Array;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
@ -47,7 +47,7 @@ public final class MethodUtils {
/// This code path indicates that here might be in another thread of a worker.
/// So try to load the class from URLClassLoader of this worker.
ClassLoader cl =
((RayRuntimeInternal) Ray.internal()).getFunctionManager().getClassLoader();
((AbstractRayRuntime) Ray.internal()).getFunctionManager().getClassLoader();
actorClz = Class.forName(className, true, cl);
}
} catch (Exception e) {

View file

@ -8,7 +8,7 @@ import io.ray.api.concurrencygroup.ConcurrencyGroupBuilder;
import io.ray.api.function.RayFunc;
import io.ray.api.function.RayFuncR;
import io.ray.api.parallelactor.*;
import io.ray.runtime.RayRuntimeInternal;
import io.ray.runtime.AbstractRayRuntime;
import io.ray.runtime.functionmanager.FunctionManager;
import io.ray.runtime.functionmanager.JavaFunctionDescriptor;
@ -26,7 +26,7 @@ public class ParallelActorContextImpl implements ParallelActorContext {
.build();
}
FunctionManager functionManager = ((RayRuntimeInternal) Ray.internal()).getFunctionManager();
FunctionManager functionManager = ((AbstractRayRuntime) Ray.internal()).getFunctionManager();
JavaFunctionDescriptor functionDescriptor =
functionManager.getFunction(ctorFunc).getFunctionDescriptor();
ActorHandle<ParallelActorExecutorImpl> parallelExecutorHandle =
@ -42,7 +42,7 @@ public class ParallelActorContextImpl implements ParallelActorContext {
ParallelActorHandle<A> parallelActorHandle, int instanceId, RayFunc func, Object[] args) {
ActorHandle<ParallelActorExecutorImpl> parallelExecutor =
((ParallelActorHandleImpl) parallelActorHandle).getExecutor();
FunctionManager functionManager = ((RayRuntimeInternal) Ray.internal()).getFunctionManager();
FunctionManager functionManager = ((AbstractRayRuntime) Ray.internal()).getFunctionManager();
JavaFunctionDescriptor functionDescriptor =
functionManager.getFunction(func).getFunctionDescriptor();
ObjectRef<Object> ret =

View file

@ -2,7 +2,7 @@ package io.ray.runtime.utils.parallelactor;
import com.google.common.base.Preconditions;
import io.ray.api.Ray;
import io.ray.runtime.RayRuntimeInternal;
import io.ray.runtime.AbstractRayRuntime;
import io.ray.runtime.functionmanager.FunctionManager;
import io.ray.runtime.functionmanager.JavaFunctionDescriptor;
import io.ray.runtime.functionmanager.RayFunction;
@ -22,7 +22,7 @@ public class ParallelActorExecutorImpl {
public ParallelActorExecutorImpl(int parallelism, JavaFunctionDescriptor javaFunctionDescriptor)
throws InvocationTargetException, IllegalAccessException {
functionManager = ((RayRuntimeInternal) Ray.internal()).getFunctionManager();
functionManager = ((AbstractRayRuntime) Ray.internal()).getFunctionManager();
RayFunction init = functionManager.getFunction(javaFunctionDescriptor);
Thread.currentThread().setContextClassLoader(init.classLoader);
for (int i = 0; i < parallelism; ++i) {

View file

@ -3,7 +3,7 @@ package io.ray.test;
import com.google.common.base.Preconditions;
import io.ray.api.ObjectRef;
import io.ray.api.Ray;
import io.ray.runtime.RayRuntimeInternal;
import io.ray.runtime.AbstractRayRuntime;
import io.ray.runtime.config.RayConfig;
import io.ray.runtime.config.RunMode;
import io.ray.runtime.task.ArgumentsBuilder;
@ -122,12 +122,8 @@ public class TestUtils {
Assert.assertEquals(obj.get(), "hi");
}
public static RayRuntimeInternal getRuntime() {
return (RayRuntimeInternal) Ray.internal();
}
public static RayRuntimeInternal getUnderlyingRuntime() {
return (RayRuntimeInternal) Ray.internal();
public static AbstractRayRuntime getRuntime() {
return (AbstractRayRuntime) Ray.internal();
}
public static ProcessBuilder buildDriver(Class<?> mainClass, String[] args) {

View file

@ -390,12 +390,6 @@ JNIEXPORT void JNICALL Java_io_ray_runtime_RayNativeRuntime_nativeKillActor(
THROW_EXCEPTION_AND_RETURN_IF_NOT_OK(env, status, (void)0);
}
JNIEXPORT void JNICALL Java_io_ray_runtime_RayNativeRuntime_nativeSetCoreWorker(
JNIEnv *env, jclass, jbyteArray workerId) {
const auto worker_id = JavaByteArrayToId<WorkerID>(env, workerId);
CoreWorkerProcess::SetCurrentThreadWorkerId(worker_id);
}
JNIEXPORT jobject JNICALL
Java_io_ray_runtime_RayNativeRuntime_nativeGetResourceIds(JNIEnv *env, jclass) {
auto key_converter = [](JNIEnv *env, const std::string &str) -> jstring {

View file

@ -79,14 +79,6 @@ Java_io_ray_runtime_RayNativeRuntime_nativeGetActorIdOfNamedActor(JNIEnv *,
jstring,
jstring);
/*
* Class: io_ray_runtime_RayNativeRuntime
* Method: nativeSetCoreWorker
* Signature: ([B)V
*/
JNIEXPORT void JNICALL
Java_io_ray_runtime_RayNativeRuntime_nativeSetCoreWorker(JNIEnv *, jclass, jbyteArray);
/*
* Class: io_ray_runtime_RayNativeRuntime
* Method: nativeGetResourceIds