Remove object id translation (#6531)

This commit is contained in:
Edward Oakes 2019-12-19 12:47:49 -08:00 committed by Eric Liang
parent 7b955881f3
commit 41fa2e9604
17 changed files with 81 additions and 93 deletions

View file

@ -92,5 +92,7 @@ def get_async(object_id):
inner_future.add_done_callback(done_callback)
# A hack to keep reference to inner_future so it doesn't get GC.
user_future.inner_future = inner_future
# A hack to keep a reference to the object ID for ref counting.
user_future.object_id = object_id
return user_future

View file

@ -170,9 +170,6 @@ cdef class ObjectID(BaseID):
def is_direct_call_type(self):
return self.data.IsDirectCallType()
def with_plasma_transport_type(self):
return ObjectID(self.data.WithPlasmaTransportType().Binary())
def is_nil(self):
return self.data.IsNil()

View file

@ -54,7 +54,7 @@ def test_dying_worker_get(ray_start_2_cpus):
assert len(ready_ids) == 0
# Seal the object so the store attempts to notify the worker that the
# get has been fulfilled.
ray.worker.global_worker.put_object(1, x_id.with_plasma_transport_type())
ray.worker.global_worker.put_object(1, x_id)
time.sleep(0.1)
# Make sure that nothing has died.
@ -97,7 +97,7 @@ ray.get(ray.ObjectID(ray.utils.hex_to_binary("{}")))
assert len(ready_ids) == 0
# Seal the object so the store attempts to notify the worker that the
# get has been fulfilled.
ray.worker.global_worker.put_object(1, x_id.with_plasma_transport_type())
ray.worker.global_worker.put_object(1, x_id)
time.sleep(0.1)
# Make sure that nothing has died.
@ -137,7 +137,7 @@ def test_dying_worker_wait(ray_start_2_cpus):
time.sleep(0.1)
# Create the object.
ray.worker.global_worker.put_object(1, x_id.with_plasma_transport_type())
ray.worker.global_worker.put_object(1, x_id)
time.sleep(0.1)
# Make sure that nothing has died.
@ -180,7 +180,7 @@ ray.wait([ray.ObjectID(ray.utils.hex_to_binary("{}"))])
assert len(ready_ids) == 0
# Seal the object so the store attempts to notify the worker that the
# wait can return.
ray.worker.global_worker.put_object(1, x_id.with_plasma_transport_type())
ray.worker.global_worker.put_object(1, x_id)
time.sleep(0.1)
# Make sure that nothing has died.

View file

@ -1,7 +1,7 @@
#include <memory>
#include "ray/core_worker/actor_handle.h"
#include <memory>
namespace {
ray::rpc::ActorHandle CreateInnerActorHandle(

View file

@ -1,4 +1,5 @@
#include "ray/core_worker/actor_manager.h"
#include "ray/gcs/redis_actor_info_accessor.h"
namespace ray {

View file

@ -329,9 +329,8 @@ void CoreWorker::PromoteToPlasmaAndGetOwnershipInfo(const ObjectID &object_id,
rpc::Address *owner_address) {
RAY_CHECK(object_id.IsDirectCallType());
auto value = memory_store_->GetOrPromoteToPlasma(object_id);
if (value != nullptr) {
RAY_CHECK_OK(
plasma_store_provider_->Put(*value, object_id.WithPlasmaTransportType()));
if (value) {
RAY_CHECK_OK(plasma_store_provider_->Put(*value, object_id));
}
auto has_owner = reference_counter_->GetOwner(object_id, owner_id, owner_address);
@ -405,51 +404,31 @@ Status CoreWorker::Get(const std::vector<ObjectID> &ids, const int64_t timeout_m
bool got_exception = false;
absl::flat_hash_map<ObjectID, std::shared_ptr<RayObject>> result_map;
auto start_time = current_time_ms();
RAY_RETURN_NOT_OK(plasma_store_provider_->Get(
plasma_object_ids, timeout_ms, worker_context_, &result_map, &got_exception));
if (!got_exception) {
int64_t local_timeout_ms = timeout_ms;
if (timeout_ms >= 0) {
local_timeout_ms = std::max(static_cast<int64_t>(0),
timeout_ms - (current_time_ms() - start_time));
}
RAY_RETURN_NOT_OK(memory_store_->Get(memory_object_ids, local_timeout_ms,
worker_context_, &result_map, &got_exception));
if (!memory_object_ids.empty()) {
RAY_RETURN_NOT_OK(memory_store_->Get(memory_object_ids, timeout_ms, worker_context_,
&result_map, &got_exception));
}
if (!got_exception) {
// If any of the objects have been promoted to plasma, then we retry their
// gets at the provider plasma. Once we get the objects from plasma, we flip
// the transport type again and return them for the original direct call ids.
absl::flat_hash_set<ObjectID> promoted_plasma_ids;
for (const auto &pair : result_map) {
if (pair.second->IsInPlasmaError()) {
RAY_LOG(DEBUG) << pair.first << " in plasma, doing fetch-and-get";
promoted_plasma_ids.insert(
pair.first.WithTransportType(TaskTransportType::RAYLET));
RAY_LOG(INFO) << pair.first << " in plasma, doing fetch-and-get";
plasma_object_ids.insert(pair.first);
}
}
if (!promoted_plasma_ids.empty()) {
int64_t local_timeout_ms = timeout_ms;
if (timeout_ms >= 0) {
local_timeout_ms = std::max(static_cast<int64_t>(0),
timeout_ms - (current_time_ms() - start_time));
}
RAY_LOG(DEBUG) << "Plasma GET timeout " << local_timeout_ms;
RAY_RETURN_NOT_OK(plasma_store_provider_->Get(promoted_plasma_ids, local_timeout_ms,
worker_context_, &result_map,
&got_exception));
for (const auto &id : promoted_plasma_ids) {
auto it = result_map.find(id);
if (it == result_map.end()) {
result_map.erase(id.WithTransportType(TaskTransportType::DIRECT));
} else {
result_map[id.WithTransportType(TaskTransportType::DIRECT)] = it->second;
}
result_map.erase(id);
}
int64_t local_timeout_ms = timeout_ms;
if (timeout_ms >= 0) {
local_timeout_ms = std::max(static_cast<int64_t>(0),
timeout_ms - (current_time_ms() - start_time));
}
RAY_LOG(DEBUG) << "Plasma GET timeout " << local_timeout_ms;
RAY_RETURN_NOT_OK(plasma_store_provider_->Get(plasma_object_ids, local_timeout_ms,
worker_context_, &result_map,
&got_exception));
}
// Loop through `ids` and fill each entry for the `results` vector,
@ -478,7 +457,7 @@ Status CoreWorker::Get(const std::vector<ObjectID> &ids, const int64_t timeout_m
}
// If no timeout was set and none of the results will throw an exception,
// then check that we fetched all results before returning.
if (timeout_ms >= 0 && !will_throw_exception) {
if (timeout_ms < 0 && !will_throw_exception) {
RAY_CHECK(!missing_result);
}
@ -488,15 +467,13 @@ Status CoreWorker::Get(const std::vector<ObjectID> &ids, const int64_t timeout_m
Status CoreWorker::Contains(const ObjectID &object_id, bool *has_object) {
bool found = false;
if (object_id.IsDirectCallType()) {
// Note that the memory store returns false if the object value is
// ErrorType::OBJECT_IN_PLASMA.
found = memory_store_->Contains(object_id);
}
if (!found) {
// We check plasma as a fallback in all cases, since a direct call object
// may have been spilled to plasma.
RAY_RETURN_NOT_OK(plasma_store_provider_->Contains(
object_id.WithTransportType(TaskTransportType::RAYLET), &found));
bool in_plasma = false;
found = memory_store_->Contains(object_id, &in_plasma);
if (in_plasma) {
RAY_RETURN_NOT_OK(plasma_store_provider_->Contains(object_id, &found));
}
} else {
RAY_RETURN_NOT_OK(plasma_store_provider_->Contains(object_id, &found));
}
*has_object = found;
return Status::OK();
@ -847,9 +824,8 @@ Status CoreWorker::AllocateReturnObjects(
RayConfig::instance().max_direct_call_object_size()) {
data_buffer = std::make_shared<LocalMemoryBuffer>(data_sizes[i]);
} else {
RAY_RETURN_NOT_OK(Create(
metadatas[i], data_sizes[i],
object_ids[i].WithTransportType(TaskTransportType::RAYLET), &data_buffer));
RAY_RETURN_NOT_OK(
Create(metadatas[i], data_sizes[i], object_ids[i], &data_buffer));
object_already_exists = !data_buffer;
}
}
@ -910,7 +886,7 @@ Status CoreWorker::ExecuteTask(const TaskSpecification &task_spec,
continue;
}
if (return_objects->at(i)->GetData()->IsPlasmaBuffer()) {
if (!Seal(return_ids[i].WithTransportType(TaskTransportType::RAYLET)).ok()) {
if (!Seal(return_ids[i]).ok()) {
RAY_LOG(FATAL) << "Task " << task_spec.TaskId() << " failed to seal object "
<< return_ids[i] << " in store: " << status.message();
}
@ -943,16 +919,23 @@ Status CoreWorker::BuildArgsForExecutor(const TaskSpecification &task,
args->resize(num_args);
arg_reference_ids->resize(num_args);
std::vector<ObjectID> object_ids_to_fetch;
std::vector<int> indices;
absl::flat_hash_set<ObjectID> by_ref_ids;
absl::flat_hash_map<ObjectID, int> by_ref_indices;
for (size_t i = 0; i < task.NumArgs(); ++i) {
int count = task.ArgIdCount(i);
if (count > 0) {
// pass by reference.
RAY_CHECK(count == 1);
object_ids_to_fetch.push_back(task.ArgId(i, 0));
indices.push_back(i);
// Direct call type objects that weren't inlined have been promoted to plasma.
// We need to put an OBJECT_IN_PLASMA error here so the subsequent call to Get()
// properly redirects to the plasma store.
if (task.ArgId(i, 0).IsDirectCallType()) {
RAY_CHECK_OK(memory_store_->Put(RayObject(rpc::ErrorType::OBJECT_IN_PLASMA),
task.ArgId(i, 0)));
}
by_ref_ids.insert(task.ArgId(i, 0));
by_ref_indices.emplace(task.ArgId(i, 0), i);
arg_reference_ids->at(i) = task.ArgId(i, 0);
} else {
// pass by value.
@ -971,15 +954,16 @@ Status CoreWorker::BuildArgsForExecutor(const TaskSpecification &task,
}
}
std::vector<std::shared_ptr<RayObject>> results;
auto status = Get(object_ids_to_fetch, -1, &results);
if (status.ok()) {
for (size_t i = 0; i < results.size(); i++) {
args->at(indices[i]) = results[i];
}
// Fetch by-reference arguments directly from the plasma store.
bool got_exception = false;
absl::flat_hash_map<ObjectID, std::shared_ptr<RayObject>> result_map;
RAY_RETURN_NOT_OK(plasma_store_provider_->Get(by_ref_ids, -1, worker_context_,
&result_map, &got_exception));
for (const auto &it : result_map) {
args->at(by_ref_indices[it.first]) = it.second;
}
return status;
return Status::OK();
}
void CoreWorker::HandleAssignTask(const rpc::AssignTaskRequest &request,
@ -1077,7 +1061,7 @@ void CoreWorker::GetAsync(const ObjectID &object_id, SetResultCallback success_c
memory_store_->GetAsync(object_id, [python_future, success_callback, fallback_callback,
object_id](std::shared_ptr<RayObject> ray_object) {
if (ray_object->IsInPlasmaError()) {
fallback_callback(ray_object, object_id.WithPlasmaTransportType(), python_future);
fallback_callback(ray_object, object_id, python_future);
} else {
success_callback(ray_object, object_id, python_future);
}

View file

@ -130,7 +130,7 @@ class CoreWorker {
/// called on object IDs that were created randomly, e.g.,
/// ObjectID::FromRandom.
///
/// Postcondition: Get(object_id.WithPlasmaTransportType()) is valid.
/// Postcondition: Get(object_id) is valid.
///
/// \param[in] object_id The object ID to serialize.
/// \param[out] owner_id The ID of the object's owner. This should be

View file

@ -1,7 +1,7 @@
#include <chrono>
#include "ray/core_worker/profiling.h"
#include <chrono>
namespace ray {
namespace worker {

View file

@ -4,7 +4,6 @@
#include "absl/base/thread_annotations.h"
#include "absl/synchronization/mutex.h"
#include "absl/time/clock.h"
#include "ray/core_worker/context.h"
#include "ray/gcs/redis_gcs_client.h"

View file

@ -1,4 +1,5 @@
#include <condition_variable>
#include "ray/common/ray_config.h"
#include "ray/core_worker/context.h"
#include "ray/core_worker/core_worker.h"
@ -176,7 +177,7 @@ Status CoreWorkerMemoryStore::Put(const RayObject &object, const ObjectID &objec
if (!object.IsInPlasmaError()) {
// Only need to promote to plasma if it wasn't already put into plasma
// by the task that created the object.
store_in_plasma_(object, object_id.WithTransportType(TaskTransportType::RAYLET));
store_in_plasma_(object, object_id);
}
promoted_to_plasma_.erase(promoted_it);
}
@ -367,8 +368,7 @@ void CoreWorkerMemoryStore::Delete(const absl::flat_hash_set<ObjectID> &object_i
auto it = objects_.find(object_id);
if (it != objects_.end()) {
if (it->second->IsInPlasmaError()) {
plasma_ids_to_delete->insert(
object_id.WithTransportType(TaskTransportType::RAYLET));
plasma_ids_to_delete->insert(object_id);
} else {
objects_.erase(it);
}
@ -383,13 +383,17 @@ void CoreWorkerMemoryStore::Delete(const std::vector<ObjectID> &object_ids) {
}
}
bool CoreWorkerMemoryStore::Contains(const ObjectID &object_id) {
bool CoreWorkerMemoryStore::Contains(const ObjectID &object_id, bool *in_plasma) {
absl::MutexLock lock(&mu_);
auto it = objects_.find(object_id);
if (it != objects_.end() && it->second->IsInPlasmaError()) {
return false;
if (it != objects_.end()) {
if (it->second->IsInPlasmaError()) {
*in_plasma = true;
return false;
}
return true;
}
return it != objects_.end();
return false;
}
} // namespace ray

View file

@ -104,8 +104,10 @@ class CoreWorkerMemoryStore {
/// Check whether this store contains the object.
///
/// \param[in] object_id The object to check.
/// \param[out] in_plasma Set to true if the object was spilled to plasma.
/// If this is set to true, Contains() will return false.
/// \return Whether the store has the object.
bool Contains(const ObjectID &object_id);
bool Contains(const ObjectID &object_id, bool *in_plasma);
/// Returns the number of objects in this store.
///

View file

@ -1,4 +1,5 @@
#include "ray/core_worker/store_provider/plasma_store_provider.h"
#include "ray/common/ray_config.h"
#include "ray/core_worker/context.h"
#include "ray/core_worker/core_worker.h"
@ -48,7 +49,6 @@ Status CoreWorkerPlasmaStoreProvider::Create(const std::shared_ptr<Buffer> &meta
const size_t data_size,
const ObjectID &object_id,
std::shared_ptr<Buffer> *data) {
RAY_CHECK(!object_id.IsDirectCallType());
auto plasma_id = object_id.ToPlasmaId();
std::shared_ptr<arrow::Buffer> arrow_buffer;
{

View file

@ -1,4 +1,5 @@
#include "ray/core_worker/task_manager.h"
#include "ray/util/util.h"
namespace ray {

View file

@ -4,7 +4,6 @@
#include "absl/base/thread_annotations.h"
#include "absl/container/flat_hash_map.h"
#include "absl/synchronization/mutex.h"
#include "ray/common/id.h"
#include "ray/common/task/task.h"
#include "ray/core_worker/actor_manager.h"

View file

@ -1,9 +1,9 @@
#include "gtest/gtest.h"
#include "ray/core_worker/transport/direct_task_transport.h"
#include "gtest/gtest.h"
#include "ray/common/task/task_spec.h"
#include "ray/common/task/task_util.h"
#include "ray/core_worker/store_provider/memory_store/memory_store.h"
#include "ray/core_worker/transport/direct_task_transport.h"
#include "ray/raylet/raylet_client.h"
#include "ray/rpc/worker/core_worker_client.h"
#include "src/ray/util/test_util.h"
@ -168,8 +168,8 @@ TEST(LocalDependencyResolverTest, TestHandlePlasmaPromotion) {
resolver.ResolveDependencies(task, [&ok]() { ok = true; });
ASSERT_TRUE(ok);
ASSERT_TRUE(task.ArgByRef(0));
// Checks that the object id was promoted to a plasma type id.
ASSERT_FALSE(task.ArgId(0, 0).IsDirectCallType());
// Checks that the object id is still a direct call id.
ASSERT_TRUE(task.ArgId(0, 0).IsDirectCallType());
ASSERT_EQ(resolver.NumPendingTasks(), 0);
}

View file

@ -32,8 +32,7 @@ void InlineDependencies(
mutable_arg->clear_object_ids();
if (it->second->IsInPlasmaError()) {
// Promote the object id to plasma.
mutable_arg->add_object_ids(
it->first.WithTransportType(TaskTransportType::RAYLET).Binary());
mutable_arg->add_object_ids(it->first.Binary());
} else {
// Inline the object value.
if (it->second->HasData()) {

View file

@ -21,8 +21,8 @@ class LocalDependencyResolver {
//
/// Note: This method **will mutate** the given TaskSpecification.
///
/// Postcondition: all direct call ids in arguments are converted to values and all
/// remaining by-reference arguments are TaskTransportType::RAYLET.
/// Postcondition: all direct call id arguments that haven't been spilled to plasma
/// are converted to values and all remaining arguments are arguments in the task spec.
void ResolveDependencies(TaskSpecification &task, std::function<void()> on_complete);
/// Return the number of tasks pending dependency resolution.