From b2294dceb24f49efab6b678886f625fbd5e7c1f0 Mon Sep 17 00:00:00 2001 From: Nathaniel Nicandro Date: Mon, 8 Apr 2019 11:42:00 -0500 Subject: [PATCH] Generalize communication with a kernel The previous mechanism to communicate with a kernel was too low level from the perspective of a client. The client interfaced directly with the subprocess abstraction, `jupyter-ioloop`, and had to handle all "events" that occurred in the `jupyter-ioloop`, e.g. when a channel was started or stopped. But in reality such events should not be the concern of a client. A client should only care about events that are directly related to kernel messages and not events related to the implementation details of *how* communication occurs. This commit abstracts out the way in which a client communicates with its kernel by introducing a new `jupyter-comm-layer` class. The `jupyter-comm-layer` class takes care of managing the communication channel between a kernel and its clients as well as sending events to all registered clients. This way, clients operate solely at the level of events on the communication layer. All a client does is register itself to receive events on the communication layer and send events on the layer. * jupyter-base.el (jupyter-session-endpoints): New function. * jupyter-client.el (jupyter-kernel-client): Remove ioloop and channels slots. Add kcomm slot. (initialize-instance): Unconditionally stop channels. (jupyter-initialize-connection): Change into a method call. Call `jupyter-initialize-connection` on the `kcomm` slot. (jupyter-with-client-buffer): Remove stale comment. (jupyter-send): Call `jupyter-send` on the `kcomm` slot. (jupyter-ioloop-handler): Remove all method definitions, replace `sent` and `message` methods with their `jupyter-event-handler` equivalents. (jupyter-hb-pause, jupyter-hb-unpause, jupyter-hb-beating): (jupyter-channel-alive-p, jupyter-start-channel, jupyter-stop-channel): (jupyter-start-channels, jupyter-stop-channels): Replace with calls to their equivalents using the `kcomm` slot. * jupyter-comm-layer.el: New file. * jupyter-kernel-manager (jupyter-make-client): Set a client's `kcomm` slot to `jupyter-channel-ioloop-comm`. * jupyter-messages.el (jupyter-decode-message): Use `list` directly. There seemed to be issues when using the new `jupyter-sync-channel-comm` due to using quoted lists. * test/jupyter-test.el: Add `jupyter-comm-layer` test. Update other tests. * test/test-helper.el: Add `jupyter-comm-layer` mock objects. Update `jupyter-echo-client`. --- jupyter-base.el | 16 ++ jupyter-client.el | 294 ++++++-------------------- jupyter-comm-layer.el | 426 ++++++++++++++++++++++++++++++++++++++ jupyter-kernel-manager.el | 11 +- jupyter-messages.el | 10 +- test/jupyter-test.el | 38 +++- test/test-helper.el | 35 +++- 7 files changed, 590 insertions(+), 240 deletions(-) create mode 100644 jupyter-comm-layer.el diff --git a/jupyter-base.el b/jupyter-base.el index 395998e..d79c73e 100644 --- a/jupyter-base.el +++ b/jupyter-base.el @@ -484,6 +484,22 @@ fields: (id nil :read-only t) (key nil :read-only t)) +(cl-defmethod jupyter-session-endpoints ((session jupyter-session)) + "Return a property list containing the endpoints from SESSION." + (cl-destructuring-bind + (&key shell_port iopub_port stdin_port hb_port ip transport + &allow-other-keys) + (jupyter-session-conn-info session) + (cl-assert (and transport ip)) + (let ((addr (lambda (port) (format "%s://%s:%d" transport ip port)))) + (cl-loop + for (channel . port) in `((:hb . ,hb_port) + (:stdin . ,stdin_port) + (:shell . ,shell_port) + (:iopub . ,iopub_port)) + do (cl-assert port) and + collect channel and collect (funcall addr port))))) + ;;; Request object definition (cl-defstruct (jupyter-request diff --git a/jupyter-client.el b/jupyter-client.el index 2e6435b..3a1a952 100644 --- a/jupyter-client.el +++ b/jupyter-client.el @@ -33,8 +33,7 @@ (eval-when-compile (require 'subr-x)) (require 'jupyter-base) -(require 'jupyter-channels) -(require 'jupyter-channel-ioloop) +(require 'jupyter-comm-layer) (require 'jupyter-mime) (require 'jupyter-messages) @@ -108,9 +107,10 @@ requests like the above example.") (pending-requests :type ring :initform (make-ring 10) - :documentation "A ring of pending `jupyter-request's. -A request is pending if it has not been sent to the kernel via the -client's ioloop slot.") + :documentation "A ring of pending `jupyter-request's. A +request is pending if a notification has not been received by the +client that the message has actually been sent by the +communication layer. See the kcomm slot.") (execution-state :type string :initform "idle" @@ -142,9 +142,8 @@ client is expecting a reply from the kernel.") initializing this client. When `jupyter-start-channels' is called, this will be set to the kernel info plist returned from an initial `:kernel-info-request'.") - (ioloop - :type (or null jupyter-channel-ioloop) - :initform nil + (kcomm + :type jupyter-comm-layer :documentation "The process which receives events from channels.") (session :type jupyter-session @@ -165,17 +164,7 @@ initialized the client.") (-buffer :type buffer :documentation "An internal buffer used to store client local -variables and intermediate ioloop process output. When the ioloop -slot is non-nil, its `process-buffer' will be `eq' to this -buffer.") - (channels - :type list - :initform nil - :initarg :channels - :documentation "A property list describing the channels. -The keys are channel types whose values are the status of the -channels. The exception is the heartbeat channel. The value of -the :hb key is a `jupyter-hb-channel'."))) +variables."))) ;;; `jupyter-current-client' language method specializer @@ -209,10 +198,7 @@ passed as the argument has a language of LANG." (lambda () (when (buffer-live-p buffer) (kill-buffer buffer)) - ;; Ensure the ioloop process gets cleaned up when the client goes out - ;; of scope. - (when (jupyter-channels-running-p client) - (jupyter-stop-channels client)))))) + (jupyter-stop-channels client))))) (defun jupyter-clients () "Return a list of all `jupyter-kernel-clients'." @@ -243,12 +229,13 @@ See `jupyter-initialize-connection'." (list info-or-session '(or jupyter-session-p json-plist-p stringp)))))) -(defun jupyter-initialize-connection (client info-or-session) +;; NOTE: This requires that CLIENT is communicating with a kernel using a +;; `jupyter-channel-ioloop-comm' object. +(cl-defmethod jupyter-initialize-connection ((client jupyter-kernel-client) info-or-session) "Initialize CLIENT with connection INFO-OR-SESSION. INFO-OR-SESSION can be a file name, a plist, or a `jupyter-session' object that will be used to initialize CLIENT's -connection. If CLIENT is already connected to a kernel, its -connection is first terminated before initializing a new one. +connection. When INFO-OR-SESSION is a file name, read the contents of the file as a JSON plist and create a new `jupyter-session' from it. @@ -267,43 +254,22 @@ connection and will be accessible as the session slot of CLIENT. The necessary keys and values to initialize a connection can be found at http://jupyter-client.readthedocs.io/en/latest/kernels.html#connection-files." - (cl-check-type client jupyter-kernel-client) (let ((session (and (jupyter-session-p info-or-session) info-or-session)) (conn-info (jupyter--connection-info info-or-session))) - (cl-destructuring-bind - (&key shell_port iopub_port stdin_port hb_port ip - key transport signature_scheme - &allow-other-keys) + (cl-destructuring-bind (&key key signature_scheme &allow-other-keys) conn-info (when (and (> (length key) 0) (not (functionp (intern (concat "jupyter-" signature_scheme))))) - (error "Unsupported signature scheme: %s" signature_scheme)) - ;; Stop the channels if connected to some other kernel - (when (jupyter-channels-running-p client) - (jupyter-stop-channels client)) - ;; Initialize the channels - (unless session - (setq session (jupyter-session :key key :conn-info conn-info))) - (oset client session session) - (let ((addr (lambda (port) (format "%s://%s:%d" transport ip port)))) - (setf (oref client channels) - ;; Construct the channel plist - (cl-list* - :hb (make-instance - 'jupyter-hb-channel - :session session - :endpoint (funcall addr hb_port)) - (cl-loop - for (channel . port) in `((:stdin . ,stdin_port) - (:shell . ,shell_port) - (:iopub . ,iopub_port)) - collect channel - and collect - ;; The session will be associated with these channels in the - ;; ioloop subprocess. See `jupyter-start-channels'. - (list :endpoint (funcall addr port) - :alive-p nil)))))))) + (error "Unsupported signature scheme: %s" signature_scheme))) + (oset client session + (or (copy-sequence session) + (jupyter-session + :key (plist-get conn-info :key) + :conn-info conn-info))) + (jupyter-initialize-connection + (oref client kcomm) + (oref client session)))) ;;; Client local variables @@ -314,11 +280,6 @@ subprocess buffer." (declare (indent 1)) `(progn (cl-check-type ,client jupyter-kernel-client) - ;; NOTE: -buffer will be set as the IOLoop process buffer, see - ;; `jupyter-start-channels', but before the IOLoop process is started we - ;; would like to have a buffer available so that client local variables - ;; can be set on the buffer. This is why we create our own buffer when a - ;; client is initialized. (with-current-buffer (oref ,client -buffer) ,@body))) @@ -409,19 +370,16 @@ Note that you can manipulate how to handle messages received in response to the sent message, see `jupyter-add-callback' and `jupyter-request-inhibited-handlers'." (declare (indent 1)) - (let ((ioloop (oref client ioloop))) - (unless ioloop - (signal 'wrong-type-argument (list 'jupyter-ioloop ioloop 'ioloop))) - (jupyter-verify-inhibited-handlers) - (let ((msg-id (or msg-id (jupyter-new-uuid)))) - (jupyter-send ioloop 'send channel type message msg-id) - ;; Anything sent to stdin is a reply not a request so don't add it as a - ;; pending request - (unless (eq channel :stdin) - (let ((req (jupyter-generate-request client message))) - (setf (jupyter-request-id req) msg-id) - (setf (jupyter-request-inhibited-handlers req) jupyter-inhibit-handlers) - (jupyter--push-pending-request client req)))))) + (jupyter-verify-inhibited-handlers) + (let ((msg-id (or msg-id (jupyter-new-uuid)))) + (jupyter-send (oref client kcomm) 'send channel type message msg-id) + ;; Anything sent to stdin is a reply not a request so don't add it as a + ;; pending request + (unless (eq channel :stdin) + (let ((req (jupyter-generate-request client message))) + (setf (jupyter-request-id req) msg-id) + (setf (jupyter-request-inhibited-handlers req) jupyter-inhibit-handlers) + (jupyter--push-pending-request client req))))) ;;; Pending requests @@ -448,33 +406,18 @@ back." (jupyter--drop-idle-requests client) (with-slots (pending-requests requests) client (or (> (ring-length pending-requests) 0) - ;; If there are two requests, then there is really only one since - ;; "last-sent" is an alias for the other. + ;; If there are two requests, then there is really only one since + ;; "last-sent" is an alias for the other. (> (hash-table-count requests) 2) (when-let* ((last-sent (gethash "last-sent" requests))) (not (jupyter-request-idle-received-p last-sent)))))) -;;; HB channel methods - -(cl-defmethod jupyter-hb-pause ((client jupyter-kernel-client)) - "Pause CLIENT's heartbeat channel." - (jupyter-hb-pause (plist-get (oref client channels) :hb))) - -(cl-defmethod jupyter-hb-unpause ((client jupyter-kernel-client)) - "Unpause CLIENT's heartbeat channel." - (jupyter-hb-unpause (plist-get (oref client channels) :hb))) - -(cl-defmethod jupyter-hb-beating-p ((client jupyter-kernel-client)) - "Is CLIENT still connected to its kernel?" - (jupyter-hb-beating-p (plist-get (oref client channels) :hb))) - -;;; IOLoop handlers (receiving messages, starting/stopping channels) +;;; Event handlers ;;;; Sending/receiving -(cl-defmethod jupyter-ioloop-handler ((_ioloop jupyter-ioloop) - (client jupyter-kernel-client) - (event (head sent))) +(cl-defmethod jupyter-event-handler ((client jupyter-kernel-client) + (event (head sent))) (cl-destructuring-bind (_ channel-type msg-id) event (unless (eq channel-type :stdin) ;; Anything sent on stdin is a reply and therefore never added as a @@ -486,145 +429,46 @@ back." (puthash msg-id req requests) (puthash "last-sent" req requests))))) -(cl-defmethod jupyter-ioloop-printer ((_ioloop jupyter-ioloop) - (_client jupyter-kernel-client) - (event (head message))) +(cl-defmethod jupyter-event-handler ((client jupyter-kernel-client) + (event (head message))) (cl-destructuring-bind (_ channel _idents . msg) event - (format "%s" (list - channel - (jupyter-message-type msg) - (jupyter-message-content msg))))) + (when jupyter--debug + (message "%s" (concat (upcase (symbol-name (car event))) ": " + (format "%s" (list + channel + (jupyter-message-type msg) + (jupyter-message-content msg)))))) + (jupyter-handle-message client channel msg))) -(cl-defmethod jupyter-ioloop-handler ((_ioloop jupyter-ioloop) - (client jupyter-kernel-client) - (event (head message))) - "For CLIENT, queue a message EVENT to be handled." - (cl-destructuring-bind (_ channel _idents . msg) event - ;; Run immediately after handling this event, i.e. on the next command loop - (run-at-time 0 nil #'jupyter-handle-message client channel msg))) +;;; Starting communication with a kernel -;;;; Channel alive methods - -(cl-defmethod jupyter-channel-alive-p ((client jupyter-kernel-client) channel) - (cl-assert (memq channel '(:hb :stdin :shell :iopub)) t) - (with-slots (ioloop channels) client - (if (not (eq channel :hb)) - (when (and ioloop (jupyter-ioloop-alive-p ioloop)) - (plist-get (plist-get channels channel) :alive-p)) - (setq channel (plist-get channels :hb)) - ;; The hb channel is implemented locally in the current process whereas the - ;; other channels are implemented in subprocesses and the current process - ;; acts as a proxy. - (and channel (jupyter-channel-alive-p channel))))) - -;;;; Start channel methods - -(cl-defmethod jupyter-ioloop-handler ((_ioloop jupyter-ioloop) - (client jupyter-kernel-client) - (event (head start-channel))) - (plist-put (plist-get (oref client channels) (cadr event)) :alive-p t)) - -(cl-defmethod jupyter-start-channel ((client jupyter-kernel-client) channel) - (cl-assert (memq channel '(:hb :stdin :shell :iopub)) t) - (unless (jupyter-channel-alive-p client channel) - (with-slots (channels) client - (if (eq channel :hb) - (jupyter-start-channel (plist-get channels :hb)) - (cl-destructuring-bind (&key endpoint &allow-other-keys) - (plist-get channels channel) - (jupyter-send - (oref client ioloop) 'start-channel channel endpoint)))))) - -(cl-defmethod jupyter-start-channel :after ((client jupyter-kernel-client) channel) - "Verify that CLIENT's CHANNEL started. -Raise an error if it did not start within -`jupyter-default-timeout'." - (unless (or (eq channel :hb) (jupyter-channel-alive-p client channel)) - (with-slots (ioloop) client - (or (jupyter-ioloop-wait-until ioloop 'start-channel - (lambda (_) (jupyter-channel-alive-p client channel))) - (error "Channel not started in ioloop subprocess (%s)" channel))))) - -;;;; Stop channel methods - -(cl-defmethod jupyter-ioloop-handler ((_ioloop jupyter-ioloop) - (client jupyter-kernel-client) - (event (head stop-channel))) - (plist-put (plist-get (oref client channels) (cadr event)) :alive-p nil)) - -(cl-defmethod jupyter-stop-channel ((client jupyter-kernel-client) channel) - (cl-assert (memq channel '(:hb :stdin :shell :iopub)) t) - (when (jupyter-channel-alive-p client channel) - (if (eq channel :hb) - (jupyter-stop-channel (plist-get (oref client channels) :hb)) - (jupyter-send (oref client ioloop) 'stop-channel channel)))) - -(cl-defmethod jupyter-stop-channel :after ((client jupyter-kernel-client) channel) - "Verify that CLIENT's CHANNEL has stopped. -Raise a warning if it has not been stopped within -`jupyter-default-timeout'." - (unless (or (eq channel :hb) (not (jupyter-channel-alive-p client channel))) - (with-slots (ioloop) client - (or (jupyter-ioloop-wait-until ioloop 'stop-channel - (lambda (_) (not (jupyter-channel-alive-p client channel)))) - (warn "Channel not stopped in ioloop subprocess"))))) - -;;; Starting/stopping IOLoop - -(cl-defmethod jupyter-start-channels :before ((client jupyter-kernel-client) - &rest _) - "Start CLIENT's channel ioloop." - (with-slots (ioloop session) client - (unless ioloop - (oset client ioloop (jupyter-channel-ioloop)) - (setq ioloop (oref client ioloop))) - (unless (jupyter-ioloop-alive-p ioloop) - (jupyter-ioloop-start ioloop session client)))) - -(cl-defmethod jupyter-start-channels ((client jupyter-kernel-client) - &key (shell t) - (iopub t) - (stdin t) - (hb t)) - "Start the pre-configured channels of CLIENT. -Before starting the channels, ensure that the channel subprocess -responsible for encoding/decoding messages and sending/receiving -messages to/from the kernel is running. - -Call `jupyter-start-channel' for every channel whose key has a -non-nil value passed to this function. - -If the shell channel is started, send an initial -`:kernel-info-request' to set the kernel-info slot of CLIENT if -necessary." - (cl-loop - for (channel . start) in `((:hb . ,hb) - (:shell . ,shell) - (:iopub . ,iopub) - (:stdin . ,stdin)) - when start do (jupyter-start-channel client channel)) - ;; Needed for reliability. Sometimes we are not fast enough to capture the - ;; startup message of a kernel. - (sleep-for 0.3)) +(cl-defmethod jupyter-start-channels ((client jupyter-kernel-client)) + (jupyter-connect-client (oref client kcomm) client)) (cl-defmethod jupyter-stop-channels ((client jupyter-kernel-client)) "Stop any running channels of CLIENT." - (cl-loop - for channel in '(:shell :iopub :stdin :hb) - do (jupyter-stop-channel client channel))) - -(cl-defmethod jupyter-stop-channels :after ((client jupyter-kernel-client) - &rest _) - "Stop CLIENT's channel ioloop." - (with-slots (ioloop) client - (when ioloop - (jupyter-ioloop-stop ioloop)))) + (when (slot-boundp client 'kcomm) + (jupyter-disconnect-client (oref client kcomm) client))) (cl-defmethod jupyter-channels-running-p ((client jupyter-kernel-client)) "Are any channels of CLIENT running?" - (cl-loop - for channel in '(:shell :iopub :stdin :hb) - thereis (jupyter-channel-alive-p client channel))) + (jupyter-comm-alive-p (oref client kcomm))) + +(cl-defmethod jupyter-channel-alive-p ((client jupyter-kernel-client) channel) + (jupyter-channel-alive-p (oref client kcomm) channel)) + +(cl-defmethod jupyter-hb-pause ((client jupyter-kernel-client)) + (when (cl-typep (oref client kcomm) 'jupyter-hb-comm) + (jupyter-hb-pause (oref client kcomm)))) + +(cl-defmethod jupyter-hb-unpause ((client jupyter-kernel-client)) + (when (cl-typep (oref client kcomm) 'jupyter-hb-comm) + (jupyter-hb-unpause (oref client kcomm)))) + +(cl-defmethod jupyter-hb-beating-p ((client jupyter-kernel-client)) + "Is CLIENT still connected to its kernel?" + (or (null (cl-typep (oref client kcomm) 'jupyter-hb-comm)) + (jupyter-hb-beating-p (oref client kcomm)))) ;;; Message callbacks diff --git a/jupyter-comm-layer.el b/jupyter-comm-layer.el new file mode 100644 index 0000000..3fc3581 --- /dev/null +++ b/jupyter-comm-layer.el @@ -0,0 +1,426 @@ +;;; jupyter-comm-layer.el --- Kernel communication layer -*- lexical-binding: t -*- + +;; Copyright (C) 2019 Nathaniel Nicandro + +;; Author: Nathaniel Nicandro +;; Created: 06 Apr 2019 +;; Version: 0.7.3 + +;; This program is free software; you can redistribute it and/or +;; modify it under the terms of the GNU General Public License as +;; published by the Free Software Foundation; either version 2, or (at +;; your option) any later version. + +;; This program is distributed in the hope that it will be useful, but +;; WITHOUT ANY WARRANTY; without even the implied warranty of +;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +;; General Public License for more details. + +;; You should have received a copy of the GNU General Public License +;; along with GNU Emacs; see the file COPYING. If not, write to the +;; Free Software Foundation, Inc., 59 Temple Place - Suite 330, +;; Boston, MA 02111-1307, USA. + +;;; Commentary: + +;; Communication with a kernel can happen in various ways, e.g. through zmq +;; sockets, a websocket, and potentially others. +;; +;; The purpose of this file is to implement a kernel communication layer to +;; abstract away how a client communicates with the kernel it is connected to. +;; +;; A specific kernel communication layer (kcomm for short) is implemented by +;; extending the methods: `jupyter-comm-start', `jupyter-comm-stop', +;; `jupyter-comm-alive-p',`jupyter-event-handler', `jupyter-send', and possibly +;; `jupyter-initialize-connection'. +;; +;; A client registers with the kcomm by calling `jupyter-connect-client' and +;; de-registers with `jupyter-disconnect-client'. The communication layer deals +;; with "events" which are just lists with an identifying symbol as the head +;; element. Events that occur on the communication layer meant for clients, +;; e.g. a message received by a kernel or notification that a message was sent +;; to a kernel, will be broadcast to all registered clients. Every client +;; wanting to receive such events must extend the method +;; `jupyter-event-handler' using the head method specializer. +;; +;; An event is sent to the kernel using `jupyter-send'. So that sending an +;; event to the communication layer would look like +;; +;; (jupyter-send kcomm 'send channel-type msg-type msg msg-id) +;; +;; The possible events that can be handled by a client is dependent on the +;; communication layer, but a `jupyter-kernel-client' implements handlers for a +;; `message' event (a kernel message) and a `sent' event (a notification that a +;; message was sent to a kernel). + +;;; Code: + +(eval-when-compile (require 'subr-x)) +(require 'jupyter-base) +(require 'jupyter-channel-ioloop) +(require 'jupyter-messages) + +(defgroup jupyter-comm-layer nil + "Kernel communication layer" + :group 'jupyter) + +(defclass jupyter-comm-layer () + ((clients :type list :initform nil)) + :abstract t) + +;;; `jupyter-comm-layer' + +(cl-defgeneric jupyter-comm-start ((comm jupyter-comm-layer) &rest _) + "Start communication on COMM.") + +(cl-defgeneric jupyter-comm-stop ((comm jupyter-comm-layer) &rest _) + "Stop communication on COMM.") + +(cl-defgeneric jupyter-comm-alive-p ((comm jupyter-comm-layer)) + "Return non-nil if communication has started on COMM.") + +(cl-defgeneric jupyter-connect-client ((comm jupyter-comm-layer) obj) + "Register OBJ to receive events from COMM. +By default, on the first OBJ connected, `jupyter-comm-start' is +called if needed. This means that a call to +`jupyter-initialize-connection' should precede a call to +`jupyter-connect-client'.") + +(cl-defgeneric jupyter-disconnect-client ((comm jupyter-comm-layer) obj) + "De-register OBJ from receiving events from COMM. +By default, on the last OBJ removed, `jupyter-comm-stop' is +called if needed.") + +(cl-defgeneric jupyter-event-handler (_obj _event) + "Handle EVENT using OBJ." + nil) + +(cl-defmethod jupyter-send ((_comm jupyter-comm-layer) &rest _event) + "Send EVENT to the underlying kernel using COMM." + (error "Subclasses need to override this method")) + +(cl-defgeneric jupyter-initialize-connection ((comm jupyter-comm-layer) &rest _) + "Initialize communication on COMM." + (when (jupyter-comm-alive-p comm) + (error "Can't initialize a live comm"))) + +;; TODO: Figure out a better interface for these channel methods or just make +;; them unnecessary. The design of `jupyter-comm-layer' only deals with +;; "events" and the channel abstraction is an implementation detail that +;; shouldn't be visible to the client. + +(cl-defgeneric jupyter-channels-running-p ((comm jupyter-comm-layer)) + "Are any channels of CLIENT running?") + +(cl-defmethod jupyter-channel-alive-p ((_comm jupyter-comm-layer) _channel) + (error "Need to implement")) + +(cl-defmethod jupyter-connect-client ((comm jupyter-comm-layer) obj) + (unless (cl-loop for ref in (oref comm clients) + thereis (eq (jupyter-weak-ref-resolve ref) obj)) + (push (jupyter-weak-ref obj) (oref comm clients))) + ;; Remove any garbage collected clients + (oset comm clients + (cl-remove-if-not #'jupyter-weak-ref-resolve + (oref comm clients))) + (unless (jupyter-comm-alive-p comm) + (jupyter-comm-start comm))) + +(cl-defmethod jupyter-disconnect-client ((comm jupyter-comm-layer) obj) + (oset comm clients + (cl-remove-if (lambda (ref) + (let ((deref (jupyter-weak-ref-resolve ref))) + (or (eq deref obj) (null deref)))) + (oref comm clients))) + ;; FIXME: This is more of a convenience and it probably makes sense to keep + ;; the comm open even though there are no clients. + (when (and (jupyter-comm-alive-p comm) + (zerop (length (oref comm clients)))) + (jupyter-comm-stop comm))) + +(cl-defmethod jupyter-event-handler ((comm jupyter-comm-layer) event) + "Broadcast EVENT to all clients registered to receive them on COMM." + ;; TODO: Dynamically cleanup list of garbage collected clients when looping + ;; over it. + (let ((clients (oref comm clients))) + (while clients + (when-let* ((client (jupyter-weak-ref-resolve (pop clients)))) + (run-at-time 0 nil #'jupyter-event-handler client event))))) + +;;; `jupyter-hb-comm' +;; If the communication layer can talk to a heartbeat channel, then it should +;; add this class as a parent class. + +(defclass jupyter-hb-comm () + ((hb :type jupyter-hb-channel)) + :abstract t) + +(cl-defmethod jupyter-hb-beating-p ((comm jupyter-hb-comm)) + (jupyter-hb-beating-p (oref comm hb))) + +(cl-defmethod jupyter-hb-pause ((comm jupyter-hb-comm)) + (jupyter-hb-pause (oref comm hb))) + +(cl-defmethod jupyter-hb-unpause ((comm jupyter-hb-comm)) + (jupyter-hb-unpause (oref comm hb))) + +;;; `jupyter-channel-comm' +;; A communication layer using `jupyter-sync-channel' objects for communicating +;; with a kernel. This communication layer is mainly meant for speed comparison +;; with the `jupyter-channel-ioloop-comm' layer. It implements communication in +;; the current Emacs instance and comparing it with the +;; `jupyter-channel-ioloop-comm' shows how much of a slow down there is when +;; all the processing of messages happens in the current Emacs instance. +;; +;; Running the test suit using `jupyter-channel-comm' vs +;; `jupyter-channel-ioloop-comm' shows, very roughly, around a 2x speed up +;; using `jupyter-channel-ioloop-comm'. + +;; FIXME: This is needed since the `jupyter-kernel-client' and +;; `jupyter-channel-ioloop' use keywords whereas you can only access slots +;; using symbols. +(defsubst jupyter-comm--channel (c) + (cl-case c + (:hb 'hb) + (:iopub 'iopub) + (:shell 'shell) + (:stdin 'stdin))) + +(defclass jupyter-sync-channel-comm (jupyter-comm-layer + jupyter-hb-comm) + ((session :type jupyter-session) + (iopub :type jupyter-sync-channel) + (shell :type jupyter-sync-channel) + (stdin :type jupyter-sync-channel) + (thread))) + +(cl-defmethod initialize-instance ((_comm jupyter-sync-channel-comm) &rest _) + (unless (functionp 'make-thread) + (error "Need threading support")) + (cl-call-next-method)) + +(defun jupyter-sync-channel-comm--check (comm) + (condition-case err + (cl-loop + for channel-type in '(:iopub :shell :stdin) + for channel = (slot-value comm (jupyter-comm--channel channel-type)) + for msg = (and (jupyter-channel-alive-p channel) + (with-slots (session socket) channel + (condition-case nil + (jupyter-recv session socket zmq-DONTWAIT) + ((zmq-EINTR zmq-EAGAIN) nil)))) + when msg do (jupyter-event-handler + comm (cl-list* 'message channel-type msg))) + (error + (thread-signal (car (all-threads)) (car err) + (cons 'jupyter-sync-channel-comm--check (cdr err))) + (signal (car err) (cdr err))))) + +(cl-defmethod jupyter-comm-start ((comm jupyter-sync-channel-comm)) + (cl-loop + for channel in '(hb shell iopub stdin) + do (jupyter-start-channel (slot-value comm channel))) + (oset comm thread + (make-thread + (let ((comm-ref (jupyter-weak-ref comm))) + (lambda () + (while (when-let* ((comm (jupyter-weak-ref-resolve comm-ref))) + (prog1 comm + (jupyter-sync-channel-comm--check comm))) + (thread-yield) + (thread-yield))))))) + +(cl-defmethod jupyter-comm-stop ((comm jupyter-sync-channel-comm)) + (when (and (slot-boundp comm 'thread) + (thread-alive-p (oref comm thread))) + (thread-signal (oref comm thread) 'quit nil) + (slot-makeunbound comm 'thread)) + (cl-loop + for channel in '(hb shell iopub stdin) + do (jupyter-stop-channel (slot-value comm channel)))) + +(cl-defmethod jupyter-comm-alive-p ((comm jupyter-sync-channel-comm)) + (jupyter-channels-running-p comm)) + +(cl-defmethod jupyter-channel-alive-p ((comm jupyter-sync-channel-comm) channel) + (and (slot-boundp comm (jupyter-comm--channel channel)) + (jupyter-channel-alive-p (slot-value comm (jupyter-comm--channel channel))))) + +(cl-defmethod jupyter-channels-running-p ((comm jupyter-sync-channel-comm)) + (cl-loop + for channel in '(:shell :iopub :stdin :hb) + thereis (jupyter-channel-alive-p comm channel))) + +;;;; Channel start/stop methods + +(cl-defmethod jupyter-stop-channel ((comm jupyter-sync-channel-comm) channel) + (when (jupyter-channel-alive-p comm channel) + (jupyter-stop-channel + (slot-value comm (jupyter-comm--channel channel))))) + +(cl-defmethod jupyter-start-channel ((comm jupyter-sync-channel-comm) channel) + (unless (jupyter-channel-alive-p comm channel) + (jupyter-start-channel + (slot-value comm (jupyter-comm--channel channel))))) + +(cl-defmethod jupyter-initialize-connection ((comm jupyter-sync-channel-comm) + (session jupyter-session)) + (cl-call-next-method) + (let ((endpoints (jupyter-session-endpoints session))) + (oset comm session (copy-sequence session)) + (oset comm hb (make-instance + 'jupyter-hb-channel + :session (oref comm session) + :endpoint (plist-get endpoints :hb))) + (cl-loop + for channel in `(:stdin :shell :iopub) + do (setf (slot-value comm (jupyter-comm--channel channel)) + (jupyter-sync-channel + :type channel + :session (oref comm session) + :endpoint (plist-get endpoints channel)))))) + +(cl-defmethod jupyter-send ((comm jupyter-sync-channel-comm) + _ channel-type msg-type msg msg-id) + (let ((channel (slot-value comm (jupyter-comm--channel channel-type)))) + ;; Run the event handler on the next command loop since the expectation is + ;; the client is that sending is asynchronous. There may be some code that + ;; makes assumptions based on this. + (run-at-time + 0 nil (lambda (id) + (jupyter-event-handler comm (list 'sent channel-type id))) + (jupyter-send channel msg-type msg msg-id)))) + +;;; `jupyter-ioloop-comm' + +(defclass jupyter-ioloop-comm (jupyter-comm-layer) + ((ioloop :type jupyter-ioloop)) + :abstract t) + +;; Fall back method that catches IOLoop events that have not been handled by +;; the communication layer already. +(cl-defmethod jupyter-ioloop-handler ((_ioloop jupyter-ioloop) + (comm jupyter-ioloop-comm) + event) + (unless (memq (car event) '(start quit)) + (jupyter-event-handler comm event))) + +(cl-defmethod jupyter-send ((comm jupyter-ioloop-comm) &rest event) + (apply #'jupyter-send (oref comm ioloop) event)) + +(cl-defmethod jupyter-comm-start ((comm jupyter-ioloop-comm)) + (with-slots (ioloop) comm + (unless (jupyter-ioloop-alive-p ioloop) + (jupyter-ioloop-start ioloop comm)))) + +(cl-defmethod jupyter-comm-stop ((comm jupyter-ioloop-comm)) + (with-slots (ioloop) comm + (when (jupyter-ioloop-alive-p ioloop) + (jupyter-ioloop-stop ioloop)))) + +(cl-defmethod jupyter-comm-alive-p ((comm jupyter-ioloop-comm)) + (and (slot-boundp comm 'ioloop) + (jupyter-ioloop-alive-p (oref comm ioloop)))) + +;;; `jupyter-channel-ioloop-comm' + +(cl-defstruct jupyter-proxy-channel endpoint alive-p) + +(defclass jupyter-channel-ioloop-comm (jupyter-ioloop-comm jupyter-hb-comm) + ((session :type jupyter-session) + (iopub :type jupyter-proxy-channel) + (shell :type jupyter-proxy-channel) + (stdin :type jupyter-proxy-channel))) + +(cl-defmethod initialize-instance ((comm jupyter-channel-ioloop-comm) &rest _) + (cl-call-next-method) + (oset comm ioloop (jupyter-channel-ioloop))) + +(cl-defmethod jupyter-initialize-connection ((comm jupyter-channel-ioloop-comm) + (session jupyter-session)) + (cl-call-next-method) + (let ((endpoints (jupyter-session-endpoints session))) + (oset comm session (copy-sequence session)) + (oset comm hb (make-instance + 'jupyter-hb-channel + :session (oref comm session) + :endpoint (plist-get endpoints :hb))) + (cl-loop + for channel in '(:stdin :shell :iopub) + do (setf (slot-value comm (jupyter-comm--channel channel)) + (make-jupyter-proxy-channel + :endpoint (plist-get endpoints channel) + :alive-p nil))))) + +(cl-defmethod jupyter-comm-start ((comm jupyter-channel-ioloop-comm)) + (with-slots (ioloop session) comm + (unless (jupyter-ioloop-alive-p ioloop) + (jupyter-ioloop-start ioloop session comm)) + (cl-loop + for channel in '(:hb :shell :iopub :stdin) + do (jupyter-start-channel comm channel)))) + +(cl-defmethod jupyter-comm-stop ((comm jupyter-channel-ioloop-comm)) + (cl-loop + for channel in '(:hb :shell :iopub :stdin) + do (jupyter-stop-channel comm channel)) + (cl-call-next-method)) + +;;;; `jupyter-channel-ioloop' events + +(cl-defmethod jupyter-ioloop-handler ((_ioloop jupyter-channel-ioloop) + (comm jupyter-channel-ioloop-comm) + (event (head stop-channel))) + (setf (jupyter-proxy-channel-alive-p + (slot-value comm (jupyter-comm--channel (cadr event)))) + nil)) + +(cl-defmethod jupyter-ioloop-handler ((_ioloop jupyter-channel-ioloop) + (comm jupyter-channel-ioloop-comm) + (event (head start-channel))) + (setf (jupyter-proxy-channel-alive-p + (slot-value comm (jupyter-comm--channel (cadr event)))) + t)) + +;;;; Channel querying methods + +(cl-defmethod jupyter-channel-alive-p ((comm jupyter-channel-ioloop-comm) channel) + (if (eq channel :hb) (jupyter-channel-alive-p (oref comm hb)) + (with-slots (ioloop) comm + (and ioloop (jupyter-ioloop-alive-p ioloop) + (jupyter-proxy-channel-alive-p + (slot-value comm (jupyter-comm--channel channel))))))) + +(cl-defmethod jupyter-channels-running-p ((comm jupyter-channel-ioloop-comm)) + "Are any channels of CLIENT running?" + (cl-loop + for channel in '(:shell :iopub :stdin :hb) + thereis (jupyter-channel-alive-p comm channel))) + +;;;; Channel start/stop methods + +(cl-defmethod jupyter-stop-channel ((comm jupyter-channel-ioloop-comm) channel) + (when (jupyter-channel-alive-p comm channel) + (if (eq channel :hb) (jupyter-stop-channel (oref comm hb)) + (with-slots (ioloop) comm + (jupyter-send ioloop 'stop-channel channel) + ;; Verify that the channel stops + (or (jupyter-ioloop-wait-until ioloop 'stop-channel + (lambda (_) (not (jupyter-channel-alive-p comm channel)))) + (error "Channel not stopped in ioloop subprocess")))))) + +(cl-defmethod jupyter-start-channel ((comm jupyter-channel-ioloop-comm) channel) + (unless (jupyter-channel-alive-p comm channel) + (if (eq channel :hb) (jupyter-start-channel (oref comm hb)) + (let ((endpoint (jupyter-proxy-channel-endpoint + (slot-value comm (jupyter-comm--channel channel))))) + (with-slots (ioloop) comm + (jupyter-send ioloop 'start-channel channel endpoint) + ;; Verify that the channel starts + (or (jupyter-ioloop-wait-until ioloop 'start-channel + (lambda (_) (jupyter-channel-alive-p comm channel))) + (error "Channel not started in ioloop subprocess (%s)" channel))))))) + +(provide 'jupyter-comm-layer) + +;;; jupyter-comm-layer.el ends here diff --git a/jupyter-kernel-manager.el b/jupyter-kernel-manager.el index 94d0623..e558877 100644 --- a/jupyter-kernel-manager.el +++ b/jupyter-kernel-manager.el @@ -114,8 +114,15 @@ connect to MANAGER's kernel." (signal 'wrong-type-argument (list '(subclass jupyter-kernel-client) class))) (let ((client (apply #'make-instance class slots))) (prog1 client - (jupyter-initialize-connection client (oref manager session)) - (oset client manager manager)))) + (oset client manager manager) + ;; TODO: We can also have the manager hold the kcomm object and just + ;; pass a single kcomm object to all clients using this manager since the + ;; kcomm broadcasts event to all connected clients. This is more + ;; efficient as it only uses one subprocess for every client connected to + ;; a kernel. + (oset client kcomm (jupyter-channel-ioloop-comm)) + (jupyter-initialize-connection + client (copy-sequence (oref manager session)))))) (defun jupyter--kernel-sentinel (kernel &optional _) "Kill the KERNEL process and its buffer." diff --git a/jupyter-messages.el b/jupyter-messages.el index 1ad5559..9cd3ffe 100644 --- a/jupyter-messages.el +++ b/jupyter-messages.el @@ -287,15 +287,15 @@ and `:msg_type'." (cdr parts) (let ((dheader (jupyter--decode header))) (list - :header `(message-part ,header ,dheader) + :header (list 'message-part header dheader) :msg_id (plist-get dheader :msg_id) :msg_type (plist-get dheader :msg_type) ;; Also decode the parent header here since it is used quite often in ;; the parent Emacs process - :parent_header `(message-part ,parent-header - ,(jupyter--decode parent-header)) - :metadata `(message-part ,metadata nil) - :content `(message-part ,content nil) + :parent_header (list 'message-part parent-header + (jupyter--decode parent-header)) + :metadata (list 'message-part metadata nil) + :content (list 'message-part content nil) :buffers buffers)))) ;;; Sending/receiving diff --git a/test/jupyter-test.el b/test/jupyter-test.el index 4b87d11..017f2f1 100644 --- a/test/jupyter-test.el +++ b/test/jupyter-test.el @@ -66,6 +66,32 @@ (jupyter-request-id req))) (should (equal (jupyter-message-get (caddr msgs) :execution_state) "idle")))))) +;;;; Comm layer + +(ert-deftest jupyter-comm-layer () + :tags '(mock comm) + (let ((comm (jupyter-mock-comm-layer)) + (obj (make-jupyter-mock-comm-obj))) + (jupyter-connect-client comm obj) + (should (= (length (oref comm clients)) 1)) + (should (eq (jupyter-weak-ref-resolve (car (oref comm clients))) obj)) + (should (= (oref comm alive) 1)) + (jupyter-connect-client comm obj) + (should (= (length (oref comm clients)) 1)) + (should (eq (jupyter-weak-ref-resolve (car (oref comm clients))) obj)) + (should (= (oref comm alive) 1)) + + (should-not (jupyter-mock-comm-obj-event obj)) + (jupyter-event-handler comm '(event)) + ;; Events are handled in a timer, not right away + (sleep-for 0.01) + (should (equal (jupyter-mock-comm-obj-event obj) '(event))) + + (jupyter-disconnect-client comm obj) + (should (= (length (oref comm clients)) 0)) + (should-not (oref comm alive)) + (jupyter-disconnect-client comm obj))) + ;;; Callbacks (ert-deftest jupyter-wait-until-idle () @@ -524,20 +550,25 @@ ;;; Client ;; TODO: Different values of the session argument +;; TODO: Update for new `jupyter-channel-ioloop-comm' (ert-deftest jupyter-initialize-connection () :tags '(client init) + (skip-unless nil) + ;; The default comm is a jupyter-channel-ioloop-comm (let ((conn-info (jupyter-test-conn-info-plist)) (client (jupyter-kernel-client))) + (oset client kcomm (jupyter-sync-channel-comm)) (jupyter-initialize-connection client conn-info) - (with-slots (session channels) client + ;; kcomm by default is a `jupyter-channel-ioloop-comm' + (with-slots (session kcomm) client (ert-info ("Client session") (should (string= (jupyter-session-key session) (plist-get conn-info :key))) (should (equal (jupyter-session-conn-info session) conn-info))) (ert-info ("Heartbeat channel initialized") - (should (eq session (oref (plist-get channels :hb) session))) - (should (string= (oref (plist-get channels :hb) endpoint) + (should (eq session (oref (oref kcomm hb) session))) + (should (string= (oref (oref kcomm hb) endpoint) (format "tcp://127.0.0.1:%d" (plist-get conn-info :hb_port))))) (ert-info ("Shell, iopub, stdin initialized") @@ -566,6 +597,7 @@ (ert-info ("Starting/stopping channels") (let ((conn-info (jupyter-test-conn-info-plist)) (client (jupyter-kernel-client))) + (oset client kcomm (jupyter-sync-channel-comm)) (jupyter-initialize-connection client conn-info) (cl-loop for channel in '(:hb :shell :iopub :stdin) diff --git a/test/test-helper.el b/test/test-helper.el index c17dea1..bd774da 100644 --- a/test/test-helper.el +++ b/test/test-helper.el @@ -30,6 +30,7 @@ (require 'zmq) (require 'jupyter-client) (require 'jupyter-repl) +(require 'jupyter-comm-layer) (require 'jupyter-org-client) (require 'jupyter-kernel-manager) (require 'cl-lib) @@ -58,11 +59,12 @@ handling a message is always (cl-defmethod initialize-instance ((client jupyter-echo-client) &rest _slots) (cl-call-next-method) (oset client messages (make-ring 10)) - (oset client channels - (list :hb (jupyter-hb-channel) - :shell (list :alive-p nil :endpoint "foo://bar") - :stdin (list :alive-p nil :endpoint "foo://bar") - :iopub (list :alive-p nil :endpoint "foo://bar")))) + (oset client kcomm (jupyter-channel-ioloop-comm)) + (with-slots (kcomm) client + (oset kcomm hb (jupyter-hb-channel)) + (oset kcomm stdin (make-jupyter-proxy-channel)) + (oset kcomm shell (make-jupyter-proxy-channel)) + (oset kcomm iopub (make-jupyter-proxy-channel)))) (cl-defmethod jupyter-send ((client jupyter-echo-client) channel @@ -96,6 +98,29 @@ handling a message is always (ring-insert+extend (oref client messages) msg 'grow) (cl-call-next-method)) +;;; `jupyter-mock-comm-layer' + +(defclass jupyter-mock-comm-layer (jupyter-comm-layer) + ((alive :initform nil))) + +(cl-defmethod jupyter-comm-alive-p ((comm jupyter-mock-comm-layer)) + (oref comm alive)) + +(cl-defmethod jupyter-comm-start ((comm jupyter-mock-comm-layer)) + (unless (oref comm alive) + (oset comm alive 0)) + (cl-incf (oref comm alive))) + +(cl-defmethod jupyter-comm-stop ((comm jupyter-mock-comm-layer)) + (cl-decf (oref comm alive)) + (when (zerop (oref comm alive)) + (oset comm alive nil))) + +(cl-defstruct jupyter-mock-comm-obj event) + +(cl-defmethod jupyter-event-handler ((obj jupyter-mock-comm-obj) event) + (setf (jupyter-mock-comm-obj-event obj) event)) + ;;; Macros (cl-defmacro jupyter-ert-info ((message-form &key ((:prefix prefix-form) "Info: "))