[Placement group] Placement group scheduling hangs due to creation/removal race condition (#18419)

SangBin Cho 2021-09-09 20:39:01 -07:00 committed by GitHub
parent 688dbeb4cb
commit 7b2ed4c1f8
5 changed files with 116 additions and 23 deletions
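
For context: the hang fixed by this commit was hit when placement groups were removed while other groups from the same batch were still pending scheduling, after which the GCS could stop scheduling the remaining pending groups. Below is a minimal sketch of that user-level pattern using the public Ray API; the resource numbers and the local ray.init configuration are illustrative, not taken from this diff.

import ray
from ray.util.placement_group import placement_group, remove_placement_group

ray.init(num_cpus=4)

# Create more 1-CPU placement groups than the cluster can hold at once, so
# some of them stay pending, then remove every other group while scheduling
# is still in flight.
pgs = [placement_group([{"CPU": 1}]) for _ in range(8)]
for i, pg in enumerate(pgs):
    if i % 2 == 0:
        remove_placement_group(pg)

# The surviving groups should still become ready; before this fix, this kind
# of interleaving could leave pending groups unscheduled.
ray.get([pg.ready() for i, pg in enumerate(pgs) if i % 2 == 1], timeout=60)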

View file

@ -3,6 +3,8 @@ import os
import sys
import time
from random import random

try:
    import pytest_timeout
except ImportError:
@ -1877,6 +1879,78 @@ def test_placement_group_gpu_unique_assigned(ray_start_cluster,
    assert len(gpu_ids_res) == 4


@pytest.mark.parametrize("execution_number", range(3))
def test_placement_group_remove_stress(ray_start_cluster, execution_number):
    # This test checks the race condition between placement group removal
    # and creation. It shouldn't be flaky: if it fails on the last ray.get,
    # that very likely indicates a real bug. It runs 3 times to make sure it
    # passes consistently. With a resource quantity of 999, it failed about
    # every other time when the test was written.
    cluster = ray_start_cluster
    resource_quantity = 999
    num_nodes = 5
    custom_resources = {"pg_custom": resource_quantity}
    # Create pgs that each use 1 GPU and 1 custom resource.
    num_pg = resource_quantity

    # TODO(sang): Cluster setup. Remove when running in real clusters.
    nodes = []
    for _ in range(num_nodes):
        nodes.append(
            cluster.add_node(
                num_cpus=3,
                num_gpus=resource_quantity,
                resources=custom_resources))
    cluster.wait_for_nodes()

    ray.init(address=cluster.address)
    while not ray.is_initialized():
        time.sleep(0.1)
    bundles = [{"GPU": 1, "pg_custom": 1}] * num_nodes

    @ray.remote(num_cpus=0, num_gpus=1, max_calls=0)
    def mock_task():
        time.sleep(0.1)
        return True

    @ray.remote(num_cpus=0)
    def pg_launcher(num_pgs_to_create):
        print("Creating pgs")
        pgs = []
        for i in range(num_pgs_to_create):
            pgs.append(placement_group(bundles, strategy="STRICT_SPREAD"))

        pgs_removed = []
        pgs_unremoved = []
        # Randomly choose placement groups to remove.
        print("removing pgs")
        for pg in pgs:
            if random() < .5:
                pgs_removed.append(pg)
            else:
                pgs_unremoved.append(pg)

        tasks = []
        # Schedule tasks on the placement groups that were not removed.
        for pg in pgs_unremoved:
            tasks.append(mock_task.options(placement_group=pg).remote())
        # Remove the rest of the placement groups.
        for pg in pgs_removed:
            remove_placement_group(pg)

        ray.get(tasks)

        # Now that the remaining placement groups have been scheduled,
        # remove them as well.
        for pg in pgs_unremoved:
            remove_placement_group(pg)

    pg_launchers = []
    for _ in range(3):
        pg_launchers.append(pg_launcher.remote(num_pg // 3))

    ray.get(pg_launchers, timeout=120)


def test_placement_group_status_no_bundle_demand(ray_start_cluster):
    cluster = ray_start_cluster
    cluster.add_node(num_cpus=4)

View file

@ -71,9 +71,7 @@ class PlacementGroup:
                f"{len(self.bundle_cache)}")
        return bundle_reservation_check.options(
-            placement_group=self,
-            placement_group_bundle_index=0,
-            resources={
+            placement_group=self, resources={
                "bundle": 0.001
            }).remote(self)
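
The change above drops the pinned bundle index from the internal bundle reservation probe used by PlacementGroup.ready(); the user-facing behavior is unchanged. A short usage sketch, assuming a local Ray session (the bundle shapes are illustrative):

import ray
from ray.util.placement_group import placement_group

ray.init(num_cpus=2)

pg = placement_group([{"CPU": 1}, {"CPU": 1}], strategy="PACK")
# ready() returns an ObjectRef that resolves once every bundle is reserved.
ray.get(pg.ready(), timeout=30)
print(ray.util.placement_group_table(pg))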

View file

@ -60,8 +60,9 @@ for _ in range(repeat):
    total_removing_time += (end - start)

# Validate the correctness.
-assert ray.cluster_resources()["GPU"] == num_nodes * resource_quantity
-assert ray.cluster_resources()["pg_custom"] == num_nodes * resource_quantity
+time.sleep(30)
+assert ray.available_resources()["GPU"] == num_nodes * resource_quantity
+assert ray.available_resources()["pg_custom"] == num_nodes * resource_quantity

# Scenario 2:
@ -144,8 +145,9 @@ for i in range(3):
        pg_launcher.remote(pre_created_pgs, num_pgs_to_create // 3))

ray.get(pg_launchers)
-assert ray.cluster_resources()["GPU"] == num_nodes * resource_quantity
-assert ray.cluster_resources()["pg_custom"] == num_nodes * resource_quantity
+time.sleep(30)
+assert ray.available_resources()["GPU"] == num_nodes * resource_quantity
+assert ray.available_resources()["pg_custom"] == num_nodes * resource_quantity
ray.shutdown()

print("Avg placement group creating time: "
      f"{total_creating_time / total_trial * 1000} ms")

View file

@ -231,6 +231,7 @@ void GcsPlacementGroupManager::OnPlacementGroupCreationFailed(
            state == rpc::PlacementGroupTableData::PENDING ||
            state == rpc::PlacementGroupTableData::REMOVED)
      << "State: " << state;

  if (state == rpc::PlacementGroupTableData::RESCHEDULING) {
    // NOTE: If a node is dead, the placement group scheduler should try to recover the
    // group by rescheduling the bundles of the dead node. This should have higher
@ -274,25 +275,37 @@ void GcsPlacementGroupManager::OnPlacementGroupCreationSuccess(
}

void GcsPlacementGroupManager::SchedulePendingPlacementGroups() {
  // Update the placement group load to report load information to the autoscaler.
-  if (pending_placement_groups_.empty() || IsSchedulingInProgress()) {
+  if (pending_placement_groups_.empty()) {
+    RAY_LOG(DEBUG) << "No additional placement groups to schedule. Stop scheduling.";
    return;
  }
-  const auto placement_group = pending_placement_groups_.front();
-  const auto &placement_group_id = placement_group->GetPlacementGroupID();
-  // Do not reschedule if the placement group has already been removed.
-  if (registered_placement_groups_.contains(placement_group_id)) {
-    MarkSchedulingStarted(placement_group_id);
-    gcs_placement_group_scheduler_->ScheduleUnplacedBundles(
-        placement_group,
-        [this](std::shared_ptr<GcsPlacementGroup> placement_group) {
-          OnPlacementGroupCreationFailed(std::move(placement_group));
-        },
-        [this](std::shared_ptr<GcsPlacementGroup> placement_group) {
-          OnPlacementGroupCreationSuccess(std::move(placement_group));
-        });
+  if (IsSchedulingInProgress()) {
+    RAY_LOG(DEBUG) << "Placement group scheduling is still in progress. New placement "
+                      "groups will be scheduled after the current scheduling is done.";
+    return;
+  }
+  bool is_new_placement_group_scheduled = false;
+  while (!pending_placement_groups_.empty() && !is_new_placement_group_scheduled) {
+    const auto placement_group = pending_placement_groups_.front();
+    pending_placement_groups_.pop_front();
+    const auto &placement_group_id = placement_group->GetPlacementGroupID();
+    // Do not reschedule if the placement group has already been removed.
+    if (registered_placement_groups_.contains(placement_group_id)) {
+      MarkSchedulingStarted(placement_group_id);
+      gcs_placement_group_scheduler_->ScheduleUnplacedBundles(
+          placement_group,
+          [this](std::shared_ptr<GcsPlacementGroup> placement_group) {
+            OnPlacementGroupCreationFailed(std::move(placement_group));
+          },
+          [this](std::shared_ptr<GcsPlacementGroup> placement_group) {
+            OnPlacementGroupCreationSuccess(std::move(placement_group));
+          });
+      is_new_placement_group_scheduled = true;
+    }
+    // A placement group that is no longer registered has already been removed.
  }
-  pending_placement_groups_.pop_front();
}
void GcsPlacementGroupManager::HandleCreatePlacementGroup(
@ -617,6 +630,10 @@ void GcsPlacementGroupManager::CollectStats() const {
void GcsPlacementGroupManager::Tick() {
  UpdatePlacementGroupLoad();
  // To avoid scheduling exhaustion in some race conditions.
  // Note that we don't currently have a known race condition that requires this, but it
  // is added as a safety check. https://github.com/ray-project/ray/pull/18419
  SchedulePendingPlacementGroups();
  execute_after(io_context_, [this] { Tick(); }, 1000 /* milliseconds */);
}

View file

@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.
#include "ray/gcs/gcs_server/gcs_placement_group_manager.h"
#include <memory>
#include "gtest/gtest.h"