diff --git a/python/ray/tests/test_placement_group.py b/python/ray/tests/test_placement_group.py
index e7779a43b..aef130e68 100644
--- a/python/ray/tests/test_placement_group.py
+++ b/python/ray/tests/test_placement_group.py
@@ -12,6 +12,7 @@ from ray.test_utils import (get_other_nodes, wait_for_condition,
                             get_error_message)
 import ray.cluster_utils
 from ray._raylet import PlacementGroupID
+from ray.test_utils import run_string_as_driver
 from ray.util.placement_group import (PlacementGroup,
                                       get_current_placement_group)
 
@@ -995,5 +996,165 @@ def test_ready_warning_suppressed(ray_start_regular, error_pubsub):
     assert len(errors) == 0
 
 
+def test_automatic_cleanup_job(ray_start_cluster):
+    # Make sure that placement groups created by a job,
+    # an actor, and a task are cleaned when the job is done.
+    cluster = ray_start_cluster
+    num_nodes = 3
+    num_cpu_per_node = 4
+    # Create a 3-node cluster.
+    for _ in range(num_nodes):
+        cluster.add_node(num_cpus=num_cpu_per_node)
+
+    info = ray.init(address=cluster.address)
+    available_cpus = ray.available_resources()["CPU"]
+    assert available_cpus == num_nodes * num_cpu_per_node
+
+    driver_code = f"""
+import ray
+
+ray.init(address="{info["redis_address"]}")
+
+def create_pg():
+    pg = ray.util.placement_group(
+        [{{"CPU": 1}} for _ in range(3)],
+        strategy="STRICT_SPREAD")
+    ray.get(pg.ready())
+    return pg
+
+@ray.remote(num_cpus=0)
+def f():
+    create_pg()
+
+@ray.remote(num_cpus=0)
+class A:
+    def create_pg(self):
+        create_pg()
+
+ray.get(f.remote())
+a = A.remote()
+ray.get(a.create_pg.remote())
+# Create 2 pgs to make sure multiple placement groups that belong
+# to a single job will be properly cleaned.
+create_pg()
+create_pg()
+
+ray.shutdown()
+    """
+
+    run_string_as_driver(driver_code)
+
+    # Wait until the driver is reported as dead by GCS.
+    def is_job_done():
+        jobs = ray.jobs()
+        for job in jobs:
+            if "StopTime" in job:
+                return True
+        return False
+
+    def assert_num_cpus(expected_num_cpus):
+        if expected_num_cpus == 0:
+            return "CPU" not in ray.available_resources()
+        return ray.available_resources()["CPU"] == expected_num_cpus
+
+    wait_for_condition(is_job_done)
+    available_cpus = ray.available_resources()["CPU"]
+    wait_for_condition(lambda: assert_num_cpus(num_nodes * num_cpu_per_node))
+
+
+def test_automatic_cleanup_detached_actors(ray_start_cluster):
+    # Make sure the placement groups created by
+    # detached actors are cleaned properly.
+    cluster = ray_start_cluster
+    num_nodes = 3
+    num_cpu_per_node = 2
+    # Create a 3-node cluster.
+    for _ in range(num_nodes):
+        cluster.add_node(num_cpus=num_cpu_per_node)
+
+    info = ray.init(address=cluster.address)
+    available_cpus = ray.available_resources()["CPU"]
+    assert available_cpus == num_nodes * num_cpu_per_node
+
+    driver_code = f"""
+import ray
+
+ray.init(address="{info["redis_address"]}")
+
+def create_pg():
+    pg = ray.util.placement_group(
+        [{{"CPU": 1}} for _ in range(3)],
+        strategy="STRICT_SPREAD")
+    ray.get(pg.ready())
+    return pg
+
+# TODO(sang): Placement groups created by tasks launched by a detached actor
+# are not cleaned with the current protocol.
+# @ray.remote(num_cpus=0)
+# def f():
+#     create_pg()
+
+@ray.remote(num_cpus=0, max_restarts=1)
+class A:
+    def create_pg(self):
+        create_pg()
+    def create_child_pg(self):
+        self.a = A.options(name="B").remote()
+        ray.get(self.a.create_pg.remote())
+    def kill_child_actor(self):
+        ray.kill(self.a)
+        try:
+            ray.get(self.a.create_pg.remote())
+        except Exception:
+            pass
+
+a = A.options(lifetime="detached", name="A").remote()
+ray.get(a.create_pg.remote())
+# TODO(sang): Currently, child tasks are cleaned when a detached actor
+# is dead. We cannot test this scenario until it is fixed.
+# ray.get(a.create_child_pg.remote())
+
+ray.shutdown()
+    """
+
+    run_string_as_driver(driver_code)
+
+    # Wait until the driver is reported as dead by GCS.
+    def is_job_done():
+        jobs = ray.jobs()
+        for job in jobs:
+            if "StopTime" in job:
+                return True
+        return False
+
+    def assert_num_cpus(expected_num_cpus):
+        if expected_num_cpus == 0:
+            return "CPU" not in ray.available_resources()
+        return ray.available_resources()["CPU"] == expected_num_cpus
+
+    wait_for_condition(is_job_done)
+    assert assert_num_cpus(num_nodes)
+    # Make sure that when a child actor spawned by a detached actor
+    # is killed, its placement group is removed.
+    a = ray.get_actor("A")
+    # TODO(sang): Children of detached actors
+    # seem to be killed when jobs are done. We should fix this before
+    # testing this scenario.
+    # ray.get(a.kill_child_actor.remote())
+    # assert assert_num_cpus(num_nodes)
+
+    # Make sure placement groups are cleaned when detached actors are killed.
+    ray.kill(a, no_restart=False)
+    wait_for_condition(lambda: assert_num_cpus(num_nodes * num_cpu_per_node))
+    # The detached actor a should have been restarted.
+    # Recreate a placement group.
+    ray.get(a.create_pg.remote())
+    wait_for_condition(lambda: assert_num_cpus(num_nodes))
+    # Kill it again and make sure the placement group
+    # that was created is deleted again.
+    ray.kill(a, no_restart=False)
+    wait_for_condition(lambda: assert_num_cpus(num_nodes * num_cpu_per_node))
+
+
 if __name__ == "__main__":
     sys.exit(pytest.main(["-v", __file__]))
diff --git a/src/ray/common/placement_group.h b/src/ray/common/placement_group.h
index d304631f8..a068ce4a1 100644
--- a/src/ray/common/placement_group.h
+++ b/src/ray/common/placement_group.h
@@ -67,10 +67,22 @@ class PlacementGroupSpecBuilder {
   PlacementGroupSpecBuilder &SetPlacementGroupSpec(
       const PlacementGroupID &placement_group_id, std::string name,
       const std::vector<std::unordered_map<std::string, double>> &bundles,
-      const rpc::PlacementStrategy strategy) {
+      const rpc::PlacementStrategy strategy, const JobID &creator_job_id,
+      const ActorID &creator_actor_id, bool is_creator_detached_actor) {
     message_->set_placement_group_id(placement_group_id.Binary());
     message_->set_name(name);
     message_->set_strategy(strategy);
+    // Configure the creator job and actor ID for automatic lifecycle management.
+    RAY_CHECK(!creator_job_id.IsNil());
+    message_->set_creator_job_id(creator_job_id.Binary());
+    // When the creator is a detached actor, we treat the creator job as dead right
+    // away, because a detached actor can be created AFTER its job has exited.
+    // For example, a detached actor may be restarted by the GCS after the creator
+    // job is dead.
+    message_->set_creator_job_dead(is_creator_detached_actor);
+    message_->set_creator_actor_id(creator_actor_id.Binary());
+    message_->set_creator_actor_dead(creator_actor_id.IsNil());
+
     for (size_t i = 0; i < bundles.size(); i++) {
       auto resources = bundles[i];
       auto message_bundle = message_->add_bundles();
diff --git a/src/ray/core_worker/context.cc b/src/ray/core_worker/context.cc
index 7e47fc212..aa3f3f548 100644
--- a/src/ray/core_worker/context.cc
+++ b/src/ray/core_worker/context.cc
@@ -182,6 +182,7 @@ void WorkerContext::SetCurrentTask(const TaskSpecification &task_spec) {
     current_actor_is_direct_call_ = true;
     current_actor_max_concurrency_ = task_spec.MaxActorConcurrency();
     current_actor_is_asyncio_ = task_spec.IsAsyncioActor();
+    is_detached_actor_ = task_spec.IsDetachedActor();
     current_actor_placement_group_id_ = task_spec.PlacementGroupId();
     placement_group_capture_child_tasks_ = task_spec.PlacementGroupCaptureChildTasks();
     override_environment_variables_ = task_spec.OverrideEnvironmentVariables();
@@ -235,6 +236,8 @@ int WorkerContext::CurrentActorMaxConcurrency() const {
 
 bool WorkerContext::CurrentActorIsAsync() const { return current_actor_is_asyncio_; }
 
+bool WorkerContext::CurrentActorDetached() const { return is_detached_actor_; }
+
 WorkerThreadContext &WorkerContext::GetThreadContext() {
   if (thread_context_ == nullptr) {
     thread_context_ = std::unique_ptr<WorkerThreadContext>(new WorkerThreadContext());
diff --git a/src/ray/core_worker/context.h b/src/ray/core_worker/context.h
index 6bd3b1bfa..fac3f2c08 100644
--- a/src/ray/core_worker/context.h
+++ b/src/ray/core_worker/context.h
@@ -74,6 +74,8 @@ class WorkerContext {
 
   bool CurrentActorIsAsync() const;
 
+  bool CurrentActorDetached() const;
+
   int GetNextTaskIndex();
 
   // Returns the next put object index; used to calculate ObjectIDs for puts.
@@ -91,6 +93,7 @@ class WorkerContext {
   ActorID current_actor_id_;
   int current_actor_max_concurrency_ = 1;
   bool current_actor_is_asyncio_ = false;
+  bool is_detached_actor_ = false;
   // The placement group id that the current actor belongs to.
   PlacementGroupID current_actor_placement_group_id_;
   // Whether or not we should implicitly capture parent's placement group.
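
Taken together, the `PlacementGroupSpec` builder change and the new `WorkerContext::CurrentActorDetached()` flag encode the cleanup rule used throughout this patch: every placement group records its creator job and creator actor, `creator_actor_dead` starts out true when no actor created the group, and `creator_job_dead` starts out true when the creator is a detached actor. The sketch below is an illustrative Python model of how the two flags combine; it is not the GCS implementation, and the class and field names are invented for the example:

```python
# Illustrative model of the removability rule encoded above; not the GCS code,
# just a sketch of how the two "dead" flags interact.
from dataclasses import dataclass


@dataclass
class PlacementGroupRecord:
    creator_job_dead: bool    # True from the start if the creator is a detached actor.
    creator_actor_dead: bool  # True from the start if no actor created the group.

    def removable(self) -> bool:
        # Mirrors GcsPlacementGroup::IsPlacementGroupRemovable(): both the
        # creator job AND the creator actor must be dead.
        return self.creator_job_dead and self.creator_actor_dead


# A group created by the driver or a plain task: only the job death matters.
pg_from_driver = PlacementGroupRecord(creator_job_dead=False, creator_actor_dead=True)
# A group created by a detached actor: only the actor death matters.
pg_from_detached = PlacementGroupRecord(creator_job_dead=True, creator_actor_dead=False)

pg_from_driver.creator_job_dead = True       # the job finished
assert pg_from_driver.removable()
pg_from_detached.creator_actor_dead = True   # the detached actor was killed
assert pg_from_detached.removable()
```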
diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc
index 49d90b91f..ab9a6cf4a 100644
--- a/src/ray/core_worker/core_worker.cc
+++ b/src/ray/core_worker/core_worker.cc
@@ -1408,9 +1408,11 @@ Status CoreWorker::CreatePlacementGroup(
     PlacementGroupID *return_placement_group_id) {
   const PlacementGroupID placement_group_id = PlacementGroupID::FromRandom();
   PlacementGroupSpecBuilder builder;
-  builder.SetPlacementGroupSpec(placement_group_id, placement_group_creation_options.name,
-                                placement_group_creation_options.bundles,
-                                placement_group_creation_options.strategy);
+  builder.SetPlacementGroupSpec(
+      placement_group_id, placement_group_creation_options.name,
+      placement_group_creation_options.bundles, placement_group_creation_options.strategy,
+      worker_context_.GetCurrentJobID(), worker_context_.GetCurrentActorID(),
+      worker_context_.CurrentActorDetached());
   PlacementGroupSpecification placement_group_spec = builder.Build();
   *return_placement_group_id = placement_group_id;
   RAY_LOG(INFO) << "Submitting Placement Group creation to GCS: " << placement_group_id;
diff --git a/src/ray/gcs/gcs_server/gcs_actor_manager.cc b/src/ray/gcs/gcs_server/gcs_actor_manager.cc
index b0205c565..6ed5657b4 100644
--- a/src/ray/gcs/gcs_server/gcs_actor_manager.cc
+++ b/src/ray/gcs/gcs_server/gcs_actor_manager.cc
@@ -83,15 +83,19 @@ const rpc::ActorTableData &GcsActor::GetActorTableData() const {
 rpc::ActorTableData *GcsActor::GetMutableActorTableData() { return &actor_table_data_; }
 
 /////////////////////////////////////////////////////////////////////////////////////////
-GcsActorManager::GcsActorManager(std::shared_ptr<GcsActorSchedulerInterface> scheduler,
-                                 std::shared_ptr<GcsTableStorage> gcs_table_storage,
-                                 std::shared_ptr<GcsPubSub> gcs_pub_sub,
-                                 const rpc::ClientFactoryFn &worker_client_factory)
+GcsActorManager::GcsActorManager(
+    std::shared_ptr<GcsActorSchedulerInterface> scheduler,
+    std::shared_ptr<GcsTableStorage> gcs_table_storage,
+    std::shared_ptr<GcsPubSub> gcs_pub_sub,
+    std::function<void(const ActorID &)> destroy_owned_placement_group_if_needed,
+    const rpc::ClientFactoryFn &worker_client_factory)
     : gcs_actor_scheduler_(std::move(scheduler)),
      gcs_table_storage_(std::move(gcs_table_storage)),
      gcs_pub_sub_(std::move(gcs_pub_sub)),
-      worker_client_factory_(worker_client_factory) {
+      worker_client_factory_(worker_client_factory),
+      destroy_owned_placement_group_if_needed_(destroy_owned_placement_group_if_needed) {
   RAY_CHECK(worker_client_factory_);
+  RAY_CHECK(destroy_owned_placement_group_if_needed_);
 }
 
 void GcsActorManager::HandleRegisterActor(const rpc::RegisterActorRequest &request,
@@ -655,6 +659,8 @@ void GcsActorManager::DestroyActor(const ActorID &actor_id) {
 
         RAY_CHECK_OK(gcs_pub_sub_->Publish(ACTOR_CHANNEL, actor_id.Hex(),
                                            actor_table_data->SerializeAsString(), nullptr));
+        // Destroy placement groups owned by this actor.
+        destroy_owned_placement_group_if_needed_(actor_id);
       }));
 }
 
@@ -800,6 +806,8 @@ void GcsActorManager::ReconstructActor(const ActorID &actor_id, bool need_resche
   int64_t max_restarts = mutable_actor_table_data->max_restarts();
   uint64_t num_restarts = mutable_actor_table_data->num_restarts();
   int64_t remaining_restarts;
+  // Destroy placement groups owned by this actor.
+  destroy_owned_placement_group_if_needed_(actor_id);
   if (!need_reschedule) {
     remaining_restarts = 0;
   } else if (max_restarts == -1) {
@@ -848,7 +856,9 @@ void GcsActorManager::ReconstructActor(const ActorID &actor_id, bool need_resche
     // if actor was an detached actor, make sure to destroy it.
     // We need to do this because detached actors are not destroyed
     // when its owners are dead because it doesn't have owners.
-    if (actor->IsDetached()) DestroyActor(actor_id);
+    if (actor->IsDetached()) {
+      DestroyActor(actor_id);
+    }
     RAY_CHECK_OK(gcs_pub_sub_->Publish(
         ACTOR_CHANNEL, actor_id.Hex(), mutable_actor_table_data->SerializeAsString(),
         nullptr));
diff --git a/src/ray/gcs/gcs_server/gcs_actor_manager.h b/src/ray/gcs/gcs_server/gcs_actor_manager.h
index 995c3d4ce..213f2ed38 100644
--- a/src/ray/gcs/gcs_server/gcs_actor_manager.h
+++ b/src/ray/gcs/gcs_server/gcs_actor_manager.h
@@ -161,10 +161,12 @@ class GcsActorManager : public rpc::ActorInfoHandler {
   /// \param scheduler Used to schedule actor creation tasks.
   /// \param gcs_table_storage Used to flush actor data to storage.
   /// \param gcs_pub_sub Used to publish gcs message.
-  GcsActorManager(std::shared_ptr<GcsActorSchedulerInterface> scheduler,
-                  std::shared_ptr<GcsTableStorage> gcs_table_storage,
-                  std::shared_ptr<GcsPubSub> gcs_pub_sub,
-                  const rpc::ClientFactoryFn &worker_client_factory = nullptr);
+  GcsActorManager(
+      std::shared_ptr<GcsActorSchedulerInterface> scheduler,
+      std::shared_ptr<GcsTableStorage> gcs_table_storage,
+      std::shared_ptr<GcsPubSub> gcs_pub_sub,
+      std::function<void(const ActorID &)> destroy_owned_placement_group_if_needed,
+      const rpc::ClientFactoryFn &worker_client_factory = nullptr);
 
   ~GcsActorManager() = default;
 
@@ -404,6 +406,10 @@ class GcsActorManager : public rpc::ActorInfoHandler {
   /// Factory to produce clients to workers. This is used to communicate with
   /// actors and their owners.
   rpc::ClientFactoryFn worker_client_factory_;
+  /// A callback that is used to destroy placement groups owned by the actor.
+  /// This method MUST BE IDEMPOTENT because it can be called multiple times
+  /// during the actor destruction process.
+  std::function<void(const ActorID &)> destroy_owned_placement_group_if_needed_;
 };
 
 }  // namespace gcs
diff --git a/src/ray/gcs/gcs_server/gcs_placement_group_manager.cc b/src/ray/gcs/gcs_server/gcs_placement_group_manager.cc
index 326381538..92953ec93 100644
--- a/src/ray/gcs/gcs_server/gcs_placement_group_manager.cc
+++ b/src/ray/gcs/gcs_server/gcs_placement_group_manager.cc
@@ -79,6 +79,27 @@ rpc::Bundle *GcsPlacementGroup::GetMutableBundle(int bundle_index) {
   return placement_group_table_data_.mutable_bundles(bundle_index);
 }
 
+const ActorID GcsPlacementGroup::GetCreatorActorId() const {
+  return ActorID::FromBinary(placement_group_table_data_.creator_actor_id());
+}
+
+const JobID GcsPlacementGroup::GetCreatorJobId() const {
+  return JobID::FromBinary(placement_group_table_data_.creator_job_id());
+}
+
+void GcsPlacementGroup::MarkCreatorJobDead() {
+  placement_group_table_data_.set_creator_job_dead(true);
+}
+
+void GcsPlacementGroup::MarkCreatorActorDead() {
+  placement_group_table_data_.set_creator_actor_dead(true);
+}
+
+bool GcsPlacementGroup::IsPlacementGroupRemovable() const {
+  return placement_group_table_data_.creator_job_dead() &&
+         placement_group_table_data_.creator_actor_dead();
+}
+
 /////////////////////////////////////////////////////////////////////////////////////////
 
 GcsPlacementGroupManager::GcsPlacementGroupManager(
@@ -177,8 +198,7 @@ void GcsPlacementGroupManager::SchedulePendingPlacementGroups() {
   const auto placement_group = pending_placement_groups_.front();
   const auto &placement_group_id = placement_group->GetPlacementGroupID();
   // Do not reschedule if the placement group has removed already.
-  if (registered_placement_groups_.find(placement_group_id) !=
-      registered_placement_groups_.end()) {
+  if (registered_placement_groups_.contains(placement_group_id)) {
     MarkSchedulingStarted(placement_group_id);
     gcs_placement_group_scheduler_->ScheduleUnplacedBundles(
         placement_group,
@@ -210,8 +230,7 @@ void GcsPlacementGroupManager::HandleCreatePlacementGroup(
       [this, request, reply, send_reply_callback, placement_group_id,
        placement_group](Status status) {
         RAY_CHECK_OK(status);
-        if (registered_placement_groups_.find(placement_group_id) ==
-            registered_placement_groups_.end()) {
+        if (!registered_placement_groups_.contains(placement_group_id)) {
           std::stringstream stream;
           stream << "Placement group of id " << placement_group_id
                  << " has been removed before registration.";
@@ -375,6 +394,34 @@ void GcsPlacementGroupManager::OnNodeDead(const NodeID &node_id) {
   SchedulePendingPlacementGroups();
 }
 
+void GcsPlacementGroupManager::CleanPlacementGroupIfNeededWhenJobDead(
+    const JobID &job_id) {
+  for (const auto &it : registered_placement_groups_) {
+    auto &placement_group = it.second;
+    if (placement_group->GetCreatorJobId() != job_id) {
+      continue;
+    }
+    placement_group->MarkCreatorJobDead();
+    if (placement_group->IsPlacementGroupRemovable()) {
+      RemovePlacementGroup(placement_group->GetPlacementGroupID(), [](Status status) {});
+    }
+  }
+}
+
+void GcsPlacementGroupManager::CleanPlacementGroupIfNeededWhenActorDead(
+    const ActorID &actor_id) {
+  for (const auto &it : registered_placement_groups_) {
+    auto &placement_group = it.second;
+    if (placement_group->GetCreatorActorId() != actor_id) {
+      continue;
+    }
+    placement_group->MarkCreatorActorDead();
+    if (placement_group->IsPlacementGroupRemovable()) {
+      RemovePlacementGroup(placement_group->GetPlacementGroupID(), [](Status status) {});
+    }
+  }
+}
+
 void GcsPlacementGroupManager::Tick() {
   UpdatePlacementGroupLoad();
   execute_after(io_context_, [this] { Tick(); }, 1000 /* milliseconds */);
diff --git a/src/ray/gcs/gcs_server/gcs_placement_group_manager.h b/src/ray/gcs/gcs_server/gcs_placement_group_manager.h
index e2e81b36e..c19ab4689 100644
--- a/src/ray/gcs/gcs_server/gcs_placement_group_manager.h
+++ b/src/ray/gcs/gcs_server/gcs_placement_group_manager.h
@@ -53,6 +53,13 @@ class GcsPlacementGroup {
     placement_group_table_data_.mutable_bundles()->CopyFrom(
         placement_group_spec.bundles());
     placement_group_table_data_.set_strategy(placement_group_spec.strategy());
+    placement_group_table_data_.set_creator_job_id(placement_group_spec.creator_job_id());
+    placement_group_table_data_.set_creator_actor_id(
+        placement_group_spec.creator_actor_id());
+    placement_group_table_data_.set_creator_job_dead(
+        placement_group_spec.creator_job_dead());
+    placement_group_table_data_.set_creator_actor_dead(
+        placement_group_spec.creator_actor_dead());
   }
 
   /// Get the immutable PlacementGroupTableData of this placement group.
@@ -82,18 +89,32 @@ class GcsPlacementGroup {
   /// Get the Strategy
   rpc::PlacementStrategy GetStrategy() const;
 
-  // Get debug string for the placement group.
+  /// Get debug string for the placement group.
   std::string DebugString() const;
 
+  /// The fields below are used for automatic cleanup of placement groups.
+
+  /// Get the actor id that created the placement group.
+  const ActorID GetCreatorActorId() const;
+
+  /// Get the job id that created the placement group.
+  const JobID GetCreatorJobId() const;
+
+  /// Mark that the creator job of this placement group is dead.
+  void MarkCreatorJobDead();
+
+  /// Mark that the creator actor of this placement group is dead.
+  void MarkCreatorActorDead();
+
+  /// Return true if the placement group is removable, false otherwise.
+  bool IsPlacementGroupRemovable() const;
+
  private:
   /// The placement_group meta data which contains the task specification as well as the
   /// state of the gcs placement_group and so on (see gcs.proto).
   rpc::PlacementGroupTableData placement_group_table_data_;
 };
 
-using RegisterPlacementGroupCallback =
-    std::function<void(std::shared_ptr<GcsPlacementGroup>)>;
-
 /// GcsPlacementGroupManager is responsible for managing the lifecycle of all placement
 /// group. This class is not thread-safe.
 /// The placementGroup will be added into queue and set the status as pending first and
@@ -176,6 +197,37 @@ class GcsPlacementGroupManager : public rpc::PlacementGroupInfoHandler {
   /// \param node_id The specified node id.
   void OnNodeDead(const NodeID &node_id);
 
+  /// Clean up placement groups that belong to the given job, if necessary.
+  ///
+  /// This interface is a part of automatic lifecycle management for placement groups.
+  /// When a job is killed, this method should be invoked to clean up
+  /// placement groups that belong to the given job.
+  ///
+  /// Calling this method doesn't guarantee that the placement groups will be removed:
+  /// placement groups are removed only when the creator job AND the creator actor
+  /// are both dead.
+  ///
+  /// NOTE: This method is idempotent.
+  ///
+  /// \param job_id The id of the job whose placement groups should be cleaned.
+  void CleanPlacementGroupIfNeededWhenJobDead(const JobID &job_id);
+
+  /// Clean up placement groups that belong to the given actor, if necessary.
+  ///
+  /// This interface is a part of automatic lifecycle management for placement groups.
+  /// When an actor is killed, this method should be invoked to clean up
+  /// placement groups that belong to the given actor.
+  ///
+  /// Calling this method doesn't guarantee that the placement groups will be removed:
+  /// placement groups are removed only when the creator job AND the creator actor
+  /// are both dead.
+  ///
+  /// NOTE: This method is idempotent.
+  ///
+  /// \param actor_id The id of the actor whose placement groups should be
+  /// cleaned.
+  void CleanPlacementGroupIfNeededWhenActorDead(const ActorID &actor_id);
+
  private:
   /// Try to create placement group after a short time.
   void RetryCreatingPlacementGroup();
diff --git a/src/ray/gcs/gcs_server/gcs_server.cc b/src/ray/gcs/gcs_server/gcs_server.cc
index 7cbe050e4..a53437ef2 100644
--- a/src/ray/gcs/gcs_server/gcs_server.cc
+++ b/src/ray/gcs/gcs_server/gcs_server.cc
@@ -191,9 +191,14 @@ void GcsServer::InitGcsActorManager() {
         return std::make_shared<rpc::CoreWorkerClient>(address, client_call_manager_);
       });
   gcs_actor_manager_ = std::make_shared<GcsActorManager>(
-      scheduler, gcs_table_storage_, gcs_pub_sub_, [this](const rpc::Address &address) {
+      scheduler, gcs_table_storage_, gcs_pub_sub_,
+      [this](const ActorID &actor_id) {
+        gcs_placement_group_manager_->CleanPlacementGroupIfNeededWhenActorDead(actor_id);
+      },
+      [this](const rpc::Address &address) {
         return std::make_shared<rpc::CoreWorkerClient>(address, client_call_manager_);
       });
+
   gcs_node_manager_->AddNodeAddedListener(
       [this](const std::shared_ptr<rpc::GcsNodeInfo> &) {
         // Because a new node has been added, we need to try to schedule the pending
@@ -227,6 +232,7 @@ void GcsServer::InitGcsJobManager() {
       std::unique_ptr<GcsJobManager>(new GcsJobManager(gcs_table_storage_, gcs_pub_sub_));
   gcs_job_manager_->AddJobFinishedListener([this](std::shared_ptr<JobID> job_id) {
     gcs_actor_manager_->OnJobFinished(*job_id);
+    gcs_placement_group_manager_->CleanPlacementGroupIfNeededWhenJobDead(*job_id);
   });
 }
 
diff --git a/src/ray/gcs/gcs_server/test/gcs_actor_manager_test.cc b/src/ray/gcs/gcs_server/test/gcs_actor_manager_test.cc
index c543603df..de3fc8fb6 100644
--- a/src/ray/gcs/gcs_server/test/gcs_actor_manager_test.cc
+++ b/src/ray/gcs/gcs_server/test/gcs_actor_manager_test.cc
@@ -87,6 +87,7 @@ class GcsActorManagerTest : public ::testing::Test {
     gcs_table_storage_ = std::make_shared<gcs::InMemoryGcsTableStorage>(io_service_);
     gcs_actor_manager_.reset(new gcs::GcsActorManager(
         mock_actor_scheduler_, gcs_table_storage_, gcs_pub_sub_,
+        [](const ActorID &actor_id) {},
         [this](const rpc::Address &addr) { return worker_client_; }));
   }
 
diff --git a/src/ray/gcs/gcs_server/test/gcs_placement_group_manager_test.cc b/src/ray/gcs/gcs_server/test/gcs_placement_group_manager_test.cc
index b5b357896..f7cf0927b 100644
--- a/src/ray/gcs/gcs_server/test/gcs_placement_group_manager_test.cc
+++ b/src/ray/gcs/gcs_server/test/gcs_placement_group_manager_test.cc
@@ -351,6 +351,132 @@ TEST_F(GcsPlacementGroupManagerTest, TestRescheduleWhenNodeDead) {
                 placement_group->GetPlacementGroupID());
 }
 
+TEST_F(GcsPlacementGroupManagerTest, TestAutomaticCleanupWhenActorDeadAndJobDead) {
+  // Test the scenario where the actor dies first, then the job dies.
+  const auto job_id = JobID::FromInt(1);
+  const auto actor_id = ActorID::Of(job_id, TaskID::Nil(), 0);
+  auto request = Mocker::GenCreatePlacementGroupRequest(
+      /* name */ "", rpc::PlacementStrategy::SPREAD,
+      /* bundles_count */ 2,
+      /* cpu_num */ 1.0,
+      /* job_id */ job_id,
+      /* actor_id */ actor_id);
+  std::atomic<int> finished_placement_group_count(0);
+  gcs_placement_group_manager_->RegisterPlacementGroup(
+      std::make_shared<gcs::GcsPlacementGroup>(request),
+      [&finished_placement_group_count](Status status) {
+        ++finished_placement_group_count;
+      });
+  auto placement_group = mock_placement_group_scheduler_->placement_groups_.back();
+  auto placement_group_id = placement_group->GetPlacementGroupID();
+  gcs_placement_group_manager_->OnPlacementGroupCreationSuccess(placement_group);
+  WaitForExpectedCount(finished_placement_group_count, 1);
+  // The placement group shouldn't be cleaned when only the actor is dead.
+  EXPECT_CALL(*mock_placement_group_scheduler_,
+              DestroyPlacementGroupBundleResourcesIfExists(placement_group_id))
+      .Times(0);
+  gcs_placement_group_manager_->CleanPlacementGroupIfNeededWhenActorDead(actor_id);
+  // When both the job and the actor are dead, the placement group
+  // should be destroyed.
+  EXPECT_CALL(*mock_placement_group_scheduler_,
+              DestroyPlacementGroupBundleResourcesIfExists(placement_group_id))
+      .Times(1);
+  gcs_placement_group_manager_->CleanPlacementGroupIfNeededWhenJobDead(job_id);
+}
+
+TEST_F(GcsPlacementGroupManagerTest, TestAutomaticCleanupWhenActorAndJobDead) {
+  // Test the scenario where the job dies first, then the actor dies.
+  const auto job_id = JobID::FromInt(1);
+  const auto actor_id = ActorID::Of(job_id, TaskID::Nil(), 0);
+  auto request = Mocker::GenCreatePlacementGroupRequest(
+      /* name */ "", rpc::PlacementStrategy::SPREAD,
+      /* bundles_count */ 2,
+      /* cpu_num */ 1.0,
+      /* job_id */ job_id,
+      /* actor_id */ actor_id);
+  std::atomic<int> finished_placement_group_count(0);
+  gcs_placement_group_manager_->RegisterPlacementGroup(
+      std::make_shared<gcs::GcsPlacementGroup>(request),
+      [&finished_placement_group_count](Status status) {
+        ++finished_placement_group_count;
+      });
+  auto placement_group = mock_placement_group_scheduler_->placement_groups_.back();
+  auto placement_group_id = placement_group->GetPlacementGroupID();
+  gcs_placement_group_manager_->OnPlacementGroupCreationSuccess(placement_group);
+  WaitForExpectedCount(finished_placement_group_count, 1);
+  EXPECT_CALL(*mock_placement_group_scheduler_,
+              DestroyPlacementGroupBundleResourcesIfExists(placement_group_id))
+      .Times(0);
+  gcs_placement_group_manager_->CleanPlacementGroupIfNeededWhenJobDead(job_id);
+  // The placement group shouldn't be cleaned when only the job is dead.
+  EXPECT_CALL(*mock_placement_group_scheduler_,
+              DestroyPlacementGroupBundleResourcesIfExists(placement_group_id))
+      .Times(1);
+  // This method should be idempotent.
+  gcs_placement_group_manager_->CleanPlacementGroupIfNeededWhenActorDead(actor_id);
+  gcs_placement_group_manager_->CleanPlacementGroupIfNeededWhenActorDead(actor_id);
+  gcs_placement_group_manager_->CleanPlacementGroupIfNeededWhenActorDead(actor_id);
+}
+
+TEST_F(GcsPlacementGroupManagerTest, TestAutomaticCleanupWhenOnlyJobDead) {
+  // Test that the placement group is cleaned when the job dies and no actor created it.
+  const auto job_id = JobID::FromInt(1);
+  auto request = Mocker::GenCreatePlacementGroupRequest(
+      /* name */ "", rpc::PlacementStrategy::SPREAD,
+      /* bundles_count */ 2,
+      /* cpu_num */ 1.0,
+      /* job_id */ job_id,
+      /* actor_id */ ActorID::Nil());
+  std::atomic<int> finished_placement_group_count(0);
+  gcs_placement_group_manager_->RegisterPlacementGroup(
+      std::make_shared<gcs::GcsPlacementGroup>(request),
+      [&finished_placement_group_count](Status status) {
+        ++finished_placement_group_count;
+      });
+  auto placement_group = mock_placement_group_scheduler_->placement_groups_.back();
+  auto placement_group_id = placement_group->GetPlacementGroupID();
+  gcs_placement_group_manager_->OnPlacementGroupCreationSuccess(placement_group);
+  WaitForExpectedCount(finished_placement_group_count, 1);
+  EXPECT_CALL(*mock_placement_group_scheduler_,
+              DestroyPlacementGroupBundleResourcesIfExists(placement_group_id))
+      .Times(1);
+  // This method should be idempotent.
+  gcs_placement_group_manager_->CleanPlacementGroupIfNeededWhenJobDead(job_id);
+  gcs_placement_group_manager_->CleanPlacementGroupIfNeededWhenJobDead(job_id);
+  gcs_placement_group_manager_->CleanPlacementGroupIfNeededWhenJobDead(job_id);
+}
+
+TEST_F(GcsPlacementGroupManagerTest,
+       TestAutomaticCleanupDoNothingWhenDifferentJobIsDead) {
+  // Test that nothing is cleaned when a different job dies.
+  const auto job_id = JobID::FromInt(1);
+  const auto different_job_id = JobID::FromInt(3);
+  auto request = Mocker::GenCreatePlacementGroupRequest(
+      /* name */ "", rpc::PlacementStrategy::SPREAD,
+      /* bundles_count */ 2,
+      /* cpu_num */ 1.0,
+      /* job_id */ job_id,
+      /* actor_id */ ActorID::Nil());
+  std::atomic<int> finished_placement_group_count(0);
+  gcs_placement_group_manager_->RegisterPlacementGroup(
+      std::make_shared<gcs::GcsPlacementGroup>(request),
+      [&finished_placement_group_count](Status status) {
+        ++finished_placement_group_count;
+      });
+  auto placement_group = mock_placement_group_scheduler_->placement_groups_.back();
+  auto placement_group_id = placement_group->GetPlacementGroupID();
+  gcs_placement_group_manager_->OnPlacementGroupCreationSuccess(placement_group);
+  WaitForExpectedCount(finished_placement_group_count, 1);
+  // This shouldn't have been called.
+  EXPECT_CALL(*mock_placement_group_scheduler_,
+              DestroyPlacementGroupBundleResourcesIfExists(placement_group_id))
+      .Times(0);
+  // These calls should be no-ops for an unrelated job.
+  gcs_placement_group_manager_->CleanPlacementGroupIfNeededWhenJobDead(different_job_id);
+  gcs_placement_group_manager_->CleanPlacementGroupIfNeededWhenJobDead(different_job_id);
+  gcs_placement_group_manager_->CleanPlacementGroupIfNeededWhenJobDead(different_job_id);
+}
+
 }  // namespace ray
 
 int main(int argc, char **argv) {
diff --git a/src/ray/gcs/test/gcs_test_util.h b/src/ray/gcs/test/gcs_test_util.h
index 5192d7397..d96f6ea1f 100644
--- a/src/ray/gcs/test/gcs_test_util.h
+++ b/src/ray/gcs/test/gcs_test_util.h
@@ -81,18 +81,20 @@ struct Mocker {
 
   static PlacementGroupSpecification GenPlacementGroupCreation(
       const std::string &name,
      std::vector<std::unordered_map<std::string, double>> &bundles,
-      rpc::PlacementStrategy strategy) {
+      rpc::PlacementStrategy strategy, const JobID &job_id, const ActorID &actor_id) {
     PlacementGroupSpecBuilder builder;
     auto placement_group_id = PlacementGroupID::FromRandom();
-    builder.SetPlacementGroupSpec(placement_group_id, name, bundles, strategy);
+    builder.SetPlacementGroupSpec(placement_group_id, name, bundles, strategy, job_id,
+                                  actor_id, /* is_creator_detached */ false);
     return builder.Build();
   }
 
   static rpc::CreatePlacementGroupRequest GenCreatePlacementGroupRequest(
       const std::string name = "",
       rpc::PlacementStrategy strategy = rpc::PlacementStrategy::SPREAD,
-      int bundles_count = 2, double cpu_num = 1.0) {
+      int bundles_count = 2, double cpu_num = 1.0, const JobID job_id = JobID::FromInt(1),
+      const ActorID &actor_id = ActorID::Nil()) {
     rpc::CreatePlacementGroupRequest request;
     std::vector<std::unordered_map<std::string, double>> bundles;
     std::unordered_map<std::string, double> bundle;
@@ -101,7 +103,7 @@ struct Mocker {
       bundles.push_back(bundle);
     }
     auto placement_group_creation_spec =
-        GenPlacementGroupCreation(name, bundles, strategy);
+        GenPlacementGroupCreation(name, bundles, strategy, job_id, actor_id);
     request.mutable_placement_group_spec()->CopyFrom(
         placement_group_creation_spec.GetMessage());
     return request;
diff --git a/src/ray/protobuf/common.proto b/src/ray/protobuf/common.proto
index 99e654c45..60f418e8e 100644
--- a/src/ray/protobuf/common.proto
+++ b/src/ray/protobuf/common.proto
@@ -214,6 +214,14 @@ message PlacementGroupSpec {
   repeated Bundle bundles = 3;
   // The schedule strategy of this Placement Group.
   PlacementStrategy strategy = 4;
+  // The job id that created this placement group.
+  bytes creator_job_id = 5;
+  // The actor id that created this placement group.
+  bytes creator_actor_id = 6;
+  // Whether or not the creator job is dead.
+  bool creator_job_dead = 7;
+  // Whether or not the creator actor is dead.
+  bool creator_actor_dead = 8;
 }
 
 message ObjectReference {
diff --git a/src/ray/protobuf/gcs.proto b/src/ray/protobuf/gcs.proto
index 25f34e2bc..fd4b4cd2a 100644
--- a/src/ray/protobuf/gcs.proto
+++ b/src/ray/protobuf/gcs.proto
@@ -181,6 +181,16 @@ message PlacementGroupTableData {
   PlacementStrategy strategy = 4;
   // Current state of this placement group.
   PlacementGroupState state = 5;
+  // Fields to detect the owner of the placement group
+  // for automatic lifecycle management.
+  // The job id that created this placement group.
+  bytes creator_job_id = 6;
+  // The actor id that created this placement group.
+  bytes creator_actor_id = 7;
+  // Whether or not the creator job is dead.
+  bool creator_job_dead = 8;
+  // Whether or not the creator actor is dead.
+  bool creator_actor_dead = 9;
 }
 
 message ScheduleData {
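
For reference, here is a condensed, user-level sketch of the behavior the new tests exercise. It assumes a running Ray cluster with at least three CPUs spread across nodes; the actor name `pg_creator` and the script itself are illustrative, while `ray.util.placement_group`, `options(lifetime="detached")`, and `ray.kill` are existing Ray APIs:

```python
# Condensed sketch of the cleanup behavior (illustrative, not part of the patch):
# placement groups created by a driver are reclaimed once the job exits, while
# groups created by a detached actor live until that actor is killed.
import ray

ray.init(address="auto")  # assumes a running cluster with spare CPUs


def create_pg():
    pg = ray.util.placement_group(
        [{"CPU": 1} for _ in range(3)], strategy="STRICT_SPREAD")
    ray.get(pg.ready())
    return pg


@ray.remote(num_cpus=0)
class Creator:
    def create_pg(self):
        create_pg()


# Created by the driver: cleaned up automatically when this driver exits.
create_pg()

# Created by a detached actor: survives the driver, cleaned up when the actor dies.
creator = Creator.options(lifetime="detached", name="pg_creator").remote()
ray.get(creator.create_pg.remote())

# Explicitly killing the detached actor releases its placement group's resources.
ray.kill(creator)
ray.shutdown()
```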