Mirror of https://github.com/vale981/ray (synced 2025-03-06 10:31:39 -05:00)
[GCS]GCS node manager rename GetNode to GetAliveNode (#12781)
This commit is contained in:
parent 5f04ade6ef
commit c22990a537
5 changed files with 14 additions and 13 deletions
@@ -48,11 +48,11 @@ void GcsActorScheduler::Schedule(std::shared_ptr<GcsActor> actor) {
   // Select a node to lease worker for the actor.
   std::shared_ptr<rpc::GcsNodeInfo> node;

-  // If an actor is non-detached and has resource requirements, We will try to schedule it
-  // on the same node as the owner if possible.
+  // If an actor has resource requirements, we will try to schedule it on the same node as
+  // the owner if possible.
   const auto &task_spec = actor->GetCreationTaskSpecification();
-  if (!actor->IsDetached() && !task_spec.GetRequiredResources().IsEmpty()) {
-    auto maybe_node = gcs_node_manager_.GetNode(actor->GetOwnerNodeID());
+  if (!task_spec.GetRequiredResources().IsEmpty()) {
+    auto maybe_node = gcs_node_manager_.GetAliveNode(actor->GetOwnerNodeID());
     node = maybe_node.has_value() ? maybe_node.value() : SelectNodeRandomly();
   } else {
     node = SelectNodeRandomly();
@@ -295,7 +295,7 @@ void GcsActorScheduler::HandleWorkerLeasedReply(
     // node, and then try again on the new node.
     RAY_CHECK(!retry_at_raylet_address.raylet_id().empty());
     auto spill_back_node_id = NodeID::FromBinary(retry_at_raylet_address.raylet_id());
-    auto maybe_spill_back_node = gcs_node_manager_.GetNode(spill_back_node_id);
+    auto maybe_spill_back_node = gcs_node_manager_.GetAliveNode(spill_back_node_id);
     if (maybe_spill_back_node.has_value()) {
       auto spill_back_node = maybe_spill_back_node.value();
       actor->UpdateAddress(retry_at_raylet_address);
@@ -322,7 +322,7 @@ void GcsNodeManager::UpdateNodeResourceUsage(
   }
 }

-absl::optional<std::shared_ptr<rpc::GcsNodeInfo>> GcsNodeManager::GetNode(
+absl::optional<std::shared_ptr<rpc::GcsNodeInfo>> GcsNodeManager::GetAliveNode(
     const ray::NodeID &node_id) const {
   auto iter = alive_nodes_.find(node_id);
   if (iter == alive_nodes_.end()) {
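The hunk above stops inside the renamed accessor, so its full body is not visible in this diff. Below is a minimal sketch of how such an alive-node lookup would typically complete, for orientation only; the early-return branch and the final return are assumptions, not lines taken from this commit.

// Sketch only: the diff above truncates GcsNodeManager::GetAliveNode().
// Assumption: a miss in alive_nodes_ maps to an empty optional.
absl::optional<std::shared_ptr<rpc::GcsNodeInfo>> GcsNodeManager::GetAliveNode(
    const ray::NodeID &node_id) const {
  auto iter = alive_nodes_.find(node_id);
  if (iter == alive_nodes_.end()) {
    // Dead or unknown node: report "not alive" instead of returning a null pointer.
    return absl::nullopt;
  }
  return iter->second;
}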
@@ -127,7 +127,8 @@ class GcsNodeManager : public rpc::NodeInfoHandler {
   ///
   /// \param node_id The id of the node.
   /// \return the node if it is alive. Optional empty value if it is not alive.
-  absl::optional<std::shared_ptr<rpc::GcsNodeInfo>> GetNode(const NodeID &node_id) const;
+  absl::optional<std::shared_ptr<rpc::GcsNodeInfo>> GetAliveNode(
+      const NodeID &node_id) const;

   /// Get all alive nodes.
   ///
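For call sites, the renamed accessor keeps the contract documented above: the absl::optional is engaged only while the node is alive. The snippet below is a hedged usage sketch; the surrounding variables are illustrative and not taken from this diff.

// Illustrative caller-side handling of GetAliveNode's optional result.
auto maybe_node = gcs_node_manager_.GetAliveNode(node_id);
if (maybe_node.has_value()) {
  std::shared_ptr<rpc::GcsNodeInfo> node = maybe_node.value();
  // Proceed with this node, e.g. lease a worker or reserve resources on it.
} else {
  // The node is not alive; fall back (the actor scheduler uses
  // SelectNodeRandomly()) rather than dereferencing an empty optional.
}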
@@ -273,7 +273,7 @@ void GcsPlacementGroupScheduler::ScheduleUnplacedBundles(
     lease_status_tracker->MarkPreparePhaseStarted(node_id, bundle);
     // TODO(sang): The callback might not be called at all if nodes are dead. We should
     // handle this case properly.
-    PrepareResources(bundle, gcs_node_manager_.GetNode(node_id),
+    PrepareResources(bundle, gcs_node_manager_.GetAliveNode(node_id),
                      [this, bundle, node_id, lease_status_tracker, failure_callback,
                       success_callback](const Status &status) {
                        lease_status_tracker->MarkPrepareRequestReturned(node_id, bundle,
@@ -411,7 +411,7 @@ void GcsPlacementGroupScheduler::CommitAllBundles(
   lease_status_tracker->MarkCommitPhaseStarted();
   for (const auto &bundle_to_commit : *prepared_bundle_locations) {
     const auto &node_id = bundle_to_commit.second.first;
-    const auto &node = gcs_node_manager_.GetNode(node_id);
+    const auto &node = gcs_node_manager_.GetAliveNode(node_id);
     const auto &bundle = bundle_to_commit.second.second;

     auto commit_resources_callback = [this, lease_status_tracker, bundle, node_id,
@@ -623,7 +623,7 @@ void GcsPlacementGroupScheduler::DestroyPlacementGroupPreparedBundleResources(
     for (const auto &iter : *(leasing_bundle_locations)) {
       auto &bundle_spec = iter.second.second;
       auto &node_id = iter.second.first;
-      CancelResourceReserve(bundle_spec, gcs_node_manager_.GetNode(node_id));
+      CancelResourceReserve(bundle_spec, gcs_node_manager_.GetAliveNode(node_id));
     }
   }
 }
@@ -642,7 +642,7 @@ void GcsPlacementGroupScheduler::DestroyPlacementGroupCommittedBundleResources(
     for (const auto &iter : *(committed_bundle_locations)) {
       auto &bundle_spec = iter.second.second;
       auto &node_id = iter.second.first;
-      CancelResourceReserve(bundle_spec, gcs_node_manager_.GetNode(node_id));
+      CancelResourceReserve(bundle_spec, gcs_node_manager_.GetAliveNode(node_id));
     }
     committed_bundle_location_index_.Erase(placement_group_id);
   }
@@ -41,10 +41,10 @@ TEST_F(GcsNodeManagerTest, TestManagement) {
   auto node_id = NodeID::FromBinary(node->node_id());

   node_manager.AddNode(node);
-  ASSERT_EQ(node, node_manager.GetNode(node_id).value());
+  ASSERT_EQ(node, node_manager.GetAliveNode(node_id).value());

   node_manager.RemoveNode(node_id);
-  ASSERT_TRUE(!node_manager.GetNode(node_id).has_value());
+  ASSERT_TRUE(!node_manager.GetAliveNode(node_id).has_value());
 }

 TEST_F(GcsNodeManagerTest, TestListener) {