From 2bbfc5da8dd863cb9b3ea5630a8dd1338a54e6ec Mon Sep 17 00:00:00 2001 From: Robert Nishihara Date: Fri, 28 Apr 2017 17:36:43 -0700 Subject: [PATCH] Dispatch actor tasks when actor connects. (#495) --- .../local_scheduler_algorithm.cc | 107 +++++++++--------- 1 file changed, 54 insertions(+), 53 deletions(-) diff --git a/src/local_scheduler/local_scheduler_algorithm.cc b/src/local_scheduler/local_scheduler_algorithm.cc index 55a068b70..874bef5c7 100644 --- a/src/local_scheduler/local_scheduler_algorithm.cc +++ b/src/local_scheduler/local_scheduler_algorithm.cc @@ -282,6 +282,59 @@ void remove_actor(SchedulingAlgorithmState *algorithm_state, ActorID actor_id) { algorithm_state->local_actor_infos.erase(actor_id); } +/** + * Dispatch a task to an actor if possible. + * + * @param state The state of the local scheduler. + * @param algorithm_state The state of the scheduling algorithm. + * @param actor_id The ID of the actor corresponding to the worker. + * @return True if a task was dispatched to the actor and false otherwise. + */ +bool dispatch_actor_task(LocalSchedulerState *state, + SchedulingAlgorithmState *algorithm_state, + ActorID actor_id) { + /* Make sure this worker actually is an actor. */ + CHECK(!ActorID_equal(actor_id, NIL_ACTOR_ID)); + /* Make sure this actor belongs to this local scheduler. */ + CHECK(state->actor_mapping.count(actor_id) == 1); + CHECK(DBClientID_equal(state->actor_mapping[actor_id].local_scheduler_id, + get_db_client_id(state->db))) + + /* Get the local actor entry for this actor. */ + CHECK(algorithm_state->local_actor_infos.count(actor_id) != 0); + LocalActorInfo &entry = + algorithm_state->local_actor_infos.find(actor_id)->second; + + if (entry.task_queue->empty()) { + /* There are no queued tasks for this actor, so we cannot dispatch a task to + * the actor. */ + return false; + } + TaskQueueEntry first_task = entry.task_queue->front(); + int64_t next_task_counter = TaskSpec_actor_counter(first_task.spec); + if (next_task_counter != entry.task_counter) { + /* We cannot execute the next task on this actor without violating the + * in-order execution guarantee for actor tasks. */ + CHECK(next_task_counter > entry.task_counter); + return false; + } + /* If the worker is not available, we cannot assign a task to it. */ + if (!entry.worker_available) { + return false; + } + /* Assign the first task in the task queue to the worker and mark the worker + * as unavailable. */ + entry.task_counter += 1; + assign_task_to_worker(state, first_task.spec, first_task.task_spec_size, + entry.worker); + entry.worker_available = false; + /* Free the task queue entry. */ + TaskQueueEntry_free(&first_task); + /* Remove the task from the actor's task queue. */ + entry.task_queue->pop_front(); + return true; +} + void handle_actor_worker_connect(LocalSchedulerState *state, SchedulingAlgorithmState *algorithm_state, ActorID actor_id, @@ -294,6 +347,7 @@ void handle_actor_worker_connect(LocalSchedulerState *state, * filled out, so fill out the correct worker field now. */ algorithm_state->local_actor_infos[actor_id].worker = worker; } + dispatch_actor_task(state, algorithm_state, actor_id); } void handle_actor_worker_disconnect(LocalSchedulerState *state, @@ -381,59 +435,6 @@ void add_task_to_actor_queue(LocalSchedulerState *state, } } -/** - * Dispatch a task to an actor if possible. - * - * @param state The state of the local scheduler. - * @param algorithm_state The state of the scheduling algorithm. - * @param actor_id The ID of the actor corresponding to the worker. - * @return True if a task was dispatched to the actor and false otherwise. - */ -bool dispatch_actor_task(LocalSchedulerState *state, - SchedulingAlgorithmState *algorithm_state, - ActorID actor_id) { - /* Make sure this worker actually is an actor. */ - CHECK(!ActorID_equal(actor_id, NIL_ACTOR_ID)); - /* Make sure this actor belongs to this local scheduler. */ - CHECK(state->actor_mapping.count(actor_id) == 1); - CHECK(DBClientID_equal(state->actor_mapping[actor_id].local_scheduler_id, - get_db_client_id(state->db))) - - /* Get the local actor entry for this actor. */ - CHECK(algorithm_state->local_actor_infos.count(actor_id) != 0); - LocalActorInfo &entry = - algorithm_state->local_actor_infos.find(actor_id)->second; - - if (entry.task_queue->empty()) { - /* There are no queued tasks for this actor, so we cannot dispatch a task to - * the actor. */ - return false; - } - TaskQueueEntry first_task = entry.task_queue->front(); - int64_t next_task_counter = TaskSpec_actor_counter(first_task.spec); - if (next_task_counter != entry.task_counter) { - /* We cannot execute the next task on this actor without violating the - * in-order execution guarantee for actor tasks. */ - CHECK(next_task_counter > entry.task_counter); - return false; - } - /* If the worker is not available, we cannot assign a task to it. */ - if (!entry.worker_available) { - return false; - } - /* Assign the first task in the task queue to the worker and mark the worker - * as unavailable. */ - entry.task_counter += 1; - assign_task_to_worker(state, first_task.spec, first_task.task_spec_size, - entry.worker); - entry.worker_available = false; - /* Free the task queue entry. */ - TaskQueueEntry_free(&first_task); - /* Remove the task from the actor's task queue. */ - entry.task_queue->pop_front(); - return true; -} - /** * Fetch a queued task's missing object dependency. The fetch request will be * retried every LOCAL_SCHEDULER_FETCH_TIMEOUT_MILLISECONDS until the object is