emacs-jupyter/jupyter-channels.el
Nathaniel Nicandro b3b1066624 Re-organization of code base
- `jupyter-client.el` only contains client related code

- `jupyter-kernel-manager` related code is placed in `jupyter-kernel-manager.el`

- Socket creating functions and generating connection info plist function are
  placed in `jupyter-connection.el`. This also contains the
  `jupyter-connection` class.

- Kernelspec related functions are placed in `jupyter-kernelspec.el`

- Move general utility functions and variables requires necessary for `jupyter`
  into `jupyter-base.el`. This also contains the `jupyter-session` and
  `jupyter-request` struct definitions.
2018-01-06 19:55:45 -06:00

200 lines
7.3 KiB
EmacsLisp

(require 'jupyter-connection)
(require 'ring)
(eval-when-compile (require 'cl))
(defclass jupyter-channel ()
((type
:type keyword
:initarg :type
:documentation "The type of this channel. Should be one of
the keys in `jupyter-channel-socket-types', excluding `:hb'
which corresponds to the heartbeat channel and is handled
differently than the other channels. See `jupyter-hb-channel'.")
(endpoint
:type string
:initarg :endpoint
:documentation "The endpoint this channel is connected to.
Typical endpoints look like \"tcp://127.0.0.1:5555\".")
(socket
:type (or null zmq-socket)
:initform nil
:documentation "The socket this channel uses to communicate
with the kernel.")
(recv-queue
:type ring
:initform (make-ring 10)
:documentation "A queue of messages received on this channel
that are waiting to be processed. See `jupyter-process-message'
for its use."))
:abstract t
:documentation "A base class for channels used by `jupyter'.")
(defclass jupyter-iopub-channel (jupyter-channel)
((type :initform :iopub))
:documentation "A base class for iopub channels.")
(defclass jupyter-stdin-channel (jupyter-channel)
((type :initform :stdin))
:documentation "A base class for stdin channels.")
(defclass jupyter-shell-channel (jupyter-channel)
((type :initform :shell))
:documentation "A base class for shell channels.")
(defclass jupyter-control-channel (jupyter-channel)
((type :initform :control))
:documentation "A base class for control channels.")
(cl-defmethod jupyter-start-channel ((channel jupyter-channel) &key identity)
"Start a CHANNEL.
If IDENTITY is non-nil, it is used as the ROUTING_ID of the
underlying channel's socket."
(unless (jupyter-channel-alive-p channel)
(let ((sock (jupyter-connect-channel
(oref channel type) (oref channel endpoint) identity)))
(oset channel socket sock))))
(cl-defmethod jupyter-start-channel ((channel jupyter-iopub-channel) &key identity)
"Start an iopub CHANNEL subscribed to all messages.
If IDENTITY is non-nil, it is used as the ROUTING_ID of the
underlying channel's socket."
(let ((sock (cl-call-next-method)))
(when sock
(zmq-socket-set sock zmq-SUBSCRIBE ""))))
(cl-defmethod jupyter-stop-channel ((channel jupyter-channel))
"Stop a CHANNEL.
The underlying socket's LINGER property is set to 0, the socket
is closed, the channel's socket property is set to nil, and any
pending messages in the channels recv-queue are removed. Note
that `jupyter-channel-alive-p' on the CHANNEL will return nil
after a call to this function."
(when (jupyter-channel-alive-p channel)
(let ((sock (oref channel socket)))
(zmq-socket-set sock zmq-LINGER 0)
(zmq-close sock)
(cl-loop
with ring = (oref channel recv-queue)
repeat (ring-length ring) do (ring-remove ring))
(oset channel socket nil))))
(cl-defmethod jupyter-channel-alive-p ((channel jupyter-channel))
"Return non-nil if CHANNEL is alive.
A channel is alive if its socket property is bound to a
`zmq-socket'."
(and (slot-boundp channel 'socket)
(not (null (oref channel socket)))))
(cl-defmethod jupyter-channel-push-message ((channel jupyter-channel) msg)
(let ((ring (oref channel recv-queue)))
(ring-insert+extend ring msg 'grow)))
(cl-defmethod jupyter-channel-get-message ((channel jupyter-channel))
(unless (ring-empty-p (oref channel recv-queue))
(cl-destructuring-bind (idents . msg)
(ring-remove (oref channel recv-queue))
msg)))
(cl-defmethod jupyter-channel-messages-available-p ((channel jupyter-channel))
(not (ring-empty-p (oref channel recv-queue))))
(defclass jupyter-hb-channel ()
((type
:type keyword
:initform :hb
:documentation "The type of this channel is `:hb'.")
(endpoint
:type string
:initarg :endpoint
:documentation "The endpoint this channel is connected to.
Typical endpoints look like \"tcp://127.0.0.1:5555\".")
(socket
:type (or null zmq-socket)
:initform nil
:documentation "The socket used for communicating with the kernel.")
(time-to-dead
:type integer
:initform 1
:documentation "The time in seconds to wait for a response
from the kernel until the connection is assumed to be dead. Note
that this slot only takes effect when starting the channel.")
(beating
:type (or boolean symbol)
:initform t
:documentation "A flag variable indicating that the heartbeat
channel is communicating with the kernel.")
(paused
:type boolean
:initform t
:documentation "A flag variable indicating that the heartbeat
channel is paused and not communicating with the kernel. To
pause the heartbeat channel use `jupyter-hb-pause', to unpause
use `jupyter-hb-unpause'.")
(timer
:type (or null timer)
:initform nil
:documentation "The timer which sends and receives heartbeat
messages from the kernel."))
:documentation "A base class for heartbeat channels.")
(cl-defmethod jupyter-channel-alive-p ((channel jupyter-hb-channel))
"Return non-nil if CHANNEL is alive."
(and (oref channel timer) (memq (oref channel timer) timer-list)))
(cl-defmethod jupyter-hb-beating-p ((channel jupyter-hb-channel))
"Return non-nil if the kernel associated with CHANNEL is still
connected."
(unless (jupyter-channel-alive-p channel)
(error "Heartbeat channel not alive"))
(oref channel beating))
(cl-defmethod jupyter-hb-pause ((channel jupyter-hb-channel))
"Pause checking for heartbeat events on CHANNEL."
(unless (jupyter-channel-alive-p channel)
(error "Heartbeat channel not alive"))
(oset channel paused t))
(cl-defmethod jupyter-hb-unpause ((channel jupyter-hb-channel))
"Unpause checking for heatbeat events on CHANNEL."
(unless (jupyter-channel-alive-p channel)
(error "Heartbeat channel not alive"))
(oset channel paused nil))
(cl-defmethod jupyter-stop-channel ((channel jupyter-hb-channel))
"Stop the heartbeat CHANNEL."
(when (jupyter-channel-alive-p channel)
(cancel-timer (oref channel timer))
(zmq-socket-set (oref channel socket) zmq-LINGER 0)
(zmq-close (oref channel socket))
(oset channel socket nil)
(oset channel timer nil)))
(cl-defmethod jupyter-start-channel ((channel jupyter-hb-channel) &key identity)
(unless (jupyter-channel-alive-p channel)
(oset channel socket (jupyter-connect-channel
:hb (oref channel endpoint) identity))
(oset channel timer
(run-with-timer
0 (oref channel time-to-dead)
(lexical-let ((identity identity)
(sent nil))
(lambda (channel)
(let ((sock (oref channel socket)))
(when sent
(setq sent nil)
(if (condition-case nil
(zmq-recv sock zmq-NOBLOCK)
(zmq-EAGAIN nil))
(oset channel beating t)
(oset channel beating nil)
(zmq-socket-set sock zmq-LINGER 0)
(zmq-close sock)
(setq sock (jupyter-connect-channel
:hb (oref channel endpoint) identity))
(oset channel socket sock)))
(unless (oref channel paused)
(zmq-send sock "ping")
(setq sent t)))))
channel))))
(provide 'jupyter-channels)