diff --git a/dashboard/modules/job/job_manager.py b/dashboard/modules/job/job_manager.py index 24b9ef74f..6530185af 100644 --- a/dashboard/modules/job/job_manager.py +++ b/dashboard/modules/job/job_manager.py @@ -10,7 +10,7 @@ import time import traceback from asyncio.tasks import FIRST_COMPLETED from collections import deque -from typing import Any, Dict, Iterator, Optional, Tuple +from typing import Any, Dict, Iterator, List, Optional, Tuple import ray from ray._private.gcs_utils import GcsAioClient @@ -58,6 +58,9 @@ class JobLogStorageClient: JOB_LOGS_PATH = "job-driver-{job_id}.log" # Number of last N lines to put in job message upon failure. NUM_LOG_LINES_ON_ERROR = 10 + # Maximum number of characters to print out of the logs to avoid + # HUGE log outputs that bring down the api server + MAX_LOG_SIZE = 20000 def get_logs(self, job_id: str) -> str: try: @@ -66,20 +69,31 @@ class JobLogStorageClient: except FileNotFoundError: return "" - def tail_logs(self, job_id: str) -> Iterator[str]: + def tail_logs(self, job_id: str) -> Iterator[List[str]]: return file_tail_iterator(self.get_log_file_path(job_id)) def get_last_n_log_lines( self, job_id: str, num_log_lines=NUM_LOG_LINES_ON_ERROR ) -> str: + """ + Returns the last MAX_LOG_SIZE (20000) characters in the last + `num_log_lines` lines. + + Args: + job_id: The id of the job whose logs we want to return + num_log_lines: The number of lines to return. + """ log_tail_iter = self.tail_logs(job_id) log_tail_deque = deque(maxlen=num_log_lines) - for line in log_tail_iter: - if line is None: + for lines in log_tail_iter: + if lines is None: break else: - log_tail_deque.append(line) - return "".join(log_tail_deque) + # log_tail_iter can return batches of lines at a time. 
+ for line in lines: + log_tail_deque.append(line) + + return "".join(log_tail_deque)[-self.MAX_LOG_SIZE :] def get_log_file_path(self, job_id: str) -> Tuple[str, str]: """ @@ -281,7 +295,8 @@ class JobSupervisor: if log_tail is not None and log_tail != "": message = ( "Job failed due to an application error, " - "last available logs:\n" + log_tail + "last available logs (truncated to 20,000 chars):\n" + + log_tail ) else: message = None @@ -562,8 +577,8 @@ class JobManager: if await self.get_job_status(job_id) is None: raise RuntimeError(f"Job '{job_id}' does not exist.") - for line in self._log_client.tail_logs(job_id): - if line is None: + for lines in self._log_client.tail_logs(job_id): + if lines is None: # Return if the job has exited and there are no new log lines. status = await self.get_job_status(job_id) if status not in {JobStatus.PENDING, JobStatus.RUNNING}: @@ -571,4 +586,4 @@ class JobManager: await asyncio.sleep(self.LOG_TAIL_SLEEP_S) else: - yield line + yield "".join(lines) diff --git a/dashboard/modules/job/tests/test_job_manager.py b/dashboard/modules/job/tests/test_job_manager.py index f07e8bf0c..9ded996f1 100644 --- a/dashboard/modules/job/tests/test_job_manager.py +++ b/dashboard/modules/job/tests/test_job_manager.py @@ -793,5 +793,29 @@ async def test_job_runs_with_no_resources_available(job_manager): ray.cancel(hanging_ref) +async def test_failed_job_logs_max_char(job_manager): + """Test failed jobs does not print out too many logs""" + + # Prints 21000 characters + print_large_logs_cmd = ( + "python -c \"print('1234567890'* 2100); raise RuntimeError()\"" + ) + + job_id = await job_manager.submit_job( + entrypoint=print_large_logs_cmd, + ) + + await async_wait_for_condition( + check_job_failed, job_manager=job_manager, job_id=job_id + ) + + # Verify the status message length + job_info = await job_manager.get_job_info(job_id) + assert job_info + assert len(job_info.message) == 20000 + len( + "Job failed due to an application error, " "last 
available logs (truncated to 20,000 chars):\n" + ) + + if __name__ == "__main__": sys.exit(pytest.main(["-v", __file__])) diff --git a/dashboard/modules/job/tests/test_utils.py b/dashboard/modules/job/tests/test_utils.py index 339e26784..eea88b671 100644 --- a/dashboard/modules/job/tests/test_utils.py +++ b/dashboard/modules/job/tests/test_utils.py @@ -34,7 +34,7 @@ class TestIterLine: assert next(it) is None f.write("\n") f.flush() - assert next(it) == "no_newline_yet\n" + assert next(it) == ["no_newline_yet\n"] def test_multiple_lines(self, tmp): it = file_tail_iterator(tmp) @@ -47,7 +47,7 @@ class TestIterLine: s = f"{i}\n" f.write(s) f.flush() - assert next(it) == s + assert next(it) == [s] assert next(it) is None @@ -64,10 +64,64 @@ class TestIterLine: f.write(f"{i}\n") f.flush() - assert next(it) == "\n".join(str(i) for i in range(10)) + "\n" + assert next(it) == [f"{i}\n" for i in range(10)] assert next(it) is None + def test_max_line_batching(self, tmp): + it = file_tail_iterator(tmp) + assert next(it) is None + + f = open(tmp, "w") + + # Write lines in batches of 50, check that we get them back in batches of 10. 
+ for _ in range(100): + num_lines = 50 + for i in range(num_lines): + f.write(f"{i}\n") + f.flush() + + assert next(it) == [f"{i}\n" for i in range(10)] + assert next(it) == [f"{i}\n" for i in range(10, 20)] + assert next(it) == [f"{i}\n" for i in range(20, 30)] + assert next(it) == [f"{i}\n" for i in range(30, 40)] + assert next(it) == [f"{i}\n" for i in range(40, 50)] + + assert next(it) is None + + def test_max_char_batching(self, tmp): + it = file_tail_iterator(tmp) + assert next(it) is None + + f = open(tmp, "w") + + # Write a single line that is over 60000 characters, + # check we get it in batches of 20000 + f.write(f"{'1234567890' * 6000}\n") + f.flush() + + assert next(it) == ["1234567890" * 2000] + assert next(it) == ["1234567890" * 2000] + assert next(it) == ["1234567890" * 2000] + assert next(it) == ["\n"] + assert next(it) is None + + # Write a 10 lines where last line is over 20000 characters, + # check we get it in batches of 20000 + for i in range(9): + f.write(f"{i}\n") + f.write(f"{'1234567890' * 2000}\n") + f.flush() + + first_nine_lines = [f"{i}\n" for i in range(9)] + first_nine_lines_length = sum(len(line) for line in first_nine_lines) + assert next(it) == first_nine_lines + [ + f"{'1234567890' * 2000}"[0:-first_nine_lines_length] + ] + # Remainder of last line + assert next(it) == [f"{'1234567890' * 2000}"[-first_nine_lines_length:] + "\n"] + assert next(it) is None + def test_delete_file(self): with NamedTemporaryFile() as tmp: it = file_tail_iterator(tmp.name) @@ -78,7 +132,7 @@ class TestIterLine: f.write("hi\n") f.flush() - assert next(it) == "hi\n" + assert next(it) == ["hi\n"] # Calls should continue returning None after file deleted. 
assert next(it) is None diff --git a/dashboard/modules/job/utils.py b/dashboard/modules/job/utils.py index dc555b4c0..6da2a6062 100644 --- a/dashboard/modules/job/utils.py +++ b/dashboard/modules/job/utils.py @@ -1,14 +1,20 @@ import logging import os -from typing import Iterator, Optional +from typing import Iterator, List, Optional logger = logging.getLogger(__name__) +MAX_CHUNK_LINE_LENGTH = 10 +MAX_CHUNK_CHAR_LENGTH = 20000 -def file_tail_iterator(path: str) -> Iterator[Optional[str]]: + +def file_tail_iterator(path: str) -> Iterator[Optional[List[str]]]: """Yield lines from a file as it's written. - Returns lines in batches opportunistically. + Returns lines in batches of up to 10 lines or 20000 characters, + whichever comes first. If it's a chunk of 20000 characters, then + the last line that is yielded could be an incomplete line. + New line characters are kept in the line string. Returns None until the file exists or if no new line has been written. """ @@ -20,12 +26,41 @@ def file_tail_iterator(path: str) -> Iterator[Optional[str]]: yield None with open(path, "r") as f: - lines = "" + lines = [] + chunk_char_count = 0 + curr_line = None while True: - curr_line = f.readline() - # readline() returns empty string when there's no new line. 
- if curr_line: - lines += curr_line - else: + if curr_line is None: + # Only read the next line in the file + # if there's no remaining "curr_line" to process + curr_line = f.readline() + new_chunk_char_count = chunk_char_count + len(curr_line) + if new_chunk_char_count > MAX_CHUNK_CHAR_LENGTH: + # Too many characters, return 20000 in this chunk, and then + # continue loop with remaining characters in curr_line + truncated_line = curr_line[0 : MAX_CHUNK_CHAR_LENGTH - chunk_char_count] + lines.append(truncated_line) + # Set remainder of current line to process next + curr_line = curr_line[MAX_CHUNK_CHAR_LENGTH - chunk_char_count :] yield lines or None - lines = "" + lines = [] + chunk_char_count = 0 + elif len(lines) >= 9: + # Too many lines, return 10 lines in this chunk, and then + # continue reading the file. + lines.append(curr_line) + yield lines or None + lines = [] + chunk_char_count = 0 + curr_line = None + elif curr_line: + # Add line to current chunk + lines.append(curr_line) + chunk_char_count = new_chunk_char_count + curr_line = None + else: + # readline() returns empty string when there's no new line. + yield lines or None + lines = [] + chunk_char_count = 0 + curr_line = None