Avoid delays during message processing

- Allow specifying a msg-id before a call to `jupyter-send`

  - This avoid sending a message to the browser displaying the widgets on every
    message send to the kernel. The previous implementation generated a new ID
    without allowing the caller to pass one in.

- Simplify message polling by sending received messages from the kernel to the
  parent Emacs process at the moment of arrival.
This commit is contained in:
Nathaniel Nicandro 2018-05-25 02:07:47 -05:00
parent 276e2a0668
commit 554e519bf0
3 changed files with 41 additions and 52 deletions

View file

@ -136,12 +136,12 @@ wait until TIMEOUT for a message."
idents-msg
msg))))
(cl-defmethod jupyter-send ((channel jupyter-async-channel) type message)
(cl-defmethod jupyter-send ((channel jupyter-async-channel) type message &optional msg-id)
(zmq-subprocess-send (oref channel ioloop)
(list 'send (oref channel type) type message)))
(list 'send (oref channel type) type message msg-id)))
(cl-defmethod jupyter-send ((channel jupyter-sync-channel) type message)
(jupyter-send (oref channel session) (oref channel socket) type message))
(cl-defmethod jupyter-send ((channel jupyter-sync-channel) type message &optional msg-id)
(jupyter-send (oref channel session) (oref channel socket) type message msg-id))
(cl-defmethod jupyter-recv ((channel jupyter-sync-channel))
(jupyter-recv (oref channel session) (oref channel socket)))

View file

@ -289,7 +289,8 @@ this is called."
(cl-defmethod jupyter-send ((client jupyter-kernel-client)
channel
type
message)
message
&optional msg-id)
"Send a message on CLIENT's CHANNEL.
Return a `jupyter-request' representing the sent message. CHANNEL
is one of the channel's of CLIENT. TYPE is one of the
@ -309,14 +310,16 @@ response to the sent message, see `jupyter-add-callback' and
do (error "Not a valid message type (`%s')" msg-type)))
(when jupyter--debug
(message "SENDING: %s %s" type message))
(jupyter-send channel type message)
;; Anything sent to stdin is a reply not a request so don't add it to
;; `:pending-requests'.
(unless (eq (oref channel type) :stdin)
(let ((req (make-jupyter-request
:inhibited-handlers jupyter-inhibit-handlers)))
(jupyter--ioloop-push-request client req)
req))))
(let ((msg-id (or msg-id (jupyter-new-uuid))))
(jupyter-send channel type message msg-id)
;; Anything sent to stdin is a reply not a request so don't add it to
;; `:pending-requests'.
(unless (eq (oref channel type) :stdin)
(let ((req (make-jupyter-request
:-id msg-id
:inhibited-handlers jupyter-inhibit-handlers)))
(jupyter--ioloop-push-request client req)
req)))))
;;; Channel subprocess (receiving messages)
@ -406,8 +409,6 @@ Any other command sent to the subprocess will be ignored."
(channels `((:stdin . ,stdin)
(:shell . ,shell)
(:iopub . ,iopub)))
(idle-count 0)
(timeout 20)
(messages nil))
(condition-case nil
(let ((poller (zmq-poller)))
@ -417,7 +418,7 @@ Any other command sent to the subprocess will be ignored."
(while t
(let ((events
(condition-case nil
(zmq-poller-wait-all poller (1+ (length channels)) timeout)
(zmq-poller-wait-all poller (1+ (length channels)) -1)
((zmq-EAGAIN zmq-EINTR zmq-ETIMEDOUT) nil))))
;; Perform a command from stdin
(when (alist-get 0 events)
@ -428,34 +429,12 @@ Any other command sent to the subprocess will be ignored."
(cl-destructuring-bind (type . channel) type-channel
(when (zmq-assoc (oref channel socket) events)
(push (cons type (jupyter-recv channel)) messages))))
(if events
;; When messages have been received, reset idle counter
;; and shorten polling timeout
(setq idle-count 0 timeout 20)
(setq idle-count (1+ idle-count))
(when (= idle-count 100)
;; If no messages have been received for 100 polling
;; periods, lengthen timeout so as to not waste CPU
;; cycles
(setq timeout 5000))
;; Send queued messages.
;;
;; Pool at least some messages, but not at the cost of
;; responsiveness. If messages are being blasted at us by
;; the kernel ensure that they still get through and not
;; pooled indefinately.
;;
;; TODO: Drop messages if they are comming too frequently
;; to the point where the parent Emacs process would be
;; spending too much time handling messages. Or better
;; yet, reduce the rate at which messages are being sent
;; to the parent process.
(when (and messages (or (= idle-count 5)
(> (length messages) 10)))
(setq messages (nreverse messages))
(while messages
(prin1 (cons 'recvd (pop messages))))
(zmq-flush 'stdout))))))
;; TODO: Throttle messages if they are coming in too hot
(when messages
(setq messages (nreverse messages))
(while messages
(prin1 (cons 'recvd (pop messages))))
(zmq-flush 'stdout)))))
(quit
(mapc #'jupyter-stop-channel (mapcar #'cdr channels))
(zmq-prin1 '(quit))))))))
@ -539,7 +518,8 @@ by `jupyter--ioloop'."
;; Anything sent on stdin is a reply and therefore never added to
;; `:pending-requests'
(let ((req (jupyter--ioloop-pop-request client)))
(setf (jupyter-request--id req) msg-id)
(cl-assert (equal (jupyter-request-id req) msg-id)
nil "Message request sent out of order to the kernel.")
(puthash msg-id req (oref client requests)))))
(`(recvd ,ctype ,idents . ,msg)
(when jupyter--debug

View file

@ -70,9 +70,14 @@
(cons idents parts)
(error "Message delimiter not in message list"))))
(defun jupyter--message-header (session msg-type)
(defun jupyter--message-header (session msg-type &optional msg-id)
"Return a message header.
The `:session' key of the header will have its value set to
SESSION's ID, and its `:msg_type' will be set to MSG-TYPE. The
other fields are `:msg_id', `:version', `:username', and `:date'.
They are all set to appropriate default values."
(list
:msg_id (jupyter-new-uuid)
:msg_id (or msg-id (jupyter-new-uuid))
:msg_type msg-type
:version jupyter-protocol-version
:username user-login-name
@ -138,6 +143,7 @@
type
&key idents
content
msg-id
parent-header
metadata
buffers)
@ -146,7 +152,7 @@
(cl-check-type metadata json-plist)
(cl-check-type content json-plist)
(cl-check-type buffers list)
(let* ((header (jupyter--message-header session type))
(let* ((header (jupyter--message-header session type msg-id))
(msg-id (plist-get header :msg_id))
(parts (mapcar #'jupyter--encode (list header
parent-header
@ -190,11 +196,14 @@
socket
type
message
&optional flags)
&optional
msg-id
flags)
(declare (indent 1))
(cl-destructuring-bind (msg-id . msg)
(jupyter--encode-message session type :content message)
(prog1 msg-id
(cl-destructuring-bind (id . msg)
(jupyter--encode-message session type
:msg-id msg-id :content message)
(prog1 id
(zmq-send-multipart socket msg flags))))
(cl-defmethod jupyter-recv ((session jupyter-session) socket &optional flags)