mirror of
https://github.com/vale981/ray
synced 2025-03-05 10:01:43 -05:00
[Core] Ray logs API (1/n) (#23435)
Expose HTTP endpoint to retrieve logs from ray cluster
This commit is contained in:
parent
371d1f4533
commit
ddcc252b51
9 changed files with 410 additions and 3 deletions
|
@ -1,8 +1,14 @@
|
|||
import logging
|
||||
|
||||
import ray.dashboard.modules.log.log_utils as log_utils
|
||||
import ray.dashboard.modules.log.log_consts as log_consts
|
||||
import ray.dashboard.utils as dashboard_utils
|
||||
import ray.dashboard.optional_utils as dashboard_optional_utils
|
||||
from ray.core.generated import reporter_pb2
|
||||
from ray.core.generated import reporter_pb2_grpc
|
||||
import asyncio
|
||||
import io
|
||||
import os
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
routes = dashboard_optional_utils.ClassMethodRouteTable
|
||||
|
@ -20,3 +26,131 @@ class LogAgent(dashboard_utils.DashboardAgentModule):
|
|||
@staticmethod
def is_minimal_module():
    """Report whether this module is a "minimal" dashboard module.

    Always returns False.
    NOTE(review): presumably this flag tells the dashboard whether the
    module may be loaded without optional dependencies -- confirm against
    the module loader.
    """
    return False
|
||||
|
||||
|
||||
# 64 KB
|
||||
BLOCK_SIZE = 1 << 16
|
||||
|
||||
|
||||
class LogAgentV1Grpc(
    dashboard_utils.DashboardAgentModule, reporter_pb2_grpc.LogServiceServicer
):
    """Dashboard agent module that serves this node's log files over gRPC.

    Implements the two ``LogService`` RPCs, ``ListLogs`` and ``StreamLog``,
    and registers itself on the gRPC server handed to ``run``.
    """

    def __init__(self, dashboard_agent):
        super().__init__(dashboard_agent)

    async def run(self, server):
        """Register this servicer on ``server`` if one was provided."""
        if server:
            reporter_pb2_grpc.add_LogServiceServicer_to_server(self, server)

    # TODO: should this return True
    @staticmethod
    def is_minimal_module():
        return False

    async def ListLogs(self, request, context):
        """
        Lists all files in the active Ray logs directory.

        Returns:
            A ``ListLogsReply`` whose ``log_files`` holds the directory
            entries, or an empty list if the log directory does not exist.
        """
        logger.info(f"initiated ListLogs:\n{request}")

        # The done-callback is invoked with the RPC context, which is unused.
        def on_exit(_context):
            logger.info(f"terminated ListLogs:\n{request}")

        context.add_done_callback(on_exit)
        log_dir = self._dashboard_agent.log_dir
        try:
            log_files = os.listdir(log_dir)
        except FileNotFoundError:
            # Called from inside the handler so that logger.exception has a
            # real exception to attach to the record.
            logger.exception(
                f"Could not find log dir at path: {log_dir}. "
                "It is unexpected. Please report an issue to Ray Github."
            )
            log_files = []
        return reporter_pb2.ListLogsReply(log_files=log_files)

    async def StreamLog(self, request, context):
        """
        Streams the log in real time starting from `request.lines` number of lines from
        the end of the file if `request.keep_alive == True`. Else, it terminates the
        stream once there are no more bytes to read from the log file.

        `request.lines == -1` streams the whole file in `BLOCK_SIZE` chunks;
        an unset (or zero) `request.lines` defaults to the last 1000 lines.
        If the file does not exist, or the name is not a plain file name, the
        error is reported through the initial metadata and nothing is yielded.
        """
        logger.info(f"initiated StreamLog:\n{request}")

        # The done-callback is invoked with the RPC context, which is unused.
        def on_exit(_context):
            logger.info(f"terminated StreamLog:\n{request}")

        context.add_done_callback(on_exit)
        lines = request.lines if request.lines else 1000

        file_name = request.log_file_name
        filepath = os.path.join(self._dashboard_agent.log_dir, file_name)
        # Only plain file names are accepted so that a request cannot escape
        # the log directory; basename() also covers platform separators.
        is_plain_name = os.path.basename(file_name) == file_name
        if not is_plain_name or not os.path.isfile(filepath):
            await context.send_initial_metadata(
                [[log_consts.LOG_GRPC_ERROR, log_consts.FILE_NOT_FOUND]]
            )
            return

        with open(filepath, "rb") as f:
            await context.send_initial_metadata([])
            # If requesting the whole file, we stream it since it may be large.
            if lines == -1:
                while not context.done():
                    chunk = f.read(BLOCK_SIZE)
                    if chunk == b"":
                        break
                    yield reporter_pb2.StreamLogReply(data=chunk)
                # Always record the position, even when the loop ended because
                # the RPC was finished rather than because EOF was reached.
                end = f.tell()
            else:
                chunk, end = tail(f, lines)
                yield reporter_pb2.StreamLogReply(data=chunk + b"\n")
            if request.keep_alive:
                interval = request.interval if request.interval else 0.5
                f.seek(end)
                # Poll the file for appended bytes until the RPC is finished.
                while not context.done():
                    await asyncio.sleep(interval)
                    chunk = f.read()
                    if chunk != b"":
                        yield reporter_pb2.StreamLogReply(data=chunk)
|
||||
|
||||
|
||||
def tail(f: io.BufferedIOBase, lines: int, block_size=None):
    """Tails the given file (in 'rb' mode)

    We assume that any "lines" parameter is not significant (<100,000 lines)
    and will result in a buffer with a small memory profile (<1MB)

    Adapted from: https://stackoverflow.com/a/136368/8299684

    Examples:
        >>> import io
        >>> tail(io.BytesIO(b"a\\nb\\nc\\n"), 2, block_size=4)
        (b'b\\nc', 6)

    Args:
        f: seekable file object opened in binary ('rb') mode
        lines: The number of lines to read from the end of the file.
            A value <= 0 yields no data.
        block_size: Number of bytes read per backwards step. Defaults to
            the module-level BLOCK_SIZE.
    Returns:
        bytes containing the last `lines` lines of the file joined by
        b"\\n" (no trailing newline),
        the position of the end of the file, in bytes, at the time of the call
    """
    # Seek to the end of the file
    f.seek(0, io.SEEK_END)
    last_byte_read = f.tell()
    if lines <= 0:
        return b"", last_byte_read
    if block_size is None:
        block_size = BLOCK_SIZE

    block_end_byte = last_byte_read
    newlines_seen = 0
    blocks = []

    # Read blocks backwards until we have seen MORE than `lines` newlines
    # (or reached the start of the file). One extra newline is required so
    # that the oldest returned line is complete and not cut at a block
    # boundary. Offsets are absolute so that data appended to the file while
    # we read does not shift the blocks.
    while newlines_seen <= lines and block_end_byte > 0:
        block_start_byte = max(block_end_byte - block_size, 0)
        f.seek(block_start_byte)
        block = f.read(block_end_byte - block_start_byte)
        blocks.append(block)
        newlines_seen += block.count(b"\n")
        block_end_byte = block_start_byte

    all_read_text = b"".join(reversed(blocks))
    return b"\n".join(all_read_text.splitlines()[-lines:]), last_byte_read
|
||||
|
|
|
@ -1,3 +1,9 @@
|
|||
MIME_TYPES = {
|
||||
"text/plain": [".err", ".out", ".log"],
|
||||
}
|
||||
|
||||
LOG_GRPC_ERROR = "log_grpc_status"
|
||||
FILE_NOT_FOUND = "LOG_GRPC_ERROR: file_not_found"
|
||||
|
||||
# 10 seconds
|
||||
GRPC_TIMEOUT = 10
|
||||
|
|
|
@ -2,9 +2,16 @@ import logging
|
|||
|
||||
import aiohttp.web
|
||||
import ray.dashboard.modules.log.log_utils as log_utils
|
||||
import ray.dashboard.modules.log.log_consts as log_consts
|
||||
import ray.dashboard.utils as dashboard_utils
|
||||
from ray._private.utils import init_grpc_channel
|
||||
from ray.core.generated import reporter_pb2
|
||||
from ray.core.generated import reporter_pb2_grpc
|
||||
import ray.dashboard.optional_utils as dashboard_optional_utils
|
||||
from ray.dashboard.datacenter import DataSource, GlobalSignals
|
||||
from ray import ray_constants
|
||||
from typing import List
|
||||
import asyncio
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
routes = dashboard_optional_utils.ClassMethodRouteTable
|
||||
|
@ -100,3 +107,146 @@ class LogHead(dashboard_utils.DashboardHeadModule):
|
|||
@staticmethod
def is_minimal_module():
    """Report whether this module is a "minimal" dashboard module.

    Always returns False.
    NOTE(review): presumably this flag tells the dashboard whether the
    module may be loaded without optional dependencies -- confirm against
    the module loader.
    """
    return False
|
||||
|
||||
|
||||
class LogHeadV1(dashboard_utils.DashboardHeadModule):
    """Dashboard head module exposing the experimental logs HTTP API.

    Keeps one ``LogService`` gRPC stub per node agent (maintained through
    the ``DataSource.agents`` signal) and uses them to answer
    ``/api/experimental/logs/list``.
    """

    def __init__(self, dashboard_head):
        super().__init__(dashboard_head)
        # node_id -> LogServiceStub for that node's agent
        self._stubs = {}
        # node ip -> node_id, used to resolve the `node_ip` query parameter
        self._ip_to_node_id = {}
        DataSource.agents.signal.append(self._update_stubs)

    async def _update_stubs(self, change):
        """Drop the stub of a removed agent and create one for a new agent."""
        if change.old:
            node_id, _ = change.old
            ip = DataSource.node_id_to_ip[node_id]
            self._stubs.pop(node_id)
            self._ip_to_node_id.pop(ip)
        if change.new:
            node_id, ports = change.new
            ip = DataSource.node_id_to_ip[node_id]

            options = (("grpc.enable_http_proxy", 0),)
            channel = init_grpc_channel(
                f"{ip}:{ports[1]}", options=options, asynchronous=True
            )
            stub = reporter_pb2_grpc.LogServiceStub(channel)
            self._stubs[node_id] = stub
            self._ip_to_node_id[ip] = node_id

    @staticmethod
    def _list_logs_single_node(log_files: List[str], filters: List[str]):
        """
        Returns a dict mapping a category of log component to a list of filenames,
        on the given node.

        Only file names containing every string in `filters` are considered;
        `[""]` (the result of splitting an empty query value) means no filter.
        A file may appear in more than one category; files matching no
        category are listed under "misc".
        """
        filters = [] if filters == [""] else filters
        filtered = [
            name for name in log_files if all(f in name for f in filters)
        ]

        logs = {}
        logs["worker_errors"] = [
            s for s in filtered if "worker" in s and s.endswith(".err")
        ]
        logs["worker_outs"] = [
            s for s in filtered if "worker" in s and s.endswith(".out")
        ]
        for lang in ray_constants.LANGUAGE_WORKER_TYPES:
            logs[f"{lang}_core_worker_logs"] = [
                s
                for s in filtered
                if f"{lang}-core-worker" in s and s.endswith(".log")
            ]
            logs[f"{lang}_driver_logs"] = [
                s
                for s in filtered
                if f"{lang}-core-driver" in s and s.endswith(".log")
            ]
        logs["dashboard"] = [s for s in filtered if "dashboard" in s]
        logs["raylet"] = [s for s in filtered if "raylet" in s]
        logs["gcs_server"] = [s for s in filtered if "gcs" in s]
        logs["ray_client"] = [s for s in filtered if "ray_client" in s]
        logs["autoscaler"] = [
            s for s in filtered if "monitor" in s and "log_monitor" not in s
        ]
        logs["runtime_env"] = [s for s in filtered if "runtime_env" in s]
        logs["folders"] = [s for s in filtered if "." not in s]
        # Everything not claimed by any category above.
        categorized = set().union(*logs.values())
        logs["misc"] = [s for s in filtered if s not in categorized]
        return logs

    async def _wait_until_initialized(self):
        """
        Wait until connected to at least one node's log agent.

        Returns:
            None once at least one stub exists, otherwise an
            ``HTTPGatewayTimeout`` response after all retries are used up.
        """
        POLL_SLEEP_TIME = 0.5
        POLL_RETRIES = 10
        for _ in range(POLL_RETRIES):
            if self._stubs:
                return None
            await asyncio.sleep(POLL_SLEEP_TIME)
        return aiohttp.web.HTTPGatewayTimeout(
            reason="Could not connect to agents via gRPC after "
            f"{POLL_SLEEP_TIME * POLL_RETRIES} seconds."
        )

    async def _list_logs(self, node_id_query: str, filters: List[str]):
        """
        Helper function to list the logs by querying the agent
        on each node via gRPC.

        Args:
            node_id_query: If not None, only this node is queried.
            filters: Substrings every returned file name must contain.
        Returns:
            A dict mapping node id to the categorized log files of that node.
        """
        response = {}

        # node_id and grpc_stub are passed as arguments: the coroutines only
        # start running inside gather(), so closing over the loop variables
        # would make every coroutine query the last node of the iteration.
        async def fetch(node_id, grpc_stub):
            reply = await grpc_stub.ListLogs(
                reporter_pb2.ListLogsRequest(), timeout=log_consts.GRPC_TIMEOUT
            )
            response[node_id] = self._list_logs_single_node(
                reply.log_files, filters
            )

        tasks = [
            fetch(node_id, grpc_stub)
            for node_id, grpc_stub in self._stubs.items()
            if node_id_query is None or node_id_query == node_id
        ]
        await asyncio.gather(*tasks)
        return response

    @routes.get("/api/experimental/logs/list")
    async def handle_list_logs(self, req):
        """
        Returns a JSON file containing, for each node in the cluster,
        a dict mapping a category of log component to a list of filenames.

        Query parameters: `node_id` or `node_ip` restrict the listing to one
        node; `filters` is a comma-separated list of required substrings.
        """
        try:
            node_id = req.query.get("node_id", None)
            if node_id is None:
                ip = req.query.get("node_ip", None)
                if ip is not None:
                    if ip not in self._ip_to_node_id:
                        return aiohttp.web.HTTPNotFound(
                            reason=f"node_ip: {ip} not found"
                        )
                    node_id = self._ip_to_node_id[ip]
            filters = req.query.get("filters", "").split(",")
            err = await self._wait_until_initialized()
            if err is not None:
                return err
            response = await self._list_logs(node_id, filters)
            return aiohttp.web.json_response(response)

        except Exception as e:
            logger.exception(e)
            # `reason` must be a string, not the exception object itself.
            return aiohttp.web.HTTPInternalServerError(reason=str(e))

    async def run(self, server):
        pass

    @staticmethod
    def is_minimal_module():
        return False
|
||||
|
|
|
@ -5,6 +5,8 @@ import time
|
|||
import traceback
|
||||
import html.parser
|
||||
import urllib.parse
|
||||
import json
|
||||
import os
|
||||
|
||||
from ray.dashboard.tests.conftest import * # noqa
|
||||
import pytest
|
||||
|
@ -13,6 +15,7 @@ from ray._private.test_utils import (
|
|||
format_web_url,
|
||||
wait_until_server_available,
|
||||
)
|
||||
from ray.dashboard.modules.log.log_agent import tail as tail_file
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
@ -161,5 +164,84 @@ def test_log_proxy(ray_start_with_dashboard):
|
|||
raise Exception(f"Timed out while testing, {ex_stack}")
|
||||
|
||||
|
||||
def test_logs_experimental_list(ray_start_with_dashboard):
    """Check /api/experimental/logs/list with and without `filters`."""
    assert wait_until_server_available(ray_start_with_dashboard["webui_url"]) is True
    webui_url = ray_start_with_dashboard["webui_url"]
    webui_url = format_web_url(webui_url)

    # test that list logs is comprehensive
    response = requests.get(webui_url + "/api/experimental/logs/list")
    response.raise_for_status()
    logs = json.loads(response.text)
    assert len(logs) == 1
    node_id = next(iter(logs))

    # test worker logs
    outs = logs[node_id]["worker_outs"]
    errs = logs[node_id]["worker_errors"]
    core_worker_logs = logs[node_id]["python_core_worker_logs"]

    assert len(outs) == len(errs) == len(core_worker_logs)
    assert len(outs) > 0

    for file in ["debug_state_gcs.txt", "gcs_server.out", "gcs_server.err"]:
        assert file in logs[node_id]["gcs_server"]
    for file in ["raylet.out", "raylet.err"]:
        assert file in logs[node_id]["raylet"]
    for file in ["dashboard_agent.log", "dashboard.log"]:
        assert file in logs[node_id]["dashboard"]

    # Test that logs/list can be filtered
    response = requests.get(webui_url + "/api/experimental/logs/list?filters=gcs")
    response.raise_for_status()
    logs = json.loads(response.text)
    assert len(logs) == 1
    node_id = next(iter(logs))
    assert "gcs_server" in logs[node_id]
    # With the "gcs" filter every other category must come back empty.
    for category in logs[node_id]:
        if category != "gcs_server":
            assert len(logs[node_id][category]) == 0

    response = requests.get(webui_url + "/api/experimental/logs/list?filters=worker")
    response.raise_for_status()
    logs = json.loads(response.text)
    assert len(logs) == 1
    node_id = next(iter(logs))
    worker_log_categories = [
        "python_core_worker_logs",
        "worker_outs",
        "worker_errors",
    ]
    assert all([cat in logs[node_id] for cat in worker_log_categories])
    # With the "worker" filter only the worker categories may be non-empty.
    for category in logs[node_id]:
        if category not in worker_log_categories:
            assert len(logs[node_id][category]) == 0
|
||||
|
||||
|
||||
def test_logs_tail():
    """
    Unit test for tail
    """
    import tempfile

    TOTAL_LINES = 1000
    FILE_NAME = "test_file.txt"
    # A temporary directory keeps the working directory clean and is removed
    # even when an assertion fails.
    with tempfile.TemporaryDirectory() as tmp_dir:
        path = os.path.join(tmp_dir, FILE_NAME)
        # newline="\n" keeps the byte count identical on every platform.
        with open(path, "w", newline="\n") as f:
            for i in range(TOTAL_LINES):
                f.write(f"Message {i:4}\n")
        with open(path, "rb") as file:
            text, byte_pos = tail_file(file, 100)

    assert byte_pos == TOTAL_LINES * len(b"Message 1000\n")
    lines = text.decode("utf-8").split("\n")
    assert len(lines) == 100
    # The expected values use the same width-4 padding as the written lines.
    assert lines[0] == f"Message {900:4}"
    assert lines[99] == f"Message {999:4}"
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
sys.exit(pytest.main(["-v", __file__]))
|
||||
|
|
|
@ -1,5 +1,4 @@
|
|||
NODE_STATS_UPDATE_INTERVAL_SECONDS = 1
|
||||
LOG_INFO_UPDATE_INTERVAL_SECONDS = 5
|
||||
UPDATE_NODES_INTERVAL_SECONDS = 5
|
||||
MAX_COUNT_OF_GCS_RPC_ERROR = 10
|
||||
MAX_LOGS_TO_CACHE = 10000
|
||||
|
|
|
@ -270,6 +270,7 @@ def _find_address_from_flag(flag: str):
|
|||
cmdline = proc.cmdline()
|
||||
# NOTE(kfstorm): To support Windows, we can't use
|
||||
# `os.path.basename(cmdline[0]) == "raylet"` here.
|
||||
|
||||
if len(cmdline) > 0 and "raylet" in os.path.basename(cmdline[0]):
|
||||
for arglist in cmdline:
|
||||
# Given we're merely seeking --redis-address, we just split
|
||||
|
|
|
@ -337,3 +337,5 @@ KV_NAMESPACE_CLUSTER = b"cluster"
|
|||
KV_NAMESPACE_PACKAGE = None
|
||||
KV_NAMESPACE_SERVE = b"serve"
|
||||
KV_NAMESPACE_FUNCTION_TABLE = b"fun"
|
||||
|
||||
LANGUAGE_WORKER_TYPES = ["python", "java", "cpp"]
|
||||
|
|
|
@ -38,7 +38,6 @@ from ray.autoscaler._private.commands import (
|
|||
from ray.autoscaler._private.constants import RAY_PROCESSES
|
||||
from ray.autoscaler._private.fake_multi_node.node_provider import FAKE_HEAD_NODE_ID
|
||||
from ray.autoscaler._private.kuberay.run_autoscaler import run_autoscaler_with_retries
|
||||
|
||||
from ray.internal.internal_api import memory_summary
|
||||
from ray.autoscaler._private.cli_logger import add_click_logging_options, cli_logger, cf
|
||||
from ray.dashboard.modules.job.cli import job_cli_group
|
||||
|
|
|
@ -60,7 +60,7 @@ message NodeResourceUsage {
|
|||
string json = 1;
|
||||
}
|
||||
|
||||
// Service for communicating with the reporter.py process on a remote node.
|
||||
// Service for communicating with the reporter agent module on a remote node.
|
||||
service ReporterService {
|
||||
// Get the profiling stats.
|
||||
rpc GetProfilingStats(GetProfilingStatsRequest) returns (GetProfilingStatsReply);
|
||||
|
@ -69,3 +69,37 @@ service ReporterService {
|
|||
// Report OpenCensus metrics to the local metrics agent.
|
||||
rpc ReportOCMetrics(ReportOCMetricsRequest) returns (ReportOCMetricsReply);
|
||||
}
|
||||
|
||||
// Request to stream (part of) a single log file from a node's log agent.
message StreamLogRequest {
  // File name of the log file
  string log_file_name = 1;
  // Keeps stream alive perpetually if true, else terminates on EOF
  bool keep_alive = 2;
  // Number of lines to tail from the log file initially
  // -1 indicates to fetch the whole file
  // (the log agent falls back to 1000 lines when this is unset or 0)
  optional int32 lines = 3;
  // If keep_alive is true, this indicates how frequently, in seconds, to
  // poll the log file for new lines
  // (the log agent falls back to 0.5 when this is unset or 0)
  optional float interval = 4;
}
|
||||
|
||||
// One message of the StreamLog response stream.
message StreamLogReply {
  // The raw bytes of the log file chunk
  bytes data = 1;
}
|
||||
|
||||
// Request for ListLogs; it carries no parameters.
message ListLogsRequest {}
|
||||
|
||||
// Reply of ListLogs.
message ListLogsReply {
  // The file names of all the log files in this node's
  // log directory
  repeated string log_files = 1;
}
|
||||
|
||||
// Service for communicating with the log agent module on a remote node.
service LogService {
  // Get the list of logs from the agent.
  rpc ListLogs(ListLogsRequest) returns (ListLogsReply);
  // Streams a log file from the agent as a sequence of raw byte chunks.
  rpc StreamLog(StreamLogRequest) returns (stream StreamLogReply);
}
|
||||
|
|
Loading…
Add table
Reference in a new issue