[Core][Remove JVM FullGC 1/N] Add allocator to in-memory store. (#21250)

According to the description of #21218 , in this PR, we support the ability specifying a frontend-defined in-memory object allocator. So that we can specify an allocator to allocate the buffers from JVM heap. This is the basic functionality for the next PR #21441 that the JVM is able to be aware of the memory pressure of the in-memeory store objects.

Note that, if we use a frontend defined allocator, it may break the zerocopy ability. In Java, JVM buffers is in heap and we should copy it to native memory if needed.

Co-authored-by: Qing Wang <jovany.wq@antgroup.com>
This commit is contained in:
Qing Wang 2022-02-24 10:53:59 +08:00 committed by GitHub
parent e15a419028
commit eb9960785b
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 134 additions and 8 deletions

View file

@ -45,6 +45,25 @@ class RayObject {
Init(data, metadata, nested_refs, copy_data); Init(data, metadata, nested_refs, copy_data);
} }
/// This constructor creates a ray object instance whose data will be generated
/// by the data factory.
RayObject(const std::shared_ptr<Buffer> &metadata,
const std::vector<rpc::ObjectReference> &nested_refs,
std::function<std::shared_ptr<ray::Buffer>()> data_factory,
bool copy_data = false)
: data_factory_(std::move(data_factory)),
metadata_(metadata),
nested_refs_(nested_refs),
has_data_copy_(copy_data),
creation_time_nanos_(absl::GetCurrentTimeNanos()) {
if (has_data_copy_) {
if (metadata_ && !metadata_->OwnsData()) {
metadata_ = std::make_shared<LocalMemoryBuffer>(
metadata_->Data(), metadata_->Size(), /*copy_data=*/true);
}
}
}
/// Create an Ray error object. It uses msgpack for the serialization format now. /// Create an Ray error object. It uses msgpack for the serialization format now.
/// Ray error objects consist of metadata that indicates the error code (see /// Ray error objects consist of metadata that indicates the error code (see
/// rpc::ErrorType) and the serialized message pack that contains serialized /// rpc::ErrorType) and the serialized message pack that contains serialized
@ -62,7 +81,13 @@ class RayObject {
RayObject(rpc::ErrorType error_type, const rpc::RayErrorInfo *ray_error_info = nullptr); RayObject(rpc::ErrorType error_type, const rpc::RayErrorInfo *ray_error_info = nullptr);
/// Return the data of the ray object. /// Return the data of the ray object.
const std::shared_ptr<Buffer> &GetData() const { return data_; } std::shared_ptr<Buffer> GetData() const {
if (data_factory_ != nullptr) {
return data_factory_();
} else {
return data_;
}
}
/// Return the metadata of the ray object. /// Return the metadata of the ray object.
const std::shared_ptr<Buffer> &GetMetadata() const { return metadata_; } const std::shared_ptr<Buffer> &GetMetadata() const { return metadata_; }
@ -78,7 +103,7 @@ class RayObject {
} }
/// Whether this object has data. /// Whether this object has data.
bool HasData() const { return data_ != nullptr; } bool HasData() const { return data_ != nullptr || data_factory_ != nullptr; }
/// Whether this object has metadata. /// Whether this object has metadata.
bool HasMetadata() const { return metadata_ != nullptr; } bool HasMetadata() const { return metadata_ != nullptr; }
@ -127,6 +152,9 @@ class RayObject {
} }
std::shared_ptr<Buffer> data_; std::shared_ptr<Buffer> data_;
/// The data factory is used to allocate data from the language frontend.
/// Note that, if this is provided, `data_` should be null.
std::function<const std::shared_ptr<Buffer>()> data_factory_ = nullptr;
std::shared_ptr<Buffer> metadata_; std::shared_ptr<Buffer> metadata_;
std::vector<rpc::ObjectReference> nested_refs_; std::vector<rpc::ObjectReference> nested_refs_;
/// Whether this class holds a data copy. /// Whether this class holds a data copy.

View file

@ -147,11 +147,15 @@ CoreWorkerMemoryStore::CoreWorkerMemoryStore(
std::shared_ptr<ReferenceCounter> counter, std::shared_ptr<ReferenceCounter> counter,
std::shared_ptr<raylet::RayletClient> raylet_client, std::shared_ptr<raylet::RayletClient> raylet_client,
std::function<Status()> check_signals, std::function<Status()> check_signals,
std::function<void(const RayObject &)> unhandled_exception_handler) std::function<void(const RayObject &)> unhandled_exception_handler,
std::function<std::shared_ptr<ray::RayObject>(const ray::RayObject &object,
const ObjectID &object_id)>
object_allocator)
: ref_counter_(std::move(counter)), : ref_counter_(std::move(counter)),
raylet_client_(raylet_client), raylet_client_(raylet_client),
check_signals_(check_signals), check_signals_(check_signals),
unhandled_exception_handler_(unhandled_exception_handler) {} unhandled_exception_handler_(unhandled_exception_handler),
object_allocator_(std::move(object_allocator)) {}
void CoreWorkerMemoryStore::GetAsync( void CoreWorkerMemoryStore::GetAsync(
const ObjectID &object_id, std::function<void(std::shared_ptr<RayObject>)> callback) { const ObjectID &object_id, std::function<void(std::shared_ptr<RayObject>)> callback) {
@ -191,10 +195,16 @@ std::shared_ptr<RayObject> CoreWorkerMemoryStore::GetIfExists(const ObjectID &ob
bool CoreWorkerMemoryStore::Put(const RayObject &object, const ObjectID &object_id) { bool CoreWorkerMemoryStore::Put(const RayObject &object, const ObjectID &object_id) {
std::vector<std::function<void(std::shared_ptr<RayObject>)>> async_callbacks; std::vector<std::function<void(std::shared_ptr<RayObject>)>> async_callbacks;
auto object_entry = std::make_shared<RayObject>(object.GetData(), object.GetMetadata(), RAY_LOG(DEBUG) << "Putting object into memory store. objectid is " << object_id;
std::shared_ptr<RayObject> object_entry = nullptr;
if (object_allocator_ != nullptr) {
object_entry = object_allocator_(object, object_id);
} else {
object_entry = std::make_shared<RayObject>(object.GetData(), object.GetMetadata(),
object.GetNestedRefs(), true); object.GetNestedRefs(), true);
bool stored_in_direct_memory = true; }
bool stored_in_direct_memory = true;
// TODO(edoakes): we should instead return a flag to the caller to put the object in // TODO(edoakes): we should instead return a flag to the caller to put the object in
// plasma. // plasma.
{ {

View file

@ -51,7 +51,10 @@ class CoreWorkerMemoryStore {
std::shared_ptr<ReferenceCounter> counter = nullptr, std::shared_ptr<ReferenceCounter> counter = nullptr,
std::shared_ptr<raylet::RayletClient> raylet_client = nullptr, std::shared_ptr<raylet::RayletClient> raylet_client = nullptr,
std::function<Status()> check_signals = nullptr, std::function<Status()> check_signals = nullptr,
std::function<void(const RayObject &)> unhandled_exception_handler = nullptr); std::function<void(const RayObject &)> unhandled_exception_handler = nullptr,
std::function<std::shared_ptr<RayObject>(const RayObject &object,
const ObjectID &object_id)>
object_allocator = nullptr);
~CoreWorkerMemoryStore(){}; ~CoreWorkerMemoryStore(){};
/// Put an object with specified ID into object store. /// Put an object with specified ID into object store.
@ -217,6 +220,12 @@ class CoreWorkerMemoryStore {
/// Number of object store memory used by this memory store. (It doesn't include plasma /// Number of object store memory used by this memory store. (It doesn't include plasma
/// store memory usage). /// store memory usage).
int64_t used_object_store_memory_ GUARDED_BY(mu_) = 0; int64_t used_object_store_memory_ GUARDED_BY(mu_) = 0;
/// This lambda is used to allow language frontend to allocate the objects
/// in the memory store.
std::function<std::shared_ptr<RayObject>(const RayObject &object,
const ObjectID &object_id)>
object_allocator_;
}; };
} // namespace core } // namespace core

View file

@ -21,6 +21,19 @@
namespace ray { namespace ray {
namespace core { namespace core {
inline std::shared_ptr<ray::LocalMemoryBuffer> MakeBufferFromString(const uint8_t *data,
size_t data_size) {
auto metadata = const_cast<uint8_t *>(data);
auto meta_buffer =
std::make_shared<ray::LocalMemoryBuffer>(metadata, data_size, /*copy_data=*/true);
return meta_buffer;
}
inline std::shared_ptr<ray::LocalMemoryBuffer> MakeLocalMemoryBufferFromString(
const std::string &str) {
return MakeBufferFromString(reinterpret_cast<const uint8_t *>(str.data()), str.size());
}
TEST(TestMemoryStore, TestReportUnhandledErrors) { TEST(TestMemoryStore, TestReportUnhandledErrors) {
std::vector<std::shared_ptr<RayObject>> results; std::vector<std::shared_ptr<RayObject>> results;
WorkerContext context(WorkerType::WORKER, WorkerID::FromRandom(), JobID::FromInt(0)); WorkerContext context(WorkerType::WORKER, WorkerID::FromRandom(), JobID::FromInt(0));
@ -124,6 +137,72 @@ TEST(TestMemoryStore, TestMemoryStoreStats) {
ASSERT_EQ(item.used_object_store_memory, expected_item3.used_object_store_memory); ASSERT_EQ(item.used_object_store_memory, expected_item3.used_object_store_memory);
} }
/// A mock manager that manages all test buffers. This mocks
/// that memory pressure is able to be awared.
class MockBufferManager {
public:
int64_t GetBuferPressureInBytes() const { return buffer_pressure_in_bytes_; }
void AcquireMemory(int64_t sz) { buffer_pressure_in_bytes_ += sz; }
void ReleaseMemory(int64_t sz) { buffer_pressure_in_bytes_ -= sz; }
private:
int64_t buffer_pressure_in_bytes_ = 0;
};
class TestBuffer : public Buffer {
public:
explicit TestBuffer(MockBufferManager &manager, std::string data)
: manager_(manager), data_(std::move(data)) {}
uint8_t *Data() const override {
return reinterpret_cast<uint8_t *>(const_cast<char *>(data_.data()));
}
size_t Size() const override { return data_.size(); }
bool OwnsData() const override { return true; }
bool IsPlasmaBuffer() const override { return false; }
const MockBufferManager &GetBufferManager() const { return manager_; }
private:
MockBufferManager &manager_;
std::string data_;
};
TEST(TestMemoryStore, TestObjectAllocator) {
MockBufferManager mock_buffer_manager;
auto my_object_allocator = [&mock_buffer_manager](const ray::RayObject &object,
const ObjectID &object_id) {
auto buf = object.GetData();
mock_buffer_manager.AcquireMemory(buf->Size());
auto data_factory = [&mock_buffer_manager, object]() -> std::shared_ptr<ray::Buffer> {
auto buf = object.GetData();
std::string data(reinterpret_cast<char *>(buf->Data()), buf->Size());
return std::make_shared<TestBuffer>(mock_buffer_manager, data);
};
return std::make_shared<ray::RayObject>(object.GetMetadata(), object.GetNestedRefs(),
std::move(data_factory), /*copy_data=*/true);
};
std::shared_ptr<CoreWorkerMemoryStore> memory_store =
std::make_shared<CoreWorkerMemoryStore>(nullptr, nullptr, nullptr, nullptr,
std::move(my_object_allocator));
const int32_t max_rounds = 1000;
const std::string hello = "hello";
for (auto i = 0; i < max_rounds; ++i) {
auto hello_buffer = MakeLocalMemoryBufferFromString(hello);
std::vector<rpc::ObjectReference> nested_refs;
auto hello_object =
std::make_shared<ray::RayObject>(hello_buffer, nullptr, nested_refs, true);
memory_store->Put(*hello_object, ObjectID::FromRandom());
}
ASSERT_EQ(max_rounds * hello.size(), mock_buffer_manager.GetBuferPressureInBytes());
}
} // namespace core } // namespace core
} // namespace ray } // namespace ray