Implement plasma.wait (#12)

* implement wait

* fix wait documentation

* fixes

* More fixes.

* more fixes

* Update test to check if redis-server is present.
This commit is contained in:
Philipp Moritz 2016-10-29 17:30:34 -07:00 committed by Robert Nishihara
parent ee3718c80c
commit e6319986ec
8 changed files with 355 additions and 46 deletions

View file

@ -14,8 +14,10 @@ class PlasmaID(ctypes.Structure):
def make_plasma_id(string):
if len(string) != PLASMA_ID_SIZE:
raise Exception("PlasmaIDs must be {} characters long".format(PLASMA_ID_SIZE))
object_id = map(ord, string)
return PlasmaID(plasma_id=ID(*object_id))
return PlasmaID(plasma_id=ID.from_buffer_copy(string))
def plasma_id_to_str(plasma_id):
return str(bytearray(plasma_id.plasma_id))
class PlasmaBuffer(object):
"""This is the type of objects returned by calls to get with a PlasmaClient.
@ -217,6 +219,31 @@ class PlasmaClient(object):
success_array);
return [bool(success) for success in success_array]
def wait(self, object_ids, timeout, num_returns):
"""Wait until num_returns objects in object_ids are ready.
Args:
object_ids (List[str]): List of object IDs to wait for.
timeout (int): Return to the caller after timeout milliseconds.
num_returns (int): We are waiting for this number of objects to be ready.
Returns:
ready_ids, waiting_ids (List[str], List[str]): List of object IDs that
are ready and list of object IDs we might still wait on respectively.
"""
if not self.has_manager_conn:
raise Exception("Not connected to the plasma manager socket")
object_id_array = (len(object_ids) * PlasmaID)()
for i, object_id in enumerate(object_ids):
object_id_array[i] = make_plasma_id(object_id)
return_id_array = (num_returns * PlasmaID)()
num_return_objects = self.client.plasma_wait(self.plasma_conn,
object_id_array._length_,
object_id_array,
timeout, num_returns, return_id_array)
ready_ids = map(plasma_id_to_str, return_id_array[num_returns-num_return_objects:])
return ready_ids, list(set(object_ids) - set(ready_ids))
def subscribe(self):
"""Subscribe to notifications about sealed objects."""
fd = self.client.plasma_subscribe(self.plasma_conn)

View file

@ -62,6 +62,8 @@ enum plasma_message_type {
PLASMA_DATA,
/** Request a fetch of an object in another store. */
PLASMA_FETCH,
/** Wait until an object becomes available. */
PLASMA_WAIT
};
typedef struct {
@ -69,6 +71,10 @@ typedef struct {
int64_t data_size;
/** The size of the object's metadata. */
int64_t metadata_size;
/** The timeout of the request. */
uint64_t timeout;
/** The number of objects we wait for for wait. */
int num_returns;
/** In a transfer request, this is the IP address of the Plasma Manager to
* transfer the object to. */
uint8_t addr[4];
@ -82,8 +88,6 @@ typedef struct {
} plasma_request;
typedef struct {
/** The object ID that this reply refers to. */
object_id object_id;
/** The object that is returned with this reply. */
plasma_object object;
/** This is used only to respond to requests of type
@ -91,6 +95,12 @@ typedef struct {
* present and 0 otherwise. Used for plasma_contains and
* plasma_fetch. */
int has_object;
/** Number of object IDs a wait is returning. */
int num_objects_returned;
/** The number of object IDs that will be included in this reply. */
int num_object_ids;
/** The IDs of the objects that this reply refers to. */
object_id object_ids[1];
} plasma_reply;
#endif

View file

@ -302,6 +302,7 @@ int plasma_subscribe(plasma_connection *conn) {
* message because otherwise it seems to hang on Linux. */
char dummy = '\0';
send_fd(conn->store_conn, fd[1], &dummy, 1);
close(fd[1]);
/* Return the file descriptor that the client should use to read notifications
* about sealed objects. */
return fd[0];
@ -440,10 +441,12 @@ void plasma_fetch(plasma_connection *conn,
CHECK(nbytes == sizeof(reply));
success = reply.has_object;
}
CHECK(reply.num_object_ids == 1);
/* Update the correct index in is_fetched. */
int i = 0;
for (; i < num_object_ids; i++) {
if (memcmp(&object_ids[i], &reply.object_id, sizeof(object_id)) == 0) {
if (memcmp(&object_ids[i], &reply.object_ids[0], sizeof(object_id)) ==
0) {
/* Check that this isn't a duplicate response. */
CHECK(!is_fetched[i]);
is_fetched[i] = success;
@ -455,6 +458,30 @@ void plasma_fetch(plasma_connection *conn,
}
}
int plasma_wait(plasma_connection *conn,
int num_object_ids,
object_id object_ids[],
uint64_t timeout,
int num_returns,
object_id return_object_ids[]) {
CHECK(conn->manager_conn >= 0);
plasma_request *req =
make_plasma_multiple_request(num_object_ids, object_ids);
req->num_returns = num_returns;
req->timeout = timeout;
plasma_send_request(conn->manager_conn, PLASMA_WAIT, req);
free(req);
int64_t return_size =
sizeof(plasma_reply) + (num_returns - 1) * sizeof(object_id);
plasma_reply *reply = malloc(return_size);
int nbytes = recv(conn->manager_conn, (uint8_t *) reply, return_size, 0);
CHECK(nbytes == return_size);
memcpy(return_object_ids, reply->object_ids, num_returns * sizeof(object_id));
int num_objects_returned = reply->num_objects_returned;
free(reply);
return num_objects_returned;
}
int get_manager_fd(plasma_connection *conn) {
return conn->manager_conn;
}

View file

@ -196,6 +196,28 @@ void plasma_fetch(plasma_connection *conn,
object_id object_ids[],
int is_fetched[]);
/**
* Wait for objects to be created (right now, wait for local objects).
*
* @param conn The object containing the connection state.
* @param num_object_ids Number of object IDs wait is called on.
* @param object_ids Object IDs wait is called on.
* @param timeout Wait will time out and return after this number of ms.
* @param num_returns Number of object IDs wait will return if it doesn't time
* out.
* @param return_object_ids Out parameter for the object IDs returned by wait.
* This is an array of size num_returns. If the number of objects that
* are ready when we time out, the objects will be stored in the last
* slots of the array and the number of objects is returned.
* @return Number of objects that are actually ready.
*/
int plasma_wait(plasma_connection *conn,
int num_object_ids,
object_id object_ids[],
uint64_t timeout,
int num_returns,
object_id return_object_ids[]);
/**
* Subscribe to notifications when objects are sealed in the object store.
* Whenever an object is sealed, a message will be written to the client socket

View file

@ -11,6 +11,7 @@
#include <unistd.h>
#include <signal.h>
#include <stdlib.h>
#include <stdbool.h>
#include <sys/mman.h>
#include <sys/types.h>
#include <sys/socket.h>
@ -36,6 +37,14 @@
typedef struct client_object_connection client_object_connection;
/* Entry of the hashtable of objects that are available locally. */
typedef struct {
/** Object id of this object. */
object_id object_id;
/** Handle for the uthash table. */
UT_hash_handle hh;
} available_object;
struct plasma_manager_state {
/** Event loop. */
event_loop *loop;
@ -54,6 +63,8 @@ struct plasma_manager_state {
* object id, value is a list of connections to the clients
* who are blocking on a fetch of this object. */
client_object_connection *fetch_connections;
/** Initialize an empty hash map for the cache of local available object. */
available_object *local_available_objects;
};
plasma_manager_state *g_manager_state = NULL;
@ -106,6 +117,13 @@ struct client_connection {
/** File descriptor for the socket connected to the other
* plasma manager. */
int fd;
/** Timer id for timing out wait (or fetch). */
int64_t timer_id;
/** True if this client is in a "wait" and false if it is in a "fetch". */
bool is_wait;
/** If this client is processing a wait, this contains the object ids that
* are already available. */
plasma_reply *wait_reply;
/** The objects that we are waiting for and their callback
* contexts, for either a fetch or a wait operation. */
client_object_connection *active_objects;
@ -137,7 +155,8 @@ int send_client_reply(client_connection *conn, plasma_reply *reply) {
}
int send_client_failure_reply(object_id object_id, client_connection *conn) {
plasma_reply reply = {.object_id = object_id, .has_object = 0};
plasma_reply reply = {
.object_ids = {object_id}, .num_object_ids = 1, .has_object = 0};
return send_client_reply(conn, &reply);
}
@ -160,7 +179,6 @@ client_object_connection *get_object_connection(client_connection *client_conn,
client_object_connection *add_object_connection(client_connection *client_conn,
object_id object_id) {
/* TODO(swang): Support registration of wait operations. */
/* Create a new context for this client connection and object. */
client_object_connection *object_conn =
malloc(sizeof(client_object_connection));
@ -247,6 +265,13 @@ plasma_manager_state *init_plasma_manager_state(const char *store_socket_name,
sscanf(manager_addr, "%hhu.%hhu.%hhu.%hhu", &state->addr[0], &state->addr[1],
&state->addr[2], &state->addr[3]);
state->port = manager_port;
/* Initialize an empty hash map for the cache of local available objects. */
state->local_available_objects = NULL;
/* Subscribe to notifications about sealed objects. */
int plasma_fd = plasma_subscribe(state->plasma_conn);
/* Add the callback that processes the notification to the event loop. */
event_loop_add_file(state->loop, plasma_fd, EVENT_LOOP_READ,
process_object_notification, state);
return state;
}
@ -409,23 +434,10 @@ void process_data_chunk(event_loop *loop,
/* Seal the object and release it. The release corresponds to the call to
* plasma_create that occurred in process_data_request. */
LOG_DEBUG("reading on channel %d finished", data_sock);
/* The following seal also triggers notification of clients for fetch or
* wait requests, see process_object_notification. */
plasma_seal(conn->manager_state->plasma_conn, buf->object_id);
plasma_release(conn->manager_state->plasma_conn, buf->object_id);
/* Notify any clients who were waiting on a fetch to this object. */
client_object_connection *object_conn, *next;
client_connection *client_conn;
HASH_FIND(fetch_hh, conn->manager_state->fetch_connections, &(buf->object_id),
sizeof(buf->object_id), object_conn);
plasma_reply reply = {.object_id = buf->object_id, .has_object = 1};
while (object_conn) {
next = object_conn->next;
client_conn = object_conn->client_conn;
send_client_reply(client_conn, &reply);
event_loop_remove_timer(client_conn->manager_state->loop,
object_conn->timer);
remove_object_connection(client_conn, object_conn);
object_conn = next;
}
/* Remove the request buffer used for reading this object's data. */
LL_DELETE(conn->transfer_queue, buf);
free(buf);
@ -586,10 +598,12 @@ int manager_timeout_handler(event_loop *loop, timer_id id, void *context) {
object_conn->num_retries--;
return MANAGER_TIMEOUT;
}
plasma_reply reply = {.object_id = object_conn->object_id, .has_object = 0};
plasma_reply reply = {.object_ids = {object_conn->object_id},
.num_object_ids = 1,
.has_object = 0};
send_client_reply(client_conn, &reply);
remove_object_connection(client_conn, object_conn);
return AE_NOMORE;
return EVENT_LOOP_TIMER_DONE;
}
/* TODO(swang): Consolidate transfer requests for same object
@ -614,6 +628,7 @@ void request_transfer(object_id object_id,
* register a Redis callback for changes to this object table entry. */
free(manager_vector);
send_client_failure_reply(object_id, client_conn);
remove_object_connection(client_conn, object_conn);
return;
}
/* Register the new outstanding fetch with the current client connection. */
@ -644,7 +659,9 @@ void request_transfer(object_id object_id,
void process_fetch_request(client_connection *client_conn,
object_id object_id) {
plasma_reply reply = {.object_id = object_id};
client_conn->is_wait = false;
client_conn->wait_reply = NULL;
plasma_reply reply = {.object_ids = {object_id}, .num_object_ids = 1};
if (client_conn->manager_state->db == NULL) {
reply.has_object = 0;
send_client_reply(client_conn, &reply);
@ -678,6 +695,118 @@ void process_fetch_requests(client_connection *client_conn,
}
}
void return_from_wait(client_connection *client_conn) {
CHECK(client_conn->is_wait);
int64_t size =
sizeof(plasma_reply) +
(client_conn->wait_reply->num_object_ids - 1) * sizeof(object_id);
client_conn->wait_reply->num_objects_returned =
client_conn->wait_reply->num_object_ids - client_conn->num_return_objects;
int n = write(client_conn->fd, (uint8_t *) client_conn->wait_reply, size);
CHECK(n == size);
free(client_conn->wait_reply);
/* Clean the remaining object connections. */
client_object_connection *object_conn, *tmp;
HASH_ITER(active_hh, client_conn->active_objects, object_conn, tmp) {
remove_object_connection(client_conn, object_conn);
}
}
int wait_timeout_handler(event_loop *loop, timer_id id, void *context) {
client_connection *client_conn = context;
CHECK(client_conn->timer_id == id);
return_from_wait(client_conn);
return EVENT_LOOP_TIMER_DONE;
}
void process_wait_request(client_connection *client_conn,
int num_object_ids,
object_id object_ids[],
uint64_t timeout,
int num_returns) {
plasma_manager_state *manager_state = client_conn->manager_state;
client_conn->num_return_objects = num_returns;
client_conn->is_wait = true;
client_conn->timer_id = event_loop_add_timer(
manager_state->loop, timeout, wait_timeout_handler, client_conn);
int64_t size = sizeof(plasma_reply) + (num_returns - 1) * sizeof(object_id);
client_conn->wait_reply = malloc(size);
memset(client_conn->wait_reply, 0, size);
client_conn->wait_reply->num_object_ids = num_returns;
for (int i = 0; i < num_object_ids; ++i) {
available_object *entry;
HASH_FIND(hh, manager_state->local_available_objects, &object_ids[i],
sizeof(object_id), entry);
if (entry) {
/* If an object id occurs twice in object_ids, this will count them twice.
* This might not be desirable behavior. */
client_conn->num_return_objects -= 1;
client_conn->wait_reply->object_ids[client_conn->num_return_objects] =
entry->object_id;
if (client_conn->num_return_objects == 0) {
event_loop_remove_timer(manager_state->loop, client_conn->timer_id);
return_from_wait(client_conn);
return;
}
} else {
add_object_connection(client_conn, object_ids[i]);
}
}
}
void process_object_notification(event_loop *loop,
int client_sock,
void *context,
int events) {
plasma_manager_state *state = context;
object_id obj_id;
/* Read the notification from Plasma. */
int n = recv(client_sock, &obj_id, sizeof(object_id), MSG_WAITALL);
if (n == 0) {
/* The store has closed the socket. */
LOG_DEBUG("The plasma store has closed the object notification socket.");
event_loop_remove_file(loop, client_sock);
close(client_sock);
return;
}
CHECK(n == sizeof(object_id));
/* Add object to locally available object. */
/* TODO(pcm): Where is this deallocated? */
available_object *entry =
(available_object *) malloc(sizeof(available_object));
entry->object_id = obj_id;
HASH_ADD(hh, state->local_available_objects, object_id, sizeof(object_id),
entry);
/* Notify any clients who were waiting on a fetch to this object and tick
* off objects we are waiting for. */
client_object_connection *object_conn, *next;
client_connection *client_conn;
HASH_FIND(fetch_hh, state->fetch_connections, &obj_id, sizeof(object_id),
object_conn);
plasma_reply reply = {
.object_ids = {obj_id}, .num_object_ids = 1, .has_object = 1};
while (object_conn) {
next = object_conn->next;
client_conn = object_conn->client_conn;
if (!client_conn->is_wait) {
event_loop_remove_timer(state->loop, object_conn->timer);
send_client_reply(client_conn, &reply);
} else {
client_conn->num_return_objects -= 1;
client_conn->wait_reply->object_ids[client_conn->num_return_objects] =
obj_id;
if (client_conn->num_return_objects == 0) {
event_loop_remove_timer(loop, client_conn->timer_id);
return_from_wait(client_conn);
object_conn = next;
continue;
}
}
remove_object_connection(client_conn, object_conn);
object_conn = next;
}
}
void process_message(event_loop *loop,
int client_sock,
void *context,
@ -703,6 +832,10 @@ void process_message(event_loop *loop,
LOG_DEBUG("Processing fetch");
process_fetch_requests(conn, req->num_object_ids, req->object_ids);
break;
case PLASMA_WAIT:
LOG_DEBUG("Processing wait");
process_wait_request(conn, req->num_object_ids, req->object_ids,
req->timeout, req->num_returns);
case PLASMA_SEAL:
LOG_DEBUG("Publishing to object table from DB client %d.",
get_client_id(conn->manager_state->db));

View file

@ -137,14 +137,47 @@ void process_fetch_request(client_connection *client_conn, object_id object_id);
*
* @param client_conn The connection context for the client that made the
* request.
* @param object_id_count The number of object IDs requested.
* @param num_object_ids The number of object IDs requested.
* @param object_ids[] The vector of object IDs requested.
* @return Void.
*/
void process_fetch_requests(client_connection *client_conn,
int object_id_count,
int num_object_ids,
object_id object_ids[]);
/**
* Process a wait request from a client.
*
* @param client_conn The connection context for the client that made the
* request.
* @param num_object_ids Number of object IDs wait is called on.
* @param object_ids Object IDs wait is called on.
* @param timeout Wait will time out and return after this number of
* milliseconds.
* @param num_returns Number of object IDs wait will return if it doesn't time
* out.
* @return Void.
*/
void process_wait_request(client_connection *client_conn,
int num_object_ids,
object_id object_ids[],
uint64_t timeout,
int num_returns);
/**
* Callback that will be called when a new object becomes available.
*
* @param loop This is the event loop of the plasma manager.
* @param client_sock The connection to the plasma store.
* @param context Plasma manager state.
* @param events (unused).
* @return Void.
*/
void process_object_notification(event_loop *loop,
int client_sock,
void *context,
int events);
/**
* Send the next request queued for the other plasma manager connected to the
* socket "data_sock". This could be a request to either write object data or
@ -237,6 +270,7 @@ void request_transfer(object_id object_id,
*
* @param client_conn The client connection context.
* @param object_id The object ID whose context we want to delete.
* @return Void.
*/
void remove_object_connection(client_connection *client_conn,
client_object_connection *object_conn);
@ -260,9 +294,9 @@ client_connection *get_manager_connection(plasma_manager_state *state,
*
* @param conn The connection to the client who's sending the data.
* @param buf The buffer to write the data into.
* @return An integer representing whether the client is done
* sending this object. 1 means that the client has
* sent all the data, 0 means there is more.
* @return An integer representing whether the client is done sending this
* object. 1 means that the client has sent all the data, 0 means there
* is more.
*/
int read_object_chunk(client_connection *conn, plasma_request_buffer *buf);

View file

@ -186,7 +186,8 @@ TEST request_transfer_timeout_test(void) {
int manager_fd = get_manager_fd(local_mock->plasma_conn);
int nbytes = recv(manager_fd, (uint8_t *) &reply, sizeof(reply), MSG_WAITALL);
ASSERT_EQ(nbytes, sizeof(reply));
ASSERT_EQ(memcmp(&oid, &reply.object_id, sizeof(object_id)), 0);
ASSERT_EQ(reply.num_object_ids, 1);
ASSERT_EQ(memcmp(&oid, &reply.object_ids, sizeof(object_id)), 0);
ASSERT_EQ(reply.has_object, 0);
/* Clean up. */
destroy_plasma_mock(remote_mock);

View file

@ -10,6 +10,7 @@ import unittest
import random
import time
import tempfile
import threading
import plasma
@ -52,6 +53,11 @@ def assert_get_object_equal(unit_test, client1, client2, object_id, memory_buffe
unit_test.assertEqual(client1.get_metadata(object_id)[:],
client2.get_metadata(object_id)[:])
# Check if the redis-server binary is present.
redis_path = os.path.join(os.path.abspath(os.path.dirname(__file__)), "../../common/thirdparty/redis-3.2.3/src/redis-server")
if not os.path.exists(redis_path):
raise Exception("You do not have the redis-server binary. Run `make test` in the plasma directory to get it.")
class TestPlasmaClient(unittest.TestCase):
def setUp(self):
@ -226,17 +232,12 @@ class TestPlasmaManager(unittest.TestCase):
# Start a Redis server.
redis_path = os.path.join(os.path.abspath(os.path.dirname(__file__)), "../../common/thirdparty/redis-3.2.3/src/redis-server")
self.redis_process = None
manager_redis_args = []
if os.path.exists(redis_path):
redis_port = 6379
with open(os.devnull, 'w') as FNULL:
self.redis_process = subprocess.Popen([redis_path,
"--port", str(redis_port)],
stdout=FNULL)
time.sleep(0.1)
manager_redis_args = ["-r", "{addr}:{port}".format(addr="127.0.0.1",
port=redis_port)]
redis_port = 6379
with open(os.devnull, "w") as FNULL:
self.redis_process = subprocess.Popen([redis_path,
"--port", str(redis_port)],
stdout=FNULL)
time.sleep(0.1)
# Start two PlasmaManagers.
self.port1 = random.randint(10000, 50000)
@ -246,12 +247,16 @@ class TestPlasmaManager(unittest.TestCase):
"-s", store_name1,
"-m", manager_name1,
"-h", "127.0.0.1",
"-p", str(self.port1)] + manager_redis_args
"-p", str(self.port1),
"-r", "{addr}:{port}".format(addr="127.0.0.1",
port=redis_port)]
plasma_manager_command2 = [plasma_manager_executable,
"-s", store_name2,
"-m", manager_name2,
"-h", "127.0.0.1",
"-p", str(self.port2)] + manager_redis_args
"-p", str(self.port2),
"-r", "{addr}:{port}".format(addr="127.0.0.1",
port=redis_port)]
if USE_VALGRIND:
self.p4 = subprocess.Popen(["valgrind", "--track-origins=yes", "--leak-check=full", "--show-leak-kinds=all", "--error-exitcode=1"] + plasma_manager_command1)
@ -283,8 +288,7 @@ class TestPlasmaManager(unittest.TestCase):
self.p3.kill()
self.p4.kill()
self.p5.kill()
if self.redis_process:
self.redis_process.kill()
self.redis_process.kill()
# def test_fetch(self):
# if self.redis_process is None:
@ -341,6 +345,57 @@ class TestPlasmaManager(unittest.TestCase):
# assert_get_object_equal(self, self.client2, self.client1, object_id2,
# memory_buffer=memory_buffer2, metadata=metadata2)
def test_wait(self):
# Test timeout.
obj_id0 = random_object_id()
self.client1.wait([obj_id0], timeout=100, num_returns=1)
# If we get here, the test worked.
# Test wait if local objects available.
obj_id1 = random_object_id()
self.client1.create(obj_id1, 1000)
self.client1.seal(obj_id1)
ready, waiting = self.client1.wait([obj_id1], timeout=100, num_returns=1)
self.assertEqual(len(ready), 1)
self.assertEqual(ready[0], obj_id1)
self.assertEqual(len(waiting), 0)
# Test wait if only one object available and only one object waited for.
obj_id2 = random_object_id()
self.client1.create(obj_id2, 1000)
# Don't seal.
ready, waiting = self.client1.wait([obj_id2, obj_id1], timeout=100, num_returns=1)
self.assertEqual(len(ready), 1)
self.assertEqual(ready[0], obj_id1)
self.assertEqual(len(waiting), 1)
self.assertEqual(waiting[0], obj_id2)
# Test wait if object is sealed later.
obj_id3 = random_object_id()
def finish():
self.client2.create(obj_id3, 1000)
self.client2.seal(obj_id3)
self.client2.transfer("127.0.0.1", self.port1, obj_id3)
t = threading.Timer(0.1, finish)
t.start()
ready, waiting = self.client1.wait([obj_id3, obj_id2, obj_id1], timeout=500, num_returns=2)
self.assertEqual(len(ready), 2)
self.assertTrue((ready[0] == obj_id1 and ready[1] == obj_id3) or (ready[0] == obj_id3 and ready[1] == obj_id1))
self.assertEqual(len(waiting), 1)
self.assertTrue(waiting[0] == obj_id2)
# Test if the appropriate number of objects is shown if some objects are not ready
ready, wait = self.client1.wait([obj_id3, obj_id2, obj_id1], 100, 3)
self.assertEqual(len(ready), 2)
self.assertTrue((ready[0] == obj_id1 and ready[1] == obj_id3) or (ready[0] == obj_id3 and ready[1] == obj_id1))
self.assertEqual(len(waiting), 1)
self.assertTrue(waiting[0] == obj_id2)
# Don't forget to seal obj_id2.
self.client1.seal(obj_id2)
def test_transfer(self):
for _ in range(100):
# Create an object.