/* PLASMA MANAGER: Local to a node, connects to other managers to send and
 * receive objects from them.
 *
 * The storage manager listens on its main listening port, and if a request for
 * transferring an object to another object store comes in, it ships the data
 * using a new connection to the target object manager. */
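
/* A minimal sketch of the message flow described above, using the handlers
 * defined later in this file (illustrative only; the exact wire framing lives
 * in plasma_protocol.h):
 *
 *   requesting manager                         responding manager
 *   ------------------                         ------------------
 *   request_transfer_from: queue DataRequest
 *   send_queued_request --- DataRequest --->   process_transfer_request:
 *                                                plasma_get + queue DataReply
 *   process_data_request <--- DataReply ---    send_queued_request
 *   process_data_chunk  <-- object chunks --   write_object_chunk
 */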

#include <fcntl.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <signal.h>
#include <stdbool.h>
#include <sys/mman.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <string.h>
#include <strings.h>
#include <poll.h>
#include <assert.h>
#include <netinet/in.h>

#include "uthash.h"
#include "utlist.h"
#include "utarray.h"
#include "utstring.h"
#include "common.h"
#include "io.h"
#include "net.h"
#include "event_loop.h"
#include "plasma.h"
#include "plasma_protocol.h"
#include "plasma_client.h"
#include "plasma_manager.h"
#include "state/db.h"
#include "state/object_table.h"

/**
 * Process either the fetch or the status request.
 *
 * @param client_conn Client connection.
 * @param object_id ID of the object for which we process this request.
 * @return Void.
 */
void process_status_request(client_connection *client_conn,
                            object_id object_id);

/**
 * Request the transfer from a remote node or get the status of
 * a given object. This is called for an object that is stored at
 * a remote Plasma Store.
 *
 * @param object_id ID of the object to transfer or to get its status.
 * @param manager_count Number of remote nodes object_id is stored at.
 * @param manager_vector Array containing the Plasma Managers
 *        running at the nodes where object_id is stored.
 * @param context Client connection.
 * @return Status of object_id as defined in plasma.h.
 */
int request_status(object_id object_id,
                   int manager_count,
                   const char *manager_vector[],
                   void *context);

/**
 * Send the requested object_id back to the Plasma Manager identified
 * by (addr, port) which requested it. This is done via a
 * data reply message.
 *
 * @param loop The event loop.
 * @param object_id The ID of the object being transferred to (addr, port).
 * @param addr The address of the Plasma Manager object_id is sent to.
 * @param port The port number of the Plasma Manager object_id is sent to.
 * @param conn The client connection object.
 * @return Void.
 */
void process_transfer_request(event_loop *loop,
                              object_id object_id,
                              const char *addr,
                              int port,
                              client_connection *conn);

/**
 * Receive object_id requested by this Plasma Manager from the remote Plasma
 * Manager identified by client_sock. The object_id is sent via the data reply
 * message.
 *
 * @param loop The event data structure.
 * @param client_sock The sender's socket.
 * @param object_id ID of the object being received.
 * @param data_size Size of the data of object_id.
 * @param metadata_size Size of the metadata of object_id.
 * @param conn The connection object.
 * @return Void.
 */
void process_data_request(event_loop *loop,
                          int client_sock,
                          object_id object_id,
                          int64_t data_size,
                          int64_t metadata_size,
                          client_connection *conn);

/** Entry of the hashtable of objects that are available locally. */
typedef struct {
  /** Object id of this object. */
  object_id object_id;
  /** Handle for the uthash table. */
  UT_hash_handle hh;
} available_object;

typedef struct {
  /** The ID of the object we are fetching or waiting for. */
  object_id object_id;
  /** Pointer to the array containing the manager locations of this object.
   * This struct owns and must free each entry. */
  char **manager_vector;
  /** The number of manager locations in the array manager_vector. */
  int manager_count;
  /** The next manager we should try to contact. This is set to an index in
   * manager_vector in the retry handler, in case the current attempt fails to
   * contact a manager. */
  int next_manager;
  /** Handle for the uthash table in the manager state that keeps track of
   * outstanding fetch requests. */
  UT_hash_handle hh;
} fetch_request;

/**
 * There are fundamentally two data structures used for handling wait requests.
 * There is the "wait_request" struct and the "object_wait_requests" struct. A
 * wait_request keeps track of all of the object IDs that it is waiting for.
 * An object_wait_requests struct keeps track of all of the wait_request
 * structs that are waiting for a particular object ID. The
 * plasma_manager_state contains a hash table mapping object IDs to their
 * corresponding object_wait_requests structs.
 *
 * These data structures are updated by several methods:
 * - add_wait_request_for_object adds a wait_request to the
 *   object_wait_requests struct corresponding to a particular object ID. This
 *   is called when a client calls plasma_wait.
 * - remove_wait_request_for_object removes a wait_request from an
 *   object_wait_requests struct. When a wait request returns, this method is
 *   called for all of the object IDs involved in that wait_request.
 * - update_object_wait_requests removes an object_wait_requests struct and
 *   does some processing for each wait_request involved in that
 *   object_wait_requests struct.
 */
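
/* Illustrative sketch (not compiled) of the lifecycle described above; the
 * structs and functions it references are defined below. */
#if 0
void example_wait_lifecycle(plasma_manager_state *state,
                            wait_request *wait_req,
                            object_id oid) {
  /* A client calls plasma_wait: register wait_req under oid. */
  add_wait_request_for_object(state, oid, PLASMA_QUERY_LOCAL, wait_req);
  /* The object becomes available: every wait_request registered under oid is
   * updated, fully satisfied requests return to their clients, and the
   * object_wait_requests struct for oid is removed. */
  update_object_wait_requests(state, oid, PLASMA_QUERY_LOCAL,
                              ObjectStatus_Local);
  /* If the wait returns for another reason (e.g. a timeout), the wait_req is
   * instead unregistered from each object it was waiting on. */
  remove_wait_request_for_object(state, oid, PLASMA_QUERY_LOCAL, wait_req);
}
#endif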

typedef struct {
  /** The client connection that called wait. */
  client_connection *client_conn;
  /** The ID of the timer that will time out and cause this wait to return to
   * the client if it hasn't already returned. */
  int64_t timer;
  /** The number of objects in this wait request. */
  int64_t num_object_requests;
  /** The object requests for this wait request. Each object request has a
   * type field which is either PLASMA_QUERY_LOCAL or PLASMA_QUERY_ANYWHERE. */
  object_request *object_requests;
  /** The minimum number of objects to wait for in this request. */
  int64_t num_objects_to_wait_for;
  /** The number of object requests in this wait request that are already
   * satisfied. */
  int64_t num_satisfied;
} wait_request;

/** This is used to define the utarray of wait requests in the
 * object_wait_requests struct. */
UT_icd wait_request_icd = {sizeof(wait_request *), NULL, NULL, NULL};
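
/* Illustrative sketch (not compiled): since the icd element size is
 * sizeof(wait_request *), the utarray stores pointers, so utarray_eltptr
 * returns a pointer to the stored pointer (a wait_request **). */
#if 0
void example_icd_usage(wait_request *wait_req) {
  UT_array *requests;
  utarray_new(requests, &wait_request_icd);
  /* Pass the address of the pointer; utarray copies the pointer value. */
  utarray_push_back(requests, &wait_req);
  wait_request **elt = (wait_request **) utarray_eltptr(requests, 0);
  assert(*elt == wait_req);
  utarray_free(requests);
}
#endif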

typedef struct {
  /** The ID of the object. This is used as a key in a hash table. */
  object_id object_id;
  /** An array of the wait requests involving this object ID. */
  UT_array *wait_requests;
  /** Handle for the uthash table in the manager state that keeps track of the
   * wait requests involving this object ID. */
  UT_hash_handle hh;
} object_wait_requests;

struct plasma_manager_state {
  /** Event loop. */
  event_loop *loop;
  /** Connection to the local plasma store for reading or writing data. */
  plasma_connection *plasma_conn;
  /** Hash table of all contexts for active connections to
   * other plasma managers. These are used for writing data to
   * other plasma stores. */
  client_connection *manager_connections;
  /** Handle to the state database, or NULL if no database is used. */
  db_handle *db;
  /** Our address. */
  const char *addr;
  /** Our port. */
  int port;
  /** Hash table of outstanding fetch requests. The key is the object ID. The
   * value is the data needed to perform the fetch. */
  fetch_request *fetch_requests;
  /** A hash table mapping object IDs to a vector of the wait requests that
   * are waiting for the object to arrive locally. */
  object_wait_requests *object_wait_requests_local;
  /** A hash table mapping object IDs to a vector of the wait requests that
   * are waiting for the object to be available somewhere in the system. */
  object_wait_requests *object_wait_requests_remote;
  /** Cache of objects that are available locally. */
  available_object *local_available_objects;
  /** Buffer that holds memory for serializing plasma protocol messages. */
  protocol_builder *builder;
};

plasma_manager_state *g_manager_state = NULL;

/* The context for fetch and wait requests. These are per client, per object. */
struct client_object_request {
  /** The ID of the object we are fetching or waiting for. */
  object_id object_id;
  /** The client connection context, shared between other
   * client_object_requests for the same client. */
  client_connection *client_conn;
  /** The ID for the timer that will time out the current request to the state
   * database or another plasma manager. */
  int64_t timer;
  /** How many retries we have left for the request. Decremented on every
   * timeout. */
  int num_retries;
  /** Handle for a linked list. */
  client_object_request *next;
  /** Pointer to the array containing the manager locations of
   * this object. */
  char **manager_vector;
  /** The number of manager locations in the array manager_vector. */
  int manager_count;
  /** The next manager we should try to contact. This is set to an index in
   * manager_vector in the retry handler, in case the current attempt fails to
   * contact a manager. */
  int next_manager;
  /** Handle for the uthash table in the client connection
   * context that keeps track of active object connection
   * contexts. */
  UT_hash_handle active_hh;
  /** Handle for the uthash table in the manager state that
   * keeps track of outstanding fetch requests. */
  UT_hash_handle fetch_hh;
};

/* Context for a client connection to another plasma manager. */
struct client_connection {
  /** Current state for this plasma manager. This is shared
   * between all client connections to the plasma manager. */
  plasma_manager_state *manager_state;
  /** Current position in the buffer. */
  int64_t cursor;
  /** Buffer that this connection is reading from. If this is a connection to
   * write data to another plasma store, then it is a linked
   * list of buffers to write. */
  /* TODO(swang): Split into two queues, data transfers and data requests. */
  plasma_request_buffer *transfer_queue;
  /** Buffer used to receive transfers (data fetches) we want to ignore. */
  plasma_request_buffer *ignore_buffer;
  /** File descriptor for the socket connected to the other
   * plasma manager. */
  int fd;
  /** Timer id for timing out wait (or fetch). */
  int64_t timer_id;
  /** The objects that we are waiting for and their callback
   * contexts, for either a fetch or a wait operation. */
  client_object_request *active_objects;
  /** The number of objects that we have left to return for
   * this fetch or wait operation. */
  int num_return_objects;
  /** Fields specific to connections to plasma managers. Key that uniquely
   * identifies the plasma manager that we're connected to. We will use the
   * string <address>:<port> as an identifier. */
  char *ip_addr_port;
  /** Handle for the uthash table. */
  UT_hash_handle manager_hh;
};

object_wait_requests **object_wait_requests_table_ptr_from_type(
    plasma_manager_state *manager_state,
    int type) {
  /* We use different types of hash tables for different requests. */
  if (type == PLASMA_QUERY_LOCAL) {
    return &manager_state->object_wait_requests_local;
  } else if (type == PLASMA_QUERY_ANYWHERE) {
    return &manager_state->object_wait_requests_remote;
  } else {
    LOG_FATAL("This code should be unreachable.");
  }
}

void add_wait_request_for_object(plasma_manager_state *manager_state,
                                 object_id object_id,
                                 int type,
                                 wait_request *wait_req) {
  object_wait_requests **object_wait_requests_table_ptr =
      object_wait_requests_table_ptr_from_type(manager_state, type);
  object_wait_requests *object_wait_reqs;
  HASH_FIND(hh, *object_wait_requests_table_ptr, &object_id, sizeof(object_id),
            object_wait_reqs);
  /* If there are currently no wait requests involving this object ID, create a
   * new object_wait_requests struct for this object ID and add it to the hash
   * table. */
  if (object_wait_reqs == NULL) {
    object_wait_reqs = malloc(sizeof(object_wait_requests));
    object_wait_reqs->object_id = object_id;
    utarray_new(object_wait_reqs->wait_requests, &wait_request_icd);
    HASH_ADD(hh, *object_wait_requests_table_ptr, object_id,
             sizeof(object_wait_reqs->object_id), object_wait_reqs);
  }
  /* Add this wait request to the vector of wait requests involving this object
   * ID. */
  utarray_push_back(object_wait_reqs->wait_requests, &wait_req);
}

void remove_wait_request_for_object(plasma_manager_state *manager_state,
                                    object_id object_id,
                                    int type,
                                    wait_request *wait_req) {
  object_wait_requests **object_wait_requests_table_ptr =
      object_wait_requests_table_ptr_from_type(manager_state, type);
  object_wait_requests *object_wait_reqs;
  HASH_FIND(hh, *object_wait_requests_table_ptr, &object_id, sizeof(object_id),
            object_wait_reqs);
  /* If there is a vector of wait requests for this object ID, and if this
   * vector contains the wait request, then remove the wait request from the
   * vector. */
  if (object_wait_reqs != NULL) {
    for (int i = 0; i < utarray_len(object_wait_reqs->wait_requests); ++i) {
      wait_request **wait_req_ptr =
          (wait_request **) utarray_eltptr(object_wait_reqs->wait_requests, i);
      if (*wait_req_ptr == wait_req) {
        /* Remove the wait request from the array. */
        utarray_erase(object_wait_reqs->wait_requests, i, 1);
        break;
      }
    }
    /* In principle, if there are no more wait requests involving this object
     * ID, then we could remove the object_wait_reqs struct. However, the
     * object_wait_reqs struct gets removed in update_object_wait_requests. */
  }
}

void remove_wait_request(plasma_manager_state *manager_state,
                         wait_request *wait_req) {
  if (wait_req->timer != -1) {
    CHECK(event_loop_remove_timer(manager_state->loop, wait_req->timer) ==
          AE_OK);
  }
  free(wait_req->object_requests);
  free(wait_req);
}

void return_from_wait(plasma_manager_state *manager_state,
                      wait_request *wait_req) {
  /* Send the reply to the client. */
  warn_if_sigpipe(plasma_send_WaitReply(
                      wait_req->client_conn->fd, manager_state->builder,
                      wait_req->object_requests, wait_req->num_object_requests),
                  wait_req->client_conn->fd);
  /* Remove the wait request from each of the relevant object_wait_requests
   * hash tables if it is present there. */
  for (int i = 0; i < wait_req->num_object_requests; ++i) {
    remove_wait_request_for_object(manager_state,
                                   wait_req->object_requests[i].object_id,
                                   wait_req->object_requests[i].type, wait_req);
  }
  /* Remove the wait request. */
  remove_wait_request(manager_state, wait_req);
}

void update_object_wait_requests(plasma_manager_state *manager_state,
                                 object_id obj_id,
                                 int type,
                                 int status) {
  object_wait_requests **object_wait_requests_table_ptr =
      object_wait_requests_table_ptr_from_type(manager_state, type);
  /* Update the in-progress wait requests in the specified table. */
  object_wait_requests *object_wait_reqs;
  HASH_FIND(hh, *object_wait_requests_table_ptr, &obj_id, sizeof(obj_id),
            object_wait_reqs);
  if (object_wait_reqs != NULL) {
    /* We compute the number of requests first because the length of the
     * utarray will change as we iterate over it (because each call to
     * return_from_wait will remove one element). */
    int num_requests = utarray_len(object_wait_reqs->wait_requests);
    /* The argument index is the index of the current element of the utarray
     * that we are processing. It may differ from the counter i when elements
     * are removed from the array. */
    int index = 0;
    for (int i = 0; i < num_requests; ++i) {
      wait_request **wait_req_ptr = (wait_request **) utarray_eltptr(
          object_wait_reqs->wait_requests, index);
      wait_request *wait_req = *wait_req_ptr;
      wait_req->num_satisfied += 1;
      /* Mark the object as present in the wait request. */
      int j = 0;
      for (; j < wait_req->num_object_requests; ++j) {
        if (object_ids_equal(wait_req->object_requests[j].object_id, obj_id)) {
          /* Check that this object is currently nonexistent. */
          CHECK(wait_req->object_requests[j].status ==
                ObjectStatus_Nonexistent);
          wait_req->object_requests[j].status = status;
          break;
        }
      }
      /* Make sure that we actually marked an object as available. */
      CHECK(j != wait_req->num_object_requests);
      /* If this wait request is done, reply to the client. */
      if (wait_req->num_satisfied == wait_req->num_objects_to_wait_for) {
        return_from_wait(manager_state, wait_req);
      } else {
        /* The call to return_from_wait will remove the current element in the
         * array, so we only increment the counter in the else branch. */
        index += 1;
      }
    }
    DCHECK(index == utarray_len(object_wait_reqs->wait_requests));
    /* Remove the array of wait requests for this object, since no one should
     * be waiting for this object anymore. */
    HASH_DELETE(hh, *object_wait_requests_table_ptr, object_wait_reqs);
    utarray_free(object_wait_reqs->wait_requests);
    free(object_wait_reqs);
  }
}

fetch_request *create_fetch_request(plasma_manager_state *manager_state,
                                    object_id object_id) {
  fetch_request *fetch_req = malloc(sizeof(fetch_request));
  fetch_req->object_id = object_id;
  fetch_req->manager_count = 0;
  fetch_req->manager_vector = NULL;
  /* Initialize so that the first attempt contacts the first manager. */
  fetch_req->next_manager = 0;
  return fetch_req;
}

void remove_fetch_request(plasma_manager_state *manager_state,
                          fetch_request *fetch_req) {
  /* Remove the fetch request from the table of fetch requests. */
  HASH_DELETE(hh, manager_state->fetch_requests, fetch_req);
  /* Free the fetch request and everything in it. */
  for (int i = 0; i < fetch_req->manager_count; ++i) {
    free(fetch_req->manager_vector[i]);
  }
  if (fetch_req->manager_vector != NULL) {
    free(fetch_req->manager_vector);
  }
  free(fetch_req);
}

plasma_manager_state *init_plasma_manager_state(const char *store_socket_name,
                                                const char *manager_socket_name,
                                                const char *manager_addr,
                                                int manager_port,
                                                const char *db_addr,
                                                int db_port) {
  plasma_manager_state *state = malloc(sizeof(plasma_manager_state));
  state->loop = event_loop_create();
  state->plasma_conn =
      plasma_connect(store_socket_name, NULL, PLASMA_DEFAULT_RELEASE_DELAY);
  state->manager_connections = NULL;
  state->fetch_requests = NULL;
  state->object_wait_requests_local = NULL;
  state->object_wait_requests_remote = NULL;
  if (db_addr) {
    /* Construct the address:port string under which this manager can be
     * reached. */
    UT_string *manager_address_str;
    utstring_new(manager_address_str);
    utstring_printf(manager_address_str, "%s:%d", manager_addr, manager_port);

    int num_args = 6;
    const char **db_connect_args = malloc(sizeof(char *) * num_args);
    db_connect_args[0] = "store_socket_name";
    db_connect_args[1] = store_socket_name;
    db_connect_args[2] = "manager_socket_name";
    db_connect_args[3] = manager_socket_name;
    db_connect_args[4] = "address";
    db_connect_args[5] = utstring_body(manager_address_str);
    state->db = db_connect(db_addr, db_port, "plasma_manager", manager_addr,
                           num_args, db_connect_args);
    utstring_free(manager_address_str);
    free(db_connect_args);
    db_attach(state->db, state->loop, false);
  } else {
    state->db = NULL;
    LOG_DEBUG("No db connection specified");
  }
  state->addr = manager_addr;
  state->port = manager_port;
  /* Initialize an empty hash map for the cache of local available objects. */
  state->local_available_objects = NULL;
  /* Subscribe to notifications about sealed objects. */
  int plasma_fd = plasma_subscribe(state->plasma_conn);
  /* Add the callback that processes the notification to the event loop. */
  event_loop_add_file(state->loop, plasma_fd, EVENT_LOOP_READ,
                      process_object_notification, state);
  state->builder = make_protocol_builder();
  return state;
}
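
/* A minimal usage sketch (illustrative only; the socket paths, ports, and
 * addresses are hypothetical): bring up the manager state and run its event
 * loop. destroy_plasma_manager_state is defined below. */
#if 0
int example_main(void) {
  plasma_manager_state *state = init_plasma_manager_state(
      "/tmp/plasma_store_socket", "/tmp/plasma_manager_socket", "127.0.0.1",
      23894, "127.0.0.1", 6379);
  event_loop_run(state->loop);
  destroy_plasma_manager_state(state);
  return 0;
}
#endif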

void destroy_plasma_manager_state(plasma_manager_state *state) {
  client_connection *manager_conn, *tmp;
  HASH_ITER(manager_hh, state->manager_connections, manager_conn, tmp) {
    HASH_DELETE(manager_hh, state->manager_connections, manager_conn);
    plasma_request_buffer *head = manager_conn->transfer_queue;
    while (head) {
      LL_DELETE(manager_conn->transfer_queue, head);
      free(head);
      head = manager_conn->transfer_queue;
    }
    close(manager_conn->fd);
    free(manager_conn->ip_addr_port);
    free(manager_conn);
  }

  if (state->fetch_requests != NULL) {
    fetch_request *fetch_req, *tmp;
    HASH_ITER(hh, state->fetch_requests, fetch_req, tmp) {
      remove_fetch_request(state, fetch_req);
    }
  }

  plasma_disconnect(state->plasma_conn);
  event_loop_destroy(state->loop);
  free_protocol_builder(state->builder);
  free(state);
}

event_loop *get_event_loop(plasma_manager_state *state) {
  return state->loop;
}

/* Handle a command request that came in through a socket (transferring data,
 * or accepting incoming data). */
void process_message(event_loop *loop,
                     int client_sock,
                     void *context,
                     int events);

void write_object_chunk(client_connection *conn, plasma_request_buffer *buf) {
  LOG_DEBUG("Writing data to fd %d", conn->fd);
  ssize_t r, s;
  /* Try to write one BUFSIZE at a time. */
  s = buf->data_size + buf->metadata_size - conn->cursor;
  if (s > BUFSIZE) {
    s = BUFSIZE;
  }
  r = write(conn->fd, buf->data + conn->cursor, s);

  if (r != s) {
    if (r > 0) {
      LOG_ERROR("partial write on fd %d", conn->fd);
    } else {
      /* TODO(swang): This should not be a fatal error, since connections can
       * close at any time. */
      LOG_FATAL("write error");
    }
  } else {
    conn->cursor += r;
  }
  if (r == 0) {
    /* If we've finished writing this buffer, reset the cursor to zero. */
    LOG_DEBUG("writing on channel %d finished", conn->fd);
    conn->cursor = 0;
    /* We are done sending the object, so release it. The corresponding call to
     * plasma_get occurred in process_transfer_request. */
    plasma_release(conn->manager_state->plasma_conn, buf->object_id);
  }
}

void send_queued_request(event_loop *loop,
                         int data_sock,
                         void *context,
                         int events) {
  client_connection *conn = (client_connection *) context;
  plasma_manager_state *state = conn->manager_state;

  if (conn->transfer_queue == NULL) {
    /* If there are no objects to transfer, temporarily remove this connection
     * from the event loop. It will be reawoken when we receive another
     * data request. */
    event_loop_remove_file(loop, conn->fd);
    return;
  }

  plasma_request_buffer *buf = conn->transfer_queue;
  switch (buf->type) {
  case MessageType_PlasmaDataRequest:
    warn_if_sigpipe(
        plasma_send_DataRequest(conn->fd, state->builder, buf->object_id,
                                state->addr, state->port),
        conn->fd);
    break;
  case MessageType_PlasmaDataReply:
    LOG_DEBUG("Transferring object to manager");
    if (conn->cursor == 0) {
      /* If the cursor is zero, we haven't sent anything for this object yet,
       * so send the initial data reply. */
      warn_if_sigpipe(
          plasma_send_DataReply(conn->fd, state->builder, buf->object_id,
                                buf->data_size, buf->metadata_size),
          conn->fd);
    }
    write_object_chunk(conn, buf);
    break;
  default:
    LOG_FATAL("Buffered request has unknown type.");
  }

  /* We are done sending this request. */
  if (conn->cursor == 0) {
    LL_DELETE(conn->transfer_queue, buf);
    free(buf);
  }
}

int read_object_chunk(client_connection *conn, plasma_request_buffer *buf) {
  LOG_DEBUG("Reading data from fd %d to %p", conn->fd,
            buf->data + conn->cursor);
  ssize_t r, s;
  CHECK(buf != NULL);
  /* Try to read one BUFSIZE at a time. */
  s = buf->data_size + buf->metadata_size - conn->cursor;
  if (s > BUFSIZE) {
    s = BUFSIZE;
  }
  r = read(conn->fd, buf->data + conn->cursor, s);

  if (r == -1) {
    LOG_ERROR("read error");
  } else if (r == 0) {
    LOG_DEBUG("end of file");
  } else {
    conn->cursor += r;
  }
  /* If the cursor is equal to the full object size, reset the cursor and we're
   * done. */
  if (conn->cursor == buf->data_size + buf->metadata_size) {
    conn->cursor = 0;
    return 1;
  } else {
    return 0;
  }
}
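
/* Illustrative sketch (not compiled): how a transfer is driven to completion.
 * Both sides move conn->cursor through data_size + metadata_size bytes in
 * BUFSIZE chunks; read_object_chunk returns 1 exactly once, when the cursor
 * reaches the end and wraps back to zero. In the real manager each chunk is
 * read from a separate event-loop callback rather than a loop like this. */
#if 0
void example_drain_transfer(client_connection *conn,
                            plasma_request_buffer *buf) {
  while (!read_object_chunk(conn, buf)) {
    /* Keep reading BUFSIZE-sized chunks until the object is complete. */
  }
}
#endif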

void process_data_chunk(event_loop *loop,
                        int data_sock,
                        void *context,
                        int events) {
  /* Read the object chunk. */
  client_connection *conn = (client_connection *) context;
  plasma_request_buffer *buf = conn->transfer_queue;
  int done = read_object_chunk(conn, buf);
  if (!done) {
    return;
  }

  /* Seal the object and release it. The release corresponds to the call to
   * plasma_create that occurred in process_data_request. */
  LOG_DEBUG("reading on channel %d finished", data_sock);
  /* The following seal also triggers notification of clients for fetch or
   * wait requests, see process_object_notification. */
  plasma_seal(conn->manager_state->plasma_conn, buf->object_id);
  plasma_release(conn->manager_state->plasma_conn, buf->object_id);
  /* Remove the request buffer used for reading this object's data. */
  LL_DELETE(conn->transfer_queue, buf);
  free(buf);
  /* Switch to listening for requests from this socket, instead of reading
   * object data. */
  event_loop_remove_file(loop, data_sock);
  event_loop_add_file(loop, data_sock, EVENT_LOOP_READ, process_message, conn);
}

void ignore_data_chunk(event_loop *loop,
                       int data_sock,
                       void *context,
                       int events) {
  /* Read the object chunk. */
  client_connection *conn = (client_connection *) context;
  plasma_request_buffer *buf = conn->ignore_buffer;

  /* Just read the transferred data into the ignore buffer and then drop
   * (free) it. */
  int done = read_object_chunk(conn, buf);
  if (!done) {
    return;
  }

  free(buf->data);
  free(buf);
  /* Switch to listening for requests from this socket, instead of reading
   * object data. */
  event_loop_remove_file(loop, data_sock);
  event_loop_add_file(loop, data_sock, EVENT_LOOP_READ, process_message, conn);
}

client_connection *get_manager_connection(plasma_manager_state *state,
                                          const char *ip_addr,
                                          int port) {
  /* TODO(swang): Should probably check whether ip_addr and port belong to us.
   */
  UT_string *ip_addr_port;
  utstring_new(ip_addr_port);
  utstring_printf(ip_addr_port, "%s:%d", ip_addr, port);
  client_connection *manager_conn;
  HASH_FIND(manager_hh, state->manager_connections, utstring_body(ip_addr_port),
            utstring_len(ip_addr_port), manager_conn);
  if (!manager_conn) {
    /* If we don't already have a connection to this manager, start one. */
    int fd = connect_inet_sock_retry(ip_addr, port, -1, -1);
    /* TODO(swang): Handle the case when connection to this manager was
     * unsuccessful. */
    CHECK(fd >= 0);
    manager_conn = malloc(sizeof(client_connection));
    manager_conn->fd = fd;
    manager_conn->manager_state = state;
    manager_conn->transfer_queue = NULL;
    manager_conn->cursor = 0;
    manager_conn->ip_addr_port = strdup(utstring_body(ip_addr_port));
    HASH_ADD_KEYPTR(manager_hh,
                    manager_conn->manager_state->manager_connections,
                    manager_conn->ip_addr_port,
                    strlen(manager_conn->ip_addr_port), manager_conn);
  }
  utstring_free(ip_addr_port);
  return manager_conn;
}

void process_transfer_request(event_loop *loop,
                              object_id obj_id,
                              const char *addr,
                              int port,
                              client_connection *conn) {
  /* TODO(swang): A non-blocking plasma_get, or else we could block here
   * forever if we don't end up sealing this object. */
  /* The corresponding call to plasma_release will happen in
   * write_object_chunk. */
  /* TODO(rkn): The manager currently will block here if the object is not
   * present in the store. This is completely unacceptable. The manager should
   * do a non-blocking get call on the store, and if the object isn't there then
   * perhaps the manager should initiate the transfer when it receives a
   * notification from the store that the object is present. */
  object_buffer obj_buffer;
  int counter = 0;
  do {
    /* We pass in 0 to indicate that the command should return immediately. */
    object_id obj_id_array[1] = {obj_id};
    plasma_get(conn->manager_state->plasma_conn, obj_id_array, 1, 0,
               &obj_buffer);
    if (counter > 0) {
      LOG_WARN("Blocking in the plasma manager.");
    }
    counter += 1;
  } while (obj_buffer.data_size == -1);
  DCHECK(obj_buffer.metadata == obj_buffer.data + obj_buffer.data_size);
  plasma_request_buffer *buf = malloc(sizeof(plasma_request_buffer));
  buf->type = MessageType_PlasmaDataReply;
  buf->object_id = obj_id;
  /* We treat buf->data as a pointer to the concatenated data and metadata, so
   * we don't actually use buf->metadata. */
  buf->data = obj_buffer.data;
  buf->data_size = obj_buffer.data_size;
  buf->metadata_size = obj_buffer.metadata_size;

  client_connection *manager_conn =
      get_manager_connection(conn->manager_state, addr, port);

  if (manager_conn->transfer_queue == NULL) {
    /* If we already have a connection to this manager and it's inactive,
     * (re)register it with the event loop again. */
    event_loop_add_file(loop, manager_conn->fd, EVENT_LOOP_WRITE,
                        send_queued_request, manager_conn);
  }
  /* Add this transfer request to this connection's transfer queue if there
   * isn't already a request with the same object ID. */
  plasma_request_buffer *pending;
  LL_FOREACH(manager_conn->transfer_queue, pending) {
    if (object_ids_equal(pending->object_id, buf->object_id)) {
      /* A transfer of this object is already queued, so drop the duplicate
       * request and release the extra reference taken by plasma_get above. */
      plasma_release(conn->manager_state->plasma_conn, obj_id);
      free(buf);
      return;
    }
  }
  LL_APPEND(manager_conn->transfer_queue, buf);
}

/**
 * Receive object_id requested by this Plasma Manager from the remote Plasma
 * Manager identified by client_sock. The object_id is sent via the data reply
 * message.
 *
 * @param loop The event data structure.
 * @param client_sock The sender's socket.
 * @param object_id ID of the object being received.
 * @param data_size Size of the data of object_id.
 * @param metadata_size Size of the metadata of object_id.
 * @param conn The connection object.
 * @return Void.
 */
void process_data_request(event_loop *loop,
                          int client_sock,
                          object_id object_id,
                          int64_t data_size,
                          int64_t metadata_size,
                          client_connection *conn) {
  plasma_request_buffer *buf = malloc(sizeof(plasma_request_buffer));
  buf->object_id = object_id;
  buf->data_size = data_size;
  buf->metadata_size = metadata_size;

  /* The corresponding call to plasma_release should happen in
   * process_data_chunk. */
  int error_code = plasma_create(conn->manager_state->plasma_conn, object_id,
                                 data_size, NULL, metadata_size, &(buf->data));
  /* If error_code == PlasmaError_OK, a new object has been created.
   * Otherwise the object creation has failed, possibly due to an object with
   * the same ID already existing in the Plasma Store. */
  if (error_code == PlasmaError_OK) {
    /* Add buffer where the fetched data is to be stored to
     * conn->transfer_queue. */
    LL_APPEND(conn->transfer_queue, buf);
  }
  CHECK(conn->cursor == 0);

  /* Switch to reading the data from this socket, instead of listening for
   * other requests. */
  event_loop_remove_file(loop, client_sock);
  if (error_code == PlasmaError_OK) {
    event_loop_add_file(loop, client_sock, EVENT_LOOP_READ, process_data_chunk,
                        conn);
  } else {
    /* Since plasma_create() has failed, we ignore the data transfer. We will
     * receive this transfer into conn->ignore_buffer and then drop it.
     * Allocate memory for data and metadata, if needed. All memory associated
     * with the buffer will be freed in ignore_data_chunk(). */
    conn->ignore_buffer = buf;
    buf->data = (uint8_t *) malloc(buf->data_size + buf->metadata_size);
    event_loop_add_file(loop, client_sock, EVENT_LOOP_READ, ignore_data_chunk,
                        conn);
  }
}

void request_transfer_from(plasma_manager_state *manager_state,
                           object_id object_id) {
  fetch_request *fetch_req;
  HASH_FIND(hh, manager_state->fetch_requests, &object_id, sizeof(object_id),
            fetch_req);
  /* TODO(rkn): This probably can be NULL so we should remove this check, and
   * instead return in the case where there is no fetch request. */
  CHECK(fetch_req != NULL);

  CHECK(fetch_req->manager_count > 0);
  CHECK(fetch_req->next_manager >= 0 &&
        fetch_req->next_manager < fetch_req->manager_count);
  char addr[16];
  int port;
  parse_ip_addr_port(fetch_req->manager_vector[fetch_req->next_manager], addr,
                     &port);

  client_connection *manager_conn =
      get_manager_connection(manager_state, addr, port);

  /* Check that this manager isn't trying to request an object from itself.
   * TODO(rkn): Later this should not be fatal. */
  if (strcmp(addr, manager_state->addr) == 0 &&
      port == manager_state->port) {
    LOG_FATAL("This manager is attempting to request a transfer from itself.");
  }

  plasma_request_buffer *transfer_request =
      malloc(sizeof(plasma_request_buffer));
  transfer_request->type = MessageType_PlasmaDataRequest;
  transfer_request->object_id = fetch_req->object_id;

  if (manager_conn->transfer_queue == NULL) {
    /* If we already have a connection to this manager and it's inactive,
     * (re)register it with the event loop. */
    event_loop_add_file(manager_state->loop, manager_conn->fd, EVENT_LOOP_WRITE,
                        send_queued_request, manager_conn);
  }
  /* Add this transfer request to this connection's transfer queue. */
  LL_APPEND(manager_conn->transfer_queue, transfer_request);
  /* On the next attempt, try the next manager in manager_vector. */
  fetch_req->next_manager += 1;
  fetch_req->next_manager %= fetch_req->manager_count;
}
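
/* Illustrative sketch (not compiled): the next_manager arithmetic above walks
 * the known locations round-robin, so repeated retries for an object stored
 * on three managers contact them in the order 0, 1, 2, 0, 1, ... */
#if 0
void example_round_robin(fetch_request *fetch_req) {
  for (int attempt = 0; attempt < 5; ++attempt) {
    int target = fetch_req->next_manager;
    /* Same update as in request_transfer_from. */
    fetch_req->next_manager = (fetch_req->next_manager + 1) %
                              fetch_req->manager_count;
    (void) target;
  }
}
#endif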

int fetch_timeout_handler(event_loop *loop, timer_id id, void *context) {
  plasma_manager_state *manager_state = context;
  /* Loop over the fetch requests and reissue the requests. */
  fetch_request *fetch_req, *tmp;
  HASH_ITER(hh, manager_state->fetch_requests, fetch_req, tmp) {
    if (fetch_req->manager_count > 0) {
      request_transfer_from(manager_state, fetch_req->object_id);
    }
  }
  return MANAGER_TIMEOUT;
}

bool is_object_local(plasma_manager_state *state, object_id object_id) {
  available_object *entry;
  HASH_FIND(hh, state->local_available_objects, &object_id, sizeof(object_id),
            entry);
  return entry != NULL;
}

void request_transfer(object_id object_id,
                      int manager_count,
                      const char *manager_vector[],
                      void *context) {
  plasma_manager_state *manager_state = (plasma_manager_state *) context;
  /* This callback is called from object_table_subscribe, which guarantees that
   * the manager vector contains at least one element. */
  CHECK(manager_count >= 1);
  fetch_request *fetch_req;
  HASH_FIND(hh, manager_state->fetch_requests, &object_id, sizeof(object_id),
            fetch_req);

  if (is_object_local(manager_state, object_id)) {
    /* If the object is already here, then the fetch request should have been
     * removed. */
    CHECK(fetch_req == NULL);
    return;
  }

  /* If the object is not present, then the fetch request should still be here.
   * TODO(rkn): We actually have to remove this check to handle the rare
   * scenario where the object is transferred here and then evicted before this
   * callback gets called. */
  CHECK(fetch_req != NULL);

  /* This method may be run multiple times, so if we are updating the manager
   * vector, we need to free the previous manager vector. */
  if (fetch_req->manager_count != 0) {
    for (int i = 0; i < fetch_req->manager_count; ++i) {
      free(fetch_req->manager_vector[i]);
    }
    free(fetch_req->manager_vector);
  }
  /* Update the manager vector. */
  fetch_req->manager_count = manager_count;
  fetch_req->manager_vector = malloc(manager_count * sizeof(char *));
  fetch_req->next_manager = 0;
  memset(fetch_req->manager_vector, 0, manager_count * sizeof(char *));
  for (int i = 0; i < manager_count; ++i) {
    int len = strlen(manager_vector[i]);
    fetch_req->manager_vector[i] = malloc(len + 1);
    strncpy(fetch_req->manager_vector[i], manager_vector[i], len);
    fetch_req->manager_vector[i][len] = '\0';
  }
  /* Wait for the object data for the default number of retries, which time out
   * after a default interval. */
  request_transfer_from(manager_state, object_id);
}

/* This method is only called from the tests. */
void call_request_transfer(object_id object_id,
                           int manager_count,
                           const char *manager_vector[],
                           void *context) {
  plasma_manager_state *manager_state = (plasma_manager_state *) context;
  fetch_request *fetch_req;
  /* Check that there isn't already a fetch request for this object. */
  HASH_FIND(hh, manager_state->fetch_requests, &object_id, sizeof(object_id),
            fetch_req);
  CHECK(fetch_req == NULL);
  /* Create a fetch request. */
  fetch_req = create_fetch_request(manager_state, object_id);
  HASH_ADD(hh, manager_state->fetch_requests, object_id,
           sizeof(fetch_req->object_id), fetch_req);
  request_transfer(object_id, manager_count, manager_vector, context);
}

void fatal_table_callback(object_id id, void *user_context, void *user_data) {
  CHECK(0);
}

void object_present_callback(object_id object_id,
                             int manager_count,
                             const char *manager_vector[],
                             void *context) {
  plasma_manager_state *manager_state = (plasma_manager_state *) context;
  /* This callback is called from object_table_subscribe, which guarantees that
   * the manager vector contains at least one element. */
  CHECK(manager_count >= 1);

  /* Update the in-progress remote wait requests. */
  update_object_wait_requests(manager_state, object_id, PLASMA_QUERY_ANYWHERE,
                              ObjectStatus_Remote);
}

/* This callback is used by both fetch and wait. Therefore, it may have to
 * handle outstanding fetch and wait requests. */
void object_table_subscribe_callback(object_id object_id,
                                     int64_t data_size,
                                     int manager_count,
                                     const char *manager_vector[],
                                     void *context) {
  plasma_manager_state *manager_state = (plasma_manager_state *) context;
  /* Run the callback for fetch requests if there is a fetch request. */
  fetch_request *fetch_req;
  HASH_FIND(hh, manager_state->fetch_requests, &object_id, sizeof(object_id),
            fetch_req);
  if (fetch_req != NULL) {
    request_transfer(object_id, manager_count, manager_vector, context);
  }
  /* Run the callback for wait requests. */
  object_present_callback(object_id, manager_count, manager_vector, context);
}

void process_fetch_requests(client_connection *client_conn,
                            int num_object_ids,
                            object_id object_ids[]) {
  plasma_manager_state *manager_state = client_conn->manager_state;

  int num_object_ids_to_request = 0;
  /* This is allocating more space than necessary, but we do not know the exact
   * number of object IDs to request notifications for yet. */
  object_id *object_ids_to_request = malloc(num_object_ids * sizeof(object_id));

  for (int i = 0; i < num_object_ids; ++i) {
    object_id obj_id = object_ids[i];

    /* Check if this object is already present locally. If so, do nothing. */
    if (is_object_local(manager_state, obj_id)) {
      continue;
    }

    /* Check if this object is already being fetched. If so, do nothing. */
    fetch_request *entry;
    HASH_FIND(hh, manager_state->fetch_requests, &obj_id, sizeof(obj_id),
              entry);
    if (entry != NULL) {
      continue;
    }

    /* Add an entry to the fetch requests data structure to indicate that the
     * object is being fetched. */
    entry = create_fetch_request(manager_state, obj_id);
    HASH_ADD(hh, manager_state->fetch_requests, object_id,
             sizeof(entry->object_id), entry);
    /* Add this object ID to the list of object IDs to request notifications
     * for from the object table. */
    object_ids_to_request[num_object_ids_to_request] = obj_id;
    num_object_ids_to_request += 1;
  }
  if (num_object_ids_to_request > 0) {
    /* Request notifications from the object table when these object IDs become
     * available. The notifications will call the callback that was passed to
     * object_table_subscribe_to_notifications, which will initiate a transfer
     * of the object to this plasma manager. */
    retry_info retry;
    memset(&retry, 0, sizeof(retry));
    retry.num_retries = 0;
    retry.timeout = MANAGER_TIMEOUT;
    retry.fail_callback = fatal_table_callback;
    object_table_request_notifications(manager_state->db,
                                       num_object_ids_to_request,
                                       object_ids_to_request, &retry);
  }
  free(object_ids_to_request);
}

int wait_timeout_handler(event_loop *loop, timer_id id, void *context) {
  wait_request *wait_req = context;
  return_from_wait(wait_req->client_conn->manager_state, wait_req);
  return EVENT_LOOP_TIMER_DONE;
}
|
|
|
|
|
|
|
|
void process_wait_request(client_connection *client_conn,
|
2016-12-10 21:22:05 -08:00
|
|
|
int num_object_requests,
|
2016-12-01 02:15:21 -08:00
|
|
|
object_request object_requests[],
|
2016-12-10 21:22:05 -08:00
|
|
|
uint64_t timeout_ms,
|
2016-12-01 02:15:21 -08:00
|
|
|
int num_ready_objects) {
|
2016-12-09 19:26:11 -08:00
|
|
|
CHECK(client_conn != NULL);
|
|
|
|
plasma_manager_state *manager_state = client_conn->manager_state;
|
|
|
|
|
|
|
|
/* Create a wait request for this object. */
|
2016-12-10 21:22:05 -08:00
|
|
|
wait_request *wait_req = malloc(sizeof(wait_request));
|
|
|
|
memset(wait_req, 0, sizeof(wait_request));
|
2016-12-09 19:26:11 -08:00
|
|
|
wait_req->client_conn = client_conn;
|
|
|
|
wait_req->timer = -1;
|
|
|
|
wait_req->num_object_requests = num_object_requests;
|
|
|
|
wait_req->object_requests =
|
|
|
|
malloc(num_object_requests * sizeof(object_request));
|
|
|
|
for (int i = 0; i < num_object_requests; ++i) {
|
|
|
|
wait_req->object_requests[i].object_id = object_requests[i].object_id;
|
|
|
|
wait_req->object_requests[i].type = object_requests[i].type;
|
2016-12-20 14:46:25 -08:00
|
|
|
wait_req->object_requests[i].status = ObjectStatus_Nonexistent;
|
2016-12-09 19:26:11 -08:00
|
|
|
}
|
|
|
|
wait_req->num_objects_to_wait_for = num_ready_objects;
|
|
|
|
wait_req->num_satisfied = 0;
|
|
|
|
|
2016-12-18 18:19:02 -08:00
|
|
|
int num_object_ids_to_request = 0;
|
|
|
|
/* This is allocating more space than necessary, but we do not know the exact
|
|
|
|
* number of object IDs to request notifications for yet. */
|
|
|
|
object_id *object_ids_to_request =
|
|
|
|
malloc(num_object_requests * sizeof(object_id));
|
|
|
|
|
  for (int i = 0; i < num_object_requests; ++i) {
    object_id obj_id = object_requests[i].object_id;

    /* Check if this object is already present locally. If so, mark the object
     * as present. */
    if (is_object_local(manager_state, obj_id)) {
      wait_req->object_requests[i].status = ObjectStatus_Local;
      wait_req->num_satisfied += 1;
      continue;
    }

    /* Add the wait request to the relevant data structures. */
    add_wait_request_for_object(manager_state, obj_id,
                                wait_req->object_requests[i].type, wait_req);

    if (wait_req->object_requests[i].type == PLASMA_QUERY_LOCAL) {
      /* TODO(rkn): If desired, we could issue a fetch command here to retrieve
       * the object. */
    } else if (wait_req->object_requests[i].type == PLASMA_QUERY_ANYWHERE) {
      /* Add this object ID to the list of object IDs to request notifications
       * for from the object table. */
      object_ids_to_request[num_object_ids_to_request] = obj_id;
      num_object_ids_to_request += 1;
    } else {
      /* This code should be unreachable. */
      CHECK(0);
    }
  }

  /* If enough of the wait requests have already been satisfied, return to the
   * client. */
  if (wait_req->num_satisfied >= wait_req->num_objects_to_wait_for) {
    return_from_wait(manager_state, wait_req);
  } else {
    if (num_object_ids_to_request > 0) {
      /* Request notifications from the object table when these object IDs
       * become available. The notifications will call the callback that was
       * passed to object_table_subscribe_to_notifications, which will update
       * the wait request. */
      retry_info retry;
      memset(&retry, 0, sizeof(retry));
      retry.num_retries = 0;
      retry.timeout = MANAGER_TIMEOUT;
      retry.fail_callback = fatal_table_callback;
      object_table_request_notifications(manager_state->db,
                                         num_object_ids_to_request,
                                         object_ids_to_request, &retry);
    }

    /* Set a timer that will cause the wait request to return to the client. */
    wait_req->timer = event_loop_add_timer(manager_state->loop, timeout_ms,
                                           wait_timeout_handler, wait_req);
  }
  free(object_ids_to_request);
}

/**
 * Check whether a non-local object is stored on any remote node or not.
 *
 * @param object_id ID of the object whose status we require.
 * @param manager_count Number of remote nodes object_id is stored at. If
 *        manager_count > 0, then object_id exists on a remote node and its
 *        status is ObjectStatus_Remote. Otherwise, if manager_count == 0, the
 *        object doesn't exist in the system and its status is
 *        ObjectStatus_Nonexistent.
 * @param manager_vector Array containing the Plasma Managers running at the
 *        nodes where object_id is stored. Not used; it will be eventually
 *        deallocated.
 * @param context Client connection.
 * @return Void.
 */
void request_status_done(object_id object_id,
                         int manager_count,
                         const char *manager_vector[],
                         void *context) {
  client_connection *client_conn = (client_connection *) context;
  int status =
      request_status(object_id, manager_count, manager_vector, context);
  warn_if_sigpipe(plasma_send_StatusReply(client_conn->fd,
                                          client_conn->manager_state->builder,
                                          &object_id, &status, 1),
                  client_conn->fd);
}

int request_status(object_id object_id,
                   int manager_count,
                   const char *manager_vector[],
                   void *context) {
  client_connection *client_conn = (client_connection *) context;

  /* Return success immediately if we already have this object. */
  if (is_object_local(client_conn->manager_state, object_id)) {
    return ObjectStatus_Local;
  }

  /* Since the object is not stored locally, manager_count > 0 means that the
   * object is stored at a remote node. Otherwise, if manager_count == 0, the
   * object is not stored anywhere. */
  return (manager_count > 0 ? ObjectStatus_Remote : ObjectStatus_Nonexistent);
}

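/* Failure callback for the object table lookup issued by a status request. */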
void object_table_lookup_fail_callback(object_id object_id,
                                       void *user_context,
                                       void *user_data) {
  /* Fail for now. Later, we may want to send an ObjectStatus_Nonexistent to
   * the client. */
  CHECK(0);
}

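/* Reply to a client's status request for object_id. If the object is local,
 * or if there is no state database to consult, reply immediately; otherwise
 * look the object up in the object table and reply from request_status_done. */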
void process_status_request(client_connection *client_conn,
                            object_id object_id) {
  /* Return success immediately if we already have this object. */
  if (is_object_local(client_conn->manager_state, object_id)) {
    int status = ObjectStatus_Local;
    warn_if_sigpipe(plasma_send_StatusReply(client_conn->fd,
                                            client_conn->manager_state->builder,
                                            &object_id, &status, 1),
                    client_conn->fd);
    return;
  }

  if (client_conn->manager_state->db == NULL) {
    int status = ObjectStatus_Nonexistent;
    warn_if_sigpipe(plasma_send_StatusReply(client_conn->fd,
                                            client_conn->manager_state->builder,
                                            &object_id, &status, 1),
                    client_conn->fd);
    return;
  }

  /* The object is not local, so check whether it is stored remotely. */
  retry_info retry = {
      .num_retries = NUM_RETRIES,
      .timeout = MANAGER_TIMEOUT,
      .fail_callback = object_table_lookup_fail_callback,
  };

  object_table_lookup(client_conn->manager_state->db, object_id, &retry,
                      request_status_done, client_conn);
}

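/* Handle a notification from the local plasma store that an object has been
 * deleted: drop it from the set of locally available objects and remove the
 * corresponding entry from the global object table. */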
void process_delete_object_notification(plasma_manager_state *state,
                                        object_info object_info) {
  object_id obj_id = object_info.obj_id;
  available_object *entry;
  HASH_FIND(hh, state->local_available_objects, &obj_id, sizeof(obj_id), entry);
  if (entry != NULL) {
    HASH_DELETE(hh, state->local_available_objects, entry);
    free(entry);
  }

  /* Remove this object from the (redis) object table. */
  if (state->db) {
    retry_info retry = {
        .num_retries = NUM_RETRIES,
        .timeout = MANAGER_TIMEOUT,
        .fail_callback = NULL,
    };
    object_table_remove(state->db, obj_id, NULL, &retry, NULL, NULL);
  }

  /* NOTE: There could be pending wait requests for this object that will now
   * return when the object is not actually available. For simplicity, we allow
   * this scenario rather than try to keep the wait request statuses exactly
   * up-to-date. */
}

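/* Handle a notification from the local plasma store that an object has become
 * available: record it locally, publish it to the global object table, and
 * resolve any fetch or wait requests that were waiting on it. */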
void process_add_object_notification(plasma_manager_state *state,
                                     object_info object_info) {
  object_id obj_id = object_info.obj_id;
  available_object *entry =
      (available_object *) malloc(sizeof(available_object));
  entry->object_id = obj_id;
  HASH_ADD(hh, state->local_available_objects, object_id, sizeof(object_id),
           entry);

  /* Add this object to the (redis) object table. */
  if (state->db) {
    /* TODO(swang): Log the error if we fail to add the object, and possibly
     * retry later? */
    retry_info retry = {
        .num_retries = NUM_RETRIES,
        .timeout = MANAGER_TIMEOUT,
        .fail_callback = NULL,
    };
    object_table_add(state->db, obj_id,
                     object_info.data_size + object_info.metadata_size,
                     object_info.digest, &retry, NULL, NULL);
  }

  /* If we were trying to fetch this object, finish up the fetch request. */
  fetch_request *fetch_req;
  HASH_FIND(hh, state->fetch_requests, &obj_id, sizeof(obj_id), fetch_req);
  if (fetch_req != NULL) {
    remove_fetch_request(state, fetch_req);
    /* TODO(rkn): We also really should unsubscribe from the object table. */
  }

  /* Update the in-progress local and remote wait requests. */
  update_object_wait_requests(state, obj_id, PLASMA_QUERY_LOCAL,
                              ObjectStatus_Local);
  update_object_wait_requests(state, obj_id, PLASMA_QUERY_ANYWHERE,
                              ObjectStatus_Local);
}

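/* Event loop callback fired when the plasma store pushes an object
 * notification over the notification socket. */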
void process_object_notification(event_loop *loop,
                                 int client_sock,
                                 void *context,
                                 int events) {
  plasma_manager_state *state = context;
  object_info object_info;
  /* Read the notification from Plasma. */
  int error =
      read_bytes(client_sock, (uint8_t *) &object_info, sizeof(object_info));
  if (error < 0) {
    /* The store has closed the socket. */
    LOG_DEBUG(
        "The plasma store has closed the object notification socket, or some "
        "other error has occurred.");
    event_loop_remove_file(loop, client_sock);
    close(client_sock);
    return;
  }
  /* Add the object to, or remove it from, the set of locally available
   * objects. */
  if (object_info.is_deletion) {
    process_delete_object_notification(state, object_info);
  } else {
    process_add_object_notification(state, object_info);
  }
}

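/* Dispatch an incoming message from a worker or from a remote plasma manager
 * to the appropriate handler, based on the message type. */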
void process_message(event_loop *loop,
                     int client_sock,
                     void *context,
                     int events) {
  client_connection *conn = (client_connection *) context;

  int64_t length;
  int64_t type;
  uint8_t *data;
  read_message(client_sock, &type, &length, &data);

  switch (type) {
  case MessageType_PlasmaDataRequest: {
    LOG_DEBUG("Processing data request");
    object_id object_id;
    char *address;
    int port;
    plasma_read_DataRequest(data, &object_id, &address, &port);
    process_transfer_request(loop, object_id, address, port, conn);
    free(address);
  } break;
  case MessageType_PlasmaDataReply: {
    LOG_DEBUG("Processing data reply");
    object_id object_id;
    int64_t object_size;
    int64_t metadata_size;
    plasma_read_DataReply(data, &object_id, &object_size, &metadata_size);
    process_data_request(loop, client_sock, object_id, object_size,
                         metadata_size, conn);
  } break;
  case MessageType_PlasmaFetchRequest: {
    LOG_DEBUG("Processing fetch remote");
    int64_t num_objects = plasma_read_FetchRequest_num_objects(data);
    object_id *object_ids_to_fetch = malloc(num_objects * sizeof(object_id));
    /* TODO(pcm): process_fetch_requests allocates an array of num_objects
     * object_ids too so these should be shared in the future. */
    plasma_read_FetchRequest(data, object_ids_to_fetch, num_objects);
    process_fetch_requests(conn, num_objects, &object_ids_to_fetch[0]);
    free(object_ids_to_fetch);
  } break;
  case MessageType_PlasmaWaitRequest: {
    LOG_DEBUG("Processing wait");
    int num_object_ids = plasma_read_WaitRequest_num_object_ids(data);
    object_request *object_requests =
        malloc(num_object_ids * sizeof(object_request));
    int64_t timeout_ms;
    int num_ready_objects;
    plasma_read_WaitRequest(data, &object_requests[0], num_object_ids,
                            &timeout_ms, &num_ready_objects);
    /* TODO(pcm): process_wait_requests allocates an array of num_object_ids
     * object_requests too so these could be shared in the future. */
    process_wait_request(conn, num_object_ids, &object_requests[0], timeout_ms,
                         num_ready_objects);
    free(object_requests);
  } break;
  case MessageType_PlasmaStatusRequest: {
    LOG_DEBUG("Processing status");
    object_id object_id;
    int64_t num_objects = plasma_read_StatusRequest_num_objects(data);
    CHECK(num_objects == 1);
    plasma_read_StatusRequest(data, &object_id, 1);
    process_status_request(conn, object_id);
  } break;
  case DISCONNECT_CLIENT: {
    LOG_INFO("Disconnecting client on fd %d", client_sock);
    /* TODO(swang): Check if this connection was to a plasma manager. If so,
     * delete it. */
    event_loop_remove_file(loop, client_sock);
    close(client_sock);
    free(conn);
  } break;
  default:
    LOG_FATAL("invalid request %" PRId64, type);
  }
  free(data);
}

/* TODO(pcm): Split this into two methods: new_worker_connection
 * and new_manager_connection and also split client_connection
 * into two structs, one for workers and one for other plasma managers. */
client_connection *new_client_connection(event_loop *loop,
                                         int listener_sock,
                                         void *context,
                                         int events) {
  int new_socket = accept_client(listener_sock);
  /* Create a new data connection context per client. */
  client_connection *conn = malloc(sizeof(client_connection));
  conn->manager_state = (plasma_manager_state *) context;
  conn->cursor = 0;
  conn->transfer_queue = NULL;
  conn->fd = new_socket;
  conn->active_objects = NULL;
  conn->num_return_objects = 0;
  event_loop_add_file(loop, new_socket, EVENT_LOOP_READ, process_message, conn);
  LOG_DEBUG("New client connection with fd %d", new_socket);
  return conn;
}

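/* Accept a new connection on the given listener socket and register it with
 * the event loop. */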
void handle_new_client(event_loop *loop,
                       int listener_sock,
                       void *context,
                       int events) {
  (void) new_client_connection(loop, listener_sock, context, events);
}

int get_client_sock(client_connection *conn) {
  return conn->fd;
}

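/* Bring up the plasma manager: bind its local and remote listening sockets,
 * connect to the plasma store and (optionally) the state database, and run
 * the event loop. */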
void start_server(const char *store_socket_name,
                  const char *manager_socket_name,
                  const char *master_addr,
                  int port,
                  const char *db_addr,
                  int db_port) {
  /* Ignore SIGPIPE signals. If we don't do this, then when we attempt to write
   * to a client that has already died, the manager could die. */
  signal(SIGPIPE, SIG_IGN);
  /* Bind the sockets before we try to connect to the plasma store.
   * In case the bind does not succeed, we want to be able to exit
   * without breaking the pipe to the store. */
  int remote_sock = bind_inet_sock(port, false);
  if (remote_sock < 0) {
    exit(EXIT_COULD_NOT_BIND_PORT);
  }

  int local_sock = bind_ipc_sock(manager_socket_name, false);
  CHECKM(local_sock >= 0, "Unable to bind local manager socket");

  g_manager_state =
      init_plasma_manager_state(store_socket_name, manager_socket_name,
                                master_addr, port, db_addr, db_port);
  CHECK(g_manager_state);

  CHECK(listen(remote_sock, 5) != -1);
  CHECK(listen(local_sock, 5) != -1);

  LOG_DEBUG("Started server connected to store %s, listening on port %d",
            store_socket_name, port);
  event_loop_add_file(g_manager_state->loop, local_sock, EVENT_LOOP_READ,
                      handle_new_client, g_manager_state);
  event_loop_add_file(g_manager_state->loop, remote_sock, EVENT_LOOP_READ,
                      handle_new_client, g_manager_state);
  /* Set up a client-specific channel to receive notifications from the object
   * table. */
  object_table_subscribe_to_notifications(g_manager_state->db, false,
                                          object_table_subscribe_callback,
                                          g_manager_state, NULL, NULL, NULL);
  /* Set up a recurring timer that will loop through the outstanding fetch
   * requests and reissue requests for transfers of those objects. */
  event_loop_add_timer(g_manager_state->loop, MANAGER_TIMEOUT,
                       fetch_timeout_handler, g_manager_state);
  /* Run the event loop. */
  event_loop_run(g_manager_state->loop);
}

/* Report "success" to valgrind. */
|
|
|
|
void signal_handler(int signal) {
|
|
|
|
if (signal == SIGTERM) {
|
2016-10-18 18:20:59 -07:00
|
|
|
if (g_manager_state) {
|
|
|
|
db_disconnect(g_manager_state->db);
|
|
|
|
}
|
2016-10-11 17:58:14 -07:00
|
|
|
exit(0);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2016-10-28 11:56:16 -07:00
|
|
|
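/* For reference, a manager might be launched along these lines (illustrative
 * values only; the binary name, socket paths, IP address, and ports depend on
 * the deployment):
 *
 *   plasma_manager -s /tmp/plasma_store_socket \
 *                  -m /tmp/plasma_manager_socket \
 *                  -h 127.0.0.1 -p 23894 -r 127.0.0.1:6379 */
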
/* Only declare the main function if we are not in testing mode, since the test
 * suite has its own declaration of main. */
#ifndef PLASMA_TEST
int main(int argc, char *argv[]) {
  signal(SIGTERM, signal_handler);
  /* Socket name of the plasma store this manager is connected to. */
  char *store_socket_name = NULL;
  /* Socket name this manager will bind to. */
  char *manager_socket_name = NULL;
  /* IP address of this node. */
  char *master_addr = NULL;
  /* Port number the manager should use. */
  int port = -1;
  /* IP address and port of the state database. */
  char *db_host = NULL;
  int c;
  while ((c = getopt(argc, argv, "s:m:h:p:r:")) != -1) {
    switch (c) {
    case 's':
      store_socket_name = optarg;
      break;
    case 'm':
      manager_socket_name = optarg;
      break;
    case 'h':
      master_addr = optarg;
      break;
    case 'p':
      port = atoi(optarg);
      break;
    case 'r':
      db_host = optarg;
      break;
    default:
      LOG_FATAL("unknown option %c", c);
    }
  }
  if (!store_socket_name) {
    LOG_FATAL(
        "please specify the socket for connecting to the plasma store with "
        "the -s switch");
  }
  if (!manager_socket_name) {
    LOG_FATAL(
        "please specify the name of the manager's local socket with the -m "
        "switch");
  }
  if (!master_addr) {
    LOG_FATAL(
        "please specify the IP address of the current host in the format "
        "123.456.789.10 with the -h switch");
  }
  if (port == -1) {
    LOG_FATAL(
        "please specify the port the plasma manager should listen on in the "
        "format 12345 with the -p switch");
  }
  char db_addr[16];
  int db_port;
  if (db_host) {
    parse_ip_addr_port(db_host, db_addr, &db_port);
    start_server(store_socket_name, manager_socket_name, master_addr, port,
                 db_addr, db_port);
  } else {
    start_server(store_socket_name, manager_socket_name, master_addr, port,
                 NULL, 0);
  }
}
#endif