[Core]Add metrics: worker_register_time_ms (#20472)

Recently I have been benchmarking worker registration with workers running in containers. Currently, Ray core has the `process_startup_time_ms` metric, which measures process fork time.

This PR adds a metric for the duration of worker registration.
This commit is contained in:
chenk008 2021-12-09 21:25:49 +08:00 committed by GitHub
parent 63db0e3a7c
commit 8bb9bfe632
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 15 additions and 3 deletions

View file

@ -29,6 +29,9 @@
#include "ray/util/logging.h"
#include "ray/util/util.h"
// Histogram metric recording, in milliseconds, the time elapsed between the
// start time stored for a starting worker process and the point at which
// WorkerPool::RegisterWorker handles that worker's registration request.
// NOTE(review): the list {1, 10, 100, 1000, 10000} is presumably the histogram
// bucket boundaries in ms, and the empty () presumably the tag keys -- confirm
// against the DEFINE_stats macro definition.
DEFINE_stats(worker_register_time_ms, "end to end latency of register a worker process.",
(), ({1, 10, 100, 1000, 10000}, ), ray::stats::HISTOGRAM);
namespace {
// A helper function to get a worker from a list.
@ -417,7 +420,8 @@ Process WorkerPool::StartWorkerProcess(
worker_type);
state.starting_worker_processes.emplace(
worker_startup_token_counter_,
StartingWorkerProcessInfo{workers_to_start, workers_to_start, worker_type, proc});
StartingWorkerProcessInfo{workers_to_start, workers_to_start, worker_type, proc,
start});
update_worker_startup_token_counter();
if (IsIOWorkerType(worker_type)) {
auto &io_worker_state = GetIOWorkerStateFromWorkerType(worker_type, state);
@ -588,7 +592,8 @@ Status WorkerPool::RegisterWorker(const std::shared_ptr<WorkerInterface> &worker
std::function<void(Status, int)> send_reply_callback) {
RAY_CHECK(worker);
auto &state = GetStateForLanguage(worker->GetLanguage());
if (state.starting_worker_processes.count(worker_startup_token) == 0) {
auto it = state.starting_worker_processes.find(worker_startup_token);
if (it == state.starting_worker_processes.end()) {
RAY_LOG(WARNING)
<< "Received a register request from an unknown worker shim process: "
<< worker_shim_pid;
@ -609,8 +614,13 @@ Status WorkerPool::RegisterWorker(const std::shared_ptr<WorkerInterface> &worker
send_reply_callback(status, /*port=*/0);
return status;
}
auto &starting_process_info = it->second;
auto end = std::chrono::high_resolution_clock::now();
auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(
end - starting_process_info.start_time);
STATS_worker_register_time_ms.Record(duration.count());
RAY_LOG(DEBUG) << "Registering worker " << worker->WorkerId() << " with pid " << pid
<< ", port: " << port
<< ", port: " << port << ", register cost: " << duration.count()
<< ", worker_type: " << rpc::WorkerType_Name(worker->GetWorkerType());
worker->SetAssignedPort(port);

View file

@ -461,6 +461,8 @@ class WorkerPool : public WorkerPoolInterface, public IOWorkerPoolInterface {
rpc::WorkerType worker_type;
/// The worker process instance.
Process proc;
/// The worker process start time.
std::chrono::high_resolution_clock::time_point start_time;
};
struct TaskWaitingForWorkerInfo {