[Core] fix tsan issue in GcsRpcClient (#25365)

`//:global_state_accessor_test` has been flaky under `tsan`.
<img width="695" alt="image" src="https://user-images.githubusercontent.com/81660174/171493283-f36150e7-e1e5-4f1f-80e8-125a2f2f4bab.png">

I'm able to reproduce the issue on Linux EC2. It seems `GcsRpcClient::timer_` can be used from both application main thread and GCS RPC client event loop. Added a lock to avoid the data race.
This commit is contained in:
mwtian 2022-06-01 17:57:17 -07:00 committed by GitHub
parent e45054c130
commit 8df5a24dbd
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -463,10 +463,12 @@ class GcsRpcClient {
/*method_timeout_ms*/ -1, )
void Shutdown() {
bool is_shutdown = false;
if (!shutdown_.compare_exchange_strong(is_shutdown, true)) {
RAY_LOG(DEBUG) << "GCS client has already been shutdown.";
if (!shutdown_.exchange(true)) {
// First call to shut down this GCS RPC client.
absl::MutexLock lock(&timer_mu_);
timer_->cancel();
} else {
RAY_LOG(DEBUG) << "GCS RPC client has already shutdown.";
}
}
@ -481,6 +483,7 @@ class GcsRpcClient {
auto duration = boost::posix_time::milliseconds(
::RayConfig::instance()
.gcs_client_check_connection_status_interval_milliseconds());
absl::MutexLock lock(&timer_mu_);
timer_->expires_from_now(duration);
timer_->async_wait([this](boost::system::error_code error) {
if (error == boost::system::errc::success) {
@ -488,6 +491,7 @@ class GcsRpcClient {
}
});
}
void CheckChannelStatus(bool reset_timer = true) {
if (shutdown_) {
return;
@ -527,10 +531,16 @@ class GcsRpcClient {
}
SetupCheckTimer();
}
std::string gcs_address_;
int64_t gcs_port_;
instrumented_io_context *io_context_;
const std::string gcs_address_;
const int64_t gcs_port_;
instrumented_io_context *const io_context_;
// Timer can be called from either the GCS RPC event loop, or the application's
// main thread. It needs to be protected by a mutex.
absl::Mutex timer_mu_;
const std::unique_ptr<boost::asio::deadline_timer> timer_;
/// The gRPC-generated stub.
std::unique_ptr<GrpcClient<JobInfoGcsService>> job_info_grpc_client_;
@ -550,7 +560,6 @@ class GcsRpcClient {
absl::Time gcs_last_alive_time_ = absl::Now();
std::atomic<bool> shutdown_ = false;
std::unique_ptr<boost::asio::deadline_timer> timer_;
std::vector<Executor *> pending_requests_;
size_t pending_requests_bytes_ = 0;