[gcs/ha] Fix global_state_accessor_test to support redisless mode (#21729)

This PR enables global_state_accessor_test to support redisless mode so that we can enable the flag by default in the future.
This commit is contained in:
Yi Cheng 2022-01-20 15:32:49 -08:00 committed by GitHub
parent 9af8f11191
commit 9a88a60f6a
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -23,22 +23,47 @@
namespace ray {
class GlobalStateAccessorTest : public ::testing::Test {
class GlobalStateAccessorTest : public ::testing::TestWithParam<bool> {
public:
GlobalStateAccessorTest() { TestSetupUtil::StartUpRedisServers(std::vector<int>()); }
GlobalStateAccessorTest() {
if (GetParam()) {
RayConfig::instance().bootstrap_with_gcs() = true;
RayConfig::instance().gcs_storage() = "memory";
RayConfig::instance().gcs_grpc_based_pubsub() = true;
} else {
RayConfig::instance().bootstrap_with_gcs() = false;
RayConfig::instance().gcs_storage() = "redis";
RayConfig::instance().gcs_grpc_based_pubsub() = false;
}
virtual ~GlobalStateAccessorTest() { TestSetupUtil::ShutDownRedisServers(); }
if (!RayConfig::instance().bootstrap_with_gcs()) {
TestSetupUtil::StartUpRedisServers(std::vector<int>());
}
}
virtual ~GlobalStateAccessorTest() {
if (!RayConfig::instance().bootstrap_with_gcs()) {
TestSetupUtil::ShutDownRedisServers();
}
}
protected:
void SetUp() override {
RayConfig::instance().gcs_max_active_rpcs_per_handler() = -1;
config.grpc_server_port = 0;
if (RayConfig::instance().bootstrap_with_gcs()) {
config.grpc_server_port = 6379;
config.grpc_pubsub_enabled = true;
} else {
config.grpc_server_port = 0;
config.redis_address = "127.0.0.1";
config.enable_sharding_conn = false;
config.grpc_pubsub_enabled = false;
config.redis_port = TEST_REDIS_SERVER_PORTS.front();
}
config.node_ip_address = "127.0.0.1";
config.grpc_server_name = "MockedGcsServer";
config.grpc_server_thread_num = 1;
config.redis_address = "127.0.0.1";
config.node_ip_address = "127.0.0.1";
config.enable_sharding_conn = false;
config.redis_port = TEST_REDIS_SERVER_PORTS.front();
io_service_.reset(new instrumented_io_context());
gcs_server_.reset(new gcs::GcsServer(config, *io_service_));
@ -51,14 +76,19 @@ class GlobalStateAccessorTest : public ::testing::Test {
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
// Create GCS client.
gcs::GcsClientOptions options(config.redis_address, config.redis_port,
config.redis_password);
gcs_client_ = std::make_unique<gcs::GcsClient>(options);
// Create GCS client and global state.
if (RayConfig::instance().bootstrap_with_gcs()) {
gcs::GcsClientOptions options("127.0.0.1:6379");
gcs_client_ = std::make_unique<gcs::GcsClient>(options);
global_state_ = std::make_unique<gcs::GlobalStateAccessor>(options);
} else {
gcs::GcsClientOptions options(config.redis_address, config.redis_port,
config.redis_password);
gcs_client_ = std::make_unique<gcs::GcsClient>(options);
global_state_ = std::make_unique<gcs::GlobalStateAccessor>(options);
}
RAY_CHECK_OK(gcs_client_->Connect(*io_service_));
// Create global state.
global_state_ = std::make_unique<gcs::GlobalStateAccessor>(options);
RAY_CHECK(global_state_->Connect());
}
@ -70,7 +100,9 @@ class GlobalStateAccessorTest : public ::testing::Test {
gcs_client_.reset();
gcs_server_->Stop();
TestSetupUtil::FlushAllRedisServers();
if (!RayConfig::instance().bootstrap_with_gcs()) {
TestSetupUtil::FlushAllRedisServers();
}
io_service_->stop();
thread_io_service_->join();
@ -93,7 +125,7 @@ class GlobalStateAccessorTest : public ::testing::Test {
std::unique_ptr<boost::asio::io_service::work> work_;
};
TEST_F(GlobalStateAccessorTest, TestJobTable) {
TEST_P(GlobalStateAccessorTest, TestJobTable) {
int job_count = 100;
ASSERT_EQ(global_state_->GetAllJobInfo().size(), 0);
for (int index = 0; index < job_count; ++index) {
@ -107,7 +139,7 @@ TEST_F(GlobalStateAccessorTest, TestJobTable) {
ASSERT_EQ(global_state_->GetAllJobInfo().size(), job_count);
}
TEST_F(GlobalStateAccessorTest, TestNodeTable) {
TEST_P(GlobalStateAccessorTest, TestNodeTable) {
int node_count = 100;
ASSERT_EQ(global_state_->GetAllNodeInfo().size(), 0);
// It's useful to check if index value will be marked as address suffix.
@ -129,7 +161,7 @@ TEST_F(GlobalStateAccessorTest, TestNodeTable) {
}
}
TEST_F(GlobalStateAccessorTest, TestNodeResourceTable) {
TEST_P(GlobalStateAccessorTest, TestNodeResourceTable) {
int node_count = 100;
ASSERT_EQ(global_state_->GetAllNodeInfo().size(), 0);
for (int index = 0; index < node_count; ++index) {
@ -165,7 +197,7 @@ TEST_F(GlobalStateAccessorTest, TestNodeResourceTable) {
}
}
TEST_F(GlobalStateAccessorTest, TestGetAllResourceUsage) {
TEST_P(GlobalStateAccessorTest, TestGetAllResourceUsage) {
std::unique_ptr<std::string> resources = global_state_->GetAllResourceUsage();
rpc::ResourceUsageBatchData resource_usage_batch_data;
resource_usage_batch_data.ParseFromString(*resources.get());
@ -235,7 +267,7 @@ TEST_F(GlobalStateAccessorTest, TestGetAllResourceUsage) {
ASSERT_EQ((*resources_data.mutable_resources_available())["GPU"], 5.0);
}
TEST_F(GlobalStateAccessorTest, TestProfileTable) {
TEST_P(GlobalStateAccessorTest, TestProfileTable) {
int profile_count = RayConfig::instance().maximum_profile_table_rows_count() + 1;
ASSERT_EQ(global_state_->GetAllProfileInfo().size(), 0);
for (int index = 0; index < profile_count; ++index) {
@ -251,7 +283,7 @@ TEST_F(GlobalStateAccessorTest, TestProfileTable) {
RayConfig::instance().maximum_profile_table_rows_count());
}
TEST_F(GlobalStateAccessorTest, TestWorkerTable) {
TEST_P(GlobalStateAccessorTest, TestWorkerTable) {
ASSERT_EQ(global_state_->GetAllWorkerInfo().size(), 0);
// Add worker info
auto worker_table_data = Mocker::GenWorkerTableData();
@ -272,10 +304,13 @@ TEST_F(GlobalStateAccessorTest, TestWorkerTable) {
}
// TODO(sang): Add tests after adding asyncAdd
TEST_F(GlobalStateAccessorTest, TestPlacementGroupTable) {
TEST_P(GlobalStateAccessorTest, TestPlacementGroupTable) {
ASSERT_EQ(global_state_->GetAllPlacementGroupInfo().size(), 0);
}
INSTANTIATE_TEST_SUITE_P(RedisRemovalTest, GlobalStateAccessorTest,
::testing::Values(false, true));
} // namespace ray
int main(int argc, char **argv) {