From 2e30f7ba386e716bf80f019dcd473b67d83abb95 Mon Sep 17 00:00:00 2001 From: Daniel Edgecumbe <45787862+ls-daniel@users.noreply.github.com> Date: Thu, 21 Feb 2019 08:10:04 +0000 Subject: [PATCH] Add a web dashboard for monitoring node resource usage (#4066) --- .travis/check-git-clang-format-output.sh | 2 +- python/ray/autoscaler/commands.py | 78 +++-- python/ray/dashboard/dashboard.py | 348 +++++++++++++++++++++++ python/ray/dashboard/index.html | 99 +++++++ python/ray/dashboard/res/main.css | 58 ++++ python/ray/dashboard/res/main.js | 265 +++++++++++++++++ python/ray/gcs_utils.py | 1 + python/ray/node.py | 59 +++- python/ray/ray_constants.py | 7 + python/ray/reporter.py | 228 +++++++++++++++ python/ray/services.py | 111 ++++++++ python/setup.py | 1 + test/tempfile_test.py | 26 +- 13 files changed, 1239 insertions(+), 44 deletions(-) create mode 100644 python/ray/dashboard/dashboard.py create mode 100644 python/ray/dashboard/index.html create mode 100644 python/ray/dashboard/res/main.css create mode 100644 python/ray/dashboard/res/main.js create mode 100644 python/ray/reporter.py diff --git a/.travis/check-git-clang-format-output.sh b/.travis/check-git-clang-format-output.sh index a6872cfc3..1b78e8175 100755 --- a/.travis/check-git-clang-format-output.sh +++ b/.travis/check-git-clang-format-output.sh @@ -8,7 +8,7 @@ else base_commit="$TRAVIS_BRANCH" echo "Running clang-format against branch $base_commit, with hash $(git rev-parse $base_commit)" fi -output="$(.travis/git-clang-format --binary clang-format-3.8 --commit $base_commit --diff --exclude '(.*thirdparty/|.*redismodule.h|.*webui*)')" +output="$(.travis/git-clang-format --binary clang-format-3.8 --commit $base_commit --diff --exclude '(.*thirdparty/|.*redismodule.h|.*.js)')" if [ "$output" == "no modified files to format" ] || [ "$output" == "clang-format did not modify any files" ] ; then echo "clang-format passed." 
exit 0 diff --git a/python/ray/autoscaler/commands.py b/python/ray/autoscaler/commands.py index 074b58fc8..6032d814f 100644 --- a/python/ray/autoscaler/commands.py +++ b/python/ray/autoscaler/commands.py @@ -127,30 +127,34 @@ def kill_node(config_file, yes, override_cluster_name): confirm("This will kill a node in your cluster", yes) provider = get_node_provider(config["provider"], config["cluster_name"]) - nodes = provider.nodes({TAG_RAY_NODE_TYPE: "worker"}) - node = random.choice(nodes) - logger.info("kill_node: Terminating worker {}".format(node)) + try: + nodes = provider.nodes({TAG_RAY_NODE_TYPE: "worker"}) + node = random.choice(nodes) + logger.info("kill_node: Terminating worker {}".format(node)) - updater = NodeUpdaterThread( - node_id=node, - provider_config=config["provider"], - provider=provider, - auth_config=config["auth"], - cluster_name=config["cluster_name"], - file_mounts=config["file_mounts"], - initialization_commands=[], - setup_commands=[], - runtime_hash="") + updater = NodeUpdaterThread( + node_id=node, + provider_config=config["provider"], + provider=provider, + auth_config=config["auth"], + cluster_name=config["cluster_name"], + file_mounts=config["file_mounts"], + initialization_commands=[], + setup_commands=[], + runtime_hash="") - _exec(updater, "ray stop", False, False) + _exec(updater, "ray stop", False, False) - time.sleep(5) + time.sleep(5) - iip = provider.internal_ip(node) - if iip: - return iip + if config.get("provider", {}).get("use_internal_ips", False) is True: + node_ip = provider.internal_ip(node) + else: + node_ip = provider.external_ip(node) + finally: + provider.cleanup() - return provider.external_ip(node) + return node_ip def get_or_create_head_node(config, config_file, no_restart, restart_only, yes, @@ -246,14 +250,18 @@ def get_or_create_head_node(config, config_file, no_restart, restart_only, yes, # Refresh the node cache so we see the external ip if available provider.nodes(head_node_tags) + if config.get("provider", {}).get("use_internal_ips", False) is True: + head_node_ip = provider.internal_ip(head_node) + else: + head_node_ip = provider.external_ip(head_node) + if updater.exitcode != 0: logger.error("get_or_create_head_node: " - "Updating {} failed".format( - provider.external_ip(head_node))) + "Updating {} failed".format(head_node_ip)) sys.exit(1) - logger.info("get_or_create_head_node: " - "Head node up-to-date, IP address is: {}".format( - provider.external_ip(head_node))) + logger.info( + "get_or_create_head_node: " + "Head node up-to-date, IP address is: {}".format(head_node_ip)) monitor_str = "tail -n 100 -f /tmp/ray/session_*/logs/monitor*" use_docker = bool(config["docker"]["container_name"]) @@ -268,10 +276,11 @@ def get_or_create_head_node(config, config_file, no_restart, restart_only, yes, quote(monitor_str), modifiers)) print("To open a console on the cluster:\n\n" " ray attach {}{}\n".format(config_file, modifiers)) + print("To ssh manually to the cluster, run:\n\n" " ssh -i {} {}@{}\n".format(config["auth"]["ssh_private_key"], config["auth"]["ssh_user"], - provider.external_ip(head_node))) + head_node_ip)) finally: provider.cleanup() @@ -456,11 +465,14 @@ def get_head_node_ip(config_file, override_cluster_name): provider = get_node_provider(config["provider"], config["cluster_name"]) try: head_node = _get_head_node(config, config_file, override_cluster_name) - ip = provider.external_ip(head_node) + if config.get("provider", {}).get("use_internal_ips", False) is True: + head_node_ip = provider.internal_ip(head_node) + 
else:
+        head_node_ip = provider.external_ip(head_node)
     finally:
         provider.cleanup()
 
-    return ip
+    return head_node_ip
 
 
 def get_worker_node_ips(config_file, override_cluster_name):
@@ -469,9 +481,17 @@ def get_worker_node_ips(config_file, override_cluster_name):
     config = yaml.load(open(config_file).read())
     if override_cluster_name is not None:
         config["cluster_name"] = override_cluster_name
+
     provider = get_node_provider(config["provider"], config["cluster_name"])
-    nodes = provider.nodes({TAG_RAY_NODE_TYPE: "worker"})
-    return [provider.external_ip(node) for node in nodes]
+    try:
+        nodes = provider.nodes({TAG_RAY_NODE_TYPE: "worker"})
+
+        if config.get("provider", {}).get("use_internal_ips", False) is True:
+            return [provider.internal_ip(node) for node in nodes]
+        else:
+            return [provider.external_ip(node) for node in nodes]
+    finally:
+        provider.cleanup()
 
 
 def _get_head_node(config,
diff --git a/python/ray/dashboard/dashboard.py b/python/ray/dashboard/dashboard.py
new file mode 100644
index 000000000..97e13de02
--- /dev/null
+++ b/python/ray/dashboard/dashboard.py
@@ -0,0 +1,348 @@
+from __future__ import absolute_import
+from __future__ import division
+from __future__ import print_function
+
+try:
+    import aiohttp.web
+except ModuleNotFoundError:
+    print("The dashboard requires aiohttp to run.")
+    import sys
+    sys.exit(1)
+
+import argparse
+import datetime
+import json
+import logging
+import os
+import threading
+import traceback
+import yaml
+
+from pathlib import Path
+from collections import Counter
+from operator import itemgetter
+from typing import Dict
+
+import ray.ray_constants as ray_constants
+import ray.utils
+
+# Logger for this module. It should be configured at the entry point
+# into the program using Ray. Ray provides a default configuration at
+# entry/init points.
+logger = logging.getLogger(__name__)
+
+
+def to_unix_time(dt):
+    return (dt - datetime.datetime(1970, 1, 1)).total_seconds()
+
+
+class Dashboard(object):
+    """A dashboard process for monitoring Ray nodes.
+
+    This dashboard is made up of a REST API which collates data published by
+    Reporter processes on nodes into a JSON structure, and a webserver which
+    serves a page that polls said API for display purposes.
+
+    Attributes:
+        redis_client: A client used to communicate with the Redis server.
+    """
+
+    def __init__(self,
+                 redis_address,
+                 http_port,
+                 token,
+                 temp_dir,
+                 redis_password=None):
+        """Initialize the dashboard object."""
+        self.ip = ray.services.get_node_ip_address()
+        self.port = http_port
+        self.token = token
+        self.temp_dir = temp_dir
+        self.node_stats = NodeStats(redis_address, redis_password)
+
+        self.app = aiohttp.web.Application(middlewares=[self.auth_middleware])
+        self.setup_routes()
+
+    @aiohttp.web.middleware
+    async def auth_middleware(self, req, handler):
+        def valid_token(req):
+            # If the cookie token is correct, accept that.
+            try:
+                if req.cookies["token"] == self.token:
+                    return True
+            except KeyError:
+                pass
+
+            # If the query token is correct, accept that.
+            try:
+                if req.query["token"] == self.token:
+                    return True
+            except KeyError:
+                pass
+
+            # Reject.
+            logger.warning("Dashboard: rejected an invalid token")
+            return False
+
+        # Check that the token is present, either in query or as cookie.
+ if not valid_token(req): + return aiohttp.web.Response(status=401, text="401 Unauthorized") + + resp = await handler(req) + resp.cookies["token"] = self.token + return resp + + def setup_routes(self): + def forbidden() -> aiohttp.web.Response: + return aiohttp.web.Response(status=403, text="403 Forbidden") + + def get_forbidden(_) -> aiohttp.web.Response: + return forbidden() + + async def get_index(req) -> aiohttp.web.Response: + return aiohttp.web.FileResponse( + os.path.join( + os.path.dirname(os.path.abspath(__file__)), "index.html")) + + async def get_resource(req) -> aiohttp.web.Response: + try: + path = req.match_info["x"] + except KeyError: + return forbidden() + + if path not in ["main.css", "main.js"]: + return forbidden() + + return aiohttp.web.FileResponse( + os.path.join( + os.path.dirname(os.path.abspath(__file__)), + "res/{}".format(path))) + + async def json_response(result=None, error=None, + ts=None) -> aiohttp.web.Response: + if ts is None: + ts = datetime.datetime.utcnow() + + return aiohttp.web.json_response({ + "result": result, + "timestamp": to_unix_time(ts), + "error": error, + }) + + async def ray_config(_) -> aiohttp.web.Response: + try: + with open( + Path("~/ray_bootstrap_config.yaml").expanduser()) as f: + cfg = yaml.load(f) + except Exception: + return await json_response(error="No config") + + D = { + "min_workers": cfg["min_workers"], + "max_workers": cfg["max_workers"], + "initial_workers": cfg["initial_workers"], + "idle_timeout_minutes": cfg["idle_timeout_minutes"], + } + + try: + D["head_type"] = cfg["head_node"]["InstanceType"] + except KeyError: + D["head_type"] = "unknown" + + try: + D["worker_type"] = cfg["worker_nodes"]["InstanceType"] + except KeyError: + D["worker_type"] = "unknown" + + return await json_response(result=D) + + async def node_info(req) -> aiohttp.web.Response: + now = datetime.datetime.utcnow() + D = self.node_stats.get_node_stats() + return await json_response(result=D, ts=now) + + self.app.router.add_get("/", get_index) + self.app.router.add_get("/index.html", get_index) + self.app.router.add_get("/index.htm", get_index) + self.app.router.add_get("/res/{x}", get_resource) + + self.app.router.add_get("/api/node_info", node_info) + self.app.router.add_get("/api/super_client_table", node_info) + self.app.router.add_get("/api/ray_config", ray_config) + + self.app.router.add_get("/{_}", get_forbidden) + + def log_dashboard_url(self): + url = "http://{}:{}?token={}".format(self.ip, self.port, self.token) + with open(os.path.join(self.temp_dir, "dashboard_url"), "w") as f: + f.write(url) + logger.info("Dashboard running on {}".format(url)) + + def run(self): + self.log_dashboard_url() + self.node_stats.start() + aiohttp.web.run_app(self.app, host=self.ip, port=self.port) + + +class NodeStats(threading.Thread): + def __init__(self, redis_address, redis_password=None): + self.redis_key = "{}.*".format(ray.gcs_utils.REPORTER_CHANNEL) + self.redis_client = ray.services.create_redis_client( + redis_address, password=redis_password) + + self._node_stats = {} + self._node_stats_lock = threading.Lock() + super().__init__() + + def calculate_totals(self) -> Dict: + total_boot_time = 0 + total_cpus = 0 + total_workers = 0 + total_load = [0.0, 0.0, 0.0] + total_storage_avail = 0 + total_storage_total = 0 + total_ram_avail = 0 + total_ram_total = 0 + total_sent = 0 + total_recv = 0 + + for v in self._node_stats.values(): + total_boot_time += v["boot_time"] + total_cpus += v["cpus"][0] + total_workers += len(v["workers"]) + total_load[0] += 
v["load_avg"][0][0] + total_load[1] += v["load_avg"][0][1] + total_load[2] += v["load_avg"][0][2] + total_storage_avail += v["disk"]["/"]["free"] + total_storage_total += v["disk"]["/"]["total"] + total_ram_avail += v["mem"][1] + total_ram_total += v["mem"][0] + total_sent += v["net"][0] + total_recv += v["net"][1] + + return { + "boot_time": total_boot_time, + "n_workers": total_workers, + "n_cores": total_cpus, + "m_avail": total_ram_avail, + "m_total": total_ram_total, + "d_avail": total_storage_avail, + "d_total": total_storage_total, + "load": total_load, + "n_sent": total_sent, + "n_recv": total_recv, + } + + def calculate_tasks(self) -> Counter: + return Counter( + (x["name"] + for y in (v["workers"] for v in self._node_stats.values()) + for x in y)) + + def purge_outdated_stats(self): + def current(then, now): + if (now - then) > 5: + return False + + return True + + now = to_unix_time(datetime.datetime.utcnow()) + self._node_stats = { + k: v + for k, v in self._node_stats.items() if current(v["now"], now) + } + + def get_node_stats(self) -> Dict: + with self._node_stats_lock: + self.purge_outdated_stats() + node_stats = sorted( + (v for v in self._node_stats.values()), + key=itemgetter("boot_time")) + return { + "totals": self.calculate_totals(), + "tasks": self.calculate_tasks(), + "clients": node_stats, + } + + def run(self): + p = self.redis_client.pubsub(ignore_subscribe_messages=True) + p.psubscribe(self.redis_key) + logger.info("NodeStats: subscribed to {}".format(self.redis_key)) + + for x in p.listen(): + try: + D = json.loads(x["data"]) + with self._node_stats_lock: + self._node_stats[D["hostname"]] = D + except Exception: + logger.exception(traceback.format_exc()) + continue + + +if __name__ == "__main__": + parser = argparse.ArgumentParser( + description=("Parse Redis server for the " + "dashboard to connect to.")) + parser.add_argument( + "--http-port", + required=True, + type=int, + help="The port to use for the HTTP server.") + parser.add_argument( + "--token", + required=True, + type=str, + help="The token to use for the HTTP server.") + parser.add_argument( + "--redis-address", + required=True, + type=str, + help="The address to use for Redis.") + parser.add_argument( + "--redis-password", + required=False, + type=str, + default=None, + help="the password to use for Redis") + parser.add_argument( + "--logging-level", + required=False, + type=str, + default=ray_constants.LOGGER_LEVEL, + choices=ray_constants.LOGGER_LEVEL_CHOICES, + help=ray_constants.LOGGER_LEVEL_HELP) + parser.add_argument( + "--logging-format", + required=False, + type=str, + default=ray_constants.LOGGER_FORMAT, + help=ray_constants.LOGGER_FORMAT_HELP) + parser.add_argument( + "--temp-dir", + required=False, + type=str, + default=None, + help="Specify the path of the temporary directory use by Ray process.") + args = parser.parse_args() + ray.utils.setup_logger(args.logging_level, args.logging_format) + + dashboard = Dashboard( + args.redis_address, + args.http_port, + args.token, + args.temp_dir, + redis_password=args.redis_password, + ) + + try: + dashboard.run() + except Exception as e: + # Something went wrong, so push an error to all drivers. 
+        redis_client = ray.services.create_redis_client(
+            args.redis_address, password=args.redis_password)
+        traceback_str = ray.utils.format_error_message(traceback.format_exc())
+        message = ("The dashboard on node {} failed with the following "
+                   "error:\n{}".format(os.uname()[1], traceback_str))
+        ray.utils.push_error_to_driver_through_redis(
+            redis_client, ray_constants.DASHBOARD_DIED_ERROR, message)
+        raise e
diff --git a/python/ray/dashboard/index.html b/python/ray/dashboard/index.html
new file mode 100644
index 000000000..395dc36ae
--- /dev/null
+++ b/python/ray/dashboard/index.html
@@ -0,0 +1,99 @@
[The 99 added lines of markup did not survive extraction; only text nodes
remain. Recoverable content: a page titled "ray dashboard" that loads
res/main.css and res/main.js, a node table with columns Hostname, Uptime,
Workers, RAM, Disk, Load (1m, 5m, 15m), Sent (M/s), and Recv (M/s), a
"Last updated {{age}} ago" indicator, and the div#dashboard mount point
for the Vue components defined in res/main.js.]
+ + + diff --git a/python/ray/dashboard/res/main.css b/python/ray/dashboard/res/main.css new file mode 100644 index 000000000..9b7474ae4 --- /dev/null +++ b/python/ray/dashboard/res/main.css @@ -0,0 +1,58 @@ +* { font-family: monospace; margin: 0; padding: 0; } +h1, h2 { text-align: center; margin: 1rem 0; } +h1 { font-size: 3rem; margin: 0.5rem auto; text-align: center; } +h2 { font-size: 2rem; margin: 1rem auto; text-align: center; } +div#dashboard { + width: 116rem; margin: 1rem auto; +} + +table, tbody, thead { + width: 100%; + margin: 0; + padding: 0; +} +tr { + cursor: pointer; +} +tr.workerlist td, tr.workerlist th { + font-size: 0.8rem; +} +tr:hover { + filter: brightness(0.85); +} +td, th { + padding: 0.3rem; + font-size: 1.4rem; + font-family: monospace; + text-align: center; + background-color: white; +} +th { + background-color: #eeeeee; + border: 1px solid black; +} + +tbody.totals { + font-weight: bold; + font-size: 1.5rem; +} + +ul { list-style-position: inside; } + +.critical { background-color: magenta; } +.bad { background-color: red; } +.high { background-color: orange; } +.average { background-color: limegreen; } +.low { background-color: aquamarine; } + +div.cols { + width: 100%; + margin: 1rem 0; + display: grid; + grid-template-columns: 1fr 1fr; +} +div.cols div { + padding: 1rem 0; +} + +.outdated { background-color: red; } diff --git a/python/ray/dashboard/res/main.js b/python/ray/dashboard/res/main.js new file mode 100644 index 000000000..b6d149266 --- /dev/null +++ b/python/ray/dashboard/res/main.js @@ -0,0 +1,265 @@ +let dashboard = new Vue({ + el: "#dashboard", + data: { + now: (new Date()).getTime() / 1000, + shown: {}, + error: "loading...", + last_update: undefined, + clients: undefined, + totals: undefined, + tasks: undefined, + ray_config: undefined, + }, + methods: { + updateNodeInfo: function() { + var self = this; + fetch("/api/node_info").then(function (resp) { + return resp.json(); + }).then(function(data) { + self.error = data.error; + if (data.error) { + self.clients = undefined; + self.tasks = undefined; + self.totals = undefined; + return; + } + self.last_update = data.timestamp; + self.clients = data.result.clients; + self.tasks = data.result.tasks; + self.totals = data.result.totals; + }).catch(function() { + self.error = "request error" + self.clients = undefined; + self.tasks = undefined; + self.totals = undefined; + }).finally(function() { + setTimeout(self.updateNodeInfo, 500); + }); + }, + updateRayConfig: function() { + var self = this; + fetch("/api/ray_config").then(function (resp) { + return resp.json(); + }).then(function(data) { + if (data.error) { + self.ray_config = undefined; + return; + } + self.ray_config = data.result; + }).catch(function() { + self.error = "request error" + self.ray_config = undefined; + }).finally(function() { + setTimeout(self.updateRayConfig, 10000); + }); + }, + updateAll: function() { + this.updateNodeInfo(); + this.updateRayConfig(); + }, + tickClock: function() { + this.now = (new Date()).getTime() / 1000; + } + }, + computed: { + outdated_cls: function(ts) { + if ((this.now - this.last_update) > 5) { + return "outdated"; + } + return ""; + }, + age: function(ts) { + return (this.now - this.last_update | 0) + "s"; + }, + }, + filters: { + si: function(x) { + let prefixes = ["B", "K", "M", "G", "T"] + let i = 0; + while (x > 1024) { + x /= 1024; + i += 1; + } + return `${x.toFixed(1)}${prefixes[i]}`; + }, + }, +}); + +Vue.component("worker-usage", { + props: ['cores', 'workers'], + computed: { + frac: 
function() { + return this.workers / this.cores; + }, + cls: function() { + if (this.frac > 3) { return "critical"; } + if (this.frac > 2) { return "bad"; } + if (this.frac > 1.5) { return "high"; } + if (this.frac > 1) { return "average"; } + return "low"; + }, + }, + template: ` + + {{workers}}/{{cores}} {{(frac*100).toFixed(0)}}% + + `, +}); + +Vue.component("node", { + props: [ + "now", + "hostname", + "boot_time", + "n_workers", + "n_cores", + "m_avail", + "m_total", + "d_avail", + "d_total", + "load", + "n_sent", + "n_recv", + "workers", + ], + data: function() { + return { + hidden: true, + }; + }, + computed: { + age: function() { + if (this.boot_time) { + let n = this.now; + if (this.boot_time > 2840140800) { + // Hack. It's a sum of multiple nodes. + n *= this.hostname; + } + let rs = n - this.boot_time | 0; + let s = rs % 60; + let m = ((rs / 60) % 60) | 0; + let h = (rs / 3600) | 0; + if (h) { + return `${h}h ${m}m ${s}s`; + } + if (m) { + return `${m}m ${s}s`; + } + return `${s}s`; + } + return "?" + }, + }, + methods: { + toggleHide: function() { + this.hidden = !this.hidden; + } + }, + filters: { + mib(x) { + return `${(x/(1024**2)).toFixed(3)}M`; + }, + hostnamefilter(x) { + if (isNaN(x)) { + return x; + } + return `Totals: ${x} nodes`; + }, + }, + template: ` + + + {{hostname | hostnamefilter}} + {{age}} + + + + + + {{n_sent | mib}}/s + {{n_recv | mib}}/s + + + + `, +}); + +Vue.component("usagebar", { + props: ['stat', 'avail', 'total'], // e.g. free -m avail + computed: { + used: function() { return this.total - this.avail; }, + frac: function() { return (this.total - this.avail)/this.total; }, + cls: function() { + if (this.frac > 0.95) { return "critical"; } + if (this.frac > 0.9) { return "bad"; } + if (this.frac > 0.8) { return "high"; } + if (this.frac > 0.5) { return "average"; } + return "low"; + }, + tcls: function() { + return `${this.stat} ${this.cls}`; + } + }, + filters: { + gib(x) { + return `${(x/(1024**3)).toFixed(1)}G`; + }, + pct(x) { + return `${(x*100).toFixed(0)}%`; + }, + }, + template: ` + + {{used | gib}}/{{total | gib}} {{ frac | pct }} + + `, +}); + +Vue.component("loadbar", { + props: ['cores', 'onem', 'fivem', 'fifteenm'], + computed: { + frac: function() { return this.onem/this.cores; }, + cls: function() { + if (this.frac > 3) { return "critical"; } + if (this.frac > 2.5) { return "bad"; } + if (this.frac > 2) { return "high"; } + if (this.frac > 1.5) { return "average"; } + return "low"; + }, + }, + template: ` + + {{onem.toFixed(2)}}, {{fivem.toFixed(2)}}, {{fifteenm.toFixed(2)}} + + `, +}); + +setInterval(dashboard.tickClock, 1000); +dashboard.updateAll(); diff --git a/python/ray/gcs_utils.py b/python/ray/gcs_utils.py index bc6afbeea..3b204470f 100644 --- a/python/ray/gcs_utils.py +++ b/python/ray/gcs_utils.py @@ -39,6 +39,7 @@ __all__ = [ FUNCTION_PREFIX = "RemoteFunction:" LOG_FILE_CHANNEL = "RAY_LOG_CHANNEL" +REPORTER_CHANNEL = "RAY_REPORTER" # xray heartbeats XRAY_HEARTBEAT_CHANNEL = str(TablePubsub.HEARTBEAT).encode("ascii") diff --git a/python/ray/node.py b/python/ray/node.py index cee9e6fd0..5830a727b 100644 --- a/python/ray/node.py +++ b/python/ray/node.py @@ -10,6 +10,7 @@ import json import os import logging import signal +import sys import tempfile import threading import time @@ -24,6 +25,8 @@ from ray.utils import try_to_create_directory # using logging.basicConfig in its entry/init points. 
logger = logging.getLogger(__name__)
 
+PY3 = sys.version_info.major >= 3
+
 
 class Node(object):
     """An encapsulation of the Ray processes on a single node.
@@ -81,6 +84,7 @@ class Node(object):
             self._plasma_store_socket_name = None
             self._raylet_socket_name = None
             self._webui_url = None
+            self._dashboard_url = None
         else:
             self._plasma_store_socket_name = (
                 ray_params.plasma_store_socket_name)
@@ -284,6 +288,35 @@ class Node(object):
                 process_info
             ]
 
+    def start_reporter(self):
+        """Start the reporter."""
+        stdout_file, stderr_file = self.new_log_files("reporter", True)
+        process_info = ray.services.start_reporter(
+            self.redis_address,
+            stdout_file=stdout_file,
+            stderr_file=stderr_file,
+            redis_password=self._ray_params.redis_password)
+        assert ray_constants.PROCESS_TYPE_REPORTER not in self.all_processes
+        if process_info is not None:
+            self.all_processes[ray_constants.PROCESS_TYPE_REPORTER] = [
+                process_info
+            ]
+
+    def start_dashboard(self):
+        """Start the dashboard."""
+        stdout_file, stderr_file = self.new_log_files("dashboard", True)
+        self._dashboard_url, process_info = ray.services.start_dashboard(
+            self.redis_address,
+            self._temp_dir,
+            stdout_file=stdout_file,
+            stderr_file=stderr_file,
+            redis_password=self._ray_params.redis_password)
+        assert ray_constants.PROCESS_TYPE_DASHBOARD not in self.all_processes
+        if process_info is not None:
+            self.all_processes[ray_constants.PROCESS_TYPE_DASHBOARD] = [
+                process_info
+            ]
+
     def start_ui(self):
         """Start the web UI."""
         stdout_file, stderr_file = self.new_log_files("webui")
@@ -408,14 +441,16 @@ class Node(object):
         self.start_redis()
         self.start_monitor()
         self.start_raylet_monitor()
+        if PY3 and self._ray_params.include_webui:
+            self.start_dashboard()
         self.start_plasma_store()
         self.start_raylet()
+        if PY3 and self._ray_params.include_webui:
+            self.start_reporter()
         if self._ray_params.include_log_monitor:
             self.start_log_monitor()
-        if self._ray_params.include_webui:
-            self.start_ui()
 
     def _kill_process_type(self,
                            process_type,
@@ -545,6 +580,26 @@ class Node(object):
         self._kill_process_type(
             ray_constants.PROCESS_TYPE_LOG_MONITOR, check_alive=check_alive)
 
+    def kill_reporter(self, check_alive=True):
+        """Kill the reporter.
+
+        Args:
+            check_alive (bool): Raise an exception if the process was already
+                dead.
+        """
+        self._kill_process_type(
+            ray_constants.PROCESS_TYPE_REPORTER, check_alive=check_alive)
+
+    def kill_dashboard(self, check_alive=True):
+        """Kill the dashboard.
+
+        Args:
+            check_alive (bool): Raise an exception if the process was already
+                dead.
+        """
+        self._kill_process_type(
+            ray_constants.PROCESS_TYPE_DASHBOARD, check_alive=check_alive)
+
     def kill_monitor(self, check_alive=True):
         """Kill the monitor.
diff --git a/python/ray/ray_constants.py b/python/ray/ray_constants.py
index 98a43fb0a..45c56c815 100644
--- a/python/ray/ray_constants.py
+++ b/python/ray/ray_constants.py
@@ -54,6 +54,8 @@ INFEASIBLE_TASK_ERROR = "infeasible_task"
 REMOVED_NODE_ERROR = "node_removed"
 MONITOR_DIED_ERROR = "monitor_died"
 LOG_MONITOR_DIED_ERROR = "log_monitor_died"
+REPORTER_DIED_ERROR = "reporter_died"
+DASHBOARD_DIED_ERROR = "dashboard_died"
 
 # Abort autoscaling if more than this number of errors are encountered. This
 # is a safety feature to prevent e.g. runaway node launches.
@@ -76,6 +78,9 @@ AUTOSCALER_UPDATE_INTERVAL_S = env_integer("AUTOSCALER_UPDATE_INTERVAL_S", 5)
 AUTOSCALER_HEARTBEAT_TIMEOUT_S = env_integer("AUTOSCALER_HEARTBEAT_TIMEOUT_S",
                                              30)
 
+# The reporter will report its statistics this often (milliseconds).
+REPORTER_UPDATE_INTERVAL_MS = env_integer("REPORTER_UPDATE_INTERVAL_MS", 500) + # Max number of retries to AWS (default is 5, time increases exponentially) BOTO_MAX_RETRIES = env_integer("BOTO_MAX_RETRIES", 12) @@ -96,6 +101,8 @@ INFINITE_RECONSTRUCTION = 2**30 PROCESS_TYPE_MONITOR = "monitor" PROCESS_TYPE_RAYLET_MONITOR = "raylet_monitor" PROCESS_TYPE_LOG_MONITOR = "log_monitor" +PROCESS_TYPE_REPORTER = "reporter" +PROCESS_TYPE_DASHBOARD = "dashboard" PROCESS_TYPE_WORKER = "worker" PROCESS_TYPE_RAYLET = "raylet" PROCESS_TYPE_PLASMA_STORE = "plasma_store" diff --git a/python/ray/reporter.py b/python/ray/reporter.py new file mode 100644 index 000000000..29aa34d4c --- /dev/null +++ b/python/ray/reporter.py @@ -0,0 +1,228 @@ +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import argparse +import logging +import json +import os +import traceback +import time +import datetime +from socket import AddressFamily + +try: + import psutil +except ModuleNotFoundError: + print("The reporter requires psutil to run.") + import sys + sys.exit(1) + +import ray.ray_constants as ray_constants +import ray.utils + +# Logger for this module. It should be configured at the entry point +# into the program using Ray. Ray provides a default configuration at +# entry/init points. +logger = logging.getLogger(__name__) + + +def recursive_asdict(o): + if isinstance(o, tuple) and hasattr(o, "_asdict"): + return recursive_asdict(o._asdict()) + + if isinstance(o, (tuple, list)): + L = [] + for k in o: + L.append(recursive_asdict(k)) + return L + + if isinstance(o, dict): + D = {k: recursive_asdict(v) for k, v in o.items()} + return D + + return o + + +def jsonify_asdict(o): + return json.dumps(recursive_asdict(o)) + + +def running_worker(s): + if "ray_worker" not in s: + return False + + if s == "ray_worker": + return False + + return True + + +def determine_ip_address(): + """Return the first IP address for an ethernet interface on the system.""" + addrs = [ + x.address for k, v in psutil.net_if_addrs().items() if k[0] == "e" + for x in v if x.family == AddressFamily.AF_INET + ] + return addrs[0] + + +def to_posix_time(dt): + return (dt - datetime.datetime(1970, 1, 1)).total_seconds() + + +class Reporter(object): + """A monitor process for monitoring Ray nodes. + + Attributes: + host (str): The hostname of this machine. Used to improve the log + messages published to Redis. + redis_client: A client used to communicate with the Redis server. 
+ """ + + def __init__(self, redis_address, redis_password=None): + """Initialize the reporter object.""" + self.cpu_counts = (psutil.cpu_count(), psutil.cpu_count(logical=False)) + self.ip_addr = determine_ip_address() + self.hostname = os.uname().nodename + + _ = psutil.cpu_percent() # For initialization + + self.redis_key = "{}.{}".format(ray.gcs_utils.REPORTER_CHANNEL, + self.hostname) + self.redis_client = ray.services.create_redis_client( + redis_address, password=redis_password) + + self.network_stats_hist = [(0, (0.0, 0.0))] # time, (sent, recv) + + @staticmethod + def get_cpu_percent(): + return psutil.cpu_percent() + + @staticmethod + def get_boot_time(): + return psutil.boot_time() + + @staticmethod + def get_network_stats(): + ifaces = [ + v for k, v in psutil.net_io_counters(pernic=True).items() + if k[0] == "e" + ] + + sent = sum((iface.bytes_sent for iface in ifaces)) + recv = sum((iface.bytes_recv for iface in ifaces)) + return sent, recv + + @staticmethod + def get_mem_usage(): + vm = psutil.virtual_memory() + return vm.total, vm.available, vm.percent + + @staticmethod + def get_disk_usage(): + return {x: psutil.disk_usage(x) for x in ["/", "/tmp"]} + + @staticmethod + def get_workers(): + return [ + x.as_dict(attrs=[ + "pid", "create_time", "cpu_times", "name", "memory_full_info" + ]) for x in psutil.process_iter() if running_worker(x.name()) + ] + + def get_load_avg(self): + load = os.getloadavg() + per_cpu_load = tuple((round(x / self.cpu_counts[0], 2) for x in load)) + return load, per_cpu_load + + def get_all_stats(self): + now = to_posix_time(datetime.datetime.utcnow()) + network_stats = self.get_network_stats() + + self.network_stats_hist.append((now, network_stats)) + self.network_stats_hist = self.network_stats_hist[-7:] + then, prev_network_stats = self.network_stats_hist[0] + netstats = ((network_stats[0] - prev_network_stats[0]) / (now - then), + (network_stats[1] - prev_network_stats[1]) / (now - then)) + + return { + "now": now, + "hostname": self.hostname, + "ip": self.ip_addr, + "cpu": self.get_cpu_percent(), + "cpus": self.cpu_counts, + "mem": self.get_mem_usage(), + "workers": self.get_workers(), + "boot_time": self.get_boot_time(), + "load_avg": self.get_load_avg(), + "disk": self.get_disk_usage(), + "net": netstats, + } + + def perform_iteration(self): + """Get any changes to the log files and push updates to Redis.""" + stats = self.get_all_stats() + + self.redis_client.publish( + self.redis_key, + jsonify_asdict(stats), + ) + + def run(self): + """Run the reporter.""" + while True: + try: + self.perform_iteration() + except Exception: + traceback.print_exc() + pass + + time.sleep(ray_constants.REPORTER_UPDATE_INTERVAL_MS / 1000) + + +if __name__ == "__main__": + parser = argparse.ArgumentParser( + description=("Parse Redis server for the " + "reporter to connect to.")) + parser.add_argument( + "--redis-address", + required=True, + type=str, + help="The address to use for Redis.") + parser.add_argument( + "--redis-password", + required=False, + type=str, + default=None, + help="the password to use for Redis") + parser.add_argument( + "--logging-level", + required=False, + type=str, + default=ray_constants.LOGGER_LEVEL, + choices=ray_constants.LOGGER_LEVEL_CHOICES, + help=ray_constants.LOGGER_LEVEL_HELP) + parser.add_argument( + "--logging-format", + required=False, + type=str, + default=ray_constants.LOGGER_FORMAT, + help=ray_constants.LOGGER_FORMAT_HELP) + args = parser.parse_args() + ray.utils.setup_logger(args.logging_level, args.logging_format) + + 
reporter = Reporter(args.redis_address, redis_password=args.redis_password) + + try: + reporter.run() + except Exception as e: + # Something went wrong, so push an error to all drivers. + redis_client = ray.services.create_redis_client( + args.redis_address, password=args.redis_password) + traceback_str = ray.utils.format_error_message(traceback.format_exc()) + message = ("The reporter on node {} failed with the following " + "error:\n{}".format(os.uname()[1], traceback_str)) + ray.utils.push_error_to_driver_through_redis( + redis_client, ray_constants.REPORTER_DIED_ERROR, message) + raise e diff --git a/python/ray/services.py b/python/ray/services.py index e529420b1..99c9141d0 100644 --- a/python/ray/services.py +++ b/python/ray/services.py @@ -827,6 +827,117 @@ def start_log_monitor(redis_address, return process_info +def start_reporter(redis_address, + stdout_file=None, + stderr_file=None, + redis_password=None): + """Start a reporter process. + + Args: + redis_address (str): The address of the Redis instance. + stdout_file: A file handle opened for writing to redirect stdout to. If + no redirection should happen, then this should be None. + stderr_file: A file handle opened for writing to redirect stderr to. If + no redirection should happen, then this should be None. + redis_password (str): The password of the redis server. + + Returns: + ProcessInfo for the process that was started. + """ + reporter_filepath = os.path.join( + os.path.dirname(os.path.abspath(__file__)), "reporter.py") + command = [ + sys.executable, "-u", reporter_filepath, + "--redis-address={}".format(redis_address) + ] + if redis_password: + command += ["--redis-password", redis_password] + + try: + import psutil # noqa: F401 + except ImportError: + logger.warning("Failed to start the reporter. The reporter requires " + "'pip install psutil'.") + return None + + process_info = start_ray_process( + command, + ray_constants.PROCESS_TYPE_REPORTER, + stdout_file=stdout_file, + stderr_file=stderr_file) + return process_info + + +def start_dashboard(redis_address, + temp_dir, + stdout_file=None, + stderr_file=None, + redis_password=None): + """Start a dashboard process. + + Args: + redis_address (str): The address of the Redis instance. + temp_dir (str): The temporary directory used for log files and + information for this Ray session. + stdout_file: A file handle opened for writing to redirect stdout to. If + no redirection should happen, then this should be None. + stderr_file: A file handle opened for writing to redirect stderr to. If + no redirection should happen, then this should be None. + redis_password (str): The password of the redis server. + + Returns: + ProcessInfo for the process that was started. 
+ """ + port = 8080 + while True: + try: + port_test_socket = socket.socket() + port_test_socket.bind(("127.0.0.1", port)) + port_test_socket.close() + break + except socket.error: + port += 1 + + token = ray.utils.decode(binascii.hexlify(os.urandom(24))) + + dashboard_filepath = os.path.join( + os.path.dirname(os.path.abspath(__file__)), "dashboard/dashboard.py") + command = [ + sys.executable, + "-u", + dashboard_filepath, + "--redis-address={}".format(redis_address), + "--http-port={}".format(port), + "--token={}".format(token), + "--temp-dir={}".format(temp_dir), + ] + if redis_password: + command += ["--redis-password", redis_password] + + if sys.version_info <= (3, 0): + return None, None + try: + import aiohttp # noqa: F401 + import psutil # noqa: F401 + except ImportError: + logger.warning( + "Failed to start the dashboard. The dashboard requires Python 3 " + "as well as 'pip install aiohttp psutil'.") + return None, None + + process_info = start_ray_process( + command, + ray_constants.PROCESS_TYPE_DASHBOARD, + stdout_file=stdout_file, + stderr_file=stderr_file) + dashboard_url = "http://{}:{}/?token={}".format( + ray.services.get_node_ip_address(), port, token) + print("\n" + "=" * 70) + print("View the dashboard at {}".format(dashboard_url)) + print("=" * 70 + "\n") + return dashboard_url, process_info + + def start_ui(redis_address, notebook_name, stdout_file=None, stderr_file=None): """Start a UI process. diff --git a/python/setup.py b/python/setup.py index 1bdbb9021..a9496053b 100644 --- a/python/setup.py +++ b/python/setup.py @@ -66,6 +66,7 @@ optional_ray_files += ray_autoscaler_files extras = { "rllib": ["pyyaml", "gym[atari]", "opencv-python", "lz4", "scipy"], "debug": ["psutil", "setproctitle", "py-spy"], + "dashboard": ["psutil", "aiohttp"], } diff --git a/test/tempfile_test.py b/test/tempfile_test.py index 3016e8aa5..ead1d31d9 100644 --- a/test/tempfile_test.py +++ b/test/tempfile_test.py @@ -1,3 +1,7 @@ +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + import os import shutil import time @@ -67,15 +71,14 @@ def test_raylet_tempfiles(): ray.init(num_cpus=0) node = ray.worker._global_node top_levels = set(os.listdir(node.get_temp_dir_path())) - assert top_levels == {"ray_ui.ipynb", "sockets", "logs"} + assert top_levels.issuperset({"sockets", "logs"}) log_files = set(os.listdir(node.get_logs_dir_path())) - assert log_files == { + assert log_files.issuperset({ "log_monitor.out", "log_monitor.err", "plasma_store.out", - "plasma_store.err", "webui.out", "webui.err", "monitor.out", - "monitor.err", "raylet_monitor.out", "raylet_monitor.err", - "redis-shard_0.out", "redis-shard_0.err", "redis.out", "redis.err", - "raylet.out", "raylet.err" - } # with raylet logs + "plasma_store.err", "monitor.out", "monitor.err", "raylet_monitor.out", + "raylet_monitor.err", "redis-shard_0.out", "redis-shard_0.err", + "redis.out", "redis.err", "raylet.out", "raylet.err" + }) # with raylet logs socket_files = set(os.listdir(node.get_sockets_dir_path())) assert socket_files == {"plasma_store", "raylet"} ray.shutdown() @@ -83,15 +86,14 @@ def test_raylet_tempfiles(): ray.init(num_cpus=2) node = ray.worker._global_node top_levels = set(os.listdir(node.get_temp_dir_path())) - assert top_levels == {"ray_ui.ipynb", "sockets", "logs"} + assert top_levels.issuperset({"sockets", "logs"}) time.sleep(3) # wait workers to start log_files = set(os.listdir(node.get_logs_dir_path())) assert log_files.issuperset({ "log_monitor.out", 
"log_monitor.err", "plasma_store.out", - "plasma_store.err", "webui.out", "webui.err", "monitor.out", - "monitor.err", "raylet_monitor.out", "raylet_monitor.err", - "redis-shard_0.out", "redis-shard_0.err", "redis.out", "redis.err", - "raylet.out", "raylet.err" + "plasma_store.err", "monitor.out", "monitor.err", "raylet_monitor.out", + "raylet_monitor.err", "redis-shard_0.out", "redis-shard_0.err", + "redis.out", "redis.err", "raylet.out", "raylet.err" }) # with raylet logs # Check numbers of worker log file.