Fix actor bug arising from overwriting task specifications in the local scheduler (#513)

* copy task specifications put into the actor task cache so it won't get overwritten when the scheduler receives the next task

* cleanup

* cleanup and fix

* linting

* fix jenkins test

* fix linting
This commit is contained in:
Philipp Moritz 2017-05-06 17:39:35 -07:00 committed by Robert Nishihara
parent 8532ba4272
commit 1dddd5336a
6 changed files with 25 additions and 9 deletions

View file

@ -1729,7 +1729,8 @@ def wait(object_ids, num_returns=1, timeout=None, worker=global_worker):
return ready_ids, remaining_ids
def wait_for_function(function_id, driver_id, timeout=5, worker=global_worker):
def wait_for_function(function_id, driver_id, timeout=10,
worker=global_worker):
"""Wait until the function to be executed is present on this worker.
This method will simply loop until the import thread has imported the

View file

@ -99,8 +99,7 @@ PyObject *PyTask_from_string(PyObject *self, PyObject *args) {
PyTask *result = PyObject_New(PyTask, &PyTaskType);
result = (PyTask *) PyObject_Init((PyObject *) result, &PyTaskType);
result->size = size;
result->spec = (TaskSpec *) malloc(size);
memcpy(result->spec, data, size);
result->spec = TaskSpec_copy((TaskSpec *) data, size);
/* TODO(pcm): Use flatbuffers validation here. */
return (PyObject *) result;
}

View file

@ -288,6 +288,12 @@ bool TaskSpec_is_dependent_on(TaskSpec *spec, ObjectID object_id) {
return false;
}
TaskSpec *TaskSpec_copy(TaskSpec *spec, int64_t task_spec_size) {
TaskSpec *copy = (TaskSpec *) malloc(task_spec_size);
memcpy(copy, spec, task_spec_size);
return copy;
}
void TaskSpec_free(TaskSpec *spec) {
free(spec);
}

View file

@ -291,6 +291,15 @@ ObjectID task_compute_put_id(TaskID task_id, int64_t put_index);
*/
void TaskSpec_print(TaskSpec *spec, UT_string *output);
/**
* Create a copy of the task spec. Must be freed with TaskSpec_free after use.
*
* @param spec The task specification that will be copied.
* @param task_spec_size The size of the task specification in bytes.
* @returns Pointer to the copy of the task specification.
*/
TaskSpec *TaskSpec_copy(TaskSpec *spec, int64_t task_spec_size);
/**
* Free a task_spec.
*
@ -355,7 +364,7 @@ Task *Task_alloc(TaskSpec *spec,
DBClientID local_scheduler_id);
/**
* Create a copy of the task. Must be freed with free_task after use.
* Create a copy of the task. Must be freed with Task_free after use.
*
* @param other The task that will be copied.
* @returns Pointer to the copy of the task.

View file

@ -112,8 +112,7 @@ struct SchedulingAlgorithmState {
TaskQueueEntry TaskQueueEntry_init(TaskSpec *spec, int64_t task_spec_size) {
TaskQueueEntry elt;
elt.spec = (TaskSpec *) malloc(task_spec_size);
memcpy(elt.spec, spec, task_spec_size);
elt.spec = TaskSpec_copy(spec, task_spec_size);
elt.task_spec_size = task_spec_size;
return elt;
}
@ -833,8 +832,9 @@ void handle_task_submitted(LocalSchedulerState *state,
void handle_actor_task_submitted(LocalSchedulerState *state,
SchedulingAlgorithmState *algorithm_state,
TaskSpec *spec,
TaskSpec *task_spec,
int64_t task_spec_size) {
TaskSpec *spec = TaskSpec_copy(task_spec, task_spec_size);
ActorID actor_id = TaskSpec_actor_id(spec);
CHECK(!ActorID_equal(actor_id, NIL_ACTOR_ID));
@ -865,6 +865,7 @@ void handle_actor_task_submitted(LocalSchedulerState *state,
state, algorithm_state, spec, task_spec_size,
state->actor_mapping[actor_id].local_scheduler_id);
}
TaskSpec_free(spec);
}
void handle_actor_creation_notification(

View file

@ -92,8 +92,8 @@ TaskSpec *local_scheduler_get_task(LocalSchedulerConnection *conn,
auto reply_message = flatbuffers::GetRoot<GetTaskReply>(message);
/* Create a copy of the task spec so we can free the reply. */
*task_size = reply_message->task_spec()->size();
TaskSpec *spec = (TaskSpec *) malloc(*task_size);
memcpy(spec, reply_message->task_spec()->data(), *task_size);
TaskSpec *data = (TaskSpec *) reply_message->task_spec()->data();
TaskSpec *spec = TaskSpec_copy(data, *task_size);
/* Free the original message from the local scheduler. */
free(message);
/* Return the copy of the task spec and pass ownership to the caller. */