[log_monitor] Seek when reopening a file due to inode change (#27508)

When reopening a file due to an inode change, we weren't seeking back to the right location. Now we are (with a unit test).

Closes (but not really until it's cherry-picked) #27507

Co-authored-by: Alex <alex@anyscale.com>
Co-authored-by: Eric Liang <ekhliang@gmail.com>
This commit is contained in:
Alex Wu 2022-08-05 18:27:43 -07:00 committed by GitHub
parent 098628d9bf
commit a6b9019d38
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 48 additions and 1 deletions

View file

@ -73,10 +73,13 @@ class LogFileInfo:
would have different inodes, such as log rotation or file syncing
semantics.
"""
open_inode = os.fstat(self.file_handle.fileno()).st_ino
open_inode = None
if self.file_handle and not self.file_handle.closed:
open_inode = os.fstat(self.file_handle.fileno()).st_ino
new_inode = os.stat(self.filename).st_ino
if open_inode != new_inode:
self.file_handle = open(self.filename, "rb")
self.file_handle.seek(self.file_position)
def __repr__(self):
return (

View file

@ -15,6 +15,7 @@ from ray._private import ray_constants
from ray._private.log_monitor import (
LOG_NAME_UPDATE_INTERVAL_S,
RAY_LOG_MONITOR_MANY_FILES_THRESHOLD,
LogFileInfo,
LogMonitor,
)
from ray._private.test_utils import (
@ -32,6 +33,49 @@ def set_logging_config(monkeypatch, max_bytes, backup_count):
monkeypatch.setenv("RAY_ROTATION_BACKUP_COUNT", str(backup_count))
def test_reopen_changed_inode(tmp_path):
"""Make sure that when we reopen a file because the inode has changed, we
open to the right location."""
path1 = tmp_path / "file"
path2 = tmp_path / "changed_file"
with open(path1, "w") as f:
for i in range(1000):
print(f"{i}", file=f)
with open(path2, "w") as f:
for i in range(2000):
print(f"{i}", file=f)
file_info = LogFileInfo(
filename=path1,
size_when_last_opened=0,
file_position=0,
file_handle=None,
is_err_file=False,
job_id=None,
worker_pid=None,
)
file_info.reopen_if_necessary()
for _ in range(1000):
file_info.file_handle.readline()
orig_file_pos = file_info.file_handle.tell()
file_info.file_position = orig_file_pos
# NOTE: On windows, an open file can't be deleted.
file_info.file_handle.close()
os.remove(path1)
os.rename(path2, path1)
file_info.reopen_if_necessary()
assert file_info.file_position == orig_file_pos
assert file_info.file_handle.tell() == orig_file_pos
def test_log_rotation_config(ray_start_cluster, monkeypatch):
cluster = ray_start_cluster
max_bytes = 100