[Core] add thread name to help performance profiling (#13506)

This commit is contained in:
ZhuSenlin 2021-01-20 20:34:28 +08:00 committed by GitHub
parent 6c23bef2a7
commit 2e7c2b774f
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
10 changed files with 25 additions and 5 deletions

View file

@ -270,7 +270,8 @@ void CoreWorkerProcess::RunTaskExecutionLoop() {
} else {
std::vector<std::thread> worker_threads;
for (int i = 0; i < instance_->options_.num_workers; i++) {
worker_threads.emplace_back([]() {
worker_threads.emplace_back([i] {
SetThreadName("worker.task" + std::to_string(i));
auto worker = instance_->CreateWorker();
worker->RunTaskExecutionLoop();
instance_->RemoveWorker(worker);
@ -649,7 +650,7 @@ void CoreWorker::RunIOService() {
sigaddset(&mask, SIGTERM);
pthread_sigmask(SIG_BLOCK, &mask, NULL);
#endif
SetThreadName("worker.io");
io_service_.run();
}

View file

@ -38,6 +38,7 @@ GlobalStateAccessor::GlobalStateAccessor(const std::string &redis_address,
std::promise<bool> promise;
thread_io_service_.reset(new std::thread([this, &promise] {
SetThreadName("global.accessor");
std::unique_ptr<boost::asio::io_service::work> work(
new boost::asio::io_service::work(*io_service_));
promise.set_value(true);

View file

@ -28,6 +28,7 @@ GcsHeartbeatManager::GcsHeartbeatManager(
num_heartbeats_timeout_(RayConfig::instance().num_heartbeats_timeout()),
detect_timer_(io_service) {
io_service_thread_.reset(new std::thread([this] {
SetThreadName("heartbeat");
/// The asio work to keep io_service_ alive.
boost::asio::io_service::work io_service_work_(io_service_);
io_service_.run();

View file

@ -128,12 +128,15 @@ bool ObjectManager::IsPlasmaObjectSpillable(const ObjectID &object_id) {
return false;
}
void ObjectManager::RunRpcService() { rpc_service_.run(); }
void ObjectManager::RunRpcService(int index) {
SetThreadName("rpc.obj.mgr." + std::to_string(index));
rpc_service_.run();
}
void ObjectManager::StartRpcService() {
rpc_threads_.resize(config_.rpc_service_threads_number);
for (int i = 0; i < config_.rpc_service_threads_number; i++) {
rpc_threads_[i] = std::thread(&ObjectManager::RunRpcService, this);
rpc_threads_[i] = std::thread(&ObjectManager::RunRpcService, this, i);
}
object_manager_server_.RegisterService(object_manager_service_);
object_manager_server_.Run();

View file

@ -355,7 +355,7 @@ class ObjectManager : public ObjectManagerInterface,
/// Handle starting, running, and stopping asio rpc_service.
void StartRpcService();
void RunRpcService();
void RunRpcService(int index);
void StopRpcService();
/// Handle an object being added to this node. This adds the object to the

View file

@ -75,6 +75,7 @@ PlasmaStoreRunner::PlasmaStoreRunner(std::string socket_name, int64_t system_mem
void PlasmaStoreRunner::Start(ray::SpillObjectsCallback spill_objects_callback,
std::function<void()> object_store_full_callback) {
SetThreadName("store.io");
RAY_LOG(DEBUG) << "starting server listening on " << socket_name_;
{
absl::MutexLock lock(&store_runner_mutex_);

View file

@ -72,6 +72,7 @@ void AgentManager::StartAgent() {
}
std::thread monitor_thread([this, child]() mutable {
SetThreadName("agent.monitor");
RAY_LOG(INFO) << "Monitor agent process with pid " << child.GetId()
<< ", register timeout "
<< RayConfig::instance().agent_register_timeout_ms() << "ms.";

View file

@ -21,6 +21,7 @@
#include "absl/synchronization/mutex.h"
#include "ray/common/grpc_util.h"
#include "ray/common/status.h"
#include "ray/util/util.h"
namespace ray {
namespace rpc {
@ -226,6 +227,7 @@ class ClientCallManager {
/// `CompletionQueue`, and dispatches the event to the callbacks via the `ClientCall`
/// objects.
void PollEventsFromCompletionQueue(int index) {
SetThreadName("client.poll" + std::to_string(index));
void *got_tag;
bool ok = false;
// Keep reading events from the `CompletionQueue` until it's shutdown.

View file

@ -19,6 +19,7 @@
#include <boost/asio/detail/socket_holder.hpp>
#include "ray/common/ray_config.h"
#include "ray/util/util.h"
namespace ray {
namespace rpc {
@ -101,6 +102,7 @@ void GrpcServer::RegisterService(GrpcService &service) {
}
void GrpcServer::PollEventsFromCompletionQueue(int index) {
SetThreadName("server.poll" + std::to_string(index));
void *tag;
bool ok;

View file

@ -194,3 +194,11 @@ void FillRandom(T *data) {
(*data)[i] = static_cast<uint8_t>(dist(generator));
}
}
inline void SetThreadName(const std::string &thread_name) {
#if defined(__APPLE__)
pthread_setname_np(thread_name.c_str());
#elif defined(__linux__)
pthread_setname_np(pthread_self(), thread_name.substr(0, 15).c_str());
#endif
}