diff --git a/dashboard/modules/actor/actor_head.py b/dashboard/modules/actor/actor_head.py index fdc315036..56f741514 100644 --- a/dashboard/modules/actor/actor_head.py +++ b/dashboard/modules/actor/actor_head.py @@ -14,7 +14,7 @@ import ray.dashboard.utils as dashboard_utils import ray.dashboard.optional_utils as dashboard_optional_utils from ray.dashboard.optional_utils import rest_response from ray.dashboard.modules.actor import actor_consts -from ray.dashboard.modules.actor.actor_utils import actor_classname_from_task_spec +from ray.dashboard.modules.actor.actor_utils import actor_classname_from_func_descriptor from ray.core.generated import node_manager_pb2_grpc from ray.core.generated import gcs_service_pb2 from ray.core.generated import gcs_service_pb2_grpc @@ -56,20 +56,17 @@ def actor_table_data_to_dict(message): "state", "name", "numRestarts", - "taskSpec", + "functionDescriptor", "timestamp", "numExecutedTasks", } light_message = {k: v for (k, v) in orig_message.items() if k in fields} - if "taskSpec" in light_message: - actor_class = actor_classname_from_task_spec(light_message["taskSpec"]) + logger.info(light_message) + if "functionDescriptor" in light_message: + actor_class = actor_classname_from_func_descriptor( + light_message["functionDescriptor"] + ) light_message["actorClass"] = actor_class - if "functionDescriptor" in light_message["taskSpec"]: - light_message["taskSpec"] = { - "functionDescriptor": light_message["taskSpec"]["functionDescriptor"] - } - else: - light_message.pop("taskSpec") return light_message diff --git a/dashboard/modules/actor/actor_utils.py b/dashboard/modules/actor/actor_utils.py index 814e0eb79..62f40b125 100644 --- a/dashboard/modules/actor/actor_utils.py +++ b/dashboard/modules/actor/actor_utils.py @@ -32,6 +32,14 @@ def actor_classname_from_task_spec(task_spec): ) +def actor_classname_from_func_descriptor(func_desc): + return ( + func_desc.get("pythonFunctionDescriptor", {}) + .get("className", "Unknown actor class") + .split(".")[-1] + ) + + def _group_actors_by_python_class(actors): groups = defaultdict(list) for actor in actors.values(): diff --git a/dashboard/modules/actor/tests/test_actor.py b/dashboard/modules/actor/tests/test_actor.py index 79824306e..5a53b263e 100644 --- a/dashboard/modules/actor/tests/test_actor.py +++ b/dashboard/modules/actor/tests/test_actor.py @@ -118,9 +118,8 @@ def test_actors(disable_aiohttp_cache, ray_start_with_dashboard): assert len(actors) == 3 one_entry = list(actors.values())[0] assert "jobId" in one_entry - assert "taskSpec" in one_entry - assert "functionDescriptor" in one_entry["taskSpec"] - assert type(one_entry["taskSpec"]["functionDescriptor"]) is dict + assert "functionDescriptor" in one_entry + assert type(one_entry["functionDescriptor"]) is dict assert "address" in one_entry assert type(one_entry["address"]) is dict assert "state" in one_entry @@ -264,7 +263,7 @@ def test_actor_pubsub(disable_aiohttp_cache, ray_start_with_dashboard): including_default_value_fields=False, ) - non_state_keys = ("actorId", "jobId", "taskSpec") + non_state_keys = ("actorId", "jobId") for msg in msgs: actor_data_dict = actor_table_data_to_dict(msg) @@ -292,9 +291,9 @@ def test_actor_pubsub(disable_aiohttp_cache, ray_start_with_dashboard): "actorCreationDummyObjectId", "jobId", "ownerAddress", - "taskSpec", "className", "serializedRuntimeEnv", + "functionDescriptor", "rayNamespace", } else: diff --git a/dashboard/modules/snapshot/snapshot_head.py b/dashboard/modules/snapshot/snapshot_head.py index 4b6461f0d..289988498 100644 --- a/dashboard/modules/snapshot/snapshot_head.py +++ b/dashboard/modules/snapshot/snapshot_head.py @@ -169,7 +169,7 @@ class APIHead(dashboard_utils.DashboardHeadModule): "start_time": actor_table_entry.start_time, "end_time": actor_table_entry.end_time, "is_detached": actor_table_entry.is_detached, - "resources": dict(actor_table_entry.task_spec.required_resources), + "resources": dict(actor_table_entry.required_resources), "actor_class": actor_table_entry.class_name, "current_worker_id": actor_table_entry.address.worker_id.hex(), "current_raylet_id": actor_table_entry.address.raylet_id.hex(), diff --git a/src/ray/core_worker/actor_handle.cc b/src/ray/core_worker/actor_handle.cc index d012f074f..92cdd5232 100644 --- a/src/ray/core_worker/actor_handle.cc +++ b/src/ray/core_worker/actor_handle.cc @@ -57,29 +57,28 @@ rpc::ActorHandle CreateInnerActorHandleFromString(const std::string &serialized) return inner; } -rpc::ActorHandle CreateInnerActorHandleFromActorTableData( - const rpc::ActorTableData &actor_table_data) { +rpc::ActorHandle CreateInnerActorHandleFromActorData( + const rpc::ActorTableData &actor_table_data, const rpc::TaskSpec &task_spec) { rpc::ActorHandle inner; inner.set_actor_id(actor_table_data.actor_id()); inner.set_owner_id(actor_table_data.parent_id()); inner.mutable_owner_address()->CopyFrom(actor_table_data.owner_address()); inner.set_creation_job_id(actor_table_data.job_id()); - inner.set_actor_language(actor_table_data.task_spec().language()); + inner.set_actor_language(task_spec.language()); inner.mutable_actor_creation_task_function_descriptor()->CopyFrom( - actor_table_data.task_spec().function_descriptor()); - TaskSpecification task_spec(actor_table_data.task_spec()); - inner.set_actor_cursor(task_spec.ReturnId(0).Binary()); - inner.set_extension_data( - actor_table_data.task_spec().actor_creation_task_spec().extension_data()); - inner.set_max_task_retries( - actor_table_data.task_spec().actor_creation_task_spec().max_task_retries()); - inner.set_name(actor_table_data.task_spec().actor_creation_task_spec().name()); - inner.set_ray_namespace( - actor_table_data.task_spec().actor_creation_task_spec().ray_namespace()); + actor_table_data.function_descriptor()); + inner.set_actor_cursor( + ObjectID::FromIndex( + TaskID::ForActorCreationTask(ActorID::FromBinary(actor_table_data.actor_id())), + 1) + .Binary()); + inner.set_extension_data(task_spec.actor_creation_task_spec().extension_data()); + inner.set_max_task_retries(task_spec.actor_creation_task_spec().max_task_retries()); + inner.set_name(actor_table_data.name()); + inner.set_ray_namespace(actor_table_data.ray_namespace()); inner.set_execute_out_of_order( - actor_table_data.task_spec().actor_creation_task_spec().execute_out_of_order()); - inner.set_max_pending_calls( - actor_table_data.task_spec().actor_creation_task_spec().max_pending_calls()); + task_spec.actor_creation_task_spec().execute_out_of_order()); + inner.set_max_pending_calls(task_spec.actor_creation_task_spec().max_pending_calls()); return inner; } } // namespace @@ -115,8 +114,9 @@ ActorHandle::ActorHandle( ActorHandle::ActorHandle(const std::string &serialized) : ActorHandle(CreateInnerActorHandleFromString(serialized)) {} -ActorHandle::ActorHandle(const rpc::ActorTableData &actor_table_data) - : ActorHandle(CreateInnerActorHandleFromActorTableData(actor_table_data)) {} +ActorHandle::ActorHandle(const rpc::ActorTableData &actor_table_data, + const rpc::TaskSpec &task_spec) + : ActorHandle(CreateInnerActorHandleFromActorData(actor_table_data, task_spec)) {} void ActorHandle::SetActorTaskSpec(TaskSpecBuilder &builder, const ObjectID new_cursor) { absl::MutexLock guard(&mutex_); diff --git a/src/ray/core_worker/actor_handle.h b/src/ray/core_worker/actor_handle.h index 0b43c55e3..58bd0ae95 100644 --- a/src/ray/core_worker/actor_handle.h +++ b/src/ray/core_worker/actor_handle.h @@ -49,8 +49,9 @@ class ActorHandle { /// Constructs an ActorHandle from a serialized string. explicit ActorHandle(const std::string &serialized); - /// Constructs an ActorHandle from a gcs::ActorTableData message. - ActorHandle(const rpc::ActorTableData &actor_table_data); + /// Constructs an ActorHandle from a rpc::ActorTableData and a rpc::TaskSpec message. + ActorHandle(const rpc::ActorTableData &actor_table_data, + const rpc::TaskSpec &task_spec); ActorID GetActorID() const { return ActorID::FromBinary(inner_.actor_id()); }; diff --git a/src/ray/core_worker/actor_manager.cc b/src/ray/core_worker/actor_manager.cc index 5b35f5d36..723cb90b3 100644 --- a/src/ray/core_worker/actor_manager.cc +++ b/src/ray/core_worker/actor_manager.cc @@ -64,13 +64,16 @@ std::pair, Status> ActorManager::GetNamedActo // implemented using a promise that's captured in the RPC callback. // There should be no risk of deadlock because we don't hold any // locks during the call and the RPCs run on a separate thread. - rpc::ActorTableData result; - const auto status = gcs_client_->Actors().SyncGetByName(name, ray_namespace, result); + rpc::ActorTableData actor_table_data; + rpc::TaskSpec task_spec; + const auto status = gcs_client_->Actors().SyncGetByName( + name, ray_namespace, actor_table_data, task_spec); if (status.ok()) { - auto actor_handle = std::make_unique(result); + auto actor_handle = std::make_unique(actor_table_data, task_spec); actor_id = actor_handle->GetActorID(); AddNewActorHandle(std::move(actor_handle), - GenerateCachedActorName(result.ray_namespace(), result.name()), + GenerateCachedActorName(actor_table_data.ray_namespace(), + actor_table_data.name()), call_site, caller_address, /*is_detached*/ true); diff --git a/src/ray/gcs/gcs_client/accessor.cc b/src/ray/gcs/gcs_client/accessor.cc index e1b820539..8414b564a 100644 --- a/src/ray/gcs/gcs_client/accessor.cc +++ b/src/ray/gcs/gcs_client/accessor.cc @@ -194,15 +194,17 @@ Status ActorInfoAccessor::AsyncGetByName( Status ActorInfoAccessor::SyncGetByName(const std::string &name, const std::string &ray_namespace, - rpc::ActorTableData &actor_table_data) { + rpc::ActorTableData &actor_table_data, + rpc::TaskSpec &task_spec) { rpc::GetNamedActorInfoRequest request; rpc::GetNamedActorInfoReply reply; request.set_name(name); request.set_ray_namespace(ray_namespace); auto status = client_impl_->GetGcsRpcClient().SyncGetNamedActorInfo( request, &reply, /*timeout_ms*/ GetGcsTimeoutMs()); - if (status.ok() && reply.has_actor_table_data()) { + if (status.ok()) { actor_table_data = reply.actor_table_data(); + task_spec = reply.task_spec(); } return status; } diff --git a/src/ray/gcs/gcs_client/accessor.h b/src/ray/gcs/gcs_client/accessor.h index de4f34614..b2e91dad2 100644 --- a/src/ray/gcs/gcs_client/accessor.h +++ b/src/ray/gcs/gcs_client/accessor.h @@ -80,7 +80,8 @@ class ActorInfoAccessor { /// NotFound if the name doesn't exist. virtual Status SyncGetByName(const std::string &name, const std::string &ray_namespace, - rpc::ActorTableData &actor_table_data); + rpc::ActorTableData &actor_table_data, + rpc::TaskSpec &task_spec); /// List all named actors from the GCS asynchronously. /// diff --git a/src/ray/gcs/gcs_client/test/gcs_client_test.cc b/src/ray/gcs/gcs_client/test/gcs_client_test.cc index 47cec2c79..9deb1def8 100644 --- a/src/ray/gcs/gcs_client/test/gcs_client_test.cc +++ b/src/ray/gcs/gcs_client/test/gcs_client_test.cc @@ -908,7 +908,6 @@ TEST_P(GcsClientTest, DISABLED_TestGetActorPerf) { } for (int index = 0; index < actor_count; ++index) { auto actor_table_data = Mocker::GenActorTableData(job_id); - actor_table_data->mutable_task_spec()->CopyFrom(task_spec); RegisterActor(actor_table_data, false, true); } diff --git a/src/ray/gcs/gcs_server/gcs_actor_manager.cc b/src/ray/gcs/gcs_server/gcs_actor_manager.cc index d4378d2fe..c8bca96ce 100644 --- a/src/ray/gcs/gcs_server/gcs_actor_manager.cc +++ b/src/ray/gcs/gcs_server/gcs_actor_manager.cc @@ -172,8 +172,9 @@ std::string GcsActor::GetRayNamespace() const { } TaskSpecification GcsActor::GetCreationTaskSpecification() const { - const auto &task_spec = actor_table_data_.task_spec(); - return TaskSpecification(task_spec); + // The task spec is not available when the actor is dead. + RAY_CHECK(actor_table_data_.state() != rpc::ActorTableData::DEAD); + return TaskSpecification(*task_spec_); } const rpc::ActorTableData &GcsActor::GetActorTableData() const { @@ -182,6 +183,8 @@ const rpc::ActorTableData &GcsActor::GetActorTableData() const { rpc::ActorTableData *GcsActor::GetMutableActorTableData() { return &actor_table_data_; } +rpc::TaskSpec *GcsActor::GetMutableTaskSpec() { return task_spec_.get(); } + std::shared_ptr GcsActor::GetActorWorkerAssignment() const { return assignment_ptr_; @@ -372,6 +375,8 @@ void GcsActorManager::HandleGetNamedActorInfo( } else { reply->unsafe_arena_set_allocated_actor_table_data( iter->second->GetMutableActorTableData()); + RAY_LOG(INFO) << "WANGTAO " << iter->second->GetState(); + reply->unsafe_arena_set_allocated_task_spec(iter->second->GetMutableTaskSpec()); RAY_LOG(DEBUG) << "Finished getting actor info, job id = " << actor_id.JobId() << ", actor id = " << actor_id; } @@ -502,6 +507,8 @@ Status GcsActorManager::RegisterActor(const ray::rpc::RegisterActorRequest &requ } // The backend storage is supposed to be reliable, so the status must be ok. + RAY_CHECK_OK(gcs_table_storage_->ActorTaskSpecTable().Put( + actor_id, request.task_spec(), [](const Status &status) {})); RAY_CHECK_OK(gcs_table_storage_->ActorTable().Put( actor->GetActorID(), *actor->GetMutableActorTableData(), @@ -726,8 +733,6 @@ void GcsActorManager::DestroyActor(const ActorID &actor_id, gcs_actor_scheduler_->OnActorDestruction(it->second); } - const auto &task_id = it->second->GetCreationTaskSpecification().TaskId(); - it->second->GetMutableActorTableData()->mutable_task_spec()->Clear(); it->second->GetMutableActorTableData()->set_timestamp(current_sys_time_ms()); AddDestroyedActorToCache(it->second); const auto actor = std::move(it->second); @@ -768,7 +773,7 @@ void GcsActorManager::DestroyActor(const ActorID &actor_id, created_actors_.erase(node_it); } } else { - CancelActorInScheduling(actor, task_id); + CancelActorInScheduling(actor, TaskID::ForActorCreationTask(actor_id)); } } @@ -792,6 +797,7 @@ void GcsActorManager::DestroyActor(const ActorID &actor_id, [this, actor_id, actor_table_data](Status status) { RAY_CHECK_OK(gcs_publisher_->PublishActor( actor_id, *GenActorDataOnlyWithStates(*actor_table_data), nullptr)); + RAY_CHECK_OK(gcs_table_storage_->ActorTaskSpecTable().Delete(actor_id, nullptr)); // Destroy placement group owned by this actor. destroy_owned_placement_group_if_needed_(actor_id); })); @@ -1054,6 +1060,8 @@ void GcsActorManager::ReconstructActor(const ActorID &actor_id, } RAY_CHECK_OK(gcs_publisher_->PublishActor( actor_id, *GenActorDataOnlyWithStates(*mutable_actor_table_data), nullptr)); + RAY_CHECK_OK( + gcs_table_storage_->ActorTaskSpecTable().Delete(actor_id, nullptr)); })); // The actor is dead, but we should not remove the entry from the // registered actors yet. If the actor is owned, we will destroy the actor @@ -1172,27 +1180,31 @@ void GcsActorManager::SetSchedulePendingActorsPosted(bool posted) { void GcsActorManager::Initialize(const GcsInitData &gcs_init_data) { const auto &jobs = gcs_init_data.Jobs(); + const auto &actor_task_specs = gcs_init_data.ActorTaskSpecs(); absl::flat_hash_map> node_to_workers; - for (const auto &entry : gcs_init_data.Actors()) { - auto job_iter = jobs.find(entry.first.JobId()); + std::vector dead_actors; + for (const auto &[actor_id, actor_table_data] : gcs_init_data.Actors()) { + auto job_iter = jobs.find(actor_id.JobId()); auto is_job_dead = (job_iter == jobs.end() || job_iter->second.is_dead()); - auto actor = std::make_shared(entry.second); - if (entry.second.state() != ray::rpc::ActorTableData::DEAD && !is_job_dead) { - registered_actors_.emplace(entry.first, actor); + if (actor_table_data.state() != ray::rpc::ActorTableData::DEAD && !is_job_dead) { + const auto &iter = actor_task_specs.find(actor_id); + RAY_CHECK(iter != actor_task_specs.end()); + auto actor = std::make_shared(actor_table_data, iter->second); + registered_actors_.emplace(actor_id, actor); function_manager_.AddJobReference(actor->GetActorID().JobId()); if (!actor->GetName().empty()) { auto &actors_in_namespace = named_actors_[actor->GetRayNamespace()]; actors_in_namespace.emplace(actor->GetName(), actor->GetActorID()); } - if (entry.second.state() == ray::rpc::ActorTableData::DEPENDENCIES_UNREADY) { + if (actor_table_data.state() == ray::rpc::ActorTableData::DEPENDENCIES_UNREADY) { const auto &owner = actor->GetOwnerAddress(); const auto &owner_node = NodeID::FromBinary(owner.raylet_id()); const auto &owner_worker = WorkerID::FromBinary(owner.worker_id()); RAY_CHECK(unresolved_actors_[owner_node][owner_worker] .emplace(actor->GetActorID()) .second); - } else if (entry.second.state() == ray::rpc::ActorTableData::ALIVE) { + } else if (actor_table_data.state() == ray::rpc::ActorTableData::ALIVE) { created_actors_[actor->GetNodeID()].emplace(actor->GetWorkerID(), actor->GetActorID()); } @@ -1208,11 +1220,17 @@ void GcsActorManager::Initialize(const GcsInitData &gcs_init_data) { node_to_workers[actor->GetNodeID()].emplace_back(actor->GetWorkerID()); } } else { - destroyed_actors_.emplace(entry.first, actor); - sorted_destroyed_actor_list_.emplace_back(entry.first, - (int64_t)entry.second.timestamp()); + dead_actors.push_back(actor_id); + auto actor = std::make_shared(actor_table_data); + destroyed_actors_.emplace(actor_id, actor); + sorted_destroyed_actor_list_.emplace_back(actor_id, + (int64_t)actor_table_data.timestamp()); } } + if (!dead_actors.empty()) { + RAY_CHECK_OK( + gcs_table_storage_->ActorTaskSpecTable().BatchDelete(dead_actors, nullptr)); + } sorted_destroyed_actor_list_.sort([](const std::pair &left, const std::pair &right) { return left.second < right.second; @@ -1251,8 +1269,11 @@ void GcsActorManager::OnJobFinished(const JobID &job_id) { run_delayed_( [this, non_detached_actors = std::move(non_detached_actors)]() { - RAY_CHECK_OK(gcs_table_storage_->ActorTable().BatchDelete(non_detached_actors, - nullptr)); + RAY_CHECK_OK(gcs_table_storage_->ActorTable().BatchDelete( + non_detached_actors, [this, non_detached_actors](const Status &status) { + RAY_CHECK_OK(gcs_table_storage_->ActorTaskSpecTable().BatchDelete( + non_detached_actors, nullptr)); + })); }, actor_gc_delay_); diff --git a/src/ray/gcs/gcs_server/gcs_actor_manager.h b/src/ray/gcs/gcs_server/gcs_actor_manager.h index 788241df7..187f721b0 100644 --- a/src/ray/gcs/gcs_server/gcs_actor_manager.h +++ b/src/ray/gcs/gcs_server/gcs_actor_manager.h @@ -45,10 +45,22 @@ class GcsActor { explicit GcsActor(rpc::ActorTableData actor_table_data) : actor_table_data_(std::move(actor_table_data)) {} + /// Create a GcsActor by actor_table_data and task_spec. + /// This is only for ALIVE actors. + /// + /// \param actor_table_data Data of the actor (see gcs.proto). + /// \param task_spec Task spec of the actor. + explicit GcsActor(rpc::ActorTableData actor_table_data, rpc::TaskSpec task_spec) + : actor_table_data_(std::move(actor_table_data)), + task_spec_(std::make_unique(task_spec)) { + RAY_CHECK(actor_table_data_.state() != rpc::ActorTableData::DEAD); + } + /// Create a GcsActor by TaskSpec. /// /// \param task_spec Contains the actor creation task specification. - explicit GcsActor(const ray::rpc::TaskSpec &task_spec, std::string ray_namespace) { + explicit GcsActor(const ray::rpc::TaskSpec &task_spec, std::string ray_namespace) + : task_spec_(std::make_unique(task_spec)) { RAY_CHECK(task_spec.type() == TaskType::ACTOR_CREATION_TASK); const auto &actor_creation_task_spec = task_spec.actor_creation_task_spec(); actor_table_data_.set_actor_id(actor_creation_task_spec.actor_id()); @@ -59,18 +71,26 @@ class GcsActor { auto dummy_object = TaskSpecification(task_spec).ActorDummyObject().Binary(); actor_table_data_.set_actor_creation_dummy_object_id(dummy_object); + actor_table_data_.mutable_function_descriptor()->CopyFrom( + task_spec.function_descriptor()); + actor_table_data_.set_is_detached(actor_creation_task_spec.is_detached()); actor_table_data_.set_name(actor_creation_task_spec.name()); actor_table_data_.mutable_owner_address()->CopyFrom(task_spec.caller_address()); actor_table_data_.set_state(rpc::ActorTableData::DEPENDENCIES_UNREADY); - actor_table_data_.mutable_task_spec()->CopyFrom(task_spec); actor_table_data_.mutable_address()->set_raylet_id(NodeID::Nil().Binary()); actor_table_data_.mutable_address()->set_worker_id(WorkerID::Nil().Binary()); actor_table_data_.set_ray_namespace(ray_namespace); + // Set required resources. + auto resource_map = + GetCreationTaskSpecification().GetRequiredResources().GetResourceMap(); + actor_table_data_.mutable_required_resources()->insert(resource_map.begin(), + resource_map.end()); + const auto &function_descriptor = task_spec.function_descriptor(); switch (function_descriptor.function_descriptor_case()) { case rpc::FunctionDescriptor::FunctionDescriptorCase::kJavaFunctionDescriptor: @@ -127,6 +147,7 @@ class GcsActor { const rpc::ActorTableData &GetActorTableData() const; /// Get the mutable ActorTableData of this actor. rpc::ActorTableData *GetMutableActorTableData(); + rpc::TaskSpec *GetMutableTaskSpec(); std::shared_ptr GetActorWorkerAssignment() const; @@ -136,6 +157,7 @@ class GcsActor { /// The actor meta data which contains the task specification as well as the state of /// the gcs actor and so on (see gcs.proto). rpc::ActorTableData actor_table_data_; + const std::unique_ptr task_spec_; // TODO(Chong-Li): Considering shared assignments, this pointer would be moved out. std::shared_ptr assignment_ptr_ = nullptr; }; diff --git a/src/ray/gcs/gcs_server/gcs_actor_scheduler.cc b/src/ray/gcs/gcs_server/gcs_actor_scheduler.cc index 24a3aabd6..028ddd0ba 100644 --- a/src/ray/gcs/gcs_server/gcs_actor_scheduler.cc +++ b/src/ray/gcs/gcs_server/gcs_actor_scheduler.cc @@ -231,7 +231,7 @@ void GcsActorScheduler::LeaseWorkerFromNode(std::shared_ptr actor, // Actor leases should be sent to the raylet immediately, so we should never build up a // backlog in GCS. lease_client->RequestWorkerLease( - actor->GetActorTableData().task_spec(), + actor->GetCreationTaskSpecification().GetMessage(), RayConfig::instance().gcs_actor_scheduling_enabled(), [this, actor, node](const Status &status, const rpc::RequestWorkerLeaseReply &reply) { diff --git a/src/ray/gcs/gcs_server/gcs_init_data.cc b/src/ray/gcs/gcs_server/gcs_init_data.cc index 63ba042e6..0b83270c9 100644 --- a/src/ray/gcs/gcs_server/gcs_init_data.cc +++ b/src/ray/gcs/gcs_server/gcs_init_data.cc @@ -18,7 +18,7 @@ namespace ray { namespace gcs { void GcsInitData::AsyncLoad(const EmptyCallback &on_done) { // There are 6 kinds of table data need to be loaded. - auto count_down = std::make_shared(5); + auto count_down = std::make_shared(6); auto on_load_finished = [count_down, on_done] { if (--(*count_down) == 0) { if (on_done) { @@ -35,6 +35,8 @@ void GcsInitData::AsyncLoad(const EmptyCallback &on_done) { AsyncLoadActorTableData(on_load_finished); + AsyncLoadActorTaskSpecTableData(on_load_finished); + AsyncLoadPlacementGroupTableData(on_load_finished); } @@ -102,5 +104,18 @@ void GcsInitData::AsyncLoadActorTableData(const EmptyCallback &on_done) { load_actor_table_data_callback)); } +void GcsInitData::AsyncLoadActorTaskSpecTableData(const EmptyCallback &on_done) { + RAY_LOG(INFO) << "Loading actor task spec table data."; + auto load_actor_task_spec_table_data_callback = + [this, on_done](const absl::flat_hash_map &result) { + actor_task_spec_table_data_ = std::move(result); + RAY_LOG(INFO) << "Finished loading actor task spec table data, size = " + << actor_task_spec_table_data_.size(); + on_done(); + }; + RAY_CHECK_OK(gcs_table_storage_->ActorTaskSpecTable().GetAll( + load_actor_task_spec_table_data_callback)); +} + } // namespace gcs } // namespace ray \ No newline at end of file diff --git a/src/ray/gcs/gcs_server/gcs_init_data.h b/src/ray/gcs/gcs_server/gcs_init_data.h index ff54f552c..6da8e9dc8 100644 --- a/src/ray/gcs/gcs_server/gcs_init_data.h +++ b/src/ray/gcs/gcs_server/gcs_init_data.h @@ -59,6 +59,10 @@ class GcsInitData { return actor_table_data_; } + const absl::flat_hash_map &ActorTaskSpecs() const { + return actor_task_spec_table_data_; + } + /// Get placement group metadata. const absl::flat_hash_map &PlacementGroups() const { @@ -91,6 +95,8 @@ class GcsInitData { /// \param on_done The callback when actor metadata is loaded successfully. void AsyncLoadActorTableData(const EmptyCallback &on_done); + void AsyncLoadActorTaskSpecTableData(const EmptyCallback &on_done); + protected: /// The gcs table storage. std::shared_ptr gcs_table_storage_; @@ -110,6 +116,8 @@ class GcsInitData { /// Actor metadata. absl::flat_hash_map actor_table_data_; + + absl::flat_hash_map actor_task_spec_table_data_; }; } // namespace gcs diff --git a/src/ray/gcs/gcs_server/gcs_table_storage.cc b/src/ray/gcs/gcs_server/gcs_table_storage.cc index 2e8324ad0..a6b60714f 100644 --- a/src/ray/gcs/gcs_server/gcs_table_storage.cc +++ b/src/ray/gcs/gcs_server/gcs_table_storage.cc @@ -176,8 +176,10 @@ template class GcsTable; template class GcsTable; template class GcsTable; template class GcsTable; +template class GcsTable; template class GcsTable; template class GcsTableWithJobId; +template class GcsTableWithJobId; template class GcsTable; template class GcsTable; diff --git a/src/ray/gcs/gcs_server/gcs_table_storage.h b/src/ray/gcs/gcs_server/gcs_table_storage.h index 7fd2b4df5..7c75dce44 100644 --- a/src/ray/gcs/gcs_server/gcs_table_storage.h +++ b/src/ray/gcs/gcs_server/gcs_table_storage.h @@ -37,6 +37,7 @@ using rpc::ResourceTableData; using rpc::ResourceUsageBatchData; using rpc::ScheduleData; using rpc::StoredConfig; +using rpc::TaskSpec; using rpc::WorkerTableData; /// \class GcsTable @@ -173,6 +174,17 @@ class GcsActorTable : public GcsTableWithJobId { JobID GetJobIdFromKey(const ActorID &key) override { return key.JobId(); } }; +class GcsActorTaskSpecTable : public GcsTableWithJobId { + public: + explicit GcsActorTaskSpecTable(std::shared_ptr &store_client) + : GcsTableWithJobId(store_client) { + table_name_ = TablePrefix_Name(TablePrefix::ACTOR_TASK_SPEC); + } + + private: + JobID GetJobIdFromKey(const ActorID &key) override { return key.JobId(); } +}; + class GcsPlacementGroupTable : public GcsTable { public: @@ -248,6 +260,7 @@ class GcsTableStorage { : store_client_(std::move(store_client)) { job_table_ = std::make_unique(store_client_); actor_table_ = std::make_unique(store_client_); + actor_task_spec_table_ = std::make_unique(store_client_); placement_group_table_ = std::make_unique(store_client_); node_table_ = std::make_unique(store_client_); node_resource_table_ = std::make_unique(store_client_); @@ -270,6 +283,11 @@ class GcsTableStorage { return *actor_table_; } + GcsActorTaskSpecTable &ActorTaskSpecTable() { + RAY_CHECK(actor_task_spec_table_ != nullptr); + return *actor_task_spec_table_; + } + GcsPlacementGroupTable &PlacementGroupTable() { RAY_CHECK(placement_group_table_ != nullptr); return *placement_group_table_; @@ -319,6 +337,7 @@ class GcsTableStorage { std::shared_ptr store_client_; std::unique_ptr job_table_; std::unique_ptr actor_table_; + std::unique_ptr actor_task_spec_table_; std::unique_ptr placement_group_table_; std::unique_ptr node_table_; std::unique_ptr node_resource_table_; diff --git a/src/ray/gcs/gcs_server/test/gcs_actor_manager_test.cc b/src/ray/gcs/gcs_server/test/gcs_actor_manager_test.cc index 76f3f0084..8ceb2d68e 100644 --- a/src/ray/gcs/gcs_server/test/gcs_actor_manager_test.cc +++ b/src/ray/gcs/gcs_server/test/gcs_actor_manager_test.cc @@ -263,7 +263,7 @@ TEST_F(GcsActorManagerTest, TestBasic) { auto registered_actor = RegisterActor(job_id); rpc::CreateActorRequest create_actor_request; create_actor_request.mutable_task_spec()->CopyFrom( - registered_actor->GetActorTableData().task_spec()); + registered_actor->GetCreationTaskSpecification().GetMessage()); std::vector> finished_actors; Status status = gcs_actor_manager_->CreateActor( @@ -294,7 +294,7 @@ TEST_F(GcsActorManagerTest, TestSchedulingFailed) { auto registered_actor = RegisterActor(job_id); rpc::CreateActorRequest create_actor_request; create_actor_request.mutable_task_spec()->CopyFrom( - registered_actor->GetActorTableData().task_spec()); + registered_actor->GetCreationTaskSpecification().GetMessage()); std::vector> finished_actors; RAY_CHECK_OK(gcs_actor_manager_->CreateActor( @@ -328,7 +328,7 @@ TEST_F(GcsActorManagerTest, TestWorkerFailure) { auto registered_actor = RegisterActor(job_id); rpc::CreateActorRequest create_actor_request; create_actor_request.mutable_task_spec()->CopyFrom( - registered_actor->GetActorTableData().task_spec()); + registered_actor->GetCreationTaskSpecification().GetMessage()); std::vector> finished_actors; RAY_CHECK_OK(gcs_actor_manager_->CreateActor( @@ -376,7 +376,7 @@ TEST_F(GcsActorManagerTest, TestNodeFailure) { auto registered_actor = RegisterActor(job_id); rpc::CreateActorRequest create_actor_request; create_actor_request.mutable_task_spec()->CopyFrom( - registered_actor->GetActorTableData().task_spec()); + registered_actor->GetCreationTaskSpecification().GetMessage()); std::vector> finished_actors; Status status = gcs_actor_manager_->CreateActor( @@ -428,7 +428,7 @@ TEST_F(GcsActorManagerTest, TestActorReconstruction) { /*detached=*/false); rpc::CreateActorRequest create_actor_request; create_actor_request.mutable_task_spec()->CopyFrom( - registered_actor->GetActorTableData().task_spec()); + registered_actor->GetCreationTaskSpecification().GetMessage()); std::vector> finished_actors; Status status = gcs_actor_manager_->CreateActor( @@ -498,7 +498,7 @@ TEST_F(GcsActorManagerTest, TestActorRestartWhenOwnerDead) { /*detached=*/false); rpc::CreateActorRequest create_actor_request; create_actor_request.mutable_task_spec()->CopyFrom( - registered_actor->GetActorTableData().task_spec()); + registered_actor->GetCreationTaskSpecification().GetMessage()); std::vector> finished_actors; RAY_CHECK_OK(gcs_actor_manager_->CreateActor( @@ -550,7 +550,7 @@ TEST_F(GcsActorManagerTest, TestDetachedActorRestartWhenCreatorDead) { /*detached=*/true); rpc::CreateActorRequest create_actor_request; create_actor_request.mutable_task_spec()->CopyFrom( - registered_actor->GetActorTableData().task_spec()); + registered_actor->GetCreationTaskSpecification().GetMessage()); std::vector> finished_actors; RAY_CHECK_OK(gcs_actor_manager_->CreateActor( @@ -668,7 +668,7 @@ TEST_F(GcsActorManagerTest, TestNamedActorDeletionWorkerFailure) { /*name=*/actor_name); rpc::CreateActorRequest request1; request1.mutable_task_spec()->CopyFrom( - registered_actor_1->GetActorTableData().task_spec()); + registered_actor_1->GetCreationTaskSpecification().GetMessage()); Status status = gcs_actor_manager_->CreateActor( request1, @@ -705,7 +705,7 @@ TEST_F(GcsActorManagerTest, TestNamedActorDeletionWorkerFailure) { /*name=*/actor_name); rpc::CreateActorRequest request2; request2.mutable_task_spec()->CopyFrom( - registered_actor_2->GetActorTableData().task_spec()); + registered_actor_2->GetCreationTaskSpecification().GetMessage()); status = gcs_actor_manager_->CreateActor( request2, @@ -724,7 +724,7 @@ TEST_F(GcsActorManagerTest, TestNamedActorDeletionNodeFailure) { /*name=*/"actor"); rpc::CreateActorRequest request1; request1.mutable_task_spec()->CopyFrom( - registered_actor_1->GetActorTableData().task_spec()); + registered_actor_1->GetCreationTaskSpecification().GetMessage()); Status status = gcs_actor_manager_->CreateActor( request1, @@ -760,7 +760,7 @@ TEST_F(GcsActorManagerTest, TestNamedActorDeletionNodeFailure) { /*name=*/"actor"); rpc::CreateActorRequest request2; request2.mutable_task_spec()->CopyFrom( - registered_actor_2->GetActorTableData().task_spec()); + registered_actor_2->GetCreationTaskSpecification().GetMessage()); status = gcs_actor_manager_->CreateActor( request2, @@ -780,7 +780,7 @@ TEST_F(GcsActorManagerTest, TestNamedActorDeletionNotHappendWhenReconstructed) { /*name=*/"actor"); rpc::CreateActorRequest request1; request1.mutable_task_spec()->CopyFrom( - registered_actor_1->GetActorTableData().task_spec()); + registered_actor_1->GetCreationTaskSpecification().GetMessage()); Status status = gcs_actor_manager_->CreateActor( request1, @@ -825,7 +825,7 @@ TEST_F(GcsActorManagerTest, TestDestroyActorBeforeActorCreationCompletes) { auto registered_actor = RegisterActor(job_id); rpc::CreateActorRequest create_actor_request; create_actor_request.mutable_task_spec()->CopyFrom( - registered_actor->GetActorTableData().task_spec()); + registered_actor->GetCreationTaskSpecification().GetMessage()); std::vector> finished_actors; RAY_CHECK_OK(gcs_actor_manager_->CreateActor( @@ -861,7 +861,7 @@ TEST_F(GcsActorManagerTest, TestRaceConditionCancelLease) { /*detached=*/false); rpc::CreateActorRequest create_actor_request; create_actor_request.mutable_task_spec()->CopyFrom( - registered_actor->GetActorTableData().task_spec()); + registered_actor->GetCreationTaskSpecification().GetMessage()); std::vector> finished_actors; RAY_CHECK_OK(gcs_actor_manager_->CreateActor( @@ -886,8 +886,8 @@ TEST_F(GcsActorManagerTest, TestRaceConditionCancelLease) { address.set_worker_id(worker_id.Binary()); actor->UpdateAddress(address); const auto &actor_id = actor->GetActorID(); - const auto &task_id = - TaskID::FromBinary(registered_actor->GetActorTableData().task_spec().task_id()); + const auto &task_id = TaskID::FromBinary( + registered_actor->GetCreationTaskSpecification().GetMessage().task_id()); EXPECT_CALL(*mock_actor_scheduler_, CancelOnLeasing(node_id, actor_id, task_id)); gcs_actor_manager_->OnWorkerDead(owner_node_id, owner_worker_id); ASSERT_TRUE(actor->GetActorTableData().death_cause().has_actor_died_error_context()); @@ -907,7 +907,7 @@ TEST_F(GcsActorManagerTest, TestRegisterActor) { std::vector> finished_actors; rpc::CreateActorRequest request; request.mutable_task_spec()->CopyFrom( - registered_actor->GetActorTableData().task_spec()); + registered_actor->GetCreationTaskSpecification().GetMessage()); RAY_CHECK_OK(gcs_actor_manager_->CreateActor( request, [&finished_actors](std::shared_ptr actor, @@ -1025,7 +1025,7 @@ TEST_F(GcsActorManagerTest, TestOwnerAndChildDiedAtTheSameTimeRaceCondition) { /*detached=*/false); rpc::CreateActorRequest create_actor_request; create_actor_request.mutable_task_spec()->CopyFrom( - registered_actor->GetActorTableData().task_spec()); + registered_actor->GetCreationTaskSpecification().GetMessage()); std::vector> finished_actors; RAY_CHECK_OK(gcs_actor_manager_->CreateActor( diff --git a/src/ray/gcs/gcs_server/test/gcs_actor_scheduler_mock_test.cc b/src/ray/gcs/gcs_server/test/gcs_actor_scheduler_mock_test.cc index c0ddfdea6..6687cc7c3 100644 --- a/src/ray/gcs/gcs_server/test/gcs_actor_scheduler_mock_test.cc +++ b/src/ray/gcs/gcs_server/test/gcs_actor_scheduler_mock_test.cc @@ -82,7 +82,7 @@ TEST_F(GcsActorSchedulerTest, KillWorkerLeak1) { rpc::ActorTableData actor_data; actor_data.set_state(rpc::ActorTableData::PENDING_CREATION); actor_data.set_actor_id(actor_id.Binary()); - auto actor = std::make_shared(actor_data); + auto actor = std::make_shared(actor_data, rpc::TaskSpec()); std::function cb; EXPECT_CALL(*raylet_client, RequestWorkerLease(An(), _, _, _, _)) .WillOnce(testing::SaveArg<2>(&cb)); @@ -109,7 +109,7 @@ TEST_F(GcsActorSchedulerTest, KillWorkerLeak2) { rpc::ActorTableData actor_data; actor_data.set_state(rpc::ActorTableData::PENDING_CREATION); actor_data.set_actor_id(actor_id.Binary()); - auto actor = std::make_shared(actor_data); + auto actor = std::make_shared(actor_data, rpc::TaskSpec()); rpc::ClientCallback request_worker_lease_cb; // Ensure actor is killed EXPECT_CALL(*core_worker_client, KillActor(_, _)); diff --git a/src/ray/protobuf/gcs.proto b/src/ray/protobuf/gcs.proto index 4f100465b..1d850e1e5 100644 --- a/src/ray/protobuf/gcs.proto +++ b/src/ray/protobuf/gcs.proto @@ -46,6 +46,7 @@ enum TablePrefix { PLACEMENT_GROUP_SCHEDULE = 18; PLACEMENT_GROUP = 19; KV = 20; + ACTOR_TASK_SPEC = 21; } // The channel that Add operations to the Table should be published on, if any. @@ -126,13 +127,13 @@ message ActorTableData { string name = 12; // Last timestamp that the actor state was updated. double timestamp = 13; - // The task specification of this actor's creation task. - TaskSpec task_spec = 14; // Resource mapping ids acquired by the leased worker. This field is only set when this // actor already has a leased worker. repeated ResourceMapEntry resource_mapping = 15; // The process id of this actor. uint32 pid = 16; + // The function descriptor of the actor creation task. + FunctionDescriptor function_descriptor = 17; // The actor's namespace. Named `ray_namespace` to avoid confusions when invoked in c++. string ray_namespace = 19; // The unix ms timestamp the actor was started at. @@ -148,6 +149,8 @@ message ActorTableData { string class_name = 23; // Contains metadata about why the actor is dead. ActorDeathCause death_cause = 24; + // Quantities of the different resources required by this actor. + map required_resources = 28; } message ErrorTableData { diff --git a/src/ray/protobuf/gcs_service.proto b/src/ray/protobuf/gcs_service.proto index 6e2785e72..af8adf32b 100644 --- a/src/ray/protobuf/gcs_service.proto +++ b/src/ray/protobuf/gcs_service.proto @@ -97,6 +97,8 @@ message GetNamedActorInfoReply { GcsStatus status = 1; // Data of actor. ActorTableData actor_table_data = 2; + // The task specification of this actor's creation task. + TaskSpec task_spec = 3; } message ListNamedActorsRequest {