[Core] Add gRPC Server Backpressure Tests (#18500)

This commit is contained in:
Lixin Wei 2021-09-11 08:17:09 +08:00 committed by GitHub
parent 1587eb22f0
commit 7e37d6e348
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 52 additions and 23 deletions

View file

@ -17,11 +17,11 @@ syntax = "proto3";
package ray.rpc;
// Serivce that is only used to test GrpcServer
message SleepRequest {
int64 sleep_time_ms = 1;
message PingRequest {
bool no_reply = 1;
}
message SleepReply {
message PingReply {
}
service TestService {
rpc Sleep(SleepRequest) returns (SleepReply);
rpc Ping(PingRequest) returns (PingReply);
}

View file

@ -22,15 +22,19 @@ namespace ray {
namespace rpc {
class TestServiceHandler {
public:
void HandleSleep(const SleepRequest &request, SleepReply *reply,
SendReplyCallback send_reply_callback) {
RAY_LOG(INFO) << "Got sleep request, time=" << request.sleep_time_ms() << "ms";
void HandlePing(const PingRequest &request, PingReply *reply,
SendReplyCallback send_reply_callback) {
RAY_LOG(INFO) << "Got ping request, no_reply=" << request.no_reply();
request_count++;
while (frozen) {
RAY_LOG(INFO) << "Server is frozen...";
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
}
RAY_LOG(INFO) << "Handling and replying request.";
std::this_thread::sleep_for(std::chrono::milliseconds(request.sleep_time_ms()));
if (request.no_reply()) {
RAY_LOG(INFO) << "No reply!";
return;
}
send_reply_callback(ray::Status::OK(),
/*reply_success=*/[]() { RAY_LOG(INFO) << "Reply success."; },
/*reply_failure=*/
@ -39,7 +43,7 @@ class TestServiceHandler {
reply_failure_count++;
});
}
std::atomic<int> request_count{0};
std::atomic<int> reply_failure_count{0};
std::atomic<bool> frozen{false};
};
@ -59,7 +63,7 @@ class TestGrpcService : public GrpcService {
void InitServerCallFactories(
const std::unique_ptr<grpc::ServerCompletionQueue> &cq,
std::vector<std::unique_ptr<ServerCallFactory>> *server_call_factories) override {
RPC_SERVICE_HANDLER(TestService, Sleep, /*max_active_rpcs=*/1);
RPC_SERVICE_HANDLER(TestService, Ping, /*max_active_rpcs=*/1);
}
private:
@ -99,15 +103,16 @@ class TestGrpcServerClientFixture : public ::testing::Test {
*client_call_manager_));
}
void TearDown() {
// Cleanup stuffs.
void ShutdownClient() {
grpc_client_.reset();
client_call_manager_.reset();
client_io_service_.stop();
if (client_thread_->joinable()) {
client_thread_->join();
}
}
void ShutdownServer() {
grpc_server_->Shutdown();
handler_io_service_.stop();
if (handler_thread_->joinable()) {
@ -115,8 +120,14 @@ class TestGrpcServerClientFixture : public ::testing::Test {
}
}
void TearDown() {
// Cleanup stuffs.
ShutdownClient();
ShutdownServer();
}
protected:
VOID_RPC_CLIENT_METHOD(TestService, Sleep, grpc_client_, )
VOID_RPC_CLIENT_METHOD(TestService, Ping, grpc_client_, )
// Server
TestServiceHandler test_service_handler_;
instrumented_io_context handler_io_service_;
@ -134,10 +145,9 @@ class TestGrpcServerClientFixture : public ::testing::Test {
TEST_F(TestGrpcServerClientFixture, TestBasic) {
// Send request
SleepRequest request;
request.set_sleep_time_ms(1);
PingRequest request;
std::atomic<bool> done(false);
Sleep(request, [&done](const Status &status, const SleepReply &reply) {
Ping(request, [&done](const Status &status, const PingReply &reply) {
RAY_LOG(INFO) << "replied, status=" << status;
done = true;
});
@ -147,6 +157,27 @@ TEST_F(TestGrpcServerClientFixture, TestBasic) {
}
}
TEST_F(TestGrpcServerClientFixture, TestBackpressure) {
// Send a request which won't be replied to.
PingRequest request;
request.set_no_reply(true);
Ping(request, [](const Status &status, const PingReply &reply) {
FAIL() << "Should have no response.";
});
while (test_service_handler_.request_count <= 0) {
RAY_LOG(INFO) << "Waiting for request to arrive";
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
}
// Send a normal request, this request will be blocked by backpressure since
// max_active_rpcs is 1.
request.set_no_reply(false);
Ping(request, [](const Status &status, const PingReply &reply) {
FAIL() << "Should have no response.";
});
std::this_thread::sleep_for(std::chrono::milliseconds(5000));
ASSERT_EQ(test_service_handler_.request_count, 1);
}
TEST_F(TestGrpcServerClientFixture, TestClientCallManagerTimeout) {
// Reinit ClientCallManager with short timeout.
grpc_client_.reset();
@ -158,10 +189,9 @@ TEST_F(TestGrpcServerClientFixture, TestClientCallManagerTimeout) {
// Freeze server first, it won't reply any request.
test_service_handler_.frozen = true;
// Send request.
SleepRequest request;
request.set_sleep_time_ms(1);
PingRequest request;
std::atomic<bool> call_timed_out(false);
Sleep(request, [&call_timed_out](const Status &status, const SleepReply &reply) {
Ping(request, [&call_timed_out](const Status &status, const PingReply &reply) {
RAY_LOG(INFO) << "Replied, status=" << status;
ASSERT_TRUE(status.IsIOError());
call_timed_out = true;
@ -191,10 +221,9 @@ TEST_F(TestGrpcServerClientFixture, TestClientDiedBeforeReply) {
// Freeze server first, it won't reply any request.
test_service_handler_.frozen = true;
// Send request.
SleepRequest request;
request.set_sleep_time_ms(1);
PingRequest request;
std::atomic<bool> call_timed_out(false);
Sleep(request, [&call_timed_out](const Status &status, const SleepReply &reply) {
Ping(request, [&call_timed_out](const Status &status, const PingReply &reply) {
RAY_LOG(INFO) << "Replied, status=" << status;
ASSERT_TRUE(status.IsIOError());
call_timed_out = true;
@ -220,7 +249,7 @@ TEST_F(TestGrpcServerClientFixture, TestClientDiedBeforeReply) {
// Send again, this request should be replied. If any leaking happened, this call won't
// be replied to since the max_active_rpcs is 1.
std::atomic<bool> done(false);
Sleep(request, [&done](const Status &status, const SleepReply &reply) {
Ping(request, [&done](const Status &status, const PingReply &reply) {
RAY_LOG(INFO) << "replied, status=" << status;
done = true;
});