Track newly created actor's parent actor (#5098)

* Track parent actor of actor

* Update src/ray/raylet/node_manager.cc

Co-Authored-By: Stephanie Wang <swang@cs.berkeley.edu>

* Update src/ray/raylet/node_manager.cc

Co-Authored-By: Stephanie Wang <swang@cs.berkeley.edu>

* fixing a comment

* Fixing typo in a comment

* capturing task_spec instead of actor_data

* adding const for some local variables

* changing an if else to else

* Linted version

* use updated method to create task from task_data

Change-Id: I9c1a65134dc23a2d175047e96b86ab9d9cf61971

* fixing linter issues

Change-Id: I1def06218130b399d2527b999258aecf9abb98dd
This commit is contained in:
vipulharsh 2019-07-11 14:52:04 -07:00 committed by Stephanie Wang
parent 3456afdea7
commit 3b42d5ccb1
3 changed files with 176 additions and 91 deletions

View file

@ -89,20 +89,22 @@ message ActorTableData {
}
// The ID of the actor that was created.
bytes actor_id = 1;
// The ID of the actor that created it, null if created by non-actor task.
bytes parent_actor_id = 2;
// The dummy object ID returned by the actor creation task. If the actor
// dies, then this is the object that should be reconstructed for the actor
// to be recreated.
bytes actor_creation_dummy_object_id = 2;
bytes actor_creation_dummy_object_id = 3;
// The ID of the job that created the actor.
bytes job_id = 3;
bytes job_id = 4;
// The ID of the node manager that created the actor.
bytes node_manager_id = 4;
bytes node_manager_id = 5;
// Current state of this actor.
ActorState state = 5;
ActorState state = 6;
// Max number of times this actor should be reconstructed.
uint64 max_reconstructions = 6;
uint64 max_reconstructions = 7;
// Remaining number of reconstructions.
uint64 remaining_reconstructions = 7;
uint64 remaining_reconstructions = 8;
}
message ErrorTableData {

View file

@ -1840,9 +1840,10 @@ void NodeManager::FinishAssignedTask(Worker &worker) {
}
}
ActorTableData NodeManager::CreateActorTableDataFromCreationTask(const Task &task) {
RAY_CHECK(task.GetTaskSpecification().IsActorCreationTask());
auto actor_id = task.GetTaskSpecification().ActorCreationId();
ActorTableData NodeManager::CreateActorTableDataFromCreationTask(
const TaskSpecification &task_spec) {
RAY_CHECK(task_spec.IsActorCreationTask());
auto actor_id = task_spec.ActorCreationId();
auto actor_entry = actor_registry_.find(actor_id);
ActorTableData new_actor_data;
// TODO(swang): If this is an actor that was reconstructed, and previous
@ -1854,14 +1855,12 @@ ActorTableData NodeManager::CreateActorTableDataFromCreationTask(const Task &tas
// change even if the actor fails or is reconstructed.
new_actor_data.set_actor_id(actor_id.Binary());
new_actor_data.set_actor_creation_dummy_object_id(
task.GetTaskSpecification().ActorDummyObject().Binary());
new_actor_data.set_job_id(task.GetTaskSpecification().JobId().Binary());
new_actor_data.set_max_reconstructions(
task.GetTaskSpecification().MaxActorReconstructions());
task_spec.ActorDummyObject().Binary());
new_actor_data.set_job_id(task_spec.JobId().Binary());
new_actor_data.set_max_reconstructions(task_spec.MaxActorReconstructions());
// This is the first time that the actor has been created, so the number
// of remaining reconstructions is the max.
new_actor_data.set_remaining_reconstructions(
task.GetTaskSpecification().MaxActorReconstructions());
new_actor_data.set_remaining_reconstructions(task_spec.MaxActorReconstructions());
} else {
// If we've already seen this actor, it means that this actor was reconstructed.
// Thus, its previous state must be RECONSTRUCTING.
@ -1885,95 +1884,159 @@ ActorTableData NodeManager::CreateActorTableDataFromCreationTask(const Task &tas
void NodeManager::FinishAssignedActorTask(Worker &worker, const Task &task) {
ActorID actor_id;
ActorHandleID actor_handle_id;
const TaskSpecification task_spec = task.GetTaskSpecification();
bool resumed_from_checkpoint = false;
if (task.GetTaskSpecification().IsActorCreationTask()) {
actor_id = task.GetTaskSpecification().ActorCreationId();
if (task_spec.IsActorCreationTask()) {
actor_id = task_spec.ActorCreationId();
actor_handle_id = ActorHandleID::Nil();
if (checkpoint_id_to_restore_.count(actor_id) > 0) {
resumed_from_checkpoint = true;
}
} else {
actor_id = task.GetTaskSpecification().ActorId();
actor_handle_id = task.GetTaskSpecification().ActorHandleId();
actor_id = task_spec.ActorId();
actor_handle_id = task_spec.ActorHandleId();
}
if (task.GetTaskSpecification().IsActorCreationTask()) {
if (task_spec.IsActorCreationTask()) {
// This was an actor creation task. Convert the worker to an actor.
worker.AssignActorId(actor_id);
// Notify the other node managers that the actor has been created.
const auto new_actor_data = CreateActorTableDataFromCreationTask(task);
if (resumed_from_checkpoint) {
// This actor was resumed from a checkpoint. In this case, we first look
// up the checkpoint in GCS and use it to restore the actor registration
// and frontier.
const auto checkpoint_id = checkpoint_id_to_restore_[actor_id];
checkpoint_id_to_restore_.erase(actor_id);
RAY_LOG(DEBUG) << "Looking up checkpoint " << checkpoint_id << " for actor "
<< actor_id;
RAY_CHECK_OK(gcs_client_->actor_checkpoint_table().Lookup(
JobID::Nil(), checkpoint_id,
[this, actor_id, new_actor_data](ray::gcs::AsyncGcsClient *client,
const UniqueID &checkpoint_id,
const ActorCheckpointData &checkpoint_data) {
RAY_LOG(INFO) << "Restoring registration for actor " << actor_id
<< " from checkpoint " << checkpoint_id;
ActorRegistration actor_registration =
ActorRegistration(new_actor_data, checkpoint_data);
// Mark the unreleased dummy objects in the checkpoint frontier as local.
for (const auto &entry : actor_registration.GetDummyObjects()) {
HandleObjectLocal(entry.first);
// Lookup the parent actor id.
auto parent_task_id = task_spec.ParentTaskId();
RAY_CHECK(actor_handle_id.IsNil());
RAY_CHECK_OK(gcs_client_->raylet_task_table().Lookup(
JobID::Nil(), parent_task_id,
/*success_callback=*/
[this, task_spec, resumed_from_checkpoint](
ray::gcs::AsyncGcsClient *client, const TaskID &parent_task_id,
const TaskTableData &parent_task_data) {
// The task was in the GCS task table. Use the stored task spec to
// get the parent actor id.
Task parent_task(parent_task_data.task());
ActorID parent_actor_id;
if (parent_task.GetTaskSpecification().IsActorCreationTask()) {
parent_actor_id = parent_task.GetTaskSpecification().ActorCreationId();
} else {
parent_actor_id = parent_task.GetTaskSpecification().ActorId();
}
FinishAssignedActorCreationTask(parent_actor_id, task_spec,
resumed_from_checkpoint);
},
/*failure_callback=*/
[this, task_spec, resumed_from_checkpoint](ray::gcs::AsyncGcsClient *client,
const TaskID &parent_task_id) {
// The parent task was not in the GCS task table. It should most likely be in
// the
// lineage cache.
ActorID parent_actor_id = ActorID::Nil();
if (lineage_cache_.ContainsTask(parent_task_id)) {
// Use a copy of the cached task spec to get the parent actor id.
Task parent_task = lineage_cache_.GetTaskOrDie(parent_task_id);
if (parent_task.GetTaskSpecification().IsActorCreationTask()) {
parent_actor_id = parent_task.GetTaskSpecification().ActorCreationId();
} else {
parent_actor_id = parent_task.GetTaskSpecification().ActorId();
}
HandleActorStateTransition(actor_id, std::move(actor_registration));
PublishActorStateTransition(
actor_id, new_actor_data,
/*failure_callback=*/
[](gcs::AsyncGcsClient *client, const ActorID &id,
const ActorTableData &data) {
// Only one node at a time should succeed at creating the actor.
RAY_LOG(FATAL) << "Failed to update state to ALIVE for actor " << id;
});
},
[actor_id](ray::gcs::AsyncGcsClient *client, const UniqueID &checkpoint_id) {
RAY_LOG(FATAL) << "Couldn't find checkpoint " << checkpoint_id
<< " for actor " << actor_id << " in GCS.";
}));
} else {
// The actor did not resume from a checkpoint. Immediately notify the
// other node managers that the actor has been created.
HandleActorStateTransition(actor_id, ActorRegistration(new_actor_data));
PublishActorStateTransition(
actor_id, new_actor_data,
/*failure_callback=*/
[](gcs::AsyncGcsClient *client, const ActorID &id, const ActorTableData &data) {
// Only one node at a time should succeed at creating the actor.
RAY_LOG(FATAL) << "Failed to update state to ALIVE for actor " << id;
});
}
} else {
RAY_LOG(WARNING)
<< "Task metadata not found in either GCS or lineage cache. It may have "
"been "
"evicted "
<< "by the redis LRU configuration. Consider increasing the memory "
"allocation via "
<< "ray.init(redis_max_memory=<max_memory_bytes>).";
}
FinishAssignedActorCreationTask(parent_actor_id, task_spec,
resumed_from_checkpoint);
}));
} else {
// The actor was not resumed from a checkpoint. We extend the actor's
// frontier as usual since there is no frontier to restore.
ExtendActorFrontier(task_spec.ActorDummyObject(), actor_id, actor_handle_id);
}
}
void NodeManager::ExtendActorFrontier(const ObjectID &dummy_object,
const ActorID &actor_id,
const ActorHandleID &actor_handle_id) {
auto actor_entry = actor_registry_.find(actor_id);
RAY_CHECK(actor_entry != actor_registry_.end());
// Extend the actor's frontier to include the executed task.
const ObjectID object_to_release =
actor_entry->second.ExtendFrontier(actor_handle_id, dummy_object);
if (!object_to_release.IsNil()) {
// If there were no new actor handles created, then no other actor task
// will depend on this execution dependency, so it safe to release.
HandleObjectMissing(object_to_release);
}
// Mark the dummy object as locally available to indicate that the actor's
// state has changed and the next method can run. This is not added to the
// object table, so the update will be invisible to both the local object
// manager and the other nodes.
// NOTE(swang): The dummy objects must be marked as local whenever
// ExtendFrontier is called, and vice versa, so that we can clean up the
// dummy objects properly in case the actor fails and needs to be
// reconstructed.
HandleObjectLocal(dummy_object);
}
void NodeManager::FinishAssignedActorCreationTask(const ActorID &parent_actor_id,
const TaskSpecification &task_spec,
bool resumed_from_checkpoint) {
// Notify the other node managers that the actor has been created.
const ActorID actor_id = task_spec.ActorCreationId();
auto new_actor_data = CreateActorTableDataFromCreationTask(task_spec);
new_actor_data.set_parent_actor_id(parent_actor_id.Binary());
if (resumed_from_checkpoint) {
// This actor was resumed from a checkpoint. In this case, we first look
// up the checkpoint in GCS and use it to restore the actor registration
// and frontier.
const auto checkpoint_id = checkpoint_id_to_restore_[actor_id];
checkpoint_id_to_restore_.erase(actor_id);
RAY_LOG(DEBUG) << "Looking up checkpoint " << checkpoint_id << " for actor "
<< actor_id;
RAY_CHECK_OK(gcs_client_->actor_checkpoint_table().Lookup(
JobID::Nil(), checkpoint_id,
[this, actor_id, new_actor_data](ray::gcs::AsyncGcsClient *client,
const UniqueID &checkpoint_id,
const ActorCheckpointData &checkpoint_data) {
RAY_LOG(INFO) << "Restoring registration for actor " << actor_id
<< " from checkpoint " << checkpoint_id;
ActorRegistration actor_registration =
ActorRegistration(new_actor_data, checkpoint_data);
// Mark the unreleased dummy objects in the checkpoint frontier as local.
for (const auto &entry : actor_registration.GetDummyObjects()) {
HandleObjectLocal(entry.first);
}
HandleActorStateTransition(actor_id, std::move(actor_registration));
PublishActorStateTransition(
actor_id, new_actor_data,
/*failure_callback=*/
[](gcs::AsyncGcsClient *client, const ActorID &id,
const ActorTableData &data) {
// Only one node at a time should succeed at creating the actor.
RAY_LOG(FATAL) << "Failed to update state to ALIVE for actor " << id;
});
},
[actor_id](ray::gcs::AsyncGcsClient *client, const UniqueID &checkpoint_id) {
RAY_LOG(FATAL) << "Couldn't find checkpoint " << checkpoint_id << " for actor "
<< actor_id << " in GCS.";
}));
} else {
// The actor did not resume from a checkpoint. Immediately notify the
// other node managers that the actor has been created.
HandleActorStateTransition(actor_id, ActorRegistration(new_actor_data));
PublishActorStateTransition(
actor_id, new_actor_data,
/*failure_callback=*/
[](gcs::AsyncGcsClient *client, const ActorID &id, const ActorTableData &data) {
// Only one node at a time should succeed at creating the actor.
RAY_LOG(FATAL) << "Failed to update state to ALIVE for actor " << id;
});
}
if (!resumed_from_checkpoint) {
// The actor was not resumed from a checkpoint. We extend the actor's
// frontier as usual since there is no frontier to restore.
auto actor_entry = actor_registry_.find(actor_id);
RAY_CHECK(actor_entry != actor_registry_.end());
// Extend the actor's frontier to include the executed task.
const auto dummy_object = task.GetTaskSpecification().ActorDummyObject();
const ObjectID object_to_release =
actor_entry->second.ExtendFrontier(actor_handle_id, dummy_object);
if (!object_to_release.IsNil()) {
// If there were no new actor handles created, then no other actor task
// will depend on this execution dependency, so it safe to release.
HandleObjectMissing(object_to_release);
}
// Mark the dummy object as locally available to indicate that the actor's
// state has changed and the next method can run. This is not added to the
// object table, so the update will be invisible to both the local object
// manager and the other nodes.
// NOTE(swang): The dummy objects must be marked as local whenever
// ExtendFrontier is called, and vice versa, so that we can clean up the
// dummy objects properly in case the actor fails and needs to be
// reconstructed.
HandleObjectLocal(dummy_object);
ExtendActorFrontier(task_spec.ActorDummyObject(), actor_id, ActorHandleID::Nil());
}
}

View file

@ -218,13 +218,33 @@ class NodeManager : public rpc::NodeManagerServiceHandler {
void FinishAssignedTask(Worker &worker);
/// Helper function to produce actor table data for a newly created actor.
///
/// \param task The actor creation task that created the actor.
ActorTableData CreateActorTableDataFromCreationTask(const Task &task);
/// \param task_spec Task specification of the actor creation task that created the
/// actor.
ActorTableData CreateActorTableDataFromCreationTask(const TaskSpecification &task_spec);
/// Handle a worker finishing an assigned actor task or actor creation task.
/// \param worker The worker that finished the task.
/// \param task The actor task or actor creationt ask.
/// \param task The actor task or actor creation task.
/// \return Void.
void FinishAssignedActorTask(Worker &worker, const Task &task);
/// Helper function for handling worker to finish its assigned actor task
/// or actor creation task. Gets invoked when tasks's parent actor is known.
///
/// \param actor_id The actor id corresponding to the actor (creation) task.
/// \param actor_handle_id The actor id corresponding to the actor (creation) task.
/// \param new_actor_data The struct which will be used to register the task.
/// \param resumed_from_checkpoint If the actor was resumed from a checkpoint.
/// \param dummy_object Dummy object corresponding to the actor creation task.
/// \return Void.
void FinishAssignedActorCreationTask(const ActorID &parent_actor_id,
const TaskSpecification &task_spec,
bool resumed_from_checkpoint);
/// Extend actor frontier after an actor task or actor creation task executes.
///
/// \param dummy_object Dummy object corresponding to the task.
/// \param actor_id The relevant actor ID.
/// \param actor_handle_id The relevant actor handle ID.
void ExtendActorFrontier(const ObjectID &dummy_object, const ActorID &actor_id,
const ActorHandleID &actor_handle_id);
/// Make a placement decision for placeable tasks given the resource_map
/// provided. This will perform task state transitions and task forwarding.
///