[xray] Implement faster flush policy for lineage cache (#3071)

* Policy that flushes the lineage stash immediately

* Fix bug where remote tasks in uncommitted lineage weren't getting subscribed to, add reg test

* test

* Fix bug where waiting task was getting subscribed

* Cleanup

* Update src/ray/raylet/lineage_cache.cc

Co-Authored-By: stephanie-wang <swang@cs.berkeley.edu>

* Update src/ray/raylet/lineage_cache.cc

Co-Authored-By: stephanie-wang <swang@cs.berkeley.edu>

* cleanup

* cleanup

* Add another test for task with many parents

* fix, unsubscribe to new waiting tasks

* Unsubscribe as soon as the commit notification is handled
This commit is contained in:
Stephanie Wang 2018-10-30 09:59:50 -07:00 committed by GitHub
parent a221f55b0d
commit aacbd007a0
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 294 additions and 369 deletions

View file

@ -139,43 +139,30 @@ LineageCache::LineageCache(const ClientID &client_id,
gcs::TableInterface<TaskID, protocol::Task> &task_storage,
gcs::PubsubInterface<TaskID> &task_pubsub,
uint64_t max_lineage_size)
: client_id_(client_id),
task_storage_(task_storage),
task_pubsub_(task_pubsub),
max_lineage_size_(max_lineage_size) {}
: client_id_(client_id), task_storage_(task_storage), task_pubsub_(task_pubsub) {}
/// A helper function to merge one lineage into another, in DFS order.
///
/// \param task_id The current entry to merge from lineage_from into
/// lineage_to.
/// \param lineage_from The lineage to merge entries from. This lineage is
/// traversed by following each entry's parent pointers in DFS order,
/// until an entry is not found or the stopping condition is reached.
/// \param lineage_to The lineage to merge entries into.
/// \param stopping_condition A stopping condition for the DFS over
/// lineage_from. This should return true if the merge should stop.
void MergeLineageHelper(const TaskID &task_id, const Lineage &lineage_from,
Lineage &lineage_to,
std::function<bool(const LineageEntry &)> stopping_condition) {
/// A helper function to add some uncommitted lineage to the local cache.
void LineageCache::AddUncommittedLineage(const TaskID &task_id,
const Lineage &uncommitted_lineage,
std::unordered_set<TaskID> &subscribe_tasks) {
// If the entry is not found in the lineage to merge, then we stop since
// there is nothing to copy into the merged lineage.
auto entry = lineage_from.GetEntry(task_id);
auto entry = uncommitted_lineage.GetEntry(task_id);
if (!entry) {
return;
}
// Check whether we should stop at this entry in the DFS.
if (stopping_condition(entry.get())) {
return;
}
RAY_CHECK(entry->GetStatus() == GcsStatus::UNCOMMITTED_REMOTE);
// Insert a copy of the entry into lineage_to.
// Insert a copy of the entry into our cache.
const auto &parent_ids = entry->GetParentTaskIds();
// If the insert is successful, then continue the DFS. The insert will fail
// if the new entry has an equal or lower GCS status than the current entry
// in lineage_to. This also prevents us from traversing the same node twice.
if (lineage_to.SetEntry(entry->TaskData(), entry->GetStatus())) {
// in our cache. This also prevents us from traversing the same node twice.
if (lineage_.SetEntry(entry->TaskData(), entry->GetStatus())) {
subscribe_tasks.insert(task_id);
for (const auto &parent_id : parent_ids) {
MergeLineageHelper(parent_id, lineage_from, lineage_to, stopping_condition);
children_[parent_id].insert(task_id);
AddUncommittedLineage(parent_id, uncommitted_lineage, subscribe_tasks);
}
}
}
@ -183,31 +170,39 @@ void MergeLineageHelper(const TaskID &task_id, const Lineage &lineage_from,
bool LineageCache::AddWaitingTask(const Task &task, const Lineage &uncommitted_lineage) {
auto task_id = task.GetTaskSpecification().TaskId();
RAY_LOG(DEBUG) << "add waiting task " << task_id << " on " << client_id_;
// Merge the uncommitted lineage into the lineage cache.
MergeLineageHelper(task_id, uncommitted_lineage, lineage_,
[](const LineageEntry &entry) {
if (entry.GetStatus() != GcsStatus::NONE) {
// We received the uncommitted lineage from a remote node, so
// make sure that all entries in the lineage to merge have
// status UNCOMMITTED_REMOTE.
RAY_CHECK(entry.GetStatus() == GcsStatus::UNCOMMITTED_REMOTE);
}
// The only stopping condition is that an entry is not found.
return false;
});
auto entry = lineage_.GetEntry(task_id);
if (entry) {
if (entry->GetStatus() == GcsStatus::UNCOMMITTED_REMOTE) {
// The task was previously remote, so we may have been subscribed to it.
// Unsubscribe since we are now responsible for committing the task.
UnsubscribeTask(task_id);
}
}
// Merge the uncommitted lineage into the lineage cache. Collect the IDs of
// tasks that we should subscribe to. These are all of the tasks that were
// included in the uncommitted lineage that we did not already have in our
// stash.
std::unordered_set<TaskID> subscribe_tasks;
AddUncommittedLineage(task_id, uncommitted_lineage, subscribe_tasks);
// Add the submitted task to the lineage cache as UNCOMMITTED_WAITING. It
// should be marked as UNCOMMITTED_READY once the task starts execution.
return lineage_.SetEntry(task, GcsStatus::UNCOMMITTED_WAITING);
auto added = lineage_.SetEntry(task, GcsStatus::UNCOMMITTED_WAITING);
// Do not subscribe to the waiting task itself. We just added it as
// UNCOMMITTED_WAITING, so the task is local.
subscribe_tasks.erase(task_id);
// Unsubscribe to the waiting task since we may have previously been
// subscribed to it.
UnsubscribeTask(task_id);
// Subscribe to all other tasks that were included in the uncommitted lineage
// and that were not already in the local stash. These tasks haven't been
// committed yet and will be committed by a different node, so we will not
// evict them until a notification for their commit is received.
for (const auto &task_id : subscribe_tasks) {
RAY_CHECK(SubscribeTask(task_id));
}
// For every task that the waiting task depends on, record the fact that the
// waiting task depends on it.
auto entry = lineage_.GetEntry(task_id);
for (const auto &parent_id : entry->GetParentTaskIds()) {
children_[parent_id].insert(task_id);
}
return added;
}
bool LineageCache::AddReadyTask(const Task &task) {
@ -217,12 +212,7 @@ bool LineageCache::AddReadyTask(const Task &task) {
// Set the task to READY.
if (lineage_.SetEntry(task, GcsStatus::UNCOMMITTED_READY)) {
// Attempt to flush the task.
bool flushed = FlushTask(task_id);
if (!flushed) {
// If we fail to flush the task here, due to uncommitted parents, then add
// the task to a cache to be flushed in the future.
uncommitted_ready_tasks_.insert(task_id);
}
FlushTask(task_id);
return true;
} else {
// The task was already ready to be committed (UNCOMMITTED_READY) or
@ -231,28 +221,6 @@ bool LineageCache::AddReadyTask(const Task &task) {
}
}
uint64_t LineageCache::CountUnsubscribedLineage(const TaskID &task_id,
std::unordered_set<TaskID> &seen) const {
if (seen.count(task_id) == 1) {
return 0;
}
seen.insert(task_id);
if (subscribed_tasks_.count(task_id) == 1) {
return 0;
}
auto entry = lineage_.GetEntry(task_id);
// Only count tasks that are remote. Tasks that are local will be evicted
// once they are committed in the GCS, along with their lineage.
if (!entry || entry->GetStatus() != GcsStatus::UNCOMMITTED_REMOTE) {
return 0;
}
uint64_t cnt = 1;
for (const auto &parent_id : entry->GetParentTaskIds()) {
cnt += CountUnsubscribedLineage(parent_id, seen);
}
return cnt;
}
bool LineageCache::RemoveWaitingTask(const TaskID &task_id) {
RAY_LOG(DEBUG) << "remove waiting task " << task_id << " on " << client_id_;
auto entry = lineage_.GetEntryMutable(task_id);
@ -274,36 +242,9 @@ bool LineageCache::RemoveWaitingTask(const TaskID &task_id) {
// completely in case another task is submitted locally that depends on this
// one.
entry->ResetStatus(GcsStatus::UNCOMMITTED_REMOTE);
// Subscribe to the task if necessary. We do this if it has any local
// children that must be written to the GCS, or if its uncommitted remote
// lineage is too large.
if (uncommitted_ready_children_.find(task_id) != uncommitted_ready_children_.end()) {
// Subscribe to the task if it has any children in UNCOMMITTED_READY. We
// will attempt to flush its children once we receive a notification for
// this task's commit. Since this task was in state WAITING, check that we
// were not already subscribed to the task.
RAY_CHECK(SubscribeTask(task_id));
} else {
// Check if the uncommitted remote lineage is too large. Request a
// notification for every max_lineage_size_ tasks, so that the task and its
// uncommitted lineage can be evicted once the commit notification is
// received. By doing this, we make sure that the unevicted lineage won't
// be more than max_lineage_size_, and the number of subscribed tasks won't
// be more than N / max_lineage_size_, where N is the size of the task
// chain.
// NOTE(swang): The number of entries in the uncommitted lineage also
// includes local tasks that haven't been committed yet, not just remote
// tasks, so this is an overestimate.
std::unordered_set<TaskID> seen;
auto count = CountUnsubscribedLineage(task_id, seen);
if (count >= max_lineage_size_) {
// Since this task was in state WAITING, check that we were not
// already subscribed to the task.
RAY_CHECK(SubscribeTask(task_id));
}
}
// The task was successfully reset to UNCOMMITTED_REMOTE.
// The task is now remote, so subscribe to the task to make sure that we'll
// eventually clean it up.
RAY_CHECK(SubscribeTask(task_id));
return true;
}
@ -312,17 +253,37 @@ void LineageCache::MarkTaskAsForwarded(const TaskID &task_id, const ClientID &no
lineage_.GetEntryMutable(task_id)->MarkExplicitlyForwarded(node_id);
}
/// A helper function to get the uncommitted lineage of a task.
void GetUncommittedLineageHelper(const TaskID &task_id, const Lineage &lineage_from,
Lineage &lineage_to, const ClientID &node_id) {
// If the entry is not found in the lineage to merge, then we stop since
// there is nothing to copy into the merged lineage.
auto entry = lineage_from.GetEntry(task_id);
if (!entry) {
return;
}
// If this task has already been forwarded to this node, then we can stop.
if (entry->WasExplicitlyForwarded(node_id)) {
return;
}
// Insert a copy of the entry into lineage_to. If the insert is successful,
// then continue the DFS. The insert will fail if the new entry has an equal
// or lower GCS status than the current entry in lineage_to. This also
// prevents us from traversing the same node twice.
if (lineage_to.SetEntry(entry->TaskData(), entry->GetStatus())) {
for (const auto &parent_id : entry->GetParentTaskIds()) {
GetUncommittedLineageHelper(parent_id, lineage_from, lineage_to, node_id);
}
}
}
Lineage LineageCache::GetUncommittedLineage(const TaskID &task_id,
const ClientID &node_id) const {
Lineage uncommitted_lineage;
// Add all uncommitted ancestors from the lineage cache to the uncommitted
// lineage of the requested task.
MergeLineageHelper(
task_id, lineage_, uncommitted_lineage, [&](const LineageEntry &entry) {
// The stopping condition for recursion is that the entry has
// been committed to the GCS or has already been forwarded.
return entry.WasExplicitlyForwarded(node_id);
});
GetUncommittedLineageHelper(task_id, lineage_, uncommitted_lineage, node_id);
// The lineage always includes the requested task id, so add the task if it
// wasn't already added. The requested task may not have been added if it was
// already explicitly forwarded to this node before.
@ -334,72 +295,29 @@ Lineage LineageCache::GetUncommittedLineage(const TaskID &task_id,
return uncommitted_lineage;
}
bool LineageCache::FlushTask(const TaskID &task_id) {
auto entry = lineage_.GetEntry(task_id);
void LineageCache::FlushTask(const TaskID &task_id) {
auto entry = lineage_.GetEntryMutable(task_id);
RAY_CHECK(entry);
RAY_CHECK(entry->GetStatus() == GcsStatus::UNCOMMITTED_READY);
// Check if all arguments have been committed to the GCS before writing
// this task.
bool all_arguments_committed = true;
for (const auto &parent_id : entry->GetParentTaskIds()) {
auto parent = lineage_.GetEntry(parent_id);
// If a parent entry exists in the lineage cache but has not been
// committed yet, then as far as we know, it's still in flight to the
// GCS. Skip this task for now.
if (parent) {
// Request notifications about the parent entry's commit in the GCS if
// the parent is remote. Otherwise, the parent is local and will
// eventually be flushed. In either case, once we receive a
// notification about the task's commit via HandleEntryCommitted, then
// this task will be ready to write on the next call to Flush().
if (parent->GetStatus() == GcsStatus::UNCOMMITTED_REMOTE) {
SubscribeTask(parent_id);
}
all_arguments_committed = false;
// Track the fact that this task is dependent on a parent that hasn't yet
// been committed, for fast lookup. Once all parents are committed, the
// child will be flushed.
uncommitted_ready_children_[parent_id].insert(task_id);
}
}
if (all_arguments_committed) {
gcs::raylet::TaskTable::WriteCallback task_callback = [this](
ray::gcs::AsyncGcsClient *client, const TaskID &id, const protocol::TaskT &data) {
HandleEntryCommitted(id);
};
auto task = lineage_.GetEntry(task_id);
// TODO(swang): Make this better...
flatbuffers::FlatBufferBuilder fbb;
auto message = task->TaskData().ToFlatbuffer(fbb);
fbb.Finish(message);
auto task_data = std::make_shared<protocol::TaskT>();
auto root = flatbuffers::GetRoot<protocol::Task>(fbb.GetBufferPointer());
root->UnPackTo(task_data.get());
RAY_CHECK_OK(task_storage_.Add(task->TaskData().GetTaskSpecification().DriverId(),
task_id, task_data, task_callback));
gcs::raylet::TaskTable::WriteCallback task_callback = [this](
ray::gcs::AsyncGcsClient *client, const TaskID &id, const protocol::TaskT &data) {
HandleEntryCommitted(id);
};
auto task = lineage_.GetEntry(task_id);
// TODO(swang): Make this better...
flatbuffers::FlatBufferBuilder fbb;
auto message = task->TaskData().ToFlatbuffer(fbb);
fbb.Finish(message);
auto task_data = std::make_shared<protocol::TaskT>();
auto root = flatbuffers::GetRoot<protocol::Task>(fbb.GetBufferPointer());
root->UnPackTo(task_data.get());
RAY_CHECK_OK(task_storage_.Add(task->TaskData().GetTaskSpecification().DriverId(),
task_id, task_data, task_callback));
// We successfully wrote the task, so mark it as committing.
// TODO(swang): Use a batched interface and write with all object entries.
auto entry = lineage_.GetEntryMutable(task_id);
RAY_CHECK(entry);
RAY_CHECK(entry->SetStatus(GcsStatus::COMMITTING));
}
return all_arguments_committed;
}
void LineageCache::Flush() {
// Iterate through all tasks that are PLACEABLE.
for (auto it = uncommitted_ready_tasks_.begin();
it != uncommitted_ready_tasks_.end();) {
bool flushed = FlushTask(*it);
// Erase the task from the cache of uncommitted ready tasks.
if (flushed) {
it = uncommitted_ready_tasks_.erase(it);
} else {
it++;
}
}
// We successfully wrote the task, so mark it as committing.
// TODO(swang): Use a batched interface and write with all object entries.
RAY_CHECK(entry->SetStatus(GcsStatus::COMMITTING));
}
bool LineageCache::SubscribeTask(const TaskID &task_id) {
@ -429,81 +347,63 @@ bool LineageCache::UnsubscribeTask(const TaskID &task_id) {
return subscribed;
}
boost::optional<LineageEntry> LineageCache::EvictTask(const TaskID &task_id) {
RAY_LOG(DEBUG) << "evicting task " << task_id << " on " << client_id_;
auto entry = lineage_.PopEntry(task_id);
if (!entry) {
// The entry has already been evicted. Check that the entry does not have
// any dependent tasks, since we should've already attempted to flush these
// tasks on the first eviction.
RAY_CHECK(uncommitted_ready_children_.count(task_id) == 0);
// Check that we already unsubscribed from the task when handling the
// first eviction.
RAY_CHECK(subscribed_tasks_.count(task_id) == 0);
// Do nothing if the entry has already been evicted.
return entry;
void LineageCache::EvictTask(const TaskID &task_id) {
// If we haven't received a commit for this task yet, do not evict.
auto commit_it = committed_tasks_.find(task_id);
if (commit_it == committed_tasks_.end()) {
return;
}
// Stop listening for notifications about this task.
UnsubscribeTask(task_id);
// Try to flush the children of the committed task. These are the tasks that
// have a dependency on the committed task.
auto children_entry = uncommitted_ready_children_.find(task_id);
if (children_entry != uncommitted_ready_children_.end()) {
// Get the children of the committed task that are uncommitted but ready.
auto children = std::move(children_entry->second);
uncommitted_ready_children_.erase(children_entry);
// Try to flush the children. If all of the child's parents are committed,
// then the child will be flushed here.
for (const auto &child_id : children) {
bool flushed = FlushTask(child_id);
// Erase the child task from the cache of uncommitted ready tasks.
if (flushed) {
auto erased = uncommitted_ready_tasks_.erase(child_id);
RAY_CHECK(erased == 1);
}
}
}
return entry;
}
void LineageCache::EvictRemoteLineage(const TaskID &task_id) {
// If the entry has already been evicted, exit.
auto entry = lineage_.GetEntry(task_id);
if (!entry) {
return;
}
// Only evict tasks that are remote. Other tasks, and their lineage, will be
// evicted once they are committed.
if (entry->GetStatus() == GcsStatus::UNCOMMITTED_REMOTE) {
// Remove the ancestor task.
auto evicted_entry = EvictTask(task_id);
// Recurse and remove this task's ancestors.
for (const auto &parent_id : evicted_entry->GetParentTaskIds()) {
EvictRemoteLineage(parent_id);
// Only evict tasks that we were subscribed to or that we were committing.
if (!(entry->GetStatus() == GcsStatus::UNCOMMITTED_REMOTE ||
entry->GetStatus() == GcsStatus::COMMITTING)) {
return;
}
// Entries cannot be safely evicted until their parents are all evicted.
for (const auto &parent_id : entry->GetParentTaskIds()) {
if (ContainsTask(parent_id)) {
return;
}
}
// Evict the task.
RAY_LOG(DEBUG) << "evicting task " << task_id << " on " << client_id_;
lineage_.PopEntry(task_id);
committed_tasks_.erase(commit_it);
// Try to evict the children of the evict task. These are the tasks that have
// a dependency on the evicted task.
auto children_entry = children_.find(task_id);
if (children_entry != children_.end()) {
// Get the children of the evicted task.
auto children = std::move(children_entry->second);
children_.erase(children_entry);
// Try to evict the children. If all of the child's parents are evicted,
// then the child will be evicted here.
for (const auto &child_id : children) {
EvictTask(child_id);
}
}
return;
}
void LineageCache::HandleEntryCommitted(const TaskID &task_id) {
RAY_LOG(DEBUG) << "task committed: " << task_id;
auto entry = EvictTask(task_id);
auto entry = lineage_.GetEntry(task_id);
if (!entry) {
// The task has already been evicted due to a previous commit notification,
// or because one of its descendants was committed.
// The task has already been evicted due to a previous commit notification.
return;
}
// Evict the committed task's uncommitted lineage. Since local tasks are
// written in data dependency order, the uncommitted lineage should only
// include remote tasks, i.e. tasks that were committed by a different node.
// In case of reconstruction, the uncommitted lineage may also include local
// tasks that were resubmitted. These tasks are not evicted.
for (const auto &parent_id : entry->GetParentTaskIds()) {
EvictRemoteLineage(parent_id);
}
// Record the commit acknowledgement and attempt to evict the task.
committed_tasks_.insert(task_id);
EvictTask(task_id);
// We got the notification about the task's commit, so no longer need any
// more notifications.
UnsubscribeTask(task_id);
}
const Task &LineageCache::GetTask(const TaskID &task_id) const {
@ -519,6 +419,8 @@ bool LineageCache::ContainsTask(const TaskID &task_id) const {
return it != entries.end();
}
size_t LineageCache::NumEntries() const { return lineage_.GetEntries().size(); }
} // namespace raylet
} // namespace ray

View file

@ -184,7 +184,14 @@ class Lineage {
/// \class LineageCache
///
/// A cache of the task table. This consists of all tasks that this node owns,
/// as well as their lineage, that have not yet been added durably to the GCS.
/// as well as their lineage, that have not yet been added durably
/// ("committed") to the GCS.
///
/// The current policy is to flush each task as soon as it enters the
/// UNCOMMITTED_READY state. For safety, we only evict tasks if they have been
/// committed and if their parents have been all evicted. Thus, the invariant
/// is that if g depends on f, and g has been evicted, then f must have been
/// committed.
class LineageCache {
public:
/// Create a lineage cache for the given task storage system.
@ -242,15 +249,8 @@ class LineageCache {
/// includes the entry for the requested entry_id.
Lineage GetUncommittedLineage(const TaskID &task_id, const ClientID &node_id) const;
/// Asynchronously write any tasks that are in the UNCOMMITTED_READY state
/// and for which all parents have been committed to the GCS. These tasks
/// will be transitioned in this method to state COMMITTING. Once the write
/// is acknowledged, the task's state will be transitioned to state
/// COMMITTED.
void Flush();
/// Handle the commit of a task entry in the GCS. This sets the task to
/// COMMITTED and cleans up any ancestor tasks that are in the cache.
/// Handle the commit of a task entry in the GCS. This attempts to evict the
/// task if possible.
///
/// \param task_id The ID of the task entry that was committed.
void HandleEntryCommitted(const TaskID &task_id);
@ -267,35 +267,28 @@ class LineageCache {
/// \return Whether the task is in the lineage cache.
bool ContainsTask(const TaskID &task_id) const;
/// Get the number of entries in the lineage cache.
///
/// \return The number of entries in the lineage cache.
size_t NumEntries() const;
private:
/// Try to flush a task that is in UNCOMMITTED_READY state. If the task has
/// parents that are not committed yet, then the child will be flushed once
/// the parents have been committed.
bool FlushTask(const TaskID &task_id);
/// Flush a task that is in UNCOMMITTED_READY state.
void FlushTask(const TaskID &task_id);
/// Evict a single task. This should only be called if we are sure that the
/// task has been committed and will trigger an attempt to flush any of the
/// evicted task's children that are in UNCOMMITTED_READY state. Returns an
/// optional reference to the evicted task that is empty if the task was not
/// in the lineage cache.
boost::optional<LineageEntry> EvictTask(const TaskID &task_id);
/// Evict a remote task and its lineage. This should only be called if we
/// are sure that the remote task and its lineage are committed.
void EvictRemoteLineage(const TaskID &task_id);
/// task has been committed. The task will only be evicted if all of its
/// parents have also been evicted. If successful, then we will also attempt
/// to evict the task's children.
void EvictTask(const TaskID &task_id);
/// Subscribe to notifications for a task. Returns whether the operation
/// was successful (whether we were not already subscribed).
bool SubscribeTask(const TaskID &task_id);
/// Unsubscribe from notifications for a task. Returns whether the operation
/// was successful (whether we were subscribed).
bool UnsubscribeTask(const TaskID &task_id);
/// Count the size of unsubscribed and uncommitted lineage of the given task
/// excluding the values that have already been visited.
///
/// \param task_id The task whose lineage should be counted.
/// \param seen This set contains the keys of lineage entries counted so far,
/// so that we don't revisit those nodes.
/// \void The number of tasks that were counted.
uint64_t CountUnsubscribedLineage(const TaskID &task_id,
std::unordered_set<TaskID> &seen) const;
/// Add a task and its uncommitted lineage to the local stash.
void AddUncommittedLineage(const TaskID &task_id, const Lineage &uncommitted_lineage,
std::unordered_set<TaskID> &subscribe_tasks);
/// The client ID, used to request notifications for specific tasks.
/// TODO(swang): Move the ClientID into the generic Table implementation.
@ -305,23 +298,10 @@ class LineageCache {
/// The pubsub storage system for task information. This can be used to
/// request notifications for the commit of a task entry.
gcs::PubsubInterface<TaskID> &task_pubsub_;
/// The maximum size that a remote task's uncommitted lineage can get to. If
/// a remote task's uncommitted lineage exceeds this size, then a
/// notification will be requested from the pubsub storage system so that
/// the task and its lineage can be evicted from the stash.
uint64_t max_lineage_size_;
/// The set of tasks that are in UNCOMMITTED_READY state. This is a cache of
/// the tasks that may be flushable.
// TODO(swang): As an optimization, we may also want to further distinguish
// which tasks are flushable, to avoid iterating over tasks that are in
// UNCOMMITTED_READY, but that have dependencies that have not been committed
// yet.
std::unordered_set<TaskID> uncommitted_ready_tasks_;
/// A mapping from each task that hasn't been committed yet, to all dependent
/// children tasks that are in UNCOMMITTED_READY state. This is used when the
/// parent task is committed, for fast lookup of children that may now be
/// flushed.
std::unordered_map<TaskID, std::unordered_set<TaskID>> uncommitted_ready_children_;
/// The set of tasks that have been committed but not evicted.
std::unordered_set<TaskID> committed_tasks_;
/// A mapping from each task in the lineage cache to its children.
std::unordered_map<TaskID, std::unordered_set<TaskID>> children_;
/// All tasks and objects that we are responsible for writing back to the
/// GCS, and the tasks and objects in their lineage.
Lineage lineage_;

View file

@ -221,12 +221,6 @@ TEST_F(LineageCacheTest, TestMarkTaskAsForwarded) {
ASSERT_EQ(1, uncommitted_lineage_forwarded.GetEntries().size());
}
void CheckFlush(LineageCache &lineage_cache, MockGcs &mock_gcs,
size_t num_tasks_flushed) {
lineage_cache.Flush();
ASSERT_EQ(mock_gcs.TaskTable().size(), num_tasks_flushed);
}
TEST_F(LineageCacheTest, TestWritebackNoneReady) {
// Insert a chain of dependent tasks.
size_t num_tasks_flushed = 0;
@ -235,7 +229,7 @@ TEST_F(LineageCacheTest, TestWritebackNoneReady) {
// Check that when no tasks have been marked as ready, we do not flush any
// entries.
CheckFlush(lineage_cache_, mock_gcs_, num_tasks_flushed);
ASSERT_EQ(mock_gcs_.TaskTable().size(), num_tasks_flushed);
}
TEST_F(LineageCacheTest, TestWritebackReady) {
@ -247,69 +241,124 @@ TEST_F(LineageCacheTest, TestWritebackReady) {
// Check that after marking the first task as ready, we flush only that task.
ASSERT_TRUE(lineage_cache_.AddReadyTask(tasks.front()));
num_tasks_flushed++;
CheckFlush(lineage_cache_, mock_gcs_, num_tasks_flushed);
ASSERT_EQ(mock_gcs_.TaskTable().size(), num_tasks_flushed);
}
TEST_F(LineageCacheTest, TestWritebackOrder) {
// Insert a chain of dependent tasks.
size_t num_tasks_flushed = 0;
std::vector<Task> tasks;
InsertTaskChain(lineage_cache_, tasks, 3, std::vector<ObjectID>(), 1);
size_t num_tasks_flushed = tasks.size();
// Mark all tasks as ready. The first task, which has no dependencies, should
// be flushed.
// Mark all tasks as ready. All tasks should be flushed.
for (const auto &task : tasks) {
ASSERT_TRUE(lineage_cache_.AddReadyTask(task));
}
// Check that we write back the tasks in order of data dependencies.
for (size_t i = 0; i < tasks.size(); i++) {
num_tasks_flushed++;
ASSERT_EQ(mock_gcs_.TaskTable().size(), num_tasks_flushed);
// Flush acknowledgements. The next task should have been flushed.
mock_gcs_.Flush();
}
ASSERT_EQ(mock_gcs_.TaskTable().size(), num_tasks_flushed);
}
TEST_F(LineageCacheTest, TestWritebackPartiallyReady) {
// Create two independent tasks, task1 and task2, and a dependent task
// that depends on both tasks.
TEST_F(LineageCacheTest, TestEvictChain) {
// Create a chain of 3 tasks.
size_t num_tasks_flushed = 0;
auto task1 = ExampleTask({}, 1);
auto task2 = ExampleTask({}, 1);
std::vector<ObjectID> returns;
for (int64_t i = 0; i < task1.GetTaskSpecification().NumReturns(); i++) {
returns.push_back(task1.GetTaskSpecification().ReturnId(i));
std::vector<Task> tasks;
std::vector<ObjectID> arguments;
for (int i = 0; i < 3; i++) {
auto task = ExampleTask(arguments, 1);
tasks.push_back(task);
arguments = {task.GetTaskSpecification().ReturnId(0)};
}
for (int64_t i = 0; i < task2.GetTaskSpecification().NumReturns(); i++) {
returns.push_back(task2.GetTaskSpecification().ReturnId(i));
Lineage uncommitted_lineage;
for (const auto &task : tasks) {
uncommitted_lineage.SetEntry(task, GcsStatus::UNCOMMITTED_REMOTE);
}
auto dependent_task = ExampleTask(returns, 1);
// Insert all tasks as waiting for execution.
ASSERT_TRUE(lineage_cache_.AddWaitingTask(task1, Lineage()));
ASSERT_TRUE(lineage_cache_.AddWaitingTask(task2, Lineage()));
ASSERT_TRUE(lineage_cache_.AddWaitingTask(dependent_task, Lineage()));
// Flush one of the independent tasks.
ASSERT_TRUE(lineage_cache_.AddReadyTask(task1));
num_tasks_flushed++;
CheckFlush(lineage_cache_, mock_gcs_, num_tasks_flushed);
// Flush acknowledgements. The lineage cache should receive the commit for
// the first task.
mock_gcs_.Flush();
// Mark the other independent task and the dependent as ready.
ASSERT_TRUE(lineage_cache_.AddReadyTask(task2));
ASSERT_TRUE(lineage_cache_.AddReadyTask(dependent_task));
// Two tasks are ready, but only the independent task should be flushed. The
// dependent task should only be flushed once commits for both independent
// tasks are received.
num_tasks_flushed++;
CheckFlush(lineage_cache_, mock_gcs_, num_tasks_flushed);
// Flush acknowledgements. Both independent tasks should now be committed.
mock_gcs_.Flush();
// The dependent task should now be flushed.
// Mark the last task as ready to flush.
ASSERT_TRUE(lineage_cache_.AddWaitingTask(tasks.back(), uncommitted_lineage));
ASSERT_EQ(lineage_cache_.NumEntries(), tasks.size());
ASSERT_TRUE(lineage_cache_.AddReadyTask(tasks.back()));
num_tasks_flushed++;
ASSERT_EQ(mock_gcs_.TaskTable().size(), num_tasks_flushed);
// Flush acknowledgements. The lineage cache should receive the commit for
// the flushed task, but its lineage should not be evicted yet.
mock_gcs_.Flush();
ASSERT_EQ(lineage_cache_
.GetUncommittedLineage(tasks.back().GetTaskSpecification().TaskId(),
ClientID::nil())
.GetEntries()
.size(),
tasks.size());
ASSERT_EQ(lineage_cache_.NumEntries(), tasks.size());
// Simulate executing the task on a remote node and adding it to the GCS.
auto task_data = std::make_shared<protocol::TaskT>();
RAY_CHECK_OK(
mock_gcs_.RemoteAdd(tasks.at(1).GetTaskSpecification().TaskId(), task_data));
mock_gcs_.Flush();
ASSERT_EQ(lineage_cache_
.GetUncommittedLineage(tasks.back().GetTaskSpecification().TaskId(),
ClientID::nil())
.GetEntries()
.size(),
tasks.size());
ASSERT_EQ(lineage_cache_.NumEntries(), tasks.size());
// Simulate executing the task on a remote node and adding it to the GCS.
RAY_CHECK_OK(
mock_gcs_.RemoteAdd(tasks.at(0).GetTaskSpecification().TaskId(), task_data));
mock_gcs_.Flush();
ASSERT_EQ(lineage_cache_.NumEntries(), 0);
}
TEST_F(LineageCacheTest, TestEvictManyParents) {
// Create some independent tasks.
std::vector<Task> parent_tasks;
std::vector<ObjectID> arguments;
for (int i = 0; i < 10; i++) {
auto task = ExampleTask({}, 1);
parent_tasks.push_back(task);
arguments.push_back(task.GetTaskSpecification().ReturnId(0));
ASSERT_TRUE(lineage_cache_.AddWaitingTask(task, Lineage()));
}
// Create a child task that is dependent on all of the previous tasks.
auto child_task = ExampleTask(arguments, 1);
ASSERT_TRUE(lineage_cache_.AddWaitingTask(child_task, Lineage()));
// Flush the child task. Make sure that it remains in the cache, since none
// of its parents have been committed yet, and that the uncommitted lineage
// still includes all of the parent tasks.
size_t total_tasks = parent_tasks.size() + 1;
lineage_cache_.AddReadyTask(child_task);
mock_gcs_.Flush();
ASSERT_EQ(lineage_cache_.NumEntries(), total_tasks);
ASSERT_EQ(lineage_cache_
.GetUncommittedLineage(child_task.GetTaskSpecification().TaskId(),
ClientID::nil())
.GetEntries()
.size(),
total_tasks);
// Flush each parent task and check for eviction safety.
for (const auto &parent_task : parent_tasks) {
lineage_cache_.AddReadyTask(parent_task);
mock_gcs_.Flush();
total_tasks--;
if (total_tasks > 1) {
// Each task should be evicted as soon as its commit is acknowledged,
// since the parent tasks have no dependencies.
ASSERT_EQ(lineage_cache_.NumEntries(), total_tasks);
ASSERT_EQ(lineage_cache_
.GetUncommittedLineage(child_task.GetTaskSpecification().TaskId(),
ClientID::nil())
.GetEntries()
.size(),
total_tasks);
} else {
// After the last task has been committed, then the child task should
// also be evicted. The lineage cache should now be empty.
ASSERT_EQ(lineage_cache_.NumEntries(), 0);
}
}
}
TEST_F(LineageCacheTest, TestForwardTasksRoundTrip) {
@ -350,16 +399,19 @@ TEST_F(LineageCacheTest, TestForwardTask) {
auto uncommitted_lineage =
lineage_cache_.GetUncommittedLineage(task_id_to_remove, ClientID::nil());
ASSERT_TRUE(lineage_cache_.RemoveWaitingTask(task_id_to_remove));
ASSERT_EQ(lineage_cache_.NumEntries(), 3);
// Simulate executing the remaining tasks.
for (const auto &task : tasks) {
ASSERT_TRUE(lineage_cache_.AddReadyTask(task));
num_tasks_flushed++;
}
// Check that the first task, which has no dependencies can be flushed. The
// last task cannot be flushed since one of its dependencies has not been
// added by the remote node yet.
num_tasks_flushed++;
CheckFlush(lineage_cache_, mock_gcs_, num_tasks_flushed);
ASSERT_EQ(mock_gcs_.TaskTable().size(), num_tasks_flushed);
mock_gcs_.Flush();
ASSERT_EQ(lineage_cache_.NumEntries(), 2);
// Simulate executing the task on a remote node and adding it to the GCS.
auto task_data = std::make_shared<protocol::TaskT>();
@ -367,15 +419,14 @@ TEST_F(LineageCacheTest, TestForwardTask) {
mock_gcs_.RemoteAdd(forwarded_task.GetTaskSpecification().TaskId(), task_data));
// Check that the remote task is flushed.
num_tasks_flushed++;
CheckFlush(lineage_cache_, mock_gcs_, num_tasks_flushed);
ASSERT_EQ(mock_gcs_.TaskTable().size(), num_tasks_flushed);
ASSERT_EQ(mock_gcs_.SubscribedTasks().size(), 1);
// Check that once we receive the callback for the remote task, we can now
// flush the last task.
mock_gcs_.Flush();
num_tasks_flushed++;
CheckFlush(lineage_cache_, mock_gcs_, num_tasks_flushed);
ASSERT_EQ(mock_gcs_.SubscribedTasks().size(), 0);
ASSERT_EQ(lineage_cache_.NumEntries(), 0);
}
TEST_F(LineageCacheTest, TestEviction) {
@ -407,10 +458,11 @@ TEST_F(LineageCacheTest, TestEviction) {
// Check that the remote task is flushed.
num_tasks_flushed++;
mock_gcs_.Flush();
CheckFlush(lineage_cache_, mock_gcs_, num_tasks_flushed);
ASSERT_EQ(mock_gcs_.TaskTable().size(), num_tasks_flushed);
// Check that the last task in the chain still has all tasks in its
// uncommitted lineage.
ASSERT_EQ(uncommitted_lineage.GetEntries().size(), lineage_size);
ASSERT_EQ(lineage_cache_.NumEntries(), lineage_size - num_tasks_flushed);
// Simulate executing all the rest of the tasks except the last one on a
// remote node and adding them to the GCS.
@ -420,7 +472,8 @@ TEST_F(LineageCacheTest, TestEviction) {
// Check that the remote task is flushed.
num_tasks_flushed++;
mock_gcs_.Flush();
CheckFlush(lineage_cache_, mock_gcs_, num_tasks_flushed);
ASSERT_EQ(mock_gcs_.TaskTable().size(), num_tasks_flushed);
ASSERT_EQ(lineage_cache_.NumEntries(), lineage_size - num_tasks_flushed);
}
// All tasks have now been flushed. Check that enough lineage has been
// evicted that the uncommitted lineage is now less than the maximum size.
@ -445,8 +498,6 @@ TEST_F(LineageCacheTest, TestOutOfOrderEviction) {
auto task_id = task.GetTaskSpecification().TaskId();
ASSERT_TRUE(lineage_cache_.RemoveWaitingTask(task_id));
}
// Check that we requested at most 2 notifications
ASSERT_TRUE(mock_gcs_.NumRequestedNotifications() <= 2);
// Check that the last task in the chain still has all tasks in its
// uncommitted lineage.
@ -454,37 +505,29 @@ TEST_F(LineageCacheTest, TestOutOfOrderEviction) {
auto uncommitted_lineage =
lineage_cache_.GetUncommittedLineage(last_task_id, ClientID::nil());
ASSERT_EQ(uncommitted_lineage.GetEntries().size(), lineage_size);
// Simulate executing all the rest of the tasks except the last one at the
// remote node. Simulate receiving the notifications from the GCS in reverse
// order of execution.
tasks.pop_back();
auto task_data = std::make_shared<protocol::TaskT>();
auto it = tasks.rbegin();
RAY_CHECK_OK(mock_gcs_.RemoteAdd(it->GetTaskSpecification().TaskId(), task_data));
it++;
// Check that the remote task is flushed.
num_tasks_flushed++;
mock_gcs_.Flush();
CheckFlush(lineage_cache_, mock_gcs_, num_tasks_flushed);
// Check that the last task in the chain still has all tasks in its
// uncommitted lineage.
ASSERT_EQ(uncommitted_lineage.GetEntries().size(), lineage_size);
ASSERT_EQ(lineage_cache_.NumEntries(), lineage_size);
// Simulate executing the rest of the tasks on a remote node and receiving
// the notifications from the GCS in reverse order of execution.
for (; it != tasks.rend(); it++) {
auto last_task = tasks.front();
tasks.erase(tasks.begin());
for (auto it = tasks.rbegin(); it != tasks.rend(); it++) {
auto task_data = std::make_shared<protocol::TaskT>();
RAY_CHECK_OK(mock_gcs_.RemoteAdd(it->GetTaskSpecification().TaskId(), task_data));
// Check that the remote task is flushed.
num_tasks_flushed++;
mock_gcs_.Flush();
CheckFlush(lineage_cache_, mock_gcs_, num_tasks_flushed);
ASSERT_EQ(mock_gcs_.TaskTable().size(), num_tasks_flushed);
ASSERT_EQ(lineage_cache_.NumEntries(), lineage_size);
}
// All tasks have now been flushed. Check that enough lineage has been
// evicted that the uncommitted lineage is now less than the maximum size.
uncommitted_lineage =
lineage_cache_.GetUncommittedLineage(last_task_id, ClientID::nil());
ASSERT_TRUE(uncommitted_lineage.GetEntries().size() < max_lineage_size_);
// Flush the last task. The lineage should not get evicted until this task's
// commit is received.
auto task_data = std::make_shared<protocol::TaskT>();
RAY_CHECK_OK(mock_gcs_.RemoteAdd(last_task.GetTaskSpecification().TaskId(), task_data));
num_tasks_flushed++;
mock_gcs_.Flush();
ASSERT_EQ(mock_gcs_.TaskTable().size(), num_tasks_flushed);
ASSERT_EQ(lineage_cache_.NumEntries(), 0);
}
TEST_F(LineageCacheTest, TestEvictionUncommittedChildren) {
@ -524,7 +567,7 @@ TEST_F(LineageCacheTest, TestEvictionUncommittedChildren) {
// tasks are flushed, since all of their dependencies have been evicted and
// are therefore committed in the GCS.
mock_gcs_.Flush();
CheckFlush(lineage_cache_, mock_gcs_, num_tasks_flushed);
ASSERT_EQ(mock_gcs_.TaskTable().size(), num_tasks_flushed);
}
} // namespace raylet