Convert plasma/plasma_store.cc to use STL (#324)

* Change plasma_store.c to C++ (clobbering existing FlatBuffers usage).

* Convert plasma_store.cc to use STL (with a caveat)

* Fix CMakeLists and mutation-while-iterating problem

* Remove extra extern "C" declarations

* Remove redundant -std=c++11 from plasma/CMakeLists.txt
This commit is contained in:
Richard Shin 2017-04-01 13:58:10 +08:00 committed by Philipp Moritz
parent f1b48f2fd4
commit 227c916c25
2 changed files with 112 additions and 156 deletions

View file

@ -5,8 +5,6 @@ project(plasma)
# Recursively include common # Recursively include common
include(${CMAKE_CURRENT_LIST_DIR}/../common/cmake/Common.cmake) include(${CMAKE_CURRENT_LIST_DIR}/../common/cmake/Common.cmake)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11")
if(APPLE) if(APPLE)
SET(CMAKE_SHARED_LIBRARY_SUFFIX ".so") SET(CMAKE_SHARED_LIBRARY_SUFFIX ".so")
endif(APPLE) endif(APPLE)
@ -14,6 +12,7 @@ endif(APPLE)
include_directories("${PYTHON_INCLUDE_DIRS}" thirdparty) include_directories("${PYTHON_INCLUDE_DIRS}" thirdparty)
set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} --std=c99 -D_XOPEN_SOURCE=500 -D_POSIX_C_SOURCE=200809L") set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} --std=c99 -D_XOPEN_SOURCE=500 -D_POSIX_C_SOURCE=200809L")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} --std=c++11 -D_XOPEN_SOURCE=500 -D_POSIX_C_SOURCE=200809L")
# Compile flatbuffers # Compile flatbuffers

View file

@ -25,6 +25,9 @@
#include <limits.h> #include <limits.h>
#include <poll.h> #include <poll.h>
#include <unordered_map>
#include <vector>
#include "common.h" #include "common.h"
#include "format/common_generated.h" #include "format/common_generated.h"
#include "event_loop.h" #include "event_loop.h"
@ -44,8 +47,24 @@ void *dlmemalign(size_t alignment, size_t bytes);
void dlfree(void *); void dlfree(void *);
} }
namespace std {
template <>
struct hash<UniqueID> {
size_t operator()(const UniqueID &unique_id) const {
return *reinterpret_cast<const size_t *>(unique_id.id + UNIQUE_ID_SIZE -
sizeof(size_t));
}
};
} // namespace std
bool operator==(UniqueID a, UniqueID b) {
return UNIQUE_ID_EQ(a, b);
}
/** Contains all information that is associated with a Plasma store client. */ /** Contains all information that is associated with a Plasma store client. */
struct Client { struct Client {
Client(int sock, PlasmaStoreState *plasma_state);
/** The socket used to communicate with the client. */ /** The socket used to communicate with the client. */
int sock; int sock;
/** A pointer to the global plasma state. */ /** A pointer to the global plasma state. */
@ -70,46 +89,35 @@ typedef struct {
UT_hash_handle hh; UT_hash_handle hh;
} NotificationQueue; } NotificationQueue;
typedef struct { struct GetRequest {
GetRequest(Client *client, int num_object_ids, ObjectID object_ids[]);
/** The client connection that called get. */ /** The client connection that called get. */
Client *client; Client *client;
/** The ID of the timer that will time out and cause this wait to return to /** The ID of the timer that will time out and cause this wait to return to
* the client if it hasn't already returned. */ * the client if it hasn't already returned. */
int64_t timer; int64_t timer;
/** The number of objects in this get request. */
int64_t num_object_ids;
/** The object IDs involved in this request. This is used in the reply. */ /** The object IDs involved in this request. This is used in the reply. */
ObjectID *object_ids; std::vector<ObjectID> object_ids;
/** The object information for the objects in this request. This is used in /** The object information for the objects in this request. This is used in
* the reply. */ * the reply. */
PlasmaObject *objects; std::vector<PlasmaObject> objects;
/** The minimum number of objects to wait for in this request. */ /** The minimum number of objects to wait for in this request. */
int64_t num_objects_to_wait_for; int64_t num_objects_to_wait_for;
/** The number of object requests in this wait request that are already /** The number of object requests in this wait request that are already
* satisfied. */ * satisfied. */
int64_t num_satisfied; int64_t num_satisfied;
} GetRequest; };
typedef struct {
/** The ID of the object. This is used as a key in a hash table. */
ObjectID object_id;
/** An array of the get requests involving this object ID. */
UT_array *get_requests;
/** Handle for the uthash table in the store state that keeps track of the get
* requests involving this object ID. */
UT_hash_handle hh;
} ObjectGetRequests;
/** This is used to define the utarray of get requests in the
* ObjectGetRequests struct. */
UT_icd get_request_icd = {sizeof(GetRequest *), NULL, NULL, NULL};
struct PlasmaStoreState { struct PlasmaStoreState {
PlasmaStoreState(event_loop *loop, int64_t system_memory);
/* Event loop of the plasma store. */ /* Event loop of the plasma store. */
event_loop *loop; event_loop *loop;
/** A hash table mapping object IDs to a vector of the get requests that are /** A hash table mapping object IDs to a vector of the get requests that are
* waiting for the object to arrive. */ * waiting for the object to arrive. */
ObjectGetRequests *object_get_requests; std::unordered_map<ObjectID, std::vector<GetRequest *>> object_get_requests;
/** The pending notifications that have not been sent to subscribers because /** The pending notifications that have not been sent to subscribers because
* the socket send buffers were full. This is a hash table from client file * the socket send buffers were full. This is a hash table from client file
* descriptor to an array of object_ids to send to that client. */ * descriptor to an array of object_ids to send to that client. */
@ -130,23 +138,29 @@ PlasmaStoreState *g_state;
UT_icd byte_icd = {sizeof(uint8_t), NULL, NULL, NULL}; UT_icd byte_icd = {sizeof(uint8_t), NULL, NULL, NULL};
PlasmaStoreState *PlasmaStoreState_init(event_loop *loop, Client::Client(int sock, PlasmaStoreState *plasma_state)
int64_t system_memory) { : sock(sock), plasma_state(plasma_state) {}
PlasmaStoreState *state =
(PlasmaStoreState *) malloc(sizeof(PlasmaStoreState)); GetRequest::GetRequest(Client *client,
state->loop = loop; int num_object_ids,
state->object_get_requests = NULL; ObjectID object_ids[])
state->pending_notifications = NULL; : client(client),
/* Initialize the plasma store info. */ timer(-1),
state->plasma_store_info = object_ids(object_ids, object_ids + num_object_ids),
(PlasmaStoreInfo *) malloc(sizeof(PlasmaStoreInfo)); objects(num_object_ids),
state->plasma_store_info->objects = NULL; num_objects_to_wait_for(num_object_ids),
state->plasma_store_info->memory_capacity = system_memory; num_satisfied(0) {}
/* Initialize the eviction state. */
state->eviction_state = EvictionState_init(); PlasmaStoreState::PlasmaStoreState(event_loop *loop, int64_t system_memory)
utarray_new(state->input_buffer, &byte_icd); : loop(loop),
state->builder = make_protocol_builder(); pending_notifications(NULL),
return state; plasma_store_info((PlasmaStoreInfo *) malloc(sizeof(PlasmaStoreInfo))),
eviction_state(EvictionState_init()),
builder(make_protocol_builder()) {
this->plasma_store_info->objects = NULL;
this->plasma_store_info->memory_capacity = system_memory;
utarray_new(this->input_buffer, &byte_icd);
} }
void PlasmaStoreState_free(PlasmaStoreState *state) { void PlasmaStoreState_free(PlasmaStoreState *state) {
@ -279,55 +293,27 @@ int create_object(Client *client_context,
void add_get_request_for_object(PlasmaStoreState *store_state, void add_get_request_for_object(PlasmaStoreState *store_state,
ObjectID object_id, ObjectID object_id,
GetRequest *get_req) { GetRequest *get_req) {
ObjectGetRequests *object_get_reqs; store_state->object_get_requests[object_id].push_back(get_req);
HASH_FIND(hh, store_state->object_get_requests, &object_id, sizeof(object_id),
object_get_reqs);
/* If there are currently no get requests involving this object ID, create a
* new ObjectGetRequests struct for this object ID and add it to the hash
* table. */
if (object_get_reqs == NULL) {
object_get_reqs = (ObjectGetRequests *) malloc(sizeof(ObjectGetRequests));
object_get_reqs->object_id = object_id;
utarray_new(object_get_reqs->get_requests, &get_request_icd);
HASH_ADD(hh, store_state->object_get_requests, object_id,
sizeof(object_get_reqs->object_id), object_get_reqs);
}
/* Add this get request to the vector of get requests involving this object
* ID. */
utarray_push_back(object_get_reqs->get_requests, &get_req);
} }
void remove_get_request_for_object(PlasmaStoreState *store_state, void remove_get_request_for_object(PlasmaStoreState *store_state,
ObjectID object_id, ObjectID object_id,
GetRequest *get_req) { GetRequest *get_req) {
ObjectGetRequests *object_get_reqs; std::vector<GetRequest *> &get_requests =
HASH_FIND(hh, store_state->object_get_requests, &object_id, sizeof(object_id), store_state->object_get_requests[object_id];
object_get_reqs); for (auto it = get_requests.begin(); it != get_requests.end(); ++it) {
/* If there is a vector of get requests for this object ID, and if this vector if (*it == get_req) {
* contains the get request, then remove the get request from the vector. */ get_requests.erase(it);
if (object_get_reqs != NULL) {
for (int i = 0; i < utarray_len(object_get_reqs->get_requests); ++i) {
GetRequest **get_req_ptr =
(GetRequest **) utarray_eltptr(object_get_reqs->get_requests, i);
if (*get_req_ptr == get_req) {
/* Remove the get request from the array. */
utarray_erase(object_get_reqs->get_requests, i, 1);
break; break;
} }
} }
/* In principle, if there are no more get requests involving this object ID,
* then we could remove the object_get_reqs struct. However, the
* object_get_reqs struct gets removed in update_object_get_requests. */
}
} }
void remove_get_request(PlasmaStoreState *store_state, GetRequest *get_req) { void remove_get_request(PlasmaStoreState *store_state, GetRequest *get_req) {
if (get_req->timer != -1) { if (get_req->timer != -1) {
CHECK(event_loop_remove_timer(store_state->loop, get_req->timer) == AE_OK); CHECK(event_loop_remove_timer(store_state->loop, get_req->timer) == AE_OK);
} }
free(get_req->object_ids); delete get_req;
free(get_req->objects);
free(get_req);
} }
void PlasmaObject_init(PlasmaObject *object, object_table_entry *entry) { void PlasmaObject_init(PlasmaObject *object, object_table_entry *entry) {
@ -344,20 +330,19 @@ void PlasmaObject_init(PlasmaObject *object, object_table_entry *entry) {
void return_from_get(PlasmaStoreState *store_state, GetRequest *get_req) { void return_from_get(PlasmaStoreState *store_state, GetRequest *get_req) {
/* Send the get reply to the client. */ /* Send the get reply to the client. */
int status = plasma_send_GetReply(get_req->client->sock, store_state->builder, int status = plasma_send_GetReply(
get_req->object_ids, get_req->objects, get_req->client->sock, store_state->builder, &get_req->object_ids[0],
get_req->num_object_ids); &get_req->objects[0], get_req->object_ids.size());
warn_if_sigpipe(status, get_req->client->sock); warn_if_sigpipe(status, get_req->client->sock);
/* If we successfully sent the get reply message to the client, then also send /* If we successfully sent the get reply message to the client, then also send
* the file descriptors. */ * the file descriptors. */
if (status >= 0) { if (status >= 0) {
/* Send all of the file descriptors for the present objects. */ /* Send all of the file descriptors for the present objects. */
for (int i = 0; i < get_req->num_object_ids; ++i) { for (PlasmaObject &object : get_req->objects) {
/* We use the data size to indicate whether the object is present or not. /* We use the data size to indicate whether the object is present or not.
*/ */
if (get_req->objects[i].data_size != -1) { if (object.data_size != -1) {
int error_code = int error_code = send_fd(get_req->client->sock, object.handle.store_fd);
send_fd(get_req->client->sock, get_req->objects[i].handle.store_fd);
/* If we failed to send the file descriptor, loop until we have sent it /* If we failed to send the file descriptor, loop until we have sent it
* successfully. TODO(rkn): This is problematic for two reasons. First * successfully. TODO(rkn): This is problematic for two reasons. First
* of all, sending the file descriptor should just succeed without any * of all, sending the file descriptor should just succeed without any
@ -367,8 +352,7 @@ void return_from_get(PlasmaStoreState *store_state, GetRequest *get_req) {
while (error_code < 0) { while (error_code < 0) {
if (errno == EMSGSIZE) { if (errno == EMSGSIZE) {
LOG_WARN("Failed to send file descriptor, retrying."); LOG_WARN("Failed to send file descriptor, retrying.");
error_code = send_fd(get_req->client->sock, error_code = send_fd(get_req->client->sock, object.handle.store_fd);
get_req->objects[i].handle.store_fd);
continue; continue;
} }
warn_if_sigpipe(error_code, get_req->client->sock); warn_if_sigpipe(error_code, get_req->client->sock);
@ -381,8 +365,8 @@ void return_from_get(PlasmaStoreState *store_state, GetRequest *get_req) {
/* Remove the get request from each of the relevant object_get_requests hash /* Remove the get request from each of the relevant object_get_requests hash
* tables if it is present there. It should only be present there if the get * tables if it is present there. It should only be present there if the get
* request timed out. */ * request timed out. */
for (int i = 0; i < get_req->num_object_ids; ++i) { for (ObjectID &object_id : get_req->object_ids) {
remove_get_request_for_object(store_state, get_req->object_ids[i], get_req); remove_get_request_for_object(store_state, object_id, get_req);
} }
/* Remove the get request. */ /* Remove the get request. */
remove_get_request(store_state, get_req); remove_get_request(store_state, get_req);
@ -390,24 +374,12 @@ void return_from_get(PlasmaStoreState *store_state, GetRequest *get_req) {
void update_object_get_requests(PlasmaStoreState *store_state, void update_object_get_requests(PlasmaStoreState *store_state,
ObjectID obj_id) { ObjectID obj_id) {
/* Update the in-progress get requests. */ std::vector<GetRequest *> &get_requests =
ObjectGetRequests *object_get_reqs; store_state->object_get_requests[obj_id];
HASH_FIND(hh, store_state->object_get_requests, &obj_id, sizeof(obj_id),
object_get_reqs);
if (object_get_reqs != NULL) {
/* We compute the number of requests first because the length of the utarray
* will change as we iterate over it (because each call to return_from_get
* will remove one element). */
int num_requests = utarray_len(object_get_reqs->get_requests);
/* The argument index is the index of the current element of the utarray
* that we are processing. It may differ from the counter i when elements
* are removed from the array. */
int index = 0; int index = 0;
int num_requests = get_requests.size();
for (int i = 0; i < num_requests; ++i) { for (int i = 0; i < num_requests; ++i) {
GetRequest **get_req_ptr = GetRequest *get_req = get_requests[index];
(GetRequest **) utarray_eltptr(object_get_reqs->get_requests, index);
GetRequest *get_req = *get_req_ptr;
int num_updated = 0; int num_updated = 0;
for (int j = 0; j < get_req->num_objects_to_wait_for; ++j) { for (int j = 0; j < get_req->num_objects_to_wait_for; ++j) {
object_table_entry *entry; object_table_entry *entry;
@ -439,13 +411,11 @@ void update_object_get_requests(PlasmaStoreState *store_state,
index += 1; index += 1;
} }
} }
DCHECK(index == utarray_len(object_get_reqs->get_requests));
DCHECK(index == get_requests.size());
/* Remove the array of get requests for this object, since no one should be /* Remove the array of get requests for this object, since no one should be
* waiting for this object anymore. */ * waiting for this object anymore. */
HASH_DELETE(hh, store_state->object_get_requests, object_get_reqs); store_state->object_get_requests.erase(obj_id);
utarray_free(object_get_reqs->get_requests);
free(object_get_reqs);
}
} }
int get_timeout_handler(event_loop *loop, timer_id id, void *context) { int get_timeout_handler(event_loop *loop, timer_id id, void *context) {
@ -461,19 +431,8 @@ void process_get_request(Client *client_context,
PlasmaStoreState *plasma_state = client_context->plasma_state; PlasmaStoreState *plasma_state = client_context->plasma_state;
/* Create a get request for this object. */ /* Create a get request for this object. */
GetRequest *get_req = (GetRequest *) malloc(sizeof(GetRequest)); GetRequest *get_req =
memset(get_req, 0, sizeof(GetRequest)); new GetRequest(client_context, num_object_ids, object_ids);
get_req->client = client_context;
get_req->timer = -1;
get_req->num_object_ids = num_object_ids;
get_req->object_ids = (ObjectID *) malloc(num_object_ids * sizeof(ObjectID));
get_req->objects =
(PlasmaObject *) malloc(num_object_ids * sizeof(PlasmaObject));
for (int i = 0; i < num_object_ids; ++i) {
get_req->object_ids[i] = object_ids[i];
}
get_req->num_objects_to_wait_for = num_object_ids;
get_req->num_satisfied = 0;
for (int i = 0; i < num_object_ids; ++i) { for (int i = 0; i < num_object_ids; ++i) {
ObjectID obj_id = object_ids[i]; ObjectID obj_id = object_ids[i];
@ -858,9 +817,7 @@ void new_client_connection(event_loop *loop,
int new_socket = accept_client(listener_sock); int new_socket = accept_client(listener_sock);
/* Create a new client object. This will also be used as the context to use /* Create a new client object. This will also be used as the context to use
* for events on this client's socket. TODO(rkn): free this somewhere. */ * for events on this client's socket. TODO(rkn): free this somewhere. */
Client *client_context = (Client *) malloc(sizeof(Client)); Client *client_context = new Client(new_socket, plasma_state);
client_context->sock = new_socket;
client_context->plasma_state = plasma_state;
/* Add a callback to handle events on this socket. */ /* Add a callback to handle events on this socket. */
event_loop_add_file(loop, new_socket, EVENT_LOOP_READ, process_message, event_loop_add_file(loop, new_socket, EVENT_LOOP_READ, process_message,
client_context); client_context);
@ -881,7 +838,7 @@ void start_server(char *socket_name, int64_t system_memory) {
signal(SIGPIPE, SIG_IGN); signal(SIGPIPE, SIG_IGN);
/* Create the event loop. */ /* Create the event loop. */
event_loop *loop = event_loop_create(); event_loop *loop = event_loop_create();
PlasmaStoreState *state = PlasmaStoreState_init(loop, system_memory); PlasmaStoreState *state = new PlasmaStoreState(loop, system_memory);
int socket = bind_ipc_sock(socket_name, true); int socket = bind_ipc_sock(socket_name, true);
CHECK(socket >= 0); CHECK(socket >= 0);
event_loop_add_file(loop, socket, EVENT_LOOP_READ, new_client_connection, event_loop_add_file(loop, socket, EVENT_LOOP_READ, new_client_connection,