Pubsub registration / unregistration idempotency (#15896)

* Make AddEntry idempotent.

* Done.
This commit is contained in:
SangBin Cho 2021-05-20 18:40:06 -07:00 committed by GitHub
parent 061e3fbde3
commit a1375a955b
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 80 additions and 17 deletions

View file

@ -164,13 +164,13 @@ class MockDistributedPublisher : public pubsub::PublisherInterface {
subscription_callback_map_(subscription_callback_map),
subscription_failure_callback_map_(subscription_failure_callback_map),
publisher_id_(publisher_id) {}
~MockDistributedPublisher() = default;
void RegisterSubscription(const rpc::ChannelType channel_type,
bool RegisterSubscription(const rpc::ChannelType channel_type,
const pubsub::SubscriberID &subscriber_id,
const std::string &key_id_binary) {
RAY_CHECK(false) << "No need to implement it for testing.";
return false;
}
void Publish(const rpc::ChannelType channel_type, const rpc::PubMessage &pub_message,

View file

@ -37,7 +37,7 @@ class MockSubscriber : public pubsub::SubscriberInterface {
class MockPublisher : public pubsub::PublisherInterface {
public:
MOCK_METHOD3(RegisterSubscription, void(const rpc::ChannelType channel_type,
MOCK_METHOD3(RegisterSubscription, bool(const rpc::ChannelType channel_type,
const pubsub::SubscriberID &subscriber_id,
const std::string &key_id_binary));

View file

@ -21,13 +21,16 @@ namespace pubsub {
namespace pub_internal {
template <typename KeyIdType>
void SubscriptionIndex<KeyIdType>::AddEntry(const std::string &key_id_binary,
bool SubscriptionIndex<KeyIdType>::AddEntry(const std::string &key_id_binary,
const SubscriberID &subscriber_id) {
const auto key_id = KeyIdType::FromBinary(key_id_binary);
auto &subscribing_key_ids = subscribers_to_key_id_[subscriber_id];
RAY_CHECK(subscribing_key_ids.emplace(key_id).second);
auto key_added = subscribing_key_ids.emplace(key_id).second;
auto &subscriber_map = key_id_to_subscribers_[key_id];
RAY_CHECK(subscriber_map.emplace(subscriber_id).second);
auto subscriber_added = subscriber_map.emplace(subscriber_id).second;
RAY_CHECK(key_added == subscriber_added);
return key_added;
}
template <typename KeyIdType>
@ -82,7 +85,7 @@ bool SubscriptionIndex<KeyIdType>::EraseSubscriber(const SubscriberID &subscribe
template <typename KeyIdType>
bool SubscriptionIndex<KeyIdType>::EraseEntry(const std::string &key_id_binary,
const SubscriberID &subscriber_id) {
// Erase from subscribers_to_objects_;
// Erase keys from subscribers.
const auto key_id = KeyIdType::FromBinary(key_id_binary);
auto subscribers_to_message_it = subscribers_to_key_id_.find(subscriber_id);
if (subscribers_to_message_it == subscribers_to_key_id_.end()) {
@ -99,7 +102,7 @@ bool SubscriptionIndex<KeyIdType>::EraseEntry(const std::string &key_id_binary,
subscribers_to_key_id_.erase(subscribers_to_message_it);
}
// Erase from objects_to_subscribers_.
// Erase subscribers from keys (reverse index).
auto key_id_to_subscriber_it = key_id_to_subscribers_.find(key_id);
// If code reaches this line, that means the object id was in the index.
RAY_CHECK(key_id_to_subscriber_it != key_id_to_subscribers_.end());
@ -215,7 +218,7 @@ void Publisher::ConnectToSubscriber(const SubscriberID &subscriber_id,
subscriber->PublishIfPossible();
}
void Publisher::RegisterSubscription(const rpc::ChannelType channel_type,
bool Publisher::RegisterSubscription(const rpc::ChannelType channel_type,
const SubscriberID &subscriber_id,
const std::string &key_id_binary) {
absl::MutexLock lock(&mutex_);
@ -226,7 +229,7 @@ void Publisher::RegisterSubscription(const rpc::ChannelType channel_type,
}
auto subscription_index_it = subscription_index_map_.find(channel_type);
RAY_CHECK(subscription_index_it != subscription_index_map_.end());
subscription_index_it->second.AddEntry(key_id_binary, subscriber_id);
return subscription_index_it->second.AddEntry(key_id_binary, subscriber_id);
}
void Publisher::Publish(const rpc::ChannelType channel_type,

View file

@ -42,8 +42,8 @@ class SubscriptionIndex {
~SubscriptionIndex() = default;
/// Add a new entry to the index.
/// NOTE: If the entry already exists, it raises assert failure.
void AddEntry(const std::string &key_id_binary, const SubscriberID &subscriber_id);
/// NOTE: The method is idempotent. If it adds a duplicated entry, it will be no-op.
bool AddEntry(const std::string &key_id_binary, const SubscriberID &subscriber_id);
/// Return the set of subscriber ids that are subscribing to the given object ids.
absl::optional<std::reference_wrapper<const absl::flat_hash_set<SubscriberID>>>
@ -53,9 +53,7 @@ class SubscriptionIndex {
/// NOTE: It cannot erase subscribers that were never added.
bool EraseSubscriber(const SubscriberID &subscriber_id);
/// Erase the object id and subscriber id from the index. Return the number of erased
/// entries.
/// NOTE: It cannot erase subscribers that were never added.
/// Erase the object id and subscriber id from the index.
bool EraseEntry(const std::string &key_id_binary, const SubscriberID &subscriber_id);
/// Return true if the object id exists in the index.
@ -161,7 +159,8 @@ class PublisherInterface {
/// \param channel_type The type of the channel.
/// \param subscriber_id The node id of the subscriber.
/// \param key_id_binary The key_id that the subscriber is subscribing to.
virtual void RegisterSubscription(const rpc::ChannelType channel_type,
/// \return True if registration is new. False otherwise.
virtual bool RegisterSubscription(const rpc::ChannelType channel_type,
const SubscriberID &subscriber_id,
const std::string &key_id_binary) = 0;
@ -245,7 +244,8 @@ class Publisher : public PublisherInterface {
/// \param channel_type The type of the channel.
/// \param subscriber_id The node id of the subscriber.
/// \param key_id_binary The key_id that the subscriber is subscribing to.
void RegisterSubscription(const rpc::ChannelType channel_type,
/// \return True if the registration is new. False otherwise.
bool RegisterSubscription(const rpc::ChannelType channel_type,
const SubscriberID &subscriber_id,
const std::string &key_id_binary) override;
@ -310,6 +310,7 @@ class Publisher : public PublisherInterface {
FRIEND_TEST(PublisherTest, TestNodeFailureWhenConnectionDoesntExist);
FRIEND_TEST(PublisherTest, TestUnregisterSubscription);
FRIEND_TEST(PublisherTest, TestUnregisterSubscriber);
FRIEND_TEST(PublisherTest, TestRegistrationIdempotency);
/// Testing only. Return true if there's no metadata remained in the private attribute.
bool CheckNoLeaks() const;

View file

@ -204,6 +204,40 @@ TEST_F(PublisherTest, TestSubscriptionIndexEraseSubscriber) {
ASSERT_TRUE(subscription_index.CheckNoLeaks());
}
TEST_F(PublisherTest, TestSubscriptionIndexIdempotency) {
///
/// Test the subscription index is idempotent.
///
auto node_id = NodeID::FromRandom();
auto oid = ObjectID::FromRandom();
SubscriptionIndex<ObjectID> subscription_index;
// Add the same entry many times.
for (int i = 0; i < 5; i++) {
subscription_index.AddEntry(oid.Binary(), node_id);
}
ASSERT_TRUE(subscription_index.HasKeyId(oid.Binary()));
ASSERT_TRUE(subscription_index.HasSubscriber(node_id));
// Erase it and make sure it is erased.
for (int i = 0; i < 5; i++) {
subscription_index.EraseEntry(oid.Binary(), node_id);
}
ASSERT_TRUE(subscription_index.CheckNoLeaks());
// Random mix.
subscription_index.AddEntry(oid.Binary(), node_id);
subscription_index.AddEntry(oid.Binary(), node_id);
subscription_index.EraseEntry(oid.Binary(), node_id);
subscription_index.EraseEntry(oid.Binary(), node_id);
ASSERT_TRUE(subscription_index.CheckNoLeaks());
subscription_index.AddEntry(oid.Binary(), node_id);
subscription_index.AddEntry(oid.Binary(), node_id);
ASSERT_TRUE(subscription_index.HasKeyId(oid.Binary()));
ASSERT_TRUE(subscription_index.HasSubscriber(node_id));
}
TEST_F(PublisherTest, TestSubscriber) {
std::unordered_set<ObjectID> object_ids_published;
rpc::PubsubLongPollingReply reply;
@ -889,6 +923,31 @@ TEST_F(PublisherTest, TestUnregisterSubscriber) {
ASSERT_TRUE(object_status_publisher_->CheckNoLeaks());
}
// Test if registration / unregistration is idempotent.
TEST_F(PublisherTest, TestRegistrationIdempotency) {
const auto subscriber_node_id = NodeID::FromRandom();
const auto oid = ObjectID::FromRandom();
ASSERT_TRUE(object_status_publisher_->RegisterSubscription(
rpc::ChannelType::WORKER_OBJECT_EVICTION, subscriber_node_id, oid.Binary()));
ASSERT_FALSE(object_status_publisher_->RegisterSubscription(
rpc::ChannelType::WORKER_OBJECT_EVICTION, subscriber_node_id, oid.Binary()));
ASSERT_FALSE(object_status_publisher_->RegisterSubscription(
rpc::ChannelType::WORKER_OBJECT_EVICTION, subscriber_node_id, oid.Binary()));
ASSERT_FALSE(object_status_publisher_->RegisterSubscription(
rpc::ChannelType::WORKER_OBJECT_EVICTION, subscriber_node_id, oid.Binary()));
ASSERT_FALSE(object_status_publisher_->CheckNoLeaks());
ASSERT_TRUE(object_status_publisher_->UnregisterSubscription(
rpc::ChannelType::WORKER_OBJECT_EVICTION, subscriber_node_id, oid.Binary()));
ASSERT_FALSE(object_status_publisher_->UnregisterSubscription(
rpc::ChannelType::WORKER_OBJECT_EVICTION, subscriber_node_id, oid.Binary()));
ASSERT_TRUE(object_status_publisher_->CheckNoLeaks());
ASSERT_TRUE(object_status_publisher_->RegisterSubscription(
rpc::ChannelType::WORKER_OBJECT_EVICTION, subscriber_node_id, oid.Binary()));
ASSERT_FALSE(object_status_publisher_->CheckNoLeaks());
ASSERT_TRUE(object_status_publisher_->UnregisterSubscription(
rpc::ChannelType::WORKER_OBJECT_EVICTION, subscriber_node_id, oid.Binary()));
}
} // namespace pubsub
} // namespace ray