Partially revert "Remove no longer used files"

This partially reverts commit 472d6bf322.
This commit is contained in:
Nathaniel Nicandro 2021-02-06 13:37:11 -06:00
parent db0f3678ac
commit 23b9d03f3e
6 changed files with 1444 additions and 0 deletions

187
jupyter-channel-ioloop.el Normal file
View file

@ -0,0 +1,187 @@
;;; jupyter-channel-ioloop.el --- Abstract class to communicate with a jupyter-channel in a subprocess -*- lexical-binding: t -*-
;; Copyright (C) 2019-2020 Nathaniel Nicandro
;; Author: Nathaniel Nicandro <nathanielnicandro@gmail.com>
;; Created: 27 Jun 2019
;; This program is free software; you can redistribute it and/or
;; modify it under the terms of the GNU General Public License as
;; published by the Free Software Foundation; either version 3, or (at
;; your option) any later version.
;; This program is distributed in the hope that it will be useful, but
;; WITHOUT ANY WARRANTY; without even the implied warranty of
;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
;; General Public License for more details.
;; You should have received a copy of the GNU General Public License
;; along with GNU Emacs; see the file COPYING. If not, write to the
;; Free Software Foundation, Inc., 59 Temple Place - Suite 330,
;; Boston, MA 02111-1307, USA.
;;; Commentary:
;; Define a `jupyter-ioloop' that can be sent events to start, stop, or send a
;; message on a set of `jupyter-channel' objects. For example to start a
;; `jupyter-channel' in the subprocess environment you would do something like
;;
;; (jupyter-send ioloop 'start-channel TYPE ENDPOINT)
;;
;; where TYPE and ENDPOINT have the same meaning as in `jupyter-channel'.
;;
;; Note by default, no channels are available in the subprocess environment.
;; You initialize channels by setting the `jupyter-channel-ioloop-channels'
;; variable in the subprocess environment, e.g. using
;; `jupyter-ioloop-add-setup', before starting the `jupyter-ioloop'.
;;
;; When you call `jupyter-ioloop-start' a `jupyter-session' object needs to
;; passed as the second argument with whatever object you would like to receive
;; events as the third. The `jupyter-session-id' will be used as the value of
;; the :identity key in the call to `jupyter-start' when starting a
;; channel.
;;
;; Each event sent to the subprocess will send back a corresponding
;; confirmation event, the three events that can be sent and their
;; corresponding confirmation events are:
;;
;; (start-channel TYPE ENDPOINT) -> (start-channel TYPE)
;; (stop-channel TYPE) -> (stop-channel TYPE)
;; (send TYPE MSG-TYPE MSG MSG-ID) -> (sent MSG-ID)
;;
;; For the send event, the MSG-TYPE, MSG, and MSG-ID have the same meaning as
;; the TYPE, MSG, and MSG-ID arguments of the `jupyter-send' method of a
;; `jupyter-channel'.
;;
;; Ex.
;;
;; (let ((ioloop (jupyter-channel-ioloop))
;; (session (jupyter-session :id ...)))
;; (jupyter-start-ioloop ioloop session ...)
;; ...
;; (jupyter-send ioloop 'start-channel ...)
;; ...)
;;; Code:
(require 'jupyter-ioloop)
(defvar jupyter-channel-ioloop-session nil
"The `jupyter-session' used when initializing Jupyter channels.")
(defvar jupyter-channel-ioloop-channels nil
"A list of synchronous channels in an ioloop controlling Jupyter channels.")
(jupyter-ioloop-add-arg-type jupyter-channel
(lambda (arg)
`(or (object-assoc ,arg :type jupyter-channel-ioloop-channels)
(error "Channel not alive (%s)" ,arg))))
(defclass jupyter-channel-ioloop (jupyter-ioloop)
()
:abstract t)
(cl-defmethod initialize-instance ((ioloop jupyter-channel-ioloop) &optional _slots)
(cl-call-next-method)
(jupyter-ioloop-add-setup ioloop
(require 'jupyter-channel-ioloop))
(jupyter-channel-ioloop-add-start-channel-event ioloop)
(jupyter-channel-ioloop-add-stop-channel-event ioloop)
(jupyter-channel-ioloop-add-send-event ioloop)
(jupyter-ioloop-add-teardown ioloop
(mapc #'jupyter-stop jupyter-channel-ioloop-channels)))
(defun jupyter-channel-ioloop-set-session (ioloop session)
"In the IOLOOP, set SESSION as the `jupyter-channel-ioloop-session'.
Add a form to IOLOOP's setup that sets the variable
`jupyter-channel-ioloop-session' to a `jupyter-session' based on
SESSION's id and key. Remove any top level form in the setup that
sets `jupyter-channel-ioloop-session' via `setq' before doing so."
(cl-callf (lambda (setup)
(cons `(setq jupyter-channel-ioloop-session
(jupyter-session
:id ,(jupyter-session-id session)
:key ,(jupyter-session-key session)))
(cl-remove-if
(lambda (f) (and (eq (car f) 'setq)
(eq (cadr f) 'jupyter-channel-ioloop-session)))
setup)))
(oref ioloop setup)))
;;; Channel events
(defun jupyter-channel-ioloop-add-start-channel-event (ioloop)
"Add a start-channel event handler to IOLOOP.
The event fires when the IOLOOP receives a list with the form
(start-channel CHANNEL-TYPE ENDPOINT)
and shall stop any existing channel with CHANNEL-TYPE and start a
new channel with CHANNEL-TYPE connected to ENDPOINT. The
underlying socket IDENTITY is derived from
`jupyter-channel-ioloop-session' in the IOLOOP environment. The
channel will be added to the variable
`jupyter-channel-ioloop-channels' in the IOLOOP environment.
Note, before sending this event to IOLOOP, the corresponding
channel needs to be available in the
`jupyer-channel-ioloop-channels' variable. You can initialize
this variable in the setup form of IOLOOP.
A list with the form
(start-channel CHANNEL-TYPE)
is returned to the parent process."
(jupyter-ioloop-add-event
ioloop start-channel ((channel jupyter-channel) endpoint)
;; Stop the channel if it is already alive
(when (jupyter-alive-p channel)
(jupyter-stop channel))
;; Start the channel
(oset channel endpoint endpoint)
(let ((identity (jupyter-session-id jupyter-channel-ioloop-session)))
(jupyter-start channel :identity identity))
(list 'start-channel (oref channel type))))
(defun jupyter-channel-ioloop-add-stop-channel-event (ioloop)
"Add a stop-channel event handler to IOLOOP.
The event fires when the IOLOOP receives a list with the form
(stop-channel CHANNEL-TYPE)
If a channel with CHANNEL-TYPE exists and is alive, it is stopped.
A list with the form
(stop-channel CHANNEL-TYPE)
is returned to the parent process."
(jupyter-ioloop-add-event ioloop stop-channel (type)
(let ((channel (object-assoc type :type jupyter-channel-ioloop-channels)))
(when (and channel (jupyter-alive-p channel))
(jupyter-stop channel))
(list 'stop-channel type))))
(defun jupyter-channel-ioloop-add-send-event (ioloop)
"Add a send event handler to IOLOOP.
The event fires when the IOLOOP receives a list of the form
(send CHANNEL-TYPE MSG-TYPE MSG MSG-ID)
and calls (jupyter-send CHANNEL MSG-TYPE MSG MSG-ID) using the
channel corresponding to CHANNEL-TYPE in the IOLOOP environment.
A list of the form
(sent CHANNEL-TYPE MSG-ID)
is returned to the parent process."
(jupyter-ioloop-add-event
ioloop send ((channel jupyter-channel) msg-type msg msg-id)
(list 'sent (oref channel type)
(jupyter-send channel msg-type msg msg-id))))
(provide 'jupyter-channel-ioloop)
;;; jupyter-channel-ioloop.el ends here

71
jupyter-channel.el Normal file
View file

@ -0,0 +1,71 @@
;;; jupyter-channel.el --- Jupyter channel interface -*- lexical-binding: t -*-
;; Copyright (C) 2019-2020 Nathaniel Nicandro
;; Author: Nathaniel Nicandro <nathanielnicandro@gmail.com>
;; Created: 27 Jun 2019
;; This program is free software; you can redistribute it and/or
;; modify it under the terms of the GNU General Public License as
;; published by the Free Software Foundation; either version 3, or (at
;; your option) any later version.
;; This program is distributed in the hope that it will be useful, but
;; WITHOUT ANY WARRANTY; without even the implied warranty of
;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
;; General Public License for more details.
;; You should have received a copy of the GNU General Public License
;; along with GNU Emacs; see the file COPYING. If not, write to the
;; Free Software Foundation, Inc., 59 Temple Place - Suite 330,
;; Boston, MA 02111-1307, USA.
;;; Commentary:
;; Defines the `jupyter-channel' interface.
;;; Code:
(defclass jupyter-channel ()
((type
:type keyword
:initarg :type
:documentation "The type of this channel.")
(session
:type jupyter-session
:initarg :session
:documentation "The session object used to sign and send/receive messages.")
(endpoint
:type string
:initarg :endpoint
:documentation "The endpoint this channel is connected to.
Typical endpoints look like \"tcp://127.0.0.1:5555\"."))
:abstract t)
(cl-defmethod jupyter-start ((channel jupyter-channel) &key identity)
"Start a Jupyter CHANNEL using IDENTITY as the routing ID.
If CHANNEL is already alive, do nothing."
(cl-call-next-method))
(cl-defmethod jupyter-stop ((channel jupyter-channel))
"Stop a Jupyter CHANNEL.
If CHANNEL is already stopped, do nothing."
(cl-call-next-method))
(cl-defmethod jupyter-alive-p ((channel jupyter-channel))
"Return non-nil if a CHANNEL is alive."
(cl-call-next-method))
(cl-defmethod jupyter-send (channel type message &optional msg-id)
"On CHANNEL send MESSAGE which has message TYPE and optionally a MSG-ID."
(cl-call-next-method))
(cl-defmethod jupyter-recv (channel &optional dont-wait)
"Receive a message on CHANNEL.
If DONT-WAIT is non-nil, return nil immediately if there is no
message available to receive."
(cl-call-next-method))
(provide 'jupyter-channel)
;;; jupyter-channel.el ends here

507
jupyter-ioloop.el Normal file
View file

@ -0,0 +1,507 @@
;;; jupyter-ioloop.el --- Jupyter channel subprocess -*- lexical-binding: t -*-
;; Copyright (C) 2018-2020 Nathaniel Nicandro
;; Author: Nathaniel Nicandro <nathanielnicandro@gmail.com>
;; Created: 03 Nov 2018
;; This program is free software; you can redistribute it and/or
;; modify it under the terms of the GNU General Public License as
;; published by the Free Software Foundation; either version 3, or (at
;; your option) any later version.
;; This program is distributed in the hope that it will be useful, but
;; WITHOUT ANY WARRANTY; without even the implied warranty of
;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
;; General Public License for more details.
;; You should have received a copy of the GNU General Public License
;; along with GNU Emacs; see the file COPYING. If not, write to the
;; Free Software Foundation, Inc., 59 Temple Place - Suite 330,
;; Boston, MA 02111-1307, USA.
;;; Commentary:
;; An ioloop encapsulates a subprocess that communicates with its parent
;; process in a pre-defined way. The parent process sends events (lists with a
;; head element tagging the type of event and the rest of the elements being
;; the arguments), via a call to the `jupyter-send' method of a
;; `jupyter-ioloop'. The ioloop subprocess then handles the event in its
;; environment. You add an event that can be handled in the ioloop environment
;; by calling `jupyter-ioloop-add-event' before calling `jupyter-ioloop-start'.
;;
;; When one of the events added through `jupyter-ioloop-add-event'
;; returns something other than nil, it is sent back to the parent
;; process and the handler function passed to `jupyter-ioloop-start'
;; is called.
;;
;; An example that will echo back what was sent to the ioloop as a message in
;; the parent process:
;;
;; (let ((ioloop (jupyter-ioloop))
;; (jupyter-ioloop-add-event ioloop echo (data)
;; "Return DATA back to the parent process."
;; (list 'echo data))
;; (jupyter-ioloop-start ioloop (lambda (event) (message "%s" (cadr event))))
;; (jupyter-send ioloop 'echo "Message")
;; (jupyter-ioloop-stop ioloop))
;;; Code:
(require 'jupyter-base)
(require 'zmq)
(eval-when-compile (require 'subr-x))
(defvar jupyter-ioloop-poller nil
"The polling object being used to poll for events in an ioloop.")
(defvar jupyter-ioloop-stdin nil
"A file descriptor or ZMQ socket used to receive events in an ioloop.")
(defvar jupyter-ioloop-nsockets 1
"The number of sockets being polled by `jupyter-ioloop-poller'.")
(defvar jupyter-ioloop-pre-hook nil
"A hook called at the start of every polling loop.
The hook is called with no arguments.")
(defvar jupyter-ioloop-post-hook nil
"A hook called at the end of every polling loop.
The hook is called with a single argument, the list of polling
events that occurred for this iteration or nil. The polling
events have the same value as the return value of
`zmq-poller-wait-all'.")
(defvar jupyter-ioloop-timers nil)
(defvar jupyter-ioloop-timeout 200
"Maximum time (in ms) to wait for polling events on `jupyter-ioloop-poller'.")
(defvar jupyter-ioloop--argument-types nil
"Argument types added via `jupyter-ioloop-add-arg-type'.")
(defun jupyter-ioloop-environment-p ()
"Return non-nil if this Emacs instance is an IOLoop subprocess."
(and noninteractive jupyter-ioloop-stdin jupyter-ioloop-poller))
(defclass jupyter-ioloop ()
((process :type (or null process) :initform nil)
(callbacks :type list :initform nil)
(events :type list :initform nil)
(setup :type list :initform nil)
(teardown :type list :initform nil))
:documentation "An interface for sending asynchronous messages via a subprocess.
An ioloop starts an Emacs subprocess setup to send events back
and forth between the parent Emacs process and the ioloop
asynchronously. The ioloop subprocess is essentially a polling
loop that polls its stdin and any sockets that may have been
created in the ioloop environment and performs pre-defined
actions when stdin sends an event. The structure of the
subprocess is the following
\(progn
(let ((jupyter-ioloop-poller (zmq-poller)))
<jupyter-ioloop-setup>
<send start event to parent>
(condition-case nil
(while t
(run-hook 'jupyter-ioloop-pre-hook)
<poll for stdin/socket events>
(run-hook 'jupyter-ioloop-post-hook))
(quit
<jupyter-ioloop-teardown>
<send quit event to parent>))))
<jupyter-ioloop-setup> is replaced by the form in the setup slot
of an ioloop and can be conveniently added to using
`jupyter-ioloop-add-setup'.
<jupyter-ioloop-teardown> is replaced with the teardown slot and
can be added to using `jupyter-ioloop-add-teardown'.
<poll for stdin/socket events> is replaced by code that will
listen for stdin/socket events using `jupyter-ioloop-poller'.
You add events to be handled by the subprocess using
`jupyter-ioloop-add-event', the return value of any event added
is what is sent to the parent Emacs process and what will
eventually be the sole argument to the handler function passed to
`jupyter-ioloop-start'. To suppress the subprocess from sending
anything back to the parent, ensure nil is returned by the form
created by `jupyter-ioloop-add-event'.
See `jupyter-channel-ioloop' for an example of its usage.")
(cl-defmethod initialize-instance ((ioloop jupyter-ioloop) &optional _slots)
(cl-call-next-method)
(jupyter-add-finalizer ioloop
(lambda ()
(with-slots (process) ioloop
(when (process-live-p process)
(delete-process process))))))
(defun jupyter-ioloop-wait-until (ioloop event cb &optional timeout progress-msg)
"Wait until EVENT occurs on IOLOOP.
If EVENT occurs, call CB and return its value if non-nil. CB is
called with a single argument, an event list whose first element
is EVENT. If CB returns nil, continue waiting until EVENT occurs
again or until TIMEOUT seconds elapses, TIMEOUT defaults to
`jupyter-default-timeout'. If TIMEOUT is reached, return nil.
If PROGRESS-MSG is non-nil, a progress reporter will be displayed
while waiting using PROGRESS-MSG as the message."
(declare (indent 2))
(cl-check-type ioloop jupyter-ioloop)
(jupyter-with-timeout
(progress-msg (or timeout jupyter-default-timeout))
(let ((e (jupyter-ioloop-last-event ioloop)))
(when (eq (car-safe e) event) (funcall cb e)))))
(defun jupyter-ioloop-last-event (ioloop)
"Return the last event received on IOLOOP."
(cl-check-type ioloop jupyter-ioloop)
(and (oref ioloop process)
(process-get (oref ioloop process) :last-event)))
(defmacro jupyter-ioloop-add-setup (ioloop &rest body)
"Set IOLOOP's `jupyter-ioloop-setup' slot to BODY.
BODY is the code that will be evaluated before the IOLOOP sends a
start event to the parent process."
(declare (indent 1))
`(setf (oref ,ioloop setup)
(append (oref ,ioloop setup)
(quote ,body))))
(defmacro jupyter-ioloop-add-teardown (ioloop &rest body)
"Set IOLOOP's `jupyter-ioloop-teardown' slot to BODY.
BODY is the code that will be evaluated just before the IOLOOP
sends a quit event to the parent process."
(declare (indent 1))
`(setf (oref ,ioloop teardown)
(append (oref ,ioloop teardown)
(quote ,body))))
(defmacro jupyter-ioloop-add-arg-type (tag fun)
"Add a new argument type for arguments in `jupyter-ioloop-add-event'.
If an argument has the form (arg TAG), where TAG is a symbol, in
the ARGS argument of `jupyter-ioloop-add-event', replace it with
the result of evaluating the form returned by FUN on arg in the
IOLOOP environment.
For example suppose we define an argument type, jupyter-channel:
(jupyter-ioloop-add-arg-type jupyter-channel
(lambda (arg)
`(or (object-assoc ,arg :type jupyter-channel-ioloop-channels)
(error \"Channel not alive (%s)\" ,arg))))
and define an event like
(jupyter-ioloop-add-event ioloop stop-channel ((channel jupyter-channel))
(jupyter-stop channel))
Finally after adding other events and starting the ioloop we send
an event like
(jupyter-send ioloop 'stop-channel :shell)
Then before the stop-channel event defined by
`jupyter-ioloop-add-event' is called in the IOLOOP environment,
the value for the channel argument passed by the `jupyter-send'
call is replaced by the form returned by the function specified
in the `jupyter-ioloop-add-arg-type' call."
(declare (indent 1))
`(progn
(setf (alist-get ',tag jupyter-ioloop--argument-types nil 'remove) nil)
;; NOTE: FUN is quoted to ensure lexical closures aren't created
(push (cons ',tag ,(list '\` fun)) jupyter-ioloop--argument-types)))
(defun jupyter-ioloop--replace-args (args)
"Convert special arguments in ARGS.
Map over ARGS, converting its elements into
,arg or ,(app (lambda (x) BODY) arg)
for use in a `pcase' form. The latter form occurs when one of
ARGS is of the form (arg TAG) where TAG is one of the keys in
`jupyter-ioloop--argument-types'. BODY will be replaced with the
result of calling the function associated with TAG in
`jupyter-ioloop--argument-types'.
Return the list of converted arguments."
(mapcar (lambda (arg)
(pcase arg
(`(,val ,tag)
(let ((form (alist-get tag jupyter-ioloop--argument-types)))
(list '\, (list 'app `(lambda (x) ,(funcall form 'x)) val))))
(_ (list '\, arg))))
args))
(defmacro jupyter-ioloop-add-event (ioloop event args &optional doc &rest body)
"For IOLOOP, add an EVENT handler.
ARGS is a list of arguments that are bound when EVENT occurs. DOC
is an optional documentation string describing what BODY, the
expression which will be evaluated when EVENT occurs, does. If
BODY evaluates to any non-nil value, it will be sent to the
parent Emacs process. A nil value for BODY means don't send
anything.
Some arguments are treated specially:
If one of ARGS is a list (<sym> tag) where <sym> is any symbol,
then the parent process that sends EVENT to IOLOOP is expected to
send a value that will be bound to <sym> and be handled by an
argument handler associated with tag before BODY is evaluated in
the IOLOOP process, see `jupyter-ioloop-add-arg-type'."
(declare (indent 3) (doc-string 4) (debug t))
(unless (stringp doc)
(when doc
(setq body (cons doc body))))
`(setf (oref ,ioloop events)
(cons (list (quote ,event) (quote ,args) (quote ,body))
(cl-remove-if (lambda (x) (eq (car x) (quote ,event)))
(oref ,ioloop events)))))
(defun jupyter-ioloop--event-dispatcher (ioloop exp)
"For IOLOOP return a form suitable for matching against EXP.
That is, return an expression which will cause an event to be
fired if EXP matches any event types handled by IOLOOP.
TODO: Explain these
By default this adds the events quit, callback, and timer."
(let ((user-events
(cl-loop
for (event args body) in (oref ioloop events)
for cond = (list '\` (cl-list*
event (jupyter-ioloop--replace-args args)))
if (memq event '(quit callback timer))
do (error "Event can't be one of quit, callback, or, timer")
;; cond = `(event ,arg1 ,arg2 ...)
else collect `(,cond ,@body))))
`(let* ((cmd ,exp)
(res (pcase cmd
,@user-events
;; Default events
(`(timer ,id ,period ,cb)
;; Ensure we don't send anything back to the parent process
(prog1 nil
(let ((timer (run-at-time 0.0 period (byte-compile cb))))
(puthash id timer jupyter-ioloop-timers))))
(`(callback ,cb)
;; Ensure we don't send anything back to the parent process
(prog1 nil
(setq jupyter-ioloop-timeout 0)
(add-hook 'jupyter-ioloop-pre-hook (byte-compile cb) 'append)))
('(quit) (signal 'quit nil))
(_ (error "Unhandled command %s" cmd)))))
;; Can only send lists at the moment
(when (and res (listp res)) (zmq-prin1 res)))))
(cl-defgeneric jupyter-ioloop-add-callback ((ioloop jupyter-ioloop) cb)
"In IOLOOP, add CB to be run in the IOLOOP environment.
CB is run at the start of every polling loop. Callbacks are
called in the order they are added.
WARNING: A function added as a callback should be quoted to avoid
sending closures to the IOLOOP. An example:
(jupyter-ioloop-add-callback ioloop
`(lambda () (zmq-prin1 'foo \"bar\")))"
(declare (indent 1))
(cl-assert (functionp cb))
(cl-callf append (oref ioloop callbacks) (list cb))
(when (process-live-p (oref ioloop process))
(jupyter-send ioloop 'callback (macroexpand-all cb))))
(defun jupyter-ioloop-poller-add (socket events)
"Add SOCKET to be polled using the `jupyter-ioloop-poller'.
EVENTS are the polling events that should be listened for on
SOCKET. If `jupyter-ioloop-poller' is not a `zmq-poller' object
do nothing."
(when (zmq-poller-p jupyter-ioloop-poller)
(zmq-poller-add jupyter-ioloop-poller socket events)
(cl-incf jupyter-ioloop-nsockets)))
(defun jupyter-ioloop-poller-remove (socket)
"Remove SOCKET from the `jupyter-ioloop-poller'.
If `jupyter-ioloop-poller' is not a `zmq-poller' object do
nothing."
(when (zmq-poller-p jupyter-ioloop-poller)
(zmq-poller-remove jupyter-ioloop-poller socket)
(cl-decf jupyter-ioloop-nsockets)))
(defun jupyter-ioloop--body (ioloop on-stdin)
`(let (events)
(condition-case nil
(progn
,@(oref ioloop setup)
;; Initialize any callbacks that were added before the ioloop was
;; started
(setq jupyter-ioloop-pre-hook
(mapcar (lambda (f)
(when (symbolp f)
(setq f (symbol-function f)))
(unless (byte-code-function-p f)
(byte-compile f)))
(append jupyter-ioloop-pre-hook
(quote ,(mapcar #'macroexpand-all
(oref ioloop callbacks))))))
;; Notify the parent process we are ready to do something
(zmq-prin1 '(start))
(let ((on-stdin (byte-compile (lambda () ,on-stdin))))
(while t
(run-hooks 'jupyter-ioloop-pre-hook)
(setq events
(condition-case nil
(zmq-poller-wait-all
jupyter-ioloop-poller
jupyter-ioloop-nsockets
jupyter-ioloop-timeout)
((zmq-EAGAIN zmq-EINTR zmq-ETIMEDOUT) nil)))
(let ((stdin-event (zmq-assoc jupyter-ioloop-stdin events)))
(when stdin-event
(setq events (delq stdin-event events))
(funcall on-stdin)))
(run-hook-with-args 'jupyter-ioloop-post-hook events))))
(quit
,@(oref ioloop teardown)
(zmq-prin1 '(quit))))))
(defun jupyter-ioloop--function (ioloop port)
"Return the function that does the work of IOLOOP.
The returned function is suitable to send to a ZMQ subprocess for
evaluation using `zmq-start-process'.
If PORT is non-nil the returned function will create a ZMQ PULL
socket to receive events from the parent process on the PORT of
the local host, otherwise events are expected to be received on
STDIN. This is useful on Windows systems which don't allow
polling the STDIN file handle."
`(lambda (ctx)
(push ,(file-name-directory (locate-library "jupyter-base")) load-path)
(require 'jupyter-ioloop)
(setq jupyter-ioloop-poller (zmq-poller))
(setq jupyter-ioloop-stdin
,(if port
`(let ((sock (zmq-socket ctx zmq-PAIR)))
(prog1 sock
(zmq-connect sock (format "tcp://127.0.0.1:%s" ,port))))
0))
(zmq-poller-add jupyter-ioloop-poller jupyter-ioloop-stdin zmq-POLLIN)
,(jupyter-ioloop--body
ioloop (jupyter-ioloop--event-dispatcher
ioloop (if port '(read (zmq-recv-decoded jupyter-ioloop-stdin))
'(zmq-subprocess-read))))))
(defun jupyter-ioloop-alive-p (ioloop)
"Return non-nil if IOLOOP is ready to receive/send events."
(cl-check-type ioloop jupyter-ioloop)
(with-slots (process) ioloop
(and (process-live-p process) (process-get process :start))))
(defun jupyter-ioloop--make-filter (ioloop handler)
(lambda (event)
(let ((process (oref ioloop process)))
(process-put process :last-event event)
(cond
((eq (car-safe event) 'start)
(process-put process :start t))
((eq (car-safe event) 'quit)
(process-put process :quit t))
(t
(funcall handler event))))))
(cl-defgeneric jupyter-ioloop-start ((ioloop jupyter-ioloop)
handler
&key buffer)
"Start an IOLOOP.
HANDLER is a function of one argument and will be passed an event
received by the subprocess that IOLOOP represents, an event is
just a list.
If IOLOOP was previously running, it is stopped first.
If BUFFER is non-nil it should be a buffer that will be used as
the IOLOOP subprocess buffer, see `zmq-start-process'."
(jupyter-ioloop-stop ioloop)
(let (stdin port)
;; NOTE: A socket is used to read input from the parent process to avoid
;; the stdin buffering done when using `read-from-minibuffer' in the
;; subprocess. When `noninteractive', `read-from-minibuffer' uses
;; `getc_unlocked' internally and `getc_unlocked' reads from the stdin FILE
;; object as opposed to reading directly from STDIN_FILENO. The problem is
;; that FILE objects are buffered streams which means that every message
;; the parent process sends does not necessarily correspond to a POLLIN
;; event on STDIN_FILENO in the subprocess. Since we only call
;; `read-from-minibuffer' when there is a POLLIN event on STDIN_FILENO
;; there is the potential that a message is waiting to be handled in the
;; buffer used by stdin which will only get handled if we send more
;; messages to the subprocess thereby creating more POLLIN events.
(when (or t (memq system-type '(windows-nt ms-dos cygwin)))
(setq stdin (zmq-socket (zmq-current-context) zmq-PAIR))
(setq port (zmq-bind-to-random-port stdin "tcp://127.0.0.1")))
(let ((process (zmq-start-process
(jupyter-ioloop--function ioloop (when stdin port))
;; We go through this Emacs-fu, brought to you by Chris
;; Wellons, https://nullprogram.com/blog/2014/01/27/,
;; because we want OBJECT to be the final say in when
;; everything gets garbage collected. If OBJECT loses
;; scope, the ioloop process should be killed off. This
;; wouldn't happen if we hold a strong reference to
;; OBJECT.
:filter (jupyter-ioloop--make-filter ioloop handler)
:buffer buffer)))
(oset ioloop process process)
(when stdin
(process-put process :stdin stdin))
(jupyter-ioloop-wait-until ioloop 'start #'identity))))
(cl-defgeneric jupyter-ioloop-stop ((ioloop jupyter-ioloop))
"Stop IOLOOP.
Send a quit event to IOLOOP, wait until it actually quits before
returning."
(with-slots (process) ioloop
(when (process-live-p process)
(jupyter-send ioloop 'quit)
(unless (jupyter-ioloop-wait-until ioloop 'quit #'identity)
(delete-process process))
(when-let* ((stdin (process-get process :stdin))
(socket-p (zmq-socket-p stdin)))
(zmq-unbind stdin (zmq-get-option stdin zmq-LAST-ENDPOINT))))))
(defvar jupyter-ioloop--send-buffer nil)
(defun jupyter-ioloop--dump-message (plist)
(with-current-buffer
(if (buffer-live-p jupyter-ioloop--send-buffer)
jupyter-ioloop--send-buffer
(setq jupyter-ioloop--send-buffer
(get-buffer-create " *jupyter-ioloop-send*")))
(erase-buffer)
(let (print-level print-length)
(prin1 plist (current-buffer)))
(buffer-string)))
(cl-defmethod jupyter-send ((ioloop jupyter-ioloop) &rest args)
"Using IOLOOP, send ARGS to its process.
All arguments passed to this function are sent as a list to the
process unchanged. This means that all arguments should be
serializable."
(with-slots (process) ioloop
(cl-assert (process-live-p process))
(let ((stdin (process-get process :stdin)))
(if stdin
(let ((msg (jupyter-ioloop--dump-message args)) sent)
(while (not sent)
(condition-case nil
(progn
(zmq-send-encoded stdin msg nil zmq-DONTWAIT)
(setq sent t))
(zmq-EAGAIN (accept-process-output nil 0)))))
(zmq-subprocess-send process args)))))
(provide 'jupyter-ioloop)
;;; jupyter-ioloop.el ends here

349
jupyter-kernel-process.el Normal file
View file

@ -0,0 +1,349 @@
;;; jupyter-kernel-process.el --- Jupyter kernels as Emacs processes -*- lexical-binding: t -*-
;; Copyright (C) 2020 Nathaniel Nicandro
;; Author: Nathaniel Nicandro <nathanielnicandro@gmail.com>
;; Created: 25 Apr 2020
;; This program is free software; you can redistribute it and/or
;; modify it under the terms of the GNU General Public License as
;; published by the Free Software Foundation; either version 3, or (at
;; your option) any later version.
;; This program is distributed in the hope that it will be useful, but
;; WITHOUT ANY WARRANTY; without even the implied warranty of
;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
;; General Public License for more details.
;; You should have received a copy of the GNU General Public License
;; along with GNU Emacs; see the file COPYING. If not, write to the
;; Free Software Foundation, Inc., 59 Temple Place - Suite 330,
;; Boston, MA 02111-1307, USA.
;;; Commentary:
;; Jupyter kernels as Emacs processes.
;;; Code:
(require 'jupyter-kernel)
(require 'jupyter-monads)
(defgroup jupyter-kernel-process nil
"Jupyter kernels as Emacs processes"
:group 'jupyter)
(declare-function jupyter-channel-ioloop-set-session "jupyter-channel-ioloop")
(defvar jupyter--kernel-processes '()
"The list of kernel processes launched.
Elements look like (PROCESS CONN-FILE) where PROCESS is a kernel
process and CONN-FILE the associated connection file.
Cleaning up the list removes elements whose PROCESS is no longer
live. When removing an element, CONN-FILE will be deleted and
PROCESS's buffer killed.
The list is periodically cleaned up when a new process is
launched.
Also, just before Emacs exits any connection files that still
exist are deleted.")
;;; Kernel definition
(cl-defstruct (jupyter-kernel-process
(:include jupyter-kernel)))
(cl-defmethod jupyter-process ((kernel jupyter-kernel-process))
"Return the process of KERNEL.
Return nil if KERNEL does not have an associated process."
(car (cl-find-if (lambda (x) (and (processp (car x))
(eq (process-get (car x) :kernel) kernel)))
jupyter--kernel-processes)))
(cl-defmethod jupyter-alive-p ((kernel jupyter-kernel-process))
(let ((process (jupyter-process kernel)))
(and (process-live-p process)
(cl-call-next-method))))
(defun jupyter-kernel-process (&rest args)
"Return a `jupyter-kernel-process' initialized with ARGS."
(apply #'make-jupyter-kernel-process args))
(cl-defmethod jupyter-kernel :extra "process" (&rest args)
"Return a representation of a kernel based on an Emacs process.
If ARGS contains a :spec key, return a `jupyter-kernel-process'
initialized using ARGS. If the value is the name of a
kernelspec, the returned kernel's spec slot will be set to the
corresponding `jupyter-kernelspec'. The session of the returned
kernel will be initialized with the return value of
`jupyter-session-with-random-ports'.
Call the next method if ARGS does not contain :spec."
(let ((spec (plist-get args :spec)))
(if (not spec) (cl-call-next-method)
(when (stringp spec)
(plist-put args :spec
(or (jupyter-guess-kernelspec spec)
(error "No kernelspec matching name (%s)" spec))))
(apply #'jupyter-kernel-process args))))
;;; Client connection
(defun jupyter-kernel-process-io (session)
(let* ((channels '(:shell :iopub :stdin))
(ch-group (let ((endpoints (jupyter-session-endpoints session)))
(cl-loop
for ch in channels
collect ch
collect (list :endpoint (plist-get endpoints ch)
:alive-p nil))))
(hb nil)
(discarded nil)
(kernel-io nil)
(ioloop nil))
(cl-macrolet ((continue-after
(cond on-timeout)
`(jupyter-with-timeout
(nil jupyter-default-timeout ,on-timeout)
,cond)))
(cl-labels ((ch-put
(ch prop value)
(plist-put (plist-get ch-group ch) prop value))
(ch-get
(ch prop)
(plist-get (plist-get ch-group ch) prop))
(ch-alive-p
(ch)
(and ioloop (jupyter-ioloop-alive-p ioloop)
(ch-get ch :alive-p)))
(ch-start
(ch)
(unless (ch-alive-p ch)
(jupyter-send ioloop 'start-channel ch
(ch-get ch :endpoint))
(continue-after
(ch-alive-p ch)
(error "Channel failed to start: %s" ch))))
(ch-stop
(ch)
(when (ch-alive-p ch)
(jupyter-send ioloop 'stop-channel ch)
(continue-after
(not (ch-alive-p ch))
(error "Channel failed to stop: %s" ch))))
(start
()
(unless ioloop
(require 'jupyter-zmq-channel-ioloop)
(setq ioloop (make-instance 'jupyter-zmq-channel-ioloop))
(jupyter-channel-ioloop-set-session ioloop session))
(unless (jupyter-ioloop-alive-p ioloop)
(jupyter-ioloop-start
ioloop
(lambda (event)
(pcase (car event)
((and 'start-channel (let ch (cadr event)))
(ch-put ch :alive-p t))
((and 'stop-channel (let ch (cadr event)))
(ch-put ch :alive-p nil))
;; TODO: Get rid of this
('sent nil)
(_
;; FIXME: Turn into a function not a macro,
;; there is no need.
(jupyter-run-with-io kernel-io
(jupyter-publish event))))))
(condition-case err
(cl-loop
for ch in channels
do (ch-start ch))
(error
(jupyter-ioloop-stop ioloop)
(signal (car err) (cdr err)))))
ioloop)
(stop
()
(and ioloop
(jupyter-ioloop-alive-p ioloop)
(jupyter-ioloop-stop ioloop))))
(setq kernel-io
;; TODO: (jupyter-publisher :name "Session I/O" :fn ...)
;;
;; so that on error in a subscriber, the name can be
;; displayed to know where to look. This requires a
;; `jupyter-publisher' struct type.
(jupyter-publisher
(lambda (content)
(if discarded
(error "Kernel I/O no longer available: %s"
(cl-prin1-to-string session))
(pcase (car content)
;; ('message channel idents . msg)
('message
(pop content)
;; TODO: Get rid of this. Have the ioloop do
;; this.
(plist-put
(cddr content) :channel
(substring (symbol-name (car content)) 1))
(jupyter-content (cddr content)))
('send (apply #'jupyter-send (start) content))
('hb
(unless hb
(setq hb
(let ((endpoints
(jupyter-session-endpoints session)))
(make-instance
'jupyter-hb-channel
:session session
:endpoint (plist-get endpoints :hb)))))
(jupyter-run-with-io (cadr content)
(jupyter-publish hb)))
(_ (error "Unhandled I/O: %s" content)))))))
(jupyter-return-delayed
(list kernel-io
(lambda ()
(and hb (jupyter-hb-pause hb))
(stop)
(setq hb nil ioloop nil discarded t))))))))
(cl-defmethod jupyter-io ((kernel jupyter-kernel-process))
"Return an I/O connection to KERNEL's session."
(jupyter-kernel-process-io (jupyter-kernel-session kernel)))
;;; Kernel management
(defun jupyter--gc-kernel-processes ()
(setq jupyter--kernel-processes
(cl-loop for (p conn-file) in jupyter--kernel-processes
if (process-live-p p) collect (list p conn-file)
else do (delete-process p)
(when (file-exists-p conn-file)
(delete-file conn-file))
and when (buffer-live-p (process-buffer p))
do (kill-buffer (process-buffer p)))))
(defun jupyter-delete-connection-files ()
"Delete all connection files created by Emacs."
;; Ensure Emacs can be killed on error
(ignore-errors
(cl-loop for (_ conn-file) in jupyter--kernel-processes
do (when (file-exists-p conn-file)
(delete-file conn-file)))))
(add-hook 'kill-emacs-hook #'jupyter-delete-connection-files)
(defun jupyter--start-kernel-process (name kernelspec conn-file)
(let* ((process-name (format "jupyter-kernel-%s" name))
(buffer-name (format " *jupyter-kernel[%s]*" name))
(process-environment
(append (jupyter-process-environment kernelspec)
process-environment))
(args (jupyter-kernel-argv kernelspec conn-file))
(atime (nth 4 (file-attributes conn-file)))
(process (apply #'start-file-process process-name
(generate-new-buffer buffer-name)
(car args) (cdr args))))
(set-process-query-on-exit-flag process jupyter--debug)
;; Wait until the connection file has been read before returning.
;; This is to give the kernel a chance to setup before sending it
;; messages.
;;
;; TODO: Replace with a check of the heartbeat channel.
(jupyter-with-timeout
((format "Starting %s kernel process..." name)
jupyter-long-timeout
(unless (process-live-p process)
(error "Kernel process exited:\n%s"
(with-current-buffer (process-buffer process)
(ansi-color-apply (buffer-string))))))
;; Windows systems may not have good time resolution when retrieving
;; the last access time of a file so we don't bother with checking that
;; the kernel has read the connection file and leave it to the
;; downstream initialization to ensure that we can communicate with a
;; kernel.
(or (memq system-type '(ms-dos windows-nt cygwin))
(let ((attribs (file-attributes conn-file)))
;; `file-attributes' can potentially return nil, in this case
;; just assume it has read the connection file so that we can
;; know for sure it is not connected if it fails to respond to
;; any messages we send it.
(or (null attribs)
(not (equal atime (nth 4 attribs)))))))
(jupyter--gc-kernel-processes)
(push (list process conn-file) jupyter--kernel-processes)
process))
(cl-defmethod jupyter-do-launch :before ((kernel jupyter-kernel-process))
"Ensure KERNEL has a non-nil SESSION slot.
A `jupyter-session' with random port numbers for the channels and
a newly generated message signing key will be set as the value of
KERNEL's SESSION slot if it is nil."
(pcase-let (((cl-struct jupyter-kernel-process session) kernel))
(unless session
(setf (jupyter-kernel-session kernel) (jupyter-session-with-random-ports))
;; This is here for stability when running the tests. Sometimes
;; the kernel ports are not set up fast enough due to the hack
;; done in `jupyter-session-with-random-ports'. The effect
;; seems to be messages that are sent but never received by the
;; kernel.
(sit-for 0.2))))
(cl-defmethod jupyter-do-launch ((kernel jupyter-kernel-process))
"Start KERNEL's process.
Do nothing if KERNEL's process is already live.
The process arguments are constructed from KERNEL's SPEC. The
connection file passed as argument to the process is first
written to file, its contents are generated from KERNEL's SESSION
slot.
See also https://jupyter-client.readthedocs.io/en/stable/kernels.html#kernel-specs"
(let ((process (jupyter-process kernel)))
(unless (process-live-p process)
(pcase-let (((cl-struct jupyter-kernel-process spec session) kernel))
(setq process (jupyter--start-kernel-process
(jupyter-kernel-name kernel) spec
(jupyter-write-connection-file session))))
(setf (process-get process :kernel) kernel)
(setf (process-sentinel process)
(lambda (process _)
(pcase (process-status process)
('signal
(jupyter-kernel-died (process-get process :kernel))))))))
(cl-call-next-method))
;; TODO: Add restart argument
(cl-defmethod jupyter-do-shutdown ((kernel jupyter-kernel-process))
"Shutdown KERNEL by killing its process unconditionally."
(let ((process (jupyter-process kernel)))
(when process
(delete-process process)
(setf (process-get process :kernel) nil))
(cl-call-next-method)))
(cl-defmethod jupyter-do-interrupt ((kernel jupyter-kernel-process))
"Interrupt KERNEL's process.
The process can be interrupted when the interrupt mode of
KERNEL's SPEC is \"signal\" or not specified, otherwise the
KERNEL is interrupted by sending an :interrupt-request on
KERNEL's control channel.
See also https://jupyter-client.readthedocs.io/en/stable/kernels.html#kernel-specs"
(pcase-let* ((process (jupyter-process kernel))
((cl-struct jupyter-kernel-process spec) kernel)
((cl-struct jupyter-kernelspec plist) spec)
(imode (plist-get plist :interrupt_mode)))
(if (or (null imode) (string= imode "signal"))
(when process
(interrupt-process process t))
(cl-call-next-method))))
(provide 'jupyter-kernel-process)
;;; jupyter-kernel-process.el ends here

View file

@ -0,0 +1,82 @@
;;; jupyter-zmq-channel-ioloop.el --- IOLoop functions for Jupyter channels -*- lexical-binding: t -*-
;; Copyright (C) 2018-2020 Nathaniel Nicandro
;; Author: Nathaniel Nicandro <nathanielnicandro@gmail.com>
;; Created: 08 Nov 2018
;; This program is free software; you can redistribute it and/or
;; modify it under the terms of the GNU General Public License as
;; published by the Free Software Foundation; either version 3, or (at
;; your option) any later version.
;; This program is distributed in the hope that it will be useful, but
;; WITHOUT ANY WARRANTY; without even the implied warranty of
;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
;; General Public License for more details.
;; You should have received a copy of the GNU General Public License
;; along with GNU Emacs; see the file COPYING. If not, write to the
;; Free Software Foundation, Inc., 59 Temple Place - Suite 330,
;; Boston, MA 02111-1307, USA.
;;; Commentary:
;; A `jupyter-channel-ioloop' using `jupyter-zmq-channel' to send and receive
;; messages. Whenever a message is received on a channel an event that looks
;; like the following will be sent back to the parent process
;;
;; (message CHANNEL-TYPE IDENTS . MSG)
;;
;; where CHANNEL-TYPE is the channel on which the message was received (one of
;; `jupyter-socket-types'), IDENTS are ZMQ identities, typically ignored, and
;; MSG is the message plist.
;;; Code:
(require 'jupyter-base)
(require 'jupyter-channel-ioloop)
(require 'jupyter-zmq-channel)
(defclass jupyter-zmq-channel-ioloop (jupyter-channel-ioloop)
()
:documentation "A `jupyter-ioloop' configured for Jupyter channels.")
(cl-defmethod initialize-instance ((ioloop jupyter-zmq-channel-ioloop) &optional _slots)
(cl-call-next-method)
(jupyter-ioloop-add-setup ioloop
(require 'jupyter-zmq-channel-ioloop)
(push 'jupyter-zmq-channel-ioloop--recv-messages jupyter-ioloop-post-hook)
(cl-loop
for channel in '(:shell :stdin :iopub)
unless (object-assoc channel :type jupyter-channel-ioloop-channels)
do (push (jupyter-zmq-channel
:session jupyter-channel-ioloop-session
:type channel)
jupyter-channel-ioloop-channels))))
(defun jupyter-zmq-channel-ioloop--recv-messages (events)
"Print the received messages described in EVENTS.
EVENTS is a list of socket events as returned by
`zmq-poller-wait-all'. If any of the sockets in EVENTS matches
one of the sockets in `jupyter-channel-ioloop-channels', receive a
message on the channel and print a list with the form
(message CHANNEL-TYPE . MSG...)
to stdout. CHANNEL-TYPE is the channel on which MSG was received,
either :shell, :stdin, or :iopub. MSG is a list as returned by
`jupyter-recv'."
(let (messages)
(dolist (channel jupyter-channel-ioloop-channels)
(with-slots (type socket) channel
(when (zmq-assoc socket events)
(push (cons type (jupyter-recv channel)) messages))))
(when messages
;; Send messages
(mapc (lambda (msg) (prin1 (cons 'message msg))) (nreverse messages))
(zmq-flush 'stdout))))
(provide 'jupyter-zmq-channel-ioloop)
;;; jupyter-zmq-channel-ioloop.el ends here

248
jupyter-zmq-channel.el Normal file
View file

@ -0,0 +1,248 @@
;;; jupyter-zmq-channel.el --- A Jupyter channel implementation using ZMQ sockets -*- lexical-binding: t -*-
;; Copyright (C) 2019-2020 Nathaniel Nicandro
;; Author: Nathaniel Nicandro <nathanielnicandro@gmail.com>
;; Created: 27 Jun 2019
;; This program is free software; you can redistribute it and/or
;; modify it under the terms of the GNU General Public License as
;; published by the Free Software Foundation; either version 3, or (at
;; your option) any later version.
;; This program is distributed in the hope that it will be useful, but
;; WITHOUT ANY WARRANTY; without even the implied warranty of
;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
;; General Public License for more details.
;; You should have received a copy of the GNU General Public License
;; along with GNU Emacs; see the file COPYING. If not, write to the
;; Free Software Foundation, Inc., 59 Temple Place - Suite 330,
;; Boston, MA 02111-1307, USA.
;;; Commentary:
;; Implements synchronous channel types using ZMQ sockets. Each channel is
;; essentially a wrapper around a `zmq-socket' constrained to a socket type by
;; the type of the channel and with an associated `zmq-IDENTITY' obtained from
;; the `jupyter-session' that must be associated with the channel. A heartbeat
;; channel is distinct from the other channels in that it is implemented using
;; a timer which periodically pings the kernel depending on how its configured.
;; In order for communication to occur on the other channels, one of
;; `jupyter-send' or `jupyter-recv' must be called after starting the channel
;; with `jupyter-start'.
;;; Code:
(require 'jupyter-messages)
(require 'zmq)
(require 'jupyter-channel)
(eval-when-compile (require 'subr-x))
(declare-function jupyter-ioloop-poller-remove "jupyter-ioloop")
(declare-function jupyter-ioloop-poller-add "jupyter-ioloop")
(defconst jupyter-socket-types
(list :hb zmq-REQ
:shell zmq-DEALER
:iopub zmq-SUB
:stdin zmq-DEALER
:control zmq-DEALER)
"The socket types for the various channels used by `jupyter'.")
(defclass jupyter-zmq-channel (jupyter-channel)
((socket
:type (or null zmq-socket)
:initform nil
:documentation "The socket used for communicating with the kernel.")))
(defun jupyter-connect-endpoint (type endpoint &optional identity)
"Create socket with TYPE and connect to ENDPOINT.
If IDENTITY is non-nil, it will be set as the ROUTING-ID of the
socket. Return the created socket."
(let ((sock (zmq-socket (zmq-current-context) type)))
(prog1 sock
(zmq-socket-set sock zmq-LINGER 1000)
(when identity
(zmq-socket-set sock zmq-ROUTING-ID identity))
(zmq-connect sock endpoint))))
(defun jupyter-connect-channel (ctype endpoint &optional identity)
"Create a socket based on a Jupyter channel type.
CTYPE is one of the symbols `:hb', `:stdin', `:shell',
`:control', or `:iopub' and represents the type of channel to
connect to ENDPOINT. If IDENTITY is non-nil, it will be set as
the ROUTING-ID of the socket. Return the created socket."
(let ((sock-type (plist-get jupyter-socket-types ctype)))
(unless sock-type
(error "Invalid channel type (%s)" ctype))
(jupyter-connect-endpoint sock-type endpoint identity)))
(cl-defmethod jupyter-start ((channel jupyter-zmq-channel)
&key (identity (jupyter-session-id
(oref channel session))))
(unless (jupyter-alive-p channel)
(let ((socket (jupyter-connect-channel
(oref channel type) (oref channel endpoint) identity)))
(oset channel socket socket)
(cl-case (oref channel type)
(:iopub
(zmq-socket-set socket zmq-SUBSCRIBE ""))))
(when (and (functionp 'jupyter-ioloop-environment-p)
(jupyter-ioloop-environment-p))
(jupyter-ioloop-poller-add (oref channel socket) zmq-POLLIN))))
(cl-defmethod jupyter-stop ((channel jupyter-zmq-channel))
(when (jupyter-alive-p channel)
(when (and (functionp 'jupyter-ioloop-environment-p)
(jupyter-ioloop-environment-p))
(jupyter-ioloop-poller-remove (oref channel socket)))
(with-slots (socket) channel
(zmq-disconnect socket (zmq-socket-get socket zmq-LAST-ENDPOINT)))
(oset channel socket nil)))
(cl-defmethod jupyter-alive-p ((channel jupyter-zmq-channel))
(not (null (oref channel socket))))
(cl-defmethod jupyter-send ((channel jupyter-zmq-channel) type message &optional msg-id)
"Send a message on a ZMQ based Jupyter channel.
CHANNEL is the channel to send MESSAGE on. TYPE is a Jupyter
message type, like :kernel-info-request. Return the message ID
of the sent message."
(cl-destructuring-bind (id . msg)
(jupyter-encode-message (oref channel session) type
:msg-id msg-id
:content message)
(prog1 id
(zmq-send-multipart (oref channel socket) msg))))
(cl-defmethod jupyter-recv ((channel jupyter-zmq-channel) &optional dont-wait)
"Receive a message on CHANNEL.
Return a cons cell (IDENTS . MSG) where IDENTS are the ZMQ
message identities, as a list, and MSG is the received message.
If DONT-WAIT is non-nil, return immediately without waiting for a
message if one isn't already available."
(condition-case nil
(let ((session (oref channel session))
(msg (zmq-recv-multipart (oref channel socket)
(and dont-wait zmq-DONTWAIT))))
(when msg
(cl-destructuring-bind (idents . parts)
(jupyter--split-identities msg)
(cons idents (jupyter-decode-message session parts)))))
(zmq-EAGAIN nil)))
;;; Heartbeat channel
(defvar jupyter-hb-max-failures 3
"Number of heartbeat failures until the kernel is considered unreachable.
A ping is sent to the kernel on a heartbeat channel and waits
until `time-to-dead' seconds to see if the kernel sent a ping
back. If the kernel doesn't send a ping back after
`jupyter-hb-max-failures', the callback associated with the
heartbeat channel is called. See `jupyter-hb-on-kernel-dead'.")
(defclass jupyter-hb-channel (jupyter-zmq-channel)
((type
:type keyword
:initform :hb
:documentation "The type of this channel is `:hb'.")
(time-to-dead
:type number
:initform 10
:documentation "The time in seconds to wait for a response
from the kernel until the connection is assumed to be dead. Note
that this slot only takes effect when starting the channel.")
(dead-cb
:type function
:initform #'ignore
:documentation "A callback function that takes 0 arguments
and is called when the kernel has not responded for
\(* `jupyter-hb-max-failures' `time-to-dead'\) seconds.")
(beating
:type (or boolean symbol)
:initform t
:documentation "A flag variable indicating that the heartbeat
channel is communicating with the kernel.")
(paused
:type boolean
:initform t
:documentation "A flag variable indicating that the heartbeat
channel is paused and not communicating with the kernel. To
pause the heartbeat channel use `jupyter-hb-pause', to unpause
use `jupyter-hb-unpause'."))
:documentation "A base class for heartbeat channels.")
(cl-defmethod jupyter-alive-p ((channel jupyter-hb-channel))
"Return non-nil if CHANNEL is alive."
(zmq-socket-p (oref channel socket)))
(defun jupyter-hb--pingable-p (channel)
(and (not (oref channel paused))
(jupyter-alive-p channel)))
(cl-defmethod jupyter-hb-beating-p ((channel jupyter-hb-channel))
"Return non-nil if CHANNEL is reachable."
(and (jupyter-hb--pingable-p channel)
(oref channel beating)))
(cl-defmethod jupyter-hb-pause ((channel jupyter-hb-channel))
"Pause checking for heartbeat events on CHANNEL."
(oset channel paused t))
(cl-defmethod jupyter-hb-unpause ((channel jupyter-hb-channel))
"Un-pause checking for heatbeat events on CHANNEL."
(when (oref channel paused)
(if (jupyter-alive-p channel)
;; Consume a pending message from the kernel if there is one. We send a
;; ping and then schedule a timer which fires TIME-TO-DEAD seconds
;; later to receive the ping back from the kernel and start the process
;; all over again. If the channel is paused before TIME-TO-DEAD
;; seconds, there may still be a ping from the kernel waiting.
(ignore-errors (zmq-recv (oref channel socket) zmq-DONTWAIT))
(jupyter-start channel))
(oset channel paused nil)
(jupyter-hb--send-ping channel)))
(cl-defmethod jupyter-hb-on-kernel-dead ((channel jupyter-hb-channel) fun)
"When the kernel connected to CHANNEL dies, call FUN.
A kernel is considered dead when CHANNEL does not receive a
response after \(* `jupyter-hb-max-failures' `time-to-dead'\)
seconds has elapsed without the kernel sending a ping back."
(declare (indent 1))
(oset channel dead-cb fun))
(defun jupyter-hb--send-ping (channel &optional failed-count)
(when (jupyter-hb--pingable-p channel)
(condition-case nil
(progn
(zmq-send (oref channel socket) "ping")
(run-with-timer
(oref channel time-to-dead) nil
(lambda ()
(when-let* ((sock (and (jupyter-hb--pingable-p channel)
(oref channel socket))))
(oset channel beating
(condition-case nil
(and (zmq-recv sock zmq-DONTWAIT) t)
((zmq-EINTR zmq-EAGAIN) nil)))
(if (oref channel beating)
(jupyter-hb--send-ping channel)
;; Reset the socket
(jupyter-stop channel)
(jupyter-start channel)
(or failed-count (setq failed-count 0))
(if (< failed-count jupyter-hb-max-failures)
(jupyter-hb--send-ping channel (1+ failed-count))
(oset channel paused t)
(when (functionp (oref channel dead-cb))
(funcall (oref channel dead-cb)))))))))
;; FIXME: Should be a part of `jupyter-hb--pingable-p'
(zmq-ENOTSOCK
(jupyter-hb-pause channel)
(oset channel socket nil)))))
(provide 'jupyter-zmq-channel)
;;; jupyter-zmq-channel.el ends here