[resource scheduler] unify the GetBestSchedulableNode into one public method. (#22560)

* clean up cluster resource scheduler

* address comments

* always prioritize local node when spill back waiting tasks

* address comments
This commit is contained in:
Chen Shen 2022-02-25 01:09:21 -08:00 committed by GitHub
parent b2b442297e
commit 89aaa79ee9
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
8 changed files with 87 additions and 110 deletions

View file

@ -281,4 +281,24 @@ bool ClusterResourceScheduler::IsSchedulableOnNode(
cluster_resource_manager_->GetNodeResources(node_name));
}
std::string ClusterResourceScheduler::GetBestSchedulableNode(
const TaskSpecification &task_spec, bool prioritize_local_node,
bool exclude_local_node, bool requires_object_store_memory, bool *is_infeasible) {
// If the local node is available, we should directly return it instead of
// going through the full hybrid policy since we don't want spillback.
if (prioritize_local_node && !exclude_local_node &&
IsSchedulableOnNode(string_to_int_map_.Get(local_node_id_),
task_spec.GetRequiredResources().GetResourceMap())) {
*is_infeasible = false;
return string_to_int_map_.Get(local_node_id_);
}
// This argument is used to set violation, which is an unsupported feature now.
int64_t _unused;
return GetBestSchedulableNode(
task_spec.GetRequiredPlacementResources().GetResourceMap(),
task_spec.GetMessage().scheduling_strategy(), requires_object_store_memory,
task_spec.IsActorCreationTask(), exclude_local_node, &_unused, is_infeasible);
}
} // namespace ray

View file

@ -28,6 +28,7 @@
#include "ray/raylet/scheduling/cluster_resource_data.h"
#include "ray/raylet/scheduling/cluster_resource_manager.h"
#include "ray/raylet/scheduling/fixed_point.h"
#include "ray/raylet/scheduling/internal.h"
#include "ray/raylet/scheduling/local_resource_manager.h"
#include "ray/raylet/scheduling/scheduling_ids.h"
#include "ray/raylet/scheduling/scheduling_policy.h"
@ -64,36 +65,21 @@ class ClusterResourceScheduler {
/// Find a node in the cluster on which we can schedule a given resource request.
/// In hybrid mode, see `scheduling_policy.h` for a description of the policy.
///
/// \param resource_request: Task to be scheduled.
/// \param scheduling_strategy: Strategy about how to schedule this task.
/// \param actor_creation: True if this is an actor creation task.
/// \param force_spillback: For non-actor creation requests, pick a remote
/// feasible node. If this is false, then the task may be scheduled to the
/// local node.
/// \param violations: The number of soft constraint violations associated
/// with the node returned by this function (assuming
/// a node that can schedule resource_request is found).
/// \param is_infeasible[in]: It is set true if the task is not schedulable because it
/// \param task_spec: Task/Actor to be scheduled.
/// \param prioritize_local_node: true if we want to try out local node first.
/// \param exclude_local_node: true if we want to avoid local node. This will cancel
/// prioritize_local_node if set to true.
/// \param requires_object_store_memory: take object store memory usage as part of
/// scheduling decision.
/// \param is_infeasible[out]: It is set true if the task is not schedulable because it
/// is infeasible.
///
/// \return -1, if no node can schedule the current request; otherwise,
/// return the ID of a node that can schedule the resource request.
int64_t GetBestSchedulableNode(const ResourceRequest &resource_request,
const rpc::SchedulingStrategy &scheduling_strategy,
bool actor_creation, bool force_spillback,
int64_t *violations, bool *is_infeasible);
/// Similar to
/// int64_t GetBestSchedulableNode(...)
/// but the return value is different:
/// \return "", if no node can schedule the current request; otherwise,
/// return the ID in string format of a node that can schedule the
// resource request.
std::string GetBestSchedulableNode(
const absl::flat_hash_map<std::string, double> &resource_request,
const rpc::SchedulingStrategy &scheduling_strategy,
bool requires_object_store_memory, bool actor_creation, bool force_spillback,
int64_t *violations, bool *is_infeasible);
/// \return emptry string, if no node can schedule the current request; otherwise,
/// return the string name of a node that can schedule the resource request.
std::string GetBestSchedulableNode(const TaskSpecification &task_spec,
bool prioritize_local_node, bool exclude_local_node,
bool requires_object_store_memory,
bool *is_infeasible);
/// Subtract the resources required by a given resource request (resource_request) from
/// a given remote node.
@ -150,6 +136,38 @@ class ClusterResourceScheduler {
bool IsSchedulable(const ResourceRequest &resource_request, int64_t node_id,
const NodeResources &resources) const;
/// Find a node in the cluster on which we can schedule a given resource request.
/// In hybrid mode, see `scheduling_policy.h` for a description of the policy.
///
/// \param resource_request: Task to be scheduled.
/// \param scheduling_strategy: Strategy about how to schedule this task.
/// \param actor_creation: True if this is an actor creation task.
/// \param force_spillback: True if we want to avoid local node.
/// \param violations: The number of soft constraint violations associated
/// with the node returned by this function (assuming
/// a node that can schedule resource_request is found).
/// \param is_infeasible[in]: It is set true if the task is not schedulable because it
/// is infeasible.
///
/// \return -1, if no node can schedule the current request; otherwise,
/// return the ID of a node that can schedule the resource request.
int64_t GetBestSchedulableNode(const ResourceRequest &resource_request,
const rpc::SchedulingStrategy &scheduling_strategy,
bool actor_creation, bool force_spillback,
int64_t *violations, bool *is_infeasible);
/// Similar to
/// int64_t GetBestSchedulableNode(...)
/// but the return value is different:
/// \return "", if no node can schedule the current request; otherwise,
/// return the ID in string format of a node that can schedule the
// resource request.
std::string GetBestSchedulableNode(
const absl::flat_hash_map<std::string, double> &resource_request,
const rpc::SchedulingStrategy &scheduling_strategy,
bool requires_object_store_memory, bool actor_creation, bool force_spillback,
int64_t *violations, bool *is_infeasible);
/// Keep the mapping between node and resource IDs in string representation
/// to integer representation. Used for improving map performance.
StringIdMap string_to_int_map_;

View file

@ -77,7 +77,10 @@ void ClusterTaskManager::ScheduleAndDispatchTasks() {
RayTask task = work->task;
RAY_LOG(DEBUG) << "Scheduling pending task "
<< task.GetTaskSpecification().TaskId();
std::string node_id_string = GetBestSchedulableNode(*work, &is_infeasible);
std::string node_id_string = cluster_resource_scheduler_->GetBestSchedulableNode(
task.GetTaskSpecification(), work->PrioritizeLocalNode(),
/*exclude_local_node*/ false,
/*requires_object_store_memory*/ false, &is_infeasible);
// There is no node that has available resources to run the request.
// Move on to the next shape.
@ -126,7 +129,10 @@ void ClusterTaskManager::TryScheduleInfeasibleTask() {
RAY_LOG(DEBUG) << "Check if the infeasible task is schedulable in any node. task_id:"
<< task.GetTaskSpecification().TaskId();
bool is_infeasible;
std::string node_id_string = GetBestSchedulableNode(*work, &is_infeasible);
std::string node_id_string = cluster_resource_scheduler_->GetBestSchedulableNode(
task.GetTaskSpecification(), work->PrioritizeLocalNode(),
/*exclude_local_node*/ false,
/*requires_object_store_memory*/ false, &is_infeasible);
// There is no node that has available resources to run the request.
// Move on to the next shape.
@ -304,32 +310,5 @@ void ClusterTaskManager::ScheduleOnNode(const NodeID &spillback_to,
send_reply_callback();
}
bool ClusterTaskManager::IsLocallySchedulable(const RayTask &task) const {
const auto &spec = task.GetTaskSpecification();
return cluster_resource_scheduler_->IsSchedulableOnNode(
self_node_id_.Binary(), spec.GetRequiredResources().GetResourceMap());
}
std::string ClusterTaskManager::GetBestSchedulableNode(const internal::Work &work,
bool *is_infeasible) {
// If the local node is available, we should directly return it instead of
// going through the full hybrid policy since we don't want spillback.
if ((work.grant_or_reject || work.is_selected_based_on_locality) &&
IsLocallySchedulable(work.task)) {
*is_infeasible = false;
return self_node_id_.Binary();
}
// This argument is used to set violation, which is an unsupported feature now.
int64_t _unused;
return cluster_resource_scheduler_->GetBestSchedulableNode(
work.task.GetTaskSpecification().GetRequiredPlacementResources().GetResourceMap(),
work.task.GetTaskSpecification().GetMessage().scheduling_strategy(),
/*requires_object_store_memory=*/false,
work.task.GetTaskSpecification().IsActorCreationTask(),
/*force_spillback=*/false, &_unused, is_infeasible);
}
} // namespace raylet
} // namespace ray

View file

@ -139,13 +139,7 @@ class ClusterTaskManager : public ClusterTaskManagerInterface {
/// The helper to dump the debug state of the cluster task manater.
std::string DebugStr() const override;
/// Check if there are enough available resources for the given input.
bool IsLocallySchedulable(const RayTask &task) const override;
private:
/// Helper method to get the best node for running the task.
std::string GetBestSchedulableNode(const internal::Work &work, bool *is_infeasible);
void TryScheduleInfeasibleTask();
// Schedule the task onto a node (which could be either remote or local).

View file

@ -83,9 +83,6 @@ class ClusterTaskManagerInterface {
/// Record the internal metrics.
virtual void RecordMetrics() const = 0;
/// Check if there are enough available resources for the given input.
virtual bool IsLocallySchedulable(const RayTask &task) const = 0;
};
} // namespace raylet
} // namespace ray

View file

@ -92,6 +92,10 @@ class Work {
UnscheduledWorkCause GetUnscheduledCause() const { return unscheduled_work_cause_; }
bool PrioritizeLocalNode() const {
return grant_or_reject || is_selected_based_on_locality;
}
private:
WorkStatus status_ = WorkStatus::WAITING;
UnscheduledWorkCause unscheduled_work_cause_ =

View file

@ -315,10 +315,11 @@ void LocalTaskManager::SpillWaitingTasks() {
// TODO(swang): The policy currently does not account for the amount of
// object store memory availability. Ideally, we should pick the node with
// the most memory availability.
std::string node_id_string =
GetBestSchedulableNode(*(*it),
/*spill_waiting_task=*/true,
/*force_spillback=*/force_spillback, &is_infeasible);
std::string node_id_string = cluster_resource_scheduler_->GetBestSchedulableNode(
(*it)->task.GetTaskSpecification(),
/*prioritize_local_node*/ true,
/*exclude_local_node*/ force_spillback,
/*requires_object_store_memory*/ true, &is_infeasible);
if (!node_id_string.empty() && node_id_string != self_node_id_.Binary()) {
NodeID node_id = NodeID::FromBinary(node_id_string);
Spillback(node_id, *it);
@ -346,10 +347,10 @@ void LocalTaskManager::SpillWaitingTasks() {
bool LocalTaskManager::TrySpillback(const std::shared_ptr<internal::Work> &work,
bool &is_infeasible) {
std::string node_id_string =
GetBestSchedulableNode(*work,
/*spill_waiting_task=*/false,
/*force_spillback=*/false, &is_infeasible);
std::string node_id_string = cluster_resource_scheduler_->GetBestSchedulableNode(
work->task.GetTaskSpecification(), work->PrioritizeLocalNode(),
/*exclude_local_node*/ false,
/*requires_object_store_memory*/ false, &is_infeasible);
if (is_infeasible || node_id_string == self_node_id_.Binary() ||
node_id_string.empty()) {
@ -981,12 +982,6 @@ bool LocalTaskManager::ReturnCpuResourcesToBlockedWorker(
return false;
}
bool LocalTaskManager::IsLocallySchedulable(const RayTask &task) const {
const auto &spec = task.GetTaskSpecification();
return cluster_resource_scheduler_->IsSchedulableOnNode(
self_node_id_.Binary(), spec.GetRequiredResources().GetResourceMap());
}
ResourceSet LocalTaskManager::CalcNormalTaskResources() const {
absl::flat_hash_map<std::string, FixedPoint> total_normal_task_resources;
const auto &string_id_map = cluster_resource_scheduler_->GetStringIdMap();
@ -1035,28 +1030,5 @@ uint64_t LocalTaskManager::MaxRunningTasksPerSchedulingClass(
return static_cast<uint64_t>(std::round(total_cpus / cpu_req));
}
std::string LocalTaskManager::GetBestSchedulableNode(const internal::Work &work,
bool spill_waiting_task,
bool force_spillback,
bool *is_infeasible) {
// If the local node is available, we should directly return it instead of
// going through the full hybrid policy since we don't want spillback.
if ((work.grant_or_reject || work.is_selected_based_on_locality ||
spill_waiting_task) &&
!force_spillback && IsLocallySchedulable(work.task)) {
*is_infeasible = false;
return self_node_id_.Binary();
}
// This argument is used to set violation, which is an unsupported feature now.
int64_t _unused;
return cluster_resource_scheduler_->GetBestSchedulableNode(
work.task.GetTaskSpecification().GetRequiredPlacementResources().GetResourceMap(),
work.task.GetTaskSpecification().GetMessage().scheduling_strategy(),
/*requires_object_store_memory=*/spill_waiting_task,
work.task.GetTaskSpecification().IsActorCreationTask(), force_spillback, &_unused,
is_infeasible);
}
} // namespace raylet
} // namespace ray

View file

@ -156,9 +156,6 @@ class LocalTaskManager {
/// false.
bool ReturnCpuResourcesToBlockedWorker(std::shared_ptr<WorkerInterface> worker);
/// Check if there are enough available resources for the given input.
bool IsLocallySchedulable(const RayTask &task) const;
/// Calculate normal task resources.
ResourceSet CalcNormalTaskResources() const;
@ -209,10 +206,6 @@ class LocalTaskManager {
/// should be running (or blocked) at once.
uint64_t MaxRunningTasksPerSchedulingClass(SchedulingClass sched_cls_id) const;
/// Helper method to get the best node for running the task.
std::string GetBestSchedulableNode(const internal::Work &work, bool spill_waiting_task,
bool force_spillback, bool *is_infeasible);
/// Recompute the debug stats.
/// It is needed because updating the debug state is expensive for cluster_task_manager.
/// TODO(sang): Update the internal states value dynamically instead of iterating the