Make redis module safe without crashing by removing RAY_CHECK (#3855)

This commit is contained in:
Yuhong Guo 2019-01-30 13:06:31 +08:00 committed by Robert Nishihara
parent fb73cedf70
commit c45b91dcca
3 changed files with 192 additions and 99 deletions

View file

@ -55,7 +55,7 @@ void GlobalRedisCallback(void *c, void *r, void *privdata) {
case (REDIS_REPLY_STATUS): { case (REDIS_REPLY_STATUS): {
} break; } break;
case (REDIS_REPLY_ERROR): { case (REDIS_REPLY_ERROR): {
RAY_LOG(ERROR) << "Redis error " << reply->str; RAY_LOG(FATAL) << "Redis error: " << reply->str;
} break; } break;
case (REDIS_REPLY_INTEGER): { case (REDIS_REPLY_INTEGER): {
data = std::to_string(reply->integer); data = std::to_string(reply->integer);
@ -95,7 +95,7 @@ void SubscribeRedisCallback(void *c, void *r, void *privdata) {
} break; } break;
case (REDIS_REPLY_ERROR): { case (REDIS_REPLY_ERROR): {
RAY_LOG(ERROR) << "Redis error " << reply->str; RAY_LOG(FATAL) << "Redis error: " << reply->str;
} break; } break;
default: default:
RAY_LOG(FATAL) << "Fatal redis error of type " << reply->type << " and with string " RAY_LOG(FATAL) << "Fatal redis error of type " << reply->type << " and with string "

View file

@ -3,10 +3,13 @@
#include "ray/common/common_protocol.h" #include "ray/common/common_protocol.h"
#include "ray/gcs/format/gcs_generated.h" #include "ray/gcs/format/gcs_generated.h"
#include "ray/id.h" #include "ray/id.h"
#include "ray/status.h"
#include "ray/util/logging.h" #include "ray/util/logging.h"
#include "redis_string.h" #include "redis_string.h"
#include "redismodule.h" #include "redismodule.h"
using ray::Status;
#if RAY_USE_NEW_GCS #if RAY_USE_NEW_GCS
// Under this flag, ray-project/credis will be loaded. Specifically, via // Under this flag, ray-project/credis will be loaded. Specifically, via
// "path/redis-server --loadmodule <credis module> --loadmodule <current // "path/redis-server --loadmodule <credis module> --loadmodule <current
@ -20,33 +23,51 @@
extern RedisChainModule module; extern RedisChainModule module;
#endif #endif
#define CHECK_ERROR(STATUS, MESSAGE) \ #define REPLY_AND_RETURN_IF_FALSE(CONDITION, MESSAGE) \
if ((STATUS) == REDISMODULE_ERR) { \ if (!(CONDITION)) { \
return RedisModule_ReplyWithError(ctx, (MESSAGE)); \ RedisModule_ReplyWithError(ctx, (MESSAGE)); \
return REDISMODULE_ERR; \
}
// This macro can be used at the top level of redis module.
#define REPLY_AND_RETURN_IF_NOT_OK(STATUS) \
{ \
auto status = (STATUS); \
if (!status.ok()) { \
RedisModule_ReplyWithError(ctx, status.message().c_str()); \
return REDISMODULE_ERR; \
} \
} }
/// Parse a Redis string into a TablePubsub channel. /// Parse a Redis string into a TablePubsub channel.
TablePubsub ParseTablePubsub(const RedisModuleString *pubsub_channel_str) { Status ParseTablePubsub(TablePubsub *out, const RedisModuleString *pubsub_channel_str) {
long long pubsub_channel_long; long long pubsub_channel_long;
RAY_CHECK(RedisModule_StringToLongLong(pubsub_channel_str, &pubsub_channel_long) == if (RedisModule_StringToLongLong(pubsub_channel_str, &pubsub_channel_long) !=
REDISMODULE_OK) REDISMODULE_OK) {
<< "Pubsub channel must be a valid TablePubsub"; return Status::RedisError("Pubsub channel must be a valid integer.");
auto pubsub_channel = static_cast<TablePubsub>(pubsub_channel_long); }
RAY_CHECK(pubsub_channel >= TablePubsub::MIN && pubsub_channel <= TablePubsub::MAX) if (pubsub_channel_long > static_cast<long long>(TablePubsub::MAX) ||
<< "Pubsub channel must be a valid TablePubsub"; pubsub_channel_long < static_cast<long long>(TablePubsub::MIN)) {
return pubsub_channel; return Status::RedisError("Pubsub channel must be in the TablePubsub range.");
} else {
*out = static_cast<TablePubsub>(pubsub_channel_long);
return Status::OK();
}
} }
/// Format a pubsub channel for a specific key. pubsub_channel_str should /// Format a pubsub channel for a specific key. pubsub_channel_str should
/// contain a valid TablePubsub. /// contain a valid TablePubsub.
RedisModuleString *FormatPubsubChannel(RedisModuleCtx *ctx, Status FormatPubsubChannel(RedisModuleString **out, RedisModuleCtx *ctx,
const RedisModuleString *pubsub_channel_str, const RedisModuleString *pubsub_channel_str,
const RedisModuleString *id) { const RedisModuleString *id) {
// Format the pubsub channel enum to a string. TablePubsub_MAX should be more // Format the pubsub channel enum to a string. TablePubsub_MAX should be more
// than enough digits, but add 1 just in case for the null terminator. // than enough digits, but add 1 just in case for the null terminator.
char pubsub_channel[static_cast<int>(TablePubsub::MAX) + 1]; char pubsub_channel[static_cast<int>(TablePubsub::MAX) + 1];
sprintf(pubsub_channel, "%d", static_cast<int>(ParseTablePubsub(pubsub_channel_str))); TablePubsub table_pubsub;
return RedisString_Format(ctx, "%s:%S", pubsub_channel, id); RAY_RETURN_NOT_OK(ParseTablePubsub(&table_pubsub, pubsub_channel_str));
sprintf(pubsub_channel, "%d", static_cast<int>(table_pubsub));
*out = RedisString_Format(ctx, "%s:%S", pubsub_channel, id);
return Status::OK();
} }
// TODO(swang): This helper function should be deprecated by the version below, // TODO(swang): This helper function should be deprecated by the version below,
@ -59,23 +80,26 @@ RedisModuleKey *OpenPrefixedKey(RedisModuleCtx *ctx, const char *prefix,
if (mutated_key_str != nullptr) { if (mutated_key_str != nullptr) {
*mutated_key_str = prefixed_keyname; *mutated_key_str = prefixed_keyname;
} }
RedisModuleKey *key = RedisModuleKey *key = reinterpret_cast<RedisModuleKey *>(
(RedisModuleKey *)RedisModule_OpenKey(ctx, prefixed_keyname, mode); RedisModule_OpenKey(ctx, prefixed_keyname, mode));
return key; return key;
} }
RedisModuleKey *OpenPrefixedKey(RedisModuleCtx *ctx, RedisModuleString *prefix_enum, Status OpenPrefixedKey(RedisModuleKey **out, RedisModuleCtx *ctx,
RedisModuleString *keyname, int mode, RedisModuleString *prefix_enum, RedisModuleString *keyname,
RedisModuleString **mutated_key_str) { int mode, RedisModuleString **mutated_key_str) {
long long prefix_long; long long prefix_long;
RAY_CHECK(RedisModule_StringToLongLong(prefix_enum, &prefix_long) == REDISMODULE_OK) if (RedisModule_StringToLongLong(prefix_enum, &prefix_long) != REDISMODULE_OK) {
<< "Prefix must be a valid TablePrefix"; return Status::RedisError("Prefix must be a valid TablePrefix integer.");
}
if (prefix_long > static_cast<long long>(TablePrefix::MAX) ||
prefix_long < static_cast<long long>(TablePrefix::MIN)) {
return Status::RedisError("Prefix must be in the TablePrefix range.");
}
auto prefix = static_cast<TablePrefix>(prefix_long); auto prefix = static_cast<TablePrefix>(prefix_long);
RAY_CHECK(prefix != TablePrefix::UNUSED) << "This table has no prefix registered"; *out =
RAY_CHECK(prefix >= TablePrefix::MIN && prefix <= TablePrefix::MAX) OpenPrefixedKey(ctx, EnumNameTablePrefix(prefix), keyname, mode, mutated_key_str);
<< "Prefix must be a valid TablePrefix"; return Status::OK();
return OpenPrefixedKey(ctx, EnumNameTablePrefix(prefix), keyname, mode,
mutated_key_str);
} }
RedisModuleKey *OpenPrefixedKey(RedisModuleCtx *ctx, const char *prefix, RedisModuleKey *OpenPrefixedKey(RedisModuleCtx *ctx, const char *prefix,
@ -84,22 +108,24 @@ RedisModuleKey *OpenPrefixedKey(RedisModuleCtx *ctx, const char *prefix,
/*mutated_key_str=*/nullptr); /*mutated_key_str=*/nullptr);
} }
RedisModuleKey *OpenPrefixedKey(RedisModuleCtx *ctx, RedisModuleString *prefix_enum, Status OpenPrefixedKey(RedisModuleKey **out, RedisModuleCtx *ctx,
RedisModuleString *keyname, int mode) { RedisModuleString *prefix_enum, RedisModuleString *keyname,
return OpenPrefixedKey(ctx, prefix_enum, keyname, mode, int mode) {
return OpenPrefixedKey(out, ctx, prefix_enum, keyname, mode,
/*mutated_key_str=*/nullptr); /*mutated_key_str=*/nullptr);
} }
/// Open the key used to store the channels that should be published to when an /// Open the key used to store the channels that should be published to when an
/// update happens at the given keyname. /// update happens at the given keyname.
RedisModuleKey *OpenBroadcastKey(RedisModuleCtx *ctx, Status OpenBroadcastKey(RedisModuleKey **out, RedisModuleCtx *ctx,
RedisModuleString *pubsub_channel_str, RedisModuleString *pubsub_channel_str, RedisModuleString *keyname,
RedisModuleString *keyname, int mode) { int mode) {
RedisModuleString *channel = FormatPubsubChannel(ctx, pubsub_channel_str, keyname); RedisModuleString *channel;
RAY_RETURN_NOT_OK(FormatPubsubChannel(&channel, ctx, pubsub_channel_str, keyname));
RedisModuleString *prefixed_keyname = RedisString_Format(ctx, "BCAST:%S", channel); RedisModuleString *prefixed_keyname = RedisString_Format(ctx, "BCAST:%S", channel);
RedisModuleKey *key = *out = reinterpret_cast<RedisModuleKey *>(
(RedisModuleKey *)RedisModule_OpenKey(ctx, prefixed_keyname, mode); RedisModule_OpenKey(ctx, prefixed_keyname, mode));
return key; return Status::OK();
} }
/** /**
@ -146,14 +172,15 @@ int PublishTableAdd(RedisModuleCtx *ctx, RedisModuleString *pubsub_channel_str,
} }
// Publish the data to any clients who requested notifications on this key. // Publish the data to any clients who requested notifications on this key.
RedisModuleKey *notification_key = RedisModuleKey *notification_key;
OpenBroadcastKey(ctx, pubsub_channel_str, id, REDISMODULE_READ | REDISMODULE_WRITE); REPLY_AND_RETURN_IF_NOT_OK(OpenBroadcastKey(&notification_key, ctx, pubsub_channel_str,
id, REDISMODULE_READ | REDISMODULE_WRITE));
if (RedisModule_KeyType(notification_key) != REDISMODULE_KEYTYPE_EMPTY) { if (RedisModule_KeyType(notification_key) != REDISMODULE_KEYTYPE_EMPTY) {
// NOTE(swang): Sets are not implemented yet, so we use ZSETs instead. // NOTE(swang): Sets are not implemented yet, so we use ZSETs instead.
CHECK_ERROR( REPLY_AND_RETURN_IF_FALSE(RedisModule_ZsetFirstInScoreRange(
RedisModule_ZsetFirstInScoreRange(notification_key, REDISMODULE_NEGATIVE_INFINITE, notification_key, REDISMODULE_NEGATIVE_INFINITE,
REDISMODULE_POSITIVE_INFINITE, 1, 1), REDISMODULE_POSITIVE_INFINITE, 1, 1) == REDISMODULE_OK,
"Unable to initialize zset iterator"); "Unable to initialize zset iterator");
for (; !RedisModule_ZsetRangeEndReached(notification_key); for (; !RedisModule_ZsetRangeEndReached(notification_key);
RedisModule_ZsetRangeNext(notification_key)) { RedisModule_ZsetRangeNext(notification_key)) {
RedisModuleString *client_channel = RedisModuleString *client_channel =
@ -183,8 +210,9 @@ int TableAdd_DoWrite(RedisModuleCtx *ctx, RedisModuleString **argv, int argc,
RedisModuleString *id = argv[3]; RedisModuleString *id = argv[3];
RedisModuleString *data = argv[4]; RedisModuleString *data = argv[4];
RedisModuleKey *key = OpenPrefixedKey( RedisModuleKey *key;
ctx, prefix_str, id, REDISMODULE_READ | REDISMODULE_WRITE, mutated_key_str); REPLY_AND_RETURN_IF_NOT_OK(OpenPrefixedKey(
&key, ctx, prefix_str, id, REDISMODULE_READ | REDISMODULE_WRITE, mutated_key_str));
RedisModule_StringSet(key, data); RedisModule_StringSet(key, data);
return REDISMODULE_OK; return REDISMODULE_OK;
} }
@ -197,7 +225,8 @@ int TableAdd_DoPublish(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
RedisModuleString *id = argv[3]; RedisModuleString *id = argv[3];
RedisModuleString *data = argv[4]; RedisModuleString *data = argv[4];
TablePubsub pubsub_channel = ParseTablePubsub(pubsub_channel_str); TablePubsub pubsub_channel;
REPLY_AND_RETURN_IF_NOT_OK(ParseTablePubsub(&pubsub_channel, pubsub_channel_str));
if (pubsub_channel != TablePubsub::NO_PUBLISH) { if (pubsub_channel != TablePubsub::NO_PUBLISH) {
// All other pubsub channels write the data back directly onto the channel. // All other pubsub channels write the data back directly onto the channel.
@ -251,17 +280,19 @@ int TableAppend_DoWrite(RedisModuleCtx *ctx, RedisModuleString **argv, int argc,
} }
// Set the keys in the table. // Set the keys in the table.
RedisModuleKey *key = OpenPrefixedKey( RedisModuleKey *key;
ctx, prefix_str, id, REDISMODULE_READ | REDISMODULE_WRITE, mutated_key_str); REPLY_AND_RETURN_IF_NOT_OK(OpenPrefixedKey(
&key, ctx, prefix_str, id, REDISMODULE_READ | REDISMODULE_WRITE, mutated_key_str));
// Determine the index at which the data should be appended. If no index is // Determine the index at which the data should be appended. If no index is
// requested, then is the current length of the log. // requested, then is the current length of the log.
size_t index = RedisModule_ValueLength(key); size_t index = RedisModule_ValueLength(key);
if (index_str != nullptr) { if (index_str != nullptr) {
// Parse the requested index. // Parse the requested index.
long long requested_index; long long requested_index;
RAY_CHECK(RedisModule_StringToLongLong(index_str, &requested_index) == REPLY_AND_RETURN_IF_FALSE(
REDISMODULE_OK); RedisModule_StringToLongLong(index_str, &requested_index) == REDISMODULE_OK,
RAY_CHECK(requested_index >= 0); "Index is not a number.");
REPLY_AND_RETURN_IF_FALSE(requested_index >= 0, "Index is less than 0.");
index = static_cast<size_t>(requested_index); index = static_cast<size_t>(requested_index);
} }
// Only perform the append if the requested index matches the current length // Only perform the append if the requested index matches the current length
@ -288,10 +319,13 @@ int TableAppend_DoWrite(RedisModuleCtx *ctx, RedisModuleString **argv, int argc,
std::string postfix = std::to_string(index); std::string postfix = std::to_string(index);
RedisModuleString *new_id = RedisModuleString *new_id =
RedisString_Format(ctx, "%S:%b", id, postfix.data(), postfix.size()); RedisString_Format(ctx, "%S:%b", id, postfix.data(), postfix.size());
RedisModuleKey *new_key = OpenPrefixedKey( RedisModuleKey *new_key;
ctx, prefix_str, new_id, REDISMODULE_READ | REDISMODULE_WRITE, mutated_key_str); REPLY_AND_RETURN_IF_NOT_OK(OpenPrefixedKey(&new_key, ctx, prefix_str, new_id,
REDISMODULE_READ | REDISMODULE_WRITE,
mutated_key_str));
RedisModule_ZsetAdd(new_key, index, data, &flags); RedisModule_ZsetAdd(new_key, index, data, &flags);
RAY_CHECK(flags == REDISMODULE_ZADD_ADDED); REPLY_AND_RETURN_IF_FALSE(flags == REDISMODULE_ZADD_ADDED,
"Appended a duplicate entry");
} }
return REDISMODULE_OK; return REDISMODULE_OK;
} else { } else {
@ -308,7 +342,8 @@ int TableAppend_DoPublish(RedisModuleCtx *ctx, RedisModuleString **argv, int /*a
RedisModuleString *id = argv[3]; RedisModuleString *id = argv[3];
RedisModuleString *data = argv[4]; RedisModuleString *data = argv[4];
// Publish a message on the requested pubsub channel if necessary. // Publish a message on the requested pubsub channel if necessary.
TablePubsub pubsub_channel = ParseTablePubsub(pubsub_channel_str); TablePubsub pubsub_channel;
REPLY_AND_RETURN_IF_NOT_OK(ParseTablePubsub(&pubsub_channel, pubsub_channel_str));
if (pubsub_channel != TablePubsub::NO_PUBLISH) { if (pubsub_channel != TablePubsub::NO_PUBLISH) {
// All other pubsub channels write the data back directly onto the // All other pubsub channels write the data back directly onto the
// channel. // channel.
@ -339,11 +374,9 @@ int TableAppend_DoPublish(RedisModuleCtx *ctx, RedisModuleString **argv, int /*a
/// \return OK if the append succeeds, or an error message string if the append /// \return OK if the append succeeds, or an error message string if the append
/// fails. /// fails.
int TableAppend_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { int TableAppend_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
RedisModule_AutoMemory(ctx); if (TableAppend_DoWrite(ctx, argv, argc, /*mutated_key_str=*/nullptr) !=
const int status = TableAppend_DoWrite(ctx, argv, argc, REDISMODULE_OK) {
/*mutated_key_str=*/nullptr); return REDISMODULE_ERR;
if (status) {
return status;
} }
return TableAppend_DoPublish(ctx, argv, argc); return TableAppend_DoPublish(ctx, argv, argc);
} }
@ -360,8 +393,8 @@ int ChainTableAppend_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv,
/// A helper function to create and finish a GcsTableEntry, based on the /// A helper function to create and finish a GcsTableEntry, based on the
/// current value or values at the given key. /// current value or values at the given key.
void TableEntryToFlatbuf(RedisModuleKey *table_key, RedisModuleString *entry_id, Status TableEntryToFlatbuf(RedisModuleKey *table_key, RedisModuleString *entry_id,
flatbuffers::FlatBufferBuilder &fbb) { flatbuffers::FlatBufferBuilder &fbb) {
auto key_type = RedisModule_KeyType(table_key); auto key_type = RedisModule_KeyType(table_key);
switch (key_type) { switch (key_type) {
case REDISMODULE_KEYTYPE_STRING: { case REDISMODULE_KEYTYPE_STRING: {
@ -375,9 +408,11 @@ void TableEntryToFlatbuf(RedisModuleKey *table_key, RedisModuleString *entry_id,
} break; } break;
case REDISMODULE_KEYTYPE_ZSET: { case REDISMODULE_KEYTYPE_ZSET: {
// Build the flatbuffer from the set of log entries. // Build the flatbuffer from the set of log entries.
RAY_CHECK(RedisModule_ZsetFirstInScoreRange(table_key, REDISMODULE_NEGATIVE_INFINITE, if (RedisModule_ZsetFirstInScoreRange(table_key, REDISMODULE_NEGATIVE_INFINITE,
REDISMODULE_POSITIVE_INFINITE, 1, REDISMODULE_POSITIVE_INFINITE, 1,
1) == REDISMODULE_OK); 1) != REDISMODULE_OK) {
return Status::RedisError("Empty zset or wrong type");
}
std::vector<flatbuffers::Offset<flatbuffers::String>> data; std::vector<flatbuffers::Offset<flatbuffers::String>> data;
for (; !RedisModule_ZsetRangeEndReached(table_key); for (; !RedisModule_ZsetRangeEndReached(table_key);
RedisModule_ZsetRangeNext(table_key)) { RedisModule_ZsetRangeNext(table_key)) {
@ -395,8 +430,9 @@ void TableEntryToFlatbuf(RedisModuleKey *table_key, RedisModuleString *entry_id,
fbb.Finish(message); fbb.Finish(message);
} break; } break;
default: default:
RAY_LOG(FATAL) << "Invalid Redis type during lookup: " << key_type; return Status::RedisError("Invalid Redis type during lookup.");
} }
return Status::OK();
} }
/// Lookup the current value or values at a key. Returns the current value or /// Lookup the current value or values at a key. Returns the current value or
@ -423,13 +459,15 @@ int TableLookup_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int
RedisModuleString *id = argv[3]; RedisModuleString *id = argv[3];
// Lookup the data at the key. // Lookup the data at the key.
RedisModuleKey *table_key = OpenPrefixedKey(ctx, prefix_str, id, REDISMODULE_READ); RedisModuleKey *table_key;
REPLY_AND_RETURN_IF_NOT_OK(
OpenPrefixedKey(&table_key, ctx, prefix_str, id, REDISMODULE_READ));
if (table_key == nullptr) { if (table_key == nullptr) {
RedisModule_ReplyWithNull(ctx); RedisModule_ReplyWithNull(ctx);
} else { } else {
// Serialize the data to a flatbuffer to return to the client. // Serialize the data to a flatbuffer to return to the client.
flatbuffers::FlatBufferBuilder fbb; flatbuffers::FlatBufferBuilder fbb;
TableEntryToFlatbuf(table_key, id, fbb); REPLY_AND_RETURN_IF_NOT_OK(TableEntryToFlatbuf(table_key, id, fbb));
RedisModule_ReplyWithStringBuffer( RedisModule_ReplyWithStringBuffer(
ctx, reinterpret_cast<const char *>(fbb.GetBufferPointer()), fbb.GetSize()); ctx, reinterpret_cast<const char *>(fbb.GetBufferPointer()), fbb.GetSize());
} }
@ -465,23 +503,28 @@ int TableRequestNotifications_RedisCommand(RedisModuleCtx *ctx, RedisModuleStrin
RedisModuleString *pubsub_channel_str = argv[2]; RedisModuleString *pubsub_channel_str = argv[2];
RedisModuleString *id = argv[3]; RedisModuleString *id = argv[3];
RedisModuleString *client_id = argv[4]; RedisModuleString *client_id = argv[4];
RedisModuleString *client_channel = RedisModuleString *client_channel;
FormatPubsubChannel(ctx, pubsub_channel_str, client_id); REPLY_AND_RETURN_IF_NOT_OK(
FormatPubsubChannel(&client_channel, ctx, pubsub_channel_str, client_id));
// Add this client to the set of clients that should be notified when there // Add this client to the set of clients that should be notified when there
// are changes to the key. // are changes to the key.
RedisModuleKey *notification_key = RedisModuleKey *notification_key;
OpenBroadcastKey(ctx, pubsub_channel_str, id, REDISMODULE_READ | REDISMODULE_WRITE); REPLY_AND_RETURN_IF_NOT_OK(OpenBroadcastKey(&notification_key, ctx, pubsub_channel_str,
CHECK_ERROR(RedisModule_ZsetAdd(notification_key, 0.0, client_channel, NULL), id, REDISMODULE_READ | REDISMODULE_WRITE));
"ZsetAdd failed."); REPLY_AND_RETURN_IF_FALSE(
RedisModule_ZsetAdd(notification_key, 0.0, client_channel, NULL) == REDISMODULE_OK,
"ZsetAdd failed.");
// Lookup the current value at the key. // Lookup the current value at the key.
RedisModuleKey *table_key = OpenPrefixedKey(ctx, prefix_str, id, REDISMODULE_READ); RedisModuleKey *table_key;
REPLY_AND_RETURN_IF_NOT_OK(
OpenPrefixedKey(&table_key, ctx, prefix_str, id, REDISMODULE_READ));
// Publish the current value at the key to the client that is requesting // Publish the current value at the key to the client that is requesting
// notifications. An empty notification will be published if the key is // notifications. An empty notification will be published if the key is
// empty. // empty.
flatbuffers::FlatBufferBuilder fbb; flatbuffers::FlatBufferBuilder fbb;
TableEntryToFlatbuf(table_key, id, fbb); REPLY_AND_RETURN_IF_NOT_OK(TableEntryToFlatbuf(table_key, id, fbb));
RedisModule_Call(ctx, "PUBLISH", "sb", client_channel, RedisModule_Call(ctx, "PUBLISH", "sb", client_channel,
reinterpret_cast<const char *>(fbb.GetBufferPointer()), fbb.GetSize()); reinterpret_cast<const char *>(fbb.GetBufferPointer()), fbb.GetSize());
@ -515,19 +558,23 @@ int TableCancelNotifications_RedisCommand(RedisModuleCtx *ctx, RedisModuleString
RedisModuleString *pubsub_channel_str = argv[2]; RedisModuleString *pubsub_channel_str = argv[2];
RedisModuleString *id = argv[3]; RedisModuleString *id = argv[3];
RedisModuleString *client_id = argv[4]; RedisModuleString *client_id = argv[4];
RedisModuleString *client_channel = RedisModuleString *client_channel;
FormatPubsubChannel(ctx, pubsub_channel_str, client_id); REPLY_AND_RETURN_IF_NOT_OK(
FormatPubsubChannel(&client_channel, ctx, pubsub_channel_str, client_id));
// Remove this client from the set of clients that should be notified when // Remove this client from the set of clients that should be notified when
// there are changes to the key. // there are changes to the key.
RedisModuleKey *notification_key = RedisModuleKey *notification_key;
OpenBroadcastKey(ctx, pubsub_channel_str, id, REDISMODULE_READ | REDISMODULE_WRITE); REPLY_AND_RETURN_IF_NOT_OK(OpenBroadcastKey(&notification_key, ctx, pubsub_channel_str,
id, REDISMODULE_READ | REDISMODULE_WRITE));
if (RedisModule_KeyType(notification_key) != REDISMODULE_KEYTYPE_EMPTY) { if (RedisModule_KeyType(notification_key) != REDISMODULE_KEYTYPE_EMPTY) {
RAY_CHECK(RedisModule_ZsetRem(notification_key, client_channel, NULL) == REPLY_AND_RETURN_IF_FALSE(
REDISMODULE_OK); RedisModule_ZsetRem(notification_key, client_channel, NULL) == REDISMODULE_OK,
"not opened for writing or wrong type.");
size_t size = RedisModule_ValueLength(notification_key); size_t size = RedisModule_ValueLength(notification_key);
if (size == 0) { if (size == 0) {
CHECK_ERROR(RedisModule_DeleteKey(notification_key), "Unable to delete zset key."); REPLY_AND_RETURN_IF_FALSE(RedisModule_DeleteKey(notification_key) == REDISMODULE_OK,
"Unable to delete zset key.");
} }
} }
@ -535,15 +582,18 @@ int TableCancelNotifications_RedisCommand(RedisModuleCtx *ctx, RedisModuleString
return REDISMODULE_OK; return REDISMODULE_OK;
} }
bool is_nil(const std::string &data) { Status is_nil(bool *out, const std::string &data) {
RAY_CHECK(data.size() == kUniqueIDSize); if (data.size() != kUniqueIDSize) {
return Status::RedisError("Size of data doesn't match size of UniqueID");
}
const uint8_t *d = reinterpret_cast<const uint8_t *>(data.data()); const uint8_t *d = reinterpret_cast<const uint8_t *>(data.data());
for (int i = 0; i < kUniqueIDSize; ++i) { for (int i = 0; i < kUniqueIDSize; ++i) {
if (d[i] != 255) { if (d[i] != 255) {
return false; *out = false;
} }
} }
return true; *out = true;
return Status::OK();
} }
// This is a temporary redis command that will be removed once // This is a temporary redis command that will be removed once
@ -560,8 +610,9 @@ int TableTestAndUpdate_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **arg
RedisModuleString *id = argv[3]; RedisModuleString *id = argv[3];
RedisModuleString *update_data = argv[4]; RedisModuleString *update_data = argv[4];
RedisModuleKey *key = RedisModuleKey *key;
OpenPrefixedKey(ctx, prefix_str, id, REDISMODULE_READ | REDISMODULE_WRITE); REPLY_AND_RETURN_IF_NOT_OK(
OpenPrefixedKey(&key, ctx, prefix_str, id, REDISMODULE_READ | REDISMODULE_WRITE));
size_t value_len = 0; size_t value_len = 0;
char *value_buf = RedisModule_StringDMA(key, &value_len, REDISMODULE_READ); char *value_buf = RedisModule_StringDMA(key, &value_len, REDISMODULE_READ);
@ -577,15 +628,18 @@ int TableTestAndUpdate_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **arg
bool do_update = static_cast<int>(data->scheduling_state()) & bool do_update = static_cast<int>(data->scheduling_state()) &
static_cast<int>(update->test_state_bitmask()); static_cast<int>(update->test_state_bitmask());
if (!is_nil(update->test_scheduler_id()->str())) { bool is_nil_result;
REPLY_AND_RETURN_IF_NOT_OK(is_nil(&is_nil_result, update->test_scheduler_id()->str()));
if (!is_nil_result) {
do_update = do_update =
do_update && update->test_scheduler_id()->str() == data->scheduler_id()->str(); do_update && update->test_scheduler_id()->str() == data->scheduler_id()->str();
} }
if (do_update) { if (do_update) {
RAY_CHECK(data->mutate_scheduling_state(update->update_state())); REPLY_AND_RETURN_IF_FALSE(data->mutate_scheduling_state(update->update_state()),
"mutate_scheduling_state failed");
} }
RAY_CHECK(data->mutate_updated(do_update)); REPLY_AND_RETURN_IF_FALSE(data->mutate_updated(do_update), "mutate_updated failed");
int result = RedisModule_ReplyWithStringBuffer(ctx, value_buf, value_len); int result = RedisModule_ReplyWithStringBuffer(ctx, value_buf, value_len);

View file

@ -2,19 +2,20 @@ from __future__ import absolute_import
from __future__ import division from __future__ import division
from __future__ import print_function from __future__ import print_function
import numpy as np
import json import json
import os import os
import ray import pytest
import sys import sys
import tempfile import tempfile
import threading import threading
import time import time
import numpy as np
import redis
import ray
import ray.ray_constants as ray_constants import ray.ray_constants as ray_constants
from ray.utils import _random_string from ray.utils import _random_string
import pytest
from ray.test.cluster_utils import Cluster from ray.test.cluster_utils import Cluster
@ -618,6 +619,44 @@ def test_warning_for_too_many_nested_tasks(shutdown_only):
wait_for_errors(ray_constants.WORKER_POOL_LARGE_ERROR, 1) wait_for_errors(ray_constants.WORKER_POOL_LARGE_ERROR, 1)
def test_redis_module_failure(shutdown_only):
address_info = ray.init(num_cpus=1)
redis_address = address_info["redis_address"]
redis_address = redis_address.split(":")
assert len(redis_address) == 2
def run_failure_test(expecting_message, *command):
with pytest.raises(
Exception, match=".*{}.*".format(expecting_message)):
client = redis.StrictRedis(
host=redis_address[0], port=int(redis_address[1]))
client.execute_command(*command)
def run_one_command(*command):
client = redis.StrictRedis(
host=redis_address[0], port=int(redis_address[1]))
client.execute_command(*command)
run_failure_test("wrong number of arguments", "RAY.TABLE_ADD", 13)
run_failure_test("Prefix must be in the TablePrefix range",
"RAY.TABLE_ADD", 100000, 1, 1, 1)
run_failure_test("Prefix must be in the TablePrefix range",
"RAY.TABLE_REQUEST_NOTIFICATIONS", 100000, 1, 1, 1)
run_failure_test("Prefix must be a valid TablePrefix integer",
"RAY.TABLE_ADD", b"a", 1, 1, 1)
run_failure_test("Pubsub channel must be in the TablePubsub range",
"RAY.TABLE_ADD", 1, 10000, 1, 1)
run_failure_test("Pubsub channel must be a valid integer", "RAY.TABLE_ADD",
1, b"a", 1, 1)
run_failure_test("Index is less than 0.", "RAY.TABLE_APPEND", 1, 1, 1, 1,
-1)
run_failure_test("Index is not a number.", "RAY.TABLE_APPEND", 1, 1, 1, 1,
b"a")
run_one_command("RAY.TABLE_APPEND", 1, 1, 1, 1)
run_failure_test("Appended a duplicate entry", "RAY.TABLE_APPEND", 1, 1, 1,
1, 1)
@pytest.fixture @pytest.fixture
def ray_start_two_nodes(): def ray_start_two_nodes():
# Start the Ray processes. # Start the Ray processes.