[Object spilling] Update object directory and reload spilled objects automatically (#11021)

* Fix pytest...

* Release objects that have been spilled

* GCS object table interface refactor

* Add spilled URL to object location info

* refactor to include spilled URL in notifications

* improve tests

* Add spilled URL to object directory results

* Remove force restore call

* Merge spilled URL and location

* fix

* CI

* build

* osx

* Fix multitenancy issues

* Skip windows tests
This commit is contained in:
Stephanie Wang 2020-10-02 15:52:42 -07:00 committed by GitHub
parent c17169dc11
commit ada58abcd9
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
43 changed files with 499 additions and 430 deletions

View file

@ -1497,13 +1497,6 @@ cdef class CoreWorker:
check_status(CCoreWorkerProcess.GetCoreWorker()
.SpillObjects(object_ids))
def force_restore_spilled_objects(self, object_refs):
cdef c_vector[CObjectID] object_ids
object_ids = ObjectRefsToVector(object_refs)
with nogil:
check_status(CCoreWorkerProcess.GetCoreWorker()
.ForceRestoreSpilledObjects(object_ids))
cdef void async_set_result(shared_ptr[CRayObject] obj,
CObjectID object_ref,
void *future) with gil:

View file

@ -1,7 +1,6 @@
from .dynamic_resources import set_resource
from .object_spilling import force_spill_objects, force_restore_spilled_objects
from .object_spilling import force_spill_objects
__all__ = [
"set_resource",
"force_spill_objects",
"force_restore_spilled_objects",
]

View file

@ -16,20 +16,3 @@ def force_spill_objects(object_refs):
f"Attempting to call `force_spill_objects` on the "
f"value {object_ref}, which is not an ray.ObjectRef.")
return core_worker.force_spill_objects(object_refs)
def force_restore_spilled_objects(object_refs):
"""Force restoring objects from external storage.
Args:
object_refs: Object refs of the objects to be
restored.
"""
core_worker = ray.worker.global_worker.core_worker
# Make sure that the values are object refs.
for object_ref in object_refs:
if not isinstance(object_ref, ray.ObjectRef):
raise TypeError(
f"Attempting to call `force_restore_spilled_objects` on the "
f"value {object_ref}, which is not an ray.ObjectRef.")
return core_worker.force_restore_spilled_objects(object_refs)

View file

@ -200,8 +200,6 @@ cdef extern from "ray/core_worker/core_worker.h" nogil:
const double capacity,
const CNodeID &client_Id)
CRayStatus SpillObjects(const c_vector[CObjectID] &object_ids)
CRayStatus ForceRestoreSpilledObjects(
const c_vector[CObjectID] &object_ids)
cdef cppclass CCoreWorkerOptions "ray::CoreWorkerOptions":
CWorkerType worker_type

View file

@ -1,5 +1,7 @@
import json
import random
import platform
import sys
import time
import numpy as np
@ -8,6 +10,8 @@ import psutil
import ray
@pytest.mark.skipif(
platform.system() == "Windows", reason="Failing on Windows.")
def test_spill_objects_manually(shutdown_only):
# Limit our object store to 75 MiB of memory.
ray.init(
@ -25,7 +29,6 @@ def test_spill_objects_manually(shutdown_only):
arr = np.random.rand(1024 * 1024) # 8 MB data
replay_buffer = []
pinned_objects = set()
spilled_objects = set()
# Create objects of more than 200 MiB.
for _ in range(25):
@ -38,7 +41,6 @@ def test_spill_objects_manually(shutdown_only):
except ray.exceptions.ObjectStoreFullError:
ref_to_spill = pinned_objects.pop()
ray.experimental.force_spill_objects([ref_to_spill])
spilled_objects.add(ref_to_spill)
def is_worker(cmdline):
return cmdline and cmdline[0].startswith("ray::")
@ -54,17 +56,16 @@ def test_spill_objects_manually(shutdown_only):
# restoring objects back.
refs_to_spill = (pinned_objects.pop(), pinned_objects.pop())
ray.experimental.force_spill_objects(refs_to_spill)
spilled_objects.update(refs_to_spill)
# randomly sample objects
for _ in range(100):
ref = random.choice(replay_buffer)
if ref in spilled_objects:
ray.experimental.force_restore_spilled_objects([ref])
sample = ray.get(ref)
assert np.array_equal(sample, arr)
@pytest.mark.skipif(
platform.system() == "Windows", reason="Failing on Windows.")
def test_spill_objects_manually_from_workers(shutdown_only):
# Limit our object store to 100 MiB of memory.
ray.init(
@ -82,15 +83,22 @@ def test_spill_objects_manually_from_workers(shutdown_only):
@ray.remote
def _worker():
arr = np.random.rand(100 * 1024)
arr = np.random.rand(1024 * 1024) # 8 MB data
ref = ray.put(arr)
ray.experimental.force_spill_objects([ref])
ray.experimental.force_restore_spilled_objects([ref])
assert np.array_equal(ray.get(ref), arr)
return ref
ray.get([_worker.remote() for _ in range(50)])
# Create objects of more than 200 MiB.
replay_buffer = [ray.get(_worker.remote()) for _ in range(25)]
values = {ref: np.copy(ray.get(ref)) for ref in replay_buffer}
# Randomly sample objects.
for _ in range(100):
ref = random.choice(replay_buffer)
sample = ray.get(ref)
assert np.array_equal(sample, values[ref])
@pytest.mark.skip(reason="Not implemented yet.")
def test_spill_objects_manually_with_workers(shutdown_only):
# Limit our object store to 75 MiB of memory.
ray.init(
@ -118,27 +126,29 @@ def test_spill_objects_manually_with_workers(shutdown_only):
assert np.array_equal(restored, arr)
@pytest.mark.skipif(
platform.system() == "Windows", reason="Failing on Windows.")
@pytest.mark.parametrize(
"ray_start_cluster_head", [{
"num_cpus": 0,
"object_store_memory": 75 * 1024 * 1024,
"_object_spilling_config": {
"object_spilling_config": {
"type": "filesystem",
"params": {
"directory_path": "/tmp"
}
},
"_system_config": json.dumps({
"_system_config": {
"object_store_full_max_retries": 0,
"max_io_workers": 4,
}),
},
}],
indirect=True)
def test_spill_remote_object(ray_start_cluster_head):
cluster = ray_start_cluster_head
cluster.add_node(
object_store_memory=75 * 1024 * 1024,
_object_spilling_config={
object_spilling_config={
"type": "filesystem",
"params": {
"directory_path": "/tmp"
@ -149,23 +159,33 @@ def test_spill_remote_object(ray_start_cluster_head):
def put():
return np.random.rand(5 * 1024 * 1024) # 40 MB data
# Create 2 objects. Only 1 should fit.
@ray.remote
def depends(arg):
return
ref = put.remote()
ray.get(ref)
copy = np.copy(ray.get(ref))
# Evict local copy.
ray.put(np.random.rand(5 * 1024 * 1024)) # 40 MB data
# Remote copy should not fit.
with pytest.raises(ray.exceptions.RayTaskError):
ray.get(put.remote())
time.sleep(1)
# Spill 1 object. The second should now fit.
ray.experimental.force_spill_objects([ref])
ray.get(put.remote())
# TODO(swang): Restoring from the object directory is not yet supported.
# ray.experimental.force_restore_spilled_objects([ref])
# sample = ray.get(ref)
# assert np.array_equal(sample, copy)
sample = ray.get(ref)
assert np.array_equal(sample, copy)
# Evict the spilled object.
del sample
ray.get(put.remote())
ray.put(np.random.rand(5 * 1024 * 1024)) # 40 MB data
# Test passing the spilled object as an arg to another task.
ray.get(depends.remote(ref))
@pytest.mark.skip(reason="have not been fully implemented")
@pytest.mark.skip(reason="Not implemented yet.")
def test_spill_objects_automatically(shutdown_only):
# Limit our object store to 75 MiB of memory.
ray.init(
@ -196,3 +216,7 @@ def test_spill_objects_automatically(shutdown_only):
ref = random.choice(replay_buffer)
sample = ray.get(ref, timeout=0)
assert np.array_equal(sample, arr)
if __name__ == "__main__":
sys.exit(pytest.main(["-sv", __file__]))

View file

@ -496,13 +496,13 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_
auto object_lookup_fn = [this](const ObjectID &object_id,
const ObjectLookupCallback &callback) {
return gcs_client_->Objects().AsyncGetLocations(
object_id,
[this, object_id, callback](const Status &status,
const std::vector<rpc::ObjectTableData> &results) {
object_id, [this, object_id, callback](
const Status &status,
const boost::optional<rpc::ObjectLocationInfo> &result) {
RAY_CHECK_OK(status);
std::vector<rpc::Address> locations;
for (const auto &result : results) {
const auto &node_id = NodeID::FromBinary(result.manager());
for (const auto &loc : result->locations()) {
const auto &node_id = NodeID::FromBinary(loc.manager());
auto node = gcs_client_->Nodes().Get(node_id);
RAY_CHECK(node.has_value());
if (node->state() == rpc::GcsNodeInfo::ALIVE) {
@ -1170,7 +1170,13 @@ void CoreWorker::SpillOwnedObject(const ObjectID &object_id,
// Find the raylet that hosts the primary copy of the object.
NodeID pinned_at;
RAY_CHECK(reference_counter_->IsPlasmaObjectPinned(object_id, &pinned_at));
bool spilled;
RAY_CHECK(
reference_counter_->IsPlasmaObjectPinnedOrSpilled(object_id, &pinned_at, &spilled));
if (spilled) {
// The object has already been spilled.
return;
}
auto node = gcs_client_->Nodes().Get(pinned_at);
if (pinned_at.IsNil() || !node) {
RAY_LOG(ERROR) << "Primary raylet for object " << object_id << " unreachable";
@ -1179,6 +1185,7 @@ void CoreWorker::SpillOwnedObject(const ObjectID &object_id,
}
// Ask the raylet to spill the object.
RAY_LOG(DEBUG) << "Sending spill request to raylet for object " << object_id;
auto raylet_client =
std::make_shared<raylet::RayletClient>(rpc::NodeManagerWorkerClient::make(
node->node_manager_address(), node->node_manager_port(),
@ -1237,11 +1244,11 @@ Status CoreWorker::SpillObjects(const std::vector<ObjectID> &object_ids) {
}
ready_promise->get_future().wait();
return final_status;
}
Status CoreWorker::ForceRestoreSpilledObjects(const std::vector<ObjectID> &object_ids) {
return local_raylet_client_->ForceRestoreSpilledObjects(object_ids);
for (const auto &object_id : object_ids) {
reference_counter_->HandleObjectSpilled(object_id);
}
return final_status;
}
std::unordered_map<std::string, double> AddPlacementGroupConstraint(

View file

@ -632,11 +632,6 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler {
/// to spill the object.
Status SpillObjects(const std::vector<ObjectID> &object_ids);
/// Restore objects from external storage.
/// \param[in] object_ids The objects to be restored.
/// \return Status
Status ForceRestoreSpilledObjects(const std::vector<ObjectID> &object_ids);
/// Submit a normal task.
///
/// \param[in] function The remote function to execute.

View file

@ -21,7 +21,9 @@ namespace ray {
Status ObjectRecoveryManager::RecoverObject(const ObjectID &object_id) {
// Check the ReferenceCounter to see if there is a location for the object.
NodeID pinned_at;
bool owned_by_us = reference_counter_->IsPlasmaObjectPinned(object_id, &pinned_at);
bool spilled;
bool owned_by_us =
reference_counter_->IsPlasmaObjectPinnedOrSpilled(object_id, &pinned_at, &spilled);
if (!owned_by_us) {
return Status::Invalid(
"Object reference no longer exists or is not owned by us. Either lineage pinning "
@ -29,7 +31,7 @@ Status ObjectRecoveryManager::RecoverObject(const ObjectID &object_id) {
}
bool already_pending_recovery = true;
if (pinned_at.IsNil()) {
if (pinned_at.IsNil() && !spilled) {
{
absl::MutexLock lock(&mu_);
// Mark that we are attempting recovery for this object to prevent

View file

@ -492,6 +492,9 @@ bool ReferenceCounter::SetDeleteCallback(
// The object has been freed by the language frontend, so it
// should be deleted immediately.
return false;
} else if (it->second.spilled) {
// The object has been spilled, so it can be released immediately.
return false;
}
// NOTE: In two cases, `GcsActorManager` will send `WaitForActorOutOfScope` request more
@ -539,12 +542,14 @@ void ReferenceCounter::UpdateObjectPinnedAtRaylet(const ObjectID &object_id,
}
}
bool ReferenceCounter::IsPlasmaObjectPinned(const ObjectID &object_id,
NodeID *pinned_at) const {
bool ReferenceCounter::IsPlasmaObjectPinnedOrSpilled(const ObjectID &object_id,
NodeID *pinned_at,
bool *spilled) const {
absl::MutexLock lock(&mutex_);
auto it = object_id_refs_.find(object_id);
if (it != object_id_refs_.end()) {
if (it->second.owned_by_us) {
*spilled = it->second.spilled;
*pinned_at = it->second.pinned_at_raylet_id.value_or(NodeID::Nil());
return true;
}
@ -920,6 +925,19 @@ std::unordered_set<NodeID> ReferenceCounter::GetObjectLocations(
return locations;
}
/// Record that `object_id` has been spilled to external storage.
///
/// If the object is still tracked, its reference is flagged as spilled and
/// ReleasePlasmaObject() is invoked on it. If the object is no longer tracked,
/// a warning is logged and nothing else happens.
///
/// \param[in] object_id The object that was spilled.
void ReferenceCounter::HandleObjectSpilled(const ObjectID &object_id) {
  absl::MutexLock lock(&mutex_);
  auto ref_it = object_id_refs_.find(object_id);
  if (ref_it != object_id_refs_.end()) {
    ref_it->second.spilled = true;
    // Release the primary plasma copy, if any.
    ReleasePlasmaObject(ref_it);
    return;
  }
  // The reference is gone, so there is nothing to mark or release.
  RAY_LOG(WARNING) << "Spilled object " << object_id << " already out of scope";
}
ReferenceCounter::Reference ReferenceCounter::Reference::FromProto(
const rpc::ObjectReferenceCount &ref_count) {
Reference ref;

View file

@ -322,16 +322,19 @@ class ReferenceCounter : public ReferenceCounterInterface {
void UpdateObjectPinnedAtRaylet(const ObjectID &object_id, const NodeID &raylet_id)
LOCKS_EXCLUDED(mutex_);
/// Check whether the object is pinned at a remote plasma store node.
/// Check whether the object is pinned at a remote plasma store node or
/// spilled to external storage. In either case, a copy of the object is
/// available to fetch.
///
/// \param[in] object_id The object to check.
/// \param[out] pinned_at The node ID of the raylet at which this object is
/// pinned. Set to nil if the object is not pinned.
/// \param[out] spilled Whether this object has been spilled.
/// \return True if the object exists and is owned by us, false otherwise. We
/// return false here because a borrower should not know the pinned location
/// for an object.
bool IsPlasmaObjectPinned(const ObjectID &object_id, NodeID *pinned_at) const
LOCKS_EXCLUDED(mutex_);
bool IsPlasmaObjectPinnedOrSpilled(const ObjectID &object_id, NodeID *pinned_at,
bool *spilled) const LOCKS_EXCLUDED(mutex_);
/// Get and reset the objects that were pinned on the given node. This
/// method should be called upon a node failure, to determine which plasma
@ -376,6 +379,12 @@ class ReferenceCounter : public ReferenceCounterInterface {
std::unordered_set<NodeID> GetObjectLocations(const ObjectID &object_id)
LOCKS_EXCLUDED(mutex_);
/// Handle an object that has been spilled to external storage.
///
/// This notifies the primary raylet that the object is safe to release and
/// records that the object has been spilled to suppress reconstruction.
void HandleObjectSpilled(const ObjectID &object_id);
private:
struct Reference {
/// Constructor for a reference whose origin is unknown.
@ -524,6 +533,8 @@ class ReferenceCounter : public ReferenceCounterInterface {
/// is inlined (not stored in plasma), then its lineage ref count is 0
/// because any dependent task will already have the value of the object.
size_t lineage_ref_count = 0;
/// Whether this object has been spilled to external storage.
bool spilled = false;
/// Callback that will be called when this ObjectID no longer has
/// references.

View file

@ -1987,21 +1987,22 @@ TEST_F(ReferenceCountLineageEnabledTest, TestPlasmaLocation) {
ObjectID borrowed_id = ObjectID::FromRandom();
rc->AddLocalReference(borrowed_id, "");
NodeID pinned_at;
ASSERT_FALSE(rc->IsPlasmaObjectPinned(borrowed_id, &pinned_at));
bool spilled;
ASSERT_FALSE(rc->IsPlasmaObjectPinnedOrSpilled(borrowed_id, &pinned_at, &spilled));
ObjectID id = ObjectID::FromRandom();
NodeID node_id = NodeID::FromRandom();
rc->AddOwnedObject(id, {}, rpc::Address(), "", 0, true);
rc->AddLocalReference(id, "");
ASSERT_TRUE(rc->SetDeleteCallback(id, callback));
ASSERT_TRUE(rc->IsPlasmaObjectPinned(id, &pinned_at));
ASSERT_TRUE(rc->IsPlasmaObjectPinnedOrSpilled(id, &pinned_at, &spilled));
ASSERT_TRUE(pinned_at.IsNil());
rc->UpdateObjectPinnedAtRaylet(id, node_id);
ASSERT_TRUE(rc->IsPlasmaObjectPinned(id, &pinned_at));
ASSERT_TRUE(rc->IsPlasmaObjectPinnedOrSpilled(id, &pinned_at, &spilled));
ASSERT_FALSE(pinned_at.IsNil());
rc->RemoveLocalReference(id, nullptr);
ASSERT_FALSE(rc->IsPlasmaObjectPinned(id, &pinned_at));
ASSERT_FALSE(rc->IsPlasmaObjectPinnedOrSpilled(id, &pinned_at, &spilled));
ASSERT_TRUE(deleted->count(id) > 0);
deleted->clear();
@ -2012,7 +2013,7 @@ TEST_F(ReferenceCountLineageEnabledTest, TestPlasmaLocation) {
auto objects = rc->ResetObjectsOnRemovedNode(node_id);
ASSERT_EQ(objects.size(), 1);
ASSERT_EQ(objects[0], id);
ASSERT_TRUE(rc->IsPlasmaObjectPinned(id, &pinned_at));
ASSERT_TRUE(rc->IsPlasmaObjectPinnedOrSpilled(id, &pinned_at, &spilled));
ASSERT_TRUE(pinned_at.IsNil());
ASSERT_TRUE(deleted->count(id) > 0);
deleted->clear();
@ -2035,7 +2036,8 @@ TEST_F(ReferenceCountTest, TestFree) {
ASSERT_EQ(deleted->count(id), 0);
rc->UpdateObjectPinnedAtRaylet(id, node_id);
NodeID pinned_at;
ASSERT_TRUE(rc->IsPlasmaObjectPinned(id, &pinned_at));
bool spilled;
ASSERT_TRUE(rc->IsPlasmaObjectPinnedOrSpilled(id, &pinned_at, &spilled));
ASSERT_TRUE(pinned_at.IsNil());
ASSERT_TRUE(rc->IsPlasmaObjectFreed(id));
rc->RemoveLocalReference(id, nullptr);
@ -2050,7 +2052,7 @@ TEST_F(ReferenceCountTest, TestFree) {
rc->FreePlasmaObjects({id});
ASSERT_TRUE(rc->IsPlasmaObjectFreed(id));
ASSERT_TRUE(deleted->count(id) > 0);
ASSERT_TRUE(rc->IsPlasmaObjectPinned(id, &pinned_at));
ASSERT_TRUE(rc->IsPlasmaObjectPinnedOrSpilled(id, &pinned_at, &spilled));
ASSERT_TRUE(pinned_at.IsNil());
rc->RemoveLocalReference(id, nullptr);
ASSERT_FALSE(rc->IsPlasmaObjectFreed(id));

View file

@ -352,7 +352,7 @@ class ObjectInfoAccessor {
/// \return Status
virtual Status AsyncGetLocations(
const ObjectID &object_id,
const MultiItemCallback<rpc::ObjectTableData> &callback) = 0;
const OptionalItemCallback<rpc::ObjectLocationInfo> &callback) = 0;
/// Get all object's locations from GCS asynchronously.
///
@ -370,6 +370,16 @@ class ObjectInfoAccessor {
virtual Status AsyncAddLocation(const ObjectID &object_id, const NodeID &node_id,
const StatusCallback &callback) = 0;
/// Add the spilled location of an object to GCS asynchronously.
///
/// \param object_id The ID of the object whose spilled location will be added to GCS.
/// \param spilled_url The URL where the object has been spilled.
/// \param callback Callback that will be called after the spilled URL has been added
/// to GCS.
/// \return Status
virtual Status AsyncAddSpilledUrl(const ObjectID &object_id,
const std::string &spilled_url,
const StatusCallback &callback) = 0;
/// Remove location of object from GCS asynchronously.
///
/// \param object_id The ID of object which location will be removed from GCS.
@ -388,7 +398,8 @@ class ObjectInfoAccessor {
/// \return Status
virtual Status AsyncSubscribeToLocations(
const ObjectID &object_id,
const SubscribeCallback<ObjectID, ObjectChangeNotification> &subscribe,
const SubscribeCallback<ObjectID, std::vector<rpc::ObjectLocationChange>>
&subscribe,
const StatusCallback &done) = 0;
/// Cancel subscription to any update of an object's location.

View file

@ -108,17 +108,12 @@ std::unique_ptr<std::string> GlobalStateAccessor::GetObjectInfo(
const ObjectID &object_id) {
std::unique_ptr<std::string> object_info;
std::promise<bool> promise;
auto on_done = [object_id, &object_info, &promise](
auto on_done = [&object_info, &promise](
const Status &status,
const std::vector<rpc::ObjectTableData> &result) {
const boost::optional<rpc::ObjectLocationInfo> &result) {
RAY_CHECK_OK(status);
if (!result.empty()) {
rpc::ObjectLocationInfo object_location_info;
object_location_info.set_object_id(object_id.Binary());
for (auto &data : result) {
object_location_info.add_locations()->CopyFrom(data);
}
object_info.reset(new std::string(object_location_info.SerializeAsString()));
if (result) {
object_info.reset(new std::string(result->SerializeAsString()));
}
promise.set_value(true);
};

View file

@ -1086,19 +1086,15 @@ ServiceBasedObjectInfoAccessor::ServiceBasedObjectInfoAccessor(
: client_impl_(client_impl) {}
Status ServiceBasedObjectInfoAccessor::AsyncGetLocations(
const ObjectID &object_id, const MultiItemCallback<rpc::ObjectTableData> &callback) {
const ObjectID &object_id,
const OptionalItemCallback<rpc::ObjectLocationInfo> &callback) {
RAY_LOG(DEBUG) << "Getting object locations, object id = " << object_id;
rpc::GetObjectLocationsRequest request;
request.set_object_id(object_id.Binary());
client_impl_->GetGcsRpcClient().GetObjectLocations(
request, [object_id, callback](const Status &status,
const rpc::GetObjectLocationsReply &reply) {
std::vector<ObjectTableData> result;
result.reserve((reply.object_table_data_list_size()));
for (int index = 0; index < reply.object_table_data_list_size(); ++index) {
result.emplace_back(reply.object_table_data_list(index));
}
callback(status, result);
callback(status, reply.location_info());
RAY_LOG(DEBUG) << "Finished getting object locations, status = " << status
<< ", object id = " << object_id;
});
@ -1151,6 +1147,31 @@ Status ServiceBasedObjectInfoAccessor::AsyncAddLocation(const ObjectID &object_i
return Status::OK();
}
/// Register the external-storage URL of a spilled object with GCS.
///
/// Builds an AddObjectLocation request that carries `spilled_url` and posts the
/// RPC to `sequencer_`, keyed by `object_id`.
///
/// \param object_id The object that was spilled.
/// \param spilled_url The URL the object was spilled to.
/// \param callback May be null; when set, it is invoked with the RPC status once
/// the reply arrives.
/// \return Always Status::OK(); the RPC outcome is reported through `callback`.
Status ServiceBasedObjectInfoAccessor::AsyncAddSpilledUrl(
    const ObjectID &object_id, const std::string &spilled_url,
    const StatusCallback &callback) {
  RAY_LOG(DEBUG) << "Adding object spilled location, object id = " << object_id
                 << ", spilled_url = " << spilled_url;

  rpc::AddObjectLocationRequest add_request;
  add_request.set_object_id(object_id.Binary());
  add_request.set_spilled_url(spilled_url);

  // The request and the user callback are captured by value: the operation is
  // handed to the sequencer and may run after this function has returned.
  auto send_add_request = [this, add_request,
                           callback](const SequencerDoneCallback &done_callback) {
    auto on_reply = [callback, done_callback](const Status &status,
                                              const rpc::AddObjectLocationReply &reply) {
      if (callback) {
        callback(status);
      }
      // Invoke the sequencer's completion callback after the user callback.
      done_callback();
    };
    client_impl_->GetGcsRpcClient().AddObjectLocation(add_request, on_reply);
  };

  sequencer_.Post(object_id, send_add_request);
  return Status::OK();
}
Status ServiceBasedObjectInfoAccessor::AsyncRemoveLocation(
const ObjectID &object_id, const NodeID &node_id, const StatusCallback &callback) {
RAY_LOG(DEBUG) << "Removing object location, object id = " << object_id
@ -1179,7 +1200,7 @@ Status ServiceBasedObjectInfoAccessor::AsyncRemoveLocation(
Status ServiceBasedObjectInfoAccessor::AsyncSubscribeToLocations(
const ObjectID &object_id,
const SubscribeCallback<ObjectID, ObjectChangeNotification> &subscribe,
const SubscribeCallback<ObjectID, std::vector<rpc::ObjectLocationChange>> &subscribe,
const StatusCallback &done) {
RAY_CHECK(subscribe != nullptr)
<< "Failed to subscribe object location, object id = " << object_id;
@ -1188,10 +1209,20 @@ Status ServiceBasedObjectInfoAccessor::AsyncSubscribeToLocations(
subscribe](const StatusCallback &fetch_done) {
auto callback = [object_id, subscribe, fetch_done](
const Status &status,
const std::vector<rpc::ObjectTableData> &result) {
const boost::optional<rpc::ObjectLocationInfo> &result) {
if (status.ok()) {
gcs::ObjectChangeNotification notification(rpc::GcsChangeMode::APPEND_OR_ADD,
result);
std::vector<rpc::ObjectLocationChange> notification;
for (const auto &loc : result->locations()) {
rpc::ObjectLocationChange update;
update.set_is_add(true);
update.set_node_id(loc.manager());
notification.push_back(update);
}
if (!result->spilled_url().empty()) {
rpc::ObjectLocationChange update;
update.set_spilled_url(result->spilled_url());
notification.push_back(update);
}
subscribe(object_id, notification);
}
if (fetch_done) {
@ -1207,13 +1238,7 @@ Status ServiceBasedObjectInfoAccessor::AsyncSubscribeToLocations(
const std::string &data) {
rpc::ObjectLocationChange object_location_change;
object_location_change.ParseFromString(data);
std::vector<rpc::ObjectTableData> object_data_vector;
object_data_vector.emplace_back(object_location_change.data());
auto change_mode = object_location_change.is_add()
? rpc::GcsChangeMode::APPEND_OR_ADD
: rpc::GcsChangeMode::REMOVE;
gcs::ObjectChangeNotification notification(change_mode, object_data_vector);
subscribe(object_id, notification);
subscribe(object_id, {object_location_change});
};
return client_impl_->GetGcsPubSub().Subscribe(OBJECT_CHANNEL, object_id.Hex(),
on_subscribe, subscribe_done);

View file

@ -326,19 +326,23 @@ class ServiceBasedObjectInfoAccessor : public ObjectInfoAccessor {
Status AsyncGetLocations(
const ObjectID &object_id,
const MultiItemCallback<rpc::ObjectTableData> &callback) override;
const OptionalItemCallback<rpc::ObjectLocationInfo> &callback) override;
Status AsyncGetAll(const MultiItemCallback<rpc::ObjectLocationInfo> &callback) override;
Status AsyncAddLocation(const ObjectID &object_id, const NodeID &node_id,
const StatusCallback &callback) override;
Status AsyncAddSpilledUrl(const ObjectID &object_id, const std::string &spilled_url,
const StatusCallback &callback) override;
Status AsyncRemoveLocation(const ObjectID &object_id, const NodeID &node_id,
const StatusCallback &callback) override;
Status AsyncSubscribeToLocations(
const ObjectID &object_id,
const SubscribeCallback<ObjectID, ObjectChangeNotification> &subscribe,
const SubscribeCallback<ObjectID, std::vector<rpc::ObjectLocationChange>>
&subscribe,
const StatusCallback &done) override;
Status AsyncUnsubscribeToLocations(const ObjectID &object_id) override;

View file

@ -440,7 +440,8 @@ class ServiceBasedGcsClientTest : public ::testing::Test {
bool SubscribeToLocations(
const ObjectID &object_id,
const gcs::SubscribeCallback<ObjectID, gcs::ObjectChangeNotification> &subscribe) {
const gcs::SubscribeCallback<ObjectID, std::vector<rpc::ObjectLocationChange>>
&subscribe) {
std::promise<bool> promise;
RAY_CHECK_OK(gcs_client_->Objects().AsyncSubscribeToLocations(
object_id, subscribe,
@ -479,9 +480,12 @@ class ServiceBasedGcsClientTest : public ::testing::Test {
std::promise<bool> promise;
std::vector<rpc::ObjectTableData> locations;
RAY_CHECK_OK(gcs_client_->Objects().AsyncGetLocations(
object_id, [&locations, &promise](
Status status, const std::vector<rpc::ObjectTableData> &result) {
locations = result;
object_id,
[&locations, &promise](Status status,
const boost::optional<rpc::ObjectLocationInfo> &result) {
for (const auto &loc : result->locations()) {
locations.push_back(loc);
}
promise.set_value(status.ok());
}));
EXPECT_TRUE(WaitReady(promise.get_future(), timeout_ms_));
@ -851,11 +855,11 @@ TEST_F(ServiceBasedGcsClientTest, TestObjectInfo) {
std::atomic<int> object_remove_count(0);
auto on_subscribe = [&object_add_count, &object_remove_count](
const ObjectID &object_id,
const gcs::ObjectChangeNotification &result) {
if (!result.GetData().empty()) {
if (result.IsAdded()) {
const std::vector<rpc::ObjectLocationChange> &result) {
for (const auto &res : result) {
if (res.is_add()) {
++object_add_count;
} else if (result.IsRemoved()) {
} else {
++object_remove_count;
}
}
@ -1011,16 +1015,18 @@ TEST_F(ServiceBasedGcsClientTest, TestObjectTableResubscribe) {
std::atomic<int> object1_change_count(0);
std::atomic<int> object2_change_count(0);
ASSERT_TRUE(SubscribeToLocations(
object1_id, [&object1_change_count](const ObjectID &object_id,
const gcs::ObjectChangeNotification &result) {
if (!result.GetData().empty()) {
object1_id,
[&object1_change_count](const ObjectID &object_id,
const std::vector<rpc::ObjectLocationChange> &result) {
if (!result.empty()) {
++object1_change_count;
}
}));
ASSERT_TRUE(SubscribeToLocations(
object2_id, [&object2_change_count](const ObjectID &object_id,
const gcs::ObjectChangeNotification &result) {
if (!result.GetData().empty()) {
object2_id,
[&object2_change_count](const ObjectID &object_id,
const std::vector<rpc::ObjectLocationChange> &result) {
if (!result.empty()) {
++object2_change_count;
}
}));
@ -1232,8 +1238,8 @@ TEST_F(ServiceBasedGcsClientTest, TestMultiThreadSubAndUnsub) {
for (int index = 0; index < sub_and_unsub_loop_count; ++index) {
auto object_id = ObjectID::FromRandom();
ASSERT_TRUE(SubscribeToLocations(
object_id,
[](const ObjectID &id, const gcs::ObjectChangeNotification &result) {}));
object_id, [](const ObjectID &id,
const std::vector<rpc::ObjectLocationChange> &result) {}));
gcs_client_->Objects().AsyncResubscribe(false);
UnsubscribeToLocations(object_id);
}

View file

@ -23,15 +23,15 @@ namespace gcs {
void GcsObjectManager::HandleGetObjectLocations(
const rpc::GetObjectLocationsRequest &request, rpc::GetObjectLocationsReply *reply,
rpc::SendReplyCallback send_reply_callback) {
reply->mutable_location_info()->set_object_id(request.object_id());
ObjectID object_id = ObjectID::FromBinary(request.object_id());
RAY_LOG(DEBUG) << "Getting object locations, job id = " << object_id.TaskId().JobId()
<< ", object id = " << object_id;
auto object_locations = GetObjectLocations(object_id);
for (auto &node_id : object_locations) {
rpc::ObjectTableData object_table_data;
object_table_data.set_manager(node_id.Binary());
reply->add_object_table_data_list()->CopyFrom(object_table_data);
}
absl::MutexLock lock(&mutex_);
auto object_data = GenObjectLocationInfo(object_id);
reply->mutable_location_info()->Swap(&object_data);
RAY_LOG(DEBUG) << "Finished getting object locations, job id = "
<< object_id.TaskId().JobId() << ", object id = " << object_id;
GCS_RPC_SEND_REPLY(send_reply_callback, reply, Status::OK());
@ -45,7 +45,7 @@ void GcsObjectManager::HandleGetAllObjectLocations(
for (auto &item : object_to_locations_) {
rpc::ObjectLocationInfo object_location_info;
object_location_info.set_object_id(item.first.Binary());
for (auto &node_id : item.second) {
for (auto &node_id : item.second.locations) {
rpc::ObjectTableData object_table_data;
object_table_data.set_manager(node_id.Binary());
object_location_info.add_locations()->CopyFrom(object_table_data);
@ -60,17 +60,33 @@ void GcsObjectManager::HandleAddObjectLocation(
const rpc::AddObjectLocationRequest &request, rpc::AddObjectLocationReply *reply,
rpc::SendReplyCallback send_reply_callback) {
ObjectID object_id = ObjectID::FromBinary(request.object_id());
NodeID node_id = NodeID::FromBinary(request.node_id());
RAY_LOG(DEBUG) << "Adding object location, job id = " << object_id.TaskId().JobId()
<< ", object id = " << object_id << ", node id = " << node_id;
AddObjectLocationInCache(object_id, node_id);
auto on_done = [this, object_id, node_id, reply,
NodeID node_id;
std::string spilled_url;
if (!request.node_id().empty()) {
node_id = NodeID::FromBinary(request.node_id());
RAY_LOG(DEBUG) << "Adding object location, job id = " << object_id.TaskId().JobId()
<< ", object id = " << object_id << ", node id = " << node_id;
AddObjectLocationInCache(object_id, node_id);
} else {
absl::MutexLock lock(&mutex_);
object_to_locations_[object_id].spilled_url = request.spilled_url();
RAY_LOG(DEBUG) << "Adding object spilled location, object id = " << object_id;
}
auto on_done = [this, object_id, node_id, spilled_url, reply,
send_reply_callback](const Status &status) {
if (status.ok()) {
RAY_CHECK_OK(gcs_pub_sub_->Publish(
OBJECT_CHANNEL, object_id.Hex(),
gcs::CreateObjectLocationChange(node_id, true)->SerializeAsString(), nullptr));
rpc::ObjectLocationChange notification;
notification.set_is_add(true);
if (!node_id.IsNil()) {
notification.set_node_id(node_id.Binary());
}
if (!spilled_url.empty()) {
notification.set_spilled_url(spilled_url);
}
RAY_CHECK_OK(gcs_pub_sub_->Publish(OBJECT_CHANNEL, object_id.Hex(),
notification.SerializeAsString(), nullptr));
RAY_LOG(DEBUG) << "Finished adding object location, job id = "
<< object_id.TaskId().JobId() << ", object id = " << object_id
<< ", node id = " << node_id << ", task id = " << object_id.TaskId();
@ -86,11 +102,8 @@ void GcsObjectManager::HandleAddObjectLocation(
};
absl::MutexLock lock(&mutex_);
auto object_location_set =
GetObjectLocationSet(object_id, /* create_if_not_exist */ false);
auto object_table_data_list = GenObjectTableDataList(*object_location_set);
Status status =
gcs_table_storage_->ObjectTable().Put(object_id, *object_table_data_list, on_done);
const auto object_data = GenObjectLocationInfo(object_id);
Status status = gcs_table_storage_->ObjectTable().Put(object_id, object_data, on_done);
if (!status.ok()) {
on_done(status);
}
@ -130,9 +143,8 @@ void GcsObjectManager::HandleRemoveObjectLocation(
GetObjectLocationSet(object_id, /* create_if_not_exist */ false);
Status status;
if (object_location_set != nullptr) {
auto object_table_data_list = GenObjectTableDataList(*object_location_set);
status = gcs_table_storage_->ObjectTable().Put(object_id, *object_table_data_list,
on_done);
const auto object_data = GenObjectLocationInfo(object_id);
status = gcs_table_storage_->ObjectTable().Put(object_id, object_data, on_done);
} else {
status = gcs_table_storage_->ObjectTable().Delete(object_id, on_done);
}
@ -154,7 +166,7 @@ void GcsObjectManager::AddObjectsLocation(
for (const auto &object_id : object_ids) {
auto *object_locations =
GetObjectLocationSet(object_id, /* create_if_not_exist */ true);
object_locations->emplace(node_id);
object_locations->locations.emplace(node_id);
}
}
@ -167,7 +179,7 @@ void GcsObjectManager::AddObjectLocationInCache(const ObjectID &object_id,
auto *object_locations =
GetObjectLocationSet(object_id, /* create_if_not_exist */ true);
object_locations->emplace(node_id);
object_locations->locations.emplace(node_id);
}
absl::flat_hash_set<NodeID> GcsObjectManager::GetObjectLocations(
@ -176,7 +188,7 @@ absl::flat_hash_set<NodeID> GcsObjectManager::GetObjectLocations(
auto *object_locations = GetObjectLocationSet(object_id);
if (object_locations) {
return *object_locations;
return object_locations->locations;
}
return absl::flat_hash_set<NodeID>{};
}
@ -198,8 +210,8 @@ void GcsObjectManager::OnNodeRemoved(const NodeID &node_id) {
for (const auto &object_id : objects_on_node) {
auto *object_locations = GetObjectLocationSet(object_id);
if (object_locations) {
object_locations->erase(node_id);
if (object_locations->empty()) {
object_locations->locations.erase(node_id);
if (object_locations->locations.empty() && object_locations->spilled_url.empty()) {
object_to_locations_.erase(object_id);
}
}
@ -212,8 +224,8 @@ void GcsObjectManager::RemoveObjectLocationInCache(const ObjectID &object_id,
auto *object_locations = GetObjectLocationSet(object_id);
if (object_locations) {
object_locations->erase(node_id);
if (object_locations->empty()) {
object_locations->locations.erase(node_id);
if (object_locations->locations.empty() && object_locations->spilled_url.empty()) {
object_to_locations_.erase(object_id);
}
}
@ -258,25 +270,28 @@ GcsObjectManager::ObjectSet *GcsObjectManager::GetObjectSetByNode(
return objects_on_node;
}
std::shared_ptr<ObjectTableDataList> GcsObjectManager::GenObjectTableDataList(
const GcsObjectManager::LocationSet &location_set) const {
auto object_table_data_list = std::make_shared<ObjectTableDataList>();
for (auto &node_id : location_set) {
object_table_data_list->add_items()->set_manager(node_id.Binary());
const ObjectLocationInfo GcsObjectManager::GenObjectLocationInfo(
const ObjectID &object_id) const {
ObjectLocationInfo object_data;
object_data.set_object_id(object_id.Binary());
auto it = object_to_locations_.find(object_id);
if (it != object_to_locations_.end()) {
for (const auto &node_id : it->second.locations) {
object_data.add_locations()->set_manager(node_id.Binary());
}
object_data.set_spilled_url(it->second.spilled_url);
}
return object_table_data_list;
return object_data;
}
void GcsObjectManager::LoadInitialData(const EmptyCallback &done) {
RAY_LOG(INFO) << "Loading initial data.";
auto callback = [this, done](
const std::unordered_map<ObjectID, ObjectTableDataList> &result) {
auto callback = [this,
done](const std::unordered_map<ObjectID, ObjectLocationInfo> &result) {
absl::flat_hash_map<NodeID, ObjectSet> node_to_objects;
for (auto &item : result) {
auto object_list = item.second;
for (int index = 0; index < object_list.items_size(); ++index) {
node_to_objects[NodeID::FromBinary(object_list.items(index).manager())].insert(
item.first);
for (const auto &loc : item.second.locations()) {
node_to_objects[NodeID::FromBinary(loc.manager())].insert(item.first);
}
}

View file

@ -60,7 +60,10 @@ class GcsObjectManager : public rpc::ObjectInfoHandler {
void LoadInitialData(const EmptyCallback &done);
protected:
typedef absl::flat_hash_set<NodeID> LocationSet;
/// The known whereabouts of a single object: the nodes that are recorded as
/// locations of it and, if any, the URL that it was spilled to.
struct LocationSet {
/// IDs of the nodes recorded as locations of the object.
absl::flat_hash_set<NodeID> locations;
/// URL that the object was spilled to. Empty if no spilled URL is recorded.
std::string spilled_url = "";
};
/// Add a location of objects.
/// If the GCS server restarts, this function is used to reload data from storage.
@ -82,7 +85,8 @@ class GcsObjectManager : public rpc::ObjectInfoHandler {
///
/// \param object_id The id of object to lookup.
/// \return Object locations.
LocationSet GetObjectLocations(const ObjectID &object_id) LOCKS_EXCLUDED(mutex_);
absl::flat_hash_set<NodeID> GetObjectLocations(const ObjectID &object_id)
LOCKS_EXCLUDED(mutex_);
/// Handler if a node is removed.
///
@ -99,8 +103,8 @@ class GcsObjectManager : public rpc::ObjectInfoHandler {
private:
typedef absl::flat_hash_set<ObjectID> ObjectSet;
std::shared_ptr<ObjectTableDataList> GenObjectTableDataList(
const GcsObjectManager::LocationSet &location_set) const;
/// Build the ObjectLocationInfo message for an object from the in-memory
/// location map. The message always carries the object id; the node locations
/// and the spilled URL are filled in only if the object has an entry in the map.
/// The caller must hold mutex_.
///
/// \param object_id The id of the object to generate location info for.
/// \return The generated location info.
const ObjectLocationInfo GenObjectLocationInfo(const ObjectID &object_id) const
EXCLUSIVE_LOCKS_REQUIRED(mutex_);
/// Get object locations by object id from map.
/// Will create it if not exist and the flag create_if_not_exist is set to true.

View file

@ -140,14 +140,14 @@ template class GcsTable<ActorID, ActorCheckpointIdData>;
template class GcsTable<TaskID, TaskTableData>;
template class GcsTable<TaskID, TaskLeaseData>;
template class GcsTable<TaskID, TaskReconstructionData>;
template class GcsTable<ObjectID, ObjectTableDataList>;
template class GcsTable<ObjectID, ObjectLocationInfo>;
template class GcsTable<UniqueID, StoredConfig>;
template class GcsTableWithJobId<ActorID, ActorTableData>;
template class GcsTableWithJobId<ActorID, ActorCheckpointIdData>;
template class GcsTableWithJobId<TaskID, TaskTableData>;
template class GcsTableWithJobId<TaskID, TaskLeaseData>;
template class GcsTableWithJobId<TaskID, TaskReconstructionData>;
template class GcsTableWithJobId<ObjectID, ObjectTableDataList>;
template class GcsTableWithJobId<ObjectID, ObjectLocationInfo>;
template class GcsTable<PlacementGroupID, PlacementGroupTableData>;
template class GcsTable<PlacementGroupID, ScheduleData>;

View file

@ -31,8 +31,8 @@ using rpc::GcsNodeInfo;
using rpc::HeartbeatBatchTableData;
using rpc::HeartbeatTableData;
using rpc::JobTableData;
using rpc::ObjectLocationInfo;
using rpc::ObjectTableData;
using rpc::ObjectTableDataList;
using rpc::PlacementGroupTableData;
using rpc::ProfileTableData;
using rpc::ResourceMap;
@ -234,7 +234,7 @@ class GcsTaskReconstructionTable
JobID GetJobIdFromKey(const TaskID &key) override { return key.ActorId().JobId(); }
};
class GcsObjectTable : public GcsTableWithJobId<ObjectID, ObjectTableDataList> {
class GcsObjectTable : public GcsTableWithJobId<ObjectID, ObjectLocationInfo> {
public:
explicit GcsObjectTable(std::shared_ptr<StoreClient> &store_client)
: GcsTableWithJobId(store_client) {

View file

@ -295,8 +295,8 @@ class GcsServerTest : public ::testing::Test {
request, [&object_locations, &promise](
const Status &status, const rpc::GetObjectLocationsReply &reply) {
RAY_CHECK_OK(status);
for (int index = 0; index < reply.object_table_data_list_size(); ++index) {
object_locations.push_back(reply.object_table_data_list(index));
for (const auto &loc : reply.location_info().locations()) {
object_locations.push_back(loc);
}
promise.set_value(true);
});

View file

@ -105,11 +105,9 @@ inline std::shared_ptr<ray::rpc::WorkerTableData> CreateWorkerFailureData(
/// \return The object location change created by this method.
inline std::shared_ptr<ray::rpc::ObjectLocationChange> CreateObjectLocationChange(
const NodeID &node_id, bool is_add) {
ray::rpc::ObjectTableData object_table_data;
object_table_data.set_manager(node_id.Binary());
auto object_location_change = std::make_shared<ray::rpc::ObjectLocationChange>();
object_location_change->set_is_add(is_add);
object_location_change->mutable_data()->CopyFrom(object_table_data);
object_location_change->set_node_id(node_id.Binary());
return object_location_change;
}

View file

@ -414,11 +414,18 @@ RedisObjectInfoAccessor::RedisObjectInfoAccessor(RedisGcsClient *client_impl)
: client_impl_(client_impl), object_sub_executor_(client_impl->object_table()) {}
Status RedisObjectInfoAccessor::AsyncGetLocations(
const ObjectID &object_id, const MultiItemCallback<ObjectTableData> &callback) {
const ObjectID &object_id,
const OptionalItemCallback<rpc::ObjectLocationInfo> &callback) {
RAY_CHECK(callback != nullptr);
auto on_done = [callback](RedisGcsClient *client, const ObjectID &object_id,
const std::vector<ObjectTableData> &data) {
callback(Status::OK(), data);
rpc::ObjectLocationInfo info;
info.set_object_id(object_id.Binary());
for (const auto &item : data) {
auto item_ptr = info.add_locations();
item_ptr->CopyFrom(item);
}
callback(Status::OK(), info);
};
ObjectTable &object_table = client_impl_->object_table();
@ -463,10 +470,22 @@ Status RedisObjectInfoAccessor::AsyncRemoveLocation(const ObjectID &object_id,
Status RedisObjectInfoAccessor::AsyncSubscribeToLocations(
const ObjectID &object_id,
const SubscribeCallback<ObjectID, ObjectChangeNotification> &subscribe,
const SubscribeCallback<ObjectID, std::vector<rpc::ObjectLocationChange>> &subscribe,
const StatusCallback &done) {
RAY_CHECK(subscribe != nullptr);
return object_sub_executor_.AsyncSubscribe(subscribe_id_, object_id, subscribe, done);
return object_sub_executor_.AsyncSubscribe(
subscribe_id_, object_id,
[subscribe](const ObjectID &id, const ObjectChangeNotification &notification_data) {
std::vector<rpc::ObjectLocationChange> updates;
for (const auto &item : notification_data.GetData()) {
rpc::ObjectLocationChange update;
update.set_is_add(notification_data.IsAdded());
update.set_node_id(item.manager());
updates.push_back(update);
}
subscribe(id, updates);
},
done);
}
Status RedisObjectInfoAccessor::AsyncUnsubscribeToLocations(const ObjectID &object_id) {

View file

@ -264,8 +264,9 @@ class RedisObjectInfoAccessor : public ObjectInfoAccessor {
virtual ~RedisObjectInfoAccessor() {}
Status AsyncGetLocations(const ObjectID &object_id,
const MultiItemCallback<ObjectTableData> &callback) override;
Status AsyncGetLocations(
const ObjectID &object_id,
const OptionalItemCallback<rpc::ObjectLocationInfo> &callback) override;
Status AsyncGetAll(
const MultiItemCallback<rpc::ObjectLocationInfo> &callback) override {
@ -275,12 +276,18 @@ class RedisObjectInfoAccessor : public ObjectInfoAccessor {
Status AsyncAddLocation(const ObjectID &object_id, const NodeID &node_id,
const StatusCallback &callback) override;
Status AsyncAddSpilledUrl(const ObjectID &object_id, const std::string &spilled_url,
const StatusCallback &callback) override {
return Status::NotImplemented("AsyncAddSpilledUrl not implemented");
}
Status AsyncRemoveLocation(const ObjectID &object_id, const NodeID &node_id,
const StatusCallback &callback) override;
Status AsyncSubscribeToLocations(
const ObjectID &object_id,
const SubscribeCallback<ObjectID, ObjectChangeNotification> &subscribe,
const SubscribeCallback<ObjectID, std::vector<rpc::ObjectLocationChange>>
&subscribe,
const StatusCallback &done) override;
Status AsyncUnsubscribeToLocations(const ObjectID &object_id) override;

View file

@ -70,9 +70,10 @@ TEST_F(RedisObjectInfoAccessorTest, TestGetAddRemove) {
size_t total_size = elem.second.size();
RAY_CHECK_OK(object_accessor.AsyncGetLocations(
elem.first,
[this, total_size](Status status, const std::vector<ObjectTableData> &result) {
[this, total_size](Status status,
const boost::optional<rpc::ObjectLocationInfo> &result) {
RAY_CHECK_OK(status);
RAY_CHECK(total_size == result.size());
ASSERT_EQ(total_size, result->locations().size());
--pending_count_;
}));
}
@ -83,17 +84,18 @@ TEST_F(RedisObjectInfoAccessorTest, TestGetAddRemove) {
// subscribe && delete
// subscribe
std::atomic<int> sub_pending_count(0);
auto subscribe = [this, &sub_pending_count](const ObjectID &object_id,
const ObjectChangeNotification &result) {
auto subscribe = [this, &sub_pending_count](
const ObjectID &object_id,
const std::vector<rpc::ObjectLocationChange> &result) {
const auto it = object_id_to_data_.find(object_id);
ASSERT_TRUE(it != object_id_to_data_.end());
static size_t response_count = 1;
size_t cur_count = response_count <= object_count_ ? copy_count_ : 1;
ASSERT_EQ(result.GetData().size(), cur_count);
rpc::GcsChangeMode change_mode = response_count <= object_count_
? rpc::GcsChangeMode::APPEND_OR_ADD
: rpc::GcsChangeMode::REMOVE;
ASSERT_EQ(change_mode, result.GetGcsChangeMode());
ASSERT_EQ(result.size(), cur_count);
bool change_mode = response_count <= object_count_;
for (const auto &res : result) {
ASSERT_EQ(change_mode, res.is_add());
}
++response_count;
--sub_pending_count;
};
@ -128,9 +130,10 @@ TEST_F(RedisObjectInfoAccessorTest, TestGetAddRemove) {
size_t total_size = elem.second.size();
RAY_CHECK_OK(object_accessor.AsyncGetLocations(
elem.first,
[this, total_size](Status status, const std::vector<ObjectTableData> &result) {
[this, total_size](Status status,
const boost::optional<rpc::ObjectLocationInfo> &result) {
RAY_CHECK_OK(status);
ASSERT_EQ(total_size - 1, result.size());
ASSERT_EQ(total_size - 1, result->locations().size());
--pending_count_;
}));
}

View file

@ -29,22 +29,31 @@ using ray::rpc::ObjectTableData;
/// Process a notification of the object table entries and store the result in
/// node_ids. This assumes that node_ids already contains the result of the
/// object table entries up to but not including this notification.
bool UpdateObjectLocations(bool is_added,
const std::vector<ObjectTableData> &location_updates,
bool UpdateObjectLocations(const std::vector<rpc::ObjectLocationChange> &location_updates,
std::shared_ptr<gcs::GcsClient> gcs_client,
std::unordered_set<NodeID> *node_ids) {
std::unordered_set<NodeID> *node_ids,
std::string *spilled_url) {
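// location_updates contains the updates of locations of the object.
// Each update carries its own is_add flag, which determines whether the
// node location is added or removed. An update without a node ID instead
// carries the URL that the object was spilled to.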
bool isUpdated = false;
for (const auto &object_table_data : location_updates) {
NodeID node_id = NodeID::FromBinary(object_table_data.manager());
if (is_added && 0 == node_ids->count(node_id)) {
node_ids->insert(node_id);
isUpdated = true;
} else if (!is_added && 1 == node_ids->count(node_id)) {
node_ids->erase(node_id);
isUpdated = true;
for (const auto &update : location_updates) {
if (!update.node_id().empty()) {
NodeID node_id = NodeID::FromBinary(update.node_id());
if (update.is_add() && 0 == node_ids->count(node_id)) {
node_ids->insert(node_id);
isUpdated = true;
} else if (!update.is_add() && 1 == node_ids->count(node_id)) {
node_ids->erase(node_id);
isUpdated = true;
}
} else {
RAY_CHECK(!update.spilled_url().empty());
RAY_LOG(DEBUG) << "Received object spilled at " << update.spilled_url();
if (update.spilled_url() != *spilled_url) {
*spilled_url = update.spilled_url();
isUpdated = true;
}
}
}
// Filter out the removed clients from the object locations.
@ -111,14 +120,15 @@ void ObjectDirectory::HandleClientRemoved(const NodeID &client_id) {
if (listener.second.current_object_locations.count(client_id) > 0) {
// If the subscribed object has the removed client as a location, update
// its locations with an empty update so that the location will be removed.
UpdateObjectLocations(/*is_added*/ true, {}, gcs_client_,
&listener.second.current_object_locations);
UpdateObjectLocations({}, gcs_client_, &listener.second.current_object_locations,
&listener.second.spilled_url);
// Re-call all the subscribed callbacks for the object, since its
// locations have changed.
for (const auto &callback_pair : listener.second.callbacks) {
// It is safe to call the callback directly since this is already running
// in the subscription callback stack.
callback_pair.second(object_id, listener.second.current_object_locations);
callback_pair.second(object_id, listener.second.current_object_locations,
listener.second.spilled_url);
}
}
}
@ -135,7 +145,7 @@ ray::Status ObjectDirectory::SubscribeObjectLocations(const UniqueID &callback_i
auto object_notification_callback =
[this](const ObjectID &object_id,
const gcs::ObjectChangeNotification &object_notification) {
const std::vector<rpc::ObjectLocationChange> &object_notifications) {
// Objects are added to this map in SubscribeObjectLocations.
auto it = listeners_.find(object_id);
// Do nothing for objects we are not listening for.
@ -147,9 +157,9 @@ ray::Status ObjectDirectory::SubscribeObjectLocations(const UniqueID &callback_i
it->second.subscribed = true;
// Update entries for this object.
if (!UpdateObjectLocations(object_notification.IsAdded(),
object_notification.GetData(), gcs_client_,
&it->second.current_object_locations)) {
if (!UpdateObjectLocations(object_notifications, gcs_client_,
&it->second.current_object_locations,
&it->second.spilled_url)) {
return;
}
// Copy the callbacks so that the callbacks can unsubscribe without interrupting
@ -162,7 +172,8 @@ ray::Status ObjectDirectory::SubscribeObjectLocations(const UniqueID &callback_i
for (const auto &callback_pair : callbacks) {
// It is safe to call the callback directly since this is already running
// in the subscription callback stack.
callback_pair.second(object_id, it->second.current_object_locations);
callback_pair.second(object_id, it->second.current_object_locations,
it->second.spilled_url);
}
};
status = gcs_client_->Objects().AsyncSubscribeToLocations(
@ -179,8 +190,10 @@ ray::Status ObjectDirectory::SubscribeObjectLocations(const UniqueID &callback_i
// immediately notify the caller of the current known locations.
if (listener_state.subscribed) {
auto &locations = listener_state.current_object_locations;
io_service_.post(
[callback, locations, object_id]() { callback(object_id, locations); });
auto &spilled_url = listener_state.spilled_url;
io_service_.post([callback, locations, spilled_url, object_id]() {
callback(object_id, locations, spilled_url);
});
}
return status;
}
@ -211,8 +224,10 @@ ray::Status ObjectDirectory::LookupLocations(const ObjectID &object_id,
// the object's creation, then call the callback immediately with the
// cached locations.
auto &locations = it->second.current_object_locations;
io_service_.post(
[callback, object_id, locations]() { callback(object_id, locations); });
auto &spilled_url = it->second.spilled_url;
io_service_.post([callback, object_id, spilled_url, locations]() {
callback(object_id, locations, spilled_url);
});
} else {
// We do not have any locations cached due to a concurrent
// SubscribeObjectLocations call, so look up the object's locations
@ -220,16 +235,29 @@ ray::Status ObjectDirectory::LookupLocations(const ObjectID &object_id,
status = gcs_client_->Objects().AsyncGetLocations(
object_id,
[this, object_id, callback](
Status status, const std::vector<ObjectTableData> &location_updates) {
Status status, const boost::optional<rpc::ObjectLocationInfo> &update) {
RAY_CHECK(status.ok())
<< "Failed to get object location from GCS: " << status.message();
// Build the set of current locations (and the spilled URL, if any) from the
// returned location info by converting it into a list of location changes.
std::vector<rpc::ObjectLocationChange> notification;
for (const auto &loc : update->locations()) {
rpc::ObjectLocationChange change;
change.set_is_add(true);
change.set_node_id(loc.manager());
notification.push_back(change);
}
if (!update->spilled_url().empty()) {
rpc::ObjectLocationChange change;
change.set_spilled_url(update->spilled_url());
notification.push_back(change);
}
std::unordered_set<NodeID> node_ids;
UpdateObjectLocations(/*is_added*/ true, location_updates, gcs_client_,
&node_ids);
std::string spilled_url;
UpdateObjectLocations(notification, gcs_client_, &node_ids, &spilled_url);
// It is safe to call the callback directly since this is already running
// in the GCS client's lookup callback stack.
callback(object_id, node_ids);
callback(object_id, node_ids, spilled_url);
});
}
return status;

View file

@ -59,8 +59,9 @@ class ObjectDirectoryInterface {
virtual std::vector<RemoteConnectionInfo> LookupAllRemoteConnections() const = 0;
/// Callback for object location notifications.
using OnLocationsFound = std::function<void(const ray::ObjectID &object_id,
const std::unordered_set<ray::NodeID> &)>;
using OnLocationsFound =
std::function<void(const ray::ObjectID &object_id,
const std::unordered_set<ray::NodeID> &, const std::string &)>;
/// Lookup object locations. Callback may be invoked with empty list of client ids.
///
@ -182,6 +183,8 @@ class ObjectDirectory : public ObjectDirectoryInterface {
std::unordered_map<UniqueID, OnLocationsFound> callbacks;
/// The current set of known locations of this object.
std::unordered_set<NodeID> current_object_locations;
/// The location where this object has been spilled, if any.
std::string spilled_url = "";
/// This flag will get set to true if received any notification of the object.
/// It means current_object_locations is up-to-date with GCS. It
/// should never go back to false once set to true. If this is true, and

View file

@ -50,7 +50,8 @@ ObjectStoreRunner::~ObjectStoreRunner() {
ObjectManager::ObjectManager(asio::io_service &main_service, const NodeID &self_node_id,
const ObjectManagerConfig &config,
std::shared_ptr<ObjectDirectoryInterface> object_directory)
std::shared_ptr<ObjectDirectoryInterface> object_directory,
RestoreSpilledObjectCallback restore_spilled_object)
: self_node_id_(self_node_id),
config_(config),
object_directory_(std::move(object_directory)),
@ -61,7 +62,8 @@ ObjectManager::ObjectManager(asio::io_service &main_service, const NodeID &self_
object_manager_server_("ObjectManager", config_.object_manager_port,
config_.rpc_service_threads_number),
object_manager_service_(rpc_service_, *this),
client_call_manager_(main_service, config_.rpc_service_threads_number) {
client_call_manager_(main_service, config_.rpc_service_threads_number),
restore_spilled_object_(restore_spilled_object) {
RAY_CHECK(config_.rpc_service_threads_number > 0);
main_service_ = &main_service;
@ -184,7 +186,8 @@ ray::Status ObjectManager::Pull(const ObjectID &object_id,
// no ordering guarantee between notifications.
return object_directory_->SubscribeObjectLocations(
object_directory_pull_callback_id_, object_id, owner_address,
[this](const ObjectID &object_id, const std::unordered_set<NodeID> &client_ids) {
[this](const ObjectID &object_id, const std::unordered_set<NodeID> &client_ids,
const std::string &spilled_url) {
// Exit if the Pull request has already been fulfilled or canceled.
auto it = pull_requests_.find(object_id);
if (it == pull_requests_.end()) {
@ -196,7 +199,16 @@ ray::Status ObjectManager::Pull(const ObjectID &object_id,
// before.
it->second.client_locations =
std::vector<NodeID>(client_ids.begin(), client_ids.end());
if (it->second.client_locations.empty()) {
if (!spilled_url.empty()) {
// Try to restore the spilled object.
restore_spilled_object_(object_id, spilled_url,
[this, object_id](const ray::Status &status) {
// Fall back to fetching from another object manager.
if (!status.ok()) {
TryPull(object_id);
}
});
} else if (it->second.client_locations.empty()) {
// The object locations are now empty, so we should wait for the next
// notification about a new object location. Cancel the timer until
// the next Pull attempt since there are no more clients to try.
@ -605,12 +617,14 @@ ray::Status ObjectManager::LookupRemainingWaitObjects(const UniqueID &wait_id) {
RAY_RETURN_NOT_OK(object_directory_->LookupLocations(
object_id, wait_state.owner_addresses[object_id],
[this, wait_id](const ObjectID &lookup_object_id,
const std::unordered_set<NodeID> &client_ids) {
const std::unordered_set<NodeID> &client_ids,
const std::string &spilled_url) {
auto &wait_state = active_wait_requests_.find(wait_id)->second;
// Note that the object is guaranteed to be added to local_objects_ before
// the notification is triggered.
bool remote_object_ready = !client_ids.empty() || !spilled_url.empty();
if (local_objects_.count(lookup_object_id) > 0 ||
(!wait_state.wait_local && !client_ids.empty())) {
(!wait_state.wait_local && remote_object_ready)) {
wait_state.remaining.erase(lookup_object_id);
wait_state.found.insert(lookup_object_id);
}
@ -646,7 +660,8 @@ void ObjectManager::SubscribeRemainingWaitObjects(const UniqueID &wait_id) {
RAY_CHECK_OK(object_directory_->SubscribeObjectLocations(
wait_id, object_id, wait_state.owner_addresses[object_id],
[this, wait_id](const ObjectID &subscribe_object_id,
const std::unordered_set<NodeID> &client_ids) {
const std::unordered_set<NodeID> &client_ids,
const std::string &spilled_url) {
auto object_id_wait_state = active_wait_requests_.find(wait_id);
if (object_id_wait_state == active_wait_requests_.end()) {
// Depending on the timing of calls to the object directory, we
@ -658,8 +673,9 @@ void ObjectManager::SubscribeRemainingWaitObjects(const UniqueID &wait_id) {
auto &wait_state = object_id_wait_state->second;
// Note that the object is guaranteed to be added to local_objects_ before
// the notification is triggered.
bool remote_object_ready = !client_ids.empty() || !spilled_url.empty();
if (local_objects_.count(subscribe_object_id) > 0 ||
(!wait_state.wait_local && !client_ids.empty())) {
(!wait_state.wait_local && remote_object_ready)) {
RAY_LOG(DEBUG) << "Wait request " << wait_id
<< ": subscription notification received for object "
<< subscribe_object_id;

View file

@ -100,6 +100,9 @@ class ObjectManagerInterface {
class ObjectManager : public ObjectManagerInterface,
public rpc::ObjectManagerServiceHandler {
public:
/// Callback used to restore an object that has been spilled to external
/// storage. It is called with the ID of the object, the URL that the object was
/// spilled to, and a callback that receives the status of the restore attempt.
using RestoreSpilledObjectCallback = std::function<void(
const ObjectID &, const std::string &, std::function<void(const ray::Status &)>)>;
/// Implementation of object manager service
/// Handle push request from remote object manager
@ -186,7 +189,8 @@ class ObjectManager : public ObjectManagerInterface,
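/// \param object_directory An object implementing the object directory interface.
/// \param restore_spilled_object A callback used to restore an object from the
/// URL that it was spilled to.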
explicit ObjectManager(boost::asio::io_service &main_service,
const NodeID &self_node_id, const ObjectManagerConfig &config,
std::shared_ptr<ObjectDirectoryInterface> object_directory);
std::shared_ptr<ObjectDirectoryInterface> object_directory,
RestoreSpilledObjectCallback restore_spilled_object);
~ObjectManager();
@ -466,6 +470,8 @@ class ObjectManager : public ObjectManagerInterface,
std::unordered_map<NodeID, std::shared_ptr<rpc::ObjectManagerClient>>
remote_object_manager_clients_;
/// Called from the Pull location callback when the object directory reports a
/// spilled URL for the object.
/// NOTE(review): the call site does not check for an empty callback, and the
/// test servers pass nullptr here -- confirm that path cannot be reached with
/// an unset callback.
const RestoreSpilledObjectCallback restore_spilled_object_;
/// Running sum of the amount of memory used in the object store.
int64_t used_memory_ = 0;
};

View file

@ -141,7 +141,7 @@ void OwnershipBasedObjectDirectory::SubscriptionCallback(
for (const auto &callback_pair : callbacks) {
// It is safe to call the callback directly since this is already running
// in the subscription callback stack.
callback_pair.second(object_id, it->second.current_object_locations);
callback_pair.second(object_id, it->second.current_object_locations, "");
}
}
@ -207,8 +207,9 @@ ray::Status OwnershipBasedObjectDirectory::LookupLocations(
if (rpc_client == nullptr) {
RAY_LOG(WARNING) << "Object " << object_id << " does not have owner. "
<< "LookupLocations returns an empty list of locations.";
io_service_.post(
[callback, object_id]() { callback(object_id, std::unordered_set<NodeID>()); });
io_service_.post([callback, object_id]() {
callback(object_id, std::unordered_set<NodeID>(), "");
});
return Status::OK();
}
@ -228,7 +229,7 @@ ray::Status OwnershipBasedObjectDirectory::LookupLocations(
client_ids.emplace(NodeID::FromBinary(client_id));
}
FilterRemovedClients(gcs_client_, &client_ids);
callback(object_id, client_ids);
callback(object_id, client_ids, "");
});
return Status::OK();
}

View file

@ -54,7 +54,8 @@ class MockServer {
config_(object_manager_config),
gcs_client_(gcs_client),
object_manager_(main_service, node_id_, object_manager_config,
std::make_shared<ObjectDirectory>(main_service, gcs_client_)) {
std::make_shared<ObjectDirectory>(main_service, gcs_client_),
nullptr) {
RAY_CHECK_OK(RegisterGcs(main_service));
}

View file

@ -50,7 +50,8 @@ class MockServer {
config_(object_manager_config),
gcs_client_(gcs_client),
object_manager_(main_service, node_id_, object_manager_config,
std::make_shared<ObjectDirectory>(main_service, gcs_client_)) {
std::make_shared<ObjectDirectory>(main_service, gcs_client_),
nullptr) {
RAY_CHECK_OK(RegisterGcs(main_service));
}
@ -262,9 +263,9 @@ class TestObjectManager : public TestObjectManagerBase {
RAY_CHECK_OK(server1->object_manager_.object_directory_->SubscribeObjectLocations(
sub_id, object_1, rpc::Address(),
[this, sub_id, object_1, object_2](
const ray::ObjectID &object_id,
const std::unordered_set<ray::NodeID> &clients) {
[this, sub_id, object_1, object_2](const ray::ObjectID &object_id,
const std::unordered_set<ray::NodeID> &clients,
const std::string &spilled_url) {
if (!clients.empty()) {
TestWaitWhileSubscribed(sub_id, object_1, object_2);
}

View file

@ -79,10 +79,8 @@ message GcsEntry {
}
message ObjectTableData {
// The size of the object.
uint64 object_size = 1;
// The node manager ID that this object appeared on or was evicted by.
bytes manager = 2;
bytes manager = 1;
}
message TaskReconstructionData {
@ -410,19 +408,22 @@ message StoredConfig {
map<string, string> config = 1;
}
message ObjectTableDataList {
repeated ObjectTableData items = 1;
}
// The location information of a single object.
message ObjectLocationInfo {
// The ID of the object.
bytes object_id = 1;
// The nodes on which the object is located.
repeated ObjectTableData locations = 2;
// For objects that have been spilled to external storage, the URL from which
// they can be retrieved.
string spilled_url = 3;
}
// A notification message about one object's locations being changed.
message ObjectLocationChange {
bool is_add = 1;
ObjectTableData data = 2;
// The node manager ID that this object appeared on or was evicted by.
bytes node_id = 2;
// The object has been spilled to this URL. Either this field or node_id
// should be set, but not both.
string spilled_url = 3;
}
// A notification message about one node's resources being changed.

View file

@ -303,8 +303,8 @@ message GetObjectLocationsRequest {
message GetObjectLocationsReply {
GcsStatus status = 1;
// Data of object.
repeated ObjectTableData object_table_data_list = 2;
// Object location information.
ObjectLocationInfo location_info = 2;
}
message GetAllObjectLocationsRequest {
@ -321,12 +321,19 @@ message AddObjectLocationRequest {
bytes object_id = 1;
// The location that will be added to GCS Service.
bytes node_id = 2;
// The spilled URL that will be added to GCS Service. Either this or the node
// ID should be set.
string spilled_url = 3;
}
message AddObjectLocationReply {
GcsStatus status = 1;
}
message AddObjectSpilledUrlReply {
GcsStatus status = 1;
}
message RemoveObjectLocationRequest {
// The ID of object which location will be removed from GCS Service.
bytes object_id = 1;

View file

@ -82,9 +82,6 @@ enum MessageType:int {
SetResourceRequest,
// Subscribe to Plasma updates
SubscribePlasmaReady,
// Manually restore objects from external storage.
ForceRestoreSpilledObjectsRequest,
ForceRestoreSpilledObjectsReply,
}
table TaskExecutionSpecification {
@ -308,11 +305,3 @@ table ForceSpillObjectsRequest {
table ForceSpillObjectsReply {
}
table ForceRestoreSpilledObjectsRequest {
// List of object IDs to be restored from external storage.
object_ids: [string];
}
table ForceRestoreSpilledObjectsReply {
}

View file

@ -531,14 +531,9 @@ void NodeManager::HandleRequestObjectSpillage(
});
}
void NodeManager::SpillObjects(const std::vector<ObjectID> &objects_ids_to_spill,
void NodeManager::SpillObjects(const std::vector<ObjectID> &objects_ids,
std::function<void(const ray::Status &)> callback) {
std::vector<ObjectID> objects_ids;
for (const auto &id : objects_ids_to_spill) {
// Do not spill already spilled objects.
if (spilled_objects_.count(id) == 0) {
objects_ids.push_back(id);
}
for (const auto &id : objects_ids) {
// We should not spill an object that we are not the primary copy for.
// TODO(swang): We should really return an error here but right now there
// is a race condition where the raylet receives the owner's request to
@ -549,12 +544,6 @@ void NodeManager::SpillObjects(const std::vector<ObjectID> &objects_ids_to_spill
"the primary copy.";
}
}
if (objects_ids.empty()) {
if (callback) {
callback(Status::OK());
}
return;
}
worker_pool_.PopIOWorker([this, objects_ids,
callback](std::shared_ptr<WorkerInterface> io_worker) {
rpc::SpillObjectsRequest request;
@ -569,50 +558,46 @@ void NodeManager::SpillObjects(const std::vector<ObjectID> &objects_ids_to_spill
if (!status.ok()) {
RAY_LOG(ERROR) << "Failed to send object spilling request: "
<< status.ToString();
if (callback) {
callback(status);
}
} else {
RAY_CHECK(static_cast<size_t>(r.spilled_objects_url_size()) ==
objects_ids.size());
for (size_t i = 0; i < objects_ids.size(); ++i) {
const ObjectID &object_id = objects_ids[i];
const std::string &object_url = r.spilled_objects_url(i);
RAY_LOG(DEBUG) << "Object " << object_id << " spilled at " << object_url;
// TODO(suquark): write to object directory.
spilled_objects_[object_id] = object_url;
auto search = pinned_objects_.find(object_id);
if (search != pinned_objects_.end()) {
pinned_objects_.erase(search);
} else {
RAY_LOG(ERROR) << "The spilled object " << object_id.Hex()
<< " is not pinned.";
}
// Write to object directory. Wait for the write to finish before
// releasing the object to make sure that the spilled object can
// be retrieved by other raylets.
RAY_CHECK_OK(gcs_client_->Objects().AsyncAddSpilledUrl(
object_id, object_url, [this, object_id, callback](Status status) {
RAY_CHECK_OK(status);
// Unpin the object.
// NOTE(swang): Due to a race condition, the object may not be in
// the map yet. In that case, the owner will respond to the
// WaitForObjectEvictionRequest and we will unpin the object
// then.
pinned_objects_.erase(object_id);
if (callback) {
callback(status);
}
}));
}
}
if (callback) {
callback(status);
}
});
});
}
void NodeManager::RestoreSpilledObjects(
const std::vector<ObjectID> &object_ids,
void NodeManager::AsyncRestoreSpilledObject(
const ObjectID &object_id, const std::string &object_url,
std::function<void(const ray::Status &)> callback) {
std::vector<std::string> object_urls;
object_urls.reserve(object_ids.size());
for (const auto &object_id : object_ids) {
if (spilled_objects_.count(object_id) == 0) {
callback(Status::Invalid("No object URL recorded"));
return;
}
object_urls.push_back(spilled_objects_[object_id]);
}
worker_pool_.PopIOWorker([this, object_urls,
RAY_LOG(DEBUG) << "Restoring spilled object " << object_id << " from URL "
<< object_url;
worker_pool_.PopIOWorker([this, object_url,
callback](std::shared_ptr<WorkerInterface> io_worker) {
RAY_LOG(DEBUG) << "Sending restore spilled object request";
rpc::RestoreSpilledObjectsRequest request;
for (const auto &url : object_urls) {
request.add_spilled_objects_url(std::move(url));
}
request.add_spilled_objects_url(std::move(object_url));
io_worker->rpc_client()->RestoreSpilledObjects(
request, [this, callback, io_worker](const ray::Status &status,
const rpc::RestoreSpilledObjectsReply &r) {
@ -1226,24 +1211,6 @@ void NodeManager::ProcessClientMessage(const std::shared_ptr<ClientConnection> &
case protocol::MessageType::SubscribePlasmaReady: {
ProcessSubscribePlasmaReady(client, message_data);
} break;
case protocol::MessageType::ForceRestoreSpilledObjectsRequest: {
auto message =
flatbuffers::GetRoot<protocol::ForceRestoreSpilledObjectsRequest>(message_data);
std::vector<ObjectID> object_ids = from_flatbuf<ObjectID>(*message->object_ids());
RestoreSpilledObjects(object_ids, [this, client](const ray::Status &status) {
flatbuffers::FlatBufferBuilder fbb;
flatbuffers::Offset<protocol::ForceRestoreSpilledObjectsReply> reply =
protocol::CreateForceRestoreSpilledObjectsReply(fbb);
fbb.Finish(reply);
auto reply_status = client->WriteMessage(
static_cast<int64_t>(protocol::MessageType::ForceRestoreSpilledObjectsReply),
fbb.GetSize(), fbb.GetBufferPointer());
if (!reply_status.ok()) {
// We failed to write to the client, so disconnect the client.
ProcessDisconnectClientMessage(client);
}
});
} break;
default:
RAY_LOG(FATAL) << "Received unexpected message type " << message_type;
}
@ -1515,23 +1482,15 @@ void NodeManager::ProcessFetchOrReconstructMessage(
const auto refs =
FlatbufferToObjectReference(*message->object_ids(), *message->owner_addresses());
if (message->fetch_only()) {
std::vector<ObjectID> spilled_object_ids;
for (const auto &ref : refs) {
ObjectID object_id = ObjectID::FromBinary(ref.object_id());
// If only a fetch is required, then do not subscribe to the
// dependencies to the task dependency manager.
if (!task_dependency_manager_.CheckObjectLocal(object_id)) {
if (spilled_objects_.count(object_id) > 0) {
spilled_object_ids.push_back(object_id);
} else {
// Fetch the object if it's not already local.
RAY_CHECK_OK(object_manager_.Pull(object_id, ref.owner_address()));
}
// Fetch the object if it's not already local.
RAY_CHECK_OK(object_manager_.Pull(object_id, ref.owner_address()));
}
}
if (spilled_object_ids.size() > 0) {
RestoreSpilledObjects(spilled_object_ids);
}
} else {
// The values are needed. Add all requested objects to the list to
// subscribe to in the task dependency manager. These objects will be
@ -2278,45 +2237,6 @@ void NodeManager::MarkObjectsAsFailed(
}
}
void NodeManager::TreatTaskAsFailedIfLost(const Task &task) {
const TaskSpecification &spec = task.GetTaskSpecification();
RAY_LOG(DEBUG) << "Treating task " << spec.TaskId()
<< " as failed if return values lost.";
// Loop over the return IDs (except the dummy ID) and check whether a
// location for the return ID exists.
int64_t num_returns = spec.NumReturns();
if (spec.IsActorCreationTask()) {
// TODO(rkn): We subtract 1 to avoid the dummy ID. However, this leaks
// information about the TaskSpecification implementation.
num_returns -= 1;
}
// Use a shared flag to make sure that we only treat the task as failed at
// most once. This flag will get deallocated once all of the object table
// lookup callbacks are fired.
auto task_marked_as_failed = std::make_shared<bool>(false);
for (int64_t i = 0; i < num_returns; i++) {
const ObjectID object_id = spec.ReturnId(i);
// Lookup the return value's locations.
RAY_CHECK_OK(object_directory_->LookupLocations(
object_id, spec.CallerAddress(),
[this, task_marked_as_failed, task](
const ray::ObjectID &object_id,
const std::unordered_set<ray::NodeID> &clients) {
if (!*task_marked_as_failed) {
// Only process the object locations if we haven't already marked the
// task as failed.
if (clients.empty()) {
// The object does not exist on any nodes but has been created
// before, so the object has been lost. Mark the task as failed to
// prevent any tasks that depend on this object from hanging.
TreatTaskAsFailed(task, ErrorType::OBJECT_UNRECONSTRUCTABLE);
*task_marked_as_failed = true;
}
}
}));
}
}
void NodeManager::SubmitTask(const Task &task) {
stats::TaskCountReceived().Record(1);
const TaskSpecification &spec = task.GetTaskSpecification();

View file

@ -168,6 +168,14 @@ class NodeManager : public rpc::NodeManagerServiceHandler {
/// Get the port of the node manager rpc server.
int GetServerPort() const { return node_manager_server_.GetPort(); }
/// Restore a spilled object from external storage back into local memory.
/// \param object_id The ID of the object to restore.
/// \param object_url The URL in external storage from which the object can be restored.
/// \param callback A callback to call when the restoration is done. Status
/// will contain the error during restoration, if any.
void AsyncRestoreSpilledObject(const ObjectID &object_id, const std::string &object_url,
std::function<void(const ray::Status &)> callback);
private:
/// Methods for handling clients.
@ -260,14 +268,6 @@ class NodeManager : public rpc::NodeManagerServiceHandler {
void MarkObjectsAsFailed(const ErrorType &error_type,
const std::vector<rpc::ObjectReference> object_ids,
const JobID &job_id);
/// This is similar to TreatTaskAsFailed, but it will only mark the task as
/// failed if at least one of the task's return values is lost. A return
/// value is lost if it has been created before, but no longer exists on any
/// nodes, due to either node failure or eviction.
///
/// \param task The task to potentially fail.
/// \return Void.
void TreatTaskAsFailedIfLost(const Task &task);
/// Handle specified task's submission to the local node manager.
///
/// \param task The task being submitted.
@ -658,11 +658,6 @@ class NodeManager : public rpc::NodeManagerServiceHandler {
void SpillObjects(const std::vector<ObjectID> &objects_ids_to_spill,
std::function<void(const ray::Status &)> callback = nullptr);
/// Restore spilled objects from external storage.
/// \param object_ids Objects to be restored.
/// \param callback Callback that receives a ray::Status describing the
/// outcome; defaults to nullptr when the caller does not supply one.
void RestoreSpilledObjects(const std::vector<ObjectID> &object_ids,
std::function<void(const ray::Status &)> callback = nullptr);
/// Push an error to the driver if this node is full of actors and so we are
/// unable to schedule new tasks or actors at all.
void WarnResourceDeadlock();
@ -754,9 +749,6 @@ class NodeManager : public rpc::NodeManagerServiceHandler {
/// A mapping from actor ID to registration information about that actor
/// (including which node manager owns it).
std::unordered_map<ActorID, ActorRegistration> actor_registry_;
/// A mapping from ObjectIDs to external object URLs for spilled objects.
/// TODO(suquark): Move it into object directory.
absl::flat_hash_map<ObjectID, std::string> spilled_objects_;
/// This map stores actor ID to the ID of the checkpoint that will be used to
/// restore the actor.
std::unordered_map<ActorID, ActorCheckpointID> checkpoint_id_to_restore_;

View file

@ -69,8 +69,12 @@ Raylet::Raylet(boost::asio::io_service &main_service, const std::string &socket_
gcs_client_))
: std::dynamic_pointer_cast<ObjectDirectoryInterface>(
std::make_shared<ObjectDirectory>(main_service, gcs_client_))),
object_manager_(main_service, self_node_id_, object_manager_config,
object_directory_),
object_manager_(
main_service, self_node_id_, object_manager_config, object_directory_,
[this](const ObjectID &object_id, const std::string &spilled_url,
std::function<void(const ray::Status &)> callback) {
node_manager_.AsyncRestoreSpilledObject(object_id, spilled_url, callback);
}),
node_manager_(main_service, self_node_id_, node_manager_config, object_manager_,
gcs_client_, object_directory_),
socket_name_(socket_name),

View file

@ -179,8 +179,9 @@ void ReconstructionPolicy::HandleTaskLeaseExpired(const TaskID &task_id) {
created_object_id, it->second.owner_addresses[created_object_id],
[this, task_id, reconstruction_attempt](
const ray::ObjectID &object_id,
const std::unordered_set<ray::NodeID> &clients) {
if (clients.empty()) {
const std::unordered_set<ray::NodeID> &clients,
const std::string &spilled_url) {
if (clients.empty() && spilled_url.empty()) {
// The required object no longer exists on any live nodes. Attempt
// reconstruction.
AttemptReconstruction(task_id, object_id, reconstruction_attempt);

View file

@ -56,9 +56,9 @@ class MockObjectDirectory : public ObjectDirectoryInterface {
const ObjectID object_id = callback.first;
auto it = locations_.find(object_id);
if (it == locations_.end()) {
callback.second(object_id, std::unordered_set<ray::NodeID>());
callback.second(object_id, std::unordered_set<ray::NodeID>(), "");
} else {
callback.second(object_id, it->second);
callback.second(object_id, it->second, "");
}
}
callbacks_.clear();

View file

@ -172,7 +172,8 @@ Process WorkerPool::StartWorkerProcess(const Language &language,
const JobID &job_id,
std::vector<std::string> dynamic_options) {
rpc::JobConfig *job_config = nullptr;
if (RayConfig::instance().enable_multi_tenancy()) {
if (RayConfig::instance().enable_multi_tenancy() &&
worker_type != rpc::WorkerType::IO_WORKER) {
RAY_CHECK(!job_id.IsNil());
auto it = unfinished_jobs_.find(job_id);
if (it == unfinished_jobs_.end()) {
@ -213,15 +214,15 @@ Process WorkerPool::StartWorkerProcess(const Language &language,
}
}
if (RayConfig::instance().enable_multi_tenancy() &&
!job_config->jvm_options().empty()) {
// Note that we push the items to the front of the vector to make
// sure these options are fresher than the others.
dynamic_options.insert(dynamic_options.begin(), job_config->jvm_options().begin(),
job_config->jvm_options().end());
}
// For non-multi-tenancy mode, job code search path is embedded in worker_command.
if (RayConfig::instance().enable_multi_tenancy() && job_config) {
// Note that we push the items to the front of the vector to make
// sure these options are fresher than the others.
if (!job_config->jvm_options().empty()) {
dynamic_options.insert(dynamic_options.begin(), job_config->jvm_options().begin(),
job_config->jvm_options().end());
}
std::string code_search_path_str;
for (int i = 0; i < job_config->code_search_path_size(); i++) {
auto path = job_config->code_search_path(i);
@ -314,11 +315,11 @@ Process WorkerPool::StartWorkerProcess(const Language &language,
}
ProcessEnvironment env;
if (RayConfig::instance().enable_multi_tenancy()) {
if (RayConfig::instance().enable_multi_tenancy() && job_config) {
env.insert(job_config->worker_env().begin(), job_config->worker_env().end());
}
Process proc = StartProcess(worker_command_args, env);
if (RayConfig::instance().enable_multi_tenancy()) {
if (RayConfig::instance().enable_multi_tenancy() && job_config) {
// If the pid is reused between processes, the old process must have exited.
// So it's safe to bind the pid with another job ID.
RAY_LOG(DEBUG) << "Worker process " << proc.GetId() << " is bound to job " << job_id;
@ -464,7 +465,8 @@ Status WorkerPool::RegisterWorker(const std::shared_ptr<WorkerInterface> &worker
state.registered_workers.insert(worker);
if (RayConfig::instance().enable_multi_tenancy()) {
if (RayConfig::instance().enable_multi_tenancy() &&
worker->GetWorkerType() != rpc::WorkerType::IO_WORKER) {
auto dedicated_workers_it = state.worker_pids_to_assigned_jobs.find(pid);
RAY_CHECK(dedicated_workers_it != state.worker_pids_to_assigned_jobs.end());
auto job_id = dedicated_workers_it->second;

View file

@ -331,23 +331,6 @@ void raylet::RayletClient::RequestObjectSpillage(
grpc_client_->RequestObjectSpillage(request, callback);
}
/// Send a ForceRestoreSpilledObjectsRequest for the given objects over the
/// raylet connection and read back the matching reply.
/// \param object_ids The IDs of objects to be restored.
/// \return Status::OK() when the request/reply exchange succeeds; otherwise
/// the non-OK status produced by that exchange.
Status raylet::RayletClient::ForceRestoreSpilledObjects(
    const std::vector<ObjectID> &object_ids) {
  flatbuffers::FlatBufferBuilder fbb;
  // Serialize the ID vector first, then wrap it in the request table.
  const auto ids_offset = to_flatbuf(fbb, object_ids);
  fbb.Finish(protocol::CreateForceRestoreSpilledObjectsRequest(fbb, ids_offset));

  std::vector<uint8_t> reply_bytes;
  RAY_RETURN_NOT_OK(conn_->AtomicRequestReply(
      MessageType::ForceRestoreSpilledObjectsRequest,
      MessageType::ForceRestoreSpilledObjectsReply, &reply_bytes, &fbb));

  // The reply root is looked up but its contents are not used.
  RAY_UNUSED(flatbuffers::GetRoot<protocol::ForceRestoreSpilledObjectsReply>(
      reply_bytes.data()));
  return Status::OK();
}
Status raylet::RayletClient::ReturnWorker(int worker_port, const WorkerID &worker_id,
bool disconnect_worker) {
rpc::ReturnWorkerRequest request;

View file

@ -341,11 +341,6 @@ class RayletClient : public PinObjectsInterface,
const ObjectID &object_id,
const rpc::ClientCallback<rpc::RequestObjectSpillageReply> &callback);
/// Restore spilled objects from external storage.
/// \param object_ids The IDs of objects to be restored.
/// \return ray::Status
ray::Status ForceRestoreSpilledObjects(const std::vector<ObjectID> &object_ids);
/// Implements WorkerLeaseInterface.
void RequestWorkerLease(
const ray::TaskSpecification &resource_spec,