[Test] Try fixing a flaky gcs heartbeat manager test. (#27096)

Heartbeat manager starts its own thread to run its background task and that shares the same data structured used within HandleReportHeartbeat (heartbeats_). That said, both methods should run in the same thread. This achieves it by running HandleReportHeartbeat within the io_service thread
This commit is contained in:
SangBin Cho 2022-07-29 14:42:13 +09:00 committed by GitHub
parent 749d313dcd
commit c1ac2bb80f
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 48 additions and 18 deletions

View file

@ -172,10 +172,8 @@ class NodeHead(dashboard_utils.DashboardHeadModule):
and self._node_update_cnt * FREQUENTY_UPDATE_NODES_INTERVAL_SECONDS
< FREQUENT_UPDATE_TIMEOUT_SECONDS
):
logger.info("SANG-TODO a")
await asyncio.sleep(FREQUENTY_UPDATE_NODES_INTERVAL_SECONDS)
else:
logger.info("SANG-TODO b")
if head_node_not_registered:
logger.warning(
"Head node is not registered even after "

View file

@ -16,6 +16,7 @@
#include <chrono>
#include "absl/synchronization/mutex.h"
#include "gtest/gtest.h"
using namespace ray;
@ -35,8 +36,11 @@ class GcsHeartbeatManagerTest : public ::testing::Test {
}
void SetUp() override {
heartbeat_manager = std::make_unique<GcsHeartbeatManager>(
io_service, [this](const NodeID &node_id) { dead_nodes.push_back(node_id); });
heartbeat_manager =
std::make_unique<GcsHeartbeatManager>(io_service, [this](const NodeID &node_id) {
absl::MutexLock lock(&mutex_);
dead_nodes.push_back(node_id);
});
heartbeat_manager->Start();
}
@ -50,7 +54,11 @@ class GcsHeartbeatManagerTest : public ::testing::Test {
instrumented_io_context io_service;
std::unique_ptr<GcsHeartbeatManager> heartbeat_manager;
std::vector<NodeID> dead_nodes;
mutable absl::Mutex mutex_;
// This field needs to be protected because it is accessed
// by a different thread created by `heartbeat_manager`.
std::vector<NodeID> dead_nodes GUARDED_BY(mutex_);
;
};
TEST_F(GcsHeartbeatManagerTest, TestBasicTimeout) {
@ -59,12 +67,16 @@ TEST_F(GcsHeartbeatManagerTest, TestBasicTimeout) {
AddNode(node_1);
while (absl::Now() - start < absl::Seconds(1)) {
absl::MutexLock lock(&mutex_);
ASSERT_TRUE(dead_nodes.empty());
}
std::this_thread::sleep_for(2s);
ASSERT_EQ(std::vector<NodeID>{node_1}, dead_nodes);
{
absl::MutexLock lock(&mutex_);
ASSERT_EQ(std::vector<NodeID>{node_1}, dead_nodes);
}
}
TEST_F(GcsHeartbeatManagerTest, TestBasicReport) {
@ -72,13 +84,19 @@ TEST_F(GcsHeartbeatManagerTest, TestBasicReport) {
auto start = absl::Now();
AddNode(node_1);
rpc::ReportHeartbeatReply reply;
rpc::ReportHeartbeatRequest request;
request.mutable_heartbeat()->set_node_id(node_1.Binary());
while (absl::Now() - start < absl::Seconds(3)) {
absl::MutexLock lock(&mutex_);
ASSERT_TRUE(dead_nodes.empty());
// std::function<void(ray::Status, std::function<void()>, std::function<void()>)>'
heartbeat_manager->HandleReportHeartbeat(request, &reply, [](auto, auto, auto) {});
io_service.post(
[&]() {
rpc::ReportHeartbeatReply reply;
rpc::ReportHeartbeatRequest request;
request.mutable_heartbeat()->set_node_id(node_1.Binary());
heartbeat_manager->HandleReportHeartbeat(
request, &reply, [](auto, auto, auto) {});
},
"HandleReportHeartbeat");
std::this_thread::sleep_for(0.1s);
}
}
@ -99,11 +117,15 @@ TEST_F(GcsHeartbeatManagerTest, TestBasicRestart) {
heartbeat_manager->Initialize(init_data);
while (absl::Now() - start < absl::Seconds(3)) {
absl::MutexLock lock(&mutex_);
ASSERT_TRUE(dead_nodes.empty());
}
std::this_thread::sleep_for(2s);
ASSERT_EQ(std::vector<NodeID>{node_1}, dead_nodes);
{
absl::MutexLock lock(&mutex_);
ASSERT_EQ(std::vector<NodeID>{node_1}, dead_nodes);
}
}
TEST_F(GcsHeartbeatManagerTest, TestBasicRestart2) {
@ -122,18 +144,28 @@ TEST_F(GcsHeartbeatManagerTest, TestBasicRestart2) {
heartbeat_manager->Initialize(init_data);
rpc::ReportHeartbeatReply reply;
rpc::ReportHeartbeatRequest request;
while (absl::Now() - start < absl::Seconds(1)) {
request.mutable_heartbeat()->set_node_id(node_1.Binary());
heartbeat_manager->HandleReportHeartbeat(request, &reply, [](auto, auto, auto) {});
io_service.post(
[&]() {
rpc::ReportHeartbeatReply reply;
rpc::ReportHeartbeatRequest request;
request.mutable_heartbeat()->set_node_id(node_1.Binary());
heartbeat_manager->HandleReportHeartbeat(
request, &reply, [](auto, auto, auto) {});
},
"HandleReportHeartbeat");
// Added a sleep to avoid io service overloaded.
std::this_thread::sleep_for(0.1s);
}
while (absl::Now() - start < absl::Seconds(1)) {
absl::MutexLock lock(&mutex_);
ASSERT_TRUE(dead_nodes.empty());
}
std::this_thread::sleep_for(2s);
ASSERT_EQ(std::vector<NodeID>{node_1}, dead_nodes);
{
absl::MutexLock lock(&mutex_);
ASSERT_EQ(std::vector<NodeID>{node_1}, dead_nodes);
}
}