[xray] Fix crash in case of spurious reconstruction (#2609)

* Exit if task already queued

* address comments
This commit is contained in:
Stephanie Wang 2018-08-09 14:46:46 -07:00 committed by Philipp Moritz
parent 2de9bfc7e3
commit f093ed1fc6
3 changed files with 20 additions and 0 deletions

View file

@ -824,6 +824,13 @@ void NodeManager::TreatTaskAsFailed(const TaskSpecification &spec) {
void NodeManager::SubmitTask(const Task &task, const Lineage &uncommitted_lineage,
bool forwarded) {
if (local_queues_.HasTask(task.GetTaskSpecification().TaskId())) {
RAY_LOG(WARNING) << "Submitted task " << task.GetTaskSpecification().TaskId()
<< " is already queued and will not be reconstructed. This is most "
"likely due to spurious reconstruction.";
return;
}
// Add the task and its uncommitted lineage to the lineage cache.
lineage_cache_.AddWaitingTask(task, uncommitted_lineage);

View file

@ -223,6 +223,13 @@ void SchedulingQueue::QueueMethodsWaitingForActorCreation(
QueueTasks(methods_waiting_for_actor_creation_, tasks);
}
bool SchedulingQueue::HasTask(const TaskID &task_id) const {
return (methods_waiting_for_actor_creation_.HasTask(task_id) ||
waiting_tasks_.HasTask(task_id) || placeable_tasks_.HasTask(task_id) ||
ready_tasks_.HasTask(task_id) || running_tasks_.HasTask(task_id) ||
blocked_tasks_.HasTask(task_id));
}
void SchedulingQueue::QueueWaitingTasks(const std::vector<Task> &tasks) {
QueueTasks(waiting_tasks_, tasks);
}

View file

@ -26,6 +26,12 @@ class SchedulingQueue {
/// SchedulingQueue destructor.
virtual ~SchedulingQueue() {}
/// \brief Check if the queue contains a specific task id.
///
/// \param task_id The task ID for the task.
/// \return Whether the task_id exists in the queue.
bool HasTask(const TaskID &task_id) const;
/// Get the queue of tasks that are destined for actors that have not yet
/// been created.
///