diff --git a/src/ray/common/client_connection.cc b/src/ray/common/client_connection.cc index 817223e54..7f7e8a476 100644 --- a/src/ray/common/client_connection.cc +++ b/src/ray/common/client_connection.cc @@ -317,9 +317,9 @@ std::string ClientConnection::RemoteEndpointInfo() { } template <> -std::string ClientConnection::RemoteEndpointInfo() { +std::string ClientConnection::RemoteEndpointInfo() { const auto &remote_endpoint = - ServerConnection::socket_.remote_endpoint(); + ServerConnection::socket_.remote_endpoint(); return remote_endpoint.address().to_string() + ":" + std::to_string(remote_endpoint.port()); } @@ -361,9 +361,9 @@ std::string ServerConnection::DebugString() const { return result.str(); } -template class ServerConnection; -template class ServerConnection; -template class ClientConnection; -template class ClientConnection; +template class ServerConnection; +template class ServerConnection; +template class ClientConnection; +template class ClientConnection; } // namespace ray diff --git a/src/ray/common/client_connection.h b/src/ray/common/client_connection.h index e4e6d3c5f..4c9c6292a 100644 --- a/src/ray/common/client_connection.h +++ b/src/ray/common/client_connection.h @@ -213,10 +213,20 @@ class ClientConnection : public ServerConnection { std::vector read_message_; }; -using LocalServerConnection = ServerConnection; -using TcpServerConnection = ServerConnection; -using LocalClientConnection = ClientConnection; -using TcpClientConnection = ClientConnection; +typedef +#if defined(BOOST_ASIO_HAS_LOCAL_SOCKETS) + boost::asio::local::stream_protocol +#else + boost::asio::generic::stream_protocol +#endif + local_stream_protocol; + +typedef boost::asio::ip::tcp remote_stream_protocol; + +using LocalServerConnection = ServerConnection; +using TcpServerConnection = ServerConnection; +using LocalClientConnection = ClientConnection; +using TcpClientConnection = ClientConnection; } // namespace ray diff --git a/src/ray/object_manager/object_store_notification_manager.cc b/src/ray/object_manager/object_store_notification_manager.cc index 151c4f548..d740aeee6 100644 --- a/src/ray/object_manager/object_store_notification_manager.cc +++ b/src/ray/object_manager/object_store_notification_manager.cc @@ -24,7 +24,7 @@ ObjectStoreNotificationManager::ObjectStoreNotificationManager( RAY_ARROW_CHECK_OK(store_client_.Subscribe(&c_socket_)); boost::system::error_code ec; - socket_.assign(boost::asio::local::stream_protocol(), c_socket_, ec); + socket_.assign(local_stream_protocol(), c_socket_, ec); assert(!ec.value()); NotificationWait(); } diff --git a/src/ray/object_manager/object_store_notification_manager.h b/src/ray/object_manager/object_store_notification_manager.h index 912c5dc88..35ed0f1ee 100644 --- a/src/ray/object_manager/object_store_notification_manager.h +++ b/src/ray/object_manager/object_store_notification_manager.h @@ -11,6 +11,7 @@ #include "plasma/client.h" +#include "ray/common/client_connection.h" #include "ray/common/id.h" #include "ray/common/status.h" @@ -70,7 +71,7 @@ class ObjectStoreNotificationManager { int64_t num_adds_processed_; int64_t num_removes_processed_; std::vector notification_; - boost::asio::local::stream_protocol::socket socket_; + local_stream_protocol::socket socket_; }; } // namespace ray diff --git a/src/ray/raylet/client_connection_test.cc b/src/ray/raylet/client_connection_test.cc index d6359ae28..db6ca53e5 100644 --- a/src/ray/raylet/client_connection_test.cc +++ b/src/ray/raylet/client_connection_test.cc @@ -31,8 +31,8 @@ class ClientConnectionTest : public ::testing::Test { protected: boost::asio::io_service io_service_; - boost::asio::local::stream_protocol::socket in_; - boost::asio::local::stream_protocol::socket out_; + local_stream_protocol::socket in_; + local_stream_protocol::socket out_; int64_t error_message_type_; }; @@ -40,10 +40,10 @@ TEST_F(ClientConnectionTest, SimpleSyncWrite) { const uint8_t arr[5] = {1, 2, 3, 4, 5}; int num_messages = 0; - ClientHandler client_handler = + ClientHandler client_handler = [](LocalClientConnection &client) {}; - MessageHandler message_handler = + MessageHandler message_handler = [&arr, &num_messages](std::shared_ptr client, int64_t message_type, const uint8_t *message) { ASSERT_TRUE(!std::memcmp(arr, message, 5)); @@ -70,16 +70,16 @@ TEST_F(ClientConnectionTest, SimpleAsyncWrite) { const uint8_t msg3[5] = {8, 8, 8, 8, 8}; int num_messages = 0; - ClientHandler client_handler = + ClientHandler client_handler = [](LocalClientConnection &client) {}; - MessageHandler noop_handler = + MessageHandler noop_handler = [](std::shared_ptr client, int64_t message_type, const uint8_t *message) {}; std::shared_ptr reader = NULL; - MessageHandler message_handler = + MessageHandler message_handler = [&msg1, &msg2, &msg3, &num_messages, &reader]( std::shared_ptr client, int64_t message_type, const uint8_t *message) { @@ -117,10 +117,10 @@ TEST_F(ClientConnectionTest, SimpleAsyncWrite) { TEST_F(ClientConnectionTest, SimpleAsyncError) { const uint8_t msg1[5] = {1, 2, 3, 4, 5}; - ClientHandler client_handler = + ClientHandler client_handler = [](LocalClientConnection &client) {}; - MessageHandler noop_handler = + MessageHandler noop_handler = [](std::shared_ptr client, int64_t message_type, const uint8_t *message) {}; @@ -139,10 +139,10 @@ TEST_F(ClientConnectionTest, SimpleAsyncError) { TEST_F(ClientConnectionTest, CallbackWithSharedRefDoesNotLeakConnection) { const uint8_t msg1[5] = {1, 2, 3, 4, 5}; - ClientHandler client_handler = + ClientHandler client_handler = [](LocalClientConnection &client) {}; - MessageHandler noop_handler = + MessageHandler noop_handler = [](std::shared_ptr client, int64_t message_type, const uint8_t *message) {}; @@ -162,10 +162,10 @@ TEST_F(ClientConnectionTest, ProcessBadMessage) { const uint8_t arr[5] = {1, 2, 3, 4, 5}; int num_messages = 0; - ClientHandler client_handler = + ClientHandler client_handler = [](LocalClientConnection &client) {}; - MessageHandler message_handler = + MessageHandler message_handler = [&arr, &num_messages](std::shared_ptr client, int64_t message_type, const uint8_t *message) { ASSERT_TRUE(!std::memcmp(arr, message, 5)); diff --git a/src/ray/raylet/raylet.cc b/src/ray/raylet/raylet.cc index b3c8d8c37..831be4d7e 100644 --- a/src/ray/raylet/raylet.cc +++ b/src/ray/raylet/raylet.cc @@ -51,7 +51,7 @@ Raylet::Raylet(boost::asio::io_service &main_service, const std::string &socket_ node_manager_(main_service, node_manager_config, object_manager_, gcs_client_, object_directory_), socket_name_(socket_name), - acceptor_(main_service, boost::asio::local::stream_protocol::endpoint(socket_name)), + acceptor_(main_service, local_stream_protocol::endpoint(socket_name)), socket_(main_service) { // Start listening for clients. DoAccept(); @@ -118,9 +118,9 @@ void Raylet::DoAccept() { void Raylet::HandleAccept(const boost::system::error_code &error) { if (!error) { // TODO: typedef these handlers. - ClientHandler client_handler = + ClientHandler client_handler = [this](LocalClientConnection &client) { node_manager_.ProcessNewClient(client); }; - MessageHandler message_handler = + MessageHandler message_handler = [this](std::shared_ptr client, int64_t message_type, const uint8_t *message) { node_manager_.ProcessClientMessage(client, message_type, message); diff --git a/src/ray/raylet/raylet.h b/src/ray/raylet/raylet.h index ec7fe74cf..24c1c7746 100644 --- a/src/ray/raylet/raylet.h +++ b/src/ray/raylet/raylet.h @@ -76,9 +76,9 @@ class Raylet { std::string socket_name_; /// An acceptor for new clients. - boost::asio::local::stream_protocol::acceptor acceptor_; + local_stream_protocol::acceptor acceptor_; /// The socket to listen on for new clients. - boost::asio::local::stream_protocol::socket socket_; + local_stream_protocol::socket socket_; }; } // namespace raylet diff --git a/src/ray/raylet/worker_pool_test.cc b/src/ray/raylet/worker_pool_test.cc index c3f8702b9..aae9714ea 100644 --- a/src/ray/raylet/worker_pool_test.cc +++ b/src/ray/raylet/worker_pool_test.cc @@ -85,7 +85,7 @@ class WorkerPoolTest : public ::testing::Test { int64_t message_type, const uint8_t *message) { HandleMessage(client, message_type, message); }; - boost::asio::local::stream_protocol::socket socket(io_service_); + local_stream_protocol::socket socket(io_service_); auto client = LocalClientConnection::Create(client_handler, message_handler, std::move(socket), "worker", {}, error_message_type_);