From 911b028c546b79bbbb6d8c22d2d444833be9546d Mon Sep 17 00:00:00 2001 From: "DK.Pino" Date: Wed, 24 Feb 2021 08:11:43 +0800 Subject: [PATCH] [Placement Group] Make the creation of placement group sync (#13858) * make pg creation sync * return successful immediately when pg registeration * hold on * fix ut * make collection for callback * make pg registration vector * fix new cpp ut * fix named py ut * fix python ut bug * fix python ut * fix lint * modify comment * fix comment * fix comment * add new ut and fix old lint issue * fix comment * update comment * fix conflict --- python/ray/tests/test_placement_group.py | 49 +++- src/ray/core_worker/core_worker.cc | 20 +- src/ray/gcs/accessor.h | 3 +- .../gcs/gcs_client/service_based_accessor.cc | 10 +- .../gcs/gcs_client/service_based_accessor.h | 3 +- .../gcs_server/gcs_placement_group_manager.cc | 96 ++++--- .../gcs_server/gcs_placement_group_manager.h | 16 +- .../test/gcs_placement_group_manager_test.cc | 254 ++++++++---------- 8 files changed, 254 insertions(+), 197 deletions(-) diff --git a/python/ray/tests/test_placement_group.py b/python/ray/tests/test_placement_group.py index 00cc622df..bd64c37b0 100644 --- a/python/ray/tests/test_placement_group.py +++ b/python/ray/tests/test_placement_group.py @@ -13,6 +13,7 @@ from ray.test_utils import (generate_system_config_map, get_other_nodes, run_string_as_driver, wait_for_condition, get_error_message) import ray.cluster_utils +from ray.exceptions import RaySystemError from ray._raylet import PlacementGroupID from ray.util.placement_group import (PlacementGroup, placement_group, remove_placement_group, @@ -1481,13 +1482,17 @@ ray.shutdown() ray.get(actor.ping.remote()) # Create another placement group and make sure its creation will failed. 
- same_name_pg = ray.util.placement_group( - [{ - "CPU": 1 - } for _ in range(2)], - strategy="STRICT_SPREAD", - name=global_placement_group_name) - assert not same_name_pg.wait(10) + error_creation_count = 0 + try: + ray.util.placement_group( + [{ + "CPU": 1 + } for _ in range(2)], + strategy="STRICT_SPREAD", + name=global_placement_group_name) + except RaySystemError: + error_creation_count += 1 + assert error_creation_count == 1 # Remove a named placement group and make sure the second creation # will successful. @@ -1510,5 +1515,35 @@ ray.shutdown() assert error_count == 1 +def test_placement_group_synchronous_registration(ray_start_cluster): + cluster = ray_start_cluster + # One node which only has one CPU. + cluster.add_node(num_cpus=1) + cluster.wait_for_nodes() + ray.init(address=cluster.address) + + # Create a placement group that has two bundles and `STRICT_PACK` strategy, + # so, its registration will successful but scheduling failed. + placement_group = ray.util.placement_group( + name="name", + strategy="STRICT_PACK", + bundles=[{ + "CPU": 1, + }, { + "CPU": 1 + }]) + # Make sure we can properly remove it immediately + # as its registration is synchronous. 
+ ray.util.remove_placement_group(placement_group) + + def is_placement_group_removed(): + table = ray.util.placement_group_table(placement_group) + if "state" not in table: + return False + return table["state"] == "REMOVED" + + wait_for_condition(is_placement_group_removed) + + if __name__ == "__main__": sys.exit(pytest.main(["-v", __file__])) diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index 5770c3c5f..37bb12be7 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -1540,6 +1540,8 @@ Status CoreWorker::CreateActor(const RayFunction &function, Status CoreWorker::CreatePlacementGroup( const PlacementGroupCreationOptions &placement_group_creation_options, PlacementGroupID *return_placement_group_id) { + std::shared_ptr<std::promise<Status>> status_promise = + std::make_shared<std::promise<Status>>(); const PlacementGroupID placement_group_id = PlacementGroupID::FromRandom(); PlacementGroupSpecBuilder builder; builder.SetPlacementGroupSpec( @@ -1550,9 +1552,21 @@ Status CoreWorker::CreatePlacementGroup( PlacementGroupSpecification placement_group_spec = builder.Build(); *return_placement_group_id = placement_group_id; RAY_LOG(INFO) << "Submitting Placement Group creation to GCS: " << placement_group_id; - RAY_CHECK_OK( - gcs_client_->PlacementGroups().AsyncCreatePlacementGroup(placement_group_spec)); - return Status::OK(); + RAY_UNUSED(gcs_client_->PlacementGroups().AsyncCreatePlacementGroup( + placement_group_spec, + [status_promise](const Status &status) { status_promise->set_value(status); })); + auto status_future = status_promise->get_future(); + if (status_future.wait_for(std::chrono::seconds( + RayConfig::instance().gcs_server_request_timeout_seconds())) != + std::future_status::ready) { + std::ostringstream stream; + stream << "There was timeout in creating the placement group of id " + << placement_group_id + << ". 
It is probably " + "because GCS server is dead or there's a high load there."; + return Status::TimedOut(stream.str()); + } + return status_future.get(); } Status CoreWorker::RemovePlacementGroup(const PlacementGroupID &placement_group_id) { diff --git a/src/ray/gcs/accessor.h b/src/ray/gcs/accessor.h index db2aa41f9..092241402 100644 --- a/src/ray/gcs/accessor.h +++ b/src/ray/gcs/accessor.h @@ -715,7 +715,8 @@ class PlacementGroupInfoAccessor { /// written to GCS. /// \return Status. virtual Status AsyncCreatePlacementGroup( - const PlacementGroupSpecification &placement_group_spec) = 0; + const PlacementGroupSpecification &placement_group_spec, + const StatusCallback &callback) = 0; /// Get a placement group data from GCS asynchronously by id. /// diff --git a/src/ray/gcs/gcs_client/service_based_accessor.cc b/src/ray/gcs/gcs_client/service_based_accessor.cc index e1b5713eb..e0e749d59 100644 --- a/src/ray/gcs/gcs_client/service_based_accessor.cc +++ b/src/ray/gcs/gcs_client/service_based_accessor.cc @@ -1394,12 +1394,13 @@ ServiceBasedPlacementGroupInfoAccessor::ServiceBasedPlacementGroupInfoAccessor( : client_impl_(client_impl) {} Status ServiceBasedPlacementGroupInfoAccessor::AsyncCreatePlacementGroup( - const ray::PlacementGroupSpecification &placement_group_spec) { + const ray::PlacementGroupSpecification &placement_group_spec, + const StatusCallback &callback) { rpc::CreatePlacementGroupRequest request; request.mutable_placement_group_spec()->CopyFrom(placement_group_spec.GetMessage()); client_impl_->GetGcsRpcClient().CreatePlacementGroup( - request, [placement_group_spec](const Status &, - const rpc::CreatePlacementGroupReply &reply) { + request, [placement_group_spec, callback]( + const Status &, const rpc::CreatePlacementGroupReply &reply) { auto status = reply.status().code() == (int)StatusCode::OK ? 
Status() @@ -1412,6 +1413,9 @@ Status ServiceBasedPlacementGroupInfoAccessor::AsyncCreatePlacementGroup( << placement_group_spec.PlacementGroupId() << " failed to be registered. " << status; } + if (callback) { + callback(status); + } }); return Status::OK(); } diff --git a/src/ray/gcs/gcs_client/service_based_accessor.h b/src/ray/gcs/gcs_client/service_based_accessor.h index e2d084d31..3cbf872e2 100644 --- a/src/ray/gcs/gcs_client/service_based_accessor.h +++ b/src/ray/gcs/gcs_client/service_based_accessor.h @@ -440,7 +440,8 @@ class ServiceBasedPlacementGroupInfoAccessor : public PlacementGroupInfoAccessor virtual ~ServiceBasedPlacementGroupInfoAccessor() = default; Status AsyncCreatePlacementGroup( - const PlacementGroupSpecification &placement_group_spec) override; + const PlacementGroupSpecification &placement_group_spec, + const StatusCallback &callback) override; Status AsyncRemovePlacementGroup(const PlacementGroupID &placement_group_id, const StatusCallback &callback) override; diff --git a/src/ray/gcs/gcs_server/gcs_placement_group_manager.cc b/src/ray/gcs/gcs_server/gcs_placement_group_manager.cc index 12260d867..e9e368791 100644 --- a/src/ray/gcs/gcs_server/gcs_placement_group_manager.cc +++ b/src/ray/gcs/gcs_server/gcs_placement_group_manager.cc @@ -131,12 +131,12 @@ void GcsPlacementGroupManager::RegisterPlacementGroup( auto iter = registered_placement_groups_.find(placement_group_id); if (iter != registered_placement_groups_.end()) { auto pending_register_iter = - placement_group_to_register_callback_.find(placement_group_id); - if (pending_register_iter != placement_group_to_register_callback_.end()) { + placement_group_to_register_callbacks_.find(placement_group_id); + if (pending_register_iter != placement_group_to_register_callbacks_.end()) { // 1. The GCS client sends the `RegisterPlacementGroup` request to the GCS server. // 2. The GCS client receives some network errors. // 3. 
The GCS client resends the `RegisterPlacementGroup` request to the GCS server. - pending_register_iter->second = std::move(callback); + pending_register_iter->second.emplace_back(std::move(callback)); } else { // 1. The GCS client sends the `RegisterPlacementGroup` request to the GCS server. // 2. The GCS server flushes the placement group to the storage and restarts before @@ -164,10 +164,8 @@ void GcsPlacementGroupManager::RegisterPlacementGroup( } } - // Mark the callback as pending and invoke it after the placement_group has been - // successfully created. - placement_group_to_register_callback_[placement_group->GetPlacementGroupID()] = - std::move(callback); + placement_group_to_register_callbacks_[placement_group->GetPlacementGroupID()] + .emplace_back(std::move(callback)); registered_placement_groups_.emplace(placement_group->GetPlacementGroupID(), placement_group); pending_placement_groups_.emplace_back(placement_group); @@ -175,18 +173,28 @@ void GcsPlacementGroupManager::RegisterPlacementGroup( RAY_CHECK_OK(gcs_table_storage_->PlacementGroupTable().Put( placement_group_id, placement_group->GetPlacementGroupTableData(), [this, placement_group_id, placement_group](Status status) { + // The backend storage is supposed to be reliable, so the status must be ok. 
RAY_CHECK_OK(status); - if (!registered_placement_groups_.contains(placement_group_id)) { - auto iter = placement_group_to_register_callback_.find(placement_group_id); - if (iter != placement_group_to_register_callback_.end()) { - std::stringstream stream; - stream << "Placement group of id " << placement_group_id - << " has been removed before registration."; - iter->second(Status::NotFound(stream.str())); - placement_group_to_register_callback_.erase(iter); + if (registered_placement_groups_.contains(placement_group_id)) { + auto iter = placement_group_to_register_callbacks_.find(placement_group_id); + auto callbacks = std::move(iter->second); + placement_group_to_register_callbacks_.erase(iter); + for (const auto &callback : callbacks) { + callback(status); } - } else { SchedulePendingPlacementGroups(); + } else { + // The placement group registration is synchronous, so if we found the placement + // group was deleted here, it must be triggered by the abnormal exit of job, + // we will return directly in this case. + RAY_CHECK(placement_group_to_register_callbacks_.count(placement_group_id) == 0) + << "The placement group has been removed unexpectedly with an unknown " + "error. Please file a bug report on here: " + "https://github.com/ray-project/ray/issues"; + RAY_LOG(WARNING) << "Failed to create placement group '" + << placement_group->GetPlacementGroupID() + << "', because the placement group has been removed by GCS."; + return; } })); } @@ -236,13 +244,6 @@ void GcsPlacementGroupManager::OnPlacementGroupCreationSuccess( [this, placement_group_id](Status status) { RAY_CHECK_OK(status); - // Invoke callback for registration request of this placement_group - // and remove it from placement_group_to_register_callback_. 
- auto iter = placement_group_to_register_callback_.find(placement_group_id); - if (iter != placement_group_to_register_callback_.end()) { - iter->second(Status::OK()); - placement_group_to_register_callback_.erase(iter); - } MarkSchedulingDone(); SchedulePendingPlacementGroups(); @@ -331,7 +332,7 @@ void GcsPlacementGroupManager::RemovePlacementGroup( } auto placement_group = std::move(placement_group_it->second); registered_placement_groups_.erase(placement_group_it); - placement_group_to_create_callbacks_.erase(placement_group_id); + placement_group_to_register_callbacks_.erase(placement_group_id); // Remove placement group from `named_placement_groups_` if its name is not empty. if (!placement_group->GetName().empty()) { @@ -369,13 +370,15 @@ void GcsPlacementGroupManager::RemovePlacementGroup( placement_group->GetPlacementGroupTableData(), [this, on_placement_group_removed, placement_group_id](Status status) { RAY_CHECK_OK(status); - // If placement group hasn't been created yet, send a response to a core worker - // that the creation of placement group has failed. - auto it = placement_group_to_register_callback_.find(placement_group_id); - if (it != placement_group_to_register_callback_.end()) { - it->second( - Status::NotFound("Placement group is removed before it is created.")); - placement_group_to_register_callback_.erase(it); + // If there is a driver waiting for the creation done, then send a message that + // the placement group has been removed. 
+ auto it = placement_group_to_create_callbacks_.find(placement_group_id); + if (it != placement_group_to_create_callbacks_.end()) { + for (auto &callback : it->second) { + callback( + Status::NotFound("Placement group is removed before it is created.")); + } + placement_group_to_create_callbacks_.erase(it); } on_placement_group_removed(status); })); @@ -461,26 +464,37 @@ void GcsPlacementGroupManager::HandleWaitPlacementGroupUntilReady( RAY_LOG(DEBUG) << "Waiting for placement group until ready, placement group id = " << placement_group_id; - auto callback = [placement_group_id, reply, send_reply_callback](const Status &status) { - RAY_LOG(DEBUG) - << "Finished waiting for placement group until ready, placement group id = " - << placement_group_id; + WaitPlacementGroup(placement_group_id, [reply, send_reply_callback, + placement_group_id](Status status) { + if (status.ok()) { + RAY_LOG(DEBUG) + << "Finished waiting for placement group until ready, placement group id = " + << placement_group_id; + } else { + RAY_LOG(WARNING) + << "Failed to waiting for placement group until ready, placement group id = " + << placement_group_id << ", cause: " << status.message(); + } GCS_RPC_SEND_REPLY(send_reply_callback, reply, status); - }; + }); + ++counts_[CountType::WAIT_PLACEMENT_GROUP_UNTIL_READY_REQUEST]; +} + +void GcsPlacementGroupManager::WaitPlacementGroup( + const PlacementGroupID &placement_group_id, StatusCallback callback) { // If the placement group does not exist or it has been successfully created, return // directly. const auto &iter = registered_placement_groups_.find(placement_group_id); if (iter == registered_placement_groups_.end()) { // Check whether the placement group does not exist or is removed. 
- auto on_done = [this, placement_group_id, reply, callback, send_reply_callback]( + auto on_done = [this, placement_group_id, callback]( const Status &status, const boost::optional &result) { if (result) { RAY_LOG(DEBUG) << "Placement group is removed, placement group id = " << placement_group_id; - GCS_RPC_SEND_REPLY(send_reply_callback, reply, - Status::NotFound("Placement group is removed.")); + callback(Status::NotFound("Placement group is removed.")); } else { // `wait` is a method of placement group object. Placement group object is // obtained by create placement group api, so it can guarantee the existence of @@ -501,13 +515,11 @@ void GcsPlacementGroupManager::HandleWaitPlacementGroupUntilReady( } else if (iter->second->GetState() == rpc::PlacementGroupTableData::CREATED) { RAY_LOG(DEBUG) << "Placement group is created, placement group id = " << placement_group_id; - GCS_RPC_SEND_REPLY(send_reply_callback, reply, Status::OK()); + callback(Status::OK()); } else { placement_group_to_create_callbacks_[placement_group_id].emplace_back( std::move(callback)); } - - ++counts_[CountType::WAIT_PLACEMENT_GROUP_UNTIL_READY_REQUEST]; } void GcsPlacementGroupManager::RetryCreatingPlacementGroup() { diff --git a/src/ray/gcs/gcs_server/gcs_placement_group_manager.h b/src/ray/gcs/gcs_server/gcs_placement_group_manager.h index 49a7634df..91ba14c51 100644 --- a/src/ray/gcs/gcs_server/gcs_placement_group_manager.h +++ b/src/ray/gcs/gcs_server/gcs_placement_group_manager.h @@ -167,6 +167,14 @@ class GcsPlacementGroupManager : public rpc::PlacementGroupInfoHandler { rpc::WaitPlacementGroupUntilReadyReply *reply, rpc::SendReplyCallback send_reply_callback) override; + /// Register a callback which will be invoked after successfully created. + /// + /// \param placement_group_id The placement group id which we want to listen. 
+ /// \param callback Will be invoked after the placement group is created successfully or + /// be invoked if the placement group is deleted before create successfully. + void WaitPlacementGroup(const PlacementGroupID &placement_group_id, + StatusCallback callback); + /// Register placement_group asynchronously. /// /// \param placement_group The placement group to be created. @@ -284,9 +292,11 @@ class GcsPlacementGroupManager : public rpc::PlacementGroupInfoHandler { /// execute_after). boost::asio::io_context &io_context_; - /// Callback of placement_group registration requests that are not yet flushed. - absl::flat_hash_map<PlacementGroupID, StatusCallback> - placement_group_to_register_callback_; + /// Callbacks of pending `RegisterPlacementGroup` requests. + /// Maps placement group ID to placement group registration callbacks, which is used to + /// filter duplicated messages from a driver/worker caused by some network problems. + absl::flat_hash_map<PlacementGroupID, std::vector<StatusCallback>> + placement_group_to_register_callbacks_; /// Callback of `WaitPlacementGroupUntilReady` requests. absl::flat_hash_map<PlacementGroupID, std::vector<StatusCallback>> diff --git a/src/ray/gcs/gcs_server/test/gcs_placement_group_manager_test.cc b/src/ray/gcs/gcs_server/test/gcs_placement_group_manager_test.cc index 77784e44b..262d1f933 100644 --- a/src/ray/gcs/gcs_server/test/gcs_placement_group_manager_test.cc +++ b/src/ray/gcs/gcs_server/test/gcs_placement_group_manager_test.cc @@ -21,6 +21,7 @@ namespace ray { using ::testing::_; +using StatusCallback = std::function<void(Status status)>; class MockPlacementGroupScheduler : public gcs::GcsPlacementGroupSchedulerInterface { public: @@ -89,6 +90,34 @@ class GcsPlacementGroupManagerTest : public ::testing::Test { thread_io_service_->join(); } + // Make placement group registration sync. 
+ void RegisterPlacementGroup(const ray::rpc::CreatePlacementGroupRequest &request, + StatusCallback callback) { + std::promise<void> promise; + gcs_placement_group_manager_->RegisterPlacementGroup( + std::make_shared<gcs::GcsPlacementGroup>(request), + [&callback, &promise](Status status) { + RAY_CHECK_OK(status); + callback(status); + promise.set_value(); + }); + promise.get_future().get(); + } + + // We need this to ensure that `MarkSchedulingDone` and `SchedulePendingPlacementGroups` + // was already invoked when we have invoked `OnPlacementGroupCreationSuccess`. + void OnPlacementGroupCreationSuccess( + const std::shared_ptr<gcs::GcsPlacementGroup> &placement_group) { + std::promise<void> promise; + gcs_placement_group_manager_->WaitPlacementGroup( + placement_group->GetPlacementGroupID(), [&promise](Status status) { + RAY_CHECK_OK(status); + promise.set_value(); + }); + gcs_placement_group_manager_->OnPlacementGroupCreationSuccess(placement_group); + promise.get_future().get(); + } + void WaitForExpectedPgCount(int expected_count) { auto condition = [this, expected_count]() { return mock_placement_group_scheduler_->GetPlacementGroupCount() == expected_count; @@ -110,32 +139,28 @@ class GcsPlacementGroupManagerTest : public ::testing::Test { TEST_F(GcsPlacementGroupManagerTest, TestBasic) { auto request = Mocker::GenCreatePlacementGroupRequest(); - std::atomic<int> finished_placement_group_count(0); - gcs_placement_group_manager_->RegisterPlacementGroup( - std::make_shared<gcs::GcsPlacementGroup>(request), - [&finished_placement_group_count](const Status &status) { - ++finished_placement_group_count; - }); - ASSERT_EQ(finished_placement_group_count, 0); + std::atomic<int> registered_placement_group_count(0); + RegisterPlacementGroup(request, + [&registered_placement_group_count](const Status &status) { + ++registered_placement_group_count; + }); + ASSERT_EQ(registered_placement_group_count, 1); WaitForExpectedPgCount(1); auto placement_group = mock_placement_group_scheduler_->placement_groups_.back(); 
mock_placement_group_scheduler_->placement_groups_.pop_back(); - - gcs_placement_group_manager_->OnPlacementGroupCreationSuccess(placement_group); - WaitForExpectedCount(finished_placement_group_count, 1); + OnPlacementGroupCreationSuccess(placement_group); ASSERT_EQ(placement_group->GetState(), rpc::PlacementGroupTableData::CREATED); } TEST_F(GcsPlacementGroupManagerTest, TestSchedulingFailed) { auto request = Mocker::GenCreatePlacementGroupRequest(); - std::atomic<int> finished_placement_group_count(0); - gcs_placement_group_manager_->RegisterPlacementGroup( - std::make_shared<gcs::GcsPlacementGroup>(request), - [&finished_placement_group_count](const Status &status) { - ++finished_placement_group_count; - }); + std::atomic<int> registered_placement_group_count(0); + RegisterPlacementGroup(request, + [&registered_placement_group_count](const Status &status) { + ++registered_placement_group_count; + }); - ASSERT_EQ(finished_placement_group_count, 0); + ASSERT_EQ(registered_placement_group_count, 1); WaitForExpectedPgCount(1); auto placement_group = mock_placement_group_scheduler_->placement_groups_.back(); mock_placement_group_scheduler_->placement_groups_.clear(); @@ -144,30 +169,25 @@ TEST_F(GcsPlacementGroupManagerTest, TestSchedulingFailed) { gcs_placement_group_manager_->SchedulePendingPlacementGroups(); ASSERT_EQ(mock_placement_group_scheduler_->placement_groups_.size(), 1); mock_placement_group_scheduler_->placement_groups_.clear(); - ASSERT_EQ(finished_placement_group_count, 0); // Check that the placement_group is in state `CREATED`. 
- gcs_placement_group_manager_->OnPlacementGroupCreationSuccess(placement_group); - WaitForExpectedCount(finished_placement_group_count, 1); + OnPlacementGroupCreationSuccess(placement_group); ASSERT_EQ(placement_group->GetState(), rpc::PlacementGroupTableData::CREATED); } TEST_F(GcsPlacementGroupManagerTest, TestGetPlacementGroupIDByName) { auto request = Mocker::GenCreatePlacementGroupRequest("test_name"); - std::atomic<int> finished_placement_group_count(0); - gcs_placement_group_manager_->RegisterPlacementGroup( - std::make_shared<gcs::GcsPlacementGroup>(request), - [&finished_placement_group_count](const Status &status) { - ++finished_placement_group_count; - }); + std::atomic<int> registered_placement_group_count(0); + RegisterPlacementGroup(request, [&registered_placement_group_count](Status status) { + ++registered_placement_group_count; + }); - ASSERT_EQ(finished_placement_group_count, 0); + ASSERT_EQ(registered_placement_group_count, 1); WaitForExpectedPgCount(1); auto placement_group = mock_placement_group_scheduler_->placement_groups_.back(); mock_placement_group_scheduler_->placement_groups_.pop_back(); - gcs_placement_group_manager_->OnPlacementGroupCreationSuccess(placement_group); - WaitForExpectedCount(finished_placement_group_count, 1); + OnPlacementGroupCreationSuccess(placement_group); ASSERT_EQ(placement_group->GetState(), rpc::PlacementGroupTableData::CREATED); ASSERT_EQ( gcs_placement_group_manager_->GetPlacementGroupIDByName("test_name"), @@ -176,20 +196,18 @@ TEST_F(GcsPlacementGroupManagerTest, TestGetPlacementGroupIDByName) { TEST_F(GcsPlacementGroupManagerTest, TestRemoveNamedPlacementGroup) { auto request = Mocker::GenCreatePlacementGroupRequest("test_name"); - std::atomic<int> finished_placement_group_count(0); - gcs_placement_group_manager_->RegisterPlacementGroup( - std::make_shared<gcs::GcsPlacementGroup>(request), - [&finished_placement_group_count](const Status &status) { - ++finished_placement_group_count; - }); + std::atomic<int> registered_placement_group_count(0); + 
RegisterPlacementGroup(request, + [&registered_placement_group_count](const Status &status) { + ++registered_placement_group_count; + }); - ASSERT_EQ(finished_placement_group_count, 0); + ASSERT_EQ(registered_placement_group_count, 1); WaitForExpectedPgCount(1); auto placement_group = mock_placement_group_scheduler_->placement_groups_.back(); mock_placement_group_scheduler_->placement_groups_.pop_back(); - gcs_placement_group_manager_->OnPlacementGroupCreationSuccess(placement_group); - WaitForExpectedCount(finished_placement_group_count, 1); + OnPlacementGroupCreationSuccess(placement_group); ASSERT_EQ(placement_group->GetState(), rpc::PlacementGroupTableData::CREATED); // Remove the named placement group. gcs_placement_group_manager_->RemovePlacementGroup( @@ -201,13 +219,11 @@ TEST_F(GcsPlacementGroupManagerTest, TestRemoveNamedPlacementGroup) { TEST_F(GcsPlacementGroupManagerTest, TestRescheduleWhenNodeAdd) { auto request = Mocker::GenCreatePlacementGroupRequest(); - std::atomic<int> finished_placement_group_count(0); - gcs_placement_group_manager_->RegisterPlacementGroup( - std::make_shared<gcs::GcsPlacementGroup>(request), - [&finished_placement_group_count](const Status &status) { - ++finished_placement_group_count; - }); - ASSERT_EQ(finished_placement_group_count, 0); + std::atomic<int> registered_placement_group_count(0); + RegisterPlacementGroup(request, [&registered_placement_group_count](Status status) { + ++registered_placement_group_count; + }); + ASSERT_EQ(registered_placement_group_count, 1); WaitForExpectedPgCount(1); auto placement_group = mock_placement_group_scheduler_->placement_groups_.back(); mock_placement_group_scheduler_->placement_groups_.pop_back(); @@ -219,22 +235,12 @@ TEST_F(GcsPlacementGroupManagerTest, TestRescheduleWhenNodeAdd) { TEST_F(GcsPlacementGroupManagerTest, TestRemovingPendingPlacementGroup) { auto request = Mocker::GenCreatePlacementGroupRequest(); - std::atomic<int> finished_placement_group_count(0); - std::atomic<int> failed_placement_group_count(0); - 
gcs_placement_group_manager_->RegisterPlacementGroup( - std::make_shared<gcs::GcsPlacementGroup>(request), - [&finished_placement_group_count, - &failed_placement_group_count](const Status &status) { - if (status.ok()) { - ++finished_placement_group_count; - } else { - ++failed_placement_group_count; - } - }); - + std::atomic<int> registered_placement_group_count(0); + RegisterPlacementGroup(request, [&registered_placement_group_count](Status status) { + ++registered_placement_group_count; + }); + ASSERT_EQ(registered_placement_group_count, 1); WaitForExpectedPgCount(1); - ASSERT_EQ(finished_placement_group_count, 0); - ASSERT_EQ(failed_placement_group_count, 0); auto placement_group = mock_placement_group_scheduler_->placement_groups_.back(); mock_placement_group_scheduler_->placement_groups_.clear(); @@ -249,8 +255,6 @@ TEST_F(GcsPlacementGroupManagerTest, TestRemovingPendingPlacementGroup) { gcs_placement_group_manager_->SchedulePendingPlacementGroups(); ASSERT_EQ(mock_placement_group_scheduler_->placement_groups_.size(), 0); mock_placement_group_scheduler_->placement_groups_.clear(); - WaitForExpectedCount(finished_placement_group_count, 0); - WaitForExpectedCount(failed_placement_group_count, 1); // Make sure we can re-remove again. 
gcs_placement_group_manager_->RemovePlacementGroup( @@ -259,22 +263,12 @@ TEST_F(GcsPlacementGroupManagerTest, TestRemovingPendingPlacementGroup) { TEST_F(GcsPlacementGroupManagerTest, TestRemovingLeasingPlacementGroup) { auto request = Mocker::GenCreatePlacementGroupRequest(); - std::atomic<int> finished_placement_group_count(0); - std::atomic<int> failed_placement_group_count(0); - gcs_placement_group_manager_->RegisterPlacementGroup( - std::make_shared<gcs::GcsPlacementGroup>(request), - [&finished_placement_group_count, - &failed_placement_group_count](const Status &status) { - if (status.ok()) { - ++finished_placement_group_count; - } else { - ++failed_placement_group_count; - } - }); - + std::atomic<int> registered_placement_group_count(0); + RegisterPlacementGroup(request, [&registered_placement_group_count](Status status) { + ++registered_placement_group_count; + }); + ASSERT_EQ(registered_placement_group_count, 1); WaitForExpectedPgCount(1); - ASSERT_EQ(finished_placement_group_count, 0); - ASSERT_EQ(failed_placement_group_count, 0); auto placement_group = mock_placement_group_scheduler_->placement_groups_.back(); mock_placement_group_scheduler_->placement_groups_.clear(); ASSERT_EQ(placement_group->GetState(), rpc::PlacementGroupTableData::PENDING); @@ -292,8 +286,6 @@ TEST_F(GcsPlacementGroupManagerTest, TestRemovingLeasingPlacementGroup) { gcs_placement_group_manager_->SchedulePendingPlacementGroups(); ASSERT_EQ(mock_placement_group_scheduler_->placement_groups_.size(), 0); mock_placement_group_scheduler_->placement_groups_.clear(); - WaitForExpectedCount(finished_placement_group_count, 0); - WaitForExpectedCount(failed_placement_group_count, 1); // Make sure we can re-remove again. 
gcs_placement_group_manager_->RemovePlacementGroup( @@ -302,20 +294,17 @@ TEST_F(GcsPlacementGroupManagerTest, TestRemovingLeasingPlacementGroup) { TEST_F(GcsPlacementGroupManagerTest, TestRemovingCreatedPlacementGroup) { auto request = Mocker::GenCreatePlacementGroupRequest(); - std::atomic<int> finished_placement_group_count(0); - gcs_placement_group_manager_->RegisterPlacementGroup( - std::make_shared<gcs::GcsPlacementGroup>(request), - [&finished_placement_group_count](const Status &status) { - if (status.ok()) { - ++finished_placement_group_count; - } - }); + std::atomic<int> registered_placement_group_count(0); + RegisterPlacementGroup(request, [&registered_placement_group_count](Status status) { + ++registered_placement_group_count; + }); + ASSERT_EQ(registered_placement_group_count, 1); WaitForExpectedPgCount(1); auto placement_group = mock_placement_group_scheduler_->placement_groups_.back(); mock_placement_group_scheduler_->placement_groups_.pop_back(); - gcs_placement_group_manager_->OnPlacementGroupCreationSuccess(placement_group); - WaitForExpectedCount(finished_placement_group_count, 1); + // We have ensured that this operation is synchronized. + OnPlacementGroupCreationSuccess(placement_group); ASSERT_EQ(placement_group->GetState(), rpc::PlacementGroupTableData::CREATED); const auto &placement_group_id = placement_group->GetPlacementGroupID(); @@ -332,7 +321,6 @@ TEST_F(GcsPlacementGroupManagerTest, TestRemovingCreatedPlacementGroup) { gcs_placement_group_manager_->SchedulePendingPlacementGroups(); ASSERT_EQ(mock_placement_group_scheduler_->placement_groups_.size(), 0); mock_placement_group_scheduler_->placement_groups_.clear(); - ASSERT_EQ(finished_placement_group_count, 1); // Make sure we can re-remove again. 
gcs_placement_group_manager_->RemovePlacementGroup( @@ -341,19 +329,15 @@ TEST_F(GcsPlacementGroupManagerTest, TestRemovingCreatedPlacementGroup) { TEST_F(GcsPlacementGroupManagerTest, TestRescheduleWhenNodeDead) { auto request1 = Mocker::GenCreatePlacementGroupRequest(); - std::atomic<int> finished_placement_group_count(0); - gcs_placement_group_manager_->RegisterPlacementGroup( - std::make_shared<gcs::GcsPlacementGroup>(request1), - [&finished_placement_group_count](const Status &status) { - ++finished_placement_group_count; - }); + std::atomic<int> registered_placement_group_count(0); + RegisterPlacementGroup(request1, [&registered_placement_group_count](Status status) { + ++registered_placement_group_count; + }); auto request2 = Mocker::GenCreatePlacementGroupRequest(); - gcs_placement_group_manager_->RegisterPlacementGroup( - std::make_shared<gcs::GcsPlacementGroup>(request2), - [&finished_placement_group_count](const Status &status) { - ++finished_placement_group_count; - }); - ASSERT_EQ(finished_placement_group_count, 0); + RegisterPlacementGroup(request2, [&registered_placement_group_count](Status status) { + ++registered_placement_group_count; + }); + ASSERT_EQ(registered_placement_group_count, 2); WaitForExpectedPgCount(1); auto placement_group = mock_placement_group_scheduler_->placement_groups_.back(); placement_group->GetMutableBundle(0)->set_node_id(NodeID::FromRandom().Binary()); @@ -371,9 +355,9 @@ TEST_F(GcsPlacementGroupManagerTest, TestRescheduleWhenNodeDead) { // Trigger scheduling `RESCHEDULING` placement group. 
auto finished_group = std::make_shared( placement_group->GetPlacementGroupTableData()); - gcs_placement_group_manager_->OnPlacementGroupCreationSuccess(finished_group); - WaitForExpectedCount(finished_placement_group_count, 1); - ASSERT_EQ(mock_placement_group_scheduler_->placement_groups_.size(), 1); + OnPlacementGroupCreationSuccess(finished_group); + ASSERT_EQ(finished_group->GetState(), rpc::PlacementGroupTableData::CREATED); + WaitForExpectedPgCount(1); ASSERT_EQ(mock_placement_group_scheduler_->placement_groups_[0]->GetPlacementGroupID(), placement_group->GetPlacementGroupID()); const auto &bundles = @@ -401,17 +385,16 @@ TEST_F(GcsPlacementGroupManagerTest, TestAutomaticCleanupWhenActorDeadAndJobDead /* cpu_num */ 1.0, /* job_id */ job_id, /* actor_id */ actor_id); - std::atomic finished_placement_group_count(0); - gcs_placement_group_manager_->RegisterPlacementGroup( - std::make_shared(request), - [&finished_placement_group_count](Status status) { - ++finished_placement_group_count; - }); + std::atomic registered_placement_group_count(0); + RegisterPlacementGroup(request, [&registered_placement_group_count](Status status) { + ++registered_placement_group_count; + }); + ASSERT_EQ(registered_placement_group_count, 1); WaitForExpectedPgCount(1); auto placement_group = mock_placement_group_scheduler_->placement_groups_.back(); auto placement_group_id = placement_group->GetPlacementGroupID(); - gcs_placement_group_manager_->OnPlacementGroupCreationSuccess(placement_group); - WaitForExpectedCount(finished_placement_group_count, 1); + OnPlacementGroupCreationSuccess(placement_group); + ASSERT_EQ(placement_group->GetState(), rpc::PlacementGroupTableData::CREATED); // When both job and actor is dead, placement group should be destroyed.
EXPECT_CALL(*mock_placement_group_scheduler_, DestroyPlacementGroupBundleResourcesIfExists(placement_group_id)) @@ -435,17 +418,16 @@ TEST_F(GcsPlacementGroupManagerTest, TestAutomaticCleanupWhenActorAndJobDead) { /* cpu_num */ 1.0, /* job_id */ job_id, /* actor_id */ actor_id); - std::atomic finished_placement_group_count(0); - gcs_placement_group_manager_->RegisterPlacementGroup( - std::make_shared(request), - [&finished_placement_group_count](Status status) { - ++finished_placement_group_count; - }); + std::atomic registered_placement_group_count(0); + RegisterPlacementGroup(request, [&registered_placement_group_count](Status status) { + ++registered_placement_group_count; + }); + ASSERT_EQ(registered_placement_group_count, 1); WaitForExpectedPgCount(1); auto placement_group = mock_placement_group_scheduler_->placement_groups_.back(); auto placement_group_id = placement_group->GetPlacementGroupID(); - gcs_placement_group_manager_->OnPlacementGroupCreationSuccess(placement_group); - WaitForExpectedCount(finished_placement_group_count, 1); + OnPlacementGroupCreationSuccess(placement_group); + ASSERT_EQ(placement_group->GetState(), rpc::PlacementGroupTableData::CREATED); EXPECT_CALL(*mock_placement_group_scheduler_, DestroyPlacementGroupBundleResourcesIfExists(placement_group_id)) .Times(0); @@ -469,17 +451,16 @@ TEST_F(GcsPlacementGroupManagerTest, TestAutomaticCleanupWhenOnlyJobDead) { /* cpu_num */ 1.0, /* job_id */ job_id, /* actor_id */ ActorID::Nil()); - std::atomic finished_placement_group_count(0); - gcs_placement_group_manager_->RegisterPlacementGroup( - std::make_shared(request), - [&finished_placement_group_count](Status status) { - ++finished_placement_group_count; - }); + std::atomic registered_placement_group_count(0); + RegisterPlacementGroup(request, [&registered_placement_group_count](Status status) { + ++registered_placement_group_count; + }); + ASSERT_EQ(registered_placement_group_count, 1); WaitForExpectedPgCount(1); auto placement_group =
mock_placement_group_scheduler_->placement_groups_.back(); auto placement_group_id = placement_group->GetPlacementGroupID(); - gcs_placement_group_manager_->OnPlacementGroupCreationSuccess(placement_group); - WaitForExpectedCount(finished_placement_group_count, 1); + OnPlacementGroupCreationSuccess(placement_group); + ASSERT_EQ(placement_group->GetState(), rpc::PlacementGroupTableData::CREATED); EXPECT_CALL(*mock_placement_group_scheduler_, DestroyPlacementGroupBundleResourcesIfExists(placement_group_id)) .Times(1); @@ -500,17 +481,16 @@ TEST_F(GcsPlacementGroupManagerTest, /* cpu_num */ 1.0, /* job_id */ job_id, /* actor_id */ ActorID::Nil()); - std::atomic finished_placement_group_count(0); - gcs_placement_group_manager_->RegisterPlacementGroup( - std::make_shared(request), - [&finished_placement_group_count](Status status) { - ++finished_placement_group_count; - }); + std::atomic registered_placement_group_count(0); + RegisterPlacementGroup(request, [&registered_placement_group_count](Status status) { + ++registered_placement_group_count; + }); + ASSERT_EQ(registered_placement_group_count, 1); WaitForExpectedPgCount(1); auto placement_group = mock_placement_group_scheduler_->placement_groups_.back(); auto placement_group_id = placement_group->GetPlacementGroupID(); - gcs_placement_group_manager_->OnPlacementGroupCreationSuccess(placement_group); - WaitForExpectedCount(finished_placement_group_count, 1); + OnPlacementGroupCreationSuccess(placement_group); + ASSERT_EQ(placement_group->GetState(), rpc::PlacementGroupTableData::CREATED); // This shouldn't have been called. EXPECT_CALL(*mock_placement_group_scheduler_, DestroyPlacementGroupBundleResourcesIfExists(placement_group_id))