diff --git a/BUILD.bazel b/BUILD.bazel index a1e48e178..ad6bd083f 100644 --- a/BUILD.bazel +++ b/BUILD.bazel @@ -2073,6 +2073,18 @@ cc_library( ], ) +cc_test( + name = "callback_reply_test", + size = "small", + srcs = ["src/ray/gcs/test/callback_reply_test.cc"], + copts = COPTS, + tags = ["team:core"], + deps = [ + ":gcs", + "@com_google_googletest//:gtest_main", + ], +) + cc_test( name = "asio_test", size = "small", diff --git a/src/ray/gcs/gcs_server/gcs_kv_manager.cc b/src/ray/gcs/gcs_server/gcs_kv_manager.cc index 653418374..c1cba600e 100644 --- a/src/ray/gcs/gcs_server/gcs_kv_manager.cc +++ b/src/ray/gcs/gcs_server/gcs_kv_manager.cc @@ -81,7 +81,8 @@ void GcsInternalKVManager::HandleInternalKVKeys( cmd, [reply, send_reply_callback](auto redis_reply) { const auto &results = redis_reply->ReadAsStringArray(); for (const auto &result : results) { - reply->add_results(result); + RAY_CHECK(result.has_value()); + reply->add_results(*result); } GCS_RPC_SEND_REPLY(send_reply_callback, reply, Status::OK()); })); diff --git a/src/ray/gcs/redis_context.cc b/src/ray/gcs/redis_context.cc index 71a0c8c46..6ce1157a2 100644 --- a/src/ray/gcs/redis_context.cc +++ b/src/ray/gcs/redis_context.cc @@ -91,30 +91,31 @@ CallbackReply::CallbackReply(redisReply *redis_reply) : reply_type_(redis_reply- break; } redisReply *message_type = redis_reply->element[0]; - if (strcmp(message_type->str, "subscribe") == 0 || - strcmp(message_type->str, "psubscribe") == 0) { - is_subscribe_callback_ = true; - // If the message is for the initial subscription call, return the empty - // string as a response to signify that subscription was successful. - } else if (strcmp(message_type->str, "punsubscribe") == 0 || - strcmp(message_type->str, "unsubscribe") == 0) { - is_unsubscribe_callback_ = true; - } else if (strcmp(message_type->str, "message") == 0) { - // If the message is from a PUBLISH, make sure the data is nonempty. - redisReply *message = redis_reply->element[redis_reply->elements - 1]; - // data is a notification message. - string_reply_ = std::string(message->str, message->len); - RAY_CHECK(!string_reply_.empty()) << "Empty message received on subscribe channel."; - } else if (strcmp(message_type->str, "pmessage") == 0) { - // If the message is from a PUBLISH, make sure the data is nonempty. - redisReply *message = redis_reply->element[redis_reply->elements - 1]; - // data is a notification message. - string_reply_ = std::string(message->str, message->len); - RAY_CHECK(!string_reply_.empty()) << "Empty message received on subscribe channel."; - } else { - // Array replies are used for scan or get. - ParseAsStringArrayOrScanArray(redis_reply); + if (message_type->type == REDIS_REPLY_STRING) { + if (strcmp(message_type->str, "subscribe") == 0 || + strcmp(message_type->str, "psubscribe") == 0) { + // If the message is for the initial subscription call, return the empty + // string as a response to signify that subscription was successful. + is_subscribe_callback_ = true; + break; + } else if (strcmp(message_type->str, "punsubscribe") == 0 || + strcmp(message_type->str, "unsubscribe") == 0) { + is_unsubscribe_callback_ = true; + break; + } else if (strcmp(message_type->str, "message") == 0 || + strcmp(message_type->str, "pmessage") == 0) { + // If the message is from a PUBLISH, make sure the data is nonempty. + redisReply *message = redis_reply->element[redis_reply->elements - 1]; + // data is a notification message. + string_reply_ = std::string(message->str, message->len); + RAY_CHECK(!string_reply_.empty()) + << "Empty message received on subscribe channel."; + break; + } } + + // Array replies are used for scan or get. + ParseAsStringArrayOrScanArray(redis_reply); break; } default: { @@ -140,7 +141,7 @@ void CallbackReply::ParseAsStringArrayOrScanArray(redisReply *redis_reply) { auto *entry = array_entry->element[i]; RAY_CHECK(REDIS_REPLY_STRING == entry->type) << "Unexcepted type: " << entry->type; - string_array_reply_.push_back(std::string(entry->str, entry->len)); + string_array_reply_.emplace_back(std::string(entry->str, entry->len)); } return; } @@ -154,8 +155,12 @@ void CallbackReply::ParseAsStringArray(redisReply *redis_reply) { string_array_reply_.reserve(array_size); for (size_t i = 0; i < array_size; ++i) { auto *entry = redis_reply->element[i]; - RAY_CHECK(REDIS_REPLY_STRING == entry->type) << "Unexcepted type: " << entry->type; - string_array_reply_.push_back(std::string(entry->str, entry->len)); + if (entry->type == REDIS_REPLY_STRING) { + string_array_reply_.emplace_back(std::string(entry->str, entry->len)); + } else { + RAY_CHECK(REDIS_REPLY_NIL == entry->type) << "Unexcepted type: " << entry->type; + string_array_reply_.emplace_back(); + } } } @@ -183,11 +188,16 @@ const std::string &CallbackReply::ReadAsPubsubData() const { size_t CallbackReply::ReadAsScanArray(std::vector *array) const { RAY_CHECK(reply_type_ == REDIS_REPLY_ARRAY) << "Unexpected type: " << reply_type_; - *array = string_array_reply_; + array->clear(); + array->reserve(string_array_reply_.size()); + for (const auto &element : string_array_reply_) { + RAY_CHECK(element.has_value()); + array->emplace_back(*element); + } return next_scan_cursor_reply_; } -const std::vector &CallbackReply::ReadAsStringArray() const { +const std::vector> &CallbackReply::ReadAsStringArray() const { RAY_CHECK(reply_type_ == REDIS_REPLY_ARRAY) << "Unexpected type: " << reply_type_; return string_array_reply_; } diff --git a/src/ray/gcs/redis_context.h b/src/ray/gcs/redis_context.h index 8b931e170..c7244aac8 100644 --- a/src/ray/gcs/redis_context.h +++ b/src/ray/gcs/redis_context.h @@ -62,7 +62,7 @@ class CallbackReply { const std::string &ReadAsPubsubData() const; /// Read this reply data as a string array. - const std::vector &ReadAsStringArray() const; + [[nodiscard]] const std::vector> &ReadAsStringArray() const; /// Read this reply data as a scan array. /// @@ -95,7 +95,7 @@ class CallbackReply { /// Reply data if reply_type_ is REDIS_REPLY_ARRAY. /// Represent the reply of StringArray or ScanArray. - std::vector string_array_reply_; + std::vector> string_array_reply_; bool is_subscribe_callback_ = false; bool is_unsubscribe_callback_ = false; diff --git a/src/ray/gcs/store_client/redis_store_client.cc b/src/ray/gcs/store_client/redis_store_client.cc index df3d93e72..5f8c4ebf6 100644 --- a/src/ray/gcs/store_client/redis_store_client.cc +++ b/src/ray/gcs/store_client/redis_store_client.cc @@ -334,9 +334,11 @@ Status RedisStoreClient::MGetValues( if (!reply->IsNil()) { auto value = reply->ReadAsStringArray(); // The 0 th element of mget_keys is "MGET", so we start from the 1 th element. - for (int index = 0; index < (int)value.size(); ++index) { - (*key_value_map)[GetKeyFromRedisKey(mget_keys[index + 1], table_name)] = - value[index]; + for (size_t index = 0; index < value.size(); ++index) { + if (value[index].has_value()) { + (*key_value_map)[GetKeyFromRedisKey(mget_keys[index + 1], table_name)] = + *(value[index]); + } } } diff --git a/src/ray/gcs/test/callback_reply_test.cc b/src/ray/gcs/test/callback_reply_test.cc new file mode 100644 index 000000000..42baff3f9 --- /dev/null +++ b/src/ray/gcs/test/callback_reply_test.cc @@ -0,0 +1,84 @@ +// Copyright 2021 The Ray Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "ray/gcs/redis_context.h" + +#include "gtest/gtest.h" + +extern "C" { +#include "hiredis/hiredis.h" +} + +namespace ray::gcs { +TEST(TestCallbackReply, TestParseAsStringArray) { + { + redisReply redis_reply_string1; + redis_reply_string1.type = REDIS_REPLY_STRING; + std::string string1 = "string1"; + redis_reply_string1.str = string1.data(); + redis_reply_string1.len = 7; + + redisReply redis_reply_string2; + redis_reply_string2.type = REDIS_REPLY_STRING; + std::string string2 = "string2"; + redis_reply_string2.str = string2.data(); + redis_reply_string2.len = 7; + + redisReply redis_reply_array; + redis_reply_array.type = REDIS_REPLY_ARRAY; + redis_reply_array.elements = 2; + redisReply *redis_reply_array_elements[2]; + redis_reply_array_elements[0] = &redis_reply_string1; + redis_reply_array_elements[1] = &redis_reply_string2; + redis_reply_array.element = redis_reply_array_elements; + CallbackReply callback_reply(&redis_reply_array); + ASSERT_EQ( + callback_reply.ReadAsStringArray(), + (std::vector>{std::optional(string1), + std::optional(string2)})); + } + + { + redisReply redis_reply_string1; + redis_reply_string1.type = REDIS_REPLY_STRING; + std::string string1 = "string1"; + redis_reply_string1.str = string1.data(); + redis_reply_string1.len = 7; + + redisReply redis_reply_nil1; + redis_reply_nil1.type = REDIS_REPLY_NIL; + redisReply redis_reply_nil2; + redis_reply_nil2.type = REDIS_REPLY_NIL; + + redisReply redis_reply_array; + redis_reply_array.type = REDIS_REPLY_ARRAY; + redis_reply_array.elements = 3; + redisReply *redis_reply_array_elements[3]; + redis_reply_array_elements[0] = &redis_reply_nil1; + redis_reply_array_elements[1] = &redis_reply_string1; + redis_reply_array_elements[2] = &redis_reply_nil2; + redis_reply_array.element = redis_reply_array_elements; + CallbackReply callback_reply(&redis_reply_array); + ASSERT_EQ(callback_reply.ReadAsStringArray(), + (std::vector>{ + std::optional(), std::optional(string1), + std::optional()})); + } +} +} // namespace ray::gcs + +int main(int argc, char **argv) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} \ No newline at end of file