Heartbeat channels are implemented with timers instead of a subprocess

This commit is contained in:
Nathaniel Nicandro 2017-12-30 23:30:53 -06:00
parent b57040e96c
commit c7c200d571

View file

@ -11,15 +11,6 @@
:control zmq-DEALER)
"The socket types for the various channels used by `jupyter'.")
;; TODO: Each channel has its own process like the heartbeat channel and does
;; the majority of encoding and decoding messages there. The parent emacs
;; process then is only responsible to carry out the actions in the messages
;; and construct replies which are then sent to the process.
;;
;; The current implementation still creates a process, but only for polling the
;; file descriptor of the socket to check for incoming messages. What would be
;; simpler is to create one polling process and handle messages through a
;; single filter function instead of per channel.
(defclass jupyter-channel ()
((type
:type keyword
@ -124,166 +115,98 @@ A channel is alive if its socket property is bound to a
((type
:type keyword
:initform :hb
:documentation "The type of this channel. Should be one of
the keys in `jupyter-channel-socket-types', excluding `:hb'
which corresponds to the heartbeat channel.")
:documentation "The type of this channel is `:hb'.")
(endpoint
:type string
:initarg :endpoint
:documentation "The endpoint this channel is connected to.
Typical endpoints look like \"tcp://127.0.0.1:5555\".")
;; channel must be restarted for this to be updated
(socket
:type (or null zmq-socket)
:initform nil
:documentation "The socket used for communicating with the kernel.")
(time-to-dead
:type integer
:initform 1
:documentation "The time in seconds to wait for a response
from the kernel until the connection is assumed to be dead.")
from the kernel until the connection is assumed to be dead. Note
that this slot only takes effect when starting the channel.")
(beating
:type (or boolean symbol)
:initform t
:documentation "A flag variable indicating that the heartbeat
channel is sending and receiving messages with the kernel.")
channel is communicating with the kernel.")
(paused
:type boolean
:initform nil
:initform t
:documentation "A flag variable indicating that the heartbeat
channel is paused and not communicating with the kernel. To
pause the heartbeat channel use `jupyter-hb-pause', to unpause
use `jupyter-hb-unpause'.")
(process
:type (or null process)
(timer
:type (or null timer)
:initform nil
:documentation "The underlying process which runs the
heartbeat channel and communicates with the kernel."))
:documentation "The timer which sends and receives heartbeat
messages from the kernel."))
:documentation "A base class for heartbeat channels.")
(cl-defmethod jupyter-channel-alive-p ((channel jupyter-hb-channel))
"Return non-nil if CHANNEL is alive."
(process-live-p (oref channel process)))
(and (oref channel timer) (memq (oref channel timer) timer-list)))
(cl-defmethod jupyter-hb-beating-p ((channel jupyter-hb-channel))
"Return non-nil if the kernel associated with CHANNEL is still
connected."
(unless (jupyter-channel-alive-p channel)
(error "Heartbeat process not started"))
(process-send-string (oref channel process) "beating\n")
(accept-process-output (oref channel process) nil nil 1)
(error "Heartbeat channel not alive"))
(oref channel beating))
(cl-defmethod jupyter-hb-pause ((channel jupyter-hb-channel))
"Pause checking for heartbeat events on CHANNEL."
(unless (jupyter-channel-alive-p channel)
(error "Heartbeat process not started"))
(process-send-string (oref channel process) "pause\n")
(accept-process-output (oref channel process) nil nil 1))
(error "Heartbeat channel not alive"))
(oset channel paused t))
(cl-defmethod jupyter-hb-unpause ((channel jupyter-hb-channel))
"Unpause checking for heatbeat events on CHANNEL."
(unless (jupyter-channel-alive-p channel)
(error "Heartbeat process not started"))
(process-send-string (oref channel process) "unpause\n")
(accept-process-output (oref channel process) nil nil 1))
(error "Heartbeat channel not alive"))
(oset channel paused nil))
(cl-defmethod jupyter-stop-channel ((channel jupyter-hb-channel))
"Stop a CHANNEL."
(let ((proc (oref channel process)))
(when proc
(delete-process proc)
(kill-buffer (process-buffer proc))
(oset channel process nil))))
"Stop the heartbeat CHANNEL."
(cancel-timer (oref channel timer))
(zmq-socket-set (oref channel socket) zmq-LINGER 0)
(zmq-close (oref channel socket))
(oset channel socket nil)
(oset channel timer nil))
;; TODO: Convert the heartbeat to a timer function that runs every second
;; instead. I can just check zmq-EVENTS every second to see if the channel is
;; beating
(cl-defmethod jupyter-start-channel ((channel jupyter-hb-channel) &key identity)
"Start a CHANNEL."
(declare (indent 1))
(jupyter-stop-channel channel)
;; https://github.com/jupyter/jupyter_client/blob/master/jupyter_client/channels.py
(let*
((time-to-dead (oref channel time-to-dead))
(proc
(zmq-start-process
`(lambda (ctx)
(let ((beating t)
(paused nil)
(request-time nil)
(wait-time nil)
(last-success nil))
(while t
(catch 'restart
(with-zmq-socket sock ,(plist-get jupyter-channel-socket-types
(oref channel type))
((zmq-LINGER 1000))
,(when identity
`(zmq-socket-set sock zmq-ROUTING_ID ,identity))
(zmq-connect sock ,(oref channel endpoint))
(with-zmq-poller
;; Poll STDIN to avoid blocking
(zmq-poller-register (current-zmq-poller) 0 zmq-POLLIN)
(zmq-poller-register (current-zmq-poller) sock zmq-POLLIN)
(while t
;; Send a ping request to the heartbeat channel and poll
;; for the reply. If any commands from stdin arrive
;; while polling, handle those and continue waiting.
;; Once the reply is received, keep polling for stdin
;; for the remaining time-to-dead period. After waiting
;; send another ping.
(if request-time
(setq wait-time (* (ceiling
(- ,time-to-dead
(float-time
(time-subtract
(current-time)
request-time))))
1000))
(unless paused
(zmq-send sock "ping"))
(setq request-time (current-time)
wait-time ,(* (ceiling time-to-dead) 1000)))
(let ((event
(condition-case err
(zmq-poller-wait (current-zmq-poller)
(if (> wait-time 0) wait-time 0))
(zmq-EINTR nil)
(error (signal (car err) (cdr err))))))
(cond
((and event (integerp (car event)))
(cl-case (read-minibuffer "")
(beating
(zmq-prin1 (cons 'beating beating)))
(pause
(setq paused t)
(zmq-prin1 '(pause . t)))
(unpause
(setq paused nil)
(zmq-prin1 '(unpause . t)))))
(event
(zmq-recv sock)
(setq beating t
last-success t))
;; When no events have arrived after the poll, its an
;; indication that a reply has been received and we
;; should send another one so set request-time to nil
;; to force another send, note that the send will not
;; happen if we are paused.
((or paused last-success)
(setq request-time nil
last-success nil))
(t
(setq beating nil
request-time nil
last-success nil)
(throw 'restart t)))))))))))
(lexical-let ((channel channel))
(lambda (event)
(cl-case (car event)
(pause (oset channel paused (cdr event)))
(unpause (oset channel paused (not (cdr event))))
(beating (oset channel beating (cdr event)))
(otherwise (error "Invalid event from heartbeat channel."))))))))
;; Don't query when exiting
(set-process-query-on-exit-flag proc nil)
(oset channel process proc)))
(unless (jupyter-channel-alive-p channel)
(oset channel socket (jupyter-connect-channel
:hb (oref channel endpoint) identity))
(oset channel timer
(run-with-timer
0 (oref channel time-to-dead)
(lexical-let ((identity identity)
(sent nil))
(lambda (channel)
(let ((sock (oref channel socket)))
(when sent
(setq sent nil)
(if (condition-case nil
(zmq-recv sock zmq-NOBLOCK)
(zmq-EAGAIN nil))
(oset channel beating t)
(oset channel beating nil)
(zmq-socket-set sock zmq-LINGER 0)
(zmq-close sock)
(setq sock (jupyter-connect-channel
:hb (oref channel endpoint) identity))
(oset channel socket sock)))
(unless (oref channel paused)
(zmq-send sock "ping")
(setq sent t)))))
channel))))
(provide 'jupyter-channels)