diff --git a/src/local_scheduler/local_scheduler.cc b/src/local_scheduler/local_scheduler.cc index 39bb7a5be..3f8baa009 100644 --- a/src/local_scheduler/local_scheduler.cc +++ b/src/local_scheduler/local_scheduler.cc @@ -1200,8 +1200,12 @@ void start_server(const char *node_ip_address, heartbeat_handler, g_state); } /* Create a timer for fetching queued tasks' missing object dependencies. */ - event_loop_add_timer(loop, LOCAL_SCHEDULER_FETCH_TIMEOUT_MILLISECONDS, + event_loop_add_timer(loop, kLocalSchedulerFetchTimeoutMilliseconds, fetch_object_timeout_handler, g_state); + /* Create a timer for initiating the reconstruction of tasks' missing object + * dependencies. */ + event_loop_add_timer(loop, kLocalSchedulerReconstructionTimeoutMilliseconds, + reconstruct_object_timeout_handler, g_state); /* Run event loop. */ event_loop_run(loop); } diff --git a/src/local_scheduler/local_scheduler_algorithm.cc b/src/local_scheduler/local_scheduler_algorithm.cc index f17b92c2b..46bd31525 100644 --- a/src/local_scheduler/local_scheduler_algorithm.cc +++ b/src/local_scheduler/local_scheduler_algorithm.cc @@ -22,12 +22,10 @@ struct TaskQueueEntry { /** A data structure used to track which objects are available locally and * which objects are being actively fetched. Objects of this type are used for - * both the scheduling algorithm state's local_objects and remot_objects + * both the scheduling algorithm state's local_objects and remote_objects * tables. An ObjectEntry should be in at most one of the tables and not both * simultaneously. */ struct ObjectEntry { - /** Object id of this object. */ - ObjectID object_id; /** A vector of tasks dependent on this object. These tasks are a subset of * the tasks in the waiting queue. Each element actually stores a reference * to the corresponding task's queue entry in waiting queue, for fast @@ -94,7 +92,7 @@ struct SchedulingAlgorithmState { std::unordered_map local_objects; /** A hash map of the objects that are not available locally. 
These are * currently being fetched by this local scheduler. The key is the object - * ID. Every LOCAL_SCHEDULER_FETCH_TIMEOUT_MILLISECONDS, a Plasma fetch + * ID. Every kLocalSchedulerFetchTimeoutMilliseconds, a Plasma fetch * request will be sent the object IDs in this table. Each entry also holds * an array of queued tasks that are dependent on it. */ std::unordered_map remote_objects; @@ -446,7 +444,7 @@ void add_task_to_actor_queue(LocalSchedulerState *state, /** * Fetch a queued task's missing object dependency. The fetch request will be - * retried every LOCAL_SCHEDULER_FETCH_TIMEOUT_MILLISECONDS until the object is + * retried every kLocalSchedulerFetchTimeoutMilliseconds until the object is * available locally. * * @param state The scheduler state. @@ -471,7 +469,6 @@ void fetch_missing_dependency(LocalSchedulerState *state, * the object becomes available locally. It will get freed if the object is * subsequently removed locally. */ ObjectEntry entry; - entry.object_id = obj_id; algorithm_state->remote_objects[obj_id] = entry; } algorithm_state->remote_objects[obj_id].dependent_tasks.push_back( @@ -480,7 +477,7 @@ void fetch_missing_dependency(LocalSchedulerState *state, /** * Fetch a queued task's missing object dependencies. The fetch requests will - * be retried every LOCAL_SCHEDULER_FETCH_TIMEOUT_MILLISECONDS until all + * be retried every kLocalSchedulerFetchTimeoutMilliseconds until all * objects are available locally. * * @param state The scheduler state. @@ -541,32 +538,26 @@ int fetch_object_timeout_handler(event_loop *loop, timer_id id, void *context) { /* Only try the fetches if we are connected to the object store manager. */ if (!plasma_manager_is_connected(state->plasma_conn)) { LOG_INFO("Local scheduler is not connected to a object store manager"); - return LOCAL_SCHEDULER_FETCH_TIMEOUT_MILLISECONDS; + return kLocalSchedulerFetchTimeoutMilliseconds; } - /* Allocate a buffer to hold all the object IDs for active fetch requests. 
*/ - int num_object_ids = state->algorithm_state->remote_objects.size(); - ObjectID *object_ids = (ObjectID *) malloc(num_object_ids * sizeof(ObjectID)); - - /* Fill out the request with the object IDs for active fetches. */ - int i = 0; + std::vector<ObjectID> object_id_vec; for (auto const &entry : state->algorithm_state->remote_objects) { - object_ids[i] = entry.second.object_id; - i++; + object_id_vec.push_back(entry.first); } + + ObjectID *object_ids = object_id_vec.data(); + int64_t num_object_ids = object_id_vec.size(); + /* Divide very large fetch requests into smaller fetch requests so that a * single fetch request doesn't block the plasma manager for a long time. */ - int fetch_request_size = 10000; - for (int j = 0; j < num_object_ids; j += fetch_request_size) { + int64_t fetch_request_size = 10000; + for (int64_t j = 0; j < num_object_ids; j += fetch_request_size) { int num_objects_in_request = std::min(num_object_ids, j + fetch_request_size) - j; ARROW_CHECK_OK( state->plasma_conn->Fetch(num_objects_in_request, &object_ids[j])); } - for (int k = 0; k < num_object_ids; ++k) { - reconstruct_object(state, object_ids[k]); - } - free(object_ids); /* Print a warning if this method took too long. */ int64_t end_time = current_time_ms(); @@ -576,7 +567,64 @@ int fetch_object_timeout_handler(event_loop *loop, timer_id id, void *context) { end_time - start_time); } - return LOCAL_SCHEDULER_FETCH_TIMEOUT_MILLISECONDS; + /* Wait at least kLocalSchedulerFetchTimeoutMilliseconds before running + * this timeout handler again. But if we're waiting for a large number of + * objects, wait longer (e.g., 10 seconds for one million objects) so that we + * don't overwhelm the plasma manager. */ + return std::max(kLocalSchedulerFetchTimeoutMilliseconds, + int64_t(0.01 * num_object_ids)); +} + +/* TODO(swang): This method is not covered by any valgrind tests. 
*/ +int reconstruct_object_timeout_handler(event_loop *loop, + timer_id id, + void *context) { + int64_t start_time = current_time_ms(); + + LocalSchedulerState *state = (LocalSchedulerState *) context; + + /* This vector is used to track which object IDs to reconstruct next. If the + * vector is empty, we repopulate it with all of the keys of the remote object + * table. During every pass through this handler, we call reconstruct on up to + * 10000 elements of the vector (after first checking that the object IDs are + * still missing). */ + static std::vector<ObjectID> object_ids_to_reconstruct; + + /* If the set is empty, repopulate it. */ + if (object_ids_to_reconstruct.size() == 0) { + for (auto const &entry : state->algorithm_state->remote_objects) { + object_ids_to_reconstruct.push_back(entry.first); + } + } + + int64_t max_num_to_reconstruct = 10000; + int64_t num_reconstructed = 0; + for (int64_t i = 0; i < object_ids_to_reconstruct.size(); i++) { + ObjectID object_id = object_ids_to_reconstruct[i]; + /* Only call reconstruct if we are still missing the object. */ + if (state->algorithm_state->remote_objects.find(object_id) != + state->algorithm_state->remote_objects.end()) { + reconstruct_object(state, object_id); + } + num_reconstructed++; + if (num_reconstructed == max_num_to_reconstruct) { + break; + } + } + object_ids_to_reconstruct.erase( + object_ids_to_reconstruct.begin(), + object_ids_to_reconstruct.begin() + num_reconstructed); + + /* Print a warning if this method took too long. */ + int64_t end_time = current_time_ms(); + int64_t max_time_for_handler = 1000; + if (end_time - start_time > max_time_for_handler) { + LOG_WARN("reconstruct_object_timeout_handler took %" PRId64 + " milliseconds.", + end_time - start_time); + } + + return kLocalSchedulerReconstructionTimeoutMilliseconds; } /** @@ -1122,9 +1170,6 @@ void handle_object_available(LocalSchedulerState *state, /* Remove the object from the active fetch requests. 
*/ entry = object_entry_it->second; algorithm_state->remote_objects.erase(object_id); - } else { - /* Create a new object entry. */ - entry.object_id = object_id; } /* Add the entry to the set of locally available objects. */ diff --git a/src/local_scheduler/local_scheduler_algorithm.h b/src/local_scheduler/local_scheduler_algorithm.h index f23b62c65..4eb6cbee9 100644 --- a/src/local_scheduler/local_scheduler_algorithm.h +++ b/src/local_scheduler/local_scheduler_algorithm.h @@ -6,9 +6,14 @@ #include "state/local_scheduler_table.h" /* The duration that the local scheduler will wait before reinitiating a fetch - * request for a missing task dependency. TODO(rkn): We may want this to be - * adaptable based on the load on the local scheduler. */ -#define LOCAL_SCHEDULER_FETCH_TIMEOUT_MILLISECONDS 1000 + * request for a missing task dependency. This time may adapt based on the + * number of missing task dependencies. */ +constexpr int64_t kLocalSchedulerFetchTimeoutMilliseconds = 1000; +/* The duration that the local scheduler will wait between initiating + * reconstruction calls for missing task dependencies. If there are many missing + * task dependencies, we will only initiate reconstruction calls for some of them + * each time. */ +constexpr int64_t kLocalSchedulerReconstructionTimeoutMilliseconds = 1000; /* ==== The scheduling algorithm ==== * @@ -272,7 +277,7 @@ void handle_driver_removed(LocalSchedulerState *state, /** * This function fetches queued task's missing object dependencies. It is - * called every LOCAL_SCHEDULER_FETCH_TIMEOUT_MILLISECONDS. + * called every kLocalSchedulerFetchTimeoutMilliseconds. * * @param loop The local scheduler's event loop. * @param id The ID of the timer that triggers this function. @@ -282,6 +287,22 @@ void handle_driver_removed(LocalSchedulerState *state, */ int fetch_object_timeout_handler(event_loop *loop, timer_id id, void *context); +/** + * This function initiates reconstruction for tasks' missing object + * dependencies. 
It is called every + * kLocalSchedulerReconstructionTimeoutMilliseconds, but it may not initiate + * reconstruction for every missing object. + * + * @param loop The local scheduler's event loop. + * @param id The ID of the timer that triggers this function. + * @param context The function's context. + * @return An integer representing the time interval in milliseconds before the + * next invocation of the function. + */ +int reconstruct_object_timeout_handler(event_loop *loop, + timer_id id, + void *context); + /** * A helper function to print debug information about the current state and * number of workers. diff --git a/src/plasma/plasma_manager.cc b/src/plasma/plasma_manager.cc index d1c3c7fba..7a96c9788 100644 --- a/src/plasma/plasma_manager.cc +++ b/src/plasma/plasma_manager.cc @@ -1022,9 +1022,9 @@ int fetch_timeout_handler(event_loop *loop, timer_id id, void *context) { } free(object_ids_to_request); - /* Wait at least MANAGER_TIMEOUT before sending running this timeout handler - * again. But if we're waiting for a large number of objects, wait longer - * (e.g., 10 seconds for one million objects) so that we don't overwhelm other + /* Wait at least MANAGER_TIMEOUT before running this timeout handler again. + * But if we're waiting for a large number of objects, wait longer (e.g., 10 + * seconds for one million objects) so that we don't overwhelm other * components like Redis with too many requests (and so that we don't + * overwhelm this manager with responses). */ return std::max(MANAGER_TIMEOUT, int(0.01 * num_object_ids))