[core] Fix race condition between object free and duplicate creation (#21364)

An object can get created/pinned twice if the original worker fails mid-task, or when lineage reconstruction is enabled. This can cause inconsistencies in the LocalObjectManager if the second creation races with object spilling and/or object free. For example:
1. Object X gets created, then is pending spill.
2. Object X is freed by original owner because it goes out of scope.
3. Task that created X gets re-executed due to failure.
4. Task recreates X, which can now get spilled again while the original copy is also being spilled/freed.

This PR better enforces the state machine for objects managed by the LocalObjectManager. An object can be either: pinned, pending spill, or spilled. If we receive a free message from the owner, we do not delete the object metadata until all shared-memory and spilled copies of the object are removed.
This commit is contained in:
Stephanie Wang 2022-01-19 17:58:07 -08:00 committed by GitHub
parent d3e7abb3c9
commit bab7cd6388
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 141 additions and 19 deletions

View file

@ -35,19 +35,20 @@ void LocalObjectManager::PinObjectsAndWaitForFree(
continue;
}
const auto inserted = objects_waiting_for_free_.emplace(object_id, owner_address);
const auto inserted =
local_objects_.emplace(object_id, std::make_pair<>(owner_address, false));
if (inserted.second) {
// This is the first time we're pinning this object.
RAY_LOG(DEBUG) << "Pinning object " << object_id;
pinned_objects_size_ += object->GetSize();
pinned_objects_.emplace(object_id, std::move(object));
} else {
if (inserted.first->second.worker_id() != owner_address.worker_id()) {
auto original_worker_id =
WorkerID::FromBinary(inserted.first->second.first.worker_id());
auto new_worker_id = WorkerID::FromBinary(owner_address.worker_id());
if (original_worker_id != new_worker_id) {
// TODO(swang): Handle this case. We should use the new owner address
// and object copy.
auto original_worker_id =
WorkerID::FromBinary(inserted.first->second.worker_id());
auto new_worker_id = WorkerID::FromBinary(owner_address.worker_id());
RAY_LOG(WARNING)
<< "Received PinObjects request from a different owner " << new_worker_id
<< " from the original " << original_worker_id << ". Object " << object_id
@ -95,18 +96,31 @@ void LocalObjectManager::PinObjectsAndWaitForFree(
}
void LocalObjectManager::ReleaseFreedObject(const ObjectID &object_id) {
RAY_LOG(DEBUG) << "Unpinning object " << object_id;
if (!objects_waiting_for_free_.erase(object_id)) {
// Only free the object if it is not already freed.
auto it = local_objects_.find(object_id);
if (it == local_objects_.end() || it->second.second) {
return;
}
// The object should be in one of these stats. pinned, spilling, or spilled.
// Mark the object as freed. NOTE(swang): We have to mark this instead of
// deleting the entry immediately in case the object is currently being
// spilled. In that case, we should process the free event once the object
// spill is complete.
it->second.second = true;
RAY_LOG(DEBUG) << "Unpinning object " << object_id;
// The object should be in one of these states: pinned, spilling, or spilled.
RAY_CHECK((pinned_objects_.count(object_id) > 0) ||
(spilled_objects_url_.count(object_id) > 0) ||
(objects_pending_spill_.count(object_id) > 0));
spilled_object_pending_delete_.push(object_id);
if (pinned_objects_.count(object_id)) {
pinned_objects_size_ -= pinned_objects_[object_id]->GetSize();
pinned_objects_.erase(object_id);
local_objects_.erase(it);
} else {
// If the object is being spilled or is already spilled, then we will clean
// up the local_objects_ entry once the spilled copy has been
// freed.
spilled_object_pending_delete_.push(object_id);
}
// Try to evict all copies of the object from the cluster.
@ -258,14 +272,14 @@ void LocalObjectManager::SpillObjectsInternal(
std::vector<ObjectID> requested_objects_to_spill;
for (const auto &object_id : objects_to_spill) {
RAY_CHECK(objects_pending_spill_.count(object_id));
auto owner_it = objects_waiting_for_free_.find(object_id);
auto freed_it = local_objects_.find(object_id);
// If the object hasn't already been freed, spill it.
if (owner_it == objects_waiting_for_free_.end()) {
if (freed_it == local_objects_.end() || freed_it->second.second) {
objects_pending_spill_.erase(object_id);
} else {
auto ref = request.add_object_refs_to_spill();
ref->set_object_id(object_id.Binary());
ref->mutable_owner_address()->CopyFrom(owner_it->second);
ref->mutable_owner_address()->CopyFrom(freed_it->second.first);
RAY_LOG(DEBUG) << "Sending spill request for object " << object_id;
requested_objects_to_spill.push_back(object_id);
}
@ -349,14 +363,14 @@ void LocalObjectManager::OnObjectSpilled(const std::vector<ObjectID> &object_ids
request.set_spilled_node_id(node_id_object_spilled.Binary());
request.set_size(object_size);
auto owner_it = objects_waiting_for_free_.find(object_id);
if (owner_it == objects_waiting_for_free_.end()) {
auto freed_it = local_objects_.find(object_id);
if (freed_it == local_objects_.end() || freed_it->second.second) {
RAY_LOG(DEBUG) << "Spilled object already freed, skipping send of spilled URL to "
"object directory for object "
<< object_id;
continue;
}
const auto &worker_addr = owner_it->second;
const auto &worker_addr = freed_it->second.first;
auto owner_client = owner_client_pool_.GetOrConnect(worker_addr);
RAY_LOG(DEBUG) << "Sending spilled URL " << object_url << " for object " << object_id
<< " to owner " << WorkerID::FromBinary(worker_addr.worker_id());
@ -485,7 +499,12 @@ void LocalObjectManager::ProcessSpilledObjectsDeleteQueue(uint32_t max_batch_siz
object_urls_to_delete.emplace_back(object_url);
}
spilled_objects_url_.erase(spilled_objects_url_it);
} else {
// If the object was not spilled, it gets pinned again. Unpin here to
// prevent a memory leak.
pinned_objects_.erase(object_id);
}
local_objects_.erase(object_id);
spilled_object_pending_delete_.pop();
}
if (object_urls_to_delete.size() > 0) {

View file

@ -196,8 +196,14 @@ class LocalObjectManager {
/// A callback to call when an object has been freed.
std::function<void(const std::vector<ObjectID> &)> on_objects_freed_;
/// Hashmap from objects that we are waiting to free to their owner address.
absl::flat_hash_map<ObjectID, rpc::Address> objects_waiting_for_free_;
/// Hashmap from local objects that we are waiting to free to a tuple of
/// (their owner address, whether the object has been freed).
/// All objects in this hashmap should also be in exactly one of the
/// following maps:
/// - pinned_objects_: objects pinned in shared memory
/// - objects_pending_spill_: objects pinned and waiting for spill to complete
/// - spilled_objects_url_: objects already spilled
absl::flat_hash_map<ObjectID, std::pair<rpc::Address, bool>> local_objects_;
// Objects that are pinned on this node.
absl::flat_hash_map<ObjectID, std::unique_ptr<RayObject>> pinned_objects_;

View file

@ -221,7 +221,7 @@ class MockIOWorkerPool : public IOWorkerPoolInterface {
void PopSpillWorker(
std::function<void(std::shared_ptr<WorkerInterface>)> callback) override {
callback(io_worker);
pop_callbacks.push_back(callback);
}
void PopRestoreWorker(
@ -244,6 +244,17 @@ class MockIOWorkerPool : public IOWorkerPoolInterface {
return true;
}
/// Run the callback at the front of pop_callbacks, if there is one.
///
/// \return false when pop_callbacks is empty (nothing was run); true after
/// exactly one callback has been invoked with io_worker and removed.
bool FlushPopSpillWorkerCallbacks() {
  if (pop_callbacks.empty()) {
    return false;
  }
  // Take a copy and invoke it before removing the queue entry, so the entry
  // is still at the front of the list while the callback executes.
  const auto next_callback = pop_callbacks.front();
  next_callback(io_worker);
  pop_callbacks.pop_front();
  return true;
}
std::list<std::function<void(std::shared_ptr<WorkerInterface>)>> pop_callbacks;
std::list<std::function<void(std::shared_ptr<WorkerInterface>)>> restoration_callbacks;
std::shared_ptr<MockIOWorkerClient> io_worker_client =
std::make_shared<MockIOWorkerClient>();
@ -309,7 +320,8 @@ class LocalObjectManagerTest : public ::testing::Test {
ASSERT_TRUE(manager.spilled_objects_url_.empty());
ASSERT_TRUE(manager.objects_pending_spill_.empty());
ASSERT_TRUE(manager.url_ref_count_.empty());
ASSERT_TRUE(manager.objects_waiting_for_free_.empty());
ASSERT_TRUE(manager.local_objects_.empty());
ASSERT_TRUE(manager.spilled_object_pending_delete_.empty());
}
void TearDown() { unevictable_objects_.clear(); }
@ -384,6 +396,7 @@ TEST_F(LocalObjectManagerTest, TestRestoreSpilledObject) {
manager.SpillObjects(object_ids,
[&](const Status &status) mutable { ASSERT_TRUE(status.ok()); });
ASSERT_TRUE(worker_pool.FlushPopSpillWorkerCallbacks());
std::vector<std::string> urls;
for (size_t i = 0; i < object_ids.size(); i++) {
@ -443,6 +456,7 @@ TEST_F(LocalObjectManagerTest, TestExplicitSpill) {
ASSERT_TRUE(status.ok());
num_times_fired++;
});
ASSERT_TRUE(worker_pool.FlushPopSpillWorkerCallbacks());
ASSERT_EQ(num_times_fired, 0);
for (const auto &id : object_ids) {
ASSERT_EQ((*unpins)[id], 0);
@ -488,10 +502,12 @@ TEST_F(LocalObjectManagerTest, TestDuplicateSpill) {
ASSERT_TRUE(status.ok());
num_times_fired++;
});
ASSERT_TRUE(worker_pool.FlushPopSpillWorkerCallbacks());
// Spill the same objects again. The callback should only be fired once
// total.
manager.SpillObjects(object_ids,
[&](const Status &status) mutable { ASSERT_TRUE(!status.ok()); });
ASSERT_FALSE(worker_pool.FlushPopSpillWorkerCallbacks());
ASSERT_EQ(num_times_fired, 0);
for (const auto &id : object_ids) {
ASSERT_EQ((*unpins)[id], 0);
@ -536,6 +552,7 @@ TEST_F(LocalObjectManagerTest, TestSpillObjectsOfSize) {
}
manager.PinObjectsAndWaitForFree(object_ids, std::move(objects), owner_address);
ASSERT_TRUE(manager.SpillObjectsOfSize(total_size / 2));
ASSERT_TRUE(worker_pool.FlushPopSpillWorkerCallbacks());
for (const auto &id : object_ids) {
ASSERT_EQ((*unpins)[id], 0);
}
@ -563,6 +580,7 @@ TEST_F(LocalObjectManagerTest, TestSpillObjectsOfSize) {
// Make sure providing 0 bytes to SpillObjectsOfSize will spill one object.
// This is important to cover min_spilling_size_== 0.
ASSERT_TRUE(manager.SpillObjectsOfSize(0));
ASSERT_TRUE(worker_pool.FlushPopSpillWorkerCallbacks());
EXPECT_CALL(worker_pool, PushSpillWorker(_));
const std::string url = BuildURL("url" + std::to_string(object_ids.size()));
ASSERT_TRUE(worker_pool.io_worker_client->ReplySpillObjects({url}));
@ -577,6 +595,7 @@ TEST_F(LocalObjectManagerTest, TestSpillObjectsOfSize) {
// Since there's no more object to spill, this should fail.
ASSERT_FALSE(manager.SpillObjectsOfSize(0));
ASSERT_FALSE(worker_pool.FlushPopSpillWorkerCallbacks());
}
TEST_F(LocalObjectManagerTest, TestSpillUptoMaxFuseCount) {
@ -603,6 +622,7 @@ TEST_F(LocalObjectManagerTest, TestSpillUptoMaxFuseCount) {
}
manager.PinObjectsAndWaitForFree(object_ids, std::move(objects), owner_address);
ASSERT_TRUE(manager.SpillObjectsOfSize(total_size));
ASSERT_TRUE(worker_pool.FlushPopSpillWorkerCallbacks());
for (const auto &id : object_ids) {
ASSERT_EQ((*unpins)[id], 0);
}
@ -656,6 +676,7 @@ TEST_F(LocalObjectManagerTest, TestSpillObjectNotEvictable) {
// Now object is evictable. Spill should succeed.
unevictable_objects_.erase(object_id);
ASSERT_TRUE(manager.SpillObjectsOfSize(1000));
ASSERT_TRUE(worker_pool.FlushPopSpillWorkerCallbacks());
}
TEST_F(LocalObjectManagerTest, TestSpillUptoMaxThroughput) {
@ -680,9 +701,11 @@ TEST_F(LocalObjectManagerTest, TestSpillUptoMaxThroughput) {
// This will spill until 2 workers are occupied.
manager.SpillObjectUptoMaxThroughput();
ASSERT_TRUE(worker_pool.FlushPopSpillWorkerCallbacks());
ASSERT_TRUE(manager.IsSpillingInProgress());
// Spilling is still going on, meaning we can make the pace. So it should return true.
manager.SpillObjectUptoMaxThroughput();
ASSERT_TRUE(worker_pool.FlushPopSpillWorkerCallbacks());
ASSERT_TRUE(manager.IsSpillingInProgress());
// No object ids are spilled yet.
for (const auto &id : object_ids) {
@ -706,9 +729,11 @@ TEST_F(LocalObjectManagerTest, TestSpillUptoMaxThroughput) {
// SpillObjectUptoMaxThroughput will spill one more object (since one worker is
// available).
manager.SpillObjectUptoMaxThroughput();
ASSERT_TRUE(worker_pool.FlushPopSpillWorkerCallbacks());
ASSERT_TRUE(manager.IsSpillingInProgress());
manager.SpillObjectUptoMaxThroughput();
ASSERT_TRUE(manager.IsSpillingInProgress());
ASSERT_FALSE(worker_pool.FlushPopSpillWorkerCallbacks());
// Spilling is done for all objects.
for (size_t i = 1; i < object_ids.size(); i++) {
@ -727,6 +752,7 @@ TEST_F(LocalObjectManagerTest, TestSpillUptoMaxThroughput) {
// We cannot spill anymore as there is no more pinned object.
manager.SpillObjectUptoMaxThroughput();
ASSERT_FALSE(worker_pool.FlushPopSpillWorkerCallbacks());
ASSERT_FALSE(manager.IsSpillingInProgress());
}
@ -750,6 +776,7 @@ TEST_F(LocalObjectManagerTest, TestSpillError) {
ASSERT_FALSE(status.ok());
num_times_fired++;
});
ASSERT_TRUE(worker_pool.FlushPopSpillWorkerCallbacks());
// Return an error from the IO worker during spill.
EXPECT_CALL(worker_pool, PushSpillWorker(_));
@ -764,6 +791,7 @@ TEST_F(LocalObjectManagerTest, TestSpillError) {
ASSERT_TRUE(status.ok());
num_times_fired++;
});
ASSERT_TRUE(worker_pool.FlushPopSpillWorkerCallbacks());
std::string url = BuildURL("url");
EXPECT_CALL(worker_pool, PushSpillWorker(_));
ASSERT_TRUE(worker_pool.io_worker_client->ReplySpillObjects({url}));
@ -791,6 +819,7 @@ TEST_F(LocalObjectManagerTest, TestPartialSpillError) {
manager.PinObjectsAndWaitForFree(object_ids, std::move(objects), owner_address);
manager.SpillObjects(object_ids,
[&](const Status &status) mutable { ASSERT_TRUE(status.ok()); });
ASSERT_TRUE(worker_pool.FlushPopSpillWorkerCallbacks());
EXPECT_CALL(worker_pool, PushSpillWorker(_));
std::vector<std::string> urls;
@ -862,6 +891,7 @@ TEST_F(LocalObjectManagerTest, TestDeleteSpilledObjects) {
}
manager.SpillObjects(object_ids_to_spill,
[&](const Status &status) mutable { ASSERT_TRUE(status.ok()); });
ASSERT_TRUE(worker_pool.FlushPopSpillWorkerCallbacks());
std::vector<std::string> urls;
for (size_t i = 0; i < object_ids_to_spill.size(); i++) {
urls.push_back(BuildURL("url" + std::to_string(i)));
@ -910,6 +940,7 @@ TEST_F(LocalObjectManagerTest, TestDeleteURLRefCount) {
}
manager.SpillObjects(object_ids_to_spill,
[&](const Status &status) mutable { ASSERT_TRUE(status.ok()); });
ASSERT_TRUE(worker_pool.FlushPopSpillWorkerCallbacks());
std::vector<std::string> urls;
// Note every object has the same url. It means all objects are fused.
for (size_t i = 0; i < object_ids_to_spill.size(); i++) {
@ -976,8 +1007,10 @@ TEST_F(LocalObjectManagerTest, TestDeleteSpillingObjectsBlocking) {
}
manager.SpillObjects(spill_set_1,
[&](const Status &status) mutable { ASSERT_TRUE(status.ok()); });
ASSERT_TRUE(worker_pool.FlushPopSpillWorkerCallbacks());
manager.SpillObjects(spill_set_2,
[&](const Status &status) mutable { ASSERT_TRUE(status.ok()); });
ASSERT_TRUE(worker_pool.FlushPopSpillWorkerCallbacks());
std::vector<std::string> urls_spill_set_1;
std::vector<std::string> urls_spill_set_2;
@ -1046,6 +1079,7 @@ TEST_F(LocalObjectManagerTest, TestDeleteMaxObjects) {
// All the entries are spilled.
manager.SpillObjects(object_ids_to_spill,
[&](const Status &status) mutable { ASSERT_TRUE(status.ok()); });
ASSERT_TRUE(worker_pool.FlushPopSpillWorkerCallbacks());
std::vector<std::string> urls;
for (size_t i = 0; i < object_ids_to_spill.size(); i++) {
urls.push_back(BuildURL("url" + std::to_string(i)));
@ -1096,6 +1130,7 @@ TEST_F(LocalObjectManagerTest, TestDeleteURLRefCountRaceCondition) {
}
manager.SpillObjects(object_ids_to_spill,
[&](const Status &status) mutable { ASSERT_TRUE(status.ok()); });
ASSERT_TRUE(worker_pool.FlushPopSpillWorkerCallbacks());
std::vector<std::string> urls;
for (size_t i = 0; i < object_ids_to_spill.size(); i++) {
// Simulate the situation where there's a single file that contains multiple objects.
@ -1191,6 +1226,68 @@ TEST_F(LocalObjectManagerTest, TestDuplicatePin) {
AssertNoLeaks();
}
// Scenario: a batch of objects is pinned and a spill is requested, the owner
// then frees the objects, and afterwards the same object IDs are pinned and
// spilled a second time by the same owner. The test expects that only one
// spill request is handed to the IO worker pool and that the manager holds no
// leftover state at the end (AssertNoLeaks).
TEST_F(LocalObjectManagerTest, TestDuplicatePinAndSpill) {
rpc::Address owner_address;
owner_address.set_worker_id(WorkerID::FromRandom().Binary());
// Generate free_objects_batch_size random object IDs.
std::vector<ObjectID> object_ids;
for (size_t i = 0; i < free_objects_batch_size; i++) {
ObjectID object_id = ObjectID::FromRandom();
object_ids.push_back(object_id);
}
// Build one metadata-only RayObject (null data buffer) per ID; the metadata
// is the decimal string of rpc::ErrorType::OBJECT_IN_PLASMA.
std::vector<std::unique_ptr<RayObject>> objects;
for (size_t i = 0; i < free_objects_batch_size; i++) {
std::string meta = std::to_string(static_cast<int>(rpc::ErrorType::OBJECT_IN_PLASMA));
// NOTE(review): `meta` is local to this iteration; whether LocalMemoryBuffer
// copies the bytes is not visible here -- confirm the buffer never reads
// them after `meta` is destroyed.
auto metadata = const_cast<uint8_t *>(reinterpret_cast<const uint8_t *>(meta.data()));
auto meta_buffer = std::make_shared<LocalMemoryBuffer>(metadata, meta.size());
auto object = std::make_unique<RayObject>(nullptr, meta_buffer,
std::vector<rpc::ObjectReference>());
objects.push_back(std::move(object));
}
// First pin, followed by a spill request for every object.
manager.PinObjectsAndWaitForFree(object_ids, std::move(objects), owner_address);
bool spilled = false;
manager.SpillObjects(object_ids, [&](const Status &status) {
RAY_CHECK(status.ok());
spilled = true;
});
// The spill callback must not have fired yet: the pooled spill-worker
// callback has not been flushed at this point.
ASSERT_FALSE(spilled);
// Free on messages from the original owner.
auto owner_id1 = WorkerID::FromBinary(owner_address.worker_id());
// One eviction message is published per object. `freed` is asserted empty at
// the start of every iteration and must contain all IDs after the loop
// (presumably because frees are delivered in batches of
// free_objects_batch_size -- confirm against the fixture).
for (size_t i = 0; i < free_objects_batch_size; i++) {
ASSERT_TRUE(freed.empty());
EXPECT_CALL(*subscriber_, Unsubscribe(_, _, object_ids[i].Binary()));
ASSERT_TRUE(subscriber_->PublishObjectEviction(owner_id1));
}
std::unordered_set<ObjectID> expected(object_ids.begin(), object_ids.end());
ASSERT_EQ(freed, expected);
// Duplicate pin message from same owner.
// `objects` was moved from above; clear it before refilling with fresh
// objects for the same IDs.
objects.clear();
for (size_t i = 0; i < free_objects_batch_size; i++) {
std::string meta = std::to_string(static_cast<int>(rpc::ErrorType::OBJECT_IN_PLASMA));
auto metadata = const_cast<uint8_t *>(reinterpret_cast<const uint8_t *>(meta.data()));
auto meta_buffer = std::make_shared<LocalMemoryBuffer>(metadata, meta.size());
auto object = std::make_unique<RayObject>(nullptr, meta_buffer,
std::vector<rpc::ObjectReference>());
objects.push_back(std::move(object));
}
manager.PinObjectsAndWaitForFree(object_ids, std::move(objects), owner_address);
// The second spill request is expected to fail: its callback checks for a
// non-OK status.
manager.SpillObjects(object_ids,
[&](const Status &status) { RAY_CHECK(!status.ok()); });
// Should only spill the objects once.
// Exactly one spill-worker callback is queued: the first flush succeeds, and
// after the IO worker replies (with no URLs) a second flush finds nothing.
ASSERT_TRUE(worker_pool.FlushPopSpillWorkerCallbacks());
ASSERT_TRUE(worker_pool.io_worker_client->ReplySpillObjects({}));
ASSERT_FALSE(worker_pool.FlushPopSpillWorkerCallbacks());
// Flush any pending frees, then verify the manager's bookkeeping is empty.
manager.FlushFreeObjects();
AssertNoLeaks();
}
} // namespace raylet
} // namespace ray