mirror of
https://github.com/vale981/emacs-ipython-notebook
synced 2025-03-05 09:01:40 -05:00
501 lines
16 KiB
Python
Executable file
501 lines
16 KiB
Python
Executable file
#!/usr/bin/env python
|
|
|
|
"""
|
|
Run EIN test suite
|
|
"""
|
|
|
|
import sys
|
|
import os
|
|
import glob
|
|
from subprocess import Popen, PIPE, STDOUT
|
|
import itertools
|
|
|
|
EIN_ROOT = os.path.normpath(
|
|
os.path.join(os.path.dirname(__file__), os.path.pardir))
|
|
|
|
|
|
def has_library(emacs, library):
|
|
"""
|
|
Return True when `emacs` has build-in `library`.
|
|
"""
|
|
with open(os.devnull, 'w') as devnull:
|
|
proc = Popen(
|
|
[emacs, '-Q', '-batch', '-l', 'cl',
|
|
'--eval', '(assert (locate-library "{0}"))'.format(library)],
|
|
stdout=devnull, stderr=devnull)
|
|
return proc.wait() == 0
|
|
|
|
|
|
def eindir(*path):
|
|
return os.path.join(EIN_ROOT, *path)
|
|
|
|
|
|
def einlispdir(*path):
|
|
return eindir('lisp', *path)
|
|
|
|
|
|
def eintestdir(*path):
|
|
return eindir('tests', *path)
|
|
|
|
|
|
def einlibdir(*path):
|
|
return eindir('lib', *path)
|
|
|
|
|
|
def show_nonprinting(string, stream=sys.stdout):
|
|
"""Emulate ``cat -v`` (``--show-nonprinting``)."""
|
|
stream.writelines(map(chr, convert_nonprinting(string)))
|
|
|
|
|
|
def convert_nonprinting(string):
|
|
"""
|
|
Convert non-printing characters in `string`.
|
|
|
|
Output is iterable of int. So for Python 2, you need to
|
|
convert it into string using `chr`.
|
|
|
|
Adapted from: http://stackoverflow.com/a/437542/727827
|
|
|
|
"""
|
|
|
|
for b in map(ord, string.decode('utf-8')):
|
|
assert 0 <= b < 0x100
|
|
|
|
if b in (0x09, 0x0a): # '\t\n'
|
|
yield b
|
|
continue
|
|
|
|
if b > 0x7f: # not ascii
|
|
yield 0x4d # 'M'
|
|
yield 0x2d # '-'
|
|
b &= 0x7f
|
|
|
|
if b < 0x20: # control char
|
|
yield 0x5e # '^'
|
|
b |= 0x40
|
|
elif b == 0x7f:
|
|
yield 0x5e # '^'
|
|
yield 0x3f # '?'
|
|
continue
|
|
|
|
yield b
|
|
|
|
|
|
class BaseRunner(object):
|
|
|
|
def __init__(self, **kwds):
|
|
self.__dict__.update(kwds)
|
|
self.batch = self.batch and not self.debug_on_error
|
|
|
|
def logpath(self, name, ext='log'):
|
|
return os.path.join(
|
|
self.log_dir,
|
|
"{testname}_{logname}_{modename}_{emacsname}.{ext}".format(
|
|
ext=ext,
|
|
logname=name,
|
|
emacsname=os.path.basename(self.emacs),
|
|
testname=os.path.splitext(self.testfile)[0],
|
|
modename='batch' if self.batch else 'interactive',
|
|
))
|
|
|
|
@property
|
|
def command(self):
|
|
raise NotImplementedError
|
|
|
|
def do_run(self):
|
|
raise NotImplementedError
|
|
|
|
def run(self):
|
|
if self.dry_run:
|
|
command = self.command
|
|
if isinstance(command, str):
|
|
print(command)
|
|
else:
|
|
print((construct_command(command)))
|
|
return 0
|
|
else:
|
|
mkdirp(self.log_dir)
|
|
return self.do_run()
|
|
|
|
|
|
class TestRunner(BaseRunner):
|
|
|
|
def __init__(self, **kwds):
|
|
super(TestRunner, self).__init__(**kwds)
|
|
|
|
fmtdata = self.__dict__.copy()
|
|
fmtdata.update(
|
|
emacsname=os.path.basename(self.emacs),
|
|
testname=os.path.splitext(self.testfile)[0],
|
|
modename='batch' if self.batch else 'interactive',
|
|
)
|
|
quote = '"{0}"'.format
|
|
self.logpath_log = self.logpath('log')
|
|
self.logpath_messages = self.logpath('messages')
|
|
self.lispvars = {
|
|
'ein:testing-dump-file-log': quote(self.logpath_log),
|
|
'ein:testing-dump-file-messages': quote(self.logpath_messages),
|
|
'ein:log-level': self.ein_log_level,
|
|
'ein:log-message-level': self.ein_message_level,
|
|
}
|
|
if self.ein_debug:
|
|
self.lispvars['ein:debug'] = "'t"
|
|
|
|
def setq(self, sym, val):
|
|
self.lispvars[sym] = val
|
|
|
|
def bind_lispvars(self):
|
|
command = []
|
|
for (k, v) in self.lispvars.items():
|
|
if v is not None:
|
|
command.extend([
|
|
'--eval', '(setq {0} {1})'.format(k, v)])
|
|
return command
|
|
|
|
@property
|
|
def base_command(self):
|
|
command = [self.emacs, '-Q'] + self.bind_lispvars()
|
|
|
|
if self.batch:
|
|
command.append('-batch')
|
|
if self.debug_on_error:
|
|
command.extend(['-f', 'toggle-debug-on-error'])
|
|
|
|
# load modules
|
|
if self.need_ert():
|
|
ertdir = einlibdir('ert', 'lisp', 'emacs-lisp')
|
|
command.extend([
|
|
'-L', ertdir,
|
|
# Load `ert-run-tests-batch-and-exit`:
|
|
'-l', os.path.join(ertdir, 'ert-batch.el'),
|
|
# Load `ert-run-tests-interactively`:
|
|
'-l', os.path.join(ertdir, 'ert-ui.el'),
|
|
])
|
|
for path in self.load_path:
|
|
command.extend(['-L', path])
|
|
for path in self.load:
|
|
command.extend(['-l', path])
|
|
command.extend(['-L', einlispdir(),
|
|
'-L', einlibdir('websocket'),
|
|
'-L', einlibdir('request'),
|
|
'-L', einlibdir('auto-complete'),
|
|
'-L', einlibdir('popup'),
|
|
'-L', eintestdir(),
|
|
'-l', eintestdir(self.testfile)])
|
|
return command
|
|
|
|
@property
|
|
def command(self):
|
|
command = self.base_command[:]
|
|
if self.batch:
|
|
command.extend(['-f', 'ert-run-tests-batch-and-exit'])
|
|
else:
|
|
command.extend(['--eval', "(ert 't)"])
|
|
return command
|
|
|
|
def show_sys_info(self):
|
|
print(("*" * 50))
|
|
command = self.base_command + [
|
|
'-batch', '-l', 'ein-dev', '-f', 'ein:dev-print-sys-info']
|
|
proc = Popen(command, stderr=PIPE)
|
|
err = proc.stderr.read()
|
|
proc.wait()
|
|
if proc.returncode != 0:
|
|
print(("Error with return code {0} while running {1}".format(
|
|
proc.returncode, command)))
|
|
print(err)
|
|
pass
|
|
print(("*" * 50))
|
|
|
|
def need_ert(self):
|
|
if self.load_ert:
|
|
return True
|
|
if self.auto_ert:
|
|
if has_library(self.emacs, 'ert'):
|
|
print(("{0} has ERT module.".format(self.emacs)))
|
|
return False
|
|
else:
|
|
print("{0} has no ERT module.".format(self.emacs))
|
|
print("ERT is going to be loaded from git submodule.")
|
|
return True
|
|
return False
|
|
|
|
def make_process(self):
|
|
print("Start test {0}".format(self.testfile))
|
|
self.proc = Popen(self.command, stdout=PIPE, stderr=STDOUT)
|
|
return self.proc
|
|
|
|
def report(self):
|
|
(stdout, _) = self.proc.communicate()
|
|
self.stdout = stdout
|
|
self.failed = self.proc.returncode != 0
|
|
if self.failed:
|
|
print("*" * 50)
|
|
print("Showing {0}:".format(self.logpath_log))
|
|
print(open(self.logpath_log).read())
|
|
print()
|
|
print("*" * 50)
|
|
print("Showing STDOUT/STDERR:")
|
|
show_nonprinting(stdout)
|
|
print()
|
|
print("{0} failed".format(self.testfile))
|
|
else:
|
|
print("{0} OK".format(self.testfile))
|
|
for line in reversed(stdout.decode('utf-8').splitlines()):
|
|
if line.startswith('Ran'):
|
|
print(line)
|
|
break
|
|
return int(self.failed)
|
|
|
|
def do_run(self):
|
|
self.show_sys_info()
|
|
self.make_process()
|
|
return self.report()
|
|
|
|
def is_known_failure(self):
|
|
"""
|
|
Check if failures are known, based on STDOUT from ERT.
|
|
"""
|
|
import re
|
|
lines = iter(self.stdout.splitlines())
|
|
for l in lines:
|
|
if re.match("[0-9]+ unexpected results:.*", l.decode('utf-8')):
|
|
break
|
|
else:
|
|
return True # no failure
|
|
|
|
# Check "FAILED <test-name>" lines
|
|
for l in lines:
|
|
if not l:
|
|
break # end with an empty line
|
|
for f in self.known_failures:
|
|
if re.search(f, l.decode('utf-8')):
|
|
break
|
|
else:
|
|
return False
|
|
return True
|
|
|
|
known_failures = [
|
|
"ein:notebook-execute-current-cell-pyout-image$",
|
|
]
|
|
"""
|
|
A list of regexp which matches to test that is known to fail (sometimes).
|
|
This is a workaround for ##74.
|
|
"""
|
|
|
|
|
|
def mkdirp(path):
|
|
"""Do ``mkdir -p {path}``"""
|
|
if not os.path.isdir(path):
|
|
os.makedirs(path)
|
|
|
|
|
|
def remove_elc():
|
|
files = glob.glob(einlispdir("*.elc")) + glob.glob(eintestdir("*.elc"))
|
|
list(map(os.remove, files))
|
|
print("Removed {0} elc files".format(len(files)))
|
|
|
|
|
|
class ServerRunner(BaseRunner):
|
|
|
|
port = None
|
|
notebook_dir = os.path.join(EIN_ROOT, "tests", "notebook")
|
|
|
|
def __enter__(self):
|
|
self.run()
|
|
return self.port
|
|
|
|
def __exit__(self, type, value, traceback):
|
|
self.stop()
|
|
|
|
def do_run(self):
|
|
self.clear_notebook_dir()
|
|
self.start()
|
|
self.get_port()
|
|
print("Server running at", self.port)
|
|
|
|
def clear_notebook_dir(self):
|
|
files = glob.glob(os.path.join(self.notebook_dir, '*.ipynb'))
|
|
list(map(os.remove, files))
|
|
print("Removed {0} ipynb files".format(len(files)))
|
|
|
|
@staticmethod
|
|
def _parse_port_line(line):
|
|
return line.strip().rsplit(':', 1)[-1].strip('/')
|
|
|
|
def get_port(self):
|
|
if self.port is None:
|
|
self.port = self._parse_port_line(self.proc.stdout.readline().decode('utf-8'))
|
|
return self.port
|
|
|
|
def start(self):
|
|
from subprocess import Popen, PIPE, STDOUT
|
|
self.proc = Popen(
|
|
self.command, stdout=PIPE, stderr=STDOUT, stdin=PIPE,
|
|
shell=True)
|
|
# Answer "y" to the prompt: Shutdown Notebook Server (y/[n])?
|
|
self.proc.stdin.write(b'y\n')
|
|
|
|
def stop(self):
|
|
print("Stopping server", self.port)
|
|
returncode = self.proc.poll()
|
|
if returncode is not None:
|
|
logpath = self.logpath('server')
|
|
print("Server process was already dead by exit code", returncode)
|
|
print("*" * 50)
|
|
print("Showing {0}:".format(logpath))
|
|
print(open(logpath).read())
|
|
print()
|
|
return
|
|
if not self.dry_run:
|
|
try:
|
|
kill_subprocesses(self.proc.pid, lambda x: 'ipython' in x)
|
|
finally:
|
|
self.proc.terminate()
|
|
|
|
@property
|
|
def command(self):
|
|
fmtdata = dict(
|
|
notebook_dir=self.notebook_dir,
|
|
ipython=self.ipython,
|
|
server_log=self.logpath('server'),
|
|
)
|
|
return self.command_template.format(**fmtdata)
|
|
|
|
command_template = r"""
|
|
{ipython} notebook \
|
|
--notebook-dir {notebook_dir} \
|
|
--debug \
|
|
--no-browser 2>&1 \
|
|
| tee {server_log} \
|
|
| grep --line-buffered 'The IPython Notebook is running at' \
|
|
| head -n1
|
|
"""
|
|
|
|
|
|
def kill_subprocesses(pid, include=lambda x: True):
|
|
from subprocess import Popen, PIPE
|
|
import signal
|
|
|
|
command = ['ps', '-e', '-o', 'ppid,pid,command']
|
|
proc = Popen(command, stdout=PIPE, stderr=PIPE)
|
|
(stdout, stderr) = proc.communicate()
|
|
if proc.returncode != 0:
|
|
raise RuntimeError(
|
|
'Command {0} failed with code {1} and following error message:\n'
|
|
'{2}'.format(command, proc.returncode, stderr))
|
|
|
|
for line in map(lambda l: str.strip(str(l)), stdout.decode('utf-8').splitlines()):
|
|
(cmd_ppid, cmd_pid, cmd) = line.split(None, 2)
|
|
if cmd_ppid == str(pid) and include(cmd):
|
|
print("Killing PID={0} COMMAND={1}".format(cmd_pid, cmd))
|
|
os.kill(int(cmd_pid), signal.SIGINT)
|
|
|
|
|
|
def construct_command(args):
|
|
"""
|
|
Construct command as a string given a list of arguments.
|
|
"""
|
|
command = []
|
|
escapes = set(' ()')
|
|
for a in args:
|
|
if set(a) & escapes:
|
|
command.append(repr(str(a))) # hackish way to escape
|
|
else:
|
|
command.append(a)
|
|
return " ".join(command)
|
|
|
|
|
|
def run_ein_test(unit_test, func_test, func_test_max_retries,
|
|
no_skip, clean_elc, **kwds):
|
|
if clean_elc and not kwds['dry_run']:
|
|
remove_elc()
|
|
if unit_test:
|
|
unit_test_runner = TestRunner(testfile='test-load.el', **kwds)
|
|
if unit_test_runner.run() != 0:
|
|
return 1
|
|
if func_test:
|
|
for i in range(func_test_max_retries + 1):
|
|
func_test_runner = TestRunner(testfile='func-test.el', **kwds)
|
|
with ServerRunner(testfile='func-test.el', **kwds) as port:
|
|
func_test_runner.setq('ein:testing-port', port)
|
|
if func_test_runner.run() == 0:
|
|
print("Functional test succeeded after {0} retries." \
|
|
.format(i))
|
|
return 0
|
|
if not no_skip and func_test_runner.is_known_failure():
|
|
print("All failures are known. Ending functional test.")
|
|
return 0
|
|
print("Functional test failed after {0} retries.".format(i))
|
|
return 1
|
|
return 0
|
|
|
|
|
|
def main():
|
|
import sys
|
|
from argparse import ArgumentParser
|
|
parser = ArgumentParser(description=__doc__.splitlines()[1])
|
|
parser.add_argument('--emacs', '-e', default='emacs',
|
|
help='Emacs executable.')
|
|
parser.add_argument('--load-path', '-L', default=[], action='append',
|
|
help="add a directory to load-path. "
|
|
"can be specified multiple times.")
|
|
parser.add_argument('--load', '-l', default=[], action='append',
|
|
help="load lisp file before tests. "
|
|
"can be specified multiple times.")
|
|
parser.add_argument('--load-ert', default=False, action='store_true',
|
|
help="load ERT from git submodule. "
|
|
"you need to update git submodule manually "
|
|
"if ert/ directory does not exist yet.")
|
|
parser.add_argument('--no-auto-ert', default=True,
|
|
dest='auto_ert', action='store_false',
|
|
help="load ERT from git submodule. "
|
|
"if this Emacs has no build-in ERT module.")
|
|
parser.add_argument('--no-batch', '-B', default=True,
|
|
dest='batch', action='store_false',
|
|
help="start interactive session.")
|
|
parser.add_argument('--debug-on-error', '-d', default=False,
|
|
action='store_true',
|
|
help="set debug-on-error to t and start "
|
|
"interactive session.")
|
|
parser.add_argument('--func-test-max-retries', default=4, type=int,
|
|
help="""
|
|
Specify number of retries for functional test
|
|
before failing with error. This is workaround
|
|
for the issue #74.
|
|
""")
|
|
parser.add_argument('--no-skip', default=False, action='store_true',
|
|
help="""
|
|
Do no skip known failures. Known failures
|
|
are implemented as another workaround for the
|
|
issue #74.
|
|
""")
|
|
parser.add_argument('--no-func-test', '-F', default=True,
|
|
dest='func_test', action='store_false',
|
|
help="do not run functional test.")
|
|
parser.add_argument('--no-unit-test', '-U', default=True,
|
|
dest='unit_test', action='store_false',
|
|
help="do not run unit test.")
|
|
parser.add_argument('--clean-elc', '-c', default=False,
|
|
action='store_true',
|
|
help="remove *.elc files in ein/lisp and "
|
|
"ein/tests directories.")
|
|
parser.add_argument('--dry-run', default=False,
|
|
action='store_true',
|
|
help="Print commands to be executed.")
|
|
parser.add_argument('--ipython', default='ipython',
|
|
help="""
|
|
ipython executable to use to run notebook server.
|
|
""")
|
|
parser.add_argument('--ein-log-level', default=40)
|
|
parser.add_argument('--ein-message-level', default=30)
|
|
parser.add_argument('--ein-debug', default=False, action='store_true',
|
|
help="(setq ein:debug t) when given.")
|
|
parser.add_argument('--log-dir', default="log",
|
|
help="Directory to store log (default: %(default)s)")
|
|
args = parser.parse_args()
|
|
sys.exit(run_ein_test(**vars(args)))
|
|
|
|
|
|
if __name__ == '__main__':
|
|
main()
|