Revert "[Placement Group] Make placement group prepare resource rpc r… (#21144)

This PR makes pg_test_2 flaky. cc @clay4444 can you re-merge it?
This commit is contained in:
SangBin Cho 2021-12-17 17:13:26 +09:00 committed by GitHub
parent 0e7c0b491b
commit 02465a6792
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
15 changed files with 97 additions and 286 deletions

View file

@ -62,7 +62,7 @@ class MockResourceReserveInterface : public ResourceReserveInterface {
public: public:
MOCK_METHOD( MOCK_METHOD(
void, PrepareBundleResources, void, PrepareBundleResources,
(const std::vector<std::shared_ptr<const BundleSpecification>> &bundle_specs, (const BundleSpecification &bundle_spec,
const ray::rpc::ClientCallback<ray::rpc::PrepareBundleResourcesReply> &callback), const ray::rpc::ClientCallback<ray::rpc::PrepareBundleResourcesReply> &callback),
(override)); (override));
MOCK_METHOD( MOCK_METHOD(
@ -146,7 +146,7 @@ class MockRayletClientInterface : public RayletClientInterface {
(override)); (override));
MOCK_METHOD( MOCK_METHOD(
void, PrepareBundleResources, void, PrepareBundleResources,
(const std::vector<std::shared_ptr<const BundleSpecification>> &bundle_specs, (const BundleSpecification &bundle_spec,
const ray::rpc::ClientCallback<ray::rpc::PrepareBundleResourcesReply> &callback), const ray::rpc::ClientCallback<ray::rpc::PrepareBundleResourcesReply> &callback),
(override)); (override));
MOCK_METHOD( MOCK_METHOD(

View file

@ -126,13 +126,4 @@ std::string GetOriginalResourceName(const std::string &resource) {
return resource.substr(0, idx); return resource.substr(0, idx);
} }
std::string GetDebugStringForBundles(
const std::vector<std::shared_ptr<const BundleSpecification>> &bundles) {
std::ostringstream debug_info;
for (const auto &bundle : bundles) {
debug_info << "{" << bundle->DebugString() << "},";
}
return debug_info.str();
};
} // namespace ray } // namespace ray

View file

@ -108,8 +108,4 @@ bool IsBundleIndex(const std::string &resource, const PlacementGroupID &group_id
/// Return the original resource name of the placement group resource. /// Return the original resource name of the placement group resource.
std::string GetOriginalResourceName(const std::string &resource); std::string GetOriginalResourceName(const std::string &resource);
/// Generate debug information of given bundles.
std::string GetDebugStringForBundles(
const std::vector<std::shared_ptr<const BundleSpecification>> &bundles);
} // namespace ray } // namespace ray

View file

@ -17,31 +17,6 @@
#include "ray/gcs/gcs_server/gcs_placement_group_manager.h" #include "ray/gcs/gcs_server/gcs_placement_group_manager.h"
#include "src/ray/protobuf/gcs.pb.h" #include "src/ray/protobuf/gcs.pb.h"
namespace {
using ray::BundleSpecification;
using ray::NodeID;
// Get a set of bundle specifications grouped by the node.
std::unordered_map<NodeID, std::vector<std::shared_ptr<const BundleSpecification>>>
GetUnplacedBundlesPerNode(
const std::vector<std::shared_ptr<const BundleSpecification>> &bundles,
const ray::gcs::ScheduleMap &selected_nodes) {
std::unordered_map<NodeID, std::vector<std::shared_ptr<const BundleSpecification>>>
node_to_bundles;
for (const auto &bundle : bundles) {
const auto &bundle_id = bundle->BundleId();
const auto &iter = selected_nodes.find(bundle_id);
RAY_CHECK(iter != selected_nodes.end());
if (node_to_bundles.find(iter->second) == node_to_bundles.end()) {
node_to_bundles[iter->second] = {};
}
node_to_bundles[iter->second].push_back(bundle);
}
return node_to_bundles;
}
} // namespace
namespace ray { namespace ray {
namespace gcs { namespace gcs {
@ -194,24 +169,18 @@ void GcsPlacementGroupScheduler::ScheduleUnplacedBundles(
.emplace(placement_group->GetPlacementGroupID(), lease_status_tracker) .emplace(placement_group->GetPlacementGroupID(), lease_status_tracker)
.second); .second);
const auto &pending_bundles = GetUnplacedBundlesPerNode(bundles, selected_nodes); /// TODO(AlisaWu): Change the strategy when reserve resource failed.
for (const auto &node_to_bundles : pending_bundles) { for (const auto &bundle : bundles) {
const auto &node_id = node_to_bundles.first; const auto &bundle_id = bundle->BundleId();
const auto &bundles_per_node = node_to_bundles.second; const auto &node_id = selected_nodes[bundle_id];
for (const auto &bundle : bundles_per_node) { lease_status_tracker->MarkPreparePhaseStarted(node_id, bundle);
lease_status_tracker->MarkPreparePhaseStarted(node_id, bundle);
}
// TODO(sang): The callback might not be called at all if nodes are dead. We should // TODO(sang): The callback might not be called at all if nodes are dead. We should
// handle this case properly. // handle this case properly.
PrepareResources(bundles_per_node, gcs_node_manager_.GetAliveNode(node_id), PrepareResources(bundle, gcs_node_manager_.GetAliveNode(node_id),
[this, bundles_per_node, node_id, lease_status_tracker, [this, bundle, node_id, lease_status_tracker, failure_callback,
failure_callback, success_callback](const Status &status) { success_callback](const Status &status) {
for (const auto &bundle : bundles_per_node) { lease_status_tracker->MarkPrepareRequestReturned(node_id, bundle,
lease_status_tracker->MarkPrepareRequestReturned(node_id, bundle, status);
status);
}
if (lease_status_tracker->AllPrepareRequestsReturned()) { if (lease_status_tracker->AllPrepareRequestsReturned()) {
OnAllBundlePrepareRequestReturned( OnAllBundlePrepareRequestReturned(
lease_status_tracker, failure_callback, success_callback); lease_status_tracker, failure_callback, success_callback);
@ -244,7 +213,7 @@ void GcsPlacementGroupScheduler::MarkScheduleCancelled(
} }
void GcsPlacementGroupScheduler::PrepareResources( void GcsPlacementGroupScheduler::PrepareResources(
const std::vector<std::shared_ptr<const BundleSpecification>> &bundles, const std::shared_ptr<const BundleSpecification> &bundle,
const absl::optional<std::shared_ptr<ray::rpc::GcsNodeInfo>> &node, const absl::optional<std::shared_ptr<ray::rpc::GcsNodeInfo>> &node,
const StatusCallback &callback) { const StatusCallback &callback) {
if (!node.has_value()) { if (!node.has_value()) {
@ -255,19 +224,18 @@ void GcsPlacementGroupScheduler::PrepareResources(
const auto lease_client = GetLeaseClientFromNode(node.value()); const auto lease_client = GetLeaseClientFromNode(node.value());
const auto node_id = NodeID::FromBinary(node.value()->node_id()); const auto node_id = NodeID::FromBinary(node.value()->node_id());
RAY_LOG(DEBUG) << "Preparing resource from node " << node_id RAY_LOG(DEBUG) << "Preparing resource from node " << node_id
<< " for bundles: " << GetDebugStringForBundles(bundles); << " for a bundle: " << bundle->DebugString();
lease_client->PrepareBundleResources( lease_client->PrepareBundleResources(
bundles, [node_id, bundles, callback]( *bundle, [node_id, bundle, callback](
const Status &status, const rpc::PrepareBundleResourcesReply &reply) { const Status &status, const rpc::PrepareBundleResourcesReply &reply) {
auto result = reply.success() ? Status::OK() auto result = reply.success() ? Status::OK()
: Status::IOError("Failed to reserve resource"); : Status::IOError("Failed to reserve resource");
if (result.ok()) { if (result.ok()) {
RAY_LOG(DEBUG) << "Finished leasing resource from " << node_id RAY_LOG(DEBUG) << "Finished leasing resource from " << node_id
<< " for bundles: " << GetDebugStringForBundles(bundles); << " for bundle: " << bundle->DebugString();
} else { } else {
RAY_LOG(DEBUG) << "Failed to lease resource from " << node_id RAY_LOG(DEBUG) << "Failed to lease resource from " << node_id
<< " for bundles: " << GetDebugStringForBundles(bundles); << " for bundle: " << bundle->DebugString();
} }
callback(result); callback(result);
}); });

View file

@ -469,16 +469,14 @@ class GcsPlacementGroupScheduler : public GcsPlacementGroupSchedulerInterface {
&group_to_bundles) override; &group_to_bundles) override;
protected: protected:
/// Send bundles PREPARE requests to a node. The PREPARE requests will lock resources /// Send a bundle PREPARE request to a node. The PREPARE request will lock resources
/// on a node until COMMIT or CANCEL requests are sent to a node. /// on a node until COMMIT or CANCEL requests are sent to a node.
/// NOTE: All of given bundles will be prepared on the same node. It is guaranteed that
/// all of bundles are atomically prepared on a given node.
/// ///
/// \param bundles Bundles to be scheduled on a node. /// \param bundle A bundle to schedule on a node.
/// \param node A node to prepare resources for a given bundle. /// \param node A node to prepare resources for a given bundle.
/// \param callback /// \param callback
void PrepareResources( void PrepareResources(
const std::vector<std::shared_ptr<const BundleSpecification>> &bundles, const std::shared_ptr<const BundleSpecification> &bundle,
const absl::optional<std::shared_ptr<ray::rpc::GcsNodeInfo>> &node, const absl::optional<std::shared_ptr<ray::rpc::GcsNodeInfo>> &node,
const StatusCallback &callback); const StatusCallback &callback);

View file

@ -159,11 +159,10 @@ class GcsPlacementGroupSchedulerTest : public ::testing::Test {
success_placement_groups_.emplace_back(std::move(placement_group)); success_placement_groups_.emplace_back(std::move(placement_group));
}); });
ASSERT_EQ(1, raylet_clients_[0]->num_lease_requested); ASSERT_EQ(2, raylet_clients_[0]->num_lease_requested);
ASSERT_EQ(1, raylet_clients_[0]->lease_callbacks.size()); ASSERT_EQ(2, raylet_clients_[0]->lease_callbacks.size());
ASSERT_TRUE(raylet_clients_[0]->GrantPrepareBundleResources());
ASSERT_TRUE(raylet_clients_[0]->GrantPrepareBundleResources()); ASSERT_TRUE(raylet_clients_[0]->GrantPrepareBundleResources());
// TODO(@clay4444): It should be updated to 1 after we make the commit request
// batched.
WaitPendingDone(raylet_clients_[0]->commit_callbacks, 2); WaitPendingDone(raylet_clients_[0]->commit_callbacks, 2);
ASSERT_TRUE(raylet_clients_[0]->GrantCommitBundleResources()); ASSERT_TRUE(raylet_clients_[0]->GrantCommitBundleResources());
ASSERT_TRUE(raylet_clients_[0]->GrantCommitBundleResources()); ASSERT_TRUE(raylet_clients_[0]->GrantCommitBundleResources());
@ -198,6 +197,7 @@ class GcsPlacementGroupSchedulerTest : public ::testing::Test {
scheduler_->ScheduleUnplacedBundles(placement_group, failure_handler, scheduler_->ScheduleUnplacedBundles(placement_group, failure_handler,
success_handler); success_handler);
ASSERT_TRUE(raylet_clients_[0]->GrantPrepareBundleResources()); ASSERT_TRUE(raylet_clients_[0]->GrantPrepareBundleResources());
ASSERT_TRUE(raylet_clients_[0]->GrantPrepareBundleResources());
WaitPendingDone(raylet_clients_[0]->commit_callbacks, 2); WaitPendingDone(raylet_clients_[0]->commit_callbacks, 2);
ASSERT_TRUE(raylet_clients_[0]->GrantCommitBundleResources()); ASSERT_TRUE(raylet_clients_[0]->GrantCommitBundleResources());
ASSERT_TRUE(raylet_clients_[0]->GrantCommitBundleResources()); ASSERT_TRUE(raylet_clients_[0]->GrantCommitBundleResources());
@ -276,11 +276,12 @@ TEST_F(GcsPlacementGroupSchedulerTest, TestSchedulePlacementGroupReplyFailure) {
success_placement_groups_.emplace_back(std::move(placement_group)); success_placement_groups_.emplace_back(std::move(placement_group));
}); });
ASSERT_EQ(1, raylet_clients_[0]->num_lease_requested); ASSERT_EQ(2, raylet_clients_[0]->num_lease_requested);
ASSERT_EQ(1, raylet_clients_[0]->lease_callbacks.size()); ASSERT_EQ(2, raylet_clients_[0]->lease_callbacks.size());
// Reply failure, so the placement group scheduling failed. // Reply failure, so the placement group scheduling failed.
ASSERT_TRUE(raylet_clients_[0]->GrantPrepareBundleResources(false)); ASSERT_TRUE(raylet_clients_[0]->GrantPrepareBundleResources(false));
ASSERT_TRUE(raylet_clients_[0]->GrantPrepareBundleResources(false));
WaitPlacementGroupPendingDone(1, GcsPlacementGroupStatus::FAILURE); WaitPlacementGroupPendingDone(1, GcsPlacementGroupStatus::FAILURE);
WaitPlacementGroupPendingDone(0, GcsPlacementGroupStatus::SUCCESS); WaitPlacementGroupPendingDone(0, GcsPlacementGroupStatus::SUCCESS);
@ -335,11 +336,12 @@ TEST_F(GcsPlacementGroupSchedulerTest, TestSchedulePlacementGroupReturnResource)
success_placement_groups_.emplace_back(std::move(placement_group)); success_placement_groups_.emplace_back(std::move(placement_group));
}); });
ASSERT_EQ(1, raylet_clients_[0]->num_lease_requested); ASSERT_EQ(2, raylet_clients_[0]->num_lease_requested);
ASSERT_EQ(1, raylet_clients_[0]->lease_callbacks.size()); ASSERT_EQ(2, raylet_clients_[0]->lease_callbacks.size());
// Failed to create these two bundles. // One bundle success and the other failed.
ASSERT_TRUE(raylet_clients_[0]->GrantPrepareBundleResources());
ASSERT_TRUE(raylet_clients_[0]->GrantPrepareBundleResources(false)); ASSERT_TRUE(raylet_clients_[0]->GrantPrepareBundleResources(false));
ASSERT_EQ(0, raylet_clients_[0]->num_return_requested); ASSERT_EQ(1, raylet_clients_[0]->num_return_requested);
// Reply the placement_group creation request, then the placement_group should be // Reply the placement_group creation request, then the placement_group should be
// scheduled successfully. // scheduled successfully.
WaitPlacementGroupPendingDone(1, GcsPlacementGroupStatus::FAILURE); WaitPlacementGroupPendingDone(1, GcsPlacementGroupStatus::FAILURE);
@ -375,6 +377,7 @@ TEST_F(GcsPlacementGroupSchedulerTest, TestStrictPackStrategyBalancedScheduling)
++node_select_count[node_index]; ++node_select_count[node_index];
node_commit_count[node_index] += 2; node_commit_count[node_index] += 2;
ASSERT_TRUE(raylet_clients_[node_index]->GrantPrepareBundleResources()); ASSERT_TRUE(raylet_clients_[node_index]->GrantPrepareBundleResources());
ASSERT_TRUE(raylet_clients_[node_index]->GrantPrepareBundleResources());
WaitPendingDone(raylet_clients_[node_index]->commit_callbacks, 2); WaitPendingDone(raylet_clients_[node_index]->commit_callbacks, 2);
ASSERT_TRUE(raylet_clients_[node_index]->GrantCommitBundleResources()); ASSERT_TRUE(raylet_clients_[node_index]->GrantCommitBundleResources());
ASSERT_TRUE(raylet_clients_[node_index]->GrantCommitBundleResources()); ASSERT_TRUE(raylet_clients_[node_index]->GrantCommitBundleResources());
@ -411,6 +414,7 @@ TEST_F(GcsPlacementGroupSchedulerTest, TestStrictPackStrategyResourceCheck) {
auto placement_group = std::make_shared<gcs::GcsPlacementGroup>(request, ""); auto placement_group = std::make_shared<gcs::GcsPlacementGroup>(request, "");
scheduler_->ScheduleUnplacedBundles(placement_group, failure_handler, success_handler); scheduler_->ScheduleUnplacedBundles(placement_group, failure_handler, success_handler);
ASSERT_TRUE(raylet_clients_[0]->GrantPrepareBundleResources()); ASSERT_TRUE(raylet_clients_[0]->GrantPrepareBundleResources());
ASSERT_TRUE(raylet_clients_[0]->GrantPrepareBundleResources());
WaitPendingDone(raylet_clients_[0]->commit_callbacks, 2); WaitPendingDone(raylet_clients_[0]->commit_callbacks, 2);
ASSERT_TRUE(raylet_clients_[0]->GrantCommitBundleResources()); ASSERT_TRUE(raylet_clients_[0]->GrantCommitBundleResources());
ASSERT_TRUE(raylet_clients_[0]->GrantCommitBundleResources()); ASSERT_TRUE(raylet_clients_[0]->GrantCommitBundleResources());
@ -426,6 +430,7 @@ TEST_F(GcsPlacementGroupSchedulerTest, TestStrictPackStrategyResourceCheck) {
std::make_shared<gcs::GcsPlacementGroup>(create_placement_group_request2, ""); std::make_shared<gcs::GcsPlacementGroup>(create_placement_group_request2, "");
scheduler_->ScheduleUnplacedBundles(placement_group2, failure_handler, success_handler); scheduler_->ScheduleUnplacedBundles(placement_group2, failure_handler, success_handler);
ASSERT_TRUE(raylet_clients_[0]->GrantPrepareBundleResources()); ASSERT_TRUE(raylet_clients_[0]->GrantPrepareBundleResources());
ASSERT_TRUE(raylet_clients_[0]->GrantPrepareBundleResources());
WaitPendingDone(raylet_clients_[0]->commit_callbacks, 2); WaitPendingDone(raylet_clients_[0]->commit_callbacks, 2);
ASSERT_TRUE(raylet_clients_[0]->GrantCommitBundleResources()); ASSERT_TRUE(raylet_clients_[0]->GrantCommitBundleResources());
ASSERT_TRUE(raylet_clients_[0]->GrantCommitBundleResources()); ASSERT_TRUE(raylet_clients_[0]->GrantCommitBundleResources());
@ -455,6 +460,7 @@ TEST_F(GcsPlacementGroupSchedulerTest, DestroyPlacementGroup) {
success_placement_groups_.emplace_back(std::move(placement_group)); success_placement_groups_.emplace_back(std::move(placement_group));
}); });
ASSERT_TRUE(raylet_clients_[0]->GrantPrepareBundleResources()); ASSERT_TRUE(raylet_clients_[0]->GrantPrepareBundleResources());
ASSERT_TRUE(raylet_clients_[0]->GrantPrepareBundleResources());
WaitPendingDone(raylet_clients_[0]->commit_callbacks, 2); WaitPendingDone(raylet_clients_[0]->commit_callbacks, 2);
ASSERT_TRUE(raylet_clients_[0]->GrantCommitBundleResources()); ASSERT_TRUE(raylet_clients_[0]->GrantCommitBundleResources());
ASSERT_TRUE(raylet_clients_[0]->GrantCommitBundleResources()); ASSERT_TRUE(raylet_clients_[0]->GrantCommitBundleResources());
@ -471,11 +477,9 @@ TEST_F(GcsPlacementGroupSchedulerTest, DestroyPlacementGroup) {
} }
TEST_F(GcsPlacementGroupSchedulerTest, DestroyCancelledPlacementGroup) { TEST_F(GcsPlacementGroupSchedulerTest, DestroyCancelledPlacementGroup) {
auto node0 = Mocker::GenNodeInfo(0); auto node = Mocker::GenNodeInfo();
auto node1 = Mocker::GenNodeInfo(1); AddNode(node);
AddNode(node0); ASSERT_EQ(1, gcs_node_manager_->GetAllAliveNodes().size());
AddNode(node1);
ASSERT_EQ(2, gcs_node_manager_->GetAllAliveNodes().size());
auto create_placement_group_request = Mocker::GenCreatePlacementGroupRequest(); auto create_placement_group_request = Mocker::GenCreatePlacementGroupRequest();
auto placement_group = auto placement_group =
@ -499,18 +503,16 @@ TEST_F(GcsPlacementGroupSchedulerTest, DestroyCancelledPlacementGroup) {
// Now, cancel the schedule request. // Now, cancel the schedule request.
ASSERT_TRUE(raylet_clients_[0]->GrantPrepareBundleResources()); ASSERT_TRUE(raylet_clients_[0]->GrantPrepareBundleResources());
scheduler_->MarkScheduleCancelled(placement_group_id); scheduler_->MarkScheduleCancelled(placement_group_id);
ASSERT_TRUE(raylet_clients_[1]->GrantPrepareBundleResources()); ASSERT_TRUE(raylet_clients_[0]->GrantPrepareBundleResources());
ASSERT_TRUE(raylet_clients_[0]->GrantCancelResourceReserve());
ASSERT_TRUE(raylet_clients_[0]->GrantCancelResourceReserve()); ASSERT_TRUE(raylet_clients_[0]->GrantCancelResourceReserve());
ASSERT_TRUE(raylet_clients_[1]->GrantCancelResourceReserve());
WaitPlacementGroupPendingDone(1, GcsPlacementGroupStatus::FAILURE); WaitPlacementGroupPendingDone(1, GcsPlacementGroupStatus::FAILURE);
} }
TEST_F(GcsPlacementGroupSchedulerTest, PlacementGroupCancelledDuringCommit) { TEST_F(GcsPlacementGroupSchedulerTest, PlacementGroupCancelledDuringCommit) {
auto node0 = Mocker::GenNodeInfo(0); auto node = Mocker::GenNodeInfo();
auto node1 = Mocker::GenNodeInfo(1); AddNode(node);
AddNode(node0); ASSERT_EQ(1, gcs_node_manager_->GetAllAliveNodes().size());
AddNode(node1);
ASSERT_EQ(2, gcs_node_manager_->GetAllAliveNodes().size());
auto create_placement_group_request = Mocker::GenCreatePlacementGroupRequest(); auto create_placement_group_request = Mocker::GenCreatePlacementGroupRequest();
auto placement_group = auto placement_group =
@ -533,14 +535,13 @@ TEST_F(GcsPlacementGroupSchedulerTest, PlacementGroupCancelledDuringCommit) {
// Now, cancel the schedule request. // Now, cancel the schedule request.
ASSERT_TRUE(raylet_clients_[0]->GrantPrepareBundleResources()); ASSERT_TRUE(raylet_clients_[0]->GrantPrepareBundleResources());
ASSERT_TRUE(raylet_clients_[1]->GrantPrepareBundleResources()); ASSERT_TRUE(raylet_clients_[0]->GrantPrepareBundleResources());
scheduler_->MarkScheduleCancelled(placement_group_id); scheduler_->MarkScheduleCancelled(placement_group_id);
WaitPendingDone(raylet_clients_[0]->commit_callbacks, 1); WaitPendingDone(raylet_clients_[0]->commit_callbacks, 2);
WaitPendingDone(raylet_clients_[1]->commit_callbacks, 1); ASSERT_TRUE(raylet_clients_[0]->GrantCommitBundleResources());
ASSERT_TRUE(raylet_clients_[0]->GrantCommitBundleResources()); ASSERT_TRUE(raylet_clients_[0]->GrantCommitBundleResources());
ASSERT_TRUE(raylet_clients_[1]->GrantCommitBundleResources());
ASSERT_TRUE(raylet_clients_[0]->GrantCancelResourceReserve()); ASSERT_TRUE(raylet_clients_[0]->GrantCancelResourceReserve());
ASSERT_TRUE(raylet_clients_[1]->GrantCancelResourceReserve()); ASSERT_TRUE(raylet_clients_[0]->GrantCancelResourceReserve());
WaitPlacementGroupPendingDone(1, GcsPlacementGroupStatus::FAILURE); WaitPlacementGroupPendingDone(1, GcsPlacementGroupStatus::FAILURE);
} }
@ -567,16 +568,19 @@ TEST_F(GcsPlacementGroupSchedulerTest, TestPackStrategyLargeBundlesScheduling) {
Mocker::GenCreatePlacementGroupRequest("", rpc::PlacementStrategy::PACK, 15); Mocker::GenCreatePlacementGroupRequest("", rpc::PlacementStrategy::PACK, 15);
auto placement_group = std::make_shared<gcs::GcsPlacementGroup>(request, ""); auto placement_group = std::make_shared<gcs::GcsPlacementGroup>(request, "");
scheduler_->ScheduleUnplacedBundles(placement_group, failure_handler, success_handler); scheduler_->ScheduleUnplacedBundles(placement_group, failure_handler, success_handler);
// Prepared resource is batched! RAY_CHECK(raylet_clients_[0]->num_lease_requested > 0);
ASSERT_TRUE(raylet_clients_[0]->num_lease_requested == 1); RAY_CHECK(raylet_clients_[1]->num_lease_requested > 0);
ASSERT_TRUE(raylet_clients_[1]->num_lease_requested == 1); for (int index = 0; index < raylet_clients_[0]->num_lease_requested; ++index) {
ASSERT_TRUE(raylet_clients_[0]->GrantPrepareBundleResources()); ASSERT_TRUE(raylet_clients_[0]->GrantPrepareBundleResources());
ASSERT_TRUE(raylet_clients_[1]->GrantPrepareBundleResources()); }
for (int index = 0; index < raylet_clients_[1]->num_lease_requested; ++index) {
ASSERT_TRUE(raylet_clients_[1]->GrantPrepareBundleResources());
}
// Wait until all resources are prepared. // Wait until all resources are prepared.
WaitPendingDone(raylet_clients_[0]->commit_callbacks, WaitPendingDone(raylet_clients_[0]->commit_callbacks,
raylet_clients_[0]->num_prepared_bundle); raylet_clients_[0]->num_lease_requested);
WaitPendingDone(raylet_clients_[1]->commit_callbacks, WaitPendingDone(raylet_clients_[1]->commit_callbacks,
raylet_clients_[1]->num_prepared_bundle); raylet_clients_[1]->num_lease_requested);
for (int index = 0; index < raylet_clients_[0]->num_commit_requested; ++index) { for (int index = 0; index < raylet_clients_[0]->num_commit_requested; ++index) {
ASSERT_TRUE(raylet_clients_[0]->GrantCommitBundleResources()); ASSERT_TRUE(raylet_clients_[0]->GrantCommitBundleResources());
} }

View file

@ -175,11 +175,10 @@ struct GcsServerMocker {
/// ResourceReserveInterface /// ResourceReserveInterface
void PrepareBundleResources( void PrepareBundleResources(
const std::vector<std::shared_ptr<const BundleSpecification>> &bundle_specs, const BundleSpecification &bundle_spec,
const ray::rpc::ClientCallback<ray::rpc::PrepareBundleResourcesReply> &callback) const ray::rpc::ClientCallback<ray::rpc::PrepareBundleResourcesReply> &callback)
override { override {
num_lease_requested += 1; num_lease_requested += 1;
num_prepared_bundle = bundle_specs.size();
lease_callbacks.push_back(callback); lease_callbacks.push_back(callback);
} }
@ -300,8 +299,6 @@ struct GcsServerMocker {
int num_lease_requested = 0; int num_lease_requested = 0;
int num_return_requested = 0; int num_return_requested = 0;
int num_commit_requested = 0; int num_commit_requested = 0;
// TODO(@clay4444): Remove this once we make the commit rpc request batched!
int num_prepared_bundle = 0;
int num_release_unused_bundles_requested = 0; int num_release_unused_bundles_requested = 0;
std::list<rpc::ClientCallback<rpc::PrepareBundleResourcesReply>> lease_callbacks = {}; std::list<rpc::ClientCallback<rpc::PrepareBundleResourcesReply>> lease_callbacks = {};

View file

@ -83,26 +83,6 @@ struct Mocker {
return request; return request;
} }
static std::vector<std::shared_ptr<const BundleSpecification>> GenBundleSpecifications(
const PlacementGroupID &placement_group_id,
absl::flat_hash_map<std::string, double> &unit_resource, int bundles_size = 1) {
std::vector<std::shared_ptr<const BundleSpecification>> bundle_specs;
for (int i = 0; i < bundles_size; i++) {
rpc::Bundle bundle;
auto mutable_bundle_id = bundle.mutable_bundle_id();
// The bundle index is start from 1.
mutable_bundle_id->set_bundle_index(i + 1);
mutable_bundle_id->set_placement_group_id(placement_group_id.Binary());
auto mutable_unit_resources = bundle.mutable_unit_resources();
for (auto &resource : unit_resource) {
mutable_unit_resources->insert({resource.first, resource.second});
}
bundle_specs.emplace_back(std::make_shared<BundleSpecification>(bundle));
}
return bundle_specs;
}
// TODO(@clay4444): Remove this once we did the batch rpc request refactor.
static BundleSpecification GenBundleCreation( static BundleSpecification GenBundleCreation(
const PlacementGroupID &placement_group_id, const int bundle_index, const PlacementGroupID &placement_group_id, const int bundle_index,
absl::flat_hash_map<std::string, double> &unit_resource) { absl::flat_hash_map<std::string, double> &unit_resource) {

View file

@ -73,8 +73,8 @@ message RequestWorkerLeaseReply {
} }
message PrepareBundleResourcesRequest { message PrepareBundleResourcesRequest {
// Bundles that containing the requested resources. // Bundle containing the requested resources.
repeated Bundle bundle_specs = 1; Bundle bundle_spec = 1;
} }
message PrepareBundleResourcesReply { message PrepareBundleResourcesReply {

View file

@ -1620,14 +1620,11 @@ void NodeManager::HandleRequestWorkerLease(const rpc::RequestWorkerLeaseRequest
void NodeManager::HandlePrepareBundleResources( void NodeManager::HandlePrepareBundleResources(
const rpc::PrepareBundleResourcesRequest &request, const rpc::PrepareBundleResourcesRequest &request,
rpc::PrepareBundleResourcesReply *reply, rpc::SendReplyCallback send_reply_callback) { rpc::PrepareBundleResourcesReply *reply, rpc::SendReplyCallback send_reply_callback) {
std::vector<std::shared_ptr<const BundleSpecification>> bundle_specs; auto bundle_spec = BundleSpecification(request.bundle_spec());
for (int index = 0; index < request.bundle_specs_size(); index++) { RAY_LOG(DEBUG) << "Request to prepare bundle resources is received, "
bundle_specs.emplace_back( << bundle_spec.DebugString();
std::make_shared<BundleSpecification>(request.bundle_specs(index)));
} auto prepared = placement_group_resource_manager_->PrepareBundle(bundle_spec);
RAY_LOG(DEBUG) << "Request to prepare resources for bundles: "
<< GetDebugStringForBundles(bundle_specs);
auto prepared = placement_group_resource_manager_->PrepareBundles(bundle_specs);
reply->set_success(prepared); reply->set_success(prepared);
send_reply_callback(Status::OK(), nullptr, nullptr); send_reply_callback(Status::OK(), nullptr, nullptr);
} }

View file

@ -78,31 +78,6 @@ bool NewPlacementGroupResourceManager::PrepareBundle(
return true; return true;
} }
bool NewPlacementGroupResourceManager::PrepareBundles(
const std::vector<std::shared_ptr<const BundleSpecification>> &bundle_specs) {
std::vector<std::shared_ptr<const BundleSpecification>> prepared_bundles;
for (const auto &bundle_spec : bundle_specs) {
if (PrepareBundle(*bundle_spec)) {
prepared_bundles.emplace_back(bundle_spec);
} else {
// Terminate the preparation phase if any of bundle cannot be prepared.
break;
}
}
if (prepared_bundles.size() != bundle_specs.size()) {
RAY_LOG(DEBUG) << "There are one or more bundles request resource failed, will "
"release the requested resources before.";
for (const auto &bundle : prepared_bundles) {
// `ReturnBundle` will return resource, erase from `pg_bundles_` and
// `bundle_spec_map_`.
ReturnBundle(*bundle);
}
return false;
}
return true;
}
void NewPlacementGroupResourceManager::CommitBundle( void NewPlacementGroupResourceManager::CommitBundle(
const BundleSpecification &bundle_spec) { const BundleSpecification &bundle_spec) {
auto it = pg_bundles_.find(bundle_spec.BundleId()); auto it = pg_bundles_.find(bundle_spec.BundleId());
@ -185,11 +160,6 @@ void NewPlacementGroupResourceManager::ReturnBundle(
} }
} }
pg_bundles_.erase(it); pg_bundles_.erase(it);
// Erase from `bundle_spec_map_`.
const auto &iter = bundle_spec_map_.find(bundle_spec.BundleId());
if (iter != bundle_spec_map_.end()) {
bundle_spec_map_.erase(iter);
}
delete_resources_(deleted); delete_resources_(deleted);
} }

View file

@ -50,29 +50,26 @@ struct BundleTransactionState {
/// about allocated for placement group bundles. /// about allocated for placement group bundles.
class PlacementGroupResourceManager { class PlacementGroupResourceManager {
public: public:
/// Prepare a list of bundles. It is guaranteed that all bundles are atomically /// Lock the required resources from local available resources. Note that this is phase
/// prepared. /// one of 2PC, it will not convert placement group resource(like CPU -> CPU_group_i).
///(e.g., if one of bundle cannot be prepared, all bundles are failed to be prepared)
/// ///
/// \param bundle_specs A set of bundles that waiting to be prepared. /// \param bundle_spec: Specification of bundle whose resources will be prepared.
/// \return bool True if all bundles successfully reserved resources, otherwise false. virtual bool PrepareBundle(const BundleSpecification &bundle_spec) = 0;
virtual bool PrepareBundles(
const std::vector<std::shared_ptr<const BundleSpecification>> &bundle_specs) = 0;
/// Convert the required resources to placement group resources(like CPU -> /// Convert the required resources to placement group resources(like CPU ->
/// CPU_group_i). This is phase two of 2PC. /// CPU_group_i). This is phase two of 2PC.
/// ///
/// \param bundle_spec Specification of bundle whose resources will be commited. /// \param bundle_spec: Specification of bundle whose resources will be commited.
virtual void CommitBundle(const BundleSpecification &bundle_spec) = 0; virtual void CommitBundle(const BundleSpecification &bundle_spec) = 0;
/// Return back all the bundle resource. /// Return back all the bundle resource.
/// ///
/// \param bundle_spec Specification of bundle whose resources will be returned. /// \param bundle_spec: Specification of bundle whose resources will be returned.
virtual void ReturnBundle(const BundleSpecification &bundle_spec) = 0; virtual void ReturnBundle(const BundleSpecification &bundle_spec) = 0;
/// Return back all the bundle(which is unused) resource. /// Return back all the bundle(which is unused) resource.
/// ///
/// \param bundle_spec A set of bundles which in use. /// \param bundle_spec: A set of bundles which in use.
void ReturnUnusedBundle(const std::unordered_set<BundleID, pair_hash> &in_use_bundles); void ReturnUnusedBundle(const std::unordered_set<BundleID, pair_hash> &in_use_bundles);
virtual ~PlacementGroupResourceManager() {} virtual ~PlacementGroupResourceManager() {}
@ -101,8 +98,7 @@ class NewPlacementGroupResourceManager : public PlacementGroupResourceManager {
virtual ~NewPlacementGroupResourceManager() = default; virtual ~NewPlacementGroupResourceManager() = default;
bool PrepareBundles( bool PrepareBundle(const BundleSpecification &bundle_spec);
const std::vector<std::shared_ptr<const BundleSpecification>> &bundle_specs);
void CommitBundle(const BundleSpecification &bundle_spec); void CommitBundle(const BundleSpecification &bundle_spec);
@ -126,13 +122,6 @@ class NewPlacementGroupResourceManager : public PlacementGroupResourceManager {
/// truth for the new scheduler. /// truth for the new scheduler.
absl::flat_hash_map<BundleID, std::shared_ptr<BundleTransactionState>, pair_hash> absl::flat_hash_map<BundleID, std::shared_ptr<BundleTransactionState>, pair_hash>
pg_bundles_; pg_bundles_;
/// Lock the required resources from local available resources. Note that this is phase
/// one of 2PC, it will not convert placement group resource(like CPU -> CPU_group_i).
///
/// \param bundle_spec Specification of bundle whose resources will be prepared.
/// \return bool True if the bundle successfully reserved resources, otherwise false.
bool PrepareBundle(const BundleSpecification &bundle_spec);
}; };
} // namespace raylet } // namespace raylet

View file

@ -31,7 +31,6 @@ class NewPlacementGroupResourceManagerTest : public ::testing::Test {
public: public:
std::unique_ptr<raylet::NewPlacementGroupResourceManager> std::unique_ptr<raylet::NewPlacementGroupResourceManager>
new_placement_group_resource_manager_; new_placement_group_resource_manager_;
std::shared_ptr<ClusterResourceScheduler> cluster_resource_scheduler_;
std::unique_ptr<gcs::MockGcsClient> gcs_client_; std::unique_ptr<gcs::MockGcsClient> gcs_client_;
rpc::GcsNodeInfo node_info_; rpc::GcsNodeInfo node_info_;
void SetUp() { void SetUp() {
@ -41,7 +40,7 @@ class NewPlacementGroupResourceManagerTest : public ::testing::Test {
} }
void InitLocalAvailableResource( void InitLocalAvailableResource(
absl::flat_hash_map<std::string, double> &unit_resource) { absl::flat_hash_map<std::string, double> &unit_resource) {
cluster_resource_scheduler_ = auto cluster_resource_scheduler_ =
std::make_shared<ClusterResourceScheduler>("local", unit_resource, *gcs_client_); std::make_shared<ClusterResourceScheduler>("local", unit_resource, *gcs_client_);
new_placement_group_resource_manager_.reset( new_placement_group_resource_manager_.reset(
new raylet::NewPlacementGroupResourceManager( new raylet::NewPlacementGroupResourceManager(
@ -55,23 +54,18 @@ class NewPlacementGroupResourceManagerTest : public ::testing::Test {
} }
void CheckAvailableResoueceEmpty(const std::string &resource) { void CheckAvailableResoueceEmpty(const std::string &resource) {
const auto cluster_resource_scheduler_ =
new_placement_group_resource_manager_->GetResourceScheduler();
ASSERT_TRUE(cluster_resource_scheduler_->IsAvailableResourceEmpty(resource)); ASSERT_TRUE(cluster_resource_scheduler_->IsAvailableResourceEmpty(resource));
} }
void CheckRemainingResourceCorrect(NodeResources &node_resources) { void CheckRemainingResourceCorrect(NodeResources &node_resources) {
const auto cluster_resource_scheduler_ =
new_placement_group_resource_manager_->GetResourceScheduler();
auto local_node_resource = cluster_resource_scheduler_->GetLocalNodeResources(); auto local_node_resource = cluster_resource_scheduler_->GetLocalNodeResources();
ASSERT_TRUE(local_node_resource == node_resources); ASSERT_TRUE(local_node_resource == node_resources);
} }
// TODO(@clay4444): Remove this once we did the batch rpc request refactor!
std::vector<std::shared_ptr<const BundleSpecification>> ConvertSingleSpecToVectorPtrs(
BundleSpecification bundle_spec) {
std::vector<std::shared_ptr<const BundleSpecification>> bundle_specs;
bundle_specs.push_back(
std::make_shared<const BundleSpecification>(std::move(bundle_spec)));
return bundle_specs;
}
bool update_called_ = false; bool update_called_ = false;
bool delete_called_ = false; bool delete_called_ = false;
}; };
@ -85,8 +79,7 @@ TEST_F(NewPlacementGroupResourceManagerTest, TestNewPrepareBundleResource) {
/// 2. init local available resource. /// 2. init local available resource.
InitLocalAvailableResource(unit_resource); InitLocalAvailableResource(unit_resource);
/// 3. prepare bundle resource. /// 3. prepare bundle resource.
ASSERT_TRUE(new_placement_group_resource_manager_->PrepareBundles( ASSERT_TRUE(new_placement_group_resource_manager_->PrepareBundle(bundle_spec));
ConvertSingleSpecToVectorPtrs(bundle_spec)));
/// 4. check remaining resources is correct. /// 4. check remaining resources is correct.
CheckAvailableResoueceEmpty("CPU"); CheckAvailableResoueceEmpty("CPU");
} }
@ -103,8 +96,7 @@ TEST_F(NewPlacementGroupResourceManagerTest,
init_unit_resource.insert({"CPU", 1.0}); init_unit_resource.insert({"CPU", 1.0});
InitLocalAvailableResource(init_unit_resource); InitLocalAvailableResource(init_unit_resource);
/// 3. prepare bundle resource. /// 3. prepare bundle resource.
ASSERT_FALSE(new_placement_group_resource_manager_->PrepareBundles( ASSERT_FALSE(new_placement_group_resource_manager_->PrepareBundle(bundle_spec));
ConvertSingleSpecToVectorPtrs(bundle_spec)));
} }
TEST_F(NewPlacementGroupResourceManagerTest, TestNewCommitBundleResource) { TEST_F(NewPlacementGroupResourceManagerTest, TestNewCommitBundleResource) {
@ -116,8 +108,7 @@ TEST_F(NewPlacementGroupResourceManagerTest, TestNewCommitBundleResource) {
/// 2. init local available resource. /// 2. init local available resource.
InitLocalAvailableResource(unit_resource); InitLocalAvailableResource(unit_resource);
/// 3. prepare and commit bundle resource. /// 3. prepare and commit bundle resource.
ASSERT_TRUE(new_placement_group_resource_manager_->PrepareBundles( ASSERT_TRUE(new_placement_group_resource_manager_->PrepareBundle(bundle_spec));
ConvertSingleSpecToVectorPtrs(bundle_spec)));
ASSERT_FALSE(update_called_); ASSERT_FALSE(update_called_);
new_placement_group_resource_manager_->CommitBundle(bundle_spec); new_placement_group_resource_manager_->CommitBundle(bundle_spec);
ASSERT_TRUE(update_called_); ASSERT_TRUE(update_called_);
@ -148,8 +139,7 @@ TEST_F(NewPlacementGroupResourceManagerTest, TestNewReturnBundleResource) {
/// 2. init local available resource. /// 2. init local available resource.
InitLocalAvailableResource(unit_resource); InitLocalAvailableResource(unit_resource);
/// 3. prepare and commit bundle resource. /// 3. prepare and commit bundle resource.
ASSERT_TRUE(new_placement_group_resource_manager_->PrepareBundles( ASSERT_TRUE(new_placement_group_resource_manager_->PrepareBundle(bundle_spec));
ConvertSingleSpecToVectorPtrs(bundle_spec)));
ASSERT_FALSE(update_called_); ASSERT_FALSE(update_called_);
new_placement_group_resource_manager_->CommitBundle(bundle_spec); new_placement_group_resource_manager_->CommitBundle(bundle_spec);
ASSERT_TRUE(update_called_); ASSERT_TRUE(update_called_);
@ -177,10 +167,8 @@ TEST_F(NewPlacementGroupResourceManagerTest, TestNewMultipleBundlesCommitAndRetu
init_unit_resource.insert({"CPU", 2.0}); init_unit_resource.insert({"CPU", 2.0});
InitLocalAvailableResource(init_unit_resource); InitLocalAvailableResource(init_unit_resource);
/// 3. prepare and commit two bundle resource. /// 3. prepare and commit two bundle resource.
ASSERT_TRUE(new_placement_group_resource_manager_->PrepareBundles( ASSERT_TRUE(new_placement_group_resource_manager_->PrepareBundle(first_bundle_spec));
ConvertSingleSpecToVectorPtrs(first_bundle_spec))); ASSERT_TRUE(new_placement_group_resource_manager_->PrepareBundle(second_bundle_spec));
ASSERT_TRUE(new_placement_group_resource_manager_->PrepareBundles(
ConvertSingleSpecToVectorPtrs(second_bundle_spec)));
ASSERT_FALSE(update_called_); ASSERT_FALSE(update_called_);
ASSERT_FALSE(delete_called_); ASSERT_FALSE(delete_called_);
new_placement_group_resource_manager_->CommitBundle(first_bundle_spec); new_placement_group_resource_manager_->CommitBundle(first_bundle_spec);
@ -251,8 +239,7 @@ TEST_F(NewPlacementGroupResourceManagerTest, TestNewIdempotencyWithMultiPrepare)
InitLocalAvailableResource(available_resource); InitLocalAvailableResource(available_resource);
/// 3. prepare bundle resource 10 times. /// 3. prepare bundle resource 10 times.
for (int i = 0; i < 10; i++) { for (int i = 0; i < 10; i++) {
new_placement_group_resource_manager_->PrepareBundles( new_placement_group_resource_manager_->PrepareBundle(bundle_spec);
ConvertSingleSpecToVectorPtrs(bundle_spec));
} }
/// 4. check remaining resources is correct. /// 4. check remaining resources is correct.
absl::flat_hash_map<std::string, double> remaining_resources = {{"CPU", 3.0}}; absl::flat_hash_map<std::string, double> remaining_resources = {{"CPU", 3.0}};
@ -278,11 +265,9 @@ TEST_F(NewPlacementGroupResourceManagerTest, TestNewIdempotencyWithRandomOrder)
std::make_pair("CPU", 3.0)}; std::make_pair("CPU", 3.0)};
InitLocalAvailableResource(available_resource); InitLocalAvailableResource(available_resource);
/// 3. prepare bundle -> commit bundle -> prepare bundle. /// 3. prepare bundle -> commit bundle -> prepare bundle.
ASSERT_TRUE(new_placement_group_resource_manager_->PrepareBundles( ASSERT_TRUE(new_placement_group_resource_manager_->PrepareBundle(bundle_spec));
ConvertSingleSpecToVectorPtrs(bundle_spec)));
new_placement_group_resource_manager_->CommitBundle(bundle_spec); new_placement_group_resource_manager_->CommitBundle(bundle_spec);
ASSERT_TRUE(new_placement_group_resource_manager_->PrepareBundles( ASSERT_TRUE(new_placement_group_resource_manager_->PrepareBundle(bundle_spec));
ConvertSingleSpecToVectorPtrs(bundle_spec)));
/// 4. check remaining resources is correct. /// 4. check remaining resources is correct.
absl::flat_hash_map<std::string, double> remaining_resources = { absl::flat_hash_map<std::string, double> remaining_resources = {
{"CPU_group_" + group_id.Hex(), 1.0}, {"CPU_group_" + group_id.Hex(), 1.0},
@ -301,16 +286,14 @@ TEST_F(NewPlacementGroupResourceManagerTest, TestNewIdempotencyWithRandomOrder)
CheckRemainingResourceCorrect(remaining_resource_instance); CheckRemainingResourceCorrect(remaining_resource_instance);
new_placement_group_resource_manager_->ReturnBundle(bundle_spec); new_placement_group_resource_manager_->ReturnBundle(bundle_spec);
// 5. prepare bundle -> commit bundle -> commit bundle. // 5. prepare bundle -> commit bundle -> commit bundle.
ASSERT_TRUE(new_placement_group_resource_manager_->PrepareBundles( ASSERT_TRUE(new_placement_group_resource_manager_->PrepareBundle(bundle_spec));
ConvertSingleSpecToVectorPtrs(bundle_spec)));
new_placement_group_resource_manager_->CommitBundle(bundle_spec); new_placement_group_resource_manager_->CommitBundle(bundle_spec);
new_placement_group_resource_manager_->CommitBundle(bundle_spec); new_placement_group_resource_manager_->CommitBundle(bundle_spec);
// 6. check remaining resources is correct. // 6. check remaining resources is correct.
CheckRemainingResourceCorrect(remaining_resource_instance); CheckRemainingResourceCorrect(remaining_resource_instance);
new_placement_group_resource_manager_->ReturnBundle(bundle_spec); new_placement_group_resource_manager_->ReturnBundle(bundle_spec);
// 7. prepare bundle -> return bundle -> commit bundle. // 7. prepare bundle -> return bundle -> commit bundle.
ASSERT_TRUE(new_placement_group_resource_manager_->PrepareBundles( ASSERT_TRUE(new_placement_group_resource_manager_->PrepareBundle(bundle_spec));
ConvertSingleSpecToVectorPtrs(bundle_spec)));
new_placement_group_resource_manager_->ReturnBundle(bundle_spec); new_placement_group_resource_manager_->ReturnBundle(bundle_spec);
new_placement_group_resource_manager_->CommitBundle(bundle_spec); new_placement_group_resource_manager_->CommitBundle(bundle_spec);
// 8. check remaining resources is correct. // 8. check remaining resources is correct.
@ -320,63 +303,6 @@ TEST_F(NewPlacementGroupResourceManagerTest, TestNewIdempotencyWithRandomOrder)
CheckRemainingResourceCorrect(remaining_resource_instance); CheckRemainingResourceCorrect(remaining_resource_instance);
} }
TEST_F(NewPlacementGroupResourceManagerTest, TestPreparedResourceBatched) {
// 1. create a placement group spec with 4 bundles and each required 1 CPU.
auto group_id = PlacementGroupID::FromRandom();
absl::flat_hash_map<std::string, double> unit_resource;
unit_resource.insert({"CPU", 1.0});
auto bundle_specs = Mocker::GenBundleSpecifications(group_id, unit_resource, 4);
// 2. init local available resource with 3 CPUs.
absl::flat_hash_map<std::string, double> available_resource = {
std::make_pair("CPU", 3.0)};
InitLocalAvailableResource(available_resource);
// 3. prepare resources for the four bundles.
ASSERT_FALSE(new_placement_group_resource_manager_->PrepareBundles(bundle_specs));
// make sure it keeps Idempotency.
ASSERT_FALSE(new_placement_group_resource_manager_->PrepareBundles(bundle_specs));
// 4. check remaining resources is correct.
absl::flat_hash_map<std::string, double> remaining_resources = {{"CPU", 3.0}};
auto remaining_resource_scheduler = std::make_shared<ClusterResourceScheduler>(
"remaining", remaining_resources, *gcs_client_);
auto remaining_resource_instance =
remaining_resource_scheduler->GetLocalNodeResources();
CheckRemainingResourceCorrect(remaining_resource_instance);
// 5. re-init the local available resource with 4 CPUs.
available_resource = {std::make_pair("CPU", 4.0)};
InitLocalAvailableResource(available_resource);
// 6. re-prepare resources for the four bundles, but this time it should be
// successfully.
ASSERT_TRUE(new_placement_group_resource_manager_->PrepareBundles(bundle_specs));
ASSERT_TRUE(new_placement_group_resource_manager_->PrepareBundles(bundle_specs));
for (const auto &bundle_spec : bundle_specs) {
new_placement_group_resource_manager_->CommitBundle(*bundle_spec);
}
// 7. re-check remaining resources is correct.
remaining_resources = {{"CPU_group_" + group_id.Hex(), 4.0},
{"CPU_group_1_" + group_id.Hex(), 1.0},
{"CPU_group_2_" + group_id.Hex(), 1.0},
{"CPU_group_3_" + group_id.Hex(), 1.0},
{"CPU_group_4_" + group_id.Hex(), 1.0},
{"CPU", 4.0},
{"bundle_group_1_" + group_id.Hex(), 1000},
{"bundle_group_2_" + group_id.Hex(), 1000},
{"bundle_group_3_" + group_id.Hex(), 1000},
{"bundle_group_4_" + group_id.Hex(), 1000},
{"bundle_group_" + group_id.Hex(), 4000}};
remaining_resource_scheduler = std::make_shared<ClusterResourceScheduler>(
"remaining", remaining_resources, *gcs_client_);
std::shared_ptr<TaskResourceInstances> resource_instances =
std::make_shared<TaskResourceInstances>();
absl::flat_hash_map<std::string, double> allocating_resource;
allocating_resource.insert({"CPU", 4.0});
ASSERT_TRUE(remaining_resource_scheduler->AllocateLocalTaskResources(
allocating_resource, resource_instances));
remaining_resource_instance = remaining_resource_scheduler->GetLocalNodeResources();
RAY_LOG(INFO) << "The current local resource view: "
<< cluster_resource_scheduler_->DebugString();
CheckRemainingResourceCorrect(remaining_resource_instance);
}
} // namespace ray } // namespace ray
int main(int argc, char **argv) { int main(int argc, char **argv) {

View file

@ -373,16 +373,10 @@ void raylet::RayletClient::CancelWorkerLease(
} }
void raylet::RayletClient::PrepareBundleResources( void raylet::RayletClient::PrepareBundleResources(
const std::vector<std::shared_ptr<const BundleSpecification>> &bundle_specs, const BundleSpecification &bundle_spec,
const ray::rpc::ClientCallback<ray::rpc::PrepareBundleResourcesReply> &callback) { const ray::rpc::ClientCallback<ray::rpc::PrepareBundleResourcesReply> &callback) {
rpc::PrepareBundleResourcesRequest request; rpc::PrepareBundleResourcesRequest request;
std::set<std::string> nodes; request.mutable_bundle_spec()->CopyFrom(bundle_spec.GetMessage());
for (const auto &bundle_spec : bundle_specs) {
nodes.insert(bundle_spec->NodeId().Hex());
auto message_bundle = request.add_bundle_specs();
message_bundle->CopyFrom(bundle_spec->GetMessage());
}
RAY_CHECK(nodes.size() == 1);
grpc_client_->PrepareBundleResources(request, callback); grpc_client_->PrepareBundleResources(request, callback);
} }

View file

@ -109,13 +109,14 @@ class WorkerLeaseInterface {
/// Interface for leasing resource. /// Interface for leasing resource.
class ResourceReserveInterface { class ResourceReserveInterface {
public: public:
/// Request a raylet to prepare resources of given bundles for atomic placement group /// Request a raylet to prepare resources of a given bundle for atomic placement group
/// creation. This is used for the first phase of atomic placement group creation. The /// creation. This is used for the first phase of atomic placement group creation. The
/// callback will be sent via gRPC. /// callback will be sent via gRPC.
/// \param bundle_specs Bundles to be scheduled at this raylet. /// \param resource_spec Resources that should be
/// allocated for the worker.
/// \return ray::Status /// \return ray::Status
virtual void PrepareBundleResources( virtual void PrepareBundleResources(
const std::vector<std::shared_ptr<const BundleSpecification>> &bundle_specs, const BundleSpecification &bundle_spec,
const ray::rpc::ClientCallback<ray::rpc::PrepareBundleResourcesReply> const ray::rpc::ClientCallback<ray::rpc::PrepareBundleResourcesReply>
&callback) = 0; &callback) = 0;
@ -400,7 +401,7 @@ class RayletClient : public RayletClientInterface {
/// Implements PrepareBundleResourcesInterface. /// Implements PrepareBundleResourcesInterface.
void PrepareBundleResources( void PrepareBundleResources(
const std::vector<std::shared_ptr<const BundleSpecification>> &bundle_specs, const BundleSpecification &bundle_spec,
const ray::rpc::ClientCallback<ray::rpc::PrepareBundleResourcesReply> &callback) const ray::rpc::ClientCallback<ray::rpc::PrepareBundleResourcesReply> &callback)
override; override;