#include "photon_algorithm.h"

#include <stdbool.h>
#include <stdlib.h>

#include "utarray.h"

#include "state/task_log.h"
#include "photon.h"
#include "photon_scheduler.h"

/** An entry in the scheduler's cache of objects available locally. */
typedef struct {
  /** Object id of this object. */
  object_id object_id;
  /** Handle for the uthash table. */
  UT_hash_handle handle;
} available_object;

/** Part of the photon state that is maintained by the scheduling algorithm. */
struct scheduler_state {
  /** An array of pointers to tasks that are waiting to be scheduled. The
   *  array owns the task instances it points to. */
  UT_array *task_queue;
  /** An array of worker indices corresponding to clients that are
   *  waiting for tasks. */
  UT_array *available_workers;
  /** A hash map of the objects that are available in the local Plasma store.
   *  This information could be a little stale. */
  available_object *local_objects;
};

/**
 * Allocate and initialize an empty scheduler state.
 *
 * @return A newly allocated scheduler state with an empty task queue, an empty
 *         available-worker queue and an empty local object cache. Release it
 *         with free_scheduler_state.
 */
scheduler_state *make_scheduler_state(void) {
  scheduler_state *state = malloc(sizeof *state);
  CHECK(state != NULL);
  /* Initialize an empty hash map for the cache of local available objects. */
  state->local_objects = NULL;
  /* Initialize the local data structures used for queuing tasks and workers. */
  utarray_new(state->task_queue, &task_ptr_icd);
  utarray_new(state->available_workers, &ut_int_icd);
  return state;
}

/**
 * Free a scheduler state and everything it owns: the queued task instances,
 * both queues, and every entry of the local object cache.
 *
 * @param s The scheduler state to free.
 */
void free_scheduler_state(scheduler_state *s) {
  /* The task queue owns the task instances it points to. */
  for (task_instance **instance = (task_instance **) utarray_front(s->task_queue);
       instance != NULL;
       instance = (task_instance **) utarray_next(s->task_queue, instance)) {
    free(*instance);
  }
  utarray_free(s->task_queue);
  utarray_free(s->available_workers);
  /* Entries of the local object cache are heap-allocated when an object
   * becomes available. Delete each one from the table (which lets uthash
   * release its internal bookkeeping) and free it. */
  available_object *entry;
  available_object *tmp;
  HASH_ITER(handle, s->local_objects, entry, tmp) {
    HASH_DELETE(handle, s->local_objects, entry);
    free(entry);
  }
  free(s);
}

/**
 * Check if all of the remote object arguments for a task are available in the
 * local object store, according to the scheduler's cache of local objects
 * (which may be slightly stale).
 *
 * @param s The scheduler state.
 * @param task Task specification of the task to check.
 * @return true if every by-reference argument of the task is present in the
 *         local object cache, otherwise false.
*/ bool can_run(scheduler_state *s, task_spec *task) { int64_t num_args = task_num_args(task); for (int i = 0; i < num_args; ++i) { if (task_arg_type(task, i) == ARG_BY_REF) { object_id obj_id = *task_arg_id(task, i); available_object *entry; HASH_FIND(handle, s->local_objects, &obj_id, sizeof(object_id), entry); if (entry == NULL) { /* The object is not present locally, so this task cannot be scheduled * right now. */ return false; } } } return true; } /** * If there is a task whose dependencies are available locally, assign it to the * worker. This does not remove the worker from the available worker queue. * * @param s The scheduler state. * @param worker_index The index of the worker. * @return This returns 1 if it successfully assigned a task to the worker, * otherwise it returns 0. */ int find_and_schedule_task_if_possible(scheduler_info *info, scheduler_state *state, int worker_index) { int found_task_to_schedule = 0; /* Find the first task whose dependencies are available locally. */ task_spec *spec; task_instance **task; int i = 0; for (; i < utarray_len(state->task_queue); ++i) { task = (task_instance **) utarray_eltptr(state->task_queue, i); spec = task_instance_task_spec(*task); if (can_run(state, spec)) { found_task_to_schedule = 1; break; } } if (found_task_to_schedule) { /* This task's dependencies are available locally, so assign the task to the * worker. */ assign_task_to_worker(info, spec, worker_index); /* Update the task queue data structure and free the task. */ free(*task); utarray_erase(state->task_queue, i, 1); } return found_task_to_schedule; } void handle_task_submitted(scheduler_info *info, scheduler_state *s, task_spec *task) { /* Create a unique task instance ID. This is different from the task ID and * is used to distinguish between potentially multiple executions of the * task. 
*/ task_iid task_iid = globally_unique_id(); task_instance *instance = make_task_instance(task_iid, task, TASK_STATUS_WAITING, NIL_ID); /* If this task's dependencies are available locally, and if there is an * available worker, then assign this task to an available worker. Otherwise, * add this task to the local task queue. */ int schedule_locally = (utarray_len(s->available_workers) > 0) && can_run(s, task); if (schedule_locally) { /* Get the last available worker in the available worker queue. */ int *worker_index = (int *) utarray_back(s->available_workers); /* Tell the available worker to execute the task. */ assign_task_to_worker(info, task, *worker_index); /* Remove the available worker from the queue and free the struct. */ utarray_pop_back(s->available_workers); } else { /* Add the task to the task queue. This passes ownership of the task queue. * And the task will be freed when it is assigned to a worker. */ utarray_push_back(s->task_queue, &instance); } /* Submit the task to redis. */ task_log_add_task(info->db, instance); if (schedule_locally) { /* If the task was scheduled locally, we need to free it. Otherwise, * ownership of the task is passed to the task_queue, and it will be freed * when it is assigned to a worker. */ free(instance); } } void handle_worker_available(scheduler_info *info, scheduler_state *state, int worker_index) { int scheduled_task = find_and_schedule_task_if_possible(info, state, worker_index); /* If we couldn't find a task to schedule, add the worker to the queue of * available workers. */ if (!scheduled_task) { for (int *p = (int *) utarray_front(state->available_workers); p != NULL; p = (int *) utarray_next(state->available_workers, p)) { CHECK(*p != worker_index); } /* Add client_sock to a list of available workers. This struct will be freed * when a task is assigned to this worker. 
*/ utarray_push_back(state->available_workers, &worker_index); LOG_INFO("Adding worker_index %d to available workers.\n", worker_index); } } void handle_object_available(scheduler_info *info, scheduler_state *state, object_id object_id) { /* TODO(rkn): When does this get freed? */ available_object *entry = (available_object *) malloc(sizeof(available_object)); entry->object_id = object_id; HASH_ADD(handle, state->local_objects, object_id, sizeof(object_id), entry); /* Check if we can schedule any tasks. */ int num_tasks_scheduled = 0; for (int *p = (int *) utarray_front(state->available_workers); p != NULL; p = (int *) utarray_next(state->available_workers, p)) { /* Schedule a task on this worker if possible. */ int scheduled_task = find_and_schedule_task_if_possible(info, state, *p); if (!scheduled_task) { /* There are no tasks we can schedule, so exit the loop. */ break; } num_tasks_scheduled += 1; } utarray_erase(state->available_workers, 0, num_tasks_scheduled); }