[Threaded actor] Fix threaded actor race condition (#19751)

This commit is contained in:
SangBin Cho 2021-10-27 07:17:53 +09:00 committed by GitHub
parent 2652ae7905
commit 3e81506d90
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 86 additions and 36 deletions

View file

@ -85,6 +85,7 @@ void CoreWorkerProcess::Initialize(const CoreWorkerOptions &options) {
}
void CoreWorkerProcess::Shutdown() {
RAY_LOG(DEBUG) << "Shutdown. Core worker process will be deleted";
if (!core_worker_process) {
return;
}
@ -291,7 +292,9 @@ CoreWorker &CoreWorkerProcess::GetCoreWorker() {
void CoreWorkerProcess::SetCurrentThreadWorkerId(const WorkerID &worker_id) {
EnsureInitialized();
if (core_worker_process->options_.num_workers == 1) {
RAY_CHECK(core_worker_process->GetGlobalWorker()->GetWorkerID() == worker_id);
auto global_worker = core_worker_process->GetGlobalWorker();
RAY_CHECK(global_worker) << "Global worker must not be NULL.";
RAY_CHECK(global_worker->GetWorkerID() == worker_id);
return;
}
current_core_worker_ = core_worker_process->GetWorker(worker_id);
@ -354,6 +357,7 @@ void CoreWorkerProcess::RunTaskExecutionLoop() {
worker = core_worker_process->CreateWorker();
}
worker->RunTaskExecutionLoop();
RAY_LOG(DEBUG) << "Task execution loop terminated. Removing the global worker.";
core_worker_process->RemoveWorker(worker);
} else {
std::vector<std::thread> worker_threads;
@ -362,6 +366,8 @@ void CoreWorkerProcess::RunTaskExecutionLoop() {
SetThreadName("worker.task" + std::to_string(i));
auto worker = core_worker_process->CreateWorker();
worker->RunTaskExecutionLoop();
RAY_LOG(INFO) << "Task execution loop terminated for a thread "
<< std::to_string(i) << ". Removing a worker.";
core_worker_process->RemoveWorker(worker);
});
}
@ -768,6 +774,7 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_
void CoreWorker::Shutdown() {
io_service_.stop();
if (options_.worker_type == WorkerType::WORKER) {
direct_task_receiver_->Stop();
task_execution_service_.stop();
}
if (options_.on_worker_shutdown) {

View file

@ -636,5 +636,11 @@ void CoreWorkerDirectTaskReceiver::SetMaxActorConcurrency(bool is_asyncio,
fiber_max_concurrency_ = fiber_max_concurrency;
}
void CoreWorkerDirectTaskReceiver::Stop() {
for (const auto &it : actor_scheduling_queues_) {
it.second->Stop();
}
}
} // namespace core
} // namespace ray

View file

@ -346,7 +346,45 @@ class FiberStateManager final {
std::shared_ptr<FiberState> default_fiber_ = nullptr;
};
class BoundedExecutor;
/// Wraps a thread-pool to block posts until the pool has free slots. This is used
/// by the SchedulingQueue to provide backpressure to clients.
class BoundedExecutor {
public:
BoundedExecutor(int max_concurrency)
: num_running_(0), max_concurrency_(max_concurrency), pool_(max_concurrency){};
/// Posts work to the pool, blocking if no free threads are available.
void PostBlocking(std::function<void()> fn) {
mu_.LockWhen(absl::Condition(this, &BoundedExecutor::ThreadsAvailable));
num_running_ += 1;
mu_.Unlock();
boost::asio::post(pool_, [this, fn]() {
fn();
absl::MutexLock lock(&mu_);
num_running_ -= 1;
});
}
/// Stop the thread pool.
void Stop() { pool_.stop(); }
/// Join the thread pool.
void Join() { pool_.join(); }
private:
bool ThreadsAvailable() EXCLUSIVE_LOCKS_REQUIRED(mu_) {
return num_running_ < max_concurrency_;
}
/// Protects access to the counters below.
absl::Mutex mu_;
/// The number of currently running tasks.
int num_running_ GUARDED_BY(mu_);
/// The max number of concurrently running tasks allowed.
const int max_concurrency_;
/// The underlying thread pool for running tasks.
boost::asio::thread_pool pool_;
};
/// A manager that manages a set of thread pool. which will perform
/// the methods defined in one concurrency group.
@ -390,6 +428,26 @@ class PoolManager final {
return default_thread_pool_;
}
/// Stop and join the thread pools that the pool manager owns.
void Stop() {
if (default_thread_pool_) {
RAY_LOG(DEBUG) << "Default pool is stopping.";
default_thread_pool_->Stop();
RAY_LOG(INFO) << "Default pool is joining. If the 'Default pool is joined.' "
"message is not printed after this, the worker is probably "
"hanging because the actor task is running an infinite loop.";
default_thread_pool_->Join();
RAY_LOG(INFO) << "Default pool is joined.";
}
for (const auto &it : name_to_thread_pool_index_) {
it.second->Stop();
}
for (const auto &it : name_to_thread_pool_index_) {
it.second->Join();
}
}
private:
// Map from the name to their corresponding thread pools.
std::unordered_map<std::string, std::shared_ptr<BoundedExecutor>>
@ -489,40 +547,6 @@ class DependencyWaiterImpl : public DependencyWaiter {
DependencyWaiterInterface &dependency_client_;
};
/// Wraps a thread-pool to block posts until the pool has free slots. This is used
/// by the SchedulingQueue to provide backpressure to clients.
class BoundedExecutor {
public:
BoundedExecutor(int max_concurrency)
: num_running_(0), max_concurrency_(max_concurrency), pool_(max_concurrency){};
/// Posts work to the pool, blocking if no free threads are available.
void PostBlocking(std::function<void()> fn) {
mu_.LockWhen(absl::Condition(this, &BoundedExecutor::ThreadsAvailable));
num_running_ += 1;
mu_.Unlock();
boost::asio::post(pool_, [this, fn]() {
fn();
absl::MutexLock lock(&mu_);
num_running_ -= 1;
});
}
private:
bool ThreadsAvailable() EXCLUSIVE_LOCKS_REQUIRED(mu_) {
return num_running_ < max_concurrency_;
}
/// Protects access to the counters below.
absl::Mutex mu_;
/// The number of currently running tasks.
int num_running_ GUARDED_BY(mu_);
/// The max number of concurrently running tasks allowed.
const int max_concurrency_;
/// The underlying thread pool for running tasks.
boost::asio::thread_pool pool_;
};
/// Used to implement task queueing at the worker. Abstraction to provide a common
/// interface for actor tasks as well as normal ones.
class SchedulingQueue {
@ -540,6 +564,7 @@ class SchedulingQueue {
virtual bool TaskQueueEmpty() const = 0;
virtual size_t Size() const = 0;
virtual size_t Steal(rpc::StealTasksReply *reply) = 0;
virtual void Stop() = 0;
virtual bool CancelTaskIfFound(TaskID task_id) = 0;
virtual ~SchedulingQueue(){};
};
@ -576,6 +601,12 @@ class ActorSchedulingQueue : public SchedulingQueue {
virtual ~ActorSchedulingQueue() = default;
void Stop() {
if (pool_manager_) {
pool_manager_->Stop();
}
}
bool TaskQueueEmpty() const {
RAY_CHECK(false) << "TaskQueueEmpty() not implemented for actor queues";
// The return instruction will never be executed, but we need to include it
@ -747,6 +778,10 @@ class NormalSchedulingQueue : public SchedulingQueue {
public:
NormalSchedulingQueue(){};
void Stop() {
// No-op
}
bool TaskQueueEmpty() const {
absl::MutexLock lock(&mu_);
return pending_normal_tasks_.empty();
@ -896,6 +931,8 @@ class CoreWorkerDirectTaskReceiver {
bool CancelQueuedNormalTask(TaskID task_id);
void Stop();
protected:
/// Cache the concurrency groups of actors.
absl::flat_hash_map<ActorID, std::vector<ConcurrencyGroup>> concurrency_groups_cache_;