Print error when actor takes too long to start, and refactor error me… (#1747)

* Print error when actor takes too long to start, and refactor error message pushing.

* Print warning every ten seconds.

* Fix linting and tests.

* Fix tests.
This commit is contained in:
Robert Nishihara 2018-03-19 20:24:35 -07:00 committed by Philipp Moritz
parent 73bb149c8a
commit 4658d0a180
10 changed files with 103 additions and 90 deletions

View file

@ -1100,23 +1100,6 @@ def error_info(worker=global_worker):
for error_key in error_keys:
if error_applies_to_driver(error_key, worker=worker):
error_contents = worker.redis_client.hgetall(error_key)
# If the error is an object hash mismatch, look up the function
# name for the nondeterministic task. TODO(rkn): Change this so
# that we don't have to look up additional information. Ideally all
# relevant information would already be in error_contents.
error_type = error_contents[b"type"]
if error_type in [OBJECT_HASH_MISMATCH_ERROR_TYPE,
PUT_RECONSTRUCTION_ERROR_TYPE]:
function_id = error_contents[b"data"]
if function_id == NIL_FUNCTION_ID:
function_name = b"Driver"
else:
task_driver_id = worker.task_driver_id
function_name = worker.redis_client.hget(
(b"RemoteFunction:" + task_driver_id.id() +
b":" + function_id),
"name")
error_contents[b"data"] = function_name
errors.append(error_contents)
return errors

View file

@ -2,29 +2,21 @@
#include "redis.h"
const char *error_types[] = {"object_hash_mismatch", "put_reconstruction",
"worker_died"};
const char *error_messages[] = {
"A nondeterministic task was reexecuted.",
"An object created by ray.put was evicted and could not be reconstructed. "
"The driver may need to be restarted.",
"A worker died or was killed while executing a task."};
"worker_died", "actor_not_created"};
void push_error(DBHandle *db_handle,
DBClientID driver_id,
int error_index,
size_t data_length,
const unsigned char *data) {
RAY_CHECK(error_index >= 0 && error_index < MAX_ERROR_INDEX);
int error_type,
const std::string &error_message) {
int64_t message_size = error_message.size();
/* Allocate a struct to hold the error information. */
ErrorInfo *info = (ErrorInfo *) malloc(sizeof(ErrorInfo) + data_length);
ErrorInfo *info = (ErrorInfo *) malloc(sizeof(ErrorInfo) + message_size);
info->driver_id = driver_id;
info->error_index = error_index;
info->data_length = data_length;
memcpy(info->data, data, data_length);
/* Generate a random key to identify this error message. */
RAY_CHECK(sizeof(info->error_key) >= sizeof(UniqueID));
UniqueID error_key = UniqueID::from_random();
memcpy(info->error_key, error_key.data(), sizeof(info->error_key));
info->error_type = error_type;
info->error_key = UniqueID::from_random();
info->size = message_size;
memcpy(info->error_message, error_message.data(), message_size);
init_table_callback(db_handle, UniqueID::nil(), __func__,
new CommonCallbackData(info), NULL, NULL,

View file

@ -4,50 +4,47 @@
#include "db.h"
#include "table.h"
/// Data that is needed to push an error.
typedef struct {
/// The ID of the driver to push the error to.
DBClientID driver_id;
unsigned char error_key[20];
int error_index;
size_t data_length;
unsigned char data[0];
/// An index into the error_types array indicating the type of the error.
int error_type;
/// The key to use for the error message in Redis.
UniqueID error_key;
/// The length of the error message.
int64_t size;
/// The error message.
uint8_t error_message[0];
} ErrorInfo;
/** An error_index may be used as an index into error_types and
* error_messages. */
/// An error_index may be used as an index into error_types.
typedef enum {
/** An object was added with a different hash from the existing
* one. */
/// An object was added with a different hash from the existing one.
OBJECT_HASH_MISMATCH_ERROR_INDEX = 0,
/** An object that was created through a ray.put is lost. */
/// An object that was created through a ray.put is lost.
PUT_RECONSTRUCTION_ERROR_INDEX,
/** A worker died or was killed while executing a task. */
/// A worker died or was killed while executing a task.
WORKER_DIED_ERROR_INDEX,
/** The total number of error types. */
/// An actor hasn't been created for a while.
ACTOR_NOT_CREATED_ERROR_INDEX,
/// The total number of error types.
MAX_ERROR_INDEX
} error_index;
/** Information about the error to be displayed to the user. */
extern const char *error_types[];
extern const char *error_messages[];
/**
* Push an error to the given Python driver.
*
* @param db_handle Database handle.
* @param driver_id The ID of the Python driver to push the error
* to.
* @param error_index The error information at this index in
* error_types and error_messages will be included in the
* error pushed to the driver.
* @param data_length The length of the custom data to be included
* in the error.
* @param data The custom data to be included in the error.
* @return Void.
*/
/// Push an error to the given Python driver.
///
/// \param db_handle Database handle.
/// \param driver_id The ID of the Python driver to push the error to.
/// \param error_type An index specifying the type of the error. This should
/// be a value from the error_index enum.
/// \param error_message The error message to print.
/// \return Void.
void push_error(DBHandle *db_handle,
DBClientID driver_id,
int error_index,
size_t data_length,
const unsigned char *data);
int error_type,
const std::string &error_message);
#endif

View file

@ -82,6 +82,10 @@ class RayConfig {
int64_t max_tasks_to_spillback() const { return max_tasks_to_spillback_; }
int64_t actor_creation_num_spillbacks_warning() const {
return actor_creation_num_spillbacks_warning_;
}
private:
RayConfig()
: ray_protocol_version_(0x0000000000000000),
@ -108,7 +112,8 @@ class RayConfig {
redis_db_connect_wait_milliseconds_(100),
plasma_default_release_delay_(64),
L3_cache_size_bytes_(100000000),
max_tasks_to_spillback_(10) {}
max_tasks_to_spillback_(10),
actor_creation_num_spillbacks_warning_(100) {}
~RayConfig() {}
@ -185,6 +190,12 @@ class RayConfig {
/// Constants for the spillback scheduling policy.
int64_t max_tasks_to_spillback_;
/// Every time an actor creation task has been spilled back a number of times
/// that is a multiple of this quantity, a warning will be pushed to the
/// corresponding driver. Since spillback currently occurs on a 100ms timer,
/// a value of 100 corresponds to a warning every 10 seconds.
int64_t actor_creation_num_spillbacks_warning_;
};
#endif // RAY_CONFIG_H

View file

@ -1665,7 +1665,7 @@ void redis_push_error_hmset_callback(redisAsyncContext *c,
int status = redisAsyncCommand(
db->context, redis_push_error_rpush_callback,
(void *) callback_data->timer_id, "RPUSH ErrorKeys Error:%b:%b",
info->driver_id.data(), sizeof(info->driver_id), info->error_key,
info->driver_id.data(), sizeof(info->driver_id), info->error_key.data(),
sizeof(info->error_key));
if ((status == REDIS_ERR) || db->subscribe_context->err) {
LOG_REDIS_DEBUG(db->subscribe_context, "error in redis_push_error rpush");
@ -1675,18 +1675,17 @@ void redis_push_error_hmset_callback(redisAsyncContext *c,
void redis_push_error(TableCallbackData *callback_data) {
DBHandle *db = callback_data->db_handle;
ErrorInfo *info = (ErrorInfo *) callback_data->data->Get();
RAY_CHECK(info->error_index < MAX_ERROR_INDEX && info->error_index >= 0);
/* Look up the error type. */
const char *error_type = error_types[info->error_index];
const char *error_message = error_messages[info->error_index];
RAY_CHECK(info->error_type < MAX_ERROR_INDEX && info->error_type >= 0);
/// Look up the error type.
const char *error_type = error_types[info->error_type];
/* Set the error information. */
int status = redisAsyncCommand(
db->context, redis_push_error_hmset_callback,
(void *) callback_data->timer_id,
"HMSET Error:%b:%b type %s message %s data %b", info->driver_id.data(),
sizeof(info->driver_id), info->error_key, sizeof(info->error_key),
error_type, error_message, info->data, info->data_length);
"HMSET Error:%b:%b type %s message %b data %b", info->driver_id.data(),
sizeof(info->driver_id), info->error_key.data(), sizeof(info->error_key),
error_type, info->error_message, info->size, "None", strlen("None"));
if ((status == REDIS_ERR) || db->subscribe_context->err) {
LOG_REDIS_DEBUG(db->subscribe_context, "error in redis_push_error hmset");
}

View file

@ -128,9 +128,13 @@ void kill_worker(LocalSchedulerState *state,
* error message to the driver responsible for the task. */
if (worker->task_in_progress != NULL && !cleanup && !suppress_warning) {
TaskSpec *spec = Task_task_execution_spec(worker->task_in_progress)->Spec();
TaskID task_id = TaskSpec_task_id(spec);
std::ostringstream error_message;
error_message << "The worker with ID " << worker->client_id << " died or "
<< "was killed while executing the task with ID "
<< TaskSpec_task_id(spec);
push_error(state->db, TaskSpec_driver_id(spec), WORKER_DIED_ERROR_INDEX,
sizeof(task_id), task_id.data());
error_message.str());
}
/* Clean up the task in progress. */
@ -753,20 +757,26 @@ void reconstruct_put_task_update_callback(Task *task,
* by `ray.put` was not able to be reconstructed, and the workload will
* likely hang. Push an error to the appropriate driver. */
TaskSpec *spec = Task_task_execution_spec(task)->Spec();
FunctionID function = TaskSpec_function(spec);
std::ostringstream error_message;
error_message << "The task with ID " << TaskSpec_task_id(spec)
<< " is still executing and so the object created by "
<< "ray.put could not be reconstructed.";
push_error(state->db, TaskSpec_driver_id(spec),
PUT_RECONSTRUCTION_ERROR_INDEX, sizeof(function),
function.data());
PUT_RECONSTRUCTION_ERROR_INDEX, error_message.str());
}
} else {
/* (1) The task is still executing and it is the driver task. We cannot
* restart the driver task, so the workload will hang. Push an error to
* the appropriate driver. */
TaskSpec *spec = Task_task_execution_spec(task)->Spec();
FunctionID function = TaskSpec_function(spec);
std::ostringstream error_message;
error_message << "The task with ID " << TaskSpec_task_id(spec)
<< " is a driver task and so the object created by ray.put "
<< "could not be reconstructed.";
push_error(state->db, TaskSpec_driver_id(spec),
PUT_RECONSTRUCTION_ERROR_INDEX, sizeof(function),
function.data());
PUT_RECONSTRUCTION_ERROR_INDEX, error_message.str());
}
} else {
/* The update to TASK_STATUS_RECONSTRUCTING succeeded, so continue with

View file

@ -7,6 +7,7 @@
#include "state/task_table.h"
#include "state/actor_notification_table.h"
#include "state/db_client_table.h"
#include "state/error_table.h"
#include "state/local_scheduler_table.h"
#include "state/object_table.h"
#include "local_scheduler_shared.h"
@ -857,6 +858,27 @@ void spillback_tasks_handler(LocalSchedulerState *state) {
for (int64_t i = 0; i < num_to_spillback; i++) {
it->IncrementSpillbackCount();
// If an actor hasn't been created for a while, push a warning to the
// driver.
if (it->SpillbackCount() %
RayConfig::instance().actor_creation_num_spillbacks_warning() ==
0) {
TaskSpec *spec = it->Spec();
if (TaskSpec_is_actor_creation_task(spec)) {
std::ostringstream error_message;
error_message << "The actor with ID "
<< TaskSpec_actor_creation_id(spec) << " is taking a "
<< "while to be created. It is possible that the "
<< "cluster does not have enough resources to place this "
<< "actor. Try reducing the number of actors created or "
<< "increasing the number of slots available by using "
<< "the --num-cpus, --num-gpus, and --resources flags.";
push_error(state->db, TaskSpec_driver_id(spec),
ACTOR_NOT_CREATED_ERROR_INDEX, error_message.str());
}
}
give_task_to_global_scheduler(state, algorithm_state, *it);
// Dequeue the task.
it = algorithm_state->dispatch_task_queue->erase(it);

View file

@ -1307,12 +1307,15 @@ void log_object_hash_mismatch_error_task_callback(Task *task,
RAY_CHECK(task != NULL);
PlasmaManagerState *state = (PlasmaManagerState *) user_context;
TaskSpec *spec = Task_task_execution_spec(task)->Spec();
FunctionID function = TaskSpec_function(spec);
/* Push the error to the Python driver that caused the nondeterministic task
* to be submitted. */
std::ostringstream error_message;
error_message << "An object created by the task with ID "
<< TaskSpec_task_id(spec) << " was created with a different "
<< "hash. This may mean that a non-deterministic task was "
<< "reexecuted.";
push_error(state->db, TaskSpec_driver_id(spec),
OBJECT_HASH_MISMATCH_ERROR_INDEX, sizeof(function),
function.data());
OBJECT_HASH_MISMATCH_ERROR_INDEX, error_message.str());
}
void log_object_hash_mismatch_error_result_callback(ObjectID object_id,

View file

@ -284,7 +284,7 @@ class WorkerDeath(unittest.TestCase):
wait_for_errors(b"worker_died", 1)
self.assertEqual(len(ray.error_info()), 1)
self.assertIn("A worker died or was killed while executing a task.",
self.assertIn("died or was killed while executing the task",
ray.error_info()[0][b"message"].decode("ascii"))
def testActorWorkerDying(self):

View file

@ -436,9 +436,6 @@ class ReconstructionTests(unittest.TestCase):
# Make sure all the errors have the correct type.
self.assertTrue(all(error[b"type"] == b"object_hash_mismatch"
for error in errors))
# Make sure all the errors have the correct function name.
self.assertTrue(all(error[b"data"] == b"__main__.foo"
for error in errors))
@unittest.skipIf(
os.environ.get('RAY_USE_NEW_GCS', False),
@ -485,7 +482,6 @@ class ReconstructionTests(unittest.TestCase):
errors = self.wait_for_errors(error_check)
self.assertTrue(all(error[b"type"] == b"put_reconstruction"
for error in errors))
self.assertTrue(all(error[b"data"] == b"Driver" for error in errors))
class ReconstructionTestsMultinode(ReconstructionTests):