Initial commit of new cluster resource scheduler (#6178)

This commit is contained in:
Ion 2019-11-22 11:14:46 -08:00 committed by Edward Oakes
parent 05ce789e5b
commit 68ac08332b
6 changed files with 926 additions and 48 deletions

View file

@ -218,6 +218,8 @@ cc_library(
":ray_util", ":ray_util",
"@boost//:asio", "@boost//:asio",
"@com_github_grpc_grpc//:grpc++", "@com_github_grpc_grpc//:grpc++",
"@com_google_absl//absl/container:flat_hash_map",
"@com_google_absl//absl/container:flat_hash_set",
"@com_google_googletest//:gtest", "@com_google_googletest//:gtest",
"@plasma//:plasma_client", "@plasma//:plasma_client",
], ],

View file

@ -0,0 +1,196 @@
#include "cluster_resource_scheduler.h"
ClusterResourceScheduler::ClusterResourceScheduler(
int64_t local_node_id, const NodeResources &local_node_resources)
: local_node_id_(local_node_id) {
AddOrUpdateNode(local_node_id_, local_node_resources);
}
void ClusterResourceScheduler::SetPredefinedResources(const NodeResources &new_resources,
NodeResources *old_resources) {
for (size_t i = 0; i < PredefinedResources_MAX; i++) {
old_resources->capacities[i].total = new_resources.capacities[i].total;
old_resources->capacities[i].available = new_resources.capacities[i].available;
}
}
void ClusterResourceScheduler::SetCustomResources(
const absl::flat_hash_map<int64_t, ResourceCapacity> &new_custom_resources,
absl::flat_hash_map<int64_t, ResourceCapacity> *old_custom_resources) {
old_custom_resources->clear();
for (auto &elem : new_custom_resources) {
old_custom_resources->insert(elem);
}
}
void ClusterResourceScheduler::AddOrUpdateNode(int64_t node_id,
const NodeResources &node_resources) {
auto it = nodes_.find(node_id);
if (it == nodes_.end()) {
// This node is new, so add it to the map.
nodes_.emplace(node_id, node_resources);
} else {
// This node exists, so update its resources.
NodeResources &resources = it->second;
SetPredefinedResources(node_resources, &resources);
SetCustomResources(node_resources.custom_resources, &resources.custom_resources);
}
}
bool ClusterResourceScheduler::RemoveNode(int64_t node_id) {
auto it = nodes_.find(node_id);
if (it == nodes_.end()) {
// Node not found.
return false;
} else {
it->second.custom_resources.clear();
nodes_.erase(it);
return true;
}
}
int64_t ClusterResourceScheduler::IsSchedulable(const TaskRequest &task_req,
int64_t node_id,
const NodeResources &resources) {
int violations = 0;
// First, check predefined resources.
for (size_t i = 0; i < PredefinedResources_MAX; i++) {
if (task_req.predefined_resources[i].demand > resources.capacities[i].available) {
if (task_req.predefined_resources[i].soft) {
// A soft constraint has been violated.
violations++;
} else {
// A hard constraint has been violated.
return -1;
}
}
}
for (size_t i = 0; i < task_req.custom_resources.size(); i++) {
auto it = resources.custom_resources.find(task_req.custom_resources[i].id);
if (it == resources.custom_resources.end()) {
// Requested resource doesn't exist at this node.
if (task_req.custom_resources[i].req.soft) {
violations++;
} else {
return -1;
}
} else {
if (task_req.custom_resources[i].req.demand > it->second.available) {
// Resource constraint is violated.
if (task_req.custom_resources[i].req.soft) {
violations++;
} else {
return -1;
}
}
}
}
if (task_req.placement_hints.size() > 0) {
auto it_p = task_req.placement_hints.find(node_id);
if (it_p == task_req.placement_hints.end()) {
// Node not found in the placement_hints list, so
// record this a soft constraint violation.
violations++;
}
}
return violations;
}
int64_t ClusterResourceScheduler::GetBestSchedulableNode(const TaskRequest &task_req,
int64_t *total_violations) {
// Min number of violations across all nodes that can schedule the request.
int64_t min_violations = INT_MAX;
// Node associated to min_violations.
int64_t best_node = -1;
*total_violations = 0;
// Check whether local node is schedulable. We return immediately
// the local node only if there are zero violations.
auto it = nodes_.find(local_node_id_);
if (it != nodes_.end()) {
if (IsSchedulable(task_req, it->first, it->second) == 0) {
return local_node_id_;
}
}
// Check whether any node in the request placement_hints, satisfes
// all resource constraints of the request.
for (auto it_p = task_req.placement_hints.begin();
it_p != task_req.placement_hints.end(); ++it_p) {
auto it = nodes_.find(*it_p);
if (it != nodes_.end()) {
if (IsSchedulable(task_req, it->first, it->second) == 0) {
return it->first;
}
}
}
for (auto it = nodes_.begin(); it != nodes_.end(); ++it) {
// Return -1 if node not schedulable. otherwise return the number
// of soft constraint violations.
int64_t violations;
if ((violations = IsSchedulable(task_req, it->first, it->second)) == -1) {
break;
}
// Update the node with the smallest number of soft constraints violated.
if (min_violations > violations) {
min_violations = violations;
best_node = it->first;
}
if (violations == 0) {
*total_violations = 0;
return best_node;
}
}
*total_violations = min_violations;
return best_node;
}
bool ClusterResourceScheduler::SubtractNodeAvailableResources(
int64_t node_id, const TaskRequest &task_req) {
auto it = nodes_.find(node_id);
if (it == nodes_.end()) {
return false;
}
NodeResources &resources = it->second;
// Just double check this node can still schedule the task request.
if (IsSchedulable(task_req, local_node_id_, resources) == -1) {
return false;
}
for (size_t i = 0; i < PredefinedResources_MAX; i++) {
resources.capacities[i].available =
std::max(static_cast<int64_t>(0), resources.capacities[i].available -
task_req.predefined_resources[i].demand);
}
for (size_t i = 0; i < task_req.custom_resources.size(); i++) {
auto it = resources.custom_resources.find(task_req.custom_resources[i].id);
if (it != resources.custom_resources.end()) {
it->second.available =
std::max(static_cast<int64_t>(0),
it->second.available - task_req.custom_resources[i].req.demand);
}
}
return true;
}
bool ClusterResourceScheduler::GetNodeResources(int64_t node_id,
NodeResources *ret_resources) {
auto it = nodes_.find(node_id);
if (it != nodes_.end()) {
*ret_resources = it->second;
return true;
} else {
return false;
}
}
int64_t ClusterResourceScheduler::NumNodes() { return nodes_.size(); }

View file

@ -0,0 +1,162 @@
#ifndef RAY_COMMON_SCHEDULING_SCHEDULING_H
#define RAY_COMMON_SCHEDULING_SCHEDULING_H
#include "absl/container/flat_hash_map.h"
#include "absl/container/flat_hash_set.h"
#include <iostream>
#include <vector>
/// List of predefined resources.
enum PredefinedResources { CPU, MEM, GPU, TPU, PredefinedResources_MAX };
struct ResourceCapacity {
int64_t total;
int64_t available;
};
struct ResourceRequest {
/// Amount of resource being requested.
int64_t demand;
/// Specify whether the request is soft or hard.
/// If hard, the entire request is denied if the demand exceeds the resource
/// availability. Otherwise, the request can be still be granted.
/// Prefernces are given to the nodes with the lowest number of violations.
bool soft;
};
/// Resource request, including resource ID. This is used for custom resources.
struct ResourceRequestWithId {
/// Resource ID.
int64_t id;
/// Resource request.
ResourceRequest req;
};
struct NodeResources {
/// Available and total capacities for predefined resources.
std::vector<ResourceCapacity> capacities;
/// Map containing custom resources. The key of each entry represents the
/// custom resource ID.
absl::flat_hash_map<int64_t, ResourceCapacity> custom_resources;
};
struct TaskRequest {
/// List of predefined resources required by the task.
std::vector<ResourceRequest> predefined_resources;
/// List of custom resources required by the tasl.
std::vector<ResourceRequestWithId> custom_resources;
/// List of placement hints. A placement hint is a node on which
/// we desire to run this task. This is a soft constraint in that
/// the task will run on a different node in the cluster, if none of the
/// nodes in this list can schedule this task.
absl::flat_hash_set<int64_t> placement_hints;
};
/// Class encapsulating the cluster resources and the logic to assign
/// tasks to nodes based on the task's constraints and the available
/// resources at those nodes.
class ClusterResourceScheduler {
/// List of nodes in the clusters and their resources organized as a map.
/// The key of the map is the node ID.
absl::flat_hash_map<int64_t, NodeResources> nodes_;
/// ID of local node.
int64_t local_node_id_;
/// Set predefined resources.
///
/// \param[in] new_resources: New predefined resources.
/// \param[out] old_resources: Predefined resources to be updated.
void SetPredefinedResources(const NodeResources &new_resources,
NodeResources *old_resources);
/// Set custom resources.
///
/// \param[in] new_resources: New custom resources.
/// \param[out] old_resources: Custom resources to be updated.
void SetCustomResources(
const absl::flat_hash_map<int64_t, ResourceCapacity> &new_custom_resources,
absl::flat_hash_map<int64_t, ResourceCapacity> *old_custom_resources);
public:
ClusterResourceScheduler(void){};
/// Constructor initializing the resources associated with the local node.
///
/// \param local_node_id: ID of local node,
/// \param node_resources: The total and the available resources associated
/// with the local node.
ClusterResourceScheduler(int64_t local_node_id, const NodeResources &node_resources);
/// Add a new node or overwrite the resources of an existing node.
///
/// \param node_id: Node ID.
/// \param node_resources: Up to date total and available resources of the node.
void AddOrUpdateNode(int64_t node_id, const NodeResources &node_resources);
/// Remove node from the cluster data structure. This happens
/// when a node fails or it is removed from the cluster.
///
/// \param ID of the node to be removed.
bool RemoveNode(int64_t node_id);
/// Check whether a task request can be scheduled given a node.
///
/// \param task_req: Task request to be scheduled.
/// \param node_id: ID of the node.
/// \param resources: Node's resources. (Note: Technically, this is
/// redundant, as we can get the node's resources from nodes_
/// using node_id. However, typically both node_id and resources
/// are available when we call this function, and this way we avoid
/// a map find call which could be expensive.)
///
/// \return: -1, if the request cannot be scheduled. This happens when at
/// least a hard constraints is violated.
/// >= 0, the number soft constraint violations. If 0, no
/// constraint is violated.
int64_t IsSchedulable(const TaskRequest &task_req, int64_t node_id,
const NodeResources &resources);
/// Find a node in the cluster on which we can schedule a given task request.
///
/// First, this function checks whether the local node can schedule
/// the request without violating any constraints. If yes, it returns the
/// ID of the local node.
///
/// If not, this function checks whether there is another node in the cluster
/// that satisfies all request's constraints (both soft and hard).
///
/// If no such node exists, the function checks whether there are nodes
/// that satisfy all the request's hard constraints, but might violate some
/// soft constraints. Among these nodes, it returns a node which violates
/// the least number of soft constraints.
///
/// Finally, if no such node exists, return -1.
///
/// \param task_req: Task to be scheduled.
/// \param violations: The number of soft constraint violations associated
/// with the node returned by this function (assuming
/// a node that can schedule task_req is found).
///
/// \return -1, if no node can schedule the current request; otherwise,
/// return the ID of a node that can schedule the task request.
int64_t GetBestSchedulableNode(const TaskRequest &task_req, int64_t *violations);
/// Update the available resources of a node when a task request is
/// scheduled on the given node.
///
/// \param node_id: ID of node on which request is being scheduled.
/// \param task_req: task request being scheduled.
///
/// \return true, if task_req can be indeed scheduled on the node,
/// and false otherwise.
bool SubtractNodeAvailableResources(int64_t node_id, const TaskRequest &task_req);
/// Return resources associated to the given node_id in ret_resources.
/// If node_id not found, return false; otherwise return true.
bool GetNodeResources(int64_t node_id, NodeResources *ret_resources);
/// Get number of nodes in the cluster.
int64_t NumNodes();
};
#endif // RAY_COMMON_SCHEDULING_SCHEDULING_H

View file

@ -1,8 +1,7 @@
#include "scheduling_ids.h" #include "scheduling_ids.h"
using namespace std;
int64_t StringIdMap::get(const string sid) { int64_t StringIdMap::Get(const std::string &string_id) {
auto it = string_to_int_.find(sid); auto it = string_to_int_.find(string_id);
if (it == string_to_int_.end()) { if (it == string_to_int_.end()) {
return -1; return -1;
} else { } else {
@ -10,19 +9,25 @@ int64_t StringIdMap::get(const string sid) {
} }
}; };
int64_t StringIdMap::insert(const string sid, bool test) { int64_t StringIdMap::Insert(const std::string &string_id, uint8_t max_id) {
auto sit = string_to_int_.find(sid); auto sit = string_to_int_.find(string_id);
if (sit == string_to_int_.end()) { if (sit == string_to_int_.end()) {
int64_t id = test ? hasher_(sid) % 10 : hasher_(sid); int64_t id = hasher_(string_id);
for (int i = 0; true; i++) { if (max_id != 0) {
id = id % MAX_ID_TEST;
}
for (size_t i = 0; true; i++) {
auto it = int_to_string_.find(id); auto it = int_to_string_.find(id);
if (it == int_to_string_.end()) { if (it == int_to_string_.end()) {
/// No hash collision, so associated sid with id. /// No hash collision, so associate string_id with id.
string_to_int_.insert(make_pair(sid, id)); string_to_int_.emplace(string_id, id);
int_to_string_.insert(make_pair(id, sid)); int_to_string_.emplace(id, string_id);
break; break;
} }
id = test ? hasher_(sid + to_string(i)) % 10 : hasher_(sid + to_string(i)); id = hasher_(string_id + std::to_string(i));
if (max_id != 0) {
id = id % max_id;
}
} }
return id; return id;
} else { } else {
@ -30,24 +35,20 @@ int64_t StringIdMap::insert(const string sid, bool test) {
} }
}; };
void StringIdMap::remove(const string sid) { void StringIdMap::Remove(const std::string &string_id) {
auto sit = string_to_int_.find(sid); auto sit = string_to_int_.find(string_id);
if (sit != string_to_int_.end()) { if (sit != string_to_int_.end()) {
uint64_t id = string_to_int_[sid]; int_to_string_.erase(string_to_int_[string_id]);
string_to_int_.erase(sit); string_to_int_.erase(sit);
auto it = int_to_string_.find(id);
int_to_string_.erase(it);
} }
}; };
void StringIdMap::remove(int64_t id) { void StringIdMap::Remove(int64_t id) {
auto it = int_to_string_.find(id); auto it = int_to_string_.find(id);
if (it != int_to_string_.end()) { if (it != int_to_string_.end()) {
string sid = int_to_string_[id]; string_to_int_.erase(int_to_string_[id]);
int_to_string_.erase(it); int_to_string_.erase(it);
auto sit = string_to_int_.find(sid);
string_to_int_.erase(sit);
} }
}; };
int64_t StringIdMap::count() { return string_to_int_.size(); } int64_t StringIdMap::Count() { return string_to_int_.size(); }

View file

@ -1,13 +1,17 @@
#ifndef RAY_COMMON_SCHEDULING_SCHEDULING_IDS_H #ifndef RAY_COMMON_SCHEDULING_SCHEDULING_IDS_H
#define RAY_COMMON_SCHEDULING_SCHEDULING_IDS_H #define RAY_COMMON_SCHEDULING_SCHEDULING_IDS_H
#include "absl/container/flat_hash_map.h"
#include <string> #include <string>
#include <unordered_map>
/// Limit the ID range to test for collisions.
#define MAX_ID_TEST 8
/// Class to map string IDs to unique integer IDs and back. /// Class to map string IDs to unique integer IDs and back.
class StringIdMap { class StringIdMap {
std::unordered_map<std::string, int64_t> string_to_int_; absl::flat_hash_map<std::string, int64_t> string_to_int_;
std::unordered_map<int64_t, std::string> int_to_string_; absl::flat_hash_map<int64_t, std::string> int_to_string_;
std::hash<std::string> hasher_; std::hash<std::string> hasher_;
public: public:
@ -18,28 +22,28 @@ class StringIdMap {
/// ///
/// \param String ID. /// \param String ID.
/// \return The integer ID associated with the given string ID. /// \return The integer ID associated with the given string ID.
int64_t get(const std::string sid); int64_t Get(const std::string &string_id);
/// Insert a string ID and get the associated integer ID. /// Insert a string ID and get the associated integer ID.
/// ///
/// \param String ID to be inserted. /// \param String ID to be inserted.
/// \param test: if "true" it specifies that the range of /// \param max_id The number of unique possible ids. This is used
/// IDs is limited to 0..10 for testing purposes. /// to force collisions for testing. If -1, it is not used.
/// \return The integer ID associated with string ID sid. /// \return The integer ID associated with string ID string_id.
int64_t insert(const std::string sid, bool test = false); int64_t Insert(const std::string &string_id, uint8_t num_ids = 0);
/// Delete an ID identified by its string format. /// Delete an ID identified by its string format.
/// ///
/// \param ID to be deleted. /// \param ID to be deleted.
void remove(const std::string sid); void Remove(const std::string &string_id);
/// Delete an ID identified by its integer format. /// Delete an ID identified by its integer format.
/// ///
/// \param ID to be deleted. /// \param ID to be deleted.
void remove(int64_t id); void Remove(int64_t id);
/// Get number of identifiers. /// Get number of identifiers.
int64_t count(); int64_t Count();
}; };
#endif // RAY_COMMON_SCHEDULING_SCHEDULING_IDS_H #endif // RAY_COMMON_SCHEDULING_SCHEDULING_IDS_H

View file

@ -2,11 +2,144 @@
#include "gtest/gtest.h" #include "gtest/gtest.h"
#include <string> #include <string>
#include <thread>
#include "ray/common/scheduling/cluster_resource_scheduler.h"
#include "ray/common/scheduling/scheduling_ids.h" #include "ray/common/scheduling/scheduling_ids.h"
#ifdef UNORDERED_VS_ABSL_MAPS_EVALUATION
#include <chrono>
#include "absl/container/flat_hash_map.h"
#endif // UNORDERED_VS_ABSL_MAPS_EVALUATION
using namespace std; using namespace std;
/// Used to path empty vector argiuments.
vector<int64_t> EmptyIntVector;
vector<bool> EmptyBoolVector;
void initTaskRequest(TaskRequest &tr, vector<int64_t> &pred_demands,
vector<bool> &pred_soft, vector<int64_t> &cust_ids,
vector<int64_t> &cust_demands, vector<bool> &cust_soft,
vector<int64_t> &placement_hints) {
for (size_t i = 0; i < pred_demands.size(); i++) {
ResourceRequest rq;
rq.demand = pred_demands[i];
rq.soft = pred_soft[i];
tr.predefined_resources.push_back(rq);
}
for (size_t i = pred_demands.size(); i < PredefinedResources_MAX; i++) {
ResourceRequest rq;
rq.demand = 0;
rq.soft = 0;
tr.predefined_resources.push_back(rq);
}
for (size_t i = 0; i < cust_ids.size(); i++) {
ResourceRequestWithId rq;
rq.id = cust_ids[i];
rq.req.demand = cust_demands[i];
rq.req.soft = cust_soft[i];
tr.custom_resources.push_back(rq);
}
for (size_t i = 0; i < placement_hints.size(); i++) {
tr.placement_hints.insert(placement_hints[i]);
}
};
void initNodeResources(NodeResources &node, vector<int64_t> &pred_capacities,
vector<int64_t> &cust_ids, vector<int64_t> &cust_capacities) {
for (size_t i = 0; i < pred_capacities.size(); i++) {
ResourceCapacity rc;
rc.total = rc.available = pred_capacities[i];
node.capacities.push_back(rc);
}
if (pred_capacities.size() < PredefinedResources_MAX) {
for (int i = pred_capacities.size(); i < PredefinedResources_MAX; i++) {
ResourceCapacity rc;
rc.total = rc.available = 0;
node.capacities.push_back(rc);
}
}
ResourceCapacity rc;
for (size_t i = 0; i < cust_capacities.size(); i++) {
rc.total = rc.available = cust_capacities[i];
node.custom_resources.insert(pair<int64_t, ResourceCapacity>(cust_ids[i], rc));
}
}
void initCluster(ClusterResourceScheduler &cluster_resources, int n) {
vector<int64_t> pred_capacities;
vector<int64_t> cust_ids;
vector<int64_t> cust_capacities;
int i, k;
for (i = 0; i < n; i++) {
NodeResources node_resources;
for (k = 0; k < PredefinedResources_MAX; k++) {
if (rand() % 3 == 0) {
pred_capacities.push_back(0);
} else {
pred_capacities.push_back(rand() % 10);
}
}
int m = min(rand() % PredefinedResources_MAX, n);
int start = rand() % n;
for (k = 0; k < m; k++) {
cust_ids.push_back((start + k) % n);
cust_capacities.push_back(rand() % 10);
}
initNodeResources(node_resources, pred_capacities, cust_ids, cust_capacities);
cluster_resources.AddOrUpdateNode(i, node_resources);
node_resources.custom_resources.clear();
}
}
bool nodeResourcesEqual(const NodeResources &nr1, const NodeResources &nr2) {
if (nr1.capacities.size() != nr2.capacities.size()) {
cout << nr1.capacities.size() << " " << nr2.capacities.size() << endl;
return false;
}
for (size_t i = 0; i < nr1.capacities.size(); i++) {
if (nr1.capacities[i].available != nr2.capacities[i].available) {
return false;
}
if (nr1.capacities[i].total != nr2.capacities[i].total) {
return false;
}
}
if (nr1.custom_resources.size() != nr2.custom_resources.size()) {
return false;
}
auto cr1 = nr1.custom_resources;
auto cr2 = nr2.custom_resources;
for (auto it1 = cr1.begin(); it1 != cr1.end(); ++it1) {
auto it2 = cr2.find(it1->first);
if (it2 == cr2.end()) {
return false;
}
if (it1->second.total != it2->second.total) {
return false;
}
if (it1->second.available != it2->second.available) {
return false;
}
}
return true;
}
namespace ray { namespace ray {
class SchedulingTest : public ::testing::Test { class SchedulingTest : public ::testing::Test {
@ -19,33 +152,413 @@ class SchedulingTest : public ::testing::Test {
TEST_F(SchedulingTest, SchedulingIdTest) { TEST_F(SchedulingTest, SchedulingIdTest) {
StringIdMap ids; StringIdMap ids;
hash<string> hasher; hash<string> hasher;
int num = 10; // should be greater than 10. size_t num = 10; // should be greater than 10.
for (int i = 0; i < num; i++) { for (size_t i = 0; i < num; i++) {
ids.insert(to_string(i)); ids.Insert(to_string(i));
} }
ASSERT_EQ(ids.count(), num); ASSERT_EQ(ids.Count(), num);
ids.remove(to_string(1)); ids.Remove(to_string(1));
ASSERT_EQ(ids.count(), num - 1); ASSERT_EQ(ids.Count(), num - 1);
ids.remove(hasher(to_string(2))); ids.Remove(hasher(to_string(2)));
ASSERT_EQ(ids.count(), num - 2); ASSERT_EQ(ids.Count(), num - 2);
ASSERT_TRUE(ids.get(to_string(3)) == static_cast<int64_t>(hasher(to_string(3)))); ASSERT_TRUE(ids.Get(to_string(3)) == static_cast<int64_t>(hasher(to_string(3))));
ASSERT_TRUE(ids.get(to_string(100)) == -1); ASSERT_TRUE(ids.Get(to_string(100)) == -1);
/// Test for handling collision. /// Test for handling collision.
StringIdMap short_ids; StringIdMap short_ids;
for (int i = 0; i < 10; i++) { uint8_t max_id = 8;
/// "true" reduces the range of IDs to [0..9] for (size_t i = 0; i < max_id; i++) {
int64_t id = short_ids.insert(to_string(i), true); int64_t id = short_ids.Insert(to_string(i), max_id);
ASSERT_TRUE(id < 10); ASSERT_TRUE(id < max_id);
} }
ASSERT_EQ(short_ids.count(), 10); ASSERT_EQ(short_ids.Count(), max_id);
} }
TEST_F(SchedulingTest, SchedulingInitClusterTest) {
int num_nodes = 10;
ClusterResourceScheduler cluster_resources;
initCluster(cluster_resources, num_nodes);
ASSERT_EQ(cluster_resources.NumNodes(), num_nodes);
}
TEST_F(SchedulingTest, SchedulingDeleteClusterNodeTest) {
int num_nodes = 4;
int64_t remove_id = 2;
ClusterResourceScheduler cluster_resources;
initCluster(cluster_resources, num_nodes);
cluster_resources.RemoveNode(remove_id);
ASSERT_TRUE(num_nodes - 1 == cluster_resources.NumNodes());
}
TEST_F(SchedulingTest, SchedulingModifyClusterNodeTest) {
int num_nodes = 4;
int64_t update_id = 2;
ClusterResourceScheduler cluster_resources;
initCluster(cluster_resources, num_nodes);
NodeResources node_resources;
vector<int64_t> pred_capacities;
vector<int64_t> cust_ids;
vector<int64_t> cust_capacities;
int k;
for (k = 0; k < PredefinedResources_MAX; k++) {
if (rand() % 3 == 0) {
pred_capacities.push_back(0);
} else {
pred_capacities.push_back(rand() % 10);
}
}
int m = min(rand() % PredefinedResources_MAX, num_nodes);
int start = rand() % num_nodes;
for (k = 0; k < m; k++) {
cust_ids.push_back((start + k) % num_nodes);
cust_capacities.push_back(rand() % 10);
initNodeResources(node_resources, pred_capacities, cust_ids, cust_capacities);
cluster_resources.AddOrUpdateNode(update_id, node_resources);
}
ASSERT_TRUE(num_nodes == cluster_resources.NumNodes());
}
TEST_F(SchedulingTest, SchedulingUpdateAvailableResourcesTest) {
/// Create cluster resources.
NodeResources node_resources;
vector<int64_t> pred_capacities{10, 5, 3};
vector<int64_t> cust_ids{1, 2};
vector<int64_t> cust_capacities{5, 5};
initNodeResources(node_resources, pred_capacities, cust_ids, cust_capacities);
ClusterResourceScheduler cluster_resources(1, node_resources);
{
TaskRequest task_req;
#define PRED_CUSTOM_LEN 2
vector<int64_t> pred_demands{7, 7};
vector<bool> pred_soft{false, true};
vector<int64_t> cust_ids{1, 2};
vector<int64_t> cust_demands{3, 10};
vector<bool> cust_soft{false, true};
initTaskRequest(task_req, pred_demands, pred_soft, cust_ids, cust_demands, cust_soft,
EmptyIntVector);
int64_t violations;
int64_t node_id = cluster_resources.GetBestSchedulableNode(task_req, &violations);
ASSERT_TRUE(node_id != -1);
ASSERT_TRUE(violations > 0);
NodeResources nr1, nr2;
ASSERT_TRUE(cluster_resources.GetNodeResources(node_id, &nr1));
cluster_resources.SubtractNodeAvailableResources(node_id, task_req);
ASSERT_TRUE(cluster_resources.GetNodeResources(node_id, &nr2));
for (size_t i = 0; i < PRED_CUSTOM_LEN; i++) {
int64_t t = nr1.capacities[i].available - task_req.predefined_resources[i].demand;
if (t < 0) t = 0;
ASSERT_EQ(nr2.capacities[i].available, t);
}
for (size_t i = 0; i < PRED_CUSTOM_LEN; i++) {
auto it1 = nr1.custom_resources.find(task_req.custom_resources[i].id);
if (it1 != nr1.custom_resources.end()) {
auto it2 = nr2.custom_resources.find(task_req.custom_resources[i].id);
if (it2 != nr2.custom_resources.end()) {
int64_t t = it1->second.available - task_req.custom_resources[i].req.demand;
if (t < 0) t = 0;
ASSERT_EQ(it2->second.available, t);
}
}
}
}
}
TEST_F(SchedulingTest, SchedulingAddOrUpdateNodeTest) {
ClusterResourceScheduler cluster_resources;
NodeResources nr, nr_out;
int64_t node_id = 1;
/// Add node.
{
NodeResources node_resources;
vector<int64_t> pred_capacities{10, 5, 3};
vector<int64_t> cust_ids{1, 2};
vector<int64_t> cust_capacities{5, 5};
initNodeResources(node_resources, pred_capacities, cust_ids, cust_capacities);
cluster_resources.AddOrUpdateNode(node_id, node_resources);
nr = node_resources;
}
/// Check whether node resources were correctly added.
if (cluster_resources.GetNodeResources(node_id, &nr_out)) {
ASSERT_TRUE(nodeResourcesEqual(nr, nr_out));
} else {
ASSERT_TRUE(false);
}
/// Update node.
{
NodeResources node_resources;
vector<int64_t> pred_capacities{10, 10};
vector<int64_t> cust_ids{2, 3};
vector<int64_t> cust_capacities{6, 6};
initNodeResources(node_resources, pred_capacities, cust_ids, cust_capacities);
cluster_resources.AddOrUpdateNode(node_id, node_resources);
nr = node_resources;
}
if (cluster_resources.GetNodeResources(node_id, &nr_out)) {
ASSERT_TRUE(nodeResourcesEqual(nr, nr_out));
} else {
ASSERT_TRUE(false);
}
}
TEST_F(SchedulingTest, SchedulingTaskRequestTest) {
/// Create cluster resources containing local node.
NodeResources node_resources;
vector<int64_t> pred_capacities{5, 5};
vector<int64_t> cust_ids{1};
vector<int64_t> cust_capacities{10};
initNodeResources(node_resources, pred_capacities, cust_ids, cust_capacities);
ClusterResourceScheduler cluster_resources(0, node_resources);
{
NodeResources node_resources;
vector<int64_t> pred_capacities{10, 2, 3};
vector<int64_t> cust_ids{1, 2};
vector<int64_t> cust_capacities{5, 5};
initNodeResources(node_resources, pred_capacities, cust_ids, cust_capacities);
cluster_resources.AddOrUpdateNode(1, node_resources);
}
/// Predefined resources, hard constraint violation
{
TaskRequest task_req;
vector<int64_t> pred_demands = {11};
vector<bool> pred_soft = {false};
initTaskRequest(task_req, pred_demands, pred_soft, EmptyIntVector, EmptyIntVector,
EmptyBoolVector, EmptyIntVector);
int64_t violations;
int64_t node_id = cluster_resources.GetBestSchedulableNode(task_req, &violations);
ASSERT_EQ(node_id, -1);
}
/// Predefined resources, soft constraint violation
{
TaskRequest task_req;
vector<int64_t> pred_demands = {11};
vector<bool> pred_soft = {true};
initTaskRequest(task_req, pred_demands, pred_soft, EmptyIntVector, EmptyIntVector,
EmptyBoolVector, EmptyIntVector);
int64_t violations;
int64_t node_id = cluster_resources.GetBestSchedulableNode(task_req, &violations);
ASSERT_TRUE(node_id != -1);
ASSERT_TRUE(violations > 0);
}
/// Predefined resources, no constraint violation.
{
TaskRequest task_req;
vector<int64_t> pred_demands = {5};
vector<bool> pred_soft = {false};
initTaskRequest(task_req, pred_demands, pred_soft, EmptyIntVector, EmptyIntVector,
EmptyBoolVector, EmptyIntVector);
int64_t violations;
int64_t node_id = cluster_resources.GetBestSchedulableNode(task_req, &violations);
ASSERT_TRUE(node_id != -1);
ASSERT_TRUE(violations == 0);
}
/// Custom resources, hard constraint violation.
{
TaskRequest task_req;
vector<int64_t> pred_demands{5, 2};
vector<bool> pred_soft{false, true};
vector<int64_t> cust_ids{1};
vector<int64_t> cust_demands{11};
vector<bool> cust_soft{false};
initTaskRequest(task_req, pred_demands, pred_soft, cust_ids, cust_demands, cust_soft,
EmptyIntVector);
int64_t violations;
int64_t node_id = cluster_resources.GetBestSchedulableNode(task_req, &violations);
ASSERT_TRUE(node_id == -1);
}
/// Custom resources, soft constraint violation.
{
TaskRequest task_req;
vector<int64_t> pred_demands{5, 2};
vector<bool> pred_soft{false, true};
vector<int64_t> cust_ids{1};
vector<int64_t> cust_demands{11};
vector<bool> cust_soft{true};
initTaskRequest(task_req, pred_demands, pred_soft, cust_ids, cust_demands, cust_soft,
EmptyIntVector);
int64_t violations;
int64_t node_id = cluster_resources.GetBestSchedulableNode(task_req, &violations);
ASSERT_TRUE(node_id != -1);
ASSERT_TRUE(violations > 0);
}
/// Custom resources, no constraint violation.
{
TaskRequest task_req;
vector<int64_t> pred_demands{5, 2};
vector<bool> pred_soft{false, true};
vector<int64_t> cust_ids{1};
vector<int64_t> cust_demands{5};
vector<bool> cust_soft{false};
initTaskRequest(task_req, pred_demands, pred_soft, cust_ids, cust_demands, cust_soft,
EmptyIntVector);
int64_t violations;
int64_t node_id = cluster_resources.GetBestSchedulableNode(task_req, &violations);
ASSERT_TRUE(node_id != -1);
ASSERT_TRUE(violations == 0);
}
/// Custom resource missing, hard constraint violation.
{
TaskRequest task_req;
vector<int64_t> pred_demands{5, 2};
vector<bool> pred_soft{false, true};
vector<int64_t> cust_ids{100};
vector<int64_t> cust_demands{5};
vector<bool> cust_soft{false};
initTaskRequest(task_req, pred_demands, pred_soft, cust_ids, cust_demands, cust_soft,
EmptyIntVector);
int64_t violations;
int64_t node_id = cluster_resources.GetBestSchedulableNode(task_req, &violations);
ASSERT_TRUE(node_id == -1);
}
/// Custom resource missing, soft constraint violation.
{
TaskRequest task_req;
vector<int64_t> pred_demands{5, 2};
vector<bool> pred_soft{false, true};
vector<int64_t> cust_ids{100};
vector<int64_t> cust_demands{5};
vector<bool> cust_soft{true};
initTaskRequest(task_req, pred_demands, pred_soft, cust_ids, cust_demands, cust_soft,
EmptyIntVector);
int64_t violations;
int64_t node_id = cluster_resources.GetBestSchedulableNode(task_req, &violations);
ASSERT_TRUE(node_id != -1);
ASSERT_TRUE(violations > 0);
}
/// Placement_hints, soft constraint violation.
{
TaskRequest task_req;
vector<int64_t> pred_demands{5, 2};
vector<bool> pred_soft{false, true};
vector<int64_t> cust_ids{1};
vector<int64_t> cust_demands{5};
vector<bool> cust_soft{true};
vector<int64_t> placement_hints{2, 3};
initTaskRequest(task_req, pred_demands, pred_soft, cust_ids, cust_demands, cust_soft,
placement_hints);
int64_t violations;
int64_t node_id = cluster_resources.GetBestSchedulableNode(task_req, &violations);
ASSERT_TRUE(node_id != -1);
ASSERT_TRUE(violations > 0);
}
/// Placement hints, no constraint violation.
{
TaskRequest task_req;
vector<int64_t> pred_demands{5, 2};
vector<bool> pred_soft{false, true};
vector<int64_t> cust_ids{1};
vector<int64_t> cust_demands{5};
vector<bool> cust_soft{true};
vector<int64_t> placement_hints{1, 2, 3};
initTaskRequest(task_req, pred_demands, pred_soft, cust_ids, cust_demands, cust_soft,
placement_hints);
int64_t violations;
int64_t node_id = cluster_resources.GetBestSchedulableNode(task_req, &violations);
ASSERT_TRUE(node_id != -1);
ASSERT_TRUE(violations == 0);
}
}
#ifdef UNORDERED_VS_ABSL_MAPS_EVALUATION
TEST_F(SchedulingTest, SchedulingMapPerformanceTest) {
size_t map_len = 1000000;
unordered_map<int64_t, int64_t> umap_int_key;
unordered_map<string, int64_t> umap_string_key;
absl::flat_hash_map<int64_t, int64_t> amap_int_key;
absl::flat_hash_map<string, int64_t> amap_string_key;
vector<string> search_key_strings;
vector<int64_t> search_key_ints;
for (size_t i = 0; i < map_len; i++) {
int id = rand() % map_len;
search_key_strings.push_back(to_string(id));
search_key_ints.push_back(id);
umap_int_key.emplace(i, i);
umap_string_key.emplace(to_string(i), i);
amap_int_key.emplace(i, i);
amap_string_key.emplace(to_string(i), i);
}
for (size_t i = 0; i < 25; i++) {
cout << "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" << endl;
}
int64_t sum;
auto t_start = std::chrono::high_resolution_clock::now();
sum = 0;
for (size_t i = 0; i < map_len; i++) {
auto it = umap_int_key.find(search_key_ints[i]);
if (it != umap_int_key.end()) {
sum += it->second;
}
}
auto t_end = std::chrono::high_resolution_clock::now();
double duration = std::chrono::duration<double, std::milli>(t_end - t_start).count();
cout << "sum = " << sum << " in " << duration << endl;
t_start = std::chrono::high_resolution_clock::now();
sum = 0;
for (size_t i = 0; i < map_len; i++) {
auto it = umap_string_key.find(search_key_strings[i]);
if (it != umap_string_key.end()) {
sum += it->second;
}
}
t_end = std::chrono::high_resolution_clock::now();
duration = std::chrono::duration<double, std::milli>(t_end - t_start).count();
cout << "sum = " << sum << " in " << duration << endl;
t_start = std::chrono::high_resolution_clock::now();
sum = 0;
for (size_t i = 0; i < map_len; i++) {
auto it = amap_int_key.find(search_key_ints[i]);
if (it != amap_int_key.end()) {
sum += it->second;
}
}
t_end = std::chrono::high_resolution_clock::now();
duration = std::chrono::duration<double, std::milli>(t_end - t_start).count();
cout << "sum = " << sum << " in " << duration << endl;
t_start = std::chrono::high_resolution_clock::now();
sum = 0;
for (size_t i = 0; i < map_len; i++) {
auto it = amap_string_key.find(search_key_strings[i]);
if (it != amap_string_key.end()) {
sum += it->second;
}
}
t_end = std::chrono::high_resolution_clock::now();
duration = std::chrono::duration<double, std::milli>(t_end - t_start).count();
cout << "sum = " << sum << " in " << duration << endl;
}
#endif // UNORDERED_VS_ABSL_MAPS_EVALUATION
} // namespace ray } // namespace ray
int main(int argc, char **argv) { int main(int argc, char **argv) {