Introduce jupyter-request and jupyter-callback types

`jupyter-request` encapsulates a request ID, request callbacks, and a flag
variable which tells you if the kernel has sent an idle message for this
request.

`jupyter-callback` encapsulates a callback function and a flag variable
determining if the callback has run at least once. The `callbacks` field of a
`jupyter-request` is an alist mapping reply types to `jupyter-callback`
objects.

Whenever a message is sent to the kernel a new `jupyter-request` object is
created and returned from one of the `jupyter-request-*` methods. This object
holds all the required information to track a message that the kernel is
handling. When a message is received from the kernel, the client checks its
`requests` hash table for the `jupyter-request` object associated with the
message and runs any callbacks for the message and updates the flag variables
of the request and callback if necessary.

The request is considered complete and removed from the `requests` hash table
when an idle message has been received for the request and all callbacks have
run at least once. Note that this is almost surely does not handle all cases
since there may be situations where you would like a callback to run multiple
times while an idle message has already been sent.
This commit is contained in:
Nathaniel Nicandro 2017-12-19 18:02:55 -06:00
parent 3b7b6efaaa
commit 0f84618914

View file

@ -17,15 +17,8 @@
kernel was started by this client.")
;; TODO: Better name, message-requests?
(message-callbacks
((requests
:type hash-table
;; Callbacks are removed once the status for a request is idle so no need
;; for a weak table here.
;;
;; FIXME: Take into account never receiving an idle status message. This
;; could happen when messages get dropped or you lose connection to a
;; kernel. If a connection is lost, it doesn't mean that we won't receive
;; previous requests since the IOPub channel broadcasts messages for every
;; client.
:initform (make-hash-table :test 'equal)
:documentation "A hash table with message ID's as keys. This
is used to register callback functions to run once a reply from
@ -135,16 +128,19 @@ protocol. Optional variable FLAGS are the flags sent to the
underlying `zmq-send-multipart' call using the CHANNEL's socket."
(declare (indent 1))
(let* ((ioloop (oref client ioloop))
(ring (or (process-get ioloop :jupyter-pending-replies)
(ring (or (process-get ioloop :jupyter-pending-requests)
(let ((ring (make-ring 10)))
(process-put ioloop :jupyter-pending-replies
(process-put ioloop :jupyter-pending-requests
ring)
ring)))
(future (cons :jupyter-future nil)))
ring))))
(zmq-subprocess-send (oref client ioloop)
(list 'send (oref channel type) type message flags))
(ring-insert+extend ring future 'grow)
future))
;; Anything sent to stdin is a reply not a request so don't add it to
;; :jupyter-pending-requests.
(unless (eq (oref channel type) :stdin)
(let ((req (make-jupyter-request)))
(ring-insert+extend ring req 'grow)
req))))
(defun jupyter--ioloop (client)
(let ((iopub-channel (oref client iopub-channel))
@ -254,12 +250,14 @@ underlying `zmq-send-multipart' call using the CHANNEL's socket."
(cl-case (car event)
;; data = sent message id
(sent
(let* ((ring (process-get (oref client ioloop) :jupyter-pending-replies))
(future (ring-remove ring)))
(setcdr future data)
(unless (eq ctype :stdin)
;; indicate that this message is expecting a reply
(puthash data t (oref client message-callbacks)))))
;; Anything sent on stdin is a reply and therefore never added to
;; :jupyter-pending-requests
(unless (eq ctype :stdin)
(let* ((ring (process-get
(oref client ioloop) :jupyter-pending-requests))
(req (ring-remove ring)))
(setf (jupyter-request--id req) data)
(puthash data req (oref client requests)))))
;; data = (idents . msg)
(recvd
(let* ((channel (cl-find-if
@ -325,44 +323,39 @@ for the heartbeat channel."
;;; Message callbacks
(defun jupyter--callback-for-message (client msg)
(let* ((message-callbacks (oref client message-callbacks))
(pmsg-id (jupyter-message-parent-id msg))
(callbacks (gethash pmsg-id message-callbacks))
(all-type-cb nil)
(cb nil))
(when (and callbacks (not (eq callbacks t)))
(setq cb (cdr (assoc (jupyter-message-type msg) callbacks))
;; TODO: Better name
all-type-cb (cdr (assoc t callbacks))))
(if (and cb all-type-cb)
(setq cb (lexical-let ((f1 cb)
(f2 all-type-cb))
(lambda (msg)
(funcall f1 msg)
(funcall f2 msg)
msg)))
(when all-type-cb
(setq cb all-type-cb)))
;; Remove callbacks once status is idle for request PMSG-ID
;;
;; Changed in version 5.0: Busy and idle messages should be sent
;; before/after handling every request, not just execution.
;; -- http://jupyter-client.readthedocs.io/en/latest/messaging.html#kernel-status
(when (jupyter-message-status-idle-p msg)
(remhash pmsg-id message-callbacks))
cb))
;; A `jupyter-request' object represents the status of a request to the kernel
;; and holds all the information required to process the messages associated
;; with the request. Whenever a message arrives that is associated with a
;; request's `jupyter-request-id', any callbacks associated with the message
;; type are run (see `jupyter-add-receive-callback'). When a request's
;; `jupyter-idle-received-p' property is non-nil, then it signifies that the
;; request has been handled by the kernel.
(cl-defstruct jupyter-request
(-id)
(idle-received-p)
(callbacks))
(defun jupyter-ensure-id (msg-id)
(cond
((stringp msg-id) msg-id)
((and (consp msg-id) (eq (car msg-id) :jupyter-future))
(while (null (cdr msg-id))
(sleep-for 0 1))
(cdr msg-id))
(t (error "Invalid message ID %s" msg-id))))
(cl-defstruct jupyter-callback
(ran-p nil)
(cb nil))
(defun jupyter-add-receive-callback (client msg-type msg-id function)
(defun jupyter-request-id (req)
(while (null (jupyter-request--id req))
(sleep-for 0 1))
(jupyter-request--id req))
(defun jupyter--run-callbacks-for-message (req msg)
(when req
(let* ((callbacks (jupyter-request-callbacks req))
(cbt (cdr (assoc t callbacks)))
(cb (cdr (assoc (jupyter-message-type msg) callbacks))))
(cl-loop
for cb in (list cb cbt)
when cb do
(funcall (jupyter-callback-cb cb) msg)
(setf (jupyter-callback-ran-p cb) t)))))
(defun jupyter-add-receive-callback (client msg-type req function)
"Add FUNCTION to run when receiving a message reply.
The function will be run when CLIENT receives a reply message
@ -386,20 +379,17 @@ from the kernel without any processing done to it."
;; msg-id
(unless (eq msg-type t)
(error "Not a valid received message type (`%s')" msg-type))))
;; Ensure that the message ID is ready
(setq msg-id (jupyter-ensure-id msg-id))
(let* ((message-callbacks (oref client message-callbacks))
(callbacks (gethash msg-id message-callbacks)))
;; If a message is sent with MSG-ID, then its entry in message-callbacks is
;; either t or an alist of callbacks.
(if (null callbacks) (error "Invalid message ID or message has already been received.")
(if (eq callbacks t)
(puthash msg-id (list (cons msg-type function)) message-callbacks)
(let ((cb-for-type (assoc msg-type callbacks)))
(if cb-for-type (setcdr cb-for-type function)
(nconc callbacks (list (cons msg-type function)))))))))
(if (jupyter-request-idle-received-p req)
(error "Request already received idle message.")
(let ((callbacks (jupyter-request-callbacks req))
(cb (make-jupyter-callback :cb function)))
(if (null callbacks)
(setf (jupyter-request-callbacks req) (list (cons msg-type cb)))
(let* ((cb-for-type (assoc msg-type callbacks)))
(if cb-for-type (setcdr cb-for-type cb)
(nconc callbacks (list (cons msg-type cb)))))))))
(defun jupyter-wait-until (client msg-type pmsg-id timeout cond)
(defun jupyter-wait-until (client msg-type req timeout cond)
"Wait until COND returns non-nil for a received message.
COND is run for every received message that has a type of
MSG-TYPE and whose parent header has a message ID of PMSG-ID. If
@ -411,7 +401,7 @@ defaults to 1 second."
(setq timeout (if timeout (* 1000 timeout) 1000))
(lexical-let ((msg nil)
(cond cond))
(jupyter-add-receive-callback client msg-type pmsg-id
(jupyter-add-receive-callback client msg-type req
(lambda (m) (setq msg (if (funcall cond m) m nil))))
(let ((time 0))
(catch 'timeout
@ -422,15 +412,15 @@ defaults to 1 second."
(setq time (+ time 10)))
msg))))
(defun jupyter-wait-until-idle (client pmsg-id &optional timeout)
(defun jupyter-wait-until-idle (client req &optional timeout)
"Wait until a status: idle message is received for PMSG-ID.
This function waits until TIMEOUT for CLIENT to receive an idle
status message for the request associated with PMSG-ID. If
TIMEOUT is non-nil, it defaults to 1 second."
(jupyter-wait-until client 'status pmsg-id timeout
(jupyter-wait-until client 'status req timeout
#'jupyter-message-status-idle-p))
(defun jupyter-wait-until-received (client msg-type pmsg-id &optional timeout)
(defun jupyter-wait-until-received (client msg-type req &optional timeout)
"Wait for a message with MSG-TYPE to be received on CLIENT.
This function waits until CLIENT receives a message from the
kernel that satisfies the following conditions:
@ -454,7 +444,7 @@ sending one. For example you would not be expecting an
more info
http://jupyter-client.readthedocs.io/en/latest/messaging.html"
(declare (indent 2))
(jupyter-wait-until client msg-type pmsg-id timeout
(jupyter-wait-until client msg-type req timeout
#'identity))
(defun jupyter--handle-message (client channel)
@ -475,33 +465,51 @@ To process a message the following steps are taken:
(let ((ring (oref channel recv-queue)))
(unless (ring-empty-p ring)
;; Messages are stored like (idents . msg) in the ring
(let* ((msg (cdr (ring-remove ring)))
(ctype (oref channel type))
(let* ((ctype (oref channel type))
(handler (cl-case ctype
(:stdin #'jupyter--handle-stdin-message)
(:iopub #'jupyter--handle-iopub-message)
(:shell #'jupyter--handle-shell-message)
(:control #'jupyter--handle-control-message)
(otherwise (error "Wrong channel type (%s)." ctype)))))
(otherwise (error "Wrong channel type (%s)." ctype))))
(msg (cdr (ring-remove ring)))
(pmsg-id (jupyter-message-parent-id msg))
(requests (oref client requests))
(req (gethash pmsg-id requests)))
(unless req
(error "No request found for message."))
(when (jupyter-message-status-idle-p msg)
(setf (jupyter-request-idle-received-p req) t))
(unwind-protect
(funcall handler client msg)
(funcall handler client req msg)
(unwind-protect
(let ((cb (jupyter--callback-for-message client msg)))
(when cb (funcall cb msg)))
(unless (ring-empty-p ring)
(run-with-timer
0.01 nil #'jupyter--handle-message client channel))))))))
(jupyter--run-callbacks-for-message req msg)
;; Remove the request once an idle message has been received and
;; all callbacks have run atleast once. This is done because it is
;; not gauranteed that the idle message is received after all other
;; messages for a request. Note that this probably doesn't handle
;; all cases.
(when (and (jupyter-request-idle-received-p req)
;; Check if all callbacks for req have run at least once
(if (not (jupyter-request-callbacks req)) t
(cl-loop
for (reply-type . cb) in (jupyter-request-callbacks req)
unless (jupyter-callback-ran-p cb) return nil
finally return t)))
(remhash pmsg-id requests))
(run-with-timer
0.01 nil #'jupyter--handle-message client channel)))))))
;;; Received message handlers
;;; stdin messages
(defun jupyter--handle-stdin-message (client msg)
(defun jupyter--handle-stdin-message (client req msg)
(cl-destructuring-bind (&key prompt password &allow-other-keys)
(plist-get msg :content)
(jupyter-handle-input client prompt password)))
(jupyter-handle-input client req prompt password)))
(cl-defmethod jupyter-handle-input ((client jupyter-kernel-client) prompt password)
(cl-defmethod jupyter-handle-input ((client jupyter-kernel-client) req prompt password)
"Handle an input request from CLIENT's kernel.
PROMPT is the prompt the kernel would like to show the user. If
PASSWORD is non-nil, then `read-passwd' is used to get input from
@ -517,7 +525,7 @@ the user. Otherwise `read-from-minibuffer' is used."
;;; control messages
(defun jupyter--handle-control-message (client msg)
(defun jupyter--handle-control-message (client req msg)
(cl-destructuring-bind (&key msg_type content &allow-other-keys) msg
(let ((status (plist-get content :status)))
(if (equal status "ok")
@ -525,7 +533,7 @@ the user. Otherwise `read-from-minibuffer' is used."
;; to message in a kernel's kernelspec.
(pcase msg_type
("interrupt_reply"
(jupyter-handle-interrupt client)))
(jupyter-handle-interrupt client req)))
(if (equal status "error")
(error "Error (%s): %s"
(plist-get content :ename) (plist-get content :evalue))
@ -538,15 +546,8 @@ If RESTART is non-nil, request a restart instead of a complete shutdown."
(msg (jupyter-shutdown-request :restart restart)))
(jupyter--send-encoded client channel "shutdown_request" msg)))
(cl-defmethod jupyter-handle-shutdown ((client jupyter-kernel-client) restart)
"Default shutdown reply handler."
(unless restart
(let ((kernel (oref client kernel)))
(when kernel
(jupyter-stop-channels client)
(delete-process kernel)
(kill-buffer (process-buffer kernel))
(oset client kernel nil)))))
(cl-defmethod jupyter-handle-shutdown ((client jupyter-kernel-client) req restart)
"Default shutdown reply handler.")
;; FIXME: This breaks the convention that all jupyter-request-* functions
;; returns a message-id future object.
@ -556,13 +557,13 @@ If RESTART is non-nil, request a restart instead of a complete shutdown."
(let ((channel (oref client control-channel)))
(jupyter--send-encoded client channel "interrupt_request" ())))
(cl-defmethod jupyter-handle-interrupt ((client jupyter-kernel-client))
(cl-defmethod jupyter-handle-interrupt ((client jupyter-kernel-client) req)
"Default interrupt reply handler.")
;;; shell messages
;; http://jupyter-client.readthedocs.io/en/latest/messaging.html#messages-on-the-shell-router-dealer-channel
(defun jupyter--handle-shell-message (client msg)
(defun jupyter--handle-shell-message (client req msg)
(cl-destructuring-bind (&key msg_type content &allow-other-keys) msg
(let ((status (plist-get content :status)))
;; We check for error or abort since "is_complete_reply" also contains a
@ -576,7 +577,7 @@ If RESTART is non-nil, request a restart instead of a complete shutdown."
&allow-other-keys)
content
(jupyter-handle-execute
client execution_count user_expressions payload)))
client req execution_count user_expressions payload)))
("inspect_reply"
(cl-destructuring-bind (&key found
data
@ -584,7 +585,7 @@ If RESTART is non-nil, request a restart instead of a complete shutdown."
&allow-other-keys)
content
(jupyter-handle-inspect
client found data metadata)))
client req found data metadata)))
("complete_reply"
(cl-destructuring-bind (&key matches
cursor_start
@ -593,19 +594,19 @@ If RESTART is non-nil, request a restart instead of a complete shutdown."
&allow-other-keys)
content
(jupyter-handle-complete
client matches cursor_start cursor_end metadata)))
client req matches cursor_start cursor_end metadata)))
("history_reply"
(cl-destructuring-bind (&key history &allow-other-keys)
content
(jupyter-handle-history client history)))
(jupyter-handle-history client req history)))
("is_complete_reply"
(cl-destructuring-bind (&key status indent &allow-other-keys)
content
(jupyter-handle-is-complete client status indent)))
(jupyter-handle-is-complete client req status indent)))
("comm_info_reply"
(cl-destructuring-bind (&key comms &allow-other-keys)
content
(jupyter-handle-comm-info client comms)))
(jupyter-handle-comm-info client req comms)))
("kernel_info_reply"
(cl-destructuring-bind (&key protocol_version
implementation
@ -616,7 +617,7 @@ If RESTART is non-nil, request a restart instead of a complete shutdown."
&allow-other-keys)
content
(jupyter-handle-kernel-info
client protocol_version implementation implementation_version
client req protocol_version implementation implementation_version
language_info banner help_links)))
(_ (error "Message type not handled yet.")))
(if (equal status "error")
@ -644,6 +645,7 @@ If RESTART is non-nil, request a restart instead of a complete shutdown."
(jupyter--send-encoded client channel "execute_request" msg)))
(cl-defmethod jupyter-handle-execute ((client jupyter-kernel-client)
req
execution-count
user-expressions
payload)
@ -661,6 +663,7 @@ If RESTART is non-nil, request a restart instead of a complete shutdown."
(jupyter--send-encoded client channel "inspect_request" msg)))
(cl-defmethod jupyter-handle-inspect ((client jupyter-kernel-client)
req
found
data
metadata)
@ -677,6 +680,7 @@ If RESTART is non-nil, request a restart instead of a complete shutdown."
(jupyter--send-encoded client channel "complete_request" msg)))
(cl-defmethod jupyter-handle-complete ((client jupyter-kernel-client)
req
matches
cursor-start
cursor-end
@ -709,7 +713,7 @@ If RESTART is non-nil, request a restart instead of a complete shutdown."
:unique unique)))
(jupyter--send-encoded client channel "history_request" msg)))
(cl-defmethod jupyter-handle-history ((client jupyter-kernel-client) history)
(cl-defmethod jupyter-handle-history ((client jupyter-kernel-client) req history)
"Default history reply handler.")
(cl-defmethod jupyter-request-is-complete ((client jupyter-kernel-client)
@ -722,7 +726,7 @@ If RESTART is non-nil, request a restart instead of a complete shutdown."
(jupyter--send-encoded client channel "is_complete_request" msg)))
(cl-defmethod jupyter-handle-is-complete
((client jupyter-kernel-client) status indent)
((client jupyter-kernel-client) req status indent)
"Default is complete reply handler.")
(cl-defmethod jupyter-request-comm-info ((client jupyter-kernel-client)
@ -734,7 +738,7 @@ If RESTART is non-nil, request a restart instead of a complete shutdown."
:target-name target-name)))
(jupyter--send-encoded client channel "comm_info_request" msg)))
(cl-defmethod jupyter-handle-comm-info ((client jupyter-kernel-client) comms)
(cl-defmethod jupyter-handle-comm-info ((client jupyter-kernel-client) req comms)
"Default comm info. reply handler.")
(cl-defmethod jupyter-request-kernel-info ((client jupyter-kernel-client))
@ -743,6 +747,7 @@ If RESTART is non-nil, request a restart instead of a complete shutdown."
(jupyter--send-encoded client channel "kernel_info_request" ())))
(cl-defmethod jupyter-handle-kernel-info ((client jupyter-kernel-client)
req
protocol-version
implementation
implementation-version
@ -753,83 +758,88 @@ If RESTART is non-nil, request a restart instead of a complete shutdown."
;;; iopub messages
(defun jupyter--handle-iopub-message (client msg)
(defun jupyter--handle-iopub-message (client req msg)
(cl-destructuring-bind (&key msg_type content &allow-other-keys) msg
(pcase msg_type
("shutdown_reply"
(cl-destructuring-bind (&key restart &allow-other-keys)
content
(jupyter-handle-shutdown client restart)))
(jupyter-handle-shutdown client req restart)))
("stream"
(cl-destructuring-bind (&key name text &allow-other-keys)
content
(jupyter-handle-stream client name text)))
(jupyter-handle-stream client req name text)))
("execute_input"
(cl-destructuring-bind (&key code execution_count &allow-other-keys)
content
(jupyter-handle-execute-input client code execution_count)))
(jupyter-handle-execute-input client req code execution_count)))
("execute_result"
(cl-destructuring-bind (&key execution_count
data
metadata
&allow-other-keys)
content
(jupyter-handle-execute-result client execution_count data metadata)))
(jupyter-handle-execute-result client req execution_count data metadata)))
("error"
(cl-destructuring-bind (&key ename evalue traceback &allow-other-keys)
content
(jupyter-handle-error client ename evalue traceback)))
(jupyter-handle-error client req ename evalue traceback)))
("status"
(cl-destructuring-bind (&key execution_state &allow-other-keys)
content
(jupyter-handle-status client execution_state)))
(jupyter-handle-status client req execution_state)))
("clear_output"
(cl-destructuring-bind (&key wait &allow-other-keys)
content
(jupyter-handle-clear-output client wait)))
(jupyter-handle-clear-output client req wait)))
("display_data"
(cl-destructuring-bind (&key data metadata transient &allow-other-keys)
content
(jupyter-handle-display-data client data metadata transient)))
(jupyter-handle-display-data client req data metadata transient)))
("update_display_data"
(cl-destructuring-bind (&key data metadata transient &allow-other-keys)
content
(jupyter-handle-update-display-data client data metadata transient)))
(jupyter-handle-update-display-data client req data metadata transient)))
(_ (error "Message type not handled yet.")))))
(cl-defmethod jupyter-handle-stream ((client jupyter-kernel-client) name text)
(cl-defmethod jupyter-handle-stream ((client jupyter-kernel-client) pmsg-id name text)
"Default stream handler.")
(cl-defmethod jupyter-handle-execute-input ((client jupyter-kernel-client)
req
code
execution-count)
"Default execute input handler.")
(cl-defmethod jupyter-handle-execute-result ((client jupyter-kernel-client)
req
execution-count
data
metadata)
"Default execute result handler.")
(cl-defmethod jupyter-handle-error ((client jupyter-kernel-client)
req
ename
evalue
traceback)
"Default error handler.")
(cl-defmethod jupyter-handle-status ((client jupyter-kernel-client) execution_state)
(cl-defmethod jupyter-handle-status ((client jupyter-kernel-client) req execution_state)
"Default status handler.")
(cl-defmethod jupyter-handle-clear-output ((client jupyter-kernel-client) wait)
(cl-defmethod jupyter-handle-clear-output ((client jupyter-kernel-client) req wait)
"Default clear output handler.")
(cl-defmethod jupyter-handle-display-data ((client jupyter-kernel-client)
req
data
metadata
transient)
"Default display data handler.")
(cl-defmethod jupyter-handle-update-display-data ((client jupyter-kernel-client)
req
data
metadata
transient)