diff --git a/src/ray/rpc/grpc_server.cc b/src/ray/rpc/grpc_server.cc index 7c69bee60..689d0c6f2 100644 --- a/src/ray/rpc/grpc_server.cc +++ b/src/ray/rpc/grpc_server.cc @@ -24,8 +24,8 @@ #include "ray/stats/metric.h" #include "ray/util/util.h" -DEFINE_stats(grpc_server_req_latency_ms, "Request latency in grpc server", ("Method"), (), - ray::stats::GAUGE); +DEFINE_stats(grpc_server_req_process_time_ms, "Request latency in grpc server", + ("Method"), (), ray::stats::GAUGE); DEFINE_stats(grpc_server_req_new, "New request number in grpc server", ("Method"), (), ray::stats::COUNT); DEFINE_stats(grpc_server_req_handling, "Request number are handling in grpc server", diff --git a/src/ray/rpc/server_call.h b/src/ray/rpc/server_call.h index 9e2d50e83..ed57f3cc0 100644 --- a/src/ray/rpc/server_call.h +++ b/src/ray/rpc/server_call.h @@ -24,7 +24,7 @@ #include "ray/common/status.h" #include "ray/stats/metric.h" -DECLARE_stats(grpc_server_req_latency_ms); +DECLARE_stats(grpc_server_req_process_time_ms); DECLARE_stats(grpc_server_req_new); DECLARE_stats(grpc_server_req_handling); DECLARE_stats(grpc_server_req_finished); @@ -145,26 +145,22 @@ class ServerCallImpl : public ServerCall { handle_request_function_(handle_request_function), response_writer_(&context_), io_service_(io_service), - call_name_(std::move(call_name)) { + call_name_(std::move(call_name)), + start_time_(0) { reply_ = google::protobuf::Arena::CreateMessage(&arena_); // TODO call_name_ sometimes get corrunpted due to memory issues. RAY_CHECK(!call_name_.empty()) << "Call name is empty"; STATS_grpc_server_req_new.Record(1.0, call_name_); - start_time_ = absl::GetCurrentTimeNanos(); } - ~ServerCallImpl() override { - STATS_grpc_server_req_finished.Record(1.0, call_name_); - auto end_time = absl::GetCurrentTimeNanos(); - STATS_grpc_server_req_latency_ms.Record((end_time - start_time_) / 1000000, - call_name_); - } + ~ServerCallImpl() override = default; ServerCallState GetState() const override { return state_; } void SetState(const ServerCallState &new_state) override { state_ = new_state; } void HandleRequest() override { + start_time_ = absl::GetCurrentTimeNanos(); STATS_grpc_server_req_handling.Record(1.0, call_name_); if (!io_service_.stopped()) { io_service_.post([this] { HandleRequestImpl(); }, call_name_); @@ -205,22 +201,32 @@ class ServerCallImpl : public ServerCall { } void OnReplySent() override { + STATS_grpc_server_req_finished.Record(1.0, call_name_); if (send_reply_success_callback_ && !io_service_.stopped()) { auto callback = std::move(send_reply_success_callback_); io_service_.post([callback]() { callback(); }, call_name_ + ".success_callback"); } + LogProcessTime(); } void OnReplyFailed() override { + STATS_grpc_server_req_finished.Record(1.0, call_name_); if (send_reply_failure_callback_ && !io_service_.stopped()) { auto callback = std::move(send_reply_failure_callback_); io_service_.post([callback]() { callback(); }, call_name_ + ".failure_callback"); } + LogProcessTime(); } const ServerCallFactory &GetServerCallFactory() override { return factory_; } private: + /// Log the duration this query used + void LogProcessTime() { + auto end_time = absl::GetCurrentTimeNanos(); + STATS_grpc_server_req_process_time_ms.Record((end_time - start_time_) / 1000000.0, + call_name_); + } /// Tell gRPC to finish this request and send reply asynchronously. void SendReply(const Status &status) { state_ = ServerCallState::SENDING_REPLY;