mirror of
https://github.com/vale981/ray
synced 2025-03-06 18:41:40 -05:00
Refactor ObjectDirectory to reduce and fix callback usage (#3227)
This commit is contained in:
parent
344b4ef0ff
commit
ca585703b2
11 changed files with 179 additions and 190 deletions
|
@ -648,7 +648,8 @@ void ClientTableNotification(gcs::AsyncGcsClient *client, const ClientID &client
|
|||
ASSERT_EQ(ClientID::from_binary(data.client_id), added_id);
|
||||
ASSERT_EQ(data.is_insertion, is_insertion);
|
||||
|
||||
auto cached_client = client->client_table().GetClient(added_id);
|
||||
ClientTableDataT cached_client;
|
||||
client->client_table().GetClient(added_id, cached_client);
|
||||
ASSERT_EQ(ClientID::from_binary(cached_client.client_id), added_id);
|
||||
ASSERT_EQ(cached_client.is_insertion, is_insertion);
|
||||
}
|
||||
|
|
|
@ -421,15 +421,14 @@ ray::Status ClientTable::MarkDisconnected(const ClientID &dead_client_id) {
|
|||
return Append(JobID::nil(), client_log_key_, data, nullptr);
|
||||
}
|
||||
|
||||
const ClientTableDataT &ClientTable::GetClient(const ClientID &client_id) const {
|
||||
void ClientTable::GetClient(const ClientID &client_id,
|
||||
ClientTableDataT &client_info) const {
|
||||
RAY_CHECK(!client_id.is_nil());
|
||||
auto entry = client_cache_.find(client_id);
|
||||
if (entry != client_cache_.end()) {
|
||||
return entry->second;
|
||||
client_info = entry->second;
|
||||
} else {
|
||||
// If the requested client was not found, return a reference to the nil
|
||||
// client entry.
|
||||
return client_cache_.at(ClientID::nil());
|
||||
client_info.client_id = ClientID::nil().binary();
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -517,12 +517,6 @@ class ClientTable : private Log<UniqueID, ClientTableData> {
|
|||
|
||||
// Set the local client's ID.
|
||||
local_client_.client_id = client_id.binary();
|
||||
|
||||
// Add a nil client to the cache so that we can serve requests for clients
|
||||
// that we have not heard about.
|
||||
ClientTableDataT nil_client;
|
||||
nil_client.client_id = ClientID::nil().binary();
|
||||
client_cache_[ClientID::nil()] = nil_client;
|
||||
};
|
||||
|
||||
/// Connect as a client to the GCS. This registers us in the client table
|
||||
|
@ -560,9 +554,11 @@ class ClientTable : private Log<UniqueID, ClientTableData> {
|
|||
/// information for clients that we've heard a notification for.
|
||||
///
|
||||
/// \param client The client to get information about.
|
||||
/// \return A reference to the requested client. If the client is not in the
|
||||
/// cache, then an entry with a nil ClientID will be returned.
|
||||
const ClientTableDataT &GetClient(const ClientID &client) const;
|
||||
/// \param A reference to the client information. If we have information
|
||||
/// about the client in the cache, then the reference will be modified to
|
||||
/// contain that information. Else, the reference will be updated to contain
|
||||
/// a nil client ID.
|
||||
void GetClient(const ClientID &client, ClientTableDataT &client_info) const;
|
||||
|
||||
/// Get the local client's ID.
|
||||
///
|
||||
|
|
|
@ -2,8 +2,9 @@
|
|||
|
||||
namespace ray {
|
||||
|
||||
ObjectDirectory::ObjectDirectory(std::shared_ptr<gcs::AsyncGcsClient> &gcs_client)
|
||||
: gcs_client_(gcs_client) {}
|
||||
ObjectDirectory::ObjectDirectory(boost::asio::io_service &io_service,
|
||||
std::shared_ptr<gcs::AsyncGcsClient> &gcs_client)
|
||||
: io_service_(io_service), gcs_client_(gcs_client) {}
|
||||
|
||||
namespace {
|
||||
|
||||
|
@ -61,6 +62,8 @@ void ObjectDirectory::RegisterBackend() {
|
|||
// empty, since this may indicate that the objects have been evicted from
|
||||
// all nodes.
|
||||
for (const auto &callback_pair : callbacks) {
|
||||
// It is safe to call the callback directly since this is already running
|
||||
// in the subscription callback stack.
|
||||
callback_pair.second(client_id_vec, object_id);
|
||||
}
|
||||
};
|
||||
|
@ -102,40 +105,34 @@ ray::Status ObjectDirectory::ReportObjectRemoved(const ObjectID &object_id,
|
|||
return status;
|
||||
};
|
||||
|
||||
ray::Status ObjectDirectory::GetInformation(const ClientID &client_id,
|
||||
const InfoSuccessCallback &success_callback,
|
||||
const InfoFailureCallback &fail_callback) {
|
||||
const ClientTableDataT &data = gcs_client_->client_table().GetClient(client_id);
|
||||
ClientID result_client_id = ClientID::from_binary(data.client_id);
|
||||
if (result_client_id == ClientID::nil() || !data.is_insertion) {
|
||||
fail_callback();
|
||||
} else {
|
||||
const auto &info =
|
||||
RemoteConnectionInfo(client_id, data.node_manager_address,
|
||||
static_cast<uint16_t>(data.object_manager_port));
|
||||
success_callback(info);
|
||||
}
|
||||
return ray::Status::OK();
|
||||
}
|
||||
|
||||
void ObjectDirectory::RunFunctionForEachClient(
|
||||
const InfoSuccessCallback &client_function) {
|
||||
const auto &clients = gcs_client_->client_table().GetAllClients();
|
||||
for (const auto &client_pair : clients) {
|
||||
const ClientTableDataT &data = client_pair.second;
|
||||
if (client_pair.first == ClientID::nil() ||
|
||||
client_pair.first == gcs_client_->client_table().GetLocalClientId() ||
|
||||
!data.is_insertion) {
|
||||
continue;
|
||||
} else {
|
||||
const auto &info =
|
||||
RemoteConnectionInfo(client_pair.first, data.node_manager_address,
|
||||
static_cast<uint16_t>(data.object_manager_port));
|
||||
client_function(info);
|
||||
void ObjectDirectory::LookupRemoteConnectionInfo(
|
||||
RemoteConnectionInfo &connection_info) const {
|
||||
ClientTableDataT client_data;
|
||||
gcs_client_->client_table().GetClient(connection_info.client_id, client_data);
|
||||
ClientID result_client_id = ClientID::from_binary(client_data.client_id);
|
||||
if (!result_client_id.is_nil()) {
|
||||
RAY_CHECK(result_client_id == connection_info.client_id);
|
||||
if (client_data.is_insertion) {
|
||||
connection_info.ip = client_data.node_manager_address;
|
||||
connection_info.port = static_cast<uint16_t>(client_data.object_manager_port);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
std::vector<RemoteConnectionInfo> ObjectDirectory::LookupAllRemoteConnections() const {
|
||||
std::vector<RemoteConnectionInfo> remote_connections;
|
||||
const auto &clients = gcs_client_->client_table().GetAllClients();
|
||||
for (const auto &client_pair : clients) {
|
||||
RemoteConnectionInfo info(client_pair.first);
|
||||
LookupRemoteConnectionInfo(info);
|
||||
if (info.Connected() &&
|
||||
info.client_id != gcs_client_->client_table().GetLocalClientId()) {
|
||||
remote_connections.push_back(info);
|
||||
}
|
||||
}
|
||||
return remote_connections;
|
||||
}
|
||||
|
||||
ray::Status ObjectDirectory::SubscribeObjectLocations(const UniqueID &callback_id,
|
||||
const ObjectID &object_id,
|
||||
const OnLocationsFound &callback) {
|
||||
|
@ -156,7 +153,8 @@ ray::Status ObjectDirectory::SubscribeObjectLocations(const UniqueID &callback_i
|
|||
// have been evicted from all nodes.
|
||||
std::vector<ClientID> client_id_vec(listener_state.current_object_locations.begin(),
|
||||
listener_state.current_object_locations.end());
|
||||
callback(client_id_vec, object_id);
|
||||
io_service_.post(
|
||||
[callback, client_id_vec, object_id]() { callback(client_id_vec, object_id); });
|
||||
return status;
|
||||
}
|
||||
|
||||
|
@ -187,6 +185,8 @@ ray::Status ObjectDirectory::LookupLocations(const ObjectID &object_id,
|
|||
std::unordered_set<ClientID> client_ids;
|
||||
std::vector<ClientID> locations_vector = UpdateObjectLocations(
|
||||
client_ids, location_history, gcs_client_->client_table());
|
||||
// It is safe to call the callback directly since this is already running
|
||||
// in the GCS client's lookup callback stack.
|
||||
callback(locations_vector, object_id);
|
||||
});
|
||||
return status;
|
||||
|
|
|
@ -16,10 +16,12 @@ namespace ray {
|
|||
|
||||
/// Connection information for remote object managers.
|
||||
struct RemoteConnectionInfo {
|
||||
RemoteConnectionInfo() = default;
|
||||
RemoteConnectionInfo(const ClientID &id, const std::string &ip_address,
|
||||
uint16_t port_num)
|
||||
: client_id(id), ip(ip_address), port(port_num) {}
|
||||
RemoteConnectionInfo(const ClientID &id) : client_id(id) {}
|
||||
|
||||
// Returns whether there is enough information to connect to the remote
|
||||
// object manager.
|
||||
bool Connected() const { return !ip.empty(); }
|
||||
|
||||
ClientID client_id;
|
||||
std::string ip;
|
||||
uint16_t port;
|
||||
|
@ -27,24 +29,23 @@ struct RemoteConnectionInfo {
|
|||
|
||||
class ObjectDirectoryInterface {
|
||||
public:
|
||||
ObjectDirectoryInterface() = default;
|
||||
virtual ~ObjectDirectoryInterface() = default;
|
||||
|
||||
/// Callbacks for GetInformation.
|
||||
using InfoSuccessCallback = std::function<void(const ray::RemoteConnectionInfo &info)>;
|
||||
using InfoFailureCallback = std::function<void()>;
|
||||
virtual ~ObjectDirectoryInterface() {}
|
||||
|
||||
virtual void RegisterBackend() = 0;
|
||||
|
||||
/// This is used to establish object manager client connections.
|
||||
/// Lookup how to connect to a remote object manager.
|
||||
///
|
||||
/// \param client_id The client for which information is required.
|
||||
/// \param success_cb A callback which handles the success of this method.
|
||||
/// \param fail_cb A callback which handles the failure of this method.
|
||||
/// \return Status of whether this asynchronous request succeeded.
|
||||
virtual ray::Status GetInformation(const ClientID &client_id,
|
||||
const InfoSuccessCallback &success_cb,
|
||||
const InfoFailureCallback &fail_cb) = 0;
|
||||
/// \param connection_info The connection information to fill out. This
|
||||
/// should be pre-populated with the requested client ID. If the directory
|
||||
/// has information about the requested client, then the rest of the fields
|
||||
/// in this struct will be populated accordingly.
|
||||
virtual void LookupRemoteConnectionInfo(
|
||||
RemoteConnectionInfo &connection_info) const = 0;
|
||||
|
||||
/// Get information for all connected remote object managers.
|
||||
///
|
||||
/// \return A vector of information for all connected remote object managers.
|
||||
virtual std::vector<RemoteConnectionInfo> LookupAllRemoteConnections() const = 0;
|
||||
|
||||
/// Callback for object location notifications.
|
||||
using OnLocationsFound = std::function<void(const std::vector<ray::ClientID> &,
|
||||
|
@ -102,28 +103,27 @@ class ObjectDirectoryInterface {
|
|||
/// \return Status of whether this method succeeded.
|
||||
virtual ray::Status ReportObjectRemoved(const ObjectID &object_id,
|
||||
const ClientID &client_id) = 0;
|
||||
|
||||
/// Go through all the client information.
|
||||
///
|
||||
/// \param success_cb A callback which handles the success of this method.
|
||||
/// This function will be called multiple times.
|
||||
/// \return Void.
|
||||
virtual void RunFunctionForEachClient(const InfoSuccessCallback &client_function) = 0;
|
||||
};
|
||||
|
||||
/// Ray ObjectDirectory declaration.
|
||||
class ObjectDirectory : public ObjectDirectoryInterface {
|
||||
public:
|
||||
ObjectDirectory() = default;
|
||||
~ObjectDirectory() override = default;
|
||||
/// Create an object directory.
|
||||
///
|
||||
/// \param io_service The event loop to dispatch callbacks to. This should
|
||||
/// usually be the same event loop that the given gcs_client runs on.
|
||||
/// \param gcs_client A Ray GCS client to request object and client
|
||||
/// information from.
|
||||
ObjectDirectory(boost::asio::io_service &io_service,
|
||||
std::shared_ptr<gcs::AsyncGcsClient> &gcs_client);
|
||||
|
||||
virtual ~ObjectDirectory() {}
|
||||
|
||||
void RegisterBackend() override;
|
||||
|
||||
ray::Status GetInformation(const ClientID &client_id,
|
||||
const InfoSuccessCallback &success_callback,
|
||||
const InfoFailureCallback &fail_callback) override;
|
||||
void LookupRemoteConnectionInfo(RemoteConnectionInfo &connection_info) const override;
|
||||
|
||||
void RunFunctionForEachClient(const InfoSuccessCallback &client_function) override;
|
||||
std::vector<RemoteConnectionInfo> LookupAllRemoteConnections() const override;
|
||||
|
||||
ray::Status LookupLocations(const ObjectID &object_id,
|
||||
const OnLocationsFound &callback) override;
|
||||
|
@ -139,8 +139,6 @@ class ObjectDirectory : public ObjectDirectoryInterface {
|
|||
const object_manager::protocol::ObjectInfoT &object_info) override;
|
||||
ray::Status ReportObjectRemoved(const ObjectID &object_id,
|
||||
const ClientID &client_id) override;
|
||||
/// Ray only (not part of the OD interface).
|
||||
ObjectDirectory(std::shared_ptr<gcs::AsyncGcsClient> &gcs_client);
|
||||
|
||||
/// ObjectDirectory should not be copied.
|
||||
RAY_DISALLOW_COPY_AND_ASSIGN(ObjectDirectory);
|
||||
|
@ -154,6 +152,8 @@ class ObjectDirectory : public ObjectDirectoryInterface {
|
|||
std::unordered_set<ClientID> current_object_locations;
|
||||
};
|
||||
|
||||
/// Reference to the event loop.
|
||||
boost::asio::io_service &io_service_;
|
||||
/// Reference to the gcs client.
|
||||
std::shared_ptr<gcs::AsyncGcsClient> gcs_client_;
|
||||
/// Info about subscribers to object locations.
|
||||
|
|
|
@ -23,7 +23,7 @@ ObjectManager::ObjectManager(asio::io_service &main_service,
|
|||
// TODO(hme): Eliminate knowledge of GCS.
|
||||
: client_id_(gcs_client->client_table().GetLocalClientId()),
|
||||
config_(config),
|
||||
object_directory_(new ObjectDirectory(gcs_client)),
|
||||
object_directory_(new ObjectDirectory(main_service, gcs_client)),
|
||||
store_notification_(main_service, config_.store_socket_name),
|
||||
// release_delay of 2 * config_.max_sends is to ensure the pool does not release
|
||||
// an object prematurely whenever we reach the maximum number of sends.
|
||||
|
@ -263,23 +263,20 @@ void ObjectManager::PullEstablishConnection(const ObjectID &object_id,
|
|||
// TODO(hme): There is no cap on the number of pull request connections.
|
||||
connection_pool_.GetSender(ConnectionPool::ConnectionType::MESSAGE, client_id, &conn);
|
||||
|
||||
// Try to create a new connection to the remote object manager if one doesn't
|
||||
// already exist.
|
||||
if (conn == nullptr) {
|
||||
status = object_directory_->GetInformation(
|
||||
client_id,
|
||||
[this, object_id, client_id](const RemoteConnectionInfo &connection_info) {
|
||||
std::shared_ptr<SenderConnection> async_conn = CreateSenderConnection(
|
||||
ConnectionPool::ConnectionType::MESSAGE, connection_info);
|
||||
if (async_conn == nullptr) {
|
||||
return;
|
||||
}
|
||||
connection_pool_.RegisterSender(ConnectionPool::ConnectionType::MESSAGE,
|
||||
client_id, async_conn);
|
||||
PullSendRequest(object_id, async_conn);
|
||||
},
|
||||
[]() {
|
||||
RAY_LOG(ERROR) << "Failed to establish connection with remote object manager.";
|
||||
});
|
||||
} else {
|
||||
RemoteConnectionInfo connection_info(client_id);
|
||||
object_directory_->LookupRemoteConnectionInfo(connection_info);
|
||||
if (connection_info.Connected()) {
|
||||
conn = CreateSenderConnection(ConnectionPool::ConnectionType::MESSAGE,
|
||||
connection_info);
|
||||
} else {
|
||||
RAY_LOG(ERROR) << "Failed to establish connection with remote object manager.";
|
||||
}
|
||||
}
|
||||
|
||||
if (conn != nullptr) {
|
||||
PullSendRequest(object_id, conn);
|
||||
}
|
||||
}
|
||||
|
@ -347,34 +344,30 @@ void ObjectManager::Push(const ObjectID &object_id, const ClientID &client_id) {
|
|||
return;
|
||||
}
|
||||
|
||||
// TODO(hme): Cache this data in ObjectDirectory.
|
||||
// Okay for now since the GCS client caches this data.
|
||||
RAY_CHECK_OK(object_directory_->GetInformation(
|
||||
client_id,
|
||||
[this, object_id, client_id](const RemoteConnectionInfo &info) {
|
||||
const object_manager::protocol::ObjectInfoT &object_info =
|
||||
local_objects_[object_id];
|
||||
uint64_t data_size =
|
||||
static_cast<uint64_t>(object_info.data_size + object_info.metadata_size);
|
||||
uint64_t metadata_size = static_cast<uint64_t>(object_info.metadata_size);
|
||||
uint64_t num_chunks = buffer_pool_.GetNumChunks(data_size);
|
||||
for (uint64_t chunk_index = 0; chunk_index < num_chunks; ++chunk_index) {
|
||||
send_service_.post([this, client_id, object_id, data_size, metadata_size,
|
||||
chunk_index, info]() {
|
||||
// NOTE: When this callback executes, it's possible that the object
|
||||
// will have already been evicted. It's also possible that the
|
||||
// object could be in the process of being transferred to this
|
||||
// object manager from another object manager.
|
||||
ExecuteSendObject(client_id, object_id, data_size, metadata_size, chunk_index,
|
||||
info);
|
||||
});
|
||||
}
|
||||
},
|
||||
[]() {
|
||||
// Push is best effort, so do nothing here.
|
||||
RAY_LOG(ERROR)
|
||||
<< "Failed to establish connection for Push with remote object manager.";
|
||||
}));
|
||||
RemoteConnectionInfo connection_info(client_id);
|
||||
object_directory_->LookupRemoteConnectionInfo(connection_info);
|
||||
if (connection_info.Connected()) {
|
||||
const object_manager::protocol::ObjectInfoT &object_info = local_objects_[object_id];
|
||||
uint64_t data_size =
|
||||
static_cast<uint64_t>(object_info.data_size + object_info.metadata_size);
|
||||
uint64_t metadata_size = static_cast<uint64_t>(object_info.metadata_size);
|
||||
uint64_t num_chunks = buffer_pool_.GetNumChunks(data_size);
|
||||
for (uint64_t chunk_index = 0; chunk_index < num_chunks; ++chunk_index) {
|
||||
send_service_.post([this, client_id, object_id, data_size, metadata_size,
|
||||
chunk_index, connection_info]() {
|
||||
// NOTE: When this callback executes, it's possible that the object
|
||||
// will have already been evicted. It's also possible that the
|
||||
// object could be in the process of being transferred to this
|
||||
// object manager from another object manager.
|
||||
ExecuteSendObject(client_id, object_id, data_size, metadata_size, chunk_index,
|
||||
connection_info);
|
||||
});
|
||||
}
|
||||
} else {
|
||||
// Push is best effort, so do nothing here.
|
||||
RAY_LOG(ERROR)
|
||||
<< "Failed to establish connection for Push with remote object manager.";
|
||||
}
|
||||
}
|
||||
|
||||
void ObjectManager::ExecuteSendObject(const ClientID &client_id,
|
||||
|
@ -389,15 +382,13 @@ void ObjectManager::ExecuteSendObject(const ClientID &client_id,
|
|||
if (conn == nullptr) {
|
||||
conn =
|
||||
CreateSenderConnection(ConnectionPool::ConnectionType::TRANSFER, connection_info);
|
||||
connection_pool_.RegisterSender(ConnectionPool::ConnectionType::TRANSFER, client_id,
|
||||
conn);
|
||||
if (conn == nullptr) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
status = SendObjectHeaders(object_id, data_size, metadata_size, chunk_index, conn);
|
||||
if (!status.ok()) {
|
||||
CheckIOError(status, "Push");
|
||||
|
||||
if (conn != nullptr) {
|
||||
status = SendObjectHeaders(object_id, data_size, metadata_size, chunk_index, conn);
|
||||
if (!status.ok()) {
|
||||
CheckIOError(status, "Push");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -545,23 +536,13 @@ void ObjectManager::SubscribeRemainingWaitObjects(const UniqueID &wait_id) {
|
|||
wait_state.timeout_ms == 0) {
|
||||
// Requirements already satisfied.
|
||||
WaitComplete(wait_id);
|
||||
} else {
|
||||
// Wait may complete during the execution of any one of the following calls to
|
||||
// SubscribeObjectLocations, so copy the object ids that need to be iterated over.
|
||||
// Order matters for test purposes.
|
||||
std::vector<ObjectID> ordered_remaining_object_ids;
|
||||
for (const auto &object_id : wait_state.object_id_order) {
|
||||
if (wait_state.remaining.count(object_id) > 0) {
|
||||
ordered_remaining_object_ids.push_back(object_id);
|
||||
}
|
||||
}
|
||||
for (const auto &object_id : ordered_remaining_object_ids) {
|
||||
if (active_wait_requests_.find(wait_id) == active_wait_requests_.end()) {
|
||||
// This is possible if an object's location is obtained immediately,
|
||||
// within the current callstack. In this case, WaitComplete has been
|
||||
// invoked already, so we're done.
|
||||
return;
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
// There are objects remaining whose locations we don't know. Request their
|
||||
// locations from the object directory.
|
||||
for (const auto &object_id : wait_state.object_id_order) {
|
||||
if (wait_state.remaining.count(object_id) > 0) {
|
||||
wait_state.requested_objects.insert(object_id);
|
||||
// Subscribe to object notifications.
|
||||
RAY_CHECK_OK(object_directory_->SubscribeObjectLocations(
|
||||
|
@ -584,6 +565,10 @@ void ObjectManager::SubscribeRemainingWaitObjects(const UniqueID &wait_id) {
|
|||
}
|
||||
}));
|
||||
}
|
||||
|
||||
// If a timeout was provided, then set a timer. If we don't find locations
|
||||
// for enough objects by the time the timer expires, then we will return
|
||||
// from the Wait.
|
||||
if (wait_state.timeout_ms != -1) {
|
||||
auto timeout = boost::posix_time::milliseconds(wait_state.timeout_ms);
|
||||
wait_state.timeout_timer->expires_from_now(timeout);
|
||||
|
@ -643,19 +628,23 @@ std::shared_ptr<SenderConnection> ObjectManager::CreateSenderConnection(
|
|||
SenderConnection::Create(*main_service_, info.client_id, info.ip, info.port);
|
||||
if (conn == nullptr) {
|
||||
RAY_LOG(ERROR) << "Failed to connect to remote object manager.";
|
||||
return conn;
|
||||
} else {
|
||||
// Register the new connection.
|
||||
// TODO(Yuhong): Implement ConnectionPool::RemoveSender and call it if the client
|
||||
// disconnects.
|
||||
connection_pool_.RegisterSender(type, info.client_id, conn);
|
||||
// Prepare client connection info buffer
|
||||
flatbuffers::FlatBufferBuilder fbb;
|
||||
bool is_transfer = (type == ConnectionPool::ConnectionType::TRANSFER);
|
||||
auto message = object_manager_protocol::CreateConnectClientMessage(
|
||||
fbb, to_flatbuf(fbb, client_id_), is_transfer);
|
||||
fbb.Finish(message);
|
||||
// Send synchronously.
|
||||
// TODO(swang): Make this a WriteMessageAsync.
|
||||
RAY_CHECK_OK(conn->WriteMessage(
|
||||
static_cast<int64_t>(object_manager_protocol::MessageType::ConnectClient),
|
||||
fbb.GetSize(), fbb.GetBufferPointer()));
|
||||
}
|
||||
// Prepare client connection info buffer
|
||||
flatbuffers::FlatBufferBuilder fbb;
|
||||
bool is_transfer = (type == ConnectionPool::ConnectionType::TRANSFER);
|
||||
auto message = object_manager_protocol::CreateConnectClientMessage(
|
||||
fbb, fbb.CreateString(client_id_.binary()), is_transfer);
|
||||
fbb.Finish(message);
|
||||
// Send synchronously.
|
||||
RAY_CHECK_OK(conn->WriteMessage(
|
||||
static_cast<int64_t>(object_manager_protocol::MessageType::ConnectClient),
|
||||
fbb.GetSize(), fbb.GetBufferPointer()));
|
||||
// The connection is ready; return to caller.
|
||||
return conn;
|
||||
}
|
||||
|
||||
|
@ -808,25 +797,27 @@ void ObjectManager::SpreadFreeObjectRequest(const std::vector<ObjectID> &object_
|
|||
flatbuffers::Offset<object_manager_protocol::FreeRequestMessage> request =
|
||||
object_manager_protocol::CreateFreeRequestMessage(fbb, to_flatbuf(fbb, object_ids));
|
||||
fbb.Finish(request);
|
||||
auto function_on_client = [this, &fbb](const RemoteConnectionInfo &connection_info) {
|
||||
|
||||
const auto remote_connections = object_directory_->LookupAllRemoteConnections();
|
||||
for (const auto &connection_info : remote_connections) {
|
||||
std::shared_ptr<SenderConnection> conn;
|
||||
connection_pool_.GetSender(ConnectionPool::ConnectionType::MESSAGE,
|
||||
connection_info.client_id, &conn);
|
||||
if (conn == nullptr) {
|
||||
conn = CreateSenderConnection(ConnectionPool::ConnectionType::MESSAGE,
|
||||
connection_info);
|
||||
connection_pool_.RegisterSender(ConnectionPool::ConnectionType::MESSAGE,
|
||||
connection_info.client_id, conn);
|
||||
}
|
||||
ray::Status status = conn->WriteMessage(
|
||||
static_cast<int64_t>(object_manager_protocol::MessageType::FreeRequest),
|
||||
fbb.GetSize(), fbb.GetBufferPointer());
|
||||
if (status.ok()) {
|
||||
connection_pool_.ReleaseSender(ConnectionPool::ConnectionType::MESSAGE, conn);
|
||||
|
||||
if (conn != nullptr) {
|
||||
// TODO(swang): Make this a WriteMessageAsync.
|
||||
ray::Status status = conn->WriteMessage(
|
||||
static_cast<int64_t>(object_manager_protocol::MessageType::FreeRequest),
|
||||
fbb.GetSize(), fbb.GetBufferPointer());
|
||||
if (status.ok()) {
|
||||
connection_pool_.ReleaseSender(ConnectionPool::ConnectionType::MESSAGE, conn);
|
||||
}
|
||||
}
|
||||
// TODO(Yuhong): Implement ConnectionPool::RemoveSender and call it in "else".
|
||||
};
|
||||
object_directory_->RunFunctionForEachClient(function_on_client);
|
||||
}
|
||||
}
|
||||
|
||||
} // namespace ray
|
||||
|
|
|
@ -433,11 +433,13 @@ class StressTestObjectManager : public TestObjectManagerBase {
|
|||
RAY_LOG(DEBUG) << "\n"
|
||||
<< "All connected clients:"
|
||||
<< "\n";
|
||||
const ClientTableDataT &data = gcs_client_1->client_table().GetClient(client_id_1);
|
||||
ClientTableDataT data;
|
||||
gcs_client_1->client_table().GetClient(client_id_1, data);
|
||||
RAY_LOG(DEBUG) << "ClientID=" << ClientID::from_binary(data.client_id) << "\n"
|
||||
<< "ClientIp=" << data.node_manager_address << "\n"
|
||||
<< "ClientPort=" << data.node_manager_port;
|
||||
const ClientTableDataT &data2 = gcs_client_1->client_table().GetClient(client_id_2);
|
||||
ClientTableDataT data2;
|
||||
gcs_client_1->client_table().GetClient(client_id_2, data2);
|
||||
RAY_LOG(DEBUG) << "ClientID=" << ClientID::from_binary(data2.client_id) << "\n"
|
||||
<< "ClientIp=" << data2.node_manager_address << "\n"
|
||||
<< "ClientPort=" << data2.node_manager_port;
|
||||
|
|
|
@ -447,13 +447,15 @@ class TestObjectManager : public TestObjectManagerBase {
|
|||
RAY_LOG(DEBUG) << "\n"
|
||||
<< "Server client ids:"
|
||||
<< "\n";
|
||||
const ClientTableDataT &data = gcs_client_1->client_table().GetClient(client_id_1);
|
||||
ClientTableDataT data;
|
||||
gcs_client_1->client_table().GetClient(client_id_1, data);
|
||||
RAY_LOG(DEBUG) << (ClientID::from_binary(data.client_id) == ClientID::nil());
|
||||
RAY_LOG(DEBUG) << "Server 1 ClientID=" << ClientID::from_binary(data.client_id);
|
||||
RAY_LOG(DEBUG) << "Server 1 ClientIp=" << data.node_manager_address;
|
||||
RAY_LOG(DEBUG) << "Server 1 ClientPort=" << data.node_manager_port;
|
||||
ASSERT_EQ(client_id_1, ClientID::from_binary(data.client_id));
|
||||
const ClientTableDataT &data2 = gcs_client_1->client_table().GetClient(client_id_2);
|
||||
ClientTableDataT data2;
|
||||
gcs_client_1->client_table().GetClient(client_id_2, data2);
|
||||
RAY_LOG(DEBUG) << "Server 2 ClientID=" << ClientID::from_binary(data2.client_id);
|
||||
RAY_LOG(DEBUG) << "Server 2 ClientIp=" << data2.node_manager_address;
|
||||
RAY_LOG(DEBUG) << "Server 2 ClientPort=" << data2.node_manager_port;
|
||||
|
|
|
@ -61,7 +61,7 @@ NodeManager::NodeManager(boost::asio::io_service &io_service,
|
|||
[this](const TaskID &task_id) { HandleTaskReconstruction(task_id); },
|
||||
RayConfig::instance().initial_reconstruction_timeout_milliseconds(),
|
||||
gcs_client_->client_table().GetLocalClientId(), gcs_client->task_lease_table(),
|
||||
std::make_shared<ObjectDirectory>(gcs_client),
|
||||
std::make_shared<ObjectDirectory>(io_service, gcs_client),
|
||||
gcs_client_->task_reconstruction_log()),
|
||||
task_dependency_manager_(
|
||||
object_manager, reconstruction_policy_, io_service,
|
||||
|
@ -304,14 +304,13 @@ void NodeManager::ClientAdded(const ClientTableDataT &client_data) {
|
|||
}
|
||||
|
||||
// Establish a new NodeManager connection to this GCS client.
|
||||
auto client_info = gcs_client_->client_table().GetClient(client_id);
|
||||
RAY_LOG(DEBUG) << "[ClientAdded] Trying to connect to client " << client_id << " at "
|
||||
<< client_info.node_manager_address << ":"
|
||||
<< client_info.node_manager_port;
|
||||
<< client_data.node_manager_address << ":"
|
||||
<< client_data.node_manager_port;
|
||||
|
||||
boost::asio::ip::tcp::socket socket(io_service_);
|
||||
auto status =
|
||||
TcpConnect(socket, client_info.node_manager_address, client_info.node_manager_port);
|
||||
TcpConnect(socket, client_data.node_manager_address, client_data.node_manager_port);
|
||||
// A disconnected client has 2 entries in the client table (one for being
|
||||
// inserted and one for being removed). When a new raylet starts, ClientAdded
|
||||
// will be called with the disconnected client's first entry, which will cause
|
||||
|
@ -1556,8 +1555,6 @@ void NodeManager::ForwardTask(const Task &task, const ClientID &node_id,
|
|||
RAY_LOG(DEBUG) << "Forwarding task " << task_id << " to " << node_id << " spillback="
|
||||
<< lineage_cache_entry_task.GetTaskExecutionSpec().NumForwards();
|
||||
|
||||
auto client_info = gcs_client_->client_table().GetClient(node_id);
|
||||
|
||||
// Lookup remote server connection for this node_id and use it to send the request.
|
||||
auto it = remote_server_connections_.find(node_id);
|
||||
if (it == remote_server_connections_.end()) {
|
||||
|
|
|
@ -204,12 +204,14 @@ class TestObjectManagerIntegration : public TestObjectManagerBase {
|
|||
RAY_LOG(INFO) << "\n"
|
||||
<< "All connected clients:"
|
||||
<< "\n";
|
||||
const ClientTableDataT &data = gcs_client_2->client_table().GetClient(client_id_1);
|
||||
ClientTableDataT data;
|
||||
gcs_client_2->client_table().GetClient(client_id_1, data);
|
||||
RAY_LOG(INFO) << (ClientID::from_binary(data.client_id) == ClientID::nil());
|
||||
RAY_LOG(INFO) << "ClientID=" << ClientID::from_binary(data.client_id);
|
||||
RAY_LOG(INFO) << "ClientIp=" << data.node_manager_address;
|
||||
RAY_LOG(INFO) << "ClientPort=" << data.node_manager_port;
|
||||
const ClientTableDataT &data2 = gcs_client_1->client_table().GetClient(client_id_2);
|
||||
ClientTableDataT data2;
|
||||
gcs_client_1->client_table().GetClient(client_id_2, data2);
|
||||
RAY_LOG(INFO) << "ClientID=" << ClientID::from_binary(data2.client_id);
|
||||
RAY_LOG(INFO) << "ClientIp=" << data2.node_manager_address;
|
||||
RAY_LOG(INFO) << "ClientPort=" << data2.node_manager_port;
|
||||
|
|
|
@ -37,8 +37,8 @@ class MockObjectDirectory : public ObjectDirectoryInterface {
|
|||
}
|
||||
|
||||
MOCK_METHOD0(RegisterBackend, void(void));
|
||||
MOCK_METHOD3(GetInformation, ray::Status(const ClientID &, const InfoSuccessCallback &,
|
||||
const InfoFailureCallback &));
|
||||
MOCK_CONST_METHOD1(LookupRemoteConnectionInfo, void(RemoteConnectionInfo &));
|
||||
MOCK_CONST_METHOD0(LookupAllRemoteConnections, std::vector<RemoteConnectionInfo>());
|
||||
MOCK_METHOD3(SubscribeObjectLocations,
|
||||
ray::Status(const ray::UniqueID &, const ObjectID &,
|
||||
const OnLocationsFound &));
|
||||
|
@ -48,7 +48,6 @@ class MockObjectDirectory : public ObjectDirectoryInterface {
|
|||
ray::Status(const ObjectID &, const ClientID &,
|
||||
const object_manager::protocol::ObjectInfoT &));
|
||||
MOCK_METHOD2(ReportObjectRemoved, ray::Status(const ObjectID &, const ClientID &));
|
||||
MOCK_METHOD1(RunFunctionForEachClient, void(const InfoSuccessCallback &success_cb));
|
||||
|
||||
private:
|
||||
std::vector<std::pair<ObjectID, OnLocationsFound>> callbacks_;
|
||||
|
|
Loading…
Add table
Reference in a new issue