[scheduler] hide StringIDMap under BaseSchedulingID (#22722)

* add

* address comments
Author:  Chen Shen
Date:    2022-03-01 22:50:53 -08:00 (committed by GitHub)
Parent:  271ed44143
Commit:  3e3db8e9cd
24 changed files with 695 additions and 665 deletions
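Every hunk below applies the same pattern: APIs that used to take a raw std::string node or resource name, or a bare int64_t handed out by StringIdMap, now take a typed scheduling::NodeID or scheduling::ResourceID, and the interning table becomes an implementation detail of those ID types. The following self-contained toy is an illustration of that idea only; ToySchedulingID is a made-up name, and the real class (exercised by the new scheduling_ids_test target added below) differs in detail, for example it pre-registers the predefined resources so that scheduling::ResourceID(CPU).Binary() == "CPU".

#include <cassert>
#include <cstdint>
#include <string>
#include <unordered_map>
#include <vector>

// Toy sketch of an interned string ID with the interface the rest of this
// diff relies on: construction from a string or an int64_t, plus Binary(),
// ToInt(), IsNil() and Nil(). Not the real BaseSchedulingID.
class ToySchedulingID {
 public:
  explicit ToySchedulingID(const std::string &name) : id_(Intern(name)) {}
  explicit ToySchedulingID(int64_t id) : id_(id) {}

  static ToySchedulingID Nil() { return ToySchedulingID(int64_t{-1}); }

  int64_t ToInt() const { return id_; }
  bool IsNil() const { return id_ == -1; }

  // Recover the original string from the interned integer.
  std::string Binary() const { return IsNil() ? std::string() : Names()[id_]; }

  bool operator==(const ToySchedulingID &other) const { return id_ == other.id_; }

 private:
  // Shared string <-> integer table, hidden inside the ID type instead of
  // being passed around as a StringIdMap reference.
  static int64_t Intern(const std::string &name) {
    static std::unordered_map<std::string, int64_t> table;
    auto [it, inserted] = table.emplace(name, static_cast<int64_t>(Names().size()));
    if (inserted) Names().push_back(name);
    return it->second;
  }
  static std::vector<std::string> &Names() {
    static std::vector<std::string> names;
    return names;
  }

  int64_t id_;
};

int main() {
  ToySchedulingID cpu("CPU");
  assert(cpu.ToInt() == ToySchedulingID("CPU").ToInt());  // same string, same integer
  assert(cpu.Binary() == "CPU");                          // round-trips back to the name
  assert(ToySchedulingID::Nil().IsNil());
  return 0;
}

The payoff shows up at every call site below: an ID is constructed once at the boundary, and no StringIdMap reference has to be threaded through the scheduler.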


@ -1066,6 +1066,20 @@ cc_test(
],
)
cc_test(
name = "scheduling_ids_test",
size = "small",
srcs = [
"src/ray/raylet/scheduling/scheduling_ids_test.cc",
],
copts = COPTS,
tags = ["team:core"],
deps = [
":raylet_lib",
"@com_google_googletest//:gtest_main",
],
)
cc_test(
name = "local_object_manager_test",
size = "small",


@ -30,12 +30,6 @@ namespace ray {
/// divide to convert from internal to actual.
constexpr double kResourceConversionFactor = 10000;
const std::string kCPU_ResourceLabel = "CPU";
const std::string kGPU_ResourceLabel = "GPU";
const std::string kObjectStoreMemory_ResourceLabel = "object_store_memory";
const std::string kMemory_ResourceLabel = "memory";
const std::string kBundle_ResourceLabel = "bundle";
/// \class ResourceSet
/// \brief Encapsulates and operates on a set of resources, including CPUs,
/// GPUs, and custom labels.


@ -313,8 +313,8 @@ NodeManager::NodeManager(instrumented_io_context &io_service, const NodeID &self
SchedulingResources local_resources(config.resource_config);
cluster_resource_scheduler_ =
std::shared_ptr<ClusterResourceScheduler>(new ClusterResourceScheduler(
self_node_id_.Binary(), local_resources.GetTotalResources().GetResourceMap(),
*gcs_client_,
scheduling::NodeID(self_node_id_.Binary()),
local_resources.GetTotalResources().GetResourceMap(), *gcs_client_,
[this]() {
if (RayConfig::instance().scheduler_report_pinned_bytes_only()) {
return local_object_manager_.GetPinnedBytes();
@ -771,7 +771,7 @@ void NodeManager::WarnResourceDeadlock() {
<< "\n"
<< "Available resources on this node: "
<< cluster_resource_scheduler_->GetClusterResourceManager()
.GetNodeResourceViewString(self_node_id_.Binary())
.GetNodeResourceViewString(scheduling::NodeID(self_node_id_.Binary()))
<< " In total there are " << pending_tasks << " pending tasks and "
<< pending_actor_creations << " pending actors on this node.";
@ -847,7 +847,7 @@ void NodeManager::NodeRemoved(const NodeID &node_id) {
// Remove the node from the resource map.
if (!cluster_resource_scheduler_->GetClusterResourceManager().RemoveNode(
node_id.Binary())) {
scheduling::NodeID(node_id.Binary()))) {
RAY_LOG(DEBUG) << "Received NodeRemoved callback for an unknown node: " << node_id
<< ".";
return;
@ -933,7 +933,8 @@ void NodeManager::ResourceCreateUpdated(const NodeID &node_id,
const std::string &resource_label = resource_pair.first;
const double &new_resource_capacity = resource_pair.second;
cluster_resource_scheduler_->GetClusterResourceManager().UpdateResourceCapacity(
node_id.Binary(), resource_label, new_resource_capacity);
scheduling::NodeID(node_id.Binary()), scheduling::ResourceID(resource_label),
new_resource_capacity);
}
RAY_LOG(DEBUG) << "[ResourceCreateUpdated] Updated cluster_resource_map.";
cluster_task_manager_->ScheduleAndDispatchTasks();
@ -961,7 +962,7 @@ void NodeManager::ResourceDeleted(const NodeID &node_id,
// Update local_available_resources_ and SchedulingResources
for (const auto &resource_label : resource_names) {
cluster_resource_scheduler_->GetClusterResourceManager().DeleteResource(
node_id.Binary(), resource_label);
scheduling::NodeID(node_id.Binary()), scheduling::ResourceID(resource_label));
}
return;
}
@ -969,7 +970,7 @@ void NodeManager::ResourceDeleted(const NodeID &node_id,
void NodeManager::UpdateResourceUsage(const NodeID &node_id,
const rpc::ResourcesData &resource_data) {
if (!cluster_resource_scheduler_->GetClusterResourceManager().UpdateNode(
node_id.Binary(), resource_data)) {
scheduling::NodeID(node_id.Binary()), resource_data)) {
RAY_LOG(INFO)
<< "[UpdateResourceUsage]: received resource usage from unknown node id "
<< node_id;
@ -1605,7 +1606,8 @@ void NodeManager::HandleRequestWorkerLease(const rpc::RequestWorkerLeaseRequest
<< ", normal_task_resources = " << normal_task_resources.ToString()
<< ", local_resoruce_view = "
<< cluster_resource_scheduler_->GetClusterResourceManager()
.GetNodeResourceViewString(self_node_id_.Binary());
.GetNodeResourceViewString(
scheduling::NodeID(self_node_id_.Binary()));
auto resources_data = reply->mutable_resources_data();
resources_data->set_node_id(self_node_id_.Binary());
resources_data->set_resources_normal_task_changed(true);
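The node_manager.cc changes are mechanical: the raw binary node ID (and the resource label string) is wrapped in a typed ID once, at the boundary to the scheduler. A rough sketch of the resulting call-site pattern follows; the helper names are hypothetical, and only the ClusterResourceManager calls shown in this hunk are assumed.

// Hypothetical helpers illustrating the boundary conversion; the scheduler
// calls (RemoveNode, UpdateResourceCapacity) are the ones changed above.
void RemoveNodeFromSchedulerView(ClusterResourceScheduler &scheduler,
                                 const NodeID &node_id) {
  // Wrap the protobuf binary ID exactly once, here.
  if (!scheduler.GetClusterResourceManager().RemoveNode(
          scheduling::NodeID(node_id.Binary()))) {
    RAY_LOG(DEBUG) << "Received NodeRemoved for an unknown node: " << node_id;
  }
}

void UpdateCustomResourceCapacity(ClusterResourceScheduler &scheduler,
                                  const NodeID &node_id, const std::string &label,
                                  double capacity) {
  scheduler.GetClusterResourceManager().UpdateResourceCapacity(
      scheduling::NodeID(node_id.Binary()), scheduling::ResourceID(label), capacity);
}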


@ -128,7 +128,6 @@ void NewPlacementGroupResourceManager::CommitBundle(
const auto &bundle_state = it->second;
bundle_state->state_ = CommitState::COMMITTED;
const auto &string_id_map = cluster_resource_scheduler_->GetStringIdMap();
const auto &task_resource_instances = *bundle_state->resources_;
const auto &resources = bundle_spec.GetFormattedResources();
@ -136,13 +135,12 @@ void NewPlacementGroupResourceManager::CommitBundle(
const auto &resource_name = resource.first;
const auto &original_resource_name = GetOriginalResourceName(resource_name);
if (original_resource_name != kBundle_ResourceLabel) {
const auto &instances =
task_resource_instances.Get(original_resource_name, string_id_map);
const auto &instances = task_resource_instances.Get(original_resource_name);
cluster_resource_scheduler_->GetLocalResourceManager().AddLocalResourceInstances(
resource_name, instances);
scheduling::ResourceID{resource_name}, instances);
} else {
cluster_resource_scheduler_->GetLocalResourceManager().AddLocalResourceInstances(
resource_name, {resource.second});
scheduling::ResourceID{resource_name}, {resource.second});
}
}
update_resources_(
@ -185,14 +183,15 @@ void NewPlacementGroupResourceManager::ReturnBundle(
std::vector<std::string> deleted;
for (const auto &resource : placement_group_resources) {
auto resource_id = scheduling::ResourceID{resource.first};
if (cluster_resource_scheduler_->GetLocalResourceManager().IsAvailableResourceEmpty(
resource.first)) {
resource_id)) {
RAY_LOG(DEBUG) << "Available bundle resource:[" << resource.first
<< "] is empty, Will delete it from local resource";
// Delete local resource if available resource is empty when return bundle, or there
// will be resource leak.
cluster_resource_scheduler_->GetLocalResourceManager().DeleteLocalResource(
resource.first);
resource_id);
deleted.push_back(resource.first);
} else {
RAY_LOG(DEBUG) << "Available bundle resource:[" << resource.first


@ -41,8 +41,8 @@ class NewPlacementGroupResourceManagerTest : public ::testing::Test {
}
void InitLocalAvailableResource(
absl::flat_hash_map<std::string, double> &unit_resource) {
cluster_resource_scheduler_ =
std::make_shared<ClusterResourceScheduler>("local", unit_resource, *gcs_client_);
cluster_resource_scheduler_ = std::make_shared<ClusterResourceScheduler>(
scheduling::NodeID("local"), unit_resource, *gcs_client_);
new_placement_group_resource_manager_.reset(
new raylet::NewPlacementGroupResourceManager(
cluster_resource_scheduler_,
@ -57,13 +57,13 @@ class NewPlacementGroupResourceManagerTest : public ::testing::Test {
void CheckAvailableResoueceEmpty(const std::string &resource) {
ASSERT_TRUE(
cluster_resource_scheduler_->GetLocalResourceManager().IsAvailableResourceEmpty(
resource));
scheduling::ResourceID(resource)));
}
void CheckRemainingResourceCorrect(NodeResources &node_resources) {
auto local_node_resource =
cluster_resource_scheduler_->GetClusterResourceManager().GetNodeResources(
"local");
scheduling::NodeID("local"));
ASSERT_TRUE(local_node_resource == node_resources);
}
@ -130,7 +130,7 @@ TEST_F(NewPlacementGroupResourceManagerTest, TestNewCommitBundleResource) {
{"bundle_group_1_" + group_id.Hex(), 1000},
{"bundle_group_" + group_id.Hex(), 1000}};
auto remaining_resource_scheduler = std::make_shared<ClusterResourceScheduler>(
"remaining", remaining_resources, *gcs_client_);
scheduling::NodeID("remaining"), remaining_resources, *gcs_client_);
std::shared_ptr<TaskResourceInstances> resource_instances =
std::make_shared<TaskResourceInstances>();
ASSERT_TRUE(
@ -138,7 +138,7 @@ TEST_F(NewPlacementGroupResourceManagerTest, TestNewCommitBundleResource) {
unit_resource, resource_instances));
auto remaining_resource_instance =
remaining_resource_scheduler->GetClusterResourceManager().GetNodeResources(
"remaining");
scheduling::NodeID("remaining"));
CheckRemainingResourceCorrect(remaining_resource_instance);
}
@ -163,10 +163,10 @@ TEST_F(NewPlacementGroupResourceManagerTest, TestNewReturnBundleResource) {
ASSERT_TRUE(delete_called_);
/// 5. check remaining resources is correct.
auto remaining_resource_scheduler = std::make_shared<ClusterResourceScheduler>(
"remaining", unit_resource, *gcs_client_);
scheduling::NodeID("remaining"), unit_resource, *gcs_client_);
auto remaining_resource_instance =
remaining_resource_scheduler->GetClusterResourceManager().GetNodeResources(
"remaining");
scheduling::NodeID("remaining"));
CheckRemainingResourceCorrect(remaining_resource_instance);
}
@ -204,7 +204,7 @@ TEST_F(NewPlacementGroupResourceManagerTest, TestNewMultipleBundlesCommitAndRetu
{"bundle_group_2_" + group_id.Hex(), 1000},
{"bundle_group_" + group_id.Hex(), 2000}};
auto remaining_resource_scheduler = std::make_shared<ClusterResourceScheduler>(
"remaining", remaining_resources, *gcs_client_);
scheduling::NodeID("remaining"), remaining_resources, *gcs_client_);
std::shared_ptr<TaskResourceInstances> resource_instances =
std::make_shared<TaskResourceInstances>();
ASSERT_TRUE(
@ -212,7 +212,7 @@ TEST_F(NewPlacementGroupResourceManagerTest, TestNewMultipleBundlesCommitAndRetu
init_unit_resource, resource_instances));
auto remaining_resource_instance =
remaining_resource_scheduler->GetClusterResourceManager().GetNodeResources(
"remaining");
scheduling::NodeID("remaining"));
CheckRemainingResourceCorrect(remaining_resource_instance);
/// 5. return second bundle.
@ -228,7 +228,7 @@ TEST_F(NewPlacementGroupResourceManagerTest, TestNewMultipleBundlesCommitAndRetu
{"bundle_group_1_" + group_id.Hex(), 1000},
{"bundle_group_" + group_id.Hex(), 2000}};
remaining_resource_scheduler = std::make_shared<ClusterResourceScheduler>(
"remaining", remaining_resources, *gcs_client_);
scheduling::NodeID("remaining"), remaining_resources, *gcs_client_);
ASSERT_TRUE(
remaining_resource_scheduler->GetLocalResourceManager().AllocateLocalTaskResources(
{{"CPU_group_" + group_id.Hex(), 1.0},
@ -237,17 +237,17 @@ TEST_F(NewPlacementGroupResourceManagerTest, TestNewMultipleBundlesCommitAndRetu
resource_instances));
remaining_resource_instance =
remaining_resource_scheduler->GetClusterResourceManager().GetNodeResources(
"remaining");
scheduling::NodeID("remaining"));
CheckRemainingResourceCorrect(remaining_resource_instance);
/// 7. return first bundle.
new_placement_group_resource_manager_->ReturnBundle(first_bundle_spec);
/// 8. check remaining resources is correct after all bundle returned.
remaining_resources = {{"CPU", 2.0}};
remaining_resource_scheduler = std::make_shared<ClusterResourceScheduler>(
"remaining", remaining_resources, *gcs_client_);
scheduling::NodeID("remaining"), remaining_resources, *gcs_client_);
remaining_resource_instance =
remaining_resource_scheduler->GetClusterResourceManager().GetNodeResources(
"remaining");
scheduling::NodeID("remaining"));
ASSERT_TRUE(update_called_);
ASSERT_TRUE(delete_called_);
CheckRemainingResourceCorrect(remaining_resource_instance);
@ -270,7 +270,7 @@ TEST_F(NewPlacementGroupResourceManagerTest, TestNewIdempotencyWithMultiPrepare)
/// 4. check remaining resources is correct.
absl::flat_hash_map<std::string, double> remaining_resources = {{"CPU", 3.0}};
auto remaining_resource_scheduler = std::make_shared<ClusterResourceScheduler>(
"remaining", remaining_resources, *gcs_client_);
scheduling::NodeID("remaining"), remaining_resources, *gcs_client_);
std::shared_ptr<TaskResourceInstances> resource_instances =
std::make_shared<TaskResourceInstances>();
ASSERT_TRUE(
@ -278,7 +278,7 @@ TEST_F(NewPlacementGroupResourceManagerTest, TestNewIdempotencyWithMultiPrepare)
unit_resource, resource_instances));
auto remaining_resource_instance =
remaining_resource_scheduler->GetClusterResourceManager().GetNodeResources(
"remaining");
scheduling::NodeID("remaining"));
CheckRemainingResourceCorrect(remaining_resource_instance);
}
@ -307,7 +307,7 @@ TEST_F(NewPlacementGroupResourceManagerTest, TestNewIdempotencyWithRandomOrder)
{"bundle_group_1_" + group_id.Hex(), 1000},
{"bundle_group_" + group_id.Hex(), 1000}};
auto remaining_resource_scheduler = std::make_shared<ClusterResourceScheduler>(
"remaining", remaining_resources, *gcs_client_);
scheduling::NodeID("remaining"), remaining_resources, *gcs_client_);
std::shared_ptr<TaskResourceInstances> resource_instances =
std::make_shared<TaskResourceInstances>();
ASSERT_TRUE(
@ -315,7 +315,7 @@ TEST_F(NewPlacementGroupResourceManagerTest, TestNewIdempotencyWithRandomOrder)
unit_resource, resource_instances));
auto remaining_resource_instance =
remaining_resource_scheduler->GetClusterResourceManager().GetNodeResources(
"remaining");
scheduling::NodeID("remaining"));
CheckRemainingResourceCorrect(remaining_resource_instance);
new_placement_group_resource_manager_->ReturnBundle(bundle_spec);
// 5. prepare bundle -> commit bundle -> commit bundle.
@ -336,10 +336,10 @@ TEST_F(NewPlacementGroupResourceManagerTest, TestNewIdempotencyWithRandomOrder)
ConvertSingleSpecToVectorPtrs(bundle_spec));
// 8. check remaining resources is correct.
remaining_resource_scheduler = std::make_shared<ClusterResourceScheduler>(
"remaining", available_resource, *gcs_client_);
scheduling::NodeID("remaining"), available_resource, *gcs_client_);
remaining_resource_instance =
remaining_resource_scheduler->GetClusterResourceManager().GetNodeResources(
"remaining");
scheduling::NodeID("remaining"));
CheckRemainingResourceCorrect(remaining_resource_instance);
}
@ -360,10 +360,10 @@ TEST_F(NewPlacementGroupResourceManagerTest, TestPreparedResourceBatched) {
// 4. check remaining resources is correct.
absl::flat_hash_map<std::string, double> remaining_resources = {{"CPU", 3.0}};
auto remaining_resource_scheduler = std::make_shared<ClusterResourceScheduler>(
"remaining", remaining_resources, *gcs_client_);
scheduling::NodeID("remaining"), remaining_resources, *gcs_client_);
auto remaining_resource_instance =
remaining_resource_scheduler->GetClusterResourceManager().GetNodeResources(
"remaining");
scheduling::NodeID("remaining"));
CheckRemainingResourceCorrect(remaining_resource_instance);
// 5. re-init the local available resource with 4 CPUs.
available_resource = {std::make_pair("CPU", 4.0)};
@ -386,7 +386,7 @@ TEST_F(NewPlacementGroupResourceManagerTest, TestPreparedResourceBatched) {
{"bundle_group_4_" + group_id.Hex(), 1000},
{"bundle_group_" + group_id.Hex(), 4000}};
remaining_resource_scheduler = std::make_shared<ClusterResourceScheduler>(
"remaining", remaining_resources, *gcs_client_);
scheduling::NodeID("remaining"), remaining_resources, *gcs_client_);
std::shared_ptr<TaskResourceInstances> resource_instances =
std::make_shared<TaskResourceInstances>();
absl::flat_hash_map<std::string, double> allocating_resource;
@ -396,7 +396,7 @@ TEST_F(NewPlacementGroupResourceManagerTest, TestPreparedResourceBatched) {
allocating_resource, resource_instances));
remaining_resource_instance =
remaining_resource_scheduler->GetClusterResourceManager().GetNodeResources(
"remaining");
scheduling::NodeID("remaining"));
RAY_LOG(INFO) << "The current local resource view: "
<< cluster_resource_scheduler_->DebugString();
CheckRemainingResourceCorrect(remaining_resource_instance);
@ -432,7 +432,7 @@ TEST_F(NewPlacementGroupResourceManagerTest, TestCommiteResourceBatched) {
{"bundle_group_4_" + group_id.Hex(), 1000},
{"bundle_group_" + group_id.Hex(), 4000}};
auto remaining_resource_scheduler = std::make_shared<ClusterResourceScheduler>(
"remaining", remaining_resources, *gcs_client_);
scheduling::NodeID("remaining"), remaining_resources, *gcs_client_);
std::shared_ptr<TaskResourceInstances> resource_instances =
std::make_shared<TaskResourceInstances>();
absl::flat_hash_map<std::string, double> allocating_resource;
@ -442,7 +442,7 @@ TEST_F(NewPlacementGroupResourceManagerTest, TestCommiteResourceBatched) {
allocating_resource, resource_instances));
auto remaining_resource_instance =
remaining_resource_scheduler->GetClusterResourceManager().GetNodeResources(
"remaining");
scheduling::NodeID("remaining"));
RAY_LOG(INFO) << "The current local resource view: "
<< cluster_resource_scheduler_->DebugString();
CheckRemainingResourceCorrect(remaining_resource_instance);
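The test pattern used throughout this file is unchanged, only spelled with typed IDs: build a second throwaway scheduler that holds the expected remaining resources under a dummy node, then compare NodeResources by value. A condensed sketch of that pattern; the "remaining" node name and the CPU value are just test data.

// Reference scheduler holding the expected post-commit/post-return state.
absl::flat_hash_map<std::string, double> expected = {{"CPU", 2.0}};
auto reference = std::make_shared<ClusterResourceScheduler>(
    scheduling::NodeID("remaining"), expected, *gcs_client_);
auto expected_view = reference->GetClusterResourceManager().GetNodeResources(
    scheduling::NodeID("remaining"));
// Internally does ASSERT_TRUE(local_node_resources == expected_view).
CheckRemainingResourceCorrect(expected_view);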


@ -18,6 +18,7 @@
#include "ray/common/task/scheduling_resources.h"
namespace ray {
using namespace ::ray::scheduling;
const std::string resource_labels[] = {
ray::kCPU_ResourceLabel, ray::kMemory_ResourceLabel, ray::kGPU_ResourceLabel,
@ -89,7 +90,6 @@ std::vector<double> VectorFixedPointToVectorDouble(
/// Convert a map of resources to a ResourceRequest data structure.
ResourceRequest ResourceMapToResourceRequest(
StringIdMap &string_to_int_map,
const absl::flat_hash_map<std::string, double> &resource_map,
bool requires_object_store_memory) {
ResourceRequest resource_request;
@ -107,8 +107,8 @@ ResourceRequest ResourceMapToResourceRequest(
} else if (resource.first == ray::kMemory_ResourceLabel) {
resource_request.predefined_resources[MEM] = resource.second;
} else {
int64_t id = string_to_int_map.Insert(resource.first);
resource_request.custom_resources[id] = resource.second;
resource_request.custom_resources[ResourceID(resource.first).ToInt()] =
resource.second;
}
}
@ -116,7 +116,7 @@ ResourceRequest ResourceMapToResourceRequest(
}
const std::vector<FixedPoint> &TaskResourceInstances::Get(
const std::string &resource_name, const StringIdMap &string_id_map) const {
const std::string &resource_name) const {
if (ray::kCPU_ResourceLabel == resource_name) {
return predefined_resources[CPU];
} else if (ray::kGPU_ResourceLabel == resource_name) {
@ -126,8 +126,7 @@ const std::vector<FixedPoint> &TaskResourceInstances::Get(
} else if (ray::kMemory_ResourceLabel == resource_name) {
return predefined_resources[MEM];
} else {
int64_t resource_id = string_id_map.Get(resource_name);
auto it = custom_resources.find(resource_id);
auto it = custom_resources.find(ResourceID(resource_name).ToInt());
RAY_CHECK(it != custom_resources.end());
return it->second;
}
@ -162,7 +161,6 @@ ResourceRequest TaskResourceInstances::ToResourceRequest() const {
///
/// \request Conversion result to a ResourceRequest data structure.
NodeResources ResourceMapToNodeResources(
StringIdMap &string_to_int_map,
const absl::flat_hash_map<std::string, double> &resource_map_total,
const absl::flat_hash_map<std::string, double> &resource_map_available) {
NodeResources node_resources;
@ -191,7 +189,7 @@ NodeResources ResourceMapToNodeResources(
node_resources.predefined_resources[MEM] = resource_capacity;
} else {
// This is a custom resource.
node_resources.custom_resources.emplace(string_to_int_map.Insert(resource.first),
node_resources.custom_resources.emplace(ResourceID(resource.first).ToInt(),
resource_capacity);
}
}
@ -325,7 +323,7 @@ bool NodeResources::operator==(const NodeResources &other) {
bool NodeResources::operator!=(const NodeResources &other) { return !(*this == other); }
std::string NodeResources::DebugString(StringIdMap string_to_in_map) const {
std::string NodeResources::DebugString() const {
std::stringstream buffer;
buffer << " {\n";
for (size_t i = 0; i < this->predefined_resources.size(); i++) {
@ -352,14 +350,14 @@ std::string NodeResources::DebugString(StringIdMap string_to_in_map) const {
}
for (auto it = this->custom_resources.begin(); it != this->custom_resources.end();
++it) {
buffer << "\t" << string_to_in_map.Get(it->first) << ":(" << it->second.total << ":"
buffer << "\t" << ResourceID(it->first).Binary() << ":(" << it->second.total << ":"
<< it->second.available << ")\n";
}
buffer << "}" << std::endl;
return buffer.str();
}
std::string NodeResources::DictString(StringIdMap string_to_in_map) const {
std::string NodeResources::DictString() const {
std::stringstream buffer;
bool first = true;
buffer << "{";
@ -397,7 +395,7 @@ std::string NodeResources::DictString(StringIdMap string_to_in_map) const {
}
for (auto it = this->custom_resources.begin(); it != this->custom_resources.end();
++it) {
auto name = string_to_in_map.Get(it->first);
auto name = ResourceID(it->first).Binary();
buffer << ", " << format_resource(name, it->second.available.Double()) << "/"
<< format_resource(name, it->second.total.Double());
buffer << " " << name;
@ -445,7 +443,7 @@ void TaskResourceInstances::ClearCPUInstances() {
}
}
std::string NodeResourceInstances::DebugString(StringIdMap string_to_int_map) const {
std::string NodeResourceInstances::DebugString() const {
std::stringstream buffer;
buffer << "{\n";
for (size_t i = 0; i < this->predefined_resources.size(); i++) {
@ -472,7 +470,7 @@ std::string NodeResourceInstances::DebugString(StringIdMap string_to_int_map) co
}
for (auto it = this->custom_resources.begin(); it != this->custom_resources.end();
++it) {
buffer << "\t" << string_to_int_map.Get(it->first) << ":("
buffer << "\t" << ResourceID(it->first).Binary() << ":("
<< VectorToString(it->second.total) << ":"
<< VectorToString(it->second.available) << ")\n";
}
@ -545,7 +543,7 @@ bool TaskResourceInstances::IsEmpty() const {
return true;
}
std::string TaskResourceInstances::DebugString(const StringIdMap &string_id_map) const {
std::string TaskResourceInstances::DebugString() const {
std::stringstream buffer;
buffer << std::endl << " Allocation: {";
for (size_t i = 0; i < this->predefined_resources.size(); i++) {
@ -556,7 +554,7 @@ std::string TaskResourceInstances::DebugString(const StringIdMap &string_id_map)
buffer << " [";
for (auto it = this->custom_resources.begin(); it != this->custom_resources.end();
++it) {
buffer << string_id_map.Get(it->first) << ":" << VectorToString(it->second) << ", ";
buffer << ResourceID(it->first).Binary() << ":" << VectorToString(it->second) << ", ";
}
buffer << "]" << std::endl;


@ -27,9 +27,6 @@
namespace ray {
/// List of predefined resources.
enum PredefinedResources { CPU, MEM, GPU, OBJECT_STORE_MEM, PredefinedResources_MAX };
const std::string ResourceEnumToString(PredefinedResources resource);
const PredefinedResources ResourceStringToEnum(const std::string &resource);
@ -84,8 +81,7 @@ class TaskResourceInstances {
absl::flat_hash_map<int64_t, std::vector<FixedPoint>> custom_resources;
bool operator==(const TaskResourceInstances &other);
/// Get instances based on the string.
const std::vector<FixedPoint> &Get(const std::string &resource_name,
const StringIdMap &string_id_map) const;
const std::vector<FixedPoint> &Get(const std::string &resource_name) const;
/// For each resource of this request aggregate its instances.
ResourceRequest ToResourceRequest() const;
/// Get CPU instances only.
@ -138,7 +134,7 @@ class TaskResourceInstances {
/// Check whether there are no resource instances.
bool IsEmpty() const;
/// Returns human-readable string for these resources.
[[nodiscard]] std::string DebugString(const StringIdMap &string_id_map) const;
[[nodiscard]] std::string DebugString() const;
};
/// Total and available capacities of each resource of a node.
@ -170,9 +166,9 @@ class NodeResources {
bool operator==(const NodeResources &other);
bool operator!=(const NodeResources &other);
/// Returns human-readable string for these resources.
std::string DebugString(StringIdMap string_to_int_map) const;
std::string DebugString() const;
/// Returns compact dict-like string.
std::string DictString(StringIdMap string_to_int_map) const;
std::string DictString() const;
};
/// Total and available capacities of each resource instance.
@ -189,7 +185,7 @@ class NodeResourceInstances {
/// Returns if this equals another node resources.
bool operator==(const NodeResourceInstances &other);
/// Returns human-readable string for these resources.
[[nodiscard]] std::string DebugString(StringIdMap string_to_int_map) const;
[[nodiscard]] std::string DebugString() const;
};
struct Node {
@ -211,13 +207,11 @@ struct Node {
/// \request Conversion result to a ResourceRequest data structure.
NodeResources ResourceMapToNodeResources(
StringIdMap &string_to_int_map,
const absl::flat_hash_map<std::string, double> &resource_map_total,
const absl::flat_hash_map<std::string, double> &resource_map_available);
/// Convert a map of resources to a ResourceRequest data structure.
ResourceRequest ResourceMapToResourceRequest(
StringIdMap &string_to_int_map,
const absl::flat_hash_map<std::string, double> &resource_map,
bool requires_object_store_memory);


@ -22,22 +22,21 @@
namespace ray {
ClusterResourceManager::ClusterResourceManager(StringIdMap &string_to_int_map)
: nodes_{}, string_to_int_map_{string_to_int_map} {}
ClusterResourceManager::ClusterResourceManager() : nodes_{} {}
void ClusterResourceManager::AddOrUpdateNode(
const std::string &node_id,
scheduling::NodeID node_id,
const absl::flat_hash_map<std::string, double> &resources_total,
const absl::flat_hash_map<std::string, double> &resources_available) {
NodeResources node_resources = ResourceMapToNodeResources(
string_to_int_map_, resources_total, resources_available);
AddOrUpdateNode(string_to_int_map_.Insert(node_id), node_resources);
NodeResources node_resources =
ResourceMapToNodeResources(resources_total, resources_available);
AddOrUpdateNode(node_id, node_resources);
}
void ClusterResourceManager::AddOrUpdateNode(int64_t node_id,
void ClusterResourceManager::AddOrUpdateNode(scheduling::NodeID node_id,
const NodeResources &node_resources) {
RAY_LOG(DEBUG) << "Update node info, node_id: " << node_id << ", node_resources: "
<< node_resources.DebugString(string_to_int_map_);
RAY_LOG(DEBUG) << "Update node info, node_id: " << node_id.ToInt()
<< ", node_resources: " << node_resources.DebugString();
auto it = nodes_.find(node_id);
if (it == nodes_.end()) {
// This node is new, so add it to the map.
@ -48,17 +47,16 @@ void ClusterResourceManager::AddOrUpdateNode(int64_t node_id,
}
}
bool ClusterResourceManager::UpdateNode(const std::string &node_id_string,
bool ClusterResourceManager::UpdateNode(scheduling::NodeID node_id,
const rpc::ResourcesData &resource_data) {
auto node_id = string_to_int_map_.Insert(node_id_string);
if (!nodes_.contains(node_id)) {
return false;
}
auto resources_total = MapFromProtobuf(resource_data.resources_total());
auto resources_available = MapFromProtobuf(resource_data.resources_available());
NodeResources node_resources = ResourceMapToNodeResources(
string_to_int_map_, resources_total, resources_available);
NodeResources node_resources =
ResourceMapToNodeResources(resources_total, resources_available);
NodeResources local_view;
RAY_CHECK(GetNodeResources(node_id, &local_view));
@ -88,7 +86,7 @@ bool ClusterResourceManager::UpdateNode(const std::string &node_id_string,
return true;
}
bool ClusterResourceManager::RemoveNode(int64_t node_id) {
bool ClusterResourceManager::RemoveNode(scheduling::NodeID node_id) {
auto it = nodes_.find(node_id);
if (it == nodes_.end()) {
// Node not found.
@ -99,16 +97,7 @@ bool ClusterResourceManager::RemoveNode(int64_t node_id) {
}
}
bool ClusterResourceManager::RemoveNode(const std::string &node_id_string) {
auto node_id = string_to_int_map_.Get(node_id_string);
if (node_id == -1) {
return false;
}
return RemoveNode(node_id);
}
bool ClusterResourceManager::GetNodeResources(int64_t node_id,
bool ClusterResourceManager::GetNodeResources(scheduling::NodeID node_id,
NodeResources *ret_resources) const {
auto it = nodes_.find(node_id);
if (it != nodes_.end()) {
@ -120,28 +109,24 @@ bool ClusterResourceManager::GetNodeResources(int64_t node_id,
}
const NodeResources &ClusterResourceManager::GetNodeResources(
const std::string &node_name) const {
int64_t node_id = string_to_int_map_.Get(node_name);
scheduling::NodeID node_id) const {
const auto &node = map_find_or_die(nodes_, node_id);
return node.GetLocalView();
}
int64_t ClusterResourceManager::NumNodes() const { return nodes_.size(); }
void ClusterResourceManager::UpdateResourceCapacity(const std::string &node_id_string,
const std::string &resource_name,
void ClusterResourceManager::UpdateResourceCapacity(scheduling::NodeID node_id,
scheduling::ResourceID resource_id,
double resource_total) {
int64_t node_id = string_to_int_map_.Get(node_id_string);
auto it = nodes_.find(node_id);
if (it == nodes_.end()) {
NodeResources node_resources;
node_resources.predefined_resources.resize(PredefinedResources_MAX);
node_id = string_to_int_map_.Insert(node_id_string);
it = nodes_.emplace(node_id, node_resources).first;
}
int idx = GetPredefinedResourceIndex(resource_name);
int idx = GetPredefinedResourceIndex(resource_id);
auto local_view = it->second.GetMutableLocalView();
FixedPoint resource_total_fp(resource_total);
@ -156,9 +141,7 @@ void ClusterResourceManager::UpdateResourceCapacity(const std::string &node_id_s
local_view->predefined_resources[idx].total = 0;
}
} else {
string_to_int_map_.Insert(resource_name);
int64_t resource_id = string_to_int_map_.Get(resource_name);
auto itr = local_view->custom_resources.find(resource_id);
auto itr = local_view->custom_resources.find(resource_id.ToInt());
if (itr != local_view->custom_resources.end()) {
auto diff_capacity = resource_total_fp - itr->second.total;
itr->second.total += diff_capacity;
@ -172,29 +155,26 @@ void ClusterResourceManager::UpdateResourceCapacity(const std::string &node_id_s
} else {
ResourceCapacity resource_capacity;
resource_capacity.total = resource_capacity.available = resource_total_fp;
local_view->custom_resources.emplace(resource_id, resource_capacity);
local_view->custom_resources.emplace(resource_id.ToInt(), resource_capacity);
}
}
}
void ClusterResourceManager::DeleteResource(const std::string &node_id_string,
const std::string &resource_name) {
int64_t node_id = string_to_int_map_.Get(node_id_string);
void ClusterResourceManager::DeleteResource(scheduling::NodeID node_id,
scheduling::ResourceID resource_id) {
auto it = nodes_.find(node_id);
if (it == nodes_.end()) {
return;
}
int idx = GetPredefinedResourceIndex(resource_name);
int idx = GetPredefinedResourceIndex(resource_id);
auto local_view = it->second.GetMutableLocalView();
if (idx != -1) {
local_view->predefined_resources[idx].available = 0;
local_view->predefined_resources[idx].total = 0;
} else {
int64_t resource_id = string_to_int_map_.Get(resource_name);
auto itr = local_view->custom_resources.find(resource_id);
auto itr = local_view->custom_resources.find(resource_id.ToInt());
if (itr != local_view->custom_resources.end()) {
local_view->custom_resources.erase(itr);
}
@ -202,33 +182,22 @@ void ClusterResourceManager::DeleteResource(const std::string &node_id_string,
}
std::string ClusterResourceManager::GetNodeResourceViewString(
const std::string &node_name) const {
int64_t node_id = string_to_int_map_.Get(node_name);
scheduling::NodeID node_id) const {
const auto &node = map_find_or_die(nodes_, node_id);
return node.GetLocalView().DictString(string_to_int_map_);
return node.GetLocalView().DictString();
}
std::string ClusterResourceManager::GetResourceNameFromIndex(int64_t res_idx) {
if (res_idx == CPU) {
return ray::kCPU_ResourceLabel;
} else if (res_idx == GPU) {
return ray::kGPU_ResourceLabel;
} else if (res_idx == OBJECT_STORE_MEM) {
return ray::kObjectStoreMemory_ResourceLabel;
} else if (res_idx == MEM) {
return ray::kMemory_ResourceLabel;
} else {
return string_to_int_map_.Get((uint64_t)res_idx);
}
return scheduling::ResourceID(res_idx).Binary();
}
const absl::flat_hash_map<int64_t, Node> &ClusterResourceManager::GetResourceView()
const {
const absl::flat_hash_map<scheduling::NodeID, Node>
&ClusterResourceManager::GetResourceView() const {
return nodes_;
}
bool ClusterResourceManager::SubtractNodeAvailableResources(
int64_t node_id, const ResourceRequest &resource_request) {
scheduling::NodeID node_id, const ResourceRequest &resource_request) {
auto it = nodes_.find(node_id);
if (it == nodes_.end()) {
return false;
@ -260,7 +229,7 @@ bool ClusterResourceManager::SubtractNodeAvailableResources(
}
bool ClusterResourceManager::HasSufficientResource(
int64_t node_id, const ResourceRequest &resource_request,
scheduling::NodeID node_id, const ResourceRequest &resource_request,
bool ignore_object_store_memory_requirement) const {
auto it = nodes_.find(node_id);
if (it == nodes_.end()) {
@ -305,8 +274,8 @@ bool ClusterResourceManager::HasSufficientResource(
void ClusterResourceManager::DebugString(std::stringstream &buffer) const {
for (auto &node : GetResourceView()) {
buffer << "node id: " << node.first;
buffer << node.second.GetLocalView().DebugString(string_to_int_map_);
buffer << "node id: " << node.first.ToInt();
buffer << node.second.GetLocalView().DebugString();
}
}
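After this change the manager's public surface is entirely in terms of typed IDs, and it can be default-constructed since it no longer holds a StringIdMap reference. A rough usage sketch using only methods defined above; note that UpdateResourceCapacity creates the node entry if it does not exist yet, and "node-1" / "my_custom_resource" are example data.

ClusterResourceManager manager;  // no StringIdMap to pass in any more
auto node = scheduling::NodeID("node-1");
manager.UpdateResourceCapacity(node, scheduling::ResourceID("CPU"), 8);
manager.UpdateResourceCapacity(node, scheduling::ResourceID("my_custom_resource"), 2);
RAY_LOG(INFO) << manager.GetNodeResourceViewString(node);
manager.DeleteResource(node, scheduling::ResourceID("my_custom_resource"));
manager.RemoveNode(node);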


@ -37,10 +37,10 @@ class ClusterTaskManagerTest;
/// This class is not thread safe.
class ClusterResourceManager {
public:
explicit ClusterResourceManager(StringIdMap &string_to_int_map);
explicit ClusterResourceManager();
/// Get the resource view of the cluster.
const absl::flat_hash_map<int64_t, Node> &GetResourceView() const;
const absl::flat_hash_map<scheduling::NodeID, Node> &GetResourceView() const;
// Mapping from predefined resource indexes to resource strings
std::string GetResourceNameFromIndex(int64_t res_idx);
@ -49,14 +49,13 @@ class ClusterResourceManager {
///
/// \param node_id ID of the node whose resources need to be updated.
/// \param resource_data The node resource data.
bool UpdateNode(const std::string &node_id_string,
const rpc::ResourcesData &resource_data);
bool UpdateNode(scheduling::NodeID node_id, const rpc::ResourcesData &resource_data);
/// Remove node from the cluster data structure. This happens
/// when a node fails or it is removed from the cluster.
///
/// \param node_id_string ID of the node to be removed.
bool RemoveNode(const std::string &node_id_string);
bool RemoveNode(scheduling::NodeID node_id);
/// Get number of nodes in the cluster.
int64_t NumNodes() const;
@ -66,24 +65,24 @@ class ClusterResourceManager {
/// \param node_name: Node whose resource we want to update.
/// \param resource_name: Resource which we want to update.
/// \param resource_total: New capacity of the resource.
void UpdateResourceCapacity(const std::string &node_name,
const std::string &resource_name, double resource_total);
void UpdateResourceCapacity(scheduling::NodeID node_id,
scheduling::ResourceID resource_id, double resource_total);
/// Delete a given resource from a given node.
///
/// \param node_name: Node whose resource we want to delete.
/// \param resource_name: Resource we want to delete
void DeleteResource(const std::string &node_name, const std::string &resource_name);
void DeleteResource(scheduling::NodeID node_id, scheduling::ResourceID resource_id);
/// Return local resources in human-readable string form.
std::string GetNodeResourceViewString(const std::string &node_name) const;
std::string GetNodeResourceViewString(scheduling::NodeID node_id) const;
/// Get local resource.
const NodeResources &GetNodeResources(const std::string &node_name) const;
const NodeResources &GetNodeResources(scheduling::NodeID node_id) const;
/// Subtract available resource from a given node.
//// Return false if such node doesn't exist.
bool SubtractNodeAvailableResources(int64_t node_id,
bool SubtractNodeAvailableResources(scheduling::NodeID node_id,
const ResourceRequest &resource_request);
/// Check if we have sufficient resources to fulfill the resource request for a given node.
@ -92,7 +91,8 @@ class ClusterResourceManager {
/// \param resource_request: the request we want to check.
/// \param ignore_object_store_memory_requirement: if true, we will ignore the
/// require_object_store_memory in the resource_request.
bool HasSufficientResource(int64_t node_id, const ResourceRequest &resource_request,
bool HasSufficientResource(scheduling::NodeID node_id,
const ResourceRequest &resource_request,
bool ignore_object_store_memory_requirement) const;
void DebugString(std::stringstream &buffer) const;
@ -104,29 +104,20 @@ class ClusterResourceManager {
///
/// \param node_id: Node ID.
/// \param node_resources: Up to date total and available resources of the node.
void AddOrUpdateNode(int64_t node_id, const NodeResources &node_resources);
void AddOrUpdateNode(scheduling::NodeID node_id, const NodeResources &node_resources);
void AddOrUpdateNode(
const std::string &node_id,
scheduling::NodeID node_id,
const absl::flat_hash_map<std::string, double> &resource_map_total,
const absl::flat_hash_map<std::string, double> &resource_map_available);
/// Remove node from the cluster data structure. This happens
/// when a node fails or it is removed from the cluster.
///
/// \param node_id ID of the node to be removed.
bool RemoveNode(int64_t node_id);
/// Return resources associated to the given node_id in ret_resources.
/// If node_id not found, return false; otherwise return true.
bool GetNodeResources(int64_t node_id, NodeResources *ret_resources) const;
bool GetNodeResources(scheduling::NodeID node_id, NodeResources *ret_resources) const;
/// List of nodes in the clusters and their resources organized as a map.
/// The key of the map is the node ID.
absl::flat_hash_map<int64_t, Node> nodes_;
/// Keep the mapping between node and resource IDs in string representation
/// to integer representation. Used for improving map performance.
StringIdMap &string_to_int_map_;
absl::flat_hash_map<scheduling::NodeID, Node> nodes_;
friend class ClusterResourceSchedulerTest;
friend struct ClusterResourceManagerTest;


@ -18,14 +18,14 @@
namespace ray {
NodeResources CreateNodeResources(StringIdMap &map, double available_cpu,
double total_cpu, double available_custom_resource = 0,
NodeResources CreateNodeResources(double available_cpu, double total_cpu,
double available_custom_resource = 0,
double total_custom_resource = 0,
bool object_pulls_queued = false) {
NodeResources resources;
resources.predefined_resources = {{available_cpu, total_cpu}, {0, 0}, {0, 0}, {0, 0}};
resources.custom_resources[map.Insert("CUSTOM")] = {available_custom_resource,
total_custom_resource};
resources.custom_resources[scheduling::ResourceID("CUSTOM").ToInt()] = {
available_custom_resource, total_custom_resource};
resources.object_pulls_queued = object_pulls_queued;
return resources;
}
@ -33,22 +33,21 @@ NodeResources CreateNodeResources(StringIdMap &map, double available_cpu,
struct ClusterResourceManagerTest : public ::testing::Test {
void SetUp() {
::testing::Test::SetUp();
manager = std::make_unique<ClusterResourceManager>(map);
manager = std::make_unique<ClusterResourceManager>();
manager->AddOrUpdateNode(node0,
CreateNodeResources(/*available_cpu*/ 1, /*total_cpu*/ 1));
manager->AddOrUpdateNode(
node0, CreateNodeResources(map, /*available_cpu*/ 1, /*total_cpu*/ 1));
manager->AddOrUpdateNode(
node1, CreateNodeResources(map, /*available_cpu*/ 0, /*total_cpu*/ 0,
node1, CreateNodeResources(/*available_cpu*/ 0, /*total_cpu*/ 0,
/*available_custom*/ 1, /*total_custom*/ 1));
manager->AddOrUpdateNode(
node2, CreateNodeResources(map, /*available_cpu*/ 1, /*total_cpu*/ 1,
node2, CreateNodeResources(/*available_cpu*/ 1, /*total_cpu*/ 1,
/*available_custom*/ 1, /*total_custom*/ 1,
/*object_pulls_queued*/ true));
}
StringIdMap map;
int64_t node0 = 0;
int64_t node1 = 1;
int64_t node2 = 2;
int64_t node3 = 3;
scheduling::NodeID node0 = scheduling::NodeID(0);
scheduling::NodeID node1 = scheduling::NodeID(1);
scheduling::NodeID node2 = scheduling::NodeID(2);
scheduling::NodeID node3 = scheduling::NodeID(3);
std::unique_ptr<ClusterResourceManager> manager;
};
@ -57,32 +56,32 @@ TEST_F(ClusterResourceManagerTest, HasSufficientResourceTest) {
node3, {}, /*ignore_object_store_memory_requirement*/ false));
ASSERT_TRUE(manager->HasSufficientResource(
node0,
ResourceMapToResourceRequest(map, {{"CPU", 1}},
ResourceMapToResourceRequest({{"CPU", 1}},
/*requires_object_store_memory=*/true),
/*ignore_object_store_memory_requirement*/ false));
ASSERT_FALSE(manager->HasSufficientResource(
node0,
ResourceMapToResourceRequest(map, {{"CUSTOM", 1}},
ResourceMapToResourceRequest({{"CUSTOM", 1}},
/*requires_object_store_memory=*/true),
/*ignore_object_store_memory_requirement*/ false));
ASSERT_TRUE(manager->HasSufficientResource(
node1,
ResourceMapToResourceRequest(map, {{"CUSTOM", 1}},
ResourceMapToResourceRequest({{"CUSTOM", 1}},
/*requires_object_store_memory=*/true),
/*ignore_object_store_memory_requirement*/ false));
ASSERT_TRUE(manager->HasSufficientResource(
node2,
ResourceMapToResourceRequest(map, {{"CPU", 1}},
ResourceMapToResourceRequest({{"CPU", 1}},
/*requires_object_store_memory=*/false),
/*ignore_object_store_memory_requirement*/ false));
ASSERT_FALSE(manager->HasSufficientResource(
node2,
ResourceMapToResourceRequest(map, {{"CPU", 1}},
ResourceMapToResourceRequest({{"CPU", 1}},
/*requires_object_store_memory=*/true),
/*ignore_object_store_memory_requirement*/ false));
ASSERT_TRUE(manager->HasSufficientResource(
node2,
ResourceMapToResourceRequest(map, {{"CPU", 1}},
ResourceMapToResourceRequest({{"CPU", 1}},
/*requires_object_store_memory=*/true),
/*ignore_object_store_memory_requirement*/ true));
}
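The fixture no longer needs its own StringIdMap: predefined_resources stays indexed by the PredefinedResources enum, and after this PR those enum values are also the integer forms of the matching ResourceIDs (the scheduler test further below asserts exactly that). A short sketch of how the CreateNodeResources helper above is meant to be used:

// Slot 0 of predefined_resources is CPU; "CUSTOM" is interned on demand via
// scheduling::ResourceID("CUSTOM").ToInt(), no shared map required.
NodeResources resources =
    CreateNodeResources(/*available_cpu*/ 2, /*total_cpu*/ 4,
                        /*available_custom_resource*/ 1, /*total_custom_resource*/ 1);
manager->AddOrUpdateNode(node3, resources);
ASSERT_EQ(ray::kCPU_ResourceLabel, scheduling::ResourceID(CPU).Binary());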


@ -21,25 +21,13 @@
namespace ray {
namespace {
// Add predefined resource in string_to_int_map to
// avoid conflict.
void PopulatePredefinedResources(StringIdMap &string_to_int_map) {
string_to_int_map.InsertOrDie(ray::kCPU_ResourceLabel, CPU)
.InsertOrDie(ray::kGPU_ResourceLabel, GPU)
.InsertOrDie(ray::kObjectStoreMemory_ResourceLabel, OBJECT_STORE_MEM)
.InsertOrDie(ray::kMemory_ResourceLabel, MEM);
}
} // namespace
ClusterResourceScheduler::ClusterResourceScheduler() {
PopulatePredefinedResources(string_to_int_map_);
cluster_resource_manager_ =
std::make_unique<ClusterResourceManager>(string_to_int_map_);
ClusterResourceScheduler::ClusterResourceScheduler()
: local_node_id_(scheduling::NodeID::Nil()) {
cluster_resource_manager_ = std::make_unique<ClusterResourceManager>();
NodeResources node_resources;
node_resources.predefined_resources.resize(PredefinedResources_MAX);
local_resource_manager_ = std::make_unique<LocalResourceManager>(
local_node_id_, string_to_int_map_, node_resources,
local_node_id_, node_resources,
/*get_used_object_store_memory*/ nullptr, /*get_pull_manager_at_capacity*/ nullptr,
[&](const NodeResources &local_resource_update) {
cluster_resource_manager_->AddOrUpdateNode(local_node_id_, local_resource_update);
@ -49,14 +37,12 @@ ClusterResourceScheduler::ClusterResourceScheduler() {
}
ClusterResourceScheduler::ClusterResourceScheduler(
int64_t local_node_id, const NodeResources &local_node_resources,
scheduling::NodeID local_node_id, const NodeResources &local_node_resources,
gcs::GcsClient &gcs_client)
: string_to_int_map_(), local_node_id_(local_node_id), gcs_client_(&gcs_client) {
PopulatePredefinedResources(string_to_int_map_);
cluster_resource_manager_ =
std::make_unique<ClusterResourceManager>(string_to_int_map_);
: local_node_id_(local_node_id), gcs_client_(&gcs_client) {
cluster_resource_manager_ = std::make_unique<ClusterResourceManager>();
local_resource_manager_ = std::make_unique<LocalResourceManager>(
local_node_id, string_to_int_map_, local_node_resources,
local_node_id, local_node_resources,
/*get_used_object_store_memory*/ nullptr, /*get_pull_manager_at_capacity*/ nullptr,
[&](const NodeResources &local_resource_update) {
cluster_resource_manager_->AddOrUpdateNode(local_node_id_, local_resource_update);
@ -67,19 +53,16 @@ ClusterResourceScheduler::ClusterResourceScheduler(
}
ClusterResourceScheduler::ClusterResourceScheduler(
const std::string &local_node_id,
scheduling::NodeID local_node_id,
const absl::flat_hash_map<std::string, double> &local_node_resources,
gcs::GcsClient &gcs_client, std::function<int64_t(void)> get_used_object_store_memory,
std::function<bool(void)> get_pull_manager_at_capacity)
: string_to_int_map_(), local_node_id_(), gcs_client_(&gcs_client) {
PopulatePredefinedResources(string_to_int_map_);
local_node_id_ = string_to_int_map_.Insert(local_node_id);
NodeResources node_resources = ResourceMapToNodeResources(
string_to_int_map_, local_node_resources, local_node_resources);
cluster_resource_manager_ =
std::make_unique<ClusterResourceManager>(string_to_int_map_);
: local_node_id_(local_node_id), gcs_client_(&gcs_client) {
NodeResources node_resources =
ResourceMapToNodeResources(local_node_resources, local_node_resources);
cluster_resource_manager_ = std::make_unique<ClusterResourceManager>();
local_resource_manager_ = std::make_unique<LocalResourceManager>(
local_node_id_, string_to_int_map_, node_resources, get_used_object_store_memory,
local_node_id_, node_resources, get_used_object_store_memory,
get_pull_manager_at_capacity, [&](const NodeResources &local_resource_update) {
cluster_resource_manager_->AddOrUpdateNode(local_node_id_, local_resource_update);
});
@ -88,19 +71,18 @@ ClusterResourceScheduler::ClusterResourceScheduler(
local_node_id_, cluster_resource_manager_->GetResourceView());
}
bool ClusterResourceScheduler::NodeAlive(int64_t node_id) const {
bool ClusterResourceScheduler::NodeAlive(scheduling::NodeID node_id) const {
if (node_id == local_node_id_) {
return true;
}
if (node_id == -1) {
if (node_id.IsNil()) {
return false;
}
auto node_id_binary = string_to_int_map_.Get(node_id);
return gcs_client_->Nodes().Get(NodeID::FromBinary(node_id_binary)) != nullptr;
return gcs_client_->Nodes().Get(NodeID::FromBinary(node_id.Binary())) != nullptr;
}
bool ClusterResourceScheduler::IsSchedulable(const ResourceRequest &resource_request,
int64_t node_id) const {
scheduling::NodeID node_id) const {
// It's okay if the local node's pull manager is at capacity because we
// will eventually spill the task back from the waiting queue if its args
// cannot be pulled.
@ -109,7 +91,7 @@ bool ClusterResourceScheduler::IsSchedulable(const ResourceRequest &resource_req
/*ignore_object_store_memory_requirement*/ node_id == local_node_id_);
}
int64_t ClusterResourceScheduler::GetBestSchedulableNode(
scheduling::NodeID ClusterResourceScheduler::GetBestSchedulableNode(
const ResourceRequest &resource_request,
const rpc::SchedulingStrategy &scheduling_strategy, bool actor_creation,
bool force_spillback, int64_t *total_violations, bool *is_infeasible) {
@ -120,7 +102,7 @@ int64_t ClusterResourceScheduler::GetBestSchedulableNode(
resource_request, [this](auto node_id) { return this->NodeAlive(node_id); });
}
int64_t best_node_id = -1;
auto best_node_id = scheduling::NodeID::Nil();
if (scheduling_strategy.scheduling_strategy_case() ==
rpc::SchedulingStrategy::SchedulingStrategyCase::kSpreadSchedulingStrategy) {
best_node_id = scheduling_policy_->SpreadPolicy(
@ -135,7 +117,7 @@ int64_t ClusterResourceScheduler::GetBestSchedulableNode(
[this](auto node_id) { return this->NodeAlive(node_id); });
}
*is_infeasible = best_node_id == -1 ? true : false;
*is_infeasible = best_node_id.IsNil();
if (!*is_infeasible) {
// TODO (Alex): Support soft constraints if needed later.
*total_violations = 0;
@ -143,35 +125,26 @@ int64_t ClusterResourceScheduler::GetBestSchedulableNode(
RAY_LOG(DEBUG) << "Scheduling decision. "
<< "forcing spillback: " << force_spillback
<< ". Best node: " << best_node_id << " "
<< (string_to_int_map_.Get(best_node_id) == "-1"
? NodeID::Nil()
: NodeID::FromBinary(string_to_int_map_.Get(best_node_id)))
<< ". Best node: " << best_node_id.ToInt() << " "
<< (best_node_id.IsNil() ? NodeID::Nil()
: NodeID::FromBinary(best_node_id.Binary()))
<< ", is infeasible: " << *is_infeasible;
return best_node_id;
}
std::string ClusterResourceScheduler::GetBestSchedulableNode(
scheduling::NodeID ClusterResourceScheduler::GetBestSchedulableNode(
const absl::flat_hash_map<std::string, double> &task_resources,
const rpc::SchedulingStrategy &scheduling_strategy, bool requires_object_store_memory,
bool actor_creation, bool force_spillback, int64_t *total_violations,
bool *is_infeasible) {
ResourceRequest resource_request = ResourceMapToResourceRequest(
string_to_int_map_, task_resources, requires_object_store_memory);
int64_t node_id =
GetBestSchedulableNode(resource_request, scheduling_strategy, actor_creation,
force_spillback, total_violations, is_infeasible);
if (node_id == -1) {
// This is not a schedulable node, so return empty string.
return "";
}
// Return the string name of the node.
return string_to_int_map_.Get(node_id);
ResourceRequest resource_request =
ResourceMapToResourceRequest(task_resources, requires_object_store_memory);
return GetBestSchedulableNode(resource_request, scheduling_strategy, actor_creation,
force_spillback, total_violations, is_infeasible);
}
bool ClusterResourceScheduler::SubtractRemoteNodeAvailableResources(
int64_t node_id, const ResourceRequest &resource_request) {
scheduling::NodeID node_id, const ResourceRequest &resource_request) {
RAY_CHECK(node_id != local_node_id_);
// Just double check this node can still schedule the resource request.
@ -182,46 +155,40 @@ bool ClusterResourceScheduler::SubtractRemoteNodeAvailableResources(
resource_request);
}
const StringIdMap &ClusterResourceScheduler::GetStringIdMap() const {
return string_to_int_map_;
}
std::string ClusterResourceScheduler::DebugString(void) const {
std::stringstream buffer;
buffer << "\nLocal id: " << local_node_id_;
buffer << "\nLocal id: " << local_node_id_.ToInt();
buffer << " Local resources: " << local_resource_manager_->DebugString();
cluster_resource_manager_->DebugString(buffer);
return buffer.str();
}
bool ClusterResourceScheduler::AllocateRemoteTaskResources(
const std::string &node_string,
scheduling::NodeID node_id,
const absl::flat_hash_map<std::string, double> &task_resources) {
ResourceRequest resource_request = ResourceMapToResourceRequest(
string_to_int_map_, task_resources, /*requires_object_store_memory=*/false);
auto node_id = string_to_int_map_.Insert(node_string);
task_resources, /*requires_object_store_memory=*/false);
RAY_CHECK(node_id != local_node_id_);
return SubtractRemoteNodeAvailableResources(node_id, resource_request);
}
bool ClusterResourceScheduler::IsSchedulableOnNode(
const std::string &node_name, const absl::flat_hash_map<std::string, double> &shape) {
int64_t node_id = string_to_int_map_.Get(node_name);
auto resource_request = ResourceMapToResourceRequest(
string_to_int_map_, shape, /*requires_object_store_memory=*/false);
scheduling::NodeID node_id, const absl::flat_hash_map<std::string, double> &shape) {
auto resource_request =
ResourceMapToResourceRequest(shape, /*requires_object_store_memory=*/false);
return IsSchedulable(resource_request, node_id);
}
std::string ClusterResourceScheduler::GetBestSchedulableNode(
scheduling::NodeID ClusterResourceScheduler::GetBestSchedulableNode(
const TaskSpecification &task_spec, bool prioritize_local_node,
bool exclude_local_node, bool requires_object_store_memory, bool *is_infeasible) {
// If the local node is available, we should directly return it instead of
// going through the full hybrid policy since we don't want spillback.
if (prioritize_local_node && !exclude_local_node &&
IsSchedulableOnNode(string_to_int_map_.Get(local_node_id_),
IsSchedulableOnNode(local_node_id_,
task_spec.GetRequiredResources().GetResourceMap())) {
*is_infeasible = false;
return string_to_int_map_.Get(local_node_id_);
return local_node_id_;
}
// This argument is used to set violation, which is an unsupported feature now.
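The scheduler now answers scheduling questions with a typed NodeID: the sentinel for "no feasible node" is scheduling::NodeID::Nil() rather than an empty string or -1, and callers convert back to a ray::NodeID only where one is actually needed (as the log statement above does). A caller-side sketch assuming the public GetBestSchedulableNode overload declared in the header below; task_spec and scheduler stand in for whatever the caller holds.

bool is_infeasible = false;
scheduling::NodeID target = scheduler.GetBestSchedulableNode(
    task_spec, /*prioritize_local_node=*/true, /*exclude_local_node=*/false,
    /*requires_object_store_memory=*/false, &is_infeasible);
if (target.IsNil()) {
  // No node in the cluster can satisfy this request right now.
} else {
  // Convert back to ray::NodeID only at the point where it is needed.
  NodeID raylet_id = NodeID::FromBinary(target.Binary());
  RAY_LOG(DEBUG) << "Best node: " << raylet_id;
}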


@ -50,18 +50,17 @@ class ClusterResourceScheduler {
/// \param local_node_id: ID of local node,
/// \param local_node_resources: The total and the available resources associated
/// with the local node.
ClusterResourceScheduler(int64_t local_node_id,
ClusterResourceScheduler(scheduling::NodeID local_node_id,
const NodeResources &local_node_resources,
gcs::GcsClient &gcs_client);
ClusterResourceScheduler(
const std::string &local_node_id,
scheduling::NodeID local_node_id,
const absl::flat_hash_map<std::string, double> &local_node_resources,
gcs::GcsClient &gcs_client,
std::function<int64_t(void)> get_used_object_store_memory = nullptr,
std::function<bool(void)> get_pull_manager_at_capacity = nullptr);
const StringIdMap &GetStringIdMap() const;
/// Find a node in the cluster on which we can schedule a given resource request.
/// In hybrid mode, see `scheduling_policy.h` for a description of the policy.
///
@ -76,10 +75,11 @@ class ClusterResourceScheduler {
///
/// \return scheduling::NodeID::Nil(), if no node can schedule the current request;
/// otherwise, return the ID of a node that can schedule the resource request.
std::string GetBestSchedulableNode(const TaskSpecification &task_spec,
bool prioritize_local_node, bool exclude_local_node,
bool requires_object_store_memory,
bool *is_infeasible);
scheduling::NodeID GetBestSchedulableNode(const TaskSpecification &task_spec,
bool prioritize_local_node,
bool exclude_local_node,
bool requires_object_store_memory,
bool *is_infeasible);
/// Subtract the resources required by a given resource request (resource_request) from
/// a given remote node.
@ -89,7 +89,7 @@ class ClusterResourceScheduler {
/// \return True if remote node has enough resources to satisfy the resource request.
/// False otherwise.
bool AllocateRemoteTaskResources(
const std::string &node_id,
scheduling::NodeID node_id,
const absl::flat_hash_map<std::string, double> &task_resources);
/// Return human-readable string for this scheduler state.
@ -100,7 +100,7 @@ class ClusterResourceScheduler {
///
/// \param node_name Name of the node.
/// \param shape The resource demand's shape.
bool IsSchedulableOnNode(const std::string &node_name,
bool IsSchedulableOnNode(scheduling::NodeID node_id,
const absl::flat_hash_map<std::string, double> &shape);
LocalResourceManager &GetLocalResourceManager() { return *local_resource_manager_; }
@ -109,7 +109,7 @@ class ClusterResourceScheduler {
}
private:
bool NodeAlive(int64_t node_id) const;
bool NodeAlive(scheduling::NodeID node_id) const;
/// Decrease the available resources of a node when a resource request is
/// scheduled on the given node.
@ -119,7 +119,7 @@ class ClusterResourceScheduler {
///
/// \return true, if resource_request can be indeed scheduled on the node,
/// and false otherwise.
bool SubtractRemoteNodeAvailableResources(int64_t node_id,
bool SubtractRemoteNodeAvailableResources(scheduling::NodeID node_id,
const ResourceRequest &resource_request);
/// Check whether a resource request can be scheduled given a node.
@ -128,7 +128,8 @@ class ClusterResourceScheduler {
/// \param node_id: ID of the node.
///
/// \return: Whether the request can be scheduled.
bool IsSchedulable(const ResourceRequest &resource_request, int64_t node_id) const;
bool IsSchedulable(const ResourceRequest &resource_request,
scheduling::NodeID node_id) const;
/// Find a node in the cluster on which we can schedule a given resource request.
/// In hybrid mode, see `scheduling_policy.h` for a description of the policy.
@ -145,10 +146,10 @@ class ClusterResourceScheduler {
///
/// \return -1, if no node can schedule the current request; otherwise,
/// return the ID of a node that can schedule the resource request.
int64_t GetBestSchedulableNode(const ResourceRequest &resource_request,
const rpc::SchedulingStrategy &scheduling_strategy,
bool actor_creation, bool force_spillback,
int64_t *violations, bool *is_infeasible);
scheduling::NodeID GetBestSchedulableNode(
const ResourceRequest &resource_request,
const rpc::SchedulingStrategy &scheduling_strategy, bool actor_creation,
bool force_spillback, int64_t *violations, bool *is_infeasible);
/// Similar to
/// int64_t GetBestSchedulableNode(...)
@ -156,17 +157,14 @@ class ClusterResourceScheduler {
/// \return "", if no node can schedule the current request; otherwise,
/// return the ID in string format of a node that can schedule the
// resource request.
std::string GetBestSchedulableNode(
scheduling::NodeID GetBestSchedulableNode(
const absl::flat_hash_map<std::string, double> &resource_request,
const rpc::SchedulingStrategy &scheduling_strategy,
bool requires_object_store_memory, bool actor_creation, bool force_spillback,
int64_t *violations, bool *is_infeasible);
/// Keep the mapping between node and resource IDs in string representation
/// to integer representation. Used for improving map performance.
StringIdMap string_to_int_map_;
/// Identifier of local node.
int64_t local_node_id_;
scheduling::NodeID local_node_id_;
/// Gcs client. It's not owned by this class.
gcs::GcsClient *gcs_client_;
/// Resources of local node.


@ -185,19 +185,19 @@ class ClusterResourceSchedulerTest : public ::testing::Test {
initNodeResources(node_resources, pred_capacities, cust_ids, cust_capacities);
resource_scheduler.GetClusterResourceManager().AddOrUpdateNode(i, node_resources);
resource_scheduler.GetClusterResourceManager().AddOrUpdateNode(
scheduling::NodeID(i), node_resources);
node_resources.custom_resources.clear();
}
}
void AssertPredefinedNodeResources(const ClusterResourceScheduler &resource_scheduler) {
ASSERT_EQ(resource_scheduler.string_to_int_map_.Get(ray::kCPU_ResourceLabel), CPU);
ASSERT_EQ(resource_scheduler.string_to_int_map_.Get(ray::kGPU_ResourceLabel), GPU);
ASSERT_EQ(
resource_scheduler.string_to_int_map_.Get(ray::kObjectStoreMemory_ResourceLabel),
OBJECT_STORE_MEM);
ASSERT_EQ(resource_scheduler.string_to_int_map_.Get(ray::kMemory_ResourceLabel), MEM);
void AssertPredefinedNodeResources() {
ASSERT_EQ(ray::kCPU_ResourceLabel, scheduling::ResourceID(CPU).Binary());
ASSERT_EQ(ray::kGPU_ResourceLabel, scheduling::ResourceID(GPU).Binary());
ASSERT_EQ(ray::kObjectStoreMemory_ResourceLabel,
scheduling::ResourceID(OBJECT_STORE_MEM).Binary());
ASSERT_EQ(ray::kMemory_ResourceLabel, scheduling::ResourceID(MEM).Binary());
}
std::unique_ptr<gcs::MockGcsClient> gcs_client_;
std::string node_name;
@ -313,7 +313,7 @@ TEST_F(ClusterResourceSchedulerTest, SchedulingIdInsertOrDieTest) {
TEST_F(ClusterResourceSchedulerTest, SchedulingInitClusterTest) {
int num_nodes = 10;
ClusterResourceScheduler resource_scheduler;
AssertPredefinedNodeResources(resource_scheduler);
AssertPredefinedNodeResources();
initCluster(resource_scheduler, num_nodes);
@ -327,7 +327,8 @@ TEST_F(ClusterResourceSchedulerTest, SchedulingDeleteClusterNodeTest) {
ClusterResourceScheduler resource_scheduler;
initCluster(resource_scheduler, num_nodes);
resource_scheduler.GetClusterResourceManager().RemoveNode(remove_id);
resource_scheduler.GetClusterResourceManager().RemoveNode(
scheduling::NodeID(remove_id));
ASSERT_TRUE(num_nodes - 1 == resource_scheduler.GetClusterResourceManager().NumNodes());
}
@ -361,19 +362,19 @@ TEST_F(ClusterResourceSchedulerTest, SchedulingModifyClusterNodeTest) {
cust_capacities.push_back(rand() % 10);
initNodeResources(node_resources, pred_capacities, cust_ids, cust_capacities);
resource_scheduler.GetClusterResourceManager().AddOrUpdateNode(update_id,
node_resources);
resource_scheduler.GetClusterResourceManager().AddOrUpdateNode(
scheduling::NodeID(update_id), node_resources);
}
ASSERT_TRUE(num_nodes == resource_scheduler.GetClusterResourceManager().NumNodes());
}
TEST_F(ClusterResourceSchedulerTest, SpreadSchedulingStrategyTest) {
absl::flat_hash_map<std::string, double> resource_total({{"CPU", 10}});
auto local_node_id = NodeID::FromRandom().Binary();
auto local_node_id = scheduling::NodeID(NodeID::FromRandom().Binary());
ClusterResourceScheduler resource_scheduler(local_node_id, resource_total,
*gcs_client_);
AssertPredefinedNodeResources(resource_scheduler);
auto remote_node_id = NodeID::FromRandom().Binary();
AssertPredefinedNodeResources();
auto remote_node_id = scheduling::NodeID(NodeID::FromRandom().Binary());
resource_scheduler.GetClusterResourceManager().AddOrUpdateNode(
remote_node_id, resource_total, resource_total);
@ -382,17 +383,17 @@ TEST_F(ClusterResourceSchedulerTest, SpreadSchedulingStrategyTest) {
bool is_infeasible;
rpc::SchedulingStrategy scheduling_strategy;
scheduling_strategy.mutable_spread_scheduling_strategy();
std::string node_id_1 = resource_scheduler.GetBestSchedulableNode(
auto node_id_1 = resource_scheduler.GetBestSchedulableNode(
resource_request, scheduling_strategy, false, false, false, &violations,
&is_infeasible);
absl::flat_hash_map<std::string, double> resource_available({{"CPU", 9}});
resource_scheduler.GetClusterResourceManager().AddOrUpdateNode(
node_id_1, resource_total, resource_available);
std::string node_id_2 = resource_scheduler.GetBestSchedulableNode(
auto node_id_2 = resource_scheduler.GetBestSchedulableNode(
resource_request, scheduling_strategy, false, false, false, &violations,
&is_infeasible);
ASSERT_EQ((std::set<std::string>{node_id_1, node_id_2}),
(std::set<std::string>{local_node_id, remote_node_id}));
ASSERT_EQ((std::set<scheduling::NodeID>{node_id_1, node_id_2}),
(std::set<scheduling::NodeID>{local_node_id, remote_node_id}));
}
TEST_F(ClusterResourceSchedulerTest, SchedulingUpdateAvailableResourcesTest) {
@ -402,8 +403,9 @@ TEST_F(ClusterResourceSchedulerTest, SchedulingUpdateAvailableResourcesTest) {
vector<int64_t> cust_ids{1, 2};
vector<FixedPoint> cust_capacities{5, 5};
initNodeResources(node_resources, pred_capacities, cust_ids, cust_capacities);
ClusterResourceScheduler resource_scheduler(1, node_resources, *gcs_client_);
AssertPredefinedNodeResources(resource_scheduler);
ClusterResourceScheduler resource_scheduler(scheduling::NodeID(1), node_resources,
*gcs_client_);
AssertPredefinedNodeResources();
{
ResourceRequest resource_request;
@ -416,9 +418,9 @@ TEST_F(ClusterResourceSchedulerTest, SchedulingUpdateAvailableResourcesTest) {
bool is_infeasible;
rpc::SchedulingStrategy scheduling_strategy;
scheduling_strategy.mutable_default_scheduling_strategy();
int64_t node_id = resource_scheduler.GetBestSchedulableNode(
auto node_id = resource_scheduler.GetBestSchedulableNode(
resource_request, scheduling_strategy, false, false, &violations, &is_infeasible);
ASSERT_EQ(node_id, 1);
ASSERT_EQ(node_id.ToInt(), 1);
ASSERT_TRUE(violations == 0);
NodeResources nr1, nr2;
@ -455,23 +457,23 @@ TEST_F(ClusterResourceSchedulerTest, SchedulingUpdateTotalResourcesTest) {
absl::flat_hash_map<std::string, double> initial_resources = {
{ray::kCPU_ResourceLabel, 1}, {"custom", 1}};
std::string name = NodeID::FromRandom().Binary();
ClusterResourceScheduler resource_scheduler(name, initial_resources, *gcs_client_,
nullptr, nullptr);
ClusterResourceScheduler resource_scheduler(scheduling::NodeID(name), initial_resources,
*gcs_client_, nullptr, nullptr);
resource_scheduler.GetLocalResourceManager().AddLocalResourceInstances(
ray::kCPU_ResourceLabel, {0, 1, 1});
resource_scheduler.GetLocalResourceManager().AddLocalResourceInstances("custom",
{0, 1, 1});
scheduling::ResourceID(ray::kCPU_ResourceLabel), {0, 1, 1});
resource_scheduler.GetLocalResourceManager().AddLocalResourceInstances(
scheduling::ResourceID("custom"), {0, 1, 1});
const auto &predefined_resources = resource_scheduler.GetClusterResourceManager()
.GetNodeResources(name)
.GetNodeResources(scheduling::NodeID(name))
.predefined_resources;
ASSERT_EQ(predefined_resources[CPU].total.Double(), 3);
const auto &custom_resources = resource_scheduler.GetClusterResourceManager()
.GetNodeResources(name)
.GetNodeResources(scheduling::NodeID(name))
.custom_resources;
auto resource_id = resource_scheduler.string_to_int_map_.Get("custom");
auto resource_id = scheduling::ResourceID("custom").ToInt();
ASSERT_EQ(custom_resources.find(resource_id)->second.total.Double(), 3);
}
@ -487,13 +489,14 @@ TEST_F(ClusterResourceSchedulerTest, SchedulingAddOrUpdateNodeTest) {
vector<int64_t> cust_ids{1, 2};
vector<FixedPoint> cust_capacities{5, 5};
initNodeResources(node_resources, pred_capacities, cust_ids, cust_capacities);
resource_scheduler.GetClusterResourceManager().AddOrUpdateNode(node_id,
node_resources);
resource_scheduler.GetClusterResourceManager().AddOrUpdateNode(
scheduling::NodeID(node_id), node_resources);
nr = node_resources;
}
// Check whether node resources were correctly added.
if (resource_scheduler.GetClusterResourceManager().GetNodeResources(node_id, &nr_out)) {
if (resource_scheduler.GetClusterResourceManager().GetNodeResources(
scheduling::NodeID(node_id), &nr_out)) {
ASSERT_TRUE(nodeResourcesEqual(nr, nr_out));
} else {
ASSERT_TRUE(false);
@ -506,11 +509,12 @@ TEST_F(ClusterResourceSchedulerTest, SchedulingAddOrUpdateNodeTest) {
vector<int64_t> cust_ids{2, 3};
vector<FixedPoint> cust_capacities{6, 6};
initNodeResources(node_resources, pred_capacities, cust_ids, cust_capacities);
resource_scheduler.GetClusterResourceManager().AddOrUpdateNode(node_id,
node_resources);
resource_scheduler.GetClusterResourceManager().AddOrUpdateNode(
scheduling::NodeID(node_id), node_resources);
nr = node_resources;
}
if (resource_scheduler.GetClusterResourceManager().GetNodeResources(node_id, &nr_out)) {
if (resource_scheduler.GetClusterResourceManager().GetNodeResources(
scheduling::NodeID(node_id), &nr_out)) {
ASSERT_TRUE(nodeResourcesEqual(nr, nr_out));
} else {
ASSERT_TRUE(false);
@ -524,9 +528,9 @@ TEST_F(ClusterResourceSchedulerTest, SchedulingResourceRequestTest) {
vector<int64_t> cust_ids{1};
vector<FixedPoint> cust_capacities{10};
initNodeResources(node_resources, pred_capacities, cust_ids, cust_capacities);
ClusterResourceScheduler resource_scheduler(0, node_resources, *gcs_client_);
ClusterResourceScheduler resource_scheduler(scheduling::NodeID(0), node_resources,
*gcs_client_);
auto node_id = NodeID::FromRandom();
auto node_internal_id = resource_scheduler.string_to_int_map_.Insert(node_id.Binary());
rpc::SchedulingStrategy scheduling_strategy;
scheduling_strategy.mutable_default_scheduling_strategy();
{
@ -535,8 +539,8 @@ TEST_F(ClusterResourceSchedulerTest, SchedulingResourceRequestTest) {
vector<int64_t> cust_ids{1, 2};
vector<FixedPoint> cust_capacities{5, 5};
initNodeResources(node_resources, pred_capacities, cust_ids, cust_capacities);
resource_scheduler.GetClusterResourceManager().AddOrUpdateNode(node_internal_id,
node_resources);
resource_scheduler.GetClusterResourceManager().AddOrUpdateNode(
scheduling::NodeID(node_id.Binary()), node_resources);
}
// Predefined resources, hard constraint violation
{
@ -546,9 +550,9 @@ TEST_F(ClusterResourceSchedulerTest, SchedulingResourceRequestTest) {
EmptyFixedPointVector);
int64_t violations;
bool is_infeasible;
int64_t node_id = resource_scheduler.GetBestSchedulableNode(
auto node_id = resource_scheduler.GetBestSchedulableNode(
resource_request, scheduling_strategy, false, false, &violations, &is_infeasible);
ASSERT_EQ(node_id, -1);
ASSERT_TRUE(node_id.IsNil());
}
// Predefined resources, no constraint violation.
@ -559,9 +563,9 @@ TEST_F(ClusterResourceSchedulerTest, SchedulingResourceRequestTest) {
EmptyFixedPointVector);
int64_t violations;
bool is_infeasible;
int64_t node_id = resource_scheduler.GetBestSchedulableNode(
auto node_id = resource_scheduler.GetBestSchedulableNode(
resource_request, scheduling_strategy, false, false, &violations, &is_infeasible);
ASSERT_TRUE(node_id != -1);
ASSERT_TRUE(!node_id.IsNil());
ASSERT_TRUE(violations == 0);
}
// Custom resources, hard constraint violation.
@ -573,9 +577,9 @@ TEST_F(ClusterResourceSchedulerTest, SchedulingResourceRequestTest) {
initResourceRequest(resource_request, pred_demands, cust_ids, cust_demands);
int64_t violations;
bool is_infeasible;
int64_t node_id = resource_scheduler.GetBestSchedulableNode(
auto node_id = resource_scheduler.GetBestSchedulableNode(
resource_request, scheduling_strategy, false, false, &violations, &is_infeasible);
ASSERT_TRUE(node_id == -1);
ASSERT_TRUE(node_id.IsNil());
}
// Custom resources, no constraint violation.
{
@ -586,9 +590,9 @@ TEST_F(ClusterResourceSchedulerTest, SchedulingResourceRequestTest) {
initResourceRequest(resource_request, pred_demands, cust_ids, cust_demands);
int64_t violations;
bool is_infeasible;
int64_t node_id = resource_scheduler.GetBestSchedulableNode(
auto node_id = resource_scheduler.GetBestSchedulableNode(
resource_request, scheduling_strategy, false, false, &violations, &is_infeasible);
ASSERT_TRUE(node_id != -1);
ASSERT_TRUE(!node_id.IsNil());
ASSERT_TRUE(violations == 0);
}
// Custom resource missing, hard constraint violation.
@ -600,9 +604,9 @@ TEST_F(ClusterResourceSchedulerTest, SchedulingResourceRequestTest) {
initResourceRequest(resource_request, pred_demands, cust_ids, cust_demands);
int64_t violations;
bool is_infeasible;
int64_t node_id = resource_scheduler.GetBestSchedulableNode(
auto node_id = resource_scheduler.GetBestSchedulableNode(
resource_request, scheduling_strategy, false, false, &violations, &is_infeasible);
ASSERT_TRUE(node_id == -1);
ASSERT_TRUE(node_id.IsNil());
}
// Placement hints, no constraint violation.
{
@ -613,9 +617,9 @@ TEST_F(ClusterResourceSchedulerTest, SchedulingResourceRequestTest) {
initResourceRequest(resource_request, pred_demands, cust_ids, cust_demands);
int64_t violations;
bool is_infeasible;
int64_t node_id = resource_scheduler.GetBestSchedulableNode(
auto node_id = resource_scheduler.GetBestSchedulableNode(
resource_request, scheduling_strategy, false, false, &violations, &is_infeasible);
ASSERT_TRUE(node_id != -1);
ASSERT_TRUE(!node_id.IsNil());
ASSERT_TRUE(violations == 0);
}
}
@ -633,7 +637,8 @@ TEST_F(ClusterResourceSchedulerTest, GetLocalAvailableResourcesWithCpuUnitTest)
vector<int64_t> cust_ids{1};
vector<FixedPoint> cust_capacities{8};
initNodeResources(node_resources, pred_capacities, cust_ids, cust_capacities);
ClusterResourceScheduler resource_scheduler(0, node_resources, *gcs_client_);
ClusterResourceScheduler resource_scheduler(scheduling::NodeID(0), node_resources,
*gcs_client_);
TaskResourceInstances available_cluster_resources =
resource_scheduler.GetLocalResourceManager()
@ -665,7 +670,8 @@ TEST_F(ClusterResourceSchedulerTest, GetLocalAvailableResourcesTest) {
vector<int64_t> cust_ids{1};
vector<FixedPoint> cust_capacities{8};
initNodeResources(node_resources, pred_capacities, cust_ids, cust_capacities);
ClusterResourceScheduler resource_scheduler(0, node_resources, *gcs_client_);
ClusterResourceScheduler resource_scheduler(scheduling::NodeID(0), node_resources,
*gcs_client_);
TaskResourceInstances available_cluster_resources =
resource_scheduler.GetLocalResourceManager()
@ -701,7 +707,7 @@ TEST_F(ClusterResourceSchedulerTest, AvailableResourceInstancesOpsTest) {
vector<FixedPoint> pred_capacities{3 /* CPU */};
initNodeResources(node_resources, pred_capacities, EmptyIntVector,
EmptyFixedPointVector);
ClusterResourceScheduler cluster(0, node_resources, *gcs_client_);
ClusterResourceScheduler cluster(scheduling::NodeID(0), node_resources, *gcs_client_);
ResourceInstanceCapacities instances;
@ -734,7 +740,8 @@ TEST_F(ClusterResourceSchedulerTest, TaskResourceInstancesTest) {
vector<FixedPoint> pred_capacities{3. /* CPU */, 4. /* MEM */, 5. /* GPU */};
initNodeResources(node_resources, pred_capacities, EmptyIntVector,
EmptyFixedPointVector);
ClusterResourceScheduler resource_scheduler(0, node_resources, *gcs_client_);
ClusterResourceScheduler resource_scheduler(scheduling::NodeID(0), node_resources,
*gcs_client_);
ResourceRequest resource_request;
vector<FixedPoint> pred_demands = {3. /* CPU */, 2. /* MEM */, 1.5 /* GPU */};
@ -766,7 +773,8 @@ TEST_F(ClusterResourceSchedulerTest, TaskResourceInstancesTest) {
vector<FixedPoint> pred_capacities{3 /* CPU */, 4 /* MEM */, 5 /* GPU */};
initNodeResources(node_resources, pred_capacities, EmptyIntVector,
EmptyFixedPointVector);
ClusterResourceScheduler resource_scheduler(0, node_resources, *gcs_client_);
ClusterResourceScheduler resource_scheduler(scheduling::NodeID(0), node_resources,
*gcs_client_);
ResourceRequest resource_request;
vector<FixedPoint> pred_demands = {4. /* CPU */, 2. /* MEM */, 1.5 /* GPU */};
@ -794,7 +802,8 @@ TEST_F(ClusterResourceSchedulerTest, TaskResourceInstancesTest) {
vector<int64_t> cust_ids{1, 2};
vector<FixedPoint> cust_capacities{4, 4};
initNodeResources(node_resources, pred_capacities, cust_ids, cust_capacities);
ClusterResourceScheduler resource_scheduler(0, node_resources, *gcs_client_);
ClusterResourceScheduler resource_scheduler(scheduling::NodeID(0), node_resources,
*gcs_client_);
ResourceRequest resource_request;
vector<FixedPoint> pred_demands = {3. /* CPU */, 2. /* MEM */, 1.5 /* GPU */};
@ -826,7 +835,8 @@ TEST_F(ClusterResourceSchedulerTest, TaskResourceInstancesTest) {
vector<int64_t> cust_ids{1, 2};
vector<FixedPoint> cust_capacities{4, 4};
initNodeResources(node_resources, pred_capacities, cust_ids, cust_capacities);
ClusterResourceScheduler resource_scheduler(0, node_resources, *gcs_client_);
ClusterResourceScheduler resource_scheduler(scheduling::NodeID(0), node_resources,
*gcs_client_);
ResourceRequest resource_request;
vector<FixedPoint> pred_demands = {3. /* CPU */, 2. /* MEM */, 1.5 /* GPU */};
@ -855,7 +865,8 @@ TEST_F(ClusterResourceSchedulerTest, TaskResourceInstancesAllocationFailureTest)
vector<int64_t> cust_ids{1, 2, 3};
vector<FixedPoint> cust_capacities{4, 4, 4};
initNodeResources(node_resources, pred_capacities, cust_ids, cust_capacities);
ClusterResourceScheduler resource_scheduler(0, node_resources, *gcs_client_);
ClusterResourceScheduler resource_scheduler(scheduling::NodeID(0), node_resources,
*gcs_client_);
ResourceRequest resource_request;
vector<FixedPoint> pred_demands = {0. /* CPU */, 0. /* MEM */, 0. /* GPU */};
@ -885,7 +896,8 @@ TEST_F(ClusterResourceSchedulerTest, TaskResourceInstancesTest2) {
vector<int64_t> cust_ids{1, 2};
vector<FixedPoint> cust_capacities{4., 4.};
initNodeResources(node_resources, pred_capacities, cust_ids, cust_capacities);
ClusterResourceScheduler resource_scheduler(0, node_resources, *gcs_client_);
ClusterResourceScheduler resource_scheduler(scheduling::NodeID(0), node_resources,
*gcs_client_);
ResourceRequest resource_request;
vector<FixedPoint> pred_demands = {2. /* CPU */, 2. /* MEM */, 1.5 /* GPU */};
@ -913,25 +925,29 @@ TEST_F(ClusterResourceSchedulerTest, TaskResourceInstancesTest2) {
}
TEST_F(ClusterResourceSchedulerTest, DeadNodeTest) {
ClusterResourceScheduler resource_scheduler("local", {}, *gcs_client_);
ClusterResourceScheduler resource_scheduler(scheduling::NodeID("local"),
absl::flat_hash_map<std::string, double>{},
*gcs_client_);
absl::flat_hash_map<std::string, double> resource;
resource["CPU"] = 10000.0;
auto node_id = NodeID::FromRandom();
resource_scheduler.GetClusterResourceManager().AddOrUpdateNode(node_id.Binary(),
resource, resource);
resource_scheduler.GetClusterResourceManager().AddOrUpdateNode(
scheduling::NodeID(node_id.Binary()), resource, resource);
int64_t violations = 0;
bool is_infeasible = false;
rpc::SchedulingStrategy scheduling_strategy;
scheduling_strategy.mutable_default_scheduling_strategy();
ASSERT_EQ(node_id.Binary(), resource_scheduler.GetBestSchedulableNode(
resource, scheduling_strategy, false, false, false,
&violations, &is_infeasible));
ASSERT_EQ(scheduling::NodeID(node_id.Binary()),
resource_scheduler.GetBestSchedulableNode(resource, scheduling_strategy,
false, false, false, &violations,
&is_infeasible));
EXPECT_CALL(*gcs_client_->mock_node_accessor, Get(node_id, ::testing::_))
.WillOnce(::testing::Return(nullptr))
.WillOnce(::testing::Return(nullptr));
ASSERT_EQ("", resource_scheduler.GetBestSchedulableNode(resource, scheduling_strategy,
false, false, false,
&violations, &is_infeasible));
ASSERT_TRUE(resource_scheduler
.GetBestSchedulableNode(resource, scheduling_strategy, false, false,
false, &violations, &is_infeasible)
.IsNil());
}
TEST_F(ClusterResourceSchedulerTest, TaskGPUResourceInstancesTest) {
@ -941,7 +957,8 @@ TEST_F(ClusterResourceSchedulerTest, TaskGPUResourceInstancesTest) {
vector<int64_t> cust_ids{1};
vector<FixedPoint> cust_capacities{8};
initNodeResources(node_resources, pred_capacities, cust_ids, cust_capacities);
ClusterResourceScheduler resource_scheduler(0, node_resources, *gcs_client_);
ClusterResourceScheduler resource_scheduler(scheduling::NodeID(0), node_resources,
*gcs_client_);
std::vector<double> allocate_gpu_instances{0.5, 0.5, 0.5, 0.5};
resource_scheduler.GetLocalResourceManager().SubtractGPUResourceInstances(
@ -1004,7 +1021,8 @@ TEST_F(ClusterResourceSchedulerTest,
vector<int64_t> cust_ids{1};
vector<FixedPoint> cust_capacities{8};
initNodeResources(node_resources, pred_capacities, cust_ids, cust_capacities);
ClusterResourceScheduler resource_scheduler(0, node_resources, *gcs_client_);
ClusterResourceScheduler resource_scheduler(scheduling::NodeID(0), node_resources,
*gcs_client_);
{
std::vector<double> allocate_gpu_instances{0.5, 0.5, 2, 0.5};
@ -1023,7 +1041,8 @@ TEST_F(ClusterResourceSchedulerTest,
expected_available_gpu_instances.begin()));
NodeResources nr;
resource_scheduler.GetClusterResourceManager().GetNodeResources(0, &nr);
resource_scheduler.GetClusterResourceManager().GetNodeResources(
scheduling::NodeID(0), &nr);
ASSERT_TRUE(nr.predefined_resources[GPU].available == 1.5);
}
@ -1044,7 +1063,8 @@ TEST_F(ClusterResourceSchedulerTest,
expected_available_gpu_instances.begin()));
NodeResources nr;
resource_scheduler.GetClusterResourceManager().GetNodeResources(0, &nr);
resource_scheduler.GetClusterResourceManager().GetNodeResources(
scheduling::NodeID(0), &nr);
ASSERT_TRUE(nr.predefined_resources[GPU].available == 3.8);
}
}
@ -1055,7 +1075,8 @@ TEST_F(ClusterResourceSchedulerTest, TaskResourceInstanceWithHardRequestTest) {
vector<FixedPoint> pred_capacities{4. /* CPU */, 2. /* MEM */, 4. /* GPU */};
initNodeResources(node_resources, pred_capacities, EmptyIntVector,
EmptyFixedPointVector);
ClusterResourceScheduler resource_scheduler(0, node_resources, *gcs_client_);
ClusterResourceScheduler resource_scheduler(scheduling::NodeID(0), node_resources,
*gcs_client_);
ResourceRequest resource_request;
vector<FixedPoint> pred_demands = {2. /* CPU */, 2. /* MEM */, 1.5 /* GPU */};
@ -1081,7 +1102,8 @@ TEST_F(ClusterResourceSchedulerTest, TaskResourceInstanceWithoutCpuUnitTest) {
vector<FixedPoint> pred_capacities{4. /* CPU */, 2. /* MEM */, 4. /* GPU */};
initNodeResources(node_resources, pred_capacities, EmptyIntVector,
EmptyFixedPointVector);
ClusterResourceScheduler resource_scheduler(0, node_resources, *gcs_client_);
ClusterResourceScheduler resource_scheduler(scheduling::NodeID(0), node_resources,
*gcs_client_);
ResourceRequest resource_request;
vector<FixedPoint> pred_demands = {2. /* CPU */, 2. /* MEM */, 1.5 /* GPU */};
@ -1104,10 +1126,12 @@ TEST_F(ClusterResourceSchedulerTest, TaskResourceInstanceWithoutCpuUnitTest) {
TEST_F(ClusterResourceSchedulerTest, TestAlwaysSpillInfeasibleTask) {
absl::flat_hash_map<std::string, double> resource_spec({{"CPU", 1}});
ClusterResourceScheduler resource_scheduler("local", {}, *gcs_client_);
ClusterResourceScheduler resource_scheduler(scheduling::NodeID("local"),
absl::flat_hash_map<std::string, double>{},
*gcs_client_);
for (int i = 0; i < 100; i++) {
resource_scheduler.GetClusterResourceManager().AddOrUpdateNode(
NodeID::FromRandom().Binary(), {}, {});
scheduling::NodeID(NodeID::FromRandom().Binary()), {}, {});
}
// No feasible nodes.
@ -1115,14 +1139,14 @@ TEST_F(ClusterResourceSchedulerTest, TestAlwaysSpillInfeasibleTask) {
bool is_infeasible;
rpc::SchedulingStrategy scheduling_strategy;
scheduling_strategy.mutable_default_scheduling_strategy();
ASSERT_EQ(resource_scheduler.GetBestSchedulableNode(resource_spec, scheduling_strategy,
false, false, false,
&total_violations, &is_infeasible),
"");
ASSERT_TRUE(resource_scheduler
.GetBestSchedulableNode(resource_spec, scheduling_strategy, false,
false, false, &total_violations, &is_infeasible)
.IsNil());
// Feasible remote node, but doesn't currently have resources available. We
// should spill there.
auto remote_feasible = NodeID::FromRandom().Binary();
auto remote_feasible = scheduling::NodeID(NodeID::FromRandom().Binary());
resource_scheduler.GetClusterResourceManager().AddOrUpdateNode(
remote_feasible, resource_spec, {{"CPU", 0.}});
ASSERT_EQ(remote_feasible, resource_scheduler.GetBestSchedulableNode(
@ -1131,7 +1155,7 @@ TEST_F(ClusterResourceSchedulerTest, TestAlwaysSpillInfeasibleTask) {
// Feasible remote node, and it currently has resources available. We should
// prefer to spill there.
auto remote_available = NodeID::FromRandom().Binary();
auto remote_available = scheduling::NodeID(NodeID::FromRandom().Binary());
resource_scheduler.GetClusterResourceManager().AddOrUpdateNode(
remote_available, resource_spec, resource_spec);
ASSERT_EQ(remote_available, resource_scheduler.GetBestSchedulableNode(
@ -1146,14 +1170,15 @@ TEST_F(ClusterResourceSchedulerTest, ResourceUsageReportTest) {
absl::flat_hash_map<std::string, double> initial_resources(
{{"CPU", 1}, {"GPU", 2}, {"memory", 3}, {"1", 1}, {"2", 2}, {"3", 3}});
ClusterResourceScheduler resource_scheduler("0", initial_resources, *gcs_client_);
ClusterResourceScheduler resource_scheduler(scheduling::NodeID("0"), initial_resources,
*gcs_client_);
NodeResources other_node_resources;
vector<FixedPoint> other_pred_capacities{1. /* CPU */, 1. /* MEM */, 1. /* GPU */};
vector<FixedPoint> other_cust_capacities{5., 4., 3., 2., 1.};
initNodeResources(other_node_resources, other_pred_capacities, cust_ids,
other_cust_capacities);
resource_scheduler.GetClusterResourceManager().AddOrUpdateNode(12345,
other_node_resources);
resource_scheduler.GetClusterResourceManager().AddOrUpdateNode(
scheduling::NodeID(12345), other_node_resources);
{ // Cluster is idle.
rpc::ResourcesData data;
@ -1229,15 +1254,15 @@ TEST_F(ClusterResourceSchedulerTest, ObjectStoreMemoryUsageTest) {
{"object_store_memory", 1000 * 1024 * 1024}});
int64_t used_object_store_memory = 250 * 1024 * 1024;
int64_t *ptr = &used_object_store_memory;
ClusterResourceScheduler resource_scheduler("0", initial_resources, *gcs_client_,
[&] { return *ptr; });
ClusterResourceScheduler resource_scheduler(scheduling::NodeID("0"), initial_resources,
*gcs_client_, [&] { return *ptr; });
NodeResources other_node_resources;
vector<FixedPoint> other_pred_capacities{1. /* CPU */, 1. /* MEM */, 1. /* GPU */};
vector<FixedPoint> other_cust_capacities{10.};
initNodeResources(other_node_resources, other_pred_capacities, cust_ids,
other_cust_capacities);
resource_scheduler.GetClusterResourceManager().AddOrUpdateNode(12345,
other_node_resources);
resource_scheduler.GetClusterResourceManager().AddOrUpdateNode(
scheduling::NodeID(12345), other_node_resources);
{
rpc::ResourcesData data;
@ -1326,8 +1351,9 @@ TEST_F(ClusterResourceSchedulerTest, ObjectStoreMemoryUsageTest) {
TEST_F(ClusterResourceSchedulerTest, DirtyLocalViewTest) {
absl::flat_hash_map<std::string, double> initial_resources({{"CPU", 1}});
ClusterResourceScheduler resource_scheduler("local", initial_resources, *gcs_client_);
auto remote = NodeID::FromRandom().Binary();
ClusterResourceScheduler resource_scheduler(scheduling::NodeID("local"),
initial_resources, *gcs_client_);
auto remote = scheduling::NodeID(NodeID::FromRandom().Binary());
resource_scheduler.GetClusterResourceManager().AddOrUpdateNode(remote, {{"CPU", 2.}},
{{"CPU", 2.}});
const absl::flat_hash_map<std::string, double> task_spec = {{"CPU", 1.}};
@ -1366,9 +1392,10 @@ TEST_F(ClusterResourceSchedulerTest, DirtyLocalViewTest) {
}
// Our local view says there are not enough resources on the remote node to
// schedule another task.
ASSERT_EQ("", resource_scheduler.GetBestSchedulableNode(
task_spec, scheduling_strategy, false, false, true, &t,
&is_infeasible));
ASSERT_EQ(
resource_scheduler.GetBestSchedulableNode(task_spec, scheduling_strategy, false,
false, true, &t, &is_infeasible),
scheduling::NodeID::Nil());
ASSERT_FALSE(
resource_scheduler.GetLocalResourceManager().AllocateLocalTaskResources(
task_spec, task_allocation));
@ -1378,7 +1405,8 @@ TEST_F(ClusterResourceSchedulerTest, DirtyLocalViewTest) {
}
TEST_F(ClusterResourceSchedulerTest, DynamicResourceTest) {
ClusterResourceScheduler resource_scheduler("local", {{"CPU", 2}}, *gcs_client_);
ClusterResourceScheduler resource_scheduler(scheduling::NodeID("local"), {{"CPU", 2}},
*gcs_client_);
absl::flat_hash_map<std::string, double> resource_request = {{"CPU", 1},
{"custom123", 2}};
@ -1387,36 +1415,38 @@ TEST_F(ClusterResourceSchedulerTest, DynamicResourceTest) {
rpc::SchedulingStrategy scheduling_strategy;
scheduling_strategy.mutable_default_scheduling_strategy();
std::string result = resource_scheduler.GetBestSchedulableNode(
auto result = resource_scheduler.GetBestSchedulableNode(
resource_request, scheduling_strategy, false, false, false, &t, &is_infeasible);
ASSERT_TRUE(result.empty());
ASSERT_TRUE(result.IsNil());
resource_scheduler.GetLocalResourceManager().AddLocalResourceInstances("custom123",
{0., 1.0, 1.0});
resource_scheduler.GetLocalResourceManager().AddLocalResourceInstances(
scheduling::ResourceID("custom123"), {0., 1.0, 1.0});
result = resource_scheduler.GetBestSchedulableNode(
resource_request, scheduling_strategy, false, false, false, &t, &is_infeasible);
ASSERT_FALSE(result.empty()) << resource_scheduler.DebugString();
ASSERT_FALSE(result.IsNil()) << resource_scheduler.DebugString();
resource_request["custom123"] = 3;
result = resource_scheduler.GetBestSchedulableNode(
resource_request, scheduling_strategy, false, false, false, &t, &is_infeasible);
ASSERT_TRUE(result.empty());
ASSERT_TRUE(result.IsNil());
resource_scheduler.GetLocalResourceManager().AddLocalResourceInstances("custom123",
{1.0});
resource_scheduler.GetLocalResourceManager().AddLocalResourceInstances(
scheduling::ResourceID("custom123"), {1.0});
result = resource_scheduler.GetBestSchedulableNode(
resource_request, scheduling_strategy, false, false, false, &t, &is_infeasible);
ASSERT_FALSE(result.empty());
ASSERT_FALSE(result.IsNil());
resource_scheduler.GetLocalResourceManager().DeleteLocalResource("custom123");
resource_scheduler.GetLocalResourceManager().DeleteLocalResource(
scheduling::ResourceID("custom123"));
result = resource_scheduler.GetBestSchedulableNode(
resource_request, scheduling_strategy, false, false, false, &t, &is_infeasible);
ASSERT_TRUE(result.empty());
ASSERT_TRUE(result.IsNil());
}
TEST_F(ClusterResourceSchedulerTest, AvailableResourceEmptyTest) {
ClusterResourceScheduler resource_scheduler("local", {{"custom123", 5}}, *gcs_client_);
ClusterResourceScheduler resource_scheduler(scheduling::NodeID("local"),
{{"custom123", 5}}, *gcs_client_);
std::shared_ptr<TaskResourceInstances> resource_instances =
std::make_shared<TaskResourceInstances>();
absl::flat_hash_map<std::string, double> resource_request = {{"custom123", 5}};
@ -1424,16 +1454,17 @@ TEST_F(ClusterResourceSchedulerTest, AvailableResourceEmptyTest) {
resource_scheduler.GetLocalResourceManager().AllocateLocalTaskResources(
resource_request, resource_instances);
ASSERT_TRUE(allocated);
ASSERT_TRUE(
resource_scheduler.GetLocalResourceManager().IsAvailableResourceEmpty("custom123"));
ASSERT_TRUE(resource_scheduler.GetLocalResourceManager().IsAvailableResourceEmpty(
scheduling::ResourceID("custom123")));
}
TEST_F(ClusterResourceSchedulerTest, TestForceSpillback) {
absl::flat_hash_map<std::string, double> resource_spec({{"CPU", 1}});
ClusterResourceScheduler resource_scheduler("local", resource_spec, *gcs_client_);
std::vector<string> node_ids;
ClusterResourceScheduler resource_scheduler(scheduling::NodeID("local"), resource_spec,
*gcs_client_);
std::vector<scheduling::NodeID> node_ids;
for (int i = 0; i < 100; i++) {
node_ids.push_back(NodeID::FromRandom().Binary());
node_ids.emplace_back(NodeID::FromRandom().Binary());
resource_scheduler.GetClusterResourceManager().AddOrUpdateNode(node_ids.back(), {},
{});
}
@ -1447,20 +1478,20 @@ TEST_F(ClusterResourceSchedulerTest, TestForceSpillback) {
ASSERT_EQ(resource_scheduler.GetBestSchedulableNode(
resource_spec, scheduling_strategy, false, false,
/*force_spillback=*/false, &total_violations, &is_infeasible),
"local");
scheduling::NodeID("local"));
// If spillback is forced, we try to spill to remote, but only if there is a
// schedulable node.
ASSERT_EQ(resource_scheduler.GetBestSchedulableNode(
resource_spec, scheduling_strategy, false, false,
/*force_spillback=*/true, &total_violations, &is_infeasible),
"");
scheduling::NodeID::Nil());
// Choose a remote node that has the resources available.
resource_scheduler.GetClusterResourceManager().AddOrUpdateNode(node_ids[50],
resource_spec, {});
ASSERT_EQ(resource_scheduler.GetBestSchedulableNode(
resource_spec, scheduling_strategy, false, false,
/*force_spillback=*/true, &total_violations, &is_infeasible),
"");
scheduling::NodeID::Nil());
resource_scheduler.GetClusterResourceManager().AddOrUpdateNode(
node_ids[51], resource_spec, resource_spec);
ASSERT_EQ(resource_scheduler.GetBestSchedulableNode(
@ -1476,8 +1507,8 @@ TEST_F(ClusterResourceSchedulerTest, CustomResourceInstanceTest) {
"custom_unit_instance_resources": "FPGA"
}
)");
ClusterResourceScheduler resource_scheduler("local", {{"CPU", 4}, {"FPGA", 2}},
*gcs_client_);
ClusterResourceScheduler resource_scheduler(scheduling::NodeID("local"),
{{"CPU", 4}, {"FPGA", 2}}, *gcs_client_);
StringIdMap mock_string_to_int_map;
int64_t fpga_resource_id = mock_string_to_int_map.Insert("FPGA");
@ -1509,7 +1540,7 @@ TEST_F(ClusterResourceSchedulerTest, CustomResourceInstanceTest) {
TEST_F(ClusterResourceSchedulerTest, TaskResourceInstancesSerializedStringTest) {
ClusterResourceScheduler resource_scheduler(
"local", {{"CPU", 4}, {"memory", 4}, {"GPU", 2}}, *gcs_client_);
scheduling::NodeID("local"), {{"CPU", 4}, {"memory", 4}, {"GPU", 2}}, *gcs_client_);
std::shared_ptr<TaskResourceInstances> cluster_resources =
std::make_shared<TaskResourceInstances>();
addTaskResourceInstances(true, {2.}, 0, cluster_resources.get());
@ -1534,7 +1565,7 @@ TEST_F(ClusterResourceSchedulerTest, TaskResourceInstancesSerializedStringTest)
addTaskResourceInstances(true, {4.}, 1, cluster_instance_resources.get());
addTaskResourceInstances(true, {1., 1.}, 2, cluster_instance_resources.get());
ClusterResourceScheduler resource_scheduler_cpu_instance(
"local", {{"CPU", 4}, {"memory", 4}, {"GPU", 2}}, *gcs_client_);
scheduling::NodeID("local"), {{"CPU", 4}, {"memory", 4}, {"GPU", 2}}, *gcs_client_);
std::string instance_serialized_string =
resource_scheduler_cpu_instance.GetLocalResourceManager()
.SerializedTaskResourceInstances(cluster_instance_resources);

View file

@ -77,21 +77,21 @@ void ClusterTaskManager::ScheduleAndDispatchTasks() {
RayTask task = work->task;
RAY_LOG(DEBUG) << "Scheduling pending task "
<< task.GetTaskSpecification().TaskId();
std::string node_id_string = cluster_resource_scheduler_->GetBestSchedulableNode(
auto scheduling_node_id = cluster_resource_scheduler_->GetBestSchedulableNode(
task.GetTaskSpecification(), work->PrioritizeLocalNode(),
/*exclude_local_node*/ false,
/*requires_object_store_memory*/ false, &is_infeasible);
// There is no node that has available resources to run the request.
// Move on to the next shape.
if (node_id_string.empty()) {
if (scheduling_node_id.IsNil()) {
RAY_LOG(DEBUG) << "No node found to schedule a task "
<< task.GetTaskSpecification().TaskId() << " is infeasible?"
<< is_infeasible;
break;
}
NodeID node_id = NodeID::FromBinary(node_id_string);
NodeID node_id = NodeID::FromBinary(scheduling_node_id.Binary());
ScheduleOnNode(node_id, work);
work_it = work_queue.erase(work_it);
}
@ -129,7 +129,7 @@ void ClusterTaskManager::TryScheduleInfeasibleTask() {
RAY_LOG(DEBUG) << "Check if the infeasible task is schedulable in any node. task_id:"
<< task.GetTaskSpecification().TaskId();
bool is_infeasible;
std::string node_id_string = cluster_resource_scheduler_->GetBestSchedulableNode(
cluster_resource_scheduler_->GetBestSchedulableNode(
task.GetTaskSpecification(), work->PrioritizeLocalNode(),
/*exclude_local_node*/ false,
/*requires_object_store_memory*/ false, &is_infeasible);
@ -293,7 +293,8 @@ void ClusterTaskManager::ScheduleOnNode(const NodeID &spillback_to,
RAY_LOG(DEBUG) << "Spilling task " << task_spec.TaskId() << " to node " << spillback_to;
if (!cluster_resource_scheduler_->AllocateRemoteTaskResources(
spillback_to.Binary(), task_spec.GetRequiredResources().GetResourceMap())) {
scheduling::NodeID(spillback_to.Binary()),
task_spec.GetRequiredResources().GetResourceMap())) {
RAY_LOG(DEBUG) << "Tried to allocate resources for request " << task_spec.TaskId()
<< " on a remote node that are no longer available";
}
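The raylet-level NodeID and the scheduler-internal scheduling::NodeID are bridged through their binary string form in both directions; a compact sketch of the round trip used in this file (variable names are illustrative):

  // Raylet NodeID -> scheduler id, e.g. when charging resources on a spillback target.
  auto scheduler_id = scheduling::NodeID(spillback_to.Binary());
  // Scheduler id -> raylet NodeID, e.g. after GetBestSchedulableNode picks a node.
  NodeID raylet_id = NodeID::FromBinary(scheduling_node_id.Binary());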

View file

@ -129,8 +129,8 @@ std::shared_ptr<ClusterResourceScheduler> CreateSingleNodeScheduler(
local_node_resources[ray::kGPU_ResourceLabel] = num_gpus;
local_node_resources[ray::kMemory_ResourceLabel] = 128;
auto scheduler =
std::make_shared<ClusterResourceScheduler>(id, local_node_resources, gcs_client);
auto scheduler = std::make_shared<ClusterResourceScheduler>(
scheduling::NodeID(id), local_node_resources, gcs_client);
return scheduler;
}
@ -285,8 +285,8 @@ class ClusterTaskManagerTest : public ::testing::Test {
node_resources[ray::kCPU_ResourceLabel] = num_cpus;
node_resources[ray::kGPU_ResourceLabel] = num_gpus;
node_resources[ray::kMemory_ResourceLabel] = memory;
scheduler_->GetClusterResourceManager().AddOrUpdateNode(id.Binary(), node_resources,
node_resources);
scheduler_->GetClusterResourceManager().AddOrUpdateNode(
scheduling::NodeID(id.Binary()), node_resources, node_resources);
rpc::GcsNodeInfo info;
node_info_[id] = info;
@ -1480,7 +1480,8 @@ TEST_F(ClusterTaskManagerTest, FeasibleToNonFeasible) {
// Delete cpu resource of local node, then task 2 should be turned into
// infeasible.
scheduler_->GetLocalResourceManager().DeleteLocalResource(ray::kCPU_ResourceLabel);
scheduler_->GetLocalResourceManager().DeleteLocalResource(
scheduling::ResourceID(ray::kCPU_ResourceLabel));
RayTask task2 = CreateTask({{ray::kCPU_ResourceLabel, 4}});
rpc::RequestWorkerLeaseReply reply2;
@ -1506,7 +1507,8 @@ TEST_F(ClusterTaskManagerTest, FeasibleToNonFeasible) {
TEST_F(ClusterTaskManagerTestWithGPUsAtHead, RleaseAndReturnWorkerCpuResources) {
const NodeResources &node_resources =
scheduler_->GetClusterResourceManager().GetNodeResources(id_.Binary());
scheduler_->GetClusterResourceManager().GetNodeResources(
scheduling::NodeID(id_.Binary()));
ASSERT_EQ(node_resources.predefined_resources[PredefinedResources::CPU].available, 8);
ASSERT_EQ(node_resources.predefined_resources[PredefinedResources::GPU].available, 4);
@ -1831,8 +1833,8 @@ TEST_F(ClusterTaskManagerTest, PopWorkerExactlyOnce) {
}
TEST_F(ClusterTaskManagerTest, CapRunningOnDispatchQueue) {
scheduler_->GetLocalResourceManager().AddLocalResourceInstances(ray::kGPU_ResourceLabel,
{1, 1, 1});
scheduler_->GetLocalResourceManager().AddLocalResourceInstances(
scheduling::ResourceID(ray::kGPU_ResourceLabel), {1, 1, 1});
RayTask task = CreateTask({{ray::kCPU_ResourceLabel, 4}, {ray::kGPU_ResourceLabel, 1}},
/*num_args=*/0, /*args=*/{});
RayTask task2 = CreateTask({{ray::kCPU_ResourceLabel, 4}, {ray::kGPU_ResourceLabel, 1}},
@ -1883,8 +1885,8 @@ TEST_F(ClusterTaskManagerTest, CapRunningOnDispatchQueue) {
}
TEST_F(ClusterTaskManagerTest, ZeroCPUTasks) {
scheduler_->GetLocalResourceManager().AddLocalResourceInstances(ray::kGPU_ResourceLabel,
{1, 1, 1});
scheduler_->GetLocalResourceManager().AddLocalResourceInstances(
scheduling::ResourceID(ray::kGPU_ResourceLabel), {1, 1, 1});
RayTask task = CreateTask({{"GPU", 1}}, /*num_args=*/0, /*args=*/{});
RayTask task2 = CreateTask({{"GPU", 1}}, /*num_args=*/0, /*args=*/{});
RayTask task3 = CreateTask({{"GPU", 1}}, /*num_args=*/0, /*args=*/{});

View file

@ -22,20 +22,17 @@
namespace ray {
LocalResourceManager::LocalResourceManager(
int64_t local_node_id, StringIdMap &resource_name_to_id,
const NodeResources &node_resources,
scheduling::NodeID local_node_id, const NodeResources &node_resources,
std::function<int64_t(void)> get_used_object_store_memory,
std::function<bool(void)> get_pull_manager_at_capacity,
std::function<void(const NodeResources &)> resource_change_subscriber)
: local_node_id_(local_node_id),
resource_name_to_id_(resource_name_to_id),
get_used_object_store_memory_(get_used_object_store_memory),
get_pull_manager_at_capacity_(get_pull_manager_at_capacity),
resource_change_subscriber_(resource_change_subscriber) {
InitResourceUnitInstanceInfo();
InitLocalResources(node_resources);
RAY_LOG(DEBUG) << "local resources: "
<< local_resources_.DebugString(resource_name_to_id);
RAY_LOG(DEBUG) << "local resources: " << local_resources_.DebugString();
}
void LocalResourceManager::InitResourceUnitInstanceInfo() {
@ -57,14 +54,15 @@ void LocalResourceManager::InitResourceUnitInstanceInfo() {
std::vector<std::string> results;
boost::split(results, custom_unit_instance_resources, boost::is_any_of(","));
for (std::string &result : results) {
int64_t resource_id = resource_name_to_id_.Insert(result);
int64_t resource_id = scheduling::ResourceID(result).ToInt();
custom_unit_instance_resources_.emplace(resource_id);
}
}
}
void LocalResourceManager::AddLocalResourceInstances(
const std::string &resource_name, const std::vector<FixedPoint> &instances) {
scheduling::ResourceID resource_id, const std::vector<FixedPoint> &instances) {
auto resource_name = resource_id.Binary();
ResourceInstanceCapacities *node_instances;
local_resources_.predefined_resources.resize(PredefinedResources_MAX);
if (kCPU_ResourceLabel == resource_name) {
@ -76,9 +74,7 @@ void LocalResourceManager::AddLocalResourceInstances(
} else if (kMemory_ResourceLabel == resource_name) {
node_instances = &local_resources_.predefined_resources[MEM];
} else {
resource_name_to_id_.Insert(resource_name);
int64_t resource_id = resource_name_to_id_.Get(resource_name);
node_instances = &local_resources_.custom_resources[resource_id];
node_instances = &local_resources_.custom_resources[resource_id.ToInt()];
}
if (node_instances->total.size() < instances.size()) {
@ -93,8 +89,8 @@ void LocalResourceManager::AddLocalResourceInstances(
OnResourceChanged();
}
void LocalResourceManager::DeleteLocalResource(const std::string &resource_name) {
int idx = GetPredefinedResourceIndex(resource_name);
void LocalResourceManager::DeleteLocalResource(scheduling::ResourceID resource_id) {
int idx = GetPredefinedResourceIndex(resource_id);
if (idx != -1) {
for (auto &total : local_resources_.predefined_resources[idx].total) {
total = 0;
@ -103,26 +99,23 @@ void LocalResourceManager::DeleteLocalResource(const std::string &resource_name)
available = 0;
}
} else {
int64_t resource_id = resource_name_to_id_.Get(resource_name);
auto c_itr = local_resources_.custom_resources.find(resource_id);
auto c_itr = local_resources_.custom_resources.find(resource_id.ToInt());
if (c_itr != local_resources_.custom_resources.end()) {
local_resources_.custom_resources[resource_id].total.clear();
local_resources_.custom_resources[resource_id].available.clear();
local_resources_.custom_resources[resource_id.ToInt()].total.clear();
local_resources_.custom_resources[resource_id.ToInt()].available.clear();
local_resources_.custom_resources.erase(c_itr);
}
}
OnResourceChanged();
}
bool LocalResourceManager::IsAvailableResourceEmpty(const std::string &resource_name) {
int idx = GetPredefinedResourceIndex(resource_name);
bool LocalResourceManager::IsAvailableResourceEmpty(scheduling::ResourceID resource_id) {
int idx = GetPredefinedResourceIndex(resource_id);
if (idx != -1) {
return FixedPoint::Sum(local_resources_.predefined_resources[idx].available) <= 0;
}
resource_name_to_id_.Insert(resource_name);
int64_t resource_id = resource_name_to_id_.Get(resource_name);
auto itr = local_resources_.custom_resources.find(resource_id);
auto itr = local_resources_.custom_resources.find(resource_id.ToInt());
if (itr != local_resources_.custom_resources.end()) {
return FixedPoint::Sum(itr->second.available) <= 0;
} else {
@ -132,7 +125,7 @@ bool LocalResourceManager::IsAvailableResourceEmpty(const std::string &resource_
std::string LocalResourceManager::DebugString(void) const {
std::stringstream buffer;
buffer << local_resources_.DebugString(resource_name_to_id_);
buffer << local_resources_.DebugString();
return buffer.str();
}
@ -441,7 +434,7 @@ bool LocalResourceManager::AllocateLocalTaskResources(
RAY_CHECK(task_allocation != nullptr);
// We don't track object store memory demands so no need to allocate them.
ResourceRequest resource_request = ResourceMapToResourceRequest(
resource_name_to_id_, task_resources, /*requires_object_store_memory=*/false);
task_resources, /*requires_object_store_memory=*/false);
return AllocateLocalTaskResources(resource_request, task_allocation);
}
@ -510,8 +503,7 @@ void LocalResourceManager::FillResourceUsage(rpc::ResourcesData &resources_data)
// Initialize if last report resources is empty.
if (!last_report_resources_) {
NodeResources node_resources =
ResourceMapToNodeResources(resource_name_to_id_, {{}}, {{}});
NodeResources node_resources = ResourceMapToNodeResources({{}}, {{}});
last_report_resources_.reset(new NodeResources(node_resources));
}
@ -533,7 +525,7 @@ void LocalResourceManager::FillResourceUsage(rpc::ResourcesData &resources_data)
uint64_t custom_id = it.first;
const auto &capacity = it.second;
const auto &last_capacity = last_report_resources_->custom_resources[custom_id];
const auto &label = resource_name_to_id_.Get(custom_id);
auto label = scheduling::ResourceID(custom_id).Binary();
// Note: available may be negative, but only report positive to GCS.
if (capacity.available != last_capacity.available && capacity.available > 0) {
resources_data.set_resources_available_changed(true);
@ -541,7 +533,8 @@ void LocalResourceManager::FillResourceUsage(rpc::ResourcesData &resources_data)
capacity.available.Double();
}
if (capacity.total != last_capacity.total) {
(*resources_data.mutable_resources_total())[label] = capacity.total.Double();
(*resources_data.mutable_resources_total())[std::move(label)] =
capacity.total.Double();
}
}
@ -586,7 +579,7 @@ ray::gcs::NodeResourceInfoAccessor::ResourceMap LocalResourceManager::GetResourc
}
for (auto entry : local_resources_.custom_resources) {
std::string resource_name = resource_name_to_id_.Get(entry.first);
std::string resource_name = scheduling::ResourceID(entry.first).Binary();
double resource_total = FixedPoint::Sum(entry.second.total).Double();
if (!resource_map_filter.contains(resource_name)) {
continue;
@ -646,36 +639,28 @@ std::string LocalResourceManager::SerializedTaskResourceInstances(
void LocalResourceManager::ResetLastReportResourceUsage(
const SchedulingResources &replacement) {
last_report_resources_ = std::make_unique<NodeResources>(ResourceMapToNodeResources(
resource_name_to_id_, replacement.GetTotalResources().GetResourceMap(),
replacement.GetAvailableResources().GetResourceMap()));
last_report_resources_ = std::make_unique<NodeResources>(
ResourceMapToNodeResources(replacement.GetTotalResources().GetResourceMap(),
replacement.GetAvailableResources().GetResourceMap()));
}
bool LocalResourceManager::ResourcesExist(const std::string &resource_name) {
int idx = GetPredefinedResourceIndex(resource_name);
bool LocalResourceManager::ResourcesExist(scheduling::ResourceID resource_id) {
int idx = GetPredefinedResourceIndex(resource_id);
if (idx != -1) {
// Return true directly for predefined resources as we always initialize this kind of
// resources at the beginning.
return true;
} else {
int64_t resource_id = resource_name_to_id_.Get(resource_name);
const auto &it = local_resources_.custom_resources.find(resource_id);
const auto &it = local_resources_.custom_resources.find(resource_id.ToInt());
return it != local_resources_.custom_resources.end();
}
}
int GetPredefinedResourceIndex(const std::string &resource_name) {
int idx = -1;
if (resource_name == ray::kCPU_ResourceLabel) {
idx = (int)ray::CPU;
} else if (resource_name == ray::kGPU_ResourceLabel) {
idx = (int)ray::GPU;
} else if (resource_name == ray::kObjectStoreMemory_ResourceLabel) {
idx = (int)ray::OBJECT_STORE_MEM;
} else if (resource_name == ray::kMemory_ResourceLabel) {
idx = (int)ray::MEM;
};
return idx;
int GetPredefinedResourceIndex(scheduling::ResourceID resource_id) {
if (resource_id.ToInt() >= 0 && resource_id.ToInt() < PredefinedResources_MAX) {
return resource_id.ToInt();
}
return -1;
}
} // namespace ray
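The rewritten GetPredefinedResourceIndex works because the Resource id map is pre-populated (see scheduling_ids.h later in this diff), so the four predefined labels intern to their enum values in [0, PredefinedResources_MAX). A hedged sketch of the invariant it relies on, assuming <cassert> and the headers touched in this change:

  // Predefined labels resolve to their enum index via a simple range test.
  assert(ray::GetPredefinedResourceIndex(scheduling::ResourceID(ray::kCPU_ResourceLabel)) == ray::CPU);
  assert(ray::GetPredefinedResourceIndex(scheduling::ResourceID(ray::kGPU_ResourceLabel)) == ray::GPU);
  // A custom label is expected to intern to an id outside that range, so the helper
  // returns -1 and the caller falls back to local_resources_.custom_resources.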

View file

@ -40,30 +40,29 @@ namespace ray {
class LocalResourceManager {
public:
LocalResourceManager(
int64_t local_node_id, StringIdMap &resource_name_to_id,
const NodeResources &node_resources,
scheduling::NodeID local_node_id, const NodeResources &node_resources,
std::function<int64_t(void)> get_used_object_store_memory,
std::function<bool(void)> get_pull_manager_at_capacity,
std::function<void(const NodeResources &)> resource_change_subscriber);
int64_t GetNodeId() const { return local_node_id_; }
scheduling::NodeID GetNodeId() const { return local_node_id_; }
/// Add a local resource that is available.
///
/// \param resource_name: Resource which we want to update.
/// \param resource_total: New capacity of the resource.
void AddLocalResourceInstances(const std::string &resource_name,
void AddLocalResourceInstances(scheduling::ResourceID resource_id,
const std::vector<FixedPoint> &instances);
/// Delete a given resource from the local node.
///
/// \param resource_name: Resource we want to delete
void DeleteLocalResource(const std::string &resource_name);
void DeleteLocalResource(scheduling::ResourceID resource_id);
/// Check whether the available resources are empty.
///
/// \param resource_name: Resource which we want to check.
bool IsAvailableResourceEmpty(const std::string &resource_name);
bool IsAvailableResourceEmpty(scheduling::ResourceID resource_id);
/// Return local resources.
NodeResourceInstances GetLocalResources() const { return local_resources_; }
@ -164,7 +163,7 @@ class LocalResourceManager {
/// \param resource_name: the specific resource name.
///
/// \return true, if exist. otherwise, false.
bool ResourcesExist(const std::string &resource_name);
bool ResourcesExist(scheduling::ResourceID resource_id);
private:
/// Create instances for each resource associated with the local node, given
@ -270,10 +269,7 @@ class LocalResourceManager {
void UpdateAvailableObjectStoreMemResource();
/// Identifier of local node.
int64_t local_node_id_;
/// Keep the mapping between node and resource IDs in string representation
/// to integer representation. Used for improving map performance.
StringIdMap &resource_name_to_id_;
scheduling::NodeID local_node_id_;
/// Resources of local node.
NodeResourceInstances local_resources_;
/// Cached resources, used to compare with newest one in light heartbeat mode.
@ -301,6 +297,6 @@ class LocalResourceManager {
FRIEND_TEST(ClusterResourceSchedulerTest, CustomResourceInstanceTest);
};
int GetPredefinedResourceIndex(const std::string &resource_name);
int GetPredefinedResourceIndex(scheduling::ResourceID resource_id);
} // end namespace ray
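For reference, a short usage sketch of the ResourceID-keyed API above, mirroring how the unit tests in this change drive it (resource_scheduler is a placeholder for an existing ClusterResourceScheduler):

  auto &local = resource_scheduler.GetLocalResourceManager();
  // Register three instances of a custom resource, keyed by ResourceID rather than by name.
  local.AddLocalResourceInstances(scheduling::ResourceID("custom123"), {0., 1.0, 1.0});
  // Query and delete through the same id type.
  if (local.IsAvailableResourceEmpty(scheduling::ResourceID("custom123"))) {
    local.DeleteLocalResource(scheduling::ResourceID("custom123"));
  }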

View file

@ -315,13 +315,14 @@ void LocalTaskManager::SpillWaitingTasks() {
// TODO(swang): The policy currently does not account for the amount of
// object store memory availability. Ideally, we should pick the node with
// the most memory availability.
std::string node_id_string = cluster_resource_scheduler_->GetBestSchedulableNode(
auto scheduling_node_id = cluster_resource_scheduler_->GetBestSchedulableNode(
(*it)->task.GetTaskSpecification(),
/*prioritize_local_node*/ true,
/*exclude_local_node*/ force_spillback,
/*requires_object_store_memory*/ true, &is_infeasible);
if (!node_id_string.empty() && node_id_string != self_node_id_.Binary()) {
NodeID node_id = NodeID::FromBinary(node_id_string);
if (!scheduling_node_id.IsNil() &&
scheduling_node_id.Binary() != self_node_id_.Binary()) {
NodeID node_id = NodeID::FromBinary(scheduling_node_id.Binary());
Spillback(node_id, *it);
if (!task.GetTaskSpecification().GetDependencies().empty()) {
task_dependency_manager_.RemoveTaskDependencies(
@ -330,7 +331,7 @@ void LocalTaskManager::SpillWaitingTasks() {
waiting_tasks_index_.erase(task_id);
it = waiting_task_queue_.erase(it);
} else {
if (node_id_string.empty()) {
if (scheduling_node_id.IsNil()) {
RAY_LOG(DEBUG) << "RayTask " << task_id
<< " has blocked dependencies, but no other node has resources, "
"keeping the task local";
@ -347,17 +348,17 @@ void LocalTaskManager::SpillWaitingTasks() {
bool LocalTaskManager::TrySpillback(const std::shared_ptr<internal::Work> &work,
bool &is_infeasible) {
std::string node_id_string = cluster_resource_scheduler_->GetBestSchedulableNode(
auto scheduling_node_id = cluster_resource_scheduler_->GetBestSchedulableNode(
work->task.GetTaskSpecification(), work->PrioritizeLocalNode(),
/*exclude_local_node*/ false,
/*requires_object_store_memory*/ false, &is_infeasible);
if (is_infeasible || node_id_string == self_node_id_.Binary() ||
node_id_string.empty()) {
if (is_infeasible || scheduling_node_id.IsNil() ||
scheduling_node_id.Binary() == self_node_id_.Binary()) {
return false;
}
NodeID node_id = NodeID::FromBinary(node_id_string);
NodeID node_id = NodeID::FromBinary(scheduling_node_id.Binary());
Spillback(node_id, work);
return true;
}
@ -387,7 +388,7 @@ bool LocalTaskManager::PoppedWorkerHandler(
task.GetTaskSpecification().GetRequiredResources().GetResourceMap();
for (auto &entry : required_resource) {
if (!cluster_resource_scheduler_->GetLocalResourceManager().ResourcesExist(
entry.first)) {
scheduling::ResourceID(entry.first))) {
RAY_CHECK(task.GetTaskSpecification().PlacementGroupBundleId().first !=
PlacementGroupID::Nil());
RAY_LOG(DEBUG) << "The placement group: "
@ -522,7 +523,8 @@ void LocalTaskManager::Spillback(const NodeID &spillback_to,
RAY_LOG(DEBUG) << "Spilling task " << task_spec.TaskId() << " to node " << spillback_to;
if (!cluster_resource_scheduler_->AllocateRemoteTaskResources(
spillback_to.Binary(), task_spec.GetRequiredResources().GetResourceMap())) {
scheduling::NodeID(spillback_to.Binary()),
task_spec.GetRequiredResources().GetResourceMap())) {
RAY_LOG(DEBUG) << "Tried to allocate resources for request " << task_spec.TaskId()
<< " on a remote node that are no longer available";
}
@ -984,7 +986,6 @@ bool LocalTaskManager::ReturnCpuResourcesToBlockedWorker(
ResourceSet LocalTaskManager::CalcNormalTaskResources() const {
absl::flat_hash_map<std::string, FixedPoint> total_normal_task_resources;
const auto &string_id_map = cluster_resource_scheduler_->GetStringIdMap();
for (auto &entry : leased_workers_) {
std::shared_ptr<WorkerInterface> worker = entry.second;
auto &task_spec = worker->GetAssignedTask().GetTaskSpecification();
@ -1009,7 +1010,8 @@ ResourceSet LocalTaskManager::CalcNormalTaskResources() const {
}
for (auto &entry : resource_request.custom_resources) {
if (entry.second > 0) {
total_normal_task_resources[string_id_map.Get(entry.first)] += entry.second;
total_normal_task_resources[scheduling::ResourceID(entry.first).Binary()] +=
entry.second;
}
}
}
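Because ResourceID wraps the process-wide string map, call sites no longer need the scheduler's StringIdMap to translate internal ids back to labels; the aggregation loop above reduces to the pattern below (shown only to contrast the before and after forms):

  // Before: total_normal_task_resources[string_id_map.Get(entry.first)] += entry.second;
  // After: the internal id converts to its label directly.
  total_normal_task_resources[scheduling::ResourceID(entry.first).Binary()] += entry.second;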

View file

@ -14,6 +14,8 @@
#include "ray/raylet/scheduling/scheduling_ids.h"
namespace ray {
int64_t StringIdMap::Get(const std::string &string_id) const {
auto it = string_to_int_.find(string_id);
if (it == string_to_int_.end()) {
@ -68,3 +70,5 @@ StringIdMap &StringIdMap::InsertOrDie(const std::string &string_id, int64_t valu
}
int64_t StringIdMap::Count() { return string_to_int_.size(); }
} // namespace ray

View file

@ -14,6 +14,7 @@
#pragma once
#include <functional>
#include <string>
#include "absl/container/flat_hash_map.h"
@ -22,6 +23,17 @@
/// Limit the ID range to test for collisions.
#define MAX_ID_TEST 8
namespace ray {
/// List of predefined resources.
enum PredefinedResources { CPU, MEM, GPU, OBJECT_STORE_MEM, PredefinedResources_MAX };
const std::string kCPU_ResourceLabel = "CPU";
const std::string kGPU_ResourceLabel = "GPU";
const std::string kObjectStoreMemory_ResourceLabel = "object_store_memory";
const std::string kMemory_ResourceLabel = "memory";
const std::string kBundle_ResourceLabel = "bundle";
/// Class to map string IDs to unique integer IDs and back.
class StringIdMap {
absl::flat_hash_map<std::string, int64_t> string_to_int_;
@ -62,3 +74,85 @@ class StringIdMap {
/// Get number of identifiers.
int64_t Count();
};
enum class SchedulingIDTag { Node, Resource };
/// Represents a string scheduling id. It optimizes storage by interning the
/// string in a singleton StringIdMap and storing only the integer index as
/// its sole member.
///
/// Note: this class is not thread safe!
template <SchedulingIDTag T>
class BaseSchedulingID {
public:
explicit BaseSchedulingID(const std::string &name) : id_{GetMap().Insert(name)} {}
explicit BaseSchedulingID(int64_t id) : id_{id} {}
int64_t ToInt() const { return id_; }
std::string Binary() const { return GetMap().Get(id_); }
bool operator==(const BaseSchedulingID &rhs) const { return id_ == rhs.id_; }
bool operator!=(const BaseSchedulingID &rhs) const { return id_ != rhs.id_; }
bool operator<(const BaseSchedulingID &rhs) const { return id_ < rhs.id_; }
bool IsNil() const { return id_ == -1; }
static BaseSchedulingID Nil() { return BaseSchedulingID(-1); }
private:
/// Meyers' singleton to store the StringIdMap.
static StringIdMap &GetMap() {
static StringIdMap map;
return map;
}
int64_t id_ = -1;
};
template <ray::SchedulingIDTag T>
std::ostream &operator<<(std::ostream &os, const ray::BaseSchedulingID<T> &id) {
os << id.ToInt();
return os;
}
template <>
inline std::ostream &operator<<(
std::ostream &os, const ray::BaseSchedulingID<SchedulingIDTag::Resource> &id) {
os << id.Binary();
return os;
}
/// Specialization for SchedulingIDTag::Resource. Specifically, we populate
/// the singleton map with the predefined resources.
template <>
inline StringIdMap &BaseSchedulingID<SchedulingIDTag::Resource>::GetMap() {
static StringIdMap map = []() {
StringIdMap map;
return map.InsertOrDie(kCPU_ResourceLabel, CPU)
.InsertOrDie(kGPU_ResourceLabel, GPU)
.InsertOrDie(kObjectStoreMemory_ResourceLabel, OBJECT_STORE_MEM)
.InsertOrDie(kMemory_ResourceLabel, MEM);
}();
return map;
}
namespace scheduling {
/// The actual scheduling id definitions which are used in scheduler.
using ResourceID = BaseSchedulingID<SchedulingIDTag::Resource>;
using NodeID = BaseSchedulingID<SchedulingIDTag::Node>;
} // namespace scheduling
} // namespace ray
/// Implements the hash function for BaseSchedulingID<T>.
namespace std {
template <ray::SchedulingIDTag T>
struct hash<ray::BaseSchedulingID<T>> {
std::size_t operator()(const ray::BaseSchedulingID<T> &id) const {
return std::hash<int64_t>()(id.ToInt());
}
};
} // namespace std
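For context, a minimal usage sketch of the typed IDs introduced above (editor's illustration, not part of this commit; the assertions mirror scheduling_ids_test.cc below):

// Editor's sketch: exercises the BaseSchedulingID aliases defined above.
#include <cassert>

#include "ray/raylet/scheduling/scheduling_ids.h"

void SchedulingIdSketch() {
  using ray::scheduling::NodeID;
  using ray::scheduling::ResourceID;
  // The Resource map is pre-populated, so the predefined index CPU round-trips
  // back to the "CPU" label.
  assert(ResourceID(ray::CPU).Binary() == ray::kCPU_ResourceLabel);
  // Node IDs are interned on first use and round-trip through their integer form.
  NodeID node("raylet-1");
  assert(NodeID(node.ToInt()) == node);
  assert(node.Binary() == "raylet-1");
  // Nil() replaces the old "-1 means no node" convention.
  assert(NodeID::Nil().IsNil());
}

Because each SchedulingIDTag gets its own singleton map, pre-populating ResourceID has no effect on the NodeID namespace.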

View file

@ -0,0 +1,52 @@
// Copyright 2021 The Ray Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "ray/raylet/scheduling/scheduling_ids.h"
#include "gtest/gtest.h"
namespace ray {
using namespace ray::scheduling;
struct SchedulingIDsTest : public ::testing::Test {};
TEST_F(SchedulingIDsTest, BasicTest) {
std::vector<std::string> string_ids = {"hello", "whaaat", "yes"};
std::vector<NodeID> node_ids;
for (auto &string_id : string_ids) {
node_ids.emplace_back(NodeID(string_id));
ASSERT_EQ(node_ids.back().Binary(), string_id);
}
ASSERT_EQ(node_ids[0], NodeID(string_ids[0]));
ASSERT_EQ(node_ids[0], NodeID(node_ids[0].ToInt()));
ASSERT_TRUE(NodeID::Nil().IsNil());
ASSERT_EQ(NodeID::Nil().ToInt(), -1);
ASSERT_EQ(NodeID::Nil().Binary(), "-1");
ASSERT_EQ(NodeID(13), NodeID(13));
ASSERT_NE(NodeID(1), NodeID(2));
ASSERT_TRUE(NodeID(1) < NodeID(2));
}
TEST_F(SchedulingIDsTest, PrepopulateResourceIDTest) {
ASSERT_EQ(kCPU_ResourceLabel, ResourceID(CPU).Binary());
ASSERT_EQ(kGPU_ResourceLabel, ResourceID(GPU).Binary());
ASSERT_EQ(kObjectStoreMemory_ResourceLabel, ResourceID(OBJECT_STORE_MEM).Binary());
ASSERT_EQ(kMemory_ResourceLabel, ResourceID(MEM).Binary());
// Meanwhile, the NodeID map is not pre-populated.
ASSERT_NE(kCPU_ResourceLabel, NodeID(CPU).Binary());
}
} // namespace ray

View file

@ -38,10 +38,10 @@ bool DoesNodeHaveGPUs(const NodeResources &resources) {
}
} // namespace
int64_t SchedulingPolicy::SpreadPolicy(const ResourceRequest &resource_request,
bool force_spillback, bool require_available,
std::function<bool(int64_t)> is_node_available) {
std::vector<int64_t> round;
scheduling::NodeID SchedulingPolicy::SpreadPolicy(
const ResourceRequest &resource_request, bool force_spillback, bool require_available,
std::function<bool(scheduling::NodeID)> is_node_available) {
std::vector<scheduling::NodeID> round;
round.reserve(nodes_.size());
for (const auto &pair : nodes_) {
round.emplace_back(pair.first);
@ -69,19 +69,19 @@ int64_t SchedulingPolicy::SpreadPolicy(const ResourceRequest &resource_request,
is_node_available);
}
int64_t SchedulingPolicy::HybridPolicyWithFilter(
scheduling::NodeID SchedulingPolicy::HybridPolicyWithFilter(
const ResourceRequest &resource_request, float spread_threshold, bool force_spillback,
bool require_available, std::function<bool(int64_t)> is_node_available,
bool require_available, std::function<bool(scheduling::NodeID)> is_node_available,
NodeFilter node_filter) {
// Step 1: Generate the traversal order. We guarantee that the first node is local, to
// encourage local scheduling. The rest of the traversal order should be globally
// consistent, to encourage using "warm" workers.
std::vector<int64_t> round;
std::vector<scheduling::NodeID> round;
round.reserve(nodes_.size());
const auto local_it = nodes_.find(local_node_id_);
RAY_CHECK(local_it != nodes_.end());
auto predicate = [node_filter, &is_node_available](
int64_t node_id, const NodeResources &node_resources) {
scheduling::NodeID node_id, const NodeResources &node_resources) {
if (!is_node_available(node_id)) {
return false;
}
@ -116,7 +116,7 @@ int64_t SchedulingPolicy::HybridPolicyWithFilter(
// place.
std::sort(round.begin() + start_index, round.end());
int64_t best_node_id = -1;
scheduling::NodeID best_node_id = scheduling::NodeID::Nil();
float best_utilization_score = INFINITY;
bool best_is_available = false;
@ -143,12 +143,12 @@ int64_t SchedulingPolicy::HybridPolicyWithFilter(
ignore_pull_manager_at_capacity);
float critical_resource_utilization =
node.GetLocalView().CalculateCriticalResourceUtilization();
RAY_LOG(DEBUG) << "Node " << node_id << " is "
RAY_LOG(DEBUG) << "Node " << node_id.ToInt() << " is "
<< (is_available ? "available" : "not available") << " for request "
<< resource_request.DebugString()
<< " with critical resource utilization "
<< critical_resource_utilization << " based on local view "
<< node.GetLocalView().DebugString(StringIdMap());
<< node.GetLocalView().DebugString();
if (critical_resource_utilization < spread_threshold) {
critical_resource_utilization = 0;
}
@ -180,11 +180,10 @@ int64_t SchedulingPolicy::HybridPolicyWithFilter(
return best_node_id;
}
int64_t SchedulingPolicy::HybridPolicy(const ResourceRequest &resource_request,
float spread_threshold, bool force_spillback,
bool require_available,
std::function<bool(int64_t)> is_node_available,
bool scheduler_avoid_gpu_nodes) {
scheduling::NodeID SchedulingPolicy::HybridPolicy(
const ResourceRequest &resource_request, float spread_threshold, bool force_spillback,
bool require_available, std::function<bool(scheduling::NodeID)> is_node_available,
bool scheduler_avoid_gpu_nodes) {
if (!scheduler_avoid_gpu_nodes || IsGPURequest(resource_request)) {
return HybridPolicyWithFilter(resource_request, spread_threshold, force_spillback,
require_available, std::move(is_node_available));
@ -194,7 +193,7 @@ int64_t SchedulingPolicy::HybridPolicy(const ResourceRequest &resource_request,
auto best_node_id = HybridPolicyWithFilter(
resource_request, spread_threshold, force_spillback,
/*require_available*/ true, is_node_available, NodeFilter::kNonGpu);
if (best_node_id != -1) {
if (!best_node_id.IsNil()) {
return best_node_id;
}
@ -204,9 +203,10 @@ int64_t SchedulingPolicy::HybridPolicy(const ResourceRequest &resource_request,
require_available, is_node_available);
}
int64_t SchedulingPolicy::RandomPolicy(const ResourceRequest &resource_request,
std::function<bool(int64_t)> is_node_available) {
int64_t best_node = -1;
scheduling::NodeID SchedulingPolicy::RandomPolicy(
const ResourceRequest &resource_request,
std::function<bool(scheduling::NodeID)> is_node_available) {
scheduling::NodeID best_node = scheduling::NodeID::Nil();
if (nodes_.empty()) {
return best_node;
}
@ -230,7 +230,7 @@ int64_t SchedulingPolicy::RandomPolicy(const ResourceRequest &resource_request,
iter = nodes_.begin();
}
}
RAY_LOG(DEBUG) << "RandomPolicy, best_node = " << best_node
RAY_LOG(DEBUG) << "RandomPolicy, best_node = " << best_node.ToInt()
<< ", # nodes = " << nodes_.size()
<< ", resource_request = " << resource_request.DebugString();
return best_node;
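
A rough sketch of the traversal order described in HybridPolicyWithFilter above (editor's illustration with a hypothetical helper, not part of this commit): the local node is visited first to encourage local scheduling, and the remaining nodes follow in a globally consistent order to encourage reusing "warm" workers.

// Editor's sketch: builds the traversal order over scheduling::NodeID keys.
#include <algorithm>
#include <vector>

#include "ray/raylet/scheduling/scheduling_ids.h"

std::vector<ray::scheduling::NodeID> TraversalOrderSketch(
    const std::vector<ray::scheduling::NodeID> &nodes,
    ray::scheduling::NodeID local_node) {
  std::vector<ray::scheduling::NodeID> round;
  round.reserve(nodes.size());
  round.push_back(local_node);  // The local node is always tried first.
  for (const auto &node : nodes) {
    if (node != local_node) {
      round.push_back(node);
    }
  }
  // Sorting everything after the local node yields the same order on every
  // raylet, independent of hash-map iteration order.
  std::sort(round.begin() + 1, round.end());
  return round;
}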

View file

@ -25,7 +25,8 @@ namespace raylet_scheduling_policy {
class SchedulingPolicy {
public:
SchedulingPolicy(int64_t local_node_id, const absl::flat_hash_map<int64_t, Node> &nodes)
SchedulingPolicy(scheduling::NodeID local_node_id,
const absl::flat_hash_map<scheduling::NodeID, Node> &nodes)
: local_node_id_(local_node_id),
nodes_(nodes),
gen_(std::chrono::high_resolution_clock::now().time_since_epoch().count()) {}
@ -62,30 +63,31 @@ class SchedulingPolicy {
///
/// \return NodeID::Nil() if the task is infeasible, otherwise the node id (key in
/// `nodes`) to schedule on.
int64_t HybridPolicy(
scheduling::NodeID HybridPolicy(
const ResourceRequest &resource_request, float spread_threshold,
bool force_spillback, bool require_available,
std::function<bool(int64_t)> is_node_available,
std::function<bool(scheduling::NodeID)> is_node_available,
bool scheduler_avoid_gpu_nodes = RayConfig::instance().scheduler_avoid_gpu_nodes());
/// Round robin among available nodes.
/// If there are no available nodes, fall back to the hybrid policy.
int64_t SpreadPolicy(const ResourceRequest &resource_request, bool force_spillback,
bool require_available,
std::function<bool(int64_t)> is_node_available);
scheduling::NodeID SpreadPolicy(
const ResourceRequest &resource_request, bool force_spillback,
bool require_available, std::function<bool(scheduling::NodeID)> is_node_available);
/// Policy that "randomly" picks a node that could fulfill the request.
/// TODO(scv119): if many nodes have died or can't fulfill the resource
/// requirement, the distribution might not be even.
int64_t RandomPolicy(const ResourceRequest &resource_request,
std::function<bool(int64_t)> is_node_available);
scheduling::NodeID RandomPolicy(
const ResourceRequest &resource_request,
std::function<bool(scheduling::NodeID)> is_node_available);
private:
/// Identifier of local node.
const int64_t local_node_id_;
const scheduling::NodeID local_node_id_;
/// List of nodes in the clusters and their resources organized as a map.
/// The key of the map is the node ID.
const absl::flat_hash_map<int64_t, Node> &nodes_;
const absl::flat_hash_map<scheduling::NodeID, Node> &nodes_;
// The node to start round robin if it's spread scheduling.
// The index may be inaccurate when nodes are added or removed dynamically,
// but it should still be better than always scanning from 0 for spread scheduling.
@ -111,11 +113,11 @@ class SchedulingPolicy {
///
/// \return NodeID::Nil() if the task is infeasible, otherwise the node id (key in
/// `nodes`) to schedule on.
int64_t HybridPolicyWithFilter(const ResourceRequest &resource_request,
float spread_threshold, bool force_spillback,
bool require_available,
std::function<bool(int64_t)> is_node_available,
NodeFilter node_filter = NodeFilter::kAny);
scheduling::NodeID HybridPolicyWithFilter(
const ResourceRequest &resource_request, float spread_threshold,
bool force_spillback, bool require_available,
std::function<bool(scheduling::NodeID)> is_node_available,
NodeFilter node_filter = NodeFilter::kAny);
};
} // namespace raylet_scheduling_policy
} // namespace ray
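
A caller-side sketch of the new interface (editor's illustration with a hypothetical caller, not part of this commit): the policies now return scheduling::NodeID instead of a raw int64_t, so callers test IsNil() rather than comparing against -1.

// Editor's sketch: picks a node with HybridPolicy and handles the Nil case.
#include "ray/raylet/scheduling/scheduling_policy.h"

ray::scheduling::NodeID PickNodeSketch(
    ray::raylet_scheduling_policy::SchedulingPolicy &policy,
    const ray::ResourceRequest &request) {
  auto node_id = policy.HybridPolicy(
      request, /*spread_threshold=*/0.5, /*force_spillback=*/false,
      /*require_available=*/false,
      [](ray::scheduling::NodeID) { return true; });
  if (node_id.IsNil()) {
    // Infeasible: no node in the cluster can ever satisfy the request.
    return ray::scheduling::NodeID::Nil();
  }
  return node_id;  // Key into the `nodes` map passed to the policy.
}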

View file

@ -33,27 +33,28 @@ NodeResources CreateNodeResources(double available_cpu, double total_cpu,
return resources;
}
class SchedulingPolicyTest : public ::testing::Test {};
class SchedulingPolicyTest : public ::testing::Test {
public:
scheduling::NodeID local_node = scheduling::NodeID(0);
scheduling::NodeID remote_node = scheduling::NodeID(1);
scheduling::NodeID remote_node_2 = scheduling::NodeID(2);
scheduling::NodeID remote_node_3 = scheduling::NodeID(3);
absl::flat_hash_map<scheduling::NodeID, Node> nodes;
};
TEST_F(SchedulingPolicyTest, SpreadPolicyTest) {
StringIdMap map;
ResourceRequest req = ResourceMapToResourceRequest(map, {{"CPU", 1}}, false);
int64_t local_node = 0;
int64_t remote_node_1 = 1;
int64_t remote_node_2 = 2;
int64_t remote_node_3 = 3;
ResourceRequest req = ResourceMapToResourceRequest({{"CPU", 1}}, false);
absl::flat_hash_map<int64_t, Node> nodes;
nodes.emplace(local_node, CreateNodeResources(20, 20, 0, 0, 0, 0));
// Unavailable node
nodes.emplace(remote_node_1, CreateNodeResources(0, 20, 0, 0, 0, 0));
nodes.emplace(remote_node, CreateNodeResources(0, 20, 0, 0, 0, 0));
// Infeasible node
nodes.emplace(remote_node_2, CreateNodeResources(0, 0, 0, 0, 0, 0));
nodes.emplace(remote_node_3, CreateNodeResources(20, 20, 0, 0, 0, 0));
raylet_scheduling_policy::SchedulingPolicy scheduling_policy(local_node, nodes);
int64_t to_schedule =
auto to_schedule =
scheduling_policy.SpreadPolicy(req, false, false, [](auto) { return true; });
ASSERT_EQ(to_schedule, local_node);
@ -67,17 +68,10 @@ TEST_F(SchedulingPolicyTest, SpreadPolicyTest) {
}
TEST_F(SchedulingPolicyTest, RandomPolicyTest) {
StringIdMap map;
ResourceRequest req = ResourceMapToResourceRequest(map, {{"CPU", 1}}, false);
int64_t local_node = 0;
int64_t remote_node_1 = 1;
int64_t remote_node_2 = 2;
int64_t remote_node_3 = 3;
absl::flat_hash_map<int64_t, Node> nodes;
ResourceRequest req = ResourceMapToResourceRequest({{"CPU", 1}}, false);
nodes.emplace(local_node, CreateNodeResources(20, 20, 0, 0, 0, 0));
nodes.emplace(remote_node_1, CreateNodeResources(20, 20, 0, 0, 0, 0));
nodes.emplace(remote_node, CreateNodeResources(20, 20, 0, 0, 0, 0));
// Unavailable node
nodes.emplace(remote_node_2, CreateNodeResources(0, 20, 0, 0, 0, 0));
// Infeasible node
@ -85,14 +79,14 @@ TEST_F(SchedulingPolicyTest, RandomPolicyTest) {
raylet_scheduling_policy::SchedulingPolicy scheduling_policy(local_node, nodes);
std::map<int64_t, size_t> decisions;
std::map<scheduling::NodeID, size_t> decisions;
size_t num_node_0_picks = 0;
size_t num_node_1_picks = 0;
for (int i = 0; i < 1000; i++) {
int64_t to_schedule = scheduling_policy.RandomPolicy(req, [](auto) { return true; });
ASSERT_TRUE(to_schedule >= 0);
ASSERT_TRUE(to_schedule <= 1);
if (to_schedule == 0) {
auto to_schedule = scheduling_policy.RandomPolicy(req, [](auto) { return true; });
ASSERT_TRUE(to_schedule.ToInt() >= 0);
ASSERT_TRUE(to_schedule.ToInt() <= 1);
if (to_schedule.ToInt() == 0) {
num_node_0_picks++;
} else {
num_node_1_picks++;
@ -104,10 +98,9 @@ TEST_F(SchedulingPolicyTest, RandomPolicyTest) {
}
TEST_F(SchedulingPolicyTest, FeasibleDefinitionTest) {
StringIdMap map;
auto task_req1 =
ResourceMapToResourceRequest(map, {{"CPU", 1}, {"object_store_memory", 1}}, false);
auto task_req2 = ResourceMapToResourceRequest(map, {{"CPU", 1}}, false);
ResourceMapToResourceRequest({{"CPU", 1}, {"object_store_memory", 1}}, false);
auto task_req2 = ResourceMapToResourceRequest({{"CPU", 1}}, false);
{
// Don't break with a non-resized predefined resources array.
NodeResources resources;
@ -127,10 +120,9 @@ TEST_F(SchedulingPolicyTest, FeasibleDefinitionTest) {
}
TEST_F(SchedulingPolicyTest, AvailableDefinitionTest) {
StringIdMap map;
auto task_req1 =
ResourceMapToResourceRequest(map, {{"CPU", 1}, {"object_store_memory", 1}}, false);
auto task_req2 = ResourceMapToResourceRequest(map, {{"CPU", 1}}, false);
ResourceMapToResourceRequest({{"CPU", 1}, {"object_store_memory", 1}}, false);
auto task_req2 = ResourceMapToResourceRequest({{"CPU", 1}}, false);
{
// Don't break with a non-resized predefined resources array.
NodeResources resources;
@ -192,34 +184,28 @@ TEST_F(SchedulingPolicyTest, AvailableTruncationTest) {
// In this test, the local node and a remote node are both available. The remote node
// has a lower critical resource utilization, but they're both truncated to 0, so we
// should still pick the local node (due to traversal order).
StringIdMap map;
ResourceRequest req = ResourceMapToResourceRequest(map, {{"CPU", 1}}, false);
int64_t local_node = 0;
int64_t remote_node = 1;
ResourceRequest req = ResourceMapToResourceRequest({{"CPU", 1}}, false);
absl::flat_hash_map<int64_t, Node> nodes;
nodes.emplace(local_node, CreateNodeResources(1, 2, 0, 0, 0, 0));
nodes.emplace(remote_node, CreateNodeResources(0.75, 2, 0, 0, 0, 0));
int to_schedule = raylet_scheduling_policy::SchedulingPolicy(local_node, nodes)
.HybridPolicy(req, 0.51, false, false, [](auto) { return true; });
auto to_schedule =
raylet_scheduling_policy::SchedulingPolicy(local_node, nodes)
.HybridPolicy(req, 0.51, false, false, [](auto) { return true; });
ASSERT_EQ(to_schedule, local_node);
}
TEST_F(SchedulingPolicyTest, AvailableTieBreakTest) {
// In this test, the local node and a remote node are both available. The remote node
// has a lower critical resource utilization so we schedule on it.
StringIdMap map;
ResourceRequest req = ResourceMapToResourceRequest(map, {{"CPU", 1}}, false);
int64_t local_node = 0;
int64_t remote_node = 1;
ResourceRequest req = ResourceMapToResourceRequest({{"CPU", 1}}, false);
absl::flat_hash_map<int64_t, Node> nodes;
nodes.emplace(local_node, CreateNodeResources(1, 2, 0, 0, 0, 0));
nodes.emplace(remote_node, CreateNodeResources(1.5, 2, 0, 0, 0, 0));
int to_schedule = raylet_scheduling_policy::SchedulingPolicy(local_node, nodes)
.HybridPolicy(req, 0.50, false, false, [](auto) { return true; });
auto to_schedule =
raylet_scheduling_policy::SchedulingPolicy(local_node, nodes)
.HybridPolicy(req, 0.50, false, false, [](auto) { return true; });
ASSERT_EQ(to_schedule, remote_node);
}
@ -227,96 +213,67 @@ TEST_F(SchedulingPolicyTest, AvailableOverFeasibleTest) {
// In this test, the local node is feasible and has a lower critical resource
// utilization, but the remote node can run the task immediately, so we pick the remote
// node.
StringIdMap map;
ResourceRequest req =
ResourceMapToResourceRequest(map, {{"CPU", 1}, {"GPU", 1}}, false);
int64_t local_node = 0;
int64_t remote_node = 1;
absl::flat_hash_map<int64_t, Node> nodes;
ResourceRequest req = ResourceMapToResourceRequest({{"CPU", 1}, {"GPU", 1}}, false);
nodes.emplace(local_node, CreateNodeResources(10, 10, 0, 0, 0, 1));
nodes.emplace(remote_node, CreateNodeResources(1, 10, 0, 0, 1, 1));
int to_schedule = raylet_scheduling_policy::SchedulingPolicy(local_node, nodes)
.HybridPolicy(req, 0.50, false, false, [](auto) { return true; });
auto to_schedule =
raylet_scheduling_policy::SchedulingPolicy(local_node, nodes)
.HybridPolicy(req, 0.50, false, false, [](auto) { return true; });
ASSERT_EQ(to_schedule, remote_node);
}
TEST_F(SchedulingPolicyTest, InfeasibleTest) {
// All the nodes are infeasible, so Nil is returned.
StringIdMap map;
ResourceRequest req =
ResourceMapToResourceRequest(map, {{"CPU", 1}, {"GPU", 1}}, false);
int64_t local_node = 0;
int64_t remote_node = 1;
absl::flat_hash_map<int64_t, Node> nodes;
ResourceRequest req = ResourceMapToResourceRequest({{"CPU", 1}, {"GPU", 1}}, false);
nodes.emplace(local_node, CreateNodeResources(10, 10, 0, 0, 0, 0));
nodes.emplace(remote_node, CreateNodeResources(1, 10, 0, 0, 0, 0));
int to_schedule = raylet_scheduling_policy::SchedulingPolicy(local_node, nodes)
.HybridPolicy(req, 0.50, false, false, [](auto) { return true; });
ASSERT_EQ(to_schedule, -1);
auto to_schedule =
raylet_scheduling_policy::SchedulingPolicy(local_node, nodes)
.HybridPolicy(req, 0.50, false, false, [](auto) { return true; });
ASSERT_TRUE(to_schedule.IsNil());
}
TEST_F(SchedulingPolicyTest, BarelyFeasibleTest) {
// Test the edge case where a task requires all of a node's resources, and the node is
// fully utilized.
StringIdMap map;
ResourceRequest req =
ResourceMapToResourceRequest(map, {{"CPU", 1}, {"GPU", 1}}, false);
int64_t local_node = 0;
ResourceRequest req = ResourceMapToResourceRequest({{"CPU", 1}, {"GPU", 1}}, false);
absl::flat_hash_map<int64_t, Node> nodes;
nodes.emplace(local_node, CreateNodeResources(0, 1, 0, 0, 0, 1));
int to_schedule = raylet_scheduling_policy::SchedulingPolicy(local_node, nodes)
.HybridPolicy(req, 0.50, false, false, [](auto) { return true; });
auto to_schedule =
raylet_scheduling_policy::SchedulingPolicy(local_node, nodes)
.HybridPolicy(req, 0.50, false, false, [](auto) { return true; });
ASSERT_EQ(to_schedule, local_node);
}
TEST_F(SchedulingPolicyTest, TruncationAcrossFeasibleNodesTest) {
// Same as AvailableTruncationTest except now none of the nodes are available, but the
// tie break logic should apply to feasible nodes too.
StringIdMap map;
ResourceRequest req =
ResourceMapToResourceRequest(map, {{"CPU", 1}, {"GPU", 1}}, false);
int64_t local_node = 0;
int64_t remote_node = 1;
absl::flat_hash_map<int64_t, Node> nodes;
ResourceRequest req = ResourceMapToResourceRequest({{"CPU", 1}, {"GPU", 1}}, false);
nodes.emplace(local_node, CreateNodeResources(1, 2, 0, 0, 0, 1));
nodes.emplace(remote_node, CreateNodeResources(0.75, 2, 0, 0, 0, 1));
int to_schedule = raylet_scheduling_policy::SchedulingPolicy(local_node, nodes)
.HybridPolicy(req, 0.51, false, false, [](auto) { return true; });
auto to_schedule =
raylet_scheduling_policy::SchedulingPolicy(local_node, nodes)
.HybridPolicy(req, 0.51, false, false, [](auto) { return true; });
ASSERT_EQ(to_schedule, local_node);
}
TEST_F(SchedulingPolicyTest, ForceSpillbackIfAvailableTest) {
// The local node is better, but we force spillback, so we'll schedule on a
// non-local node anyway.
StringIdMap map;
ResourceRequest req =
ResourceMapToResourceRequest(map, {{"CPU", 1}, {"GPU", 1}}, false);
int64_t local_node = 0;
int64_t remote_node = 1;
absl::flat_hash_map<int64_t, Node> nodes;
ResourceRequest req = ResourceMapToResourceRequest({{"CPU", 1}, {"GPU", 1}}, false);
nodes.emplace(local_node, CreateNodeResources(2, 2, 0, 0, 1, 1));
nodes.emplace(remote_node, CreateNodeResources(1, 10, 0, 0, 1, 10));
int to_schedule = raylet_scheduling_policy::SchedulingPolicy(local_node, nodes)
.HybridPolicy(req, 0.51, true, true, [](auto) { return true; });
auto to_schedule = raylet_scheduling_policy::SchedulingPolicy(local_node, nodes)
.HybridPolicy(req, 0.51, true, true, [](auto) { return true; });
ASSERT_EQ(to_schedule, remote_node);
}
TEST_F(SchedulingPolicyTest, AvoidSchedulingCPURequestsOnGPUNodes) {
StringIdMap map;
int64_t local_node = 0;
int64_t remote_node = 1;
absl::flat_hash_map<int64_t, Node> nodes;
nodes.emplace(local_node, CreateNodeResources(10, 10, 0, 0, 1, 1));
nodes.emplace(remote_node, CreateNodeResources(1, 2, 0, 0, 0, 0));
@ -324,18 +281,17 @@ TEST_F(SchedulingPolicyTest, AvoidSchedulingCPURequestsOnGPUNodes) {
// The local node is better, but it has GPUs, the request is
// non-GPU, and the remote node does not have GPUs, so
// we should schedule on the remote node.
const ResourceRequest req = ResourceMapToResourceRequest(map, {{"CPU", 1}}, false);
const int to_schedule =
raylet_scheduling_policy::SchedulingPolicy(local_node, nodes)
.HybridPolicy(
ResourceMapToResourceRequest(map, {{"CPU", 1}}, false), 0.51, false, true,
[](auto) { return true; }, true);
const ResourceRequest req = ResourceMapToResourceRequest({{"CPU", 1}}, false);
const auto to_schedule = raylet_scheduling_policy::SchedulingPolicy(local_node, nodes)
.HybridPolicy(
ResourceMapToResourceRequest({{"CPU", 1}}, false),
0.51, false, true, [](auto) { return true; }, true);
ASSERT_EQ(to_schedule, remote_node);
}
{
// A GPU request should be scheduled on a GPU node.
const ResourceRequest req = ResourceMapToResourceRequest(map, {{"GPU", 1}}, false);
const int to_schedule =
const ResourceRequest req = ResourceMapToResourceRequest({{"GPU", 1}}, false);
const auto to_schedule =
raylet_scheduling_policy::SchedulingPolicy(local_node, nodes)
.HybridPolicy(
req, 0.51, false, true, [](auto) { return true; }, true);
@ -343,8 +299,8 @@ TEST_F(SchedulingPolicyTest, AvoidSchedulingCPURequestsOnGPUNodes) {
}
{
// A CPU request can be scheduled on a CPU node.
const ResourceRequest req = ResourceMapToResourceRequest(map, {{"CPU", 1}}, false);
const int to_schedule =
const ResourceRequest req = ResourceMapToResourceRequest({{"CPU", 1}}, false);
const auto to_schedule =
raylet_scheduling_policy::SchedulingPolicy(local_node, nodes)
.HybridPolicy(
req, 0.51, false, true, [](auto) { return true; }, true);
@ -353,8 +309,8 @@ TEST_F(SchedulingPolicyTest, AvoidSchedulingCPURequestsOnGPUNodes) {
{
// A mixed CPU/GPU request should be scheduled on a GPU node.
const ResourceRequest req =
ResourceMapToResourceRequest(map, {{"CPU", 1}, {"GPU", 1}}, false);
const int to_schedule =
ResourceMapToResourceRequest({{"CPU", 1}, {"GPU", 1}}, false);
const auto to_schedule =
raylet_scheduling_policy::SchedulingPolicy(local_node, nodes)
.HybridPolicy(
req, 0.51, false, true, [](auto) { return true; }, true);
@ -365,16 +321,11 @@ TEST_F(SchedulingPolicyTest, AvoidSchedulingCPURequestsOnGPUNodes) {
TEST_F(SchedulingPolicyTest, SchedulenCPURequestsOnGPUNodeAsALastResort) {
// Schedule on the remote (GPU) node, even though the request is CPU-only,
// because no CPU-only node can accommodate it.
StringIdMap map;
ResourceRequest req = ResourceMapToResourceRequest(map, {{"CPU", 1}}, false);
int64_t local_node = 0;
int64_t remote_node = 1;
absl::flat_hash_map<int64_t, Node> nodes;
ResourceRequest req = ResourceMapToResourceRequest({{"CPU", 1}}, false);
nodes.emplace(local_node, CreateNodeResources(0, 10, 0, 0, 0, 0));
nodes.emplace(remote_node, CreateNodeResources(1, 1, 0, 0, 1, 1));
const int to_schedule =
const auto to_schedule =
raylet_scheduling_policy::SchedulingPolicy(local_node, nodes)
.HybridPolicy(
req, 0.51, false, true, [](auto) { return true; }, true);
@ -383,81 +334,66 @@ TEST_F(SchedulingPolicyTest, SchedulenCPURequestsOnGPUNodeAsALastResort) {
TEST_F(SchedulingPolicyTest, ForceSpillbackTest) {
// The local node is available but disqualified.
StringIdMap map;
ResourceRequest req =
ResourceMapToResourceRequest(map, {{"CPU", 1}, {"GPU", 1}}, false);
int64_t local_node = 0;
int64_t remote_node = 1;
ResourceRequest req = ResourceMapToResourceRequest({{"CPU", 1}, {"GPU", 1}}, false);
absl::flat_hash_map<int64_t, Node> nodes;
nodes.emplace(local_node, CreateNodeResources(2, 2, 0, 0, 1, 1));
nodes.emplace(remote_node, CreateNodeResources(0, 2, 0, 0, 0, 1));
int to_schedule = raylet_scheduling_policy::SchedulingPolicy(local_node, nodes)
.HybridPolicy(req, 0.51, true, false, [](auto) { return true; });
auto to_schedule = raylet_scheduling_policy::SchedulingPolicy(local_node, nodes)
.HybridPolicy(req, 0.51, true, false, [](auto) { return true; });
ASSERT_EQ(to_schedule, remote_node);
}
TEST_F(SchedulingPolicyTest, ForceSpillbackOnlyFeasibleLocallyTest) {
// The request is only feasible on the local node, but we force spillback,
// so scheduling fails and Nil is returned.
StringIdMap map;
ResourceRequest req =
ResourceMapToResourceRequest(map, {{"CPU", 1}, {"GPU", 1}}, false);
int64_t local_node = 0;
int64_t remote_node = 1;
ResourceRequest req = ResourceMapToResourceRequest({{"CPU", 1}, {"GPU", 1}}, false);
absl::flat_hash_map<int64_t, Node> nodes;
nodes.emplace(local_node, CreateNodeResources(2, 2, 0, 0, 1, 1));
nodes.emplace(remote_node, CreateNodeResources(0, 2, 0, 0, 0, 0));
int to_schedule = raylet_scheduling_policy::SchedulingPolicy(local_node, nodes)
.HybridPolicy(req, 0.51, true, false, [](auto) { return true; });
ASSERT_EQ(to_schedule, -1);
auto to_schedule = raylet_scheduling_policy::SchedulingPolicy(local_node, nodes)
.HybridPolicy(req, 0.51, true, false, [](auto) { return true; });
ASSERT_TRUE(to_schedule.IsNil());
}
TEST_F(SchedulingPolicyTest, NonGpuNodePreferredSchedulingTest) {
// Prefer to schedule on CPU-only nodes first.
// GPU nodes should only be used as a last resort.
StringIdMap map;
int64_t local_node = 0;
int64_t remote_node_1 = 1;
int64_t remote_node_2 = 2;
// local {CPU:2, GPU:1}
// Remote {CPU: 2}
absl::flat_hash_map<int64_t, Node> nodes;
nodes.emplace(local_node, CreateNodeResources(2, 2, 0, 0, 1, 1));
nodes.emplace(remote_node_1, CreateNodeResources(2, 2, 0, 0, 0, 0));
nodes.emplace(remote_node, CreateNodeResources(2, 2, 0, 0, 0, 0));
nodes.emplace(remote_node_2, CreateNodeResources(3, 3, 0, 0, 0, 0));
ResourceRequest req = ResourceMapToResourceRequest(map, {{"CPU", 1}}, false);
int to_schedule = raylet_scheduling_policy::SchedulingPolicy(local_node, nodes)
.HybridPolicy(
req, 0.51, false, true, [](auto) { return true; },
/*gpu_avoid_scheduling*/ true);
ASSERT_EQ(to_schedule, remote_node_1);
ResourceRequest req = ResourceMapToResourceRequest({{"CPU", 1}}, false);
auto to_schedule = raylet_scheduling_policy::SchedulingPolicy(local_node, nodes)
.HybridPolicy(
req, 0.51, false, true, [](auto) { return true; },
/*gpu_avoid_scheduling*/ true);
ASSERT_EQ(to_schedule, remote_node);
req = ResourceMapToResourceRequest(map, {{"CPU", 3}}, false);
req = ResourceMapToResourceRequest({{"CPU", 3}}, false);
to_schedule = raylet_scheduling_policy::SchedulingPolicy(local_node, nodes)
.HybridPolicy(
req, 0.51, false, true, [](auto) { return true; },
/*gpu_avoid_scheduling*/ true);
ASSERT_EQ(to_schedule, remote_node_2);
req = ResourceMapToResourceRequest(map, {{"CPU", 1}, {"GPU", 1}}, false);
req = ResourceMapToResourceRequest({{"CPU", 1}, {"GPU", 1}}, false);
to_schedule = raylet_scheduling_policy::SchedulingPolicy(local_node, nodes)
.HybridPolicy(
req, 0.51, false, true, [](auto) { return true; },
/*gpu_avoid_scheduling*/ true);
ASSERT_EQ(to_schedule, local_node);
req = ResourceMapToResourceRequest(map, {{"CPU", 2}}, false);
req = ResourceMapToResourceRequest({{"CPU", 2}}, false);
to_schedule = raylet_scheduling_policy::SchedulingPolicy(local_node, nodes)
.HybridPolicy(
req, 0.51, false, true, [](auto) { return true; },
/*gpu_avoid_scheduling*/ true);
ASSERT_EQ(to_schedule, remote_node_1);
ASSERT_EQ(to_schedule, remote_node);
}
int main(int argc, char **argv) {