[Placement Group] Placement group automatic cleanup. (#11546)

* In progress. Done with all placement group manager code.

* It is working with jobs.

* Finished detached actor implementation.

* Fix minor issue.

* In progress.

* Addressed code review.

* Addressed code review.

* Addressed code review.

* Fix a build error.
SangBin Cho 2020-10-30 10:55:43 -07:00, committed by GitHub
parent 5a83d8918a
commit 6e2a1eac36
15 changed files with 476 additions and 27 deletions

View file

@@ -12,6 +12,7 @@ from ray.test_utils import (get_other_nodes, wait_for_condition,
get_error_message)
import ray.cluster_utils
from ray._raylet import PlacementGroupID
from ray.test_utils import run_string_as_driver
from ray.util.placement_group import (PlacementGroup,
get_current_placement_group)
@@ -995,5 +996,165 @@ def test_ready_warning_suppressed(ray_start_regular, error_pubsub):
assert len(errors) == 0
def test_automatic_cleanup_job(ray_start_cluster):
    # Make sure the placement groups created by a
    # job, actor, and task are cleaned when the job is done.
    cluster = ray_start_cluster
    num_nodes = 3
    num_cpu_per_node = 4
    # Create 3 nodes cluster.
    for _ in range(num_nodes):
        cluster.add_node(num_cpus=num_cpu_per_node)

    info = ray.init(address=cluster.address)
    available_cpus = ray.available_resources()["CPU"]
    assert available_cpus == num_nodes * num_cpu_per_node

    driver_code = f"""
import ray

ray.init(address="{info["redis_address"]}")

def create_pg():
    pg = ray.util.placement_group(
        [{{"CPU": 1}} for _ in range(3)],
        strategy="STRICT_SPREAD")
    ray.get(pg.ready())
    return pg

@ray.remote(num_cpus=0)
def f():
    create_pg()

@ray.remote(num_cpus=0)
class A:
    def create_pg(self):
        create_pg()

ray.get(f.remote())
a = A.remote()
ray.get(a.create_pg.remote())
# Create 2 pgs to make sure multiple placement groups that belong
# to a single job will be properly cleaned.
create_pg()
create_pg()

ray.shutdown()
"""
    run_string_as_driver(driver_code)

    # Wait until the driver is reported as dead by GCS.
    def is_job_done():
        jobs = ray.jobs()
        for job in jobs:
            if "StopTime" in job:
                return True
        return False

    def assert_num_cpus(expected_num_cpus):
        if expected_num_cpus == 0:
            return "CPU" not in ray.available_resources()
        return ray.available_resources()["CPU"] == expected_num_cpus

    wait_for_condition(is_job_done)
    available_cpus = ray.available_resources()["CPU"]
    wait_for_condition(lambda: assert_num_cpus(num_nodes * num_cpu_per_node))
def test_automatic_cleanup_detached_actors(ray_start_cluster):
    # Make sure the placement groups created by
    # detached actors are cleaned properly.
    cluster = ray_start_cluster
    num_nodes = 3
    num_cpu_per_node = 2
    # Create 3 nodes cluster.
    for _ in range(num_nodes):
        cluster.add_node(num_cpus=num_cpu_per_node)

    info = ray.init(address=cluster.address)
    available_cpus = ray.available_resources()["CPU"]
    assert available_cpus == num_nodes * num_cpu_per_node

    driver_code = f"""
import ray

ray.init(address="{info["redis_address"]}")

def create_pg():
    pg = ray.util.placement_group(
        [{{"CPU": 1}} for _ in range(3)],
        strategy="STRICT_SPREAD")
    ray.get(pg.ready())
    return pg

# TODO(sang): Placement groups created by tasks launched by a detached actor
# are not cleaned with the current protocol.
# @ray.remote(num_cpus=0)
# def f():
#     create_pg()

@ray.remote(num_cpus=0, max_restarts=1)
class A:
    def create_pg(self):
        create_pg()

    def create_child_pg(self):
        self.a = A.options(name="B").remote()
        ray.get(self.a.create_pg.remote())

    def kill_child_actor(self):
        ray.kill(self.a)
        try:
            ray.get(self.a.create_pg.remote())
        except Exception:
            pass

a = A.options(lifetime="detached", name="A").remote()
ray.get(a.create_pg.remote())
# TODO(sang): Currently, child tasks are cleaned when a detached actor
# is dead. We cannot test this scenario until it is fixed.
# ray.get(a.create_child_pg.remote())

ray.shutdown()
"""
    run_string_as_driver(driver_code)

    # Wait until the driver is reported as dead by GCS.
    def is_job_done():
        jobs = ray.jobs()
        for job in jobs:
            if "StopTime" in job:
                return True
        return False

    def assert_num_cpus(expected_num_cpus):
        if expected_num_cpus == 0:
            return "CPU" not in ray.available_resources()
        return ray.available_resources()["CPU"] == expected_num_cpus

    wait_for_condition(is_job_done)
    assert assert_num_cpus(num_nodes)

    # Make sure that when a child actor spawned by a detached actor
    # is killed, the placement group is removed.
    a = ray.get_actor("A")
    # TODO(sang): Children of detached actors seem to be killed when jobs
    # are done. We should fix this before testing this scenario.
    # ray.get(a.kill_child_actor.remote())
    # assert assert_num_cpus(num_nodes)

    # Make sure placement groups are cleaned when detached actors are killed.
    ray.kill(a, no_restart=False)
    wait_for_condition(lambda: assert_num_cpus(num_nodes * num_cpu_per_node))
    # The detached actor a should've been restarted.
    # Recreate a placement group.
    ray.get(a.create_pg.remote())
    wait_for_condition(lambda: assert_num_cpus(num_nodes))
    # Kill it again and make sure the newly created placement group
    # is deleted again.
    ray.kill(a, no_restart=False)
    wait_for_condition(lambda: assert_num_cpus(num_nodes * num_cpu_per_node))


if __name__ == "__main__":
    sys.exit(pytest.main(["-v", __file__]))
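
The two tests above never query placement group state directly; they infer cleanup from the CPUs the groups give back. A plain-Python sketch of the arithmetic the assertions rely on (numbers taken from the tests; no Ray needed):

# test_automatic_cleanup_job: 3 nodes x 4 CPUs = 12 CPUs in total. Each
# placement group reserves 3 bundles x 1 CPU (STRICT_SPREAD), so available
# CPUs climbing back to 12 proves that every group was removed.
num_nodes = 3
pg_reserved_cpus = 3 * 1
job_test_total = num_nodes * 4
assert job_test_total - pg_reserved_cpus == 9  # while one group is alive

# test_automatic_cleanup_detached_actors: 3 nodes x 2 CPUs = 6 CPUs. While
# the detached actor's group is alive, 6 - 3 = 3 CPUs (== num_nodes) remain,
# which is exactly what assert_num_cpus(num_nodes) checks; killing the actor
# restores all 6.
detached_test_total = num_nodes * 2
assert detached_test_total - pg_reserved_cpus == num_nodes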

View file

@@ -67,10 +67,22 @@ class PlacementGroupSpecBuilder {
PlacementGroupSpecBuilder &SetPlacementGroupSpec(
const PlacementGroupID &placement_group_id, std::string name,
const std::vector<std::unordered_map<std::string, double>> &bundles,
const rpc::PlacementStrategy strategy) {
const rpc::PlacementStrategy strategy, const JobID &creator_job_id,
const ActorID &creator_actor_id, bool is_creator_detached_actor) {
message_->set_placement_group_id(placement_group_id.Binary());
message_->set_name(name);
message_->set_strategy(strategy);
// Configure the creator job and actor ID for automatic lifecycle management.
RAY_CHECK(!creator_job_id.IsNil());
message_->set_creator_job_id(creator_job_id.Binary());
// When the creator is a detached actor, we should treat the creator job as
// already dead, because a detached actor can be created AFTER its job has
// died; for example, a detached actor can be restarted by GCS after the
// creator job is dead.
message_->set_creator_job_dead(is_creator_detached_actor);
message_->set_creator_actor_id(creator_actor_id.Binary());
message_->set_creator_actor_dead(creator_actor_id.IsNil());
for (size_t i = 0; i < bundles.size(); i++) {
auto resources = bundles[i];
auto message_bundle = message_->add_bundles();
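
The flag seeding above is the heart of the protocol and is worth restating: a nil creator actor satisfies the "actor half" of the removal condition from the start, and a detached creator presumes the "job half" satisfied. A plain-Python mirror of that logic (the helper name is invented here):

def initial_creator_flags(creator_actor_id, is_creator_detached_actor):
    # A detached actor can be created, or restarted by GCS, after its job
    # has already exited, so the creator job is treated as dead up front.
    creator_job_dead = is_creator_detached_actor
    # A nil creator actor id means a driver or task created the group, so
    # the "actor dead" half of the removal condition holds immediately.
    creator_actor_dead = creator_actor_id is None
    return creator_job_dead, creator_actor_dead

# Group created directly by a driver: removed as soon as its job finishes.
assert initial_creator_flags(None, False) == (False, True)
# Group created by a detached actor: lives until that actor itself dies.
assert initial_creator_flags("actor-1", True) == (True, False)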

View file

@@ -182,6 +182,7 @@ void WorkerContext::SetCurrentTask(const TaskSpecification &task_spec) {
current_actor_is_direct_call_ = true;
current_actor_max_concurrency_ = task_spec.MaxActorConcurrency();
current_actor_is_asyncio_ = task_spec.IsAsyncioActor();
is_detached_actor_ = task_spec.IsDetachedActor();
current_actor_placement_group_id_ = task_spec.PlacementGroupId();
placement_group_capture_child_tasks_ = task_spec.PlacementGroupCaptureChildTasks();
override_environment_variables_ = task_spec.OverrideEnvironmentVariables();
@@ -235,6 +236,8 @@ int WorkerContext::CurrentActorMaxConcurrency() const {
bool WorkerContext::CurrentActorIsAsync() const { return current_actor_is_asyncio_; }
bool WorkerContext::CurrentActorDetached() const { return is_detached_actor_; }
WorkerThreadContext &WorkerContext::GetThreadContext() {
if (thread_context_ == nullptr) {
thread_context_ = std::unique_ptr<WorkerThreadContext>(new WorkerThreadContext());

View file

@@ -74,6 +74,8 @@ class WorkerContext {
bool CurrentActorIsAsync() const;
bool CurrentActorDetached() const;
int GetNextTaskIndex();
// Returns the next put object index; used to calculate ObjectIDs for puts.
@@ -91,6 +93,7 @@ class WorkerContext {
ActorID current_actor_id_;
int current_actor_max_concurrency_ = 1;
bool current_actor_is_asyncio_ = false;
bool is_detached_actor_ = false;
// The placement group id that the current actor belongs to.
PlacementGroupID current_actor_placement_group_id_;
// Whether or not we should implicitly capture parent's placement group.

View file

@@ -1408,9 +1408,11 @@ Status CoreWorker::CreatePlacementGroup(
PlacementGroupID *return_placement_group_id) {
const PlacementGroupID placement_group_id = PlacementGroupID::FromRandom();
PlacementGroupSpecBuilder builder;
builder.SetPlacementGroupSpec(placement_group_id, placement_group_creation_options.name,
placement_group_creation_options.bundles,
placement_group_creation_options.strategy);
builder.SetPlacementGroupSpec(
placement_group_id, placement_group_creation_options.name,
placement_group_creation_options.bundles, placement_group_creation_options.strategy,
worker_context_.GetCurrentJobID(), worker_context_.GetCurrentActorID(),
worker_context_.CurrentActorDetached());
PlacementGroupSpecification placement_group_spec = builder.Build();
*return_placement_group_id = placement_group_id;
RAY_LOG(INFO) << "Submitting Placement Group creation to GCS: " << placement_group_id;

View file

@@ -83,15 +83,19 @@ const rpc::ActorTableData &GcsActor::GetActorTableData() const {
rpc::ActorTableData *GcsActor::GetMutableActorTableData() { return &actor_table_data_; }
/////////////////////////////////////////////////////////////////////////////////////////
GcsActorManager::GcsActorManager(std::shared_ptr<GcsActorSchedulerInterface> scheduler,
std::shared_ptr<gcs::GcsTableStorage> gcs_table_storage,
std::shared_ptr<gcs::GcsPubSub> gcs_pub_sub,
const rpc::ClientFactoryFn &worker_client_factory)
GcsActorManager::GcsActorManager(
std::shared_ptr<GcsActorSchedulerInterface> scheduler,
std::shared_ptr<gcs::GcsTableStorage> gcs_table_storage,
std::shared_ptr<gcs::GcsPubSub> gcs_pub_sub,
std::function<void(const ActorID &)> destroy_owned_placement_group_if_needed,
const rpc::ClientFactoryFn &worker_client_factory)
: gcs_actor_scheduler_(std::move(scheduler)),
gcs_table_storage_(std::move(gcs_table_storage)),
gcs_pub_sub_(std::move(gcs_pub_sub)),
worker_client_factory_(worker_client_factory) {
worker_client_factory_(worker_client_factory),
destroy_owned_placement_group_if_needed_(destroy_owned_placement_group_if_needed) {
RAY_CHECK(worker_client_factory_);
RAY_CHECK(destroy_owned_placement_group_if_needed_);
}
void GcsActorManager::HandleRegisterActor(const rpc::RegisterActorRequest &request,
@@ -655,6 +659,8 @@ void GcsActorManager::DestroyActor(const ActorID &actor_id) {
RAY_CHECK_OK(gcs_pub_sub_->Publish(ACTOR_CHANNEL, actor_id.Hex(),
actor_table_data->SerializeAsString(),
nullptr));
// Destroy placement group owned by this actor.
destroy_owned_placement_group_if_needed_(actor_id);
}));
}
@@ -800,6 +806,8 @@ void GcsActorManager::ReconstructActor(const ActorID &actor_id, bool need_resche
int64_t max_restarts = mutable_actor_table_data->max_restarts();
uint64_t num_restarts = mutable_actor_table_data->num_restarts();
int64_t remaining_restarts;
// Destroy placement group owned by this actor.
destroy_owned_placement_group_if_needed_(actor_id);
if (!need_reschedule) {
remaining_restarts = 0;
} else if (max_restarts == -1) {
@@ -848,7 +856,9 @@ void GcsActorManager::ReconstructActor(const ActorID &actor_id, bool need_resche
// If the actor was a detached actor, make sure to destroy it.
// We need to do this explicitly because a detached actor has no owner,
// so it is not destroyed when its creator dies.
if (actor->IsDetached()) DestroyActor(actor_id);
if (actor->IsDetached()) {
DestroyActor(actor_id);
}
RAY_CHECK_OK(gcs_pub_sub_->Publish(
ACTOR_CHANNEL, actor_id.Hex(),
mutable_actor_table_data->SerializeAsString(), nullptr));
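
Note that destroy_owned_placement_group_if_needed_ now fires on two paths, DestroyActor and ReconstructActor, and a detached actor that is out of restarts goes through ReconstructActor and then DestroyActor, so one death can invoke the callback more than once. That is why the header declaration that follows requires the callback to be idempotent. A toy plain-Python illustration (the real implementation gets its idempotency from flag marking being a no-op the second time, not from a guard set as here):

# Invoking cleanup repeatedly for the same actor must equal invoking it once.
cleaned = set()

def destroy_owned_placement_groups(actor_id):
    if actor_id in cleaned:  # a later call finds nothing left to do
        return
    cleaned.add(actor_id)
    # ... mark creator_actor_dead and remove now-removable groups here ...

destroy_owned_placement_groups("actor-1")  # called from ReconstructActor
destroy_owned_placement_groups("actor-1")  # called again from DestroyActor
assert cleaned == {"actor-1"}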

View file

@@ -161,10 +161,12 @@ class GcsActorManager : public rpc::ActorInfoHandler {
/// \param scheduler Used to schedule actor creation tasks.
/// \param gcs_table_storage Used to flush actor data to storage.
/// \param gcs_pub_sub Used to publish gcs messages.
/// \param destroy_owned_placement_group_if_needed Callback invoked to clean up
/// the placement groups owned by a dead actor. It must be idempotent.
GcsActorManager(std::shared_ptr<GcsActorSchedulerInterface> scheduler,
std::shared_ptr<gcs::GcsTableStorage> gcs_table_storage,
std::shared_ptr<gcs::GcsPubSub> gcs_pub_sub,
const rpc::ClientFactoryFn &worker_client_factory = nullptr);
GcsActorManager(
std::shared_ptr<GcsActorSchedulerInterface> scheduler,
std::shared_ptr<gcs::GcsTableStorage> gcs_table_storage,
std::shared_ptr<gcs::GcsPubSub> gcs_pub_sub,
std::function<void(const ActorID &)> destroy_owned_placement_group_if_needed,
const rpc::ClientFactoryFn &worker_client_factory = nullptr);
~GcsActorManager() = default;
@@ -404,6 +406,10 @@ class GcsActorManager : public rpc::ActorInfoHandler {
/// Factory to produce clients to workers. This is used to communicate with
/// actors and their owners.
rpc::ClientFactoryFn worker_client_factory_;
/// A callback that is used to destroy the placement groups owned by the actor.
/// This callback MUST BE IDEMPOTENT because it can be called multiple times
/// during the actor destruction process.
std::function<void(const ActorID &)> destroy_owned_placement_group_if_needed_;
};
} // namespace gcs

View file

@@ -79,6 +79,27 @@ rpc::Bundle *GcsPlacementGroup::GetMutableBundle(int bundle_index) {
return placement_group_table_data_.mutable_bundles(bundle_index);
}
const ActorID GcsPlacementGroup::GetCreatorActorId() const {
return ActorID::FromBinary(placement_group_table_data_.creator_actor_id());
}
const JobID GcsPlacementGroup::GetCreatorJobId() const {
return JobID::FromBinary(placement_group_table_data_.creator_job_id());
}
void GcsPlacementGroup::MarkCreatorJobDead() {
placement_group_table_data_.set_creator_job_dead(true);
}
void GcsPlacementGroup::MarkCreatorActorDead() {
placement_group_table_data_.set_creator_actor_dead(true);
}
bool GcsPlacementGroup::IsPlacementGroupRemovable() const {
return placement_group_table_data_.creator_job_dead() &&
placement_group_table_data_.creator_actor_dead();
}
/////////////////////////////////////////////////////////////////////////////////////////
GcsPlacementGroupManager::GcsPlacementGroupManager(
@@ -177,8 +198,7 @@ void GcsPlacementGroupManager::SchedulePendingPlacementGroups() {
const auto placement_group = pending_placement_groups_.front();
const auto &placement_group_id = placement_group->GetPlacementGroupID();
// Do not reschedule if the placement group has already been removed.
if (registered_placement_groups_.find(placement_group_id) !=
registered_placement_groups_.end()) {
if (registered_placement_groups_.contains(placement_group_id)) {
MarkSchedulingStarted(placement_group_id);
gcs_placement_group_scheduler_->ScheduleUnplacedBundles(
placement_group,
@@ -210,8 +230,7 @@ void GcsPlacementGroupManager::HandleCreatePlacementGroup(
[this, request, reply, send_reply_callback, placement_group_id,
placement_group](Status status) {
RAY_CHECK_OK(status);
if (registered_placement_groups_.find(placement_group_id) ==
registered_placement_groups_.end()) {
if (!registered_placement_groups_.contains(placement_group_id)) {
std::stringstream stream;
stream << "Placement group of id " << placement_group_id
<< " has been removed before registration.";
@@ -375,6 +394,34 @@ void GcsPlacementGroupManager::OnNodeDead(const NodeID &node_id) {
SchedulePendingPlacementGroups();
}
void GcsPlacementGroupManager::CleanPlacementGroupIfNeededWhenJobDead(
const JobID &job_id) {
for (const auto &it : registered_placement_groups_) {
auto &placement_group = it.second;
if (placement_group->GetCreatorJobId() != job_id) {
continue;
}
placement_group->MarkCreatorJobDead();
if (placement_group->IsPlacementGroupRemovable()) {
RemovePlacementGroup(placement_group->GetPlacementGroupID(), [](Status status) {});
}
}
}
void GcsPlacementGroupManager::CleanPlacementGroupIfNeededWhenActorDead(
const ActorID &actor_id) {
for (const auto &it : registered_placement_groups_) {
auto &placement_group = it.second;
if (placement_group->GetCreatorActorId() != actor_id) {
continue;
}
placement_group->MarkCreatorActorDead();
if (placement_group->IsPlacementGroupRemovable()) {
RemovePlacementGroup(placement_group->GetPlacementGroupID(), [](Status status) {});
}
}
}
void GcsPlacementGroupManager::Tick() {
UpdatePlacementGroupLoad();
execute_after(io_context_, [this] { Tick(); }, 1000 /* milliseconds */);
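
The two Clean* methods above implement a small two-flag state machine: each event marks its flag on every group owned by the dead job or actor, and a group is removed only once both flags are set. Marking a flag twice changes nothing, which is where the advertised idempotency comes from. A condensed plain-Python model (names invented; the real code scans all registered groups by creator id rather than looking one up):

class PG:
    def __init__(self):
        self.creator_job_dead = False
        self.creator_actor_dead = False

    def removable(self):
        # Mirrors IsPlacementGroupRemovable: both creators must be dead.
        return self.creator_job_dead and self.creator_actor_dead

registered = {"pg": PG()}

def clean_when_job_dead(pg_id):
    pg = registered.get(pg_id)
    if pg is None:  # already removed; calling again is safe
        return
    pg.creator_job_dead = True  # marking twice is a no-op
    if pg.removable():
        del registered[pg_id]  # stands in for RemovePlacementGroup

def clean_when_actor_dead(pg_id):
    pg = registered.get(pg_id)
    if pg is None:
        return
    pg.creator_actor_dead = True
    if pg.removable():
        del registered[pg_id]

clean_when_job_dead("pg")    # only the job is dead: the group survives
assert "pg" in registered
clean_when_actor_dead("pg")  # both are dead now: the group is removed
assert "pg" not in registered
clean_when_job_dead("pg")    # idempotent: later calls are no-ops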

View file

@@ -53,6 +53,13 @@ class GcsPlacementGroup {
placement_group_table_data_.mutable_bundles()->CopyFrom(
placement_group_spec.bundles());
placement_group_table_data_.set_strategy(placement_group_spec.strategy());
placement_group_table_data_.set_creator_job_id(placement_group_spec.creator_job_id());
placement_group_table_data_.set_creator_actor_id(
placement_group_spec.creator_actor_id());
placement_group_table_data_.set_creator_job_dead(
placement_group_spec.creator_job_dead());
placement_group_table_data_.set_creator_actor_dead(
placement_group_spec.creator_actor_dead());
}
/// Get the immutable PlacementGroupTableData of this placement group.
@@ -82,18 +89,32 @@ class GcsPlacementGroup {
/// Get the Strategy
rpc::PlacementStrategy GetStrategy() const;
/// Get debug string for the placement group.
std::string DebugString() const;
/// The methods below are used for the automatic cleanup of placement groups.
/// Get the actor id that created the placement group.
const ActorID GetCreatorActorId() const;
/// Get the job id that created the placement group.
const JobID GetCreatorJobId() const;
/// Mark that the creator job of this placement group is dead.
void MarkCreatorJobDead();
/// Mark that the creator actor of this placement group is dead.
void MarkCreatorActorDead();
/// Return true if the placement group is removable, false otherwise.
bool IsPlacementGroupRemovable() const;
private:
/// The placement_group meta data which contains the task specification as well as the
/// state of the gcs placement_group and so on (see gcs.proto).
rpc::PlacementGroupTableData placement_group_table_data_;
};
using RegisterPlacementGroupCallback =
std::function<void(std::shared_ptr<GcsPlacementGroup>)>;
/// GcsPlacementGroupManager is responsible for managing the lifecycle of all placement
/// groups. This class is not thread-safe.
/// The placement group will be added into the queue and set to the pending status first and
@@ -176,6 +197,37 @@ class GcsPlacementGroupManager : public rpc::PlacementGroupInfoHandler {
/// \param node_id The specified node id.
void OnNodeDead(const NodeID &node_id);
/// Clean up placement groups that belong to the given job, if necessary.
///
/// This interface is part of the automatic lifecycle management of placement
/// groups. When a job is killed, this method should be invoked to clean up the
/// placement groups that belong to that job.
///
/// Calling this method doesn't guarantee that the placement groups that belong
/// to the given job will be cleaned: a placement group is cleaned only when its
/// creator job AND creator actor are both dead.
///
/// NOTE: This method is idempotent.
///
/// \param job_id The id of the job whose placement groups should be cleaned.
void CleanPlacementGroupIfNeededWhenJobDead(const JobID &job_id);

/// Clean up placement groups that belong to the given actor, if necessary.
///
/// This interface is part of the automatic lifecycle management of placement
/// groups. When an actor is killed, this method should be invoked to clean up
/// the placement groups that belong to that actor.
///
/// Calling this method doesn't guarantee that the placement groups that belong
/// to the given actor will be cleaned: a placement group is cleaned only when
/// its creator job AND creator actor are both dead.
///
/// NOTE: This method is idempotent.
///
/// \param actor_id The id of the actor whose placement groups should be cleaned.
void CleanPlacementGroupIfNeededWhenActorDead(const ActorID &actor_id);
private:
/// Try to create placement group after a short time.
void RetryCreatingPlacementGroup();

View file

@@ -191,9 +191,14 @@ void GcsServer::InitGcsActorManager() {
return std::make_shared<rpc::CoreWorkerClient>(address, client_call_manager_);
});
gcs_actor_manager_ = std::make_shared<GcsActorManager>(
scheduler, gcs_table_storage_, gcs_pub_sub_, [this](const rpc::Address &address) {
scheduler, gcs_table_storage_, gcs_pub_sub_,
[this](const ActorID &actor_id) {
gcs_placement_group_manager_->CleanPlacementGroupIfNeededWhenActorDead(actor_id);
},
[this](const rpc::Address &address) {
return std::make_shared<rpc::CoreWorkerClient>(address, client_call_manager_);
});
gcs_node_manager_->AddNodeAddedListener(
[this](const std::shared_ptr<rpc::GcsNodeInfo> &) {
// Because a new node has been added, we need to try to schedule the pending
@@ -227,6 +232,7 @@ void GcsServer::InitGcsJobManager() {
std::unique_ptr<GcsJobManager>(new GcsJobManager(gcs_table_storage_, gcs_pub_sub_));
gcs_job_manager_->AddJobFinishedListener([this](std::shared_ptr<JobID> job_id) {
gcs_actor_manager_->OnJobFinished(*job_id);
gcs_placement_group_manager_->CleanPlacementGroupIfNeededWhenJobDead(*job_id);
});
}
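
Both cleanup triggers are wired up here: actor deaths reach the placement group manager through the callback handed to GcsActorManager, and job termination reaches it through the job-finished listener. A compressed plain-Python view of the flow (invented names, not the C++ API):

class StubPgManager:
    def clean_when_actor_dead(self, actor_id):
        print("clean placement groups of actor", actor_id)

    def clean_when_job_dead(self, job_id):
        print("clean placement groups of job", job_id)

pg_manager = StubPgManager()

# Callback handed to the actor manager (mirrors the lambda above).
def on_actor_dead(actor_id):
    pg_manager.clean_when_actor_dead(actor_id)

# Listener registered for job completion (mirrors AddJobFinishedListener).
def on_job_finished(job_id):
    pg_manager.clean_when_job_dead(job_id)

on_actor_dead("actor-1")
on_job_finished("job-1")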

View file

@@ -87,6 +87,7 @@ class GcsActorManagerTest : public ::testing::Test {
gcs_table_storage_ = std::make_shared<gcs::InMemoryGcsTableStorage>(io_service_);
gcs_actor_manager_.reset(new gcs::GcsActorManager(
mock_actor_scheduler_, gcs_table_storage_, gcs_pub_sub_,
[](const ActorID &actor_id) {},
[this](const rpc::Address &addr) { return worker_client_; }));
}

View file

@@ -351,6 +351,132 @@ TEST_F(GcsPlacementGroupManagerTest, TestRescheduleWhenNodeDead) {
placement_group->GetPlacementGroupID());
}
TEST_F(GcsPlacementGroupManagerTest, TestAutomaticCleanupWhenActorDeadAndJobDead) {
// Test the scenario where actor dead -> job dead.
const auto job_id = JobID::FromInt(1);
const auto actor_id = ActorID::Of(job_id, TaskID::Nil(), 0);
auto request = Mocker::GenCreatePlacementGroupRequest(
/* name */ "", rpc::PlacementStrategy::SPREAD,
/* bundles_count */ 2,
/* cpu_num */ 1.0,
/* job_id */ job_id,
/* actor_id */ actor_id);
std::atomic<int> finished_placement_group_count(0);
gcs_placement_group_manager_->RegisterPlacementGroup(
std::make_shared<gcs::GcsPlacementGroup>(request),
[&finished_placement_group_count](Status status) {
++finished_placement_group_count;
});
auto placement_group = mock_placement_group_scheduler_->placement_groups_.back();
auto placement_group_id = placement_group->GetPlacementGroupID();
gcs_placement_group_manager_->OnPlacementGroupCreationSuccess(placement_group);
WaitForExpectedCount(finished_placement_group_count, 1);
// The placement group should not be destroyed when only the actor is dead.
EXPECT_CALL(*mock_placement_group_scheduler_,
DestroyPlacementGroupBundleResourcesIfExists(placement_group_id))
.Times(0);
gcs_placement_group_manager_->CleanPlacementGroupIfNeededWhenActorDead(actor_id);
// Once the job is also dead, the placement group should be destroyed.
EXPECT_CALL(*mock_placement_group_scheduler_,
DestroyPlacementGroupBundleResourcesIfExists(placement_group_id))
.Times(1);
gcs_placement_group_manager_->CleanPlacementGroupIfNeededWhenJobDead(job_id);
}
TEST_F(GcsPlacementGroupManagerTest, TestAutomaticCleanupWhenActorAndJobDead) {
// Test the scenario where job dead -> actor dead.
const auto job_id = JobID::FromInt(1);
const auto actor_id = ActorID::Of(job_id, TaskID::Nil(), 0);
auto request = Mocker::GenCreatePlacementGroupRequest(
/* name */ "", rpc::PlacementStrategy::SPREAD,
/* bundles_count */ 2,
/* cpu_num */ 1.0,
/* job_id */ job_id,
/* actor_id */ actor_id);
std::atomic<int> finished_placement_group_count(0);
gcs_placement_group_manager_->RegisterPlacementGroup(
std::make_shared<gcs::GcsPlacementGroup>(request),
[&finished_placement_group_count](Status status) {
++finished_placement_group_count;
});
auto placement_group = mock_placement_group_scheduler_->placement_groups_.back();
auto placement_group_id = placement_group->GetPlacementGroupID();
gcs_placement_group_manager_->OnPlacementGroupCreationSuccess(placement_group);
WaitForExpectedCount(finished_placement_group_count, 1);
EXPECT_CALL(*mock_placement_group_scheduler_,
DestroyPlacementGroupBundleResourcesIfExists(placement_group_id))
.Times(0);
gcs_placement_group_manager_->CleanPlacementGroupIfNeededWhenJobDead(job_id);
// The placement group shouldn't be cleaned when only the job is dead.
EXPECT_CALL(*mock_placement_group_scheduler_,
DestroyPlacementGroupBundleResourcesIfExists(placement_group_id))
.Times(1);
// This method should ensure idempotency.
gcs_placement_group_manager_->CleanPlacementGroupIfNeededWhenActorDead(actor_id);
gcs_placement_group_manager_->CleanPlacementGroupIfNeededWhenActorDead(actor_id);
gcs_placement_group_manager_->CleanPlacementGroupIfNeededWhenActorDead(actor_id);
}
TEST_F(GcsPlacementGroupManagerTest, TestAutomaticCleanupWhenOnlyJobDead) {
// Test that a placement group with no creator actor is cleaned
// as soon as its creator job is dead.
const auto job_id = JobID::FromInt(1);
auto request = Mocker::GenCreatePlacementGroupRequest(
/* name */ "", rpc::PlacementStrategy::SPREAD,
/* bundles_count */ 2,
/* cpu_num */ 1.0,
/* job_id */ job_id,
/* actor_id */ ActorID::Nil());
std::atomic<int> finished_placement_group_count(0);
gcs_placement_group_manager_->RegisterPlacementGroup(
std::make_shared<gcs::GcsPlacementGroup>(request),
[&finished_placement_group_count](Status status) {
++finished_placement_group_count;
});
auto placement_group = mock_placement_group_scheduler_->placement_groups_.back();
auto placement_group_id = placement_group->GetPlacementGroupID();
gcs_placement_group_manager_->OnPlacementGroupCreationSuccess(placement_group);
WaitForExpectedCount(finished_placement_group_count, 1);
EXPECT_CALL(*mock_placement_group_scheduler_,
DestroyPlacementGroupBundleResourcesIfExists(placement_group_id))
.Times(1);
// This method should ensure idempotency.
gcs_placement_group_manager_->CleanPlacementGroupIfNeededWhenJobDead(job_id);
gcs_placement_group_manager_->CleanPlacementGroupIfNeededWhenJobDead(job_id);
gcs_placement_group_manager_->CleanPlacementGroupIfNeededWhenJobDead(job_id);
}
TEST_F(GcsPlacementGroupManagerTest,
TestAutomaticCleanupDoNothingWhenDifferentJobIsDead) {
// Test that nothing is cleaned up when an unrelated job dies.
const auto job_id = JobID::FromInt(1);
const auto different_job_id = JobID::FromInt(3);
auto request = Mocker::GenCreatePlacementGroupRequest(
/* name */ "", rpc::PlacementStrategy::SPREAD,
/* bundles_count */ 2,
/* cpu_num */ 1.0,
/* job_id */ job_id,
/* actor_id */ ActorID::Nil());
std::atomic<int> finished_placement_group_count(0);
gcs_placement_group_manager_->RegisterPlacementGroup(
std::make_shared<gcs::GcsPlacementGroup>(request),
[&finished_placement_group_count](Status status) {
++finished_placement_group_count;
});
auto placement_group = mock_placement_group_scheduler_->placement_groups_.back();
auto placement_group_id = placement_group->GetPlacementGroupID();
gcs_placement_group_manager_->OnPlacementGroupCreationSuccess(placement_group);
WaitForExpectedCount(finished_placement_group_count, 1);
// Destruction should never be triggered by the death of an unrelated job.
EXPECT_CALL(*mock_placement_group_scheduler_,
DestroyPlacementGroupBundleResourcesIfExists(placement_group_id))
.Times(0);
// Cleaning up an unrelated job is a no-op, no matter how many times it is
// called.
gcs_placement_group_manager_->CleanPlacementGroupIfNeededWhenJobDead(different_job_id);
gcs_placement_group_manager_->CleanPlacementGroupIfNeededWhenJobDead(different_job_id);
gcs_placement_group_manager_->CleanPlacementGroupIfNeededWhenJobDead(different_job_id);
}
} // namespace ray
int main(int argc, char **argv) {

View file

@@ -81,18 +81,20 @@ struct Mocker {
static PlacementGroupSpecification GenPlacementGroupCreation(
const std::string &name,
std::vector<std::unordered_map<std::string, double>> &bundles,
rpc::PlacementStrategy strategy) {
rpc::PlacementStrategy strategy, const JobID &job_id, const ActorID &actor_id) {
PlacementGroupSpecBuilder builder;
auto placement_group_id = PlacementGroupID::FromRandom();
builder.SetPlacementGroupSpec(placement_group_id, name, bundles, strategy);
builder.SetPlacementGroupSpec(placement_group_id, name, bundles, strategy, job_id,
actor_id, /* is_creator_detached */ false);
return builder.Build();
}
static rpc::CreatePlacementGroupRequest GenCreatePlacementGroupRequest(
const std::string name = "",
rpc::PlacementStrategy strategy = rpc::PlacementStrategy::SPREAD,
int bundles_count = 2, double cpu_num = 1.0) {
int bundles_count = 2, double cpu_num = 1.0, const JobID job_id = JobID::FromInt(1),
const ActorID &actor_id = ActorID::Nil()) {
rpc::CreatePlacementGroupRequest request;
std::vector<std::unordered_map<std::string, double>> bundles;
std::unordered_map<std::string, double> bundle;
@@ -101,7 +103,7 @@ struct Mocker {
bundles.push_back(bundle);
}
auto placement_group_creation_spec =
GenPlacementGroupCreation(name, bundles, strategy);
GenPlacementGroupCreation(name, bundles, strategy, job_id, actor_id);
request.mutable_placement_group_spec()->CopyFrom(
placement_group_creation_spec.GetMessage());
return request;

View file

@@ -214,6 +214,14 @@ message PlacementGroupSpec {
repeated Bundle bundles = 3;
// The schedule strategy of this Placement Group.
PlacementStrategy strategy = 4;
// The job id that created this placement group.
bytes creator_job_id = 5;
// The actor id that created this placement group.
bytes creator_actor_id = 6;
// Whether or not the creator job is dead.
bool creator_job_dead = 7;
// Whether or not the creator actor is dead.
bool creator_actor_dead = 8;
}
message ObjectReference {

View file

@@ -181,6 +181,16 @@ message PlacementGroupTableData {
PlacementStrategy strategy = 4;
// Current state of this placement group.
PlacementGroupState state = 5;
// Fields to detect the owner of the placement group
// for automatic lifecycle management.
// The job id that created this placement group.
bytes creator_job_id = 6;
// The actor id that created this placement group.
bytes creator_actor_id = 7;
// Whether or not the creator job is dead.
bool creator_job_dead = 8;
// Whether or not the creator actor is dead.
bool creator_actor_dead = 9;
}
message ScheduleData {