Speed up task dispatch. (#3234)

* speed up task dispatch

* minor changes

* improved comments

* improved comments

* change argument of DispatchTasks to list of tasks

* dispatch only tasks whose dependencies have been fulfilled

* some updated comments

* refactored DispatchQueue() and Assigntask() to avoid the copy of the ready list

* minor fixes

* some more minor fixes

* some more minor fixes

* added more comments

* better comments?

* fixed all feedback comments, minus making the argument of AssignTask() const

* AssignTask() now takes a const argument

* Do the task copy outside of the callback

* fix linting
This commit is contained in:
Ion 2018-11-10 19:55:12 +02:00 committed by Philipp Moritz
parent 29c182d449
commit d681893b0f
2 changed files with 79 additions and 44 deletions

View file

@ -508,15 +508,15 @@ void NodeManager::ProcessNewClient(LocalClientConnection &client) {
client.ProcessMessages(); client.ProcessMessages();
} }
void NodeManager::DispatchTasks() { void NodeManager::DispatchTasks(const std::list<Task> &ready_tasks) {
// Work with a copy of scheduled tasks.
// (See design_docs/task_states.rst for the state transition diagram.)
auto ready_tasks = local_queues_.GetReadyTasks();
// Return if there are no tasks to schedule. // Return if there are no tasks to schedule.
if (ready_tasks.empty()) { if (ready_tasks.empty()) {
return; return;
} }
// Remember the ids of the tasks we need to remove from the ready queue.
std::unordered_set<TaskID> removed_task_ids = {};
for (const auto &task : ready_tasks) { for (const auto &task : ready_tasks) {
const auto &task_resources = task.GetTaskSpecification().GetRequiredResources(); const auto &task_resources = task.GetTaskSpecification().GetRequiredResources();
if (!local_available_resources_.Contains(task_resources)) { if (!local_available_resources_.Contains(task_resources)) {
@ -525,10 +525,16 @@ void NodeManager::DispatchTasks() {
continue; continue;
} }
// We have enough resources for this task. Assign task. // We have enough resources for this task. Assign task.
// TODO(atumanov): perform the task state/queue transition inside AssignTask. if (AssignTask(task)) {
// (See design_docs/task_states.rst for the state transition diagram.) // We were successful in assigning this task on a local worker, so
auto dispatched_task = local_queues_.RemoveTask(task.GetTaskSpecification().TaskId()); // remember to remove it from ready queue. If for some reason the
AssignTask(dispatched_task); // scheduling of this task fails later, we will add it back to the
// ready queue.
removed_task_ids.insert(task.GetTaskSpecification().TaskId());
}
}
if (!removed_task_ids.empty()) {
local_queues_.RemoveTasks(removed_task_ids);
} }
} }
@ -612,7 +618,7 @@ void NodeManager::ProcessRegisterClientRequestMessage(
if (message->is_worker()) { if (message->is_worker()) {
// Register the new worker. // Register the new worker.
worker_pool_.RegisterWorker(std::move(worker)); worker_pool_.RegisterWorker(std::move(worker));
DispatchTasks(); DispatchTasks(local_queues_.GetReadyTasks());
} else { } else {
// Register the new driver. Note that here the driver_id in RegisterClientRequest // Register the new driver. Note that here the driver_id in RegisterClientRequest
// message is actually the ID of the driver task, while client_id represents the // message is actually the ID of the driver task, while client_id represents the
@ -641,7 +647,7 @@ void NodeManager::ProcessGetTaskMessage(
cluster_resource_map_[local_client_id].SetLoadResources( cluster_resource_map_[local_client_id].SetLoadResources(
local_queues_.GetResourceLoad()); local_queues_.GetResourceLoad());
// Call task dispatch to assign work to the new worker. // Call task dispatch to assign work to the new worker.
DispatchTasks(); DispatchTasks(local_queues_.GetReadyTasks());
} }
void NodeManager::ProcessDisconnectClientMessage( void NodeManager::ProcessDisconnectClientMessage(
@ -738,7 +744,7 @@ void NodeManager::ProcessDisconnectClientMessage(
<< "driver_id: " << worker->GetAssignedDriverId(); << "driver_id: " << worker->GetAssignedDriverId();
// Since some resources may have been released, we can try to dispatch more tasks. // Since some resources may have been released, we can try to dispatch more tasks.
DispatchTasks(); DispatchTasks(local_queues_.GetReadyTasks());
} else if (is_driver) { } else if (is_driver) {
// The client is a driver. // The client is a driver.
RAY_CHECK_OK(gcs_client_->driver_table().AppendDriverData(client->GetClientID(), RAY_CHECK_OK(gcs_client_->driver_table().AppendDriverData(client->GetClientID(),
@ -1102,7 +1108,6 @@ void NodeManager::SubmitTask(const Task &task, const Lineage &uncommitted_lineag
// (See design_docs/task_states.rst for the state transition diagram.) // (See design_docs/task_states.rst for the state transition diagram.)
local_queues_.QueuePlaceableTasks({task}); local_queues_.QueuePlaceableTasks({task});
ScheduleTasks(cluster_resource_map_); ScheduleTasks(cluster_resource_map_);
DispatchTasks();
// TODO(atumanov): assert that !placeable.isempty() => insufficient available // TODO(atumanov): assert that !placeable.isempty() => insufficient available
// resources locally. // resources locally.
} }
@ -1136,7 +1141,7 @@ void NodeManager::HandleTaskBlocked(const std::shared_ptr<LocalClientConnection>
worker->MarkBlocked(); worker->MarkBlocked();
// Try dispatching tasks since we may have released some resources. // Try dispatching tasks since we may have released some resources.
DispatchTasks(); DispatchTasks(local_queues_.GetReadyTasks());
} }
} else { } else {
// The client is a driver. Drivers do not hold resources, so we simply mark // The client is a driver. Drivers do not hold resources, so we simply mark
@ -1230,8 +1235,11 @@ void NodeManager::EnqueuePlaceableTask(const Task &task) {
// (See design_docs/task_states.rst for the state transition diagram.) // (See design_docs/task_states.rst for the state transition diagram.)
if (args_ready) { if (args_ready) {
local_queues_.QueueReadyTasks({task}); local_queues_.QueueReadyTasks({task});
// Try to dispatch the newly ready task. // Dispatch just the new task that was added to the ready queue.
DispatchTasks(); // The other tasks in the ready queue can be ignored as no new resources
// have been added and no new worker has become available since the last
// time DispatchTasks() was called.
DispatchTasks({task});
} else { } else {
local_queues_.QueueWaitingTasks({task}); local_queues_.QueueWaitingTasks({task});
} }
@ -1241,14 +1249,14 @@ void NodeManager::EnqueuePlaceableTask(const Task &task) {
task_dependency_manager_.TaskPending(task); task_dependency_manager_.TaskPending(task);
} }
void NodeManager::AssignTask(Task &task) { bool NodeManager::AssignTask(const Task &task) {
const TaskSpecification &spec = task.GetTaskSpecification(); const TaskSpecification &spec = task.GetTaskSpecification();
// If this is an actor task, check that the new task has the correct counter. // If this is an actor task, check that the new task has the correct counter.
if (spec.IsActorTask()) { if (spec.IsActorTask()) {
if (CheckDuplicateActorTask(actor_registry_, spec)) { if (CheckDuplicateActorTask(actor_registry_, spec)) {
// Drop tasks that have already been executed. // This actor task has already been assigned, so ignore it.
return; return true;
} }
} }
@ -1261,11 +1269,8 @@ void NodeManager::AssignTask(Task &task) {
// Start a new worker. // Start a new worker.
worker_pool_.StartWorkerProcess(spec.GetLanguage()); worker_pool_.StartWorkerProcess(spec.GetLanguage());
} }
// Queue this task for future assignment. The task will be assigned to a // We couldn't assign this task, as no worker available.
// worker once one becomes available. return false;
// (See design_docs/task_states.rst for the state transition diagram.)
local_queues_.QueueReadyTasks(std::vector<Task>({task}));
return;
} }
RAY_LOG(DEBUG) << "Assigning task to worker with pid " << worker->Pid(); RAY_LOG(DEBUG) << "Assigning task to worker with pid " << worker->Pid();
@ -1292,11 +1297,13 @@ void NodeManager::AssignTask(Task &task) {
auto message = protocol::CreateGetTaskReply(fbb, spec.ToFlatbuffer(fbb), auto message = protocol::CreateGetTaskReply(fbb, spec.ToFlatbuffer(fbb),
fbb.CreateVector(resource_id_set_flatbuf)); fbb.CreateVector(resource_id_set_flatbuf));
fbb.Finish(message); fbb.Finish(message);
// Give the callback a copy of the task so it can modify it.
Task assigned_task(task);
worker->Connection()->WriteMessageAsync( worker->Connection()->WriteMessageAsync(
static_cast<int64_t>(protocol::MessageType::ExecuteTask), fbb.GetSize(), static_cast<int64_t>(protocol::MessageType::ExecuteTask), fbb.GetSize(),
fbb.GetBufferPointer(), [this, worker, task](ray::Status status) mutable { fbb.GetBufferPointer(), [this, worker, assigned_task](ray::Status status) mutable {
if (status.ok()) { if (status.ok()) {
auto spec = task.GetTaskSpecification(); auto spec = assigned_task.GetTaskSpecification();
// We successfully assigned the task to the worker. // We successfully assigned the task to the worker.
worker->AssignTaskId(spec.TaskId()); worker->AssignTaskId(spec.TaskId());
worker->AssignDriverId(spec.DriverId()); worker->AssignDriverId(spec.DriverId());
@ -1317,20 +1324,22 @@ void NodeManager::AssignTask(Task &task) {
// we may lose updates that are in flight to the task table. We only // we may lose updates that are in flight to the task table. We only
// guarantee deterministic reconstruction ordering for tasks whose // guarantee deterministic reconstruction ordering for tasks whose
// updates are reflected in the task table. // updates are reflected in the task table.
task.SetExecutionDependencies({execution_dependency}); // (SetExecutionDependencies requires a non-const task, so the task
// was copied into a non-const variable.)
assigned_task.SetExecutionDependencies({execution_dependency});
// Extend the frontier to include the executing task. // Extend the frontier to include the executing task.
actor_entry->second.ExtendFrontier(spec.ActorHandleId(), actor_entry->second.ExtendFrontier(spec.ActorHandleId(),
spec.ActorDummyObject()); spec.ActorDummyObject());
} }
// We started running the task, so the task is ready to write to GCS. // We started running the task, so the task is ready to write to GCS.
if (!lineage_cache_.AddReadyTask(task)) { if (!lineage_cache_.AddReadyTask(assigned_task)) {
RAY_LOG(WARNING) << "Task " << spec.TaskId() << " already in lineage cache. " RAY_LOG(WARNING) << "Task " << spec.TaskId() << " already in lineage cache. "
"This is most likely due to " "This is most likely due to "
"reconstruction."; "reconstruction.";
} }
// Mark the task as running. // Mark the task as running.
// (See design_docs/task_states.rst for the state transition diagram.) // (See design_docs/task_states.rst for the state transition diagram.)
local_queues_.QueueRunningTasks(std::vector<Task>({task})); local_queues_.QueueRunningTasks(std::vector<Task>({assigned_task}));
// Notify the task dependency manager that we no longer need this task's // Notify the task dependency manager that we no longer need this task's
// object dependencies. // object dependencies.
task_dependency_manager_.UnsubscribeDependencies(spec.TaskId()); task_dependency_manager_.UnsubscribeDependencies(spec.TaskId());
@ -1338,13 +1347,19 @@ void NodeManager::AssignTask(Task &task) {
RAY_LOG(WARNING) << "Failed to send task to worker, disconnecting client"; RAY_LOG(WARNING) << "Failed to send task to worker, disconnecting client";
// We failed to send the task to the worker, so disconnect the worker. // We failed to send the task to the worker, so disconnect the worker.
ProcessDisconnectClientMessage(worker->Connection()); ProcessDisconnectClientMessage(worker->Connection());
// Queue this task for future assignment. The task will be assigned to a // Queue this task for future assignment. We need to do this since
// worker once one becomes available. // DispatchTasks() removed it from the ready queue. The task will be
// assigned to a worker once one becomes available.
// (See design_docs/task_states.rst for the state transition diagram.) // (See design_docs/task_states.rst for the state transition diagram.)
local_queues_.QueueReadyTasks(std::vector<Task>({task})); local_queues_.QueueReadyTasks(std::vector<Task>({assigned_task}));
DispatchTasks(); DispatchTasks({assigned_task});
} }
}); });
// We assigned this task to a worker.
// (Note this means that we sent the task to the worker. The assignment
// might still fail if the worker fails in the meantime, for instance.)
return true;
} }
void NodeManager::FinishAssignedTask(Worker &worker) { void NodeManager::FinishAssignedTask(Worker &worker) {
@ -1473,17 +1488,25 @@ void NodeManager::HandleObjectLocal(const ObjectID &object_id) {
if (ready_task_ids.size() > 0) { if (ready_task_ids.size() > 0) {
std::unordered_set<TaskID> ready_task_id_set(ready_task_ids.begin(), std::unordered_set<TaskID> ready_task_id_set(ready_task_ids.begin(),
ready_task_ids.end()); ready_task_ids.end());
// Transition tasks from waiting to scheduled.
// (See design_docs/task_states.rst for the state transition diagram.)
local_queues_.MoveTasks(ready_task_id_set, TaskState::WAITING, TaskState::READY);
// New ready tasks appeared in the queue, try to dispatch them.
DispatchTasks();
// Check that remaining tasks that could not be transitioned are blocked // First filter out the tasks that should not be moved to READY.
// workers or drivers.
local_queues_.FilterState(ready_task_id_set, TaskState::BLOCKED); local_queues_.FilterState(ready_task_id_set, TaskState::BLOCKED);
local_queues_.FilterState(ready_task_id_set, TaskState::DRIVER); local_queues_.FilterState(ready_task_id_set, TaskState::DRIVER);
RAY_CHECK(ready_task_id_set.empty());
// Make sure that the remaining tasks are all WAITING.
auto ready_task_id_set_copy = ready_task_id_set;
local_queues_.FilterState(ready_task_id_set_copy, TaskState::WAITING);
RAY_CHECK(ready_task_id_set_copy.empty());
// Queue and dispatch the tasks that are ready to run (i.e., WAITING).
auto ready_tasks = local_queues_.RemoveTasks(ready_task_id_set);
local_queues_.QueueReadyTasks(ready_tasks);
const std::list<Task> ready_tasks_list(ready_tasks.begin(), ready_tasks.end());
// Dispatch only the new ready tasks whose dependencies were fulfilled.
// The other tasks in the ready queue can be ignored as no new resources
// have been added and no new worker has become available since the last
// time DispatchTasks() was called.
DispatchTasks(ready_tasks_list);
} }
} }
@ -1554,7 +1577,6 @@ void NodeManager::ForwardTaskOrResubmit(const Task &task,
// node immediately. Send it to the scheduling policy to be placed again. // node immediately. Send it to the scheduling policy to be placed again.
local_queues_.QueuePlaceableTasks({task}); local_queues_.QueuePlaceableTasks({task});
ScheduleTasks(cluster_resource_map_); ScheduleTasks(cluster_resource_map_);
DispatchTasks();
} }
}); });
} }

View file

@ -150,8 +150,8 @@ class NodeManager {
/// Assign a task. The task is assumed to not be queued in local_queues_. /// Assign a task. The task is assumed to not be queued in local_queues_.
/// ///
/// \param task The task in question. /// \param task The task in question.
/// \return Void. /// \return true if the task was assigned to a worker, false otherwise.
void AssignTask(Task &task); bool AssignTask(const Task &task);
/// Handle a worker finishing its assigned task. /// Handle a worker finishing its assigned task.
/// ///
/// \param The worker that finished the task. /// \param The worker that finished the task.
@ -195,7 +195,20 @@ class NodeManager {
/// Dispatch locally scheduled tasks. This attempts the transition from "scheduled" to /// Dispatch locally scheduled tasks. This attempts the transition from "scheduled" to
/// "running" task state. /// "running" task state.
void DispatchTasks(); ///
/// This function is called in one of the following cases:
/// (1) A set of new tasks is added to the ready queue.
/// (2) New resources are becoming available on the local node.
/// (3) A new worker becomes available.
/// Note in case (1) we only need to look at the new tasks added to the
/// ready queue, as we know that the old tasks in the ready queue cannot
/// be scheduled (We checked those tasks last time new resources or
/// workers became available, and nothing changed since then.) In this case,
/// ready_tasks contains only the tasks newly added to the ready queue;
/// otherwise, ready_tasks refers to the entire ready queue.
///
/// \param ready_tasks Tasks to be dispatched, a subset from ready queue.
void DispatchTasks(const std::list<Task> &ready_tasks);
/// Handle a task that is blocked. This could be a task assigned to a worker, /// Handle a task that is blocked. This could be a task assigned to a worker,
/// an out-of-band task (e.g., a thread created by the application), or a /// an out-of-band task (e.g., a thread created by the application), or a