Add automatic local GC and plasma debug logs every 10 minutes by default (#12804)

Eric Liang 2020-12-11 17:09:58 -08:00 committed by GitHub
parent abb1eefdc2
commit 1ce745cf44
12 changed files with 89 additions and 20 deletions


@@ -15,6 +15,43 @@ from ray.internal.internal_api import global_gc
 logger = logging.getLogger(__name__)


+def test_auto_local_gc(shutdown_only):
+    ray.init(num_cpus=2, _system_config={"local_gc_interval_s": 1})
+
+    class ObjectWithCyclicRef:
+        def __init__(self):
+            self.loop = self
+
+    @ray.remote(num_cpus=1)
+    class GarbageHolder:
+        def __init__(self):
+            gc.disable()
+            x = ObjectWithCyclicRef()
+            self.garbage = weakref.ref(x)
+
+        def has_garbage(self):
+            return self.garbage() is not None
+
+    try:
+        gc.disable()
+        # Local driver.
+        local_ref = weakref.ref(ObjectWithCyclicRef())
+
+        # Remote workers.
+        actors = [GarbageHolder.remote() for _ in range(2)]
+
+        assert local_ref() is not None
+        assert all(ray.get([a.has_garbage.remote() for a in actors]))
+
+        def check_refs_gced():
+            return (local_ref() is None and
+                    not any(ray.get([a.has_garbage.remote() for a in actors])))
+
+        wait_for_condition(check_refs_gced)
+    finally:
+        gc.enable()
+
+
 def test_global_gc(shutdown_only):
     cluster = ray.cluster_utils.Cluster()
     for _ in range(2):
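
The new test relies on a CPython detail: objects caught in a reference cycle are never reclaimed by reference counting alone, only by the cyclic garbage collector, which is why Ray object refs held inside such cycles can linger until gc.collect() runs. A minimal sketch of the same weakref-based check in plain Python, outside Ray (the class name just mirrors the test):

    import gc
    import weakref

    class ObjectWithCyclicRef:
        def __init__(self):
            self.loop = self  # self-reference forms a cycle

    gc.disable()                # rely on reference counting only
    ref = weakref.ref(ObjectWithCyclicRef())
    assert ref() is not None    # the cycle keeps the object alive
    gc.collect()                # only the cyclic collector frees it
    assert ref() is None
    gc.enable()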


@@ -32,7 +32,7 @@ RAY_CONFIG(int64_t, ray_cookie, 0x5241590000000000)
 /// The duration that a single handler on the event loop can take before a
 /// warning is logged that the handler is taking too long.
-RAY_CONFIG(int64_t, handler_warning_timeout_ms, 100)
+RAY_CONFIG(int64_t, handler_warning_timeout_ms, 1000)

 /// The duration between heartbeats sent by the raylets.
 RAY_CONFIG(int64_t, raylet_heartbeat_timeout_milliseconds, 1000)

@@ -248,6 +248,12 @@ RAY_CONFIG(int32_t, object_store_full_max_retries, 5)
 /// This will be exponentially increased for each retry.
 RAY_CONFIG(uint32_t, object_store_full_initial_delay_ms, 1000)

+/// The amount of time to wait between logging plasma space usage debug messages.
+RAY_CONFIG(uint64_t, object_store_usage_log_interval_s, 10 * 60)
+
+/// The amount of time between automatic local Python GC triggers.
+RAY_CONFIG(uint64_t, local_gc_interval_s, 10 * 60)
+
 /// Duration to wait between retries for failed tasks.
 RAY_CONFIG(uint32_t, task_retry_delay_ms, 5000)
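
Both new options default to 10 minutes (10 * 60 s). The test above shortens local_gc_interval_s through ray.init's _system_config; the sketch below shows that pattern for both options, with the assumption that object_store_usage_log_interval_s propagates to the locally started raylet the same way:

    import ray

    ray.init(
        num_cpus=2,
        _system_config={
            # Same override the new test uses, just with a longer period.
            "local_gc_interval_s": 60,
            # Assumed to be overridable the same way as other RayConfig values.
            "object_store_usage_log_interval_s": 60,
        },
    )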


@@ -162,9 +162,9 @@ void CoreWorkerDirectActorTaskSubmitter::ConnectActor(const ActorID &actor_id,
   // TODO(swang): This assumes that all replies from the previous incarnation
   // of the actor have been received. Fix this by setting an epoch for each
   // actor task, so we can ignore completed tasks from old epochs.
-  RAY_LOG(INFO) << "Resetting caller starts at for actor " << actor_id << " from "
-                << queue->second.caller_starts_at << " to "
-                << queue->second.next_task_reply_position;
+  RAY_LOG(DEBUG) << "Resetting caller starts at for actor " << actor_id << " from "
+                 << queue->second.caller_starts_at << " to "
+                 << queue->second.next_task_reply_position;
   queue->second.caller_starts_at = queue->second.next_task_reply_position;

   ResendOutOfOrderTasks(actor_id);


@@ -48,7 +48,7 @@ Status CoreWorkerDirectTaskSubmitter::SubmitTask(TaskSpecification task_spec) {
     RAY_CHECK_OK(actor_creator_->AsyncCreateActor(
         task_spec, [this, actor_id, task_id](Status status) {
           if (status.ok()) {
-            RAY_LOG(INFO) << "Created actor, actor id = " << actor_id;
+            RAY_LOG(DEBUG) << "Created actor, actor id = " << actor_id;
             task_finisher_->CompletePendingTask(task_id, rpc::PushTaskReply(),
                                                 rpc::Address());
           } else {


@@ -133,6 +133,8 @@ PlasmaStore::PlasmaStore(boost::asio::io_service &main_service, std::string dire
       external_store_(external_store),
       spill_objects_callback_(spill_objects_callback),
       delay_on_oom_ms_(delay_on_oom_ms),
+      usage_log_interval_ns_(RayConfig::instance().object_store_usage_log_interval_s() *
+                             1e9),
       create_request_queue_(
           RayConfig::instance().object_store_full_max_retries(),
           /*evict_if_full=*/RayConfig::instance().object_pinning_enabled(),

@@ -258,6 +260,13 @@ uint8_t *PlasmaStore::AllocateMemory(size_t size, bool evict_if_full, MEMFD_TYPE
     RAY_CHECK(*fd != INVALID_FD);
     *error = PlasmaError::OK;
   }
+
+  auto now = absl::GetCurrentTimeNanos();
+  if (now - last_usage_log_ns_ > usage_log_interval_ns_) {
+    RAY_LOG(INFO) << "Object store current usage " << (PlasmaAllocator::Allocated() / 1e9)
+                  << " / " << (PlasmaAllocator::GetFootprintLimit() / 1e9) << " GB.";
+    last_usage_log_ns_ = now;
+  }
   return pointer;
 }
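
The allocation path now applies a simple time-based throttle: remember when usage was last logged and emit a new INFO line only after object_store_usage_log_interval_s has elapsed, so the message appears at most once per interval no matter how often AllocateMemory is called. The same pattern, sketched in Python with illustrative names:

    import logging
    import time

    logger = logging.getLogger(__name__)

    USAGE_LOG_INTERVAL_S = 10 * 60  # mirrors object_store_usage_log_interval_s
    _last_usage_log = 0.0

    def maybe_log_usage(allocated_bytes, capacity_bytes):
        """Log store usage at most once per interval (illustrative only)."""
        global _last_usage_log
        now = time.monotonic()
        if now - _last_usage_log > USAGE_LOG_INTERVAL_S:
            logger.info("Object store current usage %.3f / %.3f GB.",
                        allocated_bytes / 1e9, capacity_bytes / 1e9)
            _last_usage_log = now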


@@ -296,6 +296,12 @@ class PlasmaStore {
   /// transient OOM error.
   const uint32_t delay_on_transient_oom_ms_ = 10;

+  /// The amount of time to wait between logging space usage debug messages.
+  const uint64_t usage_log_interval_ns_;
+
+  /// The last time space usage was logged.
+  uint64_t last_usage_log_ns_ = 0;
+
   /// A timer that is set when the first request in the queue is not
   /// serviceable because there is not enough memory. The request will be
   /// retried when this timer expires.


@@ -52,7 +52,7 @@ void LocalObjectManager::WaitForObjectFree(const rpc::Address &owner_address,
       wait_request,
       [this, object_id](Status status, const rpc::WaitForObjectEvictionReply &reply) {
         if (!status.ok()) {
-          RAY_LOG(WARNING) << "Worker failed. Unpinning object " << object_id;
+          RAY_LOG(DEBUG) << "Worker failed. Unpinning object " << object_id;
         }
         ReleaseFreedObject(object_id);
       });


@@ -172,6 +172,8 @@ NodeManager::NodeManager(boost::asio::io_service &io_service, const NodeID &self
       }),
       new_scheduler_enabled_(RayConfig::instance().new_scheduler_enabled()),
       report_worker_backlog_(RayConfig::instance().report_worker_backlog()),
+      last_local_gc_ns_(absl::GetCurrentTimeNanos()),
+      local_gc_interval_ns_(RayConfig::instance().local_gc_interval_s() * 1e9),
       record_metrics_period_(config.record_metrics_period_ms) {
   RAY_LOG(INFO) << "Initializing NodeManager with ID " << self_node_id_;
   RAY_CHECK(heartbeat_period_.count() > 0);

@@ -528,9 +530,11 @@ void NodeManager::ReportResourceUsage() {
   // Trigger local GC if needed. This throttles the frequency of local GC calls
   // to at most once per heartbeat interval.
-  if (should_local_gc_) {
+  auto now = absl::GetCurrentTimeNanos();
+  if (should_local_gc_ || now - last_local_gc_ns_ > local_gc_interval_ns_) {
     DoLocalGC();
     should_local_gc_ = false;
+    last_local_gc_ns_ = now;
   }

   if (resources_data->resources_total_size() > 0 ||

@@ -553,14 +557,14 @@ void NodeManager::DoLocalGC() {
   for (const auto &driver : worker_pool_.GetAllRegisteredDrivers()) {
     all_workers.push_back(driver);
   }
-  RAY_LOG(WARNING) << "Sending local GC request to " << all_workers.size()
-                   << " workers. It is due to memory pressure on the local node.";
+  RAY_LOG(INFO) << "Sending Python GC request to " << all_workers.size()
+                << " local workers to clean up Python cyclic references.";
   for (const auto &worker : all_workers) {
     rpc::LocalGCRequest request;
     worker->rpc_client()->LocalGC(
         request, [](const ray::Status &status, const rpc::LocalGCReply &r) {
           if (!status.ok()) {
-            RAY_LOG(ERROR) << "Failed to send local GC request: " << status.ToString();
+            RAY_LOG(DEBUG) << "Failed to send local GC request: " << status.ToString();
           }
         });
   }

@@ -3221,9 +3225,9 @@ void NodeManager::HandleGlobalGC(const rpc::GlobalGCRequest &request,
 }

 void NodeManager::TriggerGlobalGC() {
-  RAY_LOG(WARNING)
-      << "Broadcasting global GC request to all raylets. This is usually because "
-         "clusters have memory pressure, and ray needs to GC unused memory.";
+  RAY_LOG(INFO) << "Broadcasting Python GC request to all raylets since the cluster "
+                << "is low on object store memory. This removes Ray object refs "
+                << "that are stuck in Python reference cycles.";
   should_global_gc_ = true;
   // We won't see our own request, so trigger local GC in the next heartbeat.
   should_local_gc_ = true;
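
With this change the periodic resource report doubles as a GC scheduler: local GC runs either because a global GC broadcast set should_local_gc_ or because local_gc_interval_s has elapsed since the last sweep, so even an idle cluster cleans up Python reference cycles every 10 minutes. A Python sketch of that control flow, using hypothetical names and an in-process gc.collect() in place of the per-worker RPC:

    import gc
    import time

    LOCAL_GC_INTERVAL_S = 10 * 60   # mirrors local_gc_interval_s
    last_local_gc = time.monotonic()
    should_local_gc = False         # set when a global GC broadcast arrives

    def on_resource_report():
        """Runs on every heartbeat; triggers at most one GC sweep per call."""
        global last_local_gc, should_local_gc
        now = time.monotonic()
        if should_local_gc or now - last_local_gc > LOCAL_GC_INTERVAL_S:
            gc.collect()            # stands in for the LocalGC RPC to each worker
            should_local_gc = False
            last_local_gc = now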


@@ -779,6 +779,12 @@ class NodeManager : public rpc::NodeManagerServiceHandler {
   /// on all local workers of this raylet.
   bool should_local_gc_ = false;

+  /// The last time local GC was triggered.
+  int64_t last_local_gc_ns_ = 0;
+
+  /// The interval in nanoseconds between automatic local GC triggers.
+  const int64_t local_gc_interval_ns_ = 10 * 60 * 1e9;
+
   /// These two classes make up the new scheduler. ClusterResourceScheduler is
   /// responsible for maintaining a view of the cluster state w.r.t resource
   /// usage. ClusterTaskManager is responsible for queuing, spilling back, and


@@ -714,11 +714,11 @@ void WorkerPool::TryKillingIdleWorkers() {
     }

     for (const auto &worker : workers_in_the_same_process) {
-      RAY_LOG(INFO) << "The worker pool has " << running_size
-                    << " registered workers which exceeds the soft limit of "
-                    << num_workers_soft_limit_ << ", and worker " << worker->WorkerId()
-                    << " with pid " << process.GetId()
-                    << " has been idle for a while. Kill it.";
+      RAY_LOG(DEBUG) << "The worker pool has " << running_size
+                     << " registered workers which exceeds the soft limit of "
+                     << num_workers_soft_limit_ << ", and worker " << worker->WorkerId()
+                     << " with pid " << process.GetId()
+                     << " has been idle for a while. Kill it.";
       // To avoid object loss caused by forcibly killing the worker, send an RPC request
       // to the worker to allow it to do cleanup before exiting.
       auto rpc_client = worker->rpc_client();


@@ -25,7 +25,8 @@ shared_ptr<CoreWorkerClientInterface> CoreWorkerClientPool::GetOrConnect(
   auto connection = client_factory_(addr_proto);
   client_map_[id] = connection;
-  RAY_LOG(INFO) << "Connected to " << addr_proto.ip_address() << ":" << addr_proto.port();
+  RAY_LOG(DEBUG) << "Connected to " << addr_proto.ip_address() << ":"
+                 << addr_proto.port();
   return connection;
 }


@@ -495,7 +495,7 @@ void Process::Kill() {
     }
 #endif
     if (error) {
-      RAY_LOG(ERROR) << "Failed to kill process " << pid << " with error " << error
+      RAY_LOG(DEBUG) << "Failed to kill process " << pid << " with error " << error
                      << ": " << error.message();
     }
   } else {