Remove n^2 algorithm in plasma get (#466)

Remove n^2 algorithm in plasma get.
This commit is contained in:
Philipp Moritz 2017-04-17 23:37:33 -07:00 committed by Robert Nishihara
parent 0189b09581
commit 8ac6c59931
5 changed files with 52 additions and 51 deletions

View file

@ -394,19 +394,20 @@ void plasma_read_GetRequest(uint8_t *data,
*timeout_ms = message->timeout_ms();
}
int plasma_send_GetReply(int sock,
protocol_builder *B,
ObjectID object_ids[],
PlasmaObject plasma_objects[],
int64_t num_objects) {
int plasma_send_GetReply(
int sock,
protocol_builder *B,
ObjectID object_ids[],
std::unordered_map<ObjectID, PlasmaObject, UniqueIDHasher> &plasma_objects,
int64_t num_objects) {
flatbuffers::FlatBufferBuilder fbb(FLATBUFFER_BUILDER_DEFAULT_SIZE);
std::vector<PlasmaObjectSpec> objects;
for (int i = 0; i < num_objects; ++i) {
PlasmaObject *object = &plasma_objects[i];
const PlasmaObject &object = plasma_objects[object_ids[i]];
objects.push_back(PlasmaObjectSpec(
object->handle.store_fd, object->handle.mmap_size, object->data_offset,
object->data_size, object->metadata_offset, object->metadata_size));
object.handle.store_fd, object.handle.mmap_size, object.data_offset,
object.data_size, object.metadata_offset, object.metadata_size));
}
auto message = CreatePlasmaGetReply(
fbb, to_flatbuf(fbb, object_ids, num_objects),

View file

@ -76,11 +76,12 @@ void plasma_read_GetRequest(uint8_t *data,
int64_t *timeout_ms,
int64_t num_objects);
int plasma_send_GetReply(int sock,
protocol_builder *B,
ObjectID object_ids[],
PlasmaObject plasma_objects[],
int64_t num_objects);
int plasma_send_GetReply(
int sock,
protocol_builder *B,
ObjectID object_ids[],
std::unordered_map<ObjectID, PlasmaObject, UniqueIDHasher> &plasma_objects,
int64_t num_objects);
void plasma_read_GetReply(uint8_t *data,
ObjectID object_ids[],

View file

@ -26,6 +26,7 @@
#include <poll.h>
#include <unordered_map>
#include <unordered_set>
#include <vector>
#include "common.h"
@ -88,7 +89,7 @@ struct GetRequest {
std::vector<ObjectID> object_ids;
/** The object information for the objects in this request. This is used in
* the reply. */
std::vector<PlasmaObject> objects;
std::unordered_map<ObjectID, PlasmaObject, UniqueIDHasher> objects;
/** The minimum number of objects to wait for in this request. */
int64_t num_objects_to_wait_for;
/** The number of object requests in this wait request that are already
@ -136,8 +137,11 @@ GetRequest::GetRequest(Client *client,
timer(-1),
object_ids(object_ids, object_ids + num_object_ids),
objects(num_object_ids),
num_objects_to_wait_for(num_object_ids),
num_satisfied(0) {}
num_satisfied(0) {
std::unordered_set<ObjectID, UniqueIDHasher> unique_ids(
object_ids, object_ids + num_object_ids);
num_objects_to_wait_for = unique_ids.size();
}
PlasmaStoreState::PlasmaStoreState(event_loop *loop, int64_t system_memory)
: loop(loop),
@ -324,15 +328,16 @@ void PlasmaObject_init(PlasmaObject *object, object_table_entry *entry) {
void return_from_get(PlasmaStoreState *store_state, GetRequest *get_req) {
/* Send the get reply to the client. */
int status = plasma_send_GetReply(
get_req->client->sock, store_state->builder, &get_req->object_ids[0],
&get_req->objects[0], get_req->object_ids.size());
int status = plasma_send_GetReply(get_req->client->sock, store_state->builder,
&get_req->object_ids[0], get_req->objects,
get_req->object_ids.size());
warn_if_sigpipe(status, get_req->client->sock);
/* If we successfully sent the get reply message to the client, then also send
* the file descriptors. */
if (status >= 0) {
/* Send all of the file descriptors for the present objects. */
for (PlasmaObject &object : get_req->objects) {
for (const auto &object_id : get_req->object_ids) {
PlasmaObject &object = get_req->objects[object_id];
/* We use the data size to indicate whether the object is present or not.
*/
if (object.data_size != -1) {
@ -374,27 +379,16 @@ void update_object_get_requests(PlasmaStoreState *store_state,
int num_requests = get_requests.size();
for (int i = 0; i < num_requests; ++i) {
GetRequest *get_req = get_requests[index];
int num_updated = 0;
for (int j = 0; j < get_req->num_objects_to_wait_for; ++j) {
object_table_entry *entry;
HASH_FIND(handle, store_state->plasma_store_info->objects, &obj_id,
sizeof(obj_id), entry);
CHECK(entry != NULL);
object_table_entry *entry;
HASH_FIND(handle, store_state->plasma_store_info->objects, &obj_id,
sizeof(obj_id), entry);
CHECK(entry != NULL);
if (ObjectID_equal(get_req->object_ids[j], obj_id)) {
PlasmaObject_init(&get_req->objects[j], entry);
num_updated += 1;
get_req->num_satisfied += 1;
/* Record the fact that this client will be using this object and will
* be responsible for releasing this object. */
add_client_to_object_clients(entry, get_req->client);
}
}
/* Check a few things just to be sure there aren't bugs. */
DCHECK(num_updated > 0);
if (num_updated > 1) {
LOG_WARN("A get request contained a duplicated object ID.");
}
PlasmaObject_init(&get_req->objects[obj_id], entry);
get_req->num_satisfied += 1;
/* Record the fact that this client will be using this object and will
* be responsible for releasing this object. */
add_client_to_object_clients(entry, get_req->client);
/* If this get request is done, reply to the client. */
if (get_req->num_satisfied == get_req->num_objects_to_wait_for) {
@ -438,18 +432,16 @@ void process_get_request(Client *client_context,
sizeof(obj_id), entry);
if (entry && entry->state == PLASMA_SEALED) {
/* Update the get request to take into account the present object. */
PlasmaObject_init(&get_req->objects[i], entry);
PlasmaObject_init(&get_req->objects[obj_id], entry);
get_req->num_satisfied += 1;
/* If necessary, record that this client is using this object. In the case
* where entry == NULL, this will be called from seal_object. */
add_client_to_object_clients(entry, client_context);
} else {
/* Add a placeholder plasma object to the get request to indicate that the
* object is not present. This will be parsed by the client. We memset it
* to 0 so valgrind doesn't complain. We set the data size to -1 to
* indicate that the object is not present. */
memset(&get_req->objects[i], 0, sizeof(get_req->objects[i]));
get_req->objects[i].data_size = -1;
* object is not present. This will be parsed by the client. We set the
* data size to -1 to indicate that the object is not present. */
get_req->objects[obj_id].data_size = -1;
/* Add the get request to the relevant data structures. */
add_get_request_for_object(plasma_state, obj_id, get_req);
}

View file

@ -152,9 +152,9 @@ TEST plasma_get_reply_test(void) {
ObjectID object_ids[2];
object_ids[0] = globally_unique_id();
object_ids[1] = globally_unique_id();
PlasmaObject plasma_objects[2];
plasma_objects[0] = random_plasma_object();
plasma_objects[1] = random_plasma_object();
std::unordered_map<ObjectID, PlasmaObject, UniqueIDHasher> plasma_objects;
plasma_objects[object_ids[0]] = random_plasma_object();
plasma_objects[object_ids[1]] = random_plasma_object();
plasma_send_GetReply(fd, g_B, object_ids, plasma_objects, 2);
uint8_t *data = read_message_from_file(fd, MessageType_PlasmaGetReply);
int64_t num_objects = plasma_read_GetRequest_num_objects(data);
@ -165,9 +165,9 @@ TEST plasma_get_reply_test(void) {
num_objects);
ASSERT(ObjectID_equal(object_ids[0], object_ids_return[0]));
ASSERT(ObjectID_equal(object_ids[1], object_ids_return[1]));
ASSERT(memcmp(&plasma_objects[0], &plasma_objects_return[0],
ASSERT(memcmp(&plasma_objects[object_ids[0]], &plasma_objects_return[0],
sizeof(PlasmaObject)) == 0);
ASSERT(memcmp(&plasma_objects[1], &plasma_objects_return[1],
ASSERT(memcmp(&plasma_objects[object_ids[1]], &plasma_objects_return[1],
sizeof(PlasmaObject)) == 0);
free(data);
close(fd);

View file

@ -448,6 +448,13 @@ class APITest(unittest.TestCase):
ray.init(num_workers=0)
object_ids = [ray.put(i) for i in range(10)]
self.assertEqual(ray.get(object_ids), list(range(10)))
# Get a random choice of object IDs with duplicates.
indices = list(np.random.choice(range(10), 5))
indices += indices
results = ray.get([object_ids[i] for i in indices])
self.assertEqual(results, indices)
ray.worker.cleanup()
def testWait(self):