diff --git a/cpp/src/ray/api.cc b/cpp/src/ray/api.cc index a1a8c6507..ed2b1b892 100644 --- a/cpp/src/ray/api.cc +++ b/cpp/src/ray/api.cc @@ -40,7 +40,7 @@ void Init() { bool IsInitialized() { return is_init_; } void Shutdown() { - // TODO(guyang.sgy): Clean the ray runtime. + // TODO(SongGuyang): Clean the ray runtime. internal::AbstractRayRuntime::DoShutdown(); is_init_ = false; } diff --git a/cpp/src/ray/runtime/abstract_ray_runtime.cc b/cpp/src/ray/runtime/abstract_ray_runtime.cc index 177fae17d..db9fac32d 100644 --- a/cpp/src/ray/runtime/abstract_ray_runtime.cc +++ b/cpp/src/ray/runtime/abstract_ray_runtime.cc @@ -145,7 +145,7 @@ InvocationSpec BuildInvocationSpec1(TaskType task_type, InvocationSpec invocation_spec; invocation_spec.task_type = task_type; invocation_spec.task_id = - TaskID::ForFakeTask(); // TODO(Guyang Song): make it from different task + TaskID::ForFakeTask(); // TODO(SongGuyang): make it from different task invocation_spec.remote_function_holder = remote_function_holder; invocation_spec.actor_id = actor; invocation_spec.args = TransformArgs(args); diff --git a/cpp/src/ray/runtime/object/native_object_store.cc b/cpp/src/ray/runtime/object/native_object_store.cc index d9326feb2..7add3b72b 100644 --- a/cpp/src/ray/runtime/object/native_object_store.cc +++ b/cpp/src/ray/runtime/object/native_object_store.cc @@ -116,7 +116,7 @@ std::vector NativeObjectStore::Wait(const std::vector &ids, int num_objects, int timeout_ms) { std::vector results; auto &core_worker = CoreWorkerProcess::GetCoreWorker(); - // TODO(guyang.sgy): Support `fetch_local` option in API. + // TODO(SongGuyang): Support `fetch_local` option in API. // Simply set `fetch_local` to be true. ::ray::Status status = core_worker.Wait(ids, num_objects, timeout_ms, &results, true); if (!status.ok()) { diff --git a/cpp/src/ray/runtime/task/local_mode_task_submitter.cc b/cpp/src/ray/runtime/task/local_mode_task_submitter.cc index cb24e9d3a..40b784557 100644 --- a/cpp/src/ray/runtime/task/local_mode_task_submitter.cc +++ b/cpp/src/ray/runtime/task/local_mode_task_submitter.cc @@ -32,7 +32,7 @@ LocalModeTaskSubmitter::LocalModeTaskSubmitter( ObjectID LocalModeTaskSubmitter::Submit(InvocationSpec &invocation, const ActorCreationOptions &options) { - /// TODO(Guyang Song): Make the information of TaskSpecification more reasonable + /// TODO(SongGuyang): Make the information of TaskSpecification more reasonable /// We just reuse the TaskSpecification class and make the single process mode work. /// Maybe some infomation of TaskSpecification are not reasonable or invalid. /// We will enhance this after implement the cluster mode. @@ -82,7 +82,7 @@ ObjectID LocalModeTaskSubmitter::Submit(InvocationSpec &invocation, AbstractRayRuntime *runtime = &local_mode_ray_tuntime_; if (invocation.task_type == TaskType::ACTOR_CREATION_TASK || invocation.task_type == TaskType::ACTOR_TASK) { - /// TODO(Guyang Song): Handle task dependencies. + /// TODO(SongGuyang): Handle task dependencies. /// Execute actor task directly in the main thread because we must guarantee the actor /// task executed by calling order. TaskExecutor::Invoke(task_specification, actor, runtime, actor_contexts_, diff --git a/cpp/src/ray/runtime/task/task_executor.cc b/cpp/src/ray/runtime/task/task_executor.cc index be24fe98d..f0a1e12fa 100644 --- a/cpp/src/ray/runtime/task/task_executor.cc +++ b/cpp/src/ray/runtime/task/task_executor.cc @@ -75,7 +75,7 @@ std::shared_ptr TaskExecutor::current_actor_ = nullptr; TaskExecutor::TaskExecutor(AbstractRayRuntime &abstract_ray_tuntime_) : abstract_ray_tuntime_(abstract_ray_tuntime_) {} -// TODO(Guyang Song): Make a common task execution function used for both local mode and +// TODO(SongGuyang): Make a common task execution function used for both local mode and // cluster mode. std::unique_ptr TaskExecutor::Execute(InvocationSpec &invocation) { abstract_ray_tuntime_.GetWorkerContext(); diff --git a/cpp/src/ray/runtime/task/task_executor.h b/cpp/src/ray/runtime/task/task_executor.h index a528f17e0..825e5ca52 100644 --- a/cpp/src/ray/runtime/task/task_executor.h +++ b/cpp/src/ray/runtime/task/task_executor.h @@ -16,8 +16,10 @@ #include #include + #include #include + #include "absl/synchronization/mutex.h" #include "invocation_spec.h" #include "ray/common/id.h" @@ -62,7 +64,7 @@ class TaskExecutor { public: TaskExecutor(AbstractRayRuntime &abstract_ray_tuntime_); - /// TODO(Guyang Song): support multiple tasks execution + /// TODO(SongGuyang): support multiple tasks execution std::unique_ptr Execute(InvocationSpec &invocation); static void Invoke( diff --git a/cpp/src/ray/util/process_helper.cc b/cpp/src/ray/util/process_helper.cc index cd60f3da1..35ecd8123 100644 --- a/cpp/src/ray/util/process_helper.cc +++ b/cpp/src/ray/util/process_helper.cc @@ -125,7 +125,7 @@ void ProcessHelper::RayStart(CoreWorkerOptions::TaskExecutionCallback callback) if (!ConfigInternal::Instance().job_id.empty()) { options.job_id = JobID::FromHex(ConfigInternal::Instance().job_id); } else { - /// TODO(Guyang Song): Get next job id from core worker by GCS client. + /// TODO(SongGuyang): Get next job id from core worker by GCS client. /// Random a number to avoid repeated job ids. /// The repeated job ids will lead to task hang when driver connects to a existing /// cluster more than once. diff --git a/dashboard/modules/snapshot/snapshot_head.py b/dashboard/modules/snapshot/snapshot_head.py index 3618c6f48..87082f546 100644 --- a/dashboard/modules/snapshot/snapshot_head.py +++ b/dashboard/modules/snapshot/snapshot_head.py @@ -73,7 +73,6 @@ class APIHead(dashboard_utils.DashboardHeadModule): for job_table_entry in reply.job_info_list: job_id = job_table_entry.job_id.hex() config = { - "env_vars": dict(job_table_entry.config.worker_env), "namespace": job_table_entry.config.ray_namespace, "metadata": dict(job_table_entry.config.metadata), "runtime_env": json.loads( diff --git a/dashboard/modules/snapshot/snapshot_schema.json b/dashboard/modules/snapshot/snapshot_schema.json index f66081311..4768c2a5e 100644 --- a/dashboard/modules/snapshot/snapshot_schema.json +++ b/dashboard/modules/snapshot/snapshot_schema.json @@ -39,9 +39,6 @@ "config": { "type": "object", "properties": { - "envVars": { - "type": "object" - }, "namespace": { "type": "string" }, @@ -53,7 +50,6 @@ } }, "required": [ - "envVars", "namespace", "metadata", "runtimeEnv" diff --git a/java/runtime/src/main/java/io/ray/runtime/RayNativeRuntime.java b/java/runtime/src/main/java/io/ray/runtime/RayNativeRuntime.java index acda82aa6..172ff78df 100644 --- a/java/runtime/src/main/java/io/ray/runtime/RayNativeRuntime.java +++ b/java/runtime/src/main/java/io/ray/runtime/RayNativeRuntime.java @@ -1,6 +1,7 @@ package io.ray.runtime; import com.google.common.base.Preconditions; +import com.google.gson.Gson; import io.ray.api.BaseActorHandle; import io.ray.api.id.ActorId; import io.ray.api.id.JobId; @@ -10,6 +11,7 @@ import io.ray.runtime.context.NativeWorkerContext; import io.ray.runtime.exception.RayIntentionalSystemExitException; import io.ray.runtime.gcs.GcsClient; import io.ray.runtime.gcs.GcsClientOptions; +import io.ray.runtime.generated.Common.RuntimeEnv; import io.ray.runtime.generated.Common.WorkerType; import io.ray.runtime.generated.Gcs.GcsNodeInfo; import io.ray.runtime.generated.Gcs.JobConfig; @@ -20,6 +22,8 @@ import io.ray.runtime.task.NativeTaskSubmitter; import io.ray.runtime.task.TaskExecutor; import io.ray.runtime.util.BinaryFileUtil; import io.ray.runtime.util.JniUtils; +import java.util.HashMap; +import java.util.Map; import java.util.Optional; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; @@ -102,8 +106,20 @@ public final class RayNativeRuntime extends AbstractRayRuntime { JobConfig.newBuilder() .setNumJavaWorkersPerProcess(rayConfig.numWorkersPerProcess) .addAllJvmOptions(rayConfig.jvmOptionsForJavaWorker) - .putAllWorkerEnv(rayConfig.workerEnv) .addAllCodeSearchPath(rayConfig.codeSearchPath); + RuntimeEnv.Builder runtimeEnvBuilder = RuntimeEnv.newBuilder(); + if (!rayConfig.workerEnv.isEmpty()) { + // TODO(SongGuyang): Suppport complete runtime env interface for users. + // Set worker env to the serialized runtime env json. + Gson gson = new Gson(); + Map> runtimeEnv = new HashMap<>(); + runtimeEnv.put("env_vars", rayConfig.workerEnv); + String gsonString = gson.toJson(runtimeEnv); + runtimeEnvBuilder.setSerializedRuntimeEnv(gsonString); + } else { + runtimeEnvBuilder.setSerializedRuntimeEnv("{}"); + } + jobConfigBuilder.setRuntimeEnv(runtimeEnvBuilder.build()); serializedJobConfig = jobConfigBuilder.build().toByteArray(); } diff --git a/python/ray/_private/parameter.py b/python/ray/_private/parameter.py index 430380860..24eace1a8 100644 --- a/python/ray/_private/parameter.py +++ b/python/ray/_private/parameter.py @@ -72,8 +72,8 @@ class RayParams: be created. worker_path (str): The path of the source code that will be run by the worker. - setup_worker_path (str): The path of the Python file that will run - worker_setup_hook to set up the environment for the worker process. + setup_worker_path (str): The path of the Python file that will set up + the environment for the worker process. huge_pages: Boolean flag indicating whether to start the Object Store with hugetlbfs support. Requires plasma_directory. include_dashboard: Boolean flag indicating whether to start the web diff --git a/python/ray/_private/runtime_env/context.py b/python/ray/_private/runtime_env/context.py index c0a27cc80..c5db64437 100644 --- a/python/ray/_private/runtime_env/context.py +++ b/python/ray/_private/runtime_env/context.py @@ -5,6 +5,7 @@ import sys from typing import Dict, List, Optional from ray.util.annotations import DeveloperAPI +from ray.core.generated.common_pb2 import Language logger = logging.getLogger(__name__) @@ -34,10 +35,13 @@ class RuntimeEnvContext: def deserialize(json_string): return RuntimeEnvContext(**json.loads(json_string)) - def exec_worker(self, passthrough_args: List[str]): + def exec_worker(self, passthrough_args: List[str], language: Language): os.environ.update(self.env_vars) - exec_command = " ".join([f"exec {self.py_executable}"] + - passthrough_args) + if language == Language.PYTHON: + executable = f"exec {self.py_executable}" + else: + executable = "exec" + exec_command = " ".join([executable] + passthrough_args) command_str = " && ".join(self.command_prefix + [exec_command]) logger.info(f"Exec'ing worker with command: {command_str}") os.execvp("bash", ["bash", "-c", command_str]) diff --git a/python/ray/_private/services.py b/python/ray/_private/services.py index 1bfcc235f..04b63ec92 100644 --- a/python/ray/_private/services.py +++ b/python/ray/_private/services.py @@ -21,6 +21,7 @@ from typing import Optional import ray import ray.ray_constants as ray_constants import redis +from ray.core.generated.common_pb2 import Language # Import psutil and colorama after ray so the packaged version is used. import colorama @@ -1373,8 +1374,8 @@ def start_raylet(redis_address, to. worker_path (str): The path of the Python file that new worker processes will execute. - setup_worker_path (str): The path of the Python file that will run - worker_setup_hook to set up the environment for the worker process. + setup_worker_path (str): The path of the Python file that will set up + the environment for the worker process. temp_dir (str): The path of the temporary directory Ray will use. session_dir (str): The path of this session. resource_dir(str): The path of resource of this session . @@ -1450,6 +1451,7 @@ def start_raylet(redis_address, redis_password, session_dir, node_ip_address, + setup_worker_path, ) else: java_worker_command = [] @@ -1604,6 +1606,7 @@ def build_java_worker_command( redis_password, session_dir, node_ip_address, + setup_worker_path, ): """This method assembles the command used to start a Java worker. @@ -1615,6 +1618,8 @@ def build_java_worker_command( redis_password (str): The password of connect to redis. session_dir (str): The path of this session. node_ip_address (str): The ip address for this node. + setup_worker_path (str): The path of the Python file that will set up + the environment for the worker process. Returns: The command string for starting Java worker. """ @@ -1639,7 +1644,9 @@ def build_java_worker_command( pairs.append(("ray.home", RAY_HOME)) pairs.append(("ray.logging.dir", os.path.join(session_dir, "logs"))) pairs.append(("ray.session-dir", session_dir)) - command = ["java"] + ["-D{}={}".format(*pair) for pair in pairs] + command = [sys.executable] + [setup_worker_path] + ["java"] + [ + "-D{}={}".format(*pair) for pair in pairs + ] # Add ray jars path to java classpath ray_jars = os.path.join(get_ray_jars_dir(), "*") @@ -1921,9 +1928,14 @@ def start_ray_client_server( ray_constants.SETUP_WORKER_FILENAME) command = [ - sys.executable, setup_worker_path, "-m", "ray.util.client.server", - f"--redis-address={redis_address}", f"--port={ray_client_server_port}", - f"--mode={server_type}" + sys.executable, + setup_worker_path, + "-m", + "ray.util.client.server", + f"--redis-address={redis_address}", + f"--port={ray_client_server_port}", + f"--mode={server_type}", + f"--language={Language.Name(Language.PYTHON)}", ] if redis_password: command.append(f"--redis-password={redis_password}") diff --git a/python/ray/tests/mock_setup_worker.py b/python/ray/tests/mock_setup_worker.py index a19a9ce22..7cd981b9a 100644 --- a/python/ray/tests/mock_setup_worker.py +++ b/python/ray/tests/mock_setup_worker.py @@ -30,6 +30,9 @@ parser.add_argument( parser.add_argument( "--session-dir", type=str, help="the directory for the current session") +parser.add_argument( + "--language", type=str, help="the language type of the worker") + args, remaining_args = parser.parse_known_args() # add worker-shim-pid argument diff --git a/python/ray/workers/setup_worker.py b/python/ray/workers/setup_worker.py index 3c97682b6..b40737c1a 100644 --- a/python/ray/workers/setup_worker.py +++ b/python/ray/workers/setup_worker.py @@ -4,6 +4,7 @@ import logging import os from ray._private.runtime_env.context import RuntimeEnvContext +from ray.core.generated.common_pb2 import Language logger = logging.getLogger(__name__) @@ -26,6 +27,9 @@ parser.add_argument( type=str, help="the worker allocated resource") +parser.add_argument( + "--language", type=str, help="the language type of the worker") + def get_tmp_dir(remaining_args): for arg in remaining_args: @@ -117,5 +121,5 @@ if __name__ == "__main__": # probably not even go through this codepath. runtime_env_context = RuntimeEnvContext.deserialize( args.serialized_runtime_env_context or "{}") - - runtime_env_context.exec_worker(remaining_args) + runtime_env_context.exec_worker(remaining_args, + Language.Value(args.language)) diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index b9f4d6b95..4270a31f5 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -32,30 +32,6 @@ namespace { // Duration between internal book-keeping heartbeats. const uint64_t kInternalHeartbeatMillis = 1000; -void BuildCommonTaskSpec( - TaskSpecBuilder &builder, const JobID &job_id, const TaskID &task_id, - const std::string name, const TaskID ¤t_task_id, const uint64_t task_index, - const TaskID &caller_id, const rpc::Address &address, const RayFunction &function, - const std::vector> &args, uint64_t num_returns, - const std::unordered_map &required_resources, - const std::unordered_map &required_placement_resources, - const BundleID &bundle_id, bool placement_group_capture_child_tasks, - const std::string debugger_breakpoint, const std::string &serialized_runtime_env, - const std::vector &runtime_env_uris, - const std::string &concurrency_group_name = "") { - // Build common task spec. - builder.SetCommonTaskSpec( - task_id, name, function.GetLanguage(), function.GetFunctionDescriptor(), job_id, - current_task_id, task_index, caller_id, address, num_returns, required_resources, - required_placement_resources, bundle_id, placement_group_capture_child_tasks, - debugger_breakpoint, serialized_runtime_env, runtime_env_uris, - concurrency_group_name); - // Set task arguments. - for (const auto &arg : args) { - builder.AddArg(*arg); - } -} - JobID GetProcessJobID(const CoreWorkerOptions &options) { if (options.worker_type == WorkerType::DRIVER) { RAY_CHECK(!options.job_id.IsNil()); @@ -1642,6 +1618,37 @@ std::unordered_map AddPlacementGroupConstraint( return resources; } +void CoreWorker::BuildCommonTaskSpec( + TaskSpecBuilder &builder, const JobID &job_id, const TaskID &task_id, + const std::string &name, const TaskID ¤t_task_id, uint64_t task_index, + const TaskID &caller_id, const rpc::Address &address, const RayFunction &function, + const std::vector> &args, uint64_t num_returns, + const std::unordered_map &required_resources, + const std::unordered_map &required_placement_resources, + const BundleID &bundle_id, bool placement_group_capture_child_tasks, + const std::string &debugger_breakpoint, const std::string &serialized_runtime_env, + const std::vector &runtime_env_uris, + const std::string &concurrency_group_name) { + // Build common task spec. + builder.SetCommonTaskSpec( + task_id, name, function.GetLanguage(), function.GetFunctionDescriptor(), job_id, + current_task_id, task_index, caller_id, address, num_returns, required_resources, + required_placement_resources, bundle_id, placement_group_capture_child_tasks, + debugger_breakpoint, + // TODO(SongGuyang): Move the logic of `prepare_runtime_env` from Python to Core + // Worker. A common process is needed. + // If runtime env is not provided, use job config. Only for Java and C++ because it + // has been set in Python by `prepare_runtime_env`. + (serialized_runtime_env.empty() || serialized_runtime_env == "{}") + ? job_config_->runtime_env().serialized_runtime_env() + : serialized_runtime_env, + runtime_env_uris, concurrency_group_name); + // Set task arguments. + for (const auto &arg : args) { + builder.AddArg(*arg); + } +} + std::vector CoreWorker::SubmitTask( const RayFunction &function, const std::vector> &args, const TaskOptions &task_options, int max_retries, bool retry_exceptions, diff --git a/src/ray/core_worker/core_worker.h b/src/ray/core_worker/core_worker.h index b5c584ab3..883a1b013 100644 --- a/src/ray/core_worker/core_worker.h +++ b/src/ray/core_worker/core_worker.h @@ -1040,6 +1040,17 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler { std::unordered_map> GetActorCallStats() const; private: + void BuildCommonTaskSpec( + TaskSpecBuilder &builder, const JobID &job_id, const TaskID &task_id, + const std::string &name, const TaskID ¤t_task_id, uint64_t task_index, + const TaskID &caller_id, const rpc::Address &address, const RayFunction &function, + const std::vector> &args, uint64_t num_returns, + const std::unordered_map &required_resources, + const std::unordered_map &required_placement_resources, + const BundleID &bundle_id, bool placement_group_capture_child_tasks, + const std::string &debugger_breakpoint, const std::string &serialized_runtime_env, + const std::vector &runtime_env_uris, + const std::string &concurrency_group_name = ""); void SetCurrentTaskId(const TaskID &task_id); void SetActorId(const ActorID &actor_id); diff --git a/src/ray/protobuf/gcs.proto b/src/ray/protobuf/gcs.proto index a7bab3470..5f35c1a21 100644 --- a/src/ray/protobuf/gcs.proto +++ b/src/ray/protobuf/gcs.proto @@ -277,24 +277,20 @@ message TaskLeaseData { } message JobConfig { - // Environment variables to be set on worker processes. - // TODO(edoakes): this is only used by Java. Once Java moves to runtime_env we - // should remove worker_env. - map worker_env = 1; // The number of java workers per worker process. - uint32 num_java_workers_per_process = 2; + uint32 num_java_workers_per_process = 1; // The jvm options for java workers of the job. - repeated string jvm_options = 3; + repeated string jvm_options = 2; // A list of directories or files (jar files or dynamic libraries) that specify the // search path for user code. This will be used as `CLASSPATH` in Java, and `PYTHONPATH` // in Python. In C++, libraries under these paths will be loaded by 'dlopen'. - repeated string code_search_path = 4; + repeated string code_search_path = 3; // Runtime environment to run the code - RuntimeEnv runtime_env = 5; + RuntimeEnv runtime_env = 4; // The job's namespace. Named `ray_namespace` to avoid confusions when invoked in c++. - string ray_namespace = 6; + string ray_namespace = 5; // An opaque kv store for job related metadata. - map metadata = 7; + map metadata = 6; } message JobTableData { diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index e6d04e2c2..5970d06a2 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -527,7 +527,7 @@ void NodeManager::DestroyWorker(std::shared_ptr worker, } void NodeManager::HandleJobStarted(const JobID &job_id, const JobTableData &job_data) { - RAY_LOG(DEBUG) << "HandleJobStarted " << job_id; + RAY_LOG(DEBUG) << "HandleJobStarted for job " << job_id; worker_pool_.HandleJobStarted(job_id, job_data.config()); // NOTE: Technically `HandleJobStarted` isn't idempotent because we'll // increment the ref count multiple times. This is fine because diff --git a/src/ray/raylet/worker_pool.cc b/src/ray/raylet/worker_pool.cc index 18665b17c..959cc551f 100644 --- a/src/ray/raylet/worker_pool.cc +++ b/src/ray/raylet/worker_pool.cc @@ -313,37 +313,40 @@ Process WorkerPool::StartWorkerProcess( env.emplace(kEnvVarKeyJobId, job_id.Hex()); } - // TODO(edoakes): this is only used by Java. Once Java moves to runtime_env we - // should remove worker_env. - if (job_config) { - env.insert(job_config->worker_env().begin(), job_config->worker_env().end()); - } - - if (language == Language::PYTHON) { + if (language == Language::PYTHON || language == Language::JAVA) { if (serialized_runtime_env != "{}" && serialized_runtime_env != "") { worker_command_args.push_back("--serialized-runtime-env=" + serialized_runtime_env); // Allocated_resource_json is only used in "shim process". worker_command_args.push_back("--allocated-instances-serialized-json=" + allocated_instances_serialized_json); + + worker_command_args.push_back("--language=" + Language_Name(language)); + + worker_command_args.push_back("--runtime-env-hash=" + + std::to_string(runtime_env_hash)); + + if (serialized_runtime_env_context != "{}" && + !serialized_runtime_env_context.empty()) { + worker_command_args.push_back("--serialized-runtime-env-context=" + + serialized_runtime_env_context); + } } else { // The "shim process" setup worker is not needed, so do not run it. // Check that the arg really is the path to the setup worker before erasing it, to // prevent breaking tests that mock out the worker command args. if (worker_command_args.size() >= 2 && worker_command_args[1].find(kSetupWorkerFilename) != std::string::npos) { - worker_command_args.erase(worker_command_args.begin() + 1, - worker_command_args.begin() + 2); + if (language == Language::PYTHON) { + worker_command_args.erase(worker_command_args.begin() + 1, + worker_command_args.begin() + 2); + } else { + // Erase the python executable as well for other languages. + worker_command_args.erase(worker_command_args.begin(), + worker_command_args.begin() + 2); + } } } - worker_command_args.push_back("--runtime-env-hash=" + - std::to_string(runtime_env_hash)); - - if (serialized_runtime_env_context != "{}" && serialized_runtime_env_context != "") { - worker_command_args.push_back("--serialized-runtime-env-context=" + - serialized_runtime_env_context); - } - if (ray_debugger_external) { worker_command_args.push_back("--ray-debugger-external"); } @@ -765,7 +768,7 @@ void WorkerPool::PushWorker(const std::shared_ptr &worker) { // The worker is used for the actor creation task with dynamic options. if (!used) { // Put it into idle dedicated worker pool. - // TODO(guyang.sgy): This worker will not be used forever. We should kill it. + // TODO(SongGuyang): This worker will not be used forever. We should kill it. state.idle_dedicated_workers[task_id] = worker; } return; @@ -964,7 +967,7 @@ void WorkerPool::PopWorker(const TaskSpecification &task_spec, state.starting_workers_to_tasks[proc] = std::move(task_info); } } else { - // TODO(guyang.sgy): Wait until a worker is pushed or a worker can be started If + // TODO(SongGuyang): Wait until a worker is pushed or a worker can be started If // startup concurrency maxed out or job not started. PopWorkerCallbackAsync(callback, nullptr, status); } diff --git a/src/ray/raylet/worker_pool_test.cc b/src/ray/raylet/worker_pool_test.cc index 4ff383a82..37fb903b4 100644 --- a/src/ray/raylet/worker_pool_test.cc +++ b/src/ray/raylet/worker_pool_test.cc @@ -258,7 +258,7 @@ class WorkerPoolMock : public WorkerPool { is_java = true; } } - // TODO(guyang.sgy): support C++ language workers. + // TODO(SongGuyang): support C++ language workers. int num_workers = is_java ? NUM_WORKERS_PER_PROCESS_JAVA : 1; for (int i = 0; i < num_workers; i++) { auto worker = diff --git a/src/ray/util/event.h b/src/ray/util/event.h index 9caed946f..4f2e98a44 100644 --- a/src/ray/util/event.h +++ b/src/ray/util/event.h @@ -13,6 +13,8 @@ // limitations under the License. #pragma once +#include + #include #include #include @@ -22,6 +24,8 @@ #include #include #include + +#include "nlohmann/json.hpp" #include "ray/util/logging.h" #include "ray/util/util.h" #include "spdlog/sinks/basic_file_sink.h" @@ -29,10 +33,6 @@ #include "spdlog/spdlog.h" #include "src/ray/protobuf/event.pb.h" -#include "nlohmann/json.hpp" - -#include - using json = nlohmann::json; namespace ray { @@ -102,7 +102,7 @@ class EventManager final { // We added `const json &custom_fields` here because we need to support typed custom // fields. - // TODO(guyang.sgy): Remove the protobuf `rpc::Event` and use an internal struct + // TODO(SongGuyang): Remove the protobuf `rpc::Event` and use an internal struct // instead. void Publish(const rpc::Event &event, const json &custom_fields); diff --git a/streaming/src/test/mock_actor.cc b/streaming/src/test/mock_actor.cc index c51b1a8a1..5e5b57522 100644 --- a/streaming/src/test/mock_actor.cc +++ b/streaming/src/test/mock_actor.cc @@ -639,7 +639,7 @@ class StreamingWorker { } // namespace ray int main(int argc, char **argv) { - RAY_CHECK(argc == 5); + RAY_CHECK(argc >= 4); auto store_socket = std::string(argv[1]); auto raylet_socket = std::string(argv[2]); auto node_manager_port = std::stoi(std::string(argv[3]));