mirror of
https://github.com/vale981/ray
synced 2025-03-06 10:31:39 -05:00
Remove uses of std::list::size (#3358)
* worker pool and client conn * Fix linting * unordered set * move
This commit is contained in:
parent
c24d87b4d1
commit
686cf20951
4 changed files with 15 additions and 22 deletions
|
@ -112,7 +112,7 @@ void ServerConnection<T>::WriteMessageAsync(
|
|||
|
||||
auto size = async_write_queue_.size();
|
||||
auto size_is_power_of_two = (size & (size - 1)) == 0;
|
||||
if (size > 100 && size_is_power_of_two) {
|
||||
if (size > 1000 && size_is_power_of_two) {
|
||||
RAY_LOG(WARNING) << "ServerConnection has " << size << " buffered async writes";
|
||||
}
|
||||
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
#ifndef RAY_COMMON_CLIENT_CONNECTION_H
|
||||
#define RAY_COMMON_CLIENT_CONNECTION_H
|
||||
|
||||
#include <list>
|
||||
#include <deque>
|
||||
#include <memory>
|
||||
|
||||
#include <boost/asio.hpp>
|
||||
|
@ -94,7 +94,7 @@ class ServerConnection : public std::enable_shared_from_this<ServerConnection<T>
|
|||
const int async_write_max_messages_;
|
||||
|
||||
/// List of pending messages to write.
|
||||
std::list<std::unique_ptr<AsyncWriteBuffer>> async_write_queue_;
|
||||
std::deque<std::unique_ptr<AsyncWriteBuffer>> async_write_queue_;
|
||||
|
||||
/// Whether we are in the middle of an async write.
|
||||
bool async_write_in_flight_;
|
||||
|
|
|
@ -12,7 +12,7 @@ namespace {
|
|||
|
||||
// A helper function to get a worker from a list.
|
||||
std::shared_ptr<ray::raylet::Worker> GetWorker(
|
||||
const std::list<std::shared_ptr<ray::raylet::Worker>> &worker_pool,
|
||||
const std::unordered_set<std::shared_ptr<ray::raylet::Worker>> &worker_pool,
|
||||
const std::shared_ptr<ray::LocalClientConnection> &connection) {
|
||||
for (auto it = worker_pool.begin(); it != worker_pool.end(); it++) {
|
||||
if ((*it)->Connection() == connection) {
|
||||
|
@ -24,15 +24,9 @@ std::shared_ptr<ray::raylet::Worker> GetWorker(
|
|||
|
||||
// A helper function to remove a worker from a list. Returns true if the worker
|
||||
// was found and removed.
|
||||
bool RemoveWorker(std::list<std::shared_ptr<ray::raylet::Worker>> &worker_pool,
|
||||
bool RemoveWorker(std::unordered_set<std::shared_ptr<ray::raylet::Worker>> &worker_pool,
|
||||
const std::shared_ptr<ray::raylet::Worker> &worker) {
|
||||
for (auto it = worker_pool.begin(); it != worker_pool.end(); it++) {
|
||||
if (*it == worker) {
|
||||
worker_pool.erase(it);
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
return worker_pool.erase(worker) > 0;
|
||||
}
|
||||
|
||||
} // namespace
|
||||
|
@ -152,7 +146,7 @@ void WorkerPool::RegisterWorker(std::shared_ptr<Worker> worker) {
|
|||
auto pid = worker->Pid();
|
||||
RAY_LOG(DEBUG) << "Registering worker with pid " << pid;
|
||||
auto &state = GetStateForLanguage(worker->GetLanguage());
|
||||
state.registered_workers.push_back(std::move(worker));
|
||||
state.registered_workers.insert(std::move(worker));
|
||||
|
||||
auto it = starting_worker_processes_.find(pid);
|
||||
RAY_CHECK(it != starting_worker_processes_.end());
|
||||
|
@ -165,7 +159,7 @@ void WorkerPool::RegisterWorker(std::shared_ptr<Worker> worker) {
|
|||
void WorkerPool::RegisterDriver(std::shared_ptr<Worker> driver) {
|
||||
RAY_CHECK(!driver->GetAssignedTaskId().is_nil());
|
||||
auto &state = GetStateForLanguage(driver->GetLanguage());
|
||||
state.registered_drivers.push_back(driver);
|
||||
state.registered_drivers.insert(std::move(driver));
|
||||
}
|
||||
|
||||
std::shared_ptr<Worker> WorkerPool::GetRegisteredWorker(
|
||||
|
@ -197,7 +191,7 @@ void WorkerPool::PushWorker(std::shared_ptr<Worker> worker) {
|
|||
auto &state = GetStateForLanguage(worker->GetLanguage());
|
||||
// Add the worker to the idle pool.
|
||||
if (worker->GetActorId().is_nil()) {
|
||||
state.idle.push_back(std::move(worker));
|
||||
state.idle.insert(std::move(worker));
|
||||
} else {
|
||||
state.idle_actor[worker->GetActorId()] = std::move(worker);
|
||||
}
|
||||
|
@ -209,8 +203,8 @@ std::shared_ptr<Worker> WorkerPool::PopWorker(const TaskSpecification &task_spec
|
|||
std::shared_ptr<Worker> worker = nullptr;
|
||||
if (actor_id.is_nil()) {
|
||||
if (!state.idle.empty()) {
|
||||
worker = std::move(state.idle.back());
|
||||
state.idle.pop_back();
|
||||
worker = std::move(*state.idle.begin());
|
||||
state.idle.erase(state.idle.begin());
|
||||
}
|
||||
} else {
|
||||
auto actor_entry = state.idle_actor.find(actor_id);
|
||||
|
|
|
@ -2,9 +2,9 @@
|
|||
#define RAY_RAYLET_WORKER_POOL_H
|
||||
|
||||
#include <inttypes.h>
|
||||
#include <list>
|
||||
#include <unordered_map>
|
||||
#include <unordered_set>
|
||||
#include <vector>
|
||||
|
||||
#include "ray/common/client_connection.h"
|
||||
#include "ray/gcs/format/util.h"
|
||||
|
@ -136,15 +136,14 @@ class WorkerPool {
|
|||
/// The commands and arguments used to start the worker process
|
||||
std::vector<std::string> worker_command;
|
||||
/// The pool of idle non-actor workers.
|
||||
std::list<std::shared_ptr<Worker>> idle;
|
||||
std::unordered_set<std::shared_ptr<Worker>> idle;
|
||||
/// The pool of idle actor workers.
|
||||
std::unordered_map<ActorID, std::shared_ptr<Worker>> idle_actor;
|
||||
/// All workers that have registered and are still connected, including both
|
||||
/// idle and executing.
|
||||
// TODO(swang): Make this a map to make GetRegisteredWorker faster.
|
||||
std::list<std::shared_ptr<Worker>> registered_workers;
|
||||
std::unordered_set<std::shared_ptr<Worker>> registered_workers;
|
||||
/// All drivers that have registered and are still connected.
|
||||
std::list<std::shared_ptr<Worker>> registered_drivers;
|
||||
std::unordered_set<std::shared_ptr<Worker>> registered_drivers;
|
||||
};
|
||||
|
||||
/// A helper function that returns the reference of the pool state
|
||||
|
|
Loading…
Add table
Reference in a new issue