// Patch summary (review preamble; placed ahead of the first `diff --git` header so no hunk
// line counts change -- NOTE(review): confirm your patch tooling skips leading text).
//
// Purpose: measure how long a worker process takes from being started to registering
// with the worker pool, and export it as a histogram stat.
//
// src/ray/raylet/worker_pool.cc
//   * Adds DEFINE_stats(worker_register_time_ms, ...) of kind ray::stats::HISTOGRAM with
//     bucket boundaries {1, 10, 100, 1000, 10000}.
//   * StartWorkerProcess: passes `start` as an extra trailing initializer of the
//     StartingWorkerProcessInfo stored in state.starting_worker_processes under
//     worker_startup_token_counter_. The declaration of `start` is outside this hunk --
//     presumably a high_resolution_clock time_point taken when the launch begins; confirm.
//   * RegisterWorker: replaces the count(worker_startup_token) == 0 test with find() so the
//     iterator can be reused. In the second hunk it reads it->second.start_time, subtracts
//     it from high_resolution_clock::now(), records duration.count() through
//     STATS_worker_register_time_ms.Record(), and appends ", register cost: " plus the same
//     count to the existing DEBUG log line.
//
// src/ray/raylet/worker_pool.h
//   * Adds a `start_time` member (std::chrono::high_resolution_clock::time_point) after
//     `proc` in a struct nested in WorkerPool -- presumably StartingWorkerProcessInfo, given
//     the five-element initializer in the .cc hunk; the struct name is not in the hunk.
//
// NOTE(review): `std::shared_ptr &worker`, `std::function send_reply_callback` and
// `std::chrono::duration_cast(` appear here without template arguments. This looks like
// angle-bracket content lost during extraction; the `_ms` suffix of the stat suggests
// std::chrono::milliseconds for the cast -- verify against the upstream patch.
// NOTE(review): the C++ standard does not require high_resolution_clock to be steady, so a
// wall-clock adjustment between start and registration could skew or negate the recorded
// value. std::chrono::steady_clock is the usual choice for intervals; switching would also
// require changing the (unseen) declaration of `start`.
diff --git a/src/ray/raylet/worker_pool.cc b/src/ray/raylet/worker_pool.cc index 02389b040..2332c36ce 100644 --- a/src/ray/raylet/worker_pool.cc +++ b/src/ray/raylet/worker_pool.cc @@ -29,6 +29,9 @@ #include "ray/util/logging.h" #include "ray/util/util.h" +DEFINE_stats(worker_register_time_ms, "end to end latency of register a worker process.", + (), ({1, 10, 100, 1000, 10000}, ), ray::stats::HISTOGRAM); + namespace { // A helper function to get a worker from a list. @@ -417,7 +420,8 @@ Process WorkerPool::StartWorkerProcess( worker_type); state.starting_worker_processes.emplace( worker_startup_token_counter_, - StartingWorkerProcessInfo{workers_to_start, workers_to_start, worker_type, proc}); + StartingWorkerProcessInfo{workers_to_start, workers_to_start, worker_type, proc, + start}); update_worker_startup_token_counter(); if (IsIOWorkerType(worker_type)) { auto &io_worker_state = GetIOWorkerStateFromWorkerType(worker_type, state); @@ -588,7 +592,8 @@ Status WorkerPool::RegisterWorker(const std::shared_ptr &worker std::function send_reply_callback) { RAY_CHECK(worker); auto &state = GetStateForLanguage(worker->GetLanguage()); - if (state.starting_worker_processes.count(worker_startup_token) == 0) { + auto it = state.starting_worker_processes.find(worker_startup_token); + if (it == state.starting_worker_processes.end()) { RAY_LOG(WARNING) << "Received a register request from an unknown worker shim process: " << worker_shim_pid; @@ -609,8 +614,13 @@ Status WorkerPool::RegisterWorker(const std::shared_ptr &worker send_reply_callback(status, /*port=*/0); return status; } + auto &starting_process_info = it->second; + auto end = std::chrono::high_resolution_clock::now(); + auto duration = std::chrono::duration_cast( + end - starting_process_info.start_time); + STATS_worker_register_time_ms.Record(duration.count()); RAY_LOG(DEBUG) << "Registering worker " << worker->WorkerId() << " with pid " << pid - << ", port: " << port + << ", port: " << port << ", register cost: " 
<< duration.count() << ", worker_type: " << rpc::WorkerType_Name(worker->GetWorkerType()); worker->SetAssignedPort(port); diff --git a/src/ray/raylet/worker_pool.h b/src/ray/raylet/worker_pool.h index 666058bd1..e46060d0c 100644 --- a/src/ray/raylet/worker_pool.h +++ b/src/ray/raylet/worker_pool.h @@ -461,6 +461,8 @@ class WorkerPool : public WorkerPoolInterface, public IOWorkerPoolInterface { rpc::WorkerType worker_type; /// The worker process instance. Process proc; + /// The worker process start time. + std::chrono::high_resolution_clock::time_point start_time; }; struct TaskWaitingForWorkerInfo {