diff --git a/src/ray/gcs/gcs_server/gcs_redis_failure_detector.cc b/src/ray/gcs/gcs_server/gcs_redis_failure_detector.cc index 2fd1179b1..25f8b60ff 100644 --- a/src/ray/gcs/gcs_server/gcs_redis_failure_detector.cc +++ b/src/ray/gcs/gcs_server/gcs_redis_failure_detector.cc @@ -16,10 +16,6 @@ #include "ray/common/ray_config.h" -extern "C" { -#include "hiredis/hiredis.h" -} - namespace ray { namespace gcs { @@ -36,13 +32,18 @@ void GcsRedisFailureDetector::Start() { } void GcsRedisFailureDetector::DetectRedis() { - auto *reply = reinterpret_cast( - redisCommand(redis_context_->sync_context(), "PING")); - if (reply == nullptr || reply->type == REDIS_REPLY_NIL) { - RAY_LOG(ERROR) << "Redis is inactive."; + auto redis_callback = [this](const std::shared_ptr &reply) { + if (reply->IsNil()) { + RAY_LOG(ERROR) << "Redis is inactive."; + callback_(); + } + }; + + Status status = redis_context_->RunArgvAsync({"PING"}, redis_callback); + + if (!status.ok()) { + RAY_LOG(ERROR) << "Redis is disconnected."; callback_(); - } else { - freeReplyObject(reply); } } diff --git a/src/ray/gcs/redis_async_context.cc b/src/ray/gcs/redis_async_context.cc index 33f388684..42662cf5b 100644 --- a/src/ray/gcs/redis_async_context.cc +++ b/src/ray/gcs/redis_async_context.cc @@ -75,7 +75,7 @@ Status RedisAsyncContext::RedisAsyncCommand(redisCallbackFn *fn, void *privdata, // `redisvAsyncCommand` will mutate `redis_async_context_`, use a lock to protect it. std::lock_guard lock(mutex_); if (!redis_async_context_) { - return Status::NotImplemented("..."); + return Status::Disconnected("Redis is disconnected"); } ret_code = redisvAsyncCommand(redis_async_context_, fn, privdata, format, ap); } @@ -97,6 +97,9 @@ Status RedisAsyncContext::RedisAsyncCommandArgv(redisCallbackFn *fn, void *privd // `redisAsyncCommandArgv` will mutate `redis_async_context_`, use a lock to protect // it. std::lock_guard lock(mutex_); + if (!redis_async_context_) { + return Status::Disconnected("Redis is disconnected"); + } ret_code = redisAsyncCommandArgv(redis_async_context_, fn, privdata, argc, argv, argvlen); }