mirror of
https://github.com/vale981/ray
synced 2025-03-06 02:21:39 -05:00
[Java] Change log dir to /tmp/raylogs (#2677)
Currently, the log directory in Java is a relative path. This PR changes it to `/tmp/raylogs` (with the same format as Python, e.g., `local_scheduler-2018-51-17_17-8-6-05164.err`). It also cleans up some related code.
This commit is contained in:
parent
e56eb354eb
commit
78b6bfb7f9
8 changed files with 101 additions and 191 deletions
1
.gitignore
vendored
1
.gitignore
vendored
|
@ -153,7 +153,6 @@ build
|
|||
|
||||
# Java
|
||||
java/**/target
|
||||
java/run
|
||||
java/**/lib
|
||||
java/**/.settings
|
||||
java/**/.classpath
|
||||
|
|
|
@ -6,14 +6,9 @@ import java.io.IOException;
|
|||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import net.lingala.zip4j.core.ZipFile;
|
||||
import net.lingala.zip4j.exception.ZipException;
|
||||
import org.ray.api.UniqueID;
|
||||
import org.ray.cli.CommandStart;
|
||||
import org.ray.cli.CommandStop;
|
||||
import org.ray.core.RayRuntime;
|
||||
import org.ray.core.model.RayParameters;
|
||||
import org.ray.core.model.RunMode;
|
||||
import org.ray.runner.RunInfo;
|
||||
import org.ray.runner.RunManager;
|
||||
import org.ray.runner.worker.DefaultDriver;
|
||||
import org.ray.spi.KeyValueStoreLink;
|
||||
|
@ -73,7 +68,7 @@ public class RayCli {
|
|||
RayParameters params = new RayParameters(config);
|
||||
|
||||
// Init RayLog before using it.
|
||||
RayLog.init(params.working_directory);
|
||||
RayLog.init(params.log_dir);
|
||||
|
||||
RayLog.core.info("Using IP address {} for this node.", params.node_ip_address);
|
||||
RunManager manager;
|
||||
|
@ -173,7 +168,7 @@ public class RayCli {
|
|||
UniqueID resourceId = functionManager.registerResource(zip);
|
||||
|
||||
// Init RayLog before using it.
|
||||
RayLog.init(params.working_directory);
|
||||
RayLog.init(params.log_dir);
|
||||
|
||||
RayLog.rapp.debug(
|
||||
"registerResource " + resourceId + " for package " + packageName + " done");
|
||||
|
|
|
@ -22,14 +22,13 @@ public class RayLog {
|
|||
public static Logger rapp;
|
||||
|
||||
/**
|
||||
* it must be called before using Ray loggers,
|
||||
* or the dynamic update does not work.
|
||||
* @param workingDir store the logs under params.working_directory
|
||||
* Initialize loggers
|
||||
* @param logDir directory of the log files.
|
||||
*/
|
||||
public static void init(String workingDir) {
|
||||
public static void init(String logDir) {
|
||||
String loggingPath = System.getProperty("logging.path");
|
||||
if (loggingPath == null) {
|
||||
System.setProperty("logging.path", workingDir + "/logs");
|
||||
System.setProperty("logging.path", logDir);
|
||||
}
|
||||
String loggingFileName = System.getProperty("logging.file.name");
|
||||
if (loggingFileName != null && loggingFileName.contains("*pid_suffix*")) {
|
||||
|
|
|
@ -42,8 +42,6 @@ num_workers = 2
|
|||
|
||||
driver_id = 0123456789abcdef0123456789abcdef01234567
|
||||
|
||||
working_directory = %CONFIG_FILE_DIR%/run
|
||||
|
||||
redis_port = 34111
|
||||
|
||||
num_local_schedulers = 1
|
||||
|
|
|
@ -80,7 +80,7 @@ public abstract class RayRuntime implements RayApi {
|
|||
configReader = new ConfigReader(configPath, updateConfigStr);
|
||||
RayRuntime.params = new RayParameters(configReader);
|
||||
|
||||
RayLog.init(params.working_directory);
|
||||
RayLog.init(params.log_dir);
|
||||
assert RayLog.core != null;
|
||||
|
||||
ins = instantiate(params);
|
||||
|
|
|
@ -46,8 +46,8 @@ public class RayParameters {
|
|||
@AConfig(comment = "driver ID when the worker is served as a driver")
|
||||
public UniqueID driver_id = UniqueID.nil;
|
||||
|
||||
@AConfig(comment = "working directory")
|
||||
public String working_directory = "./run";
|
||||
@AConfig(comment = "logging directory")
|
||||
public String log_dir = "/tmp/raylogs";
|
||||
|
||||
@AConfig(comment = "primary redis port")
|
||||
public int redis_port = 34222;
|
||||
|
|
|
@ -5,7 +5,7 @@ public class ProcessInfo {
|
|||
public Process process;
|
||||
public String[] cmd;
|
||||
public RunInfo.ProcessType type;
|
||||
public String workDir;
|
||||
public String name;
|
||||
public String redisAddress;
|
||||
public String ip;
|
||||
public boolean redirect;
|
||||
|
|
|
@ -1,15 +1,17 @@
|
|||
package org.ray.runner;
|
||||
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.time.LocalDateTime;
|
||||
import java.time.format.DateTimeFormatter;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Random;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.stream.Collectors;
|
||||
import org.ray.api.UniqueID;
|
||||
import org.ray.core.model.RayParameters;
|
||||
import org.ray.core.model.RunMode;
|
||||
|
@ -29,18 +31,20 @@ public class RunManager {
|
|||
|
||||
public static final int INT16_MAX = 32767;
|
||||
|
||||
private static final DateTimeFormatter DATE_TIME_FORMATTER =
|
||||
DateTimeFormatter.ofPattern("Y-m-d_H-M-S");
|
||||
|
||||
private RayParameters params;
|
||||
|
||||
private PathConfig paths;
|
||||
|
||||
private ConfigReader configReader;
|
||||
|
||||
private String procStdoutFileName = "";
|
||||
|
||||
private String procStderrFileName = "";
|
||||
|
||||
private RunInfo runInfo = new RunInfo();
|
||||
|
||||
private Random random = new Random();
|
||||
|
||||
|
||||
public RunManager(RayParameters params, PathConfig paths, ConfigReader configReader) {
|
||||
this.params = params;
|
||||
this.paths = paths;
|
||||
|
@ -60,18 +64,6 @@ public class RunManager {
|
|||
return runInfo;
|
||||
}
|
||||
|
||||
/**
 * Returns the {@code PathConfig} stored in this manager's {@code paths} field.
 */
public PathConfig getPathManager() {
|
||||
return paths;
|
||||
}
|
||||
|
||||
/**
 * Returns the current value of the {@code procStdoutFileName} field.
 */
public String getProcStdoutFileName() {
|
||||
return procStdoutFileName;
|
||||
}
|
||||
|
||||
/**
 * Returns the current value of the {@code procStderrFileName} field.
 */
public String getProcStderrFileName() {
|
||||
return procStderrFileName;
|
||||
}
|
||||
|
||||
public void startRayHead() throws Exception {
|
||||
if (params.redis_address.length() != 0) {
|
||||
throw new Exception("Redis address must be empty in head node.");
|
||||
|
@ -110,7 +102,7 @@ public class RunManager {
|
|||
}
|
||||
|
||||
public Process startDriver(String mainClass, String redisAddress, UniqueID driverId,
|
||||
String workDir, String ip,
|
||||
String logDir, String ip,
|
||||
String driverClass, String driverArgs, String additonalClassPaths,
|
||||
String additionalConfigs) {
|
||||
String driverConfigs =
|
||||
|
@ -131,10 +123,9 @@ public class RunManager {
|
|||
additonalClassPaths,
|
||||
additionalConfigs,
|
||||
"",
|
||||
workDir,
|
||||
ip,
|
||||
redisAddress,
|
||||
true,
|
||||
false,
|
||||
false,
|
||||
null
|
||||
);
|
||||
|
@ -142,19 +133,19 @@ public class RunManager {
|
|||
|
||||
/**
 * Builds a {@code java} command line with buildJavaProcessCommand, splits it on single
 * spaces and launches it through startProcess with a null environment map.
 *
 * <p>This span shows both the pre-change rows (which carry a {@code workDir} parameter) and
 * the post-change rows (which do not).
 * NOTE(review): the post-change call passes "" as the fourth startProcess argument; confirm
 * that an empty value is intended there.
 *
 * @return whatever startProcess returns for the launched command
 */
private Process startJavaProcess(RunInfo.ProcessType pt, String mainClass,
|
||||
String additonalClassPaths, String additionalConfigs,
|
||||
String additionalJvmArgs, String workDir, String ip, String
|
||||
String additionalJvmArgs, String ip, String
|
||||
redisAddr, boolean redirect,
|
||||
boolean cleanup, String agentlibAddr) {
|
||||
|
||||
String cmd = buildJavaProcessCommand(pt, mainClass, additonalClassPaths, additionalConfigs,
|
||||
additionalJvmArgs, workDir, ip, redisAddr, agentlibAddr);
|
||||
return startProcess(cmd.split(" "), null, pt, workDir, redisAddr, ip, redirect, cleanup);
|
||||
additionalJvmArgs, ip, redisAddr, agentlibAddr);
|
||||
return startProcess(cmd.split(" "), null, pt, "", redisAddr, ip, redirect, cleanup);
|
||||
}
|
||||
|
||||
private String buildJavaProcessCommand(
|
||||
RunInfo.ProcessType pt, String mainClass, String additionalClassPaths,
|
||||
String additionalConfigs,
|
||||
String additionalJvmArgs, String workDir, String ip, String redisAddr, String agentlibAddr) {
|
||||
String additionalJvmArgs, String ip, String redisAddr, String agentlibAddr) {
|
||||
String cmd = "java -ea -noverify " + params.jvm_parameters + " ";
|
||||
if (agentlibAddr != null && !agentlibAddr.equals("")) {
|
||||
cmd += " -agentlib:jdwp=transport=dt_socket,address=" + agentlibAddr + ",server=y,suspend=n";
|
||||
|
@ -178,7 +169,7 @@ public class RunManager {
|
|||
cmd += " --overwrite="
|
||||
+ section + "node_ip_address=" + ip + ";"
|
||||
+ section + "redis_address=" + redisAddr + ";"
|
||||
+ section + "working_directory=" + workDir + ";"
|
||||
+ section + "log_dir=" + params.log_dir + ";"
|
||||
+ section + "run_mode=" + params.run_mode;
|
||||
|
||||
if (additionalConfigs.length() > 0) {
|
||||
|
@ -189,34 +180,21 @@ public class RunManager {
|
|||
}
|
||||
|
||||
private Process startProcess(String[] cmd, Map<String, String> env, RunInfo.ProcessType type,
|
||||
String workDir,
|
||||
String name,
|
||||
String redisAddress, String ip, boolean redirect,
|
||||
boolean cleanup) {
|
||||
File wdir = new File(workDir);
|
||||
if (!wdir.exists()) {
|
||||
wdir.mkdirs();
|
||||
}
|
||||
|
||||
int processIndex = runInfo.allProcesses.get(type.ordinal()).size();
|
||||
|
||||
ProcessBuilder builder;
|
||||
List<String> newCommand = Arrays.asList(cmd);
|
||||
builder = new ProcessBuilder(newCommand);
|
||||
builder.directory(new File(workDir));
|
||||
|
||||
if (redirect) {
|
||||
String stdoutFile;
|
||||
String stderrFile;
|
||||
stdoutFile = workDir + "/" + processIndex + ".out.txt";
|
||||
stderrFile = workDir + "/" + processIndex + ".err.txt";
|
||||
builder.redirectOutput(new File(stdoutFile));
|
||||
builder.redirectError(new File(stderrFile));
|
||||
List<String> stdFileList = new ArrayList<>();
|
||||
stdFileList.add(stdoutFile);
|
||||
stdFileList.add(stderrFile);
|
||||
record_log_files_in_redis(redisAddress, ip, stdFileList);
|
||||
procStdoutFileName = stdoutFile;
|
||||
procStderrFileName = stderrFile;
|
||||
int logId = random.nextInt(10000);
|
||||
String date = DATE_TIME_FORMATTER.format(LocalDateTime.now());
|
||||
String stdout = String.format("%s/%s-%s-%05d.out", params.log_dir, name, date, logId);
|
||||
String stderr = String.format("%s/%s-%s-%05d.err", params.log_dir, name, date, logId);
|
||||
builder.redirectOutput(new File(stdout));
|
||||
builder.redirectError(new File(stderr));
|
||||
recordLogFilesInRedis(redisAddress, ip, ImmutableList.of(stdout, stderr));
|
||||
}
|
||||
|
||||
if (env != null && !env.isEmpty()) {
|
||||
|
@ -227,16 +205,11 @@ public class RunManager {
|
|||
try {
|
||||
p = builder.start();
|
||||
} catch (IOException e) {
|
||||
RayLog.core
|
||||
.error("Start process " + Arrays.toString(cmd).replace(',', ' ') + " in working dir '"
|
||||
+ workDir + "' failed",
|
||||
e);
|
||||
RayLog.core.error("Failed to start process {}", name, e);
|
||||
return null;
|
||||
}
|
||||
|
||||
RayLog.core.info(
|
||||
"Start process " + p.hashCode() + " OK, cmd = " + Arrays.toString(cmd).replace(',', ' ')
|
||||
+ ", working dir = '" + workDir + "'" + (redirect ? ", redirect" : ", no redirect"));
|
||||
RayLog.core.info("Process {} started", name);
|
||||
|
||||
if (cleanup) {
|
||||
runInfo.toBeCleanedProcesses.get(type.ordinal()).add(p);
|
||||
|
@ -245,7 +218,7 @@ public class RunManager {
|
|||
ProcessInfo processInfo = new ProcessInfo();
|
||||
processInfo.cmd = cmd;
|
||||
processInfo.type = type;
|
||||
processInfo.workDir = workDir;
|
||||
processInfo.name = name;
|
||||
processInfo.redisAddress = redisAddress;
|
||||
processInfo.ip = ip;
|
||||
processInfo.redirect = redirect;
|
||||
|
@ -256,28 +229,28 @@ public class RunManager {
|
|||
return p;
|
||||
}
|
||||
|
||||
/**
 * Appends the given log file paths to a per-node list in Redis.
 *
 * <p>Does nothing unless {@code redisAddress} and {@code nodeIpAddress} are both non-null and
 * non-empty and the list holds at least one entry. Otherwise it splits {@code redisAddress}
 * on ':' into host and port, connects with Jedis, and pushes each path (rpush) onto the key
 * produced by {@code String.format("LOG_FILENAMES:{%s}", nodeIpAddress)}.
 *
 * <p>This span shows both the pre-change rows ({@code record_log_files_in_redis},
 * {@code logfiles}) and the post-change rows ({@code recordLogFilesInRedis},
 * {@code logFiles}).
 * NOTE(review): close() is not inside a finally or try-with-resources, so it is not reached
 * if rpush throws.
 */
private void record_log_files_in_redis(String redisAddress, String nodeIpAddress,
|
||||
List<String> logfiles) {
|
||||
private void recordLogFilesInRedis(String redisAddress, String nodeIpAddress,
|
||||
List<String> logFiles) {
|
||||
if (redisAddress != null && !redisAddress.isEmpty() && nodeIpAddress != null
|
||||
&& !nodeIpAddress.isEmpty() && logfiles.size() > 0) {
|
||||
&& !nodeIpAddress.isEmpty() && logFiles.size() > 0) {
|
||||
String[] ipPort = redisAddress.split(":");
|
||||
Jedis jedisClient = new Jedis(ipPort[0], Integer.parseInt(ipPort[1]));
|
||||
String logFileListKey = String.format("LOG_FILENAMES:{%s}", nodeIpAddress);
|
||||
for (String logfile : logfiles) {
|
||||
for (String logfile : logFiles) {
|
||||
jedisClient.rpush(logFileListKey, logfile);
|
||||
}
|
||||
jedisClient.close();
|
||||
}
|
||||
}
|
||||
|
||||
public void startRayProcesses() {
|
||||
private void startRayProcesses() {
|
||||
Jedis redisClient = null;
|
||||
|
||||
RayLog.core.info("start ray processes @ " + params.node_ip_address + " ...");
|
||||
|
||||
// start primary redis
|
||||
if (params.redis_address.length() == 0) {
|
||||
List<String> primaryShards = startRedis(params.working_directory + "/redis",
|
||||
List<String> primaryShards = startRedis(
|
||||
params.node_ip_address, params.redis_port, 1, params.redirect, params.cleanup);
|
||||
params.redis_address = primaryShards.get(0);
|
||||
|
||||
|
@ -295,7 +268,7 @@ public class RunManager {
|
|||
|
||||
// start redis shards
|
||||
if (params.start_redis_shards) {
|
||||
runInfo.redisShards = startRedis(params.working_directory + "/redis/shards",
|
||||
runInfo.redisShards = startRedis(
|
||||
params.node_ip_address, params.redis_port + 1, params.num_redis_shards,
|
||||
params.redirect,
|
||||
params.cleanup);
|
||||
|
@ -310,7 +283,7 @@ public class RunManager {
|
|||
|
||||
// start global scheduler
|
||||
if (params.include_global_scheduler && !params.use_raylet) {
|
||||
startGlobalScheduler(params.working_directory + "/globalScheduler",
|
||||
startGlobalScheduler(
|
||||
params.redis_address, params.node_ip_address, params.redirect, params.cleanup);
|
||||
}
|
||||
|
||||
|
@ -349,7 +322,7 @@ public class RunManager {
|
|||
int rpcPort = params.object_store_rpc_port;
|
||||
String storeName = "/tmp/plasma_store" + rpcPort;
|
||||
|
||||
startObjectStore(0, info, params.working_directory + "/store",
|
||||
startObjectStore(0, info,
|
||||
params.redis_address, params.node_ip_address, params.redirect, params.cleanup);
|
||||
|
||||
Map<String, Double> staticResources =
|
||||
|
@ -357,18 +330,18 @@ public class RunManager {
|
|||
|
||||
//Start raylet
|
||||
startRaylet(storeName, info, params.num_workers,
|
||||
params.working_directory + "/raylet", params.redis_address,
|
||||
params.redis_address,
|
||||
params.node_ip_address, params.redirect, staticResources, params.cleanup);
|
||||
|
||||
runInfo.localStores.add(info);
|
||||
} else {
|
||||
for (int i = 0; i < params.num_local_schedulers; i++) {
|
||||
// Start object stores
|
||||
startObjectStore(i, info, params.working_directory + "/store",
|
||||
startObjectStore(i, info,
|
||||
params.redis_address, params.node_ip_address, params.redirect, params.cleanup);
|
||||
|
||||
startObjectManager(i, info,
|
||||
params.working_directory + "/storeManager", params.redis_address,
|
||||
params.redis_address,
|
||||
params.node_ip_address, params.redirect, params.cleanup);
|
||||
|
||||
// Start local scheduler
|
||||
|
@ -381,7 +354,7 @@ public class RunManager {
|
|||
|
||||
startLocalScheduler(i, info,
|
||||
params.num_cpus[i], params.num_gpus[i], workerCount,
|
||||
params.working_directory + "/localsc", params.redis_address,
|
||||
params.redis_address,
|
||||
params.node_ip_address, params.redirect, params.cleanup);
|
||||
|
||||
runInfo.localStores.add(info);
|
||||
|
@ -395,7 +368,7 @@ public class RunManager {
|
|||
localStores.workerCount = localNumWorkers[i];
|
||||
for (int j = 0; j < localNumWorkers[i]; j++) {
|
||||
startWorker(localStores.storeName, localStores.managerName, localStores.schedulerName,
|
||||
params.working_directory + "/worker" + i + "." + j, params.redis_address,
|
||||
"/worker" + i + "." + j, params.redis_address,
|
||||
params.node_ip_address, UniqueID.nil, "", params.redirect, params.cleanup);
|
||||
}
|
||||
}
|
||||
|
@ -415,7 +388,7 @@ public class RunManager {
|
|||
}
|
||||
}
|
||||
|
||||
public boolean checkAlive(HashSet<RunInfo.ProcessType> excludeTypes) {
|
||||
private boolean checkAlive(HashSet<RunInfo.ProcessType> excludeTypes) {
|
||||
RunInfo.ProcessType[] types = RunInfo.ProcessType.values();
|
||||
for (int i = 0; i < types.length; i++) {
|
||||
if (excludeTypes.contains(types[i])) {
|
||||
|
@ -439,43 +412,6 @@ public class RunManager {
|
|||
return runInfo.deadProcess.isEmpty();
|
||||
}
|
||||
|
||||
public boolean tryRecoverDeadProcess() {
|
||||
|
||||
if (runInfo.deadProcess.isEmpty()) {
|
||||
return true;
|
||||
}
|
||||
|
||||
/* check the dead process */
|
||||
for (ProcessInfo info : runInfo.deadProcess) {
|
||||
if (info.type == RunInfo.ProcessType.PT_LOCAL_SCHEDULER
|
||||
|| info.type == RunInfo.ProcessType.PT_PLASMA_STORE
|
||||
|| info.type == RunInfo.ProcessType.PT_PLASMA_MANAGER) {
|
||||
/* When local scheduler or plasma store or plasma manager process dead, we can not
|
||||
* recover this node simply by restarting the dead process. Instead, We need to restart
|
||||
* all the node processes
|
||||
* */
|
||||
RayLog.core
|
||||
.error(info.type.name() + "process dead, we can not simply restart this process");
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
/* try to recover */
|
||||
ProcessInfo info;
|
||||
for (int i = 0; i < runInfo.deadProcess.size(); i++) {
|
||||
info = runInfo.deadProcess.get(i);
|
||||
if (info.type == RunInfo.ProcessType.PT_GLOBAL_SCHEDULER) {
|
||||
RayLog.core.error(info.type.name() + "process dead, restart this process");
|
||||
startProcess(info.cmd, null, info.type, info.workDir, info.redisAddress, info.ip,
|
||||
info.redirect, info.cleanup);
|
||||
} else {
|
||||
RayLog.core.error(info.type.name() + "process dead, we don't deal with it");
|
||||
}
|
||||
}
|
||||
runInfo.deadProcess.clear();
|
||||
return true;
|
||||
}
|
||||
|
||||
// kill all processes started by startRayHead
|
||||
public void cleanup(boolean killAll) {
|
||||
// clean up the process in reverse order
|
||||
|
@ -512,12 +448,12 @@ public class RunManager {
|
|||
// when the worker exits
|
||||
// @return primary redis shard address
|
||||
//
|
||||
private List<String> startRedis(String workDir, String ip, int port, int numOfShards,
|
||||
private List<String> startRedis(String ip, int port, int numOfShards,
|
||||
boolean redirect, boolean cleanup) {
|
||||
ArrayList<String> shards = new ArrayList<>();
|
||||
String addr;
|
||||
for (int i = 0; i < numOfShards; i++) {
|
||||
addr = startRedisInstance(workDir, ip, port + i, redirect, cleanup);
|
||||
addr = startRedisInstance(ip, port + i, redirect, cleanup);
|
||||
|
||||
if (addr.length() == 0) {
|
||||
cleanup(cleanup);
|
||||
|
@ -540,7 +476,7 @@ public class RunManager {
|
|||
// @param port given port for this redis instance, 0 for auto-selected port
|
||||
// @return redis server address
|
||||
//
|
||||
private String startRedisInstance(String workDir, String ip, int port,
|
||||
private String startRedisInstance(String ip, int port,
|
||||
boolean redirect, boolean cleanup) {
|
||||
String redisFilePath = paths.redis_server;
|
||||
String redisModule = paths.redis_module;
|
||||
|
@ -548,13 +484,12 @@ public class RunManager {
|
|||
assert (new File(redisFilePath).exists()) : "file don't exsits : " + redisFilePath;
|
||||
assert (new File(redisModule).exists()) : "file don't exsits : " + redisModule;
|
||||
|
||||
|
||||
String cmd = redisFilePath + " --protected-mode no --port " + port + " --loglevel warning"
|
||||
+ " --loadmodule " + redisModule;
|
||||
|
||||
Map<String, String> env = null;
|
||||
Process p = startProcess(cmd.split(" "), env, RunInfo.ProcessType.PT_REDIS_SERVER,
|
||||
workDir + port, "", ip, redirect, cleanup);
|
||||
"redis", "", ip, redirect, cleanup);
|
||||
|
||||
if (p == null || !p.isAlive()) {
|
||||
return "";
|
||||
|
@ -578,31 +513,17 @@ public class RunManager {
|
|||
return ip + ":" + port;
|
||||
}
|
||||
|
||||
/**
 * Launches the executable at {@code paths.global_scheduler} with the arguments
 * {@code -r <redisAddress> -h <ip>} through startProcess, using process type
 * PT_GLOBAL_SCHEDULER and a null environment map. The Process returned by startProcess is
 * not used.
 *
 * <p>This span shows both the pre-change rows (which take and forward {@code workDir}) and
 * the post-change rows (which pass the literal "global_scheduler" instead).
 */
private void startGlobalScheduler(String workDir, String redisAddress, String ip,
|
||||
private void startGlobalScheduler(String redisAddress, String ip,
|
||||
boolean redirect, boolean cleanup) {
|
||||
String filePath = paths.global_scheduler;
|
||||
String cmd = filePath + " -r " + redisAddress + " -h " + ip;
|
||||
|
||||
Map<String, String> env = null;
|
||||
startProcess(cmd.split(" "), env, RunInfo.ProcessType.PT_GLOBAL_SCHEDULER, workDir,
|
||||
startProcess(cmd.split(" "), env, RunInfo.ProcessType.PT_GLOBAL_SCHEDULER, "global_scheduler",
|
||||
redisAddress,
|
||||
ip, redirect, cleanup);
|
||||
}
|
||||
|
||||
private Map<String, String> retrieveEnv(String conf, Map<String, String> env) {
|
||||
String[] splits = conf.split(" ");
|
||||
for (String item : splits) {
|
||||
int idx = item.trim().indexOf('=');
|
||||
if (idx == -1) {
|
||||
continue;
|
||||
}
|
||||
String key = item.substring(0, idx);
|
||||
String val = item.substring(idx + 1);
|
||||
env.put(key, val);
|
||||
}
|
||||
return env;
|
||||
}
|
||||
|
||||
/*
|
||||
* @param storeName The name of the plasma store socket to connect to
|
||||
*
|
||||
|
@ -625,7 +546,7 @@ public class RunManager {
|
|||
* start
|
||||
*/
|
||||
private void startLocalScheduler(int index, AddressInfo info, int numCpus,
|
||||
int numGpus, int numWorkers, String workDir,
|
||||
int numGpus, int numWorkers,
|
||||
String redisAddress, String ip, boolean redirect,
|
||||
boolean cleanup) {
|
||||
//if (numCpus <= 0)
|
||||
|
@ -649,7 +570,7 @@ public class RunManager {
|
|||
|
||||
String workerCmd = null;
|
||||
workerCmd = buildWorkerCommand(true, info.storeName, info.managerName, name,
|
||||
UniqueID.nil, "", workDir + rpcPort, ip, redisAddress);
|
||||
UniqueID.nil, "", ip, redisAddress);
|
||||
cmd += " -w \"" + workerCmd + "\"";
|
||||
|
||||
if (redisAddress.length() > 0) {
|
||||
|
@ -662,7 +583,7 @@ public class RunManager {
|
|||
Map<String, String> env = null;
|
||||
String[] cmds = StringUtil.split(cmd, " ", "\"", "\"").toArray(new String[0]);
|
||||
Process p = startProcess(cmds, env, RunInfo.ProcessType.PT_LOCAL_SCHEDULER,
|
||||
workDir + rpcPort, redisAddress, ip, redirect, cleanup);
|
||||
"local_scheduler", redisAddress, ip, redirect, cleanup);
|
||||
|
||||
if (p != null && p.isAlive()) {
|
||||
try {
|
||||
|
@ -683,7 +604,7 @@ public class RunManager {
|
|||
}
|
||||
|
||||
private void startRaylet(String storeName, AddressInfo info, int numWorkers,
|
||||
String workDir, String redisAddress, String ip, boolean redirect,
|
||||
String redisAddress, String ip, boolean redirect,
|
||||
Map<String, Double> staticResources, boolean cleanup) {
|
||||
|
||||
int rpcPort = params.raylet_port;
|
||||
|
@ -693,7 +614,7 @@ public class RunManager {
|
|||
|
||||
//Create the worker command that the raylet will use to start workers.
|
||||
String workerCommand = buildWorkerCommandRaylet(info.storeName, rayletSocketName,
|
||||
UniqueID.nil, "", workDir + rpcPort, ip, redisAddress);
|
||||
UniqueID.nil, "", ip, redisAddress);
|
||||
|
||||
int sep = redisAddress.indexOf(':');
|
||||
assert (sep != -1);
|
||||
|
@ -703,12 +624,12 @@ public class RunManager {
|
|||
String resourceArgument = ResourceUtil.getResourcesStringFromMap(staticResources);
|
||||
|
||||
// The second-last argument is the worker command for Python, not needed for Java.
|
||||
String[] cmds = new String[]{filePath,rayletSocketName, storeName, ip, gcsIp,
|
||||
String[] cmds = new String[]{filePath, rayletSocketName, storeName, ip, gcsIp,
|
||||
gcsPort, "" + numWorkers, resourceArgument,
|
||||
"", workerCommand};
|
||||
|
||||
Process p = startProcess(cmds, null, RunInfo.ProcessType.PT_RAYLET,
|
||||
workDir + rpcPort, redisAddress, ip, redirect, cleanup);
|
||||
"raylet", redisAddress, ip, redirect, cleanup);
|
||||
|
||||
if (p != null && p.isAlive()) {
|
||||
try {
|
||||
|
@ -729,7 +650,7 @@ public class RunManager {
|
|||
}
|
||||
|
||||
private String buildWorkerCommandRaylet(String storeName, String rayletSocketName,
|
||||
UniqueID actorId, String actorClass, String workDir,
|
||||
UniqueID actorId, String actorClass,
|
||||
String ip, String redisAddress) {
|
||||
String workerConfigs = "ray.java.start.object_store_name=" + storeName
|
||||
+ ";ray.java.start.raylet_socket_name=" + rayletSocketName
|
||||
|
@ -743,7 +664,7 @@ public class RunManager {
|
|||
}
|
||||
|
||||
String jvmArgs = "";
|
||||
jvmArgs += " -Dlogging.path=" + params.working_directory + "/logs/workers";
|
||||
jvmArgs += " -Dlogging.path=" + params.log_dir;
|
||||
jvmArgs += " -Dlogging.file.name=core-*pid_suffix*";
|
||||
|
||||
return buildJavaProcessCommand(
|
||||
|
@ -752,7 +673,6 @@ public class RunManager {
|
|||
"",
|
||||
workerConfigs,
|
||||
jvmArgs,
|
||||
workDir,
|
||||
ip,
|
||||
redisAddress,
|
||||
null
|
||||
|
@ -761,7 +681,7 @@ public class RunManager {
|
|||
|
||||
private String buildWorkerCommand(boolean isFromLocalScheduler, String storeName,
|
||||
String storeManagerName, String localSchedulerName,
|
||||
UniqueID actorId, String actorClass, String workDir, String
|
||||
UniqueID actorId, String actorClass, String
|
||||
ip, String redisAddress) {
|
||||
String workerConfigs = "ray.java.start.object_store_name=" + storeName
|
||||
+ ";ray.java.start.object_store_manager_name=" + storeManagerName
|
||||
|
@ -776,7 +696,7 @@ public class RunManager {
|
|||
}
|
||||
|
||||
String jvmArgs = "";
|
||||
jvmArgs += " -Dlogging.path=" + params.working_directory + "/logs/workers";
|
||||
jvmArgs += " -Dlogging.path=" + params.log_dir;
|
||||
jvmArgs += " -Dlogging.file.name=core-*pid_suffix*";
|
||||
|
||||
return buildJavaProcessCommand(
|
||||
|
@ -785,14 +705,13 @@ public class RunManager {
|
|||
"",
|
||||
workerConfigs,
|
||||
jvmArgs,
|
||||
workDir,
|
||||
ip,
|
||||
redisAddress,
|
||||
null
|
||||
);
|
||||
}
|
||||
|
||||
private void startObjectStore(int index, AddressInfo info, String workDir, String redisAddress,
|
||||
private void startObjectStore(int index, AddressInfo info, String redisAddress,
|
||||
String ip, boolean redirect, boolean cleanup) {
|
||||
int occupiedMemoryMb = params.object_store_occupied_memory_MB;
|
||||
long memoryBytes = occupiedMemoryMb * 1000000;
|
||||
|
@ -804,7 +723,7 @@ public class RunManager {
|
|||
|
||||
Map<String, String> env = null;
|
||||
Process p = startProcess(cmd.split(" "), env, RunInfo.ProcessType.PT_PLASMA_STORE,
|
||||
workDir + rpcPort, redisAddress, ip, redirect, cleanup);
|
||||
"plasma_store", redisAddress, ip, redirect, cleanup);
|
||||
|
||||
if (p != null && p.isAlive()) {
|
||||
try {
|
||||
|
@ -824,7 +743,7 @@ public class RunManager {
|
|||
}
|
||||
}
|
||||
|
||||
private AddressInfo startObjectManager(int index, AddressInfo info, String workDir,
|
||||
private AddressInfo startObjectManager(int index, AddressInfo info,
|
||||
String redisAddress, String ip, boolean redirect,
|
||||
boolean cleanup) {
|
||||
String filePath = paths.store_manager;
|
||||
|
@ -838,7 +757,7 @@ public class RunManager {
|
|||
|
||||
Map<String, String> env = null;
|
||||
Process p = startProcess(cmd.split(" "), env, RunInfo.ProcessType.PT_PLASMA_MANAGER,
|
||||
workDir + rpcPort, redisAddress, ip, redirect, cleanup);
|
||||
"object_manager", redisAddress, ip, redirect, cleanup);
|
||||
|
||||
if (p != null && p.isAlive()) {
|
||||
try {
|
||||
|
@ -859,12 +778,12 @@ public class RunManager {
|
|||
}
|
||||
|
||||
/**
 * Builds a worker command with buildWorkerCommand (passing {@code false} as its first
 * argument), splits it on single spaces and launches it through startProcess with process
 * type PT_WORKER and a null environment map.
 *
 * <p>This span shows both the pre-change rows (which take {@code workDir} and forward it to
 * both calls) and the post-change rows (which take {@code workerName} and pass it only to
 * startProcess).
 */
public void startWorker(String storeName, String storeManagerName,
|
||||
String localSchedulerName, String workDir, String redisAddress,
|
||||
String localSchedulerName, String workerName, String redisAddress,
|
||||
String ip, UniqueID actorId, String actorClass,
|
||||
boolean redirect, boolean cleanup) {
|
||||
String cmd = buildWorkerCommand(false, storeName, storeManagerName, localSchedulerName, actorId,
|
||||
actorClass, workDir, ip, redisAddress);
|
||||
startProcess(cmd.split(" "), null, RunInfo.ProcessType.PT_WORKER, workDir, redisAddress, ip,
|
||||
actorClass, ip, redisAddress);
|
||||
startProcess(cmd.split(" "), null, RunInfo.ProcessType.PT_WORKER, workerName, redisAddress, ip,
|
||||
redirect, cleanup);
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Add table
Reference in a new issue