2016-09-13 18:54:26 -07:00
|
|
|
/* Redis implementation of the global state store */
|
|
|
|
|
|
|
|
#include <assert.h>
|
2016-10-29 15:22:33 -07:00
|
|
|
#include <stdbool.h>
|
2016-09-23 22:53:58 -07:00
|
|
|
#include <stdlib.h>
|
2016-12-09 17:21:49 -08:00
|
|
|
#include <unistd.h>
|
2016-11-22 17:04:24 -08:00
|
|
|
/* Including hiredis here is necessary on Windows for typedefs used in ae.h. */
|
|
|
|
#include "hiredis/hiredis.h"
|
2016-09-23 22:53:58 -07:00
|
|
|
#include "hiredis/adapters/ae.h"
|
2016-09-22 23:15:45 -07:00
|
|
|
#include "utstring.h"
|
|
|
|
|
2016-09-13 18:54:26 -07:00
|
|
|
#include "common.h"
|
|
|
|
#include "db.h"
|
2016-11-18 19:57:51 -08:00
|
|
|
#include "db_client_table.h"
|
2016-12-24 20:02:25 -08:00
|
|
|
#include "local_scheduler_table.h"
|
2016-09-13 18:54:26 -07:00
|
|
|
#include "object_table.h"
|
2016-12-09 00:51:44 -08:00
|
|
|
#include "object_info.h"
|
2016-10-29 15:22:33 -07:00
|
|
|
#include "task.h"
|
2016-11-10 18:13:26 -08:00
|
|
|
#include "task_table.h"
|
2016-09-13 18:54:26 -07:00
|
|
|
#include "event_loop.h"
|
|
|
|
#include "redis.h"
|
2016-09-20 22:40:35 -07:00
|
|
|
#include "io.h"
|
2016-09-13 18:54:26 -07:00
|
|
|
|
2016-12-09 17:21:49 -08:00
|
|
|
#ifndef _WIN32
|
|
|
|
/* This function is actually not declared in standard POSIX, so declare it. */
|
|
|
|
extern int usleep(useconds_t usec);
|
|
|
|
#endif
|
|
|
|
|
2016-09-13 18:54:26 -07:00
|
|
|
#define CHECK_REDIS_CONNECT(CONTEXT_TYPE, context, M, ...) \
|
|
|
|
do { \
|
|
|
|
CONTEXT_TYPE *_context = (context); \
|
|
|
|
if (!_context) { \
|
2016-11-10 18:13:26 -08:00
|
|
|
LOG_FATAL("could not allocate redis context"); \
|
2016-09-13 18:54:26 -07:00
|
|
|
} \
|
|
|
|
if (_context->err) { \
|
2016-12-05 00:26:53 -08:00
|
|
|
LOG_ERROR(M, ##__VA_ARGS__); \
|
|
|
|
LOG_REDIS_ERROR(_context, ""); \
|
2016-09-13 18:54:26 -07:00
|
|
|
exit(-1); \
|
|
|
|
} \
|
2016-11-19 12:19:49 -08:00
|
|
|
} while (0)
|
2016-09-13 18:54:26 -07:00
|
|
|
|
2016-12-08 20:57:08 -08:00
|
|
|
/**
|
|
|
|
* A header for callbacks of a single Redis asynchronous command. The user must
|
|
|
|
* pass in the table operation's timer ID as the asynchronous command's
|
|
|
|
* privdata field when executing the asynchronous command. The user must define
|
|
|
|
* variable names for DB and CB_DATA. After this piece of code runs, DB
|
|
|
|
* will hold a reference to the database handle, CB_DATA will hold a reference
|
|
|
|
* to the callback data for this table operation. The user must pass in the
|
|
|
|
* redisReply pointer as the REPLY argument.
|
|
|
|
*
|
|
|
|
* This header also short-circuits the entire callback if: (1) there was no
|
|
|
|
* reply from Redis, or (2) the callback data for this table operation was
|
|
|
|
* already removed, meaning that the operation was already marked as succeeded
|
|
|
|
* or failed.
|
|
|
|
*/
|
2016-10-29 15:22:33 -07:00
|
|
|
#define REDIS_CALLBACK_HEADER(DB, CB_DATA, REPLY) \
|
|
|
|
if ((REPLY) == NULL) { \
|
|
|
|
return; \
|
|
|
|
} \
|
|
|
|
db_handle *DB = c->data; \
|
|
|
|
table_callback_data *CB_DATA = \
|
|
|
|
outstanding_callbacks_find((int64_t) privdata); \
|
2016-12-05 00:26:53 -08:00
|
|
|
if (CB_DATA == NULL) { \
|
2016-10-29 15:22:33 -07:00
|
|
|
/* the callback data structure has been \
|
|
|
|
* already freed; just ignore this reply */ \
|
2016-11-19 12:19:49 -08:00
|
|
|
return; \
|
2016-12-05 00:26:53 -08:00
|
|
|
} \
|
2016-11-19 12:19:49 -08:00
|
|
|
do { \
|
|
|
|
} while (0)
|
2016-10-29 15:22:33 -07:00
|
|
|
|
2016-12-08 20:57:08 -08:00
|
|
|
/**
|
|
|
|
* A data structure to track the status of a table operation attempt that spans
|
|
|
|
* multiple Redis commands. Each attempt at a table operation is associated
|
|
|
|
* with a unique redis_requests_info instance. To use this data structure, pass
|
|
|
|
* it as the `privdata` argument for the callback of each asynchronous Redis
|
|
|
|
* command.
|
|
|
|
*/
|
|
|
|
typedef struct {
|
|
|
|
/** The timer ID that uniquely identifies this table operation. All retry
|
|
|
|
* attempts of a table operation share the same timer ID. */
|
|
|
|
int64_t timer_id;
|
|
|
|
/** The index of the next command to try for this operation. This may be
|
|
|
|
* different across different attempts of the same table operation. */
|
|
|
|
int request_index;
|
|
|
|
/** Whether the current invocation of the callback was triggered by a reply
|
|
|
|
* to an asynchronous Redis command. If not, then the callback was called
|
|
|
|
* directly. */
|
|
|
|
bool is_redis_reply;
|
|
|
|
} redis_requests_info;
|
|
|
|
|
2016-12-20 20:21:35 -08:00
|
|
|
db_handle *db_connect(const char *db_address,
|
|
|
|
int db_port,
|
2016-09-25 21:52:06 -07:00
|
|
|
const char *client_type,
|
2016-12-20 20:21:35 -08:00
|
|
|
const char *node_ip_address,
|
|
|
|
int num_args,
|
|
|
|
const char **args) {
|
|
|
|
/* Check that the number of args is even. These args will be passed to the
|
|
|
|
* RAY.CONNECT Redis command, which takes arguments in pairs. */
|
|
|
|
if (num_args % 2 != 0) {
|
|
|
|
LOG_FATAL("The number of extra args must be divisible by two.");
|
|
|
|
}
|
2016-12-13 17:21:38 -08:00
|
|
|
|
2016-09-25 21:52:06 -07:00
|
|
|
db_handle *db = malloc(sizeof(db_handle));
|
2016-09-13 18:54:26 -07:00
|
|
|
/* Sync connection for initial handshake */
|
|
|
|
redisReply *reply;
|
2016-12-09 17:21:49 -08:00
|
|
|
int connection_attempts = 0;
|
2016-12-20 20:21:35 -08:00
|
|
|
redisContext *context = redisConnect(db_address, db_port);
|
2016-12-09 17:21:49 -08:00
|
|
|
while (context == NULL || context->err) {
|
|
|
|
if (connection_attempts >= REDIS_DB_CONNECT_RETRIES) {
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
LOG_WARN("Failed to connect to Redis, retrying.");
|
|
|
|
/* Sleep for a little. */
|
|
|
|
usleep(REDIS_DB_CONNECT_WAIT_MS * 1000);
|
2016-12-20 20:21:35 -08:00
|
|
|
context = redisConnect(db_address, db_port);
|
2016-12-09 17:21:49 -08:00
|
|
|
connection_attempts += 1;
|
|
|
|
}
|
|
|
|
CHECK_REDIS_CONNECT(redisContext, context,
|
|
|
|
"could not establish synchronous connection to redis "
|
|
|
|
"%s:%d",
|
2016-12-20 20:21:35 -08:00
|
|
|
db_address, db_port);
|
2016-12-21 18:53:12 -08:00
|
|
|
/* Configure Redis to generate keyspace notifications for list events. This
|
|
|
|
* should only need to be done once (by whoever started Redis), but since
|
|
|
|
* Redis may be started in multiple places (e.g., for testing or when starting
|
|
|
|
* processes by hand), it is easier to do it multiple times. */
|
|
|
|
reply = redisCommand(context, "CONFIG SET notify-keyspace-events Kl");
|
2016-12-13 17:21:38 -08:00
|
|
|
CHECKM(reply != NULL, "db_connect failed on CONFIG SET");
|
2016-11-27 21:26:23 -08:00
|
|
|
freeReplyObject(reply);
|
2016-12-21 18:53:12 -08:00
|
|
|
/* Also configure Redis to not run in protected mode, so clients on other
|
|
|
|
* hosts can connect to it. */
|
|
|
|
reply = redisCommand(context, "CONFIG SET protected-mode no");
|
|
|
|
CHECKM(reply != NULL, "db_connect failed on CONFIG SET");
|
|
|
|
freeReplyObject(reply);
|
|
|
|
/* Create a client ID for this client. */
|
2016-11-18 19:57:51 -08:00
|
|
|
db_client_id client = globally_unique_id();
|
2016-12-13 17:21:38 -08:00
|
|
|
|
2016-12-20 20:21:35 -08:00
|
|
|
/* Construct the argument arrays for RAY.CONNECT. */
|
|
|
|
int argc = num_args + 4;
|
|
|
|
const char **argv = malloc(sizeof(char *) * argc);
|
|
|
|
size_t *argvlen = malloc(sizeof(size_t) * argc);
|
|
|
|
/* Set the command name argument. */
|
|
|
|
argv[0] = "RAY.CONNECT";
|
|
|
|
argvlen[0] = strlen(argv[0]);
|
|
|
|
/* Set the client ID argument. */
|
|
|
|
argv[1] = (char *) client.id;
|
|
|
|
argvlen[1] = sizeof(db->client.id);
|
|
|
|
/* Set the node IP address argument. */
|
|
|
|
argv[2] = node_ip_address;
|
|
|
|
argvlen[2] = strlen(node_ip_address);
|
|
|
|
/* Set the client type argument. */
|
|
|
|
argv[3] = client_type;
|
|
|
|
argvlen[3] = strlen(client_type);
|
|
|
|
/* Set the remaining arguments. */
|
|
|
|
for (int i = 0; i < num_args; ++i) {
|
|
|
|
if (args[i] == NULL) {
|
|
|
|
LOG_FATAL("Element %d of the args array passed to db_connect was NULL.",
|
|
|
|
i);
|
|
|
|
}
|
|
|
|
argv[4 + i] = args[i];
|
|
|
|
argvlen[4 + i] = strlen(args[i]);
|
|
|
|
}
|
|
|
|
|
2016-12-15 14:47:10 -08:00
|
|
|
/* Register this client with Redis. RAY.CONNECT is a custom Redis command that
|
|
|
|
* we've defined. */
|
2016-12-20 20:21:35 -08:00
|
|
|
reply = redisCommandArgv(context, argc, argv, argvlen);
|
2016-12-15 14:47:10 -08:00
|
|
|
CHECKM(reply != NULL, "db_connect failed on RAY.CONNECT");
|
2016-12-20 20:21:35 -08:00
|
|
|
CHECK(reply->type != REDIS_REPLY_ERROR);
|
|
|
|
CHECK(strcmp(reply->str, "OK") == 0);
|
2016-12-15 14:47:10 -08:00
|
|
|
freeReplyObject(reply);
|
2016-12-20 20:21:35 -08:00
|
|
|
free(argv);
|
|
|
|
free(argvlen);
|
2016-09-13 18:54:26 -07:00
|
|
|
|
|
|
|
db->client_type = strdup(client_type);
|
2016-11-18 19:57:51 -08:00
|
|
|
db->client = client;
|
|
|
|
db->db_client_cache = NULL;
|
2016-09-20 17:02:56 -07:00
|
|
|
db->sync_context = context;
|
2016-09-13 18:54:26 -07:00
|
|
|
|
|
|
|
/* Establish async connection */
|
2016-12-20 20:21:35 -08:00
|
|
|
db->context = redisAsyncConnect(db_address, db_port);
|
2016-09-13 18:54:26 -07:00
|
|
|
CHECK_REDIS_CONNECT(redisAsyncContext, db->context,
|
2016-12-09 17:21:49 -08:00
|
|
|
"could not establish asynchronous connection to redis "
|
|
|
|
"%s:%d",
|
2016-12-20 20:21:35 -08:00
|
|
|
db_address, db_port);
|
2016-09-13 18:54:26 -07:00
|
|
|
db->context->data = (void *) db;
|
2016-09-29 21:12:06 -07:00
|
|
|
/* Establish async connection for subscription */
|
2016-12-20 20:21:35 -08:00
|
|
|
db->sub_context = redisAsyncConnect(db_address, db_port);
|
2016-09-29 21:12:06 -07:00
|
|
|
CHECK_REDIS_CONNECT(redisAsyncContext, db->sub_context,
|
2016-12-09 17:21:49 -08:00
|
|
|
"could not establish asynchronous subscription "
|
|
|
|
"connection to redis %s:%d",
|
2016-12-20 20:21:35 -08:00
|
|
|
db_address, db_port);
|
2016-09-29 21:12:06 -07:00
|
|
|
db->sub_context->data = (void *) db;
|
|
|
|
|
2016-09-25 21:52:06 -07:00
|
|
|
return db;
|
2016-09-13 18:54:26 -07:00
|
|
|
}
|
|
|
|
|
2016-09-25 21:52:06 -07:00
|
|
|
void db_disconnect(db_handle *db) {
|
2016-09-20 17:02:56 -07:00
|
|
|
redisFree(db->sync_context);
|
|
|
|
redisAsyncFree(db->context);
|
2016-09-29 21:12:06 -07:00
|
|
|
redisAsyncFree(db->sub_context);
|
2016-11-18 19:57:51 -08:00
|
|
|
db_client_cache_entry *e, *tmp;
|
|
|
|
HASH_ITER(hh, db->db_client_cache, e, tmp) {
|
2016-09-20 17:02:56 -07:00
|
|
|
free(e->addr);
|
2016-11-18 19:57:51 -08:00
|
|
|
HASH_DELETE(hh, db->db_client_cache, e);
|
2016-09-20 17:02:56 -07:00
|
|
|
free(e);
|
|
|
|
}
|
|
|
|
free(db->client_type);
|
2016-09-25 21:52:06 -07:00
|
|
|
free(db);
|
2016-09-20 17:02:56 -07:00
|
|
|
}
|
|
|
|
|
2016-12-05 00:26:53 -08:00
|
|
|
void db_attach(db_handle *db, event_loop *loop, bool reattach) {
|
2016-10-29 15:22:33 -07:00
|
|
|
db->loop = loop;
|
2016-12-05 00:26:53 -08:00
|
|
|
int err = redisAeAttach(loop, db->context);
|
|
|
|
/* If the database is reattached in the tests, redis normally gives
|
|
|
|
* an error which we can safely ignore. */
|
|
|
|
if (!reattach) {
|
|
|
|
CHECKM(err == REDIS_OK, "failed to attach the event loop");
|
|
|
|
}
|
|
|
|
err = redisAeAttach(loop, db->sub_context);
|
|
|
|
if (!reattach) {
|
|
|
|
CHECKM(err == REDIS_OK, "failed to attach the event loop");
|
|
|
|
}
|
2016-09-13 18:54:26 -07:00
|
|
|
}
|
|
|
|
|
2016-10-29 15:22:33 -07:00
|
|
|
/*
|
|
|
|
* ==== object_table callbacks ====
|
|
|
|
*/
|
|
|
|
|
|
|
|
void redis_object_table_add_callback(redisAsyncContext *c,
|
|
|
|
void *r,
|
|
|
|
void *privdata) {
|
2016-12-18 18:19:02 -08:00
|
|
|
REDIS_CALLBACK_HEADER(db, callback_data, r);
|
2016-12-08 20:57:08 -08:00
|
|
|
|
2016-12-18 18:19:02 -08:00
|
|
|
/* Do some minimal checking. */
|
|
|
|
redisReply *reply = r;
|
|
|
|
if (strcmp(reply->str, "hash mismatch") == 0) {
|
|
|
|
/* If our object hash doesn't match the one recorded in the table, report
|
|
|
|
* the error back to the user and exit immediately. */
|
|
|
|
LOG_FATAL(
|
|
|
|
"Found objects with different value but same object ID, most likely "
|
|
|
|
"because a nondeterministic task was executed twice, either for "
|
|
|
|
"reconstruction or for speculation.");
|
2016-12-08 20:57:08 -08:00
|
|
|
}
|
2016-12-18 18:19:02 -08:00
|
|
|
CHECK(reply->type != REDIS_REPLY_ERROR);
|
|
|
|
CHECK(strcmp(reply->str, "OK") == 0);
|
|
|
|
/* Call the done callback if there is one. */
|
|
|
|
if (callback_data->done_callback != NULL) {
|
2016-12-19 23:18:57 -08:00
|
|
|
object_table_done_callback done_callback = callback_data->done_callback;
|
2016-12-18 18:19:02 -08:00
|
|
|
done_callback(callback_data->id, callback_data->user_context);
|
2016-10-29 15:22:33 -07:00
|
|
|
}
|
2016-12-18 18:19:02 -08:00
|
|
|
/* Clean up the timer and callback. */
|
|
|
|
destroy_timer_callback(db->loop, callback_data);
|
2016-10-29 15:22:33 -07:00
|
|
|
}
|
|
|
|
|
|
|
|
void redis_object_table_add(table_callback_data *callback_data) {
|
|
|
|
db_handle *db = callback_data->db_handle;
|
2016-12-18 18:19:02 -08:00
|
|
|
|
|
|
|
object_table_add_data *info = callback_data->data;
|
|
|
|
object_id obj_id = callback_data->id;
|
|
|
|
int64_t object_size = info->object_size;
|
|
|
|
unsigned char *digest = info->digest;
|
|
|
|
|
|
|
|
int status = redisAsyncCommand(
|
|
|
|
db->context, redis_object_table_add_callback,
|
|
|
|
(void *) callback_data->timer_id, "RAY.OBJECT_TABLE_ADD %b %ld %b %b",
|
|
|
|
obj_id.id, sizeof(obj_id.id), object_size, digest, (size_t) DIGEST_SIZE,
|
|
|
|
db->client.id, sizeof(db->client.id));
|
|
|
|
|
|
|
|
if ((status == REDIS_ERR) || db->context->err) {
|
|
|
|
LOG_REDIS_DEBUG(db->context, "error in redis_object_table_add");
|
|
|
|
}
|
2016-09-13 18:54:26 -07:00
|
|
|
}
|
|
|
|
|
2016-12-19 23:18:57 -08:00
|
|
|
void redis_object_table_remove_callback(redisAsyncContext *c,
|
|
|
|
void *r,
|
|
|
|
void *privdata) {
|
|
|
|
REDIS_CALLBACK_HEADER(db, callback_data, r);
|
|
|
|
|
|
|
|
/* Do some minimal checking. */
|
|
|
|
redisReply *reply = r;
|
|
|
|
if (strcmp(reply->str, "object not found") == 0) {
|
|
|
|
/* If our object entry was not in the table, it's probably a race
|
|
|
|
* condition with an object_table_add. */
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
CHECK(reply->type != REDIS_REPLY_ERROR);
|
|
|
|
CHECK(strcmp(reply->str, "OK") == 0);
|
|
|
|
/* Call the done callback if there is one. */
|
|
|
|
if (callback_data->done_callback != NULL) {
|
|
|
|
object_table_done_callback done_callback = callback_data->done_callback;
|
|
|
|
done_callback(callback_data->id, callback_data->user_context);
|
|
|
|
}
|
|
|
|
/* Clean up the timer and callback. */
|
|
|
|
destroy_timer_callback(db->loop, callback_data);
|
|
|
|
}
|
|
|
|
|
|
|
|
void redis_object_table_remove(table_callback_data *callback_data) {
|
|
|
|
db_handle *db = callback_data->db_handle;
|
|
|
|
|
|
|
|
object_id obj_id = callback_data->id;
|
|
|
|
/* If the caller provided a manager ID to delete, use it. Otherwise, use our
|
|
|
|
* own client ID as the ID to delete. */
|
|
|
|
db_client_id *client_id = callback_data->data;
|
|
|
|
if (client_id == NULL) {
|
|
|
|
client_id = &db->client;
|
|
|
|
}
|
|
|
|
int status = redisAsyncCommand(
|
|
|
|
db->context, redis_object_table_remove_callback,
|
|
|
|
(void *) callback_data->timer_id, "RAY.OBJECT_TABLE_REMOVE %b %b",
|
|
|
|
obj_id.id, sizeof(obj_id.id), client_id->id, sizeof(client_id->id));
|
|
|
|
|
|
|
|
if ((status == REDIS_ERR) || db->context->err) {
|
|
|
|
LOG_REDIS_DEBUG(db->context, "error in redis_object_table_remove");
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2016-10-29 15:22:33 -07:00
|
|
|
void redis_object_table_lookup(table_callback_data *callback_data) {
|
|
|
|
CHECK(callback_data);
|
|
|
|
db_handle *db = callback_data->db_handle;
|
|
|
|
|
2016-12-18 18:19:02 -08:00
|
|
|
object_id obj_id = callback_data->id;
|
|
|
|
int status = redisAsyncCommand(
|
|
|
|
db->context, redis_object_table_lookup_callback,
|
|
|
|
(void *) callback_data->timer_id, "RAY.OBJECT_TABLE_LOOKUP %b", obj_id.id,
|
|
|
|
sizeof(obj_id.id));
|
2016-11-10 18:13:26 -08:00
|
|
|
if ((status == REDIS_ERR) || db->context->err) {
|
2016-11-15 20:33:29 -08:00
|
|
|
LOG_REDIS_DEBUG(db->context, "error in object_table lookup");
|
2016-10-29 15:22:33 -07:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2016-11-10 18:13:26 -08:00
|
|
|
void redis_result_table_add_callback(redisAsyncContext *c,
|
|
|
|
void *r,
|
|
|
|
void *privdata) {
|
2016-11-19 12:19:49 -08:00
|
|
|
REDIS_CALLBACK_HEADER(db, callback_data, r);
|
2016-11-10 18:13:26 -08:00
|
|
|
redisReply *reply = r;
|
2016-12-25 23:57:05 -08:00
|
|
|
/* Check that the command succeeded. */
|
|
|
|
CHECK(reply->type != REDIS_REPLY_ERROR);
|
|
|
|
CHECKM(strncmp(reply->str, "OK", strlen("OK")) == 0, "reply->str is %s",
|
|
|
|
reply->str);
|
|
|
|
/* Call the done callback if there is one. */
|
2016-11-10 18:13:26 -08:00
|
|
|
if (callback_data->done_callback) {
|
|
|
|
result_table_done_callback done_callback = callback_data->done_callback;
|
|
|
|
done_callback(callback_data->id, callback_data->user_context);
|
|
|
|
}
|
|
|
|
destroy_timer_callback(db->loop, callback_data);
|
|
|
|
}
|
|
|
|
|
|
|
|
void redis_result_table_add(table_callback_data *callback_data) {
|
|
|
|
CHECK(callback_data);
|
|
|
|
db_handle *db = callback_data->db_handle;
|
|
|
|
object_id id = callback_data->id;
|
|
|
|
task_id *result_task_id = (task_id *) callback_data->data;
|
|
|
|
/* Add the result entry to the result table. */
|
2016-11-19 12:19:49 -08:00
|
|
|
int status = redisAsyncCommand(
|
|
|
|
db->context, redis_result_table_add_callback,
|
2016-12-25 23:57:05 -08:00
|
|
|
(void *) callback_data->timer_id, "RAY.RESULT_TABLE_ADD %b %b", id.id,
|
2016-11-19 12:19:49 -08:00
|
|
|
sizeof(id.id), result_task_id->id, sizeof(result_task_id->id));
|
2016-11-10 18:13:26 -08:00
|
|
|
if ((status == REDIS_ERR) || db->context->err) {
|
2016-11-15 20:33:29 -08:00
|
|
|
LOG_REDIS_DEBUG(db->context, "Error in result table add");
|
2016-11-10 18:13:26 -08:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2016-12-25 23:57:05 -08:00
|
|
|
/* This allocates a task which must be freed by the caller, unless the returned
|
|
|
|
* task is NULL. This is used by both redis_result_table_lookup_callback and
|
|
|
|
* redis_task_table_get_task_callback. */
|
|
|
|
task *parse_and_construct_task_from_redis_reply(redisReply *reply) {
|
|
|
|
task *task;
|
|
|
|
if (reply->type == REDIS_REPLY_NIL) {
|
|
|
|
/* There is no task in the reply, so return NULL. */
|
|
|
|
task = NULL;
|
|
|
|
} else if (reply->type == REDIS_REPLY_ARRAY) {
|
|
|
|
/* Check that the reply is as expected. The 0th element is the scheduling
|
|
|
|
* state. The 1st element is the db_client_id of the associated local
|
|
|
|
* scheduler, and the 2nd element is the task_spec. */
|
|
|
|
CHECK(reply->elements == 3);
|
|
|
|
CHECK(reply->element[0]->type == REDIS_REPLY_INTEGER);
|
|
|
|
CHECK(reply->element[1]->type == REDIS_REPLY_STRING);
|
|
|
|
CHECK(reply->element[2]->type == REDIS_REPLY_STRING);
|
|
|
|
/* Parse the scheduling state. */
|
|
|
|
long long state = reply->element[0]->integer;
|
|
|
|
/* Parse the local scheduler db_client_id. */
|
|
|
|
db_client_id local_scheduler_id;
|
|
|
|
CHECK(sizeof(local_scheduler_id) == reply->element[1]->len);
|
|
|
|
memcpy(local_scheduler_id.id, reply->element[1]->str,
|
|
|
|
reply->element[1]->len);
|
|
|
|
/* Parse the task spec. */
|
|
|
|
task_spec *spec = malloc(reply->element[2]->len);
|
|
|
|
memcpy(spec, reply->element[2]->str, reply->element[2]->len);
|
|
|
|
CHECK(task_spec_size(spec) == reply->element[2]->len);
|
|
|
|
task = alloc_task(spec, state, local_scheduler_id);
|
|
|
|
/* Free the task spec. */
|
|
|
|
free_task_spec(spec);
|
|
|
|
} else {
|
|
|
|
LOG_FATAL("Unexpected reply type %d", reply->type);
|
2016-11-10 18:13:26 -08:00
|
|
|
}
|
2016-12-25 23:57:05 -08:00
|
|
|
/* Return the task. If it is not NULL, then it must be freed by the caller. */
|
|
|
|
return task;
|
2016-11-10 18:13:26 -08:00
|
|
|
}
|
|
|
|
|
2016-12-25 23:57:05 -08:00
|
|
|
void redis_result_table_lookup_callback(redisAsyncContext *c,
|
|
|
|
void *r,
|
|
|
|
void *privdata) {
|
2016-11-19 12:19:49 -08:00
|
|
|
REDIS_CALLBACK_HEADER(db, callback_data, r);
|
2016-11-10 18:13:26 -08:00
|
|
|
redisReply *reply = r;
|
2016-12-25 23:57:05 -08:00
|
|
|
/* Parse the task from the reply. */
|
|
|
|
task *task = parse_and_construct_task_from_redis_reply(reply);
|
|
|
|
/* Call the done callback if there is one. */
|
|
|
|
result_table_lookup_callback done_callback = callback_data->done_callback;
|
|
|
|
if (done_callback != NULL) {
|
|
|
|
done_callback(callback_data->id, task, callback_data->user_context);
|
2016-11-10 18:13:26 -08:00
|
|
|
}
|
2016-12-25 23:57:05 -08:00
|
|
|
/* Free the task if it is not NULL. */
|
|
|
|
if (task != NULL) {
|
|
|
|
free_task(task);
|
|
|
|
}
|
|
|
|
/* Clean up timer and callback. */
|
|
|
|
destroy_timer_callback(db->loop, callback_data);
|
2016-11-10 18:13:26 -08:00
|
|
|
}
|
|
|
|
|
|
|
|
void redis_result_table_lookup(table_callback_data *callback_data) {
|
|
|
|
CHECK(callback_data);
|
|
|
|
db_handle *db = callback_data->db_handle;
|
|
|
|
object_id id = callback_data->id;
|
2016-12-25 23:57:05 -08:00
|
|
|
int status =
|
|
|
|
redisAsyncCommand(db->context, redis_result_table_lookup_callback,
|
|
|
|
(void *) callback_data->timer_id,
|
|
|
|
"RAY.RESULT_TABLE_LOOKUP %b", id.id, sizeof(id.id));
|
2016-11-10 18:13:26 -08:00
|
|
|
if ((status == REDIS_ERR) || db->context->err) {
|
2016-11-15 20:33:29 -08:00
|
|
|
LOG_REDIS_DEBUG(db->context, "Error in result table lookup");
|
2016-11-10 18:13:26 -08:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2016-10-29 15:22:33 -07:00
|
|
|
/**
|
|
|
|
* Get an entry from the plasma manager table in redis.
|
|
|
|
*
|
|
|
|
* @param db The database handle.
|
|
|
|
* @param index The index of the plasma manager.
|
2016-11-18 19:57:51 -08:00
|
|
|
* @param manager The pointer where the IP address of the manager gets written.
|
2016-10-29 15:22:33 -07:00
|
|
|
* @return Void.
|
|
|
|
*/
|
2016-11-18 19:57:51 -08:00
|
|
|
void redis_get_cached_db_client(db_handle *db,
|
|
|
|
db_client_id db_client_id,
|
|
|
|
const char **manager) {
|
|
|
|
db_client_cache_entry *entry;
|
|
|
|
HASH_FIND(hh, db->db_client_cache, &db_client_id, sizeof(db_client_id),
|
|
|
|
entry);
|
2016-10-29 15:22:33 -07:00
|
|
|
if (!entry) {
|
2016-11-18 19:57:51 -08:00
|
|
|
/* This is a very rare case. It should happen at most once per db client. */
|
2016-10-29 15:22:33 -07:00
|
|
|
redisReply *reply =
|
2016-12-15 14:47:10 -08:00
|
|
|
redisCommand(db->sync_context, "RAY.GET_CLIENT_ADDRESS %b",
|
2016-11-19 12:19:49 -08:00
|
|
|
(char *) db_client_id.id, sizeof(db_client_id.id));
|
2016-12-20 20:21:35 -08:00
|
|
|
CHECKM(reply->type == REDIS_REPLY_STRING, "REDIS reply type=%d, str=%s",
|
|
|
|
reply->type, reply->str);
|
2016-11-18 19:57:51 -08:00
|
|
|
entry = malloc(sizeof(db_client_cache_entry));
|
|
|
|
entry->db_client_id = db_client_id;
|
2016-10-29 15:22:33 -07:00
|
|
|
entry->addr = strdup(reply->str);
|
2016-11-18 19:57:51 -08:00
|
|
|
HASH_ADD(hh, db->db_client_cache, db_client_id, sizeof(db_client_id),
|
|
|
|
entry);
|
2016-10-29 15:22:33 -07:00
|
|
|
freeReplyObject(reply);
|
|
|
|
}
|
|
|
|
*manager = entry->addr;
|
|
|
|
}
|
|
|
|
|
2016-12-18 18:19:02 -08:00
|
|
|
void redis_object_table_lookup_callback(redisAsyncContext *c,
|
|
|
|
void *r,
|
|
|
|
void *privdata) {
|
2016-11-19 12:19:49 -08:00
|
|
|
REDIS_CALLBACK_HEADER(db, callback_data, r);
|
2016-09-13 18:54:26 -07:00
|
|
|
redisReply *reply = r;
|
2016-10-29 15:22:33 -07:00
|
|
|
|
2016-12-18 18:19:02 -08:00
|
|
|
object_id obj_id = callback_data->id;
|
2016-10-29 15:22:33 -07:00
|
|
|
|
2016-12-18 18:19:02 -08:00
|
|
|
LOG_DEBUG("Object table lookup callback");
|
|
|
|
CHECK(reply->type == REDIS_REPLY_ARRAY);
|
2016-10-29 15:22:33 -07:00
|
|
|
|
2016-12-18 18:19:02 -08:00
|
|
|
int64_t manager_count = reply->elements;
|
|
|
|
db_client_id *managers = NULL;
|
|
|
|
const char **manager_vector = NULL;
|
|
|
|
if (manager_count > 0) {
|
|
|
|
managers = malloc(reply->elements * sizeof(db_client_id));
|
|
|
|
manager_vector = malloc(manager_count * sizeof(char *));
|
|
|
|
}
|
|
|
|
for (int j = 0; j < reply->elements; ++j) {
|
|
|
|
CHECK(reply->element[j]->type == REDIS_REPLY_STRING);
|
|
|
|
memcpy(managers[j].id, reply->element[j]->str, sizeof(managers[j].id));
|
|
|
|
redis_get_cached_db_client(db, managers[j], manager_vector + j);
|
|
|
|
}
|
|
|
|
object_table_lookup_done_callback done_callback =
|
|
|
|
callback_data->done_callback;
|
|
|
|
if (done_callback) {
|
|
|
|
done_callback(obj_id, manager_count, manager_vector,
|
|
|
|
callback_data->user_context);
|
|
|
|
}
|
2016-12-05 00:26:53 -08:00
|
|
|
|
2016-12-18 18:19:02 -08:00
|
|
|
/* Clean up timer and callback. */
|
|
|
|
destroy_timer_callback(callback_data->db_handle->loop, callback_data);
|
|
|
|
if (manager_count > 0) {
|
|
|
|
free(managers);
|
|
|
|
free(manager_vector);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
/**
|
|
|
|
* This will parse a payload string published on the object notification
|
|
|
|
* channel. The string must have the format:
|
|
|
|
*
|
|
|
|
* <object id> MANAGERS <manager id1> <manager id2> ...
|
|
|
|
*
|
|
|
|
* where there may be any positive number of manager IDs.
|
|
|
|
*
|
|
|
|
* @param db The db handle.
|
|
|
|
* @param payload The payload string.
|
|
|
|
* @param length The length of the string.
|
|
|
|
* @param manager_count This method will write the number of managers at this
|
|
|
|
* address.
|
|
|
|
* @param manager_vector This method will allocate an array of pointers to
|
|
|
|
* manager addresses and write the address of the array at this address.
|
|
|
|
* The caller is responsible for freeing this array.
|
|
|
|
* @return The object ID that the notification is about.
|
|
|
|
*/
|
|
|
|
object_id parse_subscribe_to_notifications_payload(
|
|
|
|
db_handle *db,
|
|
|
|
char *payload,
|
|
|
|
int length,
|
2016-12-19 21:07:25 -08:00
|
|
|
int64_t *data_size,
|
2016-12-18 18:19:02 -08:00
|
|
|
int *manager_count,
|
|
|
|
const char ***manager_vector) {
|
2016-12-19 21:07:25 -08:00
|
|
|
long long data_size_value = 0;
|
|
|
|
int num_managers = (length - sizeof(object_id) - 1 - sizeof(data_size_value) -
|
|
|
|
1 - strlen("MANAGERS")) /
|
2016-12-18 18:19:02 -08:00
|
|
|
(1 + sizeof(db_client_id));
|
2016-12-22 03:11:46 -08:00
|
|
|
|
|
|
|
int64_t rval = sizeof(object_id) + 1 + sizeof(data_size_value) + 1 +
|
|
|
|
strlen("MANAGERS") + num_managers * (1 + sizeof(db_client_id));
|
|
|
|
|
|
|
|
CHECKM(length == rval,
|
|
|
|
"length mismatch: num_managers = %d, length = %d, rval = %" PRId64,
|
|
|
|
num_managers, length, rval);
|
2016-12-18 18:19:02 -08:00
|
|
|
CHECK(num_managers > 0);
|
|
|
|
object_id obj_id;
|
|
|
|
/* Track our current offset in the payload. */
|
|
|
|
int offset = 0;
|
|
|
|
/* Parse the object ID. */
|
|
|
|
memcpy(&obj_id.id, &payload[offset], sizeof(obj_id.id));
|
|
|
|
offset += sizeof(obj_id.id);
|
2016-12-19 21:07:25 -08:00
|
|
|
/* The next part of the payload is a space. */
|
|
|
|
char *space_str = " ";
|
|
|
|
CHECK(memcmp(&payload[offset], space_str, strlen(space_str)) == 0);
|
|
|
|
offset += strlen(space_str);
|
|
|
|
/* The next part of the payload is binary data_size. */
|
|
|
|
memcpy(&data_size_value, &payload[offset], sizeof(data_size_value));
|
|
|
|
offset += sizeof(data_size_value);
|
|
|
|
/* The next part of the payload is the string " MANAGERS" with leading ' '. */
|
2016-12-18 18:19:02 -08:00
|
|
|
char *managers_str = " MANAGERS";
|
|
|
|
CHECK(memcmp(&payload[offset], managers_str, strlen(managers_str)) == 0);
|
|
|
|
offset += strlen(managers_str);
|
|
|
|
/* Parse the managers. */
|
|
|
|
const char **managers = malloc(num_managers * sizeof(char *));
|
|
|
|
for (int i = 0; i < num_managers; ++i) {
|
|
|
|
/* First there is a space. */
|
|
|
|
CHECK(memcmp(&payload[offset], " ", strlen(" ")) == 0);
|
|
|
|
offset += strlen(" ");
|
|
|
|
/* Get the manager ID. */
|
|
|
|
db_client_id manager_id;
|
|
|
|
memcpy(&manager_id.id, &payload[offset], sizeof(manager_id.id));
|
|
|
|
offset += sizeof(manager_id.id);
|
|
|
|
/* Write the address of the corresponding manager to the returned array. */
|
|
|
|
redis_get_cached_db_client(db, manager_id, &managers[i]);
|
2016-11-27 21:26:23 -08:00
|
|
|
}
|
2016-12-18 18:19:02 -08:00
|
|
|
CHECK(offset == length);
|
|
|
|
/* Return the manager array and the object ID. */
|
|
|
|
*manager_count = num_managers;
|
|
|
|
*manager_vector = managers;
|
2016-12-19 21:07:25 -08:00
|
|
|
*data_size = data_size_value;
|
2016-12-18 18:19:02 -08:00
|
|
|
return obj_id;
|
2016-11-27 21:26:23 -08:00
|
|
|
}
|
|
|
|
|
2016-12-18 18:19:02 -08:00
|
|
|
void object_table_redis_subscribe_to_notifications_callback(
|
|
|
|
redisAsyncContext *c,
|
|
|
|
void *r,
|
|
|
|
void *privdata) {
|
2016-11-19 12:19:49 -08:00
|
|
|
REDIS_CALLBACK_HEADER(db, callback_data, r);
|
2016-12-18 18:19:02 -08:00
|
|
|
|
|
|
|
/* Replies to the SUBSCRIBE command have 3 elements. There are two
|
|
|
|
* possibilities. Either the reply is the initial acknowledgment of the
|
|
|
|
* subscribe command, or it is a message. If it is the initial acknowledgment,
|
|
|
|
* then
|
|
|
|
* - reply->element[0]->str is "subscribe"
|
|
|
|
* - reply->element[1]->str is the name of the channel
|
|
|
|
* - reply->emement[2]->str is null.
|
|
|
|
* If it is an actual message, then
|
|
|
|
* - reply->element[0]->str is "message"
|
|
|
|
* - reply->element[1]->str is the name of the channel
|
|
|
|
* - reply->emement[2]->str is the contents of the message.
|
|
|
|
*/
|
2016-10-29 15:22:33 -07:00
|
|
|
redisReply *reply = r;
|
|
|
|
CHECK(reply->type == REDIS_REPLY_ARRAY);
|
2016-12-18 18:19:02 -08:00
|
|
|
CHECK(reply->elements == 3);
|
2016-12-13 00:47:21 -08:00
|
|
|
redisReply *message_type = reply->element[0];
|
2016-12-18 18:19:02 -08:00
|
|
|
LOG_DEBUG("Object table subscribe to notifications callback, message %s",
|
|
|
|
message_type->str);
|
|
|
|
|
2016-12-13 00:47:21 -08:00
|
|
|
if (strcmp(message_type->str, "message") == 0) {
|
2016-12-18 18:19:02 -08:00
|
|
|
/* Handle an object notification. */
|
2016-12-19 21:07:25 -08:00
|
|
|
int64_t data_size = 0;
|
2016-12-18 18:19:02 -08:00
|
|
|
int manager_count;
|
|
|
|
const char **manager_vector;
|
|
|
|
object_id obj_id = parse_subscribe_to_notifications_payload(
|
2016-12-19 21:07:25 -08:00
|
|
|
db, reply->element[2]->str, reply->element[2]->len, &data_size,
|
|
|
|
&manager_count, &manager_vector);
|
2016-12-18 18:19:02 -08:00
|
|
|
/* Call the subscribe callback. */
|
|
|
|
object_table_subscribe_data *data = callback_data->data;
|
|
|
|
if (data->object_available_callback) {
|
2016-12-19 21:07:25 -08:00
|
|
|
data->object_available_callback(obj_id, data_size, manager_count,
|
|
|
|
manager_vector, data->subscribe_context);
|
2016-12-18 18:19:02 -08:00
|
|
|
}
|
|
|
|
free(manager_vector);
|
2016-12-13 00:47:21 -08:00
|
|
|
} else if (strcmp(message_type->str, "subscribe") == 0) {
|
2016-12-18 18:19:02 -08:00
|
|
|
/* The reply for the initial SUBSCRIBE command. */
|
|
|
|
/* Call the done callback if there is one. This code path should only be
|
|
|
|
* used in the tests. */
|
|
|
|
if (callback_data->done_callback != NULL) {
|
|
|
|
object_table_lookup_done_callback done_callback =
|
|
|
|
callback_data->done_callback;
|
2016-12-13 00:47:21 -08:00
|
|
|
done_callback(NIL_ID, 0, NULL, callback_data->user_context);
|
|
|
|
}
|
2016-12-18 18:19:02 -08:00
|
|
|
/* If the initial SUBSCRIBE was successful, clean up the timer, but don't
|
|
|
|
* destroy the callback data. */
|
2016-12-13 00:47:21 -08:00
|
|
|
event_loop_remove_timer(callback_data->db_handle->loop,
|
|
|
|
callback_data->timer_id);
|
|
|
|
} else {
|
2016-12-18 18:19:02 -08:00
|
|
|
LOG_FATAL(
|
|
|
|
"Unexpected reply type from object table subscribe to notifications.");
|
2016-12-13 00:47:21 -08:00
|
|
|
}
|
2016-12-18 18:19:02 -08:00
|
|
|
}
|
2016-12-13 00:47:21 -08:00
|
|
|
|
2016-12-18 18:19:02 -08:00
|
|
|
void redis_object_table_subscribe_to_notifications(
|
|
|
|
table_callback_data *callback_data) {
|
|
|
|
db_handle *db = callback_data->db_handle;
|
|
|
|
/* The object channel prefix must match the value defined in
|
|
|
|
* src/common/redismodule/ray_redis_module.c. */
|
|
|
|
const char *object_channel_prefix = "OC:";
|
2016-12-19 21:07:25 -08:00
|
|
|
const char *object_channel_bcast = "BCAST";
|
|
|
|
int status = REDIS_OK;
|
2016-12-18 18:19:02 -08:00
|
|
|
/* Subscribe to notifications from the object table. This uses the client ID
|
|
|
|
* as the channel name so this channel is specific to this client. TODO(rkn):
|
|
|
|
* The channel name should probably be the client ID with some prefix. */
|
2016-12-19 21:07:25 -08:00
|
|
|
CHECKM(callback_data->data != NULL,
|
|
|
|
"Object table subscribe data passed as NULL.");
|
|
|
|
if (((object_table_subscribe_data *) (callback_data->data))->subscribe_all) {
|
|
|
|
/* Subscribe to the object broadcast channel. */
|
|
|
|
status = redisAsyncCommand(
|
|
|
|
db->sub_context, object_table_redis_subscribe_to_notifications_callback,
|
|
|
|
(void *) callback_data->timer_id, "SUBSCRIBE %s%s",
|
|
|
|
object_channel_prefix, object_channel_bcast);
|
|
|
|
} else {
|
|
|
|
status = redisAsyncCommand(
|
|
|
|
db->sub_context, object_table_redis_subscribe_to_notifications_callback,
|
|
|
|
(void *) callback_data->timer_id, "SUBSCRIBE %s%b",
|
|
|
|
object_channel_prefix, db->client.id, sizeof(db->client.id));
|
|
|
|
}
|
|
|
|
|
2016-12-18 18:19:02 -08:00
|
|
|
if ((status == REDIS_ERR) || db->sub_context->err) {
|
|
|
|
LOG_REDIS_DEBUG(db->sub_context,
|
|
|
|
"error in redis_object_table_subscribe_to_notifications");
|
2016-09-13 18:54:26 -07:00
|
|
|
}
|
|
|
|
}
|
2016-09-20 22:40:35 -07:00
|
|
|
|
2016-12-18 18:19:02 -08:00
|
|
|
void redis_object_table_request_notifications_callback(redisAsyncContext *c,
|
|
|
|
void *r,
|
|
|
|
void *privdata) {
|
|
|
|
REDIS_CALLBACK_HEADER(db, callback_data, r);
|
2016-10-29 15:22:33 -07:00
|
|
|
|
2016-12-18 18:19:02 -08:00
|
|
|
/* Do some minimal checking. */
|
|
|
|
redisReply *reply = r;
|
|
|
|
CHECK(strcmp(reply->str, "OK") == 0);
|
|
|
|
CHECK(callback_data->done_callback == NULL);
|
|
|
|
/* Clean up the timer and callback. */
|
|
|
|
destroy_timer_callback(db->loop, callback_data);
|
|
|
|
}
|
2016-12-09 00:51:44 -08:00
|
|
|
|
2016-12-18 18:19:02 -08:00
|
|
|
void redis_object_table_request_notifications(
|
|
|
|
table_callback_data *callback_data) {
|
|
|
|
db_handle *db = callback_data->db_handle;
|
|
|
|
|
|
|
|
object_table_request_notifications_data *request_data = callback_data->data;
|
|
|
|
int num_object_ids = request_data->num_object_ids;
|
|
|
|
object_id *object_ids = request_data->object_ids;
|
|
|
|
|
|
|
|
/* Create the arguments for the Redis command. */
|
|
|
|
int num_args = 1 + 1 + num_object_ids;
|
|
|
|
const char **argv = malloc(sizeof(char *) * num_args);
|
|
|
|
size_t *argvlen = malloc(sizeof(size_t) * num_args);
|
|
|
|
/* Set the command name argument. */
|
|
|
|
argv[0] = "RAY.OBJECT_TABLE_REQUEST_NOTIFICATIONS";
|
|
|
|
argvlen[0] = strlen(argv[0]);
|
|
|
|
/* Set the client ID argument. */
|
|
|
|
argv[1] = (char *) db->client.id;
|
|
|
|
argvlen[1] = sizeof(db->client.id);
|
|
|
|
/* Set the object ID arguments. */
|
|
|
|
for (int i = 0; i < num_object_ids; ++i) {
|
|
|
|
argv[2 + i] = (char *) object_ids[i].id;
|
|
|
|
argvlen[2 + i] = sizeof(object_ids[i].id);
|
2016-12-09 00:51:44 -08:00
|
|
|
}
|
2016-12-18 18:19:02 -08:00
|
|
|
|
|
|
|
int status = redisAsyncCommandArgv(
|
|
|
|
db->context, redis_object_table_request_notifications_callback,
|
|
|
|
(void *) callback_data->timer_id, num_args, argv, argvlen);
|
|
|
|
free(argv);
|
|
|
|
free(argvlen);
|
|
|
|
|
|
|
|
if ((status == REDIS_ERR) || db->context->err) {
|
|
|
|
LOG_REDIS_DEBUG(db->context,
|
|
|
|
"error in redis_object_table_subscribe_to_notifications");
|
2016-09-29 21:12:06 -07:00
|
|
|
}
|
2016-10-29 15:22:33 -07:00
|
|
|
}
|
|
|
|
|
|
|
|
/*
|
2016-11-10 18:13:26 -08:00
|
|
|
* ==== task_table callbacks ====
|
2016-10-29 15:22:33 -07:00
|
|
|
*/
|
|
|
|
|
2016-11-10 18:13:26 -08:00
|
|
|
void redis_task_table_get_task_callback(redisAsyncContext *c,
|
|
|
|
void *r,
|
|
|
|
void *privdata) {
|
2016-11-19 12:19:49 -08:00
|
|
|
REDIS_CALLBACK_HEADER(db, callback_data, r);
|
2016-11-10 18:13:26 -08:00
|
|
|
redisReply *reply = r;
|
2016-12-25 23:57:05 -08:00
|
|
|
/* Parse the task from the reply. */
|
|
|
|
task *task = parse_and_construct_task_from_redis_reply(reply);
|
|
|
|
/* Call the done callback if there is one. */
|
|
|
|
task_table_get_callback done_callback = callback_data->done_callback;
|
|
|
|
if (done_callback != NULL) {
|
|
|
|
done_callback(task, callback_data->user_context);
|
2016-11-10 18:13:26 -08:00
|
|
|
}
|
2016-12-25 23:57:05 -08:00
|
|
|
/* Free the task if it is not NULL. */
|
|
|
|
free_task(task);
|
|
|
|
|
|
|
|
/* Clean up the timer and callback. */
|
2016-11-10 18:13:26 -08:00
|
|
|
destroy_timer_callback(db->loop, callback_data);
|
|
|
|
}
|
|
|
|
|
|
|
|
void redis_task_table_get_task(table_callback_data *callback_data) {
|
|
|
|
db_handle *db = callback_data->db_handle;
|
2016-12-25 23:57:05 -08:00
|
|
|
CHECK(callback_data->data == NULL);
|
|
|
|
task_id task_id = callback_data->id;
|
|
|
|
|
|
|
|
int status = redisAsyncCommand(
|
|
|
|
db->context, redis_task_table_get_task_callback,
|
|
|
|
(void *) callback_data->timer_id, "RAY.TASK_TABLE_GET %b", task_id.id,
|
|
|
|
sizeof(task_id.id));
|
2016-11-19 12:19:49 -08:00
|
|
|
if ((status == REDIS_ERR) || db->context->err) {
|
2016-12-25 23:57:05 -08:00
|
|
|
LOG_REDIS_DEBUG(db->context, "error in redis_task_table_get_task");
|
2016-11-10 18:13:26 -08:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2016-12-25 23:57:05 -08:00
|
|
|
void redis_task_table_add_task_callback(redisAsyncContext *c,
|
|
|
|
void *r,
|
|
|
|
void *privdata) {
|
|
|
|
REDIS_CALLBACK_HEADER(db, callback_data, r);
|
|
|
|
|
|
|
|
/* Do some minimal checking. */
|
|
|
|
redisReply *reply = r;
|
|
|
|
CHECKM(strcmp(reply->str, "OK") == 0, "reply->str is %s", reply->str);
|
|
|
|
/* Call the done callback if there is one. */
|
|
|
|
if (callback_data->done_callback != NULL) {
|
|
|
|
task_table_done_callback done_callback = callback_data->done_callback;
|
|
|
|
done_callback(callback_data->id, callback_data->user_context);
|
|
|
|
}
|
|
|
|
/* Clean up the timer and callback. */
|
|
|
|
destroy_timer_callback(db->loop, callback_data);
|
|
|
|
}
|
|
|
|
|
|
|
|
void redis_task_table_add_task(table_callback_data *callback_data) {
|
2016-10-29 15:22:33 -07:00
|
|
|
db_handle *db = callback_data->db_handle;
|
2016-11-10 18:13:26 -08:00
|
|
|
task *task = callback_data->data;
|
2016-12-25 23:57:05 -08:00
|
|
|
task_id task_id = task_task_id(task);
|
|
|
|
db_client_id local_scheduler_id = task_local_scheduler(task);
|
2016-11-10 18:13:26 -08:00
|
|
|
scheduling_state state = task_state(task);
|
|
|
|
task_spec *spec = task_task_spec(task);
|
2016-10-29 15:22:33 -07:00
|
|
|
|
2016-12-25 23:57:05 -08:00
|
|
|
CHECKM(task != NULL, "NULL task passed to redis_task_table_add_task.");
|
|
|
|
int status = redisAsyncCommand(
|
|
|
|
db->context, redis_task_table_add_task_callback,
|
|
|
|
(void *) callback_data->timer_id, "RAY.TASK_TABLE_ADD %b %d %b %b",
|
|
|
|
task_id.id, sizeof(task_id.id), state, local_scheduler_id.id,
|
|
|
|
sizeof(local_scheduler_id.id), spec, task_spec_size(spec));
|
|
|
|
if ((status == REDIS_ERR) || db->context->err) {
|
|
|
|
LOG_REDIS_DEBUG(db->context, "error in redis_task_table_add_task");
|
2016-10-29 15:22:33 -07:00
|
|
|
}
|
2016-12-25 23:57:05 -08:00
|
|
|
}
|
2016-10-29 15:22:33 -07:00
|
|
|
|
2016-12-25 23:57:05 -08:00
|
|
|
void redis_task_table_update_callback(redisAsyncContext *c,
|
|
|
|
void *r,
|
|
|
|
void *privdata) {
|
|
|
|
REDIS_CALLBACK_HEADER(db, callback_data, r);
|
2016-11-10 18:13:26 -08:00
|
|
|
|
2016-12-25 23:57:05 -08:00
|
|
|
/* Do some minimal checking. */
|
|
|
|
redisReply *reply = r;
|
|
|
|
CHECKM(strcmp(reply->str, "OK") == 0, "reply->str is %s", reply->str);
|
|
|
|
/* Call the done callback if there is one. */
|
|
|
|
if (callback_data->done_callback != NULL) {
|
|
|
|
task_table_done_callback done_callback = callback_data->done_callback;
|
|
|
|
done_callback(callback_data->id, callback_data->user_context);
|
2016-09-29 21:12:06 -07:00
|
|
|
}
|
2016-12-25 23:57:05 -08:00
|
|
|
/* Clean up the timer and callback. */
|
|
|
|
destroy_timer_callback(db->loop, callback_data);
|
2016-11-10 18:13:26 -08:00
|
|
|
}
|
2016-10-29 15:22:33 -07:00
|
|
|
|
2016-11-10 18:13:26 -08:00
|
|
|
void redis_task_table_update(table_callback_data *callback_data) {
|
2016-12-25 23:57:05 -08:00
|
|
|
db_handle *db = callback_data->db_handle;
|
|
|
|
task *task = callback_data->data;
|
|
|
|
task_id task_id = task_task_id(task);
|
|
|
|
db_client_id local_scheduler_id = task_local_scheduler(task);
|
|
|
|
scheduling_state state = task_state(task);
|
2016-10-29 15:22:33 -07:00
|
|
|
|
2016-12-25 23:57:05 -08:00
|
|
|
CHECKM(task != NULL, "NULL task passed to redis_task_table_update.");
|
|
|
|
int status = redisAsyncCommand(
|
|
|
|
db->context, redis_task_table_update_callback,
|
|
|
|
(void *) callback_data->timer_id, "RAY.TASK_TABLE_UPDATE %b %d %b",
|
|
|
|
task_id.id, sizeof(task_id.id), state, local_scheduler_id.id,
|
|
|
|
sizeof(local_scheduler_id.id));
|
|
|
|
if ((status == REDIS_ERR) || db->context->err) {
|
|
|
|
LOG_REDIS_DEBUG(db->context, "error in redis_task_table_update");
|
2016-10-29 15:22:33 -07:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2016-12-25 23:57:05 -08:00
|
|
|
/* The format of the payload is described in ray_redis_module.c and is
|
|
|
|
* "<task ID> <state> <local scheduler ID> <task specification>". TODO(rkn):
|
|
|
|
* Make this code nicer. */
|
|
|
|
void parse_task_table_subscribe_callback(char *payload,
|
|
|
|
int length,
|
|
|
|
task_id *task_id,
|
|
|
|
int *state,
|
|
|
|
db_client_id *local_scheduler_id,
|
|
|
|
task_spec **spec) {
|
|
|
|
/* Note that the state is padded with spaces to consist of precisely two
|
|
|
|
* characters. */
|
|
|
|
int task_spec_payload_size =
|
|
|
|
length - sizeof(*task_id) - 1 - 2 - 1 - sizeof(*local_scheduler_id) - 1;
|
|
|
|
int offset = 0;
|
|
|
|
/* Read in the task ID. */
|
|
|
|
memcpy(task_id, &payload[offset], sizeof(*task_id));
|
|
|
|
offset += sizeof(*task_id);
|
|
|
|
/* Read in a space. */
|
|
|
|
char *space_str = " ";
|
|
|
|
CHECK(memcmp(space_str, &payload[offset], strlen(space_str)) == 0);
|
|
|
|
offset += strlen(space_str);
|
|
|
|
/* Read in the state, which is an integer left-padded with spaces to two
|
|
|
|
* characters. */
|
|
|
|
CHECK(sscanf(&payload[offset], "%2d", state) == 1);
|
|
|
|
offset += 2;
|
|
|
|
/* Read in a space. */
|
|
|
|
CHECK(memcmp(space_str, &payload[offset], strlen(space_str)) == 0);
|
|
|
|
offset += strlen(space_str);
|
|
|
|
/* Read in the local scheduler ID. */
|
|
|
|
memcpy(local_scheduler_id, &payload[offset], sizeof(*local_scheduler_id));
|
|
|
|
offset += sizeof(*local_scheduler_id);
|
|
|
|
/* Read in a space. */
|
|
|
|
CHECK(memcmp(space_str, &payload[offset], strlen(space_str)) == 0);
|
|
|
|
offset += strlen(space_str);
|
|
|
|
/* Read in the task spec. */
|
|
|
|
*spec = malloc(task_spec_payload_size);
|
|
|
|
memcpy(*spec, &payload[offset], task_spec_payload_size);
|
|
|
|
CHECK(task_spec_size(*spec) == task_spec_payload_size);
|
2016-10-29 15:22:33 -07:00
|
|
|
}
|
|
|
|
|
2016-11-10 18:13:26 -08:00
|
|
|
void redis_task_table_subscribe_callback(redisAsyncContext *c,
|
|
|
|
void *r,
|
|
|
|
void *privdata) {
|
2016-11-19 12:19:49 -08:00
|
|
|
REDIS_CALLBACK_HEADER(db, callback_data, r);
|
2016-10-29 15:22:33 -07:00
|
|
|
redisReply *reply = r;
|
|
|
|
|
|
|
|
CHECK(reply->type == REDIS_REPLY_ARRAY);
|
2016-12-25 23:57:05 -08:00
|
|
|
/* The number of elements is 3 for a reply to SUBSCRIBE, and 4 for a reply to
|
|
|
|
* PSUBSCRIBE. */
|
|
|
|
CHECKM(reply->elements == 3 || reply->elements == 4, "reply->elements is %zu",
|
|
|
|
reply->elements);
|
|
|
|
/* The first element is the message type and the last entry is the payload.
|
|
|
|
* The middle one or middle two elements describe the channel that was
|
|
|
|
* published on. */
|
|
|
|
redisReply *message_type = reply->element[0];
|
2016-11-10 18:13:26 -08:00
|
|
|
redisReply *payload = reply->element[reply->elements - 1];
|
2016-12-25 23:57:05 -08:00
|
|
|
if (strcmp(message_type->str, "message") == 0 ||
|
|
|
|
strcmp(message_type->str, "pmessage") == 0) {
|
|
|
|
/* Handle a task table event. Parse the payload and call the callback. */
|
|
|
|
task_table_subscribe_data *data = callback_data->data;
|
|
|
|
/* Read out the information from the payload. */
|
|
|
|
task_id task_id;
|
|
|
|
int state;
|
|
|
|
db_client_id local_scheduler_id;
|
|
|
|
task_spec *spec;
|
|
|
|
parse_task_table_subscribe_callback(payload->str, payload->len, &task_id,
|
|
|
|
&state, &local_scheduler_id, &spec);
|
|
|
|
task *task = alloc_task(spec, state, local_scheduler_id);
|
|
|
|
free(spec);
|
|
|
|
/* Call the subscribe callback if there is one. */
|
|
|
|
if (data->subscribe_callback != NULL) {
|
|
|
|
data->subscribe_callback(task, data->subscribe_context);
|
|
|
|
}
|
|
|
|
free_task(task);
|
|
|
|
} else if (strcmp(message_type->str, "subscribe") == 0 ||
|
|
|
|
strcmp(message_type->str, "psubscribe") == 0) {
|
|
|
|
/* If this condition is true, we got the initial message that acknowledged
|
|
|
|
* the subscription. */
|
|
|
|
if (callback_data->done_callback != NULL) {
|
2016-11-10 18:13:26 -08:00
|
|
|
task_table_done_callback done_callback = callback_data->done_callback;
|
2016-10-29 15:22:33 -07:00
|
|
|
done_callback(callback_data->id, callback_data->user_context);
|
|
|
|
}
|
|
|
|
/* Note that we do not destroy the callback data yet because the
|
|
|
|
* subscription callback needs this data. */
|
|
|
|
event_loop_remove_timer(db->loop, callback_data->timer_id);
|
2016-12-25 23:57:05 -08:00
|
|
|
} else {
|
|
|
|
LOG_FATAL(
|
|
|
|
"Unexpected reply type from task table subscribe. Message type is %s.",
|
|
|
|
message_type->str);
|
2016-10-29 15:22:33 -07:00
|
|
|
}
|
2016-09-29 21:12:06 -07:00
|
|
|
}
|
2016-10-29 15:22:33 -07:00
|
|
|
|
2016-11-10 18:13:26 -08:00
|
|
|
void redis_task_table_subscribe(table_callback_data *callback_data) {
|
2016-10-29 15:22:33 -07:00
|
|
|
db_handle *db = callback_data->db_handle;
|
2016-11-10 18:13:26 -08:00
|
|
|
task_table_subscribe_data *data = callback_data->data;
|
2016-12-25 23:57:05 -08:00
|
|
|
/* TASK_CHANNEL_PREFIX is defined in ray_redis_module.c and must be kept in
|
|
|
|
* sync with that file. */
|
|
|
|
const char *TASK_CHANNEL_PREFIX = "TT:";
|
|
|
|
int status;
|
|
|
|
if (IS_NIL_ID(data->local_scheduler_id)) {
|
2016-11-10 18:13:26 -08:00
|
|
|
/* TODO(swang): Implement the state_filter by translating the bitmask into
|
|
|
|
* a Redis key-matching pattern. */
|
2016-12-25 23:57:05 -08:00
|
|
|
status = redisAsyncCommand(
|
|
|
|
db->sub_context, redis_task_table_subscribe_callback,
|
|
|
|
(void *) callback_data->timer_id, "PSUBSCRIBE %s*:%2d",
|
|
|
|
TASK_CHANNEL_PREFIX, data->state_filter);
|
2016-09-29 21:12:06 -07:00
|
|
|
} else {
|
2016-12-25 23:57:05 -08:00
|
|
|
db_client_id local_scheduler_id = data->local_scheduler_id;
|
2016-11-10 18:13:26 -08:00
|
|
|
status = redisAsyncCommand(
|
|
|
|
db->sub_context, redis_task_table_subscribe_callback,
|
2016-12-25 23:57:05 -08:00
|
|
|
(void *) callback_data->timer_id, "SUBSCRIBE %s%b:%2d",
|
|
|
|
TASK_CHANNEL_PREFIX, (char *) local_scheduler_id.id,
|
|
|
|
sizeof(local_scheduler_id.id), data->state_filter);
|
2016-09-29 21:12:06 -07:00
|
|
|
}
|
2016-11-10 18:13:26 -08:00
|
|
|
if ((status == REDIS_ERR) || db->sub_context->err) {
|
2016-12-25 23:57:05 -08:00
|
|
|
LOG_REDIS_DEBUG(db->sub_context, "error in redis_task_table_subscribe");
|
2016-09-22 23:15:45 -07:00
|
|
|
}
|
|
|
|
}
|
2016-10-18 12:38:30 -07:00
|
|
|
|
2016-11-18 19:57:51 -08:00
|
|
|
/*
|
|
|
|
* ==== db client table callbacks ====
|
|
|
|
*/
|
|
|
|
|
|
|
|
void redis_db_client_table_subscribe_callback(redisAsyncContext *c,
|
|
|
|
void *r,
|
|
|
|
void *privdata) {
|
2016-11-19 12:19:49 -08:00
|
|
|
REDIS_CALLBACK_HEADER(db, callback_data, r);
|
2016-11-18 19:57:51 -08:00
|
|
|
redisReply *reply = r;
|
|
|
|
|
|
|
|
CHECK(reply->type == REDIS_REPLY_ARRAY);
|
|
|
|
CHECK(reply->elements > 2);
|
|
|
|
/* First entry is message type, then possibly the regex we psubscribed to,
|
|
|
|
* then topic, then payload. */
|
|
|
|
redisReply *payload = reply->element[reply->elements - 1];
|
2016-12-09 00:51:44 -08:00
|
|
|
/* If this condition is true, we got the initial message that acknowledged the
|
|
|
|
* subscription. */
|
2016-11-18 19:57:51 -08:00
|
|
|
if (payload->str == NULL) {
|
|
|
|
if (callback_data->done_callback) {
|
|
|
|
db_client_table_done_callback done_callback =
|
|
|
|
callback_data->done_callback;
|
|
|
|
done_callback(callback_data->id, callback_data->user_context);
|
|
|
|
}
|
|
|
|
/* Note that we do not destroy the callback data yet because the
|
|
|
|
* subscription callback needs this data. */
|
|
|
|
event_loop_remove_timer(db->loop, callback_data->timer_id);
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
/* Otherwise, parse the payload and call the callback. */
|
|
|
|
db_client_table_subscribe_data *data = callback_data->data;
|
|
|
|
db_client_id client;
|
2016-11-19 12:19:49 -08:00
|
|
|
memcpy(client.id, payload->str, sizeof(client.id));
|
|
|
|
/* We subtract 1 + sizeof(client.id) to compute the length of the
|
2016-11-18 19:57:51 -08:00
|
|
|
* client_type string, and we add 1 to null-terminate the string. */
|
2016-11-19 12:19:49 -08:00
|
|
|
int client_type_length = payload->len - 1 - sizeof(client.id) + 1;
|
2016-11-18 19:57:51 -08:00
|
|
|
char *client_type = malloc(client_type_length);
|
2016-12-13 17:21:38 -08:00
|
|
|
char *aux_address = malloc(client_type_length);
|
|
|
|
memset(aux_address, 0, client_type_length);
|
|
|
|
/* Published message format: <client_id:client_type aux_addr> */
|
|
|
|
int rv = sscanf(&payload->str[1 + sizeof(client.id)], "%s %s", client_type,
|
|
|
|
aux_address);
|
|
|
|
CHECKM(rv == 2,
|
|
|
|
"redis_db_client_table_subscribe_callback: expected 2 parsed args, "
|
|
|
|
"Got %d instead.",
|
|
|
|
rv);
|
2016-11-18 19:57:51 -08:00
|
|
|
if (data->subscribe_callback) {
|
2016-12-13 17:21:38 -08:00
|
|
|
data->subscribe_callback(client, client_type, aux_address,
|
|
|
|
data->subscribe_context);
|
2016-10-18 12:38:30 -07:00
|
|
|
}
|
2016-11-18 19:57:51 -08:00
|
|
|
free(client_type);
|
2016-12-13 17:21:38 -08:00
|
|
|
free(aux_address);
|
2016-11-18 19:57:51 -08:00
|
|
|
}
|
|
|
|
|
|
|
|
void redis_db_client_table_subscribe(table_callback_data *callback_data) {
|
|
|
|
db_handle *db = callback_data->db_handle;
|
|
|
|
int status = redisAsyncCommand(
|
|
|
|
db->sub_context, redis_db_client_table_subscribe_callback,
|
|
|
|
(void *) callback_data->timer_id, "SUBSCRIBE db_clients");
|
|
|
|
if ((status == REDIS_ERR) || db->sub_context->err) {
|
|
|
|
LOG_REDIS_DEBUG(db->sub_context,
|
|
|
|
"error in db_client_table_register_callback");
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2016-12-24 20:02:25 -08:00
|
|
|
void redis_local_scheduler_table_subscribe_callback(redisAsyncContext *c,
|
|
|
|
void *r,
|
|
|
|
void *privdata) {
|
|
|
|
REDIS_CALLBACK_HEADER(db, callback_data, r);
|
|
|
|
|
|
|
|
redisReply *reply = r;
|
|
|
|
CHECK(reply->type == REDIS_REPLY_ARRAY);
|
|
|
|
CHECK(reply->elements == 3);
|
|
|
|
redisReply *message_type = reply->element[0];
|
|
|
|
LOG_DEBUG("Local scheduer table subscribe callback, message %s",
|
|
|
|
message_type->str);
|
|
|
|
|
|
|
|
if (strcmp(message_type->str, "message") == 0) {
|
|
|
|
/* Handle a local scheduler heartbeat. Parse the payload and call the
|
|
|
|
* subscribe callback. */
|
|
|
|
redisReply *payload = reply->element[2];
|
|
|
|
local_scheduler_table_subscribe_data *data = callback_data->data;
|
|
|
|
db_client_id client_id;
|
|
|
|
local_scheduler_info info;
|
|
|
|
/* The payload should be the concatenation of these two structs. */
|
|
|
|
CHECK(sizeof(client_id) + sizeof(info) == payload->len);
|
|
|
|
memcpy(&client_id, payload->str, sizeof(client_id));
|
|
|
|
memcpy(&info, payload->str + sizeof(client_id), sizeof(info));
|
|
|
|
if (data->subscribe_callback) {
|
|
|
|
data->subscribe_callback(client_id, info, data->subscribe_context);
|
|
|
|
}
|
|
|
|
} else if (strcmp(message_type->str, "subscribe") == 0) {
|
|
|
|
/* The reply for the initial SUBSCRIBE command. */
|
|
|
|
CHECK(callback_data->done_callback == NULL);
|
|
|
|
/* If the initial SUBSCRIBE was successful, clean up the timer, but don't
|
|
|
|
* destroy the callback data. */
|
|
|
|
event_loop_remove_timer(db->loop, callback_data->timer_id);
|
|
|
|
|
|
|
|
} else {
|
|
|
|
LOG_FATAL("Unexpected reply type from local scheduler subscribe.");
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
void redis_local_scheduler_table_subscribe(table_callback_data *callback_data) {
|
|
|
|
db_handle *db = callback_data->db_handle;
|
|
|
|
int status = redisAsyncCommand(
|
|
|
|
db->sub_context, redis_local_scheduler_table_subscribe_callback,
|
|
|
|
(void *) callback_data->timer_id, "SUBSCRIBE local_schedulers");
|
|
|
|
if ((status == REDIS_ERR) || db->sub_context->err) {
|
|
|
|
LOG_REDIS_DEBUG(db->sub_context,
|
|
|
|
"error in redis_local_scheduler_table_subscribe");
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
void redis_local_scheduler_table_send_info_callback(redisAsyncContext *c,
|
|
|
|
void *r,
|
|
|
|
void *privdata) {
|
|
|
|
REDIS_CALLBACK_HEADER(db, callback_data, r);
|
|
|
|
|
|
|
|
redisReply *reply = r;
|
|
|
|
CHECK(reply->type == REDIS_REPLY_INTEGER);
|
|
|
|
LOG_DEBUG("%" PRId64 " subscribers received this publish.\n", reply->integer);
|
|
|
|
|
|
|
|
CHECK(callback_data->done_callback == NULL);
|
|
|
|
/* Clean up the timer and callback. */
|
|
|
|
destroy_timer_callback(db->loop, callback_data);
|
|
|
|
}
|
|
|
|
|
|
|
|
void redis_local_scheduler_table_send_info(table_callback_data *callback_data) {
|
|
|
|
db_handle *db = callback_data->db_handle;
|
|
|
|
local_scheduler_table_send_info_data *data = callback_data->data;
|
|
|
|
int status = redisAsyncCommand(
|
|
|
|
db->context, redis_local_scheduler_table_send_info_callback,
|
|
|
|
(void *) callback_data->timer_id, "PUBLISH local_schedulers %b%b",
|
|
|
|
db->client.id, sizeof(db->client.id), &data->info, sizeof(data->info));
|
|
|
|
if ((status == REDIS_ERR) || db->context->err) {
|
|
|
|
LOG_REDIS_DEBUG(db->context,
|
|
|
|
"error in redis_local_scheduler_table_send_info");
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2016-12-09 00:51:44 -08:00
|
|
|
void redis_object_info_subscribe_callback(redisAsyncContext *c,
|
|
|
|
void *r,
|
|
|
|
void *privdata) {
|
|
|
|
REDIS_CALLBACK_HEADER(db, callback_data, r);
|
|
|
|
redisReply *reply = r;
|
|
|
|
|
|
|
|
CHECK(reply->type == REDIS_REPLY_ARRAY);
|
|
|
|
|
|
|
|
CHECK(reply->elements > 2);
|
|
|
|
/* First entry is message type, then possibly the regex we psubscribed to,
|
|
|
|
* then topic, then payload. */
|
|
|
|
redisReply *payload = reply->element[reply->elements - 1];
|
|
|
|
/* If this condition is true, we got the initial message that acknowledged the
|
|
|
|
* subscription. */
|
|
|
|
if (payload->str == NULL) {
|
|
|
|
if (callback_data->done_callback) {
|
|
|
|
db_client_table_done_callback done_callback =
|
|
|
|
callback_data->done_callback;
|
|
|
|
done_callback(callback_data->id, callback_data->user_context);
|
|
|
|
}
|
|
|
|
/* Note that we do not destroy the callback data yet because the
|
|
|
|
* subscription callback needs this data. */
|
|
|
|
event_loop_remove_timer(db->loop, callback_data->timer_id);
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
/* Otherwise, parse the payload and call the callback. */
|
|
|
|
object_info_subscribe_data *data = callback_data->data;
|
|
|
|
object_id object_id;
|
|
|
|
memcpy(object_id.id, payload->str, sizeof(object_id.id));
|
|
|
|
/* payload->str should have the format: "object_id:object_size_int" */
|
|
|
|
LOG_DEBUG("obj:info channel received message <%s>", payload->str);
|
|
|
|
if (data->subscribe_callback) {
|
|
|
|
data->subscribe_callback(
|
|
|
|
object_id, strtol(&payload->str[1 + sizeof(object_id)], NULL, 10),
|
|
|
|
data->subscribe_context);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
void redis_object_info_subscribe(table_callback_data *callback_data) {
|
|
|
|
db_handle *db = callback_data->db_handle;
|
|
|
|
int status = redisAsyncCommand(
|
|
|
|
db->sub_context, redis_object_info_subscribe_callback,
|
|
|
|
(void *) callback_data->timer_id, "PSUBSCRIBE obj:info");
|
|
|
|
if ((status == REDIS_ERR) || db->sub_context->err) {
|
|
|
|
LOG_REDIS_DEBUG(db->sub_context, "error in object_info_register_callback");
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2016-11-18 19:57:51 -08:00
|
|
|
db_client_id get_db_client_id(db_handle *db) {
|
|
|
|
CHECK(db != NULL);
|
|
|
|
return db->client;
|
2016-10-18 12:38:30 -07:00
|
|
|
}
|