#include "io.h" #include #include #include #include #include #include #include #include #include #include #include #include #include "common.h" #ifndef _WIN32 /* This function is actually not declared in standard POSIX, so declare it. */ extern int usleep(useconds_t usec); #endif int bind_inet_sock(const int port, bool shall_listen) { struct sockaddr_in name; int socket_fd = socket(PF_INET, SOCK_STREAM, 0); if (socket_fd < 0) { LOG_ERROR("socket() failed for port %d.", port); return -1; } name.sin_family = AF_INET; name.sin_port = htons(port); name.sin_addr.s_addr = htonl(INADDR_ANY); int on = 1; /* TODO(pcm): http://stackoverflow.com/q/1150635 */ if (ioctl(socket_fd, FIONBIO, (char *) &on) < 0) { LOG_ERROR("ioctl failed"); close(socket_fd); return -1; } int *const pon = (int *const) & on; if (setsockopt(socket_fd, SOL_SOCKET, SO_REUSEADDR, pon, sizeof(on)) < 0) { LOG_ERROR("setsockopt failed for port %d", port); close(socket_fd); return -1; } if (bind(socket_fd, (struct sockaddr *) &name, sizeof(name)) < 0) { LOG_ERROR("Bind failed for port %d", port); close(socket_fd); return -1; } if (shall_listen && listen(socket_fd, 5) == -1) { LOG_ERROR("Could not listen to socket %d", port); close(socket_fd); return -1; } return socket_fd; } int bind_ipc_sock(const char *socket_pathname, bool shall_listen) { struct sockaddr_un socket_address; int socket_fd = socket(AF_UNIX, SOCK_STREAM, 0); if (socket_fd < 0) { LOG_ERROR("socket() failed for pathname %s.", socket_pathname); return -1; } /* Tell the system to allow the port to be reused. */ int on = 1; if (setsockopt(socket_fd, SOL_SOCKET, SO_REUSEADDR, (char *) &on, sizeof(on)) < 0) { LOG_ERROR("setsockopt failed for pathname %s", socket_pathname); close(socket_fd); return -1; } unlink(socket_pathname); memset(&socket_address, 0, sizeof(socket_address)); socket_address.sun_family = AF_UNIX; if (strlen(socket_pathname) + 1 > sizeof(socket_address.sun_path)) { LOG_ERROR("Socket pathname is too long."); close(socket_fd); return -1; } strncpy(socket_address.sun_path, socket_pathname, strlen(socket_pathname) + 1); if (bind(socket_fd, (struct sockaddr *) &socket_address, sizeof(socket_address)) != 0) { LOG_ERROR("Bind failed for pathname %s.", socket_pathname); close(socket_fd); return -1; } if (shall_listen && listen(socket_fd, 5) == -1) { LOG_ERROR("Could not listen to socket %s", socket_pathname); close(socket_fd); return -1; } return socket_fd; } int connect_ipc_sock_retry(const char *socket_pathname, int num_retries, int64_t timeout) { /* Pick the default values if the user did not specify. */ if (num_retries < 0) { num_retries = NUM_CONNECT_ATTEMPTS; } if (timeout < 0) { timeout = CONNECT_TIMEOUT_MS; } CHECK(socket_pathname); int fd = -1; for (int num_attempts = 0; num_attempts < num_retries; ++num_attempts) { fd = connect_ipc_sock(socket_pathname); if (fd >= 0) { break; } /* Sleep for timeout milliseconds. */ usleep(timeout * 1000); } /* If we could not connect to the socket, exit. */ if (fd == -1) { LOG_FATAL("Could not connect to socket %s", socket_pathname); } return fd; } int connect_ipc_sock(const char *socket_pathname) { struct sockaddr_un socket_address; int socket_fd; socket_fd = socket(AF_UNIX, SOCK_STREAM, 0); if (socket_fd < 0) { LOG_ERROR("socket() failed for pathname %s.", socket_pathname); return -1; } memset(&socket_address, 0, sizeof(socket_address)); socket_address.sun_family = AF_UNIX; if (strlen(socket_pathname) + 1 > sizeof(socket_address.sun_path)) { LOG_ERROR("Socket pathname is too long."); return -1; } strncpy(socket_address.sun_path, socket_pathname, strlen(socket_pathname) + 1); if (connect(socket_fd, (struct sockaddr *) &socket_address, sizeof(socket_address)) != 0) { LOG_ERROR("Connection to socket failed for pathname %s.", socket_pathname); return -1; } return socket_fd; } int connect_inet_sock_retry(const char *ip_addr, int port, int num_retries, int64_t timeout) { /* Pick the default values if the user did not specify. */ if (num_retries < 0) { num_retries = NUM_CONNECT_ATTEMPTS; } if (timeout < 0) { timeout = CONNECT_TIMEOUT_MS; } CHECK(ip_addr); int fd = -1; for (int num_attempts = 0; num_attempts < num_retries; ++num_attempts) { fd = connect_inet_sock(ip_addr, port); if (fd >= 0) { break; } /* Sleep for timeout milliseconds. */ usleep(timeout * 1000); } /* If we could not connect to the socket, exit. */ if (fd == -1) { LOG_FATAL("Could not connect to address %s:%d", ip_addr, port); } return fd; } int connect_inet_sock(const char *ip_addr, int port) { int fd = socket(PF_INET, SOCK_STREAM, 0); if (fd < 0) { LOG_ERROR("socket() failed for address %s:%d.", ip_addr, port); return -1; } struct hostent *manager = gethostbyname(ip_addr); /* TODO(pcm): cache this */ if (!manager) { LOG_ERROR("Failed to get hostname from address %s:%d.", ip_addr, port); return -1; } struct sockaddr_in addr; addr.sin_family = AF_INET; memcpy(&addr.sin_addr.s_addr, manager->h_addr_list[0], manager->h_length); addr.sin_port = htons(port); if (connect(fd, (struct sockaddr *) &addr, sizeof(addr)) != 0) { LOG_ERROR("Connection to socket failed for address %s:%d.", ip_addr, port); return -1; } return fd; } int accept_client(int socket_fd) { int client_fd = accept(socket_fd, NULL, NULL); if (client_fd < 0) { LOG_ERROR("Error reading from socket."); return -1; } return client_fd; } int write_bytes(int fd, uint8_t *cursor, size_t length) { ssize_t nbytes = 0; size_t bytesleft = length; size_t offset = 0; while (bytesleft > 0) { /* While we haven't written the whole message, write to the file * descriptor, advance the cursor, and decrease the amount left to write. */ nbytes = write(fd, cursor + offset, bytesleft); if (nbytes < 0) { if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) { continue; } return -1; /* Errno will be set. */ } else if (0 == nbytes) { /* Encountered early EOF. */ return -1; } CHECK(nbytes > 0); bytesleft -= nbytes; offset += nbytes; } return 0; } int write_message(int fd, int64_t type, int64_t length, uint8_t *bytes) { int64_t version = RAY_PROTOCOL_VERSION; int closed; closed = write_bytes(fd, (uint8_t *) &version, sizeof(version)); if (closed) { return closed; } closed = write_bytes(fd, (uint8_t *) &type, sizeof(type)); if (closed) { return closed; } closed = write_bytes(fd, (uint8_t *) &length, sizeof(length)); if (closed) { return closed; } closed = write_bytes(fd, bytes, length * sizeof(char)); if (closed) { return closed; } return 0; } int read_bytes(int fd, uint8_t *cursor, size_t length) { ssize_t nbytes = 0; /* Termination condition: EOF or read 'length' bytes total. */ size_t bytesleft = length; size_t offset = 0; while (bytesleft > 0) { nbytes = read(fd, cursor + offset, bytesleft); if (nbytes < 0) { if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) { continue; } return -1; /* Errno will be set. */ } else if (0 == nbytes) { /* Encountered early EOF. */ return -1; } CHECK(nbytes > 0); bytesleft -= nbytes; offset += nbytes; } return 0; } void read_message(int fd, int64_t *type, int64_t *length, uint8_t **bytes) { int64_t version; int closed = read_bytes(fd, (uint8_t *) &version, sizeof(version)); if (closed) { goto disconnected; } CHECK(version == RAY_PROTOCOL_VERSION); closed = read_bytes(fd, (uint8_t *) type, sizeof(*type)); if (closed) { goto disconnected; } closed = read_bytes(fd, (uint8_t *) length, sizeof(*length)); if (closed) { goto disconnected; } *bytes = (uint8_t *) malloc(*length * sizeof(uint8_t)); closed = read_bytes(fd, *bytes, *length); if (closed) { free(*bytes); goto disconnected; } return; disconnected: /* Handle the case in which the socket is closed. */ *type = DISCONNECT_CLIENT; *length = 0; *bytes = NULL; return; } int64_t read_buffer(int fd, int64_t *type, UT_array *buffer) { int64_t version; int closed = read_bytes(fd, (uint8_t *) &version, sizeof(version)); if (closed) { goto disconnected; } CHECK(version == RAY_PROTOCOL_VERSION); int64_t length; closed = read_bytes(fd, (uint8_t *) type, sizeof(*type)); if (closed) { goto disconnected; } closed = read_bytes(fd, (uint8_t *) &length, sizeof(length)); if (closed) { goto disconnected; } if (length > utarray_len(buffer)) { utarray_resize(buffer, length); } closed = read_bytes(fd, (uint8_t *) utarray_front(buffer), length); if (closed) { goto disconnected; } return length; disconnected: /* Handle the case in which the socket is closed. */ *type = DISCONNECT_CLIENT; return 0; } void write_log_message(int fd, const char *message) { /* Account for the \0 at the end of the string. */ write_message(fd, LOG_MESSAGE, strlen(message) + 1, (uint8_t *) message); } char *read_log_message(int fd) { uint8_t *bytes; int64_t type; int64_t length; read_message(fd, &type, &length, &bytes); CHECK(type == LOG_MESSAGE); return (char *) bytes; } void write_formatted_log_message(int socket_fd, const char *format, ...) { UT_string *cmd; va_list ap; utstring_new(cmd); va_start(ap, format); utstring_printf_va(cmd, format, ap); va_end(ap); write_log_message(socket_fd, utstring_body(cmd)); utstring_free(cmd); }