diff --git a/src/ray/gcs/gcs_server/gcs_server.cc b/src/ray/gcs/gcs_server/gcs_server.cc index c574565c5..0ea5b47b1 100644 --- a/src/ray/gcs/gcs_server/gcs_server.cc +++ b/src/ray/gcs/gcs_server/gcs_server.cc @@ -350,6 +350,7 @@ void GcsServer::PrintDebugInfo() { << gcs_actor_manager_->DebugString() << "\n" << gcs_object_manager_->DebugString() << "\n" << gcs_placement_group_manager_->DebugString() << "\n" + << gcs_pub_sub_->DebugString() << "\n" << ((rpc::DefaultTaskInfoHandler *)task_info_handler_.get())->DebugString(); // TODO(ffbin): We will get the session_dir in the next PR, and write the log to // gcs_debug_state.txt. diff --git a/src/ray/gcs/pubsub/gcs_pub_sub.cc b/src/ray/gcs/pubsub/gcs_pub_sub.cc index ee6ac955b..68930f857 100644 --- a/src/ray/gcs/pubsub/gcs_pub_sub.cc +++ b/src/ray/gcs/pubsub/gcs_pub_sub.cc @@ -53,6 +53,7 @@ Status GcsPubSub::Unsubscribe(const std::string &channel_name, const std::string auto channel = channels_.find(pattern); RAY_CHECK(channel != channels_.end()); channel->second.command_queue.push_back(Command()); + total_commands_queued_++; // Process the first command on the queue, if possible. return ExecuteCommandIfPossible(channel->first, channel->second); @@ -74,6 +75,7 @@ Status GcsPubSub::SubscribeInternal(const std::string &channel_name, // Add the SUBSCRIBE command to the queue. channel->second.command_queue.push_back(Command(subscribe, done, is_sub_or_unsub_all)); + total_commands_queued_++; // Process the first command on the queue, if possible. return ExecuteCommandIfPossible(channel->first, channel->second); @@ -152,6 +154,7 @@ Status GcsPubSub::ExecuteCommandIfPossible(const std::string &channel_key, } channel.pending_reply = true; channel.command_queue.pop_front(); + total_commands_queued_--; } else if (!command.is_subscribe && channel.callback_index != -1) { // The next command is UNSUBSCRIBE and we are currently subscribed, so we // can execute the command. The reply for will be received through the @@ -163,6 +166,7 @@ Status GcsPubSub::ExecuteCommandIfPossible(const std::string &channel_key, } channel.pending_reply = true; channel.command_queue.pop_front(); + total_commands_queued_--; } else if (!channel.pending_reply) { // There is no in-flight command, but the next command to execute is not // runnable. The caller must have sent a command out-of-order. @@ -193,5 +197,14 @@ bool GcsPubSub::IsUnsubscribed(const std::string &channel, const std::string &id return !channels_.contains(pattern); } +std::string GcsPubSub::DebugString() const { + absl::MutexLock lock(&mutex_); + std::ostringstream stream; + stream << "GcsPubSub:"; + stream << "\n- num channels subscribed to: " << channels_.size(); + stream << "\n- total commands queued: " << total_commands_queued_; + return stream.str(); +} + } // namespace gcs } // namespace ray diff --git a/src/ray/gcs/pubsub/gcs_pub_sub.h b/src/ray/gcs/pubsub/gcs_pub_sub.h index 06748dffb..e5b3c1509 100644 --- a/src/ray/gcs/pubsub/gcs_pub_sub.h +++ b/src/ray/gcs/pubsub/gcs_pub_sub.h @@ -94,6 +94,8 @@ class GcsPubSub { /// \return Whether the specified ID under the specified channel is unsubscribed. bool IsUnsubscribed(const std::string &channel, const std::string &id); + std::string DebugString() const; + private: /// Represents a caller's command to subscribe or unsubscribe to a given /// channel. @@ -160,9 +162,11 @@ class GcsPubSub { std::shared_ptr redis_client_; /// Mutex to protect the subscribe_callback_index_ field. - absl::Mutex mutex_; + mutable absl::Mutex mutex_; absl::flat_hash_map channels_ GUARDED_BY(mutex_); + + size_t total_commands_queued_ GUARDED_BY(mutex_); }; } // namespace gcs