Remove new gcs code from legacy Ray codepath (#2329)

This commit is contained in:
Philipp Moritz 2018-07-03 11:48:50 -07:00 committed by Robert Nishihara
parent 178346fa16
commit f21d783e6d
6 changed files with 2 additions and 149 deletions

View file

@ -1162,13 +1162,9 @@ void redis_task_table_subscribe(TableCallbackData *callback_data) {
/* TASK_CHANNEL_PREFIX is defined in ray_redis_module.cc and must be kept in
* sync with that file. */
const char *TASK_CHANNEL_PREFIX = "TT:";
#if !RAY_USE_NEW_GCS
for (auto subscribe_context : db->subscribe_contexts) {
#else
/* In the new code path, subscriptions currently go through the
* primary redis shard. */
for (auto subscribe_context : {db->subscribe_context}) {
#endif
for (auto subscribe_context : db->subscribe_contexts) {
int status;
if (data->local_scheduler_id.is_nil()) {
/* TODO(swang): Implement the state_filter by translating the bitmask into

View file

@ -41,7 +41,6 @@ void assign_task_to_local_scheduler_retry(UniqueID id,
return;
}
#if !RAY_USE_NEW_GCS
// The local scheduler is still alive. The failure is most likely due to the
// task assignment getting published before the local scheduler subscribed to
// the channel. Retry the assignment.
@ -51,9 +50,6 @@ void assign_task_to_local_scheduler_retry(UniqueID id,
.fail_callback = assign_task_to_local_scheduler_retry,
};
task_table_update(state->db, Task_copy(task), &retryInfo, NULL, user_context);
#else
RAY_CHECK_OK(TaskTableAdd(&state->gcs_client, task));
#endif
}
/**
@ -76,16 +72,12 @@ void assign_task_to_local_scheduler(GlobalSchedulerState *state,
RAY_LOG(DEBUG) << "Issuing a task table update for task = "
<< Task_task_id(task);
#if !RAY_USE_NEW_GCS
auto retryInfo = RetryInfo{
.num_retries = 0, // This value is unused.
.timeout = 0, // This value is unused.
.fail_callback = assign_task_to_local_scheduler_retry,
};
task_table_update(state->db, Task_copy(task), &retryInfo, NULL, state);
#else
RAY_CHECK_OK(TaskTableAdd(&state->gcs_client, task));
#endif
/* Update the object table info to reflect the fact that the results of this
* task will be created on the machine that the task was assigned to. This can

View file

@ -144,12 +144,7 @@ void kill_worker(LocalSchedulerState *state,
/* Update the task table to reflect that the task failed to complete. */
if (state->db != NULL) {
Task_set_state(worker->task_in_progress, TaskStatus::LOST);
#if !RAY_USE_NEW_GCS
task_table_update(state->db, worker->task_in_progress, NULL, NULL, NULL);
#else
RAY_CHECK_OK(TaskTableAdd(&state->gcs_client, worker->task_in_progress));
Task_free(worker->task_in_progress);
#endif
} else {
Task_free(worker->task_in_progress);
}
@ -558,12 +553,7 @@ void assign_task_to_worker(LocalSchedulerState *state,
worker->task_in_progress = Task_copy(task);
/* Update the global task table. */
if (state->db != NULL) {
#if !RAY_USE_NEW_GCS
task_table_update(state->db, task, NULL, NULL, NULL);
#else
RAY_CHECK_OK(TaskTableAdd(&state->gcs_client, task));
Task_free(task);
#endif
} else {
Task_free(task);
}
@ -630,7 +620,6 @@ void finish_task(LocalSchedulerState *state, LocalSchedulerClient *worker) {
/* Update control state tables. */
TaskStatus task_state = TaskStatus::DONE;
Task_set_state(worker->task_in_progress, task_state);
#if !RAY_USE_NEW_GCS
auto retryInfo = RetryInfo{
.num_retries = 0, // This value is unused.
.timeout = 0, // This value is unused.
@ -641,10 +630,6 @@ void finish_task(LocalSchedulerState *state, LocalSchedulerClient *worker) {
// task table entries have already been cleaned up by the monitor.
task_table_update(state->db, worker->task_in_progress, &retryInfo, NULL,
NULL);
#else
RAY_CHECK_OK(TaskTableAdd(&state->gcs_client, worker->task_in_progress));
Task_free(worker->task_in_progress);
#endif
} else {
Task_free(worker->task_in_progress);
}
@ -692,22 +677,10 @@ void reconstruct_task_update_callback(Task *task,
/* (2) The current local scheduler for the task is dead. The task is
* lost, but the task table hasn't received the update yet. Retry the
* test-and-set. */
#if !RAY_USE_NEW_GCS
task_table_test_and_update(state->db, Task_task_id(task),
current_local_scheduler_id, Task_state(task),
TaskStatus::RECONSTRUCTING, NULL,
reconstruct_task_update_callback, state);
#else
RAY_CHECK_OK(gcs::TaskTableTestAndUpdate(
&state->gcs_client, Task_task_id(task), current_local_scheduler_id,
static_cast<SchedulingState>(Task_state(task)),
SchedulingState::RECONSTRUCTING,
[task, user_context](gcs::AsyncGcsClient *, const ray::TaskID &,
const TaskTableDataT &t, bool updated) {
reconstruct_task_update_callback(task, user_context, updated);
}));
Task_free(task);
#endif
}
}
/* The test-and-set failed, so it is not safe to resubmit the task for
@ -751,22 +724,10 @@ void reconstruct_put_task_update_callback(Task *task,
/* (2) The current local scheduler for the task is dead. The task is
* lost, but the task table hasn't received the update yet. Retry the
* test-and-set. */
#if !RAY_USE_NEW_GCS
task_table_test_and_update(state->db, Task_task_id(task),
current_local_scheduler_id, Task_state(task),
TaskStatus::RECONSTRUCTING, NULL,
reconstruct_put_task_update_callback, state);
#else
RAY_CHECK_OK(gcs::TaskTableTestAndUpdate(
&state->gcs_client, Task_task_id(task), current_local_scheduler_id,
static_cast<SchedulingState>(Task_state(task)),
SchedulingState::RECONSTRUCTING,
[task, user_context](gcs::AsyncGcsClient *, const ray::TaskID &,
const TaskTableDataT &, bool updated) {
reconstruct_put_task_update_callback(task, user_context, updated);
}));
Task_free(task);
#endif
} else if (Task_state(task) == TaskStatus::RUNNING) {
/* (1) The task is still executing on a live node. The object created
* by `ray.put` was not able to be reconstructed, and the workload will
@ -821,27 +782,10 @@ void reconstruct_evicted_result_lookup_callback(ObjectID reconstruct_object_id,
}
/* If there are no other instances of the task running, it's safe for us to
* claim responsibility for reconstruction. */
#if !RAY_USE_NEW_GCS
task_table_test_and_update(state->db, task_id, DBClientID::nil(),
(TaskStatus::DONE | TaskStatus::LOST),
TaskStatus::RECONSTRUCTING, NULL, done_callback,
state);
#else
RAY_CHECK_OK(gcs::TaskTableTestAndUpdate(
&state->gcs_client, task_id, DBClientID::nil(),
static_cast<SchedulingState>(static_cast<uint>(SchedulingState::DONE) |
static_cast<uint>(SchedulingState::LOST)),
SchedulingState::RECONSTRUCTING,
[done_callback, state](gcs::AsyncGcsClient *, const ray::TaskID &,
const TaskTableDataT &t, bool updated) {
Task *task = Task_alloc(t.task_info.data(), t.task_info.size(),
static_cast<TaskStatus>(t.scheduling_state),
DBClientID::from_binary(t.scheduler_id),
std::vector<ObjectID>());
done_callback(task, state, updated);
Task_free(task);
}));
#endif
}
void reconstruct_failed_result_lookup_callback(ObjectID reconstruct_object_id,
@ -860,24 +804,9 @@ void reconstruct_failed_result_lookup_callback(ObjectID reconstruct_object_id,
LocalSchedulerState *state = (LocalSchedulerState *) user_context;
/* If the task failed to finish, it's safe for us to claim responsibility for
* reconstruction. */
#if !RAY_USE_NEW_GCS
task_table_test_and_update(state->db, task_id, DBClientID::nil(),
TaskStatus::LOST, TaskStatus::RECONSTRUCTING, NULL,
reconstruct_task_update_callback, state);
#else
RAY_CHECK_OK(gcs::TaskTableTestAndUpdate(
&state->gcs_client, task_id, DBClientID::nil(), SchedulingState::LOST,
SchedulingState::RECONSTRUCTING,
[state](gcs::AsyncGcsClient *, const ray::TaskID &,
const TaskTableDataT &t, bool updated) {
Task *task = Task_alloc(t.task_info.data(), t.task_info.size(),
static_cast<TaskStatus>(t.scheduling_state),
DBClientID::from_binary(t.scheduler_id),
std::vector<ObjectID>());
reconstruct_task_update_callback(task, state, updated);
Task_free(task);
}));
#endif
}
void reconstruct_object_lookup_callback(

View file

@ -389,16 +389,11 @@ void finish_killed_task(LocalSchedulerState *state,
if (state->db != NULL) {
Task *task = Task_alloc(execution_spec, TaskStatus::DONE,
get_db_client_id(state->db));
#if !RAY_USE_NEW_GCS
// In most cases, task_table_update would be appropriate, however, it is
// possible in some cases that the task has not yet been added to the task
// table (e.g., if it is an actor task that is queued locally because the
// actor has not been created yet).
task_table_add_task(state->db, task, NULL, NULL, NULL);
#else
RAY_CHECK_OK(TaskTableAdd(&state->gcs_client, task));
Task_free(task);
#endif
}
}
@ -507,22 +502,12 @@ void queue_actor_task(LocalSchedulerState *state,
if (from_global_scheduler) {
/* If the task is from the global scheduler, it's already been added to
* the task table, so just update the entry. */
#if !RAY_USE_NEW_GCS
task_table_update(state->db, task, NULL, NULL, NULL);
#else
RAY_CHECK_OK(TaskTableAdd(&state->gcs_client, task));
Task_free(task);
#endif
} else {
/* Otherwise, this is the first time the task has been seen in the
* system (unless it's a resubmission of a previous task), so add the
* entry. */
#if !RAY_USE_NEW_GCS
task_table_add_task(state->db, task, NULL, NULL, NULL);
#else
RAY_CHECK_OK(TaskTableAdd(&state->gcs_client, task));
Task_free(task);
#endif
}
}
@ -1002,7 +987,6 @@ std::list<TaskExecutionSpec>::iterator queue_task(
if (state->db != NULL) {
Task *task =
Task_alloc(task_entry, TaskStatus::QUEUED, get_db_client_id(state->db));
#if !RAY_USE_NEW_GCS
if (from_global_scheduler) {
/* If the task is from the global scheduler, it's already been added to
* the task table, so just update the entry. */
@ -1012,10 +996,6 @@ std::list<TaskExecutionSpec>::iterator queue_task(
* (unless it's a resubmission of a previous task), so add the entry. */
task_table_add_task(state->db, task, NULL, NULL, NULL);
}
#else
RAY_CHECK_OK(TaskTableAdd(&state->gcs_client, task));
Task_free(task);
#endif
}
/* Copy the spec and add it to the task queue. The allocated spec will be
@ -1204,7 +1184,6 @@ void give_task_to_local_scheduler(LocalSchedulerState *state,
RAY_CHECK(state->config.global_scheduler_exists);
Task *task =
Task_alloc(execution_spec, TaskStatus::SCHEDULED, local_scheduler_id);
#if !RAY_USE_NEW_GCS
auto retryInfo = RetryInfo{
.num_retries = 0, // This value is unused.
.timeout = 0, // This value is unused.
@ -1212,10 +1191,6 @@ void give_task_to_local_scheduler(LocalSchedulerState *state,
};
task_table_add_task(state->db, task, &retryInfo, NULL, state);
#else
RAY_CHECK_OK(TaskTableAdd(&state->gcs_client, task));
Task_free(task);
#endif
}
void give_task_to_global_scheduler_retry(UniqueID id,
@ -1252,7 +1227,6 @@ void give_task_to_global_scheduler(LocalSchedulerState *state,
RAY_CHECK(state->config.global_scheduler_exists);
Task *task = Task_alloc(execution_spec, TaskStatus::WAITING,
get_db_client_id(state->db));
#if !RAY_USE_NEW_GCS
RAY_CHECK(state->db != NULL);
auto retryInfo = RetryInfo{
.num_retries = 0, // This value is unused.
@ -1260,10 +1234,6 @@ void give_task_to_global_scheduler(LocalSchedulerState *state,
.fail_callback = give_task_to_global_scheduler_retry,
};
task_table_add_task(state->db, task, &retryInfo, NULL, state);
#else
RAY_CHECK_OK(TaskTableAdd(&state->gcs_client, task));
Task_free(task);
#endif
}
bool resource_constraints_satisfied(LocalSchedulerState *state,
@ -1338,16 +1308,11 @@ void handle_actor_task_submitted(LocalSchedulerState *state,
algorithm_state->cached_submitted_actor_tasks.push_back(
std::move(task_entry));
#if !RAY_USE_NEW_GCS
// Even if the task can't be assigned to a worker yet, we should still write
// it to the task table. TODO(rkn): There's no need to do this more than
// once, and we could run into problems if we have very large numbers of
// tasks in this cache.
task_table_add_task(state->db, task, NULL, NULL, NULL);
#else
RAY_CHECK_OK(TaskTableAdd(&state->gcs_client, task));
Task_free(task);
#endif
return;
}

View file

@ -233,14 +233,8 @@ TEST object_reconstruction_test(void) {
Task *task = Task_alloc(
execution_spec, TaskStatus::DONE,
get_db_client_id(local_scheduler->local_scheduler_state->db));
#if !RAY_USE_NEW_GCS
task_table_add_task(local_scheduler->local_scheduler_state->db, task, NULL,
NULL, NULL);
#else
RAY_CHECK_OK(TaskTableAdd(
&local_scheduler->local_scheduler_state->gcs_client, task));
Task_free(task);
#endif
/* Trigger reconstruction, and run the event loop again. */
ObjectID return_id = TaskSpec_return(spec, 0);
@ -355,14 +349,8 @@ TEST object_reconstruction_recursive_test(void) {
Task *last_task = Task_alloc(
specs[NUM_TASKS - 1], TaskStatus::DONE,
get_db_client_id(local_scheduler->local_scheduler_state->db));
#if !RAY_USE_NEW_GCS
task_table_add_task(local_scheduler->local_scheduler_state->db, last_task,
NULL, NULL, NULL);
#else
RAY_CHECK_OK(TaskTableAdd(
&local_scheduler->local_scheduler_state->gcs_client, last_task));
Task_free(last_task);
#endif
/* Simulate eviction of the objects, so that reconstruction is required. */
for (int i = 0; i < NUM_TASKS; ++i) {
ObjectID return_id = TaskSpec_return(specs[i].Spec(), 0);

View file

@ -1321,26 +1321,9 @@ void log_object_hash_mismatch_error_result_callback(ObjectID object_id,
void *user_context) {
RAY_CHECK(!task_id.is_nil());
PlasmaManagerState *state = (PlasmaManagerState *) user_context;
/* Get the specification for the nondeterministic task. */
#if !RAY_USE_NEW_GCS
/* Get the specification for the nondeterministic task. */
task_table_get_task(state->db, task_id, NULL,
log_object_hash_mismatch_error_task_callback, state);
#else
RAY_CHECK_OK(state->gcs_client.task_table().Lookup(
ray::JobID::nil(), task_id,
[user_context](gcs::AsyncGcsClient *, const TaskID &,
const TaskTableDataT &t) {
Task *task = Task_alloc(t.task_info.data(), t.task_info.size(),
static_cast<TaskStatus>(t.scheduling_state),
DBClientID::from_binary(t.scheduler_id),
std::vector<ObjectID>());
log_object_hash_mismatch_error_task_callback(task, user_context);
Task_free(task);
},
[](gcs::AsyncGcsClient *, const TaskID &) {
// TODO(pcmoritz): Handle failure.
}));
#endif
}
void log_object_hash_mismatch_error_object_callback(ObjectID object_id,