diff --git a/python/ray/tests/test_basic.py b/python/ray/tests/test_basic.py index cc28f0374..fad8e1241 100644 --- a/python/ray/tests/test_basic.py +++ b/python/ray/tests/test_basic.py @@ -2840,3 +2840,23 @@ def test_shutdown_disconnect_global_state(): with pytest.raises(Exception) as e: ray.global_state.object_table() assert str(e.value).endswith("ray.init has been called.") + + +@pytest.mark.parametrize( + "ray_start_object_store_memory", [10**8], indirect=True) +def test_redis_lru_with_set(ray_start_object_store_memory): + x = np.zeros(8 * 10**7, dtype=np.uint8) + x_id = ray.put(x) + + # Remove the object from the object table to simulate Redis LRU eviction. + removed = False + start_time = time.time() + while time.time() < start_time + 10: + if ray.global_state.redis_clients[0].delete(b"OBJECT" + + x_id.binary()) == 1: + removed = True + break + assert removed + + # Now evict the object from the object store. + ray.put(x) # This should not crash. diff --git a/python/ray/tests/test_failure.py b/python/ray/tests/test_failure.py index 91b3228c6..c292c1f62 100644 --- a/python/ray/tests/test_failure.py +++ b/python/ray/tests/test_failure.py @@ -624,14 +624,17 @@ def test_redis_module_failure(ray_start_regular): -1) run_failure_test("Index is not a number.", "RAY.TABLE_APPEND", 1, 1, 2, 1, b"a") - run_failure_test("The entry to remove doesn't exist.", "RAY.SET_REMOVE", 1, - 1, 3, 1) run_one_command("RAY.TABLE_APPEND", 1, 1, 2, 1) # It's okay to add duplicate entries. run_one_command("RAY.TABLE_APPEND", 1, 1, 2, 1) run_one_command("RAY.TABLE_APPEND", 1, 1, 2, 1, 0) run_one_command("RAY.TABLE_APPEND", 1, 1, 2, 1, 1) run_one_command("RAY.SET_ADD", 1, 1, 3, 1) + # It's okey to add duplicate entries. + run_one_command("RAY.SET_ADD", 1, 1, 3, 1) + run_one_command("RAY.SET_REMOVE", 1, 1, 3, 1) + # It's okey to remove duplicate entries. + run_one_command("RAY.SET_REMOVE", 1, 1, 3, 1) # Note that this test will take at least 10 seconds because it must wait for diff --git a/src/ray/gcs/client_test.cc b/src/ray/gcs/client_test.cc index 0d1f812a5..b94f67a83 100644 --- a/src/ray/gcs/client_test.cc +++ b/src/ray/gcs/client_test.cc @@ -690,7 +690,12 @@ void TestSetSubscribeAll(const JobID &job_id, for (size_t j = 0; j < managers.size(); j++) { auto data = std::make_shared(); data->manager = managers[j]; - RAY_CHECK_OK(client->object_table().Remove(job_id, object_ids[i], data, nullptr)); + for (int k = 0; k < 3; k++) { + // Remove the same entry several times. + // Expect no notification if the entry doesn't exist. + RAY_CHECK_OK( + client->object_table().Remove(job_id, object_ids[i], data, nullptr)); + } } } }; diff --git a/src/ray/gcs/redis_module/ray_redis_module.cc b/src/ray/gcs/redis_module/ray_redis_module.cc index f1fa99a0f..002ef4826 100644 --- a/src/ray/gcs/redis_module/ray_redis_module.cc +++ b/src/ray/gcs/redis_module/ray_redis_module.cc @@ -165,14 +165,12 @@ Status GetBroadcastKey(RedisModuleCtx *ctx, RedisModuleString *pubsub_channel_st return Status::OK(); } -/** - * This is a helper method to convert a redis module string to a flatbuffer - * string. - * - * @param fbb The flatbuffer builder. - * @param redis_string The redis string. - * @return The flatbuffer string. - */ +/// This is a helper method to convert a redis module string to a flatbuffer +/// string. +/// +/// \param fbb The flatbuffer builder. +/// \param redis_string The redis string. +/// \return The flatbuffer string. flatbuffers::Offset RedisStringToFlatbuf( flatbuffers::FlatBufferBuilder &fbb, RedisModuleString *redis_string) { size_t redis_string_size; @@ -186,8 +184,8 @@ flatbuffers::Offset RedisStringToFlatbuf( /// has requested notifications for this key. /// /// \param pubsub_channel_str The pubsub channel name that notifications for -/// this key should be published to. When publishing to a specific -/// client, the channel name should be :. +/// this key should be published to. When publishing to a specific client, the +/// channel name should be :. /// \param id The ID of the key that the notification is about. /// \param mode the update mode, such as append or remove. /// \param data The appended/removed data. @@ -285,8 +283,8 @@ int TableAdd_DoPublish(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) /// /// \param table_prefix The prefix string for keys in this table. /// \param pubsub_channel The pubsub channel name that notifications for -/// this key should be published to. When publishing to a specific -/// client, the channel name should be :. +/// this key should be published to. When publishing to a specific client, the +/// channel name should be :. /// \param id The ID of the key to set. /// \param data The data to insert at the key. /// \return The current value at the key, or OK if there is no value. @@ -384,17 +382,16 @@ int TableAppend_DoPublish(RedisModuleCtx *ctx, RedisModuleString **argv, int /*a /// /// /// \param table_prefix The prefix string for keys in this table. -/// \param pubsub_channel The pubsub channel name that notifications for -/// this key should be published to. When publishing to a specific -/// client, the channel name should be :. +/// \param pubsub_channel The pubsub channel name that notifications for this +/// key should be published to. When publishing to a specific client, the +/// channel name should be :. /// \param id The ID of the key to append to. /// \param data The data to append to the key. /// \param index If this is set, then the data must be appended at this index. -/// If the current log is shorter or longer than the requested index, -/// then the append will fail and an error message will be returned as a -/// string. +/// If the current log is shorter or longer than the requested index, then the +/// append will fail and an error message will be returned as a string. /// \return OK if the append succeeds, or an error message string if the append -/// fails. +/// fails. int TableAppend_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { if (TableAppend_DoWrite(ctx, argv, argc, /*mutated_key_str=*/nullptr) != REDISMODULE_OK) { @@ -476,13 +473,12 @@ int Set_DoWrite(RedisModuleCtx *ctx, RedisModuleString **argv, int argc, bool is /// RAY.SET_ADD /// /// \param table_prefix The prefix string for keys in this set. -/// \param pubsub_channel The pubsub channel name that notifications for -/// this key should be published to. When publishing to a specific -/// client, the channel name should be :. +/// \param pubsub_channel The pubsub channel name that notifications for this +/// key should be published to. When publishing to a specific client, the +/// channel name should be :. /// \param id The ID of the key to add to. /// \param data The data to add to the key. -/// \return OK if the add succeeds, or an error message string if the add -/// fails. +/// \return OK if the add succeeds, or an error message string if the add fails. int SetAdd_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { bool changed; if (Set_DoWrite(ctx, argv, argc, /*is_add=*/true, &changed) != REDISMODULE_OK) { @@ -491,7 +487,7 @@ int SetAdd_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) if (changed) { return Set_DoPublish(ctx, argv, /*is_add=*/true); } - return REDISMODULE_OK; + return RedisModule_ReplyWithSimpleString(ctx, "OK"); } /// Remove an entry from the set stored at a key. Publishes a notification about @@ -502,20 +498,24 @@ int SetAdd_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) /// RAY.SET_REMOVE /// /// \param table_prefix The prefix string for keys in this table. -/// \param pubsub_channel The pubsub channel name that notifications for -/// this key should be published to. When publishing to a specific -/// client, the channel name should be :. +/// \param pubsub_channel The pubsub channel name that notifications for this +/// key should be published to. When publishing to a specific client, the +/// channel name should be :. /// \param id The ID of the key to remove from. /// \param data The data to remove from the key. /// \return OK if the remove succeeds, or an error message string if the remove -/// fails. +/// fails. int SetRemove_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { bool changed; if (Set_DoWrite(ctx, argv, argc, /*is_add=*/false, &changed) != REDISMODULE_OK) { return REDISMODULE_ERR; } - REPLY_AND_RETURN_IF_FALSE(changed, "ERR The entry to remove doesn't exist."); - return Set_DoPublish(ctx, argv, /*is_add=*/false); + if (changed) { + return Set_DoPublish(ctx, argv, /*is_add=*/false); + } else { + RAY_LOG(ERROR) << "The entry to remove doesn't exist."; + } + return RedisModule_ReplyWithSimpleString(ctx, "OK"); } /// A helper function to create and finish a GcsTableEntry, based on the @@ -888,8 +888,8 @@ AUTO_MEMORY(ChainTableAppend_RedisCommand); extern "C" { -/* This function must be present on each Redis module. It is used in order to - * register the commands into the Redis server. */ +/// This function must be present on each Redis module. It is used in order to +/// register the commands into the Redis server. int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { REDISMODULE_NOT_USED(argv); REDISMODULE_NOT_USED(argc); @@ -967,4 +967,4 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) return REDISMODULE_OK; } -} /* extern "C" */ +} /// extern "C"