From 3b42d5ccb13abc02f0976f2f1aa6ea04a56f988a Mon Sep 17 00:00:00 2001
From: vipulharsh
Date: Thu, 11 Jul 2019 14:52:04 -0700
Subject: [PATCH] Track newly created actor's parent actor (#5098)

* Track parent actor of actor

* Update src/ray/raylet/node_manager.cc

Co-Authored-By: Stephanie Wang

* Update src/ray/raylet/node_manager.cc

Co-Authored-By: Stephanie Wang

* fixing a comment

* Fixing typo in a comment

* capturing task_spec instead of actor_data

* adding const for some local variables

* changing an if else to else

* Linted version

* use updated method to create task from task_data

Change-Id: I9c1a65134dc23a2d175047e96b86ab9d9cf61971

* fixing linter issues

Change-Id: I1def06218130b399d2527b999258aecf9abb98dd
---
 src/ray/protobuf/gcs.proto     |  14 +-
 src/ray/raylet/node_manager.cc | 227 +++++++++++++++++++++------------
 src/ray/raylet/node_manager.h  |  26 +++-
 3 files changed, 176 insertions(+), 91 deletions(-)

diff --git a/src/ray/protobuf/gcs.proto b/src/ray/protobuf/gcs.proto
index 9ed92f7ea..5dcc36662 100644
--- a/src/ray/protobuf/gcs.proto
+++ b/src/ray/protobuf/gcs.proto
@@ -89,20 +89,22 @@ message ActorTableData {
   }
   // The ID of the actor that was created.
   bytes actor_id = 1;
+  // The ID of the actor that created it, null if created by non-actor task.
+  bytes parent_actor_id = 2;
   // The dummy object ID returned by the actor creation task. If the actor
   // dies, then this is the object that should be reconstructed for the actor
   // to be recreated.
-  bytes actor_creation_dummy_object_id = 2;
+  bytes actor_creation_dummy_object_id = 3;
   // The ID of the job that created the actor.
-  bytes job_id = 3;
+  bytes job_id = 4;
   // The ID of the node manager that created the actor.
-  bytes node_manager_id = 4;
+  bytes node_manager_id = 5;
   // Current state of this actor.
-  ActorState state = 5;
+  ActorState state = 6;
   // Max number of times this actor should be reconstructed.
-  uint64 max_reconstructions = 6;
+  uint64 max_reconstructions = 7;
   // Remaining number of reconstructions.
-  uint64 remaining_reconstructions = 7;
+  uint64 remaining_reconstructions = 8;
 }

 message ErrorTableData {
diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc
index ca9006e95..4a5d2c612 100644
--- a/src/ray/raylet/node_manager.cc
+++ b/src/ray/raylet/node_manager.cc
@@ -1840,9 +1840,10 @@ void NodeManager::FinishAssignedTask(Worker &worker) {
   }
 }

-ActorTableData NodeManager::CreateActorTableDataFromCreationTask(const Task &task) {
-  RAY_CHECK(task.GetTaskSpecification().IsActorCreationTask());
-  auto actor_id = task.GetTaskSpecification().ActorCreationId();
+ActorTableData NodeManager::CreateActorTableDataFromCreationTask(
+    const TaskSpecification &task_spec) {
+  RAY_CHECK(task_spec.IsActorCreationTask());
+  auto actor_id = task_spec.ActorCreationId();
   auto actor_entry = actor_registry_.find(actor_id);
   ActorTableData new_actor_data;
   // TODO(swang): If this is an actor that was reconstructed, and previous
@@ -1854,14 +1855,12 @@ ActorTableData NodeManager::CreateActorTableDataFromCreationTask(const Task &tas
     // change even if the actor fails or is reconstructed.
     new_actor_data.set_actor_id(actor_id.Binary());
     new_actor_data.set_actor_creation_dummy_object_id(
-        task.GetTaskSpecification().ActorDummyObject().Binary());
-    new_actor_data.set_job_id(task.GetTaskSpecification().JobId().Binary());
-    new_actor_data.set_max_reconstructions(
-        task.GetTaskSpecification().MaxActorReconstructions());
+        task_spec.ActorDummyObject().Binary());
+    new_actor_data.set_job_id(task_spec.JobId().Binary());
+    new_actor_data.set_max_reconstructions(task_spec.MaxActorReconstructions());
     // This is the first time that the actor has been created, so the number
     // of remaining reconstructions is the max.
-    new_actor_data.set_remaining_reconstructions(
-        task.GetTaskSpecification().MaxActorReconstructions());
+    new_actor_data.set_remaining_reconstructions(task_spec.MaxActorReconstructions());
   } else {
     // If we've already seen this actor, it means that this actor was reconstructed.
     // Thus, its previous state must be RECONSTRUCTING.
@@ -1885,95 +1884,159 @@ ActorTableData NodeManager::CreateActorTableDataFromCreationTask(const Task &tas
 void NodeManager::FinishAssignedActorTask(Worker &worker, const Task &task) {
   ActorID actor_id;
   ActorHandleID actor_handle_id;
+  const TaskSpecification task_spec = task.GetTaskSpecification();
   bool resumed_from_checkpoint = false;
-  if (task.GetTaskSpecification().IsActorCreationTask()) {
-    actor_id = task.GetTaskSpecification().ActorCreationId();
+  if (task_spec.IsActorCreationTask()) {
+    actor_id = task_spec.ActorCreationId();
     actor_handle_id = ActorHandleID::Nil();
     if (checkpoint_id_to_restore_.count(actor_id) > 0) {
       resumed_from_checkpoint = true;
     }
   } else {
-    actor_id = task.GetTaskSpecification().ActorId();
-    actor_handle_id = task.GetTaskSpecification().ActorHandleId();
+    actor_id = task_spec.ActorId();
+    actor_handle_id = task_spec.ActorHandleId();
   }

-  if (task.GetTaskSpecification().IsActorCreationTask()) {
+  if (task_spec.IsActorCreationTask()) {
     // This was an actor creation task. Convert the worker to an actor.
     worker.AssignActorId(actor_id);
-    // Notify the other node managers that the actor has been created.
-    const auto new_actor_data = CreateActorTableDataFromCreationTask(task);
-    if (resumed_from_checkpoint) {
-      // This actor was resumed from a checkpoint. In this case, we first look
-      // up the checkpoint in GCS and use it to restore the actor registration
-      // and frontier.
-      const auto checkpoint_id = checkpoint_id_to_restore_[actor_id];
-      checkpoint_id_to_restore_.erase(actor_id);
-      RAY_LOG(DEBUG) << "Looking up checkpoint " << checkpoint_id << " for actor "
-                     << actor_id;
-      RAY_CHECK_OK(gcs_client_->actor_checkpoint_table().Lookup(
-          JobID::Nil(), checkpoint_id,
-          [this, actor_id, new_actor_data](ray::gcs::AsyncGcsClient *client,
-                                           const UniqueID &checkpoint_id,
-                                           const ActorCheckpointData &checkpoint_data) {
-            RAY_LOG(INFO) << "Restoring registration for actor " << actor_id
-                          << " from checkpoint " << checkpoint_id;
-            ActorRegistration actor_registration =
-                ActorRegistration(new_actor_data, checkpoint_data);
-            // Mark the unreleased dummy objects in the checkpoint frontier as local.
-            for (const auto &entry : actor_registration.GetDummyObjects()) {
-              HandleObjectLocal(entry.first);
+    // Lookup the parent actor id.
+    auto parent_task_id = task_spec.ParentTaskId();
+    RAY_CHECK(actor_handle_id.IsNil());
+    RAY_CHECK_OK(gcs_client_->raylet_task_table().Lookup(
+        JobID::Nil(), parent_task_id,
+        /*success_callback=*/
+        [this, task_spec, resumed_from_checkpoint](
+            ray::gcs::AsyncGcsClient *client, const TaskID &parent_task_id,
+            const TaskTableData &parent_task_data) {
+          // The task was in the GCS task table. Use the stored task spec to
+          // get the parent actor id.
+          Task parent_task(parent_task_data.task());
+          ActorID parent_actor_id;
+          if (parent_task.GetTaskSpecification().IsActorCreationTask()) {
+            parent_actor_id = parent_task.GetTaskSpecification().ActorCreationId();
+          } else {
+            parent_actor_id = parent_task.GetTaskSpecification().ActorId();
+          }
+          FinishAssignedActorCreationTask(parent_actor_id, task_spec,
+                                          resumed_from_checkpoint);
+        },
+        /*failure_callback=*/
+        [this, task_spec, resumed_from_checkpoint](ray::gcs::AsyncGcsClient *client,
+                                                   const TaskID &parent_task_id) {
+          // The parent task was not in the GCS task table. It should most likely be in
+          // the
+          // lineage cache.
+          ActorID parent_actor_id = ActorID::Nil();
+          if (lineage_cache_.ContainsTask(parent_task_id)) {
+            // Use a copy of the cached task spec to get the parent actor id.
+            Task parent_task = lineage_cache_.GetTaskOrDie(parent_task_id);
+            if (parent_task.GetTaskSpecification().IsActorCreationTask()) {
+              parent_actor_id = parent_task.GetTaskSpecification().ActorCreationId();
+            } else {
+              parent_actor_id = parent_task.GetTaskSpecification().ActorId();
             }
-            HandleActorStateTransition(actor_id, std::move(actor_registration));
-            PublishActorStateTransition(
-                actor_id, new_actor_data,
-                /*failure_callback=*/
-                [](gcs::AsyncGcsClient *client, const ActorID &id,
-                   const ActorTableData &data) {
-                  // Only one node at a time should succeed at creating the actor.
-                  RAY_LOG(FATAL) << "Failed to update state to ALIVE for actor " << id;
-                });
-          },
-          [actor_id](ray::gcs::AsyncGcsClient *client, const UniqueID &checkpoint_id) {
-            RAY_LOG(FATAL) << "Couldn't find checkpoint " << checkpoint_id
-                           << " for actor " << actor_id << " in GCS.";
-          }));
-    } else {
-      // The actor did not resume from a checkpoint. Immediately notify the
-      // other node managers that the actor has been created.
-      HandleActorStateTransition(actor_id, ActorRegistration(new_actor_data));
-      PublishActorStateTransition(
-          actor_id, new_actor_data,
-          /*failure_callback=*/
-          [](gcs::AsyncGcsClient *client, const ActorID &id, const ActorTableData &data) {
-            // Only one node at a time should succeed at creating the actor.
-            RAY_LOG(FATAL) << "Failed to update state to ALIVE for actor " << id;
-          });
-    }
+          } else {
+            RAY_LOG(WARNING)
+                << "Task metadata not found in either GCS or lineage cache. It may have "
+                   "been "
+                   "evicted "
+                << "by the redis LRU configuration. Consider increasing the memory "
+                   "allocation via "
+                << "ray.init(redis_max_memory=).";
+          }
+          FinishAssignedActorCreationTask(parent_actor_id, task_spec,
+                                          resumed_from_checkpoint);
+        }));
+  } else {
+    // The actor was not resumed from a checkpoint. We extend the actor's
+    // frontier as usual since there is no frontier to restore.
+    ExtendActorFrontier(task_spec.ActorDummyObject(), actor_id, actor_handle_id);
   }
+}
+void NodeManager::ExtendActorFrontier(const ObjectID &dummy_object,
+                                      const ActorID &actor_id,
+                                      const ActorHandleID &actor_handle_id) {
+  auto actor_entry = actor_registry_.find(actor_id);
+  RAY_CHECK(actor_entry != actor_registry_.end());
+  // Extend the actor's frontier to include the executed task.
+  const ObjectID object_to_release =
+      actor_entry->second.ExtendFrontier(actor_handle_id, dummy_object);
+  if (!object_to_release.IsNil()) {
+    // If there were no new actor handles created, then no other actor task
+    // will depend on this execution dependency, so it is safe to release.
+    HandleObjectMissing(object_to_release);
+  }
+  // Mark the dummy object as locally available to indicate that the actor's
+  // state has changed and the next method can run. This is not added to the
+  // object table, so the update will be invisible to both the local object
+  // manager and the other nodes.
+  // NOTE(swang): The dummy objects must be marked as local whenever
+  // ExtendFrontier is called, and vice versa, so that we can clean up the
+  // dummy objects properly in case the actor fails and needs to be
+  // reconstructed.
+  HandleObjectLocal(dummy_object);
+}
+
+void NodeManager::FinishAssignedActorCreationTask(const ActorID &parent_actor_id,
+                                                  const TaskSpecification &task_spec,
+                                                  bool resumed_from_checkpoint) {
+  // Notify the other node managers that the actor has been created.
+  const ActorID actor_id = task_spec.ActorCreationId();
+  auto new_actor_data = CreateActorTableDataFromCreationTask(task_spec);
+  new_actor_data.set_parent_actor_id(parent_actor_id.Binary());
+  if (resumed_from_checkpoint) {
+    // This actor was resumed from a checkpoint. In this case, we first look
+    // up the checkpoint in GCS and use it to restore the actor registration
+    // and frontier.
+    const auto checkpoint_id = checkpoint_id_to_restore_[actor_id];
+    checkpoint_id_to_restore_.erase(actor_id);
+    RAY_LOG(DEBUG) << "Looking up checkpoint " << checkpoint_id << " for actor "
+                   << actor_id;
+    RAY_CHECK_OK(gcs_client_->actor_checkpoint_table().Lookup(
+        JobID::Nil(), checkpoint_id,
+        [this, actor_id, new_actor_data](ray::gcs::AsyncGcsClient *client,
+                                         const UniqueID &checkpoint_id,
+                                         const ActorCheckpointData &checkpoint_data) {
+          RAY_LOG(INFO) << "Restoring registration for actor " << actor_id
+                        << " from checkpoint " << checkpoint_id;
+          ActorRegistration actor_registration =
+              ActorRegistration(new_actor_data, checkpoint_data);
+          // Mark the unreleased dummy objects in the checkpoint frontier as local.
+          for (const auto &entry : actor_registration.GetDummyObjects()) {
+            HandleObjectLocal(entry.first);
+          }
+          HandleActorStateTransition(actor_id, std::move(actor_registration));
+          PublishActorStateTransition(
+              actor_id, new_actor_data,
+              /*failure_callback=*/
+              [](gcs::AsyncGcsClient *client, const ActorID &id,
+                 const ActorTableData &data) {
+                // Only one node at a time should succeed at creating the actor.
+                RAY_LOG(FATAL) << "Failed to update state to ALIVE for actor " << id;
+              });
+        },
+        [actor_id](ray::gcs::AsyncGcsClient *client, const UniqueID &checkpoint_id) {
+          RAY_LOG(FATAL) << "Couldn't find checkpoint " << checkpoint_id << " for actor "
+                         << actor_id << " in GCS.";
+        }));
+  } else {
+    // The actor did not resume from a checkpoint. Immediately notify the
+    // other node managers that the actor has been created.
+    HandleActorStateTransition(actor_id, ActorRegistration(new_actor_data));
+    PublishActorStateTransition(
+        actor_id, new_actor_data,
+        /*failure_callback=*/
+        [](gcs::AsyncGcsClient *client, const ActorID &id, const ActorTableData &data) {
+          // Only one node at a time should succeed at creating the actor.
+          RAY_LOG(FATAL) << "Failed to update state to ALIVE for actor " << id;
+        });
+  }
   if (!resumed_from_checkpoint) {
     // The actor was not resumed from a checkpoint. We extend the actor's
     // frontier as usual since there is no frontier to restore.
-    auto actor_entry = actor_registry_.find(actor_id);
-    RAY_CHECK(actor_entry != actor_registry_.end());
-    // Extend the actor's frontier to include the executed task.
-    const auto dummy_object = task.GetTaskSpecification().ActorDummyObject();
-    const ObjectID object_to_release =
-        actor_entry->second.ExtendFrontier(actor_handle_id, dummy_object);
-    if (!object_to_release.IsNil()) {
-      // If there were no new actor handles created, then no other actor task
-      // will depend on this execution dependency, so it safe to release.
-      HandleObjectMissing(object_to_release);
-    }
-    // Mark the dummy object as locally available to indicate that the actor's
-    // state has changed and the next method can run. This is not added to the
-    // object table, so the update will be invisible to both the local object
-    // manager and the other nodes.
-    // NOTE(swang): The dummy objects must be marked as local whenever
-    // ExtendFrontier is called, and vice versa, so that we can clean up the
-    // dummy objects properly in case the actor fails and needs to be
-    // reconstructed.
-    HandleObjectLocal(dummy_object);
+    ExtendActorFrontier(task_spec.ActorDummyObject(), actor_id, ActorHandleID::Nil());
   }
 }
diff --git a/src/ray/raylet/node_manager.h b/src/ray/raylet/node_manager.h
index fa92fa73e..9fbb4a9bc 100644
--- a/src/ray/raylet/node_manager.h
+++ b/src/ray/raylet/node_manager.h
@@ -218,13 +218,33 @@ class NodeManager : public rpc::NodeManagerServiceHandler {
   void FinishAssignedTask(Worker &worker);
   /// Helper function to produce actor table data for a newly created actor.
   ///
-  /// \param task The actor creation task that created the actor.
-  ActorTableData CreateActorTableDataFromCreationTask(const Task &task);
+  /// \param task_spec Task specification of the actor creation task that created the
+  /// actor.
+  ActorTableData CreateActorTableDataFromCreationTask(const TaskSpecification &task_spec);
   /// Handle a worker finishing an assigned actor task or actor creation task.
   /// \param worker The worker that finished the task.
-  /// \param task The actor task or actor creationt ask.
+  /// \param task The actor task or actor creation task.
   /// \return Void.
   void FinishAssignedActorTask(Worker &worker, const Task &task);
+  /// Helper function to finish a worker's assigned actor creation task.
+  /// Invoked once the task's parent actor is known.
+  ///
+  /// \param parent_actor_id The ID of the actor that created this actor, or
+  /// Nil if the actor was created by a non-actor task.
+  /// \param task_spec Task specification of the actor creation task.
+  /// \param resumed_from_checkpoint Whether the actor was resumed from a
+  /// checkpoint.
+  /// \return Void.
+  void FinishAssignedActorCreationTask(const ActorID &parent_actor_id,
+                                       const TaskSpecification &task_spec,
+                                       bool resumed_from_checkpoint);
+  /// Extend actor frontier after an actor task or actor creation task executes.
+  ///
+  /// \param dummy_object Dummy object corresponding to the task.
+  /// \param actor_id The relevant actor ID.
+  /// \param actor_handle_id The relevant actor handle ID.
+  void ExtendActorFrontier(const ObjectID &dummy_object, const ActorID &actor_id,
+                           const ActorHandleID &actor_handle_id);
   /// Make a placement decision for placeable tasks given the resource_map
   /// provided. This will perform task state transitions and task forwarding.
   ///
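
Note (not part of the patch): the parent-actor resolution that FinishAssignedActorTask performs in both GCS lookup callbacks above can be summarized by the sketch below. ResolveParentActorId is a hypothetical helper name used only for illustration; the patch itself inlines this logic in the success and failure callbacks.

  // Sketch only: mirrors the callback logic added in FinishAssignedActorTask.
  // Given the spec of the task that created the actor, derive the ID that is
  // stored in ActorTableData::parent_actor_id.
  ActorID ResolveParentActorId(const TaskSpecification &parent_spec) {
    if (parent_spec.IsActorCreationTask()) {
      // The parent is itself an actor creation task, so use its actor creation ID.
      return parent_spec.ActorCreationId();
    }
    // A regular actor task yields the executing actor's ID; a non-actor task
    // yields ActorID::Nil(), marking an actor with no parent actor.
    return parent_spec.ActorId();
  }

Whichever branch runs, the resulting ID is passed to FinishAssignedActorCreationTask and written into the new parent_actor_id field (field 2 of ActorTableData).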