From f6bdd5ab17be37caf51a0858d7f20adc5d4978a1 Mon Sep 17 00:00:00 2001
From: Stephanie Wang
Date: Tue, 17 Nov 2020 19:13:59 -0500
Subject: [PATCH] [New Scheduler] Spillback from the queue of tasks assigned to the local node (#12084)

---
 python/ray/tests/test_actor_advanced.py       |   1 -
 src/ray/raylet/node_manager.cc                |   2 +
 .../scheduling/cluster_resource_scheduler.cc  |   2 +-
 .../scheduling/cluster_resource_scheduler.h   |   2 +-
 .../raylet/scheduling/cluster_task_manager.cc | 131 +++++++++++-------
 .../raylet/scheduling/cluster_task_manager.h  |  17 ++-
 .../scheduling/cluster_task_manager_test.cc   |  72 +++++++++-
 src/ray/raylet/test/util.h                    |   2 +-
 src/ray/raylet/worker.h                       |   4 +-
 9 files changed, 170 insertions(+), 63 deletions(-)

diff --git a/python/ray/tests/test_actor_advanced.py b/python/ray/tests/test_actor_advanced.py
index 2eec9fa56..579acd0d0 100644
--- a/python/ray/tests/test_actor_advanced.py
+++ b/python/ray/tests/test_actor_advanced.py
@@ -91,7 +91,6 @@ def test_actor_load_balancing(ray_start_cluster):
     ray.get(results)


-@pytest.mark.skipif(new_scheduler_enabled(), reason="multi node broken")
 def test_actor_lifetime_load_balancing(ray_start_cluster):
     cluster = ray_start_cluster
     cluster.add_node(num_cpus=0)
diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc
index bb247f28e..a8bf79699 100644
--- a/src/ray/raylet/node_manager.cc
+++ b/src/ray/raylet/node_manager.cc
@@ -910,6 +910,8 @@ void NodeManager::HeartbeatAdded(const NodeID &client_id,
     new_resource_scheduler_->AddOrUpdateNode(
         client_id.Binary(), remote_resources.GetTotalResources().GetResourceMap(),
         remote_resources.GetAvailableResources().GetResourceMap());
+    // TODO(swang): We could probably call this once per batch instead of once
+    // per node in the batch.
     ScheduleAndDispatch();
     return;
   }
diff --git a/src/ray/raylet/scheduling/cluster_resource_scheduler.cc b/src/ray/raylet/scheduling/cluster_resource_scheduler.cc
index 86ffb2e82..44f10e1c4 100644
--- a/src/ray/raylet/scheduling/cluster_resource_scheduler.cc
+++ b/src/ray/raylet/scheduling/cluster_resource_scheduler.cc
@@ -786,7 +786,7 @@ std::string ClusterResourceScheduler::GetResourceNameFromIndex(int64_t res_idx)
 }

 void ClusterResourceScheduler::AllocateRemoteTaskResources(
-    std::string &node_string,
+    const std::string &node_string,
     const std::unordered_map<std::string, double> &task_resources) {
   TaskRequest task_request = ResourceMapToTaskRequest(string_to_int_map_, task_resources);
   auto node_id = string_to_int_map_.Insert(node_string);
diff --git a/src/ray/raylet/scheduling/cluster_resource_scheduler.h b/src/ray/raylet/scheduling/cluster_resource_scheduler.h
index 15375bd10..c476e174a 100644
--- a/src/ray/raylet/scheduling/cluster_resource_scheduler.h
+++ b/src/ray/raylet/scheduling/cluster_resource_scheduler.h
@@ -361,7 +361,7 @@ class ClusterResourceScheduler {
   /// \param node_id Remote node whose resources we allocate.
   /// \param task_req Task for which we allocate resources.
   void AllocateRemoteTaskResources(
-      std::string &node_id,
+      const std::string &node_id,
       const std::unordered_map<std::string, double> &task_resources);

   void FreeLocalTaskResources(std::shared_ptr<TaskResourceInstances> task_allocation);
diff --git a/src/ray/raylet/scheduling/cluster_task_manager.cc b/src/ray/raylet/scheduling/cluster_task_manager.cc
index 482cf6eb4..b971f1611 100644
--- a/src/ray/raylet/scheduling/cluster_task_manager.cc
+++ b/src/ray/raylet/scheduling/cluster_task_manager.cc
@@ -18,7 +18,6 @@ ClusterTaskManager::ClusterTaskManager(

 bool ClusterTaskManager::SchedulePendingTasks() {
   bool did_schedule = false;
-
   for (auto shapes_it = tasks_to_schedule_.begin();
        shapes_it != tasks_to_schedule_.end();) {
     auto &work_queue = shapes_it->second;
@@ -28,8 +27,10 @@ bool ClusterTaskManager::SchedulePendingTasks() {
       // blocking where a task which cannot be scheduled because
       // there are not enough available resources blocks other
       // tasks from being scheduled.
-      Work work = *work_it;
+      const Work &work = *work_it;
       Task task = std::get<0>(work);
+      RAY_LOG(DEBUG) << "Scheduling pending task "
+                     << task.GetTaskSpecification().TaskId();
       auto placement_resources =
           task.GetTaskSpecification().GetRequiredPlacementResources().GetResourceMap();
       int64_t _unused;
@@ -50,20 +51,8 @@ bool ClusterTaskManager::SchedulePendingTasks() {
         did_schedule = task_scheduled || did_schedule;
       } else {
         // Should spill over to a different node.
-        cluster_resource_scheduler_->AllocateRemoteTaskResources(
-            node_id_string,
-            task.GetTaskSpecification().GetRequiredResources().GetResourceMap());
-
         NodeID node_id = NodeID::FromBinary(node_id_string);
-        auto node_info_opt = get_node_info_(node_id);
-        // gcs_client_->Nodes().Get(node_id);
-        RAY_CHECK(node_info_opt)
-            << "Spilling back to a node manager, but no GCS info found for node "
-            << node_id;
-        auto reply = std::get<1>(work);
-        auto callback = std::get<2>(work);
-        Spillback(node_id, node_info_opt->node_manager_address(),
-                  node_info_opt->node_manager_port(), reply, callback);
+        Spillback(node_id, work);
       }
       work_it = work_queue.erase(work_it);
     }
@@ -85,6 +74,8 @@ bool ClusterTaskManager::WaitForTaskArgsRequests(Work work) {
   if (object_ids.size() > 0) {
     bool args_ready = fulfills_dependencies_func_(task);
     if (args_ready) {
+      RAY_LOG(DEBUG) << "Args already ready, task can be dispatched "
+                     << task.GetTaskSpecification().TaskId();
       tasks_to_dispatch_[scheduling_key].push_back(work);
     } else {
       can_dispatch = false;
@@ -92,6 +83,8 @@ bool ClusterTaskManager::WaitForTaskArgsRequests(Work work) {
       waiting_tasks_[task_id] = work;
     }
   } else {
+    RAY_LOG(DEBUG) << "No args, task can be dispatched "
+                   << task.GetTaskSpecification().TaskId();
     tasks_to_dispatch_[scheduling_key].push_back(work);
   }
   return can_dispatch;
@@ -109,9 +102,7 @@ void ClusterTaskManager::DispatchScheduledTasksToWorkers(
        shapes_it != tasks_to_dispatch_.end();) {
     auto &dispatch_queue = shapes_it->second;
     for (auto work_it = dispatch_queue.begin(); work_it != dispatch_queue.end();) {
-      auto work = *work_it;
-      auto task = std::get<0>(work);
-      auto spec = task.GetTaskSpecification();
+      auto spec = std::get<0>(*work_it).GetTaskSpecification();

       std::shared_ptr<WorkerInterface> worker = worker_pool.PopWorker(spec);
       if (!worker) {
@@ -119,33 +110,21 @@ void ClusterTaskManager::DispatchScheduledTasksToWorkers(
         return;
       }

-      std::shared_ptr<TaskResourceInstances> allocated_instances(
-          new TaskResourceInstances());
-      bool schedulable = cluster_resource_scheduler_->AllocateLocalTaskResources(
-          spec.GetRequiredResources().GetResourceMap(), allocated_instances);
-      if (!schedulable) {
-        // Not enough resources to schedule this task.
+      bool worker_leased;
+      bool remove = AttemptDispatchWork(*work_it, worker, &worker_leased);
+      if (worker_leased) {
+        auto reply = std::get<1>(*work_it);
+        auto callback = std::get<2>(*work_it);
+        Dispatch(worker, leased_workers, spec, reply, callback);
+      } else {
         worker_pool.PushWorker(worker);
-        // All the tasks in this queue are the same, so move on to the next queue.
-        break;
       }

-      auto reply = std::get<1>(work);
-      auto callback = std::get<2>(work);
-      worker->SetOwnerAddress(spec.CallerAddress());
-      if (spec.IsActorCreationTask()) {
-        // The actor belongs to this worker now.
-        worker->SetLifetimeAllocatedInstances(allocated_instances);
+      if (remove) {
+        work_it = dispatch_queue.erase(work_it);
       } else {
-        worker->SetAllocatedInstances(allocated_instances);
+        break;
       }
-      worker->AssignTaskId(spec.TaskId());
-      if (!RayConfig::instance().enable_multi_tenancy()) {
-        worker->AssignJobId(spec.JobId());
-      }
-      worker->SetAssignedTask(task);
-      Dispatch(worker, leased_workers, spec, reply, callback);
-      work_it = dispatch_queue.erase(work_it);
     }
     if (dispatch_queue.empty()) {
       shapes_it = tasks_to_dispatch_.erase(shapes_it);
@@ -155,8 +134,52 @@ void ClusterTaskManager::DispatchScheduledTasksToWorkers(
   }
 }

+bool ClusterTaskManager::AttemptDispatchWork(const Work &work,
+                                             std::shared_ptr<WorkerInterface> &worker,
+                                             bool *worker_leased) {
+  const auto &task = std::get<0>(work);
+  const auto &spec = task.GetTaskSpecification();
+  RAY_LOG(DEBUG) << "Attempting to dispatch task " << spec.TaskId();
+
+  std::shared_ptr<TaskResourceInstances> allocated_instances(new TaskResourceInstances());
+  bool schedulable = cluster_resource_scheduler_->AllocateLocalTaskResources(
+      spec.GetRequiredResources().GetResourceMap(), allocated_instances);
+  bool dispatched = false;
+  if (!schedulable) {
+    *worker_leased = false;
+    // Spill at most one task from this queue, then move on to the next
+    // queue.
+    int64_t _unused;
+    auto placement_resources = spec.GetRequiredPlacementResources().GetResourceMap();
+    std::string node_id_string = cluster_resource_scheduler_->GetBestSchedulableNode(
+        placement_resources, spec.IsActorCreationTask(), &_unused);
+    if (node_id_string != self_node_id_.Binary() && !node_id_string.empty()) {
+      NodeID node_id = NodeID::FromBinary(node_id_string);
+      Spillback(node_id, work);
+      dispatched = true;
+    }
+  } else {
+    worker->SetOwnerAddress(spec.CallerAddress());
+    if (spec.IsActorCreationTask()) {
+      // The actor belongs to this worker now.
+      worker->SetLifetimeAllocatedInstances(allocated_instances);
+    } else {
+      worker->SetAllocatedInstances(allocated_instances);
+    }
+    worker->AssignTaskId(spec.TaskId());
+    if (!RayConfig::instance().enable_multi_tenancy()) {
+      worker->AssignJobId(spec.JobId());
+    }
+    worker->SetAssignedTask(task);
+    *worker_leased = true;
+    dispatched = true;
+  }
+  return dispatched;
+}
+
 void ClusterTaskManager::QueueTask(const Task &task, rpc::RequestWorkerLeaseReply *reply,
                                    std::function<void(void)> callback) {
+  RAY_LOG(DEBUG) << "Queuing task " << task.GetTaskSpecification().TaskId();
   Work work = std::make_tuple(task, reply, callback);
   const auto &scheduling_class = task.GetTaskSpecification().GetSchedulingClass();
   tasks_to_schedule_[scheduling_class].push_back(work);
@@ -167,8 +190,10 @@ void ClusterTaskManager::TasksUnblocked(const std::vector<TaskID> ready_ids) {
     auto it = waiting_tasks_.find(task_id);
     if (it != waiting_tasks_.end()) {
       auto work = it->second;
-      const auto &scheduling_key =
-          std::get<0>(work).GetTaskSpecification().GetSchedulingClass();
+      const auto &task = std::get<0>(work);
+      const auto &scheduling_key = task.GetTaskSpecification().GetSchedulingClass();
+      RAY_LOG(DEBUG) << "Args ready, task can be dispatched "
+                     << task.GetTaskSpecification().TaskId();
       tasks_to_dispatch_[scheduling_key].push_back(work);
       waiting_tasks_.erase(it);
     }
@@ -303,6 +328,7 @@ void ClusterTaskManager::Dispatch(
     std::unordered_map<WorkerID, std::shared_ptr<WorkerInterface>> &leased_workers,
     const TaskSpecification &task_spec, rpc::RequestWorkerLeaseReply *reply,
     std::function<void(void)> send_reply_callback) {
+  RAY_LOG(DEBUG) << "Dispatching task " << task_spec.TaskId();
   // Pass the contact info of the worker to use.
   reply->mutable_worker_address()->set_ip_address(worker->IpAddress());
   reply->mutable_worker_address()->set_port(worker->Port());
@@ -362,12 +388,23 @@ void ClusterTaskManager::Dispatch(
   send_reply_callback();
 }

-void ClusterTaskManager::Spillback(NodeID spillback_to, std::string address, int port,
-                                   rpc::RequestWorkerLeaseReply *reply,
-                                   std::function<void(void)> send_reply_callback) {
-  reply->mutable_retry_at_raylet_address()->set_ip_address(address);
-  reply->mutable_retry_at_raylet_address()->set_port(port);
+void ClusterTaskManager::Spillback(const NodeID &spillback_to, const Work &work) {
+  const auto &task_spec = std::get<0>(work).GetTaskSpecification();
+  RAY_LOG(DEBUG) << "Spilling task " << task_spec.TaskId() << " to node " << spillback_to;
+  cluster_resource_scheduler_->AllocateRemoteTaskResources(
+      spillback_to.Binary(), task_spec.GetRequiredResources().GetResourceMap());
+
+  auto node_info_opt = get_node_info_(spillback_to);
+  RAY_CHECK(node_info_opt)
+      << "Spilling back to a node manager, but no GCS info found for node "
+      << spillback_to;
+  auto reply = std::get<1>(work);
+  reply->mutable_retry_at_raylet_address()->set_ip_address(
+      node_info_opt->node_manager_address());
+  reply->mutable_retry_at_raylet_address()->set_port(node_info_opt->node_manager_port());
   reply->mutable_retry_at_raylet_address()->set_raylet_id(spillback_to.Binary());
+
+  auto send_reply_callback = std::get<2>(work);
   send_reply_callback();
 }
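The dispatch path above can be hard to follow inside the full diff, so here is a minimal, self-contained sketch of the control flow that DispatchScheduledTasksToWorkers and AttemptDispatchWork implement: per resource shape, tasks at the head of the queue are leased locally while resources last, and a task that no longer fits is spilled to a feasible remote node (or left queued if none exists). All names below (SimpleTask, SimpleNode, BestRemoteNode) are illustrative stand-ins, not the actual Ray types.

// Illustrative sketch only -- simplified stand-in types, not the Ray classes.
#include <deque>
#include <iostream>
#include <string>
#include <vector>

struct SimpleTask {
  std::string id;
  double cpus_required;
};

struct SimpleNode {
  std::string id;
  double cpus_available;
};

// Stand-in for GetBestSchedulableNode restricted to remote nodes: returns a
// node that can fit the task, or an empty string if none can.
std::string BestRemoteNode(const SimpleTask &task, const std::vector<SimpleNode> &remotes) {
  for (const auto &node : remotes) {
    if (node.cpus_available >= task.cpus_required) {
      return node.id;
    }
  }
  return "";
}

int main() {
  SimpleNode local{"local", 5};
  std::vector<SimpleNode> remotes{{"remote", 5}};
  // One FIFO per resource shape; both tasks here share the same 5-CPU shape.
  std::deque<SimpleTask> dispatch_queue{{"A", 5}, {"B", 5}};

  while (!dispatch_queue.empty()) {
    const SimpleTask task = dispatch_queue.front();
    if (local.cpus_available >= task.cpus_required) {
      // Analogue of AttemptDispatchWork succeeding: lease a local worker.
      local.cpus_available -= task.cpus_required;
      std::cout << task.id << ": dispatched locally\n";
      dispatch_queue.pop_front();
    } else {
      // Local resources are exhausted: try to spill the head-of-line task.
      const std::string target = BestRemoteNode(task, remotes);
      if (target.empty()) {
        // Nowhere to spill; leave the queue for a later scheduling pass.
        break;
      }
      std::cout << task.id << ": spilled back to " << target << "\n";
      dispatch_queue.pop_front();
    }
  }
  return 0;
}

Running the sketch prints that task A is dispatched locally and task B is spilled, which is the same outcome the new TestSpillAfterAssigned unit test further below asserts against the real classes.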
diff --git a/src/ray/raylet/scheduling/cluster_task_manager.h b/src/ray/raylet/scheduling/cluster_task_manager.h
index e31d63900..d94b386a3 100644
--- a/src/ray/raylet/scheduling/cluster_task_manager.h
+++ b/src/ray/raylet/scheduling/cluster_task_manager.h
@@ -63,7 +63,12 @@ class ClusterTaskManager {

   /// (Step 3) Attempts to dispatch all tasks which are ready to run. A task
   /// will be dispatched if it is on `tasks_to_dispatch_` and there are still
-  /// avaialable resources on the node.
+  /// available resources on the node.
+  ///
+  /// If there are not enough resources locally, up to one task per resource
+  /// shape (the task at the head of the queue) will get spilled back to a
+  /// different node.
+  ///
   /// \param worker_pool: The pool of workers which will be dispatched to.
   /// `worker_pool` state will be modified (idle workers will be popped) during
   /// dispatching.
@@ -109,6 +114,12 @@ class ClusterTaskManager {
   std::string DebugString();

  private:
+  /// Helper method to try dispatching a single task from the queue to an
+  /// available worker. Returns whether the task should be removed from the
+  /// queue and whether the worker was successfully leased to execute the work.
+  bool AttemptDispatchWork(const Work &work, std::shared_ptr<WorkerInterface> &worker,
+                           bool *worker_leased);
+
   const NodeID &self_node_id_;
   std::shared_ptr<ClusterResourceScheduler> cluster_resource_scheduler_;
   std::function<bool(const Task &)> fulfills_dependencies_func_;
@@ -135,9 +146,7 @@ class ClusterTaskManager {
                 const TaskSpecification &task_spec, rpc::RequestWorkerLeaseReply *reply,
                 std::function<void(void)> send_reply_callback);

-  void Spillback(NodeID spillback_to, std::string address, int port,
-                 rpc::RequestWorkerLeaseReply *reply,
-                 std::function<void(void)> send_reply_callback);
+  void Spillback(const NodeID &spillback_to, const Work &work);
 };
 }  // namespace raylet
 }  // namespace ray
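As a reading aid for the header above, the sketch below models the queueing structure its comments describe: work items are (task, reply, callback) tuples grouped by scheduling class, moving from tasks_to_schedule_ to tasks_to_dispatch_, with dependency-blocked work parked in waiting_tasks_ until TasksUnblocked fires. It is a simplified stand-in, not the real ClusterTaskManager API; the names Reply, Callback, and WorkItem are invented for illustration.

// Illustrative sketch only -- invented stand-in types, not the Ray classes.
#include <deque>
#include <functional>
#include <iostream>
#include <string>
#include <tuple>
#include <unordered_map>

struct Reply {};                        // stand-in for rpc::RequestWorkerLeaseReply
using Callback = std::function<void()>;
using WorkItem = std::tuple<std::string, Reply *, Callback>;  // (task id, reply, send-reply)

int main() {
  // One FIFO of work per resource shape (keyed by a scheduling-class id).
  std::unordered_map<int, std::deque<WorkItem>> tasks_to_schedule;
  std::unordered_map<int, std::deque<WorkItem>> tasks_to_dispatch;
  std::unordered_map<std::string, WorkItem> waiting_tasks;

  Reply r1, r2;
  tasks_to_schedule[0].emplace_back("f", &r1, [] { std::cout << "lease granted: f\n"; });
  tasks_to_schedule[0].emplace_back("g", &r2, [] { std::cout << "lease granted: g\n"; });

  // (Step 2 analogue) Decide placement; work whose arguments are not local is parked.
  for (auto &shape_and_queue : tasks_to_schedule) {
    auto &queue = shape_and_queue.second;
    while (!queue.empty()) {
      WorkItem work = queue.front();
      queue.pop_front();
      const bool args_ready = (std::get<0>(work) != "g");  // pretend g's args are remote
      if (args_ready) {
        tasks_to_dispatch[shape_and_queue.first].push_back(work);
      } else {
        waiting_tasks.emplace(std::get<0>(work), work);
      }
    }
  }

  // Later, the dependency manager reports g's arguments as local (TasksUnblocked analogue).
  auto it = waiting_tasks.find("g");
  if (it != waiting_tasks.end()) {
    tasks_to_dispatch[0].push_back(it->second);
    waiting_tasks.erase(it);
  }

  // (Step 3 analogue) Dispatch: invoking the stored callback is how the lease reply goes out.
  for (auto &shape_and_queue : tasks_to_dispatch) {
    for (auto &work : shape_and_queue.second) {
      std::get<2>(work)();
    }
  }
  return 0;
}

The real implementation stores Task objects and gRPC reply pointers rather than strings, but the movement between the three containers is the part the header comments are describing.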
diff --git a/src/ray/raylet/scheduling/cluster_task_manager_test.cc b/src/ray/raylet/scheduling/cluster_task_manager_test.cc
index d9cee05e2..afdf924fc 100644
--- a/src/ray/raylet/scheduling/cluster_task_manager_test.cc
+++ b/src/ray/raylet/scheduling/cluster_task_manager_test.cc
@@ -95,27 +95,38 @@ class ClusterTaskManagerTest : public ::testing::Test {
  public:
   ClusterTaskManagerTest()
       : id_(NodeID::FromRandom()),
-        single_node_resource_scheduler_(CreateSingleNodeScheduler(id_.Binary())),
+        scheduler_(CreateSingleNodeScheduler(id_.Binary())),
         fulfills_dependencies_calls_(0),
         dependencies_fulfilled_(true),
         node_info_calls_(0),
-        node_info_(boost::optional<rpc::GcsNodeInfo>{}),
-        task_manager_(id_, single_node_resource_scheduler_,
+        task_manager_(id_, scheduler_,
                       [this](const Task &_task) {
                         fulfills_dependencies_calls_++;
                         return dependencies_fulfilled_;
                       },
                       [this](const NodeID &node_id) {
                         node_info_calls_++;
-                        return node_info_;
+                        return node_info_[node_id];
                       }) {}

   void SetUp() {}

   void Shutdown() {}

+  void AddNode(const NodeID &id, double num_cpus, double num_gpus = 0,
+               double memory = 0) {
+    std::unordered_map<std::string, double> node_resources;
+    node_resources[ray::kCPU_ResourceLabel] = num_cpus;
+    node_resources[ray::kGPU_ResourceLabel] = num_gpus;
+    node_resources[ray::kMemory_ResourceLabel] = memory;
+    scheduler_->AddOrUpdateNode(id.Binary(), node_resources, node_resources);
+
+    rpc::GcsNodeInfo info;
+    node_info_[id] = info;
+  }
+
   NodeID id_;
-  std::shared_ptr<ClusterResourceScheduler> single_node_resource_scheduler_;
+  std::shared_ptr<ClusterResourceScheduler> scheduler_;
   MockWorkerPool pool_;
   std::unordered_map<WorkerID, std::shared_ptr<WorkerInterface>> leased_workers_;
@@ -123,7 +134,7 @@ class ClusterTaskManagerTest : public ::testing::Test {
   int fulfills_dependencies_calls_;
   bool dependencies_fulfilled_;
   int node_info_calls_;
-  boost::optional<rpc::GcsNodeInfo> node_info_;
+  std::unordered_map<NodeID, boost::optional<rpc::GcsNodeInfo>> node_info_;

   ClusterTaskManager task_manager_;
 };
@@ -248,6 +259,55 @@ TEST_F(ClusterTaskManagerTest, ResourceTakenWhileResolving) {
   ASSERT_EQ(pool_.workers.size(), 0);
 }

+TEST_F(ClusterTaskManagerTest, TestSpillAfterAssigned) {
+  /*
+    Test the race condition in which a task is assigned to the local node, but
+    it cannot be run because a different task gets assigned the resources
+    first. The un-runnable task should eventually get spilled back to another
+    node.
+  */
+  std::shared_ptr<MockWorker> worker =
+      std::make_shared<MockWorker>(WorkerID::FromRandom(), 1234);
+  auto remote_node_id = NodeID::FromRandom();
+  AddNode(remote_node_id, 5);
+
+  int num_callbacks = 0;
+  auto callback = [&]() { num_callbacks++; };
+
+  /* Blocked on starting a worker. */
+  auto task = CreateTask({{ray::kCPU_ResourceLabel, 5}});
+  rpc::RequestWorkerLeaseReply local_reply;
+  task_manager_.QueueTask(task, &local_reply, callback);
+  task_manager_.SchedulePendingTasks();
+  task_manager_.DispatchScheduledTasksToWorkers(pool_, leased_workers_);
+
+  ASSERT_EQ(num_callbacks, 0);
+  ASSERT_EQ(leased_workers_.size(), 0);
+
+  /* This task can run but not at the same time as the first */
+  auto task2 = CreateTask({{ray::kCPU_ResourceLabel, 5}});
+  rpc::RequestWorkerLeaseReply spillback_reply;
+  task_manager_.QueueTask(task2, &spillback_reply, callback);
+  task_manager_.SchedulePendingTasks();
+  task_manager_.DispatchScheduledTasksToWorkers(pool_, leased_workers_);
+
+  ASSERT_EQ(num_callbacks, 0);
+  ASSERT_EQ(leased_workers_.size(), 0);
+
+  // Two workers start. First task is dispatched now, but resources are no
+  // longer available for the second.
+  pool_.PushWorker(std::dynamic_pointer_cast<WorkerInterface>(worker));
+  pool_.PushWorker(std::dynamic_pointer_cast<WorkerInterface>(worker));
+  task_manager_.DispatchScheduledTasksToWorkers(pool_, leased_workers_);
+  // Check that both tasks got removed from the queue.
+  ASSERT_EQ(num_callbacks, 2);
+  // The first task was dispatched.
+  ASSERT_EQ(leased_workers_.size(), 1);
+  // The second task was spilled.
+  ASSERT_EQ(spillback_reply.retry_at_raylet_address().raylet_id(),
+            remote_node_id.Binary());
+}
+
 TEST_F(ClusterTaskManagerTest, TaskCancellationTest) {
   std::shared_ptr<MockWorker> worker =
       std::make_shared<MockWorker>(WorkerID::FromRandom(), 1234);
diff --git a/src/ray/raylet/test/util.h b/src/ray/raylet/test/util.h
index 476cd1f4f..6625043d1 100644
--- a/src/ray/raylet/test/util.h
+++ b/src/ray/raylet/test/util.h
@@ -35,7 +35,7 @@ class MockWorker : public WorkerInterface {
   // TODO(kfstorm): Remove this once `enable_multi_tenancy` is deleted.
   void AssignJobId(const JobID &job_id) {}

-  void SetAssignedTask(Task &assigned_task) {}
+  void SetAssignedTask(const Task &assigned_task) {}

   const std::string IpAddress() const { return address_.ip_address(); }
diff --git a/src/ray/raylet/worker.h b/src/ray/raylet/worker.h
index 512230db7..830ae656d 100644
--- a/src/ray/raylet/worker.h
+++ b/src/ray/raylet/worker.h
@@ -108,7 +108,7 @@ class WorkerInterface {

   virtual Task &GetAssignedTask() = 0;

-  virtual void SetAssignedTask(Task &assigned_task) = 0;
+  virtual void SetAssignedTask(const Task &assigned_task) = 0;

   virtual bool IsRegistered() = 0;

@@ -210,7 +210,7 @@ class Worker : public WorkerInterface {

   Task &GetAssignedTask() { return assigned_task_; };

-  void SetAssignedTask(Task &assigned_task) { assigned_task_ = assigned_task; };
+  void SetAssignedTask(const Task &assigned_task) { assigned_task_ = assigned_task; };

   bool IsRegistered() { return rpc_client_ != nullptr; }
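One detail of the new Spillback() that is easy to miss in the diff: before the retry-at address is sent back, the spilled task's resources are charged against the local view of the chosen node via AllocateRemoteTaskResources, so the next spill decision already sees that node as busier. A rough, self-contained illustration of that accounting (stand-in names, not the ClusterResourceScheduler API):

// Illustrative sketch only -- not the actual ClusterResourceScheduler API.
#include <iostream>
#include <map>
#include <string>

int main() {
  // Local view of each remote node's available CPUs.
  std::map<std::string, double> remote_available = {{"node-A", 5}, {"node-B", 5}};

  // Stand-in for GetBestSchedulableNode restricted to remote nodes.
  auto best_node = [&](double cpus_required) -> std::string {
    for (const auto &entry : remote_available) {
      if (entry.second >= cpus_required) {
        return entry.first;
      }
    }
    return "";
  };

  // Spill two 5-CPU tasks back to back.
  for (const char *task : {"f", "g"}) {
    const double cpus_required = 5;
    const std::string target = best_node(cpus_required);
    if (target.empty()) {
      std::cout << task << ": no feasible node, stays queued locally\n";
      continue;
    }
    // Analogue of AllocateRemoteTaskResources: charge the spilled task against
    // the chosen node before replying with its address.
    remote_available[target] -= cpus_required;
    std::cout << task << ": spilled to " << target << "\n";
  }
  // Without the charge above, both tasks would have been pointed at node-A,
  // which only has room for one of them.
  return 0;
}

The same accounting already happened on the SchedulePendingTasks spill path before this change; moving it inside Spillback() means the new dispatch-queue spill path introduced by this patch gets it as well.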