diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index 5ddb5d74a..d431e8d1a 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -1294,26 +1294,40 @@ void NodeManager::ForwardTaskOrResubmit(const Task &task, RAY_LOG(INFO) << "Failed to forward task " << task_id << " to node manager " << node_manager_id; // Mark the failed task as pending to let other raylets know that we still - // have the task. Once the task is successfully retried, it will be - // canceled. TaskDependencyManager::TaskPending() is assumed to be + // have the task. TaskDependencyManager::TaskPending() is assumed to be // idempotent. task_dependency_manager_.TaskPending(task); - // Create a timer to resubmit the task in a little bit. TODO(rkn): Really - // this should be a unique_ptr instead of a shared_ptr. However, it's a - // little harder to move unique_ptrs into lambdas. - auto retry_timer = std::make_shared(io_service_); - auto retry_duration = boost::posix_time::milliseconds( - RayConfig::instance().node_manager_forward_task_retry_timeout_milliseconds()); - retry_timer->expires_from_now(retry_duration); - retry_timer->async_wait( - [this, task, task_id, retry_timer](const boost::system::error_code &error) { - // Timer killing will receive the boost::asio::error::operation_aborted, - // we only handle the timeout event. - RAY_CHECK(!error); - RAY_LOG(INFO) << "In ForwardTask retry callback for task " << task_id; - EnqueuePlaceableTask(task); - }); + // Actor tasks can only be executed at the actor's location, so they are + // retried after a timeout. All other tasks that fail to be forwarded are + // deemed to be placeable again. + if (task.GetTaskSpecification().IsActorTask()) { + // The task is for an actor on another node. Create a timer to resubmit + // the task in a little bit. TODO(rkn): Really this should be a + // unique_ptr instead of a shared_ptr. However, it's a little harder to + // move unique_ptrs into lambdas. + auto retry_timer = std::make_shared(io_service_); + auto retry_duration = boost::posix_time::milliseconds( + RayConfig::instance().node_manager_forward_task_retry_timeout_milliseconds()); + retry_timer->expires_from_now(retry_duration); + retry_timer->async_wait( + [this, task, task_id, retry_timer](const boost::system::error_code &error) { + // Timer killing will receive the boost::asio::error::operation_aborted, + // we only handle the timeout event. + RAY_CHECK(!error); + RAY_LOG(DEBUG) << "Resubmitting task " << task_id + << " because ForwardTask failed."; + SubmitTask(task, Lineage()); + }); + // Remove the task from the lineage cache. The task will get added back + // once it is resubmitted. + lineage_cache_.RemoveWaitingTask(task_id); + } else { + // The task is not for an actor and may therefore be placed on another + // node immediately. Send it to the scheduling policy to be placed again. + local_queues_.QueuePlaceableTasks({task}); + ScheduleTasks(); + } } } @@ -1384,10 +1398,6 @@ ray::Status NodeManager::ForwardTask(const Task &task, const ClientID &node_id) } } } - } else { - // TODO(atumanov): caller must handle ForwardTask failure. - RAY_LOG(WARNING) << "[NodeManager][ForwardTask] failed to forward task " << task_id - << " to node " << node_id; } return status; }