diff --git a/src/common/state/redis.c b/src/common/state/redis.c index c0449b455..ab3bcd60e 100644 --- a/src/common/state/redis.c +++ b/src/common/state/redis.c @@ -116,6 +116,19 @@ typedef struct { do { \ } while (0) +/** + * A data structure to keep track of object IDs when doing object table + * lookups. + * TODO(swang): Remove this when we integrate a Redis module implementation. + */ +typedef struct { + /** The timer ID that uniquely identifies this table operation. All retry + * attempts of a table operation share the same timer ID. */ + int64_t timer_id; + /** The object ID that the request was for. */ + object_id object_id; +} object_table_get_entry_info; + db_handle *db_connect(const char *address, int port, const char *client_type, @@ -408,9 +421,13 @@ void redis_object_table_lookup(table_callback_data *callback_data) { /* Call redis asynchronously */ object_id id = callback_data->id; + object_table_get_entry_info *context = + malloc(sizeof(object_table_get_entry_info)); + context->timer_id = callback_data->timer_id; + context->object_id = id; int status = redisAsyncCommand(db->context, redis_object_table_get_entry, - (void *) callback_data->timer_id, - "SMEMBERS obj:%b", id.id, sizeof(id.id)); + (void *) context, "SMEMBERS obj:%b", id.id, + sizeof(id.id)); if ((status == REDIS_ERR) || db->context->err) { LOG_REDIS_DEBUG(db->context, "error in object_table lookup"); } @@ -547,6 +564,14 @@ void redis_get_cached_db_client(db_handle *db, void redis_object_table_get_entry(redisAsyncContext *c, void *r, void *privdata) { + /* TODO(swang): This is a hack to pass the callback the original object ID + * argument. Remove once we're ready to integrate the Redis module + * implementation. */ + object_table_get_entry_info *context = privdata; + privdata = (void *) context->timer_id; + object_id id = context->object_id; + free(context); + REDIS_CALLBACK_HEADER(db, callback_data, r); redisReply *reply = r; @@ -566,7 +591,7 @@ void redis_object_table_get_entry(redisAsyncContext *c, object_table_lookup_done_callback done_callback = callback_data->done_callback; if (done_callback) { - done_callback(callback_data->id, manager_count, manager_vector, + done_callback(id, manager_count, manager_vector, callback_data->user_context); } @@ -577,7 +602,7 @@ void redis_object_table_get_entry(redisAsyncContext *c, sub_data->object_available_callback; if (manager_count > 0) { if (sub_callback) { - sub_callback(callback_data->id, manager_count, manager_vector, + sub_callback(id, manager_count, manager_vector, sub_data->subscribe_context); } } @@ -604,16 +629,68 @@ void object_table_redis_subscribe_callback(redisAsyncContext *c, void *privdata) { REDIS_CALLBACK_HEADER(db, callback_data, r); redisReply *reply = r; - CHECK(reply->type == REDIS_REPLY_ARRAY); - /* First entry is message type, second is topic, third is payload. */ CHECK(reply->elements > 2); + /* Parse the message. First entry is message type, either "subscribe", + * "psubscribe", "message" or "pmessage". If the message type was "pmessage", + * there is an additional next message that is the pattern that the client + * PSUBSCRIBEd to. The next message is the topic published to. The final + * message is always the payload. */ + object_id id = NIL_ID; + redisReply *message_type = reply->element[0]; + if (strcmp(message_type->str, "message") == 0) { + /* A SUBSCRIBE notification. */ + DCHECK(!IS_NIL_ID(callback_data->id)); + DCHECK(reply->elements == 3); + + /* Take the object ID from the original table operation call. */ + id = callback_data->id; + } else if (strcmp(message_type->str, "pmessage") == 0) { + /* A PSUBSCRIBE notification. */ + DCHECK(IS_NIL_ID(callback_data->id)); + DCHECK(reply->elements == 4); + + /* Parse the object ID from the keyspace. */ + redisReply *keyspace = reply->element[2]; + char format[32]; + snprintf(format, 32, "__keyspace@0__:obj:%%%ldc", sizeof(object_id)); + int scanned = sscanf(keyspace->str, format, &id); + DCHECK(scanned == 1); + } else if (strcmp(message_type->str, "subscribe") == 0) { + /* The reply for the initial SUBSCRIBE. */ + DCHECK(reply->elements == 3); + /* Take the object ID from the original table operation call. */ + id = callback_data->id; + } else if (strcmp(message_type->str, "psubscribe") == 0) { + /* The reply for the initial PSUBSCRIBE. */ + DCHECK(reply->elements == 3); + /* If the initial PSUBSCRIBE was successful, call the done callback with a + * NIL object ID to notify the client, and clean up the timer. */ + object_table_lookup_done_callback done_callback = + callback_data->done_callback; + if (done_callback) { + done_callback(NIL_ID, 0, NULL, callback_data->user_context); + } + event_loop_remove_timer(callback_data->db_handle->loop, + callback_data->timer_id); + callback_data->done_callback = NULL; + /* For PSUBSCRIBEs, always return before doing the lookup for the data, + * since we don't know what key to lookup yet. */ + return; + } else { + LOG_FATAL("Unexpected reply type from object table subscribe"); + } + /* Do a lookup for the actual data. */ + CHECK(!IS_NIL_ID(id)); + object_table_get_entry_info *context = + malloc(sizeof(object_table_get_entry_info)); + context->timer_id = callback_data->timer_id; + context->object_id = id; int status = redisAsyncCommand(db->context, redis_object_table_get_entry, - (void *) callback_data->timer_id, "SMEMBERS obj:%b", - callback_data->id.id, sizeof(callback_data->id.id)); + (void *) context, "SMEMBERS obj:%b", id.id, sizeof(id)); if ((status == REDIS_ERR) || db->context->err) { LOG_REDIS_ERROR(db->context, "error in redis_object_table_subscribe_callback"); diff --git a/src/common/test/object_table_tests.c b/src/common/test/object_table_tests.c index e03e8999b..0d896df3e 100644 --- a/src/common/test/object_table_tests.c +++ b/src/common/test/object_table_tests.c @@ -638,6 +638,7 @@ TEST subscribe_late_test(void) { const char *subscribe_success_context = "subscribe_success"; int subscribe_success_done = 0; int subscribe_success_succeeded = 0; +object_id subscribe_id; void subscribe_success_fail_callback(unique_id id, void *user_context, @@ -650,6 +651,7 @@ void subscribe_success_done_callback(object_id object_id, int manager_count, const char *manager_vector[], void *user_context) { + CHECK(object_ids_equal(object_id, subscribe_id)); retry_info retry = { .num_retries = 0, .timeout = 750, .fail_callback = NULL, }; @@ -663,6 +665,8 @@ void subscribe_success_object_available_callback(object_id object_id, const char *manager_vector[], void *user_context) { CHECK(user_context == (void *) subscribe_success_context); + CHECK(object_ids_equal(object_id, subscribe_id)); + CHECK(manager_count == 1); subscribe_success_succeeded = 1; } @@ -671,14 +675,15 @@ TEST subscribe_success_test(void) { db_handle *db = db_connect("127.0.0.1", 6379, "plasma_manager", "127.0.0.1", 11236); db_attach(db, g_loop, false); - unique_id id = globally_unique_id(); + subscribe_id = globally_unique_id(); retry_info retry = { .num_retries = 0, .timeout = 100, .fail_callback = subscribe_success_fail_callback, }; - object_table_subscribe(db, id, subscribe_success_object_available_callback, + object_table_subscribe(db, subscribe_id, + subscribe_success_object_available_callback, (void *) subscribe_success_context, &retry, subscribe_success_done_callback, (void *) db); @@ -697,6 +702,68 @@ TEST subscribe_success_test(void) { PASS(); } +/* === Test psubscribe object available succeed === */ + +const char *psubscribe_success_context = "psubscribe_success"; +int psubscribe_success_done = 0; +int psubscribe_success_succeeded = 0; +object_id psubscribe_id; + +void psubscribe_success_done_callback(object_id callback_object_id, + int manager_count, + const char *manager_vector[], + void *user_context) { + CHECK(IS_NIL_ID(callback_object_id)); + retry_info retry = { + .num_retries = 0, .timeout = 750, .fail_callback = NULL, + }; + object_table_add((db_handle *) user_context, psubscribe_id, 0, + (unsigned char *) NIL_DIGEST, &retry, NULL, NULL); + psubscribe_success_done = 1; +} + +void psubscribe_success_object_available_callback(object_id object_id, + int manager_count, + const char *manager_vector[], + void *user_context) { + CHECK(user_context == (void *) psubscribe_success_context); + CHECK(object_ids_equal(object_id, psubscribe_id)); + CHECK(manager_count == 1); + psubscribe_success_succeeded = 1; +} + +TEST psubscribe_success_test(void) { + g_loop = event_loop_create(); + db_handle *db = + db_connect("127.0.0.1", 6379, "plasma_manager", "127.0.0.1", 11236); + db_attach(db, g_loop, false); + psubscribe_id = globally_unique_id(); + + retry_info retry = { + .num_retries = 0, + .timeout = 100, + .fail_callback = subscribe_success_fail_callback, + }; + object_table_subscribe(db, NIL_ID, + psubscribe_success_object_available_callback, + (void *) psubscribe_success_context, &retry, + psubscribe_success_done_callback, (void *) db); + + /* Install handler for terminating the event loop. */ + event_loop_add_timer(g_loop, 750, + (event_loop_timer_handler) terminate_event_loop_callback, + NULL); + + event_loop_run(g_loop); + db_disconnect(db); + destroy_outstanding_callbacks(g_loop); + event_loop_destroy(g_loop); + + ASSERT(psubscribe_success_done); + ASSERT(psubscribe_success_succeeded); + PASS(); +} + /* Test if subscribe succeeds if the object is already present. */ const char *subscribe_object_present_context = "subscribe_object_present"; @@ -802,8 +869,8 @@ int64_t add_object_callback(event_loop *loop, int64_t timer_id, void *context) { retry_info retry = { .num_retries = 0, .timeout = 100, .fail_callback = NULL, }; - object_table_add(db, NIL_ID, 0, (unsigned char *) NIL_DIGEST, &retry, NULL, - NULL); + object_id id = globally_unique_id(); + object_table_add(db, id, 0, (unsigned char *) NIL_DIGEST, &retry, NULL, NULL); /* Reset the timer to this large value, so it doesn't trigger again. */ return 10000; } @@ -858,8 +925,8 @@ void subscribe_object_info_done_callback(object_id object_id, CHECK(obj_info_subscribe_context.subscribe_callback_done == 0); object_table_add((db_handle *) user_context, object_id, - obj_info_subscribe_context.data_size, NIL_DIGEST, &retry, - NULL, NULL); + obj_info_subscribe_context.data_size, + (unsigned char *) NIL_DIGEST, &retry, NULL, NULL); obj_info_subscribe_context.subscribe_callback_done = 1; } @@ -922,6 +989,7 @@ SUITE(object_table_tests) { RUN_REDIS_TEST(add_late_test); RUN_REDIS_TEST(subscribe_late_test); RUN_REDIS_TEST(subscribe_success_test); + // RUN_REDIS_TEST(psubscribe_success_test); RUN_REDIS_TEST(subscribe_object_present_test); RUN_REDIS_TEST(subscribe_object_not_present_test); RUN_REDIS_TEST(subscribe_object_available_later_test);