mirror of
https://github.com/vale981/ray
synced 2025-03-06 02:21:39 -05:00
Use automatic memory management in Redis modules. (#1797)
This commit is contained in:
parent
317c9450e7
commit
9b9ff19dd0
1 changed files with 38 additions and 101 deletions
|
@ -93,7 +93,6 @@ RedisModuleKey *OpenPrefixedKey(RedisModuleCtx *ctx,
|
|||
RedisString_Format(ctx, "%s%S", prefix, keyname);
|
||||
RedisModuleKey *key =
|
||||
(RedisModuleKey *) RedisModule_OpenKey(ctx, prefixed_keyname, mode);
|
||||
RedisModule_FreeString(ctx, prefixed_keyname);
|
||||
return key;
|
||||
}
|
||||
|
||||
|
@ -125,8 +124,6 @@ RedisModuleKey *OpenBroadcastKey(RedisModuleCtx *ctx,
|
|||
RedisString_Format(ctx, "BCAST:%S", channel);
|
||||
RedisModuleKey *key =
|
||||
(RedisModuleKey *) RedisModule_OpenKey(ctx, prefixed_keyname, mode);
|
||||
RedisModule_FreeString(ctx, prefixed_keyname);
|
||||
RedisModule_FreeString(ctx, channel);
|
||||
return key;
|
||||
}
|
||||
|
||||
|
@ -197,8 +194,6 @@ bool PublishDBClientNotification(RedisModuleCtx *ctx,
|
|||
/* Publish the client info on the db client channel. */
|
||||
RedisModuleCallReply *reply;
|
||||
reply = RedisModule_Call(ctx, "PUBLISH", "ss", channel_name, client_info);
|
||||
RedisModule_FreeString(ctx, channel_name);
|
||||
RedisModule_FreeString(ctx, client_info);
|
||||
return (reply != NULL);
|
||||
}
|
||||
|
||||
|
@ -225,6 +220,8 @@ bool PublishDBClientNotification(RedisModuleCtx *ctx,
|
|||
int Connect_RedisCommand(RedisModuleCtx *ctx,
|
||||
RedisModuleString **argv,
|
||||
int argc) {
|
||||
RedisModule_AutoMemory(ctx);
|
||||
|
||||
if (argc < 4) {
|
||||
return RedisModule_WrongArity(ctx);
|
||||
}
|
||||
|
@ -265,9 +262,6 @@ int Connect_RedisCommand(RedisModuleCtx *ctx,
|
|||
}
|
||||
}
|
||||
/* Clean up. */
|
||||
RedisModule_FreeString(ctx, deleted);
|
||||
RedisModule_FreeString(ctx, manager_address_key);
|
||||
RedisModule_CloseKey(db_client_table_key);
|
||||
if (!PublishDBClientNotification(ctx, ray_client_id, client_type,
|
||||
manager_address, true)) {
|
||||
return RedisModule_ReplyWithError(ctx, "PUBLISH unsuccessful");
|
||||
|
@ -292,6 +286,8 @@ int Connect_RedisCommand(RedisModuleCtx *ctx,
|
|||
int Disconnect_RedisCommand(RedisModuleCtx *ctx,
|
||||
RedisModuleString **argv,
|
||||
int argc) {
|
||||
RedisModule_AutoMemory(ctx);
|
||||
|
||||
if (argc != 2) {
|
||||
return RedisModule_WrongArity(ctx);
|
||||
}
|
||||
|
@ -307,9 +303,7 @@ int Disconnect_RedisCommand(RedisModuleCtx *ctx,
|
|||
&deleted_string, NULL);
|
||||
long long deleted;
|
||||
int parsed = RedisModule_StringToLongLong(deleted_string, &deleted);
|
||||
RedisModule_FreeString(ctx, deleted_string);
|
||||
if (parsed != REDISMODULE_OK) {
|
||||
RedisModule_CloseKey(db_client_table_key);
|
||||
return RedisModule_ReplyWithError(ctx, "Unable to parse deleted field");
|
||||
}
|
||||
|
||||
|
@ -320,7 +314,6 @@ int Disconnect_RedisCommand(RedisModuleCtx *ctx,
|
|||
RedisModule_CreateString(ctx, "1", strlen("1"));
|
||||
RedisModule_HashSet(db_client_table_key, REDISMODULE_HASH_CFIELDS,
|
||||
"deleted", deleted, NULL);
|
||||
RedisModule_FreeString(ctx, deleted);
|
||||
|
||||
RedisModuleString *client_type;
|
||||
RedisModuleString *manager_address;
|
||||
|
@ -331,13 +324,7 @@ int Disconnect_RedisCommand(RedisModuleCtx *ctx,
|
|||
/* Publish the deletion notification on the db client channel. */
|
||||
published = PublishDBClientNotification(ctx, ray_client_id, client_type,
|
||||
manager_address, false);
|
||||
if (manager_address != NULL) {
|
||||
RedisModule_FreeString(ctx, manager_address);
|
||||
}
|
||||
RedisModule_FreeString(ctx, client_type);
|
||||
}
|
||||
|
||||
RedisModule_CloseKey(db_client_table_key);
|
||||
|
||||
if (!published) {
|
||||
/* Return an error message if we weren't able to publish the deletion
|
||||
|
@ -364,6 +351,8 @@ int Disconnect_RedisCommand(RedisModuleCtx *ctx,
|
|||
int ObjectTableLookup_RedisCommand(RedisModuleCtx *ctx,
|
||||
RedisModuleString **argv,
|
||||
int argc) {
|
||||
RedisModule_AutoMemory(ctx);
|
||||
|
||||
if (argc != 2) {
|
||||
return RedisModule_WrongArity(ctx);
|
||||
}
|
||||
|
@ -394,9 +383,6 @@ int ObjectTableLookup_RedisCommand(RedisModuleCtx *ctx,
|
|||
} while (RedisModule_ZsetRangeNext(key));
|
||||
RedisModule_ReplySetArrayLength(ctx, num_results);
|
||||
|
||||
/* Clean up. */
|
||||
RedisModule_CloseKey(key);
|
||||
|
||||
return REDISMODULE_OK;
|
||||
}
|
||||
|
||||
|
@ -450,8 +436,6 @@ bool PublishObjectNotification(RedisModuleCtx *ctx,
|
|||
|
||||
RedisModuleCallReply *reply;
|
||||
reply = RedisModule_Call(ctx, "PUBLISH", "ss", channel_name, payload);
|
||||
RedisModule_FreeString(ctx, channel_name);
|
||||
RedisModule_FreeString(ctx, payload);
|
||||
if (reply == NULL) {
|
||||
return false;
|
||||
}
|
||||
|
@ -500,8 +484,6 @@ int TaskTableAdd(RedisModuleCtx *ctx,
|
|||
RAY_CHECK(num_clients <= 1) << "Published to " << num_clients
|
||||
<< " clients.";
|
||||
|
||||
RedisModule_FreeString(ctx, publish_message);
|
||||
RedisModule_FreeString(ctx, publish_topic);
|
||||
}
|
||||
return RedisModule_ReplyWithSimpleString(ctx, "OK");
|
||||
}
|
||||
|
@ -553,12 +535,10 @@ int PublishTableAdd(RedisModuleCtx *ctx,
|
|||
RedisModule_Call(ctx, "PUBLISH", "sb", client_channel,
|
||||
fbb.GetBufferPointer(), fbb.GetSize());
|
||||
if (reply == NULL) {
|
||||
RedisModule_CloseKey(notification_key);
|
||||
return RedisModule_ReplyWithError(ctx, "error during PUBLISH");
|
||||
}
|
||||
}
|
||||
}
|
||||
RedisModule_CloseKey(notification_key);
|
||||
return RedisModule_ReplyWithSimpleString(ctx, "OK");
|
||||
}
|
||||
|
||||
|
@ -580,6 +560,8 @@ int PublishTableAdd(RedisModuleCtx *ctx,
|
|||
int TableAdd_RedisCommand(RedisModuleCtx *ctx,
|
||||
RedisModuleString **argv,
|
||||
int argc) {
|
||||
RedisModule_AutoMemory(ctx);
|
||||
|
||||
if (argc != 5) {
|
||||
return RedisModule_WrongArity(ctx);
|
||||
}
|
||||
|
@ -593,7 +575,6 @@ int TableAdd_RedisCommand(RedisModuleCtx *ctx,
|
|||
RedisModuleKey *key = OpenPrefixedKey(ctx, prefix_str, id,
|
||||
REDISMODULE_READ | REDISMODULE_WRITE);
|
||||
RedisModule_StringSet(key, data);
|
||||
RedisModule_CloseKey(key);
|
||||
|
||||
// Publish a message on the requested pubsub channel if necessary.
|
||||
TablePubsub pubsub_channel = ParseTablePubsub(pubsub_channel_str);
|
||||
|
@ -633,6 +614,8 @@ int TableAdd_RedisCommand(RedisModuleCtx *ctx,
|
|||
int TableAppend_RedisCommand(RedisModuleCtx *ctx,
|
||||
RedisModuleString **argv,
|
||||
int argc) {
|
||||
RedisModule_AutoMemory(ctx);
|
||||
|
||||
if (argc < 5 || argc > 6) {
|
||||
return RedisModule_WrongArity(ctx);
|
||||
}
|
||||
|
@ -671,7 +654,6 @@ int TableAppend_RedisCommand(RedisModuleCtx *ctx,
|
|||
// necessary since we implement the log with a sorted set, so all entries
|
||||
// must be unique, or else we will have gaps in the log.
|
||||
RAY_CHECK(flags == REDISMODULE_ZADD_ADDED) << "Appended a duplicate entry";
|
||||
RedisModule_CloseKey(key);
|
||||
// Publish a message on the requested pubsub channel if necessary.
|
||||
TablePubsub pubsub_channel = ParseTablePubsub(pubsub_channel_str);
|
||||
if (pubsub_channel != TablePubsub_NO_PUBLISH) {
|
||||
|
@ -684,7 +666,6 @@ int TableAppend_RedisCommand(RedisModuleCtx *ctx,
|
|||
} else {
|
||||
// The requested index did not match the current length of the log. Return
|
||||
// an error message as a string.
|
||||
RedisModule_CloseKey(key);
|
||||
const char *reply = "ERR entry exists";
|
||||
return RedisModule_ReplyWithStringBuffer(ctx, reply, strlen(reply));
|
||||
}
|
||||
|
@ -743,6 +724,8 @@ void TableEntryToFlatbuf(RedisModuleKey *table_key,
|
|||
int TableLookup_RedisCommand(RedisModuleCtx *ctx,
|
||||
RedisModuleString **argv,
|
||||
int argc) {
|
||||
RedisModule_AutoMemory(ctx);
|
||||
|
||||
if (argc < 4) {
|
||||
return RedisModule_WrongArity(ctx);
|
||||
}
|
||||
|
@ -763,7 +746,6 @@ int TableLookup_RedisCommand(RedisModuleCtx *ctx,
|
|||
ctx, reinterpret_cast<const char *>(fbb.GetBufferPointer()),
|
||||
fbb.GetSize());
|
||||
}
|
||||
RedisModule_CloseKey(table_key);
|
||||
|
||||
return REDISMODULE_OK;
|
||||
}
|
||||
|
@ -788,6 +770,8 @@ int TableLookup_RedisCommand(RedisModuleCtx *ctx,
|
|||
int TableRequestNotifications_RedisCommand(RedisModuleCtx *ctx,
|
||||
RedisModuleString **argv,
|
||||
int argc) {
|
||||
RedisModule_AutoMemory(ctx);
|
||||
|
||||
if (argc != 5) {
|
||||
return RedisModule_WrongArity(ctx);
|
||||
}
|
||||
|
@ -805,7 +789,6 @@ int TableRequestNotifications_RedisCommand(RedisModuleCtx *ctx,
|
|||
ctx, pubsub_channel_str, id, REDISMODULE_READ | REDISMODULE_WRITE);
|
||||
CHECK_ERROR(RedisModule_ZsetAdd(notification_key, 0.0, client_channel, NULL),
|
||||
"ZsetAdd failed.");
|
||||
RedisModule_CloseKey(notification_key);
|
||||
|
||||
// Lookup the current value at the key.
|
||||
RedisModuleKey *table_key =
|
||||
|
@ -819,9 +802,7 @@ int TableRequestNotifications_RedisCommand(RedisModuleCtx *ctx,
|
|||
reinterpret_cast<const char *>(fbb.GetBufferPointer()),
|
||||
fbb.GetSize());
|
||||
}
|
||||
RedisModule_CloseKey(table_key);
|
||||
|
||||
RedisModule_FreeString(ctx, client_channel);
|
||||
return RedisModule_ReplyWithNull(ctx);
|
||||
}
|
||||
|
||||
|
@ -844,6 +825,8 @@ int TableRequestNotifications_RedisCommand(RedisModuleCtx *ctx,
|
|||
int TableCancelNotifications_RedisCommand(RedisModuleCtx *ctx,
|
||||
RedisModuleString **argv,
|
||||
int argc) {
|
||||
RedisModule_AutoMemory(ctx);
|
||||
|
||||
if (argc < 5) {
|
||||
return RedisModule_WrongArity(ctx);
|
||||
}
|
||||
|
@ -862,7 +845,6 @@ int TableCancelNotifications_RedisCommand(RedisModuleCtx *ctx,
|
|||
RAY_CHECK(RedisModule_ZsetRem(notification_key, client_channel, NULL) ==
|
||||
REDISMODULE_OK);
|
||||
}
|
||||
RedisModule_CloseKey(notification_key);
|
||||
|
||||
RedisModule_ReplyWithSimpleString(ctx, "OK");
|
||||
return REDISMODULE_OK;
|
||||
|
@ -885,6 +867,8 @@ bool is_nil(const std::string &data) {
|
|||
int TableTestAndUpdate_RedisCommand(RedisModuleCtx *ctx,
|
||||
RedisModuleString **argv,
|
||||
int argc) {
|
||||
RedisModule_AutoMemory(ctx);
|
||||
|
||||
if (argc != 5) {
|
||||
return RedisModule_WrongArity(ctx);
|
||||
}
|
||||
|
@ -921,8 +905,6 @@ int TableTestAndUpdate_RedisCommand(RedisModuleCtx *ctx,
|
|||
|
||||
int result = RedisModule_ReplyWithStringBuffer(ctx, value_buf, value_len);
|
||||
|
||||
RedisModule_CloseKey(key);
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
|
@ -945,6 +927,8 @@ int TableTestAndUpdate_RedisCommand(RedisModuleCtx *ctx,
|
|||
int ObjectTableAdd_RedisCommand(RedisModuleCtx *ctx,
|
||||
RedisModuleString **argv,
|
||||
int argc) {
|
||||
RedisModule_AutoMemory(ctx);
|
||||
|
||||
if (argc != 5) {
|
||||
return RedisModule_WrongArity(ctx);
|
||||
}
|
||||
|
@ -977,14 +961,12 @@ int ObjectTableAdd_RedisCommand(RedisModuleCtx *ctx,
|
|||
/* Check whether the new hash value matches the old one. If not, we will
|
||||
* later return the "hash mismatch" error. */
|
||||
hash_mismatch = (RedisModule_StringCompare(existing_hash, new_hash) != 0);
|
||||
RedisModule_FreeString(ctx, existing_hash);
|
||||
}
|
||||
}
|
||||
|
||||
RedisModule_HashSet(key, REDISMODULE_HASH_CFIELDS, "hash", new_hash, NULL);
|
||||
RedisModule_HashSet(key, REDISMODULE_HASH_CFIELDS, "data_size", data_size,
|
||||
NULL);
|
||||
RedisModule_CloseKey(key);
|
||||
|
||||
/* Add the location in the object location table. */
|
||||
RedisModuleKey *table_key;
|
||||
|
@ -1002,7 +984,6 @@ int ObjectTableAdd_RedisCommand(RedisModuleCtx *ctx,
|
|||
/* The publish failed somehow. */
|
||||
return RedisModule_ReplyWithError(ctx, "PUBLISH BCAST unsuccessful");
|
||||
}
|
||||
RedisModule_FreeString(ctx, bcast_client_str);
|
||||
|
||||
/* Get the zset of clients that requested a notification about the
|
||||
* availability of this object. */
|
||||
|
@ -1030,7 +1011,6 @@ int ObjectTableAdd_RedisCommand(RedisModuleCtx *ctx,
|
|||
data_size, table_key);
|
||||
if (!success) {
|
||||
/* The publish failed somehow. */
|
||||
RedisModule_CloseKey(object_notification_key);
|
||||
return RedisModule_ReplyWithError(ctx, "PUBLISH unsuccessful");
|
||||
}
|
||||
} while (RedisModule_ZsetRangeNext(object_notification_key));
|
||||
|
@ -1038,10 +1018,8 @@ int ObjectTableAdd_RedisCommand(RedisModuleCtx *ctx,
|
|||
* waiting for notifications. */
|
||||
CHECK_ERROR(RedisModule_DeleteKey(object_notification_key),
|
||||
"Unable to delete zset key.");
|
||||
RedisModule_CloseKey(object_notification_key);
|
||||
}
|
||||
|
||||
RedisModule_CloseKey(table_key);
|
||||
if (hash_mismatch) {
|
||||
return RedisModule_ReplyWithError(ctx, "hash mismatch");
|
||||
} else {
|
||||
|
@ -1068,6 +1046,8 @@ int ObjectTableAdd_RedisCommand(RedisModuleCtx *ctx,
|
|||
int ObjectTableRemove_RedisCommand(RedisModuleCtx *ctx,
|
||||
RedisModuleString **argv,
|
||||
int argc) {
|
||||
RedisModule_AutoMemory(ctx);
|
||||
|
||||
if (argc != 3) {
|
||||
return RedisModule_WrongArity(ctx);
|
||||
}
|
||||
|
@ -1080,12 +1060,10 @@ int ObjectTableRemove_RedisCommand(RedisModuleCtx *ctx,
|
|||
table_key = OpenPrefixedKey(ctx, OBJECT_LOCATION_PREFIX, object_id,
|
||||
REDISMODULE_READ | REDISMODULE_WRITE);
|
||||
if (RedisModule_KeyType(table_key) == REDISMODULE_KEYTYPE_EMPTY) {
|
||||
RedisModule_CloseKey(table_key);
|
||||
return RedisModule_ReplyWithError(ctx, "object not found");
|
||||
}
|
||||
|
||||
RedisModule_ZsetRem(table_key, manager, NULL);
|
||||
RedisModule_CloseKey(table_key);
|
||||
|
||||
RedisModule_ReplyWithSimpleString(ctx, "OK");
|
||||
return REDISMODULE_OK;
|
||||
|
@ -1112,6 +1090,8 @@ int ObjectTableRemove_RedisCommand(RedisModuleCtx *ctx,
|
|||
int ObjectTableRequestNotifications_RedisCommand(RedisModuleCtx *ctx,
|
||||
RedisModuleString **argv,
|
||||
int argc) {
|
||||
RedisModule_AutoMemory(ctx);
|
||||
|
||||
if (argc < 3) {
|
||||
return RedisModule_WrongArity(ctx);
|
||||
}
|
||||
|
@ -1136,7 +1116,6 @@ int ObjectTableRequestNotifications_RedisCommand(RedisModuleCtx *ctx,
|
|||
CHECK_ERROR(
|
||||
RedisModule_ZsetAdd(object_notification_key, 0.0, client_id, NULL),
|
||||
"ZsetAdd failed.");
|
||||
RedisModule_CloseKey(object_notification_key);
|
||||
} else {
|
||||
/* Publish a notification to the client's object notification channel. */
|
||||
/* Extract the data_size first. */
|
||||
|
@ -1144,31 +1123,23 @@ int ObjectTableRequestNotifications_RedisCommand(RedisModuleCtx *ctx,
|
|||
object_info_key =
|
||||
OpenPrefixedKey(ctx, OBJECT_INFO_PREFIX, object_id, REDISMODULE_READ);
|
||||
if (RedisModule_KeyType(key) == REDISMODULE_KEYTYPE_EMPTY) {
|
||||
RedisModule_CloseKey(object_info_key);
|
||||
RedisModule_CloseKey(key);
|
||||
return RedisModule_ReplyWithError(ctx, "requested object not found");
|
||||
}
|
||||
RedisModuleString *existing_data_size;
|
||||
RedisModule_HashGet(object_info_key, REDISMODULE_HASH_CFIELDS,
|
||||
"data_size", &existing_data_size, NULL);
|
||||
RedisModule_CloseKey(object_info_key); /* No longer needed. */
|
||||
if (existing_data_size == NULL) {
|
||||
RedisModule_CloseKey(key);
|
||||
return RedisModule_ReplyWithError(ctx,
|
||||
"no data_size field in object info");
|
||||
}
|
||||
|
||||
bool success = PublishObjectNotification(ctx, client_id, object_id,
|
||||
existing_data_size, key);
|
||||
RedisModule_FreeString(ctx, existing_data_size);
|
||||
if (!success) {
|
||||
/* The publish failed somehow. */
|
||||
RedisModule_CloseKey(key);
|
||||
return RedisModule_ReplyWithError(ctx, "PUBLISH unsuccessful");
|
||||
}
|
||||
}
|
||||
/* Clean up. */
|
||||
RedisModule_CloseKey(key);
|
||||
}
|
||||
|
||||
RedisModule_ReplyWithSimpleString(ctx, "OK");
|
||||
|
@ -1178,7 +1149,8 @@ int ObjectTableRequestNotifications_RedisCommand(RedisModuleCtx *ctx,
|
|||
int ObjectInfoSubscribe_RedisCommand(RedisModuleCtx *ctx,
|
||||
RedisModuleString **argv,
|
||||
int argc) {
|
||||
REDISMODULE_NOT_USED(ctx);
|
||||
RedisModule_AutoMemory(ctx);
|
||||
|
||||
REDISMODULE_NOT_USED(argv);
|
||||
REDISMODULE_NOT_USED(argc);
|
||||
return REDISMODULE_OK;
|
||||
|
@ -1201,6 +1173,8 @@ int ObjectInfoSubscribe_RedisCommand(RedisModuleCtx *ctx,
|
|||
int ResultTableAdd_RedisCommand(RedisModuleCtx *ctx,
|
||||
RedisModuleString **argv,
|
||||
int argc) {
|
||||
RedisModule_AutoMemory(ctx);
|
||||
|
||||
if (argc != 4) {
|
||||
return RedisModule_WrongArity(ctx);
|
||||
}
|
||||
|
@ -1224,8 +1198,6 @@ int ResultTableAdd_RedisCommand(RedisModuleCtx *ctx,
|
|||
RedisModule_HashSet(key, REDISMODULE_HASH_CFIELDS, "task", task_id, "is_put",
|
||||
is_put, NULL);
|
||||
|
||||
/* Clean up. */
|
||||
RedisModule_CloseKey(key);
|
||||
RedisModule_ReplyWithSimpleString(ctx, "OK");
|
||||
|
||||
return REDISMODULE_OK;
|
||||
|
@ -1266,7 +1238,6 @@ int ReplyWithTask(RedisModuleCtx *ctx,
|
|||
execution_dependencies == NULL || task_spec == NULL ||
|
||||
spillback_count == NULL) {
|
||||
/* We must have either all fields or no fields. */
|
||||
RedisModule_CloseKey(key);
|
||||
return RedisModule_ReplyWithError(
|
||||
ctx, "Missing fields in the task table entry");
|
||||
}
|
||||
|
@ -1279,12 +1250,6 @@ int ReplyWithTask(RedisModuleCtx *ctx,
|
|||
(RedisModule_StringToLongLong(spillback_count, &spillback_count_val) !=
|
||||
REDISMODULE_OK) ||
|
||||
(spillback_count_val < 0)) {
|
||||
RedisModule_CloseKey(key);
|
||||
RedisModule_FreeString(ctx, state);
|
||||
RedisModule_FreeString(ctx, local_scheduler_id);
|
||||
RedisModule_FreeString(ctx, execution_dependencies);
|
||||
RedisModule_FreeString(ctx, task_spec);
|
||||
RedisModule_FreeString(ctx, spillback_count);
|
||||
return RedisModule_ReplyWithError(
|
||||
ctx, "Found invalid scheduling state or spillback count.");
|
||||
}
|
||||
|
@ -1300,19 +1265,11 @@ int ReplyWithTask(RedisModuleCtx *ctx,
|
|||
RedisModuleString *reply = RedisModule_CreateString(
|
||||
ctx, (char *) fbb.GetBufferPointer(), fbb.GetSize());
|
||||
RedisModule_ReplyWithString(ctx, reply);
|
||||
|
||||
RedisModule_FreeString(ctx, state);
|
||||
RedisModule_FreeString(ctx, local_scheduler_id);
|
||||
RedisModule_FreeString(ctx, execution_dependencies);
|
||||
RedisModule_FreeString(ctx, task_spec);
|
||||
RedisModule_FreeString(ctx, spillback_count);
|
||||
} else {
|
||||
/* If the key does not exist, return nil. */
|
||||
RedisModule_ReplyWithNull(ctx);
|
||||
}
|
||||
|
||||
RedisModule_CloseKey(key);
|
||||
|
||||
return REDISMODULE_OK;
|
||||
}
|
||||
|
||||
|
@ -1330,6 +1287,8 @@ int ReplyWithTask(RedisModuleCtx *ctx,
|
|||
int ResultTableLookup_RedisCommand(RedisModuleCtx *ctx,
|
||||
RedisModuleString **argv,
|
||||
int argc) {
|
||||
RedisModule_AutoMemory(ctx);
|
||||
|
||||
if (argc != 2) {
|
||||
return RedisModule_WrongArity(ctx);
|
||||
}
|
||||
|
@ -1341,7 +1300,6 @@ int ResultTableLookup_RedisCommand(RedisModuleCtx *ctx,
|
|||
key = OpenPrefixedKey(ctx, OBJECT_INFO_PREFIX, object_id, REDISMODULE_READ);
|
||||
|
||||
if (RedisModule_KeyType(key) == REDISMODULE_KEYTYPE_EMPTY) {
|
||||
RedisModule_CloseKey(key);
|
||||
return RedisModule_ReplyWithNull(ctx);
|
||||
}
|
||||
|
||||
|
@ -1351,7 +1309,6 @@ int ResultTableLookup_RedisCommand(RedisModuleCtx *ctx,
|
|||
RedisModuleString *hash;
|
||||
RedisModule_HashGet(key, REDISMODULE_HASH_CFIELDS, "task", &task_id, "is_put",
|
||||
&is_put, "data_size", &data_size, "hash", &hash, NULL);
|
||||
RedisModule_CloseKey(key);
|
||||
|
||||
if (task_id == NULL || is_put == NULL) {
|
||||
return RedisModule_ReplyWithNull(ctx);
|
||||
|
@ -1361,8 +1318,6 @@ int ResultTableLookup_RedisCommand(RedisModuleCtx *ctx,
|
|||
long long is_put_integer;
|
||||
if (RedisModule_StringToLongLong(is_put, &is_put_integer) != REDISMODULE_OK ||
|
||||
(is_put_integer != 0 && is_put_integer != 1)) {
|
||||
RedisModule_FreeString(ctx, is_put);
|
||||
RedisModule_FreeString(ctx, task_id);
|
||||
return RedisModule_ReplyWithError(
|
||||
ctx, "The is_put field must be either a 0 or a 1.");
|
||||
}
|
||||
|
@ -1395,19 +1350,6 @@ int ResultTableLookup_RedisCommand(RedisModuleCtx *ctx,
|
|||
ctx, (const char *) fbb.GetBufferPointer(), fbb.GetSize());
|
||||
RedisModule_ReplyWithString(ctx, reply);
|
||||
|
||||
/* Clean up. */
|
||||
RedisModule_FreeString(ctx, reply);
|
||||
RedisModule_FreeString(ctx, is_put);
|
||||
RedisModule_FreeString(ctx, task_id);
|
||||
|
||||
if (data_size != NULL) {
|
||||
RedisModule_FreeString(ctx, data_size);
|
||||
}
|
||||
|
||||
if (hash != NULL) {
|
||||
RedisModule_FreeString(ctx, hash);
|
||||
}
|
||||
|
||||
return REDISMODULE_OK;
|
||||
}
|
||||
|
||||
|
@ -1442,7 +1384,6 @@ int TaskTableWrite(RedisModuleCtx *ctx,
|
|||
RedisModule_HashGet(key, REDISMODULE_HASH_CFIELDS, "TaskSpec",
|
||||
&existing_task_spec, NULL);
|
||||
if (existing_task_spec == NULL) {
|
||||
RedisModule_CloseKey(key);
|
||||
return RedisModule_ReplyWithError(
|
||||
ctx, "Cannot update a task that doesn't exist yet");
|
||||
}
|
||||
|
@ -1452,7 +1393,6 @@ int TaskTableWrite(RedisModuleCtx *ctx,
|
|||
local_scheduler_id, "execution_dependencies", execution_dependencies,
|
||||
"TaskSpec", task_spec, "spillback_count", spillback_count, NULL);
|
||||
}
|
||||
RedisModule_CloseKey(key);
|
||||
|
||||
if (state_value == TASK_STATUS_WAITING ||
|
||||
state_value == TASK_STATUS_SCHEDULED) {
|
||||
|
@ -1492,12 +1432,6 @@ int TaskTableWrite(RedisModuleCtx *ctx,
|
|||
RAY_CHECK(num_clients <= 1) << "Published to " << num_clients
|
||||
<< " clients.";
|
||||
|
||||
RedisModule_FreeString(ctx, publish_message);
|
||||
RedisModule_FreeString(ctx, publish_topic);
|
||||
if (existing_task_spec != NULL) {
|
||||
RedisModule_FreeString(ctx, existing_task_spec);
|
||||
}
|
||||
|
||||
if (reply == NULL) {
|
||||
return RedisModule_ReplyWithError(ctx, "PUBLISH unsuccessful");
|
||||
}
|
||||
|
@ -1539,6 +1473,8 @@ int TaskTableWrite(RedisModuleCtx *ctx,
|
|||
int TaskTableAddTask_RedisCommand(RedisModuleCtx *ctx,
|
||||
RedisModuleString **argv,
|
||||
int argc) {
|
||||
RedisModule_AutoMemory(ctx);
|
||||
|
||||
if (argc != 7) {
|
||||
return RedisModule_WrongArity(ctx);
|
||||
}
|
||||
|
@ -1568,6 +1504,8 @@ int TaskTableAddTask_RedisCommand(RedisModuleCtx *ctx,
|
|||
int TaskTableUpdate_RedisCommand(RedisModuleCtx *ctx,
|
||||
RedisModuleString **argv,
|
||||
int argc) {
|
||||
RedisModule_AutoMemory(ctx);
|
||||
|
||||
if (argc != 6) {
|
||||
return RedisModule_WrongArity(ctx);
|
||||
}
|
||||
|
@ -1602,6 +1540,8 @@ int TaskTableUpdate_RedisCommand(RedisModuleCtx *ctx,
|
|||
int TaskTableTestAndUpdate_RedisCommand(RedisModuleCtx *ctx,
|
||||
RedisModuleString **argv,
|
||||
int argc) {
|
||||
RedisModule_AutoMemory(ctx);
|
||||
|
||||
if (argc < 5 || argc > 6) {
|
||||
return RedisModule_WrongArity(ctx);
|
||||
}
|
||||
|
@ -1617,7 +1557,6 @@ int TaskTableTestAndUpdate_RedisCommand(RedisModuleCtx *ctx,
|
|||
RedisModuleKey *key = OpenPrefixedKey(ctx, TASK_PREFIX, task_id,
|
||||
REDISMODULE_READ | REDISMODULE_WRITE);
|
||||
if (RedisModule_KeyType(key) == REDISMODULE_KEYTYPE_EMPTY) {
|
||||
RedisModule_CloseKey(key);
|
||||
return RedisModule_ReplyWithNull(ctx);
|
||||
}
|
||||
|
||||
|
@ -1634,13 +1573,11 @@ int TaskTableTestAndUpdate_RedisCommand(RedisModuleCtx *ctx,
|
|||
}
|
||||
|
||||
if (current_state_integer < 0) {
|
||||
RedisModule_CloseKey(key);
|
||||
return RedisModule_ReplyWithError(ctx, "Found invalid scheduling state.");
|
||||
}
|
||||
long long test_state_bitmask;
|
||||
int status = RedisModule_StringToLongLong(test_state, &test_state_bitmask);
|
||||
if (status != REDISMODULE_OK) {
|
||||
RedisModule_CloseKey(key);
|
||||
return RedisModule_ReplyWithError(
|
||||
ctx, "Invalid test value for scheduling state");
|
||||
}
|
||||
|
@ -1670,8 +1607,6 @@ int TaskTableTestAndUpdate_RedisCommand(RedisModuleCtx *ctx,
|
|||
"local_scheduler_id", local_scheduler_id, NULL);
|
||||
}
|
||||
|
||||
/* Clean up. */
|
||||
RedisModule_CloseKey(key);
|
||||
/* Construct a reply by getting the task from the task ID. */
|
||||
return ReplyWithTask(ctx, task_id, update);
|
||||
}
|
||||
|
@ -1692,6 +1627,8 @@ int TaskTableTestAndUpdate_RedisCommand(RedisModuleCtx *ctx,
|
|||
int TaskTableGet_RedisCommand(RedisModuleCtx *ctx,
|
||||
RedisModuleString **argv,
|
||||
int argc) {
|
||||
RedisModule_AutoMemory(ctx);
|
||||
|
||||
if (argc != 2) {
|
||||
return RedisModule_WrongArity(ctx);
|
||||
}
|
||||
|
|
Loading…
Add table
Reference in a new issue