Revert "[Core]Pull off timers out of heartbeat in raylet (#13963)" (#14319)

Richard Liaw 2021-02-24 19:44:31 -08:00 committed by GitHub
parent be28e8fae4
commit 80657e5dfe
23 changed files with 271 additions and 203 deletions

View file

@@ -48,7 +48,6 @@ def test_gcs_server_restart(ray_start_regular):
assert result == 9
@pytest.mark.skipif(sys.platform == "win32", reason="Fails on Windows.")
@pytest.mark.parametrize(
"ray_start_regular", [
generate_system_config_map(

View file

@@ -35,7 +35,7 @@ RAY_CONFIG(int64_t, ray_cookie, 0x5241590000000000)
RAY_CONFIG(int64_t, handler_warning_timeout_ms, 1000)
/// The duration between heartbeats sent by the raylets.
RAY_CONFIG(uint64_t, raylet_heartbeat_period_milliseconds, 100)
RAY_CONFIG(int64_t, raylet_heartbeat_period_milliseconds, 100)
/// If a component has not sent a heartbeat in the last num_heartbeats_timeout
/// heartbeat intervals, the raylet monitor process will report
/// it as dead to the db_client table.
@@ -46,10 +46,10 @@ RAY_CONFIG(int64_t, num_heartbeats_timeout, 300)
RAY_CONFIG(uint64_t, num_heartbeats_warning, 5)
/// The duration between reporting resources sent by the raylets.
RAY_CONFIG(uint64_t, raylet_report_resources_period_milliseconds, 100)
RAY_CONFIG(int64_t, raylet_report_resources_period_milliseconds, 100)
/// The duration between dumping debug info to logs, or 0 to disable.
RAY_CONFIG(uint64_t, debug_dump_period_milliseconds, 10000)
/// The duration between dumping debug info to logs, or -1 to disable.
RAY_CONFIG(int64_t, debug_dump_period_milliseconds, 10000)
/// Whether to enable fair queueing between task classes in raylet. When
/// fair queueing is enabled, the raylet will try to balance the number
@@ -136,7 +136,7 @@ RAY_CONFIG(int64_t, worker_lease_timeout_milliseconds, 500)
/// The interval at which the workers will check if their raylet has gone down.
/// When this happens, they will kill themselves.
RAY_CONFIG(uint64_t, raylet_death_check_interval_milliseconds, 1000)
RAY_CONFIG(int64_t, raylet_death_check_interval_milliseconds, 1000)
/// These are used by the worker to set timeouts and to batch requests when
/// getting objects.
@@ -215,7 +215,7 @@ RAY_CONFIG(int64_t, gcs_service_connect_retries, 50)
RAY_CONFIG(int64_t, internal_gcs_service_connect_wait_milliseconds, 100)
/// The interval at which the gcs server will check if redis has gone down.
/// When this happens, gcs server will kill itself.
RAY_CONFIG(uint64_t, gcs_redis_heartbeat_interval_milliseconds, 100)
RAY_CONFIG(int64_t, gcs_redis_heartbeat_interval_milliseconds, 100)
/// Duration to wait between retries for leasing worker in gcs server.
RAY_CONFIG(uint32_t, gcs_lease_worker_retry_interval_ms, 200)
/// Duration to wait between retries for creating actor in gcs server.
@@ -265,7 +265,7 @@ RAY_CONFIG(bool, release_resources_during_plasma_fetch, false)
/// The interval at which the gcs client will check if the address of gcs service has
/// changed. If it has changed, we will resubscribe.
RAY_CONFIG(uint64_t, gcs_service_address_check_interval_milliseconds, 1000)
RAY_CONFIG(int64_t, gcs_service_address_check_interval_milliseconds, 1000)
/// The batch size for metrics export.
RAY_CONFIG(int64_t, metrics_report_batch_size, 100)
@@ -303,9 +303,9 @@ RAY_CONFIG(bool, enable_worker_prestart,
getenv("RAY_ENABLE_WORKER_PRESTART") == nullptr ||
getenv("RAY_ENABLE_WORKER_PRESTART") == std::string("1"))
/// The interval of periodic idle worker killing. Value of 0 means worker capping is
/// The interval of periodic idle worker killing. A negative value means worker capping is
/// disabled.
RAY_CONFIG(uint64_t, kill_idle_workers_interval_ms, 200)
RAY_CONFIG(int64_t, kill_idle_workers_interval_ms, 200)
/// The idle time threshold for an idle worker to be killed.
RAY_CONFIG(int64_t, idle_worker_killing_time_threshold_ms, 1000)
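
Why these type flips matter: several of the periods above use a non-positive value (0 or -1) to mean "disabled", which only round-trips through a signed type. Below is a minimal standalone sketch, not Ray's actual macro expansion, of the guard pattern these configs feed later in this diff; the names are simplified stand-ins.

#include <cstdint>
#include <iostream>

// Hypothetical stand-in for RayConfig::instance().debug_dump_period_milliseconds().
int64_t debug_dump_period_ms = -1;  // -1 disables debug dumps, per the comment above.

bool ShouldDumpDebugState(int64_t now_ms, int64_t last_dump_ms) {
  // With a signed period, this guard cleanly skips the work when disabled.
  return debug_dump_period_ms > 0 && now_ms - last_dump_ms > debug_dump_period_ms;
}

int main() {
  std::cout << ShouldDumpDebugState(20000, 0) << std::endl;  // Prints 0: disabled.
  // Were the period a uint64_t, assigning -1 would wrap to 18446744073709551615,
  // and a `period > 0` check would leave the feature enabled by accident.
  return 0;
}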

View file

@@ -28,7 +28,7 @@
namespace {
// Duration between internal book-keeping heartbeats.
const uint64_t kInternalHeartbeatMillis = 1000;
const int kInternalHeartbeatMillis = 1000;
void BuildCommonTaskSpec(
ray::TaskSpecBuilder &builder, const JobID &job_id, const TaskID &task_id,
@@ -301,7 +301,8 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_
worker_context_(options_.worker_type, worker_id, GetProcessJobID(options_)),
io_work_(io_service_),
client_call_manager_(new rpc::ClientCallManager(io_service_)),
periodical_runner_(io_service_),
death_check_timer_(io_service_),
internal_timer_(io_service_),
task_queue_length_(0),
num_executed_tasks_(0),
task_execution_service_work_(task_execution_service_),
@@ -406,13 +407,15 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_
});
if (options_.worker_type == ray::WorkerType::WORKER) {
periodical_runner_.RunFnPeriodically(
[this] { CheckForRayletFailure(); },
RayConfig::instance().raylet_death_check_interval_milliseconds());
death_check_timer_.expires_from_now(boost::asio::chrono::milliseconds(
RayConfig::instance().raylet_death_check_interval_milliseconds()));
death_check_timer_.async_wait(
boost::bind(&CoreWorker::CheckForRayletFailure, this, _1));
}
periodical_runner_.RunFnPeriodically([this] { InternalHeartbeat(); },
kInternalHeartbeatMillis);
internal_timer_.expires_from_now(
boost::asio::chrono::milliseconds(kInternalHeartbeatMillis));
internal_timer_.async_wait(boost::bind(&CoreWorker::InternalHeartbeat, this, _1));
plasma_store_provider_.reset(new CoreWorkerPlasmaStoreProvider(
options_.store_socket, local_raylet_client_, reference_counter_,
@@ -788,14 +791,30 @@ void CoreWorker::RegisterToGcs() {
RAY_CHECK_OK(gcs_client_->Workers().AsyncAdd(worker_data, nullptr));
}
void CoreWorker::CheckForRayletFailure() {
void CoreWorker::CheckForRayletFailure(const boost::system::error_code &error) {
if (error == boost::asio::error::operation_aborted) {
return;
}
if (!IsParentProcessAlive()) {
RAY_LOG(ERROR) << "Raylet failed. Shutting down.";
Shutdown();
}
// Reset the timer from the previous expiration time to avoid drift.
death_check_timer_.expires_at(
death_check_timer_.expiry() +
boost::asio::chrono::milliseconds(
RayConfig::instance().raylet_death_check_interval_milliseconds()));
death_check_timer_.async_wait(
boost::bind(&CoreWorker::CheckForRayletFailure, this, _1));
}
void CoreWorker::InternalHeartbeat() {
void CoreWorker::InternalHeartbeat(const boost::system::error_code &error) {
if (error == boost::asio::error::operation_aborted) {
return;
}
absl::MutexLock lock(&mutex_);
while (!to_resubmit_.empty() && current_time_ms() > to_resubmit_.front().first) {
@@ -807,6 +826,9 @@ void CoreWorker::InternalHeartbeat() {
}
to_resubmit_.pop_front();
}
internal_timer_.expires_at(internal_timer_.expiry() +
boost::asio::chrono::milliseconds(kInternalHeartbeatMillis));
internal_timer_.async_wait(boost::bind(&CoreWorker::InternalHeartbeat, this, _1));
}
std::unordered_map<ObjectID, std::pair<size_t, size_t>>
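
Both timers restored in this file share one re-arm idiom, called out in the comments above: advance the expiry from the previous deadline rather than from "now", so the period does not drift by the handler's run time. A self-contained sketch of that idiom (assuming Boost 1.66+ for steady_timer::expiry()):

#include <boost/asio.hpp>
#include <functional>
#include <iostream>

int main() {
  boost::asio::io_service io;
  boost::asio::steady_timer timer(io);
  std::function<void(const boost::system::error_code &)> tick;
  tick = [&](const boost::system::error_code &error) {
    if (error == boost::asio::error::operation_aborted) {
      return;  // The timer was canceled or destroyed; stop rescheduling.
    }
    std::cout << "internal heartbeat" << std::endl;
    // Re-arm relative to the previous expiration time to avoid drift.
    timer.expires_at(timer.expiry() + boost::asio::chrono::milliseconds(1000));
    timer.async_wait(tick);
  };
  timer.expires_from_now(boost::asio::chrono::milliseconds(1000));
  timer.async_wait(tick);
  io.run();  // Ticks forever in this toy example; interrupt to stop.
  return 0;
}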

View file

@@ -36,7 +36,6 @@
#include "ray/rpc/node_manager/node_manager_client.h"
#include "ray/rpc/worker/core_worker_client.h"
#include "ray/rpc/worker/core_worker_server.h"
#include "ray/util/periodical_runner.h"
/// The set of gRPC handlers and their associated level of concurrency. If you want to
/// add a new call to the worker gRPC server, do the following:
@@ -977,10 +976,10 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler {
void RegisterToGcs();
/// Check if the raylet has failed. If so, shutdown.
void CheckForRayletFailure();
void CheckForRayletFailure(const boost::system::error_code &error);
/// Heartbeat for internal bookkeeping.
void InternalHeartbeat();
void InternalHeartbeat(const boost::system::error_code &error);
///
/// Private methods related to task submission.
@@ -1136,8 +1135,11 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler {
/// Shared core worker client pool.
std::shared_ptr<rpc::CoreWorkerClientPool> core_worker_client_pool_;
/// The runner to run functions periodically.
PeriodicalRunner periodical_runner_;
/// Timer used to periodically check if the raylet has died.
boost::asio::steady_timer death_check_timer_;
/// Timer for internal book-keeping.
boost::asio::steady_timer internal_timer_;
/// RPC server used to receive tasks to execute.
std::unique_ptr<rpc::GrpcServer> core_worker_server_;

View file

@@ -31,13 +31,13 @@ Profiler::Profiler(WorkerContext &worker_context, const std::string &node_ip_add
boost::asio::io_service &io_service,
const std::shared_ptr<gcs::GcsClient> &gcs_client)
: io_service_(io_service),
periodical_runner_(io_service_),
timer_(io_service_, boost::asio::chrono::seconds(1)),
rpc_profile_data_(new rpc::ProfileTableData()),
gcs_client_(gcs_client) {
rpc_profile_data_->set_component_type(WorkerTypeString(worker_context.GetWorkerType()));
rpc_profile_data_->set_component_id(worker_context.GetWorkerID().Binary());
rpc_profile_data_->set_node_ip_address(node_ip_address);
periodical_runner_.RunFnPeriodically([this] { FlushEvents(); }, 1000);
timer_.async_wait(boost::bind(&Profiler::FlushEvents, this, _1));
}
void Profiler::AddEvent(const rpc::ProfileTableData::ProfileEvent &event) {
@@ -45,7 +45,11 @@ void Profiler::AddEvent(const rpc::ProfileTableData::ProfileEvent &event) {
rpc_profile_data_->add_profile_events()->CopyFrom(event);
}
void Profiler::FlushEvents() {
void Profiler::FlushEvents(const boost::system::error_code &error) {
if (error == boost::asio::error::operation_aborted) {
return;
}
auto cur_profile_data = std::make_shared<rpc::ProfileTableData>();
{
absl::MutexLock lock(&mutex_);
@@ -67,6 +71,10 @@ void Profiler::FlushEvents() {
<< " events to GCS.";
}
}
// Reset the timer to 1 second from the previous expiration time to avoid drift.
timer_.expires_at(timer_.expiry() + boost::asio::chrono::seconds(1));
timer_.async_wait(boost::bind(&Profiler::FlushEvents, this, _1));
}
} // namespace worker

View file

@@ -19,7 +19,6 @@
#include "absl/time/clock.h"
#include "ray/core_worker/context.h"
#include "ray/gcs/gcs_client.h"
#include "ray/util/periodical_runner.h"
namespace ray {
@@ -36,7 +35,7 @@ class Profiler {
private:
// Flush all of the events that have been added since the last flush to the GCS.
void FlushEvents() LOCKS_EXCLUDED(mutex_);
void FlushEvents(const boost::system::error_code &error) LOCKS_EXCLUDED(mutex_);
// Mutex guarding rpc_profile_data_.
absl::Mutex mutex_;
@@ -44,8 +43,8 @@ class Profiler {
// ASIO IO service event loop. Must be started by the caller.
boost::asio::io_service &io_service_;
/// The runner to run functions periodically.
PeriodicalRunner periodical_runner_;
// Timer used to periodically flush events to the GCS.
boost::asio::steady_timer timer_;
// RPC message containing profiling data. Holds the queue of profile events
// until they are flushed.

View file

@@ -84,10 +84,8 @@ Status ServiceBasedGcsClient::Connect(boost::asio::io_service &io_service) {
placement_group_accessor_.reset(new ServiceBasedPlacementGroupInfoAccessor(this));
// Init gcs service address check timer.
periodical_runner_.reset(new PeriodicalRunner(io_service));
periodical_runner_->RunFnPeriodically(
[this] { PeriodicallyCheckGcsServerAddress(); },
RayConfig::instance().gcs_service_address_check_interval_milliseconds());
detect_timer_.reset(new boost::asio::deadline_timer(io_service));
PeriodicallyCheckGcsServerAddress();
is_connected_ = true;
@@ -98,7 +96,7 @@ Status ServiceBasedGcsClient::Connect(boost::asio::io_service &io_service) {
void ServiceBasedGcsClient::Disconnect() {
RAY_CHECK(is_connected_);
is_connected_ = false;
periodical_runner_.reset();
detect_timer_->cancel();
gcs_pub_sub_.reset();
redis_client_->Disconnect();
redis_client_.reset();
@@ -154,6 +152,19 @@ void ServiceBasedGcsClient::PeriodicallyCheckGcsServerAddress() {
GcsServiceFailureDetected(rpc::GcsServiceFailureType::GCS_SERVER_RESTART);
}
}
auto check_period = boost::posix_time::milliseconds(
RayConfig::instance().gcs_service_address_check_interval_milliseconds());
detect_timer_->expires_from_now(check_period);
detect_timer_->async_wait([this](const boost::system::error_code &error) {
if (error == boost::asio::error::operation_aborted) {
// `operation_aborted` is set when `detect_timer_` is canceled or destroyed.
return;
}
RAY_CHECK(!error) << "Checking gcs server address failed with error: "
<< error.message();
PeriodicallyCheckGcsServerAddress();
});
}
void ServiceBasedGcsClient::GcsServiceFailureDetected(rpc::GcsServiceFailureType type) {
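
Disconnect() above stops the loop purely through cancellation: detect_timer_->cancel() completes the pending async_wait with operation_aborted, and the handler returns instead of rescheduling. A small sketch of that shutdown protocol, with toy names and the same deadline_timer API as the diff:

#include <boost/asio.hpp>
#include <functional>
#include <iostream>
#include <memory>

int main() {
  boost::asio::io_service io;
  auto detect_timer = std::make_unique<boost::asio::deadline_timer>(io);
  std::function<void()> check;
  check = [&] {
    detect_timer->expires_from_now(boost::posix_time::milliseconds(100));
    detect_timer->async_wait([&](const boost::system::error_code &error) {
      if (error == boost::asio::error::operation_aborted) {
        std::cout << "disconnected; loop stopped" << std::endl;
        return;  // cancel() was called, e.g. from Disconnect().
      }
      std::cout << "checking GCS server address" << std::endl;
      check();  // Schedule the next check.
    });
  };
  check();
  // Simulate Disconnect() after a few ticks.
  boost::asio::deadline_timer stopper(io, boost::posix_time::milliseconds(350));
  stopper.async_wait(
      [&](const boost::system::error_code &) { detect_timer->cancel(); });
  io.run();
  return 0;
}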

View file

@@ -18,7 +18,6 @@
#include "ray/gcs/pubsub/gcs_pub_sub.h"
#include "ray/gcs/redis_client.h"
#include "ray/rpc/gcs_server/gcs_rpc_client.h"
#include "ray/util/periodical_runner.h"
namespace ray {
namespace gcs {
@@ -67,8 +66,8 @@ class RAY_EXPORT ServiceBasedGcsClient : public GcsClient {
std::unique_ptr<rpc::GcsRpcClient> gcs_rpc_client_;
std::unique_ptr<rpc::ClientCallManager> client_call_manager_;
// The runner to run functions periodically.
std::unique_ptr<PeriodicalRunner> periodical_runner_;
// A timer used to check if gcs server address changed.
std::unique_ptr<boost::asio::deadline_timer> detect_timer_;
std::function<bool(std::pair<std::string, int> *)> get_server_address_func_;
std::function<void(bool)> resubscribe_func_;
std::pair<std::string, int> current_gcs_server_address_;

View file

@@ -26,7 +26,7 @@ GcsHeartbeatManager::GcsHeartbeatManager(
: io_service_(io_service),
on_node_death_callback_(std::move(on_node_death_callback)),
num_heartbeats_timeout_(RayConfig::instance().num_heartbeats_timeout()),
periodical_runner_(io_service) {
detect_timer_(io_service) {
io_service_thread_.reset(new std::thread([this] {
SetThreadName("heartbeat");
/// The asio work to keep io_service_ alive.
@@ -46,9 +46,7 @@ void GcsHeartbeatManager::Initialize(const GcsInitData &gcs_init_data) {
void GcsHeartbeatManager::Start() {
io_service_.post([this] {
if (!is_started_) {
periodical_runner_.RunFnPeriodically(
[this] { DetectDeadNodes(); },
RayConfig::instance().raylet_heartbeat_period_milliseconds());
Tick();
is_started_ = true;
}
});
@@ -82,6 +80,12 @@ void GcsHeartbeatManager::HandleReportHeartbeat(
GCS_RPC_SEND_REPLY(send_reply_callback, reply, Status::OK());
}
/// A periodic timer that checks for timed-out clients.
void GcsHeartbeatManager::Tick() {
DetectDeadNodes();
ScheduleTick();
}
void GcsHeartbeatManager::DetectDeadNodes() {
for (auto it = heartbeats_.begin(); it != heartbeats_.end();) {
auto current = it++;
@@ -97,5 +101,20 @@ void GcsHeartbeatManager::DetectDeadNodes() {
}
}
void GcsHeartbeatManager::ScheduleTick() {
auto heartbeat_period = boost::posix_time::milliseconds(
RayConfig::instance().raylet_heartbeat_period_milliseconds());
detect_timer_.expires_from_now(heartbeat_period);
detect_timer_.async_wait([this](const boost::system::error_code &error) {
if (error == boost::asio::error::operation_aborted) {
// `operation_aborted` is set when `detect_timer_` is canceled or destroyed.
// The Monitor's lifetime may be shorter than that of the objects using it (e.g. gcs_server).
return;
}
RAY_CHECK(!error) << "Checking heartbeat failed with error: " << error.message();
Tick();
});
}
} // namespace gcs
} // namespace ray

View file

@@ -21,9 +21,7 @@
#include "ray/gcs/gcs_server/gcs_init_data.h"
#include "ray/rpc/client_call.h"
#include "ray/rpc/gcs_server/gcs_rpc_server.h"
#include "ray/util/periodical_runner.h"
#include "src/ray/protobuf/gcs.pb.h"
namespace ray {
namespace gcs {
@@ -64,10 +62,18 @@ class GcsHeartbeatManager : public rpc::HeartbeatInfoHandler {
void AddNode(const NodeID &node_id);
protected:
/// A periodic timer that fires on every heartbeat period. Raylets that have
/// not sent a heartbeat within the last num_heartbeats_timeout ticks will be
/// marked as dead in the client table.
void Tick();
/// Check whether any raylet has been inactive (sent no heartbeat) for a period of
/// time. If any are found, mark them as dead.
void DetectDeadNodes();
/// Schedule another tick after a short time.
void ScheduleTick();
private:
/// The main event loop for node failure detector.
boost::asio::io_service &io_service_;
@@ -76,8 +82,8 @@ class GcsHeartbeatManager : public rpc::HeartbeatInfoHandler {
std::function<void(const NodeID &)> on_node_death_callback_;
/// The number of heartbeats that can be missed before a node is removed.
int64_t num_heartbeats_timeout_;
/// The runner to run functions periodically.
PeriodicalRunner periodical_runner_;
/// A timer that ticks every heartbeat_timeout_ms_ milliseconds.
boost::asio::deadline_timer detect_timer_;
/// For each Raylet that we receive a heartbeat from, the number of ticks
/// that may pass before the Raylet will be declared dead.
absl::flat_hash_map<NodeID, int64_t> heartbeats_;
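
The heartbeats_ map above drives the whole detector: each entry is a remaining-tick budget that HandleReportHeartbeat resets to num_heartbeats_timeout and every Tick() decrements. A simplified sketch of that bookkeeping, with std::string standing in for NodeID:

#include <cstdint>
#include <iostream>
#include <string>
#include <unordered_map>

const int64_t kNumHeartbeatsTimeout = 300;  // Mirrors num_heartbeats_timeout.
std::unordered_map<std::string, int64_t> heartbeats;  // Node ID -> ticks left.

void HandleReportHeartbeat(const std::string &node_id) {
  heartbeats[node_id] = kNumHeartbeatsTimeout;  // Any heartbeat refills the budget.
}

void DetectDeadNodes() {
  for (auto it = heartbeats.begin(); it != heartbeats.end();) {
    auto current = it++;  // Advance first so erase() does not invalidate `it`.
    if (--current->second == 0) {
      std::cout << "node " << current->first << " marked dead" << std::endl;
      heartbeats.erase(current);  // on_node_death_callback_ would fire here.
    }
  }
}

int main() {
  HandleReportHeartbeat("raylet-1");
  // Simulate num_heartbeats_timeout ticks with no further heartbeats received.
  for (int64_t i = 0; i < kNumHeartbeatsTimeout; i++) {
    DetectDeadNodes();
  }
  return 0;
}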

View file

@@ -23,14 +23,12 @@ GcsRedisFailureDetector::GcsRedisFailureDetector(
boost::asio::io_service &io_service, std::shared_ptr<RedisContext> redis_context,
std::function<void()> callback)
: redis_context_(redis_context),
periodical_runner_(io_service),
detect_timer_(io_service),
callback_(std::move(callback)) {}
void GcsRedisFailureDetector::Start() {
RAY_LOG(INFO) << "Starting redis failure detector.";
periodical_runner_.RunFnPeriodically(
[this] { DetectRedis(); },
RayConfig::instance().gcs_redis_heartbeat_interval_milliseconds());
Tick();
}
void GcsRedisFailureDetector::DetectRedis() {
@@ -49,5 +47,24 @@ void GcsRedisFailureDetector::DetectRedis() {
}
}
/// A periodic timer that checks whether Redis has timed out.
void GcsRedisFailureDetector::Tick() {
DetectRedis();
ScheduleTick();
}
void GcsRedisFailureDetector::ScheduleTick() {
auto detect_period = boost::posix_time::milliseconds(
RayConfig::instance().gcs_redis_heartbeat_interval_milliseconds());
detect_timer_.expires_from_now(detect_period);
detect_timer_.async_wait([this](const boost::system::error_code &error) {
if (error == boost::asio::error::operation_aborted) {
return;
}
RAY_CHECK(!error) << "Detecting redis failed with error: " << error.message();
Tick();
});
}
} // namespace gcs
} // namespace ray

View file

@@ -17,7 +17,6 @@
#include <boost/asio.hpp>
#include "ray/gcs/redis_context.h"
#include "ray/util/periodical_runner.h"
namespace ray {
@@ -45,6 +44,12 @@ class GcsRedisFailureDetector {
void Start();
protected:
/// A periodic timer that fires once per GCS detection period.
void Tick();
/// Schedule another tick after a short time.
void ScheduleTick();
/// Check whether Redis is inactive.
void DetectRedis();
@@ -53,8 +58,8 @@ class GcsRedisFailureDetector {
/// TODO(ffbin): We will use redis client later.
std::shared_ptr<RedisContext> redis_context_;
/// The runner to run functions periodically.
PeriodicalRunner periodical_runner_;
/// A timer that ticks every gcs_detect_timeout_milliseconds.
boost::asio::deadline_timer detect_timer_;
/// A function called when Redis is detected to be unavailable.
std::function<void()> callback_;

View file

@@ -22,12 +22,10 @@ namespace gcs {
GcsResourceManager::GcsResourceManager(
boost::asio::io_service &main_io_service, std::shared_ptr<gcs::GcsPubSub> gcs_pub_sub,
std::shared_ptr<gcs::GcsTableStorage> gcs_table_storage)
: periodical_runner_(main_io_service),
: resource_timer_(main_io_service),
gcs_pub_sub_(gcs_pub_sub),
gcs_table_storage_(gcs_table_storage) {
periodical_runner_.RunFnPeriodically(
[this] { SendBatchedResourceUsage(); },
RayConfig::instance().raylet_report_resources_period_milliseconds());
SendBatchedResourceUsage();
}
void GcsResourceManager::HandleGetResources(const rpc::GetResourcesRequest &request,
@@ -366,6 +364,20 @@ void GcsResourceManager::SendBatchedResourceUsage() {
batch->SerializeAsString(), nullptr));
resources_buffer_.clear();
}
auto resources_period = boost::posix_time::milliseconds(
RayConfig::instance().raylet_report_resources_period_milliseconds());
resource_timer_.expires_from_now(resources_period);
resource_timer_.async_wait([this](const boost::system::error_code &error) {
if (error == boost::asio::error::operation_aborted) {
// `operation_aborted` is set when `resource_timer_` is canceled or destroyed.
// The Monitor's lifetime may be shorter than that of the objects using it (e.g. gcs_server).
return;
}
RAY_CHECK(!error) << "Sending batched resource usage failed with error: "
<< error.message();
SendBatchedResourceUsage();
});
}
void GcsResourceManager::UpdatePlacementGroupLoad(

View file

@@ -24,7 +24,6 @@
#include "ray/gcs/pubsub/gcs_pub_sub.h"
#include "ray/rpc/client_call.h"
#include "ray/rpc/gcs_server/gcs_rpc_server.h"
#include "ray/util/periodical_runner.h"
#include "src/ray/protobuf/gcs.pb.h"
namespace ray {
@@ -156,8 +155,8 @@ class GcsResourceManager : public rpc::NodeResourceInfoHandler {
/// Send any buffered resource usage as a single publish.
void SendBatchedResourceUsage();
/// The runner to run functions periodically.
PeriodicalRunner periodical_runner_;
/// A timer that ticks every raylet_report_resources_period_milliseconds.
boost::asio::deadline_timer resource_timer_;
/// Newest resource usage of all nodes.
absl::flat_hash_map<NodeID, rpc::ResourcesData> node_resource_usages_;
/// A buffer containing resource usage received from node managers in the last tick.

View file

@@ -97,6 +97,13 @@ void LocalObjectManager::FlushFreeObjects() {
last_free_objects_at_ms_ = current_time_ms();
}
void LocalObjectManager::FlushFreeObjectsIfNeeded(int64_t now_ms) {
if (free_objects_period_ms_ > 0 &&
static_cast<int64_t>(now_ms - last_free_objects_at_ms_) > free_objects_period_ms_) {
FlushFreeObjects();
}
}
void LocalObjectManager::SpillObjectUptoMaxThroughput() {
if (RayConfig::instance().object_spilling_config().empty() ||
!RayConfig::instance().automatic_object_spilling_enabled()) {

View file

@@ -113,9 +113,8 @@ class LocalObjectManager {
const NodeID &node_id,
std::function<void(const ray::Status &)> callback);
/// Clear any freed objects. This will trigger the callback for freed
/// objects.
void FlushFreeObjects();
/// Try to clear any objects that have been freed.
void FlushFreeObjectsIfNeeded(int64_t now_ms);
/// Determine whether objects in pending_delete_queue are deletable, and delete them
/// if necessary.
@@ -168,6 +167,10 @@ class LocalObjectManager {
/// Release an object that has been freed by its owner.
void ReleaseFreedObject(const ObjectID &object_id);
/// Clear any freed objects. This will trigger the callback for freed
/// objects.
void FlushFreeObjects();
// A callback for unpinning spilled objects. This should be invoked after the object
// has been spilled and after the object directory has been sent the spilled URL.
void UnpinSpilledObjectCallback(const ObjectID &object_id,

View file

@@ -180,8 +180,12 @@ int main(int argc, char *argv[]) {
RAY_LOG(DEBUG) << "Agent command is empty.";
}
node_manager_config.heartbeat_period_ms =
RayConfig::instance().raylet_heartbeat_period_milliseconds();
node_manager_config.report_resources_period_ms =
RayConfig::instance().raylet_report_resources_period_milliseconds();
node_manager_config.debug_dump_period_ms =
RayConfig::instance().debug_dump_period_milliseconds();
node_manager_config.record_metrics_period_ms =
RayConfig::instance().metrics_report_interval_ms() / 2;
node_manager_config.fair_queueing_enabled =

View file

@@ -28,7 +28,6 @@
#include "ray/stats/stats.h"
#include "ray/util/asio_util.h"
#include "ray/util/sample.h"
#include "ray/util/util.h"
namespace {
@@ -124,10 +123,15 @@ NodeManager::NodeManager(boost::asio::io_service &io_service, const NodeID &self
object_manager_(object_manager),
gcs_client_(gcs_client),
object_directory_(object_directory),
periodical_runner_(io_service),
report_resources_period_ms_(config.report_resources_period_ms),
heartbeat_timer_(io_service),
heartbeat_period_(std::chrono::milliseconds(config.heartbeat_period_ms)),
report_resources_timer_(io_service),
report_resources_period_(
std::chrono::milliseconds(config.report_resources_period_ms)),
debug_dump_period_(config.debug_dump_period_ms),
fair_queueing_enabled_(config.fair_queueing_enabled),
temp_dir_(config.temp_dir),
object_manager_profile_timer_(io_service),
initial_config_(config),
local_available_resources_(config.resource_config),
worker_pool_(io_service, config.num_workers_soft_limit,
@@ -179,9 +183,9 @@ NodeManager::NodeManager(boost::asio::io_service &io_service, const NodeID &self
last_local_gc_ns_(absl::GetCurrentTimeNanos()),
local_gc_interval_ns_(RayConfig::instance().local_gc_interval_s() * 1e9),
local_gc_min_interval_ns_(RayConfig::instance().local_gc_min_interval_s() * 1e9),
record_metrics_period_ms_(config.record_metrics_period_ms) {
record_metrics_period_(config.record_metrics_period_ms) {
RAY_LOG(INFO) << "Initializing NodeManager with ID " << self_node_id_;
RAY_CHECK(RayConfig::instance().raylet_heartbeat_period_milliseconds() > 0);
RAY_CHECK(heartbeat_period_.count() > 0);
// Initialize the resource map with own cluster resource configuration.
cluster_resource_map_.emplace(self_node_id_,
SchedulingResources(config.resource_config));
@@ -327,29 +331,12 @@ ray::Status NodeManager::RegisterGcs() {
// Start sending heartbeats to the GCS.
last_heartbeat_at_ms_ = current_time_ms();
periodical_runner_.RunFnPeriodically(
[this] { Heartbeat(); },
RayConfig::instance().raylet_heartbeat_period_milliseconds());
periodical_runner_.RunFnPeriodically(
[this] {
DumpDebugState();
WarnResourceDeadlock();
},
RayConfig::instance().debug_dump_period_milliseconds());
periodical_runner_.RunFnPeriodically([this] { RecordMetrics(); },
record_metrics_period_ms_);
if (RayConfig::instance().free_objects_period_milliseconds() > 0) {
periodical_runner_.RunFnPeriodically(
[this] { local_object_manager_.FlushFreeObjects(); },
RayConfig::instance().free_objects_period_milliseconds());
}
periodical_runner_.RunFnPeriodically([this] { ReportResourceUsage(); },
report_resources_period_ms_);
last_debug_dump_at_ms_ = current_time_ms();
Heartbeat();
ReportResourceUsage();
// Start the timer that gets object manager profiling information and sends it
// to the GCS.
periodical_runner_.RunFnPeriodically(
[this] { GetObjectManagerProfileInfo(); },
RayConfig::instance().raylet_heartbeat_period_milliseconds());
GetObjectManagerProfileInfo();
return ray::Status::OK();
}
@@ -438,6 +425,30 @@ void NodeManager::Heartbeat() {
RAY_LOG(FATAL) << "This node has been marked as dead.";
}
}));
if (debug_dump_period_ > 0 &&
static_cast<int64_t>(now_ms - last_debug_dump_at_ms_) > debug_dump_period_) {
DumpDebugState();
WarnResourceDeadlock();
last_debug_dump_at_ms_ = now_ms;
}
if (record_metrics_period_ > 0 &&
static_cast<int64_t>(now_ms - metrics_last_recorded_time_ms_) >
record_metrics_period_) {
RecordMetrics();
metrics_last_recorded_time_ms_ = now_ms;
}
// Evict all copies of freed objects from the cluster.
local_object_manager_.FlushFreeObjectsIfNeeded(now_ms);
// Reset the timer.
heartbeat_timer_.expires_from_now(heartbeat_period_);
heartbeat_timer_.async_wait([this](const boost::system::error_code &error) {
RAY_CHECK(!error);
Heartbeat();
});
}
void NodeManager::ReportResourceUsage() {
@@ -473,6 +484,13 @@ void NodeManager::ReportResourceUsage() {
RAY_CHECK_OK(gcs_client_->NodeResources().AsyncReportResourceUsage(resources_data,
/*done*/ nullptr));
}
// Reset the timer.
report_resources_timer_.expires_from_now(report_resources_period_);
report_resources_timer_.async_wait([this](const boost::system::error_code &error) {
RAY_CHECK(!error);
ReportResourceUsage();
});
}
void NodeManager::DoLocalGC() {
@@ -648,6 +666,14 @@ void NodeManager::GetObjectManagerProfileInfo() {
RAY_CHECK_OK(gcs_client_->Stats().AsyncAddProfileData(profile_info, nullptr));
}
// Reset the timer.
object_manager_profile_timer_.expires_from_now(heartbeat_period_);
object_manager_profile_timer_.async_wait(
[this](const boost::system::error_code &error) {
RAY_CHECK(!error);
GetObjectManagerProfileInfo();
});
int64_t interval = current_time_ms() - start_time_ms;
if (interval > RayConfig::instance().handler_warning_timeout_ms()) {
RAY_LOG(WARNING) << "GetObjectManagerProfileInfo handler took " << interval << " ms.";
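
The reverted Heartbeat() above multiplexes the slower periodic jobs onto the fast heartbeat tick: each job keeps a last-run timestamp and fires only once its own period has elapsed, and a non-positive period disables the job. Reduced to just that scheduling logic (a sketch with simplified names, asio omitted):

#include <cstdint>

int64_t debug_dump_period_ms = 10000;     // From debug_dump_period_milliseconds.
int64_t record_metrics_period_ms = 5000;  // From metrics_report_interval_ms / 2.
int64_t last_debug_dump_at_ms = 0;
int64_t metrics_last_recorded_ms = 0;

void DumpDebugState() { /* Write debug state to the logs. */ }
void RecordMetrics() { /* Export stats. */ }

// Called once per heartbeat tick (every raylet_heartbeat_period_milliseconds).
void OnHeartbeatTick(int64_t now_ms) {
  if (debug_dump_period_ms > 0 &&
      now_ms - last_debug_dump_at_ms > debug_dump_period_ms) {
    DumpDebugState();
    last_debug_dump_at_ms = now_ms;
  }
  if (record_metrics_period_ms > 0 &&
      now_ms - metrics_last_recorded_ms > record_metrics_period_ms) {
    RecordMetrics();
    metrics_last_recorded_ms = now_ms;
  }
}

int main() {
  for (int64_t now = 100; now <= 60000; now += 100) {
    OnHeartbeatTick(now);  // Simulate a 100 ms heartbeat period.
  }
  return 0;
}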

View file

@@ -14,6 +14,8 @@
#pragma once
#include <boost/asio/steady_timer.hpp>
// clang-format off
#include "ray/rpc/grpc_client.h"
#include "ray/rpc/node_manager/node_manager_server.h"
@@ -83,8 +85,12 @@ struct NodeManagerConfig {
WorkerCommandMap worker_commands;
/// The command used to start agent.
std::string agent_command;
/// The time between heartbeats in milliseconds.
uint64_t heartbeat_period_ms;
/// The time between resource reports in milliseconds.
uint64_t report_resources_period_ms;
/// The time between debug dumps in milliseconds, or -1 to disable.
uint64_t debug_dump_period_ms;
/// Whether to enable fair queueing between task classes in raylet.
bool fair_queueing_enabled;
/// Whether to enable automatic object deletion for object spilling.
@@ -97,7 +103,7 @@ struct NodeManagerConfig {
std::string session_dir;
/// The raylet config list of this node.
std::string raylet_config;
// The time between record metrics in milliseconds, or 0 to disable.
// The time between record metrics in milliseconds, or -1 to disable.
uint64_t record_metrics_period_ms;
// The max number of IO workers.
int max_io_workers;
@@ -781,10 +787,16 @@ class NodeManager : public rpc::NodeManagerServiceHandler,
std::shared_ptr<gcs::GcsClient> gcs_client_;
/// The object table. This is shared with the object manager.
std::shared_ptr<ObjectDirectoryInterface> object_directory_;
/// The runner to run functions periodically.
PeriodicalRunner periodical_runner_;
/// The timer used to send heartbeats.
boost::asio::steady_timer heartbeat_timer_;
/// The period used for the heartbeat timer.
std::chrono::milliseconds heartbeat_period_;
/// The timer used to report resources.
boost::asio::steady_timer report_resources_timer_;
/// The period used for the resources report timer.
uint64_t report_resources_period_ms_;
std::chrono::milliseconds report_resources_period_;
/// The period between debug state dumps.
int64_t debug_dump_period_;
/// Whether to enable fair queueing between task classes in raylet.
bool fair_queueing_enabled_;
/// Incremented each time we encounter a potential resource deadlock condition.
@@ -794,9 +806,14 @@ class NodeManager : public rpc::NodeManagerServiceHandler,
bool recorded_metrics_ = false;
/// The path to the ray temp dir.
std::string temp_dir_;
/// The timer used to get profiling information from the object manager and
/// push it to the GCS.
boost::asio::steady_timer object_manager_profile_timer_;
/// The time that the last heartbeat was sent at. Used to make sure we are
/// keeping up with heartbeats.
uint64_t last_heartbeat_at_ms_;
/// The time that the last debug string was logged to the console.
uint64_t last_debug_dump_at_ms_;
/// The number of heartbeats that we should wait before sending the
/// next load report.
uint8_t num_heartbeats_before_load_report_;
@@ -898,7 +915,7 @@ class NodeManager : public rpc::NodeManagerServiceHandler,
/// Fields that are used to report metrics.
/// The period between metrics recordings.
uint64_t record_metrics_period_ms_;
int64_t record_metrics_period_;
/// Last time metrics are recorded.
uint64_t metrics_last_recorded_time_ms_;

View file

@@ -70,7 +70,7 @@ WorkerPool::WorkerPool(boost::asio::io_service &io_service, int num_workers_soft
first_job_driver_wait_num_python_workers_(std::min(
num_initial_python_workers_for_first_job, maximum_startup_concurrency)),
num_initial_python_workers_for_first_job_(num_initial_python_workers_for_first_job),
periodical_runner_(io_service) {
kill_idle_workers_timer_(io_service) {
RAY_CHECK(maximum_startup_concurrency > 0);
#ifndef _WIN32
// Ignore SIGCHLD signals. If we don't do this, then worker processes will
@@ -102,11 +102,7 @@ WorkerPool::WorkerPool(boost::asio::io_service &io_service, int num_workers_soft
free_ports_->push(port);
}
}
if (RayConfig::instance().kill_idle_workers_interval_ms() > 0) {
periodical_runner_.RunFnPeriodically(
[this] { TryKillingIdleWorkers(); },
RayConfig::instance().kill_idle_workers_interval_ms());
}
ScheduleIdleWorkerKilling();
}
WorkerPool::~WorkerPool() {
@@ -619,6 +615,20 @@ void WorkerPool::PushWorker(const std::shared_ptr<WorkerInterface> &worker) {
}
}
void WorkerPool::ScheduleIdleWorkerKilling() {
if (RayConfig::instance().kill_idle_workers_interval_ms() > 0) {
kill_idle_workers_timer_.expires_from_now(boost::posix_time::milliseconds(
RayConfig::instance().kill_idle_workers_interval_ms()));
kill_idle_workers_timer_.async_wait([this](const boost::system::error_code &error) {
if (error == boost::asio::error::operation_aborted) {
return;
}
TryKillingIdleWorkers();
ScheduleIdleWorkerKilling();
});
}
}
void WorkerPool::TryKillingIdleWorkers() {
RAY_CHECK(idle_of_all_languages_.size() == idle_of_all_languages_map_.size());

View file

@@ -28,7 +28,6 @@
#include "ray/common/task/task_common.h"
#include "ray/gcs/gcs_client.h"
#include "ray/raylet/worker.h"
#include "ray/util/periodical_runner.h"
namespace ray {
@@ -438,6 +437,9 @@ class WorkerPool : public WorkerPoolInterface, public IOWorkerPoolInterface {
/// reasonable size.
void TryKillingIdleWorkers();
/// Schedule the periodic killing of idle workers.
void ScheduleIdleWorkerKilling();
/// Get all workers of the given process.
///
/// \param process The process of workers.
@@ -510,8 +512,8 @@ class WorkerPool : public WorkerPoolInterface, public IOWorkerPoolInterface {
std::unordered_map<std::shared_ptr<WorkerInterface>, int64_t>
idle_of_all_languages_map_;
/// The runner to run functions periodically.
PeriodicalRunner periodical_runner_;
/// The timer to trigger idle worker killing.
boost::asio::deadline_timer kill_idle_workers_timer_;
};
} // namespace raylet

View file

@@ -1,55 +0,0 @@
// Copyright 2017 The Ray Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "ray/util/periodical_runner.h"
#include "ray/util/logging.h"
namespace ray {
PeriodicalRunner::PeriodicalRunner(boost::asio::io_service &io_service)
: io_service_(io_service) {}
PeriodicalRunner::~PeriodicalRunner() {
for (const auto &timer : timers_) {
timer->cancel();
}
timers_.clear();
}
void PeriodicalRunner::RunFnPeriodically(std::function<void()> fn, uint64_t period_ms) {
if (period_ms > 0) {
auto timer = std::make_shared<boost::asio::deadline_timer>(io_service_);
timers_.push_back(timer);
DoRunFnPeriodically(fn, boost::posix_time::milliseconds(period_ms), *timer);
}
}
void PeriodicalRunner::DoRunFnPeriodically(std::function<void()> fn,
boost::posix_time::milliseconds period,
boost::asio::deadline_timer &timer) {
fn();
timer.expires_from_now(period);
timer.async_wait([this, fn, period, &timer](const boost::system::error_code &error) {
if (error == boost::asio::error::operation_aborted) {
// `operation_aborted` is set when `timer` is canceled or destroyed.
// The Monitor's lifetime may be shorter than that of the objects using it (e.g. gcs_server).
return;
}
RAY_CHECK(!error) << error.message();
DoRunFnPeriodically(fn, period, timer);
});
}
} // namespace ray

View file

@@ -1,44 +0,0 @@
// Copyright 2017 The Ray Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include <boost/asio.hpp>
#include <boost/asio/deadline_timer.hpp>
namespace ray {
/// \class PeriodicalRunner
/// A periodical runner attached to an io_context.
/// It can run functions with a specified period. Each function is triggered by its own
/// timer. To run a function, call `RunFnPeriodically(fn, period_ms)`.
/// All registered functions will stop running once this object is destroyed.
class PeriodicalRunner {
public:
PeriodicalRunner(boost::asio::io_service &io_service);
~PeriodicalRunner();
void RunFnPeriodically(std::function<void()> fn, uint64_t period_ms);
private:
void DoRunFnPeriodically(std::function<void()> fn,
boost::posix_time::milliseconds period,
boost::asio::deadline_timer &timer);
boost::asio::io_service &io_service_;
std::vector<std::shared_ptr<boost::asio::deadline_timer>> timers_;
};
} // namespace ray
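
For contrast with the per-call-site timers reinstated above, this is roughly how the deleted PeriodicalRunner was used, with the API taken from the header just shown; the sketch assumes a tree where the file still exists:

#include <boost/asio.hpp>
#include <iostream>

#include "ray/util/periodical_runner.h"

int main() {
  boost::asio::io_service io;
  ray::PeriodicalRunner runner(io);
  // Runs the function immediately, then once every 1000 ms; a period of 0 is a no-op.
  runner.RunFnPeriodically([] { std::cout << "flush" << std::endl; }, 1000);
  io.run();  // All timers are canceled when `runner` is destroyed.
  return 0;
}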