From 4658d0a180431e4b17cb9cd48ba9379d04d8c10a Mon Sep 17 00:00:00 2001 From: Robert Nishihara Date: Mon, 19 Mar 2018 20:24:35 -0700 Subject: [PATCH] =?UTF-8?q?Print=20error=20when=20actor=20takes=20too=20lo?= =?UTF-8?q?ng=20to=20start,=20and=20refactor=20error=20me=E2=80=A6=20(#174?= =?UTF-8?q?7)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Print error when actor takes too long to start, and refactor error message pushing. * Print warning every ten seconds. * Fix linting and tests. * Fix tests. --- python/ray/worker.py | 17 ------ src/common/state/error_table.cc | 28 ++++----- src/common/state/error_table.h | 57 +++++++++---------- src/common/state/ray_config.h | 13 ++++- src/common/state/redis.cc | 15 +++-- src/local_scheduler/local_scheduler.cc | 26 ++++++--- .../local_scheduler_algorithm.cc | 22 +++++++ src/plasma/plasma_manager.cc | 9 ++- test/failure_test.py | 2 +- test/stress_tests.py | 4 -- 10 files changed, 103 insertions(+), 90 deletions(-) diff --git a/python/ray/worker.py b/python/ray/worker.py index 740f134dc..e7adf3218 100644 --- a/python/ray/worker.py +++ b/python/ray/worker.py @@ -1100,23 +1100,6 @@ def error_info(worker=global_worker): for error_key in error_keys: if error_applies_to_driver(error_key, worker=worker): error_contents = worker.redis_client.hgetall(error_key) - # If the error is an object hash mismatch, look up the function - # name for the nondeterministic task. TODO(rkn): Change this so - # that we don't have to look up additional information. Ideally all - # relevant information would already be in error_contents. - error_type = error_contents[b"type"] - if error_type in [OBJECT_HASH_MISMATCH_ERROR_TYPE, - PUT_RECONSTRUCTION_ERROR_TYPE]: - function_id = error_contents[b"data"] - if function_id == NIL_FUNCTION_ID: - function_name = b"Driver" - else: - task_driver_id = worker.task_driver_id - function_name = worker.redis_client.hget( - (b"RemoteFunction:" + task_driver_id.id() + - b":" + function_id), - "name") - error_contents[b"data"] = function_name errors.append(error_contents) return errors diff --git a/src/common/state/error_table.cc b/src/common/state/error_table.cc index 98e85e340..9e25d2fa2 100644 --- a/src/common/state/error_table.cc +++ b/src/common/state/error_table.cc @@ -2,29 +2,21 @@ #include "redis.h" const char *error_types[] = {"object_hash_mismatch", "put_reconstruction", - "worker_died"}; -const char *error_messages[] = { - "A nondeterministic task was reexecuted.", - "An object created by ray.put was evicted and could not be reconstructed. " - "The driver may need to be restarted.", - "A worker died or was killed while executing a task."}; + "worker_died", "actor_not_created"}; void push_error(DBHandle *db_handle, DBClientID driver_id, - int error_index, - size_t data_length, - const unsigned char *data) { - RAY_CHECK(error_index >= 0 && error_index < MAX_ERROR_INDEX); + int error_type, + const std::string &error_message) { + int64_t message_size = error_message.size(); + /* Allocate a struct to hold the error information. */ - ErrorInfo *info = (ErrorInfo *) malloc(sizeof(ErrorInfo) + data_length); + ErrorInfo *info = (ErrorInfo *) malloc(sizeof(ErrorInfo) + message_size); info->driver_id = driver_id; - info->error_index = error_index; - info->data_length = data_length; - memcpy(info->data, data, data_length); - /* Generate a random key to identify this error message. */ - RAY_CHECK(sizeof(info->error_key) >= sizeof(UniqueID)); - UniqueID error_key = UniqueID::from_random(); - memcpy(info->error_key, error_key.data(), sizeof(info->error_key)); + info->error_type = error_type; + info->error_key = UniqueID::from_random(); + info->size = message_size; + memcpy(info->error_message, error_message.data(), message_size); init_table_callback(db_handle, UniqueID::nil(), __func__, new CommonCallbackData(info), NULL, NULL, diff --git a/src/common/state/error_table.h b/src/common/state/error_table.h index 415ffbe4f..66ac9da60 100644 --- a/src/common/state/error_table.h +++ b/src/common/state/error_table.h @@ -4,50 +4,47 @@ #include "db.h" #include "table.h" +/// Data that is needed to push an error. typedef struct { + /// The ID of the driver to push the error to. DBClientID driver_id; - unsigned char error_key[20]; - int error_index; - size_t data_length; - unsigned char data[0]; + /// An index into the error_types array indicating the type of the error. + int error_type; + /// The key to use for the error message in Redis. + UniqueID error_key; + /// The length of the error message. + int64_t size; + /// The error message. + uint8_t error_message[0]; } ErrorInfo; -/** An error_index may be used as an index into error_types and - * error_messages. */ +/// An error_index may be used as an index into error_types. typedef enum { - /** An object was added with a different hash from the existing - * one. */ + /// An object was added with a different hash from the existing one. OBJECT_HASH_MISMATCH_ERROR_INDEX = 0, - /** An object that was created through a ray.put is lost. */ + /// An object that was created through a ray.put is lost. PUT_RECONSTRUCTION_ERROR_INDEX, - /** A worker died or was killed while executing a task. */ + /// A worker died or was killed while executing a task. WORKER_DIED_ERROR_INDEX, - /** The total number of error types. */ + /// An actor hasn't been created for a while. + ACTOR_NOT_CREATED_ERROR_INDEX, + /// The total number of error types. MAX_ERROR_INDEX } error_index; -/** Information about the error to be displayed to the user. */ extern const char *error_types[]; -extern const char *error_messages[]; -/** - * Push an error to the given Python driver. - * - * @param db_handle Database handle. - * @param driver_id The ID of the Python driver to push the error - * to. - * @param error_index The error information at this index in - * error_types and error_messages will be included in the - * error pushed to the driver. - * @param data_length The length of the custom data to be included - * in the error. - * @param data The custom data to be included in the error. - * @return Void. - */ +/// Push an error to the given Python driver. +/// +/// \param db_handle Database handle. +/// \param driver_id The ID of the Python driver to push the error to. +/// \param error_type An index specifying the type of the error. This should +/// be a value from the error_index enum. +/// \param error_message The error message to print. +/// \return Void. void push_error(DBHandle *db_handle, DBClientID driver_id, - int error_index, - size_t data_length, - const unsigned char *data); + int error_type, + const std::string &error_message); #endif diff --git a/src/common/state/ray_config.h b/src/common/state/ray_config.h index 36c79eaed..ee2830fd9 100644 --- a/src/common/state/ray_config.h +++ b/src/common/state/ray_config.h @@ -82,6 +82,10 @@ class RayConfig { int64_t max_tasks_to_spillback() const { return max_tasks_to_spillback_; } + int64_t actor_creation_num_spillbacks_warning() const { + return actor_creation_num_spillbacks_warning_; + } + private: RayConfig() : ray_protocol_version_(0x0000000000000000), @@ -108,7 +112,8 @@ class RayConfig { redis_db_connect_wait_milliseconds_(100), plasma_default_release_delay_(64), L3_cache_size_bytes_(100000000), - max_tasks_to_spillback_(10) {} + max_tasks_to_spillback_(10), + actor_creation_num_spillbacks_warning_(100) {} ~RayConfig() {} @@ -185,6 +190,12 @@ class RayConfig { /// Constants for the spillback scheduling policy. int64_t max_tasks_to_spillback_; + + /// Every time an actor creation task has been spilled back a number of times + /// that is a multiple of this quantity, a warning will be pushed to the + /// corresponding driver. Since spillback currently occurs on a 100ms timer, + /// a value of 100 corresponds to a warning every 10 seconds. + int64_t actor_creation_num_spillbacks_warning_; }; #endif // RAY_CONFIG_H diff --git a/src/common/state/redis.cc b/src/common/state/redis.cc index f2d80af8e..e69d21208 100644 --- a/src/common/state/redis.cc +++ b/src/common/state/redis.cc @@ -1665,7 +1665,7 @@ void redis_push_error_hmset_callback(redisAsyncContext *c, int status = redisAsyncCommand( db->context, redis_push_error_rpush_callback, (void *) callback_data->timer_id, "RPUSH ErrorKeys Error:%b:%b", - info->driver_id.data(), sizeof(info->driver_id), info->error_key, + info->driver_id.data(), sizeof(info->driver_id), info->error_key.data(), sizeof(info->error_key)); if ((status == REDIS_ERR) || db->subscribe_context->err) { LOG_REDIS_DEBUG(db->subscribe_context, "error in redis_push_error rpush"); @@ -1675,18 +1675,17 @@ void redis_push_error_hmset_callback(redisAsyncContext *c, void redis_push_error(TableCallbackData *callback_data) { DBHandle *db = callback_data->db_handle; ErrorInfo *info = (ErrorInfo *) callback_data->data->Get(); - RAY_CHECK(info->error_index < MAX_ERROR_INDEX && info->error_index >= 0); - /* Look up the error type. */ - const char *error_type = error_types[info->error_index]; - const char *error_message = error_messages[info->error_index]; + RAY_CHECK(info->error_type < MAX_ERROR_INDEX && info->error_type >= 0); + /// Look up the error type. + const char *error_type = error_types[info->error_type]; /* Set the error information. */ int status = redisAsyncCommand( db->context, redis_push_error_hmset_callback, (void *) callback_data->timer_id, - "HMSET Error:%b:%b type %s message %s data %b", info->driver_id.data(), - sizeof(info->driver_id), info->error_key, sizeof(info->error_key), - error_type, error_message, info->data, info->data_length); + "HMSET Error:%b:%b type %s message %b data %b", info->driver_id.data(), + sizeof(info->driver_id), info->error_key.data(), sizeof(info->error_key), + error_type, info->error_message, info->size, "None", strlen("None")); if ((status == REDIS_ERR) || db->subscribe_context->err) { LOG_REDIS_DEBUG(db->subscribe_context, "error in redis_push_error hmset"); } diff --git a/src/local_scheduler/local_scheduler.cc b/src/local_scheduler/local_scheduler.cc index 5f90338d9..66ae094e6 100644 --- a/src/local_scheduler/local_scheduler.cc +++ b/src/local_scheduler/local_scheduler.cc @@ -128,9 +128,13 @@ void kill_worker(LocalSchedulerState *state, * error message to the driver responsible for the task. */ if (worker->task_in_progress != NULL && !cleanup && !suppress_warning) { TaskSpec *spec = Task_task_execution_spec(worker->task_in_progress)->Spec(); - TaskID task_id = TaskSpec_task_id(spec); + + std::ostringstream error_message; + error_message << "The worker with ID " << worker->client_id << " died or " + << "was killed while executing the task with ID " + << TaskSpec_task_id(spec); push_error(state->db, TaskSpec_driver_id(spec), WORKER_DIED_ERROR_INDEX, - sizeof(task_id), task_id.data()); + error_message.str()); } /* Clean up the task in progress. */ @@ -753,20 +757,26 @@ void reconstruct_put_task_update_callback(Task *task, * by `ray.put` was not able to be reconstructed, and the workload will * likely hang. Push an error to the appropriate driver. */ TaskSpec *spec = Task_task_execution_spec(task)->Spec(); - FunctionID function = TaskSpec_function(spec); + + std::ostringstream error_message; + error_message << "The task with ID " << TaskSpec_task_id(spec) + << " is still executing and so the object created by " + << "ray.put could not be reconstructed."; push_error(state->db, TaskSpec_driver_id(spec), - PUT_RECONSTRUCTION_ERROR_INDEX, sizeof(function), - function.data()); + PUT_RECONSTRUCTION_ERROR_INDEX, error_message.str()); } } else { /* (1) The task is still executing and it is the driver task. We cannot * restart the driver task, so the workload will hang. Push an error to * the appropriate driver. */ TaskSpec *spec = Task_task_execution_spec(task)->Spec(); - FunctionID function = TaskSpec_function(spec); + + std::ostringstream error_message; + error_message << "The task with ID " << TaskSpec_task_id(spec) + << " is a driver task and so the object created by ray.put " + << "could not be reconstructed."; push_error(state->db, TaskSpec_driver_id(spec), - PUT_RECONSTRUCTION_ERROR_INDEX, sizeof(function), - function.data()); + PUT_RECONSTRUCTION_ERROR_INDEX, error_message.str()); } } else { /* The update to TASK_STATUS_RECONSTRUCTING succeeded, so continue with diff --git a/src/local_scheduler/local_scheduler_algorithm.cc b/src/local_scheduler/local_scheduler_algorithm.cc index b471da791..a8f2f582a 100644 --- a/src/local_scheduler/local_scheduler_algorithm.cc +++ b/src/local_scheduler/local_scheduler_algorithm.cc @@ -7,6 +7,7 @@ #include "state/task_table.h" #include "state/actor_notification_table.h" #include "state/db_client_table.h" +#include "state/error_table.h" #include "state/local_scheduler_table.h" #include "state/object_table.h" #include "local_scheduler_shared.h" @@ -857,6 +858,27 @@ void spillback_tasks_handler(LocalSchedulerState *state) { for (int64_t i = 0; i < num_to_spillback; i++) { it->IncrementSpillbackCount(); + // If an actor hasn't been created for a while, push a warning to the + // driver. + if (it->SpillbackCount() % + RayConfig::instance().actor_creation_num_spillbacks_warning() == + 0) { + TaskSpec *spec = it->Spec(); + if (TaskSpec_is_actor_creation_task(spec)) { + std::ostringstream error_message; + error_message << "The actor with ID " + << TaskSpec_actor_creation_id(spec) << " is taking a " + << "while to be created. It is possible that the " + << "cluster does not have enough resources to place this " + << "actor. Try reducing the number of actors created or " + << "increasing the number of slots available by using " + << "the --num-cpus, --num-gpus, and --resources flags."; + + push_error(state->db, TaskSpec_driver_id(spec), + ACTOR_NOT_CREATED_ERROR_INDEX, error_message.str()); + } + } + give_task_to_global_scheduler(state, algorithm_state, *it); // Dequeue the task. it = algorithm_state->dispatch_task_queue->erase(it); diff --git a/src/plasma/plasma_manager.cc b/src/plasma/plasma_manager.cc index a557fdeb2..3b4223d91 100644 --- a/src/plasma/plasma_manager.cc +++ b/src/plasma/plasma_manager.cc @@ -1307,12 +1307,15 @@ void log_object_hash_mismatch_error_task_callback(Task *task, RAY_CHECK(task != NULL); PlasmaManagerState *state = (PlasmaManagerState *) user_context; TaskSpec *spec = Task_task_execution_spec(task)->Spec(); - FunctionID function = TaskSpec_function(spec); /* Push the error to the Python driver that caused the nondeterministic task * to be submitted. */ + std::ostringstream error_message; + error_message << "An object created by the task with ID " + << TaskSpec_task_id(spec) << " was created with a different " + << "hash. This may mean that a non-deterministic task was " + << "reexecuted."; push_error(state->db, TaskSpec_driver_id(spec), - OBJECT_HASH_MISMATCH_ERROR_INDEX, sizeof(function), - function.data()); + OBJECT_HASH_MISMATCH_ERROR_INDEX, error_message.str()); } void log_object_hash_mismatch_error_result_callback(ObjectID object_id, diff --git a/test/failure_test.py b/test/failure_test.py index f0c0c1b9c..2ff731f98 100644 --- a/test/failure_test.py +++ b/test/failure_test.py @@ -284,7 +284,7 @@ class WorkerDeath(unittest.TestCase): wait_for_errors(b"worker_died", 1) self.assertEqual(len(ray.error_info()), 1) - self.assertIn("A worker died or was killed while executing a task.", + self.assertIn("died or was killed while executing the task", ray.error_info()[0][b"message"].decode("ascii")) def testActorWorkerDying(self): diff --git a/test/stress_tests.py b/test/stress_tests.py index 37d0891f2..46c2ab25f 100644 --- a/test/stress_tests.py +++ b/test/stress_tests.py @@ -436,9 +436,6 @@ class ReconstructionTests(unittest.TestCase): # Make sure all the errors have the correct type. self.assertTrue(all(error[b"type"] == b"object_hash_mismatch" for error in errors)) - # Make sure all the errors have the correct function name. - self.assertTrue(all(error[b"data"] == b"__main__.foo" - for error in errors)) @unittest.skipIf( os.environ.get('RAY_USE_NEW_GCS', False), @@ -485,7 +482,6 @@ class ReconstructionTests(unittest.TestCase): errors = self.wait_for_errors(error_check) self.assertTrue(all(error[b"type"] == b"put_reconstruction" for error in errors)) - self.assertTrue(all(error[b"data"] == b"Driver" for error in errors)) class ReconstructionTestsMultinode(ReconstructionTests):