Plasma manager performance: speed up wait with a wait request object map (#427)

* plasma manager perf: speedup wait with a wait request object map

* removing duplicate == operator in plasma store

* fix serialization test

* code cleanup

* minor cleanup

* factoring out uniqueid hash and equality operators into common

* plasma manager: c++ify the WaitRequest struct

* plasma manager: get rid of the initial object request malloc

* cleanup

* linting

* cleanups and fix compiler warnings

* compiler warnings and linting
This commit is contained in:
Alexey Tumanov 2017-04-07 12:32:12 -07:00 committed by Philipp Moritz
parent 7af6f462fb
commit 6f9225490b
8 changed files with 138 additions and 114 deletions

View file

@ -7,6 +7,7 @@
#include <fcntl.h>
#include "io.h"
#include <functional>
/* This is used to define the array of object IDs. */
const UT_icd object_id_icd = {sizeof(ObjectID), NULL, NULL, NULL};
@ -29,6 +30,11 @@ UniqueID globally_unique_id(void) {
return result;
}
/* ObjectID equality function. */
bool operator==(const ObjectID &x, const ObjectID &y) {
return UNIQUE_ID_EQ(x, y);
}
bool ObjectID_equal(ObjectID first_id, ObjectID second_id) {
return UNIQUE_ID_EQ(first_id, second_id);
}

View file

@ -13,6 +13,7 @@
#include "utarray.h"
#ifdef __cplusplus
#include <functional>
extern "C" {
#endif
#include "sha256.h"
@ -152,6 +153,20 @@ UniqueID globally_unique_id(void);
typedef UniqueID ObjectID;
#ifdef __cplusplus
struct UniqueIDHasher {
/* ObjectID hashing function. */
size_t operator()(const UniqueID &id) const {
size_t result;
memcpy(&result, id.id + UNIQUE_ID_SIZE - sizeof(size_t), sizeof(size_t));
return result;
}
};
bool operator==(const ObjectID &x, const ObjectID &y);
#endif
#define ID_STRING_SIZE (2 * UNIQUE_ID_SIZE + 1)
/**

View file

@ -10,6 +10,8 @@
#include <string.h>
#include <unistd.h> /* pid_t */
#include <unordered_map>
#include "common.h"
#include "format/common_generated.h"
@ -42,6 +44,10 @@ typedef struct {
int status;
} ObjectRequest;
/** Mapping from object IDs to type and status of the request. */
typedef std::unordered_map<ObjectID, ObjectRequest, UniqueIDHasher>
ObjectRequestMap;
/* Handle to access memory mapped file and map it into client address space. */
typedef struct {
/** The file descriptor of the memory mapped file in the store. It is used as

View file

@ -21,6 +21,9 @@
#include <assert.h>
#include <netinet/in.h>
/* C++ includes. */
#include <unordered_map>
#include "uthash.h"
#include "utlist.h"
#include "utarray.h"
@ -147,7 +150,20 @@ typedef struct {
* does some processing for each WaitRequest involved in that
* ObjectWaitRequests struct.
*/
typedef struct {
struct WaitRequest {
WaitRequest(ClientConnection *client_conn,
int64_t timer,
int64_t num_object_requests,
ObjectRequestMap &&object_requests,
int64_t num_objects_to_wait_for,
int64_t num_satisfied)
: client_conn(client_conn),
timer(timer),
num_object_requests(num_object_requests),
object_requests(object_requests),
num_objects_to_wait_for(num_objects_to_wait_for),
num_satisfied(num_satisfied) {}
/** The client connection that called wait. */
ClientConnection *client_conn;
/** The ID of the timer that will time out and cause this wait to return to
@ -158,13 +174,13 @@ typedef struct {
/** The object requests for this wait request. Each object request has a
* status field which is either PLASMA_QUERY_LOCAL or PLASMA_QUERY_ANYWHERE.
*/
ObjectRequest *object_requests;
ObjectRequestMap object_requests;
/** The minimum number of objects to wait for in this request. */
int64_t num_objects_to_wait_for;
/** The number of object requests in this wait request that are already
* satisfied. */
int64_t num_satisfied;
} WaitRequest;
};
/** This is used to define the utarray of wait requests in the
* ObjectWaitRequests struct. */
@ -378,8 +394,7 @@ void remove_wait_request(PlasmaManagerState *manager_state,
CHECK(event_loop_remove_timer(manager_state->loop, wait_req->timer) ==
AE_OK);
}
free(wait_req->object_requests);
free(wait_req);
delete wait_req;
}
void return_from_wait(PlasmaManagerState *manager_state,
@ -389,12 +404,12 @@ void return_from_wait(PlasmaManagerState *manager_state,
wait_req->client_conn->fd, manager_state->builder,
wait_req->object_requests, wait_req->num_object_requests),
wait_req->client_conn->fd);
/* Remove the wait request from each of the relevant object_wait_requests hash
/* Iterate over all object IDs requested as part of this wait request.
* Remove the wait request from each of the relevant object_wait_requests hash
* tables if it is present there. */
for (int i = 0; i < wait_req->num_object_requests; ++i) {
remove_wait_request_for_object(manager_state,
wait_req->object_requests[i].object_id,
wait_req->object_requests[i].type, wait_req);
for (const auto &entry : wait_req->object_requests) {
remove_wait_request_for_object(manager_state, entry.second.object_id,
entry.second.type, wait_req);
}
/* Remove the wait request. */
remove_wait_request(manager_state, wait_req);
@ -425,18 +440,14 @@ void update_object_wait_requests(PlasmaManagerState *manager_state,
WaitRequest *wait_req = *wait_req_ptr;
wait_req->num_satisfied += 1;
/* Mark the object as present in the wait request. */
int j = 0;
for (; j < wait_req->num_object_requests; ++j) {
if (ObjectID_equal(wait_req->object_requests[j].object_id, obj_id)) {
/* Check that this object is currently nonexistent. */
CHECK(wait_req->object_requests[j].status ==
ObjectStatus_Nonexistent);
wait_req->object_requests[j].status = status;
break;
}
}
/* Make sure that we actually marked an object as available.*/
CHECK(j != wait_req->num_object_requests);
auto object_request = wait_req->object_requests.find(obj_id);
/* Check that we found the object. */
CHECK(object_request != wait_req->object_requests.end());
/* Check that the object found was not previously known to us. */
CHECK(object_request->second.status == ObjectStatus_Nonexistent);
/* Update the found object's status to a known status. */
object_request->second.status = status;
/* If this wait request is done, reply to the client. */
if (wait_req->num_satisfied == wait_req->num_objects_to_wait_for) {
return_from_wait(manager_state, wait_req);
@ -1144,27 +1155,16 @@ int wait_timeout_handler(event_loop *loop, timer_id id, void *context) {
void process_wait_request(ClientConnection *client_conn,
int num_object_requests,
ObjectRequest object_requests[],
ObjectRequestMap &&object_requests,
uint64_t timeout_ms,
int num_ready_objects) {
CHECK(client_conn != NULL);
PlasmaManagerState *manager_state = client_conn->manager_state;
/* Create a wait request for this object. */
WaitRequest *wait_req = (WaitRequest *) malloc(sizeof(WaitRequest));
memset(wait_req, 0, sizeof(WaitRequest));
wait_req->client_conn = client_conn;
wait_req->timer = -1;
wait_req->num_object_requests = num_object_requests;
wait_req->object_requests =
(ObjectRequest *) malloc(num_object_requests * sizeof(ObjectRequest));
for (int i = 0; i < num_object_requests; ++i) {
wait_req->object_requests[i].object_id = object_requests[i].object_id;
wait_req->object_requests[i].type = object_requests[i].type;
wait_req->object_requests[i].status = ObjectStatus_Nonexistent;
}
wait_req->num_objects_to_wait_for = num_ready_objects;
wait_req->num_satisfied = 0;
WaitRequest *wait_req =
new WaitRequest(client_conn, -1, num_object_requests,
std::move(object_requests), num_ready_objects, 0);
int num_object_ids_to_request = 0;
/* This is allocating more space than necessary, but we do not know the exact
@ -1172,25 +1172,26 @@ void process_wait_request(ClientConnection *client_conn,
ObjectID *object_ids_to_request =
(ObjectID *) malloc(num_object_requests * sizeof(ObjectID));
for (int i = 0; i < num_object_requests; ++i) {
ObjectID obj_id = object_requests[i].object_id;
for (auto &entry : wait_req->object_requests) {
auto &object_request = entry.second;
ObjectID obj_id = object_request.object_id;
/* Check if this object is already present locally. If so, mark the object
* as present. */
if (is_object_local(manager_state, obj_id)) {
wait_req->object_requests[i].status = ObjectStatus_Local;
object_request.status = ObjectStatus_Local;
wait_req->num_satisfied += 1;
continue;
}
/* Add the wait request to the relevant data structures. */
add_wait_request_for_object(manager_state, obj_id,
wait_req->object_requests[i].type, wait_req);
add_wait_request_for_object(manager_state, obj_id, object_request.type,
wait_req);
if (wait_req->object_requests[i].type == PLASMA_QUERY_LOCAL) {
if (object_request.type == PLASMA_QUERY_LOCAL) {
/* TODO(rkn): If desired, we could issue a fetch command here to retrieve
* the object. */
} else if (wait_req->object_requests[i].type == PLASMA_QUERY_ANYWHERE) {
} else if (object_request.type == PLASMA_QUERY_ANYWHERE) {
/* Add this object ID to the list of object IDs to request notifications
* for from the object table. */
object_ids_to_request[num_object_ids_to_request] = obj_id;
@ -1536,17 +1537,13 @@ void process_message(event_loop *loop,
case MessageType_PlasmaWaitRequest: {
LOG_DEBUG("Processing wait");
int num_object_ids = plasma_read_WaitRequest_num_object_ids(data);
ObjectRequest *object_requests =
(ObjectRequest *) malloc(num_object_ids * sizeof(ObjectRequest));
ObjectRequestMap object_requests;
int64_t timeout_ms;
int num_ready_objects;
plasma_read_WaitRequest(data, &object_requests[0], num_object_ids,
&timeout_ms, &num_ready_objects);
/* TODO(pcm): process_wait_requests allocates an array of num_object_ids
* object_requests too so these could be shared in the future. */
process_wait_request(conn, num_object_ids, &object_requests[0], timeout_ms,
num_ready_objects);
free(object_requests);
plasma_read_WaitRequest(data, object_requests, num_object_ids, &timeout_ms,
&num_ready_objects);
process_wait_request(conn, num_object_ids, std::move(object_requests),
timeout_ms, num_ready_objects);
} break;
case MessageType_PlasmaStatusRequest: {
LOG_DEBUG("Processing status");

View file

@ -498,7 +498,7 @@ int plasma_read_WaitRequest_num_object_ids(uint8_t *data) {
}
void plasma_read_WaitRequest(uint8_t *data,
ObjectRequest object_requests[],
ObjectRequestMap &object_requests,
int num_object_ids,
int64_t *timeout_ms,
int *num_ready_objects) {
@ -509,23 +509,26 @@ void plasma_read_WaitRequest(uint8_t *data,
CHECK(num_object_ids == message->object_requests()->size());
for (int i = 0; i < num_object_ids; i++) {
object_requests[i].object_id =
ObjectID object_id =
from_flatbuf(message->object_requests()->Get(i)->object_id());
object_requests[i].type = message->object_requests()->Get(i)->type();
ObjectRequest object_request({object_id,
message->object_requests()->Get(i)->type(),
ObjectStatus_Nonexistent});
object_requests[object_id] = object_request;
}
}
int plasma_send_WaitReply(int sock,
protocol_builder *B,
ObjectRequest object_requests[],
const ObjectRequestMap &object_requests,
int num_ready_objects) {
flatbuffers::FlatBufferBuilder fbb(FLATBUFFER_BUILDER_DEFAULT_SIZE);
std::vector<flatbuffers::Offset<ObjectReply>> object_replies;
for (int i = 0; i < num_ready_objects; i++) {
object_replies.push_back(
CreateObjectReply(fbb, to_flatbuf(fbb, object_requests[i].object_id),
object_requests[i].status));
for (const auto &entry : object_requests) {
const auto &object_request = entry.second;
object_replies.push_back(CreateObjectReply(
fbb, to_flatbuf(fbb, object_request.object_id), object_request.status));
}
auto message = CreatePlasmaWaitReply(

View file

@ -207,14 +207,14 @@ int plasma_send_WaitRequest(int sock,
int plasma_read_WaitRequest_num_object_ids(uint8_t *data);
void plasma_read_WaitRequest(uint8_t *data,
ObjectRequest object_requests[],
ObjectRequestMap &object_requests,
int num_object_ids,
int64_t *timeout_ms,
int *num_ready_objects);
int plasma_send_WaitReply(int sock,
protocol_builder *B,
ObjectRequest object_requests[],
const ObjectRequestMap &object_requests,
int num_ready_objects);
void plasma_read_WaitReply(uint8_t *data,

View file

@ -48,20 +48,6 @@ void dlfree(void *);
size_t dlmalloc_set_footprint_limit(size_t bytes);
}
namespace std {
template <>
struct hash<UniqueID> {
size_t operator()(const UniqueID &unique_id) const {
return *reinterpret_cast<const size_t *>(unique_id.id + UNIQUE_ID_SIZE -
sizeof(size_t));
}
};
} // namespace std
bool operator==(UniqueID a, UniqueID b) {
return UNIQUE_ID_EQ(a, b);
}
/** Contains all information that is associated with a Plasma store client. */
struct Client {
Client(int sock, PlasmaStoreState *plasma_state);
@ -117,7 +103,8 @@ struct PlasmaStoreState {
event_loop *loop;
/** A hash table mapping object IDs to a vector of the get requests that are
* waiting for the object to arrive. */
std::unordered_map<ObjectID, std::vector<GetRequest *>> object_get_requests;
std::unordered_map<ObjectID, std::vector<GetRequest *>, UniqueIDHasher>
object_get_requests;
/** The pending notifications that have not been sent to subscribers because
* the socket send buffers were full. This is a hash table from client file

View file

@ -315,30 +315,35 @@ TEST plasma_fetch_request_test(void) {
TEST plasma_wait_request_test(void) {
int fd = create_temp_file();
ObjectRequest object_requests[2];
object_requests[0].object_id = globally_unique_id();
object_requests[0].type = PLASMA_QUERY_ANYWHERE;
object_requests[1].object_id = globally_unique_id();
object_requests[1].type = PLASMA_QUERY_LOCAL;
int num_ready_objects = 1;
const int num_objects_in = 2;
ObjectRequest object_requests_in[num_objects_in] = {
ObjectRequest({globally_unique_id(), PLASMA_QUERY_ANYWHERE, 0}),
ObjectRequest({globally_unique_id(), PLASMA_QUERY_LOCAL, 0})};
const int num_ready_objects_in = 1;
int64_t timeout_ms = 1000;
plasma_send_WaitRequest(fd, g_B, object_requests, 2, num_ready_objects,
timeout_ms);
plasma_send_WaitRequest(fd, g_B, &object_requests_in[0], num_objects_in,
num_ready_objects_in, timeout_ms);
/* Read message back. */
uint8_t *data = read_message_from_file(fd, MessageType_PlasmaWaitRequest);
ObjectRequest object_requests_read[2];
int num_object_ids_read = plasma_read_WaitRequest_num_object_ids(data);
ASSERT_EQ(num_object_ids_read, 2);
int num_ready_objects_read;
int num_object_ids_out = plasma_read_WaitRequest_num_object_ids(data);
ASSERT_EQ(num_object_ids_out, num_objects_in);
int num_ready_objects_out;
int64_t timeout_ms_read;
plasma_read_WaitRequest(data, &object_requests_read[0], num_object_ids_read,
&timeout_ms_read, &num_ready_objects_read);
ASSERT(ObjectID_equal(object_requests[0].object_id,
object_requests_read[0].object_id));
ASSERT(ObjectID_equal(object_requests[1].object_id,
object_requests_read[1].object_id));
ASSERT(object_requests[0].type == object_requests_read[0].type);
ASSERT(object_requests[1].type == object_requests_read[1].type);
ObjectRequestMap object_requests_out;
plasma_read_WaitRequest(data, object_requests_out, num_object_ids_out,
&timeout_ms_read, &num_ready_objects_out);
ASSERT_EQ(num_objects_in, object_requests_out.size());
ASSERT_EQ(num_ready_objects_out, num_ready_objects_in);
for (int i = 0; i < num_objects_in; i++) {
const ObjectID &object_id = object_requests_in[i].object_id;
ASSERT_EQ(1, object_requests_out.count(object_id));
const auto &entry = object_requests_out.find(object_id);
ASSERT(entry != object_requests_out.end());
ASSERT(ObjectID_equal(entry->second.object_id,
object_requests_in[i].object_id));
ASSERT_EQ(entry->second.type, object_requests_in[i].type);
}
free(data);
close(fd);
PASS();
@ -346,24 +351,29 @@ TEST plasma_wait_request_test(void) {
TEST plasma_wait_reply_test(void) {
int fd = create_temp_file();
ObjectRequest object_replies1[2];
object_replies1[0].object_id = globally_unique_id();
object_replies1[0].status = ObjectStatus_Local;
object_replies1[1].object_id = globally_unique_id();
object_replies1[1].status = ObjectStatus_Nonexistent;
int num_ready_objects1 = 2;
plasma_send_WaitReply(fd, g_B, object_replies1, num_ready_objects1);
const int num_objects_in = 2;
/* Create a map with two ObjectRequests in it. */
ObjectRequestMap objects_in(num_objects_in);
ObjectID id1 = globally_unique_id();
objects_in[id1] = ObjectRequest({id1, 0, ObjectStatus_Local});
ObjectID id2 = globally_unique_id();
objects_in[id2] = ObjectRequest({id2, 0, ObjectStatus_Nonexistent});
plasma_send_WaitReply(fd, g_B, objects_in, num_objects_in);
/* Read message back. */
uint8_t *data = read_message_from_file(fd, MessageType_PlasmaWaitReply);
ObjectRequest object_replies2[2];
int num_ready_objects_read2;
plasma_read_WaitReply(data, &object_replies2[0], &num_ready_objects_read2);
ASSERT(ObjectID_equal(object_replies1[0].object_id,
object_replies2[0].object_id));
ASSERT(ObjectID_equal(object_replies1[1].object_id,
object_replies2[1].object_id));
ASSERT(object_replies1[0].status == object_replies2[0].status);
ASSERT(object_replies1[1].status == object_replies2[1].status);
ObjectRequest objects_out[2];
int num_objects_out;
plasma_read_WaitReply(data, &objects_out[0], &num_objects_out);
ASSERT(num_objects_in == num_objects_out);
for (int i = 0; i < num_objects_out; i++) {
/* Each object request must appear exactly once. */
ASSERT(1 == objects_in.count(objects_out[i].object_id));
const auto &entry = objects_in.find(objects_out[i].object_id);
ASSERT(entry != objects_in.end());
ASSERT(ObjectID_equal(entry->second.object_id, objects_out[i].object_id));
ASSERT(entry->second.status == objects_out[i].status);
}
free(data);
close(fd);
PASS();