[object store refactor 4/n] object lifecycle manager (#17344)

* lifecycle

* address comments
This commit is contained in:
Chen Shen 2021-08-16 09:58:35 -07:00 committed by GitHub
parent 7d690e7231
commit b349c6bc4f
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
19 changed files with 1046 additions and 406 deletions

View file

@ -334,6 +334,7 @@ cc_library(
"src/ray/object_manager/plasma/create_request_queue.cc", "src/ray/object_manager/plasma/create_request_queue.cc",
"src/ray/object_manager/plasma/dlmalloc.cc", "src/ray/object_manager/plasma/dlmalloc.cc",
"src/ray/object_manager/plasma/eviction_policy.cc", "src/ray/object_manager/plasma/eviction_policy.cc",
"src/ray/object_manager/plasma/object_lifecycle_manager.cc",
"src/ray/object_manager/plasma/object_store.cc", "src/ray/object_manager/plasma/object_store.cc",
"src/ray/object_manager/plasma/plasma_allocator.cc", "src/ray/object_manager/plasma/plasma_allocator.cc",
"src/ray/object_manager/plasma/store.cc", "src/ray/object_manager/plasma/store.cc",
@ -344,6 +345,7 @@ cc_library(
"src/ray/object_manager/plasma/allocator.h", "src/ray/object_manager/plasma/allocator.h",
"src/ray/object_manager/plasma/create_request_queue.h", "src/ray/object_manager/plasma/create_request_queue.h",
"src/ray/object_manager/plasma/eviction_policy.h", "src/ray/object_manager/plasma/eviction_policy.h",
"src/ray/object_manager/plasma/object_lifecycle_manager.h",
"src/ray/object_manager/plasma/object_store.h", "src/ray/object_manager/plasma/object_store.h",
"src/ray/object_manager/plasma/plasma_allocator.h", "src/ray/object_manager/plasma/plasma_allocator.h",
"src/ray/object_manager/plasma/store.h", "src/ray/object_manager/plasma/store.h",
@ -990,6 +992,20 @@ cc_test(
], ],
) )
cc_test(
name = "object_lifecycle_manager_test",
srcs = [
"src/ray/object_manager/plasma/test/object_lifecycle_manager_test.cc",
],
copts = COPTS,
deps = [
":plasma_store_server_lib",
"@com_google_absl//absl/random",
"@com_google_absl//absl/strings:str_format",
"@com_google_googletest//:gtest_main",
],
)
cc_test( cc_test(
name = "create_request_queue_test", name = "create_request_queue_test",
size = "small", size = "small",

View file

@ -253,9 +253,6 @@ RAY_CONFIG(bool, gcs_grpc_based_pubsub, false)
/// Duration to sleep after failing to put an object in plasma because it is full. /// Duration to sleep after failing to put an object in plasma because it is full.
RAY_CONFIG(uint32_t, object_store_full_delay_ms, 10) RAY_CONFIG(uint32_t, object_store_full_delay_ms, 10)
/// The amount of time to wait between logging plasma space usage debug messages.
RAY_CONFIG(uint64_t, object_store_usage_log_interval_s, 10 * 60)
/// The threshold to trigger a global gc /// The threshold to trigger a global gc
RAY_CONFIG(double, high_plasma_storage_usage, 0.7) RAY_CONFIG(double, high_plasma_storage_usage, 0.7)

View file

@ -23,6 +23,7 @@
#include <string> #include <string>
#include <unordered_map> #include <unordered_map>
#include "gtest/gtest.h"
#include "ray/common/id.h" #include "ray/common/id.h"
#include "ray/object_manager/common.h" #include "ray/object_manager/common.h"
#include "ray/object_manager/plasma/compat.h" #include "ray/object_manager/plasma/compat.h"
@ -37,9 +38,6 @@ using ray::WorkerID;
enum class ObjectLocation : int32_t { Local, Remote, Nonexistent }; enum class ObjectLocation : int32_t { Local, Remote, Nonexistent };
/// Size of object hash digests.
constexpr int64_t kDigestSize = sizeof(uint64_t);
enum class ObjectState : int { enum class ObjectState : int {
/// Object was created but not sealed in the local Plasma Store. /// Object was created but not sealed in the local Plasma Store.
PLASMA_CREATED = 1, PLASMA_CREATED = 1,
@ -62,6 +60,13 @@ struct Allocation {
/// the total size of this mapped memory. /// the total size of this mapped memory.
int64_t mmap_size; int64_t mmap_size;
// only allow moves.
RAY_DISALLOW_COPY_AND_ASSIGN(Allocation);
Allocation(Allocation &&) noexcept = default;
Allocation &operator=(Allocation &&) noexcept = default;
private:
// Only created by Allocator
Allocation(void *address, int64_t size, MEMFD_TYPE fd, ptrdiff_t offset, int device_num, Allocation(void *address, int64_t size, MEMFD_TYPE fd, ptrdiff_t offset, int device_num,
int64_t mmap_size) int64_t mmap_size)
: address(address), : address(address),
@ -71,27 +76,47 @@ struct Allocation {
device_num(device_num), device_num(device_num),
mmap_size(mmap_size) {} mmap_size(mmap_size) {}
// only allow moves. // Test only
RAY_DISALLOW_COPY_AND_ASSIGN(Allocation); Allocation()
Allocation(Allocation &&) noexcept = default; : address(nullptr), size(0), fd(), offset(0), device_num(0), mmap_size(0) {}
Allocation &operator=(Allocation &&) noexcept = default;
friend class PlasmaAllocator;
friend struct ObjectLifecycleManagerTest;
FRIEND_TEST(ObjectStoreTest, PassThroughTest);
}; };
/// This type is used by the Plasma store. It is here because it is exposed to /// This type is used by the Plasma store. It is here because it is exposed to
/// the eviction policy. /// the eviction policy.
struct LocalObject { class LocalObject {
public:
LocalObject(Allocation allocation); LocalObject(Allocation allocation);
RAY_DISALLOW_COPY_AND_ASSIGN(LocalObject); RAY_DISALLOW_COPY_AND_ASSIGN(LocalObject);
int64_t GetObjectSize() const { return object_info.GetObjectSize(); } int64_t GetObjectSize() const { return object_info.GetObjectSize(); }
bool Sealed() const { return state == ObjectState::PLASMA_SEALED; }
int32_t GetRefCount() const { return ref_count; }
const ray::ObjectInfo &GetObjectInfo() const { return object_info; }
const Allocation &GetAllocation() const { return allocation; }
private:
friend class ObjectStore;
friend class ObjectLifecycleManager;
FRIEND_TEST(ObjectStoreTest, PassThroughTest);
friend struct ObjectLifecycleManagerTest;
FRIEND_TEST(ObjectLifecycleManagerTest, RemoveReferenceOneRefNotSealed);
/// Allocation Info; /// Allocation Info;
Allocation allocation; Allocation allocation;
/// Ray object info; /// Ray object info;
ray::ObjectInfo object_info; ray::ObjectInfo object_info;
/// Number of clients currently using this object. /// Number of clients currently using this object.
mutable int ref_count; /// TODO: ref_count probably shouldn't belong to LocalObject.
mutable int32_t ref_count;
/// Unix epoch of when this object was created. /// Unix epoch of when this object was created.
int64_t create_time; int64_t create_time;
/// How long creation of this object took. /// How long creation of this object took.

View file

@ -90,7 +90,7 @@ int64_t LRUCache::ChooseObjectsToEvict(int64_t num_bytes_required,
return bytes_evicted; return bytes_evicted;
} }
EvictionPolicy::EvictionPolicy(const ObjectStore &object_store, EvictionPolicy::EvictionPolicy(const IObjectStore &object_store,
const IAllocator &allocator) const IAllocator &allocator)
: pinned_memory_bytes_(0), : pinned_memory_bytes_(0),
cache_("global lru", allocator.GetFootprintLimit()), cache_("global lru", allocator.GetFootprintLimit()),

View file

@ -15,6 +15,12 @@
// specific language governing permissions and limitations // specific language governing permissions and limitations
// under the License. // under the License.
// ==== The eviction policy ====
//
// This file contains declaration for all functions and data structures that
// need to be provided if you want to implement a new eviction algorithm for the
// Plasma store.
#pragma once #pragma once
#include <functional> #include <functional>
@ -31,13 +37,71 @@
namespace plasma { namespace plasma {
class Client; /// The eviction policy interface.
class IEvictionPolicy {
public:
virtual ~IEvictionPolicy() = default;
// ==== The eviction policy ==== /// This method will be called whenever an object is first created in order to
// /// add it to the LRU cache. This is done so that the first time, the Plasma
// This file contains declaration for all functions and data structures that /// store calls begin_object_access, we can remove the object from the LRU
// need to be provided if you want to implement a new eviction algorithm for the /// cache.
// Plasma store. ///
/// \param object_id The object ID of the object that was created.
/// \param is_create Whether we are creating a new object (vs reading an object).
virtual void ObjectCreated(const ObjectID &object_id, bool is_create) = 0;
/// This method will be called when the Plasma store needs more space, perhaps
/// to create a new object. When this method is called, the eviction
/// policy will assume that the objects chosen to be evicted will in fact be
/// evicted from the Plasma store by the caller.
///
/// \param size The size in bytes of the new object, including both data and
/// metadata.
/// \param objects_to_evict The object IDs that were chosen for eviction will
/// be stored into this vector.
/// \return The number of bytes of space that is still needed, if
/// any. If negative, then the required space has been made.
virtual int64_t RequireSpace(int64_t size, std::vector<ObjectID> *objects_to_evict) = 0;
/// This method will be called whenever an unused object in the Plasma store
/// starts to be used. When this method is called, the eviction policy will
/// assume that the objects chosen to be evicted will in fact be evicted from
/// the Plasma store by the caller.
///
/// \param object_id The ID of the object that is now being used.
virtual void BeginObjectAccess(const ObjectID &object_id) = 0;
/// This method will be called whenever an object in the Plasma store that was
/// being used is no longer being used. When this method is called, the
/// eviction policy will assume that the objects chosen to be evicted will in
/// fact be evicted from the Plasma store by the caller.
///
/// \param object_id The ID of the object that is no longer being used.
virtual void EndObjectAccess(const ObjectID &object_id) = 0;
/// Choose some objects to evict from the Plasma store. When this method is
/// called, the eviction policy will assume that the objects chosen to be
/// evicted will in fact be evicted from the Plasma store by the caller.
///
/// @note This method is not part of the API. It is exposed in the header file
/// only for testing.
///
/// \param num_bytes_required The number of bytes of space to try to free up.
/// \param objects_to_evict The object IDs that were chosen for eviction will
/// be stored into this vector.
/// \return The total number of bytes of space chosen to be evicted.
virtual int64_t ChooseObjectsToEvict(int64_t num_bytes_required,
std::vector<ObjectID> *objects_to_evict) = 0;
/// This method will be called when an object is going to be removed
///
/// \param object_id The ID of the object that is now being used.
virtual void RemoveObject(const ObjectID &object_id) = 0;
/// Returns debugging information for this eviction policy.
virtual std::string DebugString() const = 0;
};
class LRUCache { class LRUCache {
public: public:
@ -91,81 +155,27 @@ class LRUCache {
int64_t bytes_evicted_total_; int64_t bytes_evicted_total_;
}; };
/// The eviction policy. /// The eviction policy implementation
class EvictionPolicy { class EvictionPolicy : public IEvictionPolicy {
public: public:
/// Construct an eviction policy. EvictionPolicy(const IObjectStore &object_store, const IAllocator &allocator);
///
/// \param object_store Reference to the object_store
/// \param allocator Reference to the allocator
EvictionPolicy(const ObjectStore &object_store, const IAllocator &allocator);
/// Destroy an eviction policy. void ObjectCreated(const ObjectID &object_id, bool is_create) override;
virtual ~EvictionPolicy() {}
/// This method will be called whenever an object is first created in order to int64_t RequireSpace(int64_t size, std::vector<ObjectID> *objects_to_evict) override;
/// add it to the LRU cache. This is done so that the first time, the Plasma
/// store calls begin_object_access, we can remove the object from the LRU
/// cache.
///
/// \param object_id The object ID of the object that was created.
/// \param is_create Whether we are creating a new object (vs reading an object).
virtual void ObjectCreated(const ObjectID &object_id, bool is_create);
/// This method will be called when the Plasma store needs more space, perhaps void BeginObjectAccess(const ObjectID &object_id) override;
/// to create a new object. When this method is called, the eviction
/// policy will assume that the objects chosen to be evicted will in fact be
/// evicted from the Plasma store by the caller.
///
/// \param size The size in bytes of the new object, including both data and
/// metadata.
/// \param objects_to_evict The object IDs that were chosen for eviction will
/// be stored into this vector.
/// \return The number of bytes of space that is still needed, if
/// any. If negative, then the required space has been made.
virtual int64_t RequireSpace(int64_t size, std::vector<ObjectID> *objects_to_evict);
/// This method will be called whenever an unused object in the Plasma store void EndObjectAccess(const ObjectID &object_id) override;
/// starts to be used. When this method is called, the eviction policy will
/// assume that the objects chosen to be evicted will in fact be evicted from
/// the Plasma store by the caller.
///
/// \param object_id The ID of the object that is now being used.
virtual void BeginObjectAccess(const ObjectID &object_id);
/// This method will be called whenever an object in the Plasma store that was int64_t ChooseObjectsToEvict(int64_t num_bytes_required,
/// being used is no longer being used. When this method is called, the std::vector<ObjectID> *objects_to_evict) override;
/// eviction policy will assume that the objects chosen to be evicted will in
/// fact be evicted from the Plasma store by the caller.
///
/// \param object_id The ID of the object that is no longer being used.
virtual void EndObjectAccess(const ObjectID &object_id);
/// Choose some objects to evict from the Plasma store. When this method is void RemoveObject(const ObjectID &object_id) override;
/// called, the eviction policy will assume that the objects chosen to be
/// evicted will in fact be evicted from the Plasma store by the caller.
///
/// @note This method is not part of the API. It is exposed in the header file
/// only for testing.
///
/// \param num_bytes_required The number of bytes of space to try to free up.
/// \param objects_to_evict The object IDs that were chosen for eviction will
/// be stored into this vector.
/// \return The total number of bytes of space chosen to be evicted.
virtual int64_t ChooseObjectsToEvict(int64_t num_bytes_required,
std::vector<ObjectID> *objects_to_evict);
/// This method will be called when an object is going to be removed std::string DebugString() const override;
///
/// \param object_id The ID of the object that is now being used.
virtual void RemoveObject(const ObjectID &object_id);
/// Returns debugging information for this eviction policy. private:
virtual std::string DebugString() const;
int64_t GetPinnedMemoryBytes() const { return pinned_memory_bytes_; }
protected:
/// Returns the size of the object /// Returns the size of the object
int64_t GetObjectSize(const ObjectID &object_id) const; int64_t GetObjectSize(const ObjectID &object_id) const;
@ -175,7 +185,7 @@ class EvictionPolicy {
/// Datastructure for the LRU cache. /// Datastructure for the LRU cache.
LRUCache cache_; LRUCache cache_;
const ObjectStore &object_store_; const IObjectStore &object_store_;
const IAllocator &allocator_; const IAllocator &allocator_;
}; };

View file

@ -26,6 +26,7 @@ namespace plasma {
std::unordered_map<void *, MmapRecord> mmap_records; std::unordered_map<void *, MmapRecord> mmap_records;
namespace internal {
static void *pointer_advance(void *p, ptrdiff_t n) { return (unsigned char *)p + n; } static void *pointer_advance(void *p, ptrdiff_t n) { return (unsigned char *)p + n; }
static ptrdiff_t pointer_distance(void const *pfrom, void const *pto) { static ptrdiff_t pointer_distance(void const *pfrom, void const *pto) {
@ -52,5 +53,5 @@ bool GetMallocMapinfo(const void *const addr, MEMFD_TYPE *fd, int64_t *map_size,
return false; return false;
} }
} // namespace internal
} // namespace plasma } // namespace plasma

View file

@ -41,11 +41,9 @@ struct MmapRecord {
/// and size. /// and size.
extern std::unordered_map<void *, MmapRecord> mmap_records; extern std::unordered_map<void *, MmapRecord> mmap_records;
/// private function, only used by PlasmaAllocator to look up Mmap information /// private function, only used by PlasmaAllocator
/// given an address allocated by dlmalloc. namespace internal {
///
/// \return true if look up succeed. false means the address is not allocated
/// by dlmalloc.
bool GetMallocMapinfo(const void *const addr, MEMFD_TYPE *fd, int64_t *map_length, bool GetMallocMapinfo(const void *const addr, MEMFD_TYPE *fd, int64_t *map_length,
ptrdiff_t *offset); ptrdiff_t *offset);
}
} // namespace plasma } // namespace plasma

View file

@ -0,0 +1,278 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "absl/time/clock.h"
#include "ray/common/ray_config.h"
#include "ray/object_manager/plasma/object_lifecycle_manager.h"
namespace plasma {
using namespace flatbuf;
ObjectLifecycleManager::ObjectLifecycleManager(
IAllocator &allocator, ray::DeleteObjectCallback delete_object_callback)
: object_store_(std::make_unique<ObjectStore>(allocator)),
eviction_policy_(std::make_unique<EvictionPolicy>(*object_store_, allocator)),
delete_object_callback_(delete_object_callback),
earger_deletion_objects_(),
num_bytes_in_use_(0) {}
std::pair<const LocalObject *, flatbuf::PlasmaError> ObjectLifecycleManager::CreateObject(
const ray::ObjectInfo &object_info, plasma::flatbuf::ObjectSource source,
bool fallback_allocator) {
RAY_LOG(DEBUG) << "attempting to create object " << object_info.object_id << " size "
<< object_info.data_size;
if (object_store_->GetObject(object_info.object_id) != nullptr) {
return {nullptr, PlasmaError::ObjectExists};
}
auto entry = CreateObjectInternal(object_info, source, fallback_allocator);
if (entry == nullptr) {
return {nullptr, PlasmaError::OutOfMemory};
}
eviction_policy_->ObjectCreated(object_info.object_id, /*is_create=*/true);
return {entry, PlasmaError::OK};
}
const LocalObject *ObjectLifecycleManager::GetObject(const ObjectID &object_id) const {
return object_store_->GetObject(object_id);
}
const LocalObject *ObjectLifecycleManager::SealObject(const ObjectID &object_id) {
// TODO(scv119): should we check delete object from earger_deletion_objects_?
return object_store_->SealObject(object_id);
}
flatbuf::PlasmaError ObjectLifecycleManager::AbortObject(const ObjectID &object_id) {
auto entry = object_store_->GetObject(object_id);
if (entry == nullptr) {
RAY_LOG(ERROR) << "To abort an object it must be in the object table.";
return PlasmaError::ObjectNonexistent;
}
if (entry->state == ObjectState::PLASMA_SEALED) {
RAY_LOG(ERROR) << "To abort an object it must not have been sealed.";
return PlasmaError::ObjectSealed;
}
if (entry->ref_count > 0) {
// A client was using this object.
num_bytes_in_use_ -= entry->GetObjectSize();
RAY_LOG(DEBUG) << "Erasing object " << object_id << " with nonzero ref count"
<< object_id << ", num bytes in use is now " << num_bytes_in_use_;
}
DeleteObjectInternal(object_id);
return PlasmaError::OK;
}
PlasmaError ObjectLifecycleManager::DeleteObject(const ObjectID &object_id) {
auto entry = object_store_->GetObject(object_id);
if (entry == nullptr) {
return PlasmaError::ObjectNonexistent;
}
// TODO(scv119): should we delete unsealed with ref_count 0?
if (entry->state != ObjectState::PLASMA_SEALED) {
// To delete an object it must have been sealed,
// otherwise there might be memeory corruption.
// Put it into deletion cache, it will be deleted later.
earger_deletion_objects_.emplace(object_id);
return PlasmaError::ObjectNotSealed;
}
if (entry->ref_count != 0) {
// To delete an object, there must be no clients currently using it.
// Put it into deletion cache, it will be deleted later.
earger_deletion_objects_.emplace(object_id);
return PlasmaError::ObjectInUse;
}
DeleteObjectInternal(object_id);
return PlasmaError::OK;
}
int64_t ObjectLifecycleManager::RequireSpace(int64_t size) {
std::vector<ObjectID> objects_to_evict;
int64_t num_bytes_evicted =
eviction_policy_->ChooseObjectsToEvict(size, &objects_to_evict);
EvictObjects(objects_to_evict);
return num_bytes_evicted;
}
bool ObjectLifecycleManager::AddReference(const ObjectID &object_id) {
auto entry = object_store_->GetObject(object_id);
if (!entry) {
RAY_LOG(ERROR) << object_id << " doesn't exist, add reference failed.";
return false;
}
// If there are no other clients using this object, notify the eviction policy
// that the object is being used.
if (entry->ref_count == 0) {
// Tell the eviction policy that this object is being used.
eviction_policy_->BeginObjectAccess(object_id);
num_bytes_in_use_ += entry->GetObjectSize();
}
// Increase reference count.
entry->ref_count++;
RAY_LOG(DEBUG) << "Object " << object_id << " reference has incremented"
<< ", num bytes in use is now " << num_bytes_in_use_;
return true;
}
bool ObjectLifecycleManager::RemoveReference(const ObjectID &object_id) {
auto entry = object_store_->GetObject(object_id);
if (!entry || entry->ref_count == 0) {
RAY_LOG(ERROR)
<< object_id
<< " doesn't exist, or its ref count is already 0, remove reference failed.";
return false;
}
entry->ref_count--;
if (entry->ref_count > 0) {
return true;
}
num_bytes_in_use_ -= entry->GetObjectSize();
RAY_LOG(DEBUG) << "Releasing object no longer in use " << object_id
<< ", num bytes in use is now " << num_bytes_in_use_;
eviction_policy_->EndObjectAccess(object_id);
// TODO(scv119): handle this anomaly in upper layer.
RAY_CHECK(entry->Sealed()) << object_id << " is not sealed while ref count becomes 0.";
if (earger_deletion_objects_.count(object_id) > 0) {
DeleteObjectInternal(object_id);
}
return true;
}
std::string ObjectLifecycleManager::EvictionPolicyDebugString() const {
return eviction_policy_->DebugString();
}
const LocalObject *ObjectLifecycleManager::CreateObjectInternal(
const ray::ObjectInfo &object_info, plasma::flatbuf::ObjectSource source,
bool allow_fallback_allocation) {
// Try to evict objects until there is enough space.
// NOTE(ekl) if we can't achieve this after a number of retries, it's
// because memory fragmentation in dlmalloc prevents us from allocating
// even if our footprint tracker here still says we have free space.
for (int num_tries = 0; num_tries <= 10; num_tries++) {
auto result =
object_store_->CreateObject(object_info, source, /*fallback_allocate*/ false);
if (result != nullptr) {
return result;
}
// Tell the eviction policy how much space we need to create this object.
std::vector<ObjectID> objects_to_evict;
int64_t space_needed =
eviction_policy_->RequireSpace(object_info.GetObjectSize(), &objects_to_evict);
EvictObjects(objects_to_evict);
// More space is still needed.
if (space_needed > 0) {
RAY_LOG(DEBUG) << "attempt to allocate " << object_info.GetObjectSize()
<< " failed, need " << space_needed;
break;
}
}
if (!allow_fallback_allocation) {
RAY_LOG(DEBUG) << "Fallback allocation not enabled for this request.";
return nullptr;
}
RAY_LOG(INFO)
<< "Shared memory store full, falling back to allocating from filesystem: "
<< object_info.GetObjectSize();
auto result =
object_store_->CreateObject(object_info, source, /*fallback_allocate*/ true);
if (result == nullptr) {
RAY_LOG(ERROR) << "Plasma fallback allocator failed, likely out of disk space.";
}
return result;
}
void ObjectLifecycleManager::EvictObjects(const std::vector<ObjectID> &object_ids) {
for (const auto &object_id : object_ids) {
RAY_LOG(DEBUG) << "evicting object " << object_id.Hex();
auto entry = object_store_->GetObject(object_id);
// TODO(rkn): This should probably not fail, but should instead throw an
// error. Maybe we should also support deleting objects that have been
// created but not sealed.
RAY_CHECK(entry != nullptr) << "To evict an object it must be in the object table.";
RAY_CHECK(entry->state == ObjectState::PLASMA_SEALED)
<< "To evict an object it must have been sealed.";
RAY_CHECK(entry->ref_count == 0)
<< "To evict an object, there must be no clients currently using it.";
DeleteObjectInternal(object_id);
}
}
void ObjectLifecycleManager::DeleteObjectInternal(const ObjectID &object_id) {
auto entry = object_store_->GetObject(object_id);
RAY_CHECK(entry != nullptr);
bool aborted = entry->state == ObjectState::PLASMA_CREATED;
earger_deletion_objects_.erase(object_id);
eviction_policy_->RemoveObject(object_id);
object_store_->DeleteObject(object_id);
if (!aborted) {
// only send notification if it's not aborted.
delete_object_callback_(object_id);
}
}
int64_t ObjectLifecycleManager::GetNumBytesInUse() const { return num_bytes_in_use_; }
bool ObjectLifecycleManager::IsObjectSealed(const ObjectID &object_id) const {
auto entry = GetObject(object_id);
return entry && entry->state == ObjectState::PLASMA_SEALED;
}
int64_t ObjectLifecycleManager::GetNumBytesCreatedTotal() const {
return object_store_->GetNumBytesCreatedTotal();
}
int64_t ObjectLifecycleManager::GetNumBytesUnsealed() const {
return object_store_->GetNumBytesUnsealed();
}
int64_t ObjectLifecycleManager::GetNumObjectsUnsealed() const {
return object_store_->GetNumObjectsUnsealed();
}
void ObjectLifecycleManager::GetDebugDump(std::stringstream &buffer) const {
return object_store_->GetDebugDump(buffer);
}
// For test only.
ObjectLifecycleManager::ObjectLifecycleManager(
std::unique_ptr<IObjectStore> store, std::unique_ptr<IEvictionPolicy> eviction_policy,
ray::DeleteObjectCallback delete_object_callback)
: object_store_(std::move(store)),
eviction_policy_(std::move(eviction_policy)),
delete_object_callback_(delete_object_callback),
earger_deletion_objects_(),
num_bytes_in_use_(0) {}
} // namespace plasma

View file

@ -0,0 +1,154 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include "absl/container/flat_hash_set.h"
#include "gtest/gtest.h"
#include "ray/object_manager/plasma/common.h"
#include "ray/object_manager/plasma/eviction_policy.h"
#include "ray/object_manager/plasma/object_store.h"
#include "ray/object_manager/plasma/plasma_allocator.h"
namespace plasma {
// ObjectLifecycleManager allocates LocalObjects from the allocator.
// It tracks objects lifecycle states such as reference count or object states
// (created/sealed). It lazily garbage collects objects when running out of space.
class ObjectLifecycleManager {
public:
ObjectLifecycleManager(IAllocator &allocator,
ray::DeleteObjectCallback delete_object_callback);
/// Create a new object given object's info. Object creation might
/// fail if runs out of space; or an object with the same id exists.
///
/// \param object_info Plasma object info.
/// \param source From where the object is created.
/// \param fallback_allocator Whether to allow fallback allocation.
/// \return
/// - pointer to created object and PlasmaError::OK when succeeds.
/// - nullptr and error message, including ObjectExists/OutOfMemory
/// TODO(scv119): use RAII instead of pointer for returned object.
std::pair<const LocalObject *, flatbuf::PlasmaError> CreateObject(
const ray::ObjectInfo &object_info, plasma::flatbuf::ObjectSource source,
bool fallback_allocator);
/// Get object by id.
/// \return
/// - nullptr if such object doesn't exist.
/// - otherwise, pointer to the object.
const LocalObject *GetObject(const ObjectID &object_id) const;
/// Seal created object by id.
///
/// \param object_id Object ID of the object to be sealed.
/// \return
/// - nulltpr if such object doesn't exist, or the object has already been sealed.
/// - otherise, pointer to the sealed object.
const LocalObject *SealObject(const ObjectID &object_id);
/// Abort object creation by id. It deletes the object regardless of reference
/// counting.
///
/// \param object_id Object ID of the object to be aborted.
/// \return One of the following error codes:
/// - PlasmaError::OK, if the object was aborted successfully.
/// - PlasmaError::ObjectNonexistent, if ths object doesn't exist.
/// - PlasmaError::ObjectSealed, if ths object has already been sealed.
flatbuf::PlasmaError AbortObject(const ObjectID &object_id);
/// Delete a specific object by object_id. The object is delete immediately
/// if it's been sealed and reference counting is zero. Otherwise it will be
/// asynchronously deleted once there is no usage.
///
/// \param object_id Object ID of the object to be deleted.
/// \return One of the following error codes:
/// - PlasmaError::OK, if the object was delete successfully.
/// - PlasmaError::ObjectNonexistent, if ths object doesn't exist.
/// - PlasmaError::ObjectNotsealed, if ths object is created but not sealed.
/// - PlasmaError::ObjectInUse, if the object is in use; it will be deleted
/// once it's no longer used (ref count becomes 0).
flatbuf::PlasmaError DeleteObject(const ObjectID &object_id);
/// Bump up the reference count of the object.
///
/// \return true if object exists, false otherise.
bool AddReference(const ObjectID &object_id);
/// Decrese the reference count of the object. When reference count
/// drop to zero the object becomes evictable.
///
/// \return true if object exists and reference count is greater than 0, false otherise.
bool RemoveReference(const ObjectID &object_id);
/// Ask it to evict objects until we have at least size of capacity
/// available.
/// TEST ONLY
///
/// \return The number of bytes evicted.
int64_t RequireSpace(int64_t size);
std::string EvictionPolicyDebugString() const;
bool IsObjectSealed(const ObjectID &object_id) const;
int64_t GetNumBytesInUse() const;
int64_t GetNumBytesCreatedTotal() const;
int64_t GetNumBytesUnsealed() const;
int64_t GetNumObjectsUnsealed() const;
void GetDebugDump(std::stringstream &buffer) const;
private:
// Test only
ObjectLifecycleManager(std::unique_ptr<IObjectStore> store,
std::unique_ptr<IEvictionPolicy> eviction_policy,
ray::DeleteObjectCallback delete_object_callback);
const LocalObject *CreateObjectInternal(const ray::ObjectInfo &object_info,
plasma::flatbuf::ObjectSource source,
bool allow_fallback_allocation);
// Evict objects returned by the eviction policy.
//
// \param object_ids Object IDs of the objects to be evicted.
void EvictObjects(const std::vector<ObjectID> &object_ids);
void DeleteObjectInternal(const ObjectID &object_id);
private:
friend struct ObjectLifecycleManagerTest;
FRIEND_TEST(ObjectLifecycleManagerTest, DeleteFailure);
FRIEND_TEST(ObjectLifecycleManagerTest, RemoveReferenceOneRefEagerlyDeletion);
std::unique_ptr<IObjectStore> object_store_;
std::unique_ptr<IEvictionPolicy> eviction_policy_;
const ray::DeleteObjectCallback delete_object_callback_;
// list of objects which will be removed immediately
// once reference count becomes 0.
absl::flat_hash_set<ObjectID> earger_deletion_objects_;
// Total bytes of the objects whose references are greater than 0.
int64_t num_bytes_in_use_;
};
} // namespace plasma

View file

@ -32,6 +32,9 @@ const LocalObject *ObjectStore::CreateObject(const ray::ObjectInfo &object_info,
auto object_size = object_info.GetObjectSize(); auto object_size = object_info.GetObjectSize();
auto allocation = fallback_allocate ? allocator_.FallbackAllocate(object_size) auto allocation = fallback_allocate ? allocator_.FallbackAllocate(object_size)
: allocator_.Allocate(object_size); : allocator_.Allocate(object_size);
RAY_LOG_EVERY_MS(INFO, 10 * 60 * 1000)
<< "Object store current usage " << (allocator_.Allocated() / 1e9) << " / "
<< (allocator_.GetFootprintLimit() / 1e9) << " GB.";
if (!allocation.has_value()) { if (!allocation.has_value()) {
return nullptr; return nullptr;
} }

View file

@ -24,12 +24,11 @@
namespace plasma { namespace plasma {
// ObjectStore stores objects with unique object id. It uses IAllocator // IObjectStore stores objects with unique object id.
// to allocate memory for object creation. // It's not thread safe.
// ObjectStore is not thread safe. class IObjectStore {
class ObjectStore {
public: public:
explicit ObjectStore(IAllocator &allocator); virtual ~IObjectStore() = default;
/// Create a new object given object's info. Caller need to decide /// Create a new object given object's info. Caller need to decide
/// to use primary allocation or fallback allocation by setting /// to use primary allocation or fallback allocation by setting
@ -41,9 +40,9 @@ class ObjectStore {
/// \param fallback_allocate Whether to use fallback allocation. /// \param fallback_allocate Whether to use fallback allocation.
/// \return /// \return
/// - pointer to created object or nullptr when out of space. /// - pointer to created object or nullptr when out of space.
const LocalObject *CreateObject(const ray::ObjectInfo &object_info, virtual const LocalObject *CreateObject(const ray::ObjectInfo &object_info,
plasma::flatbuf::ObjectSource source, plasma::flatbuf::ObjectSource source,
bool fallback_allocate); bool fallback_allocate) = 0;
/// Get object by id. /// Get object by id.
/// ///
@ -51,31 +50,56 @@ class ObjectStore {
/// \return /// \return
/// - nullptr if such object doesn't exist. /// - nullptr if such object doesn't exist.
/// - otherwise, pointer to the object. /// - otherwise, pointer to the object.
const LocalObject *GetObject(const ObjectID &object_id) const; virtual const LocalObject *GetObject(const ObjectID &object_id) const = 0;
/// Seal created object by id. /// Seal created object by id.
/// ///
/// \param object_id Object ID of the object to be sealed. /// \param object_id Object ID of the object to be sealed.
/// \return /// \return
/// - nulltpr if such object doesn't exist, or the object has already been sealed. /// - nulltpr if such object doesn't exist, or the object has already been sealed.
/// - otherise, pointer to the sealed object.. /// - otherise, pointer to the sealed object.
const LocalObject *SealObject(const ObjectID &object_id); virtual const LocalObject *SealObject(const ObjectID &object_id) = 0;
/// Delete an existing object. /// Delete an existing object.
/// ///
/// \param object_id Object ID of the object to be sealed. /// \param object_id Object ID of the object to be sealed.
/// \return /// \return
/// - false if such object doesn't exist. /// - false if such object doesn't exist.
/// - true if abort successfuly. /// - true if deleted.
bool DeleteObject(const ObjectID &object_id); virtual bool DeleteObject(const ObjectID &object_id) = 0;
int64_t GetNumBytesCreatedTotal() const; virtual int64_t GetNumBytesCreatedTotal() const = 0;
int64_t GetNumBytesUnsealed() const; virtual int64_t GetNumBytesUnsealed() const = 0;
int64_t GetNumObjectsUnsealed() const; virtual int64_t GetNumObjectsUnsealed() const = 0;
void GetDebugDump(std::stringstream &buffer) const; virtual void GetDebugDump(std::stringstream &buffer) const = 0;
};
// ObjectStore implements IObjectStore. It uses IAllocator
// to allocate memory for object creation.
class ObjectStore : public IObjectStore {
public:
explicit ObjectStore(IAllocator &allocator);
const LocalObject *CreateObject(const ray::ObjectInfo &object_info,
plasma::flatbuf::ObjectSource source,
bool fallback_allocate) override;
const LocalObject *GetObject(const ObjectID &object_id) const override;
const LocalObject *SealObject(const ObjectID &object_id) override;
bool DeleteObject(const ObjectID &object_id) override;
int64_t GetNumBytesCreatedTotal() const override;
int64_t GetNumBytesUnsealed() const override;
int64_t GetNumObjectsUnsealed() const override;
void GetDebugDump(std::stringstream &buffer) const override;
private: private:
LocalObject *GetMutableObject(const ObjectID &object_id); LocalObject *GetMutableObject(const ObjectID &object_id);

View file

@ -23,4 +23,5 @@ namespace plasma {
LocalObject::LocalObject(Allocation allocation) LocalObject::LocalObject(Allocation allocation)
: allocation(std::move(allocation)), ref_count(0) {} : allocation(std::move(allocation)), ref_count(0) {}
} // namespace plasma } // namespace plasma

View file

@ -82,6 +82,8 @@ enum PlasmaError:int {
// the result of the same request twice. This is most likely due to a system // the result of the same request twice. This is most likely due to a system
// bug in the plasma store or caller. // bug in the plasma store or caller.
UnexpectedError, UnexpectedError,
// Trying to abort an object but it's not sealed.
ObjectSealed,
} }
// Plasma store messages // Plasma store messages

View file

@ -55,20 +55,6 @@ const size_t kAllocationAlignment = 64;
// bookkeeping. // bookkeeping.
const int64_t kDlMallocReserved = 256 * sizeof(size_t); const int64_t kDlMallocReserved = 256 * sizeof(size_t);
absl::optional<Allocation> BuildAllocation(void *addr, size_t size) {
if (addr == nullptr) {
return absl::nullopt;
}
MEMFD_TYPE fd;
int64_t mmap_size;
ptrdiff_t offset;
if (GetMallocMapinfo(addr, &fd, &mmap_size, &offset)) {
return Allocation(addr, static_cast<int64_t>(size), std::move(fd), offset,
0 /* device_number*/, mmap_size);
}
return absl::nullopt;
}
} // namespace } // namespace
PlasmaAllocator::PlasmaAllocator(const std::string &plasma_directory, PlasmaAllocator::PlasmaAllocator(const std::string &plasma_directory,
@ -138,4 +124,19 @@ int64_t PlasmaAllocator::GetFootprintLimit() const { return kFootprintLimit; }
int64_t PlasmaAllocator::Allocated() const { return allocated_; } int64_t PlasmaAllocator::Allocated() const { return allocated_; }
int64_t PlasmaAllocator::FallbackAllocated() const { return fallback_allocated_; } int64_t PlasmaAllocator::FallbackAllocated() const { return fallback_allocated_; }
absl::optional<Allocation> PlasmaAllocator::BuildAllocation(void *addr, size_t size) {
if (addr == nullptr) {
return absl::nullopt;
}
MEMFD_TYPE fd;
int64_t mmap_size;
ptrdiff_t offset;
if (internal::GetMallocMapinfo(addr, &fd, &mmap_size, &offset)) {
return Allocation(addr, static_cast<int64_t>(size), std::move(fd), offset,
0 /* device_number*/, mmap_size);
}
return absl::nullopt;
}
} // namespace plasma } // namespace plasma

View file

@ -22,6 +22,9 @@
#include <cstdint> #include <cstdint>
#include "ray/object_manager/plasma/allocator.h" #include "ray/object_manager/plasma/allocator.h"
#include "absl/types/optional.h"
#include "ray/object_manager/plasma/common.h"
namespace plasma { namespace plasma {
// PlasmaAllocator that allocates memory from mmaped file to // PlasmaAllocator that allocates memory from mmaped file to
@ -79,6 +82,9 @@ class PlasmaAllocator : public IAllocator {
/// Get the number of bytes fallback allocated so far. /// Get the number of bytes fallback allocated so far.
int64_t FallbackAllocated() const override; int64_t FallbackAllocated() const override;
private:
absl::optional<Allocation> BuildAllocation(void *addr, size_t size);
private: private:
const int64_t kFootprintLimit; const int64_t kFootprintLimit;
const size_t kAlignment; const size_t kAlignment;

View file

@ -68,15 +68,16 @@ ray::ObjectID GetCreateRequestObjectId(const std::vector<uint8_t> &message) {
void ToPlasmaObject(const LocalObject &entry, PlasmaObject *object, bool check_sealed) { void ToPlasmaObject(const LocalObject &entry, PlasmaObject *object, bool check_sealed) {
RAY_DCHECK(object != nullptr); RAY_DCHECK(object != nullptr);
if (check_sealed) { if (check_sealed) {
RAY_DCHECK(entry.state == ObjectState::PLASMA_SEALED); RAY_DCHECK(entry.Sealed());
} }
object->store_fd = entry.allocation.fd; object->store_fd = entry.GetAllocation().fd;
object->data_offset = entry.allocation.offset; object->data_offset = entry.GetAllocation().offset;
object->metadata_offset = entry.allocation.offset + entry.object_info.data_size; object->metadata_offset =
object->data_size = entry.object_info.data_size; entry.GetAllocation().offset + entry.GetObjectInfo().data_size;
object->metadata_size = entry.object_info.metadata_size; object->data_size = entry.GetObjectInfo().data_size;
object->device_num = entry.allocation.device_num; object->metadata_size = entry.GetObjectInfo().metadata_size;
object->mmap_size = entry.allocation.mmap_size; object->device_num = entry.GetAllocation().device_num;
object->mmap_size = entry.GetAllocation().mmap_size;
} }
} // namespace } // namespace
@ -155,15 +156,12 @@ PlasmaStore::PlasmaStore(instrumented_io_context &main_service, IAllocator &allo
acceptor_(main_service, ParseUrlEndpoint(socket_name)), acceptor_(main_service, ParseUrlEndpoint(socket_name)),
socket_(main_service), socket_(main_service),
allocator_(allocator), allocator_(allocator),
object_store_(allocator_),
eviction_policy_(object_store_, allocator_),
spill_objects_callback_(spill_objects_callback), spill_objects_callback_(spill_objects_callback),
add_object_callback_(add_object_callback), add_object_callback_(add_object_callback),
delete_object_callback_(delete_object_callback), delete_object_callback_(delete_object_callback),
object_lifecycle_mgr_(allocator_, delete_object_callback_),
delay_on_oom_ms_(delay_on_oom_ms), delay_on_oom_ms_(delay_on_oom_ms),
object_spilling_threshold_(object_spilling_threshold), object_spilling_threshold_(object_spilling_threshold),
usage_log_interval_ns_(RayConfig::instance().object_store_usage_log_interval_s() *
1e9),
create_request_queue_( create_request_queue_(
/*oom_grace_period_s=*/RayConfig::instance().oom_grace_period_s(), /*oom_grace_period_s=*/RayConfig::instance().oom_grace_period_s(),
spill_objects_callback, object_store_full_callback, spill_objects_callback, object_store_full_callback,
@ -190,72 +188,16 @@ void PlasmaStore::Stop() { acceptor_.close(); }
// If this client is not already using the object, add the client to the // If this client is not already using the object, add the client to the
// object's list of clients, otherwise do nothing. // object's list of clients, otherwise do nothing.
void PlasmaStore::AddToClientObjectIds(const ObjectID &object_id, void PlasmaStore::AddToClientObjectIds(const ObjectID &object_id,
const LocalObject *entry,
const std::shared_ptr<Client> &client) { const std::shared_ptr<Client> &client) {
// Check if this client is already using the object. // Check if this client is already using the object.
if (client->object_ids.find(object_id) != client->object_ids.end()) { if (client->object_ids.find(object_id) != client->object_ids.end()) {
return; return;
} }
// If there are no other clients using this object, notify the eviction policy RAY_CHECK(object_lifecycle_mgr_.AddReference(object_id));
// that the object is being used.
if (entry->ref_count == 0) {
// Tell the eviction policy that this object is being used.
eviction_policy_.BeginObjectAccess(object_id);
num_bytes_in_use_ += entry->GetObjectSize();
}
// Increase reference count.
entry->ref_count++;
RAY_LOG(DEBUG) << "Object " << object_id << " in use by client"
<< ", num bytes in use is now " << num_bytes_in_use_;
// Add object id to the list of object ids that this client is using. // Add object id to the list of object ids that this client is using.
client->object_ids.insert(object_id); client->object_ids.insert(object_id);
} }
const LocalObject *PlasmaStore::CreateObjectInternal(const ray::ObjectInfo &object_info,
plasma::flatbuf::ObjectSource source,
bool allow_fallback_allocation) {
// Try to evict objects until there is enough space.
// NOTE(ekl) if we can't achieve this after a number of retries, it's
// because memory fragmentation in dlmalloc prevents us from allocating
// even if our footprint tracker here still says we have free space.
for (int num_tries = 0; num_tries <= 10; num_tries++) {
auto result =
object_store_.CreateObject(object_info, source, /*fallback_allocate*/ false);
if (result != nullptr) {
return result;
}
// Tell the eviction policy how much space we need to create this object.
std::vector<ObjectID> objects_to_evict;
int64_t space_needed =
eviction_policy_.RequireSpace(object_info.GetObjectSize(), &objects_to_evict);
EvictObjects(objects_to_evict);
// More space is still needed.
if (space_needed > 0) {
RAY_LOG(DEBUG) << "attempt to allocate " << object_info.GetObjectSize()
<< " failed, need " << space_needed;
break;
}
}
if (!allow_fallback_allocation) {
RAY_LOG(DEBUG) << "Fallback allocation not enabled for this request.";
return nullptr;
}
RAY_LOG(INFO)
<< "Shared memory store full, falling back to allocating from filesystem: "
<< object_info.GetObjectSize();
auto result =
object_store_.CreateObject(object_info, source, /*fallback_allocate*/ true);
if (result == nullptr) {
RAY_LOG(ERROR) << "Plasma fallback allocator failed, likely out of disk space.";
}
return result;
}
PlasmaError PlasmaStore::HandleCreateObjectRequest(const std::shared_ptr<Client> &client, PlasmaError PlasmaStore::HandleCreateObjectRequest(const std::shared_ptr<Client> &client,
const std::vector<uint8_t> &message, const std::vector<uint8_t> &message,
bool fallback_allocator, bool fallback_allocator,
@ -301,35 +243,15 @@ PlasmaError PlasmaStore::CreateObject(const ray::ObjectInfo &object_info,
fb::ObjectSource source, fb::ObjectSource source,
const std::shared_ptr<Client> &client, const std::shared_ptr<Client> &client,
bool fallback_allocator, PlasmaObject *result) { bool fallback_allocator, PlasmaObject *result) {
RAY_LOG(DEBUG) << "attempting to create object " << object_info.object_id << " size " auto pair = object_lifecycle_mgr_.CreateObject(object_info, source, fallback_allocator);
<< object_info.data_size; auto entry = pair.first;
auto error = pair.second;
if (object_store_.GetObject(object_info.object_id) != nullptr) {
return PlasmaError::ObjectExists;
}
auto entry = CreateObjectInternal(object_info, source, fallback_allocator);
{
// TODO(scv119) use RAY_LOG_EVERY_MS
auto now = absl::GetCurrentTimeNanos();
if (now - last_usage_log_ns_ > usage_log_interval_ns_) {
RAY_LOG(INFO) << "Object store current usage " << (allocator_.Allocated() / 1e9)
<< " / " << (allocator_.GetFootprintLimit() / 1e9) << " GB.";
last_usage_log_ns_ = now;
}
}
if (entry == nullptr) { if (entry == nullptr) {
return PlasmaError::OutOfMemory; return error;
} }
RAY_LOG(DEBUG) << "create object " << object_info.object_id << " succeeded";
ToPlasmaObject(*entry, result, /* check sealed */ false); ToPlasmaObject(*entry, result, /* check sealed */ false);
// Notify the eviction policy that this object was created. This must be done
// immediately before the call to AddToClientObjectIds so that the
// eviction policy does not have an opportunity to evict the object.
eviction_policy_.ObjectCreated(object_info.object_id, true);
// Record that this client is using this object. // Record that this client is using this object.
AddToClientObjectIds(object_info.object_id, entry, client); AddToClientObjectIds(object_info.object_id, client);
return PlasmaError::OK; return PlasmaError::OK;
} }
@ -387,7 +309,7 @@ void PlasmaStore::ReturnFromGet(const std::shared_ptr<GetRequest> &get_req) {
std::vector<MEMFD_TYPE> store_fds; std::vector<MEMFD_TYPE> store_fds;
std::vector<int64_t> mmap_sizes; std::vector<int64_t> mmap_sizes;
for (const auto &object_id : get_req->object_ids) { for (const auto &object_id : get_req->object_ids) {
PlasmaObject &object = get_req->objects[object_id]; const PlasmaObject &object = get_req->objects[object_id];
MEMFD_TYPE fd = object.store_fd; MEMFD_TYPE fd = object.store_fd;
if (object.data_size != -1 && fds_to_send.count(fd) == 0 && fd.first != INVALID_FD) { if (object.data_size != -1 && fds_to_send.count(fd) == 0 && fd.first != INVALID_FD) {
fds_to_send.insert(fd); fds_to_send.insert(fd);
@ -422,7 +344,7 @@ void PlasmaStore::ReturnFromGet(const std::shared_ptr<GetRequest> &get_req) {
RemoveGetRequest(get_req); RemoveGetRequest(get_req);
} }
void PlasmaStore::UpdateObjectGetRequests(const ObjectID &object_id) { void PlasmaStore::NotifyObjectSealedToGetRequests(const ObjectID &object_id) {
auto it = object_get_requests_.find(object_id); auto it = object_get_requests_.find(object_id);
// If there are no get requests involving this object, then return. // If there are no get requests involving this object, then return.
if (it == object_get_requests_.end()) { if (it == object_get_requests_.end()) {
@ -437,13 +359,13 @@ void PlasmaStore::UpdateObjectGetRequests(const ObjectID &object_id) {
size_t num_requests = get_requests.size(); size_t num_requests = get_requests.size();
for (size_t i = 0; i < num_requests; ++i) { for (size_t i = 0; i < num_requests; ++i) {
auto get_req = get_requests[index]; auto get_req = get_requests[index];
auto entry = object_store_.GetObject(object_id); auto entry = object_lifecycle_mgr_.GetObject(object_id);
RAY_CHECK(entry != nullptr); RAY_CHECK(entry != nullptr);
ToPlasmaObject(*entry, &get_req->objects[object_id], /* check sealed */ true); ToPlasmaObject(*entry, &get_req->objects[object_id], /* check sealed */ true);
get_req->num_satisfied += 1; get_req->num_satisfied += 1;
// Record the fact that this client will be using this object and will // Record the fact that this client will be using this object and will
// be responsible for releasing this object. // be responsible for releasing this object.
AddToClientObjectIds(object_id, entry, get_req->client); AddToClientObjectIds(object_id, get_req->client);
// If this get request is done, reply to the client. // If this get request is done, reply to the client.
if (get_req->num_satisfied == get_req->num_objects_to_wait_for) { if (get_req->num_satisfied == get_req->num_objects_to_wait_for) {
@ -474,14 +396,14 @@ void PlasmaStore::ProcessGetRequest(const std::shared_ptr<Client> &client,
for (auto object_id : object_ids) { for (auto object_id : object_ids) {
// Check if this object is already present // Check if this object is already present
// locally. If so, record that the object is being used and mark it as accounted for. // locally. If so, record that the object is being used and mark it as accounted for.
auto entry = object_store_.GetObject(object_id); auto entry = object_lifecycle_mgr_.GetObject(object_id);
if (entry && entry->state == ObjectState::PLASMA_SEALED) { if (entry && entry->Sealed()) {
// Update the get request to take into account the present object. // Update the get request to take into account the present object.
ToPlasmaObject(*entry, &get_req->objects[object_id], /* checksealed */ true); ToPlasmaObject(*entry, &get_req->objects[object_id], /* checksealed */ true);
get_req->num_satisfied += 1; get_req->num_satisfied += 1;
// If necessary, record that this client is using this object. In the case // If necessary, record that this client is using this object. In the case
// where entry == NULL, this will be called from SealObject. // where entry == NULL, this will be called from SealObject.
AddToClientObjectIds(object_id, entry, client); AddToClientObjectIds(object_id, client);
} else { } else {
// Add a placeholder plasma object to the get request to indicate that the // Add a placeholder plasma object to the get request to indicate that the
// object is not present. This will be parsed by the client. We set the // object is not present. This will be parsed by the client. We set the
@ -509,31 +431,13 @@ void PlasmaStore::ProcessGetRequest(const std::shared_ptr<Client> &client,
} }
int PlasmaStore::RemoveFromClientObjectIds(const ObjectID &object_id, int PlasmaStore::RemoveFromClientObjectIds(const ObjectID &object_id,
const LocalObject *entry,
const std::shared_ptr<Client> &client) { const std::shared_ptr<Client> &client) {
auto it = client->object_ids.find(object_id); auto it = client->object_ids.find(object_id);
if (it != client->object_ids.end()) { if (it != client->object_ids.end()) {
client->object_ids.erase(it); client->object_ids.erase(it);
// Decrease reference count.
entry->ref_count--;
RAY_LOG(DEBUG) << "Object " << object_id << " no longer in use by client"; RAY_LOG(DEBUG) << "Object " << object_id << " no longer in use by client";
// Decrease reference count.
// If no more clients are using this object, notify the eviction policy object_lifecycle_mgr_.RemoveReference(object_id);
// that the object is no longer being used.
if (entry->ref_count == 0) {
num_bytes_in_use_ -= entry->GetObjectSize();
RAY_LOG(DEBUG) << "Releasing object no longer in use " << object_id
<< ", num bytes in use is now " << num_bytes_in_use_;
if (deletion_cache_.count(object_id) == 0) {
// Tell the eviction policy that this object is no longer being used.
eviction_policy_.EndObjectAccess(object_id);
} else {
// Above code does not really delete an object. Instead, it just put an
// object to LRU cache which will be cleaned when the memory is not enough.
deletion_cache_.erase(object_id);
EvictObjects({object_id});
}
}
// Return 1 to indicate that the client was removed. // Return 1 to indicate that the client was removed.
return 1; return 1;
} else { } else {
@ -542,123 +446,39 @@ int PlasmaStore::RemoveFromClientObjectIds(const ObjectID &object_id,
} }
} }
void PlasmaStore::EraseFromObjectTable(const ObjectID &object_id) {
auto object = object_store_.GetObject(object_id);
if (object == nullptr) {
RAY_LOG(WARNING) << object_id << " has already been deleted.";
return;
}
RAY_CHECK(object->allocation.device_num == 0)
<< object_id << "'s device_num is " << object->allocation.device_num
<< "but CUDA not enabled";
RAY_LOG(DEBUG) << "Erasing object: " << object_id
<< ", address: " << static_cast<void *>(object->allocation.address)
<< ", size:" << object->GetObjectSize();
if (object->ref_count > 0) {
// A client was using this object.
num_bytes_in_use_ -= object->GetObjectSize();
RAY_LOG(DEBUG) << "Erasing object " << object_id << " with nonzero ref count"
<< object_id << ", num bytes in use is now " << num_bytes_in_use_;
}
object_store_.DeleteObject(object_id);
}
void PlasmaStore::ReleaseObject(const ObjectID &object_id, void PlasmaStore::ReleaseObject(const ObjectID &object_id,
const std::shared_ptr<Client> &client) { const std::shared_ptr<Client> &client) {
auto entry = object_store_.GetObject(object_id); auto entry = object_lifecycle_mgr_.GetObject(object_id);
RAY_CHECK(entry != nullptr); RAY_CHECK(entry != nullptr);
// Remove the client from the object's array of clients. // Remove the client from the object's array of clients.
RAY_CHECK(RemoveFromClientObjectIds(object_id, entry, client) == 1); RAY_CHECK(RemoveFromClientObjectIds(object_id, client) == 1);
}
// Check if an object is present.
ObjectStatus PlasmaStore::ContainsObject(const ObjectID &object_id) {
auto entry = object_store_.GetObject(object_id);
return entry && entry->state == ObjectState::PLASMA_SEALED
? ObjectStatus::OBJECT_FOUND
: ObjectStatus::OBJECT_NOT_FOUND;
} }
void PlasmaStore::SealObjects(const std::vector<ObjectID> &object_ids) { void PlasmaStore::SealObjects(const std::vector<ObjectID> &object_ids) {
for (size_t i = 0; i < object_ids.size(); ++i) { for (size_t i = 0; i < object_ids.size(); ++i) {
RAY_LOG(DEBUG) << "sealing object " << object_ids[i]; RAY_LOG(DEBUG) << "sealing object " << object_ids[i];
auto entry = object_store_.SealObject(object_ids[i]); auto entry = object_lifecycle_mgr_.SealObject(object_ids[i]);
RAY_CHECK(entry) << object_ids[i] << " doesn't exist or has already been sealed."; RAY_CHECK(entry) << object_ids[i] << " is missing or not sealed.";
add_object_callback_(entry->object_info); add_object_callback_(entry->GetObjectInfo());
} }
for (size_t i = 0; i < object_ids.size(); ++i) { for (size_t i = 0; i < object_ids.size(); ++i) {
UpdateObjectGetRequests(object_ids[i]); NotifyObjectSealedToGetRequests(object_ids[i]);
} }
} }
int PlasmaStore::AbortObject(const ObjectID &object_id, int PlasmaStore::AbortObject(const ObjectID &object_id,
const std::shared_ptr<Client> &client) { const std::shared_ptr<Client> &client) {
auto entry = object_store_.GetObject(object_id);
RAY_CHECK(entry != nullptr) << "To abort an object it must be in the object table.";
RAY_CHECK(entry->state != ObjectState::PLASMA_SEALED)
<< "To abort an object it must not have been sealed.";
auto it = client->object_ids.find(object_id); auto it = client->object_ids.find(object_id);
if (it == client->object_ids.end()) { if (it == client->object_ids.end()) {
// If the client requesting the abort is not the creator, do not // If the client requesting the abort is not the creator, do not
// perform the abort. // perform the abort.
return 0; return 0;
} else {
// The client requesting the abort is the creator. Free the object.
EraseFromObjectTable(object_id);
client->object_ids.erase(it);
return 1;
}
}
PlasmaError PlasmaStore::DeleteObject(ObjectID &object_id) {
auto entry = object_store_.GetObject(object_id);
// TODO(rkn): This should probably not fail, but should instead throw an
// error. Maybe we should also support deleting objects that have been
// created but not sealed.
if (entry == nullptr) {
// To delete an object it must be in the object table.
return PlasmaError::ObjectNonexistent;
}
if (entry->state != ObjectState::PLASMA_SEALED) {
// To delete an object it must have been sealed.
// Put it into deletion cache, it will be deleted later.
deletion_cache_.emplace(object_id);
return PlasmaError::ObjectNotSealed;
}
if (entry->ref_count != 0) {
// To delete an object, there must be no clients currently using it.
// Put it into deletion cache, it will be deleted later.
deletion_cache_.emplace(object_id);
return PlasmaError::ObjectInUse;
}
eviction_policy_.RemoveObject(object_id);
EraseFromObjectTable(object_id);
// Inform all subscribers that the object has been deleted.
delete_object_callback_(object_id);
return PlasmaError::OK;
}
void PlasmaStore::EvictObjects(const std::vector<ObjectID> &object_ids) {
for (const auto &object_id : object_ids) {
RAY_LOG(DEBUG) << "evicting object " << object_id.Hex();
auto entry = object_store_.GetObject(object_id);
// TODO(rkn): This should probably not fail, but should instead throw an
// error. Maybe we should also support deleting objects that have been
// created but not sealed.
RAY_CHECK(entry != nullptr) << "To evict an object it must be in the object table.";
RAY_CHECK(entry->state == ObjectState::PLASMA_SEALED)
<< "To evict an object it must have been sealed.";
RAY_CHECK(entry->ref_count == 0)
<< "To evict an object, there must be no clients currently using it.";
// Erase the object entry and send a deletion notification.
EraseFromObjectTable(object_id);
// Inform all subscribers that the object has been deleted.
delete_object_callback_(object_id);
} }
// The client requesting the abort is the creator. Free the object.
RAY_CHECK(object_lifecycle_mgr_.AbortObject(object_id) == PlasmaError::OK);
client->object_ids.erase(it);
return 1;
} }
void PlasmaStore::ConnectClient(const boost::system::error_code &error) { void PlasmaStore::ConnectClient(const boost::system::error_code &error) {
@ -677,19 +497,18 @@ void PlasmaStore::DisconnectClient(const std::shared_ptr<Client> &client) {
// Release all the objects that the client was using. // Release all the objects that the client was using.
std::unordered_map<ObjectID, const LocalObject *> sealed_objects; std::unordered_map<ObjectID, const LocalObject *> sealed_objects;
for (const auto &object_id : client->object_ids) { for (const auto &object_id : client->object_ids) {
auto entry = object_store_.GetObject(object_id); auto entry = object_lifecycle_mgr_.GetObject(object_id);
if (entry == nullptr) { if (entry == nullptr) {
continue; continue;
} }
if (entry->state == ObjectState::PLASMA_SEALED) { if (entry->Sealed()) {
// Add sealed objects to a temporary list of object IDs. Do not perform // Add sealed objects to a temporary list of object IDs. Do not perform
// the remove here, since it potentially modifies the object_ids table. // the remove here, since it potentially modifies the object_ids table.
sealed_objects[object_id] = entry; sealed_objects[object_id] = entry;
} else { } else {
// Abort unsealed object. // Abort unsealed object.
// Don't call AbortObject() because client->object_ids would be modified. object_lifecycle_mgr_.AbortObject(object_id);
EraseFromObjectTable(object_id);
} }
} }
@ -697,7 +516,7 @@ void PlasmaStore::DisconnectClient(const std::shared_ptr<Client> &client) {
RemoveGetRequestsForClient(client); RemoveGetRequestsForClient(client);
for (const auto &entry : sealed_objects) { for (const auto &entry : sealed_objects) {
RemoveFromClientObjectIds(entry.first, entry.second, client); RemoveFromClientObjectIds(entry.first, client);
} }
create_request_queue_.RemoveDisconnectedClientRequests(client); create_request_queue_.RemoveDisconnectedClientRequests(client);
@ -780,13 +599,13 @@ Status PlasmaStore::ProcessMessage(const std::shared_ptr<Client> &client,
RAY_RETURN_NOT_OK(ReadDeleteRequest(input, input_size, &object_ids)); RAY_RETURN_NOT_OK(ReadDeleteRequest(input, input_size, &object_ids));
error_codes.reserve(object_ids.size()); error_codes.reserve(object_ids.size());
for (auto &object_id : object_ids) { for (auto &object_id : object_ids) {
error_codes.push_back(DeleteObject(object_id)); error_codes.push_back(object_lifecycle_mgr_.DeleteObject(object_id));
} }
RAY_RETURN_NOT_OK(SendDeleteReply(client, object_ids, error_codes)); RAY_RETURN_NOT_OK(SendDeleteReply(client, object_ids, error_codes));
} break; } break;
case fb::MessageType::PlasmaContainsRequest: { case fb::MessageType::PlasmaContainsRequest: {
RAY_RETURN_NOT_OK(ReadContainsRequest(input, input_size, &object_id)); RAY_RETURN_NOT_OK(ReadContainsRequest(input, input_size, &object_id));
if (ContainsObject(object_id) == ObjectStatus::OBJECT_FOUND) { if (object_lifecycle_mgr_.IsObjectSealed(object_id)) {
RAY_RETURN_NOT_OK(SendContainsReply(client, object_id, 1)); RAY_RETURN_NOT_OK(SendContainsReply(client, object_id, 1));
} else { } else {
RAY_RETURN_NOT_OK(SendContainsReply(client, object_id, 0)); RAY_RETURN_NOT_OK(SendContainsReply(client, object_id, 0));
@ -801,10 +620,7 @@ Status PlasmaStore::ProcessMessage(const std::shared_ptr<Client> &client,
// This code path should only be used for testing. // This code path should only be used for testing.
int64_t num_bytes; int64_t num_bytes;
RAY_RETURN_NOT_OK(ReadEvictRequest(input, input_size, &num_bytes)); RAY_RETURN_NOT_OK(ReadEvictRequest(input, input_size, &num_bytes));
std::vector<ObjectID> objects_to_evict; int64_t num_bytes_evicted = object_lifecycle_mgr_.RequireSpace(num_bytes);
int64_t num_bytes_evicted =
eviction_policy_.ChooseObjectsToEvict(num_bytes, &objects_to_evict);
EvictObjects(objects_to_evict);
RAY_RETURN_NOT_OK(SendEvictReply(client, num_bytes_evicted)); RAY_RETURN_NOT_OK(SendEvictReply(client, num_bytes_evicted));
} break; } break;
case fb::MessageType::PlasmaConnectRequest: { case fb::MessageType::PlasmaConnectRequest: {
@ -816,7 +632,8 @@ Status PlasmaStore::ProcessMessage(const std::shared_ptr<Client> &client,
return Status::Disconnected("The Plasma Store client is disconnected."); return Status::Disconnected("The Plasma Store client is disconnected.");
break; break;
case fb::MessageType::PlasmaGetDebugStringRequest: { case fb::MessageType::PlasmaGetDebugStringRequest: {
RAY_RETURN_NOT_OK(SendGetDebugStringReply(client, eviction_policy_.DebugString())); RAY_RETURN_NOT_OK(SendGetDebugStringReply(
client, object_lifecycle_mgr_.EvictionPolicyDebugString()));
} break; } break;
default: default:
// This code should be unreachable. // This code should be unreachable.
@ -888,12 +705,12 @@ bool PlasmaStore::IsObjectSpillable(const ObjectID &object_id) {
// The lock is acquired when a request is received to the plasma store. // The lock is acquired when a request is received to the plasma store.
// recursive mutex is used here to allow // recursive mutex is used here to allow
std::lock_guard<std::recursive_mutex> guard(mutex_); std::lock_guard<std::recursive_mutex> guard(mutex_);
auto entry = object_store_.GetObject(object_id); auto entry = object_lifecycle_mgr_.GetObject(object_id);
if (!entry) { if (!entry) {
// Object already evicted or deleted. // Object already evicted or deleted.
return false; return false;
} }
return entry->ref_count == 1; return entry->GetRefCount() == 1;
} }
void PlasmaStore::PrintDebugDump() const { void PlasmaStore::PrintDebugDump() const {
@ -909,13 +726,13 @@ std::string PlasmaStore::GetDebugDump() const {
buffer << "========== Plasma store: =================\n"; buffer << "========== Plasma store: =================\n";
buffer << "Current usage: " << (allocator_.Allocated() / 1e9) << " / " buffer << "Current usage: " << (allocator_.Allocated() / 1e9) << " / "
<< (allocator_.GetFootprintLimit() / 1e9) << " GB\n"; << (allocator_.GetFootprintLimit() / 1e9) << " GB\n";
buffer << "- num bytes created total: " << object_store_.GetNumBytesCreatedTotal() buffer << "- num bytes created total: "
<< "\n"; << object_lifecycle_mgr_.GetNumBytesCreatedTotal() << "\n";
auto num_pending_requests = create_request_queue_.NumPendingRequests(); auto num_pending_requests = create_request_queue_.NumPendingRequests();
auto num_pending_bytes = create_request_queue_.NumPendingBytes(); auto num_pending_bytes = create_request_queue_.NumPendingBytes();
buffer << num_pending_requests << " pending objects of total size " buffer << num_pending_requests << " pending objects of total size "
<< num_pending_bytes / 1024 / 1024 << "MB\n"; << num_pending_bytes / 1024 / 1024 << "MB\n";
object_store_.GetDebugDump(buffer); object_lifecycle_mgr_.GetDebugDump(buffer);
return buffer.str(); return buffer.str();
} }

View file

@ -32,6 +32,7 @@
#include "ray/object_manager/plasma/connection.h" #include "ray/object_manager/plasma/connection.h"
#include "ray/object_manager/plasma/create_request_queue.h" #include "ray/object_manager/plasma/create_request_queue.h"
#include "ray/object_manager/plasma/eviction_policy.h" #include "ray/object_manager/plasma/eviction_policy.h"
#include "ray/object_manager/plasma/object_lifecycle_manager.h"
#include "ray/object_manager/plasma/object_store.h" #include "ray/object_manager/plasma/object_store.h"
#include "ray/object_manager/plasma/plasma.h" #include "ray/object_manager/plasma/plasma.h"
#include "ray/object_manager/plasma/plasma_allocator.h" #include "ray/object_manager/plasma/plasma_allocator.h"
@ -106,11 +107,6 @@ class PlasmaStore {
/// - PlasmaError::ObjectInUse, if the object is in use. /// - PlasmaError::ObjectInUse, if the object is in use.
PlasmaError DeleteObject(ObjectID &object_id); PlasmaError DeleteObject(ObjectID &object_id);
/// Evict objects returned by the eviction policy.
///
/// \param object_ids Object IDs of the objects to be evicted.
void EvictObjects(const std::vector<ObjectID> &object_ids);
/// Process a get request from a client. This method assumes that we will /// Process a get request from a client. This method assumes that we will
/// eventually have these objects sealed. If one of the objects has not yet /// eventually have these objects sealed. If one of the objects has not yet
/// been sealed, the client that requested the object will be notified when it /// been sealed, the client that requested the object will be notified when it
@ -132,13 +128,6 @@ class PlasmaStore {
/// \param object_ids The vector of Object IDs of the objects to be sealed. /// \param object_ids The vector of Object IDs of the objects to be sealed.
void SealObjects(const std::vector<ObjectID> &object_ids); void SealObjects(const std::vector<ObjectID> &object_ids);
/// Check if the plasma store contains an object:
///
/// \param object_id Object ID that will be checked.
/// \return OBJECT_FOUND if the object is in the store, OBJECT_NOT_FOUND if
/// not
ObjectStatus ContainsObject(const ObjectID &object_id);
/// Record the fact that a particular client is no longer using an object. /// Record the fact that a particular client is no longer using an object.
/// ///
/// \param object_id The object ID of the object that is being released. /// \param object_id The object ID of the object that is being released.
@ -176,17 +165,18 @@ class PlasmaStore {
/// Get the available memory for new objects to be created. This includes /// Get the available memory for new objects to be created. This includes
/// memory that is currently being used for created but unsealed objects. /// memory that is currently being used for created but unsealed objects.
void GetAvailableMemory(std::function<void(size_t)> callback) const { void GetAvailableMemory(std::function<void(size_t)> callback) const {
RAY_CHECK((object_store_.GetNumBytesUnsealed() > 0 && RAY_CHECK((object_lifecycle_mgr_.GetNumBytesUnsealed() > 0 &&
object_store_.GetNumObjectsUnsealed() > 0) || object_lifecycle_mgr_.GetNumObjectsUnsealed() > 0) ||
(object_store_.GetNumBytesUnsealed() == 0 && (object_lifecycle_mgr_.GetNumBytesUnsealed() == 0 &&
object_store_.GetNumObjectsUnsealed() == 0)) object_lifecycle_mgr_.GetNumObjectsUnsealed() == 0))
<< "Tracking for available memory in the plasma store has gone out of sync. " << "Tracking for available memory in the plasma store has gone out of sync. "
"Please file a GitHub issue."; "Please file a GitHub issue.";
RAY_CHECK(num_bytes_in_use_ >= object_store_.GetNumBytesUnsealed()); RAY_CHECK(object_lifecycle_mgr_.GetNumBytesInUse() >=
object_lifecycle_mgr_.GetNumBytesUnsealed());
// We do not count unsealed objects as in use because these may have been // We do not count unsealed objects as in use because these may have been
// created by the object manager. // created by the object manager.
int64_t num_bytes_in_use = int64_t num_bytes_in_use = object_lifecycle_mgr_.GetNumBytesInUse() -
static_cast<int64_t>(num_bytes_in_use_ - object_store_.GetNumBytesUnsealed()); object_lifecycle_mgr_.GetNumBytesUnsealed();
size_t available = 0; size_t available = 0;
if (num_bytes_in_use < allocator_.GetFootprintLimit()) { if (num_bytes_in_use < allocator_.GetFootprintLimit()) {
available = allocator_.GetFootprintLimit() - num_bytes_in_use; available = allocator_.GetFootprintLimit() - num_bytes_in_use;
@ -209,7 +199,7 @@ class PlasmaStore {
void ReplyToCreateClient(const std::shared_ptr<Client> &client, void ReplyToCreateClient(const std::shared_ptr<Client> &client,
const ObjectID &object_id, uint64_t req_id); const ObjectID &object_id, uint64_t req_id);
void AddToClientObjectIds(const ObjectID &object_id, const LocalObject *entry, void AddToClientObjectIds(const ObjectID &object_id,
const std::shared_ptr<Client> &client); const std::shared_ptr<Client> &client);
/// Remove a GetRequest and clean up the relevant data structures. /// Remove a GetRequest and clean up the relevant data structures.
@ -224,17 +214,11 @@ class PlasmaStore {
void ReturnFromGet(const std::shared_ptr<GetRequest> &get_req); void ReturnFromGet(const std::shared_ptr<GetRequest> &get_req);
void UpdateObjectGetRequests(const ObjectID &object_id); void NotifyObjectSealedToGetRequests(const ObjectID &object_id);
int RemoveFromClientObjectIds(const ObjectID &object_id, const LocalObject *entry, int RemoveFromClientObjectIds(const ObjectID &object_id,
const std::shared_ptr<Client> &client); const std::shared_ptr<Client> &client);
void EraseFromObjectTable(const ObjectID &object_id);
const LocalObject *CreateObjectInternal(const ray::ObjectInfo &object_info,
plasma::flatbuf::ObjectSource source,
bool allow_fallback_allocation);
// Start listening for clients. // Start listening for clients.
void DoAccept(); void DoAccept();
@ -249,9 +233,6 @@ class PlasmaStore {
/// The allocator that allocates mmaped memory. /// The allocator that allocates mmaped memory.
IAllocator &allocator_; IAllocator &allocator_;
/// The object store stores created objects. /// The object store stores created objects.
ObjectStore object_store_;
/// The state that is managed by the eviction policy.
EvictionPolicy eviction_policy_;
/// A hash table mapping object IDs to a vector of the get requests that are /// A hash table mapping object IDs to a vector of the get requests that are
/// waiting for the object to arrive. /// waiting for the object to arrive.
std::unordered_map<ObjectID, std::vector<std::shared_ptr<GetRequest>>> std::unordered_map<ObjectID, std::vector<std::shared_ptr<GetRequest>>>
@ -276,6 +257,8 @@ class PlasmaStore {
/// shared with the main raylet thread. /// shared with the main raylet thread.
const ray::DeleteObjectCallback delete_object_callback_; const ray::DeleteObjectCallback delete_object_callback_;
ObjectLifecycleManager object_lifecycle_mgr_;
/// The amount of time to wait before retrying a creation request after an /// The amount of time to wait before retrying a creation request after an
/// OOM error. /// OOM error.
const uint32_t delay_on_oom_ms_; const uint32_t delay_on_oom_ms_;
@ -283,12 +266,6 @@ class PlasmaStore {
/// The percentage of object store memory used above which spilling is triggered. /// The percentage of object store memory used above which spilling is triggered.
const float object_spilling_threshold_; const float object_spilling_threshold_;
/// The amount of time to wait between logging space usage debug messages.
const uint64_t usage_log_interval_ns_;
/// The last time space usage was logged.
uint64_t last_usage_log_ns_ = 0;
/// A timer that is set when the first request in the queue is not /// A timer that is set when the first request in the queue is not
/// serviceable because there is not enough memory. The request will be /// serviceable because there is not enough memory. The request will be
/// retried when this timer expires. /// retried when this timer expires.
@ -308,11 +285,6 @@ class PlasmaStore {
/// mutex if it is not absolutely necessary. /// mutex if it is not absolutely necessary.
std::recursive_mutex mutex_; std::recursive_mutex mutex_;
/// Total number of bytes allocated to objects that are in use by any client.
/// This includes objects that are being created and objects that a client
/// called get on.
int64_t num_bytes_in_use_ = 0;
/// Total plasma object bytes that are consumed by core workers. /// Total plasma object bytes that are consumed by core workers.
int64_t total_consumed_bytes_ = 0; int64_t total_consumed_bytes_ = 0;

View file

@ -0,0 +1,337 @@
// Copyright 2017 The Ray Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <limits>
#include "absl/random/random.h"
#include "absl/strings/str_format.h"
#include "gmock/gmock.h"
#include "gtest/gtest.h"
#include "ray/object_manager/plasma/object_lifecycle_manager.h"
using namespace ray;
using namespace testing;
namespace plasma {
class MockEvictionPolicy : public IEvictionPolicy {
public:
MOCK_METHOD2(ObjectCreated, void(const ObjectID &, bool));
MOCK_METHOD2(RequireSpace, int64_t(int64_t, std::vector<ObjectID> *));
MOCK_METHOD1(BeginObjectAccess, void(const ObjectID &));
MOCK_METHOD1(EndObjectAccess, void(const ObjectID &));
MOCK_METHOD2(ChooseObjectsToEvict, int64_t(int64_t, std::vector<ObjectID> *));
MOCK_METHOD1(RemoveObject, void(const ObjectID &));
MOCK_CONST_METHOD0(DebugString, std::string());
};
class MockObjectStore : public IObjectStore {
public:
MOCK_METHOD3(CreateObject, const LocalObject *(const ray::ObjectInfo &,
plasma::flatbuf::ObjectSource, bool));
MOCK_CONST_METHOD1(GetObject, const LocalObject *(const ObjectID &));
MOCK_METHOD1(SealObject, const LocalObject *(const ObjectID &));
MOCK_METHOD1(DeleteObject, bool(const ObjectID &));
MOCK_CONST_METHOD0(GetNumBytesCreatedTotal, int64_t());
MOCK_CONST_METHOD0(GetNumBytesUnsealed, int64_t());
MOCK_CONST_METHOD0(GetNumObjectsUnsealed, int64_t());
MOCK_CONST_METHOD1(GetDebugDump, void(std::stringstream &buffer));
};
struct ObjectLifecycleManagerTest : public Test {
void SetUp() override {
Test::SetUp();
auto eviction_policy = std::make_unique<MockEvictionPolicy>();
auto object_store = std::make_unique<MockObjectStore>();
eviction_policy_ = eviction_policy.get();
object_store_ = object_store.get();
manager_ = std::make_unique<ObjectLifecycleManager>(
ObjectLifecycleManager(std::move(object_store), std::move(eviction_policy),
[this](auto &id) { notify_deleted_ids_.push_back(id); }));
sealed_object_.state = ObjectState::PLASMA_SEALED;
not_sealed_object_.state = ObjectState::PLASMA_CREATED;
one_ref_object_.state = ObjectState::PLASMA_SEALED;
one_ref_object_.ref_count = 1;
two_ref_object_.state = ObjectState::PLASMA_SEALED;
two_ref_object_.ref_count = 2;
}
MockEvictionPolicy *eviction_policy_;
MockObjectStore *object_store_;
std::unique_ptr<ObjectLifecycleManager> manager_;
std::vector<ObjectID> notify_deleted_ids_;
LocalObject object1_{Allocation()};
LocalObject object2_{Allocation()};
LocalObject sealed_object_{Allocation()};
LocalObject not_sealed_object_{Allocation()};
LocalObject one_ref_object_{Allocation()};
LocalObject two_ref_object_{Allocation()};
ObjectID id1_ = ObjectID::FromRandom();
ObjectID id2_ = ObjectID::FromRandom();
ObjectID id3_ = ObjectID::FromRandom();
};
TEST_F(ObjectLifecycleManagerTest, CreateObjectExists) {
EXPECT_CALL(*object_store_, GetObject(_)).Times(1).WillOnce(Return(&object1_));
auto expected = std::pair<const LocalObject *, flatbuf::PlasmaError>(
nullptr, flatbuf::PlasmaError::ObjectExists);
auto result = manager_->CreateObject({}, {}, /*falback*/ false);
EXPECT_EQ(expected, result);
}
TEST_F(ObjectLifecycleManagerTest, CreateObjectSuccess) {
EXPECT_CALL(*object_store_, GetObject(_)).Times(1).WillOnce(Return(nullptr));
EXPECT_CALL(*object_store_, CreateObject(_, _, false))
.Times(1)
.WillOnce(Return(&object1_));
auto expected = std::pair<const LocalObject *, flatbuf::PlasmaError>(
&object1_, flatbuf::PlasmaError::OK);
auto result = manager_->CreateObject({}, {}, /*falback*/ false);
EXPECT_EQ(expected, result);
}
TEST_F(ObjectLifecycleManagerTest, CreateObjectTriggerGC) {
EXPECT_CALL(*object_store_, GetObject(_))
.Times(3)
.WillOnce(Return(nullptr))
// called during eviction.
.WillOnce(Return(&sealed_object_))
.WillOnce(Return(&sealed_object_));
EXPECT_CALL(*object_store_, CreateObject(_, _, false))
.Times(2)
.WillOnce(Return(nullptr))
// once eviction finishes, createobject is called again.
.WillOnce(Return(&object1_));
// gc returns object to evict
EXPECT_CALL(*eviction_policy_, RequireSpace(_, _))
.Times(1)
.WillOnce(Invoke([&](auto size, auto &to_evict) {
to_evict->push_back(id1_);
return 0;
}));
// eviction
EXPECT_CALL(*object_store_, DeleteObject(id1_)).Times(1).WillOnce(Return(true));
EXPECT_CALL(*eviction_policy_, RemoveObject(id1_)).Times(1).WillOnce(Return());
auto expected = std::pair<const LocalObject *, flatbuf::PlasmaError>(
&object1_, flatbuf::PlasmaError::OK);
auto result = manager_->CreateObject({}, {}, /*falback*/ false);
EXPECT_EQ(expected, result);
// evicton is notified.
std::vector<ObjectID> expect_notified_ids{id1_};
EXPECT_EQ(expect_notified_ids, notify_deleted_ids_);
}
TEST_F(ObjectLifecycleManagerTest, CreateObjectTriggerGCExhaused) {
EXPECT_CALL(*object_store_, GetObject(_)).Times(1).WillOnce(Return(nullptr));
EXPECT_CALL(*object_store_, CreateObject(_, _, false))
.Times(11)
.WillRepeatedly(Return(nullptr));
EXPECT_CALL(*eviction_policy_, RequireSpace(_, _)).Times(11).WillRepeatedly(Return(0));
EXPECT_CALL(*object_store_, CreateObject(_, _, true))
.Times(1)
.WillOnce(Return(&object1_));
auto expected = std::pair<const LocalObject *, flatbuf::PlasmaError>(
&object1_, flatbuf::PlasmaError::OK);
auto result = manager_->CreateObject({}, {}, /*falback*/ true);
EXPECT_EQ(expected, result);
}
TEST_F(ObjectLifecycleManagerTest, CreateObjectWithoutFallback) {
EXPECT_CALL(*object_store_, GetObject(_)).Times(1).WillOnce(Return(nullptr));
EXPECT_CALL(*object_store_, CreateObject(_, _, false))
.Times(1)
.WillOnce(Return(nullptr));
// evict failed;
EXPECT_CALL(*eviction_policy_, RequireSpace(_, _)).Times(1).WillOnce(Return(1));
auto expected = std::pair<const LocalObject *, flatbuf::PlasmaError>(
nullptr, flatbuf::PlasmaError::OutOfMemory);
auto result = manager_->CreateObject({}, {}, /*falback*/ false);
EXPECT_EQ(expected, result);
}
TEST_F(ObjectLifecycleManagerTest, CreateObjectWithFallback) {
EXPECT_CALL(*object_store_, GetObject(_)).Times(1).WillOnce(Return(nullptr));
EXPECT_CALL(*object_store_, CreateObject(_, _, false))
.Times(1)
.WillOnce(Return(nullptr));
EXPECT_CALL(*eviction_policy_, RequireSpace(_, _)).Times(1).WillOnce(Return(1));
EXPECT_CALL(*object_store_, CreateObject(_, _, true))
.Times(1)
.WillOnce(Return(&object1_));
auto expected = std::pair<const LocalObject *, flatbuf::PlasmaError>(
&object1_, flatbuf::PlasmaError::OK);
auto result = manager_->CreateObject({}, {}, /*falback*/ true);
EXPECT_EQ(expected, result);
}
TEST_F(ObjectLifecycleManagerTest, CreateObjectWithFallbackFailed) {
EXPECT_CALL(*object_store_, GetObject(_)).Times(1).WillOnce(Return(nullptr));
EXPECT_CALL(*object_store_, CreateObject(_, _, false))
.Times(1)
.WillOnce(Return(nullptr));
EXPECT_CALL(*eviction_policy_, RequireSpace(_, _)).Times(1).WillOnce(Return(1));
EXPECT_CALL(*object_store_, CreateObject(_, _, true))
.Times(1)
.WillOnce(Return(nullptr));
auto expected = std::pair<const LocalObject *, flatbuf::PlasmaError>(
nullptr, flatbuf::PlasmaError::OutOfMemory);
auto result = manager_->CreateObject({}, {}, /*falback*/ true);
EXPECT_EQ(expected, result);
}
TEST_F(ObjectLifecycleManagerTest, GetObject) {
EXPECT_CALL(*object_store_, GetObject(id1_)).Times(1).WillOnce(Return(&object2_));
EXPECT_EQ(&object2_, manager_->GetObject(id1_));
}
TEST_F(ObjectLifecycleManagerTest, SealObject) {
EXPECT_CALL(*object_store_, SealObject(id1_)).Times(1).WillOnce(Return(&object2_));
EXPECT_EQ(&object2_, manager_->SealObject(id1_));
}
TEST_F(ObjectLifecycleManagerTest, AbortFailure) {
EXPECT_CALL(*object_store_, GetObject(id1_)).Times(1).WillOnce(Return(nullptr));
EXPECT_CALL(*object_store_, GetObject(id2_)).Times(1).WillOnce(Return(&sealed_object_));
EXPECT_EQ(manager_->AbortObject(id1_), flatbuf::PlasmaError::ObjectNonexistent);
EXPECT_EQ(manager_->AbortObject(id2_), flatbuf::PlasmaError::ObjectSealed);
}
TEST_F(ObjectLifecycleManagerTest, AbortSuccess) {
EXPECT_CALL(*object_store_, GetObject(id3_))
.Times(2)
.WillRepeatedly(Return(&not_sealed_object_));
EXPECT_CALL(*object_store_, DeleteObject(id3_)).Times(1).WillOnce(Return(true));
EXPECT_CALL(*eviction_policy_, RemoveObject(id3_)).Times(1).WillOnce(Return());
EXPECT_EQ(manager_->AbortObject(id3_), flatbuf::PlasmaError::OK);
// aborted object is not notified.
EXPECT_TRUE(notify_deleted_ids_.empty());
}
TEST_F(ObjectLifecycleManagerTest, DeleteFailure) {
EXPECT_CALL(*object_store_, GetObject(id1_)).Times(1).WillOnce(Return(nullptr));
EXPECT_EQ(flatbuf::PlasmaError::ObjectNonexistent, manager_->DeleteObject(id1_));
{
EXPECT_CALL(*object_store_, GetObject(id2_))
.Times(1)
.WillOnce(Return(&not_sealed_object_));
EXPECT_EQ(flatbuf::PlasmaError::ObjectNotSealed, manager_->DeleteObject(id2_));
absl::flat_hash_set<ObjectID> expected_eagerly_deletion_objects{id2_};
EXPECT_EQ(expected_eagerly_deletion_objects, manager_->earger_deletion_objects_);
}
{
manager_->earger_deletion_objects_.clear();
EXPECT_CALL(*object_store_, GetObject(id3_))
.Times(1)
.WillOnce(Return(&one_ref_object_));
EXPECT_EQ(flatbuf::PlasmaError::ObjectInUse, manager_->DeleteObject(id3_));
absl::flat_hash_set<ObjectID> expected_eagerly_deletion_objects{id3_};
EXPECT_EQ(expected_eagerly_deletion_objects, manager_->earger_deletion_objects_);
}
}
TEST_F(ObjectLifecycleManagerTest, DeleteSuccess) {
EXPECT_CALL(*object_store_, GetObject(id1_))
.Times(2)
.WillRepeatedly(Return(&sealed_object_));
EXPECT_CALL(*object_store_, DeleteObject(id1_)).Times(1).WillOnce(Return(true));
EXPECT_CALL(*eviction_policy_, RemoveObject(id1_)).Times(1).WillOnce(Return());
EXPECT_EQ(flatbuf::PlasmaError::OK, manager_->DeleteObject(id1_));
std::vector<ObjectID> expect_notified_ids{id1_};
EXPECT_EQ(expect_notified_ids, notify_deleted_ids_);
}
TEST_F(ObjectLifecycleManagerTest, AddReference) {
{
EXPECT_CALL(*object_store_, GetObject(id1_)).Times(1).WillOnce(Return(nullptr));
EXPECT_FALSE(manager_->AddReference(id1_));
}
{
EXPECT_CALL(*object_store_, GetObject(id2_)).Times(1).WillOnce(Return(&object1_));
EXPECT_CALL(*eviction_policy_, BeginObjectAccess(id2_)).Times(1).WillOnce(Return());
EXPECT_TRUE(manager_->AddReference(id2_));
EXPECT_EQ(1, object1_.GetRefCount());
}
{
EXPECT_CALL(*object_store_, GetObject(id3_))
.Times(1)
.WillOnce(Return(&one_ref_object_));
EXPECT_TRUE(manager_->AddReference(id3_));
EXPECT_EQ(2, one_ref_object_.GetRefCount());
}
}
TEST_F(ObjectLifecycleManagerTest, RemoveReferenceFailure) {
{
EXPECT_CALL(*object_store_, GetObject(id1_)).Times(1).WillOnce(Return(nullptr));
EXPECT_FALSE(manager_->RemoveReference(id1_));
}
{
EXPECT_CALL(*object_store_, GetObject(id2_)).Times(1).WillOnce(Return(&object1_));
EXPECT_FALSE(manager_->RemoveReference(id2_));
}
}
TEST_F(ObjectLifecycleManagerTest, RemoveReferenceTwoRef) {
EXPECT_CALL(*object_store_, GetObject(id1_))
.Times(1)
.WillOnce(Return(&two_ref_object_));
EXPECT_TRUE(manager_->RemoveReference(id1_));
EXPECT_EQ(1, two_ref_object_.GetRefCount());
}
TEST_F(ObjectLifecycleManagerTest, RemoveReferenceOneRefSealed) {
EXPECT_TRUE(one_ref_object_.Sealed());
EXPECT_CALL(*object_store_, GetObject(id1_))
.Times(1)
.WillOnce(Return(&one_ref_object_));
EXPECT_CALL(*eviction_policy_, EndObjectAccess(id1_)).Times(1).WillOnce(Return());
EXPECT_TRUE(manager_->RemoveReference(id1_));
EXPECT_EQ(0, one_ref_object_.GetRefCount());
}
TEST_F(ObjectLifecycleManagerTest, RemoveReferenceOneRefEagerlyDeletion) {
manager_->earger_deletion_objects_.emplace(id1_);
EXPECT_CALL(*object_store_, GetObject(id1_))
.Times(2)
.WillRepeatedly(Return(&one_ref_object_));
EXPECT_CALL(*eviction_policy_, EndObjectAccess(id1_)).Times(1).WillOnce(Return());
EXPECT_CALL(*object_store_, DeleteObject(id1_)).Times(1).WillOnce(Return(true));
EXPECT_CALL(*eviction_policy_, RemoveObject(id1_)).Times(1).WillOnce(Return());
EXPECT_TRUE(manager_->RemoveReference(id1_));
EXPECT_EQ(0, one_ref_object_.GetRefCount());
std::vector<ObjectID> expect_notified_ids{id1_};
EXPECT_EQ(expect_notified_ids, notify_deleted_ids_);
}
} // namespace plasma
int main(int argc, char **argv) {
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}

View file

@ -30,13 +30,11 @@ T Random(T max = std::numeric_limits<T>::max()) {
return absl::Uniform(bitgen, 0, max); return absl::Uniform(bitgen, 0, max);
} }
Allocation CreateAllocation(int64_t size) { Allocation CreateAllocation(Allocation alloc, int64_t size) {
return Allocation( alloc.size = size;
/* address */ nullptr, size, alloc.offset = Random<ptrdiff_t>();
/* fd */ MEMFD_TYPE(), alloc.mmap_size = Random<int64_t>();
/* offset */ Random<ptrdiff_t>(), return alloc;
/* device_num */ 0,
/* mmap_size */ Random<int64_t>());
} }
const std::string Serialize(const Allocation &allocation) { const std::string Serialize(const Allocation &allocation) {
@ -82,7 +80,7 @@ TEST(ObjectStoreTest, PassThroughTest) {
ObjectStore store(allocator); ObjectStore store(allocator);
{ {
auto info = CreateObjectInfo(kId1, 10); auto info = CreateObjectInfo(kId1, 10);
auto allocation = CreateAllocation(10); auto allocation = CreateAllocation(Allocation(), 10);
auto alloc_str = Serialize(allocation); auto alloc_str = Serialize(allocation);
EXPECT_CALL(allocator, Allocate(10)).Times(1).WillOnce(Invoke([&](size_t bytes) { EXPECT_CALL(allocator, Allocate(10)).Times(1).WillOnce(Invoke([&](size_t bytes) {
@ -134,7 +132,7 @@ TEST(ObjectStoreTest, PassThroughTest) {
} }
{ {
auto allocation = CreateAllocation(12); auto allocation = CreateAllocation(Allocation(), 12);
auto alloc_str = Serialize(allocation); auto alloc_str = Serialize(allocation);
auto info = CreateObjectInfo(kId2, 12); auto info = CreateObjectInfo(kId2, 12);
// allocation failure // allocation failure