mirror of
https://github.com/vale981/ray
synced 2025-03-04 17:41:43 -05:00
[runtime env][java] Support runtime env config in Java (#28083)
Support job level and task/actor level runtime env config eg. `setupTimeoutSeconds` and `eagerInstall`.
This commit is contained in:
parent
50cb51387e
commit
cf2cb66d29
14 changed files with 280 additions and 25 deletions
|
@ -5,4 +5,8 @@ public class RuntimeEnvException extends RayException {
|
||||||
public RuntimeEnvException(String message) {
|
public RuntimeEnvException(String message) {
|
||||||
super(message);
|
super(message);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public RuntimeEnvException(String message, Throwable cause) {
|
||||||
|
super(message, cause);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -5,6 +5,7 @@ import io.ray.api.id.ActorId;
|
||||||
import io.ray.api.id.JobId;
|
import io.ray.api.id.JobId;
|
||||||
import io.ray.api.id.TaskId;
|
import io.ray.api.id.TaskId;
|
||||||
import io.ray.api.id.UniqueId;
|
import io.ray.api.id.UniqueId;
|
||||||
|
import io.ray.api.runtimeenv.RuntimeEnv;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
/** A class used for getting information of Ray runtime. */
|
/** A class used for getting information of Ray runtime. */
|
||||||
|
@ -45,4 +46,9 @@ public interface RuntimeContext {
|
||||||
|
|
||||||
/** Get the node id of this worker. */
|
/** Get the node id of this worker. */
|
||||||
UniqueId getCurrentNodeId();
|
UniqueId getCurrentNodeId();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the runtime env of this worker. If it is a driver, job level runtime env will be returned.
|
||||||
|
*/
|
||||||
|
RuntimeEnv getCurrentRuntimeEnv();
|
||||||
}
|
}
|
||||||
|
|
|
@ -47,6 +47,14 @@ public interface RuntimeEnv {
|
||||||
*/
|
*/
|
||||||
public String getJsonStr(String name) throws RuntimeEnvException;
|
public String getJsonStr(String name) throws RuntimeEnvException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Whether a field is contained.
|
||||||
|
*
|
||||||
|
* @param name The runtime env plugin name.
|
||||||
|
* @return
|
||||||
|
*/
|
||||||
|
boolean contains(String name);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Remove a runtime env field by name.
|
* Remove a runtime env field by name.
|
||||||
*
|
*
|
||||||
|
@ -64,6 +72,13 @@ public interface RuntimeEnv {
|
||||||
*/
|
*/
|
||||||
public String serialize() throws RuntimeEnvException;
|
public String serialize() throws RuntimeEnvException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Whether the runtime env is empty.
|
||||||
|
*
|
||||||
|
* @return
|
||||||
|
*/
|
||||||
|
boolean isEmpty();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Serialize the runtime env to string of RuntimeEnvInfo.
|
* Serialize the runtime env to string of RuntimeEnvInfo.
|
||||||
*
|
*
|
||||||
|
@ -83,6 +98,20 @@ public interface RuntimeEnv {
|
||||||
return Ray.internal().deserializeRuntimeEnv(serializedRuntimeEnv);
|
return Ray.internal().deserializeRuntimeEnv(serializedRuntimeEnv);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Set runtime env config.
|
||||||
|
*
|
||||||
|
* @param runtimeEnvConfig
|
||||||
|
*/
|
||||||
|
public void setConfig(RuntimeEnvConfig runtimeEnvConfig);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get runtime env config.
|
||||||
|
*
|
||||||
|
* @return The runtime env config.
|
||||||
|
*/
|
||||||
|
public RuntimeEnvConfig getConfig();
|
||||||
|
|
||||||
/** The builder which is used to generate a RuntimeEnv instance. */
|
/** The builder which is used to generate a RuntimeEnv instance. */
|
||||||
public static class Builder {
|
public static class Builder {
|
||||||
public RuntimeEnv build() {
|
public RuntimeEnv build() {
|
||||||
|
|
|
@ -0,0 +1,39 @@
|
||||||
|
package io.ray.api.runtimeenv;
|
||||||
|
|
||||||
|
/** A POJO class used to specify configuration options for a runtime environment. */
|
||||||
|
public class RuntimeEnvConfig {
|
||||||
|
/**
|
||||||
|
* The timeout of runtime environment creation, timeout is in seconds. The value `-1` means
|
||||||
|
* disable timeout logic, except `-1`, `setup_timeout_seconds` cannot be less than or equal to 0.
|
||||||
|
* The default value of `setup_timeout_seconds` is 600 seconds.
|
||||||
|
*/
|
||||||
|
private Integer setupTimeoutSeconds = 600;
|
||||||
|
/**
|
||||||
|
* Indicates whether to install the runtime environment on the cluster at `ray.init()` time,
|
||||||
|
* before the workers are leased. This flag is set to `True` by default.
|
||||||
|
*/
|
||||||
|
private Boolean eagerInstall = true;
|
||||||
|
|
||||||
|
public RuntimeEnvConfig() {}
|
||||||
|
|
||||||
|
public RuntimeEnvConfig(int setupTimeoutSeconds, boolean eagerInstall) {
|
||||||
|
this.setupTimeoutSeconds = setupTimeoutSeconds;
|
||||||
|
this.eagerInstall = eagerInstall;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setSetupTimeoutSeconds(int setupTimeoutSeconds) {
|
||||||
|
this.setupTimeoutSeconds = setupTimeoutSeconds;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Integer getSetupTimeoutSeconds() {
|
||||||
|
return this.setupTimeoutSeconds;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setEagerInstall(boolean eagerInstall) {
|
||||||
|
this.eagerInstall = eagerInstall;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Boolean getEagerInstall() {
|
||||||
|
return eagerInstall;
|
||||||
|
}
|
||||||
|
}
|
|
@ -8,6 +8,7 @@ import com.typesafe.config.ConfigFactory;
|
||||||
import com.typesafe.config.ConfigRenderOptions;
|
import com.typesafe.config.ConfigRenderOptions;
|
||||||
import io.ray.api.id.JobId;
|
import io.ray.api.id.JobId;
|
||||||
import io.ray.api.options.ActorLifetime;
|
import io.ray.api.options.ActorLifetime;
|
||||||
|
import io.ray.api.runtimeenv.RuntimeEnvConfig;
|
||||||
import io.ray.api.runtimeenv.types.RuntimeEnvName;
|
import io.ray.api.runtimeenv.types.RuntimeEnvName;
|
||||||
import io.ray.runtime.generated.Common.WorkerType;
|
import io.ray.runtime.generated.Common.WorkerType;
|
||||||
import io.ray.runtime.runtimeenv.RuntimeEnvImpl;
|
import io.ray.runtime.runtimeenv.RuntimeEnvImpl;
|
||||||
|
@ -216,6 +217,22 @@ public class RayConfig {
|
||||||
if (config.hasPath(jarsPath)) {
|
if (config.hasPath(jarsPath)) {
|
||||||
jarUrls = config.getStringList(jarsPath);
|
jarUrls = config.getStringList(jarsPath);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Runtime env config
|
||||||
|
RuntimeEnvConfig runtimeEnvConfig = null;
|
||||||
|
final String timeoutPath = "ray.job.runtime-env.config.setup-timeout-seconds";
|
||||||
|
if (config.hasPath(timeoutPath)) {
|
||||||
|
runtimeEnvConfig = new RuntimeEnvConfig();
|
||||||
|
runtimeEnvConfig.setSetupTimeoutSeconds(config.getInt(timeoutPath));
|
||||||
|
}
|
||||||
|
final String eagerInstallPath = "ray.job.runtime-env.config.eager-install";
|
||||||
|
if (config.hasPath(eagerInstallPath)) {
|
||||||
|
if (runtimeEnvConfig == null) {
|
||||||
|
runtimeEnvConfig = new RuntimeEnvConfig();
|
||||||
|
}
|
||||||
|
runtimeEnvConfig.setEagerInstall(config.getBoolean(eagerInstallPath));
|
||||||
|
}
|
||||||
|
|
||||||
runtimeEnvImpl = new RuntimeEnvImpl();
|
runtimeEnvImpl = new RuntimeEnvImpl();
|
||||||
if (!envVars.isEmpty()) {
|
if (!envVars.isEmpty()) {
|
||||||
runtimeEnvImpl.set(RuntimeEnvName.ENV_VARS, envVars);
|
runtimeEnvImpl.set(RuntimeEnvName.ENV_VARS, envVars);
|
||||||
|
@ -223,6 +240,9 @@ public class RayConfig {
|
||||||
if (!jarUrls.isEmpty()) {
|
if (!jarUrls.isEmpty()) {
|
||||||
runtimeEnvImpl.set(RuntimeEnvName.JARS, jarUrls);
|
runtimeEnvImpl.set(RuntimeEnvName.JARS, jarUrls);
|
||||||
}
|
}
|
||||||
|
if (runtimeEnvConfig != null) {
|
||||||
|
runtimeEnvImpl.setConfig(runtimeEnvConfig);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
{
|
{
|
||||||
|
|
|
@ -6,6 +6,7 @@ import io.ray.api.id.ActorId;
|
||||||
import io.ray.api.id.JobId;
|
import io.ray.api.id.JobId;
|
||||||
import io.ray.api.id.TaskId;
|
import io.ray.api.id.TaskId;
|
||||||
import io.ray.api.id.UniqueId;
|
import io.ray.api.id.UniqueId;
|
||||||
|
import io.ray.api.runtimeenv.RuntimeEnv;
|
||||||
import io.ray.runtime.generated.Common.Address;
|
import io.ray.runtime.generated.Common.Address;
|
||||||
import io.ray.runtime.generated.Common.TaskSpec;
|
import io.ray.runtime.generated.Common.TaskSpec;
|
||||||
import io.ray.runtime.generated.Common.TaskType;
|
import io.ray.runtime.generated.Common.TaskType;
|
||||||
|
@ -71,6 +72,11 @@ public class LocalModeWorkerContext implements WorkerContext {
|
||||||
return Address.getDefaultInstance();
|
return Address.getDefaultInstance();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public RuntimeEnv getCurrentRuntimeEnv() {
|
||||||
|
throw new RuntimeException("Not implemented.");
|
||||||
|
}
|
||||||
|
|
||||||
public void setCurrentTask(TaskSpec taskSpec) {
|
public void setCurrentTask(TaskSpec taskSpec) {
|
||||||
currentTask.set(taskSpec);
|
currentTask.set(taskSpec);
|
||||||
}
|
}
|
||||||
|
|
|
@ -5,6 +5,7 @@ import io.ray.api.id.ActorId;
|
||||||
import io.ray.api.id.JobId;
|
import io.ray.api.id.JobId;
|
||||||
import io.ray.api.id.TaskId;
|
import io.ray.api.id.TaskId;
|
||||||
import io.ray.api.id.UniqueId;
|
import io.ray.api.id.UniqueId;
|
||||||
|
import io.ray.api.runtimeenv.RuntimeEnv;
|
||||||
import io.ray.runtime.generated.Common.Address;
|
import io.ray.runtime.generated.Common.Address;
|
||||||
import io.ray.runtime.generated.Common.TaskType;
|
import io.ray.runtime.generated.Common.TaskType;
|
||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
|
@ -48,6 +49,15 @@ public class NativeWorkerContext implements WorkerContext {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public RuntimeEnv getCurrentRuntimeEnv() {
|
||||||
|
String serialized_runtime_env = nativeGetSerializedRuntimeEnv();
|
||||||
|
if (serialized_runtime_env == null) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
return RuntimeEnv.deserialize(serialized_runtime_env);
|
||||||
|
}
|
||||||
|
|
||||||
private static native int nativeGetCurrentTaskType();
|
private static native int nativeGetCurrentTaskType();
|
||||||
|
|
||||||
private static native ByteBuffer nativeGetCurrentTaskId();
|
private static native ByteBuffer nativeGetCurrentTaskId();
|
||||||
|
@ -59,4 +69,6 @@ public class NativeWorkerContext implements WorkerContext {
|
||||||
private static native ByteBuffer nativeGetCurrentActorId();
|
private static native ByteBuffer nativeGetCurrentActorId();
|
||||||
|
|
||||||
private static native byte[] nativeGetRpcAddress();
|
private static native byte[] nativeGetRpcAddress();
|
||||||
|
|
||||||
|
private static native String nativeGetSerializedRuntimeEnv();
|
||||||
}
|
}
|
||||||
|
|
|
@ -9,6 +9,7 @@ import io.ray.api.id.UniqueId;
|
||||||
import io.ray.api.runtimecontext.NodeInfo;
|
import io.ray.api.runtimecontext.NodeInfo;
|
||||||
import io.ray.api.runtimecontext.ResourceValue;
|
import io.ray.api.runtimecontext.ResourceValue;
|
||||||
import io.ray.api.runtimecontext.RuntimeContext;
|
import io.ray.api.runtimecontext.RuntimeContext;
|
||||||
|
import io.ray.api.runtimeenv.RuntimeEnv;
|
||||||
import io.ray.runtime.AbstractRayRuntime;
|
import io.ray.runtime.AbstractRayRuntime;
|
||||||
import io.ray.runtime.config.RunMode;
|
import io.ray.runtime.config.RunMode;
|
||||||
import io.ray.runtime.util.ResourceUtil;
|
import io.ray.runtime.util.ResourceUtil;
|
||||||
|
@ -102,4 +103,9 @@ public class RuntimeContextImpl implements RuntimeContext {
|
||||||
public UniqueId getCurrentNodeId() {
|
public UniqueId getCurrentNodeId() {
|
||||||
return runtime.getCurrentNodeId();
|
return runtime.getCurrentNodeId();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public RuntimeEnv getCurrentRuntimeEnv() {
|
||||||
|
return runtime.getWorkerContext().getCurrentRuntimeEnv();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -4,6 +4,7 @@ import io.ray.api.id.ActorId;
|
||||||
import io.ray.api.id.JobId;
|
import io.ray.api.id.JobId;
|
||||||
import io.ray.api.id.TaskId;
|
import io.ray.api.id.TaskId;
|
||||||
import io.ray.api.id.UniqueId;
|
import io.ray.api.id.UniqueId;
|
||||||
|
import io.ray.api.runtimeenv.RuntimeEnv;
|
||||||
import io.ray.runtime.generated.Common.Address;
|
import io.ray.runtime.generated.Common.Address;
|
||||||
import io.ray.runtime.generated.Common.TaskType;
|
import io.ray.runtime.generated.Common.TaskType;
|
||||||
|
|
||||||
|
@ -26,4 +27,7 @@ public interface WorkerContext {
|
||||||
TaskId getCurrentTaskId();
|
TaskId getCurrentTaskId();
|
||||||
|
|
||||||
Address getRpcAddress();
|
Address getRpcAddress();
|
||||||
|
|
||||||
|
/** RuntimeEnv of the current worker or job(for driver). */
|
||||||
|
RuntimeEnv getCurrentRuntimeEnv();
|
||||||
}
|
}
|
||||||
|
|
|
@ -9,6 +9,7 @@ import com.google.protobuf.InvalidProtocolBufferException;
|
||||||
import com.google.protobuf.util.JsonFormat;
|
import com.google.protobuf.util.JsonFormat;
|
||||||
import io.ray.api.exception.RuntimeEnvException;
|
import io.ray.api.exception.RuntimeEnvException;
|
||||||
import io.ray.api.runtimeenv.RuntimeEnv;
|
import io.ray.api.runtimeenv.RuntimeEnv;
|
||||||
|
import io.ray.api.runtimeenv.RuntimeEnvConfig;
|
||||||
import io.ray.runtime.generated.RuntimeEnvCommon;
|
import io.ray.runtime.generated.RuntimeEnvCommon;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
|
||||||
|
@ -18,15 +19,20 @@ public class RuntimeEnvImpl implements RuntimeEnv {
|
||||||
|
|
||||||
public ObjectNode runtimeEnvs = MAPPER.createObjectNode();
|
public ObjectNode runtimeEnvs = MAPPER.createObjectNode();
|
||||||
|
|
||||||
|
private static final String CONFIG_FIELD_NAME = "config";
|
||||||
|
|
||||||
public RuntimeEnvImpl() {}
|
public RuntimeEnvImpl() {}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void set(String name, Object value) throws RuntimeEnvException {
|
public void set(String name, Object value) throws RuntimeEnvException {
|
||||||
|
if (CONFIG_FIELD_NAME.equals(name) && value instanceof RuntimeEnvConfig == false) {
|
||||||
|
throw new RuntimeEnvException(name + "must be instance of " + RuntimeEnvConfig.class);
|
||||||
|
}
|
||||||
JsonNode node = null;
|
JsonNode node = null;
|
||||||
try {
|
try {
|
||||||
node = MAPPER.valueToTree(value);
|
node = MAPPER.valueToTree(value);
|
||||||
} catch (IllegalArgumentException e) {
|
} catch (IllegalArgumentException e) {
|
||||||
throw new RuntimeException(e);
|
throw new RuntimeEnvException("Failed to set field.", e);
|
||||||
}
|
}
|
||||||
runtimeEnvs.set(name, node);
|
runtimeEnvs.set(name, node);
|
||||||
}
|
}
|
||||||
|
@ -37,7 +43,7 @@ public class RuntimeEnvImpl implements RuntimeEnv {
|
||||||
try {
|
try {
|
||||||
node = JsonLoader.fromString(jsonStr);
|
node = JsonLoader.fromString(jsonStr);
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
throw new RuntimeException(e);
|
throw new RuntimeEnvException("Failed to set json field.", e);
|
||||||
}
|
}
|
||||||
runtimeEnvs.set(name, node);
|
runtimeEnvs.set(name, node);
|
||||||
}
|
}
|
||||||
|
@ -51,7 +57,7 @@ public class RuntimeEnvImpl implements RuntimeEnv {
|
||||||
try {
|
try {
|
||||||
return MAPPER.treeToValue(jsonNode, classOfT);
|
return MAPPER.treeToValue(jsonNode, classOfT);
|
||||||
} catch (JsonProcessingException e) {
|
} catch (JsonProcessingException e) {
|
||||||
throw new RuntimeException(e);
|
throw new RuntimeEnvException("Failed to get field.", e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -60,13 +66,18 @@ public class RuntimeEnvImpl implements RuntimeEnv {
|
||||||
try {
|
try {
|
||||||
return MAPPER.writeValueAsString(runtimeEnvs.get(name));
|
return MAPPER.writeValueAsString(runtimeEnvs.get(name));
|
||||||
} catch (JsonProcessingException e) {
|
} catch (JsonProcessingException e) {
|
||||||
throw new RuntimeException(e);
|
throw new RuntimeEnvException("Failed to get json field.", e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean contains(String name) {
|
||||||
|
return runtimeEnvs.has(name);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean remove(String name) {
|
public boolean remove(String name) {
|
||||||
if (runtimeEnvs.has(name)) {
|
if (contains(name)) {
|
||||||
runtimeEnvs.remove(name);
|
runtimeEnvs.remove(name);
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
@ -78,33 +89,53 @@ public class RuntimeEnvImpl implements RuntimeEnv {
|
||||||
try {
|
try {
|
||||||
return MAPPER.writeValueAsString(runtimeEnvs);
|
return MAPPER.writeValueAsString(runtimeEnvs);
|
||||||
} catch (JsonProcessingException e) {
|
} catch (JsonProcessingException e) {
|
||||||
throw new RuntimeException(e);
|
throw new RuntimeEnvException("Failed to serialize.", e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isEmpty() {
|
||||||
|
return runtimeEnvs.isEmpty();
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String serializeToRuntimeEnvInfo() throws RuntimeEnvException {
|
public String serializeToRuntimeEnvInfo() throws RuntimeEnvException {
|
||||||
// TODO(SongGuyang): Expose runtime env config API to users.
|
RuntimeEnvCommon.RuntimeEnvInfo protoRuntimeEnvInfo = GenerateRuntimeEnvInfo();
|
||||||
|
|
||||||
|
JsonFormat.Printer printer = JsonFormat.printer();
|
||||||
|
try {
|
||||||
|
return printer.print(protoRuntimeEnvInfo);
|
||||||
|
} catch (InvalidProtocolBufferException e) {
|
||||||
|
throw new RuntimeEnvException("Failed to serialize to runtime env info.", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setConfig(RuntimeEnvConfig runtimeEnvConfig) {
|
||||||
|
set(CONFIG_FIELD_NAME, runtimeEnvConfig);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public RuntimeEnvConfig getConfig() {
|
||||||
|
if (!contains(CONFIG_FIELD_NAME)) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
return get(CONFIG_FIELD_NAME, RuntimeEnvConfig.class);
|
||||||
|
}
|
||||||
|
|
||||||
|
public RuntimeEnvCommon.RuntimeEnvInfo GenerateRuntimeEnvInfo() throws RuntimeEnvException {
|
||||||
String serializeRuntimeEnv = serialize();
|
String serializeRuntimeEnv = serialize();
|
||||||
RuntimeEnvCommon.RuntimeEnvInfo.Builder protoRuntimeEnvInfoBuilder =
|
RuntimeEnvCommon.RuntimeEnvInfo.Builder protoRuntimeEnvInfoBuilder =
|
||||||
RuntimeEnvCommon.RuntimeEnvInfo.newBuilder();
|
RuntimeEnvCommon.RuntimeEnvInfo.newBuilder();
|
||||||
protoRuntimeEnvInfoBuilder.setSerializedRuntimeEnv(serializeRuntimeEnv);
|
protoRuntimeEnvInfoBuilder.setSerializedRuntimeEnv(serializeRuntimeEnv);
|
||||||
JsonFormat.Printer printer = JsonFormat.printer();
|
RuntimeEnvConfig runtimeEnvConfig = getConfig();
|
||||||
try {
|
if (runtimeEnvConfig != null) {
|
||||||
return printer.print(protoRuntimeEnvInfoBuilder);
|
RuntimeEnvCommon.RuntimeEnvConfig.Builder protoRuntimeEnvConfigBuilder =
|
||||||
} catch (InvalidProtocolBufferException e) {
|
RuntimeEnvCommon.RuntimeEnvConfig.newBuilder();
|
||||||
throw new RuntimeException(e);
|
protoRuntimeEnvConfigBuilder.setSetupTimeoutSeconds(
|
||||||
}
|
runtimeEnvConfig.getSetupTimeoutSeconds());
|
||||||
}
|
protoRuntimeEnvConfigBuilder.setEagerInstall(runtimeEnvConfig.getEagerInstall());
|
||||||
|
protoRuntimeEnvInfoBuilder.setRuntimeEnvConfig(protoRuntimeEnvConfigBuilder.build());
|
||||||
public RuntimeEnvCommon.RuntimeEnvInfo GenerateRuntimeEnvInfo() throws RuntimeEnvException {
|
|
||||||
RuntimeEnvCommon.RuntimeEnvInfo.Builder protoRuntimeEnvInfoBuilder =
|
|
||||||
RuntimeEnvCommon.RuntimeEnvInfo.newBuilder();
|
|
||||||
|
|
||||||
try {
|
|
||||||
protoRuntimeEnvInfoBuilder.setSerializedRuntimeEnv(MAPPER.writeValueAsString(runtimeEnvs));
|
|
||||||
} catch (JsonProcessingException e) {
|
|
||||||
throw new RuntimeException(e);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return protoRuntimeEnvInfoBuilder.build();
|
return protoRuntimeEnvInfoBuilder.build();
|
||||||
|
|
|
@ -44,6 +44,19 @@ ray {
|
||||||
// "https://my_host/a.jar",
|
// "https://my_host/a.jar",
|
||||||
// "https://my_host/b.jar"
|
// "https://my_host/b.jar"
|
||||||
]
|
]
|
||||||
|
|
||||||
|
// The config of runtime env.
|
||||||
|
"config": {
|
||||||
|
// The timeout of runtime environment creation, timeout is in seconds.
|
||||||
|
// The value `-1` means disable timeout logic, except `-1`, `setup_timeout_seconds`
|
||||||
|
// cannot be less than or equal to 0. The default value of `setup_timeout_seconds`
|
||||||
|
// is 600 seconds.
|
||||||
|
// setup-timeout-seconds: 600
|
||||||
|
// Indicates whether to install the runtime environment on the cluster at `ray.init()`
|
||||||
|
// time, before the workers are leased. This flag is set to `True` by default.
|
||||||
|
// eager-install: true
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// The namespace of this job. It's used for isolation between jobs.
|
/// The namespace of this job. It's used for isolation between jobs.
|
||||||
|
|
|
@ -1,9 +1,12 @@
|
||||||
package io.ray.test;
|
package io.ray.test;
|
||||||
|
|
||||||
|
import static io.ray.api.runtimeenv.types.RuntimeEnvName.JARS;
|
||||||
|
|
||||||
import com.google.common.collect.ImmutableList;
|
import com.google.common.collect.ImmutableList;
|
||||||
import io.ray.api.ActorHandle;
|
import io.ray.api.ActorHandle;
|
||||||
import io.ray.api.Ray;
|
import io.ray.api.Ray;
|
||||||
import io.ray.api.runtimeenv.RuntimeEnv;
|
import io.ray.api.runtimeenv.RuntimeEnv;
|
||||||
|
import io.ray.api.runtimeenv.RuntimeEnvConfig;
|
||||||
import io.ray.api.runtimeenv.types.RuntimeEnvName;
|
import io.ray.api.runtimeenv.types.RuntimeEnvName;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
@ -54,6 +57,8 @@ public class RuntimeEnvTest {
|
||||||
val = actor.task(A::getEnv, "KEY2").remote().get();
|
val = actor.task(A::getEnv, "KEY2").remote().get();
|
||||||
Assert.assertEquals(val, "B");
|
Assert.assertEquals(val, "B");
|
||||||
} finally {
|
} finally {
|
||||||
|
System.clearProperty("ray.job.runtime-env.env-vars.KEY1");
|
||||||
|
System.clearProperty("ray.job.runtime-env.env-vars.KEY2");
|
||||||
Ray.shutdown();
|
Ray.shutdown();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -108,6 +113,8 @@ public class RuntimeEnvTest {
|
||||||
val = Ray.task(RuntimeEnvTest::getEnvVar, "KEY2").setRuntimeEnv(runtimeEnv).remote().get();
|
val = Ray.task(RuntimeEnvTest::getEnvVar, "KEY2").setRuntimeEnv(runtimeEnv).remote().get();
|
||||||
Assert.assertEquals(val, "B");
|
Assert.assertEquals(val, "B");
|
||||||
} finally {
|
} finally {
|
||||||
|
System.clearProperty("ray.job.runtime-env.env-vars.KEY1");
|
||||||
|
System.clearProperty("ray.job.runtime-env.env-vars.KEY2");
|
||||||
Ray.shutdown();
|
Ray.shutdown();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -116,7 +123,7 @@ public class RuntimeEnvTest {
|
||||||
try {
|
try {
|
||||||
Ray.init();
|
Ray.init();
|
||||||
RuntimeEnv runtimeEnv = new RuntimeEnv.Builder().build();
|
RuntimeEnv runtimeEnv = new RuntimeEnv.Builder().build();
|
||||||
runtimeEnv.set(RuntimeEnvName.JARS, ImmutableList.of(url));
|
runtimeEnv.set(JARS, ImmutableList.of(url));
|
||||||
ActorHandle<A> actor1 = Ray.actor(A::new).setRuntimeEnv(runtimeEnv).remote();
|
ActorHandle<A> actor1 = Ray.actor(A::new).setRuntimeEnv(runtimeEnv).remote();
|
||||||
boolean ret = actor1.task(A::findClass, FOO_CLASS_NAME).remote().get();
|
boolean ret = actor1.task(A::findClass, FOO_CLASS_NAME).remote().get();
|
||||||
Assert.assertTrue(ret);
|
Assert.assertTrue(ret);
|
||||||
|
@ -149,7 +156,7 @@ public class RuntimeEnvTest {
|
||||||
try {
|
try {
|
||||||
Ray.init();
|
Ray.init();
|
||||||
RuntimeEnv runtimeEnv = new RuntimeEnv.Builder().build();
|
RuntimeEnv runtimeEnv = new RuntimeEnv.Builder().build();
|
||||||
runtimeEnv.set(RuntimeEnvName.JARS, urls);
|
runtimeEnv.set(JARS, urls);
|
||||||
boolean ret =
|
boolean ret =
|
||||||
Ray.task(RuntimeEnvTest::findClasses, classNames)
|
Ray.task(RuntimeEnvTest::findClasses, classNames)
|
||||||
.setRuntimeEnv(runtimeEnv)
|
.setRuntimeEnv(runtimeEnv)
|
||||||
|
@ -191,6 +198,8 @@ public class RuntimeEnvTest {
|
||||||
.get();
|
.get();
|
||||||
Assert.assertTrue(ret);
|
Assert.assertTrue(ret);
|
||||||
} finally {
|
} finally {
|
||||||
|
System.clearProperty("ray.job.runtime-env.jars.0");
|
||||||
|
System.clearProperty("ray.job.runtime-env.jars.1");
|
||||||
Ray.shutdown();
|
Ray.shutdown();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -264,4 +273,54 @@ public class RuntimeEnvTest {
|
||||||
Ray.shutdown();
|
Ray.shutdown();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void testRuntimeEnvContextForJob() {
|
||||||
|
System.setProperty("ray.job.runtime-env.jars.0", FOO_JAR_URL);
|
||||||
|
System.setProperty("ray.job.runtime-env.jars.1", BAR_JAR_URL);
|
||||||
|
System.setProperty("ray.job.runtime-env.config.setup-timeout-seconds", "1");
|
||||||
|
try {
|
||||||
|
Ray.init();
|
||||||
|
RuntimeEnv runtimeEnv = Ray.getRuntimeContext().getCurrentRuntimeEnv();
|
||||||
|
Assert.assertNotNull(runtimeEnv);
|
||||||
|
List<String> jars = runtimeEnv.get(JARS, List.class);
|
||||||
|
Assert.assertNotNull(jars);
|
||||||
|
Assert.assertEquals(jars.size(), 2);
|
||||||
|
Assert.assertEquals(jars.get(0), FOO_JAR_URL);
|
||||||
|
Assert.assertEquals(jars.get(1), BAR_JAR_URL);
|
||||||
|
RuntimeEnvConfig runtimeEnvConfig = runtimeEnv.getConfig();
|
||||||
|
Assert.assertNotNull(runtimeEnvConfig);
|
||||||
|
Assert.assertEquals((int) runtimeEnvConfig.getSetupTimeoutSeconds(), 1);
|
||||||
|
|
||||||
|
} finally {
|
||||||
|
System.clearProperty("ray.job.runtime-env.jars.0");
|
||||||
|
System.clearProperty("ray.job.runtime-env.jars.1");
|
||||||
|
System.clearProperty("ray.job.runtime-env.config.setup-timeout-seconds");
|
||||||
|
Ray.shutdown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static Integer getRuntimeEnvTimeout() {
|
||||||
|
RuntimeEnv runtimeEnv = Ray.getRuntimeContext().getCurrentRuntimeEnv();
|
||||||
|
if (runtimeEnv != null) {
|
||||||
|
return runtimeEnv.getConfig().getSetupTimeoutSeconds();
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testRuntimeEnvContextForTask() {
|
||||||
|
try {
|
||||||
|
Ray.init();
|
||||||
|
RuntimeEnv currentRuntimeEnv = Ray.getRuntimeContext().getCurrentRuntimeEnv();
|
||||||
|
Assert.assertTrue(currentRuntimeEnv.isEmpty());
|
||||||
|
RuntimeEnv runtimeEnv = new RuntimeEnv.Builder().build();
|
||||||
|
RuntimeEnvConfig runtimeEnvConfig = new RuntimeEnvConfig(1, false);
|
||||||
|
runtimeEnv.setConfig(runtimeEnvConfig);
|
||||||
|
Integer result =
|
||||||
|
Ray.task(RuntimeEnvTest::getRuntimeEnvTimeout).setRuntimeEnv(runtimeEnv).remote().get();
|
||||||
|
Assert.assertNotNull(result);
|
||||||
|
Assert.assertEquals((int) result, 1);
|
||||||
|
} finally {
|
||||||
|
Ray.shutdown();
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -71,6 +71,23 @@ Java_io_ray_runtime_context_NativeWorkerContext_nativeGetRpcAddress(JNIEnv *env,
|
||||||
return NativeStringToJavaByteArray(env, rpc_address.SerializeAsString());
|
return NativeStringToJavaByteArray(env, rpc_address.SerializeAsString());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
JNIEXPORT jstring JNICALL
|
||||||
|
Java_io_ray_runtime_context_NativeWorkerContext_nativeGetSerializedRuntimeEnv(JNIEnv *env,
|
||||||
|
jclass) {
|
||||||
|
std::string serialized_runtime_env;
|
||||||
|
if (CoreWorkerProcess::GetCoreWorker().GetWorkerType() == WorkerType::DRIVER) {
|
||||||
|
serialized_runtime_env = CoreWorkerProcess::GetCoreWorker()
|
||||||
|
.GetJobConfig()
|
||||||
|
.runtime_env_info()
|
||||||
|
.serialized_runtime_env();
|
||||||
|
} else {
|
||||||
|
serialized_runtime_env = CoreWorkerProcess::GetCoreWorker()
|
||||||
|
.GetWorkerContext()
|
||||||
|
.GetCurrentSerializedRuntimeEnv();
|
||||||
|
}
|
||||||
|
return env->NewStringUTF(serialized_runtime_env.c_str());
|
||||||
|
}
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -71,6 +71,15 @@ Java_io_ray_runtime_context_NativeWorkerContext_nativeGetCurrentActorId(JNIEnv *
|
||||||
JNIEXPORT jbyteArray JNICALL
|
JNIEXPORT jbyteArray JNICALL
|
||||||
Java_io_ray_runtime_context_NativeWorkerContext_nativeGetRpcAddress(JNIEnv *, jclass);
|
Java_io_ray_runtime_context_NativeWorkerContext_nativeGetRpcAddress(JNIEnv *, jclass);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Class: io_ray_runtime_context_NativeWorkerContext
|
||||||
|
* Method: nativeGetSerializedRuntimeEnv
|
||||||
|
* Signature: ()Ljava/lang/String;
|
||||||
|
*/
|
||||||
|
JNIEXPORT jstring JNICALL
|
||||||
|
Java_io_ray_runtime_context_NativeWorkerContext_nativeGetSerializedRuntimeEnv(JNIEnv *,
|
||||||
|
jclass);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
Loading…
Add table
Reference in a new issue