diff --git a/src/ray/raylet/main.cc b/src/ray/raylet/main.cc index f6e892317..0e125c9c0 100644 --- a/src/ray/raylet/main.cc +++ b/src/ray/raylet/main.cc @@ -110,7 +110,7 @@ int main(int argc, char *argv[]) { gcs_client = std::make_shared(client_options); RAY_CHECK_OK(gcs_client->Connect(main_service)); - std::unique_ptr server(nullptr); + std::unique_ptr raylet(nullptr); RAY_CHECK_OK(gcs_client->Nodes().AsyncGetInternalConfig( [&](::ray::Status status, @@ -229,22 +229,22 @@ int main(int argc, char *argv[]) { ray::stats::Init(global_tags, metrics_agent_port); // Initialize the node manager. - server.reset(new ray::raylet::Raylet( + raylet.reset(new ray::raylet::Raylet( main_service, raylet_socket_name, node_ip_address, redis_address, redis_port, redis_password, node_manager_config, object_manager_config, gcs_client, metrics_export_port)); - server->Start(); + raylet->Start(); })); // Destroy the Raylet on a SIGTERM. The pointer to main_service is // guaranteed to be valid since this function will run the event loop // instead of returning immediately. // We should stop the service and remove the local socket file. - auto handler = [&main_service, &raylet_socket_name, &server, &gcs_client]( + auto handler = [&main_service, &raylet_socket_name, &raylet, &gcs_client]( const boost::system::error_code &error, int signal_number) { RAY_LOG(INFO) << "Raylet received SIGTERM, shutting down..."; - server->Stop(); + raylet->Stop(); gcs_client->Disconnect(); ray::stats::Shutdown(); main_service.stop(); diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index 32b5f8612..129e5315f 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -114,6 +114,57 @@ std::string WorkerOwnerString(std::shared_ptr &worker) { return buffer.str(); } +HeartbeatSender::HeartbeatSender(NodeID self_node_id, + std::shared_ptr gcs_client) + : self_node_id_(self_node_id), gcs_client_(gcs_client) { + // Init heartbeat thread and run its io service. + heartbeat_thread_.reset(new std::thread([this] { + SetThreadName("heartbeat"); + /// The asio work to keep io_service_ alive. + boost::asio::io_service::work io_service_work_(heartbeat_io_service_); + heartbeat_io_service_.run(); + })); + heartbeat_runner_.reset(new PeriodicalRunner(heartbeat_io_service_)); + + // Start sending heartbeats to the GCS. + last_heartbeat_at_ms_ = current_time_ms(); + heartbeat_runner_->RunFnPeriodically( + [this] { Heartbeat(); }, + RayConfig::instance().raylet_heartbeat_period_milliseconds()); +} + +HeartbeatSender::~HeartbeatSender() { + heartbeat_runner_.reset(); + heartbeat_io_service_.stop(); + if (heartbeat_thread_->joinable()) { + heartbeat_thread_->join(); + } + heartbeat_thread_.reset(); +} + +void HeartbeatSender::Heartbeat() { + uint64_t now_ms = current_time_ms(); + uint64_t interval = now_ms - last_heartbeat_at_ms_; + if (interval > RayConfig::instance().num_heartbeats_warning() * + RayConfig::instance().raylet_heartbeat_period_milliseconds()) { + RAY_LOG(WARNING) + << "Last heartbeat was sent " << interval + << " ms ago. There might be resource pressure on this node. If heartbeat keeps " + "lagging, this node can be marked as dead mistakenly."; + } + last_heartbeat_at_ms_ = now_ms; + stats::HeartbeatReportMs.Record(interval); + + auto heartbeat_data = std::make_shared(); + heartbeat_data->set_node_id(self_node_id_.Binary()); + RAY_CHECK_OK( + gcs_client_->Nodes().AsyncReportHeartbeat(heartbeat_data, [](Status status) { + if (status.IsDisconnected()) { + RAY_LOG(FATAL) << "This node has beem marked as dead."; + } + })); +} + NodeManager::NodeManager(boost::asio::io_service &io_service, const NodeID &self_node_id, const NodeManagerConfig &config, ObjectManager &object_manager, std::shared_ptr gcs_client, @@ -175,7 +226,6 @@ NodeManager::NodeManager(boost::asio::io_service &io_service, const NodeID &self SendSpilledObjectRestorationRequestToRemoteNode(object_id, spilled_url, node_id); }), - report_worker_backlog_(RayConfig::instance().report_worker_backlog()), last_local_gc_ns_(absl::GetCurrentTimeNanos()), local_gc_interval_ns_(RayConfig::instance().local_gc_interval_s() * 1e9), local_gc_min_interval_ns_(RayConfig::instance().local_gc_min_interval_s() * 1e9), @@ -268,6 +318,8 @@ NodeManager::NodeManager(boost::asio::io_service &io_service, const NodeID &self } ray::Status NodeManager::RegisterGcs() { + // Start sending heartbeat here to ensure it happening after raylet being registered. + heartbeat_sender_.reset(new HeartbeatSender(self_node_id_, gcs_client_)); auto on_node_change = [this](const NodeID &node_id, const GcsNodeInfo &data) { if (data.state() == GcsNodeInfo::ALIVE) { NodeAdded(data); @@ -335,11 +387,6 @@ ray::Status NodeManager::RegisterGcs() { RAY_RETURN_NOT_OK( gcs_client_->Jobs().AsyncSubscribeAll(job_subscribe_handler, nullptr)); - // Start sending heartbeats to the GCS. - last_heartbeat_at_ms_ = current_time_ms(); - periodical_runner_.RunFnPeriodically( - [this] { Heartbeat(); }, - RayConfig::instance().raylet_heartbeat_period_milliseconds()); periodical_runner_.RunFnPeriodically( [this] { DumpDebugState(); @@ -427,29 +474,6 @@ void NodeManager::HandleJobFinished(const JobID &job_id, const JobTableData &job } } -void NodeManager::Heartbeat() { - uint64_t now_ms = current_time_ms(); - uint64_t interval = now_ms - last_heartbeat_at_ms_; - if (interval > RayConfig::instance().num_heartbeats_warning() * - RayConfig::instance().raylet_heartbeat_period_milliseconds()) { - RAY_LOG(WARNING) - << "Last heartbeat was sent " << interval - << " ms ago. There might be resource pressure on this node. If heartbeat keeps " - "lagging, this node can be marked as dead mistakenly."; - } - last_heartbeat_at_ms_ = now_ms; - stats::HeartbeatReportMs.Record(interval); - - auto heartbeat_data = std::make_shared(); - heartbeat_data->set_node_id(self_node_id_.Binary()); - RAY_CHECK_OK( - gcs_client_->Nodes().AsyncReportHeartbeat(heartbeat_data, [](Status status) { - if (status.IsDisconnected()) { - RAY_LOG(FATAL) << "This node has beem marked as dead."; - } - })); -} - void NodeManager::ReportResourceUsage() { auto resources_data = std::make_shared(); resources_data->set_node_id(self_node_id_.Binary()); @@ -1448,7 +1472,7 @@ void NodeManager::HandleRequestWorkerLease(const rpc::RequestWorkerLeaseRequest rpc::Task task_message; task_message.mutable_task_spec()->CopyFrom(request.resource_spec()); auto backlog_size = -1; - if (report_worker_backlog_) { + if (RayConfig::instance().report_worker_backlog()) { backlog_size = request.backlog_size(); } Task task(task_message, backlog_size); @@ -2669,6 +2693,12 @@ void NodeManager::TriggerGlobalGC() { should_local_gc_ = true; } +void NodeManager::Stop() { + if (heartbeat_sender_) { + heartbeat_sender_.reset(); + } +} + void NodeManager::RecordMetrics() { recorded_metrics_ = true; if (stats::StatsConfig::instance().IsStatsDisabled()) { diff --git a/src/ray/raylet/node_manager.h b/src/ray/raylet/node_manager.h index 3f2476fa2..7d515c975 100644 --- a/src/ray/raylet/node_manager.h +++ b/src/ray/raylet/node_manager.h @@ -105,6 +105,35 @@ struct NodeManagerConfig { int64_t min_spilling_size; }; +class HeartbeatSender { + public: + /// Create a heartbeat sender. + /// + /// \param self_node_id ID of this node. + /// \param gcs_client GCS client to send heartbeat. + HeartbeatSender(NodeID self_node_id, std::shared_ptr gcs_client); + + ~HeartbeatSender(); + + private: + /// Send heartbeats to the GCS. + void Heartbeat(); + + /// ID of this node. + NodeID self_node_id_; + /// A client connection to the GCS. + std::shared_ptr gcs_client_; + /// The io service used in heartbeat loop in case of it being + /// blocked by main thread. + boost::asio::io_service heartbeat_io_service_; + /// Heartbeat thread, using with heartbeat_io_service_. + std::unique_ptr heartbeat_thread_; + std::unique_ptr heartbeat_runner_; + /// The time that the last heartbeat was sent at. Used to make sure we are + /// keeping up with heartbeats. + uint64_t last_heartbeat_at_ms_; +}; + class NodeManager : public rpc::NodeManagerServiceHandler, public ClusterTaskManagerInterface { public: @@ -160,6 +189,9 @@ class NodeManager : public rpc::NodeManagerServiceHandler, /// object ids. void TriggerGlobalGC(); + /// Stop this node manager. + void Stop(); + private: /// Methods for handling nodes. @@ -198,9 +230,6 @@ class NodeManager : public rpc::NodeManagerServiceHandler, /// \return Void. void TryLocalInfeasibleTaskScheduling(); - /// Send heartbeats to the GCS. - void Heartbeat(); - /// Report resource usage to the GCS. void ReportResourceUsage(); @@ -684,8 +713,8 @@ class NodeManager : public rpc::NodeManagerServiceHandler, /// \param readyIds: The tasks which are now ready to be dispatched. void TasksUnblocked(const std::vector &ready_ids) override; - /// Populate the relevant parts of the heartbeat table. This is intended for - /// sending raylet <-> gcs heartbeats. In particular, this should fill in + /// Populate the relevant parts of the resource usage. This is intended for + /// sending resource usage from raylet to gcs. In particular, this should fill in /// resource_load and resource_load_by_shape. /// /// \param Output parameter. `resource_load` and `resource_load_by_shape` are the only @@ -772,6 +801,8 @@ class NodeManager : public rpc::NodeManagerServiceHandler, /// ID of this node. NodeID self_node_id_; boost::asio::io_service &io_service_; + /// Class to send heartbeat to GCS. + std::unique_ptr heartbeat_sender_; ObjectManager &object_manager_; /// A Plasma object store client. This is used for creating new objects in /// the object store (e.g., for actor tasks that can't be run because the @@ -794,12 +825,6 @@ class NodeManager : public rpc::NodeManagerServiceHandler, bool recorded_metrics_ = false; /// The path to the ray temp dir. std::string temp_dir_; - /// The time that the last heartbeat was sent at. Used to make sure we are - /// keeping up with heartbeats. - uint64_t last_heartbeat_at_ms_; - /// The number of heartbeats that we should wait before sending the - /// next load report. - uint8_t num_heartbeats_before_load_report_; /// Initial node manager configuration. const NodeManagerConfig initial_config_; /// The resources (and specific resource IDs) that are currently available. @@ -853,14 +878,11 @@ class NodeManager : public rpc::NodeManagerServiceHandler, /// lease on. absl::flat_hash_map> leased_workers_by_owner_; - /// Whether to report the worker's backlog size in the GCS heartbeat. - const bool report_worker_backlog_; - - /// Whether to trigger global GC in the next heartbeat. This will broadcast + /// Whether to trigger global GC in the next resource usage report. This will broadcast /// a global GC message to all raylets except for this one. bool should_global_gc_ = false; - /// Whether to trigger local GC in the next heartbeat. This will trigger gc + /// Whether to trigger local GC in the next resource usage report. This will trigger gc /// on all local workers of this raylet. bool should_local_gc_ = false; diff --git a/src/ray/raylet/raylet.cc b/src/ray/raylet/raylet.cc index 4d9514e62..687b68409 100644 --- a/src/ray/raylet/raylet.cc +++ b/src/ray/raylet/raylet.cc @@ -124,6 +124,7 @@ void Raylet::Start() { void Raylet::Stop() { object_manager_.Stop(); RAY_CHECK_OK(gcs_client_->Nodes().UnregisterSelf()); + node_manager_.Stop(); acceptor_.close(); }