2016-10-18 18:27:43 -07:00
|
|
|
#include "photon_algorithm.h"
|
|
|
|
|
|
|
|
#include <stdbool.h>
|
|
|
|
#include "utarray.h"
|
2016-11-04 00:41:20 -07:00
|
|
|
#include "utlist.h"
|
2016-10-18 18:27:43 -07:00
|
|
|
|
2016-11-10 18:13:26 -08:00
|
|
|
#include "state/task_table.h"
|
2016-12-24 20:02:25 -08:00
|
|
|
#include "state/local_scheduler_table.h"
|
2016-12-12 23:17:22 -08:00
|
|
|
#include "state/object_table.h"
|
2016-10-18 18:27:43 -07:00
|
|
|
#include "photon.h"
|
|
|
|
#include "photon_scheduler.h"
|
|
|
|
|
2016-11-04 00:41:20 -07:00
|
|
|
/** A doubly-linked list node wrapping one queued task. The same node type is
 *  used for both the waiting queue and the dispatch queue (see
 *  scheduling_algorithm_state); the prev/next fields are required by the
 *  utlist DL_* macros. */
typedef struct task_queue_entry {
  /** The task that is queued. The entry owns this copy of the spec (it is
   *  allocated in queue_task and freed when the task is dispatched or the
   *  state is torn down). */
  task_spec *spec;
  /** Previous entry in the doubly-linked list. */
  struct task_queue_entry *prev;
  /** Next entry in the doubly-linked list. */
  struct task_queue_entry *next;
} task_queue_entry;
|
|
|
|
|
2016-10-18 18:27:43 -07:00
|
|
|
/** An entry in the hash table of objects believed to be present in the local
 *  Plasma store. The object ID is both the hash key and the only payload. */
typedef struct {
  /** Object id of this object. */
  object_id object_id;
  /** Handle for the uthash table. */
  UT_hash_handle handle;
} available_object;
|
|
|
|
|
2016-12-06 15:47:31 -08:00
|
|
|
/** A data structure used to track which objects are being fetched. One entry
 *  exists per in-flight fetch; entries are created in
 *  fetch_missing_dependencies and destroyed in handle_object_available. */
typedef struct {
  /** The object ID that we are trying to fetch. This is the hash key. */
  object_id object_id;
  /** The local scheduler state. */
  local_scheduler_state *state;
  /** The scheduling algorithm state. */
  scheduling_algorithm_state *algorithm_state;
  /** The ID for the timer that periodically re-issues the fetch until the
   *  object arrives (see fetch_object_timeout_handler). */
  int64_t timer;
  /** Handle for the uthash table. */
  UT_hash_handle hh;
} fetch_object_request;
|
|
|
|
|
2016-10-18 18:27:43 -07:00
|
|
|
/** Part of the photon state that is maintained by the scheduling algorithm. */
struct scheduling_algorithm_state {
  /** A doubly-linked list (utlist) of tasks that are waiting for their object
   * dependencies to become available locally. */
  task_queue_entry *waiting_task_queue;
  /** A doubly-linked list (utlist) of tasks whose dependencies are ready but
   * that are waiting to be assigned to a worker. */
  task_queue_entry *dispatch_task_queue;
  /** An array of worker indices corresponding to clients that are
   * waiting for tasks. */
  UT_array *available_workers;
  /** A hash map of the objects that are available in the local Plasma store.
   * This information could be a little stale. */
  available_object *local_objects;
  /** A hash map of the objects that are currently being fetched by this local
   * scheduler. The key is the object ID. */
  fetch_object_request *fetch_requests;
};
|
|
|
|
|
2016-12-04 15:51:03 -08:00
|
|
|
scheduling_algorithm_state *make_scheduling_algorithm_state(void) {
|
|
|
|
scheduling_algorithm_state *algorithm_state =
|
|
|
|
malloc(sizeof(scheduling_algorithm_state));
|
2016-10-18 18:27:43 -07:00
|
|
|
/* Initialize an empty hash map for the cache of local available objects. */
|
2016-12-04 15:51:03 -08:00
|
|
|
algorithm_state->local_objects = NULL;
|
2016-10-18 18:27:43 -07:00
|
|
|
/* Initialize the local data structures used for queuing tasks and workers. */
|
2017-01-18 20:27:40 -08:00
|
|
|
algorithm_state->waiting_task_queue = NULL;
|
|
|
|
algorithm_state->dispatch_task_queue = NULL;
|
2016-12-04 15:51:03 -08:00
|
|
|
utarray_new(algorithm_state->available_workers, &ut_int_icd);
|
2016-12-06 15:47:31 -08:00
|
|
|
/* Initialize the hash table of objects being fetched. */
|
|
|
|
algorithm_state->fetch_requests = NULL;
|
2016-12-04 15:51:03 -08:00
|
|
|
return algorithm_state;
|
2016-10-18 18:27:43 -07:00
|
|
|
}
|
|
|
|
|
2016-12-04 15:51:03 -08:00
|
|
|
/**
 * Free the scheduling algorithm state and everything it owns: both task
 * queues (including the copied task specs owned by each entry), the
 * available-worker array, the local-object cache, and any outstanding fetch
 * requests.
 *
 * NOTE(review): the event-loop timers registered for outstanding fetch
 * requests (see fetch_missing_dependencies) are not removed here --
 * presumably the event loop is torn down separately; confirm.
 *
 * @param algorithm_state The scheduling algorithm state to free.
 * @return Void.
 */
void free_scheduling_algorithm_state(
    scheduling_algorithm_state *algorithm_state) {
  /* Drain the waiting queue; each entry owns its spec (copied in queue_task),
   * so free the spec before the node. */
  task_queue_entry *elt, *tmp1;
  DL_FOREACH_SAFE(algorithm_state->waiting_task_queue, elt, tmp1) {
    DL_DELETE(algorithm_state->waiting_task_queue, elt);
    free_task_spec(elt->spec);
    free(elt);
  }
  /* Drain the dispatch queue the same way. */
  DL_FOREACH_SAFE(algorithm_state->dispatch_task_queue, elt, tmp1) {
    DL_DELETE(algorithm_state->dispatch_task_queue, elt);
    free_task_spec(elt->spec);
    free(elt);
  }
  /* The worker array holds plain ints, so freeing the array is enough. */
  utarray_free(algorithm_state->available_workers);
  /* Drain the cache of locally available objects. */
  available_object *available_obj, *tmp2;
  HASH_ITER(handle, algorithm_state->local_objects, available_obj, tmp2) {
    HASH_DELETE(handle, algorithm_state->local_objects, available_obj);
    free(available_obj);
  }
  /* Drain any outstanding fetch requests. */
  fetch_object_request *fetch_elt, *tmp_fetch_elt;
  HASH_ITER(hh, algorithm_state->fetch_requests, fetch_elt, tmp_fetch_elt) {
    HASH_DELETE(hh, algorithm_state->fetch_requests, fetch_elt);
    free(fetch_elt);
  }
  free(algorithm_state);
}
|
|
|
|
|
2016-12-24 20:02:25 -08:00
|
|
|
void provide_scheduler_info(local_scheduler_state *state,
|
|
|
|
scheduling_algorithm_state *algorithm_state,
|
|
|
|
local_scheduler_info *info) {
|
|
|
|
task_queue_entry *elt;
|
2016-12-28 22:33:20 -08:00
|
|
|
info->total_num_workers = utarray_len(state->workers);
|
2017-01-18 20:27:40 -08:00
|
|
|
/* TODO(swang): Provide separate counts for tasks that are waiting for
|
|
|
|
* dependencies vs tasks that are waiting to be assigned. */
|
|
|
|
int waiting_task_queue_length;
|
|
|
|
DL_COUNT(algorithm_state->waiting_task_queue, elt, waiting_task_queue_length);
|
|
|
|
int dispatch_task_queue_length;
|
|
|
|
DL_COUNT(algorithm_state->dispatch_task_queue, elt,
|
|
|
|
dispatch_task_queue_length);
|
|
|
|
info->task_queue_length =
|
|
|
|
waiting_task_queue_length + dispatch_task_queue_length;
|
2016-12-24 20:02:25 -08:00
|
|
|
info->available_workers = utarray_len(algorithm_state->available_workers);
|
|
|
|
}
|
|
|
|
|
2016-10-18 18:27:43 -07:00
|
|
|
/**
|
|
|
|
* Check if all of the remote object arguments for a task are available in the
|
|
|
|
* local object store.
|
|
|
|
*
|
|
|
|
* @param s The scheduler state.
|
|
|
|
* @param task Task specification of the task to check.
|
|
|
|
* @return This returns 1 if all of the remote object arguments for the task are
|
|
|
|
* present in the local object store, otherwise it returns 0.
|
|
|
|
*/
|
2016-12-04 15:51:03 -08:00
|
|
|
bool can_run(scheduling_algorithm_state *algorithm_state, task_spec *task) {
|
2016-10-18 18:27:43 -07:00
|
|
|
int64_t num_args = task_num_args(task);
|
|
|
|
for (int i = 0; i < num_args; ++i) {
|
|
|
|
if (task_arg_type(task, i) == ARG_BY_REF) {
|
2016-11-08 14:46:34 -08:00
|
|
|
object_id obj_id = task_arg_id(task, i);
|
2016-10-18 18:27:43 -07:00
|
|
|
available_object *entry;
|
2016-12-04 15:51:03 -08:00
|
|
|
HASH_FIND(handle, algorithm_state->local_objects, &obj_id, sizeof(obj_id),
|
|
|
|
entry);
|
2016-10-18 18:27:43 -07:00
|
|
|
if (entry == NULL) {
|
|
|
|
/* The object is not present locally, so this task cannot be scheduled
|
|
|
|
* right now. */
|
|
|
|
return false;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return true;
|
|
|
|
}
|
|
|
|
|
2016-12-06 15:47:31 -08:00
|
|
|
/* TODO(rkn): This method will need to be changed to call reconstruct. */
|
|
|
|
int fetch_object_timeout_handler(event_loop *loop, timer_id id, void *context) {
|
|
|
|
fetch_object_request *fetch_req = (fetch_object_request *) context;
|
|
|
|
object_id object_ids[1] = {fetch_req->object_id};
|
2016-12-10 21:22:05 -08:00
|
|
|
plasma_fetch(fetch_req->state->plasma_conn, 1, object_ids);
|
2016-12-06 15:47:31 -08:00
|
|
|
return LOCAL_SCHEDULER_FETCH_TIMEOUT_MILLISECONDS;
|
|
|
|
}
|
|
|
|
|
|
|
|
void fetch_missing_dependencies(local_scheduler_state *state,
|
|
|
|
scheduling_algorithm_state *algorithm_state,
|
|
|
|
task_spec *spec) {
|
|
|
|
int64_t num_args = task_num_args(spec);
|
|
|
|
for (int i = 0; i < num_args; ++i) {
|
|
|
|
if (task_arg_type(spec, i) == ARG_BY_REF) {
|
|
|
|
object_id obj_id = task_arg_id(spec, i);
|
|
|
|
available_object *entry;
|
|
|
|
HASH_FIND(handle, algorithm_state->local_objects, &obj_id, sizeof(obj_id),
|
|
|
|
entry);
|
|
|
|
if (entry == NULL) {
|
|
|
|
/* The object is not present locally, fetch the object. */
|
|
|
|
object_id object_ids[1] = {obj_id};
|
2016-12-10 21:22:05 -08:00
|
|
|
plasma_fetch(state->plasma_conn, 1, object_ids);
|
2016-12-06 15:47:31 -08:00
|
|
|
/* Create a fetch request and add a timer to the event loop to ensure
|
|
|
|
* that the fetch actually happens. */
|
|
|
|
fetch_object_request *fetch_req = malloc(sizeof(fetch_object_request));
|
|
|
|
fetch_req->object_id = obj_id;
|
|
|
|
fetch_req->state = state;
|
|
|
|
fetch_req->algorithm_state = algorithm_state;
|
|
|
|
fetch_req->timer = event_loop_add_timer(
|
|
|
|
state->loop, LOCAL_SCHEDULER_FETCH_TIMEOUT_MILLISECONDS,
|
|
|
|
fetch_object_timeout_handler, fetch_req);
|
|
|
|
/* The fetch request will be freed and removed from the hash table in
|
|
|
|
* handle_object_available when the object becomes available locally. */
|
|
|
|
HASH_ADD(hh, algorithm_state->fetch_requests, object_id,
|
|
|
|
sizeof(fetch_req->object_id), fetch_req);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2016-10-18 18:27:43 -07:00
|
|
|
/**
|
2017-01-18 20:27:40 -08:00
|
|
|
* Assign as many tasks from the dispatch queue as possible.
|
2016-10-18 18:27:43 -07:00
|
|
|
*
|
2017-01-18 20:27:40 -08:00
|
|
|
* @param state The scheduler state.
|
|
|
|
* @param algorithm_state The scheduling algorithm state.
|
|
|
|
* @return Void.
|
2016-10-18 18:27:43 -07:00
|
|
|
*/
|
2017-01-18 20:27:40 -08:00
|
|
|
void dispatch_tasks(local_scheduler_state *state,
|
|
|
|
scheduling_algorithm_state *algorithm_state) {
|
|
|
|
/* Assign tasks while there are still tasks in the dispatch queue and
|
|
|
|
* available workers. */
|
|
|
|
while ((algorithm_state->dispatch_task_queue != NULL) &&
|
|
|
|
(utarray_len(algorithm_state->available_workers) > 0)) {
|
|
|
|
LOG_DEBUG("Dispatching task");
|
|
|
|
/* Pop a task from the dispatch queue. */
|
|
|
|
task_queue_entry *dispatched_task = algorithm_state->dispatch_task_queue;
|
|
|
|
DL_DELETE(algorithm_state->dispatch_task_queue, dispatched_task);
|
|
|
|
|
|
|
|
/* Get the last available worker in the available worker queue. */
|
|
|
|
int *worker_index =
|
|
|
|
(int *) utarray_back(algorithm_state->available_workers);
|
|
|
|
/* Tell the available worker to execute the task. */
|
|
|
|
assign_task_to_worker(state, dispatched_task->spec, *worker_index);
|
|
|
|
/* Remove the available worker from the queue and free the struct. */
|
|
|
|
utarray_pop_back(algorithm_state->available_workers);
|
|
|
|
free_task_spec(dispatched_task->spec);
|
|
|
|
free(dispatched_task);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
/**
|
|
|
|
* A helper function to allocate a queue entry for a task specification and
|
|
|
|
* push it onto a generic queue.
|
|
|
|
*
|
|
|
|
* @param state The state of the local scheduler.
|
|
|
|
* @param task_queue A pointer to a task queue. NOTE: Because we are using
|
|
|
|
* utlist.h, we must pass in a pointer to the queue we want to append
|
|
|
|
* to. If we passed in the queue itself and the queue was empty, this
|
|
|
|
* would append the task to a queue that we don't have a reference to.
|
|
|
|
* @param spec The task specification to queue.
|
|
|
|
* @param from_global_scheduler Whether or not the task was from a global
|
|
|
|
* scheduler. If false, the task was submitted by a worker.
|
|
|
|
* @return Void.
|
|
|
|
*/
|
|
|
|
void queue_task(local_scheduler_state *state,
|
|
|
|
task_queue_entry **task_queue,
|
|
|
|
task_spec *spec,
|
|
|
|
bool from_global_scheduler) {
|
|
|
|
/* Copy the spec and add it to the task queue. The allocated spec will be
|
|
|
|
* freed when it is assigned to a worker. */
|
|
|
|
task_queue_entry *elt = malloc(sizeof(task_queue_entry));
|
|
|
|
elt->spec = (task_spec *) malloc(task_spec_size(spec));
|
|
|
|
memcpy(elt->spec, spec, task_spec_size(spec));
|
|
|
|
DL_APPEND((*task_queue), elt);
|
|
|
|
|
|
|
|
/* The task has been added to a local scheduler queue. Write the entry in the
|
|
|
|
* task table to notify others that we have queued it. */
|
|
|
|
if (state->db != NULL) {
|
|
|
|
task *task =
|
|
|
|
alloc_task(spec, TASK_STATUS_QUEUED, get_db_client_id(state->db));
|
|
|
|
if (from_global_scheduler) {
|
|
|
|
/* If the task is from the global scheduler, it's already been added to
|
|
|
|
* the task table, so just update the entry. */
|
|
|
|
task_table_update(state->db, task, (retry_info *) &photon_retry, NULL,
|
|
|
|
NULL);
|
|
|
|
} else {
|
|
|
|
/* Otherwise, this is the first time the task has been seen in the system
|
|
|
|
* (unless it's a resubmission of a previous task), so add the entry. */
|
|
|
|
task_table_add_task(state->db, task, (retry_info *) &photon_retry, NULL,
|
|
|
|
NULL);
|
2016-10-18 18:27:43 -07:00
|
|
|
}
|
|
|
|
}
|
2017-01-18 20:27:40 -08:00
|
|
|
}
|
|
|
|
|
|
|
|
/**
|
|
|
|
* Queue a task whose dependencies are missing. When the task's object
|
|
|
|
* dependencies become available, the task will be moved to the dispatch queue.
|
|
|
|
* If we have a connection to a plasma manager, begin trying to fetch the
|
|
|
|
* dependencies.
|
|
|
|
*
|
|
|
|
* @param state The scheduler state.
|
|
|
|
* @param algorithm_state The scheduling algorithm state.
|
|
|
|
* @param spec The task specification to queue.
|
|
|
|
* @param from_global_scheduler Whether or not the task was from a global
|
|
|
|
* scheduler. If false, the task was submitted by a worker.
|
|
|
|
* @return Void.
|
|
|
|
*/
|
|
|
|
void queue_waiting_task(local_scheduler_state *state,
|
|
|
|
scheduling_algorithm_state *algorithm_state,
|
|
|
|
task_spec *spec,
|
|
|
|
bool from_global_scheduler) {
|
|
|
|
LOG_DEBUG("Queueing task in waiting queue");
|
|
|
|
/* Initiate fetch calls for any dependencies that are not present locally. */
|
|
|
|
if (plasma_manager_is_connected(state->plasma_conn)) {
|
|
|
|
fetch_missing_dependencies(state, algorithm_state, spec);
|
2016-10-18 18:27:43 -07:00
|
|
|
}
|
2017-01-18 20:27:40 -08:00
|
|
|
queue_task(state, &algorithm_state->waiting_task_queue, spec,
|
|
|
|
from_global_scheduler);
|
2016-10-18 18:27:43 -07:00
|
|
|
}
|
|
|
|
|
2017-01-18 20:27:40 -08:00
|
|
|
/**
|
|
|
|
* Queue a task whose dependencies are ready. When the task reaches the front
|
|
|
|
* of the dispatch queue and workers are available, it will be assigned.
|
|
|
|
*
|
|
|
|
* @param state The scheduler state.
|
|
|
|
* @param algorithm_state The scheduling algorithm state.
|
|
|
|
* @param spec The task specification to queue.
|
|
|
|
* @param from_global_scheduler Whether or not the task was from a global
|
|
|
|
* scheduler. If false, the task was submitted by a worker.
|
|
|
|
* @return Void.
|
|
|
|
*/
|
|
|
|
void queue_dispatch_task(local_scheduler_state *state,
|
|
|
|
scheduling_algorithm_state *algorithm_state,
|
|
|
|
task_spec *spec,
|
|
|
|
bool from_global_scheduler) {
|
|
|
|
LOG_DEBUG("Queueing task in dispatch queue");
|
|
|
|
queue_task(state, &algorithm_state->dispatch_task_queue, spec,
|
|
|
|
from_global_scheduler);
|
2016-11-18 19:57:51 -08:00
|
|
|
}
|
|
|
|
|
2017-01-18 20:27:40 -08:00
|
|
|
/**
|
|
|
|
* Add the task to the proper local scheduler queue. This assumes that the
|
|
|
|
* scheduling decision to place the task on this node has already been made,
|
|
|
|
* whether locally or by the global scheduler.
|
|
|
|
*
|
|
|
|
* @param state The scheduler state.
|
|
|
|
* @param algorithm_state The scheduling algorithm state.
|
|
|
|
* @param spec The task specification to queue.
|
|
|
|
* @param from_global_scheduler Whether or not the task was from a global
|
|
|
|
* scheduler. If false, the task was submitted by a worker.
|
|
|
|
* @return Void.
|
|
|
|
*/
|
2016-12-04 15:51:03 -08:00
|
|
|
void queue_task_locally(local_scheduler_state *state,
|
|
|
|
scheduling_algorithm_state *algorithm_state,
|
2016-11-18 19:57:51 -08:00
|
|
|
task_spec *spec,
|
|
|
|
bool from_global_scheduler) {
|
2017-01-18 20:27:40 -08:00
|
|
|
if (can_run(algorithm_state, spec)) {
|
|
|
|
/* Dependencies are ready, so push the task to the dispatch queue. */
|
|
|
|
queue_dispatch_task(state, algorithm_state, spec, from_global_scheduler);
|
|
|
|
} else {
|
|
|
|
/* Dependencies are not ready, so push the task to the waiting queue. */
|
|
|
|
queue_waiting_task(state, algorithm_state, spec, from_global_scheduler);
|
2016-12-20 00:13:39 -08:00
|
|
|
}
|
2016-11-18 19:57:51 -08:00
|
|
|
}
|
|
|
|
|
2017-01-18 20:27:40 -08:00
|
|
|
/**
|
|
|
|
* Give a task to the global scheduler to schedule.
|
|
|
|
*
|
|
|
|
* @param state The scheduler state.
|
|
|
|
* @param algorithm_state The scheduling algorithm state.
|
|
|
|
* @param spec The task specification to schedule.
|
|
|
|
* @return Void.
|
|
|
|
*/
|
2016-12-04 15:51:03 -08:00
|
|
|
void give_task_to_global_scheduler(local_scheduler_state *state,
|
|
|
|
scheduling_algorithm_state *algorithm_state,
|
2017-01-18 20:27:40 -08:00
|
|
|
task_spec *spec) {
|
|
|
|
if (state->db == NULL || !state->global_scheduler_exists) {
|
|
|
|
/* A global scheduler is not available, so queue the task locally. */
|
|
|
|
queue_task_locally(state, algorithm_state, spec, false);
|
|
|
|
return;
|
|
|
|
}
|
2016-11-18 19:57:51 -08:00
|
|
|
/* Pass on the task to the global scheduler. */
|
2016-12-12 23:17:22 -08:00
|
|
|
DCHECK(state->global_scheduler_exists);
|
2016-11-18 19:57:51 -08:00
|
|
|
task *task = alloc_task(spec, TASK_STATUS_WAITING, NIL_ID);
|
2016-12-04 15:51:03 -08:00
|
|
|
DCHECK(state->db != NULL);
|
|
|
|
task_table_add_task(state->db, task, (retry_info *) &photon_retry, NULL,
|
|
|
|
NULL);
|
2016-11-18 19:57:51 -08:00
|
|
|
}
|
|
|
|
|
2016-12-04 15:51:03 -08:00
|
|
|
void handle_task_submitted(local_scheduler_state *state,
|
|
|
|
scheduling_algorithm_state *algorithm_state,
|
2016-11-10 18:13:26 -08:00
|
|
|
task_spec *spec) {
|
2016-10-18 18:27:43 -07:00
|
|
|
/* If this task's dependencies are available locally, and if there is an
|
2016-11-18 19:57:51 -08:00
|
|
|
* available worker, then assign this task to an available worker. If we
|
|
|
|
* cannot assign the task to a worker immediately, we either queue the task in
|
|
|
|
* the local task queue or we pass the task to the global scheduler. For now,
|
|
|
|
* we pass the task along to the global scheduler if there is one. */
|
2017-01-18 20:27:40 -08:00
|
|
|
if (can_run(algorithm_state, spec) &&
|
|
|
|
(utarray_len(algorithm_state->available_workers) > 0)) {
|
|
|
|
/* Dependencies are ready and there is an available worker, so dispatch the
|
|
|
|
* task. */
|
|
|
|
queue_dispatch_task(state, algorithm_state, spec, false);
|
2016-10-18 18:27:43 -07:00
|
|
|
} else {
|
2017-01-18 20:27:40 -08:00
|
|
|
/* Give the task to the global scheduler to schedule, if it exists. */
|
|
|
|
give_task_to_global_scheduler(state, algorithm_state, spec);
|
2016-10-18 18:27:43 -07:00
|
|
|
}
|
2017-01-18 20:27:40 -08:00
|
|
|
|
|
|
|
/* Try to dispatch tasks, since we may have added one to the queue. */
|
|
|
|
dispatch_tasks(state, algorithm_state);
|
|
|
|
|
2016-12-12 23:17:22 -08:00
|
|
|
/* Update the result table, which holds mappings of object ID -> ID of the
|
|
|
|
* task that created it. */
|
|
|
|
if (state->db != NULL) {
|
|
|
|
task_id task_id = task_spec_id(spec);
|
|
|
|
for (int64_t i = 0; i < task_num_returns(spec); ++i) {
|
|
|
|
object_id return_id = task_return(spec, i);
|
|
|
|
result_table_add(state->db, return_id, task_id,
|
|
|
|
(retry_info *) &photon_retry, NULL, NULL);
|
|
|
|
}
|
|
|
|
}
|
2016-11-18 19:57:51 -08:00
|
|
|
}
|
|
|
|
|
2016-12-04 15:51:03 -08:00
|
|
|
void handle_task_scheduled(local_scheduler_state *state,
|
|
|
|
scheduling_algorithm_state *algorithm_state,
|
2016-11-18 19:57:51 -08:00
|
|
|
task_spec *spec) {
|
|
|
|
/* This callback handles tasks that were assigned to this local scheduler by
|
|
|
|
* the global scheduler, so we can safely assert that there is a connection
|
|
|
|
* to the database. */
|
2016-12-04 15:51:03 -08:00
|
|
|
DCHECK(state->db != NULL);
|
2016-12-12 23:17:22 -08:00
|
|
|
DCHECK(state->global_scheduler_exists);
|
2017-01-18 20:27:40 -08:00
|
|
|
/* Push the task to the appropriate queue. */
|
|
|
|
queue_task_locally(state, algorithm_state, spec, true);
|
|
|
|
dispatch_tasks(state, algorithm_state);
|
2016-10-18 18:27:43 -07:00
|
|
|
}
|
|
|
|
|
2016-12-04 15:51:03 -08:00
|
|
|
void handle_worker_available(local_scheduler_state *state,
|
|
|
|
scheduling_algorithm_state *algorithm_state,
|
2016-10-18 18:27:43 -07:00
|
|
|
int worker_index) {
|
2016-12-20 00:13:39 -08:00
|
|
|
worker *available_worker =
|
|
|
|
(worker *) utarray_eltptr(state->workers, worker_index);
|
|
|
|
CHECK(available_worker->task_in_progress == NULL);
|
2017-01-18 20:27:40 -08:00
|
|
|
for (int *p = (int *) utarray_front(algorithm_state->available_workers);
|
|
|
|
p != NULL;
|
|
|
|
p = (int *) utarray_next(algorithm_state->available_workers, p)) {
|
|
|
|
DCHECK(*p != worker_index);
|
2016-10-18 18:27:43 -07:00
|
|
|
}
|
2017-01-18 20:27:40 -08:00
|
|
|
/* Add worker to the list of available workers. */
|
|
|
|
utarray_push_back(algorithm_state->available_workers, &worker_index);
|
|
|
|
LOG_DEBUG("Adding worker_index %d to available workers", worker_index);
|
|
|
|
|
|
|
|
/* Try to dispatch tasks, since we now have available workers to assign them
|
|
|
|
* to. */
|
|
|
|
dispatch_tasks(state, algorithm_state);
|
2016-10-18 18:27:43 -07:00
|
|
|
}
|
|
|
|
|
2016-12-04 15:51:03 -08:00
|
|
|
/**
 * Handle an object becoming available in the local Plasma store: cache it,
 * cancel any outstanding fetch request for it, promote waiting tasks whose
 * dependencies are now all satisfied, and try to dispatch.
 *
 * @param state The scheduler state.
 * @param algorithm_state The scheduling algorithm state.
 * @param object_id ID of the newly available object.
 * @return Void.
 */
void handle_object_available(local_scheduler_state *state,
                             scheduling_algorithm_state *algorithm_state,
                             object_id object_id) {
  /* Add the object to the local-object cache. Available object entries get
   * freed if the object is removed (see handle_object_removed). */
  /* NOTE(review): HASH_ADD does not check for an existing entry, so a second
   * notification for the same object ID would insert a duplicate entry --
   * presumably each object is reported at most once; confirm with callers. */
  available_object *entry =
      (available_object *) malloc(sizeof(available_object));
  entry->object_id = object_id;
  HASH_ADD(handle, algorithm_state->local_objects, object_id, sizeof(object_id),
           entry);

  /* If we were previously trying to fetch this object, remove the fetch request
   * from the hash table and cancel its periodic re-fetch timer. */
  fetch_object_request *fetch_req;
  HASH_FIND(hh, algorithm_state->fetch_requests, &object_id, sizeof(object_id),
            fetch_req);
  if (fetch_req != NULL) {
    HASH_DELETE(hh, algorithm_state->fetch_requests, fetch_req);
    CHECK(event_loop_remove_timer(state->loop, fetch_req->timer) == AE_OK);
    free(fetch_req);
  }

  /* Move any tasks whose object dependencies are now ready to the dispatch
   * queue. */
  /* TODO(swang): This can be optimized by keeping a lookup table from object
   * ID to list of dependent tasks in the waiting queue. */
  task_queue_entry *elt, *tmp;
  DL_FOREACH_SAFE(algorithm_state->waiting_task_queue, elt, tmp) {
    if (can_run(algorithm_state, elt->spec)) {
      LOG_DEBUG("Moved task to dispatch queue");
      DL_DELETE(algorithm_state->waiting_task_queue, elt);
      DL_APPEND(algorithm_state->dispatch_task_queue, elt);
    }
  }

  /* Try to dispatch tasks, since we may have added some from the waiting
   * queue. */
  dispatch_tasks(state, algorithm_state);
}
|
2016-12-12 23:17:22 -08:00
|
|
|
|
2017-01-18 20:27:40 -08:00
|
|
|
void handle_object_removed(local_scheduler_state *state,
|
|
|
|
object_id removed_object_id) {
|
2016-12-19 23:18:57 -08:00
|
|
|
scheduling_algorithm_state *algorithm_state = state->algorithm_state;
|
|
|
|
available_object *entry;
|
2017-01-18 20:27:40 -08:00
|
|
|
HASH_FIND(handle, algorithm_state->local_objects, &removed_object_id,
|
|
|
|
sizeof(removed_object_id), entry);
|
2016-12-19 23:18:57 -08:00
|
|
|
if (entry != NULL) {
|
|
|
|
HASH_DELETE(handle, algorithm_state->local_objects, entry);
|
2016-12-29 23:10:38 -08:00
|
|
|
free(entry);
|
2016-12-19 23:18:57 -08:00
|
|
|
}
|
2017-01-18 20:27:40 -08:00
|
|
|
|
|
|
|
/* Move dependent tasks from the dispatch queue back to the waiting queue. */
|
|
|
|
task_queue_entry *elt, *tmp;
|
|
|
|
DL_FOREACH_SAFE(algorithm_state->dispatch_task_queue, elt, tmp) {
|
|
|
|
task_spec *task = elt->spec;
|
|
|
|
int64_t num_args = task_num_args(task);
|
|
|
|
for (int i = 0; i < num_args; ++i) {
|
|
|
|
if (task_arg_type(task, i) == ARG_BY_REF) {
|
|
|
|
object_id arg_id = task_arg_id(task, i);
|
|
|
|
if (object_ids_equal(arg_id, removed_object_id)) {
|
|
|
|
LOG_DEBUG("Moved task from dispatch queue back to waiting queue");
|
|
|
|
DL_DELETE(algorithm_state->dispatch_task_queue, elt);
|
|
|
|
DL_APPEND(algorithm_state->waiting_task_queue, elt);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
/**
 * Count the tasks currently in the waiting queue (tasks whose object
 * dependencies are not yet available locally).
 *
 * @param algorithm_state The scheduling algorithm state.
 * @return The number of waiting tasks.
 */
int num_waiting_tasks(scheduling_algorithm_state *algorithm_state) {
  task_queue_entry *entry;
  int num_tasks;
  DL_COUNT(algorithm_state->waiting_task_queue, entry, num_tasks);
  return num_tasks;
}
|
|
|
|
|
2017-01-18 20:27:40 -08:00
|
|
|
/**
 * Count the tasks currently in the dispatch queue (tasks whose dependencies
 * are ready and that are only waiting for a worker).
 *
 * @param algorithm_state The scheduling algorithm state.
 * @return The number of dispatchable tasks.
 */
int num_dispatch_tasks(scheduling_algorithm_state *algorithm_state) {
  task_queue_entry *entry;
  int num_tasks;
  DL_COUNT(algorithm_state->dispatch_task_queue, entry, num_tasks);
  return num_tasks;
}
|