From ca585703b260e678de187c47e863ed1822b92e46 Mon Sep 17 00:00:00 2001 From: Stephanie Wang Date: Tue, 6 Nov 2018 20:33:10 -0800 Subject: [PATCH] Refactor ObjectDirectory to reduce and fix callback usage (#3227) --- src/ray/gcs/client_test.cc | 3 +- src/ray/gcs/tables.cc | 9 +- src/ray/gcs/tables.h | 14 +- src/ray/object_manager/object_directory.cc | 66 +++---- src/ray/object_manager/object_directory.h | 66 +++---- src/ray/object_manager/object_manager.cc | 177 +++++++++--------- .../test/object_manager_stress_test.cc | 6 +- .../test/object_manager_test.cc | 6 +- src/ray/raylet/node_manager.cc | 11 +- .../raylet/object_manager_integration_test.cc | 6 +- src/ray/raylet/reconstruction_policy_test.cc | 5 +- 11 files changed, 179 insertions(+), 190 deletions(-) diff --git a/src/ray/gcs/client_test.cc b/src/ray/gcs/client_test.cc index 116d4dd04..a99e4ceb0 100644 --- a/src/ray/gcs/client_test.cc +++ b/src/ray/gcs/client_test.cc @@ -648,7 +648,8 @@ void ClientTableNotification(gcs::AsyncGcsClient *client, const ClientID &client ASSERT_EQ(ClientID::from_binary(data.client_id), added_id); ASSERT_EQ(data.is_insertion, is_insertion); - auto cached_client = client->client_table().GetClient(added_id); + ClientTableDataT cached_client; + client->client_table().GetClient(added_id, cached_client); ASSERT_EQ(ClientID::from_binary(cached_client.client_id), added_id); ASSERT_EQ(cached_client.is_insertion, is_insertion); } diff --git a/src/ray/gcs/tables.cc b/src/ray/gcs/tables.cc index 6ef2b66af..f1a9a1975 100644 --- a/src/ray/gcs/tables.cc +++ b/src/ray/gcs/tables.cc @@ -421,15 +421,14 @@ ray::Status ClientTable::MarkDisconnected(const ClientID &dead_client_id) { return Append(JobID::nil(), client_log_key_, data, nullptr); } -const ClientTableDataT &ClientTable::GetClient(const ClientID &client_id) const { +void ClientTable::GetClient(const ClientID &client_id, + ClientTableDataT &client_info) const { RAY_CHECK(!client_id.is_nil()); auto entry = client_cache_.find(client_id); if (entry != client_cache_.end()) { - return entry->second; + client_info = entry->second; } else { - // If the requested client was not found, return a reference to the nil - // client entry. - return client_cache_.at(ClientID::nil()); + client_info.client_id = ClientID::nil().binary(); } } diff --git a/src/ray/gcs/tables.h b/src/ray/gcs/tables.h index 1973ebf42..6bc3759e9 100644 --- a/src/ray/gcs/tables.h +++ b/src/ray/gcs/tables.h @@ -517,12 +517,6 @@ class ClientTable : private Log { // Set the local client's ID. local_client_.client_id = client_id.binary(); - - // Add a nil client to the cache so that we can serve requests for clients - // that we have not heard about. - ClientTableDataT nil_client; - nil_client.client_id = ClientID::nil().binary(); - client_cache_[ClientID::nil()] = nil_client; }; /// Connect as a client to the GCS. This registers us in the client table @@ -560,9 +554,11 @@ class ClientTable : private Log { /// information for clients that we've heard a notification for. /// /// \param client The client to get information about. - /// \return A reference to the requested client. If the client is not in the - /// cache, then an entry with a nil ClientID will be returned. - const ClientTableDataT &GetClient(const ClientID &client) const; + /// \param A reference to the client information. If we have information + /// about the client in the cache, then the reference will be modified to + /// contain that information. Else, the reference will be updated to contain + /// a nil client ID. + void GetClient(const ClientID &client, ClientTableDataT &client_info) const; /// Get the local client's ID. /// diff --git a/src/ray/object_manager/object_directory.cc b/src/ray/object_manager/object_directory.cc index 53bbf7a8c..64f6033c5 100644 --- a/src/ray/object_manager/object_directory.cc +++ b/src/ray/object_manager/object_directory.cc @@ -2,8 +2,9 @@ namespace ray { -ObjectDirectory::ObjectDirectory(std::shared_ptr &gcs_client) - : gcs_client_(gcs_client) {} +ObjectDirectory::ObjectDirectory(boost::asio::io_service &io_service, + std::shared_ptr &gcs_client) + : io_service_(io_service), gcs_client_(gcs_client) {} namespace { @@ -61,6 +62,8 @@ void ObjectDirectory::RegisterBackend() { // empty, since this may indicate that the objects have been evicted from // all nodes. for (const auto &callback_pair : callbacks) { + // It is safe to call the callback directly since this is already running + // in the subscription callback stack. callback_pair.second(client_id_vec, object_id); } }; @@ -102,40 +105,34 @@ ray::Status ObjectDirectory::ReportObjectRemoved(const ObjectID &object_id, return status; }; -ray::Status ObjectDirectory::GetInformation(const ClientID &client_id, - const InfoSuccessCallback &success_callback, - const InfoFailureCallback &fail_callback) { - const ClientTableDataT &data = gcs_client_->client_table().GetClient(client_id); - ClientID result_client_id = ClientID::from_binary(data.client_id); - if (result_client_id == ClientID::nil() || !data.is_insertion) { - fail_callback(); - } else { - const auto &info = - RemoteConnectionInfo(client_id, data.node_manager_address, - static_cast(data.object_manager_port)); - success_callback(info); - } - return ray::Status::OK(); -} - -void ObjectDirectory::RunFunctionForEachClient( - const InfoSuccessCallback &client_function) { - const auto &clients = gcs_client_->client_table().GetAllClients(); - for (const auto &client_pair : clients) { - const ClientTableDataT &data = client_pair.second; - if (client_pair.first == ClientID::nil() || - client_pair.first == gcs_client_->client_table().GetLocalClientId() || - !data.is_insertion) { - continue; - } else { - const auto &info = - RemoteConnectionInfo(client_pair.first, data.node_manager_address, - static_cast(data.object_manager_port)); - client_function(info); +void ObjectDirectory::LookupRemoteConnectionInfo( + RemoteConnectionInfo &connection_info) const { + ClientTableDataT client_data; + gcs_client_->client_table().GetClient(connection_info.client_id, client_data); + ClientID result_client_id = ClientID::from_binary(client_data.client_id); + if (!result_client_id.is_nil()) { + RAY_CHECK(result_client_id == connection_info.client_id); + if (client_data.is_insertion) { + connection_info.ip = client_data.node_manager_address; + connection_info.port = static_cast(client_data.object_manager_port); } } } +std::vector ObjectDirectory::LookupAllRemoteConnections() const { + std::vector remote_connections; + const auto &clients = gcs_client_->client_table().GetAllClients(); + for (const auto &client_pair : clients) { + RemoteConnectionInfo info(client_pair.first); + LookupRemoteConnectionInfo(info); + if (info.Connected() && + info.client_id != gcs_client_->client_table().GetLocalClientId()) { + remote_connections.push_back(info); + } + } + return remote_connections; +} + ray::Status ObjectDirectory::SubscribeObjectLocations(const UniqueID &callback_id, const ObjectID &object_id, const OnLocationsFound &callback) { @@ -156,7 +153,8 @@ ray::Status ObjectDirectory::SubscribeObjectLocations(const UniqueID &callback_i // have been evicted from all nodes. std::vector client_id_vec(listener_state.current_object_locations.begin(), listener_state.current_object_locations.end()); - callback(client_id_vec, object_id); + io_service_.post( + [callback, client_id_vec, object_id]() { callback(client_id_vec, object_id); }); return status; } @@ -187,6 +185,8 @@ ray::Status ObjectDirectory::LookupLocations(const ObjectID &object_id, std::unordered_set client_ids; std::vector locations_vector = UpdateObjectLocations( client_ids, location_history, gcs_client_->client_table()); + // It is safe to call the callback directly since this is already running + // in the GCS client's lookup callback stack. callback(locations_vector, object_id); }); return status; diff --git a/src/ray/object_manager/object_directory.h b/src/ray/object_manager/object_directory.h index bbe933462..e705e1b56 100644 --- a/src/ray/object_manager/object_directory.h +++ b/src/ray/object_manager/object_directory.h @@ -16,10 +16,12 @@ namespace ray { /// Connection information for remote object managers. struct RemoteConnectionInfo { - RemoteConnectionInfo() = default; - RemoteConnectionInfo(const ClientID &id, const std::string &ip_address, - uint16_t port_num) - : client_id(id), ip(ip_address), port(port_num) {} + RemoteConnectionInfo(const ClientID &id) : client_id(id) {} + + // Returns whether there is enough information to connect to the remote + // object manager. + bool Connected() const { return !ip.empty(); } + ClientID client_id; std::string ip; uint16_t port; @@ -27,24 +29,23 @@ struct RemoteConnectionInfo { class ObjectDirectoryInterface { public: - ObjectDirectoryInterface() = default; - virtual ~ObjectDirectoryInterface() = default; - - /// Callbacks for GetInformation. - using InfoSuccessCallback = std::function; - using InfoFailureCallback = std::function; + virtual ~ObjectDirectoryInterface() {} virtual void RegisterBackend() = 0; - /// This is used to establish object manager client connections. + /// Lookup how to connect to a remote object manager. /// - /// \param client_id The client for which information is required. - /// \param success_cb A callback which handles the success of this method. - /// \param fail_cb A callback which handles the failure of this method. - /// \return Status of whether this asynchronous request succeeded. - virtual ray::Status GetInformation(const ClientID &client_id, - const InfoSuccessCallback &success_cb, - const InfoFailureCallback &fail_cb) = 0; + /// \param connection_info The connection information to fill out. This + /// should be pre-populated with the requested client ID. If the directory + /// has information about the requested client, then the rest of the fields + /// in this struct will be populated accordingly. + virtual void LookupRemoteConnectionInfo( + RemoteConnectionInfo &connection_info) const = 0; + + /// Get information for all connected remote object managers. + /// + /// \return A vector of information for all connected remote object managers. + virtual std::vector LookupAllRemoteConnections() const = 0; /// Callback for object location notifications. using OnLocationsFound = std::function &, @@ -102,28 +103,27 @@ class ObjectDirectoryInterface { /// \return Status of whether this method succeeded. virtual ray::Status ReportObjectRemoved(const ObjectID &object_id, const ClientID &client_id) = 0; - - /// Go through all the client information. - /// - /// \param success_cb A callback which handles the success of this method. - /// This function will be called multiple times. - /// \return Void. - virtual void RunFunctionForEachClient(const InfoSuccessCallback &client_function) = 0; }; /// Ray ObjectDirectory declaration. class ObjectDirectory : public ObjectDirectoryInterface { public: - ObjectDirectory() = default; - ~ObjectDirectory() override = default; + /// Create an object directory. + /// + /// \param io_service The event loop to dispatch callbacks to. This should + /// usually be the same event loop that the given gcs_client runs on. + /// \param gcs_client A Ray GCS client to request object and client + /// information from. + ObjectDirectory(boost::asio::io_service &io_service, + std::shared_ptr &gcs_client); + + virtual ~ObjectDirectory() {} void RegisterBackend() override; - ray::Status GetInformation(const ClientID &client_id, - const InfoSuccessCallback &success_callback, - const InfoFailureCallback &fail_callback) override; + void LookupRemoteConnectionInfo(RemoteConnectionInfo &connection_info) const override; - void RunFunctionForEachClient(const InfoSuccessCallback &client_function) override; + std::vector LookupAllRemoteConnections() const override; ray::Status LookupLocations(const ObjectID &object_id, const OnLocationsFound &callback) override; @@ -139,8 +139,6 @@ class ObjectDirectory : public ObjectDirectoryInterface { const object_manager::protocol::ObjectInfoT &object_info) override; ray::Status ReportObjectRemoved(const ObjectID &object_id, const ClientID &client_id) override; - /// Ray only (not part of the OD interface). - ObjectDirectory(std::shared_ptr &gcs_client); /// ObjectDirectory should not be copied. RAY_DISALLOW_COPY_AND_ASSIGN(ObjectDirectory); @@ -154,6 +152,8 @@ class ObjectDirectory : public ObjectDirectoryInterface { std::unordered_set current_object_locations; }; + /// Reference to the event loop. + boost::asio::io_service &io_service_; /// Reference to the gcs client. std::shared_ptr gcs_client_; /// Info about subscribers to object locations. diff --git a/src/ray/object_manager/object_manager.cc b/src/ray/object_manager/object_manager.cc index 615498076..745270587 100644 --- a/src/ray/object_manager/object_manager.cc +++ b/src/ray/object_manager/object_manager.cc @@ -23,7 +23,7 @@ ObjectManager::ObjectManager(asio::io_service &main_service, // TODO(hme): Eliminate knowledge of GCS. : client_id_(gcs_client->client_table().GetLocalClientId()), config_(config), - object_directory_(new ObjectDirectory(gcs_client)), + object_directory_(new ObjectDirectory(main_service, gcs_client)), store_notification_(main_service, config_.store_socket_name), // release_delay of 2 * config_.max_sends is to ensure the pool does not release // an object prematurely whenever we reach the maximum number of sends. @@ -263,23 +263,20 @@ void ObjectManager::PullEstablishConnection(const ObjectID &object_id, // TODO(hme): There is no cap on the number of pull request connections. connection_pool_.GetSender(ConnectionPool::ConnectionType::MESSAGE, client_id, &conn); + // Try to create a new connection to the remote object manager if one doesn't + // already exist. if (conn == nullptr) { - status = object_directory_->GetInformation( - client_id, - [this, object_id, client_id](const RemoteConnectionInfo &connection_info) { - std::shared_ptr async_conn = CreateSenderConnection( - ConnectionPool::ConnectionType::MESSAGE, connection_info); - if (async_conn == nullptr) { - return; - } - connection_pool_.RegisterSender(ConnectionPool::ConnectionType::MESSAGE, - client_id, async_conn); - PullSendRequest(object_id, async_conn); - }, - []() { - RAY_LOG(ERROR) << "Failed to establish connection with remote object manager."; - }); - } else { + RemoteConnectionInfo connection_info(client_id); + object_directory_->LookupRemoteConnectionInfo(connection_info); + if (connection_info.Connected()) { + conn = CreateSenderConnection(ConnectionPool::ConnectionType::MESSAGE, + connection_info); + } else { + RAY_LOG(ERROR) << "Failed to establish connection with remote object manager."; + } + } + + if (conn != nullptr) { PullSendRequest(object_id, conn); } } @@ -347,34 +344,30 @@ void ObjectManager::Push(const ObjectID &object_id, const ClientID &client_id) { return; } - // TODO(hme): Cache this data in ObjectDirectory. - // Okay for now since the GCS client caches this data. - RAY_CHECK_OK(object_directory_->GetInformation( - client_id, - [this, object_id, client_id](const RemoteConnectionInfo &info) { - const object_manager::protocol::ObjectInfoT &object_info = - local_objects_[object_id]; - uint64_t data_size = - static_cast(object_info.data_size + object_info.metadata_size); - uint64_t metadata_size = static_cast(object_info.metadata_size); - uint64_t num_chunks = buffer_pool_.GetNumChunks(data_size); - for (uint64_t chunk_index = 0; chunk_index < num_chunks; ++chunk_index) { - send_service_.post([this, client_id, object_id, data_size, metadata_size, - chunk_index, info]() { - // NOTE: When this callback executes, it's possible that the object - // will have already been evicted. It's also possible that the - // object could be in the process of being transferred to this - // object manager from another object manager. - ExecuteSendObject(client_id, object_id, data_size, metadata_size, chunk_index, - info); - }); - } - }, - []() { - // Push is best effort, so do nothing here. - RAY_LOG(ERROR) - << "Failed to establish connection for Push with remote object manager."; - })); + RemoteConnectionInfo connection_info(client_id); + object_directory_->LookupRemoteConnectionInfo(connection_info); + if (connection_info.Connected()) { + const object_manager::protocol::ObjectInfoT &object_info = local_objects_[object_id]; + uint64_t data_size = + static_cast(object_info.data_size + object_info.metadata_size); + uint64_t metadata_size = static_cast(object_info.metadata_size); + uint64_t num_chunks = buffer_pool_.GetNumChunks(data_size); + for (uint64_t chunk_index = 0; chunk_index < num_chunks; ++chunk_index) { + send_service_.post([this, client_id, object_id, data_size, metadata_size, + chunk_index, connection_info]() { + // NOTE: When this callback executes, it's possible that the object + // will have already been evicted. It's also possible that the + // object could be in the process of being transferred to this + // object manager from another object manager. + ExecuteSendObject(client_id, object_id, data_size, metadata_size, chunk_index, + connection_info); + }); + } + } else { + // Push is best effort, so do nothing here. + RAY_LOG(ERROR) + << "Failed to establish connection for Push with remote object manager."; + } } void ObjectManager::ExecuteSendObject(const ClientID &client_id, @@ -389,15 +382,13 @@ void ObjectManager::ExecuteSendObject(const ClientID &client_id, if (conn == nullptr) { conn = CreateSenderConnection(ConnectionPool::ConnectionType::TRANSFER, connection_info); - connection_pool_.RegisterSender(ConnectionPool::ConnectionType::TRANSFER, client_id, - conn); - if (conn == nullptr) { - return; - } } - status = SendObjectHeaders(object_id, data_size, metadata_size, chunk_index, conn); - if (!status.ok()) { - CheckIOError(status, "Push"); + + if (conn != nullptr) { + status = SendObjectHeaders(object_id, data_size, metadata_size, chunk_index, conn); + if (!status.ok()) { + CheckIOError(status, "Push"); + } } } @@ -545,23 +536,13 @@ void ObjectManager::SubscribeRemainingWaitObjects(const UniqueID &wait_id) { wait_state.timeout_ms == 0) { // Requirements already satisfied. WaitComplete(wait_id); - } else { - // Wait may complete during the execution of any one of the following calls to - // SubscribeObjectLocations, so copy the object ids that need to be iterated over. - // Order matters for test purposes. - std::vector ordered_remaining_object_ids; - for (const auto &object_id : wait_state.object_id_order) { - if (wait_state.remaining.count(object_id) > 0) { - ordered_remaining_object_ids.push_back(object_id); - } - } - for (const auto &object_id : ordered_remaining_object_ids) { - if (active_wait_requests_.find(wait_id) == active_wait_requests_.end()) { - // This is possible if an object's location is obtained immediately, - // within the current callstack. In this case, WaitComplete has been - // invoked already, so we're done. - return; - } + return; + } + + // There are objects remaining whose locations we don't know. Request their + // locations from the object directory. + for (const auto &object_id : wait_state.object_id_order) { + if (wait_state.remaining.count(object_id) > 0) { wait_state.requested_objects.insert(object_id); // Subscribe to object notifications. RAY_CHECK_OK(object_directory_->SubscribeObjectLocations( @@ -584,6 +565,10 @@ void ObjectManager::SubscribeRemainingWaitObjects(const UniqueID &wait_id) { } })); } + + // If a timeout was provided, then set a timer. If we don't find locations + // for enough objects by the time the timer expires, then we will return + // from the Wait. if (wait_state.timeout_ms != -1) { auto timeout = boost::posix_time::milliseconds(wait_state.timeout_ms); wait_state.timeout_timer->expires_from_now(timeout); @@ -643,19 +628,23 @@ std::shared_ptr ObjectManager::CreateSenderConnection( SenderConnection::Create(*main_service_, info.client_id, info.ip, info.port); if (conn == nullptr) { RAY_LOG(ERROR) << "Failed to connect to remote object manager."; - return conn; + } else { + // Register the new connection. + // TODO(Yuhong): Implement ConnectionPool::RemoveSender and call it if the client + // disconnects. + connection_pool_.RegisterSender(type, info.client_id, conn); + // Prepare client connection info buffer + flatbuffers::FlatBufferBuilder fbb; + bool is_transfer = (type == ConnectionPool::ConnectionType::TRANSFER); + auto message = object_manager_protocol::CreateConnectClientMessage( + fbb, to_flatbuf(fbb, client_id_), is_transfer); + fbb.Finish(message); + // Send synchronously. + // TODO(swang): Make this a WriteMessageAsync. + RAY_CHECK_OK(conn->WriteMessage( + static_cast(object_manager_protocol::MessageType::ConnectClient), + fbb.GetSize(), fbb.GetBufferPointer())); } - // Prepare client connection info buffer - flatbuffers::FlatBufferBuilder fbb; - bool is_transfer = (type == ConnectionPool::ConnectionType::TRANSFER); - auto message = object_manager_protocol::CreateConnectClientMessage( - fbb, fbb.CreateString(client_id_.binary()), is_transfer); - fbb.Finish(message); - // Send synchronously. - RAY_CHECK_OK(conn->WriteMessage( - static_cast(object_manager_protocol::MessageType::ConnectClient), - fbb.GetSize(), fbb.GetBufferPointer())); - // The connection is ready; return to caller. return conn; } @@ -808,25 +797,27 @@ void ObjectManager::SpreadFreeObjectRequest(const std::vector &object_ flatbuffers::Offset request = object_manager_protocol::CreateFreeRequestMessage(fbb, to_flatbuf(fbb, object_ids)); fbb.Finish(request); - auto function_on_client = [this, &fbb](const RemoteConnectionInfo &connection_info) { + + const auto remote_connections = object_directory_->LookupAllRemoteConnections(); + for (const auto &connection_info : remote_connections) { std::shared_ptr conn; connection_pool_.GetSender(ConnectionPool::ConnectionType::MESSAGE, connection_info.client_id, &conn); if (conn == nullptr) { conn = CreateSenderConnection(ConnectionPool::ConnectionType::MESSAGE, connection_info); - connection_pool_.RegisterSender(ConnectionPool::ConnectionType::MESSAGE, - connection_info.client_id, conn); } - ray::Status status = conn->WriteMessage( - static_cast(object_manager_protocol::MessageType::FreeRequest), - fbb.GetSize(), fbb.GetBufferPointer()); - if (status.ok()) { - connection_pool_.ReleaseSender(ConnectionPool::ConnectionType::MESSAGE, conn); + + if (conn != nullptr) { + // TODO(swang): Make this a WriteMessageAsync. + ray::Status status = conn->WriteMessage( + static_cast(object_manager_protocol::MessageType::FreeRequest), + fbb.GetSize(), fbb.GetBufferPointer()); + if (status.ok()) { + connection_pool_.ReleaseSender(ConnectionPool::ConnectionType::MESSAGE, conn); + } } - // TODO(Yuhong): Implement ConnectionPool::RemoveSender and call it in "else". - }; - object_directory_->RunFunctionForEachClient(function_on_client); + } } } // namespace ray diff --git a/src/ray/object_manager/test/object_manager_stress_test.cc b/src/ray/object_manager/test/object_manager_stress_test.cc index a26519c54..31ea5aed7 100644 --- a/src/ray/object_manager/test/object_manager_stress_test.cc +++ b/src/ray/object_manager/test/object_manager_stress_test.cc @@ -433,11 +433,13 @@ class StressTestObjectManager : public TestObjectManagerBase { RAY_LOG(DEBUG) << "\n" << "All connected clients:" << "\n"; - const ClientTableDataT &data = gcs_client_1->client_table().GetClient(client_id_1); + ClientTableDataT data; + gcs_client_1->client_table().GetClient(client_id_1, data); RAY_LOG(DEBUG) << "ClientID=" << ClientID::from_binary(data.client_id) << "\n" << "ClientIp=" << data.node_manager_address << "\n" << "ClientPort=" << data.node_manager_port; - const ClientTableDataT &data2 = gcs_client_1->client_table().GetClient(client_id_2); + ClientTableDataT data2; + gcs_client_1->client_table().GetClient(client_id_2, data2); RAY_LOG(DEBUG) << "ClientID=" << ClientID::from_binary(data2.client_id) << "\n" << "ClientIp=" << data2.node_manager_address << "\n" << "ClientPort=" << data2.node_manager_port; diff --git a/src/ray/object_manager/test/object_manager_test.cc b/src/ray/object_manager/test/object_manager_test.cc index 3ce2f2c2a..cb9870675 100644 --- a/src/ray/object_manager/test/object_manager_test.cc +++ b/src/ray/object_manager/test/object_manager_test.cc @@ -447,13 +447,15 @@ class TestObjectManager : public TestObjectManagerBase { RAY_LOG(DEBUG) << "\n" << "Server client ids:" << "\n"; - const ClientTableDataT &data = gcs_client_1->client_table().GetClient(client_id_1); + ClientTableDataT data; + gcs_client_1->client_table().GetClient(client_id_1, data); RAY_LOG(DEBUG) << (ClientID::from_binary(data.client_id) == ClientID::nil()); RAY_LOG(DEBUG) << "Server 1 ClientID=" << ClientID::from_binary(data.client_id); RAY_LOG(DEBUG) << "Server 1 ClientIp=" << data.node_manager_address; RAY_LOG(DEBUG) << "Server 1 ClientPort=" << data.node_manager_port; ASSERT_EQ(client_id_1, ClientID::from_binary(data.client_id)); - const ClientTableDataT &data2 = gcs_client_1->client_table().GetClient(client_id_2); + ClientTableDataT data2; + gcs_client_1->client_table().GetClient(client_id_2, data2); RAY_LOG(DEBUG) << "Server 2 ClientID=" << ClientID::from_binary(data2.client_id); RAY_LOG(DEBUG) << "Server 2 ClientIp=" << data2.node_manager_address; RAY_LOG(DEBUG) << "Server 2 ClientPort=" << data2.node_manager_port; diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index 94ac03a68..54145152b 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -61,7 +61,7 @@ NodeManager::NodeManager(boost::asio::io_service &io_service, [this](const TaskID &task_id) { HandleTaskReconstruction(task_id); }, RayConfig::instance().initial_reconstruction_timeout_milliseconds(), gcs_client_->client_table().GetLocalClientId(), gcs_client->task_lease_table(), - std::make_shared(gcs_client), + std::make_shared(io_service, gcs_client), gcs_client_->task_reconstruction_log()), task_dependency_manager_( object_manager, reconstruction_policy_, io_service, @@ -304,14 +304,13 @@ void NodeManager::ClientAdded(const ClientTableDataT &client_data) { } // Establish a new NodeManager connection to this GCS client. - auto client_info = gcs_client_->client_table().GetClient(client_id); RAY_LOG(DEBUG) << "[ClientAdded] Trying to connect to client " << client_id << " at " - << client_info.node_manager_address << ":" - << client_info.node_manager_port; + << client_data.node_manager_address << ":" + << client_data.node_manager_port; boost::asio::ip::tcp::socket socket(io_service_); auto status = - TcpConnect(socket, client_info.node_manager_address, client_info.node_manager_port); + TcpConnect(socket, client_data.node_manager_address, client_data.node_manager_port); // A disconnected client has 2 entries in the client table (one for being // inserted and one for being removed). When a new raylet starts, ClientAdded // will be called with the disconnected client's first entry, which will cause @@ -1556,8 +1555,6 @@ void NodeManager::ForwardTask(const Task &task, const ClientID &node_id, RAY_LOG(DEBUG) << "Forwarding task " << task_id << " to " << node_id << " spillback=" << lineage_cache_entry_task.GetTaskExecutionSpec().NumForwards(); - auto client_info = gcs_client_->client_table().GetClient(node_id); - // Lookup remote server connection for this node_id and use it to send the request. auto it = remote_server_connections_.find(node_id); if (it == remote_server_connections_.end()) { diff --git a/src/ray/raylet/object_manager_integration_test.cc b/src/ray/raylet/object_manager_integration_test.cc index 96ecc41de..83c7a9f8f 100644 --- a/src/ray/raylet/object_manager_integration_test.cc +++ b/src/ray/raylet/object_manager_integration_test.cc @@ -204,12 +204,14 @@ class TestObjectManagerIntegration : public TestObjectManagerBase { RAY_LOG(INFO) << "\n" << "All connected clients:" << "\n"; - const ClientTableDataT &data = gcs_client_2->client_table().GetClient(client_id_1); + ClientTableDataT data; + gcs_client_2->client_table().GetClient(client_id_1, data); RAY_LOG(INFO) << (ClientID::from_binary(data.client_id) == ClientID::nil()); RAY_LOG(INFO) << "ClientID=" << ClientID::from_binary(data.client_id); RAY_LOG(INFO) << "ClientIp=" << data.node_manager_address; RAY_LOG(INFO) << "ClientPort=" << data.node_manager_port; - const ClientTableDataT &data2 = gcs_client_1->client_table().GetClient(client_id_2); + ClientTableDataT data2; + gcs_client_1->client_table().GetClient(client_id_2, data2); RAY_LOG(INFO) << "ClientID=" << ClientID::from_binary(data2.client_id); RAY_LOG(INFO) << "ClientIp=" << data2.node_manager_address; RAY_LOG(INFO) << "ClientPort=" << data2.node_manager_port; diff --git a/src/ray/raylet/reconstruction_policy_test.cc b/src/ray/raylet/reconstruction_policy_test.cc index e103001cd..ec117cd1c 100644 --- a/src/ray/raylet/reconstruction_policy_test.cc +++ b/src/ray/raylet/reconstruction_policy_test.cc @@ -37,8 +37,8 @@ class MockObjectDirectory : public ObjectDirectoryInterface { } MOCK_METHOD0(RegisterBackend, void(void)); - MOCK_METHOD3(GetInformation, ray::Status(const ClientID &, const InfoSuccessCallback &, - const InfoFailureCallback &)); + MOCK_CONST_METHOD1(LookupRemoteConnectionInfo, void(RemoteConnectionInfo &)); + MOCK_CONST_METHOD0(LookupAllRemoteConnections, std::vector()); MOCK_METHOD3(SubscribeObjectLocations, ray::Status(const ray::UniqueID &, const ObjectID &, const OnLocationsFound &)); @@ -48,7 +48,6 @@ class MockObjectDirectory : public ObjectDirectoryInterface { ray::Status(const ObjectID &, const ClientID &, const object_manager::protocol::ObjectInfoT &)); MOCK_METHOD2(ReportObjectRemoved, ray::Status(const ObjectID &, const ClientID &)); - MOCK_METHOD1(RunFunctionForEachClient, void(const InfoSuccessCallback &success_cb)); private: std::vector> callbacks_;