2016-09-07 20:19:37 -07:00
|
|
|
/* PLASMA MANAGER: Local to a node, connects to other managers to send and
|
|
|
|
* receive objects from them
|
|
|
|
*
|
|
|
|
* The storage manager listens on its main listening port, and if a request for
|
|
|
|
* transferring an object to another object store comes in, it ships the data
|
|
|
|
* using a new connection to the target object manager. */
|
2016-08-17 12:54:34 -07:00
|
|
|
|
|
|
|
#include <assert.h>
#include <errno.h>
#include <inttypes.h>
#include <stdio.h>
#include <stdlib.h>

#include <sys/types.h>
#include <netdb.h>
#include <netinet/in.h>
#include <poll.h>
#include <strings.h>
#include <sys/ioctl.h>
#include <sys/mman.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <unistd.h>

#include "event_loop.h"
#include "plasma.h"
#include "plasma_manager.h"
|
2016-08-17 12:54:34 -07:00
|
|
|
|
2016-09-07 20:19:37 -07:00
|
|
|
typedef struct {
|
|
|
|
/* Name of the socket connecting to local plasma store. */
|
|
|
|
const char* store_socket_name;
|
|
|
|
/* Event loop. */
|
|
|
|
event_loop* loop;
|
|
|
|
} plasma_manager_state;
|
|
|
|
|
|
|
|
/* Initialize the plasma manager. This function initializes the event loop
|
|
|
|
* of the plasma manager, and stores the address 'store_socket_name' of
|
|
|
|
* the local plasma store socket. */
|
2016-09-08 15:28:27 -07:00
|
|
|
void init_plasma_manager(plasma_manager_state* s,
|
|
|
|
const char* store_socket_name) {
|
2016-09-07 20:19:37 -07:00
|
|
|
s->loop = malloc(sizeof(event_loop));
|
|
|
|
event_loop_init(s->loop);
|
2016-08-17 12:54:34 -07:00
|
|
|
s->store_socket_name = store_socket_name;
|
|
|
|
}
|
|
|
|
|
2016-09-07 20:19:37 -07:00
|
|
|
/* Start transfering data to another object store manager. This establishes
|
|
|
|
* a connection to both the manager and the local object store and sends
|
|
|
|
* the data header to the other object manager. */
|
|
|
|
void initiate_transfer(plasma_manager_state* s, plasma_request* req) {
|
|
|
|
int store_conn = plasma_store_connect(s->store_socket_name);
|
2016-09-14 14:20:34 -07:00
|
|
|
uint8_t* data;
|
|
|
|
int64_t data_size;
|
|
|
|
uint8_t* metadata;
|
|
|
|
int64_t metadata_size;
|
|
|
|
plasma_get(store_conn, req->object_id, &data_size, &data, &metadata_size,
|
|
|
|
&metadata);
|
|
|
|
assert(metadata == data + data_size);
|
|
|
|
plasma_buffer buf = {.object_id = req->object_id,
|
|
|
|
.data = data, /* We treat this as a pointer to the
|
|
|
|
concatenated data and metadata. */
|
|
|
|
.data_size = data_size,
|
|
|
|
.metadata_size = metadata_size,
|
|
|
|
.writable = 0};
|
2016-09-10 17:47:37 -07:00
|
|
|
char ip_addr[32];
|
2016-09-08 15:28:27 -07:00
|
|
|
snprintf(ip_addr, 32, "%d.%d.%d.%d", req->addr[0], req->addr[1], req->addr[2],
|
|
|
|
req->addr[3]);
|
2016-09-05 15:34:11 -07:00
|
|
|
|
|
|
|
int fd = plasma_manager_connect(&ip_addr[0], req->port);
|
2016-09-08 15:28:27 -07:00
|
|
|
data_connection conn = {.type = DATA_CONNECTION_WRITE,
|
|
|
|
.store_conn = store_conn,
|
|
|
|
.buf = buf,
|
|
|
|
.cursor = 0};
|
2016-09-07 20:19:37 -07:00
|
|
|
event_loop_attach(s->loop, CONNECTION_DATA, &conn, fd, POLLOUT);
|
2016-09-14 14:20:34 -07:00
|
|
|
plasma_request manager_req = {.type = PLASMA_DATA,
|
|
|
|
.object_id = req->object_id,
|
|
|
|
.data_size = buf.data_size,
|
|
|
|
.metadata_size = buf.metadata_size};
|
2016-08-17 12:54:34 -07:00
|
|
|
plasma_send(fd, &manager_req);
|
|
|
|
}
|
|
|
|
|
2016-09-07 20:19:37 -07:00
|
|
|
/* Start reading data from another object manager.
|
|
|
|
* Initializes the object we are going to write to in the
|
|
|
|
* local plasma store and then switches the data socket to reading mode. */
|
2016-09-08 15:28:27 -07:00
|
|
|
void start_reading_data(int64_t index,
|
|
|
|
plasma_manager_state* s,
|
|
|
|
plasma_request* req) {
|
2016-09-07 20:19:37 -07:00
|
|
|
int store_conn = plasma_store_connect(s->store_socket_name);
|
2016-09-14 14:20:34 -07:00
|
|
|
plasma_buffer buf = {.object_id = req->object_id,
|
|
|
|
.data_size = req->data_size,
|
|
|
|
.metadata_size = req->metadata_size,
|
|
|
|
.writable = 1};
|
|
|
|
plasma_create(store_conn, req->object_id, req->data_size, NULL,
|
|
|
|
req->metadata_size, &buf.data);
|
2016-09-08 15:28:27 -07:00
|
|
|
data_connection conn = {.type = DATA_CONNECTION_READ,
|
|
|
|
.store_conn = store_conn,
|
|
|
|
.buf = buf,
|
|
|
|
.cursor = 0};
|
2016-09-07 20:19:37 -07:00
|
|
|
event_loop_set_connection(s->loop, index, &conn);
|
2016-08-17 12:54:34 -07:00
|
|
|
}
|
|
|
|
|
2016-09-07 20:19:37 -07:00
|
|
|
/* Handle a command request that came in through a socket (transfering data,
|
|
|
|
* or accepting incoming data). */
|
2016-09-08 15:28:27 -07:00
|
|
|
void process_command(int64_t id,
|
|
|
|
plasma_manager_state* state,
|
|
|
|
plasma_request* req) {
|
2016-08-17 12:54:34 -07:00
|
|
|
switch (req->type) {
|
|
|
|
case PLASMA_TRANSFER:
|
2016-08-22 15:30:16 -07:00
|
|
|
LOG_INFO("transfering object to manager with port %d", req->port);
|
2016-08-17 12:54:34 -07:00
|
|
|
initiate_transfer(state, req);
|
|
|
|
break;
|
|
|
|
case PLASMA_DATA:
|
|
|
|
LOG_INFO("starting to stream data");
|
2016-09-07 20:19:37 -07:00
|
|
|
start_reading_data(id, state, req);
|
2016-08-17 12:54:34 -07:00
|
|
|
break;
|
|
|
|
default:
|
|
|
|
LOG_ERR("invalid request %d", req->type);
|
|
|
|
exit(-1);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2016-09-07 20:19:37 -07:00
|
|
|
/* Handle data or command event incoming on socket with index "index". */
|
2016-09-08 15:28:27 -07:00
|
|
|
void read_from_socket(plasma_manager_state* state,
|
|
|
|
struct pollfd* waiting,
|
|
|
|
int64_t index,
|
|
|
|
plasma_request* req) {
|
2016-08-17 12:54:34 -07:00
|
|
|
ssize_t r, s;
|
2016-09-08 15:28:27 -07:00
|
|
|
data_connection* conn = event_loop_get_connection(state->loop, index);
|
2016-09-07 20:19:37 -07:00
|
|
|
switch (conn->type) {
|
2016-09-08 15:28:27 -07:00
|
|
|
case DATA_CONNECTION_HEADER:
|
|
|
|
r = read(waiting->fd, req, sizeof(plasma_request));
|
|
|
|
if (r == -1) {
|
|
|
|
LOG_ERR("read error");
|
|
|
|
} else if (r == 0) {
|
|
|
|
LOG_INFO("connection with id %" PRId64 " disconnected", index);
|
|
|
|
event_loop_detach(state->loop, index, 1);
|
|
|
|
} else {
|
|
|
|
process_command(index, state, req);
|
|
|
|
}
|
|
|
|
break;
|
|
|
|
case DATA_CONNECTION_READ:
|
|
|
|
LOG_DEBUG("polled DATA_CONNECTION_READ");
|
|
|
|
r = read(waiting->fd, conn->buf.data + conn->cursor, BUFSIZE);
|
|
|
|
if (r == -1) {
|
|
|
|
LOG_ERR("read error");
|
|
|
|
} else if (r == 0) {
|
|
|
|
LOG_INFO("end of file");
|
|
|
|
} else {
|
|
|
|
conn->cursor += r;
|
|
|
|
}
|
|
|
|
if (r == 0) {
|
|
|
|
LOG_DEBUG("reading on channel %" PRId64 " finished", index);
|
|
|
|
plasma_seal(conn->store_conn, conn->buf.object_id);
|
|
|
|
close(conn->store_conn);
|
|
|
|
event_loop_detach(state->loop, index, 1);
|
|
|
|
}
|
|
|
|
break;
|
|
|
|
case DATA_CONNECTION_WRITE:
|
|
|
|
LOG_DEBUG("polled DATA_CONNECTION_WRITE");
|
2016-09-14 14:20:34 -07:00
|
|
|
s = conn->buf.data_size + conn->buf.metadata_size - conn->cursor;
|
2016-09-08 15:28:27 -07:00
|
|
|
if (s > BUFSIZE)
|
|
|
|
s = BUFSIZE;
|
|
|
|
r = write(waiting->fd, conn->buf.data + conn->cursor, s);
|
|
|
|
if (r != s) {
|
|
|
|
if (r > 0) {
|
|
|
|
LOG_ERR("partial write on fd %d", waiting->fd);
|
2016-08-17 12:54:34 -07:00
|
|
|
} else {
|
2016-09-08 15:28:27 -07:00
|
|
|
LOG_ERR("write error");
|
|
|
|
exit(-1);
|
2016-08-17 12:54:34 -07:00
|
|
|
}
|
2016-09-08 15:28:27 -07:00
|
|
|
} else {
|
|
|
|
conn->cursor += r;
|
|
|
|
}
|
|
|
|
if (r == 0) {
|
|
|
|
LOG_DEBUG("writing on channel %" PRId64 " finished", index);
|
|
|
|
close(conn->store_conn);
|
|
|
|
event_loop_detach(state->loop, index, 1);
|
|
|
|
}
|
|
|
|
break;
|
|
|
|
default:
|
|
|
|
LOG_ERR("invalid connection type");
|
|
|
|
exit(-1);
|
2016-08-17 12:54:34 -07:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2016-09-07 20:19:37 -07:00
|
|
|
/* Main event loop of the plasma manager. */
|
|
|
|
void run_event_loop(int sock, plasma_manager_state* s) {
|
|
|
|
/* Add listening socket. */
|
|
|
|
event_loop_attach(s->loop, CONNECTION_LISTENER, NULL, sock, POLLIN);
|
2016-08-17 12:54:34 -07:00
|
|
|
plasma_request req;
|
|
|
|
while (1) {
|
2016-09-07 20:19:37 -07:00
|
|
|
int num_ready = event_loop_poll(s->loop);
|
2016-08-17 12:54:34 -07:00
|
|
|
if (num_ready < 0) {
|
|
|
|
LOG_ERR("poll failed");
|
|
|
|
exit(-1);
|
|
|
|
}
|
2016-09-07 20:19:37 -07:00
|
|
|
for (int i = 0; i < event_loop_size(s->loop); ++i) {
|
2016-09-08 15:28:27 -07:00
|
|
|
struct pollfd* waiting = event_loop_get(s->loop, i);
|
2016-09-07 20:19:37 -07:00
|
|
|
if (waiting->revents == 0)
|
2016-08-17 12:54:34 -07:00
|
|
|
continue;
|
2016-09-07 20:19:37 -07:00
|
|
|
if (waiting->fd == sock) {
|
|
|
|
/* Handle new incoming connections. */
|
2016-08-17 12:54:34 -07:00
|
|
|
int new_socket = accept(sock, NULL, NULL);
|
|
|
|
if (new_socket < 0) {
|
|
|
|
if (errno != EWOULDBLOCK) {
|
|
|
|
LOG_ERR("accept failed");
|
|
|
|
exit(-1);
|
|
|
|
}
|
|
|
|
break;
|
|
|
|
}
|
2016-09-08 15:28:27 -07:00
|
|
|
data_connection conn = {.type = DATA_CONNECTION_HEADER};
|
2016-09-07 20:19:37 -07:00
|
|
|
event_loop_attach(s->loop, CONNECTION_DATA, &conn, new_socket, POLLIN);
|
|
|
|
LOG_INFO("new connection with id %" PRId64, event_loop_size(s->loop));
|
2016-08-17 12:54:34 -07:00
|
|
|
} else {
|
2016-09-07 20:19:37 -07:00
|
|
|
read_from_socket(s, waiting, i, &req);
|
2016-08-18 09:56:20 -07:00
|
|
|
}
|
2016-08-17 12:54:34 -07:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2016-09-08 15:28:27 -07:00
|
|
|
void start_server(const char* store_socket_name,
|
|
|
|
const char* master_addr,
|
|
|
|
int port) {
|
2016-08-17 12:54:34 -07:00
|
|
|
struct sockaddr_in name;
|
|
|
|
int sock = socket(PF_INET, SOCK_STREAM, 0);
|
|
|
|
if (sock < 0) {
|
|
|
|
LOG_ERR("could not create socket");
|
|
|
|
exit(-1);
|
|
|
|
}
|
|
|
|
name.sin_family = AF_INET;
|
2016-08-22 15:30:16 -07:00
|
|
|
name.sin_port = htons(port);
|
2016-08-17 12:54:34 -07:00
|
|
|
name.sin_addr.s_addr = htonl(INADDR_ANY);
|
|
|
|
int on = 1;
|
2016-09-07 20:19:37 -07:00
|
|
|
/* TODO(pcm): http://stackoverflow.com/q/1150635 */
|
2016-08-17 12:54:34 -07:00
|
|
|
if (ioctl(sock, FIONBIO, (char*) &on) < 0) {
|
|
|
|
LOG_ERR("ioctl failed");
|
|
|
|
close(sock);
|
|
|
|
exit(-1);
|
|
|
|
}
|
2016-09-08 15:28:27 -07:00
|
|
|
setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
|
2016-08-17 12:54:34 -07:00
|
|
|
if (bind(sock, (struct sockaddr*) &name, sizeof(name)) < 0) {
|
|
|
|
LOG_ERR("could not bind socket");
|
|
|
|
exit(-1);
|
|
|
|
}
|
2016-08-22 15:30:16 -07:00
|
|
|
LOG_INFO("listening on port %d", port);
|
2016-08-17 12:54:34 -07:00
|
|
|
if (listen(sock, 5) == -1) {
|
|
|
|
LOG_ERR("could not listen to socket");
|
|
|
|
exit(-1);
|
|
|
|
}
|
|
|
|
plasma_manager_state state;
|
2016-09-07 20:19:37 -07:00
|
|
|
init_plasma_manager(&state, store_socket_name);
|
|
|
|
run_event_loop(sock, &state);
|
2016-08-17 12:54:34 -07:00
|
|
|
}
|
|
|
|
|
|
|
|
int main(int argc, char* argv[]) {
|
2016-09-07 20:19:37 -07:00
|
|
|
/* Socket name of the plasma store this manager is connected to. */
|
2016-09-08 15:28:27 -07:00
|
|
|
char* store_socket_name = NULL;
|
2016-09-07 20:19:37 -07:00
|
|
|
/* IP address of this node. */
|
2016-09-08 15:28:27 -07:00
|
|
|
char* master_addr = NULL;
|
2016-09-07 20:19:37 -07:00
|
|
|
/* Port number the manager should use. */
|
2016-08-22 15:30:16 -07:00
|
|
|
int port;
|
2016-08-17 12:54:34 -07:00
|
|
|
int c;
|
2016-08-22 15:30:16 -07:00
|
|
|
while ((c = getopt(argc, argv, "s:m:p:")) != -1) {
|
2016-08-17 12:54:34 -07:00
|
|
|
switch (c) {
|
|
|
|
case 's':
|
|
|
|
store_socket_name = optarg;
|
|
|
|
break;
|
|
|
|
case 'm':
|
|
|
|
master_addr = optarg;
|
|
|
|
break;
|
2016-08-22 15:30:16 -07:00
|
|
|
case 'p':
|
|
|
|
port = atoi(optarg);
|
|
|
|
break;
|
2016-08-17 12:54:34 -07:00
|
|
|
default:
|
|
|
|
LOG_ERR("unknown option %c", c);
|
|
|
|
exit(-1);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
if (!store_socket_name) {
|
2016-09-08 15:28:27 -07:00
|
|
|
LOG_ERR(
|
|
|
|
"please specify socket for connecting to the plasma store with -s "
|
|
|
|
"switch");
|
2016-08-17 12:54:34 -07:00
|
|
|
exit(-1);
|
|
|
|
}
|
|
|
|
if (!master_addr) {
|
2016-09-08 15:28:27 -07:00
|
|
|
LOG_ERR(
|
|
|
|
"please specify ip address of the current host in the format "
|
|
|
|
"123.456.789.10 with -m switch");
|
2016-08-17 12:54:34 -07:00
|
|
|
exit(-1);
|
|
|
|
}
|
2016-08-22 15:30:16 -07:00
|
|
|
start_server(store_socket_name, master_addr, port);
|
2016-08-17 12:54:34 -07:00
|
|
|
}
|