Factor out stream protocols for Windows (#6408)

This commit is contained in:
mehrdadn 2019-12-10 15:23:30 -08:00 committed by Philipp Moritz
parent 4e1d1ed00d
commit 2f19b01898
8 changed files with 42 additions and 31 deletions

View file

@ -317,9 +317,9 @@ std::string ClientConnection<T>::RemoteEndpointInfo() {
} }
template <> template <>
std::string ClientConnection<boost::asio::ip::tcp>::RemoteEndpointInfo() { std::string ClientConnection<remote_stream_protocol>::RemoteEndpointInfo() {
const auto &remote_endpoint = const auto &remote_endpoint =
ServerConnection<boost::asio::ip::tcp>::socket_.remote_endpoint(); ServerConnection<remote_stream_protocol>::socket_.remote_endpoint();
return remote_endpoint.address().to_string() + ":" + return remote_endpoint.address().to_string() + ":" +
std::to_string(remote_endpoint.port()); std::to_string(remote_endpoint.port());
} }
@ -361,9 +361,9 @@ std::string ServerConnection<T>::DebugString() const {
return result.str(); return result.str();
} }
template class ServerConnection<boost::asio::local::stream_protocol>; template class ServerConnection<local_stream_protocol>;
template class ServerConnection<boost::asio::ip::tcp>; template class ServerConnection<remote_stream_protocol>;
template class ClientConnection<boost::asio::local::stream_protocol>; template class ClientConnection<local_stream_protocol>;
template class ClientConnection<boost::asio::ip::tcp>; template class ClientConnection<remote_stream_protocol>;
} // namespace ray } // namespace ray

View file

@ -213,10 +213,20 @@ class ClientConnection : public ServerConnection<T> {
std::vector<uint8_t> read_message_; std::vector<uint8_t> read_message_;
}; };
using LocalServerConnection = ServerConnection<boost::asio::local::stream_protocol>; typedef
using TcpServerConnection = ServerConnection<boost::asio::ip::tcp>; #if defined(BOOST_ASIO_HAS_LOCAL_SOCKETS)
using LocalClientConnection = ClientConnection<boost::asio::local::stream_protocol>; boost::asio::local::stream_protocol
using TcpClientConnection = ClientConnection<boost::asio::ip::tcp>; #else
boost::asio::generic::stream_protocol
#endif
local_stream_protocol;
typedef boost::asio::ip::tcp remote_stream_protocol;
using LocalServerConnection = ServerConnection<local_stream_protocol>;
using TcpServerConnection = ServerConnection<remote_stream_protocol>;
using LocalClientConnection = ClientConnection<local_stream_protocol>;
using TcpClientConnection = ClientConnection<remote_stream_protocol>;
} // namespace ray } // namespace ray

View file

@ -24,7 +24,7 @@ ObjectStoreNotificationManager::ObjectStoreNotificationManager(
RAY_ARROW_CHECK_OK(store_client_.Subscribe(&c_socket_)); RAY_ARROW_CHECK_OK(store_client_.Subscribe(&c_socket_));
boost::system::error_code ec; boost::system::error_code ec;
socket_.assign(boost::asio::local::stream_protocol(), c_socket_, ec); socket_.assign(local_stream_protocol(), c_socket_, ec);
assert(!ec.value()); assert(!ec.value());
NotificationWait(); NotificationWait();
} }

View file

@ -11,6 +11,7 @@
#include "plasma/client.h" #include "plasma/client.h"
#include "ray/common/client_connection.h"
#include "ray/common/id.h" #include "ray/common/id.h"
#include "ray/common/status.h" #include "ray/common/status.h"
@ -70,7 +71,7 @@ class ObjectStoreNotificationManager {
int64_t num_adds_processed_; int64_t num_adds_processed_;
int64_t num_removes_processed_; int64_t num_removes_processed_;
std::vector<uint8_t> notification_; std::vector<uint8_t> notification_;
boost::asio::local::stream_protocol::socket socket_; local_stream_protocol::socket socket_;
}; };
} // namespace ray } // namespace ray

View file

@ -31,8 +31,8 @@ class ClientConnectionTest : public ::testing::Test {
protected: protected:
boost::asio::io_service io_service_; boost::asio::io_service io_service_;
boost::asio::local::stream_protocol::socket in_; local_stream_protocol::socket in_;
boost::asio::local::stream_protocol::socket out_; local_stream_protocol::socket out_;
int64_t error_message_type_; int64_t error_message_type_;
}; };
@ -40,10 +40,10 @@ TEST_F(ClientConnectionTest, SimpleSyncWrite) {
const uint8_t arr[5] = {1, 2, 3, 4, 5}; const uint8_t arr[5] = {1, 2, 3, 4, 5};
int num_messages = 0; int num_messages = 0;
ClientHandler<boost::asio::local::stream_protocol> client_handler = ClientHandler<local_stream_protocol> client_handler =
[](LocalClientConnection &client) {}; [](LocalClientConnection &client) {};
MessageHandler<boost::asio::local::stream_protocol> message_handler = MessageHandler<local_stream_protocol> message_handler =
[&arr, &num_messages](std::shared_ptr<LocalClientConnection> client, [&arr, &num_messages](std::shared_ptr<LocalClientConnection> client,
int64_t message_type, const uint8_t *message) { int64_t message_type, const uint8_t *message) {
ASSERT_TRUE(!std::memcmp(arr, message, 5)); ASSERT_TRUE(!std::memcmp(arr, message, 5));
@ -70,16 +70,16 @@ TEST_F(ClientConnectionTest, SimpleAsyncWrite) {
const uint8_t msg3[5] = {8, 8, 8, 8, 8}; const uint8_t msg3[5] = {8, 8, 8, 8, 8};
int num_messages = 0; int num_messages = 0;
ClientHandler<boost::asio::local::stream_protocol> client_handler = ClientHandler<local_stream_protocol> client_handler =
[](LocalClientConnection &client) {}; [](LocalClientConnection &client) {};
MessageHandler<boost::asio::local::stream_protocol> noop_handler = MessageHandler<local_stream_protocol> noop_handler =
[](std::shared_ptr<LocalClientConnection> client, int64_t message_type, [](std::shared_ptr<LocalClientConnection> client, int64_t message_type,
const uint8_t *message) {}; const uint8_t *message) {};
std::shared_ptr<LocalClientConnection> reader = NULL; std::shared_ptr<LocalClientConnection> reader = NULL;
MessageHandler<boost::asio::local::stream_protocol> message_handler = MessageHandler<local_stream_protocol> message_handler =
[&msg1, &msg2, &msg3, &num_messages, &reader]( [&msg1, &msg2, &msg3, &num_messages, &reader](
std::shared_ptr<LocalClientConnection> client, int64_t message_type, std::shared_ptr<LocalClientConnection> client, int64_t message_type,
const uint8_t *message) { const uint8_t *message) {
@ -117,10 +117,10 @@ TEST_F(ClientConnectionTest, SimpleAsyncWrite) {
TEST_F(ClientConnectionTest, SimpleAsyncError) { TEST_F(ClientConnectionTest, SimpleAsyncError) {
const uint8_t msg1[5] = {1, 2, 3, 4, 5}; const uint8_t msg1[5] = {1, 2, 3, 4, 5};
ClientHandler<boost::asio::local::stream_protocol> client_handler = ClientHandler<local_stream_protocol> client_handler =
[](LocalClientConnection &client) {}; [](LocalClientConnection &client) {};
MessageHandler<boost::asio::local::stream_protocol> noop_handler = MessageHandler<local_stream_protocol> noop_handler =
[](std::shared_ptr<LocalClientConnection> client, int64_t message_type, [](std::shared_ptr<LocalClientConnection> client, int64_t message_type,
const uint8_t *message) {}; const uint8_t *message) {};
@ -139,10 +139,10 @@ TEST_F(ClientConnectionTest, SimpleAsyncError) {
TEST_F(ClientConnectionTest, CallbackWithSharedRefDoesNotLeakConnection) { TEST_F(ClientConnectionTest, CallbackWithSharedRefDoesNotLeakConnection) {
const uint8_t msg1[5] = {1, 2, 3, 4, 5}; const uint8_t msg1[5] = {1, 2, 3, 4, 5};
ClientHandler<boost::asio::local::stream_protocol> client_handler = ClientHandler<local_stream_protocol> client_handler =
[](LocalClientConnection &client) {}; [](LocalClientConnection &client) {};
MessageHandler<boost::asio::local::stream_protocol> noop_handler = MessageHandler<local_stream_protocol> noop_handler =
[](std::shared_ptr<LocalClientConnection> client, int64_t message_type, [](std::shared_ptr<LocalClientConnection> client, int64_t message_type,
const uint8_t *message) {}; const uint8_t *message) {};
@ -162,10 +162,10 @@ TEST_F(ClientConnectionTest, ProcessBadMessage) {
const uint8_t arr[5] = {1, 2, 3, 4, 5}; const uint8_t arr[5] = {1, 2, 3, 4, 5};
int num_messages = 0; int num_messages = 0;
ClientHandler<boost::asio::local::stream_protocol> client_handler = ClientHandler<local_stream_protocol> client_handler =
[](LocalClientConnection &client) {}; [](LocalClientConnection &client) {};
MessageHandler<boost::asio::local::stream_protocol> message_handler = MessageHandler<local_stream_protocol> message_handler =
[&arr, &num_messages](std::shared_ptr<LocalClientConnection> client, [&arr, &num_messages](std::shared_ptr<LocalClientConnection> client,
int64_t message_type, const uint8_t *message) { int64_t message_type, const uint8_t *message) {
ASSERT_TRUE(!std::memcmp(arr, message, 5)); ASSERT_TRUE(!std::memcmp(arr, message, 5));

View file

@ -51,7 +51,7 @@ Raylet::Raylet(boost::asio::io_service &main_service, const std::string &socket_
node_manager_(main_service, node_manager_config, object_manager_, gcs_client_, node_manager_(main_service, node_manager_config, object_manager_, gcs_client_,
object_directory_), object_directory_),
socket_name_(socket_name), socket_name_(socket_name),
acceptor_(main_service, boost::asio::local::stream_protocol::endpoint(socket_name)), acceptor_(main_service, local_stream_protocol::endpoint(socket_name)),
socket_(main_service) { socket_(main_service) {
// Start listening for clients. // Start listening for clients.
DoAccept(); DoAccept();
@ -118,9 +118,9 @@ void Raylet::DoAccept() {
void Raylet::HandleAccept(const boost::system::error_code &error) { void Raylet::HandleAccept(const boost::system::error_code &error) {
if (!error) { if (!error) {
// TODO: typedef these handlers. // TODO: typedef these handlers.
ClientHandler<boost::asio::local::stream_protocol> client_handler = ClientHandler<local_stream_protocol> client_handler =
[this](LocalClientConnection &client) { node_manager_.ProcessNewClient(client); }; [this](LocalClientConnection &client) { node_manager_.ProcessNewClient(client); };
MessageHandler<boost::asio::local::stream_protocol> message_handler = MessageHandler<local_stream_protocol> message_handler =
[this](std::shared_ptr<LocalClientConnection> client, int64_t message_type, [this](std::shared_ptr<LocalClientConnection> client, int64_t message_type,
const uint8_t *message) { const uint8_t *message) {
node_manager_.ProcessClientMessage(client, message_type, message); node_manager_.ProcessClientMessage(client, message_type, message);

View file

@ -76,9 +76,9 @@ class Raylet {
std::string socket_name_; std::string socket_name_;
/// An acceptor for new clients. /// An acceptor for new clients.
boost::asio::local::stream_protocol::acceptor acceptor_; local_stream_protocol::acceptor acceptor_;
/// The socket to listen on for new clients. /// The socket to listen on for new clients.
boost::asio::local::stream_protocol::socket socket_; local_stream_protocol::socket socket_;
}; };
} // namespace raylet } // namespace raylet

View file

@ -85,7 +85,7 @@ class WorkerPoolTest : public ::testing::Test {
int64_t message_type, const uint8_t *message) { int64_t message_type, const uint8_t *message) {
HandleMessage(client, message_type, message); HandleMessage(client, message_type, message);
}; };
boost::asio::local::stream_protocol::socket socket(io_service_); local_stream_protocol::socket socket(io_service_);
auto client = auto client =
LocalClientConnection::Create(client_handler, message_handler, std::move(socket), LocalClientConnection::Create(client_handler, message_handler, std::move(socket),
"worker", {}, error_message_type_); "worker", {}, error_message_type_);