[xray] Fix bug in updating actor execution dependencies (#2064)

* [xray] FIX: bugs in actor execution

* comments

* Stronger check
This commit is contained in:
Stephanie Wang 2018-05-18 12:45:17 -07:00 committed by GitHub
parent 0b07602c89
commit 71e5cca59f
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 10 additions and 5 deletions

View file

@ -644,10 +644,13 @@ void NodeManager::AssignTask(Task &task) {
// If the task was an actor task, then record this execution to guarantee
// consistency in the case of reconstruction.
if (spec.IsActorTask()) {
// Extend the frontier to include the executing task.
auto actor_entry = actor_registry_.find(spec.ActorId());
RAY_CHECK(actor_entry != actor_registry_.end());
actor_entry->second.ExtendFrontier(spec.ActorHandleId(), spec.ActorDummyObject());
auto execution_dependency = actor_entry->second.GetExecutionDependency();
// The execution dependency is initialized to the actor creation task's
// return value, and is subsequently updated to the assigned tasks'
// return values, so it should never be nil.
RAY_CHECK(!execution_dependency.is_nil());
// Update the task's execution dependencies to reflect the actual
// execution order, to support deterministic reconstruction.
// NOTE(swang): The update of an actor task's execution dependencies is
@ -656,8 +659,9 @@ void NodeManager::AssignTask(Task &task) {
// guarantee deterministic reconstruction ordering for tasks whose
// updates are reflected in the task table.
TaskExecutionSpecification &mutable_spec = task.GetTaskExecutionSpec();
mutable_spec.SetExecutionDependencies(
{actor_entry->second.GetExecutionDependency()});
mutable_spec.SetExecutionDependencies({execution_dependency});
// Extend the frontier to include the executing task.
actor_entry->second.ExtendFrontier(spec.ActorHandleId(), spec.ActorDummyObject());
}
// We started running the task, so the task is ready to write to GCS.
lineage_cache_.AddReadyTask(task);
@ -690,7 +694,7 @@ void NodeManager::FinishAssignedTask(Worker &worker) {
auto actor_notification = std::make_shared<ActorTableDataT>();
actor_notification->actor_id = actor_id.binary();
actor_notification->actor_creation_dummy_object_id =
task.GetTaskSpecification().ActorCreationDummyObjectId().binary();
task.GetTaskSpecification().ActorDummyObject().binary();
// TODO(swang): The driver ID.
actor_notification->driver_id = JobID::nil().binary();
actor_notification->node_manager_id =

View file

@ -32,6 +32,7 @@ std::vector<ObjectID> TaskExecutionSpecification::ExecutionDependencies() const
void TaskExecutionSpecification::SetExecutionDependencies(
const std::vector<ObjectID> &dependencies) {
execution_spec_.dependencies.clear();
for (const auto &dependency : dependencies) {
execution_spec_.dependencies.push_back(dependency.binary());
}