From c222d64ca1a8ebce9e92ccdb3a8840198c6af97e Mon Sep 17 00:00:00 2001 From: micafan <550435771@qq.com> Date: Mon, 13 Apr 2020 19:32:28 +0800 Subject: [PATCH] [GCS] Add MessagePublisher to GCS (#7771) --- BUILD.bazel | 18 ++++++++ src/ray/gcs/pubsub/message_publisher.h | 33 ++++++++++++++ src/ray/gcs/pubsub/redis_message_publisher.cc | 44 +++++++++++++++++++ src/ray/gcs/pubsub/redis_message_publisher.h | 32 ++++++++++++++ 4 files changed, 127 insertions(+) create mode 100644 src/ray/gcs/pubsub/message_publisher.h create mode 100644 src/ray/gcs/pubsub/redis_message_publisher.cc create mode 100644 src/ray/gcs/pubsub/redis_message_publisher.h diff --git a/BUILD.bazel b/BUILD.bazel index f59963167..a8ff8637e 100644 --- a/BUILD.bazel +++ b/BUILD.bazel @@ -983,6 +983,24 @@ cc_test( ], ) +cc_library( + name = "gcs_pubsub", + srcs = [ + "src/ray/gcs/pubsub/redis_message_publisher.cc", + ], + hdrs = [ + "src/ray/gcs/callback.h", + "src/ray/gcs/pubsub/message_publisher.h", + "src/ray/gcs/pubsub/redis_message_publisher.h", + ], + copts = COPTS, + deps = [ + ":ray_common", + ":ray_util", + ":redis_client", + ], +) + cc_library( name = "gcs", srcs = glob( diff --git a/src/ray/gcs/pubsub/message_publisher.h b/src/ray/gcs/pubsub/message_publisher.h new file mode 100644 index 000000000..4b2660d48 --- /dev/null +++ b/src/ray/gcs/pubsub/message_publisher.h @@ -0,0 +1,33 @@ +#ifndef RAY_GCS_PUBSUB_MESSAGE_PUBLISHER_H +#define RAY_GCS_PUBSUB_MESSAGE_PUBLISHER_H + +#include +#include "ray/common/status.h" +#include "ray/gcs/callback.h" +#include "ray/util/io_service_pool.h" +#include "ray/util/logging.h" + +namespace ray { + +namespace gcs { + +class MessagePublisher { + public: + virtual ~MessagePublisher() {} + + virtual Status Init() = 0; + + virtual void Shutdown() = 0; + + virtual Status PublishMessage(const std::string &channel, const std::string &message, + const StatusCallback &callback) = 0; + + protected: + MessagePublisher() {} +}; + +} // namespace gcs + +} // namespace ray + +#endif // RAY_GCS_PUBSUB_MESSAGE_PUBLISHER_H diff --git a/src/ray/gcs/pubsub/redis_message_publisher.cc b/src/ray/gcs/pubsub/redis_message_publisher.cc new file mode 100644 index 000000000..3131b5506 --- /dev/null +++ b/src/ray/gcs/pubsub/redis_message_publisher.cc @@ -0,0 +1,44 @@ +#include "ray/gcs/pubsub/redis_message_publisher.h" +#include "ray/gcs/redis_context.h" + +namespace ray { + +namespace gcs { + +RedisMessagePublisher::RedisMessagePublisher( + const RedisClientOptions &options, std::shared_ptr io_service_pool) + : redis_client_(new RedisClient(options)), + io_service_pool_(std::move(io_service_pool)) {} + +Status RedisMessagePublisher::Init() { + auto io_services = io_service_pool_->GetAll(); + Status status = redis_client_->Connect(io_services); + RAY_LOG(INFO) << "RedisMessagePublisher::Init finished with status " + << status.ToString(); + return status; +} + +void RedisMessagePublisher::Shutdown() { + redis_client_->Disconnect(); + RAY_LOG(INFO) << "RedisMessagePublisher::Shutdown."; +} + +Status RedisMessagePublisher::PublishMessage(const std::string &channel, + const std::string &message, + const StatusCallback &callback) { + std::vector args = {"PUBLISH", channel, message}; + + RedisCallback pub_callback = nullptr; + if (callback) { + pub_callback = [callback](std::shared_ptr reply) { + callback(Status::OK()); + }; + } + // Select shard context by channel. + auto shard_context = redis_client_->GetShardContext(channel); + return shard_context->RunArgvAsync(args, pub_callback); +} + +} // namespace gcs + +} // namespace ray diff --git a/src/ray/gcs/pubsub/redis_message_publisher.h b/src/ray/gcs/pubsub/redis_message_publisher.h new file mode 100644 index 000000000..30833dc88 --- /dev/null +++ b/src/ray/gcs/pubsub/redis_message_publisher.h @@ -0,0 +1,32 @@ +#ifndef RAY_GCS_PUBSUB_REDIS_MESSAGE_PUBLISHER_H +#define RAY_GCS_PUBSUB_REDIS_MESSAGE_PUBLISHER_H + +#include "ray/gcs/pubsub/message_publisher.h" +#include "ray/gcs/redis_client.h" + +namespace ray { + +namespace gcs { + +class RedisMessagePublisher : public MessagePublisher { + public: + RedisMessagePublisher(const RedisClientOptions &options, + std::shared_ptr io_service_pool); + + Status Init() override; + + void Shutdown() override; + + Status PublishMessage(const std::string &channel, const std::string &message, + const StatusCallback &callback) override; + + private: + std::shared_ptr redis_client_; + std::shared_ptr io_service_pool_; +}; + +} // namespace gcs + +} // namespace ray + +#endif // RAY_GCS_PUBSUB_REDIS_MESSAGE_PUBLISHER_H