[runtime env] plugin refactor [7/n]: support runtime env in C++ API (#27010)

Signed-off-by: 久龙 <guyang.sgy@antfin.com>
This commit is contained in:
Guyang Song 2022-07-27 18:24:31 +08:00 committed by GitHub
parent 5d6bc5360d
commit 06b0e715c7
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
22 changed files with 419 additions and 37 deletions

View file

@ -1,9 +1,9 @@
cc_library(
name = "nlohmann_json",
hdrs = glob([
"include/**/*.hpp",
"single_include/**/*.hpp",
]),
includes = ["include"],
includes = ["single_include"],
visibility = ["//visibility:public"],
alwayslink = 1,
)

View file

@ -66,6 +66,7 @@ cc_library(
"@com_google_absl//absl/flags:flag",
"@com_google_absl//absl/flags:parse",
"@msgpack",
"@nlohmann_json",
],
alwayslink = True,
)
@ -102,6 +103,7 @@ cc_binary(
"@boost//:callable_traits",
"@boost//:optional",
"@msgpack",
"@nlohmann_json",
],
}),
)
@ -123,6 +125,7 @@ genrule(
mkdir -p "$$PY_CPP_DIR/lib/" &&
cp -f $(location default_worker) "$$PY_CPP_DIR/" &&
cp -f -r $$WORK_DIR/external/msgpack/include/* "$$PY_CPP_DIR/include" &&
cp -f -r $$WORK_DIR/external/nlohmann_json/single_include/* "$$PY_CPP_DIR/include" &&
cp -f -r "$$WORK_DIR/external/boost/boost/archive" "$$BOOST_DIR" &&
cp -f -r "$$WORK_DIR/external/boost/boost/assert" "$$BOOST_DIR" &&
cp -f -r "$$WORK_DIR/external/boost/boost/bind" "$$BOOST_DIR" &&
@ -250,6 +253,7 @@ cc_binary(
"@boost//:callable_traits",
"@boost//:optional",
"@msgpack",
"@nlohmann_json",
],
)
@ -271,6 +275,7 @@ cc_binary(
"@boost//:callable_traits",
"@boost//:optional",
"@msgpack",
"@nlohmann_json",
],
)
@ -308,6 +313,7 @@ cc_binary(
"@boost//:callable_traits",
"@boost//:optional",
"@msgpack",
"@nlohmann_json",
],
)

View file

@ -23,6 +23,7 @@
#include <ray/api/ray_remote.h>
#include <ray/api/ray_runtime.h>
#include <ray/api/ray_runtime_holder.h>
#include <ray/api/runtime_env.h>
#include <ray/api/task_caller.h>
#include <ray/api/wait_result.h>

View file

@ -15,6 +15,7 @@
#pragma once
#include <ray/api/actor_handle.h>
#include <ray/api/runtime_env.h>
#include <ray/api/task_options.h>
namespace ray {
@ -71,6 +72,11 @@ class ActorCreator {
return *this;
}
ActorCreator &SetRuntimeEnv(const ray::RuntimeEnv &runtime_env) {
create_options_.serialized_runtime_env_info = runtime_env.SerializeToRuntimeEnvInfo();
return *this;
}
private:
RayRuntime *runtime_;
RemoteFunctionHolder remote_function_holder_;

View file

@ -14,6 +14,7 @@
#pragma once
#include <ray/api/ray_exception.h>
#include <ray/api/runtime_env.h>
#include <memory>
#include <string>
@ -52,6 +53,9 @@ class RayConfig {
// The default actor lifetime type, `DETACHED` or `NON_DETACHED`.
ActorLifetime default_actor_lifetime = ActorLifetime::NON_DETACHED;
// The job level runtime environments.
boost::optional<RuntimeEnv> runtime_env;
/* The following are unstable parameters and their use is discouraged. */
// Prevents external clients without the password from connecting to Redis if provided.

View file

@ -53,5 +53,10 @@ class RayFunctionNotFound : public RayException {
public:
RayFunctionNotFound(const std::string &msg) : RayException(msg){};
};
class RayRuntimeEnvException : public RayException {
public:
RayRuntimeEnvException(const std::string &msg) : RayException(msg){};
};
} // namespace internal
} // namespace ray

View file

@ -0,0 +1,102 @@
// Copyright 2022 The Ray Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include <ray/api/ray_exception.h>
#include <string>
#include "nlohmann/json.hpp"
using json = nlohmann::json;
namespace ray {
/// This class provides interfaces of setting runtime environments for job/actor/task.
class RuntimeEnv {
public:
/// Set a runtime env field by name and Object.
/// \param[in] name The runtime env plugin name.
/// \param[in] value An object with primitive data type or jsonable type of
/// nlohmann/json.
template <typename T>
void Set(const std::string &name, const T &value);
/// Get the object of a runtime env field.
/// \param[in] name The runtime env plugin name.
template <typename T>
T Get(const std::string &name) const;
/// Set a runtime env field by name and json string.
/// \param[in] name The runtime env plugin name.
/// \param[in] json_str A json string represents the runtime env field.
void SetJsonStr(const std::string &name, const std::string &json_str);
/// Get the json string of a runtime env field.
/// \param[in] name The runtime env plugin name.
std::string GetJsonStr(const std::string &name) const;
/// Whether a field is contained.
/// \param[in] name The runtime env plugin name.
bool Contains(const std::string &name) const;
/// Remove a field by name.
/// \param[in] name The runtime env plugin name.
/// \return true if remove an existing field, otherwise false.
bool Remove(const std::string &name);
/// Whether the runtime env is empty.
bool Empty() const;
/// Serialize the runtime env to string.
std::string Serialize() const;
/// Serialize the runtime env to RuntimeEnvInfo.
std::string SerializeToRuntimeEnvInfo() const;
/// Deserialize the runtime env from string.
/// \return The deserialized RuntimeEnv instance.
static RuntimeEnv Deserialize(const std::string &serialized_runtime_env);
private:
json fields_;
};
// --------- inline implementation ------------
template <typename T>
inline void RuntimeEnv::Set(const std::string &name, const T &value) {
try {
json value_j = value;
fields_[name] = value_j;
} catch (std::exception &e) {
throw ray::internal::RayRuntimeEnvException("Failed to set the field " + name + ": " +
e.what());
}
}
template <typename T>
inline T RuntimeEnv::Get(const std::string &name) const {
if (!Contains(name)) {
throw ray::internal::RayRuntimeEnvException("The field " + name + " not found.");
}
try {
return fields_[name].get<T>();
} catch (std::exception &e) {
throw ray::internal::RayRuntimeEnvException("Failed to get the field " + name + ": " +
e.what());
}
}
} // namespace ray

View file

@ -14,8 +14,10 @@
#pragma once
#include <ray/api/runtime_env.h>
#include <ray/api/static_check.h>
#include <ray/api/task_options.h>
namespace ray {
namespace internal {
@ -50,6 +52,11 @@ class TaskCaller {
return *this;
}
TaskCaller &SetRuntimeEnv(const ray::RuntimeEnv &runtime_env) {
task_options_.serialized_runtime_env_info = runtime_env.SerializeToRuntimeEnvInfo();
return *this;
}
private:
RayRuntime *runtime_;
RemoteFunctionHolder remote_function_holder_{};

View file

@ -95,6 +95,7 @@ struct CallOptions {
std::unordered_map<std::string, double> resources;
PlacementGroup group;
int bundle_index;
std::string serialized_runtime_env_info;
};
struct ActorCreationOptions {
@ -105,6 +106,7 @@ struct ActorCreationOptions {
int max_concurrency = 1;
PlacementGroup group;
int bundle_index;
std::string serialized_runtime_env_info;
};
} // namespace internal

View file

@ -75,6 +75,13 @@ ABSL_FLAG(std::string,
"",
"The default actor lifetime type, `detached` or `non_detached`.");
ABSL_FLAG(std::string, ray_runtime_env, "", "The serialized runtime env.");
ABSL_FLAG(int,
ray_runtime_env_hash,
-1,
"The computed hash of the runtime env for this worker.");
namespace ray {
namespace internal {
@ -111,6 +118,9 @@ void ConfigInternal::Init(RayConfig &config, int argc, char **argv) {
if (config.default_actor_lifetime == ActorLifetime::DETACHED) {
default_actor_lifetime = rpc::JobConfig_ActorLifetime_DETACHED;
}
if (config.runtime_env) {
runtime_env = config.runtime_env;
}
if (argc != 0 && argv != nullptr) {
// Parse config from command line.
@ -158,6 +168,10 @@ void ConfigInternal::Init(RayConfig &config, int argc, char **argv) {
default_actor_lifetime =
ParseDefaultActorLifetimeType(FLAGS_ray_default_actor_lifetime.CurrentValue());
}
if (!FLAGS_ray_runtime_env.CurrentValue().empty()) {
runtime_env = RuntimeEnv::Deserialize(FLAGS_ray_runtime_env.CurrentValue());
}
runtime_env_hash = absl::GetFlag<int>(FLAGS_ray_runtime_env_hash);
}
worker_type = config.is_worker_ ? WorkerType::WORKER : WorkerType::DRIVER;
if (worker_type == WorkerType::DRIVER && run_mode == RunMode::CLUSTER) {

View file

@ -60,6 +60,10 @@ class ConfigInternal {
std::vector<std::string> head_args = {};
boost::optional<RuntimeEnv> runtime_env;
int runtime_env_hash = 0;
// The default actor lifetime type.
rpc::JobConfig_ActorLifetime default_actor_lifetime =
rpc::JobConfig_ActorLifetime_NON_DETACHED;

View file

@ -0,0 +1,78 @@
// Copyright 2022 The Ray Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <google/protobuf/util/json_util.h>
#include <ray/api/runtime_env.h>
#include <ray/util/logging.h>
#include "src/ray/protobuf/runtime_env_common.pb.h"
namespace ray {
void RuntimeEnv::SetJsonStr(const std::string &name, const std::string &json_str) {
try {
json value_j = json::parse(json_str);
fields_[name] = value_j;
} catch (std::exception &e) {
throw ray::internal::RayRuntimeEnvException("Failed to set the field " + name +
" by json string: " + e.what());
}
}
std::string RuntimeEnv::GetJsonStr(const std::string &name) const {
if (!Contains(name)) {
throw ray::internal::RayRuntimeEnvException("The field " + name + " not found.");
}
auto j = fields_[name].get<json>();
return j.dump();
}
bool RuntimeEnv::Contains(const std::string &name) const {
return fields_.contains(name);
}
bool RuntimeEnv::Remove(const std::string &name) {
if (Contains(name)) {
fields_.erase(name);
return true;
}
return false;
}
bool RuntimeEnv::Empty() const { return fields_.empty(); }
std::string RuntimeEnv::Serialize() const { return fields_.dump(); }
std::string RuntimeEnv::SerializeToRuntimeEnvInfo() const {
rpc::RuntimeEnvInfo runtime_env_info;
runtime_env_info.set_serialized_runtime_env(Serialize());
std::string serialized_runtime_env_info;
RAY_CHECK(google::protobuf::util::MessageToJsonString(runtime_env_info,
&serialized_runtime_env_info)
.ok());
return serialized_runtime_env_info;
}
RuntimeEnv RuntimeEnv::Deserialize(const std::string &serialized_runtime_env) {
RuntimeEnv runtime_env;
try {
runtime_env.fields_ = json::parse(serialized_runtime_env);
} catch (std::exception &e) {
throw ray::internal::RayRuntimeEnvException("Failed to deserialize runtime env " +
serialized_runtime_env + ": " + e.what());
}
return runtime_env;
}
} // namespace ray

View file

@ -63,6 +63,7 @@ ObjectID NativeTaskSubmitter::Submit(InvocationSpec &invocation,
TaskOptions options{};
options.name = call_options.name;
options.resources = call_options.resources;
options.serialized_runtime_env_info = call_options.serialized_runtime_env_info;
std::optional<std::vector<rpc::ObjectReference>> return_refs;
if (invocation.task_type == TaskType::ACTOR_TASK) {
return_refs = core_worker.SubmitActorTask(
@ -120,17 +121,19 @@ ActorID NativeTaskSubmitter::CreateActor(InvocationSpec &invocation,
bundle_id.second);
placement_group_scheduling_strategy->set_placement_group_capture_child_tasks(false);
}
ray::core::ActorCreationOptions actor_options{create_options.max_restarts,
/*max_task_retries=*/0,
create_options.max_concurrency,
create_options.resources,
resources,
/*dynamic_worker_options=*/{},
/*is_detached=*/std::nullopt,
name,
ray_namespace,
/*is_asyncio=*/false,
scheduling_strategy};
ray::core::ActorCreationOptions actor_options{
create_options.max_restarts,
/*max_task_retries=*/0,
create_options.max_concurrency,
create_options.resources,
resources,
/*dynamic_worker_options=*/{},
/*is_detached=*/std::nullopt,
name,
ray_namespace,
/*is_asyncio=*/false,
scheduling_strategy,
create_options.serialized_runtime_env_info};
ActorID actor_id;
auto status = core_worker.CreateActor(
BuildRayFunction(invocation), invocation.args, actor_options, "", &actor_id);

View file

@ -529,6 +529,122 @@ TEST(RayClusterModeTest, GetNamespaceApiTest) {
auto actor_handle = ray::Actor(RAY_FUNC(Counter::FactoryCreate)).Remote();
auto actor_ns = actor_handle.Task(&Counter::GetNamespaceInActor).Remote();
EXPECT_EQ(*actor_ns.Get(), ns);
ray::Shutdown();
}
class Pip {
public:
std::vector<std::string> packages;
bool pip_check = false;
Pip() = default;
Pip(const std::vector<std::string> &packages, bool pip_check)
: packages(packages), pip_check(pip_check) {}
};
void to_json(json &j, const Pip &pip) {
j = json{{"packages", pip.packages}, {"pip_check", pip.pip_check}};
};
void from_json(const json &j, Pip &pip) {
j.at("packages").get_to(pip.packages);
j.at("pip_check").get_to(pip.pip_check);
};
TEST(RayClusterModeTest, RuntimeEnvApiTest) {
ray::RuntimeEnv runtime_env;
// Set pip
std::vector<std::string> packages = {"requests"};
Pip pip(packages, true);
runtime_env.Set("pip", pip);
// Set working_dir
std::string working_dir = "https://path/to/working_dir.zip";
runtime_env.Set("working_dir", working_dir);
// Serialize
auto serialized_runtime_env = runtime_env.Serialize();
// Deserialize
auto runtime_env_2 = ray::RuntimeEnv::Deserialize(serialized_runtime_env);
auto pip2 = runtime_env_2.Get<Pip>("pip");
EXPECT_EQ(pip2.packages, pip.packages);
EXPECT_EQ(pip2.pip_check, pip.pip_check);
auto working_dir2 = runtime_env_2.Get<std::string>("working_dir");
EXPECT_EQ(working_dir2, working_dir);
// Construct runtime env with raw json string
ray::RuntimeEnv runtime_env_3;
std::string pip_raw_json_string =
R"({"packages":["requests","tensorflow"],"pip_check":false})";
runtime_env_3.SetJsonStr("pip", pip_raw_json_string);
auto get_json_result = runtime_env_3.GetJsonStr("pip");
EXPECT_EQ(get_json_result, pip_raw_json_string);
}
TEST(RayClusterModeTest, RuntimeEnvApiExceptionTest) {
ray::RuntimeEnv runtime_env;
EXPECT_THROW(runtime_env.Get<std::string>("working_dir"),
ray::internal::RayRuntimeEnvException);
runtime_env.Set("working_dir", "https://path/to/working_dir.zip");
EXPECT_THROW(runtime_env.Get<Pip>("working_dir"),
ray::internal::RayRuntimeEnvException);
EXPECT_THROW(runtime_env.SetJsonStr("pip", "{123"),
ray::internal::RayRuntimeEnvException);
EXPECT_THROW(runtime_env.GetJsonStr("pip"), ray::internal::RayRuntimeEnvException);
EXPECT_EQ(runtime_env.Empty(), false);
EXPECT_EQ(runtime_env.Remove("working_dir"), true);
// Do nothing when removing a non-existent key.
EXPECT_EQ(runtime_env.Remove("pip"), false);
EXPECT_EQ(runtime_env.Empty(), true);
}
TEST(RayClusterModeTest, RuntimeEnvTaskLevelEnvVarsTest) {
ray::RayConfig config;
ray::Init(config, cmd_argc, cmd_argv);
auto r0 = ray::Task(GetEnvVar).Remote("KEY1");
auto get_result0 = *(ray::Get(r0));
EXPECT_EQ("", get_result0);
auto actor_handle = ray::Actor(RAY_FUNC(Counter::FactoryCreate)).Remote();
auto r1 = actor_handle.Task(&Counter::GetEnvVar).Remote("KEY1");
auto get_result1 = *(ray::Get(r1));
EXPECT_EQ("", get_result1);
ray::RuntimeEnv runtime_env;
std::map<std::string, std::string> env_vars{{"KEY1", "value1"}};
runtime_env.Set("env_vars", env_vars);
auto r2 = ray::Task(GetEnvVar).SetRuntimeEnv(runtime_env).Remote("KEY1");
auto get_result2 = *(ray::Get(r2));
EXPECT_EQ("value1", get_result2);
ray::RuntimeEnv runtime_env2;
std::map<std::string, std::string> env_vars2{{"KEY1", "value2"}};
runtime_env2.Set("env_vars", env_vars2);
auto actor_handle2 =
ray::Actor(RAY_FUNC(Counter::FactoryCreate)).SetRuntimeEnv(runtime_env2).Remote();
auto r3 = actor_handle2.Task(&Counter::GetEnvVar).Remote("KEY1");
auto get_result3 = *(ray::Get(r3));
EXPECT_EQ("value2", get_result3);
ray::Shutdown();
}
TEST(RayClusterModeTest, RuntimeEnvJobLevelEnvVarsTest) {
ray::RayConfig config;
ray::RuntimeEnv runtime_env;
std::map<std::string, std::string> env_vars{{"KEY1", "value1"}};
runtime_env.Set("env_vars", env_vars);
config.runtime_env = runtime_env;
ray::Init(config, cmd_argc, cmd_argv);
auto r0 = ray::Task(GetEnvVar).Remote("KEY1");
auto get_result0 = *(ray::Get(r0));
EXPECT_EQ("value1", get_result0);
auto actor_handle = ray::Actor(RAY_FUNC(Counter::FactoryCreate)).Remote();
auto r1 = actor_handle.Task(&Counter::GetEnvVar).Remote("KEY1");
auto get_result1 = *(ray::Get(r1));
EXPECT_EQ("value1", get_result1);
ray::Shutdown();
}
int main(int argc, char **argv) {

View file

@ -106,6 +106,14 @@ RAY_REMOTE(RAY_FUNC(Counter::FactoryCreate),
&Counter::GetVal,
&Counter::GetIntVal,
&Counter::Initialized,
&Counter::CreateChildActor);
&Counter::CreateChildActor,
&Counter::GetEnvVar);
RAY_REMOTE(ActorConcurrentCall::FactoryCreate, &ActorConcurrentCall::CountDown);
std::string GetEnvVar(std::string key) {
auto value = std::getenv(key.c_str());
return value == NULL ? "" : std::string(value);
}
RAY_REMOTE(GetEnvVar);

View file

@ -50,11 +50,18 @@ class Counter {
bool Initialized() { return ray::IsInitialized(); }
std::string GetEnvVar(std::string key) {
auto value = std::getenv(key.c_str());
return value == NULL ? "" : std::string(value);
}
private:
int count;
bool is_restared = false;
};
std::string GetEnvVar(std::string key);
inline Counter *CreateCounter() { return new Counter(0); }
RAY_REMOTE(CreateCounter);

View file

@ -140,6 +140,7 @@ void ProcessHelper::RayStart(CoreWorkerOptions::TaskExecutionCallback callback)
options.metrics_agent_port = -1;
options.task_execution_callback = callback;
options.startup_token = ConfigInternal::Instance().startup_token;
options.runtime_env_hash = ConfigInternal::Instance().runtime_env_hash;
rpc::JobConfig job_config;
job_config.set_default_actor_lifetime(
ConfigInternal::Instance().default_actor_lifetime);
@ -148,6 +149,10 @@ void ProcessHelper::RayStart(CoreWorkerOptions::TaskExecutionCallback callback)
job_config.add_code_search_path(path);
}
job_config.set_ray_namespace(ConfigInternal::Instance().ray_namespace);
if (ConfigInternal::Instance().runtime_env) {
job_config.mutable_runtime_env_info()->set_serialized_runtime_env(
ConfigInternal::Instance().runtime_env->Serialize());
}
std::string serialized_job_config;
RAY_CHECK(job_config.SerializeToString(&serialized_job_config));
options.serialized_job_config = serialized_job_config;

View file

@ -51,9 +51,10 @@ public interface RuntimeEnv {
* Remove a runtime env field by name.
*
* @param name The build-in names or a runtime env plugin name.
* @return true if remove an existing field, otherwise false.
* @throws RuntimeEnvException
*/
public void remove(String name) throws RuntimeEnvException;
public boolean remove(String name) throws RuntimeEnvException;
/**
* Serialize the runtime env to string.

View file

@ -65,8 +65,12 @@ public class RuntimeEnvImpl implements RuntimeEnv {
}
@Override
public void remove(String name) {
runtimeEnvs.remove(name);
public boolean remove(String name) {
if (runtimeEnvs.has(name)) {
runtimeEnvs.remove(name);
return true;
}
return false;
}
@Override

View file

@ -238,9 +238,10 @@ public class RuntimeEnvTest {
Assert.assertEquals(pip2.getPackages(), pip.getPackages());
Assert.assertEquals(pip2.getPip_check(), pip.getPip_check());
runtimeEnv2.remove("working_dir");
runtimeEnv2.remove("py_modules");
runtimeEnv2.remove("pip");
Assert.assertEquals(runtimeEnv2.remove("working_dir"), true);
Assert.assertEquals(runtimeEnv2.remove("py_modules"), true);
Assert.assertEquals(runtimeEnv2.remove("pip"), true);
Assert.assertEquals(runtimeEnv2.remove("conda"), false);
Assert.assertEquals(runtimeEnv2.get("working_dir", String.class), null);
Assert.assertEquals(runtimeEnv2.get("py_modules", String[].class), null);
Assert.assertEquals(runtimeEnv2.get("pip", Pip.class), null);

View file

@ -1571,6 +1571,7 @@ def start_raylet(
session_dir,
log_dir,
node_ip_address,
setup_worker_path,
)
else:
cpp_worker_command = []
@ -1785,6 +1786,7 @@ def build_cpp_worker_command(
session_dir: str,
log_dir: str,
node_ip_address: str,
setup_worker_path: str,
):
"""This method assembles the command used to start a CPP worker.
@ -1797,11 +1799,15 @@ def build_cpp_worker_command(
session_dir: The path of this session.
log_dir: The path of logs.
node_ip_address: The ip address for this node.
setup_worker_path: The path of the Python file that will set up
the environment for the worker process.
Returns:
The command string for starting CPP worker.
"""
command = [
sys.executable,
setup_worker_path,
DEFAULT_WORKER_EXECUTABLE,
f"--ray_plasma_store_socket_name={plasma_store_name}",
f"--ray_raylet_socket_name={raylet_name}",

View file

@ -362,27 +362,29 @@ std::tuple<Process, StartupToken> WorkerPool::StartWorkerProcess(
std::to_string(worker_startup_token_counter_));
}
if (language == Language::PYTHON || language == Language::JAVA) {
if (serialized_runtime_env_context != "{}" &&
!serialized_runtime_env_context.empty()) {
worker_command_args.push_back("--language=" + Language_Name(language));
if (serialized_runtime_env_context != "{}" && !serialized_runtime_env_context.empty()) {
worker_command_args.push_back("--language=" + Language_Name(language));
if (language == Language::CPP) {
worker_command_args.push_back("--ray_runtime_env_hash=" +
std::to_string(runtime_env_hash));
} else {
worker_command_args.push_back("--runtime-env-hash=" +
std::to_string(runtime_env_hash));
worker_command_args.push_back("--serialized-runtime-env-context=" +
serialized_runtime_env_context);
} else if (language == Language::PYTHON && worker_command_args.size() >= 2 &&
worker_command_args[1].find(kSetupWorkerFilename) != std::string::npos) {
// Check that the arg really is the path to the setup worker before erasing it, to
// prevent breaking tests that mock out the worker command args.
worker_command_args.erase(worker_command_args.begin() + 1,
worker_command_args.begin() + 2);
} else if (language == Language::JAVA) {
worker_command_args.push_back("--language=" + Language_Name(language));
}
worker_command_args.push_back("--serialized-runtime-env-context=" +
serialized_runtime_env_context);
} else if (language == Language::PYTHON && worker_command_args.size() >= 2 &&
worker_command_args[1].find(kSetupWorkerFilename) != std::string::npos) {
// Check that the arg really is the path to the setup worker before erasing it, to
// prevent breaking tests that mock out the worker command args.
worker_command_args.erase(worker_command_args.begin() + 1,
worker_command_args.begin() + 2);
} else {
worker_command_args.push_back("--language=" + Language_Name(language));
}
if (ray_debugger_external) {
worker_command_args.push_back("--ray-debugger-external");
}
if (ray_debugger_external) {
worker_command_args.push_back("--ray-debugger-external");
}
ProcessEnvironment env;