From 8df5a24dbd3eb92ced7ac84430e3ed95998b7bf6 Mon Sep 17 00:00:00 2001
From: mwtian <81660174+mwtian@users.noreply.github.com>
Date: Wed, 1 Jun 2022 17:57:17 -0700
Subject: [PATCH] [Core] fix tsan issue in `GcsRpcClient` (#25365)
`//:global_state_accessor_test` has been flaky under `tsan`.
I'm able to reproduce the issue on Linux EC2. It seems `GcsRpcClient::timer_` can be used from both application main thread and GCS RPC client event loop. Added a lock to avoid the data race.
---
src/ray/rpc/gcs_server/gcs_rpc_client.h | 23 ++++++++++++++++-------
1 file changed, 16 insertions(+), 7 deletions(-)
diff --git a/src/ray/rpc/gcs_server/gcs_rpc_client.h b/src/ray/rpc/gcs_server/gcs_rpc_client.h
index 5af87724c..cda0e6b7d 100644
--- a/src/ray/rpc/gcs_server/gcs_rpc_client.h
+++ b/src/ray/rpc/gcs_server/gcs_rpc_client.h
@@ -463,10 +463,12 @@ class GcsRpcClient {
/*method_timeout_ms*/ -1, )
void Shutdown() {
- bool is_shutdown = false;
- if (!shutdown_.compare_exchange_strong(is_shutdown, true)) {
- RAY_LOG(DEBUG) << "GCS client has already been shutdown.";
+ if (!shutdown_.exchange(true)) {
+ // First call to shut down this GCS RPC client.
+ absl::MutexLock lock(&timer_mu_);
timer_->cancel();
+ } else {
+ RAY_LOG(DEBUG) << "GCS RPC client has already shutdown.";
}
}
@@ -481,6 +483,7 @@ class GcsRpcClient {
auto duration = boost::posix_time::milliseconds(
::RayConfig::instance()
.gcs_client_check_connection_status_interval_milliseconds());
+ absl::MutexLock lock(&timer_mu_);
timer_->expires_from_now(duration);
timer_->async_wait([this](boost::system::error_code error) {
if (error == boost::system::errc::success) {
@@ -488,6 +491,7 @@ class GcsRpcClient {
}
});
}
+
void CheckChannelStatus(bool reset_timer = true) {
if (shutdown_) {
return;
@@ -527,10 +531,16 @@ class GcsRpcClient {
}
SetupCheckTimer();
}
- std::string gcs_address_;
- int64_t gcs_port_;
- instrumented_io_context *io_context_;
+ const std::string gcs_address_;
+ const int64_t gcs_port_;
+
+ instrumented_io_context *const io_context_;
+
+ // Timer can be called from either the GCS RPC event loop, or the application's
+ // main thread. It needs to be protected by a mutex.
+ absl::Mutex timer_mu_;
+ const std::unique_ptr timer_;
/// The gRPC-generated stub.
std::unique_ptr> job_info_grpc_client_;
@@ -550,7 +560,6 @@ class GcsRpcClient {
absl::Time gcs_last_alive_time_ = absl::Now();
std::atomic shutdown_ = false;
- std::unique_ptr timer_;
std::vector pending_requests_;
size_t pending_requests_bytes_ = 0;