[core] Pin object if it already exists (#20447)

A worker can crash right after putting its return values into the object store. In that case, the owner receives a worker-crashed error, but the return objects remain in the remote object store. If the task is later retried, the re-executing worker crashes on [this line](https://github.com/ray-project/ray/blob/master/src/ray/core_worker/transport/direct_actor_transport.cc#L105) because the object already exists.

This can also happen when a task has multiple return values and one of those values has been transferred to another node. If the task is later re-executed on that node, it fails with the same error.
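
A rough reproduction of the multi-return scenario is sketched below. It mirrors the `test_multiple_returns` test added in this PR; the cluster setup, the custom resource label, and the object sizes are illustrative assumptions rather than part of the change itself.

```python
import numpy as np
import ray

# Assumes a running multi-node cluster where one worker node is labeled with
# the custom resource {"node": 1}; the label and sizes are illustrative.
ray.init(address="auto")

@ray.remote(num_returns=2)
def two_large_objects():
    # Both return values are large enough to be stored in plasma on the
    # executing node.
    return np.zeros(10**7, dtype=np.uint8), np.zeros(10**7, dtype=np.uint8)

@ray.remote
def dependent_task(x):
    return

obj1, obj2 = two_large_objects.remote()
# Force obj1 to be copied to the labeled node.
ray.get(dependent_task.options(resources={"node": 1}).remote(obj1))

# If the node that originally executed two_large_objects dies and
# reconstruction re-executes the task on the labeled node, obj1 already
# exists in that node's plasma store. Before this PR the re-executing worker
# crashed; with this PR it pins the existing copy instead.
ray.get(dependent_task.remote(obj2))
```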

This PR fixes the crash so that:
1. If an object already exists, we try to pin that copy (see the sketch after this list). Ideally, we should destroy the old copy and create a new one so that metadata such as the owner address stays in sync, but that is fairly complicated to do right now.
2. If pinning fails, we store an OBJECT_LOST error to throw to the application.
3. On the raylet, we check whether the object is already pinned, and we only subscribe to the owner's eviction message if it is not.
4. Also fixes bugs in the analogous case for `ray.put`: previously this would hang; now the application receives an error if a `ray.put` object already exists.
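
At a high level, the worker now handles each return value roughly as in the Python sketch below. This is a simplified rendering of the Cython changes to `store_task_output` and `store_task_outputs` in this diff; the `core_worker` object and its method names are illustrative stand-ins, not the real bindings.

```python
def store_task_output(core_worker, serialized_object, return_id):
    """Illustrative sketch: store one return value, or pin an existing copy.

    Returns True on success, False if an existing copy could not be pinned.
    """
    return_obj = core_worker.allocate_return_object(return_id)
    if return_obj is not None:
        # Normal path: the object was newly allocated, so write the
        # serialized value into it and seal it in plasma.
        serialized_object.write_to(return_obj)
        core_worker.seal_return_object(return_id, return_obj)
        return True
    # The object already exists in plasma (e.g. left over from a previous
    # execution of this task). Ask the raylet to pin that copy instead of
    # crashing the worker. This can still fail if the copy was evicted in
    # the meantime.
    return core_worker.pin_existing_return_object(return_id)


def store_task_outputs(core_worker, serialized_objects, return_ids):
    for serialized_object, return_id in zip(serialized_objects, return_ids):
        if not store_task_output(core_worker, serialized_object, return_id):
            # The existing copy could not be pinned, so it was probably
            # evicted. Retry once to create a fresh copy.
            store_task_output(core_worker, serialized_object, return_id)
```

On the raylet side (point 3), `PinObjectsAndWaitForFree` records the owner address of each object it is asked to pin in `objects_waiting_for_free_` and skips both re-pinning and the eviction subscription when an entry already exists, as shown in the `LocalObjectManager` changes below.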
Stephanie Wang 2021-12-10 15:56:43 -08:00 committed by GitHub
parent 6280bc4391
commit 3a5dd9a10b
14 changed files with 459 additions and 121 deletions


@ -198,7 +198,7 @@ Status TaskExecutor::ExecuteTask(
int64_t task_output_inlined_bytes = 0;
RAY_CHECK_OK(CoreWorkerProcess::GetCoreWorker().AllocateReturnObject(
result_id, data_size, meta_buffer, std::vector<ray::ObjectID>(),
task_output_inlined_bytes, result_ptr));
&task_output_inlined_bytes, result_ptr));
auto result = *result_ptr;
if (result != nullptr) {


@ -5,6 +5,9 @@
from cpython.pystate cimport PyThreadState_Get
from libc.stdint cimport (
int64_t,
)
from libcpp cimport bool as c_bool
from libcpp.string cimport string as c_string
from libcpp.vector cimport vector as c_vector
@ -132,6 +135,11 @@ cdef class CoreWorker:
owner_address=*,
c_bool inline_small_object=*)
cdef unique_ptr[CAddress] _convert_python_address(self, address=*)
cdef store_task_output(
self, serialized_object, const CObjectID &return_id, size_t
data_size, shared_ptr[CBuffer] &metadata, const c_vector[CObjectID]
&contained_id, int64_t *task_output_inlined_bytes,
shared_ptr[CRayObject] *return_ptr)
cdef store_task_outputs(
self, worker, outputs, const c_vector[CObjectID] return_ids,
c_vector[shared_ptr[CRayObject]] *returns)


@ -1869,6 +1869,41 @@ cdef class CoreWorker:
c_owner_address,
serialized_object_status))
cdef store_task_output(self, serialized_object, const CObjectID &return_id,
size_t data_size, shared_ptr[CBuffer] &metadata,
const c_vector[CObjectID] &contained_id,
int64_t *task_output_inlined_bytes,
shared_ptr[CRayObject] *return_ptr):
"""Store a task return value in plasma or as an inlined object."""
with nogil:
check_status(
CCoreWorkerProcess.GetCoreWorker().AllocateReturnObject(
return_id, data_size, metadata, contained_id,
task_output_inlined_bytes, return_ptr))
if return_ptr.get() != NULL:
if return_ptr.get().HasData():
(<SerializedObject>serialized_object).write_to(
Buffer.make(return_ptr.get().GetData()))
if self.is_local_mode:
check_status(
CCoreWorkerProcess.GetCoreWorker().Put(
CRayObject(return_ptr.get().GetData(),
return_ptr.get().GetMetadata(),
c_vector[CObjectReference]()),
c_vector[CObjectID](), return_id))
else:
with nogil:
check_status(
CCoreWorkerProcess.GetCoreWorker().SealReturnObject(
return_id, return_ptr[0]))
return True
else:
with nogil:
success = (CCoreWorkerProcess.GetCoreWorker()
.PinExistingReturnObject(return_id, return_ptr))
return success
cdef store_task_outputs(
self, worker, outputs, const c_vector[CObjectID] return_ids,
c_vector[shared_ptr[CRayObject]] *returns):
@ -1877,7 +1912,6 @@ cdef class CoreWorker:
size_t data_size
shared_ptr[CBuffer] metadata
c_vector[CObjectID] contained_id
c_vector[CObjectID] return_ids_vector
int64_t task_output_inlined_bytes
if return_ids.size() == 0:
@ -1904,30 +1938,17 @@ cdef class CoreWorker:
contained_id = ObjectRefsToVector(
serialized_object.contained_object_refs)
with nogil:
check_status(
CCoreWorkerProcess.GetCoreWorker().AllocateReturnObject(
return_id, data_size, metadata, contained_id,
task_output_inlined_bytes, &returns[0][i]))
if returns[0][i].get() != NULL:
if returns[0][i].get().HasData():
(<SerializedObject>serialized_object).write_to(
Buffer.make(returns[0][i].get().GetData()))
if self.is_local_mode:
return_ids_vector.push_back(return_ids[i])
check_status(
CCoreWorkerProcess.GetCoreWorker().Put(
CRayObject(returns[0][i].get().GetData(),
returns[0][i].get().GetMetadata(),
c_vector[CObjectReference]()),
c_vector[CObjectID](), return_ids[i]))
return_ids_vector.clear()
with nogil:
check_status(
CCoreWorkerProcess.GetCoreWorker().SealReturnObject(
return_id, returns[0][i]))
if not self.store_task_output(
serialized_object, return_id,
data_size, metadata, contained_id,
&task_output_inlined_bytes, &returns[0][i]):
# If the object already exists, but we fail to pin the copy, it
# means the existing copy might've gotten evicted. Try to
# create another copy.
self.store_task_output(
serialized_object, return_id, data_size, metadata,
contained_id, &task_output_inlined_bytes,
&returns[0][i])
cdef c_function_descriptors_to_python(
self,


@ -139,12 +139,16 @@ cdef extern from "ray/core_worker/core_worker.h" nogil:
const size_t &data_size,
const shared_ptr[CBuffer] &metadata,
const c_vector[CObjectID] &contained_object_id,
int64_t &task_output_inlined_bytes,
int64_t *task_output_inlined_bytes,
shared_ptr[CRayObject] *return_object)
CRayStatus SealReturnObject(
const CObjectID& return_id,
shared_ptr[CRayObject] return_object
)
c_bool PinExistingReturnObject(
const CObjectID& return_id,
shared_ptr[CRayObject] *return_object
)
CJobID GetCurrentJobId()
CTaskID GetCurrentTaskId()


@ -10,6 +10,7 @@ import ray
from ray._private.test_utils import (
wait_for_condition,
wait_for_pid_to_exit,
SignalActor,
)
SIGKILL = signal.SIGKILL if sys.platform != "win32" else signal.SIGTERM
@ -778,6 +779,132 @@ def test_lineage_evicted(ray_start_cluster):
assert "ObjectReconstructionFailedLineageEvictedError" in str(e)
@pytest.mark.parametrize("reconstruction_enabled", [False, True])
def test_multiple_returns(ray_start_cluster, reconstruction_enabled):
config = {
"num_heartbeats_timeout": 10,
"raylet_heartbeat_period_milliseconds": 100,
"object_timeout_milliseconds": 200,
}
# Workaround to reset the config to the default value.
if not reconstruction_enabled:
config["lineage_pinning_enabled"] = False
cluster = ray_start_cluster
# Head node with no resources.
cluster.add_node(
num_cpus=0,
_system_config=config,
enable_object_reconstruction=reconstruction_enabled)
ray.init(address=cluster.address)
# Node to place the initial object.
node_to_kill = cluster.add_node(num_cpus=1, object_store_memory=10**8)
cluster.wait_for_nodes()
@ray.remote(num_returns=2)
def two_large_objects():
return (np.zeros(10**7, dtype=np.uint8), np.zeros(
10**7, dtype=np.uint8))
@ray.remote
def dependent_task(x):
return
obj1, obj2 = two_large_objects.remote()
ray.get(dependent_task.remote(obj1))
cluster.add_node(
num_cpus=1, resources={"node": 1}, object_store_memory=10**8)
ray.get(dependent_task.options(resources={"node": 1}).remote(obj1))
cluster.remove_node(node_to_kill, allow_graceful=False)
wait_for_condition(
lambda: not all(node["Alive"] for node in ray.nodes()), timeout=10)
if reconstruction_enabled:
ray.get(dependent_task.remote(obj1))
ray.get(dependent_task.remote(obj2))
else:
with pytest.raises(ray.exceptions.RayTaskError):
ray.get(dependent_task.remote(obj1))
ray.get(dependent_task.remote(obj2))
with pytest.raises(ray.exceptions.ObjectLostError):
ray.get(obj2)
@pytest.mark.parametrize("reconstruction_enabled", [False, True])
def test_nested(ray_start_cluster, reconstruction_enabled):
config = {
"num_heartbeats_timeout": 10,
"raylet_heartbeat_period_milliseconds": 100,
"object_timeout_milliseconds": 200,
"fetch_fail_timeout_milliseconds": 10_000,
}
# Workaround to reset the config to the default value.
if not reconstruction_enabled:
config["lineage_pinning_enabled"] = False
cluster = ray_start_cluster
# Head node with no resources.
cluster.add_node(
num_cpus=0,
_system_config=config,
enable_object_reconstruction=reconstruction_enabled)
ray.init(address=cluster.address)
done_signal = SignalActor.remote()
exit_signal = SignalActor.remote()
ray.get(done_signal.wait.remote(should_wait=False))
ray.get(exit_signal.wait.remote(should_wait=False))
# Node to place the initial object.
node_to_kill = cluster.add_node(num_cpus=1, object_store_memory=10**8)
cluster.wait_for_nodes()
@ray.remote
def dependent_task(x):
return
@ray.remote
def large_object():
return np.zeros(10**7, dtype=np.uint8)
@ray.remote
def nested(done_signal, exit_signal):
ref = ray.put(np.zeros(10**7, dtype=np.uint8))
# Flush object store.
for _ in range(20):
ray.put(np.zeros(10**7, dtype=np.uint8))
dep = dependent_task.options(resources={"node": 1}).remote(ref)
ray.get(done_signal.send.remote(clear=True))
ray.get(dep)
return ray.get(ref)
ref = nested.remote(done_signal, exit_signal)
# Wait for task to get scheduled on the node to kill.
ray.get(done_signal.wait.remote())
# Wait for ray.put object to get transferred to the other node.
cluster.add_node(
num_cpus=2, resources={"node": 10}, object_store_memory=10**8)
ray.get(dependent_task.remote(ref))
# Destroy the task's output.
cluster.remove_node(node_to_kill, allow_graceful=False)
wait_for_condition(
lambda: not all(node["Alive"] for node in ray.nodes()), timeout=10)
if reconstruction_enabled:
# NOTE(swang): This is supposed to work because nested doesn't actually
# return any ObjectRefs. However, currently the ray.put in `nested`
# fails because the object already exists with a different owner.
# See https://github.com/ray-project/ray/issues/20713.
try:
ray.get(ref, timeout=60)
except ray.exceptions.RayTaskError as e:
assert isinstance(e.cause, ray.exceptions.ObjectFetchTimedOutError)
else:
with pytest.raises(ray.exceptions.ObjectLostError):
ray.get(ref, timeout=60)
if __name__ == "__main__":
import pytest
sys.exit(pytest.main(["-v", __file__]))


@ -927,13 +927,18 @@ Status CoreWorker::CreateOwned(const std::shared_ptr<Buffer> &metadata,
/* owner_address = */ rpc_address_, data,
created_by_worker);
}
if (!status.ok() || !data) {
if (!status.ok()) {
if (owned_by_us) {
reference_counter_->RemoveOwnedObject(*object_id);
} else {
RemoveLocalReference(*object_id);
}
return status;
} else if (*data == nullptr) {
// Object already exists in plasma. Store the in-memory value so that the
// client will check the plasma store.
RAY_CHECK(
memory_store_->Put(RayObject(rpc::ErrorType::OBJECT_IN_PLASMA), *object_id));
}
}
return Status::OK();
@ -2005,7 +2010,7 @@ Status CoreWorker::AllocateReturnObject(const ObjectID &object_id,
const size_t &data_size,
const std::shared_ptr<Buffer> &metadata,
const std::vector<ObjectID> &contained_object_ids,
int64_t &task_output_inlined_bytes,
int64_t *task_output_inlined_bytes,
std::shared_ptr<RayObject> *return_object) {
rpc::Address owner_address(options_.is_local_mode
? rpc::Address()
@ -2026,10 +2031,10 @@ Status CoreWorker::AllocateReturnObject(const ObjectID &object_id,
if (options_.is_local_mode ||
(static_cast<int64_t>(data_size) < max_direct_call_object_size_ &&
// ensure we don't exceed the limit if we allocate this object inline.
(task_output_inlined_bytes + static_cast<int64_t>(data_size) <=
(*task_output_inlined_bytes + static_cast<int64_t>(data_size) <=
RayConfig::instance().task_rpc_inlined_bytes_limit()))) {
data_buffer = std::make_shared<LocalMemoryBuffer>(data_size);
task_output_inlined_bytes += static_cast<int64_t>(data_size);
*task_output_inlined_bytes += static_cast<int64_t>(data_size);
} else {
RAY_RETURN_NOT_OK(CreateExisting(metadata, data_size, object_id, owner_address,
&data_buffer,
@ -2193,14 +2198,12 @@ Status CoreWorker::ExecuteTask(const TaskSpecification &task_spec,
Status CoreWorker::SealReturnObject(const ObjectID &return_id,
std::shared_ptr<RayObject> return_object) {
RAY_LOG(DEBUG) << "Sealing return object " << return_id;
Status status = Status::OK();
if (!return_object) {
return status;
}
RAY_CHECK(return_object);
RAY_CHECK(!options_.is_local_mode);
std::unique_ptr<rpc::Address> caller_address =
options_.is_local_mode ? nullptr
: std::make_unique<rpc::Address>(
worker_context_.GetCurrentTask()->CallerAddress());
std::make_unique<rpc::Address>(worker_context_.GetCurrentTask()->CallerAddress());
if (return_object->GetData() != nullptr && return_object->GetData()->IsPlasmaBuffer()) {
status = SealExisting(return_id, /*pin_object=*/true, std::move(caller_address));
if (!status.ok()) {
@ -2211,6 +2214,56 @@ Status CoreWorker::SealReturnObject(const ObjectID &return_id,
return status;
}
bool CoreWorker::PinExistingReturnObject(const ObjectID &return_id,
std::shared_ptr<RayObject> *return_object) {
// TODO(swang): If there is already an existing copy of this object, then it
// might not have the same value as the new copy. It would be better to evict
// the existing copy here.
absl::flat_hash_map<ObjectID, std::shared_ptr<RayObject>> result_map;
bool got_exception;
rpc::Address owner_address(worker_context_.GetCurrentTask()->CallerAddress());
// Temporarily set the return object's owner's address. This is needed to retrieve the
// value from plasma.
reference_counter_->AddLocalReference(return_id, "<temporary (pin return object)>");
reference_counter_->AddBorrowedObject(return_id, ObjectID::Nil(), owner_address);
auto status = plasma_store_provider_->Get({return_id}, 0, worker_context_, &result_map,
&got_exception);
// Remove the temporary ref.
RemoveLocalReference(return_id);
if (result_map.count(return_id)) {
*return_object = std::move(result_map[return_id]);
RAY_LOG(DEBUG) << "Pinning existing return object " << return_id
<< " owned by worker "
<< WorkerID::FromBinary(owner_address.worker_id());
// Keep the object in scope until it's been pinned.
std::shared_ptr<RayObject> pinned_return_object = *return_object;
// Asynchronously ask the raylet to pin the object. Note that this can fail
// if the raylet fails. We expect the owner of the object to handle that
// case (e.g., by detecting the raylet failure and storing an error).
local_raylet_client_->PinObjectIDs(
owner_address, {return_id},
[return_id, pinned_return_object](const Status &status,
const rpc::PinObjectIDsReply &reply) {
if (!status.ok()) {
RAY_LOG(INFO) << "Failed to pin existing copy of the task return object "
<< return_id
<< ". This object may get evicted while there are still "
"references to it.";
}
});
return true;
} else {
// Failed to get the existing copy of the return object. It must have been
// evicted before we could pin it.
// TODO(swang): We should allow the owner to retry this task instead of
// immediately returning an error to the application.
return false;
}
}
std::vector<rpc::ObjectReference> CoreWorker::ExecuteTaskLocalMode(
const TaskSpecification &task_spec, const ActorID &actor_id) {
auto resource_ids = std::make_shared<ResourceMappingType>();


@ -567,7 +567,7 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler {
Status AllocateReturnObject(const ObjectID &object_id, const size_t &data_size,
const std::shared_ptr<Buffer> &metadata,
const std::vector<ObjectID> &contained_object_id,
int64_t &task_output_inlined_bytes,
int64_t *task_output_inlined_bytes,
std::shared_ptr<RayObject> *return_object);
/// Seal a return object for an executing task. The caller should already have
@ -579,6 +579,15 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler {
Status SealReturnObject(const ObjectID &return_id,
std::shared_ptr<RayObject> return_object);
/// Pin the local copy of the return object, if one exists.
///
/// \param[in] return_id ObjectID of the return value.
/// \param[out] return_object The object that was pinned.
/// \return success if the object still existed and was pinned. Note that
/// pinning is done asynchronously.
bool PinExistingReturnObject(const ObjectID &return_id,
std::shared_ptr<RayObject> *return_object);
/// Get a handle to an actor.
///
/// NOTE: This function should be called ONLY WHEN we know actor handle exists.


@ -185,7 +185,7 @@ JNIEXPORT void JNICALL Java_io_ray_runtime_RayNativeRuntime_nativeInitialize(
RAY_CHECK_OK(CoreWorkerProcess::GetCoreWorker().AllocateReturnObject(
result_id, data_size, metadata, contained_object_ids,
task_output_inlined_bytes, result_ptr));
&task_output_inlined_bytes, result_ptr));
// A nullptr is returned if the object already exists.
auto result = *result_ptr;


@ -102,7 +102,13 @@ void CoreWorkerDirectTaskReceiver::HandleTask(
ObjectID id = ObjectID::FromIndex(task_spec.TaskId(), /*index=*/i + 1);
return_object->set_object_id(id.Binary());
// The object is nullptr if it already existed in the object store.
if (!return_objects[i]) {
// This should only happen if the local raylet died. Caller should
// retry the task.
RAY_LOG(WARNING) << "Failed to create task return object " << id
<< " in the object store, exiting.";
QuickExit();
}
const auto &result = return_objects[i];
return_object->set_size(result->GetSize());
if (result->GetData() != nullptr && result->GetData()->IsPlasmaBuffer()) {


@ -685,7 +685,6 @@ void CoreWorkerDirectTaskSubmitter::PushNormalTask(
// need to do anything here.
return;
} else if (!status.ok() || !is_actor_creation) {
RAY_LOG(DEBUG) << "Task failed with error: " << status;
// Successful actor creation leases the worker indefinitely from the raylet.
OnWorkerIdle(addr, scheduling_key,
/*error=*/!status.ok(), assigned_resources);


@ -22,9 +22,10 @@ namespace ray {
namespace raylet {
void LocalObjectManager::PinObjects(const std::vector<ObjectID> &object_ids,
std::vector<std::unique_ptr<RayObject>> &&objects,
const rpc::Address &owner_address) {
void LocalObjectManager::PinObjectsAndWaitForFree(
const std::vector<ObjectID> &object_ids,
std::vector<std::unique_ptr<RayObject>> &&objects,
const rpc::Address &owner_address) {
for (size_t i = 0; i < object_ids.size(); i++) {
const auto &object_id = object_ids[i];
auto &object = objects[i];
@ -33,15 +34,28 @@ void LocalObjectManager::PinObjects(const std::vector<ObjectID> &object_ids,
<< " was evicted before the raylet could pin it.";
continue;
}
RAY_LOG(DEBUG) << "Pinning object " << object_id;
pinned_objects_size_ += object->GetSize();
pinned_objects_.emplace(object_id, std::make_pair(std::move(object), owner_address));
}
}
void LocalObjectManager::WaitForObjectFree(const rpc::Address &owner_address,
const std::vector<ObjectID> &object_ids) {
for (const auto &object_id : object_ids) {
const auto inserted = objects_waiting_for_free_.emplace(object_id, owner_address);
if (inserted.second) {
// This is the first time we're pinning this object.
RAY_LOG(DEBUG) << "Pinning object " << object_id;
pinned_objects_size_ += object->GetSize();
pinned_objects_.emplace(object_id, std::move(object));
} else {
if (inserted.first->second.worker_id() != owner_address.worker_id()) {
// TODO(swang): Handle this case. We should use the new owner address
// and object copy.
auto original_worker_id =
WorkerID::FromBinary(inserted.first->second.worker_id());
auto new_worker_id = WorkerID::FromBinary(owner_address.worker_id());
RAY_LOG(WARNING)
<< "Received PinObjects request from a different owner " << new_worker_id
<< " from the original " << original_worker_id << ". Object " << object_id
<< " may get freed while the new owner still has the object in scope.";
}
continue;
}
// Create an object eviction subscription message.
auto wait_request = std::make_unique<rpc::WorkerObjectEvictionSubMessage>();
wait_request->set_object_id(object_id.Binary());
@ -64,8 +78,8 @@ void LocalObjectManager::WaitForObjectFree(const rpc::Address &owner_address,
};
// Callback that is invoked when the owner of the object id is dead.
auto owner_dead_callback = [this](const std::string &object_id_binary,
const Status &) {
auto owner_dead_callback = [this, owner_address](const std::string &object_id_binary,
const Status &) {
const auto object_id = ObjectID::FromBinary(object_id_binary);
ReleaseFreedObject(object_id);
};
@ -82,13 +96,16 @@ void LocalObjectManager::WaitForObjectFree(const rpc::Address &owner_address,
void LocalObjectManager::ReleaseFreedObject(const ObjectID &object_id) {
RAY_LOG(DEBUG) << "Unpinning object " << object_id;
if (!objects_waiting_for_free_.erase(object_id)) {
return;
}
// The object should be in one of these states: pinned, spilling, or spilled.
RAY_CHECK((pinned_objects_.count(object_id) > 0) ||
(spilled_objects_url_.count(object_id) > 0) ||
(objects_pending_spill_.count(object_id) > 0));
spilled_object_pending_delete_.push(object_id);
if (pinned_objects_.count(object_id)) {
pinned_objects_size_ -= pinned_objects_[object_id].first->GetSize();
pinned_objects_size_ -= pinned_objects_[object_id]->GetSize();
pinned_objects_.erase(object_id);
}
@ -150,7 +167,7 @@ bool LocalObjectManager::SpillObjectsOfSize(int64_t num_bytes_to_spill) {
while (bytes_to_spill <= num_bytes_to_spill && it != pinned_objects_.end() &&
counts < max_fused_object_count_) {
if (is_plasma_object_spillable_(it->first)) {
bytes_to_spill += it->second.first->GetSize();
bytes_to_spill += it->second->GetSize();
objects_to_spill.push_back(it->first);
}
it++;
@ -220,7 +237,7 @@ void LocalObjectManager::SpillObjectsInternal(
objects_to_spill.push_back(id);
// Move a pinned object to the pending spill object.
auto object_size = it->second.first->GetSize();
auto object_size = it->second->GetSize();
num_bytes_pending_spill_ += object_size;
objects_pending_spill_[id] = std::move(it->second);
@ -238,16 +255,23 @@ void LocalObjectManager::SpillObjectsInternal(
io_worker_pool_.PopSpillWorker(
[this, objects_to_spill, callback](std::shared_ptr<WorkerInterface> io_worker) {
rpc::SpillObjectsRequest request;
std::vector<ObjectID> requested_objects_to_spill;
for (const auto &object_id : objects_to_spill) {
RAY_LOG(DEBUG) << "Sending spill request for object " << object_id;
auto ref = request.add_object_refs_to_spill();
ref->set_object_id(object_id.Binary());
auto it = objects_pending_spill_.find(object_id);
RAY_CHECK(it != objects_pending_spill_.end());
ref->mutable_owner_address()->CopyFrom(it->second.second);
RAY_CHECK(objects_pending_spill_.count(object_id));
auto owner_it = objects_waiting_for_free_.find(object_id);
// If the object hasn't already been freed, spill it.
if (owner_it == objects_waiting_for_free_.end()) {
objects_pending_spill_.erase(object_id);
} else {
auto ref = request.add_object_refs_to_spill();
ref->set_object_id(object_id.Binary());
ref->mutable_owner_address()->CopyFrom(owner_it->second);
RAY_LOG(DEBUG) << "Sending spill request for object " << object_id;
requested_objects_to_spill.push_back(object_id);
}
}
io_worker->rpc_client()->SpillObjects(
request, [this, objects_to_spill, callback, io_worker](
request, [this, requested_objects_to_spill, callback, io_worker](
const ray::Status &status, const rpc::SpillObjectsReply &r) {
{
absl::MutexLock lock(&mutex_);
@ -258,13 +282,14 @@ void LocalObjectManager::SpillObjectsInternal(
// Object spilling is always done in the order of the request.
// For example, if an object succeeded, it'll guarantee that all objects
// before this will succeed.
RAY_CHECK(num_objects_spilled <= objects_to_spill.size());
for (size_t i = num_objects_spilled; i != objects_to_spill.size(); ++i) {
const auto &object_id = objects_to_spill[i];
RAY_CHECK(num_objects_spilled <= requested_objects_to_spill.size());
for (size_t i = num_objects_spilled; i != requested_objects_to_spill.size();
++i) {
const auto &object_id = requested_objects_to_spill[i];
auto it = objects_pending_spill_.find(object_id);
RAY_CHECK(it != objects_pending_spill_.end());
pinned_objects_size_ += it->second.first->GetSize();
num_bytes_pending_spill_ -= it->second.first->GetSize();
pinned_objects_size_ += it->second->GetSize();
num_bytes_pending_spill_ -= it->second->GetSize();
pinned_objects_.emplace(object_id, std::move(it->second));
objects_pending_spill_.erase(it);
}
@ -273,7 +298,7 @@ void LocalObjectManager::SpillObjectsInternal(
RAY_LOG(ERROR) << "Failed to send object spilling request: "
<< status.ToString();
} else {
OnObjectSpilled(objects_to_spill, r);
OnObjectSpilled(requested_objects_to_spill, r);
}
if (callback) {
callback(status);
@ -313,8 +338,7 @@ void LocalObjectManager::OnObjectSpilled(const std::vector<ObjectID> &object_ids
RAY_LOG(DEBUG) << "Unpinning pending spill object " << object_id;
auto it = objects_pending_spill_.find(object_id);
RAY_CHECK(it != objects_pending_spill_.end());
const auto object_size = it->second.first->GetSize();
const auto worker_addr = it->second.second;
const auto object_size = it->second->GetSize();
num_bytes_pending_spill_ -= object_size;
objects_pending_spill_.erase(it);
@ -325,6 +349,14 @@ void LocalObjectManager::OnObjectSpilled(const std::vector<ObjectID> &object_ids
request.set_spilled_node_id(node_id_object_spilled.Binary());
request.set_size(object_size);
auto owner_it = objects_waiting_for_free_.find(object_id);
if (owner_it == objects_waiting_for_free_.end()) {
RAY_LOG(DEBUG) << "Spilled object already freed, skipping send of spilled URL to "
"object directory for object "
<< object_id;
continue;
}
const auto &worker_addr = owner_it->second;
auto owner_client = owner_client_pool_.GetOrConnect(worker_addr);
RAY_LOG(DEBUG) << "Sending spilled URL " << object_url << " for object " << object_id
<< " to owner " << WorkerID::FromBinary(worker_addr.worker_id());


@ -64,22 +64,17 @@ class LocalObjectManager {
/// Pin objects.
///
/// Also wait for the objects' owner to free the object. The objects will be
/// released when the owner at the given address fails or replies that the
/// object can be evicted.
///
/// \param object_ids The objects to be pinned.
/// \param objects Pointers to the objects to be pinned. The pointer should
/// be kept in scope until the object can be released.
/// \param owner_address The owner of the objects to be pinned.
void PinObjects(const std::vector<ObjectID> &object_ids,
std::vector<std::unique_ptr<RayObject>> &&objects,
const rpc::Address &owner_address);
/// Wait for the objects' owner to free the object. The objects will be
/// released when the owner at the given address fails or replies that the
/// object can be evicted.
///
/// \param owner_address The address of the owner of the objects.
/// \param object_ids The objects to be freed.
void WaitForObjectFree(const rpc::Address &owner_address,
const std::vector<ObjectID> &object_ids);
void PinObjectsAndWaitForFree(const std::vector<ObjectID> &object_ids,
std::vector<std::unique_ptr<RayObject>> &&objects,
const rpc::Address &owner_address);
/// Spill objects as much as possible as fast as possible up to the max throughput.
///
@ -201,9 +196,11 @@ class LocalObjectManager {
/// A callback to call when an object has been freed.
std::function<void(const std::vector<ObjectID> &)> on_objects_freed_;
/// Hashmap from objects that we are waiting to free to their owner address.
absl::flat_hash_map<ObjectID, rpc::Address> objects_waiting_for_free_;
// Objects that are pinned on this node.
absl::flat_hash_map<ObjectID, std::pair<std::unique_ptr<RayObject>, rpc::Address>>
pinned_objects_;
absl::flat_hash_map<ObjectID, std::unique_ptr<RayObject>> pinned_objects_;
// Total size of objects pinned on this node.
size_t pinned_objects_size_ = 0;
@ -211,8 +208,7 @@ class LocalObjectManager {
// Objects that were pinned on this node but that are being spilled.
// These objects will be released once spilling is complete and the URL is
// written to the object directory.
absl::flat_hash_map<ObjectID, std::pair<std::unique_ptr<RayObject>, rpc::Address>>
objects_pending_spill_;
absl::flat_hash_map<ObjectID, std::unique_ptr<RayObject>> objects_pending_spill_;
/// Objects that were spilled on this node but that are being restored.
/// The field is used to dedup the same restore request while restoration is in
@ -310,6 +306,8 @@ class LocalObjectManager {
/// The last time a restore log finished.
int64_t last_restore_log_ns_ = 0;
friend class LocalObjectManagerTest;
};
}; // namespace raylet


@ -2170,9 +2170,9 @@ void NodeManager::HandlePinObjectIDs(const rpc::PinObjectIDsRequest &request,
send_reply_callback(Status::Invalid("Failed to get objects."), nullptr, nullptr);
return;
}
local_object_manager_.PinObjects(object_ids, std::move(results), owner_address);
// Wait for the object to be freed by the owner, which keeps the ref count.
local_object_manager_.WaitForObjectFree(owner_address, object_ids);
local_object_manager_.PinObjectsAndWaitForFree(object_ids, std::move(results),
owner_address);
send_reply_callback(Status::OK(), nullptr, nullptr);
}


@ -43,24 +43,35 @@ class MockSubscriber : public pubsub::SubscriberInterface {
pubsub::SubscribeDoneCallback subscribe_done_callback,
pubsub::SubscriptionItemCallback subscription_callback,
pubsub::SubscriptionFailureCallback subscription_failure_callback) override {
callbacks.push_back(
auto worker_id = WorkerID::FromBinary(owner_address.worker_id());
callbacks[worker_id].push_back(
std::make_pair(ObjectID::FromBinary(key_id_binary), subscription_callback));
return true;
}
bool PublishObjectEviction() {
bool PublishObjectEviction(WorkerID worker_id = WorkerID::Nil()) {
if (callbacks.empty()) {
return false;
}
auto object_id = callbacks.front().first;
auto callback = callbacks.front().second;
auto cbs = callbacks.begin();
if (!worker_id.IsNil()) {
cbs = callbacks.find(worker_id);
}
if (cbs == callbacks.end() || cbs->second.empty()) {
return false;
}
auto object_id = cbs->second.front().first;
auto callback = cbs->second.front().second;
auto msg = rpc::PubMessage();
msg.set_key_id(object_id.Binary());
msg.set_channel_type(channel_type_);
auto *object_eviction_msg = msg.mutable_worker_object_eviction_message();
object_eviction_msg->set_object_id(object_id.Binary());
callback(msg);
callbacks.pop_front();
cbs->second.pop_front();
if (cbs->second.empty()) {
callbacks.erase(cbs);
}
return true;
}
@ -86,7 +97,9 @@ class MockSubscriber : public pubsub::SubscriberInterface {
MOCK_CONST_METHOD0(DebugString, std::string());
rpc::ChannelType channel_type_ = rpc::ChannelType::WORKER_OBJECT_EVICTION;
std::deque<std::pair<ObjectID, pubsub::SubscriptionItemCallback>> callbacks;
std::unordered_map<WorkerID,
std::deque<std::pair<ObjectID, pubsub::SubscriptionItemCallback>>>
callbacks;
};
class MockWorkerClient : public rpc::CoreWorkerClientInterface {
@ -289,6 +302,16 @@ class LocalObjectManagerTest : public ::testing::Test {
RayConfig::instance().initialize(R"({"object_spilling_config": "dummy"})");
}
void AssertNoLeaks() {
// TODO(swang): Assert this for all tests.
ASSERT_TRUE(manager.pinned_objects_size_ == 0);
ASSERT_TRUE(manager.pinned_objects_.empty());
ASSERT_TRUE(manager.spilled_objects_url_.empty());
ASSERT_TRUE(manager.objects_pending_spill_.empty());
ASSERT_TRUE(manager.url_ref_count_.empty());
ASSERT_TRUE(manager.objects_waiting_for_free_.empty());
}
void TearDown() { unevictable_objects_.clear(); }
std::string BuildURL(const std::string url, int offset = 0, int num_objects = 1) {
@ -331,8 +354,7 @@ TEST_F(LocalObjectManagerTest, TestPin) {
std::vector<rpc::ObjectReference>());
objects.push_back(std::move(object));
}
manager.PinObjects(object_ids, std::move(objects), owner_address);
manager.WaitForObjectFree(owner_address, object_ids);
manager.PinObjectsAndWaitForFree(object_ids, std::move(objects), owner_address);
for (size_t i = 0; i < free_objects_batch_size; i++) {
ASSERT_TRUE(freed.empty());
@ -358,7 +380,7 @@ TEST_F(LocalObjectManagerTest, TestRestoreSpilledObject) {
std::vector<rpc::ObjectReference>());
objects.push_back(std::move(object));
}
manager.PinObjects(object_ids, std::move(objects), owner_address);
manager.PinObjectsAndWaitForFree(object_ids, std::move(objects), owner_address);
manager.SpillObjects(object_ids,
[&](const Status &status) mutable { ASSERT_TRUE(status.ok()); });
@ -414,7 +436,7 @@ TEST_F(LocalObjectManagerTest, TestExplicitSpill) {
std::vector<rpc::ObjectReference>());
objects.push_back(std::move(object));
}
manager.PinObjects(object_ids, std::move(objects), owner_address);
manager.PinObjectsAndWaitForFree(object_ids, std::move(objects), owner_address);
int num_times_fired = 0;
manager.SpillObjects(object_ids, [&](const Status &status) mutable {
@ -459,8 +481,7 @@ TEST_F(LocalObjectManagerTest, TestDuplicateSpill) {
std::vector<rpc::ObjectReference>());
objects.push_back(std::move(object));
}
manager.PinObjects(object_ids, std::move(objects), owner_address);
manager.WaitForObjectFree(owner_address, object_ids);
manager.PinObjectsAndWaitForFree(object_ids, std::move(objects), owner_address);
int num_times_fired = 0;
manager.SpillObjects(object_ids, [&](const Status &status) mutable {
@ -513,7 +534,7 @@ TEST_F(LocalObjectManagerTest, TestSpillObjectsOfSize) {
std::vector<rpc::ObjectReference>());
objects.push_back(std::move(object));
}
manager.PinObjects(object_ids, std::move(objects), owner_address);
manager.PinObjectsAndWaitForFree(object_ids, std::move(objects), owner_address);
ASSERT_TRUE(manager.SpillObjectsOfSize(total_size / 2));
for (const auto &id : object_ids) {
ASSERT_EQ((*unpins)[id], 0);
@ -580,7 +601,7 @@ TEST_F(LocalObjectManagerTest, TestSpillUptoMaxFuseCount) {
std::vector<rpc::ObjectReference>());
objects.push_back(std::move(object));
}
manager.PinObjects(object_ids, std::move(objects), owner_address);
manager.PinObjectsAndWaitForFree(object_ids, std::move(objects), owner_address);
ASSERT_TRUE(manager.SpillObjectsOfSize(total_size));
for (const auto &id : object_ids) {
ASSERT_EQ((*unpins)[id], 0);
@ -626,7 +647,7 @@ TEST_F(LocalObjectManagerTest, TestSpillObjectNotEvictable) {
std::vector<rpc::ObjectReference>());
objects.push_back(std::move(object));
manager.PinObjects(object_ids, std::move(objects), owner_address);
manager.PinObjectsAndWaitForFree(object_ids, std::move(objects), owner_address);
ASSERT_FALSE(manager.SpillObjectsOfSize(1000));
for (const auto &id : object_ids) {
ASSERT_EQ((*unpins)[id], 0);
@ -655,7 +676,7 @@ TEST_F(LocalObjectManagerTest, TestSpillUptoMaxThroughput) {
std::vector<rpc::ObjectReference>());
objects.push_back(std::move(object));
}
manager.PinObjects(object_ids, std::move(objects), owner_address);
manager.PinObjectsAndWaitForFree(object_ids, std::move(objects), owner_address);
// This will spill until 2 workers are occupied.
manager.SpillObjectUptoMaxThroughput();
@ -722,7 +743,7 @@ TEST_F(LocalObjectManagerTest, TestSpillError) {
std::vector<std::unique_ptr<RayObject>> objects;
objects.push_back(std::move(object));
manager.PinObjects({object_id}, std::move(objects), owner_address);
manager.PinObjectsAndWaitForFree({object_id}, std::move(objects), owner_address);
int num_times_fired = 0;
manager.SpillObjects({object_id}, [&](const Status &status) mutable {
@ -767,7 +788,7 @@ TEST_F(LocalObjectManagerTest, TestPartialSpillError) {
std::vector<rpc::ObjectReference>());
objects.push_back(std::move(object));
}
manager.PinObjects(object_ids, std::move(objects), owner_address);
manager.PinObjectsAndWaitForFree(object_ids, std::move(objects), owner_address);
manager.SpillObjects(object_ids,
[&](const Status &status) mutable { ASSERT_TRUE(status.ok()); });
@ -803,8 +824,7 @@ TEST_F(LocalObjectManagerTest, TestDeleteNoSpilledObjects) {
std::vector<rpc::ObjectReference>());
objects.push_back(std::move(object));
}
manager.PinObjects(object_ids, std::move(objects), owner_address);
manager.WaitForObjectFree(owner_address, object_ids);
manager.PinObjectsAndWaitForFree(object_ids, std::move(objects), owner_address);
for (size_t i = 0; i < free_objects_batch_size; i++) {
ASSERT_TRUE(freed.empty());
@ -832,8 +852,7 @@ TEST_F(LocalObjectManagerTest, TestDeleteSpilledObjects) {
std::vector<rpc::ObjectReference>());
objects.push_back(std::move(object));
}
manager.PinObjects(object_ids, std::move(objects), owner_address);
manager.WaitForObjectFree(owner_address, object_ids);
manager.PinObjectsAndWaitForFree(object_ids, std::move(objects), owner_address);
// 2 Objects are spilled out of 3.
std::vector<ObjectID> object_ids_to_spill;
@ -881,8 +900,7 @@ TEST_F(LocalObjectManagerTest, TestDeleteURLRefCount) {
std::vector<rpc::ObjectReference>());
objects.push_back(std::move(object));
}
manager.PinObjects(object_ids, std::move(objects), owner_address);
manager.WaitForObjectFree(owner_address, object_ids);
manager.PinObjectsAndWaitForFree(object_ids, std::move(objects), owner_address);
// Every object is spilled.
std::vector<ObjectID> object_ids_to_spill;
@ -942,8 +960,7 @@ TEST_F(LocalObjectManagerTest, TestDeleteSpillingObjectsBlocking) {
std::vector<rpc::ObjectReference>());
objects.push_back(std::move(object));
}
manager.PinObjects(object_ids, std::move(objects), owner_address);
manager.WaitForObjectFree(owner_address, object_ids);
manager.PinObjectsAndWaitForFree(object_ids, std::move(objects), owner_address);
// Objects are spilled.
std::vector<ObjectID> spill_set_1;
@ -991,7 +1008,9 @@ TEST_F(LocalObjectManagerTest, TestDeleteSpillingObjectsBlocking) {
// Now spilling is completely done.
ASSERT_TRUE(worker_pool.io_worker_client->ReplySpillObjects(urls_spill_set_2));
for (size_t i = 0; i < spill_set_2_size; i++) {
ASSERT_TRUE(owner_client->ReplyAddSpilledUrl());
// These fail because the object is already freed, so the raylet does not
// send the RPC.
ASSERT_FALSE(owner_client->ReplyAddSpilledUrl());
}
// Every object is now deleted.
@ -1016,8 +1035,7 @@ TEST_F(LocalObjectManagerTest, TestDeleteMaxObjects) {
std::vector<rpc::ObjectReference>());
objects.push_back(std::move(object));
}
manager.PinObjects(object_ids, std::move(objects), owner_address);
manager.WaitForObjectFree(owner_address, object_ids);
manager.PinObjectsAndWaitForFree(object_ids, std::move(objects), owner_address);
std::vector<ObjectID> object_ids_to_spill;
int spilled_urls_size = free_objects_batch_size;
@ -1068,8 +1086,7 @@ TEST_F(LocalObjectManagerTest, TestDeleteURLRefCountRaceCondition) {
std::vector<rpc::ObjectReference>());
objects.push_back(std::move(object));
}
manager.PinObjects(object_ids, std::move(objects), owner_address);
manager.WaitForObjectFree(owner_address, object_ids);
manager.PinObjectsAndWaitForFree(object_ids, std::move(objects), owner_address);
// Every object is spilled.
std::vector<ObjectID> object_ids_to_spill;
@ -1110,6 +1127,70 @@ TEST_F(LocalObjectManagerTest, TestDeleteURLRefCountRaceCondition) {
ASSERT_EQ(deleted_urls_size, 1);
}
TEST_F(LocalObjectManagerTest, TestDuplicatePin) {
rpc::Address owner_address;
owner_address.set_worker_id(WorkerID::FromRandom().Binary());
std::vector<ObjectID> object_ids;
for (size_t i = 0; i < free_objects_batch_size; i++) {
ObjectID object_id = ObjectID::FromRandom();
object_ids.push_back(object_id);
}
std::vector<std::unique_ptr<RayObject>> objects;
for (size_t i = 0; i < free_objects_batch_size; i++) {
std::string meta = std::to_string(static_cast<int>(rpc::ErrorType::OBJECT_IN_PLASMA));
auto metadata = const_cast<uint8_t *>(reinterpret_cast<const uint8_t *>(meta.data()));
auto meta_buffer = std::make_shared<LocalMemoryBuffer>(metadata, meta.size());
auto object = std::make_unique<RayObject>(nullptr, meta_buffer,
std::vector<rpc::ObjectReference>());
objects.push_back(std::move(object));
}
manager.PinObjectsAndWaitForFree(object_ids, std::move(objects), owner_address);
// Receive a duplicate pin with the same owner. Same objects should not get
// pinned again.
objects.clear();
for (size_t i = 0; i < free_objects_batch_size; i++) {
std::string meta = std::to_string(static_cast<int>(rpc::ErrorType::OBJECT_IN_PLASMA));
auto metadata = const_cast<uint8_t *>(reinterpret_cast<const uint8_t *>(meta.data()));
auto meta_buffer = std::make_shared<LocalMemoryBuffer>(metadata, meta.size());
auto object = std::make_unique<RayObject>(nullptr, meta_buffer,
std::vector<rpc::ObjectReference>());
objects.push_back(std::move(object));
}
manager.PinObjectsAndWaitForFree(object_ids, std::move(objects), owner_address);
// Receive a duplicate pin with a different owner.
objects.clear();
for (size_t i = 0; i < free_objects_batch_size; i++) {
std::string meta = std::to_string(static_cast<int>(rpc::ErrorType::OBJECT_IN_PLASMA));
auto metadata = const_cast<uint8_t *>(reinterpret_cast<const uint8_t *>(meta.data()));
auto meta_buffer = std::make_shared<LocalMemoryBuffer>(metadata, meta.size());
auto object = std::make_unique<RayObject>(nullptr, meta_buffer,
std::vector<rpc::ObjectReference>());
objects.push_back(std::move(object));
}
rpc::Address owner_address2;
owner_address2.set_worker_id(WorkerID::FromRandom().Binary());
manager.PinObjectsAndWaitForFree(object_ids, std::move(objects), owner_address2);
// No subscription to the second owner.
auto owner_id2 = WorkerID::FromBinary(owner_address2.worker_id());
ASSERT_FALSE(subscriber_->PublishObjectEviction(owner_id2));
// Free on messages from the original owner.
auto owner_id1 = WorkerID::FromBinary(owner_address.worker_id());
for (size_t i = 0; i < free_objects_batch_size; i++) {
ASSERT_TRUE(freed.empty());
EXPECT_CALL(*subscriber_, Unsubscribe(_, _, object_ids[i].Binary()));
ASSERT_TRUE(subscriber_->PublishObjectEviction(owner_id1));
}
std::unordered_set<ObjectID> expected(object_ids.begin(), object_ids.end());
ASSERT_EQ(freed, expected);
AssertNoLeaks();
}
} // namespace raylet
} // namespace ray