Replaced ObjectWaitRequests with unordered map (#990)

* Replaced ObjectWaitRequests with unordered map

* Pass C++ STL object by reference

* Formatting changes and typos.
This commit is contained in:
Peter Schafhalter 2017-09-28 15:29:26 -07:00 committed by Robert Nishihara
parent 19562f6ce5
commit 10027974b1

View file

@ -155,23 +155,22 @@ typedef struct {
/**
* There are fundamentally two data structures used for handling wait requests.
* There is the "wait_request" struct and the "ObjectWaitRequests" struct. A
* There is the "wait_request" struct and an unordered map.
* WaitRequest keeps track of all of the object IDs that a WaitRequest is
* waiting for. An ObjectWaitRequests struct keeps track of all of the
* WaitRequest structs that are waiting for a particular object iD. The
* PlasmaManagerState contains a hash table mapping object IDs to their
* coresponding ObjectWaitRequests structs.
* waiting for. The unordered map keeps track of all of the
* WaitRequest structs that are waiting for a particular object ID. The
* PlasmaManagerState owns and manages the unordered maps.
*
* These data structures are updated by several methods:
* - add_wait_request_for_object adds a WaitRequest to the
* ObjectWaitRequests struct corresponding to a particular object ID. This
* is called when a client calls plasma_wait.
* - remove_wait_request_for_object removes a WaitRequest from an
* ObjectWaitRequests struct. When a wait request returns, this method is
* called for all of the object IDs involved in that WaitRequest.
* - update_object_wait_requests removes an ObjectWaitRequests struct and
* does some processing for each WaitRequest involved in that
* ObjectWaitRequests struct.
* - add_wait_request_for_object adds a WaitRequest to the unordered map
* corresponding to a particular object ID. This is called when a client
* calls plasma_wait.
* - remove_wait_request_for_object removes a WaitRequest from an unordered
* map. When a wait request returns, this method is called for all of the
* object IDs involved in that WaitRequest.
* - update_object_wait_requests removes a vector of wait requests from the
* unordered map and does some processing for each WaitRequest involved in
* the vector.
*/
struct WaitRequest {
WaitRequest(ClientConnection *client_conn,
@ -205,16 +204,6 @@ struct WaitRequest {
int64_t num_satisfied;
};
typedef struct {
/** The ID of the object. This is used as a key in a hash table. */
ObjectID object_id;
/** An array of the wait requests involving this object ID. */
std::vector<WaitRequest *> wait_requests;
/** Handle for the uthash table in the manager state that keeps track of the
* wait requests involving this object ID. */
UT_hash_handle hh;
} ObjectWaitRequests;
struct PlasmaManagerState {
/** Event loop. */
event_loop *loop;
@ -232,12 +221,16 @@ struct PlasmaManagerState {
/** Unordered map of outstanding fetch requests. The key is the object ID. The
* value is the data needed to perform the fetch. */
std::unordered_map<ObjectID, FetchRequest *, UniqueIDHasher> fetch_requests;
/** A hash table mapping object IDs to a vector of the wait requests that
* are waiting for the object to arrive locally. */
ObjectWaitRequests *object_wait_requests_local;
/** A hash table mapping object IDs to a vector of the wait requests that
* are waiting for the object to be available somewhere in the system. */
ObjectWaitRequests *object_wait_requests_remote;
/** Unordered map of outstanding wait requests. The key is the object ID. The
* value is the vector of wait requests that are waiting for the object to
* arrive locally. */
std::unordered_map<ObjectID, std::vector<WaitRequest *>, UniqueIDHasher>
object_wait_requests_local;
/** Unordered map of outstanding wait requests. The key is the object ID. The
* value is the vector of wait requests that are waiting for the object to
* be available somewhere in the system. */
std::unordered_map<ObjectID, std::vector<WaitRequest *>, UniqueIDHasher>
object_wait_requests_remote;
/** Initialize an empty unordered set for the cache of local available object.
*/
std::unordered_set<ObjectID, UniqueIDHasher> local_available_objects;
@ -342,14 +335,13 @@ void object_table_subscribe_callback(ObjectID object_id,
const char *manager_vector[],
void *context);
ObjectWaitRequests **object_wait_requests_table_ptr_from_type(
PlasmaManagerState *manager_state,
int type) {
std::unordered_map<ObjectID, std::vector<WaitRequest *>, UniqueIDHasher> &
object_wait_requests_from_type(PlasmaManagerState *manager_state, int type) {
/* We use different types of hash tables for different requests. */
if (type == plasma::PLASMA_QUERY_LOCAL) {
return &manager_state->object_wait_requests_local;
return manager_state->object_wait_requests_local;
} else if (type == plasma::PLASMA_QUERY_ANYWHERE) {
return &manager_state->object_wait_requests_remote;
return manager_state->object_wait_requests_remote;
} else {
LOG_FATAL("This code should be unreachable.");
}
@ -359,49 +351,35 @@ void add_wait_request_for_object(PlasmaManagerState *manager_state,
ObjectID object_id,
int type,
WaitRequest *wait_req) {
ObjectWaitRequests **object_wait_requests_table_ptr =
object_wait_requests_table_ptr_from_type(manager_state, type);
ObjectWaitRequests *object_wait_reqs;
HASH_FIND(hh, *object_wait_requests_table_ptr, &object_id, sizeof(object_id),
object_wait_reqs);
/* If there are currently no wait requests involving this object ID, create a
* new ObjectWaitRequests struct for this object ID and add it to the hash
* table. */
if (object_wait_reqs == NULL) {
object_wait_reqs = new ObjectWaitRequests();
object_wait_reqs->object_id = object_id;
HASH_ADD(hh, *object_wait_requests_table_ptr, object_id,
sizeof(object_wait_reqs->object_id), object_wait_reqs);
}
auto &object_wait_requests =
object_wait_requests_from_type(manager_state, type);
auto object_wait_requests_it = object_wait_requests.find(object_id);
/* Add this wait request to the vector of wait requests involving this object
* ID. Creates a vector of wait requests if none exist involving the object
* ID. */
object_wait_reqs->wait_requests.push_back(wait_req);
object_wait_requests[object_id].push_back(wait_req);
}
void remove_wait_request_for_object(PlasmaManagerState *manager_state,
ObjectID object_id,
int type,
WaitRequest *wait_req) {
ObjectWaitRequests **object_wait_requests_table_ptr =
object_wait_requests_table_ptr_from_type(manager_state, type);
ObjectWaitRequests *object_wait_reqs;
HASH_FIND(hh, *object_wait_requests_table_ptr, &object_id, sizeof(object_id),
object_wait_reqs);
auto &object_wait_requests =
object_wait_requests_from_type(manager_state, type);
auto object_wait_requests_it = object_wait_requests.find(object_id);
/* If there is a vector of wait requests for this object ID, and if this
* vector contains the wait request, then remove the wait request from the
* vector. */
if (object_wait_reqs != NULL) {
for (int i = 0; i < object_wait_reqs->wait_requests.size(); ++i) {
if (object_wait_reqs->wait_requests[i] == wait_req) {
if (object_wait_requests_it != object_wait_requests.end()) {
std::vector<WaitRequest *> &wait_requests = object_wait_requests_it->second;
for (int i = 0; i < wait_requests.size(); ++i) {
if (wait_requests[i] == wait_req) {
/* Remove the wait request from the array. */
object_wait_reqs->wait_requests.erase(
object_wait_reqs->wait_requests.begin() + i);
wait_requests.erase(wait_requests.begin() + i);
break;
}
}
/* In principle, if there are no more wait requests involving this object
* ID, then we could remove the object_wait_reqs struct. However, the
* object_wait_reqs struct gets removed in update_object_wait_requests. */
}
}
@ -422,8 +400,8 @@ void return_from_wait(PlasmaManagerState *manager_state,
wait_req->num_object_requests),
wait_req->client_conn->fd);
/* Iterate over all object IDs requested as part of this wait request.
* Remove the wait request from each of the relevant object_wait_requests hash
* tables if it is present there. */
* Remove the wait request from each of the relevant object_wait_requests maps
* if it is present there. */
for (const auto &entry : wait_req->object_requests) {
remove_wait_request_for_object(manager_state, entry.second.object_id,
entry.second.type, wait_req);
@ -436,23 +414,22 @@ void update_object_wait_requests(PlasmaManagerState *manager_state,
ObjectID obj_id,
int type,
int status) {
ObjectWaitRequests **object_wait_requests_table_ptr =
object_wait_requests_table_ptr_from_type(manager_state, type);
auto &object_wait_requests =
object_wait_requests_from_type(manager_state, type);
/* Update the in-progress wait requests in the specified table. */
ObjectWaitRequests *object_wait_reqs;
HASH_FIND(hh, *object_wait_requests_table_ptr, &obj_id, sizeof(obj_id),
object_wait_reqs);
if (object_wait_reqs != NULL) {
auto object_wait_requests_it = object_wait_requests.find(obj_id);
if (object_wait_requests_it != object_wait_requests.end()) {
/* We compute the number of requests first because the length of the vector
* will change as we iterate over it (because each call to return_from_wait
* will remove one element). */
int num_requests = object_wait_reqs->wait_requests.size();
std::vector<WaitRequest *> &wait_requests = object_wait_requests_it->second;
int num_requests = wait_requests.size();
/* The argument index is the index of the current element of the vector
* that we are processing. It may differ from the counter i when elements
* are removed from the array. */
int index = 0;
for (int i = 0; i < num_requests; ++i) {
WaitRequest *wait_req = object_wait_reqs->wait_requests[index];
WaitRequest *wait_req = wait_requests[index];
wait_req->num_satisfied += 1;
/* Mark the object as present in the wait request. */
auto object_request =
@ -473,11 +450,10 @@ void update_object_wait_requests(PlasmaManagerState *manager_state,
index += 1;
}
}
DCHECK(index == object_wait_reqs->wait_requests.size());
DCHECK(index == wait_requests.size());
/* Remove the array of wait requests for this object, since no one should be
* waiting for this object anymore. */
HASH_DELETE(hh, *object_wait_requests_table_ptr, object_wait_reqs);
delete object_wait_reqs;
object_wait_requests.erase(object_wait_requests_it);
}
}
@ -523,8 +499,6 @@ PlasmaManagerState *PlasmaManagerState_init(const char *store_socket_name,
ARROW_CHECK_OK(state->plasma_conn->Connect(store_socket_name, "",
PLASMA_DEFAULT_RELEASE_DELAY));
state->manager_connections = NULL;
state->object_wait_requests_local = NULL;
state->object_wait_requests_remote = NULL;
if (redis_primary_addr) {
/* Get the manager port as a string. */
std::string manager_address_str =
@ -585,16 +559,6 @@ void PlasmaManagerState_free(PlasmaManagerState *state) {
it = next_it;
}
ObjectWaitRequests *wait_reqs, *tmp_wait_reqs;
HASH_ITER(hh, state->object_wait_requests_local, wait_reqs, tmp_wait_reqs) {
HASH_DELETE(hh, state->object_wait_requests_local, wait_reqs);
delete wait_reqs;
}
HASH_ITER(hh, state->object_wait_requests_remote, wait_reqs, tmp_wait_reqs) {
HASH_DELETE(hh, state->object_wait_requests_remote, wait_reqs);
delete wait_reqs;
}
ARROW_CHECK_OK(state->plasma_conn->Disconnect());
delete state->plasma_conn;
event_loop_destroy(state->loop);