From f12db5f0e27bfc632145f3a2d222b2789a6a0a5a Mon Sep 17 00:00:00 2001
From: Robert Nishihara
Date: Sat, 17 Jun 2017 21:42:15 -0700
Subject: [PATCH] Divide large plasma requests into smaller chunks, and wait
 longer before reissuing large requests. (#678)

* Divide large get requests into smaller chunks.

* Divide fetches into smaller chunks.

* Wait longer in worker and manager before reissuing fetch requests if there
  are many outstanding fetch requests.

* Log warning if a handler in the local scheduler or plasma manager takes
  more than one second.
---
 python/ray/worker.py                   | 39 ++++++++++++++++++--------
 src/local_scheduler/local_scheduler.cc | 11 ++++++++
 src/plasma/plasma_manager.cc           | 18 +++++++++++-
 3 files changed, 56 insertions(+), 12 deletions(-)

diff --git a/python/ray/worker.py b/python/ray/worker.py
index 7b92100e8..761829131 100644
--- a/python/ray/worker.py
+++ b/python/ray/worker.py
@@ -350,10 +350,16 @@ class Worker(object):
     warning_sent = False
     while True:
       try:
-        results = ray.numbuf.retrieve_list(
-            object_ids,
-            self.plasma_client.conn,
-            timeout)
+        # Divide very large get requests into smaller get requests so that a
+        # single get request doesn't block the store for a long time. If the
+        # store is blocked, it can block the manager as a consequence.
+        results = []
+        get_request_size = 10000
+        for i in range(0, len(object_ids), get_request_size):
+          results += ray.numbuf.retrieve_list(
+              object_ids[i:(i + get_request_size)],
+              self.plasma_client.conn,
+              timeout)
         return results
       except serialization.RayDeserializationException as e:
         # Wait a little bit for the import thread to import the class. If we
@@ -392,8 +398,13 @@
       if not isinstance(object_id, ray.local_scheduler.ObjectID):
         raise Exception("Attempting to call `get` on the value {}, which is "
                         "not an ObjectID.".format(object_id))
-    # Do an initial fetch for remote objects.
-    self.plasma_client.fetch([object_id.id() for object_id in object_ids])
+    # Do an initial fetch for remote objects. We divide the fetch into smaller
+    # fetches so as not to block the manager for a prolonged period of time in
+    # a single call.
+    fetch_request_size = 10000
+    plain_object_ids = [object_id.id() for object_id in object_ids]
+    for i in range(0, len(object_ids), fetch_request_size):
+      self.plasma_client.fetch(plain_object_ids[i:(i + fetch_request_size)])
 
     # Get the objects. We initially try to get the objects immediately.
     final_results = self.retrieve_and_deserialize(
@@ -404,15 +415,21 @@
                        enumerate(final_results) if val is None)
     was_blocked = (len(unready_ids) > 0)
     # Try reconstructing any objects we haven't gotten yet. Try to get them
-    # until GET_TIMEOUT_MILLISECONDS milliseconds passes, then repeat.
+    # until at least GET_TIMEOUT_MILLISECONDS milliseconds passes, then repeat.
     while len(unready_ids) > 0:
       for unready_id in unready_ids:
         self.local_scheduler_client.reconstruct_object(unready_id)
       # Do another fetch for objects that aren't available locally yet, in case
-      # they were evicted since the last fetch.
-      self.plasma_client.fetch(list(unready_ids.keys()))
-      results = self.retrieve_and_deserialize(list(unready_ids.keys()),
-                                              GET_TIMEOUT_MILLISECONDS)
+      # they were evicted since the last fetch. We divide the fetch into
+      # smaller fetches so as not to block the manager for a prolonged period
+      # of time in a single call.
+      object_ids_to_fetch = list(unready_ids.keys())
+      for i in range(0, len(object_ids_to_fetch), fetch_request_size):
+        self.plasma_client.fetch(
+            object_ids_to_fetch[i:(i + fetch_request_size)])
+      results = self.retrieve_and_deserialize(
+          list(unready_ids.keys()),
+          max([GET_TIMEOUT_MILLISECONDS, int(0.01 * len(unready_ids))]))
       # Remove any entries for objects we received during this iteration so we
       # don't retrieve the same object twice.
       for object_id, val in results:
diff --git a/src/local_scheduler/local_scheduler.cc b/src/local_scheduler/local_scheduler.cc
index 1454f5e15..a3337f270 100644
--- a/src/local_scheduler/local_scheduler.cc
+++ b/src/local_scheduler/local_scheduler.cc
@@ -871,6 +871,8 @@ void process_message(event_loop *loop,
                      int client_sock,
                      void *context,
                      int events) {
+  int64_t start_time = current_time_ms();
+
   LocalSchedulerClient *worker = (LocalSchedulerClient *) context;
   LocalSchedulerState *state = worker->local_scheduler_state;
 
@@ -1001,6 +1003,15 @@
     /* This code should be unreachable. */
     CHECK(0);
   }
+
+  /* Print a warning if this method took too long. */
+  int64_t end_time = current_time_ms();
+  int64_t max_time_for_handler = 1000;
+  if (end_time - start_time > max_time_for_handler) {
+    LOG_WARN("process_message of type %" PRId64 " took %" PRId64
+             " milliseconds.",
+             type, end_time - start_time);
+  }
 }
 
 void new_client_connection(event_loop *loop,
diff --git a/src/plasma/plasma_manager.cc b/src/plasma/plasma_manager.cc
index b54cb8fdc..6447df544 100644
--- a/src/plasma/plasma_manager.cc
+++ b/src/plasma/plasma_manager.cc
@@ -1023,7 +1023,12 @@ int fetch_timeout_handler(event_loop *loop, timer_id id, void *context) {
   }
   free(object_ids_to_request);
 
-  return MANAGER_TIMEOUT;
+  /* Wait at least MANAGER_TIMEOUT before running this timeout handler again.
+   * But if we're waiting for a large number of objects, wait longer (e.g., 10
+   * seconds for one million objects) so that we don't overwhelm other
+   * components like Redis with too many requests (and so that we don't
+   * overwhelm this manager with responses). */
+  return std::max(MANAGER_TIMEOUT, int(0.01 * num_object_ids));
 }
 
 bool is_object_local(PlasmaManagerState *state, ObjectID object_id) {
@@ -1530,6 +1535,8 @@ void process_message(event_loop *loop,
                      int client_sock,
                      void *context,
                      int events) {
+  int64_t start_time = current_time_ms();
+
   ClientConnection *conn = (ClientConnection *) context;
 
   int64_t length;
@@ -1591,6 +1598,15 @@
     LOG_FATAL("invalid request %" PRId64, type);
   }
   free(data);
+
+  /* Print a warning if this method took too long. */
+  int64_t end_time = current_time_ms();
+  int64_t max_time_for_handler = 1000;
+  if (end_time - start_time > max_time_for_handler) {
+    LOG_WARN("process_message of type %" PRId64 " took %" PRId64
+             " milliseconds.",
+             type, end_time - start_time);
+  }
 }
 
 int heartbeat_handler(event_loop *loop, timer_id id, void *context) {
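
Note: for reference, the following is a minimal standalone sketch of the batching pattern this patch applies in worker.py: splitting a large list of object IDs into fixed-size chunks before issuing requests, and scaling the retry timeout with the number of outstanding objects. The names chunked, fetch_in_chunks, and retry_timeout_ms are hypothetical helpers for illustration only, not part of Ray's API, and the base timeout value is assumed.

    # Illustrative sketch only: these helpers mirror the batching logic added
    # to worker.py above; they are hypothetical and not part of Ray's API.

    GET_TIMEOUT_MILLISECONDS = 1000  # Base retry timeout (assumed value).
    FETCH_REQUEST_SIZE = 10000       # Mirrors fetch_request_size above.


    def chunked(items, chunk_size):
        """Yield successive chunk_size-sized slices of items."""
        for i in range(0, len(items), chunk_size):
            yield items[i:(i + chunk_size)]


    def fetch_in_chunks(fetch, object_ids, chunk_size=FETCH_REQUEST_SIZE):
        """Issue one fetch call per chunk so that no single call blocks the
        manager for a prolonged period of time."""
        for chunk in chunked(object_ids, chunk_size):
            fetch(chunk)


    def retry_timeout_ms(num_outstanding):
        """Wait at least the base timeout, and longer when many objects are
        outstanding (0.01 ms per object, i.e., 10 seconds for one million)."""
        return max(GET_TIMEOUT_MILLISECONDS, int(0.01 * num_outstanding))


    if __name__ == "__main__":
        # Stand-in for plasma_client.fetch: just record each issued batch.
        issued = []
        fetch_in_chunks(issued.append, list(range(25000)))
        print([len(batch) for batch in issued])  # [10000, 10000, 5000]
        print(retry_timeout_ms(1000000))         # 10000 (milliseconds)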