[Placement Group] Fix infeasible placement group not scheduled after node is added (#21993)

It looks like existing infeasible placement group in placement group manager didn't work properly. Idk how we added this feature when we cannot pass this simple test case.

But this is what has happend;

(1) PG is not scheduleable because it is infeasible
(2) New node is added
(3) After a new node is added, placement group manager tries rescheduling all infeasible pgs.
(4) Here, when we add a new node, we didn't report resources (this seems to be very weird. We are reporting resource using a separate RPC here). So when (3) happens, pg was still unschedulable. 

This PR fixes the issue by adding the resource information when the new node is added. 

Note that in the long term, we'd like to have a separate resource path from (4). This won't be addressed in this PR.
This commit is contained in:
SangBin Cho 2022-02-02 23:44:42 +09:00 committed by GitHub
parent 9c95b9a5fa
commit 9531887590
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 25 additions and 1 deletions

View file

@ -20,7 +20,7 @@ from ray._private.test_utils import (
is_placement_group_removed,
convert_actor_state,
)
from ray.exceptions import RaySystemError
from ray.exceptions import RaySystemError, GetTimeoutError
from ray.util.placement_group import placement_group, remove_placement_group
from ray.util.client.ray_client_helpers import connect_to_client_or_not
import ray.experimental.internal_kv as internal_kv
@ -687,5 +687,26 @@ def test_fractional_resources_handle_correct(ray_start_cluster):
ray.get(pg.ready(), timeout=10)
def test_infeasible_pg(ray_start_cluster):
"""Test infeasible pgs are scheduled after new nodes are added."""
cluster = ray_start_cluster
cluster.add_node(num_cpus=2)
ray.init("auto")
bundle = {"CPU": 4, "GPU": 1}
pg = placement_group([bundle], name="worker_1", strategy="STRICT_PACK")
# Placement group is infeasible.
with pytest.raises(GetTimeoutError):
ray.get(pg.ready(), timeout=3)
state = ray.util.placement_group_table()[pg.id.hex()]["stats"]["scheduling_state"]
assert state == "INFEASIBLE"
# Add a new node. PG can now be scheduled.
cluster.add_node(num_cpus=4, num_gpus=1)
assert ray.get(pg.ready(), timeout=10)
if __name__ == "__main__":
sys.exit(pytest.main(["-sv", __file__]))

View file

@ -79,6 +79,9 @@ Raylet::Raylet(instrumented_io_context &main_service, const std::string &socket_
self_node_info_.set_node_manager_port(node_manager_.GetServerPort());
self_node_info_.set_node_manager_hostname(boost::asio::ip::host_name());
self_node_info_.set_metrics_export_port(metrics_export_port);
const auto &resource_map = node_manager_config.resource_config.GetResourceMap();
self_node_info_.mutable_resources_total()->insert(resource_map.begin(),
resource_map.end());
}
Raylet::~Raylet() {}