[Java] refine generation of jvm options (#14931)

This commit is contained in:
Kai Yang 2021-03-31 21:04:52 +08:00 committed by GitHub
parent 73fb5d6022
commit 6278df8604
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
10 changed files with 105 additions and 55 deletions

View file

@ -3,6 +3,7 @@ package io.ray.api.call;
import io.ray.api.ActorHandle;
import io.ray.api.Ray;
import io.ray.api.function.RayFuncR;
import java.util.List;
/**
* A helper to create java actor.
@ -25,9 +26,9 @@ public class ActorCreator<A> extends BaseActorCreator<ActorCreator<A>> {
*
* @param jvmOptions JVM options for the Java worker that this actor is running in.
* @return self
* @see io.ray.api.options.ActorCreationOptions.Builder#setJvmOptions(java.lang.String)
* @see io.ray.api.options.ActorCreationOptions.Builder#setJvmOptions(List)
*/
public ActorCreator<A> setJvmOptions(String jvmOptions) {
public ActorCreator<A> setJvmOptions(List<String> jvmOptions) {
builder.setJvmOptions(jvmOptions);
return this;
}

View file

@ -2,7 +2,10 @@ package io.ray.api.options;
import io.ray.api.Ray;
import io.ray.api.placementgroup.PlacementGroup;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/** The options for creating actor. */
@ -10,7 +13,7 @@ public class ActorCreationOptions extends BaseTaskOptions {
public final boolean global;
public final String name;
public final int maxRestarts;
public final String jvmOptions;
public final List<String> jvmOptions;
public final int maxConcurrency;
public final PlacementGroup group;
public final int bundleIndex;
@ -20,7 +23,7 @@ public class ActorCreationOptions extends BaseTaskOptions {
String name,
Map<String, Double> resources,
int maxRestarts,
String jvmOptions,
List<String> jvmOptions,
int maxConcurrency,
PlacementGroup group,
int bundleIndex) {
@ -40,7 +43,7 @@ public class ActorCreationOptions extends BaseTaskOptions {
private String name;
private Map<String, Double> resources = new HashMap<>();
private int maxRestarts = 0;
private String jvmOptions = null;
private List<String> jvmOptions = new ArrayList<>();
private int maxConcurrency = 1;
private PlacementGroup group;
private int bundleIndex;
@ -120,8 +123,22 @@ public class ActorCreationOptions extends BaseTaskOptions {
*
* @param jvmOptions JVM options for the Java worker that this actor is running in.
* @return self
* @deprecated Use {@link #setJvmOptions(List)} instead.
*/
public Builder setJvmOptions(String jvmOptions) {
this.jvmOptions = Arrays.asList(jvmOptions.split(" +"));
return this;
}
/**
* Set the JVM options for the Java worker that this actor is running in.
*
* <p>Note, if this is set, this actor won't share Java worker with other actors or tasks.
*
* @param jvmOptions JVM options for the Java worker that this actor is running in.
* @return self
*/
public Builder setJvmOptions(List<String> jvmOptions) {
this.jvmOptions = jvmOptions;
return this;
}

View file

@ -1,7 +1,6 @@
package io.ray.runtime;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
import io.ray.api.ActorHandle;
import io.ray.api.BaseActorHandle;
@ -296,7 +295,7 @@ public abstract class AbstractRayRuntime implements RayRuntimeInternal {
List<FunctionArg> functionArgs = ArgumentsBuilder.wrap(args, functionDescriptor.getLanguage());
if (functionDescriptor.getLanguage() != Language.JAVA && options != null) {
Preconditions.checkState(Strings.isNullOrEmpty(options.jvmOptions));
Preconditions.checkState(options.jvmOptions == null || options.jvmOptions.size() == 0);
}
BaseActorHandle actor = taskSubmitter.createActor(functionDescriptor, functionArgs, options);
return actor;

View file

@ -2,6 +2,7 @@ package io.ray.test;
import static io.ray.runtime.util.SystemUtil.pid;
import com.google.common.collect.ImmutableList;
import io.ray.api.ActorHandle;
import io.ray.api.ObjectRef;
import io.ray.api.Ray;
@ -96,7 +97,7 @@ public class ExitActorTest extends BaseTest {
Ray.actor(ExitingActor::new)
.setMaxRestarts(10000)
// Set dummy JVM options to start a worker process with only one worker.
.setJvmOptions(" ")
.setJvmOptions(ImmutableList.of("-Ddummy=value"))
.remote();
int pid = actor.task(ExitingActor::getPid).remote().get();
Assert.assertTrue(SystemUtil.isProcessAlive(pid));

View file

@ -1,5 +1,6 @@
package io.ray.test;
import com.google.common.collect.ImmutableList;
import io.ray.api.ActorHandle;
import io.ray.api.ObjectRef;
import io.ray.api.Ray;
@ -20,7 +21,7 @@ public class WorkerJvmOptionsTest extends BaseTest {
// that raylet can correctly handle dynamic options with whitespaces.
ActorHandle<Echo> actor =
Ray.actor(Echo::new)
.setJvmOptions(" -Dtest.suffix=suffix -Dtest.suffix1=suffix1 ")
.setJvmOptions(ImmutableList.of("-Dtest.suffix=suffix", "-Dtest.suffix1=suffix1"))
.remote();
ObjectRef<String> obj = actor.task(Echo::getOptions).remote();
Assert.assertEquals(obj.get(), "suffix");

View file

@ -149,11 +149,10 @@ inline ray::ActorCreationOptions ToActorCreationOptions(JNIEnv *env,
jobject java_resources =
env->GetObjectField(actorCreationOptions, java_base_task_options_resources);
resources = ToResources(env, java_resources);
jstring java_jvm_options = (jstring)env->GetObjectField(
jobject java_jvm_options = env->GetObjectField(
actorCreationOptions, java_actor_creation_options_jvm_options);
if (java_jvm_options) {
std::string jvm_options = JavaStringToNativeString(env, java_jvm_options);
dynamic_worker_options.emplace_back(jvm_options);
JavaStringListToNativeStringVector(env, java_jvm_options, &dynamic_worker_options);
}
max_concurrency = static_cast<uint64_t>(env->GetIntField(
actorCreationOptions, java_actor_creation_options_max_concurrency));

View file

@ -276,7 +276,7 @@ jint JNI_OnLoad(JavaVM *vm, void *reserved) {
java_actor_creation_options_max_restarts =
env->GetFieldID(java_actor_creation_options_class, "maxRestarts", "I");
java_actor_creation_options_jvm_options = env->GetFieldID(
java_actor_creation_options_class, "jvmOptions", "Ljava/lang/String;");
java_actor_creation_options_class, "jvmOptions", "Ljava/util/List;");
java_actor_creation_options_max_concurrency =
env->GetFieldID(java_actor_creation_options_class, "maxConcurrency", "I");
java_actor_creation_options_group =

View file

@ -141,7 +141,7 @@ void WorkerPool::SetNodeManagerPort(int node_manager_port) {
Process WorkerPool::StartWorkerProcess(
const Language &language, const rpc::WorkerType worker_type, const JobID &job_id,
std::vector<std::string> dynamic_options,
const std::vector<std::string> &dynamic_options,
std::unordered_map<std::string, std::string> override_environment_variables) {
rpc::JobConfig *job_config = nullptr;
if (!IsIOWorkerType(worker_type)) {
@ -182,15 +182,11 @@ Process WorkerPool::StartWorkerProcess(
}
}
std::vector<std::string> options;
// Append Ray-defined per-job options here
if (language == Language::JAVA) {
if (job_config) {
// Note that we push the item to the front of the vector to make
// sure this is the freshest option than others.
if (!job_config->jvm_options().empty()) {
dynamic_options.insert(dynamic_options.begin(), job_config->jvm_options().begin(),
job_config->jvm_options().end());
}
std::string code_search_path_str;
for (int i = 0; i < job_config->code_search_path_size(); i++) {
auto path = job_config->code_search_path(i);
@ -201,23 +197,34 @@ Process WorkerPool::StartWorkerProcess(
}
if (!code_search_path_str.empty()) {
code_search_path_str = "-Dray.job.code-search-path=" + code_search_path_str;
dynamic_options.push_back(code_search_path_str);
options.push_back(code_search_path_str);
}
}
dynamic_options.push_back("-Dray.job.num-java-workers-per-process=" +
std::to_string(workers_to_start));
}
// Append user-defined per-job options here
if (language == Language::JAVA) {
if (!job_config->jvm_options().empty()) {
options.insert(options.end(), job_config->jvm_options().begin(),
job_config->jvm_options().end());
}
}
// Append Ray-defined per-process options here
if (language == Language::JAVA) {
options.push_back("-Dray.job.num-java-workers-per-process=" +
std::to_string(workers_to_start));
}
// Append user-defined per-process options here
options.insert(options.end(), dynamic_options.begin(), dynamic_options.end());
// Extract pointers from the worker command to pass into execvp.
std::vector<std::string> worker_command_args;
for (auto const &token : state.worker_command) {
if (token == kWorkerDynamicOptionPlaceholder) {
for (const auto &dynamic_option : dynamic_options) {
auto options = ParseCommandLine(dynamic_option);
worker_command_args.insert(worker_command_args.end(), options.begin(),
options.end());
}
worker_command_args.insert(worker_command_args.end(), options.begin(),
options.end());
continue;
}
RAY_CHECK(node_manager_port_ != 0)

View file

@ -349,7 +349,7 @@ class WorkerPool : public WorkerPoolInterface, public IOWorkerPoolInterface {
/// otherwise it means we didn't start a process.
Process StartWorkerProcess(
const Language &language, const rpc::WorkerType worker_type, const JobID &job_id,
std::vector<std::string> dynamic_options = {},
const std::vector<std::string> &dynamic_options = {},
std::unordered_map<std::string, std::string> override_environment_variables = {});
/// The implementation of how to start a new worker process with command arguments.

View file

@ -33,6 +33,10 @@ JobID JOB_ID = JobID::FromInt(1);
std::vector<Language> LANGUAGES = {Language::PYTHON, Language::JAVA};
static inline std::string GetNumJavaWorkersPerProcessSystemProperty(int num) {
return std::string("-Dray.job.num-java-workers-per-process=") + std::to_string(num);
}
class MockWorkerClient : public rpc::CoreWorkerClientInterface {
public:
MockWorkerClient(instrumented_io_context &io_service) : io_service_(io_service) {}
@ -217,8 +221,7 @@ class WorkerPoolTest : public ::testing::Test {
RegisterDriver(Language::PYTHON, JOB_ID, job_config);
}
void TestStartupWorkerProcessCount(Language language, int num_workers_per_process,
std::vector<std::string> expected_worker_command) {
void TestStartupWorkerProcessCount(Language language, int num_workers_per_process) {
int desired_initial_worker_process_count = 100;
int expected_worker_process_count = static_cast<int>(std::ceil(
static_cast<double>(MAXIMUM_STARTUP_CONCURRENCY) / num_workers_per_process));
@ -234,7 +237,12 @@ class WorkerPoolTest : public ::testing::Test {
last_started_worker_process = prev;
const auto &real_command =
worker_pool_->GetWorkerCommand(last_started_worker_process);
ASSERT_EQ(real_command, expected_worker_command);
if (language == Language::JAVA) {
auto it = std::find(
real_command.begin(), real_command.end(),
GetNumJavaWorkersPerProcessSystemProperty(num_workers_per_process));
ASSERT_NE(it, real_command.end());
}
} else {
ASSERT_EQ(worker_pool_->NumWorkerProcessesStarting(),
expected_worker_process_count);
@ -263,7 +271,8 @@ class WorkerPoolTest : public ::testing::Test {
static inline TaskSpecification ExampleTaskSpec(
const ActorID actor_id = ActorID::Nil(), const Language &language = Language::PYTHON,
const JobID &job_id = JOB_ID, const ActorID actor_creation_id = ActorID::Nil(),
const std::vector<std::string> &dynamic_worker_options = {}) {
const std::vector<std::string> &dynamic_worker_options = {},
const TaskID &task_id = TaskID::Nil()) {
rpc::TaskSpec message;
message.set_job_id(job_id.Binary());
message.set_language(language);
@ -272,6 +281,7 @@ static inline TaskSpecification ExampleTaskSpec(
message.mutable_actor_task_spec()->set_actor_id(actor_id.Binary());
} else if (!actor_creation_id.IsNil()) {
message.set_type(TaskType::ACTOR_CREATION_TASK);
message.set_task_id(task_id.Binary());
message.mutable_actor_creation_task_spec()->set_actor_id(actor_creation_id.Binary());
for (const auto &option : dynamic_worker_options) {
message.mutable_actor_creation_task_spec()->add_dynamic_worker_options(option);
@ -282,10 +292,6 @@ static inline TaskSpecification ExampleTaskSpec(
return TaskSpecification(std::move(message));
}
static inline std::string GetNumJavaWorkersPerProcessSystemProperty(int num) {
return std::string("-Dray.job.num-java-workers-per-process=") + std::to_string(num);
}
TEST_F(WorkerPoolTest, CompareWorkerProcessObjects) {
typedef Process T;
T a(T::CreateNewDummy()), b(T::CreateNewDummy()), empty = T();
@ -335,14 +341,11 @@ TEST_F(WorkerPoolTest, HandleUnknownWorkerRegistration) {
}
TEST_F(WorkerPoolTest, StartupPythonWorkerProcessCount) {
TestStartupWorkerProcessCount(Language::PYTHON, 1, {"dummy_py_worker_command"});
TestStartupWorkerProcessCount(Language::PYTHON, 1);
}
TEST_F(WorkerPoolTest, StartupJavaWorkerProcessCount) {
TestStartupWorkerProcessCount(
Language::JAVA, NUM_WORKERS_PER_PROCESS_JAVA,
{"java", GetNumJavaWorkersPerProcessSystemProperty(NUM_WORKERS_PER_PROCESS_JAVA),
"MainClass"});
TestStartupWorkerProcessCount(Language::JAVA, NUM_WORKERS_PER_PROCESS_JAVA);
}
TEST_F(WorkerPoolTest, InitialWorkerProcessCount) {
@ -416,25 +419,47 @@ TEST_F(WorkerPoolTest, PopWorkersOfMultipleLanguages) {
}
TEST_F(WorkerPoolTest, StartWorkerWithDynamicOptionsCommand) {
TaskSpecification task_spec = ExampleTaskSpec(
ActorID::Nil(), Language::JAVA, JOB_ID,
ActorID::Of(JOB_ID, TaskID::ForDriverTask(JOB_ID), 1), {"test_op_0", "test_op_1"});
std::vector<std::string> actor_jvm_options;
actor_jvm_options.insert(
actor_jvm_options.end(),
{"-Dmy-actor.hello=foo", "-Dmy-actor.world=bar", "-Xmx2g", "-Xms1g"});
auto task_id = TaskID::ForDriverTask(JOB_ID);
auto actor_id = ActorID::Of(JOB_ID, task_id, 1);
TaskSpecification task_spec = ExampleTaskSpec(ActorID::Nil(), Language::JAVA, JOB_ID,
actor_id, actor_jvm_options, task_id);
rpc::JobConfig job_config = rpc::JobConfig();
job_config.add_code_search_path("/test/code_serch_path");
job_config.add_code_search_path("/test/code_search_path");
job_config.set_num_java_workers_per_process(1);
job_config.add_jvm_options("-Xmx1g");
job_config.add_jvm_options("-Xms500m");
job_config.add_jvm_options("-Dmy-job.hello=world");
job_config.add_jvm_options("-Dmy-job.foo=bar");
worker_pool_->HandleJobStarted(JOB_ID, job_config);
worker_pool_->StartWorkerProcess(Language::JAVA, rpc::WorkerType::WORKER, JOB_ID,
task_spec.DynamicWorkerOptions());
ASSERT_EQ(worker_pool_->PopWorker(task_spec), nullptr);
const auto real_command =
worker_pool_->GetWorkerCommand(worker_pool_->LastStartedWorkerProcess());
ASSERT_EQ(real_command,
std::vector<std::string>({"java", "test_op_0", "test_op_1",
"-Dray.job.code-search-path=/test/code_serch_path",
GetNumJavaWorkersPerProcessSystemProperty(1),
"MainClass"}));
// NOTE: When adding a new parameter to Java worker command, think carefully about the
// position of this new parameter. Do not modify the order of existing parameters.
std::vector<std::string> expected_command;
expected_command.push_back("java");
// Ray-defined per-job options
expected_command.insert(expected_command.end(),
{"-Dray.job.code-search-path=/test/code_search_path"});
// User-defined per-job options
expected_command.insert(
expected_command.end(),
{"-Xmx1g", "-Xms500m", "-Dmy-job.hello=world", "-Dmy-job.foo=bar"});
// Ray-defined per-process options
expected_command.push_back(GetNumJavaWorkersPerProcessSystemProperty(1));
// User-defined per-process options
expected_command.insert(expected_command.end(), actor_jvm_options.begin(),
actor_jvm_options.end());
// Entry point
expected_command.push_back("MainClass");
ASSERT_EQ(real_command, expected_command);
worker_pool_->HandleJobFinished(JOB_ID);
}