emacs-ipython-notebook/tools/testein.py

514 lines
17 KiB
Python
Executable file

#!/usr/bin/env python
"""
Run EIN test suite
"""
import glob
import os
import sys
import re
from subprocess import Popen, PIPE, STDOUT, check_output
# Root directory of the EIN checkout: the parent of the directory that
# contains this script.
EIN_ROOT = os.path.normpath(
    os.path.join(os.path.dirname(__file__), os.pardir)
)
def cask_load_path():
    """Return the output of ``cask load-path`` decoded to text."""
    return check_output(['cask', 'load-path']).decode()
def has_library(emacs, library):
    """
    Return True when `emacs` can locate `library`.

    Runs `emacs` in batch mode with an assertion on ``locate-library`` and
    reports whether the process exited with status 0.  All output of the
    Emacs process is discarded.
    """
    lisp_check = '(assert (locate-library "{0}"))'.format(library)
    argv = [emacs, '-Q', '-batch', '-l', 'cl', '--eval', lisp_check]
    with open(os.devnull, 'w') as sink:
        exit_code = Popen(argv, stdout=sink, stderr=sink).wait()
    return exit_code == 0
def eindir(*path):
    """Join `path` components onto the EIN root directory."""
    components = (EIN_ROOT,) + path
    return os.path.join(*components)
def einlispdir(*path):
    """Return a path below the ``lisp`` directory of the EIN root."""
    lisp_path = eindir('lisp', *path)
    return lisp_path
def eintestdir(*path):
    """Return a path below the ``tests`` directory of the EIN root."""
    tests_path = eindir('tests', *path)
    return tests_path
def einlibdir(*path):
    """Return a path below the ``lib`` directory of the EIN root."""
    lib_path = eindir('lib', *path)
    return lib_path
def show_nonprinting(string, stream=sys.stdout):
    """
    Emulate ``cat -v`` (``--show-nonprinting``).

    Writes `string` to `stream` with non-printing characters replaced by
    the notation produced by `convert_nonprinting`.
    """
    stream.writelines(chr(code) for code in convert_nonprinting(string))
def convert_nonprinting(string):
    """
    Convert non-printing bytes in `string` using ``cat -v`` notation.

    `string` is normally a byte string (e.g. captured process output);
    text is accepted as well and is encoded as UTF-8 first.  The data is
    processed byte by byte, exactly like ``cat -v`` does, so multi-byte
    UTF-8 sequences and invalid UTF-8 are both handled without raising.

    Output is an iterable of int (character codes below 0x80); convert
    each one with `chr` to obtain text.

    Adapted from: http://stackoverflow.com/a/437542/727827
    """
    if not isinstance(string, (bytes, bytearray)):
        string = string.encode('utf-8')
    # Iterating a bytearray yields ints on every Python version.
    for b in bytearray(string):
        if b in (0x09, 0x0a):   # '\t\n' pass through untouched
            yield b
            continue
        if b > 0x7f:            # not ascii: prefix with "M-", drop high bit
            yield 0x4d          # 'M'
            yield 0x2d          # '-'
            b &= 0x7f
        if b < 0x20:            # control char: "^" followed by the letter
            yield 0x5e          # '^'
            b |= 0x40
        elif b == 0x7f:         # DEL is written as "^?"
            yield 0x5e          # '^'
            yield 0x3f          # '?'
            continue
        yield b
class BaseRunner(object):
    """
    Common base for the runners in this script.

    Keyword arguments given to the constructor become instance attributes.
    Subclasses provide `command` and `do_run`.
    """

    def __init__(self, **kwds):
        vars(self).update(kwds)
        # Batch mode is switched off when debug-on-error is requested.
        if self.batch:
            self.batch = not self.debug_on_error

    def logpath(self, name, ext='log'):
        """
        Return the log file path for the log called `name`.

        The file name combines the test file stem, `name`, the run mode
        and the Emacs executable name; backslashes in the resulting path
        are replaced by forward slashes.
        """
        mode = 'batch' if self.batch else 'interactive'
        stem = os.path.splitext(self.testfile)[0]
        filename = "{testname}_{logname}_{modename}_{emacsname}.{ext}".format(
            testname=stem,
            logname=name,
            modename=mode,
            emacsname=os.path.basename(self.emacs),
            ext=ext,
        )
        return re.sub(r'\\', '/', os.path.join(self.log_dir, filename))

    @property
    def command(self):
        """Command to execute; must be supplied by subclasses."""
        raise NotImplementedError

    def do_run(self):
        """Perform the actual run; must be supplied by subclasses."""
        raise NotImplementedError

    def run(self):
        """
        Run the command, or only print it when `dry_run` is set.

        In dry-run mode the command is printed and 0 is returned.
        Otherwise the log directory is created and the result of
        `do_run` is returned.
        """
        if not self.dry_run:
            mkdirp(self.log_dir)
            return self.do_run()
        shown = self.command
        if not isinstance(shown, str):
            shown = construct_command(shown)
        print(shown)
        return 0
class TestRunner(BaseRunner):
    """
    Run one ERT test file (`testfile`) under Emacs and report the result.

    Lisp variables in `lispvars` are bound on the Emacs command line with
    ``--eval (setq ...)`` before the test file is loaded.
    """

    # A list of regexp which matches to test that is known to fail
    # (sometimes).  This is a workaround for #74.
    known_failures = [
        "ein:notebook-execute-current-cell-pyout-image$",
    ]

    def __init__(self, **kwds):
        super(TestRunner, self).__init__(**kwds)
        quote = '"{0}"'.format
        self.logpath_log = self.logpath('log')
        self.logpath_messages = self.logpath('messages')
        self.lispvars = {
            'ein:testing-dump-file-log': quote(self.logpath_log),
            'ein:testing-dump-file-messages': quote(self.logpath_messages),
            'ein:log-level': self.ein_log_level,
            'ein:log-message-level': self.ein_message_level,
        }
        if self.ein_debug:
            self.lispvars['ein:debug'] = "'t"

    def setq(self, sym, val):
        """Bind Lisp variable `sym` to `val` (a Lisp expression string)."""
        self.lispvars[sym] = val

    def bind_lispvars(self):
        """
        Return ``--eval (setq ...)`` arguments for every entry of `lispvars`.

        Entries whose value is None are skipped.
        """
        command = []
        for (k, v) in self.lispvars.items():
            if v is not None:
                command.extend([
                    '--eval', '(setq {0} {1})'.format(k, v)])
        return command

    @property
    def base_command(self):
        """
        Emacs command line that loads the test file but runs nothing yet.

        Note that evaluating this property calls `need_ert`, which may
        start an Emacs process to probe for ERT.
        """
        command = [self.emacs, '-Q'] + self.bind_lispvars()
        if self.batch:
            command.append('-batch')
        if self.debug_on_error:
            command.extend(['-f', 'toggle-debug-on-error'])

        # load modules
        if self.need_ert():
            ertdir = einlibdir('ert', 'lisp', 'emacs-lisp')
            command.extend([
                '-L', ertdir,
                # Load `ert-run-tests-batch-and-exit`:
                '-l', os.path.join(ertdir, 'ert-batch.el'),
                # Load `ert-run-tests-interactively`:
                '-l', os.path.join(ertdir, 'ert-ui.el'),
            ])
        for path in self.load_path:
            command.extend(['-L', path])
        for path in self.load:
            command.extend(['-l', path])
        command.extend(['-L', einlispdir(),
                        '-L', eintestdir(),
                        '-l', eintestdir(self.testfile)])
        return command

    @property
    def command(self):
        """Full Emacs command line that also starts the ERT run."""
        command = self.base_command[:]
        if self.batch:
            command.extend(['-f', 'ert-run-tests-batch-and-exit'])
        else:
            command.extend(['--eval', "(ert 't)"])
        return command

    def show_sys_info(self):
        """Run ``ein:dev-print-sys-info`` in a batch Emacs."""
        print("*" * 50)
        # Use the absolute path of ein-dev.el so that this works from any
        # working directory, like every other path in this script.
        command = self.base_command + [
            '-batch', '-l', einlispdir('ein-dev.el'),
            '-f', 'ein:dev-print-sys-info']
        proc = Popen(command, stderr=PIPE)
        (_, err) = proc.communicate()
        if proc.returncode != 0:
            print("Error with return code {0} while running {1}".format(
                proc.returncode, command))
            # Decode so the message is readable rather than a bytes repr.
            print(err.decode('utf-8', 'replace'))
        print("*" * 50)

    def need_ert(self):
        """
        Return True when ERT must be loaded from the git submodule.

        That is the case when `load_ert` is set, or when `auto_ert` is
        set and the Emacs executable has no ERT library of its own.
        """
        if self.load_ert:
            return True
        if self.auto_ert:
            if has_library(self.emacs, 'ert'):
                print("{0} has ERT module.".format(self.emacs))
                return False
            else:
                print("{0} has no ERT module.".format(self.emacs))
                print("ERT is going to be loaded from git submodule.")
                return True
        return False

    def make_process(self):
        """Start the Emacs test process and store it in `self.proc`."""
        # Evaluate the property once: each evaluation may probe for ERT.
        command = self.command
        print("Start test {0}".format(self.testfile))
        print("Emacs command {0}".format(command))
        self.proc = Popen(command, stdout=PIPE, stderr=STDOUT)
        return self.proc

    def report(self):
        """
        Wait for the test process and print a summary.

        Stores the raw output in `self.stdout` and the outcome in
        `self.failed`.  Returns 1 when the process exited with a non-zero
        status, 0 otherwise.
        """
        (stdout, _) = self.proc.communicate()
        self.stdout = stdout
        self.failed = self.proc.returncode != 0
        if self.failed:
            print("*" * 50)
            print("Showing {0}:".format(self.logpath_log))
            try:
                with open(self.logpath_log) as logfile:
                    print(logfile.read())
            except IOError as err:
                # A missing log must not hide the output shown below.
                print("Cannot read log file: {0}".format(err))
            print()
            print("*" * 50)
            print("Showing STDOUT/STDERR:")
            show_nonprinting(stdout)
            print()
            print("{0} failed".format(self.testfile))
        else:
            print("{0} OK".format(self.testfile))
            # Show the last line starting with "Ran" from the output.
            text = stdout.decode('utf-8', 'replace')
            for line in reversed(text.splitlines()):
                if line.startswith('Ran'):
                    print(line)
                    break
        return int(self.failed)

    def do_run(self):
        """Print system information, run the tests and report the result."""
        self.show_sys_info()
        self.make_process()
        return self.report()

    def is_known_failure(self):
        """
        Check if failures are known, based on STDOUT from ERT.

        Returns True when the output has no "unexpected results" section
        or when every line of that section (up to the first empty line)
        matches one of `known_failures`.
        """
        lines = iter(self.stdout.splitlines())
        for raw in lines:
            if re.match("[0-9]+ unexpected results:.*",
                        raw.decode('utf-8', 'replace')):
                break
        else:
            return True  # no failure

        # Check "FAILED <test-name>" lines
        for raw in lines:
            if not raw:
                break  # end with an empty line
            line = raw.decode('utf-8', 'replace')
            if not any(re.search(f, line) for f in self.known_failures):
                return False
        return True
def mkdirp(path):
    """
    Do ``mkdir -p {path}``.

    Creates `path` and any missing parents.  An already existing
    directory is not an error; any other failure (for example `path`
    being an existing regular file) raises OSError.
    """
    # Try first and inspect afterwards: a check-then-create sequence can
    # fail when something else creates the directory in between.
    try:
        os.makedirs(path)
    except OSError:
        if not os.path.isdir(path):
            raise
def remove_elc():
    """Delete ``*.elc`` files from the EIN lisp and tests directories."""
    compiled = glob.glob(einlispdir("*.elc"))
    compiled = compiled + glob.glob(eintestdir("*.elc"))
    for elc in compiled:
        os.remove(elc)
    print("Removed {0} elc files".format(len(compiled)))
class ServerRunner(BaseRunner):
    """
    Run the notebook server used by the functional tests.

    Usable as a context manager: entering runs the server and returns its
    port (None in dry-run mode); leaving stops it.
    """

    port = None
    notebook_dir = os.path.join(EIN_ROOT, "tests", "notebook")

    # Shell pipeline: the server output is copied to the server log and
    # only the first "Notebook is running at" line reaches our pipe.
    command_template = r"""{ipython} notebook --notebook-dir {notebook_dir} --no-browser --NotebookApp.token='' --debug 2>&1 | tee {server_log} | grep --line-buffered 'Notebook is running at' | head -n1"""

    def __enter__(self):
        self.run()
        return self.port

    def __exit__(self, exc_type, exc_value, exc_tb):
        self.stop()

    def do_run(self):
        """Clean the notebook directory, start the server, read its port."""
        self.clear_notebook_dir()
        self.start()
        self.get_port()
        print("Server running at", self.port)

    def clear_notebook_dir(self):
        """Delete every ``*.ipynb`` file in `notebook_dir`."""
        files = glob.glob(os.path.join(self.notebook_dir, '*.ipynb'))
        for ipynb in files:
            os.remove(ipynb)
        print("Removed {0} ipynb files".format(len(files)))

    @staticmethod
    def _parse_port_line(line):
        """
        Extract the port from a "Notebook is running at" line.

        The port is taken to be whatever follows the last colon.  When
        the line mentions a token, everything from the last slash on
        (i.e. the query string) is dropped first.
        """
        line = line.strip()
        if 'token' in line:
            line = line.rpartition('/')[0]
        return line.rsplit(':', 1)[-1].strip('/')

    def get_port(self):
        """Return the server port, reading it from the process if needed."""
        if self.port is None:
            val = self.proc.stdout.readline()
            dval = val.decode('utf-8')
            self.port = self._parse_port_line(dval)
        return self.port

    def start(self):
        """Start the server pipeline through the shell."""
        # NOTE(review): the template values are interpolated unquoted into
        # a shell command, so paths containing spaces will break it.
        self.proc = Popen(
            self.command, stdout=PIPE, stderr=STDOUT, stdin=PIPE,
            shell=True)
        # Answer "y" to the prompt: Shutdown Notebook Server (y/[n])?
        self.proc.stdin.write(b'y\n')
        # The pipe is buffered; without a flush the answer stays in this
        # process and never reaches the server.
        self.proc.stdin.flush()

    def stop(self):
        """Stop the server; show its log when it has already exited."""
        print("Stopping server", self.port)
        if self.dry_run:
            # Nothing was started in dry-run mode, so there is no process.
            return
        returncode = self.proc.poll()
        if returncode is not None:
            logpath = self.logpath('server')
            print("Server process was already dead by exit code", returncode)
            print("*" * 50)
            print("Showing {0}:".format(logpath))
            try:
                with open(logpath) as logfile:
                    print(logfile.read())
            except IOError as err:
                print("Cannot read log file: {0}".format(err))
            print()
            return
        try:
            kill_subprocesses(self.proc.pid, lambda x: 'ipython' in x)
        finally:
            self.proc.terminate()

    @property
    def command(self):
        """Shell command line built from `command_template`."""
        fmtdata = dict(
            notebook_dir=self.notebook_dir,
            ipython=self.ipython,
            server_log=self.logpath('server'),
        )
        return self.command_template.format(**fmtdata)
def kill_subprocesses(pid, include=lambda x: True):
    """
    Send SIGINT to direct children of `pid` selected by `include`.

    The process table is read with ``ps -e -o ppid,pid,command``.  A
    process is signalled when its parent PID equals `pid` and
    `include(command)` is true.

    Raises RuntimeError when ``ps`` exits with a non-zero status.
    """
    from subprocess import Popen, PIPE
    import signal

    command = ['ps', '-e', '-o', 'ppid,pid,command']
    proc = Popen(command, stdout=PIPE, stderr=PIPE)
    (stdout, stderr) = proc.communicate()
    if proc.returncode != 0:
        raise RuntimeError(
            'Command {0} failed with code {1} and following error message:\n'
            '{2}'.format(command, proc.returncode,
                         stderr.decode('utf-8', 'replace')))

    parent = str(pid)
    for line in stdout.decode('utf-8', 'replace').splitlines():
        fields = line.strip().split(None, 2)
        if len(fields) != 3:
            # Blank line or a process without a command column.
            continue
        (cmd_ppid, cmd_pid, cmd) = fields
        if cmd_ppid == parent and include(cmd):
            print("Killing PID={0} COMMAND={1}".format(cmd_pid, cmd))
            os.kill(int(cmd_pid), signal.SIGINT)
def construct_command(args):
    """
    Build a single command string from the argument list `args`.

    Arguments containing a space or a parenthesis are wrapped using
    `repr` (a hackish way to escape); the others are used verbatim.
    """
    needs_quoting = set(' ()')
    pieces = [
        repr(str(arg)) if needs_quoting.intersection(arg) else arg
        for arg in args
    ]
    return " ".join(pieces)
def run_ein_test(unit_test, func_test, func_test_max_retries,
                 no_skip, clean_elc, **kwds):
    """
    Run the unit tests and/or the functional tests.

    The functional test is attempted up to ``func_test_max_retries + 1``
    times, each time against a freshly started server.  Unless `no_skip`
    is set, a failed attempt whose failures are all known also ends the
    run successfully.  Remaining keyword arguments are passed on to the
    runners.

    Returns 0 on success and 1 on failure.
    """
    if clean_elc and not kwds['dry_run']:
        remove_elc()

    if unit_test:
        unit_status = TestRunner(testfile='test-load.el', **kwds).run()
        if unit_status != 0:
            return 1

    if not func_test:
        return 0

    for i in range(func_test_max_retries + 1):
        func_test_runner = TestRunner(testfile='func-test.el', **kwds)
        with ServerRunner(testfile='func-test.el', **kwds) as port:
            func_test_runner.setq('ein:testing-port', port)
            passed = func_test_runner.run() == 0
            if passed:
                print("Functional test succeeded after {0} retries."
                      .format(i))
                return 0
            if not no_skip and func_test_runner.is_known_failure():
                print("All failures are known. Ending functional test.")
                return 0
    print("Functional test failed after {0} retries.".format(i))
    return 1
def main():
    """
    Parse the command line, prepare the environment and run the tests.

    Exits the interpreter with the status returned by `run_ein_test`.
    """
    from argparse import ArgumentParser

    parser = ArgumentParser(description=__doc__.splitlines()[1])
    parser.add_argument('--emacs', '-e', default='emacs',
                        help='Emacs executable.')
    parser.add_argument('--load-path', '-L', default=[], action='append',
                        help="add a directory to load-path. "
                        "can be specified multiple times.")
    parser.add_argument('--load', '-l', default=[], action='append',
                        help="load lisp file before tests. "
                        "can be specified multiple times.")
    parser.add_argument('--load-ert', default=False, action='store_true',
                        help="load ERT from git submodule. "
                        "you need to update git submodule manually "
                        "if ert/ directory does not exist yet.")
    parser.add_argument('--no-auto-ert', default=True,
                        dest='auto_ert', action='store_false',
                        help="load ERT from git submodule. "
                        "if this Emacs has no build-in ERT module.")
    parser.add_argument('--batch', '-B', default=True,
                        dest='batch', action='store_false',
                        help="start interactive session.")
    parser.add_argument('--debug-on-error', '-d', default=False,
                        action='store_true',
                        help="set debug-on-error to t and start "
                        "interactive session.")
    parser.add_argument('--func-test-max-retries', default=4, type=int,
                        help="""
                        Specify number of retries for functional test
                        before failing with error. This is workaround
                        for the issue #74.
                        """)
    parser.add_argument('--no-skip', default=False, action='store_true',
                        help="""
                        Do no skip known failures. Known failures
                        are implemented as another workaround for the
                        issue #74.
                        """)
    parser.add_argument('--no-func-test', '-F', default=True,
                        dest='func_test', action='store_false',
                        help="do not run functional test.")
    parser.add_argument('--no-unit-test', '-U', default=True,
                        dest='unit_test', action='store_false',
                        help="do not run unit test.")
    parser.add_argument('--clean-elc', '-c', default=False,
                        action='store_true',
                        help="remove *.elc files in ein/lisp and "
                        "ein/tests directories.")
    parser.add_argument('--dry-run', default=False,
                        action='store_true',
                        help="Print commands to be executed.")
    parser.add_argument('--ipython', default='ipython',
                        help="""
                        ipython executable to use to run notebook server.
                        """)
    parser.add_argument('--ein-log-level', default=40)
    parser.add_argument('--ein-message-level', default=30)
    parser.add_argument('--ein-debug', default=False, action='store_true',
                        help="(setq ein:debug t) when given.")
    parser.add_argument('--log-dir', default="log",
                        help="Directory to store log (default: %(default)s)")
    args = parser.parse_args()

    # Prepare the environment only after the arguments were accepted, so
    # that --help and usage errors do not require ``cask``.
    os.environ['EMACSLOADPATH'] = cask_load_path()
    # Locale names are case-sensitive: the territory is upper case.
    os.environ['LC_ALL'] = 'en_US.UTF-8'

    sys.exit(run_ein_test(**vars(args)))
# Run the test suite only when executed as a script, not when imported.
if __name__ == '__main__':
    main()