Do resource bookkeeping for actor methods. (#682)

* Dispatch regular and actor tasks when resources become available.

* Make actor methods do resource bookkeeping and add test.

* Remove unnecessary field.

* Fix linting.

* Fix actor test.

* Maintain set of actors with pending tasks to speed up task dispatch.

* Exit early from task dispatch if there are no resources available.

* Fix linting.

* Fix error.

* Fix bug related to iterator invalidation.

* When an actor is removed, remove it from the set of actors with pending tasks.
Robert Nishihara, 2017-06-20 22:52:45 -07:00 (committed by Philipp Moritz)
parent ed9380d73d
commit 5ebc2f3f2e
5 changed files with 258 additions and 75 deletions
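
The central change is the new dispatch loop in the scheduling algorithm (dispatch_all_tasks below): a set of actor IDs with pending tasks lets the scheduler skip actors that have nothing queued, a resource check lets the loop exit early, and the iterator is advanced before each dispatch because a successful dispatch can erase the current actor from the set. The following is a minimal, self-contained sketch of that pattern, not the scheduler itself; available_cpus, try_dispatch, and dispatch_all are simplified stand-ins for the real pieces (state->dynamic_resources, dispatch_actor_task, and dispatch_all_tasks), and ActorID is reduced to a plain integer.

#include <cstdint>
#include <iostream>
#include <unordered_set>

typedef int64_t ActorID; /* Simplified stand-in for the real ActorID type. */

static int available_cpus = 2; /* Stand-in for the dynamic resource counts. */

/* Stand-in for dispatch_actor_task: pretend that each dispatch consumes one
 * CPU and drains the actor's queue, so the actor is erased from the set of
 * actors with pending tasks. */
static bool try_dispatch(ActorID actor_id,
                         std::unordered_set<ActorID> &pending) {
  if (available_cpus == 0) {
    return false;
  }
  available_cpus -= 1;
  /* This erase invalidates any iterator pointing at actor_id. */
  pending.erase(actor_id);
  std::cout << "Dispatched a task for actor " << actor_id << std::endl;
  return true;
}

/* Stand-in for dispatch_all_tasks. */
static void dispatch_all(std::unordered_set<ActorID> &pending) {
  auto it = pending.begin();
  while (it != pending.end()) {
    /* Exit early if there are no resources left. */
    if (available_cpus == 0) {
      break;
    }
    /* Copy the ID and advance the iterator before dispatching, because
     * try_dispatch may erase the current element from the set. */
    ActorID actor_id = *it;
    ++it;
    try_dispatch(actor_id, pending);
  }
}

int main() {
  std::unordered_set<ActorID> pending = {1, 2, 3};
  dispatch_all(pending); /* Dispatches for two actors, then exits early. */
  std::cout << pending.size() << " actor(s) still pending" << std::endl;
  return 0;
}

Advancing the iterator before the call is enough here because erasing an element from a std::unordered_set invalidates only iterators that point to the erased element.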


@@ -474,12 +474,14 @@ void assign_task_to_worker(LocalSchedulerState *state,
TaskSpec *spec,
int64_t task_spec_size,
LocalSchedulerClient *worker) {
/* Acquire the necessary resources for running this task. TODO(rkn): We are
* currently ignoring resource bookkeeping for actor methods. */
if (ActorID_equal(worker->actor_id, NIL_ACTOR_ID)) {
acquire_resources(state, worker,
TaskSpec_get_required_resource(spec, ResourceIndex_CPU),
TaskSpec_get_required_resource(spec, ResourceIndex_GPU));
/* Acquire the necessary resources for running this task. */
acquire_resources(state, worker,
TaskSpec_get_required_resource(spec, ResourceIndex_CPU),
TaskSpec_get_required_resource(spec, ResourceIndex_GPU));
/* Check that actor tasks don't have GPU requirements. Any necessary GPUs
* should already have been acquired by the actor worker. */
if (!ActorID_equal(worker->actor_id, NIL_ACTOR_ID)) {
CHECK(TaskSpec_get_required_resource(spec, ResourceIndex_GPU) == 0);
}
CHECK(ActorID_equal(worker->actor_id, TaskSpec_actor_id(spec)));
@@ -525,15 +527,17 @@ void assign_task_to_worker(LocalSchedulerState *state,
void finish_task(LocalSchedulerState *state, LocalSchedulerClient *worker) {
if (worker->task_in_progress != NULL) {
TaskSpec *spec = Task_task_spec(worker->task_in_progress);
/* Return dynamic resources back for the task in progress. TODO(rkn): We
* are currently ignoring resource bookkeeping for actor methods. */
/* Return dynamic resources back for the task in progress. */
CHECK(worker->cpus_in_use ==
TaskSpec_get_required_resource(spec, ResourceIndex_CPU));
if (ActorID_equal(worker->actor_id, NIL_ACTOR_ID)) {
CHECK(worker->cpus_in_use ==
TaskSpec_get_required_resource(spec, ResourceIndex_CPU));
CHECK(worker->gpus_in_use.size() ==
TaskSpec_get_required_resource(spec, ResourceIndex_GPU));
release_resources(state, worker, worker->cpus_in_use,
worker->gpus_in_use.size());
} else {
CHECK(0 == TaskSpec_get_required_resource(spec, ResourceIndex_GPU));
release_resources(state, worker, worker->cpus_in_use, 0);
}
/* If we're connected to Redis, update tables. */
if (state->db != NULL) {
@@ -931,20 +935,21 @@ void process_message(event_loop *loop,
case MessageType_ReconstructObject: {
auto message = flatbuffers::GetRoot<ReconstructObject>(input);
if (worker->task_in_progress != NULL && !worker->is_blocked) {
/* TODO(swang): For now, we don't handle blocked actors. */
/* If the worker was executing a task (i.e. non-driver) and it wasn't
* already blocked on an object that's not locally available, update its
* state to blocked. */
worker->is_blocked = true;
/* Return the CPU resources that the blocked worker was using, but not
* GPU resources. */
release_resources(state, worker, worker->cpus_in_use, 0);
/* Let the scheduling algorithm process the fact that the worker is
* blocked. */
if (ActorID_equal(worker->actor_id, NIL_ACTOR_ID)) {
/* If the worker was executing a task (i.e. non-driver) and it wasn't
* already blocked on an object that's not locally available, update its
* state to blocked. */
worker->is_blocked = true;
/* Return the CPU resources that the blocked worker was using, but not
* GPU resources. */
release_resources(state, worker, worker->cpus_in_use, 0);
/* Let the scheduling algorithm process the fact that the worker is
* blocked. */
handle_worker_blocked(state, state->algorithm_state, worker);
print_worker_info("Reconstructing", state->algorithm_state);
} else {
handle_actor_worker_blocked(state, state->algorithm_state, worker);
}
print_worker_info("Reconstructing", state->algorithm_state);
}
reconstruct_object(state, from_flatbuf(message->object_id()));
} break;
@@ -955,25 +960,26 @@ void process_message(event_loop *loop,
case MessageType_NotifyUnblocked: {
/* TODO(rkn): A driver may call this as well, right? */
if (worker->task_in_progress != NULL) {
/* TODO(swang): For now, we don't handle blocked actors. */
/* If the worker was executing a task (i.e. non-driver), update its
* state to not blocked. */
CHECK(worker->is_blocked);
worker->is_blocked = false;
/* Lease back the CPU resources that the blocked worker needs (note that
* it never released its GPU resources). TODO(swang): Leasing back the
* resources to blocked workers can cause us to transiently exceed the
* maximum number of resources. This could be fixed by having blocked
* workers explicitly yield and wait to be given back resources before
* continuing execution. */
TaskSpec *spec = Task_task_spec(worker->task_in_progress);
acquire_resources(state, worker,
TaskSpec_get_required_resource(spec, ResourceIndex_CPU),
0);
/* Let the scheduling algorithm process the fact that the worker is
* unblocked. */
if (ActorID_equal(worker->actor_id, NIL_ACTOR_ID)) {
/* If the worker was executing a task (i.e. non-driver), update its
* state to not blocked. */
CHECK(worker->is_blocked);
worker->is_blocked = false;
/* Lease back the CPU resources that the blocked worker needs (note that
* it never released its GPU resources). TODO(swang): Leasing back the
* resources to blocked workers can cause us to transiently exceed the
* maximum number of resources. This could be fixed by having blocked
* workers explicitly yield and wait to be given back resources before
* continuing execution. */
TaskSpec *spec = Task_task_spec(worker->task_in_progress);
acquire_resources(
state, worker,
TaskSpec_get_required_resource(spec, ResourceIndex_CPU), 0);
/* Let the scheduling algorithm process the fact that the worker is
* unblocked. */
handle_worker_unblocked(state, state->algorithm_state, worker);
} else {
handle_actor_worker_unblocked(state, state->algorithm_state, worker);
}
}
print_worker_info("Worker unblocked", state->algorithm_state);
@@ -992,7 +998,7 @@ void process_message(event_loop *loop,
int64_t end_time = current_time_ms();
int64_t max_time_for_handler = 1000;
if (end_time - start_time > max_time_for_handler) {
LOG_WARN("process_message of type % " PRId64 " took %" PRId64
LOG_WARN("process_message of type %" PRId64 " took %" PRId64
" milliseconds.",
type, end_time - start_time);
}


@@ -38,8 +38,6 @@ struct ObjectEntry {
/** This struct contains information about a specific actor. This struct will be
* used inside of a hash table. */
typedef struct {
/** The ID of the actor. This is used as a key in the hash table. */
ActorID actor_id;
/** The number of tasks that have been executed on this actor so far. This is
* used to guarantee the in-order execution of tasks on actors (in the order
* that the tasks were submitted). This is currently meaningful because we
@@ -53,8 +51,6 @@ typedef struct {
LocalSchedulerClient *worker;
/** True if the worker is available and false otherwise. */
bool worker_available;
/** Handle for the uthash table. */
UT_hash_handle hh;
} LocalActorInfo;
/** Part of the local scheduler state that is maintained by the scheduling
@@ -69,6 +65,11 @@ struct SchedulingAlgorithmState {
* particular, a queue of tasks that are waiting to execute on that actor.
* This is only used for actors that exist locally. */
std::unordered_map<ActorID, LocalActorInfo, UniqueIDHasher> local_actor_infos;
/** This is a set of the IDs of the actors that have tasks waiting to run.
* The purpose is to make it easier to dispatch tasks without looping over
* all of the actors. Note that this is an optimization and is not strictly
* necessary. */
std::unordered_set<ActorID, UniqueIDHasher> actors_with_pending_tasks;
/** A vector of actor tasks that have been submitted but this local scheduler
* doesn't know which local scheduler is responsible for them, so cannot
* assign them to the correct local scheduler yet. Whenever a notification
@@ -223,7 +224,6 @@ void create_actor(SchedulingAlgorithmState *algorithm_state,
ActorID actor_id,
LocalSchedulerClient *worker) {
LocalActorInfo entry;
entry.actor_id = actor_id;
entry.task_counter = 0;
entry.task_queue = new std::list<TaskQueueEntry>();
entry.worker = worker;
@@ -261,6 +261,9 @@ void remove_actor(SchedulingAlgorithmState *algorithm_state, ActorID actor_id) {
delete entry.task_queue;
/* Remove the entry from the hash table. */
algorithm_state->local_actor_infos.erase(actor_id);
/* Remove the actor ID from the set of actors with pending tasks. */
algorithm_state->actors_with_pending_tasks.erase(actor_id);
}
/**
@@ -276,6 +279,11 @@ bool dispatch_actor_task(LocalSchedulerState *state,
ActorID actor_id) {
/* Make sure this worker actually is an actor. */
CHECK(!ActorID_equal(actor_id, NIL_ACTOR_ID));
/* Return if this actor doesn't have any pending tasks. */
if (algorithm_state->actors_with_pending_tasks.find(actor_id) ==
algorithm_state->actors_with_pending_tasks.end()) {
return false;
}
/* Make sure this actor belongs to this local scheduler. */
if (state->actor_mapping.count(actor_id) != 1) {
/* The creation notification for this actor has not yet arrived at the local
@@ -290,11 +298,9 @@ bool dispatch_actor_task(LocalSchedulerState *state,
LocalActorInfo &entry =
algorithm_state->local_actor_infos.find(actor_id)->second;
if (entry.task_queue->empty()) {
/* There are no queued tasks for this actor, so we cannot dispatch a task to
* the actor. */
return false;
}
/* There should be some queued tasks for this actor. */
CHECK(!entry.task_queue->empty());
TaskQueueEntry first_task = entry.task_queue->front();
int64_t next_task_counter = TaskSpec_actor_counter(first_task.spec);
if (next_task_counter != entry.task_counter) {
@@ -307,6 +313,14 @@ bool dispatch_actor_task(LocalSchedulerState *state,
if (!entry.worker_available) {
return false;
}
/* If there are not enough resources available, we cannot assign the task. */
CHECK(0 ==
TaskSpec_get_required_resource(first_task.spec, ResourceIndex_GPU));
if (!check_dynamic_resources(state, TaskSpec_get_required_resource(
first_task.spec, ResourceIndex_CPU),
0)) {
return false;
}
/* Assign the first task in the task queue to the worker and mark the worker
* as unavailable. */
entry.task_counter += 1;
@@ -317,6 +331,13 @@ bool dispatch_actor_task(LocalSchedulerState *state,
TaskQueueEntry_free(&first_task);
/* Remove the task from the actor's task queue. */
entry.task_queue->pop_front();
/* If there are no more tasks in the queue, then indicate that the actor has
* no tasks. */
if (entry.task_queue->empty()) {
algorithm_state->actors_with_pending_tasks.erase(actor_id);
}
return true;
}
@@ -418,6 +439,9 @@ void add_task_to_actor_queue(LocalSchedulerState *state,
task_table_add_task(state->db, task, NULL, NULL, NULL);
}
}
/* Record the fact that this actor has a task waiting to execute. */
algorithm_state->actors_with_pending_tasks.insert(actor_id);
}
/**
@@ -555,6 +579,23 @@ int fetch_object_timeout_handler(event_loop *loop, timer_id id, void *context) {
return LOCAL_SCHEDULER_FETCH_TIMEOUT_MILLISECONDS;
}
/**
* Return true if there are still some resources available and false otherwise.
*
* @param state The scheduler state.
* @return True if there are still some resources and false if there are not.
*/
bool resources_available(LocalSchedulerState *state) {
bool resources_available = false;
for (int i = 0; i < ResourceIndex_MAX; i++) {
if (state->dynamic_resources[i] > 0) {
/* There are still resources left. */
resources_available = true;
}
}
return resources_available;
}
/**
* Assign as many tasks from the dispatch queue as possible.
*
@@ -579,19 +620,12 @@ void dispatch_tasks(LocalSchedulerState *state,
}
return;
}
/* Terminate early if there are no more resources available. */
bool resources_available = false;
for (int i = 0; i < ResourceIndex_MAX; i++) {
if (state->dynamic_resources[i] > 0) {
/* There are still resources left, continue checking tasks. */
resources_available = true;
break;
}
}
if (!resources_available) {
/* No resources available -- terminate early. */
if (!resources_available(state)) {
return;
}
/* Skip to the next task if this task cannot currently be satisfied. */
if (!check_dynamic_resources(
state, TaskSpec_get_required_resource(task.spec, ResourceIndex_CPU),
@@ -619,6 +653,34 @@
} /* End for each task in the dispatch queue. */
}
/**
* Attempt to dispatch both regular tasks and actor tasks.
*
* @param state The scheduler state.
* @param algorithm_state The scheduling algorithm state.
* @return Void.
*/
void dispatch_all_tasks(LocalSchedulerState *state,
SchedulingAlgorithmState *algorithm_state) {
/* First attempt to dispatch regular tasks. */
dispatch_tasks(state, algorithm_state);
/* Attempt to dispatch actor tasks. */
auto it = algorithm_state->actors_with_pending_tasks.begin();
while (it != algorithm_state->actors_with_pending_tasks.end()) {
/* Terminate early if there are no more resources available. */
if (!resources_available(state)) {
break;
}
/* We increment the iterator ahead of time because the call to
* dispatch_actor_task may invalidate the current iterator. */
ActorID actor_id = *it;
it++;
/* Dispatch tasks for the current actor. */
dispatch_actor_task(state, algorithm_state, actor_id);
}
}
/**
* A helper function to allocate a queue entry for a task specification and
* push it onto a generic queue.
@@ -951,9 +1013,8 @@ void handle_worker_available(LocalSchedulerState *state,
/* Add worker to the list of available workers. */
algorithm_state->available_workers.push_back(worker);
/* Try to dispatch tasks, since we now have available workers to assign them
* to. */
dispatch_tasks(state, algorithm_state);
/* Try to dispatch tasks. */
dispatch_all_tasks(state, algorithm_state);
}
void handle_worker_removed(LocalSchedulerState *state,
@@ -1003,8 +1064,8 @@ void handle_actor_worker_available(LocalSchedulerState *state,
CHECK(worker == entry.worker);
CHECK(!entry.worker_available);
entry.worker_available = true;
/* Assign a task to this actor if possible. */
dispatch_actor_task(state, algorithm_state, actor_id);
/* Assign new tasks if possible. */
dispatch_all_tasks(state, algorithm_state);
}
void handle_worker_blocked(LocalSchedulerState *state,
@@ -1020,7 +1081,16 @@ void handle_worker_blocked(LocalSchedulerState *state,
algorithm_state->blocked_workers.push_back(worker);
/* Try to dispatch tasks, since we may have freed up some resources. */
dispatch_tasks(state, algorithm_state);
dispatch_all_tasks(state, algorithm_state);
}
void handle_actor_worker_blocked(LocalSchedulerState *state,
SchedulingAlgorithmState *algorithm_state,
LocalSchedulerClient *worker) {
/* The actor case doesn't use equivalents of the blocked_workers and
* executing_workers lists. Are these necessary? */
/* Try to dispatch tasks, since we may have freed up some resources. */
dispatch_all_tasks(state, algorithm_state);
}
void handle_worker_unblocked(LocalSchedulerState *state,
@@ -1036,6 +1106,10 @@ void handle_worker_unblocked(LocalSchedulerState *state,
algorithm_state->executing_workers.push_back(worker);
}
void handle_actor_worker_unblocked(LocalSchedulerState *state,
SchedulingAlgorithmState *algorithm_state,
LocalSchedulerClient *worker) {}
void handle_object_available(LocalSchedulerState *state,
SchedulingAlgorithmState *algorithm_state,
ObjectID object_id) {
@@ -1071,7 +1145,7 @@ void handle_object_available(LocalSchedulerState *state,
}
/* Try to dispatch tasks, since we may have added some from the waiting
* queue. */
dispatch_tasks(state, algorithm_state);
dispatch_all_tasks(state, algorithm_state);
/* Clean up the records for dependent tasks. */
entry.dependent_tasks.clear();
}


@@ -217,6 +217,19 @@ void handle_worker_blocked(LocalSchedulerState *state,
SchedulingAlgorithmState *algorithm_state,
LocalSchedulerClient *worker);
/**
* This function is called when an actor that was executing a task becomes
* blocked on an object that isn't available locally yet.
*
* @param state The state of the local scheduler.
* @param algorithm_state State maintained by the scheduling algorithm.
* @param worker The worker that is blocked.
* @return Void.
*/
void handle_actor_worker_blocked(LocalSchedulerState *state,
SchedulingAlgorithmState *algorithm_state,
LocalSchedulerClient *worker);
/**
* This function is called when a worker that was blocked on an object that
* wasn't available locally yet becomes unblocked.
@@ -230,6 +243,19 @@ void handle_worker_unblocked(LocalSchedulerState *state,
SchedulingAlgorithmState *algorithm_state,
LocalSchedulerClient *worker);
/**
* This function is called when an actor that was blocked on an object that
* wasn't available locally yet becomes unblocked.
*
* @param state The state of the local scheduler.
* @param algorithm_state State maintained by the scheduling algorithm.
* @param worker The worker that is now unblocked.
* @return Void.
*/
void handle_actor_worker_unblocked(LocalSchedulerState *state,
SchedulingAlgorithmState *algorithm_state,
LocalSchedulerClient *worker);
/**
* Process the fact that a driver has been removed. This will remove all of the
* tasks for that driver from the scheduling algorithm's internal data


@@ -1602,7 +1602,7 @@ void process_message(event_loop *loop,
int64_t end_time = current_time_ms();
int64_t max_time_for_handler = 1000;
if (end_time - start_time > max_time_for_handler) {
LOG_WARN("process_message of type % " PRId64 " took %" PRId64
LOG_WARN("process_message of type %" PRId64 " took %" PRId64
" milliseconds.",
type, end_time - start_time);
}


@@ -570,18 +570,18 @@ class ActorSchedulingProperties(unittest.TestCase):
             def __init__(self):
                 pass
 
-        Actor.remote()
+            def get_id(self):
+                return ray.worker.global_worker.worker_id
+
+        a = Actor.remote()
+        actor_id = ray.get(a.get_id.remote())
 
         @ray.remote
         def f():
-            return 1
+            return ray.worker.global_worker.worker_id
 
         # Make sure that f cannot be scheduled on the worker created for the actor.
-        # The wait call should time out.
-        ready_ids, remaining_ids = ray.wait([f.remote() for _ in range(10)],
-                                            timeout=3000)
-        self.assertEqual(ready_ids, [])
-        self.assertEqual(len(remaining_ids), 10)
+        resulting_ids = ray.get([f.remote() for _ in range(100)])
+        self.assertNotIn(actor_id, resulting_ids)
 
         ray.worker.cleanup()
@@ -993,6 +993,83 @@ class ActorsWithGPUs(unittest.TestCase):
         gpu_ids = ray.get(results)
         self.assertEqual(set(gpu_ids), set(range(10)))
 
         ray.worker.cleanup()
 
+    def testActorsAndTaskResourceBookkeeping(self):
+        ray.init(num_cpus=1)
+
+        @ray.remote
+        class Foo(object):
+            def __init__(self):
+                start = time.time()
+                time.sleep(0.1)
+                end = time.time()
+                self.interval = (start, end)
+
+            def get_interval(self):
+                return self.interval
+
+            def sleep(self):
+                start = time.time()
+                time.sleep(0.01)
+                end = time.time()
+                return start, end
+
+        # First make sure that we do not have more actor methods running at a
+        # time than we have CPUs.
+        actors = [Foo.remote() for _ in range(4)]
+        interval_ids = []
+        interval_ids += [actor.get_interval.remote() for actor in actors]
+        for _ in range(4):
+            interval_ids += [actor.sleep.remote() for actor in actors]
+
+        # Make sure that the intervals don't overlap.
+        intervals = ray.get(interval_ids)
+        intervals.sort(key=lambda x: x[0])
+        for interval1, interval2 in zip(intervals[:-1], intervals[1:]):
+            self.assertLess(interval1[0], interval1[1])
+            self.assertLess(interval1[1], interval2[0])
+            self.assertLess(interval2[0], interval2[1])
+
+        ray.worker.cleanup()
+
+    def testBlockingActorTask(self):
+        ray.init(num_cpus=1, num_gpus=1)
+
+        @ray.remote(num_gpus=1)
+        def f():
+            return 1
+
+        @ray.remote
+        class Foo(object):
+            def __init__(self):
+                pass
+
+            def blocking_method(self):
+                ray.get(f.remote())
+
+        # Make sure we can execute a blocking actor method even if there is
+        # only one CPU.
+        actor = Foo.remote()
+        ray.get(actor.blocking_method.remote())
+
+        @ray.remote(num_gpus=1)
+        class GPUFoo(object):
+            def __init__(self):
+                pass
+
+            def blocking_method(self):
+                ray.get(f.remote())
+
+        # Make sure that GPU resources are not released when actors block.
+        actor = GPUFoo.remote()
+        x_id = actor.blocking_method.remote()
+        ready_ids, remaining_ids = ray.wait([x_id], timeout=500)
+        self.assertEqual(ready_ids, [])
+        self.assertEqual(remaining_ids, [x_id])
+
+        ray.worker.cleanup()
 
 if __name__ == "__main__":
     unittest.main(verbosity=2)