From 80657e5dfe3b5b449c16fe3238b91c57ca7eb9f3 Mon Sep 17 00:00:00 2001
From: Richard Liaw
Date: Wed, 24 Feb 2021 19:44:31 -0800
Subject: [PATCH] Revert "[Core]Pull off timers out of heartbeat in raylet (#13963)" (#14319)

---
 python/ray/tests/test_gcs_fault_tolerance.py  |  1 -
 src/ray/common/ray_config_def.h               | 18 ++---
 src/ray/core_worker/core_worker.cc            | 40 +++++++---
 src/ray/core_worker/core_worker.h             | 12 +--
 src/ray/core_worker/profiling.cc              | 14 +++-
 src/ray/core_worker/profiling.h               |  7 +-
 .../gcs_client/service_based_gcs_client.cc    | 21 +++--
 .../gcs/gcs_client/service_based_gcs_client.h |  5 +-
 .../gcs/gcs_server/gcs_heartbeat_manager.cc   | 27 ++++++-
 .../gcs/gcs_server/gcs_heartbeat_manager.h    | 14 +++-
 .../gcs_server/gcs_redis_failure_detector.cc  | 25 +++++-
 .../gcs_server/gcs_redis_failure_detector.h   | 11 ++-
 .../gcs/gcs_server/gcs_resource_manager.cc    | 20 ++++-
 src/ray/gcs/gcs_server/gcs_resource_manager.h |  5 +-
 src/ray/raylet/local_object_manager.cc        |  7 ++
 src/ray/raylet/local_object_manager.h         |  9 ++-
 src/ray/raylet/main.cc                        |  4 +
 src/ray/raylet/node_manager.cc                | 78 ++++++++++++-------
 src/ray/raylet/node_manager.h                 | 27 +++++--
 src/ray/raylet/worker_pool.cc                 | 22 ++++--
 src/ray/raylet/worker_pool.h                  |  8 +-
 src/ray/util/periodical_runner.cc             | 55 -------------
 src/ray/util/periodical_runner.h              | 44 -----------
 23 files changed, 271 insertions(+), 203 deletions(-)
 delete mode 100644 src/ray/util/periodical_runner.cc
 delete mode 100644 src/ray/util/periodical_runner.h

diff --git a/python/ray/tests/test_gcs_fault_tolerance.py b/python/ray/tests/test_gcs_fault_tolerance.py
index 12946b998..642cf4dd3 100644
--- a/python/ray/tests/test_gcs_fault_tolerance.py
+++ b/python/ray/tests/test_gcs_fault_tolerance.py
@@ -48,7 +48,6 @@ def test_gcs_server_restart(ray_start_regular):
 assert result == 9


-@pytest.mark.skipif(sys.platform == "win32", reason="Fails on Windows.")
 @pytest.mark.parametrize(
 "ray_start_regular", [
 generate_system_config_map(
diff --git a/src/ray/common/ray_config_def.h b/src/ray/common/ray_config_def.h
index 62f22a76c..8d1fb5bc9 100644
--- a/src/ray/common/ray_config_def.h
+++ b/src/ray/common/ray_config_def.h
@@ -35,7 +35,7 @@ RAY_CONFIG(int64_t, ray_cookie, 0x5241590000000000)
 RAY_CONFIG(int64_t, handler_warning_timeout_ms, 1000)

 /// The duration between heartbeats sent by the raylets.
-RAY_CONFIG(uint64_t, raylet_heartbeat_period_milliseconds, 100)
+RAY_CONFIG(int64_t, raylet_heartbeat_period_milliseconds, 100)
 /// If a component has not sent a heartbeat in the last num_heartbeats_timeout
 /// heartbeat intervals, the raylet monitor process will report
 /// it as dead to the db_client table.
@@ -46,10 +46,10 @@ RAY_CONFIG(int64_t, num_heartbeats_timeout, 300)
 RAY_CONFIG(uint64_t, num_heartbeats_warning, 5)

 /// The duration between reporting resources sent by the raylets.
-RAY_CONFIG(uint64_t, raylet_report_resources_period_milliseconds, 100)
+RAY_CONFIG(int64_t, raylet_report_resources_period_milliseconds, 100)

-/// The duration between dumping debug info to logs, or 0 to disable.
-RAY_CONFIG(uint64_t, debug_dump_period_milliseconds, 10000)
+/// The duration between dumping debug info to logs, or -1 to disable.
+RAY_CONFIG(int64_t, debug_dump_period_milliseconds, 10000)

 /// Whether to enable fair queueing between task classes in raylet. When
 /// fair queueing is enabled, the raylet will try to balance the number
@@ -136,7 +136,7 @@ RAY_CONFIG(int64_t, worker_lease_timeout_milliseconds, 500)

 /// The interval at which the workers will check if their raylet has gone down.
 /// When this happens, they will kill themselves.
-RAY_CONFIG(uint64_t, raylet_death_check_interval_milliseconds, 1000)
+RAY_CONFIG(int64_t, raylet_death_check_interval_milliseconds, 1000)

 /// These are used by the worker to set timeouts and to batch requests when
 /// getting objects.
@@ -215,7 +215,7 @@ RAY_CONFIG(int64_t, gcs_service_connect_retries, 50)
 RAY_CONFIG(int64_t, internal_gcs_service_connect_wait_milliseconds, 100)
 /// The interval at which the gcs server will check if redis has gone down.
 /// When this happens, gcs server will kill itself.
-RAY_CONFIG(uint64_t, gcs_redis_heartbeat_interval_milliseconds, 100)
+RAY_CONFIG(int64_t, gcs_redis_heartbeat_interval_milliseconds, 100)
 /// Duration to wait between retries for leasing worker in gcs server.
 RAY_CONFIG(uint32_t, gcs_lease_worker_retry_interval_ms, 200)
 /// Duration to wait between retries for creating actor in gcs server.
@@ -265,7 +265,7 @@ RAY_CONFIG(bool, release_resources_during_plasma_fetch, false)

 /// The interval at which the gcs client will check if the address of gcs service has
 /// changed. When the address changes, we will resubscribe again.
-RAY_CONFIG(uint64_t, gcs_service_address_check_interval_milliseconds, 1000)
+RAY_CONFIG(int64_t, gcs_service_address_check_interval_milliseconds, 1000)

 /// The batch size for metrics export.
 RAY_CONFIG(int64_t, metrics_report_batch_size, 100)
@@ -303,9 +303,9 @@ RAY_CONFIG(bool, enable_worker_prestart,
 getenv("RAY_ENABLE_WORKER_PRESTART") == nullptr ||
 getenv("RAY_ENABLE_WORKER_PRESTART") == std::string("1"))

-/// The interval of periodic idle worker killing. Value of 0 means worker capping is
+/// The interval of periodic idle worker killing. A negative value means worker capping is
 /// disabled.
-RAY_CONFIG(uint64_t, kill_idle_workers_interval_ms, 200)
+RAY_CONFIG(int64_t, kill_idle_workers_interval_ms, 200)

 /// The idle time threshold for an idle worker to be killed.
 RAY_CONFIG(int64_t, idle_worker_killing_time_threshold_ms, 1000)

diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc
index 934d08de4..37bb12be7 100644
--- a/src/ray/core_worker/core_worker.cc
+++ b/src/ray/core_worker/core_worker.cc
@@ -28,7 +28,7 @@ namespace {

 // Duration between internal book-keeping heartbeats.
-const uint64_t kInternalHeartbeatMillis = 1000;
+const int kInternalHeartbeatMillis = 1000;

 void BuildCommonTaskSpec(
 ray::TaskSpecBuilder &builder, const JobID &job_id, const TaskID &task_id,
@@ -301,7 +301,8 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_
 worker_context_(options_.worker_type, worker_id, GetProcessJobID(options_)),
 io_work_(io_service_),
 client_call_manager_(new rpc::ClientCallManager(io_service_)),
- periodical_runner_(io_service_),
+ death_check_timer_(io_service_),
+ internal_timer_(io_service_),
 task_queue_length_(0),
 num_executed_tasks_(0),
 task_execution_service_work_(task_execution_service_),
@@ -406,13 +407,15 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_
 });

 if (options_.worker_type == ray::WorkerType::WORKER) {
- periodical_runner_.RunFnPeriodically(
- [this] { CheckForRayletFailure(); },
- RayConfig::instance().raylet_death_check_interval_milliseconds());
+ death_check_timer_.expires_from_now(boost::asio::chrono::milliseconds(
+ RayConfig::instance().raylet_death_check_interval_milliseconds()));
+ death_check_timer_.async_wait(
+ boost::bind(&CoreWorker::CheckForRayletFailure, this, _1));
 }

- periodical_runner_.RunFnPeriodically([this] { InternalHeartbeat(); },
- kInternalHeartbeatMillis);
+ internal_timer_.expires_from_now(
+ boost::asio::chrono::milliseconds(kInternalHeartbeatMillis));
+ internal_timer_.async_wait(boost::bind(&CoreWorker::InternalHeartbeat, this, _1));

 plasma_store_provider_.reset(new CoreWorkerPlasmaStoreProvider(
 options_.store_socket, local_raylet_client_, reference_counter_,
@@ -788,14 +791,30 @@ void CoreWorker::RegisterToGcs() {
 RAY_CHECK_OK(gcs_client_->Workers().AsyncAdd(worker_data, nullptr));
 }

-void CoreWorker::CheckForRayletFailure() {
+void CoreWorker::CheckForRayletFailure(const boost::system::error_code &error) {
+ if (error == boost::asio::error::operation_aborted) {
+ return;
+ }
+
 if (!IsParentProcessAlive()) {
 RAY_LOG(ERROR) << "Raylet failed. Shutting down.";
 Shutdown();
 }
+
+ // Reset the timer from the previous expiration time to avoid drift.
+ death_check_timer_.expires_at(
+ death_check_timer_.expiry() +
+ boost::asio::chrono::milliseconds(
+ RayConfig::instance().raylet_death_check_interval_milliseconds()));
+ death_check_timer_.async_wait(
+ boost::bind(&CoreWorker::CheckForRayletFailure, this, _1));
 }

-void CoreWorker::InternalHeartbeat() {
+void CoreWorker::InternalHeartbeat(const boost::system::error_code &error) {
+ if (error == boost::asio::error::operation_aborted) {
+ return;
+ }
+
 absl::MutexLock lock(&mutex_);

 while (!to_resubmit_.empty() && current_time_ms() > to_resubmit_.front().first) {
@@ -807,6 +826,9 @@ void CoreWorker::InternalHeartbeat() {
 }
 to_resubmit_.pop_front();
 }
+ internal_timer_.expires_at(internal_timer_.expiry() +
+ boost::asio::chrono::milliseconds(kInternalHeartbeatMillis));
+ internal_timer_.async_wait(boost::bind(&CoreWorker::InternalHeartbeat, this, _1));
 }

 std::unordered_map<ObjectID, std::pair<size_t, size_t>>
diff --git a/src/ray/core_worker/core_worker.h b/src/ray/core_worker/core_worker.h
index 33213438e..1492caf85 100644
--- a/src/ray/core_worker/core_worker.h
+++ b/src/ray/core_worker/core_worker.h
@@ -36,7 +36,6 @@
 #include "ray/rpc/node_manager/node_manager_client.h"
 #include "ray/rpc/worker/core_worker_client.h"
 #include "ray/rpc/worker/core_worker_server.h"
-#include "ray/util/periodical_runner.h"

 /// The set of gRPC handlers and their associated level of concurrency. If you want to
 /// add a new call to the worker gRPC server, do the following:
@@ -977,10 +976,10 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler {
 void RegisterToGcs();

 /// Check if the raylet has failed. If so, shutdown.
- void CheckForRayletFailure();
+ void CheckForRayletFailure(const boost::system::error_code &error);

 /// Heartbeat for internal bookkeeping.
- void InternalHeartbeat();
+ void InternalHeartbeat(const boost::system::error_code &error);

 ///
 /// Private methods related to task submission.
@@ -1136,8 +1135,11 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler {
 /// Shared core worker client pool.
 std::shared_ptr<rpc::CoreWorkerClientPool> core_worker_client_pool_;

- /// The runner to run function periodically.
- PeriodicalRunner periodical_runner_;
+ /// Timer used to periodically check if the raylet has died.
+ boost::asio::steady_timer death_check_timer_;
+
+ /// Timer for internal book-keeping.
+ boost::asio::steady_timer internal_timer_;

 /// RPC server used to receive tasks to execute.
 std::unique_ptr<rpc::GrpcServer> core_worker_server_;
diff --git a/src/ray/core_worker/profiling.cc b/src/ray/core_worker/profiling.cc
index 711ab2379..b56155407 100644
--- a/src/ray/core_worker/profiling.cc
+++ b/src/ray/core_worker/profiling.cc
@@ -31,13 +31,13 @@ Profiler::Profiler(WorkerContext &worker_context, const std::string &node_ip_add
 boost::asio::io_service &io_service,
 const std::shared_ptr<gcs::GcsClient> &gcs_client)
 : io_service_(io_service),
- periodical_runner_(io_service_),
+ timer_(io_service_, boost::asio::chrono::seconds(1)),
 rpc_profile_data_(new rpc::ProfileTableData()),
 gcs_client_(gcs_client) {
 rpc_profile_data_->set_component_type(WorkerTypeString(worker_context.GetWorkerType()));
 rpc_profile_data_->set_component_id(worker_context.GetWorkerID().Binary());
 rpc_profile_data_->set_node_ip_address(node_ip_address);
- periodical_runner_.RunFnPeriodically([this] { FlushEvents(); }, 1000);
+ timer_.async_wait(boost::bind(&Profiler::FlushEvents, this, _1));
 }

 void Profiler::AddEvent(const rpc::ProfileTableData::ProfileEvent &event) {
@@ -45,7 +45,11 @@ void Profiler::AddEvent(const rpc::ProfileTableData::ProfileEvent &event) {
 rpc_profile_data_->add_profile_events()->CopyFrom(event);
 }

-void Profiler::FlushEvents() {
+void Profiler::FlushEvents(const boost::system::error_code &error) {
+ if (error == boost::asio::error::operation_aborted) {
+ return;
+ }
+
 auto cur_profile_data = std::make_shared<rpc::ProfileTableData>();
 {
 absl::MutexLock lock(&mutex_);
@@ -67,6 +71,10 @@ void Profiler::FlushEvents() {
 << " events to GCS.";
 }
 }
+
+ // Reset the timer to 1 second from the previous expiration time to avoid drift.
+ timer_.expires_at(timer_.expiry() + boost::asio::chrono::seconds(1));
+ timer_.async_wait(boost::bind(&Profiler::FlushEvents, this, _1));
 }

 } // namespace worker
diff --git a/src/ray/core_worker/profiling.h b/src/ray/core_worker/profiling.h
index ee37e2028..1b111858a 100644
--- a/src/ray/core_worker/profiling.h
+++ b/src/ray/core_worker/profiling.h
@@ -19,7 +19,6 @@
 #include "absl/time/clock.h"
 #include "ray/core_worker/context.h"
 #include "ray/gcs/gcs_client.h"
-#include "ray/util/periodical_runner.h"

 namespace ray {

@@ -36,7 +35,7 @@ class Profiler {
 private:
 // Flush all of the events that have been added since last flush to the GCS.
- void FlushEvents() LOCKS_EXCLUDED(mutex_);
+ void FlushEvents(const boost::system::error_code &error) LOCKS_EXCLUDED(mutex_);

 // Mutex guarding rpc_profile_data_.
 absl::Mutex mutex_;

 // ASIO IO service event loop. Must be started by the caller.
 boost::asio::io_service &io_service_;

- /// The runner to run function periodically.
- PeriodicalRunner periodical_runner_;
+ // Timer used to periodically flush events to the GCS.
+ boost::asio::steady_timer timer_;

 // RPC message containing profiling data. Holds the queue of profile events
 // until they are flushed.
diff --git a/src/ray/gcs/gcs_client/service_based_gcs_client.cc b/src/ray/gcs/gcs_client/service_based_gcs_client.cc
index 9981fc57f..d59481e0b 100644
--- a/src/ray/gcs/gcs_client/service_based_gcs_client.cc
+++ b/src/ray/gcs/gcs_client/service_based_gcs_client.cc
@@ -84,10 +84,8 @@ Status ServiceBasedGcsClient::Connect(boost::asio::io_service &io_service) {
 placement_group_accessor_.reset(new ServiceBasedPlacementGroupInfoAccessor(this));

 // Init gcs service address check timer.
- periodical_runner_.reset(new PeriodicalRunner(io_service));
- periodical_runner_->RunFnPeriodically(
- [this] { PeriodicallyCheckGcsServerAddress(); },
- RayConfig::instance().gcs_service_address_check_interval_milliseconds());
+ detect_timer_.reset(new boost::asio::deadline_timer(io_service));
+ PeriodicallyCheckGcsServerAddress();

 is_connected_ = true;

@@ -98,7 +96,7 @@ Status ServiceBasedGcsClient::Connect(boost::asio::io_service &io_service) {
 void ServiceBasedGcsClient::Disconnect() {
 RAY_CHECK(is_connected_);
 is_connected_ = false;
- periodical_runner_.reset();
+ detect_timer_->cancel();
 gcs_pub_sub_.reset();
 redis_client_->Disconnect();
 redis_client_.reset();
@@ -154,6 +152,19 @@ void ServiceBasedGcsClient::PeriodicallyCheckGcsServerAddress() {
 GcsServiceFailureDetected(rpc::GcsServiceFailureType::GCS_SERVER_RESTART);
 }
 }
+
+ auto check_period = boost::posix_time::milliseconds(
+ RayConfig::instance().gcs_service_address_check_interval_milliseconds());
+ detect_timer_->expires_from_now(check_period);
+ detect_timer_->async_wait([this](const boost::system::error_code &error) {
+ if (error == boost::asio::error::operation_aborted) {
+ // `operation_aborted` is set when `detect_timer_` is canceled or destroyed.
+ return;
+ }
+ RAY_CHECK(!error) << "Checking gcs server address failed with error: "
+ << error.message();
+ PeriodicallyCheckGcsServerAddress();
+ });
 }

 void ServiceBasedGcsClient::GcsServiceFailureDetected(rpc::GcsServiceFailureType type) {
diff --git a/src/ray/gcs/gcs_client/service_based_gcs_client.h b/src/ray/gcs/gcs_client/service_based_gcs_client.h
index 7e2512ed4..9b0e79806 100644
--- a/src/ray/gcs/gcs_client/service_based_gcs_client.h
+++ b/src/ray/gcs/gcs_client/service_based_gcs_client.h
@@ -18,7 +18,6 @@
 #include "ray/gcs/pubsub/gcs_pub_sub.h"
 #include "ray/gcs/redis_client.h"
 #include "ray/rpc/gcs_server/gcs_rpc_client.h"
-#include "ray/util/periodical_runner.h"

 namespace ray {
 namespace gcs {

@@ -67,8 +66,8 @@ class RAY_EXPORT ServiceBasedGcsClient : public GcsClient {
 std::unique_ptr<rpc::GcsRpcClient> gcs_rpc_client_;
 std::unique_ptr<rpc::ClientCallManager> client_call_manager_;

- // The runner to run function periodically.
- std::unique_ptr<PeriodicalRunner> periodical_runner_;
+ // A timer used to check if gcs server address changed.
+ std::unique_ptr<boost::asio::deadline_timer> detect_timer_;
 std::function<bool(std::pair<std::string, int> *)> get_server_address_func_;
 std::function<void(bool)> resubscribe_func_;
 std::pair<std::string, int> current_gcs_server_address_;
diff --git a/src/ray/gcs/gcs_server/gcs_heartbeat_manager.cc b/src/ray/gcs/gcs_server/gcs_heartbeat_manager.cc
index de5ce6d47..5991c20a8 100644
--- a/src/ray/gcs/gcs_server/gcs_heartbeat_manager.cc
+++ b/src/ray/gcs/gcs_server/gcs_heartbeat_manager.cc
@@ -26,7 +26,7 @@ GcsHeartbeatManager::GcsHeartbeatManager(
 : io_service_(io_service),
 on_node_death_callback_(std::move(on_node_death_callback)),
 num_heartbeats_timeout_(RayConfig::instance().num_heartbeats_timeout()),
- periodical_runner_(io_service) {
+ detect_timer_(io_service) {
 io_service_thread_.reset(new std::thread([this] {
 SetThreadName("heartbeat");
 /// The asio work to keep io_service_ alive.
@@ -46,9 +46,7 @@ void GcsHeartbeatManager::Initialize(const GcsInitData &gcs_init_data) {
 void GcsHeartbeatManager::Start() {
 io_service_.post([this] {
 if (!is_started_) {
- periodical_runner_.RunFnPeriodically(
- [this] { DetectDeadNodes(); },
- RayConfig::instance().raylet_heartbeat_period_milliseconds());
+ Tick();
 is_started_ = true;
 }
 });
@@ -82,6 +80,12 @@ void GcsHeartbeatManager::HandleReportHeartbeat(
 GCS_RPC_SEND_REPLY(send_reply_callback, reply, Status::OK());
 }

+/// A periodic timer that checks for timed out clients.
+void GcsHeartbeatManager::Tick() {
+ DetectDeadNodes();
+ ScheduleTick();
+}
+
 void GcsHeartbeatManager::DetectDeadNodes() {
 for (auto it = heartbeats_.begin(); it != heartbeats_.end();) {
 auto current = it++;
@@ -97,5 +101,20 @@ void GcsHeartbeatManager::DetectDeadNodes() {
 }
 }

+void GcsHeartbeatManager::ScheduleTick() {
+ auto heartbeat_period = boost::posix_time::milliseconds(
+ RayConfig::instance().raylet_heartbeat_period_milliseconds());
+ detect_timer_.expires_from_now(heartbeat_period);
+ detect_timer_.async_wait([this](const boost::system::error_code &error) {
+ if (error == boost::asio::error::operation_aborted) {
+ // `operation_aborted` is set when `detect_timer_` is canceled or destroyed.
+ // The lifetime of this manager may be shorter than the objects that use it (e.g. gcs_server).
+ return;
+ }
+ RAY_CHECK(!error) << "Checking heartbeat failed with error: " << error.message();
+ Tick();
+ });
+}
+
 } // namespace gcs
 } // namespace ray
diff --git a/src/ray/gcs/gcs_server/gcs_heartbeat_manager.h b/src/ray/gcs/gcs_server/gcs_heartbeat_manager.h
index 45068d381..1febd3ee9 100644
--- a/src/ray/gcs/gcs_server/gcs_heartbeat_manager.h
+++ b/src/ray/gcs/gcs_server/gcs_heartbeat_manager.h
@@ -21,9 +21,7 @@
 #include "ray/gcs/gcs_server/gcs_init_data.h"
 #include "ray/rpc/client_call.h"
 #include "ray/rpc/gcs_server/gcs_rpc_server.h"
-#include "ray/util/periodical_runner.h"
 #include "src/ray/protobuf/gcs.pb.h"
-
 namespace ray {
 namespace gcs {

@@ -64,10 +62,18 @@ class GcsHeartbeatManager : public rpc::HeartbeatInfoHandler {
 void AddNode(const NodeID &node_id);

 protected:
+ /// A periodic timer that fires on every heartbeat period. Raylets that have
+ /// not sent a heartbeat within the last num_heartbeats_timeout ticks will be
+ /// marked as dead in the client table.
+ void Tick();
+
 /// Check whether any raylet is inactive due to missing heartbeats for a period
 /// of time. If any are found, mark them as dead.
 void DetectDeadNodes();

+ /// Schedule another tick after a short time.
+ void ScheduleTick();
+
 private:
 /// The main event loop for node failure detector.
 boost::asio::io_service &io_service_;

 std::function<void(const NodeID &)> on_node_death_callback_;
 /// The number of heartbeats that can be missed before a node is removed.
 int64_t num_heartbeats_timeout_;
- /// The runner to run function periodically.
- PeriodicalRunner periodical_runner_;
+ /// A timer that ticks every heartbeat_timeout_ms_ milliseconds.
+ boost::asio::deadline_timer detect_timer_;
 /// For each Raylet that we receive a heartbeat from, the number of ticks
 /// that may pass before the Raylet will be declared dead.
 absl::flat_hash_map<NodeID, int64_t> heartbeats_;
diff --git a/src/ray/gcs/gcs_server/gcs_redis_failure_detector.cc b/src/ray/gcs/gcs_server/gcs_redis_failure_detector.cc
index 8d5552c40..25f8b60ff 100644
--- a/src/ray/gcs/gcs_server/gcs_redis_failure_detector.cc
+++ b/src/ray/gcs/gcs_server/gcs_redis_failure_detector.cc
@@ -23,14 +23,12 @@ GcsRedisFailureDetector::GcsRedisFailureDetector(
 boost::asio::io_service &io_service, std::shared_ptr<RedisContext> redis_context,
 std::function<void()> callback)
 : redis_context_(redis_context),
- periodical_runner_(io_service),
+ detect_timer_(io_service),
 callback_(std::move(callback)) {}

 void GcsRedisFailureDetector::Start() {
 RAY_LOG(INFO) << "Starting redis failure detector.";
- periodical_runner_.RunFnPeriodically(
- [this] { DetectRedis(); },
- RayConfig::instance().gcs_redis_heartbeat_interval_milliseconds());
+ Tick();
 }

 void GcsRedisFailureDetector::DetectRedis() {
@@ -49,5 +47,24 @@ void GcsRedisFailureDetector::DetectRedis() {
 }
 }

+/// A periodic timer that checks for timed out clients.
+void GcsRedisFailureDetector::Tick() {
+ DetectRedis();
+ ScheduleTick();
+}
+
+void GcsRedisFailureDetector::ScheduleTick() {
+ auto detect_period = boost::posix_time::milliseconds(
+ RayConfig::instance().gcs_redis_heartbeat_interval_milliseconds());
+ detect_timer_.expires_from_now(detect_period);
+ detect_timer_.async_wait([this](const boost::system::error_code &error) {
+ if (error == boost::asio::error::operation_aborted) {
+ return;
+ }
+ RAY_CHECK(!error) << "Detecting redis failed with error: " << error.message();
+ Tick();
+ });
+}
+
 } // namespace gcs
 } // namespace ray
\ No newline at end of file
diff --git a/src/ray/gcs/gcs_server/gcs_redis_failure_detector.h b/src/ray/gcs/gcs_server/gcs_redis_failure_detector.h
index 19753d9b0..78425b183 100644
--- a/src/ray/gcs/gcs_server/gcs_redis_failure_detector.h
+++ b/src/ray/gcs/gcs_server/gcs_redis_failure_detector.h
@@ -17,7 +17,6 @@
 #include <boost/asio.hpp>

 #include "ray/gcs/redis_context.h"
-#include "ray/util/periodical_runner.h"

 namespace ray {

@@ -45,6 +44,12 @@ class GcsRedisFailureDetector {
 void Start();

 protected:
+ /// A periodic timer that fires on every gcs detect period.
+ void Tick();
+
+ /// Schedule another tick after a short time.
+ void ScheduleTick();
+
 /// Check whether redis is inactive.
 void DetectRedis();

 /// TODO(ffbin): We will use redis client later.
 std::shared_ptr<RedisContext> redis_context_;

- /// The runner to run function periodically.
- PeriodicalRunner periodical_runner_;
+ /// A timer that ticks every gcs_detect_timeout_milliseconds.
+ boost::asio::deadline_timer detect_timer_;

 /// A function that is called when redis is detected to be unavailable.
 std::function<void()> callback_;
diff --git a/src/ray/gcs/gcs_server/gcs_resource_manager.cc b/src/ray/gcs/gcs_server/gcs_resource_manager.cc
index d4fa55439..15662e5b8 100644
--- a/src/ray/gcs/gcs_server/gcs_resource_manager.cc
+++ b/src/ray/gcs/gcs_server/gcs_resource_manager.cc
@@ -22,12 +22,10 @@ namespace gcs {
 GcsResourceManager::GcsResourceManager(
 boost::asio::io_service &main_io_service, std::shared_ptr<GcsPubSub> gcs_pub_sub,
 std::shared_ptr<gcs::GcsTableStorage> gcs_table_storage)
- : periodical_runner_(main_io_service),
+ : resource_timer_(main_io_service),
 gcs_pub_sub_(gcs_pub_sub),
 gcs_table_storage_(gcs_table_storage) {
- periodical_runner_.RunFnPeriodically(
- [this] { SendBatchedResourceUsage(); },
- RayConfig::instance().raylet_report_resources_period_milliseconds());
+ SendBatchedResourceUsage();
 }

 void GcsResourceManager::HandleGetResources(const rpc::GetResourcesRequest &request,
@@ -366,6 +364,20 @@ void GcsResourceManager::SendBatchedResourceUsage() {
 batch->SerializeAsString(), nullptr));
 resources_buffer_.clear();
 }
+
+ auto resources_period = boost::posix_time::milliseconds(
+ RayConfig::instance().raylet_report_resources_period_milliseconds());
+ resource_timer_.expires_from_now(resources_period);
+ resource_timer_.async_wait([this](const boost::system::error_code &error) {
+ if (error == boost::asio::error::operation_aborted) {
+ // `operation_aborted` is set when `resource_timer_` is canceled or destroyed.
+ // The lifetime of this manager may be shorter than the objects that use it (e.g. gcs_server).
+ return;
+ }
+ RAY_CHECK(!error) << "Sending batched resource usage failed with error: "
+ << error.message();
+ SendBatchedResourceUsage();
+ });
 }

 void GcsResourceManager::UpdatePlacementGroupLoad(
diff --git a/src/ray/gcs/gcs_server/gcs_resource_manager.h b/src/ray/gcs/gcs_server/gcs_resource_manager.h
index 10b0cb25f..792a8b3e4 100644
--- a/src/ray/gcs/gcs_server/gcs_resource_manager.h
+++ b/src/ray/gcs/gcs_server/gcs_resource_manager.h
@@ -24,7 +24,6 @@
 #include "ray/gcs/pubsub/gcs_pub_sub.h"
 #include "ray/rpc/client_call.h"
 #include "ray/rpc/gcs_server/gcs_rpc_server.h"
-#include "ray/util/periodical_runner.h"
 #include "src/ray/protobuf/gcs.pb.h"

 namespace ray {

@@ -156,8 +155,8 @@ class GcsResourceManager : public rpc::NodeResourceInfoHandler {
 /// Send any buffered resource usage as a single publish.
 void SendBatchedResourceUsage();

- /// The runner to run function periodically.
- PeriodicalRunner periodical_runner_;
+ /// A timer that ticks every raylet_report_resources_period_milliseconds.
+ boost::asio::deadline_timer resource_timer_;
 /// Newest resource usage of all nodes.
 absl::flat_hash_map<NodeID, rpc::ResourcesData> node_resource_usages_;
 /// A buffer containing resource usage received from node managers in the last tick.
 absl::flat_hash_map<NodeID, rpc::ResourcesData> resources_buffer_;
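
The GCS-server changes above (heartbeat manager, redis failure detector, resource manager) all reinstate the same idiom: run the periodic work, re-arm the timer, and stop rescheduling once `operation_aborted` reports that the timer was canceled or destroyed. Below is a minimal, self-contained sketch of that idiom; it is not part of the patch, and `Ticker`, `DoWork`, and the 100 ms period are illustrative names only:

#include <boost/asio.hpp>
#include <iostream>

// A minimal self-rescheduling deadline_timer, mirroring the pattern that this
// revert restores in the GCS components (hypothetical example, not Ray code).
class Ticker {
 public:
  Ticker(boost::asio::io_service &io_service, int64_t period_ms)
      : detect_timer_(io_service), period_ms_(period_ms) {}

  void Start() { Tick(); }

 private:
  void Tick() {
    DoWork();
    ScheduleTick();
  }

  void ScheduleTick() {
    detect_timer_.expires_from_now(boost::posix_time::milliseconds(period_ms_));
    detect_timer_.async_wait([this](const boost::system::error_code &error) {
      if (error == boost::asio::error::operation_aborted) {
        // The timer was canceled or destroyed; stop rescheduling.
        return;
      }
      Tick();
    });
  }

  void DoWork() { std::cout << "tick" << std::endl; }

  boost::asio::deadline_timer detect_timer_;
  int64_t period_ms_;
};

int main() {
  boost::asio::io_service io_service;
  Ticker ticker(io_service, 100);
  ticker.Start();
  io_service.run();  // Blocks; the ticker fires every ~100 ms until stopped.
}

Because each callback re-arms the timer itself, canceling or destroying the timer breaks the chain via `operation_aborted`, which is why every handler in the patch checks that error code first.
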
diff --git a/src/ray/raylet/local_object_manager.cc b/src/ray/raylet/local_object_manager.cc
index de0e1524e..4bd73fb96 100644
--- a/src/ray/raylet/local_object_manager.cc
+++ b/src/ray/raylet/local_object_manager.cc
@@ -97,6 +97,13 @@ void LocalObjectManager::FlushFreeObjects() {
 last_free_objects_at_ms_ = current_time_ms();
 }

+void LocalObjectManager::FlushFreeObjectsIfNeeded(int64_t now_ms) {
+ if (free_objects_period_ms_ > 0 &&
+ static_cast<int64_t>(now_ms - last_free_objects_at_ms_) > free_objects_period_ms_) {
+ FlushFreeObjects();
+ }
+}
+
 void LocalObjectManager::SpillObjectUptoMaxThroughput() {
 if (RayConfig::instance().object_spilling_config().empty() ||
 !RayConfig::instance().automatic_object_spilling_enabled()) {
diff --git a/src/ray/raylet/local_object_manager.h b/src/ray/raylet/local_object_manager.h
index a17a17f4c..8599c10a1 100644
--- a/src/ray/raylet/local_object_manager.h
+++ b/src/ray/raylet/local_object_manager.h
@@ -113,9 +113,8 @@ class LocalObjectManager {
 const NodeID &node_id,
 std::function<void(const ray::Status &)> callback);

- /// Clear any freed objects. This will trigger the callback for freed
- /// objects.
- void FlushFreeObjects();
+ /// Try to clear any objects that have been freed.
+ void FlushFreeObjectsIfNeeded(int64_t now_ms);

 /// Judge if objects are deletable from pending_delete_queue and delete them if
 /// necessary.
@@ -168,6 +167,10 @@ class LocalObjectManager {
 /// Release an object that has been freed by its owner.
 void ReleaseFreedObject(const ObjectID &object_id);

+ /// Clear any freed objects. This will trigger the callback for freed
+ /// objects.
+ void FlushFreeObjects();
+
 // A callback for unpinning spilled objects. This should be invoked after the object
 // has been spilled and after the object directory has been sent the spilled URL.
 void UnpinSpilledObjectCallback(const ObjectID &object_id,
diff --git a/src/ray/raylet/main.cc b/src/ray/raylet/main.cc
index f6e892317..cc980a460 100644
--- a/src/ray/raylet/main.cc
+++ b/src/ray/raylet/main.cc
@@ -180,8 +180,12 @@ int main(int argc, char *argv[]) {
 RAY_LOG(DEBUG) << "Agent command is empty.";
 }

+ node_manager_config.heartbeat_period_ms =
+ RayConfig::instance().raylet_heartbeat_period_milliseconds();
 node_manager_config.report_resources_period_ms =
 RayConfig::instance().raylet_report_resources_period_milliseconds();
+ node_manager_config.debug_dump_period_ms =
+ RayConfig::instance().debug_dump_period_milliseconds();
 node_manager_config.record_metrics_period_ms =
 RayConfig::instance().metrics_report_interval_ms() / 2;
 node_manager_config.fair_queueing_enabled =
diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc
index a7397601d..606493eeb 100644
--- a/src/ray/raylet/node_manager.cc
+++ b/src/ray/raylet/node_manager.cc
@@ -28,7 +28,6 @@
 #include "ray/stats/stats.h"
 #include "ray/util/asio_util.h"
 #include "ray/util/sample.h"
-#include "ray/util/util.h"

 namespace {

@@ -124,10 +123,15 @@ NodeManager::NodeManager(boost::asio::io_service &io_service, const NodeID &self
 object_manager_(object_manager),
 gcs_client_(gcs_client),
 object_directory_(object_directory),
- periodical_runner_(io_service),
- report_resources_period_ms_(config.report_resources_period_ms),
+ heartbeat_timer_(io_service),
+ heartbeat_period_(std::chrono::milliseconds(config.heartbeat_period_ms)),
+ report_resources_timer_(io_service),
+ report_resources_period_(
+ std::chrono::milliseconds(config.report_resources_period_ms)),
+ debug_dump_period_(config.debug_dump_period_ms),
 fair_queueing_enabled_(config.fair_queueing_enabled),
 temp_dir_(config.temp_dir),
+ object_manager_profile_timer_(io_service),
 initial_config_(config),
 local_available_resources_(config.resource_config),
 worker_pool_(io_service, config.num_workers_soft_limit,
@@ -179,9 +183,9 @@ NodeManager::NodeManager(boost::asio::io_service &io_service, const NodeID &self
 last_local_gc_ns_(absl::GetCurrentTimeNanos()),
 local_gc_interval_ns_(RayConfig::instance().local_gc_interval_s() * 1e9),
 local_gc_min_interval_ns_(RayConfig::instance().local_gc_min_interval_s() * 1e9),
- record_metrics_period_ms_(config.record_metrics_period_ms) {
+ record_metrics_period_(config.record_metrics_period_ms) {
 RAY_LOG(INFO) << "Initializing NodeManager with ID " << self_node_id_;
- RAY_CHECK(RayConfig::instance().raylet_heartbeat_period_milliseconds() > 0);
+ RAY_CHECK(heartbeat_period_.count() > 0);
 // Initialize the resource map with own cluster resource configuration.
 cluster_resource_map_.emplace(self_node_id_,
 SchedulingResources(config.resource_config));
@@ -327,29 +331,12 @@ ray::Status NodeManager::RegisterGcs() {

 // Start sending heartbeats to the GCS.
 last_heartbeat_at_ms_ = current_time_ms();
- periodical_runner_.RunFnPeriodically(
- [this] { Heartbeat(); },
- RayConfig::instance().raylet_heartbeat_period_milliseconds());
- periodical_runner_.RunFnPeriodically(
- [this] {
- DumpDebugState();
- WarnResourceDeadlock();
- },
- RayConfig::instance().debug_dump_period_milliseconds());
- periodical_runner_.RunFnPeriodically([this] { RecordMetrics(); },
- record_metrics_period_ms_);
- if (RayConfig::instance().free_objects_period_milliseconds() > 0) {
- periodical_runner_.RunFnPeriodically(
- [this] { local_object_manager_.FlushFreeObjects(); },
- RayConfig::instance().free_objects_period_milliseconds());
- }
- periodical_runner_.RunFnPeriodically([this] { ReportResourceUsage(); },
- report_resources_period_ms_);
+ last_debug_dump_at_ms_ = current_time_ms();
+ Heartbeat();
+ ReportResourceUsage();
 // Start the timer that gets object manager profiling information and sends it
 // to the GCS.
- periodical_runner_.RunFnPeriodically(
- [this] { GetObjectManagerProfileInfo(); },
- RayConfig::instance().raylet_heartbeat_period_milliseconds());
+ GetObjectManagerProfileInfo();

 return ray::Status::OK();
 }

@@ -438,6 +425,30 @@ void NodeManager::Heartbeat() {
 RAY_LOG(FATAL) << "This node has been marked as dead.";
 }
 }));
+
+ if (debug_dump_period_ > 0 &&
+ static_cast<int64_t>(now_ms - last_debug_dump_at_ms_) > debug_dump_period_) {
+ DumpDebugState();
+ WarnResourceDeadlock();
+ last_debug_dump_at_ms_ = now_ms;
+ }
+
+ if (record_metrics_period_ > 0 &&
+ static_cast<int64_t>(now_ms - metrics_last_recorded_time_ms_) >
+ record_metrics_period_) {
+ RecordMetrics();
+ metrics_last_recorded_time_ms_ = now_ms;
+ }
+
+ // Evict all copies of freed objects from the cluster.
+ local_object_manager_.FlushFreeObjectsIfNeeded(now_ms);
+
+ // Reset the timer.
+ heartbeat_timer_.expires_from_now(heartbeat_period_);
+ heartbeat_timer_.async_wait([this](const boost::system::error_code &error) {
+ RAY_CHECK(!error);
+ Heartbeat();
+ });
 }

 void NodeManager::ReportResourceUsage() {
@@ -473,6 +484,13 @@ void NodeManager::ReportResourceUsage() {
 RAY_CHECK_OK(gcs_client_->NodeResources().AsyncReportResourceUsage(resources_data,
 /*done*/ nullptr));
 }
+
+ // Reset the timer.
+ report_resources_timer_.expires_from_now(report_resources_period_);
+ report_resources_timer_.async_wait([this](const boost::system::error_code &error) {
+ RAY_CHECK(!error);
+ ReportResourceUsage();
+ });
 }

 void NodeManager::DoLocalGC() {
@@ -648,6 +666,14 @@ void NodeManager::GetObjectManagerProfileInfo() {
 RAY_CHECK_OK(gcs_client_->Stats().AsyncAddProfileData(profile_info, nullptr));
 }

+ // Reset the timer.
+ object_manager_profile_timer_.expires_from_now(heartbeat_period_);
+ object_manager_profile_timer_.async_wait(
+ [this](const boost::system::error_code &error) {
+ RAY_CHECK(!error);
+ GetObjectManagerProfileInfo();
+ });
+
 int64_t interval = current_time_ms() - start_time_ms;
 if (interval > RayConfig::instance().handler_warning_timeout_ms()) {
 RAY_LOG(WARNING) << "GetObjectManagerProfileInfo handler took " << interval << " ms.";
diff --git a/src/ray/raylet/node_manager.h b/src/ray/raylet/node_manager.h
index 3f2476fa2..c3d066786 100644
--- a/src/ray/raylet/node_manager.h
+++ b/src/ray/raylet/node_manager.h
@@ -14,6 +14,8 @@

 #pragma once

+#include <chrono>
+
 // clang-format off
 #include "ray/rpc/grpc_client.h"
 #include "ray/rpc/node_manager/node_manager_server.h"
@@ -83,8 +85,12 @@ struct NodeManagerConfig {
 WorkerCommandMap worker_commands;
 /// The command used to start the agent.
 std::string agent_command;
+ /// The time between heartbeats in milliseconds.
+ uint64_t heartbeat_period_ms;
 /// The time between resource reports in milliseconds.
 uint64_t report_resources_period_ms;
+ /// The time between debug dumps in milliseconds, or -1 to disable.
+ uint64_t debug_dump_period_ms;
 /// Whether to enable fair queueing between task classes in raylet.
 bool fair_queueing_enabled;
 /// Whether to enable automatic object deletion for object spilling.
@@ -97,7 +103,7 @@ struct NodeManagerConfig {
 std::string session_dir;
 /// The raylet config list of this node.
 std::string raylet_config;
- // The time between record metrics in milliseconds, or 0 to disable.
+ // The time between record metrics in milliseconds, or -1 to disable.
 uint64_t record_metrics_period_ms;
 // The number of max io workers.
 int max_io_workers;
@@ -781,10 +787,16 @@ class NodeManager : public rpc::NodeManagerServiceHandler,
 std::shared_ptr<gcs::GcsClient> gcs_client_;
 /// The object table. This is shared with the object manager.
 std::shared_ptr<ObjectDirectoryInterface> object_directory_;
- /// The runner to run function periodically.
- PeriodicalRunner periodical_runner_;
+ /// The timer used to send heartbeats.
+ boost::asio::steady_timer heartbeat_timer_;
+ /// The period used for the heartbeat timer.
+ std::chrono::milliseconds heartbeat_period_;
+ /// The timer used to report resources.
+ boost::asio::steady_timer report_resources_timer_;
 /// The period used for the resources report timer.
- uint64_t report_resources_period_ms_;
+ std::chrono::milliseconds report_resources_period_;
+ /// The period between debug state dumps.
+ int64_t debug_dump_period_;
 /// Whether to enable fair queueing between task classes in raylet.
 bool fair_queueing_enabled_;
 /// Incremented each time we encounter a potential resource deadlock condition.
@@ -794,9 +806,14 @@ class NodeManager : public rpc::NodeManagerServiceHandler,
 bool recorded_metrics_ = false;
 /// The path to the ray temp dir.
 std::string temp_dir_;
+ /// The timer used to get profiling information from the object manager and
+ /// push it to the GCS.
+ boost::asio::steady_timer object_manager_profile_timer_;
 /// The time that the last heartbeat was sent at. Used to make sure we are
 /// keeping up with heartbeats.
 uint64_t last_heartbeat_at_ms_;
+ /// The time that the last debug string was logged to the console.
+ uint64_t last_debug_dump_at_ms_;
 /// The number of heartbeats that we should wait before sending the
 /// next load report.
 uint8_t num_heartbeats_before_load_report_;
@@ -898,7 +915,7 @@ class NodeManager : public rpc::NodeManagerServiceHandler,
 /// Fields that are used to report metrics.
 /// The period between metrics recordings.
- uint64_t record_metrics_period_ms_;
+ int64_t record_metrics_period_;
 /// Last time metrics are recorded.
 uint64_t metrics_last_recorded_time_ms_;
diff --git a/src/ray/raylet/worker_pool.cc b/src/ray/raylet/worker_pool.cc
index bbe89ecfe..8000f37d6 100644
--- a/src/ray/raylet/worker_pool.cc
+++ b/src/ray/raylet/worker_pool.cc
@@ -70,7 +70,7 @@ WorkerPool::WorkerPool(boost::asio::io_service &io_service, int num_workers_soft
 first_job_driver_wait_num_python_workers_(std::min(
 num_initial_python_workers_for_first_job, maximum_startup_concurrency)),
 num_initial_python_workers_for_first_job_(num_initial_python_workers_for_first_job),
- periodical_runner_(io_service) {
+ kill_idle_workers_timer_(io_service) {
 RAY_CHECK(maximum_startup_concurrency > 0);
#ifndef _WIN32
 // Ignore SIGCHLD signals. If we don't do this, then worker processes will
@@ -102,11 +102,7 @@ WorkerPool::WorkerPool(boost::asio::io_service &io_service, int num_workers_soft
 free_ports_->push(port);
 }
 }
- if (RayConfig::instance().kill_idle_workers_interval_ms() > 0) {
- periodical_runner_.RunFnPeriodically(
- [this] { TryKillingIdleWorkers(); },
- RayConfig::instance().kill_idle_workers_interval_ms());
- }
+ ScheduleIdleWorkerKilling();
 }

 WorkerPool::~WorkerPool() {
@@ -619,6 +615,20 @@ void WorkerPool::PushWorker(const std::shared_ptr<WorkerInterface> &worker) {
 }
 }

+void WorkerPool::ScheduleIdleWorkerKilling() {
+ if (RayConfig::instance().kill_idle_workers_interval_ms() > 0) {
+ kill_idle_workers_timer_.expires_from_now(boost::posix_time::milliseconds(
+ RayConfig::instance().kill_idle_workers_interval_ms()));
+ kill_idle_workers_timer_.async_wait([this](const boost::system::error_code &error) {
+ if (error == boost::asio::error::operation_aborted) {
+ return;
+ }
+ TryKillingIdleWorkers();
+ ScheduleIdleWorkerKilling();
+ });
+ }
+}
+
 void WorkerPool::TryKillingIdleWorkers() {
 RAY_CHECK(idle_of_all_languages_.size() == idle_of_all_languages_map_.size());
diff --git a/src/ray/raylet/worker_pool.h b/src/ray/raylet/worker_pool.h
index 7a101d55d..aa459354a 100644
--- a/src/ray/raylet/worker_pool.h
+++ b/src/ray/raylet/worker_pool.h
@@ -28,7 +28,6 @@
 #include "ray/common/task/task_common.h"
 #include "ray/gcs/gcs_client.h"
 #include "ray/raylet/worker.h"
-#include "ray/util/periodical_runner.h"

 namespace ray {

@@ -438,6 +437,9 @@ class WorkerPool : public WorkerPoolInterface, public IOWorkerPoolInterface {
 /// reasonable size.
 void TryKillingIdleWorkers();

+ /// Schedule the periodic killing of idle workers.
+ void ScheduleIdleWorkerKilling();
+
 /// Get all workers of the given process.
 ///
 /// \param process The process of workers.
@@ -510,8 +512,8 @@ class WorkerPool : public WorkerPoolInterface, public IOWorkerPoolInterface {
 std::unordered_map<std::shared_ptr<WorkerInterface>, int64_t>
 idle_of_all_languages_map_;

- /// The runner to run function periodically.
- PeriodicalRunner periodical_runner_;
+ /// The timer to trigger idle worker killing.
+ boost::asio::deadline_timer kill_idle_workers_timer_;
 };

 } // namespace raylet
diff --git a/src/ray/util/periodical_runner.cc b/src/ray/util/periodical_runner.cc
deleted file mode 100644
index 9ffbfb5f4..000000000
--- a/src/ray/util/periodical_runner.cc
+++ /dev/null
@@ -1,55 +0,0 @@
-// Copyright 2017 The Ray Authors.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-#include "ray/util/periodical_runner.h"
-
-#include "ray/util/logging.h"
-
-namespace ray {
-
-PeriodicalRunner::PeriodicalRunner(boost::asio::io_service &io_service)
- : io_service_(io_service) {}
-
-PeriodicalRunner::~PeriodicalRunner() {
- for (const auto &timer : timers_) {
- timer->cancel();
- }
- timers_.clear();
-}
-
-void PeriodicalRunner::RunFnPeriodically(std::function<void()> fn, uint64_t period_ms) {
- if (period_ms > 0) {
- auto timer = std::make_shared<boost::asio::deadline_timer>(io_service_);
- timers_.push_back(timer);
- DoRunFnPeriodically(fn, boost::posix_time::milliseconds(period_ms), *timer);
- }
-}
-
-void PeriodicalRunner::DoRunFnPeriodically(std::function<void()> fn,
- boost::posix_time::milliseconds period,
- boost::asio::deadline_timer &timer) {
- fn();
- timer.expires_from_now(period);
- timer.async_wait([this, fn, period, &timer](const boost::system::error_code &error) {
- if (error == boost::asio::error::operation_aborted) {
- // `operation_aborted` is set when `timer` is canceled or destroyed.
- // The Monitor lifetime may be short than the object who use it. (e.g. gcs_server)
- return;
- }
- RAY_CHECK(!error) << error.message();
- DoRunFnPeriodically(fn, period, timer);
- });
-}
-
-} // namespace ray
diff --git a/src/ray/util/periodical_runner.h b/src/ray/util/periodical_runner.h
deleted file mode 100644
index cb85fd419..000000000
--- a/src/ray/util/periodical_runner.h
+++ /dev/null
@@ -1,44 +0,0 @@
-// Copyright 2017 The Ray Authors.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-#pragma once
-
-#include <boost/asio.hpp>
-#include <functional>
-
-namespace ray {
-
-/// \class PeriodicalRunner
-/// A periodical runner attached with an io_context.
-/// It can run functions with specified period. Each function is triggered by its timer.
-/// To run a function, call `RunFnPeriodically(fn, period_ms)`.
-/// All registered functions will stop running once this object is destructed.
-class PeriodicalRunner {
- public:
-  PeriodicalRunner(boost::asio::io_service &io_service);
-
-  ~PeriodicalRunner();
-
-  void RunFnPeriodically(std::function<void()> fn, uint64_t period_ms);
-
- private:
-  void DoRunFnPeriodically(std::function<void()> fn,
-                           boost::posix_time::milliseconds period,
-                           boost::asio::deadline_timer &timer);
-
-  boost::asio::io_service &io_service_;
-  std::vector<std::shared_ptr<boost::asio::deadline_timer>> timers_;
-};
-
-} // namespace ray
\ No newline at end of file
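
The worker-side timers above take a slightly different tack: `CoreWorker` and `Profiler` re-arm with `expires_at(expiry() + period)` instead of `expires_from_now(period)`, so each deadline is measured from the previous one and handler latency does not accumulate as drift. Below is a minimal sketch of that drift-avoiding variant; it is not part of the patch, and `ScheduleFlush` and the one-second period are illustrative only:

#include <boost/asio.hpp>
#include <iostream>

// Drift-avoiding periodic timer (hypothetical example, not Ray code): the next
// deadline is computed from the previous expiry rather than from "now".
void ScheduleFlush(boost::asio::steady_timer &timer) {
  timer.async_wait([&timer](const boost::system::error_code &error) {
    if (error == boost::asio::error::operation_aborted) {
      return;  // The timer was canceled or destroyed.
    }
    std::cout << "flush" << std::endl;
    // Advance from the previous expiration time, not from the current time.
    timer.expires_at(timer.expiry() + boost::asio::chrono::seconds(1));
    ScheduleFlush(timer);
  });
}

int main() {
  boost::asio::io_service io_service;
  boost::asio::steady_timer timer(io_service, boost::asio::chrono::seconds(1));
  ScheduleFlush(timer);
  io_service.run();
}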