Refactor helper methods for new scheduler integration (#6354)

This commit is contained in:
Ion 2019-12-05 18:49:25 -08:00 committed by Eric Liang
parent f63b64310a
commit 1c638a11a7
5 changed files with 126 additions and 77 deletions

View file

@ -94,6 +94,14 @@ py_test(
deps = ["//:ray_lib"],
)
# Stress test variant launched through the test_stress_direct.py wrapper.
# test_stress.py is listed as a source as well, since the wrapper runs it.
py_test(
    name = "test_stress_direct",
    size = "medium",
    srcs = [
        "test_stress_direct.py",
        "test_stress.py",
    ],
    tags = [
        "exclusive",
    ],
    deps = [
        "//:ray_lib",
    ],
)
py_test(
name = "test_stress_sharded",
size = "medium",
@ -102,6 +110,14 @@ py_test(
deps = ["//:ray_lib"],
)
# Sharded stress test variant launched through the
# test_stress_sharded_direct.py wrapper. test_stress_sharded.py is listed as a
# source as well, since the wrapper runs it.
py_test(
    name = "test_stress_sharded_direct",
    size = "medium",
    srcs = [
        "test_stress_sharded_direct.py",
        "test_stress_sharded.py",
    ],
    tags = [
        "exclusive",
    ],
    deps = [
        "//:ray_lib",
    ],
)
py_test(
name = "test_stress_failure",
size = "large",

View file

@ -0,0 +1,16 @@
"""Wrapper script that sets RAY_FORCE_DIRECT."""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import pytest
import sys
import os
if __name__ == "__main__":
    # Export the flag before handing control to pytest, so the wrapped test
    # file runs with RAY_FORCE_DIRECT=1 in its environment.
    os.environ["RAY_FORCE_DIRECT"] = "1"

    # The wrapped test file is looked up next to this script.
    this_dir = os.path.dirname(__file__)
    target = os.path.join(this_dir, "test_stress.py")

    # Propagate pytest's result as the process exit status.
    exit_code = pytest.main(["-v", target])
    sys.exit(exit_code)

View file

@ -0,0 +1,17 @@
"""Wrapper script that sets RAY_FORCE_DIRECT."""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import pytest
import sys
import os
if __name__ == "__main__":
    # Export the flag before handing control to pytest, so the wrapped test
    # file runs with RAY_FORCE_DIRECT=1 in its environment.
    os.environ["RAY_FORCE_DIRECT"] = "1"

    # The wrapped test file is looked up next to this script.
    this_dir = os.path.dirname(__file__)
    target = os.path.join(this_dir, "test_stress_sharded.py")

    # Propagate pytest's result as the process exit status.
    exit_code = pytest.main(["-v", target])
    sys.exit(exit_code)

View file

@ -1,5 +1,73 @@
#include "cluster_resource_scheduler.h"
std::string NodeResources::DebugString() {
std::stringstream buffer;
buffer << " node predefined resources {";
for (size_t i = 0; i < this->capacities.size(); i++) {
buffer << "(" << this->capacities[i].total << ":" << this->capacities[i].available
<< ") ";
}
buffer << "}" << std::endl;
buffer << " node custom resources {";
for (auto it = this->custom_resources.begin(); it != this->custom_resources.end();
++it) {
buffer << it->first << ":(" << it->second.total << ":" << it->second.available
<< ") ";
}
buffer << "}" << std::endl;
return buffer.str();
}
std::string TaskRequest::DebugString() {
std::stringstream buffer;
buffer << std::endl << " request predefined resources {";
for (size_t i = 0; i < this->predefined_resources.size(); i++) {
buffer << "(" << this->predefined_resources[i].demand << ":"
<< this->predefined_resources[i].soft << ") ";
}
buffer << "}" << std::endl;
buffer << " request custom resources {";
for (size_t i = 0; i < this->custom_resources.size(); i++) {
buffer << this->custom_resources[i].id << ":"
<< "(" << this->custom_resources[i].req.demand << ":"
<< this->custom_resources[i].req.soft << ") ";
}
buffer << "}" << std::endl;
return buffer.str();
}
/// Compares this node's resources with another node's resources.
///
/// Two objects are equal when the first PredefinedResources_MAX entries of
/// `capacities` match in both `total` and `available`, and when both objects
/// hold exactly the same custom resource IDs with matching `total` and
/// `available` values.
///
/// \param other The node resources to compare against.
/// \return true if both objects describe the same resources, false otherwise.
bool NodeResources::operator==(const NodeResources &other) {
  // NOTE(review): assumes both `capacities` containers hold at least
  // PredefinedResources_MAX entries -- confirm against the code that fills them.
  for (size_t i = 0; i < PredefinedResources_MAX; i++) {
    if (this->capacities[i].total != other.capacities[i].total) {
      return false;
    }
    if (this->capacities[i].available != other.capacities[i].available) {
      return false;
    }
  }

  // A different number of custom resources means the objects differ. (This
  // branch previously returned true, reporting unequal objects as equal.)
  if (this->custom_resources.size() != other.custom_resources.size()) {
    return false;
  }

  // The sizes match, so it is enough to check that every entry on this side
  // has an identical counterpart on the other side.
  for (auto it1 = this->custom_resources.begin(); it1 != this->custom_resources.end();
       ++it1) {
    auto it2 = other.custom_resources.find(it1->first);
    if (it2 == other.custom_resources.end()) {
      return false;
    }
    if (it1->second.total != it2->second.total) {
      return false;
    }
    if (it1->second.available != it2->second.available) {
      return false;
    }
  }
  return true;
}
ClusterResourceScheduler::ClusterResourceScheduler(
int64_t local_node_id, const NodeResources &local_node_resources)
: local_node_id_(local_node_id) {
@ -408,85 +476,12 @@ void ClusterResourceScheduler::DeleteResource(const std::string &client_id_strin
}
}
/// Compares two node resource descriptions.
///
/// The two are equal when the first PredefinedResources_MAX entries of
/// `capacities` match in both `total` and `available`, and when both hold
/// exactly the same custom resource IDs with matching `total` and `available`
/// values.
///
/// \param node_resources1 Left-hand side of the comparison.
/// \param node_resources2 Right-hand side of the comparison.
/// \return true if both arguments describe the same resources, false otherwise.
bool ClusterResourceScheduler::EqualNodeResources(const NodeResources &node_resources1,
                                                  const NodeResources &node_resources2) {
  // NOTE(review): assumes both `capacities` containers hold at least
  // PredefinedResources_MAX entries -- confirm against the code that fills them.
  for (size_t i = 0; i < PredefinedResources_MAX; i++) {
    if (node_resources1.capacities[i].total != node_resources2.capacities[i].total) {
      return false;
    }
    if (node_resources1.capacities[i].available !=
        node_resources2.capacities[i].available) {
      return false;
    }
  }

  // A different number of custom resources means the two differ. (This branch
  // previously returned true, reporting unequal resources as equal.)
  if (node_resources1.custom_resources.size() !=
      node_resources2.custom_resources.size()) {
    return false;
  }

  // The sizes match, so it is enough to check that every entry of the first
  // map has an identical counterpart in the second.
  for (auto it1 = node_resources1.custom_resources.begin();
       it1 != node_resources1.custom_resources.end(); ++it1) {
    auto it2 = node_resources2.custom_resources.find(it1->first);
    if (it2 == node_resources2.custom_resources.end()) {
      return false;
    }
    if (it1->second.total != it2->second.total) {
      return false;
    }
    if (it1->second.available != it2->second.available) {
      return false;
    }
  }
  return true;
}
/// Builds a human-readable, two-line dump of the given node resources.
///
/// The first line lists every entry of `capacities` as "(total:available)";
/// the second lists every entry of `custom_resources` as
/// "id:(total:available)". Each line is terminated by a newline.
///
/// \param node_resources The resources to describe.
/// \return The formatted description.
std::string ClusterResourceScheduler::NodeResourcesDebugString(
    const NodeResources &node_resources) {
  std::ostringstream out;

  out << " node predefined resources {";
  for (const auto &capacity : node_resources.capacities) {
    out << "(" << capacity.total << ":" << capacity.available << ") ";
  }
  out << "}" << '\n';

  out << " node custom resources {";
  for (const auto &custom : node_resources.custom_resources) {
    const auto &capacity = custom.second;
    out << custom.first << ":(" << capacity.total << ":" << capacity.available << ") ";
  }
  out << "}" << '\n';

  return out.str();
}
/// Builds a human-readable dump of the given task request.
///
/// The output starts with a newline, followed by one line listing every
/// predefined resource request as "(demand:soft)" and one line listing every
/// custom resource request as "id:(demand:soft)".
///
/// \param task_request The request to describe.
/// \return The formatted description.
std::string ClusterResourceScheduler::TaskRequestDebugString(
    const TaskRequest &task_request) {
  std::ostringstream out;

  out << '\n' << " request predefined resources {";
  for (const auto &request : task_request.predefined_resources) {
    out << "(" << request.demand << ":" << request.soft << ") ";
  }
  out << "}" << '\n';

  out << " request custom resources {";
  for (const auto &custom : task_request.custom_resources) {
    out << custom.id << ":"
        << "(" << custom.req.demand << ":" << custom.req.soft << ") ";
  }
  out << "}" << '\n';

  return out.str();
}
std::string ClusterResourceScheduler::DebugString(void) {
std::stringstream buffer;
buffer << std::endl << "local node id: " << local_node_id_ << std::endl;
for (auto it = nodes_.begin(); it != nodes_.end(); ++it) {
buffer << "node id: " << it->first << std::endl;
buffer << NodeResourcesDebugString(it->second);
buffer << it->second.DebugString();
}
return buffer.str();
}

View file

@ -43,6 +43,10 @@ struct NodeResources {
/// Map containing custom resources. The key of each entry represents the
/// custom resource ID.
absl::flat_hash_map<int64_t, ResourceCapacity> custom_resources;
/// Returns whether these node resources are equal to another NodeResources.
bool operator==(const NodeResources &other);
/// Returns human-readable string for these resources.
std::string DebugString();
};
struct TaskRequest {
@ -55,6 +59,8 @@ struct TaskRequest {
/// the task will run on a different node in the cluster, if none of the
/// nodes in this list can schedule this task.
absl::flat_hash_set<int64_t> placement_hints;
/// Returns human-readable string for this task request.
std::string DebugString();
};
/// Class encapsulating the cluster resources and the logic to assign
@ -84,9 +90,8 @@ class ClusterResourceScheduler {
const absl::flat_hash_map<int64_t, ResourceCapacity> &new_custom_resources,
absl::flat_hash_map<int64_t, ResourceCapacity> *old_custom_resources);
std::string TaskRequestDebugString(const TaskRequest &task_request);
std::string NodeResourcesDebugString(const NodeResources &node_resources);
std::string DebugString(void);
/// Returns human-readable string for this scheduler.
std::string DebugString();
public:
ClusterResourceScheduler(void){};