[core] Evict lineage to bound memory usage (#19946)

* bound lineage

* Bound lineage in bytes

* test

* Lineage evicted error

* Lineage evicted

* lint

* test

* test

* comment

* doc

* x

* x

* x

* x
This commit is contained in:
Stephanie Wang 2021-11-08 21:53:40 -08:00 committed by GitHub
parent e5e62d8991
commit ffcc5935d7
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
16 changed files with 434 additions and 124 deletions

View file

@ -33,6 +33,11 @@ public class ObjectSerializer {
String.valueOf(ErrorType.ACTOR_DIED.getNumber()).getBytes(); String.valueOf(ErrorType.ACTOR_DIED.getNumber()).getBytes();
private static final byte[] UNRECONSTRUCTABLE_EXCEPTION_META = private static final byte[] UNRECONSTRUCTABLE_EXCEPTION_META =
String.valueOf(ErrorType.OBJECT_UNRECONSTRUCTABLE.getNumber()).getBytes(); String.valueOf(ErrorType.OBJECT_UNRECONSTRUCTABLE.getNumber()).getBytes();
private static final byte[] UNRECONSTRUCTABLE_LINEAGE_EVICTED_EXCEPTION_META =
String.valueOf(ErrorType.OBJECT_UNRECONSTRUCTABLE_LINEAGE_EVICTED.getNumber()).getBytes();
private static final byte[] UNRECONSTRUCTABLE_MAX_ATTEMPTS_EXCEEDED_EXCEPTION_META =
String.valueOf(ErrorType.OBJECT_UNRECONSTRUCTABLE_MAX_ATTEMPTS_EXCEEDED.getNumber())
.getBytes();
private static final byte[] OBJECT_LOST_META = private static final byte[] OBJECT_LOST_META =
String.valueOf(ErrorType.OBJECT_LOST.getNumber()).getBytes(); String.valueOf(ErrorType.OBJECT_LOST.getNumber()).getBytes();
private static final byte[] OWNER_DIED_META = private static final byte[] OWNER_DIED_META =
@ -84,6 +89,8 @@ public class ObjectSerializer {
} else if (Bytes.indexOf(meta, WORKER_EXCEPTION_META) == 0) { } else if (Bytes.indexOf(meta, WORKER_EXCEPTION_META) == 0) {
return new RayWorkerException(); return new RayWorkerException();
} else if (Bytes.indexOf(meta, UNRECONSTRUCTABLE_EXCEPTION_META) == 0 } else if (Bytes.indexOf(meta, UNRECONSTRUCTABLE_EXCEPTION_META) == 0
|| Bytes.indexOf(meta, UNRECONSTRUCTABLE_LINEAGE_EVICTED_EXCEPTION_META) == 0
|| Bytes.indexOf(meta, UNRECONSTRUCTABLE_MAX_ATTEMPTS_EXCEEDED_EXCEPTION_META) == 0
|| Bytes.indexOf(meta, OBJECT_LOST_META) == 0 || Bytes.indexOf(meta, OBJECT_LOST_META) == 0
|| Bytes.indexOf(meta, OWNER_DIED_META) == 0 || Bytes.indexOf(meta, OWNER_DIED_META) == 0
|| Bytes.indexOf(meta, OBJECT_DELETED_META) == 0) { || Bytes.indexOf(meta, OBJECT_DELETED_META) == 0) {

View file

@ -365,8 +365,7 @@ class OwnerDiedError(ObjectLostError):
class ObjectReconstructionFailedError(ObjectLostError): class ObjectReconstructionFailedError(ObjectLostError):
"""Indicates that the owner of the object has died while there is still a """Indicates that the object cannot be reconstructed.
reference to the object.
Attributes: Attributes:
object_ref_hex: Hex ID of the object. object_ref_hex: Hex ID of the object.
@ -375,7 +374,40 @@ class ObjectReconstructionFailedError(ObjectLostError):
def __str__(self): def __str__(self):
return self._base_str() + "\n\n" + ( return self._base_str() + "\n\n" + (
"The object cannot be reconstructed " "The object cannot be reconstructed "
"because the maximum number of task retries has been exceeded.") "because it was created by an actor, ray.put() call, or its "
"ObjectRef was created by a different worker.")
class ObjectReconstructionFailedMaxAttemptsExceededError(ObjectLostError):
"""Indicates that the object cannot be reconstructed because the maximum
number of task retries has been exceeded.
Attributes:
object_ref_hex: Hex ID of the object.
"""
def __str__(self):
return self._base_str() + "\n\n" + (
"The object cannot be reconstructed "
"because the maximum number of task retries has been exceeded. "
"To prevent this error, set "
"`@ray.remote(max_retries=<num retries>)` (default 3).")
class ObjectReconstructionFailedLineageEvictedError(ObjectLostError):
"""Indicates that the object cannot be reconstructed because its lineage
was evicted due to memory pressure.
Attributes:
object_ref_hex: Hex ID of the object.
"""
def __str__(self):
return self._base_str() + "\n\n" + (
"The object cannot be reconstructed because its lineage has been "
"evicted to reduce memory pressure. "
"To prevent this error, set the environment variable "
"RAY_max_lineage_bytes=<bytes> (default 1GB) during `ray start`.")
class GetTimeoutError(RayError): class GetTimeoutError(RayError):
@ -413,6 +445,8 @@ RAY_EXCEPTION_TYPES = [
ObjectLostError, ObjectLostError,
ReferenceCountingAssertionError, ReferenceCountingAssertionError,
ObjectReconstructionFailedError, ObjectReconstructionFailedError,
ObjectReconstructionFailedMaxAttemptsExceededError,
ObjectReconstructionFailedLineageEvictedError,
OwnerDiedError, OwnerDiedError,
GetTimeoutError, GetTimeoutError,
AsyncioActorExit, AsyncioActorExit,

View file

@ -6,11 +6,14 @@ import ray.cloudpickle as pickle
from ray import ray_constants from ray import ray_constants
import ray._private.utils import ray._private.utils
from ray._private.gcs_utils import ErrorType from ray._private.gcs_utils import ErrorType
from ray.exceptions import ( from ray.exceptions import (RayError, PlasmaObjectNotAvailable, RayTaskError,
RayError, PlasmaObjectNotAvailable, RayTaskError, RayActorError, RayActorError, TaskCancelledError,
TaskCancelledError, WorkerCrashedError, ObjectLostError, WorkerCrashedError, ObjectLostError,
ReferenceCountingAssertionError, OwnerDiedError, ReferenceCountingAssertionError, OwnerDiedError,
ObjectReconstructionFailedError, RaySystemError, RuntimeEnvSetupError) ObjectReconstructionFailedError,
ObjectReconstructionFailedMaxAttemptsExceededError,
ObjectReconstructionFailedLineageEvictedError,
RaySystemError, RuntimeEnvSetupError)
from ray._raylet import ( from ray._raylet import (
split_buffer, split_buffer,
unpack_pickle5_buffers, unpack_pickle5_buffers,
@ -239,6 +242,16 @@ class SerializationContext:
return ObjectReconstructionFailedError( return ObjectReconstructionFailedError(
object_ref.hex(), object_ref.owner_address(), object_ref.hex(), object_ref.owner_address(),
object_ref.call_site()) object_ref.call_site())
elif error_type == ErrorType.Value(
"OBJECT_UNRECONSTRUCTABLE_MAX_ATTEMPTS_EXCEEDED"):
return ObjectReconstructionFailedMaxAttemptsExceededError(
object_ref.hex(), object_ref.owner_address(),
object_ref.call_site())
elif error_type == ErrorType.Value(
"OBJECT_UNRECONSTRUCTABLE_LINEAGE_EVICTED"):
return ObjectReconstructionFailedLineageEvictedError(
object_ref.hex(), object_ref.owner_address(),
object_ref.call_site())
elif error_type == ErrorType.Value("RUNTIME_ENV_SETUP_FAILED"): elif error_type == ErrorType.Value("RUNTIME_ENV_SETUP_FAILED"):
return RuntimeEnvSetupError() return RuntimeEnvSetupError()
else: else:

View file

@ -170,7 +170,8 @@ def test_basic_reconstruction(ray_start_cluster, reconstruction_enabled):
num_cpus=1, resources={"node1": 1}, object_store_memory=10**8) num_cpus=1, resources={"node1": 1}, object_store_memory=10**8)
if reconstruction_enabled: if reconstruction_enabled:
with pytest.raises(ray.exceptions.ObjectReconstructionFailedError): with pytest.raises(ray.exceptions.
ObjectReconstructionFailedMaxAttemptsExceededError):
ray.get(obj) ray.get(obj)
else: else:
with pytest.raises(ray.exceptions.ObjectLostError): with pytest.raises(ray.exceptions.ObjectLostError):
@ -681,6 +682,60 @@ def test_nondeterministic_output(ray_start_cluster, reconstruction_enabled):
ray.get(x) ray.get(x)
def test_lineage_evicted(ray_start_cluster):
config = {
"num_heartbeats_timeout": 10,
"raylet_heartbeat_period_milliseconds": 100,
"object_timeout_milliseconds": 200,
"max_lineage_bytes": 10_000,
}
cluster = ray_start_cluster
# Head node with no resources.
cluster.add_node(
num_cpus=0,
_system_config=config,
object_store_memory=10**8,
enable_object_reconstruction=True)
ray.init(address=cluster.address)
node_to_kill = cluster.add_node(num_cpus=1, object_store_memory=10**8)
cluster.wait_for_nodes()
@ray.remote
def large_object():
return np.zeros(10**7, dtype=np.uint8)
@ray.remote
def chain(x):
return x
@ray.remote
def dependent_task(x):
return x
obj = large_object.remote()
for _ in range(5):
obj = chain.remote(obj)
ray.get(dependent_task.remote(obj))
cluster.remove_node(node_to_kill, allow_graceful=False)
node_to_kill = cluster.add_node(num_cpus=1, object_store_memory=10**8)
ray.get(dependent_task.remote(obj))
# Lineage now exceeds the eviction factor.
for _ in range(100):
obj = chain.remote(obj)
ray.get(dependent_task.remote(obj))
cluster.remove_node(node_to_kill, allow_graceful=False)
cluster.add_node(num_cpus=1, object_store_memory=10**8)
try:
ray.get(dependent_task.remote(obj))
assert False
except ray.exceptions.RayTaskError as e:
assert "ObjectReconstructionFailedLineageEvictedError" in str(e)
if __name__ == "__main__": if __name__ == "__main__":
import pytest import pytest
sys.exit(pytest.main(["-v", __file__])) sys.exit(pytest.main(["-v", __file__]))

View file

@ -50,7 +50,7 @@ namespace core {
class MockTaskResubmissionInterface : public TaskResubmissionInterface { class MockTaskResubmissionInterface : public TaskResubmissionInterface {
public: public:
MOCK_METHOD(Status, ResubmitTask, MOCK_METHOD(bool, ResubmitTask,
(const TaskID &task_id, std::vector<ObjectID> *task_deps), (override)); (const TaskID &task_id, std::vector<ObjectID> *task_deps), (override));
}; };

View file

@ -92,6 +92,14 @@ RAY_CONFIG(size_t, free_objects_batch_size, 100)
RAY_CONFIG(bool, lineage_pinning_enabled, false) RAY_CONFIG(bool, lineage_pinning_enabled, false)
/// Maximum amount of lineage to keep in bytes. This includes the specs of all
/// tasks that have previously already finished but that may be retried again.
/// If we reach this limit, 50% of the current lineage will be evicted and
/// objects that are still in scope will no longer be reconstructed if lost.
/// Each task spec is on the order of 1KB but can be much larger if it has many
/// inlined args.
RAY_CONFIG(int64_t, max_lineage_bytes, 1024 * 1024 * 1024)
/// Whether to re-populate plasma memory. This avoids memory allocation failures /// Whether to re-populate plasma memory. This avoids memory allocation failures
/// at runtime (SIGBUS errors creating new objects), however it will use more memory /// at runtime (SIGBUS errors creating new objects), however it will use more memory
/// upfront and can slow down Ray startup. /// upfront and can slow down Ray startup.

View file

@ -631,7 +631,8 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_
} }
} }
}, },
check_node_alive_fn, reconstruct_object_callback, push_error_callback)); check_node_alive_fn, reconstruct_object_callback, push_error_callback,
RayConfig::instance().max_lineage_bytes()));
// Create an entry for the driver task in the task table. This task is // Create an entry for the driver task in the task table. This task is
// added immediately with status RUNNING. This allows us to push errors // added immediately with status RUNNING. This allows us to push errors

View file

@ -126,10 +126,20 @@ void ObjectRecoveryManager::PinExistingObjectCopy(
} }
void ObjectRecoveryManager::ReconstructObject(const ObjectID &object_id) { void ObjectRecoveryManager::ReconstructObject(const ObjectID &object_id) {
if (!reference_counter_->IsObjectReconstructable(object_id)) { bool lineage_evicted = false;
if (!reference_counter_->IsObjectReconstructable(object_id, &lineage_evicted)) {
RAY_LOG(DEBUG) << "Object " << object_id << " is not reconstructable"; RAY_LOG(DEBUG) << "Object " << object_id << " is not reconstructable";
recovery_failure_callback_(object_id, rpc::ErrorType::OBJECT_LOST, if (lineage_evicted) {
/*pin_object=*/true); // TODO(swang): We may not report the LINEAGE_EVICTED error (just reports
// general OBJECT_UNRECONSTRUCTABLE error) if lineage eviction races with
// reconstruction.
recovery_failure_callback_(object_id,
rpc::ErrorType::OBJECT_UNRECONSTRUCTABLE_LINEAGE_EVICTED,
/*pin_object=*/true);
} else {
recovery_failure_callback_(object_id, rpc::ErrorType::OBJECT_LOST,
/*pin_object=*/true);
}
return; return;
} }
@ -138,24 +148,28 @@ void ObjectRecoveryManager::ReconstructObject(const ObjectID &object_id) {
// object. // object.
const auto task_id = object_id.TaskId(); const auto task_id = object_id.TaskId();
std::vector<ObjectID> task_deps; std::vector<ObjectID> task_deps;
auto status = task_resubmitter_->ResubmitTask(task_id, &task_deps); auto resubmitted = task_resubmitter_->ResubmitTask(task_id, &task_deps);
if (status.ok()) { if (resubmitted) {
// Try to recover the task's dependencies. // Try to recover the task's dependencies.
for (const auto &dep : task_deps) { for (const auto &dep : task_deps) {
auto recovered = RecoverObject(dep); auto recovered = RecoverObject(dep);
if (!recovered) { if (!recovered) {
RAY_LOG(INFO) << "Failed to reconstruct object " << dep << ": " RAY_LOG(INFO) << "Failed to reconstruct object " << dep;
<< status.message(); // This case can happen if the dependency was borrowed from another
// worker, or if there was a bug in reconstruction that caused us to GC
// the dependency ref.
// We do not pin the dependency because we may not be the owner. // We do not pin the dependency because we may not be the owner.
recovery_failure_callback_(dep, rpc::ErrorType::OBJECT_UNRECONSTRUCTABLE, recovery_failure_callback_(dep, rpc::ErrorType::OBJECT_UNRECONSTRUCTABLE,
/*pin_object=*/false); /*pin_object=*/false);
} }
} }
} else { } else {
RAY_LOG(INFO) << "Failed to reconstruct object " << object_id; RAY_LOG(INFO) << "Failed to reconstruct object " << object_id
recovery_failure_callback_(object_id, rpc::ErrorType::OBJECT_UNRECONSTRUCTABLE, << " because lineage has already been deleted";
/*pin_object=*/true); recovery_failure_callback_(
object_id, rpc::ErrorType::OBJECT_UNRECONSTRUCTABLE_MAX_ATTEMPTS_EXCEEDED,
/*pin_object=*/true);
} }
} }

View file

@ -177,6 +177,8 @@ void ReferenceCounter::AddOwnedObject(const ObjectID &object_id,
// If the entry doesn't exist, we initialize the direct reference count to zero // If the entry doesn't exist, we initialize the direct reference count to zero
// because this corresponds to a submitted task whose return ObjectID will be created // because this corresponds to a submitted task whose return ObjectID will be created
// in the frontend language, incrementing the reference count. // in the frontend language, incrementing the reference count.
// TODO(swang): Objects that are not reconstructable should not increment
// their arguments' lineage ref counts.
auto it = object_id_refs_ auto it = object_id_refs_
.emplace(object_id, Reference(owner_address, call_site, object_size, .emplace(object_id, Reference(owner_address, call_site, object_size,
is_reconstructable, pinned_at_raylet_id)) is_reconstructable, pinned_at_raylet_id))
@ -190,6 +192,11 @@ void ReferenceCounter::AddOwnedObject(const ObjectID &object_id,
// We eagerly add the pinned location to the set of object locations. // We eagerly add the pinned location to the set of object locations.
AddObjectLocationInternal(it, pinned_at_raylet_id.value()); AddObjectLocationInternal(it, pinned_at_raylet_id.value());
} }
reconstructable_owned_objects_.emplace_back(object_id);
auto back_it = reconstructable_owned_objects_.end();
back_it--;
RAY_CHECK(reconstructable_owned_objects_index_.emplace(object_id, back_it).second);
} }
void ReferenceCounter::RemoveOwnedObject(const ObjectID &object_id) { void ReferenceCounter::RemoveOwnedObject(const ObjectID &object_id) {
@ -335,37 +342,43 @@ void ReferenceCounter::UpdateFinishedTaskReferences(
RemoveSubmittedTaskReferences(argument_ids, release_lineage, deleted); RemoveSubmittedTaskReferences(argument_ids, release_lineage, deleted);
} }
void ReferenceCounter::ReleaseLineageReferences( int64_t ReferenceCounter::ReleaseLineageReferences(ReferenceTable::iterator ref) {
const std::vector<ObjectID> &argument_ids) { int64_t lineage_bytes_evicted = 0;
absl::MutexLock lock(&mutex_); std::vector<ObjectID> argument_ids;
ReleaseLineageReferencesInternal(argument_ids); if (on_lineage_released_ && ref->second.owned_by_us) {
} RAY_LOG(DEBUG) << "Releasing lineage for object " << ref->first;
lineage_bytes_evicted += on_lineage_released_(ref->first, &argument_ids);
// The object is still in scope by the application and it was
// reconstructable with lineage. Mark that its lineage has been evicted so
// we can return the right error during reconstruction.
if (!ref->second.OutOfScope(lineage_pinning_enabled_) &&
ref->second.is_reconstructable) {
ref->second.lineage_evicted = true;
ref->second.is_reconstructable = false;
}
}
void ReferenceCounter::ReleaseLineageReferencesInternal(
const std::vector<ObjectID> &argument_ids) {
for (const ObjectID &argument_id : argument_ids) { for (const ObjectID &argument_id : argument_ids) {
auto it = object_id_refs_.find(argument_id); auto arg_it = object_id_refs_.find(argument_id);
if (it == object_id_refs_.end()) { if (arg_it == object_id_refs_.end()) {
// References can get evicted early when lineage pinning is disabled.
RAY_CHECK(!lineage_pinning_enabled_);
continue; continue;
} }
if (it->second.lineage_ref_count == 0) { if (arg_it->second.lineage_ref_count == 0) {
// References can get evicted early when lineage pinning is disabled.
RAY_CHECK(!lineage_pinning_enabled_);
continue; continue;
} }
RAY_LOG(DEBUG) << "Releasing lineage internal for argument " << argument_id; RAY_LOG(DEBUG) << "Releasing lineage internal for argument " << argument_id;
it->second.lineage_ref_count--; arg_it->second.lineage_ref_count--;
if (it->second.lineage_ref_count == 0) { if (arg_it->second.ShouldDelete(lineage_pinning_enabled_)) {
// Don't have to pass in a deleted vector here because the reference // We only decremented the lineage ref count, so the argument value
// cannot have gone out of scope here since we are only modifying the // should already be released.
// lineage ref count. RAY_CHECK(arg_it->second.on_ref_removed == nullptr);
DeleteReferenceInternal(it, nullptr); lineage_bytes_evicted += ReleaseLineageReferences(arg_it);
EraseReference(arg_it);
} }
} }
return lineage_bytes_evicted;
} }
void ReferenceCounter::RemoveSubmittedTaskReferences( void ReferenceCounter::RemoveSubmittedTaskReferences(
@ -384,9 +397,6 @@ void ReferenceCounter::RemoveSubmittedTaskReferences(
if (release_lineage) { if (release_lineage) {
if (it->second.lineage_ref_count > 0) { if (it->second.lineage_ref_count > 0) {
it->second.lineage_ref_count--; it->second.lineage_ref_count--;
} else {
// References can get evicted early when lineage pinning is disabled.
RAY_CHECK(!lineage_pinning_enabled_);
} }
} }
if (it->second.RefCount() == 0) { if (it->second.RefCount() == 0) {
@ -482,13 +492,7 @@ void ReferenceCounter::DeleteReferenceInternal(ReferenceTable::iterator it,
PRINT_REF_COUNT(it); PRINT_REF_COUNT(it);
// Whether it is safe to unpin the value. // Whether it is safe to unpin the value.
bool should_delete_value = false;
bool should_delete_ref = true;
if (it->second.OutOfScope(lineage_pinning_enabled_)) { if (it->second.OutOfScope(lineage_pinning_enabled_)) {
// If distributed ref counting is enabled, then delete the object once its
// ref count across all processes is 0.
should_delete_value = true;
for (const auto &inner_id : it->second.contains) { for (const auto &inner_id : it->second.contains) {
auto inner_it = object_id_refs_.find(inner_id); auto inner_it = object_id_refs_.find(inner_id);
if (inner_it != object_id_refs_.end()) { if (inner_it != object_id_refs_.end()) {
@ -504,31 +508,55 @@ void ReferenceCounter::DeleteReferenceInternal(ReferenceTable::iterator it,
DeleteReferenceInternal(inner_it, deleted); DeleteReferenceInternal(inner_it, deleted);
} }
} }
} // Perform the deletion.
// Perform the deletion.
if (should_delete_value) {
ReleasePlasmaObject(it); ReleasePlasmaObject(it);
if (deleted) { if (deleted) {
deleted->push_back(id); deleted->push_back(id);
} }
auto index_it = reconstructable_owned_objects_index_.find(id);
if (index_it != reconstructable_owned_objects_index_.end()) {
reconstructable_owned_objects_.erase(index_it->second);
reconstructable_owned_objects_index_.erase(index_it);
}
} }
if (it->second.ShouldDelete(lineage_pinning_enabled_) && should_delete_ref) {
if (it->second.ShouldDelete(lineage_pinning_enabled_)) {
RAY_LOG(DEBUG) << "Deleting Reference to object " << id; RAY_LOG(DEBUG) << "Deleting Reference to object " << id;
// TODO(swang): Update lineage_ref_count for nested objects? // TODO(swang): Update lineage_ref_count for nested objects?
if (on_lineage_released_ && it->second.owned_by_us) { ReleaseLineageReferences(it);
RAY_LOG(DEBUG) << "Releasing lineage for object " << id; EraseReference(it);
std::vector<ObjectID> ids_to_release;
on_lineage_released_(id, &ids_to_release);
ReleaseLineageReferencesInternal(ids_to_release);
}
freed_objects_.erase(id);
object_id_refs_.erase(it);
ShutdownIfNeeded();
} }
} }
void ReferenceCounter::EraseReference(ReferenceTable::iterator it) {
RAY_CHECK(it->second.ShouldDelete(lineage_pinning_enabled_));
auto index_it = reconstructable_owned_objects_index_.find(it->first);
if (index_it != reconstructable_owned_objects_index_.end()) {
reconstructable_owned_objects_.erase(index_it->second);
reconstructable_owned_objects_index_.erase(index_it);
}
freed_objects_.erase(it->first);
object_id_refs_.erase(it);
ShutdownIfNeeded();
}
int64_t ReferenceCounter::EvictLineage(int64_t min_bytes_to_evict) {
absl::MutexLock lock(&mutex_);
int64_t lineage_bytes_evicted = 0;
while (!reconstructable_owned_objects_.empty() &&
lineage_bytes_evicted < min_bytes_to_evict) {
ObjectID object_id = std::move(reconstructable_owned_objects_.front());
reconstructable_owned_objects_.pop_front();
reconstructable_owned_objects_index_.erase(object_id);
auto it = object_id_refs_.find(object_id);
RAY_CHECK(it != object_id_refs_.end());
lineage_bytes_evicted += ReleaseLineageReferences(it);
}
return lineage_bytes_evicted;
}
void ReferenceCounter::ReleasePlasmaObject(ReferenceTable::iterator it) { void ReferenceCounter::ReleasePlasmaObject(ReferenceTable::iterator it) {
if (it->second.on_delete) { if (it->second.on_delete) {
RAY_LOG(DEBUG) << "Calling on_delete for object " << it->first; RAY_LOG(DEBUG) << "Calling on_delete for object " << it->first;
@ -1197,7 +1225,8 @@ void ReferenceCounter::AddBorrowerAddress(const ObjectID &object_id,
} }
} }
bool ReferenceCounter::IsObjectReconstructable(const ObjectID &object_id) const { bool ReferenceCounter::IsObjectReconstructable(const ObjectID &object_id,
bool *lineage_evicted) const {
if (!lineage_pinning_enabled_) { if (!lineage_pinning_enabled_) {
return false; return false;
} }
@ -1206,6 +1235,7 @@ bool ReferenceCounter::IsObjectReconstructable(const ObjectID &object_id) const
if (it == object_id_refs_.end()) { if (it == object_id_refs_.end()) {
return false; return false;
} }
*lineage_evicted = it->second.lineage_evicted;
return it->second.is_reconstructable; return it->second.is_reconstructable;
} }

View file

@ -59,8 +59,9 @@ class ReferenceCounter : public ReferenceCounterInterface,
using ReferenceTableProto = using ReferenceTableProto =
::google::protobuf::RepeatedPtrField<rpc::ObjectReferenceCount>; ::google::protobuf::RepeatedPtrField<rpc::ObjectReferenceCount>;
using ReferenceRemovedCallback = std::function<void(const ObjectID &)>; using ReferenceRemovedCallback = std::function<void(const ObjectID &)>;
// Returns the amount of lineage in bytes released.
using LineageReleasedCallback = using LineageReleasedCallback =
std::function<void(const ObjectID &, std::vector<ObjectID> *)>; std::function<int64_t(const ObjectID &, std::vector<ObjectID> *)>;
ReferenceCounter(const rpc::WorkerAddress &rpc_address, ReferenceCounter(const rpc::WorkerAddress &rpc_address,
pubsub::PublisherInterface *object_info_publisher, pubsub::PublisherInterface *object_info_publisher,
@ -146,17 +147,6 @@ class ReferenceCounter : public ReferenceCounterInterface,
std::vector<ObjectID> *deleted) std::vector<ObjectID> *deleted)
LOCKS_EXCLUDED(mutex_); LOCKS_EXCLUDED(mutex_);
/// Release the lineage ref count for this list of object IDs. An object's
/// lineage ref count is the number of tasks that depend on the object that
/// may be retried in the future (pending execution or finished but
/// retryable). If the object is direct (not stored in plasma), then its
/// lineage ref count is 0.
///
/// \param[in] argument_ids The list of objects whose lineage ref counts we
/// should decrement.
void ReleaseLineageReferences(const std::vector<ObjectID> &argument_ids)
LOCKS_EXCLUDED(mutex_);
/// Add an object that we own. The object may depend on other objects. /// Add an object that we own. The object may depend on other objects.
/// Dependencies for each ObjectID must be set at most once. The local /// Dependencies for each ObjectID must be set at most once. The local
/// reference count for the ObjectID is set to zero, which assumes that an /// reference count for the ObjectID is set to zero, which assumes that an
@ -471,7 +461,13 @@ class ReferenceCounter : public ReferenceCounterInterface,
void AddBorrowerAddress(const ObjectID &object_id, const rpc::Address &borrower_address) void AddBorrowerAddress(const ObjectID &object_id, const rpc::Address &borrower_address)
LOCKS_EXCLUDED(mutex_); LOCKS_EXCLUDED(mutex_);
bool IsObjectReconstructable(const ObjectID &object_id) const; bool IsObjectReconstructable(const ObjectID &object_id, bool *lineage_evicted) const;
/// Evict lineage of objects that are still in scope. This evicts lineage in
/// FIFO order, based on when the ObjectRef was created.
///
/// \param[in] min_bytes_to_evict The minimum number of bytes to evict.
int64_t EvictLineage(int64_t min_bytes_to_evict);
private: private:
struct Reference { struct Reference {
@ -569,8 +565,9 @@ class ReferenceCounter : public ReferenceCounterInterface,
absl::flat_hash_set<NodeID> locations; absl::flat_hash_set<NodeID> locations;
// Whether this object can be reconstructed via lineage. If false, then the // Whether this object can be reconstructed via lineage. If false, then the
// object's value will be pinned as long as it is referenced by any other // object's value will be pinned as long as it is referenced by any other
// object's lineage. // object's lineage. This should be set to false if the object was created
const bool is_reconstructable = false; // by ray.put(), a task that cannot be retried, or its lineage was evicted.
bool is_reconstructable = false;
/// The local ref count for the ObjectID in the language frontend. /// The local ref count for the ObjectID in the language frontend.
size_t local_ref_count = 0; size_t local_ref_count = 0;
@ -624,6 +621,8 @@ class ReferenceCounter : public ReferenceCounterInterface,
/// is inlined (not stored in plasma), then its lineage ref count is 0 /// is inlined (not stored in plasma), then its lineage ref count is 0
/// because any dependent task will already have the value of the object. /// because any dependent task will already have the value of the object.
size_t lineage_ref_count = 0; size_t lineage_ref_count = 0;
/// Whether the lineage of this object was evicted due to memory pressure.
bool lineage_evicted = false;
/// Whether this object has been spilled to external storage. /// Whether this object has been spilled to external storage.
bool spilled = false; bool spilled = false;
/// For objects that have been spilled to external storage, the URL from which /// For objects that have been spilled to external storage, the URL from which
@ -760,8 +759,13 @@ class ReferenceCounter : public ReferenceCounterInterface,
std::vector<ObjectID> *deleted) std::vector<ObjectID> *deleted)
EXCLUSIVE_LOCKS_REQUIRED(mutex_); EXCLUSIVE_LOCKS_REQUIRED(mutex_);
/// Helper method to decrement the lineage ref count for a list of objects. /// Erase the Reference from the table. Assumes that the entry has no more
void ReleaseLineageReferencesInternal(const std::vector<ObjectID> &argument_ids) /// references, normal or lineage.
void EraseReference(ReferenceTable::iterator entry) EXCLUSIVE_LOCKS_REQUIRED(mutex_);
/// Helper method to garbage-collect all out-of-scope References in the
/// lineage for this object.
int64_t ReleaseLineageReferences(ReferenceTable::iterator entry)
EXCLUSIVE_LOCKS_REQUIRED(mutex_); EXCLUSIVE_LOCKS_REQUIRED(mutex_);
/// Add a new location for the given object. The owner must have the object ref in /// Add a new location for the given object. The owner must have the object ref in
@ -823,6 +827,7 @@ class ReferenceCounter : public ReferenceCounterInterface,
/// The callback to call once an object ID that we own is no longer in scope /// The callback to call once an object ID that we own is no longer in scope
/// and it has no tasks that depend on it that may be retried in the future. /// and it has no tasks that depend on it that may be retried in the future.
/// The object's Reference will be erased after this callback. /// The object's Reference will be erased after this callback.
// Returns the amount of lineage in bytes released.
LineageReleasedCallback on_lineage_released_; LineageReleasedCallback on_lineage_released_;
/// Optional shutdown hook to call when all references have gone /// Optional shutdown hook to call when all references have gone
/// out of scope. /// out of scope.
@ -836,6 +841,18 @@ class ReferenceCounter : public ReferenceCounterInterface,
/// Object status subscriber. It is used to subscribe the ref removed information from /// Object status subscriber. It is used to subscribe the ref removed information from
/// other workers. /// other workers.
pubsub::SubscriberInterface *object_info_subscriber_; pubsub::SubscriberInterface *object_info_subscriber_;
/// Objects that we own that are still in scope at the application level and
/// that may be reconstructed. These objects may have pinned lineage that
/// should be evicted on memory pressure. The queue is in FIFO order, based
/// on ObjectRef creation time.
std::list<ObjectID> reconstructable_owned_objects_ GUARDED_BY(mutex_);
/// We keep a FIFO queue of objects in scope so that we can choose lineage to
/// evict under memory pressure. This is an index from ObjectID to the
/// object's place in the queue.
absl::flat_hash_map<ObjectID, std::list<ObjectID>::iterator>
reconstructable_owned_objects_index_ GUARDED_BY(mutex_);
}; };
} // namespace core } // namespace core

View file

@ -2289,6 +2289,7 @@ TEST_F(ReferenceCountLineageEnabledTest, TestBasicLineage) {
rc->SetReleaseLineageCallback( rc->SetReleaseLineageCallback(
[&](const ObjectID &object_id, std::vector<ObjectID> *ids_to_release) { [&](const ObjectID &object_id, std::vector<ObjectID> *ids_to_release) {
lineage_deleted.push_back(object_id); lineage_deleted.push_back(object_id);
return 0;
}); });
// We should not keep lineage for borrowed objects. // We should not keep lineage for borrowed objects.
@ -2334,6 +2335,7 @@ TEST_F(ReferenceCountLineageEnabledTest, TestPinLineageRecursive) {
if (i > 0) { if (i > 0) {
ids_to_release->push_back(ids[i - 1]); ids_to_release->push_back(ids[i - 1]);
} }
return 0;
}); });
for (size_t i = 0; i < ids.size() - 1; i++) { for (size_t i = 0; i < ids.size() - 1; i++) {
@ -2366,6 +2368,53 @@ TEST_F(ReferenceCountLineageEnabledTest, TestPinLineageRecursive) {
ASSERT_EQ(rc->NumObjectIDsInScope(), 0); ASSERT_EQ(rc->NumObjectIDsInScope(), 0);
} }
TEST_F(ReferenceCountLineageEnabledTest, TestEvictLineage) {
std::vector<ObjectID> ids;
for (int i = 0; i < 3; i++) {
ObjectID id = ObjectID::FromRandom();
ids.push_back(id);
rc->AddOwnedObject(id, {}, rpc::Address(), "", 0, true);
}
std::vector<ObjectID> lineage_deleted;
rc->SetReleaseLineageCallback(
[&](const ObjectID &object_id, std::vector<ObjectID> *ids_to_release) {
lineage_deleted.push_back(object_id);
if (object_id == ids[1]) {
// ID1 depends on ID0.
ids_to_release->push_back(ids[0]);
}
return 10;
});
// ID1 depends on ID0.
rc->UpdateSubmittedTaskReferences({ids[0]});
rc->UpdateFinishedTaskReferences({ids[0]}, /*release_lineage=*/false, empty_borrower,
empty_refs, nullptr);
rc->AddLocalReference(ids[1], "");
rc->AddLocalReference(ids[2], "");
bool lineage_evicted = false;
for (const auto &id : ids) {
ASSERT_TRUE(rc->IsObjectReconstructable(id, &lineage_evicted));
ASSERT_FALSE(lineage_evicted);
}
// IDs 0 and 1 should be evicted because they were created before ID2, and
// ID1 depends on ID0.
auto bytes_evicted = rc->EvictLineage(10);
ASSERT_EQ(bytes_evicted, 20);
ASSERT_EQ(lineage_deleted.size(), 2);
ASSERT_FALSE(rc->HasReference(ids[0]));
ASSERT_TRUE(rc->HasReference(ids[1]));
ASSERT_TRUE(rc->HasReference(ids[2]));
// ID1 is no longer reconstructable due to lineage eviction.
ASSERT_FALSE(rc->IsObjectReconstructable(ids[1], &lineage_evicted));
ASSERT_TRUE(lineage_evicted);
ASSERT_TRUE(rc->IsObjectReconstructable(ids[2], &lineage_evicted));
ASSERT_FALSE(lineage_evicted);
}
TEST_F(ReferenceCountLineageEnabledTest, TestResubmittedTask) { TEST_F(ReferenceCountLineageEnabledTest, TestResubmittedTask) {
std::vector<ObjectID> out; std::vector<ObjectID> out;
std::vector<ObjectID> lineage_deleted; std::vector<ObjectID> lineage_deleted;
@ -2376,6 +2425,7 @@ TEST_F(ReferenceCountLineageEnabledTest, TestResubmittedTask) {
rc->SetReleaseLineageCallback( rc->SetReleaseLineageCallback(
[&](const ObjectID &object_id, std::vector<ObjectID> *ids_to_release) { [&](const ObjectID &object_id, std::vector<ObjectID> *ids_to_release) {
lineage_deleted.push_back(object_id); lineage_deleted.push_back(object_id);
return 0;
}); });
// Local references. // Local references.

View file

@ -84,29 +84,38 @@ std::vector<rpc::ObjectReference> TaskManager::AddPendingTask(
{ {
absl::MutexLock lock(&mu_); absl::MutexLock lock(&mu_);
RAY_CHECK(submissible_tasks_ auto inserted = submissible_tasks_.emplace(spec.TaskId(),
.emplace(spec.TaskId(), TaskEntry(spec, max_retries, num_returns)) TaskEntry(spec, max_retries, num_returns));
.second); RAY_CHECK(inserted.second);
num_pending_tasks_++; num_pending_tasks_++;
} }
return returned_refs; return returned_refs;
} }
Status TaskManager::ResubmitTask(const TaskID &task_id, bool TaskManager::ResubmitTask(const TaskID &task_id, std::vector<ObjectID> *task_deps) {
std::vector<ObjectID> *task_deps) {
TaskSpecification spec; TaskSpecification spec;
bool resubmit = false; bool resubmit = false;
{ {
absl::MutexLock lock(&mu_); absl::MutexLock lock(&mu_);
auto it = submissible_tasks_.find(task_id); auto it = submissible_tasks_.find(task_id);
if (it == submissible_tasks_.end()) { if (it == submissible_tasks_.end()) {
return Status::Invalid("Task spec missing"); // This can happen when the task has already been
// retried up to its max attempts.
return false;
} }
if (!it->second.pending) { if (!it->second.pending) {
resubmit = true; resubmit = true;
it->second.pending = true; it->second.pending = true;
num_pending_tasks_++;
// The task is pending again, so it's no longer counted as lineage. If
// the task finishes and we still need the spec, we'll add the task back
// to the footprint sum.
total_lineage_footprint_bytes_ -= it->second.lineage_footprint_bytes;
it->second.lineage_footprint_bytes = 0;
if (it->second.num_retries_left > 0) { if (it->second.num_retries_left > 0) {
it->second.num_retries_left--; it->second.num_retries_left--;
} else { } else {
@ -140,7 +149,7 @@ Status TaskManager::ResubmitTask(const TaskID &task_id,
retry_task_callback_(spec, /*delay=*/false); retry_task_callback_(spec, /*delay=*/false);
} }
return Status::OK(); return true;
} }
void TaskManager::DrainAndShutdown(std::function<void()> shutdown) { void TaskManager::DrainAndShutdown(std::function<void()> shutdown) {
@ -273,6 +282,7 @@ void TaskManager::CompletePendingTask(const TaskID &task_id,
TaskSpecification spec; TaskSpecification spec;
bool release_lineage = true; bool release_lineage = true;
int64_t min_lineage_bytes_to_evict = 0;
{ {
absl::MutexLock lock(&mu_); absl::MutexLock lock(&mu_);
auto it = submissible_tasks_.find(task_id); auto it = submissible_tasks_.find(task_id);
@ -304,12 +314,26 @@ void TaskManager::CompletePendingTask(const TaskID &task_id,
if (task_retryable) { if (task_retryable) {
// Pin the task spec if it may be retried again. // Pin the task spec if it may be retried again.
release_lineage = false; release_lineage = false;
it->second.lineage_footprint_bytes = it->second.spec.GetMessage().ByteSizeLong();
total_lineage_footprint_bytes_ += it->second.lineage_footprint_bytes;
if (total_lineage_footprint_bytes_ > max_lineage_bytes_) {
RAY_LOG(INFO) << "Total lineage size is " << total_lineage_footprint_bytes_ / 1e6
<< "MB, which exceeds the limit of " << max_lineage_bytes_ / 1e6
<< "MB";
min_lineage_bytes_to_evict =
total_lineage_footprint_bytes_ - (max_lineage_bytes_ / 2);
}
} else { } else {
submissible_tasks_.erase(it); submissible_tasks_.erase(it);
} }
} }
RemoveFinishedTaskReferences(spec, release_lineage, worker_addr, reply.borrowed_refs()); RemoveFinishedTaskReferences(spec, release_lineage, worker_addr, reply.borrowed_refs());
if (min_lineage_bytes_to_evict > 0) {
// Evict at least half of the current lineage.
auto bytes_evicted = reference_counter_->EvictLineage(min_lineage_bytes_to_evict);
RAY_LOG(INFO) << "Evicted " << bytes_evicted / 1e6 << "MB of task lineage.";
}
ShutdownIfNeeded(); ShutdownIfNeeded();
} }
@ -460,14 +484,16 @@ void TaskManager::RemoveFinishedTaskReferences(
in_memory_store_->Delete(deleted); in_memory_store_->Delete(deleted);
} }
void TaskManager::RemoveLineageReference(const ObjectID &object_id, int64_t TaskManager::RemoveLineageReference(const ObjectID &object_id,
std::vector<ObjectID> *released_objects) { std::vector<ObjectID> *released_objects) {
absl::MutexLock lock(&mu_); absl::MutexLock lock(&mu_);
const int64_t total_lineage_footprint_bytes_prev(total_lineage_footprint_bytes_);
const TaskID &task_id = object_id.TaskId(); const TaskID &task_id = object_id.TaskId();
auto it = submissible_tasks_.find(task_id); auto it = submissible_tasks_.find(task_id);
if (it == submissible_tasks_.end()) { if (it == submissible_tasks_.end()) {
RAY_LOG(DEBUG) << "No lineage for object " << object_id; RAY_LOG(DEBUG) << "No lineage for object " << object_id;
return; return 0;
} }
RAY_LOG(DEBUG) << "Plasma object " << object_id << " out of scope"; RAY_LOG(DEBUG) << "Plasma object " << object_id << " out of scope";
@ -493,10 +519,13 @@ void TaskManager::RemoveLineageReference(const ObjectID &object_id,
} }
} }
total_lineage_footprint_bytes_ -= it->second.lineage_footprint_bytes;
// The task has finished and none of the return IDs are in scope anymore, // The task has finished and none of the return IDs are in scope anymore,
// so it is safe to remove the task spec. // so it is safe to remove the task spec.
submissible_tasks_.erase(it); submissible_tasks_.erase(it);
} }
return total_lineage_footprint_bytes_ - total_lineage_footprint_bytes_prev;
} }
bool TaskManager::MarkTaskCanceled(const TaskID &task_id) { bool TaskManager::MarkTaskCanceled(const TaskID &task_id) {

View file

@ -55,8 +55,7 @@ class TaskFinisherInterface {
class TaskResubmissionInterface { class TaskResubmissionInterface {
public: public:
virtual Status ResubmitTask(const TaskID &task_id, virtual bool ResubmitTask(const TaskID &task_id, std::vector<ObjectID> *task_deps) = 0;
std::vector<ObjectID> *task_deps) = 0;
virtual ~TaskResubmissionInterface() {} virtual ~TaskResubmissionInterface() {}
}; };
@ -77,17 +76,18 @@ class TaskManager : public TaskFinisherInterface, public TaskResubmissionInterfa
RetryTaskCallback retry_task_callback, RetryTaskCallback retry_task_callback,
const std::function<bool(const NodeID &node_id)> &check_node_alive, const std::function<bool(const NodeID &node_id)> &check_node_alive,
ReconstructObjectCallback reconstruct_object_callback, ReconstructObjectCallback reconstruct_object_callback,
PushErrorCallback push_error_callback) PushErrorCallback push_error_callback, int64_t max_lineage_bytes)
: in_memory_store_(in_memory_store), : in_memory_store_(in_memory_store),
reference_counter_(reference_counter), reference_counter_(reference_counter),
put_in_local_plasma_callback_(put_in_local_plasma_callback), put_in_local_plasma_callback_(put_in_local_plasma_callback),
retry_task_callback_(retry_task_callback), retry_task_callback_(retry_task_callback),
check_node_alive_(check_node_alive), check_node_alive_(check_node_alive),
reconstruct_object_callback_(reconstruct_object_callback), reconstruct_object_callback_(reconstruct_object_callback),
push_error_callback_(push_error_callback) { push_error_callback_(push_error_callback),
max_lineage_bytes_(max_lineage_bytes) {
reference_counter_->SetReleaseLineageCallback( reference_counter_->SetReleaseLineageCallback(
[this](const ObjectID &object_id, std::vector<ObjectID> *ids_to_release) { [this](const ObjectID &object_id, std::vector<ObjectID> *ids_to_release) {
RemoveLineageReference(object_id, ids_to_release); return RemoveLineageReference(object_id, ids_to_release);
ShutdownIfNeeded(); ShutdownIfNeeded();
}); });
} }
@ -115,7 +115,7 @@ class TaskManager : public TaskFinisherInterface, public TaskResubmissionInterfa
/// not already pending and was successfully resubmitted. /// not already pending and was successfully resubmitted.
/// \return OK if the task was successfully resubmitted or was /// \return OK if the task was successfully resubmitted or was
/// already pending, Invalid if the task spec is no longer present. /// already pending, Invalid if the task spec is no longer present.
Status ResubmitTask(const TaskID &task_id, std::vector<ObjectID> *task_deps) override; bool ResubmitTask(const TaskID &task_id, std::vector<ObjectID> *task_deps) override;
/// Wait for all pending tasks to finish, and then shutdown. /// Wait for all pending tasks to finish, and then shutdown.
/// ///
@ -201,6 +201,11 @@ class TaskManager : public TaskFinisherInterface, public TaskResubmissionInterfa
/// Return the number of pending tasks. /// Return the number of pending tasks.
size_t NumPendingTasks() const; size_t NumPendingTasks() const;
int64_t TotalLineageFootprintBytes() const {
absl::MutexLock lock(&mu_);
return total_lineage_footprint_bytes_;
}
private: private:
struct TaskEntry { struct TaskEntry {
TaskEntry(const TaskSpecification &spec_arg, int num_retries_left_arg, TaskEntry(const TaskSpecification &spec_arg, int num_retries_left_arg,
@ -243,12 +248,23 @@ class TaskManager : public TaskFinisherInterface, public TaskResubmissionInterfa
// pending tasks and tasks that finished execution but that may be // pending tasks and tasks that finished execution but that may be
// retried in the future. // retried in the future.
absl::flat_hash_set<ObjectID> reconstructable_return_ids; absl::flat_hash_set<ObjectID> reconstructable_return_ids;
// The size of this (serialized) task spec in bytes, if the task spec is
// not pending, i.e. it is being pinned because it's in another object's
// lineage. We cache this because the task spec protobuf can mutate
// out-of-band.
int64_t lineage_footprint_bytes = 0;
}; };
/// Remove a lineage reference to this object ID. This should be called /// Remove a lineage reference to this object ID. This should be called
/// whenever a task that depended on this object ID can no longer be retried. /// whenever a task that depended on this object ID can no longer be retried.
void RemoveLineageReference(const ObjectID &object_id, ///
std::vector<ObjectID> *ids_to_release) LOCKS_EXCLUDED(mu_); /// \param[in] object_id The object ID whose lineage to delete.
/// \param[out] ids_to_release If a task was deleted, then these are the
/// task's arguments whose lineage should also be released.
/// \param[out] The amount of lineage in bytes that was removed.
int64_t RemoveLineageReference(const ObjectID &object_id,
std::vector<ObjectID> *ids_to_release)
LOCKS_EXCLUDED(mu_);
/// Helper function to call RemoveSubmittedTaskReferences on the remaining /// Helper function to call RemoveSubmittedTaskReferences on the remaining
/// dependencies of the given task spec after the task has finished or /// dependencies of the given task spec after the task has finished or
@ -292,6 +308,8 @@ class TaskManager : public TaskFinisherInterface, public TaskResubmissionInterfa
// Called to push an error to the relevant driver. // Called to push an error to the relevant driver.
const PushErrorCallback push_error_callback_; const PushErrorCallback push_error_callback_;
const int64_t max_lineage_bytes_;
// The number of task failures we have logged total. // The number of task failures we have logged total.
int64_t num_failure_logs_ GUARDED_BY(mu_) = 0; int64_t num_failure_logs_ GUARDED_BY(mu_) = 0;
@ -312,8 +330,12 @@ class TaskManager : public TaskFinisherInterface, public TaskResubmissionInterfa
/// execution. /// execution.
size_t num_pending_tasks_ = 0; size_t num_pending_tasks_ = 0;
int64_t total_lineage_footprint_bytes_ GUARDED_BY(mu_) = 0;
/// Optional shutdown hook to call when pending tasks all finish. /// Optional shutdown hook to call when pending tasks all finish.
std::function<void()> shutdown_hook_ GUARDED_BY(mu_) = nullptr; std::function<void()> shutdown_hook_ GUARDED_BY(mu_) = nullptr;
friend class TaskManagerTest;
}; };
} // namespace core } // namespace core

View file

@ -40,16 +40,16 @@ class MockTaskResubmitter : public TaskResubmissionInterface {
task_specs[task_id] = task_deps; task_specs[task_id] = task_deps;
} }
Status ResubmitTask(const TaskID &task_id, std::vector<ObjectID> *task_deps) { bool ResubmitTask(const TaskID &task_id, std::vector<ObjectID> *task_deps) {
if (task_specs.find(task_id) == task_specs.end()) { if (task_specs.find(task_id) == task_specs.end()) {
return Status::Invalid(""); return false;
} }
for (const auto &dep : task_specs[task_id]) { for (const auto &dep : task_specs[task_id]) {
task_deps->push_back(dep); task_deps->push_back(dep);
} }
num_tasks_resubmitted++; num_tasks_resubmitted++;
return Status::OK(); return true;
} }
std::unordered_map<TaskID, std::vector<ObjectID>> task_specs; std::unordered_map<TaskID, std::vector<ObjectID>> task_specs;
@ -139,7 +139,10 @@ class ObjectRecoveryManagerTestBase : public ::testing::Test {
auto data = auto data =
RayObject(nullptr, meta_buffer, std::vector<rpc::ObjectReference>()); RayObject(nullptr, meta_buffer, std::vector<rpc::ObjectReference>());
RAY_CHECK(memory_store_->Put(data, object_id)); RAY_CHECK(memory_store_->Put(data, object_id));
}) {} }) {
ref_counter_->SetReleaseLineageCallback(
[](const ObjectID &, std::vector<ObjectID> *args) { return 0; });
}
NodeID local_raylet_id_; NodeID local_raylet_id_;
std::unordered_map<ObjectID, rpc::ErrorType> failed_reconstructions_; std::unordered_map<ObjectID, rpc::ErrorType> failed_reconstructions_;
@ -285,7 +288,7 @@ TEST_F(ObjectRecoveryManagerTest, TestReconstructionFails) {
ASSERT_TRUE(object_directory_->Flush() == 1); ASSERT_TRUE(object_directory_->Flush() == 1);
ASSERT_TRUE(failed_reconstructions_[object_id] == ASSERT_TRUE(failed_reconstructions_[object_id] ==
rpc::ErrorType::OBJECT_UNRECONSTRUCTABLE); rpc::ErrorType::OBJECT_UNRECONSTRUCTABLE_MAX_ATTEMPTS_EXCEEDED);
ASSERT_EQ(task_resubmitter_->num_tasks_resubmitted, 0); ASSERT_EQ(task_resubmitter_->num_tasks_resubmitted, 0);
} }
@ -302,11 +305,24 @@ TEST_F(ObjectRecoveryManagerTest, TestDependencyReconstructionFails) {
ASSERT_EQ(object_directory_->Flush(), 1); ASSERT_EQ(object_directory_->Flush(), 1);
// Trigger callback for dep ID. // Trigger callback for dep ID.
ASSERT_EQ(object_directory_->Flush(), 1); ASSERT_EQ(object_directory_->Flush(), 1);
ASSERT_EQ(failed_reconstructions_[dep_id], rpc::ErrorType::OBJECT_UNRECONSTRUCTABLE); ASSERT_EQ(failed_reconstructions_[dep_id],
rpc::ErrorType::OBJECT_UNRECONSTRUCTABLE_MAX_ATTEMPTS_EXCEEDED);
ASSERT_EQ(failed_reconstructions_.count(object_id), 0); ASSERT_EQ(failed_reconstructions_.count(object_id), 0);
ASSERT_EQ(task_resubmitter_->num_tasks_resubmitted, 1); ASSERT_EQ(task_resubmitter_->num_tasks_resubmitted, 1);
} }
TEST_F(ObjectRecoveryManagerTest, TestLineageEvicted) {
ObjectID object_id = ObjectID::FromRandom();
ref_counter_->AddOwnedObject(object_id, {}, rpc::Address(), "", 0, true);
ref_counter_->AddLocalReference(object_id, "");
ref_counter_->EvictLineage(1);
ASSERT_TRUE(manager_.RecoverObject(object_id));
ASSERT_EQ(object_directory_->Flush(), 1);
ASSERT_EQ(failed_reconstructions_[object_id],
rpc::ErrorType::OBJECT_UNRECONSTRUCTABLE_LINEAGE_EVICTED);
}
} // namespace core } // namespace core
} // namespace ray } // namespace ray

View file

@ -39,7 +39,8 @@ TaskSpecification CreateTaskHelper(uint64_t num_returns,
class TaskManagerTest : public ::testing::Test { class TaskManagerTest : public ::testing::Test {
public: public:
TaskManagerTest(bool lineage_pinning_enabled = false) TaskManagerTest(bool lineage_pinning_enabled = false,
int64_t max_lineage_bytes = 1024 * 1024 * 1024)
: store_(std::shared_ptr<CoreWorkerMemoryStore>(new CoreWorkerMemoryStore())), : store_(std::shared_ptr<CoreWorkerMemoryStore>(new CoreWorkerMemoryStore())),
publisher_(std::make_shared<mock_pubsub::MockPublisher>()), publisher_(std::make_shared<mock_pubsub::MockPublisher>()),
subscriber_(std::make_shared<mock_pubsub::MockSubscriber>()), subscriber_(std::make_shared<mock_pubsub::MockSubscriber>()),
@ -61,7 +62,17 @@ class TaskManagerTest : public ::testing::Test {
}, },
[](const JobID &job_id, const std::string &type, [](const JobID &job_id, const std::string &type,
const std::string &error_message, const std::string &error_message,
double timestamp) { return Status::OK(); }) {} double timestamp) { return Status::OK(); },
max_lineage_bytes) {}
virtual void TearDown() { AssertNoLeaks(); }
void AssertNoLeaks() {
absl::MutexLock lock(&manager_.mu_);
ASSERT_EQ(manager_.submissible_tasks_.size(), 0);
ASSERT_EQ(manager_.num_pending_tasks_, 0);
ASSERT_EQ(manager_.total_lineage_footprint_bytes_, 0);
}
std::shared_ptr<CoreWorkerMemoryStore> store_; std::shared_ptr<CoreWorkerMemoryStore> store_;
std::shared_ptr<mock_pubsub::MockPublisher> publisher_; std::shared_ptr<mock_pubsub::MockPublisher> publisher_;
@ -76,7 +87,7 @@ class TaskManagerTest : public ::testing::Test {
class TaskManagerLineageTest : public TaskManagerTest { class TaskManagerLineageTest : public TaskManagerTest {
public: public:
TaskManagerLineageTest() : TaskManagerTest(true) {} TaskManagerLineageTest() : TaskManagerTest(true, /*max_lineage_bytes=*/10000) {}
}; };
TEST_F(TaskManagerTest, TestTaskSuccess) { TEST_F(TaskManagerTest, TestTaskSuccess) {
@ -485,13 +496,13 @@ TEST_F(TaskManagerLineageTest, TestResubmitTask) {
// Cannot resubmit a task whose spec we do not have. // Cannot resubmit a task whose spec we do not have.
std::vector<ObjectID> resubmitted_task_deps; std::vector<ObjectID> resubmitted_task_deps;
ASSERT_FALSE(manager_.ResubmitTask(spec.TaskId(), &resubmitted_task_deps).ok()); ASSERT_FALSE(manager_.ResubmitTask(spec.TaskId(), &resubmitted_task_deps));
ASSERT_TRUE(resubmitted_task_deps.empty()); ASSERT_TRUE(resubmitted_task_deps.empty());
ASSERT_EQ(num_retries_, 0); ASSERT_EQ(num_retries_, 0);
manager_.AddPendingTask(caller_address, spec, "", num_retries); manager_.AddPendingTask(caller_address, spec, "", num_retries);
// A task that is already pending does not get resubmitted. // A task that is already pending does not get resubmitted.
ASSERT_TRUE(manager_.ResubmitTask(spec.TaskId(), &resubmitted_task_deps).ok()); ASSERT_TRUE(manager_.ResubmitTask(spec.TaskId(), &resubmitted_task_deps));
ASSERT_TRUE(resubmitted_task_deps.empty()); ASSERT_TRUE(resubmitted_task_deps.empty());
ASSERT_EQ(num_retries_, 0); ASSERT_EQ(num_retries_, 0);
@ -508,7 +519,7 @@ TEST_F(TaskManagerLineageTest, TestResubmitTask) {
// The task finished, its return ID is still in scope, and the return object // The task finished, its return ID is still in scope, and the return object
// was stored in plasma. It is okay to resubmit it now. // was stored in plasma. It is okay to resubmit it now.
ASSERT_TRUE(manager_.ResubmitTask(spec.TaskId(), &resubmitted_task_deps).ok()); ASSERT_TRUE(manager_.ResubmitTask(spec.TaskId(), &resubmitted_task_deps));
ASSERT_EQ(resubmitted_task_deps, spec.GetDependencyIds()); ASSERT_EQ(resubmitted_task_deps, spec.GetDependencyIds());
ASSERT_EQ(num_retries_, 1); ASSERT_EQ(num_retries_, 1);
resubmitted_task_deps.clear(); resubmitted_task_deps.clear();
@ -518,7 +529,7 @@ TEST_F(TaskManagerLineageTest, TestResubmitTask) {
// The task is still pending execution. // The task is still pending execution.
ASSERT_TRUE(manager_.IsTaskPending(spec.TaskId())); ASSERT_TRUE(manager_.IsTaskPending(spec.TaskId()));
// A task that is already pending does not get resubmitted. // A task that is already pending does not get resubmitted.
ASSERT_TRUE(manager_.ResubmitTask(spec.TaskId(), &resubmitted_task_deps).ok()); ASSERT_TRUE(manager_.ResubmitTask(spec.TaskId(), &resubmitted_task_deps));
ASSERT_TRUE(resubmitted_task_deps.empty()); ASSERT_TRUE(resubmitted_task_deps.empty());
ASSERT_EQ(num_retries_, 1); ASSERT_EQ(num_retries_, 1);
@ -526,7 +537,7 @@ TEST_F(TaskManagerLineageTest, TestResubmitTask) {
manager_.CompletePendingTask(spec.TaskId(), reply, rpc::Address()); manager_.CompletePendingTask(spec.TaskId(), reply, rpc::Address());
ASSERT_FALSE(manager_.IsTaskPending(spec.TaskId())); ASSERT_FALSE(manager_.IsTaskPending(spec.TaskId()));
// The task cannot be resubmitted because its spec has been released. // The task cannot be resubmitted because its spec has been released.
ASSERT_FALSE(manager_.ResubmitTask(spec.TaskId(), &resubmitted_task_deps).ok()); ASSERT_FALSE(manager_.ResubmitTask(spec.TaskId(), &resubmitted_task_deps));
ASSERT_TRUE(resubmitted_task_deps.empty()); ASSERT_TRUE(resubmitted_task_deps.empty());
ASSERT_EQ(num_retries_, 1); ASSERT_EQ(num_retries_, 1);
} }
@ -540,8 +551,7 @@ TEST_F(TaskManagerLineageTest, TestResubmittedTaskNondeterministicReturns) {
auto spec = CreateTaskHelper(2, {}); auto spec = CreateTaskHelper(2, {});
auto return_id1 = spec.ReturnId(0); auto return_id1 = spec.ReturnId(0);
auto return_id2 = spec.ReturnId(1); auto return_id2 = spec.ReturnId(1);
int num_retries = 3; manager_.AddPendingTask(caller_address, spec, "", /*num_retries=*/1);
manager_.AddPendingTask(caller_address, spec, "", num_retries);
// The task completes. Both return objects are stored in plasma. // The task completes. Both return objects are stored in plasma.
{ {
@ -564,7 +574,7 @@ TEST_F(TaskManagerLineageTest, TestResubmittedTaskNondeterministicReturns) {
// was stored in plasma. It is okay to resubmit it now. // was stored in plasma. It is okay to resubmit it now.
ASSERT_TRUE(stored_in_plasma.empty()); ASSERT_TRUE(stored_in_plasma.empty());
std::vector<ObjectID> resubmitted_task_deps; std::vector<ObjectID> resubmitted_task_deps;
ASSERT_TRUE(manager_.ResubmitTask(spec.TaskId(), &resubmitted_task_deps).ok()); ASSERT_TRUE(manager_.ResubmitTask(spec.TaskId(), &resubmitted_task_deps));
ASSERT_EQ(num_retries_, 1); ASSERT_EQ(num_retries_, 1);
// The re-executed task completes again. One of the return objects is now // The re-executed task completes again. One of the return objects is now

View file

@ -109,12 +109,10 @@ enum ErrorType {
// Indicates that a task failed because the actor died unexpectedly before // Indicates that a task failed because the actor died unexpectedly before
// finishing it. // finishing it.
ACTOR_DIED = 1; ACTOR_DIED = 1;
// This object was lost from distributed memory to a node failure or system // This object was lost from distributed memory due to a node failure or
// error. We use this error when lineage reconstruction is enabled, but the // system error. We use this error when lineage reconstruction is enabled,
// object cannot be reconstructed but the lineage is not available.. // but this object is not reconstructable (created by actor, ray.put, or a
// TODO(swang): We may want to break down this error type further, e.g., // borowed ObjectRef).
// object's lineage was evicted or object depended on an actor task that
// can't be reconstructed.
OBJECT_UNRECONSTRUCTABLE = 2; OBJECT_UNRECONSTRUCTABLE = 2;
// Indicates that a task failed due to user code failure. // Indicates that a task failed due to user code failure.
TASK_EXECUTION_EXCEPTION = 3; TASK_EXECUTION_EXCEPTION = 3;
@ -140,6 +138,12 @@ enum ErrorType {
OBJECT_DELETED = 10; OBJECT_DELETED = 10;
// Indicates there is some error when resolving the dependence // Indicates there is some error when resolving the dependence
DEPENDENCY_RESOLUTION_FAILED = 11; DEPENDENCY_RESOLUTION_FAILED = 11;
// The object is reconstructable but we have already exceeded its maximum
// number of task retries.
OBJECT_UNRECONSTRUCTABLE_MAX_ATTEMPTS_EXCEEDED = 12;
// The object is reconstructable, but its lineage was evicted due to memory
// pressure.
OBJECT_UNRECONSTRUCTABLE_LINEAGE_EVICTED = 13;
} }
/// The task exception encapsulates all information about task /// The task exception encapsulates all information about task