From 3d697c7ed2d8a16244338f284d3ab65c7d72f0ed Mon Sep 17 00:00:00 2001 From: Robert Nishihara Date: Sat, 24 Dec 2016 20:02:25 -0800 Subject: [PATCH] Introduce local scheduler heartbeats which carry load information. (#155) * Introduce local scheduler heartbeats which carry load information. --- src/common/Makefile | 2 +- src/common/state/local_scheduler_table.c | 27 +++++++++ src/common/state/local_scheduler_table.h | 65 ++++++++++++++++++++ src/common/state/redis.c | 77 ++++++++++++++++++++++++ src/common/state/redis.h | 18 ++++++ src/global_scheduler/global_scheduler.c | 21 +++++++ src/photon/photon_algorithm.c | 9 +++ src/photon/photon_algorithm.h | 8 +++ src/photon/photon_scheduler.c | 19 ++++++ src/photon/photon_scheduler.h | 3 + 10 files changed, 248 insertions(+), 1 deletion(-) create mode 100644 src/common/state/local_scheduler_table.c create mode 100644 src/common/state/local_scheduler_table.h diff --git a/src/common/Makefile b/src/common/Makefile index 2e3a37637..33de5d268 100644 --- a/src/common/Makefile +++ b/src/common/Makefile @@ -4,7 +4,7 @@ BUILD = build all: hiredis redis redismodule $(BUILD)/libcommon.a -$(BUILD)/libcommon.a: event_loop.o common.o task.o io.o net.o state/redis.o state/table.o state/object_table.o state/task_table.o state/db_client_table.o thirdparty/ae/ae.o thirdparty/sha256.o +$(BUILD)/libcommon.a: event_loop.o common.o task.o io.o net.o state/redis.o state/table.o state/object_table.o state/task_table.o state/db_client_table.o state/local_scheduler_table.o thirdparty/ae/ae.o thirdparty/sha256.o ar rcs $@ $^ $(BUILD)/common_tests: test/common_tests.c $(BUILD)/libcommon.a diff --git a/src/common/state/local_scheduler_table.c b/src/common/state/local_scheduler_table.c new file mode 100644 index 000000000..bfa0d8a58 --- /dev/null +++ b/src/common/state/local_scheduler_table.c @@ -0,0 +1,27 @@ +#include "local_scheduler_table.h" +#include "redis.h" + +void local_scheduler_table_subscribe( + db_handle *db_handle, + 
local_scheduler_table_subscribe_callback subscribe_callback, + void *subscribe_context, + retry_info *retry) { + local_scheduler_table_subscribe_data *sub_data = + malloc(sizeof(local_scheduler_table_subscribe_data)); + sub_data->subscribe_callback = subscribe_callback; + sub_data->subscribe_context = subscribe_context; + + init_table_callback(db_handle, NIL_ID, __func__, sub_data, retry, NULL, + redis_local_scheduler_table_subscribe, NULL); +} + +void local_scheduler_table_send_info(db_handle *db_handle, + local_scheduler_info *info, + retry_info *retry) { + local_scheduler_table_send_info_data *data = + malloc(sizeof(local_scheduler_table_send_info_data)); + data->info = *info; + + init_table_callback(db_handle, NIL_ID, __func__, data, retry, NULL, + redis_local_scheduler_table_send_info, NULL); +} diff --git a/src/common/state/local_scheduler_table.h b/src/common/state/local_scheduler_table.h new file mode 100644 index 000000000..48d7d556d --- /dev/null +++ b/src/common/state/local_scheduler_table.h @@ -0,0 +1,65 @@ +#ifndef LOCAL_SCHEDULER_TABLE_H +#define LOCAL_SCHEDULER_TABLE_H + +#include "db.h" +#include "table.h" + +typedef struct { + int task_queue_length; + int available_workers; +} local_scheduler_info; + +/* + * ==== Subscribing to the local scheduler table ==== + */ + +/* Callback for subscribing to the local scheduler table. */ +typedef void (*local_scheduler_table_subscribe_callback)( + db_client_id client_id, + local_scheduler_info info, + void *user_context); + +/** + * Register a callback for a local scheduler table event. + * + * @param db_handle Database handle. + * @param subscribe_callback Callback that will be called when the local + * scheduler event happens. + * @param subscribe_context Context that will be passed into the + * subscribe_callback. + * @param retry Information about retrying the request to the database. + * @return Void. 
+ */ +void local_scheduler_table_subscribe( + db_handle *db_handle, + local_scheduler_table_subscribe_callback subscribe_callback, + void *subscribe_context, + retry_info *retry); + +/* Data that is needed to register local scheduler table subscribe callbacks + * with the state database. */ +typedef struct { + local_scheduler_table_subscribe_callback subscribe_callback; + void *subscribe_context; +} local_scheduler_table_subscribe_data; + +/** + * Send a heartbeat to all subscribers to the local scheduler table. This + * heartbeat contains some information about the load on the local scheduler. + * + * @param db_handle Database handle. + * @param info Information about the local scheduler, including the load on the + * local scheduler. + * @param retry Information about retrying the request to the database. + */ +void local_scheduler_table_send_info(db_handle *db_handle, + local_scheduler_info *info, + retry_info *retry); + +/* Data that is needed to publish local scheduler heartbeats to the local + * scheduler table. 
*/ +typedef struct { + local_scheduler_info info; +} local_scheduler_table_send_info_data; + +#endif /* LOCAL_SCHEDULER_TABLE_H */ diff --git a/src/common/state/redis.c b/src/common/state/redis.c index fa8e6c79e..d9043d544 100644 --- a/src/common/state/redis.c +++ b/src/common/state/redis.c @@ -12,6 +12,7 @@ #include "common.h" #include "db.h" #include "db_client_table.h" +#include "local_scheduler_table.h" #include "object_table.h" #include "object_info.h" #include "task.h" @@ -1081,6 +1082,82 @@ void redis_db_client_table_subscribe(table_callback_data *callback_data) { } } +void redis_local_scheduler_table_subscribe_callback(redisAsyncContext *c, + void *r, + void *privdata) { + REDIS_CALLBACK_HEADER(db, callback_data, r); + + redisReply *reply = r; + CHECK(reply->type == REDIS_REPLY_ARRAY); + CHECK(reply->elements == 3); + redisReply *message_type = reply->element[0]; + LOG_DEBUG("Local scheduer table subscribe callback, message %s", + message_type->str); + + if (strcmp(message_type->str, "message") == 0) { + /* Handle a local scheduler heartbeat. Parse the payload and call the + * subscribe callback. */ + redisReply *payload = reply->element[2]; + local_scheduler_table_subscribe_data *data = callback_data->data; + db_client_id client_id; + local_scheduler_info info; + /* The payload should be the concatenation of these two structs. */ + CHECK(sizeof(client_id) + sizeof(info) == payload->len); + memcpy(&client_id, payload->str, sizeof(client_id)); + memcpy(&info, payload->str + sizeof(client_id), sizeof(info)); + if (data->subscribe_callback) { + data->subscribe_callback(client_id, info, data->subscribe_context); + } + } else if (strcmp(message_type->str, "subscribe") == 0) { + /* The reply for the initial SUBSCRIBE command. */ + CHECK(callback_data->done_callback == NULL); + /* If the initial SUBSCRIBE was successful, clean up the timer, but don't + * destroy the callback data. 
*/ + event_loop_remove_timer(db->loop, callback_data->timer_id); + + } else { + LOG_FATAL("Unexpected reply type from local scheduler subscribe."); + } +} + +void redis_local_scheduler_table_subscribe(table_callback_data *callback_data) { + db_handle *db = callback_data->db_handle; + int status = redisAsyncCommand( + db->sub_context, redis_local_scheduler_table_subscribe_callback, + (void *) callback_data->timer_id, "SUBSCRIBE local_schedulers"); + if ((status == REDIS_ERR) || db->sub_context->err) { + LOG_REDIS_DEBUG(db->sub_context, + "error in redis_local_scheduler_table_subscribe"); + } +} + +void redis_local_scheduler_table_send_info_callback(redisAsyncContext *c, + void *r, + void *privdata) { + REDIS_CALLBACK_HEADER(db, callback_data, r); + + redisReply *reply = r; + CHECK(reply->type == REDIS_REPLY_INTEGER); + LOG_DEBUG("%" PRId64 " subscribers received this publish.\n", reply->integer); + + CHECK(callback_data->done_callback == NULL); + /* Clean up the timer and callback. */ + destroy_timer_callback(db->loop, callback_data); +} + +void redis_local_scheduler_table_send_info(table_callback_data *callback_data) { + db_handle *db = callback_data->db_handle; + local_scheduler_table_send_info_data *data = callback_data->data; + int status = redisAsyncCommand( + db->context, redis_local_scheduler_table_send_info_callback, + (void *) callback_data->timer_id, "PUBLISH local_schedulers %b%b", + db->client.id, sizeof(db->client.id), &data->info, sizeof(data->info)); + if ((status == REDIS_ERR) || db->context->err) { + LOG_REDIS_DEBUG(db->context, + "error in redis_local_scheduler_table_send_info"); + } +} + void redis_object_info_subscribe_callback(redisAsyncContext *c, void *r, void *privdata) { diff --git a/src/common/state/redis.h b/src/common/state/redis.h index 6c9c3d7cd..73061dcf6 100644 --- a/src/common/state/redis.h +++ b/src/common/state/redis.h @@ -218,6 +218,24 @@ void redis_task_table_subscribe(table_callback_data *callback_data); */ void 
redis_db_client_table_subscribe(table_callback_data *callback_data); +/** + * Subscribe to updates from the local scheduler table. + * + * @param callback_data Data structure containing redis connection and timeout + * information. + * @return Void. + */ +void redis_local_scheduler_table_subscribe(table_callback_data *callback_data); + +/** + * Publish an update to the local scheduler table. + * + * @param callback_data Data structure containing redis connection and timeout + * information. + * @return Void. + */ +void redis_local_scheduler_table_send_info(table_callback_data *callback_data); + void redis_object_info_subscribe(table_callback_data *callback_data); #endif /* REDIS_H */ diff --git a/src/global_scheduler/global_scheduler.c b/src/global_scheduler/global_scheduler.c index 18cddab4a..a8c9ad829 100644 --- a/src/global_scheduler/global_scheduler.c +++ b/src/global_scheduler/global_scheduler.c @@ -9,6 +9,7 @@ #include "net.h" #include "object_info.h" #include "state/db_client_table.h" +#include "state/local_scheduler_table.h" #include "state/object_table.h" #include "state/table.h" #include "state/task_table.h" @@ -192,6 +193,21 @@ void object_table_subscribe_callback(object_id object_id, } } +void local_scheduler_table_handler(db_client_id client_id, + local_scheduler_info info, + void *user_context) { + /* Extract global scheduler state from the callback context. 
*/ + global_scheduler_state *state = (global_scheduler_state *) user_context; + UNUSED(state); + char id_string[ID_STRING_SIZE]; + LOG_DEBUG( + "Local scheduler heartbeat from db_client_id %s", + object_id_to_string((object_id) client_id, id_string, ID_STRING_SIZE)); + UNUSED(id_string); + LOG_DEBUG("Task queue length is %d", info.task_queue_length); + LOG_DEBUG("Num available workers is %d", info.available_workers); +} + void start_server(const char *redis_addr, int redis_port) { event_loop *loop = event_loop_create(); g_state = init_global_scheduler(loop, redis_addr, redis_port); @@ -214,6 +230,11 @@ void start_server(const char *redis_addr, int redis_port) { object_table_subscribe_to_notifications(g_state->db, true, object_table_subscribe_callback, g_state, &retry, NULL, NULL); + /* Subscribe to notifications from local schedulers. These notifications serve + * as heartbeats and contain information about the load on the local + * schedulers. */ + local_scheduler_table_subscribe(g_state->db, local_scheduler_table_handler, + g_state, NULL); /* Start the event loop. */ event_loop_run(loop); } diff --git a/src/photon/photon_algorithm.c b/src/photon/photon_algorithm.c index 22ad92abd..cc861bdf1 100644 --- a/src/photon/photon_algorithm.c +++ b/src/photon/photon_algorithm.c @@ -5,6 +5,7 @@ #include "utlist.h" #include "state/task_table.h" +#include "state/local_scheduler_table.h" #include "state/object_table.h" #include "photon.h" #include "photon_scheduler.h" @@ -90,6 +91,14 @@ void free_scheduling_algorithm_state( free(algorithm_state); } +void provide_scheduler_info(local_scheduler_state *state, + scheduling_algorithm_state *algorithm_state, + local_scheduler_info *info) { + task_queue_entry *elt; + DL_COUNT(algorithm_state->task_queue, elt, info->task_queue_length); + info->available_workers = utarray_len(algorithm_state->available_workers); +} + /** * Check if all of the remote object arguments for a task are available in the * local object store. 
diff --git a/src/photon/photon_algorithm.h b/src/photon/photon_algorithm.h index 905e95ad4..49f2ef981 100644 --- a/src/photon/photon_algorithm.h +++ b/src/photon/photon_algorithm.h @@ -3,6 +3,7 @@ #include "photon.h" #include "common/task.h" +#include "state/local_scheduler_table.h" /* The duration that the local scheduler will wait before reinitiating a fetch * request for a missing task dependency. TODO(rkn): We may want this to be @@ -33,6 +34,13 @@ scheduling_algorithm_state *make_scheduling_algorithm_state(void); void free_scheduling_algorithm_state( scheduling_algorithm_state *algorithm_state); +/** + * Fill in info with the task queue length and number of available workers. + */ +void provide_scheduler_info(local_scheduler_state *state, + scheduling_algorithm_state *algorithm_state, + local_scheduler_info *info); + /** * This function will be called when a new task is submitted by a worker for * execution. diff --git a/src/photon/photon_scheduler.c b/src/photon/photon_scheduler.c index 06d8f0007..66417c3b5 100644 --- a/src/photon/photon_scheduler.c +++ b/src/photon/photon_scheduler.c @@ -291,6 +291,18 @@ void handle_task_scheduled_callback(task *original_task, void *user_context) { task_task_spec(original_task)); } +int heartbeat_handler(event_loop *loop, timer_id id, void *context) { + local_scheduler_state *state = context; + scheduling_algorithm_state *algorithm_state = state->algorithm_state; + local_scheduler_info info; + /* Ask the scheduling algorithm to fill out the scheduler info struct. */ + provide_scheduler_info(state, algorithm_state, &info); + /* Publish the heartbeat to all subscribers of the local scheduler table. */ + local_scheduler_table_send_info(state->db, &info, NULL); + /* Reset the timer. 
*/ + return LOCAL_SCHEDULER_HEARTBEAT_TIMEOUT_MILLISECONDS; +} + void start_server(const char *node_ip_address, const char *socket_name, const char *redis_addr, @@ -323,6 +335,13 @@ void start_server(const char *node_ip_address, TASK_STATUS_SCHEDULED, handle_task_scheduled_callback, NULL, &retry, NULL, NULL); } + /* Create a timer for publishing information about the load on the local + * scheduler to the local scheduler table. This message also serves as a + * heartbeat. */ + if (g_state->db != NULL) { + event_loop_add_timer(loop, LOCAL_SCHEDULER_HEARTBEAT_TIMEOUT_MILLISECONDS, + heartbeat_handler, g_state); + } /* Run event loop. */ event_loop_run(loop); } diff --git a/src/photon/photon_scheduler.h b/src/photon/photon_scheduler.h index b8417c443..f178d2a38 100644 --- a/src/photon/photon_scheduler.h +++ b/src/photon/photon_scheduler.h @@ -4,6 +4,9 @@ #include "task.h" #include "event_loop.h" +/* The duration between local scheduler heartbeats. */ +#define LOCAL_SCHEDULER_HEARTBEAT_TIMEOUT_MILLISECONDS 1000 + /** * Establish a connection to a new client. *