[Issues 2403][xray] Fix raylet performance issues on scheduling queue (#2438)

* merge from ray
* Revert "merge from ray"
This reverts commit 32b181ebbb1fa184026631e1a7368112c4c3118d.
* fix raylet performance regression
* address comments
* Update code after merging latest changes
* fix lint
* address comments
This commit is contained in:
Zhijun Fu 2018-08-02 05:41:20 +08:00 committed by Alexey Tumanov
parent 89f60e39f3
commit ca36827f01
2 changed files with 116 additions and 26 deletions

View file

@ -6,15 +6,12 @@ namespace {
// Helper function to remove tasks in the given set of task_ids from a
// queue, and append them to the given vector removed_tasks.
void RemoveTasksFromQueue(std::list<ray::raylet::Task> &queue,
void RemoveTasksFromQueue(ray::raylet::SchedulingQueue::TaskQueue &queue,
std::unordered_set<ray::TaskID> &task_ids,
std::vector<ray::raylet::Task> &removed_tasks) {
for (auto it = queue.begin(); it != queue.end();) {
auto task_id = task_ids.find(it->GetTaskSpecification().TaskId());
if (task_id != task_ids.end()) {
task_ids.erase(task_id);
removed_tasks.push_back(std::move(*it));
it = queue.erase(it);
for (auto it = task_ids.begin(); it != task_ids.end();) {
if (queue.RemoveTask(*it, removed_tasks)) {
it = task_ids.erase(it);
} else {
it++;
}
@ -22,19 +19,22 @@ void RemoveTasksFromQueue(std::list<ray::raylet::Task> &queue,
}
// Helper function to queue the given tasks to the given queue.
inline void QueueTasks(std::list<ray::raylet::Task> &queue,
inline void QueueTasks(ray::raylet::SchedulingQueue::TaskQueue &queue,
const std::vector<ray::raylet::Task> &tasks) {
queue.insert(queue.end(), tasks.begin(), tasks.end());
for (const auto &task : tasks) {
queue.AppendTask(task.GetTaskSpecification().TaskId(), task);
}
}
// Helper function to filter out tasks of a given state.
inline void FilterStateFromQueue(const std::list<ray::raylet::Task> &queue,
inline void FilterStateFromQueue(const ray::raylet::SchedulingQueue::TaskQueue &queue,
std::unordered_set<ray::TaskID> &task_ids,
ray::raylet::TaskState filter_state) {
for (auto it = queue.begin(); it != queue.end(); it++) {
auto task_id = task_ids.find(it->GetTaskSpecification().TaskId());
if (task_id != task_ids.end()) {
task_ids.erase(task_id);
for (auto it = task_ids.begin(); it != task_ids.end();) {
if (queue.HasTask(*it)) {
it = task_ids.erase(it);
} else {
it++;
}
}
}
@ -45,28 +45,72 @@ namespace ray {
namespace raylet {
SchedulingQueue::TaskQueue::~TaskQueue() {
task_map_.clear();
task_list_.clear();
}
bool SchedulingQueue::TaskQueue::AppendTask(const TaskID &task_id, const Task &task) {
RAY_CHECK(task_map_.find(task_id) == task_map_.end());
auto list_iterator = task_list_.insert(task_list_.end(), task);
task_map_[task_id] = list_iterator;
return true;
}
bool SchedulingQueue::TaskQueue::RemoveTask(const TaskID &task_id) {
auto task_found_iterator = task_map_.find(task_id);
if (task_found_iterator == task_map_.end()) {
return false;
}
auto list_iterator = task_found_iterator->second;
task_map_.erase(task_found_iterator);
task_list_.erase(list_iterator);
return true;
}
bool SchedulingQueue::TaskQueue::RemoveTask(const TaskID &task_id,
std::vector<Task> &removed_tasks) {
auto task_found_iterator = task_map_.find(task_id);
if (task_found_iterator == task_map_.end()) {
return false;
}
auto list_iterator = task_found_iterator->second;
removed_tasks.push_back(std::move(*list_iterator));
task_map_.erase(task_found_iterator);
task_list_.erase(list_iterator);
return true;
}
bool SchedulingQueue::TaskQueue::HasTask(const TaskID &task_id) const {
return task_map_.find(task_id) != task_map_.end();
}
const std::list<Task> &SchedulingQueue::TaskQueue::GetTasks() const { return task_list_; }
const std::list<Task> &SchedulingQueue::GetMethodsWaitingForActorCreation() const {
return this->methods_waiting_for_actor_creation_;
return this->methods_waiting_for_actor_creation_.GetTasks();
}
const std::list<Task> &SchedulingQueue::GetWaitingTasks() const {
return this->waiting_tasks_;
return this->waiting_tasks_.GetTasks();
}
const std::list<Task> &SchedulingQueue::GetPlaceableTasks() const {
return this->placeable_tasks_;
return this->placeable_tasks_.GetTasks();
}
const std::list<Task> &SchedulingQueue::GetReadyTasks() const {
return this->ready_tasks_;
return this->ready_tasks_.GetTasks();
}
const std::list<Task> &SchedulingQueue::GetRunningTasks() const {
return this->running_tasks_;
return this->running_tasks_.GetTasks();
}
const std::list<Task> &SchedulingQueue::GetBlockedTasks() const {
return this->blocked_tasks_;
return this->blocked_tasks_.GetTasks();
}
void SchedulingQueue::FilterState(std::unordered_set<TaskID> &task_ids,

View file

@ -147,21 +147,67 @@ class SchedulingQueue {
/// \param filter_state The task state to filter out.
void FilterState(std::unordered_set<TaskID> &task_ids, TaskState filter_state) const;
class TaskQueue {
public:
/// Creating a task queue.
TaskQueue() {}
/// Destructor for task queue.
~TaskQueue();
/// \brief Append a task to queue.
///
/// \param task_id The task ID for the task to append.
/// \param task The task to append to the queue.
/// \return Whether the append operation succeeds.
bool AppendTask(const TaskID &task_id, const Task &task);
/// \brief Remove a task from queue.
///
/// \param task_id The task ID for the task to remove from the queue.
/// \return Whether the removal succeeds.
bool RemoveTask(const TaskID &task_id);
/// \brief Remove a task from queue.
///
/// \param task_id The task ID for the task to remove from the queue.
/// \param removed_tasks If the task specified by task_id is successfully
// removed from the queue, the task data is appended to the vector.
/// \return Whether the removal succeeds.
bool RemoveTask(const TaskID &task_id, std::vector<Task> &removed_tasks);
/// \brief Check if the queue contains a specific task id.
///
/// \param task_id The task ID for the task.
/// \return Whether the task_id exists in this queue.
bool HasTask(const TaskID &task_id) const;
/// \brief Remove the task list of the queue.
/// \return A list of tasks contained in this queue.
const std::list<Task> &GetTasks() const;
private:
// A list of tasks.
std::list<Task> task_list_;
// A hash to speed up looking up a task.
std::unordered_map<TaskID, std::list<Task>::iterator> task_map_;
};
private:
/// Tasks that are destined for actors that have not yet been created.
std::list<Task> methods_waiting_for_actor_creation_;
TaskQueue methods_waiting_for_actor_creation_;
/// Tasks that are waiting for an object dependency to appear locally.
std::list<Task> waiting_tasks_;
TaskQueue waiting_tasks_;
/// Tasks whose object dependencies are locally available, but that are
/// waiting to be scheduled.
std::list<Task> placeable_tasks_;
TaskQueue placeable_tasks_;
/// Tasks ready for dispatch, but that are waiting for a worker.
std::list<Task> ready_tasks_;
TaskQueue ready_tasks_;
/// Tasks that are running on a worker.
std::list<Task> running_tasks_;
TaskQueue running_tasks_;
/// Tasks that were dispatched to a worker but are blocked on a data
/// dependency that was missing at runtime.
std::list<Task> blocked_tasks_;
TaskQueue blocked_tasks_;
/// The set of currently running driver tasks. These are empty tasks that are
/// started by a driver process on initialization.
std::unordered_set<TaskID> driver_task_ids_;