mirror of
https://github.com/vale981/ray
synced 2025-03-06 02:21:39 -05:00
[dashboard] Only show workers from the correct cluster (#8434)
This commit is contained in:
parent
13231ba63b
commit
0fadc11437
10 changed files with 143 additions and 81 deletions
|
@ -31,7 +31,10 @@
|
|||
"start": "react-scripts start",
|
||||
"build": "react-scripts build",
|
||||
"test": "react-scripts test",
|
||||
"eject": "react-scripts eject"
|
||||
"eject": "react-scripts eject",
|
||||
"lint": "npm run eslint && npm run prettier",
|
||||
"prettier": "./node_modules/.bin/prettier -c src/",
|
||||
"eslint": "./node_modules/.bin/eslint \"src/**\""
|
||||
},
|
||||
"eslintConfig": {
|
||||
"extends": [
|
||||
|
|
|
@ -55,6 +55,25 @@ export type RayConfigResponse = {
|
|||
|
||||
export const getRayConfig = () => get<RayConfigResponse>("/api/ray_config", {});
|
||||
|
||||
export type NodeInfoResponseWorker = {
|
||||
pid: number;
|
||||
create_time: number;
|
||||
cmdline: string[];
|
||||
cpu_percent: number;
|
||||
cpu_times: {
|
||||
system: number;
|
||||
children_system: number;
|
||||
user: number;
|
||||
children_user: number;
|
||||
};
|
||||
memory_info: {
|
||||
pageins: number;
|
||||
pfaults: number;
|
||||
vms: number;
|
||||
rss: number;
|
||||
};
|
||||
};
|
||||
|
||||
export type NodeInfoResponse = {
|
||||
clients: Array<{
|
||||
now: number;
|
||||
|
@ -74,24 +93,7 @@ export type NodeInfoResponse = {
|
|||
};
|
||||
load_avg: [[number, number, number], [number, number, number]];
|
||||
net: [number, number]; // Sent and received network traffic in bytes / second
|
||||
workers: Array<{
|
||||
pid: number;
|
||||
create_time: number;
|
||||
cmdline: string[];
|
||||
cpu_percent: number;
|
||||
cpu_times: {
|
||||
system: number;
|
||||
children_system: number;
|
||||
user: number;
|
||||
children_user: number;
|
||||
};
|
||||
memory_info: {
|
||||
pageins: number;
|
||||
pfaults: number;
|
||||
vms: number;
|
||||
rss: number;
|
||||
};
|
||||
}>;
|
||||
workers: Array<NodeInfoResponseWorker>;
|
||||
}>;
|
||||
log_counts: {
|
||||
[ip: string]: {
|
||||
|
|
|
@ -12,12 +12,31 @@ import {
|
|||
} from "@material-ui/core";
|
||||
import React from "react";
|
||||
import { connect } from "react-redux";
|
||||
import { RayletInfoResponse } from "../../../api";
|
||||
import { StoreState } from "../../../store";
|
||||
import Errors from "./dialogs/errors/Errors";
|
||||
import Logs from "./dialogs/logs/Logs";
|
||||
import NodeRowGroup from "./NodeRowGroup";
|
||||
import TotalRow from "./TotalRow";
|
||||
|
||||
const clusterWorkerPids = (
|
||||
rayletInfo: RayletInfoResponse,
|
||||
): Map<string, Set<string>> => {
|
||||
// Groups PIDs registered with the raylet by node IP address
|
||||
// This is used to filter out processes belonging to other ray clusters.
|
||||
const nodeMap = new Map();
|
||||
const workerPids = new Set();
|
||||
for (const [nodeIp, { workersStats }] of Object.entries(rayletInfo.nodes)) {
|
||||
for (const worker of workersStats) {
|
||||
if (!worker.isDriver) {
|
||||
workerPids.add(worker.pid.toString());
|
||||
}
|
||||
}
|
||||
nodeMap.set(nodeIp, workerPids);
|
||||
}
|
||||
return nodeMap;
|
||||
};
|
||||
|
||||
const styles = (theme: Theme) =>
|
||||
createStyles({
|
||||
table: {
|
||||
|
@ -92,31 +111,44 @@ class NodeInfo extends React.Component<
|
|||
};
|
||||
} = {};
|
||||
|
||||
// We fetch data about which process IDs are registered with
|
||||
// the cluster's raylet for each node. We use this to filter
|
||||
// the worker data contained in the node info data because
|
||||
// the node info can contain data from more than one cluster
|
||||
// if more than one cluster is running on a machine.
|
||||
const clusterWorkerPidsByIp = clusterWorkerPids(rayletInfo);
|
||||
const clusterTotalWorkers = Array.from(
|
||||
clusterWorkerPidsByIp.values(),
|
||||
).reduce((acc, workerSet) => acc + workerSet.size, 0);
|
||||
// Initialize inner structure of the count objects
|
||||
for (const client of nodeInfo.clients) {
|
||||
logCounts[client.ip] = { perWorker: {}, total: 0 };
|
||||
errorCounts[client.ip] = { perWorker: {}, total: 0 };
|
||||
for (const worker of client.workers) {
|
||||
logCounts[client.ip].perWorker[worker.pid] = 0;
|
||||
errorCounts[client.ip].perWorker[worker.pid] = 0;
|
||||
const clusterWorkerPids = clusterWorkerPidsByIp.get(client.ip);
|
||||
if (!clusterWorkerPids) {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
const filteredLogEntries = Object.entries(
|
||||
nodeInfo.log_counts[client.ip] || {},
|
||||
).filter(([pid, _]) => clusterWorkerPids.has(pid));
|
||||
const totalLogEntries = filteredLogEntries.reduce(
|
||||
(acc, [_, count]) => acc + count,
|
||||
0,
|
||||
);
|
||||
logCounts[client.ip] = {
|
||||
perWorker: Object.fromEntries(filteredLogEntries),
|
||||
total: totalLogEntries,
|
||||
};
|
||||
|
||||
for (const ip of Object.keys(nodeInfo.log_counts)) {
|
||||
if (ip in logCounts) {
|
||||
for (const [pid, count] of Object.entries(nodeInfo.log_counts[ip])) {
|
||||
logCounts[ip].perWorker[pid] = count;
|
||||
logCounts[ip].total += count;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for (const ip of Object.keys(nodeInfo.error_counts)) {
|
||||
if (ip in errorCounts) {
|
||||
for (const [pid, count] of Object.entries(nodeInfo.error_counts[ip])) {
|
||||
errorCounts[ip].perWorker[pid] = count;
|
||||
errorCounts[ip].total += count;
|
||||
}
|
||||
}
|
||||
const filteredErrEntries = Object.entries(
|
||||
nodeInfo.error_counts[client.ip] || {},
|
||||
).filter(([pid, _]) => clusterWorkerPids.has(pid));
|
||||
const totalErrEntries = filteredErrEntries.reduce(
|
||||
(acc, [_, count]) => acc + count,
|
||||
0,
|
||||
);
|
||||
errorCounts[client.ip] = {
|
||||
perWorker: Object.fromEntries(filteredErrEntries),
|
||||
total: totalErrEntries,
|
||||
};
|
||||
}
|
||||
|
||||
return (
|
||||
|
@ -138,23 +170,31 @@ class NodeInfo extends React.Component<
|
|||
</TableRow>
|
||||
</TableHead>
|
||||
<TableBody>
|
||||
{nodeInfo.clients.map((client) => (
|
||||
<NodeRowGroup
|
||||
key={client.ip}
|
||||
node={client}
|
||||
raylet={
|
||||
client.ip in rayletInfo.nodes
|
||||
? rayletInfo.nodes[client.ip]
|
||||
: null
|
||||
}
|
||||
logCounts={logCounts[client.ip]}
|
||||
errorCounts={errorCounts[client.ip]}
|
||||
setLogDialog={this.setLogDialog}
|
||||
setErrorDialog={this.setErrorDialog}
|
||||
initialExpanded={nodeInfo.clients.length <= 1}
|
||||
/>
|
||||
))}
|
||||
{nodeInfo.clients.map((client) => {
|
||||
const clusterWorkerPids =
|
||||
clusterWorkerPidsByIp.get(client.ip) || new Set();
|
||||
return (
|
||||
<NodeRowGroup
|
||||
key={client.ip}
|
||||
clusterWorkers={client.workers.filter((worker) =>
|
||||
clusterWorkerPids.has(worker.pid.toString()),
|
||||
)}
|
||||
node={client}
|
||||
raylet={
|
||||
client.ip in rayletInfo.nodes
|
||||
? rayletInfo.nodes[client.ip]
|
||||
: null
|
||||
}
|
||||
logCounts={logCounts[client.ip]}
|
||||
errorCounts={errorCounts[client.ip]}
|
||||
setLogDialog={this.setLogDialog}
|
||||
setErrorDialog={this.setErrorDialog}
|
||||
initialExpanded={nodeInfo.clients.length <= 1}
|
||||
/>
|
||||
);
|
||||
})}
|
||||
<TotalRow
|
||||
clusterTotalWorkers={clusterTotalWorkers}
|
||||
nodes={nodeInfo.clients}
|
||||
logCounts={logCounts}
|
||||
errorCounts={errorCounts}
|
||||
|
|
|
@ -10,7 +10,11 @@ import AddIcon from "@material-ui/icons/Add";
|
|||
import RemoveIcon from "@material-ui/icons/Remove";
|
||||
import classNames from "classnames";
|
||||
import React from "react";
|
||||
import { NodeInfoResponse, RayletInfoResponse } from "../../../api";
|
||||
import {
|
||||
NodeInfoResponse,
|
||||
NodeInfoResponseWorker,
|
||||
RayletInfoResponse,
|
||||
} from "../../../api";
|
||||
import { NodeCPU, WorkerCPU } from "./features/CPU";
|
||||
import { NodeDisk, WorkerDisk } from "./features/Disk";
|
||||
import { makeNodeErrors, makeWorkerErrors } from "./features/Errors";
|
||||
|
@ -50,6 +54,7 @@ type Node = ArrayType<NodeInfoResponse["clients"]>;
|
|||
|
||||
type Props = {
|
||||
node: Node;
|
||||
clusterWorkers: Array<NodeInfoResponseWorker>;
|
||||
raylet: RayletInfoResponse["nodes"][keyof RayletInfoResponse["nodes"]] | null;
|
||||
logCounts: {
|
||||
perWorker: { [pid: string]: number };
|
||||
|
@ -87,16 +92,19 @@ class NodeRowGroup extends React.Component<
|
|||
classes,
|
||||
node,
|
||||
raylet,
|
||||
clusterWorkers,
|
||||
logCounts,
|
||||
errorCounts,
|
||||
setLogDialog,
|
||||
setErrorDialog,
|
||||
} = this.props;
|
||||
const { expanded } = this.state;
|
||||
|
||||
const features = [
|
||||
{ NodeFeature: NodeHost, WorkerFeature: WorkerHost },
|
||||
{ NodeFeature: NodeWorkers, WorkerFeature: WorkerWorkers },
|
||||
{
|
||||
NodeFeature: NodeWorkers(clusterWorkers.length),
|
||||
WorkerFeature: WorkerWorkers,
|
||||
},
|
||||
{ NodeFeature: NodeUptime, WorkerFeature: WorkerUptime },
|
||||
{ NodeFeature: NodeCPU, WorkerFeature: WorkerCPU },
|
||||
{ NodeFeature: NodeRAM, WorkerFeature: WorkerRAM },
|
||||
|
@ -145,7 +153,7 @@ class NodeRowGroup extends React.Component<
|
|||
</TableCell>
|
||||
</TableRow>
|
||||
)}
|
||||
{node.workers.map((worker, index: number) => (
|
||||
{clusterWorkers.map((worker, index: number) => (
|
||||
<TableRow hover key={index}>
|
||||
<TableCell className={classes.cell} />
|
||||
{features.map(({ WorkerFeature }, index) => (
|
||||
|
|
|
@ -41,6 +41,7 @@ const styles = (theme: Theme) =>
|
|||
|
||||
type Props = {
|
||||
nodes: NodeInfoResponse["clients"];
|
||||
clusterTotalWorkers: number;
|
||||
logCounts: {
|
||||
[ip: string]: {
|
||||
perWorker: { [pid: string]: number };
|
||||
|
@ -57,11 +58,17 @@ type Props = {
|
|||
|
||||
class TotalRow extends React.Component<Props & WithStyles<typeof styles>> {
|
||||
render() {
|
||||
const { classes, nodes, logCounts, errorCounts } = this.props;
|
||||
const {
|
||||
classes,
|
||||
nodes,
|
||||
clusterTotalWorkers,
|
||||
logCounts,
|
||||
errorCounts,
|
||||
} = this.props;
|
||||
|
||||
const features = [
|
||||
{ ClusterFeature: ClusterHost },
|
||||
{ ClusterFeature: ClusterWorkers },
|
||||
{ ClusterFeature: ClusterWorkers(clusterTotalWorkers) },
|
||||
{ ClusterFeature: ClusterUptime },
|
||||
{ ClusterFeature: ClusterCPU },
|
||||
{ ClusterFeature: ClusterRAM },
|
||||
|
|
|
@ -57,12 +57,12 @@ export const makeWorkerErrors = (
|
|||
},
|
||||
setErrorDialog: (hostname: string, pid: number | null) => void,
|
||||
): WorkerFeatureComponent => ({ node, worker }) =>
|
||||
errorCounts.perWorker[worker.pid] === 0 ? (
|
||||
<Typography color="textSecondary" component="span" variant="inherit">
|
||||
No errors
|
||||
</Typography>
|
||||
) : (
|
||||
errorCounts.perWorker[worker.pid] ? (
|
||||
<SpanButton onClick={() => setErrorDialog(node.hostname, worker.pid)}>
|
||||
View errors ({errorCounts.perWorker[worker.pid].toLocaleString()})
|
||||
</SpanButton>
|
||||
) : (
|
||||
<Typography color="textSecondary" component="span" variant="inherit">
|
||||
No errors
|
||||
</Typography>
|
||||
);
|
||||
|
|
|
@ -57,13 +57,13 @@ export const makeWorkerLogs = (
|
|||
},
|
||||
setLogDialog: (hostname: string, pid: number | null) => void,
|
||||
): WorkerFeatureComponent => ({ node, worker }) =>
|
||||
logCounts.perWorker[worker.pid] === 0 ? (
|
||||
<Typography color="textSecondary" component="span" variant="inherit">
|
||||
No logs
|
||||
</Typography>
|
||||
) : (
|
||||
logCounts.perWorker[worker.pid] ? (
|
||||
<SpanButton onClick={() => setLogDialog(node.hostname, worker.pid)}>
|
||||
View log ({logCounts.perWorker[worker.pid].toLocaleString()}{" "}
|
||||
{logCounts.perWorker[worker.pid] === 1 ? "line" : "lines"})
|
||||
</SpanButton>
|
||||
) : (
|
||||
<Typography color="textSecondary" component="span" variant="inherit">
|
||||
No logs
|
||||
</Typography>
|
||||
);
|
||||
|
|
|
@ -5,11 +5,11 @@ import {
|
|||
WorkerFeatureComponent,
|
||||
} from "./types";
|
||||
|
||||
export const ClusterWorkers: ClusterFeatureComponent = ({ nodes }) => {
|
||||
let totalWorkers = 0;
|
||||
export const ClusterWorkers = (
|
||||
totalWorkers: number,
|
||||
): ClusterFeatureComponent => ({ nodes }) => {
|
||||
let totalCpus = 0;
|
||||
for (const node of nodes) {
|
||||
totalWorkers += node.workers.length;
|
||||
totalCpus += node.cpus[0];
|
||||
}
|
||||
return (
|
||||
|
@ -21,13 +21,15 @@ export const ClusterWorkers: ClusterFeatureComponent = ({ nodes }) => {
|
|||
);
|
||||
};
|
||||
|
||||
export const NodeWorkers: NodeFeatureComponent = ({ node }) => {
|
||||
const workers = node.workers.length;
|
||||
export const NodeWorkers = (totalWorkers: number): NodeFeatureComponent => ({
|
||||
node,
|
||||
}) => {
|
||||
const cpus = node.cpus[0];
|
||||
return (
|
||||
<React.Fragment>
|
||||
{workers.toLocaleString()} {workers === 1 ? "worker" : "workers"} /{" "}
|
||||
{cpus.toLocaleString()} {cpus === 1 ? "core" : "cores"}
|
||||
{totalWorkers.toLocaleString()}{" "}
|
||||
{totalWorkers === 1 ? "worker" : "workers"} / {cpus.toLocaleString()}{" "}
|
||||
{cpus === 1 ? "core" : "cores"}
|
||||
</React.Fragment>
|
||||
);
|
||||
};
|
||||
|
|
|
@ -945,7 +945,6 @@ class RayletStats(threading.Thread):
|
|||
while True:
|
||||
time.sleep(1.0)
|
||||
replies = {}
|
||||
|
||||
try:
|
||||
for node in self.nodes:
|
||||
node_id = node["NodeID"]
|
||||
|
|
|
@ -18,6 +18,7 @@
|
|||
#include <boost/optional/optional.hpp>
|
||||
#include <unordered_map>
|
||||
#include <vector>
|
||||
|
||||
#include "ray/common/status.h"
|
||||
|
||||
namespace ray {
|
||||
|
|
Loading…
Add table
Reference in a new issue