mirror of
https://github.com/vale981/ray
synced 2025-03-08 11:31:40 -05:00
[GCS]Use an asynchronous PING to avoid blocking other operations (#9871)
* Use separate redis client to avoid its sync command blocking other operations * use redis_failure_detector_client_ * use async command to ping redis * format log
This commit is contained in:
parent
12d75784a4
commit
1760586628
2 changed files with 15 additions and 11 deletions
|
@ -16,10 +16,6 @@
|
||||||
|
|
||||||
#include "ray/common/ray_config.h"
|
#include "ray/common/ray_config.h"
|
||||||
|
|
||||||
extern "C" {
|
|
||||||
#include "hiredis/hiredis.h"
|
|
||||||
}
|
|
||||||
|
|
||||||
namespace ray {
|
namespace ray {
|
||||||
namespace gcs {
|
namespace gcs {
|
||||||
|
|
||||||
|
@ -36,13 +32,18 @@ void GcsRedisFailureDetector::Start() {
|
||||||
}
|
}
|
||||||
|
|
||||||
void GcsRedisFailureDetector::DetectRedis() {
|
void GcsRedisFailureDetector::DetectRedis() {
|
||||||
auto *reply = reinterpret_cast<redisReply *>(
|
auto redis_callback = [this](const std::shared_ptr<CallbackReply> &reply) {
|
||||||
redisCommand(redis_context_->sync_context(), "PING"));
|
if (reply->IsNil()) {
|
||||||
if (reply == nullptr || reply->type == REDIS_REPLY_NIL) {
|
RAY_LOG(ERROR) << "Redis is inactive.";
|
||||||
RAY_LOG(ERROR) << "Redis is inactive.";
|
callback_();
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
Status status = redis_context_->RunArgvAsync({"PING"}, redis_callback);
|
||||||
|
|
||||||
|
if (!status.ok()) {
|
||||||
|
RAY_LOG(ERROR) << "Redis is disconnected.";
|
||||||
callback_();
|
callback_();
|
||||||
} else {
|
|
||||||
freeReplyObject(reply);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -75,7 +75,7 @@ Status RedisAsyncContext::RedisAsyncCommand(redisCallbackFn *fn, void *privdata,
|
||||||
// `redisvAsyncCommand` will mutate `redis_async_context_`, use a lock to protect it.
|
// `redisvAsyncCommand` will mutate `redis_async_context_`, use a lock to protect it.
|
||||||
std::lock_guard<std::mutex> lock(mutex_);
|
std::lock_guard<std::mutex> lock(mutex_);
|
||||||
if (!redis_async_context_) {
|
if (!redis_async_context_) {
|
||||||
return Status::NotImplemented("...");
|
return Status::Disconnected("Redis is disconnected");
|
||||||
}
|
}
|
||||||
ret_code = redisvAsyncCommand(redis_async_context_, fn, privdata, format, ap);
|
ret_code = redisvAsyncCommand(redis_async_context_, fn, privdata, format, ap);
|
||||||
}
|
}
|
||||||
|
@ -97,6 +97,9 @@ Status RedisAsyncContext::RedisAsyncCommandArgv(redisCallbackFn *fn, void *privd
|
||||||
// `redisAsyncCommandArgv` will mutate `redis_async_context_`, use a lock to protect
|
// `redisAsyncCommandArgv` will mutate `redis_async_context_`, use a lock to protect
|
||||||
// it.
|
// it.
|
||||||
std::lock_guard<std::mutex> lock(mutex_);
|
std::lock_guard<std::mutex> lock(mutex_);
|
||||||
|
if (!redis_async_context_) {
|
||||||
|
return Status::Disconnected("Redis is disconnected");
|
||||||
|
}
|
||||||
ret_code =
|
ret_code =
|
||||||
redisAsyncCommandArgv(redis_async_context_, fn, privdata, argc, argv, argvlen);
|
redisAsyncCommandArgv(redis_async_context_, fn, privdata, argc, argv, argvlen);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Add table
Reference in a new issue