diff --git a/src/common/state/redis.cc b/src/common/state/redis.cc index e69d21208..62d814a26 100644 --- a/src/common/state/redis.cc +++ b/src/common/state/redis.cc @@ -966,18 +966,16 @@ void redis_task_table_update_callback(redisAsyncContext *c, // task table update can race with the liveness monitor. Do not retry the // update unless the caller is sure that the receiving subscriber is still // alive in the db_client table. - if (reply->type == REDIS_REPLY_ERROR && - strcmp(reply->str, "No subscribers received message.") == 0) { - RAY_LOG(WARNING) << "No subscribers received the task_table_update " - << "message."; + if (reply->type == REDIS_REPLY_ERROR) { + RAY_LOG(WARNING) << "task_table_update failed with " << reply->str; if (callback_data->retry.fail_callback != NULL) { callback_data->retry.fail_callback(callback_data->id, callback_data->user_context, callback_data->data->Get()); + } else { + RAY_LOG(FATAL) << "task_table_update failed and no fail_callback is set"; } } else { - RAY_CHECK(reply->type != REDIS_REPLY_ERROR) << "reply->str is " - << reply->str; RAY_CHECK(strcmp(reply->str, "OK") == 0) << "reply->str is " << reply->str; /* Call the done callback if there is one. */ diff --git a/src/local_scheduler/local_scheduler.cc b/src/local_scheduler/local_scheduler.cc index b68490890..241d3e4fe 100644 --- a/src/local_scheduler/local_scheduler.cc +++ b/src/local_scheduler/local_scheduler.cc @@ -565,6 +565,11 @@ void assign_task_to_worker(LocalSchedulerState *state, } } +// No-op fail_callback: lets a task_table_update fail without a fatal error.
+void allow_task_table_update_failure(UniqueID id, + void *user_context, + void *user_data) {} + void finish_task(LocalSchedulerState *state, LocalSchedulerClient *worker) { if (worker->task_in_progress != NULL) { TaskSpec *spec = Task_task_execution_spec(worker->task_in_progress)->Spec(); @@ -622,7 +627,16 @@ void finish_task(LocalSchedulerState *state, LocalSchedulerClient *worker) { int task_state = TASK_STATUS_DONE; Task_set_state(worker->task_in_progress, task_state); #if !RAY_USE_NEW_GCS - task_table_update(state->db, worker->task_in_progress, NULL, NULL, NULL); + auto retryInfo = RetryInfo{ + .num_retries = 0, // This value is unused. + .timeout = 0, // This value is unused. + .fail_callback = allow_task_table_update_failure, + }; + + // We allow this call to fail in case the driver has been removed and the + // task table entries have already been cleaned up by the monitor. + task_table_update(state->db, worker->task_in_progress, &retryInfo, NULL, + NULL); #else RAY_CHECK_OK(TaskTableAdd(&state->gcs_client, worker->task_in_progress)); Task_free(worker->task_in_progress);