[Core] test_advanced_3.py::test_logging_to_driver (round 2) (#9916)

This commit is contained in:
Alex Wu 2020-08-05 13:04:36 -07:00 committed by GitHub
parent b6da7dcef9
commit 12d75784a4
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 46 additions and 9 deletions

View file

@ -202,17 +202,21 @@ def test_logging_to_driver(shutdown_only):
def f():
# It's important to make sure that these print statements occur even
# without calling sys.stdout.flush() and sys.stderr.flush().
for i in range(100):
print(i)
print(100 + i, file=sys.stderr)
for i in range(10):
print(i, end=" ")
print(100 + i, end=" ", file=sys.stderr)
captured = {}
with CaptureOutputAndError(captured):
ray.get(f.remote())
time.sleep(1)
out_lines = captured["out"]
err_lines = captured["err"]
for i in range(200):
for i in range(10):
assert str(i) in out_lines
for i in range(100, 110):
assert str(i) in err_lines

View file

@ -391,20 +391,52 @@ def setup_logger(logging_level, logging_format):
logger.propagate = False
def open_log(path, **kwargs):
class Unbuffered(object):
"""There's no "built-in" solution to programatically disabling buffering of
text files. Ray expects stdout/err to be text files, so creating an
unbuffered binary file is unacceptable.
See
https://mail.python.org/pipermail/tutor/2003-November/026645.html.
https://docs.python.org/3/library/functions.html#open
"""
def __init__(self, stream):
self.stream = stream
def write(self, data):
self.stream.write(data)
self.stream.flush()
def writelines(self, datas):
self.stream.writelines(datas)
self.stream.flush()
def __getattr__(self, attr):
return getattr(self.stream, attr)
def open_log(path, unbuffered=False, **kwargs):
"""
Opens the log file at `path`, with the provided kwargs being given to
`open`.
"""
# Disable buffering, see test_advanced_3.py::test_logging_to_driver
kwargs.setdefault("buffering", 1)
kwargs.setdefault("mode", "a")
kwargs.setdefault("encoding", "utf-8")
return open(path, **kwargs)
stream = open(path, **kwargs)
if unbuffered:
return Unbuffered(stream)
else:
return stream
def create_and_init_new_worker_log(path, worker_pid):
"""
Opens a path (or creates if necessary) for a log.
"""Opens or creates and sets up a new worker log file. Note that because we
expect to dup the underlying file descriptor, then fdopen it, the python
level metadata is not important.
Args:
path (str): The name/path of the file to be opened.
@ -412,6 +444,7 @@ def create_and_init_new_worker_log(path, worker_pid):
Returns:
A file-like object which can be written to.
"""
# TODO (Alex): We should eventually be able to replace this with
# named-pipes.

View file

@ -931,7 +931,7 @@ def _set_log_file(file_name, worker_pid, old_obj, setter_func):
# and stderr are heavily buffered resulting in seemingly lost logging
# statements. We never want to close the stdout file descriptor, dup2 will
# close it when necessary and we don't want python's GC to close it.
setter_func(open_log(fileno, closefd=False))
setter_func(open_log(fileno, unbuffered=True, closefd=False))
return os.path.abspath(f.name)