[GCS]GCS adapts to job table pub sub (#8145)

This commit is contained in:
fangfengbin 2020-04-24 16:33:25 +08:00 committed by GitHub
parent 2298f6fb40
commit 713e375d50
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
11 changed files with 57 additions and 30 deletions

View file

@ -360,6 +360,7 @@ cc_library(
copts = COPTS,
deps = [
":gcs",
":gcs_pub_sub_lib",
":gcs_service_rpc",
":node_manager_rpc",
":raylet_lib",
@ -862,6 +863,7 @@ cc_library(
copts = COPTS,
deps = [
":gcs",
":gcs_pub_sub_lib",
":gcs_service_rpc",
":redis_store_client",
],

View file

@ -20,8 +20,7 @@ namespace gcs {
ServiceBasedJobInfoAccessor::ServiceBasedJobInfoAccessor(
ServiceBasedGcsClient *client_impl)
: client_impl_(client_impl),
job_sub_executor_(client_impl->GetRedisGcsClient().job_table()) {}
: client_impl_(client_impl) {}
Status ServiceBasedJobInfoAccessor::AsyncAdd(
const std::shared_ptr<JobTableData> &data_ptr, const StatusCallback &callback) {
@ -64,13 +63,15 @@ Status ServiceBasedJobInfoAccessor::AsyncSubscribeToFinishedJobs(
const SubscribeCallback<JobID, JobTableData> &subscribe, const StatusCallback &done) {
RAY_LOG(DEBUG) << "Subscribing finished job.";
RAY_CHECK(subscribe != nullptr);
auto on_subscribe = [subscribe](const JobID &job_id, const JobTableData &job_data) {
auto on_subscribe = [subscribe](const std::string &id, const std::string &data) {
JobTableData job_data;
job_data.ParseFromString(data);
if (job_data.is_dead()) {
subscribe(job_id, job_data);
subscribe(JobID::FromBinary(id), job_data);
}
};
Status status =
job_sub_executor_.AsyncSubscribeAll(ClientID::Nil(), on_subscribe, done);
client_impl_->GetGcsPubSub().SubscribeAll(JOB_CHANNEL, on_subscribe, done);
RAY_LOG(DEBUG) << "Finished subscribing finished job.";
return status;
}

View file

@ -45,9 +45,6 @@ class ServiceBasedJobInfoAccessor : public JobInfoAccessor {
private:
ServiceBasedGcsClient *client_impl_;
typedef SubscriptionExecutor<JobID, JobTableData, JobTable> JobSubscriptionExecutor;
JobSubscriptionExecutor job_sub_executor_;
};
/// \class ServiceBasedActorInfoAccessor

View file

@ -31,11 +31,14 @@ Status ServiceBasedGcsClient::Connect(boost::asio::io_service &io_service) {
return Status::Invalid("gcs service address is invalid!");
}
// Connect to gcs
// Connect to gcs.
redis_gcs_client_.reset(new RedisGcsClient(options_));
RAY_CHECK_OK(redis_gcs_client_->Connect(io_service));
// Get gcs service address
// Init gcs pub sub instance.
gcs_pub_sub_.reset(new GcsPubSub(redis_gcs_client_->GetRedisClient()));
// Get gcs service address.
auto get_server_address = [this]() {
std::pair<std::string, int> address;
GetGcsServerAddressFromRedis(redis_gcs_client_->primary_context()->sync_context(),
@ -44,7 +47,7 @@ Status ServiceBasedGcsClient::Connect(boost::asio::io_service &io_service) {
};
std::pair<std::string, int> address = get_server_address();
// Connect to gcs service
// Connect to gcs service.
client_call_manager_.reset(new rpc::ClientCallManager(io_service));
gcs_rpc_client_.reset(new rpc::GcsRpcClient(address.first, address.second,
*client_call_manager_, get_server_address));

View file

@ -15,6 +15,7 @@
#ifndef RAY_GCS_SERVICE_BASED_GCS_CLIENT_H
#define RAY_GCS_SERVICE_BASED_GCS_CLIENT_H
#include "ray/gcs/pubsub/gcs_pub_sub.h"
#include "ray/gcs/redis_gcs_client.h"
#include "ray/rpc/gcs_server/gcs_rpc_client.h"
@ -23,14 +24,14 @@ namespace gcs {
class RAY_EXPORT ServiceBasedGcsClient : public GcsClient {
public:
ServiceBasedGcsClient(const GcsClientOptions &options);
ServiceBasedGcsClient(RedisGcsClient *redis_gcs_client);
explicit ServiceBasedGcsClient(const GcsClientOptions &options);
Status Connect(boost::asio::io_service &io_service) override;
void Disconnect() override;
GcsPubSub &GetGcsPubSub() { return *gcs_pub_sub_; }
RedisGcsClient &GetRedisGcsClient() { return *redis_gcs_client_; }
rpc::GcsRpcClient &GetGcsRpcClient() { return *gcs_rpc_client_; }
@ -46,6 +47,8 @@ class RAY_EXPORT ServiceBasedGcsClient : public GcsClient {
std::unique_ptr<RedisGcsClient> redis_gcs_client_;
std::unique_ptr<GcsPubSub> gcs_pub_sub_;
// Gcs rpc client
std::unique_ptr<rpc::GcsRpcClient> gcs_rpc_client_;
std::unique_ptr<rpc::ClientCallManager> client_call_manager_;

View file

@ -40,15 +40,18 @@ void GcsServer::Start() {
// Init backend client.
InitBackendClient();
// Init gcs node_manager
// Init gcs node_manager.
InitGcsNodeManager();
// Init gcs detector
// Init gcs pub sub instance.
gcs_pub_sub_ = std::make_shared<gcs::GcsPubSub>(redis_gcs_client_->GetRedisClient());
// Init gcs detector.
gcs_redis_failure_detector_ = std::make_shared<GcsRedisFailureDetector>(
main_service_, redis_gcs_client_->primary_context(), [this]() { Stop(); });
gcs_redis_failure_detector_->Start();
// Init gcs actor manager
// Init gcs actor manager.
InitGcsActorManager();
// Register rpc service.
@ -93,7 +96,7 @@ void GcsServer::Start() {
// Run rpc server.
rpc_server_.Run();
// Store gcs rpc server address in redis
// Store gcs rpc server address in redis.
StoreGcsServerAddressInRedis();
is_started_ = true;
@ -138,7 +141,7 @@ void GcsServer::InitGcsActorManager() {
std::unique_ptr<rpc::JobInfoHandler> GcsServer::InitJobInfoHandler() {
return std::unique_ptr<rpc::DefaultJobInfoHandler>(
new rpc::DefaultJobInfoHandler(*redis_gcs_client_));
new rpc::DefaultJobInfoHandler(*redis_gcs_client_, gcs_pub_sub_));
}
std::unique_ptr<rpc::ActorInfoHandler> GcsServer::InitActorInfoHandler() {

View file

@ -15,6 +15,7 @@
#ifndef RAY_GCS_GCS_SERVER_H
#define RAY_GCS_GCS_SERVER_H
#include <ray/gcs/pubsub/gcs_pub_sub.h>
#include <ray/gcs/redis_gcs_client.h>
#include <ray/rpc/gcs_server/gcs_rpc_server.h>
#include "ray/gcs/gcs_server/gcs_redis_failure_detector.h"
@ -146,6 +147,8 @@ class GcsServer {
std::unique_ptr<rpc::WorkerInfoGrpcService> worker_info_service_;
/// Backend client
std::shared_ptr<RedisGcsClient> redis_gcs_client_;
/// A publisher for publishing gcs messages.
std::shared_ptr<gcs::GcsPubSub> gcs_pub_sub_;
/// Gcs service state flag, which is used for ut.
bool is_started_ = false;
bool is_stopped_ = false;

View file

@ -13,6 +13,7 @@
// limitations under the License.
#include "job_info_handler_impl.h"
#include "ray/gcs/pb_util.h"
namespace ray {
namespace rpc {
@ -20,14 +21,17 @@ void DefaultJobInfoHandler::HandleAddJob(const rpc::AddJobRequest &request,
rpc::AddJobReply *reply,
rpc::SendReplyCallback send_reply_callback) {
JobID job_id = JobID::FromBinary(request.data().job_id());
RAY_LOG(DEBUG) << "Adding job, job id = " << job_id
<< ", driver pid = " << request.data().driver_pid();
RAY_LOG(INFO) << "Adding job, job id = " << job_id
<< ", driver pid = " << request.data().driver_pid();
auto job_table_data = std::make_shared<JobTableData>();
job_table_data->CopyFrom(request.data());
auto on_done = [job_id, request, reply, send_reply_callback](Status status) {
auto on_done = [job_id, request, reply, send_reply_callback](const Status &status) {
if (!status.ok()) {
RAY_LOG(ERROR) << "Failed to add job, job id = " << job_id
<< ", driver pid = " << request.data().driver_pid();
} else {
RAY_LOG(INFO) << "Finished adding job, job id = " << job_id
<< ", driver pid = " << request.data().driver_pid();
}
GCS_RPC_SEND_REPLY(send_reply_callback, reply, status);
};
@ -36,18 +40,23 @@ void DefaultJobInfoHandler::HandleAddJob(const rpc::AddJobRequest &request,
if (!status.ok()) {
on_done(status);
}
RAY_LOG(DEBUG) << "Finished adding job, job id = " << job_id
<< ", driver pid = " << request.data().driver_pid();
}
void DefaultJobInfoHandler::HandleMarkJobFinished(
const rpc::MarkJobFinishedRequest &request, rpc::MarkJobFinishedReply *reply,
rpc::SendReplyCallback send_reply_callback) {
JobID job_id = JobID::FromBinary(request.job_id());
RAY_LOG(DEBUG) << "Marking job state, job id = " << job_id;
auto on_done = [job_id, reply, send_reply_callback](Status status) {
RAY_LOG(INFO) << "Marking job state, job id = " << job_id;
auto job_table_data =
gcs::CreateJobTableData(job_id, /*is_dead*/ true, std::time(nullptr), "", -1);
auto on_done = [this, job_id, job_table_data, reply,
send_reply_callback](const Status &status) {
if (!status.ok()) {
RAY_LOG(ERROR) << "Failed to mark job state, job id = " << job_id;
} else {
RAY_CHECK_OK(gcs_pub_sub_->Publish(JOB_CHANNEL, job_id.Binary(),
job_table_data->SerializeAsString(), nullptr));
RAY_LOG(INFO) << "Finished marking job state, job id = " << job_id;
}
GCS_RPC_SEND_REPLY(send_reply_callback, reply, status);
};
@ -56,7 +65,6 @@ void DefaultJobInfoHandler::HandleMarkJobFinished(
if (!status.ok()) {
on_done(status);
}
RAY_LOG(DEBUG) << "Finished marking job state, job id = " << job_id;
}
} // namespace rpc
} // namespace ray

View file

@ -15,6 +15,7 @@
#ifndef RAY_GCS_JOB_INFO_HANDLER_IMPL_H
#define RAY_GCS_JOB_INFO_HANDLER_IMPL_H
#include "ray/gcs/pubsub/gcs_pub_sub.h"
#include "ray/gcs/redis_gcs_client.h"
#include "ray/rpc/gcs_server/gcs_rpc_server.h"
@ -24,8 +25,9 @@ namespace rpc {
/// This implementation class of `JobInfoHandler`.
class DefaultJobInfoHandler : public rpc::JobInfoHandler {
public:
explicit DefaultJobInfoHandler(gcs::RedisGcsClient &gcs_client)
: gcs_client_(gcs_client) {}
explicit DefaultJobInfoHandler(gcs::RedisGcsClient &gcs_client,
std::shared_ptr<gcs::GcsPubSub> &gcs_pub_sub)
: gcs_client_(gcs_client), gcs_pub_sub_(gcs_pub_sub) {}
void HandleAddJob(const AddJobRequest &request, AddJobReply *reply,
SendReplyCallback send_reply_callback) override;
@ -36,6 +38,7 @@ class DefaultJobInfoHandler : public rpc::JobInfoHandler {
private:
gcs::RedisGcsClient &gcs_client_;
std::shared_ptr<gcs::GcsPubSub> gcs_pub_sub_;
};
} // namespace rpc

View file

@ -25,6 +25,8 @@
namespace ray {
namespace gcs {
#define JOB_CHANNEL "JOB"
/// \class GcsPubSub
///
/// GcsPubSub supports publishing, subscription and unsubscribing of data.

View file

@ -81,6 +81,8 @@ class RAY_EXPORT RedisGcsClient : public GcsClient {
return redis_client_->GetPrimaryContext();
}
std::shared_ptr<RedisClient> GetRedisClient() const { return redis_client_; }
/// The following xxx_table methods implement the Accessor interfaces.
/// Implements the Actors() interface.
LogBasedActorTable &log_based_actor_table();
@ -113,7 +115,7 @@ class RAY_EXPORT RedisGcsClient : public GcsClient {
// might be used, if available.
CommandType command_type_{CommandType::kUnknown};
std::unique_ptr<RedisClient> redis_client_;
std::shared_ptr<RedisClient> redis_client_;
std::unique_ptr<ObjectTable> object_table_;
std::unique_ptr<raylet::TaskTable> raylet_task_table_;