[Placement Group] Make the creation of placement group sync (#13858)

* make pg creation sync

* return successful immediately when pg registeration

* hold on

* fix ut

* make collection for callback

* make pg registration vector

* fix new cpp ut

* fix named py ut

* fix python ut bug

* fix python ut

* fix lint

* modify comment

* fix comment

* fix comment

* add new ut and fix old lint issue

* fix comment

* update comment

* fix conflict
This commit is contained in:
DK.Pino 2021-02-24 08:11:43 +08:00 committed by GitHub
parent fe8a500e98
commit 911b028c54
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
8 changed files with 254 additions and 197 deletions

View file

@ -13,6 +13,7 @@ from ray.test_utils import (generate_system_config_map, get_other_nodes,
run_string_as_driver, wait_for_condition,
get_error_message)
import ray.cluster_utils
from ray.exceptions import RaySystemError
from ray._raylet import PlacementGroupID
from ray.util.placement_group import (PlacementGroup, placement_group,
remove_placement_group,
@ -1481,13 +1482,17 @@ ray.shutdown()
ray.get(actor.ping.remote())
# Create another placement group and make sure its creation will failed.
same_name_pg = ray.util.placement_group(
[{
"CPU": 1
} for _ in range(2)],
strategy="STRICT_SPREAD",
name=global_placement_group_name)
assert not same_name_pg.wait(10)
error_creation_count = 0
try:
ray.util.placement_group(
[{
"CPU": 1
} for _ in range(2)],
strategy="STRICT_SPREAD",
name=global_placement_group_name)
except RaySystemError:
error_creation_count += 1
assert error_creation_count == 1
# Remove a named placement group and make sure the second creation
# will successful.
@ -1510,5 +1515,35 @@ ray.shutdown()
assert error_count == 1
def test_placement_group_synchronous_registration(ray_start_cluster):
cluster = ray_start_cluster
# One node which only has one CPU.
cluster.add_node(num_cpus=1)
cluster.wait_for_nodes()
ray.init(address=cluster.address)
# Create a placement group that has two bundles and `STRICT_PACK` strategy,
# so, its registration will successful but scheduling failed.
placement_group = ray.util.placement_group(
name="name",
strategy="STRICT_PACK",
bundles=[{
"CPU": 1,
}, {
"CPU": 1
}])
# Make sure we can properly remove it immediately
# as its registration is synchronous.
ray.util.remove_placement_group(placement_group)
def is_placement_group_removed():
table = ray.util.placement_group_table(placement_group)
if "state" not in table:
return False
return table["state"] == "REMOVED"
wait_for_condition(is_placement_group_removed)
if __name__ == "__main__":
sys.exit(pytest.main(["-v", __file__]))

View file

@ -1540,6 +1540,8 @@ Status CoreWorker::CreateActor(const RayFunction &function,
Status CoreWorker::CreatePlacementGroup(
const PlacementGroupCreationOptions &placement_group_creation_options,
PlacementGroupID *return_placement_group_id) {
std::shared_ptr<std::promise<Status>> status_promise =
std::make_shared<std::promise<Status>>();
const PlacementGroupID placement_group_id = PlacementGroupID::FromRandom();
PlacementGroupSpecBuilder builder;
builder.SetPlacementGroupSpec(
@ -1550,9 +1552,21 @@ Status CoreWorker::CreatePlacementGroup(
PlacementGroupSpecification placement_group_spec = builder.Build();
*return_placement_group_id = placement_group_id;
RAY_LOG(INFO) << "Submitting Placement Group creation to GCS: " << placement_group_id;
RAY_CHECK_OK(
gcs_client_->PlacementGroups().AsyncCreatePlacementGroup(placement_group_spec));
return Status::OK();
RAY_UNUSED(gcs_client_->PlacementGroups().AsyncCreatePlacementGroup(
placement_group_spec,
[status_promise](const Status &status) { status_promise->set_value(status); }));
auto status_future = status_promise->get_future();
if (status_future.wait_for(std::chrono::seconds(
RayConfig::instance().gcs_server_request_timeout_seconds())) !=
std::future_status::ready) {
std::ostringstream stream;
stream << "There was timeout in creating the placement group of id "
<< placement_group_id
<< ". It is probably "
"because GCS server is dead or there's a high load there.";
return Status::TimedOut(stream.str());
}
return status_future.get();
}
Status CoreWorker::RemovePlacementGroup(const PlacementGroupID &placement_group_id) {

View file

@ -715,7 +715,8 @@ class PlacementGroupInfoAccessor {
/// written to GCS.
/// \return Status.
virtual Status AsyncCreatePlacementGroup(
const PlacementGroupSpecification &placement_group_spec) = 0;
const PlacementGroupSpecification &placement_group_spec,
const StatusCallback &callback) = 0;
/// Get a placement group data from GCS asynchronously by id.
///

View file

@ -1394,12 +1394,13 @@ ServiceBasedPlacementGroupInfoAccessor::ServiceBasedPlacementGroupInfoAccessor(
: client_impl_(client_impl) {}
Status ServiceBasedPlacementGroupInfoAccessor::AsyncCreatePlacementGroup(
const ray::PlacementGroupSpecification &placement_group_spec) {
const ray::PlacementGroupSpecification &placement_group_spec,
const StatusCallback &callback) {
rpc::CreatePlacementGroupRequest request;
request.mutable_placement_group_spec()->CopyFrom(placement_group_spec.GetMessage());
client_impl_->GetGcsRpcClient().CreatePlacementGroup(
request, [placement_group_spec](const Status &,
const rpc::CreatePlacementGroupReply &reply) {
request, [placement_group_spec, callback](
const Status &, const rpc::CreatePlacementGroupReply &reply) {
auto status =
reply.status().code() == (int)StatusCode::OK
? Status()
@ -1412,6 +1413,9 @@ Status ServiceBasedPlacementGroupInfoAccessor::AsyncCreatePlacementGroup(
<< placement_group_spec.PlacementGroupId()
<< " failed to be registered. " << status;
}
if (callback) {
callback(status);
}
});
return Status::OK();
}

View file

@ -440,7 +440,8 @@ class ServiceBasedPlacementGroupInfoAccessor : public PlacementGroupInfoAccessor
virtual ~ServiceBasedPlacementGroupInfoAccessor() = default;
Status AsyncCreatePlacementGroup(
const PlacementGroupSpecification &placement_group_spec) override;
const PlacementGroupSpecification &placement_group_spec,
const StatusCallback &callback) override;
Status AsyncRemovePlacementGroup(const PlacementGroupID &placement_group_id,
const StatusCallback &callback) override;

View file

@ -131,12 +131,12 @@ void GcsPlacementGroupManager::RegisterPlacementGroup(
auto iter = registered_placement_groups_.find(placement_group_id);
if (iter != registered_placement_groups_.end()) {
auto pending_register_iter =
placement_group_to_register_callback_.find(placement_group_id);
if (pending_register_iter != placement_group_to_register_callback_.end()) {
placement_group_to_register_callbacks_.find(placement_group_id);
if (pending_register_iter != placement_group_to_register_callbacks_.end()) {
// 1. The GCS client sends the `RegisterPlacementGroup` request to the GCS server.
// 2. The GCS client receives some network errors.
// 3. The GCS client resends the `RegisterPlacementGroup` request to the GCS server.
pending_register_iter->second = std::move(callback);
pending_register_iter->second.emplace_back(std::move(callback));
} else {
// 1. The GCS client sends the `RegisterPlacementGroup` request to the GCS server.
// 2. The GCS server flushes the placement group to the storage and restarts before
@ -164,10 +164,8 @@ void GcsPlacementGroupManager::RegisterPlacementGroup(
}
}
// Mark the callback as pending and invoke it after the placement_group has been
// successfully created.
placement_group_to_register_callback_[placement_group->GetPlacementGroupID()] =
std::move(callback);
placement_group_to_register_callbacks_[placement_group->GetPlacementGroupID()]
.emplace_back(std::move(callback));
registered_placement_groups_.emplace(placement_group->GetPlacementGroupID(),
placement_group);
pending_placement_groups_.emplace_back(placement_group);
@ -175,18 +173,28 @@ void GcsPlacementGroupManager::RegisterPlacementGroup(
RAY_CHECK_OK(gcs_table_storage_->PlacementGroupTable().Put(
placement_group_id, placement_group->GetPlacementGroupTableData(),
[this, placement_group_id, placement_group](Status status) {
// The backend storage is supposed to be reliable, so the status must be ok.
RAY_CHECK_OK(status);
if (!registered_placement_groups_.contains(placement_group_id)) {
auto iter = placement_group_to_register_callback_.find(placement_group_id);
if (iter != placement_group_to_register_callback_.end()) {
std::stringstream stream;
stream << "Placement group of id " << placement_group_id
<< " has been removed before registration.";
iter->second(Status::NotFound(stream.str()));
placement_group_to_register_callback_.erase(iter);
if (registered_placement_groups_.contains(placement_group_id)) {
auto iter = placement_group_to_register_callbacks_.find(placement_group_id);
auto callbacks = std::move(iter->second);
placement_group_to_register_callbacks_.erase(iter);
for (const auto &callback : callbacks) {
callback(status);
}
} else {
SchedulePendingPlacementGroups();
} else {
// The placement group registration is synchronous, so if we found the placement
// group was deleted here, it must be triggered by the abnormal exit of job,
// we will return directly in this case.
RAY_CHECK(placement_group_to_register_callbacks_.count(placement_group_id) == 0)
<< "The placement group has been removed unexpectedly with an unknown "
"error. Please file a bug report on here: "
"https://github.com/ray-project/ray/issues";
RAY_LOG(WARNING) << "Failed to create placement group '"
<< placement_group->GetPlacementGroupID()
<< "', because the placement group has been removed by GCS.";
return;
}
}));
}
@ -236,13 +244,6 @@ void GcsPlacementGroupManager::OnPlacementGroupCreationSuccess(
[this, placement_group_id](Status status) {
RAY_CHECK_OK(status);
// Invoke callback for registration request of this placement_group
// and remove it from placement_group_to_register_callback_.
auto iter = placement_group_to_register_callback_.find(placement_group_id);
if (iter != placement_group_to_register_callback_.end()) {
iter->second(Status::OK());
placement_group_to_register_callback_.erase(iter);
}
MarkSchedulingDone();
SchedulePendingPlacementGroups();
@ -331,7 +332,7 @@ void GcsPlacementGroupManager::RemovePlacementGroup(
}
auto placement_group = std::move(placement_group_it->second);
registered_placement_groups_.erase(placement_group_it);
placement_group_to_create_callbacks_.erase(placement_group_id);
placement_group_to_register_callbacks_.erase(placement_group_id);
// Remove placement group from `named_placement_groups_` if its name is not empty.
if (!placement_group->GetName().empty()) {
@ -369,13 +370,15 @@ void GcsPlacementGroupManager::RemovePlacementGroup(
placement_group->GetPlacementGroupTableData(),
[this, on_placement_group_removed, placement_group_id](Status status) {
RAY_CHECK_OK(status);
// If placement group hasn't been created yet, send a response to a core worker
// that the creation of placement group has failed.
auto it = placement_group_to_register_callback_.find(placement_group_id);
if (it != placement_group_to_register_callback_.end()) {
it->second(
Status::NotFound("Placement group is removed before it is created."));
placement_group_to_register_callback_.erase(it);
// If there is a driver waiting for the creation done, then send a message that
// the placement group has been removed.
auto it = placement_group_to_create_callbacks_.find(placement_group_id);
if (it != placement_group_to_create_callbacks_.end()) {
for (auto &callback : it->second) {
callback(
Status::NotFound("Placement group is removed before it is created."));
}
placement_group_to_create_callbacks_.erase(it);
}
on_placement_group_removed(status);
}));
@ -461,26 +464,37 @@ void GcsPlacementGroupManager::HandleWaitPlacementGroupUntilReady(
RAY_LOG(DEBUG) << "Waiting for placement group until ready, placement group id = "
<< placement_group_id;
auto callback = [placement_group_id, reply, send_reply_callback](const Status &status) {
RAY_LOG(DEBUG)
<< "Finished waiting for placement group until ready, placement group id = "
<< placement_group_id;
WaitPlacementGroup(placement_group_id, [reply, send_reply_callback,
placement_group_id](Status status) {
if (status.ok()) {
RAY_LOG(DEBUG)
<< "Finished waiting for placement group until ready, placement group id = "
<< placement_group_id;
} else {
RAY_LOG(WARNING)
<< "Failed to waiting for placement group until ready, placement group id = "
<< placement_group_id << ", cause: " << status.message();
}
GCS_RPC_SEND_REPLY(send_reply_callback, reply, status);
};
});
++counts_[CountType::WAIT_PLACEMENT_GROUP_UNTIL_READY_REQUEST];
}
void GcsPlacementGroupManager::WaitPlacementGroup(
const PlacementGroupID &placement_group_id, StatusCallback callback) {
// If the placement group does not exist or it has been successfully created, return
// directly.
const auto &iter = registered_placement_groups_.find(placement_group_id);
if (iter == registered_placement_groups_.end()) {
// Check whether the placement group does not exist or is removed.
auto on_done = [this, placement_group_id, reply, callback, send_reply_callback](
auto on_done = [this, placement_group_id, callback](
const Status &status,
const boost::optional<PlacementGroupTableData> &result) {
if (result) {
RAY_LOG(DEBUG) << "Placement group is removed, placement group id = "
<< placement_group_id;
GCS_RPC_SEND_REPLY(send_reply_callback, reply,
Status::NotFound("Placement group is removed."));
callback(Status::NotFound("Placement group is removed."));
} else {
// `wait` is a method of placement group object. Placement group object is
// obtained by create placement group api, so it can guarantee the existence of
@ -501,13 +515,11 @@ void GcsPlacementGroupManager::HandleWaitPlacementGroupUntilReady(
} else if (iter->second->GetState() == rpc::PlacementGroupTableData::CREATED) {
RAY_LOG(DEBUG) << "Placement group is created, placement group id = "
<< placement_group_id;
GCS_RPC_SEND_REPLY(send_reply_callback, reply, Status::OK());
callback(Status::OK());
} else {
placement_group_to_create_callbacks_[placement_group_id].emplace_back(
std::move(callback));
}
++counts_[CountType::WAIT_PLACEMENT_GROUP_UNTIL_READY_REQUEST];
}
void GcsPlacementGroupManager::RetryCreatingPlacementGroup() {

View file

@ -167,6 +167,14 @@ class GcsPlacementGroupManager : public rpc::PlacementGroupInfoHandler {
rpc::WaitPlacementGroupUntilReadyReply *reply,
rpc::SendReplyCallback send_reply_callback) override;
/// Register a callback which will be invoked after successfully created.
///
/// \param placement_group_id The placement group id which we want to listen.
/// \param callback Will be invoked after the placement group is created successfully or
/// be invoked if the placement group is deleted before create successfully.
void WaitPlacementGroup(const PlacementGroupID &placement_group_id,
StatusCallback callback);
/// Register placement_group asynchronously.
///
/// \param placement_group The placement group to be created.
@ -284,9 +292,11 @@ class GcsPlacementGroupManager : public rpc::PlacementGroupInfoHandler {
/// execute_after).
boost::asio::io_context &io_context_;
/// Callback of placement_group registration requests that are not yet flushed.
absl::flat_hash_map<PlacementGroupID, StatusCallback>
placement_group_to_register_callback_;
/// Callbacks of pending `RegisterPlacementGroup` requests.
/// Maps placement group ID to placement group registration callbacks, which is used to
/// filter duplicated messages from a driver/worker caused by some network problems.
absl::flat_hash_map<PlacementGroupID, std::vector<StatusCallback>>
placement_group_to_register_callbacks_;
/// Callback of `WaitPlacementGroupUntilReady` requests.
absl::flat_hash_map<PlacementGroupID, std::vector<StatusCallback>>

View file

@ -21,6 +21,7 @@
namespace ray {
using ::testing::_;
using StatusCallback = std::function<void(Status status)>;
class MockPlacementGroupScheduler : public gcs::GcsPlacementGroupSchedulerInterface {
public:
@ -89,6 +90,34 @@ class GcsPlacementGroupManagerTest : public ::testing::Test {
thread_io_service_->join();
}
// Make placement group registration sync.
void RegisterPlacementGroup(const ray::rpc::CreatePlacementGroupRequest &request,
StatusCallback callback) {
std::promise<void> promise;
gcs_placement_group_manager_->RegisterPlacementGroup(
std::make_shared<gcs::GcsPlacementGroup>(request),
[&callback, &promise](Status status) {
RAY_CHECK_OK(status);
callback(status);
promise.set_value();
});
promise.get_future().get();
}
// We need this to ensure that `MarkSchedulingDone` and `SchedulePendingPlacementGroups`
// was already invoked when we have invoked `OnPlacementGroupCreationSuccess`.
void OnPlacementGroupCreationSuccess(
const std::shared_ptr<gcs::GcsPlacementGroup> &placement_group) {
std::promise<void> promise;
gcs_placement_group_manager_->WaitPlacementGroup(
placement_group->GetPlacementGroupID(), [&promise](Status status) {
RAY_CHECK_OK(status);
promise.set_value();
});
gcs_placement_group_manager_->OnPlacementGroupCreationSuccess(placement_group);
promise.get_future().get();
}
void WaitForExpectedPgCount(int expected_count) {
auto condition = [this, expected_count]() {
return mock_placement_group_scheduler_->GetPlacementGroupCount() == expected_count;
@ -110,32 +139,28 @@ class GcsPlacementGroupManagerTest : public ::testing::Test {
TEST_F(GcsPlacementGroupManagerTest, TestBasic) {
auto request = Mocker::GenCreatePlacementGroupRequest();
std::atomic<int> finished_placement_group_count(0);
gcs_placement_group_manager_->RegisterPlacementGroup(
std::make_shared<gcs::GcsPlacementGroup>(request),
[&finished_placement_group_count](const Status &status) {
++finished_placement_group_count;
});
ASSERT_EQ(finished_placement_group_count, 0);
std::atomic<int> registered_placement_group_count(0);
RegisterPlacementGroup(request,
[&registered_placement_group_count](const Status &status) {
++registered_placement_group_count;
});
ASSERT_EQ(registered_placement_group_count, 1);
WaitForExpectedPgCount(1);
auto placement_group = mock_placement_group_scheduler_->placement_groups_.back();
mock_placement_group_scheduler_->placement_groups_.pop_back();
gcs_placement_group_manager_->OnPlacementGroupCreationSuccess(placement_group);
WaitForExpectedCount(finished_placement_group_count, 1);
OnPlacementGroupCreationSuccess(placement_group);
ASSERT_EQ(placement_group->GetState(), rpc::PlacementGroupTableData::CREATED);
}
TEST_F(GcsPlacementGroupManagerTest, TestSchedulingFailed) {
auto request = Mocker::GenCreatePlacementGroupRequest();
std::atomic<int> finished_placement_group_count(0);
gcs_placement_group_manager_->RegisterPlacementGroup(
std::make_shared<gcs::GcsPlacementGroup>(request),
[&finished_placement_group_count](const Status &status) {
++finished_placement_group_count;
});
std::atomic<int> registered_placement_group_count(0);
RegisterPlacementGroup(request,
[&registered_placement_group_count](const Status &status) {
++registered_placement_group_count;
});
ASSERT_EQ(finished_placement_group_count, 0);
ASSERT_EQ(registered_placement_group_count, 1);
WaitForExpectedPgCount(1);
auto placement_group = mock_placement_group_scheduler_->placement_groups_.back();
mock_placement_group_scheduler_->placement_groups_.clear();
@ -144,30 +169,25 @@ TEST_F(GcsPlacementGroupManagerTest, TestSchedulingFailed) {
gcs_placement_group_manager_->SchedulePendingPlacementGroups();
ASSERT_EQ(mock_placement_group_scheduler_->placement_groups_.size(), 1);
mock_placement_group_scheduler_->placement_groups_.clear();
ASSERT_EQ(finished_placement_group_count, 0);
// Check that the placement_group is in state `CREATED`.
gcs_placement_group_manager_->OnPlacementGroupCreationSuccess(placement_group);
WaitForExpectedCount(finished_placement_group_count, 1);
OnPlacementGroupCreationSuccess(placement_group);
ASSERT_EQ(placement_group->GetState(), rpc::PlacementGroupTableData::CREATED);
}
TEST_F(GcsPlacementGroupManagerTest, TestGetPlacementGroupIDByName) {
auto request = Mocker::GenCreatePlacementGroupRequest("test_name");
std::atomic<int> finished_placement_group_count(0);
gcs_placement_group_manager_->RegisterPlacementGroup(
std::make_shared<gcs::GcsPlacementGroup>(request),
[&finished_placement_group_count](const Status &status) {
++finished_placement_group_count;
});
std::atomic<int> registered_placement_group_count(0);
RegisterPlacementGroup(request, [&registered_placement_group_count](Status status) {
++registered_placement_group_count;
});
ASSERT_EQ(finished_placement_group_count, 0);
ASSERT_EQ(registered_placement_group_count, 1);
WaitForExpectedPgCount(1);
auto placement_group = mock_placement_group_scheduler_->placement_groups_.back();
mock_placement_group_scheduler_->placement_groups_.pop_back();
gcs_placement_group_manager_->OnPlacementGroupCreationSuccess(placement_group);
WaitForExpectedCount(finished_placement_group_count, 1);
OnPlacementGroupCreationSuccess(placement_group);
ASSERT_EQ(placement_group->GetState(), rpc::PlacementGroupTableData::CREATED);
ASSERT_EQ(
gcs_placement_group_manager_->GetPlacementGroupIDByName("test_name"),
@ -176,20 +196,18 @@ TEST_F(GcsPlacementGroupManagerTest, TestGetPlacementGroupIDByName) {
TEST_F(GcsPlacementGroupManagerTest, TestRemoveNamedPlacementGroup) {
auto request = Mocker::GenCreatePlacementGroupRequest("test_name");
std::atomic<int> finished_placement_group_count(0);
gcs_placement_group_manager_->RegisterPlacementGroup(
std::make_shared<gcs::GcsPlacementGroup>(request),
[&finished_placement_group_count](const Status &status) {
++finished_placement_group_count;
});
std::atomic<int> registered_placement_group_count(0);
RegisterPlacementGroup(request,
[&registered_placement_group_count](const Status &status) {
++registered_placement_group_count;
});
ASSERT_EQ(finished_placement_group_count, 0);
ASSERT_EQ(registered_placement_group_count, 1);
WaitForExpectedPgCount(1);
auto placement_group = mock_placement_group_scheduler_->placement_groups_.back();
mock_placement_group_scheduler_->placement_groups_.pop_back();
gcs_placement_group_manager_->OnPlacementGroupCreationSuccess(placement_group);
WaitForExpectedCount(finished_placement_group_count, 1);
OnPlacementGroupCreationSuccess(placement_group);
ASSERT_EQ(placement_group->GetState(), rpc::PlacementGroupTableData::CREATED);
// Remove the named placement group.
gcs_placement_group_manager_->RemovePlacementGroup(
@ -201,13 +219,11 @@ TEST_F(GcsPlacementGroupManagerTest, TestRemoveNamedPlacementGroup) {
TEST_F(GcsPlacementGroupManagerTest, TestRescheduleWhenNodeAdd) {
auto request = Mocker::GenCreatePlacementGroupRequest();
std::atomic<int> finished_placement_group_count(0);
gcs_placement_group_manager_->RegisterPlacementGroup(
std::make_shared<gcs::GcsPlacementGroup>(request),
[&finished_placement_group_count](const Status &status) {
++finished_placement_group_count;
});
ASSERT_EQ(finished_placement_group_count, 0);
std::atomic<int> registered_placement_group_count(0);
RegisterPlacementGroup(request, [&registered_placement_group_count](Status status) {
++registered_placement_group_count;
});
ASSERT_EQ(registered_placement_group_count, 1);
WaitForExpectedPgCount(1);
auto placement_group = mock_placement_group_scheduler_->placement_groups_.back();
mock_placement_group_scheduler_->placement_groups_.pop_back();
@ -219,22 +235,12 @@ TEST_F(GcsPlacementGroupManagerTest, TestRescheduleWhenNodeAdd) {
TEST_F(GcsPlacementGroupManagerTest, TestRemovingPendingPlacementGroup) {
auto request = Mocker::GenCreatePlacementGroupRequest();
std::atomic<int> finished_placement_group_count(0);
std::atomic<int> failed_placement_group_count(0);
gcs_placement_group_manager_->RegisterPlacementGroup(
std::make_shared<gcs::GcsPlacementGroup>(request),
[&finished_placement_group_count,
&failed_placement_group_count](const Status &status) {
if (status.ok()) {
++finished_placement_group_count;
} else {
++failed_placement_group_count;
}
});
std::atomic<int> registered_placement_group_count(0);
RegisterPlacementGroup(request, [&registered_placement_group_count](Status status) {
++registered_placement_group_count;
});
ASSERT_EQ(registered_placement_group_count, 1);
WaitForExpectedPgCount(1);
ASSERT_EQ(finished_placement_group_count, 0);
ASSERT_EQ(failed_placement_group_count, 0);
auto placement_group = mock_placement_group_scheduler_->placement_groups_.back();
mock_placement_group_scheduler_->placement_groups_.clear();
@ -249,8 +255,6 @@ TEST_F(GcsPlacementGroupManagerTest, TestRemovingPendingPlacementGroup) {
gcs_placement_group_manager_->SchedulePendingPlacementGroups();
ASSERT_EQ(mock_placement_group_scheduler_->placement_groups_.size(), 0);
mock_placement_group_scheduler_->placement_groups_.clear();
WaitForExpectedCount(finished_placement_group_count, 0);
WaitForExpectedCount(failed_placement_group_count, 1);
// Make sure we can re-remove again.
gcs_placement_group_manager_->RemovePlacementGroup(
@ -259,22 +263,12 @@ TEST_F(GcsPlacementGroupManagerTest, TestRemovingPendingPlacementGroup) {
TEST_F(GcsPlacementGroupManagerTest, TestRemovingLeasingPlacementGroup) {
auto request = Mocker::GenCreatePlacementGroupRequest();
std::atomic<int> finished_placement_group_count(0);
std::atomic<int> failed_placement_group_count(0);
gcs_placement_group_manager_->RegisterPlacementGroup(
std::make_shared<gcs::GcsPlacementGroup>(request),
[&finished_placement_group_count,
&failed_placement_group_count](const Status &status) {
if (status.ok()) {
++finished_placement_group_count;
} else {
++failed_placement_group_count;
}
});
std::atomic<int> registered_placement_group_count(0);
RegisterPlacementGroup(request, [&registered_placement_group_count](Status status) {
++registered_placement_group_count;
});
ASSERT_EQ(registered_placement_group_count, 1);
WaitForExpectedPgCount(1);
ASSERT_EQ(finished_placement_group_count, 0);
ASSERT_EQ(failed_placement_group_count, 0);
auto placement_group = mock_placement_group_scheduler_->placement_groups_.back();
mock_placement_group_scheduler_->placement_groups_.clear();
ASSERT_EQ(placement_group->GetState(), rpc::PlacementGroupTableData::PENDING);
@ -292,8 +286,6 @@ TEST_F(GcsPlacementGroupManagerTest, TestRemovingLeasingPlacementGroup) {
gcs_placement_group_manager_->SchedulePendingPlacementGroups();
ASSERT_EQ(mock_placement_group_scheduler_->placement_groups_.size(), 0);
mock_placement_group_scheduler_->placement_groups_.clear();
WaitForExpectedCount(finished_placement_group_count, 0);
WaitForExpectedCount(failed_placement_group_count, 1);
// Make sure we can re-remove again.
gcs_placement_group_manager_->RemovePlacementGroup(
@ -302,20 +294,17 @@ TEST_F(GcsPlacementGroupManagerTest, TestRemovingLeasingPlacementGroup) {
TEST_F(GcsPlacementGroupManagerTest, TestRemovingCreatedPlacementGroup) {
auto request = Mocker::GenCreatePlacementGroupRequest();
std::atomic<int> finished_placement_group_count(0);
gcs_placement_group_manager_->RegisterPlacementGroup(
std::make_shared<gcs::GcsPlacementGroup>(request),
[&finished_placement_group_count](const Status &status) {
if (status.ok()) {
++finished_placement_group_count;
}
});
std::atomic<int> registered_placement_group_count(0);
RegisterPlacementGroup(request, [&registered_placement_group_count](Status status) {
++registered_placement_group_count;
});
ASSERT_EQ(registered_placement_group_count, 1);
WaitForExpectedPgCount(1);
auto placement_group = mock_placement_group_scheduler_->placement_groups_.back();
mock_placement_group_scheduler_->placement_groups_.pop_back();
gcs_placement_group_manager_->OnPlacementGroupCreationSuccess(placement_group);
WaitForExpectedCount(finished_placement_group_count, 1);
// We have ensured that this operation is synchronized.
OnPlacementGroupCreationSuccess(placement_group);
ASSERT_EQ(placement_group->GetState(), rpc::PlacementGroupTableData::CREATED);
const auto &placement_group_id = placement_group->GetPlacementGroupID();
@ -332,7 +321,6 @@ TEST_F(GcsPlacementGroupManagerTest, TestRemovingCreatedPlacementGroup) {
gcs_placement_group_manager_->SchedulePendingPlacementGroups();
ASSERT_EQ(mock_placement_group_scheduler_->placement_groups_.size(), 0);
mock_placement_group_scheduler_->placement_groups_.clear();
ASSERT_EQ(finished_placement_group_count, 1);
// Make sure we can re-remove again.
gcs_placement_group_manager_->RemovePlacementGroup(
@ -341,19 +329,15 @@ TEST_F(GcsPlacementGroupManagerTest, TestRemovingCreatedPlacementGroup) {
TEST_F(GcsPlacementGroupManagerTest, TestRescheduleWhenNodeDead) {
auto request1 = Mocker::GenCreatePlacementGroupRequest();
std::atomic<int> finished_placement_group_count(0);
gcs_placement_group_manager_->RegisterPlacementGroup(
std::make_shared<gcs::GcsPlacementGroup>(request1),
[&finished_placement_group_count](const Status &status) {
++finished_placement_group_count;
});
std::atomic<int> registered_placement_group_count(0);
RegisterPlacementGroup(request1, [&registered_placement_group_count](Status status) {
++registered_placement_group_count;
});
auto request2 = Mocker::GenCreatePlacementGroupRequest();
gcs_placement_group_manager_->RegisterPlacementGroup(
std::make_shared<gcs::GcsPlacementGroup>(request2),
[&finished_placement_group_count](const Status &status) {
++finished_placement_group_count;
});
ASSERT_EQ(finished_placement_group_count, 0);
RegisterPlacementGroup(request2, [&registered_placement_group_count](Status status) {
++registered_placement_group_count;
});
ASSERT_EQ(registered_placement_group_count, 2);
WaitForExpectedPgCount(1);
auto placement_group = mock_placement_group_scheduler_->placement_groups_.back();
placement_group->GetMutableBundle(0)->set_node_id(NodeID::FromRandom().Binary());
@ -371,9 +355,9 @@ TEST_F(GcsPlacementGroupManagerTest, TestRescheduleWhenNodeDead) {
// Trigger scheduling `RESCHEDULING` placement group.
auto finished_group = std::make_shared<gcs::GcsPlacementGroup>(
placement_group->GetPlacementGroupTableData());
gcs_placement_group_manager_->OnPlacementGroupCreationSuccess(finished_group);
WaitForExpectedCount(finished_placement_group_count, 1);
ASSERT_EQ(mock_placement_group_scheduler_->placement_groups_.size(), 1);
OnPlacementGroupCreationSuccess(finished_group);
ASSERT_EQ(finished_group->GetState(), rpc::PlacementGroupTableData::CREATED);
WaitForExpectedPgCount(1);
ASSERT_EQ(mock_placement_group_scheduler_->placement_groups_[0]->GetPlacementGroupID(),
placement_group->GetPlacementGroupID());
const auto &bundles =
@ -401,17 +385,16 @@ TEST_F(GcsPlacementGroupManagerTest, TestAutomaticCleanupWhenActorDeadAndJobDead
/* cpu_num */ 1.0,
/* job_id */ job_id,
/* actor_id */ actor_id);
std::atomic<int> finished_placement_group_count(0);
gcs_placement_group_manager_->RegisterPlacementGroup(
std::make_shared<gcs::GcsPlacementGroup>(request),
[&finished_placement_group_count](Status status) {
++finished_placement_group_count;
});
std::atomic<int> registered_placement_group_count(0);
RegisterPlacementGroup(request, [&registered_placement_group_count](Status status) {
++registered_placement_group_count;
});
ASSERT_EQ(registered_placement_group_count, 1);
WaitForExpectedPgCount(1);
auto placement_group = mock_placement_group_scheduler_->placement_groups_.back();
auto placement_group_id = placement_group->GetPlacementGroupID();
gcs_placement_group_manager_->OnPlacementGroupCreationSuccess(placement_group);
WaitForExpectedCount(finished_placement_group_count, 1);
OnPlacementGroupCreationSuccess(placement_group);
ASSERT_EQ(placement_group->GetState(), rpc::PlacementGroupTableData::CREATED);
// When both job and actor is dead, placement group should be destroyed.
EXPECT_CALL(*mock_placement_group_scheduler_,
DestroyPlacementGroupBundleResourcesIfExists(placement_group_id))
@ -435,17 +418,16 @@ TEST_F(GcsPlacementGroupManagerTest, TestAutomaticCleanupWhenActorAndJobDead) {
/* cpu_num */ 1.0,
/* job_id */ job_id,
/* actor_id */ actor_id);
std::atomic<int> finished_placement_group_count(0);
gcs_placement_group_manager_->RegisterPlacementGroup(
std::make_shared<gcs::GcsPlacementGroup>(request),
[&finished_placement_group_count](Status status) {
++finished_placement_group_count;
});
std::atomic<int> registered_placement_group_count(0);
RegisterPlacementGroup(request, [&registered_placement_group_count](Status status) {
++registered_placement_group_count;
});
ASSERT_EQ(registered_placement_group_count, 1);
WaitForExpectedPgCount(1);
auto placement_group = mock_placement_group_scheduler_->placement_groups_.back();
auto placement_group_id = placement_group->GetPlacementGroupID();
gcs_placement_group_manager_->OnPlacementGroupCreationSuccess(placement_group);
WaitForExpectedCount(finished_placement_group_count, 1);
OnPlacementGroupCreationSuccess(placement_group);
ASSERT_EQ(placement_group->GetState(), rpc::PlacementGroupTableData::CREATED);
EXPECT_CALL(*mock_placement_group_scheduler_,
DestroyPlacementGroupBundleResourcesIfExists(placement_group_id))
.Times(0);
@ -469,17 +451,16 @@ TEST_F(GcsPlacementGroupManagerTest, TestAutomaticCleanupWhenOnlyJobDead) {
/* cpu_num */ 1.0,
/* job_id */ job_id,
/* actor_id */ ActorID::Nil());
std::atomic<int> finished_placement_group_count(0);
gcs_placement_group_manager_->RegisterPlacementGroup(
std::make_shared<gcs::GcsPlacementGroup>(request),
[&finished_placement_group_count](Status status) {
++finished_placement_group_count;
});
std::atomic<int> registered_placement_group_count(0);
RegisterPlacementGroup(request, [&registered_placement_group_count](Status status) {
++registered_placement_group_count;
});
ASSERT_EQ(registered_placement_group_count, 1);
WaitForExpectedPgCount(1);
auto placement_group = mock_placement_group_scheduler_->placement_groups_.back();
auto placement_group_id = placement_group->GetPlacementGroupID();
gcs_placement_group_manager_->OnPlacementGroupCreationSuccess(placement_group);
WaitForExpectedCount(finished_placement_group_count, 1);
OnPlacementGroupCreationSuccess(placement_group);
ASSERT_EQ(placement_group->GetState(), rpc::PlacementGroupTableData::CREATED);
EXPECT_CALL(*mock_placement_group_scheduler_,
DestroyPlacementGroupBundleResourcesIfExists(placement_group_id))
.Times(1);
@ -500,17 +481,16 @@ TEST_F(GcsPlacementGroupManagerTest,
/* cpu_num */ 1.0,
/* job_id */ job_id,
/* actor_id */ ActorID::Nil());
std::atomic<int> finished_placement_group_count(0);
gcs_placement_group_manager_->RegisterPlacementGroup(
std::make_shared<gcs::GcsPlacementGroup>(request),
[&finished_placement_group_count](Status status) {
++finished_placement_group_count;
});
std::atomic<int> registered_placement_group_count(0);
RegisterPlacementGroup(request, [&registered_placement_group_count](Status status) {
++registered_placement_group_count;
});
ASSERT_EQ(registered_placement_group_count, 1);
WaitForExpectedPgCount(1);
auto placement_group = mock_placement_group_scheduler_->placement_groups_.back();
auto placement_group_id = placement_group->GetPlacementGroupID();
gcs_placement_group_manager_->OnPlacementGroupCreationSuccess(placement_group);
WaitForExpectedCount(finished_placement_group_count, 1);
OnPlacementGroupCreationSuccess(placement_group);
ASSERT_EQ(placement_group->GetState(), rpc::PlacementGroupTableData::CREATED);
// This shouldn't have been called.
EXPECT_CALL(*mock_placement_group_scheduler_,
DestroyPlacementGroupBundleResourcesIfExists(placement_group_id))