Refactor GcsJobInfoHandle to GcsJobManager and filename (#9112)

This commit is contained in:
Zhilei Chen 2020-06-28 16:10:13 +08:00 committed by GitHub
parent aac40af948
commit d78128a69c
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 39 additions and 36 deletions

View file

@ -12,15 +12,16 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
#include "job_info_handler_impl.h" #include "gcs_job_manager.h"
#include "ray/gcs/pb_util.h" #include "ray/gcs/pb_util.h"
namespace ray { namespace ray {
namespace rpc { namespace gcs {
void GcsJobInfoHandler::HandleAddJob(const rpc::AddJobRequest &request, void GcsJobManager::HandleAddJob(const rpc::AddJobRequest &request,
rpc::AddJobReply *reply, rpc::AddJobReply *reply,
rpc::SendReplyCallback send_reply_callback) { rpc::SendReplyCallback send_reply_callback) {
JobID job_id = JobID::FromBinary(request.data().job_id()); JobID job_id = JobID::FromBinary(request.data().job_id());
RAY_LOG(INFO) << "Adding job, job id = " << job_id RAY_LOG(INFO) << "Adding job, job id = " << job_id
<< ", driver pid = " << request.data().driver_pid(); << ", driver pid = " << request.data().driver_pid();
@ -41,9 +42,9 @@ void GcsJobInfoHandler::HandleAddJob(const rpc::AddJobRequest &request,
} }
} }
void GcsJobInfoHandler::HandleMarkJobFinished( void GcsJobManager::HandleMarkJobFinished(const rpc::MarkJobFinishedRequest &request,
const rpc::MarkJobFinishedRequest &request, rpc::MarkJobFinishedReply *reply, rpc::MarkJobFinishedReply *reply,
rpc::SendReplyCallback send_reply_callback) { rpc::SendReplyCallback send_reply_callback) {
JobID job_id = JobID::FromBinary(request.job_id()); JobID job_id = JobID::FromBinary(request.job_id());
RAY_LOG(INFO) << "Marking job state, job id = " << job_id; RAY_LOG(INFO) << "Marking job state, job id = " << job_id;
auto job_table_data = auto job_table_data =
@ -67,7 +68,7 @@ void GcsJobInfoHandler::HandleMarkJobFinished(
} }
} }
void GcsJobInfoHandler::ClearJobInfos(const JobID &job_id) { void GcsJobManager::ClearJobInfos(const JobID &job_id) {
// Notify all listeners. // Notify all listeners.
for (auto &listener : job_finished_listeners_) { for (auto &listener : job_finished_listeners_) {
listener(std::make_shared<JobID>(job_id)); listener(std::make_shared<JobID>(job_id));
@ -77,15 +78,15 @@ void GcsJobInfoHandler::ClearJobInfos(const JobID &job_id) {
/// Add listener to monitor the add action of nodes. /// Add listener to monitor the add action of nodes.
/// ///
/// \param listener The handler which process the add of nodes. /// \param listener The handler which process the add of nodes.
void GcsJobInfoHandler::AddJobFinishedListener( void GcsJobManager::AddJobFinishedListener(
std::function<void(std::shared_ptr<JobID>)> listener) { std::function<void(std::shared_ptr<JobID>)> listener) {
RAY_CHECK(listener); RAY_CHECK(listener);
job_finished_listeners_.emplace_back(std::move(listener)); job_finished_listeners_.emplace_back(std::move(listener));
} }
void GcsJobInfoHandler::HandleGetAllJobInfo(const rpc::GetAllJobInfoRequest &request, void GcsJobManager::HandleGetAllJobInfo(const rpc::GetAllJobInfoRequest &request,
rpc::GetAllJobInfoReply *reply, rpc::GetAllJobInfoReply *reply,
rpc::SendReplyCallback send_reply_callback) { rpc::SendReplyCallback send_reply_callback) {
RAY_LOG(INFO) << "Getting all job info."; RAY_LOG(INFO) << "Getting all job info.";
auto on_done = [reply, send_reply_callback]( auto on_done = [reply, send_reply_callback](
const std::unordered_map<JobID, JobTableData> &result) { const std::unordered_map<JobID, JobTableData> &result) {
@ -101,5 +102,5 @@ void GcsJobInfoHandler::HandleGetAllJobInfo(const rpc::GetAllJobInfoRequest &req
} }
} }
} // namespace rpc } // namespace gcs
} // namespace ray } // namespace ray

View file

@ -21,25 +21,26 @@
#include "ray/rpc/gcs_server/gcs_rpc_server.h" #include "ray/rpc/gcs_server/gcs_rpc_server.h"
namespace ray { namespace ray {
namespace rpc { namespace gcs {
/// This implementation class of `JobInfoHandler`. /// This implementation class of `JobInfoHandler`.
class GcsJobInfoHandler : public rpc::JobInfoHandler { class GcsJobManager : public rpc::JobInfoHandler {
public: public:
explicit GcsJobInfoHandler(std::shared_ptr<gcs::GcsTableStorage> gcs_table_storage, explicit GcsJobManager(std::shared_ptr<gcs::GcsTableStorage> gcs_table_storage,
std::shared_ptr<gcs::GcsPubSub> gcs_pub_sub) std::shared_ptr<gcs::GcsPubSub> gcs_pub_sub)
: gcs_table_storage_(std::move(gcs_table_storage)), : gcs_table_storage_(std::move(gcs_table_storage)),
gcs_pub_sub_(std::move(gcs_pub_sub)) {} gcs_pub_sub_(std::move(gcs_pub_sub)) {}
void HandleAddJob(const AddJobRequest &request, AddJobReply *reply, void HandleAddJob(const rpc::AddJobRequest &request, rpc::AddJobReply *reply,
SendReplyCallback send_reply_callback) override; rpc::SendReplyCallback send_reply_callback) override;
void HandleMarkJobFinished(const MarkJobFinishedRequest &request, void HandleMarkJobFinished(const rpc::MarkJobFinishedRequest &request,
MarkJobFinishedReply *reply, rpc::MarkJobFinishedReply *reply,
SendReplyCallback send_reply_callback) override; rpc::SendReplyCallback send_reply_callback) override;
void HandleGetAllJobInfo(const GetAllJobInfoRequest &request, GetAllJobInfoReply *reply, void HandleGetAllJobInfo(const rpc::GetAllJobInfoRequest &request,
SendReplyCallback send_reply_callback) override; rpc::GetAllJobInfoReply *reply,
rpc::SendReplyCallback send_reply_callback) override;
void AddJobFinishedListener( void AddJobFinishedListener(
std::function<void(std::shared_ptr<JobID>)> listener) override; std::function<void(std::shared_ptr<JobID>)> listener) override;
@ -54,5 +55,5 @@ class GcsJobInfoHandler : public rpc::JobInfoHandler {
void ClearJobInfos(const JobID &job_id); void ClearJobInfos(const JobID &job_id);
}; };
} // namespace rpc } // namespace gcs
} // namespace ray } // namespace ray

View file

@ -16,9 +16,9 @@
#include "error_info_handler_impl.h" #include "error_info_handler_impl.h"
#include "gcs_actor_manager.h" #include "gcs_actor_manager.h"
#include "gcs_job_manager.h"
#include "gcs_node_manager.h" #include "gcs_node_manager.h"
#include "gcs_object_manager.h" #include "gcs_object_manager.h"
#include "job_info_handler_impl.h"
#include "ray/common/network_util.h" #include "ray/common/network_util.h"
#include "ray/common/ray_config.h" #include "ray/common/ray_config.h"
#include "stats_handler_impl.h" #include "stats_handler_impl.h"
@ -71,8 +71,8 @@ void GcsServer::Start() {
new rpc::TaskInfoGrpcService(main_service_, *task_info_handler_)); new rpc::TaskInfoGrpcService(main_service_, *task_info_handler_));
rpc_server_.RegisterService(*task_info_service_); rpc_server_.RegisterService(*task_info_service_);
InitJobInfoHandler(); InitGcsJobManager();
job_info_service_.reset(new rpc::JobInfoGrpcService(main_service_, *job_info_handler_)); job_info_service_.reset(new rpc::JobInfoGrpcService(main_service_, *gcs_job_manager_));
rpc_server_.RegisterService(*job_info_service_); rpc_server_.RegisterService(*job_info_service_);
actor_info_service_.reset( actor_info_service_.reset(
@ -198,10 +198,10 @@ void GcsServer::InitGcsActorManager() {
RAY_CHECK_OK(gcs_pub_sub_->SubscribeAll(WORKER_FAILURE_CHANNEL, on_subscribe, nullptr)); RAY_CHECK_OK(gcs_pub_sub_->SubscribeAll(WORKER_FAILURE_CHANNEL, on_subscribe, nullptr));
} }
void GcsServer::InitJobInfoHandler() { void GcsServer::InitGcsJobManager() {
job_info_handler_ = std::unique_ptr<rpc::GcsJobInfoHandler>( gcs_job_manager_ =
new rpc::GcsJobInfoHandler(gcs_table_storage_, gcs_pub_sub_)); std::unique_ptr<GcsJobManager>(new GcsJobManager(gcs_table_storage_, gcs_pub_sub_));
job_info_handler_->AddJobFinishedListener([this](std::shared_ptr<JobID> job_id) { gcs_job_manager_->AddJobFinishedListener([this](std::shared_ptr<JobID> job_id) {
gcs_actor_manager_->OnJobFinished(*job_id); gcs_actor_manager_->OnJobFinished(*job_id);
}); });
} }

View file

@ -38,6 +38,7 @@ struct GcsServerConfig {
class GcsNodeManager; class GcsNodeManager;
class GcsActorManager; class GcsActorManager;
class GcsJobManager;
/// The GcsServer will take over all requests from ServiceBasedGcsClient and transparent /// The GcsServer will take over all requests from ServiceBasedGcsClient and transparent
/// transmit the command to the backend reliable storage for the time being. /// transmit the command to the backend reliable storage for the time being.
@ -80,8 +81,8 @@ class GcsServer {
/// Initialize the gcs actor manager. /// Initialize the gcs actor manager.
virtual void InitGcsActorManager(); virtual void InitGcsActorManager();
/// The job info handler /// Initialize the gcs job manager.
virtual void InitJobInfoHandler(); virtual void InitGcsJobManager();
/// The object manager /// The object manager
virtual std::unique_ptr<GcsObjectManager> InitObjectManager(); virtual std::unique_ptr<GcsObjectManager> InitObjectManager();
@ -121,7 +122,7 @@ class GcsServer {
/// The gcs actor manager /// The gcs actor manager
std::shared_ptr<GcsActorManager> gcs_actor_manager_; std::shared_ptr<GcsActorManager> gcs_actor_manager_;
/// Job info handler and service /// Job info handler and service
std::unique_ptr<rpc::JobInfoHandler> job_info_handler_; std::unique_ptr<GcsJobManager> gcs_job_manager_;
std::unique_ptr<rpc::JobInfoGrpcService> job_info_service_; std::unique_ptr<rpc::JobInfoGrpcService> job_info_service_;
/// Actor info service /// Actor info service
std::unique_ptr<rpc::ActorInfoGrpcService> actor_info_service_; std::unique_ptr<rpc::ActorInfoGrpcService> actor_info_service_;