[New Scheduler] Spillback from the queue of tasks assigned to the local node (#12084)

Stephanie Wang authored 2020-11-17 19:13:59 -05:00, committed by GitHub
parent b5dfdb2a21
commit f6bdd5ab17
9 changed files with 170 additions and 63 deletions
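
In short: the raylet's ClusterTaskManager can now spill a task back to another node even after the task has been queued for local dispatch. When the task reaches the head of its dispatch queue and local resources can no longer be allocated, the new AttemptDispatchWork() helper asks the cluster resource scheduler for another feasible node and answers the lease request with that node's address instead. A self-contained toy model of just that decision (illustration only; the real logic is in the ClusterTaskManager diff below):

// Toy model of the spillback decision; node names and CPU counts are
// illustrative, not Ray APIs.
#include <iostream>
#include <map>
#include <string>

struct Node {
  double cpus_available;
};

// Returns the node the lease reply should point at: the local node if the task
// still fits, otherwise the first remote node that can host it, or "" if the
// task has to stay in the dispatch queue.
std::string PickNodeForTask(const std::string &local, double cpus_required,
                            const std::map<std::string, Node> &nodes) {
  if (nodes.at(local).cpus_available >= cpus_required) {
    return local;  // dispatch locally
  }
  for (const auto &entry : nodes) {
    if (entry.first != local && entry.second.cpus_available >= cpus_required) {
      return entry.first;  // spill back: reply with retry_at_raylet_address
    }
  }
  return "";  // nothing fits right now; keep the task queued
}

int main() {
  std::map<std::string, Node> nodes = {{"local", {0.0}}, {"remote", {5.0}}};
  std::cout << PickNodeForTask("local", 5.0, nodes) << std::endl;  // prints "remote"
}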


@@ -91,7 +91,6 @@ def test_actor_load_balancing(ray_start_cluster):
     ray.get(results)
 
 
-@pytest.mark.skipif(new_scheduler_enabled(), reason="multi node broken")
 def test_actor_lifetime_load_balancing(ray_start_cluster):
     cluster = ray_start_cluster
     cluster.add_node(num_cpus=0)


@@ -910,6 +910,8 @@ void NodeManager::HeartbeatAdded(const NodeID &client_id,
     new_resource_scheduler_->AddOrUpdateNode(
         client_id.Binary(), remote_resources.GetTotalResources().GetResourceMap(),
         remote_resources.GetAvailableResources().GetResourceMap());
+    // TODO(swang): We could probably call this once per batch instead of once
+    // per node in the batch.
     ScheduleAndDispatch();
     return;
   }
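
The TODO above notes that ScheduleAndDispatch() now runs once per node in a heartbeat batch; it could instead run once after the whole batch has been applied. A minimal sketch of that batching idea, using placeholder types rather than the real heartbeat protos (the real batch handler is not part of this commit):

#include <functional>
#include <vector>

struct NodeUpdate {
  int node_id;
  double cpus_available;
};

// Apply every resource update in the batch first, then run the scheduler once,
// instead of rescheduling after each individual node update.
void ApplyHeartbeatBatch(const std::vector<NodeUpdate> &batch,
                         const std::function<void(const NodeUpdate &)> &add_or_update_node,
                         const std::function<void()> &schedule_and_dispatch) {
  for (const auto &update : batch) {
    add_or_update_node(update);  // AddOrUpdateNode(...) equivalent
  }
  schedule_and_dispatch();  // once per batch
}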


@@ -786,7 +786,7 @@ std::string ClusterResourceScheduler::GetResourceNameFromIndex(int64_t res_idx)
 }
 
 void ClusterResourceScheduler::AllocateRemoteTaskResources(
-    std::string &node_string,
+    const std::string &node_string,
     const std::unordered_map<std::string, double> &task_resources) {
   TaskRequest task_request = ResourceMapToTaskRequest(string_to_int_map_, task_resources);
   auto node_id = string_to_int_map_.Insert(node_string);


@@ -361,7 +361,7 @@ class ClusterResourceScheduler {
   /// \param node_id Remote node whose resources we allocate.
   /// \param task_req Task for which we allocate resources.
   void AllocateRemoteTaskResources(
-      std::string &node_id,
+      const std::string &node_id,
       const std::unordered_map<std::string, double> &task_resources);
 
   void FreeLocalTaskResources(std::shared_ptr<TaskResourceInstances> task_allocation);
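
The const qualifier matters because the new ClusterTaskManager::Spillback() below passes the temporary string returned by NodeID::Binary(), which a non-const lvalue reference cannot bind to. A short usage fragment mirroring that call (illustrative values, not a standalone program):

// Deduct the spilled task's resources from the scheduler's view of the chosen
// remote node so that later GetBestSchedulableNode() calls do not
// over-subscribe it. spillback_to.Binary() is a temporary std::string.
std::unordered_map<std::string, double> task_resources = {{"CPU", 5}};
cluster_resource_scheduler_->AllocateRemoteTaskResources(spillback_to.Binary(),
                                                         task_resources);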


@@ -18,7 +18,6 @@ ClusterTaskManager::ClusterTaskManager(
 bool ClusterTaskManager::SchedulePendingTasks() {
   bool did_schedule = false;
   for (auto shapes_it = tasks_to_schedule_.begin();
        shapes_it != tasks_to_schedule_.end();) {
     auto &work_queue = shapes_it->second;
@@ -28,8 +27,10 @@ bool ClusterTaskManager::SchedulePendingTasks() {
       // blocking where a task which cannot be scheduled because
       // there are not enough available resources blocks other
       // tasks from being scheduled.
-      Work work = *work_it;
+      const Work &work = *work_it;
       Task task = std::get<0>(work);
+      RAY_LOG(DEBUG) << "Scheduling pending task "
+                     << task.GetTaskSpecification().TaskId();
       auto placement_resources =
           task.GetTaskSpecification().GetRequiredPlacementResources().GetResourceMap();
       int64_t _unused;
@@ -50,20 +51,8 @@ bool ClusterTaskManager::SchedulePendingTasks() {
         did_schedule = task_scheduled || did_schedule;
       } else {
         // Should spill over to a different node.
-        cluster_resource_scheduler_->AllocateRemoteTaskResources(
-            node_id_string,
-            task.GetTaskSpecification().GetRequiredResources().GetResourceMap());
         NodeID node_id = NodeID::FromBinary(node_id_string);
-        auto node_info_opt = get_node_info_(node_id);
-        // gcs_client_->Nodes().Get(node_id);
-        RAY_CHECK(node_info_opt)
-            << "Spilling back to a node manager, but no GCS info found for node "
-            << node_id;
-        auto reply = std::get<1>(work);
-        auto callback = std::get<2>(work);
-        Spillback(node_id, node_info_opt->node_manager_address(),
-                  node_info_opt->node_manager_port(), reply, callback);
+        Spillback(node_id, work);
       }
       work_it = work_queue.erase(work_it);
     }
@@ -85,6 +74,8 @@ bool ClusterTaskManager::WaitForTaskArgsRequests(Work work) {
   if (object_ids.size() > 0) {
     bool args_ready = fulfills_dependencies_func_(task);
     if (args_ready) {
+      RAY_LOG(DEBUG) << "Args already ready, task can be dispatched "
+                     << task.GetTaskSpecification().TaskId();
       tasks_to_dispatch_[scheduling_key].push_back(work);
     } else {
       can_dispatch = false;
@@ -92,6 +83,8 @@ bool ClusterTaskManager::WaitForTaskArgsRequests(Work work) {
       waiting_tasks_[task_id] = work;
     }
   } else {
+    RAY_LOG(DEBUG) << "No args, task can be dispatched "
+                   << task.GetTaskSpecification().TaskId();
     tasks_to_dispatch_[scheduling_key].push_back(work);
   }
   return can_dispatch;
@@ -109,9 +102,7 @@ void ClusterTaskManager::DispatchScheduledTasksToWorkers(
        shapes_it != tasks_to_dispatch_.end();) {
     auto &dispatch_queue = shapes_it->second;
     for (auto work_it = dispatch_queue.begin(); work_it != dispatch_queue.end();) {
-      auto work = *work_it;
-      auto task = std::get<0>(work);
-      auto spec = task.GetTaskSpecification();
+      auto spec = std::get<0>(*work_it).GetTaskSpecification();
       std::shared_ptr<WorkerInterface> worker = worker_pool.PopWorker(spec);
       if (!worker) {
@@ -119,33 +110,21 @@ void ClusterTaskManager::DispatchScheduledTasksToWorkers(
         return;
       }
-      std::shared_ptr<TaskResourceInstances> allocated_instances(
-          new TaskResourceInstances());
-      bool schedulable = cluster_resource_scheduler_->AllocateLocalTaskResources(
-          spec.GetRequiredResources().GetResourceMap(), allocated_instances);
-      if (!schedulable) {
-        // Not enough resources to schedule this task.
-        worker_pool.PushWorker(worker);
-        // All the tasks in this queue are the same, so move on to the next queue.
-        break;
-      }
-      auto reply = std::get<1>(work);
-      auto callback = std::get<2>(work);
-      worker->SetOwnerAddress(spec.CallerAddress());
-      if (spec.IsActorCreationTask()) {
-        // The actor belongs to this worker now.
-        worker->SetLifetimeAllocatedInstances(allocated_instances);
-      } else {
-        worker->SetAllocatedInstances(allocated_instances);
-      }
-      worker->AssignTaskId(spec.TaskId());
-      if (!RayConfig::instance().enable_multi_tenancy()) {
-        worker->AssignJobId(spec.JobId());
-      }
-      worker->SetAssignedTask(task);
-      Dispatch(worker, leased_workers, spec, reply, callback);
-      work_it = dispatch_queue.erase(work_it);
+      bool worker_leased;
+      bool remove = AttemptDispatchWork(*work_it, worker, &worker_leased);
+      if (worker_leased) {
+        auto reply = std::get<1>(*work_it);
+        auto callback = std::get<2>(*work_it);
+        Dispatch(worker, leased_workers, spec, reply, callback);
+      } else {
+        worker_pool.PushWorker(worker);
+      }
+      if (remove) {
+        work_it = dispatch_queue.erase(work_it);
+      } else {
+        break;
+      }
     }
     if (dispatch_queue.empty()) {
       shapes_it = tasks_to_dispatch_.erase(shapes_it);
@@ -155,8 +134,52 @@ void ClusterTaskManager::DispatchScheduledTasksToWorkers(
   }
 }
 
+bool ClusterTaskManager::AttemptDispatchWork(const Work &work,
+                                             std::shared_ptr<WorkerInterface> &worker,
+                                             bool *worker_leased) {
+  const auto &task = std::get<0>(work);
+  const auto &spec = task.GetTaskSpecification();
+  RAY_LOG(DEBUG) << "Attempting to dispatch task " << spec.TaskId();
+  std::shared_ptr<TaskResourceInstances> allocated_instances(new TaskResourceInstances());
+  bool schedulable = cluster_resource_scheduler_->AllocateLocalTaskResources(
+      spec.GetRequiredResources().GetResourceMap(), allocated_instances);
+  bool dispatched = false;
+  if (!schedulable) {
+    *worker_leased = false;
+    // Spill at most one task from this queue, then move on to the next
+    // queue.
+    int64_t _unused;
+    auto placement_resources = spec.GetRequiredPlacementResources().GetResourceMap();
+    std::string node_id_string = cluster_resource_scheduler_->GetBestSchedulableNode(
+        placement_resources, spec.IsActorCreationTask(), &_unused);
+    if (node_id_string != self_node_id_.Binary() && !node_id_string.empty()) {
+      NodeID node_id = NodeID::FromBinary(node_id_string);
+      Spillback(node_id, work);
+      dispatched = true;
+    }
+  } else {
+    worker->SetOwnerAddress(spec.CallerAddress());
+    if (spec.IsActorCreationTask()) {
+      // The actor belongs to this worker now.
+      worker->SetLifetimeAllocatedInstances(allocated_instances);
+    } else {
+      worker->SetAllocatedInstances(allocated_instances);
+    }
+    worker->AssignTaskId(spec.TaskId());
+    if (!RayConfig::instance().enable_multi_tenancy()) {
+      worker->AssignJobId(spec.JobId());
+    }
+    worker->SetAssignedTask(task);
+    *worker_leased = true;
+    dispatched = true;
+  }
+  return dispatched;
+}
+
 void ClusterTaskManager::QueueTask(const Task &task, rpc::RequestWorkerLeaseReply *reply,
                                    std::function<void(void)> callback) {
+  RAY_LOG(DEBUG) << "Queuing task " << task.GetTaskSpecification().TaskId();
   Work work = std::make_tuple(task, reply, callback);
   const auto &scheduling_class = task.GetTaskSpecification().GetSchedulingClass();
   tasks_to_schedule_[scheduling_class].push_back(work);
@@ -167,8 +190,10 @@ void ClusterTaskManager::TasksUnblocked(const std::vector<TaskID> ready_ids) {
     auto it = waiting_tasks_.find(task_id);
     if (it != waiting_tasks_.end()) {
       auto work = it->second;
-      const auto &scheduling_key =
-          std::get<0>(work).GetTaskSpecification().GetSchedulingClass();
+      const auto &task = std::get<0>(work);
+      const auto &scheduling_key = task.GetTaskSpecification().GetSchedulingClass();
+      RAY_LOG(DEBUG) << "Args ready, task can be dispatched "
+                     << task.GetTaskSpecification().TaskId();
       tasks_to_dispatch_[scheduling_key].push_back(work);
       waiting_tasks_.erase(it);
     }
@@ -303,6 +328,7 @@ void ClusterTaskManager::Dispatch(
     std::unordered_map<WorkerID, std::shared_ptr<WorkerInterface>> &leased_workers,
     const TaskSpecification &task_spec, rpc::RequestWorkerLeaseReply *reply,
     std::function<void(void)> send_reply_callback) {
+  RAY_LOG(DEBUG) << "Dispatching task " << task_spec.TaskId();
   // Pass the contact info of the worker to use.
   reply->mutable_worker_address()->set_ip_address(worker->IpAddress());
   reply->mutable_worker_address()->set_port(worker->Port());
@@ -362,12 +388,23 @@ void ClusterTaskManager::Dispatch(
   send_reply_callback();
 }
 
-void ClusterTaskManager::Spillback(NodeID spillback_to, std::string address, int port,
-                                   rpc::RequestWorkerLeaseReply *reply,
-                                   std::function<void(void)> send_reply_callback) {
-  reply->mutable_retry_at_raylet_address()->set_ip_address(address);
-  reply->mutable_retry_at_raylet_address()->set_port(port);
+void ClusterTaskManager::Spillback(const NodeID &spillback_to, const Work &work) {
+  const auto &task_spec = std::get<0>(work).GetTaskSpecification();
+  RAY_LOG(DEBUG) << "Spilling task " << task_spec.TaskId() << " to node " << spillback_to;
+  cluster_resource_scheduler_->AllocateRemoteTaskResources(
+      spillback_to.Binary(), task_spec.GetRequiredResources().GetResourceMap());
+
+  auto node_info_opt = get_node_info_(spillback_to);
+  RAY_CHECK(node_info_opt)
+      << "Spilling back to a node manager, but no GCS info found for node "
+      << spillback_to;
+
+  auto reply = std::get<1>(work);
+  reply->mutable_retry_at_raylet_address()->set_ip_address(
+      node_info_opt->node_manager_address());
+  reply->mutable_retry_at_raylet_address()->set_port(node_info_opt->node_manager_port());
   reply->mutable_retry_at_raylet_address()->set_raylet_id(spillback_to.Binary());
+
+  auto send_reply_callback = std::get<2>(work);
   send_reply_callback();
 }
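
To summarize the new dispatch path: AttemptDispatchWork() reports two independent facts, whether the work item is finished with this queue and whether a worker was actually leased, and DispatchScheduledTasksToWorkers() reacts as follows (a restatement of the code above, not additional behavior):

// Outcomes of remove = AttemptDispatchWork(work, worker, &worker_leased):
//   remove == true,  worker_leased == true   -> local resources granted:
//       Dispatch() the worker and erase the work item from the queue.
//   remove == true,  worker_leased == false  -> spilled back to another node:
//       Spillback() already sent the lease reply; return the idle worker to
//       the pool and erase the work item.
//   remove == false, worker_leased == false  -> no node can host the task yet:
//       keep it queued and stop scanning this queue, since every task behind
//       it has the same resource shape.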


@@ -63,7 +63,12 @@ class ClusterTaskManager {
   /// (Step 3) Attempts to dispatch all tasks which are ready to run. A task
   /// will be dispatched if it is on `tasks_to_dispatch_` and there are still
-  /// avaialable resources on the node.
+  /// available resources on the node.
+  ///
+  /// If there are not enough resources locally, up to one task per resource
+  /// shape (the task at the head of the queue) will get spilled back to a
+  /// different node.
+  ///
   /// \param worker_pool: The pool of workers which will be dispatched to.
   /// `worker_pool` state will be modified (idle workers will be popped) during
   /// dispatching.
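
From the caller's point of view the spillback is visible only in the lease reply: instead of a worker address it carries retry_at_raylet_address. A hedged sketch of how a lease requester would consume that reply (HandleLeaseReply, RequestLeaseFrom, and ConnectToWorker are hypothetical names; the client side is not part of this diff):

// If the raylet answered with a spillback target, retry the lease there;
// otherwise the reply carries the leased worker's address.
void HandleLeaseReply(const rpc::RequestWorkerLeaseReply &reply) {
  const auto &retry_at = reply.retry_at_raylet_address();
  if (!retry_at.raylet_id().empty()) {
    RequestLeaseFrom(retry_at.ip_address(), retry_at.port());  // hypothetical helper
  } else {
    ConnectToWorker(reply.worker_address());  // hypothetical helper
  }
}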
@@ -109,6 +114,12 @@ class ClusterTaskManager {
   std::string DebugString();
 
  private:
+  /// Helper method to try dispatching a single task from the queue to an
+  /// available worker. Returns whether the task should be removed from the
+  /// queue and whether the worker was successfully leased to execute the work.
+  bool AttemptDispatchWork(const Work &work, std::shared_ptr<WorkerInterface> &worker,
+                           bool *worker_leased);
+
   const NodeID &self_node_id_;
   std::shared_ptr<ClusterResourceScheduler> cluster_resource_scheduler_;
   std::function<bool(const Task &)> fulfills_dependencies_func_;
@@ -135,9 +146,7 @@ class ClusterTaskManager {
                 const TaskSpecification &task_spec, rpc::RequestWorkerLeaseReply *reply,
                 std::function<void(void)> send_reply_callback);
 
-  void Spillback(NodeID spillback_to, std::string address, int port,
-                 rpc::RequestWorkerLeaseReply *reply,
-                 std::function<void(void)> send_reply_callback);
+  void Spillback(const NodeID &spillback_to, const Work &work);
 };
 
 }  // namespace raylet
 }  // namespace ray


@@ -95,27 +95,38 @@ class ClusterTaskManagerTest : public ::testing::Test {
  public:
   ClusterTaskManagerTest()
       : id_(NodeID::FromRandom()),
-        single_node_resource_scheduler_(CreateSingleNodeScheduler(id_.Binary())),
+        scheduler_(CreateSingleNodeScheduler(id_.Binary())),
         fulfills_dependencies_calls_(0),
         dependencies_fulfilled_(true),
         node_info_calls_(0),
-        node_info_(boost::optional<rpc::GcsNodeInfo>{}),
-        task_manager_(id_, single_node_resource_scheduler_,
+        task_manager_(id_, scheduler_,
                       [this](const Task &_task) {
                         fulfills_dependencies_calls_++;
                         return dependencies_fulfilled_;
                       },
                       [this](const NodeID &node_id) {
                         node_info_calls_++;
-                        return node_info_;
+                        return node_info_[node_id];
                       }) {}
 
   void SetUp() {}
   void Shutdown() {}
 
+  void AddNode(const NodeID &id, double num_cpus, double num_gpus = 0,
+               double memory = 0) {
+    std::unordered_map<std::string, double> node_resources;
+    node_resources[ray::kCPU_ResourceLabel] = num_cpus;
+    node_resources[ray::kGPU_ResourceLabel] = num_gpus;
+    node_resources[ray::kMemory_ResourceLabel] = memory;
+    scheduler_->AddOrUpdateNode(id.Binary(), node_resources, node_resources);
+
+    rpc::GcsNodeInfo info;
+    node_info_[id] = info;
+  }
+
   NodeID id_;
-  std::shared_ptr<ClusterResourceScheduler> single_node_resource_scheduler_;
+  std::shared_ptr<ClusterResourceScheduler> scheduler_;
   MockWorkerPool pool_;
   std::unordered_map<WorkerID, std::shared_ptr<WorkerInterface>> leased_workers_;
@@ -123,7 +134,7 @@ class ClusterTaskManagerTest : public ::testing::Test {
   bool dependencies_fulfilled_;
   int node_info_calls_;
-  boost::optional<rpc::GcsNodeInfo> node_info_;
+  std::unordered_map<NodeID, boost::optional<rpc::GcsNodeInfo>> node_info_;
   ClusterTaskManager task_manager_;
 };
@@ -248,6 +259,55 @@ TEST_F(ClusterTaskManagerTest, ResourceTakenWhileResolving) {
   ASSERT_EQ(pool_.workers.size(), 0);
 }
 
+TEST_F(ClusterTaskManagerTest, TestSpillAfterAssigned) {
+  /*
+    Test the race condition in which a task is assigned to the local node, but
+    it cannot be run because a different task gets assigned the resources
+    first. The un-runnable task should eventually get spilled back to another
+    node.
+  */
+  std::shared_ptr<MockWorker> worker =
+      std::make_shared<MockWorker>(WorkerID::FromRandom(), 1234);
+  auto remote_node_id = NodeID::FromRandom();
+  AddNode(remote_node_id, 5);
+
+  int num_callbacks = 0;
+  auto callback = [&]() { num_callbacks++; };
+
+  /* Blocked on starting a worker. */
+  auto task = CreateTask({{ray::kCPU_ResourceLabel, 5}});
+  rpc::RequestWorkerLeaseReply local_reply;
+  task_manager_.QueueTask(task, &local_reply, callback);
+  task_manager_.SchedulePendingTasks();
+  task_manager_.DispatchScheduledTasksToWorkers(pool_, leased_workers_);
+
+  ASSERT_EQ(num_callbacks, 0);
+  ASSERT_EQ(leased_workers_.size(), 0);
+
+  /* This task can run but not at the same time as the first */
+  auto task2 = CreateTask({{ray::kCPU_ResourceLabel, 5}});
+  rpc::RequestWorkerLeaseReply spillback_reply;
+  task_manager_.QueueTask(task2, &spillback_reply, callback);
+  task_manager_.SchedulePendingTasks();
+  task_manager_.DispatchScheduledTasksToWorkers(pool_, leased_workers_);
+
+  ASSERT_EQ(num_callbacks, 0);
+  ASSERT_EQ(leased_workers_.size(), 0);
+
+  // Two workers start. First task is dispatched now, but resources are no
+  // longer available for the second.
+  pool_.PushWorker(std::dynamic_pointer_cast<WorkerInterface>(worker));
+  pool_.PushWorker(std::dynamic_pointer_cast<WorkerInterface>(worker));
+  task_manager_.DispatchScheduledTasksToWorkers(pool_, leased_workers_);
+  // Check that both tasks got removed from the queue.
+  ASSERT_EQ(num_callbacks, 2);
+  // The first task was dispatched.
+  ASSERT_EQ(leased_workers_.size(), 1);
+  // The second task was spilled.
+  ASSERT_EQ(spillback_reply.retry_at_raylet_address().raylet_id(),
+            remote_node_id.Binary());
+}
+
 TEST_F(ClusterTaskManagerTest, TaskCancellationTest) {
   std::shared_ptr<MockWorker> worker =
       std::make_shared<MockWorker>(WorkerID::FromRandom(), 1234);


@@ -35,7 +35,7 @@ class MockWorker : public WorkerInterface {
   // TODO(kfstorm): Remove this once `enable_multi_tenancy` is deleted.
   void AssignJobId(const JobID &job_id) {}
-  void SetAssignedTask(Task &assigned_task) {}
+  void SetAssignedTask(const Task &assigned_task) {}
   const std::string IpAddress() const { return address_.ip_address(); }


@@ -108,7 +108,7 @@ class WorkerInterface {
   virtual Task &GetAssignedTask() = 0;
-  virtual void SetAssignedTask(Task &assigned_task) = 0;
+  virtual void SetAssignedTask(const Task &assigned_task) = 0;
   virtual bool IsRegistered() = 0;

@@ -210,7 +210,7 @@ class Worker : public WorkerInterface {
   Task &GetAssignedTask() { return assigned_task_; };
-  void SetAssignedTask(Task &assigned_task) { assigned_task_ = assigned_task; };
+  void SetAssignedTask(const Task &assigned_task) { assigned_task_ = assigned_task; };
   bool IsRegistered() { return rpc_client_ != nullptr; }
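
The SetAssignedTask() signature changes in the last three classes follow from the manager-side change: AttemptDispatchWork() receives the queued entry as `const Work &`, so the Task it extracts is const and can only be forwarded through a const reference. A minimal, self-contained illustration of that constraint (toy types, not Ray code):

#include <tuple>

struct Task {};

struct Worker {
  // Taking `Task &` here would fail to compile from the const context below.
  void SetAssignedTask(const Task &t) { assigned_task_ = t; }
  Task assigned_task_;
};

using Work = std::tuple<Task, int>;  // stand-in for (task, reply, callback)

void AttemptDispatch(const Work &work, Worker &worker) {
  const Task &task = std::get<0>(work);  // const, because `work` is const
  worker.SetAssignedTask(task);          // needs the const-reference overload
}

int main() {
  Worker worker;
  AttemptDispatch(Work{}, worker);
}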