Add maximum number of characters in logs output for jobs status message (#27581)

We've seen the API server go down from trying to return 500 MB of log output.
Alan Guo 2022-08-08 20:24:51 -07:00 committed by GitHub
parent 0e74bc20b5
commit c3a8ba0f8a
4 changed files with 152 additions and 24 deletions


@@ -10,7 +10,7 @@ import time
 import traceback
 from asyncio.tasks import FIRST_COMPLETED
 from collections import deque
-from typing import Any, Dict, Iterator, Optional, Tuple
+from typing import Any, Dict, Iterator, List, Optional, Tuple

 import ray
 from ray._private.gcs_utils import GcsAioClient
@@ -58,6 +58,9 @@ class JobLogStorageClient:
     JOB_LOGS_PATH = "job-driver-{job_id}.log"
     # Number of last N lines to put in job message upon failure.
     NUM_LOG_LINES_ON_ERROR = 10
+    # Maximum number of characters to print out of the logs, to avoid
+    # huge log outputs that bring down the API server.
+    MAX_LOG_SIZE = 20000

     def get_logs(self, job_id: str) -> str:
         try:
@@ -66,20 +69,31 @@
         except FileNotFoundError:
             return ""

-    def tail_logs(self, job_id: str) -> Iterator[str]:
+    def tail_logs(self, job_id: str) -> Iterator[List[str]]:
         return file_tail_iterator(self.get_log_file_path(job_id))

     def get_last_n_log_lines(
         self, job_id: str, num_log_lines=NUM_LOG_LINES_ON_ERROR
     ) -> str:
+        """
+        Returns the last MAX_LOG_SIZE (20000) characters in the last
+        `num_log_lines` lines.
+
+        Args:
+            job_id: The id of the job whose logs we want to return.
+            num_log_lines: The number of lines to return.
+        """
         log_tail_iter = self.tail_logs(job_id)
         log_tail_deque = deque(maxlen=num_log_lines)
-        for line in log_tail_iter:
-            if line is None:
+        for lines in log_tail_iter:
+            if lines is None:
                 break
             else:
-                log_tail_deque.append(line)
-        return "".join(log_tail_deque)
+                # log_tail_iter can return batches of lines at a time.
+                for line in lines:
+                    log_tail_deque.append(line)
+        return "".join(log_tail_deque)[-self.MAX_LOG_SIZE :]

     def get_log_file_path(self, job_id: str) -> Tuple[str, str]:
         """
@@ -281,7 +295,8 @@ class JobSupervisor:
             if log_tail is not None and log_tail != "":
                 message = (
                     "Job failed due to an application error, "
-                    "last available logs:\n" + log_tail
+                    "last available logs (truncated to 20,000 chars):\n"
+                    + log_tail
                 )
             else:
                 message = None
@@ -562,8 +577,8 @@ class JobManager:
         if await self.get_job_status(job_id) is None:
             raise RuntimeError(f"Job '{job_id}' does not exist.")

-        for line in self._log_client.tail_logs(job_id):
-            if line is None:
+        for lines in self._log_client.tail_logs(job_id):
+            if lines is None:
                 # Return if the job has exited and there are no new log lines.
                 status = await self.get_job_status(job_id)
                 if status not in {JobStatus.PENDING, JobStatus.RUNNING}:
@@ -571,4 +586,4 @@
                 await asyncio.sleep(self.LOG_TAIL_SLEEP_S)
             else:
-                yield line
+                yield "".join(lines)

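Aside: the pattern that get_last_n_log_lines now uses is worth seeing standalone. A bounded deque keeps only the last num_log_lines lines even though the iterator yields batches, and a final slice enforces the character cap. Below is a minimal sketch of that pattern, not Ray's actual client; the fake_tail batch source is hypothetical.

from collections import deque
from typing import Iterator, List, Optional

MAX_LOG_SIZE = 20000  # mirrors JobLogStorageClient.MAX_LOG_SIZE


def last_n_lines_capped(
    batches: Iterator[Optional[List[str]]], num_log_lines: int = 10
) -> str:
    # Each item from the iterator is a batch (list) of lines, so the
    # bounded deque is filled line by line.
    tail: deque = deque(maxlen=num_log_lines)
    for batch in batches:
        if batch is None:
            # The source has caught up; stop reading.
            break
        for line in batch:
            tail.append(line)
    # Join the surviving lines and keep only the trailing MAX_LOG_SIZE chars.
    return "".join(tail)[-MAX_LOG_SIZE:]


def fake_tail() -> Iterator[Optional[List[str]]]:
    # Hypothetical batched log source: two batches, then None (caught up).
    yield ["short line\n", "another line\n"]
    yield ["x" * 30000 + "\n"]
    yield None


print(len(last_n_lines_capped(fake_tail())))  # 20000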

@@ -793,5 +793,29 @@ async def test_job_runs_with_no_resources_available(job_manager):
         ray.cancel(hanging_ref)

+@pytest.mark.asyncio
+async def test_failed_job_logs_max_char(job_manager):
+    """Test that a failed job does not print out too many logs."""
+    # Prints 21000 characters.
+    print_large_logs_cmd = (
+        "python -c \"print('1234567890' * 2100); raise RuntimeError()\""
+    )
+
+    job_id = await job_manager.submit_job(
+        entrypoint=print_large_logs_cmd,
+    )
+
+    await async_wait_for_condition(
+        check_job_failed, job_manager=job_manager, job_id=job_id
+    )
+
+    # Verify that the status message is capped at 20000 characters of logs.
+    job_info = await job_manager.get_job_info(job_id)
+    assert job_info
+    assert len(job_info.message) == 20000 + len(
+        "Job failed due to an application error, "
+        "last available logs (truncated to 20,000 chars):\n"
+    )
+
+
 if __name__ == "__main__":
     sys.exit(pytest.main(["-v", __file__]))


@@ -34,7 +34,7 @@ class TestIterLine:
         assert next(it) is None
         f.write("\n")
         f.flush()
-        assert next(it) == "no_newline_yet\n"
+        assert next(it) == ["no_newline_yet\n"]

     def test_multiple_lines(self, tmp):
         it = file_tail_iterator(tmp)
@@ -47,7 +47,7 @@ class TestIterLine:
             s = f"{i}\n"
             f.write(s)
             f.flush()
-            assert next(it) == s
+            assert next(it) == [s]

         assert next(it) is None
@@ -64,10 +64,64 @@ class TestIterLine:
             f.write(f"{i}\n")
             f.flush()

-        assert next(it) == "\n".join(str(i) for i in range(10)) + "\n"
+        assert next(it) == [f"{i}\n" for i in range(10)]

         assert next(it) is None

+    def test_max_line_batching(self, tmp):
+        it = file_tail_iterator(tmp)
+        assert next(it) is None
+
+        f = open(tmp, "w")
+
+        # Write lines in batches of 50, check that we get them back
+        # in batches of 10.
+        for _ in range(100):
+            num_lines = 50
+            for i in range(num_lines):
+                f.write(f"{i}\n")
+            f.flush()
+
+            assert next(it) == [f"{i}\n" for i in range(10)]
+            assert next(it) == [f"{i}\n" for i in range(10, 20)]
+            assert next(it) == [f"{i}\n" for i in range(20, 30)]
+            assert next(it) == [f"{i}\n" for i in range(30, 40)]
+            assert next(it) == [f"{i}\n" for i in range(40, 50)]
+
+        assert next(it) is None
+
+    def test_max_char_batching(self, tmp):
+        it = file_tail_iterator(tmp)
+        assert next(it) is None
+
+        f = open(tmp, "w")
+
+        # Write a single line that is over 60000 characters,
+        # check that we get it back in batches of 20000.
+        f.write(f"{'1234567890' * 6000}\n")
+        f.flush()
+
+        assert next(it) == ["1234567890" * 2000]
+        assert next(it) == ["1234567890" * 2000]
+        assert next(it) == ["1234567890" * 2000]
+        assert next(it) == ["\n"]
+        assert next(it) is None
+
+        # Write 10 lines where the last line is over 20000 characters,
+        # check that the chunks are capped at 20000 characters.
+        for i in range(9):
+            f.write(f"{i}\n")
+        f.write(f"{'1234567890' * 2000}\n")
+        f.flush()
+
+        # The first chunk holds the nine short lines (18 characters) plus
+        # the first 20000 - 18 characters of the long line.
+        first_nine_lines = [f"{i}\n" for i in range(9)]
+        first_nine_lines_length = sum(len(line) for line in first_nine_lines)
+        assert next(it) == first_nine_lines + [
+            f"{'1234567890' * 2000}"[0:-first_nine_lines_length]
+        ]
+        # Remainder of the last line.
+        assert next(it) == [
+            f"{'1234567890' * 2000}"[-first_nine_lines_length:] + "\n"
+        ]
+
+        assert next(it) is None

     def test_delete_file(self):
         with NamedTemporaryFile() as tmp:
             it = file_tail_iterator(tmp.name)
@@ -78,7 +132,7 @@ class TestIterLine:
             f.write("hi\n")
             f.flush()
-            assert next(it) == "hi\n"
+            assert next(it) == ["hi\n"]

             # Calls should continue returning None after file deleted.
             assert next(it) is None


@@ -1,14 +1,20 @@
 import logging
 import os
-from typing import Iterator, Optional
+from typing import Iterator, List, Optional

 logger = logging.getLogger(__name__)

+MAX_CHUNK_LINE_LENGTH = 10
+MAX_CHUNK_CHAR_LENGTH = 20000

-def file_tail_iterator(path: str) -> Iterator[Optional[str]]:
+def file_tail_iterator(path: str) -> Iterator[Optional[List[str]]]:
     """Yield lines from a file as it's written.

-    Returns lines in batches opportunistically.
+    Returns lines in batches of up to 10 lines or 20000 characters,
+    whichever comes first. If it's a chunk of 20000 characters, then
+    the last line that is yielded could be an incomplete line.
+
+    Newline characters are kept in the line string.

     Returns None until the file exists or if no new line has been written.
     """
@@ -20,12 +26,41 @@ def file_tail_iterator(path: str) -> Iterator[Optional[str]]:
         yield None

     with open(path, "r") as f:
-        lines = ""
+        lines = []
+        chunk_char_count = 0
+        curr_line = None

         while True:
-            curr_line = f.readline()
-            # readline() returns empty string when there's no new line.
-            if curr_line:
-                lines += curr_line
-            else:
+            if curr_line is None:
+                # Only read the next line from the file if there is no
+                # remaining "curr_line" left to process.
+                curr_line = f.readline()
+            new_chunk_char_count = chunk_char_count + len(curr_line)
+            if new_chunk_char_count > MAX_CHUNK_CHAR_LENGTH:
+                # Too many characters: yield 20000 characters in this
+                # chunk, then continue the loop with the remaining
+                # characters of curr_line.
+                truncated_line = curr_line[0 : MAX_CHUNK_CHAR_LENGTH - chunk_char_count]
+                lines.append(truncated_line)
+                # Keep the remainder of the current line to process next.
+                curr_line = curr_line[MAX_CHUNK_CHAR_LENGTH - chunk_char_count :]
                 yield lines or None
-                lines = ""
+                lines = []
+                chunk_char_count = 0
+            elif len(lines) >= 9:
+                # Too many lines: yield 10 lines in this chunk, then
+                # continue reading the file.
+                lines.append(curr_line)
+                yield lines or None
+                lines = []
+                chunk_char_count = 0
+                curr_line = None
+            elif curr_line:
+                # Add the line to the current chunk.
+                lines.append(curr_line)
+                chunk_char_count = new_chunk_char_count
+                curr_line = None
+            else:
+                # readline() returns an empty string when there's no
+                # new line.
+                yield lines or None
+                lines = []
+                chunk_char_count = 0
+                curr_line = None
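
To make the new batching contract concrete, here is a small usage sketch of file_tail_iterator on a growing file. The import path is an assumption (the diff does not show file names); adjust it to wherever the function lives in your checkout.

import tempfile

# Assumed location of the function changed above.
from ray.dashboard.modules.job.utils import file_tail_iterator

with tempfile.NamedTemporaryFile("w", suffix=".log") as f:
    it = file_tail_iterator(f.name)
    assert next(it) is None  # file exists but has no complete lines yet

    for i in range(25):
        f.write(f"line {i}\n")
    f.flush()

    # 25 new lines come back as chunks of at most MAX_CHUNK_LINE_LENGTH (10).
    assert len(next(it)) == 10
    assert len(next(it)) == 10
    assert len(next(it)) == 5
    assert next(it) is None  # caught up; nothing new since the last read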