[Placement Group]Placement Group supports gcs failover(Part2) (#12003)

* add testcase

* fix ut

* fix review comment

* fix review comment

* fix review comments

* fix ut bug

* add part code

* add part code

* add part code

* add testcase

* add part code

* fix ut bug

* fix ut timeout bug

* fix ut bug

Co-authored-by: 灵洵 <fengbin.ffb@antgroup.com>
This commit is contained in:
fangfengbin 2020-11-18 10:59:26 +08:00 committed by GitHub
parent c476037c97
commit f400333841
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
14 changed files with 178 additions and 4 deletions

View file

@ -1178,7 +1178,7 @@ def test_create_placement_group_after_gcs_server_restarts(
# Create placement group 1 successfully.
placement_group1 = ray.util.placement_group([{"CPU": 1}, {"CPU": 1}])
ray.get(placement_group1.ready(), timeout=2)
ray.get(placement_group1.ready(), timeout=10)
table = ray.util.placement_group_table(placement_group1)
assert table["state"] == "CREATED"
@ -1188,7 +1188,7 @@ def test_create_placement_group_after_gcs_server_restarts(
# Create placement group 2 successfully.
placement_group2 = ray.util.placement_group([{"CPU": 1}, {"CPU": 1}])
ray.get(placement_group2.ready(), timeout=2)
ray.get(placement_group2.ready(), timeout=10)
table = ray.util.placement_group_table(placement_group2)
assert table["state"] == "CREATED"
@ -1253,7 +1253,7 @@ def test_create_placement_group_during_gcs_server_restart(
cluster.head_node.start_gcs_server()
for i in range(0, 10):
ray.get(placement_groups[i].ready(), timeout=2)
ray.get(placement_groups[i].ready())
if __name__ == "__main__":

View file

@ -470,6 +470,7 @@ void GcsPlacementGroupManager::LoadInitialData(const EmptyCallback &done) {
auto callback = [this,
done](const std::unordered_map<PlacementGroupID,
rpc::PlacementGroupTableData> &result) {
std::unordered_map<NodeID, std::vector<rpc::Bundle>> node_to_bundles;
for (auto &item : result) {
auto placement_group = std::make_shared<GcsPlacementGroup>(item.second);
if (item.second.state() != rpc::PlacementGroupTableData::REMOVED) {
@ -479,9 +480,20 @@ void GcsPlacementGroupManager::LoadInitialData(const EmptyCallback &done) {
item.second.state() == rpc::PlacementGroupTableData::RESCHEDULING) {
pending_placement_groups_.emplace_back(std::move(placement_group));
}
if (item.second.state() == rpc::PlacementGroupTableData::CREATED ||
item.second.state() == rpc::PlacementGroupTableData::RESCHEDULING) {
const auto &bundles = item.second.bundles();
for (auto &bundle : bundles) {
node_to_bundles[NodeID::FromBinary(bundle.node_id())].emplace_back(bundle);
}
}
}
}
// Notify raylets to release unused bundles.
gcs_placement_group_scheduler_->ReleaseUnusedBundles(node_to_bundles);
SchedulePendingPlacementGroups();
RAY_LOG(INFO) << "Finished loading initial data.";
done();

View file

@ -204,6 +204,17 @@ void GcsPlacementGroupScheduler::ScheduleUnplacedBundles(
std::shared_ptr<GcsPlacementGroup> placement_group,
std::function<void(std::shared_ptr<GcsPlacementGroup>)> failure_callback,
std::function<void(std::shared_ptr<GcsPlacementGroup>)> success_callback) {
// We need to ensure that the PrepareBundleResources won't be sent before the reply of
// ReleaseUnusedBundles is returned.
if (!nodes_of_releasing_unused_bundles_.empty()) {
RAY_LOG(INFO) << "Failed to schedule placement group " << placement_group->GetName()
<< ", id: " << placement_group->GetPlacementGroupID() << ", because "
<< nodes_of_releasing_unused_bundles_.size()
<< " nodes have not released unused bundles.";
failure_callback(placement_group);
return;
}
auto bundles = placement_group->GetUnplacedBundles();
auto strategy = placement_group->GetStrategy();
@ -556,6 +567,34 @@ GcsPlacementGroupScheduler::GetBundlesOnNode(const NodeID &node_id) {
return bundles_on_node;
}
void GcsPlacementGroupScheduler::ReleaseUnusedBundles(
const std::unordered_map<NodeID, std::vector<rpc::Bundle>> &node_to_bundles) {
// The purpose of this function is to release bundles that may be leaked.
// When GCS restarts, it doesn't know which bundles it has scheduled in the
// previous lifecycle. In this case, GCS will send a list of bundle ids that
// are still needed. And Raylet will release other bundles. If the node is
// dead, there is no need to send the request of release unused bundles.
const auto &alive_nodes = gcs_node_manager_.GetAllAliveNodes();
for (const auto &alive_node : alive_nodes) {
const auto &node_id = alive_node.first;
nodes_of_releasing_unused_bundles_.insert(node_id);
auto lease_client = GetLeaseClientFromNode(alive_node.second);
auto release_unused_bundles_callback =
[this, node_id](const Status &status,
const rpc::ReleaseUnusedBundlesReply &reply) {
nodes_of_releasing_unused_bundles_.erase(node_id);
};
auto iter = node_to_bundles.find(alive_node.first);
// When GCS restarts, some nodes maybe do not have bundles.
// In this case, GCS will send an empty list.
auto bundles_in_use =
iter != node_to_bundles.end() ? iter->second : std::vector<rpc::Bundle>{};
lease_client->ReleaseUnusedBundles(bundles_in_use, release_unused_bundles_callback);
}
}
void BundleLocationIndex::AddBundleLocations(
const PlacementGroupID &placement_group_id,
std::shared_ptr<BundleLocations> bundle_locations) {

View file

@ -74,6 +74,12 @@ class GcsPlacementGroupSchedulerInterface {
/// \param placement_group_id The placement group id scheduling is in progress.
virtual void MarkScheduleCancelled(const PlacementGroupID &placement_group_id) = 0;
/// Notify raylets to release unused bundles.
///
/// \param node_to_bundles Bundles used by each node.
virtual void ReleaseUnusedBundles(
const std::unordered_map<NodeID, std::vector<rpc::Bundle>> &node_to_bundles) = 0;
virtual ~GcsPlacementGroupSchedulerInterface() {}
};
@ -385,6 +391,12 @@ class GcsPlacementGroupScheduler : public GcsPlacementGroupSchedulerInterface {
absl::flat_hash_map<PlacementGroupID, std::vector<int64_t>> GetBundlesOnNode(
const NodeID &node_id) override;
/// Notify raylets to release unused bundles.
///
/// \param node_to_bundles Bundles used by each node.
void ReleaseUnusedBundles(const std::unordered_map<NodeID, std::vector<rpc::Bundle>>
&node_to_bundles) override;
protected:
/// Send a bundle PREPARE request to a node. The PREPARE request will lock resources
/// on a node until COMMIT or CANCEL requests are sent to a node.
@ -477,6 +489,9 @@ class GcsPlacementGroupScheduler : public GcsPlacementGroupSchedulerInterface {
/// Set of placement group that have lease requests in flight to nodes.
absl::flat_hash_map<PlacementGroupID, std::shared_ptr<LeaseStatusTracker>>
placement_group_leasing_in_progress_;
/// The nodes which are releasing unused bundles.
absl::flat_hash_set<NodeID> nodes_of_releasing_unused_bundles_;
};
} // namespace gcs

View file

@ -40,6 +40,10 @@ class MockPlacementGroupScheduler : public gcs::GcsPlacementGroupSchedulerInterf
MOCK_METHOD1(MarkScheduleCancelled, void(const PlacementGroupID &placement_group_id));
MOCK_METHOD1(
ReleaseUnusedBundles,
void(const std::unordered_map<NodeID, std::vector<rpc::Bundle>> &node_to_bundles));
absl::flat_hash_map<PlacementGroupID, std::vector<int64_t>> GetBundlesOnNode(
const NodeID &node_id) override {
absl::flat_hash_map<PlacementGroupID, std::vector<int64_t>> bundles;

View file

@ -1014,6 +1014,13 @@ TEST_F(GcsPlacementGroupSchedulerTest, TestPGCancelledDuringReschedulingCommitPr
WaitPlacementGroupPendingDone(1, GcsPlacementGroupStatus::FAILURE);
}
TEST_F(GcsPlacementGroupSchedulerTest, TestReleaseUnusedBundles) {
SchedulePlacementGroupSuccessTest(rpc::PlacementStrategy::SPREAD);
std::unordered_map<NodeID, std::vector<rpc::Bundle>> node_to_bundle;
scheduler_->ReleaseUnusedBundles(node_to_bundle);
ASSERT_EQ(1, raylet_clients_[0]->num_release_unused_bundles_requested);
}
} // namespace ray
int main(int argc, char **argv) {

View file

@ -183,6 +183,12 @@ struct GcsServerMocker {
return_callbacks.push_back(callback);
}
void ReleaseUnusedBundles(
const std::vector<rpc::Bundle> &bundles_in_use,
const rpc::ClientCallback<rpc::ReleaseUnusedBundlesReply> &callback) override {
++num_release_unused_bundles_requested;
}
// Trigger reply to PrepareBundleResources.
bool GrantPrepareBundleResources(bool success = true) {
Status status = Status::OK();
@ -231,6 +237,7 @@ struct GcsServerMocker {
int num_lease_requested = 0;
int num_return_requested = 0;
int num_commit_requested = 0;
int num_release_unused_bundles_requested = 0;
NodeID node_id = NodeID::FromRandom();
std::list<rpc::ClientCallback<rpc::PrepareBundleResourcesReply>> lease_callbacks = {};
std::list<rpc::ClientCallback<rpc::CommitBundleResourcesReply>> commit_callbacks = {};

View file

@ -148,6 +148,13 @@ message RequestObjectSpillageReply {
bool success = 1;
}
message ReleaseUnusedBundlesRequest {
repeated Bundle bundles_in_use = 1;
}
message ReleaseUnusedBundlesReply {
}
// Service for inter-node-manager communication.
service NodeManagerService {
// Request a worker from the raylet.
@ -186,4 +193,10 @@ service NodeManagerService {
// Ask the raylet to spill an object to external storage.
rpc RequestObjectSpillage(RequestObjectSpillageRequest)
returns (RequestObjectSpillageReply);
// This method is only used by GCS, and the purpose is to release bundles
// that may be leaked. When GCS restarts, it doesn't know which bundles it has leased
// in the previous lifecycle. In this case, GCS will send a list of bundles that
// are still needed. And Raylet will release other bundles.
rpc ReleaseUnusedBundles(ReleaseUnusedBundlesRequest)
returns (ReleaseUnusedBundlesReply);
}

View file

@ -558,6 +558,36 @@ void NodeManager::HandleRequestObjectSpillage(
});
}
void NodeManager::HandleReleaseUnusedBundles(
const rpc::ReleaseUnusedBundlesRequest &request,
rpc::ReleaseUnusedBundlesReply *reply, rpc::SendReplyCallback send_reply_callback) {
RAY_CHECK(!new_scheduler_enabled_) << "Not implemented";
RAY_LOG(DEBUG) << "Releasing unused bundles.";
std::unordered_set<BundleID, pair_hash> in_use_bundles;
for (int index = 0; index < request.bundles_in_use_size(); ++index) {
const auto &bundle_id = request.bundles_in_use(index).bundle_id();
in_use_bundles.emplace(
std::make_pair(PlacementGroupID::FromBinary(bundle_id.placement_group_id()),
bundle_id.bundle_index()));
}
// TODO(ffbin): Kill all workers that are currently associated with the unused bundles.
// At present, the worker does not have a bundle ID, so we cannot filter out the workers
// used by unused bundles. We will solve it in next pr.
// Return unused bundle resources.
for (auto iter = bundle_spec_map_.begin(); iter != bundle_spec_map_.end();) {
if (0 == in_use_bundles.count(iter->first)) {
ReturnBundleResources(*iter->second);
bundle_spec_map_.erase(iter++);
} else {
iter++;
}
}
send_reply_callback(Status::OK(), nullptr, nullptr);
}
// TODO(edoakes): this function is problematic because it both sends warnings spuriously
// under normal conditions and sometimes doesn't send a warning under actual deadlock
// conditions. The current logic is to push a warning when: all running tasks are
@ -1986,6 +2016,8 @@ bool NodeManager::PrepareBundle(
// Register bundle state.
bundle_state->state = CommitState::PREPARED;
bundle_state_map_.emplace(bundle_id, bundle_state);
bundle_spec_map_.emplace(
bundle_id, std::make_shared<BundleSpecification>(bundle_spec.GetMessage()));
}
return bundle_state->acquired_resources.AvailableResources().size() > 0;
}

View file

@ -645,6 +645,11 @@ class NodeManager : public rpc::NodeManagerServiceHandler {
rpc::RequestObjectSpillageReply *reply,
rpc::SendReplyCallback send_reply_callback) override;
/// Handle a `ReleaseUnusedBundles` request.
void HandleReleaseUnusedBundles(const rpc::ReleaseUnusedBundlesRequest &request,
rpc::ReleaseUnusedBundlesReply *reply,
rpc::SendReplyCallback send_reply_callback) override;
/// Trigger global GC across the cluster to free up references to actors or
/// object ids.
void TriggerGlobalGC();
@ -816,6 +821,10 @@ class NodeManager : public rpc::NodeManagerServiceHandler {
/// creation.
absl::flat_hash_map<BundleID, std::shared_ptr<BundleState>, pair_hash>
bundle_state_map_;
/// Save `BundleSpecification` for cleaning leaked bundles after GCS restart.
absl::flat_hash_map<BundleID, std::shared_ptr<BundleSpecification>, pair_hash>
bundle_spec_map_;
};
} // namespace raylet

View file

@ -402,6 +402,25 @@ void raylet::RayletClient::CancelResourceReserve(
grpc_client_->CancelResourceReserve(request, callback);
}
void raylet::RayletClient::ReleaseUnusedBundles(
const std::vector<rpc::Bundle> &bundles_in_use,
const rpc::ClientCallback<rpc::ReleaseUnusedBundlesReply> &callback) {
rpc::ReleaseUnusedBundlesRequest request;
for (auto &bundle : bundles_in_use) {
request.add_bundles_in_use()->CopyFrom(bundle);
}
grpc_client_->ReleaseUnusedBundles(
request,
[callback](const Status &status, const rpc::ReleaseUnusedBundlesReply &reply) {
if (!status.ok()) {
RAY_LOG(WARNING)
<< "Error releasing bundles from raylet, the raylet may have died:"
<< status;
}
callback(status, reply);
});
}
void raylet::RayletClient::PinObjectIDs(
const rpc::Address &caller_address, const std::vector<ObjectID> &object_ids,
const rpc::ClientCallback<rpc::PinObjectIDsReply> &callback) {

View file

@ -118,6 +118,10 @@ class ResourceReserveInterface {
BundleSpecification &bundle_spec,
const ray::rpc::ClientCallback<ray::rpc::CancelResourceReserveReply> &callback) = 0;
virtual void ReleaseUnusedBundles(
const std::vector<rpc::Bundle> &bundles_in_use,
const rpc::ClientCallback<rpc::ReleaseUnusedBundlesReply> &callback) = 0;
virtual ~ResourceReserveInterface(){};
};
@ -380,6 +384,11 @@ class RayletClient : public PinObjectsInterface,
const ray::rpc::ClientCallback<ray::rpc::CancelResourceReserveReply> &callback)
override;
/// Implements ReleaseUnusedBundlesInterface.
void ReleaseUnusedBundles(
const std::vector<rpc::Bundle> &bundles_in_use,
const rpc::ClientCallback<rpc::ReleaseUnusedBundlesReply> &callback) override;
void PinObjectIDs(
const rpc::Address &caller_address, const std::vector<ObjectID> &object_ids,
const ray::rpc::ClientCallback<ray::rpc::PinObjectIDsReply> &callback) override;

View file

@ -100,6 +100,9 @@ class NodeManagerWorkerClient
/// Ask the raylet to spill an object to external storage.
VOID_RPC_CLIENT_METHOD(NodeManagerService, RequestObjectSpillage, grpc_client_, )
/// Release unused bundles.
VOID_RPC_CLIENT_METHOD(NodeManagerService, ReleaseUnusedBundles, grpc_client_, )
private:
/// Constructor.
///

View file

@ -35,7 +35,8 @@ namespace rpc {
RPC_SERVICE_HANDLER(NodeManagerService, PrepareBundleResources) \
RPC_SERVICE_HANDLER(NodeManagerService, CommitBundleResources) \
RPC_SERVICE_HANDLER(NodeManagerService, CancelResourceReserve) \
RPC_SERVICE_HANDLER(NodeManagerService, RequestObjectSpillage)
RPC_SERVICE_HANDLER(NodeManagerService, RequestObjectSpillage) \
RPC_SERVICE_HANDLER(NodeManagerService, ReleaseUnusedBundles)
/// Interface of the `NodeManagerService`, see `src/ray/protobuf/node_manager.proto`.
class NodeManagerServiceHandler {
@ -100,6 +101,10 @@ class NodeManagerServiceHandler {
virtual void HandleRequestObjectSpillage(const RequestObjectSpillageRequest &request,
RequestObjectSpillageReply *reply,
SendReplyCallback send_reply_callback) = 0;
virtual void HandleReleaseUnusedBundles(const ReleaseUnusedBundlesRequest &request,
ReleaseUnusedBundlesReply *reply,
SendReplyCallback send_reply_callback) = 0;
};
/// The `GrpcService` for `NodeManagerService`.