From 844a6afcdd94f2a41e2dfeac158be4220b77a16b Mon Sep 17 00:00:00 2001 From: Alexey Tumanov Date: Tue, 13 Feb 2018 00:09:35 -0800 Subject: [PATCH] Implement simple random spillback policy. (#1493) * spillback policy implementation: global + local scheduler * modernize global scheduler policy state; factor out random number engine and generator * Minimal version. * Fix test. * Make load balancing test less strenuous. --- python/ray/global_scheduler/test/test.py | 4 +- src/common/state/ray_config.h | 8 +- src/global_scheduler/global_scheduler.h | 2 +- .../global_scheduler_algorithm.cc | 103 +++++++++++++++--- .../global_scheduler_algorithm.h | 31 +++++- src/local_scheduler/local_scheduler.cc | 3 + .../local_scheduler_algorithm.cc | 28 ++++- .../local_scheduler_algorithm.h | 7 ++ test/runtest.py | 8 +- 9 files changed, 162 insertions(+), 32 deletions(-) diff --git a/python/ray/global_scheduler/test/test.py b/python/ray/global_scheduler/test/test.py index 175f4cc32..4151e51f6 100644 --- a/python/ray/global_scheduler/test/test.py +++ b/python/ray/global_scheduler/test/test.py @@ -302,7 +302,9 @@ class TestGlobalScheduler(unittest.TestCase): num_retries -= 1 time.sleep(0.1) - self.assertEqual(num_tasks_done, num_tasks) + # Tasks can either be queued or in the global scheduler due to + # spillback. + self.assertEqual(num_tasks_done + num_tasks_waiting, num_tasks) @unittest.skipIf( os.environ.get('RAY_USE_NEW_GCS', False), diff --git a/src/common/state/ray_config.h b/src/common/state/ray_config.h index 19507b836..36c79eaed 100644 --- a/src/common/state/ray_config.h +++ b/src/common/state/ray_config.h @@ -80,6 +80,8 @@ class RayConfig { int64_t L3_cache_size_bytes() const { return L3_cache_size_bytes_; } + int64_t max_tasks_to_spillback() const { return max_tasks_to_spillback_; } + private: RayConfig() : ray_protocol_version_(0x0000000000000000), @@ -105,7 +107,8 @@ class RayConfig { redis_db_connect_retries_(50), redis_db_connect_wait_milliseconds_(100), plasma_default_release_delay_(64), - L3_cache_size_bytes_(100000000) {} + L3_cache_size_bytes_(100000000), + max_tasks_to_spillback_(10) {} ~RayConfig() {} @@ -179,6 +182,9 @@ class RayConfig { /// TODO(rkn): These constants are currently unused. int64_t plasma_default_release_delay_; int64_t L3_cache_size_bytes_; + + /// Constants for the spillback scheduling policy. + int64_t max_tasks_to_spillback_; }; #endif // RAY_CONFIG_H diff --git a/src/global_scheduler/global_scheduler.h b/src/global_scheduler/global_scheduler.h index f9019126d..30a64b2fd 100644 --- a/src/global_scheduler/global_scheduler.h +++ b/src/global_scheduler/global_scheduler.h @@ -31,7 +31,7 @@ typedef struct { LocalSchedulerInfo info; } LocalScheduler; -typedef struct GlobalSchedulerPolicyState GlobalSchedulerPolicyState; +typedef class GlobalSchedulerPolicyState GlobalSchedulerPolicyState; /** * This defines a hash table used to cache information about different objects. 
diff --git a/src/global_scheduler/global_scheduler_algorithm.cc b/src/global_scheduler/global_scheduler_algorithm.cc
index 0581f2792..764f90ca0 100644
--- a/src/global_scheduler/global_scheduler_algorithm.cc
+++ b/src/global_scheduler/global_scheduler_algorithm.cc
@@ -7,7 +7,6 @@ GlobalSchedulerPolicyState *GlobalSchedulerPolicyState_init(void) {
   GlobalSchedulerPolicyState *policy_state = new GlobalSchedulerPolicyState();
-  policy_state->round_robin_index = 0;
   return policy_state;
 }
 
@@ -111,37 +110,105 @@ double calculate_cost_pending(const GlobalSchedulerState *state,
       locally_available_data_size(state, scheduler->id, task_spec);
   /* TODO(rkn): This logic does not load balance properly when the different
    * machines have different sizes. Fix this. */
-  return scheduler->num_recent_tasks_sent + scheduler->info.task_queue_length;
+  double cost_pending = scheduler->num_recent_tasks_sent +
+                        scheduler->info.task_queue_length -
+                        scheduler->info.available_workers;
+  return cost_pending;
 }
 
-bool handle_task_waiting(GlobalSchedulerState *state,
-                         GlobalSchedulerPolicyState *policy_state,
-                         Task *task) {
+bool handle_task_waiting_random(GlobalSchedulerState *state,
+                                GlobalSchedulerPolicyState *policy_state,
+                                Task *task) {
   TaskSpec *task_spec = Task_task_execution_spec(task)->Spec();
+  CHECKM(task_spec != NULL,
+         "task wait handler encountered a task with NULL spec");
+
+  std::vector<DBClientID> feasible_nodes;
+
+  for (const auto &it : state->local_schedulers) {
+    // Local scheduler map iterator yields pairs.
+    const LocalScheduler &local_scheduler = it.second;
+    if (!constraints_satisfied_hard(&local_scheduler, task_spec)) {
+      continue;
+    }
+    // Add this local scheduler as a candidate for random selection.
+    feasible_nodes.push_back(it.first);
+  }
+
+  if (feasible_nodes.size() == 0) {
+    std::string id_string = Task_task_id(task).hex();
+    LOG_ERROR(
+        "Infeasible task. No nodes satisfy hard constraints for task = %s",
+        id_string.c_str());
+    return false;
+  }
+
+  // Randomly select the local scheduler. TODO(atumanov): replace with
+  // std::discrete_distribution.
+  std::uniform_int_distribution<> dis(0, feasible_nodes.size() - 1);
+  DBClientID local_scheduler_id =
+      feasible_nodes[dis(policy_state->getRandomGenerator())];
+  CHECKM(!local_scheduler_id.is_nil(),
+         "Task is feasible, but doesn't have a local scheduler assigned.");
+  // A local scheduler ID was found, so assign the task.
+  assign_task_to_local_scheduler(state, task, local_scheduler_id);
+  return true;
+}
+
+bool handle_task_waiting_cost(GlobalSchedulerState *state,
+                              GlobalSchedulerPolicyState *policy_state,
+                              Task *task) {
+  TaskSpec *task_spec = Task_task_execution_spec(task)->Spec();
+  int64_t curtime = current_time_ms();
   CHECKM(task_spec != NULL,
          "task wait handler encounted a task with NULL spec");
+  // For tasks already seen by the global scheduler (spillback > 1),
+  // adjust scheduled task counts for the source local scheduler.
+  if (task->execution_spec->SpillbackCount() > 1) {
+    auto it = state->local_schedulers.find(task->local_scheduler_id);
+    // Task's previous local scheduler must be present and known.
+    CHECK(it != state->local_schedulers.end());
+    LocalScheduler &src_local_scheduler = it->second;
+    src_local_scheduler.num_recent_tasks_sent -= 1;
+  }
+
   bool task_feasible = false;
-  /* Go through all the nodes, calculate the score for each, pick max score. */
+  // Go through all the nodes, calculate the score for each, pick max score.
   double best_local_scheduler_score = INT32_MIN;
   CHECKM(best_local_scheduler_score < 0,
          "We might have a floating point underflow");
-  DBClientID best_local_scheduler_id =
-      DBClientID::nil(); /* best node to send this task */
+  std::string id_string_fromlocalsched = task->local_scheduler_id.hex();
+  LOG_INFO("ct[%" PRId64 "] task from %s spillback %d", curtime,
+           id_string_fromlocalsched.c_str(),
+           task->execution_spec->SpillbackCount());
+
+  // The best node to send this task.
+  DBClientID best_local_scheduler_id = DBClientID::nil();
+
   for (auto it = state->local_schedulers.begin();
        it != state->local_schedulers.end(); it++) {
-    /* For each local scheduler, calculate its score. Check hard constraints
-     * first. */
+    // For each local scheduler, calculate its score. Check hard constraints
+    // first.
     LocalScheduler *scheduler = &(it->second);
     if (!constraints_satisfied_hard(scheduler, task_spec)) {
       continue;
     }
+    // Skip the local scheduler the task came from.
+    if (task->local_scheduler_id == scheduler->id) {
+      continue;
+    }
+    std::string id_string = scheduler->id.hex();
     task_feasible = true;
-    /* This node satisfies the hard capacity constraint. Calculate its score. */
+    // This node satisfies the hard capacity constraint. Calculate its score.
     double score = -1 * calculate_cost_pending(state, scheduler, task_spec);
-    if (score > best_local_scheduler_score) {
+    LOG_INFO("ct[%" PRId64 "][%s][q%d][w%d]: score %f bestscore %f\n", curtime,
+             id_string.c_str(), scheduler->info.task_queue_length,
+             scheduler->info.available_workers, score,
+             best_local_scheduler_score);
+    if (score >= best_local_scheduler_score) {
      best_local_scheduler_score = score;
      best_local_scheduler_id = scheduler->id;
    }
@@ -152,17 +219,23 @@ bool handle_task_waiting(GlobalSchedulerState *state,
     LOG_ERROR(
         "Infeasible task. No nodes satisfy hard constraints for task = %s",
         id_string.c_str());
-    /* TODO(atumanov): propagate this error to the task's driver and/or
-     * cache the task in case new local schedulers satisfy it in the future. */
+    // TODO(atumanov): propagate this error to the task's driver and/or
+    // cache the task in case new local schedulers satisfy it in the future.
     return false;
   }
   CHECKM(!best_local_scheduler_id.is_nil(),
          "Task is feasible, but doesn't have a local scheduler assigned.");
-  /* A local scheduler ID was found, so assign the task. */
+  // A local scheduler ID was found, so assign the task.
   assign_task_to_local_scheduler(state, task, best_local_scheduler_id);
   return true;
 }
 
+bool handle_task_waiting(GlobalSchedulerState *state,
+                         GlobalSchedulerPolicyState *policy_state,
+                         Task *task) {
+  return handle_task_waiting_random(state, policy_state, task);
+}
+
 void handle_object_available(GlobalSchedulerState *state,
                              GlobalSchedulerPolicyState *policy_state,
                              ObjectID object_id) {
diff --git a/src/global_scheduler/global_scheduler_algorithm.h b/src/global_scheduler/global_scheduler_algorithm.h
index 48338694a..d6f674b72 100644
--- a/src/global_scheduler/global_scheduler_algorithm.h
+++ b/src/global_scheduler/global_scheduler_algorithm.h
@@ -1,6 +1,8 @@
 #ifndef GLOBAL_SCHEDULER_ALGORITHM_H
 #define GLOBAL_SCHEDULER_ALGORITHM_H
 
+#include <random>
+
 #include "common.h"
 #include "global_scheduler.h"
 #include "task.h"
@@ -19,10 +21,31 @@ typedef enum {
   SCHED_ALGORITHM_MAX
 } global_scheduler_algorithm;
 
-/** The state managed by the global scheduling policy. */
-struct GlobalSchedulerPolicyState {
-  /** The index of the next local scheduler to assign a task to. */
-  int64_t round_robin_index;
+/// The class encapsulating state managed by the global scheduling policy.
+class GlobalSchedulerPolicyState {
+ public:
+  GlobalSchedulerPolicyState(int64_t round_robin_index)
+      : round_robin_index_(round_robin_index), gen_(rd_()) {}
+
+  GlobalSchedulerPolicyState() : round_robin_index_(0), gen_(rd_()) {}
+
+  /// Return the policy's random number generator.
+  ///
+  /// @return The policy's random number generator.
+  std::mt19937_64 &getRandomGenerator() { return gen_; }
+
+  /// Return the round robin index maintained by policy state.
+  ///
+  /// @return The round robin index.
+  int64_t getRoundRobinIndex() const { return round_robin_index_; }
+
+ private:
+  /// The index of the next local scheduler to assign a task to.
+  int64_t round_robin_index_;
+  /// Internally maintained random number engine device.
+  std::random_device rd_;
+  /// Internally maintained random number generator.
+  std::mt19937_64 gen_;
 };
 
 /**
diff --git a/src/local_scheduler/local_scheduler.cc b/src/local_scheduler/local_scheduler.cc
index 2d7840fbc..d707dff7d 100644
--- a/src/local_scheduler/local_scheduler.cc
+++ b/src/local_scheduler/local_scheduler.cc
@@ -1423,6 +1423,9 @@ int heartbeat_handler(event_loop *loop, timer_id id, void *context) {
   LocalSchedulerState *state = (LocalSchedulerState *) context;
   SchedulingAlgorithmState *algorithm_state = state->algorithm_state;
 
+  // Spillback policy invocation is synchronized with the heartbeats.
+  spillback_tasks_handler(state);
+
   /* Check that the last heartbeat was not sent too long ago. */
   int64_t current_time = current_time_ms();
   CHECK(current_time >= state->previous_heartbeat_time);
diff --git a/src/local_scheduler/local_scheduler_algorithm.cc b/src/local_scheduler/local_scheduler_algorithm.cc
index a0a8d431f..880c7cf5c 100644
--- a/src/local_scheduler/local_scheduler_algorithm.cc
+++ b/src/local_scheduler/local_scheduler_algorithm.cc
@@ -820,6 +820,26 @@ bool resources_available(LocalSchedulerState *state) {
   return resources_available;
 }
 
+void spillback_tasks_handler(LocalSchedulerState *state) {
+  SchedulingAlgorithmState *algorithm_state = state->algorithm_state;
+
+  int64_t num_to_spillback = std::min(
+      static_cast<int64_t>(algorithm_state->dispatch_task_queue->size()),
+      RayConfig::instance().max_tasks_to_spillback());
+
+  auto it = algorithm_state->dispatch_task_queue->end();
+  for (int64_t i = 0; i < num_to_spillback; i++) {
+    it--;
+  }
+
+  for (int64_t i = 0; i < num_to_spillback; i++) {
+    it->IncrementSpillbackCount();
+    give_task_to_global_scheduler(state, algorithm_state, *it);
+    // Dequeue the task.
+    it = algorithm_state->dispatch_task_queue->erase(it);
+  }
+}
+
 /**
  * Assign as many tasks from the dispatch queue as possible.
  *
@@ -1146,12 +1166,8 @@ void give_task_to_global_scheduler(LocalSchedulerState *state,
   }
   /* Pass on the task to the global scheduler. */
   DCHECK(state->config.global_scheduler_exists);
-  /* Increment the task's spillback count before forwarding it to the global
-   * scheduler.
- */ - execution_spec.IncrementSpillbackCount(); - Task *task = - Task_alloc(execution_spec, TASK_STATUS_WAITING, DBClientID::nil()); + Task *task = Task_alloc(execution_spec, TASK_STATUS_WAITING, + get_db_client_id(state->db)); #if !RAY_USE_NEW_GCS DCHECK(state->db != NULL); auto retryInfo = RetryInfo{ diff --git a/src/local_scheduler/local_scheduler_algorithm.h b/src/local_scheduler/local_scheduler_algorithm.h index d06af4097..0e3f2eb70 100644 --- a/src/local_scheduler/local_scheduler_algorithm.h +++ b/src/local_scheduler/local_scheduler_algorithm.h @@ -303,6 +303,13 @@ int reconstruct_object_timeout_handler(event_loop *loop, bool object_locally_available(SchedulingAlgorithmState *algorithm_state, ObjectID object_id); +/// Spill some tasks back to the global scheduler. This function implements the +/// spillback policy. +/// +/// @param state The scheduler state. +/// @return Void. +void spillback_tasks_handler(LocalSchedulerState *state); + /** * A helper function to print debug information about the current state and * number of workers. diff --git a/test/runtest.py b/test/runtest.py index fee13ea26..04d0ada38 100644 --- a/test/runtest.py +++ b/test/runtest.py @@ -1702,7 +1702,7 @@ class SchedulingAlgorithm(unittest.TestCase): total_tasks, num_local_schedulers, minimum_count, - num_attempts=20): + num_attempts=100): attempts = 0 while attempts < num_attempts: locations = ray.get( @@ -1728,11 +1728,11 @@ class SchedulingAlgorithm(unittest.TestCase): @ray.remote def f(): - time.sleep(0.001) + time.sleep(0.01) return ray.worker.global_worker.plasma_client.store_socket_name - self.attempt_to_load_balance(f, [], 100, num_local_schedulers, 25) - self.attempt_to_load_balance(f, [], 1000, num_local_schedulers, 250) + self.attempt_to_load_balance(f, [], 100, num_local_schedulers, 10) + self.attempt_to_load_balance(f, [], 1000, num_local_schedulers, 100) def testLoadBalancingWithDependencies(self): # This test ensures that tasks are being assigned to all local