diff --git a/src/ray/core_worker/store_provider/plasma_store_provider.cc b/src/ray/core_worker/store_provider/plasma_store_provider.cc index dc6d415ee..39cf58b8d 100644 --- a/src/ray/core_worker/store_provider/plasma_store_provider.cc +++ b/src/ray/core_worker/store_provider/plasma_store_provider.cc @@ -19,70 +19,94 @@ Status CoreWorkerPlasmaStoreProvider::Put(const RayObject &object, Status CoreWorkerPlasmaStoreProvider::Get( const std::vector &ids, int64_t timeout_ms, const TaskID &task_id, std::vector> *results) { + int64_t batch_size = RayConfig::instance().worker_fetch_request_size(); (*results).resize(ids.size(), nullptr); + std::unordered_map remaining; - bool was_blocked = false; + // First, attempt to fetch all of the required objects without reconstructing. + for (int64_t start = 0; start < int64_t(ids.size()); start += batch_size) { + int64_t end = std::min(start + batch_size, int64_t(ids.size())); + const std::vector ids_slice(ids.cbegin() + start, ids.cbegin() + end); + RAY_CHECK_OK( + raylet_client_->FetchOrReconstruct(ids_slice, /*fetch_only=*/true, task_id)); + std::vector> results_slice; + RAY_RETURN_NOT_OK(local_store_provider_.Get(ids_slice, 0, task_id, &results_slice)); - std::unordered_map unready; - for (size_t i = 0; i < ids.size(); i++) { - unready.insert({ids[i], i}); + // Iterate through the results from the local store, adding them to the remaining + // map if they weren't successfully fetched from the local store (are nullptr). + // Keeps track of the locations of the remaining object IDs in the original list. + for (size_t i = 0; i < ids_slice.size(); i++) { + if (results_slice[i] != nullptr) { + (*results)[start + i] = results_slice[i]; + // Terminate early on exception because it'll be raised by the worker anyways. + if (IsException(*results_slice[i])) { + return Status::OK(); + } + } else { + remaining.insert({ids_slice[i], start + i}); + } + } } - int num_attempts = 0; + // If all objects were fetched already, return. + if (remaining.empty()) { + return Status::OK(); + } + + // If not all objects were successfully fetched, repeatedly call FetchOrReconstruct and + // Get from the local object store in batches. This loop will run indefinitely until the + // objects are all fetched if timeout is -1. + int unsuccessful_attempts = 0; bool should_break = false; int64_t remaining_timeout = timeout_ms; - // Repeat until we get all objects. - while (!unready.empty() && !should_break) { - std::vector unready_ids; - for (const auto &entry : unready) { - unready_ids.push_back(entry.first); + while (!remaining.empty() && !should_break) { + std::vector batch_ids; + for (const auto &entry : remaining) { + if (int64_t(batch_ids.size()) == batch_size) { + break; + } + batch_ids.push_back(entry.first); } - // For the initial fetch, we only fetch the objects, do not reconstruct them. - bool fetch_only = num_attempts == 0; - if (!fetch_only) { - // If fetch_only is false, this worker will be blocked. - was_blocked = true; - } + RAY_CHECK_OK( + raylet_client_->FetchOrReconstruct(batch_ids, /*fetch_only=*/false, task_id)); - // TODO(zhijunfu): can call `fetchOrReconstruct` in batches as an optimization. - RAY_CHECK_OK(raylet_client_->FetchOrReconstruct(unready_ids, fetch_only, task_id)); - - // Get the objects from the object store, and parse the result. - int64_t get_timeout; + int64_t batch_timeout = std::max(RayConfig::instance().get_timeout_milliseconds(), + int64_t(10 * batch_ids.size())); if (remaining_timeout >= 0) { - get_timeout = - std::min(remaining_timeout, RayConfig::instance().get_timeout_milliseconds()); - remaining_timeout -= get_timeout; + batch_timeout = std::min(remaining_timeout, batch_timeout); + remaining_timeout -= batch_timeout; should_break = remaining_timeout <= 0; - } else { - get_timeout = RayConfig::instance().get_timeout_milliseconds(); } - std::vector> result_objects; + std::vector> batch_results; RAY_RETURN_NOT_OK( - local_store_provider_.Get(unready_ids, get_timeout, task_id, &result_objects)); + local_store_provider_.Get(batch_ids, batch_timeout, task_id, &batch_results)); - for (size_t i = 0; i < result_objects.size(); i++) { - if (result_objects[i] != nullptr) { - const auto &object_id = unready_ids[i]; - (*results)[unready[object_id]] = result_objects[i]; - unready.erase(object_id); - if (IsException(*result_objects[i])) { + // Add successfully retrieved objects to the result list and remove them from + // remaining. + uint64_t successes = 0; + for (size_t i = 0; i < batch_results.size(); i++) { + if (batch_results[i] != nullptr) { + successes++; + const auto &object_id = batch_ids[i]; + (*results)[remaining[object_id]] = batch_results[i]; + remaining.erase(object_id); + if (IsException(*batch_results[i])) { should_break = true; } } } - num_attempts += 1; - WarnIfAttemptedTooManyTimes(num_attempts, unready); + if (successes < batch_ids.size()) { + unsuccessful_attempts++; + WarnIfAttemptedTooManyTimes(unsuccessful_attempts, remaining); + } } - if (was_blocked) { - RAY_CHECK_OK(raylet_client_->NotifyUnblocked(task_id)); - } - - return Status::OK(); + // Notify unblocked because we blocked when calling FetchOrReconstruct with + // fetch_only=false. + return raylet_client_->NotifyUnblocked(task_id); } Status CoreWorkerPlasmaStoreProvider::Wait(const std::vector &object_ids, @@ -129,12 +153,12 @@ bool CoreWorkerPlasmaStoreProvider::IsException(const RayObject &object) { } void CoreWorkerPlasmaStoreProvider::WarnIfAttemptedTooManyTimes( - int num_attempts, const std::unordered_map &unready) { + int num_attempts, const std::unordered_map &remaining) { if (num_attempts % RayConfig::instance().object_store_get_warn_per_num_attempts() == 0) { std::ostringstream oss; size_t printed = 0; - for (auto &entry : unready) { + for (auto &entry : remaining) { if (printed >= RayConfig::instance().object_store_get_max_ids_to_print_in_warning()) { break; @@ -144,14 +168,14 @@ void CoreWorkerPlasmaStoreProvider::WarnIfAttemptedTooManyTimes( } oss << entry.first.Hex(); } - if (printed < unready.size()) { + if (printed < remaining.size()) { oss << ", etc"; } RAY_LOG(WARNING) << "Attempted " << num_attempts << " times to reconstruct objects, but " << "some objects are still unavailable. If this message continues to print," << " it may indicate that object's creating task is hanging, or something wrong" - << " happened in raylet backend. " << unready.size() + << " happened in raylet backend. " << remaining.size() << " object(s) pending: " << oss.str() << "."; } } diff --git a/src/ray/core_worker/store_provider/plasma_store_provider.h b/src/ray/core_worker/store_provider/plasma_store_provider.h index 1ef6a8b1d..4fa5537bc 100644 --- a/src/ray/core_worker/store_provider/plasma_store_provider.h +++ b/src/ray/core_worker/store_provider/plasma_store_provider.h @@ -72,9 +72,9 @@ class CoreWorkerPlasmaStoreProvider : public CoreWorkerStoreProvider { /// unavailable. /// /// \param[in] num_attemps The number of attempted times. - /// \param[in] unready The unready objects. + /// \param[in] remaining The remaining objects. static void WarnIfAttemptedTooManyTimes( - int num_attempts, const std::unordered_map &unready); + int num_attempts, const std::unordered_map &remaining); /// local plasma store provider. CoreWorkerLocalPlasmaStoreProvider local_store_provider_;