Add a web dashboard for monitoring node resource usage (#4066)

This commit is contained in:
Daniel Edgecumbe 2019-02-21 08:10:04 +00:00 committed by Robert Nishihara
parent 3ac8fd7ee8
commit 2e30f7ba38
13 changed files with 1239 additions and 44 deletions

View file

@ -8,7 +8,7 @@ else
base_commit="$TRAVIS_BRANCH"
echo "Running clang-format against branch $base_commit, with hash $(git rev-parse $base_commit)"
fi
output="$(.travis/git-clang-format --binary clang-format-3.8 --commit $base_commit --diff --exclude '(.*thirdparty/|.*redismodule.h|.*webui*)')"
output="$(.travis/git-clang-format --binary clang-format-3.8 --commit $base_commit --diff --exclude '(.*thirdparty/|.*redismodule.h|.*.js)')"
if [ "$output" == "no modified files to format" ] || [ "$output" == "clang-format did not modify any files" ] ; then
echo "clang-format passed."
exit 0

View file

@ -127,30 +127,34 @@ def kill_node(config_file, yes, override_cluster_name):
confirm("This will kill a node in your cluster", yes)
provider = get_node_provider(config["provider"], config["cluster_name"])
nodes = provider.nodes({TAG_RAY_NODE_TYPE: "worker"})
node = random.choice(nodes)
logger.info("kill_node: Terminating worker {}".format(node))
try:
nodes = provider.nodes({TAG_RAY_NODE_TYPE: "worker"})
node = random.choice(nodes)
logger.info("kill_node: Terminating worker {}".format(node))
updater = NodeUpdaterThread(
node_id=node,
provider_config=config["provider"],
provider=provider,
auth_config=config["auth"],
cluster_name=config["cluster_name"],
file_mounts=config["file_mounts"],
initialization_commands=[],
setup_commands=[],
runtime_hash="")
updater = NodeUpdaterThread(
node_id=node,
provider_config=config["provider"],
provider=provider,
auth_config=config["auth"],
cluster_name=config["cluster_name"],
file_mounts=config["file_mounts"],
initialization_commands=[],
setup_commands=[],
runtime_hash="")
_exec(updater, "ray stop", False, False)
_exec(updater, "ray stop", False, False)
time.sleep(5)
time.sleep(5)
iip = provider.internal_ip(node)
if iip:
return iip
if config.get("provider", {}).get("use_internal_ips", False) is True:
node_ip = provider.internal_ip(node)
else:
node_ip = provider.external_ip(node)
finally:
provider.cleanup()
return provider.external_ip(node)
return node_ip
def get_or_create_head_node(config, config_file, no_restart, restart_only, yes,
@ -246,14 +250,18 @@ def get_or_create_head_node(config, config_file, no_restart, restart_only, yes,
# Refresh the node cache so we see the external ip if available
provider.nodes(head_node_tags)
if config.get("provider", {}).get("use_internal_ips", False) is True:
head_node_ip = provider.internal_ip(head_node)
else:
head_node_ip = provider.external_ip(head_node)
if updater.exitcode != 0:
logger.error("get_or_create_head_node: "
"Updating {} failed".format(
provider.external_ip(head_node)))
"Updating {} failed".format(head_node_ip))
sys.exit(1)
logger.info("get_or_create_head_node: "
"Head node up-to-date, IP address is: {}".format(
provider.external_ip(head_node)))
logger.info(
"get_or_create_head_node: "
"Head node up-to-date, IP address is: {}".format(head_node_ip))
monitor_str = "tail -n 100 -f /tmp/ray/session_*/logs/monitor*"
use_docker = bool(config["docker"]["container_name"])
@ -268,10 +276,11 @@ def get_or_create_head_node(config, config_file, no_restart, restart_only, yes,
quote(monitor_str), modifiers))
print("To open a console on the cluster:\n\n"
" ray attach {}{}\n".format(config_file, modifiers))
print("To ssh manually to the cluster, run:\n\n"
" ssh -i {} {}@{}\n".format(config["auth"]["ssh_private_key"],
config["auth"]["ssh_user"],
provider.external_ip(head_node)))
head_node_ip))
finally:
provider.cleanup()
@ -456,11 +465,14 @@ def get_head_node_ip(config_file, override_cluster_name):
provider = get_node_provider(config["provider"], config["cluster_name"])
try:
head_node = _get_head_node(config, config_file, override_cluster_name)
ip = provider.external_ip(head_node)
if config.get("provider", {}).get("use_internal_ips", False) is True:
head_node_ip = provider.internal_ip(head_node)
else:
head_node_ip = provider.external_ip(head_node)
finally:
provider.cleanup()
return ip
return head_node_ip
def get_worker_node_ips(config_file, override_cluster_name):
@ -469,9 +481,17 @@ def get_worker_node_ips(config_file, override_cluster_name):
config = yaml.load(open(config_file).read())
if override_cluster_name is not None:
config["cluster_name"] = override_cluster_name
provider = get_node_provider(config["provider"], config["cluster_name"])
nodes = provider.nodes({TAG_RAY_NODE_TYPE: "worker"})
return [provider.external_ip(node) for node in nodes]
try:
nodes = provider.nodes({TAG_RAY_NODE_TYPE: "worker"})
if config.get("provider", {}).get("use_internal_ips", False) is True:
return [provider.internal_ip(node) for node in nodes]
else:
return [provider.external_ip(node) for node in nodes]
finally:
provider.cleanup()
def _get_head_node(config,

View file

@ -0,0 +1,348 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
# The dashboard is built on aiohttp; fail fast with a clear message if it
# is not installed rather than crashing later with an obscure NameError.
try:
    import aiohttp.web
except ModuleNotFoundError:
    # Bug fix: the message previously said "reporter", but this module is
    # the dashboard (the reporter has its own, separate guard).
    print("The dashboard requires aiohttp to run.")
    import sys
    sys.exit(1)
import argparse
import datetime
import json
import logging
import os
import threading
import traceback
import yaml
from pathlib import Path
from collections import Counter
from operator import itemgetter
from typing import Dict
import ray.ray_constants as ray_constants
import ray.utils
# Logger for this module. It should be configured at the entry point
# into the program using Ray. Ray provides a default configuration at
# entry/init points.
logger = logging.getLogger(__name__)
def to_unix_time(dt):
    """Convert a naive UTC datetime into seconds since the Unix epoch."""
    epoch = datetime.datetime(1970, 1, 1)
    delta = dt - epoch
    return delta.total_seconds()
class Dashboard(object):
    """A dashboard process for monitoring Ray nodes.

    This dashboard is made up of a REST API which collates data published by
    Reporter processes on nodes into a json structure, and a webserver
    which polls said API for display purposes.

    Attributes:
        redis_client: A client used to communicate with the Redis server.
    """

    def __init__(self,
                 redis_address,
                 http_port,
                 token,
                 temp_dir,
                 redis_password=None):
        """Initialize the dashboard object.

        Args:
            redis_address (str): Address of the Redis instance holding
                cluster state.
            http_port (int): Port the HTTP server listens on.
            token (str): Shared secret; requests must present it as a query
                parameter or cookie to be served.
            temp_dir (str): Ray session temp directory; the dashboard URL
                is written there.
            redis_password (str): Optional password for the Redis server.
        """
        self.ip = ray.services.get_node_ip_address()
        self.port = http_port
        self.token = token
        self.temp_dir = temp_dir
        # Background thread aggregating per-node stats from Redis pub/sub.
        self.node_stats = NodeStats(redis_address, redis_password)

        # Every request is gated by the token-checking middleware below.
        self.app = aiohttp.web.Application(middlewares=[self.auth_middleware])
        self.setup_routes()

    @aiohttp.web.middleware
    async def auth_middleware(self, req, handler):
        """Reject any request that does not carry the expected token."""

        def valid_token(req):
            # If the cookie token is correct, accept that.
            try:
                if req.cookies["token"] == self.token:
                    return True
            except KeyError:
                pass

            # If the query token is correct, accept that.
            try:
                if req.query["token"] == self.token:
                    return True
            except KeyError:
                pass

            # Reject.
            logger.warning("Dashboard: rejected an invalid token")
            return False

        # Check that the token is present, either in query or as cookie.
        if not valid_token(req):
            return aiohttp.web.Response(status=401, text="401 Unauthorized")

        resp = await handler(req)
        # Persist the token as a cookie so the follow-up requests made by
        # the page's JS do not need the query parameter.
        resp.cookies["token"] = self.token
        return resp

    def setup_routes(self):
        """Register HTTP handlers for the static UI and the JSON API."""

        def forbidden() -> aiohttp.web.Response:
            return aiohttp.web.Response(status=403, text="403 Forbidden")

        def get_forbidden(_) -> aiohttp.web.Response:
            return forbidden()

        async def get_index(req) -> aiohttp.web.Response:
            # Serve index.html from the directory containing this module.
            return aiohttp.web.FileResponse(
                os.path.join(
                    os.path.dirname(os.path.abspath(__file__)), "index.html"))

        async def get_resource(req) -> aiohttp.web.Response:
            try:
                path = req.match_info["x"]
            except KeyError:
                return forbidden()

            # Whitelist the static assets so this route cannot be abused
            # to read arbitrary files from the machine.
            if path not in ["main.css", "main.js"]:
                return forbidden()

            return aiohttp.web.FileResponse(
                os.path.join(
                    os.path.dirname(os.path.abspath(__file__)),
                    "res/{}".format(path)))

        async def json_response(result=None, error=None,
                                ts=None) -> aiohttp.web.Response:
            # Wrap every API reply in a common envelope with a timestamp,
            # so the frontend can detect stale data.
            if ts is None:
                ts = datetime.datetime.utcnow()

            return aiohttp.web.json_response({
                "result": result,
                "timestamp": to_unix_time(ts),
                "error": error,
            })

        async def ray_config(_) -> aiohttp.web.Response:
            # NOTE(review): presumably this file is written by the
            # autoscaler on cluster launch — absent on manually started
            # clusters, in which case an error payload is returned.
            try:
                with open(
                        Path("~/ray_bootstrap_config.yaml").expanduser()) as f:
                    cfg = yaml.load(f)
            except Exception:
                return await json_response(error="No config")

            D = {
                "min_workers": cfg["min_workers"],
                "max_workers": cfg["max_workers"],
                "initial_workers": cfg["initial_workers"],
                "idle_timeout_minutes": cfg["idle_timeout_minutes"],
            }

            # Instance-type fields look AWS-specific; fall back gracefully
            # for other providers.
            try:
                D["head_type"] = cfg["head_node"]["InstanceType"]
            except KeyError:
                D["head_type"] = "unknown"

            try:
                D["worker_type"] = cfg["worker_nodes"]["InstanceType"]
            except KeyError:
                D["worker_type"] = "unknown"

            return await json_response(result=D)

        async def node_info(req) -> aiohttp.web.Response:
            now = datetime.datetime.utcnow()
            D = self.node_stats.get_node_stats()
            return await json_response(result=D, ts=now)

        self.app.router.add_get("/", get_index)
        self.app.router.add_get("/index.html", get_index)
        self.app.router.add_get("/index.htm", get_index)
        self.app.router.add_get("/res/{x}", get_resource)
        self.app.router.add_get("/api/node_info", node_info)
        self.app.router.add_get("/api/super_client_table", node_info)
        self.app.router.add_get("/api/ray_config", ray_config)
        # Catch-all: any other path is forbidden.
        self.app.router.add_get("/{_}", get_forbidden)

    def log_dashboard_url(self):
        """Write the tokenized dashboard URL into temp_dir and log it."""
        url = "http://{}:{}?token={}".format(self.ip, self.port, self.token)
        with open(os.path.join(self.temp_dir, "dashboard_url"), "w") as f:
            f.write(url)
        logger.info("Dashboard running on {}".format(url))

    def run(self):
        """Start the stats-collector thread and the blocking web server."""
        self.log_dashboard_url()
        self.node_stats.start()
        aiohttp.web.run_app(self.app, host=self.ip, port=self.port)
class NodeStats(threading.Thread):
    """Aggregates per-node stats published by Reporter processes.

    Subscribes to every per-host reporter channel on Redis and keeps the
    most recent stats payload for each hostname; stale entries are purged
    when a snapshot is requested.
    """

    def __init__(self, redis_address, redis_password=None):
        # Pattern covering the per-host reporter channels.
        self.redis_key = "{}.*".format(ray.gcs_utils.REPORTER_CHANNEL)
        self.redis_client = ray.services.create_redis_client(
            redis_address, password=redis_password)

        # hostname -> latest stats dict. Written by run() (the subscriber
        # thread) and read by the web server, hence the lock.
        self._node_stats = {}
        self._node_stats_lock = threading.Lock()

        super().__init__()

    def calculate_totals(self) -> Dict:
        """Sum the per-node stats into cluster-wide totals.

        Caller must hold _node_stats_lock.
        """
        total_boot_time = 0
        total_cpus = 0
        total_workers = 0
        total_load = [0.0, 0.0, 0.0]
        total_storage_avail = 0
        total_storage_total = 0
        total_ram_avail = 0
        total_ram_total = 0
        total_sent = 0
        total_recv = 0

        for v in self._node_stats.values():
            total_boot_time += v["boot_time"]
            total_cpus += v["cpus"][0]
            total_workers += len(v["workers"])
            # load_avg is (raw_load, per_cpu_load); sum the raw triple.
            total_load[0] += v["load_avg"][0][0]
            total_load[1] += v["load_avg"][0][1]
            total_load[2] += v["load_avg"][0][2]
            total_storage_avail += v["disk"]["/"]["free"]
            total_storage_total += v["disk"]["/"]["total"]
            # mem is (total, available, percent) from the reporter.
            total_ram_avail += v["mem"][1]
            total_ram_total += v["mem"][0]
            total_sent += v["net"][0]
            total_recv += v["net"][1]

        return {
            "boot_time": total_boot_time,
            "n_workers": total_workers,
            "n_cores": total_cpus,
            "m_avail": total_ram_avail,
            "m_total": total_ram_total,
            "d_avail": total_storage_avail,
            "d_total": total_storage_total,
            "load": total_load,
            "n_sent": total_sent,
            "n_recv": total_recv,
        }

    def calculate_tasks(self) -> Counter:
        """Count worker processes by name across all nodes.

        Caller must hold _node_stats_lock.
        """
        return Counter(
            (x["name"]
             for y in (v["workers"] for v in self._node_stats.values())
             for x in y))

    def purge_outdated_stats(self):
        """Drop nodes whose last report is more than 5 seconds old."""

        def current(then, now):
            if (now - then) > 5:
                return False
            return True

        now = to_unix_time(datetime.datetime.utcnow())
        self._node_stats = {
            k: v
            for k, v in self._node_stats.items() if current(v["now"], now)
        }

    def get_node_stats(self) -> Dict:
        """Return a snapshot with totals, task counts and per-node stats."""
        with self._node_stats_lock:
            self.purge_outdated_stats()
            # Stable ordering for the UI: longest-running node first.
            node_stats = sorted(
                (v for v in self._node_stats.values()),
                key=itemgetter("boot_time"))
            return {
                "totals": self.calculate_totals(),
                "tasks": self.calculate_tasks(),
                "clients": node_stats,
            }

    def run(self):
        """Consume reporter messages forever, updating _node_stats."""
        p = self.redis_client.pubsub(ignore_subscribe_messages=True)
        p.psubscribe(self.redis_key)
        logger.info("NodeStats: subscribed to {}".format(self.redis_key))

        for x in p.listen():
            try:
                D = json.loads(x["data"])
                with self._node_stats_lock:
                    self._node_stats[D["hostname"]] = D
            except Exception:
                # A malformed message must not kill the collector thread.
                logger.exception(traceback.format_exc())
                continue
if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description=("Parse Redis server for the "
                     "dashboard to connect to."))
    parser.add_argument(
        "--http-port",
        required=True,
        type=int,
        help="The port to use for the HTTP server.")
    parser.add_argument(
        "--token",
        required=True,
        type=str,
        help="The token to use for the HTTP server.")
    parser.add_argument(
        "--redis-address",
        required=True,
        type=str,
        help="The address to use for Redis.")
    parser.add_argument(
        "--redis-password",
        required=False,
        type=str,
        default=None,
        help="the password to use for Redis")
    parser.add_argument(
        "--logging-level",
        required=False,
        type=str,
        default=ray_constants.LOGGER_LEVEL,
        choices=ray_constants.LOGGER_LEVEL_CHOICES,
        help=ray_constants.LOGGER_LEVEL_HELP)
    parser.add_argument(
        "--logging-format",
        required=False,
        type=str,
        default=ray_constants.LOGGER_FORMAT,
        help=ray_constants.LOGGER_FORMAT_HELP)
    parser.add_argument(
        "--temp-dir",
        required=False,
        type=str,
        default=None,
        help="Specify the path of the temporary directory use by Ray process.")
    args = parser.parse_args()
    ray.utils.setup_logger(args.logging_level, args.logging_format)

    dashboard = Dashboard(
        args.redis_address,
        args.http_port,
        args.token,
        args.temp_dir,
        redis_password=args.redis_password,
    )
    try:
        # Blocks until the web server exits.
        dashboard.run()
    except Exception as e:
        # Something went wrong, so push an error to all drivers.
        redis_client = ray.services.create_redis_client(
            args.redis_address, password=args.redis_password)
        traceback_str = ray.utils.format_error_message(traceback.format_exc())
        message = ("The dashboard on node {} failed with the following "
                   "error:\n{}".format(os.uname()[1], traceback_str))
        ray.utils.push_error_to_driver_through_redis(
            redis_client, ray_constants.DASHBOARD_DIED_ERROR, message)
        raise e

View file

@ -0,0 +1,99 @@
<!doctype html>
<html lang="en">
<head>
<meta charset="utf-8">
<title>ray dashboard</title>
<meta name="description" content="ray dashboard">
<link rel="stylesheet" href="res/main.css">
<meta name="referrer" content="same-origin">
<!--
<script src="https://cdnjs.cloudflare.com/ajax/libs/vue/2.6.4/vue.min.js"
integrity="sha384-rldcjlIPDkF0mEihgyEOIFhd2NW5YL717okjKC5YF2LrqoiBeMk4tpcgbRrlDHj5"
crossorigin="anonymous"></script>
-->
<script src="https://cdnjs.cloudflare.com/ajax/libs/vue/2.6.4/vue.js"
integrity="sha384-94H2I+MU5hfBDUinQG+/Y9JbHALTPlQmHO26R3Jv60MT6WWkOD5hlYNyT9ciiLsR"
crossorigin="anonymous"></script>
</head>
<body>
<div id="dashboard">
<table v-if="clients && !error" class="ray_node_grid">
<thead>
<tr>
<th class="hostname">Hostname</th>
<th class="uptime">Uptime</th>
<th class="workers">Workers</th>
<th class="mem">RAM</th>
<th class="storage">Disk</th>
<th class="load">Load (1m, 5m, 15m)</th>
<th class="netsent">Sent (M/s)</th>
<th class="netrecv">Recv (M/s)</th>
</tr>
</thead>
<tbody is="node"
v-for="v in clients"
:key="v.hostname"
:now="now"
:hostname="v.hostname"
:boot_time="v.boot_time"
:n_workers="v.workers.length"
:n_cores="v.cpus[0]"
:m_avail="v.mem[1]"
:m_total="v.mem[0]"
:d_avail="v.disk['/'].free"
:d_total="v.disk['/'].total"
:load="v.load_avg[0]"
:n_sent="v.net[0]"
:n_recv="v.net[1]"
:workers="v.workers"
></tbody>
<tbody is="node"
class="totals"
v-if="totals"
:now="now"
:hostname="Object.keys(clients).length"
:boot_time="totals.boot_time"
:n_workers="totals.n_workers"
:n_cores="totals.n_cores"
:m_avail="totals.m_avail"
:m_total="totals.m_total"
:d_avail="totals.d_avail"
:d_total="totals.d_total"
:load="totals.load"
:n_sent="totals.n_sent"
:n_recv="totals.n_recv"
:workers="[]"
></tbody>
</table>
<template v-if="error">
<h2>{{error}}</h2>
</template>
<h2 v-if="last_update" :class="outdated_cls">Last updated {{age}} ago</h2>
<div class="cols">
<div class="tasks">
<template v-if="tasks && !error">
<h2>tasks</h2>
<ul>
<li v-for="v, k, _ in tasks">{{k}}: {{v}}</li>
</ul>
</template>
</div>
<div class="ray_config">
<template v-if="ray_config">
<h2>ray config</h2>
<pre>{{ray_config}}</pre>
</template>
</div>
</div>
</div>
</body>
<script src="res/main.js"></script>

View file

@ -0,0 +1,58 @@
* { font-family: monospace; margin: 0; padding: 0; }
h1, h2 { text-align: center; margin: 1rem 0; }
h1 { font-size: 3rem; margin: 0.5rem auto; text-align: center; }
h2 { font-size: 2rem; margin: 1rem auto; text-align: center; }
div#dashboard {
width: 116rem; margin: 1rem auto;
}
table, tbody, thead {
width: 100%;
margin: 0;
padding: 0;
}
tr {
cursor: pointer;
}
tr.workerlist td, tr.workerlist th {
font-size: 0.8rem;
}
tr:hover {
filter: brightness(0.85);
}
td, th {
padding: 0.3rem;
font-size: 1.4rem;
font-family: monospace;
text-align: center;
background-color: white;
}
th {
background-color: #eeeeee;
border: 1px solid black;
}
tbody.totals {
font-weight: bold;
font-size: 1.5rem;
}
ul { list-style-position: inside; }
.critical { background-color: magenta; }
.bad { background-color: red; }
.high { background-color: orange; }
.average { background-color: limegreen; }
.low { background-color: aquamarine; }
div.cols {
width: 100%;
margin: 1rem 0;
display: grid;
grid-template-columns: 1fr 1fr;
}
div.cols div {
padding: 1rem 0;
}
.outdated { background-color: red; }

View file

@ -0,0 +1,265 @@
// Root Vue instance driving the dashboard page.
let dashboard = new Vue({
    el: "#dashboard",
    data: {
        // Current wall-clock time in seconds; ticked once per second.
        now: (new Date()).getTime() / 1000,
        shown: {},
        // Error message displayed until the first successful fetch.
        error: "loading...",
        // Server-side timestamp of the last node_info payload.
        last_update: undefined,
        clients: undefined,
        totals: undefined,
        tasks: undefined,
        ray_config: undefined,
    },
    methods: {
        // Poll /api/node_info every 500ms, mirroring the payload into
        // component state; on any failure clear the stats so stale data
        // is never displayed.
        updateNodeInfo: function() {
            var self = this;
            fetch("/api/node_info").then(function (resp) {
                return resp.json();
            }).then(function(data) {
                self.error = data.error;
                if (data.error) {
                    self.clients = undefined;
                    self.tasks = undefined;
                    self.totals = undefined;
                    return;
                }
                self.last_update = data.timestamp;
                self.clients = data.result.clients;
                self.tasks = data.result.tasks;
                self.totals = data.result.totals;
            }).catch(function() {
                self.error = "request error"
                self.clients = undefined;
                self.tasks = undefined;
                self.totals = undefined;
            }).finally(function() {
                // Re-arm unconditionally so polling never stops.
                setTimeout(self.updateNodeInfo, 500);
            });
        },
        // Poll /api/ray_config every 10s (it rarely changes).
        updateRayConfig: function() {
            var self = this;
            fetch("/api/ray_config").then(function (resp) {
                return resp.json();
            }).then(function(data) {
                if (data.error) {
                    self.ray_config = undefined;
                    return;
                }
                self.ray_config = data.result;
            }).catch(function() {
                self.error = "request error"
                self.ray_config = undefined;
            }).finally(function() {
                setTimeout(self.updateRayConfig, 10000);
            });
        },
        // Kick off both polling loops.
        updateAll: function() {
            this.updateNodeInfo();
            this.updateRayConfig();
        },
        tickClock: function() {
            this.now = (new Date()).getTime() / 1000;
        }
    },
    computed: {
        // CSS class flagging the page when data is >5s old.
        outdated_cls: function(ts) {
            if ((this.now - this.last_update) > 5) {
                return "outdated";
            }
            return "";
        },
        // Whole-second age of the last update, e.g. "3s".
        age: function(ts) {
            return (this.now - this.last_update | 0) + "s";
        },
    },
    filters: {
        // Format a byte count with a binary SI-style suffix (B/K/M/G/T).
        si: function(x) {
            let prefixes = ["B", "K", "M", "G", "T"]
            let i = 0;
            while (x > 1024) {
                x /= 1024;
                i += 1;
            }
            return `${x.toFixed(1)}${prefixes[i]}`;
        },
    },
});
// Worker-count cell: workers/cores ratio, color-coded by oversubscription.
Vue.component("worker-usage", {
    props: ['cores', 'workers'],
    computed: {
        frac: function() {
            return this.workers / this.cores;
        },
        // Severity class: >1 worker per core starts escalating the color.
        cls: function() {
            if (this.frac > 3) { return "critical"; }
            if (this.frac > 2) { return "bad"; }
            if (this.frac > 1.5) { return "high"; }
            if (this.frac > 1) { return "average"; }
            return "low";
        },
    },
    template: `
    <td class="workers" :class="cls">
        {{workers}}/{{cores}} {{(frac*100).toFixed(0)}}%
    </td>
    `,
});
// One <tbody> per cluster node (also reused for the totals row).
// Clicking the row toggles the per-worker detail rows.
Vue.component("node", {
    props: [
        "now",
        "hostname",
        "boot_time",
        "n_workers",
        "n_cores",
        "m_avail",
        "m_total",
        "d_avail",
        "d_total",
        "load",
        "n_sent",
        "n_recv",
        "workers",
    ],
    data: function() {
        return {
            // Worker detail rows start collapsed.
            hidden: true,
        };
    },
    computed: {
        // Human-readable uptime derived from boot_time.
        age: function() {
            if (this.boot_time) {
                let n = this.now;
                if (this.boot_time > 2840140800) {
                    // Hack. It's a sum of multiple nodes.
                    n *= this.hostname;
                }
                let rs = n - this.boot_time | 0;
                let s = rs % 60;
                let m = ((rs / 60) % 60) | 0;
                let h = (rs / 3600) | 0;
                if (h) {
                    return `${h}h ${m}m ${s}s`;
                }
                if (m) {
                    return `${m}m ${s}s`;
                }
                return `${s}s`;
            }
            return "?"
        },
    },
    methods: {
        toggleHide: function() {
            this.hidden = !this.hidden;
        }
    },
    filters: {
        // Bytes -> mebibytes with 3 decimals.
        mib(x) {
            return `${(x/(1024**2)).toFixed(3)}M`;
        },
        // The totals row passes the node count as "hostname"; label it.
        hostnamefilter(x) {
            if (isNaN(x)) {
                return x;
            }
            return `Totals: ${x} nodes`;
        },
    },
    template: `
    <tbody v-on:click="toggleHide()">
        <tr class="ray_node">
            <td class="hostname">{{hostname | hostnamefilter}}</td>
            <td class="uptime">{{age}}</td>
            <worker-usage
                :workers="n_workers"
                :cores="n_cores"
            ></worker-usage>
            <usagebar
                :avail="m_avail" :total="m_total"
                stat="mem"
            ></usagebar>
            <usagebar
                :avail="d_avail" :total="d_total"
                stat="storage"
            ></usagebar>
            <loadbar
                :cores="n_cores"
                :onem="load[0]"
                :fivem="load[1]"
                :fifteenm="load[2]"
            >
            </loadbar>
            <td class="netsent">{{n_sent | mib}}/s</td>
            <td class="netrecv">{{n_recv | mib}}/s</td>
        </tr>
        <template v-if="!hidden && workers">
            <tr class="workerlist">
                <th>time</th>
                <th>name</th>
                <th>pid</th>
                <th>uss</th>
            </tr>
            <tr class="workerlist" v-for="x in workers">
                <td>user: {{x.cpu_times.user}}s</td>
                <td>{{x.name}}</td>
                <td>{{x.pid}}</td>
                <td>{{(x.memory_full_info.uss/1048576).toFixed(0)}}MiB</td>
            </tr>
        </template>
    </tbody>
    `,
});
// Generic capacity cell (RAM or disk): "used/total pct", color-coded by
// fill fraction.
Vue.component("usagebar", {
    props: ['stat', 'avail', 'total'], // e.g. free -m avail
    computed: {
        used: function() { return this.total - this.avail; },
        frac: function() { return (this.total - this.avail)/this.total; },
        // Severity class from the fill fraction.
        cls: function() {
            if (this.frac > 0.95) { return "critical"; }
            if (this.frac > 0.9) { return "bad"; }
            if (this.frac > 0.8) { return "high"; }
            if (this.frac > 0.5) { return "average"; }
            return "low";
        },
        // Combine the stat name (used as a CSS hook) with the severity.
        tcls: function() {
            return `${this.stat} ${this.cls}`;
        }
    },
    filters: {
        // Bytes -> gibibytes.
        gib(x) {
            return `${(x/(1024**3)).toFixed(1)}G`;
        },
        // Fraction -> integer percentage.
        pct(x) {
            return `${(x*100).toFixed(0)}%`;
        },
    },
    template: `
    <td class="usagebar" :class="tcls">
        {{used | gib}}/{{total | gib}} {{ frac | pct }}
    </td>
    `,
});
// Load-average cell, color-coded by the 1-minute load per core.
Vue.component("loadbar", {
    props: ['cores', 'onem', 'fivem', 'fifteenm'],
    computed: {
        frac: function() { return this.onem/this.cores; },
        cls: function() {
            if (this.frac > 3) { return "critical"; }
            if (this.frac > 2.5) { return "bad"; }
            if (this.frac > 2) { return "high"; }
            if (this.frac > 1.5) { return "average"; }
            return "low";
        },
    },
    template: `
    <td class="load loadbar" :class="cls">
        {{onem.toFixed(2)}}, {{fivem.toFixed(2)}}, {{fifteenm.toFixed(2)}}
    </td>
    `,
});

// Tick the clock every second and start both polling loops.
setInterval(dashboard.tickClock, 1000);
dashboard.updateAll();

View file

@ -39,6 +39,7 @@ __all__ = [
FUNCTION_PREFIX = "RemoteFunction:"
LOG_FILE_CHANNEL = "RAY_LOG_CHANNEL"
REPORTER_CHANNEL = "RAY_REPORTER"
# xray heartbeats
XRAY_HEARTBEAT_CHANNEL = str(TablePubsub.HEARTBEAT).encode("ascii")

View file

@ -10,6 +10,7 @@ import json
import os
import logging
import signal
import sys
import tempfile
import threading
import time
@ -24,6 +25,8 @@ from ray.utils import try_to_create_directory
# using logging.basicConfig in its entry/init points.
logger = logging.getLogger(__name__)
PY3 = sys.version_info.major >= 3
class Node(object):
"""An encapsulation of the Ray processes on a single node.
@ -81,6 +84,7 @@ class Node(object):
self._plasma_store_socket_name = None
self._raylet_socket_name = None
self._webui_url = None
self._dashboard_url = None
else:
self._plasma_store_socket_name = (
ray_params.plasma_store_socket_name)
@ -284,6 +288,35 @@ class Node(object):
process_info
]
def start_reporter(self):
    """Start the reporter.

    If the reporter could not be started (ray.services.start_reporter
    returns None, e.g. when psutil is missing), no process is registered.
    """
    stdout_file, stderr_file = self.new_log_files("reporter", True)
    process_info = ray.services.start_reporter(
        self.redis_address,
        stdout_file=stdout_file,
        stderr_file=stderr_file,
        redis_password=self._ray_params.redis_password)
    # Starting the reporter twice on one node is a programming error.
    assert ray_constants.PROCESS_TYPE_REPORTER not in self.all_processes
    if process_info is not None:
        self.all_processes[ray_constants.PROCESS_TYPE_REPORTER] = [
            process_info
        ]
def start_dashboard(self):
    """Start the dashboard.

    Records the dashboard URL on self._dashboard_url. If the dashboard
    process could not be started, no process is registered.
    """
    stdout_file, stderr_file = self.new_log_files("dashboard", True)
    self._dashboard_url, process_info = ray.services.start_dashboard(
        self.redis_address,
        self._temp_dir,
        stdout_file=stdout_file,
        stderr_file=stderr_file,
        redis_password=self._ray_params.redis_password)
    # Starting the dashboard twice on one node is a programming error.
    assert ray_constants.PROCESS_TYPE_DASHBOARD not in self.all_processes
    if process_info is not None:
        self.all_processes[ray_constants.PROCESS_TYPE_DASHBOARD] = [
            process_info
        ]
def start_ui(self):
"""Start the web UI."""
stdout_file, stderr_file = self.new_log_files("webui")
@ -408,14 +441,16 @@ class Node(object):
self.start_redis()
self.start_monitor()
self.start_raylet_monitor()
if PY3 and self._ray_params.include_webui:
self.start_dashboard()
self.start_plasma_store()
self.start_raylet()
if PY3 and self._ray_params.include_webui:
self.start_reporter()
if self._ray_params.include_log_monitor:
self.start_log_monitor()
if self._ray_params.include_webui:
self.start_ui()
def _kill_process_type(self,
process_type,
@ -545,6 +580,26 @@ class Node(object):
self._kill_process_type(
ray_constants.PROCESS_TYPE_LOG_MONITOR, check_alive=check_alive)
def kill_reporter(self, check_alive=True):
    """Kill the reporter.

    Args:
        check_alive (bool): Raise an exception if the process was already
            dead.
    """
    # Delegates to the generic per-type process killer.
    self._kill_process_type(
        ray_constants.PROCESS_TYPE_REPORTER, check_alive=check_alive)
def kill_dashboard(self, check_alive=True):
    """Kill the dashboard.

    Args:
        check_alive (bool): Raise an exception if the process was already
            dead.
    """
    # Delegates to the generic per-type process killer.
    self._kill_process_type(
        ray_constants.PROCESS_TYPE_DASHBOARD, check_alive=check_alive)
def kill_monitor(self, check_alive=True):
"""Kill the monitor.

View file

@ -54,6 +54,8 @@ INFEASIBLE_TASK_ERROR = "infeasible_task"
REMOVED_NODE_ERROR = "node_removed"
MONITOR_DIED_ERROR = "monitor_died"
LOG_MONITOR_DIED_ERROR = "log_monitor_died"
REPORTER_DIED_ERROR = "reporter_died"
DASHBOARD_DIED_ERROR = "dashboard_died"
# Abort autoscaling if more than this number of errors are encountered. This
# is a safety feature to prevent e.g. runaway node launches.
@ -76,6 +78,9 @@ AUTOSCALER_UPDATE_INTERVAL_S = env_integer("AUTOSCALER_UPDATE_INTERVAL_S", 5)
AUTOSCALER_HEARTBEAT_TIMEOUT_S = env_integer("AUTOSCALER_HEARTBEAT_TIMEOUT_S",
30)
# The reporter will report its statistics this often (milliseconds).
REPORTER_UPDATE_INTERVAL_MS = env_integer("REPORTER_UPDATE_INTERVAL_MS", 500)
# Max number of retries to AWS (default is 5, time increases exponentially)
BOTO_MAX_RETRIES = env_integer("BOTO_MAX_RETRIES", 12)
@ -96,6 +101,8 @@ INFINITE_RECONSTRUCTION = 2**30
PROCESS_TYPE_MONITOR = "monitor"
PROCESS_TYPE_RAYLET_MONITOR = "raylet_monitor"
PROCESS_TYPE_LOG_MONITOR = "log_monitor"
PROCESS_TYPE_REPORTER = "reporter"
PROCESS_TYPE_DASHBOARD = "dashboard"
PROCESS_TYPE_WORKER = "worker"
PROCESS_TYPE_RAYLET = "raylet"
PROCESS_TYPE_PLASMA_STORE = "plasma_store"

228
python/ray/reporter.py Normal file
View file

@ -0,0 +1,228 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import argparse
import logging
import json
import os
import traceback
import time
import datetime
from socket import AddressFamily
try:
import psutil
except ModuleNotFoundError:
print("The reporter requires psutil to run.")
import sys
sys.exit(1)
import ray.ray_constants as ray_constants
import ray.utils
# Logger for this module. It should be configured at the entry point
# into the program using Ray. Ray provides a default configuration at
# entry/init points.
logger = logging.getLogger(__name__)
def recursive_asdict(o):
    """Recursively convert namedtuples, tuples, lists and dicts into
    plain JSON-serializable lists and dicts."""
    if isinstance(o, tuple) and hasattr(o, "_asdict"):
        # A namedtuple: flatten to a dict first, then recurse on values.
        return recursive_asdict(o._asdict())

    if isinstance(o, (tuple, list)):
        return [recursive_asdict(item) for item in o]

    if isinstance(o, dict):
        return {key: recursive_asdict(value) for key, value in o.items()}

    return o
def jsonify_asdict(o):
    """Serialize *o* to a JSON string, flattening namedtuples to dicts."""
    plain = recursive_asdict(o)
    return json.dumps(plain)
def running_worker(s):
    """Return True if process name *s* contains "ray_worker" but is not
    exactly "ray_worker" (which presumably denotes an idle worker)."""
    return "ray_worker" in s and s != "ray_worker"
def determine_ip_address():
    """Return the first IP address for an ethernet interface on the system."""
    candidates = []
    # Interfaces starting with "e" (eth0, enp0s3, ...) are treated as
    # ethernet; collect their IPv4 addresses in interface order.
    for name, addresses in psutil.net_if_addrs().items():
        if name[0] != "e":
            continue
        candidates.extend(addr.address for addr in addresses
                          if addr.family == AddressFamily.AF_INET)
    return candidates[0]
def to_posix_time(dt):
    """Convert a naive UTC datetime into seconds since the Unix epoch."""
    epoch_start = datetime.datetime(1970, 1, 1)
    elapsed = dt - epoch_start
    return elapsed.total_seconds()
class Reporter(object):
    """A monitor process for monitoring Ray nodes.

    Periodically publishes a JSON snapshot of this machine's resource
    usage to a per-host Redis channel, consumed by the dashboard.

    Attributes:
        host (str): The hostname of this machine. Used to improve the log
            messages published to Redis.
        redis_client: A client used to communicate with the Redis server.
    """

    def __init__(self, redis_address, redis_password=None):
        """Initialize the reporter object."""
        # (logical cpu count, physical cpu count).
        self.cpu_counts = (psutil.cpu_count(), psutil.cpu_count(logical=False))
        self.ip_addr = determine_ip_address()
        self.hostname = os.uname().nodename

        _ = psutil.cpu_percent()  # For initialization

        # One Redis channel per host, keyed by hostname.
        self.redis_key = "{}.{}".format(ray.gcs_utils.REPORTER_CHANNEL,
                                        self.hostname)
        self.redis_client = ray.services.create_redis_client(
            redis_address, password=redis_password)

        # Sliding window of (timestamp, (bytes_sent, bytes_recv)) samples,
        # used to turn cumulative counters into rates.
        self.network_stats_hist = [(0, (0.0, 0.0))]  # time, (sent, recv)

    @staticmethod
    def get_cpu_percent():
        """Return system-wide CPU utilization as a percentage."""
        return psutil.cpu_percent()

    @staticmethod
    def get_boot_time():
        """Return the system boot time as a Unix timestamp."""
        return psutil.boot_time()

    @staticmethod
    def get_network_stats():
        """Return cumulative (bytes_sent, bytes_recv) summed over all
        interfaces whose name starts with "e" (ethernet heuristic)."""
        ifaces = [
            v for k, v in psutil.net_io_counters(pernic=True).items()
            if k[0] == "e"
        ]

        sent = sum((iface.bytes_sent for iface in ifaces))
        recv = sum((iface.bytes_recv for iface in ifaces))
        return sent, recv

    @staticmethod
    def get_mem_usage():
        """Return virtual memory as (total, available, percent_used)."""
        vm = psutil.virtual_memory()
        return vm.total, vm.available, vm.percent

    @staticmethod
    def get_disk_usage():
        """Return disk usage stats for the "/" and "/tmp" mount points."""
        return {x: psutil.disk_usage(x) for x in ["/", "/tmp"]}

    @staticmethod
    def get_workers():
        """Return psutil info dicts for all active Ray worker processes."""
        return [
            x.as_dict(attrs=[
                "pid", "create_time", "cpu_times", "name", "memory_full_info"
            ]) for x in psutil.process_iter() if running_worker(x.name())
        ]

    def get_load_avg(self):
        """Return (raw 1/5/15-min load averages, load per logical CPU)."""
        load = os.getloadavg()
        per_cpu_load = tuple((round(x / self.cpu_counts[0], 2) for x in load))
        return load, per_cpu_load

    def get_all_stats(self):
        """Collect all stats for this node into a single dict."""
        now = to_posix_time(datetime.datetime.utcnow())
        network_stats = self.get_network_stats()

        # Keep the last 7 samples (~3s at the 500ms default interval) and
        # compute the byte rate over the whole window to smooth spikes.
        self.network_stats_hist.append((now, network_stats))
        self.network_stats_hist = self.network_stats_hist[-7:]
        then, prev_network_stats = self.network_stats_hist[0]
        netstats = ((network_stats[0] - prev_network_stats[0]) / (now - then),
                    (network_stats[1] - prev_network_stats[1]) / (now - then))

        return {
            "now": now,
            "hostname": self.hostname,
            "ip": self.ip_addr,
            "cpu": self.get_cpu_percent(),
            "cpus": self.cpu_counts,
            "mem": self.get_mem_usage(),
            "workers": self.get_workers(),
            "boot_time": self.get_boot_time(),
            "load_avg": self.get_load_avg(),
            "disk": self.get_disk_usage(),
            "net": netstats,
        }

    def perform_iteration(self):
        """Collect this node's stats and publish them to Redis."""
        stats = self.get_all_stats()

        self.redis_client.publish(
            self.redis_key,
            jsonify_asdict(stats),
        )

    def run(self):
        """Run the reporter."""
        while True:
            try:
                self.perform_iteration()
            except Exception:
                # Best effort: keep reporting even if one iteration fails.
                traceback.print_exc()
                pass

            time.sleep(ray_constants.REPORTER_UPDATE_INTERVAL_MS / 1000)
if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description=("Parse Redis server for the "
                     "reporter to connect to."))
    parser.add_argument(
        "--redis-address",
        required=True,
        type=str,
        help="The address to use for Redis.")
    parser.add_argument(
        "--redis-password",
        required=False,
        type=str,
        default=None,
        help="the password to use for Redis")
    parser.add_argument(
        "--logging-level",
        required=False,
        type=str,
        default=ray_constants.LOGGER_LEVEL,
        choices=ray_constants.LOGGER_LEVEL_CHOICES,
        help=ray_constants.LOGGER_LEVEL_HELP)
    parser.add_argument(
        "--logging-format",
        required=False,
        type=str,
        default=ray_constants.LOGGER_FORMAT,
        help=ray_constants.LOGGER_FORMAT_HELP)
    args = parser.parse_args()
    ray.utils.setup_logger(args.logging_level, args.logging_format)

    reporter = Reporter(args.redis_address, redis_password=args.redis_password)

    try:
        # Blocks forever, publishing stats at the configured interval.
        reporter.run()
    except Exception as e:
        # Something went wrong, so push an error to all drivers.
        redis_client = ray.services.create_redis_client(
            args.redis_address, password=args.redis_password)
        traceback_str = ray.utils.format_error_message(traceback.format_exc())
        message = ("The reporter on node {} failed with the following "
                   "error:\n{}".format(os.uname()[1], traceback_str))
        ray.utils.push_error_to_driver_through_redis(
            redis_client, ray_constants.REPORTER_DIED_ERROR, message)
        raise e

View file

@ -827,6 +827,117 @@ def start_log_monitor(redis_address,
return process_info
def start_reporter(redis_address,
                   stdout_file=None,
                   stderr_file=None,
                   redis_password=None):
    """Start a reporter process.

    Args:
        redis_address (str): The address of the Redis instance.
        stdout_file: A file handle opened for writing to redirect stdout to. If
            no redirection should happen, then this should be None.
        stderr_file: A file handle opened for writing to redirect stderr to. If
            no redirection should happen, then this should be None.
        redis_password (str): The password of the redis server.

    Returns:
        ProcessInfo for the process that was started, or None if psutil
        is not installed.
    """
    # The reporter depends on psutil; check availability up front and
    # bail out with an install hint before doing any other work.
    try:
        import psutil  # noqa: F401
    except ImportError:
        logger.warning("Failed to start the reporter. The reporter requires "
                       "'pip install psutil'.")
        return None

    reporter_filepath = os.path.join(
        os.path.dirname(os.path.abspath(__file__)), "reporter.py")
    command = [
        sys.executable, "-u", reporter_filepath,
        "--redis-address={}".format(redis_address)
    ]
    if redis_password:
        command += ["--redis-password", redis_password]
    return start_ray_process(
        command,
        ray_constants.PROCESS_TYPE_REPORTER,
        stdout_file=stdout_file,
        stderr_file=stderr_file)
def start_dashboard(redis_address,
                    temp_dir,
                    stdout_file=None,
                    stderr_file=None,
                    redis_password=None):
    """Start a dashboard process.

    Args:
        redis_address (str): The address of the Redis instance.
        temp_dir (str): The temporary directory used for log files and
            information for this Ray session.
        stdout_file: A file handle opened for writing to redirect stdout to. If
            no redirection should happen, then this should be None.
        stderr_file: A file handle opened for writing to redirect stderr to. If
            no redirection should happen, then this should be None.
        redis_password (str): The password of the redis server.

    Returns:
        A (dashboard_url, ProcessInfo) tuple for the process that was
        started, or (None, None) if the dashboard's requirements
        (Python 3, aiohttp, psutil) are not met.
    """
    # Check prerequisites first so we do no port scanning or token
    # generation when the dashboard cannot start anyway.
    if sys.version_info <= (3, 0):
        return None, None
    try:
        import aiohttp  # noqa: F401
        import psutil  # noqa: F401
    except ImportError:
        logger.warning(
            "Failed to start the dashboard. The dashboard requires Python 3 "
            "as well as 'pip install aiohttp psutil'.")
        return None, None

    # Scan upward from 8080 for a free port. The probe socket is closed
    # on every path; the previous version leaked one socket per failed
    # bind attempt.
    port = 8080
    while True:
        port_test_socket = socket.socket()
        try:
            port_test_socket.bind(("127.0.0.1", port))
            break
        except socket.error:
            port += 1
        finally:
            port_test_socket.close()

    # Random access token so that only the user who started the cluster
    # can reach the dashboard.
    token = ray.utils.decode(binascii.hexlify(os.urandom(24)))

    dashboard_filepath = os.path.join(
        os.path.dirname(os.path.abspath(__file__)), "dashboard/dashboard.py")
    command = [
        sys.executable,
        "-u",
        dashboard_filepath,
        "--redis-address={}".format(redis_address),
        "--http-port={}".format(port),
        "--token={}".format(token),
        "--temp-dir={}".format(temp_dir),
    ]
    if redis_password:
        command += ["--redis-password", redis_password]

    process_info = start_ray_process(
        command,
        ray_constants.PROCESS_TYPE_DASHBOARD,
        stdout_file=stdout_file,
        stderr_file=stderr_file)
    dashboard_url = "http://{}:{}/?token={}".format(
        ray.services.get_node_ip_address(), port, token)
    print("\n" + "=" * 70)
    print("View the dashboard at {}".format(dashboard_url))
    print("=" * 70 + "\n")
    return dashboard_url, process_info
def start_ui(redis_address, notebook_name, stdout_file=None, stderr_file=None):
"""Start a UI process.

View file

@ -66,6 +66,7 @@ optional_ray_files += ray_autoscaler_files
extras = {
"rllib": ["pyyaml", "gym[atari]", "opencv-python", "lz4", "scipy"],
"debug": ["psutil", "setproctitle", "py-spy"],
"dashboard": ["psutil", "aiohttp"],
}

View file

@ -1,3 +1,7 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import os
import shutil
import time
@ -67,15 +71,14 @@ def test_raylet_tempfiles():
ray.init(num_cpus=0)
node = ray.worker._global_node
top_levels = set(os.listdir(node.get_temp_dir_path()))
assert top_levels == {"ray_ui.ipynb", "sockets", "logs"}
assert top_levels.issuperset({"sockets", "logs"})
log_files = set(os.listdir(node.get_logs_dir_path()))
assert log_files == {
assert log_files.issuperset({
"log_monitor.out", "log_monitor.err", "plasma_store.out",
"plasma_store.err", "webui.out", "webui.err", "monitor.out",
"monitor.err", "raylet_monitor.out", "raylet_monitor.err",
"redis-shard_0.out", "redis-shard_0.err", "redis.out", "redis.err",
"raylet.out", "raylet.err"
} # with raylet logs
"plasma_store.err", "monitor.out", "monitor.err", "raylet_monitor.out",
"raylet_monitor.err", "redis-shard_0.out", "redis-shard_0.err",
"redis.out", "redis.err", "raylet.out", "raylet.err"
}) # with raylet logs
socket_files = set(os.listdir(node.get_sockets_dir_path()))
assert socket_files == {"plasma_store", "raylet"}
ray.shutdown()
@ -83,15 +86,14 @@ def test_raylet_tempfiles():
ray.init(num_cpus=2)
node = ray.worker._global_node
top_levels = set(os.listdir(node.get_temp_dir_path()))
assert top_levels == {"ray_ui.ipynb", "sockets", "logs"}
assert top_levels.issuperset({"sockets", "logs"})
time.sleep(3) # wait workers to start
log_files = set(os.listdir(node.get_logs_dir_path()))
assert log_files.issuperset({
"log_monitor.out", "log_monitor.err", "plasma_store.out",
"plasma_store.err", "webui.out", "webui.err", "monitor.out",
"monitor.err", "raylet_monitor.out", "raylet_monitor.err",
"redis-shard_0.out", "redis-shard_0.err", "redis.out", "redis.err",
"raylet.out", "raylet.err"
"plasma_store.err", "monitor.out", "monitor.err", "raylet_monitor.out",
"raylet_monitor.err", "redis-shard_0.out", "redis-shard_0.err",
"redis.out", "redis.err", "raylet.out", "raylet.err"
}) # with raylet logs
# Check numbers of worker log file.