[wingman -> tune] Add system performance tracking (#4924)

This commit is contained in:
Dušan Josipović 2019-07-06 09:57:35 +02:00 committed by Richard Liaw
parent c3e9d94b18
commit e9b88dcbed
4 changed files with 97 additions and 6 deletions

View file

@ -70,6 +70,7 @@ COMMON_CONFIG = {
# Execute TF loss functions in eager mode. This is currently experimental
# and only really works with the basic PG algorithm.
"use_eager": False,
"log_sys_usage": True,
# === Policy ===
# Arguments to pass to model. See models/catalog.py for a full list of the

View file

@ -87,11 +87,13 @@ class Experiment(object):
if sync_function:
_raise_deprecation_note(
"sync_function", "sync_to_driver", soft=False)
config = config or {}
run_identifier = Experiment._register_if_needed(run)
spec = {
"run": run_identifier,
"stop": stop or {},
"config": config or {},
"config": config,
"resources_per_trial": resources_per_trial,
"num_samples": num_samples,
"local_dir": os.path.expanduser(local_dir or DEFAULT_RESULTS_DIR),

View file

@ -22,6 +22,8 @@ from ray.tune.result import (DEFAULT_RESULTS_DIR, TIME_THIS_ITER_S,
EPISODES_THIS_ITER, EPISODES_TOTAL,
TRAINING_ITERATION, RESULT_DUPLICATE)
from ray.tune.util import UtilMonitor
logger = logging.getLogger(__name__)
@ -64,6 +66,7 @@ class Trainable(object):
self._experiment_id = uuid.uuid4().hex
self.config = config or {}
log_sys_usage = self.config.get("log_sys_usage", False)
if logger_creator:
self._result_logger = logger_creator(self.config)
@ -86,6 +89,7 @@ class Trainable(object):
self._restored = False
self._setup(copy.deepcopy(self.config))
self._local_ip = ray.services.get_node_ip_address()
self._monitor = UtilMonitor(start=log_sys_usage)
@classmethod
def default_resource_request(cls, config):
@ -205,6 +209,10 @@ class Trainable(object):
timesteps_since_restore=self._timesteps_since_restore,
iterations_since_restore=self._iterations_since_restore)
monitor_data = self._monitor.get_data()
if monitor_data:
result.update(monitor_data)
self._log_result(result)
return result
@ -442,7 +450,6 @@ class Trainable(object):
Args:
result (dict): Training result returned by _train().
"""
self._result_logger.on_result(result)
def _stop(self):

View file

@ -2,22 +2,103 @@ from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import logging
import base64
import fnmatch
import os
import copy
import numpy as np
import fnmatch
import logging
import os
import threading
import time
from collections import defaultdict
from threading import Thread
import numpy as np
import ray
logger = logging.getLogger(__name__)
try:
import psutil
except ImportError:
psutil = None
try:
import GPUtil
except ImportError:
GPUtil = None
_pinned_objects = []
PINNED_OBJECT_PREFIX = "ray.tune.PinnedObject:"
class UtilMonitor(Thread):
    """Background thread that samples system utilization.

    Tracks CPU and RAM usage via psutil, and per-GPU load/VRAM usage via
    GPUtil, sampling every ``delay`` seconds in a daemon thread.

    Requires psutil and GPUtil to be installed. Can be enabled with
    tune.run(config={"log_sys_usage": True}).
    """

    def __init__(self, start=True, delay=0.7):
        # Stays True until run() flips it; get_data() returns {} while
        # stopped, so a monitor that could not start is safely inert.
        self.stopped = True
        if GPUtil is None:
            logger.warning("Install gputil for GPU system monitoring.")

        if psutil is None:
            logger.warning("Install psutil to monitor system performance.")

        if GPUtil is None and psutil is None:
            # Nothing to sample; skip Thread init so start() is never called.
            return

        super(UtilMonitor, self).__init__()
        self.delay = delay  # Time between calls to GPUtil
        self.values = defaultdict(list)
        self.lock = threading.Lock()
        self.daemon = True
        if start:
            self.start()

    def _read_utilization(self):
        """Take one sample of CPU/RAM/GPU stats and append to self.values."""
        with self.lock:
            if psutil is not None:
                self.values["cpu_util_percent"].append(
                    float(psutil.cpu_percent(interval=None)))
                # Direct attribute access; getattr with a literal name was
                # an unnecessary indirection.
                self.values["ram_util_percent"].append(
                    float(psutil.virtual_memory().percent))
            if GPUtil is not None:
                for gpu in GPUtil.getGPUs():
                    self.values["gpu_util_percent" + str(gpu.id)].append(
                        float(gpu.load))
                    self.values["vram_util_percent" + str(gpu.id)].append(
                        float(gpu.memoryUtil))

    def get_data(self):
        """Return metrics averaged over samples since the last call.

        Each call consumes the accumulated samples, so successive calls
        report disjoint time windows. Returns {} if the monitor never
        started (missing deps) or has been stopped.
        """
        if self.stopped:
            return {}

        with self.lock:
            # Swap the accumulator out under the lock instead of
            # deepcopy-then-clear; O(1) and observably equivalent.
            ret_values = self.values
            self.values = defaultdict(list)
        return {
            "perf": {
                k: np.mean(v)
                for k, v in ret_values.items() if len(v) > 0
            }
        }

    def run(self):
        """Sampling loop; exits once stop() is called."""
        self.stopped = False
        while not self.stopped:
            self._read_utilization()
            time.sleep(self.delay)

    def stop(self):
        """Signal the sampling loop to terminate after its current sleep."""
        self.stopped = True
def pin_in_object_store(obj):
"""Pin an object in the object store.