[Placement Group] Make placement group request processing idempotent in raylet (#10998)

* add part code

* fix review comment

* fix review comment

* fix review comment

Co-authored-by: 灵洵 <fengbin.ffb@antfin.com>
This commit is contained in:
fangfengbin 2020-09-28 16:56:43 +08:00 committed by GitHub
parent 0765d989ae
commit 2e41a29c8f
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 61 additions and 25 deletions

View file

@ -1844,7 +1844,6 @@ void NodeManager::HandleCancelResourceReserve(
auto bundle_spec = BundleSpecification(request.bundle_spec());
RAY_LOG(INFO) << "Request to cancel reserved resource is received, "
<< bundle_spec.DebugString();
const auto &resource_set = bundle_spec.GetRequiredResources();
// Kill all workers that are currently associated with the placement group.
std::vector<std::shared_ptr<WorkerInterface>> workers_associated_with_pg;
@ -1870,29 +1869,13 @@ void NodeManager::HandleCancelResourceReserve(
KillWorker(worker);
}
// We should commit resources if it weren't because
// ReturnBundleResources requires resources to be committed when it is called.
auto it = bundle_state_map_.find(bundle_spec.BundleId());
RAY_CHECK(it != bundle_state_map_.end())
<< "Cancel requests are received to raylet although it hasn't received any prepare "
"or commit requests. This must be an anomaly.";
const auto &bundle_state = it->second;
if (bundle_state->state == CommitState::PREPARED) {
CommitBundle(cluster_resource_map_, bundle_spec);
// Return bundle resources.
if (ReturnBundleResources(bundle_spec)) {
// Call task dispatch to assign work to the released resources.
TryLocalInfeasibleTaskScheduling();
DispatchTasks(local_queues_.GetReadyTasksByClass());
}
bundle_state_map_.erase(it);
// Return resources.
for (auto resource : resource_set.GetResourceMap()) {
local_available_resources_.ReturnBundleResources(bundle_spec.PlacementGroupId(),
bundle_spec.Index(), resource.first);
}
cluster_resource_map_[self_node_id_].ReturnBundleResources(
bundle_spec.PlacementGroupId(), bundle_spec.Index());
send_reply_callback(Status::OK(), nullptr, nullptr);
// Call task dispatch to assign work to the released resources.
TryLocalInfeasibleTaskScheduling();
DispatchTasks(local_queues_.GetReadyTasksByClass());
}
void NodeManager::HandleReturnWorker(const rpc::ReturnWorkerRequest &request,
@ -2040,6 +2023,24 @@ void NodeManager::ProcessSetResourceRequest(
bool NodeManager::PrepareBundle(
std::unordered_map<NodeID, SchedulingResources> &resource_map,
const BundleSpecification &bundle_spec) {
// We will first delete the existing bundle to ensure idempotency.
// The reason why we do this is: after GCS restarts, a placement group can be rescheduled
// directly without rolling back the operations performed before the restart.
const auto &bundle_id = bundle_spec.BundleId();
auto iter = bundle_state_map_.find(bundle_id);
if (iter != bundle_state_map_.end()) {
if (iter->second->state == CommitState::COMMITTED) {
// If the bundle state is already committed, it means that prepare request is just
// stale.
RAY_LOG(INFO) << "Duplicate prepare bundle request, skip it directly.";
return true;
} else {
// If there was a bundle in prepare state, it already locked resources, we will
// return bundle resources.
ReturnBundleResources(bundle_spec);
}
}
// TODO(sang): It is currently not idempotent because we don't retry. Make it idempotent
// once retry is implemented. If the resource map contains the local raylet, update load
// before calling policy.
@ -2053,7 +2054,6 @@ bool NodeManager::PrepareBundle(
auto bundle_state = std::make_shared<BundleState>();
if (reserve_resource_success) {
// Register states.
const auto &bundle_id = bundle_spec.BundleId();
auto it = bundle_state_map_.find(bundle_id);
// Same bundle cannot be rescheduled.
RAY_CHECK(it == bundle_state_map_.end());
@ -2081,8 +2081,13 @@ void NodeManager::CommitBundle(
const auto &bundle_id = bundle_spec.BundleId();
auto it = bundle_state_map_.find(bundle_id);
// When bundle is committed, it should've been prepared already.
// We don't need this check if commit becomes idempotent.
RAY_CHECK(it != bundle_state_map_.end());
// If GCS calls `CommitBundleResources` after `CancelResourceReserve`, we skip the commit
// directly.
if (it == bundle_state_map_.end()) {
RAY_LOG(INFO) << "The bundle has been cancelled. Skip it directly. Bundle info is "
<< bundle_spec.DebugString();
return;
}
const auto &bundle_state = it->second;
bundle_state->state = CommitState::COMMITTED;
const auto &acquired_resources = bundle_state->acquired_resources;
@ -3496,6 +3501,31 @@ void NodeManager::RecordMetrics() {
stats::DeadActors().Record(statistical_data.dead_actors);
}
bool NodeManager::ReturnBundleResources(const BundleSpecification &bundle_spec) {
  // Locate the state tracked for this bundle. When nothing is tracked there is
  // nothing to hand back, so the request is treated as a duplicate.
  auto state_iter = bundle_state_map_.find(bundle_spec.BundleId());
  if (state_iter == bundle_state_map_.end()) {
    RAY_LOG(INFO) << "Duplicate cancel request, skip it directly.";
    return false;
  }

  // A bundle that has only been prepared is committed before its resources are
  // returned, since the return path expects the resources to be committed.
  if (state_iter->second->state == CommitState::PREPARED) {
    CommitBundle(cluster_resource_map_, bundle_spec);
  }

  // The bundle is no longer tracked from this point on.
  bundle_state_map_.erase(state_iter);

  const auto &placement_group_id = bundle_spec.PlacementGroupId();
  const auto bundle_index = bundle_spec.Index();

  // Give back every resource named in the bundle's required resources to the
  // locally available resources.
  const auto &required_resources = bundle_spec.GetRequiredResources();
  for (const auto &resource_entry : required_resources.GetResourceMap()) {
    local_available_resources_.ReturnBundleResources(placement_group_id, bundle_index,
                                                     resource_entry.first);
  }

  // Mirror the return in this node's entry of the cluster resource map.
  cluster_resource_map_[self_node_id_].ReturnBundleResources(placement_group_id,
                                                             bundle_index);
  return true;
}
} // namespace raylet
} // namespace ray

View file

@ -680,6 +680,12 @@ class NodeManager : public rpc::NodeManagerServiceHandler {
/// Whether a task is an actor creation task.
bool IsActorCreationTask(const TaskID &task_id);
/// Return all resources held by the given bundle back to this node.
///
/// If the bundle is still in the prepared state it is committed first. Its entry
/// is then removed from the bundle state map, and its resources are returned to
/// both the locally available resources and this node's entry in the cluster
/// resource map.
///
/// \param bundle_spec: Specification of bundle whose resources will be returned.
/// \return True if the resources were returned; false if no state is tracked for
/// the bundle (e.g. a duplicate cancel request), in which case nothing is changed.
bool ReturnBundleResources(const BundleSpecification &bundle_spec);
/// ID of this node.
NodeID self_node_id_;
boost::asio::io_service &io_service_;