[xray] Resubmit tasks that fail to be forwarded (#2645)

This commit is contained in:
Stephanie Wang 2018-08-16 02:12:56 -05:00 committed by Robert Nishihara
parent dd924a388b
commit e3e0cfce87

View file

@ -1294,26 +1294,40 @@ void NodeManager::ForwardTaskOrResubmit(const Task &task,
RAY_LOG(INFO) << "Failed to forward task " << task_id << " to node manager "
<< node_manager_id;
// Mark the failed task as pending to let other raylets know that we still
// have the task. Once the task is successfully retried, it will be
// canceled. TaskDependencyManager::TaskPending() is assumed to be
// have the task. TaskDependencyManager::TaskPending() is assumed to be
// idempotent.
task_dependency_manager_.TaskPending(task);
// Create a timer to resubmit the task in a little bit. TODO(rkn): Really
// this should be a unique_ptr instead of a shared_ptr. However, it's a
// little harder to move unique_ptrs into lambdas.
auto retry_timer = std::make_shared<boost::asio::deadline_timer>(io_service_);
auto retry_duration = boost::posix_time::milliseconds(
RayConfig::instance().node_manager_forward_task_retry_timeout_milliseconds());
retry_timer->expires_from_now(retry_duration);
retry_timer->async_wait(
[this, task, task_id, retry_timer](const boost::system::error_code &error) {
// Timer killing will receive the boost::asio::error::operation_aborted,
// we only handle the timeout event.
RAY_CHECK(!error);
RAY_LOG(INFO) << "In ForwardTask retry callback for task " << task_id;
EnqueuePlaceableTask(task);
});
// Actor tasks can only be executed at the actor's location, so they are
// retried after a timeout. All other tasks that fail to be forwarded are
// deemed to be placeable again.
if (task.GetTaskSpecification().IsActorTask()) {
// The task is for an actor on another node. Create a timer to resubmit
// the task in a little bit. TODO(rkn): Really this should be a
// unique_ptr instead of a shared_ptr. However, it's a little harder to
// move unique_ptrs into lambdas.
auto retry_timer = std::make_shared<boost::asio::deadline_timer>(io_service_);
auto retry_duration = boost::posix_time::milliseconds(
RayConfig::instance().node_manager_forward_task_retry_timeout_milliseconds());
retry_timer->expires_from_now(retry_duration);
retry_timer->async_wait(
[this, task, task_id, retry_timer](const boost::system::error_code &error) {
// If the timer is cancelled, this handler is invoked with
// boost::asio::error::operation_aborted. Only the timeout event is handled.
RAY_CHECK(!error);
RAY_LOG(DEBUG) << "Resubmitting task " << task_id
<< " because ForwardTask failed.";
SubmitTask(task, Lineage());
});
// Remove the task from the lineage cache. The task will get added back
// once it is resubmitted.
lineage_cache_.RemoveWaitingTask(task_id);
} else {
// The task is not for an actor and may therefore be placed on another
// node immediately. Send it to the scheduling policy to be placed again.
local_queues_.QueuePlaceableTasks({task});
ScheduleTasks();
}
}
}
@ -1384,10 +1398,6 @@ ray::Status NodeManager::ForwardTask(const Task &task, const ClientID &node_id)
}
}
}
} else {
// TODO(atumanov): caller must handle ForwardTask failure.
RAY_LOG(WARNING) << "[NodeManager][ForwardTask] failed to forward task " << task_id
<< " to node " << node_id;
}
return status;
}