From b58500188143bbefb2aa358f32c9042885ffec8e Mon Sep 17 00:00:00 2001 From: Robert Nishihara Date: Thu, 12 Oct 2017 00:04:38 -0700 Subject: [PATCH] =?UTF-8?q?When=20a=20task=20is=20passed=20to=20the=20glob?= =?UTF-8?q?al=20scheduler,=20if=20it=20is=20not=20received,=E2=80=A6=20(#1?= =?UTF-8?q?106)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * When a task is passed to the global scheduler, if it is not received, then try again. * Call give_task_to_global_scheduler directly (same with local). --- src/common/CMakeLists.txt | 2 +- src/global_scheduler/global_scheduler.cc | 6 ++- .../local_scheduler_algorithm.cc | 42 +++++++++++++++++-- 3 files changed, 44 insertions(+), 6 deletions(-) diff --git a/src/common/CMakeLists.txt b/src/common/CMakeLists.txt index 9492daf59..4ac69ed5c 100644 --- a/src/common/CMakeLists.txt +++ b/src/common/CMakeLists.txt @@ -6,7 +6,7 @@ include(${CMAKE_CURRENT_LIST_DIR}/cmake/Common.cmake) add_subdirectory(redis_module) -set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fPIC") +set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fPIC -g") include_directories(thirdparty/ae) diff --git a/src/global_scheduler/global_scheduler.cc b/src/global_scheduler/global_scheduler.cc index ff3209cd8..3650d85d3 100644 --- a/src/global_scheduler/global_scheduler.cc +++ b/src/global_scheduler/global_scheduler.cc @@ -416,8 +416,10 @@ void start_server(const char *node_ip_address, * before this call to subscribe. */ db_client_table_subscribe(g_state->db, process_new_db_client, (void *) g_state, NULL, NULL, NULL); - /* Subscribe to notifications about waiting tasks. TODO(rkn): this may need to - * get tasks that were submitted to the database before the subscribe. */ + /* Subscribe to notifications about waiting tasks. If a local scheduler + * submits tasks to the global scheduler before the global scheduler + * successfully subscribes, then the local scheduler that submitted the tasks + * will retry. */ task_table_subscribe(g_state->db, NIL_ID, TASK_STATUS_WAITING, process_task_waiting, (void *) g_state, NULL, NULL, NULL); diff --git a/src/local_scheduler/local_scheduler_algorithm.cc b/src/local_scheduler/local_scheduler_algorithm.cc index 4ab796ca2..9f7adac41 100644 --- a/src/local_scheduler/local_scheduler_algorithm.cc +++ b/src/local_scheduler/local_scheduler_algorithm.cc @@ -14,6 +14,17 @@ /* Declared for convenience. */ void remove_actor(SchedulingAlgorithmState *algorithm_state, ActorID actor_id); +void give_task_to_global_scheduler(LocalSchedulerState *state, + SchedulingAlgorithmState *algorithm_state, + TaskSpec *spec, + int64_t task_spec_size); + +void give_task_to_local_scheduler(LocalSchedulerState *state, + SchedulingAlgorithmState *algorithm_state, + TaskSpec *spec, + int64_t task_spec_size, + DBClientID local_scheduler_id); + struct TaskQueueEntry { /** The task that is queued. */ TaskSpec *spec; @@ -892,8 +903,14 @@ void give_task_to_local_scheduler_retry(UniqueID id, CHECK(Task_state(task) == TASK_STATUS_SCHEDULED); TaskSpec *spec = Task_task_spec(task); - handle_actor_task_submitted(state, state->algorithm_state, spec, - Task_task_spec_size(task)); + + ActorID actor_id = TaskSpec_actor_id(spec); + CHECK(!ActorID_equal(actor_id, NIL_ACTOR_ID)); + CHECK(state->actor_mapping.count(actor_id) == 1); + + give_task_to_local_scheduler( + state, state->algorithm_state, spec, Task_task_spec_size(task), + state->actor_mapping[actor_id].local_scheduler_id); } /** @@ -927,6 +944,20 @@ void give_task_to_local_scheduler(LocalSchedulerState *state, task_table_add_task(state->db, task, &retryInfo, NULL, state); } +void give_task_to_global_scheduler_retry(UniqueID id, + void *user_context, + void *user_data) { + LocalSchedulerState *state = (LocalSchedulerState *) user_context; + Task *task = (Task *) user_data; + CHECK(Task_state(task) == TASK_STATUS_WAITING); + + TaskSpec *spec = Task_task_spec(task); + CHECK(ActorID_equal(TaskSpec_actor_id(spec), NIL_ACTOR_ID)); + + give_task_to_global_scheduler(state, state->algorithm_state, spec, + Task_task_spec_size(task)); +} + /** * Give a task to the global scheduler to schedule. * @@ -948,7 +979,12 @@ void give_task_to_global_scheduler(LocalSchedulerState *state, DCHECK(state->config.global_scheduler_exists); Task *task = Task_alloc(spec, task_spec_size, TASK_STATUS_WAITING, NIL_ID); DCHECK(state->db != NULL); - task_table_add_task(state->db, task, NULL, NULL, NULL); + auto retryInfo = RetryInfo{ + .num_retries = 0, // This value is unused. + .timeout = 0, // This value is unused. + .fail_callback = give_task_to_global_scheduler_retry, + }; + task_table_add_task(state->db, task, &retryInfo, NULL, state); } bool resource_constraints_satisfied(LocalSchedulerState *state,