When a task is passed to the global scheduler, if it is not received,… (#1106)

* When a task is passed to the global scheduler, if it is not received, then try again.
* Call give_task_to_global_scheduler directly (same with local).
This commit is contained in:
Robert Nishihara 2017-10-12 00:04:38 -07:00 committed by Alexey Tumanov
parent 6109cc0782
commit b585001881
3 changed files with 44 additions and 6 deletions

View file

@ -6,7 +6,7 @@ include(${CMAKE_CURRENT_LIST_DIR}/cmake/Common.cmake)
add_subdirectory(redis_module)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fPIC")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fPIC -g")
include_directories(thirdparty/ae)

View file

@ -416,8 +416,10 @@ void start_server(const char *node_ip_address,
* before this call to subscribe. */
db_client_table_subscribe(g_state->db, process_new_db_client,
(void *) g_state, NULL, NULL, NULL);
/* Subscribe to notifications about waiting tasks. TODO(rkn): this may need to
* get tasks that were submitted to the database before the subscribe. */
/* Subscribe to notifications about waiting tasks. If a local scheduler
* submits tasks to the global scheduler before the global scheduler
* successfully subscribes, then the local scheduler that submitted the tasks
* will retry. */
task_table_subscribe(g_state->db, NIL_ID, TASK_STATUS_WAITING,
process_task_waiting, (void *) g_state, NULL, NULL,
NULL);

View file

@ -14,6 +14,17 @@
/* Declared for convenience. */
void remove_actor(SchedulingAlgorithmState *algorithm_state, ActorID actor_id);
/* Forward declaration: give_task_to_global_scheduler_retry calls this function
 * before its definition appears later in the file. */
void give_task_to_global_scheduler(LocalSchedulerState *state,
SchedulingAlgorithmState *algorithm_state,
TaskSpec *spec,
int64_t task_spec_size);
/* Forward declaration: give_task_to_local_scheduler_retry calls this function
 * before its definition appears later in the file. */
void give_task_to_local_scheduler(LocalSchedulerState *state,
SchedulingAlgorithmState *algorithm_state,
TaskSpec *spec,
int64_t task_spec_size,
DBClientID local_scheduler_id);
struct TaskQueueEntry {
/** The task that is queued. */
TaskSpec *spec;
@ -892,8 +903,14 @@ void give_task_to_local_scheduler_retry(UniqueID id,
CHECK(Task_state(task) == TASK_STATUS_SCHEDULED);
TaskSpec *spec = Task_task_spec(task);
handle_actor_task_submitted(state, state->algorithm_state, spec,
Task_task_spec_size(task));
ActorID actor_id = TaskSpec_actor_id(spec);
CHECK(!ActorID_equal(actor_id, NIL_ACTOR_ID));
CHECK(state->actor_mapping.count(actor_id) == 1);
give_task_to_local_scheduler(
state, state->algorithm_state, spec, Task_task_spec_size(task),
state->actor_mapping[actor_id].local_scheduler_id);
}
/**
@ -927,6 +944,20 @@ void give_task_to_local_scheduler(LocalSchedulerState *state,
task_table_add_task(state->db, task, &retryInfo, NULL, state);
}
/**
 * Failure callback used when handing a task to the global scheduler. It is
 * installed as the fail_callback of the RetryInfo that
 * give_task_to_global_scheduler passes to task_table_add_task, and it simply
 * submits the same task to the global scheduler again.
 *
 * @param id Unused by this callback.
 * @param user_context The local scheduler state, passed as a void pointer.
 * @param user_data The task being retried, passed as a void pointer.
 *        NOTE(review): presumably the task whose table add failed -- confirm
 *        against the task table retry machinery.
 * @return Void.
 */
void give_task_to_global_scheduler_retry(UniqueID id,
                                         void *user_context,
                                         void *user_data) {
  /* Recover the typed arguments from the opaque callback pointers. */
  Task *task = (Task *) user_data;
  LocalSchedulerState *scheduler_state = (LocalSchedulerState *) user_context;

  /* The task must still be in the waiting state. */
  CHECK(Task_state(task) == TASK_STATUS_WAITING);

  /* The task must not be an actor task. */
  TaskSpec *spec = Task_task_spec(task);
  CHECK(ActorID_equal(TaskSpec_actor_id(spec), NIL_ACTOR_ID));

  /* Resubmit the task to the global scheduler. */
  give_task_to_global_scheduler(scheduler_state,
                                scheduler_state->algorithm_state,
                                spec,
                                Task_task_spec_size(task));
}
/**
* Give a task to the global scheduler to schedule.
*
@ -948,7 +979,12 @@ void give_task_to_global_scheduler(LocalSchedulerState *state,
DCHECK(state->config.global_scheduler_exists);
Task *task = Task_alloc(spec, task_spec_size, TASK_STATUS_WAITING, NIL_ID);
DCHECK(state->db != NULL);
task_table_add_task(state->db, task, NULL, NULL, NULL);
auto retryInfo = RetryInfo{
.num_retries = 0, // This value is unused.
.timeout = 0, // This value is unused.
.fail_callback = give_task_to_global_scheduler_retry,
};
task_table_add_task(state->db, task, &retryInfo, NULL, state);
}
bool resource_constraints_satisfied(LocalSchedulerState *state,