Fix nil redis array element (#18813)

This commit is contained in:
Jiajun Yao 2021-09-24 20:11:43 -07:00 committed by GitHub
parent 3d9c973861
commit e79f271b05
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 143 additions and 34 deletions

View file

@ -2073,6 +2073,18 @@ cc_library(
],
)
cc_test(
name = "callback_reply_test",
size = "small",
srcs = ["src/ray/gcs/test/callback_reply_test.cc"],
copts = COPTS,
tags = ["team:core"],
deps = [
":gcs",
"@com_google_googletest//:gtest_main",
],
)
cc_test(
name = "asio_test",
size = "small",

View file

@ -81,7 +81,8 @@ void GcsInternalKVManager::HandleInternalKVKeys(
cmd, [reply, send_reply_callback](auto redis_reply) {
const auto &results = redis_reply->ReadAsStringArray();
for (const auto &result : results) {
reply->add_results(result);
RAY_CHECK(result.has_value());
reply->add_results(*result);
}
GCS_RPC_SEND_REPLY(send_reply_callback, reply, Status::OK());
}));

View file

@ -91,30 +91,31 @@ CallbackReply::CallbackReply(redisReply *redis_reply) : reply_type_(redis_reply-
break;
}
redisReply *message_type = redis_reply->element[0];
if (strcmp(message_type->str, "subscribe") == 0 ||
strcmp(message_type->str, "psubscribe") == 0) {
is_subscribe_callback_ = true;
// If the message is for the initial subscription call, return the empty
// string as a response to signify that subscription was successful.
} else if (strcmp(message_type->str, "punsubscribe") == 0 ||
strcmp(message_type->str, "unsubscribe") == 0) {
is_unsubscribe_callback_ = true;
} else if (strcmp(message_type->str, "message") == 0) {
// If the message is from a PUBLISH, make sure the data is nonempty.
redisReply *message = redis_reply->element[redis_reply->elements - 1];
// data is a notification message.
string_reply_ = std::string(message->str, message->len);
RAY_CHECK(!string_reply_.empty()) << "Empty message received on subscribe channel.";
} else if (strcmp(message_type->str, "pmessage") == 0) {
// If the message is from a PUBLISH, make sure the data is nonempty.
redisReply *message = redis_reply->element[redis_reply->elements - 1];
// data is a notification message.
string_reply_ = std::string(message->str, message->len);
RAY_CHECK(!string_reply_.empty()) << "Empty message received on subscribe channel.";
} else {
// Array replies are used for scan or get.
ParseAsStringArrayOrScanArray(redis_reply);
if (message_type->type == REDIS_REPLY_STRING) {
if (strcmp(message_type->str, "subscribe") == 0 ||
strcmp(message_type->str, "psubscribe") == 0) {
// If the message is for the initial subscription call, return the empty
// string as a response to signify that subscription was successful.
is_subscribe_callback_ = true;
break;
} else if (strcmp(message_type->str, "punsubscribe") == 0 ||
strcmp(message_type->str, "unsubscribe") == 0) {
is_unsubscribe_callback_ = true;
break;
} else if (strcmp(message_type->str, "message") == 0 ||
strcmp(message_type->str, "pmessage") == 0) {
// If the message is from a PUBLISH, make sure the data is nonempty.
redisReply *message = redis_reply->element[redis_reply->elements - 1];
// data is a notification message.
string_reply_ = std::string(message->str, message->len);
RAY_CHECK(!string_reply_.empty())
<< "Empty message received on subscribe channel.";
break;
}
}
// Array replies are used for scan or get.
ParseAsStringArrayOrScanArray(redis_reply);
break;
}
default: {
@ -140,7 +141,7 @@ void CallbackReply::ParseAsStringArrayOrScanArray(redisReply *redis_reply) {
auto *entry = array_entry->element[i];
RAY_CHECK(REDIS_REPLY_STRING == entry->type)
<< "Unexcepted type: " << entry->type;
string_array_reply_.push_back(std::string(entry->str, entry->len));
string_array_reply_.emplace_back(std::string(entry->str, entry->len));
}
return;
}
@ -154,8 +155,12 @@ void CallbackReply::ParseAsStringArray(redisReply *redis_reply) {
string_array_reply_.reserve(array_size);
for (size_t i = 0; i < array_size; ++i) {
auto *entry = redis_reply->element[i];
RAY_CHECK(REDIS_REPLY_STRING == entry->type) << "Unexcepted type: " << entry->type;
string_array_reply_.push_back(std::string(entry->str, entry->len));
if (entry->type == REDIS_REPLY_STRING) {
string_array_reply_.emplace_back(std::string(entry->str, entry->len));
} else {
RAY_CHECK(REDIS_REPLY_NIL == entry->type) << "Unexcepted type: " << entry->type;
string_array_reply_.emplace_back();
}
}
}
@ -183,11 +188,16 @@ const std::string &CallbackReply::ReadAsPubsubData() const {
size_t CallbackReply::ReadAsScanArray(std::vector<std::string> *array) const {
RAY_CHECK(reply_type_ == REDIS_REPLY_ARRAY) << "Unexpected type: " << reply_type_;
*array = string_array_reply_;
array->clear();
array->reserve(string_array_reply_.size());
for (const auto &element : string_array_reply_) {
RAY_CHECK(element.has_value());
array->emplace_back(*element);
}
return next_scan_cursor_reply_;
}
const std::vector<std::string> &CallbackReply::ReadAsStringArray() const {
const std::vector<std::optional<std::string>> &CallbackReply::ReadAsStringArray() const {
RAY_CHECK(reply_type_ == REDIS_REPLY_ARRAY) << "Unexpected type: " << reply_type_;
return string_array_reply_;
}

View file

@ -62,7 +62,7 @@ class CallbackReply {
const std::string &ReadAsPubsubData() const;
/// Read this reply data as a string array.
const std::vector<std::string> &ReadAsStringArray() const;
[[nodiscard]] const std::vector<std::optional<std::string>> &ReadAsStringArray() const;
/// Read this reply data as a scan array.
///
@ -95,7 +95,7 @@ class CallbackReply {
/// Reply data if reply_type_ is REDIS_REPLY_ARRAY.
/// Represent the reply of StringArray or ScanArray.
std::vector<std::string> string_array_reply_;
std::vector<std::optional<std::string>> string_array_reply_;
bool is_subscribe_callback_ = false;
bool is_unsubscribe_callback_ = false;

View file

@ -334,9 +334,11 @@ Status RedisStoreClient::MGetValues(
if (!reply->IsNil()) {
auto value = reply->ReadAsStringArray();
// The 0 th element of mget_keys is "MGET", so we start from the 1 th element.
for (int index = 0; index < (int)value.size(); ++index) {
(*key_value_map)[GetKeyFromRedisKey(mget_keys[index + 1], table_name)] =
value[index];
for (size_t index = 0; index < value.size(); ++index) {
if (value[index].has_value()) {
(*key_value_map)[GetKeyFromRedisKey(mget_keys[index + 1], table_name)] =
*(value[index]);
}
}
}

View file

@ -0,0 +1,84 @@
// Copyright 2021 The Ray Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "ray/gcs/redis_context.h"
#include "gtest/gtest.h"
extern "C" {
#include "hiredis/hiredis.h"
}
namespace ray::gcs {
TEST(TestCallbackReply, TestParseAsStringArray) {
{
redisReply redis_reply_string1;
redis_reply_string1.type = REDIS_REPLY_STRING;
std::string string1 = "string1";
redis_reply_string1.str = string1.data();
redis_reply_string1.len = 7;
redisReply redis_reply_string2;
redis_reply_string2.type = REDIS_REPLY_STRING;
std::string string2 = "string2";
redis_reply_string2.str = string2.data();
redis_reply_string2.len = 7;
redisReply redis_reply_array;
redis_reply_array.type = REDIS_REPLY_ARRAY;
redis_reply_array.elements = 2;
redisReply *redis_reply_array_elements[2];
redis_reply_array_elements[0] = &redis_reply_string1;
redis_reply_array_elements[1] = &redis_reply_string2;
redis_reply_array.element = redis_reply_array_elements;
CallbackReply callback_reply(&redis_reply_array);
ASSERT_EQ(
callback_reply.ReadAsStringArray(),
(std::vector<std::optional<std::string>>{std::optional<std::string>(string1),
std::optional<std::string>(string2)}));
}
{
redisReply redis_reply_string1;
redis_reply_string1.type = REDIS_REPLY_STRING;
std::string string1 = "string1";
redis_reply_string1.str = string1.data();
redis_reply_string1.len = 7;
redisReply redis_reply_nil1;
redis_reply_nil1.type = REDIS_REPLY_NIL;
redisReply redis_reply_nil2;
redis_reply_nil2.type = REDIS_REPLY_NIL;
redisReply redis_reply_array;
redis_reply_array.type = REDIS_REPLY_ARRAY;
redis_reply_array.elements = 3;
redisReply *redis_reply_array_elements[3];
redis_reply_array_elements[0] = &redis_reply_nil1;
redis_reply_array_elements[1] = &redis_reply_string1;
redis_reply_array_elements[2] = &redis_reply_nil2;
redis_reply_array.element = redis_reply_array_elements;
CallbackReply callback_reply(&redis_reply_array);
ASSERT_EQ(callback_reply.ReadAsStringArray(),
(std::vector<std::optional<std::string>>{
std::optional<std::string>(), std::optional<std::string>(string1),
std::optional<std::string>()}));
}
}
} // namespace ray::gcs
int main(int argc, char **argv) {
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}