diff --git a/src/ray/gcs/redis_context.cc b/src/ray/gcs/redis_context.cc index dde1998a4..368c5fa13 100644 --- a/src/ray/gcs/redis_context.cc +++ b/src/ray/gcs/redis_context.cc @@ -55,7 +55,7 @@ void GlobalRedisCallback(void *c, void *r, void *privdata) { case (REDIS_REPLY_STATUS): { } break; case (REDIS_REPLY_ERROR): { - RAY_LOG(ERROR) << "Redis error " << reply->str; + RAY_LOG(FATAL) << "Redis error: " << reply->str; } break; case (REDIS_REPLY_INTEGER): { data = std::to_string(reply->integer); @@ -95,7 +95,7 @@ void SubscribeRedisCallback(void *c, void *r, void *privdata) { } break; case (REDIS_REPLY_ERROR): { - RAY_LOG(ERROR) << "Redis error " << reply->str; + RAY_LOG(FATAL) << "Redis error: " << reply->str; } break; default: RAY_LOG(FATAL) << "Fatal redis error of type " << reply->type << " and with string " diff --git a/src/ray/gcs/redis_module/ray_redis_module.cc b/src/ray/gcs/redis_module/ray_redis_module.cc index f19eacbb6..447e515a0 100644 --- a/src/ray/gcs/redis_module/ray_redis_module.cc +++ b/src/ray/gcs/redis_module/ray_redis_module.cc @@ -3,10 +3,13 @@ #include "ray/common/common_protocol.h" #include "ray/gcs/format/gcs_generated.h" #include "ray/id.h" +#include "ray/status.h" #include "ray/util/logging.h" #include "redis_string.h" #include "redismodule.h" +using ray::Status; + #if RAY_USE_NEW_GCS // Under this flag, ray-project/credis will be loaded. Specifically, via // "path/redis-server --loadmodule --loadmodule (pubsub_channel_long); - RAY_CHECK(pubsub_channel >= TablePubsub::MIN && pubsub_channel <= TablePubsub::MAX) - << "Pubsub channel must be a valid TablePubsub"; - return pubsub_channel; + if (RedisModule_StringToLongLong(pubsub_channel_str, &pubsub_channel_long) != + REDISMODULE_OK) { + return Status::RedisError("Pubsub channel must be a valid integer."); + } + if (pubsub_channel_long > static_cast(TablePubsub::MAX) || + pubsub_channel_long < static_cast(TablePubsub::MIN)) { + return Status::RedisError("Pubsub channel must be in the TablePubsub range."); + } else { + *out = static_cast(pubsub_channel_long); + return Status::OK(); + } } /// Format a pubsub channel for a specific key. pubsub_channel_str should /// contain a valid TablePubsub. -RedisModuleString *FormatPubsubChannel(RedisModuleCtx *ctx, - const RedisModuleString *pubsub_channel_str, - const RedisModuleString *id) { +Status FormatPubsubChannel(RedisModuleString **out, RedisModuleCtx *ctx, + const RedisModuleString *pubsub_channel_str, + const RedisModuleString *id) { // Format the pubsub channel enum to a string. TablePubsub_MAX should be more // than enough digits, but add 1 just in case for the null terminator. char pubsub_channel[static_cast(TablePubsub::MAX) + 1]; - sprintf(pubsub_channel, "%d", static_cast(ParseTablePubsub(pubsub_channel_str))); - return RedisString_Format(ctx, "%s:%S", pubsub_channel, id); + TablePubsub table_pubsub; + RAY_RETURN_NOT_OK(ParseTablePubsub(&table_pubsub, pubsub_channel_str)); + sprintf(pubsub_channel, "%d", static_cast(table_pubsub)); + *out = RedisString_Format(ctx, "%s:%S", pubsub_channel, id); + return Status::OK(); } // TODO(swang): This helper function should be deprecated by the version below, @@ -59,23 +80,26 @@ RedisModuleKey *OpenPrefixedKey(RedisModuleCtx *ctx, const char *prefix, if (mutated_key_str != nullptr) { *mutated_key_str = prefixed_keyname; } - RedisModuleKey *key = - (RedisModuleKey *)RedisModule_OpenKey(ctx, prefixed_keyname, mode); + RedisModuleKey *key = reinterpret_cast( + RedisModule_OpenKey(ctx, prefixed_keyname, mode)); return key; } -RedisModuleKey *OpenPrefixedKey(RedisModuleCtx *ctx, RedisModuleString *prefix_enum, - RedisModuleString *keyname, int mode, - RedisModuleString **mutated_key_str) { +Status OpenPrefixedKey(RedisModuleKey **out, RedisModuleCtx *ctx, + RedisModuleString *prefix_enum, RedisModuleString *keyname, + int mode, RedisModuleString **mutated_key_str) { long long prefix_long; - RAY_CHECK(RedisModule_StringToLongLong(prefix_enum, &prefix_long) == REDISMODULE_OK) - << "Prefix must be a valid TablePrefix"; + if (RedisModule_StringToLongLong(prefix_enum, &prefix_long) != REDISMODULE_OK) { + return Status::RedisError("Prefix must be a valid TablePrefix integer."); + } + if (prefix_long > static_cast(TablePrefix::MAX) || + prefix_long < static_cast(TablePrefix::MIN)) { + return Status::RedisError("Prefix must be in the TablePrefix range."); + } auto prefix = static_cast(prefix_long); - RAY_CHECK(prefix != TablePrefix::UNUSED) << "This table has no prefix registered"; - RAY_CHECK(prefix >= TablePrefix::MIN && prefix <= TablePrefix::MAX) - << "Prefix must be a valid TablePrefix"; - return OpenPrefixedKey(ctx, EnumNameTablePrefix(prefix), keyname, mode, - mutated_key_str); + *out = + OpenPrefixedKey(ctx, EnumNameTablePrefix(prefix), keyname, mode, mutated_key_str); + return Status::OK(); } RedisModuleKey *OpenPrefixedKey(RedisModuleCtx *ctx, const char *prefix, @@ -84,22 +108,24 @@ RedisModuleKey *OpenPrefixedKey(RedisModuleCtx *ctx, const char *prefix, /*mutated_key_str=*/nullptr); } -RedisModuleKey *OpenPrefixedKey(RedisModuleCtx *ctx, RedisModuleString *prefix_enum, - RedisModuleString *keyname, int mode) { - return OpenPrefixedKey(ctx, prefix_enum, keyname, mode, +Status OpenPrefixedKey(RedisModuleKey **out, RedisModuleCtx *ctx, + RedisModuleString *prefix_enum, RedisModuleString *keyname, + int mode) { + return OpenPrefixedKey(out, ctx, prefix_enum, keyname, mode, /*mutated_key_str=*/nullptr); } /// Open the key used to store the channels that should be published to when an /// update happens at the given keyname. -RedisModuleKey *OpenBroadcastKey(RedisModuleCtx *ctx, - RedisModuleString *pubsub_channel_str, - RedisModuleString *keyname, int mode) { - RedisModuleString *channel = FormatPubsubChannel(ctx, pubsub_channel_str, keyname); +Status OpenBroadcastKey(RedisModuleKey **out, RedisModuleCtx *ctx, + RedisModuleString *pubsub_channel_str, RedisModuleString *keyname, + int mode) { + RedisModuleString *channel; + RAY_RETURN_NOT_OK(FormatPubsubChannel(&channel, ctx, pubsub_channel_str, keyname)); RedisModuleString *prefixed_keyname = RedisString_Format(ctx, "BCAST:%S", channel); - RedisModuleKey *key = - (RedisModuleKey *)RedisModule_OpenKey(ctx, prefixed_keyname, mode); - return key; + *out = reinterpret_cast( + RedisModule_OpenKey(ctx, prefixed_keyname, mode)); + return Status::OK(); } /** @@ -146,14 +172,15 @@ int PublishTableAdd(RedisModuleCtx *ctx, RedisModuleString *pubsub_channel_str, } // Publish the data to any clients who requested notifications on this key. - RedisModuleKey *notification_key = - OpenBroadcastKey(ctx, pubsub_channel_str, id, REDISMODULE_READ | REDISMODULE_WRITE); + RedisModuleKey *notification_key; + REPLY_AND_RETURN_IF_NOT_OK(OpenBroadcastKey(¬ification_key, ctx, pubsub_channel_str, + id, REDISMODULE_READ | REDISMODULE_WRITE)); if (RedisModule_KeyType(notification_key) != REDISMODULE_KEYTYPE_EMPTY) { // NOTE(swang): Sets are not implemented yet, so we use ZSETs instead. - CHECK_ERROR( - RedisModule_ZsetFirstInScoreRange(notification_key, REDISMODULE_NEGATIVE_INFINITE, - REDISMODULE_POSITIVE_INFINITE, 1, 1), - "Unable to initialize zset iterator"); + REPLY_AND_RETURN_IF_FALSE(RedisModule_ZsetFirstInScoreRange( + notification_key, REDISMODULE_NEGATIVE_INFINITE, + REDISMODULE_POSITIVE_INFINITE, 1, 1) == REDISMODULE_OK, + "Unable to initialize zset iterator"); for (; !RedisModule_ZsetRangeEndReached(notification_key); RedisModule_ZsetRangeNext(notification_key)) { RedisModuleString *client_channel = @@ -183,8 +210,9 @@ int TableAdd_DoWrite(RedisModuleCtx *ctx, RedisModuleString **argv, int argc, RedisModuleString *id = argv[3]; RedisModuleString *data = argv[4]; - RedisModuleKey *key = OpenPrefixedKey( - ctx, prefix_str, id, REDISMODULE_READ | REDISMODULE_WRITE, mutated_key_str); + RedisModuleKey *key; + REPLY_AND_RETURN_IF_NOT_OK(OpenPrefixedKey( + &key, ctx, prefix_str, id, REDISMODULE_READ | REDISMODULE_WRITE, mutated_key_str)); RedisModule_StringSet(key, data); return REDISMODULE_OK; } @@ -197,7 +225,8 @@ int TableAdd_DoPublish(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) RedisModuleString *id = argv[3]; RedisModuleString *data = argv[4]; - TablePubsub pubsub_channel = ParseTablePubsub(pubsub_channel_str); + TablePubsub pubsub_channel; + REPLY_AND_RETURN_IF_NOT_OK(ParseTablePubsub(&pubsub_channel, pubsub_channel_str)); if (pubsub_channel != TablePubsub::NO_PUBLISH) { // All other pubsub channels write the data back directly onto the channel. @@ -251,17 +280,19 @@ int TableAppend_DoWrite(RedisModuleCtx *ctx, RedisModuleString **argv, int argc, } // Set the keys in the table. - RedisModuleKey *key = OpenPrefixedKey( - ctx, prefix_str, id, REDISMODULE_READ | REDISMODULE_WRITE, mutated_key_str); + RedisModuleKey *key; + REPLY_AND_RETURN_IF_NOT_OK(OpenPrefixedKey( + &key, ctx, prefix_str, id, REDISMODULE_READ | REDISMODULE_WRITE, mutated_key_str)); // Determine the index at which the data should be appended. If no index is // requested, then is the current length of the log. size_t index = RedisModule_ValueLength(key); if (index_str != nullptr) { // Parse the requested index. long long requested_index; - RAY_CHECK(RedisModule_StringToLongLong(index_str, &requested_index) == - REDISMODULE_OK); - RAY_CHECK(requested_index >= 0); + REPLY_AND_RETURN_IF_FALSE( + RedisModule_StringToLongLong(index_str, &requested_index) == REDISMODULE_OK, + "Index is not a number."); + REPLY_AND_RETURN_IF_FALSE(requested_index >= 0, "Index is less than 0."); index = static_cast(requested_index); } // Only perform the append if the requested index matches the current length @@ -288,10 +319,13 @@ int TableAppend_DoWrite(RedisModuleCtx *ctx, RedisModuleString **argv, int argc, std::string postfix = std::to_string(index); RedisModuleString *new_id = RedisString_Format(ctx, "%S:%b", id, postfix.data(), postfix.size()); - RedisModuleKey *new_key = OpenPrefixedKey( - ctx, prefix_str, new_id, REDISMODULE_READ | REDISMODULE_WRITE, mutated_key_str); + RedisModuleKey *new_key; + REPLY_AND_RETURN_IF_NOT_OK(OpenPrefixedKey(&new_key, ctx, prefix_str, new_id, + REDISMODULE_READ | REDISMODULE_WRITE, + mutated_key_str)); RedisModule_ZsetAdd(new_key, index, data, &flags); - RAY_CHECK(flags == REDISMODULE_ZADD_ADDED); + REPLY_AND_RETURN_IF_FALSE(flags == REDISMODULE_ZADD_ADDED, + "Appended a duplicate entry"); } return REDISMODULE_OK; } else { @@ -308,7 +342,8 @@ int TableAppend_DoPublish(RedisModuleCtx *ctx, RedisModuleString **argv, int /*a RedisModuleString *id = argv[3]; RedisModuleString *data = argv[4]; // Publish a message on the requested pubsub channel if necessary. - TablePubsub pubsub_channel = ParseTablePubsub(pubsub_channel_str); + TablePubsub pubsub_channel; + REPLY_AND_RETURN_IF_NOT_OK(ParseTablePubsub(&pubsub_channel, pubsub_channel_str)); if (pubsub_channel != TablePubsub::NO_PUBLISH) { // All other pubsub channels write the data back directly onto the // channel. @@ -339,11 +374,9 @@ int TableAppend_DoPublish(RedisModuleCtx *ctx, RedisModuleString **argv, int /*a /// \return OK if the append succeeds, or an error message string if the append /// fails. int TableAppend_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { - RedisModule_AutoMemory(ctx); - const int status = TableAppend_DoWrite(ctx, argv, argc, - /*mutated_key_str=*/nullptr); - if (status) { - return status; + if (TableAppend_DoWrite(ctx, argv, argc, /*mutated_key_str=*/nullptr) != + REDISMODULE_OK) { + return REDISMODULE_ERR; } return TableAppend_DoPublish(ctx, argv, argc); } @@ -360,8 +393,8 @@ int ChainTableAppend_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, /// A helper function to create and finish a GcsTableEntry, based on the /// current value or values at the given key. -void TableEntryToFlatbuf(RedisModuleKey *table_key, RedisModuleString *entry_id, - flatbuffers::FlatBufferBuilder &fbb) { +Status TableEntryToFlatbuf(RedisModuleKey *table_key, RedisModuleString *entry_id, + flatbuffers::FlatBufferBuilder &fbb) { auto key_type = RedisModule_KeyType(table_key); switch (key_type) { case REDISMODULE_KEYTYPE_STRING: { @@ -375,9 +408,11 @@ void TableEntryToFlatbuf(RedisModuleKey *table_key, RedisModuleString *entry_id, } break; case REDISMODULE_KEYTYPE_ZSET: { // Build the flatbuffer from the set of log entries. - RAY_CHECK(RedisModule_ZsetFirstInScoreRange(table_key, REDISMODULE_NEGATIVE_INFINITE, - REDISMODULE_POSITIVE_INFINITE, 1, - 1) == REDISMODULE_OK); + if (RedisModule_ZsetFirstInScoreRange(table_key, REDISMODULE_NEGATIVE_INFINITE, + REDISMODULE_POSITIVE_INFINITE, 1, + 1) != REDISMODULE_OK) { + return Status::RedisError("Empty zset or wrong type"); + } std::vector> data; for (; !RedisModule_ZsetRangeEndReached(table_key); RedisModule_ZsetRangeNext(table_key)) { @@ -395,8 +430,9 @@ void TableEntryToFlatbuf(RedisModuleKey *table_key, RedisModuleString *entry_id, fbb.Finish(message); } break; default: - RAY_LOG(FATAL) << "Invalid Redis type during lookup: " << key_type; + return Status::RedisError("Invalid Redis type during lookup."); } + return Status::OK(); } /// Lookup the current value or values at a key. Returns the current value or @@ -423,13 +459,15 @@ int TableLookup_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int RedisModuleString *id = argv[3]; // Lookup the data at the key. - RedisModuleKey *table_key = OpenPrefixedKey(ctx, prefix_str, id, REDISMODULE_READ); + RedisModuleKey *table_key; + REPLY_AND_RETURN_IF_NOT_OK( + OpenPrefixedKey(&table_key, ctx, prefix_str, id, REDISMODULE_READ)); if (table_key == nullptr) { RedisModule_ReplyWithNull(ctx); } else { // Serialize the data to a flatbuffer to return to the client. flatbuffers::FlatBufferBuilder fbb; - TableEntryToFlatbuf(table_key, id, fbb); + REPLY_AND_RETURN_IF_NOT_OK(TableEntryToFlatbuf(table_key, id, fbb)); RedisModule_ReplyWithStringBuffer( ctx, reinterpret_cast(fbb.GetBufferPointer()), fbb.GetSize()); } @@ -465,23 +503,28 @@ int TableRequestNotifications_RedisCommand(RedisModuleCtx *ctx, RedisModuleStrin RedisModuleString *pubsub_channel_str = argv[2]; RedisModuleString *id = argv[3]; RedisModuleString *client_id = argv[4]; - RedisModuleString *client_channel = - FormatPubsubChannel(ctx, pubsub_channel_str, client_id); + RedisModuleString *client_channel; + REPLY_AND_RETURN_IF_NOT_OK( + FormatPubsubChannel(&client_channel, ctx, pubsub_channel_str, client_id)); // Add this client to the set of clients that should be notified when there // are changes to the key. - RedisModuleKey *notification_key = - OpenBroadcastKey(ctx, pubsub_channel_str, id, REDISMODULE_READ | REDISMODULE_WRITE); - CHECK_ERROR(RedisModule_ZsetAdd(notification_key, 0.0, client_channel, NULL), - "ZsetAdd failed."); + RedisModuleKey *notification_key; + REPLY_AND_RETURN_IF_NOT_OK(OpenBroadcastKey(¬ification_key, ctx, pubsub_channel_str, + id, REDISMODULE_READ | REDISMODULE_WRITE)); + REPLY_AND_RETURN_IF_FALSE( + RedisModule_ZsetAdd(notification_key, 0.0, client_channel, NULL) == REDISMODULE_OK, + "ZsetAdd failed."); // Lookup the current value at the key. - RedisModuleKey *table_key = OpenPrefixedKey(ctx, prefix_str, id, REDISMODULE_READ); + RedisModuleKey *table_key; + REPLY_AND_RETURN_IF_NOT_OK( + OpenPrefixedKey(&table_key, ctx, prefix_str, id, REDISMODULE_READ)); // Publish the current value at the key to the client that is requesting // notifications. An empty notification will be published if the key is // empty. flatbuffers::FlatBufferBuilder fbb; - TableEntryToFlatbuf(table_key, id, fbb); + REPLY_AND_RETURN_IF_NOT_OK(TableEntryToFlatbuf(table_key, id, fbb)); RedisModule_Call(ctx, "PUBLISH", "sb", client_channel, reinterpret_cast(fbb.GetBufferPointer()), fbb.GetSize()); @@ -515,19 +558,23 @@ int TableCancelNotifications_RedisCommand(RedisModuleCtx *ctx, RedisModuleString RedisModuleString *pubsub_channel_str = argv[2]; RedisModuleString *id = argv[3]; RedisModuleString *client_id = argv[4]; - RedisModuleString *client_channel = - FormatPubsubChannel(ctx, pubsub_channel_str, client_id); + RedisModuleString *client_channel; + REPLY_AND_RETURN_IF_NOT_OK( + FormatPubsubChannel(&client_channel, ctx, pubsub_channel_str, client_id)); // Remove this client from the set of clients that should be notified when // there are changes to the key. - RedisModuleKey *notification_key = - OpenBroadcastKey(ctx, pubsub_channel_str, id, REDISMODULE_READ | REDISMODULE_WRITE); + RedisModuleKey *notification_key; + REPLY_AND_RETURN_IF_NOT_OK(OpenBroadcastKey(¬ification_key, ctx, pubsub_channel_str, + id, REDISMODULE_READ | REDISMODULE_WRITE)); if (RedisModule_KeyType(notification_key) != REDISMODULE_KEYTYPE_EMPTY) { - RAY_CHECK(RedisModule_ZsetRem(notification_key, client_channel, NULL) == - REDISMODULE_OK); + REPLY_AND_RETURN_IF_FALSE( + RedisModule_ZsetRem(notification_key, client_channel, NULL) == REDISMODULE_OK, + "not opened for writing or wrong type."); size_t size = RedisModule_ValueLength(notification_key); if (size == 0) { - CHECK_ERROR(RedisModule_DeleteKey(notification_key), "Unable to delete zset key."); + REPLY_AND_RETURN_IF_FALSE(RedisModule_DeleteKey(notification_key) == REDISMODULE_OK, + "Unable to delete zset key."); } } @@ -535,15 +582,18 @@ int TableCancelNotifications_RedisCommand(RedisModuleCtx *ctx, RedisModuleString return REDISMODULE_OK; } -bool is_nil(const std::string &data) { - RAY_CHECK(data.size() == kUniqueIDSize); +Status is_nil(bool *out, const std::string &data) { + if (data.size() != kUniqueIDSize) { + return Status::RedisError("Size of data doesn't match size of UniqueID"); + } const uint8_t *d = reinterpret_cast(data.data()); for (int i = 0; i < kUniqueIDSize; ++i) { if (d[i] != 255) { - return false; + *out = false; } } - return true; + *out = true; + return Status::OK(); } // This is a temporary redis command that will be removed once @@ -560,8 +610,9 @@ int TableTestAndUpdate_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **arg RedisModuleString *id = argv[3]; RedisModuleString *update_data = argv[4]; - RedisModuleKey *key = - OpenPrefixedKey(ctx, prefix_str, id, REDISMODULE_READ | REDISMODULE_WRITE); + RedisModuleKey *key; + REPLY_AND_RETURN_IF_NOT_OK( + OpenPrefixedKey(&key, ctx, prefix_str, id, REDISMODULE_READ | REDISMODULE_WRITE)); size_t value_len = 0; char *value_buf = RedisModule_StringDMA(key, &value_len, REDISMODULE_READ); @@ -577,15 +628,18 @@ int TableTestAndUpdate_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **arg bool do_update = static_cast(data->scheduling_state()) & static_cast(update->test_state_bitmask()); - if (!is_nil(update->test_scheduler_id()->str())) { + bool is_nil_result; + REPLY_AND_RETURN_IF_NOT_OK(is_nil(&is_nil_result, update->test_scheduler_id()->str())); + if (!is_nil_result) { do_update = do_update && update->test_scheduler_id()->str() == data->scheduler_id()->str(); } if (do_update) { - RAY_CHECK(data->mutate_scheduling_state(update->update_state())); + REPLY_AND_RETURN_IF_FALSE(data->mutate_scheduling_state(update->update_state()), + "mutate_scheduling_state failed"); } - RAY_CHECK(data->mutate_updated(do_update)); + REPLY_AND_RETURN_IF_FALSE(data->mutate_updated(do_update), "mutate_updated failed"); int result = RedisModule_ReplyWithStringBuffer(ctx, value_buf, value_len); diff --git a/test/failure_test.py b/test/failure_test.py index 0a5a6255e..b231d8b62 100644 --- a/test/failure_test.py +++ b/test/failure_test.py @@ -2,19 +2,20 @@ from __future__ import absolute_import from __future__ import division from __future__ import print_function -import numpy as np import json import os -import ray +import pytest import sys import tempfile import threading import time +import numpy as np +import redis + +import ray import ray.ray_constants as ray_constants from ray.utils import _random_string -import pytest - from ray.test.cluster_utils import Cluster @@ -618,6 +619,44 @@ def test_warning_for_too_many_nested_tasks(shutdown_only): wait_for_errors(ray_constants.WORKER_POOL_LARGE_ERROR, 1) +def test_redis_module_failure(shutdown_only): + address_info = ray.init(num_cpus=1) + redis_address = address_info["redis_address"] + redis_address = redis_address.split(":") + assert len(redis_address) == 2 + + def run_failure_test(expecting_message, *command): + with pytest.raises( + Exception, match=".*{}.*".format(expecting_message)): + client = redis.StrictRedis( + host=redis_address[0], port=int(redis_address[1])) + client.execute_command(*command) + + def run_one_command(*command): + client = redis.StrictRedis( + host=redis_address[0], port=int(redis_address[1])) + client.execute_command(*command) + + run_failure_test("wrong number of arguments", "RAY.TABLE_ADD", 13) + run_failure_test("Prefix must be in the TablePrefix range", + "RAY.TABLE_ADD", 100000, 1, 1, 1) + run_failure_test("Prefix must be in the TablePrefix range", + "RAY.TABLE_REQUEST_NOTIFICATIONS", 100000, 1, 1, 1) + run_failure_test("Prefix must be a valid TablePrefix integer", + "RAY.TABLE_ADD", b"a", 1, 1, 1) + run_failure_test("Pubsub channel must be in the TablePubsub range", + "RAY.TABLE_ADD", 1, 10000, 1, 1) + run_failure_test("Pubsub channel must be a valid integer", "RAY.TABLE_ADD", + 1, b"a", 1, 1) + run_failure_test("Index is less than 0.", "RAY.TABLE_APPEND", 1, 1, 1, 1, + -1) + run_failure_test("Index is not a number.", "RAY.TABLE_APPEND", 1, 1, 1, 1, + b"a") + run_one_command("RAY.TABLE_APPEND", 1, 1, 1, 1) + run_failure_test("Appended a duplicate entry", "RAY.TABLE_APPEND", 1, 1, 1, + 1, 1) + + @pytest.fixture def ray_start_two_nodes(): # Start the Ray processes.