[Placement Group] Leasing context refactoring part 2 (#10413)

* In progress.

* Refactoring done, but still failing tests.

* Fix issues.

* Addressed code review.

* Addressed code review.
This commit is contained in:
SangBin Cho 2020-08-31 15:54:34 -07:00 committed by GitHub
parent f3f698816d
commit a0c7907d88
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 162 additions and 59 deletions

View file

@ -214,43 +214,26 @@ void GcsPlacementGroupScheduler::ScheduleUnplacedBundles(
return;
}
// If schedule success, the decision will be set as schedule_map[bundles[pos]]
// else will be set ClientID::Nil().
auto bundle_locations = std::make_shared<BundleLocations>();
// To count how many scheduler have been return, which include success and failure.
auto finished_count = std::make_shared<size_t>();
RAY_CHECK(
placement_group_leasing_in_progress_.emplace(placement_group->GetPlacementGroupID())
.second);
auto lease_status_tracker =
std::make_shared<LeaseStatusTracker>(placement_group, bundles);
RAY_CHECK(placement_group_leasing_in_progress_
.emplace(placement_group->GetPlacementGroupID(), lease_status_tracker)
.second);
/// TODO(AlisaWu): Change the strategy when reserve resource failed.
for (auto &bundle : bundles) {
const auto &bundle_id = bundle->BundleId();
const auto &node_id = selected_nodes[bundle_id];
RAY_CHECK(node_to_bundles_when_leasing_[node_id].emplace(bundle_id).second);
lease_status_tracker->MarkLeaseStarted(node_id, bundle);
// TODO(sang): The callback might not be called at all if nodes are dead. We should
// handle this case properly.
ReserveResourceFromNode(
bundle, gcs_node_manager_.GetNode(node_id),
[this, bundle_id, bundle, bundles, node_id, placement_group, bundle_locations,
finished_count, failure_callback, success_callback](const Status &status) {
auto leasing_bundles = node_to_bundles_when_leasing_.find(node_id);
RAY_CHECK(leasing_bundles != node_to_bundles_when_leasing_.end());
auto bundle_iter = leasing_bundles->second.find(bundle->BundleId());
RAY_CHECK(bundle_iter != leasing_bundles->second.end());
// Remove the bundle from the leasing map as the reply is returned from the
// remote node.
leasing_bundles->second.erase(bundle_iter);
if (leasing_bundles->second.empty()) {
node_to_bundles_when_leasing_.erase(leasing_bundles);
}
if (status.ok()) {
(*bundle_locations)[bundle_id] = std::make_pair(node_id, bundle);
}
if (++(*finished_count) == bundles.size()) {
OnAllBundleSchedulingRequestReturned(placement_group, bundles,
bundle_locations, failure_callback,
[this, bundle, node_id, lease_status_tracker, failure_callback,
success_callback](const Status &status) {
lease_status_tracker->MarkLeaseReturned(node_id, bundle, status);
if (lease_status_tracker->IsAllLeaseRequestReturned()) {
OnAllBundleSchedulingRequestReturned(lease_status_tracker, failure_callback,
success_callback);
}
});
@ -281,7 +264,7 @@ void GcsPlacementGroupScheduler::MarkScheduleCancelled(
const PlacementGroupID &placement_group_id) {
auto it = placement_group_leasing_in_progress_.find(placement_group_id);
RAY_CHECK(it != placement_group_leasing_in_progress_.end());
placement_group_leasing_in_progress_.erase(it);
it->second->MarkPlacementGroupScheduleCancelled();
}
void GcsPlacementGroupScheduler::ReserveResourceFromNode(
@ -350,19 +333,21 @@ GcsPlacementGroupScheduler::GetOrConnectLeaseClient(const rpc::Address &raylet_a
}
void GcsPlacementGroupScheduler::OnAllBundleSchedulingRequestReturned(
const std::shared_ptr<GcsPlacementGroup> &placement_group,
const std::vector<std::shared_ptr<BundleSpecification>> &bundles,
const std::shared_ptr<BundleLocations> &bundle_locations,
const std::shared_ptr<LeaseStatusTracker> &lease_status_tracker,
const std::function<void(std::shared_ptr<GcsPlacementGroup>)>
&schedule_failure_handler,
const std::function<void(std::shared_ptr<GcsPlacementGroup>)>
&schedule_success_handler) {
RAY_CHECK(lease_status_tracker->IsAllLeaseRequestReturned())
<< "This method can be called only after all bundle scheduling requests are "
"returend.";
const auto &placement_group = lease_status_tracker->GetPlacementGroup();
const auto &bundles = lease_status_tracker->GetUnplacedBundles();
const auto &bundle_locations = lease_status_tracker->GetBundleLocations();
const auto &placement_group_id = placement_group->GetPlacementGroupID();
bundle_location_index_.AddBundleLocations(placement_group_id, bundle_locations);
if (placement_group_leasing_in_progress_.find(placement_group_id) ==
placement_group_leasing_in_progress_.end() ||
bundle_locations->size() != bundles.size()) {
if (!lease_status_tracker->IsLeasingSucceed()) {
// If the lease request has been already cancelled
// or not every lease request succeeds.
DestroyPlacementGroupBundleResourcesIfExists(placement_group_id);
@ -388,12 +373,9 @@ void GcsPlacementGroupScheduler::OnAllBundleSchedulingRequestReturned(
->set_node_id(location.first.Binary());
}
}
// Erase leasing in progress placement group.
// This could've been removed if the leasing request is cancelled already.
auto it = placement_group_leasing_in_progress_.find(placement_group_id);
if (it != placement_group_leasing_in_progress_.end()) {
placement_group_leasing_in_progress_.erase(it);
}
RAY_CHECK(it != placement_group_leasing_in_progress_.end());
placement_group_leasing_in_progress_.erase(it);
}
std::unique_ptr<ScheduleContext> GcsPlacementGroupScheduler::GetScheduleContext(
@ -522,5 +504,84 @@ void BundleLocationIndex::AddNodes(
}
}
LeaseStatusTracker::LeaseStatusTracker(
std::shared_ptr<GcsPlacementGroup> placement_group,
std::vector<std::shared_ptr<BundleSpecification>> &unplaced_bundles)
: placement_group_(placement_group), unplaced_bundles_(unplaced_bundles) {
bundle_locations_ = std::make_shared<BundleLocations>();
}
bool LeaseStatusTracker::MarkLeaseStarted(const ClientID &node_id,
std::shared_ptr<BundleSpecification> bundle) {
const auto &bundle_id = bundle->BundleId();
return node_to_bundles_when_leasing_[node_id].emplace(bundle_id).second;
}
void LeaseStatusTracker::MarkLeaseReturned(
const ClientID &node_id, const std::shared_ptr<BundleSpecification> bundle,
const Status &status) {
RAY_CHECK(returned_count_ <= unplaced_bundles_.size());
auto leasing_bundles = node_to_bundles_when_leasing_.find(node_id);
RAY_CHECK(leasing_bundles != node_to_bundles_when_leasing_.end());
auto bundle_iter = leasing_bundles->second.find(bundle->BundleId());
RAY_CHECK(bundle_iter != leasing_bundles->second.end());
// Remove the bundle from the leasing map as the reply is returned from the
// remote node.
leasing_bundles->second.erase(bundle_iter);
if (leasing_bundles->second.empty()) {
node_to_bundles_when_leasing_.erase(leasing_bundles);
}
// If the request succeeds, record it.
const auto &bundle_id = bundle->BundleId();
if (status.ok()) {
bundle_locations_->emplace(bundle_id, std::make_pair(node_id, bundle));
}
returned_count_ += 1;
// If every bundle lease request is returned, mark it as all returned.
if (IsAllLeaseRequestReturned()) {
UpdateLeasingState(LeasingState::ALL_RETURNED);
}
}
bool LeaseStatusTracker::IsAllLeaseRequestReturned() const {
return returned_count_ == unplaced_bundles_.size();
}
bool LeaseStatusTracker::IsLeasingSucceed() const {
return IsAllLeaseRequestReturned() &&
(bundle_locations_->size() == unplaced_bundles_.size()) &&
(leasing_state_ != LeasingState::CANCELLED);
}
const std::shared_ptr<GcsPlacementGroup> &LeaseStatusTracker::GetPlacementGroup() const {
return placement_group_;
}
const std::shared_ptr<BundleLocations> &LeaseStatusTracker::GetBundleLocations() const {
return bundle_locations_;
}
const std::vector<std::shared_ptr<BundleSpecification>>
&LeaseStatusTracker::GetUnplacedBundles() const {
return unplaced_bundles_;
}
const LeasingState LeaseStatusTracker::GetLeasingState() const { return leasing_state_; }
void LeaseStatusTracker::MarkPlacementGroupScheduleCancelled() {
UpdateLeasingState(LeasingState::CANCELLED);
}
bool LeaseStatusTracker::UpdateLeasingState(LeasingState leasing_state) {
// If the lease was cancelled, we cannot update the state.
if (leasing_state_ == LeasingState::CANCELLED) {
return false;
}
leasing_state_ = leasing_state;
return true;
}
} // namespace gcs
} // namespace ray

View file

@ -131,17 +131,68 @@ class GcsStrictSpreadStrategy : public GcsScheduleStrategy {
const std::unique_ptr<ScheduleContext> &context) override;
};
enum class LeasingState {
// TODO(sang): Use prepare and commit instead for 2PC.
/// The phase where lease requests haven't been returned.
SCHEDULING,
/// The phase where lease requests have returned
ALL_RETURNED,
/// Placement group has been removed, and this leasing is not valid.
CANCELLED
};
/// A data structure that encapsulates information regarding bundle resource leasing
/// status.
class LeasingContext {
// TODO(sang): Implement in the next PR.
class LeaseStatusTracker {
public:
LeaseStatusTracker(std::shared_ptr<GcsPlacementGroup> placement_group,
std::vector<std::shared_ptr<BundleSpecification>> &unplaced_bundles);
~LeaseStatusTracker() = default;
bool MarkLeaseStarted(const ClientID &node_id,
std::shared_ptr<BundleSpecification> bundle);
void MarkLeaseReturned(const ClientID &node_id,
std::shared_ptr<BundleSpecification> bundle,
const Status &status);
bool IsAllLeaseRequestReturned() const;
bool IsLeasingSucceed() const;
const std::shared_ptr<GcsPlacementGroup> &GetPlacementGroup() const;
const std::vector<std::shared_ptr<BundleSpecification>> &GetUnplacedBundles() const;
const std::shared_ptr<BundleLocations> &GetBundleLocations() const;
const LeasingState GetLeasingState() const;
void MarkPlacementGroupScheduleCancelled();
private:
/// Method to update leasing states.
///
/// \param leasing_state The state to update.
/// \return True if succeeds to update. False otherwise.
bool UpdateLeasingState(LeasingState leasing_state);
/// Placement group of which this leasing context is associated with.
std::shared_ptr<GcsPlacementGroup> placement_group_;
/// Location of bundles that lease requests were sent.
/// If schedule success, the decision will be set as schedule_map[bundles[pos]]
/// else will be set ClientID::Nil().
std::shared_ptr<BundleLocations> bundle_locations_;
/// Number of lease requests that are returned.
size_t returned_count_ = 0;
/// The leasing stage. This is used to know the state of current leasing context.
LeasingState leasing_state_ = LeasingState::SCHEDULING;
/// Map from node ID to the set of bundles for whom we are trying to acquire a lease
/// from that node. This is needed so that we can retry lease requests from the node
/// until we receive a reply or the node is removed.
/// TODO(sang): We don't currently handle retry.
absl::flat_hash_map<ClientID, absl::flat_hash_set<BundleID>>
node_to_bundles_when_leasing_;
/// Unplaced bundle specification for this leasing context.
std::vector<std::shared_ptr<BundleSpecification>> unplaced_bundles_;
};
/// A data structure that helps fast bundle location lookup.
class BundleLocationIndex {
public:
BundleLocationIndex() {}
~BundleLocationIndex() {}
~BundleLocationIndex() = default;
/// Add bundle locations to index.
///
@ -265,9 +316,7 @@ class GcsPlacementGroupScheduler : public GcsPlacementGroupSchedulerInterface {
const rpc::Address &raylet_address);
void OnAllBundleSchedulingRequestReturned(
const std::shared_ptr<GcsPlacementGroup> &placement_group,
const std::vector<std::shared_ptr<BundleSpecification>> &bundles,
const std::shared_ptr<BundleLocations> &bundle_locations,
const std::shared_ptr<LeaseStatusTracker> &lease_status_tracker,
const std::function<void(std::shared_ptr<GcsPlacementGroup>)>
&schedule_failure_handler,
const std::function<void(std::shared_ptr<GcsPlacementGroup>)>
@ -296,19 +345,13 @@ class GcsPlacementGroupScheduler : public GcsPlacementGroupSchedulerInterface {
/// A vector to store all the schedule strategy.
std::vector<std::shared_ptr<GcsScheduleStrategy>> scheduler_strategies_;
/// Map from node ID to the set of bundles for whom we are trying to acquire a lease
/// from that node. This is needed so that we can retry lease requests from the node
/// until we receive a reply or the node is removed.
/// TODO(sang): We don't currently handle retry.
absl::flat_hash_map<ClientID, absl::flat_hash_set<BundleID>>
node_to_bundles_when_leasing_;
/// Index to lookup bundle locations of node or placement group.
BundleLocationIndex bundle_location_index_;
/// Set of placement group that have lease requests in flight to nodes.
/// It is required to know if placement group has been removed or not.
absl::flat_hash_set<PlacementGroupID> placement_group_leasing_in_progress_;
/// Index to lookup bundle locations of node or placement group.
BundleLocationIndex bundle_location_index_;
absl::flat_hash_map<PlacementGroupID, std::shared_ptr<LeaseStatusTracker>>
placement_group_leasing_in_progress_;
};
} // namespace gcs

View file

@ -502,7 +502,6 @@ TEST_F(GcsPlacementGroupSchedulerTest, TestRescheduleWhenNodeDead) {
auto bundles_on_node1 =
scheduler_->GetBundlesOnNode(ClientID::FromBinary(node1->node_id()));
ASSERT_EQ(1, bundles_on_node1.size());
// One node is dead, reschedule the placement group.
auto bundle_on_dead_node = placement_group->GetMutableBundle(0);
bundle_on_dead_node->clear_node_id();