From ddcc252b51c94355d853aed53283c0041ab5bbff Mon Sep 17 00:00:00 2001
From: jon-chuang <9093549+jon-chuang@users.noreply.github.com>
Date: Thu, 21 Apr 2022 02:11:02 -0400
Subject: [PATCH] [Core] Ray logs API (1/n) (#23435)

Expose an HTTP endpoint to retrieve logs from a Ray cluster.
---
 dashboard/modules/log/log_agent.py      | 134 ++++++++++++++++++++++
 dashboard/modules/log/log_consts.py     |   6 +
 dashboard/modules/log/log_head.py       | 150 +++++++++++++++++++++++
 dashboard/modules/log/tests/test_log.py |  82 +++++++++++++
 dashboard/modules/node/node_consts.py   |   1 -
 python/ray/_private/services.py         |   1 +
 python/ray/ray_constants.py             |   2 +
 python/ray/scripts/scripts.py           |   1 -
 src/ray/protobuf/reporter.proto         |  36 +++++-
 9 files changed, 410 insertions(+), 3 deletions(-)

diff --git a/dashboard/modules/log/log_agent.py b/dashboard/modules/log/log_agent.py
index c4d1e6810..b15724f12 100644
--- a/dashboard/modules/log/log_agent.py
+++ b/dashboard/modules/log/log_agent.py
@@ -1,8 +1,14 @@
 import logging

 import ray.dashboard.modules.log.log_utils as log_utils
+import ray.dashboard.modules.log.log_consts as log_consts
 import ray.dashboard.utils as dashboard_utils
 import ray.dashboard.optional_utils as dashboard_optional_utils
+from ray.core.generated import reporter_pb2
+from ray.core.generated import reporter_pb2_grpc
+import asyncio
+import io
+import os

 logger = logging.getLogger(__name__)

 routes = dashboard_optional_utils.ClassMethodRouteTable
@@ -20,3 +26,131 @@ class LogAgent(dashboard_utils.DashboardAgentModule):
     @staticmethod
     def is_minimal_module():
         return False
+
+
+# 64 KB
+BLOCK_SIZE = 1 << 16
+
+
+class LogAgentV1Grpc(
+    dashboard_utils.DashboardAgentModule, reporter_pb2_grpc.LogServiceServicer
+):
+    def __init__(self, dashboard_agent):
+        super().__init__(dashboard_agent)
+
+    async def run(self, server):
+        if server:
+            reporter_pb2_grpc.add_LogServiceServicer_to_server(self, server)
+
+    # TODO: should this return True?
+    @staticmethod
+    def is_minimal_module():
+        return False
+
+    async def ListLogs(self, request, context):
+        """
+        Lists all files in the active Ray logs directory.
+        """
+        logger.info(f"initiated ListLogs:\n{request}")
+
+        # Called with the servicer context when the RPC completes.
+        def on_exit(_):
+            logger.info(f"terminated ListLogs:\n{request}")
+
+        context.add_done_callback(on_exit)
+        if os.path.exists(self._dashboard_agent.log_dir):
+            log_files = os.listdir(self._dashboard_agent.log_dir)
+        else:
+            logger.error(
+                f"Could not find log dir at path: {self._dashboard_agent.log_dir}. "
+                "This is unexpected; please file an issue on the Ray GitHub."
+            )
+            log_files = []
+        return reporter_pb2.ListLogsReply(log_files=log_files)
+
+    async def StreamLog(self, request, context):
+        """
+        Streams the log file, starting `request.lines` lines before the end
+        of the file. If `request.keep_alive` is true, the stream then stays
+        open and polls the file for new data; otherwise it terminates once
+        there are no more bytes to read.
+        """
+        logger.info(f"initiated StreamLog:\n{request}")
+
+        # Called with the servicer context when the RPC completes.
+        def on_exit(_):
+            logger.info(f"terminated StreamLog:\n{request}")
+
+        context.add_done_callback(on_exit)
+        lines = request.lines if request.lines else 1000
+
+        filepath = f"{self._dashboard_agent.log_dir}/{request.log_file_name}"
+        if "/" in request.log_file_name or not os.path.isfile(filepath):
+            await context.send_initial_metadata(
+                [[log_consts.LOG_GRPC_ERROR, log_consts.FILE_NOT_FOUND]]
+            )
+        else:
+            with open(filepath, "rb") as f:
+                await context.send_initial_metadata([])
+                # If requesting the whole file, stream it in blocks since it
+                # may be large.
+                if lines == -1:
+                    while not context.done():
+                        bytes = f.read(BLOCK_SIZE)
+                        if bytes == b"":
+                            end = f.tell()
+                            break
+                        yield reporter_pb2.StreamLogReply(data=bytes)
+                else:
+                    bytes, end = tail(f, lines)
+                    yield reporter_pb2.StreamLogReply(data=bytes + b"\n")
+                if request.keep_alive:
+                    interval = request.interval if request.interval else 0.5
+                    f.seek(end)
+                    while not context.done():
+                        await asyncio.sleep(interval)
+                        bytes = f.read()
+                        if bytes != b"":
+                            yield reporter_pb2.StreamLogReply(data=bytes)
+
+
+def tail(f: io.BufferedIOBase, lines: int):
+    """Tails the given file opened in 'rb' mode.
+
+    We assume that the `lines` parameter is not significant (<100,000 lines)
+    and will result in a buffer with a small memory profile (<1MB).
+
+    Adapted from: https://stackoverflow.com/a/136368/8299684
+
+    Args:
+        f: a file opened in 'rb' mode
+        lines: the number of lines to read from the end of the file
+
+    Returns:
+        bytes containing the last `lines` lines of the file, and
+        the position of the last byte read, in bytes
+    """
+
+    total_lines_wanted = lines
+
+    # Seek to the end of the file.
+    f.seek(0, 2)
+    block_end_byte = f.tell()
+
+    last_byte_read = block_end_byte
+    lines_to_go = total_lines_wanted
+    block_number = -1
+    blocks = []
+
+    # Read blocks into memory, from the end of the file backwards, until we
+    # have seen at least `total_lines_wanted` lines. Then, return a
+    # bytestring containing the last `total_lines_wanted` lines.
+    while lines_to_go > 0 and block_end_byte > 0:
+        if block_end_byte - BLOCK_SIZE > 0:
+            f.seek(block_number * BLOCK_SIZE, 2)
+            blocks.append(f.read(BLOCK_SIZE))
+        else:
+            f.seek(0, 0)
+            blocks.append(f.read(block_end_byte))
+        lines_found = blocks[-1].count(b"\n")
+        lines_to_go -= lines_found
+        block_end_byte -= BLOCK_SIZE
+        block_number -= 1
+    all_read_text = b"".join(reversed(blocks))
+    return b"\n".join(all_read_text.splitlines()[-total_lines_wanted:]), last_byte_read
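
Example: a minimal, self-contained sketch of how the `tail` helper above
behaves (the import path assumes the module layout added in this patch):

    import tempfile

    from ray.dashboard.modules.log.log_agent import tail

    # Write 10 numbered lines, then tail the last 3.
    with tempfile.NamedTemporaryFile(mode="w", suffix=".log", delete=False) as f:
        for i in range(10):
            f.write(f"line {i}\n")
        path = f.name

    with open(path, "rb") as f:
        data, end_pos = tail(f, 3)

    assert data == b"line 7\nline 8\nline 9"
    # `end_pos` is the end-of-file byte offset at the time of the call; it is
    # the position StreamLog seeks to before polling for new bytes when
    # keep_alive is set.
    print(data.decode(), end_pos)
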
diff --git a/dashboard/modules/log/log_consts.py b/dashboard/modules/log/log_consts.py
index 905f88b5f..994ae60ba 100644
--- a/dashboard/modules/log/log_consts.py
+++ b/dashboard/modules/log/log_consts.py
@@ -1,3 +1,9 @@
 MIME_TYPES = {
     "text/plain": [".err", ".out", ".log"],
 }
+
+LOG_GRPC_ERROR = "log_grpc_status"
+FILE_NOT_FOUND = "LOG_GRPC_ERROR: file_not_found"
+
+# 10 seconds
+GRPC_TIMEOUT = 10
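
Example: a sketch of how a client might detect the `FILE_NOT_FOUND` condition
that `StreamLog` reports through initial gRPC metadata rather than an error
status. It assumes a `grpc.aio` `LogServiceStub`; the helper name `tail_log`
is illustrative only:

    import ray.dashboard.modules.log.log_consts as log_consts
    from ray.core.generated import reporter_pb2

    async def tail_log(stub, filename: str) -> bytes:
        """Return the tailed bytes of `filename`, or raise if it is missing."""
        call = stub.StreamLog(
            reporter_pb2.StreamLogRequest(log_file_name=filename)
        )
        # The agent signals a missing file via initial metadata.
        metadata = dict(await call.initial_metadata())
        if metadata.get(log_consts.LOG_GRPC_ERROR) == log_consts.FILE_NOT_FOUND:
            raise FileNotFoundError(filename)
        data = b""
        async for reply in call:
            data += reply.data
        return data
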
diff --git a/dashboard/modules/log/log_head.py b/dashboard/modules/log/log_head.py
index f952cdfba..5dde89344 100644
--- a/dashboard/modules/log/log_head.py
+++ b/dashboard/modules/log/log_head.py
@@ -2,9 +2,16 @@ import logging

 import aiohttp.web

 import ray.dashboard.modules.log.log_utils as log_utils
+import ray.dashboard.modules.log.log_consts as log_consts
 import ray.dashboard.utils as dashboard_utils
+from ray._private.utils import init_grpc_channel
+from ray.core.generated import reporter_pb2
+from ray.core.generated import reporter_pb2_grpc
 import ray.dashboard.optional_utils as dashboard_optional_utils
 from ray.dashboard.datacenter import DataSource, GlobalSignals
+from ray import ray_constants
+from typing import List
+import asyncio

 logger = logging.getLogger(__name__)

 routes = dashboard_optional_utils.ClassMethodRouteTable
@@ -100,3 +107,146 @@ class LogHead(dashboard_utils.DashboardHeadModule):
     @staticmethod
     def is_minimal_module():
         return False
+
+
+class LogHeadV1(dashboard_utils.DashboardHeadModule):
+    def __init__(self, dashboard_head):
+        super().__init__(dashboard_head)
+        self._stubs = {}
+        self._ip_to_node_id = {}
+        DataSource.agents.signal.append(self._update_stubs)
+
+    async def _update_stubs(self, change):
+        if change.old:
+            node_id, _ = change.old
+            ip = DataSource.node_id_to_ip[node_id]
+            self._stubs.pop(node_id)
+            self._ip_to_node_id.pop(ip)
+        if change.new:
+            node_id, ports = change.new
+            ip = DataSource.node_id_to_ip[node_id]
+
+            options = (("grpc.enable_http_proxy", 0),)
+            channel = init_grpc_channel(
+                f"{ip}:{ports[1]}", options=options, asynchronous=True
+            )
+            stub = reporter_pb2_grpc.LogServiceStub(channel)
+            self._stubs[node_id] = stub
+            self._ip_to_node_id[ip] = node_id
+
+    @staticmethod
+    def _list_logs_single_node(log_files: List[str], filters: List[str]):
+        """
+        Returns a dict mapping each category of log component to a list of
+        filenames on the given node.
+        """
+        filters = [] if filters == [""] else filters
+
+        def contains_all_filters(log_file_name):
+            return all(f in log_file_name for f in filters)
+
+        filtered = list(filter(contains_all_filters, log_files))
+        logs = {}
+        logs["worker_errors"] = list(
+            filter(lambda s: "worker" in s and s.endswith(".err"), filtered)
+        )
+        logs["worker_outs"] = list(
+            filter(lambda s: "worker" in s and s.endswith(".out"), filtered)
+        )
+        for lang in ray_constants.LANGUAGE_WORKER_TYPES:
+            logs[f"{lang}_core_worker_logs"] = list(
+                filter(
+                    lambda s: f"{lang}-core-worker" in s and s.endswith(".log"),
+                    filtered,
+                )
+            )
+            logs[f"{lang}_driver_logs"] = list(
+                filter(
+                    lambda s: f"{lang}-core-driver" in s and s.endswith(".log"),
+                    filtered,
+                )
+            )
+        logs["dashboard"] = list(filter(lambda s: "dashboard" in s, filtered))
+        logs["raylet"] = list(filter(lambda s: "raylet" in s, filtered))
+        logs["gcs_server"] = list(filter(lambda s: "gcs" in s, filtered))
+        logs["ray_client"] = list(filter(lambda s: "ray_client" in s, filtered))
+        logs["autoscaler"] = list(
+            filter(lambda s: "monitor" in s and "log_monitor" not in s, filtered)
+        )
+        logs["runtime_env"] = list(filter(lambda s: "runtime_env" in s, filtered))
+        logs["folders"] = list(filter(lambda s: "." not in s, filtered))
+        logs["misc"] = list(
+            filter(lambda s: all([s not in logs[k] for k in logs]), filtered)
+        )
+        return logs
+
+    async def _wait_until_initialized(self):
+        """
+        Wait until connected to at least one node's log agent.
+        """
+        POLL_SLEEP_TIME = 0.5
+        POLL_RETRIES = 10
+        for _ in range(POLL_RETRIES):
+            if self._stubs != {}:
+                return None
+            await asyncio.sleep(POLL_SLEEP_TIME)
+        return aiohttp.web.HTTPGatewayTimeout(
+            reason="Could not connect to agents via gRPC after "
+            f"{POLL_SLEEP_TIME * POLL_RETRIES} seconds."
+        )
+
+    async def _list_logs(self, node_id_query: str, filters: List[str]):
+        """
+        Helper function to list the logs by querying the log agent
+        on each node via gRPC.
+        """
+        response = {}
+        tasks = []
+        for node_id, grpc_stub in self._stubs.items():
+            if node_id_query is None or node_id_query == node_id:
+
+                # Bind the loop variables as default arguments so that each
+                # coroutine queries its own node rather than the last one
+                # iterated over.
+                async def coro(node_id=node_id, grpc_stub=grpc_stub):
+                    reply = await grpc_stub.ListLogs(
+                        reporter_pb2.ListLogsRequest(),
+                        timeout=log_consts.GRPC_TIMEOUT,
+                    )
+                    response[node_id] = self._list_logs_single_node(
+                        reply.log_files, filters
+                    )
+
+                tasks.append(coro())
+        await asyncio.gather(*tasks)
+        return response
+
+    @routes.get("/api/experimental/logs/list")
+    async def handle_list_logs(self, req):
+        """
+        Returns a JSON response containing, for each node in the cluster,
+        a dict mapping each category of log component to a list of filenames.
+        """
+        try:
+            node_id = req.query.get("node_id", None)
+            if node_id is None:
+                ip = req.query.get("node_ip", None)
+                if ip is not None:
+                    if ip not in self._ip_to_node_id:
+                        return aiohttp.web.HTTPNotFound(
+                            reason=f"node_ip: {ip} not found"
+                        )
+                    node_id = self._ip_to_node_id[ip]
+            filters = req.query.get("filters", "").split(",")
+            err = await self._wait_until_initialized()
+            if err is not None:
+                return err
+            response = await self._list_logs(node_id, filters)
+            return aiohttp.web.json_response(response)
+        except Exception as e:
+            logger.exception(e)
+            return aiohttp.web.HTTPInternalServerError(reason=str(e))
+
+    async def run(self, server):
+        pass
+
+    @staticmethod
+    def is_minimal_module():
+        return False
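
Example: how the new endpoint might be queried once a cluster with the
dashboard is up. The address 127.0.0.1:8265 is the default dashboard address
and is an assumption here:

    import requests

    # List all log files on every node, grouped by category.
    resp = requests.get("http://127.0.0.1:8265/api/experimental/logs/list")
    resp.raise_for_status()
    logs_by_node = resp.json()  # {node_id: {category: [filename, ...]}}

    # Restrict to a single node by IP and keep only filenames containing "gcs".
    resp = requests.get(
        "http://127.0.0.1:8265/api/experimental/logs/list",
        params={"node_ip": "127.0.0.1", "filters": "gcs"},
    )
    print(resp.json())
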
+ """ + try: + node_id = req.query.get("node_id", None) + if node_id is None: + ip = req.query.get("node_ip", None) + if ip is not None: + if ip not in self._ip_to_node_id: + return aiohttp.web.HTTPNotFound( + reason=f"node_ip: {ip} not found" + ) + node_id = self._ip_to_node_id[ip] + filters = req.query.get("filters", "").split(",") + err = await self._wait_until_initialized() + if err is not None: + return err + response = await self._list_logs(node_id, filters) + return aiohttp.web.json_response(response) + + except Exception as e: + logger.exception(e) + return aiohttp.web.HTTPInternalServerError(reason=e) + + async def run(self, server): + pass + + @staticmethod + def is_minimal_module(): + return False diff --git a/dashboard/modules/log/tests/test_log.py b/dashboard/modules/log/tests/test_log.py index 87f1e5717..c86a7f99b 100644 --- a/dashboard/modules/log/tests/test_log.py +++ b/dashboard/modules/log/tests/test_log.py @@ -5,6 +5,8 @@ import time import traceback import html.parser import urllib.parse +import json +import os from ray.dashboard.tests.conftest import * # noqa import pytest @@ -13,6 +15,7 @@ from ray._private.test_utils import ( format_web_url, wait_until_server_available, ) +from ray.dashboard.modules.log.log_agent import tail as tail_file logger = logging.getLogger(__name__) @@ -161,5 +164,84 @@ def test_log_proxy(ray_start_with_dashboard): raise Exception(f"Timed out while testing, {ex_stack}") +def test_logs_experimental_list(ray_start_with_dashboard): + assert wait_until_server_available(ray_start_with_dashboard["webui_url"]) is True + webui_url = ray_start_with_dashboard["webui_url"] + webui_url = format_web_url(webui_url) + + # test that list logs is comprehensive + response = requests.get(webui_url + "/api/experimental/logs/list") + response.raise_for_status() + logs = json.loads(response.text) + assert len(logs) == 1 + node_id = next(iter(logs)) + + # test worker logs + outs = logs[node_id]["worker_outs"] + errs = logs[node_id]["worker_outs"] + core_worker_logs = logs[node_id]["python_core_worker_logs"] + + assert len(outs) == len(errs) == len(core_worker_logs) + assert len(outs) > 0 + + for file in ["debug_state_gcs.txt", "gcs_server.out", "gcs_server.err"]: + assert file in logs[node_id]["gcs_server"] + for file in ["raylet.out", "raylet.err"]: + assert file in logs[node_id]["raylet"] + for file in ["dashboard_agent.log", "dashboard.log"]: + assert file in logs[node_id]["dashboard"] + return True + + # Test that logs/list can be filtered + response = requests.get(webui_url + "/api/experimental/logs/list?filters=gcs") + response.raise_for_status() + logs = json.loads(response.text) + assert len(logs) == 1 + node_id = next(iter(logs)) + assert "gcs_server" in logs[node_id] + for category in logs[node_id]: + if category != "gcs_server": + assert len(logs[node_id][category]) == 0 + + response = requests.get(webui_url + "/api/experimental/logs/list?filters=worker") + response.raise_for_status() + logs = json.loads(response.text) + assert len(logs) == 1 + node_id = next(iter(logs)) + worker_log_categories = [ + "python_core_worker_logs", + "worker_outs", + "worker_errors", + ] + assert all([cat in logs[node_id] for cat in worker_log_categories]) + for category in logs[node_id]: + if category not in worker_log_categories: + assert len(logs[node_id][category]) == 0 + + +def test_logs_tail(): + """ + Unit test for tail + """ + TOTAL_LINES = 1000 + FILE_NAME = "test_file.txt" + try: + with open(FILE_NAME, "w") as f: + for i in range(TOTAL_LINES): + f.write(f"Message 
diff --git a/dashboard/modules/node/node_consts.py b/dashboard/modules/node/node_consts.py
index 60b202d12..23ee41d49 100644
--- a/dashboard/modules/node/node_consts.py
+++ b/dashboard/modules/node/node_consts.py
@@ -1,5 +1,4 @@
 NODE_STATS_UPDATE_INTERVAL_SECONDS = 1
-LOG_INFO_UPDATE_INTERVAL_SECONDS = 5
 UPDATE_NODES_INTERVAL_SECONDS = 5
 MAX_COUNT_OF_GCS_RPC_ERROR = 10
 MAX_LOGS_TO_CACHE = 10000
diff --git a/python/ray/_private/services.py b/python/ray/_private/services.py
index 3365cbbea..438031b58 100644
--- a/python/ray/_private/services.py
+++ b/python/ray/_private/services.py
@@ -270,6 +270,7 @@ def _find_address_from_flag(flag: str):
             cmdline = proc.cmdline()
             # NOTE(kfstorm): To support Windows, we can't use
             # `os.path.basename(cmdline[0]) == "raylet"` here.
+
             if len(cmdline) > 0 and "raylet" in os.path.basename(cmdline[0]):
                 for arglist in cmdline:
                     # Given we're merely seeking --redis-address, we just split
diff --git a/python/ray/ray_constants.py b/python/ray/ray_constants.py
index 46a26cf27..e460e4f01 100644
--- a/python/ray/ray_constants.py
+++ b/python/ray/ray_constants.py
@@ -337,3 +337,5 @@ KV_NAMESPACE_CLUSTER = b"cluster"
 KV_NAMESPACE_PACKAGE = None
 KV_NAMESPACE_SERVE = b"serve"
 KV_NAMESPACE_FUNCTION_TABLE = b"fun"
+
+LANGUAGE_WORKER_TYPES = ["python", "java", "cpp"]
diff --git a/python/ray/scripts/scripts.py b/python/ray/scripts/scripts.py
index f9fb783cd..c8bb81c79 100644
--- a/python/ray/scripts/scripts.py
+++ b/python/ray/scripts/scripts.py
@@ -38,7 +38,6 @@ from ray.autoscaler._private.commands import (
 from ray.autoscaler._private.constants import RAY_PROCESSES
 from ray.autoscaler._private.fake_multi_node.node_provider import FAKE_HEAD_NODE_ID
 from ray.autoscaler._private.kuberay.run_autoscaler import run_autoscaler_with_retries
-
 from ray.internal.internal_api import memory_summary
 from ray.autoscaler._private.cli_logger import add_click_logging_options, cli_logger, cf
 from ray.dashboard.modules.job.cli import job_cli_group
diff --git a/src/ray/protobuf/reporter.proto b/src/ray/protobuf/reporter.proto
index 6adecb6e6..b9eb53e88 100644
--- a/src/ray/protobuf/reporter.proto
+++ b/src/ray/protobuf/reporter.proto
@@ -60,7 +60,7 @@ message NodeResourceUsage {
   string json = 1;
 }

-// Service for communicating with the reporter.py process on a remote node.
+// Service for communicating with the reporter agent module on a remote node.
 service ReporterService {
   // Get the profiling stats.
   rpc GetProfilingStats(GetProfilingStatsRequest) returns (GetProfilingStatsReply);
@@ -69,3 +69,37 @@ service ReporterService {
   // Report OpenCensus metrics to the local metrics agent.
   rpc ReportOCMetrics(ReportOCMetricsRequest) returns (ReportOCMetricsReply);
 }
+
+message StreamLogRequest {
+  // File name of the log file.
+  string log_file_name = 1;
+  // Keeps the stream alive perpetually if true; else, terminates on EOF.
+  bool keep_alive = 2;
+  // Number of lines to tail from the log file initially.
+  // -1 indicates to fetch the whole file.
+  optional int32 lines = 3;
+  // If keep_alive is true, this indicates how frequently to poll the
+  // log file for new lines.
+  optional float interval = 4;
+}
+
+message StreamLogReply {
+  // The raw bytes of the log file chunk.
+  bytes data = 1;
+}
+
+message ListLogsRequest {}
+
+message ListLogsReply {
+  // The file names of all the log files in this node's
+  // log directory.
+  repeated string log_files = 1;
+}
+
+// Service for communicating with the log agent module on a remote node.
+service LogService {
+  // Get the list of logs from the agent.
+  rpc ListLogs(ListLogsRequest) returns (ListLogsReply);
+  // Streams a log file from the agent.
+  rpc StreamLog(StreamLogRequest) returns (stream StreamLogReply);
+}
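
Example: a sketch of a synchronous client for the new LogService, assuming the
generated Python stubs and a reachable agent gRPC address (the address below
is a placeholder; the agent's gRPC port is assigned per cluster):

    import grpc

    from ray.core.generated import reporter_pb2, reporter_pb2_grpc

    channel = grpc.insecure_channel("127.0.0.1:50051")  # placeholder address
    stub = reporter_pb2_grpc.LogServiceStub(channel)

    # List the log files available on this node.
    reply = stub.ListLogs(reporter_pb2.ListLogsRequest(), timeout=10)
    print(list(reply.log_files))

    # Tail the last 100 lines of raylet.out and follow it, polling every second.
    request = reporter_pb2.StreamLogRequest(
        log_file_name="raylet.out", keep_alive=True, lines=100, interval=1.0
    )
    for chunk in stub.StreamLog(request):
        print(chunk.data.decode(), end="")
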