2016-09-13 18:54:26 -07:00
|
|
|
/* Redis implementation of the global state store */
|
|
|
|
|
|
|
|
#include <assert.h>
|
|
|
|
|
2016-09-23 22:53:58 -07:00
|
|
|
#include <stdlib.h>
|
|
|
|
#include "hiredis/adapters/ae.h"
|
2016-09-22 23:15:45 -07:00
|
|
|
#include "utstring.h"
|
|
|
|
|
2016-09-13 18:54:26 -07:00
|
|
|
#include "common.h"
|
|
|
|
#include "db.h"
|
|
|
|
#include "object_table.h"
|
2016-09-20 17:02:56 -07:00
|
|
|
#include "task_queue.h"
|
2016-09-13 18:54:26 -07:00
|
|
|
#include "event_loop.h"
|
|
|
|
#include "redis.h"
|
2016-09-20 22:40:35 -07:00
|
|
|
#include "io.h"
|
2016-09-13 18:54:26 -07:00
|
|
|
|
|
|
|
/* Print an error line to stderr, prefixed with the source location and the
 * error string carried by `context` (read through ->errstr).
 *
 * context: pointer expression to a struct with an `errstr` member; it is
 *          parenthesized so expressions such as `&ctx` expand correctly.
 * M:       printf-style format string literal for the message.
 * ...:     optional values for the conversions in M. */
#define LOG_REDIS_ERR(context, M, ...)                                        \
  fprintf(stderr, "[ERROR] (%s:%d: message: %s) " M "\n", __FILE__, __LINE__, \
          (context)->errstr, ##__VA_ARGS__)
|
|
|
|
|
|
|
|
/* Terminate the process if a redis connection attempt did not succeed.
 *
 * CONTEXT_TYPE: struct type of the context (it must have `err` and `errstr`
 *               members).
 * context:      pointer returned by the connect call; evaluated exactly once.
 * M, ...:       printf-style message and values logged when `err` is set.
 *
 * Exits with status -1 when the context is NULL or reports an error.
 * The expansion deliberately has no trailing semicolon: the caller supplies
 * it, which keeps the macro usable as a single statement in if/else. */
#define CHECK_REDIS_CONNECT(CONTEXT_TYPE, context, M, ...) \
  do {                                                     \
    CONTEXT_TYPE *_context = (context);                    \
    if (!_context) {                                       \
      LOG_ERR("could not allocate redis context");         \
      exit(-1);                                            \
    }                                                      \
    if (_context->err) {                                   \
      LOG_REDIS_ERR(_context, M, ##__VA_ARGS__);           \
      exit(-1);                                            \
    }                                                      \
  } while (0)
|
|
|
|
|
2016-09-25 21:52:06 -07:00
|
|
|
db_handle *db_connect(const char *address,
|
|
|
|
int port,
|
|
|
|
const char *client_type,
|
|
|
|
const char *client_addr,
|
|
|
|
int client_port) {
|
|
|
|
db_handle *db = malloc(sizeof(db_handle));
|
2016-09-13 18:54:26 -07:00
|
|
|
/* Sync connection for initial handshake */
|
|
|
|
redisReply *reply;
|
|
|
|
long long num_clients;
|
|
|
|
redisContext *context = redisConnect(address, port);
|
|
|
|
CHECK_REDIS_CONNECT(redisContext, context, "could not connect to redis %s:%d",
|
|
|
|
address, port);
|
|
|
|
/* Add new client using optimistic locking. */
|
|
|
|
while (1) {
|
|
|
|
reply = redisCommand(context, "WATCH %s", client_type);
|
|
|
|
freeReplyObject(reply);
|
|
|
|
reply = redisCommand(context, "HLEN %s", client_type);
|
|
|
|
num_clients = reply->integer;
|
|
|
|
freeReplyObject(reply);
|
|
|
|
reply = redisCommand(context, "MULTI");
|
|
|
|
freeReplyObject(reply);
|
|
|
|
reply = redisCommand(context, "HSET %s %lld %s:%d", client_type,
|
|
|
|
num_clients, client_addr, client_port);
|
|
|
|
freeReplyObject(reply);
|
|
|
|
reply = redisCommand(context, "EXEC");
|
|
|
|
if (reply) {
|
|
|
|
freeReplyObject(reply);
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
freeReplyObject(reply);
|
|
|
|
}
|
|
|
|
|
|
|
|
db->client_type = strdup(client_type);
|
|
|
|
db->client_id = num_clients;
|
|
|
|
db->reading = 0;
|
|
|
|
db->writing = 0;
|
2016-09-20 17:02:56 -07:00
|
|
|
db->service_cache = NULL;
|
|
|
|
db->sync_context = context;
|
2016-09-13 18:54:26 -07:00
|
|
|
|
|
|
|
/* Establish async connection */
|
|
|
|
db->context = redisAsyncConnect(address, port);
|
|
|
|
CHECK_REDIS_CONNECT(redisAsyncContext, db->context,
|
|
|
|
"could not connect to redis %s:%d", address, port);
|
|
|
|
db->context->data = (void *) db;
|
2016-09-25 21:52:06 -07:00
|
|
|
return db;
|
2016-09-13 18:54:26 -07:00
|
|
|
}
|
|
|
|
|
2016-09-25 21:52:06 -07:00
|
|
|
/* Close both redis connections held by `db` and release the handle together
 * with everything it owns (cached service addresses and the client type
 * string). `db` must not be used after this call. */
void db_disconnect(db_handle *db) {
  /* Tear down the synchronous connection, then the asynchronous one. */
  redisFree(db->sync_context);
  redisAsyncFree(db->context);

  /* Empty the service cache, releasing each entry and its address string. */
  service_cache_entry *entry;
  service_cache_entry *next;
  HASH_ITER(hh, db->service_cache, entry, next) {
    free(entry->addr);
    HASH_DEL(db->service_cache, entry);
    free(entry);
  }

  free(db->client_type);
  free(db);
}
|
|
|
|
|
2016-09-25 21:52:06 -07:00
|
|
|
/* Hook the handle's asynchronous redis connection up to `loop` through the
 * hiredis ae adapter. The adapter's status result is not inspected. */
void db_attach(db_handle *db, event_loop *loop) {
  (void) redisAeAttach(loop, db->context);
}
|
|
|
|
|
2016-09-25 21:52:06 -07:00
|
|
|
/* Record in the object table that this client holds `object_id`.
 *
 * Queues an asynchronous "SADD obj:<hex id> <client id>" on the handle's
 * async connection; no reply callback is registered. If the connection
 * reports an error afterwards, it is logged and otherwise ignored. */
void object_table_add(db_handle *db, unique_id object_id) {
  /* Hex form of the id: two characters per byte plus the terminator. */
  static char hex_object_id[2 * UNIQUE_ID_SIZE + 1];
  sha1_to_hex(object_id.id, hex_object_id);

  /* NOTE(review): "%d" assumes db->client_id is an int; db_connect assigns
   * it from a long long -- confirm against the db_handle definition. */
  redisAsyncCommand(db->context, NULL, NULL, "SADD obj:%s %d", hex_object_id,
                    db->client_id);

  if (!db->context->err) {
    return;
  }
  LOG_REDIS_ERR(db->context, "could not add object_table entry");
}
|
|
|
|
|
2016-09-20 17:02:56 -07:00
|
|
|
/* Reply callback for the SMEMBERS query issued by object_table_lookup().
 *
 * c:        async context whose `data` field points at the owning db_handle.
 * r:        the redisReply, or NULL when no reply is available.
 * privdata: heap-allocated lookup_callback_data; always freed here.
 *
 * Each member of the reply is parsed as an integer manager id and resolved
 * to an address string through db->service_cache. Ids missing from the cache
 * are fetched with a blocking HGET on the sync connection and cached. The
 * user's callback is then invoked with the object id, the number of managers
 * and a vector of their addresses.
 *
 * Exits the process if the reply is not an array; CHECK guards the other
 * invariants. */
void object_table_get_entry(redisAsyncContext *c, void *r, void *privdata) {
  db_handle *db = c->data;
  lookup_callback_data *cb_data = privdata;
  redisReply *reply = r;
  if (reply == NULL) {
    /* Nothing to report, but the callback data is still ours to release. */
    free(privdata);
    return;
  }
  if (reply->type != REDIS_REPLY_ARRAY) {
    LOG_ERR("expected array, received type %d", reply->type);
    exit(-1);
  }

  int64_t manager_count = reply->elements;
  const char **manager_vector = malloc(manager_count * sizeof(char *));
  /* malloc(0) may legitimately return NULL, so only check non-empty. */
  CHECK(manager_count == 0 || manager_vector != NULL);

  for (int64_t j = 0; j < manager_count; ++j) {
    CHECK(reply->element[j]->type == REDIS_REPLY_STRING);
    int manager_id = atoi(reply->element[j]->str);
    service_cache_entry *entry;
    HASH_FIND_INT(db->service_cache, &manager_id, entry);
    if (!entry) {
      /* Cache miss: ask redis for the address, blocking on the sync
       * connection. The cast matches the %lld conversion. */
      redisReply *addr_reply =
          redisCommand(db->sync_context, "HGET %s %lld", db->client_type,
                       (long long) manager_id);
      CHECK(addr_reply != NULL && addr_reply->type == REDIS_REPLY_STRING);
      entry = malloc(sizeof(service_cache_entry));
      CHECK(entry != NULL);
      entry->service_id = manager_id;
      entry->addr = strdup(addr_reply->str);
      HASH_ADD_INT(db->service_cache, service_id, entry);
      freeReplyObject(addr_reply);
    }
    /* The address string stays owned by the cache entry. */
    manager_vector[j] = entry->addr;
  }

  /* NOTE(review): manager_vector is not freed here, as before; presumably
   * the callback takes ownership of the vector -- confirm against callers. */
  cb_data->callback(cb_data->object_id, manager_count, manager_vector);
  free(privdata);
}
|
|
|
|
|
2016-09-25 21:52:06 -07:00
|
|
|
void object_table_lookup(db_handle *db,
|
2016-09-20 17:02:56 -07:00
|
|
|
object_id object_id,
|
2016-09-13 18:54:26 -07:00
|
|
|
lookup_callback callback) {
|
|
|
|
static char hex_object_id[2 * UNIQUE_ID_SIZE + 1];
|
|
|
|
sha1_to_hex(&object_id.id[0], &hex_object_id[0]);
|
2016-09-20 17:02:56 -07:00
|
|
|
lookup_callback_data *cb_data = malloc(sizeof(lookup_callback_data));
|
|
|
|
cb_data->callback = callback;
|
|
|
|
cb_data->object_id = object_id;
|
|
|
|
redisAsyncCommand(db->context, object_table_get_entry, cb_data,
|
|
|
|
"SMEMBERS obj:%s", &hex_object_id[0]);
|
2016-09-13 18:54:26 -07:00
|
|
|
if (db->context->err) {
|
|
|
|
LOG_REDIS_ERR(db->context, "error in object_table lookup");
|
|
|
|
}
|
|
|
|
}
|
2016-09-20 22:40:35 -07:00
|
|
|
|
2016-09-25 21:52:06 -07:00
|
|
|
/* Submit `task` to the task queue under the instance id `task_iid`.
 *
 * Builds the text "HMSET queue:<hex iid> " followed by whatever print_task
 * appends for the task, and queues it on the handle's async connection
 * without a reply callback. A connection error is logged, not propagated. */
void task_queue_submit_task(db_handle *db, task_iid task_iid, task_spec *task) {
  /* Hex encoding needs two characters per id byte, plus one for '\0'. */
  static char hex_iid[2 * UNIQUE_ID_SIZE + 1];
  sha1_to_hex(task_iid.id, hex_iid);

  UT_string *cmd;
  utstring_new(cmd);
  utstring_printf(cmd, "HMSET queue:%s ", hex_iid);
  print_task(task, cmd);

  /* NOTE(review): the assembled text is passed as the hiredis format
   * string, so a '%' emitted by print_task would be interpreted as a
   * conversion -- confirm print_task can never produce one. */
  redisAsyncCommand(db->context, NULL, NULL, utstring_body(cmd));
  if (db->context->err) {
    LOG_REDIS_ERR(db->context, "error in task_queue submit_task");
  }

  utstring_free(cmd);
}
|