[GCS]Cherry pick heartbeat function into another thread (#14301)

Tao Wang 2021-03-02 17:49:02 +08:00 committed by GitHub
parent 09fd38ede1
commit 2de01ee3b1
4 changed files with 104 additions and 51 deletions
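
This change moves the raylet-to-GCS heartbeat off the node manager's main event loop and into a new HeartbeatSender class that runs its own io_service on a dedicated thread, so heartbeats keep flowing even when the main thread is busy. Heartbeat sending now starts in NodeManager::RegisterGcs(), after the raylet has registered, and is torn down by the new NodeManager::Stop(), which Raylet::Stop() now calls. The old NodeManager::Heartbeat() and its bookkeeping fields are removed, the cached report_worker_backlog_ member is replaced by reading RayConfig::instance().report_worker_backlog() directly, and the raylet main() renames its local "server" pointer to "raylet".

Below is a minimal, self-contained sketch of the threading pattern introduced here (illustration only, not Ray code; the PeriodicTicker class and all names in it are invented): a dedicated thread runs a boost::asio::io_service kept alive by a work object, and a timer on that io_service invokes a callback periodically.

#include <boost/asio.hpp>

#include <chrono>
#include <functional>
#include <iostream>
#include <memory>
#include <thread>

class PeriodicTicker {
 public:
  PeriodicTicker(std::function<void()> fn, uint64_t period_ms)
      : fn_(std::move(fn)), period_ms_(period_ms), timer_(io_service_) {
    // Run the io_service on its own thread. The work object keeps run()
    // from returning while no timer is pending.
    thread_.reset(new std::thread([this] {
      boost::asio::io_service::work work(io_service_);
      io_service_.run();
    }));
    ScheduleNext();
  }

  ~PeriodicTicker() {
    // Stop the io_service and join the thread before members are destroyed.
    io_service_.stop();
    if (thread_->joinable()) {
      thread_->join();
    }
  }

 private:
  void ScheduleNext() {
    timer_.expires_from_now(boost::posix_time::milliseconds(period_ms_));
    timer_.async_wait([this](const boost::system::error_code &error) {
      if (!error) {
        fn_();
        ScheduleNext();
      }
    });
  }

  std::function<void()> fn_;
  uint64_t period_ms_;
  boost::asio::io_service io_service_;
  boost::asio::deadline_timer timer_;
  std::unique_ptr<std::thread> thread_;
};

int main() {
  // Print a line roughly every 100 ms for about half a second.
  PeriodicTicker ticker([] { std::cout << "heartbeat" << std::endl; }, 100);
  std::this_thread::sleep_for(std::chrono::milliseconds(550));
  return 0;
}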


@@ -110,7 +110,7 @@ int main(int argc, char *argv[]) {
gcs_client = std::make_shared<ray::gcs::ServiceBasedGcsClient>(client_options);
RAY_CHECK_OK(gcs_client->Connect(main_service));
std::unique_ptr<ray::raylet::Raylet> server(nullptr);
std::unique_ptr<ray::raylet::Raylet> raylet(nullptr);
RAY_CHECK_OK(gcs_client->Nodes().AsyncGetInternalConfig(
[&](::ray::Status status,
@@ -229,22 +229,22 @@ int main(int argc, char *argv[]) {
ray::stats::Init(global_tags, metrics_agent_port);
// Initialize the node manager.
server.reset(new ray::raylet::Raylet(
raylet.reset(new ray::raylet::Raylet(
main_service, raylet_socket_name, node_ip_address, redis_address, redis_port,
redis_password, node_manager_config, object_manager_config, gcs_client,
metrics_export_port));
server->Start();
raylet->Start();
}));
// Destroy the Raylet on a SIGTERM. The pointer to main_service is
// guaranteed to be valid since this function will run the event loop
// instead of returning immediately.
// We should stop the service and remove the local socket file.
auto handler = [&main_service, &raylet_socket_name, &server, &gcs_client](
auto handler = [&main_service, &raylet_socket_name, &raylet, &gcs_client](
const boost::system::error_code &error, int signal_number) {
RAY_LOG(INFO) << "Raylet received SIGTERM, shutting down...";
server->Stop();
raylet->Stop();
gcs_client->Disconnect();
ray::stats::Shutdown();
main_service.stop();


@@ -114,6 +114,57 @@ std::string WorkerOwnerString(std::shared_ptr<WorkerInterface> &worker) {
return buffer.str();
}
HeartbeatSender::HeartbeatSender(NodeID self_node_id,
std::shared_ptr<gcs::GcsClient> gcs_client)
: self_node_id_(self_node_id), gcs_client_(gcs_client) {
// Init heartbeat thread and run its io service.
heartbeat_thread_.reset(new std::thread([this] {
SetThreadName("heartbeat");
/// The asio work object that keeps heartbeat_io_service_ alive.
boost::asio::io_service::work io_service_work_(heartbeat_io_service_);
heartbeat_io_service_.run();
}));
heartbeat_runner_.reset(new PeriodicalRunner(heartbeat_io_service_));
// Start sending heartbeats to the GCS.
last_heartbeat_at_ms_ = current_time_ms();
heartbeat_runner_->RunFnPeriodically(
[this] { Heartbeat(); },
RayConfig::instance().raylet_heartbeat_period_milliseconds());
}
HeartbeatSender::~HeartbeatSender() {
heartbeat_runner_.reset();
heartbeat_io_service_.stop();
if (heartbeat_thread_->joinable()) {
heartbeat_thread_->join();
}
heartbeat_thread_.reset();
}
void HeartbeatSender::Heartbeat() {
uint64_t now_ms = current_time_ms();
uint64_t interval = now_ms - last_heartbeat_at_ms_;
if (interval > RayConfig::instance().num_heartbeats_warning() *
RayConfig::instance().raylet_heartbeat_period_milliseconds()) {
RAY_LOG(WARNING)
<< "Last heartbeat was sent " << interval
<< " ms ago. There might be resource pressure on this node. If heartbeat keeps "
"lagging, this node can be marked as dead mistakenly.";
}
last_heartbeat_at_ms_ = now_ms;
stats::HeartbeatReportMs.Record(interval);
auto heartbeat_data = std::make_shared<HeartbeatTableData>();
heartbeat_data->set_node_id(self_node_id_.Binary());
RAY_CHECK_OK(
gcs_client_->Nodes().AsyncReportHeartbeat(heartbeat_data, [](Status status) {
if (status.IsDisconnected()) {
RAY_LOG(FATAL) << "This node has been marked as dead.";
}
}));
}
NodeManager::NodeManager(boost::asio::io_service &io_service, const NodeID &self_node_id,
const NodeManagerConfig &config, ObjectManager &object_manager,
std::shared_ptr<gcs::GcsClient> gcs_client,
@@ -175,7 +226,6 @@ NodeManager::NodeManager(boost::asio::io_service &io_service, const NodeID &self
SendSpilledObjectRestorationRequestToRemoteNode(object_id, spilled_url,
node_id);
}),
report_worker_backlog_(RayConfig::instance().report_worker_backlog()),
last_local_gc_ns_(absl::GetCurrentTimeNanos()),
local_gc_interval_ns_(RayConfig::instance().local_gc_interval_s() * 1e9),
local_gc_min_interval_ns_(RayConfig::instance().local_gc_min_interval_s() * 1e9),
@@ -268,6 +318,8 @@ NodeManager::NodeManager(boost::asio::io_service &io_service, const NodeID &self
}
ray::Status NodeManager::RegisterGcs() {
// Start sending heartbeats here to ensure this happens after the raylet has been registered.
heartbeat_sender_.reset(new HeartbeatSender(self_node_id_, gcs_client_));
auto on_node_change = [this](const NodeID &node_id, const GcsNodeInfo &data) {
if (data.state() == GcsNodeInfo::ALIVE) {
NodeAdded(data);
@@ -335,11 +387,6 @@ ray::Status NodeManager::RegisterGcs() {
RAY_RETURN_NOT_OK(
gcs_client_->Jobs().AsyncSubscribeAll(job_subscribe_handler, nullptr));
// Start sending heartbeats to the GCS.
last_heartbeat_at_ms_ = current_time_ms();
periodical_runner_.RunFnPeriodically(
[this] { Heartbeat(); },
RayConfig::instance().raylet_heartbeat_period_milliseconds());
periodical_runner_.RunFnPeriodically(
[this] {
DumpDebugState();
@@ -427,29 +474,6 @@ void NodeManager::HandleJobFinished(const JobID &job_id, const JobTableData &job
}
}
void NodeManager::Heartbeat() {
uint64_t now_ms = current_time_ms();
uint64_t interval = now_ms - last_heartbeat_at_ms_;
if (interval > RayConfig::instance().num_heartbeats_warning() *
RayConfig::instance().raylet_heartbeat_period_milliseconds()) {
RAY_LOG(WARNING)
<< "Last heartbeat was sent " << interval
<< " ms ago. There might be resource pressure on this node. If heartbeat keeps "
"lagging, this node can be marked as dead mistakenly.";
}
last_heartbeat_at_ms_ = now_ms;
stats::HeartbeatReportMs.Record(interval);
auto heartbeat_data = std::make_shared<HeartbeatTableData>();
heartbeat_data->set_node_id(self_node_id_.Binary());
RAY_CHECK_OK(
gcs_client_->Nodes().AsyncReportHeartbeat(heartbeat_data, [](Status status) {
if (status.IsDisconnected()) {
RAY_LOG(FATAL) << "This node has been marked as dead.";
}
}));
}
void NodeManager::ReportResourceUsage() {
auto resources_data = std::make_shared<rpc::ResourcesData>();
resources_data->set_node_id(self_node_id_.Binary());
@@ -1448,7 +1472,7 @@ void NodeManager::HandleRequestWorkerLease(const rpc::RequestWorkerLeaseRequest
rpc::Task task_message;
task_message.mutable_task_spec()->CopyFrom(request.resource_spec());
auto backlog_size = -1;
if (report_worker_backlog_) {
if (RayConfig::instance().report_worker_backlog()) {
backlog_size = request.backlog_size();
}
Task task(task_message, backlog_size);
@@ -2669,6 +2693,12 @@ void NodeManager::TriggerGlobalGC() {
should_local_gc_ = true;
}
void NodeManager::Stop() {
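// Destroying the heartbeat sender stops its io service and joins the heartbeat thread.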
if (heartbeat_sender_) {
heartbeat_sender_.reset();
}
}
void NodeManager::RecordMetrics() {
recorded_metrics_ = true;
if (stats::StatsConfig::instance().IsStatsDisabled()) {


@@ -105,6 +105,35 @@ struct NodeManagerConfig {
int64_t min_spilling_size;
};
class HeartbeatSender {
public:
/// Create a heartbeat sender.
///
/// \param self_node_id ID of this node.
/// \param gcs_client GCS client to send heartbeat.
HeartbeatSender(NodeID self_node_id, std::shared_ptr<gcs::GcsClient> gcs_client);
~HeartbeatSender();
private:
/// Send heartbeats to the GCS.
void Heartbeat();
/// ID of this node.
NodeID self_node_id_;
/// A client connection to the GCS.
std::shared_ptr<gcs::GcsClient> gcs_client_;
/// The io service used by the heartbeat loop, so that heartbeats are not
/// blocked when the main thread is busy.
boost::asio::io_service heartbeat_io_service_;
/// The dedicated thread that runs heartbeat_io_service_.
std::unique_ptr<std::thread> heartbeat_thread_;
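/// Runner that schedules the periodic heartbeat on heartbeat_io_service_.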
std::unique_ptr<PeriodicalRunner> heartbeat_runner_;
/// The time that the last heartbeat was sent at. Used to make sure we are
/// keeping up with heartbeats.
uint64_t last_heartbeat_at_ms_;
};
class NodeManager : public rpc::NodeManagerServiceHandler,
public ClusterTaskManagerInterface {
public:
@@ -160,6 +189,9 @@ class NodeManager : public rpc::NodeManagerServiceHandler,
/// object ids.
void TriggerGlobalGC();
/// Stop this node manager.
void Stop();
private:
/// Methods for handling nodes.
@@ -198,9 +230,6 @@ class NodeManager : public rpc::NodeManagerServiceHandler,
/// \return Void.
void TryLocalInfeasibleTaskScheduling();
/// Send heartbeats to the GCS.
void Heartbeat();
/// Report resource usage to the GCS.
void ReportResourceUsage();
@@ -684,8 +713,8 @@ class NodeManager : public rpc::NodeManagerServiceHandler,
/// \param readyIds: The tasks which are now ready to be dispatched.
void TasksUnblocked(const std::vector<TaskID> &ready_ids) override;
/// Populate the relevant parts of the heartbeat table. This is intended for
/// sending raylet <-> gcs heartbeats. In particular, this should fill in
/// Populate the relevant parts of the resource usage. This is intended for
/// sending resource usage from raylet to gcs. In particular, this should fill in
/// resource_load and resource_load_by_shape.
///
/// \param Output parameter. `resource_load` and `resource_load_by_shape` are the only
@@ -772,6 +801,8 @@ class NodeManager : public rpc::NodeManagerServiceHandler,
/// ID of this node.
NodeID self_node_id_;
boost::asio::io_service &io_service_;
/// Class to send heartbeat to GCS.
std::unique_ptr<HeartbeatSender> heartbeat_sender_;
ObjectManager &object_manager_;
/// A Plasma object store client. This is used for creating new objects in
/// the object store (e.g., for actor tasks that can't be run because the
@@ -794,12 +825,6 @@ class NodeManager : public rpc::NodeManagerServiceHandler,
bool recorded_metrics_ = false;
/// The path to the ray temp dir.
std::string temp_dir_;
/// The time that the last heartbeat was sent at. Used to make sure we are
/// keeping up with heartbeats.
uint64_t last_heartbeat_at_ms_;
/// The number of heartbeats that we should wait before sending the
/// next load report.
uint8_t num_heartbeats_before_load_report_;
/// Initial node manager configuration.
const NodeManagerConfig initial_config_;
/// The resources (and specific resource IDs) that are currently available.
@@ -853,14 +878,11 @@ class NodeManager : public rpc::NodeManagerServiceHandler,
/// lease on.
absl::flat_hash_map<WorkerID, std::vector<WorkerID>> leased_workers_by_owner_;
/// Whether to report the worker's backlog size in the GCS heartbeat.
const bool report_worker_backlog_;
/// Whether to trigger global GC in the next heartbeat. This will broadcast
/// Whether to trigger global GC in the next resource usage report. This will broadcast
/// a global GC message to all raylets except for this one.
bool should_global_gc_ = false;
/// Whether to trigger local GC in the next heartbeat. This will trigger gc
/// Whether to trigger local GC in the next resource usage report. This will trigger gc
/// on all local workers of this raylet.
bool should_local_gc_ = false;


@@ -124,6 +124,7 @@ void Raylet::Start() {
void Raylet::Stop() {
object_manager_.Stop();
RAY_CHECK_OK(gcs_client_->Nodes().UnregisterSelf());
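// Stop the node manager, which tears down the heartbeat sender.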
node_manager_.Stop();
acceptor_.close();
}