Refactor command line argument parsing with gflags (#4676)

This commit is contained in:
Qing Wang 2019-04-24 14:53:07 +08:00 committed by Hao Chen
parent 4dd628a837
commit f39b6747e5
6 changed files with 96 additions and 61 deletions

View file

@ -15,6 +15,7 @@ cc_binary(
deps = [
":ray_util",
":raylet_lib",
"@com_github_gflags_gflags//:gflags",
],
)

View file

@ -253,20 +253,21 @@ public class RunManager {
List<String> command = ImmutableList.of(
// The raylet executable file.
getTempFile("/raylet").getAbsolutePath(),
rayConfig.rayletSocketName,
rayConfig.objectStoreSocketName,
"0", // The object manager port.
"0", // The node manager port.
rayConfig.nodeIp,
rayConfig.getRedisIp(),
rayConfig.getRedisPort().toString(),
"0", // number of initial workers
String.valueOf(maximumStartupConcurrency),
ResourceUtil.getResourcesStringFromMap(rayConfig.resources),
String.join(",", rayConfig.rayletConfigParameters), // The internal config list.
buildPythonWorkerCommand(), // python worker command
buildWorkerCommandRaylet(), // java worker command
redisPasswordOption
String.format("--raylet_socket_name=%s", rayConfig.rayletSocketName),
String.format("--store_socket_name=%s", rayConfig.objectStoreSocketName),
String.format("--object_manager_port=%d", 0), // The object manager port.
String.format("--node_manager_port=%d", 0), // The node manager port.
String.format("--node_ip_address=%s",rayConfig.nodeIp),
String.format("--redis_address=%s", rayConfig.getRedisIp()),
String.format("--redis_port=%d", rayConfig.getRedisPort()),
String.format("--num_initial_workers=%d", 0), // number of initial workers
String.format("--maximum_startup_concurrency=%d", maximumStartupConcurrency),
String.format("--static_resource_list=%s",
ResourceUtil.getResourcesStringFromMap(rayConfig.resources)),
String.format("--config_list=%s", String.join(",", rayConfig.rayletConfigParameters)),
String.format("--python_worker_command=%s", buildPythonWorkerCommand()),
String.format("--java_worker_command=%s", buildWorkerCommandRaylet()),
String.format("--redis_password=%s", redisPasswordOption)
);
startProcess(command, null, "raylet");

View file

@ -1177,21 +1177,21 @@ def start_raylet(redis_address,
command = [
RAYLET_EXECUTABLE,
raylet_name,
plasma_store_name,
str(object_manager_port),
str(node_manager_port),
node_ip_address,
gcs_ip_address,
gcs_port,
str(num_initial_workers),
str(maximum_startup_concurrency),
resource_argument,
config_str,
start_worker_command,
java_worker_command,
redis_password or "",
temp_dir,
"--raylet_socket_name={}".format(raylet_name),
"--store_socket_name={}".format(plasma_store_name),
"--object_manager_port={}".format(object_manager_port),
"--node_manager_port={}".format(node_manager_port),
"--node_ip_address={}".format(node_ip_address),
"--redis_address={}".format(gcs_ip_address),
"--redis_port={}".format(gcs_port),
"--num_initial_workers={}".format(num_initial_workers),
"--maximum_startup_concurrency={}".format(maximum_startup_concurrency),
"--static_resource_list={}".format(resource_argument),
"--config_list={}".format(config_str),
"--python_worker_command={}".format(start_worker_command),
"--java_worker_command={}".format(java_worker_command),
"--redis_password={}".format(redis_password or ""),
"--temp_dir={}".format(temp_dir),
]
process_info = start_ray_process(
command,
@ -1555,7 +1555,12 @@ def start_raylet_monitor(redis_address,
redis_password = redis_password or ""
config = config or {}
config_str = ",".join(["{},{}".format(*kv) for kv in config.items()])
command = [RAYLET_MONITOR_EXECUTABLE, gcs_ip_address, gcs_port, config_str]
command = [
RAYLET_MONITOR_EXECUTABLE,
"--redis_address={}".format(gcs_ip_address),
"--redis_port={}".format(gcs_port),
"--config_list={}".format(config_str),
]
if redis_password:
command += [redis_password]
process_info = start_ray_process(

View file

@ -5,6 +5,28 @@
#include "ray/stats/stats.h"
#include "ray/status.h"
#include "gflags/gflags.h"
DEFINE_string(raylet_socket_name, "", "The socket name of raylet.");
DEFINE_string(store_socket_name, "", "The socket name of object store.");
DEFINE_int32(object_manager_port, -1, "The port of object manager.");
DEFINE_int32(node_manager_port, -1, "The port of node manager.");
DEFINE_string(node_ip_address, "", "The ip address of this node.");
DEFINE_string(redis_address, "", "The ip address of redis server.");
DEFINE_int32(redis_port, -1, "The port of redis server.");
DEFINE_int32(num_initial_workers, 0, "Number of initial workers.");
DEFINE_int32(maximum_startup_concurrency, 1, "Maximum startup concurrency");
DEFINE_string(static_resource_list, "", "The static resource list of this node.");
DEFINE_string(config_list, "", "The raylet config list of this node.");
DEFINE_string(python_worker_command, "", "Python worker command.");
DEFINE_string(java_worker_command, "", "Java worker command.");
DEFINE_string(redis_password, "", "The password of redis.");
DEFINE_string(temp_dir, "", "Temporary directory.");
DEFINE_bool(disable_stats, false, "Whether disable the stats.");
DEFINE_string(stat_address, "127.0.0.1:8888", "The address that we report metrics to.");
DEFINE_bool(enable_stdout_exporter, false,
"Whether enable the stdout exporter for stats.");
#ifndef RAYLET_TEST
/// A helper function that parse the worker command string into a vector of arguments.
@ -21,37 +43,35 @@ int main(int argc, char *argv[]) {
ray::RayLogLevel::INFO,
/*log_dir=*/"");
ray::RayLog::InstallFailureSignalHandler();
RAY_CHECK(argc >= 14 && argc <= 19);
const std::string raylet_socket_name = std::string(argv[1]);
const std::string store_socket_name = std::string(argv[2]);
int object_manager_port = std::stoi(argv[3]);
int node_manager_port = std::stoi(argv[4]);
const std::string node_ip_address = std::string(argv[5]);
const std::string redis_address = std::string(argv[6]);
int redis_port = std::stoi(argv[7]);
int num_initial_workers = std::stoi(argv[8]);
int maximum_startup_concurrency = std::stoi(argv[9]);
const std::string static_resource_list = std::string(argv[10]);
const std::string config_list = std::string(argv[11]);
const std::string python_worker_command = std::string(argv[12]);
const std::string java_worker_command = std::string(argv[13]);
const std::string redis_password = (argc >= 15 ? std::string(argv[14]) : "");
const std::string temp_dir = (argc >= 16 ? std::string(argv[15]) : "/tmp/ray");
const std::string disable_stats_str(argc >= 17 ? std::string(argv[16]) : "false");
const bool disable_stats("true" == disable_stats_str);
const std::string stat_address =
(argc >= 18 ? std::string(argv[17]) : "127.0.0.1:8888");
const std::string disable_stdout_exporter_str(argc >= 19 ? std::string(argv[18])
: "true");
const bool disable_stdout_exporter("true" == disable_stdout_exporter_str);
gflags::ParseCommandLineFlags(&argc, &argv, true);
const std::string raylet_socket_name = FLAGS_raylet_socket_name;
const std::string store_socket_name = FLAGS_store_socket_name;
const int object_manager_port = static_cast<int>(FLAGS_object_manager_port);
const int node_manager_port = static_cast<int>(FLAGS_node_manager_port);
const std::string node_ip_address = FLAGS_node_ip_address;
const std::string redis_address = FLAGS_redis_address;
const int redis_port = static_cast<int>(FLAGS_redis_port);
const int num_initial_workers = static_cast<int>(FLAGS_num_initial_workers);
const int maximum_startup_concurrency =
static_cast<int>(FLAGS_maximum_startup_concurrency);
const std::string static_resource_list = FLAGS_static_resource_list;
const std::string config_list = FLAGS_config_list;
const std::string python_worker_command = FLAGS_python_worker_command;
const std::string java_worker_command = FLAGS_java_worker_command;
const std::string redis_password = FLAGS_redis_password;
const std::string temp_dir = FLAGS_temp_dir;
const bool disable_stats = FLAGS_disable_stats;
const std::string stat_address = FLAGS_stat_address;
const bool enable_stdout_exporter = FLAGS_enable_stdout_exporter;
gflags::ShutDownCommandLineFlags();
// Initialize stats.
const ray::stats::TagsType global_tags = {
{ray::stats::JobNameKey, "raylet"},
{ray::stats::VersionKey, "0.7.0"},
{ray::stats::NodeAddressKey, node_ip_address}};
ray::stats::Init(stat_address, global_tags, disable_stats, disable_stdout_exporter);
ray::stats::Init(stat_address, global_tags, disable_stats, enable_stdout_exporter);
// Configuration for the node manager.
ray::raylet::NodeManagerConfig node_manager_config;

View file

@ -4,17 +4,25 @@
#include "ray/raylet/monitor.h"
#include "ray/util/util.h"
#include "gflags/gflags.h"
DEFINE_string(redis_address, "", "The ip address of redis.");
DEFINE_int32(redis_port, -1, "The port of redis.");
DEFINE_string(config_list, "", "The config list of raylet.");
DEFINE_string(redis_password, "", "The password of redis.");
int main(int argc, char *argv[]) {
InitShutdownRAII ray_log_shutdown_raii(ray::RayLog::StartRayLog,
ray::RayLog::ShutDownRayLog, argv[0],
ray::RayLogLevel::INFO, /*log_dir=*/"");
ray::RayLog::InstallFailureSignalHandler();
RAY_CHECK(argc == 4 || argc == 5);
const std::string redis_address = std::string(argv[1]);
int redis_port = std::stoi(argv[2]);
const std::string config_list = std::string(argv[3]);
const std::string redis_password = (argc == 5 ? std::string(argv[4]) : "");
gflags::ParseCommandLineFlags(&argc, &argv, true);
const std::string redis_address = FLAGS_redis_address;
const int redis_port = static_cast<int>(FLAGS_redis_port);
const std::string config_list = FLAGS_config_list;
const std::string redis_password = FLAGS_redis_password;
gflags::ShutDownCommandLineFlags();
std::unordered_map<std::string, int> raylet_config;

View file

@ -23,7 +23,7 @@ namespace stats {
/// Initialize stats.
static void Init(const std::string &address, const TagsType &global_tags,
bool disable_stats = false, bool disable_stdout_exporter = true) {
bool disable_stats = false, bool enable_stdout_exporter = false) {
StatsConfig::instance().SetIsDisableStats(disable_stats);
if (disable_stats) {
RAY_LOG(INFO) << "Disabled stats.";
@ -36,7 +36,7 @@ static void Init(const std::string &address, const TagsType &global_tags,
static auto exporter =
std::make_shared<opencensus::exporters::stats::PrometheusExporter>();
if (!disable_stdout_exporter) {
if (enable_stdout_exporter) {
// Enable stdout exporter by default.
opencensus::exporters::stats::StdoutExporter::Register();
}