1
0
Fork 0
mirror of https://github.com/vale981/ray synced 2025-03-12 22:26:39 -04:00
ray/src/common/state/task_table.cc
Stephanie Wang 083e7a28ad Push an error to the driver when the workload hangs on ray.put reconstruction ()
* Fix worker blocked bug

* tmp

* Push an error to the driver on ray.put for non-driver tasks

* Fix result table tests

* Fix test, logging

* Address comments

* Fix suppression bug

* Fix redis module test

* Edit error message

* Get values in chunks during reconstruction

* Test case for driver ray.put errors

* Error for evicting ray.put objects from the driver

* Fix tests

* Reduce verbosity

* Documentation
2017-03-21 00:16:48 -07:00

74 lines
3.1 KiB
C++

#include "task_table.h"

#include <stdio.h>
#include <stdlib.h>

#include "redis.h"
/* NOTE(review): NUM_DB_REQUESTS is not referenced in the visible part of this
 * file -- confirm whether it is still needed before relying on its value. */
#define NUM_DB_REQUESTS 2
/**
 * Start a task-table lookup for the given task ID.
 *
 * This is a thin forwarder: it hands the request to init_table_callback
 * together with redis_task_table_get_task. No request data is attached (the
 * data argument is NULL) and this function's own name is passed along via
 * __func__.
 *
 * @param db_handle Database handle forwarded to init_table_callback.
 * @param task_id ID forwarded as the key of the request.
 * @param retry Retry information forwarded unchanged.
 * @param get_callback Callback forwarded (as an untyped pointer) in the
 *        done-callback position. Presumably invoked with the fetched task --
 *        confirm against redis_task_table_get_task.
 * @param user_context Opaque pointer forwarded unchanged.
 * @return Void.
 */
void task_table_get_task(DBHandle *db_handle,
                         TaskID task_id,
                         RetryInfo *retry,
                         task_table_get_callback get_callback,
                         void *user_context) {
  /* The generic table layer receives the typed callback as a plain pointer. */
  void *untyped_callback = (void *) get_callback;

  init_table_callback(db_handle, task_id, __func__, NULL, retry,
                      untyped_callback, redis_task_table_get_task,
                      user_context);
}
/**
 * Start a task-table "add" request for the given task.
 *
 * This is a thin forwarder: the request is keyed by Task_task_id(task), the
 * task itself is passed as the request data, and redis_task_table_add_task is
 * handed to init_table_callback alongside it. This function's own name is
 * passed along via __func__.
 *
 * @param db_handle Database handle forwarded to init_table_callback.
 * @param task Task to add. Marked OWNER in the signature; presumably ownership
 *        passes to the table layer -- confirm in the header.
 * @param retry Retry information forwarded unchanged.
 * @param done_callback Callback forwarded in the done-callback position.
 * @param user_context Opaque pointer forwarded unchanged.
 * @return Void.
 */
void task_table_add_task(DBHandle *db_handle,
                         OWNER Task *task,
                         RetryInfo *retry,
                         task_table_done_callback done_callback,
                         void *user_context) {
  /* Convert the task-specific callback to the generic table callback type. */
  table_done_callback generic_done = (table_done_callback) done_callback;

  init_table_callback(db_handle, Task_task_id(task), __func__, task, retry,
                      generic_done, redis_task_table_add_task, user_context);
}
/**
 * Start a task-table "update" request for the given task.
 *
 * This is a thin forwarder: the request is keyed by Task_task_id(task), the
 * task itself is passed as the request data, and redis_task_table_update is
 * handed to init_table_callback alongside it. This function's own name is
 * passed along via __func__.
 *
 * @param db_handle Database handle forwarded to init_table_callback.
 * @param task Task carrying the update. Marked OWNER in the signature;
 *        presumably ownership passes to the table layer -- confirm in the
 *        header.
 * @param retry Retry information forwarded unchanged.
 * @param done_callback Callback forwarded in the done-callback position.
 * @param user_context Opaque pointer forwarded unchanged.
 * @return Void.
 */
void task_table_update(DBHandle *db_handle,
                       OWNER Task *task,
                       RetryInfo *retry,
                       task_table_done_callback done_callback,
                       void *user_context) {
  /* Convert the task-specific callback to the generic table callback type. */
  table_done_callback generic_done = (table_done_callback) done_callback;

  init_table_callback(db_handle, Task_task_id(task), __func__, task, retry,
                      generic_done, redis_task_table_update, user_context);
}
/**
 * Start a task-table "test and update" request for the given task ID.
 *
 * A TaskTableTestAndUpdateData record is heap-allocated, filled in with the
 * two state arguments and this client's ID (db_handle->client), and handed to
 * init_table_callback as the request data together with
 * redis_task_table_test_and_update. This function does not free the record.
 *
 * If the record cannot be allocated, the process prints a message to stderr
 * and aborts instead of dereferencing a NULL pointer.
 *
 * @param db_handle Database handle; its client field is stored in the request
 *        as the local scheduler ID.
 * @param task_id ID forwarded as the key of the request.
 * @param test_state_bitmask Stored in the request. Presumably the update is
 *        applied only if the task's current state matches this mask --
 *        confirm against redis_task_table_test_and_update.
 * @param update_state Stored in the request. Presumably the state to set on a
 *        successful test -- confirm against redis_task_table_test_and_update.
 * @param retry Retry information forwarded unchanged.
 * @param done_callback Callback forwarded in the done-callback position.
 * @param user_context Opaque pointer forwarded unchanged.
 * @return Void.
 */
void task_table_test_and_update(
    DBHandle *db_handle,
    TaskID task_id,
    int test_state_bitmask,
    int update_state,
    RetryInfo *retry,
    task_table_test_and_update_callback done_callback,
    void *user_context) {
  TaskTableTestAndUpdateData *update_data =
      (TaskTableTestAndUpdateData *) malloc(sizeof(*update_data));
  /* There is no error channel to the caller, so fail fast on exhaustion
   * rather than writing through a NULL pointer below. */
  if (update_data == NULL) {
    fprintf(stderr, "%s: failed to allocate request data\n", __func__);
    abort();
  }
  update_data->test_state_bitmask = test_state_bitmask;
  update_data->update_state = update_state;
  /* Update the task entry's local scheduler with this client's ID. */
  update_data->local_scheduler_id = db_handle->client;
  init_table_callback(db_handle, task_id, __func__, update_data, retry,
                      (table_done_callback) done_callback,
                      redis_task_table_test_and_update, user_context);
}
/* TODO(swang): Add a corresponding task_table_unsubscribe. */
/**
 * Start a task-table subscription request.
 *
 * A TaskTableSubscribeData record is heap-allocated, filled in with the
 * scheduler ID, the state filter, and the subscribe callback and its context,
 * and handed to init_table_callback as the request data together with
 * redis_task_table_subscribe. The request is keyed by local_scheduler_id.
 * This function does not free the record.
 *
 * If the record cannot be allocated, the process prints a message to stderr
 * and aborts instead of dereferencing a NULL pointer.
 *
 * @param db_handle Database handle forwarded to init_table_callback.
 * @param local_scheduler_id Stored in the request and also used as the key of
 *        the request.
 * @param state_filter Stored in the request. Presumably restricts which task
 *        states trigger a notification -- confirm against
 *        redis_task_table_subscribe.
 * @param subscribe_callback Stored in the request. Presumably invoked for
 *        each matching task-table notification -- confirm against
 *        redis_task_table_subscribe.
 * @param subscribe_context Opaque pointer stored in the request next to
 *        subscribe_callback.
 * @param retry Retry information forwarded unchanged.
 * @param done_callback Callback forwarded in the done-callback position.
 * @param user_context Opaque pointer forwarded unchanged.
 * @return Void.
 */
void task_table_subscribe(DBHandle *db_handle,
                          DBClientID local_scheduler_id,
                          int state_filter,
                          task_table_subscribe_callback subscribe_callback,
                          void *subscribe_context,
                          RetryInfo *retry,
                          task_table_done_callback done_callback,
                          void *user_context) {
  TaskTableSubscribeData *sub_data =
      (TaskTableSubscribeData *) malloc(sizeof(*sub_data));
  /* There is no error channel to the caller, so fail fast on exhaustion
   * rather than writing through a NULL pointer below. */
  if (sub_data == NULL) {
    fprintf(stderr, "%s: failed to allocate request data\n", __func__);
    abort();
  }
  sub_data->local_scheduler_id = local_scheduler_id;
  sub_data->state_filter = state_filter;
  sub_data->subscribe_callback = subscribe_callback;
  sub_data->subscribe_context = subscribe_context;
  init_table_callback(db_handle, local_scheduler_id, __func__, sub_data, retry,
                      (table_done_callback) done_callback,
                      redis_task_table_subscribe, user_context);
}