mirror of
https://github.com/vale981/ray
synced 2025-03-06 10:31:39 -05:00

This PR implements the basic log APIs. Higher-level APIs (such as `ray logs actors`) will be implemented after the internal API review is done. Behavior: if there is only one match, the file's content is printed; otherwise, all files matching the glob are listed. Usage: `ray logs [glob_filter] --node-id=[head node by default]`. Args: `--tail`: tail the last X lines; `--follow`: follow the new logs; `--actor-id`: the actor id; `--pid`; `--node-ip`: for worker logs; `--node-id`: the node id of the log; `--interval`: when `--follow` is specified, logs are printed at this interval (should we remove it?).
154 lines · 5.3 KiB · Python
import logging
|
|
|
|
import ray.dashboard.modules.log.log_utils as log_utils
|
|
import ray.dashboard.modules.log.log_consts as log_consts
|
|
import ray.dashboard.utils as dashboard_utils
|
|
import ray.dashboard.optional_utils as dashboard_optional_utils
|
|
import asyncio
|
|
import io
|
|
import os
|
|
|
|
from pathlib import Path
|
|
|
|
from ray.core.generated import reporter_pb2
|
|
from ray.core.generated import reporter_pb2_grpc
|
|
|
|
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Route table used to register class-method HTTP routes (e.g. the static
# "/logs" endpoint registered by LogAgent below).
routes = dashboard_optional_utils.ClassMethodRouteTable
|
|
|
|
|
|
class LogAgent(dashboard_utils.DashboardAgentModule):
    """Dashboard agent module that serves the agent's log directory over HTTP.

    On construction it registers MIME types for log files and exposes the
    log directory as a static route at ``/logs`` with directory listing
    enabled.
    """

    def __init__(self, dashboard_agent):
        super().__init__(dashboard_agent)
        # Ensure log files are served with sensible content types.
        log_utils.register_mimetypes()
        log_dir = self._dashboard_agent.log_dir
        routes.static("/logs", log_dir, show_index=True)

    async def run(self, server):
        # Nothing to start here; the static route is registered in __init__.
        pass

    @staticmethod
    def is_minimal_module():
        return False
|
|
|
|
|
|
# Number of bytes read per chunk when streaming or tailing log files (64 KiB).
BLOCK_SIZE = 1 << 16
|
|
|
|
|
|
class LogAgentV1Grpc(
    dashboard_utils.DashboardAgentModule, reporter_pb2_grpc.ReporterServiceServicer
):
    """gRPC servicer exposing the agent's log directory.

    Provides two RPCs: ``ListLogs`` (glob files in the log dir) and
    ``StreamLog`` (tail and optionally follow a single log file).

    NOTE: These RPCs are used by state_head.py, not log_head.py.
    """

    # NOTE(review): the class subclasses ReporterServiceServicer but `run`
    # registers it via add_LogServiceServicer_to_server — confirm which
    # service definition is intended.

    def __init__(self, dashboard_agent):
        super().__init__(dashboard_agent)

    async def run(self, server):
        # `server` may be None (e.g. minimal setups); register only if present.
        if server:
            reporter_pb2_grpc.add_LogServiceServicer_to_server(self, server)

    # TODO: should this return True
    @staticmethod
    def is_minimal_module():
        return False

    async def ListLogs(self, request, context):
        """
        Lists all files in the active Ray logs directory.

        Args:
            request: gRPC request carrying `glob_filter`, a glob pattern
                matched against file names in the log directory.
            context: gRPC servicer context.

        Returns:
            reporter_pb2.ListLogsReply with the matching file names.

        Raises:
            FileNotFoundError: If the agent's log directory does not exist.

        NOTE: These RPCs are used by state_head.py, not log_head.py
        """
        path = Path(self._dashboard_agent.log_dir)
        if not path.exists():
            # Fixed message: the two literals previously concatenated with no
            # separating space ("...{path}It is unexpected...").
            raise FileNotFoundError(
                f"Could not find log dir at path: {self._dashboard_agent.log_dir}."
                " It is unexpected. Please report an issue to Ray Github."
            )
        log_files = [p.name for p in path.glob(request.glob_filter)]
        return reporter_pb2.ListLogsReply(log_files=log_files)

    async def StreamLog(self, request, context):
        """
        Streams the log in real time starting from `request.lines` number of lines from
        the end of the file if `request.keep_alive == True`. Else, it terminates the
        stream once there are no more bytes to read from the log file.

        Args:
            request: gRPC request with `log_file_name`, `lines` (-1 for the
                whole file; 0/unset defaults to 1000), `keep_alive`, and
                `interval` (seconds between polls while following; defaults
                to 1 when unset).
            context: gRPC servicer context.

        Yields:
            reporter_pb2.StreamLogReply chunks of raw log bytes.

        NOTE: These RPCs are used by state_head.py, not log_head.py
        """
        # NOTE: If the client side connection is closed, this handler will
        # be automatically terminated.
        lines = request.lines if request.lines else 1000

        filepath = f"{self._dashboard_agent.log_dir}/{request.log_file_name}"
        # Reject path separators so a request cannot escape the log directory.
        if "/" in request.log_file_name or not os.path.isfile(filepath):
            await context.send_initial_metadata(
                [[log_consts.LOG_GRPC_ERROR, log_consts.FILE_NOT_FOUND]]
            )
        else:
            with open(filepath, "rb") as f:
                await context.send_initial_metadata([])
                # If requesting the whole file, we stream it since it may be large.
                if lines == -1:
                    while not context.done():
                        # `chunk` (was `bytes`) avoids shadowing the builtin.
                        chunk = f.read(BLOCK_SIZE)
                        if chunk == b"":
                            end = f.tell()
                            break
                        yield reporter_pb2.StreamLogReply(data=chunk)
                else:
                    chunk, end = tail(f, lines)
                    yield reporter_pb2.StreamLogReply(data=chunk + b"\n")
                if request.keep_alive:
                    interval = request.interval if request.interval else 1
                    # Resume from where the initial read stopped.
                    f.seek(end)
                    # Poll for appended bytes until the client disconnects.
                    while not context.done():
                        await asyncio.sleep(interval)
                        chunk = f.read()
                        if chunk != b"":
                            yield reporter_pb2.StreamLogReply(data=chunk)
|
|
|
|
|
|
def tail(f: io.BufferedIOBase, lines: int, block_size: int = 1 << 16):
    """Tails the given file, which must be opened in binary ('rb') mode.

    Reads backwards from the end of the file in `block_size` chunks until at
    least `lines` newline characters have been seen, then returns the last
    `lines` lines. (Annotation fixed: the file is binary, not `io.TextIOBase`.)

    We assume that any "lines" parameter is not significant (<100,000 lines)
    and will result in a buffer with a small memory profile (<1MB)

    Adapted from: https://stackoverflow.com/a/136368/8299684

    Args:
        f: file opened in 'rb' mode.
        lines: The number of lines to read from the end of the file.
        block_size: Size in bytes of each backwards read. Defaults to 64 KiB,
            matching the module-level BLOCK_SIZE.

    Returns:
        A tuple of:
        bytes containing the last `lines` lines of the file (no trailing
        newline), and
        the position of the last byte read in units of bytes.
    """

    total_lines_wanted = lines

    # Seek to the end of the file to learn its size.
    f.seek(0, 2)
    block_end_byte = f.tell()

    last_byte_read = block_end_byte
    lines_to_go = total_lines_wanted
    block_number = -1
    blocks = []

    # Read blocks into memory until we have seen at least
    # `total_lines_wanted` number of lines. Then, return a string
    # containing the last `total_lines_wanted` number of lines
    while lines_to_go > 0 and block_end_byte > 0:
        if block_end_byte - block_size > 0:
            # A full block fits before the current position; read it
            # (negative offset relative to the end of the file).
            f.seek(block_number * block_size, 2)
            blocks.append(f.read(block_size))
        else:
            # Fewer than `block_size` bytes remain; read from the start.
            f.seek(0, 0)
            blocks.append(f.read(block_end_byte))
        lines_found = blocks[-1].count(b"\n")
        lines_to_go -= lines_found
        block_end_byte -= block_size
        block_number -= 1

    # Blocks were collected back-to-front; restore file order and keep only
    # the trailing `total_lines_wanted` lines.
    all_read_text = b"".join(reversed(blocks))
    return b"\n".join(all_read_text.splitlines()[-total_lines_wanted:]), last_byte_read
|