From a0dd3a44c0c5e1cb4f3aad671bf5727e25eb0df6 Mon Sep 17 00:00:00 2001 From: Stephanie Wang Date: Fri, 17 Feb 2017 17:08:52 -0800 Subject: [PATCH] Dynamically grow worker pool to partially solve hanging workloads (#286) * First pass at a policy to solve deadlock * Address Robert's comments * stress test * unit test * Fix test cases * Fix test for python3 * add more logging * White space. --- python/ray/worker.py | 6 ++ src/photon/photon.h | 8 +- src/photon/photon_algorithm.c | 160 ++++++++++++++++++++++++++++++++-- src/photon/photon_algorithm.h | 39 ++++++++- src/photon/photon_client.c | 4 + src/photon/photon_client.h | 8 ++ src/photon/photon_extension.c | 7 ++ src/photon/photon_scheduler.c | 77 ++++++++++++---- src/photon/photon_scheduler.h | 49 ++++++++--- test/runtest.py | 51 ++++++++++- test/stress_tests.py | 22 +++++ 11 files changed, 393 insertions(+), 38 deletions(-) diff --git a/python/ray/worker.py b/python/ray/worker.py index e665f04c0..575965fb9 100644 --- a/python/ray/worker.py +++ b/python/ray/worker.py @@ -471,6 +471,7 @@ class Worker(object): # their original index in the object_ids argument. unready_ids = dict((object_id, i) for (i, (object_id, val)) in enumerate(final_results) if val is None) + was_blocked = (len(unready_ids) > 0) # Try reconstructing any objects we haven't gotten yet. Try to get them # until GET_TIMEOUT_MILLISECONDS milliseconds passes, then repeat. while len(unready_ids) > 0: @@ -487,6 +488,11 @@ class Worker(object): final_results[index] = (object_id, val) unready_ids.pop(object_id) + # If there were objects that we weren't able to get locally, let the local + # scheduler know that we're now unblocked. + if was_blocked: + self.photon_client.notify_unblocked() + # Unwrap the object from the list (it was wrapped put_object). assert len(final_results) == len(object_ids) for i in range(len(final_results)): diff --git a/src/photon/photon.h b/src/photon/photon.h index b2597538f..247e3bcc8 100644 --- a/src/photon/photon.h +++ b/src/photon/photon.h @@ -29,7 +29,10 @@ enum photon_message_type { EVENT_LOG_MESSAGE, /** Send an initial connection message to the local scheduler. * This contains the worker's process ID and actor ID. */ - REGISTER_WORKER_INFO + REGISTER_WORKER_INFO, + /** For a worker that was blocked on some object(s), tell the local scheduler + * that the worker is now unblocked. */ + NOTIFY_UNBLOCKED, }; /* These are needed to define the UT_arrays. */ @@ -112,6 +115,9 @@ typedef struct { * no task is running on the worker, this will be NULL. This is used to * update the task table. */ task *task_in_progress; + /** A flag to indicate whether this worker is currently blocking on an + * object(s) that isn't available locally yet. */ + bool is_blocked; /** The process ID of the client. If this is set to zero, the client has not * yet registered a process ID. */ pid_t pid; diff --git a/src/photon/photon_algorithm.c b/src/photon/photon_algorithm.c index 1d7e006d7..400af2393 100644 --- a/src/photon/photon_algorithm.c +++ b/src/photon/photon_algorithm.c @@ -83,9 +83,19 @@ struct scheduling_algorithm_state { * about a new local scheduler arrives, we will resubmit all of these tasks * locally. */ UT_array *cached_submitted_actor_tasks; - /** An array of worker indices corresponding to clients that are - * waiting for tasks. */ + /** An array of pointers to workers in the worker pool. These are workers + * that have registered a PID with us and that are now waiting to be + * assigned a task to execute. 
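 * Worker lifecycle implied by the three arrays that follow (a worker sits in
 * at most one of them at a time): dispatch_tasks moves a worker from
 * available_workers to executing_workers when it assigns a task;
 * handle_worker_blocked moves an executing worker to blocked_workers when it
 * blocks on a non-local object; handle_worker_unblocked moves it back to
 * executing_workers; and handle_worker_available returns it to
 * available_workers once its task has finished.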
*/ UT_array *available_workers; + /** An array of pointers to workers that are currently executing a task, + * unblocked. These are the workers that are leasing some number of + * resources. */ + UT_array *executing_workers; + /** An array of pointers to workers that are currently executing a task, + * blocked on some object(s) that isn't available locally yet. These are the + * workers that are executing a task, but that have temporarily returned the + * task's required resources. */ + UT_array *blocked_workers; /** A hash map of the objects that are available in the local Plasma store. * The key is the object ID. This information could be a little stale. */ object_entry *local_objects; @@ -107,9 +117,13 @@ scheduling_algorithm_state *make_scheduling_algorithm_state(void) { /* Initialize the local data structures used for queuing tasks and workers. */ algorithm_state->waiting_task_queue = NULL; algorithm_state->dispatch_task_queue = NULL; - utarray_new(algorithm_state->available_workers, &worker_icd); + utarray_new(algorithm_state->cached_submitted_actor_tasks, &task_spec_icd); algorithm_state->local_actor_infos = NULL; + + utarray_new(algorithm_state->available_workers, &worker_icd); + utarray_new(algorithm_state->executing_workers, &worker_icd); + utarray_new(algorithm_state->blocked_workers, &worker_icd); return algorithm_state; } @@ -146,6 +160,8 @@ void free_scheduling_algorithm_state( utarray_free(algorithm_state->cached_submitted_actor_tasks); /* Free the list of available workers. */ utarray_free(algorithm_state->available_workers); + utarray_free(algorithm_state->executing_workers); + utarray_free(algorithm_state->blocked_workers); /* Free the cached information about which objects are present locally. */ object_entry *obj_entry, *tmp_obj_entry; HASH_ITER(hh, algorithm_state->local_objects, obj_entry, tmp_obj_entry) { @@ -556,10 +572,6 @@ void dispatch_tasks(local_scheduler_state *state, /* Assign as many tasks as we can, while there are workers available. */ DL_FOREACH_SAFE(algorithm_state->dispatch_task_queue, elt, tmp) { - if (utarray_len(algorithm_state->available_workers) <= 0) { - /* There are no more available workers, so we're done. */ - break; - } /* TODO(atumanov): as an optimization, we can also check if all dynamic * capacity is zero and bail early. */ bool task_satisfied = true; @@ -574,6 +586,19 @@ void dispatch_tasks(local_scheduler_state *state, if (!task_satisfied) { continue; /* Proceed to the next task. */ } + + /* If there is a task to assign, but there are no more available workers in + * the worker pool, then exit. Ensure that there will be an available + * worker during a future invocation of dispatch_tasks. */ + if (utarray_len(algorithm_state->available_workers) == 0) { + if (utarray_len(state->child_pids) == 0) { + /* If there are no workers, including those pending PID registration, + * then we must start a new one to replenish the worker pool. */ + start_worker(state, NIL_ACTOR_ID); + } + break; + } + /* Dispatch this task to an available worker and dequeue the task. */ LOG_DEBUG("Dispatching task"); /* Get the last available worker in the available worker queue. */ @@ -581,10 +606,12 @@ void dispatch_tasks(local_scheduler_state *state, algorithm_state->available_workers); /* Tell the available worker to execute the task. */ assign_task_to_worker(state, elt->spec, *worker); - /* Remove the available worker from the queue and free the struct. */ + /* Remove the worker from the available queue, and add it to the executing + * workers. 
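The empty-pool check added to dispatch_tasks above is the heart of the dynamic growth: a replacement worker is forked only when the available pool is empty and no previously forked worker is still waiting to register its PID, so a single dispatch pass starts at most one new worker. A minimal sketch of that policy as a hypothetical predicate (the helper name is illustrative and not part of the patch):

/* Hypothetical helper, not in the patch: returns true when dispatch_tasks
 * should fork a replacement worker for a task that is otherwise dispatchable. */
static bool worker_pool_needs_growth(local_scheduler_state *state,
                                     scheduling_algorithm_state *algorithm_state) {
  /* The pool is empty and there is no forked child still pending PID
   * registration, so exactly one replacement gets started. */
  return utarray_len(algorithm_state->available_workers) == 0 &&
         utarray_len(state->child_pids) == 0;
}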
*/ utarray_pop_back(algorithm_state->available_workers); + utarray_push_back(algorithm_state->executing_workers, worker); + /* Dequeue the task and free the struct. */ print_resource_info(state, elt->spec); - /* Deque the task. */ DL_DELETE(algorithm_state->dispatch_task_queue, elt); free_task_spec(elt->spec); free(elt); @@ -923,12 +950,37 @@ void handle_worker_available(local_scheduler_state *state, scheduling_algorithm_state *algorithm_state, local_scheduler_client *worker) { CHECK(worker->task_in_progress == NULL); + /* Check that the worker isn't in the pool of available workers. */ for (local_scheduler_client **p = (local_scheduler_client **) utarray_front( algorithm_state->available_workers); p != NULL; p = (local_scheduler_client **) utarray_next( algorithm_state->available_workers, p)) { DCHECK(*p != worker); } + /* Check that the worker isn't in the list of blocked workers. */ + for (local_scheduler_client **p = (local_scheduler_client **) utarray_front( + algorithm_state->blocked_workers); + p != NULL; p = (local_scheduler_client **) utarray_next( + algorithm_state->blocked_workers, p)) { + DCHECK(*p != worker); + } + /* If the worker was executing a task, it must have finished, so remove it + * from the list of executing workers. */ + for (int i = 0; i < utarray_len(algorithm_state->executing_workers); ++i) { + local_scheduler_client **p = (local_scheduler_client **) utarray_eltptr( + algorithm_state->executing_workers, i); + if (*p == worker) { + utarray_erase(algorithm_state->executing_workers, i, 1); + break; + } + } + /* Check that we actually erased the worker. */ + for (int i = 0; i < utarray_len(algorithm_state->executing_workers); ++i) { + local_scheduler_client **p = (local_scheduler_client **) utarray_eltptr( + algorithm_state->executing_workers, i); + DCHECK(*p != worker); + } + /* Add worker to the list of available workers. */ utarray_push_back(algorithm_state->available_workers, &worker); @@ -954,6 +1006,88 @@ void handle_actor_worker_available(local_scheduler_state *state, dispatch_actor_task(state, algorithm_state, actor_id); } +void handle_worker_blocked(local_scheduler_state *state, + scheduling_algorithm_state *algorithm_state, + local_scheduler_client *worker) { + /* Find the worker in the list of executing workers. */ + for (int i = 0; i < utarray_len(algorithm_state->executing_workers); ++i) { + local_scheduler_client **p = (local_scheduler_client **) utarray_eltptr( + algorithm_state->executing_workers, i); + if (*p == worker) { + /* Remove the worker from the list of executing workers. */ + utarray_erase(algorithm_state->executing_workers, i, 1); + + /* Check that the worker isn't in the list of blocked workers. */ + for (local_scheduler_client **q = + (local_scheduler_client **) utarray_front( + algorithm_state->blocked_workers); + q != NULL; q = (local_scheduler_client **) utarray_next( + algorithm_state->blocked_workers, q)) { + DCHECK(*q != worker); + } + + /* Return the resources that the blocked worker was using. */ + CHECK(worker->task_in_progress != NULL); + task_spec *spec = task_task_spec(worker->task_in_progress); + update_dynamic_resources(state, spec, true); + /* Add the worker to the list of blocked workers. */ + worker->is_blocked = true; + utarray_push_back(algorithm_state->blocked_workers, &worker); + + return; + } + } + + /* The worker should have been in the list of executing workers, so this line + * should be unreachable. 
*/ + LOG_FATAL( + "Worker registered as blocked, but it was not in the list of executing " + "workers."); +} + +void handle_worker_unblocked(local_scheduler_state *state, + scheduling_algorithm_state *algorithm_state, + local_scheduler_client *worker) { + /* Find the worker in the list of blocked workers. */ + for (int i = 0; i < utarray_len(algorithm_state->blocked_workers); ++i) { + local_scheduler_client **p = (local_scheduler_client **) utarray_eltptr( + algorithm_state->blocked_workers, i); + if (*p == worker) { + /* Remove the worker from the list of blocked workers. */ + utarray_erase(algorithm_state->blocked_workers, i, 1); + + /* Check that the worker isn't in the list of executing workers. */ + for (local_scheduler_client **q = + (local_scheduler_client **) utarray_front( + algorithm_state->executing_workers); + q != NULL; q = (local_scheduler_client **) utarray_next( + algorithm_state->executing_workers, q)) { + DCHECK(*q != worker); + } + + /* Lease back the resources that the blocked worker will need. */ + /* TODO(swang): Leasing back the resources to blocked workers can cause + * us to transiently exceed the maximum number of resources. This can be + * fixed by having blocked workers explicitly yield and wait to be given + * back resources before continuing execution. */ + CHECK(worker->task_in_progress != NULL); + task_spec *spec = task_task_spec(worker->task_in_progress); + update_dynamic_resources(state, spec, false); + /* Add the worker to the list of executing workers. */ + worker->is_blocked = false; + utarray_push_back(algorithm_state->executing_workers, &worker); + + return; + } + } + + /* The worker should have been in the list of blocked workers, so this line + * should be unreachable. */ + LOG_FATAL( + "Worker registered as unblocked, but it was not in the list of blocked " + "workers."); +} + void handle_object_available(local_scheduler_state *state, scheduling_algorithm_state *algorithm_state, object_id object_id) { @@ -1064,3 +1198,11 @@ int num_dispatch_tasks(scheduling_algorithm_state *algorithm_state) { DL_COUNT(algorithm_state->dispatch_task_queue, elt, count); return count; } + +void print_worker_info(const char *message, + scheduling_algorithm_state *algorithm_state) { + LOG_DEBUG("%s: %d available, %d executing, %d blocked", message, + utarray_len(algorithm_state->available_workers), + utarray_len(algorithm_state->executing_workers), + utarray_len(algorithm_state->blocked_workers)); +} diff --git a/src/photon/photon_algorithm.h b/src/photon/photon_algorithm.h index 4324e8130..d34f3c674 100644 --- a/src/photon/photon_algorithm.h +++ b/src/photon/photon_algorithm.h @@ -139,7 +139,7 @@ void handle_object_available(local_scheduler_state *state, void handle_object_removed(local_scheduler_state *state, object_id object_id); /** - * This function is called when a new worker becomes available + * This function is called when a new worker becomes available. * * @param state The state of the local scheduler. * @param algorithm_state State maintained by the scheduling algorithm. @@ -189,6 +189,32 @@ void handle_actor_worker_disconnect(local_scheduler_state *state, scheduling_algorithm_state *algorithm_state, actor_id actor_id); +/** + * This function is called when a worker that was executing a task becomes + * blocked on an object that isn't available locally yet. + * + * @param state The state of the local scheduler. + * @param algorithm_state State maintained by the scheduling algorithm. + * @param worker The worker that is blocked. + * @return Void. 
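handle_worker_blocked and handle_worker_unblocked are mirror images around the resource lease: blocking returns the task's resources to the scheduler, and unblocking takes them back before the worker resumes. A condensed sketch of that pairing (illustrative only; the real handlers above also move the worker between executing_workers and blocked_workers and toggle is_blocked):

/* Illustrative pairing only, not code from the patch. */
void example_block_then_unblock(local_scheduler_state *state,
                                local_scheduler_client *worker) {
  task_spec *spec = task_task_spec(worker->task_in_progress);
  /* The worker blocks in ray.get: return its resources so other queued tasks
   * can be dispatched in the meantime. */
  update_dynamic_resources(state, spec, true);
  /* ... the missing objects become locally available ... */
  /* The worker unblocks: lease the same resources back before it resumes. */
  update_dynamic_resources(state, spec, false);
}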
+ */ +void handle_worker_blocked(local_scheduler_state *state, + scheduling_algorithm_state *algorithm_state, + local_scheduler_client *worker); + +/** + * This function is called when a worker that was blocked on an object that + * wasn't available locally yet becomes unblocked. + * + * @param state The state of the local scheduler. + * @param algorithm_state State maintained by the scheduling algorithm. + * @param worker The worker that is now unblocked. + * @return Void. + */ +void handle_worker_unblocked(local_scheduler_state *state, + scheduling_algorithm_state *algorithm_state, + local_scheduler_client *worker); + /** * This function fetches queued task's missing object dependencies. It is * called every LOCAL_SCHEDULER_FETCH_TIMEOUT_MILLISECONDS. @@ -201,6 +227,17 @@ void handle_actor_worker_disconnect(local_scheduler_state *state, */ int fetch_object_timeout_handler(event_loop *loop, timer_id id, void *context); +/** + * A helper function to print debug information about the current state and + * number of workers. + * + * @param message A message to identify the log message. + * @param algorithm_state State maintained by the scheduling algorithm. + * @return Void. + */ +void print_worker_info(const char *message, + scheduling_algorithm_state *algorithm_state); + /** The following methods are for testing purposes only. */ #ifdef PHOTON_TEST /** diff --git a/src/photon/photon_client.c b/src/photon/photon_client.c index 290286df1..8be532bbe 100644 --- a/src/photon/photon_client.c +++ b/src/photon/photon_client.c @@ -76,3 +76,7 @@ void photon_reconstruct_object(photon_conn *conn, object_id object_id) { void photon_log_message(photon_conn *conn) { write_message(conn->conn, LOG_MESSAGE, 0, NULL); } + +void photon_notify_unblocked(photon_conn *conn) { + write_message(conn->conn, NOTIFY_UNBLOCKED, 0, NULL); +} diff --git a/src/photon/photon_client.h b/src/photon/photon_client.h index fb5f2ad42..a0276a7cc 100644 --- a/src/photon/photon_client.h +++ b/src/photon/photon_client.h @@ -93,4 +93,12 @@ void photon_reconstruct_object(photon_conn *conn, object_id object_id); */ void photon_log_message(photon_conn *conn); +/** + * Notify the local scheduler that this client (worker) is no longer blocked. + * + * @param conn The connection information. + * @return Void. 
+ */ +void photon_notify_unblocked(photon_conn *conn); + #endif diff --git a/src/photon/photon_extension.c b/src/photon/photon_extension.c index 05d3d1498..1970cd579 100644 --- a/src/photon/photon_extension.c +++ b/src/photon/photon_extension.c @@ -80,6 +80,11 @@ static PyObject *PyPhotonClient_log_event(PyObject *self, PyObject *args) { Py_RETURN_NONE; } +static PyObject *PyPhotonClient_notify_unblocked(PyObject *self) { + photon_notify_unblocked(((PyPhotonClient *) self)->photon_connection); + Py_RETURN_NONE; +} + static PyMethodDef PyPhotonClient_methods[] = { {"submit", (PyCFunction) PyPhotonClient_submit, METH_VARARGS, "Submit a task to the local scheduler."}, @@ -89,6 +94,8 @@ static PyMethodDef PyPhotonClient_methods[] = { METH_VARARGS, "Ask the local scheduler to reconstruct an object."}, {"log_event", (PyCFunction) PyPhotonClient_log_event, METH_VARARGS, "Log an event to the event log through the local scheduler."}, + {"notify_unblocked", (PyCFunction) PyPhotonClient_notify_unblocked, + METH_NOARGS, "Notify the local scheduler that we are unblocked."}, {NULL} /* Sentinel */ }; diff --git a/src/photon/photon_scheduler.c b/src/photon/photon_scheduler.c index 4ad46e873..dccd57f0f 100644 --- a/src/photon/photon_scheduler.c +++ b/src/photon/photon_scheduler.c @@ -68,6 +68,9 @@ void print_resource_info(const local_scheduler_state *state, * @return Void. */ void kill_worker(local_scheduler_client *worker, bool wait) { + /* TODO(swang): This method should also propagate changes to other parts of + * the system to reflect the killed task in progress, if there was one. This + * includes updating dynamic resources and updating the task table. */ /* Erase the worker from the array of workers. */ local_scheduler_state *state = worker->local_scheduler_state; int num_workers = utarray_len(state->workers); @@ -96,6 +99,7 @@ void kill_worker(local_scheduler_client *worker, bool wait) { /* Wait for the process to exit. */ waitpid(worker->pid, NULL, 0); } + LOG_INFO("Killed worker with pid %d", worker->pid); } /* Clean up the client socket after killing the worker so that the worker @@ -187,12 +191,15 @@ void free_local_scheduler(local_scheduler_state *state) { */ void start_worker(local_scheduler_state *state, actor_id actor_id) { /* We can't start a worker if we don't have the path to the worker script. */ - CHECK(state->config.start_worker_command != NULL); + if (state->config.start_worker_command == NULL) { + LOG_WARN("No valid command to start worker provided. Cannot start worker."); + return; + } /* Launch the process to create the worker. 
*/ pid_t pid = fork(); if (pid != 0) { utarray_push_back(state->child_pids, &pid); - LOG_DEBUG("Started worker with pid %d", pid); + LOG_INFO("Started worker with pid %d", pid); return; } @@ -279,6 +286,11 @@ local_scheduler_state *init_local_scheduler( } else { state->config.start_worker_command = NULL; } + if (start_worker_command == NULL) { + LOG_WARN( + "No valid command to start a worker provided, local scheduler will not " + "start any workers."); + } state->config.global_scheduler_exists = global_scheduler_exists; state->loop = loop; @@ -343,6 +355,29 @@ local_scheduler_state *init_local_scheduler( return state; } +void update_dynamic_resources(local_scheduler_state *state, + task_spec *spec, + bool return_resources) { + for (int i = 0; i < MAX_RESOURCE_INDEX; ++i) { + double resource = task_spec_get_required_resource(spec, i); + if (!return_resources) { + /* If we are not returning resources, we are leasing them, so we want to + * subtract the resource quantities from our accounting. */ + resource *= -1; + } + /* Add or subtract the task's resources from our count. */ + state->dynamic_resources[i] += resource; + + if (!return_resources && state->dynamic_resources[i] < 0) { + /* We are using more resources than we have been allocated. */ + LOG_WARN("photon dynamic resources dropped to %8.4f\t%8.4f\n", + state->dynamic_resources[0], state->dynamic_resources[1]); + } + CHECK(state->dynamic_resources[i] <= state->static_resources[i]); + } + print_resource_info(state, spec); +} + void assign_task_to_worker(local_scheduler_state *state, task_spec *spec, local_scheduler_client *worker) { @@ -362,13 +397,7 @@ void assign_task_to_worker(local_scheduler_state *state, /* Resource accounting: * Update dynamic resource vector in the local scheduler state. */ - for (int i = 0; i < MAX_RESOURCE_INDEX; i++) { - state->dynamic_resources[i] -= task_spec_get_required_resource(spec, i); - CHECKM(state->dynamic_resources[i] >= 0, - "photon dynamic resources dropped to %8.4f\t%8.4f\n", - state->dynamic_resources[0], state->dynamic_resources[1]); - } - print_resource_info(state, spec); + update_dynamic_resources(state, spec, false); task *task = alloc_task(spec, TASK_STATUS_RUNNING, state->db ? get_db_client_id(state->db) : NIL_ID); /* Record which task this worker is executing. This will be freed in @@ -580,12 +609,7 @@ void process_message(event_loop *loop, if (worker->task_in_progress != NULL) { task_spec *spec = task_task_spec(worker->task_in_progress); /* Return dynamic resources back for the task in progress. */ - for (int i = 0; i < MAX_RESOURCE_INDEX; i++) { - state->dynamic_resources[i] += task_spec_get_required_resource(spec, i); - /* Sanity-check resource vector boundary conditions. */ - CHECK(state->dynamic_resources[i] <= state->static_resources[i]); - } - print_resource_info(state, spec); + update_dynamic_resources(state, spec, true); /* If we're connected to Redis, update tables. */ if (state->db != NULL) { /* Update control state tables. */ @@ -608,6 +632,16 @@ void process_message(event_loop *loop, } } break; case RECONSTRUCT_OBJECT: { + if (worker->task_in_progress != NULL && !worker->is_blocked) { + /* TODO(swang): For now, we don't handle blocked actors. */ + if (actor_ids_equal(worker->actor_id, NIL_ACTOR_ID)) { + /* If the worker was executing a task (i.e. non-driver) and it wasn't + * already blocked on an object that's not locally available, update its + * state to blocked. 
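One consequence of update_dynamic_resources above replacing the old CHECKM with a LOG_WARN: re-leasing resources to a worker that unblocks can transiently drive the dynamic resource count negative, because its freed resources may already have been handed to another task (the case the TODO in handle_worker_unblocked describes). A hypothetical trace on a scheduler with a single CPU (assuming resource index 0 is the CPU slot):

/* Hypothetical trace, static_resources[0] = 1 CPU:
 *   worker A is assigned a 1-CPU task        -> dynamic_resources[0] =  0
 *   A blocks in ray.get, resources returned  -> dynamic_resources[0] =  1
 *   worker B is assigned a 1-CPU task        -> dynamic_resources[0] =  0
 *   A unblocks, resources leased back        -> dynamic_resources[0] = -1
 * The last step now logs the over-subscription warning instead of crashing
 * the local scheduler. */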
*/ + handle_worker_blocked(state, state->algorithm_state, worker); + print_worker_info("Reconstructing", state->algorithm_state); + } + } object_id *obj_id = (object_id *) utarray_front(state->input_buffer); reconstruct_object(state, *obj_id); } break; @@ -622,6 +656,18 @@ void process_message(event_loop *loop, } break; case LOG_MESSAGE: { } break; + case NOTIFY_UNBLOCKED: { + if (worker->task_in_progress != NULL) { + /* TODO(swang): For now, we don't handle blocked actors. */ + if (actor_ids_equal(worker->actor_id, NIL_ACTOR_ID)) { + /* If the worker was executing a task (i.e. non-driver), update its + * state to not blocked. */ + CHECK(worker->is_blocked); + handle_worker_unblocked(state, state->algorithm_state, worker); + } + } + print_worker_info("Worker unblocked", state->algorithm_state); + } break; default: /* This code should be unreachable. */ CHECK(0); @@ -639,6 +685,7 @@ void new_client_connection(event_loop *loop, local_scheduler_client *worker = malloc(sizeof(local_scheduler_client)); worker->sock = new_socket; worker->task_in_progress = NULL; + worker->is_blocked = false; worker->pid = 0; worker->is_child = false; worker->actor_id = NIL_ACTOR_ID; diff --git a/src/photon/photon_scheduler.h b/src/photon/photon_scheduler.h index 8b96322d7..93c1df52f 100644 --- a/src/photon/photon_scheduler.h +++ b/src/photon/photon_scheduler.h @@ -67,6 +67,44 @@ void reconstruct_object(local_scheduler_state *state, object_id object_id); void print_resource_info(const local_scheduler_state *s, const task_spec *spec); +/** + * Kill a worker. + * + * @param worker The local scheduler client to kill. + * @param wait A boolean representing whether to wait for the killed worker to + * exit. + * @param Void. + */ +void kill_worker(local_scheduler_client *worker, bool wait); + +/** + * Start a worker. This forks a new worker process that can be added to the + * pool of available workers, pending registration of its PID with the local + * scheduler. + * + * @param state The local scheduler state. + * @param actor_id The ID of the actor for this worker. If this worker is not an + * actor, then NIL_ACTOR_ID should be used. + * @param Void. + */ +void start_worker(local_scheduler_state *state, actor_id actor_id); + +/** + * Update our accounting for the current resources being used, according to + * some task that is starting or finishing execution. + * + * @param state The local scheduler state. + * @param spec The specification for the task that is or was using resources. + * @param return_resources A boolean representing whether the task is starting + * or finishing execution. If true, then the task is finishing execution + * (possibly temporarily), so it will add to the dynamic resources + * available. Else, it will take from the dynamic resources available. + * @return Void. + */ +void update_dynamic_resources(local_scheduler_state *state, + task_spec *spec, + bool return_resources); + /** The following methods are for testing purposes only. */ #ifdef PHOTON_TEST local_scheduler_state *init_local_scheduler( @@ -92,17 +130,6 @@ void process_message(event_loop *loop, void *context, int events); -void kill_worker(local_scheduler_client *worker, bool wait); - -/** - * Start a new worker by forking. - * - * @param state The local scheduler state. - * @param actor_id The ID of the actor for this worker. If this worker is not an - * actor, then NIL_ACTOR_ID should be used. - * @return Void. 
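From the worker's side the protocol is asymmetric: there is no explicit "blocked" message. Sending a RECONSTRUCT_OBJECT request is what marks a non-actor worker as blocked (handled above), and NOTIFY_UNBLOCKED is only sent once all of its dependencies are local again (the was_blocked bookkeeping added to worker.py). A minimal client-side sketch using the two photon_client calls; the function is hypothetical and only shows the ordering:

/* Hypothetical client-side sequence, not code from the patch. */
void example_blocking_get(photon_conn *conn, object_id missing_object) {
  /* The local scheduler marks this worker as blocked and returns its task's
   * resources (RECONSTRUCT_OBJECT case in process_message). */
  photon_reconstruct_object(conn, missing_object);
  /* ... wait until the object appears in the local Plasma store ... */
  /* Tell the local scheduler we are runnable again so it re-leases the
   * resources (NOTIFY_UNBLOCKED case in process_message). */
  photon_notify_unblocked(conn);
}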
- */ -void start_worker(local_scheduler_state *state, actor_id actor_id); #endif #endif /* PHOTON_SCHEDULER_H */ diff --git a/test/runtest.py b/test/runtest.py index 7fec277f9..aeb5a0180 100644 --- a/test/runtest.py +++ b/test/runtest.py @@ -367,7 +367,7 @@ class APITest(unittest.TestCase): ray.worker.cleanup() def testWait(self): - ray.init(num_workers=1) + ray.init(num_workers=1, num_cpus=1) @ray.remote def f(delay): @@ -1115,6 +1115,55 @@ class ResourcesTest(unittest.TestCase): ray.worker.cleanup() +class WorkerPoolTests(unittest.TestCase): + + def tearDown(self): + ray.worker.cleanup() + + def testNoWorkers(self): + ray.init(num_workers=0) + + @ray.remote + def f(): + return 1 + + # Make sure we can call a remote function. This will require starting a new + # worker. + ray.get(f.remote()) + + ray.get([f.remote() for _ in range(100)]) + + def testBlockingTasks(self): + ray.init(num_workers=1) + + @ray.remote + def f(i, j): + return (i, j) + + @ray.remote + def g(i): + # Each instance of g submits and blocks on the result of another remote + # task. + object_ids = [f.remote(i, j) for j in range(10)] + return ray.get(object_ids) + + ray.get([g.remote(i) for i in range(100)]) + + @ray.remote + def _sleep(i): + time.sleep(1) + return (i) + + @ray.remote + def sleep(): + # Each instance of sleep submits and blocks on the result of another + # remote task, which takes one second to execute. + ray.get([_sleep.remote(i) for i in range(10)]) + + ray.get(sleep.remote()) + + ray.worker.cleanup() + class SchedulingAlgorithm(unittest.TestCase): def attempt_to_load_balance(self, remote_function, args, total_tasks, diff --git a/test/stress_tests.py b/test/stress_tests.py index 918d543d7..71598a588 100644 --- a/test/stress_tests.py +++ b/test/stress_tests.py @@ -300,5 +300,27 @@ class ReconstructionTestsMultinode(ReconstructionTests): # one worker each. num_local_schedulers = 4 +# NOTE(swang): This test tries to launch 1000 workers and breaks. +#class WorkerPoolTests(unittest.TestCase): +# +# def tearDown(self): +# ray.worker.cleanup() +# +# def testBlockingTasks(self): +# @ray.remote +# def f(i, j): +# return (i, j) +# +# @ray.remote +# def g(i): +# # Each instance of g submits and blocks on the result of another remote +# # task. +# object_ids = [f.remote(i, j) for j in range(10)] +# return ray.get(object_ids) +# +# ray.init(num_workers=1) +# ray.get([g.remote(i) for i in range(1000)]) +# ray.worker.cleanup() + if __name__ == "__main__": unittest.main(verbosity=2)