Don't reconstruct all objects in every fetch request in local scheduler. (#686)

* Don't reconstruct all objects in every fetch request in local scheduler.

* Separate out fetch timer and reconstruction timer.

* Fix bug.

* Bug fix.

* Fix naming convention for global variables.

* Address comments.

* Make reconstruct_counter a static variable.

* Fix linting.

* Redo reconstruct handler using a set of objects to fetch.

* Fix linting.

* Replace set with vector.
Robert Nishihara 2017-06-23 14:08:02 -07:00 committed by Philipp Moritz
parent e16df6da9a
commit ad480f8165
4 changed files with 104 additions and 34 deletions
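The gist of the change: instead of calling reconstruct_object for every missing object on every fetch pass, reconstruction is now driven by its own timer and throttled to a bounded batch per tick. A condensed, standalone sketch of that throttling pattern follows; RemoteObjectTable and the free-standing reconstruct_object below are illustrative stand-ins for the scheduler's real state and reconstruction call, not the actual types.

#include <cstdint>
#include <cstdio>
#include <string>
#include <unordered_set>
#include <vector>

/* Illustrative stand-ins for the scheduler's real state and reconstruction
 * call. */
using ObjectID = std::string;
using RemoteObjectTable = std::unordered_set<ObjectID>;

static void reconstruct_object(const ObjectID &id) {
  printf("reconstructing %s\n", id.c_str());
}

/* Called once per reconstruction timer tick. The work list is repopulated
 * only when it is empty, and at most max_per_tick entries are processed per
 * call, so a single tick never walks every missing object. */
static void reconstruction_tick(const RemoteObjectTable &remote_objects,
                                int64_t max_per_tick) {
  static std::vector<ObjectID> to_reconstruct;
  if (to_reconstruct.empty()) {
    to_reconstruct.assign(remote_objects.begin(), remote_objects.end());
  }
  int64_t processed = 0;
  for (const ObjectID &id : to_reconstruct) {
    /* Only reconstruct objects that are still missing. */
    if (remote_objects.count(id) > 0) {
      reconstruct_object(id);
    }
    if (++processed == max_per_tick) {
      break;
    }
  }
  /* Drop the processed prefix; the rest is handled on later ticks. */
  to_reconstruct.erase(to_reconstruct.begin(),
                       to_reconstruct.begin() + processed);
}

int main() {
  RemoteObjectTable missing = {"obj-a", "obj-b", "obj-c"};
  reconstruction_tick(missing, 2); /* first tick: two of the three objects */
  reconstruction_tick(missing, 2); /* second tick: the remaining object */
  return 0;
}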


@@ -1200,8 +1200,12 @@ void start_server(const char *node_ip_address,
heartbeat_handler, g_state);
}
/* Create a timer for fetching queued tasks' missing object dependencies. */
event_loop_add_timer(loop, LOCAL_SCHEDULER_FETCH_TIMEOUT_MILLISECONDS,
event_loop_add_timer(loop, kLocalSchedulerFetchTimeoutMilliseconds,
fetch_object_timeout_handler, g_state);
/* Create a timer for initiating the reconstruction of tasks' missing object
* dependencies. */
event_loop_add_timer(loop, kLocalSchedulerReconstructionTimeoutMilliseconds,
reconstruct_object_timeout_handler, g_state);
/* Run event loop. */
event_loop_run(loop);
}
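A note on the timer API used above: as the handler doc comments in the header describe, the value a timeout handler returns is the delay, in milliseconds, before its next invocation, which is how the fetch handler can stretch its own period when many objects are missing. The driver below is only a stand-in for the real event loop, written to illustrate that contract.

#include <chrono>
#include <cstdint>
#include <cstdio>
#include <thread>

/* Demo handler with the real handlers' contract (the loop, timer_id, and
 * context arguments are omitted here): the return value is the number of
 * milliseconds to wait before the next call. */
static int64_t demo_timeout_handler() {
  printf("timer fired\n");
  return 1000; /* re-arm one second out, like the two scheduler timers */
}

int main() {
  /* Stand-in driver: wait for however long the previous call requested,
   * then fire the handler again. */
  int64_t delay_ms = 1000;
  for (int i = 0; i < 3; ++i) {
    std::this_thread::sleep_for(std::chrono::milliseconds(delay_ms));
    delay_ms = demo_timeout_handler();
  }
  return 0;
}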


@@ -22,12 +22,10 @@ struct TaskQueueEntry {
/** A data structure used to track which objects are available locally and
* which objects are being actively fetched. Objects of this type are used for
* both the scheduling algorithm state's local_objects and remot_objects
* both the scheduling algorithm state's local_objects and remote_objects
* tables. An ObjectEntry should be in at most one of the tables and not both
* simultaneously. */
struct ObjectEntry {
/** Object id of this object. */
ObjectID object_id;
/** A vector of tasks dependent on this object. These tasks are a subset of
* the tasks in the waiting queue. Each element actually stores a reference
* to the corresponding task's queue entry in the waiting queue, for fast
@@ -94,7 +92,7 @@ struct SchedulingAlgorithmState {
std::unordered_map<ObjectID, ObjectEntry, UniqueIDHasher> local_objects;
/** A hash map of the objects that are not available locally. These are
* currently being fetched by this local scheduler. The key is the object
* ID. Every LOCAL_SCHEDULER_FETCH_TIMEOUT_MILLISECONDS, a Plasma fetch
* ID. Every kLocalSchedulerFetchTimeoutMilliseconds, a Plasma fetch
* request will be sent for the object IDs in this table. Each entry also holds
* an array of queued tasks that are dependent on it. */
std::unordered_map<ObjectID, ObjectEntry, UniqueIDHasher> remote_objects;
@@ -446,7 +444,7 @@ void add_task_to_actor_queue(LocalSchedulerState *state,
/**
* Fetch a queued task's missing object dependency. The fetch request will be
* retried every LOCAL_SCHEDULER_FETCH_TIMEOUT_MILLISECONDS until the object is
* retried every kLocalSchedulerFetchTimeoutMilliseconds until the object is
* available locally.
*
* @param state The scheduler state.
@@ -471,7 +469,6 @@ void fetch_missing_dependency(LocalSchedulerState *state,
* the object becomes available locally. It will get freed if the object is
* subsequently removed locally. */
ObjectEntry entry;
entry.object_id = obj_id;
algorithm_state->remote_objects[obj_id] = entry;
}
algorithm_state->remote_objects[obj_id].dependent_tasks.push_back(
@@ -480,7 +477,7 @@
/**
* Fetch a queued task's missing object dependencies. The fetch requests will
* be retried every LOCAL_SCHEDULER_FETCH_TIMEOUT_MILLISECONDS until all
* be retried every kLocalSchedulerFetchTimeoutMilliseconds until all
* objects are available locally.
*
* @param state The scheduler state.
@@ -541,32 +538,26 @@ int fetch_object_timeout_handler(event_loop *loop, timer_id id, void *context) {
/* Only try the fetches if we are connected to the object store manager. */
if (!plasma_manager_is_connected(state->plasma_conn)) {
LOG_INFO("Local scheduler is not connected to an object store manager");
return LOCAL_SCHEDULER_FETCH_TIMEOUT_MILLISECONDS;
return kLocalSchedulerFetchTimeoutMilliseconds;
}
/* Allocate a buffer to hold all the object IDs for active fetch requests. */
int num_object_ids = state->algorithm_state->remote_objects.size();
ObjectID *object_ids = (ObjectID *) malloc(num_object_ids * sizeof(ObjectID));
/* Fill out the request with the object IDs for active fetches. */
int i = 0;
std::vector<ObjectID> object_id_vec;
for (auto const &entry : state->algorithm_state->remote_objects) {
object_ids[i] = entry.second.object_id;
i++;
object_id_vec.push_back(entry.first);
}
ObjectID *object_ids = object_id_vec.data();
int64_t num_object_ids = object_id_vec.size();
/* Divide very large fetch requests into smaller fetch requests so that a
* single fetch request doesn't block the plasma manager for a long time. */
int fetch_request_size = 10000;
for (int j = 0; j < num_object_ids; j += fetch_request_size) {
int64_t fetch_request_size = 10000;
for (int64_t j = 0; j < num_object_ids; j += fetch_request_size) {
int num_objects_in_request =
std::min(num_object_ids, j + fetch_request_size) - j;
ARROW_CHECK_OK(
state->plasma_conn->Fetch(num_objects_in_request, &object_ids[j]));
}
for (int k = 0; k < num_object_ids; ++k) {
reconstruct_object(state, object_ids[k]);
}
free(object_ids);
/* Print a warning if this method took too long. */
int64_t end_time = current_time_ms();
@@ -576,7 +567,64 @@ int fetch_object_timeout_handler(event_loop *loop, timer_id id, void *context) {
end_time - start_time);
}
return LOCAL_SCHEDULER_FETCH_TIMEOUT_MILLISECONDS;
/* Wait at least kLocalSchedulerFetchTimeoutMilliseconds before running
* this timeout handler again. But if we're waiting for a large number of
* objects, wait longer (e.g., 10 seconds for one million objects) so that we
* don't overwhelm the plasma manager. */
return std::max(kLocalSchedulerFetchTimeoutMilliseconds,
int64_t(0.01 * num_object_ids));
}
/* TODO(swang): This method is not covered by any valgrind tests. */
int reconstruct_object_timeout_handler(event_loop *loop,
timer_id id,
void *context) {
int64_t start_time = current_time_ms();
LocalSchedulerState *state = (LocalSchedulerState *) context;
/* This vector is used to track which object IDs to reconstruct next. If the
* vector is empty, we repopulate it with all of the keys of the remote object
* table. During every pass through this handler, we call reconstruct on up to
* 10000 elements of the vector (after first checking that the object IDs are
* still missing). */
static std::vector<ObjectID> object_ids_to_reconstruct;
/* If the vector is empty, repopulate it. */
if (object_ids_to_reconstruct.size() == 0) {
for (auto const &entry : state->algorithm_state->remote_objects) {
object_ids_to_reconstruct.push_back(entry.first);
}
}
int64_t max_num_to_reconstruct = 10000;
int64_t num_reconstructed = 0;
for (int64_t i = 0; i < object_ids_to_reconstruct.size(); i++) {
ObjectID object_id = object_ids_to_reconstruct[i];
/* Only call reconstruct if we are still missing the object. */
if (state->algorithm_state->remote_objects.find(object_id) !=
state->algorithm_state->remote_objects.end()) {
reconstruct_object(state, object_id);
}
num_reconstructed++;
if (num_reconstructed == max_num_to_reconstruct) {
break;
}
}
object_ids_to_reconstruct.erase(
object_ids_to_reconstruct.begin(),
object_ids_to_reconstruct.begin() + num_reconstructed);
/* Print a warning if this method took too long. */
int64_t end_time = current_time_ms();
int64_t max_time_for_handler = 1000;
if (end_time - start_time > max_time_for_handler) {
LOG_WARN("reconstruct_object_timeout_handler took %" PRId64
" milliseconds.",
end_time - start_time);
}
return kLocalSchedulerReconstructionTimeoutMilliseconds;
}
/**
@@ -1122,9 +1170,6 @@ void handle_object_available(LocalSchedulerState *state,
/* Remove the object from the active fetch requests. */
entry = object_entry_it->second;
algorithm_state->remote_objects.erase(object_id);
} else {
/* Create a new object entry. */
entry.object_id = object_id;
}
/* Add the entry to the set of locally available objects. */
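The fetch path above also splits one large fetch into requests of at most 10000 object IDs so that no single request ties up the plasma manager for long. A condensed sketch of that splitting loop, with the Plasma client call replaced by a generic callback so the example stands alone (ObjectID here is an illustrative alias, not the real type):

#include <algorithm>
#include <cstdint>
#include <cstdio>
#include <functional>
#include <string>
#include <vector>

using ObjectID = std::string; /* illustrative stand-in for the real ObjectID */

/* Issue one fetch request per batch of at most batch_size IDs. */
void fetch_in_batches(
    const std::vector<ObjectID> &object_ids, int64_t batch_size,
    const std::function<void(const ObjectID *, int64_t)> &fetch) {
  int64_t num_object_ids = object_ids.size();
  for (int64_t j = 0; j < num_object_ids; j += batch_size) {
    int64_t num_in_request = std::min(num_object_ids, j + batch_size) - j;
    fetch(&object_ids[j], num_in_request);
  }
}

int main() {
  std::vector<ObjectID> ids(25000, "some-object-id");
  /* Prints three requests: 10000, 10000, and 5000 objects. */
  fetch_in_batches(ids, 10000, [](const ObjectID *, int64_t n) {
    printf("fetch request for %lld objects\n", (long long) n);
  });
  return 0;
}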


@@ -6,9 +6,14 @@
#include "state/local_scheduler_table.h"
/* The duration that the local scheduler will wait before reinitiating a fetch
* request for a missing task dependency. TODO(rkn): We may want this to be
* adaptable based on the load on the local scheduler. */
#define LOCAL_SCHEDULER_FETCH_TIMEOUT_MILLISECONDS 1000
* request for a missing task dependency. This time may adapt based on the
* number of missing task dependencies. */
constexpr int64_t kLocalSchedulerFetchTimeoutMilliseconds = 1000;
/* The duration that the local scheduler will wait between initiating
* reconstruction calls for missing task dependencies. If there are many missing
* task dependencies, we will only initiate reconstruction calls for some of them
* each time. */
constexpr int64_t kLocalSchedulerReconstructionTimeoutMilliseconds = 1000;
/* ==== The scheduling algorithm ====
*
@@ -272,7 +277,7 @@ void handle_driver_removed(LocalSchedulerState *state,
/**
* This function fetches queued tasks' missing object dependencies. It is
* called every LOCAL_SCHEDULER_FETCH_TIMEOUT_MILLISECONDS.
* called every kLocalSchedulerFetchTimeoutMilliseconds.
*
* @param loop The local scheduler's event loop.
* @param id The ID of the timer that triggers this function.
@@ -282,6 +287,22 @@ void handle_driver_removed(LocalSchedulerState *state,
*/
int fetch_object_timeout_handler(event_loop *loop, timer_id id, void *context);
/**
* This function initiates reconstruction for tasks' missing object
* dependencies. It is called every
* kLocalSchedulerReconstructionTimeoutMilliseconds, but it may not initiate
* reconstruction for every missing object.
*
* @param loop The local scheduler's event loop.
* @param id The ID of the timer that triggers this function.
* @param context The function's context.
* @return An integer representing the time interval in milliseconds before the
* next invocation of the function.
*/
int reconstruct_object_timeout_handler(event_loop *loop,
timer_id id,
void *context);
/**
* A helper function to print debug information about the current state and
* number of workers.


@@ -1022,9 +1022,9 @@ int fetch_timeout_handler(event_loop *loop, timer_id id, void *context) {
}
free(object_ids_to_request);
/* Wait at least MANAGER_TIMEOUT before sending running this timeout handler
* again. But if we're waiting for a large number of objects, wait longer
* (e.g., 10 seconds for one million objects) so that we don't overwhelm other
/* Wait at least MANAGER_TIMEOUT before running this timeout handler again.
* But if we're waiting for a large number of objects, wait longer (e.g., 10
* seconds for one million objects) so that we don't overwhelm other
* components like Redis with too many requests (and so that we don't
* overwhelm this manager with responses). */
return std::max(MANAGER_TIMEOUT, int(0.01 * num_object_ids));
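Both this plasma manager handler and the local scheduler's fetch handler above use the same back-off: wait at least the base period, stretched to roughly 0.01 ms per object still being waited on. A small worked example, using the 1000 ms base defined by kLocalSchedulerFetchTimeoutMilliseconds (MANAGER_TIMEOUT's value is not shown in this diff):

#include <algorithm>
#include <cstdint>
#include <cstdio>

/* Adaptive wait shared by the timeout handlers: at least base_timeout_ms,
 * growing to about 0.01 ms per outstanding object. */
int64_t adaptive_timeout_ms(int64_t base_timeout_ms, int64_t num_object_ids) {
  return std::max(base_timeout_ms, int64_t(0.01 * num_object_ids));
}

int main() {
  printf("%lld ms for 1000 objects\n",
         (long long) adaptive_timeout_ms(1000, 1000));    /* base wins: 1000 ms */
  printf("%lld ms for 1000000 objects\n",
         (long long) adaptive_timeout_ms(1000, 1000000)); /* 10000 ms, about 10 s */
  return 0;
}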