[Java] Simplify Java worker configuration (#2938)

## What do these changes do?
Previously, Java worker configuration was complicated, because it required setting environment variables as well as command-line arguments.

This PR aims to simplify Java worker's configuration. 
1) Configuration management is now migrated to [lightbend config](https://github.com/lightbend/config), so setting environment variables is no longer required.
2) Many unused config items are removed.
3) Provide a simple `example.conf` file, so users can get started quickly.
4) All possible options and their default values are declared and documented in `ray.default.conf` file.

This PR also simplifies and refines the following code:
1) The process of `Ray.init()`.
2) `RunManager`.
3) `WorkerContext`. 

### How to use this configuration?
1. Copy `example.conf` into your classpath and rename it to `ray.conf`.
2. Modify/add your configuration items. All available items are declared in `ray.default.conf`.
3. You can also set the items in Java system properties.

Note: configuration is read in this priority:
System properties > `ray.conf` > `ray.default.conf`

## Related issue number
N/A
This commit is contained in:
Wang Qing 2018-09-26 20:14:22 +08:00 committed by Hao Chen
parent 0e552fbb22
commit 8e8e123777
45 changed files with 865 additions and 1892 deletions

View file

@ -1,18 +1,15 @@
This directory contains the java worker, with the following components.
- java/api: Ray API definition
- java/common: utilities
- java/runtime-common: common implementation of the runtime in worker
- java/runtime-dev: a pure-java mock implementation of the runtime for
fast development
- java/runtime-native: a native implementation of the runtime
- java/test: various tests
- src/local\_scheduler/lib/java: JNI client library for local scheduler
- src/plasma/lib/java: JNI client library for plasma storage
Quick start
===========
Configuration
-------------
Ray will read your configurations in the following order:
* Java system properties: e.g., ``-Dray.home=/path/to/ray``.
* A ``ray.conf`` file in the classpath: `example <https://github.com/ray-project/ray/java/example.conf>`_.
For all available config items and default values, see `this file <https://github.com/ray-project/ray/java/runtime/src/main/resources/ray.default.conf>`_.
Starting Ray
------------

View file

@ -14,4 +14,11 @@
<description>java api for ray</description>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</dependency>
</dependencies>
</project>

View file

@ -2,7 +2,6 @@ package org.ray.api;
import java.util.List;
import org.ray.api.id.UniqueId;
import org.ray.api.runtime.DefaultRayRuntimeFactory;
import org.ray.api.runtime.RayRuntime;
import org.ray.api.runtime.RayRuntimeFactory;
@ -17,7 +16,14 @@ public final class Ray extends RayCall {
* Initialize Ray runtime with the default runtime implementation.
*/
public static void init() {
init(new DefaultRayRuntimeFactory());
try {
Class clz = Class.forName("org.ray.runtime.DefaultRayRuntimeFactory");
RayRuntimeFactory factory = (RayRuntimeFactory) clz.newInstance();
init(factory);
} catch (Exception e) {
throw new RuntimeException("Failed to initialize Ray runtime.", e);
}
}
/**

View file

@ -1,22 +0,0 @@
package org.ray.api.runtime;
import java.lang.reflect.Method;
/**
 * Default {@link RayRuntimeFactory}. The runtime is obtained reflectively by calling the
 * static, no-argument {@code init} method declared on
 * {@code org.ray.runtime.AbstractRayRuntime}.
 */
public class DefaultRayRuntimeFactory implements RayRuntimeFactory {

  /**
   * Creates the runtime by reflectively invoking {@code AbstractRayRuntime.init()}.
   *
   * @return the runtime instance returned by the reflective call
   * @throws RuntimeException if the class or method cannot be resolved or the call fails;
   *     the original exception is attached as the cause
   */
  @Override
  public RayRuntime createRayRuntime() {
    try {
      Class<?> runtimeClass = Class.forName("org.ray.runtime.AbstractRayRuntime");
      Method initMethod = runtimeClass.getDeclaredMethod("init");
      // The target method is not public, so access is opened only around the call.
      initMethod.setAccessible(true);
      RayRuntime created = (RayRuntime) initMethod.invoke(null);
      initMethod.setAccessible(false);
      return created;
    } catch (Exception failure) {
      throw new RuntimeException("Failed to initialize ray runtime", failure);
    }
  }
}

View file

@ -12,10 +12,4 @@ public class CommandStart {
@Parameter(names = "--head", description = "start the head node")
public boolean head;
@Parameter(names = "--config", description = "the config file of ray")
public String config = "";
@Parameter(names = "--overwrite", description = "the overwrite items of config")
public String overwrite = "";
}

View file

@ -1,27 +0,0 @@
package org.ray.cli;
import com.beust.jcommander.Parameter;
import com.beust.jcommander.Parameters;
/**
 * Arguments for command submit.
 *
 * <p>Plain holder whose public fields are bound to command-line options through
 * JCommander annotations. Options marked {@code required = true} must be supplied;
 * the others stay {@code null} when omitted because no default is assigned here.
 */
@Parameters(separators = "= ", commandDescription = "submit a job to ray cluster")
public class CommandSubmit {
/** Value of {@code --package}: the Java jar package zip file (required). */
@Parameter(names = "--package", description = "java jar package zip file", required = true)
public String packageZip;
/** Value of {@code --class}: the Java class name (required). */
@Parameter(names = "--class", description = "java class name", required = true)
public String className;
/** Value of {@code --args}: arguments for the Java class (optional). */
@Parameter(names = "--args", description = "arguments for the java class")
public String classArgs;
/** Value of {@code --config}: the Ray config file (optional). */
@Parameter(names = "--config", description = "the config file of ray")
public String config;
/** Value of {@code --redis-address}: {@code ip:port} of the redis service (required). */
@Parameter(names = "--redis-address", description = "ip:port for redis service", required = true)
public String redisAddress;
}

View file

@ -2,20 +2,10 @@ package org.ray.cli;
import com.beust.jcommander.JCommander;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.ray.api.id.UniqueId;
import org.ray.runtime.config.PathConfig;
import org.ray.runtime.config.RayParameters;
import org.ray.runtime.config.RunMode;
import org.ray.runtime.gcs.KeyValueStoreLink;
import org.ray.runtime.gcs.RedisClient;
import org.ray.runtime.gcs.StateStoreProxy;
import org.ray.runtime.gcs.StateStoreProxyImpl;
import org.ray.runtime.config.RayConfig;
import org.ray.runtime.runner.RunManager;
import org.ray.runtime.runner.worker.DefaultDriver;
import org.ray.runtime.util.config.ConfigReader;
import org.ray.runtime.util.logger.RayLog;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
@ -23,179 +13,69 @@ import org.ray.runtime.util.logger.RayLog;
*/
public class RayCli {
private static final Logger LOGGER = LoggerFactory.getLogger(RayCli.class);
private static RayCliArgs rayArgs = new RayCliArgs();
private static RunManager startRayHead(RayParameters params, PathConfig paths,
ConfigReader configReader) {
RunManager manager = new RunManager(params, paths, configReader);
private static RunManager startRayHead() {
RayConfig rayConfig = RayConfig.create();
RunManager manager = new RunManager(rayConfig);
try {
manager.startRayHead();
manager.startRayProcesses(true);
} catch (Exception e) {
e.printStackTrace();
RayLog.core.error("error at RayCli startRayHead", e);
throw new RuntimeException("Ray head node start failed", e);
LOGGER.error("Failed to start head node.", e);
throw new RuntimeException("Failed to start Ray head node.", e);
}
RayLog.core.info("Started Ray head node. Redis address: {}", manager.info().redisAddress);
LOGGER.info("Ray head node started. Redis address is {}", rayConfig.getRedisAddress());
return manager;
}
private static RunManager startRayNode(RayParameters params, PathConfig paths,
ConfigReader configReader) {
RunManager manager = new RunManager(params, paths, configReader);
private static RunManager startRayNode() {
RayConfig rayConfig = RayConfig.create();
RunManager manager = new RunManager(rayConfig);
try {
manager.startRayNode();
manager.startRayProcesses(false);
} catch (Exception e) {
e.printStackTrace();
RayLog.core.error("error at RayCli startRayNode", e);
throw new RuntimeException("Ray work node start failed, err = " + e.getMessage());
LOGGER.error("Failed to start work node.", e);
throw new RuntimeException("Failed to start work node.", e);
}
RayLog.core.info("Started Ray work node.");
LOGGER.info("Ray work node started.");
return manager;
}
private static RunManager startProcess(CommandStart cmdStart, ConfigReader config) {
PathConfig paths = new PathConfig(config);
RayParameters params = new RayParameters(config);
// Init RayLog before using it.
RayLog.init(params.log_dir);
RayLog.core.info("Using IP address {} for this node.", params.node_ip_address);
private static RunManager startProcess(CommandStart cmdStart) {
RunManager manager;
if (cmdStart.head) {
manager = startRayHead(params, paths, config);
manager = startRayHead();
} else {
manager = startRayNode(params, paths, config);
manager = startRayNode();
}
return manager;
}
private static void start(CommandStart cmdStart, ConfigReader reader) {
startProcess(cmdStart, reader);
private static void start(CommandStart cmdStart) {
startProcess(cmdStart);
}
private static void stop(CommandStop cmdStop) {
String[] cmd = {"/bin/sh", "-c", ""};
cmd[2] = "killall global_scheduler local_scheduler plasma_store plasma_manager";
try {
Runtime.getRuntime().exec(cmd);
} catch (IOException e) {
RayLog.core.warn("exception in killing ray processes");
}
cmd[2] = "kill $(ps aux | grep redis-server | grep -v grep | "
cmd[2] = "kill $(ps aux | grep ray | grep -v grep | "
+ "awk \'{ print $2 }\') 2> /dev/null";
try {
Runtime.getRuntime().exec(cmd);
} catch (IOException e) {
RayLog.core.warn("exception in killing ray processes");
}
cmd[2] = "kill -9 $(ps aux | grep DefaultWorker | grep -v grep | "
+ "awk \'{ print $2 }\') 2> /dev/null";
try {
Runtime.getRuntime().exec(cmd);
} catch (IOException e) {
RayLog.core.warn("exception in killing ray processes");
LOGGER.error("Exception in killing ray processes.", e);
}
}
private static String[] buildRayRuntimeArgs(CommandSubmit cmdSubmit) {
if (cmdSubmit.redisAddress == null) {
throw new RuntimeException(
"--redis-address must be specified to submit a job");
}
List<String> argList = new ArrayList<String>();
String section = "ray.java.start.";
String overwrite = "--overwrite="
+ section + "redis_address=" + cmdSubmit.redisAddress + ";"
+ section + "run_mode=" + "CLUSTER";
argList.add(overwrite);
if (cmdSubmit.config != null) {
String config = "--config=" + cmdSubmit.config;
argList.add(config);
}
String[] args = new String[argList.size()];
argList.toArray(args);
return args;
}
private static void submit(CommandSubmit cmdSubmit, String configPath) throws Exception {
ConfigReader config = new ConfigReader(configPath, "ray.java.start.deploy=true");
PathConfig paths = new PathConfig(config);
RayParameters params = new RayParameters(config);
params.redis_address = cmdSubmit.redisAddress;
params.run_mode = RunMode.CLUSTER;
KeyValueStoreLink kvStore = new RedisClient();
kvStore.setAddr(cmdSubmit.redisAddress);
StateStoreProxy stateStoreProxy = new StateStoreProxyImpl(kvStore);
stateStoreProxy.initializeGlobalState();
// Init RayLog before using it.
RayLog.init(params.log_dir);
UniqueId appId = params.driver_id;
String appDir = "/tmp/" + cmdSubmit.className;
// Start driver process.
RunManager runManager = new RunManager(params, paths, config);
Process proc = runManager.startDriver(
DefaultDriver.class.getName(),
cmdSubmit.redisAddress,
appId,
appDir,
params.node_ip_address,
cmdSubmit.className,
cmdSubmit.classArgs,
"",
null);
if (null == proc) {
RayLog.rapp.error("Failed to start driver.");
return;
}
RayLog.rapp.info("Driver started.");
}
private static String getConfigPath(String config) {
String configPath;
if (config != null && !config.equals("")) {
configPath = config;
} else {
configPath = System.getenv("RAY_CONFIG");
if (configPath == null) {
configPath = System.getProperty("ray.config");
}
if (configPath == null) {
throw new RuntimeException(
"Please set config file path in env RAY_CONFIG or property ray.config");
}
}
return configPath;
}
public static void main(String[] args) throws Exception {
public static void main(String[] args) {
CommandStart cmdStart = new CommandStart();
CommandStop cmdStop = new CommandStop();
CommandSubmit cmdSubmit = new CommandSubmit();
JCommander rayCommander = JCommander.newBuilder().addObject(rayArgs)
.addCommand("start", cmdStart)
.addCommand("stop", cmdStop)
.addCommand("submit", cmdSubmit)
.build();
rayCommander.parse(args);
@ -210,21 +90,13 @@ public class RayCli {
System.exit(0);
}
String configPath;
switch (cmd) {
case "start": {
configPath = getConfigPath(cmdStart.config);
ConfigReader config = new ConfigReader(configPath, cmdStart.overwrite);
start(cmdStart, config);
}
break;
case "start":
start(cmdStart);
break;
case "stop":
stop(cmdStop);
break;
case "submit":
configPath = getConfigPath(cmdSubmit.config);
submit(cmdSubmit, configPath);
break;
default:
rayCommander.usage();
}

View file

@ -57,5 +57,4 @@ Run tests
::
# in `ray/java` directory
export RAY_CONFIG=ray.config.ini
mvn test

29
java/example.conf Normal file
View file

@ -0,0 +1,29 @@
# This is an example ray config file.
# To use this file, copy it to your classpath and rename it to 'ray.conf'.
# For all available config items and default values,
# see 'java/runtime/src/main/resources/ray.default.conf'.
# For config file format, see 'https://github.com/lightbend/config/blob/master/HOCON.md'.
ray {
// This is the path to the directory where Ray is installed, e.g.,
// something like /home/ubuntu/ray. This can be an absolute path or
// a relative path from the current working directory.
home = "/path/to/your/ray/home"
// Run mode, available options are:
//
// `SINGLE_PROCESS`: Ray is running in one single Java process, without Raylet backend,
// object store, and GCS. It's useful for debugging.
// `CLUSTER`: Ray is running on one or more nodes, with multiple processes.
run-mode = CLUSTER
// Available resources on this node.
resources: "CPU:4,GPU:0"
// The address of the redis server to connect, in format `ip:port`.
// If not provided, Ray processes will be started locally, including
// Redis server, Raylet and object store.
redis.address = ""
}

View file

@ -95,6 +95,11 @@
<artifactId>slf4j-log4j12</artifactId>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>com.typesafe</groupId>
<artifactId>config</artifactId>
<version>1.3.2</version>
</dependency>
<dependency>
<groupId>net.java.dev.jna</groupId>
<artifactId>jna</artifactId>

View file

@ -1,133 +0,0 @@
[ray]
ray_protocol_version = 0x0000000000000000
heartbeat_timeout_milliseconds = 1000
num_heartbeats_timeout = 100
get_timeout_milliseconds = 1000
worker_get_request_size = 1000
worker_fetch_request_size = 1000
num_connect_attempts = 50
connect_timeout_milliseconds = 1000
local_scheduler_fetch_timeout_milliseconds = 1000
local_scheduler_reconstruction_timeout_milliseconds = 1000
max_num_to_reconstruct = 1000
local_scheduler_fetch_request_size = 10000
kill_worker_timeout_milliseconds = 1000
manager_timeout_milliseconds = 1000
buf_size = 4096
max_time_for_handler_milliseconds = 1000
size_limit = 100
num_elements_limit = 1000
max_time_for_loop = 1000
redis_db_connect_retries = 50
redis_db_connect_wait_milliseconds = 1000
plasma_default_release_delay = 0
L3_cache_size_bytes = 100000000
simple_fail_over = false
;store_evict_soft_max_count = 10
[ray.java]
;network_interface = en0
[ray.java.start]
; run mode for this app SINGLE_PROCESS | SINGLE_BOX | CLUSTER
;run_mode = SINGLE_PROCESS
run_mode = SINGLE_BOX
; worker mode for this app DRIVER | WORKER | NONE
worker_mode = DRIVER
; number of workers initially started
num_workers = 2
driver_id = 0123456789abcdef0123456789abcdef01234567
redis_port = 34111
max_submit_task_buffer_size_bytes = 51200
default_first_check_timeout_ms = 1000
default_get_check_interval_ms = 5000
;jvm_parameters = -XX:+TraceClassLoading
object_store_occupied_memory_MB = 2
deploy = false
onebox_delay_seconds_before_run_app_logic = 0
static_resources = CPU:4,GPU:0
; java class which main is served as the driver in a java worker
driver_class =
; arguments for the java class main function which is served at the driver
; the arguments are separated by ','
driver_args =
[ray.java.start.job]
[ray.java.path.classes.source]
%CONFIG_FILE_DIR%/api/target/classes =
%CONFIG_FILE_DIR%/api/target/test-classes =
%CONFIG_FILE_DIR%/runtime/target/classes =
%CONFIG_FILE_DIR%/runtime/target/test-classes =
%CONFIG_FILE_DIR%/tutorial/target/classes =
%CONFIG_FILE_DIR%/test/target/classes =
%CONFIG_FILE_DIR%/test/target/test-classes =
%CONFIG_FILE_DIR%/test/lib/* =
[ray.java.path.classes.package]
%CONFIG_FILE_DIR%/api/target/ray-api-1.0.jar =
%CONFIG_FILE_DIR%/runtime/target/ray-runtime-1.0.jar =
%CONFIG_FILE_DIR%/test/target/ray-test-1.0.jar =
%CONFIG_FILE_DIR%/test/target/test-classes =
%CONFIG_FILE_DIR%/test/lib/* =
%CONFIG_FILE_DIR%/tutorial/target/ray-tutorial-1.0.jar =
[ray.java.path.classes.deploy]
%CONFIG_FILE_DIR%/java/lib/* =
[ray.java.path.jni.package]
%CONFIG_FILE_DIR%/../build/src/plasma =
%CONFIG_FILE_DIR%/../build/src/local_scheduler =
[ray.java.path.jni.deploy]
%CONFIG_FILE_DIR%/native/lib =
[ray.java.path.source]
redis_server = %CONFIG_FILE_DIR%/../build/src/common/thirdparty/redis/src/redis-server
redis_module = %CONFIG_FILE_DIR%/../build/src/common/redis_module/libray_redis_module.so
store = %CONFIG_FILE_DIR%/../build/src/plasma/plasma_store_server
raylet = %CONFIG_FILE_DIR%/../build/src/ray/raylet/raylet
python_dir = %CONFIG_FILE_DIR%/../build/
java_runtime_rewritten_jars_dir =
java_class_paths = ray.java.path.classes.source
java_jnilib_paths = ray.java.path.jni.package
[ray.java.path.package]
redis_server = %CONFIG_FILE_DIR%/../build/src/common/thirdparty/redis/src/redis-server
redis_module = %CONFIG_FILE_DIR%/../build/src/common/redis_module/libray_redis_module.so
store = %CONFIG_FILE_DIR%/../build/src/plasma/plasma_store_server
raylet = %CONFIG_FILE_DIR%/../build/src/ray/raylet/raylet
python_dir = %CONFIG_FILE_DIR%/../build/
java_runtime_rewritten_jars_dir =
java_class_paths = ray.java.path.classes.package
java_jnilib_paths = ray.java.path.jni.package
[ray.java.path.deploy]
redis_server = %CONFIG_FILE_DIR%/native/bin/redis-server
redis_module = %CONFIG_FILE_DIR%/native/lib/libray_redis_module.so
store = %CONFIG_FILE_DIR%/native/bin/plasma_store_server
raylet = %CONFIG_FILE_DIR%/native/bin/raylet
python_dir = %CONFIG_FILE_DIR%/python
java_runtime_rewritten_jars_dir = %CONFIG_FILE_DIR%/java/lib/
java_class_paths = ray.java.path.classes.deploy
java_jnilib_paths = ray.java.path.jni.deploy

View file

@ -21,6 +21,10 @@
<artifactId>ray-api</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.typesafe</groupId>
<artifactId>config</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>

View file

@ -6,17 +6,14 @@ import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.arrow.plasma.ObjectStoreLink;
import org.apache.commons.lang3.tuple.Pair;
import org.ray.api.Ray;
import org.ray.api.RayActor;
import org.ray.api.RayObject;
import org.ray.api.WaitResult;
import org.ray.api.function.RayFunc;
import org.ray.api.id.UniqueId;
import org.ray.api.runtime.RayRuntime;
import org.ray.runtime.config.PathConfig;
import org.ray.runtime.config.RayParameters;
import org.ray.runtime.config.RayConfig;
import org.ray.runtime.functionmanager.FunctionManager;
import org.ray.runtime.functionmanager.RayFunction;
import org.ray.runtime.objectstore.ObjectStoreProxy;
@ -26,7 +23,6 @@ import org.ray.runtime.task.ArgumentsBuilder;
import org.ray.runtime.task.TaskSpec;
import org.ray.runtime.util.ResourceUtil;
import org.ray.runtime.util.UniqueIdHelper;
import org.ray.runtime.util.config.ConfigReader;
import org.ray.runtime.util.exception.TaskExecutionException;
import org.ray.runtime.util.logger.RayLog;
@ -35,132 +31,32 @@ import org.ray.runtime.util.logger.RayLog;
*/
public abstract class AbstractRayRuntime implements RayRuntime {
public static ConfigReader configReader;
protected static AbstractRayRuntime ins = null;
protected static RayParameters params = null;
private static boolean fromRayInit = false;
private static final int GET_TIMEOUT_MS = 1000;
private static final int FETCH_BATCH_SIZE = 1000;
protected RayConfig rayConfig;
protected WorkerContext workerContext;
protected Worker worker;
protected RayletClient rayletClient;
protected ObjectStoreProxy objectStoreProxy;
protected FunctionManager functionManager;
protected PathConfig pathConfig;
/**
* Actor ID -> local actor instance.
*/
Map<UniqueId, Object> localActors = new HashMap<>();
// app level Ray.init()
// make it private so there is no direct usage but only from Ray.init
private static AbstractRayRuntime init() {
if (ins == null) {
try {
fromRayInit = true;
AbstractRayRuntime.init(null, null);
fromRayInit = false;
} catch (Exception e) {
e.printStackTrace();
throw new RuntimeException("Ray.init failed", e);
}
}
return ins;
}
// engine level AbstractRayRuntime.init(xx, xx)
// updateConfigStr is sth like section1.k1=v1;section2.k2=v2
public static AbstractRayRuntime init(String configPath, String updateConfigStr)
throws Exception {
if (ins == null) {
if (configPath == null) {
configPath = System.getenv("RAY_CONFIG");
if (configPath == null) {
configPath = System.getProperty("ray.config");
}
if (configPath == null) {
throw new Exception(
"Please set config file path in env RAY_CONFIG or property ray.config");
}
}
configReader = new ConfigReader(configPath, updateConfigStr);
AbstractRayRuntime.params = new RayParameters(configReader);
RayLog.init(params.log_dir);
assert RayLog.core != null;
ins = instantiate(params);
assert (ins != null);
if (!fromRayInit) {
Ray.init(); // assign Ray._impl
}
}
return ins;
}
// init with command line args
// --config=ray.config.ini --overwrite=updateConfigStr
public static AbstractRayRuntime init(String[] args) throws Exception {
String config = null;
String updateConfig = null;
for (String arg : args) {
if (arg.startsWith("--config=")) {
config = arg.substring("--config=".length());
} else if (arg.startsWith("--overwrite=")) {
updateConfig = arg.substring("--overwrite=".length());
} else {
throw new RuntimeException("Input argument " + arg
+ " is not recognized, please use --overwrite to merge it into config file");
}
}
return init(config, updateConfig);
}
protected void init(
RayletClient slink,
ObjectStoreLink plink,
PathConfig pathManager
) {
pathConfig = pathManager;
public AbstractRayRuntime(RayConfig rayConfig) {
this.rayConfig = rayConfig;
functionManager = new FunctionManager();
rayletClient = slink;
objectStoreProxy = new ObjectStoreProxy(plink);
worker = new Worker(this);
}
private static AbstractRayRuntime instantiate(RayParameters params) {
AbstractRayRuntime runtime;
if (params.run_mode.isNativeRuntime()) {
runtime = new RayNativeRuntime();
} else {
runtime = new RayDevRuntime();
}
RayLog.core
.info("Start " + runtime.getClass().getName() + " with " + params.run_mode.toString());
try {
runtime.start(params);
} catch (Exception e) {
RayLog.core.error("Failed to init RayRuntime", e);
System.exit(-1);
}
return runtime;
workerContext = new WorkerContext(rayConfig.workerMode, rayConfig.driverId);
}
/**
* start runtime.
* Start runtime.
*/
public abstract void start(RayParameters params) throws Exception;
public static AbstractRayRuntime getInstance() {
return ins;
}
public static RayParameters getParams() {
return params;
}
public abstract void start() throws Exception;
@Override
public abstract void shutdown();
@ -168,14 +64,14 @@ public abstract class AbstractRayRuntime implements RayRuntime {
@Override
public <T> RayObject<T> put(T obj) {
UniqueId objectId = UniqueIdHelper.computePutId(
WorkerContext.currentTask().taskId, WorkerContext.nextPutIndex());
workerContext.getCurrentTask().taskId, workerContext.nextPutIndex());
put(objectId, obj);
return new RayObjectImpl<>(objectId);
}
public <T> void put(UniqueId objectId, T obj) {
UniqueId taskId = WorkerContext.currentTask().taskId;
UniqueId taskId = workerContext.getCurrentTask().taskId;
RayLog.core.info("Putting object {}, for task {} ", objectId, taskId);
objectStoreProxy.put(objectId, obj, null);
}
@ -189,21 +85,21 @@ public abstract class AbstractRayRuntime implements RayRuntime {
@Override
public <T> List<T> get(List<UniqueId> objectIds) {
boolean wasBlocked = false;
UniqueId taskId = WorkerContext.currentTask().taskId;
UniqueId taskId = workerContext.getCurrentTask().taskId;
try {
int numObjectIds = objectIds.size();
// Do an initial fetch for remote objects.
List<List<UniqueId>> fetchBatches =
splitIntoBatches(objectIds, params.worker_fetch_request_size);
splitIntoBatches(objectIds, FETCH_BATCH_SIZE);
for (List<UniqueId> batch : fetchBatches) {
rayletClient.reconstructObjects(batch, true);
}
// Get the objects. We initially try to get the objects immediately.
List<Pair<T, GetStatus>> ret = objectStoreProxy
.get(objectIds, params.default_first_check_timeout_ms, false);
.get(objectIds, GET_TIMEOUT_MS, false);
assert ret.size() == numObjectIds;
// Mapping the object IDs that we haven't gotten yet to their original index in objectIds.
@ -220,14 +116,14 @@ public abstract class AbstractRayRuntime implements RayRuntime {
while (unreadys.size() > 0) {
List<UniqueId> unreadyList = new ArrayList<>(unreadys.keySet());
List<List<UniqueId>> reconstructBatches =
splitIntoBatches(unreadyList, params.worker_fetch_request_size);
splitIntoBatches(unreadyList, FETCH_BATCH_SIZE);
for (List<UniqueId> batch : reconstructBatches) {
rayletClient.reconstructObjects(batch, false);
}
List<Pair<T, GetStatus>> results = objectStoreProxy
.get(unreadyList, params.default_get_check_interval_ms, false);
.get(unreadyList, GET_TIMEOUT_MS, false);
// Remove any entries for objects we received during this iteration so we
// don't retrieve the same object twice.
@ -341,10 +237,10 @@ public abstract class AbstractRayRuntime implements RayRuntime {
*/
private TaskSpec createTaskSpec(RayFunc func, RayActorImpl actor, Object[] args,
boolean isActorCreationTask) {
final TaskSpec current = WorkerContext.currentTask();
final TaskSpec current = workerContext.getCurrentTask();
UniqueId taskId = rayletClient.generateTaskId(current.driverId,
current.taskId,
WorkerContext.nextCallIndex());
workerContext.nextCallIndex());
int numReturns = actor.getId().isNil() ? 1 : 2;
UniqueId[] returnIds = genReturnIds(taskId, numReturns);
@ -378,6 +274,10 @@ public abstract class AbstractRayRuntime implements RayRuntime {
return worker;
}
public WorkerContext getWorkerContext() {
return workerContext;
}
public RayletClient getRayletClient() {
return rayletClient;
}

View file

@ -0,0 +1,40 @@
package org.ray.runtime;
import com.google.common.base.Strings;
import java.lang.reflect.Field;
import java.util.stream.Collectors;
import org.ray.api.runtime.RayRuntime;
import org.ray.api.runtime.RayRuntimeFactory;
import org.ray.runtime.config.RayConfig;
import org.ray.runtime.config.RunMode;
import org.ray.runtime.util.logger.RayLog;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Default {@link RayRuntimeFactory}: builds and starts an {@link AbstractRayRuntime}
 * whose concrete type is chosen from the configured run mode.
 */
public class DefaultRayRuntimeFactory implements RayRuntimeFactory {

  private static final Logger LOGGER = LoggerFactory.getLogger(DefaultRayRuntimeFactory.class);

  /**
   * Initializes logging, loads the configuration, then creates and starts the runtime.
   *
   * <p>{@code SINGLE_PROCESS} mode yields a {@link RayDevRuntime}; every other mode
   * yields a {@link RayNativeRuntime}.
   *
   * @return the started runtime
   * @throws RuntimeException if constructing or starting the runtime fails; the failure
   *     is logged and attached as the cause
   */
  @Override
  public RayRuntime createRayRuntime() {
    RayLog.init();
    final RayConfig config = RayConfig.create();
    try {
      final AbstractRayRuntime created = config.runMode == RunMode.SINGLE_PROCESS
          ? new RayDevRuntime(config)
          : new RayNativeRuntime(config);
      created.start();
      return created;
    } catch (Exception failure) {
      LOGGER.error("Failed to initialize ray runtime", failure);
      throw new RuntimeException("Failed to initialize ray runtime", failure);
    }
  }
}

View file

@ -1,18 +1,21 @@
package org.ray.runtime;
import org.ray.runtime.config.PathConfig;
import org.ray.runtime.config.RayParameters;
import org.ray.runtime.config.RayConfig;
import org.ray.runtime.objectstore.MockObjectStore;
import org.ray.runtime.objectstore.ObjectStoreProxy;
import org.ray.runtime.raylet.MockRayletClient;
public class RayDevRuntime extends AbstractRayRuntime {
public RayDevRuntime(RayConfig rayConfig) {
super(rayConfig);
}
@Override
public void start(RayParameters params) {
PathConfig pathConfig = new PathConfig(configReader);
MockObjectStore store = new MockObjectStore();
MockRayletClient scheduler = new MockRayletClient(this, store);
init(scheduler, store, pathConfig);
public void start() {
MockObjectStore store = new MockObjectStore(this);
objectStoreProxy = new ObjectStoreProxy(this, store);
rayletClient = new MockRayletClient(this, store);
}
@Override

View file

@ -1,148 +1,119 @@
package org.ray.runtime;
import com.google.common.base.Strings;
import java.lang.reflect.Field;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.arrow.plasma.ObjectStoreLink;
import org.apache.arrow.plasma.PlasmaClient;
import org.ray.runtime.config.PathConfig;
import org.ray.runtime.config.RayParameters;
import org.ray.runtime.config.RayConfig;
import org.ray.runtime.config.WorkerMode;
import org.ray.runtime.gcs.AddressInfo;
import org.ray.runtime.gcs.KeyValueStoreLink;
import org.ray.runtime.gcs.RedisClient;
import org.ray.runtime.gcs.StateStoreProxy;
import org.ray.runtime.gcs.StateStoreProxyImpl;
import org.ray.runtime.raylet.RayletClient;
import org.ray.runtime.objectstore.ObjectStoreProxy;
import org.ray.runtime.raylet.RayletClientImpl;
import org.ray.runtime.runner.RunManager;
import org.ray.runtime.util.logger.RayLog;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* native runtime for local box and cluster run.
*/
public final class RayNativeRuntime extends AbstractRayRuntime {
static {
System.err.println("Current working directory is " + System.getProperty("user.dir"));
System.loadLibrary("local_scheduler_library_java");
System.loadLibrary("plasma_java");
}
private static final Logger LOGGER = LoggerFactory.getLogger(RayNativeRuntime.class);
private StateStoreProxy stateStoreProxy;
private KeyValueStoreLink kvStore = null;
private RunManager manager = null;
public RayNativeRuntime() {
public RayNativeRuntime(RayConfig rayConfig) {
super(rayConfig);
}
/**
 * Appends every entry of {@code rayConfig.libraryPath} to the {@code java.library.path}
 * system property and clears the class loader's cached copy so the new value is picked
 * up by later {@code System.loadLibrary} calls.
 *
 * <p>Entries are joined with {@link java.io.File#pathSeparator} so the resulting value
 * is valid on platforms whose separator is not {@code ":"}. A failure to clear the
 * cached paths is logged and otherwise ignored.
 */
private void resetLibaryPath() {
  final String separator = java.io.File.pathSeparator;
  String path = System.getProperty("java.library.path");
  if (Strings.isNullOrEmpty(path)) {
    path = "";
  } else {
    path += separator;
  }
  path += rayConfig.libraryPath.stream().collect(Collectors.joining(separator));

  // This is a hack to reset library path at runtime,
  // see https://stackoverflow.com/questions/15409223/.
  System.setProperty("java.library.path", path);
  // Set sys_paths to null so that java.library.path will be re-evaluated
  // the next time it is needed.
  try {
    final Field sysPathsField = ClassLoader.class.getDeclaredField("sys_paths");
    sysPathsField.setAccessible(true);
    sysPathsField.set(null, null);
  } catch (NoSuchFieldException | IllegalAccessException e) {
    // The logger already records the stack trace; no separate printStackTrace needed.
    LOGGER.error("Failed to set library path.", e);
  }
}
@Override
public void start(RayParameters params) throws Exception {
boolean isWorker = (params.worker_mode == WorkerMode.WORKER);
PathConfig pathConfig = new PathConfig(configReader);
// initialize params
if (params.redis_address.length() == 0) {
if (isWorker) {
throw new Error("Redis address must be configured under Worker mode.");
}
startOnebox(params, pathConfig);
initStateStore(params.redis_address);
} else {
initStateStore(params.redis_address);
if (!isWorker) {
List<AddressInfo> nodes = stateStoreProxy.getAddressInfo(
params.node_ip_address, params.redis_address, 5);
params.object_store_name = nodes.get(0).storeName;
params.raylet_socket_name = nodes.get(0).rayletSocketName;
}
public void start() throws Exception {
// Load native libraries.
try {
resetLibaryPath();
System.loadLibrary("local_scheduler_library_java");
System.loadLibrary("plasma_java");
} catch (Exception e) {
LOGGER.error("Failed to load native libraries.", e);
throw e;
}
// initialize worker context
if (params.worker_mode == WorkerMode.DRIVER) {
// TODO: The relationship between workerID, driver_id and dummy_task.driver_id should be
// recheck carefully
WorkerContext.workerID = params.driver_id;
if (rayConfig.getRedisAddress() == null) {
manager = new RunManager(rayConfig);
manager.startRayProcesses(true);
}
WorkerContext.init(params);
kvStore = new RedisClient(rayConfig.getRedisAddress());
if (params.onebox_delay_seconds_before_run_app_logic > 0) {
for (int i = 0; i < params.onebox_delay_seconds_before_run_app_logic; ++i) {
System.err.println("Pause for debugger, "
+ (params.onebox_delay_seconds_before_run_app_logic - i)
+ " seconds left ...");
Thread.sleep(1000);
}
}
ObjectStoreLink store = new PlasmaClient(rayConfig.objectStoreSocketName, "", 0);
objectStoreProxy = new ObjectStoreProxy(this, store);
if (params.worker_mode != WorkerMode.NONE) {
// initialize the links
int releaseDelay = AbstractRayRuntime.configReader
.getIntegerValue("ray", "plasma_default_release_delay", 0,
"how many release requests should be delayed in plasma client");
rayletClient = new RayletClientImpl(
rayConfig.rayletSocketName,
workerContext.getCurrentWorkerId(),
rayConfig.workerMode == WorkerMode.WORKER,
workerContext.getCurrentTask().taskId
);
ObjectStoreLink plink = new PlasmaClient(params.object_store_name, "", releaseDelay);
RayletClient rayletClient = new RayletClientImpl(
params.raylet_socket_name,
WorkerContext.currentWorkerId(),
isWorker,
WorkerContext.currentTask().taskId
);
// register
registerWorker();
init(rayletClient, plink, pathConfig);
// register
registerWorker(isWorker, params.node_ip_address, params.object_store_name,
params.raylet_socket_name);
}
RayLog.core.info("RayNativeRuntime started with store {}, raylet {}",
params.object_store_name, params.raylet_socket_name);
LOGGER.info("RayNativeRuntime started with store {}, raylet {}",
rayConfig.objectStoreSocketName, rayConfig.rayletSocketName);
}
@Override
public void shutdown() {
if (null != manager) {
manager.cleanup(true);
manager.cleanup();
}
}
/**
 * Starts a local head node and copies the addresses of the started services back into
 * {@code params}.
 *
 * @param params runtime parameters; {@code cleanup} is forced to true and the redis address,
 *     object store name and raylet socket name are overwritten with the started services'
 * @param paths path configuration handed to the run manager
 * @throws Exception if the run manager fails to start the head node
 */
private void startOnebox(RayParameters params, PathConfig paths) throws Exception {
  params.cleanup = true;

  manager = new RunManager(params, paths, AbstractRayRuntime.configReader);
  manager.startRayHead();

  params.redis_address = manager.info().redisAddress;
  // The first local store entry carries both the object store and the raylet socket names.
  AddressInfo firstLocalStore = manager.info().localStores.get(0);
  params.object_store_name = firstLocalStore.storeName;
  params.raylet_socket_name = firstLocalStore.rayletSocketName;
  //params.node_ip_address = NetworkUtil.getIpAddress();
}
/**
 * Creates the Redis-backed key-value client, points it at {@code redisAddress}, and builds
 * the state store proxy on top of it.
 *
 * @param redisAddress address handed to the Redis client via {@code setAddr}
 * @throws Exception propagated from the client or from initializing the global state
 */
private void initStateStore(String redisAddress) throws Exception {
kvStore = new RedisClient();
kvStore.setAddr(redisAddress);
stateStoreProxy = new StateStoreProxyImpl(kvStore);
// The proxy is only usable after its global state has been initialized.
stateStoreProxy.initializeGlobalState();
}
private void registerWorker(boolean isWorker, String nodeIpAddress, String storeName,
String rayletSocketName) {
private void registerWorker() {
Map<String, String> workerInfo = new HashMap<>();
String workerId = new String(WorkerContext.currentWorkerId().getBytes());
if (!isWorker) {
workerInfo.put("node_ip_address", nodeIpAddress);
String workerId = new String(workerContext.getCurrentWorkerId().getBytes());
if (rayConfig.workerMode == WorkerMode.DRIVER) {
workerInfo.put("node_ip_address", rayConfig.nodeIp);
workerInfo.put("driver_id", workerId);
workerInfo.put("start_time", String.valueOf(System.currentTimeMillis()));
workerInfo.put("plasma_store_socket", storeName);
workerInfo.put("raylet_socket", rayletSocketName);
workerInfo.put("plasma_store_socket", rayConfig.objectStoreSocketName);
workerInfo.put("raylet_socket", rayConfig.rayletSocketName);
workerInfo.put("name", System.getProperty("user.dir"));
//TODO: worker.redis_client.hmset(b"Drivers:" + worker.workerId, driver_info)
kvStore.hmset("Drivers:" + workerId, workerInfo);
} else {
workerInfo.put("node_ip_address", nodeIpAddress);
workerInfo.put("plasma_store_socket", storeName);
workerInfo.put("raylet_socket", rayletSocketName);
workerInfo.put("node_ip_address", rayConfig.nodeIp);
workerInfo.put("plasma_store_socket", rayConfig.objectStoreSocketName);
workerInfo.put("raylet_socket", rayConfig.rayletSocketName);
//TODO: b"Workers:" + worker.workerId,
kvStore.hmset("Workers:" + workerId, workerInfo);
}

View file

@ -44,7 +44,8 @@ public class Worker {
RayFunction rayFunction = runtime.getFunctionManager()
.getFunction(spec.driverId, spec.functionDescriptor);
// Set context
WorkerContext.prepare(spec, rayFunction.classLoader);
runtime.getWorkerContext().setCurrentTask(spec);
runtime.getWorkerContext().setCurrentClassLoader(rayFunction.classLoader);
Thread.currentThread().setContextClassLoader(rayFunction.classLoader);
// Get local actor object and arguments.
Object actor = spec.isActorTask() ? runtime.localActors.get(spec.actorId) : null;

View file

@ -1,42 +1,80 @@
package org.ray.runtime;
import org.ray.api.id.UniqueId;
import org.ray.runtime.config.RayParameters;
import org.ray.runtime.config.WorkerMode;
import org.ray.runtime.task.TaskSpec;
public class WorkerContext {
private static final ThreadLocal<WorkerContext> currentWorkerCtx =
ThreadLocal.withInitial(() -> init(AbstractRayRuntime.getParams()));
/**
* id of worker.
* Worker id.
*/
public static UniqueId workerID = UniqueId.randomId();
private UniqueId workerId;
/**
* current doing task.
* Current task.
*/
private TaskSpec currentTask;
/**
* current app classloader.
* Current class loader.
*/
private ClassLoader currentClassLoader;
/**
* how many puts done by current task.
* How many puts have been done by current task.
*/
private int currentTaskPutCount;
/**
* how many calls done by current task.
* How many calls have been done by current task.
*/
private int currentTaskCallCount;
public static WorkerContext init(RayParameters params) {
WorkerContext ctx = new WorkerContext();
currentWorkerCtx.set(ctx);
/**
 * Creates a context whose current task is a dummy task and whose counters start at zero.
 *
 * @param workerMode mode of this process; a driver reuses {@code driverId} as its worker id,
 *     any other mode gets a random worker id
 * @param driverId id of the driver, also passed on to the dummy task
 */
public WorkerContext(WorkerMode workerMode, UniqueId driverId) {
  if (workerMode == WorkerMode.DRIVER) {
    this.workerId = driverId;
  } else {
    this.workerId = UniqueId.randomId();
  }
  this.currentTaskPutCount = 0;
  this.currentTaskCallCount = 0;
  this.currentClassLoader = null;
  this.currentTask = createDummyTask(workerMode, driverId);
}
TaskSpec dummy = new TaskSpec(
params.driver_id,
params.worker_mode == WorkerMode.DRIVER ? UniqueId.randomId() : UniqueId.NIL,
/**
 * Replaces the worker id held by this context.
 *
 * @param workerId the new worker id
 */
public void setWorkerId(UniqueId workerId) {
this.workerId = workerId;
}
/**
 * Returns the task currently recorded in this context.
 */
public TaskSpec getCurrentTask() {
return currentTask;
}
/**
 * Increments the put counter and returns the incremented value.
 */
public int nextPutIndex() {
return ++currentTaskPutCount;
}
/**
 * Increments the call counter and returns the incremented value.
 */
public int nextCallIndex() {
return ++currentTaskCallCount;
}
/**
 * Returns the worker id held by this context.
 */
public UniqueId getCurrentWorkerId() {
return workerId;
}
/**
 * Returns the class loader recorded for the current task; it is {@code null} until one has
 * been set.
 */
public ClassLoader getCurrentClassLoader() {
return currentClassLoader;
}
/**
 * Records the task this worker is about to execute and restarts the per-task counters.
 *
 * <p>The put and call counters count operations "done by current task", so they are reset to
 * zero whenever the current task changes. Without the reset the indices handed out by
 * {@code nextPutIndex()} and {@code nextCallIndex()} would keep growing across tasks and
 * depend on how many tasks this worker had executed before.
 *
 * @param currentTask the task to make current
 */
public void setCurrentTask(TaskSpec currentTask) {
  this.currentTask = currentTask;
  this.currentTaskPutCount = 0;
  this.currentTaskCallCount = 0;
}
/**
 * Records the class loader to use for the current task.
 *
 * @param currentClassLoader the class loader to store
 */
public void setCurrentClassLoader(ClassLoader currentClassLoader) {
this.currentClassLoader = currentClassLoader;
}
private TaskSpec createDummyTask(WorkerMode workerMode, UniqueId driverId) {
return new TaskSpec(
driverId,
workerMode == WorkerMode.DRIVER ? UniqueId.randomId() : UniqueId.NIL,
UniqueId.NIL,
0,
UniqueId.NIL,
@ -46,42 +84,6 @@ public class WorkerContext {
null,
null,
null,
null
);
prepare(dummy, null);
return ctx;
}
/**
 * Makes {@code task} the current task of the calling thread's context and restarts the
 * per-task put and call counters.
 *
 * @param task the task about to be executed
 * @param classLoader the class loader to associate with the task
 */
public static void prepare(TaskSpec task, ClassLoader classLoader) {
  WorkerContext context = get();
  context.currentClassLoader = classLoader;
  context.currentTaskCallCount = 0;
  context.currentTaskPutCount = 0;
  context.currentTask = task;
}
/**
 * Returns the context stored in the {@code currentWorkerCtx} thread-local for the calling
 * thread.
 */
public static WorkerContext get() {
return currentWorkerCtx.get();
}
/**
 * Returns the current task of the calling thread's context.
 */
public static TaskSpec currentTask() {
return get().currentTask;
}
/**
 * Increments the put counter of the calling thread's context and returns the incremented
 * value.
 */
public static int nextPutIndex() {
return ++get().currentTaskPutCount;
}
/**
 * Increments the call counter of the calling thread's context and returns the incremented
 * value.
 */
public static int nextCallIndex() {
return ++get().currentTaskCallCount;
}
/**
 * Returns the worker id from the static {@code workerID} field; unlike the other accessors
 * this does not go through the thread-local context.
 */
public static UniqueId currentWorkerId() {
return WorkerContext.workerID;
}
public static ClassLoader currentClassLoader() {
return get().currentClassLoader;
null);
}
}

View file

@ -1,57 +0,0 @@
package org.ray.runtime.config;
import org.ray.runtime.util.config.AConfig;
import org.ray.runtime.util.config.ConfigReader;
/**
 * Path related configurations.
 *
 * <p>The fields are filled in by {@link ConfigReader#readObject} from one of three config
 * sections, chosen in the constructor.
 */
public class PathConfig {

  @AConfig(comment = "additional class path for JAVA",
      defaultArrayIndirectSectionName = "ray.java.path.classes.source")
  public String[] java_class_paths;

  @AConfig(comment = "additional JNI library paths for JAVA",
      defaultArrayIndirectSectionName = "ray.java.path.jni.build")
  public String[] java_jnilib_paths;

  @AConfig(comment = "path to ray_functions.txt for the default rewritten functions in ray runtime")
  public String java_runtime_rewritten_jars_dir = "";

  @AConfig(comment = "path to redis-server")
  public String redis_server;

  @AConfig(comment = "path to redis module")
  public String redis_module;

  @AConfig(comment = "path to plasma storage")
  public String store;

  @AConfig(comment = "path to raylet")
  public String raylet;

  @AConfig(comment = "path to python directory")
  public String python_dir;

  @AConfig(comment = "path to log server")
  public String log_server;

  @AConfig(comment = "path to log server config file")
  public String log_server_config;

  /**
   * Reads the path settings from the section matching how this process is packaged:
   * the deploy section when the deploy flag is set, otherwise the package section when this
   * class was loaded from a jar, otherwise the source section.
   *
   * @param config reader the settings are loaded from
   */
  public PathConfig(ConfigReader config) {
    final String section;
    if (config.getBooleanValue("ray.java.start", "deploy", false,
        "whether the package is used as a cluster deployment")) {
      section = "ray.java.path.deploy";
    } else {
      // A class inside a jar has a resource location of the form "<jar path>!<entry>".
      String classFileName = this.getClass().getSimpleName() + ".class";
      String classLocation = this.getClass().getResource(classFileName).getFile();
      boolean loadedFromJar = classLocation.split("!")[0].endsWith(".jar");
      section = loadedFromJar ? "ray.java.path.package" : "ray.java.path.source";
    }
    config.readObject(section, this, this);
  }
}

View file

@ -0,0 +1,227 @@
package org.ray.runtime.config;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigException;
import com.typesafe.config.ConfigFactory;
import java.util.List;
import java.util.Map;
import org.ray.api.id.UniqueId;
import org.ray.runtime.util.NetworkUtil;
import org.ray.runtime.util.ResourceUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Configurations of Ray runtime.
 * See `ray.default.conf` for the meaning of each field.
 *
 * <p>All fields are fixed at construction time except the Redis address, which may be left
 * unset by the configuration and assigned exactly once later through
 * {@link #setRedisAddress(String)}.
 */
public class RayConfig {

  private static final Logger LOGGER = LoggerFactory.getLogger(RayConfig.class);

  public static final String DEFAULT_CONFIG_FILE = "ray.default.conf";
  public static final String CUSTOM_CONFIG_FILE = "ray.conf";

  public final String rayHome;
  public final String nodeIp;
  public final WorkerMode workerMode;
  public final RunMode runMode;
  public final Map<String, Double> resources;
  public final UniqueId driverId;
  public final String logDir;
  public final boolean redirectOutput;
  public final List<String> libraryPath;
  public final List<String> classpath;
  private String redisAddress;
  private String redisIp;
  private Integer redisPort;
  public final int headRedisPort;
  public final int numberRedisShards;
  public final String objectStoreSocketName;
  public final Long objectStoreSize;
  public final String rayletSocketName;
  public final String redisServerExecutablePath;
  public final String redisModulePath;
  public final String plasmaStoreExecutablePath;
  public final String rayletExecutablePath;

  /**
   * Checks the cross-field constraints that depend on the worker mode.
   *
   * @throws IllegalArgumentException if a constraint is violated
   */
  private void validate() {
    if (workerMode == WorkerMode.WORKER) {
      Preconditions.checkArgument(redisAddress != null,
          "Redis address must be set in worker mode.");
    } else {
      // NOTE(review): the constructor has already prefixed a non-absolute 'ray.home' with the
      // working directory, so rayHome is never empty here. Confirm whether the raw configured
      // value was meant to be checked instead.
      Preconditions.checkArgument(!rayHome.isEmpty(),
          "'ray.home' must be set in driver mode");
    }
  }

  /**
   * Returns {@code path} without a single trailing '/', or unchanged if it has none.
   */
  private static String removeTrailingSlash(String path) {
    if (path.endsWith("/")) {
      return path.substring(0, path.length() - 1);
    } else {
      return path;
    }
  }

  /**
   * Builds the runtime configuration from an already resolved {@link Config}.
   *
   * @param config the configuration to read; every key except {@code ray.worker.mode} must be
   *     present
   * @throws IllegalArgumentException if the resulting configuration is invalid
   */
  public RayConfig(Config config) {
    // worker mode: the only optional key, defaults to DRIVER.
    WorkerMode localWorkerMode;
    try {
      localWorkerMode = config.getEnum(WorkerMode.class, "ray.worker.mode");
    } catch (ConfigException.Missing e) {
      localWorkerMode = WorkerMode.DRIVER;
    }
    workerMode = localWorkerMode;
    boolean isDriver = workerMode == WorkerMode.DRIVER;

    // run mode
    runMode = config.getEnum(RunMode.class, "ray.run-mode");

    // ray home
    String configuredRayHome = config.getString("ray.home");
    if (!configuredRayHome.startsWith("/")) {
      // If ray.home isn't an absolute path, prepend it with current work dir.
      configuredRayHome = System.getProperty("user.dir") + "/" + configuredRayHome;
    }
    rayHome = removeTrailingSlash(configuredRayHome);

    // node ip
    String configuredNodeIp = config.getString("ray.node-ip");
    if (configuredNodeIp.isEmpty()) {
      configuredNodeIp = NetworkUtil.getIpAddress(null);
    }
    nodeIp = configuredNodeIp;

    // resources
    resources = ResourceUtil.getResourcesMapFromString(
        config.getString("ray.resources"));
    if (isDriver) {
      if (!resources.containsKey("CPU")) {
        int numCpu = Runtime.getRuntime().availableProcessors();
        LOGGER.warn("No CPU resource is set in configuration, "
            + "setting it to the number of CPU cores: {}", numCpu);
        resources.put("CPU", numCpu * 1.0);
      }
      if (!resources.containsKey("GPU")) {
        LOGGER.warn("No GPU resource is set in configuration, setting it to 0");
        resources.put("GPU", 0.0);
      }
    }

    // driver id: random unless one is configured.
    String driverIdHex = config.getString("ray.driver.id");
    if (!driverIdHex.isEmpty()) {
      driverId = UniqueId.fromHexString(driverIdHex);
    } else {
      driverId = UniqueId.randomId();
    }

    // log dir
    logDir = removeTrailingSlash(config.getString("ray.log-dir"));
    // redirect output
    redirectOutput = config.getBoolean("ray.redirect-output");
    // custom library path
    List<String> customLibraryPath = config.getStringList("ray.library.path");
    // custom classpath
    classpath = config.getStringList("ray.classpath");

    // redis configurations
    String configuredRedisAddress = config.getString("ray.redis.address");
    if (!configuredRedisAddress.isEmpty()) {
      setRedisAddress(configuredRedisAddress);
    } else {
      this.redisAddress = null;
    }
    headRedisPort = config.getInt("ray.redis.head-port");
    numberRedisShards = config.getInt("ray.redis.shard-number");

    // object store configurations
    objectStoreSocketName = config.getString("ray.object-store.socket-name");
    objectStoreSize = config.getBytes("ray.object-store.size");

    // raylet socket name
    rayletSocketName = config.getString("ray.raylet.socket-name");

    // Everything below is derived from the normalized rayHome field, so a trailing slash in
    // the configured 'ray.home' does not end up as "//" inside the derived paths.
    libraryPath = new ImmutableList.Builder<String>().add(
        rayHome + "/build/src/plasma",
        rayHome + "/build/src/local_scheduler"
    ).addAll(customLibraryPath).build();

    redisServerExecutablePath = rayHome + "/build/src/common/thirdparty/redis/src/redis-server";
    redisModulePath = rayHome + "/build/src/common/redis_module/libray_redis_module.so";
    plasmaStoreExecutablePath = rayHome + "/build/src/plasma/plasma_store_server";
    rayletExecutablePath = rayHome + "/build/src/ray/raylet/raylet";

    // validate config
    validate();
    LOGGER.debug("Created config: {}", this);
  }

  /**
   * Sets the Redis address, which must be of the form {@code ip:port}. The address can be set
   * only once.
   *
   * <p>The address is fully parsed before any field is written, so a rejected address leaves
   * this object unchanged and a valid one can still be set afterwards.
   *
   * @param redisAddress the address to set
   * @throws NullPointerException if {@code redisAddress} is null
   * @throws IllegalStateException if an address was already set
   * @throws IllegalArgumentException if the address is not {@code ip:port} with a numeric port
   */
  public void setRedisAddress(String redisAddress) {
    Preconditions.checkNotNull(redisAddress);
    Preconditions.checkState(this.redisAddress == null, "Redis address was already set");

    String[] ipAndPort = redisAddress.split(":");
    Preconditions.checkArgument(ipAndPort.length == 2, "Invalid redis address.");
    // NumberFormatException is an IllegalArgumentException, as callers already had to expect.
    int port = Integer.parseInt(ipAndPort[1]);

    this.redisAddress = redisAddress;
    this.redisIp = ipAndPort[0];
    this.redisPort = port;
  }

  /**
   * Returns the Redis address as {@code ip:port}, or {@code null} if it has not been set.
   */
  public String getRedisAddress() {
    return redisAddress;
  }

  /**
   * Returns the ip part of the Redis address, or {@code null} if it has not been set.
   */
  public String getRedisIp() {
    return redisIp;
  }

  /**
   * Returns the port part of the Redis address, or {@code null} if it has not been set.
   */
  public Integer getRedisPort() {
    return redisPort;
  }

  @Override
  public String toString() {
    return "RayConfig{"
        + "rayHome='" + rayHome + '\''
        + ", nodeIp='" + nodeIp + '\''
        + ", workerMode=" + workerMode
        + ", runMode=" + runMode
        + ", resources=" + resources
        + ", driverId=" + driverId
        + ", logDir='" + logDir + '\''
        + ", redirectOutput=" + redirectOutput
        + ", libraryPath=" + libraryPath
        + ", classpath=" + classpath
        + ", redisAddress='" + redisAddress + '\''
        + ", redisIp='" + redisIp + '\''
        + ", redisPort=" + redisPort
        + ", headRedisPort=" + headRedisPort
        + ", numberRedisShards=" + numberRedisShards
        + ", objectStoreSocketName='" + objectStoreSocketName + '\''
        + ", objectStoreSize=" + objectStoreSize
        + ", rayletSocketName='" + rayletSocketName + '\''
        + ", redisServerExecutablePath='" + redisServerExecutablePath + '\''
        + ", redisModulePath='" + redisModulePath + '\''
        + ", plasmaStoreExecutablePath='" + plasmaStoreExecutablePath + '\''
        + ", rayletExecutablePath='" + rayletExecutablePath + '\''
        + '}';
  }

  /**
   * Create a RayConfig by reading configuration in the following order:
   * 1. System properties.
   * 2. `ray.conf` file.
   * 3. `ray.default.conf` file.
   */
  public static RayConfig create() {
    // Drop cached system properties so values set after a previous call are picked up.
    ConfigFactory.invalidateCaches();
    Config config = ConfigFactory.systemProperties()
        .withFallback(ConfigFactory.load(CUSTOM_CONFIG_FILE))
        .withFallback(ConfigFactory.load(DEFAULT_CONFIG_FILE));
    return new RayConfig(config);
  }
}

View file

@ -1,106 +0,0 @@
package org.ray.runtime.config;
import org.ray.api.id.UniqueId;
import org.ray.runtime.util.NetworkUtil;
import org.ray.runtime.util.config.AConfig;
import org.ray.runtime.util.config.ConfigReader;
/**
 * Runtime parameters of Ray process.
 *
 * <p>Each public field carries an {@code @AConfig} comment describing it and a default value
 * that applies when the constructor is given no config reader. The fields are filled in
 * through {@code ConfigReader.readObject}, so their names and annotations must not be changed
 * casually.
 */
public class RayParameters {
@AConfig(comment = "worker mode for this process DRIVER | WORKER | NONE")
public WorkerMode worker_mode = WorkerMode.DRIVER;
@AConfig(comment = "run mode for this app SINGLE_PROCESS | SINGLE_BOX | CLUSTER")
public RunMode run_mode = RunMode.SINGLE_PROCESS;
// Initialized with a null network interface; the constructor recomputes it when a config
// reader is supplied.
@AConfig(comment = "local node ip")
public String node_ip_address = NetworkUtil.getIpAddress(null);
@AConfig(comment = "primary redis address (e.g., 127.0.0.1:34222")
public String redis_address = "";
@AConfig(comment = "object store name (e.g., /tmp/store1111")
public String object_store_name = "";
@AConfig(comment = "object store rpc listen port")
public int object_store_rpc_port = 32567;
@AConfig(comment = "driver ID when the worker is served as a driver")
public UniqueId driver_id = UniqueId.NIL;
@AConfig(comment = "logging directory")
public String log_dir = "/tmp/raylogs";
@AConfig(comment = "primary redis port")
public int redis_port = 34222;
@AConfig(comment = "number of workers started initially")
public int num_workers = 1;
@AConfig(comment = "redirect err and stdout to files for newly created processes")
public boolean redirect = true;
@AConfig(comment = "whether to start redis shard server in addition to the primary server")
public boolean start_redis_shards = false;
@AConfig(comment = "whether to clean up the processes when there is a process start failure")
public boolean cleanup = false;
@AConfig(comment = "number of redis shard servers to be started")
public int num_redis_shards = 0;
@AConfig(comment = "whether this is a deployment in cluster")
public boolean deploy = false;
@AConfig(comment = "whether this is for python deployment")
public boolean py = false;
// Default is 2 MiB.
@AConfig(comment = "the max bytes of the buffer for task submit")
public int max_submit_task_buffer_size_bytes = 2 * 1024 * 1024;
@AConfig(comment = "default first check timeout(ms)")
public int default_first_check_timeout_ms = 1000;
@AConfig(comment = "default get check rate(ms)")
public int default_get_check_interval_ms = 5000;
@AConfig(comment = "add the jvm parameters for java worker")
public String jvm_parameters = "";
@AConfig(comment = "set the occupied memory(MB) size of object store")
public int object_store_occupied_memory_MB = 1000;
@AConfig(comment = "whether to use supreme failover strategy")
public boolean supremeFO = false;
@AConfig(comment = "whether to disable process failover")
public boolean disable_process_failover = false;
@AConfig(comment = "delay seconds under onebox before app logic for debugging")
public int onebox_delay_seconds_before_run_app_logic = 0;
@AConfig(comment = "raylet socket name (e.g., /tmp/raylet1111")
public String raylet_socket_name = "";
@AConfig(comment = "raylet rpc listen port")
public int raylet_port = 35567;
@AConfig(comment = "worker fetch request size")
public int worker_fetch_request_size = 10000;
@AConfig(comment = "static resource list of this node")
public String static_resources = "";
/**
 * Creates the parameters, optionally overriding the defaults from a config reader.
 *
 * @param config reader to load values from; when {@code null} every field keeps its default
 */
public RayParameters(ConfigReader config) {
if (null != config) {
String networkInterface = config.getStringValue("ray.java", "network_interface", null,
"Network interface to be specified for host ip address(e.g., en0, eth0), may use "
+ "ifconfig to get options");
// Resolve the node ip first; readObject below runs afterwards and may assign the field again.
node_ip_address = NetworkUtil.getIpAddress(networkInterface);
config.readObject("ray.java.start", this, this);
}
}
}

View file

@ -1,39 +1,15 @@
package org.ray.runtime.config;
public enum RunMode {
SINGLE_PROCESS(true, false), // dev path, dev runtime
SINGLE_BOX(true, true), // dev path, native runtime
CLUSTER(false, true); // deploy path, naive runtime
RunMode(boolean devPathManager,
boolean nativeRuntime) {
this.devPathManager = devPathManager;
this.nativeRuntime = nativeRuntime;
}
/**
* the jar has add to java -cp, no need to load jar after started.
* Ray is running in one single Java process, without Raylet backend, object store, and GCS.
* It's useful for debug.
*/
private final boolean devPathManager;
private final boolean nativeRuntime;
SINGLE_PROCESS,
/**
* Getter method for property <tt>devPathManager</tt>.
*
* @return property value of devPathManager
* Ray is running on one or more nodes, with multiple processes.
*/
public boolean isDevPathManager() {
return devPathManager;
}
/**
* Getter method for property <tt>nativeRuntime</tt>.
*
* @return property value of nativeRuntime
*/
public boolean isNativeRuntime() {
return nativeRuntime;
}
}
CLUSTER,
}

View file

@ -1,7 +1,6 @@
package org.ray.runtime.config;
public enum WorkerMode {
NONE, // not set
DRIVER, // driver
WORKER // worker
DRIVER,
WORKER
}

View file

@ -7,7 +7,7 @@ import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.arrow.plasma.ObjectStoreLink;
import org.ray.api.id.UniqueId;
import org.ray.runtime.WorkerContext;
import org.ray.runtime.RayDevRuntime;
import org.ray.runtime.raylet.MockRayletClient;
import org.ray.runtime.util.logger.RayLog;
@ -16,10 +16,15 @@ import org.ray.runtime.util.logger.RayLog;
*/
public class MockObjectStore implements ObjectStoreLink {
private final RayDevRuntime runtime;
private final Map<UniqueId, byte[]> data = new ConcurrentHashMap<>();
private final Map<UniqueId, byte[]> metadata = new ConcurrentHashMap<>();
private MockRayletClient scheduler = null;
/**
 * Creates a mock object store bound to the given dev runtime.
 *
 * @param runtime runtime stored in this object's {@code runtime} field
 */
public MockObjectStore(RayDevRuntime runtime) {
this.runtime = runtime;
}
@Override
public void put(byte[] objectId, byte[] value, byte[] metadataValue) {
if (objectId == null || objectId.length == 0 || value == null) {
@ -87,7 +92,7 @@ public class MockObjectStore implements ObjectStoreLink {
}
private String logPrefix() {
return WorkerContext.currentTask().taskId + "-" + getUserTrace() + " -> ";
return runtime.getWorkerContext().getCurrentTask().taskId + "-" + getUserTrace() + " -> ";
}
private String getUserTrace() {

View file

@ -5,7 +5,7 @@ import java.util.List;
import org.apache.arrow.plasma.ObjectStoreLink;
import org.apache.commons.lang3.tuple.Pair;
import org.ray.api.id.UniqueId;
import org.ray.runtime.WorkerContext;
import org.ray.runtime.AbstractRayRuntime;
import org.ray.runtime.util.Serializer;
import org.ray.runtime.util.exception.TaskExecutionException;
@ -15,12 +15,14 @@ import org.ray.runtime.util.exception.TaskExecutionException;
*/
public class ObjectStoreProxy {
private final AbstractRayRuntime runtime;
private final ObjectStoreLink store;
private final int getTimeoutMs = 1000;
public ObjectStoreProxy(ObjectStoreLink store) {
public ObjectStoreProxy(AbstractRayRuntime runtime, ObjectStoreLink store) {
this.runtime = runtime;
this.store = store;
}
}
public <T> Pair<T, GetStatus> get(UniqueId objectId, boolean isMetadata)
throws TaskExecutionException {
@ -31,7 +33,7 @@ public class ObjectStoreProxy {
throws TaskExecutionException {
byte[] obj = store.get(id.getBytes(), timeoutMs, isMetadata);
if (obj != null) {
T t = Serializer.decode(obj, WorkerContext.currentClassLoader());
T t = Serializer.decode(obj, runtime.getWorkerContext().getCurrentClassLoader());
store.release(id.getBytes());
if (t instanceof TaskExecutionException) {
throw (TaskExecutionException) t;
@ -54,7 +56,7 @@ public class ObjectStoreProxy {
for (int i = 0; i < objs.size(); i++) {
byte[] obj = objs.get(i);
if (obj != null) {
T t = Serializer.decode(obj, WorkerContext.currentClassLoader());
T t = Serializer.decode(obj, runtime.getWorkerContext().getCurrentClassLoader());
store.release(ids.get(i).getBytes());
if (t instanceof TaskExecutionException) {
throw (TaskExecutionException) t;

View file

@ -1,13 +0,0 @@
package org.ray.runtime.runner;
/**
 * Plain record of one process launched by the run manager: the process handle together with
 * the arguments it was started with.
 */
public class ProcessInfo {
// Handle of the started process.
public Process process;
// Command line the process was started with, one element per argument.
public String[] cmd;
// Kind of process (worker, plasma store, redis server, ...).
public RunInfo.ProcessType type;
// Process name; also used as the prefix of its log file names when output is redirected.
public String name;
// Redis address that was passed when the process was started.
public String redisAddress;
// Node ip that was passed when the process was started.
public String ip;
// Whether stdout/stderr of the process were redirected to log files.
public boolean redirect;
// Whether the process was also registered in the run info's to-be-cleaned list.
public boolean cleanup;
}

View file

@ -1,45 +0,0 @@
package org.ray.runtime.runner;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.ray.runtime.gcs.AddressInfo;
/**
 * Bookkeeping for the processes started by the run manager: service addresses plus, for each
 * {@link ProcessType}, the processes that were started and those that must be cleaned up.
 */
public class RunInfo {

  public String redisAddress;

  public List<String> redisShards;

  public List<AddressInfo> localStores = new ArrayList<>();

  /** One synchronized list per process type, indexed by {@code ProcessType.ordinal()}. */
  public ArrayList<List<ProcessInfo>> allProcesses = newListPerProcessType();

  /** One synchronized list per process type, indexed by {@code ProcessType.ordinal()}. */
  public ArrayList<List<Process>> toBeCleanedProcesses = newListPerProcessType();

  public ArrayList<ProcessInfo> deadProcess = new ArrayList<>();

  /**
   * Builds a list holding one empty synchronized list for every {@link ProcessType}.
   */
  private static <T> ArrayList<List<T>> newListPerProcessType() {
    int typeCount = ProcessType.values().length;
    ArrayList<List<T>> perType = new ArrayList<>();
    for (int i = 0; i < typeCount; i++) {
      perType.add(Collections.synchronizedList(new ArrayList<T>()));
    }
    return perType;
  }

  /**
   * Kinds of processes the run manager can start.
   */
  public enum ProcessType {
    PT_WORKER,
    PT_PLASMA_STORE,
    PT_REDIS_SERVER,
    PT_WEB_UI,
    PT_RAYLET,
    PT_DRIVER
  }
}

View file

@ -1,25 +1,23 @@
package org.ray.runtime.runner;
import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableList;
import java.io.File;
import java.io.IOException;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import org.ray.api.id.UniqueId;
import org.ray.runtime.config.PathConfig;
import org.ray.runtime.config.RayParameters;
import org.ray.runtime.gcs.AddressInfo;
import org.ray.runtime.runner.RunInfo.ProcessType;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.ray.runtime.config.RayConfig;
import org.ray.runtime.util.FileUtil;
import org.ray.runtime.util.ResourceUtil;
import org.ray.runtime.util.StringUtil;
import org.ray.runtime.util.config.ConfigReader;
import org.ray.runtime.util.logger.RayLog;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.Jedis;
/**
@ -27,501 +25,224 @@ import redis.clients.jedis.Jedis;
*/
public class RunManager {
private static final Logger LOGGER = LoggerFactory.getLogger(RunManager.class);
private static final DateTimeFormatter DATE_TIME_FORMATTER =
DateTimeFormatter.ofPattern("Y-m-d_H-M-S");
DateTimeFormatter.ofPattern("Y-M-d_H-m-s");
private RayParameters params;
private static final String WORKER_CLASS = "org.ray.runtime.runner.worker.DefaultWorker";
private PathConfig paths;
private RayConfig rayConfig;
private ConfigReader configReader;
private Random random;
private RunInfo runInfo = new RunInfo();
private List<Process> processes;
private Random random = new Random();
public RunManager(RayParameters params, PathConfig paths, ConfigReader configReader) {
this.params = params;
this.paths = paths;
this.configReader = configReader;
public RunManager(RayConfig rayConfig) {
this.rayConfig = rayConfig;
processes = new ArrayList<>();
random = new Random();
}
private static boolean killProcess(Process p) {
if (p.isAlive()) {
public void cleanup() {
for (Process p : processes) {
p.destroy();
return true;
} else {
return false;
}
}
public RunInfo info() {
return runInfo;
/**
 * Creates the log directory and the parent directories of the raylet and object store
 * socket files.
 */
private void createTempDirs() {
  File logDir = new File(rayConfig.logDir);
  FileUtil.mkDir(logDir);

  File rayletSocketDir = new File(rayConfig.rayletSocketName).getParentFile();
  FileUtil.mkDir(rayletSocketDir);

  File objectStoreSocketDir = new File(rayConfig.objectStoreSocketName).getParentFile();
  FileUtil.mkDir(objectStoreSocketDir);
}
public void startRayHead() throws Exception {
if (params.redis_address.length() != 0) {
throw new Exception("Redis address must be empty in head node.");
}
if (params.num_redis_shards <= 0) {
params.num_redis_shards = 1;
/**
* Start a process.
* @param command The command to start the process with.
* @param env Environment variables.
* @param name Process name.
*/
private void startProcess(List<String> command, Map<String, String> env, String name) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Starting process {} with command: {}", name, command,
Joiner.on(" ").join(command));
}
params.start_redis_shards = true;
ProcessBuilder builder = new ProcessBuilder(command);
startRayProcesses();
}
/**
 * Starts the Ray processes of a non-head node.
 *
 * @throws Exception if no redis address is configured, or if a non-zero number of redis
 *     shards is configured
 */
public void startRayNode() throws Exception {
  boolean redisAddressMissing = params.redis_address.isEmpty();
  if (redisAddressMissing) {
    throw new Exception("Redis address cannot be empty in non-head node.");
  }

  boolean shardsConfigured = params.num_redis_shards != 0;
  if (shardsConfigured) {
    throw new Exception("Number of redis shards should be zero in non-head node.");
  }

  // Redis shards are not started on a non-head node.
  params.start_redis_shards = false;
  startRayProcesses();
}
/**
 * Starts a driver as a separate Java process.
 *
 * <p>The driver id, driver class and (when given) driver arguments are passed as
 * {@code ray.java.start.*} config overrides, appended after {@code additionalConfigs}.
 * The {@code logDir} parameter is not used by this method.
 *
 * @return the started process, as returned by {@code startJavaProcess}
 */
public Process startDriver(String mainClass, String redisAddress, UniqueId driverId,
    String logDir, String ip,
    String driverClass, String driverArgs, String additonalClassPaths,
    String additionalConfigs) {
  StringBuilder driverSettings = new StringBuilder()
      .append("ray.java.start.driver_id=").append(driverId)
      .append(";ray.java.start.driver_class=").append(driverClass);
  if (driverArgs != null) {
    driverSettings.append(";ray.java.start.driver_args=").append(driverArgs);
  }

  String mergedConfigs = (additionalConfigs == null)
      ? driverSettings.toString()
      : additionalConfigs + ";" + driverSettings;

  return startJavaProcess(
      RunInfo.ProcessType.PT_DRIVER,
      mainClass,
      additonalClassPaths,
      mergedConfigs,
      "",
      ip,
      redisAddress,
      false,
      false,
      null
  );
}
/**
 * Builds the command line of a Java process and starts it.
 *
 * <p>The command string is split on single spaces to obtain the argument vector; the process
 * is started with no extra environment variables and an empty name.
 *
 * @return the started process, as returned by {@code startProcess}
 */
private Process startJavaProcess(RunInfo.ProcessType pt, String mainClass,
    String additonalClassPaths, String additionalConfigs,
    String additionalJvmArgs, String ip, String
    redisAddr, boolean redirect,
    boolean cleanup, String agentlibAddr) {
  String[] argv = buildJavaProcessCommand(pt, mainClass, additonalClassPaths,
      additionalConfigs, additionalJvmArgs, ip, redisAddr, agentlibAddr).split(" ");
  final Map<String, String> noExtraEnv = null;
  final String processName = "";
  return startProcess(argv, noExtraEnv, pt, processName, redisAddr, ip, redirect, cleanup);
}
/**
 * Assembles the space-separated {@code java} command line for a Ray Java process.
 *
 * <p>Layout: JVM flags and configured JVM parameters, optional JDWP agent, JNI library path,
 * classpath (plus {@code additionalClassPaths}), optional extra JVM arguments, the main class,
 * then {@code --config} and {@code --overwrite} with the node ip, redis address, log dir and
 * run mode, followed by {@code additionalConfigs}. The {@code pt} parameter is not used.
 *
 * @return the complete command as a single string
 */
private String buildJavaProcessCommand(
    RunInfo.ProcessType pt, String mainClass, String additionalClassPaths,
    String additionalConfigs,
    String additionalJvmArgs, String ip, String redisAddr, String agentlibAddr) {
  StringBuilder command = new StringBuilder("java -ea -noverify ");
  command.append(params.jvm_parameters).append(" ");

  boolean debugAgentRequested = agentlibAddr != null && !agentlibAddr.equals("");
  if (debugAgentRequested) {
    command.append(" -agentlib:jdwp=transport=dt_socket,address=")
        .append(agentlibAddr)
        .append(",server=y,suspend=n");
  }

  command.append(" -Djava.library.path=" + StringUtil.mergeArray(paths.java_jnilib_paths, ":"));
  command.append(" -classpath " + StringUtil.mergeArray(paths.java_class_paths, ":"));
  if (additionalClassPaths.length() > 0) {
    command.append(":").append(additionalClassPaths);
  }

  if (additionalJvmArgs.length() > 0) {
    command.append(" ").append(additionalJvmArgs);
  }

  command.append(" ").append(mainClass);

  // Program arguments: the config file plus ';'-separated overrides of its start section.
  final String section = "ray.java.start.";
  command.append(" --config=" + configReader.filePath());
  command.append(" --overwrite=")
      .append(section).append("node_ip_address=").append(ip).append(";")
      .append(section).append("redis_address=").append(redisAddr).append(";")
      .append(section).append("log_dir=").append(params.log_dir).append(";")
      .append(section).append("run_mode=").append(params.run_mode);

  if (additionalConfigs.length() > 0) {
    command.append(";").append(additionalConfigs);
  }

  return command.toString();
}
private Process startProcess(String[] cmd, Map<String, String> env, RunInfo.ProcessType type,
String name,
String redisAddress, String ip, boolean redirect,
boolean cleanup) {
ProcessBuilder builder;
List<String> newCommand = Arrays.asList(cmd);
builder = new ProcessBuilder(newCommand);
if (redirect) {
if (rayConfig.redirectOutput) {
// Set stdout and stderr paths.
int logId = random.nextInt(10000);
String date = DATE_TIME_FORMATTER.format(LocalDateTime.now());
String stdout = String.format("%s/%s-%s-%05d.out", params.log_dir, name, date, logId);
String stderr = String.format("%s/%s-%s-%05d.err", params.log_dir, name, date, logId);
String stdout = String.format("%s/%s-%s-%05d.out", rayConfig.logDir, name, date, logId);
String stderr = String.format("%s/%s-%s-%05d.err", rayConfig.logDir, name, date, logId);
builder.redirectOutput(new File(stdout));
builder.redirectError(new File(stderr));
recordLogFilesInRedis(redisAddress, ip, ImmutableList.of(stdout, stderr));
}
// Set environment variables.
if (env != null && !env.isEmpty()) {
builder.environment().putAll(env);
}
Process p = null;
Process p;
try {
p = builder.start();
} catch (IOException e) {
RayLog.core.error("Failed to start process {}", name, e);
return null;
LOGGER.error("Failed to start process " + name, e);
throw new RuntimeException("Failed to start process " + name, e);
}
RayLog.core.info("Process {} started", name);
if (cleanup) {
runInfo.toBeCleanedProcesses.get(type.ordinal()).add(p);
}
ProcessInfo processInfo = new ProcessInfo();
processInfo.cmd = cmd;
processInfo.type = type;
processInfo.name = name;
processInfo.redisAddress = redisAddress;
processInfo.ip = ip;
processInfo.redirect = redirect;
processInfo.cleanup = cleanup;
processInfo.process = p;
runInfo.allProcesses.get(type.ordinal()).add(processInfo);
return p;
}
/**
 * Appends the given log file paths to a Redis list whose key is derived from the node IP
 * (format string {@code "LOG_FILENAMES:{%s}"}).
 *
 * <p>Does nothing when the Redis address or the node IP is null or empty, or when there
 * are no log files to record.
 *
 * @param redisAddress Redis address in {@code host:port} form.
 * @param nodeIpAddress IP address of this node, used to build the list key.
 * @param logFiles Paths of the log files to record.
 */
private void recordLogFilesInRedis(String redisAddress, String nodeIpAddress,
    List<String> logFiles) {
  if (redisAddress == null || redisAddress.isEmpty() || nodeIpAddress == null
      || nodeIpAddress.isEmpty() || logFiles.isEmpty()) {
    return;
  }
  String[] ipPort = redisAddress.split(":");
  String logFileListKey = String.format("LOG_FILENAMES:{%s}", nodeIpAddress);
  // try-with-resources guarantees the connection is closed even if a push fails.
  try (Jedis jedisClient = new Jedis(ipPort[0], Integer.parseInt(ipPort[1]))) {
    for (String logfile : logFiles) {
      jedisClient.rpush(logFileListKey, logfile);
    }
  }
}
/**
 * Starts the Ray processes for this node: the primary redis server (only when
 * {@code params.redis_address} is empty), the optional redis shards, the object store and
 * the raylet.
 *
 * @throws RuntimeException if the primary redis server could not be started, or if any
 *     started process is found dead at the end (in which case all processes are killed).
 */
private void startRayProcesses() {
  RayLog.core.info("start ray processes @ " + params.node_ip_address + " ...");

  // start primary redis
  boolean startPrimary = params.redis_address.length() == 0;
  if (startPrimary) {
    List<String> primaryShards = startRedis(
        params.node_ip_address, params.redis_port, 1, params.redirect, params.cleanup);
    // startRedis returns an empty list on failure; fail with a clear message instead of
    // an IndexOutOfBoundsException.
    if (primaryShards.isEmpty()) {
      throw new RuntimeException("Failed to start the primary redis server");
    }
    params.redis_address = primaryShards.get(0);
  }

  String[] hostAndPort = params.redis_address.split(":");
  // try-with-resources closes the client even if one of the redis calls below throws.
  try (Jedis redisClient = new Jedis(hostAndPort[0], Integer.parseInt(hostAndPort[1]))) {
    if (startPrimary) {
      // Register the number of Redis shards in the primary shard, so that clients
      // know how many redis shards to expect under RedisShards.
      redisClient.set("NumRedisShards", Integer.toString(params.num_redis_shards));
    }
    runInfo.redisAddress = params.redis_address;

    // start redis shards
    if (params.start_redis_shards) {
      runInfo.redisShards = startRedis(
          params.node_ip_address, params.redis_port + 1, params.num_redis_shards,
          params.redirect,
          params.cleanup);
      // Store redis shard information in the primary redis shard.
      for (String addr : runInfo.redisShards) {
        redisClient.rpush("RedisShards", addr);
      }
    }
  }

  AddressInfo info = new AddressInfo();

  // Start object store
  int rpcPort = params.object_store_rpc_port;
  String storeName = "/tmp/plasma_store" + rpcPort;
  startObjectStore(0, info,
      params.redis_address, params.node_ip_address, params.redirect, params.cleanup);

  Map<String, Double> staticResources =
      ResourceUtil.getResourcesMapFromString(params.static_resources);

  // Start raylet
  startRaylet(storeName, info, params.num_workers,
      params.redis_address,
      params.node_ip_address, params.redirect, staticResources, params.cleanup);

  runInfo.localStores.add(info);

  // If anything died during startup, tear everything down and report the failure.
  if (!checkAlive()) {
    cleanup(true);
    throw new RuntimeException("Start Ray processes failed");
  }
}
/**
 * Scans every tracked process and moves those that are no longer alive from
 * {@code runInfo.allProcesses} to {@code runInfo.deadProcess}, logging each one.
 *
 * @return true if {@code runInfo.deadProcess} is empty after the scan.
 */
private boolean checkAlive() {
  for (RunInfo.ProcessType type : RunInfo.ProcessType.values()) {
    // The per-type lists are indexed by the enum ordinal.
    runInfo.allProcesses.get(type.ordinal()).removeIf(info -> {
      if (info.process.isAlive()) {
        return false;
      }
      RayLog.core.error("Process " + info.process.hashCode() + " is not alive!" + " Process Type "
          + type.name());
      runInfo.deadProcess.add(info);
      return true;
    });
  }
  return runInfo.deadProcess.isEmpty();
}
/**
 * Kills processes started by this manager, walking the process types in reverse order, and
 * then clears the bookkeeping lists for each type.
 *
 * @param killAll if true, kill every process in {@code runInfo.allProcesses}; otherwise
 *     kill only the processes in {@code runInfo.toBeCleanedProcesses}.
 */
public void cleanup(boolean killAll) {
  for (int typeIndex = ProcessType.values().length - 1; typeIndex >= 0; typeIndex--) {
    if (!killAll) {
      for (Process proc : runInfo.toBeCleanedProcesses.get(typeIndex)) {
        if (killProcess(proc)) {
          RayLog.core.info("Kill process " + proc.hashCode() + " forcely");
        }
      }
    } else {
      for (ProcessInfo info : runInfo.allProcesses.get(typeIndex)) {
        if (killProcess(info.process)) {
          RayLog.core.info("Kill process " + info.process.hashCode() + " forcely");
        }
      }
    }

    // Whichever list was used, all bookkeeping for this type is reset.
    runInfo.toBeCleanedProcesses.get(typeIndex).clear();
    runInfo.allProcesses.get(typeIndex).clear();
    runInfo.deadProcess.clear();
  }
}
/**
 * Starts one or more redis server instances on consecutive ports.
 *
 * @param ip the IP address of the local node
 * @param port port of the first instance; instance {@code i} uses {@code port + i}
 * @param numOfShards the number of redis instances to start
 * @param redirect whether to redirect the output/err to the log files
 * @param cleanup forwarded to each started instance; also passed to
 *     {@link #cleanup(boolean)} when an instance fails to start
 * @return the addresses of the started instances, or an empty list if any instance failed
 *     to start
 */
private List<String> startRedis(String ip, int port, int numOfShards,
    boolean redirect, boolean cleanup) {
  List<String> started = new ArrayList<>();
  for (int offset = 0; offset < numOfShards; offset++) {
    String address = startRedisInstance(ip, port + offset, redirect, cleanup);
    if (address.length() == 0) {
      // An empty address signals a failed start: tear down and report nothing started.
      cleanup(cleanup);
      started.clear();
      return started;
    }
    started.add(address);
  }
  // TODO: wait for redis server to start
  return started;
}
//
// @param ip local node ip, only used for logging purpose
// @param port given port for this redis instance, 0 for auto-selected port
// @return redis server address
//
private String startRedisInstance(String ip, int port,
boolean redirect, boolean cleanup) {
String redisFilePath = paths.redis_server;
String redisModule = paths.redis_module;
assert (new File(redisFilePath).exists()) : "file don't exsits : " + redisFilePath;
assert (new File(redisModule).exists()) : "file don't exsits : " + redisModule;
String cmd = redisFilePath + " --protected-mode no --port " + port + " --loglevel warning"
+ " --loadmodule " + redisModule;
Map<String, String> env = null;
Process p = startProcess(cmd.split(" "), env, RunInfo.ProcessType.PT_REDIS_SERVER,
"redis", "", ip, redirect, cleanup);
if (p == null || !p.isAlive()) {
return "";
}
// Wait 200ms and check whether the process is alive.
try {
TimeUnit.MILLISECONDS.sleep(300);
TimeUnit.MILLISECONDS.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
if (!p.isAlive()) {
throw new RuntimeException("Failed to start " + name);
}
processes.add(p);
LOGGER.info("{} process started", name);
}
Jedis client = new Jedis(params.node_ip_address, port);
/**
* Start all Ray processes on this node.
*
* <p>Creates the temp directories, optionally starts the redis server, then starts the
* object store and the raylet, in that order. If any step throws, the processes started so
* far are cleaned up before the failure is rethrown.
*
* @param isHead Whether this node is the head node. If true, redis server will be started.
* @throws RuntimeException if any step fails; the original exception is attached as the cause.
*/
public void startRayProcesses(boolean isHead) {
LOGGER.info("Starting ray processes @ {}.", rayConfig.nodeIp);
try {
createTempDirs();
if (isHead) {
startRedisServer();
}
startObjectStore();
startRaylet();
LOGGER.info("All processes started @ {}.", rayConfig.nodeIp);
} catch (Exception e) {
// Clean up started processes.
cleanup();
LOGGER.error("Failed to start ray processes.", e);
throw new RuntimeException("Failed to start ray processes.", e);
}
}
// Configure Redis to only generate notifications for the export keys.
client.configSet("notify-keyspace-events", "Kl");
/**
* Starts the primary redis server and {@code rayConfig.numberRedisShards} redis shards,
* and records the cluster bootstrap keys in the primary.
*
* <p>The primary listens on {@code rayConfig.headRedisPort}; shard {@code i} listens on
* {@code headRedisPort + i + 1}. The primary's address is stored in {@code rayConfig}.
*/
private void startRedisServer() {
// start primary redis
String primary = startRedisInstance(rayConfig.nodeIp, rayConfig.headRedisPort, null);
rayConfig.setRedisAddress(primary);
// The client is closed automatically when the try block exits.
try (Jedis client = new Jedis("127.0.0.1", rayConfig.headRedisPort)) {
client.set("UseRaylet", "1");
// Register the number of Redis shards in the primary shard, so that clients
// know how many redis shards to expect under RedisShards.
client.set("NumRedisShards", Integer.toString(rayConfig.numberRedisShards));
// Put a time stamp in Redis to indicate when it was started.
client.set("redis_start_time", LocalDateTime.now().toString());
// start redis shards
for (int i = 0; i < rayConfig.numberRedisShards; i++) {
String shard = startRedisInstance(rayConfig.nodeIp, rayConfig.headRedisPort + i + 1, i);
// Publish each shard's address in the primary under the "RedisShards" list.
client.rpush("RedisShards", shard);
}
}
}
/**
 * Starts a single redis server process and applies its initial configuration.
 *
 * @param ip IP of this node. It is only used to build the returned address; the client
 *     that performs the initial configuration connects to 127.0.0.1.
 * @param port Port the redis server is started on.
 * @param shard Shard index used to name the process ({@code "redis-" + shard}), or null
 *     for the primary instance (named {@code "redis"}).
 * @return The address of the started instance, in {@code ip:port} form.
 */
private String startRedisInstance(String ip, int port, Integer shard) {
  List<String> command = ImmutableList.of(
      rayConfig.redisServerExecutablePath,
      "--protected-mode",
      "no",
      "--port",
      String.valueOf(port),
      "--loglevel",
      "warning",
      "--loadmodule",
      rayConfig.redisModulePath
  );
  String name = shard == null ? "redis" : "redis-" + shard;
  startProcess(command, null, name);

  // The client is closed by try-with-resources; no explicit close() is needed (or possible,
  // since the variable is out of scope) after the block.
  try (Jedis client = new Jedis("127.0.0.1", port)) {
    // Configure Redis to only generate notifications for the export keys.
    client.configSet("notify-keyspace-events", "Kl");
    // Put a time stamp in Redis to indicate when it was started.
    client.set("redis_start_time", LocalDateTime.now().toString());
  }
  return ip + ":" + port;
}
private void startRaylet(String storeName, AddressInfo info, int numWorkers,
String redisAddress, String ip, boolean redirect,
Map<String, Double> staticResources, boolean cleanup) {
int rpcPort = params.raylet_port;
String rayletSocketName = "/tmp/raylet" + rpcPort;
String filePath = paths.raylet;
//Create the worker command that the raylet will use to start workers.
String workerCommand = buildWorkerCommandRaylet(info.storeName, rayletSocketName,
UniqueId.NIL, "", ip, redisAddress);
int sep = redisAddress.indexOf(':');
assert (sep != -1);
String gcsIp = redisAddress.substring(0, sep);
String gcsPort = redisAddress.substring(sep + 1);
String resourceArgument = ResourceUtil.getResourcesStringFromMap(staticResources);
private void startRaylet() {
int hardwareConcurrency = Runtime.getRuntime().availableProcessors();
int maximumStartupConcurrency = Math.max(1, Math.min(staticResources.get("CPU").intValue(),
hardwareConcurrency));
int maximumStartupConcurrency = Math.max(1,
Math.min(rayConfig.resources.getOrDefault("CPU", 0.0).intValue(), hardwareConcurrency));
// The second-last arugment is the worker command for Python, not needed for Java.
String[] cmds = new String[]{filePath, rayletSocketName, storeName, ip, gcsIp,
gcsPort, String.valueOf(numWorkers), String.valueOf(maximumStartupConcurrency),
resourceArgument, "", workerCommand};
Process p = startProcess(cmds, null, RunInfo.ProcessType.PT_RAYLET,
"raylet", redisAddress, ip, redirect, cleanup);
if (p != null && p.isAlive()) {
try {
TimeUnit.MILLISECONDS.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
if (p == null || !p.isAlive()) {
info.rayletSocketName = "";
info.rayletRpcAddr = "";
throw new RuntimeException("Failed to start raylet process.");
} else {
info.rayletSocketName = rayletSocketName;
info.rayletRpcAddr = ip + ":" + rpcPort;
}
}
private String buildWorkerCommandRaylet(String storeName, String rayletSocketName,
UniqueId actorId, String actorClass,
String ip, String redisAddress) {
String workerConfigs = "ray.java.start.object_store_name=" + storeName
+ ";ray.java.start.raylet_socket_name=" + rayletSocketName
+ ";ray.java.start.worker_mode=WORKER";
workerConfigs += ";ray.java.start.deploy=" + params.deploy;
if (!actorId.equals(UniqueId.NIL)) {
workerConfigs += ";ray.java.start.actor_id=" + actorId;
}
if (!actorClass.equals("")) {
workerConfigs += ";ray.java.start.driver_class=" + actorClass;
}
String jvmArgs = "";
jvmArgs += " -Dlogging.path=" + params.log_dir;
jvmArgs += " -Dlogging.file.name=core-*pid_suffix*";
return buildJavaProcessCommand(
RunInfo.ProcessType.PT_WORKER,
"org.ray.runtime.runner.worker.DefaultWorker",
"",
workerConfigs,
jvmArgs,
ip,
redisAddress,
null
// See `src/ray/raylet/main.cc` for the meaning of each parameter.
List<String> command = ImmutableList.of(
rayConfig.rayletExecutablePath,
rayConfig.rayletSocketName,
rayConfig.objectStoreSocketName,
rayConfig.nodeIp,
rayConfig.getRedisIp(),
rayConfig.getRedisPort().toString(),
"0", // number of initial workers
String.valueOf(maximumStartupConcurrency),
ResourceUtil.getResourcesStringFromMap(rayConfig.resources),
"", // python worker command
buildWorkerCommandRaylet() // java worker command
);
startProcess(command, null, "raylet");
}
private void startObjectStore(int index, AddressInfo info, String redisAddress,
String ip, boolean redirect, boolean cleanup) {
int occupiedMemoryMb = params.object_store_occupied_memory_MB;
long memoryBytes = occupiedMemoryMb * 1000000;
String filePath = paths.store;
int rpcPort = params.object_store_rpc_port + index;
String name = "/tmp/plasma_store" + rpcPort;
String rpcAddr = "";
String cmd = filePath + " -s " + name + " -m " + memoryBytes;
/**
 * Joins the given path entries with ':' after dropping every entry that contains a space.
 *
 * @param stream path entries to join
 * @return the remaining entries joined by ':'
 */
private String concatPath(Stream<String> stream) {
  // TODO (hchen): Right now, raylet backend doesn't support worker command with spaces.
  // Thus, we have to drop some paths until that is fixed.
  List<String> withoutSpaces = stream
      .filter(entry -> !entry.contains(" "))
      .collect(Collectors.toList());
  return String.join(":", withoutSpaces);
}
Map<String, String> env = null;
Process p = startProcess(cmd.split(" "), env, RunInfo.ProcessType.PT_PLASMA_STORE,
"plasma_store", redisAddress, ip, redirect, cleanup);
private String buildWorkerCommandRaylet() {
List<String> cmd = new ArrayList<>();
cmd.add("java");
cmd.add("-classpath");
if (p != null && p.isAlive()) {
try {
TimeUnit.MILLISECONDS.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
// Generate classpath based on current classpath + user-defined classpath.
String classpath = concatPath(Stream.concat(
Stream.of(System.getProperty("java.class.path").split(":")),
rayConfig.classpath.stream()
));
cmd.add(classpath);
// library path
String libraryPath = concatPath(rayConfig.libraryPath.stream());
cmd.add("-Djava.library.path=" + libraryPath);
// logging path
if (rayConfig.redirectOutput) {
cmd.add("-Dray.logging.stdout=org.apache.log4j.varia.NullAppender");
cmd.add("-Dray.logging.file=org.apache.log4j.FileAppender");
int logId = random.nextInt(10000);
String date = DATE_TIME_FORMATTER.format(LocalDateTime.now());
String logFile = String.format("%s/worker-%s-%05d.out", rayConfig.logDir, date, logId);
cmd.add("-Dray.logging.file.path=" + logFile);
}
if (p == null || !p.isAlive()) {
info.storeName = "";
info.storeRpcAddr = "";
throw new RuntimeException("Start object store failed ...");
} else {
info.storeName = name;
info.storeRpcAddr = rpcAddr;
}
// Config overwrite
cmd.add("-Dray.redis.address=" + rayConfig.getRedisAddress());
// Main class
cmd.add(WORKER_CLASS);
String command = Joiner.on(" ").join(cmd);
LOGGER.debug("Worker command is: {}", command);
return command;
}
/**
* Starts the plasma object store process, named "plasma_store".
*
* <p>The executable is invoked with {@code -s <objectStoreSocketName>} and
* {@code -m <objectStoreSize>}, both taken from {@code rayConfig}.
*/
private void startObjectStore() {
List<String> command = ImmutableList.of(
rayConfig.plasmaStoreExecutablePath,
"-s",
rayConfig.objectStoreSocketName,
"-m",
rayConfig.objectStoreSize.toString()
);
startProcess(command, null, "plasma_store");
}
}

View file

@ -1,7 +1,6 @@
package org.ray.runtime.runner.worker;
import org.ray.runtime.AbstractRayRuntime;
import org.ray.runtime.config.WorkerMode;
import org.ray.api.Ray;
/**
* The main function of DefaultDriver.
@ -15,15 +14,11 @@ public class DefaultDriver {
//
public static void main(String[] args) {
try {
AbstractRayRuntime.init(args);
assert AbstractRayRuntime.getParams().worker_mode == WorkerMode.DRIVER;
System.setProperty("ray.worker.mode", "DRIVER");
Ray.init();
String driverClass = AbstractRayRuntime.configReader
.getStringValue("ray.java.start", "driver_class", "",
"java class which main is served as the driver in a java worker");
String driverArgs = AbstractRayRuntime.configReader
.getStringValue("ray.java.start", "driver_args", "",
"arguments for the java class main function which is served at the driver");
String driverClass = null;
String driverArgs = null;
Class<?> cls = Class.forName(driverClass);
String[] argsArray = (driverArgs != null) ? driverArgs.split(",") : (new String[] {});
cls.getMethod("main", String[].class).invoke(null, (Object) argsArray);

View file

@ -1,31 +1,29 @@
package org.ray.runtime.runner.worker;
import org.ray.api.Ray;
import org.ray.runtime.AbstractRayRuntime;
import org.ray.runtime.config.WorkerMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* default worker implementation.
* Default implementation of the worker process.
*/
public class DefaultWorker {
//
// String workerCmd = "java" + " -jarls " + workerPath + " --node-ip-address=" + ip
// + " --object-store-name=" + storeName
// + " --object-store-manager-name=" + storeManagerName
// + " --local-scheduler-name=" + name + " --redis-address=" + redisAddress
//
private static final Logger LOGGER = LoggerFactory.getLogger(DefaultWorker.class);
public static void main(String[] args) {
try {
AbstractRayRuntime.init(args);
assert AbstractRayRuntime.getParams().worker_mode == WorkerMode.WORKER;
AbstractRayRuntime.getInstance().loop();
throw new RuntimeException("Control flow should never reach here");
} catch (Throwable e) {
e.printStackTrace();
System.err
.println("--config=ray.config.ini --overwrite=ray.java.start.worker_mode=WORKER;...");
System.exit(-1);
System.setProperty("ray.worker.mode", "WORKER");
Ray.init();
} catch (Exception e) {
LOGGER.error("Worker failed to start.", e);
}
LOGGER.info("Worker started.");
try {
((AbstractRayRuntime)Ray.internal()).loop();
} catch (Exception e) {
LOGGER.error("Error occurred in worker.", e);
}
}
}

View file

@ -37,6 +37,9 @@ public class ResourceUtil {
* @return The format resources string, like "{CPU:4, GPU:0}".
*/
public static String getResourcesFromatStringFromMap(Map<String, Double> resources) {
if (resources == null) {
return "{}";
}
StringBuilder builder = new StringBuilder();
builder.append("{");
int count = 1;
@ -89,6 +92,9 @@ public class ResourceUtil {
String[] items = resources.split(",");
for (String item : items) {
String trimItem = item.trim();
if (trimItem.isEmpty()) {
continue;
}
String[] resourcePair = trimItem.split(":");
if (resourcePair.length != 2) {

View file

@ -1,36 +0,0 @@
package org.ray.runtime.util.config;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
* Marks a public field as a Ray configuration item so that
* {@code ConfigReader.readObject} can populate it. {@code readObject} throws if a field
* it visits does not carry this annotation.
*/
@Target({ElementType.FIELD})
@Retention(RetentionPolicy.RUNTIME)
public @interface AConfig {
/**
* Human-readable description of this configuration field. Required.
*/
String comment();
/**
* For array-typed fields: the set of characters used to split the configured string into
* elements, e.g., " \t" to split on ' ' and '\t'. Defaults to comma, space and tab.
*/
String splitters() default ", \t";
/**
* When non-empty, the field is read from a different section: the section name is looked
* up under this field's name in the current section, and this value is the default used
* for that lookup. The field name remains the same.
*/
String defaultIndirectSectionName() default "";
/**
* For String-array fields: when non-empty, the array is read with
* {@code ConfigReader.getIndirectStringArray}, and this value is passed as its default
* indirectSectionName.
*/
String defaultArrayIndirectSectionName() default "";
}

View file

@ -1,15 +0,0 @@
package org.ray.runtime.util.config;
/**
* A ray configuration item of type {@code T}, as recorded by {@code ConfigReader} the
* first time the item is read.
*/
public class ConfigItem<T> {
// Key of the item within its section.
public String key;
// Raw string value found in the configuration, or null if the key was absent.
public String oriValue;
// Default value supplied by the caller that read the item.
public T defaultValue;
// Description of the item supplied by the caller that read it.
public String desc;
}

View file

@ -1,382 +0,0 @@
package org.ray.runtime.util.config;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.InputStream;
import java.lang.reflect.Field;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Vector;
import org.ini4j.Config;
import org.ini4j.Ini;
import org.ini4j.Profile;
import org.ray.api.id.UniqueId;
import org.ray.runtime.util.ObjectUtil;
import org.ray.runtime.util.StringUtil;
/**
* Loads configurations from a file.
*/
public class ConfigReader {
private final CurrentUseConfig currentUseConfig = new CurrentUseConfig();
private final Ini ini = new Ini();
private String file = "";
public ConfigReader(String filePath) throws Exception {
this(filePath, null);
}
public ConfigReader(String filePath, String updateConfigStr) throws Exception {
System.out.println("Build ConfigReader, the file path " + filePath + " ,the update config str "
+ updateConfigStr);
try {
loadConfigFile(filePath);
updateConfigFile(updateConfigStr);
} catch (Exception e) {
e.printStackTrace();
throw e;
}
}
/**
 * Loads the ini file at {@code filePath}, substituting every occurrence of the
 * {@code %CONFIG_FILE_DIR%} placeholder with the absolute directory of that file.
 *
 * @param filePath path of the configuration file to load.
 * @throws Exception if the file cannot be read or parsed.
 */
private void loadConfigFile(String filePath) throws Exception {
  this.currentUseConfig.filePath = filePath;
  String configFileDir = (new File(filePath)).getAbsoluteFile().getParent();
  byte[] encoded = Files.readAllBytes(Paths.get(filePath));
  String content = new String(encoded, StandardCharsets.UTF_8);
  // Literal replacement: replaceAll() would interpret '$' and '\' in the directory path
  // as group references / escapes and corrupt it (or throw).
  content = content.replace("%CONFIG_FILE_DIR%", configFileDir);

  InputStream fis = new ByteArrayInputStream(content.getBytes(StandardCharsets.UTF_8));
  Config config = new Config();
  ini.setConfig(config);
  ini.load(fis);
  file = currentUseConfig.filePath;
}
/**
 * Applies overrides on top of the loaded ini file.
 *
 * <p>The string is a ';'-separated list of entries of the form {@code section.key=value},
 * {@code section.key=} or {@code section.key}; the last '.' separates the section name
 * from the key. A missing value is stored as the empty string. Empty entries are skipped.
 *
 * @param updateConfigStr the override string, or null for no overrides.
 * @throws RuntimeException if an entry has no section name or no key around its last '.'.
 */
private void updateConfigFile(String updateConfigStr) {
  if (updateConfigStr == null) {
    return;
  }
  for (String currentUpdateConfig : updateConfigStr.split(";")) {
    if (StringUtil.isNullOrEmpty(currentUpdateConfig)) {
      continue;
    }

    // Split on the first '=' only, so that a value may itself contain '='.
    String[] keyAndValue = currentUpdateConfig.split("=", 2);
    String sectionAndItemKey = keyAndValue[0];
    String value = keyAndValue.length == 2 ? keyAndValue[1] : "";

    int splitOffset = sectionAndItemKey.lastIndexOf(".");
    int len = sectionAndItemKey.length();
    if (splitOffset < 1 || splitOffset == len - 1) {
      String errorMsg =
          "invalid config (no '.' found for section name and key):" + currentUpdateConfig;
      System.err.println(errorMsg);
      throw new RuntimeException(errorMsg);
    }

    String sectionKey = sectionAndItemKey.substring(0, splitOffset);
    String itemKey = sectionAndItemKey.substring(splitOffset + 1);
    if (ini.containsKey(sectionKey)) {
      ini.get(sectionKey).put(itemKey, value);
    } else {
      ini.add(sectionKey, itemKey, value);
    }
  }
}
public String filePath() {
return file;
}
public CurrentUseConfig getCurrentUseConfig() {
return currentUseConfig;
}
/**
 * Reads a string item.
 *
 * @param sectionKey section the item lives in.
 * @param configKey key of the item.
 * @param defaultValue value returned when the item is absent.
 * @param dsptr description of the item; must not be null.
 * @return the configured value, or {@code defaultValue} if the item is absent.
 */
public String getStringValue(String sectionKey, String configKey, String defaultValue,
    String dsptr) {
  String raw = getOriValue(sectionKey, configKey, defaultValue, dsptr);
  return raw != null ? raw : defaultValue;
}
/**
 * Reads a boolean item.
 *
 * @param sectionKey section the item lives in.
 * @param configKey key of the item.
 * @param defaultValue value returned when the item is absent or empty.
 * @param dsptr description of the item; must not be null.
 * @return the parsed value, or {@code defaultValue} if the item is absent or empty.
 */
public boolean getBooleanValue(String sectionKey, String configKey, boolean defaultValue,
    String dsptr) {
  String raw = getOriValue(sectionKey, configKey, defaultValue, dsptr);
  if (raw == null || raw.length() == 0) {
    return defaultValue;
  }
  return Boolean.valueOf(raw);
}
/**
 * Reads an int item.
 *
 * @param sectionKey section the item lives in.
 * @param configKey key of the item.
 * @param defaultValue value returned when the item is absent or empty.
 * @param dsptr description of the item; must not be null.
 * @return the parsed value, or {@code defaultValue} if the item is absent or empty.
 */
public int getIntegerValue(String sectionKey, String configKey, int defaultValue, String dsptr) {
  String raw = getOriValue(sectionKey, configKey, defaultValue, dsptr);
  if (raw == null || raw.length() == 0) {
    return defaultValue;
  }
  return Integer.valueOf(raw);
}
private synchronized <T> String getOriValue(String sectionKey, String configKey, T defaultValue,
String deptr) {
if (null == deptr) {
throw new RuntimeException("desc must not be empty of the key:" + configKey);
}
Profile.Section section = ini.get(sectionKey);
String oriValue = null;
if (section != null && section.containsKey(configKey)) {
oriValue = section.get(configKey);
}
if (!currentUseConfig.sectionMap.containsKey(sectionKey)) {
ConfigSection configSection = new ConfigSection();
configSection.sectionKey = sectionKey;
updateConfigSection(configSection, configKey, defaultValue, deptr, oriValue);
currentUseConfig.sectionMap.put(sectionKey, configSection);
} else if (!currentUseConfig.sectionMap.get(sectionKey).itemMap.containsKey(configKey)) {
ConfigSection configSection = currentUseConfig.sectionMap.get(sectionKey);
updateConfigSection(configSection, configKey, defaultValue, deptr, oriValue);
}
return oriValue;
}
private <T> void updateConfigSection(ConfigSection configSection, String configKey,
T defaultValue, String deptr, String oriValue) {
ConfigItem<T> configItem = new ConfigItem<>();
configItem.defaultValue = defaultValue;
configItem.key = configKey;
configItem.oriValue = oriValue;
configItem.desc = deptr;
configSection.itemMap.put(configKey, configItem);
}
/**
 * Reads a long item.
 *
 * @param sectionKey section the item lives in.
 * @param configKey key of the item.
 * @param defaultValue value returned when the item is absent or empty.
 * @param dsptr description of the item; must not be null.
 * @return the parsed value, or {@code defaultValue} if the item is absent or empty.
 */
public long getLongValue(String sectionKey, String configKey, long defaultValue, String dsptr) {
  String raw = getOriValue(sectionKey, configKey, defaultValue, dsptr);
  if (raw == null || raw.length() == 0) {
    return defaultValue;
  }
  return Long.valueOf(raw);
}
/**
 * Reads a double item.
 *
 * @param sectionKey section the item lives in.
 * @param configKey key of the item.
 * @param defaultValue value returned when the item is absent or empty.
 * @param dsptr description of the item; must not be null.
 * @return the parsed value, or {@code defaultValue} if the item is absent or empty.
 */
public double getDoubleValue(String sectionKey, String configKey, double defaultValue,
    String dsptr) {
  String raw = getOriValue(sectionKey, configKey, defaultValue, dsptr);
  if (raw == null || raw.length() == 0) {
    return defaultValue;
  }
  return Double.valueOf(raw);
}
/**
 * Reads a comma-separated list of ints.
 *
 * <p>Whitespace around each element is ignored. As with the scalar getters, an absent or
 * empty value yields the default.
 *
 * @param sectionKey section the item lives in.
 * @param configKey key of the item.
 * @param defaultValue value returned when the item is absent or empty.
 * @param dsptr description of the item; must not be null.
 * @return the parsed array, or {@code defaultValue} if the item is absent or empty.
 * @throws NumberFormatException if an element is not a valid int.
 */
public int[] getIntegerArray(String sectionKey, String configKey, int[] defaultValue,
    String dsptr) {
  String value = getOriValue(sectionKey, configKey, defaultValue, dsptr);
  if (value == null || value.trim().isEmpty()) {
    return defaultValue;
  }
  String[] list = value.split(",");
  int[] array = new int[list.length];
  for (int i = 0; i < list.length; i++) {
    // Tolerate "1, 2, 3" style lists.
    array[i] = Integer.parseInt(list[i].trim());
  }
  return array;
}
/**
* get a string list from a whole section as keys e.g., [core] data_dirs = local.dirs # or
* cluster.dirs
* [local.dirs] /home/xxx/1 /home/yyy/2
* [cluster.dirs] ...
*
* @param sectionKey e.g., core
* @param configKey e.g., data_dirs
* @param indirectSectionName e.g., cluster.dirs
* @return string list
*/
public String[] getIndirectStringArray(String sectionKey, String configKey,
String indirectSectionName, String dsptr) {
String s = getStringValue(sectionKey, configKey, indirectSectionName, dsptr);
Profile.Section section = ini.get(s);
if (section == null) {
return new String[] {};
} else {
return section.keySet().toArray(new String[] {});
}
}
public <T> void readObject(String sectionKey, T obj, T defaultValues) {
for (Field fld : obj.getClass().getFields()) {
Object defaultFldValue;
try {
defaultFldValue = defaultValues != null ? fld.get(defaultValues) : null;
} catch (IllegalArgumentException | IllegalAccessException e) {
defaultFldValue = null;
}
String section = sectionKey;
String comment;
String splitters = ", \t";
String defaultArrayIndirectSectionName;
AConfig[] anns = fld.getAnnotationsByType(AConfig.class);
if (anns.length > 0) {
comment = anns[0].comment();
if (!StringUtil.isNullOrEmpty(anns[0].splitters())) {
splitters = anns[0].splitters();
}
defaultArrayIndirectSectionName = anns[0].defaultArrayIndirectSectionName();
// redirect the section if necessary
if (!StringUtil.isNullOrEmpty(anns[0].defaultIndirectSectionName())) {
section = this
.getStringValue(sectionKey, fld.getName(), anns[0].defaultIndirectSectionName(),
comment);
}
} else {
throw new RuntimeException("unspecified comment, please use @AConfig(comment = xxxx) for "
+ obj.getClass().getName() + "." + fld.getName() + "'s configuration descriptions ");
}
try {
if (fld.getType().isPrimitive()) {
if (fld.getType().equals(boolean.class)) {
boolean v = getBooleanValue(section, fld.getName(), (boolean) defaultFldValue, comment);
fld.set(obj, v);
} else if (fld.getType().equals(float.class)) {
float v = (float) getDoubleValue(section, fld.getName(),
(double) (float) defaultFldValue, comment);
fld.set(obj, v);
} else if (fld.getType().equals(double.class)) {
double v = getDoubleValue(section, fld.getName(), (double) defaultFldValue, comment);
fld.set(obj, v);
} else if (fld.getType().equals(byte.class)) {
byte v = (byte) getLongValue(section, fld.getName(), (long) (byte) defaultFldValue,
comment);
fld.set(obj, v);
} else if (fld.getType().equals(char.class)) {
char v = (char) getLongValue(section, fld.getName(), (long) (char) defaultFldValue,
comment);
fld.set(obj, v);
} else if (fld.getType().equals(short.class)) {
short v = (short) getLongValue(section, fld.getName(), (long) (short) defaultFldValue,
comment);
fld.set(obj, v);
} else if (fld.getType().equals(int.class)) {
int v = (int) getLongValue(section, fld.getName(), (long) (int) defaultFldValue,
comment);
fld.set(obj, v);
} else if (fld.getType().equals(long.class)) {
long v = getLongValue(section, fld.getName(), (long) defaultFldValue, comment);
fld.set(obj, v);
} else {
throw new RuntimeException("unhandled type " + fld.getType().getName());
}
} else if (fld.getType().equals(String.class)) {
String v = getStringValue(section, fld.getName(), (String) defaultFldValue, comment);
fld.set(obj, v);
} else if (fld.getType().isEnum()) {
String sv = getStringValue(section, fld.getName(), defaultFldValue.toString(), comment);
@SuppressWarnings({"unchecked", "rawtypes"})
Object v = Enum.valueOf((Class<Enum>) fld.getType(), sv);
fld.set(obj, v);
// TODO: this is a hack and needs to be resolved later
} else if (fld.getType().equals(UniqueId.class)) {
String sv = getStringValue(section, fld.getName(), defaultFldValue.toString(), comment);
Object v;
try {
v = UniqueId.fromHexString(sv);
} catch (IllegalArgumentException e) {
System.err.println(
section + "." + fld.getName() + "'s format (" + sv + ") is invalid, default to "
+ defaultFldValue.toString());
v = defaultFldValue;
}
fld.set(obj, v);
} else if (fld.getType().isArray()) {
Class<?> ccls = fld.getType().getComponentType();
String ss = getStringValue(section, fld.getName(), null, comment);
if (null == ss) {
fld.set(obj, defaultFldValue);
} else {
Vector<String> ls = StringUtil.split(ss, splitters, "", "");
if (ccls.equals(boolean.class)) {
boolean[] v = ObjectUtil
.toBooleanArray(ls.stream().map(Boolean::parseBoolean).toArray());
fld.set(obj, v);
} else if (ccls.equals(double.class)) {
double[] v = ls.stream().mapToDouble(Double::parseDouble).toArray();
fld.set(obj, v);
} else if (ccls.equals(int.class)) {
int[] v = ls.stream().mapToInt(Integer::parseInt).toArray();
fld.set(obj, v);
} else if (ccls.equals(long.class)) {
long[] v = ls.stream().mapToLong(Long::parseLong).toArray();
fld.set(obj, v);
} else if (ccls.equals(String.class)) {
String[] v;
if (StringUtil.isNullOrEmpty(defaultArrayIndirectSectionName)) {
v = ls.toArray(new String[] {});
} else {
v = this
.getIndirectStringArray(section, fld.getName(),
defaultArrayIndirectSectionName,
comment);
}
fld.set(obj, v);
} else {
throw new RuntimeException(
"Array with component type " + ccls.getName() + " is not supported yet");
}
}
} else {
Object fldObj = ObjectUtil.newObject(fld.getType());
fld.set(obj, fldObj);
readObject(section + "." + fld.getName(), fldObj, defaultFldValue);
}
} catch (IllegalArgumentException | IllegalAccessException e) {
throw new RuntimeException("set fld " + fld.getName() + " failed, err = " + e.getMessage(),
e);
}
}
}
}

View file

@ -1,13 +0,0 @@
package org.ray.runtime.util.config;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* A configuration section of related items.
*/
public class ConfigSection {
// Items of this section, keyed by their config key.
public final Map<String, ConfigItem<?>> itemMap = new ConcurrentHashMap<>();
// Name of this section.
public String sectionKey;
}

View file

@ -1,15 +0,0 @@
package org.ray.runtime.util.config;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* The configuration which is currently in use.
*/
public class CurrentUseConfig {
// Sections that have been read so far, keyed by section name.
public final Map<String, ConfigSection> sectionMap = new ConcurrentHashMap<>();
// Path of the configuration file this configuration was loaded from.
public String filePath;
}

View file

@ -1,6 +1,5 @@
package org.ray.runtime.util.logger;
import org.ray.runtime.util.SystemUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -21,24 +20,8 @@ public class RayLog {
*/
public static Logger rapp;
/**
* Initialize loggers
* @param logDir directory of the log files.
*/
public static void init(String logDir) {
String loggingPath = System.getProperty("logging.path");
if (loggingPath == null) {
System.setProperty("logging.path", logDir);
}
String loggingFileName = System.getProperty("logging.file.name");
if (loggingFileName != null && loggingFileName.contains("*pid_suffix*")) {
loggingFileName = loggingFileName.replaceAll("\\*pid_suffix\\*",
String.valueOf(SystemUtil.pid()));
System.setProperty("logging.file.name", loggingFileName);
}
public static void init() {
core = LoggerFactory.getLogger("core");
rapp = core;
}
}

View file

@ -1,20 +1,17 @@
# define default properties here
logging.level=WARN
logging.path=./run/logs
logging.file.name=core
logging.max.log.file.num=10
logging.max.log.file.size=500MB
ray.logging.level=INFO
log4j.rootLogger=${logging.level}, stdout, core
ray.logging.stdout=org.apache.log4j.ConsoleAppender
ray.logging.file=org.apache.log4j.varia.NullAppender
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.rootLogger=${ray.logging.level}, stdout, file
log4j.appender.stdout=${ray.logging.stdout}
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %p %c{1} [%t]: %m%n
log4j.appender.core=org.apache.log4j.RollingFileAppender
log4j.appender.core.File=${logging.path}/${logging.file.name}.log
log4j.appender.core.Append=true
log4j.appender.core.MaxFileSize=${logging.max.log.file.size}
log4j.appender.core.MaxBackupIndex=${logging.max.log.file.num}
log4j.appender.core.layout=org.apache.log4j.PatternLayout
log4j.appender.core.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %p %c{1} [%t]: %m%n
# Set the file appender to null by default. If `ray.redirect-output` config is set to true,
# this appender will be set to a real file appender.
log4j.appender.file=${ray.logging.file}
log4j.appender.file.File=${ray.logging.file.path}
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %p %c{1} [%t]: %m%n

View file

@ -0,0 +1,79 @@
// This file contains default values of all Ray configurations.
// Users should define their own 'ray.conf' file in the classpath,
// or use Java properties, to overwrite these values.
ray {
// ----------------------
// Basic configurations
// ----------------------
// This is the path to the directory where Ray is installed, e.g.,
// something like /home/ubuntu/ray. This can be an absolute path or
// a relative path from the current working directory.
home: ""
// IP of this node. If not provided, the IP will be automatically detected.
node-ip: ""
// Run mode, available options are:
//
// `SINGLE_PROCESS`: Ray is running in one single Java process, without Raylet backend,
// object store, and GCS. It's useful for debugging.
// `CLUSTER`: Ray is running on one or more nodes, with multiple processes.
run-mode: CLUSTER
// Available resources on this node, for example "CPU:4,GPU:0".
resources: ""
// If worker.mode is DRIVER, specify the driver id.
// If not provided, a random id will be used.
driver.id: ""
// Root dir of log files.
log-dir: /tmp/ray/logs
// If true, output of worker processes will be redirected to log files.
// Otherwise, output will be printed to console.
redirect-output: true
// Custom `java.library.path`
// Note, do not use `dir1:dir2` format, put each dir as a list item.
library.path: []
// Custom classpath.
// Note, do not use `dir1:dir2` format, put each dir as a list item.
classpath = []
// ----------------------
// Redis configurations
// ----------------------
redis {
// The address of the redis server to connect, in format `ip:port`.
// If not provided, Ray processes will be started locally, including
// Redis server, Raylet and object store.
address: ""
// If `redis.address` isn't provided, the port on which the Redis server should be started.
head-port: 6379
// If `redis.address` isn't provided, how many Redis shards we should start in addition to the
// primary Redis shard. The ports of these shards will be `head-port + 1`, `head-port + 2`, etc.
shard-number: 1
}
// ----------------------------
// Object store configurations
// ----------------------------
object-store {
// RPC socket name of object store
socket-name: /tmp/ray/sockets/object_store
// Initial size of the object store.
size: 10 MB
}
// ----------------------------
// Raylet configurations
// ----------------------------
raylet {
// RPC socket name of Raylet
socket-name: /tmp/ray/sockets/raylet
}
}

View file

@ -51,14 +51,6 @@
<artifactId>maven-surefire-plugin</artifactId>
<version>2.21.0</version>
<configuration>
<environmentVariables>
<RAY_CONFIG>${basedir}/../ray.config.ini</RAY_CONFIG>
</environmentVariables>
<argLine>-ea
-Djava.library.path=${basedir}/../../build/src/plasma:${basedir}/../../build/src/local_scheduler
-noverify
-DlogOutput=console
</argLine>
<testSourceDirectory>${basedir}/src/main/java/</testSourceDirectory>
<testClassesDirectory>${project.build.directory}/classes/</testClassesDirectory>
</configuration>

View file

@ -0,0 +1,20 @@
package org.ray.api.test;
import org.junit.Assert;
import org.junit.Test;
import org.ray.runtime.config.RayConfig;
import org.ray.runtime.config.RunMode;
import org.ray.runtime.config.WorkerMode;
/**
 * Verifies that {@link RayConfig#create()} picks up {@code ray.home} from Java system
 * properties, and that the worker mode and run mode are {@code DRIVER} and {@code CLUSTER}
 * when this test does not override them.
 */
public class RayConfigTest {

  private static final String RAY_HOME_KEY = "ray.home";

  @Test
  public void testCreateRayConfig() {
    // System properties are JVM-global. Remember the previous value so that the fake path
    // used here does not leak into other tests running in the same JVM.
    final String previousRayHome = System.getProperty(RAY_HOME_KEY);
    System.setProperty(RAY_HOME_KEY, "/path/to/ray");
    try {
      RayConfig rayConfig = RayConfig.create();

      Assert.assertEquals("/path/to/ray", rayConfig.rayHome);
      Assert.assertEquals(WorkerMode.DRIVER, rayConfig.workerMode);
      Assert.assertEquals(RunMode.CLUSTER, rayConfig.runMode);
    } finally {
      // Restore the original state even when an assertion fails.
      if (previousRayHome == null) {
        System.clearProperty(RAY_HOME_KEY);
      } else {
        System.setProperty(RAY_HOME_KEY, previousRayHome);
      }
    }
  }
}

View file

@ -9,6 +9,8 @@ public class TestListener extends RunListener {
/**
 * Configures Ray through Java system properties and then initializes it once the test run
 * starts.
 */
@Override
public void testRunStarted(Description description) {
  // Both properties are set before Ray.init() is called.
  System.setProperty("ray.resources", "CPU:4");
  System.setProperty("ray.home", "../..");

  Ray.init();
}

View file

@ -7,12 +7,12 @@ Ray Java Tutorial
Exercises
---------
Each file ``java/example/src/main/java/org/ray/exercise/Exercise*.java`` is a separate exercise.
To run an exercise case, set the ``RAY_CONFIG`` env variable and run the following command in ``ray/java/`` directory.
Each file of ``java/example/src/main/java/org/ray/exercise/Exercise*.java`` is a separate exercise.
To run them, execute the following command under ``ray/java`` folder.
.. code-block:: shell
java -Djava.library.path=../build/src/plasma/:../build/src/local_scheduler/ -classpath "tutorial/target/ray-tutorial-1.0.jar:tutorial/lib/*" org.ray.exercise.Exercise01
java -Dray.home=.. -classpath "tutorial/target/ray-tutorial-1.0.jar:tutorial/lib/*" org.ray.exercise.Exercise01
`Exercise 1 <https://github.com/ray-project/ray/tree/master/java/tutorial/src/main/java/org/ray/exercise/Exercise01.java>`_: Define a remote function, and execute multiple remote functions in parallel.

View file

@ -0,0 +1,5 @@
ray{
home: ".."
run-mode: CLUSTER
redirect-output: false
}