Convert some local scheduler data structures to C++ STL. (#445)

* Convert more local scheduler data structures to C++ STL.

* Convert vector pointer to vector.

* Convert some of the UT_arrays to std::vector.

* Simplify worker vectors.

* Simplify remote_object and local_object containers.

* Change some unnecessary checks to DCHECK.
Robert Nishihara, 2017-04-10 21:02:36 -07:00 (committed by Stephanie Wang)
parent 6ffc849d23
commit fb4525f833
5 changed files with 239 additions and 355 deletions
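Taken together, the bullets above describe one mechanical refactoring: containers that were heap-allocated UT_arrays (each needing a UT_icd element descriptor, explicit utarray_new/utarray_free calls, and cast-heavy access) become std::vector members whose storage is managed automatically, with the owning structs switching from malloc/free to new/delete so that the vectors' constructors and destructors actually run. A minimal before-and-after sketch of the pattern, using a hypothetical Widget type rather than any struct from this commit:

    #include <algorithm>
    #include <vector>

    struct Widget { int id; };

    /* Before (utarray style):
     *   UT_icd widget_icd = {sizeof(Widget *), NULL, NULL, NULL};
     *   UT_array *widgets;
     *   utarray_new(widgets, &widget_icd);
     *   utarray_push_back(widgets, &w);
     *   ...
     *   utarray_free(widgets);
     */

    /* After: the vector owns its storage, and <algorithm> replaces
     * hand-written search loops. */
    bool contains(const std::vector<Widget *> &widgets, Widget *w) {
      return std::find(widgets.begin(), widgets.end(), w) != widgets.end();
    }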


@@ -22,10 +22,8 @@
 #include "state/object_table.h"
 #include "state/error_table.h"
 #include "utarray.h"
-#include "uthash.h"

 UT_icd task_ptr_icd = {sizeof(Task *), NULL, NULL, NULL};
-UT_icd workers_icd = {sizeof(LocalSchedulerClient *), NULL, NULL, NULL};
 UT_icd pid_t_icd = {sizeof(pid_t), NULL, NULL, NULL};
@@ -63,7 +61,7 @@ int force_kill_worker(event_loop *loop, timer_id id, void *context) {
   LocalSchedulerClient *worker = (LocalSchedulerClient *) context;
   kill(worker->pid, SIGKILL);
   close(worker->sock);
-  free(worker);
+  delete worker;
   return EVENT_LOOP_TIMER_DONE;
 }
@@ -78,28 +76,24 @@ int force_kill_worker(event_loop *loop, timer_id id, void *context) {
  * to clean up its own state.
  * @return Void.
  */
-void kill_worker(LocalSchedulerClient *worker, bool cleanup) {
+void kill_worker(LocalSchedulerState *state,
+                 LocalSchedulerClient *worker,
+                 bool cleanup) {
   /* Erase the local scheduler's reference to the worker. */
-  LocalSchedulerState *state = worker->local_scheduler_state;
-  int num_workers = utarray_len(state->workers);
-  for (int i = 0; i < utarray_len(state->workers); ++i) {
-    LocalSchedulerClient *active_worker =
-        *(LocalSchedulerClient **) utarray_eltptr(state->workers, i);
-    if (active_worker == worker) {
-      utarray_erase(state->workers, i, 1);
-    }
-  }
-  /* Make sure that we erased exactly 1 worker. */
-  CHECKM(!(utarray_len(state->workers) < num_workers - 1),
-         "Found duplicate workers");
-  CHECKM(utarray_len(state->workers) != num_workers,
-         "Tried to kill worker that doesn't exist");
+  auto it = std::find(state->workers.begin(), state->workers.end(), worker);
+  CHECK(it != state->workers.end());
+  state->workers.erase(it);
+
+  /* Make sure that we removed the worker. */
+  it = std::find(state->workers.begin(), state->workers.end(), worker);
+  CHECK(it == state->workers.end());

   /* Erase the algorithm state's reference to the worker. */
   handle_worker_removed(state, state->algorithm_state, worker);

   /* Remove the client socket from the event loop so that we don't process the
    * SIGPIPE when the worker is killed. */
-  event_loop_remove_file(worker->local_scheduler_state->loop, worker->sock);
+  event_loop_remove_file(state->loop, worker->sock);

   /* If the worker has registered a process ID with us and it's a child
    * process, use it to send a kill signal. */
@@ -154,7 +148,7 @@ void kill_worker(LocalSchedulerClient *worker, bool cleanup) {
     /* Clean up the client socket after killing the worker so that the worker
      * can't receive the SIGPIPE before exiting. */
     close(worker->sock);
-    free(worker);
+    delete worker;
   }
 }
@@ -165,24 +159,20 @@ void LocalSchedulerState_free(LocalSchedulerState *state) {
   signal(SIGTERM, SIG_DFL);

   /* Kill any child processes that didn't register as a worker yet. */
-  pid_t *worker_pid;
-  for (worker_pid = (pid_t *) utarray_front(state->child_pids);
-       worker_pid != NULL;
-       worker_pid = (pid_t *) utarray_next(state->child_pids, worker_pid)) {
-    kill(*worker_pid, SIGKILL);
-    waitpid(*worker_pid, NULL, 0);
-    LOG_DEBUG("Killed pid %d", *worker_pid);
+  for (auto const &worker_pid : state->child_pids) {
+    kill(worker_pid, SIGKILL);
+    waitpid(worker_pid, NULL, 0);
+    LOG_INFO("Killed worker pid %d which hadn't started yet.", worker_pid);
   }
-  utarray_free(state->child_pids);

   /* Kill any registered workers. */
   /* TODO(swang): It's possible that the local scheduler will exit before all
    * of its task table updates make it to redis. */
-  for (LocalSchedulerClient **worker =
-           (LocalSchedulerClient **) utarray_front(state->workers);
-       worker != NULL;
-       worker = (LocalSchedulerClient **) utarray_front(state->workers)) {
-    kill_worker(*worker, true);
+  while (state->workers.size() > 0) {
+    /* Note that kill_worker modifies the container state->workers, so it is
+     * important to do this loop in a way that does not use invalidated
+     * iterators. */
+    kill_worker(state, state->workers.back(), true);
   }

   /* Disconnect from plasma. */
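The while loop in this hunk deliberately avoids iterators: kill_worker erases the worker from state->workers, so a range-for or any saved iterator would be invalidated mid-loop. Re-reading back() and the size on every pass is safe no matter how the callee mutates the vector. A self-contained sketch of the idiom (Worker and release are illustrative names, not from this commit):

    #include <algorithm>
    #include <vector>

    struct Worker { int id; };

    /* Erases one element from the vector as a side effect, the way
     * kill_worker mutates state->workers. */
    void release(std::vector<Worker *> &workers, Worker *w) {
      workers.erase(std::find(workers.begin(), workers.end(), w));
      delete w;
    }

    void release_all(std::vector<Worker *> &workers) {
      /* Safe: each iteration re-reads the container after the mutation. */
      while (!workers.empty()) {
        release(workers, workers.back());
      }
    }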
@@ -208,11 +198,6 @@ void LocalSchedulerState_free(LocalSchedulerState *state) {
     state->config.start_worker_command = NULL;
   }

-  /* Free the list of workers and any tasks that are still in progress on those
-   * workers. */
-  utarray_free(state->workers);
-  state->workers = NULL;
-
   /* Free the mapping from the actor ID to the ID of the local scheduler
    * responsible for that actor. */
   actor_map_entry *current_actor_map_entry, *temp_actor_map_entry;
@@ -231,8 +216,9 @@ void LocalSchedulerState_free(LocalSchedulerState *state) {
   /* Destroy the event loop. */
   event_loop_destroy(state->loop);
   state->loop = NULL;
+
   /* Free the scheduler state. */
-  free(state);
+  delete state;
 }

 /**
@@ -250,7 +236,7 @@ void start_worker(LocalSchedulerState *state, ActorID actor_id) {
   /* Launch the process to create the worker. */
   pid_t pid = fork();
   if (pid != 0) {
-    utarray_push_back(state->child_pids, &pid);
+    state->child_pids.push_back(pid);
     LOG_INFO("Started worker with pid %d", pid);
     return;
   }
@@ -332,8 +318,7 @@ LocalSchedulerState *LocalSchedulerState_init(
     const double static_resource_conf[],
     const char *start_worker_command,
     int num_workers) {
-  LocalSchedulerState *state =
-      (LocalSchedulerState *) malloc(sizeof(LocalSchedulerState));
+  LocalSchedulerState *state = new LocalSchedulerState();

   /* Set the configuration struct for the local scheduler. */
   if (start_worker_command != NULL) {
     state->config.start_worker_command = parse_command(start_worker_command);
@@ -348,8 +333,7 @@ LocalSchedulerState *LocalSchedulerState_init(
   state->config.global_scheduler_exists = global_scheduler_exists;

   state->loop = loop;
-  /* Initialize the list of workers. */
-  utarray_new(state->workers, &workers_icd);
+
   /* Initialize the hash table mapping actor ID to the ID of the local scheduler
    * that is responsible for that actor. */
   state->actor_mapping = NULL;
@@ -418,7 +402,6 @@ LocalSchedulerState *LocalSchedulerState_init(
   print_resource_info(state, NULL);

   /* Start the initial set of workers. */
-  utarray_new(state->child_pids, &pid_t_icd);
   for (int i = 0; i < num_workers; ++i) {
     start_worker(state, NIL_ACTOR_ID);
   }
@@ -663,7 +646,7 @@ void send_client_register_reply(LocalSchedulerState *state,
           fbb.GetSize(), fbb.GetBufferPointer()) < 0) {
     if (errno == EPIPE || errno == EBADF || errno == ECONNRESET) {
       /* Something went wrong, so kill the worker. */
-      kill_worker(worker, false);
+      kill_worker(state, worker, false);
       LOG_WARN(
           "Failed to give send register client reply to worker on fd %d. The "
           "client may have hung up.",
@@ -682,7 +665,7 @@ void handle_client_register(LocalSchedulerState *state,
   if (message->is_worker()) {
     /* Update the actor mapping with the actor ID of the worker (if an actor is
      * running on the worker). */
-    int64_t worker_pid = message->worker_pid();
+    worker->pid = message->worker_pid();
     ActorID actor_id = from_flatbuf(message->actor_id());
     if (!ActorID_equal(actor_id, NIL_ACTOR_ID)) {
       /* Make sure that the local scheduler is aware that it is responsible for
@@ -702,30 +685,25 @@
     }

     /* Register worker process id with the scheduler. */
-    worker->pid = worker_pid;
     /* Determine if this worker is one of our child processes. */
-    LOG_DEBUG("PID is %d", worker_pid);
-    pid_t *child_pid;
-    int index = 0;
-    for (child_pid = (pid_t *) utarray_front(state->child_pids);
-         child_pid != NULL;
-         child_pid = (pid_t *) utarray_next(state->child_pids, child_pid)) {
-      if (*child_pid == worker_pid) {
-        /* If this worker is one of our child processes, mark it as a child so
-         * that we know that we can wait for the process to exit during
-         * cleanup. */
-        worker->is_child = true;
-        utarray_erase(state->child_pids, index, 1);
-        LOG_DEBUG("Found matching child pid %d", worker_pid);
-        break;
-      }
-      ++index;
+    LOG_DEBUG("PID is %d", worker->pid);
+    auto it = std::find(state->child_pids.begin(), state->child_pids.end(),
+                        worker->pid);
+    if (it != state->child_pids.end()) {
+      /* If this worker is one of our child processes, mark it as a child so
+       * that we know that we can wait for the process to exit during
+       * cleanup. */
+      worker->is_child = true;
+      state->child_pids.erase(it);
+      LOG_DEBUG("Found matching child pid %d", worker->pid);
     }
   } else {
     /* Register the driver. Currently we don't do anything here. */
   }
 }

+/* End of the cleanup code. */
+
 void process_message(event_loop *loop,
                      int client_sock,
                      void *context,
@@ -822,7 +800,7 @@ void process_message(event_loop *loop,
   } break;
   case DISCONNECT_CLIENT: {
     LOG_INFO("Disconnecting client on fd %d", client_sock);
-    kill_worker(worker, false);
+    kill_worker(state, worker, false);
     if (!ActorID_equal(worker->actor_id, NIL_ACTOR_ID)) {
       /* Let the scheduling algorithm process the absence of this worker. */
       handle_actor_worker_disconnect(state, state->algorithm_state,
@@ -861,8 +839,7 @@ void new_client_connection(event_loop *loop,
   int new_socket = accept_client(listener_sock);
   /* Create a struct for this worker. This will be freed when we free the local
    * scheduler state. */
-  LocalSchedulerClient *worker =
-      (LocalSchedulerClient *) malloc(sizeof(LocalSchedulerClient));
+  LocalSchedulerClient *worker = new LocalSchedulerClient();
   worker->sock = new_socket;
   worker->task_in_progress = NULL;
   worker->is_blocked = false;
@@ -870,7 +847,7 @@
   worker->is_child = false;
   worker->actor_id = NIL_ACTOR_ID;
   worker->local_scheduler_state = state;
-  utarray_push_back(state->workers, &worker);
+  state->workers.push_back(worker);
   event_loop_add_file(loop, new_socket, EVENT_LOOP_READ, process_message,
                       worker);
   LOG_DEBUG("new connection with fd %d", new_socket);
@@ -894,18 +871,18 @@ void signal_handler(int signal) {
   }
 }

-/* End of the cleanup code. */
-
-void handle_task_scheduled_callback(Task *original_task, void *user_context) {
+void handle_task_scheduled_callback(Task *original_task,
+                                    void *subscribe_context) {
+  LocalSchedulerState *state = (LocalSchedulerState *) subscribe_context;
   TaskSpec *spec = Task_task_spec(original_task);
   if (ActorID_equal(TaskSpec_actor_id(spec), NIL_ACTOR_ID)) {
     /* This task does not involve an actor. Handle it normally. */
-    handle_task_scheduled(g_state, g_state->algorithm_state, spec,
+    handle_task_scheduled(state, state->algorithm_state, spec,
                           Task_task_spec_size(original_task));
   } else {
     /* This task involves an actor. Call the scheduling algorithm's actor
      * handler. */
-    handle_actor_task_scheduled(g_state, g_state->algorithm_state, spec,
+    handle_actor_task_scheduled(state, state->algorithm_state, spec,
                                 Task_task_spec_size(original_task));
   }
 }
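This hunk replaces uses of the global g_state with a state pointer recovered from the subscription's context argument, so the callback no longer depends on a global being set. The general shape of threading typed state through a void * context, sketched with hypothetical names:

    #include <cstdio>

    struct State { const char *name; };

    typedef void (*task_callback)(int task_id, void *context);

    /* A stand-in for an API like task_table_subscribe: it stores the
     * context and hands it back to the callback verbatim. */
    void subscribe(task_callback cb, void *context, int task_id) {
      cb(task_id, context); /* Pretend the event fires here. */
    }

    void on_task(int task_id, void *context) {
      State *state = (State *) context; /* Recover the typed state. */
      std::printf("%s handling task %d\n", state->name, task_id);
    }

    int main() {
      State s = {"local_scheduler"};
      subscribe(on_task, &s, 42); /* No global required. */
    }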
@@ -992,7 +969,7 @@ void start_server(const char *node_ip_address,
   if (g_state->db != NULL) {
     task_table_subscribe(g_state->db, get_db_client_id(g_state->db),
                          TASK_STATUS_SCHEDULED, handle_task_scheduled_callback,
-                         NULL, NULL, NULL, NULL);
+                         g_state, NULL, NULL, NULL);
   }
   /* Subscribe to notifications about newly created actors. */
   if (g_state->db != NULL) {


@@ -72,12 +72,15 @@ void print_resource_info(const LocalSchedulerState *s, const TaskSpec *spec);
 /**
  * Kill a worker.
  *
+ * @param state The local scheduler state.
  * @param worker The local scheduler client to kill.
  * @param wait A boolean representing whether to wait for the killed worker to
  *        exit.
  * @param Void.
  */
-void kill_worker(LocalSchedulerClient *worker, bool wait);
+void kill_worker(LocalSchedulerState *state,
+                 LocalSchedulerClient *worker,
+                 bool wait);

 /**
  * Start a worker. This forks a new worker process that can be added to the


@@ -1,10 +1,11 @@
 #include "local_scheduler_algorithm.h"

 #include <stdbool.h>
-#include "utarray.h"

 #include <list>
 #include <vector>
+#include <unordered_map>

-#include "utarray.h"
 #include "utlist.h"

 #include "state/task_table.h"
 #include "state/local_scheduler_table.h"
@@ -23,29 +24,25 @@ struct TaskQueueEntry {
 };

 /** A data structure used to track which objects are available locally and
- * which objects are being actively fetched. */
-typedef struct {
+ * which objects are being actively fetched. Objects of this type are used for
+ * both the scheduling algorithm state's local_objects and remote_objects
+ * tables. An ObjectEntry should be in at most one of the tables and not both
+ * simultaneously. */
+struct ObjectEntry {
   /** Object id of this object. */
   ObjectID object_id;
   /** A vector of tasks dependent on this object. These tasks are a subset of
    * the tasks in the waiting queue. Each element actually stores a reference
    * to the corresponding task's queue entry in waiting queue, for fast
    * deletion when all of the task's dependencies become available. */
-  std::vector<std::list<TaskQueueEntry>::iterator> *dependent_tasks;
-  /** Handle for the uthash table. NOTE: This handle is used for both the
-   * scheduling algorithm state's local_objects and remote_objects tables.
-   * We must enforce the uthash invariant that the entry be in at most one of
-   * the tables. */
-  UT_hash_handle hh;
-} object_entry;
+  std::vector<std::list<TaskQueueEntry>::iterator> dependent_tasks;
+};

 /** This is used to define the queue of actor task specs for which the
  * corresponding local scheduler is unknown. */
 UT_icd task_spec_icd = {sizeof(TaskSpec *), NULL, NULL, NULL};
 /** This is used to keep track of task spec sizes in the above queue. */
 UT_icd task_spec_size_icd = {sizeof(int64_t), NULL, NULL, NULL};
-/** This is used to define the queue of available workers. */
-UT_icd worker_icd = {sizeof(LocalSchedulerClient *), NULL, NULL, NULL};

 /** This struct contains information about a specific actor. This struct will be
  * used inside of a hash table. */
@@ -92,25 +89,25 @@ struct SchedulingAlgorithmState {
   /** An array of pointers to workers in the worker pool. These are workers
    * that have registered a PID with us and that are now waiting to be
    * assigned a task to execute. */
-  UT_array *available_workers;
+  std::vector<LocalSchedulerClient *> available_workers;
   /** An array of pointers to workers that are currently executing a task,
    * unblocked. These are the workers that are leasing some number of
    * resources. */
-  UT_array *executing_workers;
+  std::vector<LocalSchedulerClient *> executing_workers;
   /** An array of pointers to workers that are currently executing a task,
    * blocked on some object(s) that isn't available locally yet. These are the
    * workers that are executing a task, but that have temporarily returned the
    * task's required resources. */
-  UT_array *blocked_workers;
+  std::vector<LocalSchedulerClient *> blocked_workers;
   /** A hash map of the objects that are available in the local Plasma store.
    * The key is the object ID. This information could be a little stale. */
-  object_entry *local_objects;
+  std::unordered_map<ObjectID, ObjectEntry, UniqueIDHasher> local_objects;
   /** A hash map of the objects that are not available locally. These are
    * currently being fetched by this local scheduler. The key is the object
    * ID. Every LOCAL_SCHEDULER_FETCH_TIMEOUT_MILLISECONDS, a Plasma fetch
    * request will be sent the object IDs in this table. Each entry also holds
    * an array of queued tasks that are dependent on it. */
-  object_entry *remote_objects;
+  std::unordered_map<ObjectID, ObjectEntry, UniqueIDHasher> remote_objects;
 };

 TaskQueueEntry TaskQueueEntry_init(TaskSpec *spec, int64_t task_spec_size) {
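std::unordered_map requires a hash functor for its key type; the new tables name UniqueIDHasher, whose definition lies outside this diff. As an assumption about its likely shape (not the commit's actual code), a hasher for a fixed-size binary ID could simply reuse the ID's leading bytes, since such IDs are already uniformly distributed:

    #include <cstddef>
    #include <cstring>
    #include <unordered_map>

    struct UniqueID { unsigned char id[20]; }; /* Hypothetical 20-byte ID. */

    bool operator==(const UniqueID &a, const UniqueID &b) {
      return std::memcmp(a.id, b.id, sizeof(a.id)) == 0;
    }

    struct UniqueIDHasher {
      std::size_t operator()(const UniqueID &key) const {
        std::size_t h;
        std::memcpy(&h, key.id, sizeof(h)); /* First bytes as the hash. */
        return h;
      }
    };

    struct ObjectEntryStub { int n; };
    std::unordered_map<UniqueID, ObjectEntryStub, UniqueIDHasher> table;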
@@ -126,12 +123,7 @@ void TaskQueueEntry_free(TaskQueueEntry *entry) {
 }

 SchedulingAlgorithmState *SchedulingAlgorithmState_init(void) {
-  SchedulingAlgorithmState *algorithm_state =
-      (SchedulingAlgorithmState *) malloc(sizeof(SchedulingAlgorithmState));
-  /* Initialize an empty hash map for the cache of local available objects. */
-  algorithm_state->local_objects = NULL;
-  /* Initialize the hash table of objects being fetched. */
-  algorithm_state->remote_objects = NULL;
+  SchedulingAlgorithmState *algorithm_state = new SchedulingAlgorithmState();
   /* Initialize the local data structures used for queuing tasks and workers. */
   algorithm_state->waiting_task_queue = new std::list<TaskQueueEntry>();
   algorithm_state->dispatch_task_queue = new std::list<TaskQueueEntry>();
@@ -142,9 +134,6 @@ SchedulingAlgorithmState *SchedulingAlgorithmState_init(void) {
   algorithm_state->local_actor_infos = NULL;

-  utarray_new(algorithm_state->available_workers, &worker_icd);
-  utarray_new(algorithm_state->executing_workers, &worker_icd);
-  utarray_new(algorithm_state->blocked_workers, &worker_icd);
-
   return algorithm_state;
 }
@@ -178,36 +167,50 @@ void SchedulingAlgorithmState_free(SchedulingAlgorithmState *algorithm_state) {
   }
   utarray_free(algorithm_state->cached_submitted_actor_tasks);
   utarray_free(algorithm_state->cached_submitted_actor_task_sizes);
-  /* Free the list of available workers. */
-  utarray_free(algorithm_state->available_workers);
-  utarray_free(algorithm_state->executing_workers);
-  utarray_free(algorithm_state->blocked_workers);
-  /* Free the cached information about which objects are present locally. */
-  object_entry *obj_entry, *tmp_obj_entry;
-  HASH_ITER(hh, algorithm_state->local_objects, obj_entry, tmp_obj_entry) {
-    HASH_DELETE(hh, algorithm_state->local_objects, obj_entry);
-    /* The check below is commented out because it could fail if
-     * SchedulingAlgorithmState_free is called in response to a SIGINT which
-     * arrives in the middle of handle_object_available. */
-    /* CHECK(obj_entry->dependent_tasks->empty()); */
-    delete obj_entry->dependent_tasks;
-    free(obj_entry);
-  }
-  /* Free the cached information about which objects are currently being
-   * fetched. */
-  HASH_ITER(hh, algorithm_state->remote_objects, obj_entry, tmp_obj_entry) {
-    HASH_DELETE(hh, algorithm_state->remote_objects, obj_entry);
-    delete obj_entry->dependent_tasks;
-    free(obj_entry);
-  }
   /* Free the algorithm state. */
-  free(algorithm_state);
+  delete algorithm_state;
 }

+/**
+ * This is a helper method to check if a worker is in a vector of workers.
+ *
+ * @param worker_vector A vector of workers.
+ * @param The worker to look for in the vector.
+ * @return True if the worker is in the vector and false otherwise.
+ */
+bool worker_in_vector(std::vector<LocalSchedulerClient *> &worker_vector,
+                      LocalSchedulerClient *worker) {
+  auto it = std::find(worker_vector.begin(), worker_vector.end(), worker);
+  return it != worker_vector.end();
+}
+
+/**
+ * This is a helper method to remove a worker from a vector of workers if it is
+ * present in the vector.
+ *
+ * @param worker_vector A vector of workers.
+ * @param The worker to remove.
+ * @return True if the worker was removed and false otherwise.
+ */
+bool remove_worker_from_vector(
+    std::vector<LocalSchedulerClient *> &worker_vector,
+    LocalSchedulerClient *worker) {
+  /* Find the worker in the list of executing workers. */
+  auto it = std::find(worker_vector.begin(), worker_vector.end(), worker);
+  bool remove_worker = (it != worker_vector.end());
+  if (remove_worker) {
+    /* Remove the worker from the list of workers. */
+    using std::swap;
+    swap(*it, worker_vector.back());
+    worker_vector.pop_back();
+  }
+  return remove_worker;
+}
+
 void provide_scheduler_info(LocalSchedulerState *state,
                             SchedulingAlgorithmState *algorithm_state,
                             LocalSchedulerInfo *info) {
-  info->total_num_workers = utarray_len(state->workers);
+  info->total_num_workers = state->workers.size();
   /* TODO(swang): Provide separate counts for tasks that are waiting for
    * dependencies vs tasks that are waiting to be assigned. */
   int64_t waiting_task_queue_length =
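remove_worker_from_vector uses the swap-with-back idiom rather than vector::erase: erase would shift every later element down, while swapping the found element with the last one and popping is O(1). That is only valid because none of the worker vectors are order-sensitive. The idiom in standalone form (an illustrative generic version, not this commit's code):

    #include <algorithm>
    #include <utility>
    #include <vector>

    template <typename T>
    bool swap_and_pop(std::vector<T> &v, const T &value) {
      auto it = std::find(v.begin(), v.end(), value);
      if (it == v.end()) {
        return false;
      }
      std::swap(*it, v.back()); /* O(1): move the last element into the hole. */
      v.pop_back();             /* Drop the now-duplicate tail. */
      return true;
    }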
@@ -216,7 +219,7 @@ void provide_scheduler_info(LocalSchedulerState *state,
       algorithm_state->dispatch_task_queue->size();
   info->task_queue_length =
       waiting_task_queue_length + dispatch_task_queue_length;
-  info->available_workers = utarray_len(algorithm_state->available_workers);
+  info->available_workers = algorithm_state->available_workers.size();
   /* Copy static and dynamic resource information. */
   for (int i = 0; i < ResourceIndex_MAX; i++) {
     info->dynamic_resources[i] = state->dynamic_resources[i];
@@ -463,10 +466,7 @@ void fetch_missing_dependency(LocalSchedulerState *state,
                               SchedulingAlgorithmState *algorithm_state,
                               std::list<TaskQueueEntry>::iterator task_entry_it,
                               ObjectID obj_id) {
-  object_entry *entry;
-  HASH_FIND(hh, algorithm_state->remote_objects, &obj_id, sizeof(obj_id),
-            entry);
-  if (entry == NULL) {
+  if (algorithm_state->remote_objects.count(obj_id) == 0) {
     /* We weren't actively fetching this object. Try the fetch once
      * immediately. */
     if (plasma_manager_is_connected(state->plasma_conn)) {
@@ -477,14 +477,12 @@ void fetch_missing_dependency(LocalSchedulerState *state,
      * hash table of locally available objects in handle_object_available when
      * the object becomes available locally. It will get freed if the object is
      * subsequently removed locally. */
-    entry = (object_entry *) malloc(sizeof(object_entry));
-    entry->object_id = obj_id;
-    entry->dependent_tasks =
-        new std::vector<std::list<TaskQueueEntry>::iterator>();
-    HASH_ADD(hh, algorithm_state->remote_objects, object_id,
-             sizeof(entry->object_id), entry);
+    ObjectEntry entry;
+    entry.object_id = obj_id;
+    algorithm_state->remote_objects[obj_id] = entry;
   }
-  entry->dependent_tasks->push_back(task_entry_it);
+  algorithm_state->remote_objects[obj_id].dependent_tasks.push_back(
+      task_entry_it);
 }

 /**
@@ -507,10 +505,7 @@ void fetch_missing_dependencies(
   for (int i = 0; i < num_args; ++i) {
     if (TaskSpec_arg_by_ref(task, i)) {
       ObjectID obj_id = TaskSpec_arg_id(task, i);
-      object_entry *entry;
-      HASH_FIND(hh, algorithm_state->local_objects, &obj_id, sizeof(obj_id),
-                entry);
-      if (entry == NULL) {
+      if (algorithm_state->local_objects.count(obj_id) == 0) {
         /* If the entry is not yet available locally, record the dependency. */
         fetch_missing_dependency(state, algorithm_state, task_entry_it, obj_id);
         ++num_missing_dependencies;
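A note on the map idioms these hunks lean on: count(key) tests membership without modifying the table, whereas operator[] default-constructs and inserts a value when the key is absent, which is exactly how fetch_missing_dependency materializes a fresh entry. In isolation (hypothetical key and value types):

    #include <string>
    #include <unordered_map>
    #include <vector>

    int main() {
      std::unordered_map<std::string, std::vector<int>> deps;

      bool absent = (deps.count("obj") == 0); /* No insertion happens here. */
      /* operator[] inserts a default-constructed vector on first access. */
      deps["obj"].push_back(7);
      return (absent && deps["obj"].size() == 1) ? 0 : 1;
    }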
@@ -535,10 +530,7 @@ bool can_run(SchedulingAlgorithmState *algorithm_state, TaskSpec *task) {
   for (int i = 0; i < num_args; ++i) {
     if (TaskSpec_arg_by_ref(task, i)) {
       ObjectID obj_id = TaskSpec_arg_id(task, i);
-      object_entry *entry;
-      HASH_FIND(hh, algorithm_state->local_objects, &obj_id, sizeof(obj_id),
-                entry);
-      if (entry == NULL) {
+      if (algorithm_state->local_objects.count(obj_id) == 0) {
         /* The object is not present locally, so this task cannot be scheduled
          * right now. */
         return false;
@@ -558,15 +550,14 @@ int fetch_object_timeout_handler(event_loop *loop, timer_id id, void *context) {
   }

   /* Allocate a buffer to hold all the object IDs for active fetch requests. */
-  int num_object_ids = HASH_COUNT(state->algorithm_state->remote_objects);
+  int num_object_ids = state->algorithm_state->remote_objects.size();
   ObjectID *object_ids = (ObjectID *) malloc(num_object_ids * sizeof(ObjectID));

   /* Fill out the request with the object IDs for active fetches. */
-  object_entry *fetch_request, *tmp;
   int i = 0;
-  HASH_ITER(hh, state->algorithm_state->remote_objects, fetch_request, tmp) {
-    object_ids[i] = fetch_request->object_id;
-    ++i;
+  for (auto const &entry : state->algorithm_state->remote_objects) {
+    object_ids[i] = entry.second.object_id;
+    i++;
   }
   plasma_fetch(state->plasma_conn, num_object_ids, object_ids);
   for (int i = 0; i < num_object_ids; ++i) {
@@ -592,8 +583,8 @@ void dispatch_tasks(LocalSchedulerState *state,
     /* If there is a task to assign, but there are no more available workers in
      * the worker pool, then exit. Ensure that there will be an available
      * worker during a future invocation of dispatch_tasks. */
-    if (utarray_len(algorithm_state->available_workers) == 0) {
-      if (utarray_len(state->child_pids) == 0) {
+    if (algorithm_state->available_workers.size() == 0) {
+      if (state->child_pids.size() == 0) {
         /* If there are no workers, including those pending PID registration,
          * then we must start a new one to replenish the worker pool. */
         start_worker(state, NIL_ACTOR_ID);
@@ -632,14 +623,13 @@ void dispatch_tasks(LocalSchedulerState *state,
     /* Dispatch this task to an available worker and dequeue the task. */
     LOG_DEBUG("Dispatching task");
     /* Get the last available worker in the available worker queue. */
-    LocalSchedulerClient **worker = (LocalSchedulerClient **) utarray_back(
-        algorithm_state->available_workers);
+    LocalSchedulerClient *worker = algorithm_state->available_workers.back();
     /* Tell the available worker to execute the task. */
-    assign_task_to_worker(state, task.spec, task.task_spec_size, *worker);
+    assign_task_to_worker(state, task.spec, task.task_spec_size, worker);
     /* Remove the worker from the available queue, and add it to the executing
      * workers. */
-    utarray_pop_back(algorithm_state->available_workers);
-    utarray_push_back(algorithm_state->executing_workers, worker);
+    algorithm_state->available_workers.pop_back();
+    algorithm_state->executing_workers.push_back(worker);
     print_resource_info(state, task.spec);
     /* Free the task queue entry. */
     TaskQueueEntry_free(&task);
@@ -845,7 +835,7 @@ void handle_task_submitted(LocalSchedulerState *state,
  * dispatch queue and trigger task dispatch. Otherwise, pass the task along to
  * the global scheduler if there is one. */
   if (resource_constraints_satisfied(state, spec) &&
-      (utarray_len(algorithm_state->available_workers) > 0) &&
+      (algorithm_state->available_workers.size() > 0) &&
       can_run(algorithm_state, spec)) {
     queue_dispatch_task(state, algorithm_state, spec, task_spec_size, false);
   } else {
@@ -972,38 +962,20 @@ void handle_worker_available(LocalSchedulerState *state,
                              LocalSchedulerClient *worker) {
   CHECK(worker->task_in_progress == NULL);
   /* Check that the worker isn't in the pool of available workers. */
-  for (LocalSchedulerClient **p = (LocalSchedulerClient **) utarray_front(
-           algorithm_state->available_workers);
-       p != NULL; p = (LocalSchedulerClient **) utarray_next(
-                      algorithm_state->available_workers, p)) {
-    DCHECK(*p != worker);
-  }
+  DCHECK(!worker_in_vector(algorithm_state->available_workers, worker));
+
   /* Check that the worker isn't in the list of blocked workers. */
-  for (LocalSchedulerClient **p = (LocalSchedulerClient **) utarray_front(
-           algorithm_state->blocked_workers);
-       p != NULL; p = (LocalSchedulerClient **) utarray_next(
-                      algorithm_state->blocked_workers, p)) {
-    DCHECK(*p != worker);
-  }
+  DCHECK(!worker_in_vector(algorithm_state->blocked_workers, worker));
+
   /* If the worker was executing a task, it must have finished, so remove it
-   * from the list of executing workers. */
-  for (int i = 0; i < utarray_len(algorithm_state->executing_workers); ++i) {
-    LocalSchedulerClient **p = (LocalSchedulerClient **) utarray_eltptr(
-        algorithm_state->executing_workers, i);
-    if (*p == worker) {
-      utarray_erase(algorithm_state->executing_workers, i, 1);
-      break;
-    }
-  }
-  /* Check that we actually erased the worker. */
-  for (int i = 0; i < utarray_len(algorithm_state->executing_workers); ++i) {
-    LocalSchedulerClient **p = (LocalSchedulerClient **) utarray_eltptr(
-        algorithm_state->executing_workers, i);
-    DCHECK(*p != worker);
-  }
+   * from the list of executing workers. If the worker is connecting for the
+   * first time, it will not be in the list of executing workers. */
+  remove_worker_from_vector(algorithm_state->executing_workers, worker);
+  /* Double check that we successfully removed the worker. */
+  DCHECK(!worker_in_vector(algorithm_state->executing_workers, worker));
+
   /* Add worker to the list of available workers. */
-  utarray_push_back(algorithm_state->available_workers, &worker);
+  algorithm_state->available_workers.push_back(worker);

   /* Try to dispatch tasks, since we now have available workers to assign them
    * to. */
@@ -1014,44 +986,31 @@ void handle_worker_removed(LocalSchedulerState *state,
                            SchedulingAlgorithmState *algorithm_state,
                            LocalSchedulerClient *worker) {
   /* Make sure that we remove the worker at most once. */
-  bool removed = false;
-  int64_t num_workers;
+  int num_times_removed = 0;

   /* Remove the worker from available workers, if it's there. */
-  num_workers = utarray_len(algorithm_state->available_workers);
-  for (int64_t i = num_workers - 1; i >= 0; --i) {
-    LocalSchedulerClient **p = (LocalSchedulerClient **) utarray_eltptr(
-        algorithm_state->available_workers, i);
-    DCHECK(!((*p == worker) && removed));
-    if (*p == worker) {
-      utarray_erase(algorithm_state->available_workers, i, 1);
-      removed = true;
-    }
-  }
+  bool removed_from_available =
+      remove_worker_from_vector(algorithm_state->available_workers, worker);
+  num_times_removed += removed_from_available;
+  /* Double check that we actually removed the worker. */
+  DCHECK(!worker_in_vector(algorithm_state->available_workers, worker));

   /* Remove the worker from executing workers, if it's there. */
-  num_workers = utarray_len(algorithm_state->executing_workers);
-  for (int64_t i = num_workers - 1; i >= 0; --i) {
-    LocalSchedulerClient **p = (LocalSchedulerClient **) utarray_eltptr(
-        algorithm_state->executing_workers, i);
-    DCHECK(!((*p == worker) && removed));
-    if (*p == worker) {
-      utarray_erase(algorithm_state->executing_workers, i, 1);
-      removed = true;
-    }
-  }
+  bool removed_from_executing =
+      remove_worker_from_vector(algorithm_state->executing_workers, worker);
+  num_times_removed += removed_from_executing;
+  /* Double check that we actually removed the worker. */
+  DCHECK(!worker_in_vector(algorithm_state->executing_workers, worker));

   /* Remove the worker from blocked workers, if it's there. */
-  num_workers = utarray_len(algorithm_state->blocked_workers);
-  for (int64_t i = num_workers - 1; i >= 0; --i) {
-    LocalSchedulerClient **p = (LocalSchedulerClient **) utarray_eltptr(
-        algorithm_state->blocked_workers, i);
-    DCHECK(!((*p == worker) && removed));
-    if (*p == worker) {
-      utarray_erase(algorithm_state->blocked_workers, i, 1);
-      removed = true;
-    }
-  }
+  bool removed_from_blocked =
+      remove_worker_from_vector(algorithm_state->blocked_workers, worker);
+  num_times_removed += removed_from_blocked;
+  /* Double check that we actually removed the worker. */
+  DCHECK(!worker_in_vector(algorithm_state->blocked_workers, worker));
+
+  /* Make sure we removed the worker at most once. */
+  CHECK(num_times_removed <= 1);
 }

 void handle_actor_worker_available(LocalSchedulerState *state,
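The rewritten handle_worker_removed tallies removals by adding bools: in C++ a bool promotes to 0 or 1 in integer arithmetic, so each num_times_removed += ... line is a branch-free count that the final CHECK(num_times_removed <= 1) validates. A compressed illustration:

    #include <cassert>

    int main() {
      bool removed_from_a = true, removed_from_b = false, removed_from_c = false;
      int num_times_removed = 0;
      num_times_removed += removed_from_a; /* bool promotes to 1. */
      num_times_removed += removed_from_b; /* 0 */
      num_times_removed += removed_from_c; /* 0 */
      assert(num_times_removed <= 1); /* Mirrors the CHECK above. */
      return 0;
    }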
@@ -1075,24 +1034,14 @@ void handle_worker_blocked(LocalSchedulerState *state,
                            SchedulingAlgorithmState *algorithm_state,
                            LocalSchedulerClient *worker) {
   /* Find the worker in the list of executing workers. */
-  for (int i = 0; i < utarray_len(algorithm_state->executing_workers); ++i) {
-    LocalSchedulerClient **p = (LocalSchedulerClient **) utarray_eltptr(
-        algorithm_state->executing_workers, i);
-    if (*p == worker) {
-      /* Remove the worker from the list of executing workers. */
-      utarray_erase(algorithm_state->executing_workers, i, 1);
-      /* Check that the worker isn't in the list of blocked workers. */
-      for (LocalSchedulerClient **q = (LocalSchedulerClient **) utarray_front(
-               algorithm_state->blocked_workers);
-           q != NULL; q = (LocalSchedulerClient **) utarray_next(
-                          algorithm_state->blocked_workers, q)) {
-        DCHECK(*q != worker);
-      }
-      /* Add the worker to the list of blocked workers. */
-      worker->is_blocked = true;
-      utarray_push_back(algorithm_state->blocked_workers, &worker);
+  CHECK(remove_worker_from_vector(algorithm_state->executing_workers, worker));
+  /* Check that the worker isn't in the list of blocked workers. */
+  DCHECK(!worker_in_vector(algorithm_state->blocked_workers, worker));
+  /* Add the worker to the list of blocked workers. */
+  worker->is_blocked = true;
+  algorithm_state->blocked_workers.push_back(worker);
   /* Return the resources that the blocked worker was using. */
   CHECK(worker->task_in_progress != NULL);
   TaskSpec *spec = Task_task_spec(worker->task_in_progress);
@@ -1100,35 +1049,16 @@ void handle_worker_blocked(LocalSchedulerState *state,
   /* Try to dispatch tasks, since we may have freed up some resources. */
   dispatch_tasks(state, algorithm_state);
-      return;
-    }
-  }
-  /* The worker should have been in the list of executing workers, so this line
-   * should be unreachable. */
-  LOG_FATAL(
-      "Worker registered as blocked, but it was not in the list of executing "
-      "workers.");
 }

 void handle_worker_unblocked(LocalSchedulerState *state,
                              SchedulingAlgorithmState *algorithm_state,
                              LocalSchedulerClient *worker) {
   /* Find the worker in the list of blocked workers. */
-  for (int i = 0; i < utarray_len(algorithm_state->blocked_workers); ++i) {
-    LocalSchedulerClient **p = (LocalSchedulerClient **) utarray_eltptr(
-        algorithm_state->blocked_workers, i);
-    if (*p == worker) {
-      /* Remove the worker from the list of blocked workers. */
-      utarray_erase(algorithm_state->blocked_workers, i, 1);
-      /* Check that the worker isn't in the list of executing workers. */
-      for (LocalSchedulerClient **q = (LocalSchedulerClient **) utarray_front(
-               algorithm_state->executing_workers);
-           q != NULL; q = (LocalSchedulerClient **) utarray_next(
-                          algorithm_state->executing_workers, q)) {
-        DCHECK(*q != worker);
-      }
+  CHECK(remove_worker_from_vector(algorithm_state->blocked_workers, worker));
+  /* Check that the worker isn't in the list of executing workers. */
+  DCHECK(!worker_in_vector(algorithm_state->executing_workers, worker));

   /* Lease back the resources that the blocked worker will need. */
   /* TODO(swang): Leasing back the resources to blocked workers can cause
@@ -1140,47 +1070,34 @@ void handle_worker_unblocked(LocalSchedulerState *state,
   update_dynamic_resources(state, spec, false);
   /* Add the worker to the list of executing workers. */
   worker->is_blocked = false;
-      utarray_push_back(algorithm_state->executing_workers, &worker);
-      return;
-    }
-  }
-  /* The worker should have been in the list of blocked workers, so this line
-   * should be unreachable. */
-  LOG_FATAL(
-      "Worker registered as unblocked, but it was not in the list of blocked "
-      "workers.");
+  algorithm_state->executing_workers.push_back(worker);
 }
 void handle_object_available(LocalSchedulerState *state,
                              SchedulingAlgorithmState *algorithm_state,
                              ObjectID object_id) {
+  auto object_entry_it = algorithm_state->remote_objects.find(object_id);
+
+  ObjectEntry entry;
   /* Get the entry for this object from the active fetch request, or allocate
    * one if needed. */
-  object_entry *entry;
-  HASH_FIND(hh, algorithm_state->remote_objects, &object_id, sizeof(object_id),
-            entry);
-  if (entry != NULL) {
+  if (object_entry_it != algorithm_state->remote_objects.end()) {
     /* Remove the object from the active fetch requests. */
-    HASH_DELETE(hh, algorithm_state->remote_objects, entry);
+    entry = object_entry_it->second;
+    algorithm_state->remote_objects.erase(object_id);
   } else {
-    /* Allocate a new object entry. Object entries will get freed if the object
-     * is removed. */
-    entry = (object_entry *) malloc(sizeof(object_entry));
-    entry->object_id = object_id;
-    entry->dependent_tasks =
-        new std::vector<std::list<TaskQueueEntry>::iterator>();
+    /* Create a new object entry. */
+    entry.object_id = object_id;
   }

   /* Add the entry to the set of locally available objects. */
-  HASH_ADD(hh, algorithm_state->local_objects, object_id, sizeof(object_id),
-           entry);
+  CHECK(algorithm_state->local_objects.count(object_id) == 0);
+  algorithm_state->local_objects[object_id] = entry;

-  if (!entry->dependent_tasks->empty()) {
+  if (!entry.dependent_tasks.empty()) {
     /* Out of the tasks that were dependent on this object, if they are now
      * ready to run, move them to the dispatch queue. */
-    for (auto &it : *entry->dependent_tasks) {
+    for (auto &it : entry.dependent_tasks) {
       if (can_run(algorithm_state, it->spec)) {
         LOG_DEBUG("Moved task to dispatch queue");
         algorithm_state->dispatch_task_queue->push_back(*it);
@@ -1193,7 +1110,7 @@ void handle_object_available(LocalSchedulerState *state,
      * queue. */
     dispatch_tasks(state, algorithm_state);
     /* Clean up the records for dependent tasks. */
-    entry->dependent_tasks->clear();
+    entry.dependent_tasks.clear();
   }
 }
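Note that the rewritten handle_object_available copies the ObjectEntry out of remote_objects by value before erasing the map slot. Because dependent_tasks is now an owned vector rather than a heap pointer, the copy keeps the dependent-task list alive past the erase with no manual delete, which is what lets the function walk entry.dependent_tasks afterward. The pattern in isolation (illustrative types):

    #include <string>
    #include <unordered_map>
    #include <vector>

    struct Entry {
      std::vector<int> dependents;
    };

    int main() {
      std::unordered_map<std::string, Entry> remote;
      remote["obj"].dependents.push_back(1);

      Entry entry = remote["obj"]; /* Deep copy: the vector is duplicated. */
      remote.erase("obj");         /* The copy remains valid. */
      return entry.dependents.size() == 1 ? 0 : 1;
    }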
@@ -1201,13 +1118,9 @@ void handle_object_removed(LocalSchedulerState *state,
                            ObjectID removed_object_id) {
   /* Remove the object from the set of locally available objects. */
   SchedulingAlgorithmState *algorithm_state = state->algorithm_state;
-  object_entry *entry;
-  HASH_FIND(hh, algorithm_state->local_objects, &removed_object_id,
-            sizeof(removed_object_id), entry);
-  CHECK(entry != NULL);
-  HASH_DELETE(hh, algorithm_state->local_objects, entry);
-  delete entry->dependent_tasks;
-  free(entry);
+  CHECK(algorithm_state->local_objects.count(removed_object_id) == 1);
+  algorithm_state->local_objects.erase(removed_object_id);

   /* Track queued tasks that were dependent on this object.
    * NOTE: Since objects often get removed in batches (e.g., during eviction),
@@ -1260,7 +1173,7 @@ int num_dispatch_tasks(SchedulingAlgorithmState *algorithm_state) {
 void print_worker_info(const char *message,
                        SchedulingAlgorithmState *algorithm_state) {
   LOG_DEBUG("%s: %d available, %d executing, %d blocked", message,
-            utarray_len(algorithm_state->available_workers),
-            utarray_len(algorithm_state->executing_workers),
-            utarray_len(algorithm_state->blocked_workers));
+            algorithm_state->available_workers.size(),
+            algorithm_state->executing_workers.size(),
+            algorithm_state->blocked_workers.size());
 }


@@ -8,10 +8,10 @@
 #include "utarray.h"
 #include "uthash.h"
+#include <vector>

 /* These are needed to define the UT_arrays. */
 extern UT_icd task_ptr_icd;
-extern UT_icd workers_icd;
 extern UT_icd pid_t_icd;

 /** This struct is used to maintain a mapping from actor IDs to the ID of the
  * local scheduler that is responsible for the actor. */
@@ -27,6 +27,8 @@ typedef struct {
 /** Internal state of the scheduling algorithm. */
 typedef struct SchedulingAlgorithmState SchedulingAlgorithmState;

+struct LocalSchedulerClient;
+
 /** A struct storing the configuration state of the local scheduler. This should
  * consist of values that don't change over the lifetime of the local
  * scheduler. */
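The forward declaration is needed because LocalSchedulerState (below) now holds a std::vector<LocalSchedulerClient *> while LocalSchedulerClient is defined further down in this header and itself points back at LocalSchedulerState. A vector of pointers only requires the pointee's name, not its definition, so the cycle resolves cleanly. A minimal sketch of the same arrangement:

    #include <vector>

    struct Client; /* Forward declaration: Client's size is not needed yet. */

    struct State {
      std::vector<Client *> clients; /* Pointers to an incomplete type are OK. */
    };

    struct Client {
      State *state; /* Back-pointer completes the cycle. */
    };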
@@ -38,7 +40,7 @@ typedef struct {
 } local_scheduler_config;

 /** The state of the local scheduler. */
-typedef struct {
+struct LocalSchedulerState {
   /** The configuration for the local scheduler. */
   local_scheduler_config config;
   /** The local scheduler event loop. */
@@ -46,10 +48,10 @@ typedef struct {
   /** List of workers available to this node. This is used to free the worker
    * structs when we free the scheduler state and also to access the worker
    * structs in the tests. */
-  UT_array *workers;
+  std::vector<LocalSchedulerClient *> workers;
   /** List of the process IDs for child processes (workers) started by the
    * local scheduler that have not sent a REGISTER_PID message yet. */
-  UT_array *child_pids;
+  std::vector<pid_t> child_pids;
   /** A hash table mapping actor IDs to the db_client_id of the local scheduler
    * that is responsible for the actor. */
   actor_map_entry *actor_mapping;
@@ -68,10 +70,10 @@ typedef struct {
   /** Vector of dynamic attributes associated with the node owned by this local
    * scheduler. */
   double dynamic_resources[ResourceIndex_MAX];
-} LocalSchedulerState;
+};

 /** Contains all information associated with a local scheduler client. */
-typedef struct {
+struct LocalSchedulerClient {
   /** The socket used to communicate with the client. */
   int sock;
   /** A pointer to the task object that is currently running on this client. If
@@ -91,6 +93,6 @@
   ActorID actor_id;
   /** A pointer to the local scheduler state. */
   LocalSchedulerState *local_scheduler_state;
-} LocalSchedulerClient;
+};

 #endif /* LOCAL_SCHEDULER_SHARED_H */


@@ -65,11 +65,9 @@ static void register_clients(int num_mock_workers, LocalSchedulerMock *mock) {
     new_client_connection(mock->loop, mock->local_scheduler_fd,
                           (void *) mock->local_scheduler_state, 0);
-    LocalSchedulerClient **worker = (LocalSchedulerClient **) utarray_eltptr(
-        mock->local_scheduler_state->workers, i);
-    process_message(mock->local_scheduler_state->loop, (*worker)->sock, *worker,
-                    0);
+    LocalSchedulerClient *worker = mock->local_scheduler_state->workers[i];
+    process_message(mock->local_scheduler_state->loop, worker->sock, worker, 0);
   }
 }
@@ -147,12 +145,9 @@ void LocalSchedulerMock_free(LocalSchedulerMock *mock) {
   /* Kill all the workers and run the event loop again so that the task table
    * updates propagate and the tasks in progress are freed. */
-  LocalSchedulerClient **worker = (LocalSchedulerClient **) utarray_eltptr(
-      mock->local_scheduler_state->workers, 0);
-  while (worker != NULL) {
-    kill_worker(*worker, true);
-    worker = (LocalSchedulerClient **) utarray_eltptr(
-        mock->local_scheduler_state->workers, 0);
+  while (mock->local_scheduler_state->workers.size() > 0) {
+    LocalSchedulerClient *worker = mock->local_scheduler_state->workers.front();
+    kill_worker(mock->local_scheduler_state, worker, true);
   }
   event_loop_add_timer(mock->loop, 500,
                        (event_loop_timer_handler) timeout_handler, NULL);
@@ -446,8 +441,7 @@ TEST task_dependency_test(void) {
   LocalSchedulerState *state = local_scheduler->local_scheduler_state;
   SchedulingAlgorithmState *algorithm_state = state->algorithm_state;
   /* Get the first worker. */
-  LocalSchedulerClient *worker =
-      *((LocalSchedulerClient **) utarray_eltptr(state->workers, 0));
+  LocalSchedulerClient *worker = state->workers.front();
   int64_t task_size;
   TaskSpec *spec = example_task_spec(1, 1, &task_size);
   ObjectID oid = TaskSpec_arg_id(spec, 0);
@@ -522,8 +516,7 @@ TEST task_multi_dependency_test(void) {
   LocalSchedulerState *state = local_scheduler->local_scheduler_state;
   SchedulingAlgorithmState *algorithm_state = state->algorithm_state;
   /* Get the first worker. */
-  LocalSchedulerClient *worker =
-      *((LocalSchedulerClient **) utarray_eltptr(state->workers, 0));
+  LocalSchedulerClient *worker = state->workers.front();
   int64_t task_size;
   TaskSpec *spec = example_task_spec(2, 1, &task_size);
   ObjectID oid1 = TaskSpec_arg_id(spec, 0);
@@ -598,9 +591,9 @@ TEST start_kill_workers_test(void) {
   LocalSchedulerMock *local_scheduler = LocalSchedulerMock_init(num_workers, 0);
   /* We start off with num_workers children processes, but no workers
    * registered yet. */
-  ASSERT_EQ(utarray_len(local_scheduler->local_scheduler_state->child_pids),
+  ASSERT_EQ(local_scheduler->local_scheduler_state->child_pids.size(),
             num_workers);
-  ASSERT_EQ(utarray_len(local_scheduler->local_scheduler_state->workers), 0);
+  ASSERT_EQ(local_scheduler->local_scheduler_state->workers.size(), 0);

   /* Make sure that each worker connects to the local_scheduler scheduler. This
    * for loop will hang if one of the workers does not connect. */
@@ -613,29 +606,26 @@ TEST start_kill_workers_test(void) {
   /* After handling each worker's initial connection, we should now have all
    * workers accounted for, but we haven't yet matched up process IDs with our
    * children processes. */
-  ASSERT_EQ(utarray_len(local_scheduler->local_scheduler_state->child_pids),
+  ASSERT_EQ(local_scheduler->local_scheduler_state->child_pids.size(),
             num_workers);
-  ASSERT_EQ(utarray_len(local_scheduler->local_scheduler_state->workers),
+  ASSERT_EQ(local_scheduler->local_scheduler_state->workers.size(),
             num_workers);

   /* Each worker should register its process ID. */
-  for (int i = 0;
-       i < utarray_len(local_scheduler->local_scheduler_state->workers); ++i) {
-    LocalSchedulerClient *worker = *(LocalSchedulerClient **) utarray_eltptr(
-        local_scheduler->local_scheduler_state->workers, i);
+  for (auto const &worker : local_scheduler->local_scheduler_state->workers) {
     process_message(local_scheduler->local_scheduler_state->loop, worker->sock,
                     worker, 0);
   }
-  ASSERT_EQ(utarray_len(local_scheduler->local_scheduler_state->child_pids), 0);
-  ASSERT_EQ(utarray_len(local_scheduler->local_scheduler_state->workers),
+  ASSERT_EQ(local_scheduler->local_scheduler_state->child_pids.size(), 0);
+  ASSERT_EQ(local_scheduler->local_scheduler_state->workers.size(),
             num_workers);

   /* After killing a worker, its state is cleaned up. */
-  LocalSchedulerClient *worker = *(LocalSchedulerClient **) utarray_eltptr(
-      local_scheduler->local_scheduler_state->workers, 0);
-  kill_worker(worker, false);
-  ASSERT_EQ(utarray_len(local_scheduler->local_scheduler_state->child_pids), 0);
-  ASSERT_EQ(utarray_len(local_scheduler->local_scheduler_state->workers),
+  LocalSchedulerClient *worker =
+      local_scheduler->local_scheduler_state->workers.front();
+  kill_worker(local_scheduler->local_scheduler_state, worker, false);
+  ASSERT_EQ(local_scheduler->local_scheduler_state->child_pids.size(), 0);
+  ASSERT_EQ(local_scheduler->local_scheduler_state->workers.size(),
             num_workers - 1);

   /* Start a worker after the local scheduler has been initialized. */
@@ -643,23 +633,22 @@ TEST start_kill_workers_test(void) {
   /* Accept the workers as clients to the plasma manager. */
   int new_worker_fd = accept_client(local_scheduler->plasma_manager_fd);
   /* The new worker should register its process ID. */
-  ASSERT_EQ(utarray_len(local_scheduler->local_scheduler_state->child_pids), 1);
-  ASSERT_EQ(utarray_len(local_scheduler->local_scheduler_state->workers),
+  ASSERT_EQ(local_scheduler->local_scheduler_state->child_pids.size(), 1);
+  ASSERT_EQ(local_scheduler->local_scheduler_state->workers.size(),
             num_workers - 1);
   /* Make sure the new worker connects to the local_scheduler scheduler. */
   new_client_connection(local_scheduler->loop,
                         local_scheduler->local_scheduler_fd,
                         (void *) local_scheduler->local_scheduler_state, 0);
-  ASSERT_EQ(utarray_len(local_scheduler->local_scheduler_state->child_pids), 1);
-  ASSERT_EQ(utarray_len(local_scheduler->local_scheduler_state->workers),
+  ASSERT_EQ(local_scheduler->local_scheduler_state->child_pids.size(), 1);
+  ASSERT_EQ(local_scheduler->local_scheduler_state->workers.size(),
             num_workers);
   /* Make sure that the new worker registers its process ID. */
-  worker = *(LocalSchedulerClient **) utarray_eltptr(
-      local_scheduler->local_scheduler_state->workers, num_workers - 1);
+  worker = local_scheduler->local_scheduler_state->workers[num_workers - 1];
   process_message(local_scheduler->local_scheduler_state->loop, worker->sock,
                   worker, 0);
-  ASSERT_EQ(utarray_len(local_scheduler->local_scheduler_state->child_pids), 0);
-  ASSERT_EQ(utarray_len(local_scheduler->local_scheduler_state->workers),
+  ASSERT_EQ(local_scheduler->local_scheduler_state->child_pids.size(), 0);
+  ASSERT_EQ(local_scheduler->local_scheduler_state->workers.size(),
             num_workers);

   /* Clean up. */