[Core] Fix Crash in ObjectDirectory (#20540)

We hit a crash at the RAY_CHECK on line 446 of the following snippet:

d26c9e67e8/src/ray/object_manager/ownership_based_object_directory.cc (L441-L450)


We found that the cause is that we didn't set the node_id for dead nodes in the node cache. If the cache contains a dead node and we call LookupRemoteConnectionInfo on it, this crash happens.

This PR fixes this crash.
This commit is contained in:
Lixin Wei 2021-12-08 15:03:49 +08:00 committed by GitHub
parent 1b9c03adb3
commit 96dc10a95a
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 65 additions and 6 deletions

View file

@ -1813,6 +1813,21 @@ cc_library(
], ],
) )
# Small C++ test target built from src/ray/gcs/gcs_client/test/accessor_test.cc.
# It links the GCS client library, the GCS test utilities, and gtest_main.
cc_test(
name = "accessor_test",
size = "small",
srcs = [
"src/ray/gcs/gcs_client/test/accessor_test.cc",
],
copts = COPTS,
tags = ["team:core"],
deps = [
":gcs_client_lib",
":gcs_test_util_lib",
"@com_google_googletest//:gtest_main",
],
)
cc_test( cc_test(
name = "global_state_accessor_test", name = "global_state_accessor_test",
size = "small", size = "small",

View file

@ -559,11 +559,14 @@ void NodeInfoAccessor::HandleNotification(const GcsNodeInfo &node_info) {
// Add the notification to our cache. // Add the notification to our cache.
RAY_LOG(INFO) << "Received notification for node id = " << node_id RAY_LOG(INFO) << "Received notification for node id = " << node_id
<< ", IsAlive = " << is_alive; << ", IsAlive = " << is_alive;
auto &node = node_cache_[node_id];
if (is_alive) { if (is_alive) {
node_cache_[node_id] = node_info; node = node_info;
} else { } else {
node_cache_[node_id].set_state(rpc::GcsNodeInfo::DEAD); node.set_node_id(node_info.node_id());
node_cache_[node_id].set_timestamp(node_info.timestamp()); node.set_state(rpc::GcsNodeInfo::DEAD);
node.set_timestamp(node_info.timestamp());
} }
// If the notification is new, call registered callback. // If the notification is new, call registered callback.
@ -574,7 +577,9 @@ void NodeInfoAccessor::HandleNotification(const GcsNodeInfo &node_info) {
removed_nodes_.insert(node_id); removed_nodes_.insert(node_id);
} }
GcsNodeInfo &cache_data = node_cache_[node_id]; GcsNodeInfo &cache_data = node_cache_[node_id];
node_change_callback_(node_id, cache_data); if (node_change_callback_) {
node_change_callback_(node_id, cache_data);
}
} }
} }

View file

@ -447,6 +447,9 @@ class NodeInfoAccessor {
virtual Status AsyncGetInternalConfig( virtual Status AsyncGetInternalConfig(
const OptionalItemCallback<std::string> &callback); const OptionalItemCallback<std::string> &callback);
/// Add a node to accessor cache.
virtual void HandleNotification(const rpc::GcsNodeInfo &node_info);
private: private:
/// Save the subscribe operation in this function, so we can call it again when PubSub /// Save the subscribe operation in this function, so we can call it again when PubSub
/// server restarts from a failure. /// server restarts from a failure.
@ -456,8 +459,6 @@ class NodeInfoAccessor {
/// server restarts from a failure. /// server restarts from a failure.
FetchDataOperation fetch_node_data_operation_; FetchDataOperation fetch_node_data_operation_;
void HandleNotification(const rpc::GcsNodeInfo &node_info);
GcsClient *client_impl_; GcsClient *client_impl_;
using NodeChangeCallback = using NodeChangeCallback =

View file

@ -0,0 +1,38 @@
// Copyright 2021 The Ray Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "ray/gcs/gcs_client/accessor.h"
#include "gtest/gtest.h"
#include "src/ray/protobuf/gcs.pb.h"
namespace ray {

using namespace ray::gcs;
using namespace ray::rpc;

/// Regression test for the dead-node crash: after a DEAD notification is
/// handled, the entry returned by Get() must carry the node id that was sent
/// in the notification.
TEST(NodeInfoAccessorTest, TestHandleNotification) {
  NodeInfoAccessor accessor;

  // Build a notification that describes a dead node with a random id.
  const NodeID node_id = NodeID::FromRandom();
  GcsNodeInfo node_info;
  node_info.set_state(rpc::GcsNodeInfo::DEAD);
  node_info.set_node_id(node_id.Binary());

  accessor.HandleNotification(node_info);

  // NOTE(review): the `false` argument is presumably `filter_dead_nodes`, so
  // that the dead node is still returned -- confirm against accessor.h.
  const auto *cached_node = accessor.Get(node_id, false);
  // Fail the test cleanly instead of dereferencing a null pointer if the
  // node was not cached at all.
  ASSERT_TRUE(cached_node != nullptr);
  ASSERT_EQ(cached_node->node_id(), node_id.Binary());
}

}  // namespace ray

// The entry point has to live in the global namespace: a `main` declared
// inside `namespace ray` is just an ordinary function named `ray::main` and is
// never invoked as the program entry point.
int main(int argc, char **argv) {
  ::testing::InitGoogleTest(&argc, argv);
  return RUN_ALL_TESTS();
}