Divide large plasma requests into smaller chunks, and wait longer before reissuing large requests. (#678)
* Divide large get requests into smaller chunks.
* Divide fetches into smaller chunks.
* Wait longer in worker and manager before reissuing fetch requests if there are many outstanding fetch requests.
* Log warning if a handler in the local scheduler or plasma manager takes more than one second.
Parent: 4d5ac9dad5
Commit: f12db5f0e2
3 changed files with 56 additions and 12 deletions
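At the core of the change is a simple batching pattern: instead of handing the plasma store or manager every object ID in one call, the worker walks the ID list in fixed-size slices (10000 IDs per slice in this commit) and issues one call per slice, so no single request can hold the store, and therefore the manager, for long. A minimal sketch of the pattern, with a hypothetical `request` callable standing in for calls like ray.numbuf.retrieve_list or plasma_client.fetch (names below are illustrative, not the Ray API):

    # Minimal sketch of the chunking pattern (hypothetical `request` callable).
    def issue_in_chunks(object_ids, request, chunk_size=10000):
        """Apply `request` to slices of `object_ids` and concatenate the results."""
        results = []
        for i in range(0, len(object_ids), chunk_size):
            # One bounded call per slice instead of one huge call.
            results += request(object_ids[i:(i + chunk_size)])
        return results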
@@ -350,8 +350,14 @@ class Worker(object):
     warning_sent = False
     while True:
       try:
-        results = ray.numbuf.retrieve_list(
-            object_ids,
+        # We divide very large get requests into smaller get requests so that
+        # a single get request doesn't block the store for a long time; if the
+        # store is blocked, it can block the manager as a consequence.
+        results = []
+        get_request_size = 10000
+        for i in range(0, len(object_ids), get_request_size):
+          results += ray.numbuf.retrieve_list(
+            object_ids[i:(i + get_request_size)],
             self.plasma_client.conn,
             timeout)
         return results
@@ -392,8 +398,13 @@ class Worker(object):
       if not isinstance(object_id, ray.local_scheduler.ObjectID):
         raise Exception("Attempting to call `get` on the value {}, which is "
                         "not an ObjectID.".format(object_id))
-    # Do an initial fetch for remote objects.
-    self.plasma_client.fetch([object_id.id() for object_id in object_ids])
+    # Do an initial fetch for remote objects. We divide the fetch into smaller
+    # fetches so as to not block the manager for a prolonged period of time in
+    # a single call.
+    fetch_request_size = 10000
+    plain_object_ids = [object_id.id() for object_id in object_ids]
+    for i in range(0, len(object_ids), fetch_request_size):
+      self.plasma_client.fetch(plain_object_ids[i:(i + fetch_request_size)])

     # Get the objects. We initially try to get the objects immediately.
     final_results = self.retrieve_and_deserialize(
@@ -404,15 +415,21 @@ class Worker(object):
                              enumerate(final_results) if val is None)
     was_blocked = (len(unready_ids) > 0)
     # Try reconstructing any objects we haven't gotten yet. Try to get them
-    # until GET_TIMEOUT_MILLISECONDS milliseconds passes, then repeat.
+    # until at least GET_TIMEOUT_MILLISECONDS milliseconds passes, then repeat.
     while len(unready_ids) > 0:
       for unready_id in unready_ids:
         self.local_scheduler_client.reconstruct_object(unready_id)
       # Do another fetch for objects that aren't available locally yet, in case
-      # they were evicted since the last fetch.
-      self.plasma_client.fetch(list(unready_ids.keys()))
-      results = self.retrieve_and_deserialize(list(unready_ids.keys()),
-                                              GET_TIMEOUT_MILLISECONDS)
+      # they were evicted since the last fetch. We divide the fetch into
+      # smaller fetches so as to not block the manager for a prolonged period
+      # of time in a single call.
+      object_ids_to_fetch = list(unready_ids.keys())
+      for i in range(0, len(object_ids_to_fetch), fetch_request_size):
+        self.plasma_client.fetch(
+            object_ids_to_fetch[i:(i + fetch_request_size)])
+      results = self.retrieve_and_deserialize(
+          list(unready_ids.keys()),
+          max([GET_TIMEOUT_MILLISECONDS, int(0.01 * len(unready_ids))]))
       # Remove any entries for objects we received during this iteration so we
       # don't retrieve the same object twice.
       for object_id, val in results:
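The retry loop in the hunk above combines both worker-side ideas: ask the local scheduler to reconstruct whatever is still missing, re-fetch those IDs in bounded slices, and then wait with a timeout that grows with the number of unready objects. A compact sketch of that loop shape, assuming hypothetical reconstruct/fetch/retrieve helpers and placeholder constants standing in for the Ray client calls and the constants defined in worker.py:

    # Sketch of the worker's retry loop (hypothetical helpers, not the Ray API).
    GET_TIMEOUT_MILLISECONDS = 1000  # placeholder for the constant in worker.py
    FETCH_REQUEST_SIZE = 10000

    def wait_for_objects(unready_ids, reconstruct, fetch, retrieve):
        """Loop until every ID in `unready_ids` (a dict of id -> index) is ready."""
        while len(unready_ids) > 0:
            for unready_id in unready_ids:
                reconstruct(unready_id)                       # trigger reconstruction
            ids = list(unready_ids.keys())
            for i in range(0, len(ids), FETCH_REQUEST_SIZE):  # re-fetch in slices
                fetch(ids[i:(i + FETCH_REQUEST_SIZE)])
            # Wait longer when many objects are outstanding (0.01 ms per object).
            timeout = max(GET_TIMEOUT_MILLISECONDS, int(0.01 * len(unready_ids)))
            for object_id, val in retrieve(ids, timeout):
                if val is not None:
                    del unready_ids[object_id]                # got it; stop waiting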
@@ -871,6 +871,8 @@ void process_message(event_loop *loop,
                      int client_sock,
                      void *context,
                      int events) {
+  int64_t start_time = current_time_ms();
+
   LocalSchedulerClient *worker = (LocalSchedulerClient *) context;
   LocalSchedulerState *state = worker->local_scheduler_state;

@@ -1001,6 +1003,15 @@ void process_message(event_loop *loop,
     /* This code should be unreachable. */
     CHECK(0);
   }
+
+  /* Print a warning if this method took too long. */
+  int64_t end_time = current_time_ms();
+  int64_t max_time_for_handler = 1000;
+  if (end_time - start_time > max_time_for_handler) {
+    LOG_WARN("process_message of type %" PRId64 " took %" PRId64
+             " milliseconds.",
+             type, end_time - start_time);
+  }
 }

 void new_client_connection(event_loop *loop,
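Both process_message handlers (in the local scheduler above and in the plasma manager below) are now bracketed by wall-clock timestamps, and a warning is logged whenever handling a single message takes more than max_time_for_handler (one second). The same idea in a small Python sketch, purely for illustration; the actual code uses the C utilities current_time_ms and LOG_WARN:

    import logging
    import time

    MAX_TIME_FOR_HANDLER_MS = 1000  # warn if a handler runs longer than 1 second

    def timed_handler(handler):
        """Wrap a message handler and warn when it runs too long.

        Mirrors the start_time/end_time bookkeeping added to process_message.
        """
        def wrapper(message_type, *args, **kwargs):
            start_time_ms = time.time() * 1000
            result = handler(message_type, *args, **kwargs)
            elapsed_ms = time.time() * 1000 - start_time_ms
            if elapsed_ms > MAX_TIME_FOR_HANDLER_MS:
                logging.warning("handler for message type %s took %d milliseconds.",
                                message_type, elapsed_ms)
            return result
        return wrapper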
@@ -1023,7 +1023,12 @@ int fetch_timeout_handler(event_loop *loop, timer_id id, void *context) {
   }
   free(object_ids_to_request);

-  return MANAGER_TIMEOUT;
+  /* Wait at least MANAGER_TIMEOUT before running this timeout handler
+   * again. But if we're waiting for a large number of objects, wait longer
+   * (e.g., 10 seconds for one million objects) so that we don't overwhelm
+   * other components like Redis with too many requests (and so that we don't
+   * overwhelm this manager with responses). */
+  return std::max(MANAGER_TIMEOUT, int(0.01 * num_object_ids));
 }

 bool is_object_local(PlasmaManagerState *state, ObjectID object_id) {
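The manager's fetch timer now backs off with the number of objects it is still waiting on: the handler returns roughly 0.01 ms per outstanding object, floored at MANAGER_TIMEOUT, so with one million outstanding objects it fires only about every 10 seconds. The arithmetic, in a short illustrative snippet (the MANAGER_TIMEOUT value below is an assumed placeholder, not the constant from the manager source):

    # Illustration of the manager's backoff: the reissue interval grows linearly
    # with the number of outstanding objects, never dropping below MANAGER_TIMEOUT.
    MANAGER_TIMEOUT = 1000  # assumed base interval in milliseconds

    def fetch_retry_interval_ms(num_object_ids):
        return max(MANAGER_TIMEOUT, int(0.01 * num_object_ids))

    assert fetch_retry_interval_ms(1000) == MANAGER_TIMEOUT
    assert fetch_retry_interval_ms(1000000) == 10000  # ~10 seconds for 1M objects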
@@ -1530,6 +1535,8 @@ void process_message(event_loop *loop,
                      int client_sock,
                      void *context,
                      int events) {
+  int64_t start_time = current_time_ms();
+
   ClientConnection *conn = (ClientConnection *) context;

   int64_t length;
@@ -1591,6 +1598,15 @@ void process_message(event_loop *loop,
     LOG_FATAL("invalid request %" PRId64, type);
   }
   free(data);
+
+  /* Print a warning if this method took too long. */
+  int64_t end_time = current_time_ms();
+  int64_t max_time_for_handler = 1000;
+  if (end_time - start_time > max_time_for_handler) {
+    LOG_WARN("process_message of type %" PRId64 " took %" PRId64
+             " milliseconds.",
+             type, end_time - start_time);
+  }
 }

 int heartbeat_handler(event_loop *loop, timer_id id, void *context) {