Refactor node manager to eliminate new_scheduler_enabled_ (#12936)

ZhuSenlin 2021-01-18 00:15:35 +08:00 committed by GitHub
parent 2cd51ce608
commit a4ebdbd7da
14 changed files with 1433 additions and 759 deletions

src/ray/raylet/node_manager.cc

@@ -141,13 +141,7 @@ NodeManager::NodeManager(boost::asio::io_service &io_service, const NodeID &self
           config.max_worker_port, config.worker_ports, gcs_client_,
           config.worker_commands, config.raylet_config,
           /*starting_worker_timeout_callback=*/
-          [this]() {
-            if (RayConfig::instance().new_scheduler_enabled()) {
-              ScheduleAndDispatch();
-            } else {
-              this->DispatchTasks(this->local_queues_.GetReadyTasksByClass());
-            }
-          }),
+          [this] { cluster_task_manager_->ScheduleAndDispatchTasks(); }),
       scheduling_policy_(local_queues_),
       reconstruction_policy_(
           io_service_,
@@ -177,7 +171,6 @@ NodeManager::NodeManager(boost::asio::io_service &io_service, const NodeID &self
             /*local_only=*/false);
           },
           is_plasma_object_spillable),
-      new_scheduler_enabled_(RayConfig::instance().new_scheduler_enabled()),
       report_worker_backlog_(RayConfig::instance().report_worker_backlog()),
       last_local_gc_ns_(absl::GetCurrentTimeNanos()),
       local_gc_interval_ns_(RayConfig::instance().local_gc_interval_s() * 1e9),
@@ -197,9 +190,9 @@ NodeManager::NodeManager(boost::asio::io_service &io_service, const NodeID &self
   RAY_CHECK_OK(object_manager_.SubscribeObjDeleted(
       [this](const ObjectID &object_id) { HandleObjectMissing(object_id); }));
 
-  if (new_scheduler_enabled_) {
+  if (RayConfig::instance().new_scheduler_enabled()) {
     SchedulingResources &local_resources = cluster_resource_map_[self_node_id_];
-    new_resource_scheduler_ =
+    cluster_resource_scheduler_ =
         std::shared_ptr<ClusterResourceScheduler>(new ClusterResourceScheduler(
             self_node_id_.Binary(),
             local_resources.GetTotalResources().GetResourceMap()));
@@ -216,11 +209,23 @@ NodeManager::NodeManager(boost::asio::io_service &io_service, const NodeID &self
       PublishInfeasibleTaskError(task);
     };
     cluster_task_manager_ = std::shared_ptr<ClusterTaskManager>(new ClusterTaskManager(
-        self_node_id_, new_resource_scheduler_, dependency_manager_, is_owner_alive,
-        get_node_info_func, announce_infeasible_task));
+        self_node_id_,
+        std::dynamic_pointer_cast<ClusterResourceScheduler>(cluster_resource_scheduler_),
+        dependency_manager_, is_owner_alive, get_node_info_func, announce_infeasible_task,
+        worker_pool_, leased_workers_));
     placement_group_resource_manager_ =
-        std::make_shared<NewPlacementGroupResourceManager>(new_resource_scheduler_);
+        std::make_shared<NewPlacementGroupResourceManager>(
+            std::dynamic_pointer_cast<ClusterResourceScheduler>(
+                cluster_resource_scheduler_));
   } else {
+    cluster_resource_scheduler_ = std::make_shared<OldClusterResourceScheduler>(
+        self_node_id_, local_available_resources_, cluster_resource_map_,
+        gcs_client_->NodeResources().GetLastResourceUsage());
+    cluster_task_manager_ = std::shared_ptr<NodeManager>(this, [](NodeManager *) {
+      // If new scheduler is not enabled, the `cluster_task_manager_` is this NodeManager
+      // itself. So we create a `shared_ptr` that points to `this` and the deleter does
+      // nothing.
+    });
     placement_group_resource_manager_ =
         std::make_shared<OldPlacementGroupResourceManager>(
             local_available_resources_, cluster_resource_map_, self_node_id_);
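The `} else {` branch above points `cluster_task_manager_` at the NodeManager itself through a `shared_ptr` whose deleter does nothing, so the legacy path can reuse the same member without transferring ownership. A minimal standalone sketch of that non-owning `shared_ptr` idiom, using hypothetical `TaskManagerInterface`/`LegacyNodeManager` names rather than anything from this commit:

#include <memory>

struct TaskManagerInterface {
  virtual ~TaskManagerInterface() = default;
  virtual void ScheduleAndDispatchTasks() = 0;
};

// Plays the role NodeManager plays above: it implements the interface itself,
// and its lifetime is managed elsewhere, so the handle must never delete it.
struct LegacyNodeManager : TaskManagerInterface {
  void ScheduleAndDispatchTasks() override {}

  std::shared_ptr<TaskManagerInterface> Handle() {
    // No-op deleter: the shared_ptr aliases `this` without owning it.
    return std::shared_ptr<TaskManagerInterface>(this, [](TaskManagerInterface *) {});
  }
};

int main() {
  LegacyNodeManager manager;
  auto task_manager = manager.Handle();
  task_manager->ScheduleAndDispatchTasks();  // dispatches via the object itself
}  // destroying `task_manager` does not delete `manager`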
@@ -362,11 +367,7 @@ void NodeManager::HandleJobStarted(const JobID &job_id, const JobTableData &job_
   // Tasks of this job may already arrived but failed to pop a worker because the job
   // config is not local yet. So we trigger dispatching again here to try to
   // reschedule these tasks.
-  if (new_scheduler_enabled_) {
-    ScheduleAndDispatch();
-  } else {
-    DispatchTasks(local_queues_.GetReadyTasksByClass());
-  }
+  cluster_task_manager_->ScheduleAndDispatchTasks();
 }
 
 void NodeManager::HandleJobFinished(const JobID &job_id, const JobTableData &job_data) {
@@ -439,65 +440,13 @@ void NodeManager::Heartbeat() {
 void NodeManager::ReportResourceUsage() {
   auto resources_data = std::make_shared<rpc::ResourcesData>();
-  SchedulingResources &local_resources = cluster_resource_map_[self_node_id_];
   resources_data->set_node_id(self_node_id_.Binary());
-
-  if (new_scheduler_enabled_) {
-    // Update local chche from gcs remote cache, this is needed when gcs restart.
-    // We should always keep the cache view consistent.
-    new_resource_scheduler_->UpdateLastResourceUsage(
-        gcs_client_->NodeResources().GetLastResourceUsage());
-    new_resource_scheduler_->FillResourceUsage(resources_data);
-    cluster_task_manager_->FillResourceUsage(resources_data);
-  } else {
-    // TODO(atumanov): modify the heartbeat table protocol to use the ResourceSet
-    // directly.
-    // TODO(atumanov): implement a ResourceSet const_iterator.
-    // We only set fileds that changed.
-    auto last_heartbeat_resources = gcs_client_->NodeResources().GetLastResourceUsage();
-    if (!last_heartbeat_resources->GetTotalResources().IsEqual(
-            local_resources.GetTotalResources())) {
-      for (const auto &resource_pair :
-           local_resources.GetTotalResources().GetResourceMap()) {
-        (*resources_data->mutable_resources_total())[resource_pair.first] =
-            resource_pair.second;
-      }
-      last_heartbeat_resources->SetTotalResources(
-          ResourceSet(local_resources.GetTotalResources()));
-    }
-
-    if (!last_heartbeat_resources->GetAvailableResources().IsEqual(
-            local_resources.GetAvailableResources())) {
-      resources_data->set_resources_available_changed(true);
-      for (const auto &resource_pair :
-           local_resources.GetAvailableResources().GetResourceMap()) {
-        (*resources_data->mutable_resources_available())[resource_pair.first] =
-            resource_pair.second;
-      }
-      last_heartbeat_resources->SetAvailableResources(
-          ResourceSet(local_resources.GetAvailableResources()));
-    }
-
-    local_resources.SetLoadResources(local_queues_.GetTotalResourceLoad());
-    if (!last_heartbeat_resources->GetLoadResources().IsEqual(
-            local_resources.GetLoadResources())) {
-      resources_data->set_resource_load_changed(true);
-      for (const auto &resource_pair :
-           local_resources.GetLoadResources().GetResourceMap()) {
-        (*resources_data->mutable_resource_load())[resource_pair.first] =
-            resource_pair.second;
-      }
-      last_heartbeat_resources->SetLoadResources(
-          ResourceSet(local_resources.GetLoadResources()));
-    }
-  }
-
-  if (!new_scheduler_enabled_) {
-    // Add resource load by shape. This will be used by the new autoscaler.
-    auto resource_load = local_queues_.GetResourceLoadByShape(
-        RayConfig::instance().max_resource_shapes_per_load_report());
-    resources_data->mutable_resource_load_by_shape()->Swap(&resource_load);
-  }
+  // Update local chche from gcs remote cache, this is needed when gcs restart.
+  // We should always keep the cache view consistent.
+  cluster_resource_scheduler_->UpdateLastResourceUsage(
+      gcs_client_->NodeResources().GetLastResourceUsage());
+  cluster_resource_scheduler_->FillResourceUsage(resources_data);
+  cluster_task_manager_->FillResourceUsage(resources_data);
 
   // Set the global gc bit on the outgoing heartbeat message.
   if (should_global_gc_) {
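The deleted `else` branch above implemented a light-weight report by filling each resource field only when its value changed since the last heartbeat; with this commit that bookkeeping moves behind `FillResourceUsage`. A toy sketch of the same change-detection idea, using made-up names rather than Ray's `ResourceSet`/`ResourcesData` types:

#include <iostream>
#include <map>
#include <string>

using ResourceMap = std::map<std::string, double>;

// Copy `current` into `out` only if it differs from the cached `last` value,
// then refresh the cache -- one heartbeat field handled the way the removed
// branch handled resources_total / resources_available / resource_load.
bool FillIfChanged(const ResourceMap &current, ResourceMap *last, ResourceMap *out) {
  if (current == *last) {
    return false;  // unchanged, leave the field empty in the report
  }
  *out = current;
  *last = current;
  return true;
}

int main() {
  ResourceMap last_reported = {{"CPU", 4.0}};
  ResourceMap report;
  bool changed = FillIfChanged({{"CPU", 3.0}}, &last_reported, &report);
  std::cout << "changed=" << changed << " CPU=" << report["CPU"] << "\n";
}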
@@ -627,32 +576,14 @@ void NodeManager::WarnResourceDeadlock() {
     }
   }
 
-  if (new_scheduler_enabled_) {
-    // Check if any tasks are blocked on resource acquisition.
-    if (!cluster_task_manager_->AnyPendingTasks(
-            &exemplar, &any_pending, &pending_actor_creations, &pending_tasks)) {
-      // No pending tasks, no need to warn.
-      resource_deadlock_warned_ = 0;
-      return;
-    }
-    available_resources = new_resource_scheduler_->GetLocalResourceViewString();
-  } else {
-    // See if any tasks are blocked trying to acquire resources.
-    for (const auto &task : local_queues_.GetTasks(TaskState::READY)) {
-      const TaskSpecification &spec = task.GetTaskSpecification();
-      if (spec.IsActorCreationTask()) {
-        pending_actor_creations += 1;
-      } else {
-        pending_tasks += 1;
-      }
-      if (!any_pending) {
-        exemplar = task;
-        any_pending = true;
-      }
-    }
-    SchedulingResources &local_resources = cluster_resource_map_[self_node_id_];
-    available_resources = local_resources.GetAvailableResources().ToString();
+  // Check if any tasks are blocked on resource acquisition.
+  if (!cluster_task_manager_->AnyPendingTasks(&exemplar, &any_pending,
+                                              &pending_actor_creations, &pending_tasks)) {
+    // No pending tasks, no need to warn.
+    resource_deadlock_warned_ = 0;
+    return;
   }
+  available_resources = cluster_resource_scheduler_->GetLocalResourceViewString();
 
   // Push an warning to the driver that a task is blocked trying to acquire resources.
   // To avoid spurious triggers, only take action starting with the second time.
@@ -767,21 +698,12 @@ void NodeManager::NodeRemoved(const GcsNodeInfo &node_info) {
   // not be necessary.
 
   // Remove the node from the resource map.
-  if (0 == cluster_resource_map_.erase(node_id)) {
+  if (!cluster_resource_scheduler_->RemoveNode(node_id.Binary())) {
     RAY_LOG(DEBUG) << "Received NodeRemoved callback for an unknown node: " << node_id
                    << ".";
     return;
   }
 
-  // Remove the node from the resource map.
-  if (new_scheduler_enabled_) {
-    if (!new_resource_scheduler_->RemoveNode(node_id.Binary())) {
-      RAY_LOG(DEBUG) << "Received NodeRemoved callback for an unknown node: " << node_id
-                     << ".";
-      return;
-    }
-  }
-
   // Remove the node manager address.
   const auto node_entry = remote_node_manager_addresses_.find(node_id);
   if (node_entry != remote_node_manager_addresses_.end()) {
@@ -849,30 +771,19 @@ void NodeManager::ResourceCreateUpdated(const NodeID &node_id,
                  << " with created or updated resources: "
                  << createUpdatedResources.ToString() << ". Updating resource map.";
-  SchedulingResources &cluster_schedres = cluster_resource_map_[node_id];
 
   // Update local_available_resources_ and SchedulingResources
   for (const auto &resource_pair : createUpdatedResources.GetResourceMap()) {
     const std::string &resource_label = resource_pair.first;
     const double &new_resource_capacity = resource_pair.second;
-
-    cluster_schedres.UpdateResourceCapacity(resource_label, new_resource_capacity);
-    if (node_id == self_node_id_) {
-      local_available_resources_.AddOrUpdateResource(resource_label,
-                                                     new_resource_capacity);
-    }
-    if (new_scheduler_enabled_) {
-      new_resource_scheduler_->UpdateResourceCapacity(node_id.Binary(), resource_label,
-                                                      new_resource_capacity);
-    }
+    cluster_resource_scheduler_->UpdateResourceCapacity(node_id.Binary(), resource_label,
+                                                        new_resource_capacity);
   }
   RAY_LOG(DEBUG) << "[ResourceCreateUpdated] Updated cluster_resource_map.";
 
   if (node_id == self_node_id_) {
     // The resource update is on the local node, check if we can reschedule tasks.
-    TryLocalInfeasibleTaskScheduling();
+    cluster_task_manager_->ScheduleInfeasibleTasks();
   }
-  return;
 }
 
 void NodeManager::ResourceDeleted(const NodeID &node_id,
@@ -887,17 +798,9 @@ void NodeManager::ResourceDeleted(const NodeID &node_id,
                    << ". Updating resource map.";
   }
 
-  SchedulingResources &cluster_schedres = cluster_resource_map_[node_id];
-
   // Update local_available_resources_ and SchedulingResources
   for (const auto &resource_label : resource_names) {
-    cluster_schedres.DeleteResource(resource_label);
-    if (node_id == self_node_id_) {
-      local_available_resources_.DeleteResource(resource_label);
-    }
-    if (new_scheduler_enabled_) {
-      new_resource_scheduler_->DeleteResource(node_id.Binary(), resource_label);
-    }
+    cluster_resource_scheduler_->DeleteResource(node_id.Binary(), resource_label);
   }
   return;
 }
@@ -925,71 +828,21 @@ void NodeManager::TryLocalInfeasibleTaskScheduling() {
 void NodeManager::UpdateResourceUsage(const NodeID &node_id,
                                       const rpc::ResourcesData &resource_data) {
-  // Locate the node id in remote node table and update available resources based on
-  // the received resource usage information.
-  auto it = cluster_resource_map_.find(node_id);
-  if (it == cluster_resource_map_.end()) {
-    // Haven't received the node registration for this node yet, skip this message.
+  if (!cluster_resource_scheduler_->UpdateNode(node_id.Binary(), resource_data)) {
     RAY_LOG(INFO)
         << "[UpdateResourceUsage]: received resource usage from unknown node id "
        << node_id;
     return;
   }
 
   // Trigger local GC at the next heartbeat interval.
   if (resource_data.should_global_gc()) {
     should_local_gc_ = true;
   }
 
-  SchedulingResources &remote_resources = it->second;
-
-  // We update remote resources only when related
-  // resources map in message changed.
-  if (resource_data.resources_total_size() > 0) {
-    ResourceSet remote_total(MapFromProtobuf(resource_data.resources_total()));
-    remote_resources.SetTotalResources(std::move(remote_total));
-  }
-  if (resource_data.resources_available_changed()) {
-    ResourceSet remote_available(MapFromProtobuf(resource_data.resources_available()));
-    remote_resources.SetAvailableResources(std::move(remote_available));
-  }
-  if (resource_data.resource_load_changed()) {
-    ResourceSet remote_load(MapFromProtobuf(resource_data.resource_load()));
-    // Extract the load information and save it locally.
-    remote_resources.SetLoadResources(std::move(remote_load));
-  }
-
-  if (new_scheduler_enabled_ && node_id != self_node_id_) {
-    new_resource_scheduler_->AddOrUpdateNode(
-        node_id.Binary(), remote_resources.GetTotalResources().GetResourceMap(),
-        remote_resources.GetAvailableResources().GetResourceMap());
-    // TODO(swang): We could probably call this once per batch instead of once
-    // per node in the batch.
-    ScheduleAndDispatch();
-    return;
-  }
-
-  // Extract decision for this raylet.
-  auto decision = scheduling_policy_.SpillOver(remote_resources,
-                                               cluster_resource_map_[self_node_id_]);
-  std::unordered_set<TaskID> local_task_ids;
-  for (const auto &task_id : decision) {
-    // (See design_docs/task_states.rst for the state transition diagram.)
-    Task task;
-    TaskState state;
-    if (!local_queues_.RemoveTask(task_id, &task, &state)) {
-      return;
-    }
-    // Since we are spilling back from the ready and waiting queues, we need
-    // to unsubscribe the dependencies.
-    if (state != TaskState::INFEASIBLE) {
-      // Don't unsubscribe for infeasible tasks because we never subscribed in
-      // the first place.
-      dependency_manager_.RemoveTaskDependencies(task_id);
-    }
-    // Attempt to forward the task. If this fails to forward the task,
-    // the task will be resubmit locally.
-    ForwardTaskOrResubmit(task, node_id);
-  }
+  // If light resource usage report enabled, we update remote resources only when related
+  // resources map in heartbeat is not empty.
+  cluster_task_manager_->OnNodeResourceUsageUpdated(node_id, resource_data);
 }
 
 void NodeManager::ResourceUsageBatchReceived(
@@ -1012,8 +865,8 @@ void NodeManager::ProcessNewClient(ClientConnection &client) {
 
 // A helper function to create a mapping from task scheduling class to
 // tasks with that class from a given list of tasks.
-std::unordered_map<SchedulingClass, ordered_set<TaskID>> MakeTasksByClass(
-    const std::vector<Task> &tasks) {
+std::unordered_map<SchedulingClass, ordered_set<TaskID>> NodeManager::MakeTasksByClass(
+    const std::vector<Task> &tasks) const {
   std::unordered_map<SchedulingClass, ordered_set<TaskID>> result;
   for (const auto &task : tasks) {
     auto spec = task.GetTaskSpecification();
@@ -1028,7 +881,6 @@ void NodeManager::DispatchTasks(
   // one class of tasks become stuck behind others in the queue, causing Ray to start
   // many workers. See #3644 for a more detailed description of this issue.
   std::vector<const std::pair<const SchedulingClass, ordered_set<TaskID>> *> fair_order;
-  RAY_CHECK(new_scheduler_enabled_ == false);
   for (auto &it : tasks_by_class) {
     fair_order.emplace_back(&it);
   }
@@ -1231,7 +1083,7 @@ void NodeManager::ProcessRegisterClientRequestMessage(
       // If the worker failed to register to Raylet, trigger task dispatching here to
       // allow new worker processes to be started (if capped by
       // maximum_startup_concurrency).
-      DispatchTasks(local_queues_.GetReadyTasksByClass());
+      cluster_task_manager_->ScheduleAndDispatchTasks();
     }
   } else {
     // Register the new driver.
@@ -1306,14 +1158,10 @@ void NodeManager::HandleWorkerAvailable(const std::shared_ptr<WorkerInterface> &
   }
 
   // Local resource availability changed: invoke scheduling policy for local node.
-  if (new_scheduler_enabled_) {
-    ScheduleAndDispatch();
-  } else {
-    cluster_resource_map_[self_node_id_].SetLoadResources(
-        local_queues_.GetTotalResourceLoad());
-    // Call task dispatch to assign work to the new worker.
-    DispatchTasks(local_queues_.GetReadyTasksByClass());
-  }
+  cluster_resource_map_[self_node_id_].SetLoadResources(
+      local_queues_.GetTotalResourceLoad());
+  cluster_task_manager_->ScheduleAndDispatchTasks();
 }
 
 void NodeManager::ProcessDisconnectClientMessage(
@@ -1396,32 +1244,10 @@ void NodeManager::ProcessDisconnectClientMessage(
     worker_pool_.DisconnectWorker(worker);
 
     // Return the resources that were being used by this worker.
-    if (new_scheduler_enabled_) {
-      new_resource_scheduler_->FreeLocalTaskResources(worker->GetAllocatedInstances());
-      worker->ClearAllocatedInstances();
-      new_resource_scheduler_->FreeLocalTaskResources(
-          worker->GetLifetimeAllocatedInstances());
-      worker->ClearLifetimeAllocatedInstances();
-    } else {
-      auto const &task_resources = worker->GetTaskResourceIds();
-      local_available_resources_.ReleaseConstrained(
-          task_resources, cluster_resource_map_[self_node_id_].GetTotalResources());
-      cluster_resource_map_[self_node_id_].Release(task_resources.ToResourceSet());
-      worker->ResetTaskResourceIds();
-      auto const &lifetime_resources = worker->GetLifetimeResourceIds();
-      local_available_resources_.ReleaseConstrained(
-          lifetime_resources, cluster_resource_map_[self_node_id_].GetTotalResources());
-      cluster_resource_map_[self_node_id_].Release(lifetime_resources.ToResourceSet());
-      worker->ResetLifetimeResourceIds();
-    }
+    cluster_task_manager_->ReleaseWorkerResources(worker);
 
     // Since some resources may have been released, we can try to dispatch more tasks. YYY
-    if (new_scheduler_enabled_) {
-      ScheduleAndDispatch();
-    } else {
-      DispatchTasks(local_queues_.GetReadyTasksByClass());
-    }
+    cluster_task_manager_->ScheduleAndDispatchTasks();
   } else if (is_driver) {
     // The client is a driver.
     const auto job_id = worker->GetAssignedJobId();
@@ -1596,12 +1422,6 @@ void NodeManager::ProcessSubmitTaskMessage(const uint8_t *message_data) {
   SubmitTask(Task(task_message));
 }
 
-void NodeManager::ScheduleAndDispatch() {
-  RAY_CHECK(new_scheduler_enabled_);
-  cluster_task_manager_->SchedulePendingTasks();
-  cluster_task_manager_->DispatchScheduledTasksToWorkers(worker_pool_, leased_workers_);
-}
-
 void NodeManager::HandleRequestWorkerLease(const rpc::RequestWorkerLeaseRequest &request,
                                            rpc::RequestWorkerLeaseReply *reply,
                                            rpc::SendReplyCallback send_reply_callback) {
@@ -1632,77 +1452,7 @@ void NodeManager::HandleRequestWorkerLease(const rpc::RequestWorkerLeaseRequest
     worker_pool_.PrestartWorkers(task_spec, request.backlog_size());
   }
 
-  if (new_scheduler_enabled_) {
-    auto task_spec = task.GetTaskSpecification();
-    cluster_task_manager_->QueueTask(task, reply, [send_reply_callback]() {
-      send_reply_callback(Status::OK(), nullptr, nullptr);
-    });
-    ScheduleAndDispatch();
-    return;
-  }
-
-  // Override the task dispatch to call back to the client instead of executing the
-  // task directly on the worker.
-  TaskID task_id = task.GetTaskSpecification().TaskId();
-  rpc::Address owner_address = task.GetTaskSpecification().CallerAddress();
-  task.OnDispatchInstead(
-      [this, owner_address, reply, send_reply_callback](
-          const std::shared_ptr<void> granted, const std::string &address, int port,
-          const WorkerID &worker_id, const ResourceIdSet &resource_ids) {
-        auto worker = std::static_pointer_cast<Worker>(granted);
-        uint32_t worker_pid = static_cast<uint32_t>(worker->GetProcess().GetId());
-        reply->mutable_worker_address()->set_ip_address(address);
-        reply->mutable_worker_address()->set_port(port);
-        reply->mutable_worker_address()->set_worker_id(worker_id.Binary());
-        reply->mutable_worker_address()->set_raylet_id(self_node_id_.Binary());
-        reply->set_worker_pid(worker_pid);
-        for (const auto &mapping : resource_ids.AvailableResources()) {
-          auto resource = reply->add_resource_mapping();
-          resource->set_name(mapping.first);
-          for (const auto &id : mapping.second.WholeIds()) {
-            auto rid = resource->add_resource_ids();
-            rid->set_index(id);
-            rid->set_quantity(1.0);
-          }
-          for (const auto &id : mapping.second.FractionalIds()) {
-            auto rid = resource->add_resource_ids();
-            rid->set_index(id.first);
-            rid->set_quantity(id.second.ToDouble());
-          }
-        }
-        auto reply_failure_handler = [this, worker_id]() {
-          RAY_LOG(WARNING)
-              << "Failed to reply to GCS server, because it might have restarted. GCS "
-                 "cannot obtain the information of the leased worker, so we need to "
-                 "release the leased worker to avoid leakage.";
-          leased_workers_.erase(worker_id);
-          metrics_num_task_executed_ -= 1;
-        };
-        metrics_num_task_executed_ += 1;
-        send_reply_callback(Status::OK(), nullptr, reply_failure_handler);
-        RAY_CHECK(leased_workers_.find(worker_id) == leased_workers_.end())
-            << "Worker is already leased out " << worker_id;
-        leased_workers_[worker_id] = worker;
-      });
-  task.OnSpillbackInstead(
-      [this, reply, task_id, send_reply_callback](const NodeID &spillback_to,
-                                                  const std::string &address, int port) {
-        RAY_LOG(DEBUG) << "Worker lease request SPILLBACK " << task_id;
-        reply->mutable_retry_at_raylet_address()->set_ip_address(address);
-        reply->mutable_retry_at_raylet_address()->set_port(port);
-        reply->mutable_retry_at_raylet_address()->set_raylet_id(spillback_to.Binary());
-        metrics_num_task_spilled_back_ += 1;
-        send_reply_callback(Status::OK(), nullptr, nullptr);
-      });
-  task.OnCancellationInstead([reply, task_id, send_reply_callback]() {
-    RAY_LOG(DEBUG) << "Task lease request canceled " << task_id;
-    reply->set_canceled(true);
-    send_reply_callback(Status::OK(), nullptr, nullptr);
-  });
-  SubmitTask(task);
+  cluster_task_manager_->QueueAndScheduleTask(task, reply, send_reply_callback);
 }
 
 void NodeManager::HandlePrepareBundleResources(
@@ -1715,12 +1465,6 @@ void NodeManager::HandlePrepareBundleResources(
   auto prepared = placement_group_resource_manager_->PrepareBundle(bundle_spec);
   reply->set_success(prepared);
   send_reply_callback(Status::OK(), nullptr, nullptr);
-
-  if (!new_scheduler_enabled_) {
-    // Call task dispatch to assign work to the new group.
-    TryLocalInfeasibleTaskScheduling();
-    DispatchTasks(local_queues_.GetReadyTasksByClass());
-  }
 }
 
 void NodeManager::HandleCommitBundleResources(
@@ -1732,15 +1476,8 @@ void NodeManager::HandleCommitBundleResources(
   placement_group_resource_manager_->CommitBundle(bundle_spec);
   send_reply_callback(Status::OK(), nullptr, nullptr);
 
-  if (new_scheduler_enabled_) {
-    // Schedule in case a lease request for this placement group arrived before the commit
-    // message.
-    ScheduleAndDispatch();
-  } else {
-    // Call task dispatch to assign work to the new group.
-    TryLocalInfeasibleTaskScheduling();
-    DispatchTasks(local_queues_.GetReadyTasksByClass());
-  }
+  cluster_task_manager_->ScheduleInfeasibleTasks();
+  cluster_task_manager_->ScheduleAndDispatchTasks();
 }
 
 void NodeManager::HandleCancelResourceReserve(
@@ -1774,17 +1511,8 @@ void NodeManager::HandleCancelResourceReserve(
   // Return bundle resources.
   placement_group_resource_manager_->ReturnBundle(bundle_spec);
-
-  if (new_scheduler_enabled_) {
-    // Schedule in case a lease request for this placement group arrived before the commit
-    // message.
-    ScheduleAndDispatch();
-  } else {
-    // Call task dispatch to assign work to the new group.
-    TryLocalInfeasibleTaskScheduling();
-    DispatchTasks(local_queues_.GetReadyTasksByClass());
-  }
+  cluster_task_manager_->ScheduleInfeasibleTasks();
+  cluster_task_manager_->ScheduleAndDispatchTasks();
   send_reply_callback(Status::OK(), nullptr, nullptr);
 }
@@ -1807,9 +1535,7 @@ void NodeManager::HandleReturnWorker(const rpc::ReturnWorkerRequest &request,
       if (worker->IsBlocked()) {
         HandleDirectCallTaskUnblocked(worker);
       }
-      if (new_scheduler_enabled_) {
-        cluster_task_manager_->HandleTaskFinished(worker);
-      }
+      cluster_task_manager_->ReturnWorkerResources(worker);
       HandleWorkerAvailable(worker);
     }
   } else {
@@ -1847,38 +1573,7 @@ void NodeManager::HandleCancelWorkerLease(const rpc::CancelWorkerLeaseRequest &r
                                           rpc::CancelWorkerLeaseReply *reply,
                                           rpc::SendReplyCallback send_reply_callback) {
   const TaskID task_id = TaskID::FromBinary(request.task_id());
-  Task removed_task;
-  TaskState removed_task_state;
-  bool canceled;
-  if (new_scheduler_enabled_) {
-    canceled = cluster_task_manager_->CancelTask(task_id);
-    if (!canceled) {
-      // There are 2 cases here.
-      // 1. We haven't received the lease request yet. It's the caller's job to
-      // retry the cancellation once we've received the request.
-      // 2. We have already granted the lease. The caller is now responsible
-      // for returning the lease, not cancelling it.
-    }
-  } else {
-    canceled = local_queues_.RemoveTask(task_id, &removed_task, &removed_task_state);
-    if (!canceled) {
-      // We do not have the task. This could be because we haven't received the
-      // lease request yet, or because we already granted the lease request and
-      // it has already been returned.
-    } else {
-      if (removed_task.OnDispatch()) {
-        // We have not yet granted the worker lease. Cancel it now.
-        removed_task.OnCancellation()();
-        if (removed_task_state == TaskState::WAITING) {
-          dependency_manager_.RemoveTaskDependencies(task_id);
-        }
-      } else {
-        // We already granted the worker lease and sent the reply. Re-queue the
-        // task and wait for the requester to return the leased worker.
-        local_queues_.QueueTasks({removed_task}, removed_task_state);
-      }
-    }
-  }
+  bool canceled = cluster_task_manager_->CancelTask(task_id);
 
   // The task cancellation failed if we did not have the task queued, since
   // this means that we may not have received the task request yet. It is
   // successful if we did have the task queued, since we have now replied to
@@ -2061,36 +1756,15 @@ void NodeManager::HandleDirectCallTaskBlocked(
 void NodeManager::HandleDirectCallTaskBlocked(
     const std::shared_ptr<WorkerInterface> &worker, bool release_resources) {
-  if (new_scheduler_enabled_) {
-    if (!worker || worker->IsBlocked() || !release_resources) {
-      return;
-    }
-    std::vector<double> cpu_instances;
-    if (worker->GetAllocatedInstances() != nullptr) {
-      cpu_instances = worker->GetAllocatedInstances()->GetCPUInstancesDouble();
-    }
-    if (cpu_instances.size() > 0) {
-      std::vector<double> overflow_cpu_instances =
-          new_resource_scheduler_->AddCPUResourceInstances(cpu_instances);
-      for (unsigned int i = 0; i < overflow_cpu_instances.size(); i++) {
-        RAY_CHECK(overflow_cpu_instances[i] == 0) << "Should not be overflow";
-      }
-      worker->MarkBlocked();
-    }
-    ScheduleAndDispatch();
-    return;
-  }
-
-  if (!worker || worker->GetAssignedTaskId().IsNil() || worker->IsBlocked() ||
+  if (!worker || worker->IsBlocked() || worker->GetAssignedTaskId().IsNil() ||
       !release_resources) {
     return;  // The worker may have died or is no longer processing the task.
   }
-  auto const cpu_resource_ids = worker->ReleaseTaskCpuResources();
-  local_available_resources_.Release(cpu_resource_ids);
-  cluster_resource_map_[self_node_id_].Release(cpu_resource_ids.ToResourceSet());
-  worker->MarkBlocked();
 
-  DispatchTasks(local_queues_.GetReadyTasksByClass());
+  if (cluster_task_manager_->ReleaseCpuResourcesFromUnblockedWorker(worker)) {
+    worker->MarkBlocked();
+  }
+  cluster_task_manager_->ScheduleAndDispatchTasks();
 }
 
 void NodeManager::HandleDirectCallTaskUnblocked(
@@ -2098,57 +1772,17 @@ void NodeManager::HandleDirectCallTaskUnblocked(
   if (!worker || worker->GetAssignedTaskId().IsNil()) {
     return;  // The worker may have died or is no longer processing the task.
   }
-  TaskID task_id = worker->GetAssignedTaskId();
 
   // First, always release task dependencies. This ensures we don't leak resources even
   // if we don't need to unblock the worker below.
   dependency_manager_.CancelGetRequest(worker->WorkerId());
 
-  if (new_scheduler_enabled_) {
-    // Important: avoid double unblocking if the unblock RPC finishes after task end.
-    if (!worker || !worker->IsBlocked()) {
-      return;
-    }
-    std::vector<double> cpu_instances;
-    if (worker->GetAllocatedInstances() != nullptr) {
-      cpu_instances = worker->GetAllocatedInstances()->GetCPUInstancesDouble();
-    }
-    if (cpu_instances.size() > 0) {
-      // Important: we allow going negative here, since otherwise you can use infinite
-      // CPU resources by repeatedly blocking / unblocking a task. By allowing it to go
-      // negative, at most one task can "borrow" this worker's resources.
-      new_resource_scheduler_->SubtractCPUResourceInstances(
-          cpu_instances, /*allow_going_negative=*/true);
+  if (worker->IsBlocked()) {
+    if (cluster_task_manager_->ReturnCpuResourcesToBlockedWorker(worker)) {
       worker->MarkUnblocked();
     }
-    ScheduleAndDispatch();
-    return;
+    cluster_task_manager_->ScheduleAndDispatchTasks();
   }
-
-  if (!worker->IsBlocked()) {
-    return;  // Don't need to unblock the worker.
-  }
-
-  Task task = local_queues_.GetTaskOfState(task_id, TaskState::RUNNING);
-  const auto required_resources = task.GetTaskSpecification().GetRequiredResources();
-  const ResourceSet cpu_resources = required_resources.GetNumCpus();
-  bool oversubscribed = !local_available_resources_.Contains(cpu_resources);
-  if (!oversubscribed) {
-    // Reacquire the CPU resources for the worker. Note that care needs to be
-    // taken if the user is using the specific CPU IDs since the IDs that we
-    // reacquire here may be different from the ones that the task started with.
-    auto const resource_ids = local_available_resources_.Acquire(cpu_resources);
-    worker->AcquireTaskCpuResources(resource_ids);
-    cluster_resource_map_[self_node_id_].Acquire(cpu_resources);
-  } else {
-    // In this case, we simply don't reacquire the CPU resources for the worker.
-    // The worker can keep running and when the task finishes, it will simply
-    // not have any CPU resources to release.
-    RAY_LOG(WARNING)
-        << "Resources oversubscribed: "
-        << cluster_resource_map_[self_node_id_].GetAvailableResources().ToString();
-  }
-  worker->MarkUnblocked();
 }
 
 void NodeManager::AsyncResolveObjects(
@@ -2348,23 +1982,7 @@ bool NodeManager::FinishAssignedTask(const std::shared_ptr<WorkerInterface> &wor
   RAY_LOG(DEBUG) << "Finished task " << task_id;
   Task task;
-  if (new_scheduler_enabled_) {
-    task = worker.GetAssignedTask();
-    // leased_workers_.erase(worker.WorkerId()); // Maybe RAY_CHECK ?
-    if (worker.GetAllocatedInstances() != nullptr) {
-      cluster_task_manager_->HandleTaskFinished(worker_ptr);
-    }
-  } else {
-    // (See design_docs/task_states.rst for the state transition diagram.)
-    RAY_CHECK(local_queues_.RemoveTask(task_id, &task)) << task_id;
-    // Release task's resources. The worker's lifetime resources are still held.
-    auto const &task_resources = worker.GetTaskResourceIds();
-    local_available_resources_.ReleaseConstrained(
-        task_resources, cluster_resource_map_[self_node_id_].GetTotalResources());
-    cluster_resource_map_[self_node_id_].Release(task_resources.ToResourceSet());
-    worker.ResetTaskResourceIds();
-  }
+  cluster_task_manager_->TaskFinished(worker_ptr, &task);
 
   const auto &spec = task.GetTaskSpecification();
   if ((spec.IsActorCreationTask())) {
@@ -2479,36 +2097,7 @@ void NodeManager::HandleObjectLocal(const ObjectID &object_id) {
   RAY_LOG(DEBUG) << "Object local " << object_id << ", "
                  << " on " << self_node_id_ << ", " << ready_task_ids.size()
                  << " tasks ready";
-  // Transition the tasks whose dependencies are now fulfilled to the ready state.
-  if (new_scheduler_enabled_) {
-    cluster_task_manager_->TasksUnblocked(ready_task_ids);
-    ScheduleAndDispatch();
-  } else {
-    if (ready_task_ids.size() > 0) {
-      std::unordered_set<TaskID> ready_task_id_set(ready_task_ids.begin(),
-                                                   ready_task_ids.end());
-      // First filter out the tasks that should not be moved to READY.
-      local_queues_.FilterState(ready_task_id_set, TaskState::BLOCKED);
-      local_queues_.FilterState(ready_task_id_set, TaskState::RUNNING);
-      local_queues_.FilterState(ready_task_id_set, TaskState::DRIVER);
-      // Make sure that the remaining tasks are all WAITING or direct call
-      // actors.
-      auto ready_task_id_set_copy = ready_task_id_set;
-      local_queues_.FilterState(ready_task_id_set_copy, TaskState::WAITING);
-      // Filter out direct call actors. These are not tracked by the raylet and
-      // their assigned task ID is the actor ID.
-      for (const auto &id : ready_task_id_set_copy) {
-        ready_task_id_set.erase(id);
-      }
-      // Queue and dispatch the tasks that are ready to run (i.e., WAITING).
-      auto ready_tasks = local_queues_.RemoveTasks(ready_task_id_set);
-      local_queues_.QueueTasks(ready_tasks, TaskState::READY);
-      DispatchTasks(MakeTasksByClass(ready_tasks));
-    }
-  }
+  cluster_task_manager_->TasksUnblocked(ready_task_ids);
 }
 
 bool NodeManager::IsActorCreationTask(const TaskID &task_id) {
@@ -2536,50 +2125,7 @@ void NodeManager::HandleObjectMissing(const ObjectID &object_id) {
   }
   RAY_LOG(DEBUG) << result.str();
 
-  // We don't need to do anything if the new scheduler is enabled because tasks
-  // will get moved back to waiting once they reach the front of the dispatch
-  // queue.
-  if (!new_scheduler_enabled_) {
-    // Transition any tasks that were in the runnable state and are dependent on
-    // this object to the waiting state.
-    if (!waiting_task_ids.empty()) {
-      std::unordered_set<TaskID> waiting_task_id_set(waiting_task_ids.begin(),
-                                                     waiting_task_ids.end());
-      // NOTE(zhijunfu): For direct actors, the worker is initially assigned actor
-      // creation task ID, which will not be reset after the task finishes. And later
-      // tasks of this actor will reuse this task ID to require objects from plasma with
-      // FetchOrReconstruct, since direct actor task IDs are not known to raylet.
-      // To support actor reconstruction for direct actor, raylet marks actor creation
-      // task as completed and removes it from `local_queues_` when it receives `TaskDone`
-      // message from worker. This is necessary because the actor creation task will be
-      // re-submitted during reconstruction, if the task is not removed previously, the
-      // new submitted task will be marked as duplicate and thus ignored. So here we check
-      // for direct actor creation task explicitly to allow this case.
-      auto iter = waiting_task_id_set.begin();
-      while (iter != waiting_task_id_set.end()) {
-        if (IsActorCreationTask(*iter)) {
-          RAY_LOG(DEBUG) << "Ignoring direct actor creation task " << *iter
-                         << " when handling object missing for " << object_id;
-          iter = waiting_task_id_set.erase(iter);
-        } else {
-          ++iter;
-        }
-      }
-      // First filter out any tasks that can't be transitioned to READY. These
-      // are running workers or drivers, now blocked in a get.
-      local_queues_.FilterState(waiting_task_id_set, TaskState::RUNNING);
-      local_queues_.FilterState(waiting_task_id_set, TaskState::DRIVER);
-      // Transition the tasks back to the waiting state. They will be made
-      // runnable once the deleted object becomes available again.
-      local_queues_.MoveTasks(waiting_task_id_set, TaskState::READY, TaskState::WAITING);
-      RAY_CHECK(waiting_task_id_set.empty());
-      // Moving ready tasks to waiting may have changed the load, making space for placing
-      // new tasks locally.
-      ScheduleTasks(cluster_resource_map_);
-    }
-  }
+  cluster_task_manager_->OnObjectMissing(object_id, waiting_task_ids);
 }
 
 void NodeManager::ForwardTaskOrResubmit(const Task &task, const NodeID &node_manager_id) {
@@ -2750,7 +2296,7 @@ std::string NodeManager::DebugString() const {
   result << "\nInitialConfigResources: " << initial_config_.resource_config.ToString();
   if (cluster_task_manager_ != nullptr) {
     result << "\nClusterTaskManager:\n";
-    result << cluster_task_manager_->DebugString();
+    result << cluster_task_manager_->DebugStr();
   }
   result << "\nClusterResources:";
   for (auto &pair : cluster_resource_map_) {
@@ -2838,9 +2384,8 @@ void NodeManager::HandlePinObjectIDs(const rpc::PinObjectIDsRequest &request,
 void NodeManager::HandleGetNodeStats(const rpc::GetNodeStatsRequest &node_stats_request,
                                      rpc::GetNodeStatsReply *reply,
                                      rpc::SendReplyCallback send_reply_callback) {
-  if (new_scheduler_enabled_) {
-    cluster_task_manager_->FillPendingActorInfo(reply);
-  }
+  cluster_task_manager_->FillPendingActorInfo(reply);
   for (const auto &task : local_queues_.GetTasks(TaskState::INFEASIBLE)) {
     if (task.GetTaskSpecification().IsActorCreationTask()) {
       auto infeasible_task = reply->add_infeasible_tasks();

src/ray/raylet/node_manager.h

@@ -32,6 +32,8 @@
 #include "ray/raylet/scheduling/scheduling_ids.h"
 #include "ray/raylet/scheduling/cluster_resource_scheduler.h"
 #include "ray/raylet/scheduling/cluster_task_manager.h"
+#include "ray/raylet/scheduling/old_cluster_resource_scheduler.h"
+#include "ray/raylet/scheduling/cluster_task_manager_interface.h"
 #include "ray/raylet/scheduling_policy.h"
 #include "ray/raylet/scheduling_queue.h"
 #include "ray/raylet/reconstruction_policy.h"
@@ -110,7 +112,8 @@ struct NodeManagerConfig {
   int64_t min_spilling_size;
 };
 
-class NodeManager : public rpc::NodeManagerServiceHandler {
+class NodeManager : public rpc::NodeManagerServiceHandler,
+                    public ClusterTaskManagerInterface {
  public:
   /// Create a node manager.
   ///
@@ -613,13 +616,6 @@ class NodeManager : public rpc::NodeManagerServiceHandler {
   /// Dispatch tasks to available workers.
   void DispatchScheduledTasksToWorkers();
 
-  /// For the pending task at the head of tasks_to_schedule_, return a node
-  /// in the system (local or remote) that has enough resources available to
-  /// run the task, if any such node exist.
-  /// Repeat the process as long as we can schedule a task.
-  /// NEW SCHEDULER_FUNCTION
-  void ScheduleAndDispatch();
-
   /// Whether a task is an actor creation task.
   bool IsActorCreationTask(const TaskID &task_id);
@@ -635,6 +631,122 @@ class NodeManager : public rpc::NodeManagerServiceHandler {
   /// \param task Task that is infeasible
   void PublishInfeasibleTaskError(const Task &task) const;
 
+  std::unordered_map<SchedulingClass, ordered_set<TaskID>> MakeTasksByClass(
+      const std::vector<Task> &tasks) const;
+
+  ///////////////////////////////////////////////////////////////////////////////////////
+  //////////////////// Begin of the override methods of ClusterTaskManager //////////////
+  // The following methods are defined in node_manager.task.cc instead of node_manager.cc
+
+  /// Return the resources that were being used by this worker.
+  void ReleaseWorkerResources(std::shared_ptr<WorkerInterface> worker) override;
+
+  /// When a task is blocked in ray.get or ray.wait, the worker who is executing the task
+  /// should give up the CPU resources allocated for the running task for the time being
+  /// and the worker itself should also be marked as blocked.
+  ///
+  /// \param worker The worker who will give up the CPU resources.
+  /// \return true if the cpu resources of the specified worker are released successfully,
+  /// else false.
+  bool ReleaseCpuResourcesFromUnblockedWorker(
+      std::shared_ptr<WorkerInterface> worker) override;
+
+  /// When a task is no longer blocked in a ray.get or ray.wait, the CPU resources that
+  /// the worker gave up should be returned to it.
+  ///
+  /// \param worker The blocked worker.
+  /// \return true if the cpu resources are returned back to the specified worker, else
+  /// false.
+  bool ReturnCpuResourcesToBlockedWorker(
+      std::shared_ptr<WorkerInterface> worker) override;
+
+  // Schedule and dispatch tasks.
+  void ScheduleAndDispatchTasks() override;
+
+  /// Move tasks from waiting to ready for dispatch. Called when a task's
+  /// dependencies are resolved.
+  ///
+  /// \param readyIds: The tasks which are now ready to be dispatched.
+  void TasksUnblocked(const std::vector<TaskID> &ready_ids) override;
+
+  /// Populate the relevant parts of the heartbeat table. This is intended for
+  /// sending raylet <-> gcs heartbeats. In particular, this should fill in
+  /// resource_load and resource_load_by_shape.
+  ///
+  /// \param Output parameter. `resource_load` and `resource_load_by_shape` are the only
+  /// fields used.
+  void FillResourceUsage(std::shared_ptr<rpc::ResourcesData> data) override;
+
+  /// Populate the list of pending or infeasible actor tasks for node stats.
+  ///
+  /// \param Output parameter.
+  void FillPendingActorInfo(rpc::GetNodeStatsReply *reply) const override;
+
+  /// Return the finished task and relase the worker resources.
+  /// This method will be removed and can be replaced by `ReleaseWorkerResources` directly
+  /// once we remove the legacy scheduler.
+  ///
+  /// \param worker: The worker which was running the task.
+  /// \param task: Output parameter.
+  void TaskFinished(std::shared_ptr<WorkerInterface> worker, Task *task) override;
+
+  /// Return worker resources.
+  /// This method will be removed and can be replaced by `ReleaseWorkerResources` directly
+  /// once we remove the legacy scheduler.
+  ///
+  /// \param worker: The worker which was running the task.
+  void ReturnWorkerResources(std::shared_ptr<WorkerInterface> worker) override;
+
+  /// Attempt to cancel an already queued task.
+  ///
+  /// \param task_id: The id of the task to remove.
+  ///
+  /// \return True if task was successfully removed. This function will return
+  /// false if the task is already running.
+  bool CancelTask(const TaskID &task_id) override;
+
+  /// Queue task and schedule. This hanppens when processing the worker lease request.
+  ///
+  /// \param task: The incoming task to be queued and scheduled.
+  /// \param reply: The reply of the lease request.
+  /// \param send_reply_callback: The function used during dispatching.
+  void QueueAndScheduleTask(const Task &task, rpc::RequestWorkerLeaseReply *reply,
+                            rpc::SendReplyCallback send_reply_callback) override;
+
+  /// Schedule infeasible tasks.
+  void ScheduleInfeasibleTasks() override;
+
+  /// Return if any tasks are pending resource acquisition.
+  ///
+  /// \param[in] exemplar An example task that is deadlocking.
+  /// \param[in] num_pending_actor_creation Number of pending actor creation tasks.
+  /// \param[in] num_pending_tasks Number of pending tasks.
+  /// \param[in] any_pending True if there's any pending exemplar.
+  /// \return True if any progress is any tasks are pending.
+  bool AnyPendingTasks(Task *exemplar, bool *any_pending, int *num_pending_actor_creation,
+                       int *num_pending_tasks) const override;
+
+  /// Handle the resource usage updated event of the specified node.
+  ///
+  /// \param node_id ID of the node which resources are updated.
+  /// \param resource_data The node resources.
+  void OnNodeResourceUsageUpdated(const NodeID &node_id,
+                                  const rpc::ResourcesData &resource_data) override;
+
+  /// Handle the object missing event.
+  ///
+  /// \param object_id ID of the missing object.
+  /// \param waiting_task_ids IDs of tasks that are waitting for the specified missing
+  /// object.
+  void OnObjectMissing(const ObjectID &object_id,
+                       const std::vector<TaskID> &waiting_task_ids) override;
+
+  /// The helper to dump the debug state of the cluster task manater.
+  std::string DebugStr() const override;
+
+  //////////////////// End of the Override of ClusterTaskManager //////////////////////
+  ///////////////////////////////////////////////////////////////////////////////////////
+
   /// ID of this node.
   NodeID self_node_id_;
   boost::asio::io_service &io_service_;
@@ -732,9 +844,6 @@ class NodeManager : public rpc::NodeManagerServiceHandler {
   /// lease on.
   absl::flat_hash_map<WorkerID, std::vector<WorkerID>> leased_workers_by_owner_;
 
-  /// Whether new schedule is enabled.
-  const bool new_scheduler_enabled_;
-
   /// Whether to report the worker's backlog size in the GCS heartbeat.
   const bool report_worker_backlog_;
@@ -760,8 +869,8 @@ class NodeManager : public rpc::NodeManagerServiceHandler {
   /// responsible for maintaining a view of the cluster state w.r.t resource
   /// usage. ClusterTaskManager is responsible for queuing, spilling back, and
   /// dispatching tasks.
-  std::shared_ptr<ClusterResourceScheduler> new_resource_scheduler_;
-  std::shared_ptr<ClusterTaskManager> cluster_task_manager_;
+  std::shared_ptr<ClusterResourceSchedulerInterface> cluster_resource_scheduler_;
+  std::shared_ptr<ClusterTaskManagerInterface> cluster_task_manager_;
 
   absl::flat_hash_map<ObjectID, std::unique_ptr<RayObject>> pinned_objects_;
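With `cluster_resource_scheduler_` now declared through `ClusterResourceSchedulerInterface`, components that exist only on the new-scheduler path recover the concrete type with `std::dynamic_pointer_cast`, as seen in the constructor hunk above. A small self-contained sketch of that pattern, with placeholder `SchedulerInterface`/`NewScheduler` types rather than the real Ray classes:

#include <cassert>
#include <memory>

struct SchedulerInterface {
  virtual ~SchedulerInterface() = default;
};
struct NewScheduler : SchedulerInterface {
  void NewSchedulerOnlyFeature() {}
};
struct OldScheduler : SchedulerInterface {};

int main() {
  // One member typed as the interface serves both scheduler implementations...
  std::shared_ptr<SchedulerInterface> scheduler = std::make_shared<NewScheduler>();

  // ...and new-path-only consumers downcast to the concrete scheduler, mirroring
  // std::dynamic_pointer_cast<ClusterResourceScheduler>(cluster_resource_scheduler_).
  auto concrete = std::dynamic_pointer_cast<NewScheduler>(scheduler);
  assert(concrete != nullptr);
  concrete->NewSchedulerOnlyFeature();
  return 0;
}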

src/ray/raylet/node_manager.task.cc

@@ -0,0 +1,357 @@
// Copyright 2017 The Ray Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "ray/raylet/node_manager.h"
namespace ray {
namespace raylet {
void NodeManager::ReleaseWorkerResources(std::shared_ptr<WorkerInterface> worker) {
auto const &task_resources = worker->GetTaskResourceIds();
local_available_resources_.ReleaseConstrained(
task_resources, cluster_resource_map_[self_node_id_].GetTotalResources());
cluster_resource_map_[self_node_id_].Release(task_resources.ToResourceSet());
worker->ResetTaskResourceIds();
auto const &lifetime_resources = worker->GetLifetimeResourceIds();
local_available_resources_.ReleaseConstrained(
lifetime_resources, cluster_resource_map_[self_node_id_].GetTotalResources());
cluster_resource_map_[self_node_id_].Release(lifetime_resources.ToResourceSet());
worker->ResetLifetimeResourceIds();
}
bool NodeManager::ReleaseCpuResourcesFromUnblockedWorker(
std::shared_ptr<WorkerInterface> worker) {
if (!worker || worker->IsBlocked()) {
return false;
}
auto const cpu_resource_ids = worker->ReleaseTaskCpuResources();
local_available_resources_.Release(cpu_resource_ids);
cluster_resource_map_[self_node_id_].Release(cpu_resource_ids.ToResourceSet());
return true;
}
bool NodeManager::ReturnCpuResourcesToBlockedWorker(
std::shared_ptr<WorkerInterface> worker) {
if (!worker || !worker->IsBlocked()) {
return false;
}
const TaskID &task_id = worker->GetAssignedTaskId();
const Task &task = local_queues_.GetTaskOfState(task_id, TaskState::RUNNING);
const auto &required_resources = task.GetTaskSpecification().GetRequiredResources();
const ResourceSet cpu_resources = required_resources.GetNumCpus();
bool oversubscribed = !local_available_resources_.Contains(cpu_resources);
if (!oversubscribed) {
// Reacquire the CPU resources for the worker. Note that care needs to be
// taken if the user is using the specific CPU IDs since the IDs that we
// reacquire here may be different from the ones that the task started with.
auto const resource_ids = local_available_resources_.Acquire(cpu_resources);
worker->AcquireTaskCpuResources(resource_ids);
cluster_resource_map_[self_node_id_].Acquire(cpu_resources);
} else {
// In this case, we simply don't reacquire the CPU resources for the worker.
// The worker can keep running and when the task finishes, it will simply
// not have any CPU resources to release.
RAY_LOG(WARNING)
<< "Resources oversubscribed: "
<< cluster_resource_map_[self_node_id_].GetAvailableResources().ToString();
}
return true;
}
void NodeManager::ScheduleAndDispatchTasks() {
DispatchTasks(local_queues_.GetReadyTasksByClass());
}
void NodeManager::TasksUnblocked(const std::vector<TaskID> &ready_ids) {
if (ready_ids.empty()) {
return;
}
std::unordered_set<TaskID> ready_task_id_set(ready_ids.begin(), ready_ids.end());
// First filter out the tasks that should not be moved to READY.
local_queues_.FilterState(ready_task_id_set, TaskState::BLOCKED);
local_queues_.FilterState(ready_task_id_set, TaskState::RUNNING);
local_queues_.FilterState(ready_task_id_set, TaskState::DRIVER);
// Make sure that the remaining tasks are all WAITING or direct call
// actors.
auto ready_task_id_set_copy = ready_task_id_set;
local_queues_.FilterState(ready_task_id_set_copy, TaskState::WAITING);
// Filter out direct call actors. These are not tracked by the raylet and
// their assigned task ID is the actor ID.
for (const auto &id : ready_task_id_set_copy) {
ready_task_id_set.erase(id);
}
// Queue and dispatch the tasks that are ready to run (i.e., WAITING).
auto ready_tasks = local_queues_.RemoveTasks(ready_task_id_set);
local_queues_.QueueTasks(ready_tasks, TaskState::READY);
DispatchTasks(MakeTasksByClass(ready_tasks));
}
void NodeManager::FillResourceUsage(std::shared_ptr<rpc::ResourcesData> resources_data) {
SchedulingResources &local_resources = cluster_resource_map_[self_node_id_];
local_resources.SetLoadResources(local_queues_.GetTotalResourceLoad());
auto last_heartbeat_resources = gcs_client_->NodeResources().GetLastResourceUsage();
if (!last_heartbeat_resources->GetLoadResources().IsEqual(
local_resources.GetLoadResources())) {
resources_data->set_resource_load_changed(true);
for (const auto &resource_pair :
local_resources.GetLoadResources().GetResourceMap()) {
(*resources_data->mutable_resource_load())[resource_pair.first] =
resource_pair.second;
}
last_heartbeat_resources->SetLoadResources(
ResourceSet(local_resources.GetLoadResources()));
}
// Add resource load by shape. This will be used by the new autoscaler.
auto resource_load = local_queues_.GetResourceLoadByShape(
RayConfig::instance().max_resource_shapes_per_load_report());
resources_data->mutable_resource_load_by_shape()->Swap(&resource_load);
}
void NodeManager::TaskFinished(std::shared_ptr<WorkerInterface> worker, Task *task) {
RAY_CHECK(worker != nullptr && task != nullptr);
const auto &task_id = worker->GetAssignedTaskId();
// (See design_docs/task_states.rst for the state transition diagram.)
RAY_CHECK(local_queues_.RemoveTask(task_id, task)) << task_id;
// Release task's resources. The worker's lifetime resources are still held.
auto const &task_resources = worker->GetTaskResourceIds();
local_available_resources_.ReleaseConstrained(
task_resources, cluster_resource_map_[self_node_id_].GetTotalResources());
cluster_resource_map_[self_node_id_].Release(task_resources.ToResourceSet());
worker->ResetTaskResourceIds();
}
void NodeManager::ReturnWorkerResources(std::shared_ptr<WorkerInterface> worker) {
// Do nothing.
}
bool NodeManager::CancelTask(const TaskID &task_id) {
Task removed_task;
TaskState removed_task_state;
bool canceled = local_queues_.RemoveTask(task_id, &removed_task, &removed_task_state);
if (!canceled) {
// We do not have the task. This could be because we haven't received the
// lease request yet, or because we already granted the lease request and
// it has already been returned.
} else {
if (removed_task.OnDispatch()) {
// We have not yet granted the worker lease. Cancel it now.
removed_task.OnCancellation()();
if (removed_task_state == TaskState::WAITING) {
dependency_manager_.RemoveTaskDependencies(task_id);
}
} else {
// We already granted the worker lease and sent the reply. Re-queue the
// task and wait for the requester to return the leased worker.
local_queues_.QueueTasks({removed_task}, removed_task_state);
}
}
return canceled;
}
void NodeManager::QueueAndScheduleTask(const Task &task,
rpc::RequestWorkerLeaseReply *reply,
rpc::SendReplyCallback send_reply_callback) {
// Override the task dispatch to call back to the client instead of executing the
// task directly on the worker.
TaskID task_id = task.GetTaskSpecification().TaskId();
rpc::Address owner_address = task.GetTaskSpecification().CallerAddress();
Task &mutable_task = const_cast<Task &>(task);
mutable_task.OnDispatchInstead(
[this, owner_address, reply, send_reply_callback](
const std::shared_ptr<void> granted, const std::string &address, int port,
const WorkerID &worker_id, const ResourceIdSet &resource_ids) {
auto worker = std::static_pointer_cast<Worker>(granted);
uint32_t worker_pid = static_cast<uint32_t>(worker->GetProcess().GetId());
reply->mutable_worker_address()->set_ip_address(address);
reply->mutable_worker_address()->set_port(port);
reply->mutable_worker_address()->set_worker_id(worker_id.Binary());
reply->mutable_worker_address()->set_raylet_id(self_node_id_.Binary());
reply->set_worker_pid(worker_pid);
for (const auto &mapping : resource_ids.AvailableResources()) {
auto resource = reply->add_resource_mapping();
resource->set_name(mapping.first);
for (const auto &id : mapping.second.WholeIds()) {
auto rid = resource->add_resource_ids();
rid->set_index(id);
rid->set_quantity(1.0);
}
for (const auto &id : mapping.second.FractionalIds()) {
auto rid = resource->add_resource_ids();
rid->set_index(id.first);
rid->set_quantity(id.second.ToDouble());
}
}
auto reply_failure_handler = [this, worker_id]() {
RAY_LOG(WARNING)
<< "Failed to reply to GCS server, because it might have restarted. GCS "
"cannot obtain the information of the leased worker, so we need to "
"release the leased worker to avoid leakage.";
leased_workers_.erase(worker_id);
metrics_num_task_executed_ -= 1;
};
metrics_num_task_executed_ += 1;
send_reply_callback(Status::OK(), nullptr, reply_failure_handler);
RAY_CHECK(leased_workers_.find(worker_id) == leased_workers_.end())
<< "Worker is already leased out " << worker_id;
leased_workers_[worker_id] = worker;
});
mutable_task.OnSpillbackInstead(
[this, reply, task_id, send_reply_callback](const NodeID &spillback_to,
const std::string &address, int port) {
RAY_LOG(DEBUG) << "Worker lease request SPILLBACK " << task_id;
reply->mutable_retry_at_raylet_address()->set_ip_address(address);
reply->mutable_retry_at_raylet_address()->set_port(port);
reply->mutable_retry_at_raylet_address()->set_raylet_id(spillback_to.Binary());
metrics_num_task_spilled_back_ += 1;
send_reply_callback(Status::OK(), nullptr, nullptr);
});
mutable_task.OnCancellationInstead([reply, task_id, send_reply_callback]() {
RAY_LOG(DEBUG) << "Task lease request canceled " << task_id;
reply->set_canceled(true);
send_reply_callback(Status::OK(), nullptr, nullptr);
});
SubmitTask(task);
}
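For context only, here is a hedged sketch (not code from this commit; the handler name and surrounding client are assumed) of how a lease requester might consume the reply fields that the OnDispatchInstead, OnSpillbackInstead, and OnCancellationInstead callbacks above populate. The accessor names simply mirror the setters used in this file.
// Hypothetical consumer of the RequestWorkerLeaseReply populated above.
void OnWorkerLeaseReply(const rpc::RequestWorkerLeaseReply &reply) {
  if (reply.canceled()) {
    return;  // The lease request was canceled by the raylet.
  }
  if (!reply.retry_at_raylet_address().raylet_id().empty()) {
    // Spillback: retry the lease request at the raylet suggested in the reply.
    return;
  }
  // Granted: the reply carries the leased worker's address and the resource
  // instances that were assigned to the task.
  const auto &addr = reply.worker_address();
  RAY_LOG(DEBUG) << "Leased worker at " << addr.ip_address() << ":" << addr.port();
  for (const auto &mapping : reply.resource_mapping()) {
    RAY_LOG(DEBUG) << "  assigned resource " << mapping.name() << " with "
                   << mapping.resource_ids_size() << " instance id(s)";
  }
}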
void NodeManager::ScheduleInfeasibleTasks() { TryLocalInfeasibleTaskScheduling(); }
/// Return if any tasks are pending resource acquisition.
///
/// \param[in] exemplar An example task that is deadlocking.
/// \param[in] num_pending_actor_creation Number of pending actor creation tasks.
/// \param[in] num_pending_tasks Number of pending tasks.
/// \param[in] any_pending True if there's any pending exemplar.
/// \return True if any tasks are pending.
bool NodeManager::AnyPendingTasks(Task *exemplar, bool *any_pending,
int *num_pending_actor_creation,
int *num_pending_tasks) const {
// See if any tasks are blocked trying to acquire resources.
for (const auto &task : local_queues_.GetTasks(TaskState::READY)) {
const TaskSpecification &spec = task.GetTaskSpecification();
if (spec.IsActorCreationTask()) {
*num_pending_actor_creation += 1;
} else {
*num_pending_tasks += 1;
}
if (!*any_pending) {
*exemplar = task;
*any_pending = true;
}
}
return *any_pending;
}
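As a hedged usage sketch (the warning text and local variables are illustrative, not taken from this commit), a caller such as the periodic debug-state reporter might consume this helper as follows:
Task exemplar;
bool any_pending = false;
int pending_actor_creations = 0;
int pending_tasks = 0;
if (AnyPendingTasks(&exemplar, &any_pending, &pending_actor_creations, &pending_tasks)) {
  RAY_LOG(WARNING) << pending_tasks << " normal tasks and " << pending_actor_creations
                   << " actor creation tasks are pending on resources; example task: "
                   << exemplar.GetTaskSpecification().TaskId();
}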
void NodeManager::OnNodeResourceUsageUpdated(const NodeID &node_id,
const rpc::ResourcesData &resource_data) {
auto it = cluster_resource_map_.find(node_id);
if (it == cluster_resource_map_.end()) {
return;
}
// Extract decision for this raylet.
auto decision =
scheduling_policy_.SpillOver(it->second, cluster_resource_map_[self_node_id_]);
std::unordered_set<TaskID> local_task_ids;
for (const auto &task_id : decision) {
// (See design_docs/task_states.rst for the state transition diagram.)
Task task;
TaskState state;
if (!local_queues_.RemoveTask(task_id, &task, &state)) {
return;
}
// Since we are spilling back from the ready and waiting queues, we need
// to unsubscribe the dependencies.
if (state != TaskState::INFEASIBLE) {
// Don't unsubscribe for infeasible tasks because we never subscribed in
// the first place.
dependency_manager_.RemoveTaskDependencies(task_id);
}
// Attempt to forward the task. If forwarding fails,
// the task will be resubmitted locally.
ForwardTaskOrResubmit(task, node_id);
}
}
void NodeManager::FillPendingActorInfo(rpc::GetNodeStatsReply *reply) const {
// TODO(Shanly): Implement.
}
void NodeManager::OnObjectMissing(const ObjectID &object_id,
const std::vector<TaskID> &waiting_task_ids) {
RAY_UNUSED(object_id);
// Transition any tasks that were in the runnable state and are dependent on
// this object to the waiting state.
if (!waiting_task_ids.empty()) {
std::unordered_set<TaskID> waiting_task_id_set(waiting_task_ids.begin(),
waiting_task_ids.end());
// NOTE(zhijunfu): For direct actors, the worker is initially assigned actor
// creation task ID, which will not be reset after the task finishes. And later
// tasks of this actor will reuse this task ID to require objects from plasma with
// FetchOrReconstruct, since direct actor task IDs are not known to raylet.
// To support actor reconstruction for direct actor, raylet marks actor creation
// task as completed and removes it from `local_queues_` when it receives `TaskDone`
// message from worker. This is necessary because the actor creation task will be
// re-submitted during reconstruction; if the task were not removed previously, the
// newly submitted task would be marked as a duplicate and thus ignored. So here we check
// for direct actor creation task explicitly to allow this case.
auto iter = waiting_task_id_set.begin();
while (iter != waiting_task_id_set.end()) {
if (IsActorCreationTask(*iter)) {
RAY_LOG(DEBUG) << "Ignoring direct actor creation task " << *iter
<< " when handling object missing for " << object_id;
iter = waiting_task_id_set.erase(iter);
} else {
++iter;
}
}
// First filter out any tasks that can't be transitioned to READY. These
// are running workers or drivers, now blocked in a get.
local_queues_.FilterState(waiting_task_id_set, TaskState::RUNNING);
local_queues_.FilterState(waiting_task_id_set, TaskState::DRIVER);
// Transition the tasks back to the waiting state. They will be made
// runnable once the deleted object becomes available again.
local_queues_.MoveTasks(waiting_task_id_set, TaskState::READY, TaskState::WAITING);
RAY_CHECK(waiting_task_id_set.empty());
// Moving ready tasks to waiting may have changed the load, making space for placing
// new tasks locally.
ScheduleTasks(cluster_resource_map_);
}
}
std::string NodeManager::DebugStr() const {
// Because the NodeManager also implements the cluster task manager interface and
// `cluster_task_manager_->DebugString()` is invoked inside
// `NodeManager::DebugString()`, keeping the same name would lead to an infinite
// loop and a stack overflow, so `ClusterTaskManager::DebugString` is renamed to
// `ClusterTaskManager::DebugStr` to avoid this.
return "";
}
} // namespace raylet
} // namespace ray


@ -229,7 +229,7 @@ void NewPlacementGroupResourceManager::ReturnBundle(
// Return original resources to resource allocator `ClusterResourceScheduler`.
auto original_resources = it->second->resources_;
- cluster_resource_scheduler_->FreeLocalTaskResources(original_resources);
+ cluster_resource_scheduler_->ReleaseWorkerResources(original_resources);
// Substract placement group resources from resource allocator
// `ClusterResourceScheduler`.


@ -14,6 +14,9 @@
#include "ray/raylet/scheduling/cluster_resource_scheduler.h" #include "ray/raylet/scheduling/cluster_resource_scheduler.h"
#include "ray/common/grpc_util.h"
#include "ray/common/ray_config.h"
namespace ray {
ClusterResourceScheduler::ClusterResourceScheduler(
@ -55,6 +58,44 @@ void ClusterResourceScheduler::AddOrUpdateNode(int64_t node_id,
}
}
bool ClusterResourceScheduler::UpdateNode(const std::string &node_id_string,
const rpc::ResourcesData &resource_data) {
auto node_id = string_to_int_map_.Insert(node_id_string);
if (!nodes_.contains(node_id)) {
return false;
}
auto resources_total = MapFromProtobuf(resource_data.resources_total());
auto resources_available = MapFromProtobuf(resource_data.resources_available());
NodeResources node_resources = ResourceMapToNodeResources(
string_to_int_map_, resources_total, resources_available);
NodeResources local_view;
RAY_CHECK(GetNodeResources(node_id, &local_view));
if (resource_data.resources_total_size() > 0) {
for (size_t i = 0; i < node_resources.predefined_resources.size(); ++i) {
local_view.predefined_resources[i].total =
node_resources.predefined_resources[i].total;
}
for (auto &entry : node_resources.custom_resources) {
local_view.custom_resources[entry.first].total = entry.second.total;
}
}
if (resource_data.resources_available_changed()) {
for (size_t i = 0; i < node_resources.predefined_resources.size(); ++i) {
local_view.predefined_resources[i].available =
node_resources.predefined_resources[i].available;
}
for (auto &entry : node_resources.custom_resources) {
local_view.custom_resources[entry.first].available = entry.second.available;
}
}
AddOrUpdateNode(node_id, local_view);
return true;
}
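To make the merge semantics above concrete, here is a small assumed usage sketch (the node id string, values, and scheduler instance are made up): totals are only merged when resources_total is non-empty, and availability is only merged when resources_available_changed is set.
rpc::ResourcesData data;
(*data.mutable_resources_total())["CPU"] = 16.0;    // non-empty => totals are merged
data.set_resources_available_changed(true);         // opt in to merging availability
(*data.mutable_resources_available())["CPU"] = 4.0;
cluster_resource_scheduler.UpdateNode("remote-node-id", data);
// With resources_available_changed left false, only the totals above would be
// copied into the local view of "remote-node-id".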
bool ClusterResourceScheduler::RemoveNode(int64_t node_id) {
auto it = nodes_.find(node_id);
if (it == nodes_.end()) {
@ -324,6 +365,12 @@ bool ClusterResourceScheduler::GetNodeResources(int64_t node_id,
}
}
const NodeResources &ClusterResourceScheduler::GetLocalNodeResources() const {
const auto &node_it = nodes_.find(local_node_id_);
RAY_CHECK(node_it != nodes_.end());
return node_it->second.GetLocalView();
}
int64_t ClusterResourceScheduler::NumNodes() { return nodes_.size(); }
void ClusterResourceScheduler::AddLocalResource(const std::string &resource_name,
@ -883,7 +930,7 @@ bool ClusterResourceScheduler::AllocateRemoteTaskResources(
return SubtractRemoteNodeAvailableResources(node_id, task_request);
}
- void ClusterResourceScheduler::FreeLocalTaskResources(
+ void ClusterResourceScheduler::ReleaseWorkerResources(
std::shared_ptr<TaskResourceInstances> task_allocation) {
if (task_allocation == nullptr || task_allocation->IsEmpty()) {
return;


@ -22,10 +22,10 @@
#include "absl/container/flat_hash_set.h" #include "absl/container/flat_hash_set.h"
#include "ray/common/task/scheduling_resources.h" #include "ray/common/task/scheduling_resources.h"
#include "ray/raylet/scheduling/cluster_resource_data.h" #include "ray/raylet/scheduling/cluster_resource_data.h"
#include "ray/raylet/scheduling/cluster_resource_scheduler_interface.h"
#include "ray/raylet/scheduling/fixed_point.h" #include "ray/raylet/scheduling/fixed_point.h"
#include "ray/raylet/scheduling/scheduling_ids.h" #include "ray/raylet/scheduling/scheduling_ids.h"
#include "ray/util/logging.h" #include "ray/util/logging.h"
#include "src/ray/protobuf/gcs.pb.h" #include "src/ray/protobuf/gcs.pb.h"
namespace ray { namespace ray {
@ -38,7 +38,7 @@ static std::unordered_set<int64_t> UnitInstanceResources{CPU, GPU, TPU};
/// Class encapsulating the cluster resources and the logic to assign /// Class encapsulating the cluster resources and the logic to assign
/// tasks to nodes based on the task's constraints and the available /// tasks to nodes based on the task's constraints and the available
/// resources at those nodes. /// resources at those nodes.
class ClusterResourceScheduler { class ClusterResourceScheduler : public ClusterResourceSchedulerInterface {
public: public:
ClusterResourceScheduler(void){}; ClusterResourceScheduler(void){};
@ -66,12 +66,19 @@ class ClusterResourceScheduler {
const std::unordered_map<std::string, double> &resource_map_total,
const std::unordered_map<std::string, double> &resource_map_available);
/// Update node resources. This happens when a node's resource usage is updated.
///
/// \param node_id_string ID of the node whose resources need to be updated.
/// \param resource_data The node resource data.
bool UpdateNode(const std::string &node_id_string,
const rpc::ResourcesData &resource_data) override;
/// Remove node from the cluster data structure. This happens
/// when a node fails or it is removed from the cluster.
///
/// \param ID of the node to be removed.
bool RemoveNode(int64_t node_id);
- bool RemoveNode(const std::string &node_id_string);
+ bool RemoveNode(const std::string &node_id_string) override;
/// Check whether a task request is feasible on a given node. A node is
/// feasible if it has the total resources needed to eventually execute the
@ -155,6 +162,9 @@ class ClusterResourceScheduler {
/// If node_id not found, return false; otherwise return true.
bool GetNodeResources(int64_t node_id, NodeResources *ret_resources) const;
/// Get local node resources.
const NodeResources &GetLocalNodeResources() const;
/// Get number of nodes in the cluster.
int64_t NumNodes();
@ -175,7 +185,8 @@ class ClusterResourceScheduler {
/// \param resource_name: Resource which we want to update.
/// \param resource_total: New capacity of the resource.
void UpdateResourceCapacity(const std::string &node_name,
- const std::string &resource_name, double resource_total);
+ const std::string &resource_name,
+ double resource_total) override;
/// Delete a given resource from the local node.
///
@ -186,13 +197,14 @@ class ClusterResourceScheduler {
///
/// \param node_name: Node whose resource we want to delete.
/// \param resource_name: Resource we want to delete
- void DeleteResource(const std::string &node_name, const std::string &resource_name);
+ void DeleteResource(const std::string &node_name,
+ const std::string &resource_name) override;
/// Return local resources.
NodeResourceInstances GetLocalResources() { return local_resources_; };
/// Return local resources in human-readable string form.
- std::string GetLocalResourceViewString() const;
+ std::string GetLocalResourceViewString() const override;
/// Create instances for each resource associated with the local node, given
/// the node's resources.
@ -349,7 +361,7 @@ class ClusterResourceScheduler {
const std::string &node_id,
const std::unordered_map<std::string, double> &task_resources);
- void FreeLocalTaskResources(std::shared_ptr<TaskResourceInstances> task_allocation);
+ void ReleaseWorkerResources(std::shared_ptr<TaskResourceInstances> task_allocation);
/// Update the available resources of the local node given
/// the available instances of each resource of the local node.
@ -368,13 +380,14 @@ class ClusterResourceScheduler {
///
/// \param Output parameter. `resources_available` and `resources_total` are the only
/// fields used.
- void FillResourceUsage(std::shared_ptr<rpc::ResourcesData> resources_data);
+ void FillResourceUsage(std::shared_ptr<rpc::ResourcesData> resources_data) override;
/// Update last report resources local cache from gcs cache,
/// this is needed when gcs fo.
///
/// \param gcs_resources: The remote cache from gcs.
- void UpdateLastResourceUsage(std::shared_ptr<SchedulingResources> gcs_resources);
+ void UpdateLastResourceUsage(
+ std::shared_ptr<SchedulingResources> gcs_resources) override;
/// Return human-readable string for this scheduler state.
std::string DebugString() const;


@ -0,0 +1,70 @@
// Copyright 2017 The Ray Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include "ray/common/task/scheduling_resources.h"
#include "src/ray/protobuf/gcs.pb.h"
namespace ray {
class ClusterResourceSchedulerInterface {
public:
virtual ~ClusterResourceSchedulerInterface() = default;
/// Remove node from the cluster data structure. This happens
/// when a node fails or it is removed from the cluster.
///
/// \param node_id_string ID of the node to be removed.
virtual bool RemoveNode(const std::string &node_id_string) = 0;
/// Update node resources. This happens when a node's resource usage is updated.
///
/// \param node_id_string ID of the node whose resources need to be updated.
/// \param resource_data The node resource data.
virtual bool UpdateNode(const std::string &node_id_string,
const rpc::ResourcesData &resource_data) = 0;
/// Update the capacity of a resource on a given node.
///
/// \param node_name: Node whose resource we want to update.
/// \param resource_name: Resource which we want to update.
/// \param resource_total: New capacity of the resource.
virtual void UpdateResourceCapacity(const std::string &node_id_string,
const std::string &resource_name,
double resource_total) = 0;
/// Delete a given resource from a given node.
///
/// \param node_name: Node whose resource we want to delete.
/// \param resource_name: Resource we want to delete
virtual void DeleteResource(const std::string &node_id_string,
const std::string &resource_name) = 0;
/// Update the local cache of the last reported resources from the GCS cache;
/// this is needed when the GCS fails over.
///
/// \param gcs_resources: The remote cache from gcs.
virtual void UpdateLastResourceUsage(
std::shared_ptr<SchedulingResources> gcs_resources) {}
/// Populate the relevant parts of the heartbeat table. This is intended for
/// sending raylet <-> gcs heartbeats. In particular, this should fill in
/// resources_available and resources_total.
///
/// \param Output parameter. `resources_available` and `resources_total` are the only
/// fields used.
virtual void FillResourceUsage(std::shared_ptr<rpc::ResourcesData> data) = 0;
/// Return local resources in human-readable string form.
virtual std::string GetLocalResourceViewString() const = 0;
};
} // namespace ray
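Illustrative only, and not part of this commit: a minimal no-op class showing the surface a concrete scheduler has to implement. UpdateLastResourceUsage already has a default empty body, so it is omitted here.
class NoopClusterResourceScheduler : public ClusterResourceSchedulerInterface {
 public:
  bool RemoveNode(const std::string &node_id_string) override { return false; }
  bool UpdateNode(const std::string &node_id_string,
                  const rpc::ResourcesData &resource_data) override { return false; }
  void UpdateResourceCapacity(const std::string &node_id_string,
                              const std::string &resource_name,
                              double resource_total) override {}
  void DeleteResource(const std::string &node_id_string,
                      const std::string &resource_name) override {}
  void FillResourceUsage(std::shared_ptr<rpc::ResourcesData> data) override {}
  std::string GetLocalResourceViewString() const override { return ""; }
};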


@ -18,7 +18,9 @@ ClusterTaskManager::ClusterTaskManager(
TaskDependencyManagerInterface &task_dependency_manager,
std::function<bool(const WorkerID &, const NodeID &)> is_owner_alive,
NodeInfoGetter get_node_info,
- std::function<void(const Task &)> announce_infeasible_task)
+ std::function<void(const Task &)> announce_infeasible_task,
+ WorkerPoolInterface &worker_pool,
+ std::unordered_map<WorkerID, std::shared_ptr<WorkerInterface>> &leased_workers)
: self_node_id_(self_node_id),
cluster_resource_scheduler_(cluster_resource_scheduler),
task_dependency_manager_(task_dependency_manager),
@ -27,7 +29,9 @@ ClusterTaskManager::ClusterTaskManager(
announce_infeasible_task_(announce_infeasible_task),
max_resource_shapes_per_load_report_(
RayConfig::instance().max_resource_shapes_per_load_report()),
- report_worker_backlog_(RayConfig::instance().report_worker_backlog()) {}
+ report_worker_backlog_(RayConfig::instance().report_worker_backlog()),
+ worker_pool_(worker_pool),
+ leased_workers_(leased_workers) {}
bool ClusterTaskManager::SchedulePendingTasks() {
// Always try to schedule infeasible tasks in case they are now feasible.
@ -150,7 +154,7 @@ void ClusterTaskManager::DispatchScheduledTasksToWorkers(
continue;
}
- std::shared_ptr<WorkerInterface> worker = worker_pool.PopWorker(spec);
+ std::shared_ptr<WorkerInterface> worker = worker_pool_.PopWorker(spec);
if (!worker) {
// No worker available, we won't be able to schedule any kind of task.
return;
@ -163,7 +167,7 @@ void ClusterTaskManager::DispatchScheduledTasksToWorkers(
if (!spec.IsDetachedActor() && !is_owner_alive_(owner_worker_id, owner_node_id)) {
RAY_LOG(WARNING) << "Task: " << task.GetTaskSpecification().TaskId()
<< "'s caller is no longer running. Cancelling task.";
- worker_pool.PushWorker(worker);
+ worker_pool_.PushWorker(worker);
if (!spec.GetDependencies().empty()) {
task_dependency_manager_.RemoveTaskDependencies(
task.GetTaskSpecification().TaskId());
@ -175,9 +179,9 @@ void ClusterTaskManager::DispatchScheduledTasksToWorkers(
if (worker_leased) {
auto reply = std::get<1>(*work_it);
auto callback = std::get<2>(*work_it);
- Dispatch(worker, leased_workers, task, reply, callback);
+ Dispatch(worker, leased_workers_, task, reply, callback);
} else {
- worker_pool.PushWorker(worker);
+ worker_pool_.PushWorker(worker);
}
if (remove) {
if (!spec.GetDependencies().empty()) {
@ -242,10 +246,14 @@ bool ClusterTaskManager::AttemptDispatchWork(const Work &work,
return dispatched;
}
- void ClusterTaskManager::QueueTask(const Task &task, rpc::RequestWorkerLeaseReply *reply,
- std::function<void(void)> callback) {
- RAY_LOG(DEBUG) << "Queuing task " << task.GetTaskSpecification().TaskId();
- Work work = std::make_tuple(task, reply, callback);
+ void ClusterTaskManager::QueueAndScheduleTask(
+ const Task &task, rpc::RequestWorkerLeaseReply *reply,
+ rpc::SendReplyCallback send_reply_callback) {
+ RAY_LOG(DEBUG) << "Queuing and scheduling task "
+ << task.GetTaskSpecification().TaskId();
+ Work work = std::make_tuple(task, reply, [send_reply_callback] {
+ send_reply_callback(Status::OK(), nullptr, nullptr);
+ });
const auto &scheduling_class = task.GetTaskSpecification().GetSchedulingClass();
// If the scheduling class is infeasible, just add the work to the infeasible queue
// directly.
@ -255,9 +263,19 @@ void ClusterTaskManager::QueueTask(const Task &task, rpc::RequestWorkerLeaseRepl
tasks_to_schedule_[scheduling_class].push_back(work);
}
AddToBacklogTracker(task);
+ ScheduleAndDispatchTasks();
}
- void ClusterTaskManager::TasksUnblocked(const std::vector<TaskID> ready_ids) {
+ void ClusterTaskManager::ScheduleInfeasibleTasks() {
// Do nothing.
// TODO(Shanly): This method will be removed once we remove the legacy scheduler.
}
void ClusterTaskManager::TasksUnblocked(const std::vector<TaskID> &ready_ids) {
if (ready_ids.empty()) {
return;
}
for (const auto &task_id : ready_ids) {
auto it = waiting_tasks_.find(task_id);
if (it != waiting_tasks_.end()) {
@ -270,14 +288,22 @@ void ClusterTaskManager::TasksUnblocked(const std::vector<TaskID> ready_ids) {
waiting_tasks_.erase(it);
}
}
+ ScheduleAndDispatchTasks();
}
- void ClusterTaskManager::HandleTaskFinished(std::shared_ptr<WorkerInterface> worker) {
- cluster_resource_scheduler_->FreeLocalTaskResources(worker->GetAllocatedInstances());
- worker->ClearAllocatedInstances();
- cluster_resource_scheduler_->FreeLocalTaskResources(
- worker->GetLifetimeAllocatedInstances());
- worker->ClearLifetimeAllocatedInstances();
+ void ClusterTaskManager::TaskFinished(std::shared_ptr<WorkerInterface> worker,
+ Task *task) {
+ RAY_CHECK(worker != nullptr && task != nullptr);
+ *task = worker->GetAssignedTask();
+ if (worker->GetAllocatedInstances() != nullptr) {
+ ReleaseWorkerResources(worker);
}
}
void ClusterTaskManager::ReturnWorkerResources(std::shared_ptr<WorkerInterface> worker) {
// TODO(Shanly): This method will be removed and can be replaced by
// `ReleaseWorkerResources` directly once we remove the legacy scheduler.
ReleaseWorkerResources(worker);
}
void ReplyCancelled(Work &work) {
@ -394,8 +420,7 @@ void ClusterTaskManager::FillPendingActorInfo(rpc::GetNodeStatsReply *reply) con
}
}
- void ClusterTaskManager::FillResourceUsage(
- std::shared_ptr<rpc::ResourcesData> data) const {
+ void ClusterTaskManager::FillResourceUsage(std::shared_ptr<rpc::ResourcesData> data) {
if (max_resource_shapes_per_load_report_ == 0) {
return;
}
@ -568,7 +593,7 @@ bool ClusterTaskManager::AnyPendingTasks(Task *exemplar, bool *any_pending,
int *num_pending_actor_creation,
int *num_pending_tasks) const {
// We are guaranteed that these tasks are blocked waiting for resources after a
- // call to ScheduleAndDispatch(). They may be waiting for workers as well, but
+ // call to ScheduleAndDispatchTasks(). They may be waiting for workers as well, but
// this should be a transient condition only.
for (const auto &shapes_it : boost::join(tasks_to_dispatch_, tasks_to_schedule_)) {
auto &work_queue = shapes_it.second;
@ -590,7 +615,9 @@ bool ClusterTaskManager::AnyPendingTasks(Task *exemplar, bool *any_pending,
return *any_pending;
}
- std::string ClusterTaskManager::DebugString() const {
+ std::string ClusterTaskManager::DebugStr() const {
+ // TODO(Shanly): This method will be replaced with `DebugString` once we remove the
+ // legacy scheduler.
std::stringstream buffer;
buffer << "========== Node: " << self_node_id_ << " =================\n";
buffer << "Schedule queue length: " << tasks_to_schedule_.size() << "\n";
@ -747,5 +774,74 @@ void ClusterTaskManager::RemoveFromBacklogTracker(const Task &task) {
}
}
void ClusterTaskManager::ReleaseWorkerResources(std::shared_ptr<WorkerInterface> worker) {
RAY_CHECK(worker != nullptr);
cluster_resource_scheduler_->ReleaseWorkerResources(worker->GetAllocatedInstances());
worker->ClearAllocatedInstances();
cluster_resource_scheduler_->ReleaseWorkerResources(
worker->GetLifetimeAllocatedInstances());
worker->ClearLifetimeAllocatedInstances();
}
bool ClusterTaskManager::ReleaseCpuResourcesFromUnblockedWorker(
std::shared_ptr<WorkerInterface> worker) {
if (!worker || worker->IsBlocked()) {
return false;
}
if (worker->GetAllocatedInstances() != nullptr) {
auto cpu_instances = worker->GetAllocatedInstances()->GetCPUInstancesDouble();
if (cpu_instances.size() > 0) {
std::vector<double> overflow_cpu_instances =
cluster_resource_scheduler_->AddCPUResourceInstances(cpu_instances);
for (unsigned int i = 0; i < overflow_cpu_instances.size(); i++) {
RAY_CHECK(overflow_cpu_instances[i] == 0) << "Should not be overflow";
}
return true;
}
}
return false;
}
bool ClusterTaskManager::ReturnCpuResourcesToBlockedWorker(
std::shared_ptr<WorkerInterface> worker) {
if (!worker || !worker->IsBlocked()) {
return false;
}
if (worker->GetAllocatedInstances() != nullptr) {
auto cpu_instances = worker->GetAllocatedInstances()->GetCPUInstancesDouble();
if (cpu_instances.size() > 0) {
// Important: we allow going negative here, since otherwise you can use infinite
// CPU resources by repeatedly blocking / unblocking a task. By allowing it to go
// negative, at most one task can "borrow" this worker's resources.
cluster_resource_scheduler_->SubtractCPUResourceInstances(
cpu_instances, /*allow_going_negative=*/true);
return true;
}
}
return false;
}
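A hedged sketch of the call pattern these two helpers are designed for (the handler names and the MarkBlocked/MarkUnblocked calls are assumptions for illustration, not part of this diff):
void OnTaskBlocked(const std::shared_ptr<WorkerInterface> &worker) {
  // The task entered ray.get/ray.wait: hand its CPUs back to the scheduler so
  // another queued task can use them while this one is blocked.
  if (ReleaseCpuResourcesFromUnblockedWorker(worker)) {
    worker->MarkBlocked();
  }
  ScheduleAndDispatchTasks();
}
void OnTaskUnblocked(const std::shared_ptr<WorkerInterface> &worker) {
  // The task resumed: borrow the CPUs back, possibly driving availability
  // negative as described above, and clear the blocked flag.
  if (ReturnCpuResourcesToBlockedWorker(worker)) {
    worker->MarkUnblocked();
  }
}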
void ClusterTaskManager::ScheduleAndDispatchTasks() {
SchedulePendingTasks();
DispatchScheduledTasksToWorkers(worker_pool_, leased_workers_);
}
void ClusterTaskManager::OnNodeResourceUsageUpdated(
const NodeID &node_id, const rpc::ResourcesData &resource_data) {
// TODO(Shanly): This method will be removed and can be replaced by
// `ScheduleAndDispatchTasks` directly once we remove the legacy scheduler.
ScheduleAndDispatchTasks();
}
void ClusterTaskManager::OnObjectMissing(const ObjectID &object_id,
const std::vector<TaskID> &waiting_task_ids) {
// We don't need to do anything if the new scheduler is enabled because tasks
// will get moved back to waiting once they reach the front of the dispatch
// queue.
// TODO(Shanly): This method will be removed once we remove the legacy scheduler.
}
} // namespace raylet
} // namespace ray


@ -6,6 +6,7 @@
#include "ray/common/task/task_common.h" #include "ray/common/task/task_common.h"
#include "ray/raylet/dependency_manager.h" #include "ray/raylet/dependency_manager.h"
#include "ray/raylet/scheduling/cluster_resource_scheduler.h" #include "ray/raylet/scheduling/cluster_resource_scheduler.h"
#include "ray/raylet/scheduling/cluster_task_manager_interface.h"
#include "ray/raylet/worker.h" #include "ray/raylet/worker.h"
#include "ray/raylet/worker_pool.h" #include "ray/raylet/worker_pool.h"
#include "ray/rpc/grpc_client.h" #include "ray/rpc/grpc_client.h"
@ -37,7 +38,7 @@ typedef std::function<boost::optional<rpc::GcsNodeInfo>(const NodeID &node_id)>
/// there is a new worker which can dispatch the tasks.
/// 5. When a worker finishes executing its task(s), the requester will return
/// it and we should release the resources in our view of the node's state.
- class ClusterTaskManager {
+ class ClusterTaskManager : public ClusterTaskManagerInterface {
public:
/// fullfills_dependencies_func Should return if all dependencies are
/// fulfilled and unsubscribe from dependencies only if they're fulfilled. If
@ -51,13 +52,126 @@ class ClusterTaskManager {
/// \param is_owner_alive: A callback which returns if the owner process is alive
/// (according to our ownership model).
/// \param gcs_client: A gcs client.
- ClusterTaskManager(const NodeID &self_node_id,
- std::shared_ptr<ClusterResourceScheduler> cluster_resource_scheduler,
- TaskDependencyManagerInterface &task_dependency_manager,
- std::function<bool(const WorkerID &, const NodeID &)> is_owner_alive,
- NodeInfoGetter get_node_info,
- std::function<void(const Task &)> announce_infeasible_task);
+ ClusterTaskManager(
+ const NodeID &self_node_id,
+ std::shared_ptr<ClusterResourceScheduler> cluster_resource_scheduler,
+ TaskDependencyManagerInterface &task_dependency_manager,
+ std::function<bool(const WorkerID &, const NodeID &)> is_owner_alive,
+ NodeInfoGetter get_node_info,
+ std::function<void(const Task &)> announce_infeasible_task,
+ WorkerPoolInterface &worker_pool,
+ std::unordered_map<WorkerID, std::shared_ptr<WorkerInterface>> &leased_workers);
/// (Step 1) Queue tasks and schedule.
/// Queue a task and schedule it. This happens when processing a worker lease request.
///
/// \param task: The incoming task to be queued and scheduled.
/// \param reply: The reply of the lease request.
/// \param send_reply_callback: The function used during dispatching.
void QueueAndScheduleTask(const Task &task, rpc::RequestWorkerLeaseReply *reply,
rpc::SendReplyCallback send_reply_callback) override;
/// Schedule infeasible tasks.
void ScheduleInfeasibleTasks() override;
/// Move tasks from waiting to ready for dispatch. Called when a task's
/// dependencies are resolved.
///
/// \param ready_ids: The tasks which are now ready to be dispatched.
void TasksUnblocked(const std::vector<TaskID> &ready_ids) override;
/// Return the finished task and release the worker resources.
/// This method will be removed and can be replaced by `ReleaseWorkerResources` directly
/// once we remove the legacy scheduler.
///
/// \param worker: The worker which was running the task.
/// \param task: Output parameter.
void TaskFinished(std::shared_ptr<WorkerInterface> worker, Task *task) override;
/// Return worker resources.
/// This method will be removed and can be replaced by `ReleaseWorkerResources` directly
/// once we remove the legacy scheduler.
///
/// \param worker: The worker which was running the task.
void ReturnWorkerResources(std::shared_ptr<WorkerInterface> worker) override;
/// Attempt to cancel an already queued task.
///
/// \param task_id: The id of the task to remove.
///
/// \return True if task was successfully removed. This function will return
/// false if the task is already running.
bool CancelTask(const TaskID &task_id) override;
/// Populate the list of pending or infeasible actor tasks for node stats.
///
/// \param Output parameter.
void FillPendingActorInfo(rpc::GetNodeStatsReply *reply) const override;
/// Populate the relevant parts of the heartbeat table. This is intended for
/// sending resource usage of raylet to gcs. In particular, this should fill in
/// resource_load and resource_load_by_shape.
///
/// \param Output parameter. `resource_load` and `resource_load_by_shape` are the only
/// fields used.
void FillResourceUsage(std::shared_ptr<rpc::ResourcesData> data) override;
/// Return if any tasks are pending resource acquisition.
///
/// \param[in] exemplar An example task that is deadlocking.
/// \param[in] num_pending_actor_creation Number of pending actor creation tasks.
/// \param[in] num_pending_tasks Number of pending tasks.
/// \param[in] any_pending True if there's any pending exemplar.
/// \return True if any tasks are pending.
bool AnyPendingTasks(Task *exemplar, bool *any_pending, int *num_pending_actor_creation,
int *num_pending_tasks) const override;
/// (Step 5) Call once a task finishes (i.e. a worker is returned).
///
/// \param worker: The worker which was running the task.
void ReleaseWorkerResources(std::shared_ptr<WorkerInterface> worker) override;
/// When a task is blocked in ray.get or ray.wait, the worker who is executing the task
/// should give up the CPU resources allocated for the running task for the time being
/// and the worker itself should also be marked as blocked.
///
/// \param worker The worker who will give up the CPU resources.
/// \return true if the cpu resources of the specified worker are released successfully,
/// else false.
bool ReleaseCpuResourcesFromUnblockedWorker(
std::shared_ptr<WorkerInterface> worker) override;
/// When a task is no longer blocked in a ray.get or ray.wait, the CPU resources that
/// the worker gave up should be returned to it.
///
/// \param worker The blocked worker.
/// \return true if the cpu resources are returned back to the specified worker, else
/// false.
bool ReturnCpuResourcesToBlockedWorker(
std::shared_ptr<WorkerInterface> worker) override;
// Schedule and dispatch tasks.
void ScheduleAndDispatchTasks() override;
/// Handle the resource usage updated event of the specified node.
///
/// \param node_id ID of the node which resources are updated.
/// \param resource_data The node resources.
void OnNodeResourceUsageUpdated(const NodeID &node_id,
const rpc::ResourcesData &resource_data) override;
/// Handle the object missing event.
///
/// \param object_id ID of the missing object.
/// \param waiting_task_ids IDs of tasks that are waiting for the specified missing
/// object.
void OnObjectMissing(const ObjectID &object_id,
const std::vector<TaskID> &waiting_task_ids) override;
/// The helper to dump the debug state of the cluster task manager.
std::string DebugStr() const override;
private:
/// (Step 2) For each task in tasks_to_schedule_, pick a node in the system
/// (local or remote) that has enough resources available to run the task, if
/// any such node exist. Skip tasks which are not schedulable.
@ -72,65 +186,10 @@ class ClusterTaskManager {
/// If there are not enough resources locally, up to one task per resource
/// shape (the task at the head of the queue) will get spilled back to a
/// different node.
///
/// \param worker_pool: The pool of workers which will be dispatched to.
/// `worker_pool` state will be modified (idle workers will be popped) during
/// dispatching.
void DispatchScheduledTasksToWorkers(
WorkerPoolInterface &worker_pool,
std::unordered_map<WorkerID, std::shared_ptr<WorkerInterface>> &leased_workers);
/// (Step 1) Queue tasks for scheduling.
/// \param fn: The function used during dispatching.
/// \param task: The incoming task to schedule.
void QueueTask(const Task &task, rpc::RequestWorkerLeaseReply *reply,
std::function<void(void)>);
/// Move tasks from waiting to ready for dispatch. Called when a task's
/// dependencies are resolved.
///
/// \param readyIds: The tasks which are now ready to be dispatched.
void TasksUnblocked(const std::vector<TaskID> ready_ids);
/// (Step 5) Call once a task finishes (i.e. a worker is returned).
///
/// \param worker: The worker which was running the task.
void HandleTaskFinished(std::shared_ptr<WorkerInterface> worker);
/// Attempt to cancel an already queued task.
///
/// \param task_id: The id of the task to remove.
///
/// \return True if task was successfully removed. This function will return
/// false if the task is already running.
bool CancelTask(const TaskID &task_id);
/// Populate the list of pending or infeasible actor tasks for node stats.
///
/// \param Output parameter.
void FillPendingActorInfo(rpc::GetNodeStatsReply *reply) const;
/// Populate the relevant parts of the heartbeat table. This is intended for
/// sending resource usage of raylet to gcs. In particular, this should fill in
/// resource_load and resource_load_by_shape.
///
/// \param Output parameter. `resource_load` and `resource_load_by_shape` are the only
/// fields used.
void FillResourceUsage(std::shared_ptr<rpc::ResourcesData> data) const;
/// Return if any tasks are pending resource acquisition.
///
/// \param[in] exemplar An example task that is deadlocking.
/// \param[in] num_pending_actor_creation Number of pending actor creation tasks.
/// \param[in] num_pending_tasks Number of pending tasks.
/// \param[in] any_pending True if there's any pending exemplar.
/// \return True if any progress is any tasks are pending.
bool AnyPendingTasks(Task *exemplar, bool *any_pending, int *num_pending_actor_creation,
int *num_pending_tasks) const;
std::string DebugString() const;
private:
/// Helper method to try dispatching a single task from the queue to an
/// available worker. Returns whether the task should be removed from the
/// queue and whether the worker was successfully leased to execute the work.
@ -184,6 +243,11 @@ class ClusterTaskManager {
/// Track the cumulative backlog of all workers requesting a lease to this raylet.
std::unordered_map<SchedulingClass, int> backlog_tracker_;
/// TODO(Shanly): Remove `worker_pool_` and `leased_workers_` and make them as
/// parameters of methods if necessary once we remove the legacy scheduler.
WorkerPoolInterface &worker_pool_;
std::unordered_map<WorkerID, std::shared_ptr<WorkerInterface>> &leased_workers_;
/// Determine whether a task should be immediately dispatched,
/// or placed on a wait queue.
///


@ -0,0 +1,140 @@
// Copyright 2017 The Ray Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include "ray/raylet/worker.h"
#include "ray/rpc/server_call.h"
#include "src/ray/protobuf/node_manager.pb.h"
namespace ray {
namespace raylet {
class ClusterTaskManagerInterface {
public:
virtual ~ClusterTaskManagerInterface() = default;
/// Return the resources that were being used by this worker.
virtual void ReleaseWorkerResources(std::shared_ptr<WorkerInterface> worker) = 0;
/// When a task is blocked in ray.get or ray.wait, the worker who is executing the task
/// should give up the CPU resources allocated for the running task for the time being
/// and the worker itself should also be marked as blocked.
///
/// \param worker The worker who will give up the CPU resources.
/// \return true if the cpu resources of the specified worker are released successfully,
/// else false.
virtual bool ReleaseCpuResourcesFromUnblockedWorker(
std::shared_ptr<WorkerInterface> worker) = 0;
/// When a task is no longer blocked in a ray.get or ray.wait, the CPU resources that
/// the worker gave up should be returned to it.
///
/// \param worker The blocked worker.
/// \return true if the cpu resources are returned back to the specified worker, else
/// false.
virtual bool ReturnCpuResourcesToBlockedWorker(
std::shared_ptr<WorkerInterface> worker) = 0;
// Schedule and dispatch tasks.
virtual void ScheduleAndDispatchTasks() = 0;
/// Move tasks from waiting to ready for dispatch. Called when a task's
/// dependencies are resolved.
///
/// \param ready_ids: The tasks which are now ready to be dispatched.
virtual void TasksUnblocked(const std::vector<TaskID> &ready_ids) = 0;
/// Populate the relevant parts of the heartbeat table. This is intended for
/// sending raylet <-> gcs heartbeats. In particular, this should fill in
/// resource_load and resource_load_by_shape.
///
/// \param Output parameter. `resource_load` and `resource_load_by_shape` are the only
/// fields used.
virtual void FillResourceUsage(std::shared_ptr<rpc::ResourcesData> data) = 0;
/// Populate the list of pending or infeasible actor tasks for node stats.
///
/// \param Output parameter.
virtual void FillPendingActorInfo(rpc::GetNodeStatsReply *reply) const = 0;
/// Return the finished task and release the worker resources.
/// This method will be removed and can be replaced by `ReleaseWorkerResources` directly
/// once we remove the legacy scheduler.
///
/// \param worker: The worker which was running the task.
/// \param task: Output parameter.
virtual void TaskFinished(std::shared_ptr<WorkerInterface> worker, Task *task) = 0;
/// Return worker resources.
/// This method will be removed and can be replaced by `ReleaseWorkerResources` directly
/// once we remove the legacy scheduler
///
/// \param worker: The worker which was running the task.
virtual void ReturnWorkerResources(std::shared_ptr<WorkerInterface> worker) = 0;
/// Attempt to cancel an already queued task.
///
/// \param task_id: The id of the task to remove.
///
/// \return True if task was successfully removed. This function will return
/// false if the task is already running.
virtual bool CancelTask(const TaskID &task_id) = 0;
/// Queue a task and schedule it. This happens when processing a worker lease request.
///
/// \param task: The incoming task to be queued and scheduled.
/// \param reply: The reply of the lease request.
/// \param send_reply_callback: The function used during dispatching.
virtual void QueueAndScheduleTask(const Task &task, rpc::RequestWorkerLeaseReply *reply,
rpc::SendReplyCallback send_reply_callback) = 0;
/// Schedule infeasible tasks.
virtual void ScheduleInfeasibleTasks() = 0;
/// Return if any tasks are pending resource acquisition.
///
/// \param[in] exemplar An example task that is deadlocking.
/// \param[in] num_pending_actor_creation Number of pending actor creation tasks.
/// \param[in] num_pending_tasks Number of pending tasks.
/// \param[in] any_pending True if there's any pending exemplar.
/// \return True if any tasks are pending.
virtual bool AnyPendingTasks(Task *exemplar, bool *any_pending,
int *num_pending_actor_creation,
int *num_pending_tasks) const = 0;
/// Handle the resource usage updated event of the specified node.
///
/// \param node_id ID of the node which resources are updated.
/// \param resource_data The node resources.
virtual void OnNodeResourceUsageUpdated(const NodeID &node_id,
const rpc::ResourcesData &resource_data) = 0;
/// Handle the object missing event.
///
/// \param object_id ID of the missing object.
/// \param waiting_task_ids IDs of tasks that are waiting for the specified missing
/// object.
virtual void OnObjectMissing(const ObjectID &object_id,
const std::vector<TaskID> &waiting_task_ids) = 0;
/// The helper to dump the debug state of the cluster task manager.
///
/// Because the NodeManager also implements this interface and
/// `cluster_task_manager_->DebugString()` is invoked inside
/// `NodeManager::DebugString()`, keeping the same name would lead to an infinite
/// loop and a stack overflow, so the method is named `DebugStr` instead of
/// `DebugString` to avoid this.
virtual std::string DebugStr() const = 0;
};
} // namespace raylet
} // namespace ray
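The payoff of this interface, sketched here with simplified, non-verbatim code: NodeManager can hold a single ClusterTaskManagerInterface pointer and route scheduling calls through it, regardless of whether the new ClusterTaskManager or the legacy in-NodeManager implementation sits behind it. The member and handler shapes below are assumptions for illustration.
std::shared_ptr<ClusterTaskManagerInterface> cluster_task_manager_;
void HandleRequestWorkerLease(const rpc::RequestWorkerLeaseRequest &request,
                              rpc::RequestWorkerLeaseReply *reply,
                              rpc::SendReplyCallback send_reply_callback) {
  // Task construction shown roughly; the point is that the call site does not
  // care which scheduler implementation is plugged in.
  rpc::Task task_message;
  task_message.mutable_task_spec()->CopyFrom(request.resource_spec());
  Task task(task_message);
  cluster_task_manager_->QueueAndScheduleTask(task, reply, send_reply_callback);
}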


@ -129,7 +129,8 @@ class ClusterTaskManagerTest : public ::testing::Test {
node_info_calls_++;
return node_info_[node_id];
},
- [this](const Task &task) { announce_infeasible_task_calls_++; }) {}
+ [this](const Task &task) { announce_infeasible_task_calls_++; },
+ pool_, leased_workers_) {}
void SetUp() {}
@ -180,11 +181,12 @@ TEST_F(ClusterTaskManagerTest, BasicTest) {
rpc::RequestWorkerLeaseReply reply;
bool callback_occurred = false;
bool *callback_occurred_ptr = &callback_occurred;
- auto callback = [callback_occurred_ptr]() { *callback_occurred_ptr = true; };
+ auto callback = [callback_occurred_ptr](Status, std::function<void()>,
+ std::function<void()>) {
+ *callback_occurred_ptr = true;
+ };
- task_manager_.QueueTask(task, &reply, callback);
- task_manager_.SchedulePendingTasks();
- task_manager_.DispatchScheduledTasksToWorkers(pool_, leased_workers_);
+ task_manager_.QueueAndScheduleTask(task, &reply, callback);
ASSERT_FALSE(callback_occurred);
ASSERT_EQ(leased_workers_.size(), 0);
@ -194,7 +196,7 @@ TEST_F(ClusterTaskManagerTest, BasicTest) {
std::make_shared<MockWorker>(WorkerID::FromRandom(), 1234);
pool_.PushWorker(std::dynamic_pointer_cast<WorkerInterface>(worker));
- task_manager_.DispatchScheduledTasksToWorkers(pool_, leased_workers_);
+ task_manager_.ScheduleAndDispatchTasks();
ASSERT_TRUE(callback_occurred);
ASSERT_EQ(leased_workers_.size(), 1);
@ -214,11 +216,12 @@ TEST_F(ClusterTaskManagerTest, NoFeasibleNodeTest) {
bool callback_called = false;
bool *callback_called_ptr = &callback_called;
- auto callback = [callback_called_ptr]() { *callback_called_ptr = true; };
+ auto callback = [callback_called_ptr](Status, std::function<void()>,
+ std::function<void()>) {
+ *callback_called_ptr = true;
+ };
- task_manager_.QueueTask(task, &reply, callback);
- task_manager_.SchedulePendingTasks();
- task_manager_.DispatchScheduledTasksToWorkers(pool_, leased_workers_);
+ task_manager_.QueueAndScheduleTask(task, &reply, callback);
ASSERT_FALSE(callback_called);
ASSERT_EQ(leased_workers_.size(), 0);
@ -243,7 +246,8 @@ TEST_F(ClusterTaskManagerTest, ResourceTakenWhileResolving) {
rpc::RequestWorkerLeaseReply reply;
int num_callbacks = 0;
int *num_callbacks_ptr = &num_callbacks;
- auto callback = [num_callbacks_ptr]() {
+ auto callback = [num_callbacks_ptr](Status, std::function<void()>,
+ std::function<void()>) {
(*num_callbacks_ptr) = *num_callbacks_ptr + 1;
};
@ -252,9 +256,7 @@ TEST_F(ClusterTaskManagerTest, ResourceTakenWhileResolving) {
auto task = CreateTask({{ray::kCPU_ResourceLabel, 5}}, 1);
std::unordered_set<TaskID> expected_subscribed_tasks = {
task.GetTaskSpecification().TaskId()};
- task_manager_.QueueTask(task, &reply, callback);
- task_manager_.SchedulePendingTasks();
- task_manager_.DispatchScheduledTasksToWorkers(pool_, leased_workers_);
+ task_manager_.QueueAndScheduleTask(task, &reply, callback);
ASSERT_EQ(dependency_manager_.subscribed_tasks, expected_subscribed_tasks);
ASSERT_EQ(num_callbacks, 0);
@ -263,9 +265,7 @@ TEST_F(ClusterTaskManagerTest, ResourceTakenWhileResolving) {
/* This task can run */
auto task2 = CreateTask({{ray::kCPU_ResourceLabel, 5}});
- task_manager_.QueueTask(task2, &reply, callback);
- task_manager_.SchedulePendingTasks();
- task_manager_.DispatchScheduledTasksToWorkers(pool_, leased_workers_);
+ task_manager_.QueueAndScheduleTask(task2, &reply, callback);
ASSERT_EQ(dependency_manager_.subscribed_tasks, expected_subscribed_tasks);
ASSERT_EQ(num_callbacks, 1);
@ -277,8 +277,6 @@ TEST_F(ClusterTaskManagerTest, ResourceTakenWhileResolving) {
auto id = task.GetTaskSpecification().TaskId();
std::vector<TaskID> unblocked = {id};
task_manager_.TasksUnblocked(unblocked);
- task_manager_.SchedulePendingTasks();
- task_manager_.DispatchScheduledTasksToWorkers(pool_, leased_workers_);
ASSERT_EQ(dependency_manager_.subscribed_tasks, expected_subscribed_tasks);
ASSERT_EQ(num_callbacks, 1);
@ -287,10 +285,9 @@ TEST_F(ClusterTaskManagerTest, ResourceTakenWhileResolving) {
/* Second task finishes, making space for the original task */
leased_workers_.clear();
- task_manager_.HandleTaskFinished(worker);
- task_manager_.SchedulePendingTasks();
- task_manager_.DispatchScheduledTasksToWorkers(pool_, leased_workers_);
+ task_manager_.ReleaseWorkerResources(worker);
+ task_manager_.ScheduleAndDispatchTasks();
ASSERT_TRUE(dependency_manager_.subscribed_tasks.empty());
// Task2 is now done so task can run.
@ -313,14 +310,14 @@ TEST_F(ClusterTaskManagerTest, TestSpillAfterAssigned) {
AddNode(remote_node_id, 5); AddNode(remote_node_id, 5);
int num_callbacks = 0; int num_callbacks = 0;
auto callback = [&]() { num_callbacks++; }; auto callback = [&](Status, std::function<void()>, std::function<void()>) {
num_callbacks++;
};
/* Blocked on starting a worker. */ /* Blocked on starting a worker. */
auto task = CreateTask({{ray::kCPU_ResourceLabel, 5}}); auto task = CreateTask({{ray::kCPU_ResourceLabel, 5}});
rpc::RequestWorkerLeaseReply local_reply; rpc::RequestWorkerLeaseReply local_reply;
task_manager_.QueueTask(task, &local_reply, callback); task_manager_.QueueAndScheduleTask(task, &local_reply, callback);
task_manager_.SchedulePendingTasks();
task_manager_.DispatchScheduledTasksToWorkers(pool_, leased_workers_);
ASSERT_EQ(num_callbacks, 0); ASSERT_EQ(num_callbacks, 0);
ASSERT_EQ(leased_workers_.size(), 0); ASSERT_EQ(leased_workers_.size(), 0);
@ -328,9 +325,7 @@ TEST_F(ClusterTaskManagerTest, TestSpillAfterAssigned) {
/* This task can run but not at the same time as the first */ /* This task can run but not at the same time as the first */
auto task2 = CreateTask({{ray::kCPU_ResourceLabel, 5}}); auto task2 = CreateTask({{ray::kCPU_ResourceLabel, 5}});
rpc::RequestWorkerLeaseReply spillback_reply; rpc::RequestWorkerLeaseReply spillback_reply;
task_manager_.QueueTask(task2, &spillback_reply, callback); task_manager_.QueueAndScheduleTask(task2, &spillback_reply, callback);
task_manager_.SchedulePendingTasks();
task_manager_.DispatchScheduledTasksToWorkers(pool_, leased_workers_);
ASSERT_EQ(num_callbacks, 0); ASSERT_EQ(num_callbacks, 0);
ASSERT_EQ(leased_workers_.size(), 0); ASSERT_EQ(leased_workers_.size(), 0);
@ -339,7 +334,7 @@ TEST_F(ClusterTaskManagerTest, TestSpillAfterAssigned) {
// longer available for the second. // longer available for the second.
pool_.PushWorker(std::dynamic_pointer_cast<WorkerInterface>(worker)); pool_.PushWorker(std::dynamic_pointer_cast<WorkerInterface>(worker));
pool_.PushWorker(std::dynamic_pointer_cast<WorkerInterface>(worker)); pool_.PushWorker(std::dynamic_pointer_cast<WorkerInterface>(worker));
task_manager_.DispatchScheduledTasksToWorkers(pool_, leased_workers_); task_manager_.ScheduleAndDispatchTasks();
// Check that both tasks got removed from the queue. // Check that both tasks got removed from the queue.
ASSERT_EQ(num_callbacks, 2); ASSERT_EQ(num_callbacks, 2);
// The first task was dispatched. // The first task was dispatched.
@ -353,47 +348,33 @@ TEST_F(ClusterTaskManagerTest, TestSpillAfterAssigned) {
TEST_F(ClusterTaskManagerTest, TaskCancellationTest) { TEST_F(ClusterTaskManagerTest, TaskCancellationTest) {
std::shared_ptr<MockWorker> worker = std::shared_ptr<MockWorker> worker =
std::make_shared<MockWorker>(WorkerID::FromRandom(), 1234); std::make_shared<MockWorker>(WorkerID::FromRandom(), 1234);
pool_.PushWorker(std::dynamic_pointer_cast<WorkerInterface>(worker));
Task task = CreateTask({{ray::kCPU_ResourceLabel, 1}}); Task task = CreateTask({{ray::kCPU_ResourceLabel, 1}});
rpc::RequestWorkerLeaseReply reply; rpc::RequestWorkerLeaseReply reply;
bool callback_called = false; bool callback_called = false;
bool *callback_called_ptr = &callback_called; bool *callback_called_ptr = &callback_called;
auto callback = [callback_called_ptr]() { *callback_called_ptr = true; }; auto callback = [callback_called_ptr](Status, std::function<void()>,
std::function<void()>) {
*callback_called_ptr = true;
};
// Task not queued so we can't cancel it. // Task not queued so we can't cancel it.
ASSERT_FALSE(task_manager_.CancelTask(task.GetTaskSpecification().TaskId())); ASSERT_FALSE(task_manager_.CancelTask(task.GetTaskSpecification().TaskId()));
task_manager_.QueueTask(task, &reply, callback); task_manager_.QueueAndScheduleTask(task, &reply, callback);
// Task is now queued so cancellation works. // Task is now in dispatch queue.
callback_called = false; callback_called = false;
reply.Clear(); reply.Clear();
ASSERT_TRUE(task_manager_.CancelTask(task.GetTaskSpecification().TaskId())); ASSERT_TRUE(task_manager_.CancelTask(task.GetTaskSpecification().TaskId()));
task_manager_.DispatchScheduledTasksToWorkers(pool_, leased_workers_); task_manager_.ScheduleAndDispatchTasks();
// Task will not execute. // Task will not execute.
ASSERT_TRUE(callback_called); ASSERT_TRUE(callback_called);
ASSERT_TRUE(reply.canceled()); ASSERT_TRUE(reply.canceled());
ASSERT_EQ(leased_workers_.size(), 0); ASSERT_EQ(leased_workers_.size(), 0);
ASSERT_EQ(pool_.workers.size(), 1);
task_manager_.QueueTask(task, &reply, callback); pool_.PushWorker(std::dynamic_pointer_cast<WorkerInterface>(worker));
task_manager_.SchedulePendingTasks(); task_manager_.QueueAndScheduleTask(task, &reply, callback);
// We can still cancel the task if it's on the dispatch queue.
callback_called = false;
reply.Clear();
ASSERT_TRUE(task_manager_.CancelTask(task.GetTaskSpecification().TaskId()));
// Task will not execute.
ASSERT_TRUE(reply.canceled());
ASSERT_TRUE(callback_called);
ASSERT_EQ(leased_workers_.size(), 0);
ASSERT_EQ(pool_.workers.size(), 1);
task_manager_.QueueTask(task, &reply, callback);
task_manager_.SchedulePendingTasks();
task_manager_.DispatchScheduledTasksToWorkers(pool_, leased_workers_);
// Task is now running so we can't cancel it. // Task is now running so we can't cancel it.
callback_called = false; callback_called = false;
@ -418,14 +399,16 @@ TEST_F(ClusterTaskManagerTest, TaskCancelInfeasibleTask) {
bool callback_called = false; bool callback_called = false;
bool *callback_called_ptr = &callback_called; bool *callback_called_ptr = &callback_called;
auto callback = [callback_called_ptr]() { *callback_called_ptr = true; }; auto callback = [callback_called_ptr](Status, std::function<void()>,
std::function<void()>) {
*callback_called_ptr = true;
};
task_manager_.QueueTask(task, &reply, callback); task_manager_.QueueAndScheduleTask(task, &reply, callback);
// Task is now queued so cancellation works. // Task is now queued so cancellation works.
ASSERT_TRUE(task_manager_.CancelTask(task.GetTaskSpecification().TaskId())); ASSERT_TRUE(task_manager_.CancelTask(task.GetTaskSpecification().TaskId()));
task_manager_.SchedulePendingTasks(); task_manager_.ScheduleAndDispatchTasks();
task_manager_.DispatchScheduledTasksToWorkers(pool_, leased_workers_);
// Task will not execute. // Task will not execute.
ASSERT_TRUE(callback_called); ASSERT_TRUE(callback_called);
ASSERT_TRUE(reply.canceled()); ASSERT_TRUE(reply.canceled());
@ -436,8 +419,7 @@ TEST_F(ClusterTaskManagerTest, TaskCancelInfeasibleTask) {
// cancelled. // cancelled.
auto remote_node_id = NodeID::FromRandom(); auto remote_node_id = NodeID::FromRandom();
AddNode(remote_node_id, 12); AddNode(remote_node_id, 12);
task_manager_.SchedulePendingTasks(); task_manager_.ScheduleAndDispatchTasks();
task_manager_.DispatchScheduledTasksToWorkers(pool_, leased_workers_);
ASSERT_TRUE(callback_called); ASSERT_TRUE(callback_called);
ASSERT_TRUE(reply.canceled()); ASSERT_TRUE(reply.canceled());
ASSERT_EQ(leased_workers_.size(), 0); ASSERT_EQ(leased_workers_.size(), 0);
@ -456,11 +438,12 @@ TEST_F(ClusterTaskManagerTest, HeartbeatTest) {
bool callback_called = false; bool callback_called = false;
bool *callback_called_ptr = &callback_called; bool *callback_called_ptr = &callback_called;
auto callback = [callback_called_ptr]() { *callback_called_ptr = true; }; auto callback = [callback_called_ptr](Status, std::function<void()>,
std::function<void()>) {
*callback_called_ptr = true;
};
task_manager_.QueueTask(task, &reply, callback); task_manager_.QueueAndScheduleTask(task, &reply, callback);
task_manager_.SchedulePendingTasks();
task_manager_.DispatchScheduledTasksToWorkers(pool_, leased_workers_);
ASSERT_TRUE(callback_called); ASSERT_TRUE(callback_called);
// Now {CPU: 7, GPU: 4, MEM:128} // Now {CPU: 7, GPU: 4, MEM:128}
} }
@ -471,11 +454,12 @@ TEST_F(ClusterTaskManagerTest, HeartbeatTest) {
bool callback_called = false; bool callback_called = false;
bool *callback_called_ptr = &callback_called; bool *callback_called_ptr = &callback_called;
auto callback = [callback_called_ptr]() { *callback_called_ptr = true; }; auto callback = [callback_called_ptr](Status, std::function<void()>,
std::function<void()>) {
*callback_called_ptr = true;
};
task_manager_.QueueTask(task, &reply, callback); task_manager_.QueueAndScheduleTask(task, &reply, callback);
task_manager_.SchedulePendingTasks();
task_manager_.DispatchScheduledTasksToWorkers(pool_, leased_workers_);
ASSERT_FALSE(callback_called); // No worker available. ASSERT_FALSE(callback_called); // No worker available.
// Now {CPU: 7, GPU: 4, MEM:128} with 1 queued task. // Now {CPU: 7, GPU: 4, MEM:128} with 1 queued task.
} }
@ -486,11 +470,12 @@ TEST_F(ClusterTaskManagerTest, HeartbeatTest) {
bool callback_called = false; bool callback_called = false;
bool *callback_called_ptr = &callback_called; bool *callback_called_ptr = &callback_called;
auto callback = [callback_called_ptr]() { *callback_called_ptr = true; }; auto callback = [callback_called_ptr](Status, std::function<void()>,
std::function<void()>) {
*callback_called_ptr = true;
};
task_manager_.QueueTask(task, &reply, callback); task_manager_.QueueAndScheduleTask(task, &reply, callback);
task_manager_.SchedulePendingTasks();
task_manager_.DispatchScheduledTasksToWorkers(pool_, leased_workers_);
ASSERT_FALSE(callback_called); // Infeasible. ASSERT_FALSE(callback_called); // Infeasible.
// Now there is also an infeasible task {CPU: 9}. // Now there is also an infeasible task {CPU: 9}.
} }
@ -501,11 +486,12 @@ TEST_F(ClusterTaskManagerTest, HeartbeatTest) {
bool callback_called = false; bool callback_called = false;
bool *callback_called_ptr = &callback_called; bool *callback_called_ptr = &callback_called;
auto callback = [callback_called_ptr]() { *callback_called_ptr = true; }; auto callback = [callback_called_ptr](Status, std::function<void()>,
std::function<void()>) {
*callback_called_ptr = true;
};
task_manager_.QueueTask(task, &reply, callback); task_manager_.QueueAndScheduleTask(task, &reply, callback);
task_manager_.SchedulePendingTasks();
task_manager_.DispatchScheduledTasksToWorkers(pool_, leased_workers_);
ASSERT_FALSE(callback_called); // Infeasible. ASSERT_FALSE(callback_called); // Infeasible.
// Now there is also an infeasible task {CPU: 10}. // Now there is also an infeasible task {CPU: 10}.
} }
@ -563,7 +549,10 @@ TEST_F(ClusterTaskManagerTest, BacklogReportTest) {
rpc::RequestWorkerLeaseReply reply; rpc::RequestWorkerLeaseReply reply;
bool callback_occurred = false; bool callback_occurred = false;
bool *callback_occurred_ptr = &callback_occurred; bool *callback_occurred_ptr = &callback_occurred;
auto callback = [callback_occurred_ptr]() { *callback_occurred_ptr = true; }; auto callback = [callback_occurred_ptr](Status, std::function<void()>,
std::function<void()>) {
*callback_occurred_ptr = true;
};
std::shared_ptr<MockWorker> worker = std::shared_ptr<MockWorker> worker =
std::make_shared<MockWorker>(WorkerID::FromRandom(), 1234); std::make_shared<MockWorker>(WorkerID::FromRandom(), 1234);
@ -574,11 +563,9 @@ TEST_F(ClusterTaskManagerTest, BacklogReportTest) {
for (int i = 0; i < 10; i++) { for (int i = 0; i < 10; i++) {
Task task = CreateTask({{ray::kCPU_ResourceLabel, 100}}); Task task = CreateTask({{ray::kCPU_ResourceLabel, 100}});
task.SetBacklogSize(i); task.SetBacklogSize(i);
task_manager_.QueueTask(task, &reply, callback); task_manager_.QueueAndScheduleTask(task, &reply, callback);
to_cancel.push_back(task.GetTaskSpecification().TaskId()); to_cancel.push_back(task.GetTaskSpecification().TaskId());
} }
task_manager_.SchedulePendingTasks();
task_manager_.DispatchScheduledTasksToWorkers(pool_, leased_workers_);
ASSERT_FALSE(callback_occurred); ASSERT_FALSE(callback_occurred);
ASSERT_EQ(leased_workers_.size(), 0); ASSERT_EQ(leased_workers_.size(), 0);
@ -620,24 +607,24 @@ TEST_F(ClusterTaskManagerTest, OwnerDeadTest) {
rpc::RequestWorkerLeaseReply reply; rpc::RequestWorkerLeaseReply reply;
bool callback_occurred = false; bool callback_occurred = false;
bool *callback_occurred_ptr = &callback_occurred; bool *callback_occurred_ptr = &callback_occurred;
auto callback = [callback_occurred_ptr]() { *callback_occurred_ptr = true; }; auto callback = [callback_occurred_ptr](Status, std::function<void()>,
std::function<void()>) {
*callback_occurred_ptr = true;
};
std::shared_ptr<MockWorker> worker = std::shared_ptr<MockWorker> worker =
std::make_shared<MockWorker>(WorkerID::FromRandom(), 1234); std::make_shared<MockWorker>(WorkerID::FromRandom(), 1234);
pool_.PushWorker(std::dynamic_pointer_cast<WorkerInterface>(worker)); pool_.PushWorker(std::dynamic_pointer_cast<WorkerInterface>(worker));
task_manager_.QueueTask(task, &reply, callback);
task_manager_.SchedulePendingTasks();
is_owner_alive_ = false; is_owner_alive_ = false;
task_manager_.DispatchScheduledTasksToWorkers(pool_, leased_workers_); task_manager_.QueueAndScheduleTask(task, &reply, callback);
ASSERT_FALSE(callback_occurred); ASSERT_FALSE(callback_occurred);
ASSERT_EQ(leased_workers_.size(), 0); ASSERT_EQ(leased_workers_.size(), 0);
ASSERT_EQ(pool_.workers.size(), 1); ASSERT_EQ(pool_.workers.size(), 1);
is_owner_alive_ = true; is_owner_alive_ = true;
task_manager_.DispatchScheduledTasksToWorkers(pool_, leased_workers_); task_manager_.ScheduleAndDispatchTasks();
ASSERT_FALSE(callback_occurred); ASSERT_FALSE(callback_occurred);
ASSERT_EQ(leased_workers_.size(), 0); ASSERT_EQ(leased_workers_.size(), 0);
@ -653,20 +640,21 @@ TEST_F(ClusterTaskManagerTest, TestInfeasibleTaskWarning) {
Task task = CreateTask({{ray::kCPU_ResourceLabel, 12}}); Task task = CreateTask({{ray::kCPU_ResourceLabel, 12}});
rpc::RequestWorkerLeaseReply reply; rpc::RequestWorkerLeaseReply reply;
std::shared_ptr<bool> callback_occurred = std::make_shared<bool>(false); std::shared_ptr<bool> callback_occurred = std::make_shared<bool>(false);
auto callback = [callback_occurred]() { *callback_occurred = true; }; auto callback = [callback_occurred](Status, std::function<void()>,
task_manager_.QueueTask(task, &reply, callback); std::function<void()>) {
task_manager_.SchedulePendingTasks(); *callback_occurred = true;
};
task_manager_.QueueAndScheduleTask(task, &reply, callback);
ASSERT_EQ(announce_infeasible_task_calls_, 1); ASSERT_EQ(announce_infeasible_task_calls_, 1);
// Infeasible warning shouldn't be reprinted when the previous task is still infeasible // Infeasible warning shouldn't be reprinted when the previous task is still infeasible
// after adding a new node. // after adding a new node.
AddNode(NodeID::FromRandom(), 8); AddNode(NodeID::FromRandom(), 8);
task_manager_.SchedulePendingTasks();
std::shared_ptr<MockWorker> worker = std::shared_ptr<MockWorker> worker =
std::make_shared<MockWorker>(WorkerID::FromRandom(), 1234); std::make_shared<MockWorker>(WorkerID::FromRandom(), 1234);
pool_.PushWorker(std::dynamic_pointer_cast<WorkerInterface>(worker)); pool_.PushWorker(std::dynamic_pointer_cast<WorkerInterface>(worker));
task_manager_.ScheduleAndDispatchTasks();
// Task shouldn't be scheduled yet. // Task shouldn't be scheduled yet.
task_manager_.DispatchScheduledTasksToWorkers(pool_, leased_workers_);
ASSERT_EQ(announce_infeasible_task_calls_, 1); ASSERT_EQ(announce_infeasible_task_calls_, 1);
ASSERT_FALSE(*callback_occurred); ASSERT_FALSE(*callback_occurred);
ASSERT_EQ(leased_workers_.size(), 0); ASSERT_EQ(leased_workers_.size(), 0);
@ -676,8 +664,7 @@ TEST_F(ClusterTaskManagerTest, TestInfeasibleTaskWarning) {
// task is spilled back properly. // task is spilled back properly.
auto remote_node_id = NodeID::FromRandom(); auto remote_node_id = NodeID::FromRandom();
AddNode(remote_node_id, 12); AddNode(remote_node_id, 12);
task_manager_.SchedulePendingTasks(); task_manager_.ScheduleAndDispatchTasks();
task_manager_.DispatchScheduledTasksToWorkers(pool_, leased_workers_);
// Make sure nothing happens locally. // Make sure nothing happens locally.
ASSERT_EQ(announce_infeasible_task_calls_, 1); ASSERT_EQ(announce_infeasible_task_calls_, 1);
ASSERT_TRUE(*callback_occurred); ASSERT_TRUE(*callback_occurred);
@ -697,18 +684,22 @@ TEST_F(ClusterTaskManagerTest, TestMultipleInfeasibleTasksWarnOnce) {
Task task = CreateTask({{ray::kCPU_ResourceLabel, 12}}); Task task = CreateTask({{ray::kCPU_ResourceLabel, 12}});
rpc::RequestWorkerLeaseReply reply; rpc::RequestWorkerLeaseReply reply;
std::shared_ptr<bool> callback_occurred = std::make_shared<bool>(false); std::shared_ptr<bool> callback_occurred = std::make_shared<bool>(false);
auto callback = [callback_occurred]() { *callback_occurred = true; }; auto callback = [callback_occurred](Status, std::function<void()>,
task_manager_.QueueTask(task, &reply, callback); std::function<void()>) {
task_manager_.SchedulePendingTasks(); *callback_occurred = true;
};
task_manager_.QueueAndScheduleTask(task, &reply, callback);
ASSERT_EQ(announce_infeasible_task_calls_, 1); ASSERT_EQ(announce_infeasible_task_calls_, 1);
// Make sure an infeasible task of the same shape won't be announced again. // Make sure an infeasible task of the same shape won't be announced again.
Task task2 = CreateTask({{ray::kCPU_ResourceLabel, 12}}); Task task2 = CreateTask({{ray::kCPU_ResourceLabel, 12}});
rpc::RequestWorkerLeaseReply reply2; rpc::RequestWorkerLeaseReply reply2;
std::shared_ptr<bool> callback_occurred2 = std::make_shared<bool>(false); std::shared_ptr<bool> callback_occurred2 = std::make_shared<bool>(false);
auto callback2 = [callback_occurred2]() { *callback_occurred2 = true; }; auto callback2 = [callback_occurred2](Status, std::function<void()>,
task_manager_.QueueTask(task2, &reply2, callback2); std::function<void()>) {
task_manager_.SchedulePendingTasks(); *callback_occurred2 = true;
};
task_manager_.QueueAndScheduleTask(task2, &reply2, callback2);
ASSERT_EQ(announce_infeasible_task_calls_, 1); ASSERT_EQ(announce_infeasible_task_calls_, 1);
} }
@ -716,18 +707,19 @@ TEST_F(ClusterTaskManagerTest, TestAnyPendingTasks) {
/* /*
Check if the manager can correctly identify pending tasks. Check if the manager can correctly identify pending tasks.
*/ */
std::shared_ptr<MockWorker> worker =
std::make_shared<MockWorker>(WorkerID::FromRandom(), 1234);
pool_.PushWorker(std::dynamic_pointer_cast<WorkerInterface>(worker));
// task1: running // task1: running
Task task = CreateTask({{ray::kCPU_ResourceLabel, 6}}); Task task = CreateTask({{ray::kCPU_ResourceLabel, 6}});
rpc::RequestWorkerLeaseReply reply; rpc::RequestWorkerLeaseReply reply;
std::shared_ptr<bool> callback_occurred = std::make_shared<bool>(false); std::shared_ptr<bool> callback_occurred = std::make_shared<bool>(false);
auto callback = [callback_occurred]() { *callback_occurred = true; }; auto callback = [callback_occurred](Status, std::function<void()>,
task_manager_.QueueTask(task, &reply, callback); std::function<void()>) {
task_manager_.SchedulePendingTasks(); *callback_occurred = true;
std::shared_ptr<MockWorker> worker = };
std::make_shared<MockWorker>(WorkerID::FromRandom(), 1234); task_manager_.QueueAndScheduleTask(task, &reply, callback);
pool_.PushWorker(std::dynamic_pointer_cast<WorkerInterface>(worker));
task_manager_.DispatchScheduledTasksToWorkers(pool_, leased_workers_);
ASSERT_TRUE(*callback_occurred); ASSERT_TRUE(*callback_occurred);
ASSERT_EQ(leased_workers_.size(), 1); ASSERT_EQ(leased_workers_.size(), 1);
ASSERT_EQ(pool_.workers.size(), 0); ASSERT_EQ(pool_.workers.size(), 0);
@ -744,9 +736,11 @@ TEST_F(ClusterTaskManagerTest, TestAnyPendingTasks) {
Task task2 = CreateTask({{ray::kCPU_ResourceLabel, 6}}); Task task2 = CreateTask({{ray::kCPU_ResourceLabel, 6}});
rpc::RequestWorkerLeaseReply reply2; rpc::RequestWorkerLeaseReply reply2;
std::shared_ptr<bool> callback_occurred2 = std::make_shared<bool>(false); std::shared_ptr<bool> callback_occurred2 = std::make_shared<bool>(false);
auto callback2 = [callback_occurred2]() { *callback_occurred2 = true; }; auto callback2 = [callback_occurred2](Status, std::function<void()>,
task_manager_.QueueTask(task2, &reply2, callback2); std::function<void()>) {
task_manager_.SchedulePendingTasks(); *callback_occurred2 = true;
};
task_manager_.QueueAndScheduleTask(task2, &reply2, callback2);
ASSERT_FALSE(*callback_occurred2); ASSERT_FALSE(*callback_occurred2);
ASSERT_TRUE(task_manager_.AnyPendingTasks(&exemplar, &any_pending, ASSERT_TRUE(task_manager_.AnyPendingTasks(&exemplar, &any_pending,
&pending_actor_creations, &pending_tasks)); &pending_actor_creations, &pending_tasks));
@ -764,7 +758,8 @@ TEST_F(ClusterTaskManagerTest, ArgumentEvicted) {
rpc::RequestWorkerLeaseReply reply; rpc::RequestWorkerLeaseReply reply;
int num_callbacks = 0; int num_callbacks = 0;
int *num_callbacks_ptr = &num_callbacks; int *num_callbacks_ptr = &num_callbacks;
auto callback = [num_callbacks_ptr]() { auto callback = [num_callbacks_ptr](Status, std::function<void()>,
std::function<void()>) {
(*num_callbacks_ptr) = *num_callbacks_ptr + 1; (*num_callbacks_ptr) = *num_callbacks_ptr + 1;
}; };
@ -773,9 +768,7 @@ TEST_F(ClusterTaskManagerTest, ArgumentEvicted) {
auto task = CreateTask({{ray::kCPU_ResourceLabel, 5}}, 2); auto task = CreateTask({{ray::kCPU_ResourceLabel, 5}}, 2);
std::unordered_set<TaskID> expected_subscribed_tasks = { std::unordered_set<TaskID> expected_subscribed_tasks = {
task.GetTaskSpecification().TaskId()}; task.GetTaskSpecification().TaskId()};
task_manager_.QueueTask(task, &reply, callback); task_manager_.QueueAndScheduleTask(task, &reply, callback);
task_manager_.SchedulePendingTasks();
task_manager_.DispatchScheduledTasksToWorkers(pool_, leased_workers_);
ASSERT_EQ(dependency_manager_.subscribed_tasks, expected_subscribed_tasks); ASSERT_EQ(dependency_manager_.subscribed_tasks, expected_subscribed_tasks);
ASSERT_EQ(num_callbacks, 0); ASSERT_EQ(num_callbacks, 0);
ASSERT_EQ(leased_workers_.size(), 0); ASSERT_EQ(leased_workers_.size(), 0);
@ -785,8 +778,6 @@ TEST_F(ClusterTaskManagerTest, ArgumentEvicted) {
pool_.workers.clear(); pool_.workers.clear();
auto id = task.GetTaskSpecification().TaskId(); auto id = task.GetTaskSpecification().TaskId();
task_manager_.TasksUnblocked({id}); task_manager_.TasksUnblocked({id});
task_manager_.SchedulePendingTasks();
task_manager_.DispatchScheduledTasksToWorkers(pool_, leased_workers_);
ASSERT_EQ(dependency_manager_.subscribed_tasks, expected_subscribed_tasks); ASSERT_EQ(dependency_manager_.subscribed_tasks, expected_subscribed_tasks);
ASSERT_EQ(num_callbacks, 0); ASSERT_EQ(num_callbacks, 0);
ASSERT_EQ(leased_workers_.size(), 0); ASSERT_EQ(leased_workers_.size(), 0);
@ -794,22 +785,72 @@ TEST_F(ClusterTaskManagerTest, ArgumentEvicted) {
/* Task argument gets evicted */ /* Task argument gets evicted */
dependency_manager_.task_ready_ = false; dependency_manager_.task_ready_ = false;
pool_.PushWorker(std::dynamic_pointer_cast<WorkerInterface>(worker)); pool_.PushWorker(std::dynamic_pointer_cast<WorkerInterface>(worker));
task_manager_.SchedulePendingTasks(); task_manager_.ScheduleAndDispatchTasks();
task_manager_.DispatchScheduledTasksToWorkers(pool_, leased_workers_);
ASSERT_EQ(dependency_manager_.subscribed_tasks, expected_subscribed_tasks); ASSERT_EQ(dependency_manager_.subscribed_tasks, expected_subscribed_tasks);
ASSERT_EQ(num_callbacks, 0); ASSERT_EQ(num_callbacks, 0);
ASSERT_EQ(leased_workers_.size(), 0); ASSERT_EQ(leased_workers_.size(), 0);
/* Worker available and arguments available */ /* Worker available and arguments available */
task_manager_.TasksUnblocked({id});
dependency_manager_.task_ready_ = true; dependency_manager_.task_ready_ = true;
task_manager_.SchedulePendingTasks(); task_manager_.TasksUnblocked({id});
task_manager_.DispatchScheduledTasksToWorkers(pool_, leased_workers_);
ASSERT_EQ(num_callbacks, 1); ASSERT_EQ(num_callbacks, 1);
ASSERT_EQ(leased_workers_.size(), 1); ASSERT_EQ(leased_workers_.size(), 1);
AssertNoLeaks(); AssertNoLeaks();
} }
TEST_F(ClusterTaskManagerTest, ReleaseAndReturnWorkerCpuResources) {
const NodeResources &node_resources = scheduler_->GetLocalNodeResources();
ASSERT_EQ(node_resources.predefined_resources[PredefinedResources::CPU].available, 8);
ASSERT_EQ(node_resources.predefined_resources[PredefinedResources::GPU].available, 4);
auto worker = std::make_shared<MockWorker>(WorkerID::FromRandom(), 1234);
// The check fails because the worker has no allocated resource instances.
ASSERT_FALSE(task_manager_.ReleaseCpuResourcesFromUnblockedWorker(worker));
auto node_resource_instances = scheduler_->GetLocalResources();
auto available_resource_instances =
node_resource_instances.GetAvailableResourceInstances();
auto allocated_instances = std::make_shared<TaskResourceInstances>();
const std::unordered_map<std::string, double> task_spec = {{"CPU", 1.}, {"GPU", 1.}};
ASSERT_TRUE(scheduler_->AllocateLocalTaskResources(task_spec, allocated_instances));
worker->SetAllocatedInstances(allocated_instances);
// Check that the resources are allocated successfully.
ASSERT_EQ(node_resources.predefined_resources[PredefinedResources::CPU].available, 7);
ASSERT_EQ(node_resources.predefined_resources[PredefinedResources::GPU].available, 3);
// Check that the cpu resources are released successfully.
ASSERT_TRUE(task_manager_.ReleaseCpuResourcesFromUnblockedWorker(worker));
// Check that only cpu resources are released.
ASSERT_EQ(node_resources.predefined_resources[PredefinedResources::CPU].available, 8);
ASSERT_EQ(node_resources.predefined_resources[PredefinedResources::GPU].available, 3);
// Mark worker as blocked.
worker->MarkBlocked();
// The check fails because the worker is blocked.
ASSERT_FALSE(task_manager_.ReleaseCpuResourcesFromUnblockedWorker(worker));
// Check that nothing changes.
ASSERT_EQ(node_resources.predefined_resources[PredefinedResources::CPU].available, 8);
ASSERT_EQ(node_resources.predefined_resources[PredefinedResources::GPU].available, 3);
// Check that the cpu resources are returned to the worker successfully.
ASSERT_TRUE(task_manager_.ReturnCpuResourcesToBlockedWorker(worker));
// Check that only cpu resources are returned to the worker.
ASSERT_EQ(node_resources.predefined_resources[PredefinedResources::CPU].available, 7);
ASSERT_EQ(node_resources.predefined_resources[PredefinedResources::GPU].available, 3);
// Mark worker as unblocked.
worker->MarkUnblocked();
ASSERT_FALSE(task_manager_.ReturnCpuResourcesToBlockedWorker(worker));
// Check that nothing changes.
ASSERT_EQ(node_resources.predefined_resources[PredefinedResources::CPU].available, 7);
ASSERT_EQ(node_resources.predefined_resources[PredefinedResources::GPU].available, 3);
}
int main(int argc, char **argv) { int main(int argc, char **argv) {
::testing::InitGoogleTest(&argc, argv); ::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS(); return RUN_ALL_TESTS();

View file

@ -0,0 +1,122 @@
// Copyright 2017 The Ray Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "ray/raylet/scheduling/old_cluster_resource_scheduler.h"
#include "ray/common/grpc_util.h"
#include "ray/common/ray_config.h"
namespace ray {
OldClusterResourceScheduler::OldClusterResourceScheduler(
const NodeID &self_node_id, ResourceIdSet &local_available_resources,
std::unordered_map<NodeID, SchedulingResources> &cluster_resource_map,
std::shared_ptr<SchedulingResources> last_heartbeat_resources)
: self_node_id_string_(self_node_id.Binary()),
local_available_resources_(local_available_resources),
cluster_resource_map_(cluster_resource_map),
last_heartbeat_resources_(last_heartbeat_resources) {}
bool OldClusterResourceScheduler::RemoveNode(const std::string &node_id_string) {
return cluster_resource_map_.erase(NodeID::FromBinary(node_id_string)) != 0;
}
void OldClusterResourceScheduler::UpdateResourceCapacity(
const std::string &node_id_string, const std::string &resource_name,
double resource_total) {
if (node_id_string == self_node_id_string_) {
local_available_resources_.AddOrUpdateResource(resource_name, resource_total);
}
auto iter = cluster_resource_map_.find(NodeID::FromBinary(node_id_string));
if (iter != cluster_resource_map_.end()) {
auto &scheduling_resources = iter->second;
scheduling_resources.UpdateResourceCapacity(resource_name, resource_total);
}
}
void OldClusterResourceScheduler::DeleteResource(const std::string &node_id_string,
const std::string &resource_name) {
if (node_id_string == self_node_id_string_) {
local_available_resources_.DeleteResource(resource_name);
}
auto iter = cluster_resource_map_.find(NodeID::FromBinary(node_id_string));
if (iter != cluster_resource_map_.end()) {
auto &scheduling_resources = iter->second;
scheduling_resources.DeleteResource(resource_name);
}
}
void OldClusterResourceScheduler::FillResourceUsage(
std::shared_ptr<rpc::ResourcesData> resources_data) {
// TODO(atumanov): modify the heartbeat table protocol to use the ResourceSet
// directly.
// TODO(atumanov): implement a ResourceSet const_iterator.
// If the light resource usage report is enabled, only set the fields that represent
// resources that have changed.
auto &local_resources = cluster_resource_map_[NodeID::FromBinary(self_node_id_string_)];
if (!last_heartbeat_resources_->GetTotalResources().IsEqual(
local_resources.GetTotalResources())) {
for (const auto &resource_pair :
local_resources.GetTotalResources().GetResourceMap()) {
(*resources_data->mutable_resources_total())[resource_pair.first] =
resource_pair.second;
}
last_heartbeat_resources_->SetTotalResources(
ResourceSet(local_resources.GetTotalResources()));
}
if (!last_heartbeat_resources_->GetAvailableResources().IsEqual(
local_resources.GetAvailableResources())) {
resources_data->set_resources_available_changed(true);
for (const auto &resource_pair :
local_resources.GetAvailableResources().GetResourceMap()) {
(*resources_data->mutable_resources_available())[resource_pair.first] =
resource_pair.second;
}
last_heartbeat_resources_->SetAvailableResources(
ResourceSet(local_resources.GetAvailableResources()));
}
}
bool OldClusterResourceScheduler::UpdateNode(const std::string &node_id_string,
const rpc::ResourcesData &resource_data) {
NodeID node_id = NodeID::FromBinary(node_id_string);
auto iter = cluster_resource_map_.find(node_id);
if (iter == cluster_resource_map_.end()) {
return false;
}
SchedulingResources &remote_resources = iter->second;
if (resource_data.resources_total_size() > 0) {
ResourceSet remote_total(MapFromProtobuf(resource_data.resources_total()));
remote_resources.SetTotalResources(std::move(remote_total));
}
if (resource_data.resources_available_changed()) {
ResourceSet remote_available(MapFromProtobuf(resource_data.resources_available()));
remote_resources.SetAvailableResources(std::move(remote_available));
}
if (resource_data.resource_load_changed()) {
ResourceSet remote_load(MapFromProtobuf(resource_data.resource_load()));
// Extract the load information and save it locally.
remote_resources.SetLoadResources(std::move(remote_load));
}
return true;
}
std::string OldClusterResourceScheduler::GetLocalResourceViewString() const {
SchedulingResources &local_resources =
cluster_resource_map_[NodeID::FromBinary(self_node_id_string_)];
return local_resources.GetAvailableResources().ToString();
}
} // namespace ray
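For context, FillResourceUsage above only copies a section into the outgoing report when it differs from the last heartbeat. Below is a minimal, self-contained sketch of that delta-reporting pattern; the plain std::map types and the DeltaReporter/ResourcesReport names are illustrative stand-ins, not the raylet's SchedulingResources or rpc::ResourcesData.

// Standalone illustration of the "only report what changed" heartbeat pattern.
#include <iostream>
#include <map>
#include <string>

struct ResourcesReport {
  std::map<std::string, double> resources_total;
  std::map<std::string, double> resources_available;
  bool resources_available_changed = false;
};

class DeltaReporter {
 public:
  // Fill `out` only with the sections that changed since the last call.
  void FillResourceUsage(const std::map<std::string, double> &total,
                         const std::map<std::string, double> &available,
                         ResourcesReport *out) {
    if (total != last_total_) {
      out->resources_total = total;  // Full snapshot of totals, but only when they changed.
      last_total_ = total;
    }
    if (available != last_available_) {
      out->resources_available = available;
      out->resources_available_changed = true;
      last_available_ = available;
    }
  }

 private:
  std::map<std::string, double> last_total_;
  std::map<std::string, double> last_available_;
};

int main() {
  DeltaReporter reporter;
  std::map<std::string, double> total = {{"CPU", 8}, {"GPU", 4}};
  std::map<std::string, double> available = total;

  ResourcesReport first;
  reporter.FillResourceUsage(total, available, &first);
  std::cout << "first report totals: " << first.resources_total.size() << "\n";  // 2

  // Nothing changed, so the second report stays empty.
  ResourcesReport second;
  reporter.FillResourceUsage(total, available, &second);
  std::cout << "second report totals: " << second.resources_total.size() << "\n";  // 0

  // A task takes one CPU; only the "available" section is re-sent.
  available["CPU"] = 7;
  ResourcesReport third;
  reporter.FillResourceUsage(total, available, &third);
  std::cout << "available changed: " << third.resources_available_changed << "\n";  // 1
  return 0;
}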

View file

@ -0,0 +1,72 @@
// Copyright 2017 The Ray Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include "ray/common/task/scheduling_resources.h"
#include "ray/raylet/scheduling/cluster_resource_scheduler_interface.h"
namespace ray {
class OldClusterResourceScheduler : public ClusterResourceSchedulerInterface {
public:
explicit OldClusterResourceScheduler(
const NodeID &self_node_id, ResourceIdSet &local_available_resources,
std::unordered_map<NodeID, SchedulingResources> &cluster_resource_map,
std::shared_ptr<SchedulingResources> last_heartbeat_resources);
/// Remove node from the cluster data structure. This happens
/// when a node fails or it is removed from the cluster.
///
/// \param node_id_string ID of the node to be removed.
bool RemoveNode(const std::string &node_id_string) override;
/// Update node resources. This happens when a node's resource usage is updated.
///
/// \param node_id_string ID of the node whose resources need to be updated.
/// \param resource_data The node resource data.
bool UpdateNode(const std::string &node_id_string,
const rpc::ResourcesData &resource_data) override;
/// Update the total capacity of a given resource on a given node.
///
/// \param node_id_string: ID of the node whose resource we want to update.
/// \param resource_name: Resource which we want to update.
/// \param resource_total: New capacity of the resource.
void UpdateResourceCapacity(const std::string &node_id_string,
const std::string &resource_name,
double resource_total) override;
/// Delete a given resource from a given node.
///
/// \param node_id_string: ID of the node whose resource we want to delete.
/// \param resource_name: Resource we want to delete.
void DeleteResource(const std::string &node_id_string,
const std::string &resource_name) override;
/// Populate the relevant parts of the heartbeat table. This is intended for
/// sending raylet <-> gcs heartbeats. In particular, this should fill in
/// resources_available and resources_total.
///
/// \param data Output parameter. `resources_available` and `resources_total` are the
/// only fields used.
void FillResourceUsage(std::shared_ptr<rpc::ResourcesData> data) override;
/// Return local resources in human-readable string form.
std::string GetLocalResourceViewString() const override;
private:
std::string self_node_id_string_;
ResourceIdSet &local_available_resources_;
std::unordered_map<NodeID, SchedulingResources> &cluster_resource_map_;
std::shared_ptr<SchedulingResources> last_heartbeat_resources_;
};
} // namespace ray
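The header above exposes only the narrow set of scheduler operations the node manager needs, so the raylet can hold a single interface pointer and plug either the legacy or the new scheduler in behind it. A self-contained sketch of that pattern follows; the class names, the factory function, and the boolean flag below are illustrative, not the raylet's actual identifiers.

// Illustrative-only: one narrow interface, two interchangeable implementations.
#include <iostream>
#include <memory>
#include <string>

class ResourceSchedulerInterface {
 public:
  virtual ~ResourceSchedulerInterface() = default;
  virtual bool RemoveNode(const std::string &node_id) = 0;
  virtual std::string GetLocalResourceViewString() const = 0;
};

class LegacyScheduler : public ResourceSchedulerInterface {
 public:
  bool RemoveNode(const std::string & /*node_id*/) override { return true; }
  std::string GetLocalResourceViewString() const override { return "legacy view"; }
};

class NewScheduler : public ResourceSchedulerInterface {
 public:
  bool RemoveNode(const std::string & /*node_id*/) override { return true; }
  std::string GetLocalResourceViewString() const override { return "new view"; }
};

// The implementation is picked once, at construction time; everything else is
// written purely against the interface and never re-checks the flag.
std::unique_ptr<ResourceSchedulerInterface> MakeScheduler(bool use_new_scheduler) {
  if (use_new_scheduler) {
    return std::make_unique<NewScheduler>();
  }
  return std::make_unique<LegacyScheduler>();
}

int main() {
  auto scheduler = MakeScheduler(/*use_new_scheduler=*/false);
  scheduler->RemoveNode("node-1");
  std::cout << scheduler->GetLocalResourceViewString() << "\n";
  return 0;
}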

View file

@ -59,12 +59,9 @@ class MockWorker : public WorkerInterface {
RAY_CHECK(false) << "Method unused"; RAY_CHECK(false) << "Method unused";
return false; return false;
} }
void MarkBlocked() { RAY_CHECK(false) << "Method unused"; } void MarkBlocked() { blocked_ = true; }
void MarkUnblocked() { RAY_CHECK(false) << "Method unused"; } void MarkUnblocked() { blocked_ = false; }
bool IsBlocked() const { bool IsBlocked() const { return blocked_; }
RAY_CHECK(false) << "Method unused";
return false;
}
Process GetProcess() const { return Process::CreateNewDummy(); } Process GetProcess() const { return Process::CreateNewDummy(); }
void SetProcess(Process proc) { RAY_CHECK(false) << "Method unused"; } void SetProcess(Process proc) { RAY_CHECK(false) << "Method unused"; }
@ -190,6 +187,7 @@ class MockWorker : public WorkerInterface {
std::vector<double> borrowed_cpu_instances_; std::vector<double> borrowed_cpu_instances_;
bool is_detached_actor_; bool is_detached_actor_;
BundleID bundle_id_; BundleID bundle_id_;
bool blocked_ = false;
}; };
} // namespace raylet } // namespace raylet