Fix race condition between failure detection and references going out of scope (#12548)

* fix

* lint
This commit is contained in:
Stephanie Wang 2020-12-01 20:52:30 -05:00 committed by GitHub
parent 19c8033df2
commit 8801e87afd
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 90 additions and 48 deletions

View file

@ -431,7 +431,7 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_
};
auto reconstruct_object_callback = [this](const ObjectID &object_id) {
io_service_.post([this, object_id]() {
RAY_CHECK_OK(object_recovery_manager_->RecoverObject(object_id));
RAY_CHECK(object_recovery_manager_->RecoverObject(object_id));
});
};
task_manager_.reset(new TaskManager(
@ -656,8 +656,11 @@ void CoreWorker::OnNodeRemoved(const rpc::GcsNodeInfo &node_info) {
memory_store_->Delete(lost_objects);
for (const auto &object_id : lost_objects) {
RAY_LOG(INFO) << "Object " << object_id << " lost due to node failure " << node_id;
// The lost object must have been owned by us.
RAY_CHECK_OK(object_recovery_manager_->RecoverObject(object_id));
// NOTE(swang): There is a race condition where this can return false if
// the reference went out of scope since the call to the ref counter to get
// the lost objects. It's okay to not mark the object as failed or recover
// the object since there are no reference holders.
static_cast<void>(object_recovery_manager_->RecoverObject(object_id));
}
}
@ -1189,8 +1192,10 @@ void CoreWorker::SpillOwnedObject(const ObjectID &object_id,
// Find the raylet that hosts the primary copy of the object.
NodeID pinned_at;
bool spilled;
RAY_CHECK(
reference_counter_->IsPlasmaObjectPinnedOrSpilled(object_id, &pinned_at, &spilled));
bool owned_by_us;
RAY_CHECK(reference_counter_->IsPlasmaObjectPinnedOrSpilled(object_id, &owned_by_us,
&pinned_at, &spilled));
RAY_CHECK(owned_by_us);
if (spilled) {
// The object has already been spilled.
return;

View file

@ -18,16 +18,23 @@
namespace ray {
Status ObjectRecoveryManager::RecoverObject(const ObjectID &object_id) {
bool ObjectRecoveryManager::RecoverObject(const ObjectID &object_id) {
// Check the ReferenceCounter to see if there is a location for the object.
bool owned_by_us;
NodeID pinned_at;
bool spilled;
bool owned_by_us =
reference_counter_->IsPlasmaObjectPinnedOrSpilled(object_id, &pinned_at, &spilled);
bool ref_exists = reference_counter_->IsPlasmaObjectPinnedOrSpilled(
object_id, &owned_by_us, &pinned_at, &spilled);
if (!ref_exists) {
// References that have gone out of scope cannot be recovered.
return false;
}
if (!owned_by_us) {
return Status::Invalid(
"Object reference no longer exists or is not owned by us. Either lineage pinning "
"is disabled or this object ID is borrowed.");
RAY_LOG(INFO) << "Reconstruction for borrowed objects (" << object_id
<< ") is not supported";
reconstruction_failure_callback_(object_id, /*pin_object=*/false);
return true;
}
bool already_pending_recovery = true;
@ -49,7 +56,7 @@ Status ObjectRecoveryManager::RecoverObject(const ObjectID &object_id) {
RAY_LOG(INFO) << "Recovery complete for object " << object_id;
});
// Lookup the object in the GCS to find another copy.
RAY_RETURN_NOT_OK(object_lookup_(
RAY_CHECK_OK(object_lookup_(
object_id,
[this](const ObjectID &object_id, const std::vector<rpc::Address> &locations) {
PinOrReconstructObject(object_id, locations);
@ -57,7 +64,7 @@ Status ObjectRecoveryManager::RecoverObject(const ObjectID &object_id) {
} else {
RAY_LOG(DEBUG) << "Recovery already started for object " << object_id;
}
return Status::OK();
return true;
}
void ObjectRecoveryManager::PinOrReconstructObject(
@ -130,8 +137,8 @@ void ObjectRecoveryManager::ReconstructObject(const ObjectID &object_id) {
if (status.ok()) {
// Try to recover the task's dependencies.
for (const auto &dep : task_deps) {
auto status = RecoverObject(dep);
if (!status.ok()) {
auto recovered = RecoverObject(dep);
if (!recovered) {
RAY_LOG(INFO) << "Failed to reconstruct object " << dep << ": "
<< status.message();
// We do not pin the dependency because we may not be the owner.

View file

@ -79,11 +79,12 @@ class ObjectRecoveryManager {
/// plasma arguments to the task. The recovery operation will succeed once
/// the task completes and stores a new value for its return object.
///
/// \return OK if recovery for the object has successfully started, Invalid
/// if the object is not recoverable because we do not own it. Note that the
/// Status::OK value only indicates that the recovery operation has started,
/// but does not guarantee that the recovery operation is successful.
Status RecoverObject(const ObjectID &object_id);
/// \return True if recovery for the object has successfully started, false
/// if the object is not recoverable because we do not have any metadata
/// about the object. If this returns true, then eventually recovery will
/// either succeed (a value will be put into the memory store) or fail (the
/// reconstruction failure callback will be called for this object).
bool RecoverObject(const ObjectID &object_id);
private:
/// Pin a new copy for a lost object from the given locations or, if that

View file

@ -543,16 +543,17 @@ void ReferenceCounter::UpdateObjectPinnedAtRaylet(const ObjectID &object_id,
}
bool ReferenceCounter::IsPlasmaObjectPinnedOrSpilled(const ObjectID &object_id,
NodeID *pinned_at,
bool *owned_by_us, NodeID *pinned_at,
bool *spilled) const {
absl::MutexLock lock(&mutex_);
auto it = object_id_refs_.find(object_id);
if (it != object_id_refs_.end()) {
if (it->second.owned_by_us) {
*owned_by_us = true;
*spilled = it->second.spilled;
*pinned_at = it->second.pinned_at_raylet_id.value_or(NodeID::Nil());
return true;
}
return true;
}
return false;
}

View file

@ -327,14 +327,15 @@ class ReferenceCounter : public ReferenceCounterInterface {
/// available to fetch.
///
/// \param[in] object_id The object to check.
/// \param[out] owned_by_us Whether this object is owned by us. The pinned_at
/// and spilled out-parameters are set if this is true.
/// \param[out] pinned_at The node ID of the raylet at which this object is
/// \param[out] spilled Whether this object has been spilled.
/// pinned. Set to nil if the object is not pinned.
/// \return True if the object exists and is owned by us, false otherwise. We
/// return false here because a borrower should not know the pinned location
/// for an object.
bool IsPlasmaObjectPinnedOrSpilled(const ObjectID &object_id, NodeID *pinned_at,
bool *spilled) const LOCKS_EXCLUDED(mutex_);
/// \return True if the reference exists, false otherwise.
bool IsPlasmaObjectPinnedOrSpilled(const ObjectID &object_id, bool *owned_by_us,
NodeID *pinned_at, bool *spilled) const
LOCKS_EXCLUDED(mutex_);
/// Get and reset the objects that were pinned on the given node. This
/// method should be called upon a node failure, to determine which plasma

View file

@ -1986,23 +1986,28 @@ TEST_F(ReferenceCountLineageEnabledTest, TestPlasmaLocation) {
ObjectID borrowed_id = ObjectID::FromRandom();
rc->AddLocalReference(borrowed_id, "");
bool owned_by_us;
NodeID pinned_at;
bool spilled;
ASSERT_FALSE(rc->IsPlasmaObjectPinnedOrSpilled(borrowed_id, &pinned_at, &spilled));
ASSERT_TRUE(
rc->IsPlasmaObjectPinnedOrSpilled(borrowed_id, &owned_by_us, &pinned_at, &spilled));
ASSERT_FALSE(owned_by_us);
ObjectID id = ObjectID::FromRandom();
NodeID node_id = NodeID::FromRandom();
rc->AddOwnedObject(id, {}, rpc::Address(), "", 0, true);
rc->AddLocalReference(id, "");
ASSERT_TRUE(rc->SetDeleteCallback(id, callback));
ASSERT_TRUE(rc->IsPlasmaObjectPinnedOrSpilled(id, &pinned_at, &spilled));
ASSERT_TRUE(rc->IsPlasmaObjectPinnedOrSpilled(id, &owned_by_us, &pinned_at, &spilled));
ASSERT_TRUE(owned_by_us);
ASSERT_TRUE(pinned_at.IsNil());
rc->UpdateObjectPinnedAtRaylet(id, node_id);
ASSERT_TRUE(rc->IsPlasmaObjectPinnedOrSpilled(id, &pinned_at, &spilled));
ASSERT_TRUE(rc->IsPlasmaObjectPinnedOrSpilled(id, &owned_by_us, &pinned_at, &spilled));
ASSERT_TRUE(owned_by_us);
ASSERT_FALSE(pinned_at.IsNil());
rc->RemoveLocalReference(id, nullptr);
ASSERT_FALSE(rc->IsPlasmaObjectPinnedOrSpilled(id, &pinned_at, &spilled));
ASSERT_FALSE(rc->IsPlasmaObjectPinnedOrSpilled(id, &owned_by_us, &pinned_at, &spilled));
ASSERT_TRUE(deleted->count(id) > 0);
deleted->clear();
@ -2013,7 +2018,8 @@ TEST_F(ReferenceCountLineageEnabledTest, TestPlasmaLocation) {
auto objects = rc->ResetObjectsOnRemovedNode(node_id);
ASSERT_EQ(objects.size(), 1);
ASSERT_EQ(objects[0], id);
ASSERT_TRUE(rc->IsPlasmaObjectPinnedOrSpilled(id, &pinned_at, &spilled));
ASSERT_TRUE(rc->IsPlasmaObjectPinnedOrSpilled(id, &owned_by_us, &pinned_at, &spilled));
ASSERT_TRUE(owned_by_us);
ASSERT_TRUE(pinned_at.IsNil());
ASSERT_TRUE(deleted->count(id) > 0);
deleted->clear();
@ -2035,9 +2041,11 @@ TEST_F(ReferenceCountTest, TestFree) {
ASSERT_FALSE(rc->SetDeleteCallback(id, callback));
ASSERT_EQ(deleted->count(id), 0);
rc->UpdateObjectPinnedAtRaylet(id, node_id);
bool owned_by_us;
NodeID pinned_at;
bool spilled;
ASSERT_TRUE(rc->IsPlasmaObjectPinnedOrSpilled(id, &pinned_at, &spilled));
ASSERT_TRUE(rc->IsPlasmaObjectPinnedOrSpilled(id, &owned_by_us, &pinned_at, &spilled));
ASSERT_TRUE(owned_by_us);
ASSERT_TRUE(pinned_at.IsNil());
ASSERT_TRUE(rc->IsPlasmaObjectFreed(id));
rc->RemoveLocalReference(id, nullptr);
@ -2052,7 +2060,8 @@ TEST_F(ReferenceCountTest, TestFree) {
rc->FreePlasmaObjects({id});
ASSERT_TRUE(rc->IsPlasmaObjectFreed(id));
ASSERT_TRUE(deleted->count(id) > 0);
ASSERT_TRUE(rc->IsPlasmaObjectPinnedOrSpilled(id, &pinned_at, &spilled));
ASSERT_TRUE(rc->IsPlasmaObjectPinnedOrSpilled(id, &owned_by_us, &pinned_at, &spilled));
ASSERT_TRUE(owned_by_us);
ASSERT_TRUE(pinned_at.IsNil());
rc->RemoveLocalReference(id, nullptr);
ASSERT_FALSE(rc->IsPlasmaObjectFreed(id));

View file

@ -132,8 +132,6 @@ class ObjectRecoveryManagerTest : public ::testing::Test {
std::make_shared<LocalMemoryBuffer>(metadata, meta.size());
auto data = RayObject(nullptr, meta_buffer, std::vector<ObjectID>());
RAY_CHECK(memory_store_->Put(data, object_id));
ref_counter_->UpdateObjectPinnedAtRaylet(object_id, local_raylet_id_);
},
/*lineage_reconstruction_enabled=*/true) {}
@ -149,13 +147,27 @@ class ObjectRecoveryManagerTest : public ::testing::Test {
};
TEST_F(ObjectRecoveryManagerTest, TestNoReconstruction) {
// Lineage recording disabled.
ObjectID object_id = ObjectID::FromRandom();
ref_counter_->AddOwnedObject(object_id, {}, rpc::Address(), "", 0, true);
ASSERT_TRUE(manager_.RecoverObject(object_id).ok());
ASSERT_TRUE(manager_.RecoverObject(object_id));
ASSERT_TRUE(failed_reconstructions_.empty());
ASSERT_TRUE(object_directory_->Flush() == 1);
ASSERT_TRUE(failed_reconstructions_.count(object_id) == 1);
ASSERT_EQ(task_resubmitter_->num_tasks_resubmitted, 0);
// Borrowed object.
object_id = ObjectID::FromRandom();
ref_counter_->AddLocalReference(object_id, "");
ASSERT_TRUE(manager_.RecoverObject(object_id));
ASSERT_TRUE(failed_reconstructions_.count(object_id) == 1);
ASSERT_EQ(task_resubmitter_->num_tasks_resubmitted, 0);
// Ref went out of scope.
object_id = ObjectID::FromRandom();
ASSERT_FALSE(manager_.RecoverObject(object_id));
ASSERT_TRUE(failed_reconstructions_.count(object_id) == 0);
ASSERT_EQ(task_resubmitter_->num_tasks_resubmitted, 0);
}
TEST_F(ObjectRecoveryManagerTest, TestPinNewCopy) {
@ -164,7 +176,7 @@ TEST_F(ObjectRecoveryManagerTest, TestPinNewCopy) {
std::vector<rpc::Address> addresses({rpc::Address()});
object_directory_->SetLocations(object_id, addresses);
ASSERT_TRUE(manager_.RecoverObject(object_id).ok());
ASSERT_TRUE(manager_.RecoverObject(object_id));
ASSERT_TRUE(object_directory_->Flush() == 1);
ASSERT_TRUE(raylet_client_->Flush() == 1);
ASSERT_TRUE(failed_reconstructions_.empty());
@ -176,7 +188,7 @@ TEST_F(ObjectRecoveryManagerTest, TestReconstruction) {
ref_counter_->AddOwnedObject(object_id, {}, rpc::Address(), "", 0, true);
task_resubmitter_->AddTask(object_id.TaskId(), {});
ASSERT_TRUE(manager_.RecoverObject(object_id).ok());
ASSERT_TRUE(manager_.RecoverObject(object_id));
ASSERT_TRUE(object_directory_->Flush() == 1);
ASSERT_TRUE(failed_reconstructions_.empty());
@ -188,24 +200,30 @@ TEST_F(ObjectRecoveryManagerTest, TestReconstructionSuppression) {
ref_counter_->AddOwnedObject(object_id, {}, rpc::Address(), "", 0, true);
ref_counter_->AddLocalReference(object_id, "");
ASSERT_TRUE(manager_.RecoverObject(object_id).ok());
ASSERT_TRUE(manager_.RecoverObject(object_id));
// A second attempt to recover the object will not trigger any more
// callbacks.
ASSERT_TRUE(manager_.RecoverObject(object_id).ok());
ASSERT_TRUE(manager_.RecoverObject(object_id));
// A new copy of the object is pinned.
NodeID remote_node_id = NodeID::FromRandom();
rpc::Address address;
address.set_raylet_id(remote_node_id.Binary());
object_directory_->SetLocations(object_id, {address});
ASSERT_TRUE(object_directory_->Flush() == 1);
failed_reconstructions_.clear();
ASSERT_TRUE(raylet_client_->Flush() == 1);
// The object has been marked as failed. Another attempt to recover the
// object will not trigger any callbacks.
ASSERT_TRUE(manager_.RecoverObject(object_id).ok());
// The object has been marked as failed but it is still pinned on the new
// node. Another attempt to recover the object will not trigger any
// callbacks.
ASSERT_TRUE(manager_.RecoverObject(object_id));
ASSERT_EQ(object_directory_->Flush(), 0);
// The object is removed and can be recovered again.
auto objects = ref_counter_->ResetObjectsOnRemovedNode(local_raylet_id_);
auto objects = ref_counter_->ResetObjectsOnRemovedNode(remote_node_id);
ASSERT_EQ(objects.size(), 1);
ASSERT_EQ(objects[0], object_id);
memory_store_->Delete(objects);
ASSERT_TRUE(manager_.RecoverObject(object_id).ok());
ASSERT_TRUE(manager_.RecoverObject(object_id));
ASSERT_TRUE(object_directory_->Flush() == 1);
}
@ -220,7 +238,7 @@ TEST_F(ObjectRecoveryManagerTest, TestReconstructionChain) {
object_ids.push_back(object_id);
}
ASSERT_TRUE(manager_.RecoverObject(object_ids.back()).ok());
ASSERT_TRUE(manager_.RecoverObject(object_ids.back()));
for (int i = 0; i < 3; i++) {
RAY_LOG(INFO) << i;
ASSERT_EQ(object_directory_->Flush(), 1);