Dispatch actor tasks when actor connects. (#495)

This commit is contained in:
Robert Nishihara 2017-04-28 17:36:43 -07:00 committed by Philipp Moritz
parent 6d301d9079
commit 2bbfc5da8d

View file

@ -282,6 +282,59 @@ void remove_actor(SchedulingAlgorithmState *algorithm_state, ActorID actor_id) {
algorithm_state->local_actor_infos.erase(actor_id);
}
/**
* Dispatch a task to an actor if possible.
*
* @param state The state of the local scheduler.
* @param algorithm_state The state of the scheduling algorithm.
* @param actor_id The ID of the actor corresponding to the worker.
* @return True if a task was dispatched to the actor and false otherwise.
*/
bool dispatch_actor_task(LocalSchedulerState *state,
SchedulingAlgorithmState *algorithm_state,
ActorID actor_id) {
/* Make sure this worker actually is an actor. */
CHECK(!ActorID_equal(actor_id, NIL_ACTOR_ID));
/* Make sure this actor belongs to this local scheduler. */
CHECK(state->actor_mapping.count(actor_id) == 1);
CHECK(DBClientID_equal(state->actor_mapping[actor_id].local_scheduler_id,
get_db_client_id(state->db)))
/* Get the local actor entry for this actor. */
CHECK(algorithm_state->local_actor_infos.count(actor_id) != 0);
LocalActorInfo &entry =
algorithm_state->local_actor_infos.find(actor_id)->second;
if (entry.task_queue->empty()) {
/* There are no queued tasks for this actor, so we cannot dispatch a task to
* the actor. */
return false;
}
TaskQueueEntry first_task = entry.task_queue->front();
int64_t next_task_counter = TaskSpec_actor_counter(first_task.spec);
if (next_task_counter != entry.task_counter) {
/* We cannot execute the next task on this actor without violating the
* in-order execution guarantee for actor tasks. */
CHECK(next_task_counter > entry.task_counter);
return false;
}
/* If the worker is not available, we cannot assign a task to it. */
if (!entry.worker_available) {
return false;
}
/* Assign the first task in the task queue to the worker and mark the worker
* as unavailable. */
entry.task_counter += 1;
assign_task_to_worker(state, first_task.spec, first_task.task_spec_size,
entry.worker);
entry.worker_available = false;
/* Free the task queue entry. */
TaskQueueEntry_free(&first_task);
/* Remove the task from the actor's task queue. */
entry.task_queue->pop_front();
return true;
}
void handle_actor_worker_connect(LocalSchedulerState *state,
SchedulingAlgorithmState *algorithm_state,
ActorID actor_id,
@ -294,6 +347,7 @@ void handle_actor_worker_connect(LocalSchedulerState *state,
* filled out, so fill out the correct worker field now. */
algorithm_state->local_actor_infos[actor_id].worker = worker;
}
dispatch_actor_task(state, algorithm_state, actor_id);
}
void handle_actor_worker_disconnect(LocalSchedulerState *state,
@ -381,59 +435,6 @@ void add_task_to_actor_queue(LocalSchedulerState *state,
}
}
/**
* Dispatch a task to an actor if possible.
*
* @param state The state of the local scheduler.
* @param algorithm_state The state of the scheduling algorithm.
* @param actor_id The ID of the actor corresponding to the worker.
* @return True if a task was dispatched to the actor and false otherwise.
*/
bool dispatch_actor_task(LocalSchedulerState *state,
SchedulingAlgorithmState *algorithm_state,
ActorID actor_id) {
/* Make sure this worker actually is an actor. */
CHECK(!ActorID_equal(actor_id, NIL_ACTOR_ID));
/* Make sure this actor belongs to this local scheduler. */
CHECK(state->actor_mapping.count(actor_id) == 1);
CHECK(DBClientID_equal(state->actor_mapping[actor_id].local_scheduler_id,
get_db_client_id(state->db)))
/* Get the local actor entry for this actor. */
CHECK(algorithm_state->local_actor_infos.count(actor_id) != 0);
LocalActorInfo &entry =
algorithm_state->local_actor_infos.find(actor_id)->second;
if (entry.task_queue->empty()) {
/* There are no queued tasks for this actor, so we cannot dispatch a task to
* the actor. */
return false;
}
TaskQueueEntry first_task = entry.task_queue->front();
int64_t next_task_counter = TaskSpec_actor_counter(first_task.spec);
if (next_task_counter != entry.task_counter) {
/* We cannot execute the next task on this actor without violating the
* in-order execution guarantee for actor tasks. */
CHECK(next_task_counter > entry.task_counter);
return false;
}
/* If the worker is not available, we cannot assign a task to it. */
if (!entry.worker_available) {
return false;
}
/* Assign the first task in the task queue to the worker and mark the worker
* as unavailable. */
entry.task_counter += 1;
assign_task_to_worker(state, first_task.spec, first_task.task_spec_size,
entry.worker);
entry.worker_available = false;
/* Free the task queue entry. */
TaskQueueEntry_free(&first_task);
/* Remove the task from the actor's task queue. */
entry.task_queue->pop_front();
return true;
}
/**
* Fetch a queued task's missing object dependency. The fetch request will be
* retried every LOCAL_SCHEDULER_FETCH_TIMEOUT_MILLISECONDS until the object is