mirror of
https://github.com/vale981/emacs-jupyter
synced 2025-03-05 07:41:37 -05:00
Break up jupyter-channels.el
to further decouple zmq
* jupyter-channel-ioloop.el (jupyter-zmq-channel): Require. (jupyter-channel-ioloop-add-start-channel-event): `sync` -> `zmq` * jupyter-comm-layer (jupyter-comm--channel): Moved from `jupyter-channels.el`. * jupyter-kernel-manager.el: `sync` -> `zmq` * jupyter-zmq-channel-comm.el: New file. * jupyter-channels.el: Mostly renamed to jupyter-zmq-channel.el. The `jupyter-channel` class was moved to `jupyter-channel.el`. All that remains are those classes dependent on `zmq`. * test/jupyter-test.el: `sync` -> `zmq` where appropriate. Extract `jupyter-channel` class from `jupyter-zmq-channel.el` into its own file
This commit is contained in:
parent
b40b7de837
commit
58b715a4e8
7 changed files with 277 additions and 221 deletions
|
@ -43,7 +43,7 @@
|
|||
|
||||
(require 'jupyter-base)
|
||||
(require 'jupyter-ioloop)
|
||||
(require 'jupyter-channels)
|
||||
(require 'jupyter-zmq-channel)
|
||||
|
||||
;;; Variables used in the ioloop
|
||||
|
||||
|
@ -171,7 +171,7 @@ is returned to the parent process."
|
|||
(cl-assert (memq type jupyter-socket-types))
|
||||
(let ((channel (object-assoc type :type jupyter-ioloop-channels)))
|
||||
(unless channel
|
||||
(setq channel (jupyter-sync-channel
|
||||
(setq channel (jupyter-zmq-channel
|
||||
:session jupyter-ioloop-session
|
||||
:type type :endpoint endpoint))
|
||||
(push channel jupyter-ioloop-channels))
|
||||
|
|
67
jupyter-channel.el
Normal file
67
jupyter-channel.el
Normal file
|
@ -0,0 +1,67 @@
|
|||
;;; jupyter-channel.el --- Jupyter channel interface -*- lexical-binding: t -*-
|
||||
|
||||
;; Copyright (C) 2019 Nathaniel Nicandro
|
||||
|
||||
;; Author: Nathaniel Nicandro <nathanielnicandro@gmail.com>
|
||||
;; Created: 27 Jun 2019
|
||||
;; Version: 0.8.0
|
||||
|
||||
;; This program is free software; you can redistribute it and/or
|
||||
;; modify it under the terms of the GNU General Public License as
|
||||
;; published by the Free Software Foundation; either version 3, or (at
|
||||
;; your option) any later version.
|
||||
|
||||
;; This program is distributed in the hope that it will be useful, but
|
||||
;; WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
||||
;; General Public License for more details.
|
||||
|
||||
;; You should have received a copy of the GNU General Public License
|
||||
;; along with GNU Emacs; see the file COPYING. If not, write to the
|
||||
;; Free Software Foundation, Inc., 59 Temple Place - Suite 330,
|
||||
;; Boston, MA 02111-1307, USA.
|
||||
|
||||
;;; Commentary:
|
||||
|
||||
;; Defines the `jupyter-channel' interface.
|
||||
|
||||
;;; Code:
|
||||
|
||||
(defclass jupyter-channel ()
|
||||
((type
|
||||
:type keyword
|
||||
:initarg :type
|
||||
:documentation "The type of this channel.")
|
||||
(session
|
||||
:type jupyter-session
|
||||
:initarg :session
|
||||
:documentation "The session object used to sign and send/receive messages.")
|
||||
(endpoint
|
||||
:type string
|
||||
:initarg :endpoint
|
||||
:documentation "The endpoint this channel is connected to.
|
||||
Typical endpoints look like \"tcp://127.0.0.1:5555\"."))
|
||||
:abstract t)
|
||||
|
||||
(cl-defgeneric jupyter-start-channel ((channel jupyter-channel) &key identity)
|
||||
"Start a Jupyter CHANNEL using IDENTITY as the routing ID.
|
||||
If CHANNEL is already alive, do nothing.")
|
||||
|
||||
(cl-defgeneric jupyter-stop-channel ((channel jupyter-channel))
|
||||
"Stop a Jupyter CHANNEL.
|
||||
If CHANNEL is already stopped, do nothing.")
|
||||
|
||||
(cl-defgeneric jupyter-channel-alive-p ((channel jupyter-channel))
|
||||
"Return non-nil if a CHANNEL is alive.")
|
||||
|
||||
(cl-defgeneric jupyter-send (channel type message &optional msg-id)
|
||||
"On CHANNEL send MESSAGE which has message TYPE and optionally a MSG-ID.")
|
||||
|
||||
(cl-defgeneric jupyter-recv (channel &optional dont-wait)
|
||||
"Receive a message on CHANNEL.
|
||||
If DONT-WAIT is non-nil, return nil immediately if there is no
|
||||
message available to receive.")
|
||||
|
||||
(provide 'jupyter-channel)
|
||||
|
||||
;;; jupyter-channel.el ends here
|
|
@ -76,6 +76,16 @@
|
|||
(when-let* ((,client (jupyter-weak-ref-resolve (pop ,clients))))
|
||||
,@body)))))
|
||||
|
||||
;; FIXME: This is needed since the `jupyter-kernel-client' and
|
||||
;; `jupyter-channel-ioloop' use keywords whereas you can only access slots
|
||||
;; using symbols.
|
||||
(defsubst jupyter-comm--channel (c)
|
||||
(cl-case c
|
||||
(:hb 'hb)
|
||||
(:iopub 'iopub)
|
||||
(:shell 'shell)
|
||||
(:stdin 'stdin)))
|
||||
|
||||
;;; `jupyter-comm-layer'
|
||||
|
||||
(cl-defgeneric jupyter-comm-start ((comm jupyter-comm-layer) &rest _ignore)
|
||||
|
|
|
@ -301,7 +301,7 @@ it hasn't been already."
|
|||
:initarg :kernel
|
||||
:documentation "The name of the kernel that is being managed.")
|
||||
(control-channel
|
||||
:type (or null jupyter-sync-channel)
|
||||
:type (or null jupyter-zmq-channel)
|
||||
:initform nil
|
||||
:documentation "The kernel's control channel.")))
|
||||
|
||||
|
@ -363,7 +363,7 @@ connect to MANAGER's kernel."
|
|||
(cl-destructuring-bind (&key transport ip control_port &allow-other-keys)
|
||||
(jupyter-session-conn-info (oref kernel session))
|
||||
(oset manager control-channel
|
||||
(jupyter-sync-channel
|
||||
(jupyter-zmq-channel
|
||||
:type :control
|
||||
:session (oref kernel session)
|
||||
:endpoint (format "%s://%s:%d" transport ip control_port)))
|
||||
|
|
156
jupyter-zmq-channel-comm.el
Normal file
156
jupyter-zmq-channel-comm.el
Normal file
|
@ -0,0 +1,156 @@
|
|||
;;; jupyter-zmq-channel-comm.el --- Communication layer using ZMQ sockets -*- lexical-binding: t -*-
|
||||
|
||||
;; Copyright (C) 2019 Nathaniel Nicandro
|
||||
|
||||
;; Author: Nathaniel Nicandro <nathanielnicandro@gmail.com>
|
||||
;; Created: 27 Jun 2019
|
||||
;; Version: 0.8.0
|
||||
|
||||
;; This program is free software; you can redistribute it and/or
|
||||
;; modify it under the terms of the GNU General Public License as
|
||||
;; published by the Free Software Foundation; either version 3, or (at
|
||||
;; your option) any later version.
|
||||
|
||||
;; This program is distributed in the hope that it will be useful, but
|
||||
;; WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
||||
;; General Public License for more details.
|
||||
|
||||
;; You should have received a copy of the GNU General Public License
|
||||
;; along with GNU Emacs; see the file COPYING. If not, write to the
|
||||
;; Free Software Foundation, Inc., 59 Temple Place - Suite 330,
|
||||
;; Boston, MA 02111-1307, USA.
|
||||
|
||||
;;; Commentary:
|
||||
|
||||
;; A communication layer using `jupyter-zmq-channel' objects for communicating
|
||||
;; with a kernel. This communication layer is mainly meant for speed comparison
|
||||
;; with the `jupyter-channel-ioloop-comm' layer. It implements communication in
|
||||
;; the current Emacs instance and comparing it with the
|
||||
;; `jupyter-channel-ioloop-comm' shows how much of a slow down there is when
|
||||
;; all the processing of messages happens in the current Emacs instance.
|
||||
;;
|
||||
;; Running the test suit using `jupyter-zmq-channel-comm' vs
|
||||
;; `jupyter-channel-ioloop-comm' shows, very roughly, around a 2x speed up
|
||||
;; using `jupyter-channel-ioloop-comm'.
|
||||
|
||||
;;; Code:
|
||||
|
||||
(require 'jupyter-zmq-channel)
|
||||
(require 'jupyter-comm-layer)
|
||||
(eval-when-compile (require 'subr-x))
|
||||
|
||||
(defclass jupyter-zmq-channel-comm (jupyter-comm-layer
|
||||
jupyter-hb-comm
|
||||
jupyter-comm-autostop)
|
||||
((session :type jupyter-session)
|
||||
(iopub :type jupyter-zmq-channel)
|
||||
(shell :type jupyter-zmq-channel)
|
||||
(stdin :type jupyter-zmq-channel)
|
||||
(thread)))
|
||||
|
||||
(cl-defmethod initialize-instance ((_comm jupyter-zmq-channel-comm) &optional _slots)
|
||||
(unless (functionp 'make-thread)
|
||||
(error "Need threading support"))
|
||||
(cl-call-next-method))
|
||||
|
||||
(cl-defmethod jupyter-comm-id ((comm jupyter-zmq-channel-comm))
|
||||
(format "session=%s" (truncate-string-to-width
|
||||
(jupyter-session-id (oref comm session))
|
||||
9 nil nil "…")))
|
||||
|
||||
(defun jupyter-zmq-channel-comm--check (comm)
|
||||
(condition-case err
|
||||
(cl-loop
|
||||
for channel-type in '(:iopub :shell :stdin)
|
||||
for channel = (slot-value comm (jupyter-comm--channel channel-type))
|
||||
for msg = (and (jupyter-channel-alive-p channel)
|
||||
(with-slots (session socket) channel
|
||||
(condition-case nil
|
||||
(jupyter-recv session socket zmq-DONTWAIT)
|
||||
((zmq-EINTR zmq-EAGAIN) nil))))
|
||||
when msg do (jupyter-event-handler
|
||||
comm (cl-list* 'message channel-type msg)))
|
||||
(error
|
||||
(thread-signal (car (all-threads)) (car err)
|
||||
(cons 'jupyter-zmq-channel-comm--check (cdr err)))
|
||||
(signal (car err) (cdr err)))))
|
||||
|
||||
(cl-defmethod jupyter-comm-start ((comm jupyter-zmq-channel-comm))
|
||||
(cl-loop
|
||||
for channel in '(hb shell iopub stdin)
|
||||
do (jupyter-start-channel (slot-value comm channel)))
|
||||
(oset comm thread
|
||||
(make-thread
|
||||
(let ((comm-ref (jupyter-weak-ref comm)))
|
||||
(lambda ()
|
||||
(while (when-let* ((comm (jupyter-weak-ref-resolve comm-ref)))
|
||||
(prog1 comm
|
||||
(jupyter-zmq-channel-comm--check comm)))
|
||||
(thread-yield)
|
||||
(thread-yield)))))))
|
||||
|
||||
(cl-defmethod jupyter-comm-stop ((comm jupyter-zmq-channel-comm))
|
||||
(when (and (slot-boundp comm 'thread)
|
||||
(thread-alive-p (oref comm thread)))
|
||||
(thread-signal (oref comm thread) 'quit nil)
|
||||
(slot-makeunbound comm 'thread))
|
||||
(cl-loop
|
||||
for channel in '(hb shell iopub stdin)
|
||||
do (jupyter-stop-channel (slot-value comm channel))))
|
||||
|
||||
(cl-defmethod jupyter-comm-alive-p ((comm jupyter-zmq-channel-comm))
|
||||
(jupyter-channels-running-p comm))
|
||||
|
||||
(cl-defmethod jupyter-channel-alive-p ((comm jupyter-zmq-channel-comm) channel)
|
||||
(and (slot-boundp comm (jupyter-comm--channel channel))
|
||||
(jupyter-channel-alive-p (slot-value comm (jupyter-comm--channel channel)))))
|
||||
|
||||
(cl-defmethod jupyter-channels-running-p ((comm jupyter-zmq-channel-comm))
|
||||
(cl-loop
|
||||
for channel in '(:shell :iopub :stdin :hb)
|
||||
thereis (jupyter-channel-alive-p comm channel)))
|
||||
|
||||
;;;; Channel start/stop methods
|
||||
|
||||
(cl-defmethod jupyter-stop-channel ((comm jupyter-zmq-channel-comm) channel)
|
||||
(when (jupyter-channel-alive-p comm channel)
|
||||
(jupyter-stop-channel
|
||||
(slot-value comm (jupyter-comm--channel channel)))))
|
||||
|
||||
(cl-defmethod jupyter-start-channel ((comm jupyter-zmq-channel-comm) channel)
|
||||
(unless (jupyter-channel-alive-p comm channel)
|
||||
(jupyter-start-channel
|
||||
(slot-value comm (jupyter-comm--channel channel)))))
|
||||
|
||||
(cl-defmethod jupyter-initialize-connection ((comm jupyter-zmq-channel-comm)
|
||||
(session jupyter-session))
|
||||
(cl-call-next-method)
|
||||
(let ((endpoints (jupyter-session-endpoints session)))
|
||||
(oset comm session (copy-sequence session))
|
||||
(oset comm hb (make-instance
|
||||
'jupyter-hb-channel
|
||||
:session (oref comm session)
|
||||
:endpoint (plist-get endpoints :hb)))
|
||||
(cl-loop
|
||||
for channel in `(:stdin :shell :iopub)
|
||||
do (setf (slot-value comm (jupyter-comm--channel channel))
|
||||
(jupyter-zmq-channel
|
||||
:type channel
|
||||
:session (oref comm session)
|
||||
:endpoint (plist-get endpoints channel))))))
|
||||
|
||||
(cl-defmethod jupyter-send ((comm jupyter-zmq-channel-comm)
|
||||
_ channel-type msg-type msg msg-id)
|
||||
(let ((channel (slot-value comm (jupyter-comm--channel channel-type))))
|
||||
;; Run the event handler on the next command loop since the expectation is
|
||||
;; the client is that sending is asynchronous. There may be some code that
|
||||
;; makes assumptions based on this.
|
||||
(run-at-time
|
||||
0 nil (lambda (id)
|
||||
(jupyter-event-handler comm (list 'sent channel-type id)))
|
||||
(jupyter-send channel msg-type msg msg-id))))
|
||||
|
||||
(provide 'jupyter-zmq-channel-comm)
|
||||
|
||||
;;; jupyter-zmq-channel-comm.el ends here
|
|
@ -1,9 +1,9 @@
|
|||
;;; jupyter-channels.el --- Jupyter channels -*- lexical-binding: t -*-
|
||||
;;; jupyter-zmq-channel.el --- A Jupyter channel implementation using ZMQ sockets -*- lexical-binding: t -*-
|
||||
|
||||
;; Copyright (C) 2018 Nathaniel Nicandro
|
||||
;; Copyright (C) 2019 Nathaniel Nicandro
|
||||
|
||||
;; Author: Nathaniel Nicandro <nathanielnicandro@gmail.com>
|
||||
;; Created: 08 Jan 2018
|
||||
;; Created: 27 Jun 2019
|
||||
;; Version: 0.8.0
|
||||
|
||||
;; This program is free software; you can redistribute it and/or
|
||||
|
@ -23,42 +23,22 @@
|
|||
|
||||
;;; Commentary:
|
||||
|
||||
;; Implements synchronous channel types. Each channel is essentially a wrapper
|
||||
;; around a `zmq-socket' constrained to a socket type by the type of the
|
||||
;; channel and with an associated `zmq-IDENTITY' obtained from the
|
||||
;; `jupyter-session' that must be associated with the channel. A heartbeat
|
||||
;; Implements synchronous channel types using ZMQ sockets. Each channel is
|
||||
;; essentially a wrapper around a `zmq-socket' constrained to a socket type by
|
||||
;; the type of the channel and with an associated `zmq-IDENTITY' obtained from
|
||||
;; the `jupyter-session' that must be associated with the channel. A heartbeat
|
||||
;; channel is distinct from the other channels in that it is implemented using
|
||||
;; a timer which periodically pings the kernel depending on how its configured.
|
||||
;; In order for communication to occur on the other channels, one of
|
||||
;; `jupyter-send' or `jupyter-recv' must be called after starting the channel
|
||||
;; with `jupyter-start-channel'.
|
||||
;;
|
||||
;; Also implemented is a `jupyter-comm-layer' using `jupyter-sync-channel' comm
|
||||
;; objects (`jupyter-sync-channel-comm') defined in this file. It is more of a
|
||||
;; reference implementation to show how it could be done and required that
|
||||
;; Emacs was built with threading support enabled.
|
||||
|
||||
;;; Code:
|
||||
|
||||
(eval-when-compile (require 'subr-x))
|
||||
(require 'jupyter-base)
|
||||
(require 'jupyter-comm-layer)
|
||||
(require 'jupyter-messages)
|
||||
(require 'zmq)
|
||||
(require 'ring)
|
||||
|
||||
(defgroup jupyter-channels nil
|
||||
"Jupyter channels"
|
||||
:group 'jupyter)
|
||||
|
||||
(defcustom jupyter-hb-max-failures 5
|
||||
"Number of heartbeat failures until the kernel is considered unreachable.
|
||||
A ping is sent to the kernel on a heartbeat channel and waits
|
||||
until `time-to-dead' seconds to see if the kernel sent a ping
|
||||
back. If the kernel doesn't send a ping back after
|
||||
`jupyter-hb-max-failures', the callback associated with the
|
||||
heartbeat channel is called. See `jupyter-hb-on-kernel-dead'."
|
||||
:type 'integer
|
||||
:group 'jupyter-channels)
|
||||
(require 'jupyter-channel)
|
||||
(eval-when-compile (require 'subr-x))
|
||||
|
||||
(defconst jupyter-socket-types
|
||||
(list :hb zmq-REQ
|
||||
|
@ -68,36 +48,12 @@ heartbeat channel is called. See `jupyter-hb-on-kernel-dead'."
|
|||
:control zmq-DEALER)
|
||||
"The socket types for the various channels used by `jupyter'.")
|
||||
|
||||
;;; Basic channel types
|
||||
|
||||
(defclass jupyter-channel ()
|
||||
((type
|
||||
:type keyword
|
||||
:initarg :type
|
||||
:documentation "The type of this channel. Should be one of
|
||||
the keys in `jupyter-socket-types'.")
|
||||
(session
|
||||
:type jupyter-session
|
||||
:initarg :session
|
||||
:documentation "The session object used to sign and
|
||||
send/receive messages.")
|
||||
(endpoint
|
||||
:type string
|
||||
:initarg :endpoint
|
||||
:documentation "The endpoint this channel is connected to.
|
||||
Typical endpoints look like \"tcp://127.0.0.1:5555\"."))
|
||||
:abstract t)
|
||||
|
||||
(defclass jupyter-sync-channel (jupyter-channel)
|
||||
(defclass jupyter-zmq-channel (jupyter-channel)
|
||||
((socket
|
||||
:type (or null zmq-socket)
|
||||
:initform nil
|
||||
:documentation "The socket used for communicating with the kernel.")))
|
||||
|
||||
(cl-defgeneric jupyter-start-channel ((channel jupyter-channel) &key identity)
|
||||
"Start a Jupyter CHANNEL using IDENTITY as the routing ID.
|
||||
If CHANNEL is already alive, do nothing.")
|
||||
|
||||
(defun jupyter-connect-endpoint (type endpoint &optional identity)
|
||||
"Create socket with TYPE and connect to ENDPOINT.
|
||||
If IDENTITY is non-nil, it will be set as the ROUTING-ID of the
|
||||
|
@ -120,7 +76,7 @@ the ROUTING-ID of the socket. Return the created socket."
|
|||
(error "Invalid channel type (%s)" ctype))
|
||||
(jupyter-connect-endpoint sock-type endpoint identity)))
|
||||
|
||||
(cl-defmethod jupyter-start-channel ((channel jupyter-sync-channel)
|
||||
(cl-defmethod jupyter-start-channel ((channel jupyter-zmq-channel)
|
||||
&key (identity (jupyter-session-id
|
||||
(oref channel session))))
|
||||
(unless (jupyter-channel-alive-p channel)
|
||||
|
@ -131,23 +87,20 @@ the ROUTING-ID of the socket. Return the created socket."
|
|||
(:iopub
|
||||
(zmq-socket-set socket zmq-SUBSCRIBE ""))))))
|
||||
|
||||
(cl-defgeneric jupyter-stop-channel ((channel jupyter-channel))
|
||||
"Stop a Jupyter CHANNEL.
|
||||
If CHANNEL is already stopped, do nothing.")
|
||||
|
||||
(cl-defmethod jupyter-stop-channel ((channel jupyter-sync-channel))
|
||||
(cl-defmethod jupyter-stop-channel ((channel jupyter-zmq-channel))
|
||||
(when (jupyter-channel-alive-p channel)
|
||||
(zmq-socket-set (oref channel socket) zmq-LINGER 0)
|
||||
(zmq-close (oref channel socket))
|
||||
(oset channel socket nil)))
|
||||
|
||||
(cl-defgeneric jupyter-channel-alive-p ((channel jupyter-channel))
|
||||
"Determine if a CHANNEL is alive.")
|
||||
|
||||
(cl-defmethod jupyter-channel-alive-p ((channel jupyter-sync-channel))
|
||||
(cl-defmethod jupyter-channel-alive-p ((channel jupyter-zmq-channel))
|
||||
(not (null (oref channel socket))))
|
||||
|
||||
;;; Sending/receiving
|
||||
(cl-defmethod jupyter-send ((channel jupyter-zmq-channel) type message &optional msg-id)
|
||||
(jupyter-send (oref channel session) (oref channel socket) type message msg-id))
|
||||
|
||||
(cl-defmethod jupyter-recv ((channel jupyter-zmq-channel))
|
||||
(jupyter-recv (oref channel session) (oref channel socket)))
|
||||
|
||||
(cl-defmethod jupyter-send ((session jupyter-session)
|
||||
socket
|
||||
|
@ -185,15 +138,17 @@ and other such functions."
|
|||
(jupyter--split-identities msg)
|
||||
(cons idents (jupyter-decode-message session parts))))))
|
||||
|
||||
(cl-defmethod jupyter-send ((channel jupyter-sync-channel) type message &optional msg-id)
|
||||
(jupyter-send (oref channel session) (oref channel socket) type message msg-id))
|
||||
|
||||
(cl-defmethod jupyter-recv ((channel jupyter-sync-channel))
|
||||
(jupyter-recv (oref channel session) (oref channel socket)))
|
||||
|
||||
;;; Heartbeat channel
|
||||
|
||||
(defclass jupyter-hb-channel (jupyter-sync-channel)
|
||||
(defvar jupyter-hb-max-failures 5
|
||||
"Number of heartbeat failures until the kernel is considered unreachable.
|
||||
A ping is sent to the kernel on a heartbeat channel and waits
|
||||
until `time-to-dead' seconds to see if the kernel sent a ping
|
||||
back. If the kernel doesn't send a ping back after
|
||||
`jupyter-hb-max-failures', the callback associated with the
|
||||
heartbeat channel is called. See `jupyter-hb-on-kernel-dead'.")
|
||||
|
||||
(defclass jupyter-hb-channel (jupyter-zmq-channel)
|
||||
((type
|
||||
:type keyword
|
||||
:initform :hb
|
||||
|
@ -294,139 +249,6 @@ seconds has elapsed without the kernel sending a ping back."
|
|||
(funcall (oref channel dead-cb)))))))
|
||||
(jupyter-weak-ref channel))))
|
||||
|
||||
;;; `jupyter-channel-comm'
|
||||
;; A communication layer using `jupyter-sync-channel' objects for communicating
|
||||
;; with a kernel. This communication layer is mainly meant for speed comparison
|
||||
;; with the `jupyter-channel-ioloop-comm' layer. It implements communication in
|
||||
;; the current Emacs instance and comparing it with the
|
||||
;; `jupyter-channel-ioloop-comm' shows how much of a slow down there is when
|
||||
;; all the processing of messages happens in the current Emacs instance.
|
||||
;;
|
||||
;; Running the test suit using `jupyter-channel-comm' vs
|
||||
;; `jupyter-channel-ioloop-comm' shows, very roughly, around a 2x speed up
|
||||
;; using `jupyter-channel-ioloop-comm'.
|
||||
(provide 'jupyter-zmq-channel)
|
||||
|
||||
;; FIXME: This is needed since the `jupyter-kernel-client' and
|
||||
;; `jupyter-channel-ioloop' use keywords whereas you can only access slots
|
||||
;; using symbols.
|
||||
(defsubst jupyter-comm--channel (c)
|
||||
(cl-case c
|
||||
(:hb 'hb)
|
||||
(:iopub 'iopub)
|
||||
(:shell 'shell)
|
||||
(:stdin 'stdin)))
|
||||
|
||||
(defclass jupyter-sync-channel-comm (jupyter-comm-layer
|
||||
jupyter-hb-comm
|
||||
jupyter-comm-autostop)
|
||||
((session :type jupyter-session)
|
||||
(iopub :type jupyter-sync-channel)
|
||||
(shell :type jupyter-sync-channel)
|
||||
(stdin :type jupyter-sync-channel)
|
||||
(thread)))
|
||||
|
||||
(cl-defmethod initialize-instance ((_comm jupyter-sync-channel-comm) &optional _slots)
|
||||
(unless (functionp 'make-thread)
|
||||
(error "Need threading support"))
|
||||
(cl-call-next-method))
|
||||
|
||||
(cl-defmethod jupyter-comm-id ((comm jupyter-sync-channel-comm))
|
||||
(format "session=%s" (truncate-string-to-width
|
||||
(jupyter-session-id (oref comm session))
|
||||
9 nil nil "…")))
|
||||
|
||||
(defun jupyter-sync-channel-comm--check (comm)
|
||||
(condition-case err
|
||||
(cl-loop
|
||||
for channel-type in '(:iopub :shell :stdin)
|
||||
for channel = (slot-value comm (jupyter-comm--channel channel-type))
|
||||
for msg = (and (jupyter-channel-alive-p channel)
|
||||
(with-slots (session socket) channel
|
||||
(condition-case nil
|
||||
(jupyter-recv session socket zmq-DONTWAIT)
|
||||
((zmq-EINTR zmq-EAGAIN) nil))))
|
||||
when msg do (jupyter-event-handler
|
||||
comm (cl-list* 'message channel-type msg)))
|
||||
(error
|
||||
(thread-signal (car (all-threads)) (car err)
|
||||
(cons 'jupyter-sync-channel-comm--check (cdr err)))
|
||||
(signal (car err) (cdr err)))))
|
||||
|
||||
(cl-defmethod jupyter-comm-start ((comm jupyter-sync-channel-comm))
|
||||
(cl-loop
|
||||
for channel in '(hb shell iopub stdin)
|
||||
do (jupyter-start-channel (slot-value comm channel)))
|
||||
(oset comm thread
|
||||
(make-thread
|
||||
(let ((comm-ref (jupyter-weak-ref comm)))
|
||||
(lambda ()
|
||||
(while (when-let* ((comm (jupyter-weak-ref-resolve comm-ref)))
|
||||
(prog1 comm
|
||||
(jupyter-sync-channel-comm--check comm)))
|
||||
(thread-yield)
|
||||
(thread-yield)))))))
|
||||
|
||||
(cl-defmethod jupyter-comm-stop ((comm jupyter-sync-channel-comm))
|
||||
(when (and (slot-boundp comm 'thread)
|
||||
(thread-alive-p (oref comm thread)))
|
||||
(thread-signal (oref comm thread) 'quit nil)
|
||||
(slot-makeunbound comm 'thread))
|
||||
(cl-loop
|
||||
for channel in '(hb shell iopub stdin)
|
||||
do (jupyter-stop-channel (slot-value comm channel))))
|
||||
|
||||
(cl-defmethod jupyter-comm-alive-p ((comm jupyter-sync-channel-comm))
|
||||
(jupyter-channels-running-p comm))
|
||||
|
||||
(cl-defmethod jupyter-channel-alive-p ((comm jupyter-sync-channel-comm) channel)
|
||||
(and (slot-boundp comm (jupyter-comm--channel channel))
|
||||
(jupyter-channel-alive-p (slot-value comm (jupyter-comm--channel channel)))))
|
||||
|
||||
(cl-defmethod jupyter-channels-running-p ((comm jupyter-sync-channel-comm))
|
||||
(cl-loop
|
||||
for channel in '(:shell :iopub :stdin :hb)
|
||||
thereis (jupyter-channel-alive-p comm channel)))
|
||||
|
||||
;;;; Channel start/stop methods
|
||||
|
||||
(cl-defmethod jupyter-stop-channel ((comm jupyter-sync-channel-comm) channel)
|
||||
(when (jupyter-channel-alive-p comm channel)
|
||||
(jupyter-stop-channel
|
||||
(slot-value comm (jupyter-comm--channel channel)))))
|
||||
|
||||
(cl-defmethod jupyter-start-channel ((comm jupyter-sync-channel-comm) channel)
|
||||
(unless (jupyter-channel-alive-p comm channel)
|
||||
(jupyter-start-channel
|
||||
(slot-value comm (jupyter-comm--channel channel)))))
|
||||
|
||||
(cl-defmethod jupyter-initialize-connection ((comm jupyter-sync-channel-comm)
|
||||
(session jupyter-session))
|
||||
(cl-call-next-method)
|
||||
(let ((endpoints (jupyter-session-endpoints session)))
|
||||
(oset comm session (copy-sequence session))
|
||||
(oset comm hb (make-instance
|
||||
'jupyter-hb-channel
|
||||
:session (oref comm session)
|
||||
:endpoint (plist-get endpoints :hb)))
|
||||
(cl-loop
|
||||
for channel in `(:stdin :shell :iopub)
|
||||
do (setf (slot-value comm (jupyter-comm--channel channel))
|
||||
(jupyter-sync-channel
|
||||
:type channel
|
||||
:session (oref comm session)
|
||||
:endpoint (plist-get endpoints channel))))))
|
||||
|
||||
(cl-defmethod jupyter-send ((comm jupyter-sync-channel-comm)
|
||||
_ channel-type msg-type msg msg-id)
|
||||
(let ((channel (slot-value comm (jupyter-comm--channel channel-type))))
|
||||
;; Run the event handler on the next command loop since the expectation is
|
||||
;; the client is that sending is asynchronous. There may be some code that
|
||||
;; makes assumptions based on this.
|
||||
(run-at-time
|
||||
0 nil (lambda (id)
|
||||
(jupyter-event-handler comm (list 'sent channel-type id)))
|
||||
(jupyter-send channel msg-type msg msg-id))))
|
||||
|
||||
(provide 'jupyter-channels)
|
||||
|
||||
;;; jupyter-channels.el ends here
|
||||
;;; jupyter-zmq-channel.el ends here
|
|
@ -28,6 +28,7 @@
|
|||
;;; Code:
|
||||
|
||||
(require 'zmq)
|
||||
(require 'jupyter-zmq-channel-comm)
|
||||
(require 'jupyter-env)
|
||||
(require 'jupyter-client)
|
||||
(require 'jupyter-repl)
|
||||
|
@ -500,9 +501,9 @@
|
|||
|
||||
;;; Channels
|
||||
|
||||
(ert-deftest jupyter-sync-channel ()
|
||||
(ert-deftest jupyter-zmq-channel ()
|
||||
:tags '(channels)
|
||||
(let ((channel (jupyter-sync-channel
|
||||
(let ((channel (jupyter-zmq-channel
|
||||
:type :shell
|
||||
:endpoint "tcp://127.0.0.1:5555")))
|
||||
(ert-info ("Starting the channel")
|
||||
|
@ -637,7 +638,7 @@
|
|||
(jupyter-start-kernel manager)
|
||||
(setq process (oref kernel process))
|
||||
(setq control-channel (oref manager control-channel))
|
||||
(should (jupyter-sync-channel-p control-channel))
|
||||
(should (jupyter-zmq-channel-p control-channel))
|
||||
(should (jupyter-kernel-alive-p manager))
|
||||
(should (jupyter-kernel-alive-p kernel))
|
||||
(jupyter-shutdown-kernel manager)
|
||||
|
@ -647,7 +648,7 @@
|
|||
(should-not (jupyter-kernel-alive-p manager))
|
||||
(should-not (jupyter-kernel-alive-p kernel)))
|
||||
(setq control-channel (oref manager control-channel))
|
||||
(should-not (jupyter-sync-channel-p control-channel))))))
|
||||
(should-not (jupyter-zmq-channel-p control-channel))))))
|
||||
|
||||
(ert-deftest jupyter-command-kernel ()
|
||||
:tags '(kernel)
|
||||
|
@ -684,7 +685,7 @@
|
|||
;; The default comm is a jupyter-channel-ioloop-comm
|
||||
(let ((conn-info (jupyter-test-conn-info-plist))
|
||||
(client (jupyter-kernel-client)))
|
||||
(oset client kcomm (jupyter-sync-channel-comm))
|
||||
(oset client kcomm (jupyter-zmq-channel-comm))
|
||||
(jupyter-initialize-connection client conn-info)
|
||||
;; kcomm by default is a `jupyter-channel-ioloop-comm'
|
||||
(with-slots (session kcomm) client
|
||||
|
@ -749,7 +750,7 @@
|
|||
(ert-info ("Starting/stopping channels")
|
||||
(let ((conn-info (jupyter-test-conn-info-plist))
|
||||
(client (jupyter-kernel-client)))
|
||||
(oset client kcomm (jupyter-sync-channel-comm))
|
||||
(oset client kcomm (jupyter-zmq-channel-comm))
|
||||
(jupyter-initialize-connection client conn-info)
|
||||
(cl-loop
|
||||
for channel in '(:hb :shell :iopub :stdin)
|
||||
|
@ -931,7 +932,7 @@
|
|||
(with-temp-buffer
|
||||
(cl-letf ((ioloop (jupyter-channel-ioloop))
|
||||
(standard-output (current-buffer))
|
||||
(jupyter-ioloop-channels (list (jupyter-sync-channel :type :shell)))
|
||||
(jupyter-ioloop-channels (list (jupyter-zmq-channel :type :shell)))
|
||||
((symbol-function #'jupyter-send)
|
||||
(lambda (_channel _msg-type _msg msg-id) msg-id)))
|
||||
(let ((msg-id (jupyter-new-uuid)))
|
||||
|
@ -954,7 +955,7 @@
|
|||
(jupyter-ioloop-test-eval-ioloop
|
||||
ioloop `(list 'start-channel :shell ,channel-endpoint))
|
||||
(should (not (null jupyter-ioloop-channels)))
|
||||
(should (jupyter-sync-channel-p (car jupyter-ioloop-channels)))
|
||||
(should (jupyter-zmq-channel-p (car jupyter-ioloop-channels)))
|
||||
(let ((channel (car jupyter-ioloop-channels)))
|
||||
(with-slots (type socket endpoint) channel
|
||||
(ert-info ("Verify the requested channel was started")
|
||||
|
@ -978,7 +979,7 @@
|
|||
(with-temp-buffer
|
||||
(let* ((ioloop (jupyter-channel-ioloop))
|
||||
(standard-output (current-buffer))
|
||||
(channel (jupyter-sync-channel
|
||||
(channel (jupyter-zmq-channel
|
||||
:type :shell
|
||||
:endpoint "tcp://127.0.0.1:5555"))
|
||||
(jupyter-ioloop-channels (list channel))
|
||||
|
|
Loading…
Add table
Reference in a new issue