Add callback to node manager for client removed event. (#2417)

* Add callback to node manager for client removed event.

* Fix linting.
This commit is contained in:
Robert Nishihara 2018-07-18 16:59:04 -07:00 committed by Philipp Moritz
parent 991d0911d1
commit eed39163f9
4 changed files with 53 additions and 0 deletions

View file

@ -82,6 +82,8 @@ bool UniqueID::operator==(const UniqueID &rhs) const {
return std::memcmp(data(), rhs.data(), kUniqueIDSize) == 0;
}
bool UniqueID::operator!=(const UniqueID &rhs) const { return !(*this == rhs); }
// This code is from https://sites.google.com/site/murmurhash/
// and is public domain.
uint64_t MurmurHash64A(const void *key, int len, unsigned int seed) {

View file

@ -22,6 +22,7 @@ class RAY_EXPORT UniqueID {
size_t hash() const;
bool is_nil() const;
bool operator==(const UniqueID &rhs) const;
bool operator!=(const UniqueID &rhs) const;
const uint8_t *data() const;
uint8_t *mutable_data();
size_t size() const;

View file

@ -142,6 +142,12 @@ ray::Status NodeManager::RegisterGcs() {
ClientAdded(data);
};
gcs_client_->client_table().RegisterClientAddedCallback(node_manager_client_added);
// Register a callback on the client table for removed clients.
auto node_manager_client_removed = [this](
gcs::AsyncGcsClient *client, const UniqueID &id, const ClientTableDataT &data) {
ClientRemoved(data);
};
gcs_client_->client_table().RegisterClientRemovedCallback(node_manager_client_removed);
// Subscribe to node manager heartbeats.
const auto heartbeat_added = [this](gcs::AsyncGcsClient *client, const ClientID &id,
@ -203,6 +209,15 @@ void NodeManager::Heartbeat() {
void NodeManager::ClientAdded(const ClientTableDataT &client_data) {
ClientID client_id = ClientID::from_binary(client_data.client_id);
// Make sure the client hasn't already been removed.
if (removed_clients_.find(client_id) != removed_clients_.end()) {
// This client has already been removed, so don't do anything.
RAY_LOG(INFO) << "The client " << client_id << " has already been removed, so it "
<< "can't be added. This is very unusual.";
return;
}
RAY_LOG(DEBUG) << "[ClientAdded] received callback from client id " << client_id;
if (client_id == gcs_client_->client_table().GetLocalClientId()) {
// We got a notification for ourselves, so we are connected to the GCS now.
@ -240,6 +255,36 @@ void NodeManager::ClientAdded(const ClientTableDataT &client_data) {
remote_server_connections_.emplace(client_id, std::move(server_conn));
}
void NodeManager::ClientRemoved(const ClientTableDataT &client_data) {
const ClientID client_id = ClientID::from_binary(client_data.client_id);
RAY_LOG(DEBUG) << "[ClientRemoved] received callback from client id " << client_id;
// If the client has already been removed, don't do anything.
if (removed_clients_.find(client_id) != removed_clients_.end()) {
RAY_LOG(INFO) << "The client " << client_id << " has already been removed. This "
<< "should be very unusual.";
return;
}
removed_clients_.insert(client_id);
RAY_CHECK(client_id != gcs_client_->client_table().GetLocalClientId())
<< "Exiting because this node manager has mistakenly been marked dead by the "
<< "monitor.";
// Below, when we remove client_id from all of these data structures, we could
// check that it is actually removed, or log a warning otherwise, but that may
// not be necessary.
// Remove the client from the list of remote clients.
std::remove(remote_clients_.begin(), remote_clients_.end(), client_id);
// Remove the client from the resource map.
cluster_resource_map_.erase(client_id);
// Remove the remote server connection.
remote_server_connections_.erase(client_id);
}
void NodeManager::HeartbeatAdded(gcs::AsyncGcsClient *client, const ClientID &client_id,
const HeartbeatTableDataT &heartbeat_data) {
RAY_LOG(DEBUG) << "[HeartbeatAdded]: received heartbeat from client id " << client_id;

View file

@ -62,6 +62,8 @@ class NodeManager {
/// Methods for handling clients.
/// Handler for the addition of a new GCS client.
void ClientAdded(const ClientTableDataT &data);
/// Handler for the removal of a GCS client.
void ClientRemoved(const ClientTableDataT &client_data);
/// Send heartbeats to the GCS.
void Heartbeat();
/// Handler for a heartbeat notification from the GCS.
@ -133,6 +135,9 @@ class NodeManager {
/// The lineage cache for the GCS object and task tables.
LineageCache lineage_cache_;
std::vector<ClientID> remote_clients_;
/// A set of all of the remote clients that have been removed. In principle,
/// this could grow unbounded.
std::unordered_set<ClientID> removed_clients_;
std::unordered_map<ClientID, TcpServerConnection> remote_server_connections_;
std::unordered_map<ActorID, ActorRegistration> actor_registry_;
};