[GCS]Fix lease worker leak bug when gcs server restarts (#9315)

* add part code

* fix compile bug

* fix review comments

* fix review comments

* fix review comments

* fix review comments

* fix review comment

* fix ut bug

* fix lint error

* fix review comment

* fix review comments

* add testcase

* add testcase

* fix bug

* fix review comments

* fix review comment

* fix review comment

* refine comments

Co-authored-by: 灵洵 <fengbin.ffb@antfin.com>
Co-authored-by: Hao Chen <chenh1024@gmail.com>
This commit is contained in:
fangfengbin 2020-07-18 15:49:41 +08:00 committed by GitHub
parent 4c18463fb7
commit b12b8e1324
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
14 changed files with 231 additions and 1 deletions

View file

@ -112,6 +112,12 @@ class MockRayletClient : public WorkerLeaseInterface {
return Status::OK();
}
ray::Status ReleaseUnusedWorkers(
const std::vector<WorkerID> &workers_in_use,
const rpc::ClientCallback<rpc::ReleaseUnusedWorkersReply> &callback) override {
return Status::NotImplemented("ReleaseUnusedWorkers is not supported.");
}
ray::Status CancelWorkerLease(
const TaskID &task_id,
const rpc::ClientCallback<rpc::CancelWorkerLeaseReply> &callback) override {

View file

@ -760,6 +760,7 @@ void GcsActorManager::LoadInitialData(const EmptyCallback &done) {
RAY_LOG(INFO) << "Loading initial data.";
auto callback = [this,
done](const std::unordered_map<ActorID, ActorTableData> &result) {
std::unordered_map<ClientID, std::vector<WorkerID>> node_to_workers;
for (auto &item : result) {
if (item.second.state() != ray::rpc::ActorTableData::DEAD) {
auto actor = std::make_shared<GcsActor>(item.second);
@ -779,9 +780,19 @@ void GcsActorManager::LoadInitialData(const EmptyCallback &done) {
worker_client_factory_(actor->GetOwnerAddress());
workers.emplace(actor->GetOwnerID(), Owner(std::move(client)));
}
if (!actor->GetWorkerID().IsNil()) {
RAY_CHECK(!actor->GetNodeID().IsNil());
node_to_workers[actor->GetNodeID()].emplace_back(actor->GetWorkerID());
}
}
}
// Notify raylets to release unused workers.
if (RayConfig::instance().gcs_actor_service_enabled()) {
gcs_actor_scheduler_->ReleaseUnusedWorkers(node_to_workers);
}
RAY_LOG(DEBUG) << "The number of registered actors is " << registered_actors_.size()
<< ", and the number of created actors is " << created_actors_.size();
for (auto &item : registered_actors_) {

View file

@ -154,6 +154,46 @@ ActorID GcsActorScheduler::CancelOnWorker(const ClientID &node_id,
return assigned_actor_id;
}
void GcsActorScheduler::ReleaseUnusedWorkers(
const std::unordered_map<ClientID, std::vector<WorkerID>> &node_to_workers) {
// The purpose of this function is to release leased workers that may be leaked.
// When GCS restarts, it doesn't know which workers it has leased in the previous
// lifecycle. In this case, GCS will send a list of worker ids that are still needed.
// And Raylet will release other leased workers.
// If the node is dead, there is no need to send the request of release unused
// workers.
const auto &alive_nodes = gcs_node_manager_.GetAllAliveNodes();
for (const auto &alive_node : alive_nodes) {
const auto &node_id = alive_node.first;
nodes_of_releasing_unused_workers_.insert(node_id);
rpc::Address address;
address.set_raylet_id(alive_node.second->node_id());
address.set_ip_address(alive_node.second->node_manager_address());
address.set_port(alive_node.second->node_manager_port());
auto lease_client = GetOrConnectLeaseClient(address);
auto release_unused_workers_callback =
[this, node_id](const Status &status,
const rpc::ReleaseUnusedWorkersReply &reply) {
nodes_of_releasing_unused_workers_.erase(node_id);
};
auto iter = node_to_workers.find(alive_node.first);
// When GCS restarts, the reply of RequestWorkerLease may not be processed, so some
// nodes do not have leased workers. In this case, GCS will send an empty list.
auto workers_in_use =
iter != node_to_workers.end() ? iter->second : std::vector<WorkerID>{};
const auto &status = lease_client->ReleaseUnusedWorkers(
workers_in_use, release_unused_workers_callback);
if (!status.ok()) {
RAY_LOG(WARNING) << "Failed to send ReleaseUnusedWorkers request to raylet because "
"raylet may be dead, node id: "
<< node_id << ", status: " << status.ToString();
nodes_of_releasing_unused_workers_.erase(node_id);
}
}
}
void GcsActorScheduler::LeaseWorkerFromNode(std::shared_ptr<GcsActor> actor,
std::shared_ptr<rpc::GcsNodeInfo> node) {
RAY_CHECK(actor && node);
@ -162,6 +202,13 @@ void GcsActorScheduler::LeaseWorkerFromNode(std::shared_ptr<GcsActor> actor,
RAY_LOG(INFO) << "Start leasing worker from node " << node_id << " for actor "
<< actor->GetActorID();
// We need to ensure that the RequestWorkerLease won't be sent before the reply of
// ReleaseUnusedWorkers is returned.
if (nodes_of_releasing_unused_workers_.contains(node_id)) {
RetryLeasingWorkerFromNode(actor, node);
return;
}
rpc::Address remote_address;
remote_address.set_raylet_id(node->node_id());
remote_address.set_ip_address(node->node_manager_address());

View file

@ -68,6 +68,12 @@ class GcsActorSchedulerInterface {
/// \return ID of actor associated with the specified node id and worker id.
virtual ActorID CancelOnWorker(const ClientID &node_id, const WorkerID &worker_id) = 0;
/// Notify raylets to release unused workers.
///
/// \param node_to_workers Workers used by each node.
virtual void ReleaseUnusedWorkers(
const std::unordered_map<ClientID, std::vector<WorkerID>> &node_to_workers) = 0;
virtual ~GcsActorSchedulerInterface() {}
};
@ -132,6 +138,12 @@ class GcsActorScheduler : public GcsActorSchedulerInterface {
/// \return ID of actor associated with the specified node id and worker id.
ActorID CancelOnWorker(const ClientID &node_id, const WorkerID &worker_id) override;
/// Notify raylets to release unused workers.
///
/// \param node_to_workers Workers used by each node.
void ReleaseUnusedWorkers(const std::unordered_map<ClientID, std::vector<WorkerID>>
&node_to_workers) override;
protected:
/// The GcsLeasedWorker is kind of abstraction of remote leased worker inside raylet. It
/// contains the address of remote leased worker as well as the leased resources and the
@ -285,6 +297,8 @@ class GcsActorScheduler : public GcsActorSchedulerInterface {
LeaseClientFactoryFn lease_client_factory_;
/// Factory for producing new core worker clients.
rpc::ClientFactoryFn client_factory_;
/// The nodes which are releasing unused workers.
absl::flat_hash_set<ClientID> nodes_of_releasing_unused_workers_;
};
} // namespace gcs

View file

@ -31,6 +31,8 @@ class MockActorScheduler : public gcs::GcsActorSchedulerInterface {
void Schedule(std::shared_ptr<gcs::GcsActor> actor) { actors.push_back(actor); }
void Reschedule(std::shared_ptr<gcs::GcsActor> actor) {}
void ReleaseUnusedWorkers(
const std::unordered_map<ClientID, std::vector<WorkerID>> &node_to_workers) {}
MOCK_METHOD1(CancelOnNode, std::vector<ActorID>(const ClientID &node_id));
MOCK_METHOD2(CancelOnWorker,

View file

@ -467,6 +467,42 @@ TEST_F(GcsActorSchedulerTest, TestReschedule) {
ASSERT_EQ(2, success_actors_.size());
}
TEST_F(GcsActorSchedulerTest, TestReleaseUnusedWorkers) {
// Test the case that GCS won't send `RequestWorkerLease` request to the raylet,
// if there is still a pending `ReleaseUnusedWorkers` request.
// Add a node to the cluster.
auto node = Mocker::GenNodeInfo();
auto node_id = ClientID::FromBinary(node->node_id());
gcs_node_manager_->AddNode(node);
ASSERT_EQ(1, gcs_node_manager_->GetAllAliveNodes().size());
// Send a `ReleaseUnusedWorkers` request to the node.
std::unordered_map<ClientID, std::vector<WorkerID>> node_to_workers;
node_to_workers[node_id].push_back({WorkerID::FromRandom()});
gcs_actor_scheduler_->ReleaseUnusedWorkers(node_to_workers);
ASSERT_EQ(1, raylet_client_->num_release_unused_workers);
ASSERT_EQ(1, raylet_client_->release_callbacks.size());
// Schedule an actor which is not tied to a worker, this should invoke the
// `LeaseWorkerFromNode` method.
// But since the `ReleaseUnusedWorkers` request hasn't finished, `GcsActorScheduler`
// won't send `RequestWorkerLease` request to node immediately. But instead, it will
// invoke the `RetryLeasingWorkerFromNode` to retry later.
auto job_id = JobID::FromInt(1);
auto create_actor_request = Mocker::GenCreateActorRequest(job_id);
auto actor = std::make_shared<gcs::GcsActor>(create_actor_request);
gcs_actor_scheduler_->Schedule(actor);
ASSERT_EQ(2, gcs_actor_scheduler_->num_retry_leasing_count_);
ASSERT_EQ(raylet_client_->num_workers_requested, 0);
// When `GcsActorScheduler` receives the `ReleaseUnusedWorkers` reply, it will send
// out the `RequestWorkerLease` request.
ASSERT_TRUE(raylet_client_->ReplyReleaseUnusedWorkers());
gcs_actor_scheduler_->TryLeaseWorkerFromNodeAgain(actor, node);
ASSERT_EQ(raylet_client_->num_workers_requested, 1);
}
} // namespace ray
int main(int argc, char **argv) {

View file

@ -76,6 +76,14 @@ struct GcsServerMocker {
return Status::OK();
}
ray::Status ReleaseUnusedWorkers(
const std::vector<WorkerID> &workers_in_use,
const rpc::ClientCallback<rpc::ReleaseUnusedWorkersReply> &callback) override {
num_release_unused_workers += 1;
release_callbacks.push_back(callback);
return Status::OK();
}
ray::Status CancelWorkerLease(
const TaskID &task_id,
const rpc::ClientCallback<rpc::CancelWorkerLeaseReply> &callback) override {
@ -127,15 +135,29 @@ struct GcsServerMocker {
}
}
bool ReplyReleaseUnusedWorkers() {
rpc::ReleaseUnusedWorkersReply reply;
if (release_callbacks.size() == 0) {
return false;
} else {
auto callback = release_callbacks.front();
callback(Status::OK(), reply);
release_callbacks.pop_front();
return true;
}
}
~MockRayletClient() {}
int num_workers_requested = 0;
int num_workers_returned = 0;
int num_workers_disconnected = 0;
int num_leases_canceled = 0;
int num_release_unused_workers = 0;
ClientID node_id = ClientID::FromRandom();
std::list<rpc::ClientCallback<rpc::RequestWorkerLeaseReply>> callbacks = {};
std::list<rpc::ClientCallback<rpc::CancelWorkerLeaseReply>> cancel_callbacks = {};
std::list<rpc::ClientCallback<rpc::ReleaseUnusedWorkersReply>> release_callbacks = {};
};
class MockRayletResourceClient : public ResourceReserveInterface {
@ -193,11 +215,18 @@ struct GcsServerMocker {
client_factory_ = std::move(client_factory);
}
void TryLeaseWorkerFromNodeAgain(std::shared_ptr<gcs::GcsActor> actor,
std::shared_ptr<rpc::GcsNodeInfo> node) {
DoRetryLeasingWorkerFromNode(std::move(actor), std::move(node));
}
protected:
void RetryLeasingWorkerFromNode(std::shared_ptr<gcs::GcsActor> actor,
std::shared_ptr<rpc::GcsNodeInfo> node) override {
++num_retry_leasing_count_;
DoRetryLeasingWorkerFromNode(actor, node);
if (num_retry_leasing_count_ <= 1) {
DoRetryLeasingWorkerFromNode(actor, node);
}
}
void RetryCreatingActorOnWorker(std::shared_ptr<gcs::GcsActor> actor,

View file

@ -69,6 +69,13 @@ message ReturnWorkerRequest {
message ReturnWorkerReply {
}
message ReleaseUnusedWorkersRequest {
repeated bytes worker_ids_in_use = 1;
}
message ReleaseUnusedWorkersReply {
}
message CancelWorkerLeaseRequest {
// The task to cancel.
bytes task_id = 1;
@ -149,6 +156,12 @@ service NodeManagerService {
rpc RequestWorkerLease(RequestWorkerLeaseRequest) returns (RequestWorkerLeaseReply);
// Release a worker back to its raylet.
rpc ReturnWorker(ReturnWorkerRequest) returns (ReturnWorkerReply);
// This method is only used by GCS, and the purpose is to release leased workers
// that may be leaked. When GCS restarts, it doesn't know which workers it has leased
// in the previous lifecycle. In this case, GCS will send a list of worker ids that
// are still needed. And Raylet will release other leased workers.
rpc ReleaseUnusedWorkers(ReleaseUnusedWorkersRequest)
returns (ReleaseUnusedWorkersReply);
// Request resource from the raylet for a bundle.
rpc RequestResourceReserve(RequestResourceReserveRequest)
returns (RequestResourceReserveReply);

View file

@ -1841,6 +1841,33 @@ void NodeManager::HandleReturnWorker(const rpc::ReturnWorkerRequest &request,
send_reply_callback(status, nullptr, nullptr);
}
void NodeManager::HandleReleaseUnusedWorkers(
const rpc::ReleaseUnusedWorkersRequest &request,
rpc::ReleaseUnusedWorkersReply *reply, rpc::SendReplyCallback send_reply_callback) {
// TODO(ffbin): At present, we have not cleaned up the lease worker requests that are
// still waiting to be scheduled, which will be implemented in the next pr.
std::unordered_set<WorkerID> in_use_worker_ids;
for (int index = 0; index < request.worker_ids_in_use_size(); ++index) {
auto worker_id = WorkerID::FromBinary(request.worker_ids_in_use(index));
in_use_worker_ids.emplace(worker_id);
}
std::vector<WorkerID> unused_worker_ids;
for (auto &iter : leased_workers_) {
// We need to exclude workers used by common tasks.
// Because they are not used by GCS.
if (!iter.second->GetActorId().IsNil() && !in_use_worker_ids.count(iter.first)) {
unused_worker_ids.emplace_back(iter.first);
}
}
for (auto &iter : unused_worker_ids) {
leased_workers_.erase(iter);
}
send_reply_callback(Status::OK(), nullptr, nullptr);
}
void NodeManager::HandleCancelWorkerLease(const rpc::CancelWorkerLeaseRequest &request,
rpc::CancelWorkerLeaseReply *reply,
rpc::SendReplyCallback send_reply_callback) {

View file

@ -618,6 +618,11 @@ class NodeManager : public rpc::NodeManagerServiceHandler {
rpc::ReturnWorkerReply *reply,
rpc::SendReplyCallback send_reply_callback) override;
/// Handle a `ReleaseUnusedWorkers` request.
void HandleReleaseUnusedWorkers(const rpc::ReleaseUnusedWorkersRequest &request,
rpc::ReleaseUnusedWorkersReply *reply,
rpc::SendReplyCallback send_reply_callback) override;
/// Handle a `ReturnWorker` request.
void HandleCancelWorkerLease(const rpc::CancelWorkerLeaseRequest &request,
rpc::CancelWorkerLeaseReply *reply,

View file

@ -318,6 +318,25 @@ Status raylet::RayletClient::ReturnWorker(int worker_port, const WorkerID &worke
});
}
Status raylet::RayletClient::ReleaseUnusedWorkers(
const std::vector<WorkerID> &workers_in_use,
const rpc::ClientCallback<rpc::ReleaseUnusedWorkersReply> &callback) {
rpc::ReleaseUnusedWorkersRequest request;
for (auto &worker_id : workers_in_use) {
request.add_worker_ids_in_use(worker_id.Binary());
}
return grpc_client_->ReleaseUnusedWorkers(
request,
[callback](const Status &status, const rpc::ReleaseUnusedWorkersReply &reply) {
if (!status.ok()) {
RAY_LOG(WARNING)
<< "Error releasing workers from raylet, the raylet may have died:"
<< status;
}
callback(status, reply);
});
}
ray::Status raylet::RayletClient::CancelWorkerLease(
const TaskID &task_id,
const rpc::ClientCallback<rpc::CancelWorkerLeaseReply> &callback) {

View file

@ -73,6 +73,14 @@ class WorkerLeaseInterface {
virtual ray::Status ReturnWorker(int worker_port, const WorkerID &worker_id,
bool disconnect_worker) = 0;
/// Notify raylets to release unused workers.
/// \param workers_in_use Workers currently in use.
/// \param callback Callback that will be called after raylet completes the release of
/// unused workers. \return ray::Status
virtual ray::Status ReleaseUnusedWorkers(
const std::vector<WorkerID> &workers_in_use,
const rpc::ClientCallback<rpc::ReleaseUnusedWorkersReply> &callback) = 0;
virtual ray::Status CancelWorkerLease(
const TaskID &task_id,
const rpc::ClientCallback<rpc::CancelWorkerLeaseReply> &callback) = 0;
@ -320,6 +328,11 @@ class RayletClient : public PinObjectsInterface,
ray::Status ReturnWorker(int worker_port, const WorkerID &worker_id,
bool disconnect_worker) override;
/// Implements WorkerLeaseInterface.
ray::Status ReleaseUnusedWorkers(
const std::vector<WorkerID> &workers_in_use,
const rpc::ClientCallback<rpc::ReleaseUnusedWorkersReply> &callback) override;
ray::Status CancelWorkerLease(
const TaskID &task_id,
const rpc::ClientCallback<rpc::CancelWorkerLeaseReply> &callback) override;

View file

@ -86,6 +86,9 @@ class NodeManagerWorkerClient
/// Return a worker lease.
RPC_CLIENT_METHOD(NodeManagerService, ReturnWorker, grpc_client_, )
/// Release unused workers.
RPC_CLIENT_METHOD(NodeManagerService, ReleaseUnusedWorkers, grpc_client_, )
/// Cancel a pending worker lease request.
RPC_CLIENT_METHOD(NodeManagerService, CancelWorkerLease, grpc_client_, )

View file

@ -26,6 +26,7 @@ namespace rpc {
#define RAY_NODE_MANAGER_RPC_HANDLERS \
RPC_SERVICE_HANDLER(NodeManagerService, RequestWorkerLease) \
RPC_SERVICE_HANDLER(NodeManagerService, ReturnWorker) \
RPC_SERVICE_HANDLER(NodeManagerService, ReleaseUnusedWorkers) \
RPC_SERVICE_HANDLER(NodeManagerService, CancelWorkerLease) \
RPC_SERVICE_HANDLER(NodeManagerService, ForwardTask) \
RPC_SERVICE_HANDLER(NodeManagerService, PinObjectIDs) \
@ -57,6 +58,10 @@ class NodeManagerServiceHandler {
ReturnWorkerReply *reply,
SendReplyCallback send_reply_callback) = 0;
virtual void HandleReleaseUnusedWorkers(const ReleaseUnusedWorkersRequest &request,
ReleaseUnusedWorkersReply *reply,
SendReplyCallback send_reply_callback) = 0;
virtual void HandleCancelWorkerLease(const rpc::CancelWorkerLeaseRequest &request,
rpc::CancelWorkerLeaseReply *reply,
rpc::SendReplyCallback send_reply_callback) = 0;