diff --git a/python/ray/internal/internal_api.py b/python/ray/internal/internal_api.py index de5e89c75..43af66aba 100644 --- a/python/ray/internal/internal_api.py +++ b/python/ray/internal/internal_api.py @@ -112,6 +112,10 @@ def store_stats_summary(reply): round( 100 * reply.store_stats.object_store_bytes_used / reply.store_stats.object_store_bytes_avail, 2))) + if reply.store_stats.object_store_bytes_fallback > 0: + store_summary += ("Plasma filesystem mmap usage: {} MiB\n".format( + int(reply.store_stats.object_store_bytes_fallback / + (1024 * 1024)))) if reply.store_stats.spill_time_total_s > 0: store_summary += ( "Spilled {} MiB, {} objects, avg write throughput {} MiB/s\n". diff --git a/python/ray/tests/test_plasma_unlimited.py b/python/ray/tests/test_plasma_unlimited.py index 4a719cc90..2b8500168 100644 --- a/python/ray/tests/test_plasma_unlimited.py +++ b/python/ray/tests/test_plasma_unlimited.py @@ -17,7 +17,7 @@ def _init_ray(): _system_config={"plasma_unlimited": 1}) -def _check_spilled_mb(address, spilled=None, restored=None): +def _check_spilled_mb(address, spilled=None, restored=None, fallback=None): def ok(): s = memory_summary(address=address["redis_address"], stats_only=True) print(s) @@ -33,6 +33,13 @@ def _check_spilled_mb(address, spilled=None, restored=None): else: if "Spilled" in s: return False + if fallback: + if "Plasma filesystem mmap usage: {} MiB".format( + fallback) not in s: + return False + else: + if "Plasma filesystem mmap usage:" in s: + return False return True wait_for_condition(ok, timeout=3, retry_interval_ms=1000) @@ -50,7 +57,7 @@ def test_fallback_when_spilling_impossible_on_put(): x2p = ray.get(x2) del x1p del x2p - _check_spilled_mb(address, spilled=None) + _check_spilled_mb(address, spilled=None, fallback=400) finally: ray.shutdown() @@ -82,7 +89,7 @@ def test_fallback_when_spilling_impossible_on_get(): _check_spilled_mb(address, spilled=800, restored=400) # x2 will be restored, triggering a fallback allocation. 
x2p = ray.get(x2) - _check_spilled_mb(address, spilled=800, restored=800) + _check_spilled_mb(address, spilled=800, restored=800, fallback=400) del x1p del x2p finally: @@ -130,7 +137,7 @@ def test_task_unlimited(): # round 1 ray.get(consume.remote(refs)) - _check_spilled_mb(address, spilled=500, restored=400) + _check_spilled_mb(address, spilled=500, restored=400, fallback=400) del x2p del sentinel @@ -158,7 +165,7 @@ def test_task_unlimited_multiget_args(): return os.getpid() ray.get([consume.remote(refs) for _ in range(1000)]) - _check_spilled_mb(address, spilled=2000, restored=2000) + _check_spilled_mb(address, spilled=2000, restored=2000, fallback=2000) del x2p finally: ray.shutdown() diff --git a/src/ray/object_manager/object_manager.cc b/src/ray/object_manager/object_manager.cc index 17c8d2b2d..3013df04f 100644 --- a/src/ray/object_manager/object_manager.cc +++ b/src/ray/object_manager/object_manager.cc @@ -901,6 +901,7 @@ std::string ObjectManager::DebugString() const { void ObjectManager::RecordMetrics() const { stats::ObjectStoreAvailableMemory().Record(config_.object_store_memory - used_memory_); stats::ObjectStoreUsedMemory().Record(used_memory_); + stats::ObjectStoreFallbackMemory().Record(plasma::PlasmaAllocator::FallbackAllocated()); stats::ObjectStoreLocalObjects().Record(local_objects_.size()); stats::ObjectManagerPullRequests().Record(pull_manager_->NumActiveRequests()); } @@ -908,6 +909,7 @@ void ObjectManager::RecordMetrics() const { void ObjectManager::FillObjectStoreStats(rpc::GetNodeStatsReply *reply) const { auto stats = reply->mutable_store_stats(); stats->set_object_store_bytes_used(used_memory_); + stats->set_object_store_bytes_fallback(plasma::PlasmaAllocator::FallbackAllocated()); stats->set_object_store_bytes_avail(config_.object_store_memory); stats->set_num_local_objects(local_objects_.size()); stats->set_consumed_bytes(plasma::plasma_store_runner->GetConsumedBytes()); diff --git a/src/ray/object_manager/plasma/dlmalloc.cc 
b/src/ray/object_manager/plasma/dlmalloc.cc index 87fbcd5a4..e800c5f61 100644 --- a/src/ray/object_manager/plasma/dlmalloc.cc +++ b/src/ray/object_manager/plasma/dlmalloc.cc @@ -73,6 +73,11 @@ constexpr int GRANULARITY_MULTIPLIER = 2; // Combined with MAP_POPULATE, this can guarantee we never run into SIGBUS errors. static bool allocated_once = false; +// Populated on the first allocation so we can track which allocations fall within +// the initial region vs outside. +static char *initial_region_ptr = nullptr; +static size_t initial_region_size = 0; + static void *pointer_advance(void *p, ptrdiff_t n) { return (unsigned char *)p + n; } static void *pointer_retreat(void *p, ptrdiff_t n) { return (unsigned char *)p - n; } @@ -141,8 +146,12 @@ void create_and_mmap_buffer(int64_t size, void **pointer, int *fd) { RAY_LOG(ERROR) << " (this probably means you have to increase /proc/sys/vm/nr_hugepages)"; } + } else if (!allocated_once) { + initial_region_ptr = static_cast<char *>(*pointer); + initial_region_size = size; } } + #endif void *fake_mmap(size_t size) { @@ -211,6 +220,14 @@ int fake_munmap(void *addr, int64_t size) { void SetMallocGranularity(int value) { change_mparam(M_GRANULARITY, value); } +// Returns whether the given pointer is outside the initially allocated region.
+bool IsOutsideInitialAllocation(void *p) { + if (initial_region_ptr == nullptr) { + return false; + } + return (p < initial_region_ptr) || (p >= (initial_region_ptr + initial_region_size)); +} + const PlasmaStoreInfo *plasma_config; } // namespace plasma diff --git a/src/ray/object_manager/plasma/plasma_allocator.cc b/src/ray/object_manager/plasma/plasma_allocator.cc index 6e5c373b0..1fe5bbeea 100644 --- a/src/ray/object_manager/plasma/plasma_allocator.cc +++ b/src/ray/object_manager/plasma/plasma_allocator.cc @@ -23,6 +23,8 @@ namespace plasma { +bool IsOutsideInitialAllocation(void *ptr); + extern "C" { void *dlmemalign(size_t alignment, size_t bytes); void dlfree(void *mem); @@ -35,6 +37,7 @@ const int M_MMAP_THRESHOLD = -3; int64_t PlasmaAllocator::footprint_limit_ = 0; int64_t PlasmaAllocator::allocated_ = 0; +int64_t PlasmaAllocator::fallback_allocated_ = 0; void *PlasmaAllocator::Memalign(size_t alignment, size_t bytes) { if (!RayConfig::instance().plasma_unlimited()) { @@ -54,7 +57,6 @@ void *PlasmaAllocator::Memalign(size_t alignment, size_t bytes) { return mem; } -// TODO(ekl) we should track these allocations separately from the overall footprint. void *PlasmaAllocator::DiskMemalignUnlimited(size_t alignment, size_t bytes) { // Forces allocation as a separate file. 
RAY_CHECK(dlmallopt(M_MMAP_THRESHOLD, 0)); @@ -64,13 +66,18 @@ void *PlasmaAllocator::DiskMemalignUnlimited(size_t alignment, size_t bytes) { if (!mem) { return nullptr; } + RAY_CHECK(IsOutsideInitialAllocation(mem)); allocated_ += bytes; + fallback_allocated_ += bytes; return mem; } void PlasmaAllocator::Free(void *mem, size_t bytes) { dlfree(mem); allocated_ -= bytes; + if (RayConfig::instance().plasma_unlimited() && IsOutsideInitialAllocation(mem)) { + fallback_allocated_ -= bytes; + } } void PlasmaAllocator::SetFootprintLimit(size_t bytes) { @@ -81,4 +88,6 @@ int64_t PlasmaAllocator::GetFootprintLimit() { return footprint_limit_; } int64_t PlasmaAllocator::Allocated() { return allocated_; } +int64_t PlasmaAllocator::FallbackAllocated() { return fallback_allocated_; } + } // namespace plasma diff --git a/src/ray/object_manager/plasma/plasma_allocator.h b/src/ray/object_manager/plasma/plasma_allocator.h index eb589814c..fb5adebec 100644 --- a/src/ray/object_manager/plasma/plasma_allocator.h +++ b/src/ray/object_manager/plasma/plasma_allocator.h @@ -58,8 +58,13 @@ class PlasmaAllocator { /// \return Number of bytes allocated by Plasma so far. static int64_t Allocated(); + /// Get the number of bytes fallback allocated by Plasma so far. + /// \return Number of bytes fallback allocated by Plasma so far. + static int64_t FallbackAllocated(); + private: static int64_t allocated_; + static int64_t fallback_allocated_; static int64_t footprint_limit_; }; diff --git a/src/ray/protobuf/node_manager.proto b/src/ray/protobuf/node_manager.proto index 87fb09a83..f9d95c5de 100644 --- a/src/ray/protobuf/node_manager.proto +++ b/src/ray/protobuf/node_manager.proto @@ -137,10 +137,13 @@ message ObjectStoreStats { int64 object_store_bytes_used = 7; // The max capacity of the object store. int64 object_store_bytes_avail = 8; + // The number of bytes allocated from the filesystem (fallback allocs). + // Uses the next unused tag; existing field numbers are kept for wire compatibility. + int64 object_store_bytes_fallback = 11; // The number of local objects total.
int64 num_local_objects = 9; // The number of plasma object bytes that are consumed by core workers. int64 consumed_bytes = 10; } message GetNodeStatsReply { diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index 11c2cb9f0..e503c6cf6 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -2208,6 +2208,9 @@ rpc::ObjectStoreStats AccumulateStoreStats( cur_store.object_store_bytes_used()); store_stats.set_object_store_bytes_avail(store_stats.object_store_bytes_avail() + cur_store.object_store_bytes_avail()); + store_stats.set_object_store_bytes_fallback( + store_stats.object_store_bytes_fallback() + + cur_store.object_store_bytes_fallback()); store_stats.set_num_local_objects(store_stats.num_local_objects() + cur_store.num_local_objects()); store_stats.set_consumed_bytes(store_stats.consumed_bytes() + diff --git a/src/ray/stats/metric_defs.h b/src/ray/stats/metric_defs.h index 7b968cbc4..28904bf88 100644 --- a/src/ray/stats/metric_defs.h +++ b/src/ray/stats/metric_defs.h @@ -59,6 +59,10 @@ static Gauge ObjectStoreUsedMemory( "object_store_used_memory", "Amount of memory currently occupied in the object store.", "bytes"); +static Gauge ObjectStoreFallbackMemory( + "object_store_fallback_memory", + "Amount of memory in fallback allocations in the filesystem.", "bytes"); + static Gauge ObjectStoreLocalObjects("object_store_num_local_objects", "Number of objects currently in the object store.", "objects");