Debug info to GCS pub sub (#13564)

This commit is contained in:
Stephanie Wang 2021-01-19 14:55:23 -08:00 committed by GitHub
parent a0d08c2cc6
commit bfe147a6a8
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 19 additions and 1 deletions

View file

@ -350,6 +350,7 @@ void GcsServer::PrintDebugInfo() {
<< gcs_actor_manager_->DebugString() << "\n"
<< gcs_object_manager_->DebugString() << "\n"
<< gcs_placement_group_manager_->DebugString() << "\n"
<< gcs_pub_sub_->DebugString() << "\n"
<< ((rpc::DefaultTaskInfoHandler *)task_info_handler_.get())->DebugString();
// TODO(ffbin): We will get the session_dir in the next PR, and write the log to
// gcs_debug_state.txt.

View file

@ -53,6 +53,7 @@ Status GcsPubSub::Unsubscribe(const std::string &channel_name, const std::string
auto channel = channels_.find(pattern);
RAY_CHECK(channel != channels_.end());
channel->second.command_queue.push_back(Command());
total_commands_queued_++;
// Process the first command on the queue, if possible.
return ExecuteCommandIfPossible(channel->first, channel->second);
@ -74,6 +75,7 @@ Status GcsPubSub::SubscribeInternal(const std::string &channel_name,
// Add the SUBSCRIBE command to the queue.
channel->second.command_queue.push_back(Command(subscribe, done, is_sub_or_unsub_all));
total_commands_queued_++;
// Process the first command on the queue, if possible.
return ExecuteCommandIfPossible(channel->first, channel->second);
@ -152,6 +154,7 @@ Status GcsPubSub::ExecuteCommandIfPossible(const std::string &channel_key,
}
channel.pending_reply = true;
channel.command_queue.pop_front();
total_commands_queued_--;
} else if (!command.is_subscribe && channel.callback_index != -1) {
// The next command is UNSUBSCRIBE and we are currently subscribed, so we
// can execute the command. The reply for will be received through the
@ -163,6 +166,7 @@ Status GcsPubSub::ExecuteCommandIfPossible(const std::string &channel_key,
}
channel.pending_reply = true;
channel.command_queue.pop_front();
total_commands_queued_--;
} else if (!channel.pending_reply) {
// There is no in-flight command, but the next command to execute is not
// runnable. The caller must have sent a command out-of-order.
@ -193,5 +197,14 @@ bool GcsPubSub::IsUnsubscribed(const std::string &channel, const std::string &id
return !channels_.contains(pattern);
}
std::string GcsPubSub::DebugString() const {
absl::MutexLock lock(&mutex_);
std::ostringstream stream;
stream << "GcsPubSub:";
stream << "\n- num channels subscribed to: " << channels_.size();
stream << "\n- total commands queued: " << total_commands_queued_;
return stream.str();
}
} // namespace gcs
} // namespace ray

View file

@ -94,6 +94,8 @@ class GcsPubSub {
/// \return Whether the specified ID under the specified channel is unsubscribed.
bool IsUnsubscribed(const std::string &channel, const std::string &id);
std::string DebugString() const;
private:
/// Represents a caller's command to subscribe or unsubscribe to a given
/// channel.
@ -160,9 +162,11 @@ class GcsPubSub {
std::shared_ptr<RedisClient> redis_client_;
/// Mutex to protect the subscribe_callback_index_ field.
absl::Mutex mutex_;
mutable absl::Mutex mutex_;
absl::flat_hash_map<std::string, Channel> channels_ GUARDED_BY(mutex_);
size_t total_commands_queued_ GUARDED_BY(mutex_);
};
} // namespace gcs