2016-10-18 18:27:43 -07:00
|
|
|
#include "photon_algorithm.h"
|
|
|
|
|
|
|
|
#include <stdbool.h>
|
|
|
|
#include "utarray.h"
|
2016-11-04 00:41:20 -07:00
|
|
|
#include "utlist.h"
|
2016-10-18 18:27:43 -07:00
|
|
|
|
2016-11-10 18:13:26 -08:00
|
|
|
#include "state/task_table.h"
|
2016-10-18 18:27:43 -07:00
|
|
|
#include "photon.h"
|
|
|
|
#include "photon_scheduler.h"
|
|
|
|
|
2016-11-15 20:33:29 -08:00
|
|
|
/* TODO(swang): We should set retry values in a config file somewhere. */
|
2016-11-19 12:19:49 -08:00
|
|
|
/* Retry policy passed to task_table_add_task when this local scheduler
 * forwards a task to the global scheduler.
 * NOTE(review): the positional initializer presumably means
 * {num_retries = 0, timeout = 1000 ms, fail_callback = NULL}; confirm against
 * the retry_info definition. */
const retry_info photon_retry = {0, 1000, NULL};
|
2016-11-15 20:33:29 -08:00
|
|
|
|
2016-11-04 00:41:20 -07:00
|
|
|
typedef struct task_queue_entry {
|
2016-11-18 19:57:51 -08:00
|
|
|
/** The task that is queued. */
|
|
|
|
task_spec *spec;
|
|
|
|
/** True if this task was assigned to this local scheduler by the global
|
|
|
|
* scheduler and false otherwise. */
|
|
|
|
bool from_global_scheduler;
|
2016-11-04 00:41:20 -07:00
|
|
|
struct task_queue_entry *prev;
|
|
|
|
struct task_queue_entry *next;
|
|
|
|
} task_queue_entry;
|
|
|
|
|
2016-10-18 18:27:43 -07:00
|
|
|
typedef struct {
|
2016-11-18 19:57:51 -08:00
|
|
|
/** Object id of this object. */
|
2016-10-18 18:27:43 -07:00
|
|
|
object_id object_id;
|
2016-11-18 19:57:51 -08:00
|
|
|
/** Handle for the uthash table. */
|
2016-10-18 18:27:43 -07:00
|
|
|
UT_hash_handle handle;
|
|
|
|
} available_object;
|
|
|
|
|
2016-12-06 15:47:31 -08:00
|
|
|
/** A data structure used to track which objects are being fetched. */
|
|
|
|
typedef struct {
|
|
|
|
/** The object ID that we are trying to fetch. */
|
|
|
|
object_id object_id;
|
|
|
|
/** The local scheduler state. */
|
|
|
|
local_scheduler_state *state;
|
|
|
|
/** The scheduling algorithm state. */
|
|
|
|
scheduling_algorithm_state *algorithm_state;
|
|
|
|
/** The ID for the timer that will time out the current request. */
|
|
|
|
int64_t timer;
|
|
|
|
/** Handle for the uthash table. */
|
|
|
|
UT_hash_handle hh;
|
|
|
|
} fetch_object_request;
|
|
|
|
|
2016-10-18 18:27:43 -07:00
|
|
|
/** Part of the photon state that is maintained by the scheduling algorithm. */
|
2016-12-04 15:51:03 -08:00
|
|
|
struct scheduling_algorithm_state {
|
2016-10-18 18:27:43 -07:00
|
|
|
/** An array of pointers to tasks that are waiting to be scheduled. */
|
2016-11-04 00:41:20 -07:00
|
|
|
task_queue_entry *task_queue;
|
2016-10-18 18:27:43 -07:00
|
|
|
/** An array of worker indices corresponding to clients that are
|
|
|
|
* waiting for tasks. */
|
|
|
|
UT_array *available_workers;
|
|
|
|
/** A hash map of the objects that are available in the local Plasma store.
|
|
|
|
* This information could be a little stale. */
|
|
|
|
available_object *local_objects;
|
2016-12-06 15:47:31 -08:00
|
|
|
/** A hash map of the objects that are currently being fetched by this local
|
|
|
|
* scheduler. The key is the object ID. */
|
|
|
|
fetch_object_request *fetch_requests;
|
2016-10-18 18:27:43 -07:00
|
|
|
};
|
|
|
|
|
2016-12-04 15:51:03 -08:00
|
|
|
scheduling_algorithm_state *make_scheduling_algorithm_state(void) {
|
|
|
|
scheduling_algorithm_state *algorithm_state =
|
|
|
|
malloc(sizeof(scheduling_algorithm_state));
|
2016-10-18 18:27:43 -07:00
|
|
|
/* Initialize an empty hash map for the cache of local available objects. */
|
2016-12-04 15:51:03 -08:00
|
|
|
algorithm_state->local_objects = NULL;
|
2016-10-18 18:27:43 -07:00
|
|
|
/* Initialize the local data structures used for queuing tasks and workers. */
|
2016-12-04 15:51:03 -08:00
|
|
|
algorithm_state->task_queue = NULL;
|
|
|
|
utarray_new(algorithm_state->available_workers, &ut_int_icd);
|
2016-12-06 15:47:31 -08:00
|
|
|
/* Initialize the hash table of objects being fetched. */
|
|
|
|
algorithm_state->fetch_requests = NULL;
|
2016-12-04 15:51:03 -08:00
|
|
|
return algorithm_state;
|
2016-10-18 18:27:43 -07:00
|
|
|
}
|
|
|
|
|
2016-12-04 15:51:03 -08:00
|
|
|
void free_scheduling_algorithm_state(
|
|
|
|
scheduling_algorithm_state *algorithm_state) {
|
2016-11-04 00:41:20 -07:00
|
|
|
task_queue_entry *elt, *tmp1;
|
2016-12-04 15:51:03 -08:00
|
|
|
DL_FOREACH_SAFE(algorithm_state->task_queue, elt, tmp1) {
|
|
|
|
DL_DELETE(algorithm_state->task_queue, elt);
|
2016-11-18 19:57:51 -08:00
|
|
|
free_task_spec(elt->spec);
|
2016-11-04 00:41:20 -07:00
|
|
|
free(elt);
|
2016-10-26 23:23:46 -07:00
|
|
|
}
|
2016-12-04 15:51:03 -08:00
|
|
|
utarray_free(algorithm_state->available_workers);
|
2016-11-04 00:41:20 -07:00
|
|
|
available_object *available_obj, *tmp2;
|
2016-12-04 15:51:03 -08:00
|
|
|
HASH_ITER(handle, algorithm_state->local_objects, available_obj, tmp2) {
|
|
|
|
HASH_DELETE(handle, algorithm_state->local_objects, available_obj);
|
2016-10-27 15:09:50 -07:00
|
|
|
free(available_obj);
|
|
|
|
}
|
2016-12-06 15:47:31 -08:00
|
|
|
fetch_object_request *fetch_elt, *tmp_fetch_elt;
|
|
|
|
HASH_ITER(hh, algorithm_state->fetch_requests, fetch_elt, tmp_fetch_elt) {
|
|
|
|
HASH_DELETE(hh, algorithm_state->fetch_requests, fetch_elt);
|
|
|
|
free(fetch_elt);
|
|
|
|
}
|
2016-12-04 15:51:03 -08:00
|
|
|
free(algorithm_state);
|
2016-10-18 18:27:43 -07:00
|
|
|
}
|
|
|
|
|
|
|
|
/**
|
|
|
|
* Check if all of the remote object arguments for a task are available in the
|
|
|
|
* local object store.
|
|
|
|
*
|
|
|
|
* @param s The scheduler state.
|
|
|
|
* @param task Task specification of the task to check.
|
|
|
|
* @return This returns 1 if all of the remote object arguments for the task are
|
|
|
|
* present in the local object store, otherwise it returns 0.
|
|
|
|
*/
|
2016-12-04 15:51:03 -08:00
|
|
|
bool can_run(scheduling_algorithm_state *algorithm_state, task_spec *task) {
|
2016-10-18 18:27:43 -07:00
|
|
|
int64_t num_args = task_num_args(task);
|
|
|
|
for (int i = 0; i < num_args; ++i) {
|
|
|
|
if (task_arg_type(task, i) == ARG_BY_REF) {
|
2016-11-08 14:46:34 -08:00
|
|
|
object_id obj_id = task_arg_id(task, i);
|
2016-10-18 18:27:43 -07:00
|
|
|
available_object *entry;
|
2016-12-04 15:51:03 -08:00
|
|
|
HASH_FIND(handle, algorithm_state->local_objects, &obj_id, sizeof(obj_id),
|
|
|
|
entry);
|
2016-10-18 18:27:43 -07:00
|
|
|
if (entry == NULL) {
|
|
|
|
/* The object is not present locally, so this task cannot be scheduled
|
|
|
|
* right now. */
|
|
|
|
return false;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return true;
|
|
|
|
}
|
|
|
|
|
2016-12-06 15:47:31 -08:00
|
|
|
/* TODO(rkn): This method will need to be changed to call reconstruct. */
|
|
|
|
int fetch_object_timeout_handler(event_loop *loop, timer_id id, void *context) {
|
|
|
|
fetch_object_request *fetch_req = (fetch_object_request *) context;
|
|
|
|
object_id object_ids[1] = {fetch_req->object_id};
|
2016-12-10 21:22:05 -08:00
|
|
|
plasma_fetch(fetch_req->state->plasma_conn, 1, object_ids);
|
2016-12-06 15:47:31 -08:00
|
|
|
return LOCAL_SCHEDULER_FETCH_TIMEOUT_MILLISECONDS;
|
|
|
|
}
|
|
|
|
|
|
|
|
void fetch_missing_dependencies(local_scheduler_state *state,
|
|
|
|
scheduling_algorithm_state *algorithm_state,
|
|
|
|
task_spec *spec) {
|
|
|
|
int64_t num_args = task_num_args(spec);
|
|
|
|
for (int i = 0; i < num_args; ++i) {
|
|
|
|
if (task_arg_type(spec, i) == ARG_BY_REF) {
|
|
|
|
object_id obj_id = task_arg_id(spec, i);
|
|
|
|
available_object *entry;
|
|
|
|
HASH_FIND(handle, algorithm_state->local_objects, &obj_id, sizeof(obj_id),
|
|
|
|
entry);
|
|
|
|
if (entry == NULL) {
|
|
|
|
/* The object is not present locally, fetch the object. */
|
|
|
|
object_id object_ids[1] = {obj_id};
|
2016-12-10 21:22:05 -08:00
|
|
|
plasma_fetch(state->plasma_conn, 1, object_ids);
|
2016-12-06 15:47:31 -08:00
|
|
|
/* Create a fetch request and add a timer to the event loop to ensure
|
|
|
|
* that the fetch actually happens. */
|
|
|
|
fetch_object_request *fetch_req = malloc(sizeof(fetch_object_request));
|
|
|
|
fetch_req->object_id = obj_id;
|
|
|
|
fetch_req->state = state;
|
|
|
|
fetch_req->algorithm_state = algorithm_state;
|
|
|
|
fetch_req->timer = event_loop_add_timer(
|
|
|
|
state->loop, LOCAL_SCHEDULER_FETCH_TIMEOUT_MILLISECONDS,
|
|
|
|
fetch_object_timeout_handler, fetch_req);
|
|
|
|
/* The fetch request will be freed and removed from the hash table in
|
|
|
|
* handle_object_available when the object becomes available locally. */
|
|
|
|
HASH_ADD(hh, algorithm_state->fetch_requests, object_id,
|
|
|
|
sizeof(fetch_req->object_id), fetch_req);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2016-10-18 18:27:43 -07:00
|
|
|
/**
|
|
|
|
* If there is a task whose dependencies are available locally, assign it to the
|
|
|
|
* worker. This does not remove the worker from the available worker queue.
|
|
|
|
*
|
|
|
|
* @param s The scheduler state.
|
|
|
|
* @param worker_index The index of the worker.
|
|
|
|
* @return This returns 1 if it successfully assigned a task to the worker,
|
|
|
|
* otherwise it returns 0.
|
|
|
|
*/
|
2016-12-04 15:51:03 -08:00
|
|
|
bool find_and_schedule_task_if_possible(
|
|
|
|
local_scheduler_state *state,
|
|
|
|
scheduling_algorithm_state *algorithm_state,
|
|
|
|
int worker_index) {
|
2016-11-04 00:41:20 -07:00
|
|
|
task_queue_entry *elt, *tmp;
|
2016-11-18 19:57:51 -08:00
|
|
|
bool found_task_to_schedule = false;
|
2016-10-18 18:27:43 -07:00
|
|
|
/* Find the first task whose dependencies are available locally. */
|
2016-12-04 15:51:03 -08:00
|
|
|
DL_FOREACH_SAFE(algorithm_state->task_queue, elt, tmp) {
|
|
|
|
if (can_run(algorithm_state, elt->spec)) {
|
2016-11-18 19:57:51 -08:00
|
|
|
found_task_to_schedule = true;
|
2016-10-18 18:27:43 -07:00
|
|
|
break;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
if (found_task_to_schedule) {
|
|
|
|
/* This task's dependencies are available locally, so assign the task to the
|
|
|
|
* worker. */
|
2016-12-04 15:51:03 -08:00
|
|
|
assign_task_to_worker(state, elt->spec, worker_index,
|
2016-11-18 19:57:51 -08:00
|
|
|
elt->from_global_scheduler);
|
2016-10-18 18:27:43 -07:00
|
|
|
/* Update the task queue data structure and free the task. */
|
2016-12-04 15:51:03 -08:00
|
|
|
DL_DELETE(algorithm_state->task_queue, elt);
|
2016-11-18 19:57:51 -08:00
|
|
|
free_task_spec(elt->spec);
|
2016-11-04 00:41:20 -07:00
|
|
|
free(elt);
|
2016-10-18 18:27:43 -07:00
|
|
|
}
|
|
|
|
return found_task_to_schedule;
|
|
|
|
}
|
|
|
|
|
2016-12-04 15:51:03 -08:00
|
|
|
void run_task_immediately(local_scheduler_state *state,
|
|
|
|
scheduling_algorithm_state *algorithm_state,
|
2016-11-18 19:57:51 -08:00
|
|
|
task_spec *spec,
|
|
|
|
bool from_global_scheduler) {
|
|
|
|
/* Get the last available worker in the available worker queue. */
|
2016-12-04 15:51:03 -08:00
|
|
|
int *worker_index = (int *) utarray_back(algorithm_state->available_workers);
|
2016-11-18 19:57:51 -08:00
|
|
|
/* Tell the available worker to execute the task. */
|
2016-12-04 15:51:03 -08:00
|
|
|
assign_task_to_worker(state, spec, *worker_index, from_global_scheduler);
|
2016-11-18 19:57:51 -08:00
|
|
|
/* Remove the available worker from the queue and free the struct. */
|
2016-12-04 15:51:03 -08:00
|
|
|
utarray_pop_back(algorithm_state->available_workers);
|
2016-11-18 19:57:51 -08:00
|
|
|
}
|
|
|
|
|
2016-12-04 15:51:03 -08:00
|
|
|
void queue_task_locally(local_scheduler_state *state,
|
|
|
|
scheduling_algorithm_state *algorithm_state,
|
2016-11-18 19:57:51 -08:00
|
|
|
task_spec *spec,
|
|
|
|
bool from_global_scheduler) {
|
|
|
|
/* Copy the spec and add it to the task queue. The allocated spec will be
|
|
|
|
* freed when it is assigned to a worker. */
|
|
|
|
task_queue_entry *elt = malloc(sizeof(task_queue_entry));
|
2016-11-19 12:19:49 -08:00
|
|
|
elt->spec = (task_spec *) malloc(task_spec_size(spec));
|
2016-11-18 19:57:51 -08:00
|
|
|
memcpy(elt->spec, spec, task_spec_size(spec));
|
|
|
|
elt->from_global_scheduler = from_global_scheduler;
|
2016-12-04 15:51:03 -08:00
|
|
|
DL_APPEND(algorithm_state->task_queue, elt);
|
2016-11-18 19:57:51 -08:00
|
|
|
}
|
|
|
|
|
2016-12-04 15:51:03 -08:00
|
|
|
void give_task_to_global_scheduler(local_scheduler_state *state,
|
|
|
|
scheduling_algorithm_state *algorithm_state,
|
2016-11-18 19:57:51 -08:00
|
|
|
task_spec *spec,
|
|
|
|
bool from_global_scheduler) {
|
|
|
|
/* Pass on the task to the global scheduler. */
|
|
|
|
DCHECK(!from_global_scheduler);
|
|
|
|
task *task = alloc_task(spec, TASK_STATUS_WAITING, NIL_ID);
|
2016-12-04 15:51:03 -08:00
|
|
|
DCHECK(state->db != NULL);
|
|
|
|
task_table_add_task(state->db, task, (retry_info *) &photon_retry, NULL,
|
|
|
|
NULL);
|
2016-11-18 19:57:51 -08:00
|
|
|
}
|
|
|
|
|
2016-12-04 15:51:03 -08:00
|
|
|
void handle_task_submitted(local_scheduler_state *state,
|
|
|
|
scheduling_algorithm_state *algorithm_state,
|
2016-11-10 18:13:26 -08:00
|
|
|
task_spec *spec) {
|
2016-10-18 18:27:43 -07:00
|
|
|
/* If this task's dependencies are available locally, and if there is an
|
2016-11-18 19:57:51 -08:00
|
|
|
* available worker, then assign this task to an available worker. If we
|
|
|
|
* cannot assign the task to a worker immediately, we either queue the task in
|
|
|
|
* the local task queue or we pass the task to the global scheduler. For now,
|
|
|
|
* we pass the task along to the global scheduler if there is one. */
|
2016-12-04 15:51:03 -08:00
|
|
|
if ((utarray_len(algorithm_state->available_workers) > 0) &&
|
|
|
|
can_run(algorithm_state, spec)) {
|
|
|
|
run_task_immediately(state, algorithm_state, spec, false);
|
|
|
|
} else if (state->db == NULL) {
|
|
|
|
queue_task_locally(state, algorithm_state, spec, false);
|
2016-10-18 18:27:43 -07:00
|
|
|
} else {
|
2016-12-04 15:51:03 -08:00
|
|
|
give_task_to_global_scheduler(state, algorithm_state, spec, false);
|
2016-10-18 18:27:43 -07:00
|
|
|
}
|
2016-11-18 19:57:51 -08:00
|
|
|
}
|
|
|
|
|
2016-12-04 15:51:03 -08:00
|
|
|
void handle_task_scheduled(local_scheduler_state *state,
|
|
|
|
scheduling_algorithm_state *algorithm_state,
|
2016-11-18 19:57:51 -08:00
|
|
|
task_spec *spec) {
|
|
|
|
/* This callback handles tasks that were assigned to this local scheduler by
|
|
|
|
* the global scheduler, so we can safely assert that there is a connection
|
|
|
|
* to the database. */
|
2016-12-04 15:51:03 -08:00
|
|
|
DCHECK(state->db != NULL);
|
2016-12-06 15:47:31 -08:00
|
|
|
/* Initiate fetch calls for any dependencies that are not present locally. */
|
|
|
|
fetch_missing_dependencies(state, algorithm_state, spec);
|
2016-11-18 19:57:51 -08:00
|
|
|
/* If this task's dependencies are available locally, and if there is an
|
|
|
|
* available worker, then assign this task to an available worker. If we
|
|
|
|
* cannot assign the task to a worker immediately, queue the task locally. */
|
2016-12-04 15:51:03 -08:00
|
|
|
if ((utarray_len(algorithm_state->available_workers) > 0) &&
|
|
|
|
can_run(algorithm_state, spec)) {
|
|
|
|
run_task_immediately(state, algorithm_state, spec, true);
|
2016-11-18 19:57:51 -08:00
|
|
|
} else {
|
2016-12-04 15:51:03 -08:00
|
|
|
queue_task_locally(state, algorithm_state, spec, true);
|
2016-10-18 18:27:43 -07:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2016-12-04 15:51:03 -08:00
|
|
|
void handle_worker_available(local_scheduler_state *state,
|
|
|
|
scheduling_algorithm_state *algorithm_state,
|
2016-10-18 18:27:43 -07:00
|
|
|
int worker_index) {
|
|
|
|
int scheduled_task =
|
2016-12-04 15:51:03 -08:00
|
|
|
find_and_schedule_task_if_possible(state, algorithm_state, worker_index);
|
2016-10-18 18:27:43 -07:00
|
|
|
/* If we couldn't find a task to schedule, add the worker to the queue of
|
|
|
|
* available workers. */
|
|
|
|
if (!scheduled_task) {
|
2016-12-04 15:51:03 -08:00
|
|
|
for (int *p = (int *) utarray_front(algorithm_state->available_workers);
|
|
|
|
p != NULL;
|
|
|
|
p = (int *) utarray_next(algorithm_state->available_workers, p)) {
|
2016-11-04 00:41:20 -07:00
|
|
|
DCHECK(*p != worker_index);
|
2016-10-18 18:27:43 -07:00
|
|
|
}
|
|
|
|
/* Add client_sock to a list of available workers. This struct will be freed
|
|
|
|
* when a task is assigned to this worker. */
|
2016-12-04 15:51:03 -08:00
|
|
|
utarray_push_back(algorithm_state->available_workers, &worker_index);
|
2016-11-02 00:39:35 -07:00
|
|
|
LOG_DEBUG("Adding worker_index %d to available workers.\n", worker_index);
|
2016-10-18 18:27:43 -07:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2016-12-04 15:51:03 -08:00
|
|
|
void handle_object_available(local_scheduler_state *state,
|
|
|
|
scheduling_algorithm_state *algorithm_state,
|
2016-10-18 18:27:43 -07:00
|
|
|
object_id object_id) {
|
|
|
|
/* TODO(rkn): When does this get freed? */
|
|
|
|
available_object *entry =
|
|
|
|
(available_object *) malloc(sizeof(available_object));
|
|
|
|
entry->object_id = object_id;
|
2016-12-04 15:51:03 -08:00
|
|
|
HASH_ADD(handle, algorithm_state->local_objects, object_id, sizeof(object_id),
|
|
|
|
entry);
|
2016-10-18 18:27:43 -07:00
|
|
|
|
|
|
|
/* Check if we can schedule any tasks. */
|
|
|
|
int num_tasks_scheduled = 0;
|
2016-12-04 15:51:03 -08:00
|
|
|
for (int *p = (int *) utarray_front(algorithm_state->available_workers);
|
|
|
|
p != NULL;
|
|
|
|
p = (int *) utarray_next(algorithm_state->available_workers, p)) {
|
2016-10-18 18:27:43 -07:00
|
|
|
/* Schedule a task on this worker if possible. */
|
2016-12-04 15:51:03 -08:00
|
|
|
int scheduled_task =
|
|
|
|
find_and_schedule_task_if_possible(state, algorithm_state, *p);
|
2016-10-18 18:27:43 -07:00
|
|
|
if (!scheduled_task) {
|
|
|
|
/* There are no tasks we can schedule, so exit the loop. */
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
num_tasks_scheduled += 1;
|
|
|
|
}
|
2016-12-04 15:51:03 -08:00
|
|
|
utarray_erase(algorithm_state->available_workers, 0, num_tasks_scheduled);
|
2016-12-06 15:47:31 -08:00
|
|
|
|
|
|
|
/* If we were previously trying to fetch this object, remove the fetch request
|
|
|
|
* from the hash table. */
|
|
|
|
fetch_object_request *fetch_req;
|
|
|
|
HASH_FIND(hh, algorithm_state->fetch_requests, &object_id, sizeof(object_id),
|
|
|
|
fetch_req);
|
|
|
|
if (fetch_req != NULL) {
|
|
|
|
HASH_DELETE(hh, algorithm_state->fetch_requests, fetch_req);
|
|
|
|
CHECK(event_loop_remove_timer(state->loop, fetch_req->timer) == AE_OK);
|
|
|
|
free(fetch_req);
|
|
|
|
}
|
2016-10-18 18:27:43 -07:00
|
|
|
}
|