Retry connections to redis for async and subscribe contexts (#3105)

This is fixing a problem that @devin-petersohn observed on the windows subsystem for linux.

In theory, redis should be up once the async connect is happening and there should be no retries needed for the async connect. However on the windows subsystem for linux, the async connect was failing even though the synchronous one was working. Maybe windows has a different semantics here than linux.
This commit is contained in:
Philipp Moritz 2018-10-22 22:31:13 -07:00 committed by Robert Nishihara
parent eff7cb4458
commit 8d8b6e5bfa

View file

@ -157,27 +157,35 @@ Status AuthenticateRedis(redisAsyncContext *context, const std::string &password
return Status::OK();
}
Status RedisContext::Connect(const std::string &address, int port, bool sharding,
const std::string &password = "") {
template <typename RedisContext, typename RedisConnectFunction>
Status ConnectWithRetries(const std::string &address, int port,
const RedisConnectFunction &connect_function,
RedisContext **context) {
int connection_attempts = 0;
context_ = redisConnect(address.c_str(), port);
while (context_ == nullptr || context_->err) {
*context = connect_function(address.c_str(), port);
while (*context == nullptr || (*context)->err) {
if (connection_attempts >= RayConfig::instance().redis_db_connect_retries()) {
if (context_ == nullptr) {
if (*context == nullptr) {
RAY_LOG(FATAL) << "Could not allocate redis context.";
}
if (context_->err) {
if ((*context)->err) {
RAY_LOG(FATAL) << "Could not establish connection to redis " << address << ":"
<< port;
<< port << " (context.err = " << (*context)->err << ")";
}
break;
}
RAY_LOG(WARNING) << "Failed to connect to Redis, retrying.";
// Sleep for a little.
usleep(RayConfig::instance().redis_db_connect_wait_milliseconds() * 1000);
context_ = redisConnect(address.c_str(), port);
*context = connect_function(address.c_str(), port);
connection_attempts += 1;
}
return Status::OK();
}
Status RedisContext::Connect(const std::string &address, int port, bool sharding,
const std::string &password = "") {
RAY_CHECK_OK(ConnectWithRetries(address, port, redisConnect, &context_));
RAY_CHECK_OK(AuthenticateRedis(context_, password));
redisReply *reply = reinterpret_cast<redisReply *>(
@ -186,19 +194,11 @@ Status RedisContext::Connect(const std::string &address, int port, bool sharding
freeReplyObject(reply);
// Connect to async context
async_context_ = redisAsyncConnect(address.c_str(), port);
if (async_context_ == nullptr || async_context_->err) {
RAY_LOG(FATAL) << "Could not establish connection to redis " << address << ":"
<< port;
}
RAY_CHECK_OK(ConnectWithRetries(address, port, redisAsyncConnect, &async_context_));
RAY_CHECK_OK(AuthenticateRedis(async_context_, password));
// Connect to subscribe context
subscribe_context_ = redisAsyncConnect(address.c_str(), port);
if (subscribe_context_ == nullptr || subscribe_context_->err) {
RAY_LOG(FATAL) << "Could not establish subscribe connection to redis " << address
<< ":" << port;
}
RAY_CHECK_OK(ConnectWithRetries(address, port, redisAsyncConnect, &subscribe_context_));
RAY_CHECK_OK(AuthenticateRedis(subscribe_context_, password));
return Status::OK();