[C++ API] support head_args config in C++ API (#18709)

This commit is contained in:
Guyang Song 2021-09-23 19:30:53 +08:00 committed by Jiajun Yao
parent 9ec62ee865
commit 80990d18c7
8 changed files with 28 additions and 59 deletions

View file

@ -213,7 +213,7 @@ test_cpp() {
--test_arg=--ray_redis_password="1234"
# run the cpp example
cd cpp/example && sh run.sh
cd cpp/example && bash run.sh
}
test_wheels() {

View file

@ -152,6 +152,7 @@ cc_test(
]),
args = [
"--ray_code_search_path=$(location plus.so):$(location counter.so)",
"--ray_head_args '--include-dashboard false'",
],
copts = COPTS,
data = [

View file

@ -37,21 +37,15 @@ class RayConfig {
// Only searching the top level under a directory.
std::vector<std::string> code_search_path;
// The command line args to be appended as parameters of the `ray start` command. It
// takes effect only if Ray head is started by a driver. Run `ray start --help` for
// details.
std::vector<std::string> head_args = {};
/* The following are unstable parameters and their use is discouraged. */
// Prevents external clients without the password from connecting to Redis if provided.
boost::optional<std::string> redis_password_;
// Number of CPUs the user wishes to assign to each raylet. By default, this is set
// based on virtual cores.
int num_cpus = -1;
// Number of GPUs the user wishes to assign to each raylet. By default, this is set
// based on detected GPUs.
int num_gpus = -1;
// A mapping the names of custom resources to the quantities for them available.
std::unordered_map<std::string, int> resources;
};
} // namespace ray

View file

@ -48,6 +48,11 @@ ABSL_FLAG(std::string, ray_logs_dir, "", "Logs dir for workers.");
ABSL_FLAG(std::string, ray_node_ip_address, "", "The ip address for this node.");
ABSL_FLAG(std::string, ray_head_args, "",
"The command line args to be appended as parameters of the `ray start` "
"command. It takes effect only if Ray head is started by a driver. Run `ray "
"start --help` for details.");
namespace ray {
namespace internal {
@ -62,14 +67,8 @@ void ConfigInternal::Init(RayConfig &config, int argc, char **argv) {
if (config.redis_password_) {
redis_password = *config.redis_password_;
}
if (config.num_cpus >= 0) {
num_cpus = config.num_cpus;
}
if (config.num_gpus >= 0) {
num_gpus = config.num_gpus;
}
if (!config.resources.empty()) {
resources = config.resources;
if (!config.head_args.empty()) {
head_args = config.head_args;
}
if (argc != 0 && argv != nullptr) {
// Parse config from command line.
@ -107,6 +106,11 @@ void ConfigInternal::Init(RayConfig &config, int argc, char **argv) {
if (!FLAGS_ray_node_ip_address.CurrentValue().empty()) {
node_ip_address = FLAGS_ray_node_ip_address.CurrentValue();
}
if (!FLAGS_ray_head_args.CurrentValue().empty()) {
std::vector<std::string> args =
absl::StrSplit(FLAGS_ray_head_args.CurrentValue(), ' ', absl::SkipEmpty());
head_args.insert(head_args.end(), args.begin(), args.end());
}
}
if (worker_type == WorkerType::DRIVER && run_mode == RunMode::CLUSTER) {
if (redis_ip.empty()) {

View file

@ -53,11 +53,7 @@ class ConfigInternal {
std::string node_ip_address = "";
int num_cpus = -1;
int num_gpus = -1;
std::unordered_map<std::string, int> resources;
std::vector<std::string> head_args = {};
static ConfigInternal &Instance() {
static ConfigInternal config;

View file

@ -37,8 +37,8 @@ TEST(RayClusterModeTest, Initialized) {
TEST(RayClusterModeTest, FullTest) {
ray::RayConfig config;
config.num_cpus = 2;
config.resources = {{"resource1", 1}, {"resource2", 2}};
config.head_args = {"--num-cpus", "2", "--resources",
"{\"resource1\":1,\"resource2\":2}"};
if (absl::GetFlag<bool>(FLAGS_external_cluster)) {
auto port = absl::GetFlag<int32_t>(FLAGS_redis_port);
std::string password = absl::GetFlag<std::string>(FLAGS_redis_password);

View file

@ -25,37 +25,13 @@ namespace internal {
using ray::core::CoreWorkerProcess;
using ray::core::WorkerType;
std::string FormatResourcesArg(const std::unordered_map<std::string, int> &resources) {
std::ostringstream oss;
oss << "{";
for (auto iter = resources.begin(); iter != resources.end();) {
oss << "\"" << iter->first << "\":" << iter->second;
++iter;
if (iter != resources.end()) {
oss << ",";
}
}
oss << "}";
return oss.str();
}
void ProcessHelper::StartRayNode(const int redis_port, const std::string redis_password,
const int num_cpus, const int num_gpus,
const std::unordered_map<std::string, int> resources) {
const std::vector<std::string> &head_args) {
std::vector<std::string> cmdargs({"ray", "start", "--head", "--port",
std::to_string(redis_port), "--redis-password",
redis_password, "--include-dashboard", "false"});
if (num_cpus >= 0) {
cmdargs.emplace_back("--num-cpus");
cmdargs.emplace_back(std::to_string(num_cpus));
}
if (num_gpus >= 0) {
cmdargs.emplace_back("--num-gpus");
cmdargs.emplace_back(std::to_string(num_gpus));
}
if (!resources.empty()) {
cmdargs.emplace_back("--resources");
cmdargs.emplace_back(FormatResourcesArg(resources));
redis_password});
if (!head_args.empty()) {
cmdargs.insert(cmdargs.end(), head_args.begin(), head_args.end());
}
RAY_LOG(INFO) << CreateCommandLine(cmdargs);
RAY_CHECK(!Process::Spawn(cmdargs, true).second);
@ -85,8 +61,7 @@ void ProcessHelper::RayStart(CoreWorkerOptions::TaskExecutionCallback callback)
redis_ip = "127.0.0.1";
StartRayNode(ConfigInternal::Instance().redis_port,
ConfigInternal::Instance().redis_password,
ConfigInternal::Instance().num_cpus, ConfigInternal::Instance().num_gpus,
ConfigInternal::Instance().resources);
ConfigInternal::Instance().head_args);
}
if (redis_ip == "127.0.0.1") {
redis_ip = GetNodeIpAddress();

View file

@ -30,8 +30,7 @@ class ProcessHelper {
void RayStart(CoreWorkerOptions::TaskExecutionCallback callback);
void RayStop();
void StartRayNode(const int redis_port, const std::string redis_password,
const int num_cpus = -1, const int num_gpus = -1,
const std::unordered_map<std::string, int> resources = {});
const std::vector<std::string> &head_args = {});
void StopRayNode();
static ProcessHelper &GetInstance() {