[core] Report resource load by shape (#9806)

* Report and aggregate resource load by shape

* python test

* python test

* x

* update
This commit is contained in:
Stephanie Wang 2020-07-31 16:57:30 -07:00 committed by GitHub
parent 3506910c5d
commit 37a9c5783c
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
10 changed files with 283 additions and 47 deletions

View file

@ -1,3 +1,4 @@
import json
import pytest
try:
import pytest_timeout
@ -6,6 +7,8 @@ except ImportError:
import time
import ray
import ray.ray_constants
import ray.test_utils
# TODO(rliaw): The proper way to do this is to have the pytest config setup.
@ -130,6 +133,88 @@ def test_global_state_actor_entry(ray_start_regular):
actor_id=b_actor_id)["State"] == ray.gcs_utils.ActorTableData.ALIVE
@pytest.mark.parametrize("max_shapes", [0, 2, -1])
def test_load_report(shutdown_only, max_shapes):
resource1 = "A"
resource2 = "B"
cluster = ray.init(
num_cpus=1,
resources={resource1: 1},
_internal_config=json.dumps({
"max_resource_shapes_per_load_report": max_shapes,
}))
redis = ray.services.create_redis_client(
cluster["redis_address"],
password=ray.ray_constants.REDIS_DEFAULT_PASSWORD)
client = redis.pubsub(ignore_subscribe_messages=True)
client.psubscribe(ray.gcs_utils.XRAY_HEARTBEAT_BATCH_PATTERN)
@ray.remote
def sleep():
time.sleep(1000)
sleep.remote()
for _ in range(3):
sleep.remote()
sleep.options(resources={resource1: 1}).remote()
sleep.options(resources={resource2: 1}).remote()
class Checker:
def __init__(self):
self.report = None
def check_load_report(self):
try:
message = client.get_message()
except redis.exceptions.ConnectionError:
pass
if message is None:
return False
pattern = message["pattern"]
data = message["data"]
if pattern != ray.gcs_utils.XRAY_HEARTBEAT_BATCH_PATTERN:
return False
pub_message = ray.gcs_utils.PubSubMessage.FromString(data)
heartbeat_data = pub_message.data
heartbeat = ray.gcs_utils.HeartbeatBatchTableData.FromString(
heartbeat_data)
self.report = heartbeat.resource_load_by_shape.resource_demands
if max_shapes == 0:
return True
elif max_shapes == 2:
return len(self.report) >= 2
else:
return len(self.report) >= 3
# Wait for load information to arrive.
checker = Checker()
ray.test_utils.wait_for_condition(checker.check_load_report)
# Check that we respect the max shapes limit.
if max_shapes != -1:
assert len(checker.report) <= max_shapes
if max_shapes > 0:
# Check that we always include the 1-CPU resource shape.
one_cpu_shape = {"CPU": 1}
one_cpu_found = False
for demand in checker.report:
if demand.shape == one_cpu_shape:
one_cpu_found = True
assert one_cpu_found
# Check that we differentiate between infeasible and ready tasks.
for demand in checker.report:
if resource2 in demand.shape:
assert demand.num_infeasible_requests_queued > 0
assert demand.num_ready_requests_queued == 0
else:
assert demand.num_ready_requests_queued > 0
assert demand.num_infeasible_requests_queued == 0
if __name__ == "__main__":
import pytest
import sys

View file

@ -333,3 +333,7 @@ RAY_CONFIG(int, metrics_agent_port, -1)
/// the owner has been granted a lease. A value >1 is used when we want to enable
/// pipelining task submission.
RAY_CONFIG(uint32_t, max_tasks_in_flight_per_worker, 1)
/// The maximum number of resource shapes included in the resource
/// load reported by each raylet.
RAY_CONFIG(int64_t, max_resource_shapes_per_load_report, 100)

View file

@ -21,6 +21,28 @@ SchedulingClassDescriptor &TaskSpecification::GetSchedulingClassDescriptor(
return it->second;
}
SchedulingClass TaskSpecification::GetSchedulingClass(const ResourceSet &sched_cls) {
SchedulingClass sched_cls_id;
absl::MutexLock lock(&mutex_);
auto it = sched_cls_to_id_.find(sched_cls);
if (it == sched_cls_to_id_.end()) {
sched_cls_id = ++next_sched_id_;
// TODO(ekl) we might want to try cleaning up task types in these cases
if (sched_cls_id > 100) {
RAY_LOG(WARNING) << "More than " << sched_cls_id
<< " types of tasks seen, this may reduce performance.";
} else if (sched_cls_id > 1000) {
RAY_LOG(ERROR) << "More than " << sched_cls_id
<< " types of tasks seen, this may reduce performance.";
}
sched_cls_to_id_[sched_cls] = sched_cls_id;
sched_id_to_cls_[sched_cls_id] = sched_cls;
} else {
sched_cls_id = it->second;
}
return sched_cls_id;
}
void TaskSpecification::ComputeResources() {
auto required_resources = MapFromProtobuf(message_->required_resources());
auto required_placement_resources =
@ -48,23 +70,7 @@ void TaskSpecification::ComputeResources() {
// Map the scheduling class descriptor to an integer for performance.
auto sched_cls = GetRequiredResources();
absl::MutexLock lock(&mutex_);
auto it = sched_cls_to_id_.find(sched_cls);
if (it == sched_cls_to_id_.end()) {
sched_cls_id_ = ++next_sched_id_;
// TODO(ekl) we might want to try cleaning up task types in these cases
if (sched_cls_id_ > 100) {
RAY_LOG(WARNING) << "More than " << sched_cls_id_
<< " types of tasks seen, this may reduce performance.";
} else if (sched_cls_id_ > 1000) {
RAY_LOG(ERROR) << "More than " << sched_cls_id_
<< " types of tasks seen, this may reduce performance.";
}
sched_cls_to_id_[sched_cls] = sched_cls_id_;
sched_id_to_cls_[sched_cls_id_] = sched_cls;
} else {
sched_cls_id_ = it->second;
}
sched_cls_id_ = GetSchedulingClass(sched_cls);
}
}

View file

@ -183,8 +183,12 @@ class TaskSpecification : public MessageWrapper<rpc::TaskSpec> {
// A one-word summary of the task func as a call site (e.g., __main__.foo).
std::string CallSiteString() const;
// Lookup the resource shape that corresponds to the static key.
static SchedulingClassDescriptor &GetSchedulingClassDescriptor(SchedulingClass id);
// Compute a static key that represents the given resource shape.
static SchedulingClass GetSchedulingClass(const ResourceSet &sched_cls);
private:
void ComputeResources();

View file

@ -89,8 +89,31 @@ void GcsNodeManager::NodeFailureDetector::DetectDeadNodes() {
void GcsNodeManager::NodeFailureDetector::SendBatchedHeartbeat() {
if (!heartbeat_buffer_.empty()) {
auto batch = std::make_shared<rpc::HeartbeatBatchTableData>();
for (const auto &heartbeat : heartbeat_buffer_) {
batch->add_batch()->CopyFrom(heartbeat.second);
std::unordered_map<ResourceSet, rpc::ResourceDemand> aggregate_load;
for (auto &heartbeat : heartbeat_buffer_) {
// Aggregate the load reported by each raylet.
auto load = heartbeat.second.resource_load_by_shape();
for (const auto &demand : load.resource_demands()) {
auto scheduling_key = ResourceSet(MapFromProtobuf(demand.shape()));
auto &aggregate_demand = aggregate_load[scheduling_key];
aggregate_demand.set_num_ready_requests_queued(
aggregate_demand.num_ready_requests_queued() +
demand.num_ready_requests_queued());
aggregate_demand.set_num_infeasible_requests_queued(
aggregate_demand.num_infeasible_requests_queued() +
demand.num_infeasible_requests_queued());
}
heartbeat.second.clear_resource_load_by_shape();
batch->add_batch()->Swap(&heartbeat.second);
}
for (auto &demand : aggregate_load) {
auto demand_proto = batch->mutable_resource_load_by_shape()->add_resource_demands();
demand_proto->Swap(&demand.second);
for (const auto &resource_pair : demand.first.GetResourceMap()) {
(*demand_proto->mutable_shape())[resource_pair.first] = resource_pair.second;
}
}
RAY_CHECK_OK(gcs_pub_sub_->Publish(HEARTBEAT_BATCH_CHANNEL, "",

View file

@ -253,6 +253,26 @@ message GcsNodeInfo {
string node_manager_hostname = 8;
}
// Represents the demand for a particular resource shape.
message ResourceDemand {
// The resource shape requested. This is a map from the resource string
// (e.g., "CPU") to the amount requested.
map<string, double> shape = 1;
// The number of requests that are ready to run (i.e., dependencies have been
// fulfilled), but that are waiting for resources.
uint64 num_ready_requests_queued = 2;
// The number of requests for which there is no node that is a superset of
// the requested resource shape.
uint64 num_infeasible_requests_queued = 3;
}
// Represents the demand sorted by resource shape.
message ResourceLoad {
// A list of all resource demands. The resource shape in each demand is
// unique.
repeated ResourceDemand resource_demands = 1;
}
message HeartbeatTableData {
// Node manager client id
bytes client_id = 1;
@ -262,14 +282,19 @@ message HeartbeatTableData {
map<string, double> resources_total = 3;
// Aggregate outstanding resource load on this node manager.
map<string, double> resource_load = 4;
// The resource load on this node, sorted by resource shape.
ResourceLoad resource_load_by_shape = 5;
// Object IDs that are in use by workers on this node manager's node.
repeated bytes active_object_id = 5;
repeated bytes active_object_id = 6;
// Whether this node manager is requesting global GC.
bool should_global_gc = 6;
bool should_global_gc = 7;
}
message HeartbeatBatchTableData {
repeated HeartbeatTableData batch = 1;
// The total resource demand on all nodes included in the batch, sorted by
// resource shape.
ResourceLoad resource_load_by_shape = 2;
}
// Data for a lease on task execution.

View file

@ -418,7 +418,7 @@ void NodeManager::Heartbeat() {
ResourceSet(local_resources.GetTotalResources()));
}
local_resources.SetLoadResources(local_queues_.GetResourceLoad());
local_resources.SetLoadResources(local_queues_.GetTotalResourceLoad());
if (!last_heartbeat_resources_.GetLoadResources().IsEqual(
local_resources.GetLoadResources())) {
for (const auto &resource_pair :
@ -447,7 +447,7 @@ void NodeManager::Heartbeat() {
last_heartbeat_resources_.SetTotalResources(
ResourceSet(local_resources.GetTotalResources()));
local_resources.SetLoadResources(local_queues_.GetResourceLoad());
local_resources.SetLoadResources(local_queues_.GetTotalResourceLoad());
for (const auto &resource_pair :
local_resources.GetLoadResources().GetResourceMap()) {
(*heartbeat_data->mutable_resource_load())[resource_pair.first] =
@ -457,6 +457,11 @@ void NodeManager::Heartbeat() {
ResourceSet(local_resources.GetLoadResources()));
}
// Add resource load by shape. This will be used by the new autoscaler.
auto resource_load = local_queues_.GetResourceLoadByShape(
RayConfig::instance().max_resource_shapes_per_load_report());
heartbeat_data->mutable_resource_load_by_shape()->Swap(&resource_load);
// Set the global gc bit on the outgoing heartbeat message.
if (should_global_gc_) {
heartbeat_data->set_should_global_gc(true);
@ -1367,7 +1372,7 @@ void NodeManager::HandleWorkerAvailable(const std::shared_ptr<WorkerInterface> &
ScheduleAndDispatch();
} else {
cluster_resource_map_[self_node_id_].SetLoadResources(
local_queues_.GetResourceLoad());
local_queues_.GetTotalResourceLoad());
// Call task dispatch to assign work to the new worker.
DispatchTasks(local_queues_.GetReadyTasksByClass());
}
@ -1990,7 +1995,7 @@ ResourceIdSet NodeManager::ScheduleBundle(
const BundleSpecification &bundle_spec) {
// If the resource map contains the local raylet, update load before calling policy.
if (resource_map.count(self_node_id_) > 0) {
resource_map[self_node_id_].SetLoadResources(local_queues_.GetResourceLoad());
resource_map[self_node_id_].SetLoadResources(local_queues_.GetTotalResourceLoad());
}
// Invoke the scheduling policy.
auto reserve_resource_success =
@ -2015,7 +2020,7 @@ void NodeManager::ScheduleTasks(
std::unordered_map<ClientID, SchedulingResources> &resource_map) {
// If the resource map contains the local raylet, update load before calling policy.
if (resource_map.count(self_node_id_) > 0) {
resource_map[self_node_id_].SetLoadResources(local_queues_.GetResourceLoad());
resource_map[self_node_id_].SetLoadResources(local_queues_.GetTotalResourceLoad());
}
// Invoke the scheduling policy.
auto policy_decision = scheduling_policy_.Schedule(resource_map, self_node_id_);

View file

@ -723,6 +723,9 @@ class NodeManager : public rpc::NodeManagerServiceHandler {
/// The time that we last sent a FreeObjects request to other nodes for
/// objects that have gone out of scope in the application.
uint64_t last_free_objects_at_ms_;
/// The number of heartbeats that we should wait before sending the
/// next load report.
uint8_t num_heartbeats_before_load_report_;
/// Initial node manager configuration.
const NodeManagerConfig initial_config_;
/// The resources (and specific resource IDs) that are currently available.

View file

@ -70,7 +70,8 @@ bool TaskQueue::AppendTask(const TaskID &task_id, const Task &task) {
auto list_iterator = task_list_.insert(task_list_.end(), task);
task_map_[task_id] = list_iterator;
// Resource bookkeeping
current_resource_load_.AddResources(task.GetTaskSpecification().GetRequiredResources());
total_resource_load_.AddResources(task.GetTaskSpecification().GetRequiredResources());
resource_load_by_shape_[task.GetTaskSpecification().GetSchedulingClass()]++;
return true;
}
@ -80,15 +81,20 @@ bool TaskQueue::RemoveTask(const TaskID &task_id, std::vector<Task> *removed_tas
return false;
}
auto list_iterator = task_found_iterator->second;
auto it = task_found_iterator->second;
// Resource bookkeeping
current_resource_load_.SubtractResourcesStrict(
list_iterator->GetTaskSpecification().GetRequiredResources());
total_resource_load_.SubtractResourcesStrict(
it->GetTaskSpecification().GetRequiredResources());
auto scheduling_class = it->GetTaskSpecification().GetSchedulingClass();
resource_load_by_shape_[scheduling_class]--;
if (resource_load_by_shape_[scheduling_class] == 0) {
resource_load_by_shape_.erase(scheduling_class);
}
if (removed_tasks) {
removed_tasks->push_back(std::move(*list_iterator));
removed_tasks->push_back(std::move(*it));
}
task_map_.erase(task_found_iterator);
task_list_.erase(list_iterator);
task_list_.erase(it);
return true;
}
@ -104,8 +110,13 @@ const Task &TaskQueue::GetTask(const TaskID &task_id) const {
return *it->second;
}
const ResourceSet &TaskQueue::GetCurrentResourceLoad() const {
return current_resource_load_;
const ResourceSet &TaskQueue::GetTotalResourceLoad() const {
return total_resource_load_;
}
const std::unordered_map<SchedulingClass, uint64_t> &TaskQueue::GetResourceLoadByShape()
const {
return resource_load_by_shape_;
}
bool ReadyQueue::AppendTask(const TaskID &task_id, const Task &task) {
@ -144,14 +155,71 @@ const Task &SchedulingQueue::GetTaskOfState(const TaskID &task_id,
return queue->GetTask(task_id);
}
ResourceSet SchedulingQueue::GetResourceLoad() const {
auto load = ready_queue_->GetCurrentResourceLoad();
ResourceSet SchedulingQueue::GetTotalResourceLoad() const {
auto load = ready_queue_->GetTotalResourceLoad();
// Also take into account infeasible tasks so they show up for autoscaling.
load.AddResources(
task_queues_[static_cast<int>(TaskState::INFEASIBLE)]->GetCurrentResourceLoad());
task_queues_[static_cast<int>(TaskState::INFEASIBLE)]->GetTotalResourceLoad());
return load;
}
rpc::ResourceLoad SchedulingQueue::GetResourceLoadByShape(int64_t max_shapes) const {
std::unordered_map<SchedulingClass, rpc::ResourceDemand> load;
auto infeasible_queue_load =
task_queues_[static_cast<int>(TaskState::INFEASIBLE)]->GetResourceLoadByShape();
auto ready_queue_load = ready_queue_->GetResourceLoadByShape();
size_t max_shapes_to_add = ready_queue_load.size() + infeasible_queue_load.size();
if (max_shapes >= 0) {
max_shapes_to_add = max_shapes;
}
// Always collect the 1-CPU resource shape stats, if the specified max shapes
// allows.
static const ResourceSet one_cpu_resource_set(
std::unordered_map<std::string, double>({{kCPU_ResourceLabel, 1}}));
static const SchedulingClass one_cpu_scheduling_cls(
TaskSpecification::GetSchedulingClass(one_cpu_resource_set));
if (max_shapes_to_add > 0) {
if (infeasible_queue_load.count(one_cpu_scheduling_cls) > 0) {
load[one_cpu_scheduling_cls].set_num_infeasible_requests_queued(
infeasible_queue_load.at(one_cpu_scheduling_cls));
}
if (ready_queue_load.count(one_cpu_scheduling_cls) > 0) {
load[one_cpu_scheduling_cls].set_num_ready_requests_queued(
ready_queue_load.at(one_cpu_scheduling_cls));
}
}
// Collect the infeasible queue's load.
auto infeasible_it = infeasible_queue_load.begin();
while (infeasible_it != infeasible_queue_load.end() &&
load.size() < max_shapes_to_add) {
load[infeasible_it->first].set_num_infeasible_requests_queued(infeasible_it->second);
infeasible_it++;
}
// Collect the ready queue's load.
auto ready_it = ready_queue_load.begin();
while (ready_it != ready_queue_load.end() && load.size() < max_shapes_to_add) {
load[ready_it->first].set_num_ready_requests_queued(ready_it->second);
ready_it++;
}
// Set the resource shapes.
rpc::ResourceLoad load_proto;
for (auto &demand : load) {
auto demand_proto = load_proto.add_resource_demands();
demand_proto->Swap(&demand.second);
const auto &resource_map =
TaskSpecification::GetSchedulingClassDescriptor(demand.first).GetResourceMap();
for (const auto &resource_pair : resource_map) {
(*demand_proto->mutable_shape())[resource_pair.first] = resource_pair.second;
}
}
return load_proto;
}
const std::unordered_set<TaskID> &SchedulingQueue::GetBlockedTaskIds() const {
return blocked_task_ids_;
}

View file

@ -23,6 +23,7 @@
#include "ray/common/task/task.h"
#include "ray/util/logging.h"
#include "ray/util/ordered_set.h"
#include "src/ray/protobuf/gcs.pb.h"
namespace ray {
@ -106,10 +107,16 @@ class TaskQueue {
/// \return The task.
const Task &GetTask(const TaskID &task_id) const;
/// \brief Get the total resources required by the tasks in the queue.
/// \brief Return all resource demand associated with the ready queue.
///
/// \return Total resources required by the tasks in the queue.
const ResourceSet &GetCurrentResourceLoad() const;
/// \return Aggregate resource demand from ready tasks.
const ResourceSet &GetTotalResourceLoad() const;
/// \brief Get the resources required by the tasks in the queue.
///
/// \return A map from resource shape key to the number of tasks queued that
/// require that shape.
const std::unordered_map<SchedulingClass, uint64_t> &GetResourceLoadByShape() const;
protected:
/// A list of tasks.
@ -117,7 +124,11 @@ class TaskQueue {
/// A hash to speed up looking up a task.
std::unordered_map<TaskID, std::list<Task>::iterator> task_map_;
/// Aggregate resources of all the tasks in this queue.
ResourceSet current_resource_load_;
ResourceSet total_resource_load_;
/// Required resources for all the tasks in this queue. This is a
/// map from resource shape key to number of tasks queued that require that
/// shape.
std::unordered_map<SchedulingClass, uint64_t> resource_load_by_shape_;
};
class ReadyQueue : public TaskQueue {
@ -210,7 +221,14 @@ class SchedulingQueue {
///
/// \return A resource set with aggregate resource information about resource load on
/// this raylet.
ResourceSet GetResourceLoad() const;
ResourceSet GetTotalResourceLoad() const;
/// \brief Return a summary of the requests in the ready and infeasible
/// queues.
///
/// \return A message summarizing the number of requests, sorted by shape, in
/// the ready and infeasible queues.
rpc::ResourceLoad GetResourceLoadByShape(int64_t max_shapes = -1) const;
/// Get the tasks in the blocked state.
///
@ -309,11 +327,6 @@ class SchedulingQueue {
/// \return All the tasks that have the given actor ID.
std::unordered_set<TaskID> GetTaskIdsForActor(const ActorID &actor_id) const;
/// \brief Return all resource demand associated with the ready queue.
///
/// \return Aggregate resource demand from ready tasks.
ResourceSet GetReadyQueueResources() const;
/// Returns the number of running tasks in this class.
///
/// \return int.