mirror of
https://github.com/vale981/ray
synced 2025-03-06 02:21:39 -05:00
[Java] Add session_dir
as temp_dir for logs, socket files like Python (#7044)
* Support * Add gcs_server support * Fix ut * Fix * Remove unused py code * Fix linting * Fix cross language ci * Fix CI * Add docstring * Fix * Fix linting * Add a singleton for config * Refine * fix * Fix * linting * Remove FileUnit * Fix * Fix * Fix * Update java/runtime/src/main/java/org/ray/runtime/config/RayConfig.java Co-Authored-By: Hao Chen <chenh1024@gmail.com> * Fix streaming singleprocess CI * Fix checkstyle Co-authored-by: Hao Chen <chenh1024@gmail.com>
This commit is contained in:
parent
5518a738b3
commit
94a286ef1d
11 changed files with 277 additions and 183 deletions
|
@ -18,7 +18,7 @@ public class DefaultRayRuntimeFactory implements RayRuntimeFactory {
|
|||
|
||||
@Override
|
||||
public RayRuntime createRayRuntime() {
|
||||
RayConfig rayConfig = RayConfig.create();
|
||||
RayConfig rayConfig = RayConfig.getInstance();
|
||||
try {
|
||||
FunctionManager functionManager = new FunctionManager(rayConfig.jobResourcePath);
|
||||
RayRuntime runtime;
|
||||
|
|
|
@ -41,6 +41,7 @@ public class RayDevRuntime extends AbstractRayRuntime {
|
|||
taskSubmitter = null;
|
||||
}
|
||||
taskExecutor = null;
|
||||
RayConfig.reset();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -45,17 +45,24 @@ public final class RayNativeRuntime extends AbstractRayRuntime {
|
|||
// Expose ray ABI symbols which may be depended by other shared
|
||||
// libraries such as libstreaming_java.so.
|
||||
// See BUILD.bazel:libcore_worker_library_java.so
|
||||
final RayConfig rayConfig = RayConfig.getInstance();
|
||||
if (rayConfig.getRedisAddress() != null && rayConfig.workerMode == WorkerType.DRIVER) {
|
||||
// Fetch session dir from GCS if this is a driver that is connecting to the existing GCS.
|
||||
RedisClient client = new RedisClient(rayConfig.getRedisAddress(), rayConfig.redisPassword);
|
||||
final String sessionDir = client.get("session_dir", null);
|
||||
Preconditions.checkNotNull(sessionDir);
|
||||
rayConfig.setSessionDir(sessionDir);
|
||||
}
|
||||
|
||||
JniUtils.loadLibrary("core_worker_library_java", true);
|
||||
LOGGER.debug("Native libraries loaded.");
|
||||
RayConfig globalRayConfig = RayConfig.create();
|
||||
resetLibraryPath(globalRayConfig);
|
||||
|
||||
resetLibraryPath(rayConfig);
|
||||
try {
|
||||
FileUtils.forceMkdir(new File(globalRayConfig.logDir));
|
||||
FileUtils.forceMkdir(new File(rayConfig.logDir));
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException("Failed to create the log directory.", e);
|
||||
}
|
||||
nativeSetup(globalRayConfig.logDir);
|
||||
nativeSetup(rayConfig.logDir);
|
||||
Runtime.getRuntime().addShutdownHook(new Thread(RayNativeRuntime::nativeShutdownHook));
|
||||
}
|
||||
|
||||
|
@ -67,7 +74,6 @@ public final class RayNativeRuntime extends AbstractRayRuntime {
|
|||
|
||||
public RayNativeRuntime(RayConfig rayConfig, FunctionManager functionManager) {
|
||||
super(rayConfig, functionManager);
|
||||
|
||||
// Reset library path at runtime.
|
||||
resetLibraryPath(rayConfig);
|
||||
|
||||
|
@ -111,7 +117,7 @@ public final class RayNativeRuntime extends AbstractRayRuntime {
|
|||
manager.cleanup();
|
||||
manager = null;
|
||||
}
|
||||
|
||||
RayConfig.reset();
|
||||
LOGGER.info("RayNativeRuntime shutdown");
|
||||
}
|
||||
|
||||
|
|
|
@ -8,10 +8,12 @@ import com.typesafe.config.ConfigException;
|
|||
import com.typesafe.config.ConfigFactory;
|
||||
import com.typesafe.config.ConfigValue;
|
||||
import java.io.File;
|
||||
import java.time.LocalDateTime;
|
||||
import java.time.format.DateTimeFormatter;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import java.util.Random;
|
||||
import org.ray.api.id.JobId;
|
||||
import org.ray.runtime.generated.Common.WorkerType;
|
||||
import org.ray.runtime.util.NetworkUtil;
|
||||
|
@ -30,12 +32,22 @@ public class RayConfig {
|
|||
public static final String DEFAULT_CONFIG_FILE = "ray.default.conf";
|
||||
public static final String CUSTOM_CONFIG_FILE = "ray.conf";
|
||||
|
||||
private static final Random RANDOM = new Random();
|
||||
|
||||
private static final DateTimeFormatter DATE_TIME_FORMATTER =
|
||||
DateTimeFormatter.ofPattern("YYYY-MM-dd_HH-mm-ss");
|
||||
|
||||
private static final String DEFAULT_TEMP_DIR = "/tmp/ray";
|
||||
|
||||
private Config config;
|
||||
|
||||
public final String nodeIp;
|
||||
public final WorkerType workerMode;
|
||||
public final RunMode runMode;
|
||||
public final Map<String, Double> resources;
|
||||
private JobId jobId;
|
||||
public final String logDir;
|
||||
public String sessionDir;
|
||||
public String logDir;
|
||||
public final boolean redirectOutput;
|
||||
public final List<String> libraryPath;
|
||||
public final List<String> classpath;
|
||||
|
@ -50,16 +62,35 @@ public class RayConfig {
|
|||
public final String headRedisPassword;
|
||||
public final String redisPassword;
|
||||
|
||||
public final String objectStoreSocketName;
|
||||
public String objectStoreSocketName;
|
||||
public final Long objectStoreSize;
|
||||
|
||||
public final String rayletSocketName;
|
||||
public String rayletSocketName;
|
||||
private int nodeManagerPort;
|
||||
public final List<String> rayletConfigParameters;
|
||||
|
||||
public final String jobResourcePath;
|
||||
public final String pythonWorkerCommand;
|
||||
|
||||
private static volatile RayConfig instance = null;
|
||||
|
||||
public static RayConfig getInstance() {
|
||||
if (instance == null) {
|
||||
synchronized (RayConfig.class) {
|
||||
if (instance == null) {
|
||||
instance = RayConfig.create();
|
||||
}
|
||||
}
|
||||
}
|
||||
return instance;
|
||||
}
|
||||
|
||||
public static void reset() {
|
||||
synchronized (RayConfig.class) {
|
||||
instance = null;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Number of threads that execute tasks.
|
||||
*/
|
||||
|
@ -83,6 +114,7 @@ public class RayConfig {
|
|||
}
|
||||
|
||||
public RayConfig(Config config) {
|
||||
this.config = config;
|
||||
// Worker mode.
|
||||
WorkerType localWorkerMode;
|
||||
try {
|
||||
|
@ -118,8 +150,11 @@ public class RayConfig {
|
|||
} else {
|
||||
this.jobId = JobId.NIL;
|
||||
}
|
||||
// Log dir.
|
||||
logDir = removeTrailingSlash(config.getString("ray.log-dir"));
|
||||
|
||||
updateSessionDir();
|
||||
// Object store configurations.
|
||||
objectStoreSize = config.getBytes("ray.object-store.size");
|
||||
|
||||
// Redirect output.
|
||||
redirectOutput = config.getBoolean("ray.redirect-output");
|
||||
// Library path.
|
||||
|
@ -160,12 +195,6 @@ public class RayConfig {
|
|||
headRedisPassword = config.getString("ray.redis.head-password");
|
||||
redisPassword = config.getString("ray.redis.password");
|
||||
|
||||
// Object store configurations.
|
||||
objectStoreSocketName = config.getString("ray.object-store.socket-name");
|
||||
objectStoreSize = config.getBytes("ray.object-store.size");
|
||||
|
||||
// Raylet socket name.
|
||||
rayletSocketName = config.getString("ray.raylet.socket-name");
|
||||
// Raylet node manager port.
|
||||
nodeManagerPort = config.getInt("ray.raylet.node-manager-port");
|
||||
if (nodeManagerPort == 0) {
|
||||
|
@ -234,6 +263,66 @@ public class RayConfig {
|
|||
return nodeManagerPort;
|
||||
}
|
||||
|
||||
public void setSessionDir(String sessionDir) {
|
||||
this.sessionDir = sessionDir;
|
||||
}
|
||||
|
||||
public String getSessionDir() {
|
||||
return sessionDir;
|
||||
}
|
||||
|
||||
private void updateSessionDir() {
|
||||
// session dir
|
||||
String localSessionDir = System.getProperty("ray.session-dir");
|
||||
if (workerMode == WorkerType.DRIVER) {
|
||||
Preconditions.checkState(localSessionDir == null);
|
||||
final int minBound = 100000;
|
||||
final int maxBound = 999999;
|
||||
final String sessionName = String.format("session_%s_%d", DATE_TIME_FORMATTER.format(
|
||||
LocalDateTime.now()), RANDOM.nextInt(maxBound - minBound) + minBound);
|
||||
sessionDir = String.format("%s/%s", DEFAULT_TEMP_DIR, sessionName);
|
||||
} else if (workerMode == WorkerType.WORKER) {
|
||||
Preconditions.checkState(localSessionDir != null);
|
||||
sessionDir = removeTrailingSlash(localSessionDir);
|
||||
} else {
|
||||
throw new RuntimeException("Unknown worker type.");
|
||||
}
|
||||
|
||||
// Log dir.
|
||||
String localLogDir = null;
|
||||
if (config.hasPath("ray.log-dir")) {
|
||||
localLogDir = removeTrailingSlash(config.getString("ray.log-dir"));
|
||||
}
|
||||
if (Strings.isNullOrEmpty(localLogDir)) {
|
||||
logDir = String.format("%s/logs", sessionDir);
|
||||
} else {
|
||||
logDir = localLogDir;
|
||||
}
|
||||
|
||||
// Object store socket name.
|
||||
String localObjectStoreSocketName = null;
|
||||
if (config.hasPath("ray.object-store.socket-name")) {
|
||||
localObjectStoreSocketName = config.getString("ray.object-store.socket-name");
|
||||
}
|
||||
if (Strings.isNullOrEmpty(localObjectStoreSocketName)) {
|
||||
objectStoreSocketName = String.format("%s/sockets/object_store", sessionDir);
|
||||
} else {
|
||||
objectStoreSocketName = localObjectStoreSocketName;
|
||||
}
|
||||
|
||||
// Raylet socket name.
|
||||
String localRayletSocketName = null;
|
||||
if (config.hasPath("ray.raylet.socket-name")) {
|
||||
localRayletSocketName = config.getString("ray.raylet.socket-name");
|
||||
}
|
||||
if (Strings.isNullOrEmpty(localRayletSocketName)) {
|
||||
rayletSocketName = String.format("%s/sockets/raylet", sessionDir);
|
||||
} else {
|
||||
rayletSocketName = localRayletSocketName;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "RayConfig{"
|
||||
|
|
|
@ -1,11 +1,14 @@
|
|||
package org.ray.runtime.runner;
|
||||
|
||||
import com.google.common.base.Joiner;
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.base.Strings;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.Lists;
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Paths;
|
||||
import java.time.LocalDateTime;
|
||||
import java.time.format.DateTimeFormatter;
|
||||
import java.util.ArrayList;
|
||||
|
@ -18,7 +21,7 @@ import java.util.stream.Stream;
|
|||
import org.apache.commons.io.FileUtils;
|
||||
import org.apache.commons.lang3.tuple.Pair;
|
||||
import org.ray.runtime.config.RayConfig;
|
||||
import org.ray.runtime.util.FileUtil;
|
||||
import org.ray.runtime.util.BinaryFileUtil;
|
||||
import org.ray.runtime.util.ResourceUtil;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
@ -36,9 +39,11 @@ public class RunManager {
|
|||
|
||||
private static final String WORKER_CLASS = "org.ray.runtime.runner.worker.DefaultWorker";
|
||||
|
||||
private static final String SESSION_LATEST = "session_latest";
|
||||
|
||||
private RayConfig rayConfig;
|
||||
|
||||
private Random random;
|
||||
private Random random = new Random();
|
||||
|
||||
private List<Pair<String, Process>> processes;
|
||||
|
||||
|
@ -47,7 +52,7 @@ public class RunManager {
|
|||
public RunManager(RayConfig rayConfig) {
|
||||
this.rayConfig = rayConfig;
|
||||
processes = new ArrayList<>();
|
||||
random = new Random();
|
||||
createTempDirs();
|
||||
}
|
||||
|
||||
public void cleanup() {
|
||||
|
@ -95,6 +100,17 @@ public class RunManager {
|
|||
FileUtils.forceMkdir(new File(rayConfig.logDir));
|
||||
FileUtils.forceMkdir(new File(rayConfig.rayletSocketName).getParentFile());
|
||||
FileUtils.forceMkdir(new File(rayConfig.objectStoreSocketName).getParentFile());
|
||||
|
||||
// Remove session_latest first, and then create a new symbolic link for session_latest.
final String parentOfSessionDir = new File(rayConfig.sessionDir).getParent();
final File sessionLatest = new File(parentOfSessionDir, SESSION_LATEST);
// Files.deleteIfExists acts on the link itself, so a dangling link left behind by a removed
// session is deleted too (File.exists() follows the link and would report it as absent).
// A failed deletion raises an IOException instead of being silently ignored.
Files.deleteIfExists(sessionLatest.toPath());
Files.createSymbolicLink(
    Paths.get(sessionLatest.getAbsolutePath()),
    Paths.get(rayConfig.sessionDir));
|
||||
} catch (IOException e) {
|
||||
LOGGER.error("Couldn't create temp directories.", e);
|
||||
throw new RuntimeException(e);
|
||||
|
@ -171,7 +187,6 @@ public class RunManager {
|
|||
public void startRayProcesses(boolean isHead) {
|
||||
LOGGER.info("Starting ray processes @ {}.", rayConfig.nodeIp);
|
||||
try {
|
||||
createTempDirs();
|
||||
if (isHead) {
|
||||
startGcs();
|
||||
}
|
||||
|
@ -218,50 +233,47 @@ public class RunManager {
|
|||
}
|
||||
|
||||
// See `src/ray/gcs/gcs_server/gcs_server_main.cc` for the meaning of each parameter.
|
||||
try (FileUtil.TempFile gcsServerFile = FileUtil.getTempFileFromResource("gcs_server")) {
|
||||
gcsServerFile.getFile().setExecutable(true);
|
||||
List<String> command = ImmutableList.of(
|
||||
gcsServerFile.getFile().getAbsolutePath(),
|
||||
String.format("--redis_address=%s", rayConfig.getRedisIp()),
|
||||
String.format("--redis_port=%d", rayConfig.getRedisPort()),
|
||||
String.format("--config_list=%s", String.join(",", rayConfig.rayletConfigParameters)),
|
||||
String.format("--redis_password=%s", redisPasswordOption)
|
||||
);
|
||||
|
||||
startProcess(command, null, "gcs_server");
|
||||
}
|
||||
final File gcsServerFile = BinaryFileUtil.getFile(
|
||||
rayConfig.sessionDir, BinaryFileUtil.GCS_SERVER_BINARY_NAME);
|
||||
Preconditions.checkState(gcsServerFile.setExecutable(true));
|
||||
List<String> command = ImmutableList.of(
|
||||
gcsServerFile.getAbsolutePath(),
|
||||
String.format("--redis_address=%s", rayConfig.getRedisIp()),
|
||||
String.format("--redis_port=%d", rayConfig.getRedisPort()),
|
||||
String.format("--config_list=%s", String.join(",", rayConfig.rayletConfigParameters)),
|
||||
String.format("--redis_password=%s", redisPasswordOption)
|
||||
);
|
||||
startProcess(command, null, "gcs_server");
|
||||
}
|
||||
}
|
||||
|
||||
private String startRedisInstance(String ip, int port, String password, Integer shard) {
|
||||
try (FileUtil.TempFile redisServerFile = FileUtil.getTempFileFromResource("redis-server")) {
|
||||
try (FileUtil.TempFile redisModuleFile = FileUtil.getTempFileFromResource(
|
||||
"libray_redis_module.so")) {
|
||||
redisServerFile.getFile().setExecutable(true);
|
||||
List<String> command = Lists.newArrayList(
|
||||
// The redis-server executable file.
|
||||
redisServerFile.getFile().getAbsolutePath(),
|
||||
"--protected-mode",
|
||||
"no",
|
||||
"--port",
|
||||
String.valueOf(port),
|
||||
"--loglevel",
|
||||
"warning",
|
||||
"--loadmodule",
|
||||
// The redis module file.
|
||||
redisModuleFile.getFile().getAbsolutePath()
|
||||
);
|
||||
final File redisServerFile = BinaryFileUtil.getFile(
|
||||
rayConfig.sessionDir, BinaryFileUtil.REDIS_SERVER_BINARY_NAME);
|
||||
Preconditions.checkState(redisServerFile.setExecutable(true));
|
||||
List<String> command = Lists.newArrayList(
|
||||
// The redis-server executable file.
|
||||
redisServerFile.getAbsolutePath(),
|
||||
"--protected-mode",
|
||||
"no",
|
||||
"--port",
|
||||
String.valueOf(port),
|
||||
"--loglevel",
|
||||
"warning",
|
||||
"--loadmodule",
|
||||
// The redis module file.
|
||||
BinaryFileUtil.getFile(
|
||||
rayConfig.sessionDir, BinaryFileUtil.REDIS_MODULE_LIBRARY_NAME).getAbsolutePath()
|
||||
);
|
||||
|
||||
if (!Strings.isNullOrEmpty(password)) {
|
||||
command.add("--requirepass ");
|
||||
command.add(password);
|
||||
}
|
||||
|
||||
String name = shard == null ? "redis" : "redis-" + shard;
|
||||
startProcess(command, null, name);
|
||||
}
|
||||
if (!Strings.isNullOrEmpty(password)) {
|
||||
command.add("--requirepass ");
|
||||
command.add(password);
|
||||
}
|
||||
|
||||
String name = shard == null ? "redis" : "redis-" + shard;
|
||||
startProcess(command, null, name);
|
||||
|
||||
try (Jedis client = new Jedis("127.0.0.1", port)) {
|
||||
if (!Strings.isNullOrEmpty(password)) {
|
||||
client.auth(password);
|
||||
|
@ -287,30 +299,30 @@ public class RunManager {
|
|||
}
|
||||
|
||||
// See `src/ray/raylet/main.cc` for the meaning of each parameter.
|
||||
try (FileUtil.TempFile rayletFile = FileUtil.getTempFileFromResource("raylet")) {
|
||||
rayletFile.getFile().setExecutable(true);
|
||||
List<String> command = ImmutableList.of(
|
||||
rayletFile.getFile().getAbsolutePath(),
|
||||
String.format("--raylet_socket_name=%s", rayConfig.rayletSocketName),
|
||||
String.format("--store_socket_name=%s", rayConfig.objectStoreSocketName),
|
||||
String.format("--object_manager_port=%d", 0), // The object manager port.
|
||||
// The node manager port.
|
||||
String.format("--node_manager_port=%d", rayConfig.getNodeManagerPort()),
|
||||
String.format("--node_ip_address=%s", rayConfig.nodeIp),
|
||||
String.format("--redis_address=%s", rayConfig.getRedisIp()),
|
||||
String.format("--redis_port=%d", rayConfig.getRedisPort()),
|
||||
String.format("--num_initial_workers=%d", 0), // number of initial workers
|
||||
String.format("--maximum_startup_concurrency=%d", maximumStartupConcurrency),
|
||||
String.format("--static_resource_list=%s",
|
||||
ResourceUtil.getResourcesStringFromMap(rayConfig.resources)),
|
||||
String.format("--config_list=%s", String.join(",", rayConfig.rayletConfigParameters)),
|
||||
String.format("--python_worker_command=%s", buildPythonWorkerCommand()),
|
||||
String.format("--java_worker_command=%s", buildWorkerCommandRaylet()),
|
||||
String.format("--redis_password=%s", redisPasswordOption)
|
||||
);
|
||||
final File rayletFile = BinaryFileUtil.getFile(
|
||||
rayConfig.sessionDir, BinaryFileUtil.RAYLET_BINARY_NAME);
|
||||
Preconditions.checkState(rayletFile.setExecutable(true));
|
||||
List<String> command = ImmutableList.of(
|
||||
rayletFile.getAbsolutePath(),
|
||||
String.format("--raylet_socket_name=%s", rayConfig.rayletSocketName),
|
||||
String.format("--store_socket_name=%s", rayConfig.objectStoreSocketName),
|
||||
String.format("--object_manager_port=%d", 0), // The object manager port.
|
||||
// The node manager port.
|
||||
String.format("--node_manager_port=%d", rayConfig.getNodeManagerPort()),
|
||||
String.format("--node_ip_address=%s", rayConfig.nodeIp),
|
||||
String.format("--redis_address=%s", rayConfig.getRedisIp()),
|
||||
String.format("--redis_port=%d", rayConfig.getRedisPort()),
|
||||
String.format("--num_initial_workers=%d", 0), // number of initial workers
|
||||
String.format("--maximum_startup_concurrency=%d", maximumStartupConcurrency),
|
||||
String.format("--static_resource_list=%s",
|
||||
ResourceUtil.getResourcesStringFromMap(rayConfig.resources)),
|
||||
String.format("--config_list=%s", String.join(",", rayConfig.rayletConfigParameters)),
|
||||
String.format("--python_worker_command=%s", buildPythonWorkerCommand()),
|
||||
String.format("--java_worker_command=%s", buildWorkerCommand()),
|
||||
String.format("--redis_password=%s", redisPasswordOption)
|
||||
);
|
||||
|
||||
startProcess(command, null, "raylet");
|
||||
}
|
||||
startProcess(command, null, "raylet");
|
||||
}
|
||||
|
||||
private String concatPath(Stream<String> stream) {
|
||||
|
@ -319,7 +331,7 @@ public class RunManager {
|
|||
return stream.filter(s -> !s.contains(" ")).collect(Collectors.joining(":"));
|
||||
}
|
||||
|
||||
private String buildWorkerCommandRaylet() {
|
||||
private String buildWorkerCommand() {
|
||||
List<String> cmd = new ArrayList<>();
|
||||
cmd.add("java");
|
||||
cmd.add("-classpath");
|
||||
|
@ -335,6 +347,9 @@ public class RunManager {
|
|||
String libraryPath = concatPath(rayConfig.libraryPath.stream());
|
||||
cmd.add("-Djava.library.path=" + libraryPath);
|
||||
|
||||
// session path
|
||||
cmd.add("-Dray.session-dir=" + rayConfig.sessionDir);
|
||||
|
||||
// logging path
|
||||
if (rayConfig.redirectOutput) {
|
||||
cmd.add("-Dray.logging.stdout=org.apache.log4j.varia.NullAppender");
|
||||
|
@ -379,21 +394,21 @@ public class RunManager {
|
|||
}
|
||||
|
||||
private void startObjectStore() {
|
||||
try (FileUtil.TempFile plasmaStoreFile = FileUtil
|
||||
.getTempFileFromResource("plasma_store_server")) {
|
||||
plasmaStoreFile.getFile().setExecutable(true);
|
||||
List<String> command = ImmutableList.of(
|
||||
// The plasma store executable file.
|
||||
plasmaStoreFile.getFile().getAbsolutePath(),
|
||||
"-s",
|
||||
rayConfig.objectStoreSocketName,
|
||||
"-m",
|
||||
rayConfig.objectStoreSize.toString()
|
||||
);
|
||||
startProcess(command, null, "plasma_store");
|
||||
}
|
||||
final File objectStoreFile = BinaryFileUtil.getFile(
|
||||
rayConfig.sessionDir, BinaryFileUtil.PLASMA_STORE_SERVER_BINARY_NAME);
|
||||
Preconditions.checkState(objectStoreFile.setExecutable(true));
|
||||
List<String> command = ImmutableList.of(
|
||||
// The plasma store executable file.
|
||||
objectStoreFile.getAbsolutePath(),
|
||||
"-s",
|
||||
rayConfig.objectStoreSocketName,
|
||||
"-m",
|
||||
rayConfig.objectStoreSize.toString()
|
||||
);
|
||||
startProcess(command, null, "plasma_store");
|
||||
}
|
||||
|
||||
|
||||
private String buildPythonWorkerCommand() {
|
||||
// disable python worker start from raylet, which starts from java
|
||||
if (rayConfig.pythonWorkerCommand == null) {
|
||||
|
|
|
@ -0,0 +1,48 @@
|
|||
package org.ray.runtime.util;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Paths;
|
||||
import org.apache.commons.io.FileUtils;
|
||||
|
||||
public class BinaryFileUtil {
|
||||
public static final String REDIS_SERVER_BINARY_NAME = "redis-server";
|
||||
|
||||
public static final String GCS_SERVER_BINARY_NAME = "gcs_server";
|
||||
|
||||
public static final String PLASMA_STORE_SERVER_BINARY_NAME = "plasma_store_server";
|
||||
|
||||
public static final String RAYLET_BINARY_NAME = "raylet";
|
||||
|
||||
public static final String REDIS_MODULE_LIBRARY_NAME = "libray_redis_module.so";
|
||||
|
||||
public static final String CORE_WORKER_JAVA_LIBRARY =
|
||||
System.mapLibraryName("core_worker_library_java");
|
||||
|
||||
public static File getFile(String destDir, String fileName) {
|
||||
File file = new File(String.format("%s/%s", destDir, fileName));
|
||||
if (file.exists()) {
|
||||
return file;
|
||||
}
|
||||
|
||||
final File dir = file.getParentFile();
|
||||
try {
|
||||
if (!dir.exists()) {
|
||||
FileUtils.forceMkdir(dir);
|
||||
}
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException("Couldn't make directory: " + dir.getAbsolutePath(), e);
|
||||
}
|
||||
// File does not exist.
|
||||
try (InputStream is = BinaryFileUtil.class.getResourceAsStream("/" + fileName)) {
|
||||
Preconditions.checkNotNull(is, "{} doesn't exist.", fileName);
|
||||
Files.copy(is, Paths.get(file.getCanonicalPath()));
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException("Couldn't get temp file from resource " + fileName, e);
|
||||
}
|
||||
return file;
|
||||
}
|
||||
}
|
|
@ -1,68 +0,0 @@
|
|||
package org.ray.runtime.util;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Paths;
|
||||
import java.nio.file.StandardCopyOption;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
public class FileUtil {
|
||||
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(FileUtil.class);
|
||||
|
||||
/**
|
||||
* Represents a temp file.
|
||||
*
|
||||
* This class implements the `AutoCloseable` interface. It can be used in a `try-with-resource`
|
||||
* block. When exiting the block, the temp file will be automatically removed.
|
||||
*/
|
||||
public static class TempFile implements AutoCloseable {
|
||||
|
||||
File file;
|
||||
|
||||
TempFile(File file) {
|
||||
this.file = file;
|
||||
}
|
||||
|
||||
public File getFile() {
|
||||
return file;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
if (!file.delete()) {
|
||||
LOGGER.warn("Couldn't delete temp file {}", file.getAbsolutePath());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get a temp file from resource.
|
||||
*
|
||||
* @param resourceFileName File name.
|
||||
* @return A `TempFile` object.
|
||||
*/
|
||||
public static TempFile getTempFileFromResource(String resourceFileName) {
|
||||
File file;
|
||||
try {
|
||||
file = File.createTempFile(resourceFileName, "");
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException("Couldn't create temp file " + resourceFileName, e);
|
||||
}
|
||||
|
||||
try (InputStream in = FileUtil.class.getResourceAsStream("/" + resourceFileName)) {
|
||||
Preconditions.checkNotNull(in, "{} doesn't exist.", resourceFileName);
|
||||
Files.copy(in, Paths.get(file.getCanonicalPath()), StandardCopyOption.REPLACE_EXISTING);
|
||||
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException("Couldn't get temp file from resource " + resourceFileName, e);
|
||||
}
|
||||
|
||||
return new TempFile(file);
|
||||
}
|
||||
}
|
||||
|
|
@ -3,8 +3,10 @@ package org.ray.runtime.util;
|
|||
import com.google.common.base.Strings;
|
||||
import com.google.common.collect.Sets;
|
||||
import com.sun.jna.NativeLibrary;
|
||||
import java.io.File;
|
||||
import java.lang.reflect.Field;
|
||||
import java.util.Set;
|
||||
import org.ray.runtime.config.RayConfig;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
|
@ -36,18 +38,17 @@ public class JniUtils {
|
|||
LOGGER.debug("Loading native library {}.", libraryName);
|
||||
// Load native library.
|
||||
String fileName = System.mapLibraryName(libraryName);
|
||||
String libPath = null;
|
||||
try (FileUtil.TempFile libFile = FileUtil.getTempFileFromResource(fileName)) {
|
||||
libPath = libFile.getFile().getAbsolutePath();
|
||||
if (exportSymbols) {
|
||||
// Expose library symbols using RTLD_GLOBAL which may be depended by other shared
|
||||
// libraries.
|
||||
NativeLibrary.getInstance(libFile.getFile().getAbsolutePath());
|
||||
}
|
||||
System.load(libPath);
|
||||
final String sessionDir = RayConfig.getInstance().sessionDir;
|
||||
final File file = BinaryFileUtil.getFile(sessionDir, fileName);
|
||||
|
||||
if (exportSymbols) {
|
||||
// Expose library symbols using RTLD_GLOBAL which may be depended by other shared
|
||||
// libraries.
|
||||
NativeLibrary.getInstance(file.getAbsolutePath());
|
||||
}
|
||||
System.load(file.getAbsolutePath());
|
||||
LOGGER.debug("Native library loaded.");
|
||||
resetLibraryPath(libPath);
|
||||
resetLibraryPath(file.getAbsolutePath());
|
||||
loadedLibs.add(libraryName);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -32,7 +32,8 @@ ray {
|
|||
}
|
||||
|
||||
// Root dir of log files.
|
||||
log-dir: /tmp/ray/logs
|
||||
// If this is not set, the default log-dir will be `${temp-dir}/session_xxx/logs`.
|
||||
log-dir: ""
|
||||
|
||||
// If true, output of worker processes will be redirected to log files.
|
||||
// Otherwise, output will be printed to console.
|
||||
|
@ -74,8 +75,9 @@ ray {
|
|||
// Object store configurations
|
||||
// ----------------------------
|
||||
object-store {
|
||||
// RPC socket name of object store
|
||||
socket-name: /tmp/ray/sockets/object_store
|
||||
// RPC socket name of object store.
|
||||
// If this is not set, the default name will be `${temp-dir}/session_xxx/sockets/object_store`.
|
||||
socket-name: ""
|
||||
// Initial size of the object store.
|
||||
size: 10 MB
|
||||
}
|
||||
|
@ -84,8 +86,9 @@ ray {
|
|||
// Raylet configurations
|
||||
// ----------------------------
|
||||
raylet {
|
||||
// RPC socket name of Raylet
|
||||
socket-name: /tmp/ray/sockets/raylet
|
||||
// RPC socket name of Raylet.
|
||||
// If this is not set, the default name will be `${temp-dir}/session_xxx/sockets/raylet`.
|
||||
socket-name: ""
|
||||
// Listening port for node manager.
|
||||
node-manager-port: 0
|
||||
|
||||
|
|
|
@ -1328,7 +1328,7 @@ def build_java_worker_command(
|
|||
|
||||
command += "-Dray.home={} ".format(RAY_HOME)
|
||||
command += "-Dray.log-dir={} ".format(os.path.join(session_dir, "logs"))
|
||||
|
||||
# The trailing space is required: the options are concatenated into one command string.
command += "-Dray.session-dir={} ".format(session_dir)
|
||||
command += ("-Dray.raylet.config.num_workers_per_process_java=" +
|
||||
"RAY_WORKER_NUM_WORKERS_PLACEHOLDER ")
|
||||
|
||||
|
|
|
@ -15,7 +15,6 @@ import setuptools.command.build_ext as _build_ext
|
|||
# manually.
|
||||
|
||||
# NOTE: The lists below must be kept in sync with ray/BUILD.bazel.
|
||||
|
||||
ray_files = [
|
||||
"ray/core/src/ray/thirdparty/redis/src/redis-server",
|
||||
"ray/core/src/ray/gcs/redis_module/libray_redis_module.so",
|
||||
|
|
Loading…
Add table
Reference in a new issue