Add fallback allocator stats to "ray memory" (#16362)

This commit is contained in:
Eric Liang 2021-06-10 18:33:59 -07:00 committed by GitHub
parent 8d56a36d28
commit 47bbca04be
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
9 changed files with 61 additions and 8 deletions

View file

@ -112,6 +112,10 @@ def store_stats_summary(reply):
round(
100 * reply.store_stats.object_store_bytes_used /
reply.store_stats.object_store_bytes_avail, 2)))
if reply.store_stats.object_store_bytes_fallback > 0:
store_summary += ("Plasma filesystem mmap usage: {} MiB\n".format(
int(reply.store_stats.object_store_bytes_fallback /
(1024 * 1024))))
if reply.store_stats.spill_time_total_s > 0:
store_summary += (
"Spilled {} MiB, {} objects, avg write throughput {} MiB/s\n".

View file

@ -17,7 +17,7 @@ def _init_ray():
_system_config={"plasma_unlimited": 1})
def _check_spilled_mb(address, spilled=None, restored=None):
def _check_spilled_mb(address, spilled=None, restored=None, fallback=None):
def ok():
s = memory_summary(address=address["redis_address"], stats_only=True)
print(s)
@ -33,6 +33,13 @@ def _check_spilled_mb(address, spilled=None, restored=None):
else:
if "Spilled" in s:
return False
if fallback:
if "Plasma filesystem mmap usage: {} MiB".format(
fallback) not in s:
return False
else:
if "Plasma filesystem mmap usage:" in s:
return False
return True
wait_for_condition(ok, timeout=3, retry_interval_ms=1000)
@ -50,7 +57,7 @@ def test_fallback_when_spilling_impossible_on_put():
x2p = ray.get(x2)
del x1p
del x2p
_check_spilled_mb(address, spilled=None)
_check_spilled_mb(address, spilled=None, fallback=400)
finally:
ray.shutdown()
@ -82,7 +89,7 @@ def test_fallback_when_spilling_impossible_on_get():
_check_spilled_mb(address, spilled=800, restored=400)
# x2 will be restored, triggering a fallback allocation.
x2p = ray.get(x2)
_check_spilled_mb(address, spilled=800, restored=800)
_check_spilled_mb(address, spilled=800, restored=800, fallback=400)
del x1p
del x2p
finally:
@ -130,7 +137,7 @@ def test_task_unlimited():
# round 1
ray.get(consume.remote(refs))
_check_spilled_mb(address, spilled=500, restored=400)
_check_spilled_mb(address, spilled=500, restored=400, fallback=400)
del x2p
del sentinel
@ -158,7 +165,7 @@ def test_task_unlimited_multiget_args():
return os.getpid()
ray.get([consume.remote(refs) for _ in range(1000)])
_check_spilled_mb(address, spilled=2000, restored=2000)
_check_spilled_mb(address, spilled=2000, restored=2000, fallback=2000)
del x2p
finally:
ray.shutdown()

View file

@ -901,6 +901,7 @@ std::string ObjectManager::DebugString() const {
void ObjectManager::RecordMetrics() const {
stats::ObjectStoreAvailableMemory().Record(config_.object_store_memory - used_memory_);
stats::ObjectStoreUsedMemory().Record(used_memory_);
stats::ObjectStoreFallbackMemory().Record(plasma::PlasmaAllocator::FallbackAllocated());
stats::ObjectStoreLocalObjects().Record(local_objects_.size());
stats::ObjectManagerPullRequests().Record(pull_manager_->NumActiveRequests());
}
@ -908,6 +909,7 @@ void ObjectManager::RecordMetrics() const {
void ObjectManager::FillObjectStoreStats(rpc::GetNodeStatsReply *reply) const {
auto stats = reply->mutable_store_stats();
stats->set_object_store_bytes_used(used_memory_);
stats->set_object_store_bytes_fallback(plasma::PlasmaAllocator::FallbackAllocated());
stats->set_object_store_bytes_avail(config_.object_store_memory);
stats->set_num_local_objects(local_objects_.size());
stats->set_consumed_bytes(plasma::plasma_store_runner->GetConsumedBytes());

View file

@ -73,6 +73,11 @@ constexpr int GRANULARITY_MULTIPLIER = 2;
// Combined with MAP_POPULATE, this can guarantee we never run into SIGBUS errors.
static bool allocated_once = false;
// Populated on the first allocation so we can track which allocations fall within
// the initial region vs outside.
static char *initial_region_ptr = nullptr;
static size_t initial_region_size = 0;
static void *pointer_advance(void *p, ptrdiff_t n) { return (unsigned char *)p + n; }
static void *pointer_retreat(void *p, ptrdiff_t n) { return (unsigned char *)p - n; }
@ -141,8 +146,12 @@ void create_and_mmap_buffer(int64_t size, void **pointer, int *fd) {
RAY_LOG(ERROR)
<< " (this probably means you have to increase /proc/sys/vm/nr_hugepages)";
}
} else if (!allocated_once) {
initial_region_ptr = static_cast<char *>(*pointer);
initial_region_size = size;
}
}
#endif
void *fake_mmap(size_t size) {
@ -211,6 +220,14 @@ int fake_munmap(void *addr, int64_t size) {
void SetMallocGranularity(int value) { change_mparam(M_GRANULARITY, value); }
// Returns whether the given pointer is outside the initially allocated region.
bool IsOutsideInitialAllocation(void *p) {
if (initial_region_ptr == nullptr) {
return false;
}
return (p < initial_region_ptr) || (p >= (initial_region_ptr + initial_region_size));
}
const PlasmaStoreInfo *plasma_config;
} // namespace plasma

View file

@ -23,6 +23,8 @@
namespace plasma {
bool IsOutsideInitialAllocation(void *ptr);
extern "C" {
void *dlmemalign(size_t alignment, size_t bytes);
void dlfree(void *mem);
@ -35,6 +37,7 @@ const int M_MMAP_THRESHOLD = -3;
int64_t PlasmaAllocator::footprint_limit_ = 0;
int64_t PlasmaAllocator::allocated_ = 0;
int64_t PlasmaAllocator::fallback_allocated_ = 0;
void *PlasmaAllocator::Memalign(size_t alignment, size_t bytes) {
if (!RayConfig::instance().plasma_unlimited()) {
@ -54,7 +57,6 @@ void *PlasmaAllocator::Memalign(size_t alignment, size_t bytes) {
return mem;
}
// TODO(ekl) we should track these allocations separately from the overall footprint.
void *PlasmaAllocator::DiskMemalignUnlimited(size_t alignment, size_t bytes) {
// Forces allocation as a separate file.
RAY_CHECK(dlmallopt(M_MMAP_THRESHOLD, 0));
@ -64,13 +66,18 @@ void *PlasmaAllocator::DiskMemalignUnlimited(size_t alignment, size_t bytes) {
if (!mem) {
return nullptr;
}
RAY_CHECK(IsOutsideInitialAllocation(mem));
allocated_ += bytes;
fallback_allocated_ += bytes;
return mem;
}
void PlasmaAllocator::Free(void *mem, size_t bytes) {
dlfree(mem);
allocated_ -= bytes;
if (RayConfig::instance().plasma_unlimited() && IsOutsideInitialAllocation(mem)) {
fallback_allocated_ -= bytes;
}
}
void PlasmaAllocator::SetFootprintLimit(size_t bytes) {
@ -81,4 +88,6 @@ int64_t PlasmaAllocator::GetFootprintLimit() { return footprint_limit_; }
int64_t PlasmaAllocator::Allocated() { return allocated_; }
int64_t PlasmaAllocator::FallbackAllocated() { return fallback_allocated_; }
} // namespace plasma

View file

@ -58,8 +58,13 @@ class PlasmaAllocator {
/// \return Number of bytes allocated by Plasma so far.
static int64_t Allocated();
/// Get the number of bytes fallback allocated by Plasma so far.
/// \return Number of bytes fallback allocated by Plasma so far.
static int64_t FallbackAllocated();
private:
static int64_t allocated_;
static int64_t fallback_allocated_;
static int64_t footprint_limit_;
};

View file

@ -137,10 +137,12 @@ message ObjectStoreStats {
int64 object_store_bytes_used = 7;
// The max capacity of the object store.
int64 object_store_bytes_avail = 8;
// The number of bytes allocated from the filesystem (fallback allocs).
int64 object_store_bytes_fallback = 9;
// The number of local objects total.
int64 num_local_objects = 9;
int64 num_local_objects = 10;
// The number of plasma object bytes that are consumed by core workers.
int64 consumed_bytes = 10;
int64 consumed_bytes = 11;
}
message GetNodeStatsReply {

View file

@ -2208,6 +2208,9 @@ rpc::ObjectStoreStats AccumulateStoreStats(
cur_store.object_store_bytes_used());
store_stats.set_object_store_bytes_avail(store_stats.object_store_bytes_avail() +
cur_store.object_store_bytes_avail());
store_stats.set_object_store_bytes_fallback(
store_stats.object_store_bytes_fallback() +
cur_store.object_store_bytes_fallback());
store_stats.set_num_local_objects(store_stats.num_local_objects() +
cur_store.num_local_objects());
store_stats.set_consumed_bytes(store_stats.consumed_bytes() +

View file

@ -59,6 +59,10 @@ static Gauge ObjectStoreUsedMemory(
"object_store_used_memory",
"Amount of memory currently occupied in the object store.", "bytes");
static Gauge ObjectStoreFallbackMemory(
"object_store_fallback_memory",
"Amount of memory in fallback allocations in the filesystem.", "bytes");
static Gauge ObjectStoreLocalObjects("object_store_num_local_objects",
"Number of objects currently in the object store.",
"objects");