Implement simple random spillback policy. (#1493)

* Spillback policy implementation: global + local scheduler

* Modernize global scheduler policy state; factor out the random number engine and generator

* Minimal version.

* Fix test.

* Make load balancing test less strenuous.
Alexey Tumanov 2018-02-13 00:09:35 -08:00 committed by Robert Nishihara
parent f2b6a7b58d
commit 844a6afcdd
9 changed files with 162 additions and 32 deletions


@@ -302,7 +302,9 @@ class TestGlobalScheduler(unittest.TestCase):
num_retries -= 1
time.sleep(0.1)
self.assertEqual(num_tasks_done, num_tasks)
# Tasks can either be queued or in the global scheduler due to
# spillback.
self.assertEqual(num_tasks_done + num_tasks_waiting, num_tasks)
@unittest.skipIf(
os.environ.get('RAY_USE_NEW_GCS', False),


@@ -80,6 +80,8 @@ class RayConfig {
int64_t L3_cache_size_bytes() const { return L3_cache_size_bytes_; }
int64_t max_tasks_to_spillback() const { return max_tasks_to_spillback_; }
private:
RayConfig()
: ray_protocol_version_(0x0000000000000000),
@@ -105,7 +107,8 @@ class RayConfig {
redis_db_connect_retries_(50),
redis_db_connect_wait_milliseconds_(100),
plasma_default_release_delay_(64),
L3_cache_size_bytes_(100000000) {}
L3_cache_size_bytes_(100000000),
max_tasks_to_spillback_(10) {}
~RayConfig() {}
@@ -179,6 +182,9 @@ class RayConfig {
/// TODO(rkn): These constants are currently unused.
int64_t plasma_default_release_delay_;
int64_t L3_cache_size_bytes_;
/// Constants for the spillback scheduling policy.
int64_t max_tasks_to_spillback_;
};
#endif // RAY_CONFIG_H
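
The new knob is read through the RayConfig singleton, as the local scheduler changes further down do with RayConfig::instance().max_tasks_to_spillback(). Below is a minimal self-contained sketch of that accessor pattern; the class name RayConfigSketch and the main() driver are illustrative, not part of this commit:

    #include <cstdint>
    #include <iostream>

    // Minimal sketch of the Meyers-singleton pattern RayConfig uses.
    class RayConfigSketch {
     public:
      static RayConfigSketch &instance() {
        static RayConfigSketch config;  // Constructed once, on first use.
        return config;
      }
      int64_t max_tasks_to_spillback() const { return max_tasks_to_spillback_; }

     private:
      RayConfigSketch() : max_tasks_to_spillback_(10) {}
      int64_t max_tasks_to_spillback_;
    };

    int main() {
      // Callers read the spillback cap just as spillback_tasks_handler does.
      std::cout << RayConfigSketch::instance().max_tasks_to_spillback() << "\n";
    }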


@@ -31,7 +31,7 @@ typedef struct {
LocalSchedulerInfo info;
} LocalScheduler;
typedef struct GlobalSchedulerPolicyState GlobalSchedulerPolicyState;
typedef class GlobalSchedulerPolicyState GlobalSchedulerPolicyState;
/**
* This defines a hash table used to cache information about different objects.


@@ -7,7 +7,6 @@
GlobalSchedulerPolicyState *GlobalSchedulerPolicyState_init(void) {
GlobalSchedulerPolicyState *policy_state = new GlobalSchedulerPolicyState();
policy_state->round_robin_index = 0;
return policy_state;
}
@@ -111,37 +110,105 @@ double calculate_cost_pending(const GlobalSchedulerState *state,
locally_available_data_size(state, scheduler->id, task_spec);
/* TODO(rkn): This logic does not load balance properly when the different
* machines have different sizes. Fix this. */
return scheduler->num_recent_tasks_sent + scheduler->info.task_queue_length;
double cost_pending = scheduler->num_recent_tasks_sent +
scheduler->info.task_queue_length -
scheduler->info.available_workers;
return cost_pending;
}
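
With this change the pending cost becomes num_recent_tasks_sent + task_queue_length - available_workers, so idle workers now make a node cheaper. A small worked example of the formula; the values are illustrative:

    #include <cstdint>
    #include <cstdio>

    // Sketch of the pending-cost formula from calculate_cost_pending above;
    // lower cost (i.e. higher negated score) wins the task.
    int64_t cost_pending(int64_t recent_sent, int64_t queue_len, int64_t workers) {
      return recent_sent + queue_len - workers;
    }

    int main() {
      // A loaded node: 2 recently sent + 5 queued - 3 idle workers = 4.
      std::printf("%lld\n", (long long) cost_pending(2, 5, 3));
      // A mostly idle node: 0 + 1 - 0 = 1, so it is preferred.
      std::printf("%lld\n", (long long) cost_pending(0, 1, 0));
    }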
bool handle_task_waiting(GlobalSchedulerState *state,
GlobalSchedulerPolicyState *policy_state,
Task *task) {
bool handle_task_waiting_random(GlobalSchedulerState *state,
GlobalSchedulerPolicyState *policy_state,
Task *task) {
TaskSpec *task_spec = Task_task_execution_spec(task)->Spec();
CHECKM(task_spec != NULL,
"task wait handler encounted a task with NULL spec");
std::vector<DBClientID> feasible_nodes;
for (const auto &it : state->local_schedulers) {
// Local scheduler map iterator yields <DBClientID, LocalScheduler> pairs.
const LocalScheduler &local_scheduler = it.second;
if (!constraints_satisfied_hard(&local_scheduler, task_spec)) {
continue;
}
// Add this local scheduler as a candidate for random selection.
feasible_nodes.push_back(it.first);
}
if (feasible_nodes.size() == 0) {
std::string id_string = Task_task_id(task).hex();
LOG_ERROR(
"Infeasible task. No nodes satisfy hard constraints for task = %s",
id_string.c_str());
return false;
}
// Randomly select the local scheduler. TODO(atumanov): replace with
// std::discrete_distribution<int>.
std::uniform_int_distribution<> dis(0, feasible_nodes.size() - 1);
DBClientID local_scheduler_id =
feasible_nodes[dis(policy_state->getRandomGenerator())];
CHECKM(!local_scheduler_id.is_nil(),
"Task is feasible, but doesn't have a local scheduler assigned.");
// A local scheduler ID was found, so assign the task.
assign_task_to_local_scheduler(state, task, local_scheduler_id);
return true;
}
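
The TODO above points at std::discrete_distribution for a weighted variant of this draw. A hedged sketch of what that could look like; how the weights would be derived (e.g. from available workers) is an assumption, not something this commit specifies:

    #include <cstddef>
    #include <random>
    #include <vector>

    // Illustrative weighted pick over feasible nodes. Each weight could be
    // derived from, say, available workers; zero-weight nodes are never chosen.
    std::size_t pick_weighted(const std::vector<double> &weights,
                              std::mt19937_64 &gen) {
      std::discrete_distribution<std::size_t> dis(weights.begin(), weights.end());
      return dis(gen);  // Index drawn with probability proportional to weight.
    }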
bool handle_task_waiting_cost(GlobalSchedulerState *state,
GlobalSchedulerPolicyState *policy_state,
Task *task) {
TaskSpec *task_spec = Task_task_execution_spec(task)->Spec();
int64_t curtime = current_time_ms();
CHECKM(task_spec != NULL,
"task wait handler encounted a task with NULL spec");
// For tasks already seen by the global scheduler (spillback > 1),
// adjust scheduled task counts for the source local scheduler.
if (task->execution_spec->SpillbackCount() > 1) {
auto it = state->local_schedulers.find(task->local_scheduler_id);
// Task's previous local scheduler must be present and known.
CHECK(it != state->local_schedulers.end());
LocalScheduler &src_local_scheduler = it->second;
src_local_scheduler.num_recent_tasks_sent -= 1;
}
bool task_feasible = false;
/* Go through all the nodes, calculate the score for each, pick max score. */
// Go through all the nodes, calculate the score for each, pick max score.
double best_local_scheduler_score = INT32_MIN;
CHECKM(best_local_scheduler_score < 0,
"We might have a floating point underflow");
DBClientID best_local_scheduler_id =
DBClientID::nil(); /* best node to send this task */
std::string id_string_fromlocalsched = task->local_scheduler_id.hex();
LOG_INFO("ct[%" PRId64 "] task from %s spillback %d", curtime,
id_string_fromlocalsched.c_str(),
task->execution_spec->SpillbackCount());
// The best node to send this task.
DBClientID best_local_scheduler_id = DBClientID::nil();
for (auto it = state->local_schedulers.begin();
it != state->local_schedulers.end(); it++) {
/* For each local scheduler, calculate its score. Check hard constraints
* first. */
// For each local scheduler, calculate its score. Check hard constraints
// first.
LocalScheduler *scheduler = &(it->second);
if (!constraints_satisfied_hard(scheduler, task_spec)) {
continue;
}
// Skip the local scheduler the task came from.
if (task->local_scheduler_id == scheduler->id) {
continue;
}
std::string id_string = scheduler->id.hex();
task_feasible = true;
/* This node satisfies the hard capacity constraint. Calculate its score. */
// This node satisfies the hard capacity constraint. Calculate its score.
double score = -1 * calculate_cost_pending(state, scheduler, task_spec);
if (score > best_local_scheduler_score) {
LOG_INFO("ct[%" PRId64 "][%s][q%d][w%d]: score %f bestscore %f\n", curtime,
id_string.c_str(), scheduler->info.task_queue_length,
scheduler->info.available_workers, score,
best_local_scheduler_score);
if (score >= best_local_scheduler_score) {
best_local_scheduler_score = score;
best_local_scheduler_id = scheduler->id;
}
@@ -152,17 +219,23 @@ bool handle_task_waiting(GlobalSchedulerState *state,
LOG_ERROR(
"Infeasible task. No nodes satisfy hard constraints for task = %s",
id_string.c_str());
/* TODO(atumanov): propagate this error to the task's driver and/or
* cache the task in case new local schedulers satisfy it in the future. */
// TODO(atumanov): propagate this error to the task's driver and/or
// cache the task in case new local schedulers satisfy it in the future.
return false;
}
CHECKM(!best_local_scheduler_id.is_nil(),
"Task is feasible, but doesn't have a local scheduler assigned.");
/* A local scheduler ID was found, so assign the task. */
// A local scheduler ID was found, so assign the task.
assign_task_to_local_scheduler(state, task, best_local_scheduler_id);
return true;
}
bool handle_task_waiting(GlobalSchedulerState *state,
GlobalSchedulerPolicyState *policy_state,
Task *task) {
return handle_task_waiting_random(state, policy_state, task);
}
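
handle_task_waiting currently hard-wires the random policy. One plausible extension, sketched below, would switch on the global_scheduler_algorithm enum declared in the header; the diff only shows SCHED_ALGORITHM_MAX, so the other enumerator names and the stub types here are assumptions:

    // Stand-ins for GlobalSchedulerState / GlobalSchedulerPolicyState / Task.
    struct StateStub {};
    struct PolicyStateStub {};
    struct TaskStub {};

    // Stubs for the two policies implemented in this file.
    bool handle_random(StateStub *, PolicyStateStub *, TaskStub *) { return true; }
    bool handle_cost(StateStub *, PolicyStateStub *, TaskStub *) { return true; }

    // Enumerator names other than SCHED_ALGORITHM_MAX are assumptions.
    enum global_scheduler_algorithm_sketch {
      SCHED_ALGORITHM_RANDOM,
      SCHED_ALGORITHM_COST,
      SCHED_ALGORITHM_MAX,
    };

    bool dispatch_task_waiting(global_scheduler_algorithm_sketch alg,
                               StateStub *state, PolicyStateStub *policy,
                               TaskStub *task) {
      switch (alg) {
        case SCHED_ALGORITHM_RANDOM:
          return handle_random(state, policy, task);
        case SCHED_ALGORITHM_COST:
          return handle_cost(state, policy, task);
        default:
          return false;  // Unknown algorithm.
      }
    }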
void handle_object_available(GlobalSchedulerState *state,
GlobalSchedulerPolicyState *policy_state,
ObjectID object_id) {


@@ -1,6 +1,8 @@
#ifndef GLOBAL_SCHEDULER_ALGORITHM_H
#define GLOBAL_SCHEDULER_ALGORITHM_H
#include <random>
#include "common.h"
#include "global_scheduler.h"
#include "task.h"
@@ -19,10 +21,31 @@ typedef enum {
SCHED_ALGORITHM_MAX
} global_scheduler_algorithm;
/** The state managed by the global scheduling policy. */
struct GlobalSchedulerPolicyState {
/** The index of the next local scheduler to assign a task to. */
int64_t round_robin_index;
/// The class encapsulating state managed by the global scheduling policy.
class GlobalSchedulerPolicyState {
public:
GlobalSchedulerPolicyState(int64_t round_robin_index)
: round_robin_index_(round_robin_index), gen_(rd_()) {}
GlobalSchedulerPolicyState() : round_robin_index_(0), gen_(rd_()) {}
/// Return the policy's random number generator.
///
/// @return The policy's random number generator.
std::mt19937_64 &getRandomGenerator() { return gen_; }
/// Return the round robin index maintained by policy state.
///
/// @return The round robin index.
int64_t getRoundRobinIndex() const { return round_robin_index_; }
private:
/// The index of the next local scheduler to assign a task to.
int64_t round_robin_index_;
/// Internally maintained random number engine device.
std::random_device rd_;
/// Internally maintained random number generator.
std::mt19937_64 gen_;
};
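
A brief usage sketch of the new class, mirroring the uniform draw in handle_task_waiting_random above; the integer node IDs are stand-ins for DBClientID values:

    #include <cstdio>
    #include <random>
    #include <vector>

    // Trimmed copy of GlobalSchedulerPolicyState, enough for a runnable demo.
    class PolicyStateSketch {
     public:
      PolicyStateSketch() : gen_(rd_()) {}
      std::mt19937_64 &getRandomGenerator() { return gen_; }

     private:
      std::random_device rd_;  // Seeds the engine once, at construction.
      std::mt19937_64 gen_;    // 64-bit Mersenne Twister engine.
    };

    int main() {
      PolicyStateSketch policy_state;
      std::vector<int> feasible_nodes = {7, 11, 13};  // Stand-in node IDs.
      // Uniform draw over feasible nodes, as in handle_task_waiting_random.
      std::uniform_int_distribution<> dis(
          0, static_cast<int>(feasible_nodes.size()) - 1);
      std::printf("%d\n", feasible_nodes[dis(policy_state.getRandomGenerator())]);
    }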
/**


@@ -1423,6 +1423,9 @@ int heartbeat_handler(event_loop *loop, timer_id id, void *context) {
LocalSchedulerState *state = (LocalSchedulerState *) context;
SchedulingAlgorithmState *algorithm_state = state->algorithm_state;
// Spillback policy invocation is synchronized with the heartbeats.
spillback_tasks_handler(state);
/* Check that the last heartbeat was not sent too long ago. */
int64_t current_time = current_time_ms();
CHECK(current_time >= state->previous_heartbeat_time);


@@ -820,6 +820,26 @@ bool resources_available(LocalSchedulerState *state) {
return resources_available;
}
void spillback_tasks_handler(LocalSchedulerState *state) {
SchedulingAlgorithmState *algorithm_state = state->algorithm_state;
int64_t num_to_spillback = std::min(
static_cast<int64_t>(algorithm_state->dispatch_task_queue->size()),
RayConfig::instance().max_tasks_to_spillback());
auto it = algorithm_state->dispatch_task_queue->end();
for (int64_t i = 0; i < num_to_spillback; i++) {
it--;
}
for (int64_t i = 0; i < num_to_spillback; i++) {
it->IncrementSpillbackCount();
give_task_to_global_scheduler(state, algorithm_state, *it);
// Dequeue the task.
it = algorithm_state->dispatch_task_queue->erase(it);
}
}
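
The handler spills up to max_tasks_to_spillback tasks from the tail of the dispatch queue, i.e. the tasks that would otherwise wait longest locally. A minimal sketch of the same tail-selection; std::list and the integer payload stand in for the real task queue, whose container type is not shown in this diff:

    #include <algorithm>
    #include <cstdint>
    #include <cstdio>
    #include <list>

    int main() {
      std::list<int> dispatch_queue = {1, 2, 3, 4, 5};
      const int64_t max_to_spill = 3;  // Stands in for max_tasks_to_spillback().
      int64_t num_to_spillback = std::min(
          static_cast<int64_t>(dispatch_queue.size()), max_to_spill);
      // Walk back from end() to the first task that will be spilled.
      auto it = dispatch_queue.end();
      for (int64_t i = 0; i < num_to_spillback; i++) {
        it--;
      }
      // Spill the tail, erasing each task as it is handed off.
      for (int64_t i = 0; i < num_to_spillback; i++) {
        std::printf("spilling task %d\n", *it);
        it = dispatch_queue.erase(it);  // erase() returns the next iterator.
      }
    }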
/**
* Assign as many tasks from the dispatch queue as possible.
*
@@ -1146,12 +1166,8 @@ void give_task_to_global_scheduler(LocalSchedulerState *state,
}
/* Pass on the task to the global scheduler. */
DCHECK(state->config.global_scheduler_exists);
/* Increment the task's spillback count before forwarding it to the global
* scheduler.
*/
execution_spec.IncrementSpillbackCount();
Task *task =
Task_alloc(execution_spec, TASK_STATUS_WAITING, DBClientID::nil());
Task *task = Task_alloc(execution_spec, TASK_STATUS_WAITING,
get_db_client_id(state->db));
#if !RAY_USE_NEW_GCS
DCHECK(state->db != NULL);
auto retryInfo = RetryInfo{


@@ -303,6 +303,13 @@ int reconstruct_object_timeout_handler(event_loop *loop,
bool object_locally_available(SchedulingAlgorithmState *algorithm_state,
ObjectID object_id);
/// Spill some tasks back to the global scheduler. This function implements the
/// spillback policy.
///
/// @param state The scheduler state.
/// @return Void.
void spillback_tasks_handler(LocalSchedulerState *state);
/**
* A helper function to print debug information about the current state and
* number of workers.


@@ -1702,7 +1702,7 @@ class SchedulingAlgorithm(unittest.TestCase):
total_tasks,
num_local_schedulers,
minimum_count,
num_attempts=20):
num_attempts=100):
attempts = 0
while attempts < num_attempts:
locations = ray.get(
@@ -1728,11 +1728,11 @@
@ray.remote
def f():
time.sleep(0.001)
time.sleep(0.01)
return ray.worker.global_worker.plasma_client.store_socket_name
self.attempt_to_load_balance(f, [], 100, num_local_schedulers, 25)
self.attempt_to_load_balance(f, [], 1000, num_local_schedulers, 250)
self.attempt_to_load_balance(f, [], 100, num_local_schedulers, 10)
self.attempt_to_load_balance(f, [], 1000, num_local_schedulers, 100)
def testLoadBalancingWithDependencies(self):
# This test ensures that tasks are being assigned to all local