Refactor local scheduler to remove worker indices. (#245)

* Refactor local scheduler to remove worker indices.

* Change scheduling state enum to int in all function signatures.

* Bug fix, don't use pointers into a resizable array.

* Remove total_num_workers.

* Fix tests.
This commit is contained in:
Robert Nishihara 2017-02-05 14:52:28 -08:00 committed by Philipp Moritz
parent ca254b8689
commit 2d1c980ad7
12 changed files with 135 additions and 120 deletions

View file

@ -785,7 +785,7 @@ void redis_task_table_add_task(table_callback_data *callback_data) {
task *task = callback_data->data; task *task = callback_data->data;
task_id task_id = task_task_id(task); task_id task_id = task_task_id(task);
db_client_id local_scheduler_id = task_local_scheduler(task); db_client_id local_scheduler_id = task_local_scheduler(task);
scheduling_state state = task_state(task); int state = task_state(task);
task_spec *spec = task_task_spec(task); task_spec *spec = task_task_spec(task);
CHECKM(task != NULL, "NULL task passed to redis_task_table_add_task."); CHECKM(task != NULL, "NULL task passed to redis_task_table_add_task.");
@ -821,7 +821,7 @@ void redis_task_table_update(table_callback_data *callback_data) {
task *task = callback_data->data; task *task = callback_data->data;
task_id task_id = task_task_id(task); task_id task_id = task_task_id(task);
db_client_id local_scheduler_id = task_local_scheduler(task); db_client_id local_scheduler_id = task_local_scheduler(task);
scheduling_state state = task_state(task); int state = task_state(task);
CHECKM(task != NULL, "NULL task passed to redis_task_table_update."); CHECKM(task != NULL, "NULL task passed to redis_task_table_update.");
int status = redisAsyncCommand( int status = redisAsyncCommand(

View file

@ -32,8 +32,8 @@ void task_table_update(db_handle *db_handle,
void task_table_test_and_update(db_handle *db_handle, void task_table_test_and_update(db_handle *db_handle,
task_id task_id, task_id task_id,
scheduling_state test_state, int test_state,
scheduling_state update_state, int update_state,
retry_info *retry, retry_info *retry,
task_table_get_callback done_callback, task_table_get_callback done_callback,
void *user_context) { void *user_context) {
@ -51,7 +51,7 @@ void task_table_test_and_update(db_handle *db_handle,
/* TODO(swang): A corresponding task_table_unsubscribe. */ /* TODO(swang): A corresponding task_table_unsubscribe. */
void task_table_subscribe(db_handle *db_handle, void task_table_subscribe(db_handle *db_handle,
db_client_id local_scheduler_id, db_client_id local_scheduler_id,
scheduling_state state_filter, int state_filter,
task_table_subscribe_callback subscribe_callback, task_table_subscribe_callback subscribe_callback,
void *subscribe_context, void *subscribe_context,
retry_info *retry, retry_info *retry,

View file

@ -108,16 +108,16 @@ void task_table_update(db_handle *db_handle,
*/ */
void task_table_test_and_update(db_handle *db_handle, void task_table_test_and_update(db_handle *db_handle,
task_id task_id, task_id task_id,
scheduling_state test_state, int test_state,
scheduling_state update_state, int update_state,
retry_info *retry, retry_info *retry,
task_table_get_callback done_callback, task_table_get_callback done_callback,
void *user_context); void *user_context);
/* Data that is needed to test and set the task's scheduling state. */ /* Data that is needed to test and set the task's scheduling state. */
typedef struct { typedef struct {
scheduling_state test_state; int test_state;
scheduling_state update_state; int update_state;
db_client_id local_scheduler_id; db_client_id local_scheduler_id;
} task_table_test_and_update_data; } task_table_test_and_update_data;
@ -153,7 +153,7 @@ typedef void (*task_table_subscribe_callback)(task *task, void *user_context);
*/ */
void task_table_subscribe(db_handle *db_handle, void task_table_subscribe(db_handle *db_handle,
db_client_id local_scheduler_id, db_client_id local_scheduler_id,
scheduling_state state_filter, int state_filter,
task_table_subscribe_callback subscribe_callback, task_table_subscribe_callback subscribe_callback,
void *subscribe_context, void *subscribe_context,
retry_info *retry, retry_info *retry,
@ -164,7 +164,7 @@ void task_table_subscribe(db_handle *db_handle,
* database. */ * database. */
typedef struct { typedef struct {
db_client_id local_scheduler_id; db_client_id local_scheduler_id;
scheduling_state state_filter; int state_filter;
task_table_subscribe_callback subscribe_callback; task_table_subscribe_callback subscribe_callback;
void *subscribe_context; void *subscribe_context;
} task_table_subscribe_data; } task_table_subscribe_data;

View file

@ -299,14 +299,15 @@ void print_task(task_spec *spec, UT_string *output) {
/* TASK INSTANCES */ /* TASK INSTANCES */
struct task_impl { struct task_impl {
scheduling_state state; /** The scheduling state of the task. */
int state;
/** The ID of the local scheduler involved. */
db_client_id local_scheduler_id; db_client_id local_scheduler_id;
/** The task specification for this task. */
task_spec spec; task_spec spec;
}; };
task *alloc_task(task_spec *spec, task *alloc_task(task_spec *spec, int state, db_client_id local_scheduler_id) {
scheduling_state state,
db_client_id local_scheduler_id) {
int64_t size = sizeof(task) - sizeof(task_spec) + task_spec_size(spec); int64_t size = sizeof(task) - sizeof(task_spec) + task_spec_size(spec);
task *result = malloc(size); task *result = malloc(size);
memset(result, 0, size); memset(result, 0, size);
@ -328,11 +329,11 @@ int64_t task_size(task *task_arg) {
return sizeof(task) - sizeof(task_spec) + task_spec_size(&task_arg->spec); return sizeof(task) - sizeof(task_spec) + task_spec_size(&task_arg->spec);
} }
scheduling_state task_state(task *task) { int task_state(task *task) {
return task->state; return task->state;
} }
void task_set_state(task *task, scheduling_state state) { void task_set_state(task *task, int state) {
task->state = state; task->state = state;
} }

View file

@ -291,9 +291,7 @@ typedef struct task_impl task;
* @param local_scheduler_id The ID of the local scheduler that the task is * @param local_scheduler_id The ID of the local scheduler that the task is
* scheduled on, if any. * scheduled on, if any.
*/ */
task *alloc_task(task_spec *spec, task *alloc_task(task_spec *spec, int state, db_client_id local_scheduler_id);
scheduling_state state,
db_client_id local_scheduler_id);
/** /**
* Create a copy of the task. Must be freed with free_task after use. * Create a copy of the task. Must be freed with free_task after use.
@ -307,10 +305,10 @@ task *copy_task(task *other);
int64_t task_size(task *task); int64_t task_size(task *task);
/** The scheduling state of the task. */ /** The scheduling state of the task. */
scheduling_state task_state(task *task); int task_state(task *task);
/** Update the schedule state of the task. */ /** Update the schedule state of the task. */
void task_set_state(task *task, scheduling_state state); void task_set_state(task *task, int state);
/** Local scheduler this task has been assigned to or is running on. */ /** Local scheduler this task has been assigned to or is running on. */
db_client_id task_local_scheduler(task *task); db_client_id task_local_scheduler(task *task);

View file

@ -44,7 +44,7 @@ static inline task_spec *example_task_spec(int64_t num_args,
static inline task *example_task_with_args(int64_t num_args, static inline task *example_task_with_args(int64_t num_args,
int64_t num_returns, int64_t num_returns,
scheduling_state task_state, int task_state,
object_id arg_ids[]) { object_id arg_ids[]) {
task_spec *spec = example_task_spec_with_args(num_args, num_returns, arg_ids); task_spec *spec = example_task_spec_with_args(num_args, num_returns, arg_ids);
task *instance = alloc_task(spec, task_state, NIL_ID); task *instance = alloc_task(spec, task_state, NIL_ID);
@ -54,7 +54,7 @@ static inline task *example_task_with_args(int64_t num_args,
static inline task *example_task(int64_t num_args, static inline task *example_task(int64_t num_args,
int64_t num_returns, int64_t num_returns,
scheduling_state task_state) { int task_state) {
task_spec *spec = example_task_spec(num_args, num_returns); task_spec *spec = example_task_spec(num_args, num_returns);
task *instance = alloc_task(spec, task_state, NIL_ID); task *instance = alloc_task(spec, task_state, NIL_ID);
free_task_spec(spec); free_task_spec(spec);

View file

@ -42,16 +42,6 @@ typedef struct {
UT_icd task_ptr_icd; UT_icd task_ptr_icd;
UT_icd worker_icd; UT_icd worker_icd;
/** Association between the socket fd of a worker and its worker_index. */
typedef struct {
/** The socket fd of a worker. */
int sock;
/** The index of the worker in scheduler_info->workers. */
int64_t worker_index;
/** Handle for the hash table. */
UT_hash_handle hh;
} worker_index;
/** Internal state of the scheduling algorithm. */ /** Internal state of the scheduling algorithm. */
typedef struct scheduling_algorithm_state scheduling_algorithm_state; typedef struct scheduling_algorithm_state scheduling_algorithm_state;
@ -71,11 +61,9 @@ typedef struct {
local_scheduler_config config; local_scheduler_config config;
/** The local scheduler event loop. */ /** The local scheduler event loop. */
event_loop *loop; event_loop *loop;
/** Association between client socket and worker index. */ /** List of workers available to this node. This is used to free the worker
worker_index *worker_index; * structs when we free the scheduler state and also to access the worker
/** List of workers available to this node. The index into this array * structs in the tests. */
* is the worker_index and is used to identify workers throughout
* the program. */
UT_array *workers; UT_array *workers;
/** The handle to the database. */ /** The handle to the database. */
db_handle *db; db_handle *db;
@ -88,4 +76,16 @@ typedef struct {
UT_array *input_buffer; UT_array *input_buffer;
} local_scheduler_state; } local_scheduler_state;
/** Contains all information associated with a local scheduler client. */
typedef struct {
/** The socket used to communicate with the client. */
int sock;
/** A pointer to the task object that is currently running on this client. If
* no task is running on the worker, this will be NULL. This is used to
* update the task table. */
task *task_in_progress;
/** A pointer to the local scheduler state. */
local_scheduler_state *local_scheduler_state;
} local_scheduler_client;
#endif /* PHOTON_H */ #endif /* PHOTON_H */

View file

@ -33,6 +33,9 @@ typedef struct {
UT_icd task_queue_entry_icd = {sizeof(task_queue_entry *), NULL, NULL, NULL}; UT_icd task_queue_entry_icd = {sizeof(task_queue_entry *), NULL, NULL, NULL};
/** This is used to define the queue of available workers. */
UT_icd worker_icd = {sizeof(local_scheduler_client *), NULL, NULL, NULL};
/** Part of the photon state that is maintained by the scheduling algorithm. */ /** Part of the photon state that is maintained by the scheduling algorithm. */
struct scheduling_algorithm_state { struct scheduling_algorithm_state {
/** An array of pointers to tasks that are waiting for dependencies. */ /** An array of pointers to tasks that are waiting for dependencies. */
@ -64,35 +67,42 @@ scheduling_algorithm_state *make_scheduling_algorithm_state(void) {
/* Initialize the local data structures used for queuing tasks and workers. */ /* Initialize the local data structures used for queuing tasks and workers. */
algorithm_state->waiting_task_queue = NULL; algorithm_state->waiting_task_queue = NULL;
algorithm_state->dispatch_task_queue = NULL; algorithm_state->dispatch_task_queue = NULL;
utarray_new(algorithm_state->available_workers, &ut_int_icd); utarray_new(algorithm_state->available_workers, &worker_icd);
return algorithm_state; return algorithm_state;
} }
void free_scheduling_algorithm_state( void free_scheduling_algorithm_state(
scheduling_algorithm_state *algorithm_state) { scheduling_algorithm_state *algorithm_state) {
/* Free all of the tasks in the waiting queue. */
task_queue_entry *elt, *tmp1; task_queue_entry *elt, *tmp1;
DL_FOREACH_SAFE(algorithm_state->waiting_task_queue, elt, tmp1) { DL_FOREACH_SAFE(algorithm_state->waiting_task_queue, elt, tmp1) {
DL_DELETE(algorithm_state->waiting_task_queue, elt); DL_DELETE(algorithm_state->waiting_task_queue, elt);
free_task_spec(elt->spec); free_task_spec(elt->spec);
free(elt); free(elt);
} }
/* Free all the tasks in the dispatch queue. */
DL_FOREACH_SAFE(algorithm_state->dispatch_task_queue, elt, tmp1) { DL_FOREACH_SAFE(algorithm_state->dispatch_task_queue, elt, tmp1) {
DL_DELETE(algorithm_state->dispatch_task_queue, elt); DL_DELETE(algorithm_state->dispatch_task_queue, elt);
free_task_spec(elt->spec); free_task_spec(elt->spec);
free(elt); free(elt);
} }
/* Free the list of available workers. */
utarray_free(algorithm_state->available_workers); utarray_free(algorithm_state->available_workers);
/* Free the cached information about which objects are present locally. */
object_entry *obj_entry, *tmp_obj_entry; object_entry *obj_entry, *tmp_obj_entry;
HASH_ITER(hh, algorithm_state->local_objects, obj_entry, tmp_obj_entry) { HASH_ITER(hh, algorithm_state->local_objects, obj_entry, tmp_obj_entry) {
HASH_DELETE(hh, algorithm_state->local_objects, obj_entry); HASH_DELETE(hh, algorithm_state->local_objects, obj_entry);
CHECK(obj_entry->dependent_tasks == NULL); CHECK(obj_entry->dependent_tasks == NULL);
free(obj_entry); free(obj_entry);
} }
/* Free the cached information about which objects are currently being
* fetched. */
HASH_ITER(hh, algorithm_state->remote_objects, obj_entry, tmp_obj_entry) { HASH_ITER(hh, algorithm_state->remote_objects, obj_entry, tmp_obj_entry) {
HASH_DELETE(hh, algorithm_state->remote_objects, obj_entry); HASH_DELETE(hh, algorithm_state->remote_objects, obj_entry);
utarray_free(obj_entry->dependent_tasks); utarray_free(obj_entry->dependent_tasks);
free(obj_entry); free(obj_entry);
} }
/* Free the algorithm state. */
free(algorithm_state); free(algorithm_state);
} }
@ -259,10 +269,10 @@ void dispatch_tasks(local_scheduler_state *state,
DL_DELETE(algorithm_state->dispatch_task_queue, dispatched_task); DL_DELETE(algorithm_state->dispatch_task_queue, dispatched_task);
/* Get the last available worker in the available worker queue. */ /* Get the last available worker in the available worker queue. */
int *worker_index = local_scheduler_client **worker = (local_scheduler_client **) utarray_back(
(int *) utarray_back(algorithm_state->available_workers); algorithm_state->available_workers);
/* Tell the available worker to execute the task. */ /* Tell the available worker to execute the task. */
assign_task_to_worker(state, dispatched_task->spec, *worker_index); assign_task_to_worker(state, dispatched_task->spec, *worker);
/* Remove the available worker from the queue and free the struct. */ /* Remove the available worker from the queue and free the struct. */
utarray_pop_back(algorithm_state->available_workers); utarray_pop_back(algorithm_state->available_workers);
free_task_spec(dispatched_task->spec); free_task_spec(dispatched_task->spec);
@ -458,18 +468,16 @@ void handle_task_scheduled(local_scheduler_state *state,
void handle_worker_available(local_scheduler_state *state, void handle_worker_available(local_scheduler_state *state,
scheduling_algorithm_state *algorithm_state, scheduling_algorithm_state *algorithm_state,
int worker_index) { local_scheduler_client *worker) {
worker *available_worker = CHECK(worker->task_in_progress == NULL);
(worker *) utarray_eltptr(state->workers, worker_index); for (local_scheduler_client **p = (local_scheduler_client **) utarray_front(
CHECK(available_worker->task_in_progress == NULL); algorithm_state->available_workers);
for (int *p = (int *) utarray_front(algorithm_state->available_workers); p != NULL; p = (local_scheduler_client **) utarray_next(
p != NULL; algorithm_state->available_workers, p)) {
p = (int *) utarray_next(algorithm_state->available_workers, p)) { DCHECK(*p != worker);
DCHECK(*p != worker_index);
} }
/* Add worker to the list of available workers. */ /* Add worker to the list of available workers. */
utarray_push_back(algorithm_state->available_workers, &worker_index); utarray_push_back(algorithm_state->available_workers, &worker);
LOG_DEBUG("Adding worker_index %d to available workers", worker_index);
/* Try to dispatch tasks, since we now have available workers to assign them /* Try to dispatch tasks, since we now have available workers to assign them
* to. */ * to. */

View file

@ -101,12 +101,12 @@ void handle_object_removed(local_scheduler_state *state, object_id object_id);
* *
* @param state The state of the local scheduler. * @param state The state of the local scheduler.
* @param algorithm_state State maintained by the scheduling algorithm. * @param algorithm_state State maintained by the scheduling algorithm.
* @param worker_index The index of the worker that becomes available. * @param worker The worker that is available.
* @return Void. * @return Void.
*/ */
void handle_worker_available(local_scheduler_state *state, void handle_worker_available(local_scheduler_state *state,
scheduling_algorithm_state *algorithm_state, scheduling_algorithm_state *algorithm_state,
int worker_index); local_scheduler_client *worker);
/** /**
* This function fetches queued task's missing object dependencies. It is * This function fetches queued task's missing object dependencies. It is

View file

@ -21,7 +21,7 @@
#include "uthash.h" #include "uthash.h"
UT_icd task_ptr_icd = {sizeof(task *), NULL, NULL, NULL}; UT_icd task_ptr_icd = {sizeof(task *), NULL, NULL, NULL};
UT_icd worker_icd = {sizeof(worker), NULL, NULL, NULL}; UT_icd workers_icd = {sizeof(local_scheduler_client *), NULL, NULL, NULL};
UT_icd byte_icd = {sizeof(uint8_t), NULL, NULL, NULL}; UT_icd byte_icd = {sizeof(uint8_t), NULL, NULL, NULL};
@ -46,9 +46,8 @@ local_scheduler_state *init_local_scheduler(
state->config.global_scheduler_exists = global_scheduler_exists; state->config.global_scheduler_exists = global_scheduler_exists;
state->loop = loop; state->loop = loop;
state->worker_index = NULL; /* Initialize the list of workers. */
/* Add scheduler info. */ utarray_new(state->workers, &workers_icd);
utarray_new(state->workers, &worker_icd);
/* Connect to Redis if a Redis address is provided. */ /* Connect to Redis if a Redis address is provided. */
if (redis_addr != NULL) { if (redis_addr != NULL) {
int num_args; int num_args;
@ -84,48 +83,60 @@ local_scheduler_state *init_local_scheduler(
process_plasma_notification, state); process_plasma_notification, state);
/* Add scheduler state. */ /* Add scheduler state. */
state->algorithm_state = make_scheduling_algorithm_state(); state->algorithm_state = make_scheduling_algorithm_state();
/* Add the input buffer. This is used to read in messages from clients without
* having to reallocate a new buffer every time. */
utarray_new(state->input_buffer, &byte_icd); utarray_new(state->input_buffer, &byte_icd);
return state; return state;
}; };
void free_local_scheduler(local_scheduler_state *state) { void free_local_scheduler(local_scheduler_state *state) {
/* Free the command for starting new workers. */
if (state->config.start_worker_command != NULL) { if (state->config.start_worker_command != NULL) {
free(state->config.start_worker_command); free(state->config.start_worker_command);
state->config.start_worker_command = NULL; state->config.start_worker_command = NULL;
} }
/* Disconnect from the database. */
if (state->db != NULL) { if (state->db != NULL) {
db_disconnect(state->db); db_disconnect(state->db);
state->db = NULL;
} }
/* Disconnect from plasma. */
plasma_disconnect(state->plasma_conn); plasma_disconnect(state->plasma_conn);
state->plasma_conn = NULL;
worker_index *current_worker_index, *temp_worker_index; /* Free the list of workers and any tasks that are still in progress on those
HASH_ITER(hh, state->worker_index, current_worker_index, temp_worker_index) { * workers. */
HASH_DEL(state->worker_index, current_worker_index); for (int i = 0; i < utarray_len(state->workers); ++i) {
free(current_worker_index); local_scheduler_client **worker =
} (local_scheduler_client **) utarray_eltptr(state->workers, i);
if ((*worker)->task_in_progress != NULL) {
worker *w; free_task((*worker)->task_in_progress);
for (w = (worker *) utarray_front(state->workers); w != NULL; (*worker)->task_in_progress = NULL;
w = (worker *) utarray_next(state->workers, w)) {
if (w->task_in_progress) {
free_task(w->task_in_progress);
} }
free(*worker);
*worker = NULL;
} }
utarray_free(state->workers); utarray_free(state->workers);
state->workers = NULL;
/* Free the algorithm state. */
free_scheduling_algorithm_state(state->algorithm_state); free_scheduling_algorithm_state(state->algorithm_state);
state->algorithm_state = NULL;
/* Free the input buffer. */
utarray_free(state->input_buffer); utarray_free(state->input_buffer);
state->input_buffer = NULL;
/* Destroy the event loop. */
event_loop_destroy(state->loop); event_loop_destroy(state->loop);
state->loop = NULL;
/* Free the scheduler state. */
free(state); free(state);
} }
void assign_task_to_worker(local_scheduler_state *state, void assign_task_to_worker(local_scheduler_state *state,
task_spec *spec, task_spec *spec,
int worker_index) { local_scheduler_client *worker) {
CHECK(worker_index < utarray_len(state->workers)); if (write_message(worker->sock, EXECUTE_TASK, task_spec_size(spec),
worker *w = (worker *) utarray_eltptr(state->workers, worker_index);
if (write_message(w->sock, EXECUTE_TASK, task_spec_size(spec),
(uint8_t *) spec) < 0) { (uint8_t *) spec) < 0) {
if (errno == EPIPE || errno == EBADF) { if (errno == EPIPE || errno == EBADF) {
/* TODO(rkn): If this happens, the task should be added back to the task /* TODO(rkn): If this happens, the task should be added back to the task
@ -133,9 +144,9 @@ void assign_task_to_worker(local_scheduler_state *state,
LOG_WARN( LOG_WARN(
"Failed to give task to worker on fd %d. The client may have hung " "Failed to give task to worker on fd %d. The client may have hung "
"up.", "up.",
w->sock); worker->sock);
} else { } else {
LOG_FATAL("Failed to give task to client on fd %d.", w->sock); LOG_FATAL("Failed to give task to client on fd %d.", worker->sock);
} }
} }
/* Update the global task table. */ /* Update the global task table. */
@ -147,7 +158,7 @@ void assign_task_to_worker(local_scheduler_state *state,
/* Record which task this worker is executing. This will be freed in /* Record which task this worker is executing. This will be freed in
* process_message when the worker sends a GET_TASK message to the local * process_message when the worker sends a GET_TASK message to the local
* scheduler. */ * scheduler. */
w->task_in_progress = copy_task(task); worker->task_in_progress = copy_task(task);
} }
} }
@ -251,7 +262,8 @@ void process_message(event_loop *loop,
int client_sock, int client_sock,
void *context, void *context,
int events) { int events) {
local_scheduler_state *state = context; local_scheduler_client *worker = context;
local_scheduler_state *state = worker->local_scheduler_state;
int64_t type; int64_t type;
int64_t length = read_buffer(client_sock, &type, state->input_buffer); int64_t length = read_buffer(client_sock, &type, state->input_buffer);
@ -290,22 +302,21 @@ void process_message(event_loop *loop,
free(value); free(value);
} break; } break;
case GET_TASK: { case GET_TASK: {
worker_index *wi;
HASH_FIND_INT(state->worker_index, &client_sock, wi);
/* Update the task table with the completed task. */ /* Update the task table with the completed task. */
worker *available_worker = if (state->db != NULL && worker->task_in_progress != NULL) {
(worker *) utarray_eltptr(state->workers, wi->worker_index); task_set_state(worker->task_in_progress, TASK_STATUS_DONE);
if (state->db != NULL && available_worker->task_in_progress != NULL) { task_table_update(state->db, worker->task_in_progress,
task_set_state(available_worker->task_in_progress, TASK_STATUS_DONE);
task_table_update(state->db, available_worker->task_in_progress,
(retry_info *) &photon_retry, NULL, NULL); (retry_info *) &photon_retry, NULL, NULL);
/* The call to task_table_update takes ownership of the task_in_progress, /* The call to task_table_update takes ownership of the task_in_progress,
* so we set the pointer to NULL so it is not used. */ * so we set the pointer to NULL so it is not used. */
available_worker->task_in_progress = NULL; worker->task_in_progress = NULL;
} else if (worker->task_in_progress != NULL) {
free_task(worker->task_in_progress);
worker->task_in_progress = NULL;
} }
/* Let the scheduling algorithm process the fact that there is an available /* Let the scheduling algorithm process the fact that there is an available
* worker. */ * worker. */
handle_worker_available(state, state->algorithm_state, wi->worker_index); handle_worker_available(state, state->algorithm_state, worker);
} break; } break;
case RECONSTRUCT_OBJECT: { case RECONSTRUCT_OBJECT: {
object_id *obj_id = (object_id *) utarray_front(state->input_buffer); object_id *obj_id = (object_id *) utarray_front(state->input_buffer);
@ -329,19 +340,16 @@ void new_client_connection(event_loop *loop,
int events) { int events) {
local_scheduler_state *state = context; local_scheduler_state *state = context;
int new_socket = accept_client(listener_sock); int new_socket = accept_client(listener_sock);
event_loop_add_file(loop, new_socket, EVENT_LOOP_READ, process_message, /* Create a struct for this worker. This will be freed when we free the local
state); * scheduler state. */
LOG_DEBUG("new connection with fd %d", new_socket); local_scheduler_client *worker = malloc(sizeof(local_scheduler_client));
/* Add worker to list of workers. */ worker->sock = new_socket;
/* TODO(pcm): Where shall we free this? */ worker->task_in_progress = NULL;
worker_index *new_worker_index = malloc(sizeof(worker_index)); worker->local_scheduler_state = state;
new_worker_index->sock = new_socket;
new_worker_index->worker_index = utarray_len(state->workers);
HASH_ADD_INT(state->worker_index, sock, new_worker_index);
worker worker;
memset(&worker, 0, sizeof(worker));
worker.sock = new_socket;
utarray_push_back(state->workers, &worker); utarray_push_back(state->workers, &worker);
event_loop_add_file(loop, new_socket, EVENT_LOOP_READ, process_message,
worker);
LOG_DEBUG("new connection with fd %d", new_socket);
} }
/* We need this code so we can clean up when we get a SIGTERM signal. */ /* We need this code so we can clean up when we get a SIGTERM signal. */

View file

@ -28,14 +28,12 @@ void new_client_connection(event_loop *loop,
* *
* @param info * @param info
* @param task The task that is submitted to the worker. * @param task The task that is submitted to the worker.
* @param worker_index The index of the worker the task is submitted to. * @param worker The worker to assign the task to.
* @param from_global_scheduler True if the task was assigned to the local
* scheduler by the global scheduler and false otherwise.
* @return Void. * @return Void.
*/ */
void assign_task_to_worker(local_scheduler_state *state, void assign_task_to_worker(local_scheduler_state *state,
task_spec *task, task_spec *task,
int worker_index); local_scheduler_client *worker);
/** /**
* This is the callback that is used to process a notification from the Plasma * This is the callback that is used to process a notification from the Plasma

View file

@ -86,10 +86,8 @@ void destroy_photon_mock(photon_mock *mock) {
free(mock); free(mock);
} }
void reset_worker(photon_mock *mock, int worker_index) { void reset_worker(photon_mock *mock, local_scheduler_client *worker) {
worker *available_worker = worker->task_in_progress = NULL;
(worker *) utarray_eltptr(mock->photon_state->workers, worker_index);
available_worker->task_in_progress = NULL;
} }
/** /**
@ -326,7 +324,9 @@ TEST task_dependency_test(void) {
photon_mock *photon = init_photon_mock(false); photon_mock *photon = init_photon_mock(false);
local_scheduler_state *state = photon->photon_state; local_scheduler_state *state = photon->photon_state;
scheduling_algorithm_state *algorithm_state = state->algorithm_state; scheduling_algorithm_state *algorithm_state = state->algorithm_state;
int worker_index = 0; /* Get the first worker. */
local_scheduler_client *worker =
*((local_scheduler_client **) utarray_eltptr(state->workers, 0));
task_spec *spec = example_task_spec(1, 1); task_spec *spec = example_task_spec(1, 1);
object_id oid = task_arg_id(spec, 0); object_id oid = task_arg_id(spec, 0);
@ -340,23 +340,23 @@ TEST task_dependency_test(void) {
ASSERT_EQ(num_waiting_tasks(algorithm_state), 0); ASSERT_EQ(num_waiting_tasks(algorithm_state), 0);
ASSERT_EQ(num_dispatch_tasks(algorithm_state), 1); ASSERT_EQ(num_dispatch_tasks(algorithm_state), 1);
/* Once a worker is available, the task gets assigned. */ /* Once a worker is available, the task gets assigned. */
handle_worker_available(state, algorithm_state, worker_index); handle_worker_available(state, algorithm_state, worker);
ASSERT_EQ(num_waiting_tasks(algorithm_state), 0); ASSERT_EQ(num_waiting_tasks(algorithm_state), 0);
ASSERT_EQ(num_dispatch_tasks(algorithm_state), 0); ASSERT_EQ(num_dispatch_tasks(algorithm_state), 0);
reset_worker(photon, worker_index); reset_worker(photon, worker);
/* Check that the task gets queued in the waiting queue if the task is /* Check that the task gets queued in the waiting queue if the task is
* submitted and a worker is available, but the input is not. */ * submitted and a worker is available, but the input is not. */
handle_object_removed(state, oid); handle_object_removed(state, oid);
handle_task_submitted(state, algorithm_state, spec); handle_task_submitted(state, algorithm_state, spec);
handle_worker_available(state, algorithm_state, worker_index); handle_worker_available(state, algorithm_state, worker);
ASSERT_EQ(num_waiting_tasks(algorithm_state), 1); ASSERT_EQ(num_waiting_tasks(algorithm_state), 1);
ASSERT_EQ(num_dispatch_tasks(algorithm_state), 0); ASSERT_EQ(num_dispatch_tasks(algorithm_state), 0);
/* Once the input is available, the task gets assigned. */ /* Once the input is available, the task gets assigned. */
handle_object_available(state, algorithm_state, oid); handle_object_available(state, algorithm_state, oid);
ASSERT_EQ(num_waiting_tasks(algorithm_state), 0); ASSERT_EQ(num_waiting_tasks(algorithm_state), 0);
ASSERT_EQ(num_dispatch_tasks(algorithm_state), 0); ASSERT_EQ(num_dispatch_tasks(algorithm_state), 0);
reset_worker(photon, worker_index); reset_worker(photon, worker);
/* Check that the task gets queued in the dispatch queue if the task is /* Check that the task gets queued in the dispatch queue if the task is
* submitted and the input is available, but no worker is available yet. */ * submitted and the input is available, but no worker is available yet. */
@ -364,10 +364,10 @@ TEST task_dependency_test(void) {
ASSERT_EQ(num_waiting_tasks(algorithm_state), 0); ASSERT_EQ(num_waiting_tasks(algorithm_state), 0);
ASSERT_EQ(num_dispatch_tasks(algorithm_state), 1); ASSERT_EQ(num_dispatch_tasks(algorithm_state), 1);
/* Once a worker is available, the task gets assigned. */ /* Once a worker is available, the task gets assigned. */
handle_worker_available(state, algorithm_state, worker_index); handle_worker_available(state, algorithm_state, worker);
ASSERT_EQ(num_waiting_tasks(algorithm_state), 0); ASSERT_EQ(num_waiting_tasks(algorithm_state), 0);
ASSERT_EQ(num_dispatch_tasks(algorithm_state), 0); ASSERT_EQ(num_dispatch_tasks(algorithm_state), 0);
reset_worker(photon, worker_index); reset_worker(photon, worker);
/* If an object gets removed, check the first scenario again, where the task /* If an object gets removed, check the first scenario again, where the task
* gets queued in the waiting task if the task is submitted and a worker is * gets queued in the waiting task if the task is submitted and a worker is
@ -386,7 +386,7 @@ TEST task_dependency_test(void) {
ASSERT_EQ(num_waiting_tasks(algorithm_state), 0); ASSERT_EQ(num_waiting_tasks(algorithm_state), 0);
ASSERT_EQ(num_dispatch_tasks(algorithm_state), 1); ASSERT_EQ(num_dispatch_tasks(algorithm_state), 1);
/* Once a worker is available, the task gets assigned. */ /* Once a worker is available, the task gets assigned. */
handle_worker_available(state, algorithm_state, worker_index); handle_worker_available(state, algorithm_state, worker);
ASSERT_EQ(num_waiting_tasks(algorithm_state), 0); ASSERT_EQ(num_waiting_tasks(algorithm_state), 0);
ASSERT_EQ(num_dispatch_tasks(algorithm_state), 0); ASSERT_EQ(num_dispatch_tasks(algorithm_state), 0);
@ -399,7 +399,9 @@ TEST task_multi_dependency_test(void) {
photon_mock *photon = init_photon_mock(false); photon_mock *photon = init_photon_mock(false);
local_scheduler_state *state = photon->photon_state; local_scheduler_state *state = photon->photon_state;
scheduling_algorithm_state *algorithm_state = state->algorithm_state; scheduling_algorithm_state *algorithm_state = state->algorithm_state;
int worker_index = 0; /* Get the first worker. */
local_scheduler_client *worker =
*((local_scheduler_client **) utarray_eltptr(state->workers, 0));
task_spec *spec = example_task_spec(2, 1); task_spec *spec = example_task_spec(2, 1);
object_id oid1 = task_arg_id(spec, 0); object_id oid1 = task_arg_id(spec, 0);
object_id oid2 = task_arg_id(spec, 1); object_id oid2 = task_arg_id(spec, 1);
@ -419,10 +421,10 @@ TEST task_multi_dependency_test(void) {
ASSERT_EQ(num_waiting_tasks(algorithm_state), 0); ASSERT_EQ(num_waiting_tasks(algorithm_state), 0);
ASSERT_EQ(num_dispatch_tasks(algorithm_state), 1); ASSERT_EQ(num_dispatch_tasks(algorithm_state), 1);
/* Once a worker is available, the task gets assigned. */ /* Once a worker is available, the task gets assigned. */
handle_worker_available(state, algorithm_state, worker_index); handle_worker_available(state, algorithm_state, worker);
ASSERT_EQ(num_waiting_tasks(algorithm_state), 0); ASSERT_EQ(num_waiting_tasks(algorithm_state), 0);
ASSERT_EQ(num_dispatch_tasks(algorithm_state), 0); ASSERT_EQ(num_dispatch_tasks(algorithm_state), 0);
reset_worker(photon, worker_index); reset_worker(photon, worker);
/* Check that the task gets queued in the dispatch queue if the task is /* Check that the task gets queued in the dispatch queue if the task is
* submitted and the inputs are available, but no worker is available yet. */ * submitted and the inputs are available, but no worker is available yet. */
@ -457,10 +459,10 @@ TEST task_multi_dependency_test(void) {
ASSERT_EQ(num_waiting_tasks(algorithm_state), 0); ASSERT_EQ(num_waiting_tasks(algorithm_state), 0);
ASSERT_EQ(num_dispatch_tasks(algorithm_state), 1); ASSERT_EQ(num_dispatch_tasks(algorithm_state), 1);
/* Once a worker is available, the task gets assigned. */ /* Once a worker is available, the task gets assigned. */
handle_worker_available(state, algorithm_state, worker_index); handle_worker_available(state, algorithm_state, worker);
ASSERT_EQ(num_waiting_tasks(algorithm_state), 0); ASSERT_EQ(num_waiting_tasks(algorithm_state), 0);
ASSERT_EQ(num_dispatch_tasks(algorithm_state), 0); ASSERT_EQ(num_dispatch_tasks(algorithm_state), 0);
reset_worker(photon, worker_index); reset_worker(photon, worker);
free_task_spec(spec); free_task_spec(spec);
destroy_photon_mock(photon); destroy_photon_mock(photon);