mirror of
https://github.com/vale981/emacs-jupyter
synced 2025-03-05 07:41:37 -05:00
Generalize communication with a kernel
The previous mechanism to communicate with a kernel was too low level from the perspective of a client. The client interfaced directly with the subprocess abstraction, `jupyter-ioloop`, and had to handle all "events" that occurred in the `jupyter-ioloop`, e.g. when a channel was started or stopped. But in reality such events should not be the concern of a client. A client should only care about events that are directly related to kernel messages and not events related to the implementation details of *how* communication occurs. This commit abstracts out the way in which a client communicates with its kernel by introducing a new `jupyter-comm-layer` class. The `jupyter-comm-layer` class takes care of managing the communication channel between a kernel and its clients as well as sending events to all registered clients. This way, clients operate solely at the level of events on the communication layer. All a client does is register itself to receive events on the communication layer and send events on the layer. * jupyter-base.el (jupyter-session-endpoints): New function. * jupyter-client.el (jupyter-kernel-client): Remove ioloop and channels slots. Add kcomm slot. (initialize-instance): Unconditionally stop channels. (jupyter-initialize-connection): Change into a method call. Call `jupyter-initialize-connection` on the `kcomm` slot. (jupyter-with-client-buffer): Remove stale comment. (jupyter-send): Call `jupyter-send` on the `kcomm` slot. (jupyter-ioloop-handler): Remove all method definitions, replace `sent` and `message` methods with their `jupyter-event-handler` equivalents. (jupyter-hb-pause, jupyter-hb-unpause, jupyter-hb-beating): (jupyter-channel-alive-p, jupyter-start-channel, jupyter-stop-channel): (jupyter-start-channels, jupyter-stop-channels): Replace with calls to their equivalents using the `kcomm` slot. * jupyter-comm-layer.el: New file. * jupyter-kernel-manager (jupyter-make-client): Set a client's `kcomm` slot to `jupyter-channel-ioloop-comm`. * jupyter-messages.el (jupyter-decode-message): Use `list` directly. There seemed to be issues when using the new `jupyter-sync-channel-comm` due to using quoted lists. * test/jupyter-test.el: Add `jupyter-comm-layer` test. Update other tests. * test/test-helper.el: Add `jupyter-comm-layer` mock objects. Update `jupyter-echo-client`.
This commit is contained in:
parent
923e30e0f7
commit
b2294dceb2
7 changed files with 590 additions and 240 deletions
|
@ -484,6 +484,22 @@ fields:
|
||||||
(id nil :read-only t)
|
(id nil :read-only t)
|
||||||
(key nil :read-only t))
|
(key nil :read-only t))
|
||||||
|
|
||||||
|
(cl-defmethod jupyter-session-endpoints ((session jupyter-session))
|
||||||
|
"Return a property list containing the endpoints from SESSION."
|
||||||
|
(cl-destructuring-bind
|
||||||
|
(&key shell_port iopub_port stdin_port hb_port ip transport
|
||||||
|
&allow-other-keys)
|
||||||
|
(jupyter-session-conn-info session)
|
||||||
|
(cl-assert (and transport ip))
|
||||||
|
(let ((addr (lambda (port) (format "%s://%s:%d" transport ip port))))
|
||||||
|
(cl-loop
|
||||||
|
for (channel . port) in `((:hb . ,hb_port)
|
||||||
|
(:stdin . ,stdin_port)
|
||||||
|
(:shell . ,shell_port)
|
||||||
|
(:iopub . ,iopub_port))
|
||||||
|
do (cl-assert port) and
|
||||||
|
collect channel and collect (funcall addr port)))))
|
||||||
|
|
||||||
;;; Request object definition
|
;;; Request object definition
|
||||||
|
|
||||||
(cl-defstruct (jupyter-request
|
(cl-defstruct (jupyter-request
|
||||||
|
|
|
@ -33,8 +33,7 @@
|
||||||
|
|
||||||
(eval-when-compile (require 'subr-x))
|
(eval-when-compile (require 'subr-x))
|
||||||
(require 'jupyter-base)
|
(require 'jupyter-base)
|
||||||
(require 'jupyter-channels)
|
(require 'jupyter-comm-layer)
|
||||||
(require 'jupyter-channel-ioloop)
|
|
||||||
(require 'jupyter-mime)
|
(require 'jupyter-mime)
|
||||||
(require 'jupyter-messages)
|
(require 'jupyter-messages)
|
||||||
|
|
||||||
|
@ -108,9 +107,10 @@ requests like the above example.")
|
||||||
(pending-requests
|
(pending-requests
|
||||||
:type ring
|
:type ring
|
||||||
:initform (make-ring 10)
|
:initform (make-ring 10)
|
||||||
:documentation "A ring of pending `jupyter-request's.
|
:documentation "A ring of pending `jupyter-request's. A
|
||||||
A request is pending if it has not been sent to the kernel via the
|
request is pending if a notification has not been received by the
|
||||||
client's ioloop slot.")
|
client that the message has actually been sent by the
|
||||||
|
communication layer. See the kcomm slot.")
|
||||||
(execution-state
|
(execution-state
|
||||||
:type string
|
:type string
|
||||||
:initform "idle"
|
:initform "idle"
|
||||||
|
@ -142,9 +142,8 @@ client is expecting a reply from the kernel.")
|
||||||
initializing this client. When `jupyter-start-channels' is
|
initializing this client. When `jupyter-start-channels' is
|
||||||
called, this will be set to the kernel info plist returned
|
called, this will be set to the kernel info plist returned
|
||||||
from an initial `:kernel-info-request'.")
|
from an initial `:kernel-info-request'.")
|
||||||
(ioloop
|
(kcomm
|
||||||
:type (or null jupyter-channel-ioloop)
|
:type jupyter-comm-layer
|
||||||
:initform nil
|
|
||||||
:documentation "The process which receives events from channels.")
|
:documentation "The process which receives events from channels.")
|
||||||
(session
|
(session
|
||||||
:type jupyter-session
|
:type jupyter-session
|
||||||
|
@ -165,17 +164,7 @@ initialized the client.")
|
||||||
(-buffer
|
(-buffer
|
||||||
:type buffer
|
:type buffer
|
||||||
:documentation "An internal buffer used to store client local
|
:documentation "An internal buffer used to store client local
|
||||||
variables and intermediate ioloop process output. When the ioloop
|
variables.")))
|
||||||
slot is non-nil, its `process-buffer' will be `eq' to this
|
|
||||||
buffer.")
|
|
||||||
(channels
|
|
||||||
:type list
|
|
||||||
:initform nil
|
|
||||||
:initarg :channels
|
|
||||||
:documentation "A property list describing the channels.
|
|
||||||
The keys are channel types whose values are the status of the
|
|
||||||
channels. The exception is the heartbeat channel. The value of
|
|
||||||
the :hb key is a `jupyter-hb-channel'.")))
|
|
||||||
|
|
||||||
;;; `jupyter-current-client' language method specializer
|
;;; `jupyter-current-client' language method specializer
|
||||||
|
|
||||||
|
@ -209,10 +198,7 @@ passed as the argument has a language of LANG."
|
||||||
(lambda ()
|
(lambda ()
|
||||||
(when (buffer-live-p buffer)
|
(when (buffer-live-p buffer)
|
||||||
(kill-buffer buffer))
|
(kill-buffer buffer))
|
||||||
;; Ensure the ioloop process gets cleaned up when the client goes out
|
(jupyter-stop-channels client)))))
|
||||||
;; of scope.
|
|
||||||
(when (jupyter-channels-running-p client)
|
|
||||||
(jupyter-stop-channels client))))))
|
|
||||||
|
|
||||||
(defun jupyter-clients ()
|
(defun jupyter-clients ()
|
||||||
"Return a list of all `jupyter-kernel-clients'."
|
"Return a list of all `jupyter-kernel-clients'."
|
||||||
|
@ -243,12 +229,13 @@ See `jupyter-initialize-connection'."
|
||||||
(list info-or-session
|
(list info-or-session
|
||||||
'(or jupyter-session-p json-plist-p stringp))))))
|
'(or jupyter-session-p json-plist-p stringp))))))
|
||||||
|
|
||||||
(defun jupyter-initialize-connection (client info-or-session)
|
;; NOTE: This requires that CLIENT is communicating with a kernel using a
|
||||||
|
;; `jupyter-channel-ioloop-comm' object.
|
||||||
|
(cl-defmethod jupyter-initialize-connection ((client jupyter-kernel-client) info-or-session)
|
||||||
"Initialize CLIENT with connection INFO-OR-SESSION.
|
"Initialize CLIENT with connection INFO-OR-SESSION.
|
||||||
INFO-OR-SESSION can be a file name, a plist, or a
|
INFO-OR-SESSION can be a file name, a plist, or a
|
||||||
`jupyter-session' object that will be used to initialize CLIENT's
|
`jupyter-session' object that will be used to initialize CLIENT's
|
||||||
connection. If CLIENT is already connected to a kernel, its
|
connection.
|
||||||
connection is first terminated before initializing a new one.
|
|
||||||
|
|
||||||
When INFO-OR-SESSION is a file name, read the contents of the
|
When INFO-OR-SESSION is a file name, read the contents of the
|
||||||
file as a JSON plist and create a new `jupyter-session' from it.
|
file as a JSON plist and create a new `jupyter-session' from it.
|
||||||
|
@ -267,43 +254,22 @@ connection and will be accessible as the session slot of CLIENT.
|
||||||
The necessary keys and values to initialize a connection can be
|
The necessary keys and values to initialize a connection can be
|
||||||
found at
|
found at
|
||||||
http://jupyter-client.readthedocs.io/en/latest/kernels.html#connection-files."
|
http://jupyter-client.readthedocs.io/en/latest/kernels.html#connection-files."
|
||||||
(cl-check-type client jupyter-kernel-client)
|
|
||||||
(let ((session (and (jupyter-session-p info-or-session) info-or-session))
|
(let ((session (and (jupyter-session-p info-or-session) info-or-session))
|
||||||
(conn-info (jupyter--connection-info info-or-session)))
|
(conn-info (jupyter--connection-info info-or-session)))
|
||||||
(cl-destructuring-bind
|
(cl-destructuring-bind (&key key signature_scheme &allow-other-keys)
|
||||||
(&key shell_port iopub_port stdin_port hb_port ip
|
|
||||||
key transport signature_scheme
|
|
||||||
&allow-other-keys)
|
|
||||||
conn-info
|
conn-info
|
||||||
(when (and (> (length key) 0)
|
(when (and (> (length key) 0)
|
||||||
(not (functionp
|
(not (functionp
|
||||||
(intern (concat "jupyter-" signature_scheme)))))
|
(intern (concat "jupyter-" signature_scheme)))))
|
||||||
(error "Unsupported signature scheme: %s" signature_scheme))
|
(error "Unsupported signature scheme: %s" signature_scheme)))
|
||||||
;; Stop the channels if connected to some other kernel
|
(oset client session
|
||||||
(when (jupyter-channels-running-p client)
|
(or (copy-sequence session)
|
||||||
(jupyter-stop-channels client))
|
(jupyter-session
|
||||||
;; Initialize the channels
|
:key (plist-get conn-info :key)
|
||||||
(unless session
|
:conn-info conn-info)))
|
||||||
(setq session (jupyter-session :key key :conn-info conn-info)))
|
(jupyter-initialize-connection
|
||||||
(oset client session session)
|
(oref client kcomm)
|
||||||
(let ((addr (lambda (port) (format "%s://%s:%d" transport ip port))))
|
(oref client session))))
|
||||||
(setf (oref client channels)
|
|
||||||
;; Construct the channel plist
|
|
||||||
(cl-list*
|
|
||||||
:hb (make-instance
|
|
||||||
'jupyter-hb-channel
|
|
||||||
:session session
|
|
||||||
:endpoint (funcall addr hb_port))
|
|
||||||
(cl-loop
|
|
||||||
for (channel . port) in `((:stdin . ,stdin_port)
|
|
||||||
(:shell . ,shell_port)
|
|
||||||
(:iopub . ,iopub_port))
|
|
||||||
collect channel
|
|
||||||
and collect
|
|
||||||
;; The session will be associated with these channels in the
|
|
||||||
;; ioloop subprocess. See `jupyter-start-channels'.
|
|
||||||
(list :endpoint (funcall addr port)
|
|
||||||
:alive-p nil))))))))
|
|
||||||
|
|
||||||
;;; Client local variables
|
;;; Client local variables
|
||||||
|
|
||||||
|
@ -314,11 +280,6 @@ subprocess buffer."
|
||||||
(declare (indent 1))
|
(declare (indent 1))
|
||||||
`(progn
|
`(progn
|
||||||
(cl-check-type ,client jupyter-kernel-client)
|
(cl-check-type ,client jupyter-kernel-client)
|
||||||
;; NOTE: -buffer will be set as the IOLoop process buffer, see
|
|
||||||
;; `jupyter-start-channels', but before the IOLoop process is started we
|
|
||||||
;; would like to have a buffer available so that client local variables
|
|
||||||
;; can be set on the buffer. This is why we create our own buffer when a
|
|
||||||
;; client is initialized.
|
|
||||||
(with-current-buffer (oref ,client -buffer)
|
(with-current-buffer (oref ,client -buffer)
|
||||||
,@body)))
|
,@body)))
|
||||||
|
|
||||||
|
@ -409,19 +370,16 @@ Note that you can manipulate how to handle messages received in
|
||||||
response to the sent message, see `jupyter-add-callback' and
|
response to the sent message, see `jupyter-add-callback' and
|
||||||
`jupyter-request-inhibited-handlers'."
|
`jupyter-request-inhibited-handlers'."
|
||||||
(declare (indent 1))
|
(declare (indent 1))
|
||||||
(let ((ioloop (oref client ioloop)))
|
(jupyter-verify-inhibited-handlers)
|
||||||
(unless ioloop
|
(let ((msg-id (or msg-id (jupyter-new-uuid))))
|
||||||
(signal 'wrong-type-argument (list 'jupyter-ioloop ioloop 'ioloop)))
|
(jupyter-send (oref client kcomm) 'send channel type message msg-id)
|
||||||
(jupyter-verify-inhibited-handlers)
|
;; Anything sent to stdin is a reply not a request so don't add it as a
|
||||||
(let ((msg-id (or msg-id (jupyter-new-uuid))))
|
;; pending request
|
||||||
(jupyter-send ioloop 'send channel type message msg-id)
|
(unless (eq channel :stdin)
|
||||||
;; Anything sent to stdin is a reply not a request so don't add it as a
|
(let ((req (jupyter-generate-request client message)))
|
||||||
;; pending request
|
(setf (jupyter-request-id req) msg-id)
|
||||||
(unless (eq channel :stdin)
|
(setf (jupyter-request-inhibited-handlers req) jupyter-inhibit-handlers)
|
||||||
(let ((req (jupyter-generate-request client message)))
|
(jupyter--push-pending-request client req)))))
|
||||||
(setf (jupyter-request-id req) msg-id)
|
|
||||||
(setf (jupyter-request-inhibited-handlers req) jupyter-inhibit-handlers)
|
|
||||||
(jupyter--push-pending-request client req))))))
|
|
||||||
|
|
||||||
;;; Pending requests
|
;;; Pending requests
|
||||||
|
|
||||||
|
@ -448,33 +406,18 @@ back."
|
||||||
(jupyter--drop-idle-requests client)
|
(jupyter--drop-idle-requests client)
|
||||||
(with-slots (pending-requests requests) client
|
(with-slots (pending-requests requests) client
|
||||||
(or (> (ring-length pending-requests) 0)
|
(or (> (ring-length pending-requests) 0)
|
||||||
;; If there are two requests, then there is really only one since
|
;; If there are two requests, then there is really only one since
|
||||||
;; "last-sent" is an alias for the other.
|
;; "last-sent" is an alias for the other.
|
||||||
(> (hash-table-count requests) 2)
|
(> (hash-table-count requests) 2)
|
||||||
(when-let* ((last-sent (gethash "last-sent" requests)))
|
(when-let* ((last-sent (gethash "last-sent" requests)))
|
||||||
(not (jupyter-request-idle-received-p last-sent))))))
|
(not (jupyter-request-idle-received-p last-sent))))))
|
||||||
|
|
||||||
;;; HB channel methods
|
;;; Event handlers
|
||||||
|
|
||||||
(cl-defmethod jupyter-hb-pause ((client jupyter-kernel-client))
|
|
||||||
"Pause CLIENT's heartbeat channel."
|
|
||||||
(jupyter-hb-pause (plist-get (oref client channels) :hb)))
|
|
||||||
|
|
||||||
(cl-defmethod jupyter-hb-unpause ((client jupyter-kernel-client))
|
|
||||||
"Unpause CLIENT's heartbeat channel."
|
|
||||||
(jupyter-hb-unpause (plist-get (oref client channels) :hb)))
|
|
||||||
|
|
||||||
(cl-defmethod jupyter-hb-beating-p ((client jupyter-kernel-client))
|
|
||||||
"Is CLIENT still connected to its kernel?"
|
|
||||||
(jupyter-hb-beating-p (plist-get (oref client channels) :hb)))
|
|
||||||
|
|
||||||
;;; IOLoop handlers (receiving messages, starting/stopping channels)
|
|
||||||
|
|
||||||
;;;; Sending/receiving
|
;;;; Sending/receiving
|
||||||
|
|
||||||
(cl-defmethod jupyter-ioloop-handler ((_ioloop jupyter-ioloop)
|
(cl-defmethod jupyter-event-handler ((client jupyter-kernel-client)
|
||||||
(client jupyter-kernel-client)
|
(event (head sent)))
|
||||||
(event (head sent)))
|
|
||||||
(cl-destructuring-bind (_ channel-type msg-id) event
|
(cl-destructuring-bind (_ channel-type msg-id) event
|
||||||
(unless (eq channel-type :stdin)
|
(unless (eq channel-type :stdin)
|
||||||
;; Anything sent on stdin is a reply and therefore never added as a
|
;; Anything sent on stdin is a reply and therefore never added as a
|
||||||
|
@ -486,145 +429,46 @@ back."
|
||||||
(puthash msg-id req requests)
|
(puthash msg-id req requests)
|
||||||
(puthash "last-sent" req requests)))))
|
(puthash "last-sent" req requests)))))
|
||||||
|
|
||||||
(cl-defmethod jupyter-ioloop-printer ((_ioloop jupyter-ioloop)
|
(cl-defmethod jupyter-event-handler ((client jupyter-kernel-client)
|
||||||
(_client jupyter-kernel-client)
|
(event (head message)))
|
||||||
(event (head message)))
|
|
||||||
(cl-destructuring-bind (_ channel _idents . msg) event
|
(cl-destructuring-bind (_ channel _idents . msg) event
|
||||||
(format "%s" (list
|
(when jupyter--debug
|
||||||
channel
|
(message "%s" (concat (upcase (symbol-name (car event))) ": "
|
||||||
(jupyter-message-type msg)
|
(format "%s" (list
|
||||||
(jupyter-message-content msg)))))
|
channel
|
||||||
|
(jupyter-message-type msg)
|
||||||
|
(jupyter-message-content msg))))))
|
||||||
|
(jupyter-handle-message client channel msg)))
|
||||||
|
|
||||||
(cl-defmethod jupyter-ioloop-handler ((_ioloop jupyter-ioloop)
|
;;; Starting communication with a kernel
|
||||||
(client jupyter-kernel-client)
|
|
||||||
(event (head message)))
|
|
||||||
"For CLIENT, queue a message EVENT to be handled."
|
|
||||||
(cl-destructuring-bind (_ channel _idents . msg) event
|
|
||||||
;; Run immediately after handling this event, i.e. on the next command loop
|
|
||||||
(run-at-time 0 nil #'jupyter-handle-message client channel msg)))
|
|
||||||
|
|
||||||
;;;; Channel alive methods
|
(cl-defmethod jupyter-start-channels ((client jupyter-kernel-client))
|
||||||
|
(jupyter-connect-client (oref client kcomm) client))
|
||||||
(cl-defmethod jupyter-channel-alive-p ((client jupyter-kernel-client) channel)
|
|
||||||
(cl-assert (memq channel '(:hb :stdin :shell :iopub)) t)
|
|
||||||
(with-slots (ioloop channels) client
|
|
||||||
(if (not (eq channel :hb))
|
|
||||||
(when (and ioloop (jupyter-ioloop-alive-p ioloop))
|
|
||||||
(plist-get (plist-get channels channel) :alive-p))
|
|
||||||
(setq channel (plist-get channels :hb))
|
|
||||||
;; The hb channel is implemented locally in the current process whereas the
|
|
||||||
;; other channels are implemented in subprocesses and the current process
|
|
||||||
;; acts as a proxy.
|
|
||||||
(and channel (jupyter-channel-alive-p channel)))))
|
|
||||||
|
|
||||||
;;;; Start channel methods
|
|
||||||
|
|
||||||
(cl-defmethod jupyter-ioloop-handler ((_ioloop jupyter-ioloop)
|
|
||||||
(client jupyter-kernel-client)
|
|
||||||
(event (head start-channel)))
|
|
||||||
(plist-put (plist-get (oref client channels) (cadr event)) :alive-p t))
|
|
||||||
|
|
||||||
(cl-defmethod jupyter-start-channel ((client jupyter-kernel-client) channel)
|
|
||||||
(cl-assert (memq channel '(:hb :stdin :shell :iopub)) t)
|
|
||||||
(unless (jupyter-channel-alive-p client channel)
|
|
||||||
(with-slots (channels) client
|
|
||||||
(if (eq channel :hb)
|
|
||||||
(jupyter-start-channel (plist-get channels :hb))
|
|
||||||
(cl-destructuring-bind (&key endpoint &allow-other-keys)
|
|
||||||
(plist-get channels channel)
|
|
||||||
(jupyter-send
|
|
||||||
(oref client ioloop) 'start-channel channel endpoint))))))
|
|
||||||
|
|
||||||
(cl-defmethod jupyter-start-channel :after ((client jupyter-kernel-client) channel)
|
|
||||||
"Verify that CLIENT's CHANNEL started.
|
|
||||||
Raise an error if it did not start within
|
|
||||||
`jupyter-default-timeout'."
|
|
||||||
(unless (or (eq channel :hb) (jupyter-channel-alive-p client channel))
|
|
||||||
(with-slots (ioloop) client
|
|
||||||
(or (jupyter-ioloop-wait-until ioloop 'start-channel
|
|
||||||
(lambda (_) (jupyter-channel-alive-p client channel)))
|
|
||||||
(error "Channel not started in ioloop subprocess (%s)" channel)))))
|
|
||||||
|
|
||||||
;;;; Stop channel methods
|
|
||||||
|
|
||||||
(cl-defmethod jupyter-ioloop-handler ((_ioloop jupyter-ioloop)
|
|
||||||
(client jupyter-kernel-client)
|
|
||||||
(event (head stop-channel)))
|
|
||||||
(plist-put (plist-get (oref client channels) (cadr event)) :alive-p nil))
|
|
||||||
|
|
||||||
(cl-defmethod jupyter-stop-channel ((client jupyter-kernel-client) channel)
|
|
||||||
(cl-assert (memq channel '(:hb :stdin :shell :iopub)) t)
|
|
||||||
(when (jupyter-channel-alive-p client channel)
|
|
||||||
(if (eq channel :hb)
|
|
||||||
(jupyter-stop-channel (plist-get (oref client channels) :hb))
|
|
||||||
(jupyter-send (oref client ioloop) 'stop-channel channel))))
|
|
||||||
|
|
||||||
(cl-defmethod jupyter-stop-channel :after ((client jupyter-kernel-client) channel)
|
|
||||||
"Verify that CLIENT's CHANNEL has stopped.
|
|
||||||
Raise a warning if it has not been stopped within
|
|
||||||
`jupyter-default-timeout'."
|
|
||||||
(unless (or (eq channel :hb) (not (jupyter-channel-alive-p client channel)))
|
|
||||||
(with-slots (ioloop) client
|
|
||||||
(or (jupyter-ioloop-wait-until ioloop 'stop-channel
|
|
||||||
(lambda (_) (not (jupyter-channel-alive-p client channel))))
|
|
||||||
(warn "Channel not stopped in ioloop subprocess")))))
|
|
||||||
|
|
||||||
;;; Starting/stopping IOLoop
|
|
||||||
|
|
||||||
(cl-defmethod jupyter-start-channels :before ((client jupyter-kernel-client)
|
|
||||||
&rest _)
|
|
||||||
"Start CLIENT's channel ioloop."
|
|
||||||
(with-slots (ioloop session) client
|
|
||||||
(unless ioloop
|
|
||||||
(oset client ioloop (jupyter-channel-ioloop))
|
|
||||||
(setq ioloop (oref client ioloop)))
|
|
||||||
(unless (jupyter-ioloop-alive-p ioloop)
|
|
||||||
(jupyter-ioloop-start ioloop session client))))
|
|
||||||
|
|
||||||
(cl-defmethod jupyter-start-channels ((client jupyter-kernel-client)
|
|
||||||
&key (shell t)
|
|
||||||
(iopub t)
|
|
||||||
(stdin t)
|
|
||||||
(hb t))
|
|
||||||
"Start the pre-configured channels of CLIENT.
|
|
||||||
Before starting the channels, ensure that the channel subprocess
|
|
||||||
responsible for encoding/decoding messages and sending/receiving
|
|
||||||
messages to/from the kernel is running.
|
|
||||||
|
|
||||||
Call `jupyter-start-channel' for every channel whose key has a
|
|
||||||
non-nil value passed to this function.
|
|
||||||
|
|
||||||
If the shell channel is started, send an initial
|
|
||||||
`:kernel-info-request' to set the kernel-info slot of CLIENT if
|
|
||||||
necessary."
|
|
||||||
(cl-loop
|
|
||||||
for (channel . start) in `((:hb . ,hb)
|
|
||||||
(:shell . ,shell)
|
|
||||||
(:iopub . ,iopub)
|
|
||||||
(:stdin . ,stdin))
|
|
||||||
when start do (jupyter-start-channel client channel))
|
|
||||||
;; Needed for reliability. Sometimes we are not fast enough to capture the
|
|
||||||
;; startup message of a kernel.
|
|
||||||
(sleep-for 0.3))
|
|
||||||
|
|
||||||
(cl-defmethod jupyter-stop-channels ((client jupyter-kernel-client))
|
(cl-defmethod jupyter-stop-channels ((client jupyter-kernel-client))
|
||||||
"Stop any running channels of CLIENT."
|
"Stop any running channels of CLIENT."
|
||||||
(cl-loop
|
(when (slot-boundp client 'kcomm)
|
||||||
for channel in '(:shell :iopub :stdin :hb)
|
(jupyter-disconnect-client (oref client kcomm) client)))
|
||||||
do (jupyter-stop-channel client channel)))
|
|
||||||
|
|
||||||
(cl-defmethod jupyter-stop-channels :after ((client jupyter-kernel-client)
|
|
||||||
&rest _)
|
|
||||||
"Stop CLIENT's channel ioloop."
|
|
||||||
(with-slots (ioloop) client
|
|
||||||
(when ioloop
|
|
||||||
(jupyter-ioloop-stop ioloop))))
|
|
||||||
|
|
||||||
(cl-defmethod jupyter-channels-running-p ((client jupyter-kernel-client))
|
(cl-defmethod jupyter-channels-running-p ((client jupyter-kernel-client))
|
||||||
"Are any channels of CLIENT running?"
|
"Are any channels of CLIENT running?"
|
||||||
(cl-loop
|
(jupyter-comm-alive-p (oref client kcomm)))
|
||||||
for channel in '(:shell :iopub :stdin :hb)
|
|
||||||
thereis (jupyter-channel-alive-p client channel)))
|
(cl-defmethod jupyter-channel-alive-p ((client jupyter-kernel-client) channel)
|
||||||
|
(jupyter-channel-alive-p (oref client kcomm) channel))
|
||||||
|
|
||||||
|
(cl-defmethod jupyter-hb-pause ((client jupyter-kernel-client))
|
||||||
|
(when (cl-typep (oref client kcomm) 'jupyter-hb-comm)
|
||||||
|
(jupyter-hb-pause (oref client kcomm))))
|
||||||
|
|
||||||
|
(cl-defmethod jupyter-hb-unpause ((client jupyter-kernel-client))
|
||||||
|
(when (cl-typep (oref client kcomm) 'jupyter-hb-comm)
|
||||||
|
(jupyter-hb-unpause (oref client kcomm))))
|
||||||
|
|
||||||
|
(cl-defmethod jupyter-hb-beating-p ((client jupyter-kernel-client))
|
||||||
|
"Is CLIENT still connected to its kernel?"
|
||||||
|
(or (null (cl-typep (oref client kcomm) 'jupyter-hb-comm))
|
||||||
|
(jupyter-hb-beating-p (oref client kcomm))))
|
||||||
|
|
||||||
;;; Message callbacks
|
;;; Message callbacks
|
||||||
|
|
||||||
|
|
426
jupyter-comm-layer.el
Normal file
426
jupyter-comm-layer.el
Normal file
|
@ -0,0 +1,426 @@
|
||||||
|
;;; jupyter-comm-layer.el --- Kernel communication layer -*- lexical-binding: t -*-
|
||||||
|
|
||||||
|
;; Copyright (C) 2019 Nathaniel Nicandro
|
||||||
|
|
||||||
|
;; Author: Nathaniel Nicandro <nathanielnicandro@gmail.com>
|
||||||
|
;; Created: 06 Apr 2019
|
||||||
|
;; Version: 0.7.3
|
||||||
|
|
||||||
|
;; This program is free software; you can redistribute it and/or
|
||||||
|
;; modify it under the terms of the GNU General Public License as
|
||||||
|
;; published by the Free Software Foundation; either version 2, or (at
|
||||||
|
;; your option) any later version.
|
||||||
|
|
||||||
|
;; This program is distributed in the hope that it will be useful, but
|
||||||
|
;; WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
||||||
|
;; General Public License for more details.
|
||||||
|
|
||||||
|
;; You should have received a copy of the GNU General Public License
|
||||||
|
;; along with GNU Emacs; see the file COPYING. If not, write to the
|
||||||
|
;; Free Software Foundation, Inc., 59 Temple Place - Suite 330,
|
||||||
|
;; Boston, MA 02111-1307, USA.
|
||||||
|
|
||||||
|
;;; Commentary:
|
||||||
|
|
||||||
|
;; Communication with a kernel can happen in various ways, e.g. through zmq
|
||||||
|
;; sockets, a websocket, and potentially others.
|
||||||
|
;;
|
||||||
|
;; The purpose of this file is to implement a kernel communication layer to
|
||||||
|
;; abstract away how a client communicates with the kernel it is connected to.
|
||||||
|
;;
|
||||||
|
;; A specific kernel communication layer (kcomm for short) is implemented by
|
||||||
|
;; extending the methods: `jupyter-comm-start', `jupyter-comm-stop',
|
||||||
|
;; `jupyter-comm-alive-p',`jupyter-event-handler', `jupyter-send', and possibly
|
||||||
|
;; `jupyter-initialize-connection'.
|
||||||
|
;;
|
||||||
|
;; A client registers with the kcomm by calling `jupyter-connect-client' and
|
||||||
|
;; de-registers with `jupyter-disconnect-client'. The communication layer deals
|
||||||
|
;; with "events" which are just lists with an identifying symbol as the head
|
||||||
|
;; element. Events that occur on the communication layer meant for clients,
|
||||||
|
;; e.g. a message received by a kernel or notification that a message was sent
|
||||||
|
;; to a kernel, will be broadcast to all registered clients. Every client
|
||||||
|
;; wanting to receive such events must extend the method
|
||||||
|
;; `jupyter-event-handler' using the head method specializer.
|
||||||
|
;;
|
||||||
|
;; An event is sent to the kernel using `jupyter-send'. So that sending an
|
||||||
|
;; event to the communication layer would look like
|
||||||
|
;;
|
||||||
|
;; (jupyter-send kcomm 'send channel-type msg-type msg msg-id)
|
||||||
|
;;
|
||||||
|
;; The possible events that can be handled by a client is dependent on the
|
||||||
|
;; communication layer, but a `jupyter-kernel-client' implements handlers for a
|
||||||
|
;; `message' event (a kernel message) and a `sent' event (a notification that a
|
||||||
|
;; message was sent to a kernel).
|
||||||
|
|
||||||
|
;;; Code:
|
||||||
|
|
||||||
|
(eval-when-compile (require 'subr-x))
|
||||||
|
(require 'jupyter-base)
|
||||||
|
(require 'jupyter-channel-ioloop)
|
||||||
|
(require 'jupyter-messages)
|
||||||
|
|
||||||
|
(defgroup jupyter-comm-layer nil
|
||||||
|
"Kernel communication layer"
|
||||||
|
:group 'jupyter)
|
||||||
|
|
||||||
|
(defclass jupyter-comm-layer ()
|
||||||
|
((clients :type list :initform nil))
|
||||||
|
:abstract t)
|
||||||
|
|
||||||
|
;;; `jupyter-comm-layer'
|
||||||
|
|
||||||
|
(cl-defgeneric jupyter-comm-start ((comm jupyter-comm-layer) &rest _)
|
||||||
|
"Start communication on COMM.")
|
||||||
|
|
||||||
|
(cl-defgeneric jupyter-comm-stop ((comm jupyter-comm-layer) &rest _)
|
||||||
|
"Stop communication on COMM.")
|
||||||
|
|
||||||
|
(cl-defgeneric jupyter-comm-alive-p ((comm jupyter-comm-layer))
|
||||||
|
"Return non-nil if communication has started on COMM.")
|
||||||
|
|
||||||
|
(cl-defgeneric jupyter-connect-client ((comm jupyter-comm-layer) obj)
|
||||||
|
"Register OBJ to receive events from COMM.
|
||||||
|
By default, on the first OBJ connected, `jupyter-comm-start' is
|
||||||
|
called if needed. This means that a call to
|
||||||
|
`jupyter-initialize-connection' should precede a call to
|
||||||
|
`jupyter-connect-client'.")
|
||||||
|
|
||||||
|
(cl-defgeneric jupyter-disconnect-client ((comm jupyter-comm-layer) obj)
|
||||||
|
"De-register OBJ from receiving events from COMM.
|
||||||
|
By default, on the last OBJ removed, `jupyter-comm-stop' is
|
||||||
|
called if needed.")
|
||||||
|
|
||||||
|
(cl-defgeneric jupyter-event-handler (_obj _event)
|
||||||
|
"Handle EVENT using OBJ."
|
||||||
|
nil)
|
||||||
|
|
||||||
|
(cl-defmethod jupyter-send ((_comm jupyter-comm-layer) &rest _event)
|
||||||
|
"Send EVENT to the underlying kernel using COMM."
|
||||||
|
(error "Subclasses need to override this method"))
|
||||||
|
|
||||||
|
(cl-defgeneric jupyter-initialize-connection ((comm jupyter-comm-layer) &rest _)
|
||||||
|
"Initialize communication on COMM."
|
||||||
|
(when (jupyter-comm-alive-p comm)
|
||||||
|
(error "Can't initialize a live comm")))
|
||||||
|
|
||||||
|
;; TODO: Figure out a better interface for these channel methods or just make
|
||||||
|
;; them unnecessary. The design of `jupyter-comm-layer' only deals with
|
||||||
|
;; "events" and the channel abstraction is an implementation detail that
|
||||||
|
;; shouldn't be visible to the client.
|
||||||
|
|
||||||
|
(cl-defgeneric jupyter-channels-running-p ((comm jupyter-comm-layer))
|
||||||
|
"Are any channels of CLIENT running?")
|
||||||
|
|
||||||
|
(cl-defmethod jupyter-channel-alive-p ((_comm jupyter-comm-layer) _channel)
|
||||||
|
(error "Need to implement"))
|
||||||
|
|
||||||
|
(cl-defmethod jupyter-connect-client ((comm jupyter-comm-layer) obj)
|
||||||
|
(unless (cl-loop for ref in (oref comm clients)
|
||||||
|
thereis (eq (jupyter-weak-ref-resolve ref) obj))
|
||||||
|
(push (jupyter-weak-ref obj) (oref comm clients)))
|
||||||
|
;; Remove any garbage collected clients
|
||||||
|
(oset comm clients
|
||||||
|
(cl-remove-if-not #'jupyter-weak-ref-resolve
|
||||||
|
(oref comm clients)))
|
||||||
|
(unless (jupyter-comm-alive-p comm)
|
||||||
|
(jupyter-comm-start comm)))
|
||||||
|
|
||||||
|
(cl-defmethod jupyter-disconnect-client ((comm jupyter-comm-layer) obj)
|
||||||
|
(oset comm clients
|
||||||
|
(cl-remove-if (lambda (ref)
|
||||||
|
(let ((deref (jupyter-weak-ref-resolve ref)))
|
||||||
|
(or (eq deref obj) (null deref))))
|
||||||
|
(oref comm clients)))
|
||||||
|
;; FIXME: This is more of a convenience and it probably makes sense to keep
|
||||||
|
;; the comm open even though there are no clients.
|
||||||
|
(when (and (jupyter-comm-alive-p comm)
|
||||||
|
(zerop (length (oref comm clients))))
|
||||||
|
(jupyter-comm-stop comm)))
|
||||||
|
|
||||||
|
(cl-defmethod jupyter-event-handler ((comm jupyter-comm-layer) event)
|
||||||
|
"Broadcast EVENT to all clients registered to receive them on COMM."
|
||||||
|
;; TODO: Dynamically cleanup list of garbage collected clients when looping
|
||||||
|
;; over it.
|
||||||
|
(let ((clients (oref comm clients)))
|
||||||
|
(while clients
|
||||||
|
(when-let* ((client (jupyter-weak-ref-resolve (pop clients))))
|
||||||
|
(run-at-time 0 nil #'jupyter-event-handler client event)))))
|
||||||
|
|
||||||
|
;;; `jupyter-hb-comm'
|
||||||
|
;; If the communication layer can talk to a heartbeat channel, then it should
|
||||||
|
;; add this class as a parent class.
|
||||||
|
|
||||||
|
(defclass jupyter-hb-comm ()
|
||||||
|
((hb :type jupyter-hb-channel))
|
||||||
|
:abstract t)
|
||||||
|
|
||||||
|
(cl-defmethod jupyter-hb-beating-p ((comm jupyter-hb-comm))
|
||||||
|
(jupyter-hb-beating-p (oref comm hb)))
|
||||||
|
|
||||||
|
(cl-defmethod jupyter-hb-pause ((comm jupyter-hb-comm))
|
||||||
|
(jupyter-hb-pause (oref comm hb)))
|
||||||
|
|
||||||
|
(cl-defmethod jupyter-hb-unpause ((comm jupyter-hb-comm))
|
||||||
|
(jupyter-hb-unpause (oref comm hb)))
|
||||||
|
|
||||||
|
;;; `jupyter-channel-comm'
|
||||||
|
;; A communication layer using `jupyter-sync-channel' objects for communicating
|
||||||
|
;; with a kernel. This communication layer is mainly meant for speed comparison
|
||||||
|
;; with the `jupyter-channel-ioloop-comm' layer. It implements communication in
|
||||||
|
;; the current Emacs instance and comparing it with the
|
||||||
|
;; `jupyter-channel-ioloop-comm' shows how much of a slow down there is when
|
||||||
|
;; all the processing of messages happens in the current Emacs instance.
|
||||||
|
;;
|
||||||
|
;; Running the test suit using `jupyter-channel-comm' vs
|
||||||
|
;; `jupyter-channel-ioloop-comm' shows, very roughly, around a 2x speed up
|
||||||
|
;; using `jupyter-channel-ioloop-comm'.
|
||||||
|
|
||||||
|
;; FIXME: This is needed since the `jupyter-kernel-client' and
|
||||||
|
;; `jupyter-channel-ioloop' use keywords whereas you can only access slots
|
||||||
|
;; using symbols.
|
||||||
|
(defsubst jupyter-comm--channel (c)
|
||||||
|
(cl-case c
|
||||||
|
(:hb 'hb)
|
||||||
|
(:iopub 'iopub)
|
||||||
|
(:shell 'shell)
|
||||||
|
(:stdin 'stdin)))
|
||||||
|
|
||||||
|
(defclass jupyter-sync-channel-comm (jupyter-comm-layer
|
||||||
|
jupyter-hb-comm)
|
||||||
|
((session :type jupyter-session)
|
||||||
|
(iopub :type jupyter-sync-channel)
|
||||||
|
(shell :type jupyter-sync-channel)
|
||||||
|
(stdin :type jupyter-sync-channel)
|
||||||
|
(thread)))
|
||||||
|
|
||||||
|
(cl-defmethod initialize-instance ((_comm jupyter-sync-channel-comm) &rest _)
|
||||||
|
(unless (functionp 'make-thread)
|
||||||
|
(error "Need threading support"))
|
||||||
|
(cl-call-next-method))
|
||||||
|
|
||||||
|
(defun jupyter-sync-channel-comm--check (comm)
|
||||||
|
(condition-case err
|
||||||
|
(cl-loop
|
||||||
|
for channel-type in '(:iopub :shell :stdin)
|
||||||
|
for channel = (slot-value comm (jupyter-comm--channel channel-type))
|
||||||
|
for msg = (and (jupyter-channel-alive-p channel)
|
||||||
|
(with-slots (session socket) channel
|
||||||
|
(condition-case nil
|
||||||
|
(jupyter-recv session socket zmq-DONTWAIT)
|
||||||
|
((zmq-EINTR zmq-EAGAIN) nil))))
|
||||||
|
when msg do (jupyter-event-handler
|
||||||
|
comm (cl-list* 'message channel-type msg)))
|
||||||
|
(error
|
||||||
|
(thread-signal (car (all-threads)) (car err)
|
||||||
|
(cons 'jupyter-sync-channel-comm--check (cdr err)))
|
||||||
|
(signal (car err) (cdr err)))))
|
||||||
|
|
||||||
|
(cl-defmethod jupyter-comm-start ((comm jupyter-sync-channel-comm))
|
||||||
|
(cl-loop
|
||||||
|
for channel in '(hb shell iopub stdin)
|
||||||
|
do (jupyter-start-channel (slot-value comm channel)))
|
||||||
|
(oset comm thread
|
||||||
|
(make-thread
|
||||||
|
(let ((comm-ref (jupyter-weak-ref comm)))
|
||||||
|
(lambda ()
|
||||||
|
(while (when-let* ((comm (jupyter-weak-ref-resolve comm-ref)))
|
||||||
|
(prog1 comm
|
||||||
|
(jupyter-sync-channel-comm--check comm)))
|
||||||
|
(thread-yield)
|
||||||
|
(thread-yield)))))))
|
||||||
|
|
||||||
|
(cl-defmethod jupyter-comm-stop ((comm jupyter-sync-channel-comm))
|
||||||
|
(when (and (slot-boundp comm 'thread)
|
||||||
|
(thread-alive-p (oref comm thread)))
|
||||||
|
(thread-signal (oref comm thread) 'quit nil)
|
||||||
|
(slot-makeunbound comm 'thread))
|
||||||
|
(cl-loop
|
||||||
|
for channel in '(hb shell iopub stdin)
|
||||||
|
do (jupyter-stop-channel (slot-value comm channel))))
|
||||||
|
|
||||||
|
(cl-defmethod jupyter-comm-alive-p ((comm jupyter-sync-channel-comm))
|
||||||
|
(jupyter-channels-running-p comm))
|
||||||
|
|
||||||
|
(cl-defmethod jupyter-channel-alive-p ((comm jupyter-sync-channel-comm) channel)
|
||||||
|
(and (slot-boundp comm (jupyter-comm--channel channel))
|
||||||
|
(jupyter-channel-alive-p (slot-value comm (jupyter-comm--channel channel)))))
|
||||||
|
|
||||||
|
(cl-defmethod jupyter-channels-running-p ((comm jupyter-sync-channel-comm))
|
||||||
|
(cl-loop
|
||||||
|
for channel in '(:shell :iopub :stdin :hb)
|
||||||
|
thereis (jupyter-channel-alive-p comm channel)))
|
||||||
|
|
||||||
|
;;;; Channel start/stop methods
|
||||||
|
|
||||||
|
(cl-defmethod jupyter-stop-channel ((comm jupyter-sync-channel-comm) channel)
|
||||||
|
(when (jupyter-channel-alive-p comm channel)
|
||||||
|
(jupyter-stop-channel
|
||||||
|
(slot-value comm (jupyter-comm--channel channel)))))
|
||||||
|
|
||||||
|
(cl-defmethod jupyter-start-channel ((comm jupyter-sync-channel-comm) channel)
|
||||||
|
(unless (jupyter-channel-alive-p comm channel)
|
||||||
|
(jupyter-start-channel
|
||||||
|
(slot-value comm (jupyter-comm--channel channel)))))
|
||||||
|
|
||||||
|
(cl-defmethod jupyter-initialize-connection ((comm jupyter-sync-channel-comm)
|
||||||
|
(session jupyter-session))
|
||||||
|
(cl-call-next-method)
|
||||||
|
(let ((endpoints (jupyter-session-endpoints session)))
|
||||||
|
(oset comm session (copy-sequence session))
|
||||||
|
(oset comm hb (make-instance
|
||||||
|
'jupyter-hb-channel
|
||||||
|
:session (oref comm session)
|
||||||
|
:endpoint (plist-get endpoints :hb)))
|
||||||
|
(cl-loop
|
||||||
|
for channel in `(:stdin :shell :iopub)
|
||||||
|
do (setf (slot-value comm (jupyter-comm--channel channel))
|
||||||
|
(jupyter-sync-channel
|
||||||
|
:type channel
|
||||||
|
:session (oref comm session)
|
||||||
|
:endpoint (plist-get endpoints channel))))))
|
||||||
|
|
||||||
|
(cl-defmethod jupyter-send ((comm jupyter-sync-channel-comm)
|
||||||
|
_ channel-type msg-type msg msg-id)
|
||||||
|
(let ((channel (slot-value comm (jupyter-comm--channel channel-type))))
|
||||||
|
;; Run the event handler on the next command loop since the expectation is
|
||||||
|
;; the client is that sending is asynchronous. There may be some code that
|
||||||
|
;; makes assumptions based on this.
|
||||||
|
(run-at-time
|
||||||
|
0 nil (lambda (id)
|
||||||
|
(jupyter-event-handler comm (list 'sent channel-type id)))
|
||||||
|
(jupyter-send channel msg-type msg msg-id))))
|
||||||
|
|
||||||
|
;;; `jupyter-ioloop-comm'
|
||||||
|
|
||||||
|
(defclass jupyter-ioloop-comm (jupyter-comm-layer)
|
||||||
|
((ioloop :type jupyter-ioloop))
|
||||||
|
:abstract t)
|
||||||
|
|
||||||
|
;; Fall back method that catches IOLoop events that have not been handled by
|
||||||
|
;; the communication layer already.
|
||||||
|
(cl-defmethod jupyter-ioloop-handler ((_ioloop jupyter-ioloop)
|
||||||
|
(comm jupyter-ioloop-comm)
|
||||||
|
event)
|
||||||
|
(unless (memq (car event) '(start quit))
|
||||||
|
(jupyter-event-handler comm event)))
|
||||||
|
|
||||||
|
(cl-defmethod jupyter-send ((comm jupyter-ioloop-comm) &rest event)
|
||||||
|
(apply #'jupyter-send (oref comm ioloop) event))
|
||||||
|
|
||||||
|
(cl-defmethod jupyter-comm-start ((comm jupyter-ioloop-comm))
|
||||||
|
(with-slots (ioloop) comm
|
||||||
|
(unless (jupyter-ioloop-alive-p ioloop)
|
||||||
|
(jupyter-ioloop-start ioloop comm))))
|
||||||
|
|
||||||
|
(cl-defmethod jupyter-comm-stop ((comm jupyter-ioloop-comm))
|
||||||
|
(with-slots (ioloop) comm
|
||||||
|
(when (jupyter-ioloop-alive-p ioloop)
|
||||||
|
(jupyter-ioloop-stop ioloop))))
|
||||||
|
|
||||||
|
(cl-defmethod jupyter-comm-alive-p ((comm jupyter-ioloop-comm))
|
||||||
|
(and (slot-boundp comm 'ioloop)
|
||||||
|
(jupyter-ioloop-alive-p (oref comm ioloop))))
|
||||||
|
|
||||||
|
;;; `jupyter-channel-ioloop-comm'
|
||||||
|
|
||||||
|
(cl-defstruct jupyter-proxy-channel endpoint alive-p)
|
||||||
|
|
||||||
|
(defclass jupyter-channel-ioloop-comm (jupyter-ioloop-comm jupyter-hb-comm)
|
||||||
|
((session :type jupyter-session)
|
||||||
|
(iopub :type jupyter-proxy-channel)
|
||||||
|
(shell :type jupyter-proxy-channel)
|
||||||
|
(stdin :type jupyter-proxy-channel)))
|
||||||
|
|
||||||
|
(cl-defmethod initialize-instance ((comm jupyter-channel-ioloop-comm) &rest _)
|
||||||
|
(cl-call-next-method)
|
||||||
|
(oset comm ioloop (jupyter-channel-ioloop)))
|
||||||
|
|
||||||
|
(cl-defmethod jupyter-initialize-connection ((comm jupyter-channel-ioloop-comm)
|
||||||
|
(session jupyter-session))
|
||||||
|
(cl-call-next-method)
|
||||||
|
(let ((endpoints (jupyter-session-endpoints session)))
|
||||||
|
(oset comm session (copy-sequence session))
|
||||||
|
(oset comm hb (make-instance
|
||||||
|
'jupyter-hb-channel
|
||||||
|
:session (oref comm session)
|
||||||
|
:endpoint (plist-get endpoints :hb)))
|
||||||
|
(cl-loop
|
||||||
|
for channel in '(:stdin :shell :iopub)
|
||||||
|
do (setf (slot-value comm (jupyter-comm--channel channel))
|
||||||
|
(make-jupyter-proxy-channel
|
||||||
|
:endpoint (plist-get endpoints channel)
|
||||||
|
:alive-p nil)))))
|
||||||
|
|
||||||
|
(cl-defmethod jupyter-comm-start ((comm jupyter-channel-ioloop-comm))
|
||||||
|
(with-slots (ioloop session) comm
|
||||||
|
(unless (jupyter-ioloop-alive-p ioloop)
|
||||||
|
(jupyter-ioloop-start ioloop session comm))
|
||||||
|
(cl-loop
|
||||||
|
for channel in '(:hb :shell :iopub :stdin)
|
||||||
|
do (jupyter-start-channel comm channel))))
|
||||||
|
|
||||||
|
(cl-defmethod jupyter-comm-stop ((comm jupyter-channel-ioloop-comm))
|
||||||
|
(cl-loop
|
||||||
|
for channel in '(:hb :shell :iopub :stdin)
|
||||||
|
do (jupyter-stop-channel comm channel))
|
||||||
|
(cl-call-next-method))
|
||||||
|
|
||||||
|
;;;; `jupyter-channel-ioloop' events
|
||||||
|
|
||||||
|
(cl-defmethod jupyter-ioloop-handler ((_ioloop jupyter-channel-ioloop)
|
||||||
|
(comm jupyter-channel-ioloop-comm)
|
||||||
|
(event (head stop-channel)))
|
||||||
|
(setf (jupyter-proxy-channel-alive-p
|
||||||
|
(slot-value comm (jupyter-comm--channel (cadr event))))
|
||||||
|
nil))
|
||||||
|
|
||||||
|
(cl-defmethod jupyter-ioloop-handler ((_ioloop jupyter-channel-ioloop)
|
||||||
|
(comm jupyter-channel-ioloop-comm)
|
||||||
|
(event (head start-channel)))
|
||||||
|
(setf (jupyter-proxy-channel-alive-p
|
||||||
|
(slot-value comm (jupyter-comm--channel (cadr event))))
|
||||||
|
t))
|
||||||
|
|
||||||
|
;;;; Channel querying methods
|
||||||
|
|
||||||
|
(cl-defmethod jupyter-channel-alive-p ((comm jupyter-channel-ioloop-comm) channel)
|
||||||
|
(if (eq channel :hb) (jupyter-channel-alive-p (oref comm hb))
|
||||||
|
(with-slots (ioloop) comm
|
||||||
|
(and ioloop (jupyter-ioloop-alive-p ioloop)
|
||||||
|
(jupyter-proxy-channel-alive-p
|
||||||
|
(slot-value comm (jupyter-comm--channel channel)))))))
|
||||||
|
|
||||||
|
(cl-defmethod jupyter-channels-running-p ((comm jupyter-channel-ioloop-comm))
|
||||||
|
"Are any channels of CLIENT running?"
|
||||||
|
(cl-loop
|
||||||
|
for channel in '(:shell :iopub :stdin :hb)
|
||||||
|
thereis (jupyter-channel-alive-p comm channel)))
|
||||||
|
|
||||||
|
;;;; Channel start/stop methods
|
||||||
|
|
||||||
|
(cl-defmethod jupyter-stop-channel ((comm jupyter-channel-ioloop-comm) channel)
|
||||||
|
(when (jupyter-channel-alive-p comm channel)
|
||||||
|
(if (eq channel :hb) (jupyter-stop-channel (oref comm hb))
|
||||||
|
(with-slots (ioloop) comm
|
||||||
|
(jupyter-send ioloop 'stop-channel channel)
|
||||||
|
;; Verify that the channel stops
|
||||||
|
(or (jupyter-ioloop-wait-until ioloop 'stop-channel
|
||||||
|
(lambda (_) (not (jupyter-channel-alive-p comm channel))))
|
||||||
|
(error "Channel not stopped in ioloop subprocess"))))))
|
||||||
|
|
||||||
|
(cl-defmethod jupyter-start-channel ((comm jupyter-channel-ioloop-comm) channel)
|
||||||
|
(unless (jupyter-channel-alive-p comm channel)
|
||||||
|
(if (eq channel :hb) (jupyter-start-channel (oref comm hb))
|
||||||
|
(let ((endpoint (jupyter-proxy-channel-endpoint
|
||||||
|
(slot-value comm (jupyter-comm--channel channel)))))
|
||||||
|
(with-slots (ioloop) comm
|
||||||
|
(jupyter-send ioloop 'start-channel channel endpoint)
|
||||||
|
;; Verify that the channel starts
|
||||||
|
(or (jupyter-ioloop-wait-until ioloop 'start-channel
|
||||||
|
(lambda (_) (jupyter-channel-alive-p comm channel)))
|
||||||
|
(error "Channel not started in ioloop subprocess (%s)" channel)))))))
|
||||||
|
|
||||||
|
(provide 'jupyter-comm-layer)
|
||||||
|
|
||||||
|
;;; jupyter-comm-layer.el ends here
|
|
@ -114,8 +114,15 @@ connect to MANAGER's kernel."
|
||||||
(signal 'wrong-type-argument (list '(subclass jupyter-kernel-client) class)))
|
(signal 'wrong-type-argument (list '(subclass jupyter-kernel-client) class)))
|
||||||
(let ((client (apply #'make-instance class slots)))
|
(let ((client (apply #'make-instance class slots)))
|
||||||
(prog1 client
|
(prog1 client
|
||||||
(jupyter-initialize-connection client (oref manager session))
|
(oset client manager manager)
|
||||||
(oset client manager manager))))
|
;; TODO: We can also have the manager hold the kcomm object and just
|
||||||
|
;; pass a single kcomm object to all clients using this manager since the
|
||||||
|
;; kcomm broadcasts event to all connected clients. This is more
|
||||||
|
;; efficient as it only uses one subprocess for every client connected to
|
||||||
|
;; a kernel.
|
||||||
|
(oset client kcomm (jupyter-channel-ioloop-comm))
|
||||||
|
(jupyter-initialize-connection
|
||||||
|
client (copy-sequence (oref manager session))))))
|
||||||
|
|
||||||
(defun jupyter--kernel-sentinel (kernel &optional _)
|
(defun jupyter--kernel-sentinel (kernel &optional _)
|
||||||
"Kill the KERNEL process and its buffer."
|
"Kill the KERNEL process and its buffer."
|
||||||
|
|
|
@ -287,15 +287,15 @@ and `:msg_type'."
|
||||||
(cdr parts)
|
(cdr parts)
|
||||||
(let ((dheader (jupyter--decode header)))
|
(let ((dheader (jupyter--decode header)))
|
||||||
(list
|
(list
|
||||||
:header `(message-part ,header ,dheader)
|
:header (list 'message-part header dheader)
|
||||||
:msg_id (plist-get dheader :msg_id)
|
:msg_id (plist-get dheader :msg_id)
|
||||||
:msg_type (plist-get dheader :msg_type)
|
:msg_type (plist-get dheader :msg_type)
|
||||||
;; Also decode the parent header here since it is used quite often in
|
;; Also decode the parent header here since it is used quite often in
|
||||||
;; the parent Emacs process
|
;; the parent Emacs process
|
||||||
:parent_header `(message-part ,parent-header
|
:parent_header (list 'message-part parent-header
|
||||||
,(jupyter--decode parent-header))
|
(jupyter--decode parent-header))
|
||||||
:metadata `(message-part ,metadata nil)
|
:metadata (list 'message-part metadata nil)
|
||||||
:content `(message-part ,content nil)
|
:content (list 'message-part content nil)
|
||||||
:buffers buffers))))
|
:buffers buffers))))
|
||||||
|
|
||||||
;;; Sending/receiving
|
;;; Sending/receiving
|
||||||
|
|
|
@ -66,6 +66,32 @@
|
||||||
(jupyter-request-id req)))
|
(jupyter-request-id req)))
|
||||||
(should (equal (jupyter-message-get (caddr msgs) :execution_state) "idle"))))))
|
(should (equal (jupyter-message-get (caddr msgs) :execution_state) "idle"))))))
|
||||||
|
|
||||||
|
;;;; Comm layer
|
||||||
|
|
||||||
|
(ert-deftest jupyter-comm-layer ()
|
||||||
|
:tags '(mock comm)
|
||||||
|
(let ((comm (jupyter-mock-comm-layer))
|
||||||
|
(obj (make-jupyter-mock-comm-obj)))
|
||||||
|
(jupyter-connect-client comm obj)
|
||||||
|
(should (= (length (oref comm clients)) 1))
|
||||||
|
(should (eq (jupyter-weak-ref-resolve (car (oref comm clients))) obj))
|
||||||
|
(should (= (oref comm alive) 1))
|
||||||
|
(jupyter-connect-client comm obj)
|
||||||
|
(should (= (length (oref comm clients)) 1))
|
||||||
|
(should (eq (jupyter-weak-ref-resolve (car (oref comm clients))) obj))
|
||||||
|
(should (= (oref comm alive) 1))
|
||||||
|
|
||||||
|
(should-not (jupyter-mock-comm-obj-event obj))
|
||||||
|
(jupyter-event-handler comm '(event))
|
||||||
|
;; Events are handled in a timer, not right away
|
||||||
|
(sleep-for 0.01)
|
||||||
|
(should (equal (jupyter-mock-comm-obj-event obj) '(event)))
|
||||||
|
|
||||||
|
(jupyter-disconnect-client comm obj)
|
||||||
|
(should (= (length (oref comm clients)) 0))
|
||||||
|
(should-not (oref comm alive))
|
||||||
|
(jupyter-disconnect-client comm obj)))
|
||||||
|
|
||||||
;;; Callbacks
|
;;; Callbacks
|
||||||
|
|
||||||
(ert-deftest jupyter-wait-until-idle ()
|
(ert-deftest jupyter-wait-until-idle ()
|
||||||
|
@ -524,20 +550,25 @@
|
||||||
;;; Client
|
;;; Client
|
||||||
|
|
||||||
;; TODO: Different values of the session argument
|
;; TODO: Different values of the session argument
|
||||||
|
;; TODO: Update for new `jupyter-channel-ioloop-comm'
|
||||||
(ert-deftest jupyter-initialize-connection ()
|
(ert-deftest jupyter-initialize-connection ()
|
||||||
:tags '(client init)
|
:tags '(client init)
|
||||||
|
(skip-unless nil)
|
||||||
|
;; The default comm is a jupyter-channel-ioloop-comm
|
||||||
(let ((conn-info (jupyter-test-conn-info-plist))
|
(let ((conn-info (jupyter-test-conn-info-plist))
|
||||||
(client (jupyter-kernel-client)))
|
(client (jupyter-kernel-client)))
|
||||||
|
(oset client kcomm (jupyter-sync-channel-comm))
|
||||||
(jupyter-initialize-connection client conn-info)
|
(jupyter-initialize-connection client conn-info)
|
||||||
(with-slots (session channels) client
|
;; kcomm by default is a `jupyter-channel-ioloop-comm'
|
||||||
|
(with-slots (session kcomm) client
|
||||||
(ert-info ("Client session")
|
(ert-info ("Client session")
|
||||||
(should (string= (jupyter-session-key session)
|
(should (string= (jupyter-session-key session)
|
||||||
(plist-get conn-info :key)))
|
(plist-get conn-info :key)))
|
||||||
(should (equal (jupyter-session-conn-info session)
|
(should (equal (jupyter-session-conn-info session)
|
||||||
conn-info)))
|
conn-info)))
|
||||||
(ert-info ("Heartbeat channel initialized")
|
(ert-info ("Heartbeat channel initialized")
|
||||||
(should (eq session (oref (plist-get channels :hb) session)))
|
(should (eq session (oref (oref kcomm hb) session)))
|
||||||
(should (string= (oref (plist-get channels :hb) endpoint)
|
(should (string= (oref (oref kcomm hb) endpoint)
|
||||||
(format "tcp://127.0.0.1:%d"
|
(format "tcp://127.0.0.1:%d"
|
||||||
(plist-get conn-info :hb_port)))))
|
(plist-get conn-info :hb_port)))))
|
||||||
(ert-info ("Shell, iopub, stdin initialized")
|
(ert-info ("Shell, iopub, stdin initialized")
|
||||||
|
@ -566,6 +597,7 @@
|
||||||
(ert-info ("Starting/stopping channels")
|
(ert-info ("Starting/stopping channels")
|
||||||
(let ((conn-info (jupyter-test-conn-info-plist))
|
(let ((conn-info (jupyter-test-conn-info-plist))
|
||||||
(client (jupyter-kernel-client)))
|
(client (jupyter-kernel-client)))
|
||||||
|
(oset client kcomm (jupyter-sync-channel-comm))
|
||||||
(jupyter-initialize-connection client conn-info)
|
(jupyter-initialize-connection client conn-info)
|
||||||
(cl-loop
|
(cl-loop
|
||||||
for channel in '(:hb :shell :iopub :stdin)
|
for channel in '(:hb :shell :iopub :stdin)
|
||||||
|
|
|
@ -30,6 +30,7 @@
|
||||||
(require 'zmq)
|
(require 'zmq)
|
||||||
(require 'jupyter-client)
|
(require 'jupyter-client)
|
||||||
(require 'jupyter-repl)
|
(require 'jupyter-repl)
|
||||||
|
(require 'jupyter-comm-layer)
|
||||||
(require 'jupyter-org-client)
|
(require 'jupyter-org-client)
|
||||||
(require 'jupyter-kernel-manager)
|
(require 'jupyter-kernel-manager)
|
||||||
(require 'cl-lib)
|
(require 'cl-lib)
|
||||||
|
@ -58,11 +59,12 @@ handling a message is always
|
||||||
(cl-defmethod initialize-instance ((client jupyter-echo-client) &rest _slots)
|
(cl-defmethod initialize-instance ((client jupyter-echo-client) &rest _slots)
|
||||||
(cl-call-next-method)
|
(cl-call-next-method)
|
||||||
(oset client messages (make-ring 10))
|
(oset client messages (make-ring 10))
|
||||||
(oset client channels
|
(oset client kcomm (jupyter-channel-ioloop-comm))
|
||||||
(list :hb (jupyter-hb-channel)
|
(with-slots (kcomm) client
|
||||||
:shell (list :alive-p nil :endpoint "foo://bar")
|
(oset kcomm hb (jupyter-hb-channel))
|
||||||
:stdin (list :alive-p nil :endpoint "foo://bar")
|
(oset kcomm stdin (make-jupyter-proxy-channel))
|
||||||
:iopub (list :alive-p nil :endpoint "foo://bar"))))
|
(oset kcomm shell (make-jupyter-proxy-channel))
|
||||||
|
(oset kcomm iopub (make-jupyter-proxy-channel))))
|
||||||
|
|
||||||
(cl-defmethod jupyter-send ((client jupyter-echo-client)
|
(cl-defmethod jupyter-send ((client jupyter-echo-client)
|
||||||
channel
|
channel
|
||||||
|
@ -96,6 +98,29 @@ handling a message is always
|
||||||
(ring-insert+extend (oref client messages) msg 'grow)
|
(ring-insert+extend (oref client messages) msg 'grow)
|
||||||
(cl-call-next-method))
|
(cl-call-next-method))
|
||||||
|
|
||||||
|
;;; `jupyter-mock-comm-layer'
|
||||||
|
|
||||||
|
(defclass jupyter-mock-comm-layer (jupyter-comm-layer)
|
||||||
|
((alive :initform nil)))
|
||||||
|
|
||||||
|
(cl-defmethod jupyter-comm-alive-p ((comm jupyter-mock-comm-layer))
|
||||||
|
(oref comm alive))
|
||||||
|
|
||||||
|
(cl-defmethod jupyter-comm-start ((comm jupyter-mock-comm-layer))
|
||||||
|
(unless (oref comm alive)
|
||||||
|
(oset comm alive 0))
|
||||||
|
(cl-incf (oref comm alive)))
|
||||||
|
|
||||||
|
(cl-defmethod jupyter-comm-stop ((comm jupyter-mock-comm-layer))
|
||||||
|
(cl-decf (oref comm alive))
|
||||||
|
(when (zerop (oref comm alive))
|
||||||
|
(oset comm alive nil)))
|
||||||
|
|
||||||
|
(cl-defstruct jupyter-mock-comm-obj event)
|
||||||
|
|
||||||
|
(cl-defmethod jupyter-event-handler ((obj jupyter-mock-comm-obj) event)
|
||||||
|
(setf (jupyter-mock-comm-obj-event obj) event))
|
||||||
|
|
||||||
;;; Macros
|
;;; Macros
|
||||||
|
|
||||||
(cl-defmacro jupyter-ert-info ((message-form &key ((:prefix prefix-form) "Info: "))
|
(cl-defmacro jupyter-ert-info ((message-form &key ((:prefix prefix-form) "Info: "))
|
||||||
|
|
Loading…
Add table
Reference in a new issue