[Tests] Add a memory usage on dask on ray tests (#19674)

This commit is contained in:
SangBin Cho 2021-10-26 06:58:26 +09:00 committed by GitHub
parent 544f774245
commit ecd5a622ef
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 36 additions and 11 deletions

View file

@ -716,7 +716,9 @@ def placement_group_assert_no_leak(pgs_created):
wait_for_condition(wait_for_resource_recovered)
def monitor_memory_usage(interval_s: int = 5, warning_threshold: float = 0.9):
def monitor_memory_usage(print_interval_s: int = 30,
record_interval_s: int = 5,
warning_threshold: float = 0.9):
"""Run the memory monitor actor that prints the memory usage.
The monitor will run on the same node as this function is called.
@ -735,21 +737,26 @@ def monitor_memory_usage(interval_s: int = 5, warning_threshold: float = 0.9):
@ray.remote(num_cpus=0)
class MemoryMonitorActor:
def __init__(self,
interval_s: float = 5,
print_interval_s: float = 20,
record_interval_s: float = 5,
warning_threshold: float = 0.9,
n: int = 10):
"""The actor that monitors the memory usage of the cluster.
Params:
interval_s (float): The interval where
print_interval_s (float): The interval where
memory usage is printed.
record_interval_s (float): The interval where
memory usage is recorded.
warning_threshold (float): The threshold at which
a memory warning is printed.
n (int): When memory usage is printed,
top n entries are printed.
"""
# -- Interval the monitor omits the memory usage information. --
self.interval_s = interval_s
# -- Interval the monitor prints the memory usage information. --
self.print_interval_s = print_interval_s
# -- Interval the monitor records the memory usage information. --
self.record_interval_s = record_interval_s
# -- Whether or not the monitor is running. --
self.is_running = False
# -- The used_gb/total_gb threshold at which a warning message is emitted. --
@ -763,6 +770,8 @@ def monitor_memory_usage(interval_s: int = 5, warning_threshold: float = 0.9):
# -- The top n memory usage of processes
# during peak memory usage. --
self.peak_top_n_memory_usage = ""
# -- The last time memory usage was printed --
self._last_print_time = 0
# -- logger. --
logging.basicConfig(level=logging.INFO)
@ -774,6 +783,7 @@ def monitor_memory_usage(interval_s: int = 5, warning_threshold: float = 0.9):
"""
self.is_running = True
while self.is_running:
now = time.time()
used_gb, total_gb = self.monitor.get_memory_usage()
top_n_memory_usage = memory_monitor.get_top_n_memory_usage(
n=self.n)
@ -784,10 +794,12 @@ def monitor_memory_usage(interval_s: int = 5, warning_threshold: float = 0.9):
if used_gb > total_gb * self.warning_threshold:
logging.warning("The memory usage is high: "
f"{used_gb / total_gb * 100}%")
logging.info(f"Memory usage: {used_gb} / {total_gb}")
logging.info(f"Top {self.n} process memory usage:")
logging.info(top_n_memory_usage)
await asyncio.sleep(self.interval_s)
if now - self._last_print_time > self.print_interval_s:
logging.info(f"Memory usage: {used_gb} / {total_gb}")
logging.info(f"Top {self.n} process memory usage:")
logging.info(top_n_memory_usage)
self._last_print_time = now
await asyncio.sleep(self.record_interval_s)
async def stop_run(self):
"""Stop running the monitor.
@ -810,7 +822,9 @@ def monitor_memory_usage(interval_s: int = 5, warning_threshold: float = 0.9):
memory_monitor_actor = MemoryMonitorActor.options(resources={
f"node:{current_node_ip}": 0.001
}).remote(
interval_s=interval_s, warning_threshold=warning_threshold)
print_interval_s=print_interval_s,
record_interval_s=record_interval_s,
warning_threshold=warning_threshold)
print("Waiting for memory monitor actor to be ready...")
ray.get(memory_monitor_actor.ready.remote())
print("Memory monitor actor is ready now.")

View file

@ -14,6 +14,7 @@ import numpy as np
import dask.array
import xarray
from ray.util.dask import ray_dask_get
from ray._private.test_utils import monitor_memory_usage
import math
import json
"""
@ -442,6 +443,7 @@ def main():
# Connect to the Ray cluster
ray.init(address="auto")
monitor_actor = monitor_memory_usage()
# Save all the Xarrays to disk; this will trigger
# Dask computations on Ray.
@ -452,9 +454,18 @@ def main():
batch_size=test_spec.batch_size,
ray_scheduler=ray_dask_get,
)
ray.get(monitor_actor.stop_run.remote())
used_gb, usage = ray.get(monitor_actor.get_peak_memory_info.remote())
print(f"Peak memory usage: {round(used_gb, 2)}GB")
print(f"Peak memory usage per processes:\n {usage}")
print(ray.internal.internal_api.memory_summary(stats_only=True))
with open(os.environ["TEST_OUTPUT_JSON"], "w") as f:
f.write(json.dumps({"success": 1}))
f.write(
json.dumps({
"success": 1,
"_peak_memory": round(used_gb, 2),
"_peak_process_memory": usage
}))
if __name__ == "__main__":