[core] Fix placement group GPU assignment bug (#15049)

This commit is contained in:
Alex Wu 2021-04-01 17:46:09 -07:00 committed by GitHub
parent d4c20c970b
commit f52c855704
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
9 changed files with 139 additions and 61 deletions

View file

@ -1582,5 +1582,28 @@ def test_placement_group_gpu_set(ray_start_cluster):
assert result == [0]
def test_placement_group_gpu_assigned(ray_start_cluster):
    """Two 1-GPU placement groups on a 2-GPU node must each be handed a
    distinct physical GPU (regression test for duplicate GPU assignment)."""
    cluster = ray_start_cluster
    cluster.add_node(num_gpus=2)
    ray.init(address=cluster.address)

    @ray.remote(num_gpus=1, num_cpus=0)
    def f():
        # Report which GPU the worker was actually bound to.
        import os
        return os.environ["CUDA_VISIBLE_DEVICES"]

    groups = [ray.util.placement_group([{"GPU": 1}]) for _ in range(2)]
    for pg in groups:
        assert pg.wait(10)

    # Run one task per group and collect the visible-device strings; a set
    # collapses duplicates, so two entries means two different GPUs.
    observed_gpu_ids = {
        ray.get(f.options(placement_group=pg).remote()) for pg in groups
    }
    assert len(observed_gpu_ids) == 2
if __name__ == "__main__":
    # Allow running this test module directly as a script: delegate to
    # pytest and propagate its exit status to the shell.
    sys.exit(pytest.main(["-v", __file__]))

View file

@ -94,9 +94,18 @@ void NewPlacementGroupResourceManager::CommitBundle(
const auto &bundle_state = it->second;
bundle_state->state_ = CommitState::COMMITTED;
const auto &string_id_map = cluster_resource_scheduler_->GetStringIdMap();
const auto &task_resource_instances = *bundle_state->resources_;
for (const auto &resource : bundle_spec.GetFormattedResources()) {
cluster_resource_scheduler_->AddLocalResource(resource.first, resource.second);
const auto &resource_name = resource.first;
const auto &original_resource_name = GetOriginalResourceName(resource_name);
const auto &instances =
task_resource_instances.Get(original_resource_name, string_id_map);
cluster_resource_scheduler_->AddLocalResourceInstances(resource_name, instances);
}
cluster_resource_scheduler_->UpdateLocalAvailableResourcesFromResourceInstances();
}
void NewPlacementGroupResourceManager::ReturnBundle(

View file

@ -43,11 +43,11 @@ class NewPlacementGroupResourceManagerTest : public ::testing::Test {
ASSERT_TRUE(cluster_resource_scheduler_->IsAvailableResourceEmpty(resource));
}
void CheckRemainingResourceCorrect(NodeResourceInstances &node_resource_instances) {
void CheckRemainingResourceCorrect(NodeResources &node_resources) {
const auto cluster_resource_scheduler_ =
new_placement_group_resource_manager_->GetResourceScheduler();
ASSERT_TRUE(cluster_resource_scheduler_->GetLocalResources() ==
node_resource_instances);
auto local_node_resource = cluster_resource_scheduler_->GetLocalNodeResources();
ASSERT_TRUE(local_node_resource == node_resources);
}
};
@ -102,8 +102,9 @@ TEST_F(NewPlacementGroupResourceManagerTest, TestNewCommitBundleResource) {
std::make_shared<TaskResourceInstances>();
ASSERT_TRUE(remaining_resource_scheduler->AllocateLocalTaskResources(
unit_resource, resource_instances));
auto remaining_resouece_instance = remaining_resource_scheduler->GetLocalResources();
CheckRemainingResourceCorrect(remaining_resouece_instance);
auto remaining_resource_instance =
remaining_resource_scheduler->GetLocalNodeResources();
CheckRemainingResourceCorrect(remaining_resource_instance);
}
TEST_F(NewPlacementGroupResourceManagerTest, TestNewReturnBundleResource) {
@ -122,8 +123,9 @@ TEST_F(NewPlacementGroupResourceManagerTest, TestNewReturnBundleResource) {
/// 5. check remaining resources is correct.
auto remaining_resource_scheduler =
std::make_shared<ClusterResourceScheduler>("remaining", unit_resource);
auto remaining_resouece_instance = remaining_resource_scheduler->GetLocalResources();
CheckRemainingResourceCorrect(remaining_resouece_instance);
auto remaining_resource_instance =
remaining_resource_scheduler->GetLocalNodeResources();
CheckRemainingResourceCorrect(remaining_resource_instance);
}
TEST_F(NewPlacementGroupResourceManagerTest, TestNewMultipleBundlesCommitAndReturn) {
@ -154,8 +156,10 @@ TEST_F(NewPlacementGroupResourceManagerTest, TestNewMultipleBundlesCommitAndRetu
std::make_shared<TaskResourceInstances>();
ASSERT_TRUE(remaining_resource_scheduler->AllocateLocalTaskResources(
init_unit_resource, resource_instances));
auto remaining_resouece_instance = remaining_resource_scheduler->GetLocalResources();
CheckRemainingResourceCorrect(remaining_resouece_instance);
auto remaining_resource_instance =
remaining_resource_scheduler->GetLocalNodeResources();
CheckRemainingResourceCorrect(remaining_resource_instance);
/// 5. return second bundle.
new_placement_group_resource_manager_->ReturnBundle(second_bundle_spec);
/// 6. check remaining resources is correct after return second bundle.
@ -166,16 +170,16 @@ TEST_F(NewPlacementGroupResourceManagerTest, TestNewMultipleBundlesCommitAndRetu
std::make_shared<ClusterResourceScheduler>("remaining", remaining_resources);
ASSERT_TRUE(remaining_resource_scheduler->AllocateLocalTaskResources(
{{"CPU_group_" + group_id.Hex(), 1.0}, {"CPU", 1.0}}, resource_instances));
remaining_resouece_instance = remaining_resource_scheduler->GetLocalResources();
CheckRemainingResourceCorrect(remaining_resouece_instance);
remaining_resource_instance = remaining_resource_scheduler->GetLocalNodeResources();
CheckRemainingResourceCorrect(remaining_resource_instance);
/// 7. return first bundle.
new_placement_group_resource_manager_->ReturnBundle(first_bundle_spec);
/// 8. check remaining resources is correct after all bundle returned.
remaining_resources = {{"CPU", 2.0}};
remaining_resource_scheduler =
std::make_shared<ClusterResourceScheduler>("remaining", remaining_resources);
remaining_resouece_instance = remaining_resource_scheduler->GetLocalResources();
CheckRemainingResourceCorrect(remaining_resouece_instance);
remaining_resource_instance = remaining_resource_scheduler->GetLocalNodeResources();
CheckRemainingResourceCorrect(remaining_resource_instance);
}
TEST_F(NewPlacementGroupResourceManagerTest, TestNewIdempotencyWithMultiPrepare) {
@ -200,8 +204,9 @@ TEST_F(NewPlacementGroupResourceManagerTest, TestNewIdempotencyWithMultiPrepare)
std::make_shared<TaskResourceInstances>();
ASSERT_TRUE(remaining_resource_scheduler->AllocateLocalTaskResources(
unit_resource, resource_instances));
auto remaining_resouece_instance = remaining_resource_scheduler->GetLocalResources();
CheckRemainingResourceCorrect(remaining_resouece_instance);
auto remaining_resource_instance =
remaining_resource_scheduler->GetLocalNodeResources();
CheckRemainingResourceCorrect(remaining_resource_instance);
}
TEST_F(NewPlacementGroupResourceManagerTest, TestNewIdempotencyWithRandomOrder) {
@ -229,15 +234,16 @@ TEST_F(NewPlacementGroupResourceManagerTest, TestNewIdempotencyWithRandomOrder)
std::make_shared<TaskResourceInstances>();
ASSERT_TRUE(remaining_resource_scheduler->AllocateLocalTaskResources(
unit_resource, resource_instances));
auto remaining_resouece_instance = remaining_resource_scheduler->GetLocalResources();
CheckRemainingResourceCorrect(remaining_resouece_instance);
auto remaining_resource_instance =
remaining_resource_scheduler->GetLocalNodeResources();
CheckRemainingResourceCorrect(remaining_resource_instance);
new_placement_group_resource_manager_->ReturnBundle(bundle_spec);
// 5. prepare bundle -> commit bundle -> commit bundle.
ASSERT_TRUE(new_placement_group_resource_manager_->PrepareBundle(bundle_spec));
new_placement_group_resource_manager_->CommitBundle(bundle_spec);
new_placement_group_resource_manager_->CommitBundle(bundle_spec);
// 6. check remaining resources is correct.
CheckRemainingResourceCorrect(remaining_resouece_instance);
CheckRemainingResourceCorrect(remaining_resource_instance);
new_placement_group_resource_manager_->ReturnBundle(bundle_spec);
// 7. prepare bundle -> return bundle -> commit bundle.
ASSERT_TRUE(new_placement_group_resource_manager_->PrepareBundle(bundle_spec));
@ -246,8 +252,8 @@ TEST_F(NewPlacementGroupResourceManagerTest, TestNewIdempotencyWithRandomOrder)
// 8. check remaining resources is correct.
remaining_resource_scheduler =
std::make_shared<ClusterResourceScheduler>("remaining", available_resource);
remaining_resouece_instance = remaining_resource_scheduler->GetLocalResources();
CheckRemainingResourceCorrect(remaining_resouece_instance);
remaining_resource_instance = remaining_resource_scheduler->GetLocalNodeResources();
CheckRemainingResourceCorrect(remaining_resource_instance);
}
} // namespace ray

View file

@ -94,6 +94,24 @@ TaskRequest ResourceMapToTaskRequest(
return task_request;
}
const std::vector<FixedPoint> &TaskResourceInstances::Get(
    const std::string &resource_name, const StringIdMap &string_id_map) const {
  // The four predefined resource labels map directly to fixed slots in
  // predefined_resources; everything else is looked up as a custom resource.
  if (resource_name == ray::kCPU_ResourceLabel) {
    return predefined_resources[CPU];
  }
  if (resource_name == ray::kGPU_ResourceLabel) {
    return predefined_resources[GPU];
  }
  if (resource_name == ray::kObjectStoreMemory_ResourceLabel) {
    return predefined_resources[OBJECT_STORE_MEM];
  }
  if (resource_name == ray::kMemory_ResourceLabel) {
    return predefined_resources[MEM];
  }
  // Custom resource: translate the name to its interned integer id and
  // fetch its instance vector. The caller must only ask for resources this
  // task actually holds, hence the hard check.
  const int64_t resource_id = string_id_map.Get(resource_name);
  const auto it = custom_resources.find(resource_id);
  RAY_CHECK(it != custom_resources.end());
  return it->second;
}
TaskRequest TaskResourceInstances::ToTaskRequest() const {
TaskRequest task_req;
task_req.predefined_resources.resize(PredefinedResources_MAX);

View file

@ -97,6 +97,9 @@ class TaskResourceInstances {
/// The list of instances of each custom resource allocated to a task.
absl::flat_hash_map<int64_t, std::vector<FixedPoint>> custom_resources;
bool operator==(const TaskResourceInstances &other);
/// Get instances based on the string.
const std::vector<FixedPoint> &Get(const std::string &resource_name,
const StringIdMap &string_id_map) const;
/// For each resource of this request aggregate its instances.
TaskRequest ToTaskRequest() const;
/// Get CPU instances only.

View file

@ -432,36 +432,41 @@ const NodeResources &ClusterResourceScheduler::GetLocalNodeResources() const {
return node_it->second.GetLocalView();
}
int64_t ClusterResourceScheduler::NumNodes() { return nodes_.size(); }
int64_t ClusterResourceScheduler::NumNodes() const { return nodes_.size(); }
void ClusterResourceScheduler::AddLocalResource(const std::string &resource_name,
double resource_total) {
string_to_int_map_.Insert(resource_name);
int64_t resource_id = string_to_int_map_.Get(resource_name);
const StringIdMap &ClusterResourceScheduler::GetStringIdMap() const {
return string_to_int_map_;
}
if (local_resources_.custom_resources.contains(resource_id)) {
FixedPoint total(resource_total);
auto &instances = local_resources_.custom_resources[resource_id];
instances.total[0] += total;
instances.available[0] += total;
auto local_node_it = nodes_.find(local_node_id_);
RAY_CHECK(local_node_it != nodes_.end());
auto &capacity =
local_node_it->second.GetMutableLocalView()->custom_resources[resource_id];
capacity.available += total;
capacity.total += total;
void ClusterResourceScheduler::AddLocalResourceInstances(
const std::string &resource_name, const std::vector<FixedPoint> &instances) {
ResourceInstanceCapacities *node_instances;
local_resources_.predefined_resources.resize(PredefinedResources_MAX);
if (kCPU_ResourceLabel == resource_name) {
node_instances = &local_resources_.predefined_resources[CPU];
} else if (kGPU_ResourceLabel == resource_name) {
node_instances = &local_resources_.predefined_resources[GPU];
} else if (kObjectStoreMemory_ResourceLabel == resource_name) {
node_instances = &local_resources_.predefined_resources[OBJECT_STORE_MEM];
} else if (kMemory_ResourceLabel == resource_name) {
node_instances = &local_resources_.predefined_resources[MEM];
} else {
ResourceInstanceCapacities capacity;
capacity.total.resize(1);
capacity.total[0] = resource_total;
capacity.available.resize(1);
capacity.available[0] = resource_total;
local_resources_.custom_resources.emplace(resource_id, capacity);
std::string node_id_string = string_to_int_map_.Get(local_node_id_);
RAY_CHECK(string_to_int_map_.Get(node_id_string) == local_node_id_);
UpdateResourceCapacity(node_id_string, resource_name, resource_total);
UpdateLocalAvailableResourcesFromResourceInstances();
string_to_int_map_.Insert(resource_name);
int64_t resource_id = string_to_int_map_.Get(resource_name);
node_instances = &local_resources_.custom_resources[resource_id];
}
if (node_instances->total.size() < instances.size()) {
node_instances->total.resize(instances.size());
node_instances->available.resize(instances.size());
}
for (size_t i = 0; i < instances.size(); i++) {
node_instances->available[i] += instances[i];
node_instances->total[i] =
std::max(node_instances->total[i], node_instances->available[i]);
}
UpdateLocalAvailableResourcesFromResourceInstances();
}
bool ClusterResourceScheduler::IsAvailableResourceEmpty(
@ -854,14 +859,17 @@ void ClusterResourceScheduler::UpdateLocalAvailableResourcesFromResourceInstance
}
}
for (auto &custom_resource : local_view->custom_resources) {
auto it = local_resources_.custom_resources.find(custom_resource.first);
if (it != local_resources_.custom_resources.end()) {
custom_resource.second.available = 0;
for (const auto &available : it->second.available) {
custom_resource.second.available += available;
}
}
for (auto &custom_resource : local_resources_.custom_resources) {
int64_t resource_name = custom_resource.first;
auto &instances = custom_resource.second;
FixedPoint available = std::accumulate(instances.available.begin(),
instances.available.end(), FixedPoint());
FixedPoint total =
std::accumulate(instances.total.begin(), instances.total.end(), FixedPoint());
local_view->custom_resources[resource_name].available = available;
local_view->custom_resources[resource_name].total = total;
}
}

View file

@ -191,13 +191,17 @@ class ClusterResourceScheduler : public ClusterResourceSchedulerInterface {
const NodeResources &GetLocalNodeResources() const;
/// Get number of nodes in the cluster.
int64_t NumNodes();
int64_t NumNodes() const;
/// Temporarily get the StringIDMap.
const StringIdMap &GetStringIdMap() const;
/// Add a local resource that is available.
///
/// \param resource_name: Resource which we want to update.
/// \param resource_total: New capacity of the resource.
void AddLocalResource(const std::string &resource_name, double resource_total);
void AddLocalResourceInstances(const std::string &resource_name,
const std::vector<FixedPoint> &instances);
/// Check whether the available resources are empty.
///

View file

@ -22,6 +22,13 @@ class ClusterResourceSchedulerInterface {
public:
virtual ~ClusterResourceSchedulerInterface() = default;
/// Add a local resource that is available.
///
/// \param resource_name: Resource which we want to update.
/// \param resource_total: New capacity of the resource.
void AddLocalResourceInstances(const std::string &resource_name,
std::vector<FixedPoint> instances);
/// Remove node from the cluster data structure. This happens
/// when a node fails or it is removed from the cluster.
///

View file

@ -1233,18 +1233,18 @@ TEST_F(ClusterResourceSchedulerTest, DynamicResourceTest) {
task_request, false, false, &t, &is_infeasible);
ASSERT_TRUE(result.empty());
resource_scheduler.AddLocalResource("custom123", 5);
resource_scheduler.AddLocalResourceInstances("custom123", {0., 1.0, 1.0});
result = resource_scheduler.GetBestSchedulableNode(task_request, false, false, &t,
&is_infeasible);
ASSERT_FALSE(result.empty());
ASSERT_FALSE(result.empty()) << resource_scheduler.DebugString();
task_request["custom123"] = 6;
task_request["custom123"] = 3;
result = resource_scheduler.GetBestSchedulableNode(task_request, false, false, &t,
&is_infeasible);
ASSERT_TRUE(result.empty());
resource_scheduler.AddLocalResource("custom123", 5);
resource_scheduler.AddLocalResourceInstances("custom123", {1.0});
result = resource_scheduler.GetBestSchedulableNode(task_request, false, false, &t,
&is_infeasible);
ASSERT_FALSE(result.empty());