From 166000b089ee15d44635ebca00f12320f51ce587 Mon Sep 17 00:00:00 2001 From: Stephanie Wang Date: Mon, 28 May 2018 21:03:15 -0700 Subject: [PATCH] [xray] Improve flush algorithm for the lineage cache (#2130) * Private method to flush a single task from the lineage cache * Track parent->child relationships for faster flushing * doc * Only flush the newly ready task * Flush() returns void * x --- src/ray/raylet/lineage_cache.cc | 139 ++++++++++++++++----------- src/ray/raylet/lineage_cache.h | 22 +++-- src/ray/raylet/lineage_cache_test.cc | 20 ++-- 3 files changed, 109 insertions(+), 72 deletions(-) diff --git a/src/ray/raylet/lineage_cache.cc b/src/ray/raylet/lineage_cache.cc index d5ac1c931..a5eb91a6b 100644 --- a/src/ray/raylet/lineage_cache.cc +++ b/src/ray/raylet/lineage_cache.cc @@ -187,12 +187,14 @@ void LineageCache::AddWaitingTask(const Task &task, const Lineage &uncommitted_l void LineageCache::AddReadyTask(const Task &task) { auto new_entry = LineageEntry(task, GcsStatus_UNCOMMITTED_READY); RAY_CHECK(lineage_.SetEntry(std::move(new_entry))); - // Add the task to the cache of tasks that may be flushed. - uncommitted_ready_tasks_.insert(task.GetTaskSpecification().TaskId()); - - // Try to flush the task to the GCS. - // TODO(swang): Allow a pluggable policy for when to flush. - RAY_CHECK_OK(Flush()); + const TaskID task_id = task.GetTaskSpecification().TaskId(); + // Attempt to flush the task. + bool flushed = FlushTask(task_id); + if (!flushed) { + // If we fail to flush the task here, due to uncommitted parents, then add + // the task to a cache to be flushed in the future. + uncommitted_ready_tasks_.insert(task_id); + } } void LineageCache::RemoveWaitingTask(const TaskID &task_id) { @@ -219,57 +221,49 @@ Lineage LineageCache::GetUncommittedLineage(const TaskID &task_id) const { return uncommitted_lineage; } -Status LineageCache::Flush() { - // Iterate through all tasks that are READY. - std::vector ready_task_ids; - for (const auto &task_id : uncommitted_ready_tasks_) { - auto entry = lineage_.GetEntry(task_id); - RAY_CHECK(entry); - RAY_CHECK(entry->GetStatus() == GcsStatus_UNCOMMITTED_READY); +bool LineageCache::FlushTask(const TaskID &task_id) { + auto entry = lineage_.GetEntry(task_id); + RAY_CHECK(entry); + RAY_CHECK(entry->GetStatus() == GcsStatus_UNCOMMITTED_READY); - // Check if all arguments have been committed to the GCS before writing - // this task. - bool all_arguments_committed = true; - for (const auto &parent_id : entry->GetParentTaskIds()) { - auto parent = lineage_.GetEntry(parent_id); - // If a parent entry exists in the lineage cache but has not been - // committed yet, then as far as we know, it's still in flight to the - // GCS. Skip this task for now. - if (parent && parent->GetStatus() != GcsStatus_COMMITTED) { - RAY_CHECK(parent->GetStatus() != GcsStatus_UNCOMMITTED_WAITING) - << "Children should not become ready to flush before their parents."; - // Request notifications about the parent entry's commit in the GCS if - // the parent is remote. Otherwise, the parent is local and will - // eventually be flushed. In either case, once we receive a - // notification about the task's commit via HandleEntryCommitted, then - // this task will be ready to write on the next call to Flush(). - if (parent->GetStatus() == GcsStatus_UNCOMMITTED_REMOTE) { - auto inserted = subscribed_tasks_.insert(parent_id); - if (inserted.second) { - // Only request notifications about the parent entry if we haven't - // already requested notifications for it. - RAY_CHECK_OK( - task_pubsub_.RequestNotifications(JobID::nil(), parent_id, client_id_)); - } + // Check if all arguments have been committed to the GCS before writing + // this task. + bool all_arguments_committed = true; + for (const auto &parent_id : entry->GetParentTaskIds()) { + auto parent = lineage_.GetEntry(parent_id); + // If a parent entry exists in the lineage cache but has not been + // committed yet, then as far as we know, it's still in flight to the + // GCS. Skip this task for now. + if (parent && parent->GetStatus() != GcsStatus_COMMITTED) { + RAY_CHECK(parent->GetStatus() != GcsStatus_UNCOMMITTED_WAITING) + << "Children should not become ready to flush before their parents."; + // Request notifications about the parent entry's commit in the GCS if + // the parent is remote. Otherwise, the parent is local and will + // eventually be flushed. In either case, once we receive a + // notification about the task's commit via HandleEntryCommitted, then + // this task will be ready to write on the next call to Flush(). + if (parent->GetStatus() == GcsStatus_UNCOMMITTED_REMOTE) { + auto inserted = subscribed_tasks_.insert(parent_id); + if (inserted.second) { + // Only request notifications about the parent entry if we haven't + // already requested notifications for it. + RAY_CHECK_OK( + task_pubsub_.RequestNotifications(JobID::nil(), parent_id, client_id_)); } - all_arguments_committed = false; - break; } - } - if (all_arguments_committed) { - // All arguments have been committed to the GCS. Add this task to the - // list of tasks to write back to the GCS. - ready_task_ids.push_back(task_id); + all_arguments_committed = false; + // Track the fact that this task is dependent on a parent that hasn't yet + // been committed, for fast lookup. Once all parents are committed, the + // child will be flushed. + uncommitted_ready_children_[parent_id].insert(task_id); } } - - // Write back all ready tasks whose arguments have been committed to the GCS. - gcs::raylet::TaskTable::WriteCallback task_callback = [this]( - ray::gcs::AsyncGcsClient *client, const TaskID &id, const protocol::TaskT &data) { - HandleEntryCommitted(id); - }; - for (const auto &ready_task_id : ready_task_ids) { - auto task = lineage_.GetEntry(ready_task_id); + if (all_arguments_committed) { + gcs::raylet::TaskTable::WriteCallback task_callback = [this]( + ray::gcs::AsyncGcsClient *client, const TaskID &id, const protocol::TaskT &data) { + HandleEntryCommitted(id); + }; + auto task = lineage_.GetEntry(task_id); // TODO(swang): Make this better... flatbuffers::FlatBufferBuilder fbb; auto message = task->TaskData().ToFlatbuffer(fbb); @@ -278,18 +272,29 @@ Status LineageCache::Flush() { auto root = flatbuffers::GetRoot(fbb.GetBufferPointer()); root->UnPackTo(task_data.get()); RAY_CHECK_OK(task_storage_.Add(task->TaskData().GetTaskSpecification().DriverId(), - ready_task_id, task_data, task_callback)); + task_id, task_data, task_callback)); // We successfully wrote the task, so mark it as committing. // TODO(swang): Use a batched interface and write with all object entries. - auto entry = lineage_.PopEntry(ready_task_id); + auto entry = lineage_.PopEntry(task_id); RAY_CHECK(entry->SetStatus(GcsStatus_COMMITTING)); RAY_CHECK(lineage_.SetEntry(std::move(*entry))); - // Erase the task from the cache of uncommitted ready tasks. - uncommitted_ready_tasks_.erase(ready_task_id); } + return all_arguments_committed; +} - return ray::Status::OK(); +void LineageCache::Flush() { + // Iterate through all tasks that are READY. + for (auto it = uncommitted_ready_tasks_.begin(); + it != uncommitted_ready_tasks_.end();) { + bool flushed = FlushTask(*it); + // Erase the task from the cache of uncommitted ready tasks. + if (flushed) { + it = uncommitted_ready_tasks_.erase(it); + } else { + it++; + } + } } void PopAncestorTasks(const UniqueID &task_id, Lineage &lineage) { @@ -328,6 +333,26 @@ void LineageCache::HandleEntryCommitted(const UniqueID &task_id) { RAY_CHECK_OK(task_pubsub_.CancelNotifications(JobID::nil(), task_id, client_id_)); subscribed_tasks_.erase(it); } + + // Try to flush the children of the committed task. These are the tasks that + // have a dependency on the committed task. + auto children_entry = uncommitted_ready_children_.find(task_id); + if (children_entry != uncommitted_ready_children_.end()) { + // Get the children of the committed task that are uncommitted but ready. + auto children = std::move(children_entry->second); + uncommitted_ready_children_.erase(children_entry); + + // Try to flush the children. If all of the child's parents are committed, + // then the child will be flushed here. + for (const auto &child_id : children) { + bool flushed = FlushTask(child_id); + // Erase the child task from the cache of uncommitted ready tasks. + if (flushed) { + auto erased = uncommitted_ready_tasks_.erase(child_id); + RAY_CHECK(erased == 1); + } + } + } } } // namespace raylet diff --git a/src/ray/raylet/lineage_cache.h b/src/ray/raylet/lineage_cache.h index 44b9b62f4..c4ccf92c2 100644 --- a/src/ray/raylet/lineage_cache.h +++ b/src/ray/raylet/lineage_cache.h @@ -196,12 +196,12 @@ class LineageCache { /// includes the entry for the requested entry_id. Lineage GetUncommittedLineage(const TaskID &entry_id) const; - /// Asynchronously write any tasks that have been added since the last flush - /// to the GCS. When each write is acknowledged, its entry will be marked as - /// committed. - /// - /// \return Status. - Status Flush(); + /// Asynchronously write any tasks that are in the UNCOMMITTED_READY state + /// and for which all parents have been committed to the GCS. These tasks + /// will be transitioned in this method to state COMMITTING. Once the write + /// is acknowledged, the task's state will be transitioned to state + /// COMMITTED. + void Flush(); /// Handle the commit of a task entry in the GCS. This sets the task to /// COMMITTED and cleans up any ancestor tasks that are in the cache. @@ -210,6 +210,11 @@ class LineageCache { void HandleEntryCommitted(const TaskID &task_id); private: + /// Try to flush a task that is in UNCOMMITTED_READY state. If the task has + /// parents that are not committed yet, then the child will be flushed once + /// the parents have been committed. + bool FlushTask(const TaskID &task_id); + /// The client ID, used to request notifications for specific tasks. /// TODO(swang): Move the ClientID into the generic Table implementation. ClientID client_id_; @@ -225,6 +230,11 @@ class LineageCache { // UNCOMMITTED_READY, but that have dependencies that have not been committed // yet. std::unordered_set uncommitted_ready_tasks_; + /// A mapping from each task that hasn't been committed yet, to all dependent + /// children tasks that are in UNCOMMITTED_READY state. This is used when the + /// parent task is committed, for fast lookup of children that may now be + /// flushed. + std::unordered_map> uncommitted_ready_children_; /// All tasks and objects that we are responsible for writing back to the /// GCS, and the tasks and objects in their lineage. Lineage lineage_; diff --git a/src/ray/raylet/lineage_cache_test.cc b/src/ray/raylet/lineage_cache_test.cc index 631767346..12b24ba35 100644 --- a/src/ray/raylet/lineage_cache_test.cc +++ b/src/ray/raylet/lineage_cache_test.cc @@ -62,10 +62,11 @@ class MockGcs : public gcs::TableInterface, } void Flush() { - for (const auto &callback : callbacks_) { + auto callbacks = std::move(callbacks_); + callbacks_.clear(); + for (const auto &callback : callbacks) { callback.first(NULL, callback.second, *task_table_[callback.second]); } - callbacks_.clear(); } const std::unordered_map> &TaskTable() const { @@ -183,7 +184,7 @@ TEST_F(LineageCacheTest, TestGetUncommittedLineage) { void CheckFlush(LineageCache &lineage_cache, MockGcs &mock_gcs, size_t num_tasks_flushed) { - RAY_CHECK_OK(lineage_cache.Flush()); + lineage_cache.Flush(); ASSERT_EQ(mock_gcs.TaskTable().size(), num_tasks_flushed); } @@ -219,15 +220,16 @@ TEST_F(LineageCacheTest, TestWritebackOrder) { auto return_values1 = InsertTaskChain(lineage_cache_, tasks, 3, std::vector(), 1); - // Mark all tasks as ready. + // Mark all tasks as ready. The first task, which has no dependencies, should + // be flushed. for (const auto &task : tasks) { lineage_cache_.AddReadyTask(task); } // Check that we write back the tasks in order of data dependencies. for (size_t i = 0; i < tasks.size(); i++) { num_tasks_flushed++; - CheckFlush(lineage_cache_, mock_gcs_, num_tasks_flushed); - // Flush acknowledgements. The next task should be able to be written. + ASSERT_EQ(mock_gcs_.TaskTable().size(), num_tasks_flushed); + // Flush acknowledgements. The next task should have been flushed. mock_gcs_.Flush(); } } @@ -268,11 +270,11 @@ TEST_F(LineageCacheTest, TestWritebackPartiallyReady) { // tasks are received. num_tasks_flushed++; CheckFlush(lineage_cache_, mock_gcs_, num_tasks_flushed); - // Flush acknowledgements. The dependent task should now be able to be - // written. + // Flush acknowledgements. Both independent tasks should now be committed. mock_gcs_.Flush(); + // The dependent task should now be flushed. num_tasks_flushed++; - CheckFlush(lineage_cache_, mock_gcs_, num_tasks_flushed); + ASSERT_EQ(mock_gcs_.TaskTable().size(), num_tasks_flushed); } TEST_F(LineageCacheTest, TestForwardTaskRoundTrip) {