mirror of
https://github.com/vale981/ray
synced 2025-03-08 19:41:38 -05:00
Fix memory leak in Redis by using auto memory management (#4054)
* Table appends should always succeed * Use Redis auto memory management * Remove unneeded namespace
This commit is contained in:
parent
0c0bd4d41c
commit
3684e5bc0d
2 changed files with 31 additions and 12 deletions
|
@ -39,6 +39,20 @@ extern RedisChainModule module;
|
|||
} \
|
||||
}
|
||||
|
||||
// Wrap a Redis command with automatic memory management.
|
||||
#define AUTO_MEMORY(FUNC) \
|
||||
int FUNC(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { \
|
||||
RedisModule_AutoMemory(ctx); \
|
||||
return internal_redis_commands::FUNC(ctx, argv, argc); \
|
||||
}
|
||||
|
||||
// Commands in this namespace should not be used directly. They should first be
|
||||
// wrapped with AUTO_MEMORY in the global namespace to enable automatic memory
|
||||
// management.
|
||||
// TODO(swang): Ideally, we would make the commands that don't have auto memory
|
||||
// management inaccessible instead of just using a separate namespace.
|
||||
namespace internal_redis_commands {
|
||||
|
||||
/// Map from pub sub channel to clients that are waiting on that channel.
|
||||
std::unordered_map<std::string, std::vector<std::string>> notification_map;
|
||||
|
||||
|
@ -272,14 +286,12 @@ int TableAdd_DoPublish(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
|
|||
/// \param data The data to insert at the key.
|
||||
/// \return The current value at the key, or OK if there is no value.
|
||||
int TableAdd_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
|
||||
RedisModule_AutoMemory(ctx);
|
||||
TableAdd_DoWrite(ctx, argv, argc, /*mutated_key_str=*/nullptr);
|
||||
return TableAdd_DoPublish(ctx, argv, argc);
|
||||
}
|
||||
|
||||
#if RAY_USE_NEW_GCS
|
||||
int ChainTableAdd_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
|
||||
RedisModule_AutoMemory(ctx);
|
||||
return module.ChainReplicate(ctx, argv, argc, /*node_func=*/TableAdd_DoWrite,
|
||||
/*tail_func=*/TableAdd_DoPublish);
|
||||
}
|
||||
|
@ -388,7 +400,6 @@ int TableAppend_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int
|
|||
#if RAY_USE_NEW_GCS
|
||||
int ChainTableAppend_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv,
|
||||
int argc) {
|
||||
RedisModule_AutoMemory(ctx);
|
||||
return module.ChainReplicate(ctx, argv, argc,
|
||||
/*node_func=*/TableAppend_DoWrite,
|
||||
/*tail_func=*/TableAppend_DoPublish);
|
||||
|
@ -471,8 +482,6 @@ Status TableEntryToFlatbuf(RedisModuleCtx *ctx, RedisModuleKey *table_key,
|
|||
/// \return nil if the key is empty, the current value if the key type is a
|
||||
/// string, or an array of the current values if the key type is a set.
|
||||
int TableLookup_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
|
||||
RedisModule_AutoMemory(ctx);
|
||||
|
||||
if (argc < 4) {
|
||||
return RedisModule_WrongArity(ctx);
|
||||
}
|
||||
|
@ -515,8 +524,6 @@ int TableLookup_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int
|
|||
/// string, or an array of the current values if the key type is a set.
|
||||
int TableRequestNotifications_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv,
|
||||
int argc) {
|
||||
RedisModule_AutoMemory(ctx);
|
||||
|
||||
if (argc != 5) {
|
||||
return RedisModule_WrongArity(ctx);
|
||||
}
|
||||
|
@ -569,8 +576,6 @@ int TableRequestNotifications_RedisCommand(RedisModuleCtx *ctx, RedisModuleStrin
|
|||
/// \return OK.
|
||||
int TableCancelNotifications_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv,
|
||||
int argc) {
|
||||
RedisModule_AutoMemory(ctx);
|
||||
|
||||
if (argc < 5) {
|
||||
return RedisModule_WrongArity(ctx);
|
||||
}
|
||||
|
@ -620,8 +625,6 @@ Status is_nil(bool *out, const std::string &data) {
|
|||
// Be careful, this only supports Task Table payloads.
|
||||
int TableTestAndUpdate_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv,
|
||||
int argc) {
|
||||
RedisModule_AutoMemory(ctx);
|
||||
|
||||
if (argc != 5) {
|
||||
return RedisModule_WrongArity(ctx);
|
||||
}
|
||||
|
@ -675,7 +678,6 @@ std::string DebugString() {
|
|||
|
||||
int DebugString_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
|
||||
REDISMODULE_NOT_USED(argv);
|
||||
RedisModule_AutoMemory(ctx);
|
||||
|
||||
if (argc != 1) {
|
||||
return RedisModule_WrongArity(ctx);
|
||||
|
@ -683,6 +685,20 @@ int DebugString_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int
|
|||
std::string debug_string = DebugString();
|
||||
return RedisModule_ReplyWithStringBuffer(ctx, debug_string.data(), debug_string.size());
|
||||
}
|
||||
};
|
||||
|
||||
// Wrap all Redis commands with Redis' auto memory management.
|
||||
AUTO_MEMORY(TableAdd_RedisCommand);
|
||||
AUTO_MEMORY(TableAppend_RedisCommand);
|
||||
AUTO_MEMORY(TableLookup_RedisCommand);
|
||||
AUTO_MEMORY(TableRequestNotifications_RedisCommand);
|
||||
AUTO_MEMORY(TableCancelNotifications_RedisCommand);
|
||||
AUTO_MEMORY(TableTestAndUpdate_RedisCommand);
|
||||
AUTO_MEMORY(DebugString_RedisCommand);
|
||||
#if RAY_USE_NEW_GCS
|
||||
AUTO_MEMORY(ChainTableAdd_RedisCommand);
|
||||
AUTO_MEMORY(ChainTableAppend_RedisCommand);
|
||||
#endif
|
||||
|
||||
extern "C" {
|
||||
|
||||
|
|
|
@ -42,6 +42,9 @@ Status Log<ID, Data>::Append(const JobID &job_id, const ID &id,
|
|||
std::shared_ptr<DataT> &dataT, const WriteCallback &done) {
|
||||
num_appends_++;
|
||||
auto callback = [this, id, dataT, done](const std::string &data) {
|
||||
// If data is not empty, then Redis failed to append the entry.
|
||||
RAY_CHECK(data.empty()) << "TABLE_APPEND command failed: " << data;
|
||||
|
||||
if (done != nullptr) {
|
||||
(done)(client_, id, *dataT);
|
||||
}
|
||||
|
|
Loading…
Add table
Reference in a new issue