[Placement Group] Atomic Creation using 2 phase protocol part 2. (#10599)

* In progress.

* In Progress

* Basic done.

* Fix build issues.

* Addressed code review.

* Change the confusing test name.

* Fix comments.

* Addressed code review.
This commit is contained in:
SangBin Cho 2020-09-08 13:11:11 -07:00 committed by GitHub
parent 36e1f20e9c
commit dcb9e03fde
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 305 additions and 68 deletions

View file

@ -616,15 +616,14 @@ def test_schedule_placement_group_when_node_add(ray_start_cluster):
wait_for_condition(is_placement_group_created) wait_for_condition(is_placement_group_created)
@pytest.mark.skip(reason="Not working yet")
def test_atomic_creation(ray_start_cluster): def test_atomic_creation(ray_start_cluster):
# Setup cluster. # Setup cluster.
cluster = ray_start_cluster cluster = ray_start_cluster
bundle_cpu_size = 2 bundle_cpu_size = 2
bundle_per_node = 2 bundle_per_node = 2
num_nodes = 5 num_nodes = 2
nodes = [ [
cluster.add_node(num_cpus=bundle_cpu_size * bundle_per_node) cluster.add_node(num_cpus=bundle_cpu_size * bundle_per_node)
for _ in range(num_nodes) for _ in range(num_nodes)
] ]
@ -635,24 +634,29 @@ def test_atomic_creation(ray_start_cluster):
def ping(self): def ping(self):
pass pass
@ray.remote(num_cpus=3)
def bothering_task():
import time
time.sleep(1)
return True
# Schedule tasks to fail initial placement group creation.
tasks = [bothering_task.remote() for _ in range(2)]
# Create an actor that will fail bundle scheduling. # Create an actor that will fail bundle scheduling.
# It is important to use pack strategy to make test less flaky. # It is important to use pack strategy to make test less flaky.
pg = ray.util.placement_group( pg = ray.util.placement_group(
name="name", name="name",
strategy="PACK", strategy="SPREAD",
bundles=[{ bundles=[{
"CPU": bundle_cpu_size "CPU": bundle_cpu_size
} for _ in range(num_nodes * bundle_per_node)]) } for _ in range(num_nodes * bundle_per_node)])
# Create a placement group actor. # Create a placement group actor.
# This shouldn't be scheduled until placement group creation is done. # This shouldn't be scheduled because atomic
# placement group creation should've failed.
pg_actor = NormalActor.options( pg_actor = NormalActor.options(
placement_group=pg, placement_group=pg,
placement_group_bundle_index=num_nodes * bundle_per_node - 1).remote() placement_group_bundle_index=num_nodes * bundle_per_node - 1).remote()
# Destroy some nodes to fail placement group creation.
nodes_to_kill = get_other_nodes(cluster, exclude_head=True)
for node_to_kill in nodes_to_kill:
cluster.remove_node(node_to_kill)
# Wait on the placement group now. It should be unready # Wait on the placement group now. It should be unready
# because normal actor takes resources that are required # because normal actor takes resources that are required
@ -660,23 +664,131 @@ def test_atomic_creation(ray_start_cluster):
ready, unready = ray.wait([pg.ready()], timeout=0) ready, unready = ray.wait([pg.ready()], timeout=0)
assert len(ready) == 0 assert len(ready) == 0
assert len(unready) == 1 assert len(unready) == 1
# Wait until all tasks are done.
assert all(ray.get(tasks))
# Add a node back to schedule placement group. # Wait on the placement group creation. Since resources are now available,
for _ in range(len(nodes_to_kill)): # it should be ready soon.
nodes.append(
cluster.add_node(num_cpus=bundle_cpu_size * bundle_per_node))
# Wait on the placement group creation.
ready, unready = ray.wait([pg.ready()]) ready, unready = ray.wait([pg.ready()])
assert len(ready) == 1 assert len(ready) == 1
assert len(unready) == 0 assert len(unready) == 0
# Confirm that the placement group actor is created. It will # Confirm that the placement group actor is created. It will
# raise an exception if actor was scheduled before placement group was # raise an exception if actor was scheduled before placement
# created. # group was created thus it checks atomicity.
# TODO(sang): This with statement should be removed after atomic creation
# is implemented. It will be done in the next PR.
with pytest.raises(ray.exceptions.RayActorError):
ray.get(pg_actor.ping.remote(), timeout=3.0) ray.get(pg_actor.ping.remote(), timeout=3.0)
ray.kill(pg_actor)
# Make sure atomic creation failure didn't impact resources.
@ray.remote(num_cpus=bundle_cpu_size)
def resource_check():
return True
# This should hang because every resources
# are claimed by placement group.
check_without_pg = [
resource_check.remote() for _ in range(bundle_per_node * num_nodes)
]
# This all should scheduled on each bundle.
check_with_pg = [
resource_check.options(
placement_group=pg, placement_group_bundle_index=i).remote()
for i in range(bundle_per_node * num_nodes)
]
# Make sure these are hanging.
ready, unready = ray.wait(check_without_pg, timeout=0)
assert len(ready) == 0
assert len(unready) == bundle_per_node * num_nodes
# Make sure these are all scheduled.
assert all(ray.get(check_with_pg))
ray.util.remove_placement_group(pg)
def pg_removed():
return ray.util.placement_group_table(pg)["state"] == "REMOVED"
wait_for_condition(pg_removed)
# Make sure check without pgs are all
# scheduled properly because resources are cleaned up.
assert all(ray.get(check_without_pg))
def test_mini_integration(ray_start_cluster):
# Create bundles as many as number of gpus in the cluster.
# Do some random work and make sure all resources are properly recovered.
cluster = ray_start_cluster
num_nodes = 5
per_bundle_gpus = 2
gpu_per_node = 4
total_gpus = num_nodes * per_bundle_gpus * gpu_per_node
per_node_gpus = per_bundle_gpus * gpu_per_node
bundles_per_pg = 2
total_num_pg = total_gpus // (bundles_per_pg * per_bundle_gpus)
[
cluster.add_node(num_cpus=2, num_gpus=per_bundle_gpus * gpu_per_node)
for _ in range(num_nodes)
]
cluster.wait_for_nodes()
ray.init(address=cluster.address)
@ray.remote(num_cpus=0, num_gpus=1)
def random_tasks():
import time
import random
sleep_time = random.uniform(0.1, 0.2)
time.sleep(sleep_time)
return True
pgs = []
pg_tasks = []
# total bundle gpu usage = bundles_per_pg * total_num_pg * per_bundle_gpus
# Note this is half of total
for _ in range(total_num_pg):
pgs.append(
ray.util.placement_group(
name="name",
strategy="PACK",
bundles=[{
"GPU": per_bundle_gpus
} for _ in range(bundles_per_pg)]))
# Schedule tasks.
for i in range(total_num_pg):
pg = pgs[i]
pg_tasks.append([
random_tasks.options(
placement_group=pg,
placement_group_bundle_index=bundle_index).remote()
for bundle_index in range(bundles_per_pg)
])
# Make sure tasks are done and we remove placement groups.
num_removed_pg = 0
pg_indexes = [2, 3, 1, 7, 8, 9, 0, 6, 4, 5]
while num_removed_pg < total_num_pg:
index = pg_indexes[num_removed_pg]
pg = pgs[index]
assert all(ray.get(pg_tasks[index]))
ray.util.remove_placement_group(pg)
num_removed_pg += 1
@ray.remote(num_cpus=2, num_gpus=per_node_gpus)
class A:
def ping(self):
return True
# Make sure all resources are properly returned by scheduling
# actors that take up all existing resources.
actors = [A.remote() for _ in range(num_nodes)]
assert all(ray.get([a.ping.remote() for a in actors]))
if __name__ == "__main__": if __name__ == "__main__":

View file

@ -228,8 +228,9 @@ void ResourceSet::AddResources(const ResourceSet &other) {
} }
} }
void ResourceSet::AddBundleResources(const PlacementGroupID &group_id, void ResourceSet::CommitBundleResources(const PlacementGroupID &group_id,
const int bundle_index, const ResourceSet &other) { const int bundle_index,
const ResourceSet &other) {
for (const auto &resource_pair : other.GetResourceAmountMap()) { for (const auto &resource_pair : other.GetResourceAmountMap()) {
// With bundle index (e.g., CPU_group_i_zzz). // With bundle index (e.g., CPU_group_i_zzz).
const std::string &resource_label = const std::string &resource_label =
@ -685,7 +686,7 @@ void ResourceIdSet::AddOrUpdateResource(const std::string &resource_name,
} }
} }
void ResourceIdSet::AddBundleResourceIds(const PlacementGroupID &group_id, void ResourceIdSet::CommitBundleResourceIds(const PlacementGroupID &group_id,
const int bundle_index, const int bundle_index,
const std::string &resource_name, const std::string &resource_name,
ResourceIds &resource_ids) { ResourceIds &resource_ids) {
@ -872,13 +873,18 @@ void SchedulingResources::UpdateResourceCapacity(const std::string &resource_nam
} }
} }
void SchedulingResources::TransferToBundleResources(const PlacementGroupID &group, void SchedulingResources::PrepareBundleResources(const PlacementGroupID &group,
const int bundle_index, const int bundle_index,
const ResourceSet &resource_set) { const ResourceSet &resource_set) {
resources_available_.SubtractResourcesStrict(resource_set); resources_available_.SubtractResourcesStrict(resource_set);
resources_available_.AddBundleResources(group, bundle_index, resource_set);
resources_total_.SubtractResourcesStrict(resource_set); resources_total_.SubtractResourcesStrict(resource_set);
resources_total_.AddBundleResources(group, bundle_index, resource_set); }
void SchedulingResources::CommitBundleResources(const PlacementGroupID &group,
const int bundle_index,
const ResourceSet &resource_set) {
resources_available_.CommitBundleResources(group, bundle_index, resource_set);
resources_total_.CommitBundleResources(group, bundle_index, resource_set);
} }
void SchedulingResources::ReturnBundleResources(const PlacementGroupID &group_id, void SchedulingResources::ReturnBundleResources(const PlacementGroupID &group_id,

View file

@ -154,17 +154,21 @@ class ResourceSet {
/// This adds both the the indexed and wildcard resources (e.g., both /// This adds both the the indexed and wildcard resources (e.g., both
/// CPU_group_i_zzz and CPU_group_zzz). /// CPU_group_i_zzz and CPU_group_zzz).
/// ///
/// NOTE: This method should be used AFTER resources are COMMITTED.
/// It can have unexpected behavior if you call this method on PREPARED resources.
///
/// \param group_id: The placement group id. /// \param group_id: The placement group id.
/// \param bundle_index: The index of the bundle. /// \param bundle_index: The index of the bundle.
/// \param other: The other resource set to add. /// \param other: The other resource set to add.
/// \return Void. /// \return Void.
void AddBundleResources(const PlacementGroupID &group_id, const int bundle_index, void CommitBundleResources(const PlacementGroupID &group_id, const int bundle_index,
const ResourceSet &other); const ResourceSet &other);
/// \brief Return back all the bundle resource. Changing the resource name and adding /// \brief Return back all the bundle resource. Changing the resource name and adding
/// any missing resource labels to this set. /// any missing resource labels to this set.
/// ///
/// This is the inverse of AddBundleResources(). /// Note that this method assumes bundle resources are COMMITTED.
/// Please make sure to commit bundle resources before calling this method.
/// ///
/// \param group_id: The placement group id. /// \param group_id: The placement group id.
/// \param bundle_index: The bundle index to return resources for. /// \param bundle_index: The bundle index to return resources for.
@ -427,7 +431,7 @@ class ResourceIdSet {
/// \param capacity capacity of the resource being added /// \param capacity capacity of the resource being added
void AddOrUpdateResource(const std::string &resource_name, int64_t capacity); void AddOrUpdateResource(const std::string &resource_name, int64_t capacity);
/// \brief Add a Bundle resource in the ResourceIdSet. /// \brief Commit a Bundle resource in the ResourceIdSet.
/// ///
/// This adds both the the indexed and wildcard resources (e.g., both /// This adds both the the indexed and wildcard resources (e.g., both
/// CPU_group_i_zzz and CPU_group_zzz). /// CPU_group_i_zzz and CPU_group_zzz).
@ -436,12 +440,15 @@ class ResourceIdSet {
/// \param bundle_index: The index of the bundle. /// \param bundle_index: The index of the bundle.
/// \param resource_name the name of the resource to create/update (e.g., "CPU"). /// \param resource_name the name of the resource to create/update (e.g., "CPU").
/// \param resource_ids resource_ids of the resource being added /// \param resource_ids resource_ids of the resource being added
void AddBundleResourceIds(const PlacementGroupID &group_id, const int bundle_index, void CommitBundleResourceIds(const PlacementGroupID &group_id, const int bundle_index,
const std::string &resource_name, ResourceIds &resource_ids); const std::string &resource_name,
ResourceIds &resource_ids);
/// \brief remove a Bundle resource in the ResourceIdSet. /// \brief remove a Bundle resource in the ResourceIdSet.
/// ///
/// The bundle resources will be returned to their original resource names. /// The bundle resources will be returned to their original resource names.
/// Note that the bundle resources should've been COMMITTED before this method is
/// called.
/// ///
/// \param group_id: The placement group id. /// \param group_id: The placement group id.
/// \param bundle_index: The index of the bundle. /// \param bundle_index: The index of the bundle.
@ -571,7 +578,22 @@ class SchedulingResources {
void UpdateResourceCapacity(const std::string &resource_name, int64_t capacity); void UpdateResourceCapacity(const std::string &resource_name, int64_t capacity);
/// \brief Update total, available and load resources with the ResourceIds. /// \brief Update total, available and load resources with the ResourceIds.
/// Create if not exists. /// Create if not exists. This will only update resources, but it won't
/// create placement group resources. That'll be done when resources are
/// COMMITTED. Commit should be done by CommitBundleResources.
///
/// We need this step for running 2PC protocol for atomic placement group creation.
///
/// \param resource_name: Name of the resource to be modified.
/// \param resource_set: New resource_set of the resource.
void PrepareBundleResources(const PlacementGroupID &group, const int bundle_index,
const ResourceSet &resource_set);
/// \brief Commit placement group resources. It means this method'll create
/// placement group resources. The original resources should've been updated
/// by PrepareBundleResources.
///
/// We need this step for running 2PC protocol for atomic placement group creation.
/// ///
/// The resources will be transfered from their original resource names. /// The resources will be transfered from their original resource names.
/// This includes both the the indexed and wildcard resources (e.g., both /// This includes both the the indexed and wildcard resources (e.g., both
@ -579,7 +601,7 @@ class SchedulingResources {
/// ///
/// \param resource_name: Name of the resource to be modified /// \param resource_name: Name of the resource to be modified
/// \param resource_set: New resource_set of the resource. /// \param resource_set: New resource_set of the resource.
void TransferToBundleResources(const PlacementGroupID &group, const int bundle_index, void CommitBundleResources(const PlacementGroupID &group, const int bundle_index,
const ResourceSet &resource_set); const ResourceSet &resource_set);
/// \brief delete total, available and load resources with the ResourceIds. /// \brief delete total, available and load resources with the ResourceIds.

View file

@ -32,12 +32,12 @@ class SchedulingResourcesTest : public ::testing::Test {
std::shared_ptr<ResourceIdSet> resource_id_set; std::shared_ptr<ResourceIdSet> resource_id_set;
}; };
TEST_F(SchedulingResourcesTest, AddBundleResources) { TEST_F(SchedulingResourcesTest, CommitBundleResources) {
PlacementGroupID group_id = PlacementGroupID::FromRandom(); PlacementGroupID group_id = PlacementGroupID::FromRandom();
std::vector<std::string> resource_labels = {"CPU"}; std::vector<std::string> resource_labels = {"CPU"};
std::vector<double> resource_capacity = {1.0}; std::vector<double> resource_capacity = {1.0};
ResourceSet resource(resource_labels, resource_capacity); ResourceSet resource(resource_labels, resource_capacity);
resource_set->AddBundleResources(group_id, 1, resource); resource_set->CommitBundleResources(group_id, 1, resource);
resource_labels.pop_back(); resource_labels.pop_back();
resource_labels.push_back("CPU_group_1_" + group_id.Hex()); resource_labels.push_back("CPU_group_1_" + group_id.Hex());
resource_labels.push_back("CPU_group_" + group_id.Hex()); resource_labels.push_back("CPU_group_" + group_id.Hex());
@ -52,7 +52,7 @@ TEST_F(SchedulingResourcesTest, AddBundleResource) {
std::string index_name = "CPU_group_1_" + group_id.Hex(); std::string index_name = "CPU_group_1_" + group_id.Hex();
std::vector<int64_t> whole_ids = {1, 2, 3}; std::vector<int64_t> whole_ids = {1, 2, 3};
ResourceIds resource_ids(whole_ids); ResourceIds resource_ids(whole_ids);
resource_id_set->AddBundleResourceIds(group_id, 1, "CPU", resource_ids); resource_id_set->CommitBundleResourceIds(group_id, 1, "CPU", resource_ids);
ASSERT_EQ(2, resource_id_set->AvailableResources().size()); ASSERT_EQ(2, resource_id_set->AvailableResources().size());
for (auto res : resource_id_set->AvailableResources()) { for (auto res : resource_id_set->AvailableResources()) {
ASSERT_TRUE(res.first == wild_name || res.first == index_name) << res.first; ASSERT_TRUE(res.first == wild_name || res.first == index_name) << res.first;
@ -64,7 +64,7 @@ TEST_F(SchedulingResourcesTest, ReturnBundleResources) {
std::vector<std::string> resource_labels = {"CPU"}; std::vector<std::string> resource_labels = {"CPU"};
std::vector<double> resource_capacity = {1.0}; std::vector<double> resource_capacity = {1.0};
ResourceSet resource(resource_labels, resource_capacity); ResourceSet resource(resource_labels, resource_capacity);
resource_set->AddBundleResources(group_id, 1, resource); resource_set->CommitBundleResources(group_id, 1, resource);
resource_labels.pop_back(); resource_labels.pop_back();
resource_labels.push_back("CPU_group_" + group_id.Hex()); resource_labels.push_back("CPU_group_" + group_id.Hex());
resource_labels.push_back("CPU_group_1_" + group_id.Hex()); resource_labels.push_back("CPU_group_1_" + group_id.Hex());
@ -83,8 +83,8 @@ TEST_F(SchedulingResourcesTest, MultipleBundlesAddRemove) {
ResourceSet resource(resource_labels, resource_capacity); ResourceSet resource(resource_labels, resource_capacity);
// Construct resource set containing two bundles. // Construct resource set containing two bundles.
resource_set->AddBundleResources(group_id, 1, resource); resource_set->CommitBundleResources(group_id, 1, resource);
resource_set->AddBundleResources(group_id, 2, resource); resource_set->CommitBundleResources(group_id, 2, resource);
resource_labels = { resource_labels = {
"CPU_group_" + group_id.Hex(), "CPU_group_" + group_id.Hex(),
"CPU_group_1_" + group_id.Hex(), "CPU_group_1_" + group_id.Hex(),
@ -120,10 +120,10 @@ TEST_F(SchedulingResourcesTest, MultipleBundlesAddRemoveIdSet) {
// Construct resource set containing two bundles. // Construct resource set containing two bundles.
auto rid1 = ResourceIds({1, 2}); auto rid1 = ResourceIds({1, 2});
auto rid2 = ResourceIds({3, 4}); auto rid2 = ResourceIds({3, 4});
resource_ids.AddBundleResourceIds(group_id, 1, "CPU", rid1); resource_ids.CommitBundleResourceIds(group_id, 1, "CPU", rid1);
resource_ids.AddBundleResourceIds(group_id, 2, "CPU", rid2); resource_ids.CommitBundleResourceIds(group_id, 2, "CPU", rid2);
resource_ids.AddBundleResourceIds(group_id, 1, "GPU", rid1); resource_ids.CommitBundleResourceIds(group_id, 1, "GPU", rid1);
resource_ids.AddBundleResourceIds(group_id, 2, "GPU", rid2); resource_ids.CommitBundleResourceIds(group_id, 2, "GPU", rid2);
auto result = ResourceSet( auto result = ResourceSet(
{ {
"CPU_group_" + group_id.Hex(), "CPU_group_" + group_id.Hex(),

View file

@ -1808,8 +1808,8 @@ void NodeManager::HandlePrepareBundleResources(
auto bundle_spec = BundleSpecification(request.bundle_spec()); auto bundle_spec = BundleSpecification(request.bundle_spec());
RAY_LOG(DEBUG) << "bundle prepare request " << bundle_spec.BundleId().first RAY_LOG(DEBUG) << "bundle prepare request " << bundle_spec.BundleId().first
<< bundle_spec.BundleId().second; << bundle_spec.BundleId().second;
auto resource_ids = ScheduleBundle(cluster_resource_map_, bundle_spec); auto prepared = PrepareBundle(cluster_resource_map_, bundle_spec);
if (resource_ids.AvailableResources().size() == 0) { if (!prepared) {
reply->set_success(false); reply->set_success(false);
send_reply_callback(Status::OK(), nullptr, nullptr); send_reply_callback(Status::OK(), nullptr, nullptr);
} else { } else {
@ -1824,8 +1824,17 @@ void NodeManager::HandlePrepareBundleResources(
void NodeManager::HandleCommitBundleResources( void NodeManager::HandleCommitBundleResources(
const rpc::CommitBundleResourcesRequest &request, const rpc::CommitBundleResourcesRequest &request,
rpc::CommitBundleResourcesReply *reply, rpc::SendReplyCallback send_reply_callback) { rpc::CommitBundleResourcesReply *reply, rpc::SendReplyCallback send_reply_callback) {
RAY_CHECK(!new_scheduler_enabled_) << "Not implemented yet.";
auto bundle_spec = BundleSpecification(request.bundle_spec());
RAY_LOG(DEBUG) << "Received bundle commit request " << bundle_spec.BundleId().first
<< bundle_spec.BundleId().second;
CommitBundle(cluster_resource_map_, bundle_spec);
send_reply_callback(Status::OK(), nullptr, nullptr); send_reply_callback(Status::OK(), nullptr, nullptr);
// TODO(sang): Implement this in the next PR.
// Call task dispatch to assign work to the new group.
TryLocalInfeasibleTaskScheduling();
DispatchTasks(local_queues_.GetReadyTasksByClass());
} }
void NodeManager::HandleCancelResourceReserve( void NodeManager::HandleCancelResourceReserve(
@ -1860,6 +1869,20 @@ void NodeManager::HandleCancelResourceReserve(
worker->MarkDead(); worker->MarkDead();
KillWorker(worker); KillWorker(worker);
} }
// We should commit resources if it weren't because
// ReturnBundleResources requires resources to be committed when it is called.
auto it = bundle_state_map_.find(bundle_spec.BundleId());
RAY_CHECK(it != bundle_state_map_.end())
<< "Cancel requests are received to raylet although it hasn't received any prepare "
"or commit requests. This must be an anomaly.";
const auto &bundle_state = it->second;
if (bundle_state->state == CommitState::PREPARED) {
CommitBundle(cluster_resource_map_, bundle_spec);
}
bundle_state_map_.erase(it);
// Return resources.
for (auto resource : resource_set.GetResourceMap()) { for (auto resource : resource_set.GetResourceMap()) {
local_available_resources_.ReturnBundleResources(bundle_spec.PlacementGroupId(), local_available_resources_.ReturnBundleResources(bundle_spec.PlacementGroupId(),
bundle_spec.Index(), resource.first); bundle_spec.Index(), resource.first);
@ -2014,30 +2037,66 @@ void NodeManager::ProcessSetResourceRequest(
} }
} }
ResourceIdSet NodeManager::ScheduleBundle( bool NodeManager::PrepareBundle(
std::unordered_map<ClientID, SchedulingResources> &resource_map, std::unordered_map<ClientID, SchedulingResources> &resource_map,
const BundleSpecification &bundle_spec) { const BundleSpecification &bundle_spec) {
// If the resource map contains the local raylet, update load before calling policy. // TODO(sang): It is currently not idempotent because we don't retry. Make it idempotent
// once retry is implemented. If the resource map contains the local raylet, update load
// before calling policy.
if (resource_map.count(self_node_id_) > 0) { if (resource_map.count(self_node_id_) > 0) {
resource_map[self_node_id_].SetLoadResources(local_queues_.GetTotalResourceLoad()); resource_map[self_node_id_].SetLoadResources(local_queues_.GetTotalResourceLoad());
} }
// Invoke the scheduling policy. // Invoke the scheduling policy.
auto reserve_resource_success = auto reserve_resource_success =
scheduling_policy_.ScheduleBundle(resource_map, self_node_id_, bundle_spec); scheduling_policy_.ScheduleBundle(resource_map, self_node_id_, bundle_spec);
ResourceIdSet acquired_resources;
auto bundle_state = std::make_shared<BundleState>();
if (reserve_resource_success) { if (reserve_resource_success) {
acquired_resources = // Register states.
const auto &bundle_id = bundle_spec.BundleId();
auto it = bundle_state_map_.find(bundle_id);
// Same bundle cannot be rescheduled.
RAY_CHECK(it == bundle_state_map_.end());
// Prepare resources. This shouldn't create formatted placement group resources
// because that'll be done at the commit phase.
bundle_state->acquired_resources =
local_available_resources_.Acquire(bundle_spec.GetRequiredResources()); local_available_resources_.Acquire(bundle_spec.GetRequiredResources());
for (auto resource : acquired_resources.AvailableResources()) { resource_map[self_node_id_].PrepareBundleResources(
local_available_resources_.AddBundleResourceIds(bundle_spec.PlacementGroupId(),
bundle_spec.Index(), resource.first,
resource.second);
}
resource_map[self_node_id_].TransferToBundleResources(
bundle_spec.PlacementGroupId(), bundle_spec.Index(), bundle_spec.PlacementGroupId(), bundle_spec.Index(),
bundle_spec.GetRequiredResources()); bundle_spec.GetRequiredResources());
// Register bundle state.
bundle_state->state = CommitState::PREPARED;
bundle_state_map_.emplace(bundle_id, bundle_state);
} }
return acquired_resources; return bundle_state->acquired_resources.AvailableResources().size() > 0;
}
void NodeManager::CommitBundle(
std::unordered_map<ClientID, SchedulingResources> &resource_map,
const BundleSpecification &bundle_spec) {
// TODO(sang): It is currently not idempotent because we don't retry. Make it idempotent
// once retry is implemented.
const auto &bundle_id = bundle_spec.BundleId();
auto it = bundle_state_map_.find(bundle_id);
// When bundle is committed, it should've been prepared already.
// We don't need this check if commit becomes idempotent.
RAY_CHECK(it != bundle_state_map_.end());
const auto &bundle_state = it->second;
bundle_state->state = CommitState::COMMITTED;
const auto &acquired_resources = bundle_state->acquired_resources;
for (auto resource : acquired_resources.AvailableResources()) {
local_available_resources_.CommitBundleResourceIds(bundle_spec.PlacementGroupId(),
bundle_spec.Index(),
resource.first, resource.second);
}
resource_map[self_node_id_].CommitBundleResources(bundle_spec.PlacementGroupId(),
bundle_spec.Index(),
bundle_spec.GetRequiredResources());
RAY_CHECK(bundle_state->acquired_resources.AvailableResources().size() > 0)
<< "Prepare should've been failed if there were no acquireable resources.";
} }
void NodeManager::ScheduleTasks( void NodeManager::ScheduleTasks(

View file

@ -99,6 +99,28 @@ struct NodeManagerConfig {
std::unordered_map<std::string, std::string> raylet_config; std::unordered_map<std::string, std::string> raylet_config;
}; };
typedef std::pair<PlacementGroupID, int64_t> BundleID;
struct pair_hash {
template <class T1, class T2>
std::size_t operator()(const std::pair<T1, T2> &pair) const {
return std::hash<T1>()(pair.first) ^ std::hash<T2>()(pair.second);
}
};
enum CommitState {
/// Resources are prepared.
PREPARED,
/// Resources are COMMITTED.
COMMITTED
};
struct BundleState {
/// Leasing state for 2PC protocol.
CommitState state;
/// Resources that are acquired at preparation stage.
ResourceIdSet acquired_resources;
};
class NodeManager : public rpc::NodeManagerServiceHandler { class NodeManager : public rpc::NodeManagerServiceHandler {
public: public:
/// Create a node manager. /// Create a node manager.
@ -288,15 +310,26 @@ class NodeManager : public rpc::NodeManagerServiceHandler {
/// \return Void. /// \return Void.
void ScheduleTasks(std::unordered_map<ClientID, SchedulingResources> &resource_map); void ScheduleTasks(std::unordered_map<ClientID, SchedulingResources> &resource_map);
/// Make a placement decision for the resource_map and subtract original resources so
/// that the node is ready to commit (create) placement group resources.
///
/// \param resource_map A mapping from node manager ID to an estimate of the
/// resources available to that node manager. Scheduling decisions will only
/// consider the local node manager and the node managers in the keys of the
/// resource_map argument.
/// \param bundle_spec Specification of bundle that will be prepared.
/// \return True is resources were successfully prepared. False otherwise.
bool PrepareBundle(std::unordered_map<ClientID, SchedulingResources> &resource_map,
const BundleSpecification &bundle_spec);
/// Make a placement decision for the resource_map. /// Make a placement decision for the resource_map.
/// ///
/// \param resource_map A mapping from node manager ID to an estimate of the /// \param resource_map A mapping from node manager ID to an estimate of the
/// resources available to that node manager. Scheduling decisions will only /// resources available to that node manager. Scheduling decisions will only
/// consider the local node manager and the node managers in the keys of the /// consider the local node manager and the node managers in the keys of the
/// resource_map argument. /// resource_map argument.
/// \return ResourceIdSet. /// \param bundle_spec Specification of bundle that will be prepared.
ResourceIdSet ScheduleBundle( void CommitBundle(std::unordered_map<ClientID, SchedulingResources> &resource_map,
std::unordered_map<ClientID, SchedulingResources> &resource_map,
const BundleSpecification &bundle_spec); const BundleSpecification &bundle_spec);
/// Handle a task whose return value(s) must be reconstructed. /// Handle a task whose return value(s) must be reconstructed.
@ -794,6 +827,11 @@ class NodeManager : public rpc::NodeManagerServiceHandler {
/// free_objects_batch_size, or if objects have been in the cache for longer /// free_objects_batch_size, or if objects have been in the cache for longer
/// than the config's free_objects_period, whichever occurs first. /// than the config's free_objects_period, whichever occurs first.
std::vector<ObjectID> objects_to_free_; std::vector<ObjectID> objects_to_free_;
/// This map represents the commit state of 2PC protocol for atomic placement group
/// creation.
absl::flat_hash_map<BundleID, std::shared_ptr<BundleState>, pair_hash>
bundle_state_map_;
}; };
} // namespace raylet } // namespace raylet