[xray] Improve flush algorithm for the lineage cache (#2130)

* Private method to flush a single task from the lineage cache

* Track parent->child relationships for faster flushing

* doc

* Only flush the newly ready task

* Flush() returns void

* x
Author: Stephanie Wang, 2018-05-28 21:03:15 -07:00 (committed by GitHub)
Parent: bc2a83e698
Commit: 166000b089
3 changed files with 109 additions and 72 deletions
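
The core idea of the change is a reverse index from each uncommitted parent task to the ready children that are blocked on it, so that a commit acknowledgement only revisits the affected children instead of rescanning every ready task. The following is a minimal, self-contained C++ sketch of that pattern; the names used here (FlushIndex, AddReadyTask, HandleCommitted, Write, TaskID as a plain string) are simplified stand-ins for illustration, not the actual raylet API.

#include <iostream>
#include <string>
#include <unordered_map>
#include <unordered_set>
#include <vector>

// Simplified stand-ins for the real raylet types.
using TaskID = std::string;

class FlushIndex {
 public:
  // Called when a task becomes ready. Returns true if it was written
  // immediately, false if it must wait for uncommitted parents.
  bool AddReadyTask(const TaskID &task, const std::vector<TaskID> &parents) {
    auto &blocked_on = parents_[task];  // default-constructs an empty set
    for (const auto &p : parents) {
      if (!committed_.count(p)) {
        blocked_on.insert(p);
        children_[p].insert(task);  // reverse index: parent -> blocked children
      }
    }
    if (blocked_on.empty()) {
      Write(task);
      return true;
    }
    return false;
  }

  // Called when the storage layer acknowledges a task's write.
  void HandleCommitted(const TaskID &task) {
    committed_.insert(task);
    auto it = children_.find(task);
    if (it == children_.end()) {
      return;
    }
    auto blocked = std::move(it->second);
    children_.erase(it);
    // Only the children recorded under this parent are re-examined.
    for (const auto &child : blocked) {
      auto &remaining = parents_[child];
      remaining.erase(task);
      if (remaining.empty()) {
        Write(child);
      }
    }
  }

 private:
  void Write(const TaskID &task) {
    // Placeholder for the asynchronous GCS write; the acknowledgement would
    // later arrive as a call to HandleCommitted(task).
    std::cout << "flushing " << task << "\n";
  }

  std::unordered_set<TaskID> committed_;
  std::unordered_map<TaskID, std::unordered_set<TaskID>> parents_;
  std::unordered_map<TaskID, std::unordered_set<TaskID>> children_;
};

int main() {
  FlushIndex index;
  index.AddReadyTask("A", {});          // no parents: flushed immediately
  index.AddReadyTask("B", {"A"});       // waits for A
  index.AddReadyTask("C", {"A", "B"});  // waits for A and B
  index.HandleCommitted("A");           // flushes B
  index.HandleCommitted("B");           // flushes C
}

Compared to the previous algorithm, where every call to Flush() re-examined the whole set of uncommitted ready tasks, the work done on each acknowledgement is proportional to the number of children actually blocked on the committed task.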

src/ray/raylet/lineage_cache.cc

@@ -187,12 +187,14 @@ void LineageCache::AddWaitingTask(const Task &task, const Lineage &uncommitted_l
 void LineageCache::AddReadyTask(const Task &task) {
   auto new_entry = LineageEntry(task, GcsStatus_UNCOMMITTED_READY);
   RAY_CHECK(lineage_.SetEntry(std::move(new_entry)));
-  // Add the task to the cache of tasks that may be flushed.
-  uncommitted_ready_tasks_.insert(task.GetTaskSpecification().TaskId());
-
-  // Try to flush the task to the GCS.
-  // TODO(swang): Allow a pluggable policy for when to flush.
-  RAY_CHECK_OK(Flush());
+  const TaskID task_id = task.GetTaskSpecification().TaskId();
+  // Attempt to flush the task.
+  bool flushed = FlushTask(task_id);
+  if (!flushed) {
+    // If we fail to flush the task here, due to uncommitted parents, then add
+    // the task to a cache to be flushed in the future.
+    uncommitted_ready_tasks_.insert(task_id);
+  }
 }
 
 void LineageCache::RemoveWaitingTask(const TaskID &task_id) {
@@ -219,57 +221,49 @@ Lineage LineageCache::GetUncommittedLineage(const TaskID &task_id) const {
   return uncommitted_lineage;
 }
 
-Status LineageCache::Flush() {
-  // Iterate through all tasks that are READY.
-  std::vector<TaskID> ready_task_ids;
-  for (const auto &task_id : uncommitted_ready_tasks_) {
-    auto entry = lineage_.GetEntry(task_id);
-    RAY_CHECK(entry);
-    RAY_CHECK(entry->GetStatus() == GcsStatus_UNCOMMITTED_READY);
+bool LineageCache::FlushTask(const TaskID &task_id) {
+  auto entry = lineage_.GetEntry(task_id);
+  RAY_CHECK(entry);
+  RAY_CHECK(entry->GetStatus() == GcsStatus_UNCOMMITTED_READY);
 
   // Check if all arguments have been committed to the GCS before writing
   // this task.
   bool all_arguments_committed = true;
   for (const auto &parent_id : entry->GetParentTaskIds()) {
     auto parent = lineage_.GetEntry(parent_id);
     // If a parent entry exists in the lineage cache but has not been
     // committed yet, then as far as we know, it's still in flight to the
     // GCS. Skip this task for now.
     if (parent && parent->GetStatus() != GcsStatus_COMMITTED) {
       RAY_CHECK(parent->GetStatus() != GcsStatus_UNCOMMITTED_WAITING)
           << "Children should not become ready to flush before their parents.";
       // Request notifications about the parent entry's commit in the GCS if
       // the parent is remote. Otherwise, the parent is local and will
       // eventually be flushed. In either case, once we receive a
       // notification about the task's commit via HandleEntryCommitted, then
       // this task will be ready to write on the next call to Flush().
       if (parent->GetStatus() == GcsStatus_UNCOMMITTED_REMOTE) {
         auto inserted = subscribed_tasks_.insert(parent_id);
         if (inserted.second) {
           // Only request notifications about the parent entry if we haven't
           // already requested notifications for it.
           RAY_CHECK_OK(
               task_pubsub_.RequestNotifications(JobID::nil(), parent_id, client_id_));
         }
       }
       all_arguments_committed = false;
-      break;
+      // Track the fact that this task is dependent on a parent that hasn't yet
+      // been committed, for fast lookup. Once all parents are committed, the
+      // child will be flushed.
+      uncommitted_ready_children_[parent_id].insert(task_id);
     }
   }
-    if (all_arguments_committed) {
-      // All arguments have been committed to the GCS. Add this task to the
-      // list of tasks to write back to the GCS.
-      ready_task_ids.push_back(task_id);
-    }
-  }
-
-  // Write back all ready tasks whose arguments have been committed to the GCS.
-  gcs::raylet::TaskTable::WriteCallback task_callback = [this](
-      ray::gcs::AsyncGcsClient *client, const TaskID &id, const protocol::TaskT &data) {
-    HandleEntryCommitted(id);
-  };
-  for (const auto &ready_task_id : ready_task_ids) {
-    auto task = lineage_.GetEntry(ready_task_id);
+  if (all_arguments_committed) {
+    gcs::raylet::TaskTable::WriteCallback task_callback = [this](
+        ray::gcs::AsyncGcsClient *client, const TaskID &id, const protocol::TaskT &data) {
+      HandleEntryCommitted(id);
+    };
+    auto task = lineage_.GetEntry(task_id);
     // TODO(swang): Make this better...
     flatbuffers::FlatBufferBuilder fbb;
     auto message = task->TaskData().ToFlatbuffer(fbb);
@@ -278,18 +272,29 @@ Status LineageCache::Flush() {
     auto root = flatbuffers::GetRoot<protocol::Task>(fbb.GetBufferPointer());
     root->UnPackTo(task_data.get());
     RAY_CHECK_OK(task_storage_.Add(task->TaskData().GetTaskSpecification().DriverId(),
-                                   ready_task_id, task_data, task_callback));
+                                   task_id, task_data, task_callback));
 
     // We successfully wrote the task, so mark it as committing.
     // TODO(swang): Use a batched interface and write with all object entries.
-    auto entry = lineage_.PopEntry(ready_task_id);
+    auto entry = lineage_.PopEntry(task_id);
     RAY_CHECK(entry->SetStatus(GcsStatus_COMMITTING));
     RAY_CHECK(lineage_.SetEntry(std::move(*entry)));
-    // Erase the task from the cache of uncommitted ready tasks.
-    uncommitted_ready_tasks_.erase(ready_task_id);
   }
+  return all_arguments_committed;
+}
 
-  return ray::Status::OK();
+void LineageCache::Flush() {
+  // Iterate through all tasks that are READY.
+  for (auto it = uncommitted_ready_tasks_.begin();
+       it != uncommitted_ready_tasks_.end();) {
+    bool flushed = FlushTask(*it);
+    // Erase the task from the cache of uncommitted ready tasks.
+    if (flushed) {
+      it = uncommitted_ready_tasks_.erase(it);
+    } else {
+      it++;
+    }
+  }
 }
 
 void PopAncestorTasks(const UniqueID &task_id, Lineage &lineage) {
@@ -328,6 +333,26 @@ void LineageCache::HandleEntryCommitted(const UniqueID &task_id) {
     RAY_CHECK_OK(task_pubsub_.CancelNotifications(JobID::nil(), task_id, client_id_));
     subscribed_tasks_.erase(it);
   }
+
+  // Try to flush the children of the committed task. These are the tasks that
+  // have a dependency on the committed task.
+  auto children_entry = uncommitted_ready_children_.find(task_id);
+  if (children_entry != uncommitted_ready_children_.end()) {
+    // Get the children of the committed task that are uncommitted but ready.
+    auto children = std::move(children_entry->second);
+    uncommitted_ready_children_.erase(children_entry);
+
+    // Try to flush the children. If all of the child's parents are committed,
+    // then the child will be flushed here.
+    for (const auto &child_id : children) {
+      bool flushed = FlushTask(child_id);
+      // Erase the child task from the cache of uncommitted ready tasks.
+      if (flushed) {
+        auto erased = uncommitted_ready_tasks_.erase(child_id);
+        RAY_CHECK(erased == 1);
+      }
+    }
+  }
 }
 
 }  // namespace raylet
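
Taken together, AddReadyTask now attempts to write a newly ready task immediately; if some parent is still uncommitted, the task is parked in uncommitted_ready_tasks_ and recorded in uncommitted_ready_children_ under each such parent. When a commit acknowledgement arrives, HandleEntryCommitted flushes only the recorded children, and Flush() remains as a catch-all sweep over whatever is still parked.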

src/ray/raylet/lineage_cache.h

@@ -196,12 +196,12 @@ class LineageCache {
   /// includes the entry for the requested entry_id.
   Lineage GetUncommittedLineage(const TaskID &entry_id) const;
 
-  /// Asynchronously write any tasks that have been added since the last flush
-  /// to the GCS. When each write is acknowledged, its entry will be marked as
-  /// committed.
-  ///
-  /// \return Status.
-  Status Flush();
+  /// Asynchronously write any tasks that are in the UNCOMMITTED_READY state
+  /// and for which all parents have been committed to the GCS. These tasks
+  /// will be transitioned in this method to state COMMITTING. Once the write
+  /// is acknowledged, the task's state will be transitioned to state
+  /// COMMITTED.
+  void Flush();
 
   /// Handle the commit of a task entry in the GCS. This sets the task to
   /// COMMITTED and cleans up any ancestor tasks that are in the cache.
@@ -210,6 +210,11 @@ class LineageCache {
   void HandleEntryCommitted(const TaskID &task_id);
 
  private:
+  /// Try to flush a task that is in UNCOMMITTED_READY state. If the task has
+  /// parents that are not committed yet, then the child will be flushed once
+  /// the parents have been committed.
+  bool FlushTask(const TaskID &task_id);
+
   /// The client ID, used to request notifications for specific tasks.
   /// TODO(swang): Move the ClientID into the generic Table implementation.
   ClientID client_id_;
@@ -225,6 +230,11 @@ class LineageCache {
   // UNCOMMITTED_READY, but that have dependencies that have not been committed
   // yet.
   std::unordered_set<TaskID> uncommitted_ready_tasks_;
+  /// A mapping from each task that hasn't been committed yet, to all dependent
+  /// children tasks that are in UNCOMMITTED_READY state. This is used when the
+  /// parent task is committed, for fast lookup of children that may now be
+  /// flushed.
+  std::unordered_map<TaskID, std::unordered_set<TaskID>> uncommitted_ready_children_;
   /// All tasks and objects that we are responsible for writing back to the
   /// GCS, and the tasks and objects in their lineage.
   Lineage lineage_;

src/ray/raylet/lineage_cache_test.cc

@@ -62,10 +62,11 @@ class MockGcs : public gcs::TableInterface<TaskID, protocol::Task>,
   }
 
   void Flush() {
-    for (const auto &callback : callbacks_) {
+    auto callbacks = std::move(callbacks_);
+    callbacks_.clear();
+    for (const auto &callback : callbacks) {
       callback.first(NULL, callback.second, *task_table_[callback.second]);
     }
-    callbacks_.clear();
   }
 
   const std::unordered_map<TaskID, std::shared_ptr<protocol::TaskT>> &TaskTable() const {
@@ -183,7 +184,7 @@ TEST_F(LineageCacheTest, TestGetUncommittedLineage) {
 
 void CheckFlush(LineageCache &lineage_cache, MockGcs &mock_gcs,
                 size_t num_tasks_flushed) {
-  RAY_CHECK_OK(lineage_cache.Flush());
+  lineage_cache.Flush();
   ASSERT_EQ(mock_gcs.TaskTable().size(), num_tasks_flushed);
 }
@@ -219,15 +220,16 @@ TEST_F(LineageCacheTest, TestWritebackOrder) {
   auto return_values1 =
       InsertTaskChain(lineage_cache_, tasks, 3, std::vector<ObjectID>(), 1);
-  // Mark all tasks as ready.
+  // Mark all tasks as ready. The first task, which has no dependencies, should
+  // be flushed.
   for (const auto &task : tasks) {
     lineage_cache_.AddReadyTask(task);
   }
 
   // Check that we write back the tasks in order of data dependencies.
   for (size_t i = 0; i < tasks.size(); i++) {
     num_tasks_flushed++;
-    CheckFlush(lineage_cache_, mock_gcs_, num_tasks_flushed);
-    // Flush acknowledgements. The next task should be able to be written.
+    ASSERT_EQ(mock_gcs_.TaskTable().size(), num_tasks_flushed);
+    // Flush acknowledgements. The next task should have been flushed.
     mock_gcs_.Flush();
   }
 }
@@ -268,11 +270,11 @@ TEST_F(LineageCacheTest, TestWritebackPartiallyReady) {
   // tasks are received.
   num_tasks_flushed++;
   CheckFlush(lineage_cache_, mock_gcs_, num_tasks_flushed);
-  // Flush acknowledgements. The dependent task should now be able to be
-  // written.
+  // Flush acknowledgements. Both independent tasks should now be committed.
   mock_gcs_.Flush();
+  // The dependent task should now be flushed.
   num_tasks_flushed++;
-  CheckFlush(lineage_cache_, mock_gcs_, num_tasks_flushed);
+  ASSERT_EQ(mock_gcs_.TaskTable().size(), num_tasks_flushed);
 }
 
 TEST_F(LineageCacheTest, TestForwardTaskRoundTrip) {
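
The MockGcs::Flush change in the test file above moves the pending callbacks into a local container before invoking them, so that a callback which enqueues another write (as the lineage cache's acknowledgement handler now can, by flushing children) neither invalidates the iteration nor is wiped out by a final clear(). A minimal sketch of that drain pattern, using a hypothetical Callback alias rather than the mock's actual callback pairs:

#include <functional>
#include <iostream>
#include <vector>

using Callback = std::function<void()>;

std::vector<Callback> pending;

// Drain the queue safely: callbacks may push new entries into `pending`
// while we are iterating, so the current batch is moved out first.
void Drain() {
  auto batch = std::move(pending);
  pending.clear();  // leave the moved-from queue in a defined, empty state
  for (const auto &cb : batch) {
    cb();  // may call Enqueue() and grow `pending` again
  }
}

void Enqueue(Callback cb) { pending.push_back(std::move(cb)); }

int main() {
  Enqueue([] {
    std::cout << "first write acknowledged\n";
    // Re-entrant enqueue, e.g. a child task flushed from the ack handler.
    Enqueue([] { std::cout << "second write acknowledged\n"; });
  });
  Drain();  // runs only the first batch
  Drain();  // runs the batch produced by the first acknowledgement
}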