New scheduler int capacities (#8192)

* working version

* working version

* done

* done

* done

* addressing most of Philipp comments

* addressing most of Philipp comments
This commit is contained in:
Ion 2020-05-03 18:47:30 -07:00 committed by GitHub
parent 2c71559b3a
commit e24276e3c1
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 341 additions and 227 deletions

View file

@ -14,7 +14,7 @@
#include "cluster_resource_scheduler.h"
std::string VectorToString(const std::vector<double> &vector) {
std::string VectorToString(const std::vector<FixedPoint> &vector) {
std::stringstream buffer;
buffer << "[";
@ -39,6 +39,26 @@ std::string UnorderedMapToString(const std::unordered_map<std::string, double> &
return buffer.str();
}
/// Convert a vector of doubles to a vector of resource units.
std::vector<FixedPoint> VectorDoubleToVectorFixedPoint(
const std::vector<double> &vector) {
std::vector<FixedPoint> vector_fp(vector.size());
for (size_t i = 0; i < vector.size(); i++) {
vector_fp[i] = vector[i];
}
return vector_fp;
}
/// Convert a vector of resource units to a vector of doubles.
std::vector<double> VectorFixedPointToVectorDouble(
const std::vector<FixedPoint> &vector_fp) {
std::vector<double> vector(vector_fp.size());
for (size_t i = 0; i < vector_fp.size(); i++) {
vector[i] = FixedPoint(vector_fp[i]).Double();
}
return vector;
}
/// Convert a map of resources to a TaskRequest data structure.
TaskRequest ResourceMapToTaskRequest(
StringIdMap &string_to_int_map,
@ -121,12 +141,12 @@ NodeResources ResourceMapToNodeResources(
for (auto const &resource : resource_map_total) {
ResourceCapacity resource_capacity;
resource_capacity.total = (int64_t)resource.second;
resource_capacity.total = resource.second;
auto it = resource_map_available.find(resource.first);
if (it == resource_map_available.end()) {
resource_capacity.available = 0;
} else {
resource_capacity.available = (int64_t)it->second;
resource_capacity.available = it->second;
}
if (resource.first == ray::kCPU_ResourceLabel) {
node_resources.predefined_resources[CPU] = resource_capacity;
@ -145,6 +165,30 @@ NodeResources ResourceMapToNodeResources(
return node_resources;
}
/// Relational operators for FixedPoint. Each one compares the underlying
/// integer representation (i_) of its two operands.

/// \return true if ru1 is strictly smaller than ru2.
bool operator<(FixedPoint const &ru1, FixedPoint const &ru2) {
  return ru1.i_ < ru2.i_;
}

/// \return true if ru1 is strictly greater than ru2.
bool operator>(FixedPoint const &ru1, FixedPoint const &ru2) {
  return ru2.i_ < ru1.i_;
}

/// \return true if ru1 is smaller than or equal to ru2.
bool operator<=(FixedPoint const &ru1, FixedPoint const &ru2) {
  return !(ru2.i_ < ru1.i_);
}

/// \return true if ru1 is greater than or equal to ru2.
bool operator>=(FixedPoint const &ru1, FixedPoint const &ru2) {
  return !(ru1.i_ < ru2.i_);
}

/// \return true if both operands hold the same integer representation.
bool operator==(FixedPoint const &ru1, FixedPoint const &ru2) {
  return ru1.i_ == ru2.i_;
}

/// \return true if the operands hold different integer representations.
bool operator!=(FixedPoint const &ru1, FixedPoint const &ru2) {
  return !(ru1.i_ == ru2.i_);
}
/// Stream a FixedPoint.
///
/// Note that this writes the internal integer representation (i_), not the
/// value returned by Double().
///
/// \param out: The stream to write to.
/// \param ru: The value to write.
/// \return The stream passed in, to allow chaining.
std::ostream &operator<<(std::ostream &out, const FixedPoint &ru) {
  return out << ru.i_;
}
bool NodeResources::operator==(const NodeResources &other) {
for (size_t i = 0; i < PredefinedResources_MAX; i++) {
if (this->predefined_resources[i].total != other.predefined_resources[i].total) {
@ -318,7 +362,7 @@ std::string TaskResourceInstances::DebugString() const {
return buffer.str();
}
bool EqualVectors(const std::vector<double> &v1, const std::vector<double> &v2) {
bool EqualVectors(const std::vector<FixedPoint> &v1, const std::vector<FixedPoint> &v2) {
return (v1.size() == v2.size() && std::equal(v1.begin(), v1.end(), v2.begin()));
}
@ -559,17 +603,19 @@ bool ClusterResourceScheduler::SubtractNodeAvailableResources(
return false;
}
FixedPoint zero(0.);
for (size_t i = 0; i < PredefinedResources_MAX; i++) {
resources.predefined_resources[i].available =
std::max(0., resources.predefined_resources[i].available -
task_req.predefined_resources[i].demand);
std::max(FixedPoint(0), resources.predefined_resources[i].available -
task_req.predefined_resources[i].demand);
}
for (const auto &task_req_custom_resource : task_req.custom_resources) {
auto it = resources.custom_resources.find(task_req_custom_resource.id);
if (it != resources.custom_resources.end()) {
it->second.available =
std::max(0., it->second.available - task_req_custom_resource.demand);
std::max(FixedPoint(0), it->second.available - task_req_custom_resource.demand);
}
}
return true;
@ -629,7 +675,7 @@ int64_t ClusterResourceScheduler::NumNodes() { return nodes_.size(); }
void ClusterResourceScheduler::UpdateResourceCapacity(const std::string &client_id_string,
const std::string &resource_name,
int64_t resource_total) {
double resource_total) {
int64_t client_id = string_to_int_map_.Get(client_id_string);
auto it = nodes_.find(client_id);
@ -652,8 +698,10 @@ void ClusterResourceScheduler::UpdateResourceCapacity(const std::string &client_
} else if (resource_name == ray::kMemory_ResourceLabel) {
idx = (int)MEM;
};
FixedPoint resource_total_fp(resource_total);
if (idx != -1) {
int64_t diff_capacity = resource_total - it->second.predefined_resources[idx].total;
auto diff_capacity = resource_total_fp - it->second.predefined_resources[idx].total;
it->second.predefined_resources[idx].total += diff_capacity;
it->second.predefined_resources[idx].available += diff_capacity;
if (it->second.predefined_resources[idx].available < 0) {
@ -666,7 +714,7 @@ void ClusterResourceScheduler::UpdateResourceCapacity(const std::string &client_
int64_t resource_id = string_to_int_map_.Insert(resource_name);
auto itr = it->second.custom_resources.find(resource_id);
if (itr != it->second.custom_resources.end()) {
int64_t diff_capacity = resource_total - itr->second.total;
auto diff_capacity = resource_total_fp - itr->second.total;
itr->second.total += diff_capacity;
itr->second.available += diff_capacity;
if (itr->second.available < 0) {
@ -677,7 +725,7 @@ void ClusterResourceScheduler::UpdateResourceCapacity(const std::string &client_
}
} else {
ResourceCapacity resource_capacity;
resource_capacity.total = resource_capacity.available = resource_total;
resource_capacity.total = resource_capacity.available = resource_total_fp;
it->second.custom_resources.emplace(resource_id, resource_capacity);
}
}
@ -725,9 +773,9 @@ std::string ClusterResourceScheduler::DebugString(void) const {
}
void ClusterResourceScheduler::InitResourceInstances(
double total, bool unit_instances, ResourceInstanceCapacities *instance_list) {
FixedPoint total, bool unit_instances, ResourceInstanceCapacities *instance_list) {
if (unit_instances) {
size_t num_instances = static_cast<size_t>(total);
size_t num_instances = static_cast<size_t>(total.Double());
instance_list->total.resize(num_instances);
instance_list->available.resize(num_instances);
for (size_t i = 0; i < num_instances; i++) {
@ -766,13 +814,13 @@ void ClusterResourceScheduler::InitLocalResources(const NodeResources &node_reso
}
}
std::vector<double> ClusterResourceScheduler::AddAvailableResourceInstances(
std::vector<double> available, ResourceInstanceCapacities *resource_instances) {
std::vector<double> overflow(available.size(), 0.);
std::vector<FixedPoint> ClusterResourceScheduler::AddAvailableResourceInstances(
std::vector<FixedPoint> available, ResourceInstanceCapacities *resource_instances) {
std::vector<FixedPoint> overflow(available.size(), 0.);
for (size_t i = 0; i < available.size(); i++) {
resource_instances->available[i] = resource_instances->available[i] + available[i];
if (resource_instances->available[i] > resource_instances->total[i]) {
overflow[i] = resource_instances->available[i] - resource_instances->total[i];
overflow[i] = (resource_instances->available[i] - resource_instances->total[i]);
resource_instances->available[i] = resource_instances->total[i];
}
}
@ -780,11 +828,11 @@ std::vector<double> ClusterResourceScheduler::AddAvailableResourceInstances(
return overflow;
}
std::vector<double> ClusterResourceScheduler::SubtractAvailableResourceInstances(
std::vector<double> available, ResourceInstanceCapacities *resource_instances) {
std::vector<FixedPoint> ClusterResourceScheduler::SubtractAvailableResourceInstances(
std::vector<FixedPoint> available, ResourceInstanceCapacities *resource_instances) {
RAY_CHECK(available.size() == resource_instances->available.size());
std::vector<double> underflow(available.size(), 0.);
std::vector<FixedPoint> underflow(available.size(), 0.);
for (size_t i = 0; i < available.size(); i++) {
resource_instances->available[i] = resource_instances->available[i] - available[i];
if (resource_instances->available[i] < 0) {
@ -796,10 +844,10 @@ std::vector<double> ClusterResourceScheduler::SubtractAvailableResourceInstances
}
bool ClusterResourceScheduler::AllocateResourceInstances(
double demand, bool soft, std::vector<double> &available,
std::vector<double> *allocation) {
FixedPoint demand, bool soft, std::vector<FixedPoint> &available,
std::vector<FixedPoint> *allocation) {
allocation->resize(available.size());
double remaining_demand = demand;
FixedPoint remaining_demand = demand;
if (available.size() == 1) {
// This resource has just an instance.
@ -866,7 +914,7 @@ bool ClusterResourceScheduler::AllocateResourceInstances(
// Remaining demand is fractional. Find the best fit, if exists.
if (remaining_demand > 0.) {
int64_t idx_best_fit = -1;
double available_best_fit = 1.;
FixedPoint available_best_fit = 1.;
for (size_t i = 0; i < available.size(); i++) {
if (available[i] >= remaining_demand) {
if (idx_best_fit == -1 ||
@ -912,7 +960,7 @@ bool ClusterResourceScheduler::AllocateTaskResourceInstances(
auto it = local_resources_.custom_resources.find(task_req_custom_resource.id);
if (it != local_resources_.custom_resources.end()) {
if (task_req_custom_resource.demand > 0) {
std::vector<double> allocation;
std::vector<FixedPoint> allocation;
bool success = AllocateResourceInstances(task_req_custom_resource.demand,
task_req_custom_resource.soft,
it->second.available, &allocation);
@ -976,30 +1024,36 @@ void ClusterResourceScheduler::FreeTaskResourceInstances(
std::vector<double> ClusterResourceScheduler::AddCPUResourceInstances(
std::vector<double> &cpu_instances) {
std::vector<FixedPoint> cpu_instances_fp =
VectorDoubleToVectorFixedPoint(cpu_instances);
if (cpu_instances.size() == 0) {
return cpu_instances; // No oveerflow.
}
RAY_CHECK(nodes_.find(local_node_id_) != nodes_.end());
auto overflow = AddAvailableResourceInstances(
cpu_instances, &local_resources_.predefined_resources[CPU]);
cpu_instances_fp, &local_resources_.predefined_resources[CPU]);
UpdateLocalAvailableResourcesFromResourceInstances();
return overflow;
return VectorFixedPointToVectorDouble(overflow);
}
std::vector<double> ClusterResourceScheduler::SubtractCPUResourceInstances(
std::vector<double> &cpu_instances) {
std::vector<FixedPoint> cpu_instances_fp =
VectorDoubleToVectorFixedPoint(cpu_instances);
if (cpu_instances.size() == 0) {
return cpu_instances; // No underflow.
}
RAY_CHECK(nodes_.find(local_node_id_) != nodes_.end());
auto underflow = SubtractAvailableResourceInstances(
cpu_instances, &local_resources_.predefined_resources[CPU]);
cpu_instances_fp, &local_resources_.predefined_resources[CPU]);
UpdateLocalAvailableResourcesFromResourceInstances();
return underflow;
return VectorFixedPointToVectorDouble(underflow);
}
bool ClusterResourceScheduler::AllocateTaskResources(

View file

@ -30,23 +30,97 @@ enum PredefinedResources { CPU, MEM, GPU, TPU, PredefinedResources_MAX };
// Specify resources that consists of unit-size instances.
static std::unordered_set<int64_t> UnitInstanceResources{CPU, GPU, TPU};
// Helper function to compare two vectors with double values.
bool EqualVectors(const std::vector<double> &v1, const std::vector<double> &v2);
/// Fixed point data type.
///
/// Stores a real value as a 64-bit integer count of 1/RESOURCE_UNIT_SCALING
/// units, so additions, subtractions and comparisons are plain integer
/// operations on that count.
class FixedPoint {
#define RESOURCE_UNIT_SCALING 1000

 private:
  /// The represented value multiplied by RESOURCE_UNIT_SCALING.
  int64_t i_;

  /// Convert a double to the nearest scaled integer (ties away from zero).
  ///
  /// Rounding instead of truncating matters because the floating-point product
  /// can land just below the intended integer; e.g. 1.005 * 1000 evaluates to
  /// 1004.9999999999999, which truncation would turn into 1004.
  static int64_t Scale(double d) {
    const double scaled = d * RESOURCE_UNIT_SCALING;
    return static_cast<int64_t>(scaled < 0 ? scaled - 0.5 : scaled + 0.5);
  }

 public:
  /// Construct from a double. Intentionally implicit so that doubles and
  /// integer literals can be used wherever a FixedPoint is expected.
  FixedPoint(double d = 0) : i_(Scale(d)) {}

  /// \return The sum of this value and ru.
  FixedPoint operator+(FixedPoint const &ru) const {
    FixedPoint res;
    res.i_ = i_ + ru.i_;
    return res;
  }

  /// Add ru to this value in place.
  /// \return A reference to this object, so that operations can be chained.
  FixedPoint &operator+=(FixedPoint const &ru) {
    i_ += ru.i_;
    return *this;
  }

  /// \return The difference between this value and ru.
  FixedPoint operator-(FixedPoint const &ru) const {
    FixedPoint res;
    res.i_ = i_ - ru.i_;
    return res;
  }

  /// Subtract ru from this value in place.
  /// \return A reference to this object, so that operations can be chained.
  FixedPoint &operator-=(FixedPoint const &ru) {
    i_ -= ru.i_;
    return *this;
  }

  /// \return The negation of this value.
  FixedPoint operator-() const {
    FixedPoint res;
    res.i_ = -i_;
    return res;
  }

  /// \return The sum of this value and d, with d converted to fixed point.
  FixedPoint operator+(double const d) const {
    FixedPoint res;
    res.i_ = i_ + Scale(d);
    return res;
  }

  /// \return This value minus d, with d converted to fixed point.
  FixedPoint operator-(double const d) const {
    FixedPoint res;
    res.i_ = i_ - Scale(d);
    return res;
  }

  /// Assign the fixed point conversion of d to this object.
  /// \return A reference to this object.
  FixedPoint &operator=(double const d) {
    i_ = Scale(d);
    return *this;
  }

  friend bool operator<(FixedPoint const &ru1, FixedPoint const &ru2);
  friend bool operator>(FixedPoint const &ru1, FixedPoint const &ru2);
  friend bool operator<=(FixedPoint const &ru1, FixedPoint const &ru2);
  friend bool operator>=(FixedPoint const &ru1, FixedPoint const &ru2);
  friend bool operator==(FixedPoint const &ru1, FixedPoint const &ru2);
  friend bool operator!=(FixedPoint const &ru1, FixedPoint const &ru2);

  /// \return The represented value as a double.
  double Double() const { return static_cast<double>(i_) / RESOURCE_UNIT_SCALING; }

  friend std::ostream &operator<<(std::ostream &out, const FixedPoint &ru);
};
/// Helper function to compare two vectors with FixedPoint values.
bool EqualVectors(const std::vector<FixedPoint> &v1, const std::vector<FixedPoint> &v2);
/// Convert a vector of doubles to a vector of resource units.
std::vector<FixedPoint> VectorDoubleToVectorFixedPoint(const std::vector<double> &vector);
/// Convert a vector of resource units to a vector of doubles.
std::vector<double> VectorFixedPointToVectorDouble(
const std::vector<FixedPoint> &vector_fp);
struct ResourceCapacity {
double total;
double available;
FixedPoint total;
FixedPoint available;
};
/// Capacities of each instance of a resource.
struct ResourceInstanceCapacities {
std::vector<double> total;
std::vector<double> available;
std::vector<FixedPoint> total;
std::vector<FixedPoint> available;
};
struct ResourceRequest {
/// Amount of resource being requested.
double demand;
FixedPoint demand;
/// Specify whether the request is soft or hard.
/// If hard, the entire request is denied if the demand exceeds the resource
/// availability. Otherwise, the request can be still be granted.
@ -81,20 +155,27 @@ class TaskRequest {
class TaskResourceInstances {
public:
/// The list of instances of each predefined resource allocated to a task.
std::vector<std::vector<double>> predefined_resources;
std::vector<std::vector<FixedPoint>> predefined_resources;
/// The list of instances of each custom resource allocated to a task.
absl::flat_hash_map<int64_t, std::vector<double>> custom_resources;
absl::flat_hash_map<int64_t, std::vector<FixedPoint>> custom_resources;
bool operator==(const TaskResourceInstances &other);
/// For each resource of this request aggregate its instances.
TaskRequest ToTaskRequest() const;
/// Get CPU instances only.
std::vector<double> GetCPUInstances() const {
std::vector<FixedPoint> GetCPUInstances() const {
if (!this->predefined_resources.empty()) {
return this->predefined_resources[CPU];
} else {
return {};
}
};
/// Get CPU instances only, converted to doubles.
///
/// \return The CPU entry of predefined_resources converted with
/// VectorFixedPointToVectorDouble, or an empty vector if no predefined
/// resources are recorded.
std::vector<double> GetCPUInstancesDouble() const {
  if (this->predefined_resources.empty()) {
    return {};
  }
  return VectorFixedPointToVectorDouble(this->predefined_resources[CPU]);
}
/// Check whether there are no resource instances.
bool IsEmpty() const;
/// Returns human-readable string for these resources.
@ -298,7 +379,7 @@ class ClusterResourceScheduler {
/// \param resource_name: Resource which we want to update.
/// \param resource_total: New capacity of the resource.
void UpdateResourceCapacity(const std::string &node_name,
const std::string &resource_name, int64_t resource_total);
const std::string &resource_name, double resource_total);
/// Delete a given resource from a given node.
///
@ -324,7 +405,7 @@ class ClusterResourceScheduler {
/// \param unit_instances: If true, we split the resource in unit-size instances.
/// If false, we create a single instance of capacity "total".
/// \param instance_list: The list of capacities this resource instances.
void InitResourceInstances(double total, bool unit_instances,
void InitResourceInstances(FixedPoint total, bool unit_instances,
ResourceInstanceCapacities *instance_list);
/// Allocate enough capacity across the instances of a resource to satisfy "demand".
@ -358,8 +439,9 @@ class ClusterResourceScheduler {
///
/// \return true, if allocation successful. In this case, the sum of the elements in
/// "allocation" is equal to "demand".
bool AllocateResourceInstances(double demand, bool soft, std::vector<double> &available,
std::vector<double> *allocation);
bool AllocateResourceInstances(FixedPoint demand, bool soft,
std::vector<FixedPoint> &available,
std::vector<FixedPoint> *allocation);
/// Allocate local resources to satisfy a given request (task_req).
///
@ -386,8 +468,8 @@ class ClusterResourceScheduler {
/// \return Overflow capacities of "resource_instances" after adding instance
/// capacities in "available", i.e.,
/// min(available + resource_instances.available, resource_instances.total)
std::vector<double> AddAvailableResourceInstances(
std::vector<double> available, ResourceInstanceCapacities *resource_instances);
std::vector<FixedPoint> AddAvailableResourceInstances(
std::vector<FixedPoint> available, ResourceInstanceCapacities *resource_instances);
/// Decrease the available capacities of the instances of a given resource.
///
@ -396,8 +478,8 @@ class ClusterResourceScheduler {
/// \return Underflow of "resource_instances" after subtracting instance
/// capacities in "available", i.e.,
/// max(available - resource_instances.available, 0)
std::vector<double> SubtractAvailableResourceInstances(
std::vector<double> available, ResourceInstanceCapacities *resource_instances);
std::vector<FixedPoint> SubtractAvailableResourceInstances(
std::vector<FixedPoint> available, ResourceInstanceCapacities *resource_instances);
/// Increase the available CPU instances of this node.
///

View file

@ -30,11 +30,11 @@ using namespace std;
// Used to pass empty vector arguments.
vector<int64_t> EmptyIntVector;
vector<bool> EmptyBoolVector;
vector<double> EmptyDoubleVector;
vector<FixedPoint> EmptyFixedPointVector;
void initTaskRequest(TaskRequest &tr, vector<double> &pred_demands,
void initTaskRequest(TaskRequest &tr, vector<FixedPoint> &pred_demands,
vector<bool> &pred_soft, vector<int64_t> &cust_ids,
vector<double> &cust_demands, vector<bool> &cust_soft,
vector<FixedPoint> &cust_demands, vector<bool> &cust_soft,
vector<int64_t> &placement_hints) {
for (size_t i = 0; i < pred_demands.size(); i++) {
ResourceRequest rq;
@ -65,19 +65,21 @@ void initTaskRequest(TaskRequest &tr, vector<double> &pred_demands,
void addTaskResourceInstances(bool predefined, vector<double> allocation, uint64_t idx,
TaskResourceInstances *task_allocation) {
std::vector<FixedPoint> allocation_fp = VectorDoubleToVectorFixedPoint(allocation);
if (task_allocation->predefined_resources.size() < PredefinedResources_MAX) {
task_allocation->predefined_resources.resize(PredefinedResources_MAX);
}
if (predefined) {
task_allocation->predefined_resources[idx] = allocation;
task_allocation->predefined_resources[idx] = allocation_fp;
} else {
task_allocation->custom_resources.insert(
std::pair<int64_t, vector<double>>(idx, allocation));
std::pair<int64_t, vector<FixedPoint>>(idx, allocation_fp));
}
};
void initNodeResources(NodeResources &node, vector<int64_t> &pred_capacities,
vector<int64_t> &cust_ids, vector<int64_t> &cust_capacities) {
void initNodeResources(NodeResources &node, vector<FixedPoint> &pred_capacities,
vector<int64_t> &cust_ids, vector<FixedPoint> &cust_capacities) {
for (size_t i = 0; i < pred_capacities.size(); i++) {
ResourceCapacity rc;
rc.total = rc.available = pred_capacities[i];
@ -100,9 +102,9 @@ void initNodeResources(NodeResources &node, vector<int64_t> &pred_capacities,
}
void initCluster(ClusterResourceScheduler &cluster_resources, int n) {
vector<int64_t> pred_capacities;
vector<FixedPoint> pred_capacities;
vector<int64_t> cust_ids;
vector<int64_t> cust_capacities;
vector<FixedPoint> cust_capacities;
int i, k;
for (i = 0; i < n; i++) {
@ -178,6 +180,51 @@ class SchedulingTest : public ::testing::Test {
void Shutdown() {}
};
// Exercises the FixedPoint type: relational operators, mixed comparisons
// against numeric literals, arithmetic, and conversion back to double.
TEST_F(SchedulingTest, SchedulingFixedPointTest) {
// Comparisons between two FixedPoint values.
{
FixedPoint fp(1.);
FixedPoint fp1(1.);
FixedPoint fp2(2.);
ASSERT_TRUE(fp1 < fp2);
ASSERT_TRUE(fp2 > fp1);
ASSERT_TRUE(fp1 <= fp2);
ASSERT_TRUE(fp2 >= fp1);
ASSERT_TRUE(fp1 != fp2);
ASSERT_TRUE(fp1 == fp);
ASSERT_TRUE(fp1 >= fp);
ASSERT_TRUE(fp1 <= fp);
}
// Comparisons against numeric literals (implicitly converted to FixedPoint)
// and basic arithmetic.
{
FixedPoint fp1(1.);
FixedPoint fp2(2.);
ASSERT_TRUE(fp1 < 2);
ASSERT_TRUE(fp2 > 1.);
ASSERT_TRUE(fp1 <= 2.);
ASSERT_TRUE(fp2 >= 1.);
ASSERT_TRUE(fp1 != 2.);
ASSERT_TRUE(fp1 == 1.);
ASSERT_TRUE(fp1 >= 1.);
ASSERT_TRUE(fp1 <= 1.);
ASSERT_TRUE(fp1 + fp2 == 3.);
ASSERT_TRUE(fp2 - fp1 == 1.);
// The two assertions below mutate fp1 and fp2 in place, so they must stay
// after the checks above that rely on the initial values.
ASSERT_TRUE((fp1 += 1.) == 2.);
ASSERT_TRUE((fp2 -= 1.) == 1.);
}
// Round trip from double to FixedPoint and back.
{
FixedPoint fp1(1.);
ASSERT_TRUE(fp1.Double() == 1.);
}
}
TEST_F(SchedulingTest, SchedulingIdTest) {
StringIdMap ids;
hash<string> hasher;
@ -237,9 +284,9 @@ TEST_F(SchedulingTest, SchedulingModifyClusterNodeTest) {
initCluster(cluster_resources, num_nodes);
NodeResources node_resources;
vector<int64_t> pred_capacities;
vector<FixedPoint> pred_capacities;
vector<int64_t> cust_ids;
vector<int64_t> cust_capacities;
vector<FixedPoint> cust_capacities;
int k;
for (k = 0; k < PredefinedResources_MAX; k++) {
@ -266,19 +313,19 @@ TEST_F(SchedulingTest, SchedulingModifyClusterNodeTest) {
TEST_F(SchedulingTest, SchedulingUpdateAvailableResourcesTest) {
// Create cluster resources.
NodeResources node_resources;
vector<int64_t> pred_capacities{10, 5, 3};
vector<FixedPoint> pred_capacities{10, 5, 3};
vector<int64_t> cust_ids{1, 2};
vector<int64_t> cust_capacities{5, 5};
vector<FixedPoint> cust_capacities{5, 5};
initNodeResources(node_resources, pred_capacities, cust_ids, cust_capacities);
ClusterResourceScheduler cluster_resources(1, node_resources);
{
TaskRequest task_req;
#define PRED_CUSTOM_LEN 2
vector<double> pred_demands{7, 7};
vector<FixedPoint> pred_demands{7, 7};
vector<bool> pred_soft{false, true};
vector<int64_t> cust_ids{1, 2};
vector<double> cust_demands{3, 10};
vector<FixedPoint> cust_demands{3, 10};
vector<bool> cust_soft{false, true};
initTaskRequest(task_req, pred_demands, pred_soft, cust_ids, cust_demands, cust_soft,
EmptyIntVector);
@ -293,7 +340,7 @@ TEST_F(SchedulingTest, SchedulingUpdateAvailableResourcesTest) {
ASSERT_TRUE(cluster_resources.GetNodeResources(node_id, &nr2));
for (size_t i = 0; i < PRED_CUSTOM_LEN; i++) {
int64_t t =
auto t =
nr1.predefined_resources[i].available - task_req.predefined_resources[i].demand;
if (t < 0) t = 0;
ASSERT_EQ(nr2.predefined_resources[i].available, t);
@ -304,7 +351,7 @@ TEST_F(SchedulingTest, SchedulingUpdateAvailableResourcesTest) {
if (it1 != nr1.custom_resources.end()) {
auto it2 = nr2.custom_resources.find(task_req.custom_resources[i].id);
if (it2 != nr2.custom_resources.end()) {
int64_t t = it1->second.available - task_req.custom_resources[i].demand;
auto t = it1->second.available - task_req.custom_resources[i].demand;
if (t < 0) t = 0;
ASSERT_EQ(it2->second.available, t);
}
@ -321,9 +368,9 @@ TEST_F(SchedulingTest, SchedulingAddOrUpdateNodeTest) {
// Add node.
{
NodeResources node_resources;
vector<int64_t> pred_capacities{10, 5, 3};
vector<FixedPoint> pred_capacities{10, 5, 3};
vector<int64_t> cust_ids{1, 2};
vector<int64_t> cust_capacities{5, 5};
vector<FixedPoint> cust_capacities{5, 5};
initNodeResources(node_resources, pred_capacities, cust_ids, cust_capacities);
cluster_resources.AddOrUpdateNode(node_id, node_resources);
nr = node_resources;
@ -339,9 +386,9 @@ TEST_F(SchedulingTest, SchedulingAddOrUpdateNodeTest) {
// Update node.
{
NodeResources node_resources;
vector<int64_t> pred_capacities{10, 10};
vector<FixedPoint> pred_capacities{10, 10};
vector<int64_t> cust_ids{2, 3};
vector<int64_t> cust_capacities{6, 6};
vector<FixedPoint> cust_capacities{6, 6};
initNodeResources(node_resources, pred_capacities, cust_ids, cust_capacities);
cluster_resources.AddOrUpdateNode(node_id, node_resources);
nr = node_resources;
@ -356,9 +403,9 @@ TEST_F(SchedulingTest, SchedulingAddOrUpdateNodeTest) {
TEST_F(SchedulingTest, SchedulingTaskRequestTest) {
// Create cluster resources containing local node.
NodeResources node_resources;
vector<int64_t> pred_capacities{5, 5};
vector<FixedPoint> pred_capacities{5, 5};
vector<int64_t> cust_ids{1};
vector<int64_t> cust_capacities{10};
vector<FixedPoint> cust_capacities{10};
initNodeResources(node_resources, pred_capacities, cust_ids, cust_capacities);
ClusterResourceScheduler cluster_resources(0, node_resources);
@ -366,19 +413,19 @@ TEST_F(SchedulingTest, SchedulingTaskRequestTest) {
{
NodeResources node_resources;
vector<int64_t> pred_capacities{10, 2, 3};
vector<FixedPoint> pred_capacities{10, 2, 3};
vector<int64_t> cust_ids{1, 2};
vector<int64_t> cust_capacities{5, 5};
vector<FixedPoint> cust_capacities{5, 5};
initNodeResources(node_resources, pred_capacities, cust_ids, cust_capacities);
cluster_resources.AddOrUpdateNode(1, node_resources);
}
// Predefined resources, hard constraint violation
{
TaskRequest task_req;
vector<double> pred_demands = {11};
vector<FixedPoint> pred_demands = {11};
vector<bool> pred_soft = {false};
initTaskRequest(task_req, pred_demands, pred_soft, EmptyIntVector, EmptyDoubleVector,
EmptyBoolVector, EmptyIntVector);
initTaskRequest(task_req, pred_demands, pred_soft, EmptyIntVector,
EmptyFixedPointVector, EmptyBoolVector, EmptyIntVector);
int64_t violations;
int64_t node_id = cluster_resources.GetBestSchedulableNode(task_req, &violations);
ASSERT_EQ(node_id, -1);
@ -386,10 +433,10 @@ TEST_F(SchedulingTest, SchedulingTaskRequestTest) {
// Predefined resources, soft constraint violation
{
TaskRequest task_req;
vector<double> pred_demands = {11};
vector<FixedPoint> pred_demands = {11};
vector<bool> pred_soft = {true};
initTaskRequest(task_req, pred_demands, pred_soft, EmptyIntVector, EmptyDoubleVector,
EmptyBoolVector, EmptyIntVector);
initTaskRequest(task_req, pred_demands, pred_soft, EmptyIntVector,
EmptyFixedPointVector, EmptyBoolVector, EmptyIntVector);
int64_t violations;
int64_t node_id = cluster_resources.GetBestSchedulableNode(task_req, &violations);
ASSERT_TRUE(node_id != -1);
@ -399,10 +446,10 @@ TEST_F(SchedulingTest, SchedulingTaskRequestTest) {
// Predefined resources, no constraint violation.
{
TaskRequest task_req;
vector<double> pred_demands = {5};
vector<FixedPoint> pred_demands = {5};
vector<bool> pred_soft = {false};
initTaskRequest(task_req, pred_demands, pred_soft, EmptyIntVector, EmptyDoubleVector,
EmptyBoolVector, EmptyIntVector);
initTaskRequest(task_req, pred_demands, pred_soft, EmptyIntVector,
EmptyFixedPointVector, EmptyBoolVector, EmptyIntVector);
int64_t violations;
int64_t node_id = cluster_resources.GetBestSchedulableNode(task_req, &violations);
ASSERT_TRUE(node_id != -1);
@ -411,10 +458,10 @@ TEST_F(SchedulingTest, SchedulingTaskRequestTest) {
// Custom resources, hard constraint violation.
{
TaskRequest task_req;
vector<double> pred_demands{5, 2};
vector<FixedPoint> pred_demands{5, 2};
vector<bool> pred_soft{false, true};
vector<int64_t> cust_ids{1};
vector<double> cust_demands{11};
vector<FixedPoint> cust_demands{11};
vector<bool> cust_soft{false};
initTaskRequest(task_req, pred_demands, pred_soft, cust_ids, cust_demands, cust_soft,
EmptyIntVector);
@ -425,10 +472,10 @@ TEST_F(SchedulingTest, SchedulingTaskRequestTest) {
// Custom resources, soft constraint violation.
{
TaskRequest task_req;
vector<double> pred_demands{5, 2};
vector<FixedPoint> pred_demands{5, 2};
vector<bool> pred_soft{false, true};
vector<int64_t> cust_ids{1};
vector<double> cust_demands{11};
vector<FixedPoint> cust_demands{11};
vector<bool> cust_soft{true};
initTaskRequest(task_req, pred_demands, pred_soft, cust_ids, cust_demands, cust_soft,
EmptyIntVector);
@ -440,10 +487,10 @@ TEST_F(SchedulingTest, SchedulingTaskRequestTest) {
// Custom resources, no constraint violation.
{
TaskRequest task_req;
vector<double> pred_demands{5, 2};
vector<FixedPoint> pred_demands{5, 2};
vector<bool> pred_soft{false, true};
vector<int64_t> cust_ids{1};
vector<double> cust_demands{5};
vector<FixedPoint> cust_demands{5};
vector<bool> cust_soft{false};
initTaskRequest(task_req, pred_demands, pred_soft, cust_ids, cust_demands, cust_soft,
EmptyIntVector);
@ -455,10 +502,10 @@ TEST_F(SchedulingTest, SchedulingTaskRequestTest) {
// Custom resource missing, hard constraint violation.
{
TaskRequest task_req;
vector<double> pred_demands{5, 2};
vector<FixedPoint> pred_demands{5, 2};
vector<bool> pred_soft{false, true};
vector<int64_t> cust_ids{100};
vector<double> cust_demands{5};
vector<FixedPoint> cust_demands{5};
vector<bool> cust_soft{false};
initTaskRequest(task_req, pred_demands, pred_soft, cust_ids, cust_demands, cust_soft,
EmptyIntVector);
@ -469,10 +516,10 @@ TEST_F(SchedulingTest, SchedulingTaskRequestTest) {
// Custom resource missing, soft constraint violation.
{
TaskRequest task_req;
vector<double> pred_demands{5, 2};
vector<FixedPoint> pred_demands{5, 2};
vector<bool> pred_soft{false, true};
vector<int64_t> cust_ids{100};
vector<double> cust_demands{5};
vector<FixedPoint> cust_demands{5};
vector<bool> cust_soft{true};
initTaskRequest(task_req, pred_demands, pred_soft, cust_ids, cust_demands, cust_soft,
EmptyIntVector);
@ -484,10 +531,10 @@ TEST_F(SchedulingTest, SchedulingTaskRequestTest) {
// Placement_hints, soft constraint violation.
{
TaskRequest task_req;
vector<double> pred_demands{5, 2};
vector<FixedPoint> pred_demands{5, 2};
vector<bool> pred_soft{false, true};
vector<int64_t> cust_ids{1};
vector<double> cust_demands{5};
vector<FixedPoint> cust_demands{5};
vector<bool> cust_soft{true};
vector<int64_t> placement_hints{2, 3};
initTaskRequest(task_req, pred_demands, pred_soft, cust_ids, cust_demands, cust_soft,
@ -500,10 +547,10 @@ TEST_F(SchedulingTest, SchedulingTaskRequestTest) {
// Placement hints, no constraint violation.
{
TaskRequest task_req;
vector<double> pred_demands{5, 2};
vector<FixedPoint> pred_demands{5, 2};
vector<bool> pred_soft{false, true};
vector<int64_t> cust_ids{1};
vector<double> cust_demands{5};
vector<FixedPoint> cust_demands{5};
vector<bool> cust_soft{true};
vector<int64_t> placement_hints{1, 2, 3};
initTaskRequest(task_req, pred_demands, pred_soft, cust_ids, cust_demands, cust_soft,
@ -518,9 +565,9 @@ TEST_F(SchedulingTest, SchedulingTaskRequestTest) {
TEST_F(SchedulingTest, GetLocalAvailableResourcesTest) {
// Create cluster resources containing local node.
NodeResources node_resources;
vector<int64_t> pred_capacities{3 /* CPU */, 4 /* MEM */, 5 /* GPU */};
vector<FixedPoint> pred_capacities{3 /* CPU */, 4 /* MEM */, 5 /* GPU */};
vector<int64_t> cust_ids{1};
vector<int64_t> cust_capacities{8};
vector<FixedPoint> cust_capacities{8};
initNodeResources(node_resources, pred_capacities, cust_ids, cust_capacities);
ClusterResourceScheduler cluster_resources(0, node_resources);
@ -539,22 +586,23 @@ TEST_F(SchedulingTest, GetLocalAvailableResourcesTest) {
ASSERT_EQ(expected_cluster_resources == available_cluster_resources, true);
}
TEST_F(SchedulingTest, GetCPUInstancesTest) {
TEST_F(SchedulingTest, GetCPUInstancesDoubleTest) {
TaskResourceInstances task_resources;
addTaskResourceInstances(true, {1., 1., 1.}, CPU, &task_resources);
addTaskResourceInstances(true, {4.}, MEM, &task_resources);
addTaskResourceInstances(true, {1., 1., 1., 1., 1.}, GPU, &task_resources);
std::vector<double> cpu_instances = task_resources.GetCPUInstances();
std::vector<double> expected_cpu_instances{1., 1., 1.};
std::vector<FixedPoint> cpu_instances = task_resources.GetCPUInstances();
std::vector<FixedPoint> expected_cpu_instances{1., 1., 1.};
ASSERT_EQ(EqualVectors(cpu_instances, expected_cpu_instances), true);
}
TEST_F(SchedulingTest, AvailableResourceInstancesOpsTest) {
NodeResources node_resources;
vector<int64_t> pred_capacities{3 /* CPU */};
initNodeResources(node_resources, pred_capacities, EmptyIntVector, EmptyIntVector);
vector<FixedPoint> pred_capacities{3 /* CPU */};
initNodeResources(node_resources, pred_capacities, EmptyIntVector,
EmptyFixedPointVector);
ClusterResourceScheduler cluster(0, node_resources);
ResourceInstanceCapacities instances;
@ -563,7 +611,7 @@ TEST_F(SchedulingTest, AvailableResourceInstancesOpsTest) {
instances.available = {3., 2., 5.};
ResourceInstanceCapacities old_instances = instances;
std::vector<double> a{1., 1., 1.};
std::vector<FixedPoint> a{1., 1., 1.};
cluster.AddAvailableResourceInstances(a, &instances);
cluster.SubtractAvailableResourceInstances(a, &instances);
@ -571,7 +619,7 @@ TEST_F(SchedulingTest, AvailableResourceInstancesOpsTest) {
a = {10., 1., 1.};
cluster.AddAvailableResourceInstances(a, &instances);
std::vector<double> expected_available{6., 3., 6.};
std::vector<FixedPoint> expected_available{6., 3., 6.};
ASSERT_EQ(EqualVectors(instances.available, expected_available), true);
@ -585,17 +633,19 @@ TEST_F(SchedulingTest, TaskResourceInstancesTest) {
// Allocate resources for a task request specifying only predefined resources.
{
NodeResources node_resources;
vector<int64_t> pred_capacities{3 /* CPU */, 4 /* MEM */, 5 /* GPU */};
initNodeResources(node_resources, pred_capacities, EmptyIntVector, EmptyIntVector);
vector<FixedPoint> pred_capacities{3. /* CPU */, 4. /* MEM */, 5. /* GPU */};
initNodeResources(node_resources, pred_capacities, EmptyIntVector,
EmptyFixedPointVector);
ClusterResourceScheduler cluster_resources(0, node_resources);
TaskRequest task_req;
vector<double> pred_demands = {3. /* CPU */, 2. /* MEM */, 1.5 /* GPU */};
vector<FixedPoint> pred_demands = {3. /* CPU */, 2. /* MEM */, 1.5 /* GPU */};
vector<bool> pred_soft = {false};
initTaskRequest(task_req, pred_demands, pred_soft, EmptyIntVector, EmptyDoubleVector,
EmptyBoolVector, EmptyIntVector);
initTaskRequest(task_req, pred_demands, pred_soft, EmptyIntVector,
EmptyFixedPointVector, EmptyBoolVector, EmptyIntVector);
NodeResourceInstances old_local_resources = cluster_resources.GetLocalResources();
std::shared_ptr<TaskResourceInstances> task_allocation =
std::make_shared<TaskResourceInstances>();
bool success =
@ -611,15 +661,16 @@ TEST_F(SchedulingTest, TaskResourceInstancesTest) {
// resource.
{
NodeResources node_resources;
vector<int64_t> pred_capacities{3 /* CPU */, 4 /* MEM */, 5 /* GPU */};
initNodeResources(node_resources, pred_capacities, EmptyIntVector, EmptyIntVector);
vector<FixedPoint> pred_capacities{3 /* CPU */, 4 /* MEM */, 5 /* GPU */};
initNodeResources(node_resources, pred_capacities, EmptyIntVector,
EmptyFixedPointVector);
ClusterResourceScheduler cluster_resources(0, node_resources);
TaskRequest task_req;
vector<double> pred_demands = {4. /* CPU */, 2. /* MEM */, 1.5 /* GPU */};
vector<FixedPoint> pred_demands = {4. /* CPU */, 2. /* MEM */, 1.5 /* GPU */};
vector<bool> pred_soft = {false}; // Hard constrained resource.
initTaskRequest(task_req, pred_demands, pred_soft, EmptyIntVector, EmptyDoubleVector,
EmptyBoolVector, EmptyIntVector);
initTaskRequest(task_req, pred_demands, pred_soft, EmptyIntVector,
EmptyFixedPointVector, EmptyBoolVector, EmptyIntVector);
NodeResourceInstances old_local_resources = cluster_resources.GetLocalResources();
std::shared_ptr<TaskResourceInstances> task_allocation =
@ -633,15 +684,16 @@ TEST_F(SchedulingTest, TaskResourceInstancesTest) {
// Allocate resources for a task request that overallocates a soft constrained resource.
{
NodeResources node_resources;
vector<int64_t> pred_capacities{3 /* CPU */, 4 /* MEM */, 5 /* GPU */};
initNodeResources(node_resources, pred_capacities, EmptyIntVector, EmptyIntVector);
vector<FixedPoint> pred_capacities{3 /* CPU */, 4 /* MEM */, 5 /* GPU */};
initNodeResources(node_resources, pred_capacities, EmptyIntVector,
EmptyFixedPointVector);
ClusterResourceScheduler cluster_resources(0, node_resources);
TaskRequest task_req;
vector<double> pred_demands = {4. /* CPU */, 2. /* MEM */, 1.5 /* GPU */};
vector<FixedPoint> pred_demands = {4. /* CPU */, 2. /* MEM */, 1.5 /* GPU */};
vector<bool> pred_soft = {true}; // Soft constrained resource.
initTaskRequest(task_req, pred_demands, pred_soft, EmptyIntVector, EmptyDoubleVector,
EmptyBoolVector, EmptyIntVector);
initTaskRequest(task_req, pred_demands, pred_soft, EmptyIntVector,
EmptyFixedPointVector, EmptyBoolVector, EmptyIntVector);
NodeResourceInstances old_local_resources = cluster_resources.GetLocalResources();
std::shared_ptr<TaskResourceInstances> task_allocation =
@ -665,16 +717,16 @@ TEST_F(SchedulingTest, TaskResourceInstancesTest) {
// resources.
{
NodeResources node_resources;
vector<int64_t> pred_capacities{3 /* CPU */, 4 /* MEM */, 5 /* GPU */};
vector<FixedPoint> pred_capacities{3 /* CPU */, 4 /* MEM */, 5 /* GPU */};
vector<int64_t> cust_ids{1, 2};
vector<int64_t> cust_capacities{4, 4};
vector<FixedPoint> cust_capacities{4, 4};
initNodeResources(node_resources, pred_capacities, cust_ids, cust_capacities);
ClusterResourceScheduler cluster_resources(0, node_resources);
TaskRequest task_req;
vector<double> pred_demands = {3. /* CPU */, 2. /* MEM */, 1.5 /* GPU */};
vector<FixedPoint> pred_demands = {3. /* CPU */, 2. /* MEM */, 1.5 /* GPU */};
vector<bool> pred_soft = {false};
vector<double> cust_demands{3, 2};
vector<FixedPoint> cust_demands{3, 2};
vector<bool> cust_soft{false, false};
initTaskRequest(task_req, pred_demands, pred_soft, cust_ids, cust_demands, cust_soft,
EmptyIntVector);
@ -695,16 +747,16 @@ TEST_F(SchedulingTest, TaskResourceInstancesTest) {
// resources, but overallocates a hard-constrained custom resource.
{
NodeResources node_resources;
vector<int64_t> pred_capacities{3 /* CPU */, 4 /* MEM */, 5 /* GPU */};
vector<FixedPoint> pred_capacities{3 /* CPU */, 4 /* MEM */, 5 /* GPU */};
vector<int64_t> cust_ids{1, 2};
vector<int64_t> cust_capacities{4, 4};
vector<FixedPoint> cust_capacities{4, 4};
initNodeResources(node_resources, pred_capacities, cust_ids, cust_capacities);
ClusterResourceScheduler cluster_resources(0, node_resources);
TaskRequest task_req;
vector<double> pred_demands = {3. /* CPU */, 2. /* MEM */, 1.5 /* GPU */};
vector<FixedPoint> pred_demands = {3. /* CPU */, 2. /* MEM */, 1.5 /* GPU */};
vector<bool> pred_soft = {false};
vector<double> cust_demands{3, 10};
vector<FixedPoint> cust_demands{3, 10};
vector<bool> cust_soft{false, false};
initTaskRequest(task_req, pred_demands, pred_soft, cust_ids, cust_demands, cust_soft,
EmptyIntVector);
@ -722,16 +774,16 @@ TEST_F(SchedulingTest, TaskResourceInstancesTest) {
// resources, but overallocates a soft-constrained custom resource.
{
NodeResources node_resources;
vector<int64_t> pred_capacities{3 /* CPU */, 4 /* MEM */, 5 /* GPU */};
vector<FixedPoint> pred_capacities{3 /* CPU */, 4 /* MEM */, 5 /* GPU */};
vector<int64_t> cust_ids{1, 2};
vector<int64_t> cust_capacities{4, 4};
vector<FixedPoint> cust_capacities{4, 4};
initNodeResources(node_resources, pred_capacities, cust_ids, cust_capacities);
ClusterResourceScheduler cluster_resources(0, node_resources);
TaskRequest task_req;
vector<double> pred_demands = {3. /* CPU */, 2. /* MEM */, 1.5 /* GPU */};
vector<FixedPoint> pred_demands = {3. /* CPU */, 2. /* MEM */, 1.5 /* GPU */};
vector<bool> pred_soft = {false};
vector<double> cust_demands{3, 10};
vector<FixedPoint> cust_demands{3, 10};
vector<bool> cust_soft{false, true};
initTaskRequest(task_req, pred_demands, pred_soft, cust_ids, cust_demands, cust_soft,
EmptyIntVector);
@ -761,16 +813,16 @@ TEST_F(SchedulingTest, TaskResourceInstancesTest) {
TEST_F(SchedulingTest, TaskResourceInstancesTest2) {
{
NodeResources node_resources;
vector<int64_t> pred_capacities{4 /* CPU */, 4 /* MEM */, 5 /* GPU */};
vector<FixedPoint> pred_capacities{4. /* CPU */, 4. /* MEM */, 5. /* GPU */};
vector<int64_t> cust_ids{1, 2};
vector<int64_t> cust_capacities{4, 4};
vector<FixedPoint> cust_capacities{4., 4.};
initNodeResources(node_resources, pred_capacities, cust_ids, cust_capacities);
ClusterResourceScheduler cluster_resources(0, node_resources);
TaskRequest task_req;
vector<double> pred_demands = {2. /* CPU */, 2. /* MEM */, 1.5 /* GPU */};
vector<FixedPoint> pred_demands = {2. /* CPU */, 2. /* MEM */, 1.5 /* GPU */};
vector<bool> pred_soft = {false};
vector<double> cust_demands{3, 2};
vector<FixedPoint> cust_demands{3., 2.};
vector<bool> cust_soft{false, false};
initTaskRequest(task_req, pred_demands, pred_soft, cust_ids, cust_demands, cust_soft,
EmptyIntVector);
@ -782,7 +834,7 @@ TEST_F(SchedulingTest, TaskResourceInstancesTest2) {
NodeResourceInstances old_local_resources = cluster_resources.GetLocalResources();
ASSERT_EQ(success, true);
std::vector<double> cpu_instances = task_allocation->GetCPUInstances();
std::vector<double> cpu_instances = task_allocation->GetCPUInstancesDouble();
cluster_resources.AddCPUResourceInstances(cpu_instances);
cluster_resources.SubtractCPUResourceInstances(cpu_instances);
@ -793,9 +845,9 @@ TEST_F(SchedulingTest, TaskResourceInstancesTest2) {
TEST_F(SchedulingTest, TaskCPUResourceInstancesTest) {
{
NodeResources node_resources;
vector<int64_t> pred_capacities{4 /* CPU */, 1 /* MEM */, 1 /* GPU */};
vector<FixedPoint> pred_capacities{4 /* CPU */, 1 /* MEM */, 1 /* GPU */};
vector<int64_t> cust_ids{1};
vector<int64_t> cust_capacities{8};
vector<FixedPoint> cust_capacities{8};
initNodeResources(node_resources, pred_capacities, cust_ids, cust_capacities);
ClusterResourceScheduler cluster_resources(0, node_resources);
@ -803,7 +855,7 @@ TEST_F(SchedulingTest, TaskCPUResourceInstancesTest) {
cluster_resources.SubtractCPUResourceInstances(allocate_cpu_instances);
std::vector<double> available_cpu_instances = cluster_resources.GetLocalResources()
.GetAvailableResourceInstances()
.GetCPUInstances();
.GetCPUInstancesDouble();
std::vector<double> expected_available_cpu_instances{0.5, 0.5, 0.5, 0.5};
ASSERT_TRUE(std::equal(available_cpu_instances.begin(), available_cpu_instances.end(),
expected_available_cpu_instances.begin()));
@ -811,7 +863,7 @@ TEST_F(SchedulingTest, TaskCPUResourceInstancesTest) {
cluster_resources.AddCPUResourceInstances(allocate_cpu_instances);
available_cpu_instances = cluster_resources.GetLocalResources()
.GetAvailableResourceInstances()
.GetCPUInstances();
.GetCPUInstancesDouble();
expected_available_cpu_instances = {1., 1., 1., 1.};
ASSERT_TRUE(std::equal(available_cpu_instances.begin(), available_cpu_instances.end(),
expected_available_cpu_instances.begin()));
@ -824,7 +876,7 @@ TEST_F(SchedulingTest, TaskCPUResourceInstancesTest) {
std::equal(underflow.begin(), underflow.end(), expected_underflow.begin()));
available_cpu_instances = cluster_resources.GetLocalResources()
.GetAvailableResourceInstances()
.GetCPUInstances();
.GetCPUInstancesDouble();
expected_available_cpu_instances = {0., 0., 0.5, 0.};
ASSERT_TRUE(std::equal(available_cpu_instances.begin(), available_cpu_instances.end(),
expected_available_cpu_instances.begin()));
@ -836,7 +888,7 @@ TEST_F(SchedulingTest, TaskCPUResourceInstancesTest) {
ASSERT_TRUE(std::equal(overflow.begin(), overflow.end(), expected_overflow.begin()));
available_cpu_instances = cluster_resources.GetLocalResources()
.GetAvailableResourceInstances()
.GetCPUInstances();
.GetCPUInstancesDouble();
expected_available_cpu_instances = {1., .5, 1., .5};
ASSERT_TRUE(std::equal(available_cpu_instances.begin(), available_cpu_instances.end(),
expected_available_cpu_instances.begin()));
@ -846,9 +898,9 @@ TEST_F(SchedulingTest, TaskCPUResourceInstancesTest) {
TEST_F(SchedulingTest, UpdateLocalAvailableResourcesFromResourceInstancesTest) {
{
NodeResources node_resources;
vector<int64_t> pred_capacities{4 /* CPU */, 1 /* MEM */, 1 /* GPU */};
vector<FixedPoint> pred_capacities{4 /* CPU */, 1 /* MEM */, 1 /* GPU */};
vector<int64_t> cust_ids{1};
vector<int64_t> cust_capacities{8};
vector<FixedPoint> cust_capacities{8};
initNodeResources(node_resources, pred_capacities, cust_ids, cust_capacities);
ClusterResourceScheduler cluster_resources(0, node_resources);
@ -859,7 +911,7 @@ TEST_F(SchedulingTest, UpdateLocalAvailableResourcesFromResourceInstancesTest) {
cluster_resources.SubtractCPUResourceInstances(allocate_cpu_instances);
std::vector<double> available_cpu_instances = cluster_resources.GetLocalResources()
.GetAvailableResourceInstances()
.GetCPUInstances();
.GetCPUInstancesDouble();
std::vector<double> expected_available_cpu_instances{0.5, 0.5, 0., 0.5};
ASSERT_TRUE(std::equal(available_cpu_instances.begin(),
available_cpu_instances.end(),
@ -877,7 +929,7 @@ TEST_F(SchedulingTest, UpdateLocalAvailableResourcesFromResourceInstancesTest) {
cluster_resources.AddCPUResourceInstances(allocate_cpu_instances);
std::vector<double> available_cpu_instances = cluster_resources.GetLocalResources()
.GetAvailableResourceInstances()
.GetCPUInstances();
.GetCPUInstancesDouble();
std::vector<double> expected_available_cpu_instances{1., 1., 1., 0.8};
ASSERT_TRUE(std::equal(available_cpu_instances.begin(),
available_cpu_instances.end(),
@ -890,78 +942,6 @@ TEST_F(SchedulingTest, UpdateLocalAvailableResourcesFromResourceInstancesTest) {
}
}
#ifdef UNORDERED_VS_ABSL_MAPS_EVALUATION
/// Looks up every element of `keys` in `map`, accumulates the mapped values of
/// the keys that were found, and prints the sum together with the elapsed
/// wall-clock time in milliseconds.
///
/// The sum is printed so that the lookups have an observable result and cannot
/// be discarded by the optimizer.
///
/// \param map Associative container exposing find()/end() with int64_t-compatible
///            mapped values.
/// \param keys Keys to search for, visited in order.
template <typename Map, typename Key>
static void TimeMapLookups(const Map &map, const std::vector<Key> &keys) {
  int64_t sum = 0;
  const auto t_start = std::chrono::high_resolution_clock::now();
  for (const auto &key : keys) {
    const auto it = map.find(key);
    if (it != map.end()) {
      sum += it->second;
    }
  }
  const auto t_end = std::chrono::high_resolution_clock::now();
  const double duration =
      std::chrono::duration<double, std::milli>(t_end - t_start).count();
  std::cout << "sum = " << sum << " in " << duration << std::endl;
}

/// Micro-benchmark comparing lookup time of std::unordered_map against
/// absl::flat_hash_map, for both integer and string keys.
///
/// Each of the four maps holds the keys 0 .. map_len-1 (as integers or as their
/// decimal strings) mapped to themselves; map_len pseudo-random keys in that
/// range are then searched in every map and the timing is printed.
TEST_F(SchedulingTest, SchedulingMapPerformanceTest) {
  const size_t map_len = 1000000;
  unordered_map<int64_t, int64_t> umap_int_key;
  unordered_map<string, int64_t> umap_string_key;
  absl::flat_hash_map<int64_t, int64_t> amap_int_key;
  absl::flat_hash_map<string, int64_t> amap_string_key;

  // The key vectors end up with exactly map_len entries, so size them once
  // instead of reallocating while they grow. The maps are intentionally NOT
  // pre-sized: their bucket layout is part of what is being measured.
  vector<string> search_key_strings;
  vector<int64_t> search_key_ints;
  search_key_strings.reserve(map_len);
  search_key_ints.reserve(map_len);

  for (size_t i = 0; i < map_len; i++) {
    // The same random id is used for the integer and the string probe so that
    // all four measurements search for equivalent keys.
    const int id = rand() % map_len;
    search_key_strings.push_back(to_string(id));
    search_key_ints.push_back(id);
    umap_int_key.emplace(i, i);
    umap_string_key.emplace(to_string(i), i);
    amap_int_key.emplace(i, i);
    amap_string_key.emplace(to_string(i), i);
  }

  // Same reporting order as before: std int, std string, absl int, absl string.
  TimeMapLookups(umap_int_key, search_key_ints);
  TimeMapLookups(umap_string_key, search_key_strings);
  TimeMapLookups(amap_int_key, search_key_ints);
  TimeMapLookups(amap_string_key, search_key_strings);
}
#endif // UNORDERED_VS_ABSL_MAPS_EVALUATION
} // namespace ray
int main(int argc, char **argv) {

View file

@ -1704,8 +1704,6 @@ void NodeManager::HandleRequestWorkerLease(const rpc::RequestWorkerLeaseRequest
reply->mutable_worker_address()->set_raylet_id(self_node_id_.Binary());
RAY_CHECK(leased_workers_.find(worker->WorkerId()) == leased_workers_.end());
leased_workers_[worker->WorkerId()] = worker;
// TODO (Ion): Fix handling floating point errors, maybe by moving to integers.
#define ZERO_CAPACITY 1.0e-5
std::shared_ptr<TaskResourceInstances> allocated_resources;
if (task_spec.IsActorCreationTask()) {
allocated_resources = worker->GetLifetimeAllocatedInstances();
@ -1719,7 +1717,7 @@ void NodeManager::HandleRequestWorkerLease(const rpc::RequestWorkerLeaseRequest
// instances has available capacity.
for (size_t inst_idx = 0; inst_idx < predefined_resources[res_idx].size();
inst_idx++) {
if (std::abs(predefined_resources[res_idx][inst_idx]) > ZERO_CAPACITY) {
if (predefined_resources[res_idx][inst_idx] > 0.) {
if (first) {
resource = reply->add_resource_mapping();
resource->set_name(
@ -1728,7 +1726,7 @@ void NodeManager::HandleRequestWorkerLease(const rpc::RequestWorkerLeaseRequest
}
auto rid = resource->add_resource_ids();
rid->set_index(inst_idx);
rid->set_quantity(predefined_resources[res_idx][inst_idx]);
rid->set_quantity(predefined_resources[res_idx][inst_idx].Double());
}
}
}
@ -1737,7 +1735,7 @@ void NodeManager::HandleRequestWorkerLease(const rpc::RequestWorkerLeaseRequest
bool first = true; // Set resource name only if at least one of its
// instances has available capacity.
for (size_t inst_idx = 0; inst_idx < it->second.size(); inst_idx++) {
if (std::abs(it->second[inst_idx]) > ZERO_CAPACITY) {
if (it->second[inst_idx] > 0.) {
if (first) {
resource = reply->add_resource_mapping();
resource->set_name(
@ -1746,7 +1744,7 @@ void NodeManager::HandleRequestWorkerLease(const rpc::RequestWorkerLeaseRequest
}
auto rid = resource->add_resource_ids();
rid->set_index(inst_idx);
rid->set_quantity(it->second[inst_idx]);
rid->set_quantity(it->second[inst_idx].Double());
}
}
}
@ -2252,7 +2250,7 @@ void NodeManager::HandleDirectCallTaskBlocked(const std::shared_ptr<Worker> &wor
}
std::vector<double> cpu_instances;
if (worker->GetAllocatedInstances() != nullptr) {
cpu_instances = worker->GetAllocatedInstances()->GetCPUInstances();
cpu_instances = worker->GetAllocatedInstances()->GetCPUInstancesDouble();
}
if (cpu_instances.size() > 0) {
std::vector<double> borrowed_cpu_instances =
@ -2281,7 +2279,7 @@ void NodeManager::HandleDirectCallTaskUnblocked(const std::shared_ptr<Worker> &w
}
std::vector<double> cpu_instances;
if (worker->GetAllocatedInstances() != nullptr) {
cpu_instances = worker->GetAllocatedInstances()->GetCPUInstances();
cpu_instances = worker->GetAllocatedInstances()->GetCPUInstancesDouble();
}
if (cpu_instances.size() > 0) {
new_resource_scheduler_->SubtractCPUResourceInstances(cpu_instances);
@ -2557,7 +2555,7 @@ bool NodeManager::FinishAssignedTask(Worker &worker) {
Task task;
if (new_scheduler_enabled_) {
task = worker.GetAssignedTask();
// leased_workers_.erase(worker.WorkerId()); // Maybe RAY_CHECK ???
// leased_workers_.erase(worker.WorkerId()); // Maybe RAY_CHECK ?
if (worker.GetAllocatedInstances() != nullptr) {
new_resource_scheduler_->SubtractCPUResourceInstances(
worker.GetBorrowedCPUInstances());