Batch fetch requests in core worker get (#5342)

This commit is contained in:
Edward Oakes 2019-08-22 11:16:46 -07:00 committed by Philipp Moritz
parent cdc9227f1b
commit f359333933
2 changed files with 71 additions and 47 deletions

View file

@ -19,70 +19,94 @@ Status CoreWorkerPlasmaStoreProvider::Put(const RayObject &object,
Status CoreWorkerPlasmaStoreProvider::Get(
const std::vector<ObjectID> &ids, int64_t timeout_ms, const TaskID &task_id,
std::vector<std::shared_ptr<RayObject>> *results) {
int64_t batch_size = RayConfig::instance().worker_fetch_request_size();
(*results).resize(ids.size(), nullptr);
std::unordered_map<ObjectID, int> remaining;
bool was_blocked = false;
// First, attempt to fetch all of the required objects without reconstructing.
for (int64_t start = 0; start < int64_t(ids.size()); start += batch_size) {
int64_t end = std::min(start + batch_size, int64_t(ids.size()));
const std::vector<ObjectID> ids_slice(ids.cbegin() + start, ids.cbegin() + end);
RAY_CHECK_OK(
raylet_client_->FetchOrReconstruct(ids_slice, /*fetch_only=*/true, task_id));
std::vector<std::shared_ptr<RayObject>> results_slice;
RAY_RETURN_NOT_OK(local_store_provider_.Get(ids_slice, 0, task_id, &results_slice));
std::unordered_map<ObjectID, int> unready;
for (size_t i = 0; i < ids.size(); i++) {
unready.insert({ids[i], i});
// Iterate through the results from the local store, adding them to the remaining
// map if they weren't successfully fetched from the local store (are nullptr).
// Keeps track of the locations of the remaining object IDs in the original list.
for (size_t i = 0; i < ids_slice.size(); i++) {
if (results_slice[i] != nullptr) {
(*results)[start + i] = results_slice[i];
// Terminate early on exception because it'll be raised by the worker anyways.
if (IsException(*results_slice[i])) {
return Status::OK();
}
} else {
remaining.insert({ids_slice[i], start + i});
}
}
}
int num_attempts = 0;
// If all objects were fetched already, return.
if (remaining.empty()) {
return Status::OK();
}
// If not all objects were successfully fetched, repeatedly call FetchOrReconstruct and
// Get from the local object store in batches. This loop will run indefinitely until the
// objects are all fetched if timeout is -1.
int unsuccessful_attempts = 0;
bool should_break = false;
int64_t remaining_timeout = timeout_ms;
// Repeat until we get all objects.
while (!unready.empty() && !should_break) {
std::vector<ObjectID> unready_ids;
for (const auto &entry : unready) {
unready_ids.push_back(entry.first);
while (!remaining.empty() && !should_break) {
std::vector<ObjectID> batch_ids;
for (const auto &entry : remaining) {
if (int64_t(batch_ids.size()) == batch_size) {
break;
}
batch_ids.push_back(entry.first);
}
// For the initial fetch, we only fetch the objects, do not reconstruct them.
bool fetch_only = num_attempts == 0;
if (!fetch_only) {
// If fetch_only is false, this worker will be blocked.
was_blocked = true;
}
RAY_CHECK_OK(
raylet_client_->FetchOrReconstruct(batch_ids, /*fetch_only=*/false, task_id));
// TODO(zhijunfu): can call `fetchOrReconstruct` in batches as an optimization.
RAY_CHECK_OK(raylet_client_->FetchOrReconstruct(unready_ids, fetch_only, task_id));
// Get the objects from the object store, and parse the result.
int64_t get_timeout;
int64_t batch_timeout = std::max(RayConfig::instance().get_timeout_milliseconds(),
int64_t(10 * batch_ids.size()));
if (remaining_timeout >= 0) {
get_timeout =
std::min(remaining_timeout, RayConfig::instance().get_timeout_milliseconds());
remaining_timeout -= get_timeout;
batch_timeout = std::min(remaining_timeout, batch_timeout);
remaining_timeout -= batch_timeout;
should_break = remaining_timeout <= 0;
} else {
get_timeout = RayConfig::instance().get_timeout_milliseconds();
}
std::vector<std::shared_ptr<RayObject>> result_objects;
std::vector<std::shared_ptr<RayObject>> batch_results;
RAY_RETURN_NOT_OK(
local_store_provider_.Get(unready_ids, get_timeout, task_id, &result_objects));
local_store_provider_.Get(batch_ids, batch_timeout, task_id, &batch_results));
for (size_t i = 0; i < result_objects.size(); i++) {
if (result_objects[i] != nullptr) {
const auto &object_id = unready_ids[i];
(*results)[unready[object_id]] = result_objects[i];
unready.erase(object_id);
if (IsException(*result_objects[i])) {
// Add successfully retrieved objects to the result list and remove them from
// remaining.
uint64_t successes = 0;
for (size_t i = 0; i < batch_results.size(); i++) {
if (batch_results[i] != nullptr) {
successes++;
const auto &object_id = batch_ids[i];
(*results)[remaining[object_id]] = batch_results[i];
remaining.erase(object_id);
if (IsException(*batch_results[i])) {
should_break = true;
}
}
}
num_attempts += 1;
WarnIfAttemptedTooManyTimes(num_attempts, unready);
if (successes < batch_ids.size()) {
unsuccessful_attempts++;
WarnIfAttemptedTooManyTimes(unsuccessful_attempts, remaining);
}
}
if (was_blocked) {
RAY_CHECK_OK(raylet_client_->NotifyUnblocked(task_id));
}
return Status::OK();
// Notify unblocked because we blocked when calling FetchOrReconstruct with
// fetch_only=false.
return raylet_client_->NotifyUnblocked(task_id);
}
Status CoreWorkerPlasmaStoreProvider::Wait(const std::vector<ObjectID> &object_ids,
@ -129,12 +153,12 @@ bool CoreWorkerPlasmaStoreProvider::IsException(const RayObject &object) {
}
void CoreWorkerPlasmaStoreProvider::WarnIfAttemptedTooManyTimes(
int num_attempts, const std::unordered_map<ObjectID, int> &unready) {
int num_attempts, const std::unordered_map<ObjectID, int> &remaining) {
if (num_attempts % RayConfig::instance().object_store_get_warn_per_num_attempts() ==
0) {
std::ostringstream oss;
size_t printed = 0;
for (auto &entry : unready) {
for (auto &entry : remaining) {
if (printed >=
RayConfig::instance().object_store_get_max_ids_to_print_in_warning()) {
break;
@ -144,14 +168,14 @@ void CoreWorkerPlasmaStoreProvider::WarnIfAttemptedTooManyTimes(
}
oss << entry.first.Hex();
}
if (printed < unready.size()) {
if (printed < remaining.size()) {
oss << ", etc";
}
RAY_LOG(WARNING)
<< "Attempted " << num_attempts << " times to reconstruct objects, but "
<< "some objects are still unavailable. If this message continues to print,"
<< " it may indicate that object's creating task is hanging, or something wrong"
<< " happened in raylet backend. " << unready.size()
<< " happened in raylet backend. " << remaining.size()
<< " object(s) pending: " << oss.str() << ".";
}
}

View file

@ -72,9 +72,9 @@ class CoreWorkerPlasmaStoreProvider : public CoreWorkerStoreProvider {
/// unavailable.
///
/// \param[in] num_attemps The number of attempted times.
/// \param[in] unready The unready objects.
/// \param[in] remaining The remaining objects.
static void WarnIfAttemptedTooManyTimes(
int num_attempts, const std::unordered_map<ObjectID, int> &unready);
int num_attempts, const std::unordered_map<ObjectID, int> &remaining);
/// local plasma store provider.
CoreWorkerLocalPlasmaStoreProvider local_store_provider_;