Add worker info handler to gcs service (#6798)

* add worker info handler

* rebase master

* add log

* remove unused variable

* fix code style
This commit is contained in:
fangfengbin 2020-01-16 22:35:00 +08:00 committed by Qing Wang
parent 05674c219f
commit e5ad4e6f8d
8 changed files with 154 additions and 0 deletions

View file

@ -6,6 +6,7 @@
#include "object_info_handler_impl.h"
#include "stats_handler_impl.h"
#include "task_info_handler_impl.h"
#include "worker_info_handler_impl.h"
namespace ray {
namespace gcs {
@ -55,6 +56,11 @@ void GcsServer::Start() {
new rpc::ErrorInfoGrpcService(main_service_, *error_info_handler_));
rpc_server_.RegisterService(*error_info_service_);
worker_info_handler_ = InitWorkerInfoHandler();
worker_info_service_.reset(
new rpc::WorkerInfoGrpcService(main_service_, *worker_info_handler_));
rpc_server_.RegisterService(*worker_info_service_);
// Run rpc server.
rpc_server_.Run();
@ -116,5 +122,10 @@ std::unique_ptr<rpc::ErrorInfoHandler> GcsServer::InitErrorInfoHandler() {
new rpc::DefaultErrorInfoHandler(*redis_gcs_client_));
}
std::unique_ptr<rpc::WorkerInfoHandler> GcsServer::InitWorkerInfoHandler() {
return std::unique_ptr<rpc::DefaultWorkerInfoHandler>(
new rpc::DefaultWorkerInfoHandler(*redis_gcs_client_));
}
} // namespace gcs
} // namespace ray

View file

@ -65,6 +65,9 @@ class GcsServer {
/// The error info handler
virtual std::unique_ptr<rpc::ErrorInfoHandler> InitErrorInfoHandler();
/// The worker info handler
virtual std::unique_ptr<rpc::WorkerInfoHandler> InitWorkerInfoHandler();
private:
/// Gcs server configuration
GcsServerConfig config_;
@ -93,6 +96,9 @@ class GcsServer {
/// Error info handler and service
std::unique_ptr<rpc::ErrorInfoHandler> error_info_handler_;
std::unique_ptr<rpc::ErrorInfoGrpcService> error_info_service_;
/// Worker info handler and service
std::unique_ptr<rpc::WorkerInfoHandler> worker_info_handler_;
std::unique_ptr<rpc::WorkerInfoGrpcService> worker_info_service_;
/// Backend client
std::shared_ptr<RedisGcsClient> redis_gcs_client_;
};

View file

@ -365,6 +365,17 @@ class GcsServerTest : public RedisServiceManagerForTest {
return WaitReady(promise.get_future(), timeout_ms_);
}
bool ReportWorkerFailure(const rpc::ReportWorkerFailureRequest &request) {
std::promise<bool> promise;
client_->ReportWorkerFailure(
request,
[&promise](const Status &status, const rpc::ReportWorkerFailureReply &reply) {
RAY_CHECK_OK(status);
promise.set_value(true);
});
return WaitReady(promise.get_future(), timeout_ms_);
}
bool WaitReady(const std::future<bool> &future, uint64_t timeout_ms) {
auto status = future.wait_for(std::chrono::milliseconds(timeout_ms));
return status == std::future_status::ready;
@ -633,6 +644,15 @@ TEST_F(GcsServerTest, TestErrorInfo) {
ASSERT_TRUE(ReportJobError(report_error_request));
}
TEST_F(GcsServerTest, TestWorkerInfo) {
rpc::WorkerFailureData worker_failure_data;
worker_failure_data.mutable_worker_address()->set_ip_address("127.0.0.1");
worker_failure_data.mutable_worker_address()->set_port(5566);
rpc::ReportWorkerFailureRequest report_worker_failure_request;
report_worker_failure_request.mutable_worker_failure()->CopyFrom(worker_failure_data);
ASSERT_TRUE(ReportWorkerFailure(report_worker_failure_request));
}
} // namespace ray
int main(int argc, char **argv) {

View file

@ -0,0 +1,30 @@
#include "worker_info_handler_impl.h"
namespace ray {
namespace rpc {
void DefaultWorkerInfoHandler::HandleReportWorkerFailure(
const ReportWorkerFailureRequest &request, ReportWorkerFailureReply *reply,
SendReplyCallback send_reply_callback) {
Address worker_address = request.worker_failure().worker_address();
RAY_LOG(DEBUG) << "Reporting worker failure, " << worker_address.DebugString();
auto worker_failure_data = std::make_shared<WorkerFailureData>();
worker_failure_data->CopyFrom(request.worker_failure());
auto on_done = [worker_address, send_reply_callback](Status status) {
if (!status.ok()) {
RAY_LOG(ERROR) << "Failed to report worker failure, "
<< worker_address.DebugString();
}
send_reply_callback(status, nullptr, nullptr);
};
Status status =
gcs_client_.Workers().AsyncReportWorkerFailure(worker_failure_data, on_done);
if (!status.ok()) {
on_done(status);
}
RAY_LOG(DEBUG) << "Finished reporting worker failure, " << worker_address.DebugString();
}
} // namespace rpc
} // namespace ray

View file

@ -0,0 +1,27 @@
#ifndef RAY_GCS_WORKER_INFO_HANDLER_IMPL_H
#define RAY_GCS_WORKER_INFO_HANDLER_IMPL_H
#include "ray/gcs/redis_gcs_client.h"
#include "ray/rpc/gcs_server/gcs_rpc_server.h"
namespace ray {
namespace rpc {
/// This implementation class of `WorkerInfoHandler`.
class DefaultWorkerInfoHandler : public rpc::WorkerInfoHandler {
public:
explicit DefaultWorkerInfoHandler(gcs::RedisGcsClient &gcs_client)
: gcs_client_(gcs_client) {}
void HandleReportWorkerFailure(const ReportWorkerFailureRequest &request,
ReportWorkerFailureReply *reply,
SendReplyCallback send_reply_callback) override;
private:
gcs::RedisGcsClient &gcs_client_;
};
} // namespace rpc
} // namespace ray
#endif // RAY_GCS_WORKER_INFO_HANDLER_IMPL_H

View file

@ -295,3 +295,16 @@ service ErrorInfoGcsService {
// Report a job error to GCS Service.
rpc ReportJobError(ReportJobErrorRequest) returns (ReportJobErrorReply);
}
message ReportWorkerFailureRequest {
WorkerFailureData worker_failure = 1;
}
message ReportWorkerFailureReply {
}
// Service for worker info access.
service WorkerInfoGcsService {
// Report a worker failure to GCS Service.
rpc ReportWorkerFailure(ReportWorkerFailureRequest) returns (ReportWorkerFailureReply);
}

View file

@ -30,6 +30,8 @@ class GcsRpcClient {
new GrpcClient<StatsGcsService>(address, port, client_call_manager));
error_info_grpc_client_ = std::unique_ptr<GrpcClient<ErrorInfoGcsService>>(
new GrpcClient<ErrorInfoGcsService>(address, port, client_call_manager));
worker_info_grpc_client_ = std::unique_ptr<GrpcClient<WorkerInfoGcsService>>(
new GrpcClient<WorkerInfoGcsService>(address, port, client_call_manager));
};
/// Add job info to gcs server.
@ -119,6 +121,10 @@ class GcsRpcClient {
/// Report a job error to GCS Service.
VOID_RPC_CLIENT_METHOD(ErrorInfoGcsService, ReportJobError, error_info_grpc_client_, )
/// Report a worker failure to GCS Service.
VOID_RPC_CLIENT_METHOD(WorkerInfoGcsService, ReportWorkerFailure,
worker_info_grpc_client_, )
private:
/// The gRPC-generated stub.
std::unique_ptr<GrpcClient<JobInfoGcsService>> job_info_grpc_client_;
@ -128,6 +134,7 @@ class GcsRpcClient {
std::unique_ptr<GrpcClient<TaskInfoGcsService>> task_info_grpc_client_;
std::unique_ptr<GrpcClient<StatsGcsService>> stats_grpc_client_;
std::unique_ptr<GrpcClient<ErrorInfoGcsService>> error_info_grpc_client_;
std::unique_ptr<GrpcClient<WorkerInfoGcsService>> worker_info_grpc_client_;
};
} // namespace rpc

View file

@ -30,6 +30,9 @@ namespace rpc {
#define ERROR_INFO_SERVICE_RPC_HANDLER(HANDLER, CONCURRENCY) \
RPC_SERVICE_HANDLER(ErrorInfoGcsService, HANDLER, CONCURRENCY)
#define WORKER_INFO_SERVICE_RPC_HANDLER(HANDLER, CONCURRENCY) \
RPC_SERVICE_HANDLER(WorkerInfoGcsService, HANDLER, CONCURRENCY)
class JobInfoGcsServiceHandler {
public:
virtual ~JobInfoGcsServiceHandler() = default;
@ -374,6 +377,42 @@ class ErrorInfoGrpcService : public GrpcService {
ErrorInfoGcsServiceHandler &service_handler_;
};
class WorkerInfoGcsServiceHandler {
public:
virtual ~WorkerInfoGcsServiceHandler() = default;
virtual void HandleReportWorkerFailure(const ReportWorkerFailureRequest &request,
ReportWorkerFailureReply *reply,
SendReplyCallback send_reply_callback) = 0;
};
/// The `GrpcService` for `WorkerInfoGcsService`.
class WorkerInfoGrpcService : public GrpcService {
public:
/// Constructor.
///
/// \param[in] handler The service handler that actually handle the requests.
explicit WorkerInfoGrpcService(boost::asio::io_service &io_service,
WorkerInfoGcsServiceHandler &handler)
: GrpcService(io_service), service_handler_(handler){};
protected:
grpc::Service &GetGrpcService() override { return service_; }
void InitServerCallFactories(
const std::unique_ptr<grpc::ServerCompletionQueue> &cq,
std::vector<std::pair<std::unique_ptr<ServerCallFactory>, int>>
*server_call_factories_and_concurrencies) override {
WORKER_INFO_SERVICE_RPC_HANDLER(ReportWorkerFailure, 1);
}
private:
/// The grpc async service object.
WorkerInfoGcsService::AsyncService service_;
/// The service handler that actually handle the requests.
WorkerInfoGcsServiceHandler &service_handler_;
};
using JobInfoHandler = JobInfoGcsServiceHandler;
using ActorInfoHandler = ActorInfoGcsServiceHandler;
using NodeInfoHandler = NodeInfoGcsServiceHandler;
@ -381,6 +420,7 @@ using ObjectInfoHandler = ObjectInfoGcsServiceHandler;
using TaskInfoHandler = TaskInfoGcsServiceHandler;
using StatsHandler = StatsGcsServiceHandler;
using ErrorInfoHandler = ErrorInfoGcsServiceHandler;
using WorkerInfoHandler = WorkerInfoGcsServiceHandler;
} // namespace rpc
} // namespace ray