rename gcs_resource_scheduler to cluster_resource_scheduler (#23274)

This commit is contained in:
ZhuSenlin 2022-03-18 13:19:33 +08:00 committed by GitHub
parent f40fe73f4b
commit d3f92cca33
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
9 changed files with 89 additions and 70 deletions

View file

@ -82,7 +82,7 @@ class MockGcsScheduleStrategy : public GcsScheduleStrategy {
Schedule,
(const std::vector<std::shared_ptr<const ray::BundleSpecification>> &bundles,
const std::unique_ptr<ScheduleContext> &context,
GcsResourceScheduler &gcs_resource_scheduler),
ClusterResourceScheduler &cluster_resource_scheduler),
(override));
};
@ -99,7 +99,7 @@ class MockGcsPackStrategy : public GcsPackStrategy {
Schedule,
(const std::vector<std::shared_ptr<const ray::BundleSpecification>> &bundles,
const std::unique_ptr<ScheduleContext> &context,
GcsResourceScheduler &gcs_resource_scheduler),
ClusterResourceScheduler &cluster_resource_scheduler),
(override));
};
@ -116,7 +116,7 @@ class MockGcsSpreadStrategy : public GcsSpreadStrategy {
Schedule,
(const std::vector<std::shared_ptr<const ray::BundleSpecification>> &bundles,
const std::unique_ptr<ScheduleContext> &context,
GcsResourceScheduler &gcs_resource_scheduler),
ClusterResourceScheduler &cluster_resource_scheduler),
(override));
};
@ -133,7 +133,7 @@ class MockGcsStrictPackStrategy : public GcsStrictPackStrategy {
Schedule,
(const std::vector<std::shared_ptr<const ray::BundleSpecification>> &bundles,
const std::unique_ptr<ScheduleContext> &context,
GcsResourceScheduler &gcs_resource_scheduler),
ClusterResourceScheduler &cluster_resource_scheduler),
(override));
};
@ -150,7 +150,7 @@ class MockGcsStrictSpreadStrategy : public GcsStrictSpreadStrategy {
Schedule,
(const std::vector<std::shared_ptr<const ray::BundleSpecification>> &bundles,
const std::unique_ptr<ScheduleContext> &context,
GcsResourceScheduler &gcs_resource_scheduler),
ClusterResourceScheduler &cluster_resource_scheduler),
(override));
};

View file

@ -36,7 +36,7 @@ GcsBasedActorScheduler::GcsBasedActorScheduler(
instrumented_io_context &io_context,
GcsActorTable &gcs_actor_table,
const GcsNodeManager &gcs_node_manager,
std::shared_ptr<GcsResourceScheduler> gcs_resource_scheduler,
std::shared_ptr<ClusterResourceScheduler> cluster_resource_scheduler,
GcsActorSchedulerFailureCallback schedule_failure_handler,
GcsActorSchedulerSuccessCallback schedule_success_handler,
std::shared_ptr<rpc::NodeManagerClientPool> raylet_client_pool,
@ -50,7 +50,7 @@ GcsBasedActorScheduler::GcsBasedActorScheduler(
schedule_success_handler,
raylet_client_pool,
client_factory),
gcs_resource_scheduler_(std::move(gcs_resource_scheduler)),
cluster_resource_scheduler_(std::move(cluster_resource_scheduler)),
normal_task_resources_changed_callback_(normal_task_resources_changed_callback) {}
NodeID GcsBasedActorScheduler::SelectNode(std::shared_ptr<GcsActor> actor) {
@ -105,7 +105,7 @@ GcsBasedActorScheduler::AllocateNewActorWorkerAssignment(
scheduling::NodeID GcsBasedActorScheduler::AllocateResources(
const ResourceRequest &required_resources) {
auto selected_nodes =
gcs_resource_scheduler_->Schedule({required_resources}, SchedulingType::SPREAD)
cluster_resource_scheduler_->Schedule({required_resources}, SchedulingType::SPREAD)
.second;
if (selected_nodes.size() == 0) {
@ -118,7 +118,8 @@ scheduling::NodeID GcsBasedActorScheduler::AllocateResources(
auto selected_node_id = selected_nodes[0];
if (!selected_node_id.IsNil()) {
auto &cluster_resource_manager = gcs_resource_scheduler_->GetClusterResourceManager();
auto &cluster_resource_manager =
cluster_resource_scheduler_->GetClusterResourceManager();
// Acquire the resources from the selected node.
RAY_CHECK(cluster_resource_manager.SubtractNodeAvailableResources(
selected_node_id, required_resources));
@ -129,7 +130,8 @@ scheduling::NodeID GcsBasedActorScheduler::AllocateResources(
scheduling::NodeID GcsBasedActorScheduler::GetHighestScoreNodeResource(
const ResourceRequest &required_resources) const {
auto &cluster_resource_manager = gcs_resource_scheduler_->GetClusterResourceManager();
auto &cluster_resource_manager =
cluster_resource_scheduler_->GetClusterResourceManager();
const auto &resource_view = cluster_resource_manager.GetResourceView();
/// Get the highest score node
@ -151,7 +153,8 @@ scheduling::NodeID GcsBasedActorScheduler::GetHighestScoreNodeResource(
void GcsBasedActorScheduler::WarnResourceAllocationFailure(
const TaskSpecification &task_spec, const ResourceRequest &required_resources) const {
auto &cluster_resource_manager = gcs_resource_scheduler_->GetClusterResourceManager();
auto &cluster_resource_manager =
cluster_resource_scheduler_->GetClusterResourceManager();
auto scheduling_node_id = GetHighestScoreNodeResource(required_resources);
const NodeResources *node_resources = nullptr;
if (!scheduling_node_id.IsNil()) {
@ -237,7 +240,8 @@ void GcsBasedActorScheduler::HandleWorkerLeaseRejectedReply(
std::shared_ptr<GcsActor> actor, const rpc::RequestWorkerLeaseReply &reply) {
// The request was rejected because of insufficient resources.
auto node_id = actor->GetNodeID();
auto &cluster_resource_manager = gcs_resource_scheduler_->GetClusterResourceManager();
auto &cluster_resource_manager =
cluster_resource_scheduler_->GetClusterResourceManager();
if (normal_task_resources_changed_callback_) {
normal_task_resources_changed_callback_(node_id, reply.resources_data());
}
@ -261,7 +265,8 @@ void GcsBasedActorScheduler::NotifyClusterResourcesChanged() {
}
void GcsBasedActorScheduler::ResetActorWorkerAssignment(GcsActor *actor) {
auto &cluster_resource_manager = gcs_resource_scheduler_->GetClusterResourceManager();
auto &cluster_resource_manager =
cluster_resource_scheduler_->GetClusterResourceManager();
if (cluster_resource_manager.AddNodeAvailableResources(
scheduling::NodeID(actor->GetActorWorkerAssignment()->GetNodeID().Binary()),
actor->GetActorWorkerAssignment()->GetResources())) {

View file

@ -31,6 +31,8 @@
namespace ray {
namespace gcs {
using ClusterResourceScheduler = gcs::GcsResourceScheduler;
/// `GcsActorWorkerAssignment` represents the assignment from one or multiple actors to a
/// worker process.
/// TODO(Chong-Li): It contains multiple slots, and each of them can bind to an actor.
@ -71,7 +73,7 @@ class GcsBasedActorScheduler : public GcsActorScheduler {
/// \param io_context The main event loop.
/// \param gcs_actor_table Used to flush actor info to storage.
/// \param gcs_node_manager The node manager which is used when scheduling.
/// \param gcs_resource_scheduler The scheduler to select nodes based on cluster
/// \param cluster_resource_scheduler The scheduler to select nodes based on cluster
/// resources.
/// \param schedule_failure_handler Invoked when there are no available nodes to
/// schedule actors.
@ -84,7 +86,7 @@ class GcsBasedActorScheduler : public GcsActorScheduler {
instrumented_io_context &io_context,
GcsActorTable &gcs_actor_table,
const GcsNodeManager &gcs_node_manager,
std::shared_ptr<GcsResourceScheduler> gcs_resource_scheduler,
std::shared_ptr<ClusterResourceScheduler> cluster_resource_scheduler,
GcsActorSchedulerFailureCallback schedule_failure_handler,
GcsActorSchedulerSuccessCallback schedule_success_handler,
std::shared_ptr<rpc::NodeManagerClientPool> raylet_client_pool,
@ -164,7 +166,7 @@ class GcsBasedActorScheduler : public GcsActorScheduler {
std::vector<std::function<void()>> resource_changed_listeners_;
/// Gcs resource scheduler
std::shared_ptr<GcsResourceScheduler> gcs_resource_scheduler_;
std::shared_ptr<ClusterResourceScheduler> cluster_resource_scheduler_;
/// Normal task resources changed callback.
std::function<void(const NodeID &, const rpc::ResourcesData &)>

View file

@ -50,14 +50,14 @@ GcsPlacementGroupScheduler::GcsPlacementGroupScheduler(
std::shared_ptr<gcs::GcsTableStorage> gcs_table_storage,
const gcs::GcsNodeManager &gcs_node_manager,
GcsResourceManager &gcs_resource_manager,
GcsResourceScheduler &gcs_resource_scheduler,
ClusterResourceScheduler &cluster_resource_scheduler,
std::shared_ptr<rpc::NodeManagerClientPool> raylet_client_pool,
syncer::RaySyncer &ray_syncer)
: return_timer_(io_context),
gcs_table_storage_(std::move(gcs_table_storage)),
gcs_node_manager_(gcs_node_manager),
gcs_resource_manager_(gcs_resource_manager),
gcs_resource_scheduler_(gcs_resource_scheduler),
cluster_resource_scheduler_(cluster_resource_scheduler),
raylet_client_pool_(raylet_client_pool),
ray_syncer_(ray_syncer) {
scheduler_strategies_.push_back(std::make_shared<GcsPackStrategy>());
@ -94,10 +94,10 @@ ScheduleResult GcsScheduleStrategy::GenerateScheduleResult(
ScheduleResult GcsStrictPackStrategy::Schedule(
const std::vector<std::shared_ptr<const ray::BundleSpecification>> &bundles,
const std::unique_ptr<ScheduleContext> &context,
GcsResourceScheduler &gcs_resource_scheduler) {
ClusterResourceScheduler &cluster_resource_scheduler) {
const auto &required_resources = GetRequiredResourcesFromBundles(bundles);
const auto &scheduling_result =
gcs_resource_scheduler.Schedule(required_resources, SchedulingType::STRICT_PACK);
const auto &scheduling_result = cluster_resource_scheduler.Schedule(
required_resources, SchedulingType::STRICT_PACK);
return GenerateScheduleResult(
bundles, scheduling_result.second, scheduling_result.first);
}
@ -105,13 +105,13 @@ ScheduleResult GcsStrictPackStrategy::Schedule(
ScheduleResult GcsPackStrategy::Schedule(
const std::vector<std::shared_ptr<const ray::BundleSpecification>> &bundles,
const std::unique_ptr<ScheduleContext> &context,
GcsResourceScheduler &gcs_resource_scheduler) {
ClusterResourceScheduler &cluster_resource_scheduler) {
// The current algorithm is to select a node and deploy as many bundles as possible.
// First fill up a node. If the node resource is insufficient, select a new node.
// TODO(ffbin): We will speed this up in next PR. Currently it is a double for loop.
const auto &required_resources = GetRequiredResourcesFromBundles(bundles);
const auto &scheduling_result =
gcs_resource_scheduler.Schedule(required_resources, SchedulingType::PACK);
cluster_resource_scheduler.Schedule(required_resources, SchedulingType::PACK);
return GenerateScheduleResult(
bundles, scheduling_result.second, scheduling_result.first);
}
@ -119,10 +119,10 @@ ScheduleResult GcsPackStrategy::Schedule(
ScheduleResult GcsSpreadStrategy::Schedule(
const std::vector<std::shared_ptr<const ray::BundleSpecification>> &bundles,
const std::unique_ptr<ScheduleContext> &context,
GcsResourceScheduler &gcs_resource_scheduler) {
ClusterResourceScheduler &cluster_resource_scheduler) {
const auto &required_resources = GetRequiredResourcesFromBundles(bundles);
const auto &scheduling_result =
gcs_resource_scheduler.Schedule(required_resources, SchedulingType::SPREAD);
cluster_resource_scheduler.Schedule(required_resources, SchedulingType::SPREAD);
return GenerateScheduleResult(
bundles, scheduling_result.second, scheduling_result.first);
}
@ -130,7 +130,7 @@ ScheduleResult GcsSpreadStrategy::Schedule(
ScheduleResult GcsStrictSpreadStrategy::Schedule(
const std::vector<std::shared_ptr<const ray::BundleSpecification>> &bundles,
const std::unique_ptr<ScheduleContext> &context,
GcsResourceScheduler &gcs_resource_scheduler) {
ClusterResourceScheduler &cluster_resource_scheduler) {
// TODO(ffbin): A bundle may require special resources, such as GPU. We need to
// schedule bundles with special resource requirements first, which will be implemented
// in the next pr.
@ -145,7 +145,7 @@ ScheduleResult GcsStrictSpreadStrategy::Schedule(
}
const auto &required_resources = GetRequiredResourcesFromBundles(bundles);
const auto &scheduling_result = gcs_resource_scheduler.Schedule(
const auto &scheduling_result = cluster_resource_scheduler.Schedule(
required_resources,
SchedulingType::STRICT_SPREAD,
/*node_filter_func=*/[&nodes_in_use](const scheduling::NodeID &node_id) {
@ -179,7 +179,7 @@ void GcsPlacementGroupScheduler::ScheduleUnplacedBundles(
auto scheduling_result = scheduler_strategies_[strategy]->Schedule(
bundles,
GetScheduleContext(placement_group->GetPlacementGroupID()),
gcs_resource_scheduler_);
cluster_resource_scheduler_);
auto result_status = scheduling_result.first;
auto selected_nodes = scheduling_result.second;
@ -656,7 +656,8 @@ void GcsPlacementGroupScheduler::DestroyPlacementGroupCommittedBundleResources(
void GcsPlacementGroupScheduler::AcquireBundleResources(
const std::shared_ptr<BundleLocations> &bundle_locations) {
// Acquire bundle resources from gcs resources manager.
auto &cluster_resource_manager = gcs_resource_scheduler_.GetClusterResourceManager();
auto &cluster_resource_manager =
cluster_resource_scheduler_.GetClusterResourceManager();
for (auto &bundle : *bundle_locations) {
cluster_resource_manager.SubtractNodeAvailableResources(
scheduling::NodeID(bundle.second.first.Binary()),
@ -667,7 +668,8 @@ void GcsPlacementGroupScheduler::AcquireBundleResources(
void GcsPlacementGroupScheduler::ReturnBundleResources(
const std::shared_ptr<BundleLocations> &bundle_locations) {
// Release bundle resources to gcs resources manager.
auto &cluster_resource_manager = gcs_resource_scheduler_.GetClusterResourceManager();
auto &cluster_resource_manager =
cluster_resource_scheduler_.GetClusterResourceManager();
for (auto &bundle : *bundle_locations) {
cluster_resource_manager.AddNodeAvailableResources(
scheduling::NodeID(bundle.second.first.Binary()),

View file

@ -34,6 +34,8 @@ namespace gcs {
class GcsPlacementGroup;
using ClusterResourceScheduler = gcs::GcsResourceScheduler;
using ReserveResourceClientFactoryFn =
std::function<std::shared_ptr<ResourceReserveInterface>(const rpc::Address &address)>;
@ -125,7 +127,7 @@ class GcsScheduleStrategy {
virtual ScheduleResult Schedule(
const std::vector<std::shared_ptr<const ray::BundleSpecification>> &bundles,
const std::unique_ptr<ScheduleContext> &context,
GcsResourceScheduler &gcs_resource_scheduler) = 0;
ClusterResourceScheduler &cluster_resource_scheduler) = 0;
protected:
/// Get required resources from bundles.
@ -155,7 +157,7 @@ class GcsPackStrategy : public GcsScheduleStrategy {
ScheduleResult Schedule(
const std::vector<std::shared_ptr<const ray::BundleSpecification>> &bundles,
const std::unique_ptr<ScheduleContext> &context,
GcsResourceScheduler &gcs_resource_scheduler) override;
ClusterResourceScheduler &cluster_resource_scheduler) override;
};
/// The `GcsSpreadStrategy` is that spread all bundles in different nodes.
@ -164,7 +166,7 @@ class GcsSpreadStrategy : public GcsScheduleStrategy {
ScheduleResult Schedule(
const std::vector<std::shared_ptr<const ray::BundleSpecification>> &bundles,
const std::unique_ptr<ScheduleContext> &context,
GcsResourceScheduler &gcs_resource_scheduler) override;
ClusterResourceScheduler &cluster_resource_scheduler) override;
};
/// The `GcsStrictPackStrategy` is that all bundles must be scheduled to one node. If one
@ -174,7 +176,7 @@ class GcsStrictPackStrategy : public GcsScheduleStrategy {
ScheduleResult Schedule(
const std::vector<std::shared_ptr<const ray::BundleSpecification>> &bundles,
const std::unique_ptr<ScheduleContext> &context,
GcsResourceScheduler &gcs_resource_scheduler) override;
ClusterResourceScheduler &cluster_resource_scheduler) override;
};
/// The `GcsStrictSpreadStrategy` is that spread all bundles in different nodes.
@ -185,7 +187,7 @@ class GcsStrictSpreadStrategy : public GcsScheduleStrategy {
ScheduleResult Schedule(
const std::vector<std::shared_ptr<const ray::BundleSpecification>> &bundles,
const std::unique_ptr<ScheduleContext> &context,
GcsResourceScheduler &gcs_resource_scheduler) override;
ClusterResourceScheduler &cluster_resource_scheduler) override;
};
enum class LeasingState {
@ -417,14 +419,14 @@ class GcsPlacementGroupScheduler : public GcsPlacementGroupSchedulerInterface {
/// \param placement_group_info_accessor Used to flush placement_group info to storage.
/// \param gcs_node_manager The node manager which is used when scheduling.
/// \param gcs_resource_manager The resource manager which is used when scheduling.
/// \param gcs_resource_scheduler The resource scheduler which is used when scheduling.
/// \param lease_client_factory Factory to create remote lease client.
/// \param cluster_resource_scheduler The resource scheduler which is used when
/// scheduling. \param lease_client_factory Factory to create remote lease client.
GcsPlacementGroupScheduler(
instrumented_io_context &io_context,
std::shared_ptr<gcs::GcsTableStorage> gcs_table_storage,
const GcsNodeManager &gcs_node_manager,
GcsResourceManager &gcs_resource_manager,
GcsResourceScheduler &gcs_resource_scheduler,
ClusterResourceScheduler &cluster_resource_scheduler,
std::shared_ptr<rpc::NodeManagerClientPool> raylet_client_pool,
syncer::RaySyncer &ray_syncer);
@ -581,8 +583,8 @@ class GcsPlacementGroupScheduler : public GcsPlacementGroupSchedulerInterface {
/// Reference of GcsResourceManager.
GcsResourceManager &gcs_resource_manager_;
/// Reference of GcsResourceScheduler.
GcsResourceScheduler &gcs_resource_scheduler_;
/// Reference of ClusterResourceScheduler.
ClusterResourceScheduler &cluster_resource_scheduler_;
/// A vector to store all the schedule strategy.
std::vector<std::shared_ptr<GcsScheduleStrategy>> scheduler_strategies_;

View file

@ -125,8 +125,8 @@ void GcsServer::Start() {
}
void GcsServer::DoStart(const GcsInitData &gcs_init_data) {
// Init gcs resource scheduler.
InitGcsResourceScheduler();
// Init cluster resource scheduler.
InitClusterResourceScheduler();
// Init gcs resource manager.
InitGcsResourceManager(gcs_init_data);
@ -255,9 +255,9 @@ void GcsServer::InitGcsHeartbeatManager(const GcsInitData &gcs_init_data) {
}
void GcsServer::InitGcsResourceManager(const GcsInitData &gcs_init_data) {
RAY_CHECK(gcs_table_storage_ && gcs_resource_scheduler_);
RAY_CHECK(gcs_table_storage_ && cluster_resource_scheduler_);
gcs_resource_manager_ = std::make_shared<GcsResourceManager>(
gcs_table_storage_, gcs_resource_scheduler_->GetClusterResourceManager());
gcs_table_storage_, cluster_resource_scheduler_->GetClusterResourceManager());
// Initialize by gcs tables data.
gcs_resource_manager_->Initialize(gcs_init_data);
@ -267,8 +267,8 @@ void GcsServer::InitGcsResourceManager(const GcsInitData &gcs_init_data) {
rpc_server_.RegisterService(*node_resource_info_service_);
}
void GcsServer::InitGcsResourceScheduler() {
gcs_resource_scheduler_ = std::make_shared<GcsResourceScheduler>();
void GcsServer::InitClusterResourceScheduler() {
cluster_resource_scheduler_ = std::make_shared<ClusterResourceScheduler>();
}
void GcsServer::InitGcsJobManager(const GcsInitData &gcs_init_data) {
@ -306,12 +306,12 @@ void GcsServer::InitGcsActorManager(const GcsInitData &gcs_init_data) {
};
if (RayConfig::instance().gcs_actor_scheduling_enabled()) {
RAY_CHECK(gcs_resource_manager_ && gcs_resource_scheduler_);
RAY_CHECK(gcs_resource_manager_ && cluster_resource_scheduler_);
scheduler = std::make_unique<GcsBasedActorScheduler>(
main_service_,
gcs_table_storage_->ActorTable(),
*gcs_node_manager_,
gcs_resource_scheduler_,
cluster_resource_scheduler_,
schedule_failure_handler,
schedule_success_handler,
raylet_client_pool_,
@ -370,11 +370,12 @@ void GcsServer::InitGcsActorManager(const GcsInitData &gcs_init_data) {
void GcsServer::InitGcsPlacementGroupManager(const GcsInitData &gcs_init_data) {
RAY_CHECK(gcs_table_storage_ && gcs_node_manager_);
auto scheduler = std::make_shared<GcsPlacementGroupScheduler>(main_service_,
auto scheduler =
std::make_shared<GcsPlacementGroupScheduler>(main_service_,
gcs_table_storage_,
*gcs_node_manager_,
*gcs_resource_manager_,
*gcs_resource_scheduler_,
*cluster_resource_scheduler_,
raylet_client_pool_,
*ray_syncer_);

View file

@ -35,6 +35,8 @@
namespace ray {
namespace gcs {
using ClusterResourceScheduler = gcs::GcsResourceScheduler;
struct GcsServerConfig {
std::string grpc_server_name = "GcsServer";
uint16_t grpc_server_port = 0;
@ -102,8 +104,8 @@ class GcsServer {
/// Initialize synchronization service
void InitRaySyncer(const GcsInitData &gcs_init_data);
/// Initialize gcs resource scheduler.
void InitGcsResourceScheduler();
/// Initialize cluster resource scheduler.
void InitClusterResourceScheduler();
/// Initialize gcs job manager.
void InitGcsJobManager(const GcsInitData &gcs_init_data);
@ -180,8 +182,8 @@ class GcsServer {
std::shared_ptr<rpc::NodeManagerClientPool> raylet_client_pool_;
/// The gcs resource manager.
std::shared_ptr<GcsResourceManager> gcs_resource_manager_;
/// The gcs resource scheduler.
std::shared_ptr<GcsResourceScheduler> gcs_resource_scheduler_;
/// The cluster resource scheduler.
std::shared_ptr<ClusterResourceScheduler> cluster_resource_scheduler_;
/// The gcs node manager.
std::shared_ptr<GcsNodeManager> gcs_node_manager_;
/// The heartbeat manager.

View file

@ -24,6 +24,10 @@
#include "ray/gcs/test/gcs_test_util.h"
namespace ray {
namespace gcs {
using ClusterResourceScheduler = gcs::GcsResourceScheduler;
class GcsBasedActorSchedulerTest : public ::testing::Test {
public:
void SetUp() override {
@ -39,15 +43,15 @@ class GcsBasedActorSchedulerTest : public ::testing::Test {
store_client_ = std::make_shared<gcs::InMemoryStoreClient>(io_service_);
gcs_actor_table_ =
std::make_shared<GcsServerMocker::MockedGcsActorTable>(store_client_);
gcs_resource_scheduler_ = std::make_shared<gcs::GcsResourceScheduler>();
cluster_resource_scheduler_ = std::make_shared<ClusterResourceScheduler>();
gcs_resource_manager_ = std::make_shared<gcs::GcsResourceManager>(
gcs_table_storage_, gcs_resource_scheduler_->GetClusterResourceManager());
gcs_table_storage_, cluster_resource_scheduler_->GetClusterResourceManager());
gcs_actor_scheduler_ =
std::make_shared<GcsServerMocker::MockedGcsBasedActorScheduler>(
io_service_,
*gcs_actor_table_,
*gcs_node_manager_,
gcs_resource_scheduler_,
cluster_resource_scheduler_,
/*schedule_failure_handler=*/
[this](std::shared_ptr<gcs::GcsActor> actor,
const rpc::RequestWorkerLeaseReply::SchedulingFailureType,
@ -110,7 +114,7 @@ class GcsBasedActorSchedulerTest : public ::testing::Test {
std::shared_ptr<rpc::NodeManagerClientPool> raylet_client_pool_;
std::shared_ptr<GcsServerMocker::MockWorkerClient> worker_client_;
std::shared_ptr<gcs::GcsNodeManager> gcs_node_manager_;
std::shared_ptr<gcs::GcsResourceScheduler> gcs_resource_scheduler_;
std::shared_ptr<ClusterResourceScheduler> cluster_resource_scheduler_;
std::shared_ptr<gcs::GcsResourceManager> gcs_resource_manager_;
std::shared_ptr<GcsServerMocker::MockedGcsBasedActorScheduler> gcs_actor_scheduler_;
std::vector<std::shared_ptr<gcs::GcsActor>> success_actors_;
@ -171,7 +175,7 @@ TEST_F(GcsBasedActorSchedulerTest, TestScheduleAndDestroyOneActor) {
scheduling::NodeID scheduling_node_id(node->node_id());
ASSERT_EQ(1, gcs_node_manager_->GetAllAliveNodes().size());
const auto &cluster_resource_manager =
gcs_resource_scheduler_->GetClusterResourceManager();
cluster_resource_scheduler_->GetClusterResourceManager();
auto resource_view_before_scheduling = cluster_resource_manager.GetResourceView();
ASSERT_TRUE(resource_view_before_scheduling.contains(scheduling_node_id));
@ -654,7 +658,7 @@ TEST_F(GcsBasedActorSchedulerTest, TestReleaseUnusedWorkers) {
gcs_actor_scheduler_->TryLeaseWorkerFromNodeAgain(actor, node);
ASSERT_EQ(raylet_client_->num_workers_requested, 1);
}
} // namespace gcs
} // namespace ray
int main(int argc, char **argv) {

View file

@ -28,6 +28,8 @@ enum class GcsPlacementGroupStatus : int32_t {
};
class GcsPlacementGroupSchedulerTest : public ::testing::Test {
using ClusterResourceScheduler = gcs::GcsResourceScheduler;
public:
void SetUp() override {
thread_io_service_.reset(new std::thread([this] {
@ -41,9 +43,9 @@ class GcsPlacementGroupSchedulerTest : public ::testing::Test {
gcs_table_storage_ = std::make_shared<gcs::InMemoryGcsTableStorage>(io_service_);
gcs_publisher_ = std::make_shared<gcs::GcsPublisher>(
std::make_unique<GcsServerMocker::MockGcsPubSub>(redis_client_));
gcs_resource_scheduler_ = std::make_shared<gcs::GcsResourceScheduler>();
cluster_resource_scheduler_ = std::make_shared<ClusterResourceScheduler>();
gcs_resource_manager_ = std::make_shared<gcs::GcsResourceManager>(
gcs_table_storage_, gcs_resource_scheduler_->GetClusterResourceManager());
gcs_table_storage_, cluster_resource_scheduler_->GetClusterResourceManager());
ray_syncer_ = std::make_shared<ray::syncer::RaySyncer>(
io_service_, nullptr, *gcs_resource_manager_);
store_client_ = std::make_shared<gcs::InMemoryStoreClient>(io_service_);
@ -56,7 +58,7 @@ class GcsPlacementGroupSchedulerTest : public ::testing::Test {
gcs_table_storage_,
*gcs_node_manager_,
*gcs_resource_manager_,
*gcs_resource_scheduler_,
*cluster_resource_scheduler_,
raylet_client_pool_,
*ray_syncer_);
}
@ -262,7 +264,7 @@ class GcsPlacementGroupSchedulerTest : public ::testing::Test {
std::vector<std::shared_ptr<GcsServerMocker::MockRayletClient>> raylet_clients_;
std::shared_ptr<gcs::GcsResourceManager> gcs_resource_manager_;
std::shared_ptr<gcs::GcsResourceScheduler> gcs_resource_scheduler_;
std::shared_ptr<ClusterResourceScheduler> cluster_resource_scheduler_;
std::shared_ptr<gcs::GcsNodeManager> gcs_node_manager_;
std::shared_ptr<GcsServerMocker::MockedGcsPlacementGroupScheduler> scheduler_;
std::vector<std::shared_ptr<gcs::GcsPlacementGroup>> success_placement_groups_
@ -1145,7 +1147,6 @@ TEST_F(GcsPlacementGroupSchedulerTest, TestInitialize) {
ASSERT_EQ(1, bundles[placement_group->GetPlacementGroupID()].size());
ASSERT_EQ(1, bundles[placement_group->GetPlacementGroupID()][0]);
}
} // namespace ray
int main(int argc, char **argv) {