From f69d4aaaa7628958d42d95394575e87f8c9e47eb Mon Sep 17 00:00:00 2001 From: Robert Nishihara Date: Wed, 1 Feb 2017 12:21:52 -0800 Subject: [PATCH] Change fetch requests in plasma manager to use a single timer. (#234) * Change fetch requests in plasma manager to use a single timer. * Fix manager tests, other cleanups. --- src/plasma/plasma_manager.c | 38 ++++++++++++--------------------- src/plasma/plasma_manager.h | 10 ++++++++- src/plasma/test/manager_tests.c | 5 +++++ 3 files changed, 28 insertions(+), 25 deletions(-) diff --git a/src/plasma/plasma_manager.c b/src/plasma/plasma_manager.c index cab45292d..32b848160 100644 --- a/src/plasma/plasma_manager.c +++ b/src/plasma/plasma_manager.c @@ -111,11 +111,6 @@ typedef struct { typedef struct { /** The ID of the object we are fetching or waiting for. */ object_id object_id; - /** The plasma manager state. */ - plasma_manager_state *manager_state; - /** The ID for the timer that will time out the current request to the state - * database or another plasma manager. */ - int64_t timer; /** Pointer to the array containing the manager locations of this object. This * struct owns and must free each entry. */ char **manager_vector; @@ -431,9 +426,7 @@ void update_object_wait_requests(plasma_manager_state *manager_state, fetch_request *create_fetch_request(plasma_manager_state *manager_state, object_id object_id) { fetch_request *fetch_req = malloc(sizeof(fetch_request)); - fetch_req->manager_state = manager_state; fetch_req->object_id = object_id; - fetch_req->timer = -1; fetch_req->manager_count = 0; fetch_req->manager_vector = NULL; return fetch_req; @@ -443,11 +436,6 @@ void remove_fetch_request(plasma_manager_state *manager_state, fetch_request *fetch_req) { /* Remove the fetch request from the table of fetch requests. */ HASH_DELETE(hh, manager_state->fetch_requests, fetch_req); - /* Remove the timer associated with this fetch request. */ - if (fetch_req->timer != -1) { - CHECK(event_loop_remove_timer(manager_state->loop, fetch_req->timer) == - AE_OK); - } /* Free the fetch request and everything in it. */ for (int i = 0; i < fetch_req->manager_count; ++i) { free(fetch_req->manager_vector[i]); @@ -526,7 +514,7 @@ void destroy_plasma_manager_state(plasma_manager_state *state) { if (state->fetch_requests != NULL) { fetch_request *fetch_req, *tmp; HASH_ITER(hh, state->fetch_requests, fetch_req, tmp) { - remove_fetch_request(fetch_req->manager_state, fetch_req); + remove_fetch_request(state, fetch_req); } } @@ -898,10 +886,15 @@ void request_transfer_from(plasma_manager_state *manager_state, fetch_req->next_manager %= fetch_req->manager_count; } -int manager_timeout_handler(event_loop *loop, timer_id id, void *context) { - fetch_request *fetch_req = context; - plasma_manager_state *manager_state = fetch_req->manager_state; - request_transfer_from(manager_state, fetch_req->object_id); +int fetch_timeout_handler(event_loop *loop, timer_id id, void *context) { + plasma_manager_state *manager_state = context; + /* Loop over the fetch requests and reissue the requests. */ + fetch_request *fetch_req, *tmp; + HASH_ITER(hh, manager_state->fetch_requests, fetch_req, tmp) { + if (fetch_req->manager_count > 0) { + request_transfer_from(manager_state, fetch_req->object_id); + } + } return MANAGER_TIMEOUT; } @@ -959,13 +952,6 @@ void request_transfer(object_id object_id, /* Wait for the object data for the default number of retries, which timeout * after a default interval. */ request_transfer_from(manager_state, object_id); - /* It is possible for this method to be called multiple times, but we only - * need to create a timer once. */ - if (fetch_req->timer == -1) { - fetch_req->timer = - event_loop_add_timer(manager_state->loop, MANAGER_TIMEOUT, - manager_timeout_handler, fetch_req); - } } /* This method is only called from the tests. */ @@ -1484,6 +1470,10 @@ void start_server(const char *store_socket_name, object_table_subscribe_to_notifications(g_manager_state->db, false, object_table_subscribe_callback, g_manager_state, NULL, NULL, NULL); + /* Set up a recurring timer that will loop through the outstanding fetch + * requests and reissue requests for transfers of those objects. */ + event_loop_add_timer(g_manager_state->loop, MANAGER_TIMEOUT, + fetch_timeout_handler, g_manager_state); /* Run the event loop. */ event_loop_run(g_manager_state->loop); } diff --git a/src/plasma/plasma_manager.h b/src/plasma/plasma_manager.h index 826e5b2a1..08df81456 100644 --- a/src/plasma/plasma_manager.h +++ b/src/plasma/plasma_manager.h @@ -185,7 +185,7 @@ struct plasma_request_buffer { * Call the request_transfer method, which well attempt to get an object from * a remote Plasma manager. If it is unable to get it from another Plasma * manager, it will cycle through a list of Plasma managers that have the - * object. + * object. This method is only called from the tests. * * @param object_id The object ID of the object to transfer. * @param manager_count The number of managers that have the object. @@ -198,6 +198,14 @@ void call_request_transfer(object_id object_id, const char *manager_vector[], void *context); +/* + * This runs periodically (every MANAGER_TIMEOUT milliseconds) and reissues + * transfer requests for all outstanding fetch requests. This is only exposed so + * that it can be called from the tests. + * + */ +int fetch_timeout_handler(event_loop *loop, timer_id id, void *context); + /** * Clean up and free an active object context. Deregister it from the * associated client connection and from the manager state. diff --git a/src/plasma/test/manager_tests.c b/src/plasma/test/manager_tests.c index a3e4fa74a..d1765a126 100644 --- a/src/plasma/test/manager_tests.c +++ b/src/plasma/test/manager_tests.c @@ -177,6 +177,11 @@ TEST request_transfer_retry_test(void) { free(manager_vector); event_loop_add_timer(local_mock->loop, MANAGER_TIMEOUT * 2, test_done_handler, local_mock->state); + /* Register the fetch timeout handler. This is normally done when the plasma + * manager is started. It is needed here so that retries will happen when + * fetch requests time out. */ + event_loop_add_timer(local_mock->loop, MANAGER_TIMEOUT, fetch_timeout_handler, + local_mock->state); event_loop_run(local_mock->loop); int read_fd = get_client_sock(remote_mock2->read_conn);