diff --git a/jupyter-channels.el b/jupyter-channels.el index 1a4a1ab..447acc7 100644 --- a/jupyter-channels.el +++ b/jupyter-channels.el @@ -11,15 +11,6 @@ :control zmq-DEALER) "The socket types for the various channels used by `jupyter'.") -;; TODO: Each channel has its own process like the heartbeat channel and does -;; the majority of encoding and decoding messages there. The parent emacs -;; process then is only responsible to carry out the actions in the messages -;; and construct replies which are then sent to the process. -;; -;; The current implementation still creates a process, but only for polling the -;; file descriptor of the socket to check for incoming messages. What would be -;; simpler is to create one polling process and handle messages through a -;; single filter function instead of per channel. (defclass jupyter-channel () ((type :type keyword @@ -124,166 +115,98 @@ A channel is alive if its socket property is bound to a ((type :type keyword :initform :hb - :documentation "The type of this channel. Should be one of - the keys in `jupyter-channel-socket-types', excluding `:hb' - which corresponds to the heartbeat channel.") + :documentation "The type of this channel is `:hb'.") (endpoint :type string :initarg :endpoint :documentation "The endpoint this channel is connected to. Typical endpoints look like \"tcp://127.0.0.1:5555\".") - ;; channel must be restarted for this to be updated + (socket + :type (or null zmq-socket) + :initform nil + :documentation "The socket used for communicating with the kernel.") (time-to-dead :type integer :initform 1 :documentation "The time in seconds to wait for a response - from the kernel until the connection is assumed to be dead.") + from the kernel until the connection is assumed to be dead. Note + that this slot only takes effect when starting the channel.") (beating :type (or boolean symbol) :initform t :documentation "A flag variable indicating that the heartbeat - channel is sending and receiving messages with the kernel.") + channel is communicating with the kernel.") (paused :type boolean - :initform nil + :initform t :documentation "A flag variable indicating that the heartbeat channel is paused and not communicating with the kernel. To pause the heartbeat channel use `jupyter-hb-pause', to unpause use `jupyter-hb-unpause'.") - (process - :type (or null process) + (timer + :type (or null timer) :initform nil - :documentation "The underlying process which runs the - heartbeat channel and communicates with the kernel.")) + :documentation "The timer which sends and receives heartbeat + messages from the kernel.")) :documentation "A base class for heartbeat channels.") (cl-defmethod jupyter-channel-alive-p ((channel jupyter-hb-channel)) "Return non-nil if CHANNEL is alive." - (process-live-p (oref channel process))) + (and (oref channel timer) (memq (oref channel timer) timer-list))) (cl-defmethod jupyter-hb-beating-p ((channel jupyter-hb-channel)) "Return non-nil if the kernel associated with CHANNEL is still connected." (unless (jupyter-channel-alive-p channel) - (error "Heartbeat process not started")) - (process-send-string (oref channel process) "beating\n") - (accept-process-output (oref channel process) nil nil 1) + (error "Heartbeat channel not alive")) (oref channel beating)) (cl-defmethod jupyter-hb-pause ((channel jupyter-hb-channel)) "Pause checking for heartbeat events on CHANNEL." (unless (jupyter-channel-alive-p channel) - (error "Heartbeat process not started")) - (process-send-string (oref channel process) "pause\n") - (accept-process-output (oref channel process) nil nil 1)) + (error "Heartbeat channel not alive")) + (oset channel paused t)) (cl-defmethod jupyter-hb-unpause ((channel jupyter-hb-channel)) "Unpause checking for heatbeat events on CHANNEL." (unless (jupyter-channel-alive-p channel) - (error "Heartbeat process not started")) - (process-send-string (oref channel process) "unpause\n") - (accept-process-output (oref channel process) nil nil 1)) + (error "Heartbeat channel not alive")) + (oset channel paused nil)) (cl-defmethod jupyter-stop-channel ((channel jupyter-hb-channel)) - "Stop a CHANNEL." - (let ((proc (oref channel process))) - (when proc - (delete-process proc) - (kill-buffer (process-buffer proc)) - (oset channel process nil)))) + "Stop the heartbeat CHANNEL." + (cancel-timer (oref channel timer)) + (zmq-socket-set (oref channel socket) zmq-LINGER 0) + (zmq-close (oref channel socket)) + (oset channel socket nil) + (oset channel timer nil)) -;; TODO: Convert the heartbeat to a timer function that runs every second -;; instead. I can just check zmq-EVENTS every second to see if the channel is -;; beating (cl-defmethod jupyter-start-channel ((channel jupyter-hb-channel) &key identity) - "Start a CHANNEL." - (declare (indent 1)) - (jupyter-stop-channel channel) - ;; https://github.com/jupyter/jupyter_client/blob/master/jupyter_client/channels.py - (let* - ((time-to-dead (oref channel time-to-dead)) - (proc - (zmq-start-process - `(lambda (ctx) - (let ((beating t) - (paused nil) - (request-time nil) - (wait-time nil) - (last-success nil)) - (while t - (catch 'restart - (with-zmq-socket sock ,(plist-get jupyter-channel-socket-types - (oref channel type)) - ((zmq-LINGER 1000)) - ,(when identity - `(zmq-socket-set sock zmq-ROUTING_ID ,identity)) - (zmq-connect sock ,(oref channel endpoint)) - (with-zmq-poller - ;; Poll STDIN to avoid blocking - (zmq-poller-register (current-zmq-poller) 0 zmq-POLLIN) - (zmq-poller-register (current-zmq-poller) sock zmq-POLLIN) - (while t - ;; Send a ping request to the heartbeat channel and poll - ;; for the reply. If any commands from stdin arrive - ;; while polling, handle those and continue waiting. - ;; Once the reply is received, keep polling for stdin - ;; for the remaining time-to-dead period. After waiting - ;; send another ping. - (if request-time - (setq wait-time (* (ceiling - (- ,time-to-dead - (float-time - (time-subtract - (current-time) - request-time)))) - 1000)) - (unless paused - (zmq-send sock "ping")) - (setq request-time (current-time) - wait-time ,(* (ceiling time-to-dead) 1000))) - (let ((event - (condition-case err - (zmq-poller-wait (current-zmq-poller) - (if (> wait-time 0) wait-time 0)) - (zmq-EINTR nil) - (error (signal (car err) (cdr err)))))) - (cond - ((and event (integerp (car event))) - (cl-case (read-minibuffer "") - (beating - (zmq-prin1 (cons 'beating beating))) - (pause - (setq paused t) - (zmq-prin1 '(pause . t))) - (unpause - (setq paused nil) - (zmq-prin1 '(unpause . t))))) - (event - (zmq-recv sock) - (setq beating t - last-success t)) - ;; When no events have arrived after the poll, its an - ;; indication that a reply has been received and we - ;; should send another one so set request-time to nil - ;; to force another send, note that the send will not - ;; happen if we are paused. - ((or paused last-success) - (setq request-time nil - last-success nil)) - (t - (setq beating nil - request-time nil - last-success nil) - (throw 'restart t))))))))))) - (lexical-let ((channel channel)) - (lambda (event) - (cl-case (car event) - (pause (oset channel paused (cdr event))) - (unpause (oset channel paused (not (cdr event)))) - (beating (oset channel beating (cdr event))) - (otherwise (error "Invalid event from heartbeat channel.")))))))) - ;; Don't query when exiting - (set-process-query-on-exit-flag proc nil) - (oset channel process proc))) + (unless (jupyter-channel-alive-p channel) + (oset channel socket (jupyter-connect-channel + :hb (oref channel endpoint) identity)) + (oset channel timer + (run-with-timer + 0 (oref channel time-to-dead) + (lexical-let ((identity identity) + (sent nil)) + (lambda (channel) + (let ((sock (oref channel socket))) + (when sent + (setq sent nil) + (if (condition-case nil + (zmq-recv sock zmq-NOBLOCK) + (zmq-EAGAIN nil)) + (oset channel beating t) + (oset channel beating nil) + (zmq-socket-set sock zmq-LINGER 0) + (zmq-close sock) + (setq sock (jupyter-connect-channel + :hb (oref channel endpoint) identity)) + (oset channel socket sock))) + (unless (oref channel paused) + (zmq-send sock "ping") + (setq sent t))))) + channel)))) (provide 'jupyter-channels)