[Placement Group] Add strict spread strategy (#10174)

* support STRICT_SPREAD strategy

* fix review comments

* rebase master

* fix lint error

* fix lint error

Co-authored-by: 灵洵 <fengbin.ffb@antfin.com>
fangfengbin 2020-08-21 01:18:58 +08:00 committed by GitHub
parent 224933b5e4
commit a462ae2747
11 changed files with 278 additions and 63 deletions

View file

@@ -8,14 +8,22 @@ public enum PlacementStrategy {
  /**
   * Packs Bundles close together inside nodes as tightly as possible.
   */
  PACK(0),
  /**
   * Places Bundles across distinct nodes as evenly as possible.
   */
  SPREAD(1),
  /**
   * Packs Bundles into one node. The group is not allowed to span multiple nodes.
   */
-  STRICT_PACK(2);
  STRICT_PACK(2),
  /**
   * Places Bundles across distinct nodes.
   * The group is not allowed to deploy more than one bundle on a node.
   */
  STRICT_SPREAD(3);
  private int value = 0;

View file

@@ -70,6 +70,7 @@ from ray.includes.common cimport (
PLACEMENT_STRATEGY_PACK,
PLACEMENT_STRATEGY_SPREAD,
PLACEMENT_STRATEGY_STRICT_PACK,
PLACEMENT_STRATEGY_STRICT_SPREAD,
)
from ray.includes.unique_ids cimport (
CActorID,
@@ -1064,9 +1065,11 @@ cdef class CoreWorker:
            c_strategy = PLACEMENT_STRATEGY_PACK
        elif strategy == b"SPREAD":
            c_strategy = PLACEMENT_STRATEGY_SPREAD
        elif strategy == b"STRICT_PACK":
            c_strategy = PLACEMENT_STRATEGY_STRICT_PACK
        else:
-           if strategy == b"STRICT_PACK":
-               c_strategy = PLACEMENT_STRATEGY_STRICT_PACK
            if strategy == b"STRICT_SPREAD":
                c_strategy = PLACEMENT_STRATEGY_STRICT_SPREAD
            else:
                raise TypeError(strategy)
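
As the strategy set grows, this if/elif chain could also be a table-driven lookup. Below is a minimal sketch of that alternative in plain Python; it is not part of this commit, and the PLACEMENT_STRATEGY_* names are stand-ins for the Cython constants imported above, using the integer values assigned in common.proto:

    # Stand-ins for the enum constants; common.proto assigns these values.
    PLACEMENT_STRATEGY_PACK = 0
    PLACEMENT_STRATEGY_SPREAD = 1
    PLACEMENT_STRATEGY_STRICT_PACK = 2
    PLACEMENT_STRATEGY_STRICT_SPREAD = 3

    STRATEGY_BY_NAME = {
        b"PACK": PLACEMENT_STRATEGY_PACK,
        b"SPREAD": PLACEMENT_STRATEGY_SPREAD,
        b"STRICT_PACK": PLACEMENT_STRATEGY_STRICT_PACK,
        b"STRICT_SPREAD": PLACEMENT_STRATEGY_STRICT_SPREAD,
    }

    def to_c_strategy(strategy: bytes) -> int:
        try:
            return STRATEGY_BY_NAME[strategy]
        except KeyError:
            # Unknown strategy names fail the same way the elif chain does.
            raise TypeError(strategy)
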

View file

@@ -19,6 +19,7 @@ def placement_group(bundles: List[Dict[str, float]],
            PACK: Packs Bundles into as few nodes as possible.
            SPREAD: Places Bundles across distinct nodes as evenly as possible.
            STRICT_PACK: Packs Bundles into one node.
                The group is not allowed to span multiple nodes.
            STRICT_SPREAD: Places Bundles across distinct nodes.
                The group is not allowed to deploy more than one bundle on a node.
        name: The name of the placement group.
    """

View file

@@ -176,6 +176,8 @@ cdef extern from "src/ray/protobuf/common.pb.h" nogil:
"ray::PlacementStrategy::SPREAD"
cdef CPlacementStrategy PLACEMENT_STRATEGY_STRICT_PACK \
"ray::PlacementStrategy::STRICT_PACK"
cdef CPlacementStrategy PLACEMENT_STRATEGY_STRICT_SPREAD \
"ray::PlacementStrategy::STRICT_SPREAD"
cdef extern from "ray/common/task/scheduling_resources.h" nogil:
cdef cppclass ResourceSet "ray::ResourceSet":

View file

@@ -137,7 +137,7 @@ def test_placement_group_spread(ray_start_cluster):
    # Get all actors.
    actor_infos = ray.actors()
-    # Make sure all actors in counter_list are collocated in one node.
    # Make sure all actors in counter_list are located on separate nodes.
actor_info_1 = actor_infos.get(actor_1._actor_id.hex())
actor_info_2 = actor_infos.get(actor_2._actor_id.hex())
@@ -148,6 +148,63 @@ def test_placement_group_spread(ray_start_cluster):
assert node_of_actor_1 != node_of_actor_2


def test_placement_group_strict_spread(ray_start_cluster):
@ray.remote(num_cpus=2)
class Actor(object):
def __init__(self):
self.n = 0
def value(self):
return self.n
cluster = ray_start_cluster
num_nodes = 3
for _ in range(num_nodes):
cluster.add_node(num_cpus=4)
ray.init(address=cluster.address)
placement_group_id = ray.experimental.placement_group(
name="name",
strategy="STRICT_SPREAD",
bundles=[{
"CPU": 2
}, {
"CPU": 2
}, {
"CPU": 2
}])
actor_1 = Actor.options(
placement_group_id=placement_group_id,
placement_group_bundle_index=0).remote()
actor_2 = Actor.options(
placement_group_id=placement_group_id,
placement_group_bundle_index=1).remote()
actor_3 = Actor.options(
placement_group_id=placement_group_id,
placement_group_bundle_index=2).remote()
print(ray.get(actor_1.value.remote()))
print(ray.get(actor_2.value.remote()))
print(ray.get(actor_3.value.remote()))
# Get all actors.
actor_infos = ray.actors()
# Make sure all actors in counter_list are located on separate nodes.
actor_info_1 = actor_infos.get(actor_1._actor_id.hex())
actor_info_2 = actor_infos.get(actor_2._actor_id.hex())
actor_info_3 = actor_infos.get(actor_3._actor_id.hex())
assert actor_info_1 and actor_info_2 and actor_info_3
node_of_actor_1 = actor_info_1["Address"]["NodeID"]
node_of_actor_2 = actor_info_2["Address"]["NodeID"]
node_of_actor_3 = actor_info_3["Address"]["NodeID"]
assert node_of_actor_1 != node_of_actor_2
assert node_of_actor_1 != node_of_actor_3
assert node_of_actor_2 != node_of_actor_3


def test_placement_group_actor_resource_ids(ray_start_cluster):
@ray.remote(num_cpus=1)
class F:

View file

@@ -166,8 +166,10 @@ inline ray::PlacementStrategy ConvertStrategy(jint java_strategy) {
    return ray::rpc::PACK;
  case 1:
    return ray::rpc::SPREAD;
-  default:
  case 2:
    return ray::rpc::STRICT_PACK;
  default:
    return ray::rpc::STRICT_SPREAD;
  }
}

View file

@@ -32,6 +32,7 @@ GcsPlacementGroupScheduler::GcsPlacementGroupScheduler(
scheduler_strategies_.push_back(std::make_shared<GcsPackStrategy>());
scheduler_strategies_.push_back(std::make_shared<GcsSpreadStrategy>());
scheduler_strategies_.push_back(std::make_shared<GcsStrictPackStrategy>());
scheduler_strategies_.push_back(std::make_shared<GcsStrictSpreadStrategy>());
}
ScheduleMap GcsStrictPackStrategy::Schedule(
@@ -97,19 +98,82 @@ ScheduleMap GcsPackStrategy::Schedule(

ScheduleMap GcsSpreadStrategy::Schedule(
    std::vector<std::shared_ptr<ray::BundleSpecification>> &bundles,
    const std::unique_ptr<ScheduleContext> &context) {
  // When selecting nodes, if we always traverse from the beginning of the node list,
  // a large number of bundles will be deployed to the first nodes. So we start from
  // the node after the last selected one.
  ScheduleMap schedule_map;
-  auto &alive_nodes = context->node_manager_.GetClusterRealtimeResources();
-  auto iter = alive_nodes.begin();
-  size_t index = 0;
-  size_t alive_nodes_size = alive_nodes.size();
-  for (; iter != alive_nodes.end(); iter++, index++) {
-    for (size_t base = 0;; base++) {
-      if (index + base * alive_nodes_size >= bundles.size()) {
-        break;
-      } else {
-        schedule_map[bundles[index + base * alive_nodes_size]->BundleId()] = iter->first;
-      }
-    }
-  }
  auto node_resources = context->node_manager_.GetClusterRealtimeResources();
  if (node_resources.empty()) {
    return schedule_map;
  }

  auto candidate_nodes = node_resources;
  auto iter = candidate_nodes.begin();
  auto iter_begin = iter;
  for (const auto &bundle : bundles) {
    const auto &required_resources = bundle->GetRequiredResources();
    // Search from the last selected position to the end of the node list.
    for (; iter != candidate_nodes.end(); ++iter) {
      if (required_resources.IsSubset(*iter->second)) {
        node_resources[iter->first]->SubtractResourcesStrict(required_resources);
        schedule_map[bundle->BundleId()] = iter->first;
        break;
      }
    }
    // Wrap around and search the nodes before the starting position.
    if (iter == candidate_nodes.end() && iter_begin != candidate_nodes.begin()) {
      for (iter = candidate_nodes.begin(); iter != iter_begin; ++iter) {
        if (required_resources.IsSubset(*iter->second)) {
          node_resources[iter->first]->SubtractResourcesStrict(required_resources);
          schedule_map[bundle->BundleId()] = iter->first;
          break;
        }
      }
      // No node can hold this bundle.
      if (iter == iter_begin) {
        break;
      }
    }
    iter_begin = ++iter;
  }

  // All-or-nothing: if any bundle could not be placed, fail the whole group.
  if (schedule_map.size() != bundles.size()) {
    schedule_map.clear();
  }
  return schedule_map;
}
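
The rewritten GcsSpreadStrategy walks the node list round-robin, resuming after the last selected node and wrapping around once before giving up. A rough Python rendering of that selection loop, where fits and subtract stand in for ResourceSet::IsSubset and SubtractResourcesStrict (these names are illustrative, not part of the commit):

    def spread_schedule(bundles, nodes, fits, subtract):
        """bundles: [(bundle_id, demand)]; nodes: {node_id: free_resources}."""
        node_ids = list(nodes)
        schedule, start = {}, 0
        for bundle_id, demand in bundles:
            placed = False
            # Probe each node once, starting just after the previous pick.
            for offset in range(len(node_ids)):
                node_id = node_ids[(start + offset) % len(node_ids)]
                if fits(demand, nodes[node_id]):
                    subtract(nodes[node_id], demand)
                    schedule[bundle_id] = node_id
                    start = (start + offset + 1) % len(node_ids)
                    placed = True
                    break
            if not placed:
                return {}  # all-or-nothing, like the C++ version
        return schedule
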
ScheduleMap GcsStrictSpreadStrategy::Schedule(
std::vector<std::shared_ptr<ray::BundleSpecification>> &bundles,
const std::unique_ptr<ScheduleContext> &context) {
// TODO(ffbin): A bundle may require special resources, such as GPUs. We need to
// schedule bundles with special resource requirements first; this will be
// implemented in a follow-up PR.
ScheduleMap schedule_map;
auto candidate_nodes = context->node_manager_.GetClusterRealtimeResources();
// If there are more bundles than nodes, scheduling fails.
if (bundles.size() > candidate_nodes.size()) {
return schedule_map;
}
for (const auto &bundle : bundles) {
const auto &required_resources = bundle->GetRequiredResources();
auto iter = candidate_nodes.begin();
for (; iter != candidate_nodes.end(); ++iter) {
if (required_resources.IsSubset(*iter->second)) {
schedule_map[bundle->BundleId()] = iter->first;
candidate_nodes.erase(iter);
break;
}
}
// No remaining node satisfies this bundle's resources; scheduling failed.
if (iter == candidate_nodes.end()) {
break;
}
}
if (schedule_map.size() != bundles.size()) {
schedule_map.clear();
}
return schedule_map;
}
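
For comparison, the strict-spread logic above reduces to a greedy one-pass assignment: pick a distinct node per bundle and fail the whole group if any bundle is left over. A minimal Python sketch under the same assumed fits helper (illustrative only):

    def strict_spread_schedule(bundles, nodes, fits):
        """bundles: [(bundle_id, demand)]; nodes: {node_id: free_resources}."""
        # More bundles than nodes can never satisfy one-bundle-per-node.
        if len(bundles) > len(nodes):
            return {}
        candidates = dict(nodes)
        schedule = {}
        for bundle_id, demand in bundles:
            chosen = next(
                (nid for nid, free in candidates.items() if fits(demand, free)),
                None)
            if chosen is None:
                return {}  # no remaining node satisfies this bundle
            schedule[bundle_id] = chosen
            del candidates[chosen]  # each node hosts at most one bundle
        return schedule
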

View file

@@ -121,6 +121,15 @@ class GcsStrictPackStrategy : public GcsScheduleStrategy {
const std::unique_ptr<ScheduleContext> &context) override;
};
/// The `GcsStrictSpreadStrategy` spreads all bundles across distinct nodes:
/// a node may host at most one bundle of the group.
/// If node resources are insufficient, scheduling fails.
class GcsStrictSpreadStrategy : public GcsScheduleStrategy {
public:
ScheduleMap Schedule(std::vector<std::shared_ptr<ray::BundleSpecification>> &bundles,
const std::unique_ptr<ScheduleContext> &context) override;
};
/// GcsPlacementGroupScheduler is responsible for scheduling placement_groups registered
/// to GcsPlacementGroupManager. This class is not thread-safe.
class GcsPlacementGroupScheduler : public GcsPlacementGroupSchedulerInterface {

View file

@@ -31,6 +31,7 @@ class GcsPlacementGroupSchedulerTest : public ::testing::Test {
raylet_client_ = std::make_shared<GcsServerMocker::MockRayletResourceClient>();
raylet_client1_ = std::make_shared<GcsServerMocker::MockRayletResourceClient>();
raylet_client2_ = std::make_shared<GcsServerMocker::MockRayletResourceClient>();
gcs_table_storage_ = std::make_shared<gcs::InMemoryGcsTableStorage>(io_service_);
gcs_pub_sub_ = std::make_shared<GcsServerMocker::MockGcsPubSub>(redis_client_);
gcs_node_manager_ = std::make_shared<gcs::GcsNodeManager>(
@@ -43,8 +44,10 @@ class GcsPlacementGroupSchedulerTest : public ::testing::Test {
        [this](const rpc::Address &address) {
          if (0 == address.port()) {
            return raylet_client_;
-          } else {
          } else if (1 == address.port()) {
            return raylet_client1_;
          } else {
            return raylet_client2_;
          }
        });
  }
@@ -71,6 +74,59 @@ class GcsPlacementGroupSchedulerTest : public ::testing::Test {
heartbeat);
}
void ScheduleFailedWithZeroNodeTest(rpc::PlacementStrategy strategy) {
ASSERT_EQ(0, gcs_node_manager_->GetAllAliveNodes().size());
auto request = Mocker::GenCreatePlacementGroupRequest("", strategy);
auto placement_group = std::make_shared<gcs::GcsPlacementGroup>(request);
// Schedule the placement_group with zero node.
scheduler_->ScheduleUnplacedBundles(
placement_group,
[this](std::shared_ptr<gcs::GcsPlacementGroup> placement_group) {
failure_placement_groups_.emplace_back(std::move(placement_group));
},
[this](std::shared_ptr<gcs::GcsPlacementGroup> placement_group) {
success_placement_groups_.emplace_back(std::move(placement_group));
});
// The lease request should not be sent, and the scheduling of the placement_group
// should fail as there are no available nodes.
ASSERT_EQ(raylet_client_->num_lease_requested, 0);
ASSERT_EQ(0, success_placement_groups_.size());
ASSERT_EQ(1, failure_placement_groups_.size());
ASSERT_EQ(placement_group, failure_placement_groups_.front());
}
void SchedulePlacementGroupSuccessTest(rpc::PlacementStrategy strategy) {
auto node = Mocker::GenNodeInfo();
AddNode(node);
ASSERT_EQ(1, gcs_node_manager_->GetAllAliveNodes().size());
auto request = Mocker::GenCreatePlacementGroupRequest("", strategy);
auto placement_group = std::make_shared<gcs::GcsPlacementGroup>(request);
// Schedule the placement_group with 1 available node; the lease request should be
// sent to the node.
scheduler_->ScheduleUnplacedBundles(
placement_group,
[this](std::shared_ptr<gcs::GcsPlacementGroup> placement_group) {
absl::MutexLock lock(&vector_mutex_);
failure_placement_groups_.emplace_back(std::move(placement_group));
},
[this](std::shared_ptr<gcs::GcsPlacementGroup> placement_group) {
absl::MutexLock lock(&vector_mutex_);
success_placement_groups_.emplace_back(std::move(placement_group));
});
ASSERT_EQ(2, raylet_client_->num_lease_requested);
ASSERT_EQ(2, raylet_client_->lease_callbacks.size());
ASSERT_TRUE(raylet_client_->GrantResourceReserve());
ASSERT_TRUE(raylet_client_->GrantResourceReserve());
WaitPendingDone(failure_placement_groups_, 0);
WaitPendingDone(success_placement_groups_, 1);
ASSERT_EQ(placement_group, success_placement_groups_.front());
}
void ReschedulingWhenNodeAddTest(rpc::PlacementStrategy strategy) {
AddNode(Mocker::GenNodeInfo(0), 1);
auto failure_handler =
@@ -110,6 +166,7 @@ class GcsPlacementGroupSchedulerTest : public ::testing::Test {
std::shared_ptr<GcsServerMocker::MockRayletResourceClient> raylet_client_;
std::shared_ptr<GcsServerMocker::MockRayletResourceClient> raylet_client1_;
std::shared_ptr<GcsServerMocker::MockRayletResourceClient> raylet_client2_;
std::shared_ptr<gcs::GcsNodeManager> gcs_node_manager_;
std::shared_ptr<GcsServerMocker::MockedGcsPlacementGroupScheduler> scheduler_;
std::vector<std::shared_ptr<gcs::GcsPlacementGroup>> success_placement_groups_;
@@ -119,57 +176,32 @@ class GcsPlacementGroupSchedulerTest : public ::testing::Test {
  std::shared_ptr<gcs::RedisClient> redis_client_;
};

-TEST_F(GcsPlacementGroupSchedulerTest, TestScheduleFailedWithZeroNode) {
-  ASSERT_EQ(0, gcs_node_manager_->GetAllAliveNodes().size());
-  auto request = Mocker::GenCreatePlacementGroupRequest();
-  auto placement_group = std::make_shared<gcs::GcsPlacementGroup>(request);
-  // Schedule the placement_group with zero node.
-  scheduler_->ScheduleUnplacedBundles(
-      placement_group,
-      [this](std::shared_ptr<gcs::GcsPlacementGroup> placement_group) {
-        failure_placement_groups_.emplace_back(std::move(placement_group));
-      },
-      [this](std::shared_ptr<gcs::GcsPlacementGroup> placement_group) {
-        success_placement_groups_.emplace_back(std::move(placement_group));
-      });
-  // The lease request should not be sent, and the scheduling of the placement_group
-  // should fail as there are no available nodes.
-  ASSERT_EQ(raylet_client_->num_lease_requested, 0);
-  ASSERT_EQ(0, success_placement_groups_.size());
-  ASSERT_EQ(1, failure_placement_groups_.size());
-  ASSERT_EQ(placement_group, failure_placement_groups_.front());
-}
-
-TEST_F(GcsPlacementGroupSchedulerTest, TestSchedulePlacementGroupSuccess) {
-  auto node = Mocker::GenNodeInfo();
-  AddNode(node);
-  ASSERT_EQ(1, gcs_node_manager_->GetAllAliveNodes().size());
-  auto request = Mocker::GenCreatePlacementGroupRequest();
-  auto placement_group = std::make_shared<gcs::GcsPlacementGroup>(request);
-  // Schedule the placement_group with 1 available node; the lease request should be
-  // sent to the node.
-  scheduler_->ScheduleUnplacedBundles(
-      placement_group,
-      [this](std::shared_ptr<gcs::GcsPlacementGroup> placement_group) {
-        absl::MutexLock lock(&vector_mutex_);
-        failure_placement_groups_.emplace_back(std::move(placement_group));
-      },
-      [this](std::shared_ptr<gcs::GcsPlacementGroup> placement_group) {
-        absl::MutexLock lock(&vector_mutex_);
-        success_placement_groups_.emplace_back(std::move(placement_group));
-      });
-  ASSERT_EQ(2, raylet_client_->num_lease_requested);
-  ASSERT_EQ(2, raylet_client_->lease_callbacks.size());
-  ASSERT_TRUE(raylet_client_->GrantResourceReserve());
-  ASSERT_TRUE(raylet_client_->GrantResourceReserve());
-  WaitPendingDone(failure_placement_groups_, 0);
-  WaitPendingDone(success_placement_groups_, 1);
-  ASSERT_EQ(placement_group, success_placement_groups_.front());
-}

TEST_F(GcsPlacementGroupSchedulerTest, TestSpreadScheduleFailedWithZeroNode) {
  ScheduleFailedWithZeroNodeTest(rpc::PlacementStrategy::SPREAD);
}

TEST_F(GcsPlacementGroupSchedulerTest, TestPackScheduleFailedWithZeroNode) {
  ScheduleFailedWithZeroNodeTest(rpc::PlacementStrategy::PACK);
}

TEST_F(GcsPlacementGroupSchedulerTest, TestStrictPackScheduleFailedWithZeroNode) {
  ScheduleFailedWithZeroNodeTest(rpc::PlacementStrategy::STRICT_PACK);
}

TEST_F(GcsPlacementGroupSchedulerTest, TestStrictSpreadScheduleFailedWithZeroNode) {
  ScheduleFailedWithZeroNodeTest(rpc::PlacementStrategy::STRICT_SPREAD);
}

TEST_F(GcsPlacementGroupSchedulerTest, TestSpreadSchedulePlacementGroupSuccess) {
  SchedulePlacementGroupSuccessTest(rpc::PlacementStrategy::SPREAD);
}

TEST_F(GcsPlacementGroupSchedulerTest, TestPackSchedulePlacementGroupSuccess) {
  SchedulePlacementGroupSuccessTest(rpc::PlacementStrategy::PACK);
}

TEST_F(GcsPlacementGroupSchedulerTest, TestStrictPackSchedulePlacementGroupSuccess) {
  SchedulePlacementGroupSuccessTest(rpc::PlacementStrategy::STRICT_PACK);
}
TEST_F(GcsPlacementGroupSchedulerTest, TestSchedulePlacementGroupFailed) {
@@ -458,6 +490,40 @@ TEST_F(GcsPlacementGroupSchedulerTest, TestRescheduleWhenNodeDead) {
WaitPendingDone(success_placement_groups_, 2);
}
TEST_F(GcsPlacementGroupSchedulerTest, TestStrictSpreadStrategyResourceCheck) {
auto node0 = Mocker::GenNodeInfo(0);
AddNode(node0);
auto failure_handler = [this](std::shared_ptr<gcs::GcsPlacementGroup> placement_group) {
absl::MutexLock lock(&vector_mutex_);
failure_placement_groups_.emplace_back(std::move(placement_group));
};
auto success_handler = [this](std::shared_ptr<gcs::GcsPlacementGroup> placement_group) {
absl::MutexLock lock(&vector_mutex_);
success_placement_groups_.emplace_back(std::move(placement_group));
};
auto request = Mocker::GenCreatePlacementGroupRequest(
"", rpc::PlacementStrategy::STRICT_SPREAD, 2, 2);
auto placement_group = std::make_shared<gcs::GcsPlacementGroup>(request);
scheduler_->ScheduleUnplacedBundles(placement_group, failure_handler, success_handler);
// There are fewer nodes than bundles, so scheduling fails.
WaitPendingDone(failure_placement_groups_, 1);
// Node1's resources are insufficient, so scheduling fails again.
auto node1 = Mocker::GenNodeInfo(1);
AddNode(node1, 1);
scheduler_->ScheduleUnplacedBundles(placement_group, failure_handler, success_handler);
WaitPendingDone(failure_placement_groups_, 2);
// Node2 has enough resources, so scheduling succeeds.
auto node2 = Mocker::GenNodeInfo(2);
AddNode(node2);
scheduler_->ScheduleUnplacedBundles(placement_group, failure_handler, success_handler);
ASSERT_TRUE(raylet_client_->GrantResourceReserve());
ASSERT_TRUE(raylet_client2_->GrantResourceReserve());
WaitPendingDone(success_placement_groups_, 1);
}
} // namespace ray
int main(int argc, char **argv) {

View file

@@ -91,11 +91,11 @@ struct Mocker {
  static rpc::CreatePlacementGroupRequest GenCreatePlacementGroupRequest(
      const std::string name = "",
      rpc::PlacementStrategy strategy = rpc::PlacementStrategy::SPREAD,
-      int bundles_count = 2) {
      int bundles_count = 2, double cpu_num = 1.0) {
    rpc::CreatePlacementGroupRequest request;
    std::vector<std::unordered_map<std::string, double>> bundles;
    std::unordered_map<std::string, double> bundle;
-    bundle["CPU"] = 1.0;
    bundle["CPU"] = cpu_num;
for (int index = 0; index < bundles_count; ++index) {
bundles.push_back(bundle);
}

View file

@@ -52,6 +52,9 @@ enum PlacementStrategy {
SPREAD = 1;
// Packs Bundles within one node. The group is not allowed to span multiple nodes.
STRICT_PACK = 2;
// Places Bundles across distinct nodes.
// The group is not allowed to deploy more than one bundle on a node.
STRICT_SPREAD = 3;
}
// Address of a worker or node manager.
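
Since the same strategy values now live in three places (PlacementStrategy.java, common.pxd, and this proto), a quick consistency check against the generated Python bindings can catch drift. A sketch, assuming the generated module lives at ray.core.generated.common_pb2 (the usual location, but verify in your checkout):

    from ray.core.generated.common_pb2 import PlacementStrategy

    # Must match PlacementStrategy.java: PACK(0) ... STRICT_SPREAD(3).
    assert PlacementStrategy.Value("PACK") == 0
    assert PlacementStrategy.Value("SPREAD") == 1
    assert PlacementStrategy.Value("STRICT_PACK") == 2
    assert PlacementStrategy.Value("STRICT_SPREAD") == 3
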