[core] Fix wrong local resource view in raylet (#19911)

## Why are these changes needed?
When gcs broad cast node resource change, raylet will use that to update local node as well which will lead to local node instance and nodes_ inconsistent.

1. local node has used all some pg resource
2. gcs broadcast node resources
3. local node now have resources
4. scheduler picks local node
5. local node can't schedule the task
6. since there is only one type of job and local nodes hasn't finished any tasks so it'll go to step 4 ==> hangs

## Related issue number
#19438
This commit is contained in:
Yi Cheng 2021-11-01 19:52:03 -07:00 committed by GitHub
parent c48d86e469
commit a907168184
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 75 additions and 19 deletions

View file

@ -646,5 +646,57 @@ def test_placement_group_removal_leak_regression(ray_start_cluster):
wait_for_condition(check_bundle_leaks)
def test_placement_group_local_resource_view(monkeypatch, ray_start_cluster):
"""Please refer to https://github.com/ray-project/ray/pull/19911
for more details.
"""
with monkeypatch.context() as m:
# Increase broadcasting interval so that node resource will arrive
# at raylet after local resource all being allocated.
m.setenv("RAY_raylet_report_resources_period_milliseconds", "2000")
m.setenv("RAY_grpc_based_resource_broadcast", "true")
cluster = ray_start_cluster
cluster.add_node(num_cpus=16, object_store_memory=1e9)
cluster.wait_for_nodes()
cluster.add_node(num_cpus=16, num_gpus=1)
cluster.wait_for_nodes()
NUM_CPU_BUNDLES = 30
@ray.remote(num_cpus=1)
class Worker(object):
def __init__(self, i):
self.i = i
def work(self):
time.sleep(0.1)
print("work ", self.i)
@ray.remote(num_cpus=1, num_gpus=1)
class Trainer(object):
def __init__(self, i):
self.i = i
def train(self):
time.sleep(0.2)
print("train ", self.i)
ray.init(address="auto")
bundles = [{"CPU": 1, "GPU": 1}]
bundles += [{"CPU": 1} for _ in range(NUM_CPU_BUNDLES)]
pg = placement_group(bundles, strategy="PACK")
ray.get(pg.ready())
# Local resource will be allocated and here we are to ensure
# local view is consistent and node resouce updates are discarded
workers = [
Worker.options(placement_group=pg).remote(i)
for i in range(NUM_CPU_BUNDLES)
]
trainer = Trainer.options(placement_group=pg).remote(0)
ray.get([workers[i].work.remote() for i in range(NUM_CPU_BUNDLES)])
ray.get(trainer.train.remote())
if __name__ == "__main__":
sys.exit(pytest.main(["-sv", __file__]))

View file

@ -58,7 +58,7 @@ void GcsResourceManager::HandleUpdateResources(
const rpc::UpdateResourcesRequest &request, rpc::UpdateResourcesReply *reply,
rpc::SendReplyCallback send_reply_callback) {
NodeID node_id = NodeID::FromBinary(request.node_id());
RAY_LOG(INFO) << "Updating resources, node id = " << node_id;
RAY_LOG(DEBUG) << "Updating resources, node id = " << node_id;
auto changed_resources = std::make_shared<std::unordered_map<std::string, double>>();
for (const auto &entry : request.resources()) {
changed_resources->emplace(entry.first, entry.second.resource_capacity());

View file

@ -891,6 +891,9 @@ void NodeManager::ResourceCreateUpdated(const NodeID &node_id,
RAY_LOG(DEBUG) << "[ResourceCreateUpdated] received callback from node id " << node_id
<< " with created or updated resources: "
<< createUpdatedResources.ToString() << ". Updating resource map.";
if (node_id == self_node_id_) {
return;
}
// Update local_available_resources_ and SchedulingResources
for (const auto &resource_pair : createUpdatedResources.GetResourceMap()) {
@ -900,12 +903,8 @@ void NodeManager::ResourceCreateUpdated(const NodeID &node_id,
new_resource_capacity);
}
RAY_LOG(DEBUG) << "[ResourceCreateUpdated] Updated cluster_resource_map.";
if (node_id == self_node_id_) {
// The resource update is on the local node, check if we can reschedule tasks.
cluster_task_manager_->ScheduleAndDispatchTasks();
}
}
void NodeManager::ResourceDeleted(const NodeID &node_id,
const std::vector<std::string> &resource_names) {
@ -1474,39 +1473,44 @@ void NodeManager::HandleUpdateResourceUsage(
rpc::SendReplyCallback send_reply_callback) {
rpc::ResourceUsageBroadcastData resource_usage_batch;
resource_usage_batch.ParseFromString(request.serialized_resource_usage_batch());
if (resource_usage_batch.seq_no() != next_resource_seq_no_) {
// When next_resource_seq_no_ == 0 it means it just started.
// TODO: Fetch a snapshot from gcs for lightweight resource broadcasting
if (next_resource_seq_no_ != 0 &&
resource_usage_batch.seq_no() != next_resource_seq_no_) {
// TODO (Alex): Ideally we would be really robust, and potentially eagerly
// pull a full resource "snapshot" from gcs to make sure our state doesn't
// diverge from GCS.
RAY_LOG(WARNING)
<< "Raylet may have missed a resource broadcast. This either means that GCS has "
"restarted, the network is heavily congested and is dropping, reordering, or "
"duplicating packets. Expected seq#: "
<< next_resource_seq_no_ << ", but got: " << resource_usage_batch.seq_no() << ".";
// TODO (Alex): Ideally we would be really robust, and potentially eagerly
// pull a full resource "snapshot" from gcs to make sure our state doesn't
// diverge from GCS.
if (resource_usage_batch.seq_no() < next_resource_seq_no_) {
RAY_LOG(WARNING) << "Discard the the resource update since local version is newer";
return;
}
}
next_resource_seq_no_ = resource_usage_batch.seq_no() + 1;
for (const auto &resource_change_or_data : resource_usage_batch.batch()) {
if (resource_change_or_data.has_data()) {
const auto &resource_usage = resource_change_or_data.data();
const NodeID &node_id = NodeID::FromBinary(resource_usage.node_id());
if (node_id == self_node_id_) {
auto node_id = NodeID::FromBinary(resource_usage.node_id());
// Skip messages from self.
continue;
}
if (node_id != self_node_id_) {
UpdateResourceUsage(node_id, resource_usage);
}
} else if (resource_change_or_data.has_change()) {
const auto &resource_notification = resource_change_or_data.change();
auto id = NodeID::FromBinary(resource_notification.node_id());
auto node_id = NodeID::FromBinary(resource_notification.node_id());
if (resource_notification.updated_resources_size() != 0) {
ResourceSet resource_set(
MapFromProtobuf(resource_notification.updated_resources()));
ResourceCreateUpdated(id, resource_set);
ResourceCreateUpdated(node_id, resource_set);
}
if (resource_notification.deleted_resources_size() != 0) {
ResourceDeleted(id,
ResourceDeleted(node_id,
VectorFromProtobuf(resource_notification.deleted_resources()));
}
}