From fa363a5a3a2bdcb1f4ceb662fe956419a212f478 Mon Sep 17 00:00:00 2001 From: Robert Nishihara Date: Thu, 6 Apr 2017 00:02:39 -0700 Subject: [PATCH] Notify driver when a worker dies while executing a task. (#419) * Notify driver when a worker dies while executing a task. * Fix linting. * Don't push error when local scheduler is cleaning up. --- src/common/state/error_table.h | 7 +++++-- src/local_scheduler/local_scheduler.cc | 9 +++++++++ test/failure_test.py | 21 +++++++++++++++++++++ 3 files changed, 35 insertions(+), 2 deletions(-) diff --git a/src/common/state/error_table.h b/src/common/state/error_table.h index 9c68ad112..f2f7c5651 100644 --- a/src/common/state/error_table.h +++ b/src/common/state/error_table.h @@ -20,17 +20,20 @@ typedef enum { OBJECT_HASH_MISMATCH_ERROR_INDEX = 0, /** An object that was created through a ray.put is lost. */ PUT_RECONSTRUCTION_ERROR_INDEX, + /** A worker died or was killed while executing a task. */ + WORKER_DIED_ERROR_INDEX, /** The total number of error types. */ MAX_ERROR_INDEX } error_index; /** Information about the error to be displayed to the user. */ static const char *error_types[] = {"object_hash_mismatch", - "put_reconstruction"}; + "put_reconstruction", "worker_died"}; static const char *error_messages[] = { "A nondeterministic task was reexecuted.", "An object created by ray.put was evicted and could not be reconstructed. " - "The driver may need to be restarted."}; + "The driver may need to be restarted.", + "A worker died or was killed while executing a task."}; /** * Push an error to the given Python driver. diff --git a/src/local_scheduler/local_scheduler.cc b/src/local_scheduler/local_scheduler.cc index 50354057a..ef837a7a8 100644 --- a/src/local_scheduler/local_scheduler.cc +++ b/src/local_scheduler/local_scheduler.cc @@ -123,6 +123,15 @@ void kill_worker(LocalSchedulerClient *worker, bool cleanup) { LOG_INFO("Killed worker with pid %d", worker->pid); } + /* If this worker is still running a task and we aren't cleaning up, push an + * error message to the driver responsible for the task. */ + if (worker->task_in_progress != NULL && !cleanup) { + TaskSpec *spec = Task_task_spec(worker->task_in_progress); + TaskID task_id = TaskSpec_task_id(spec); + push_error(state->db, TaskSpec_driver_id(spec), WORKER_DIED_ERROR_INDEX, + sizeof(task_id), task_id.id); + } + /* Clean up the task in progress. */ if (worker->task_in_progress) { if (!worker->is_blocked) { diff --git a/test/failure_test.py b/test/failure_test.py index d1fdb2554..b9d6312c6 100644 --- a/test/failure_test.py +++ b/test/failure_test.py @@ -353,5 +353,26 @@ class ActorTest(unittest.TestCase): ray.worker.cleanup() +class WorkerDeath(unittest.TestCase): + + def testWorkerDying(self): + ray.init(num_workers=0, driver_mode=ray.SILENT_MODE) + + # Define a remote function that will kill the worker that runs it. + @ray.remote + def f(): + eval("exit()") + + f.remote() + + wait_for_errors(b"worker_died", 1) + + self.assertEqual(len(ray.error_info()), 1) + self.assertIn("A worker died or was killed while executing a task.", + ray.error_info()[0][b"message"].decode("ascii")) + + ray.worker.cleanup() + + if __name__ == "__main__": unittest.main(verbosity=2)