From 2e7c2b774f0397cecda52f777dbb550001e545af Mon Sep 17 00:00:00 2001 From: ZhuSenlin Date: Wed, 20 Jan 2021 20:34:28 +0800 Subject: [PATCH] [Core] add thread name to help performance profiling (#13506) --- src/ray/core_worker/core_worker.cc | 5 +++-- src/ray/gcs/gcs_client/global_state_accessor.cc | 1 + src/ray/gcs/gcs_server/gcs_heartbeat_manager.cc | 1 + src/ray/object_manager/object_manager.cc | 7 +++++-- src/ray/object_manager/object_manager.h | 2 +- src/ray/object_manager/plasma/store_runner.cc | 1 + src/ray/raylet/agent_manager.cc | 1 + src/ray/rpc/client_call.h | 2 ++ src/ray/rpc/grpc_server.cc | 2 ++ src/ray/util/util.h | 8 ++++++++ 10 files changed, 25 insertions(+), 5 deletions(-) diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index ea5c8b4f7..21fc462a7 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -270,7 +270,8 @@ void CoreWorkerProcess::RunTaskExecutionLoop() { } else { std::vector worker_threads; for (int i = 0; i < instance_->options_.num_workers; i++) { - worker_threads.emplace_back([]() { + worker_threads.emplace_back([i] { + SetThreadName("worker.task" + std::to_string(i)); auto worker = instance_->CreateWorker(); worker->RunTaskExecutionLoop(); instance_->RemoveWorker(worker); @@ -649,7 +650,7 @@ void CoreWorker::RunIOService() { sigaddset(&mask, SIGTERM); pthread_sigmask(SIG_BLOCK, &mask, NULL); #endif - + SetThreadName("worker.io"); io_service_.run(); } diff --git a/src/ray/gcs/gcs_client/global_state_accessor.cc b/src/ray/gcs/gcs_client/global_state_accessor.cc index 947c16b96..4e9a6fa18 100644 --- a/src/ray/gcs/gcs_client/global_state_accessor.cc +++ b/src/ray/gcs/gcs_client/global_state_accessor.cc @@ -38,6 +38,7 @@ GlobalStateAccessor::GlobalStateAccessor(const std::string &redis_address, std::promise promise; thread_io_service_.reset(new std::thread([this, &promise] { + SetThreadName("global.accessor"); std::unique_ptr work( new boost::asio::io_service::work(*io_service_)); promise.set_value(true); diff --git a/src/ray/gcs/gcs_server/gcs_heartbeat_manager.cc b/src/ray/gcs/gcs_server/gcs_heartbeat_manager.cc index 64806982b..b6dd56945 100644 --- a/src/ray/gcs/gcs_server/gcs_heartbeat_manager.cc +++ b/src/ray/gcs/gcs_server/gcs_heartbeat_manager.cc @@ -28,6 +28,7 @@ GcsHeartbeatManager::GcsHeartbeatManager( num_heartbeats_timeout_(RayConfig::instance().num_heartbeats_timeout()), detect_timer_(io_service) { io_service_thread_.reset(new std::thread([this] { + SetThreadName("heartbeat"); /// The asio work to keep io_service_ alive. boost::asio::io_service::work io_service_work_(io_service_); io_service_.run(); diff --git a/src/ray/object_manager/object_manager.cc b/src/ray/object_manager/object_manager.cc index b69085682..d82a5fb0d 100644 --- a/src/ray/object_manager/object_manager.cc +++ b/src/ray/object_manager/object_manager.cc @@ -128,12 +128,15 @@ bool ObjectManager::IsPlasmaObjectSpillable(const ObjectID &object_id) { return false; } -void ObjectManager::RunRpcService() { rpc_service_.run(); } +void ObjectManager::RunRpcService(int index) { + SetThreadName("rpc.obj.mgr." + std::to_string(index)); + rpc_service_.run(); +} void ObjectManager::StartRpcService() { rpc_threads_.resize(config_.rpc_service_threads_number); for (int i = 0; i < config_.rpc_service_threads_number; i++) { - rpc_threads_[i] = std::thread(&ObjectManager::RunRpcService, this); + rpc_threads_[i] = std::thread(&ObjectManager::RunRpcService, this, i); } object_manager_server_.RegisterService(object_manager_service_); object_manager_server_.Run(); diff --git a/src/ray/object_manager/object_manager.h b/src/ray/object_manager/object_manager.h index a7970dc82..a114f16bc 100644 --- a/src/ray/object_manager/object_manager.h +++ b/src/ray/object_manager/object_manager.h @@ -355,7 +355,7 @@ class ObjectManager : public ObjectManagerInterface, /// Handle starting, running, and stopping asio rpc_service. void StartRpcService(); - void RunRpcService(); + void RunRpcService(int index); void StopRpcService(); /// Handle an object being added to this node. This adds the object to the diff --git a/src/ray/object_manager/plasma/store_runner.cc b/src/ray/object_manager/plasma/store_runner.cc index 723213bac..34e08080c 100644 --- a/src/ray/object_manager/plasma/store_runner.cc +++ b/src/ray/object_manager/plasma/store_runner.cc @@ -75,6 +75,7 @@ PlasmaStoreRunner::PlasmaStoreRunner(std::string socket_name, int64_t system_mem void PlasmaStoreRunner::Start(ray::SpillObjectsCallback spill_objects_callback, std::function object_store_full_callback) { + SetThreadName("store.io"); RAY_LOG(DEBUG) << "starting server listening on " << socket_name_; { absl::MutexLock lock(&store_runner_mutex_); diff --git a/src/ray/raylet/agent_manager.cc b/src/ray/raylet/agent_manager.cc index 7445c7034..538f8909e 100644 --- a/src/ray/raylet/agent_manager.cc +++ b/src/ray/raylet/agent_manager.cc @@ -72,6 +72,7 @@ void AgentManager::StartAgent() { } std::thread monitor_thread([this, child]() mutable { + SetThreadName("agent.monitor"); RAY_LOG(INFO) << "Monitor agent process with pid " << child.GetId() << ", register timeout " << RayConfig::instance().agent_register_timeout_ms() << "ms."; diff --git a/src/ray/rpc/client_call.h b/src/ray/rpc/client_call.h index 9e834e160..aea10b348 100644 --- a/src/ray/rpc/client_call.h +++ b/src/ray/rpc/client_call.h @@ -21,6 +21,7 @@ #include "absl/synchronization/mutex.h" #include "ray/common/grpc_util.h" #include "ray/common/status.h" +#include "ray/util/util.h" namespace ray { namespace rpc { @@ -226,6 +227,7 @@ class ClientCallManager { /// `CompletionQueue`, and dispatches the event to the callbacks via the `ClientCall` /// objects. void PollEventsFromCompletionQueue(int index) { + SetThreadName("client.poll" + std::to_string(index)); void *got_tag; bool ok = false; // Keep reading events from the `CompletionQueue` until it's shutdown. diff --git a/src/ray/rpc/grpc_server.cc b/src/ray/rpc/grpc_server.cc index cdf676ffb..a932cefa7 100644 --- a/src/ray/rpc/grpc_server.cc +++ b/src/ray/rpc/grpc_server.cc @@ -19,6 +19,7 @@ #include #include "ray/common/ray_config.h" +#include "ray/util/util.h" namespace ray { namespace rpc { @@ -101,6 +102,7 @@ void GrpcServer::RegisterService(GrpcService &service) { } void GrpcServer::PollEventsFromCompletionQueue(int index) { + SetThreadName("server.poll" + std::to_string(index)); void *tag; bool ok; diff --git a/src/ray/util/util.h b/src/ray/util/util.h index 763119861..4c7c59471 100644 --- a/src/ray/util/util.h +++ b/src/ray/util/util.h @@ -194,3 +194,11 @@ void FillRandom(T *data) { (*data)[i] = static_cast(dist(generator)); } } + +inline void SetThreadName(const std::string &thread_name) { +#if defined(__APPLE__) + pthread_setname_np(thread_name.c_str()); +#elif defined(__linux__) + pthread_setname_np(pthread_self(), thread_name.substr(0, 15).c_str()); +#endif +}