mirror of
https://github.com/vale981/ray
synced 2025-03-05 10:01:43 -05:00
[Test] Try fixing a flaky gcs heartbeat manager test. (#27096)
Heartbeat manager starts its own thread to run its background task and that shares the same data structured used within HandleReportHeartbeat (heartbeats_). That said, both methods should run in the same thread. This achieves it by running HandleReportHeartbeat within the io_service thread
This commit is contained in:
parent
96f9b9506f
commit
1b1787a9aa
2 changed files with 48 additions and 18 deletions
|
@ -185,10 +185,8 @@ class NodeHead(dashboard_utils.DashboardHeadModule):
|
|||
and self._node_update_cnt * FREQUENTY_UPDATE_NODES_INTERVAL_SECONDS
|
||||
< FREQUENT_UPDATE_TIMEOUT_SECONDS
|
||||
):
|
||||
logger.info("SANG-TODO a")
|
||||
await asyncio.sleep(FREQUENTY_UPDATE_NODES_INTERVAL_SECONDS)
|
||||
else:
|
||||
logger.info("SANG-TODO b")
|
||||
if head_node_not_registered:
|
||||
logger.warning(
|
||||
"Head node is not registered even after "
|
||||
|
|
|
@ -16,6 +16,7 @@
|
|||
|
||||
#include <chrono>
|
||||
|
||||
#include "absl/synchronization/mutex.h"
|
||||
#include "gtest/gtest.h"
|
||||
|
||||
using namespace ray;
|
||||
|
@ -35,8 +36,11 @@ class GcsHeartbeatManagerTest : public ::testing::Test {
|
|||
}
|
||||
|
||||
void SetUp() override {
|
||||
heartbeat_manager = std::make_unique<GcsHeartbeatManager>(
|
||||
io_service, [this](const NodeID &node_id) { dead_nodes.push_back(node_id); });
|
||||
heartbeat_manager =
|
||||
std::make_unique<GcsHeartbeatManager>(io_service, [this](const NodeID &node_id) {
|
||||
absl::MutexLock lock(&mutex_);
|
||||
dead_nodes.push_back(node_id);
|
||||
});
|
||||
heartbeat_manager->Start();
|
||||
}
|
||||
|
||||
|
@ -50,7 +54,11 @@ class GcsHeartbeatManagerTest : public ::testing::Test {
|
|||
|
||||
instrumented_io_context io_service;
|
||||
std::unique_ptr<GcsHeartbeatManager> heartbeat_manager;
|
||||
std::vector<NodeID> dead_nodes;
|
||||
mutable absl::Mutex mutex_;
|
||||
// This field needs to be protected because it is accessed
|
||||
// by a different thread created by `heartbeat_manager`.
|
||||
std::vector<NodeID> dead_nodes GUARDED_BY(mutex_);
|
||||
;
|
||||
};
|
||||
|
||||
TEST_F(GcsHeartbeatManagerTest, TestBasicTimeout) {
|
||||
|
@ -59,12 +67,16 @@ TEST_F(GcsHeartbeatManagerTest, TestBasicTimeout) {
|
|||
AddNode(node_1);
|
||||
|
||||
while (absl::Now() - start < absl::Seconds(1)) {
|
||||
absl::MutexLock lock(&mutex_);
|
||||
ASSERT_TRUE(dead_nodes.empty());
|
||||
}
|
||||
|
||||
std::this_thread::sleep_for(2s);
|
||||
|
||||
ASSERT_EQ(std::vector<NodeID>{node_1}, dead_nodes);
|
||||
{
|
||||
absl::MutexLock lock(&mutex_);
|
||||
ASSERT_EQ(std::vector<NodeID>{node_1}, dead_nodes);
|
||||
}
|
||||
}
|
||||
|
||||
TEST_F(GcsHeartbeatManagerTest, TestBasicReport) {
|
||||
|
@ -72,13 +84,19 @@ TEST_F(GcsHeartbeatManagerTest, TestBasicReport) {
|
|||
auto start = absl::Now();
|
||||
AddNode(node_1);
|
||||
|
||||
rpc::ReportHeartbeatReply reply;
|
||||
rpc::ReportHeartbeatRequest request;
|
||||
request.mutable_heartbeat()->set_node_id(node_1.Binary());
|
||||
while (absl::Now() - start < absl::Seconds(3)) {
|
||||
absl::MutexLock lock(&mutex_);
|
||||
ASSERT_TRUE(dead_nodes.empty());
|
||||
// std::function<void(ray::Status, std::function<void()>, std::function<void()>)>'
|
||||
heartbeat_manager->HandleReportHeartbeat(request, &reply, [](auto, auto, auto) {});
|
||||
io_service.post(
|
||||
[&]() {
|
||||
rpc::ReportHeartbeatReply reply;
|
||||
rpc::ReportHeartbeatRequest request;
|
||||
request.mutable_heartbeat()->set_node_id(node_1.Binary());
|
||||
heartbeat_manager->HandleReportHeartbeat(
|
||||
request, &reply, [](auto, auto, auto) {});
|
||||
},
|
||||
"HandleReportHeartbeat");
|
||||
std::this_thread::sleep_for(0.1s);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -99,11 +117,15 @@ TEST_F(GcsHeartbeatManagerTest, TestBasicRestart) {
|
|||
heartbeat_manager->Initialize(init_data);
|
||||
|
||||
while (absl::Now() - start < absl::Seconds(3)) {
|
||||
absl::MutexLock lock(&mutex_);
|
||||
ASSERT_TRUE(dead_nodes.empty());
|
||||
}
|
||||
|
||||
std::this_thread::sleep_for(2s);
|
||||
ASSERT_EQ(std::vector<NodeID>{node_1}, dead_nodes);
|
||||
{
|
||||
absl::MutexLock lock(&mutex_);
|
||||
ASSERT_EQ(std::vector<NodeID>{node_1}, dead_nodes);
|
||||
}
|
||||
}
|
||||
|
||||
TEST_F(GcsHeartbeatManagerTest, TestBasicRestart2) {
|
||||
|
@ -122,18 +144,28 @@ TEST_F(GcsHeartbeatManagerTest, TestBasicRestart2) {
|
|||
|
||||
heartbeat_manager->Initialize(init_data);
|
||||
|
||||
rpc::ReportHeartbeatReply reply;
|
||||
rpc::ReportHeartbeatRequest request;
|
||||
|
||||
while (absl::Now() - start < absl::Seconds(1)) {
|
||||
request.mutable_heartbeat()->set_node_id(node_1.Binary());
|
||||
heartbeat_manager->HandleReportHeartbeat(request, &reply, [](auto, auto, auto) {});
|
||||
io_service.post(
|
||||
[&]() {
|
||||
rpc::ReportHeartbeatReply reply;
|
||||
rpc::ReportHeartbeatRequest request;
|
||||
request.mutable_heartbeat()->set_node_id(node_1.Binary());
|
||||
heartbeat_manager->HandleReportHeartbeat(
|
||||
request, &reply, [](auto, auto, auto) {});
|
||||
},
|
||||
"HandleReportHeartbeat");
|
||||
// Added a sleep to avoid io service overloaded.
|
||||
std::this_thread::sleep_for(0.1s);
|
||||
}
|
||||
|
||||
while (absl::Now() - start < absl::Seconds(1)) {
|
||||
absl::MutexLock lock(&mutex_);
|
||||
ASSERT_TRUE(dead_nodes.empty());
|
||||
}
|
||||
|
||||
std::this_thread::sleep_for(2s);
|
||||
ASSERT_EQ(std::vector<NodeID>{node_1}, dead_nodes);
|
||||
{
|
||||
absl::MutexLock lock(&mutex_);
|
||||
ASSERT_EQ(std::vector<NodeID>{node_1}, dead_nodes);
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Add table
Reference in a new issue