mirror of
https://github.com/vale981/ray
synced 2025-03-06 02:21:39 -05:00
[C++] Use RayConfig to read internal environment variables only once (#18869)
* store environ on first access * fix * Use RayConfig * fix * fix * Revert removal of headers. They are actually used. * rename * fix lint * format * use std::getenv() * fix
This commit is contained in:
parent
65fa740c3b
commit
66aac2e219
10 changed files with 80 additions and 36 deletions
|
@ -45,6 +45,8 @@ void RayConfig::initialize(const std::string &config_list) {
|
|||
// We use a big chain of if else statements because C++ doesn't allow
|
||||
// switch statements on strings.
|
||||
#include "ray/common/ray_config_def.h"
|
||||
// "ray/common/ray_internal_flag_def.h" is intentionally not included,
|
||||
// because it contains Ray internal settings.
|
||||
RAY_LOG(FATAL) << "Received unexpected config parameter " << pair.key();
|
||||
}
|
||||
if (RAY_LOG_ENABLED(DEBUG)) {
|
||||
|
|
|
@ -49,7 +49,7 @@ class RayConfig {
|
|||
/// In particular, this generates a private field called `name_` and a public getter
|
||||
/// method called `name()` for a given config item.
|
||||
///
|
||||
/// Configs defined in this way can be overriden by setting the env variable
|
||||
/// Configs defined in this way can be overridden by setting the env variable
|
||||
/// RAY_{name}=value where {name} is the variable name.
|
||||
///
|
||||
/// \param type Type of the config item.
|
||||
|
@ -63,7 +63,16 @@ class RayConfig {
|
|||
inline type &name() { return name##_; }
|
||||
|
||||
#include "ray/common/ray_config_def.h"
|
||||
/// -------------------------------------------------------------------------
|
||||
|
||||
/// -----------Include ray_internal_flag_def.h to define internal flags-------
|
||||
/// RAY_INTERNAL_FLAG defines RayConfig fields similar to the RAY_CONFIG macro.
|
||||
/// The difference is that RAY_INTERNAL_FLAG is intended for Ray internal
|
||||
/// settings that users should not modify.
|
||||
#define RAY_INTERNAL_FLAG RAY_CONFIG
|
||||
|
||||
#include "ray/common/ray_internal_flag_def.h"
|
||||
|
||||
#undef RAY_INTERNAL_FLAG
|
||||
#undef RAY_CONFIG
|
||||
|
||||
public:
|
||||
|
@ -74,7 +83,7 @@ class RayConfig {
|
|||
private:
|
||||
template <typename T>
|
||||
T ReadEnv(const std::string &name, const std::string &type_string, T default_value) {
|
||||
auto value = getenv(name.c_str());
|
||||
auto value = std::getenv(name.c_str());
|
||||
if (value == nullptr) {
|
||||
return default_value;
|
||||
} else {
|
||||
|
|
|
@ -107,8 +107,8 @@ RAY_CONFIG(bool, scheduler_hybrid_scheduling, true)
|
|||
/// to prefer spreading tasks to other nodes. This balances between locality and
|
||||
/// even balancing of load. Low values (min 0.0) encourage more load spreading.
|
||||
RAY_CONFIG(float, scheduler_spread_threshold,
|
||||
getenv("RAY_SCHEDULER_SPREAD_THRESHOLD") != nullptr
|
||||
? std::stof(getenv("RAY_SCHEDULER_SPREAD_THRESHOLD"))
|
||||
std::getenv("RAY_SCHEDULER_SPREAD_THRESHOLD") != nullptr
|
||||
? std::stof(std::getenv("RAY_SCHEDULER_SPREAD_THRESHOLD"))
|
||||
: 0.5)
|
||||
|
||||
// The max allowed size in bytes of a return object from direct actor calls.
|
||||
|
@ -429,8 +429,8 @@ RAY_CONFIG(uint64_t, gcs_actor_table_min_duration_ms, /* 5 min */ 60 * 1000 * 5
|
|||
|
||||
/// Whether to enable GCS-based actor scheduling.
|
||||
RAY_CONFIG(bool, gcs_actor_scheduling_enabled,
|
||||
getenv("RAY_GCS_ACTOR_SCHEDULING_ENABLED") != nullptr &&
|
||||
getenv("RAY_GCS_ACTOR_SCHEDULING_ENABLED") == std::string("true"))
|
||||
std::getenv("RAY_GCS_ACTOR_SCHEDULING_ENABLED") != nullptr &&
|
||||
std::getenv("RAY_GCS_ACTOR_SCHEDULING_ENABLED") == std::string("true"))
|
||||
|
||||
RAY_CONFIG(uint32_t, max_error_msg_size_bytes, 512 * 1024)
|
||||
|
||||
|
@ -441,8 +441,8 @@ RAY_CONFIG(bool, enable_light_weight_resource_report, true)
|
|||
// fast, but when RAY_preallocate_plasma_memory=1 is set, it may take some time
|
||||
// (a few GB/s) to populate all the pages on Raylet startup.
|
||||
RAY_CONFIG(uint32_t, raylet_start_wait_time_s,
|
||||
getenv("RAY_preallocate_plasma_memory") != nullptr &&
|
||||
getenv("RAY_preallocate_plasma_memory") == std::string("1")
|
||||
std::getenv("RAY_preallocate_plasma_memory") != nullptr &&
|
||||
std::getenv("RAY_preallocate_plasma_memory") == std::string("1")
|
||||
? 120
|
||||
: 10)
|
||||
|
||||
|
@ -488,3 +488,6 @@ RAY_CONFIG(std::string, event_level, "warning")
|
|||
|
||||
/// Whether to avoid scheduling cpu requests on gpu nodes
|
||||
RAY_CONFIG(bool, scheduler_avoid_gpu_nodes, true)
|
||||
|
||||
/// Whether to skip running local GC in runtime env.
|
||||
RAY_CONFIG(bool, runtime_env_skip_local_gc, false)
|
||||
|
|
29
src/ray/common/ray_internal_flag_def.h
Normal file
29
src/ray/common/ray_internal_flag_def.h
Normal file
|
@ -0,0 +1,29 @@
|
|||
// Copyright 2021 The Ray Authors.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
// This header file contains Ray internal flags that should not be set
|
||||
// by users. They are set by Ray with environment variable
|
||||
// RAY_{VARIABLE NAME}.
|
||||
//
|
||||
// The format is used to avoid code duplication.
|
||||
// It can be included multiple times in ray_config.h, and each inclusion
|
||||
// could use a different definition of the RAY_INTERNAL_FLAG macro.
|
||||
// Macro definition format: RAY_INTERNAL_FLAG(type, name, default_value).
|
||||
// NOTE: This file should NOT be included in any file other than ray_config.h.
|
||||
|
||||
/// Ray Job ID.
|
||||
RAY_INTERNAL_FLAG(std::string, JOB_ID, "")
|
||||
|
||||
/// Raylet process ID.
|
||||
RAY_INTERNAL_FLAG(std::string, RAYLET_PID, "")
|
|
@ -23,7 +23,6 @@
|
|||
#include "ray/gcs/gcs_client/service_based_gcs_client.h"
|
||||
#include "ray/stats/stats.h"
|
||||
#include "ray/util/event.h"
|
||||
#include "ray/util/process.h"
|
||||
#include "ray/util/util.h"
|
||||
|
||||
namespace ray {
|
||||
|
@ -65,8 +64,8 @@ JobID GetProcessJobID(const CoreWorkerOptions &options) {
|
|||
|
||||
if (options.worker_type == WorkerType::WORKER) {
|
||||
// For workers, the job ID is assigned by Raylet via an environment variable.
|
||||
const char *job_id_env = std::getenv(kEnvVarKeyJobId);
|
||||
RAY_CHECK(job_id_env);
|
||||
const std::string &job_id_env = RayConfig::instance().JOB_ID();
|
||||
RAY_CHECK(!job_id_env.empty());
|
||||
return JobID::FromHex(job_id_env);
|
||||
}
|
||||
return options.job_id;
|
||||
|
@ -936,8 +935,8 @@ void CoreWorker::RegisterToGcs() {
|
|||
void CoreWorker::CheckForRayletFailure() {
|
||||
// When running a worker process in a container, the worker's parent process is not the raylet.
|
||||
// So we add the RAY_RAYLET_PID environment variable to the ray worker process.
|
||||
if (const char *env_pid = std::getenv("RAY_RAYLET_PID")) {
|
||||
pid_t pid = static_cast<pid_t>(std::atoi(env_pid));
|
||||
if (auto env_pid = RayConfig::instance().RAYLET_PID(); !env_pid.empty()) {
|
||||
auto pid = static_cast<pid_t>(std::stoi(env_pid));
|
||||
if (!IsProcessAlive(pid)) {
|
||||
RAY_LOG(ERROR) << "Raylet failed. Shutting down. Raylet PID: " << pid;
|
||||
Shutdown();
|
||||
|
|
|
@ -177,16 +177,16 @@ NodeManager::NodeManager(instrumented_io_context &io_service, const NodeID &self
|
|||
: self_node_id_(self_node_id),
|
||||
io_service_(io_service),
|
||||
gcs_client_(gcs_client),
|
||||
worker_pool_(io_service, self_node_id_, config.node_manager_address,
|
||||
config.num_workers_soft_limit,
|
||||
config.num_initial_python_workers_for_first_job,
|
||||
config.maximum_startup_concurrency, config.min_worker_port,
|
||||
config.max_worker_port, config.worker_ports, gcs_client_,
|
||||
config.worker_commands,
|
||||
/*starting_worker_timeout_callback=*/
|
||||
[this] { cluster_task_manager_->ScheduleAndDispatchTasks(); },
|
||||
config.ray_debugger_external,
|
||||
/*get_time=*/[]() { return absl::GetCurrentTimeNanos() / 1e6; }),
|
||||
worker_pool_(
|
||||
io_service, self_node_id_, config.node_manager_address,
|
||||
config.num_workers_soft_limit, config.num_initial_python_workers_for_first_job,
|
||||
config.maximum_startup_concurrency, config.min_worker_port,
|
||||
config.max_worker_port, config.worker_ports, gcs_client_,
|
||||
config.worker_commands,
|
||||
/*starting_worker_timeout_callback=*/
|
||||
[this] { cluster_task_manager_->ScheduleAndDispatchTasks(); },
|
||||
config.ray_debugger_external,
|
||||
/*get_time=*/[]() { return absl::GetCurrentTimeNanos() / 1e6; }),
|
||||
client_call_manager_(io_service),
|
||||
worker_rpc_pool_(client_call_manager_),
|
||||
core_worker_subscriber_(std::make_unique<pubsub::Subscriber>(
|
||||
|
@ -284,7 +284,7 @@ NodeManager::NodeManager(instrumented_io_context &io_service, const NodeID &self
|
|||
local_gc_interval_ns_(RayConfig::instance().local_gc_interval_s() * 1e9),
|
||||
record_metrics_period_ms_(config.record_metrics_period_ms),
|
||||
runtime_env_manager_([this](const std::string &uri, std::function<void(bool)> cb) {
|
||||
if (std::getenv("RUNTIME_ENV_SKIP_LOCAL_GC") != nullptr) {
|
||||
if (RayConfig::instance().runtime_env_skip_local_gc()) {
|
||||
return cb(true);
|
||||
}
|
||||
return agent_manager_->DeleteURIs({uri}, cb);
|
||||
|
|
|
@ -265,7 +265,7 @@ Process WorkerPool::StartWorkerProcess(
|
|||
// Append user-defined per-process options here
|
||||
options.insert(options.end(), dynamic_options.begin(), dynamic_options.end());
|
||||
|
||||
// Extract pointers from the worker command to pass into execvp.
|
||||
// Extract pointers from the worker command to pass into execvpe.
|
||||
std::vector<std::string> worker_command_args;
|
||||
for (auto const &token : state.worker_command) {
|
||||
if (token == kWorkerDynamicOptionPlaceholder) {
|
||||
|
|
|
@ -14,7 +14,7 @@
|
|||
|
||||
#include "ray/util/filesystem.h"
|
||||
|
||||
#include <stdlib.h>
|
||||
#include <cstdlib>
|
||||
|
||||
#include "ray/util/logging.h"
|
||||
|
||||
|
@ -80,11 +80,11 @@ std::string GetUserTempDir() {
|
|||
n = GetTempPath(static_cast<DWORD>(result.size()), &*result.begin());
|
||||
}
|
||||
result.resize(0 < n && n <= result.size() ? static_cast<size_t>(n) : 0);
|
||||
#else // not Linux, Darwin, or Windows
|
||||
#else // not Darwin, or Windows
|
||||
const char *candidates[] = {"TMPDIR", "TMP", "TEMP", "TEMPDIR"};
|
||||
const char *found = NULL;
|
||||
for (char const *candidate : candidates) {
|
||||
found = getenv(candidate);
|
||||
found = std::getenv(candidate);
|
||||
if (found) {
|
||||
break;
|
||||
}
|
||||
|
|
|
@ -14,14 +14,13 @@
|
|||
|
||||
#include "ray/util/logging.h"
|
||||
|
||||
#include <cstdlib>
|
||||
#ifdef _WIN32
|
||||
#include <process.h>
|
||||
#else
|
||||
#include <execinfo.h>
|
||||
#endif
|
||||
|
||||
#include <signal.h>
|
||||
#include <stdlib.h>
|
||||
#ifndef _WIN32
|
||||
#include <unistd.h>
|
||||
#endif
|
||||
|
@ -170,7 +169,7 @@ std::vector<FatalLogCallback> RayLog::fatal_log_callbacks_;
|
|||
|
||||
void RayLog::StartRayLog(const std::string &app_name, RayLogLevel severity_threshold,
|
||||
const std::string &log_dir) {
|
||||
const char *var_value = getenv("RAY_BACKEND_LOG_LEVEL");
|
||||
const char *var_value = std::getenv("RAY_BACKEND_LOG_LEVEL");
|
||||
if (var_value != nullptr) {
|
||||
std::string data = var_value;
|
||||
std::transform(data.begin(), data.end(), data.begin(), ::tolower);
|
||||
|
@ -223,16 +222,16 @@ void RayLog::StartRayLog(const std::string &app_name, RayLogLevel severity_thres
|
|||
#endif
|
||||
// Reset log pattern and level and we assume a log file can be rotated with
|
||||
// 10 files in max size 512M by default.
|
||||
if (getenv("RAY_ROTATION_MAX_BYTES")) {
|
||||
long max_size = std::atol(getenv("RAY_ROTATION_MAX_BYTES"));
|
||||
if (std::getenv("RAY_ROTATION_MAX_BYTES")) {
|
||||
long max_size = std::atol(std::getenv("RAY_ROTATION_MAX_BYTES"));
|
||||
// 0 means no log rotation in python, but not in spdlog. We just use the default
|
||||
// value here.
|
||||
if (max_size != 0) {
|
||||
log_rotation_max_size_ = max_size;
|
||||
}
|
||||
}
|
||||
if (getenv("RAY_ROTATION_BACKUP_COUNT")) {
|
||||
long file_num = std::atol(getenv("RAY_ROTATION_BACKUP_COUNT"));
|
||||
if (std::getenv("RAY_ROTATION_BACKUP_COUNT")) {
|
||||
long file_num = std::atol(std::getenv("RAY_ROTATION_BACKUP_COUNT"));
|
||||
if (file_num != 0) {
|
||||
log_rotation_file_num_ = file_num;
|
||||
}
|
||||
|
|
|
@ -51,6 +51,9 @@ extern char **environ;
|
|||
int execvpe(const char *program, char *const argv[], char *const envp[]) {
|
||||
char **saved = environ;
|
||||
int rc;
|
||||
// Mutating environ is generally unsafe, but this logic only runs on the
|
||||
// start of a worker process. There should be no concurrent access to the
|
||||
// environment.
|
||||
environ = const_cast<char **>(envp);
|
||||
rc = execvp(program, argv);
|
||||
environ = saved;
|
||||
|
|
Loading…
Add table
Reference in a new issue