diff --git a/java/api/src/main/java/io/ray/api/call/ActorCreator.java b/java/api/src/main/java/io/ray/api/call/ActorCreator.java index b64a4fbcd..784ecbf12 100644 --- a/java/api/src/main/java/io/ray/api/call/ActorCreator.java +++ b/java/api/src/main/java/io/ray/api/call/ActorCreator.java @@ -3,6 +3,7 @@ package io.ray.api.call; import io.ray.api.ActorHandle; import io.ray.api.Ray; import io.ray.api.function.RayFuncR; +import java.util.List; /** * A helper to create java actor. @@ -25,9 +26,9 @@ public class ActorCreator extends BaseActorCreator> { * * @param jvmOptions JVM options for the Java worker that this actor is running in. * @return self - * @see io.ray.api.options.ActorCreationOptions.Builder#setJvmOptions(java.lang.String) + * @see io.ray.api.options.ActorCreationOptions.Builder#setJvmOptions(List) */ - public ActorCreator setJvmOptions(String jvmOptions) { + public ActorCreator setJvmOptions(List jvmOptions) { builder.setJvmOptions(jvmOptions); return this; } diff --git a/java/api/src/main/java/io/ray/api/options/ActorCreationOptions.java b/java/api/src/main/java/io/ray/api/options/ActorCreationOptions.java index 303239735..3b33815c3 100644 --- a/java/api/src/main/java/io/ray/api/options/ActorCreationOptions.java +++ b/java/api/src/main/java/io/ray/api/options/ActorCreationOptions.java @@ -2,7 +2,10 @@ package io.ray.api.options; import io.ray.api.Ray; import io.ray.api.placementgroup.PlacementGroup; +import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; +import java.util.List; import java.util.Map; /** The options for creating actor. */ @@ -10,7 +13,7 @@ public class ActorCreationOptions extends BaseTaskOptions { public final boolean global; public final String name; public final int maxRestarts; - public final String jvmOptions; + public final List jvmOptions; public final int maxConcurrency; public final PlacementGroup group; public final int bundleIndex; @@ -20,7 +23,7 @@ public class ActorCreationOptions extends BaseTaskOptions { String name, Map resources, int maxRestarts, - String jvmOptions, + List jvmOptions, int maxConcurrency, PlacementGroup group, int bundleIndex) { @@ -40,7 +43,7 @@ public class ActorCreationOptions extends BaseTaskOptions { private String name; private Map resources = new HashMap<>(); private int maxRestarts = 0; - private String jvmOptions = null; + private List jvmOptions = new ArrayList<>(); private int maxConcurrency = 1; private PlacementGroup group; private int bundleIndex; @@ -120,8 +123,22 @@ public class ActorCreationOptions extends BaseTaskOptions { * * @param jvmOptions JVM options for the Java worker that this actor is running in. * @return self + * @deprecated Use {@link #setJvmOptions(List)} instead. */ public Builder setJvmOptions(String jvmOptions) { + this.jvmOptions = Arrays.asList(jvmOptions.split(" +")); + return this; + } + + /** + * Set the JVM options for the Java worker that this actor is running in. + * + *

Note, if this is set, this actor won't share Java worker with other actors or tasks. + * + * @param jvmOptions JVM options for the Java worker that this actor is running in. + * @return self + */ + public Builder setJvmOptions(List jvmOptions) { this.jvmOptions = jvmOptions; return this; } diff --git a/java/runtime/src/main/java/io/ray/runtime/AbstractRayRuntime.java b/java/runtime/src/main/java/io/ray/runtime/AbstractRayRuntime.java index afb91e1a0..bae3bbdf5 100644 --- a/java/runtime/src/main/java/io/ray/runtime/AbstractRayRuntime.java +++ b/java/runtime/src/main/java/io/ray/runtime/AbstractRayRuntime.java @@ -1,7 +1,6 @@ package io.ray.runtime; import com.google.common.base.Preconditions; -import com.google.common.base.Strings; import com.google.common.collect.ImmutableList; import io.ray.api.ActorHandle; import io.ray.api.BaseActorHandle; @@ -296,7 +295,7 @@ public abstract class AbstractRayRuntime implements RayRuntimeInternal { List functionArgs = ArgumentsBuilder.wrap(args, functionDescriptor.getLanguage()); if (functionDescriptor.getLanguage() != Language.JAVA && options != null) { - Preconditions.checkState(Strings.isNullOrEmpty(options.jvmOptions)); + Preconditions.checkState(options.jvmOptions == null || options.jvmOptions.size() == 0); } BaseActorHandle actor = taskSubmitter.createActor(functionDescriptor, functionArgs, options); return actor; diff --git a/java/test/src/main/java/io/ray/test/ExitActorTest.java b/java/test/src/main/java/io/ray/test/ExitActorTest.java index 279af55c0..7c221efc0 100644 --- a/java/test/src/main/java/io/ray/test/ExitActorTest.java +++ b/java/test/src/main/java/io/ray/test/ExitActorTest.java @@ -2,6 +2,7 @@ package io.ray.test; import static io.ray.runtime.util.SystemUtil.pid; +import com.google.common.collect.ImmutableList; import io.ray.api.ActorHandle; import io.ray.api.ObjectRef; import io.ray.api.Ray; @@ -96,7 +97,7 @@ public class ExitActorTest extends BaseTest { Ray.actor(ExitingActor::new) .setMaxRestarts(10000) // Set dummy JVM options to start a worker process with only one worker. - .setJvmOptions(" ") + .setJvmOptions(ImmutableList.of("-Ddummy=value")) .remote(); int pid = actor.task(ExitingActor::getPid).remote().get(); Assert.assertTrue(SystemUtil.isProcessAlive(pid)); diff --git a/java/test/src/main/java/io/ray/test/WorkerJvmOptionsTest.java b/java/test/src/main/java/io/ray/test/WorkerJvmOptionsTest.java index dc9114d15..27b070496 100644 --- a/java/test/src/main/java/io/ray/test/WorkerJvmOptionsTest.java +++ b/java/test/src/main/java/io/ray/test/WorkerJvmOptionsTest.java @@ -1,5 +1,6 @@ package io.ray.test; +import com.google.common.collect.ImmutableList; import io.ray.api.ActorHandle; import io.ray.api.ObjectRef; import io.ray.api.Ray; @@ -20,7 +21,7 @@ public class WorkerJvmOptionsTest extends BaseTest { // that raylet can correctly handle dynamic options with whitespaces. ActorHandle actor = Ray.actor(Echo::new) - .setJvmOptions(" -Dtest.suffix=suffix -Dtest.suffix1=suffix1 ") + .setJvmOptions(ImmutableList.of("-Dtest.suffix=suffix", "-Dtest.suffix1=suffix1")) .remote(); ObjectRef obj = actor.task(Echo::getOptions).remote(); Assert.assertEquals(obj.get(), "suffix"); diff --git a/src/ray/core_worker/lib/java/io_ray_runtime_task_NativeTaskSubmitter.cc b/src/ray/core_worker/lib/java/io_ray_runtime_task_NativeTaskSubmitter.cc index e46959aaa..76dea1907 100644 --- a/src/ray/core_worker/lib/java/io_ray_runtime_task_NativeTaskSubmitter.cc +++ b/src/ray/core_worker/lib/java/io_ray_runtime_task_NativeTaskSubmitter.cc @@ -149,11 +149,10 @@ inline ray::ActorCreationOptions ToActorCreationOptions(JNIEnv *env, jobject java_resources = env->GetObjectField(actorCreationOptions, java_base_task_options_resources); resources = ToResources(env, java_resources); - jstring java_jvm_options = (jstring)env->GetObjectField( + jobject java_jvm_options = env->GetObjectField( actorCreationOptions, java_actor_creation_options_jvm_options); if (java_jvm_options) { - std::string jvm_options = JavaStringToNativeString(env, java_jvm_options); - dynamic_worker_options.emplace_back(jvm_options); + JavaStringListToNativeStringVector(env, java_jvm_options, &dynamic_worker_options); } max_concurrency = static_cast(env->GetIntField( actorCreationOptions, java_actor_creation_options_max_concurrency)); diff --git a/src/ray/core_worker/lib/java/jni_init.cc b/src/ray/core_worker/lib/java/jni_init.cc index 0128fb4e9..5606b814a 100644 --- a/src/ray/core_worker/lib/java/jni_init.cc +++ b/src/ray/core_worker/lib/java/jni_init.cc @@ -276,7 +276,7 @@ jint JNI_OnLoad(JavaVM *vm, void *reserved) { java_actor_creation_options_max_restarts = env->GetFieldID(java_actor_creation_options_class, "maxRestarts", "I"); java_actor_creation_options_jvm_options = env->GetFieldID( - java_actor_creation_options_class, "jvmOptions", "Ljava/lang/String;"); + java_actor_creation_options_class, "jvmOptions", "Ljava/util/List;"); java_actor_creation_options_max_concurrency = env->GetFieldID(java_actor_creation_options_class, "maxConcurrency", "I"); java_actor_creation_options_group = diff --git a/src/ray/raylet/worker_pool.cc b/src/ray/raylet/worker_pool.cc index 9796f4f25..4b89fe8f5 100644 --- a/src/ray/raylet/worker_pool.cc +++ b/src/ray/raylet/worker_pool.cc @@ -141,7 +141,7 @@ void WorkerPool::SetNodeManagerPort(int node_manager_port) { Process WorkerPool::StartWorkerProcess( const Language &language, const rpc::WorkerType worker_type, const JobID &job_id, - std::vector dynamic_options, + const std::vector &dynamic_options, std::unordered_map override_environment_variables) { rpc::JobConfig *job_config = nullptr; if (!IsIOWorkerType(worker_type)) { @@ -182,15 +182,11 @@ Process WorkerPool::StartWorkerProcess( } } + std::vector options; + + // Append Ray-defined per-job options here if (language == Language::JAVA) { if (job_config) { - // Note that we push the item to the front of the vector to make - // sure this is the freshest option than others. - if (!job_config->jvm_options().empty()) { - dynamic_options.insert(dynamic_options.begin(), job_config->jvm_options().begin(), - job_config->jvm_options().end()); - } - std::string code_search_path_str; for (int i = 0; i < job_config->code_search_path_size(); i++) { auto path = job_config->code_search_path(i); @@ -201,23 +197,34 @@ Process WorkerPool::StartWorkerProcess( } if (!code_search_path_str.empty()) { code_search_path_str = "-Dray.job.code-search-path=" + code_search_path_str; - dynamic_options.push_back(code_search_path_str); + options.push_back(code_search_path_str); } } - - dynamic_options.push_back("-Dray.job.num-java-workers-per-process=" + - std::to_string(workers_to_start)); } + // Append user-defined per-job options here + if (language == Language::JAVA) { + if (!job_config->jvm_options().empty()) { + options.insert(options.end(), job_config->jvm_options().begin(), + job_config->jvm_options().end()); + } + } + + // Append Ray-defined per-process options here + if (language == Language::JAVA) { + options.push_back("-Dray.job.num-java-workers-per-process=" + + std::to_string(workers_to_start)); + } + + // Append user-defined per-process options here + options.insert(options.end(), dynamic_options.begin(), dynamic_options.end()); + // Extract pointers from the worker command to pass into execvp. std::vector worker_command_args; for (auto const &token : state.worker_command) { if (token == kWorkerDynamicOptionPlaceholder) { - for (const auto &dynamic_option : dynamic_options) { - auto options = ParseCommandLine(dynamic_option); - worker_command_args.insert(worker_command_args.end(), options.begin(), - options.end()); - } + worker_command_args.insert(worker_command_args.end(), options.begin(), + options.end()); continue; } RAY_CHECK(node_manager_port_ != 0) diff --git a/src/ray/raylet/worker_pool.h b/src/ray/raylet/worker_pool.h index 34219c7f6..e22d8f0ec 100644 --- a/src/ray/raylet/worker_pool.h +++ b/src/ray/raylet/worker_pool.h @@ -349,7 +349,7 @@ class WorkerPool : public WorkerPoolInterface, public IOWorkerPoolInterface { /// otherwise it means we didn't start a process. Process StartWorkerProcess( const Language &language, const rpc::WorkerType worker_type, const JobID &job_id, - std::vector dynamic_options = {}, + const std::vector &dynamic_options = {}, std::unordered_map override_environment_variables = {}); /// The implementation of how to start a new worker process with command arguments. diff --git a/src/ray/raylet/worker_pool_test.cc b/src/ray/raylet/worker_pool_test.cc index 7d7d7184e..146b6c104 100644 --- a/src/ray/raylet/worker_pool_test.cc +++ b/src/ray/raylet/worker_pool_test.cc @@ -33,6 +33,10 @@ JobID JOB_ID = JobID::FromInt(1); std::vector LANGUAGES = {Language::PYTHON, Language::JAVA}; +static inline std::string GetNumJavaWorkersPerProcessSystemProperty(int num) { + return std::string("-Dray.job.num-java-workers-per-process=") + std::to_string(num); +} + class MockWorkerClient : public rpc::CoreWorkerClientInterface { public: MockWorkerClient(instrumented_io_context &io_service) : io_service_(io_service) {} @@ -217,8 +221,7 @@ class WorkerPoolTest : public ::testing::Test { RegisterDriver(Language::PYTHON, JOB_ID, job_config); } - void TestStartupWorkerProcessCount(Language language, int num_workers_per_process, - std::vector expected_worker_command) { + void TestStartupWorkerProcessCount(Language language, int num_workers_per_process) { int desired_initial_worker_process_count = 100; int expected_worker_process_count = static_cast(std::ceil( static_cast(MAXIMUM_STARTUP_CONCURRENCY) / num_workers_per_process)); @@ -234,7 +237,12 @@ class WorkerPoolTest : public ::testing::Test { last_started_worker_process = prev; const auto &real_command = worker_pool_->GetWorkerCommand(last_started_worker_process); - ASSERT_EQ(real_command, expected_worker_command); + if (language == Language::JAVA) { + auto it = std::find( + real_command.begin(), real_command.end(), + GetNumJavaWorkersPerProcessSystemProperty(num_workers_per_process)); + ASSERT_NE(it, real_command.end()); + } } else { ASSERT_EQ(worker_pool_->NumWorkerProcessesStarting(), expected_worker_process_count); @@ -263,7 +271,8 @@ class WorkerPoolTest : public ::testing::Test { static inline TaskSpecification ExampleTaskSpec( const ActorID actor_id = ActorID::Nil(), const Language &language = Language::PYTHON, const JobID &job_id = JOB_ID, const ActorID actor_creation_id = ActorID::Nil(), - const std::vector &dynamic_worker_options = {}) { + const std::vector &dynamic_worker_options = {}, + const TaskID &task_id = TaskID::Nil()) { rpc::TaskSpec message; message.set_job_id(job_id.Binary()); message.set_language(language); @@ -272,6 +281,7 @@ static inline TaskSpecification ExampleTaskSpec( message.mutable_actor_task_spec()->set_actor_id(actor_id.Binary()); } else if (!actor_creation_id.IsNil()) { message.set_type(TaskType::ACTOR_CREATION_TASK); + message.set_task_id(task_id.Binary()); message.mutable_actor_creation_task_spec()->set_actor_id(actor_creation_id.Binary()); for (const auto &option : dynamic_worker_options) { message.mutable_actor_creation_task_spec()->add_dynamic_worker_options(option); @@ -282,10 +292,6 @@ static inline TaskSpecification ExampleTaskSpec( return TaskSpecification(std::move(message)); } -static inline std::string GetNumJavaWorkersPerProcessSystemProperty(int num) { - return std::string("-Dray.job.num-java-workers-per-process=") + std::to_string(num); -} - TEST_F(WorkerPoolTest, CompareWorkerProcessObjects) { typedef Process T; T a(T::CreateNewDummy()), b(T::CreateNewDummy()), empty = T(); @@ -335,14 +341,11 @@ TEST_F(WorkerPoolTest, HandleUnknownWorkerRegistration) { } TEST_F(WorkerPoolTest, StartupPythonWorkerProcessCount) { - TestStartupWorkerProcessCount(Language::PYTHON, 1, {"dummy_py_worker_command"}); + TestStartupWorkerProcessCount(Language::PYTHON, 1); } TEST_F(WorkerPoolTest, StartupJavaWorkerProcessCount) { - TestStartupWorkerProcessCount( - Language::JAVA, NUM_WORKERS_PER_PROCESS_JAVA, - {"java", GetNumJavaWorkersPerProcessSystemProperty(NUM_WORKERS_PER_PROCESS_JAVA), - "MainClass"}); + TestStartupWorkerProcessCount(Language::JAVA, NUM_WORKERS_PER_PROCESS_JAVA); } TEST_F(WorkerPoolTest, InitialWorkerProcessCount) { @@ -416,25 +419,47 @@ TEST_F(WorkerPoolTest, PopWorkersOfMultipleLanguages) { } TEST_F(WorkerPoolTest, StartWorkerWithDynamicOptionsCommand) { - TaskSpecification task_spec = ExampleTaskSpec( - ActorID::Nil(), Language::JAVA, JOB_ID, - ActorID::Of(JOB_ID, TaskID::ForDriverTask(JOB_ID), 1), {"test_op_0", "test_op_1"}); + std::vector actor_jvm_options; + actor_jvm_options.insert( + actor_jvm_options.end(), + {"-Dmy-actor.hello=foo", "-Dmy-actor.world=bar", "-Xmx2g", "-Xms1g"}); + auto task_id = TaskID::ForDriverTask(JOB_ID); + auto actor_id = ActorID::Of(JOB_ID, task_id, 1); + TaskSpecification task_spec = ExampleTaskSpec(ActorID::Nil(), Language::JAVA, JOB_ID, + actor_id, actor_jvm_options, task_id); rpc::JobConfig job_config = rpc::JobConfig(); - job_config.add_code_search_path("/test/code_serch_path"); + job_config.add_code_search_path("/test/code_search_path"); job_config.set_num_java_workers_per_process(1); + job_config.add_jvm_options("-Xmx1g"); + job_config.add_jvm_options("-Xms500m"); + job_config.add_jvm_options("-Dmy-job.hello=world"); + job_config.add_jvm_options("-Dmy-job.foo=bar"); worker_pool_->HandleJobStarted(JOB_ID, job_config); - worker_pool_->StartWorkerProcess(Language::JAVA, rpc::WorkerType::WORKER, JOB_ID, - task_spec.DynamicWorkerOptions()); + ASSERT_EQ(worker_pool_->PopWorker(task_spec), nullptr); const auto real_command = worker_pool_->GetWorkerCommand(worker_pool_->LastStartedWorkerProcess()); - ASSERT_EQ(real_command, - std::vector({"java", "test_op_0", "test_op_1", - "-Dray.job.code-search-path=/test/code_serch_path", - GetNumJavaWorkersPerProcessSystemProperty(1), - "MainClass"})); + // NOTE: When adding a new parameter to Java worker command, think carefully about the + // position of this new parameter. Do not modify the order of existing parameters. + std::vector expected_command; + expected_command.push_back("java"); + // Ray-defined per-job options + expected_command.insert(expected_command.end(), + {"-Dray.job.code-search-path=/test/code_search_path"}); + // User-defined per-job options + expected_command.insert( + expected_command.end(), + {"-Xmx1g", "-Xms500m", "-Dmy-job.hello=world", "-Dmy-job.foo=bar"}); + // Ray-defined per-process options + expected_command.push_back(GetNumJavaWorkersPerProcessSystemProperty(1)); + // User-defined per-process options + expected_command.insert(expected_command.end(), actor_jvm_options.begin(), + actor_jvm_options.end()); + // Entry point + expected_command.push_back("MainClass"); + ASSERT_EQ(real_command, expected_command); worker_pool_->HandleJobFinished(JOB_ID); }