[CoreWorker]Remove plasma_objects_only parameter (#17384)

This commit is contained in:
wanxing 2021-07-30 14:48:36 +08:00 committed by GitHub
parent b8baac3cb0
commit 705248f4ee
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 17 additions and 27 deletions

View file

@ -1011,16 +1011,14 @@ cdef class CoreWorker:
return self.plasma_event_handler
def get_objects(self, object_refs, TaskID current_task_id,
int64_t timeout_ms=-1,
plasma_objects_only=False):
int64_t timeout_ms=-1):
cdef:
c_vector[shared_ptr[CRayObject]] results
CTaskID c_task_id = current_task_id.native()
c_vector[CObjectID] c_object_ids = ObjectRefsToVector(object_refs)
c_bool _plasma_objects_only = plasma_objects_only
with nogil:
check_status(CCoreWorkerProcess.GetCoreWorker().Get(
c_object_ids, timeout_ms, &results, _plasma_objects_only))
c_object_ids, timeout_ms, &results))
return RayObjectsToDataMetadataPairs(results)

View file

@ -203,8 +203,7 @@ cdef extern from "ray/core_worker/core_worker.h" nogil:
CRayStatus SealExisting(const CObjectID &object_id, c_bool pin_object,
const unique_ptr[CAddress] &owner_address)
CRayStatus Get(const c_vector[CObjectID] &ids, int64_t timeout_ms,
c_vector[shared_ptr[CRayObject]] *results,
c_bool plasma_objects_only)
c_vector[shared_ptr[CRayObject]] *results)
CRayStatus GetIfLocal(
const c_vector[CObjectID] &ids,
c_vector[shared_ptr[CRayObject]] *results)

View file

@ -1230,8 +1230,7 @@ Status CoreWorker::SealExisting(const ObjectID &object_id, bool pin_object,
}
Status CoreWorker::Get(const std::vector<ObjectID> &ids, const int64_t timeout_ms,
std::vector<std::shared_ptr<RayObject>> *results,
bool plasma_objects_only) {
std::vector<std::shared_ptr<RayObject>> *results) {
results->resize(ids.size(), nullptr);
absl::flat_hash_set<ObjectID> plasma_object_ids;
@ -1241,24 +1240,20 @@ Status CoreWorker::Get(const std::vector<ObjectID> &ids, const int64_t timeout_m
absl::flat_hash_map<ObjectID, std::shared_ptr<RayObject>> result_map;
auto start_time = current_time_ms();
if (!plasma_objects_only) {
if (!memory_object_ids.empty()) {
RAY_RETURN_NOT_OK(memory_store_->Get(memory_object_ids, timeout_ms, worker_context_,
&result_map, &got_exception));
}
if (!memory_object_ids.empty()) {
RAY_RETURN_NOT_OK(memory_store_->Get(memory_object_ids, timeout_ms, worker_context_,
&result_map, &got_exception));
}
// Erase any objects that were promoted to plasma from the results. These get
// requests will be retried at the plasma store.
for (auto it = result_map.begin(); it != result_map.end();) {
auto current = it++;
if (current->second->IsInPlasmaError()) {
RAY_LOG(DEBUG) << current->first << " in plasma, doing fetch-and-get";
plasma_object_ids.insert(current->first);
result_map.erase(current);
}
// Erase any objects that were promoted to plasma from the results. These get
// requests will be retried at the plasma store.
for (auto it = result_map.begin(); it != result_map.end();) {
auto current = it++;
if (current->second->IsInPlasmaError()) {
RAY_LOG(DEBUG) << current->first << " in plasma, doing fetch-and-get";
plasma_object_ids.insert(current->first);
result_map.erase(current);
}
} else {
plasma_object_ids = std::move(memory_object_ids);
}
if (!got_exception) {

View file

@ -590,11 +590,9 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler {
/// \param[in] ids IDs of the objects to get.
/// \param[in] timeout_ms Timeout in milliseconds, wait infinitely if it's negative.
/// \param[out] results Result list of objects data.
/// \param[in] plasma_objects_only Only get objects from Plasma Store.
/// \return Status.
Status Get(const std::vector<ObjectID> &ids, const int64_t timeout_ms,
std::vector<std::shared_ptr<RayObject>> *results,
bool plasma_objects_only = false);
std::vector<std::shared_ptr<RayObject>> *results);
/// Get objects directly from the local plasma store, without waiting for the
/// objects to be fetched from another node. This should only be used