[WIP] Move all socket communication to a subprocess

With this new implementation, all communication between the kernel and the
client happens in a subprocess. When the client would like to send a message,
the parent emacs process generates the required plist and sends it to the
subprocess for encoding and sending to the kernel. When a message is received,
the subprocess decodes it and prints it to the pipe for the parent emacs
process to read.

This implementation also introduces the use of futures to avoid having to wait
for subprocess output when sending a message to the kernel. Every
`jupyter-request-*` function now returns a primitive future object which is
just a cons cell with the `car` equal to `:jupyter-future`. When the `cdr` of
the future is non-nil, then it is the message ID of the sent request. This acts
as a check to ensure that the message ID is available from the future object,
if the `cdr` is nil the ID is not available, but if the `cdr` is non-nil then
it is the message ID. The convenience function `jupyter-ensure-id` ensures that
the message ID is available and returns the ID.

The future acts as a stand in for the message ID of the encoded request which
will be retrieved from the subprocess once the message has been encoded and
sent to the kernel. This future object is meant to be passed to
`jupyter-add-receive-callback` and other related functions the same way as an
actual message id.
This commit is contained in:
Nathaniel Nicandro 2017-12-17 02:39:16 -06:00
parent 805255e816
commit 1e246ee480
2 changed files with 164 additions and 69 deletions

View file

@ -37,8 +37,8 @@
the fact that the message has been sent. So if there is a
non-nil value for a message ID it means that a message has been
sent and the client is expecting a reply from the kernel.")
(channel-timers
:type (or null (list-of timer))
(ioloop
:type (or null process)
:initform nil
:documentation "The process which polls for events on all
live channels of the client.")
@ -124,12 +124,6 @@ in the jupyter runtime directory."
;;; Lower level sending/receiving
(defun jupyter--channel-readable-p (channel)
(and channel
(/= (logand (zmq-socket-get (oref channel socket) zmq-EVENTS)
zmq-POLLIN)
0)))
(cl-defmethod jupyter--send-encoded ((client jupyter-kernel-client)
channel
type
@ -140,39 +134,127 @@ The message should have a TYPE as found in the jupyter messaging
protocol. Optional variable FLAGS are the flags sent to the
underlying `zmq-send-multipart' call using the CHANNEL's socket."
(declare (indent 1))
(unless (jupyter-channel-alive-p channel)
(error "Channel not alive: %s" (oref channel type)))
(cl-destructuring-bind (msg-id . msg)
(jupyter--encode-message (oref client session) type :content message)
;; TODO: Check for EAGAIN and reschedule the message for sending
(zmq-send-multipart (oref channel socket) msg flags)
;; stdin messages do not expect a reply
(unless (eq (oref channel type) :stdin)
;; indicate that this message is expecting a reply
(puthash msg-id t (oref client message-callbacks)))
msg-id))
(let* ((ioloop (oref client ioloop))
(ring (or (process-get ioloop :jupyter-pending-replies)
(let ((ring (make-ring 10)))
(process-put ioloop :jupyter-pending-replies
ring)
ring)))
(future (cons :jupyter-future nil)))
(zmq-subprocess-send (oref client ioloop)
(list 'send (oref channel type) type message flags))
(ring-insert+extend ring future 'grow)
future))
;; TODO: Maybe instead of decoding the message directly, use `apply-partially'
;; to delay decoding until the message is actually handled registered with
;; `jupyter-add-receive-callback' or in some subclass.
(cl-defmethod jupyter--recv-decoded ((client jupyter-kernel-client) channel &optional flags)
(cl-destructuring-bind (idents . parts)
(jupyter--split-identities
(zmq-recv-multipart (oref channel socket) flags))
(cons idents (jupyter--decode-message (oref client session) parts))))
(defun jupyter--queue-message (client channel)
"Queue a message to be processed for CLIENT's CHANNEL."
(when (jupyter--channel-readable-p channel)
(let* ((ring (oref channel recv-queue)))
;; TODO: How many messages does ZMQ store in its internal buffers before it
;; starts droping messages? And what socket option can be examined to
;; figure this out?
(unless (= (ring-length ring) (ring-size ring))
(let* ((res (jupyter--recv-decoded client channel)))
(ring-insert ring res))
(run-with-timer 0.01 nil #'jupyter--handle-message client channel)))))
(defun jupyter--ioloop (client)
(let ((iopub-channel (oref client iopub-channel))
(shell-channel (oref client shell-channel))
(stdin-channel (oref client stdin-channel))
(control-channel (oref client control-channel)))
`(lambda (ctx)
(require 'jupyter-channels ,(locate-library "jupyter-channels"))
(require 'jupyter-messages ,(locate-library "jupyter-messages"))
;; We can splice the session object because it contains primitive types
(let* ((session ,(oref client session))
(iopub
(let ((sock (jupyter-connect-channel
:iopub ,(oref (oref client iopub-channel) endpoint)
(jupyter-session-id session))))
(zmq-socket-set sock zmq-SUBSCRIBE "")
sock))
(shell
(jupyter-connect-channel
:shell ,(oref (oref client shell-channel) endpoint)
(jupyter-session-id session)))
(stdin
(jupyter-connect-channel
:stdin ,(oref (oref client stdin-channel) endpoint)
(jupyter-session-id session)))
(control
(jupyter-connect-channel
:control ,(oref (oref client control-channel) endpoint)
(jupyter-session-id session)))
;; NOTE: Order matters here since when multiple events arrive for
;; different channels they will be processed in this order.
(channels (list (cons control :control)
(cons stdin :stdin)
(cons shell :shell)
(cons iopub :iopub))))
(with-zmq-poller
;; Also poll for standard-in events to be able to read commands from
;; the parent emacs process without blocking
(zmq-poller-register (current-zmq-poller) 0 zmq-POLLIN)
(mapc (lambda (x) (zmq-poller-register (current-zmq-poller)
(car x)
zmq-POLLIN))
channels)
(while t
;; TODO: Dynamic polling period, if the rate of received events is
;; high, reduce the period. If the rate of received events is low
;; increase it. Sample the rate in a time window that spans
;; multiple polling periods. Polling at 10 ms periods was causing a
;; pretty sizable portion of CPU time to be eaten up.
(let ((events (zmq-poller-wait-all (current-zmq-poller) 5 20)))
(cl-loop
for (sock . event) in events
if (integerp sock) do
(cl-destructuring-bind (cmd . data) (zmq-subprocess-read)
(cl-case cmd
;; (stop-channel
;; (let* ((type data))
;; (cl-destructuring-bind (sock . channel)
;; (cl-find-if (lambda (x) (eq type (oref (cdr x) type))) channels)
;; (zmq-poller-unregister (current-zmq-poller) sock)
;; (jupyter-stop-channel channel))))
;; (start-channel
;; (let* ((elem (assoc data channels))
;; (sock (cadr elem))
;; (endpoint (cddr elem)))
;; (zmq-connect sock endpoint)
;; (zmq-poller-register (current-zmq-poller) sock zmq-POLLIN)))
(send
(cl-destructuring-bind (ctype . rest) data
(zmq-prin1
(cons 'sent
(cons
ctype
(apply #'jupyter--send-encoded session
(car (rassoc ctype channels))
rest))))))))
else do
(zmq-prin1
(cons 'recvd
(cons
(cdr (assoc sock channels))
(jupyter--recv-decoded session sock))))))))))))
(defun jupyter--ioloop-filter (client event)
(cl-destructuring-bind (ctype . data) (cdr event)
(cl-case (car event)
;; data = sent message id
(sent
(let* ((ring (process-get (oref client ioloop) :jupyter-pending-replies))
(future (ring-remove ring)))
(setcdr future data)
(unless (eq ctype :stdin)
;; indicate that this message is expecting a reply
(puthash data t (oref client message-callbacks)))))
;; data = (idents . msg)
(recvd
(let* ((channel (cl-find-if
(lambda (c) (eq (oref c type) ctype))
(mapcar (lambda (x) (eieio-oref client x))
'(stdin-channel
shell-channel
control-channel
iopub-channel))))
(ring (oref channel recv-queue)))
(if (= (ring-length ring) (ring-size ring))
;; Try to process at a later time when the recv-queue is full
(run-with-timer 0.05 nil #'jupyter--ioloop-filter client event)
(ring-insert ring data)
(run-with-timer
0.01 nil #'jupyter--handle-message client channel)))))))
(cl-defmethod jupyter-start-channels ((client jupyter-kernel-client)
&key (shell t)
(iopub t)
@ -193,39 +275,17 @@ In addition to calling `jupyter-start-channel', a subprocess is
created for each channel which monitors the channel's socket for
input events. Note that this polling subprocess is not created
for the heartbeat channel."
(let ((timers (cl-loop
with channel = nil
;; NOTE: The order determines the order in which messages are
;; processed when a message can be read from multiple channels.
for (cname . start) in (list
(cons 'control-channel control)
(cons 'stdin-channel stdin)
(cons 'iopub-channel iopub)
(cons 'shell-channel shell)
(cons 'hb-channel hb))
when start
do (setq channel (eieio-oref client cname))
and unless (jupyter-channel-alive-p channel)
do (jupyter-start-channel
channel :identity (jupyter-session-id
(oref client session)))
and unless (eq (oref channel type) :hb)
collect (run-with-timer 0 0.01 #'jupyter--queue-message client channel))))
(oset client channel-timers timers)))
(oset client ioloop
(zmq-start-process
(jupyter--ioloop client)
(apply-partially #'jupyter--ioloop-filter client))))
(cl-defmethod jupyter-stop-channels ((client jupyter-kernel-client))
"Stop any running channels of CLIENT."
(cl-loop
for channel in (mapcar (lambda (c) (eieio-oref client c))
(list 'shell-channel
'iopub-channel
'hb-channel
'control-channel
'stdin-channel))
when (jupyter-channel-alive-p channel)
do (jupyter-stop-channel channel))
(mapc #'cancel-timer (oref client channel-timers))
(oset client channel-timers nil))
;; TODO: Better cleanup
(delete-process (oref client ioloop))
(kill-buffer (process-buffer (oref client ioloop)))
(oset client ioloop nil))
(cl-defmethod jupyter-channels-running-p ((client jupyter-kernel-client))
"Are any channels of CLIENT alive?"
@ -256,6 +316,15 @@ for the heartbeat channel."
(remhash pmsg-id message-callbacks))
cb))
(defun jupyter-ensure-id (msg-id)
(cond
((stringp msg-id) msg-id)
((and (consp msg-id) (eq (car msg-id) :jupyter-future))
(while (null (cdr msg-id))
(sleep-for 0 1))
(cdr msg-id))
(t (error "Invalid message ID %s" msg-id))))
(defun jupyter-add-receive-callback (client msg-type msg-id function)
"Add FUNCTION to run when receiving a message reply.
@ -277,6 +346,8 @@ from the kernel without any processing done to it."
(let ((mt (plist-get jupyter--received-message-types msg-type)))
(if mt (setq msg-type mt)
(error "Not a valid message type (`%s')" msg-type)))
;; Ensure that the message ID is ready
(setq msg-id (jupyter-ensure-id msg-id))
(let* ((message-callbacks (oref client message-callbacks))
(callbacks (gethash msg-id message-callbacks)))
;; If a message is sent with MSG-ID, then its entry in message-callbacks is

View file

@ -168,6 +168,30 @@ in this plist, an error is thrown.")
:content (jupyter--decode-string content)
:buffers buffers))))
;;; Sending/receiving
(cl-defmethod jupyter--send-encoded ((session jupyter-session)
socket
type
message
&optional flags)
"Encode MESSAGE and send it on CLIENT's CHANNEL.
The message should have a TYPE as found in the jupyter messaging
protocol. Optional variable FLAGS are the flags sent to the
underlying `zmq-send-multipart' call using the CHANNEL's socket."
(declare (indent 1))
(cl-destructuring-bind (msg-id . msg)
(jupyter--encode-message session type :content message)
;; TODO: Check for EAGAIN and reschedule the message for sending
(zmq-send-multipart socket msg flags)
msg-id))
(cl-defmethod jupyter--recv-decoded ((session jupyter-session) socket &optional flags)
(cl-destructuring-bind (idents . parts)
(jupyter--split-identities
(zmq-recv-multipart socket flags))
(cons idents (jupyter--decode-message session parts))))
;;; stdin messages
(cl-defun jupyter-input-reply (&key value)