[runtime env] move worker env to runtime env in Java (#19060)

This commit is contained in:
Guyang Song 2021-10-11 17:25:09 +08:00 committed by GitHub
parent 0c4603f836
commit ab55b808c5
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
23 changed files with 141 additions and 88 deletions

View file

@ -40,7 +40,7 @@ void Init() {
bool IsInitialized() { return is_init_; }
void Shutdown() {
// TODO(guyang.sgy): Clean the ray runtime.
// TODO(SongGuyang): Clean the ray runtime.
internal::AbstractRayRuntime::DoShutdown();
is_init_ = false;
}

View file

@ -145,7 +145,7 @@ InvocationSpec BuildInvocationSpec1(TaskType task_type,
InvocationSpec invocation_spec;
invocation_spec.task_type = task_type;
invocation_spec.task_id =
TaskID::ForFakeTask(); // TODO(Guyang Song): make it from different task
TaskID::ForFakeTask(); // TODO(SongGuyang): make it from different task
invocation_spec.remote_function_holder = remote_function_holder;
invocation_spec.actor_id = actor;
invocation_spec.args = TransformArgs(args);

View file

@ -116,7 +116,7 @@ std::vector<bool> NativeObjectStore::Wait(const std::vector<ObjectID> &ids,
int num_objects, int timeout_ms) {
std::vector<bool> results;
auto &core_worker = CoreWorkerProcess::GetCoreWorker();
// TODO(guyang.sgy): Support `fetch_local` option in API.
// TODO(SongGuyang): Support `fetch_local` option in API.
// Simply set `fetch_local` to be true.
::ray::Status status = core_worker.Wait(ids, num_objects, timeout_ms, &results, true);
if (!status.ok()) {

View file

@ -32,7 +32,7 @@ LocalModeTaskSubmitter::LocalModeTaskSubmitter(
ObjectID LocalModeTaskSubmitter::Submit(InvocationSpec &invocation,
const ActorCreationOptions &options) {
/// TODO(Guyang Song): Make the information of TaskSpecification more reasonable
/// TODO(SongGuyang): Make the information of TaskSpecification more reasonable
/// We just reuse the TaskSpecification class and make the single process mode work.
/// Maybe some information in TaskSpecification is not reasonable or is invalid.
/// We will enhance this after implement the cluster mode.
@ -82,7 +82,7 @@ ObjectID LocalModeTaskSubmitter::Submit(InvocationSpec &invocation,
AbstractRayRuntime *runtime = &local_mode_ray_tuntime_;
if (invocation.task_type == TaskType::ACTOR_CREATION_TASK ||
invocation.task_type == TaskType::ACTOR_TASK) {
/// TODO(Guyang Song): Handle task dependencies.
/// TODO(SongGuyang): Handle task dependencies.
/// Execute actor task directly in the main thread because we must guarantee the actor
/// task executed by calling order.
TaskExecutor::Invoke(task_specification, actor, runtime, actor_contexts_,

View file

@ -75,7 +75,7 @@ std::shared_ptr<msgpack::sbuffer> TaskExecutor::current_actor_ = nullptr;
TaskExecutor::TaskExecutor(AbstractRayRuntime &abstract_ray_tuntime_)
: abstract_ray_tuntime_(abstract_ray_tuntime_) {}
// TODO(Guyang Song): Make a common task execution function used for both local mode and
// TODO(SongGuyang): Make a common task execution function used for both local mode and
// cluster mode.
std::unique_ptr<ObjectID> TaskExecutor::Execute(InvocationSpec &invocation) {
abstract_ray_tuntime_.GetWorkerContext();

View file

@ -16,8 +16,10 @@
#include <ray/api/function_manager.h>
#include <ray/api/serializer.h>
#include <boost/dll.hpp>
#include <memory>
#include "absl/synchronization/mutex.h"
#include "invocation_spec.h"
#include "ray/common/id.h"
@ -62,7 +64,7 @@ class TaskExecutor {
public:
TaskExecutor(AbstractRayRuntime &abstract_ray_tuntime_);
/// TODO(Guyang Song): support multiple tasks execution
/// TODO(SongGuyang): support multiple tasks execution
std::unique_ptr<ObjectID> Execute(InvocationSpec &invocation);
static void Invoke(

View file

@ -125,7 +125,7 @@ void ProcessHelper::RayStart(CoreWorkerOptions::TaskExecutionCallback callback)
if (!ConfigInternal::Instance().job_id.empty()) {
options.job_id = JobID::FromHex(ConfigInternal::Instance().job_id);
} else {
/// TODO(Guyang Song): Get next job id from core worker by GCS client.
/// TODO(SongGuyang): Get next job id from core worker by GCS client.
/// Random a number to avoid repeated job ids.
/// The repeated job ids will lead to task hang when driver connects to an existing
/// cluster more than once.

View file

@ -73,7 +73,6 @@ class APIHead(dashboard_utils.DashboardHeadModule):
for job_table_entry in reply.job_info_list:
job_id = job_table_entry.job_id.hex()
config = {
"env_vars": dict(job_table_entry.config.worker_env),
"namespace": job_table_entry.config.ray_namespace,
"metadata": dict(job_table_entry.config.metadata),
"runtime_env": json.loads(

View file

@ -39,9 +39,6 @@
"config": {
"type": "object",
"properties": {
"envVars": {
"type": "object"
},
"namespace": {
"type": "string"
},
@ -53,7 +50,6 @@
}
},
"required": [
"envVars",
"namespace",
"metadata",
"runtimeEnv"

View file

@ -1,6 +1,7 @@
package io.ray.runtime;
import com.google.common.base.Preconditions;
import com.google.gson.Gson;
import io.ray.api.BaseActorHandle;
import io.ray.api.id.ActorId;
import io.ray.api.id.JobId;
@ -10,6 +11,7 @@ import io.ray.runtime.context.NativeWorkerContext;
import io.ray.runtime.exception.RayIntentionalSystemExitException;
import io.ray.runtime.gcs.GcsClient;
import io.ray.runtime.gcs.GcsClientOptions;
import io.ray.runtime.generated.Common.RuntimeEnv;
import io.ray.runtime.generated.Common.WorkerType;
import io.ray.runtime.generated.Gcs.GcsNodeInfo;
import io.ray.runtime.generated.Gcs.JobConfig;
@ -20,6 +22,8 @@ import io.ray.runtime.task.NativeTaskSubmitter;
import io.ray.runtime.task.TaskExecutor;
import io.ray.runtime.util.BinaryFileUtil;
import io.ray.runtime.util.JniUtils;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
@ -102,8 +106,20 @@ public final class RayNativeRuntime extends AbstractRayRuntime {
JobConfig.newBuilder()
.setNumJavaWorkersPerProcess(rayConfig.numWorkersPerProcess)
.addAllJvmOptions(rayConfig.jvmOptionsForJavaWorker)
.putAllWorkerEnv(rayConfig.workerEnv)
.addAllCodeSearchPath(rayConfig.codeSearchPath);
RuntimeEnv.Builder runtimeEnvBuilder = RuntimeEnv.newBuilder();
if (!rayConfig.workerEnv.isEmpty()) {
// TODO(SongGuyang): Support complete runtime env interface for users.
// Set worker env to the serialized runtime env json.
Gson gson = new Gson();
Map<String, Map<String, String>> runtimeEnv = new HashMap<>();
runtimeEnv.put("env_vars", rayConfig.workerEnv);
String gsonString = gson.toJson(runtimeEnv);
runtimeEnvBuilder.setSerializedRuntimeEnv(gsonString);
} else {
runtimeEnvBuilder.setSerializedRuntimeEnv("{}");
}
jobConfigBuilder.setRuntimeEnv(runtimeEnvBuilder.build());
serializedJobConfig = jobConfigBuilder.build().toByteArray();
}

View file

@ -72,8 +72,8 @@ class RayParams:
be created.
worker_path (str): The path of the source code that will be run by the
worker.
setup_worker_path (str): The path of the Python file that will run
worker_setup_hook to set up the environment for the worker process.
setup_worker_path (str): The path of the Python file that will set up
the environment for the worker process.
huge_pages: Boolean flag indicating whether to start the Object
Store with hugetlbfs support. Requires plasma_directory.
include_dashboard: Boolean flag indicating whether to start the web

View file

@ -5,6 +5,7 @@ import sys
from typing import Dict, List, Optional
from ray.util.annotations import DeveloperAPI
from ray.core.generated.common_pb2 import Language
logger = logging.getLogger(__name__)
@ -34,10 +35,13 @@ class RuntimeEnvContext:
def deserialize(json_string):
return RuntimeEnvContext(**json.loads(json_string))
def exec_worker(self, passthrough_args: List[str]):
def exec_worker(self, passthrough_args: List[str], language: Language):
os.environ.update(self.env_vars)
exec_command = " ".join([f"exec {self.py_executable}"] +
passthrough_args)
if language == Language.PYTHON:
executable = f"exec {self.py_executable}"
else:
executable = "exec"
exec_command = " ".join([executable] + passthrough_args)
command_str = " && ".join(self.command_prefix + [exec_command])
logger.info(f"Exec'ing worker with command: {command_str}")
os.execvp("bash", ["bash", "-c", command_str])

View file

@ -21,6 +21,7 @@ from typing import Optional
import ray
import ray.ray_constants as ray_constants
import redis
from ray.core.generated.common_pb2 import Language
# Import psutil and colorama after ray so the packaged version is used.
import colorama
@ -1373,8 +1374,8 @@ def start_raylet(redis_address,
to.
worker_path (str): The path of the Python file that new worker
processes will execute.
setup_worker_path (str): The path of the Python file that will run
worker_setup_hook to set up the environment for the worker process.
setup_worker_path (str): The path of the Python file that will set up
the environment for the worker process.
temp_dir (str): The path of the temporary directory Ray will use.
session_dir (str): The path of this session.
resource_dir(str): The path of resource of this session .
@ -1450,6 +1451,7 @@ def start_raylet(redis_address,
redis_password,
session_dir,
node_ip_address,
setup_worker_path,
)
else:
java_worker_command = []
@ -1604,6 +1606,7 @@ def build_java_worker_command(
redis_password,
session_dir,
node_ip_address,
setup_worker_path,
):
"""This method assembles the command used to start a Java worker.
@ -1615,6 +1618,8 @@ def build_java_worker_command(
redis_password (str): The password used to connect to redis.
session_dir (str): The path of this session.
node_ip_address (str): The ip address for this node.
setup_worker_path (str): The path of the Python file that will set up
the environment for the worker process.
Returns:
The command string for starting Java worker.
"""
@ -1639,7 +1644,9 @@ def build_java_worker_command(
pairs.append(("ray.home", RAY_HOME))
pairs.append(("ray.logging.dir", os.path.join(session_dir, "logs")))
pairs.append(("ray.session-dir", session_dir))
command = ["java"] + ["-D{}={}".format(*pair) for pair in pairs]
command = [sys.executable] + [setup_worker_path] + ["java"] + [
"-D{}={}".format(*pair) for pair in pairs
]
# Add ray jars path to java classpath
ray_jars = os.path.join(get_ray_jars_dir(), "*")
@ -1921,9 +1928,14 @@ def start_ray_client_server(
ray_constants.SETUP_WORKER_FILENAME)
command = [
sys.executable, setup_worker_path, "-m", "ray.util.client.server",
f"--redis-address={redis_address}", f"--port={ray_client_server_port}",
f"--mode={server_type}"
sys.executable,
setup_worker_path,
"-m",
"ray.util.client.server",
f"--redis-address={redis_address}",
f"--port={ray_client_server_port}",
f"--mode={server_type}",
f"--language={Language.Name(Language.PYTHON)}",
]
if redis_password:
command.append(f"--redis-password={redis_password}")

View file

@ -30,6 +30,9 @@ parser.add_argument(
parser.add_argument(
"--session-dir", type=str, help="the directory for the current session")
parser.add_argument(
"--language", type=str, help="the language type of the worker")
args, remaining_args = parser.parse_known_args()
# add worker-shim-pid argument

View file

@ -4,6 +4,7 @@ import logging
import os
from ray._private.runtime_env.context import RuntimeEnvContext
from ray.core.generated.common_pb2 import Language
logger = logging.getLogger(__name__)
@ -26,6 +27,9 @@ parser.add_argument(
type=str,
help="the worker allocated resource")
parser.add_argument(
"--language", type=str, help="the language type of the worker")
def get_tmp_dir(remaining_args):
for arg in remaining_args:
@ -117,5 +121,5 @@ if __name__ == "__main__":
# probably not even go through this codepath.
runtime_env_context = RuntimeEnvContext.deserialize(
args.serialized_runtime_env_context or "{}")
runtime_env_context.exec_worker(remaining_args)
runtime_env_context.exec_worker(remaining_args,
Language.Value(args.language))

View file

@ -32,30 +32,6 @@ namespace {
// Duration between internal book-keeping heartbeats.
const uint64_t kInternalHeartbeatMillis = 1000;
/// Populate `builder` with the fields shared by every kind of task and then
/// append the task arguments.
///
/// \param builder The task spec builder to fill in.
/// \param function Supplies the language and function descriptor of the task.
/// \param args Task arguments; each one is added to `builder` in order.
/// \param serialized_runtime_env Forwarded unchanged to the builder.
/// \param concurrency_group_name Optional; defaults to the empty string.
///
/// All remaining parameters are forwarded as-is to
/// `TaskSpecBuilder::SetCommonTaskSpec`.
///
/// `name` and `debugger_breakpoint` are taken by const reference: they are only
/// forwarded, so taking them by value would cost a string copy on every call.
void BuildCommonTaskSpec(
    TaskSpecBuilder &builder, const JobID &job_id, const TaskID &task_id,
    const std::string &name, const TaskID &current_task_id, const uint64_t task_index,
    const TaskID &caller_id, const rpc::Address &address, const RayFunction &function,
    const std::vector<std::unique_ptr<TaskArg>> &args, uint64_t num_returns,
    const std::unordered_map<std::string, double> &required_resources,
    const std::unordered_map<std::string, double> &required_placement_resources,
    const BundleID &bundle_id, bool placement_group_capture_child_tasks,
    const std::string &debugger_breakpoint, const std::string &serialized_runtime_env,
    const std::vector<std::string> &runtime_env_uris,
    const std::string &concurrency_group_name = "") {
  // Build common task spec.
  builder.SetCommonTaskSpec(
      task_id, name, function.GetLanguage(), function.GetFunctionDescriptor(), job_id,
      current_task_id, task_index, caller_id, address, num_returns, required_resources,
      required_placement_resources, bundle_id, placement_group_capture_child_tasks,
      debugger_breakpoint, serialized_runtime_env, runtime_env_uris,
      concurrency_group_name);
  // Set task arguments.
  for (const auto &arg : args) {
    builder.AddArg(*arg);
  }
}
JobID GetProcessJobID(const CoreWorkerOptions &options) {
if (options.worker_type == WorkerType::DRIVER) {
RAY_CHECK(!options.job_id.IsNil());
@ -1642,6 +1618,37 @@ std::unordered_map<std::string, double> AddPlacementGroupConstraint(
return resources;
}
void CoreWorker::BuildCommonTaskSpec(
    TaskSpecBuilder &builder, const JobID &job_id, const TaskID &task_id,
    const std::string &name, const TaskID &current_task_id, uint64_t task_index,
    const TaskID &caller_id, const rpc::Address &address, const RayFunction &function,
    const std::vector<std::unique_ptr<TaskArg>> &args, uint64_t num_returns,
    const std::unordered_map<std::string, double> &required_resources,
    const std::unordered_map<std::string, double> &required_placement_resources,
    const BundleID &bundle_id, bool placement_group_capture_child_tasks,
    const std::string &debugger_breakpoint, const std::string &serialized_runtime_env,
    const std::vector<std::string> &runtime_env_uris,
    const std::string &concurrency_group_name) {
  // TODO(SongGuyang): Move the logic of `prepare_runtime_env` from Python to Core
  // Worker. A common process is needed.
  // When the caller passes no runtime env (empty string or "{}"), fall back to the
  // one from the job config. This only matters for Java and C++, because Python has
  // already set it through `prepare_runtime_env`.
  const bool runtime_env_missing =
      serialized_runtime_env.empty() || serialized_runtime_env == "{}";
  const std::string &effective_runtime_env =
      runtime_env_missing ? job_config_->runtime_env().serialized_runtime_env()
                          : serialized_runtime_env;

  // Fill in the fields shared by every task type.
  builder.SetCommonTaskSpec(
      task_id, name, function.GetLanguage(), function.GetFunctionDescriptor(), job_id,
      current_task_id, task_index, caller_id, address, num_returns, required_resources,
      required_placement_resources, bundle_id, placement_group_capture_child_tasks,
      debugger_breakpoint, effective_runtime_env, runtime_env_uris,
      concurrency_group_name);

  // Append the task arguments in the order they were given.
  for (const std::unique_ptr<TaskArg> &task_arg : args) {
    builder.AddArg(*task_arg);
  }
}
std::vector<rpc::ObjectReference> CoreWorker::SubmitTask(
const RayFunction &function, const std::vector<std::unique_ptr<TaskArg>> &args,
const TaskOptions &task_options, int max_retries, bool retry_exceptions,

View file

@ -1040,6 +1040,17 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler {
std::unordered_map<std::string, std::vector<uint64_t>> GetActorCallStats() const;
private:
/// Fill `builder` with the task spec fields common to all task types and add the
/// task arguments.
///
/// If `serialized_runtime_env` is empty or "{}", the serialized runtime env of the
/// job config is used instead.
///
/// \param builder The task spec builder to populate.
/// \param function Provides the language and function descriptor of the task.
/// \param args Task arguments; each one is added to `builder`.
/// \param serialized_runtime_env Runtime env of this task; may be empty or "{}".
/// \param concurrency_group_name Optional; defaults to the empty string.
void BuildCommonTaskSpec(
TaskSpecBuilder &builder, const JobID &job_id, const TaskID &task_id,
const std::string &name, const TaskID &current_task_id, uint64_t task_index,
const TaskID &caller_id, const rpc::Address &address, const RayFunction &function,
const std::vector<std::unique_ptr<TaskArg>> &args, uint64_t num_returns,
const std::unordered_map<std::string, double> &required_resources,
const std::unordered_map<std::string, double> &required_placement_resources,
const BundleID &bundle_id, bool placement_group_capture_child_tasks,
const std::string &debugger_breakpoint, const std::string &serialized_runtime_env,
const std::vector<std::string> &runtime_env_uris,
const std::string &concurrency_group_name = "");
void SetCurrentTaskId(const TaskID &task_id);
void SetActorId(const ActorID &actor_id);

View file

@ -277,24 +277,20 @@ message TaskLeaseData {
}
message JobConfig {
// Environment variables to be set on worker processes.
// TODO(edoakes): this is only used by Java. Once Java moves to runtime_env we
// should remove worker_env.
map<string, string> worker_env = 1;
// The number of java workers per worker process.
uint32 num_java_workers_per_process = 2;
uint32 num_java_workers_per_process = 1;
// The jvm options for java workers of the job.
repeated string jvm_options = 3;
repeated string jvm_options = 2;
// A list of directories or files (jar files or dynamic libraries) that specify the
// search path for user code. This will be used as `CLASSPATH` in Java, and `PYTHONPATH`
// in Python. In C++, libraries under these paths will be loaded by 'dlopen'.
repeated string code_search_path = 4;
repeated string code_search_path = 3;
// Runtime environment to run the code
RuntimeEnv runtime_env = 5;
RuntimeEnv runtime_env = 4;
// The job's namespace. Named `ray_namespace` to avoid confusions when invoked in c++.
string ray_namespace = 6;
string ray_namespace = 5;
// An opaque kv store for job related metadata.
map<string, string> metadata = 7;
map<string, string> metadata = 6;
}
message JobTableData {

View file

@ -527,7 +527,7 @@ void NodeManager::DestroyWorker(std::shared_ptr<WorkerInterface> worker,
}
void NodeManager::HandleJobStarted(const JobID &job_id, const JobTableData &job_data) {
RAY_LOG(DEBUG) << "HandleJobStarted " << job_id;
RAY_LOG(DEBUG) << "HandleJobStarted for job " << job_id;
worker_pool_.HandleJobStarted(job_id, job_data.config());
// NOTE: Technically `HandleJobStarted` isn't idempotent because we'll
// increment the ref count multiple times. This is fine because

View file

@ -313,37 +313,40 @@ Process WorkerPool::StartWorkerProcess(
env.emplace(kEnvVarKeyJobId, job_id.Hex());
}
// TODO(edoakes): this is only used by Java. Once Java moves to runtime_env we
// should remove worker_env.
if (job_config) {
env.insert(job_config->worker_env().begin(), job_config->worker_env().end());
}
if (language == Language::PYTHON) {
if (language == Language::PYTHON || language == Language::JAVA) {
if (serialized_runtime_env != "{}" && serialized_runtime_env != "") {
worker_command_args.push_back("--serialized-runtime-env=" + serialized_runtime_env);
// Allocated_resource_json is only used in "shim process".
worker_command_args.push_back("--allocated-instances-serialized-json=" +
allocated_instances_serialized_json);
worker_command_args.push_back("--language=" + Language_Name(language));
worker_command_args.push_back("--runtime-env-hash=" +
std::to_string(runtime_env_hash));
if (serialized_runtime_env_context != "{}" &&
!serialized_runtime_env_context.empty()) {
worker_command_args.push_back("--serialized-runtime-env-context=" +
serialized_runtime_env_context);
}
} else {
// The "shim process" setup worker is not needed, so do not run it.
// Check that the arg really is the path to the setup worker before erasing it, to
// prevent breaking tests that mock out the worker command args.
if (worker_command_args.size() >= 2 &&
worker_command_args[1].find(kSetupWorkerFilename) != std::string::npos) {
worker_command_args.erase(worker_command_args.begin() + 1,
worker_command_args.begin() + 2);
if (language == Language::PYTHON) {
worker_command_args.erase(worker_command_args.begin() + 1,
worker_command_args.begin() + 2);
} else {
// Erase the python executable as well for other languages.
worker_command_args.erase(worker_command_args.begin(),
worker_command_args.begin() + 2);
}
}
}
worker_command_args.push_back("--runtime-env-hash=" +
std::to_string(runtime_env_hash));
if (serialized_runtime_env_context != "{}" && serialized_runtime_env_context != "") {
worker_command_args.push_back("--serialized-runtime-env-context=" +
serialized_runtime_env_context);
}
if (ray_debugger_external) {
worker_command_args.push_back("--ray-debugger-external");
}
@ -765,7 +768,7 @@ void WorkerPool::PushWorker(const std::shared_ptr<WorkerInterface> &worker) {
// The worker is used for the actor creation task with dynamic options.
if (!used) {
// Put it into idle dedicated worker pool.
// TODO(guyang.sgy): This worker will not be used forever. We should kill it.
// TODO(SongGuyang): This worker will not be used forever. We should kill it.
state.idle_dedicated_workers[task_id] = worker;
}
return;
@ -964,7 +967,7 @@ void WorkerPool::PopWorker(const TaskSpecification &task_spec,
state.starting_workers_to_tasks[proc] = std::move(task_info);
}
} else {
// TODO(guyang.sgy): Wait until a worker is pushed or a worker can be started If
// TODO(SongGuyang): Wait until a worker is pushed or a worker can be started If
// startup concurrency maxed out or job not started.
PopWorkerCallbackAsync(callback, nullptr, status);
}

View file

@ -258,7 +258,7 @@ class WorkerPoolMock : public WorkerPool {
is_java = true;
}
}
// TODO(guyang.sgy): support C++ language workers.
// TODO(SongGuyang): support C++ language workers.
int num_workers = is_java ? NUM_WORKERS_PER_PROCESS_JAVA : 1;
for (int i = 0; i < num_workers; i++) {
auto worker =

View file

@ -13,6 +13,8 @@
// limitations under the License.
#pragma once
#include <gtest/gtest_prod.h>
#include <boost/asio.hpp>
#include <boost/asio/ip/host_name.hpp>
#include <cmath>
@ -22,6 +24,8 @@
#include <memory>
#include <sstream>
#include <vector>
#include "nlohmann/json.hpp"
#include "ray/util/logging.h"
#include "ray/util/util.h"
#include "spdlog/sinks/basic_file_sink.h"
@ -29,10 +33,6 @@
#include "spdlog/spdlog.h"
#include "src/ray/protobuf/event.pb.h"
#include "nlohmann/json.hpp"
#include <gtest/gtest_prod.h>
using json = nlohmann::json;
namespace ray {
@ -102,7 +102,7 @@ class EventManager final {
// We added `const json &custom_fields` here because we need to support typed custom
// fields.
// TODO(guyang.sgy): Remove the protobuf `rpc::Event` and use an internal struct
// TODO(SongGuyang): Remove the protobuf `rpc::Event` and use an internal struct
// instead.
void Publish(const rpc::Event &event, const json &custom_fields);

View file

@ -639,7 +639,7 @@ class StreamingWorker {
} // namespace ray
int main(int argc, char **argv) {
RAY_CHECK(argc == 5);
RAY_CHECK(argc >= 4);
auto store_socket = std::string(argv[1]);
auto raylet_socket = std::string(argv[2]);
auto node_manager_port = std::stoi(std::string(argv[3]));