mirror of
https://github.com/vale981/emacs-jupyter
synced 2025-03-05 15:41:37 -05:00
Refactor ioloop subprocess communication
- Only modify/access the `:jupyter-pending-requests' property of the ioloop subprocess through `jupyter--ioloop-pop-request' and `jupyter--ioloop-push-request' - Push a message into a channels receive queue using `jupyter-channel-push-message'
This commit is contained in:
parent
e54ab6e899
commit
2d16ab4900
1 changed files with 41 additions and 42 deletions
|
@ -409,20 +409,14 @@ found in the jupyter messaging protocol. Optional variable FLAGS
|
|||
are the flags sent to the underlying `zmq-send-multipart' call
|
||||
using the CHANNEL's socket."
|
||||
(declare (indent 1))
|
||||
(let* ((ioloop (oref client ioloop))
|
||||
(ring (or (process-get ioloop :jupyter-pending-requests)
|
||||
(let ((ring (make-ring 10)))
|
||||
(process-put ioloop :jupyter-pending-requests
|
||||
ring)
|
||||
ring))))
|
||||
(zmq-subprocess-send (oref client ioloop)
|
||||
(list 'send (oref channel type) type message flags))
|
||||
;; Anything sent to stdin is a reply not a request so don't add it to
|
||||
;; :jupyter-pending-requests.
|
||||
(unless (eq (oref channel type) :stdin)
|
||||
(let ((req (make-jupyter-request)))
|
||||
(ring-insert+extend ring req 'grow)
|
||||
req))))
|
||||
(zmq-subprocess-send (oref client ioloop)
|
||||
(list 'send (oref channel type) type message flags))
|
||||
;; Anything sent to stdin is a reply not a request so don't add it to
|
||||
;; `:jupyter-pending-requests.'
|
||||
(unless (eq (oref channel type) :stdin)
|
||||
(let ((req (make-jupyter-request)))
|
||||
(jupyter--ioloop-push-request client req)
|
||||
req)))
|
||||
|
||||
(defun jupyter--ioloop (client)
|
||||
(let ((iopub-channel (oref client iopub-channel))
|
||||
|
@ -564,6 +558,19 @@ using the CHANNEL's socket."
|
|||
(mapcar #'car channels))))
|
||||
(quit (zmq-prin1 (cons 'quit (cons nil nil))))))))))
|
||||
|
||||
(defun jupyter--ioloop-pop-request (client)
|
||||
(let* ((ring (process-get (oref client ioloop) :jupyter-pending-requests))
|
||||
(req (ring-remove ring)))
|
||||
req))
|
||||
|
||||
(defun jupyter--ioloop-push-request (client req)
|
||||
(let* ((ioloop (oref client ioloop))
|
||||
(ring (or (process-get ioloop :jupyter-pending-requests)
|
||||
(let ((ring (make-ring 10)))
|
||||
(process-put ioloop :jupyter-pending-requests ring)
|
||||
ring))))
|
||||
(ring-insert+extend ring req 'grow)))
|
||||
|
||||
(defun jupyter--ioloop-sentinel (client ioloop event)
|
||||
(cond
|
||||
((cl-loop for type in '("exited" "failed" "finished" "killed" "deleted")
|
||||
|
@ -578,37 +585,29 @@ using the CHANNEL's socket."
|
|||
(cl-case (car event)
|
||||
;; Cleanup handled in sentinel
|
||||
(quit)
|
||||
;; data = sent message id
|
||||
(sent
|
||||
;; Anything sent on stdin is a reply and therefore never added to
|
||||
;; :jupyter-pending-requests
|
||||
(when jupyter--debug
|
||||
(message "SEND: %s" data))
|
||||
(unless (eq ctype :stdin)
|
||||
(let* ((ring (process-get
|
||||
(oref client ioloop) :jupyter-pending-requests))
|
||||
(req (ring-remove ring)))
|
||||
(setf (jupyter-request--id req) data)
|
||||
(when jupyter--debug
|
||||
(message "SEND: %s" (jupyter-request-id req)))
|
||||
(puthash data req (oref client requests)))))
|
||||
;; data = (idents . msg)
|
||||
;; Anything sent on stdin is a reply and therefore never added to
|
||||
;; `:jupyter-pending-requests'
|
||||
(let ((id data)
|
||||
(req (jupyter--ioloop-pop-request client)))
|
||||
(setf (jupyter-request--id req) id)
|
||||
(puthash id req (oref client requests)))))
|
||||
(recvd
|
||||
(let* ((channel (cl-find-if
|
||||
(lambda (c) (eq (oref c type) ctype))
|
||||
(mapcar (lambda (x) (eieio-oref client x))
|
||||
'(stdin-channel
|
||||
shell-channel
|
||||
iopub-channel))))
|
||||
(ring (oref channel recv-queue)))
|
||||
(when jupyter--debug
|
||||
(message "RECV: %s %s %s" (jupyter-message-type (cdr data))
|
||||
(jupyter-message-parent-id (cdr data))
|
||||
(plist-get (cdr data) :content)))
|
||||
(if (= (ring-length ring) (ring-size ring))
|
||||
;; Try to process at a later time when the recv-queue is full
|
||||
(run-with-timer 0.05 nil #'jupyter--ioloop-filter client event)
|
||||
(ring-insert ring data)
|
||||
(run-with-timer
|
||||
0.01 nil #'jupyter--handle-message client channel)))))))
|
||||
(when jupyter--debug
|
||||
(message "RECV: %s %s %s"
|
||||
(jupyter-message-type (cdr data))
|
||||
(jupyter-message-parent-id (cdr data))
|
||||
(jupyter-message-content (cdr data))))
|
||||
(let ((channel (cl-find-if (lambda (c) (eq (oref c type) ctype))
|
||||
(mapcar (lambda (x) (eieio-oref client x))
|
||||
'(stdin-channel
|
||||
shell-channel
|
||||
iopub-channel)))))
|
||||
(jupyter-channel-push-message channel data)
|
||||
(run-with-timer 0.001 nil #'jupyter-handle-message client channel))))))
|
||||
|
||||
(cl-defmethod jupyter-start-channels ((client jupyter-kernel-client)
|
||||
&key (shell t)
|
||||
|
|
Loading…
Add table
Reference in a new issue