From 37a9c5783c925c8c804472bf2a36920a12c764c9 Mon Sep 17 00:00:00 2001 From: Stephanie Wang Date: Fri, 31 Jul 2020 16:57:30 -0700 Subject: [PATCH] [core] Report resource load by shape (#9806) * Report and aggregate resource load by shape * python test * python test * x * update --- python/ray/tests/test_global_state.py | 85 ++++++++++++++++++++ src/ray/common/ray_config_def.h | 4 + src/ray/common/task/task_spec.cc | 40 ++++++---- src/ray/common/task/task_spec.h | 4 + src/ray/gcs/gcs_server/gcs_node_manager.cc | 27 ++++++- src/ray/protobuf/gcs.proto | 29 ++++++- src/ray/raylet/node_manager.cc | 15 ++-- src/ray/raylet/node_manager.h | 3 + src/ray/raylet/scheduling_queue.cc | 90 +++++++++++++++++++--- src/ray/raylet/scheduling_queue.h | 33 +++++--- 10 files changed, 283 insertions(+), 47 deletions(-) diff --git a/python/ray/tests/test_global_state.py b/python/ray/tests/test_global_state.py index e691f99df..06152cca9 100644 --- a/python/ray/tests/test_global_state.py +++ b/python/ray/tests/test_global_state.py @@ -1,3 +1,4 @@ +import json import pytest try: import pytest_timeout @@ -6,6 +7,8 @@ except ImportError: import time import ray +import ray.ray_constants +import ray.test_utils # TODO(rliaw): The proper way to do this is to have the pytest config setup. @@ -130,6 +133,88 @@ def test_global_state_actor_entry(ray_start_regular): actor_id=b_actor_id)["State"] == ray.gcs_utils.ActorTableData.ALIVE +@pytest.mark.parametrize("max_shapes", [0, 2, -1]) +def test_load_report(shutdown_only, max_shapes): + resource1 = "A" + resource2 = "B" + cluster = ray.init( + num_cpus=1, + resources={resource1: 1}, + _internal_config=json.dumps({ + "max_resource_shapes_per_load_report": max_shapes, + })) + redis = ray.services.create_redis_client( + cluster["redis_address"], + password=ray.ray_constants.REDIS_DEFAULT_PASSWORD) + client = redis.pubsub(ignore_subscribe_messages=True) + client.psubscribe(ray.gcs_utils.XRAY_HEARTBEAT_BATCH_PATTERN) + + @ray.remote + def sleep(): + time.sleep(1000) + + sleep.remote() + for _ in range(3): + sleep.remote() + sleep.options(resources={resource1: 1}).remote() + sleep.options(resources={resource2: 1}).remote() + + class Checker: + def __init__(self): + self.report = None + + def check_load_report(self): + try: + message = client.get_message() + except redis.exceptions.ConnectionError: + pass + if message is None: + return False + + pattern = message["pattern"] + data = message["data"] + if pattern != ray.gcs_utils.XRAY_HEARTBEAT_BATCH_PATTERN: + return False + + pub_message = ray.gcs_utils.PubSubMessage.FromString(data) + heartbeat_data = pub_message.data + heartbeat = ray.gcs_utils.HeartbeatBatchTableData.FromString( + heartbeat_data) + self.report = heartbeat.resource_load_by_shape.resource_demands + if max_shapes == 0: + return True + elif max_shapes == 2: + return len(self.report) >= 2 + else: + return len(self.report) >= 3 + + # Wait for load information to arrive. + checker = Checker() + ray.test_utils.wait_for_condition(checker.check_load_report) + + # Check that we respect the max shapes limit. + if max_shapes != -1: + assert len(checker.report) <= max_shapes + + if max_shapes > 0: + # Check that we always include the 1-CPU resource shape. + one_cpu_shape = {"CPU": 1} + one_cpu_found = False + for demand in checker.report: + if demand.shape == one_cpu_shape: + one_cpu_found = True + assert one_cpu_found + + # Check that we differentiate between infeasible and ready tasks. + for demand in checker.report: + if resource2 in demand.shape: + assert demand.num_infeasible_requests_queued > 0 + assert demand.num_ready_requests_queued == 0 + else: + assert demand.num_ready_requests_queued > 0 + assert demand.num_infeasible_requests_queued == 0 + + if __name__ == "__main__": import pytest import sys diff --git a/src/ray/common/ray_config_def.h b/src/ray/common/ray_config_def.h index c298f43ec..dca49754e 100644 --- a/src/ray/common/ray_config_def.h +++ b/src/ray/common/ray_config_def.h @@ -333,3 +333,7 @@ RAY_CONFIG(int, metrics_agent_port, -1) /// the owner has been granted a lease. A value >1 is used when we want to enable /// pipelining task submission. RAY_CONFIG(uint32_t, max_tasks_in_flight_per_worker, 1) + +/// The maximum number of resource shapes included in the resource +/// load reported by each raylet. +RAY_CONFIG(int64_t, max_resource_shapes_per_load_report, 100) diff --git a/src/ray/common/task/task_spec.cc b/src/ray/common/task/task_spec.cc index 572ba58ec..e7de6cb3a 100644 --- a/src/ray/common/task/task_spec.cc +++ b/src/ray/common/task/task_spec.cc @@ -21,6 +21,28 @@ SchedulingClassDescriptor &TaskSpecification::GetSchedulingClassDescriptor( return it->second; } +SchedulingClass TaskSpecification::GetSchedulingClass(const ResourceSet &sched_cls) { + SchedulingClass sched_cls_id; + absl::MutexLock lock(&mutex_); + auto it = sched_cls_to_id_.find(sched_cls); + if (it == sched_cls_to_id_.end()) { + sched_cls_id = ++next_sched_id_; + // TODO(ekl) we might want to try cleaning up task types in these cases + if (sched_cls_id > 100) { + RAY_LOG(WARNING) << "More than " << sched_cls_id + << " types of tasks seen, this may reduce performance."; + } else if (sched_cls_id > 1000) { + RAY_LOG(ERROR) << "More than " << sched_cls_id + << " types of tasks seen, this may reduce performance."; + } + sched_cls_to_id_[sched_cls] = sched_cls_id; + sched_id_to_cls_[sched_cls_id] = sched_cls; + } else { + sched_cls_id = it->second; + } + return sched_cls_id; +} + void TaskSpecification::ComputeResources() { auto required_resources = MapFromProtobuf(message_->required_resources()); auto required_placement_resources = @@ -48,23 +70,7 @@ void TaskSpecification::ComputeResources() { // Map the scheduling class descriptor to an integer for performance. auto sched_cls = GetRequiredResources(); - absl::MutexLock lock(&mutex_); - auto it = sched_cls_to_id_.find(sched_cls); - if (it == sched_cls_to_id_.end()) { - sched_cls_id_ = ++next_sched_id_; - // TODO(ekl) we might want to try cleaning up task types in these cases - if (sched_cls_id_ > 100) { - RAY_LOG(WARNING) << "More than " << sched_cls_id_ - << " types of tasks seen, this may reduce performance."; - } else if (sched_cls_id_ > 1000) { - RAY_LOG(ERROR) << "More than " << sched_cls_id_ - << " types of tasks seen, this may reduce performance."; - } - sched_cls_to_id_[sched_cls] = sched_cls_id_; - sched_id_to_cls_[sched_cls_id_] = sched_cls; - } else { - sched_cls_id_ = it->second; - } + sched_cls_id_ = GetSchedulingClass(sched_cls); } } diff --git a/src/ray/common/task/task_spec.h b/src/ray/common/task/task_spec.h index ccd7db733..8e963f433 100644 --- a/src/ray/common/task/task_spec.h +++ b/src/ray/common/task/task_spec.h @@ -183,8 +183,12 @@ class TaskSpecification : public MessageWrapper { // A one-word summary of the task func as a call site (e.g., __main__.foo). std::string CallSiteString() const; + // Lookup the resource shape that corresponds to the static key. static SchedulingClassDescriptor &GetSchedulingClassDescriptor(SchedulingClass id); + // Compute a static key that represents the given resource shape. + static SchedulingClass GetSchedulingClass(const ResourceSet &sched_cls); + private: void ComputeResources(); diff --git a/src/ray/gcs/gcs_server/gcs_node_manager.cc b/src/ray/gcs/gcs_server/gcs_node_manager.cc index 29c3739b3..c6652cb1c 100644 --- a/src/ray/gcs/gcs_server/gcs_node_manager.cc +++ b/src/ray/gcs/gcs_server/gcs_node_manager.cc @@ -89,8 +89,31 @@ void GcsNodeManager::NodeFailureDetector::DetectDeadNodes() { void GcsNodeManager::NodeFailureDetector::SendBatchedHeartbeat() { if (!heartbeat_buffer_.empty()) { auto batch = std::make_shared(); - for (const auto &heartbeat : heartbeat_buffer_) { - batch->add_batch()->CopyFrom(heartbeat.second); + std::unordered_map aggregate_load; + for (auto &heartbeat : heartbeat_buffer_) { + // Aggregate the load reported by each raylet. + auto load = heartbeat.second.resource_load_by_shape(); + for (const auto &demand : load.resource_demands()) { + auto scheduling_key = ResourceSet(MapFromProtobuf(demand.shape())); + auto &aggregate_demand = aggregate_load[scheduling_key]; + aggregate_demand.set_num_ready_requests_queued( + aggregate_demand.num_ready_requests_queued() + + demand.num_ready_requests_queued()); + aggregate_demand.set_num_infeasible_requests_queued( + aggregate_demand.num_infeasible_requests_queued() + + demand.num_infeasible_requests_queued()); + } + heartbeat.second.clear_resource_load_by_shape(); + + batch->add_batch()->Swap(&heartbeat.second); + } + + for (auto &demand : aggregate_load) { + auto demand_proto = batch->mutable_resource_load_by_shape()->add_resource_demands(); + demand_proto->Swap(&demand.second); + for (const auto &resource_pair : demand.first.GetResourceMap()) { + (*demand_proto->mutable_shape())[resource_pair.first] = resource_pair.second; + } } RAY_CHECK_OK(gcs_pub_sub_->Publish(HEARTBEAT_BATCH_CHANNEL, "", diff --git a/src/ray/protobuf/gcs.proto b/src/ray/protobuf/gcs.proto index d96cd3dc0..af090c7bc 100644 --- a/src/ray/protobuf/gcs.proto +++ b/src/ray/protobuf/gcs.proto @@ -253,6 +253,26 @@ message GcsNodeInfo { string node_manager_hostname = 8; } +// Represents the demand for a particular resource shape. +message ResourceDemand { + // The resource shape requested. This is a map from the resource string + // (e.g., "CPU") to the amount requested. + map shape = 1; + // The number of requests that are ready to run (i.e., dependencies have been + // fulfilled), but that are waiting for resources. + uint64 num_ready_requests_queued = 2; + // The number of requests for which there is no node that is a superset of + // the requested resource shape. + uint64 num_infeasible_requests_queued = 3; +} + +// Represents the demand sorted by resource shape. +message ResourceLoad { + // A list of all resource demands. The resource shape in each demand is + // unique. + repeated ResourceDemand resource_demands = 1; +} + message HeartbeatTableData { // Node manager client id bytes client_id = 1; @@ -262,14 +282,19 @@ message HeartbeatTableData { map resources_total = 3; // Aggregate outstanding resource load on this node manager. map resource_load = 4; + // The resource load on this node, sorted by resource shape. + ResourceLoad resource_load_by_shape = 5; // Object IDs that are in use by workers on this node manager's node. - repeated bytes active_object_id = 5; + repeated bytes active_object_id = 6; // Whether this node manager is requesting global GC. - bool should_global_gc = 6; + bool should_global_gc = 7; } message HeartbeatBatchTableData { repeated HeartbeatTableData batch = 1; + // The total resource demand on all nodes included in the batch, sorted by + // resource shape. + ResourceLoad resource_load_by_shape = 2; } // Data for a lease on task execution. diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index 167d992c7..3ab693009 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -418,7 +418,7 @@ void NodeManager::Heartbeat() { ResourceSet(local_resources.GetTotalResources())); } - local_resources.SetLoadResources(local_queues_.GetResourceLoad()); + local_resources.SetLoadResources(local_queues_.GetTotalResourceLoad()); if (!last_heartbeat_resources_.GetLoadResources().IsEqual( local_resources.GetLoadResources())) { for (const auto &resource_pair : @@ -447,7 +447,7 @@ void NodeManager::Heartbeat() { last_heartbeat_resources_.SetTotalResources( ResourceSet(local_resources.GetTotalResources())); - local_resources.SetLoadResources(local_queues_.GetResourceLoad()); + local_resources.SetLoadResources(local_queues_.GetTotalResourceLoad()); for (const auto &resource_pair : local_resources.GetLoadResources().GetResourceMap()) { (*heartbeat_data->mutable_resource_load())[resource_pair.first] = @@ -457,6 +457,11 @@ void NodeManager::Heartbeat() { ResourceSet(local_resources.GetLoadResources())); } + // Add resource load by shape. This will be used by the new autoscaler. + auto resource_load = local_queues_.GetResourceLoadByShape( + RayConfig::instance().max_resource_shapes_per_load_report()); + heartbeat_data->mutable_resource_load_by_shape()->Swap(&resource_load); + // Set the global gc bit on the outgoing heartbeat message. if (should_global_gc_) { heartbeat_data->set_should_global_gc(true); @@ -1367,7 +1372,7 @@ void NodeManager::HandleWorkerAvailable(const std::shared_ptr & ScheduleAndDispatch(); } else { cluster_resource_map_[self_node_id_].SetLoadResources( - local_queues_.GetResourceLoad()); + local_queues_.GetTotalResourceLoad()); // Call task dispatch to assign work to the new worker. DispatchTasks(local_queues_.GetReadyTasksByClass()); } @@ -1990,7 +1995,7 @@ ResourceIdSet NodeManager::ScheduleBundle( const BundleSpecification &bundle_spec) { // If the resource map contains the local raylet, update load before calling policy. if (resource_map.count(self_node_id_) > 0) { - resource_map[self_node_id_].SetLoadResources(local_queues_.GetResourceLoad()); + resource_map[self_node_id_].SetLoadResources(local_queues_.GetTotalResourceLoad()); } // Invoke the scheduling policy. auto reserve_resource_success = @@ -2015,7 +2020,7 @@ void NodeManager::ScheduleTasks( std::unordered_map &resource_map) { // If the resource map contains the local raylet, update load before calling policy. if (resource_map.count(self_node_id_) > 0) { - resource_map[self_node_id_].SetLoadResources(local_queues_.GetResourceLoad()); + resource_map[self_node_id_].SetLoadResources(local_queues_.GetTotalResourceLoad()); } // Invoke the scheduling policy. auto policy_decision = scheduling_policy_.Schedule(resource_map, self_node_id_); diff --git a/src/ray/raylet/node_manager.h b/src/ray/raylet/node_manager.h index bb094f485..9cd556813 100644 --- a/src/ray/raylet/node_manager.h +++ b/src/ray/raylet/node_manager.h @@ -723,6 +723,9 @@ class NodeManager : public rpc::NodeManagerServiceHandler { /// The time that we last sent a FreeObjects request to other nodes for /// objects that have gone out of scope in the application. uint64_t last_free_objects_at_ms_; + /// The number of heartbeats that we should wait before sending the + /// next load report. + uint8_t num_heartbeats_before_load_report_; /// Initial node manager configuration. const NodeManagerConfig initial_config_; /// The resources (and specific resource IDs) that are currently available. diff --git a/src/ray/raylet/scheduling_queue.cc b/src/ray/raylet/scheduling_queue.cc index 77224e6b8..4b3137951 100644 --- a/src/ray/raylet/scheduling_queue.cc +++ b/src/ray/raylet/scheduling_queue.cc @@ -70,7 +70,8 @@ bool TaskQueue::AppendTask(const TaskID &task_id, const Task &task) { auto list_iterator = task_list_.insert(task_list_.end(), task); task_map_[task_id] = list_iterator; // Resource bookkeeping - current_resource_load_.AddResources(task.GetTaskSpecification().GetRequiredResources()); + total_resource_load_.AddResources(task.GetTaskSpecification().GetRequiredResources()); + resource_load_by_shape_[task.GetTaskSpecification().GetSchedulingClass()]++; return true; } @@ -80,15 +81,20 @@ bool TaskQueue::RemoveTask(const TaskID &task_id, std::vector *removed_tas return false; } - auto list_iterator = task_found_iterator->second; + auto it = task_found_iterator->second; // Resource bookkeeping - current_resource_load_.SubtractResourcesStrict( - list_iterator->GetTaskSpecification().GetRequiredResources()); + total_resource_load_.SubtractResourcesStrict( + it->GetTaskSpecification().GetRequiredResources()); + auto scheduling_class = it->GetTaskSpecification().GetSchedulingClass(); + resource_load_by_shape_[scheduling_class]--; + if (resource_load_by_shape_[scheduling_class] == 0) { + resource_load_by_shape_.erase(scheduling_class); + } if (removed_tasks) { - removed_tasks->push_back(std::move(*list_iterator)); + removed_tasks->push_back(std::move(*it)); } task_map_.erase(task_found_iterator); - task_list_.erase(list_iterator); + task_list_.erase(it); return true; } @@ -104,8 +110,13 @@ const Task &TaskQueue::GetTask(const TaskID &task_id) const { return *it->second; } -const ResourceSet &TaskQueue::GetCurrentResourceLoad() const { - return current_resource_load_; +const ResourceSet &TaskQueue::GetTotalResourceLoad() const { + return total_resource_load_; +} + +const std::unordered_map &TaskQueue::GetResourceLoadByShape() + const { + return resource_load_by_shape_; } bool ReadyQueue::AppendTask(const TaskID &task_id, const Task &task) { @@ -144,14 +155,71 @@ const Task &SchedulingQueue::GetTaskOfState(const TaskID &task_id, return queue->GetTask(task_id); } -ResourceSet SchedulingQueue::GetResourceLoad() const { - auto load = ready_queue_->GetCurrentResourceLoad(); +ResourceSet SchedulingQueue::GetTotalResourceLoad() const { + auto load = ready_queue_->GetTotalResourceLoad(); // Also take into account infeasible tasks so they show up for autoscaling. load.AddResources( - task_queues_[static_cast(TaskState::INFEASIBLE)]->GetCurrentResourceLoad()); + task_queues_[static_cast(TaskState::INFEASIBLE)]->GetTotalResourceLoad()); return load; } +rpc::ResourceLoad SchedulingQueue::GetResourceLoadByShape(int64_t max_shapes) const { + std::unordered_map load; + auto infeasible_queue_load = + task_queues_[static_cast(TaskState::INFEASIBLE)]->GetResourceLoadByShape(); + auto ready_queue_load = ready_queue_->GetResourceLoadByShape(); + size_t max_shapes_to_add = ready_queue_load.size() + infeasible_queue_load.size(); + if (max_shapes >= 0) { + max_shapes_to_add = max_shapes; + } + + // Always collect the 1-CPU resource shape stats, if the specified max shapes + // allows. + static const ResourceSet one_cpu_resource_set( + std::unordered_map({{kCPU_ResourceLabel, 1}})); + static const SchedulingClass one_cpu_scheduling_cls( + TaskSpecification::GetSchedulingClass(one_cpu_resource_set)); + if (max_shapes_to_add > 0) { + if (infeasible_queue_load.count(one_cpu_scheduling_cls) > 0) { + load[one_cpu_scheduling_cls].set_num_infeasible_requests_queued( + infeasible_queue_load.at(one_cpu_scheduling_cls)); + } + if (ready_queue_load.count(one_cpu_scheduling_cls) > 0) { + load[one_cpu_scheduling_cls].set_num_ready_requests_queued( + ready_queue_load.at(one_cpu_scheduling_cls)); + } + } + + // Collect the infeasible queue's load. + auto infeasible_it = infeasible_queue_load.begin(); + while (infeasible_it != infeasible_queue_load.end() && + load.size() < max_shapes_to_add) { + load[infeasible_it->first].set_num_infeasible_requests_queued(infeasible_it->second); + infeasible_it++; + } + + // Collect the ready queue's load. + auto ready_it = ready_queue_load.begin(); + while (ready_it != ready_queue_load.end() && load.size() < max_shapes_to_add) { + load[ready_it->first].set_num_ready_requests_queued(ready_it->second); + ready_it++; + } + + // Set the resource shapes. + rpc::ResourceLoad load_proto; + for (auto &demand : load) { + auto demand_proto = load_proto.add_resource_demands(); + demand_proto->Swap(&demand.second); + const auto &resource_map = + TaskSpecification::GetSchedulingClassDescriptor(demand.first).GetResourceMap(); + for (const auto &resource_pair : resource_map) { + (*demand_proto->mutable_shape())[resource_pair.first] = resource_pair.second; + } + } + + return load_proto; +} + const std::unordered_set &SchedulingQueue::GetBlockedTaskIds() const { return blocked_task_ids_; } diff --git a/src/ray/raylet/scheduling_queue.h b/src/ray/raylet/scheduling_queue.h index 029998827..2e32ecdbe 100644 --- a/src/ray/raylet/scheduling_queue.h +++ b/src/ray/raylet/scheduling_queue.h @@ -23,6 +23,7 @@ #include "ray/common/task/task.h" #include "ray/util/logging.h" #include "ray/util/ordered_set.h" +#include "src/ray/protobuf/gcs.pb.h" namespace ray { @@ -106,10 +107,16 @@ class TaskQueue { /// \return The task. const Task &GetTask(const TaskID &task_id) const; - /// \brief Get the total resources required by the tasks in the queue. + /// \brief Return all resource demand associated with the ready queue. /// - /// \return Total resources required by the tasks in the queue. - const ResourceSet &GetCurrentResourceLoad() const; + /// \return Aggregate resource demand from ready tasks. + const ResourceSet &GetTotalResourceLoad() const; + + /// \brief Get the resources required by the tasks in the queue. + /// + /// \return A map from resource shape key to the number of tasks queued that + /// require that shape. + const std::unordered_map &GetResourceLoadByShape() const; protected: /// A list of tasks. @@ -117,7 +124,11 @@ class TaskQueue { /// A hash to speed up looking up a task. std::unordered_map::iterator> task_map_; /// Aggregate resources of all the tasks in this queue. - ResourceSet current_resource_load_; + ResourceSet total_resource_load_; + /// Required resources for all the tasks in this queue. This is a + /// map from resource shape key to number of tasks queued that require that + /// shape. + std::unordered_map resource_load_by_shape_; }; class ReadyQueue : public TaskQueue { @@ -210,7 +221,14 @@ class SchedulingQueue { /// /// \return A resource set with aggregate resource information about resource load on /// this raylet. - ResourceSet GetResourceLoad() const; + ResourceSet GetTotalResourceLoad() const; + + /// \brief Return a summary of the requests in the ready and infeasible + /// queues. + /// + /// \return A message summarizing the number of requests, sorted by shape, in + /// the ready and infeasible queues. + rpc::ResourceLoad GetResourceLoadByShape(int64_t max_shapes = -1) const; /// Get the tasks in the blocked state. /// @@ -309,11 +327,6 @@ class SchedulingQueue { /// \return All the tasks that have the given actor ID. std::unordered_set GetTaskIdsForActor(const ActorID &actor_id) const; - /// \brief Return all resource demand associated with the ready queue. - /// - /// \return Aggregate resource demand from ready tasks. - ResourceSet GetReadyQueueResources() const; - /// Returns the number of running tasks in this class. /// /// \return int.