mirror of
https://github.com/vale981/ray
synced 2025-03-06 02:21:39 -05:00
[scheduler][monitoring] dump detailed spilling metrics (#23321)
Dump the detailed spilling metrics in scheduler.
This commit is contained in:
parent
aae144d7f9
commit
51bdefc2c8
4 changed files with 18 additions and 0 deletions
|
@ -228,6 +228,7 @@ void LocalTaskManager::DispatchScheduledTasksToWorkers() {
|
|||
internal::UnscheduledWorkCause::WAITING_FOR_RESOURCES_AVAILABLE);
|
||||
break;
|
||||
}
|
||||
num_unschedulable_task_spilled_++;
|
||||
if (!spec.GetDependencies().empty()) {
|
||||
task_dependency_manager_.RemoveTaskDependencies(
|
||||
task.GetTaskSpecification().TaskId());
|
||||
|
@ -337,6 +338,7 @@ void LocalTaskManager::SpillWaitingTasks() {
|
|||
task_dependency_manager_.RemoveTaskDependencies(
|
||||
task.GetTaskSpecification().TaskId());
|
||||
}
|
||||
num_waiting_task_spilled_++;
|
||||
waiting_tasks_index_.erase(task_id);
|
||||
it = waiting_task_queue_.erase(it);
|
||||
} else {
|
||||
|
@ -1059,6 +1061,10 @@ void LocalTaskManager::DebugStr(std::stringstream &buffer) const {
|
|||
buffer << "Waiting tasks size: " << waiting_tasks_index_.size() << "\n";
|
||||
buffer << "Number of executing tasks: " << executing_task_args_.size() << "\n";
|
||||
buffer << "Number of pinned task arguments: " << pinned_task_arguments_.size() << "\n";
|
||||
buffer << "Number of total spilled tasks: " << num_task_spilled_ << "\n";
|
||||
buffer << "Number of spilled waiting tasks: " << num_waiting_task_spilled_ << "\n";
|
||||
buffer << "Number of spilled unschedulable tasks: " << num_unschedulable_task_spilled_
|
||||
<< "\n";
|
||||
buffer << "Resource usage {\n";
|
||||
|
||||
// Calculates how much resources are occupied by tasks or actors.
|
||||
|
|
|
@ -183,6 +183,10 @@ class LocalTaskManager : public ILocalTaskManager {
|
|||
void DebugStr(std::stringstream &buffer) const override;
|
||||
|
||||
/// Returns the current value of the num_task_spilled_ counter, which
/// DebugStr() prints as "Number of total spilled tasks".
size_t GetNumTaskSpilled() const override { return num_task_spilled_; }
|
||||
/// Returns the current value of the num_waiting_task_spilled_ counter, which
/// DebugStr() prints as "Number of spilled waiting tasks" and
/// SchedulerStats::RecordMetrics() records under the "SpilledWaiting" tag.
size_t GetNumWaitingTaskSpilled() const override { return num_waiting_task_spilled_; }
|
||||
/// Returns the current value of the num_unschedulable_task_spilled_ counter,
/// which DebugStr() prints as "Number of spilled unschedulable tasks" and
/// SchedulerStats::RecordMetrics() records under the "SpilledUnschedulable" tag.
size_t GetNumUnschedulableTaskSpilled() const override {
|
||||
return num_unschedulable_task_spilled_;
|
||||
}
|
||||
|
||||
private:
|
||||
struct SchedulingClassInfo;
|
||||
|
@ -376,6 +380,8 @@ class LocalTaskManager : public ILocalTaskManager {
|
|||
const int64_t sched_cls_cap_max_ms_;
|
||||
|
||||
/// Counter exposed through GetNumTaskSpilled() and reported by DebugStr() as
/// the total number of spilled tasks. Starts at zero.
/// NOTE(review): the code that increments it is not visible here — confirm
/// whether it is meant to equal the sum of the two finer-grained counters.
size_t num_task_spilled_ = 0;
|
||||
/// Counter exposed through GetNumWaitingTaskSpilled(). Starts at zero and is
/// incremented when a task is erased from waiting_task_queue_ in what appears
/// to be LocalTaskManager::SpillWaitingTasks() (per the visible diff context).
size_t num_waiting_task_spilled_ = 0;
|
||||
/// Counter exposed through GetNumUnschedulableTaskSpilled(). Starts at zero
/// and is incremented in what appears to be
/// LocalTaskManager::DispatchScheduledTasksToWorkers() (per the visible diff
/// context), just before the task's dependencies are removed.
size_t num_unschedulable_task_spilled_ = 0;
|
||||
|
||||
friend class SchedulerResourceReporter;
|
||||
friend class ClusterTaskManagerTest;
|
||||
|
|
|
@ -68,6 +68,8 @@ class ILocalTaskManager {
|
|||
virtual void DebugStr(std::stringstream &buffer) const = 0;
|
||||
|
||||
/// Returns the implementation's count of spilled tasks. LocalTaskManager
/// implements this by returning its num_task_spilled_ counter.
virtual size_t GetNumTaskSpilled() const = 0;
|
||||
/// Returns the implementation's count of spilled waiting tasks.
/// SchedulerStats::RecordMetrics() records this value under the
/// "SpilledWaiting" tag of STATS_scheduler_tasks.
virtual size_t GetNumWaitingTaskSpilled() const = 0;
|
||||
/// Returns the implementation's count of spilled unschedulable tasks.
/// SchedulerStats::RecordMetrics() records this value under the
/// "SpilledUnschedulable" tag of STATS_scheduler_tasks.
virtual size_t GetNumUnschedulableTaskSpilled() const = 0;
|
||||
};
|
||||
} // namespace raylet
|
||||
} // namespace ray
|
||||
|
|
|
@ -138,6 +138,10 @@ void SchedulerStats::RecordMetrics() const {
|
|||
ray::stats::STATS_scheduler_tasks.Record(num_cancelled_tasks_, "Cancelled");
|
||||
ray::stats::STATS_scheduler_tasks.Record(num_tasks_to_dispatch_, "Dispatched");
|
||||
ray::stats::STATS_scheduler_tasks.Record(num_tasks_to_schedule_, "Received");
|
||||
ray::stats::STATS_scheduler_tasks.Record(local_task_manager_.GetNumWaitingTaskSpilled(),
|
||||
"SpilledWaiting");
|
||||
ray::stats::STATS_scheduler_tasks.Record(
|
||||
local_task_manager_.GetNumUnschedulableTaskSpilled(), "SpilledUnschedulable");
|
||||
|
||||
/// Pending task count.
|
||||
ray::stats::STATS_scheduler_unscheduleable_tasks.Record(num_infeasible_tasks_,
|
||||
|
|
Loading…
Add table
Reference in a new issue