From 88206417cbc586eb82b2a3b5d92dd46f6c7e1681 Mon Sep 17 00:00:00 2001 From: atumanov Date: Wed, 7 Dec 2016 17:25:40 -0800 Subject: [PATCH] unifying plasma seal path through the store to mgr to redis (#96) --- src/plasma/plasma_client.c | 3 --- src/plasma/plasma_manager.c | 28 +++++++++++++--------------- 2 files changed, 13 insertions(+), 18 deletions(-) diff --git a/src/plasma/plasma_client.c b/src/plasma/plasma_client.c index 39aa286d1..4d80fa44d 100644 --- a/src/plasma/plasma_client.c +++ b/src/plasma/plasma_client.c @@ -348,9 +348,6 @@ void plasma_seal(plasma_connection *conn, object_id object_id) { /* Send the seal request to Plasma. */ plasma_request req = plasma_make_request(object_id); CHECK(plasma_send_request(conn->store_conn, PLASMA_SEAL, &req) >= 0); - if (conn->manager_conn >= 0) { - CHECK(plasma_send_request(conn->manager_conn, PLASMA_SEAL, &req) >= 0); - } } void plasma_delete(plasma_connection *conn, object_id object_id) { diff --git a/src/plasma/plasma_manager.c b/src/plasma/plasma_manager.c index cbf2d3811..1b2b9eb39 100644 --- a/src/plasma/plasma_manager.c +++ b/src/plasma/plasma_manager.c @@ -1497,6 +1497,11 @@ void process_object_notification(event_loop *loop, int events) { plasma_manager_state *state = context; object_id obj_id; + retry_info retry = { + .num_retries = NUM_RETRIES, + .timeout = MANAGER_TIMEOUT, + .fail_callback = NULL, + }; /* Read the notification from Plasma. */ int error = read_bytes(client_sock, (uint8_t *) &obj_id, sizeof(obj_id)); if (error < 0) { @@ -1515,6 +1520,14 @@ void process_object_notification(event_loop *loop, entry->object_id = obj_id; HASH_ADD(hh, state->local_available_objects, object_id, sizeof(object_id), entry); + + /* Add this object to the (redis) object table. */ + if (state->db) { + /* TODO(swang): Log the error if we fail to add the object, and possibly + * retry later? */ + object_table_add(state->db, obj_id, &retry, NULL, NULL); + } + /* If we were trying to fetch this object, finish up the fetch request. */ fetch_request2 *fetch_req; HASH_FIND(hh, state->fetch_requests2, &obj_id, sizeof(obj_id), fetch_req); @@ -1615,21 +1628,6 @@ void process_message(event_loop *loop, process_fetch_or_status_request(conn, req->object_requests[0].object_id, false); break; - case PLASMA_SEAL: { - LOG_DEBUG("Publishing to object table from DB client %d.", - get_client_id(conn->manager_state->db)); - /* TODO(swang): Log the error if we fail to add the object, and possibly - * retry later? */ - retry_info retry = { - .num_retries = NUM_RETRIES, - .timeout = MANAGER_TIMEOUT, - .fail_callback = NULL, - }; - if (conn->manager_state->db) { - object_table_add(conn->manager_state->db, - req->object_requests[0].object_id, &retry, NULL, NULL); - } - } break; case DISCONNECT_CLIENT: { LOG_INFO("Disconnecting client on fd %d", client_sock); /* TODO(swang): Check if this connection was to a plasma manager. If so,