diff --git a/src/ray/raylet/lineage_cache.cc b/src/ray/raylet/lineage_cache.cc index 917d925a7..24b7ab034 100644 --- a/src/ray/raylet/lineage_cache.cc +++ b/src/ray/raylet/lineage_cache.cc @@ -139,43 +139,30 @@ LineageCache::LineageCache(const ClientID &client_id, gcs::TableInterface &task_storage, gcs::PubsubInterface &task_pubsub, uint64_t max_lineage_size) - : client_id_(client_id), - task_storage_(task_storage), - task_pubsub_(task_pubsub), - max_lineage_size_(max_lineage_size) {} + : client_id_(client_id), task_storage_(task_storage), task_pubsub_(task_pubsub) {} -/// A helper function to merge one lineage into another, in DFS order. -/// -/// \param task_id The current entry to merge from lineage_from into -/// lineage_to. -/// \param lineage_from The lineage to merge entries from. This lineage is -/// traversed by following each entry's parent pointers in DFS order, -/// until an entry is not found or the stopping condition is reached. -/// \param lineage_to The lineage to merge entries into. -/// \param stopping_condition A stopping condition for the DFS over -/// lineage_from. This should return true if the merge should stop. -void MergeLineageHelper(const TaskID &task_id, const Lineage &lineage_from, - Lineage &lineage_to, - std::function stopping_condition) { +/// A helper function to add some uncommitted lineage to the local cache. +void LineageCache::AddUncommittedLineage(const TaskID &task_id, + const Lineage &uncommitted_lineage, + std::unordered_set &subscribe_tasks) { // If the entry is not found in the lineage to merge, then we stop since // there is nothing to copy into the merged lineage. - auto entry = lineage_from.GetEntry(task_id); + auto entry = uncommitted_lineage.GetEntry(task_id); if (!entry) { return; } - // Check whether we should stop at this entry in the DFS. - if (stopping_condition(entry.get())) { - return; - } + RAY_CHECK(entry->GetStatus() == GcsStatus::UNCOMMITTED_REMOTE); - // Insert a copy of the entry into lineage_to. + // Insert a copy of the entry into our cache. const auto &parent_ids = entry->GetParentTaskIds(); // If the insert is successful, then continue the DFS. The insert will fail // if the new entry has an equal or lower GCS status than the current entry - // in lineage_to. This also prevents us from traversing the same node twice. - if (lineage_to.SetEntry(entry->TaskData(), entry->GetStatus())) { + // in our cache. This also prevents us from traversing the same node twice. + if (lineage_.SetEntry(entry->TaskData(), entry->GetStatus())) { + subscribe_tasks.insert(task_id); for (const auto &parent_id : parent_ids) { - MergeLineageHelper(parent_id, lineage_from, lineage_to, stopping_condition); + children_[parent_id].insert(task_id); + AddUncommittedLineage(parent_id, uncommitted_lineage, subscribe_tasks); } } } @@ -183,31 +170,39 @@ void MergeLineageHelper(const TaskID &task_id, const Lineage &lineage_from, bool LineageCache::AddWaitingTask(const Task &task, const Lineage &uncommitted_lineage) { auto task_id = task.GetTaskSpecification().TaskId(); RAY_LOG(DEBUG) << "add waiting task " << task_id << " on " << client_id_; - // Merge the uncommitted lineage into the lineage cache. - MergeLineageHelper(task_id, uncommitted_lineage, lineage_, - [](const LineageEntry &entry) { - if (entry.GetStatus() != GcsStatus::NONE) { - // We received the uncommitted lineage from a remote node, so - // make sure that all entries in the lineage to merge have - // status UNCOMMITTED_REMOTE. - RAY_CHECK(entry.GetStatus() == GcsStatus::UNCOMMITTED_REMOTE); - } - // The only stopping condition is that an entry is not found. - return false; - }); - - auto entry = lineage_.GetEntry(task_id); - if (entry) { - if (entry->GetStatus() == GcsStatus::UNCOMMITTED_REMOTE) { - // The task was previously remote, so we may have been subscribed to it. - // Unsubscribe since we are now responsible for committing the task. - UnsubscribeTask(task_id); - } - } + // Merge the uncommitted lineage into the lineage cache. Collect the IDs of + // tasks that we should subscribe to. These are all of the tasks that were + // included in the uncommitted lineage that we did not already have in our + // stash. + std::unordered_set subscribe_tasks; + AddUncommittedLineage(task_id, uncommitted_lineage, subscribe_tasks); // Add the submitted task to the lineage cache as UNCOMMITTED_WAITING. It // should be marked as UNCOMMITTED_READY once the task starts execution. - return lineage_.SetEntry(task, GcsStatus::UNCOMMITTED_WAITING); + auto added = lineage_.SetEntry(task, GcsStatus::UNCOMMITTED_WAITING); + + // Do not subscribe to the waiting task itself. We just added it as + // UNCOMMITTED_WAITING, so the task is local. + subscribe_tasks.erase(task_id); + // Unsubscribe to the waiting task since we may have previously been + // subscribed to it. + UnsubscribeTask(task_id); + // Subscribe to all other tasks that were included in the uncommitted lineage + // and that were not already in the local stash. These tasks haven't been + // committed yet and will be committed by a different node, so we will not + // evict them until a notification for their commit is received. + for (const auto &task_id : subscribe_tasks) { + RAY_CHECK(SubscribeTask(task_id)); + } + + // For every task that the waiting task depends on, record the fact that the + // waiting task depends on it. + auto entry = lineage_.GetEntry(task_id); + for (const auto &parent_id : entry->GetParentTaskIds()) { + children_[parent_id].insert(task_id); + } + + return added; } bool LineageCache::AddReadyTask(const Task &task) { @@ -217,12 +212,7 @@ bool LineageCache::AddReadyTask(const Task &task) { // Set the task to READY. if (lineage_.SetEntry(task, GcsStatus::UNCOMMITTED_READY)) { // Attempt to flush the task. - bool flushed = FlushTask(task_id); - if (!flushed) { - // If we fail to flush the task here, due to uncommitted parents, then add - // the task to a cache to be flushed in the future. - uncommitted_ready_tasks_.insert(task_id); - } + FlushTask(task_id); return true; } else { // The task was already ready to be committed (UNCOMMITTED_READY) or @@ -231,28 +221,6 @@ bool LineageCache::AddReadyTask(const Task &task) { } } -uint64_t LineageCache::CountUnsubscribedLineage(const TaskID &task_id, - std::unordered_set &seen) const { - if (seen.count(task_id) == 1) { - return 0; - } - seen.insert(task_id); - if (subscribed_tasks_.count(task_id) == 1) { - return 0; - } - auto entry = lineage_.GetEntry(task_id); - // Only count tasks that are remote. Tasks that are local will be evicted - // once they are committed in the GCS, along with their lineage. - if (!entry || entry->GetStatus() != GcsStatus::UNCOMMITTED_REMOTE) { - return 0; - } - uint64_t cnt = 1; - for (const auto &parent_id : entry->GetParentTaskIds()) { - cnt += CountUnsubscribedLineage(parent_id, seen); - } - return cnt; -} - bool LineageCache::RemoveWaitingTask(const TaskID &task_id) { RAY_LOG(DEBUG) << "remove waiting task " << task_id << " on " << client_id_; auto entry = lineage_.GetEntryMutable(task_id); @@ -274,36 +242,9 @@ bool LineageCache::RemoveWaitingTask(const TaskID &task_id) { // completely in case another task is submitted locally that depends on this // one. entry->ResetStatus(GcsStatus::UNCOMMITTED_REMOTE); - - // Subscribe to the task if necessary. We do this if it has any local - // children that must be written to the GCS, or if its uncommitted remote - // lineage is too large. - if (uncommitted_ready_children_.find(task_id) != uncommitted_ready_children_.end()) { - // Subscribe to the task if it has any children in UNCOMMITTED_READY. We - // will attempt to flush its children once we receive a notification for - // this task's commit. Since this task was in state WAITING, check that we - // were not already subscribed to the task. - RAY_CHECK(SubscribeTask(task_id)); - } else { - // Check if the uncommitted remote lineage is too large. Request a - // notification for every max_lineage_size_ tasks, so that the task and its - // uncommitted lineage can be evicted once the commit notification is - // received. By doing this, we make sure that the unevicted lineage won't - // be more than max_lineage_size_, and the number of subscribed tasks won't - // be more than N / max_lineage_size_, where N is the size of the task - // chain. - // NOTE(swang): The number of entries in the uncommitted lineage also - // includes local tasks that haven't been committed yet, not just remote - // tasks, so this is an overestimate. - std::unordered_set seen; - auto count = CountUnsubscribedLineage(task_id, seen); - if (count >= max_lineage_size_) { - // Since this task was in state WAITING, check that we were not - // already subscribed to the task. - RAY_CHECK(SubscribeTask(task_id)); - } - } - // The task was successfully reset to UNCOMMITTED_REMOTE. + // The task is now remote, so subscribe to the task to make sure that we'll + // eventually clean it up. + RAY_CHECK(SubscribeTask(task_id)); return true; } @@ -312,17 +253,37 @@ void LineageCache::MarkTaskAsForwarded(const TaskID &task_id, const ClientID &no lineage_.GetEntryMutable(task_id)->MarkExplicitlyForwarded(node_id); } +/// A helper function to get the uncommitted lineage of a task. +void GetUncommittedLineageHelper(const TaskID &task_id, const Lineage &lineage_from, + Lineage &lineage_to, const ClientID &node_id) { + // If the entry is not found in the lineage to merge, then we stop since + // there is nothing to copy into the merged lineage. + auto entry = lineage_from.GetEntry(task_id); + if (!entry) { + return; + } + // If this task has already been forwarded to this node, then we can stop. + if (entry->WasExplicitlyForwarded(node_id)) { + return; + } + + // Insert a copy of the entry into lineage_to. If the insert is successful, + // then continue the DFS. The insert will fail if the new entry has an equal + // or lower GCS status than the current entry in lineage_to. This also + // prevents us from traversing the same node twice. + if (lineage_to.SetEntry(entry->TaskData(), entry->GetStatus())) { + for (const auto &parent_id : entry->GetParentTaskIds()) { + GetUncommittedLineageHelper(parent_id, lineage_from, lineage_to, node_id); + } + } +} + Lineage LineageCache::GetUncommittedLineage(const TaskID &task_id, const ClientID &node_id) const { Lineage uncommitted_lineage; // Add all uncommitted ancestors from the lineage cache to the uncommitted // lineage of the requested task. - MergeLineageHelper( - task_id, lineage_, uncommitted_lineage, [&](const LineageEntry &entry) { - // The stopping condition for recursion is that the entry has - // been committed to the GCS or has already been forwarded. - return entry.WasExplicitlyForwarded(node_id); - }); + GetUncommittedLineageHelper(task_id, lineage_, uncommitted_lineage, node_id); // The lineage always includes the requested task id, so add the task if it // wasn't already added. The requested task may not have been added if it was // already explicitly forwarded to this node before. @@ -334,72 +295,29 @@ Lineage LineageCache::GetUncommittedLineage(const TaskID &task_id, return uncommitted_lineage; } -bool LineageCache::FlushTask(const TaskID &task_id) { - auto entry = lineage_.GetEntry(task_id); +void LineageCache::FlushTask(const TaskID &task_id) { + auto entry = lineage_.GetEntryMutable(task_id); RAY_CHECK(entry); RAY_CHECK(entry->GetStatus() == GcsStatus::UNCOMMITTED_READY); - // Check if all arguments have been committed to the GCS before writing - // this task. - bool all_arguments_committed = true; - for (const auto &parent_id : entry->GetParentTaskIds()) { - auto parent = lineage_.GetEntry(parent_id); - // If a parent entry exists in the lineage cache but has not been - // committed yet, then as far as we know, it's still in flight to the - // GCS. Skip this task for now. - if (parent) { - // Request notifications about the parent entry's commit in the GCS if - // the parent is remote. Otherwise, the parent is local and will - // eventually be flushed. In either case, once we receive a - // notification about the task's commit via HandleEntryCommitted, then - // this task will be ready to write on the next call to Flush(). - if (parent->GetStatus() == GcsStatus::UNCOMMITTED_REMOTE) { - SubscribeTask(parent_id); - } - all_arguments_committed = false; - // Track the fact that this task is dependent on a parent that hasn't yet - // been committed, for fast lookup. Once all parents are committed, the - // child will be flushed. - uncommitted_ready_children_[parent_id].insert(task_id); - } - } - if (all_arguments_committed) { - gcs::raylet::TaskTable::WriteCallback task_callback = [this]( - ray::gcs::AsyncGcsClient *client, const TaskID &id, const protocol::TaskT &data) { - HandleEntryCommitted(id); - }; - auto task = lineage_.GetEntry(task_id); - // TODO(swang): Make this better... - flatbuffers::FlatBufferBuilder fbb; - auto message = task->TaskData().ToFlatbuffer(fbb); - fbb.Finish(message); - auto task_data = std::make_shared(); - auto root = flatbuffers::GetRoot(fbb.GetBufferPointer()); - root->UnPackTo(task_data.get()); - RAY_CHECK_OK(task_storage_.Add(task->TaskData().GetTaskSpecification().DriverId(), - task_id, task_data, task_callback)); + gcs::raylet::TaskTable::WriteCallback task_callback = [this]( + ray::gcs::AsyncGcsClient *client, const TaskID &id, const protocol::TaskT &data) { + HandleEntryCommitted(id); + }; + auto task = lineage_.GetEntry(task_id); + // TODO(swang): Make this better... + flatbuffers::FlatBufferBuilder fbb; + auto message = task->TaskData().ToFlatbuffer(fbb); + fbb.Finish(message); + auto task_data = std::make_shared(); + auto root = flatbuffers::GetRoot(fbb.GetBufferPointer()); + root->UnPackTo(task_data.get()); + RAY_CHECK_OK(task_storage_.Add(task->TaskData().GetTaskSpecification().DriverId(), + task_id, task_data, task_callback)); - // We successfully wrote the task, so mark it as committing. - // TODO(swang): Use a batched interface and write with all object entries. - auto entry = lineage_.GetEntryMutable(task_id); - RAY_CHECK(entry); - RAY_CHECK(entry->SetStatus(GcsStatus::COMMITTING)); - } - return all_arguments_committed; -} - -void LineageCache::Flush() { - // Iterate through all tasks that are PLACEABLE. - for (auto it = uncommitted_ready_tasks_.begin(); - it != uncommitted_ready_tasks_.end();) { - bool flushed = FlushTask(*it); - // Erase the task from the cache of uncommitted ready tasks. - if (flushed) { - it = uncommitted_ready_tasks_.erase(it); - } else { - it++; - } - } + // We successfully wrote the task, so mark it as committing. + // TODO(swang): Use a batched interface and write with all object entries. + RAY_CHECK(entry->SetStatus(GcsStatus::COMMITTING)); } bool LineageCache::SubscribeTask(const TaskID &task_id) { @@ -429,81 +347,63 @@ bool LineageCache::UnsubscribeTask(const TaskID &task_id) { return subscribed; } -boost::optional LineageCache::EvictTask(const TaskID &task_id) { - RAY_LOG(DEBUG) << "evicting task " << task_id << " on " << client_id_; - auto entry = lineage_.PopEntry(task_id); - if (!entry) { - // The entry has already been evicted. Check that the entry does not have - // any dependent tasks, since we should've already attempted to flush these - // tasks on the first eviction. - RAY_CHECK(uncommitted_ready_children_.count(task_id) == 0); - // Check that we already unsubscribed from the task when handling the - // first eviction. - RAY_CHECK(subscribed_tasks_.count(task_id) == 0); - // Do nothing if the entry has already been evicted. - return entry; +void LineageCache::EvictTask(const TaskID &task_id) { + // If we haven't received a commit for this task yet, do not evict. + auto commit_it = committed_tasks_.find(task_id); + if (commit_it == committed_tasks_.end()) { + return; } - - // Stop listening for notifications about this task. - UnsubscribeTask(task_id); - - // Try to flush the children of the committed task. These are the tasks that - // have a dependency on the committed task. - auto children_entry = uncommitted_ready_children_.find(task_id); - if (children_entry != uncommitted_ready_children_.end()) { - // Get the children of the committed task that are uncommitted but ready. - auto children = std::move(children_entry->second); - uncommitted_ready_children_.erase(children_entry); - - // Try to flush the children. If all of the child's parents are committed, - // then the child will be flushed here. - for (const auto &child_id : children) { - bool flushed = FlushTask(child_id); - // Erase the child task from the cache of uncommitted ready tasks. - if (flushed) { - auto erased = uncommitted_ready_tasks_.erase(child_id); - RAY_CHECK(erased == 1); - } - } - } - - return entry; -} - -void LineageCache::EvictRemoteLineage(const TaskID &task_id) { + // If the entry has already been evicted, exit. auto entry = lineage_.GetEntry(task_id); if (!entry) { return; } - // Only evict tasks that are remote. Other tasks, and their lineage, will be - // evicted once they are committed. - if (entry->GetStatus() == GcsStatus::UNCOMMITTED_REMOTE) { - // Remove the ancestor task. - auto evicted_entry = EvictTask(task_id); - // Recurse and remove this task's ancestors. - for (const auto &parent_id : evicted_entry->GetParentTaskIds()) { - EvictRemoteLineage(parent_id); + // Only evict tasks that we were subscribed to or that we were committing. + if (!(entry->GetStatus() == GcsStatus::UNCOMMITTED_REMOTE || + entry->GetStatus() == GcsStatus::COMMITTING)) { + return; + } + // Entries cannot be safely evicted until their parents are all evicted. + for (const auto &parent_id : entry->GetParentTaskIds()) { + if (ContainsTask(parent_id)) { + return; } } + + // Evict the task. + RAY_LOG(DEBUG) << "evicting task " << task_id << " on " << client_id_; + lineage_.PopEntry(task_id); + committed_tasks_.erase(commit_it); + // Try to evict the children of the evict task. These are the tasks that have + // a dependency on the evicted task. + auto children_entry = children_.find(task_id); + if (children_entry != children_.end()) { + // Get the children of the evicted task. + auto children = std::move(children_entry->second); + children_.erase(children_entry); + // Try to evict the children. If all of the child's parents are evicted, + // then the child will be evicted here. + for (const auto &child_id : children) { + EvictTask(child_id); + } + } + + return; } void LineageCache::HandleEntryCommitted(const TaskID &task_id) { RAY_LOG(DEBUG) << "task committed: " << task_id; - auto entry = EvictTask(task_id); + auto entry = lineage_.GetEntry(task_id); if (!entry) { - // The task has already been evicted due to a previous commit notification, - // or because one of its descendants was committed. + // The task has already been evicted due to a previous commit notification. return; } - - // Evict the committed task's uncommitted lineage. Since local tasks are - // written in data dependency order, the uncommitted lineage should only - // include remote tasks, i.e. tasks that were committed by a different node. - // In case of reconstruction, the uncommitted lineage may also include local - // tasks that were resubmitted. These tasks are not evicted. - for (const auto &parent_id : entry->GetParentTaskIds()) { - EvictRemoteLineage(parent_id); - } + // Record the commit acknowledgement and attempt to evict the task. + committed_tasks_.insert(task_id); + EvictTask(task_id); + // We got the notification about the task's commit, so no longer need any + // more notifications. + UnsubscribeTask(task_id); } const Task &LineageCache::GetTask(const TaskID &task_id) const { @@ -519,6 +419,8 @@ bool LineageCache::ContainsTask(const TaskID &task_id) const { return it != entries.end(); } +size_t LineageCache::NumEntries() const { return lineage_.GetEntries().size(); } + } // namespace raylet } // namespace ray diff --git a/src/ray/raylet/lineage_cache.h b/src/ray/raylet/lineage_cache.h index 8f4f0009f..ad9510dcb 100644 --- a/src/ray/raylet/lineage_cache.h +++ b/src/ray/raylet/lineage_cache.h @@ -184,7 +184,14 @@ class Lineage { /// \class LineageCache /// /// A cache of the task table. This consists of all tasks that this node owns, -/// as well as their lineage, that have not yet been added durably to the GCS. +/// as well as their lineage, that have not yet been added durably +/// ("committed") to the GCS. +/// +/// The current policy is to flush each task as soon as it enters the +/// UNCOMMITTED_READY state. For safety, we only evict tasks if they have been +/// committed and if their parents have been all evicted. Thus, the invariant +/// is that if g depends on f, and g has been evicted, then f must have been +/// committed. class LineageCache { public: /// Create a lineage cache for the given task storage system. @@ -242,15 +249,8 @@ class LineageCache { /// includes the entry for the requested entry_id. Lineage GetUncommittedLineage(const TaskID &task_id, const ClientID &node_id) const; - /// Asynchronously write any tasks that are in the UNCOMMITTED_READY state - /// and for which all parents have been committed to the GCS. These tasks - /// will be transitioned in this method to state COMMITTING. Once the write - /// is acknowledged, the task's state will be transitioned to state - /// COMMITTED. - void Flush(); - - /// Handle the commit of a task entry in the GCS. This sets the task to - /// COMMITTED and cleans up any ancestor tasks that are in the cache. + /// Handle the commit of a task entry in the GCS. This attempts to evict the + /// task if possible. /// /// \param task_id The ID of the task entry that was committed. void HandleEntryCommitted(const TaskID &task_id); @@ -267,35 +267,28 @@ class LineageCache { /// \return Whether the task is in the lineage cache. bool ContainsTask(const TaskID &task_id) const; + /// Get the number of entries in the lineage cache. + /// + /// \return The number of entries in the lineage cache. + size_t NumEntries() const; + private: - /// Try to flush a task that is in UNCOMMITTED_READY state. If the task has - /// parents that are not committed yet, then the child will be flushed once - /// the parents have been committed. - bool FlushTask(const TaskID &task_id); + /// Flush a task that is in UNCOMMITTED_READY state. + void FlushTask(const TaskID &task_id); /// Evict a single task. This should only be called if we are sure that the - /// task has been committed and will trigger an attempt to flush any of the - /// evicted task's children that are in UNCOMMITTED_READY state. Returns an - /// optional reference to the evicted task that is empty if the task was not - /// in the lineage cache. - boost::optional EvictTask(const TaskID &task_id); - /// Evict a remote task and its lineage. This should only be called if we - /// are sure that the remote task and its lineage are committed. - void EvictRemoteLineage(const TaskID &task_id); + /// task has been committed. The task will only be evicted if all of its + /// parents have also been evicted. If successful, then we will also attempt + /// to evict the task's children. + void EvictTask(const TaskID &task_id); /// Subscribe to notifications for a task. Returns whether the operation /// was successful (whether we were not already subscribed). bool SubscribeTask(const TaskID &task_id); /// Unsubscribe from notifications for a task. Returns whether the operation /// was successful (whether we were subscribed). bool UnsubscribeTask(const TaskID &task_id); - /// Count the size of unsubscribed and uncommitted lineage of the given task - /// excluding the values that have already been visited. - /// - /// \param task_id The task whose lineage should be counted. - /// \param seen This set contains the keys of lineage entries counted so far, - /// so that we don't revisit those nodes. - /// \void The number of tasks that were counted. - uint64_t CountUnsubscribedLineage(const TaskID &task_id, - std::unordered_set &seen) const; + /// Add a task and its uncommitted lineage to the local stash. + void AddUncommittedLineage(const TaskID &task_id, const Lineage &uncommitted_lineage, + std::unordered_set &subscribe_tasks); /// The client ID, used to request notifications for specific tasks. /// TODO(swang): Move the ClientID into the generic Table implementation. @@ -305,23 +298,10 @@ class LineageCache { /// The pubsub storage system for task information. This can be used to /// request notifications for the commit of a task entry. gcs::PubsubInterface &task_pubsub_; - /// The maximum size that a remote task's uncommitted lineage can get to. If - /// a remote task's uncommitted lineage exceeds this size, then a - /// notification will be requested from the pubsub storage system so that - /// the task and its lineage can be evicted from the stash. - uint64_t max_lineage_size_; - /// The set of tasks that are in UNCOMMITTED_READY state. This is a cache of - /// the tasks that may be flushable. - // TODO(swang): As an optimization, we may also want to further distinguish - // which tasks are flushable, to avoid iterating over tasks that are in - // UNCOMMITTED_READY, but that have dependencies that have not been committed - // yet. - std::unordered_set uncommitted_ready_tasks_; - /// A mapping from each task that hasn't been committed yet, to all dependent - /// children tasks that are in UNCOMMITTED_READY state. This is used when the - /// parent task is committed, for fast lookup of children that may now be - /// flushed. - std::unordered_map> uncommitted_ready_children_; + /// The set of tasks that have been committed but not evicted. + std::unordered_set committed_tasks_; + /// A mapping from each task in the lineage cache to its children. + std::unordered_map> children_; /// All tasks and objects that we are responsible for writing back to the /// GCS, and the tasks and objects in their lineage. Lineage lineage_; diff --git a/src/ray/raylet/lineage_cache_test.cc b/src/ray/raylet/lineage_cache_test.cc index 10081fdc8..248de2a1c 100644 --- a/src/ray/raylet/lineage_cache_test.cc +++ b/src/ray/raylet/lineage_cache_test.cc @@ -221,12 +221,6 @@ TEST_F(LineageCacheTest, TestMarkTaskAsForwarded) { ASSERT_EQ(1, uncommitted_lineage_forwarded.GetEntries().size()); } -void CheckFlush(LineageCache &lineage_cache, MockGcs &mock_gcs, - size_t num_tasks_flushed) { - lineage_cache.Flush(); - ASSERT_EQ(mock_gcs.TaskTable().size(), num_tasks_flushed); -} - TEST_F(LineageCacheTest, TestWritebackNoneReady) { // Insert a chain of dependent tasks. size_t num_tasks_flushed = 0; @@ -235,7 +229,7 @@ TEST_F(LineageCacheTest, TestWritebackNoneReady) { // Check that when no tasks have been marked as ready, we do not flush any // entries. - CheckFlush(lineage_cache_, mock_gcs_, num_tasks_flushed); + ASSERT_EQ(mock_gcs_.TaskTable().size(), num_tasks_flushed); } TEST_F(LineageCacheTest, TestWritebackReady) { @@ -247,69 +241,124 @@ TEST_F(LineageCacheTest, TestWritebackReady) { // Check that after marking the first task as ready, we flush only that task. ASSERT_TRUE(lineage_cache_.AddReadyTask(tasks.front())); num_tasks_flushed++; - CheckFlush(lineage_cache_, mock_gcs_, num_tasks_flushed); + ASSERT_EQ(mock_gcs_.TaskTable().size(), num_tasks_flushed); } TEST_F(LineageCacheTest, TestWritebackOrder) { // Insert a chain of dependent tasks. - size_t num_tasks_flushed = 0; std::vector tasks; InsertTaskChain(lineage_cache_, tasks, 3, std::vector(), 1); + size_t num_tasks_flushed = tasks.size(); - // Mark all tasks as ready. The first task, which has no dependencies, should - // be flushed. + // Mark all tasks as ready. All tasks should be flushed. for (const auto &task : tasks) { ASSERT_TRUE(lineage_cache_.AddReadyTask(task)); } - // Check that we write back the tasks in order of data dependencies. - for (size_t i = 0; i < tasks.size(); i++) { - num_tasks_flushed++; - ASSERT_EQ(mock_gcs_.TaskTable().size(), num_tasks_flushed); - // Flush acknowledgements. The next task should have been flushed. - mock_gcs_.Flush(); - } + + ASSERT_EQ(mock_gcs_.TaskTable().size(), num_tasks_flushed); } -TEST_F(LineageCacheTest, TestWritebackPartiallyReady) { - // Create two independent tasks, task1 and task2, and a dependent task - // that depends on both tasks. +TEST_F(LineageCacheTest, TestEvictChain) { + // Create a chain of 3 tasks. size_t num_tasks_flushed = 0; - auto task1 = ExampleTask({}, 1); - auto task2 = ExampleTask({}, 1); - std::vector returns; - for (int64_t i = 0; i < task1.GetTaskSpecification().NumReturns(); i++) { - returns.push_back(task1.GetTaskSpecification().ReturnId(i)); + std::vector tasks; + std::vector arguments; + for (int i = 0; i < 3; i++) { + auto task = ExampleTask(arguments, 1); + tasks.push_back(task); + arguments = {task.GetTaskSpecification().ReturnId(0)}; } - for (int64_t i = 0; i < task2.GetTaskSpecification().NumReturns(); i++) { - returns.push_back(task2.GetTaskSpecification().ReturnId(i)); + + Lineage uncommitted_lineage; + for (const auto &task : tasks) { + uncommitted_lineage.SetEntry(task, GcsStatus::UNCOMMITTED_REMOTE); } - auto dependent_task = ExampleTask(returns, 1); - - // Insert all tasks as waiting for execution. - ASSERT_TRUE(lineage_cache_.AddWaitingTask(task1, Lineage())); - ASSERT_TRUE(lineage_cache_.AddWaitingTask(task2, Lineage())); - ASSERT_TRUE(lineage_cache_.AddWaitingTask(dependent_task, Lineage())); - - // Flush one of the independent tasks. - ASSERT_TRUE(lineage_cache_.AddReadyTask(task1)); - num_tasks_flushed++; - CheckFlush(lineage_cache_, mock_gcs_, num_tasks_flushed); - // Flush acknowledgements. The lineage cache should receive the commit for - // the first task. - mock_gcs_.Flush(); - // Mark the other independent task and the dependent as ready. - ASSERT_TRUE(lineage_cache_.AddReadyTask(task2)); - ASSERT_TRUE(lineage_cache_.AddReadyTask(dependent_task)); - // Two tasks are ready, but only the independent task should be flushed. The - // dependent task should only be flushed once commits for both independent - // tasks are received. - num_tasks_flushed++; - CheckFlush(lineage_cache_, mock_gcs_, num_tasks_flushed); - // Flush acknowledgements. Both independent tasks should now be committed. - mock_gcs_.Flush(); - // The dependent task should now be flushed. + // Mark the last task as ready to flush. + ASSERT_TRUE(lineage_cache_.AddWaitingTask(tasks.back(), uncommitted_lineage)); + ASSERT_EQ(lineage_cache_.NumEntries(), tasks.size()); + ASSERT_TRUE(lineage_cache_.AddReadyTask(tasks.back())); num_tasks_flushed++; ASSERT_EQ(mock_gcs_.TaskTable().size(), num_tasks_flushed); + // Flush acknowledgements. The lineage cache should receive the commit for + // the flushed task, but its lineage should not be evicted yet. + mock_gcs_.Flush(); + ASSERT_EQ(lineage_cache_ + .GetUncommittedLineage(tasks.back().GetTaskSpecification().TaskId(), + ClientID::nil()) + .GetEntries() + .size(), + tasks.size()); + ASSERT_EQ(lineage_cache_.NumEntries(), tasks.size()); + + // Simulate executing the task on a remote node and adding it to the GCS. + auto task_data = std::make_shared(); + RAY_CHECK_OK( + mock_gcs_.RemoteAdd(tasks.at(1).GetTaskSpecification().TaskId(), task_data)); + mock_gcs_.Flush(); + ASSERT_EQ(lineage_cache_ + .GetUncommittedLineage(tasks.back().GetTaskSpecification().TaskId(), + ClientID::nil()) + .GetEntries() + .size(), + tasks.size()); + ASSERT_EQ(lineage_cache_.NumEntries(), tasks.size()); + + // Simulate executing the task on a remote node and adding it to the GCS. + RAY_CHECK_OK( + mock_gcs_.RemoteAdd(tasks.at(0).GetTaskSpecification().TaskId(), task_data)); + mock_gcs_.Flush(); + ASSERT_EQ(lineage_cache_.NumEntries(), 0); +} + +TEST_F(LineageCacheTest, TestEvictManyParents) { + // Create some independent tasks. + std::vector parent_tasks; + std::vector arguments; + for (int i = 0; i < 10; i++) { + auto task = ExampleTask({}, 1); + parent_tasks.push_back(task); + arguments.push_back(task.GetTaskSpecification().ReturnId(0)); + ASSERT_TRUE(lineage_cache_.AddWaitingTask(task, Lineage())); + } + // Create a child task that is dependent on all of the previous tasks. + auto child_task = ExampleTask(arguments, 1); + ASSERT_TRUE(lineage_cache_.AddWaitingTask(child_task, Lineage())); + + // Flush the child task. Make sure that it remains in the cache, since none + // of its parents have been committed yet, and that the uncommitted lineage + // still includes all of the parent tasks. + size_t total_tasks = parent_tasks.size() + 1; + lineage_cache_.AddReadyTask(child_task); + mock_gcs_.Flush(); + ASSERT_EQ(lineage_cache_.NumEntries(), total_tasks); + ASSERT_EQ(lineage_cache_ + .GetUncommittedLineage(child_task.GetTaskSpecification().TaskId(), + ClientID::nil()) + .GetEntries() + .size(), + total_tasks); + + // Flush each parent task and check for eviction safety. + for (const auto &parent_task : parent_tasks) { + lineage_cache_.AddReadyTask(parent_task); + mock_gcs_.Flush(); + total_tasks--; + if (total_tasks > 1) { + // Each task should be evicted as soon as its commit is acknowledged, + // since the parent tasks have no dependencies. + ASSERT_EQ(lineage_cache_.NumEntries(), total_tasks); + ASSERT_EQ(lineage_cache_ + .GetUncommittedLineage(child_task.GetTaskSpecification().TaskId(), + ClientID::nil()) + .GetEntries() + .size(), + total_tasks); + } else { + // After the last task has been committed, then the child task should + // also be evicted. The lineage cache should now be empty. + ASSERT_EQ(lineage_cache_.NumEntries(), 0); + } + } } TEST_F(LineageCacheTest, TestForwardTasksRoundTrip) { @@ -350,16 +399,19 @@ TEST_F(LineageCacheTest, TestForwardTask) { auto uncommitted_lineage = lineage_cache_.GetUncommittedLineage(task_id_to_remove, ClientID::nil()); ASSERT_TRUE(lineage_cache_.RemoveWaitingTask(task_id_to_remove)); + ASSERT_EQ(lineage_cache_.NumEntries(), 3); // Simulate executing the remaining tasks. for (const auto &task : tasks) { ASSERT_TRUE(lineage_cache_.AddReadyTask(task)); + num_tasks_flushed++; } // Check that the first task, which has no dependencies can be flushed. The // last task cannot be flushed since one of its dependencies has not been // added by the remote node yet. - num_tasks_flushed++; - CheckFlush(lineage_cache_, mock_gcs_, num_tasks_flushed); + ASSERT_EQ(mock_gcs_.TaskTable().size(), num_tasks_flushed); + mock_gcs_.Flush(); + ASSERT_EQ(lineage_cache_.NumEntries(), 2); // Simulate executing the task on a remote node and adding it to the GCS. auto task_data = std::make_shared(); @@ -367,15 +419,14 @@ TEST_F(LineageCacheTest, TestForwardTask) { mock_gcs_.RemoteAdd(forwarded_task.GetTaskSpecification().TaskId(), task_data)); // Check that the remote task is flushed. num_tasks_flushed++; - CheckFlush(lineage_cache_, mock_gcs_, num_tasks_flushed); + ASSERT_EQ(mock_gcs_.TaskTable().size(), num_tasks_flushed); ASSERT_EQ(mock_gcs_.SubscribedTasks().size(), 1); // Check that once we receive the callback for the remote task, we can now // flush the last task. mock_gcs_.Flush(); - num_tasks_flushed++; - CheckFlush(lineage_cache_, mock_gcs_, num_tasks_flushed); ASSERT_EQ(mock_gcs_.SubscribedTasks().size(), 0); + ASSERT_EQ(lineage_cache_.NumEntries(), 0); } TEST_F(LineageCacheTest, TestEviction) { @@ -407,10 +458,11 @@ TEST_F(LineageCacheTest, TestEviction) { // Check that the remote task is flushed. num_tasks_flushed++; mock_gcs_.Flush(); - CheckFlush(lineage_cache_, mock_gcs_, num_tasks_flushed); + ASSERT_EQ(mock_gcs_.TaskTable().size(), num_tasks_flushed); // Check that the last task in the chain still has all tasks in its // uncommitted lineage. ASSERT_EQ(uncommitted_lineage.GetEntries().size(), lineage_size); + ASSERT_EQ(lineage_cache_.NumEntries(), lineage_size - num_tasks_flushed); // Simulate executing all the rest of the tasks except the last one on a // remote node and adding them to the GCS. @@ -420,7 +472,8 @@ TEST_F(LineageCacheTest, TestEviction) { // Check that the remote task is flushed. num_tasks_flushed++; mock_gcs_.Flush(); - CheckFlush(lineage_cache_, mock_gcs_, num_tasks_flushed); + ASSERT_EQ(mock_gcs_.TaskTable().size(), num_tasks_flushed); + ASSERT_EQ(lineage_cache_.NumEntries(), lineage_size - num_tasks_flushed); } // All tasks have now been flushed. Check that enough lineage has been // evicted that the uncommitted lineage is now less than the maximum size. @@ -445,8 +498,6 @@ TEST_F(LineageCacheTest, TestOutOfOrderEviction) { auto task_id = task.GetTaskSpecification().TaskId(); ASSERT_TRUE(lineage_cache_.RemoveWaitingTask(task_id)); } - // Check that we requested at most 2 notifications - ASSERT_TRUE(mock_gcs_.NumRequestedNotifications() <= 2); // Check that the last task in the chain still has all tasks in its // uncommitted lineage. @@ -454,37 +505,29 @@ TEST_F(LineageCacheTest, TestOutOfOrderEviction) { auto uncommitted_lineage = lineage_cache_.GetUncommittedLineage(last_task_id, ClientID::nil()); ASSERT_EQ(uncommitted_lineage.GetEntries().size(), lineage_size); - - // Simulate executing all the rest of the tasks except the last one at the - // remote node. Simulate receiving the notifications from the GCS in reverse - // order of execution. - tasks.pop_back(); - auto task_data = std::make_shared(); - auto it = tasks.rbegin(); - RAY_CHECK_OK(mock_gcs_.RemoteAdd(it->GetTaskSpecification().TaskId(), task_data)); - it++; - // Check that the remote task is flushed. - num_tasks_flushed++; - mock_gcs_.Flush(); - CheckFlush(lineage_cache_, mock_gcs_, num_tasks_flushed); - // Check that the last task in the chain still has all tasks in its - // uncommitted lineage. - ASSERT_EQ(uncommitted_lineage.GetEntries().size(), lineage_size); + ASSERT_EQ(lineage_cache_.NumEntries(), lineage_size); // Simulate executing the rest of the tasks on a remote node and receiving // the notifications from the GCS in reverse order of execution. - for (; it != tasks.rend(); it++) { + auto last_task = tasks.front(); + tasks.erase(tasks.begin()); + for (auto it = tasks.rbegin(); it != tasks.rend(); it++) { + auto task_data = std::make_shared(); RAY_CHECK_OK(mock_gcs_.RemoteAdd(it->GetTaskSpecification().TaskId(), task_data)); // Check that the remote task is flushed. num_tasks_flushed++; mock_gcs_.Flush(); - CheckFlush(lineage_cache_, mock_gcs_, num_tasks_flushed); + ASSERT_EQ(mock_gcs_.TaskTable().size(), num_tasks_flushed); + ASSERT_EQ(lineage_cache_.NumEntries(), lineage_size); } - // All tasks have now been flushed. Check that enough lineage has been - // evicted that the uncommitted lineage is now less than the maximum size. - uncommitted_lineage = - lineage_cache_.GetUncommittedLineage(last_task_id, ClientID::nil()); - ASSERT_TRUE(uncommitted_lineage.GetEntries().size() < max_lineage_size_); + // Flush the last task. The lineage should not get evicted until this task's + // commit is received. + auto task_data = std::make_shared(); + RAY_CHECK_OK(mock_gcs_.RemoteAdd(last_task.GetTaskSpecification().TaskId(), task_data)); + num_tasks_flushed++; + mock_gcs_.Flush(); + ASSERT_EQ(mock_gcs_.TaskTable().size(), num_tasks_flushed); + ASSERT_EQ(lineage_cache_.NumEntries(), 0); } TEST_F(LineageCacheTest, TestEvictionUncommittedChildren) { @@ -524,7 +567,7 @@ TEST_F(LineageCacheTest, TestEvictionUncommittedChildren) { // tasks are flushed, since all of their dependencies have been evicted and // are therefore committed in the GCS. mock_gcs_.Flush(); - CheckFlush(lineage_cache_, mock_gcs_, num_tasks_flushed); + ASSERT_EQ(mock_gcs_.TaskTable().size(), num_tasks_flushed); } } // namespace raylet