Revert "Revert "[Core] [Placement Group] Fix bundle reconstruction when raylet fo after gcs fo (#19452)" (#19724)" (#19736)

This reverts commit d453afbab8.
SangBin Cho 2021-10-27 00:25:09 +09:00 committed by GitHub
parent aaef82920d
commit 00ea716ada
8 changed files with 167 additions and 5 deletions

@@ -12,9 +12,9 @@ import ray.cluster_utils
import ray._private.gcs_utils as gcs_utils
from ray._private.test_utils import (
-    generate_system_config_map, kill_actor_and_wait_for_failure,
-    run_string_as_driver, wait_for_condition, get_error_message,
-    placement_group_assert_no_leak)
+    get_other_nodes, generate_system_config_map,
+    kill_actor_and_wait_for_failure, run_string_as_driver, wait_for_condition,
+    get_error_message, placement_group_assert_no_leak)
from ray.util.placement_group import get_current_placement_group
from ray.util.client.ray_client_helpers import connect_to_client_or_not
@@ -755,5 +755,40 @@ def test_create_actor_with_placement_group_after_gcs_server_restart(
    assert ray.get(actor_2.method.remote(1)) == 3


+@pytest.mark.parametrize(
+    "ray_start_cluster_head", [
+        generate_system_config_map(
+            num_heartbeats_timeout=10, ping_gcs_rpc_server_max_retries=60)
+    ],
+    indirect=True)
+def test_bundle_recreated_when_raylet_fo_after_gcs_server_restart(
+        ray_start_cluster_head):
+    cluster = ray_start_cluster_head
+    cluster.add_node(num_cpus=2)
+    cluster.wait_for_nodes()
+
+    # Create one placement group and make sure its creation succeeds.
+    placement_group = ray.util.placement_group([{"CPU": 2}])
+    ray.get(placement_group.ready(), timeout=10)
+    table = ray.util.placement_group_table(placement_group)
+    assert table["state"] == "CREATED"
+
+    # Restart the GCS server.
+    cluster.head_node.kill_gcs_server()
+    cluster.head_node.start_gcs_server()
+
+    # Restart the raylet.
+    cluster.remove_node(get_other_nodes(cluster, exclude_head=True)[-1])
+    cluster.add_node(num_cpus=2)
+    cluster.wait_for_nodes()
+
+    # Schedule an actor and make sure its creation succeeds.
+    actor = Increase.options(
+        placement_group=placement_group,
+        placement_group_bundle_index=0).remote()
+    assert ray.get(actor.method.remote(1), timeout=5) == 3
+
+
if __name__ == "__main__":
    sys.exit(pytest.main(["-sv", __file__]))

@@ -43,6 +43,11 @@ class MockGcsPlacementGroupSchedulerInterface
      void, ReleaseUnusedBundles,
      ((const std::unordered_map<NodeID, std::vector<rpc::Bundle>> &node_to_bundles)),
      (override));
+  MOCK_METHOD(void, Initialize,
+              ((const std::unordered_map<
+                   PlacementGroupID, std::vector<std::shared_ptr<BundleSpecification>>>
+                   &group_to_bundles)),
+              (override));
};

}  // namespace gcs

@@ -735,6 +735,8 @@ void GcsPlacementGroupManager::UpdatePlacementGroupLoad() {
void GcsPlacementGroupManager::Initialize(const GcsInitData &gcs_init_data) {
  std::unordered_map<NodeID, std::vector<rpc::Bundle>> node_to_bundles;
+  std::unordered_map<PlacementGroupID, std::vector<std::shared_ptr<BundleSpecification>>>
+      group_to_bundles;
  for (auto &item : gcs_init_data.PlacementGroups()) {
    auto placement_group = std::make_shared<GcsPlacementGroup>(item.second);
    if (item.second.state() != rpc::PlacementGroupTableData::REMOVED) {
@@ -755,6 +757,9 @@ void GcsPlacementGroupManager::Initialize(const GcsInitData &gcs_init_data) {
      for (const auto &bundle : bundles) {
        if (!NodeID::FromBinary(bundle.node_id()).IsNil()) {
          node_to_bundles[NodeID::FromBinary(bundle.node_id())].emplace_back(bundle);
+          group_to_bundles[PlacementGroupID::FromBinary(
+                               bundle.bundle_id().placement_group_id())]
+              .emplace_back(std::make_shared<BundleSpecification>(bundle));
        }
      }
    }
@@ -763,6 +768,7 @@ void GcsPlacementGroupManager::Initialize(const GcsInitData &gcs_init_data) {
  // Notify raylets to release unused bundles.
  gcs_placement_group_scheduler_->ReleaseUnusedBundles(node_to_bundles);
+  gcs_placement_group_scheduler_->Initialize(group_to_bundles);

  SchedulePendingPlacementGroups();
}

@@ -503,6 +503,32 @@ void GcsPlacementGroupScheduler::ReleaseUnusedBundles(
  }
}

+void GcsPlacementGroupScheduler::Initialize(
+    const std::unordered_map<PlacementGroupID,
+                             std::vector<std::shared_ptr<BundleSpecification>>>
+        &group_to_bundles) {
+  // We need to reinitialize `committed_bundle_location_index_`; otherwise it will
+  // hold an empty bundle set when a raylet failover occurs after a GCS server restart.
+
+  // Initialize the container that maps each node to its bundles.
+  auto &alive_nodes = gcs_node_manager_.GetAllAliveNodes();
+  committed_bundle_location_index_.AddNodes(alive_nodes);
+
+  for (const auto &group : group_to_bundles) {
+    const auto &placement_group_id = group.first;
+    std::shared_ptr<BundleLocations> committed_bundle_locations =
+        std::make_shared<BundleLocations>();
+    for (const auto &bundle : group.second) {
+      if (!bundle->NodeId().IsNil()) {
+        committed_bundle_locations->emplace(bundle->BundleId(),
+                                            std::make_pair(bundle->NodeId(), bundle));
+      }
+    }
+    committed_bundle_location_index_.AddBundleLocations(placement_group_id,
+                                                        committed_bundle_locations);
+  }
+}
+
void GcsPlacementGroupScheduler::DestroyPlacementGroupPreparedBundleResources(
    const PlacementGroupID &placement_group_id) {
  // Get the locations of prepared bundles.

@@ -90,6 +90,15 @@ class GcsPlacementGroupSchedulerInterface {
  virtual void ReleaseUnusedBundles(
      const std::unordered_map<NodeID, std::vector<rpc::Bundle>> &node_to_bundles) = 0;

+  /// Initialize with the GCS tables data synchronously.
+  /// This should be called when the GCS server restarts after a failure.
+  ///
+  /// \param group_to_bundles The bundles of each placement group.
+  virtual void Initialize(
+      const std::unordered_map<PlacementGroupID,
+                               std::vector<std::shared_ptr<BundleSpecification>>>
+          &group_to_bundles) = 0;
+
  virtual ~GcsPlacementGroupSchedulerInterface() {}
};
@@ -452,6 +461,14 @@ class GcsPlacementGroupScheduler : public GcsPlacementGroupSchedulerInterface {
  void ReleaseUnusedBundles(const std::unordered_map<NodeID, std::vector<rpc::Bundle>>
                                &node_to_bundles) override;

+  /// Initialize with the GCS tables data synchronously.
+  /// This should be called when the GCS server restarts after a failure.
+  ///
+  /// \param group_to_bundles The bundles of each placement group.
+  void Initialize(const std::unordered_map<
+                  PlacementGroupID, std::vector<std::shared_ptr<BundleSpecification>>>
+                      &group_to_bundles) override;
+
 protected:
  /// Send a bundle PREPARE request to a node. The PREPARE request will lock resources
  /// on a node until COMMIT or CANCEL requests are sent to a node.

@@ -298,8 +298,6 @@ class GcsTableStorage {
    object_table_ = std::make_unique<GcsObjectTable>(store_client_);
    node_table_ = std::make_unique<GcsNodeTable>(store_client_);
    node_resource_table_ = std::make_unique<GcsNodeResourceTable>(store_client_);
-    placement_group_schedule_table_ =
-        std::make_unique<GcsPlacementGroupScheduleTable>(store_client_);
    placement_group_schedule_table_ =
        std::make_unique<GcsPlacementGroupScheduleTable>(store_client_);
    resource_usage_batch_table_ =

@@ -49,6 +49,11 @@ class MockPlacementGroupScheduler : public gcs::GcsPlacementGroupSchedulerInterface {
      ReleaseUnusedBundles,
      void(const std::unordered_map<NodeID, std::vector<rpc::Bundle>> &node_to_bundles));

+  MOCK_METHOD1(Initialize,
+               void(const std::unordered_map<
+                    PlacementGroupID, std::vector<std::shared_ptr<BundleSpecification>>>
+                    &group_to_bundles));
+
  absl::flat_hash_map<PlacementGroupID, std::vector<int64_t>> GetBundlesOnNode(
      const NodeID &node_id) override {
    absl::flat_hash_map<PlacementGroupID, std::vector<int64_t>> bundles;
@@ -130,6 +135,14 @@ class GcsPlacementGroupManagerTest : public ::testing::Test {
    promise.get_future().get();
  }

+  std::shared_ptr<GcsInitData> LoadDataFromDataStorage() {
+    auto gcs_init_data = std::make_shared<GcsInitData>(gcs_table_storage_);
+    std::promise<void> promise;
+    gcs_init_data->AsyncLoad([&promise] { promise.set_value(); });
+    promise.get_future().get();
+    return gcs_init_data;
+  }
+
  void WaitForExpectedPgCount(int expected_count) {
    auto condition = [this, expected_count]() {
      return mock_placement_group_scheduler_->GetPlacementGroupCount() == expected_count;
@@ -414,6 +427,36 @@ TEST_F(GcsPlacementGroupManagerTest, TestRescheduleWhenNodeDead) {
            placement_group->GetPlacementGroupID());
}

+TEST_F(GcsPlacementGroupManagerTest, TestSchedulerReinitializeAfterGcsRestart) {
+  // Create a placement group and make sure it has been created successfully.
+  auto request = Mocker::GenCreatePlacementGroupRequest();
+  std::atomic<int> registered_placement_group_count(0);
+  RegisterPlacementGroup(request, [&registered_placement_group_count](Status status) {
+    ++registered_placement_group_count;
+  });
+  ASSERT_EQ(registered_placement_group_count, 1);
+  WaitForExpectedPgCount(1);
+
+  auto placement_group = mock_placement_group_scheduler_->placement_groups_.back();
+  placement_group->GetMutableBundle(0)->set_node_id(NodeID::FromRandom().Binary());
+  placement_group->GetMutableBundle(1)->set_node_id(NodeID::FromRandom().Binary());
+  mock_placement_group_scheduler_->placement_groups_.pop_back();
+  OnPlacementGroupCreationSuccess(placement_group);
+  ASSERT_EQ(placement_group->GetState(), rpc::PlacementGroupTableData::CREATED);
+
+  // Reinitialize the placement group manager from the loaded GCS data and make sure
+  // the scheduler is initialized with the committed bundles.
+  auto gcs_init_data = LoadDataFromDataStorage();
+  ASSERT_EQ(1, gcs_init_data->PlacementGroups().size());
+  EXPECT_TRUE(
+      gcs_init_data->PlacementGroups().find(placement_group->GetPlacementGroupID()) !=
+      gcs_init_data->PlacementGroups().end());
+  EXPECT_CALL(*mock_placement_group_scheduler_, ReleaseUnusedBundles(_)).Times(1);
+  EXPECT_CALL(
+      *mock_placement_group_scheduler_,
+      Initialize(testing::Contains(testing::Key(placement_group->GetPlacementGroupID()))))
+      .Times(1);
+  gcs_placement_group_manager_->Initialize(*gcs_init_data);
+}
+
TEST_F(GcsPlacementGroupManagerTest, TestAutomaticCleanupWhenActorDeadAndJobDead) {
  // Test the scenario where actor dead -> job dead.
  const auto job_id = JobID::FromInt(1);

@@ -1071,6 +1071,38 @@ TEST_F(GcsPlacementGroupSchedulerTest, TestReleaseUnusedBundles) {
  ASSERT_EQ(1, raylet_clients_[0]->num_release_unused_bundles_requested);
}

+TEST_F(GcsPlacementGroupSchedulerTest, TestInitialize) {
+  auto node0 = Mocker::GenNodeInfo(0);
+  auto node1 = Mocker::GenNodeInfo(1);
+  AddNode(node0);
+  AddNode(node1);
+  ASSERT_EQ(2, gcs_node_manager_->GetAllAliveNodes().size());
+
+  auto create_placement_group_request = Mocker::GenCreatePlacementGroupRequest();
+  auto placement_group =
+      std::make_shared<gcs::GcsPlacementGroup>(create_placement_group_request, "");
+  placement_group->GetMutableBundle(0)->set_node_id(node0->node_id());
+  placement_group->GetMutableBundle(1)->set_node_id(node1->node_id());
+
+  std::unordered_map<PlacementGroupID, std::vector<std::shared_ptr<BundleSpecification>>>
+      group_to_bundles;
+  group_to_bundles[placement_group->GetPlacementGroupID()].emplace_back(
+      std::make_shared<BundleSpecification>(*placement_group->GetMutableBundle(0)));
+  group_to_bundles[placement_group->GetPlacementGroupID()].emplace_back(
+      std::make_shared<BundleSpecification>(*placement_group->GetMutableBundle(1)));
+  scheduler_->Initialize(group_to_bundles);
+
+  auto bundles = scheduler_->GetBundlesOnNode(NodeID::FromBinary(node0->node_id()));
+  ASSERT_EQ(1, bundles.size());
+  ASSERT_EQ(1, bundles[placement_group->GetPlacementGroupID()].size());
+  ASSERT_EQ(0, bundles[placement_group->GetPlacementGroupID()][0]);
+
+  bundles = scheduler_->GetBundlesOnNode(NodeID::FromBinary(node1->node_id()));
+  ASSERT_EQ(1, bundles.size());
+  ASSERT_EQ(1, bundles[placement_group->GetPlacementGroupID()].size());
+  ASSERT_EQ(1, bundles[placement_group->GetPlacementGroupID()][0]);
+}
+
} // namespace ray
int main(int argc, char **argv) {