diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index b98716979..d3060f697 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -644,10 +644,13 @@ void NodeManager::AssignTask(Task &task) { // If the task was an actor task, then record this execution to guarantee // consistency in the case of reconstruction. if (spec.IsActorTask()) { - // Extend the frontier to include the executing task. auto actor_entry = actor_registry_.find(spec.ActorId()); RAY_CHECK(actor_entry != actor_registry_.end()); - actor_entry->second.ExtendFrontier(spec.ActorHandleId(), spec.ActorDummyObject()); + auto execution_dependency = actor_entry->second.GetExecutionDependency(); + // The execution dependency is initialized to the actor creation task's + // return value, and is subsequently updated to the assigned tasks' + // return values, so it should never be nil. + RAY_CHECK(!execution_dependency.is_nil()); // Update the task's execution dependencies to reflect the actual // execution order, to support deterministic reconstruction. // NOTE(swang): The update of an actor task's execution dependencies is @@ -656,8 +659,9 @@ void NodeManager::AssignTask(Task &task) { // guarantee deterministic reconstruction ordering for tasks whose // updates are reflected in the task table. TaskExecutionSpecification &mutable_spec = task.GetTaskExecutionSpec(); - mutable_spec.SetExecutionDependencies( - {actor_entry->second.GetExecutionDependency()}); + mutable_spec.SetExecutionDependencies({execution_dependency}); + // Extend the frontier to include the executing task. + actor_entry->second.ExtendFrontier(spec.ActorHandleId(), spec.ActorDummyObject()); } // We started running the task, so the task is ready to write to GCS. lineage_cache_.AddReadyTask(task); @@ -690,7 +694,7 @@ void NodeManager::FinishAssignedTask(Worker &worker) { auto actor_notification = std::make_shared(); actor_notification->actor_id = actor_id.binary(); actor_notification->actor_creation_dummy_object_id = - task.GetTaskSpecification().ActorCreationDummyObjectId().binary(); + task.GetTaskSpecification().ActorDummyObject().binary(); // TODO(swang): The driver ID. actor_notification->driver_id = JobID::nil().binary(); actor_notification->node_manager_id = diff --git a/src/ray/raylet/task_execution_spec.cc b/src/ray/raylet/task_execution_spec.cc index 91473f557..c5b1486d5 100644 --- a/src/ray/raylet/task_execution_spec.cc +++ b/src/ray/raylet/task_execution_spec.cc @@ -32,6 +32,7 @@ std::vector TaskExecutionSpecification::ExecutionDependencies() const void TaskExecutionSpecification::SetExecutionDependencies( const std::vector &dependencies) { + execution_spec_.dependencies.clear(); for (const auto &dependency : dependencies) { execution_spec_.dependencies.push_back(dependency.binary()); }