[GCS] integrate cluster_resource_manager into gcs_resource_manager and gcs_resource_scheduler (#23105)

* refactor gcs_resource_manager

* fix lint error

* fix lint error

* fix compile error

* fix test

* fix test

* fix test

* add unit test

* refactor UpdateNodeNormalTaskResources

* fix comment

Co-authored-by: 黑驰 <senlin.zsl@antgroup.com>
ZhuSenlin 2022-03-17 07:27:14 +08:00 committed by GitHub
parent ce71c5bbbd
commit 125ef0e5a6
23 changed files with 585 additions and 484 deletions

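In short, this commit moves ownership of the cluster resource view: `GcsResourceScheduler` now constructs and owns a `ClusterResourceManager`, while `GcsResourceManager` drops its own `cluster_scheduling_resources_` map and instead holds a reference to the scheduler's instance. A minimal sketch of that wiring, using stub stand-ins rather than the actual Ray classes:

```cpp
// Sketch of the ownership change in this commit (stub types, not the real
// Ray classes): the scheduler owns the ClusterResourceManager, the resource
// manager borrows it.
#include <memory>

class ClusterResourceManager {};  // stands in for the raylet scheduling class

class GcsResourceScheduler {
 public:
  GcsResourceScheduler()
      : cluster_resource_manager_(std::make_unique<ClusterResourceManager>()) {}
  ClusterResourceManager &GetClusterResourceManager() {
    return *cluster_resource_manager_;
  }

 private:
  std::unique_ptr<ClusterResourceManager> cluster_resource_manager_;  // owned
};

class GcsResourceManager {
 public:
  explicit GcsResourceManager(ClusterResourceManager &cluster_resource_manager)
      : cluster_resource_manager_(cluster_resource_manager) {}

 private:
  ClusterResourceManager &cluster_resource_manager_;  // shared, not owned
};

int main() {
  // The scheduler must be created first, matching the reordering of
  // InitGcsResourceScheduler() before InitGcsResourceManager() in
  // GcsServer::DoStart below.
  GcsResourceScheduler scheduler;
  GcsResourceManager manager(scheduler.GetClusterResourceManager());
}
```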
View file

@ -551,6 +551,7 @@ cc_library(
":node_manager_rpc",
":pubsub_lib",
":raylet_client_lib",
":scheduler",
":worker_rpc",
"@com_google_absl//absl/container:btree",
],

View file

@ -599,6 +599,9 @@ TEST_P(GcsClientTest, TestNodeResourceUsageWithLightResourceUsageReport) {
TEST_P(GcsClientTest, TestGetAllAvailableResources) {
// Register node.
auto node_info = Mocker::GenNodeInfo();
node_info->mutable_resources_total()->insert({"CPU", 1.0});
node_info->mutable_resources_total()->insert({"GPU", 10.0});
RAY_CHECK(RegisterNode(*node_info));
// Report resource usage of a node to GCS.
@ -609,6 +612,8 @@ TEST_P(GcsClientTest, TestGetAllAvailableResources) {
resource->set_resources_available_changed(true);
(*resource->mutable_resources_available())["CPU"] = 1.0;
(*resource->mutable_resources_available())["GPU"] = 10.0;
(*resource->mutable_resources_total())["CPU"] = 1.0;
(*resource->mutable_resources_total())["GPU"] = 10.0;
ASSERT_TRUE(ReportResourceUsage(resource));
// Assert that all available resources are returned correctly.
@ -622,6 +627,9 @@ TEST_P(GcsClientTest, TestGetAllAvailableResources) {
TEST_P(GcsClientTest, TestGetAllAvailableResourcesWithLightResourceUsageReport) {
// Register node.
auto node_info = Mocker::GenNodeInfo();
node_info->mutable_resources_total()->insert({"CPU", 1.0});
node_info->mutable_resources_total()->insert({"GPU", 10.0});
RAY_CHECK(RegisterNode(*node_info));
// Report resource usage of a node to GCS.
@ -631,6 +639,8 @@ TEST_P(GcsClientTest, TestGetAllAvailableResourcesWithLightResourceUsageReport)
resource->set_resources_available_changed(true);
(*resource->mutable_resources_available())["CPU"] = 1.0;
(*resource->mutable_resources_available())["GPU"] = 10.0;
(*resource->mutable_resources_total())["CPU"] = 1.0;
(*resource->mutable_resources_total())["GPU"] = 10.0;
ASSERT_TRUE(ReportResourceUsage(resource));
// Assert that all available resources are returned correctly.

View file

@ -168,6 +168,8 @@ TEST_P(GlobalStateAccessorTest, TestGetAllResourceUsage) {
ASSERT_EQ(resource_usage_batch_data.batch_size(), 0);
auto node_table_data = Mocker::GenNodeInfo();
node_table_data->mutable_resources_total()->insert({"CPU", 1});
std::promise<bool> promise;
RAY_CHECK_OK(gcs_client_->Nodes().AsyncRegister(
*node_table_data, [&promise](Status status) { promise.set_value(status.ok()); }));

View file

@ -36,12 +36,13 @@ GcsBasedActorScheduler::GcsBasedActorScheduler(
instrumented_io_context &io_context,
GcsActorTable &gcs_actor_table,
const GcsNodeManager &gcs_node_manager,
std::shared_ptr<GcsResourceManager> gcs_resource_manager,
std::shared_ptr<GcsResourceScheduler> gcs_resource_scheduler,
GcsActorSchedulerFailureCallback schedule_failure_handler,
GcsActorSchedulerSuccessCallback schedule_success_handler,
std::shared_ptr<rpc::NodeManagerClientPool> raylet_client_pool,
rpc::ClientFactoryFn client_factory)
rpc::ClientFactoryFn client_factory,
std::function<void(const NodeID &, const rpc::ResourcesData &)>
normal_task_resources_changed_callback)
: GcsActorScheduler(io_context,
gcs_actor_table,
gcs_node_manager,
@ -49,8 +50,8 @@ GcsBasedActorScheduler::GcsBasedActorScheduler(
schedule_success_handler,
raylet_client_pool,
client_factory),
gcs_resource_manager_(std::move(gcs_resource_manager)),
gcs_resource_scheduler_(std::move(gcs_resource_scheduler)) {}
gcs_resource_scheduler_(std::move(gcs_resource_scheduler)),
normal_task_resources_changed_callback_(normal_task_resources_changed_callback) {}
NodeID GcsBasedActorScheduler::SelectNode(std::shared_ptr<GcsActor> actor) {
if (actor->GetActorWorkerAssignment()) {
@ -96,12 +97,12 @@ GcsBasedActorScheduler::AllocateNewActorWorkerAssignment(
// Create a new gcs actor worker assignment.
auto gcs_actor_worker_assignment = std::make_unique<GcsActorWorkerAssignment>(
selected_node_id, required_resources, is_shared);
NodeID::FromBinary(selected_node_id.Binary()), required_resources, is_shared);
return gcs_actor_worker_assignment;
}
NodeID GcsBasedActorScheduler::AllocateResources(
scheduling::NodeID GcsBasedActorScheduler::AllocateResources(
const ResourceRequest &required_resources) {
auto selected_nodes =
gcs_resource_scheduler_->Schedule({required_resources}, SchedulingType::SPREAD)
@ -110,33 +111,35 @@ NodeID GcsBasedActorScheduler::AllocateResources(
if (selected_nodes.size() == 0) {
RAY_LOG(INFO)
<< "Scheduling resources failed, schedule type = SchedulingType::SPREAD";
return NodeID::Nil();
return scheduling::NodeID::Nil();
}
RAY_CHECK(selected_nodes.size() == 1);
auto selected_node_id = selected_nodes[0];
if (!selected_node_id.IsNil()) {
auto &cluster_resource_manager = gcs_resource_scheduler_->GetClusterResourceManager();
// Acquire the resources from the selected node.
RAY_CHECK(
gcs_resource_manager_->AcquireResources(selected_node_id, required_resources));
RAY_CHECK(cluster_resource_manager.SubtractNodeAvailableResources(
selected_node_id, required_resources));
}
return selected_node_id;
}
NodeID GcsBasedActorScheduler::GetHighestScoreNodeResource(
scheduling::NodeID GcsBasedActorScheduler::GetHighestScoreNodeResource(
const ResourceRequest &required_resources) const {
const auto &cluster_map = gcs_resource_manager_->GetClusterResources();
auto &cluster_resource_manager = gcs_resource_scheduler_->GetClusterResourceManager();
const auto &resource_view = cluster_resource_manager.GetResourceView();
/// Get the highest score node
LeastResourceScorer scorer;
double highest_score = std::numeric_limits<double>::lowest();
auto highest_score_node = NodeID::Nil();
for (const auto &pair : cluster_map) {
auto highest_score_node = scheduling::NodeID::Nil();
for (const auto &pair : resource_view) {
double least_resource_val =
scorer.Score(required_resources, pair.second->GetLocalView());
scorer.Score(required_resources, pair.second.GetLocalView());
if (least_resource_val > highest_score) {
highest_score = least_resource_val;
highest_score_node = pair.first;
@ -148,14 +151,14 @@ NodeID GcsBasedActorScheduler::GetHighestScoreNodeResource(
void GcsBasedActorScheduler::WarnResourceAllocationFailure(
const TaskSpecification &task_spec, const ResourceRequest &required_resources) const {
auto &cluster_resource_manager = gcs_resource_scheduler_->GetClusterResourceManager();
auto scheduling_node_id = GetHighestScoreNodeResource(required_resources);
const NodeResources *scheduling_resource = nullptr;
auto iter = gcs_resource_manager_->GetClusterResources().find(scheduling_node_id);
if (iter != gcs_resource_manager_->GetClusterResources().end()) {
scheduling_resource = iter->second->GetMutableLocalView();
const NodeResources *node_resources = nullptr;
if (!scheduling_node_id.IsNil()) {
node_resources = &cluster_resource_manager.GetNodeResources(scheduling_node_id);
}
std::string scheduling_resource_str =
scheduling_resource ? scheduling_resource->DebugString() : "None";
std::string node_resources_str =
node_resources ? node_resources->DebugString() : "None";
// Log a warning when the cluster resources are not enough.
RAY_LOG(WARNING) << "Not enough resources to create actor "
<< task_spec.ActorCreationId()
@ -163,9 +166,11 @@ void GcsBasedActorScheduler::WarnResourceAllocationFailure(
<< "\nRequired resources: " << required_resources.DebugString()
<< "\nThe node with the most resources is:"
<< "\n Node id: " << scheduling_node_id
<< "\n Node resources: " << scheduling_resource_str;
<< "\n Node resources: " << node_resources_str;
RAY_LOG(DEBUG) << "Cluster resources: " << gcs_resource_manager_->ToString();
std::stringstream ostr;
cluster_resource_manager.DebugString(ostr);
RAY_LOG(DEBUG) << "Cluster resources: " << ostr.str();
}
void GcsBasedActorScheduler::HandleWorkerLeaseReply(
@ -232,9 +237,12 @@ void GcsBasedActorScheduler::HandleWorkerLeaseRejectedReply(
std::shared_ptr<GcsActor> actor, const rpc::RequestWorkerLeaseReply &reply) {
// The request was rejected because of insufficient resources.
auto node_id = actor->GetNodeID();
gcs_resource_manager_->UpdateNodeNormalTaskResources(node_id, reply.resources_data());
gcs_resource_manager_->ReleaseResources(
actor->GetActorWorkerAssignment()->GetNodeID(),
auto &cluster_resource_manager = gcs_resource_scheduler_->GetClusterResourceManager();
if (normal_task_resources_changed_callback_) {
normal_task_resources_changed_callback_(node_id, reply.resources_data());
}
cluster_resource_manager.AddNodeAvailableResources(
scheduling::NodeID(actor->GetActorWorkerAssignment()->GetNodeID().Binary()),
actor->GetActorWorkerAssignment()->GetResources());
actor->UpdateAddress(rpc::Address());
actor->SetActorWorkerAssignment(nullptr);
@ -253,8 +261,9 @@ void GcsBasedActorScheduler::NotifyClusterResourcesChanged() {
}
void GcsBasedActorScheduler::ResetActorWorkerAssignment(GcsActor *actor) {
if (gcs_resource_manager_->ReleaseResources(
actor->GetActorWorkerAssignment()->GetNodeID(),
auto &cluster_resource_manager = gcs_resource_scheduler_->GetClusterResourceManager();
if (cluster_resource_manager.AddNodeAvailableResources(
scheduling::NodeID(actor->GetActorWorkerAssignment()->GetNodeID().Binary()),
actor->GetActorWorkerAssignment()->GetResources())) {
NotifyClusterResourcesChanged();
};

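Throughout this file (and the rest of the diff), GCS-level `NodeID`s and scheduler-level `scheduling::NodeID`s are bridged through their binary string form: `scheduling::NodeID(node_id.Binary())` in one direction and `NodeID::FromBinary(scheduling_node_id.Binary())` in the other. A minimal sketch of that round trip with simplified stand-in types (the real classes carry much more machinery):

```cpp
// Simplified stand-ins for ray::NodeID and ray::scheduling::NodeID, shown
// only to illustrate the Binary()-based round trip used in this diff.
#include <string>

struct NodeID {
  std::string id;
  static NodeID FromBinary(const std::string &binary) { return NodeID{binary}; }
  const std::string &Binary() const { return id; }
};

namespace scheduling {
struct NodeID {
  std::string id;
  explicit NodeID(const std::string &binary) : id(binary) {}
  const std::string &Binary() const { return id; }
};
}  // namespace scheduling

int main() {
  NodeID gcs_id = NodeID::FromBinary("\x01\x02");
  // GCS-level id -> scheduler-level id: wrap the raw binary string.
  scheduling::NodeID scheduling_id(gcs_id.Binary());
  // Scheduler-level id -> GCS-level id: rebuild from the same bytes.
  NodeID round_tripped = NodeID::FromBinary(scheduling_id.Binary());
  return round_tripped.Binary() == gcs_id.Binary() ? 0 : 1;
}
```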
View file

@ -22,9 +22,10 @@
#include "ray/gcs/gcs_server/gcs_actor_manager.h"
#include "ray/gcs/gcs_server/gcs_actor_scheduler.h"
#include "ray/gcs/gcs_server/gcs_node_manager.h"
#include "ray/gcs/gcs_server/gcs_resource_manager.h"
#include "ray/gcs/gcs_server/gcs_resource_scheduler.h"
#include "ray/gcs/gcs_server/gcs_table_storage.h"
#include "ray/raylet/scheduling/cluster_resource_manager.h"
#include "ray/raylet/scheduling/scheduling_ids.h"
#include "src/ray/protobuf/gcs.pb.h"
namespace ray {
@ -70,7 +71,6 @@ class GcsBasedActorScheduler : public GcsActorScheduler {
/// \param io_context The main event loop.
/// \param gcs_actor_table Used to flush actor info to storage.
/// \param gcs_node_manager The node manager which is used when scheduling.
/// \param gcs_resource_manager The resource manager that maintains cluster resources.
/// \param gcs_resource_scheduler The scheduler to select nodes based on cluster
/// resources.
/// \param schedule_failure_handler Invoked when there are no available nodes to
@ -84,12 +84,13 @@ class GcsBasedActorScheduler : public GcsActorScheduler {
instrumented_io_context &io_context,
GcsActorTable &gcs_actor_table,
const GcsNodeManager &gcs_node_manager,
std::shared_ptr<GcsResourceManager> gcs_resource_manager,
std::shared_ptr<GcsResourceScheduler> gcs_resource_scheduler,
GcsActorSchedulerFailureCallback schedule_failure_handler,
GcsActorSchedulerSuccessCallback schedule_success_handler,
std::shared_ptr<rpc::NodeManagerClientPool> raylet_client_pool,
rpc::ClientFactoryFn client_factory = nullptr);
rpc::ClientFactoryFn client_factory = nullptr,
std::function<void(const NodeID &, const rpc::ResourcesData &)>
normal_task_resources_changed_callback = nullptr);
virtual ~GcsBasedActorScheduler() = default;
@ -140,9 +141,10 @@ class GcsBasedActorScheduler : public GcsActorScheduler {
///
/// \param required_resources The resources to be allocated.
/// \return ID of the node from which the resources are allocated.
NodeID AllocateResources(const ResourceRequest &required_resources);
scheduling::NodeID AllocateResources(const ResourceRequest &required_resources);
NodeID GetHighestScoreNodeResource(const ResourceRequest &required_resources) const;
scheduling::NodeID GetHighestScoreNodeResource(
const ResourceRequest &required_resources) const;
void WarnResourceAllocationFailure(const TaskSpecification &task_spec,
const ResourceRequest &required_resources) const;
@ -158,13 +160,15 @@ class GcsBasedActorScheduler : public GcsActorScheduler {
/// Notify that the cluster resources are changed.
void NotifyClusterResourcesChanged();
std::shared_ptr<GcsResourceManager> gcs_resource_manager_;
/// The resource changed listeners.
std::vector<std::function<void()>> resource_changed_listeners_;
/// Gcs resource scheduler
std::shared_ptr<GcsResourceScheduler> gcs_resource_scheduler_;
/// Normal task resources changed callback.
std::function<void(const NodeID &, const rpc::ResourcesData &)>
normal_task_resources_changed_callback_;
};
} // namespace gcs
} // namespace ray

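A hedged sketch of the decoupling the new constructor parameter buys: rather than holding a `std::shared_ptr<GcsResourceManager>`, the scheduler is handed a small callback and stays ignorant of who consumes the normal-task resource report. The class below is illustrative only; the real wiring is shown in gcs_server.cc later in this diff:

```cpp
// Illustrative only: forwards a rejected lease's resource report through an
// injected callback, without depending on GcsResourceManager's type.
#include <functional>
#include <utility>

#include "ray/common/id.h"
#include "src/ray/protobuf/gcs.pb.h"

namespace ray {

using NormalTaskResourcesChangedCallback =
    std::function<void(const NodeID &, const rpc::ResourcesData &)>;

class LeaseRejectionForwarder {
 public:
  explicit LeaseRejectionForwarder(NormalTaskResourcesChangedCallback callback)
      : normal_task_resources_changed_callback_(std::move(callback)) {}

  void OnWorkerLeaseRejected(const NodeID &node_id,
                             const rpc::ResourcesData &resources_data) {
    // Forward the report to whoever registered interest (here, the GCS
    // resource manager) without naming that class.
    if (normal_task_resources_changed_callback_) {
      normal_task_resources_changed_callback_(node_id, resources_data);
    }
  }

 private:
  NormalTaskResourcesChangedCallback normal_task_resources_changed_callback_;
};

}  // namespace ray
```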
View file

@ -77,14 +77,15 @@ std::vector<ResourceRequest> GcsScheduleStrategy::GetRequiredResourcesFromBundle
ScheduleResult GcsScheduleStrategy::GenerateScheduleResult(
const std::vector<std::shared_ptr<const ray::BundleSpecification>> &bundles,
const std::vector<NodeID> &selected_nodes,
const std::vector<scheduling::NodeID> &selected_nodes,
const SchedulingResultStatus &status) {
ScheduleMap schedule_map;
if (status == SchedulingResultStatus::SUCCESS && !selected_nodes.empty()) {
RAY_CHECK(bundles.size() == selected_nodes.size());
int index = 0;
for (const auto &bundle : bundles) {
schedule_map[bundle->BundleId()] = selected_nodes[index++];
schedule_map[bundle->BundleId()] =
NodeID::FromBinary(selected_nodes[index++].Binary());
}
}
return std::make_pair(status, schedule_map);
@ -135,11 +136,11 @@ ScheduleResult GcsStrictSpreadStrategy::Schedule(
// in the next pr.
// Filter out the nodes already scheduled by this placement group.
absl::flat_hash_set<NodeID> nodes_in_use;
absl::flat_hash_set<scheduling::NodeID> nodes_in_use;
if (context->bundle_locations_.has_value()) {
const auto &bundle_locations = context->bundle_locations_.value();
for (auto &bundle : *bundle_locations) {
nodes_in_use.insert(bundle.second.first);
nodes_in_use.insert(scheduling::NodeID(bundle.second.first.Binary()));
}
}
@ -147,7 +148,7 @@ ScheduleResult GcsStrictSpreadStrategy::Schedule(
const auto &scheduling_result = gcs_resource_scheduler.Schedule(
required_resources,
SchedulingType::STRICT_SPREAD,
/*node_filter_func=*/[&nodes_in_use](const NodeID &node_id) {
/*node_filter_func=*/[&nodes_in_use](const scheduling::NodeID &node_id) {
return nodes_in_use.count(node_id) == 0;
});
return GenerateScheduleResult(
@ -655,8 +656,10 @@ void GcsPlacementGroupScheduler::DestroyPlacementGroupCommittedBundleResources(
void GcsPlacementGroupScheduler::AcquireBundleResources(
const std::shared_ptr<BundleLocations> &bundle_locations) {
// Acquire bundle resources from gcs resources manager.
auto &cluster_resource_manager = gcs_resource_scheduler_.GetClusterResourceManager();
for (auto &bundle : *bundle_locations) {
gcs_resource_manager_.AcquireResources(bundle.second.first,
cluster_resource_manager.SubtractNodeAvailableResources(
scheduling::NodeID(bundle.second.first.Binary()),
bundle.second.second->GetRequiredResources());
}
}
@ -664,8 +667,10 @@ void GcsPlacementGroupScheduler::AcquireBundleResources(
void GcsPlacementGroupScheduler::ReturnBundleResources(
const std::shared_ptr<BundleLocations> &bundle_locations) {
// Release bundle resources to gcs resources manager.
auto &cluster_resource_manager = gcs_resource_scheduler_.GetClusterResourceManager();
for (auto &bundle : *bundle_locations) {
gcs_resource_manager_.ReleaseResources(bundle.second.first,
cluster_resource_manager.AddNodeAvailableResources(
scheduling::NodeID(bundle.second.first.Binary()),
bundle.second.second->GetRequiredResources());
}
}

View file

@ -22,6 +22,7 @@
#include "ray/gcs/gcs_server/gcs_resource_scheduler.h"
#include "ray/gcs/gcs_server/gcs_table_storage.h"
#include "ray/gcs/gcs_server/ray_syncer.h"
#include "ray/raylet/scheduling/scheduling_ids.h"
#include "ray/raylet_client/raylet_client.h"
#include "ray/rpc/node_manager/node_manager_client.h"
#include "ray/rpc/node_manager/node_manager_client_pool.h"
@ -142,7 +143,7 @@ class GcsScheduleStrategy {
/// \return The scheduling result from the required resource.
ScheduleResult GenerateScheduleResult(
const std::vector<std::shared_ptr<const ray::BundleSpecification>> &bundles,
const std::vector<NodeID> &selected_nodes,
const std::vector<scheduling::NodeID> &selected_nodes,
const SchedulingResultStatus &status);
};

View file

@ -21,21 +21,20 @@ namespace ray {
namespace gcs {
GcsResourceManager::GcsResourceManager(
instrumented_io_context &main_io_service,
std::shared_ptr<GcsPublisher> gcs_publisher,
std::shared_ptr<gcs::GcsTableStorage> gcs_table_storage)
: periodical_runner_(main_io_service),
gcs_publisher_(gcs_publisher),
gcs_table_storage_(gcs_table_storage) {}
std::shared_ptr<gcs::GcsTableStorage> gcs_table_storage,
ClusterResourceManager &cluster_resource_manager)
: gcs_table_storage_(gcs_table_storage),
cluster_resource_manager_(cluster_resource_manager) {}
void GcsResourceManager::HandleGetResources(const rpc::GetResourcesRequest &request,
rpc::GetResourcesReply *reply,
rpc::SendReplyCallback send_reply_callback) {
NodeID node_id = NodeID::FromBinary(request.node_id());
auto iter = cluster_scheduling_resources_.find(node_id);
if (iter != cluster_scheduling_resources_.end()) {
scheduling::NodeID node_id(request.node_id());
const auto &resource_view = cluster_resource_manager_.GetResourceView();
auto iter = resource_view.find(node_id);
if (iter != resource_view.end()) {
rpc::ResourceTableData resource_table_data;
const auto &node_resources = iter->second->GetLocalView();
const auto &node_resources = iter->second.GetLocalView();
for (size_t i = 0; i < node_resources.predefined_resources.size(); ++i) {
const auto &resource_value = node_resources.predefined_resources[i].total;
if (resource_value <= 0) {
@ -60,19 +59,21 @@ void GcsResourceManager::HandleGetResources(const rpc::GetResourcesRequest &requ
void GcsResourceManager::UpdateResources(
const NodeID &node_id, absl::flat_hash_map<std::string, double> changed_resources) {
RAY_LOG(DEBUG) << "Updating resources, node id = " << node_id;
auto iter = cluster_scheduling_resources_.find(node_id);
if (iter != cluster_scheduling_resources_.end()) {
// Update `cluster_scheduling_resources_`.
auto node_resources = iter->second->GetMutableLocalView();
scheduling::NodeID scheduling_node_id(node_id.Binary());
if (cluster_resource_manager_.ContainsNode(scheduling_node_id)) {
// Update `cluster_resource_manager_`.
for (const auto &[name, capacity] : changed_resources) {
UpdateResourceCapacity(node_resources, name, capacity);
cluster_resource_manager_.UpdateResourceCapacity(
scheduling_node_id, scheduling::ResourceID(name), capacity);
}
const auto &node_resources =
cluster_resource_manager_.GetNodeResources(scheduling_node_id);
// Update gcs storage.
rpc::ResourceMap resource_map;
for (size_t i = 0; i < node_resources->predefined_resources.size(); ++i) {
const auto &resource_value = node_resources->predefined_resources[i].total;
for (size_t i = 0; i < node_resources.predefined_resources.size(); ++i) {
const auto &resource_value = node_resources.predefined_resources[i].total;
if (resource_value <= 0) {
continue;
}
@ -81,7 +82,7 @@ void GcsResourceManager::UpdateResources(
(*resource_map.mutable_items())[resource_name].set_resource_capacity(
resource_value.Double());
}
for (const auto &[id, capacity] : node_resources->custom_resources) {
for (const auto &[id, capacity] : node_resources.custom_resources) {
const auto &resource_name = scheduling::ResourceID(id).Binary();
const auto &resource_value = capacity.total;
(*resource_map.mutable_items())[resource_name].set_resource_capacity(
@ -108,22 +109,26 @@ void GcsResourceManager::UpdateResources(
void GcsResourceManager::DeleteResources(const NodeID &node_id,
std::vector<std::string> resource_names) {
RAY_LOG(DEBUG) << "Deleting node resources, node id = " << node_id;
auto iter = cluster_scheduling_resources_.find(node_id);
if (iter != cluster_scheduling_resources_.end()) {
auto node_resources = iter->second->GetMutableLocalView();
// Update `cluster_scheduling_resources_`.
DeleteResources(node_resources, resource_names);
scheduling::NodeID scheduling_node_id(node_id.Binary());
if (cluster_resource_manager_.ContainsNode(scheduling_node_id)) {
// Update `cluster_resource_manager_`.
for (const auto &resource_name : resource_names) {
cluster_resource_manager_.DeleteResource(scheduling_node_id,
scheduling::ResourceID(resource_name));
}
const auto &node_resources =
cluster_resource_manager_.GetNodeResources(scheduling_node_id);
// Update gcs storage.
rpc::ResourceMap resource_map;
for (size_t i = 0; i < node_resources->predefined_resources.size(); ++i) {
for (size_t i = 0; i < node_resources.predefined_resources.size(); ++i) {
const auto &resource_name = scheduling::ResourceID(i).Binary();
if (std::find(resource_names.begin(), resource_names.end(), resource_name) !=
resource_names.end()) {
continue;
}
const auto &resource_value = node_resources->predefined_resources[i].total;
const auto &resource_value = node_resources.predefined_resources[i].total;
if (resource_value <= 0) {
continue;
}
@ -131,7 +136,7 @@ void GcsResourceManager::DeleteResources(const NodeID &node_id,
(*resource_map.mutable_items())[resource_name].set_resource_capacity(
resource_value.Double());
}
for (const auto &entry : node_resources->custom_resources) {
for (const auto &entry : node_resources.custom_resources) {
const auto &resource_name = scheduling::ResourceID(entry.first).Binary();
if (std::find(resource_names.begin(), resource_names.end(), resource_name) !=
resource_names.end()) {
@ -159,9 +164,9 @@ void GcsResourceManager::HandleGetAllAvailableResources(
const rpc::GetAllAvailableResourcesRequest &request,
rpc::GetAllAvailableResourcesReply *reply,
rpc::SendReplyCallback send_reply_callback) {
for (const auto &node_resources_entry : cluster_scheduling_resources_) {
for (const auto &node_resources_entry : cluster_resource_manager_.GetResourceView()) {
const auto &node_id = node_resources_entry.first;
const auto &node_resources = node_resources_entry.second->GetLocalView();
const auto &node_resources = node_resources_entry.second.GetLocalView();
rpc::AvailableResources resource;
resource.set_node_id(node_id.Binary());
@ -196,8 +201,11 @@ void GcsResourceManager::UpdateFromResourceReport(const rpc::ResourcesData &data
if (RayConfig::instance().gcs_actor_scheduling_enabled()) {
UpdateNodeNormalTaskResources(node_id, data);
} else {
if (node_resource_usages_.count(node_id) == 0 || data.resources_available_changed()) {
SetAvailableResources(node_id, MapFromProtobuf(data.resources_available()));
if (!cluster_resource_manager_.UpdateNodeAvailableResourcesIfExist(
scheduling::NodeID(node_id.Binary()), data)) {
RAY_LOG(INFO)
<< "[UpdateFromResourceReport]: received resource usage from unknown node id "
<< node_id;
}
}
@ -289,141 +297,39 @@ void GcsResourceManager::UpdateNodeResourceUsage(const NodeID &node_id,
}
void GcsResourceManager::Initialize(const GcsInitData &gcs_init_data) {
const auto &nodes = gcs_init_data.Nodes();
for (const auto &entry : nodes) {
for (const auto &entry : gcs_init_data.Nodes()) {
if (entry.second.state() == rpc::GcsNodeInfo::ALIVE) {
OnNodeAdd(entry.second);
}
}
const auto &cluster_resources = gcs_init_data.ClusterResources();
for (const auto &entry : cluster_resources) {
const auto &iter = cluster_scheduling_resources_.find(entry.first);
if (iter != cluster_scheduling_resources_.end()) {
auto node_resources = iter->second->GetMutableLocalView();
for (const auto &entry : gcs_init_data.ClusterResources()) {
scheduling::NodeID node_id(entry.first.Binary());
for (const auto &resource : entry.second.items()) {
UpdateResourceCapacity(
node_resources, resource.first, resource.second.resource_capacity());
}
}
}
}
const absl::flat_hash_map<NodeID, std::shared_ptr<Node>>
&GcsResourceManager::GetClusterResources() const {
return cluster_scheduling_resources_;
}
void GcsResourceManager::SetAvailableResources(
const NodeID &node_id, const absl::flat_hash_map<std::string, double> &resource_map) {
auto iter = cluster_scheduling_resources_.find(node_id);
if (iter != cluster_scheduling_resources_.end()) {
auto resources = ResourceMapToResourceRequest(resource_map,
/*requires_object_store_memory=*/false);
auto node_resources = iter->second->GetMutableLocalView();
for (size_t i = 0; i < node_resources->predefined_resources.size(); ++i) {
node_resources->predefined_resources[i].available =
resources.predefined_resources[i];
}
for (auto &entry : node_resources->custom_resources) {
auto it = resources.custom_resources.find(entry.first);
if (it != resources.custom_resources.end()) {
entry.second.available = it->second;
} else {
entry.second.available = 0.;
}
}
} else {
RAY_LOG(WARNING)
<< "Skip the setting of available resources of node " << node_id
<< " as it does not exist, maybe it is not registered yet or is already dead.";
}
}
void GcsResourceManager::DeleteResources(NodeResources *node_resources,
const std::vector<std::string> &resource_names) {
for (const auto &resource_name : resource_names) {
auto resource_id = scheduling::ResourceID(resource_name).ToInt();
if (resource_id == -1) {
continue;
}
if (resource_id >= 0 && resource_id < PredefinedResources_MAX) {
node_resources->predefined_resources[resource_id].total = 0;
node_resources->predefined_resources[resource_id].available = 0;
} else {
node_resources->custom_resources.erase(resource_id);
cluster_resource_manager_.UpdateResourceCapacity(
node_id,
scheduling::ResourceID(resource.first),
resource.second.resource_capacity());
}
}
}
void GcsResourceManager::OnNodeAdd(const rpc::GcsNodeInfo &node) {
auto node_id = NodeID::FromBinary(node.node_id());
if (!cluster_scheduling_resources_.contains(node_id)) {
absl::flat_hash_map<std::string, double> resource_mapping(
node.resources_total().begin(), node.resources_total().end());
// Update the cluster scheduling resources as new node is added.
cluster_scheduling_resources_.emplace(
node_id,
std::make_shared<Node>(
ResourceMapToNodeResources(resource_mapping, resource_mapping)));
if (!node.resources_total().empty()) {
scheduling::NodeID node_id(node.node_id());
for (const auto &entry : node.resources_total()) {
cluster_resource_manager_.UpdateResourceCapacity(
node_id, scheduling::ResourceID(entry.first), entry.second);
}
} else {
RAY_LOG(WARNING) << "The registered node " << NodeID::FromBinary(node.node_id())
<< " doesn't set the total resources.";
}
}
void GcsResourceManager::OnNodeDead(const NodeID &node_id) {
node_resource_usages_.erase(node_id);
cluster_scheduling_resources_.erase(node_id);
latest_resources_normal_task_timestamp_.erase(node_id);
}
bool GcsResourceManager::AcquireResources(const NodeID &node_id,
const ResourceRequest &required_resources) {
auto iter = cluster_scheduling_resources_.find(node_id);
if (iter != cluster_scheduling_resources_.end()) {
auto node_resources = iter->second->GetMutableLocalView();
if (!node_resources->IsAvailable(required_resources)) {
return false;
}
for (size_t i = 0; i < required_resources.predefined_resources.size(); ++i) {
node_resources->predefined_resources[i].available -=
required_resources.predefined_resources[i];
}
for (auto &entry : required_resources.custom_resources) {
node_resources->custom_resources[entry.first].available -= entry.second;
}
}
// If the node is dead, we will not find it. This is a normal scenario, so return
// true.
return true;
}
bool GcsResourceManager::ReleaseResources(const NodeID &node_id,
const ResourceRequest &acquired_resources) {
auto iter = cluster_scheduling_resources_.find(node_id);
if (iter != cluster_scheduling_resources_.end()) {
auto node_resources = iter->second->GetMutableLocalView();
RAY_CHECK(acquired_resources.predefined_resources.size() <=
node_resources->predefined_resources.size());
for (size_t i = 0; i < acquired_resources.predefined_resources.size(); ++i) {
node_resources->predefined_resources[i].available +=
acquired_resources.predefined_resources[i];
node_resources->predefined_resources[i].available =
std::min(node_resources->predefined_resources[i].available,
node_resources->predefined_resources[i].total);
}
for (auto &entry : acquired_resources.custom_resources) {
auto it = node_resources->custom_resources.find(entry.first);
if (it != node_resources->custom_resources.end()) {
it->second.available += entry.second;
it->second.available = std::min(it->second.available, it->second.total);
}
}
}
// If the node is dead, we will not find it. This is a normal scenario, so return
// true.
return true;
cluster_resource_manager_.RemoveNode(scheduling::NodeID(node_id.Binary()));
}
void GcsResourceManager::UpdatePlacementGroupLoad(
@ -456,28 +362,8 @@ void GcsResourceManager::AddResourcesChangedListener(std::function<void()> liste
void GcsResourceManager::UpdateNodeNormalTaskResources(
const NodeID &node_id, const rpc::ResourcesData &heartbeat) {
auto iter = cluster_scheduling_resources_.find(node_id);
if (iter == cluster_scheduling_resources_.end()) {
return;
}
auto normal_task_resources =
ResourceMapToResourceRequest(MapFromProtobuf(heartbeat.resources_normal_task()),
/*requires_object_store_memory=*/false);
auto &local_normal_task_resources =
iter->second->GetMutableLocalView()->normal_task_resources;
if (heartbeat.resources_normal_task_changed() &&
heartbeat.resources_normal_task_timestamp() >
latest_resources_normal_task_timestamp_[node_id] &&
local_normal_task_resources != normal_task_resources) {
local_normal_task_resources.predefined_resources.resize(PredefinedResources_MAX);
for (size_t i = 0; i < PredefinedResources_MAX; ++i) {
local_normal_task_resources.predefined_resources[i] =
normal_task_resources.predefined_resources[i];
}
local_normal_task_resources.custom_resources = normal_task_resources.custom_resources;
latest_resources_normal_task_timestamp_[node_id] =
heartbeat.resources_normal_task_timestamp();
if (cluster_resource_manager_.UpdateNodeNormalTaskResources(
scheduling::NodeID(node_id.Binary()), heartbeat)) {
for (const auto &listener : resources_changed_listeners_) {
listener();
}
@ -490,53 +376,13 @@ std::string GcsResourceManager::ToString() const {
std::string indent_0(indent + 0 * 2, ' ');
std::string indent_1(indent + 1 * 2, ' ');
ostr << "{\n";
for (const auto &entry : cluster_scheduling_resources_) {
ostr << indent_1 << entry.first << " : " << entry.second->GetLocalView().DebugString()
for (const auto &entry : cluster_resource_manager_.GetResourceView()) {
ostr << indent_1 << entry.first << " : " << entry.second.GetLocalView().DebugString()
<< ",\n";
}
ostr << indent_0 << "}\n";
return ostr.str();
}
void GcsResourceManager::UpdateResourceCapacity(NodeResources *node_resources,
const std::string &resource_name,
double capacity) {
auto idx = scheduling::ResourceID(resource_name).ToInt();
if (idx == -1) {
return;
}
FixedPoint resource_total_fp(capacity);
if (idx >= 0 && idx < PredefinedResources_MAX) {
auto diff_capacity =
resource_total_fp - node_resources->predefined_resources[idx].total;
node_resources->predefined_resources[idx].total += diff_capacity;
node_resources->predefined_resources[idx].available += diff_capacity;
if (node_resources->predefined_resources[idx].available < 0) {
node_resources->predefined_resources[idx].available = 0;
}
if (node_resources->predefined_resources[idx].total < 0) {
node_resources->predefined_resources[idx].total = 0;
}
} else {
auto itr = node_resources->custom_resources.find(idx);
if (itr != node_resources->custom_resources.end()) {
auto diff_capacity = resource_total_fp - itr->second.total;
itr->second.total += diff_capacity;
itr->second.available += diff_capacity;
if (itr->second.available < 0) {
itr->second.available = 0;
}
if (itr->second.total < 0) {
itr->second.total = 0;
}
} else {
ResourceCapacity resource_capacity;
resource_capacity.total = resource_capacity.available = resource_total_fp;
node_resources->custom_resources.emplace(idx, resource_capacity);
}
}
}
} // namespace gcs
} // namespace ray

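The bookkeeping deleted from this file (`AcquireResources`, `ReleaseResources`, `UpdateResourceCapacity`, `DeleteResources`) now lives behind `ClusterResourceManager` calls such as `SubtractNodeAvailableResources` and `AddNodeAvailableResources`. A simplified sketch of the acquire/release semantics being preserved, assuming a single scalar resource per node (the real `NodeResources` tracks predefined and custom resources separately):

```cpp
// Simplified sketch of the acquire/release semantics, under the assumption
// of one scalar resource per node.
#include <algorithm>

struct NodeAvailability {
  double total = 0;
  double available = 0;
};

// Analogous to SubtractNodeAvailableResources: deduct only if sufficient.
bool Acquire(NodeAvailability &node, double amount) {
  if (node.available < amount) return false;  // insufficient resources
  node.available -= amount;
  return true;
}

// Analogous to AddNodeAvailableResources: return resources, clamped at total
// so repeated releases can never exceed the node's capacity (mirroring the
// std::min clamp in the deleted ReleaseResources above).
void Release(NodeAvailability &node, double amount) {
  node.available = std::min(node.available + amount, node.total);
}
```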
View file

@ -15,19 +15,31 @@
#include "absl/container/flat_hash_map.h"
#include "absl/container/flat_hash_set.h"
#include "ray/common/asio/instrumented_io_context.h"
#include "ray/common/asio/periodical_runner.h"
#include "ray/common/id.h"
#include "ray/gcs/gcs_server/gcs_init_data.h"
#include "ray/gcs/gcs_server/gcs_table_storage.h"
#include "ray/gcs/pubsub/gcs_pub_sub.h"
#include "ray/raylet/scheduling/cluster_resource_data.h"
#include "ray/raylet/scheduling/cluster_resource_manager.h"
#include "ray/rpc/client_call.h"
#include "ray/rpc/gcs_server/gcs_rpc_server.h"
#include "src/ray/protobuf/gcs.pb.h"
namespace ray {
namespace gcs {
/// Ideally, the logic related to resource calculation should be moved from
/// `gcs_resource_manager` to `cluster_resource_manager`, and all logic related to
/// resource modification should directly depend on `cluster_resource_manager`, while
/// `gcs_resource_manager` remains responsible for processing resource-related RPC
/// requests. We will split this work into several small PRs, so as to prevent any one
/// PR from being too large to review.
///
/// 1). Remove the `node_resource_usages_` related code, as it can be calculated from
/// `cluster_resource_manager`.
/// 2). Move all resource-write-related logic out of `gcs_resource_manager`.
/// 3). Move `placement_group_load_` from `gcs_resource_manager` to
/// `placement_group_manager` and make `gcs_resource_manager` depend on
/// `placement_group_manager`.
/// Gcs resource manager interface.
/// It is responsible for handling node resource related RPC requests and is used for
/// actor and placement group scheduling. It obtains the available resources of nodes
@ -36,12 +48,9 @@ class GcsResourceManager : public rpc::NodeResourceInfoHandler {
public:
/// Create a GcsResourceManager.
///
/// \param main_io_service The main event loop.
/// \param gcs_publisher GCS message publisher.
/// \param gcs_table_storage GCS table external storage accessor.
explicit GcsResourceManager(instrumented_io_context &main_io_service,
std::shared_ptr<GcsPublisher> gcs_publisher,
std::shared_ptr<gcs::GcsTableStorage> gcs_table_storage);
explicit GcsResourceManager(std::shared_ptr<gcs::GcsTableStorage> gcs_table_storage,
ClusterResourceManager &cluster_resource_manager);
virtual ~GcsResourceManager() {}
@ -66,11 +75,6 @@ class GcsResourceManager : public rpc::NodeResourceInfoHandler {
rpc::GetAllResourceUsageReply *reply,
rpc::SendReplyCallback send_reply_callback) override;
/// Get the resources of all nodes in the cluster.
///
/// \return The resources of all nodes in the cluster.
const absl::flat_hash_map<NodeID, std::shared_ptr<Node>> &GetClusterResources() const;
/// Update resources of a node
/// \param node_id Id of a node.
/// \param changed_resources The newly added resources for the node. Usually it's
@ -93,30 +97,6 @@ class GcsResourceManager : public rpc::NodeResourceInfoHandler {
/// \param node_id The specified node id.
void OnNodeDead(const NodeID &node_id);
/// Set the available resources of the specified node.
///
/// \param node_id Id of a node.
/// \param resources Available resources of a node.
void SetAvailableResources(
const NodeID &node_id,
const absl::flat_hash_map<std::string, double> &resource_map);
/// Acquire resources from the specified node. It will deduct directly from the node
/// resources.
///
/// \param node_id Id of a node.
/// \param required_resources Resources to apply for.
/// \return True if acquire resources successfully. False otherwise.
bool AcquireResources(const NodeID &node_id, const ResourceRequest &required_resources);
/// Release the resources of the specified node. It will be added directly to the node
/// resources.
///
/// \param node_id Id of a node.
/// \param acquired_resources Resources to release.
/// \return True if release resources successfully. False otherwise.
bool ReleaseResources(const NodeID &node_id, const ResourceRequest &acquired_resources);
/// Initialize with the gcs tables data synchronously.
/// This should be called when GCS server restarts after a failure.
///
@ -153,34 +133,13 @@ class GcsResourceManager : public rpc::NodeResourceInfoHandler {
const std::shared_ptr<rpc::PlacementGroupLoad> placement_group_load);
private:
/// Delete the scheduling resources of the specified node.
///
/// \param node_resources Id of a node.
/// \param resource_names Deleted resources of a node.
void DeleteResources(NodeResources *node_resources,
const std::vector<std::string> &resource_names);
void UpdateResourceCapacity(NodeResources *node_resources,
const std::string &resource_name,
double capacity);
/// The runner to run function periodically.
PeriodicalRunner periodical_runner_;
/// Newest resource usage of all nodes.
absl::flat_hash_map<NodeID, rpc::ResourcesData> node_resource_usages_;
/// A publisher for publishing gcs messages.
std::shared_ptr<GcsPublisher> gcs_publisher_;
/// Storage for GCS tables.
std::shared_ptr<gcs::GcsTableStorage> gcs_table_storage_;
/// Map from node id to the scheduling resources of the node.
absl::flat_hash_map<NodeID, std::shared_ptr<Node>> cluster_scheduling_resources_;
/// Placement group load information that is used for autoscaler.
absl::optional<std::shared_ptr<rpc::PlacementGroupLoad>> placement_group_load_;
/// Normal task resources could be uploaded by 1) Raylets' periodical reporters; 2)
/// Rejected RequestWorkerLeaseReply. So we need the timestamps to decide whether an
/// upload is latest.
absl::flat_hash_map<NodeID, int64_t> latest_resources_normal_task_timestamp_;
/// The resources changed listeners.
std::vector<std::function<void()>> resources_changed_listeners_;
@ -195,6 +154,8 @@ class GcsResourceManager : public rpc::NodeResourceInfoHandler {
CountType_MAX = 6,
};
uint64_t counts_[CountType::CountType_MAX] = {0};
ClusterResourceManager &cluster_resource_manager_;
};
} // namespace gcs

View file

@ -111,7 +111,7 @@ double LeastResourceScorer::Calculate(const FixedPoint &requested,
SchedulingResult SortSchedulingResult(const SchedulingResult &result,
const std::vector<int> &sorted_index) {
if (result.first == SchedulingResultStatus::SUCCESS) {
std::vector<NodeID> sorted_nodes(result.second.size());
std::vector<scheduling::NodeID> sorted_nodes(result.second.size());
for (int i = 0; i < (int)sorted_index.size(); i++) {
sorted_nodes[sorted_index[i]] = result.second[i];
}
@ -124,12 +124,13 @@ SchedulingResult SortSchedulingResult(const SchedulingResult &result,
SchedulingResult GcsResourceScheduler::Schedule(
const std::vector<ResourceRequest> &required_resources_list,
const SchedulingType &scheduling_type,
const std::function<bool(const NodeID &)> &node_filter_func) {
const std::function<bool(const scheduling::NodeID &)> &node_filter_func) {
// Filter candidate nodes.
auto candidate_nodes = FilterCandidateNodes(node_filter_func);
if (candidate_nodes.empty()) {
RAY_LOG(DEBUG) << "The candidate nodes is empty, return directly.";
return std::make_pair(SchedulingResultStatus::INFEASIBLE, std::vector<NodeID>());
return std::make_pair(SchedulingResultStatus::INFEASIBLE,
std::vector<scheduling::NodeID>());
}
// If scheduling type is strict pack, we do not need to sort resources of each placement
@ -164,9 +165,9 @@ SchedulingResult GcsResourceScheduler::Schedule(
UNREACHABLE;
}
absl::flat_hash_set<NodeID> GcsResourceScheduler::FilterCandidateNodes(
const std::function<bool(const NodeID &)> &node_filter_func) {
absl::flat_hash_set<NodeID> result;
absl::flat_hash_set<scheduling::NodeID> GcsResourceScheduler::FilterCandidateNodes(
const std::function<bool(const scheduling::NodeID &)> &node_filter_func) {
absl::flat_hash_set<scheduling::NodeID> result;
const auto &resource_view = GetResourceView();
result.reserve(resource_view.size());
for (const auto &iter : resource_view) {
@ -239,25 +240,26 @@ std::vector<int> GcsResourceScheduler::SortRequiredResources(
SchedulingResult GcsResourceScheduler::StrictSpreadSchedule(
const std::vector<ResourceRequest> &required_resources_list,
const absl::flat_hash_set<NodeID> &candidate_nodes) {
const absl::flat_hash_set<scheduling::NodeID> &candidate_nodes) {
if (required_resources_list.size() > candidate_nodes.size()) {
RAY_LOG(DEBUG) << "The number of required resources "
<< required_resources_list.size()
<< " is greater than the number of candidate nodes "
<< candidate_nodes.size() << ", scheduling fails.";
return std::make_pair(SchedulingResultStatus::INFEASIBLE, std::vector<NodeID>());
return std::make_pair(SchedulingResultStatus::INFEASIBLE,
std::vector<scheduling::NodeID>());
}
std::vector<NodeID> result_nodes;
absl::flat_hash_set<NodeID> candidate_nodes_copy(candidate_nodes);
std::vector<scheduling::NodeID> result_nodes;
absl::flat_hash_set<scheduling::NodeID> candidate_nodes_copy(candidate_nodes);
for (const auto &iter : required_resources_list) {
// Score and sort nodes.
auto best_node = GetBestNode(iter, candidate_nodes_copy);
// A node meets the scheduling requirements.
if (best_node) {
candidate_nodes_copy.erase(*best_node);
result_nodes.emplace_back(std::move(*best_node));
if (!best_node.IsNil()) {
candidate_nodes_copy.erase(best_node);
result_nodes.emplace_back(best_node);
} else {
// No node meets the scheduling requirements.
break;
@ -266,32 +268,33 @@ SchedulingResult GcsResourceScheduler::StrictSpreadSchedule(
if (result_nodes.size() != required_resources_list.size()) {
// Can't meet the scheduling requirements temporarily.
return std::make_pair(SchedulingResultStatus::FAILED, std::vector<NodeID>());
return std::make_pair(SchedulingResultStatus::FAILED,
std::vector<scheduling::NodeID>());
}
return std::make_pair(SchedulingResultStatus::SUCCESS, result_nodes);
}
SchedulingResult GcsResourceScheduler::SpreadSchedule(
const std::vector<ResourceRequest> &required_resources_list,
const absl::flat_hash_set<NodeID> &candidate_nodes) {
std::vector<NodeID> result_nodes;
absl::flat_hash_set<NodeID> candidate_nodes_copy(candidate_nodes);
absl::flat_hash_set<NodeID> selected_nodes;
const absl::flat_hash_set<scheduling::NodeID> &candidate_nodes) {
std::vector<scheduling::NodeID> result_nodes;
absl::flat_hash_set<scheduling::NodeID> candidate_nodes_copy(candidate_nodes);
absl::flat_hash_set<scheduling::NodeID> selected_nodes;
for (const auto &iter : required_resources_list) {
// Score and sort nodes.
auto best_node = GetBestNode(iter, candidate_nodes_copy);
// A node meets the scheduling requirements.
if (best_node) {
result_nodes.emplace_back(std::move(*best_node));
if (!best_node.IsNil()) {
result_nodes.emplace_back(best_node);
RAY_CHECK(AllocateRemoteTaskResources(result_nodes.back(), iter));
candidate_nodes_copy.erase(result_nodes.back());
selected_nodes.insert(result_nodes.back());
} else {
// Scheduling from selected nodes.
auto best_node = GetBestNode(iter, selected_nodes);
if (best_node) {
result_nodes.push_back(std::move(*best_node));
if (!best_node.IsNil()) {
result_nodes.push_back(best_node);
RAY_CHECK(AllocateRemoteTaskResources(result_nodes.back(), iter));
} else {
break;
@ -299,19 +302,20 @@ SchedulingResult GcsResourceScheduler::SpreadSchedule(
}
}
// Releasing the resources temporarily deducted from `gcs_resource_manager_`.
// Releasing the resources temporarily deducted from `cluster_resource_manager_`.
ReleaseTemporarilyDeductedResources(required_resources_list, result_nodes);
if (result_nodes.size() != required_resources_list.size()) {
// Can't meet the scheduling requirements temporarily.
return std::make_pair(SchedulingResultStatus::FAILED, std::vector<NodeID>());
return std::make_pair(SchedulingResultStatus::FAILED,
std::vector<scheduling::NodeID>());
}
return std::make_pair(SchedulingResultStatus::SUCCESS, result_nodes);
}
SchedulingResult GcsResourceScheduler::StrictPackSchedule(
const std::vector<ResourceRequest> &required_resources_list,
const absl::flat_hash_set<NodeID> &candidate_nodes) {
const absl::flat_hash_set<scheduling::NodeID> &candidate_nodes) {
// Aggregate required resources.
ResourceRequest aggregated_resource_request;
for (const auto &resource_request : required_resources_list) {
@ -329,35 +333,37 @@ SchedulingResult GcsResourceScheduler::StrictPackSchedule(
}
}
const auto &cluster_resource = GetResourceView();
const auto &resource_view = GetResourceView();
const auto &right_node_it = std::find_if(
cluster_resource.begin(),
cluster_resource.end(),
resource_view.begin(),
resource_view.end(),
[&aggregated_resource_request](const auto &entry) {
return entry.second->GetLocalView().IsAvailable(aggregated_resource_request);
return entry.second.GetLocalView().IsAvailable(aggregated_resource_request);
});
if (right_node_it == cluster_resource.end()) {
if (right_node_it == resource_view.end()) {
RAY_LOG(DEBUG) << "The required resource is bigger than the maximum resource in the "
"whole cluster, schedule failed.";
return std::make_pair(SchedulingResultStatus::INFEASIBLE, std::vector<NodeID>());
return std::make_pair(SchedulingResultStatus::INFEASIBLE,
std::vector<scheduling::NodeID>());
}
std::vector<NodeID> result_nodes;
std::vector<scheduling::NodeID> result_nodes;
auto best_node = GetBestNode(aggregated_resource_request, candidate_nodes);
// Select the node with the highest score.
// `StrictPackSchedule` does not need to consider the scheduling context, because it
// only schedules to one node and triggers rescheduling when that node dies.
if (best_node) {
if (!best_node.IsNil()) {
for (int index = 0; index < (int)required_resources_list.size(); ++index) {
result_nodes.emplace_back(std::move(*best_node));
result_nodes.emplace_back(best_node);
}
}
if (result_nodes.empty()) {
// Can't meet the scheduling requirements temporarily.
return std::make_pair(SchedulingResultStatus::FAILED, std::vector<NodeID>());
return std::make_pair(SchedulingResultStatus::FAILED,
std::vector<scheduling::NodeID>());
}
return std::make_pair(SchedulingResultStatus::SUCCESS, result_nodes);
@ -365,10 +371,10 @@ SchedulingResult GcsResourceScheduler::StrictPackSchedule(
SchedulingResult GcsResourceScheduler::PackSchedule(
const std::vector<ResourceRequest> &required_resources_list,
const absl::flat_hash_set<NodeID> &candidate_nodes) {
std::vector<NodeID> result_nodes;
const absl::flat_hash_set<scheduling::NodeID> &candidate_nodes) {
std::vector<scheduling::NodeID> result_nodes;
result_nodes.resize(required_resources_list.size());
absl::flat_hash_set<NodeID> candidate_nodes_copy(candidate_nodes);
absl::flat_hash_set<scheduling::NodeID> candidate_nodes_copy(candidate_nodes);
std::list<std::pair<int, ResourceRequest>> required_resources_list_copy;
int index = 0;
for (const auto &iter : required_resources_list) {
@ -379,64 +385,63 @@ SchedulingResult GcsResourceScheduler::PackSchedule(
const auto &required_resources_index = required_resources_list_copy.front().first;
const auto &required_resources = required_resources_list_copy.front().second;
auto best_node = GetBestNode(required_resources, candidate_nodes_copy);
if (!best_node) {
if (best_node.IsNil()) {
// No node meets the scheduling requirements.
break;
}
RAY_CHECK(AllocateRemoteTaskResources(*best_node, required_resources));
result_nodes[required_resources_index] = *best_node;
RAY_CHECK(AllocateRemoteTaskResources(best_node, required_resources));
result_nodes[required_resources_index] = best_node;
required_resources_list_copy.pop_front();
// We try to schedule more resources on one node.
for (auto iter = required_resources_list_copy.begin();
iter != required_resources_list_copy.end();) {
if (AllocateRemoteTaskResources(*best_node, iter->second)) {
result_nodes[iter->first] = *best_node;
if (AllocateRemoteTaskResources(best_node, iter->second)) {
result_nodes[iter->first] = best_node;
required_resources_list_copy.erase(iter++);
} else {
++iter;
}
}
candidate_nodes_copy.erase(*best_node);
candidate_nodes_copy.erase(best_node);
}
// Releasing the resources temporarily deducted from `gcs_resource_manager_`.
// Releasing the resources temporarily deducted from `cluster_resource_manager_`.
ReleaseTemporarilyDeductedResources(required_resources_list, result_nodes);
if (!required_resources_list_copy.empty()) {
// Can't meet the scheduling requirements temporarily.
return std::make_pair(SchedulingResultStatus::FAILED, std::vector<NodeID>());
return std::make_pair(SchedulingResultStatus::FAILED,
std::vector<scheduling::NodeID>());
}
return std::make_pair(SchedulingResultStatus::SUCCESS, result_nodes);
}
std::optional<NodeID> GcsResourceScheduler::GetBestNode(
scheduling::NodeID GcsResourceScheduler::GetBestNode(
const ResourceRequest &required_resources,
const absl::flat_hash_set<NodeID> &candidate_nodes) {
const absl::flat_hash_set<scheduling::NodeID> &candidate_nodes) {
double best_node_score = -1;
const NodeID *best_node_id = nullptr;
auto best_node_id = scheduling::NodeID::Nil();
// Score the nodes.
for (const auto &node_id : candidate_nodes) {
const auto &node_resources = GetNodeResources(node_id);
double node_score = node_scorer_->Score(required_resources, node_resources);
if (best_node_id == nullptr || best_node_score < node_score) {
best_node_id = &node_id;
if (best_node_id.IsNil() || best_node_score < node_score) {
best_node_id = node_id;
best_node_score = node_score;
}
}
if (best_node_id && best_node_score >= 0) {
return *best_node_id;
} else {
return std::nullopt;
if (!best_node_id.IsNil() && best_node_score >= 0) {
return best_node_id;
}
return std::nullopt;
return scheduling::NodeID::Nil();
}
void GcsResourceScheduler::ReleaseTemporarilyDeductedResources(
const std::vector<ResourceRequest> &required_resources_list,
const std::vector<NodeID> &nodes) {
const std::vector<scheduling::NodeID> &nodes) {
for (int index = 0; index < (int)nodes.size(); index++) {
// If `PackSchedule` fails, the id of some nodes may be nil.
if (!nodes[index].IsNil()) {
@ -445,26 +450,30 @@ void GcsResourceScheduler::ReleaseTemporarilyDeductedResources(
}
}
const NodeResources &GcsResourceScheduler::GetNodeResources(const NodeID &node_id) const {
const auto &resource_view = GetResourceView();
auto iter = resource_view.find(node_id);
RAY_CHECK(iter != resource_view.end());
return iter->second->GetLocalView();
const NodeResources &GcsResourceScheduler::GetNodeResources(
const scheduling::NodeID &node_id) const {
return cluster_resource_manager_->GetNodeResources(
scheduling::NodeID(node_id.Binary()));
}
bool GcsResourceScheduler::AllocateRemoteTaskResources(
const NodeID &node_id, const ResourceRequest &resource_request) {
return gcs_resource_manager_.AcquireResources(node_id, resource_request);
const scheduling::NodeID &node_id, const ResourceRequest &resource_request) {
if (!cluster_resource_manager_->HasSufficientResource(
node_id, resource_request, /*ignore_object_store_memory_requirement=*/true)) {
return false;
}
return cluster_resource_manager_->SubtractNodeAvailableResources(node_id,
resource_request);
}
bool GcsResourceScheduler::ReleaseRemoteTaskResources(
const NodeID &node_id, const ResourceRequest &resource_request) {
return gcs_resource_manager_.ReleaseResources(node_id, resource_request);
const scheduling::NodeID &node_id, const ResourceRequest &resource_request) {
return cluster_resource_manager_->AddNodeAvailableResources(node_id, resource_request);
}
const absl::flat_hash_map<NodeID, std::shared_ptr<Node>>
const absl::flat_hash_map<scheduling::NodeID, Node>
&GcsResourceScheduler::GetResourceView() const {
return gcs_resource_manager_.GetClusterResources();
return cluster_resource_manager_->GetResourceView();
}
} // namespace gcs

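For reference, a hedged usage sketch of the updated `Schedule()` API, modeled on the STRICT_SPREAD call site in gcs_placement_group_scheduler.cc above; the helper function and its name are illustrative, not part of this diff:

```cpp
// Illustrative helper showing how a caller drives the scheduler with a node
// filter; assumes the declarations from gcs_resource_scheduler.h above.
#include <vector>

#include "absl/container/flat_hash_set.h"
#include "ray/gcs/gcs_server/gcs_resource_scheduler.h"

namespace ray {
namespace gcs {

std::vector<scheduling::NodeID> ScheduleStrictSpread(
    GcsResourceScheduler &scheduler,
    const std::vector<ResourceRequest> &required_resources,
    const absl::flat_hash_set<scheduling::NodeID> &nodes_in_use) {
  auto result = scheduler.Schedule(
      required_resources,
      SchedulingType::STRICT_SPREAD,
      /*node_filter_func=*/[&nodes_in_use](const scheduling::NodeID &node_id) {
        // Skip nodes this placement group has already used.
        return nodes_in_use.count(node_id) == 0;
      });
  if (result.first != SchedulingResultStatus::SUCCESS) {
    // FAILED is retryable; INFEASIBLE is not.
    return {};
  }
  // On success: one scheduling::NodeID per request, in input order.
  return result.second;
}

}  // namespace gcs
}  // namespace ray
```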
View file

@ -20,6 +20,7 @@
#include "ray/common/id.h"
#include "ray/gcs/gcs_server/gcs_resource_manager.h"
#include "ray/raylet/scheduling/cluster_resource_data.h"
#include "ray/raylet/scheduling/cluster_resource_manager.h"
namespace ray {
namespace gcs {
@ -48,8 +49,9 @@ enum class SchedulingResultStatus {
SUCCESS = 2,
};
typedef std::pair<NodeID, double> NodeScore;
typedef std::pair<SchedulingResultStatus, std::vector<NodeID>> SchedulingResult;
using NodeScore = std::pair<scheduling::NodeID, double>;
typedef std::pair<SchedulingResultStatus, std::vector<scheduling::NodeID>>
SchedulingResult;
/// NodeScorer is a scorer to make a grade to the node, which is used for scheduling
/// decision.
@ -87,8 +89,8 @@ class LeastResourceScorer : public NodeScorer {
/// Non-thread safe.
class GcsResourceScheduler {
public:
GcsResourceScheduler(GcsResourceManager &gcs_resource_manager)
: gcs_resource_manager_(gcs_resource_manager),
GcsResourceScheduler()
: cluster_resource_manager_(std::make_unique<ClusterResourceManager>()),
node_scorer_(new LeastResourceScorer()) {}
virtual ~GcsResourceScheduler() = default;
@ -106,7 +108,11 @@ class GcsResourceScheduler {
SchedulingResult Schedule(
const std::vector<ResourceRequest> &required_resources_list,
const SchedulingType &scheduling_type,
const std::function<bool(const NodeID &)> &node_filter_func = nullptr);
const std::function<bool(const scheduling::NodeID &)> &node_filter_func = nullptr);
ClusterResourceManager &GetClusterResourceManager() {
return *cluster_resource_manager_;
}
private:
/// Filter out candidate nodes which can be used for scheduling.
@ -116,8 +122,8 @@ class GcsResourceScheduler {
/// returns true, it can be used for scheduling. By default, all nodes in the cluster
/// can be used for scheduling.
/// \return The candidate nodes which can be used for scheduling.
absl::flat_hash_set<NodeID> FilterCandidateNodes(
const std::function<bool(const NodeID &)> &node_filter_func);
absl::flat_hash_set<scheduling::NodeID> FilterCandidateNodes(
const std::function<bool(const scheduling::NodeID &)> &node_filter_func);
/// Sort required resources according to the scarcity and capacity of resources.
/// We will first schedule scarce resources (such as GPU) and large capacity resources
@ -137,7 +143,7 @@ class GcsResourceScheduler {
/// request can be retry or not.
SchedulingResult StrictSpreadSchedule(
const std::vector<ResourceRequest> &required_resources_list,
const absl::flat_hash_set<NodeID> &candidate_nodes);
const absl::flat_hash_set<scheduling::NodeID> &candidate_nodes);
/// Schedule resources according to `SPREAD` strategy.
///
@ -148,7 +154,7 @@ class GcsResourceScheduler {
/// request can be retry or not.
SchedulingResult SpreadSchedule(
const std::vector<ResourceRequest> &required_resources_list,
const absl::flat_hash_set<NodeID> &candidate_nodes);
const absl::flat_hash_set<scheduling::NodeID> &candidate_nodes);
/// Schedule resources according to `STRICT_PACK` strategy.
///
@ -159,7 +165,7 @@ class GcsResourceScheduler {
/// request can be retry or not.
SchedulingResult StrictPackSchedule(
const std::vector<ResourceRequest> &required_resources_list,
const absl::flat_hash_set<NodeID> &candidate_nodes);
const absl::flat_hash_set<scheduling::NodeID> &candidate_nodes);
/// Schedule resources according to `PACK` strategy.
///
@ -170,18 +176,19 @@ class GcsResourceScheduler {
/// request can be retry or not.
SchedulingResult PackSchedule(
const std::vector<ResourceRequest> &required_resources_list,
const absl::flat_hash_set<NodeID> &candidate_nodes);
const absl::flat_hash_set<scheduling::NodeID> &candidate_nodes);
/// Score all nodes according to the specified resources.
///
/// \param required_resources The resources to be scheduled.
/// \param candidate_nodes The nodes can be used for scheduling.
/// \return Score of all nodes.
std::optional<NodeID> GetBestNode(const ResourceRequest &required_resources,
const absl::flat_hash_set<NodeID> &candidate_nodes);
scheduling::NodeID GetBestNode(
const ResourceRequest &required_resources,
const absl::flat_hash_set<scheduling::NodeID> &candidate_nodes);
/// Get node resources.
const NodeResources &GetNodeResources(const NodeID &node_id) const;
const NodeResources &GetNodeResources(const scheduling::NodeID &node_id) const;
/// Return the resources temporarily deducted from gcs resource manager.
///
@ -190,7 +197,7 @@ class GcsResourceScheduler {
/// one by one.
void ReleaseTemporarilyDeductedResources(
const std::vector<ResourceRequest> &required_resources_list,
const std::vector<NodeID> &nodes);
const std::vector<scheduling::NodeID> &nodes);
/// Subtract the resources required by a given resource request (resource_request) from
/// a given remote node.
@ -199,16 +206,16 @@ class GcsResourceScheduler {
/// \param resource_request Task for which we allocate resources.
/// \return True if remote node has enough resources to satisfy the resource request.
/// False otherwise.
bool AllocateRemoteTaskResources(const NodeID &node_id,
bool AllocateRemoteTaskResources(const scheduling::NodeID &node_id,
const ResourceRequest &resource_request);
bool ReleaseRemoteTaskResources(const NodeID &node_id,
bool ReleaseRemoteTaskResources(const scheduling::NodeID &node_id,
const ResourceRequest &resource_request);
const absl::flat_hash_map<NodeID, std::shared_ptr<Node>> &GetResourceView() const;
const absl::flat_hash_map<scheduling::NodeID, Node> &GetResourceView() const;
/// Reference of GcsResourceManager.
GcsResourceManager &gcs_resource_manager_;
std::unique_ptr<ClusterResourceManager> cluster_resource_manager_;
/// Scorer to make a grade to the node.
std::unique_ptr<NodeScorer> node_scorer_;
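
Putting it together, a minimal caller-side sketch of `Schedule` (the resource values and the trivial node filter are illustrative only):

  std::vector<ResourceRequest> requests;
  requests.emplace_back(ResourceMapToResourceRequest(
      {{"CPU", 2}, {"GPU", 1}}, /*requires_object_store_memory=*/false));
  auto result = gcs_resource_scheduler->Schedule(
      requests,
      SchedulingType::SPREAD,
      /*node_filter_func=*/[](const scheduling::NodeID &) { return true; });
  if (result.first == SchedulingResultStatus::SUCCESS) {
    // result.second holds one selected scheduling::NodeID per request.
  }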

View file

@ -125,15 +125,15 @@ void GcsServer::Start() {
}
void GcsServer::DoStart(const GcsInitData &gcs_init_data) {
// Init gcs resource scheduler.
InitGcsResourceScheduler();
// Init gcs resource manager.
InitGcsResourceManager(gcs_init_data);
// Init synchronization service
InitRaySyncer(gcs_init_data);
// Init gcs resource scheduler.
InitGcsResourceScheduler();
// Init gcs node manager.
InitGcsNodeManager(gcs_init_data);
@ -255,9 +255,9 @@ void GcsServer::InitGcsHeartbeatManager(const GcsInitData &gcs_init_data) {
}
void GcsServer::InitGcsResourceManager(const GcsInitData &gcs_init_data) {
RAY_CHECK(gcs_table_storage_ && gcs_publisher_);
RAY_CHECK(gcs_table_storage_ && gcs_resource_scheduler_);
gcs_resource_manager_ = std::make_shared<GcsResourceManager>(
main_service_, gcs_publisher_, gcs_table_storage_);
gcs_table_storage_, gcs_resource_scheduler_->GetClusterResourceManager());
// Initialize by gcs tables data.
gcs_resource_manager_->Initialize(gcs_init_data);
@ -268,9 +268,7 @@ void GcsServer::InitGcsResourceManager(const GcsInitData &gcs_init_data) {
}
void GcsServer::InitGcsResourceScheduler() {
RAY_CHECK(gcs_resource_manager_);
gcs_resource_scheduler_ =
std::make_shared<GcsResourceScheduler>(*gcs_resource_manager_);
gcs_resource_scheduler_ = std::make_shared<GcsResourceScheduler>();
}
void GcsServer::InitGcsJobManager(const GcsInitData &gcs_init_data) {
@ -309,15 +307,19 @@ void GcsServer::InitGcsActorManager(const GcsInitData &gcs_init_data) {
if (RayConfig::instance().gcs_actor_scheduling_enabled()) {
RAY_CHECK(gcs_resource_manager_ && gcs_resource_scheduler_);
scheduler = std::make_unique<GcsBasedActorScheduler>(main_service_,
scheduler = std::make_unique<GcsBasedActorScheduler>(
main_service_,
gcs_table_storage_->ActorTable(),
*gcs_node_manager_,
gcs_resource_manager_,
gcs_resource_scheduler_,
schedule_failure_handler,
schedule_success_handler,
raylet_client_pool_,
client_factory);
client_factory,
/*normal_task_resources_changed_callback=*/
[this](const NodeID &node_id, const rpc::ResourcesData &resources) {
gcs_resource_manager_->UpdateNodeNormalTaskResources(node_id, resources);
});
} else {
scheduler =
std::make_unique<RayletBasedActorScheduler>(main_service_,

View file

@ -39,17 +39,15 @@ class GcsBasedActorSchedulerTest : public ::testing::Test {
store_client_ = std::make_shared<gcs::InMemoryStoreClient>(io_service_);
gcs_actor_table_ =
std::make_shared<GcsServerMocker::MockedGcsActorTable>(store_client_);
gcs_resource_scheduler_ = std::make_shared<gcs::GcsResourceScheduler>();
gcs_resource_manager_ = std::make_shared<gcs::GcsResourceManager>(
io_service_, gcs_publisher_, gcs_table_storage_);
auto resource_scheduler =
std::make_shared<gcs::GcsResourceScheduler>(*gcs_resource_manager_);
gcs_table_storage_, gcs_resource_scheduler_->GetClusterResourceManager());
gcs_actor_scheduler_ =
std::make_shared<GcsServerMocker::MockedGcsBasedActorScheduler>(
io_service_,
*gcs_actor_table_,
*gcs_node_manager_,
gcs_resource_manager_,
resource_scheduler,
gcs_resource_scheduler_,
/*schedule_failure_handler=*/
[this](std::shared_ptr<gcs::GcsActor> actor,
const rpc::RequestWorkerLeaseReply::SchedulingFailureType,
@ -63,7 +61,11 @@ class GcsBasedActorSchedulerTest : public ::testing::Test {
},
raylet_client_pool_,
/*client_factory=*/
[this](const rpc::Address &address) { return worker_client_; });
[this](const rpc::Address &address) { return worker_client_; },
/*normal_task_resources_changed_callback=*/
[this](const NodeID &node_id, const rpc::ResourcesData &resources) {
gcs_resource_manager_->UpdateNodeNormalTaskResources(node_id, resources);
});
}
std::shared_ptr<gcs::GcsActor> NewGcsActor(
@ -108,6 +110,7 @@ class GcsBasedActorSchedulerTest : public ::testing::Test {
std::shared_ptr<rpc::NodeManagerClientPool> raylet_client_pool_;
std::shared_ptr<GcsServerMocker::MockWorkerClient> worker_client_;
std::shared_ptr<gcs::GcsNodeManager> gcs_node_manager_;
std::shared_ptr<gcs::GcsResourceScheduler> gcs_resource_scheduler_;
std::shared_ptr<gcs::GcsResourceManager> gcs_resource_manager_;
std::shared_ptr<GcsServerMocker::MockedGcsBasedActorScheduler> gcs_actor_scheduler_;
std::vector<std::shared_ptr<gcs::GcsActor>> success_actors_;
@ -165,13 +168,12 @@ TEST_F(GcsBasedActorSchedulerTest, TestScheduleAndDestroyOneActor) {
{kCPU_ResourceLabel, 8}};
auto node = AddNewNode(node_resources);
auto node_id = NodeID::FromBinary(node->node_id());
scheduling::NodeID scheduling_node_id(node->node_id());
ASSERT_EQ(1, gcs_node_manager_->GetAllAliveNodes().size());
absl::flat_hash_map<NodeID, std::shared_ptr<Node>> cluster_resources_before_scheduling;
for (auto &entry : gcs_resource_manager_->GetClusterResources()) {
cluster_resources_before_scheduling.emplace(entry.first,
std::make_shared<Node>(*entry.second));
}
ASSERT_TRUE(cluster_resources_before_scheduling.contains(node_id));
const auto &cluster_resource_manager =
gcs_resource_scheduler_->GetClusterResourceManager();
auto resource_view_before_scheduling = cluster_resource_manager.GetResourceView();
ASSERT_TRUE(resource_view_before_scheduling.contains(scheduling_node_id));
// Schedule an actor (requiring 32 memory units and 4 CPUs).
std::unordered_map<std::string, double> required_placement_resources = {
@ -203,17 +205,17 @@ TEST_F(GcsBasedActorSchedulerTest, TestScheduleAndDestroyOneActor) {
ASSERT_EQ(actor->GetNodeID(), node_id);
ASSERT_EQ(actor->GetWorkerID(), worker_id);
auto cluster_resources_after_scheduling = gcs_resource_manager_->GetClusterResources();
ASSERT_TRUE(cluster_resources_after_scheduling.contains(node_id));
ASSERT_NE(cluster_resources_before_scheduling[node_id]->GetLocalView(),
cluster_resources_after_scheduling[node_id]->GetLocalView());
auto resource_view_after_scheduling = cluster_resource_manager.GetResourceView();
ASSERT_TRUE(resource_view_after_scheduling.contains(scheduling_node_id));
ASSERT_NE(resource_view_before_scheduling.at(scheduling_node_id).GetLocalView(),
resource_view_after_scheduling.at(scheduling_node_id).GetLocalView());
// When destroying an actor, its acquired resources have to be returned.
gcs_actor_scheduler_->OnActorDestruction(actor);
auto cluster_resources_after_destruction = gcs_resource_manager_->GetClusterResources();
ASSERT_TRUE(cluster_resources_after_destruction.contains(node_id));
ASSERT_EQ(cluster_resources_before_scheduling[node_id]->GetLocalView(),
cluster_resources_after_scheduling[node_id]->GetLocalView());
auto resource_view_after_destruction = cluster_resource_manager.GetResourceView();
ASSERT_TRUE(resource_view_after_destruction.contains(scheduling_node_id));
ASSERT_TRUE(resource_view_after_destruction.at(scheduling_node_id).GetLocalView() ==
resource_view_before_scheduling.at(scheduling_node_id).GetLocalView());
}
TEST_F(GcsBasedActorSchedulerTest, TestBalancedSchedule) {

View file

@ -16,6 +16,7 @@
#include "gtest/gtest.h"
#include "gmock/gmock.h"
#include "ray/gcs/gcs_server/gcs_placement_group_manager.h"
#include "ray/raylet/scheduling/cluster_resource_manager.h"
#include "mock/ray/gcs/gcs_server/gcs_placement_group_manager.h"
#include "mock/ray/gcs/gcs_server/gcs_placement_group_scheduler.h"
#include "mock/ray/gcs/gcs_server/gcs_resource_manager.h"
@ -37,7 +38,7 @@ class GcsPlacementGroupManagerMockTest : public Test {
gcs_placement_group_scheduler_ =
std::make_shared<MockGcsPlacementGroupSchedulerInterface>();
resource_manager_ =
std::make_shared<MockGcsResourceManager>(io_context_, nullptr, nullptr);
std::make_shared<MockGcsResourceManager>(nullptr, cluster_resource_manager_);
gcs_placement_group_manager_ =
std::make_unique<GcsPlacementGroupManager>(io_context_,
@ -51,6 +52,7 @@ class GcsPlacementGroupManagerMockTest : public Test {
std::shared_ptr<MockGcsPlacementGroupSchedulerInterface> gcs_placement_group_scheduler_;
std::shared_ptr<gcs::GcsTableStorage> gcs_table_storage_;
std::shared_ptr<MockStoreClient> store_client_;
ClusterResourceManager cluster_resource_manager_;
std::shared_ptr<GcsResourceManager> resource_manager_;
instrumented_io_context io_context_;
};

View file

@ -20,6 +20,7 @@
#include "ray/common/asio/instrumented_io_context.h"
#include "ray/gcs/gcs_server/test/gcs_server_test_util.h"
#include "ray/gcs/test/gcs_test_util.h"
#include "ray/raylet/scheduling/cluster_resource_manager.h"
namespace ray {
namespace gcs {
@ -81,7 +82,7 @@ class GcsPlacementGroupManagerTest : public ::testing::Test {
std::make_unique<GcsServerMocker::MockGcsPubSub>(redis_client_));
gcs_table_storage_ = std::make_shared<gcs::InMemoryGcsTableStorage>(io_service_);
gcs_resource_manager_ =
std::make_shared<gcs::GcsResourceManager>(io_service_, nullptr, nullptr);
std::make_shared<gcs::GcsResourceManager>(nullptr, cluster_resource_manager_);
gcs_placement_group_manager_.reset(new gcs::GcsPlacementGroupManager(
io_service_,
mock_placement_group_scheduler_,
@ -163,6 +164,7 @@ class GcsPlacementGroupManagerTest : public ::testing::Test {
std::unique_ptr<std::thread> thread_io_service_;
instrumented_io_context io_service_;
std::shared_ptr<gcs::GcsTableStorage> gcs_table_storage_;
ClusterResourceManager cluster_resource_manager_;
std::shared_ptr<gcs::GcsResourceManager> gcs_resource_manager_;
std::shared_ptr<gcs::GcsPublisher> gcs_publisher_;
std::shared_ptr<gcs::RedisClient> redis_client_;

View file

@ -41,12 +41,11 @@ class GcsPlacementGroupSchedulerTest : public ::testing::Test {
gcs_table_storage_ = std::make_shared<gcs::InMemoryGcsTableStorage>(io_service_);
gcs_publisher_ = std::make_shared<gcs::GcsPublisher>(
std::make_unique<GcsServerMocker::MockGcsPubSub>(redis_client_));
gcs_resource_scheduler_ = std::make_shared<gcs::GcsResourceScheduler>();
gcs_resource_manager_ = std::make_shared<gcs::GcsResourceManager>(
io_service_, nullptr, gcs_table_storage_);
gcs_table_storage_, gcs_resource_scheduler_->GetClusterResourceManager());
ray_syncer_ = std::make_shared<ray::syncer::RaySyncer>(
io_service_, nullptr, *gcs_resource_manager_);
gcs_resource_scheduler_ =
std::make_shared<gcs::GcsResourceScheduler>(*gcs_resource_manager_);
store_client_ = std::make_shared<gcs::InMemoryStoreClient>(io_service_);
raylet_client_pool_ = std::make_shared<rpc::NodeManagerClientPool>(
[this](const rpc::Address &addr) { return raylet_clients_[addr.port()]; });
@ -182,7 +181,7 @@ class GcsPlacementGroupSchedulerTest : public ::testing::Test {
if (ray_syncer_->resources_buffer_proto_.batch().size() == n) {
break;
}
sleep(1);
std::this_thread::sleep_for(std::chrono::microseconds(1));
}
}

View file

@ -19,6 +19,7 @@
#include "gtest/gtest.h"
#include "ray/common/asio/instrumented_io_context.h"
#include "ray/gcs/test/gcs_test_util.h"
#include "ray/raylet/scheduling/cluster_resource_manager.h"
namespace ray {
@ -28,10 +29,10 @@ class GcsResourceManagerTest : public ::testing::Test {
public:
GcsResourceManagerTest() {
gcs_resource_manager_ =
std::make_shared<gcs::GcsResourceManager>(io_service_, nullptr, nullptr);
std::make_shared<gcs::GcsResourceManager>(nullptr, cluster_resource_manager_);
}
instrumented_io_context io_service_;
ClusterResourceManager cluster_resource_manager_;
std::shared_ptr<gcs::GcsResourceManager> gcs_resource_manager_;
};
@ -46,22 +47,28 @@ TEST_F(GcsResourceManagerTest, TestBasic) {
gcs_resource_manager_->OnNodeAdd(*node);
// Get and check cluster resources.
const auto &cluster_resource = gcs_resource_manager_->GetClusterResources();
ASSERT_EQ(1, cluster_resource.size());
const auto &resource_view = cluster_resource_manager_.GetResourceView();
ASSERT_EQ(1, resource_view.size());
const auto &node_id = NodeID::FromBinary(node->node_id());
scheduling::NodeID scheduling_node_id(node->node_id());
auto resource_request =
ResourceMapToResourceRequest(resource_map, /*requires_object_store_memory=*/false);
// Test `AcquireResources`.
ASSERT_TRUE(gcs_resource_manager_->AcquireResources(node_id, resource_request));
ASSERT_FALSE(gcs_resource_manager_->AcquireResources(node_id, resource_request));
ASSERT_TRUE(cluster_resource_manager_.HasSufficientResource(
scheduling_node_id,
resource_request,
/*ignore_object_store_memory_requirement=*/true));
ASSERT_TRUE(cluster_resource_manager_.SubtractNodeAvailableResources(scheduling_node_id,
resource_request));
ASSERT_FALSE(cluster_resource_manager_.HasSufficientResource(
scheduling_node_id,
resource_request,
/*ignore_object_store_memory_requirement=*/true));
// Test `ReleaseResources`.
ASSERT_TRUE(
gcs_resource_manager_->ReleaseResources(NodeID::FromRandom(), resource_request));
ASSERT_TRUE(gcs_resource_manager_->ReleaseResources(node_id, resource_request));
ASSERT_TRUE(gcs_resource_manager_->AcquireResources(node_id, resource_request));
ASSERT_TRUE(cluster_resource_manager_.AddNodeAvailableResources(scheduling_node_id,
resource_request));
}
TEST_F(GcsResourceManagerTest, TestResourceUsageAPI) {
@ -96,18 +103,19 @@ TEST_F(GcsResourceManagerTest, TestSetAvailableResourcesWhenNodeDead) {
node->mutable_resources_total()->insert({"CPU", 10});
gcs_resource_manager_->OnNodeAdd(*node);
ASSERT_EQ(gcs_resource_manager_->GetClusterResources().size(), 1);
ASSERT_EQ(cluster_resource_manager_.GetResourceView().size(), 1);
auto node_id = NodeID::FromBinary(node->node_id());
gcs_resource_manager_->OnNodeDead(node_id);
ASSERT_EQ(gcs_resource_manager_->GetClusterResources().size(), 0);
ASSERT_EQ(cluster_resource_manager_.GetResourceView().size(), 0);
rpc::ResourcesData resources_data;
resources_data.set_node_id(node->node_id());
resources_data.mutable_resources_total()->insert({"CPU", 5});
resources_data.mutable_resources_available()->insert({"CPU", 5});
resources_data.set_resources_available_changed(true);
gcs_resource_manager_->UpdateFromResourceReport(resources_data);
ASSERT_EQ(gcs_resource_manager_->GetClusterResources().size(), 0);
ASSERT_EQ(cluster_resource_manager_.GetResourceView().size(), 0);
}
} // namespace ray

View file

@ -27,15 +27,18 @@ using ::testing::_;
class GcsResourceSchedulerTest : public ::testing::Test {
public:
void SetUp() override {
gcs_resource_manager_ =
std::make_shared<gcs::GcsResourceManager>(io_service_, nullptr, nullptr);
gcs_resource_scheduler_ =
std::make_shared<gcs::GcsResourceScheduler>(*gcs_resource_manager_);
gcs_resource_scheduler_ = std::make_shared<gcs::GcsResourceScheduler>();
}
void TearDown() override {
gcs_resource_scheduler_.reset();
gcs_resource_manager_.reset();
void TearDown() override { gcs_resource_scheduler_.reset(); }
void AddNode(const rpc::GcsNodeInfo &node) {
scheduling::NodeID node_id(node.node_id());
auto &cluster_resource_manager = gcs_resource_scheduler_->GetClusterResourceManager();
for (const auto &entry : node.resources_total()) {
cluster_resource_manager.UpdateResourceCapacity(
node_id, scheduling::ResourceID(entry.first), entry.second);
}
}
void AddClusterResources(const NodeID &node_id,
@ -44,7 +47,7 @@ class GcsResourceSchedulerTest : public ::testing::Test {
auto node = Mocker::GenNodeInfo();
node->set_node_id(node_id.Binary());
(*node->mutable_resources_total())[resource_name] = resource_value;
gcs_resource_manager_->OnNodeAdd(*node);
AddNode(*node);
}
void AddClusterResources(const NodeID &node_id,
@ -54,16 +57,16 @@ class GcsResourceSchedulerTest : public ::testing::Test {
for (auto r : resource) {
(*node->mutable_resources_total())[r.first] = r.second;
}
gcs_resource_manager_->OnNodeAdd(*node);
AddNode(*node);
}
void CheckClusterAvailableResources(const NodeID &node_id,
const std::string &resource_name,
double resource_value) {
const auto &cluster_resource = gcs_resource_manager_->GetClusterResources();
auto iter = cluster_resource.find(node_id);
ASSERT_TRUE(iter != cluster_resource.end());
const auto &node_resources = iter->second->GetLocalView();
const auto &cluster_resource_manager =
gcs_resource_scheduler_->GetClusterResourceManager();
const auto &node_resources =
cluster_resource_manager.GetNodeResources(scheduling::NodeID(node_id.Binary()));
auto resource_id = scheduling::ResourceID(resource_name).ToInt();
ASSERT_NE(resource_id, -1);
@ -160,7 +163,6 @@ class GcsResourceSchedulerTest : public ::testing::Test {
ASSERT_EQ(result1.second.size(), resources_list.size());
}
std::shared_ptr<gcs::GcsResourceManager> gcs_resource_manager_;
std::shared_ptr<gcs::GcsResourceScheduler> gcs_resource_scheduler_;
private:
@ -200,18 +202,18 @@ TEST_F(GcsResourceSchedulerTest, TestNodeFilter) {
resource_map[cpu_resource] = 1;
required_resources_list.emplace_back(
ResourceMapToResourceRequest(resource_map, /*requires_object_store_memory=*/false));
const auto &result1 = gcs_resource_scheduler_->Schedule(
required_resources_list, gcs::SchedulingType::STRICT_SPREAD, [](const NodeID &) {
return false;
});
const auto &result1 =
gcs_resource_scheduler_->Schedule(required_resources_list,
gcs::SchedulingType::STRICT_SPREAD,
[](const scheduling::NodeID &) { return false; });
ASSERT_TRUE(result1.first == gcs::SchedulingResultStatus::INFEASIBLE);
ASSERT_EQ(result1.second.size(), 0);
// Scheduling succeeded.
const auto &result2 = gcs_resource_scheduler_->Schedule(
required_resources_list, gcs::SchedulingType::STRICT_SPREAD, [](const NodeID &) {
return true;
});
const auto &result2 =
gcs_resource_scheduler_->Schedule(required_resources_list,
gcs::SchedulingType::STRICT_SPREAD,
[](const scheduling::NodeID &) { return true; });
ASSERT_TRUE(result2.first == gcs::SchedulingResultStatus::SUCCESS);
ASSERT_EQ(result2.second.size(), 1);
}

View file

@ -151,6 +151,9 @@ class NodeResources {
NodeResources(const NodeResources &other)
: predefined_resources(other.predefined_resources),
custom_resources(other.custom_resources),
normal_task_resources(other.normal_task_resources),
latest_resources_normal_task_timestamp(
other.latest_resources_normal_task_timestamp),
object_pulls_queued(other.object_pulls_queued) {}
/// Available and total capacities for predefined resources.
std::vector<ResourceCapacity> predefined_resources;
@ -159,6 +162,10 @@ class NodeResources {
absl::flat_hash_map<int64_t, ResourceCapacity> custom_resources;
/// Resources owned by normal tasks.
ResourceRequest normal_task_resources;
/// Normal task resources can be reported from two paths: 1) the raylets'
/// periodic resource reporters and 2) rejected RequestWorkerLeaseReply messages.
/// This timestamp lets us decide whether an incoming report is newer than the
/// one we already hold.
int64_t latest_resources_normal_task_timestamp = 0;
bool object_pulls_queued = false;
/// Amongst CPU, memory, and object store memory, calculate the utilization percentage

View file

@ -274,6 +274,99 @@ bool ClusterResourceManager::HasSufficientResource(
return true;
}
bool ClusterResourceManager::AddNodeAvailableResources(
scheduling::NodeID node_id, const ResourceRequest &resource_request) {
auto it = nodes_.find(node_id);
if (it == nodes_.end()) {
return false;
}
auto node_resources = it->second.GetMutableLocalView();
RAY_CHECK(resource_request.predefined_resources.size() <=
node_resources->predefined_resources.size());
for (size_t i = 0; i < resource_request.predefined_resources.size(); ++i) {
node_resources->predefined_resources[i].available +=
resource_request.predefined_resources[i];
node_resources->predefined_resources[i].available =
std::min(node_resources->predefined_resources[i].available,
node_resources->predefined_resources[i].total);
}
for (auto &entry : resource_request.custom_resources) {
auto it = node_resources->custom_resources.find(entry.first);
if (it != node_resources->custom_resources.end()) {
it->second.available += entry.second;
it->second.available = std::min(it->second.available, it->second.total);
}
}
return true;
}
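As a usage note, `AddNodeAvailableResources` pairs with `SubtractNodeAvailableResources` for temporary acquisition and release of a node's resources. A sketch, where `manager`, `node_id`, and the amounts are illustrative:

  auto request = ResourceMapToResourceRequest(
      {{"CPU", 4}}, /*requires_object_store_memory=*/false);
  // Deduct while the workload holds the resources; this returns false only
  // if the node is unknown to the manager.
  manager.SubtractNodeAvailableResources(node_id, request);
  // ... workload runs ...
  // Return the resources. Since `available` is clamped to `total` above, a
  // duplicate release cannot inflate the node's capacity.
  manager.AddNodeAvailableResources(node_id, request);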
bool ClusterResourceManager::UpdateNodeAvailableResourcesIfExist(
scheduling::NodeID node_id, const rpc::ResourcesData &resource_data) {
auto iter = nodes_.find(node_id);
if (iter == nodes_.end()) {
return false;
}
if (!resource_data.resources_available_changed()) {
return true;
}
auto resources =
ResourceMapToResourceRequest(MapFromProtobuf(resource_data.resources_available()),
/*requires_object_store_memory=*/false);
auto node_resources = iter->second.GetMutableLocalView();
for (size_t i = 0; i < node_resources->predefined_resources.size(); ++i) {
node_resources->predefined_resources[i].available = resources.predefined_resources[i];
}
for (auto &entry : node_resources->custom_resources) {
auto it = resources.custom_resources.find(entry.first);
if (it != resources.custom_resources.end()) {
entry.second.available = it->second;
} else {
entry.second.available = 0.;
}
}
return true;
}
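The update semantics above are easy to miss: reported values overwrite the node's matching entries, resources the node does not define are dropped, and custom resources absent from the report are zeroed. A sketch with a hypothetical node holding {CPU: 8/8} and a custom resource "stick" (2/2):

  rpc::ResourcesData data;
  data.set_resources_available_changed(true);
  (*data.mutable_resources_available())["CPU"] = 4;     // Overwrites CPU's available amount.
  (*data.mutable_resources_available())["widget"] = 1;  // Not defined on the node: ignored.
  // "stick" is missing from the report, so its available amount becomes 0.
  manager.UpdateNodeAvailableResourcesIfExist(node_id, data);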
bool ClusterResourceManager::UpdateNodeNormalTaskResources(
scheduling::NodeID node_id, const rpc::ResourcesData &resource_data) {
auto iter = nodes_.find(node_id);
if (iter != nodes_.end()) {
auto node_resources = iter->second.GetMutableLocalView();
if (resource_data.resources_normal_task_changed() &&
resource_data.resources_normal_task_timestamp() >
node_resources->latest_resources_normal_task_timestamp) {
auto normal_task_resources = ResourceMapToResourceRequest(
MapFromProtobuf(resource_data.resources_normal_task()),
/*requires_object_store_memory=*/false);
auto &local_normal_task_resources = node_resources->normal_task_resources;
if (normal_task_resources != local_normal_task_resources) {
local_normal_task_resources.predefined_resources.resize(PredefinedResources_MAX);
for (size_t i = 0; i < PredefinedResources_MAX; ++i) {
local_normal_task_resources.predefined_resources[i] =
normal_task_resources.predefined_resources[i];
}
local_normal_task_resources.custom_resources =
std::move(normal_task_resources.custom_resources);
node_resources->latest_resources_normal_task_timestamp =
resource_data.resources_normal_task_timestamp();
return true;
}
}
}
return false;
}
bool ClusterResourceManager::ContainsNode(scheduling::NodeID node_id) const {
return nodes_.contains(node_id);
}
void ClusterResourceManager::DebugString(std::stringstream &buffer) const {
for (auto &node : GetResourceView()) {
buffer << "node id: " << node.first.ToInt();

View file

@ -82,7 +82,7 @@ class ClusterResourceManager {
const NodeResources &GetNodeResources(scheduling::NodeID node_id) const;
/// Subtract available resource from a given node.
//// Return false if such node doesn't exist.
/// Return false if such node doesn't exist.
bool SubtractNodeAvailableResources(scheduling::NodeID node_id,
const ResourceRequest &resource_request);
@ -96,6 +96,31 @@ class ClusterResourceManager {
const ResourceRequest &resource_request,
bool ignore_object_store_memory_requirement) const;
/// Add available resource to a given node.
/// Return false if such node doesn't exist.
bool AddNodeAvailableResources(scheduling::NodeID node_id,
const ResourceRequest &resource_request);
/// Update node available resources.
/// NOTE: This method only updates resources that already exist on the node;
/// nonexistent resources are filtered out, which is different from `UpdateNode`.
/// Return false if such node doesn't exist.
/// TODO(Shanly): This method will be replaced with `UpdateNode` once we have
/// resource versioning.
bool UpdateNodeAvailableResourcesIfExist(scheduling::NodeID node_id,
const rpc::ResourcesData &resource_data);
/// Update node normal task resources.
/// Return false if such node doesn't exist.
/// TODO(Shanly): Integrate this method into `UpdateNode` later.
bool UpdateNodeNormalTaskResources(scheduling::NodeID node_id,
const rpc::ResourcesData &resource_data);
/// Return true if the specified node exists, false otherwise.
/// TODO(Shanly): This method will be removed once the `gcs_resource_manager` is
/// replaced with `cluster_resource_scheduler`.
bool ContainsNode(scheduling::NodeID node_id) const;
void DebugString(std::stringstream &buffer) const;
private:

View file

@ -90,4 +90,106 @@ TEST_F(ClusterResourceManagerTest, HasSufficientResourceTest) {
/*requires_object_store_memory=*/true),
/*ignore_object_store_memory_requirement=*/true));
}
TEST_F(ClusterResourceManagerTest, SubtractAndAddNodeAvailableResources) {
const auto &node_resources = manager->GetNodeResources(node0);
ASSERT_TRUE(
node_resources.predefined_resources[scheduling::kCPUResource.ToInt()].available ==
1);
manager->SubtractNodeAvailableResources(
node0,
ResourceMapToResourceRequest({{"CPU", 1}},
/*requires_object_store_memory=*/false));
ASSERT_TRUE(
node_resources.predefined_resources[scheduling::kCPUResource.ToInt()].available ==
0);
// Subtract again and make sure available stays at 0.
manager->SubtractNodeAvailableResources(
node0,
ResourceMapToResourceRequest({{"CPU", 1}},
/*requires_object_store_memory=*/false));
ASSERT_TRUE(
node_resources.predefined_resources[scheduling::kCPUResource.ToInt()].available ==
0);
// Add resources back.
manager->AddNodeAvailableResources(
node0,
ResourceMapToResourceRequest({{"CPU", 1}},
/*requires_object_store_memory=*/false));
ASSERT_TRUE(
node_resources.predefined_resources[scheduling::kCPUResource.ToInt()].available ==
1);
// Add again and make sure available stays at 1 (clamped to the total).
manager->AddNodeAvailableResources(
node0,
ResourceMapToResourceRequest({{"CPU", 1}},
/*requires_object_store_memory=*/false));
ASSERT_TRUE(
node_resources.predefined_resources[scheduling::kCPUResource.ToInt()].available ==
1);
}
TEST_F(ClusterResourceManagerTest, UpdateNodeAvailableResourcesIfExist) {
const auto &node_resources = manager->GetNodeResources(node0);
ASSERT_TRUE(
node_resources.predefined_resources[scheduling::kCPUResource.ToInt()].available ==
1);
rpc::ResourcesData resources_data;
resources_data.set_resources_available_changed(true);
(*resources_data.mutable_resources_available())["CPU"] = 0;
manager->UpdateNodeAvailableResourcesIfExist(node0, resources_data);
ASSERT_TRUE(
node_resources.predefined_resources[scheduling::kCPUResource.ToInt()].available ==
0);
(*resources_data.mutable_resources_available())["CUSTOM_RESOURCE"] = 1;
manager->UpdateNodeAvailableResourcesIfExist(node0, resources_data);
ASSERT_FALSE(node_resources.custom_resources.contains(
scheduling::ResourceID("CUSTOM_RESOURCE").ToInt()));
}
TEST_F(ClusterResourceManagerTest, UpdateNodeNormalTaskResources) {
const auto &node_resources = manager->GetNodeResources(node0);
ASSERT_TRUE(node_resources.normal_task_resources.IsEmpty());
rpc::ResourcesData resources_data;
resources_data.set_resources_normal_task_changed(true);
resources_data.set_resources_normal_task_timestamp(absl::GetCurrentTimeNanos());
resources_data.mutable_resources_normal_task()->insert({"CPU", 0.5});
manager->UpdateNodeNormalTaskResources(node0, resources_data);
ASSERT_TRUE(node_resources.normal_task_resources
.predefined_resources[scheduling::kCPUResource.ToInt()] == 0.5);
(*resources_data.mutable_resources_normal_task())["CPU"] = 0.8;
resources_data.set_resources_normal_task_changed(false);
resources_data.set_resources_normal_task_timestamp(absl::GetCurrentTimeNanos());
manager->UpdateNodeNormalTaskResources(node0, resources_data);
ASSERT_TRUE(node_resources.normal_task_resources
.predefined_resources[scheduling::kCPUResource.ToInt()] == 0.5);
resources_data.set_resources_normal_task_changed(true);
resources_data.set_resources_normal_task_timestamp(0);
manager->UpdateNodeNormalTaskResources(node0, resources_data);
ASSERT_TRUE(node_resources.normal_task_resources
.predefined_resources[scheduling::kCPUResource.ToInt()] == 0.5);
resources_data.set_resources_normal_task_changed(true);
resources_data.set_resources_normal_task_timestamp(absl::GetCurrentTimeNanos());
manager->UpdateNodeNormalTaskResources(node0, resources_data);
ASSERT_TRUE(node_resources.normal_task_resources
.predefined_resources[scheduling::kCPUResource.ToInt()] == 0.8);
}
} // namespace ray

View file

@ -88,6 +88,8 @@ enum class SchedulingIDTag { Node, Resource };
template <SchedulingIDTag T>
class BaseSchedulingID {
public:
explicit BaseSchedulingID() = default;
explicit BaseSchedulingID(const std::string &name) : id_{GetMap().Insert(name)} {}
explicit BaseSchedulingID(int64_t id) : id_{id} {}
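A quick usage sketch of these string-interned IDs (the underlying integers are assigned by the interning map and are stable within a process):

  scheduling::NodeID a("node-1");      // Interns the string, stores a stable int.
  scheduling::NodeID b("node-1");      // Same string -> same underlying id.
  assert(a == b && a.ToInt() == b.ToInt());
  assert(a.Binary() == "node-1");      // Round-trips back to the original string.
  scheduling::ResourceID cpu("CPU");   // Predefined resources map to fixed ids.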