[scheduler] Clean up TaskRequest (#16288)

This commit is contained in:
Lixin Wei 2021-06-08 02:38:34 +08:00 committed by GitHub
parent 418dd1e8b9
commit 75196cf7f4
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 50 additions and 79 deletions

View file

@ -66,26 +66,21 @@ TaskRequest ResourceMapToTaskRequest(
TaskRequest task_request;
task_request.predefined_resources.resize(PredefinedResources_MAX);
task_request.custom_resources.resize(resource_map.size());
size_t i = 0;
for (auto const &resource : resource_map) {
if (resource.first == ray::kCPU_ResourceLabel) {
task_request.predefined_resources[CPU].demand = resource.second;
task_request.predefined_resources[CPU] = resource.second;
} else if (resource.first == ray::kGPU_ResourceLabel) {
task_request.predefined_resources[GPU].demand = resource.second;
task_request.predefined_resources[GPU] = resource.second;
} else if (resource.first == ray::kObjectStoreMemory_ResourceLabel) {
task_request.predefined_resources[OBJECT_STORE_MEM].demand = resource.second;
task_request.predefined_resources[OBJECT_STORE_MEM] = resource.second;
} else if (resource.first == ray::kMemory_ResourceLabel) {
task_request.predefined_resources[MEM].demand = resource.second;
task_request.predefined_resources[MEM] = resource.second;
} else {
string_to_int_map.Insert(resource.first);
task_request.custom_resources[i].id = string_to_int_map.Get(resource.first);
task_request.custom_resources[i].demand = resource.second;
i++;
int64_t id = string_to_int_map.Insert(resource.first);
task_request.custom_resources[id] = resource.second;
}
}
task_request.custom_resources.resize(i);
return task_request;
}
@ -113,22 +108,18 @@ TaskRequest TaskResourceInstances::ToTaskRequest() const {
task_req.predefined_resources.resize(PredefinedResources_MAX);
for (size_t i = 0; i < PredefinedResources_MAX; i++) {
task_req.predefined_resources[i].demand = 0;
task_req.predefined_resources[i] = 0;
for (auto predefined_resource_instance : this->predefined_resources[i]) {
task_req.predefined_resources[i].demand += predefined_resource_instance;
task_req.predefined_resources[i] += predefined_resource_instance;
}
}
task_req.custom_resources.resize(this->custom_resources.size());
size_t i = 0;
for (auto it = this->custom_resources.begin(); it != this->custom_resources.end();
++it) {
task_req.custom_resources[i].id = it->first;
task_req.custom_resources[i].demand = 0;
task_req.custom_resources[it->first] = 0;
for (size_t j = 0; j < it->second.size(); j++) {
task_req.custom_resources[i].demand += it->second[j];
task_req.custom_resources[it->first] += it->second[j];
}
i++;
}
return task_req;
}
@ -200,14 +191,14 @@ bool NodeResources::IsAvailable(const TaskRequest &task_req) const {
// First, check predefined resources.
for (size_t i = 0; i < PredefinedResources_MAX; i++) {
if (i >= this->predefined_resources.size()) {
if (task_req.predefined_resources[i].demand != 0) {
if (task_req.predefined_resources[i] != 0) {
return false;
}
continue;
}
const auto &resource = this->predefined_resources[i].available;
const auto &demand = task_req.predefined_resources[i].demand;
const auto &demand = task_req.predefined_resources[i];
if (resource < demand) {
return false;
@ -216,10 +207,10 @@ bool NodeResources::IsAvailable(const TaskRequest &task_req) const {
// Now check custom resources.
for (const auto &task_req_custom_resource : task_req.custom_resources) {
auto it = this->custom_resources.find(task_req_custom_resource.id);
auto it = this->custom_resources.find(task_req_custom_resource.first);
if (it == this->custom_resources.end()) {
return false;
} else if (task_req_custom_resource.demand > it->second.available) {
} else if (task_req_custom_resource.second > it->second.available) {
return false;
}
}
@ -230,13 +221,13 @@ bool NodeResources::IsFeasible(const TaskRequest &task_req) const {
// First, check predefined resources.
for (size_t i = 0; i < PredefinedResources_MAX; i++) {
if (i >= this->predefined_resources.size()) {
if (task_req.predefined_resources[i].demand != 0) {
if (task_req.predefined_resources[i] != 0) {
return false;
}
continue;
}
const auto &resource = this->predefined_resources[i].total;
const auto &demand = task_req.predefined_resources[i].demand;
const auto &demand = task_req.predefined_resources[i];
if (resource < demand) {
return false;
@ -245,10 +236,10 @@ bool NodeResources::IsFeasible(const TaskRequest &task_req) const {
// Now check custom resources.
for (const auto &task_req_custom_resource : task_req.custom_resources) {
auto it = this->custom_resources.find(task_req_custom_resource.id);
auto it = this->custom_resources.find(task_req_custom_resource.first);
if (it == this->custom_resources.end()) {
return false;
} else if (task_req_custom_resource.demand > it->second.total) {
} else if (task_req_custom_resource.second > it->second.total) {
return false;
}
}
@ -466,12 +457,12 @@ TaskResourceInstances NodeResourceInstances::GetAvailableResourceInstances() {
bool TaskRequest::IsEmpty() const {
for (size_t i = 0; i < this->predefined_resources.size(); i++) {
if (this->predefined_resources[i].demand != 0) {
if (this->predefined_resources[i] != 0) {
return false;
}
}
for (size_t i = 0; i < this->custom_resources.size(); i++) {
if (this->custom_resources[i].demand != 0) {
for (auto &it : custom_resources) {
if (it.second != 0) {
return false;
}
}
@ -482,14 +473,14 @@ std::string TaskRequest::DebugString() const {
std::stringstream buffer;
buffer << " {";
for (size_t i = 0; i < this->predefined_resources.size(); i++) {
buffer << "(" << this->predefined_resources[i].demand << ") ";
buffer << "(" << this->predefined_resources[i] << ") ";
}
buffer << "}";
buffer << " [";
for (size_t i = 0; i < this->custom_resources.size(); i++) {
buffer << this->custom_resources[i].id << ":"
<< "(" << this->custom_resources[i].demand << ") ";
for (auto &it : this->custom_resources) {
buffer << it.first << ":"
<< "(" << it.second << ") ";
}
buffer << "]" << std::endl;
return buffer.str();

View file

@ -53,24 +53,13 @@ struct ResourceInstanceCapacities {
std::vector<FixedPoint> available;
};
struct ResourceRequest {
/// Amount of resource being requested.
FixedPoint demand;
};
/// Resource request, including resource ID. This is used for custom resources.
struct ResourceRequestWithId : ResourceRequest {
/// Resource ID.
int64_t id;
};
// Data structure specifying the capacity of each resource requested by a task.
class TaskRequest {
public:
/// List of predefined resources required by the task.
std::vector<ResourceRequest> predefined_resources;
std::vector<FixedPoint> predefined_resources;
/// List of custom resources required by the task.
std::vector<ResourceRequestWithId> custom_resources;
std::unordered_map<int64_t, FixedPoint> custom_resources;
/// Check whether the request contains no resources.
bool IsEmpty() const;
/// Returns human-readable string for this task request.

View file

@ -145,20 +145,19 @@ bool ClusterResourceScheduler::IsFeasible(const TaskRequest &task_req,
const NodeResources &resources) const {
// First, check predefined resources.
for (size_t i = 0; i < PredefinedResources_MAX; i++) {
if (task_req.predefined_resources[i].demand >
resources.predefined_resources[i].total) {
if (task_req.predefined_resources[i] > resources.predefined_resources[i].total) {
return false;
}
}
// Now check custom resources.
for (const auto &task_req_custom_resource : task_req.custom_resources) {
auto it = resources.custom_resources.find(task_req_custom_resource.id);
auto it = resources.custom_resources.find(task_req_custom_resource.first);
if (it == resources.custom_resources.end()) {
return false;
}
if (task_req_custom_resource.demand > it->second.total) {
if (task_req_custom_resource.second > it->second.total) {
return false;
}
}
@ -173,8 +172,7 @@ int64_t ClusterResourceScheduler::IsSchedulable(const TaskRequest &task_req,
// First, check predefined resources.
for (size_t i = 0; i < PredefinedResources_MAX; i++) {
if (task_req.predefined_resources[i].demand >
resources.predefined_resources[i].available) {
if (task_req.predefined_resources[i] > resources.predefined_resources[i].available) {
// A hard constraint has been violated, so we cannot schedule
// this task request.
return -1;
@ -183,14 +181,14 @@ int64_t ClusterResourceScheduler::IsSchedulable(const TaskRequest &task_req,
// Now check custom resources.
for (const auto &task_req_custom_resource : task_req.custom_resources) {
auto it = resources.custom_resources.find(task_req_custom_resource.id);
auto it = resources.custom_resources.find(task_req_custom_resource.first);
if (it == resources.custom_resources.end()) {
// Requested resource doesn't exist at this node.
// This is a hard constraint so cannot schedule this task request.
return -1;
} else {
if (task_req_custom_resource.demand > it->second.available) {
if (task_req_custom_resource.second > it->second.available) {
// Resource constraint is violated.
return -1;
}
@ -363,14 +361,14 @@ bool ClusterResourceScheduler::SubtractRemoteNodeAvailableResources(
for (size_t i = 0; i < PredefinedResources_MAX; i++) {
resources->predefined_resources[i].available =
std::max(FixedPoint(0), resources->predefined_resources[i].available -
task_req.predefined_resources[i].demand);
task_req.predefined_resources[i]);
}
for (const auto &task_req_custom_resource : task_req.custom_resources) {
auto it = resources->custom_resources.find(task_req_custom_resource.id);
auto it = resources->custom_resources.find(task_req_custom_resource.first);
if (it != resources->custom_resources.end()) {
it->second.available =
std::max(FixedPoint(0), it->second.available - task_req_custom_resource.demand);
std::max(FixedPoint(0), it->second.available - task_req_custom_resource.second);
}
}
return true;
@ -748,8 +746,8 @@ bool ClusterResourceScheduler::AllocateTaskResourceInstances(
task_allocation->predefined_resources.resize(PredefinedResources_MAX);
for (size_t i = 0; i < PredefinedResources_MAX; i++) {
if (task_req.predefined_resources[i].demand > 0) {
if (!AllocateResourceInstances(task_req.predefined_resources[i].demand,
if (task_req.predefined_resources[i] > 0) {
if (!AllocateResourceInstances(task_req.predefined_resources[i],
local_resources_.predefined_resources[i].available,
&task_allocation->predefined_resources[i])) {
// Allocation failed. Restore node's local resources by freeing the resources
@ -761,11 +759,11 @@ bool ClusterResourceScheduler::AllocateTaskResourceInstances(
}
for (const auto &task_req_custom_resource : task_req.custom_resources) {
auto it = local_resources_.custom_resources.find(task_req_custom_resource.id);
auto it = local_resources_.custom_resources.find(task_req_custom_resource.first);
if (it != local_resources_.custom_resources.end()) {
if (task_req_custom_resource.demand > 0) {
if (task_req_custom_resource.second > 0) {
std::vector<FixedPoint> allocation;
bool success = AllocateResourceInstances(task_req_custom_resource.demand,
bool success = AllocateResourceInstances(task_req_custom_resource.second,
it->second.available, &allocation);
// Even if allocation failed we need to remember partial allocations to correctly
// free resources.

View file

@ -53,23 +53,17 @@ vector<FixedPoint> EmptyFixedPointVector;
void initTaskRequest(TaskRequest &tr, vector<FixedPoint> &pred_demands,
vector<int64_t> &cust_ids, vector<FixedPoint> &cust_demands) {
tr.predefined_resources.resize(PredefinedResources_MAX + pred_demands.size());
for (size_t i = 0; i < pred_demands.size(); i++) {
ResourceRequest rq;
rq.demand = pred_demands[i];
tr.predefined_resources.push_back(rq);
tr.predefined_resources[i] = pred_demands[i];
}
for (size_t i = pred_demands.size(); i < PredefinedResources_MAX; i++) {
ResourceRequest rq;
rq.demand = 0;
tr.predefined_resources.push_back(rq);
tr.predefined_resources.push_back(0);
}
for (size_t i = 0; i < cust_ids.size(); i++) {
ResourceRequestWithId rq;
rq.id = cust_ids[i];
rq.demand = cust_demands[i];
tr.custom_resources.push_back(rq);
tr.custom_resources[cust_ids[i]] = cust_demands[i];
}
};
@ -354,18 +348,17 @@ TEST_F(ClusterResourceSchedulerTest, SchedulingUpdateAvailableResourcesTest) {
ASSERT_TRUE(resource_scheduler.GetNodeResources(node_id, &nr2));
for (size_t i = 0; i < PRED_CUSTOM_LEN; i++) {
auto t =
nr1.predefined_resources[i].available - task_req.predefined_resources[i].demand;
auto t = nr1.predefined_resources[i].available - task_req.predefined_resources[i];
if (t < 0) t = 0;
ASSERT_EQ(nr2.predefined_resources[i].available, t);
}
for (size_t i = 0; i < PRED_CUSTOM_LEN; i++) {
auto it1 = nr1.custom_resources.find(task_req.custom_resources[i].id);
for (size_t i = 1; i <= PRED_CUSTOM_LEN; i++) {
auto it1 = nr1.custom_resources.find(i);
if (it1 != nr1.custom_resources.end()) {
auto it2 = nr2.custom_resources.find(task_req.custom_resources[i].id);
auto it2 = nr2.custom_resources.find(i);
if (it2 != nr2.custom_resources.end()) {
auto t = it1->second.available - task_req.custom_resources[i].demand;
auto t = it1->second.available - task_req.custom_resources[i];
if (t < 0) t = 0;
ASSERT_EQ(it2->second.available, t);
}