From 1ce745cf44b3614351eeeae4c2574347a25d0ce5 Mon Sep 17 00:00:00 2001
From: Eric Liang
Date: Fri, 11 Dec 2020 17:09:58 -0800
Subject: [PATCH] Add automatic local GC and plasma debug logs every 10 minutes by default (#12804)

---
 python/ray/tests/test_global_gc.py            | 37 +++++++++++++++++++
 src/ray/common/ray_config_def.h               |  8 +++-
 .../transport/direct_actor_transport.cc       |  6 +--
 .../transport/direct_task_transport.cc        |  2 +-
 src/ray/object_manager/plasma/store.cc        |  9 +++++
 src/ray/object_manager/plasma/store.h         |  6 +++
 src/ray/raylet/local_object_manager.cc        |  2 +-
 src/ray/raylet/node_manager.cc                | 18 +++++----
 src/ray/raylet/node_manager.h                 |  6 +++
 src/ray/raylet/worker_pool.cc                 | 10 ++---
 src/ray/rpc/worker/core_worker_client_pool.cc |  3 +-
 src/ray/util/process.cc                       |  2 +-
 12 files changed, 89 insertions(+), 20 deletions(-)

diff --git a/python/ray/tests/test_global_gc.py b/python/ray/tests/test_global_gc.py
index 685edacf0..247516450 100644
--- a/python/ray/tests/test_global_gc.py
+++ b/python/ray/tests/test_global_gc.py
@@ -15,6 +15,43 @@ from ray.internal.internal_api import global_gc
 logger = logging.getLogger(__name__)
 
 
+def test_auto_local_gc(shutdown_only):
+    ray.init(num_cpus=2, _system_config={"local_gc_interval_s": 1})
+
+    class ObjectWithCyclicRef:
+        def __init__(self):
+            self.loop = self
+
+    @ray.remote(num_cpus=1)
+    class GarbageHolder:
+        def __init__(self):
+            gc.disable()
+            x = ObjectWithCyclicRef()
+            self.garbage = weakref.ref(x)
+
+        def has_garbage(self):
+            return self.garbage() is not None
+
+    try:
+        gc.disable()
+
+        # Local driver.
+        local_ref = weakref.ref(ObjectWithCyclicRef())
+
+        # Remote workers.
+        actors = [GarbageHolder.remote() for _ in range(2)]
+        assert local_ref() is not None
+        assert all(ray.get([a.has_garbage.remote() for a in actors]))
+
+        def check_refs_gced():
+            return (local_ref() is None and
+                    not any(ray.get([a.has_garbage.remote() for a in actors])))
+
+        wait_for_condition(check_refs_gced)
+    finally:
+        gc.enable()
+
+
 def test_global_gc(shutdown_only):
     cluster = ray.cluster_utils.Cluster()
     for _ in range(2):
diff --git a/src/ray/common/ray_config_def.h b/src/ray/common/ray_config_def.h
index 82faa3be9..3d9630a9d 100644
--- a/src/ray/common/ray_config_def.h
+++ b/src/ray/common/ray_config_def.h
@@ -32,7 +32,7 @@ RAY_CONFIG(int64_t, ray_cookie, 0x5241590000000000)
 
 /// The duration that a single handler on the event loop can take before a
 /// warning is logged that the handler is taking too long.
-RAY_CONFIG(int64_t, handler_warning_timeout_ms, 100)
+RAY_CONFIG(int64_t, handler_warning_timeout_ms, 1000)
 
 /// The duration between heartbeats sent by the raylets.
 RAY_CONFIG(int64_t, raylet_heartbeat_timeout_milliseconds, 1000)
@@ -248,6 +248,12 @@ RAY_CONFIG(int32_t, object_store_full_max_retries, 5)
 /// This will be exponentially increased for each retry.
 RAY_CONFIG(uint32_t, object_store_full_initial_delay_ms, 1000)
 
+/// The amount of time to wait between logging plasma space usage debug messages.
+RAY_CONFIG(uint64_t, object_store_usage_log_interval_s, 10 * 60)
+
+/// The amount of time between automatic local Python GC triggers.
+RAY_CONFIG(uint64_t, local_gc_interval_s, 10 * 60)
+
 /// Duration to wait between retries for failed tasks.
 RAY_CONFIG(uint32_t, task_retry_delay_ms, 5000)
 
diff --git a/src/ray/core_worker/transport/direct_actor_transport.cc b/src/ray/core_worker/transport/direct_actor_transport.cc
index 19d911de1..e266b0d94 100644
--- a/src/ray/core_worker/transport/direct_actor_transport.cc
+++ b/src/ray/core_worker/transport/direct_actor_transport.cc
@@ -162,9 +162,9 @@ void CoreWorkerDirectActorTaskSubmitter::ConnectActor(const ActorID &actor_id,
     // TODO(swang): This assumes that all replies from the previous incarnation
     // of the actor have been received. Fix this by setting an epoch for each
    // actor task, so we can ignore completed tasks from old epochs.
-    RAY_LOG(INFO) << "Resetting caller starts at for actor " << actor_id << " from "
-                  << queue->second.caller_starts_at << " to "
-                  << queue->second.next_task_reply_position;
+    RAY_LOG(DEBUG) << "Resetting caller starts at for actor " << actor_id << " from "
+                   << queue->second.caller_starts_at << " to "
+                   << queue->second.next_task_reply_position;
     queue->second.caller_starts_at = queue->second.next_task_reply_position;
 
     ResendOutOfOrderTasks(actor_id);
diff --git a/src/ray/core_worker/transport/direct_task_transport.cc b/src/ray/core_worker/transport/direct_task_transport.cc
index 1d4cd40ef..c343c8d0f 100644
--- a/src/ray/core_worker/transport/direct_task_transport.cc
+++ b/src/ray/core_worker/transport/direct_task_transport.cc
@@ -48,7 +48,7 @@ Status CoreWorkerDirectTaskSubmitter::SubmitTask(TaskSpecification task_spec) {
     RAY_CHECK_OK(actor_creator_->AsyncCreateActor(
         task_spec, [this, actor_id, task_id](Status status) {
           if (status.ok()) {
-            RAY_LOG(INFO) << "Created actor, actor id = " << actor_id;
+            RAY_LOG(DEBUG) << "Created actor, actor id = " << actor_id;
             task_finisher_->CompletePendingTask(task_id, rpc::PushTaskReply(),
                                                 rpc::Address());
           } else {
diff --git a/src/ray/object_manager/plasma/store.cc b/src/ray/object_manager/plasma/store.cc
index 21f50161e..1ff2dd29d 100644
--- a/src/ray/object_manager/plasma/store.cc
+++ b/src/ray/object_manager/plasma/store.cc
@@ -133,6 +133,8 @@ PlasmaStore::PlasmaStore(boost::asio::io_service &main_service, std::string dire
       external_store_(external_store),
       spill_objects_callback_(spill_objects_callback),
       delay_on_oom_ms_(delay_on_oom_ms),
+      usage_log_interval_ns_(RayConfig::instance().object_store_usage_log_interval_s() *
+                             1e9),
       create_request_queue_(
           RayConfig::instance().object_store_full_max_retries(),
           /*evict_if_full=*/RayConfig::instance().object_pinning_enabled(),
@@ -258,6 +260,13 @@ uint8_t *PlasmaStore::AllocateMemory(size_t size, bool evict_if_full, MEMFD_TYPE
     RAY_CHECK(*fd != INVALID_FD);
     *error = PlasmaError::OK;
   }
+
+  auto now = absl::GetCurrentTimeNanos();
+  if (now - last_usage_log_ns_ > usage_log_interval_ns_) {
+    RAY_LOG(INFO) << "Object store current usage " << (PlasmaAllocator::Allocated() / 1e9)
+                  << " / " << (PlasmaAllocator::GetFootprintLimit() / 1e9) << " GB.";
+    last_usage_log_ns_ = now;
+  }
   return pointer;
 }
 
diff --git a/src/ray/object_manager/plasma/store.h b/src/ray/object_manager/plasma/store.h
index 4ea4ab51f..5ef2cd654 100644
--- a/src/ray/object_manager/plasma/store.h
+++ b/src/ray/object_manager/plasma/store.h
@@ -296,6 +296,12 @@ class PlasmaStore {
   /// transient OOM error.
   const uint32_t delay_on_transient_oom_ms_ = 10;
 
+  /// The amount of time to wait between logging space usage debug messages.
+  const uint64_t usage_log_interval_ns_;
+
+  /// The last time space usage was logged.
+  uint64_t last_usage_log_ns_ = 0;
+
   /// A timer that is set when the first request in the queue is not
   /// serviceable because there is not enough memory. The request will be
   /// retried when this timer expires.
diff --git a/src/ray/raylet/local_object_manager.cc b/src/ray/raylet/local_object_manager.cc
index bd0ca72b1..b42641a1e 100644
--- a/src/ray/raylet/local_object_manager.cc
+++ b/src/ray/raylet/local_object_manager.cc
@@ -52,7 +52,7 @@ void LocalObjectManager::WaitForObjectFree(const rpc::Address &owner_address,
         wait_request,
         [this, object_id](Status status, const rpc::WaitForObjectEvictionReply &reply) {
           if (!status.ok()) {
-            RAY_LOG(WARNING) << "Worker failed. Unpinning object " << object_id;
+            RAY_LOG(DEBUG) << "Worker failed. Unpinning object " << object_id;
           }
           ReleaseFreedObject(object_id);
         });
diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc
index 84df06cc4..1e1f06104 100644
--- a/src/ray/raylet/node_manager.cc
+++ b/src/ray/raylet/node_manager.cc
@@ -172,6 +172,8 @@ NodeManager::NodeManager(boost::asio::io_service &io_service, const NodeID &self
       }),
       new_scheduler_enabled_(RayConfig::instance().new_scheduler_enabled()),
       report_worker_backlog_(RayConfig::instance().report_worker_backlog()),
+      last_local_gc_ns_(absl::GetCurrentTimeNanos()),
+      local_gc_interval_ns_(RayConfig::instance().local_gc_interval_s() * 1e9),
       record_metrics_period_(config.record_metrics_period_ms) {
   RAY_LOG(INFO) << "Initializing NodeManager with ID " << self_node_id_;
   RAY_CHECK(heartbeat_period_.count() > 0);
@@ -528,9 +530,11 @@ void NodeManager::ReportResourceUsage() {
 
   // Trigger local GC if needed. This throttles the frequency of local GC calls
   // to at most once per heartbeat interval.
-  if (should_local_gc_) {
+  auto now = absl::GetCurrentTimeNanos();
+  if (should_local_gc_ || now - last_local_gc_ns_ > local_gc_interval_ns_) {
     DoLocalGC();
     should_local_gc_ = false;
+    last_local_gc_ns_ = now;
   }
 
   if (resources_data->resources_total_size() > 0 ||
@@ -553,14 +557,14 @@ void NodeManager::DoLocalGC() {
   for (const auto &driver : worker_pool_.GetAllRegisteredDrivers()) {
     all_workers.push_back(driver);
   }
-  RAY_LOG(WARNING) << "Sending local GC request to " << all_workers.size()
-                   << " workers. It is due to memory pressure on the local node.";
+  RAY_LOG(INFO) << "Sending Python GC request to " << all_workers.size()
+                << " local workers to clean up Python cyclic references.";
   for (const auto &worker : all_workers) {
     rpc::LocalGCRequest request;
     worker->rpc_client()->LocalGC(
         request, [](const ray::Status &status, const rpc::LocalGCReply &r) {
           if (!status.ok()) {
-            RAY_LOG(ERROR) << "Failed to send local GC request: " << status.ToString();
+            RAY_LOG(DEBUG) << "Failed to send local GC request: " << status.ToString();
           }
         });
   }
@@ -3221,9 +3225,9 @@ void NodeManager::HandleGlobalGC(const rpc::GlobalGCRequest &request,
 }
 
 void NodeManager::TriggerGlobalGC() {
-  RAY_LOG(WARNING)
-      << "Broadcasting global GC request to all raylets. This is usually because "
-         "clusters have memory pressure, and ray needs to GC unused memory.";
+  RAY_LOG(INFO) << "Broadcasting Python GC request to all raylets since the cluster "
+                << "is low on object store memory. This removes Ray object refs "
+                << "that are stuck in Python reference cycles.";
   should_global_gc_ = true;
   // We won't see our own request, so trigger local GC in the next heartbeat.
   should_local_gc_ = true;
diff --git a/src/ray/raylet/node_manager.h b/src/ray/raylet/node_manager.h
index 810927dfa..7a31da54b 100644
--- a/src/ray/raylet/node_manager.h
+++ b/src/ray/raylet/node_manager.h
@@ -779,6 +779,12 @@ class NodeManager : public rpc::NodeManagerServiceHandler {
   /// on all local workers of this raylet.
   bool should_local_gc_ = false;
 
+  /// The last time local GC was triggered.
+  int64_t last_local_gc_ns_ = 0;
+
+  /// The interval in nanoseconds between local GC automatic triggers.
+  const int64_t local_gc_interval_ns_ = 10 * 60 * 1e9;
+
   /// These two classes make up the new scheduler. ClusterResourceScheduler is
   /// responsible for maintaining a view of the cluster state w.r.t resource
   /// usage. ClusterTaskManager is responsible for queuing, spilling back, and
diff --git a/src/ray/raylet/worker_pool.cc b/src/ray/raylet/worker_pool.cc
index a6640dc5b..a7ad5c245 100644
--- a/src/ray/raylet/worker_pool.cc
+++ b/src/ray/raylet/worker_pool.cc
@@ -714,11 +714,11 @@ void WorkerPool::TryKillingIdleWorkers() {
     }
 
     for (const auto &worker : workers_in_the_same_process) {
-      RAY_LOG(INFO) << "The worker pool has " << running_size
-                    << " registered workers which exceeds the soft limit of "
-                    << num_workers_soft_limit_ << ", and worker " << worker->WorkerId()
-                    << " with pid " << process.GetId()
-                    << " has been idle for a a while. Kill it.";
+      RAY_LOG(DEBUG) << "The worker pool has " << running_size
+                     << " registered workers which exceeds the soft limit of "
+                     << num_workers_soft_limit_ << ", and worker " << worker->WorkerId()
+                     << " with pid " << process.GetId()
+                     << " has been idle for a a while. Kill it.";
       // To avoid object lost issue caused by forcibly killing, send an RPC request to the
       // worker to allow it to do cleanup before exiting.
       auto rpc_client = worker->rpc_client();
diff --git a/src/ray/rpc/worker/core_worker_client_pool.cc b/src/ray/rpc/worker/core_worker_client_pool.cc
index 794987dfc..8cb9d41b0 100644
--- a/src/ray/rpc/worker/core_worker_client_pool.cc
+++ b/src/ray/rpc/worker/core_worker_client_pool.cc
@@ -25,7 +25,8 @@ shared_ptr<CoreWorkerClientInterface> CoreWorkerClientPool::GetOrConnect(
 
   auto connection = client_factory_(addr_proto);
   client_map_[id] = connection;
-  RAY_LOG(INFO) << "Connected to " << addr_proto.ip_address() << ":" << addr_proto.port();
+  RAY_LOG(DEBUG) << "Connected to " << addr_proto.ip_address() << ":"
+                 << addr_proto.port();
 
   return connection;
 }
diff --git a/src/ray/util/process.cc b/src/ray/util/process.cc
index 6a289ed05..0928c4402 100644
--- a/src/ray/util/process.cc
+++ b/src/ray/util/process.cc
@@ -495,7 +495,7 @@ void Process::Kill() {
     }
 #endif
     if (error) {
-      RAY_LOG(ERROR) << "Failed to kill process " << pid << " with error " << error
+      RAY_LOG(DEBUG) << "Failed to kill process " << pid << " with error " << error
                      << ": " << error.message();
     }
   } else {
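
Note on the new defaults: both intervals introduced above default to 10 * 60 seconds and are ordinary RAY_CONFIG entries, so they should be overridable at startup through `_system_config`, exactly as the added `test_auto_local_gc` does for `local_gc_interval_s`. A minimal sketch, assuming the same override mechanism; the 30-second values are illustrative, not the shipped defaults:

import ray

# Shorten both new intervals for a local experiment. The keys come from the
# RAY_CONFIG entries added in this patch; 30 s is an arbitrary example value.
ray.init(
    num_cpus=2,
    _system_config={
        "local_gc_interval_s": 30,                # automatic Python GC on local workers
        "object_store_usage_log_interval_s": 30,  # plasma usage INFO log cadence
    })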