mirror of
https://github.com/vale981/ray
synced 2025-03-08 11:31:40 -05:00
Redis returns OK when removing a non-existent set entry (#4434)
This commit is contained in:
parent
57c1aeb427
commit
c36d03874b
4 changed files with 65 additions and 37 deletions
|
@ -2840,3 +2840,23 @@ def test_shutdown_disconnect_global_state():
|
||||||
with pytest.raises(Exception) as e:
|
with pytest.raises(Exception) as e:
|
||||||
ray.global_state.object_table()
|
ray.global_state.object_table()
|
||||||
assert str(e.value).endswith("ray.init has been called.")
|
assert str(e.value).endswith("ray.init has been called.")
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.parametrize(
|
||||||
|
"ray_start_object_store_memory", [10**8], indirect=True)
|
||||||
|
def test_redis_lru_with_set(ray_start_object_store_memory):
|
||||||
|
x = np.zeros(8 * 10**7, dtype=np.uint8)
|
||||||
|
x_id = ray.put(x)
|
||||||
|
|
||||||
|
# Remove the object from the object table to simulate Redis LRU eviction.
|
||||||
|
removed = False
|
||||||
|
start_time = time.time()
|
||||||
|
while time.time() < start_time + 10:
|
||||||
|
if ray.global_state.redis_clients[0].delete(b"OBJECT" +
|
||||||
|
x_id.binary()) == 1:
|
||||||
|
removed = True
|
||||||
|
break
|
||||||
|
assert removed
|
||||||
|
|
||||||
|
# Now evict the object from the object store.
|
||||||
|
ray.put(x) # This should not crash.
|
||||||
|
|
|
@ -624,14 +624,17 @@ def test_redis_module_failure(ray_start_regular):
|
||||||
-1)
|
-1)
|
||||||
run_failure_test("Index is not a number.", "RAY.TABLE_APPEND", 1, 1, 2, 1,
|
run_failure_test("Index is not a number.", "RAY.TABLE_APPEND", 1, 1, 2, 1,
|
||||||
b"a")
|
b"a")
|
||||||
run_failure_test("The entry to remove doesn't exist.", "RAY.SET_REMOVE", 1,
|
|
||||||
1, 3, 1)
|
|
||||||
run_one_command("RAY.TABLE_APPEND", 1, 1, 2, 1)
|
run_one_command("RAY.TABLE_APPEND", 1, 1, 2, 1)
|
||||||
# It's okay to add duplicate entries.
|
# It's okay to add duplicate entries.
|
||||||
run_one_command("RAY.TABLE_APPEND", 1, 1, 2, 1)
|
run_one_command("RAY.TABLE_APPEND", 1, 1, 2, 1)
|
||||||
run_one_command("RAY.TABLE_APPEND", 1, 1, 2, 1, 0)
|
run_one_command("RAY.TABLE_APPEND", 1, 1, 2, 1, 0)
|
||||||
run_one_command("RAY.TABLE_APPEND", 1, 1, 2, 1, 1)
|
run_one_command("RAY.TABLE_APPEND", 1, 1, 2, 1, 1)
|
||||||
run_one_command("RAY.SET_ADD", 1, 1, 3, 1)
|
run_one_command("RAY.SET_ADD", 1, 1, 3, 1)
|
||||||
|
# It's okey to add duplicate entries.
|
||||||
|
run_one_command("RAY.SET_ADD", 1, 1, 3, 1)
|
||||||
|
run_one_command("RAY.SET_REMOVE", 1, 1, 3, 1)
|
||||||
|
# It's okey to remove duplicate entries.
|
||||||
|
run_one_command("RAY.SET_REMOVE", 1, 1, 3, 1)
|
||||||
|
|
||||||
|
|
||||||
# Note that this test will take at least 10 seconds because it must wait for
|
# Note that this test will take at least 10 seconds because it must wait for
|
||||||
|
|
|
@ -690,7 +690,12 @@ void TestSetSubscribeAll(const JobID &job_id,
|
||||||
for (size_t j = 0; j < managers.size(); j++) {
|
for (size_t j = 0; j < managers.size(); j++) {
|
||||||
auto data = std::make_shared<ObjectTableDataT>();
|
auto data = std::make_shared<ObjectTableDataT>();
|
||||||
data->manager = managers[j];
|
data->manager = managers[j];
|
||||||
RAY_CHECK_OK(client->object_table().Remove(job_id, object_ids[i], data, nullptr));
|
for (int k = 0; k < 3; k++) {
|
||||||
|
// Remove the same entry several times.
|
||||||
|
// Expect no notification if the entry doesn't exist.
|
||||||
|
RAY_CHECK_OK(
|
||||||
|
client->object_table().Remove(job_id, object_ids[i], data, nullptr));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
|
@ -165,14 +165,12 @@ Status GetBroadcastKey(RedisModuleCtx *ctx, RedisModuleString *pubsub_channel_st
|
||||||
return Status::OK();
|
return Status::OK();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/// This is a helper method to convert a redis module string to a flatbuffer
|
||||||
* This is a helper method to convert a redis module string to a flatbuffer
|
/// string.
|
||||||
* string.
|
///
|
||||||
*
|
/// \param fbb The flatbuffer builder.
|
||||||
* @param fbb The flatbuffer builder.
|
/// \param redis_string The redis string.
|
||||||
* @param redis_string The redis string.
|
/// \return The flatbuffer string.
|
||||||
* @return The flatbuffer string.
|
|
||||||
*/
|
|
||||||
flatbuffers::Offset<flatbuffers::String> RedisStringToFlatbuf(
|
flatbuffers::Offset<flatbuffers::String> RedisStringToFlatbuf(
|
||||||
flatbuffers::FlatBufferBuilder &fbb, RedisModuleString *redis_string) {
|
flatbuffers::FlatBufferBuilder &fbb, RedisModuleString *redis_string) {
|
||||||
size_t redis_string_size;
|
size_t redis_string_size;
|
||||||
|
@ -186,8 +184,8 @@ flatbuffers::Offset<flatbuffers::String> RedisStringToFlatbuf(
|
||||||
/// has requested notifications for this key.
|
/// has requested notifications for this key.
|
||||||
///
|
///
|
||||||
/// \param pubsub_channel_str The pubsub channel name that notifications for
|
/// \param pubsub_channel_str The pubsub channel name that notifications for
|
||||||
/// this key should be published to. When publishing to a specific
|
/// this key should be published to. When publishing to a specific client, the
|
||||||
/// client, the channel name should be <pubsub_channel>:<client_id>.
|
/// channel name should be <pubsub_channel>:<client_id>.
|
||||||
/// \param id The ID of the key that the notification is about.
|
/// \param id The ID of the key that the notification is about.
|
||||||
/// \param mode the update mode, such as append or remove.
|
/// \param mode the update mode, such as append or remove.
|
||||||
/// \param data The appended/removed data.
|
/// \param data The appended/removed data.
|
||||||
|
@ -285,8 +283,8 @@ int TableAdd_DoPublish(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
|
||||||
///
|
///
|
||||||
/// \param table_prefix The prefix string for keys in this table.
|
/// \param table_prefix The prefix string for keys in this table.
|
||||||
/// \param pubsub_channel The pubsub channel name that notifications for
|
/// \param pubsub_channel The pubsub channel name that notifications for
|
||||||
/// this key should be published to. When publishing to a specific
|
/// this key should be published to. When publishing to a specific client, the
|
||||||
/// client, the channel name should be <pubsub_channel>:<client_id>.
|
/// channel name should be <pubsub_channel>:<client_id>.
|
||||||
/// \param id The ID of the key to set.
|
/// \param id The ID of the key to set.
|
||||||
/// \param data The data to insert at the key.
|
/// \param data The data to insert at the key.
|
||||||
/// \return The current value at the key, or OK if there is no value.
|
/// \return The current value at the key, or OK if there is no value.
|
||||||
|
@ -384,15 +382,14 @@ int TableAppend_DoPublish(RedisModuleCtx *ctx, RedisModuleString **argv, int /*a
|
||||||
/// <index (optional)>
|
/// <index (optional)>
|
||||||
///
|
///
|
||||||
/// \param table_prefix The prefix string for keys in this table.
|
/// \param table_prefix The prefix string for keys in this table.
|
||||||
/// \param pubsub_channel The pubsub channel name that notifications for
|
/// \param pubsub_channel The pubsub channel name that notifications for this
|
||||||
/// this key should be published to. When publishing to a specific
|
/// key should be published to. When publishing to a specific client, the
|
||||||
/// client, the channel name should be <pubsub_channel>:<client_id>.
|
/// channel name should be <pubsub_channel>:<client_id>.
|
||||||
/// \param id The ID of the key to append to.
|
/// \param id The ID of the key to append to.
|
||||||
/// \param data The data to append to the key.
|
/// \param data The data to append to the key.
|
||||||
/// \param index If this is set, then the data must be appended at this index.
|
/// \param index If this is set, then the data must be appended at this index.
|
||||||
/// If the current log is shorter or longer than the requested index,
|
/// If the current log is shorter or longer than the requested index, then the
|
||||||
/// then the append will fail and an error message will be returned as a
|
/// append will fail and an error message will be returned as a string.
|
||||||
/// string.
|
|
||||||
/// \return OK if the append succeeds, or an error message string if the append
|
/// \return OK if the append succeeds, or an error message string if the append
|
||||||
/// fails.
|
/// fails.
|
||||||
int TableAppend_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
|
int TableAppend_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
|
||||||
|
@ -476,13 +473,12 @@ int Set_DoWrite(RedisModuleCtx *ctx, RedisModuleString **argv, int argc, bool is
|
||||||
/// RAY.SET_ADD <table_prefix> <pubsub_channel> <id> <data>
|
/// RAY.SET_ADD <table_prefix> <pubsub_channel> <id> <data>
|
||||||
///
|
///
|
||||||
/// \param table_prefix The prefix string for keys in this set.
|
/// \param table_prefix The prefix string for keys in this set.
|
||||||
/// \param pubsub_channel The pubsub channel name that notifications for
|
/// \param pubsub_channel The pubsub channel name that notifications for this
|
||||||
/// this key should be published to. When publishing to a specific
|
/// key should be published to. When publishing to a specific client, the
|
||||||
/// client, the channel name should be <pubsub_channel>:<client_id>.
|
/// channel name should be <pubsub_channel>:<client_id>.
|
||||||
/// \param id The ID of the key to add to.
|
/// \param id The ID of the key to add to.
|
||||||
/// \param data The data to add to the key.
|
/// \param data The data to add to the key.
|
||||||
/// \return OK if the add succeeds, or an error message string if the add
|
/// \return OK if the add succeeds, or an error message string if the add fails.
|
||||||
/// fails.
|
|
||||||
int SetAdd_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
|
int SetAdd_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
|
||||||
bool changed;
|
bool changed;
|
||||||
if (Set_DoWrite(ctx, argv, argc, /*is_add=*/true, &changed) != REDISMODULE_OK) {
|
if (Set_DoWrite(ctx, argv, argc, /*is_add=*/true, &changed) != REDISMODULE_OK) {
|
||||||
|
@ -491,7 +487,7 @@ int SetAdd_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
|
||||||
if (changed) {
|
if (changed) {
|
||||||
return Set_DoPublish(ctx, argv, /*is_add=*/true);
|
return Set_DoPublish(ctx, argv, /*is_add=*/true);
|
||||||
}
|
}
|
||||||
return REDISMODULE_OK;
|
return RedisModule_ReplyWithSimpleString(ctx, "OK");
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Remove an entry from the set stored at a key. Publishes a notification about
|
/// Remove an entry from the set stored at a key. Publishes a notification about
|
||||||
|
@ -502,9 +498,9 @@ int SetAdd_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
|
||||||
/// RAY.SET_REMOVE <table_prefix> <pubsub_channel> <id> <data>
|
/// RAY.SET_REMOVE <table_prefix> <pubsub_channel> <id> <data>
|
||||||
///
|
///
|
||||||
/// \param table_prefix The prefix string for keys in this table.
|
/// \param table_prefix The prefix string for keys in this table.
|
||||||
/// \param pubsub_channel The pubsub channel name that notifications for
|
/// \param pubsub_channel The pubsub channel name that notifications for this
|
||||||
/// this key should be published to. When publishing to a specific
|
/// key should be published to. When publishing to a specific client, the
|
||||||
/// client, the channel name should be <pubsub_channel>:<client_id>.
|
/// channel name should be <pubsub_channel>:<client_id>.
|
||||||
/// \param id The ID of the key to remove from.
|
/// \param id The ID of the key to remove from.
|
||||||
/// \param data The data to remove from the key.
|
/// \param data The data to remove from the key.
|
||||||
/// \return OK if the remove succeeds, or an error message string if the remove
|
/// \return OK if the remove succeeds, or an error message string if the remove
|
||||||
|
@ -514,8 +510,12 @@ int SetRemove_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int ar
|
||||||
if (Set_DoWrite(ctx, argv, argc, /*is_add=*/false, &changed) != REDISMODULE_OK) {
|
if (Set_DoWrite(ctx, argv, argc, /*is_add=*/false, &changed) != REDISMODULE_OK) {
|
||||||
return REDISMODULE_ERR;
|
return REDISMODULE_ERR;
|
||||||
}
|
}
|
||||||
REPLY_AND_RETURN_IF_FALSE(changed, "ERR The entry to remove doesn't exist.");
|
if (changed) {
|
||||||
return Set_DoPublish(ctx, argv, /*is_add=*/false);
|
return Set_DoPublish(ctx, argv, /*is_add=*/false);
|
||||||
|
} else {
|
||||||
|
RAY_LOG(ERROR) << "The entry to remove doesn't exist.";
|
||||||
|
}
|
||||||
|
return RedisModule_ReplyWithSimpleString(ctx, "OK");
|
||||||
}
|
}
|
||||||
|
|
||||||
/// A helper function to create and finish a GcsTableEntry, based on the
|
/// A helper function to create and finish a GcsTableEntry, based on the
|
||||||
|
@ -888,8 +888,8 @@ AUTO_MEMORY(ChainTableAppend_RedisCommand);
|
||||||
|
|
||||||
extern "C" {
|
extern "C" {
|
||||||
|
|
||||||
/* This function must be present on each Redis module. It is used in order to
|
/// This function must be present on each Redis module. It is used in order to
|
||||||
* register the commands into the Redis server. */
|
/// register the commands into the Redis server.
|
||||||
int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
|
int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
|
||||||
REDISMODULE_NOT_USED(argv);
|
REDISMODULE_NOT_USED(argv);
|
||||||
REDISMODULE_NOT_USED(argc);
|
REDISMODULE_NOT_USED(argc);
|
||||||
|
@ -967,4 +967,4 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
|
||||||
return REDISMODULE_OK;
|
return REDISMODULE_OK;
|
||||||
}
|
}
|
||||||
|
|
||||||
} /* extern "C" */
|
} /// extern "C"
|
||||||
|
|
Loading…
Add table
Reference in a new issue