ray/thirdparty/patches/arrow-windows-socket.patch
mehrdadn b4030cdbbe
File HANDLE/descriptor translation layer for Windows (#7657)
* Use TCP sockets on Windows with custom HANDLE <-> FD translation layer

* Get Plasma working on Windows

Co-authored-by: Mehrdad <noreply@github.com>
2020-03-23 21:08:25 -07:00

159 lines
4.5 KiB
Diff

diff --git cpp/src/plasma/client.cc cpp/src/plasma/client.cc
--- cpp/src/plasma/client.cc
+++ cpp/src/plasma/client.cc
@@ -28,1 +28,5 @@
+#ifdef _WIN32
+#include <Windows.h>
+#else
#include <sys/mman.h>
+#endif
@@ -33,1 +37,3 @@
+#ifndef _WIN32
#include <unistd.h>
+#endif
@@ -178,6 +184,14 @@
+#ifdef _WIN32
+ pointer_ = reinterpret_cast<uint8_t*>(MapViewOfFile(reinterpret_cast<HANDLE>(fh_get(fd)), FILE_MAP_ALL_ACCESS, 0, 0, length_));
+ // TODO(pcm): Don't fail here, instead return a Status.
+ if (pointer_ == NULL) {
+ ARROW_LOG(FATAL) << "mmap failed";
+ }
+#else
pointer_ = reinterpret_cast<uint8_t*>(
mmap(NULL, length_, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0));
// TODO(pcm): Don't fail here, instead return a Status.
if (pointer_ == MAP_FAILED) {
ARROW_LOG(FATAL) << "mmap failed";
}
+#endif
@@ -189,1 +195,6 @@
+ int r;
+#ifdef _WIN32
+ r = UnmapViewOfFile(pointer_) ? 0 : -1;
+#else
- int r = munmap(pointer_, length_);
+ r = munmap(pointer_, length_);
+#endif
@@ -990,5 +1009,17 @@
+#ifdef _WINSOCKAPI_
+ SOCKET sockets[2] = { INVALID_SOCKET, INVALID_SOCKET };
+ socketpair(AF_INET, SOCK_STREAM, 0, sockets);
+ sock[0] = fh_open(sockets[0], -1);
+ sock[1] = fh_open(sockets[1], -1);
+#else
socketpair(AF_UNIX, SOCK_STREAM, 0, sock);
+#endif
// Make the socket non-blocking.
+#ifdef _WINSOCKAPI_
+ unsigned long value = 1;
+ ARROW_CHECK(ioctlsocket(sock[1], FIONBIO, &value) == 0);
+#else
int flags = fcntl(sock[1], F_GETFL, 0);
ARROW_CHECK(fcntl(sock[1], F_SETFL, flags | O_NONBLOCK) == 0);
+#endif
// Tell the Plasma store about the subscription.
diff --git cpp/src/plasma/fling.cc cpp/src/plasma/fling.cc
--- cpp/src/plasma/fling.cc
+++ cpp/src/plasma/fling.cc
@@ -19,7 +19,14 @@
#include "arrow/util/logging.h"
+#ifdef _WIN32
+#include <ws2tcpip.h> // socklen_t
+#else
+typedef int SOCKET;
+#endif
+
void init_msg(struct msghdr* msg, struct iovec* iov, char* buf, size_t buf_len) {
iov->iov_base = buf;
iov->iov_len = 1;
+ msg->msg_flags = 0;
msg->msg_iov = iov;
@@ -36,3 +43,8 @@
+#ifdef _WIN32
+ SOCKET to_send = fh_get(fd);
+#else
+ SOCKET to_send = fd;
+#endif
- char buf[CMSG_SPACE(sizeof(int))];
+ char buf[CMSG_SPACE(sizeof(to_send))];
- memset(&buf, 0, CMSG_SPACE(sizeof(int)));
+ memset(&buf, 0, sizeof(buf));
@@ -47,7 +59,12 @@
- header->cmsg_len = CMSG_LEN(sizeof(int));
+ header->cmsg_len = CMSG_LEN(sizeof(to_send));
- memcpy(CMSG_DATA(header), reinterpret_cast<void*>(&fd), sizeof(int));
+ memcpy(CMSG_DATA(header), reinterpret_cast<void*>(&to_send), sizeof(to_send));
+#ifdef _WIN32
+ SOCKET sock = fh_get(conn);
+#else
+ SOCKET sock = conn;
+#endif
// Send file descriptor.
while (true) {
- ssize_t r = sendmsg(conn, &msg, 0);
+ ssize_t r = sendmsg(sock, &msg, 0);
if (r < 0) {
@@ -83,6 +100,11 @@
- char buf[CMSG_SPACE(sizeof(int))];
+ char buf[CMSG_SPACE(sizeof(SOCKET))];
init_msg(&msg, &iov, buf, sizeof(buf));
+#ifdef _WIN32
+ SOCKET sock = fh_get(conn);
+#else
+ int sock = conn;
+#endif
while (true) {
- ssize_t r = recvmsg(conn, &msg, 0);
+ ssize_t r = recvmsg(sock, &msg, 0);
if (r == -1) {
@@ -100,18 +122,22 @@
- int found_fd = -1;
+ SOCKET found_fd = -1;
int oh_noes = 0;
for (struct cmsghdr* header = CMSG_FIRSTHDR(&msg); header != NULL;
header = CMSG_NXTHDR(&msg, header))
if (header->cmsg_level == SOL_SOCKET && header->cmsg_type == SCM_RIGHTS) {
ssize_t count = (header->cmsg_len -
(CMSG_DATA(header) - reinterpret_cast<unsigned char*>(header))) /
- sizeof(int);
+ sizeof(SOCKET);
for (int i = 0; i < count; ++i) {
- int fd = (reinterpret_cast<int*>(CMSG_DATA(header)))[i];
+ SOCKET fd = (reinterpret_cast<SOCKET*>(CMSG_DATA(header)))[i];
if (found_fd == -1) {
found_fd = fd;
} else {
+#ifdef _WIN32
+ closesocket(fd) == 0 || ((WSAGetLastError() == WSAENOTSOCK || WSAGetLastError() == WSANOTINITIALISED) && CloseHandle(reinterpret_cast<HANDLE>(fd)));
+#else
close(fd);
+#endif
oh_noes = 1;
}
}
}
@@ -122,8 +148,17 @@
if (oh_noes) {
+#ifdef _WIN32
+ closesocket(found_fd) == 0 || ((WSAGetLastError() == WSAENOTSOCK || WSAGetLastError() == WSANOTINITIALISED) && CloseHandle(reinterpret_cast<HANDLE>(found_fd)));
+#else
close(found_fd);
+#endif
errno = EBADMSG;
return -1;
}
- return found_fd;
+#ifdef _WIN32
+ int to_receive = fh_open(found_fd, -1);
+#else
+ int to_receive = found_fd;
+#endif
+ return to_receive;
}
--