emacs-jupyter/jupyter-channels.el

217 lines
8.2 KiB
EmacsLisp
Raw Normal View History

2018-01-08 21:38:32 -06:00
;;; jupyter-channels.el --- Jupyter channels -*- lexical-binding: t -*-
;; Copyright (C) 2018 Nathaniel Nicandro
;; Author: Nathaniel Nicandro <nathanielnicandro@gmail.com>
;; Created: 08 Jan 2018
2019-03-14 09:08:31 -05:00
;; Version: 0.7.3
2018-01-08 21:38:32 -06:00
;; This program is free software; you can redistribute it and/or
;; modify it under the terms of the GNU General Public License as
;; published by the Free Software Foundation; either version 2, or (at
;; your option) any later version.
;; This program is distributed in the hope that it will be useful, but
;; WITHOUT ANY WARRANTY; without even the implied warranty of
;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
;; General Public License for more details.
;; You should have received a copy of the GNU General Public License
;; along with GNU Emacs; see the file COPYING. If not, write to the
;; Free Software Foundation, Inc., 59 Temple Place - Suite 330,
;; Boston, MA 02111-1307, USA.
;;; Commentary:
2018-11-08 13:41:39 -06:00
;; Implements synchronous channel types. Each channel is essentially a wrapper
;; around a `zmq-socket' constrained to a socket type by the type of the
;; channel and with an associated `zmq-IDENTITY' obtained from the
;; `jupyter-session' that must be associated with the channel. A heartbeat
;; channel is distinct from the other channels in that it is implemented using
;; a timer which periodically pings the kernel depending on how its configured.
;; In order for communication to occur on the other channels, one of
;; `jupyter-send' or `jupyter-recv' must be called after starting the channel
;; with `jupyter-start-channel'.
2018-01-08 21:38:32 -06:00
;;; Code:
2019-03-10 19:53:54 -05:00
(eval-when-compile (require 'subr-x))
(require 'jupyter-base)
(require 'jupyter-messages) ; For `jupyter-send'
2017-12-14 13:39:30 -06:00
(require 'ring)
2018-01-08 21:38:32 -06:00
(defgroup jupyter-channels nil
"Jupyter channels"
:group 'jupyter)
2018-01-08 21:38:32 -06:00
(defcustom jupyter-hb-max-failures 5
"Number of heartbeat failures until the kernel is considered unreachable.
2018-11-14 18:51:50 -06:00
A ping is sent to the kernel on a heartbeat channel and waits
until `time-to-dead' seconds to see if the kernel sent a ping
back. If the kernel doesn't send a ping back after
`jupyter-hb-max-failures', the callback associated with the
heartbeat channel is called. See `jupyter-hb-on-kernel-dead'."
:type 'integer
:group 'jupyter-channels)
2018-10-16 16:45:56 -05:00
2018-01-08 21:38:32 -06:00
;;; Basic channel types
2017-12-13 11:27:13 -06:00
(defclass jupyter-channel ()
2017-12-13 11:27:13 -06:00
((type
:type keyword
:initarg :type
:documentation "The type of this channel. Should be one of
2018-11-04 23:54:30 -06:00
the keys in `jupyter-socket-types'.")
(session
:type jupyter-session
:initarg :session
:documentation "The session object used to sign and
send/receive messages.")
2017-12-13 11:27:13 -06:00
(endpoint
:type string
:initarg :endpoint
:documentation "The endpoint this channel is connected to.
Typical endpoints look like \"tcp://127.0.0.1:5555\"."))
:abstract t)
(defclass jupyter-sync-channel (jupyter-channel)
((socket
2017-12-13 11:27:13 -06:00
:type (or null zmq-socket)
:initform nil
:documentation "The socket used for communicating with the kernel.")))
(cl-defgeneric jupyter-start-channel ((channel jupyter-channel) &key identity)
"Start a Jupyter CHANNEL using IDENTITY as the routing ID.")
(cl-defmethod jupyter-start-channel ((channel jupyter-sync-channel)
&key (identity (jupyter-session-id
(oref channel session))))
(unless (jupyter-channel-alive-p channel)
(let ((socket (jupyter-connect-channel
(oref channel type) (oref channel endpoint) identity)))
(oset channel socket socket)
(cl-case (oref channel type)
(:iopub
(zmq-socket-set socket zmq-SUBSCRIBE ""))))))
(cl-defgeneric jupyter-stop-channel ((channel jupyter-channel))
"Stop a Jupyter CHANNEL.")
(cl-defmethod jupyter-stop-channel ((channel jupyter-sync-channel))
(when (jupyter-channel-alive-p channel)
(zmq-socket-set (oref channel socket) zmq-LINGER 0)
(zmq-close (oref channel socket))
(oset channel socket nil)))
(cl-defmethod jupyter-send ((channel jupyter-sync-channel) type message &optional msg-id)
(jupyter-send (oref channel session) (oref channel socket) type message msg-id))
(cl-defmethod jupyter-recv ((channel jupyter-sync-channel))
(jupyter-recv (oref channel session) (oref channel socket)))
(cl-defgeneric jupyter-channel-alive-p ((channel jupyter-channel))
"Determine if a CHANNEL is alive.")
(cl-defmethod jupyter-channel-alive-p ((channel jupyter-sync-channel))
(not (null (oref channel socket))))
2018-01-08 21:38:32 -06:00
;;; Heartbeat channel
(defclass jupyter-hb-channel (jupyter-sync-channel)
2017-12-13 11:27:13 -06:00
((type
:type keyword
:initform :hb
:documentation "The type of this channel is `:hb'.")
2017-12-13 11:27:13 -06:00
(time-to-dead
2018-10-16 16:45:56 -05:00
:type number
2017-12-13 11:27:13 -06:00
:initform 1
:documentation "The time in seconds to wait for a response
2018-09-09 21:33:05 -05:00
from the kernel until the connection is assumed to be dead. Note
that this slot only takes effect when starting the channel.")
(dead-cb
:type function
:initform #'ignore
:documentation "A callback function that takes 0 arguments
and is called when the kernel has not responded for
\(* `jupyter-hb-max-failures' `time-to-dead'\) seconds.")
2017-12-13 11:27:13 -06:00
(beating
:type (or boolean symbol)
:initform t
:documentation "A flag variable indicating that the heartbeat
2018-09-09 21:33:05 -05:00
channel is communicating with the kernel.")
2017-12-13 11:27:13 -06:00
(paused
:type boolean
:initform t
2017-12-13 11:27:13 -06:00
:documentation "A flag variable indicating that the heartbeat
2018-09-09 16:19:06 -05:00
channel is paused and not communicating with the kernel. To
pause the heartbeat channel use `jupyter-hb-pause', to unpause
use `jupyter-hb-unpause'."))
2017-12-13 11:27:13 -06:00
:documentation "A base class for heartbeat channels.")
(cl-defmethod jupyter-channel-alive-p ((channel jupyter-hb-channel))
"Return non-nil if CHANNEL is alive."
2018-09-09 16:19:06 -05:00
(zmq-socket-p (oref channel socket)))
2017-12-13 11:27:13 -06:00
2018-11-14 01:28:09 -06:00
(defun jupyter-hb--pingable-p (channel)
(and (not (oref channel paused))
(jupyter-channel-alive-p channel)))
(cl-defmethod jupyter-hb-beating-p ((channel jupyter-hb-channel))
2018-09-09 21:33:05 -05:00
"Return non-nil if CHANNEL is reachable."
2018-11-14 01:28:09 -06:00
(and (jupyter-hb--pingable-p channel)
2018-09-09 16:19:06 -05:00
(oref channel beating)))
2017-12-13 11:27:13 -06:00
(cl-defmethod jupyter-hb-pause ((channel jupyter-hb-channel))
"Pause checking for heartbeat events on CHANNEL."
(oset channel paused t))
2017-12-13 11:27:13 -06:00
(cl-defmethod jupyter-hb-unpause ((channel jupyter-hb-channel))
"Un-pause checking for heatbeat events on CHANNEL."
2018-09-09 16:19:06 -05:00
(when (oref channel paused)
(if (jupyter-channel-alive-p channel)
;; Consume a pending message from the kernel if there is one. We send a
;; ping and then schedule a timer which fires TIME-TO-DEAD seconds
;; later to receive the ping back from the kernel and start the process
;; all over again. If the channel is paused before TIME-TO-DEAD
;; seconds, there may still be a ping from the kernel waiting.
(ignore-errors (zmq-recv (oref channel socket) zmq-DONTWAIT))
2018-11-11 21:23:42 -06:00
(jupyter-start-channel channel))
(oset channel paused nil)
(jupyter-hb--send-ping channel)))
2017-12-13 11:27:13 -06:00
(cl-defmethod jupyter-hb-on-kernel-dead ((channel jupyter-hb-channel) fun)
"When the kernel connected to CHANNEL dies, call FUN.
2018-10-16 16:45:56 -05:00
A kernel is considered dead when CHANNEL does not receive a
response after \(* `jupyter-hb-max-failures' `time-to-dead'\)
seconds has elapsed without the kernel sending a ping back."
(declare (indent 1))
(oset channel dead-cb fun))
(defun jupyter-hb--send-ping (channel &optional failed-count)
2018-11-14 01:28:09 -06:00
(when (jupyter-hb--pingable-p channel)
(zmq-send (oref channel socket) "ping")
(run-with-timer
(oref channel time-to-dead) nil
(lambda ()
2019-01-16 20:30:44 -06:00
(when-let* ((sock (and (jupyter-hb--pingable-p channel)
(oref channel socket))))
(oset channel beating
(condition-case nil
(and (zmq-recv sock zmq-DONTWAIT) t)
((zmq-EINTR zmq-EAGAIN) nil)))
(if (oref channel beating)
(jupyter-hb--send-ping channel)
;; Reset the socket
(jupyter-stop-channel channel)
(jupyter-start-channel channel)
(or failed-count (setq failed-count 0))
(if (< failed-count jupyter-hb-max-failures)
(jupyter-hb--send-ping channel (1+ failed-count))
2019-01-16 20:30:44 -06:00
(oset channel paused t)
(when (functionp (oref channel dead-cb))
(funcall (oref channel dead-cb))))))))))
2018-09-09 16:19:06 -05:00
2017-12-13 11:27:13 -06:00
(provide 'jupyter-channels)
2018-01-08 21:38:32 -06:00
;;; jupyter-channels.el ends here