From eed39163f9d21d388bc54d72374577f804c1eda1 Mon Sep 17 00:00:00 2001 From: Robert Nishihara Date: Wed, 18 Jul 2018 16:59:04 -0700 Subject: [PATCH] Add callback to node manager for client removed event. (#2417) * Add callback to node manager for client removed event. * Fix linting. --- src/ray/id.cc | 2 ++ src/ray/id.h | 1 + src/ray/raylet/node_manager.cc | 45 ++++++++++++++++++++++++++++++++++ src/ray/raylet/node_manager.h | 5 ++++ 4 files changed, 53 insertions(+) diff --git a/src/ray/id.cc b/src/ray/id.cc index f197936ba..f3fc7806c 100644 --- a/src/ray/id.cc +++ b/src/ray/id.cc @@ -82,6 +82,8 @@ bool UniqueID::operator==(const UniqueID &rhs) const { return std::memcmp(data(), rhs.data(), kUniqueIDSize) == 0; } +bool UniqueID::operator!=(const UniqueID &rhs) const { return !(*this == rhs); } + // This code is from https://sites.google.com/site/murmurhash/ // and is public domain. uint64_t MurmurHash64A(const void *key, int len, unsigned int seed) { diff --git a/src/ray/id.h b/src/ray/id.h index 534f99a34..e2f9cf05a 100644 --- a/src/ray/id.h +++ b/src/ray/id.h @@ -22,6 +22,7 @@ class RAY_EXPORT UniqueID { size_t hash() const; bool is_nil() const; bool operator==(const UniqueID &rhs) const; + bool operator!=(const UniqueID &rhs) const; const uint8_t *data() const; uint8_t *mutable_data(); size_t size() const; diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index 10825edba..f9ae27bee 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -142,6 +142,12 @@ ray::Status NodeManager::RegisterGcs() { ClientAdded(data); }; gcs_client_->client_table().RegisterClientAddedCallback(node_manager_client_added); + // Register a callback on the client table for removed clients. + auto node_manager_client_removed = [this]( + gcs::AsyncGcsClient *client, const UniqueID &id, const ClientTableDataT &data) { + ClientRemoved(data); + }; + gcs_client_->client_table().RegisterClientRemovedCallback(node_manager_client_removed); // Subscribe to node manager heartbeats. const auto heartbeat_added = [this](gcs::AsyncGcsClient *client, const ClientID &id, @@ -203,6 +209,15 @@ void NodeManager::Heartbeat() { void NodeManager::ClientAdded(const ClientTableDataT &client_data) { ClientID client_id = ClientID::from_binary(client_data.client_id); + + // Make sure the client hasn't already been removed. + if (removed_clients_.find(client_id) != removed_clients_.end()) { + // This client has already been removed, so don't do anything. + RAY_LOG(INFO) << "The client " << client_id << " has already been removed, so it " + << "can't be added. This is very unusual."; + return; + } + RAY_LOG(DEBUG) << "[ClientAdded] received callback from client id " << client_id; if (client_id == gcs_client_->client_table().GetLocalClientId()) { // We got a notification for ourselves, so we are connected to the GCS now. @@ -240,6 +255,36 @@ void NodeManager::ClientAdded(const ClientTableDataT &client_data) { remote_server_connections_.emplace(client_id, std::move(server_conn)); } +void NodeManager::ClientRemoved(const ClientTableDataT &client_data) { + const ClientID client_id = ClientID::from_binary(client_data.client_id); + RAY_LOG(DEBUG) << "[ClientRemoved] received callback from client id " << client_id; + + // If the client has already been removed, don't do anything. + if (removed_clients_.find(client_id) != removed_clients_.end()) { + RAY_LOG(INFO) << "The client " << client_id << " has already been removed. This " + << "should be very unusual."; + return; + } + removed_clients_.insert(client_id); + + RAY_CHECK(client_id != gcs_client_->client_table().GetLocalClientId()) + << "Exiting because this node manager has mistakenly been marked dead by the " + << "monitor."; + + // Below, when we remove client_id from all of these data structures, we could + // check that it is actually removed, or log a warning otherwise, but that may + // not be necessary. + + // Remove the client from the list of remote clients. + std::remove(remote_clients_.begin(), remote_clients_.end(), client_id); + + // Remove the client from the resource map. + cluster_resource_map_.erase(client_id); + + // Remove the remote server connection. + remote_server_connections_.erase(client_id); +} + void NodeManager::HeartbeatAdded(gcs::AsyncGcsClient *client, const ClientID &client_id, const HeartbeatTableDataT &heartbeat_data) { RAY_LOG(DEBUG) << "[HeartbeatAdded]: received heartbeat from client id " << client_id; diff --git a/src/ray/raylet/node_manager.h b/src/ray/raylet/node_manager.h index 0e897bf07..ebe0f8480 100644 --- a/src/ray/raylet/node_manager.h +++ b/src/ray/raylet/node_manager.h @@ -62,6 +62,8 @@ class NodeManager { /// Methods for handling clients. /// Handler for the addition of a new GCS client. void ClientAdded(const ClientTableDataT &data); + /// Handler for the removal of a GCS client. + void ClientRemoved(const ClientTableDataT &client_data); /// Send heartbeats to the GCS. void Heartbeat(); /// Handler for a heartbeat notification from the GCS. @@ -133,6 +135,9 @@ class NodeManager { /// The lineage cache for the GCS object and task tables. LineageCache lineage_cache_; std::vector remote_clients_; + /// A set of all of the remote clients that have been removed. In principle, + /// this could grow unbounded. + std::unordered_set removed_clients_; std::unordered_map remote_server_connections_; std::unordered_map actor_registry_; };