[Core] Optimize get core worker Stats (#15008)

* in progress.

* Optimize get core worker stats.

* Fix a segfault.

* Addressed code review.

* Update comments.

* Addressed code review.
This commit is contained in:
SangBin Cho 2021-03-31 12:21:53 -07:00 committed by GitHub
parent 4480132229
commit 79a6aa97b7
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 137 additions and 18 deletions

View file

@ -215,9 +215,11 @@ bool CoreWorkerMemoryStore::Put(const RayObject &object, const ObjectID &object_
if (should_add_entry) {
// If there is no existing get request, then add the `RayObject` to map.
objects_.emplace(object_id, object_entry);
EmplaceObjectAndUpdateStats(object_id, object_entry);
} else {
OnErase(object_entry);
// It is equivalent to the object being added and immediately deleted from the
// store.
OnDelete(object_entry);
}
}
@ -283,7 +285,7 @@ Status CoreWorkerMemoryStore::GetImpl(const std::vector<ObjectID> &object_ids,
// Clean up the objects if ref counting is off.
if (ref_counter_ == nullptr) {
for (const auto &object_id : ids_to_remove) {
objects_.erase(object_id);
EraseObjectAndUpdateStats(object_id);
}
}
@ -436,8 +438,8 @@ void CoreWorkerMemoryStore::Delete(const absl::flat_hash_set<ObjectID> &object_i
if (it->second->IsInPlasmaError()) {
plasma_ids_to_delete->insert(object_id);
} else {
OnErase(it->second);
objects_.erase(it);
OnDelete(it->second);
EraseObjectAndUpdateStats(object_id);
}
}
}
@ -448,8 +450,8 @@ void CoreWorkerMemoryStore::Delete(const std::vector<ObjectID> &object_ids) {
for (const auto &object_id : object_ids) {
auto it = objects_.find(object_id);
if (it != objects_.end()) {
OnErase(it->second);
objects_.erase(it);
OnDelete(it->second);
EraseObjectAndUpdateStats(object_id);
}
}
}
@ -466,7 +468,7 @@ bool CoreWorkerMemoryStore::Contains(const ObjectID &object_id, bool *in_plasma)
return false;
}
void CoreWorkerMemoryStore::OnErase(std::shared_ptr<RayObject> obj) {
void CoreWorkerMemoryStore::OnDelete(std::shared_ptr<RayObject> obj) {
rpc::ErrorType error_type;
// TODO(ekl) note that this doesn't warn on errors that are stored in plasma.
if (obj->IsException(&error_type) &&
@ -479,17 +481,44 @@ void CoreWorkerMemoryStore::OnErase(std::shared_ptr<RayObject> obj) {
}
}
inline void CoreWorkerMemoryStore::EraseObjectAndUpdateStats(const ObjectID &object_id) {
auto it = objects_.find(object_id);
if (it == objects_.end()) {
return;
}
if (it->second->IsInPlasmaError()) {
num_in_plasma_ -= 1;
} else {
num_local_objects_ -= 1;
used_object_store_memory_ -= it->second->GetSize();
}
RAY_CHECK(num_in_plasma_ >= 0 && num_local_objects_ >= 0 &&
used_object_store_memory_ >= 0);
objects_.erase(it);
}
inline void CoreWorkerMemoryStore::EmplaceObjectAndUpdateStats(
const ObjectID &object_id, std::shared_ptr<RayObject> &object_entry) {
auto inserted = objects_.emplace(object_id, object_entry).second;
if (inserted) {
if (object_entry->IsInPlasmaError()) {
num_in_plasma_ += 1;
} else {
num_local_objects_ += 1;
used_object_store_memory_ += object_entry->GetSize();
}
}
RAY_CHECK(num_in_plasma_ >= 0 && num_local_objects_ >= 0 &&
used_object_store_memory_ >= 0);
}
MemoryStoreStats CoreWorkerMemoryStore::GetMemoryStoreStatisticalData() {
absl::MutexLock lock(&mu_);
MemoryStoreStats item;
for (const auto &it : objects_) {
if (it.second->IsInPlasmaError()) {
item.num_in_plasma += 1;
} else {
item.num_local_objects += 1;
item.used_object_store_memory += it.second->GetSize();
}
}
item.num_in_plasma = num_in_plasma_;
item.num_local_objects = num_local_objects_;
item.used_object_store_memory = used_object_store_memory_;
return item;
}

View file

@ -1,5 +1,7 @@
#pragma once
#include <gtest/gtest_prod.h>
#include "absl/container/flat_hash_map.h"
#include "absl/container/flat_hash_set.h"
#include "absl/synchronization/mutex.h"
@ -136,6 +138,8 @@ class CoreWorkerMemoryStore {
uint64_t UsedMemory();
private:
FRIEND_TEST(TestMemoryStore, TestMemoryStoreStats);
/// See the public version of `Get` for meaning of the other arguments.
/// \param[in] abort_if_any_object_is_exception Whether we should abort if any object
/// resources. is an exception.
@ -144,8 +148,17 @@ class CoreWorkerMemoryStore {
std::vector<std::shared_ptr<RayObject>> *results,
bool abort_if_any_object_is_exception);
/// Called when an object is erased from the store.
void OnErase(std::shared_ptr<RayObject> obj);
/// Called when an object is deleted from the store.
void OnDelete(std::shared_ptr<RayObject> obj);
/// Emplace the given object entry to the in-memory-store and update stats properly.
void EmplaceObjectAndUpdateStats(const ObjectID &object_id,
std::shared_ptr<RayObject> &object_entry)
EXCLUSIVE_LOCKS_REQUIRED(mu_);
/// Erase the object of the object id from the in memory store and update stats
/// properly.
void EraseObjectAndUpdateStats(const ObjectID &object_id) EXCLUSIVE_LOCKS_REQUIRED(mu_);
/// Optional callback for putting objects into the plasma store.
std::function<void(const RayObject &, const ObjectID &)> store_in_plasma_;
@ -164,6 +177,8 @@ class CoreWorkerMemoryStore {
absl::flat_hash_set<ObjectID> promoted_to_plasma_ GUARDED_BY(mu_);
/// Map from object ID to `RayObject`.
/// NOTE: This map should be modified by EmplaceObjectAndUpdateStats and
/// EraseObjectAndUpdateStats.
absl::flat_hash_map<ObjectID, std::shared_ptr<RayObject>> objects_ GUARDED_BY(mu_);
/// Map from object ID to its get requests.
@ -180,6 +195,17 @@ class CoreWorkerMemoryStore {
/// Function called to report unhandled exceptions.
std::function<void(const RayObject &)> unhandled_exception_handler_;
///
/// Below information is stats.
///
/// Number of objects in the plasma store for this memory store.
int32_t num_in_plasma_ GUARDED_BY(mu_) = 0;
/// Number of objects that don't exist in the plasma store.
int32_t num_local_objects_ GUARDED_BY(mu_) = 0;
/// Number of object store memory used by this memory store. (It doesn't include plasma
/// store memory usage).
int64_t used_object_store_memory_ GUARDED_BY(mu_) = 0;
};
} // namespace ray

View file

@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.
#include "absl/synchronization/mutex.h"
#include "ray/core_worker/store_provider/memory_store/memory_store.h"
#include "gtest/gtest.h"
@ -58,6 +60,68 @@ TEST(TestMemoryStore, TestReportUnhandledErrors) {
ASSERT_EQ(unhandled_count, 0);
}
TEST(TestMemoryStore, TestMemoryStoreStats) {
/// Simple validation for test memory store stats.
std::shared_ptr<CoreWorkerMemoryStore> provider =
std::make_shared<CoreWorkerMemoryStore>(nullptr, nullptr, nullptr, nullptr,
nullptr);
// Iterate through the memory store and compare the values that are obtained by
// GetMemoryStoreStatisticalData.
auto fill_expected_memory_stats = [&](MemoryStoreStats &expected_item) {
{
absl::MutexLock lock(&provider->mu_);
for (const auto &it : provider->objects_) {
if (it.second->IsInPlasmaError()) {
expected_item.num_in_plasma += 1;
} else {
expected_item.num_local_objects += 1;
expected_item.used_object_store_memory += it.second->GetSize();
}
}
}
};
RayObject obj1(rpc::ErrorType::OBJECT_IN_PLASMA);
RayObject obj2(rpc::ErrorType::TASK_EXECUTION_EXCEPTION);
RayObject obj3(rpc::ErrorType::TASK_EXECUTION_EXCEPTION);
auto id1 = ObjectID::FromRandom();
auto id2 = ObjectID::FromRandom();
auto id3 = ObjectID::FromRandom();
RAY_CHECK(provider->Put(obj1, id1));
RAY_CHECK(provider->Put(obj2, id2));
RAY_CHECK(provider->Put(obj3, id3));
provider->Delete({id3});
MemoryStoreStats expected_item;
fill_expected_memory_stats(expected_item);
MemoryStoreStats item = provider->GetMemoryStoreStatisticalData();
ASSERT_EQ(item.num_in_plasma, expected_item.num_in_plasma);
ASSERT_EQ(item.num_local_objects, expected_item.num_local_objects);
ASSERT_EQ(item.used_object_store_memory, expected_item.used_object_store_memory);
// Delete all other objects and see if stats are recorded correctly.
provider->Delete({id1, id2});
MemoryStoreStats expected_item2;
fill_expected_memory_stats(expected_item2);
item = provider->GetMemoryStoreStatisticalData();
ASSERT_EQ(item.num_in_plasma, expected_item2.num_in_plasma);
ASSERT_EQ(item.num_local_objects, expected_item2.num_local_objects);
ASSERT_EQ(item.used_object_store_memory, expected_item2.used_object_store_memory);
RAY_CHECK(provider->Put(obj1, id1));
RAY_CHECK(provider->Put(obj2, id2));
RAY_CHECK(provider->Put(obj3, id3));
MemoryStoreStats expected_item3;
fill_expected_memory_stats(expected_item3);
item = provider->GetMemoryStoreStatisticalData();
ASSERT_EQ(item.num_in_plasma, expected_item3.num_in_plasma);
ASSERT_EQ(item.num_local_objects, expected_item3.num_local_objects);
ASSERT_EQ(item.used_object_store_memory, expected_item3.used_object_store_memory);
}
} // namespace ray
int main(int argc, char **argv) {