[GCS] Add MessagePublisher to GCS (#7771)

This commit is contained in:
micafan 2020-04-13 19:32:28 +08:00 committed by GitHub
parent 7c52359b00
commit c222d64ca1
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 127 additions and 0 deletions

View file

@ -983,6 +983,24 @@ cc_test(
],
)
cc_library(
name = "gcs_pubsub",
srcs = [
"src/ray/gcs/pubsub/redis_message_publisher.cc",
],
hdrs = [
"src/ray/gcs/callback.h",
"src/ray/gcs/pubsub/message_publisher.h",
"src/ray/gcs/pubsub/redis_message_publisher.h",
],
copts = COPTS,
deps = [
":ray_common",
":ray_util",
":redis_client",
],
)
cc_library(
name = "gcs",
srcs = glob(

View file

@ -0,0 +1,33 @@
#ifndef RAY_GCS_PUBSUB_MESSAGE_PUBLISHER_H
#define RAY_GCS_PUBSUB_MESSAGE_PUBLISHER_H
#include <string>
#include "ray/common/status.h"
#include "ray/gcs/callback.h"
#include "ray/util/io_service_pool.h"
#include "ray/util/logging.h"
namespace ray {
namespace gcs {
class MessagePublisher {
public:
virtual ~MessagePublisher() {}
virtual Status Init() = 0;
virtual void Shutdown() = 0;
virtual Status PublishMessage(const std::string &channel, const std::string &message,
const StatusCallback &callback) = 0;
protected:
MessagePublisher() {}
};
} // namespace gcs
} // namespace ray
#endif // RAY_GCS_PUBSUB_MESSAGE_PUBLISHER_H

View file

@ -0,0 +1,44 @@
#include "ray/gcs/pubsub/redis_message_publisher.h"
#include "ray/gcs/redis_context.h"
namespace ray {
namespace gcs {
RedisMessagePublisher::RedisMessagePublisher(
const RedisClientOptions &options, std::shared_ptr<IOServicePool> io_service_pool)
: redis_client_(new RedisClient(options)),
io_service_pool_(std::move(io_service_pool)) {}
Status RedisMessagePublisher::Init() {
auto io_services = io_service_pool_->GetAll();
Status status = redis_client_->Connect(io_services);
RAY_LOG(INFO) << "RedisMessagePublisher::Init finished with status "
<< status.ToString();
return status;
}
void RedisMessagePublisher::Shutdown() {
redis_client_->Disconnect();
RAY_LOG(INFO) << "RedisMessagePublisher::Shutdown.";
}
Status RedisMessagePublisher::PublishMessage(const std::string &channel,
const std::string &message,
const StatusCallback &callback) {
std::vector<std::string> args = {"PUBLISH", channel, message};
RedisCallback pub_callback = nullptr;
if (callback) {
pub_callback = [callback](std::shared_ptr<CallbackReply> reply) {
callback(Status::OK());
};
}
// Select shard context by channel.
auto shard_context = redis_client_->GetShardContext(channel);
return shard_context->RunArgvAsync(args, pub_callback);
}
} // namespace gcs
} // namespace ray

View file

@ -0,0 +1,32 @@
#ifndef RAY_GCS_PUBSUB_REDIS_MESSAGE_PUBLISHER_H
#define RAY_GCS_PUBSUB_REDIS_MESSAGE_PUBLISHER_H
#include "ray/gcs/pubsub/message_publisher.h"
#include "ray/gcs/redis_client.h"
namespace ray {
namespace gcs {
class RedisMessagePublisher : public MessagePublisher {
public:
RedisMessagePublisher(const RedisClientOptions &options,
std::shared_ptr<IOServicePool> io_service_pool);
Status Init() override;
void Shutdown() override;
Status PublishMessage(const std::string &channel, const std::string &message,
const StatusCallback &callback) override;
private:
std::shared_ptr<RedisClient> redis_client_;
std::shared_ptr<IOServicePool> io_service_pool_;
};
} // namespace gcs
} // namespace ray
#endif // RAY_GCS_PUBSUB_REDIS_MESSAGE_PUBLISHER_H