Update channel implementation

- Distinguish between an asynchronous channel and a synchronous one

  - `jupyter-async-channel` :: Messages are sent and received through an ioloop
                               subprocess that the channel has access to. The
                               subprocess uses the `jupyter-sync-channel` for
                               sending and receiving messages.

  - `jupyter-sync-channel` :: Messages are sent and received through a
                               `zmq-socket` in the current Emacs session.

  - Define `jupyter-send` and `jupyter-recv` methods for channels.

  - Update `jupyter-kernel-client` and ioloop interfaces to take into account
    changes with channels.
This commit is contained in:
Nathaniel Nicandro 2018-02-03 00:02:33 -06:00
parent c2c62329cb
commit 43a59ba74d
3 changed files with 292 additions and 270 deletions

View file

@ -23,37 +23,6 @@
;;; Commentary: ;;; Commentary:
;; TODO: The `jupyter-channel' methods need work. `jupyter-kernel-client'
;; actually only uses a `jupyter-channel' to store received messages in the
;; recv-queue slot, to get the endpoint information for sockets created in a
;; client's ioloop subprocess, and to dispatch to message handlers using
;; `jupyter-handle-message'.
;;
;; The start and stop channel methods actually start and stop a channel's
;; socket in the current Emacs instance. What they should do is start and stop
;; a channel in a client's ioloop subprocess. A client's ioloop is available to
;; a channel since any channels initialized through
;; `jupyter-initialize-connection' have their parent-instance slot (from
;; `jupyter-connection') set to the client. So what can be done in the start
;; and stop methods is to check to see if the parent-instance slot is a
;; `jupyter-kernel-client' and if so, send its ioloop a command using
;; `zmq-subprocess-send'.
;;
;; TODO: `jupyter-channel' classes might not even need to be implemented in
;; reality. You could just as easily implement functions called on a client to
;; implement channels. Then the client can hold the recv-queue for each channel
;; and any channel information. This would even be better because then
;; internally to the client you can distinguish between a blocking client and
;; on that uses the ioloop subprocess. If the ioloop subprocess is nil, then
;; the client is blocking.
;;
;; You can do something like
;;
;; (jupyter-get-message client :iopub)
;;
;; To get a message from the IOPub recv-queue or directly from a `jupyter-recv'
;; call based on if the client is blocking or not.
;;; Code: ;;; Code:
(require 'jupyter-connection) (require 'jupyter-connection)
@ -70,121 +39,139 @@
:type keyword :type keyword
:initarg :type :initarg :type
:documentation "The type of this channel. Should be one of :documentation "The type of this channel. Should be one of
the keys in `jupyter-channel-socket-types', excluding `:hb' the keys in `jupyter-channel-socket-types'.")
which corresponds to the heartbeat channel and is handled
differently than the other channels. See `jupyter-hb-channel'.")
(endpoint (endpoint
:type string :type string
:initarg :endpoint :initarg :endpoint
:documentation "The endpoint this channel is connected to. :documentation "The endpoint this channel is connected to.
Typical endpoints look like \"tcp://127.0.0.1:5555\".") Typical endpoints look like \"tcp://127.0.0.1:5555\"."))
(socket :abstract t)
(defclass jupyter-sync-channel (jupyter-channel)
((socket
:type (or null zmq-socket) :type (or null zmq-socket)
:initform nil :initform nil
:documentation "The socket this channel uses to communicate :documentation "The socket used for communicating with the kernel.")))
with the kernel.")
(defclass jupyter-async-channel (jupyter-channel)
((ioloop
:type (or null process)
:initform nil
:documentation "The process responsible for sending and
receiving messages on this channel.")
(recv-queue (recv-queue
:type ring :type ring
:initform (make-ring 10) :initform (make-ring 10))
:documentation "A queue of messages received on this channel (status
that are waiting to be processed.")) :type symbol
:abstract t :initform 'stopped)))
:documentation "A base class for channels used by `jupyter'.")
(defclass jupyter-iopub-channel (jupyter-channel) (cl-defgeneric jupyter-start-channel ((channel jupyter-channel) &key identity)
((type :initform :iopub)) "Start a Jupyter CHANNEL using IDENTITY as the routing ID.")
:documentation "A base class for iopub channels.")
(defclass jupyter-stdin-channel (jupyter-channel) (cl-defmethod jupyter-start-channel ((channel jupyter-async-channel) &key identity)
((type :initform :stdin)) ;; TODO: In an IOLoop actually start the channel by sending it the endpoint
:documentation "A base class for stdin channels.") ;; and identity. Currently the IOLoop is assumed to have this information.
;;
(defclass jupyter-shell-channel (jupyter-channel) ;; TODO: Define a mechanism to attach a callback for each type of command in
((type :initform :shell)) ;; an IOLoop so that the IOLoop filter is not responsible for setting the
:documentation "A base class for shell channels.") ;; status slot of a channel. Look how python implements event loops.
(defclass jupyter-control-channel (jupyter-channel)
((type :initform :control))
:documentation "A base class for control channels.")
(cl-defmethod jupyter-start-channel ((channel jupyter-channel) &key identity)
"Start a CHANNEL.
If IDENTITY is non-nil, it is used as the ROUTING_ID of the
underlying channel's socket."
(unless (jupyter-channel-alive-p channel) (unless (jupyter-channel-alive-p channel)
(let ((sock (jupyter-connect-channel (zmq-subprocess-send (oref channel ioloop)
(list 'start-channel (oref channel type)))))
(cl-defmethod jupyter-start-channel ((channel jupyter-sync-channel) &key identity)
(unless (jupyter-channel-alive-p channel)
(let ((socket (jupyter-connect-channel
(oref channel type) (oref channel endpoint) identity))) (oref channel type) (oref channel endpoint) identity)))
(oset channel socket sock)))) (oset channel socket socket)
(cl-case (oref channel type)
(:iopub
(zmq-socket-set socket zmq-SUBSCRIBE ""))))))
(cl-defmethod jupyter-start-channel ((channel jupyter-iopub-channel) &key _identity) (cl-defgeneric jupyter-stop-channel ((channel jupyter-channel))
"Start an iopub CHANNEL subscribed to all messages. "Stop a Jupyter CHANNEL.")
If IDENTITY is non-nil, it is used as the ROUTING_ID of the
underlying channel's socket."
(when (cl-call-next-method)
(zmq-socket-set (oref channel socket) zmq-SUBSCRIBE "")))
(cl-defmethod jupyter-stop-channel ((channel jupyter-channel)) (cl-defmethod jupyter-stop-channel ((channel jupyter-sync-channel))
"Stop a CHANNEL.
The underlying socket's LINGER property is set to 0, the socket
is closed, the channel's socket property is set to nil, and any
pending messages in the channels recv-queue are removed. Note
that `jupyter-channel-alive-p' on the CHANNEL will return nil
after a call to this function."
(when (jupyter-channel-alive-p channel) (when (jupyter-channel-alive-p channel)
(let ((sock (oref channel socket))) (condition-case nil
(zmq-socket-set sock zmq-LINGER 0) (zmq-close (oref channel socket))
(zmq-close sock) (zmq-ENOENT nil))
(cl-loop (oset channel socket nil)))
with ring = (oref channel recv-queue)
repeat (ring-length ring) do (ring-remove ring))
(oset channel socket nil))))
(cl-defmethod jupyter-channel-alive-p ((channel jupyter-channel)) (cl-defmethod jupyter-stop-channel ((channel jupyter-async-channel))
"Return non-nil if CHANNEL is alive. (when (jupyter-channel-alive-p channel)
A channel is alive if its socket property is bound to a (zmq-subprocess-send (oref channel ioloop)
`zmq-socket'." (list 'stop-channel (oref channel type)))))
(and (slot-boundp channel 'socket)
(not (null (oref channel socket)))))
(cl-defmethod jupyter-queue-message ((channel jupyter-channel) msg) (cl-defgeneric jupyter-get-message ((channel jupyter-channel) &rest _args)
"Add a message to a CHANNEL's recieve queue. "Receive a message on CHANNEL.")
(cl-defmethod jupyter-get-message ((channel jupyter-sync-channel))
"Block until a message is received on CHANNEL.
Return the received message."
(cl-destructuring-bind (_idents . msg)
(jupyter-recv channel)
msg))
(cl-defmethod jupyter-get-message ((channel jupyter-async-channel) &optional timeout)
"Get a message from CHANNEL's recv-queue.
If no message is available, return nil. Otherwise return the
oldest message in CHANNEL's recv-queue. If TIMEOUT is non-nil,
wait until TIMEOUT for a message."
(let ((idents-msg (jupyter-recv channel timeout)))
(when idents-msg
(cl-destructuring-bind (_idents . msg)
idents-msg
msg))))
(cl-defmethod jupyter-send ((channel jupyter-async-channel) type message)
(zmq-subprocess-send (oref channel ioloop)
(list 'send (oref channel type) type message)))
(cl-defmethod jupyter-send ((channel jupyter-sync-channel) type message)
(jupyter-send (oref channel session) (oref channel socket) type message))
(cl-defmethod jupyter-recv ((channel jupyter-sync-channel))
(jupyter-recv (oref channel session) (oref channel socket)))
(cl-defmethod jupyter-recv ((channel jupyter-async-channel) &optional timeout)
(let ((ring (oref channel recv-queue)))
(when timeout
(with-timeout (timeout
(error "Message not received on channel within timeout"))
(while (ring-empty-p ring)
(sleep-for 0.01))))
(unless (ring-empty-p ring)
(ring-remove ring))))
(cl-defgeneric jupyter-queue-message ((channel jupyter-async-channel) msg)
"Queue MSG in CHANNEL's recv-queue.
MSG is a cons pair (IDENTS . MSG) which will be added to the MSG is a cons pair (IDENTS . MSG) which will be added to the
recv-queue slot of CHANNEL. To receive a message from the channel recv-queue slot of CHANNEL. To receive a message from the channel
call `jupyter-get-message'." call `jupyter-get-message'.")
(cl-defmethod jupyter-queue-message ((channel jupyter-async-channel) msg)
"Queue MSG in CHANNEL's recv-queue."
(let ((ring (oref channel recv-queue))) (let ((ring (oref channel recv-queue)))
(ring-insert+extend ring msg 'grow))) (ring-insert+extend ring msg 'grow)))
(cl-defmethod jupyter-get-message ((channel jupyter-channel)) (cl-defgeneric jupyter-channel-alive-p ((channel jupyter-channel))
"Get a message from CHANNEL's recv-queue. "Determine if a CHANNEL is alive.")
If messages are available in a channel's recv-queue, return the
oldest message. Otherwise if no messages are available, return
nil."
(when (jupyter-messages-available-p channel)
(cl-destructuring-bind (_idents . msg)
(ring-remove (oref channel recv-queue))
msg)))
(cl-defmethod jupyter-messages-available-p ((channel jupyter-channel)) (cl-defmethod jupyter-channel-alive-p ((channel jupyter-sync-channel))
"Determine if CHANNEL has an messages available. (not (null (oref channel socket))))
A CHANNEL has messages available if its recv-queue is not empty."
(not (ring-empty-p (oref channel recv-queue)))) (cl-defmethod jupyter-channel-alive-p ((channel jupyter-async-channel))
(and (oref channel ioloop) (not (eq (oref channel status) 'stopped))))
;;; Heartbeat channel ;;; Heartbeat channel
(defclass jupyter-hb-channel (jupyter-connection) (defclass jupyter-hb-channel (jupyter-sync-channel)
((type ((type
:type keyword :type keyword
:initform :hb :initform :hb
:documentation "The type of this channel is `:hb'.") :documentation "The type of this channel is `:hb'.")
(endpoint
:type string
:initarg :endpoint
:documentation "The endpoint this channel is connected to.
Typical endpoints look like \"tcp://127.0.0.1:5555\".")
(socket
:type (or null zmq-socket)
:initform nil
:documentation "The socket used for communicating with the kernel.")
(time-to-dead (time-to-dead
:type integer :type integer
:initform 1 :initform 1
@ -252,6 +239,9 @@ channel, starts the timer."
(unless (jupyter-channel-alive-p channel) (unless (jupyter-channel-alive-p channel)
(oset channel socket (jupyter-connect-channel (oset channel socket (jupyter-connect-channel
:hb (oref channel endpoint) identity)) :hb (oref channel endpoint) identity))
;; TODO: Do something when the kernel is for sure dead, i.e. when a message
;; has not been received for a certain number of time-to-dead periods. For
;; example run a hook and pause the channel.
(oset channel timer (oset channel timer
(run-with-timer (run-with-timer
0 (oref channel time-to-dead) 0 (oref channel time-to-dead)

View file

@ -51,6 +51,20 @@ would like to inhibit handlers for any new requests. If this is
set to t globally, all new requests will have message handlers set to t globally, all new requests will have message handlers
inhibited.") inhibited.")
;; Define channel classes for method dispatching based on the channel type
(defclass jupyter-shell-channel (jupyter-async-channel)
((type
:initform :shell)))
(defclass jupyter-iopub-channel (jupyter-async-channel)
((type
:initform :iopub)))
(defclass jupyter-stdin-channel (jupyter-async-channel)
((type
:initform :stdin)))
(defclass jupyter-kernel-client (jupyter-connection) (defclass jupyter-kernel-client (jupyter-connection)
((requests ((requests
:type hash-table :type hash-table
@ -96,16 +110,16 @@ buffer.")
:initform nil :initform nil
:initarg :iopub-channel :initarg :iopub-channel
:documentation "The IOPub channel.") :documentation "The IOPub channel.")
(hb-channel
:type (or null jupyter-hb-channel)
:initform nil
:initarg :hb-channel
:documentation "The heartbeat channel.")
(stdin-channel (stdin-channel
:type (or null jupyter-stdin-channel) :type (or null jupyter-stdin-channel)
:initform nil :initform nil
:initarg :stdin-channel :initarg :stdin-channel
:documentation "The stdin channel."))) :documentation "The stdin channel.")
(hb-channel
:type (or null jupyter-hb-channel)
:initform nil
:initarg :hb-channel
:documentation "The heartbeat channel.")))
(cl-defmethod initialize-instance ((client jupyter-kernel-client) &rest _slots) (cl-defmethod initialize-instance ((client jupyter-kernel-client) &rest _slots)
(cl-call-next-method) (cl-call-next-method)
@ -158,19 +172,28 @@ connection is terminated before initializing."
(unless (and (ignore-errors (oref client session)) (unless (and (ignore-errors (oref client session))
(equal (jupyter-session-key (oref client session)) key)) (equal (jupyter-session-key (oref client session)) key))
(oset client session (jupyter-session :key key))) (oset client session (jupyter-session :key key)))
(let ((addr (lambda (port) (format "%s://%s:%d" transport ip port))))
(oset client hb-channel (make-instance
'jupyter-hb-channel
:parent-instance client
:endpoint (funcall addr hb_port)))
(cl-loop (cl-loop
with addr = (concat transport "://" ip)
for (channel . port) in `((stdin-channel . ,stdin_port) for (channel . port) in `((stdin-channel . ,stdin_port)
(shell-channel . ,shell_port) (shell-channel . ,shell_port)
(hb-channel . ,hb_port)
(iopub-channel . ,iopub_port)) (iopub-channel . ,iopub_port))
for class = (intern (concat "jupyter-" (symbol-name channel)))
do (setf (slot-value client channel) do (setf (slot-value client channel)
(make-instance (make-instance
class (cl-case channel
(stdin-channel 'jupyter-stdin-channel)
(shell-channel 'jupyter-shell-channel)
(iopub-channel 'jupyter-iopub-channel)
(otherwise (error "Wrong channel type")))
;; So channels have access to the client's session ;; So channels have access to the client's session
;;
;; See `jupyter-start-channels' for when the :ioloop slot is
;; set
:parent-instance client :parent-instance client
:endpoint (format "%s:%d" addr port))))))) :endpoint (funcall addr port))))))))
;;; Client local variables ;;; Client local variables
@ -227,8 +250,7 @@ this is called."
(cl-defmethod jupyter-send ((client jupyter-kernel-client) (cl-defmethod jupyter-send ((client jupyter-kernel-client)
channel channel
type type
message message)
&optional flags)
"Send a message on CLIENT's CHANNEL. "Send a message on CLIENT's CHANNEL.
Return a `jupyter-request' representing the sent message. CHANNEL Return a `jupyter-request' representing the sent message. CHANNEL
is one of the channel's of CLIENT. TYPE is one of the values in is one of the channel's of CLIENT. TYPE is one of the values in
@ -243,8 +265,7 @@ sent message, see `jupyter-add-callback' and
(signal 'wrong-type-argument (list 'process ioloop 'ioloop))) (signal 'wrong-type-argument (list 'process ioloop 'ioloop)))
(when jupyter--debug (when jupyter--debug
(message "SENDING: %s %s" type message)) (message "SENDING: %s %s" type message))
(zmq-subprocess-send (oref client ioloop) (jupyter-send channel type message)
(list 'send (oref channel type) type message flags))
;; Anything sent to stdin is a reply not a request so don't add it to ;; Anything sent to stdin is a reply not a request so don't add it to
;; `:pending-requests'. ;; `:pending-requests'.
(unless (eq (oref channel type) :stdin) (unless (eq (oref channel type) :stdin)
@ -256,7 +277,7 @@ sent message, see `jupyter-add-callback' and
;;; Channel subprocess (receiving messages) ;;; Channel subprocess (receiving messages)
(defmacro jupyter--ioloop-do-command (session poller channels) (defmacro jupyter--ioloop-do-command (poller channels)
"Read and execute a command from stdin. "Read and execute a command from stdin.
SESSION is a variable bound to a `jupyter-session' object, POLLER SESSION is a variable bound to a `jupyter-session' object, POLLER
is a variable bound to a `zmq-poller' object. and CHANNELS is a is a variable bound to a `zmq-poller' object. and CHANNELS is a
@ -295,20 +316,21 @@ Any other command sent to the subprocess will be ignored."
(cl-case cmd (cl-case cmd
(send (send
(cl-destructuring-bind (ctype . args) args (cl-destructuring-bind (ctype . args) args
(let ((sock (car (rassoc ctype ,channels)))) (let ((channel (cdr (assoc ctype ,channels))))
(zmq-prin1 (zmq-prin1 (list 'sent ctype (apply #'jupyter-send channel args))))))
(list 'sent ctype (apply #'jupyter-send ,session sock args))))))
(start-channel (start-channel
(let ((sock (car (rassoc args ,channels)))) (cl-destructuring-bind (ctype) args
(zmq-connect sock (zmq-socket-get sock zmq-LAST-ENDPOINT)) (let ((channel (cdr (assoc ctype ,channels))))
(zmq-poller-register ,poller sock zmq-POLLIN))) (jupyter-start-channel
channel :identity (jupyter-session-id (oref channel session)))
(zmq-poller-register ,poller (oref channel socket) zmq-POLLIN)
(zmq-prin1 (list 'start-channel ctype)))))
(stop-channel (stop-channel
(let ((sock (car (rassoc args ,channels)))) (cl-destructuring-bind (ctype) args
(zmq-poller-unregister ,poller sock) (let ((channel (cdr (assoc ctype ,channels))))
(condition-case err (zmq-poller-unregister ,poller (oref channel socket))
(zmq-disconnect (jupyter-stop-channel channel)
sock (zmq-socket-get sock zmq-LAST-ENDPOINT)) (zmq-prin1 (list 'stop-channel ctype)))))
(zmq-ENOENT nil))))
(quit (quit
(signal 'quit nil)) (signal 'quit nil))
(otherwise (error "Unhandled command (%s)" cmd))))) (otherwise (error "Unhandled command (%s)" cmd)))))
@ -358,51 +380,6 @@ message that has a channel type with the lower priority."
(if (eq head tail) (setq ,messages (cons elem head)) (if (eq head tail) (setq ,messages (cons elem head))
(setcdr head (cons elem tail))))))) (setcdr head (cons elem tail)))))))
(defmacro jupyter--ioloop-collect-messages
(session poller channels messages priorities timeout)
"Collect messages from kernel.
SESSION, POLLER, CHANNELS, MESSAGES, PRIORITIES, and TIMEOUT
should all be variable names bound to objects with the following
meanings:
SESSION - A `jupyter-session'
POLLER - A `zmq-poller'
CHANNELS - An alist of (SOCK . CTYPE) pairs where sock is a
`zmq-socket' representing a `jupyter-channel' with
type CTYPE.
MESSAGES - A variable in which to store the collected list of
messages during this polling period. If the variable
is already bound to a list, new messages added to it
will be sorted based on the `:date' field of the
Jupyter message. If two messages have the same
`:date', e.g. the fractional seconds resolution is not
high enough, also take into account PRIORITIES.
PRIORITIES - An alist of (CTYPE . PRIORITY) pairs where CTYPE is
a `jupyter-channel' type with PRIORITY, a number. If
one channel has a higher priority than another and
two messages, one from each channel, have the same
`:date' field, the message with the higher channel
priority will have its message come before the
message whose channel has a lower priority in the
sorted order."
`(let ((events (condition-case nil
(zmq-poller-wait-all ,poller (length ,channels) ,timeout)
((zmq-EAGAIN zmq-EINTR zmq-ETIMEDOUT) nil))))
(when (alist-get 0 events)
;; Got input from stdin, do the command it
;; specifies
(setf (alist-get 0 events nil 'remove) nil)
(jupyter--ioloop-do-command ,session ,poller ,channels))
(dolist (sock (mapcar #'car events))
(jupyter--ioloop-queue-message ,messages ,priorities
(cons (alist-get sock channels)
(jupyter-recv ,session sock))))
events))
;; TODO: Make this more debuggable, I've spent hours wondering why I wasn't ;; TODO: Make this more debuggable, I've spent hours wondering why I wasn't
;; receiving messages only to find out (caar elem) should have been (car elem) ;; receiving messages only to find out (caar elem) should have been (car elem)
;; in `jupyter--ioloop-queue-message'. For some reason the `condition-case' in ;; in `jupyter--ioloop-queue-message'. For some reason the `condition-case' in
@ -415,9 +392,8 @@ PRIORITIES - An alist of (CTYPE . PRIORITY) pairs where CTYPE is
;; still alive, then exit the subprocess if the parent process is dead. ;; still alive, then exit the subprocess if the parent process is dead.
(defun jupyter--ioloop (client) (defun jupyter--ioloop (client)
"Return the function used for communicating with CLIENT's kernel." "Return the function used for communicating with CLIENT's kernel."
(let* ((session (oref client session)) (let* ((sid (jupyter-session-id (oref client session)))
(sid (jupyter-session-id session)) (skey (jupyter-session-key (oref client session)))
(skey (jupyter-session-key session))
(iopub-ep (oref (oref client iopub-channel) endpoint)) (iopub-ep (oref (oref client iopub-channel) endpoint))
(shell-ep (oref (oref client shell-channel) endpoint)) (shell-ep (oref (oref client shell-channel) endpoint))
(stdin-ep (oref (oref client stdin-channel) endpoint))) (stdin-ep (oref (oref client stdin-channel) endpoint)))
@ -426,40 +402,74 @@ PRIORITIES - An alist of (CTYPE . PRIORITY) pairs where CTYPE is
(require 'jupyter-channels) (require 'jupyter-channels)
(require 'jupyter-messages) (require 'jupyter-messages)
(let* ((session (jupyter-session :id ,sid :key ,skey)) (let* ((session (jupyter-session :id ,sid :key ,skey))
(iopub (jupyter-connect-channel :iopub ,iopub-ep ,sid)) (iopub (jupyter-sync-channel
(shell (jupyter-connect-channel :shell ,shell-ep ,sid)) :type :iopub
(stdin (jupyter-connect-channel :stdin ,stdin-ep ,sid)) :session session
:endpoint ,iopub-ep))
(shell (jupyter-sync-channel
:type :shell
:session session
:endpoint ,shell-ep))
(stdin (jupyter-sync-channel
:type :stdin
:session session
:endpoint ,stdin-ep))
(priorities '((:shell . 4) (priorities '((:shell . 4)
(:iopub . 2) (:iopub . 2)
(:stdin . 2))) (:stdin . 2)))
(channels `((,stdin . :stdin) (channels `((:stdin . ,stdin)
(,shell . :shell) (:shell . ,shell)
(,iopub . :iopub))) (:iopub . ,iopub)))
(idle-count 0) (idle-count 0)
(timeout 20) (timeout 20)
(messages nil)) (messages nil))
(zmq-socket-set iopub zmq-SUBSCRIBE "")
(condition-case nil (condition-case nil
(with-zmq-poller poller (with-zmq-poller poller
;; Poll for stdin messages ;; Poll for stdin messages
(zmq-poller-register poller 0 zmq-POLLIN) (zmq-poller-register poller 0 zmq-POLLIN)
(mapc (lambda (x) (zmq-poller-register poller (car x) zmq-POLLIN))
channels)
(while t (while t
(if (jupyter--ioloop-collect-messages (let ((events
session poller channels messages priorities timeout) (condition-case nil
(zmq-poller-wait-all poller (1+ (length channels)) timeout)
((zmq-EAGAIN zmq-EINTR zmq-ETIMEDOUT) nil))))
;; Perform a command from stdin
(when (alist-get 0 events)
(setf (alist-get 0 events nil 'remove) nil)
(jupyter--ioloop-do-command poller channels))
;; Queue received messages
(dolist (sock (mapcar #'car events))
(let ((channel
(cdr (cl-find-if
(lambda (c) (eq (oref (cdr c) socket) sock))
channels))))
(jupyter--ioloop-queue-message messages priorities
(cons (oref channel type) (jupyter-recv channel)))))
;; Possibly send queued messages to parent process
(if events
;; When messages have been received, reset idle counter
;; and shorten polling timeout
(setq idle-count 0 timeout 20) (setq idle-count 0 timeout 20)
(setq idle-count (1+ idle-count)) (setq idle-count (1+ idle-count))
;; Lengthen timeout so as to not waste CPU cycles ;; When no messages have been received during this polling
;; period
(when (= idle-count 100) (when (= idle-count 100)
;; If no messages have been received for 100 polling
;; periods, lengthen timeout so as to not waste CPU
;; cycles
(setq timeout 100)) (setq timeout 100))
;; Send queued messages.
;;
;; Pool at least some messages, but not at the cost of ;; Pool at least some messages, but not at the cost of
;; responsiveness. If messages are being blasted at us by the ;; responsiveness. If messages are being blasted at us by
;; kernel ensure that they still get through and not pooled ;; the kernel ensure that they still get through and not
;; indefinately. ;; pooled indefinately.
;;
;; TODO: Drop messages if they are comming too frequently
;; to the point where the parent Emacs process would be
;; spending too much time handling messages.
(when (or (= idle-count 5) (> (length messages) 10)) (when (or (= idle-count 5) (> (length messages) 10))
(while messages (while messages
(zmq-prin1 (cons 'recvd (pop messages)))))))) (zmq-prin1 (cons 'recvd (pop messages)))))))))
(quit (quit
(mapc (lambda (x) (mapc (lambda (x)
(zmq-socket-set (car x) zmq-LINGER 0) (zmq-socket-set (car x) zmq-LINGER 0)
@ -539,6 +549,24 @@ by `jupyter--ioloop'."
(if (not channel) (warn "No handler for channel type (%s)" ctype) (if (not channel) (warn "No handler for channel type (%s)" ctype)
(jupyter-queue-message channel (cons idents msg)) (jupyter-queue-message channel (cons idents msg))
(run-with-timer 0.0001 nil #'jupyter-handle-message client channel)))) (run-with-timer 0.0001 nil #'jupyter-handle-message client channel))))
(`(start-channel ,ctype)
(let ((channel (cl-loop
for c in '(stdin-channel
shell-channel
iopub-channel)
for channel = (slot-value client c)
when (eq (oref channel type) ctype)
return channel)))
(oset channel status 'running)))
(`(stop-channel ,ctype)
(let ((channel (cl-loop
for c in '(stdin-channel
shell-channel
iopub-channel)
for channel = (slot-value client c)
when (eq (oref channel type) ctype)
return channel)))
(oset channel status 'stopped)))
('(quit) ('(quit)
;; Cleanup handled in sentinel ;; Cleanup handled in sentinel
(when jupyter--debug (when jupyter--debug
@ -546,6 +574,17 @@ by `jupyter--ioloop'."
;;; Starting the channel subprocess ;;; Starting the channel subprocess
(defun jupyter--start-ioloop (client)
(unless (oref client ioloop)
(oset client ioloop
(zmq-start-process
(jupyter--ioloop client)
(apply-partially #'jupyter--ioloop-filter client)
(apply-partially #'jupyter--ioloop-sentinel client)
(oref client -buffer)))
;; Allow the subprocess to start
(sleep-for 0.1)))
(cl-defmethod jupyter-start-channels ((client jupyter-kernel-client) (cl-defmethod jupyter-start-channels ((client jupyter-kernel-client)
&key (shell t) &key (shell t)
(iopub t) (iopub t)
@ -565,31 +604,26 @@ In addition to calling `jupyter-start-channel', a subprocess is
created for each channel which monitors the channel's socket for created for each channel which monitors the channel's socket for
input events. Note that this polling subprocess is not created input events. Note that this polling subprocess is not created
for the heartbeat channel." for the heartbeat channel."
(unless (oref client ioloop) (jupyter--start-ioloop client)
;; TODO: Currently there is no way to stop/start a channel individually (when hb
;; outside of this method. Create channel methods which are aware of a (jupyter-start-channel (oref client hb-channel)))
;; client's ioloop so that you can send commands to the ioloop to start and (cl-loop
;; stop a channel. Also figure out a way to block until the ioloop says it for (sym . start) in `((shell-channel . ,shell)
;; has finished with the operation. This may need changes in (iopub-channel . ,iopub)
;; `jupyter--ioloop' (stdin-channel . ,stdin))
(let ((ioloop (zmq-start-process for channel = (slot-value client sym)
(jupyter--ioloop client) do (oset channel ioloop (oref client ioloop))
(apply-partially #'jupyter--ioloop-filter client) and if start do (jupyter-start-channel channel)
(apply-partially #'jupyter--ioloop-sentinel client) (with-timeout (0.5 (error "Channel not started in ioloop subprocess"))
(oref client -buffer)))) (while (not (jupyter-channel-alive-p channel))
(oset client ioloop ioloop) (accept-process-output (oref client ioloop) 0.1)))))
(when hb (jupyter-start-channel (oref client hb-channel)))
(unless shell
(zmq-subprocess-send ioloop '(stop-channel :shell)))
(unless iopub
(zmq-subprocess-send ioloop '(stop-channel :iopub)))
(unless stdin
(zmq-subprocess-send ioloop '(stop-channel :stdin))))))
(cl-defmethod jupyter-stop-channels ((client jupyter-kernel-client)) (cl-defmethod jupyter-stop-channels ((client jupyter-kernel-client))
"Stop any running channels of CLIENT." "Stop any running channels of CLIENT."
(when (oref client hb-channel) (cl-loop
(jupyter-stop-channel (oref client hb-channel))) for sym in '(hb-channel shell-channel iopub-channel stdin-channel)
for channel = (slot-value client sym)
when channel do (jupyter-stop-channel channel))
(let ((ioloop (oref client ioloop))) (let ((ioloop (oref client ioloop)))
(when ioloop (when ioloop
(zmq-subprocess-send ioloop (cons 'quit nil)) (zmq-subprocess-send ioloop (cons 'quit nil))
@ -601,12 +635,9 @@ for the heartbeat channel."
(cl-defmethod jupyter-channels-running-p ((client jupyter-kernel-client)) (cl-defmethod jupyter-channels-running-p ((client jupyter-kernel-client))
"Are any channels of CLIENT running?" "Are any channels of CLIENT running?"
(cl-loop (cl-loop
for channel in '(shell-channel for sym in '(hb-channel shell-channel iopub-channel stdin-channel)
iopub-channel for channel = (slot-value client sym)
hb-channel thereis (jupyter-channel-alive-p channel)))
stdin-channel)
;; FIXME: This does not work with the current implementation of channels
thereis (jupyter-channel-alive-p (slot-value client channel))))
;;; Message callbacks ;;; Message callbacks
@ -765,9 +796,9 @@ are taken:
`jupyter-handle-execute-result', `jupyter-handle-execute-result',
`jupyter-handle-kernel-info-reply', ... `jupyter-handle-kernel-info-reply', ...
- Remove request from client request table when idle message is received" - Remove request from client request table when idle message is received"
(when (jupyter-messages-available-p channel) (let ((msg (jupyter-get-message channel)))
(let* ((msg (jupyter-get-message channel)) (when msg
(pmsg-id (jupyter-message-parent-id msg)) (let* ((pmsg-id (jupyter-message-parent-id msg))
(requests (oref client requests)) (requests (oref client requests))
(req (gethash pmsg-id requests))) (req (gethash pmsg-id requests)))
(if (not req) (if (not req)
@ -781,7 +812,7 @@ are taken:
(jupyter-handle-message channel client req msg)) (jupyter-handle-message channel client req msg))
(when (jupyter-message-status-idle-p msg) (when (jupyter-message-status-idle-p msg)
(setf (jupyter-request-idle-received-p req) t)) (setf (jupyter-request-idle-received-p req) t))
(jupyter--drop-idle-requests client))))))) (jupyter--drop-idle-requests client))))))))
;;; STDIN handlers ;;; STDIN handlers

View file

@ -55,7 +55,7 @@ the kernel is alive.")
:documentation "The local kernel process when the kernel is :documentation "The local kernel process when the kernel is
alive.") alive.")
(control-channel (control-channel
:type (or null jupyter-control-channel) :type (or null jupyter-sync-channel)
:initform nil :initform nil
:documentation "A control channel to make shutdown and :documentation "A control channel to make shutdown and
interrupt requests to the kernel.") interrupt requests to the kernel.")
@ -257,7 +257,8 @@ kernel. Starting a kernel involves the following steps:
channel :identity (jupyter-session-id (oref manager session)))) channel :identity (jupyter-session-id (oref manager session))))
(let ((conn-info (oref manager conn-info))) (let ((conn-info (oref manager conn-info)))
(oset manager control-channel (oset manager control-channel
(jupyter-control-channel (jupyter-sync-channel
:type :control
:endpoint (format "%s://%s:%d" :endpoint (format "%s://%s:%d"
(plist-get conn-info :transport) (plist-get conn-info :transport)
(plist-get conn-info :ip) (plist-get conn-info :ip)