Add gcs_service_enabled function to avoid getting environment variable directly (#7742)

This commit is contained in:
fangfengbin 2020-03-26 22:02:53 +08:00 committed by GitHub
parent ca6eabc9cb
commit e196fcdbaf
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
12 changed files with 31 additions and 21 deletions

View file

@ -72,6 +72,8 @@ public class RayConfig {
public final String jobResourcePath;
public final String pythonWorkerCommand;
public final boolean gcsServiceEnabled;
private static volatile RayConfig instance = null;
public static RayConfig getInstance() {
@ -223,6 +225,9 @@ public class RayConfig {
numWorkersPerProcess = config.getInt("ray.raylet.config.num_workers_per_process_java");
gcsServiceEnabled = System.getenv("RAY_GCS_SERVICE_ENABLED") == null ||
System.getenv("RAY_GCS_SERVICE_ENABLED").toLowerCase().equals("true");
// Validate config.
validate();
LOGGER.debug("Created config: {}", this);

View file

@ -226,8 +226,7 @@ public class RunManager {
}
// start gcs server
if (System.getenv("RAY_GCS_SERVICE_ENABLED") == null ||
System.getenv("RAY_GCS_SERVICE_ENABLED") == "true") {
if (rayConfig.gcsServiceEnabled) {
String redisPasswordOption = "";
if (!Strings.isNullOrEmpty(rayConfig.headRedisPassword)) {
redisPasswordOption = rayConfig.headRedisPassword;

View file

@ -626,7 +626,7 @@ class Node:
# If this is the head node, start the relevant head node processes.
self.start_redis()
if os.environ.get(ray_constants.RAY_GCS_SERVICE_ENABLED, True):
if ray_constants.GCS_SERVICE_ENABLED:
self.start_gcs_server()
else:
self.start_raylet_monitor()

View file

@ -13,6 +13,12 @@ def env_integer(key, default):
return default
def env_bool(key, default):
    """Read a boolean flag from the process environment.

    Args:
        key: Name of the environment variable to look up.
        default: Value returned unchanged when the variable is not set.

    Returns:
        True if the variable is set and equals "true" ignoring case,
        False if it is set to any other value, and `default` if it is
        not set at all.
    """
    value = os.environ.get(key)
    if value is None:
        return default
    # The comparison already yields a bool, so no conditional expression
    # is needed around it.
    return value.lower() == "true"
ID_SIZE = 20
# The default maximum number of bytes to allocate to the object store unless
@ -197,4 +203,4 @@ MACH_PAGE_SIZE_BYTES = 4096
# RAY_GCS_SERVICE_ENABLED is only set in CI jobs.
# TODO(ffbin): Once we entirely migrate to service-based GCS, we should
# remove it.
RAY_GCS_SERVICE_ENABLED = "RAY_GCS_SERVICE_ENABLED"
GCS_SERVICE_ENABLED = env_bool("RAY_GCS_SERVICE_ENABLED", True)

View file

@ -82,7 +82,7 @@ def test_driver_lives_sequential(ray_start_regular):
ray.worker._global_node.kill_plasma_store()
ray.worker._global_node.kill_log_monitor()
ray.worker._global_node.kill_monitor()
if os.environ.get(ray_constants.RAY_GCS_SERVICE_ENABLED, True):
if ray_constants.GCS_SERVICE_ENABLED:
ray.worker._global_node.kill_gcs_server()
else:
ray.worker._global_node.kill_raylet_monitor()
@ -96,7 +96,7 @@ def test_driver_lives_sequential(ray_start_regular):
def test_driver_lives_parallel(ray_start_regular):
all_processes = ray.worker._global_node.all_processes
if os.environ.get(ray_constants.RAY_GCS_SERVICE_ENABLED, True):
if ray_constants.GCS_SERVICE_ENABLED:
process_infos = (all_processes[ray_constants.PROCESS_TYPE_PLASMA_STORE]
+ all_processes[ray_constants.PROCESS_TYPE_GCS_SERVER]
+ all_processes[ray_constants.PROCESS_TYPE_RAYLET] +

View file

@ -132,7 +132,7 @@ def test_driver_lives_sequential(ray_start_regular):
ray.worker._global_node.kill_plasma_store()
ray.worker._global_node.kill_log_monitor()
ray.worker._global_node.kill_monitor()
if os.environ.get(ray_constants.RAY_GCS_SERVICE_ENABLED, True):
if ray_constants.GCS_SERVICE_ENABLED:
ray.worker._global_node.kill_gcs_server()
else:
ray.worker._global_node.kill_raylet_monitor()
@ -145,7 +145,7 @@ def test_driver_lives_sequential(ray_start_regular):
reason="Hanging with new GCS API.")
def test_driver_lives_parallel(ray_start_regular):
all_processes = ray.worker._global_node.all_processes
if os.environ.get(ray_constants.RAY_GCS_SERVICE_ENABLED, True):
if ray_constants.GCS_SERVICE_ENABLED:
process_infos = (all_processes[ray_constants.PROCESS_TYPE_PLASMA_STORE]
+ all_processes[ray_constants.PROCESS_TYPE_GCS_SERVER]
+ all_processes[ray_constants.PROCESS_TYPE_RAYLET] +

View file

@ -145,7 +145,7 @@ def test_raylet_tempfiles(shutdown_only):
"raylet.err"
}
if os.environ.get(ray_constants.RAY_GCS_SERVICE_ENABLED, True):
if ray_constants.GCS_SERVICE_ENABLED:
log_files_expected.update({"gcs_server.out", "gcs_server.err"})
else:
log_files_expected.update({"raylet_monitor.out", "raylet_monitor.err"})

View file

@ -43,10 +43,4 @@ constexpr char kWorkerDynamicOptionPlaceholderPrefix[] =
constexpr char kWorkerRayletConfigPlaceholder[] = "RAY_WORKER_RAYLET_CONFIG_PLACEHOLDER";
/// RAY_GCS_SERVICE_ENABLED is an environment variable that is only set in CI jobs.
/// If it is unset or set to "true", the GCS service is enabled;
/// any other value disables the GCS service.
/// TODO(ffbin): Once we entirely migrate to service-based GCS, we should remove it.
constexpr char kRayGcsServiceEnabled[] = "RAY_GCS_SERVICE_ENABLED";
#endif // RAY_CONSTANTS_H_

View file

@ -273,3 +273,12 @@ RAY_CONFIG(uint32_t, object_store_full_initial_delay_ms, 1000)
/// Duration to wait between retries for failed tasks.
RAY_CONFIG(uint32_t, task_retry_delay_ms, 5000)
/// Whether to enable the GCS service.
/// RAY_GCS_SERVICE_ENABLED is an environment variable that is only set in CI jobs.
/// The default is true when the variable is unset or is exactly "true" (the
/// comparison below is case-sensitive); any other value yields false.
/// TODO(ffbin): Once we entirely migrate to service-based GCS, we should remove it.
RAY_CONFIG(bool, gcs_service_enabled,
getenv("RAY_GCS_SERVICE_ENABLED") == nullptr ||
getenv("RAY_GCS_SERVICE_ENABLED") == std::string("true"))

View file

@ -115,8 +115,7 @@ CoreWorker::CoreWorker(const WorkerType worker_type, const Language language,
RayLog::InstallFailureSignalHandler();
}
// Initialize gcs client.
if (getenv(kRayGcsServiceEnabled) == nullptr ||
strcmp(getenv(kRayGcsServiceEnabled), "true") == 0) {
if (RayConfig::instance().gcs_service_enabled()) {
gcs_client_ = std::make_shared<ray::gcs::ServiceBasedGcsClient>(gcs_options);
} else {
gcs_client_ = std::make_shared<ray::gcs::RedisGcsClient>(gcs_options);

View file

@ -109,8 +109,7 @@ class CoreWorkerTest : public ::testing::Test {
}
// start gcs server
if (getenv(kRayGcsServiceEnabled) == nullptr ||
strcmp(getenv(kRayGcsServiceEnabled), "true") == 0) {
if (RayConfig::instance().gcs_service_enabled()) {
gcs_server_pid_ = StartGcsServer("127.0.0.1");
} else {
// core worker test relies on node resources. It's important that one raylet can

View file

@ -177,8 +177,7 @@ int main(int argc, char *argv[]) {
ray::gcs::GcsClientOptions client_options(redis_address, redis_port, redis_password);
std::shared_ptr<ray::gcs::GcsClient> gcs_client;
if (getenv(kRayGcsServiceEnabled) == nullptr ||
strcmp(getenv(kRayGcsServiceEnabled), "true") == 0) {
if (RayConfig::instance().gcs_service_enabled()) {
gcs_client = std::make_shared<ray::gcs::ServiceBasedGcsClient>(client_options);
} else {
gcs_client = std::make_shared<ray::gcs::RedisGcsClient>(client_options);