Refactor RayletBasedActorScheduler (#16018)

This commit is contained in:
Chong-Li 2021-05-31 15:28:00 +08:00 committed by GitHub
parent 17b5f4dcaa
commit d5d0072635
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 151 additions and 218 deletions

View file

@ -1,58 +0,0 @@
// Copyright 2017 The Ray Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "ray/gcs/gcs_server/gcs_actor_schedule_strategy.h"
#include "ray/gcs/gcs_server/gcs_actor_manager.h"
namespace ray {
namespace gcs {
std::shared_ptr<rpc::GcsNodeInfo> GcsRandomActorScheduleStrategy::Schedule(
std::shared_ptr<GcsActor> actor) {
// Select a node to lease worker for the actor.
std::shared_ptr<rpc::GcsNodeInfo> node;
// If an actor has resource requirements, we will try to schedule it on the same node as
// the owner if possible.
const auto &task_spec = actor->GetCreationTaskSpecification();
if (!task_spec.GetRequiredResources().IsEmpty()) {
auto maybe_node = gcs_node_manager_->GetAliveNode(actor->GetOwnerNodeID());
node = maybe_node.has_value() ? maybe_node.value() : SelectNodeRandomly();
} else {
node = SelectNodeRandomly();
}
return node;
}
std::shared_ptr<rpc::GcsNodeInfo> GcsRandomActorScheduleStrategy::SelectNodeRandomly()
const {
auto &alive_nodes = gcs_node_manager_->GetAllAliveNodes();
if (alive_nodes.empty()) {
return nullptr;
}
static std::mt19937_64 gen_(
std::chrono::high_resolution_clock::now().time_since_epoch().count());
std::uniform_int_distribution<int> distribution(0, alive_nodes.size() - 1);
int key_index = distribution(gen_);
int index = 0;
auto iter = alive_nodes.begin();
for (; index != key_index && iter != alive_nodes.end(); ++index, ++iter)
;
return iter->second;
}
} // namespace gcs
} // namespace ray

View file

@ -1,71 +0,0 @@
// Copyright 2017 The Ray Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include "ray/common/id.h"
#include "ray/gcs/gcs_server/gcs_node_manager.h"
namespace ray {
namespace gcs {
class GcsActor;
/// \class GcsActorScheduleStrategyInterface
///
/// Used for different kinds of actor scheduling strategy.
class GcsActorScheduleStrategyInterface {
public:
virtual ~GcsActorScheduleStrategyInterface() = default;
/// Select a node to schedule the actor.
///
/// \param actor The actor to be scheduled.
/// \return The selected node. If the scheduling fails, nullptr is returned.
virtual std::shared_ptr<rpc::GcsNodeInfo> Schedule(std::shared_ptr<GcsActor> actor) = 0;
};
/// \class GcsRandomActorScheduleStrategy
///
/// This strategy will select node randomly from the node pool to schedule the actor.
class GcsRandomActorScheduleStrategy : public GcsActorScheduleStrategyInterface {
public:
/// Create a GcsRandomActorScheduleStrategy
///
/// \param gcs_node_manager Node management of the cluster, which provides interfaces
/// to access the node information.
explicit GcsRandomActorScheduleStrategy(
std::shared_ptr<GcsNodeManager> gcs_node_manager)
: gcs_node_manager_(std::move(gcs_node_manager)) {}
virtual ~GcsRandomActorScheduleStrategy() = default;
/// Select a node to schedule the actor.
///
/// \param actor The actor to be scheduled.
/// \return The selected node. If the scheduling fails, nullptr is returned.
std::shared_ptr<rpc::GcsNodeInfo> Schedule(std::shared_ptr<GcsActor> actor) override;
private:
/// Select a node from alive nodes randomly.
///
/// \return The selected node. If the scheduling fails, `nullptr` is returned.
std::shared_ptr<rpc::GcsNodeInfo> SelectNodeRandomly() const;
/// The node manager.
std::shared_ptr<GcsNodeManager> gcs_node_manager_;
};
} // namespace gcs
} // namespace ray

View file

@ -30,7 +30,6 @@ GcsActorScheduler::GcsActorScheduler(
std::function<void(std::shared_ptr<GcsActor>)> schedule_failure_handler,
std::function<void(std::shared_ptr<GcsActor>)> schedule_success_handler,
std::shared_ptr<rpc::NodeManagerClientPool> raylet_client_pool,
std::shared_ptr<GcsActorScheduleStrategyInterface> actor_schedule_strategy,
rpc::ClientFactoryFn client_factory)
: io_context_(io_context),
gcs_actor_table_(gcs_actor_table),
@ -40,7 +39,6 @@ GcsActorScheduler::GcsActorScheduler(
schedule_success_handler_(std::move(schedule_success_handler)),
report_worker_backlog_(RayConfig::instance().report_worker_backlog()),
raylet_client_pool_(raylet_client_pool),
actor_schedule_strategy_(actor_schedule_strategy),
core_worker_clients_(client_factory) {
RAY_CHECK(schedule_failure_handler_ != nullptr && schedule_success_handler_ != nullptr);
}
@ -49,7 +47,7 @@ void GcsActorScheduler::Schedule(std::shared_ptr<GcsActor> actor) {
RAY_CHECK(actor->GetNodeID().IsNil() && actor->GetWorkerID().IsNil());
// Select a node to lease worker for the actor.
const auto &node = actor_schedule_strategy_->Schedule(actor);
const auto &node = SelectNode(actor);
if (node == nullptr) {
// There are no available nodes to schedule the actor, so just trigger the failed
@ -232,53 +230,9 @@ void GcsActorScheduler::LeaseWorkerFromNode(std::shared_ptr<GcsActor> actor,
int backlog_size = report_worker_backlog_ ? 0 : -1;
lease_client->RequestWorkerLease(
actor->GetCreationTaskSpecification(),
[this, node_id, actor, node](const Status &status,
const rpc::RequestWorkerLeaseReply &reply) {
// If the actor is still in the leasing map and the status is ok, remove the actor
// from the leasing map and handle the reply. Otherwise, lease again, because it
// may be a network exception.
// If the actor is not in the leasing map, it means that the actor has been
// cancelled as the node is dead, just do nothing in this case because the
// gcs_actor_manager will reconstruct it again.
auto iter = node_to_actors_when_leasing_.find(node_id);
if (iter != node_to_actors_when_leasing_.end()) {
auto actor_iter = iter->second.find(actor->GetActorID());
if (actor_iter == iter->second.end()) {
// if actor is not in leasing state, it means it is cancelled.
RAY_LOG(INFO)
<< "Raylet granted a lease request, but the outstanding lease "
"request for "
<< actor->GetActorID()
<< " has been already cancelled. The response will be ignored. Job id = "
<< actor->GetActorID().JobId();
return;
}
if (status.ok()) {
if (reply.worker_address().raylet_id().empty() &&
reply.retry_at_raylet_address().raylet_id().empty()) {
// Actor creation task has been cancelled. It is triggered by `ray.kill`. If
// the number of remaining restarts of the actor is not equal to 0, GCS will
// reschedule the actor, so it return directly here.
RAY_LOG(DEBUG) << "Actor " << actor->GetActorID()
<< " creation task has been cancelled.";
return;
}
// Remove the actor from the leasing map as the reply is returned from the
// remote node.
iter->second.erase(actor_iter);
if (iter->second.empty()) {
node_to_actors_when_leasing_.erase(iter);
}
RAY_LOG(INFO) << "Finished leasing worker from " << node_id << " for actor "
<< actor->GetActorID()
<< ", job id = " << actor->GetActorID().JobId();
HandleWorkerLeasedReply(actor, reply);
} else {
RetryLeasingWorkerFromNode(actor, node);
}
}
[this, actor, node](const Status &status,
const rpc::RequestWorkerLeaseReply &reply) {
HandleWorkerLeaseReply(actor, node, status, reply);
},
backlog_size);
}
@ -307,7 +261,7 @@ void GcsActorScheduler::DoRetryLeasingWorkerFromNode(
}
}
void GcsActorScheduler::HandleWorkerLeasedReply(
void GcsActorScheduler::HandleWorkerLeaseGrantedReply(
std::shared_ptr<GcsActor> actor, const ray::rpc::RequestWorkerLeaseReply &reply) {
const auto &retry_at_raylet_address = reply.retry_at_raylet_address();
const auto &worker_address = reply.worker_address();
@ -435,7 +389,30 @@ void GcsActorScheduler::DoRetryCreatingActorOnWorker(
}
}
std::shared_ptr<rpc::GcsNodeInfo> GcsActorScheduler::SelectNodeRandomly() const {
std::shared_ptr<WorkerLeaseInterface> GcsActorScheduler::GetOrConnectLeaseClient(
const rpc::Address &raylet_address) {
return raylet_client_pool_->GetOrConnectByAddress(raylet_address);
}
std::shared_ptr<rpc::GcsNodeInfo> RayletBasedActorScheduler::SelectNode(
std::shared_ptr<GcsActor> actor) {
// Select a node to lease worker for the actor.
std::shared_ptr<rpc::GcsNodeInfo> node;
// If an actor has resource requirements, we will try to schedule it on the same node as
// the owner if possible.
const auto &task_spec = actor->GetCreationTaskSpecification();
if (!task_spec.GetRequiredResources().IsEmpty()) {
auto maybe_node = gcs_node_manager_.GetAliveNode(actor->GetOwnerNodeID());
node = maybe_node.has_value() ? maybe_node.value() : SelectNodeRandomly();
} else {
node = SelectNodeRandomly();
}
return node;
}
std::shared_ptr<rpc::GcsNodeInfo> RayletBasedActorScheduler::SelectNodeRandomly() const {
auto &alive_nodes = gcs_node_manager_.GetAllAliveNodes();
if (alive_nodes.empty()) {
return nullptr;
@ -452,9 +429,55 @@ std::shared_ptr<rpc::GcsNodeInfo> GcsActorScheduler::SelectNodeRandomly() const
return iter->second;
}
std::shared_ptr<WorkerLeaseInterface> GcsActorScheduler::GetOrConnectLeaseClient(
const rpc::Address &raylet_address) {
return raylet_client_pool_->GetOrConnectByAddress(raylet_address);
void RayletBasedActorScheduler::HandleWorkerLeaseReply(
std::shared_ptr<GcsActor> actor, std::shared_ptr<rpc::GcsNodeInfo> node,
const Status &status, const rpc::RequestWorkerLeaseReply &reply) {
// If the actor is still in the leasing map and the status is ok, remove the actor
// from the leasing map and handle the reply. Otherwise, lease again, because it
// may be a network exception.
// If the actor is not in the leasing map, it means that the actor has been
// cancelled as the node is dead, just do nothing in this case because the
// gcs_actor_manager will reconstruct it again.
auto node_id = NodeID::FromBinary(node->node_id());
auto iter = node_to_actors_when_leasing_.find(node_id);
if (iter != node_to_actors_when_leasing_.end()) {
auto actor_iter = iter->second.find(actor->GetActorID());
if (actor_iter == iter->second.end()) {
// if actor is not in leasing state, it means it is cancelled.
RAY_LOG(INFO)
<< "Raylet granted a lease request, but the outstanding lease "
"request for "
<< actor->GetActorID()
<< " has been already cancelled. The response will be ignored. Job id = "
<< actor->GetActorID().JobId();
return;
}
if (status.ok()) {
if (reply.worker_address().raylet_id().empty() &&
reply.retry_at_raylet_address().raylet_id().empty()) {
// Actor creation task has been cancelled. It is triggered by `ray.kill`. If
// the number of remaining restarts of the actor is not equal to 0, GCS will
// reschedule the actor, so it return directly here.
RAY_LOG(DEBUG) << "Actor " << actor->GetActorID()
<< " creation task has been cancelled.";
return;
}
// Remove the actor from the leasing map as the reply is returned from the
// remote node.
iter->second.erase(actor_iter);
if (iter->second.empty()) {
node_to_actors_when_leasing_.erase(iter);
}
RAY_LOG(INFO) << "Finished leasing worker from " << node_id << " for actor "
<< actor->GetActorID()
<< ", job id = " << actor->GetActorID().JobId();
HandleWorkerLeaseGrantedReply(actor, reply);
} else {
RetryLeasingWorkerFromNode(actor, node);
}
}
}
} // namespace gcs

View file

@ -23,7 +23,6 @@
#include "ray/common/task/task_execution_spec.h"
#include "ray/common/task/task_spec.h"
#include "ray/gcs/accessor.h"
#include "ray/gcs/gcs_server/gcs_actor_schedule_strategy.h"
#include "ray/gcs/gcs_server/gcs_node_manager.h"
#include "ray/gcs/gcs_server/gcs_table_storage.h"
#include "ray/raylet_client/raylet_client.h"
@ -93,7 +92,6 @@ class GcsActorScheduler : public GcsActorSchedulerInterface {
/// \param schedule_success_handler Invoked when actors are created on the worker
/// successfully.
/// \param raylet_client_pool Raylet client pool to construct connections to raylets.
/// \param actor_schedule_strategy Actor schedule strategy.
/// \param client_factory Factory to create remote core worker client, default factor
/// will be used if not set.
explicit GcsActorScheduler(
@ -102,7 +100,6 @@ class GcsActorScheduler : public GcsActorSchedulerInterface {
std::function<void(std::shared_ptr<GcsActor>)> schedule_failure_handler,
std::function<void(std::shared_ptr<GcsActor>)> schedule_success_handler,
std::shared_ptr<rpc::NodeManagerClientPool> raylet_client_pool,
std::shared_ptr<GcsActorScheduleStrategyInterface> actor_schedule_strategy,
rpc::ClientFactoryFn client_factory = nullptr);
virtual ~GcsActorScheduler() = default;
@ -200,6 +197,13 @@ class GcsActorScheduler : public GcsActorSchedulerInterface {
ActorID assigned_actor_id_;
};
/// Select a node to schedule the actor.
///
/// \param actor The actor to be scheduled.
/// \return The selected node. If the selection fails, nullptr is returned.
virtual std::shared_ptr<rpc::GcsNodeInfo> SelectNode(
std::shared_ptr<GcsActor> actor) = 0;
/// Lease a worker from the specified node for the specified actor.
///
/// \param actor A description of the actor to create. This object has the resource
@ -208,6 +212,17 @@ class GcsActorScheduler : public GcsActorSchedulerInterface {
void LeaseWorkerFromNode(std::shared_ptr<GcsActor> actor,
std::shared_ptr<rpc::GcsNodeInfo> node);
/// Handler to process a worker lease reply.
///
/// \param actor The actor to be scheduled.
/// \param node The selected node at which a worker is to be leased.
/// \param status Status of the reply of `RequestWorkerLeaseRequest`.
/// \param reply The reply of `RequestWorkerLeaseRequest`.
virtual void HandleWorkerLeaseReply(std::shared_ptr<GcsActor> actor,
std::shared_ptr<rpc::GcsNodeInfo> node,
const Status &status,
const rpc::RequestWorkerLeaseReply &reply) = 0;
/// Retry leasing a worker from the specified node for the specified actor.
/// Make it a virtual method so that the io_context_ could be mocked out.
///
@ -230,8 +245,8 @@ class GcsActorScheduler : public GcsActorSchedulerInterface {
///
/// \param actor Contains the resources needed to lease workers from the specified node.
/// \param reply The reply of `RequestWorkerLeaseRequest`.
void HandleWorkerLeasedReply(std::shared_ptr<GcsActor> actor,
const rpc::RequestWorkerLeaseReply &reply);
void HandleWorkerLeaseGrantedReply(std::shared_ptr<GcsActor> actor,
const rpc::RequestWorkerLeaseReply &reply);
/// Create the specified actor on the specified worker.
///
@ -256,9 +271,6 @@ class GcsActorScheduler : public GcsActorSchedulerInterface {
void DoRetryCreatingActorOnWorker(std::shared_ptr<GcsActor> actor,
std::shared_ptr<GcsLeasedWorker> worker);
/// Select a node from alive nodes randomly.
std::shared_ptr<rpc::GcsNodeInfo> SelectNodeRandomly() const;
/// Get an existing lease client or connect a new one.
std::shared_ptr<WorkerLeaseInterface> GetOrConnectLeaseClient(
const rpc::Address &raylet_address);
@ -292,11 +304,43 @@ class GcsActorScheduler : public GcsActorSchedulerInterface {
absl::flat_hash_set<NodeID> nodes_of_releasing_unused_workers_;
/// The cached raylet clients used to communicate with raylet.
std::shared_ptr<rpc::NodeManagerClientPool> raylet_client_pool_;
/// The actor schedule strategy.
std::shared_ptr<GcsActorScheduleStrategyInterface> actor_schedule_strategy_;
/// The cached core worker clients which are used to communicate with leased worker.
rpc::CoreWorkerClientPool core_worker_clients_;
};
/// RayletBasedActorScheduler implements a random node selection, while relying on Raylets
/// for spillback scheduling.
class RayletBasedActorScheduler : public GcsActorScheduler {
public:
using GcsActorScheduler::GcsActorScheduler;
virtual ~RayletBasedActorScheduler() = default;
protected:
/// Randomly select a node from the node pool to schedule the actor.
///
/// \param actor The actor to be scheduled.
/// \return The selected node. If the selection fails, nullptr is returned.
std::shared_ptr<rpc::GcsNodeInfo> SelectNode(std::shared_ptr<GcsActor> actor) override;
/// Handler to process a worker lease reply.
/// If the worker leasing fails at the selected node, the corresponding Raylet tries to
/// reply a spillback node.
///
/// \param actor The actor to be scheduled.
/// \param node The selected node at which a worker is to be leased.
/// \param status Status of the reply of `RequestWorkerLeaseRequest`.
/// \param reply The reply of `RequestWorkerLeaseRequest`.
void HandleWorkerLeaseReply(std::shared_ptr<GcsActor> actor,
std::shared_ptr<rpc::GcsNodeInfo> node,
const Status &status,
const rpc::RequestWorkerLeaseReply &reply) override;
private:
/// A helper function to select a node from alive nodes randomly.
///
/// \return The selected node. If the selection fails, `nullptr` is returned.
std::shared_ptr<rpc::GcsNodeInfo> SelectNodeRandomly() const;
};
} // namespace gcs
} // namespace ray

View file

@ -220,9 +220,7 @@ void GcsServer::InitGcsJobManager(const GcsInitData &gcs_init_data) {
void GcsServer::InitGcsActorManager(const GcsInitData &gcs_init_data) {
RAY_CHECK(gcs_table_storage_ && gcs_pub_sub_ && gcs_node_manager_);
auto actor_schedule_strategy =
std::make_shared<GcsRandomActorScheduleStrategy>(gcs_node_manager_);
auto scheduler = std::make_shared<GcsActorScheduler>(
auto scheduler = std::make_shared<RayletBasedActorScheduler>(
main_service_, gcs_table_storage_->ActorTable(), *gcs_node_manager_, gcs_pub_sub_,
/*schedule_failure_handler=*/
[this](std::shared_ptr<GcsActor> actor) {
@ -236,7 +234,7 @@ void GcsServer::InitGcsActorManager(const GcsInitData &gcs_init_data) {
[this](std::shared_ptr<GcsActor> actor) {
gcs_actor_manager_->OnActorCreationSuccess(std::move(actor));
},
raylet_client_pool_, actor_schedule_strategy,
raylet_client_pool_,
/*client_factory=*/
[this](const rpc::Address &address) {
return std::make_shared<rpc::CoreWorkerClient>(address, client_call_manager_);

View file

@ -29,26 +29,25 @@ class GcsActorSchedulerTest : public ::testing::Test {
gcs_table_storage_ = std::make_shared<gcs::RedisGcsTableStorage>(redis_client_);
gcs_node_manager_ =
std::make_shared<gcs::GcsNodeManager>(gcs_pub_sub_, gcs_table_storage_);
gcs_actor_schedule_strategy_ =
std::make_shared<gcs::GcsRandomActorScheduleStrategy>(gcs_node_manager_);
store_client_ = std::make_shared<gcs::InMemoryStoreClient>(io_service_);
gcs_actor_table_ =
std::make_shared<GcsServerMocker::MockedGcsActorTable>(store_client_);
raylet_client_pool_ = std::make_shared<rpc::NodeManagerClientPool>(
[this](const rpc::Address &addr) { return raylet_client_; });
gcs_actor_scheduler_ = std::make_shared<GcsServerMocker::MockedGcsActorScheduler>(
io_service_, *gcs_actor_table_, *gcs_node_manager_, gcs_pub_sub_,
/*schedule_failure_handler=*/
[this](std::shared_ptr<gcs::GcsActor> actor) {
failure_actors_.emplace_back(std::move(actor));
},
/*schedule_success_handler=*/
[this](std::shared_ptr<gcs::GcsActor> actor) {
success_actors_.emplace_back(std::move(actor));
},
raylet_client_pool_, gcs_actor_schedule_strategy_,
/*client_factory=*/
[this](const rpc::Address &address) { return worker_client_; });
gcs_actor_scheduler_ =
std::make_shared<GcsServerMocker::MockedRayletBasedActorScheduler>(
io_service_, *gcs_actor_table_, *gcs_node_manager_, gcs_pub_sub_,
/*schedule_failure_handler=*/
[this](std::shared_ptr<gcs::GcsActor> actor) {
failure_actors_.emplace_back(std::move(actor));
},
/*schedule_success_handler=*/
[this](std::shared_ptr<gcs::GcsActor> actor) {
success_actors_.emplace_back(std::move(actor));
},
raylet_client_pool_,
/*client_factory=*/
[this](const rpc::Address &address) { return worker_client_; });
}
protected:
@ -58,8 +57,7 @@ class GcsActorSchedulerTest : public ::testing::Test {
std::shared_ptr<GcsServerMocker::MockRayletClient> raylet_client_;
std::shared_ptr<GcsServerMocker::MockWorkerClient> worker_client_;
std::shared_ptr<gcs::GcsNodeManager> gcs_node_manager_;
std::shared_ptr<gcs::GcsActorScheduleStrategyInterface> gcs_actor_schedule_strategy_;
std::shared_ptr<GcsServerMocker::MockedGcsActorScheduler> gcs_actor_scheduler_;
std::shared_ptr<GcsServerMocker::MockedRayletBasedActorScheduler> gcs_actor_scheduler_;
std::vector<std::shared_ptr<gcs::GcsActor>> success_actors_;
std::vector<std::shared_ptr<gcs::GcsActor>> failure_actors_;
std::shared_ptr<GcsServerMocker::MockGcsPubSub> gcs_pub_sub_;

View file

@ -22,7 +22,6 @@
#include "ray/common/task/task_util.h"
#include "ray/common/test_util.h"
#include "ray/gcs/gcs_server/gcs_actor_manager.h"
#include "ray/gcs/gcs_server/gcs_actor_schedule_strategy.h"
#include "ray/gcs/gcs_server/gcs_actor_scheduler.h"
#include "ray/gcs/gcs_server/gcs_node_manager.h"
#include "ray/gcs/gcs_server/gcs_placement_group_manager.h"
@ -278,9 +277,9 @@ struct GcsServerMocker {
std::list<rpc::ClientCallback<rpc::CancelResourceReserveReply>> return_callbacks = {};
};
class MockedGcsActorScheduler : public gcs::GcsActorScheduler {
class MockedRayletBasedActorScheduler : public gcs::RayletBasedActorScheduler {
public:
using gcs::GcsActorScheduler::GcsActorScheduler;
using gcs::RayletBasedActorScheduler::RayletBasedActorScheduler;
void TryLeaseWorkerFromNodeAgain(std::shared_ptr<gcs::GcsActor> actor,
std::shared_ptr<rpc::GcsNodeInfo> node) {