mirror of
https://github.com/vale981/emacs-ipython-notebook
synced 2025-03-06 09:31:39 -05:00

This reduces some of the complexity in testein.py, but unfortunately running the tests is still unreliable. Batch-mode tests don't work at all on Windows, and tests run from inside Emacs often need to be run multiple times before they pass. The worst offender is the delete-notebook test, which usually passes on only 1 out of 3 tries. Testing seems to have revealed a couple of bugs, so win??
522 lines
17 KiB
Python
Executable file
522 lines
17 KiB
Python
Executable file
#!/usr/bin/env python
|
|
|
|
"""
|
|
Run EIN test suite
|
|
"""
|
|
|
|
import glob
|
|
import os
|
|
import sys
|
|
import re
|
|
from subprocess import Popen, PIPE, STDOUT, check_output
|
|
|
|
# Root of the EIN checkout: the parent of the directory holding this script.
_SCRIPT_DIR = os.path.dirname(__file__)
EIN_ROOT = os.path.normpath(os.path.join(_SCRIPT_DIR, os.path.pardir))
|
|
|
|
def cask_load_path():
    """
    Return the Emacs load path reported by ``cask load-path``.

    ``cask`` is looked up on ``PATH`` first.  When it cannot be launched,
    the per-user install under ``~/.cask/bin`` is tried instead
    (``cask.bat`` on Windows, ``cask`` elsewhere).

    Returns the command's standard output decoded to ``str``.

    Raises ``OSError`` when neither executable can be started and
    ``subprocess.CalledProcessError`` when cask exits with an error.
    """
    try:
        path = check_output(['cask', 'load-path'])
    except OSError:
        # Catch OSError rather than WindowsError: the latter name only
        # exists on Windows, so referencing it elsewhere turns a missing
        # `cask` into a NameError.
        script = 'cask.bat' if os.name == 'nt' else 'cask'
        fallback = os.path.join(
            os.path.expanduser('~'), '.cask', 'bin', script)
        path = check_output([fallback, 'load-path'])

    return path.decode()
|
|
|
|
def has_library(emacs, library):
    """
    Tell whether the `emacs` executable can locate `library`.

    Emacs is started in batch mode and asked to assert that
    ``locate-library`` finds `library`; all of its output is discarded.
    Returns True when Emacs exits with status 0.
    """
    lisp_check = '(assert (locate-library "{0}"))'.format(library)
    argv = [emacs, '-Q', '-batch', '-l', 'cl', '--eval', lisp_check]
    with open(os.devnull, 'w') as sink:
        exit_status = Popen(argv, stdout=sink, stderr=sink).wait()
    return exit_status == 0
|
|
|
|
|
|
def eindir(*path):
    """Return `path` components joined onto the EIN root directory."""
    components = (EIN_ROOT,) + path
    return os.path.join(*components)
|
|
|
|
|
|
def einlispdir(*path):
    """Return `path` components joined onto the ``lisp`` directory of EIN."""
    components = ('lisp',) + path
    return eindir(*components)
|
|
|
|
|
|
def eintestdir(*path):
    """Return `path` components joined onto the ``tests`` directory of EIN."""
    components = ('tests',) + path
    return eindir(*components)
|
|
|
|
|
|
def einlibdir(*path):
    """Return `path` components joined onto the ``lib`` directory of EIN."""
    components = ('lib',) + path
    return eindir(*components)
|
|
|
|
|
|
def show_nonprinting(string, stream=sys.stdout):
    """Write `string` to `stream` the way ``cat -v`` would display it."""
    rendered = (chr(code) for code in convert_nonprinting(string))
    stream.writelines(rendered)
|
|
|
|
|
|
def convert_nonprinting(string):
    """
    Convert non-printing bytes in `string` to ``cat -v`` notation.

    `string` is normally ``bytes`` (e.g. captured subprocess output); a
    ``str`` is also accepted and is encoded as UTF-8 first.

    Yields ints (character codes); turn them into text with `chr`.
    Tab and newline pass through unchanged, control characters become
    ``^X``, DEL becomes ``^?`` and bytes above 0x7f get an ``M-`` prefix.

    Adapted from: http://stackoverflow.com/a/437542/727827
    """
    if isinstance(string, str):
        string = string.encode('utf-8')

    # Work on raw bytes, as `cat -v` does.  Decoding to code points first
    # would produce values above 0xff for most non-ASCII characters, which
    # this notation cannot express.
    for b in bytearray(string):
        if b in (0x09, 0x0a):  # '\t\n'
            yield b
            continue

        if b > 0x7f:  # not ascii
            yield 0x4d  # 'M'
            yield 0x2d  # '-'
            b &= 0x7f

        if b < 0x20:  # control char
            yield 0x5e  # '^'
            b |= 0x40
        elif b == 0x7f:
            yield 0x5e  # '^'
            yield 0x3f  # '?'
            continue

        yield b
|
|
|
|
|
|
class BaseRunner(object):
    """
    Common base for runners that build a command and execute it.

    Every keyword argument given to the constructor is stored as an
    instance attribute.  Subclasses supply `command` and `do_run`.
    """

    def __init__(self, **kwds):
        vars(self).update(kwds)
        # Debugging on error needs an interactive Emacs, so it overrides
        # batch mode.
        if self.batch:
            self.batch = not self.debug_on_error

    def logpath(self, name, ext='log'):
        """Return the forward-slash path of the log file called `name`."""
        filename = "{testname}_{logname}_{modename}_{emacsname}.{ext}".format(
            testname=os.path.splitext(self.testfile)[0],
            logname=name,
            modename='batch' if self.batch else 'interactive',
            emacsname=os.path.basename(self.emacs),
            ext=ext,
        )
        joined = os.path.join(self.log_dir, filename)
        return joined.replace('\\', '/')

    @property
    def command(self):
        """Command to execute; must be provided by subclasses."""
        raise NotImplementedError

    def do_run(self):
        """Execute the command; must be provided by subclasses."""
        raise NotImplementedError

    def run(self):
        """
        Run for real, or only print the command when ``dry_run`` is set.

        Returns 0 for a dry run, otherwise whatever `do_run` returns.
        """
        if not self.dry_run:
            mkdirp(self.log_dir)
            return self.do_run()

        command = self.command
        if not isinstance(command, str):
            command = construct_command(command)
        print(command)
        return 0
|
|
|
|
|
|
class TestRunner(BaseRunner):
    """
    Run one ERT test file in an Emacs subprocess and report the outcome.

    Besides the attributes used by `BaseRunner`, this class reads
    ``load_path``, ``load``, ``load_ert``, ``auto_ert``, ``ipython``,
    ``ein_log_level``, ``ein_message_level`` and ``ein_debug``, all of
    which arrive as constructor keyword arguments.
    """

    # Regexps matching tests that are known to fail (sometimes).
    # This is a workaround for #74.
    known_failures = [
        "ein:notebook-execute-current-cell-pyout-image$",
    ]

    def __init__(self, **kwds):
        super(TestRunner, self).__init__(**kwds)

        quote = '"{0}"'.format
        self.logpath_log = self.logpath('log')
        self.logpath_messages = self.logpath('messages')
        self.logpath_server = self.logpath('server')
        self.notebook_dir = os.path.join(EIN_ROOT, "tests")
        # Emacs Lisp variables bound with `setq` on the command line.
        # Values are Lisp source text, hence the explicit string quoting.
        self.lispvars = {
            'ein:testing-dump-file-log': quote(self.logpath_log),
            'ein:testing-dump-server-log': quote(self.logpath_server),
            'ein:testing-dump-file-messages': quote(self.logpath_messages),
            'ein:log-level': self.ein_log_level,
            'ein:log-message-level': self.ein_message_level,
            'ein:testing-jupyter-server-command': quote(self.ipython),
            'ein:testing-jupyter-server-directory':
                quote(os.path.normpath(self.notebook_dir)),
        }
        if self.ein_debug:
            self.lispvars['ein:debug'] = "'t"

    def setq(self, sym, val):
        """Bind Lisp variable `sym` to `val` (Lisp source text) for the run."""
        self.lispvars[sym] = val

    def bind_lispvars(self):
        """Return ``--eval (setq ...)`` arguments for every non-None lispvar."""
        command = []
        for (k, v) in self.lispvars.items():
            if v is not None:
                command.extend(['--eval', '(setq {0} {1})'.format(k, v)])
        return command

    @property
    def base_command(self):
        """Emacs argument list up to, and including, loading the test file."""
        command = [self.emacs, '-Q'] + self.bind_lispvars()

        if self.batch:
            command.append('-batch')
        if self.debug_on_error:
            command.extend(['-f', 'toggle-debug-on-error'])

        # load modules
        if self.need_ert():
            ertdir = einlibdir('ert', 'lisp', 'emacs-lisp')
            command.extend([
                '-L', ertdir,
                # Load `ert-run-tests-batch-and-exit`:
                '-l', os.path.join(ertdir, 'ert-batch.el'),
                # Load `ert-run-tests-interactively`:
                '-l', os.path.join(ertdir, 'ert-ui.el'),
            ])
        for path in self.load_path:
            command.extend(['-L', path])
        for path in self.load:
            command.extend(['-l', path])

        command.extend(['-L', einlispdir(),
                        '-L', eintestdir(),
                        '-l', eintestdir(self.testfile)])
        return command

    @property
    def command(self):
        """Full Emacs argument list, ending with the ERT entry point."""
        command = self.base_command[:]
        if self.batch:
            command.extend(['-f', 'ert-run-tests-batch-and-exit'])
        else:
            command.extend(['--eval', "(ert 't)"])
        return command

    def show_sys_info(self):
        """Run ``ein:dev-print-sys-info`` in batch Emacs; report a failure."""
        print("*" * 50)
        # Build the path from the EIN root, like every other path in this
        # class, so the script does not depend on the current directory.
        command = self.base_command + [
            '-batch', '-l', einlispdir('ein-dev.el'),
            '-f', 'ein:dev-print-sys-info']
        proc = Popen(command, stderr=PIPE)
        (_, err) = proc.communicate()
        if proc.returncode != 0:
            print("Error with return code {0} while running {1}".format(
                proc.returncode, command))
            # stderr is bytes; decode so the text is shown, not its repr.
            print(err.decode('utf-8', 'replace'))
        print("*" * 50)

    def need_ert(self):
        """Return True when ERT has to be loaded from the git submodule."""
        if self.load_ert:
            return True
        if self.auto_ert:
            if has_library(self.emacs, 'ert'):
                print("{0} has ERT module.".format(self.emacs))
                return False
            else:
                print("{0} has no ERT module.".format(self.emacs))
                print("ERT is going to be loaded from git submodule.")
                return True
        return False

    def make_process(self):
        """Start the Emacs test process and return it."""
        print("Start test {0}".format(self.testfile))
        # Evaluate the property once: building it may probe Emacs for ERT.
        command = self.command
        print("Emacs command {0}".format(command))
        self.proc = Popen(command, stdout=PIPE, stderr=STDOUT)
        return self.proc

    @staticmethod
    def _read_log(path):
        """Return the text of `path`, or a short note when it is unreadable."""
        try:
            with open(path, errors='replace') as logfile:
                return logfile.read()
        except (IOError, OSError) as err:
            return "(could not read {0}: {1})".format(path, err)

    def report(self):
        """
        Wait for the test process and print its result.

        Stores the captured output in ``self.stdout`` and the verdict in
        ``self.failed``.  Returns 1 on failure and 0 on success.
        """
        (stdout, _) = self.proc.communicate()
        self.stdout = stdout
        self.failed = self.proc.returncode != 0
        if self.failed:
            print("*" * 50)
            print("Showing {0}:".format(self.logpath_log))
            # Emacs may die before writing the log; a missing file must not
            # hide the test output printed below.
            print(self._read_log(self.logpath_log))
            print()
            print("*" * 50)
            print("Showing STDOUT/STDERR:")
            show_nonprinting(stdout)
            print()
            print("{0} failed".format(self.testfile))
        else:
            print("{0} OK".format(self.testfile))
            for line in reversed(
                    stdout.decode('utf-8', 'replace').splitlines()):
                if line.startswith('Ran'):
                    print(line)
                    break
        return int(self.failed)

    def do_run(self):
        """Print system info, run the tests and return `report`'s result."""
        self.show_sys_info()
        self.make_process()
        return self.report()

    def is_known_failure(self):
        """
        Check if failures are known, based on STDOUT from ERT.

        Returns True when the output has no "unexpected results" section
        or when every line of that section matches `known_failures`.
        """
        # Split on bytes first so line boundaries match the raw output,
        # then decode each line leniently.
        lines = (raw.decode('utf-8', 'replace')
                 for raw in self.stdout.splitlines())
        for l in lines:
            if re.match("[0-9]+ unexpected results:.*", l):
                break
        else:
            return True  # no failure

        # Check "FAILED <test-name>" lines
        for l in lines:
            if not l:
                break  # end with an empty line
            if not any(re.search(f, l) for f in self.known_failures):
                return False
        return True
|
|
|
|
|
|
def mkdirp(path):
    """
    Do ``mkdir -p {path}``.

    Creates `path` and any missing parents.  An already existing
    directory is not an error; `os.makedirs` still raises ``OSError``
    when `path` exists but is not a directory.
    """
    # exist_ok avoids the check-then-create race of testing isdir first.
    os.makedirs(path, exist_ok=True)
|
|
|
|
|
|
def remove_elc():
    """Delete ``*.elc`` files from the EIN lisp and tests directories."""
    compiled = glob.glob(einlispdir("*.elc"))
    compiled = compiled + glob.glob(eintestdir("*.elc"))
    for elc_file in compiled:
        os.remove(elc_file)
    print("Removed {0} elc files".format(len(compiled)))
|
|
|
|
|
|
class ServerRunner(BaseRunner):
    """
    Start a notebook server through a shell pipeline and expose its port.

    Usable as a context manager: entering runs the server and yields the
    port string parsed from its output, leaving stops it.
    """

    port = None
    notebook_dir = os.path.join(EIN_ROOT, "tests", "notebook")

    # Shell pipeline: log everything to {server_log} and pass on only the
    # first "Notebook is running at" line, which `get_port` reads.
    command_template = r"""{ipython} notebook --notebook-dir {notebook_dir} --no-browser --NotebookApp.token='' --debug 2>&1 | tee {server_log} | grep --line-buffered 'Notebook is running at' | head -n1"""

    def __enter__(self):
        self.run()
        return self.port

    def __exit__(self, type, value, traceback):
        self.stop()

    def do_run(self):
        """Clear old notebooks, start the server and read its port."""
        self.clear_notebook_dir()
        self.start()
        self.get_port()
        print("Server running at", self.port)

    def clear_notebook_dir(self):
        """Delete every ``*.ipynb`` file directly inside `notebook_dir`."""
        files = glob.glob(os.path.join(self.notebook_dir, '*.ipynb'))
        for notebook in files:
            os.remove(notebook)
        print("Removed {0} ipynb files".format(len(files)))

    @staticmethod
    def _parse_port_line(line):
        """
        Extract the port from a "Notebook is running at" line.

        The port is taken from the URL in `line`, so a trailing path or
        ``?token=...`` query is ignored.  When no URL with an explicit
        port is found, the text after the last colon is returned with
        slashes stripped.
        """
        line = line.strip()
        # Anchor on "://" so colons in a log timestamp are not mistaken
        # for the port separator.
        found = re.search(r'://[^/\s]*:(\d+)', line)
        if found:
            return found.group(1)
        return line.rsplit(':', 1)[-1].strip('/')

    def get_port(self):
        """Return the server port, reading it from the process on first use."""
        if self.port is None:
            val = self.proc.stdout.readline()
            dval = val.decode('utf-8')
            self.port = self._parse_port_line(dval)
        return self.port

    def start(self):
        """Launch the server pipeline through the shell."""
        self.proc = Popen(
            self.command, stdout=PIPE, stderr=STDOUT, stdin=PIPE,
            shell=True)
        # Answer "y" to the prompt: Shutdown Notebook Server (y/[n])?
        self.proc.stdin.write(b'y\n')
        # The pipe is buffered; without a flush the answer stays in this
        # process.
        self.proc.stdin.flush()

    def stop(self):
        """Stop the server, or show its log if it has already exited."""
        print("Stopping server", self.port)
        proc = getattr(self, 'proc', None)
        if proc is None:
            # Nothing was started (e.g. a dry run), so nothing to stop.
            return
        returncode = proc.poll()
        if returncode is not None:
            logpath = self.logpath('server')
            print("Server process was already dead by exit code", returncode)
            print("*" * 50)
            print("Showing {0}:".format(logpath))
            try:
                with open(logpath, errors='replace') as logfile:
                    print(logfile.read())
            except (IOError, OSError) as err:
                print("(could not read {0}: {1})".format(logpath, err))
            print()
            return
        if not self.dry_run:
            try:
                kill_subprocesses(proc.pid, lambda x: 'ipython' in x)
            finally:
                proc.terminate()

    @property
    def command(self):
        """Shell command line built from `command_template`."""
        fmtdata = dict(
            notebook_dir=self.notebook_dir,
            ipython=self.ipython,
            server_log=self.logpath('server'),
        )
        return self.command_template.format(**fmtdata)
|
|
|
|
|
|
def kill_subprocesses(pid, include=lambda x: True):
    """
    Send SIGINT to the direct children of process `pid`.

    Children are found in the output of ``ps``.  Only those whose command
    line makes `include` return a true value are signalled.

    Raises ``RuntimeError`` when ``ps`` exits with a non-zero status.
    """
    from subprocess import Popen, PIPE
    import signal

    command = ['ps', '-e', '-o', 'ppid,pid,command']
    proc = Popen(command, stdout=PIPE, stderr=PIPE)
    (stdout, stderr) = proc.communicate()
    if proc.returncode != 0:
        raise RuntimeError(
            'Command {0} failed with code {1} and following error message:\n'
            '{2}'.format(command, proc.returncode, stderr))

    parent = str(pid)
    for row in stdout.decode('utf-8').splitlines():
        (cmd_ppid, cmd_pid, cmd) = row.strip().split(None, 2)
        if cmd_ppid != parent or not include(cmd):
            continue
        print("Killing PID={0} COMMAND={1}".format(cmd_pid, cmd))
        os.kill(int(cmd_pid), signal.SIGINT)
|
|
|
|
|
|
def construct_command(args):
    """
    Construct command as a string given a list of arguments.

    Each argument is quoted for a POSIX shell with `shlex.quote`, so the
    result can be pasted into a shell and splits back into `args`.
    Arguments that need no quoting are left untouched; an empty argument
    is rendered as ``''`` instead of disappearing.
    """
    # Imported here so the function does not rely on a file-level import.
    import shlex

    return " ".join(shlex.quote(str(a)) for a in args)
|
|
|
|
|
|
def run_ein_test(unit_test, func_test, func_test_max_retries,
                 no_skip, clean_elc, **kwds):
    """
    Run the unit test and/or the functional test; return an exit code.

    The functional test is attempted up to ``func_test_max_retries + 1``
    times.  A failed attempt still ends the run successfully when
    `no_skip` is false and all its failures are known ones.  Remaining
    keyword arguments are handed to `TestRunner`.

    Returns 0 on success and 1 on failure.
    """
    if clean_elc and not kwds['dry_run']:
        remove_elc()

    if unit_test and TestRunner(testfile='test-load.el', **kwds).run() != 0:
        return 1

    if not func_test:
        return 0

    # NOTE: wrapping each attempt in a ServerRunner and passing its port
    # via `ein:testing-port` used to happen here but is disabled.
    for i in range(func_test_max_retries + 1):
        attempt = TestRunner(testfile='func-test.el', **kwds)
        if attempt.run() == 0:
            print("Functional test succeeded after {0} retries.".format(i))
            return 0
        if not no_skip and attempt.is_known_failure():
            print("All failures are known. Ending functional test.")
            return 0

    print("Functional test failed after {0} retries.".format(i))
    return 1
|
|
|
|
|
|
def main():
    """
    Parse the command line, prepare the environment and run the tests.

    Exits the interpreter with the status returned by `run_ein_test`.
    """
    from argparse import ArgumentParser

    parser = ArgumentParser(description=__doc__.splitlines()[1])
    parser.add_argument('--emacs', '-e', default='emacs',
                        help='Emacs executable.')
    parser.add_argument('--load-path', '-L', default=[], action='append',
                        help="add a directory to load-path. "
                        "can be specified multiple times.")
    parser.add_argument('--load', '-l', default=[], action='append',
                        help="load lisp file before tests. "
                        "can be specified multiple times.")
    parser.add_argument('--load-ert', default=False, action='store_true',
                        help="load ERT from git submodule. "
                        "you need to update git submodule manually "
                        "if ert/ directory does not exist yet.")
    parser.add_argument('--no-auto-ert', default=True,
                        dest='auto_ert', action='store_false',
                        help="do not load ERT from git submodule "
                        "when this Emacs has no built-in ERT module.")
    parser.add_argument('--batch', '-B', default=True,
                        dest='batch', action='store_false',
                        help="start interactive session.")
    parser.add_argument('--debug-on-error', '-d', default=False,
                        action='store_true',
                        help="set debug-on-error to t and start "
                        "interactive session.")
    parser.add_argument('--func-test-max-retries', default=4, type=int,
                        help="""
                        Specify number of retries for functional test
                        before failing with error. This is workaround
                        for the issue #74.
                        """)
    parser.add_argument('--no-skip', default=False, action='store_true',
                        help="""
                        Do no skip known failures. Known failures
                        are implemented as another workaround for the
                        issue #74.
                        """)
    parser.add_argument('--no-func-test', '-F', default=True,
                        dest='func_test', action='store_false',
                        help="do not run functional test.")
    parser.add_argument('--no-unit-test', '-U', default=True,
                        dest='unit_test', action='store_false',
                        help="do not run unit test.")
    parser.add_argument('--clean-elc', '-c', default=False,
                        action='store_true',
                        help="remove *.elc files in ein/lisp and "
                        "ein/tests directories.")
    parser.add_argument('--dry-run', default=False,
                        action='store_true',
                        help="Print commands to be executed.")
    parser.add_argument('--ipython', default='ipython',
                        help="""
                        ipython executable to use to run notebook server.
                        """)
    parser.add_argument('--ein-log-level', default=40)
    parser.add_argument('--ein-message-level', default=30)
    parser.add_argument('--ein-debug', default=False, action='store_true',
                        help="(setq ein:debug t) when given.")
    parser.add_argument('--log-dir', default="log",
                        help="Directory to store log (default: %(default)s)")
    # Parse before touching cask so that --help and usage errors work even
    # when cask is not installed.
    args = parser.parse_args()

    os.environ['EMACSLOADPATH'] = cask_load_path()
    # Locale names spell the territory in upper case.
    os.environ['LC_ALL'] = 'en_US.UTF-8'
    sys.exit(run_ein_test(**vars(args)))
|
|
|
|
|
|
# Run the test driver only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
|