Queue tasks in the raylet in between async callbacks (#4766)

* Add a SWAP TaskQueue so that we can keep track of tasks that are temporarily dequeued

* Fix bug where tasks that fail to be forwarded don't appear to be local by adding them to SWAP queue

* cleanups

* updates

* updates
This commit is contained in:
Stephanie Wang 2019-05-15 10:23:25 -07:00 committed by GitHub
parent 3bbafc7105
commit cb1a195ca2
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 77 additions and 30 deletions

View file

@ -774,7 +774,10 @@ void NodeManager::DispatchTasks(
}
}
}
local_queues_.RemoveTasks(removed_task_ids);
// Move the ASSIGNED task to the SWAP queue so that we remember that we have
// it queued locally. Once the GetTaskReply has been sent, the task will get
// re-queued, depending on whether the message succeeded or not.
local_queues_.MoveTasks(removed_task_ids, TaskState::READY, TaskState::SWAP);
}
void NodeManager::ProcessClientMessage(
@ -1825,11 +1828,15 @@ bool NodeManager::AssignTask(const Task &task) {
auto message = protocol::CreateGetTaskReply(fbb, spec.ToFlatbuffer(fbb),
fbb.CreateVector(resource_id_set_flatbuf));
fbb.Finish(message);
// Give the callback a copy of the task so it can modify it.
Task assigned_task(task);
const auto &task_id = spec.TaskId();
worker->Connection()->WriteMessageAsync(
static_cast<int64_t>(protocol::MessageType::ExecuteTask), fbb.GetSize(),
fbb.GetBufferPointer(), [this, worker, assigned_task](ray::Status status) mutable {
fbb.GetBufferPointer(), [this, worker, task_id](ray::Status status) {
// Remove the ASSIGNED task from the SWAP queue.
TaskState state;
auto assigned_task = local_queues_.RemoveTask(task_id, &state);
RAY_CHECK(state == TaskState::SWAP);
if (status.ok()) {
auto spec = assigned_task.GetTaskSpecification();
// We successfully assigned the task to the worker.
@ -2212,9 +2219,9 @@ void NodeManager::ForwardTaskOrResubmit(const Task &task,
/// TODO(rkn): Should we check that the node manager is remote and not local?
/// TODO(rkn): Should we check if the remote node manager is known to be dead?
// Attempt to forward the task.
ForwardTask(task, node_manager_id, [this, task, node_manager_id](ray::Status error) {
ForwardTask(task, node_manager_id, [this, node_manager_id](ray::Status error,
const Task &task) {
const TaskID task_id = task.GetTaskSpecification().TaskId();
RAY_LOG(INFO) << "Failed to forward task " << task_id << " to node manager "
<< node_manager_id;
@ -2236,14 +2243,22 @@ void NodeManager::ForwardTaskOrResubmit(const Task &task,
RayConfig::instance().node_manager_forward_task_retry_timeout_milliseconds());
retry_timer->expires_from_now(retry_duration);
retry_timer->async_wait(
[this, task, task_id, retry_timer](const boost::system::error_code &error) {
[this, task_id, retry_timer](const boost::system::error_code &error) {
// Timer killing will receive the boost::asio::error::operation_aborted,
// we only handle the timeout event.
RAY_CHECK(!error);
RAY_LOG(INFO) << "Resubmitting task " << task_id
<< " because ForwardTask failed.";
// Remove the RESUBMITTED task from the SWAP queue.
TaskState state;
const auto task = local_queues_.RemoveTask(task_id, &state);
RAY_CHECK(state == TaskState::SWAP);
// Submit the task again.
SubmitTask(task, Lineage());
});
// Temporarily move the RESUBMITTED task to the SWAP queue while the
// timer is active.
local_queues_.QueueTasks({task}, TaskState::SWAP);
// Remove the task from the lineage cache. The task will get added back
// once it is resubmitted.
lineage_cache_.RemoveWaitingTask(task_id);
@ -2256,8 +2271,9 @@ void NodeManager::ForwardTaskOrResubmit(const Task &task,
});
}
void NodeManager::ForwardTask(const Task &task, const ClientID &node_id,
const std::function<void(const ray::Status &)> &on_error) {
void NodeManager::ForwardTask(
const Task &task, const ClientID &node_id,
const std::function<void(const ray::Status &, const Task &)> &on_error) {
const auto &spec = task.GetTaskSpecification();
auto task_id = spec.TaskId();
@ -2291,16 +2307,25 @@ void NodeManager::ForwardTask(const Task &task, const ClientID &node_id,
if (it == remote_server_connections_.end()) {
// TODO(atumanov): caller must handle failure to ensure tasks are not lost.
RAY_LOG(INFO) << "No NodeManager connection found for GCS client id " << node_id;
on_error(ray::Status::IOError("NodeManager connection not found"));
on_error(ray::Status::IOError("NodeManager connection not found"), task);
return;
}
auto &server_conn = it->second;
// Move the FORWARDING task to the SWAP queue so that we remember that we
// have it queued locally. Once the ForwardTaskRequest has been sent, the
// task will get re-queued, depending on whether the message succeeded or
// not.
local_queues_.QueueTasks({task}, TaskState::SWAP);
server_conn->WriteMessageAsync(
static_cast<int64_t>(protocol::MessageType::ForwardTaskRequest), fbb.GetSize(),
fbb.GetBufferPointer(),
[this, on_error, task_id, node_id, spec](ray::Status status) {
fbb.GetBufferPointer(), [this, on_error, task_id, node_id](ray::Status status) {
// Remove the FORWARDING task from the SWAP queue.
TaskState state;
const auto task = local_queues_.RemoveTask(task_id, &state);
RAY_CHECK(state == TaskState::SWAP);
if (status.ok()) {
const auto &spec = task.GetTaskSpecification();
// If we were able to forward the task, remove the forwarded task from the
// lineage cache since the receiving node is now responsible for writing
// the task to the GCS.
@ -2335,7 +2360,7 @@ void NodeManager::ForwardTask(const Task &task, const ClientID &node_id,
}
}
} else {
on_error(status);
on_error(status, task);
}
});
}

View file

@ -246,8 +246,9 @@ class NodeManager {
/// \param task The task to forward.
/// \param node_id The ID of the node to forward the task to.
/// \param on_error Callback on run on non-ok status.
void ForwardTask(const Task &task, const ClientID &node_id,
const std::function<void(const ray::Status &)> &on_error);
void ForwardTask(
const Task &task, const ClientID &node_id,
const std::function<void(const ray::Status &, const Task &)> &on_error);
/// Dispatch locally scheduled tasks. This attempts the transition from "scheduled" to
/// "running" task state.

View file

@ -9,7 +9,8 @@ namespace {
static constexpr const char *task_state_strings[] = {
"placeable", "waiting", "ready",
"running", "infeasible", "waiting_for_actor_creation"};
"running", "infeasible", "waiting for actor creation",
"swap"};
static_assert(sizeof(task_state_strings) / sizeof(const char *) ==
static_cast<int>(ray::raylet::TaskState::kNumTaskQueues),
"Must specify a TaskState name for every task queue");
@ -172,6 +173,9 @@ void SchedulingQueue::FilterState(std::unordered_set<TaskID> &task_ids,
case TaskState::INFEASIBLE:
FilterStateFromQueue(task_ids, TaskState::INFEASIBLE);
break;
case TaskState::SWAP:
FilterStateFromQueue(task_ids, TaskState::SWAP);
break;
case TaskState::BLOCKED: {
const auto blocked_ids = GetBlockedTaskIds();
for (auto it = task_ids.begin(); it != task_ids.end();) {
@ -230,7 +234,7 @@ std::vector<Task> SchedulingQueue::RemoveTasks(std::unordered_set<TaskID> &task_
// Try to find the tasks to remove from the queues.
for (const auto &task_state : {
TaskState::PLACEABLE, TaskState::WAITING, TaskState::READY, TaskState::RUNNING,
TaskState::INFEASIBLE, TaskState::WAITING_FOR_ACTOR_CREATION,
TaskState::INFEASIBLE, TaskState::WAITING_FOR_ACTOR_CREATION, TaskState::SWAP,
}) {
RemoveTasksFromQueue(task_state, task_ids, &removed_tasks);
}
@ -245,7 +249,7 @@ Task SchedulingQueue::RemoveTask(const TaskID &task_id, TaskState *removed_task_
// Try to find the task to remove in the queues.
for (const auto &task_state : {
TaskState::PLACEABLE, TaskState::WAITING, TaskState::READY, TaskState::RUNNING,
TaskState::INFEASIBLE, TaskState::WAITING_FOR_ACTOR_CREATION,
TaskState::INFEASIBLE, TaskState::WAITING_FOR_ACTOR_CREATION, TaskState::SWAP,
}) {
RemoveTasksFromQueue(task_state, task_id_set, &removed_tasks);
if (task_id_set.empty()) {
@ -260,7 +264,7 @@ Task SchedulingQueue::RemoveTask(const TaskID &task_id, TaskState *removed_task_
}
// Make sure we got the removed task.
RAY_CHECK(removed_tasks.size() == 1);
RAY_CHECK(removed_tasks.size() == 1) << task_id;
const auto &task = removed_tasks.front();
RAY_CHECK(task.GetTaskSpecification().TaskId() == task_id);
return task;
@ -287,6 +291,9 @@ void SchedulingQueue::MoveTasks(std::unordered_set<TaskID> &task_ids, TaskState
case TaskState::INFEASIBLE:
RemoveTasksFromQueue(TaskState::INFEASIBLE, task_ids, &removed_tasks);
break;
case TaskState::SWAP:
RemoveTasksFromQueue(TaskState::SWAP, task_ids, &removed_tasks);
break;
default:
RAY_LOG(FATAL) << "Attempting to move tasks from unrecognized state "
<< static_cast<std::underlying_type<TaskState>::type>(src_state);
@ -312,6 +319,9 @@ void SchedulingQueue::MoveTasks(std::unordered_set<TaskID> &task_ids, TaskState
case TaskState::INFEASIBLE:
QueueTasks(removed_tasks, TaskState::INFEASIBLE);
break;
case TaskState::SWAP:
QueueTasks(removed_tasks, TaskState::SWAP);
break;
default:
RAY_LOG(FATAL) << "Attempting to move tasks to unrecognized state "
<< static_cast<std::underlying_type<TaskState>::type>(dst_state);
@ -348,8 +358,16 @@ std::unordered_set<TaskID> SchedulingQueue::GetTaskIdsForDriver(
std::unordered_set<TaskID> SchedulingQueue::GetTaskIdsForActor(
const ActorID &actor_id) const {
std::unordered_set<TaskID> task_ids;
int swap = static_cast<int>(TaskState::SWAP);
int i = 0;
for (const auto &task_queue : task_queues_) {
GetActorTasksFromQueue(*task_queue, actor_id, task_ids);
// This is a hack to make sure that we don't remove tasks from the SWAP
// queue, since these are always guaranteed to be removed and eventually
// resubmitted if necessary by the node manager.
if (i != swap) {
GetActorTasksFromQueue(*task_queue, actor_id, task_ids);
}
i++;
}
return task_ids;
}
@ -385,10 +403,8 @@ const std::unordered_set<TaskID> &SchedulingQueue::GetDriverTaskIds() const {
std::string SchedulingQueue::DebugString() const {
std::stringstream result;
result << "SchedulingQueue:";
for (const auto &task_state : {
TaskState::PLACEABLE, TaskState::WAITING, TaskState::READY, TaskState::RUNNING,
TaskState::INFEASIBLE, TaskState::WAITING_FOR_ACTOR_CREATION,
}) {
for (size_t i = 0; i < static_cast<int>(ray::raylet::TaskState::kNumTaskQueues); i++) {
TaskState task_state = static_cast<TaskState>(i);
result << "\n- num " << GetTaskStateString(task_state)
<< " tasks: " << GetTaskQueue(task_state)->GetTasks().size();
}
@ -397,10 +413,8 @@ std::string SchedulingQueue::DebugString() const {
}
void SchedulingQueue::RecordMetrics() const {
for (const auto &task_state : {
TaskState::PLACEABLE, TaskState::WAITING, TaskState::READY, TaskState::RUNNING,
TaskState::INFEASIBLE, TaskState::WAITING_FOR_ACTOR_CREATION,
}) {
for (size_t i = 0; i < static_cast<int>(ray::raylet::TaskState::kNumTaskQueues); i++) {
TaskState task_state = static_cast<TaskState>(i);
stats::SchedulingQueueStats().Record(
static_cast<double>(GetTaskQueue(task_state)->GetTasks().size()),
{{stats::ValueTypeKey,

View file

@ -33,6 +33,13 @@ enum class TaskState {
// The task is an actor method and is waiting to learn where the actor was
// created.
WAITING_FOR_ACTOR_CREATION,
// Swap queue for tasks that are in between states. This can happen when a
// task is removed from one queue, and an async callback is responsible for
// re-queuing the task. For example, a READY task that has just been assigned
// to a worker will get moved to the SWAP queue while waiting for a response
// from the worker. If the worker accepts the task, the task will be added to
// the RUNNING queue, else it will be returned to READY.
SWAP,
// The number of task queues. All states that precede this enum must have an
// associated TaskQueue in SchedulingQueue. All states that succeed
// this enum do not have an associated TaskQueue, since the tasks
@ -144,7 +151,7 @@ class SchedulingQueue {
for (const auto &task_state : {
TaskState::PLACEABLE, TaskState::WAITING, TaskState::READY,
TaskState::RUNNING, TaskState::INFEASIBLE,
TaskState::WAITING_FOR_ACTOR_CREATION,
TaskState::WAITING_FOR_ACTOR_CREATION, TaskState::SWAP,
}) {
if (task_state == TaskState::READY) {
task_queues_[static_cast<int>(task_state)] = ready_queue_;