mirror of
https://github.com/vale981/ray
synced 2025-03-06 02:21:39 -05:00
Logging level (#38)
* Set logging levels in Makefile using -DRAY_COMMON_LOG_LEVEL=level * Lower level of some LOG_ERROR messages, log the name of the table operation on failure * Address rest of Robert's comments * Fix spurious log message
This commit is contained in:
parent
986ed5c9e8
commit
7babe0d22f
14 changed files with 123 additions and 75 deletions
|
@ -38,6 +38,7 @@ redis:
|
|||
hiredis:
|
||||
cd thirdparty/hiredis ; make
|
||||
|
||||
test: CFLAGS += -DRAY_COMMON_LOG_LEVEL=4
|
||||
test: hiredis redis $(BUILD)/common_tests $(BUILD)/task_table_tests $(BUILD)/object_table_tests $(BUILD)/db_tests $(BUILD)/io_tests $(BUILD)/task_tests $(BUILD)/redis_tests FORCE
|
||||
./thirdparty/redis-3.2.3/src/redis-server &
|
||||
sleep 1s
|
||||
|
|
|
@ -16,7 +16,7 @@ unique_id globally_unique_id(void) {
|
|||
/* Use /dev/urandom for "real" randomness. */
|
||||
int fd;
|
||||
if ((fd = open("/dev/urandom", O_RDONLY)) == -1) {
|
||||
LOG_ERR("Could not generate random number");
|
||||
LOG_ERROR("Could not generate random number");
|
||||
}
|
||||
unique_id result;
|
||||
read(fd, &result.id[0], UNIQUE_ID_SIZE);
|
||||
|
|
|
@ -11,20 +11,49 @@
|
|||
|
||||
#include "utarray.h"
|
||||
|
||||
#ifndef RAY_COMMON_DEBUG
|
||||
#define RAY_COMMON_DEBUG 0
|
||||
#define RAY_COMMON_INFO 1
|
||||
#define RAY_COMMON_WARNING 2
|
||||
#define RAY_COMMON_ERROR 3
|
||||
#define RAY_COMMON_FATAL 4
|
||||
|
||||
/* Default logging level is INFO. */
|
||||
#ifndef RAY_COMMON_LOG_LEVEL
|
||||
#define RAY_COMMON_LOG_LEVEL RAY_COMMON_INFO
|
||||
#endif
|
||||
|
||||
#if (RAY_COMMON_LOG_LEVEL > RAY_COMMON_DEBUG)
|
||||
#define LOG_DEBUG(M, ...)
|
||||
#else
|
||||
#define LOG_DEBUG(M, ...) \
|
||||
fprintf(stderr, "[DEBUG] (%s:%d) " M "\n", __FILE__, __LINE__, ##__VA_ARGS__)
|
||||
#endif
|
||||
|
||||
#define LOG_ERR(M, ...) \
|
||||
fprintf(stderr, "[ERROR] (%s:%d: errno: %s) " M "\n", __FILE__, __LINE__, \
|
||||
errno == 0 ? "None" : strerror(errno), ##__VA_ARGS__)
|
||||
|
||||
#if (RAY_COMMON_LOG_LEVEL > RAY_COMMON_INFO)
|
||||
#define LOG_INFO(M, ...)
|
||||
#else
|
||||
#define LOG_INFO(M, ...) \
|
||||
fprintf(stderr, "[INFO] (%s:%d) " M "\n", __FILE__, __LINE__, ##__VA_ARGS__)
|
||||
#endif
|
||||
|
||||
#if (RAY_COMMON_LOG_LEVEL > RAY_COMMON_WARNING)
|
||||
#define LOG_WARN(M, ...)
|
||||
#else
|
||||
#define LOG_WARN(M, ...) \
|
||||
fprintf(stderr, "[WARN] (%s:%d) " M "\n", __FILE__, __LINE__, ##__VA_ARGS__)
|
||||
#endif
|
||||
|
||||
#if (RAY_COMMON_LOG_LEVEL > RAY_COMMON_ERROR)
|
||||
#define LOG_ERROR(M, ...)
|
||||
#else
|
||||
#define LOG_ERROR(M, ...) \
|
||||
fprintf(stderr, "[ERROR] (%s:%d: errno: %s) " M "\n", __FILE__, __LINE__, \
|
||||
errno == 0 ? "None" : strerror(errno), ##__VA_ARGS__)
|
||||
#endif
|
||||
|
||||
#if (RAY_COMMON_LOG_LEVEL > RAY_COMMON_FATAL)
|
||||
#define LOG_FATAL(M, ...)
|
||||
#else
|
||||
#define LOG_FATAL(M, ...) \
|
||||
do { \
|
||||
fprintf(stderr, "[FATAL] (%s:%d) " M "\n", __FILE__, __LINE__, \
|
||||
|
@ -34,10 +63,11 @@
|
|||
backtrace_symbols_fd(buffer, calls, 1); \
|
||||
exit(-1); \
|
||||
} while (0);
|
||||
#endif
|
||||
|
||||
#define CHECKM(COND, M, ...) \
|
||||
if (!(COND)) { \
|
||||
LOG_ERR("Check failure: %s \n" M, #COND, ##__VA_ARGS__); \
|
||||
#define CHECKM(COND, M, ...) \
|
||||
if (!(COND)) { \
|
||||
LOG_FATAL("Check failure: %s \n" M, #COND, ##__VA_ARGS__); \
|
||||
}
|
||||
|
||||
#define CHECK(COND) CHECKM(COND, "")
|
||||
|
|
|
@ -33,7 +33,7 @@ int bind_inet_sock(const int port, bool shall_listen) {
|
|||
struct sockaddr_in name;
|
||||
int socket_fd = socket(PF_INET, SOCK_STREAM, 0);
|
||||
if (socket_fd < 0) {
|
||||
LOG_ERR("socket() failed for port %d.", port);
|
||||
LOG_ERROR("socket() failed for port %d.", port);
|
||||
return -1;
|
||||
}
|
||||
name.sin_family = AF_INET;
|
||||
|
@ -42,22 +42,22 @@ int bind_inet_sock(const int port, bool shall_listen) {
|
|||
int on = 1;
|
||||
/* TODO(pcm): http://stackoverflow.com/q/1150635 */
|
||||
if (ioctl(socket_fd, FIONBIO, (char *) &on) < 0) {
|
||||
LOG_ERR("ioctl failed");
|
||||
LOG_ERROR("ioctl failed");
|
||||
close(socket_fd);
|
||||
return -1;
|
||||
}
|
||||
if (setsockopt(socket_fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) < 0) {
|
||||
LOG_ERR("setsockopt failed for port %d", port);
|
||||
LOG_ERROR("setsockopt failed for port %d", port);
|
||||
close(socket_fd);
|
||||
return -1;
|
||||
}
|
||||
if (bind(socket_fd, (struct sockaddr *) &name, sizeof(name)) < 0) {
|
||||
LOG_ERR("Bind failed for port %d", port);
|
||||
LOG_ERROR("Bind failed for port %d", port);
|
||||
close(socket_fd);
|
||||
return -1;
|
||||
}
|
||||
if (shall_listen && listen(socket_fd, 5) == -1) {
|
||||
LOG_ERR("Could not listen to socket %d", port);
|
||||
LOG_ERROR("Could not listen to socket %d", port);
|
||||
close(socket_fd);
|
||||
return -1;
|
||||
}
|
||||
|
@ -77,14 +77,14 @@ int bind_ipc_sock(const char *socket_pathname, bool shall_listen) {
|
|||
struct sockaddr_un socket_address;
|
||||
int socket_fd = socket(AF_UNIX, SOCK_STREAM, 0);
|
||||
if (socket_fd < 0) {
|
||||
LOG_ERR("socket() failed for pathname %s.", socket_pathname);
|
||||
LOG_ERROR("socket() failed for pathname %s.", socket_pathname);
|
||||
return -1;
|
||||
}
|
||||
/* Tell the system to allow the port to be reused. */
|
||||
int on = 1;
|
||||
if (setsockopt(socket_fd, SOL_SOCKET, SO_REUSEADDR, (char *) &on,
|
||||
sizeof(on)) < 0) {
|
||||
LOG_ERR("setsockopt failed for pathname %s", socket_pathname);
|
||||
LOG_ERROR("setsockopt failed for pathname %s", socket_pathname);
|
||||
close(socket_fd);
|
||||
return -1;
|
||||
}
|
||||
|
@ -93,7 +93,7 @@ int bind_ipc_sock(const char *socket_pathname, bool shall_listen) {
|
|||
memset(&socket_address, 0, sizeof(struct sockaddr_un));
|
||||
socket_address.sun_family = AF_UNIX;
|
||||
if (strlen(socket_pathname) + 1 > sizeof(socket_address.sun_path)) {
|
||||
LOG_ERR("Socket pathname is too long.");
|
||||
LOG_ERROR("Socket pathname is too long.");
|
||||
close(socket_fd);
|
||||
return -1;
|
||||
}
|
||||
|
@ -102,12 +102,12 @@ int bind_ipc_sock(const char *socket_pathname, bool shall_listen) {
|
|||
|
||||
if (bind(socket_fd, (struct sockaddr *) &socket_address,
|
||||
sizeof(struct sockaddr_un)) != 0) {
|
||||
LOG_ERR("Bind failed for pathname %s.", socket_pathname);
|
||||
LOG_ERROR("Bind failed for pathname %s.", socket_pathname);
|
||||
close(socket_fd);
|
||||
return -1;
|
||||
}
|
||||
if (shall_listen && listen(socket_fd, 5) == -1) {
|
||||
LOG_ERR("Could not listen to socket %s", socket_pathname);
|
||||
LOG_ERROR("Could not listen to socket %s", socket_pathname);
|
||||
close(socket_fd);
|
||||
return -1;
|
||||
}
|
||||
|
@ -125,14 +125,14 @@ int connect_ipc_sock(const char *socket_pathname) {
|
|||
|
||||
socket_fd = socket(AF_UNIX, SOCK_STREAM, 0);
|
||||
if (socket_fd < 0) {
|
||||
LOG_ERR("socket() failed for pathname %s.", socket_pathname);
|
||||
LOG_ERROR("socket() failed for pathname %s.", socket_pathname);
|
||||
return -1;
|
||||
}
|
||||
|
||||
memset(&socket_address, 0, sizeof(struct sockaddr_un));
|
||||
socket_address.sun_family = AF_UNIX;
|
||||
if (strlen(socket_pathname) + 1 > sizeof(socket_address.sun_path)) {
|
||||
LOG_ERR("Socket pathname is too long.");
|
||||
LOG_ERROR("Socket pathname is too long.");
|
||||
return -1;
|
||||
}
|
||||
strncpy(socket_address.sun_path, socket_pathname,
|
||||
|
@ -140,7 +140,7 @@ int connect_ipc_sock(const char *socket_pathname) {
|
|||
|
||||
if (connect(socket_fd, (struct sockaddr *) &socket_address,
|
||||
sizeof(struct sockaddr_un)) != 0) {
|
||||
LOG_ERR("Connection to socket failed for pathname %s.", socket_pathname);
|
||||
LOG_ERROR("Connection to socket failed for pathname %s.", socket_pathname);
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
@ -154,7 +154,7 @@ int connect_ipc_sock(const char *socket_pathname) {
|
|||
int accept_client(int socket_fd) {
|
||||
int client_fd = accept(socket_fd, NULL, NULL);
|
||||
if (client_fd < 0) {
|
||||
LOG_ERR("Error reading from socket.");
|
||||
LOG_ERROR("Error reading from socket.");
|
||||
return -1;
|
||||
}
|
||||
return client_fd;
|
||||
|
|
|
@ -6,8 +6,8 @@ void object_table_lookup(db_handle *db_handle,
|
|||
retry_info *retry,
|
||||
object_table_lookup_done_callback done_callback,
|
||||
void *user_context) {
|
||||
init_table_callback(db_handle, object_id, NULL, retry, done_callback,
|
||||
redis_object_table_lookup, user_context);
|
||||
init_table_callback(db_handle, object_id, __func__, NULL, retry,
|
||||
done_callback, redis_object_table_lookup, user_context);
|
||||
}
|
||||
|
||||
void object_table_add(db_handle *db_handle,
|
||||
|
@ -15,8 +15,8 @@ void object_table_add(db_handle *db_handle,
|
|||
retry_info *retry,
|
||||
object_table_done_callback done_callback,
|
||||
void *user_context) {
|
||||
init_table_callback(db_handle, object_id, NULL, retry, done_callback,
|
||||
redis_object_table_add, user_context);
|
||||
init_table_callback(db_handle, object_id, __func__, NULL, retry,
|
||||
done_callback, redis_object_table_add, user_context);
|
||||
}
|
||||
|
||||
void object_table_subscribe(
|
||||
|
@ -33,8 +33,9 @@ void object_table_subscribe(
|
|||
sub_data->object_available_callback = object_available_callback;
|
||||
sub_data->subscribe_context = subscribe_context;
|
||||
|
||||
init_table_callback(db_handle, object_id, sub_data, retry, done_callback,
|
||||
redis_object_table_subscribe, user_context);
|
||||
init_table_callback(db_handle, object_id, __func__, sub_data, retry,
|
||||
done_callback, redis_object_table_subscribe,
|
||||
user_context);
|
||||
}
|
||||
|
||||
void result_table_add(db_handle *db_handle,
|
||||
|
@ -45,8 +46,8 @@ void result_table_add(db_handle *db_handle,
|
|||
void *user_context) {
|
||||
task_id *task_id_copy = malloc(sizeof(task_id));
|
||||
memcpy(task_id_copy, task_id_arg.id, sizeof(task_id));
|
||||
init_table_callback(db_handle, object_id, task_id_copy, retry, done_callback,
|
||||
redis_result_table_add, user_context);
|
||||
init_table_callback(db_handle, object_id, __func__, task_id_copy, retry,
|
||||
done_callback, redis_result_table_add, user_context);
|
||||
}
|
||||
|
||||
void result_table_lookup(db_handle *db_handle,
|
||||
|
@ -54,6 +55,6 @@ void result_table_lookup(db_handle *db_handle,
|
|||
retry_info *retry,
|
||||
result_table_lookup_callback done_callback,
|
||||
void *user_context) {
|
||||
init_table_callback(db_handle, object_id, NULL, retry, done_callback,
|
||||
redis_result_table_lookup, user_context);
|
||||
init_table_callback(db_handle, object_id, __func__, NULL, retry,
|
||||
done_callback, redis_result_table_lookup, user_context);
|
||||
}
|
||||
|
|
|
@ -16,8 +16,11 @@
|
|||
#include "redis.h"
|
||||
#include "io.h"
|
||||
|
||||
#define LOG_REDIS_ERR(context, M, ...) \
|
||||
LOG_INFO("Redis error %d %s; %s", context->err, context->errstr, M)
|
||||
#define LOG_REDIS_ERROR(context, M, ...) \
|
||||
LOG_ERROR("Redis error %d %s; %s", context->err, context->errstr, M)
|
||||
|
||||
#define LOG_REDIS_DEBUG(context, M, ...) \
|
||||
LOG_DEBUG("Redis error %d %s; %s", context->err, context->errstr, M)
|
||||
|
||||
#define CHECK_REDIS_CONNECT(CONTEXT_TYPE, context, M, ...) \
|
||||
do { \
|
||||
|
@ -26,7 +29,7 @@
|
|||
LOG_FATAL("could not allocate redis context"); \
|
||||
} \
|
||||
if (_context->err) { \
|
||||
LOG_REDIS_ERR(_context, M, ##__VA_ARGS__); \
|
||||
LOG_REDIS_ERROR(_context, M, ##__VA_ARGS__); \
|
||||
exit(-1); \
|
||||
} \
|
||||
} while (0);
|
||||
|
@ -202,7 +205,7 @@ void redis_object_table_add(table_callback_data *callback_data) {
|
|||
(void *) callback_data->timer_id, "SADD obj:%b %d",
|
||||
id.id, sizeof(object_id), db->client_id);
|
||||
if ((status == REDIS_ERR) || db->context->err) {
|
||||
LOG_REDIS_ERR(db->context, "could not add object_table entry");
|
||||
LOG_REDIS_DEBUG(db->context, "could not add object_table entry");
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -216,7 +219,7 @@ void redis_object_table_lookup(table_callback_data *callback_data) {
|
|||
(void *) callback_data->timer_id,
|
||||
"SMEMBERS obj:%b", id.id, sizeof(object_id));
|
||||
if ((status == REDIS_ERR) || db->context->err) {
|
||||
LOG_REDIS_ERR(db->context, "error in object_table lookup");
|
||||
LOG_REDIS_DEBUG(db->context, "error in object_table lookup");
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -247,7 +250,7 @@ void redis_result_table_add(table_callback_data *callback_data) {
|
|||
"SET result:%b %b", id.id, sizeof(object_id),
|
||||
(*result_task_id).id, sizeof(task_id));
|
||||
if ((status == REDIS_ERR) || db->context->err) {
|
||||
LOG_REDIS_ERR(db->context, "Error in result table add");
|
||||
LOG_REDIS_DEBUG(db->context, "Error in result table add");
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -292,11 +295,11 @@ void redis_result_table_lookup_object_callback(redisAsyncContext *c,
|
|||
(void *) callback_data->timer_id, "HGETALL task:%b",
|
||||
(*result_task_id).id, sizeof(task_id));
|
||||
if ((status == REDIS_ERR) || db->context->err) {
|
||||
LOG_REDIS_ERR(db->context, "Could not look up result table entry");
|
||||
LOG_REDIS_DEBUG(db->context, "Could not look up result table entry");
|
||||
}
|
||||
} else if (reply->type == REDIS_REPLY_NIL) {
|
||||
/* The object with the requested ID was not in the table. */
|
||||
LOG_ERR("Object's result not in table.");
|
||||
LOG_INFO("Object's result not in table.");
|
||||
result_table_lookup_callback done_callback = callback_data->done_callback;
|
||||
if (done_callback) {
|
||||
done_callback(callback_data->id, NULL, callback_data->user_context);
|
||||
|
@ -318,7 +321,7 @@ void redis_result_table_lookup(table_callback_data *callback_data) {
|
|||
(void *) callback_data->timer_id, "GET result:%b",
|
||||
id.id, sizeof(object_id));
|
||||
if ((status == REDIS_ERR) || db->context->err) {
|
||||
LOG_REDIS_ERR(db->context, "Error in result table lookup");
|
||||
LOG_REDIS_DEBUG(db->context, "Error in result table lookup");
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -413,8 +416,8 @@ void redis_object_table_subscribe(table_callback_data *callback_data) {
|
|||
"SUBSCRIBE __keyspace@0__:%b add", id.id,
|
||||
sizeof(object_id));
|
||||
if ((status == REDIS_ERR) || db->sub_context->err) {
|
||||
LOG_REDIS_ERR(db->sub_context,
|
||||
"error in redis_object_table_subscribe_callback");
|
||||
LOG_REDIS_DEBUG(db->sub_context,
|
||||
"error in redis_object_table_subscribe_callback");
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -453,7 +456,7 @@ void redis_task_table_get_task(table_callback_data *callback_data) {
|
|||
(void *) callback_data->timer_id, "HGETALL task:%b",
|
||||
id.id, sizeof(task_id));
|
||||
if ((status == REDIS_ERR) || db->sub_context->err) {
|
||||
LOG_REDIS_ERR(db->sub_context, "Could not get task from task table");
|
||||
LOG_REDIS_DEBUG(db->sub_context, "Could not get task from task table");
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -506,7 +509,7 @@ void redis_task_table_publish(table_callback_data *callback_data,
|
|||
(char *) spec, task_spec_size(spec));
|
||||
}
|
||||
if ((status = REDIS_ERR) || db->context->err) {
|
||||
LOG_REDIS_ERR(db->context, "error setting task in task_table_add_task");
|
||||
LOG_REDIS_DEBUG(db->context, "error setting task in task_table_add_task");
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -518,8 +521,8 @@ void redis_task_table_publish(table_callback_data *callback_data,
|
|||
task_size(task));
|
||||
|
||||
if ((status == REDIS_ERR) || db->context->err) {
|
||||
LOG_REDIS_ERR(db->context,
|
||||
"error publishing task in task_table_add_task");
|
||||
LOG_REDIS_DEBUG(db->context,
|
||||
"error publishing task in task_table_add_task");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -535,6 +538,7 @@ void redis_task_table_update(table_callback_data *callback_data) {
|
|||
void redis_task_table_publish_push_callback(redisAsyncContext *c,
|
||||
void *r,
|
||||
void *privdata) {
|
||||
LOG_DEBUG("Calling publish push callback");
|
||||
REDIS_CALLBACK_HEADER(db, callback_data, r)
|
||||
CHECK(callback_data->requests_info != NULL);
|
||||
((bool *) callback_data->requests_info)[PUSH_INDEX] = true;
|
||||
|
@ -551,6 +555,7 @@ void redis_task_table_publish_push_callback(redisAsyncContext *c,
|
|||
void redis_task_table_publish_publish_callback(redisAsyncContext *c,
|
||||
void *r,
|
||||
void *privdata) {
|
||||
LOG_DEBUG("Calling publish publish callback");
|
||||
REDIS_CALLBACK_HEADER(db, callback_data, r)
|
||||
CHECK(callback_data->requests_info != NULL);
|
||||
((bool *) callback_data->requests_info)[PUBLISH_INDEX] = true;
|
||||
|
@ -617,7 +622,7 @@ void redis_task_table_subscribe(table_callback_data *callback_data) {
|
|||
(char *) node.id, sizeof(node_id), data->state_filter);
|
||||
}
|
||||
if ((status == REDIS_ERR) || db->sub_context->err) {
|
||||
LOG_REDIS_ERR(db->sub_context, "error in task_table_register_callback");
|
||||
LOG_REDIS_DEBUG(db->sub_context, "error in task_table_register_callback");
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -5,6 +5,7 @@
|
|||
|
||||
table_callback_data *init_table_callback(db_handle *db_handle,
|
||||
unique_id id,
|
||||
const char *label,
|
||||
void *data,
|
||||
retry_info *retry,
|
||||
table_done_callback done_callback,
|
||||
|
@ -17,6 +18,7 @@ table_callback_data *init_table_callback(db_handle *db_handle,
|
|||
table_callback_data *callback_data = malloc(sizeof(table_callback_data));
|
||||
CHECKM(callback_data != NULL, "Memory allocation error!")
|
||||
callback_data->id = id;
|
||||
callback_data->label = label;
|
||||
callback_data->retry = *retry;
|
||||
callback_data->done_callback = done_callback;
|
||||
callback_data->retry_callback = retry_callback;
|
||||
|
@ -30,6 +32,8 @@ table_callback_data *init_table_callback(db_handle *db_handle,
|
|||
(event_loop_timer_handler) table_timeout_handler, callback_data);
|
||||
outstanding_callbacks_add(callback_data);
|
||||
|
||||
LOG_DEBUG("Initializing table command %s with timer ID %" PRId64,
|
||||
callback_data->label, callback_data->timer_id);
|
||||
callback_data->retry_callback(callback_data);
|
||||
|
||||
return callback_data;
|
||||
|
@ -67,7 +71,8 @@ int64_t table_timeout_handler(event_loop *loop,
|
|||
if (callback_data->retry.num_retries == 0) {
|
||||
/* We didn't get a response from the database after exhausting all retries;
|
||||
* let user know, cleanup the state, and remove the timer. */
|
||||
LOG_ERR("Table command with timer ID %ld failed", timer_id);
|
||||
LOG_WARN("Table command %s with timer ID %" PRId64 " failed",
|
||||
callback_data->label, timer_id);
|
||||
if (callback_data->retry.fail_callback) {
|
||||
callback_data->retry.fail_callback(
|
||||
callback_data->id, callback_data->user_context, callback_data->data);
|
||||
|
|
|
@ -44,6 +44,8 @@ struct table_callback_data {
|
|||
/** ID of the entry in the table that we are going to look up, remove or add.
|
||||
*/
|
||||
unique_id id;
|
||||
/** A label to identify the original request for logging purposes. */
|
||||
const char *label;
|
||||
/** The callback that will be called when results is returned. */
|
||||
table_done_callback done_callback;
|
||||
/** The callback that will be called to initiate the next try. */
|
||||
|
@ -86,6 +88,8 @@ int64_t table_timeout_handler(event_loop *loop,
|
|||
*
|
||||
* @param db_handle Database handle.
|
||||
* @param id ID of the object that is looked up, added or removed.
|
||||
* @param label A string label to identify the type of table request for
|
||||
* logging purposes.
|
||||
* @param data Data entered into the table. Shall be freed by the user.
|
||||
* @param retry Retry relevant information: retry timeout, number of remaining
|
||||
* retries, and retry callback.
|
||||
|
@ -98,6 +102,7 @@ int64_t table_timeout_handler(event_loop *loop,
|
|||
*/
|
||||
table_callback_data *init_table_callback(db_handle *db_handle,
|
||||
unique_id id,
|
||||
const char *label,
|
||||
void *data,
|
||||
retry_info *retry,
|
||||
table_done_callback done_callback,
|
||||
|
|
|
@ -8,7 +8,7 @@ void task_table_get_task(db_handle *db_handle,
|
|||
retry_info *retry,
|
||||
task_table_get_callback done_callback,
|
||||
void *user_context) {
|
||||
init_table_callback(db_handle, task_id, NULL, retry, done_callback,
|
||||
init_table_callback(db_handle, task_id, __func__, NULL, retry, done_callback,
|
||||
redis_task_table_get_task, user_context);
|
||||
}
|
||||
|
||||
|
@ -17,8 +17,8 @@ void task_table_add_task(db_handle *db_handle,
|
|||
retry_info *retry,
|
||||
task_table_done_callback done_callback,
|
||||
void *user_context) {
|
||||
init_table_callback(db_handle, task_task_id(task), task, retry, done_callback,
|
||||
redis_task_table_add_task, user_context);
|
||||
init_table_callback(db_handle, task_task_id(task), __func__, task, retry,
|
||||
done_callback, redis_task_table_add_task, user_context);
|
||||
}
|
||||
|
||||
void task_table_update(db_handle *db_handle,
|
||||
|
@ -26,8 +26,8 @@ void task_table_update(db_handle *db_handle,
|
|||
retry_info *retry,
|
||||
task_table_done_callback done_callback,
|
||||
void *user_context) {
|
||||
init_table_callback(db_handle, task_task_id(task), task, retry, done_callback,
|
||||
redis_task_table_update, user_context);
|
||||
init_table_callback(db_handle, task_task_id(task), __func__, task, retry,
|
||||
done_callback, redis_task_table_update, user_context);
|
||||
}
|
||||
|
||||
/* TODO(swang): A corresponding task_table_unsubscribe. */
|
||||
|
@ -47,6 +47,6 @@ void task_table_subscribe(db_handle *db_handle,
|
|||
sub_data->subscribe_callback = subscribe_callback;
|
||||
sub_data->subscribe_context = subscribe_context;
|
||||
|
||||
init_table_callback(db_handle, node, sub_data, retry, done_callback,
|
||||
init_table_callback(db_handle, node, __func__, sub_data, retry, done_callback,
|
||||
redis_task_table_subscribe, user_context);
|
||||
}
|
||||
|
|
|
@ -8,6 +8,13 @@
|
|||
#include "photon.h"
|
||||
#include "photon_scheduler.h"
|
||||
|
||||
/* TODO(swang): We should set retry values in a config file somewhere. */
|
||||
const retry_info photon_retry = {
|
||||
.num_retries = 0,
|
||||
.timeout = 1000,
|
||||
.fail_callback = NULL,
|
||||
};
|
||||
|
||||
typedef struct task_queue_entry {
|
||||
task *task;
|
||||
struct task_queue_entry *prev;
|
||||
|
@ -147,13 +154,9 @@ void handle_task_submitted(scheduler_info *info,
|
|||
DL_APPEND(s->task_queue, elt);
|
||||
}
|
||||
/* Submit the task to redis. */
|
||||
/* TODO(swang): We should set retry values in a config file somewhere. */
|
||||
retry_info retry = {
|
||||
.num_retries = 0, .timeout = 0, .fail_callback = NULL,
|
||||
};
|
||||
/* TODO(swang): This should be task_table_update if the task is already in the
|
||||
* log. */
|
||||
task_table_add_task(info->db, task, &retry, NULL, NULL);
|
||||
task_table_add_task(info->db, task, (retry_info *) &photon_retry, NULL, NULL);
|
||||
if (schedule_locally) {
|
||||
/* If the task was scheduled locally, we need to free it. Otherwise,
|
||||
* ownership of the task is passed to the task_queue, and it will be freed
|
||||
|
|
|
@ -6,7 +6,7 @@ BUILD = build
|
|||
all: $(BUILD)/plasma_store $(BUILD)/plasma_manager $(BUILD)/plasma_client.so $(BUILD)/example $(BUILD)/libplasma_client.a
|
||||
|
||||
debug: FORCE
|
||||
debug: CFLAGS += -DRAY_COMMON_DEBUG=1
|
||||
debug: CFLAGS += -DRAY_COMMON_LOG_LEVEL=0
|
||||
debug: all
|
||||
|
||||
clean:
|
||||
|
@ -34,8 +34,8 @@ $(BUILD)/example: plasma_client.c plasma.h example.c fling.h fling.c common
|
|||
common: FORCE
|
||||
cd ../common; make
|
||||
|
||||
# Set the request timeout low for testing purposes.
|
||||
test: CFLAGS += -DRAY_TIMEOUT=50
|
||||
# Set the request timeout low and logging level at FATAL for testing purposes.
|
||||
test: CFLAGS += -DRAY_TIMEOUT=50 -DRAY_COMMON_LOG_LEVEL=4
|
||||
# First, build and run all the unit tests.
|
||||
test: $(BUILD)/manager_tests FORCE
|
||||
./build/manager_tests
|
||||
|
|
|
@ -61,11 +61,11 @@ int create_buffer(int64_t size) {
|
|||
return -1;
|
||||
}
|
||||
if (unlink(file_name) != 0) {
|
||||
LOG_ERR("unlink error");
|
||||
LOG_ERROR("unlink error");
|
||||
return -1;
|
||||
}
|
||||
if (ftruncate(fd, (off_t) size) != 0) {
|
||||
LOG_ERR("ftruncate error");
|
||||
LOG_ERROR("ftruncate error");
|
||||
return -1;
|
||||
}
|
||||
return fd;
|
||||
|
@ -78,6 +78,7 @@ void *fake_mmap(size_t size) {
|
|||
size += sizeof(size_t);
|
||||
|
||||
int fd = create_buffer(size);
|
||||
CHECKM(fd >= 0, "Failed to create buffer during mmap");
|
||||
void *pointer = mmap(NULL, size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
|
||||
if (pointer == MAP_FAILED) {
|
||||
return pointer;
|
||||
|
|
|
@ -415,13 +415,11 @@ bool plasma_manager_is_connected(plasma_connection *conn) {
|
|||
int plasma_manager_try_connect(const char *ip_addr, int port) {
|
||||
int fd = socket(PF_INET, SOCK_STREAM, 0);
|
||||
if (fd < 0) {
|
||||
LOG_ERR("could not create socket");
|
||||
return -1;
|
||||
}
|
||||
|
||||
struct hostent *manager = gethostbyname(ip_addr); /* TODO(pcm): cache this */
|
||||
if (!manager) {
|
||||
LOG_ERR("plasma manager %s not found", ip_addr);
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
@ -432,10 +430,6 @@ int plasma_manager_try_connect(const char *ip_addr, int port) {
|
|||
|
||||
int r = connect(fd, (struct sockaddr *) &addr, sizeof(addr));
|
||||
if (r < 0) {
|
||||
LOG_ERR(
|
||||
"could not establish connection to manager with id %s:%d (may have run "
|
||||
"out of ports)",
|
||||
&ip_addr[0], port);
|
||||
return -1;
|
||||
}
|
||||
return fd;
|
||||
|
@ -454,6 +448,9 @@ int plasma_manager_connect(const char *ip_addr, int port) {
|
|||
/* Sleep for 100 milliseconds. */
|
||||
usleep(100000);
|
||||
}
|
||||
if (fd < 0) {
|
||||
LOG_WARN("Unable to connect to plasma manager at %s:%d", ip_addr, port);
|
||||
}
|
||||
return fd;
|
||||
}
|
||||
|
||||
|
@ -489,7 +486,7 @@ void plasma_fetch(plasma_connection *conn,
|
|||
nbytes = recv(conn->manager_conn, (uint8_t *) &reply, sizeof(reply),
|
||||
MSG_WAITALL);
|
||||
if (nbytes < 0) {
|
||||
LOG_ERR("Error while waiting for manager response in fetch");
|
||||
LOG_ERROR("Error while waiting for manager response in fetch");
|
||||
success = 0;
|
||||
} else if (nbytes == 0) {
|
||||
success = 0;
|
||||
|
|
|
@ -326,7 +326,7 @@ void write_object_chunk(client_connection *conn, plasma_request_buffer *buf) {
|
|||
|
||||
if (r != s) {
|
||||
if (r > 0) {
|
||||
LOG_ERR("partial write on fd %d", conn->fd);
|
||||
LOG_ERROR("partial write on fd %d", conn->fd);
|
||||
} else {
|
||||
/* TODO(swang): This should not be a fatal error, since connections can
|
||||
* close at any time. */
|
||||
|
@ -405,7 +405,7 @@ int read_object_chunk(client_connection *conn, plasma_request_buffer *buf) {
|
|||
r = read(conn->fd, buf->data + conn->cursor, s);
|
||||
|
||||
if (r == -1) {
|
||||
LOG_ERR("read error");
|
||||
LOG_ERROR("read error");
|
||||
} else if (r == 0) {
|
||||
LOG_DEBUG("end of file");
|
||||
} else {
|
||||
|
|
Loading…
Add table
Reference in a new issue