From b40b7de837d37ea36f75f5d5e925c0991b2ddeaf Mon Sep 17 00:00:00 2001 From: Nathaniel Nicandro Date: Fri, 28 Jun 2019 20:03:00 -0500 Subject: [PATCH] Do not depend strongly on zmq Having the `jupyter-comm-layer` abstraction means we do not need to do so. * jupyter-base.el (zmq): Un-require. (jupyter-socket-types): Move to `jupyter-channels.el`. (jupyter-session): Don't mention zmq in doc string. (jupyter-available-local-ports, jupyter-make-ssh-tunnel): New functions. (jupyter-tunnel-connection): Use them. * jupyter-channel-ioloop-comm.el: New file. * jupyter-channels.el (jupyter-messages): Un-require. (jupyter-comm-layer, zmq): New requires. (jupyter-socket-types): Moved from `jupyter-base.el`. (jupyter-send, jupyter-recv): Implementations for `jupyter-session` moved from `jupyter-messages.el`. (jupyter-sync-channel-comm): `jupyter-comm-layer` implementation for `jupyter-sync-channel` objects moved from `jupyter-comm-layer.el`. * jupyter-comm-layer.el (jupyter-channel-ioloop): Un-require. (jupyter-sync-channel-comm): Move implementation to `jupyter-channels.el`. (jupyter-ioloop-comm): Move implementation to new file `jupyter-ioloop-comm.el`. (jupyter-channel-ioloop-comm): Move implementation to new file `jupyter-channel-ioloop-comm.el`. * jupyter-ioloop-comm.el: New file. * jupyter-ioloop.el (zmq): Require. * jupyter-kernel-manager.el (jupyter-make-client): Ensure `jupyter-channel-ioloop-comm` is required. * jupyter-messages.el (jupyter-send) (jupyter-recv): Moved to `jupyter-channels.el` * jupyter-repl.el (jupyter-connect-repl): Ensure `jupyter-channel-ioloop-comm` is required. * test/jupyter-test.el (jupyter-available-local-ports): New test. * test/test-helper.el (jupyter-channel-ioloop-comm): New require. --- jupyter-base.el | 70 +++++---- jupyter-channel-ioloop-comm.el | 140 +++++++++++++++++ jupyter-channels.el | 199 +++++++++++++++++++++++- jupyter-comm-layer.el | 271 --------------------------------- jupyter-ioloop-comm.el | 67 ++++++++ jupyter-ioloop.el | 1 + jupyter-kernel-manager.el | 4 +- jupyter-messages.el | 39 ----- jupyter-repl.el | 4 +- test/jupyter-test.el | 9 ++ test/test-helper.el | 2 +- 11 files changed, 460 insertions(+), 346 deletions(-) create mode 100644 jupyter-channel-ioloop-comm.el create mode 100644 jupyter-ioloop-comm.el diff --git a/jupyter-base.el b/jupyter-base.el index 75b5f44..58616fa 100644 --- a/jupyter-base.el +++ b/jupyter-base.el @@ -34,7 +34,6 @@ (require 'eieio) (require 'eieio-base) (require 'json) -(require 'zmq) (declare-function tramp-dissect-file-name "tramp" (name &optional nodefault)) (declare-function tramp-file-name-user "tramp") @@ -71,14 +70,6 @@ messages consider this variable." (defconst jupyter-protocol-version "5.3" "The jupyter protocol version that is implemented.") -(defconst jupyter-socket-types - (list :hb zmq-REQ - :shell zmq-DEALER - :iopub zmq-SUB - :stdin zmq-DEALER - :control zmq-DEALER) - "The socket types for the various channels used by `jupyter'.") - (defconst jupyter-message-types (list :execute-result "execute_result" :execute-request "execute_request" @@ -400,8 +391,7 @@ fields: - CONN-INFO :: The connection info. property list of the kernel this session is used to sign messages for. -- ID :: A string of bytes used as the `zmq-ROUTING-ID' for every - `jupyter-channel' that utilizes the session object. +- ID :: A string of bytes that uniquely identifies this session. - KEY :: The key used when signing messages. If KEY is nil, message signing is not performed." @@ -469,6 +459,37 @@ following fields: (eval-when-compile (require 'tramp)) +(defun jupyter-available-local-ports (n) + "Return a list of N ports available on the localhost." + (let (servers) + (unwind-protect + (cl-loop + repeat n + do (push (make-network-process + :name "jupyter-available-local-ports" + :server t + :host "127.0.0.1" + :service t) + servers) + finally return (mapcar (lambda (p) (cadr (process-contact p))) servers)) + (mapc #'delete-process servers)))) + +(defun jupyter-make-ssh-tunnel (lport rport server remoteip) + (or remoteip (setq remoteip "127.0.0.1")) + (start-process + "jupyter-ssh-tunnel" nil + "ssh" + ;; Run in background + "-f" + ;; Wait until the tunnel is open + "-o ExitOnForwardFailure=yes" + ;; Local forward + "-L" (format "127.0.0.1:%d:%s:%d" lport remoteip rport) + server + ;; Close the tunnel if no other connections are made within 60 + ;; seconds + "sleep 60")) + (defun jupyter-tunnel-connection (conn-file &optional server) "Forward local ports to the remote ports in CONN-FILE. CONN-FILE is the path to a Jupyter connection file, SERVER is the @@ -480,7 +501,7 @@ If CONN-FILE is a `tramp' file name, the SERVER argument will be ignored and the host will be extracted from the information contained in the file name. -Note that `zmq-make-tunnel' is used to create the tunnels." +Note only SSH tunnels are currently supported." (catch 'no-tunnels (let ((conn-info (jupyter-read-plist conn-file))) (when (and (file-remote-p conn-file) @@ -498,20 +519,17 @@ Note that `zmq-make-tunnel' is used to create the tunnels." (_ (setq server (if user (concat user "@" host) host)))))) - (let ((sock (zmq-socket (zmq-current-context) zmq-REP))) - (unwind-protect - (cl-loop - with remoteip = (plist-get conn-info :ip) - for (key maybe-rport) on conn-info by #'cddr - collect key and if (memq key '(:hb_port :shell_port :control_port - :stdin_port :iopub_port)) - collect (let ((lport (zmq-bind-to-random-port sock "tcp://127.0.0.1"))) - (zmq-unbind sock (zmq-socket-get sock zmq-LAST-ENDPOINT)) - (prog1 lport - (zmq-make-tunnel lport maybe-rport server remoteip))) - else collect maybe-rport) - (zmq-socket-set sock zmq-LINGER 0) - (zmq-close sock)))))) + (let* ((keys '(:hb_port :shell_port :control_port + :stdin_port :iopub_port)) + (lports (jupyter-available-local-ports (length keys)))) + (cl-loop + with remoteip = (plist-get conn-info :ip) + for (key maybe-rport) on conn-info by #'cddr + collect key and if (memq key keys) + collect (let ((lport (pop lports))) + (prog1 lport + (jupyter-make-ssh-tunnel lport maybe-rport server remoteip))) + else collect maybe-rport))))) ;;; Helper functions diff --git a/jupyter-channel-ioloop-comm.el b/jupyter-channel-ioloop-comm.el new file mode 100644 index 0000000..6997442 --- /dev/null +++ b/jupyter-channel-ioloop-comm.el @@ -0,0 +1,140 @@ +;;; jupyter-channel-ioloop-comm.el --- Communication layer using jupyter-channel-ioloop -*- lexical-binding: t -*- + +;; Copyright (C) 2019 Nathaniel Nicandro + +;; Author: Nathaniel Nicandro +;; Created: 27 Jun 2019 +;; Version: 0.8.0 + +;; This program is free software; you can redistribute it and/or +;; modify it under the terms of the GNU General Public License as +;; published by the Free Software Foundation; either version 3, or (at +;; your option) any later version. + +;; This program is distributed in the hope that it will be useful, but +;; WITHOUT ANY WARRANTY; without even the implied warranty of +;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +;; General Public License for more details. + +;; You should have received a copy of the GNU General Public License +;; along with GNU Emacs; see the file COPYING. If not, write to the +;; Free Software Foundation, Inc., 59 Temple Place - Suite 330, +;; Boston, MA 02111-1307, USA. + +;;; Commentary: + +;; Implement the `jupyter-comm-layer' interface on-top of a +;; `jupyter-channel-ioloop'. + +;;; Code: + +(require 'jupyter-ioloop-comm) +(require 'jupyter-channel-ioloop) + +(cl-defstruct jupyter-proxy-channel endpoint alive-p) + +(defclass jupyter-channel-ioloop-comm (jupyter-ioloop-comm + jupyter-hb-comm + jupyter-comm-autostop) + ((session :type jupyter-session) + (iopub :type jupyter-proxy-channel) + (shell :type jupyter-proxy-channel) + (stdin :type jupyter-proxy-channel))) + +(cl-defmethod initialize-instance ((comm jupyter-channel-ioloop-comm) &optional _slots) + (cl-call-next-method) + (oset comm ioloop (jupyter-channel-ioloop))) + +(cl-defmethod jupyter-comm-id ((comm jupyter-channel-ioloop-comm)) + (format "session=%s" (truncate-string-to-width + (jupyter-session-id (oref comm session)) + 9 nil nil "…"))) + +(cl-defmethod jupyter-initialize-connection ((comm jupyter-channel-ioloop-comm) + (session jupyter-session)) + (cl-call-next-method) + (let ((endpoints (jupyter-session-endpoints session))) + (oset comm session (copy-sequence session)) + (oset comm hb (make-instance + 'jupyter-hb-channel + :session (oref comm session) + :endpoint (plist-get endpoints :hb))) + (cl-loop + for channel in '(:stdin :shell :iopub) + do (setf (slot-value comm (jupyter-comm--channel channel)) + (make-jupyter-proxy-channel + :endpoint (plist-get endpoints channel) + :alive-p nil))))) + +(cl-defmethod jupyter-comm-start ((comm jupyter-channel-ioloop-comm)) + (with-slots (ioloop session) comm + (unless (jupyter-ioloop-alive-p ioloop) + (jupyter-ioloop-start ioloop session comm)) + (cl-loop + for channel in '(:hb :shell :iopub :stdin) + do (jupyter-start-channel comm channel)))) + +(cl-defmethod jupyter-comm-stop ((comm jupyter-channel-ioloop-comm)) + (cl-loop + for channel in '(:hb :shell :iopub :stdin) + do (jupyter-stop-channel comm channel)) + (cl-call-next-method)) + +;;;; `jupyter-channel-ioloop' events + +(cl-defmethod jupyter-ioloop-handler ((_ioloop jupyter-channel-ioloop) + (comm jupyter-channel-ioloop-comm) + (event (head stop-channel))) + (setf (jupyter-proxy-channel-alive-p + (slot-value comm (jupyter-comm--channel (cadr event)))) + nil)) + +(cl-defmethod jupyter-ioloop-handler ((_ioloop jupyter-channel-ioloop) + (comm jupyter-channel-ioloop-comm) + (event (head start-channel))) + (setf (jupyter-proxy-channel-alive-p + (slot-value comm (jupyter-comm--channel (cadr event)))) + t)) + +;;;; Channel querying methods + +(cl-defmethod jupyter-channel-alive-p ((comm jupyter-channel-ioloop-comm) channel) + (if (eq channel :hb) (jupyter-channel-alive-p (oref comm hb)) + (with-slots (ioloop) comm + (and ioloop (jupyter-ioloop-alive-p ioloop) + (jupyter-proxy-channel-alive-p + (slot-value comm (jupyter-comm--channel channel))))))) + +(cl-defmethod jupyter-channels-running-p ((comm jupyter-channel-ioloop-comm)) + "Are any channels of CLIENT running?" + (cl-loop + for channel in '(:shell :iopub :stdin :hb) + thereis (jupyter-channel-alive-p comm channel))) + +;;;; Channel start/stop methods + +(cl-defmethod jupyter-stop-channel ((comm jupyter-channel-ioloop-comm) channel) + (when (jupyter-channel-alive-p comm channel) + (if (eq channel :hb) (jupyter-stop-channel (oref comm hb)) + (with-slots (ioloop) comm + (jupyter-send ioloop 'stop-channel channel) + ;; Verify that the channel stops + (or (jupyter-ioloop-wait-until ioloop 'stop-channel + (lambda (_) (not (jupyter-channel-alive-p comm channel)))) + (error "Channel not stopped in ioloop subprocess (%s)" channel)))))) + +(cl-defmethod jupyter-start-channel ((comm jupyter-channel-ioloop-comm) channel) + (unless (jupyter-channel-alive-p comm channel) + (if (eq channel :hb) (jupyter-start-channel (oref comm hb)) + (let ((endpoint (jupyter-proxy-channel-endpoint + (slot-value comm (jupyter-comm--channel channel))))) + (with-slots (ioloop) comm + (jupyter-send ioloop 'start-channel channel endpoint) + ;; Verify that the channel starts + (or (jupyter-ioloop-wait-until ioloop 'start-channel + (lambda (_) (jupyter-channel-alive-p comm channel))) + (error "Channel not started in ioloop subprocess (%s)" channel))))))) + +(provide 'jupyter-channel-ioloop-comm) + +;;; jupyter-channel-ioloop-comm.el ends here diff --git a/jupyter-channels.el b/jupyter-channels.el index d0e2272..dbb1809 100644 --- a/jupyter-channels.el +++ b/jupyter-channels.el @@ -32,12 +32,18 @@ ;; In order for communication to occur on the other channels, one of ;; `jupyter-send' or `jupyter-recv' must be called after starting the channel ;; with `jupyter-start-channel'. +;; +;; Also implemented is a `jupyter-comm-layer' using `jupyter-sync-channel' comm +;; objects (`jupyter-sync-channel-comm') defined in this file. It is more of a +;; reference implementation to show how it could be done and required that +;; Emacs was built with threading support enabled. ;;; Code: (eval-when-compile (require 'subr-x)) (require 'jupyter-base) -(require 'jupyter-messages) ; For `jupyter-send' +(require 'jupyter-comm-layer) +(require 'zmq) (require 'ring) (defgroup jupyter-channels nil @@ -54,6 +60,14 @@ heartbeat channel is called. See `jupyter-hb-on-kernel-dead'." :type 'integer :group 'jupyter-channels) +(defconst jupyter-socket-types + (list :hb zmq-REQ + :shell zmq-DEALER + :iopub zmq-SUB + :stdin zmq-DEALER + :control zmq-DEALER) + "The socket types for the various channels used by `jupyter'.") + ;;; Basic channel types (defclass jupyter-channel () @@ -127,18 +141,56 @@ If CHANNEL is already stopped, do nothing.") (zmq-close (oref channel socket)) (oset channel socket nil))) -(cl-defmethod jupyter-send ((channel jupyter-sync-channel) type message &optional msg-id) - (jupyter-send (oref channel session) (oref channel socket) type message msg-id)) - -(cl-defmethod jupyter-recv ((channel jupyter-sync-channel)) - (jupyter-recv (oref channel session) (oref channel socket))) - (cl-defgeneric jupyter-channel-alive-p ((channel jupyter-channel)) "Determine if a CHANNEL is alive.") (cl-defmethod jupyter-channel-alive-p ((channel jupyter-sync-channel)) (not (null (oref channel socket)))) +;;; Sending/receiving + +(cl-defmethod jupyter-send ((session jupyter-session) + socket + type + message + &optional + msg-id + flags) + "For SESSION, send a message on SOCKET. +TYPE is message type of MESSAGE, one of the keys in +`jupyter-message-types'. MESSAGE is the message content. +Optionally supply a MSG-ID to the message, if this is nil a new +message ID will be generated. FLAGS has the same meaning as in +`zmq-send'. Return the message ID of the sent message." + (declare (indent 1)) + (cl-destructuring-bind (id . msg) + (jupyter-encode-message session type + :msg-id msg-id :content message) + (prog1 id + (zmq-send-multipart socket msg flags)))) + +(cl-defmethod jupyter-recv ((session jupyter-session) socket &optional flags) + "For SESSION, receive a message on SOCKET with FLAGS. +FLAGS is passed to SOCKET according to `zmq-recv'. Return a cons cell + + (IDENTS . MSG) + +where IDENTS are the ZMQ identities associated with MSG and MSG +is the message property list whose fields can be accessed through +calls to `jupyter-message-content', `jupyter-message-parent-id', +and other such functions." + (let ((msg (zmq-recv-multipart socket flags))) + (when msg + (cl-destructuring-bind (idents . parts) + (jupyter--split-identities msg) + (cons idents (jupyter-decode-message session parts)))))) + +(cl-defmethod jupyter-send ((channel jupyter-sync-channel) type message &optional msg-id) + (jupyter-send (oref channel session) (oref channel socket) type message msg-id)) + +(cl-defmethod jupyter-recv ((channel jupyter-sync-channel)) + (jupyter-recv (oref channel session) (oref channel socket))) + ;;; Heartbeat channel (defclass jupyter-hb-channel (jupyter-sync-channel) @@ -242,6 +294,139 @@ seconds has elapsed without the kernel sending a ping back." (funcall (oref channel dead-cb))))))) (jupyter-weak-ref channel)))) +;;; `jupyter-channel-comm' +;; A communication layer using `jupyter-sync-channel' objects for communicating +;; with a kernel. This communication layer is mainly meant for speed comparison +;; with the `jupyter-channel-ioloop-comm' layer. It implements communication in +;; the current Emacs instance and comparing it with the +;; `jupyter-channel-ioloop-comm' shows how much of a slow down there is when +;; all the processing of messages happens in the current Emacs instance. +;; +;; Running the test suit using `jupyter-channel-comm' vs +;; `jupyter-channel-ioloop-comm' shows, very roughly, around a 2x speed up +;; using `jupyter-channel-ioloop-comm'. + +;; FIXME: This is needed since the `jupyter-kernel-client' and +;; `jupyter-channel-ioloop' use keywords whereas you can only access slots +;; using symbols. +(defsubst jupyter-comm--channel (c) + (cl-case c + (:hb 'hb) + (:iopub 'iopub) + (:shell 'shell) + (:stdin 'stdin))) + +(defclass jupyter-sync-channel-comm (jupyter-comm-layer + jupyter-hb-comm + jupyter-comm-autostop) + ((session :type jupyter-session) + (iopub :type jupyter-sync-channel) + (shell :type jupyter-sync-channel) + (stdin :type jupyter-sync-channel) + (thread))) + +(cl-defmethod initialize-instance ((_comm jupyter-sync-channel-comm) &optional _slots) + (unless (functionp 'make-thread) + (error "Need threading support")) + (cl-call-next-method)) + +(cl-defmethod jupyter-comm-id ((comm jupyter-sync-channel-comm)) + (format "session=%s" (truncate-string-to-width + (jupyter-session-id (oref comm session)) + 9 nil nil "…"))) + +(defun jupyter-sync-channel-comm--check (comm) + (condition-case err + (cl-loop + for channel-type in '(:iopub :shell :stdin) + for channel = (slot-value comm (jupyter-comm--channel channel-type)) + for msg = (and (jupyter-channel-alive-p channel) + (with-slots (session socket) channel + (condition-case nil + (jupyter-recv session socket zmq-DONTWAIT) + ((zmq-EINTR zmq-EAGAIN) nil)))) + when msg do (jupyter-event-handler + comm (cl-list* 'message channel-type msg))) + (error + (thread-signal (car (all-threads)) (car err) + (cons 'jupyter-sync-channel-comm--check (cdr err))) + (signal (car err) (cdr err))))) + +(cl-defmethod jupyter-comm-start ((comm jupyter-sync-channel-comm)) + (cl-loop + for channel in '(hb shell iopub stdin) + do (jupyter-start-channel (slot-value comm channel))) + (oset comm thread + (make-thread + (let ((comm-ref (jupyter-weak-ref comm))) + (lambda () + (while (when-let* ((comm (jupyter-weak-ref-resolve comm-ref))) + (prog1 comm + (jupyter-sync-channel-comm--check comm))) + (thread-yield) + (thread-yield))))))) + +(cl-defmethod jupyter-comm-stop ((comm jupyter-sync-channel-comm)) + (when (and (slot-boundp comm 'thread) + (thread-alive-p (oref comm thread))) + (thread-signal (oref comm thread) 'quit nil) + (slot-makeunbound comm 'thread)) + (cl-loop + for channel in '(hb shell iopub stdin) + do (jupyter-stop-channel (slot-value comm channel)))) + +(cl-defmethod jupyter-comm-alive-p ((comm jupyter-sync-channel-comm)) + (jupyter-channels-running-p comm)) + +(cl-defmethod jupyter-channel-alive-p ((comm jupyter-sync-channel-comm) channel) + (and (slot-boundp comm (jupyter-comm--channel channel)) + (jupyter-channel-alive-p (slot-value comm (jupyter-comm--channel channel))))) + +(cl-defmethod jupyter-channels-running-p ((comm jupyter-sync-channel-comm)) + (cl-loop + for channel in '(:shell :iopub :stdin :hb) + thereis (jupyter-channel-alive-p comm channel))) + +;;;; Channel start/stop methods + +(cl-defmethod jupyter-stop-channel ((comm jupyter-sync-channel-comm) channel) + (when (jupyter-channel-alive-p comm channel) + (jupyter-stop-channel + (slot-value comm (jupyter-comm--channel channel))))) + +(cl-defmethod jupyter-start-channel ((comm jupyter-sync-channel-comm) channel) + (unless (jupyter-channel-alive-p comm channel) + (jupyter-start-channel + (slot-value comm (jupyter-comm--channel channel))))) + +(cl-defmethod jupyter-initialize-connection ((comm jupyter-sync-channel-comm) + (session jupyter-session)) + (cl-call-next-method) + (let ((endpoints (jupyter-session-endpoints session))) + (oset comm session (copy-sequence session)) + (oset comm hb (make-instance + 'jupyter-hb-channel + :session (oref comm session) + :endpoint (plist-get endpoints :hb))) + (cl-loop + for channel in `(:stdin :shell :iopub) + do (setf (slot-value comm (jupyter-comm--channel channel)) + (jupyter-sync-channel + :type channel + :session (oref comm session) + :endpoint (plist-get endpoints channel)))))) + +(cl-defmethod jupyter-send ((comm jupyter-sync-channel-comm) + _ channel-type msg-type msg msg-id) + (let ((channel (slot-value comm (jupyter-comm--channel channel-type)))) + ;; Run the event handler on the next command loop since the expectation is + ;; the client is that sending is asynchronous. There may be some code that + ;; makes assumptions based on this. + (run-at-time + 0 nil (lambda (id) + (jupyter-event-handler comm (list 'sent channel-type id))) + (jupyter-send channel msg-type msg msg-id)))) + (provide 'jupyter-channels) ;;; jupyter-channels.el ends here diff --git a/jupyter-comm-layer.el b/jupyter-comm-layer.el index be3d71c..2f96a09 100644 --- a/jupyter-comm-layer.el +++ b/jupyter-comm-layer.el @@ -57,7 +57,6 @@ (eval-when-compile (require 'subr-x)) (require 'jupyter-base) -(require 'jupyter-channel-ioloop) (require 'jupyter-messages) (defgroup jupyter-comm-layer nil @@ -188,276 +187,6 @@ buffer.") (cl-defmethod jupyter-hb-unpause ((comm jupyter-hb-comm)) (jupyter-hb-unpause (oref comm hb))) -;;; `jupyter-channel-comm' -;; A communication layer using `jupyter-sync-channel' objects for communicating -;; with a kernel. This communication layer is mainly meant for speed comparison -;; with the `jupyter-channel-ioloop-comm' layer. It implements communication in -;; the current Emacs instance and comparing it with the -;; `jupyter-channel-ioloop-comm' shows how much of a slow down there is when -;; all the processing of messages happens in the current Emacs instance. -;; -;; Running the test suit using `jupyter-channel-comm' vs -;; `jupyter-channel-ioloop-comm' shows, very roughly, around a 2x speed up -;; using `jupyter-channel-ioloop-comm'. - -;; FIXME: This is needed since the `jupyter-kernel-client' and -;; `jupyter-channel-ioloop' use keywords whereas you can only access slots -;; using symbols. -(defsubst jupyter-comm--channel (c) - (cl-case c - (:hb 'hb) - (:iopub 'iopub) - (:shell 'shell) - (:stdin 'stdin))) - -(defclass jupyter-sync-channel-comm (jupyter-comm-layer - jupyter-hb-comm - jupyter-comm-autostop) - ((session :type jupyter-session) - (iopub :type jupyter-sync-channel) - (shell :type jupyter-sync-channel) - (stdin :type jupyter-sync-channel) - (thread))) - -(cl-defmethod initialize-instance ((_comm jupyter-sync-channel-comm) &optional _slots) - (unless (functionp 'make-thread) - (error "Need threading support")) - (cl-call-next-method)) - -(cl-defmethod jupyter-comm-id ((comm jupyter-sync-channel-comm)) - (format "session=%s" (truncate-string-to-width - (jupyter-session-id (oref comm session)) - 9 nil nil "…"))) - -(defun jupyter-sync-channel-comm--check (comm) - (condition-case err - (cl-loop - for channel-type in '(:iopub :shell :stdin) - for channel = (slot-value comm (jupyter-comm--channel channel-type)) - for msg = (and (jupyter-channel-alive-p channel) - (with-slots (session socket) channel - (condition-case nil - (jupyter-recv session socket zmq-DONTWAIT) - ((zmq-EINTR zmq-EAGAIN) nil)))) - when msg do (jupyter-event-handler - comm (cl-list* 'message channel-type msg))) - (error - (thread-signal (car (all-threads)) (car err) - (cons 'jupyter-sync-channel-comm--check (cdr err))) - (signal (car err) (cdr err))))) - -(cl-defmethod jupyter-comm-start ((comm jupyter-sync-channel-comm)) - (cl-loop - for channel in '(hb shell iopub stdin) - do (jupyter-start-channel (slot-value comm channel))) - (oset comm thread - (make-thread - (let ((comm-ref (jupyter-weak-ref comm))) - (lambda () - (while (when-let* ((comm (jupyter-weak-ref-resolve comm-ref))) - (prog1 comm - (jupyter-sync-channel-comm--check comm))) - (thread-yield) - (thread-yield))))))) - -(cl-defmethod jupyter-comm-stop ((comm jupyter-sync-channel-comm)) - (when (and (slot-boundp comm 'thread) - (thread-alive-p (oref comm thread))) - (thread-signal (oref comm thread) 'quit nil) - (slot-makeunbound comm 'thread)) - (cl-loop - for channel in '(hb shell iopub stdin) - do (jupyter-stop-channel (slot-value comm channel)))) - -(cl-defmethod jupyter-comm-alive-p ((comm jupyter-sync-channel-comm)) - (jupyter-channels-running-p comm)) - -(cl-defmethod jupyter-channel-alive-p ((comm jupyter-sync-channel-comm) channel) - (and (slot-boundp comm (jupyter-comm--channel channel)) - (jupyter-channel-alive-p (slot-value comm (jupyter-comm--channel channel))))) - -(cl-defmethod jupyter-channels-running-p ((comm jupyter-sync-channel-comm)) - (cl-loop - for channel in '(:shell :iopub :stdin :hb) - thereis (jupyter-channel-alive-p comm channel))) - -;;;; Channel start/stop methods - -(cl-defmethod jupyter-stop-channel ((comm jupyter-sync-channel-comm) channel) - (when (jupyter-channel-alive-p comm channel) - (jupyter-stop-channel - (slot-value comm (jupyter-comm--channel channel))))) - -(cl-defmethod jupyter-start-channel ((comm jupyter-sync-channel-comm) channel) - (unless (jupyter-channel-alive-p comm channel) - (jupyter-start-channel - (slot-value comm (jupyter-comm--channel channel))))) - -(cl-defmethod jupyter-initialize-connection ((comm jupyter-sync-channel-comm) - (session jupyter-session)) - (cl-call-next-method) - (let ((endpoints (jupyter-session-endpoints session))) - (oset comm session (copy-sequence session)) - (oset comm hb (make-instance - 'jupyter-hb-channel - :session (oref comm session) - :endpoint (plist-get endpoints :hb))) - (cl-loop - for channel in `(:stdin :shell :iopub) - do (setf (slot-value comm (jupyter-comm--channel channel)) - (jupyter-sync-channel - :type channel - :session (oref comm session) - :endpoint (plist-get endpoints channel)))))) - -(cl-defmethod jupyter-send ((comm jupyter-sync-channel-comm) - _ channel-type msg-type msg msg-id) - (let ((channel (slot-value comm (jupyter-comm--channel channel-type)))) - ;; Run the event handler on the next command loop since the expectation is - ;; the client is that sending is asynchronous. There may be some code that - ;; makes assumptions based on this. - (run-at-time - 0 nil (lambda (id) - (jupyter-event-handler comm (list 'sent channel-type id))) - (jupyter-send channel msg-type msg msg-id)))) - -;;; `jupyter-ioloop-comm' - -(defclass jupyter-ioloop-comm (jupyter-comm-layer) - ((ioloop :type jupyter-ioloop)) - :abstract t) - -;; Fall back method that catches IOLoop events that have not been handled by -;; the communication layer already. -(cl-defmethod jupyter-ioloop-handler ((_ioloop jupyter-ioloop) - (comm jupyter-ioloop-comm) - event) - (unless (memq (car event) '(start quit)) - (jupyter-event-handler comm event))) - -(cl-defmethod jupyter-send ((comm jupyter-ioloop-comm) &rest event) - (apply #'jupyter-send (oref comm ioloop) event)) - -(cl-defmethod jupyter-comm-start ((comm jupyter-ioloop-comm)) - (with-slots (ioloop) comm - (unless (jupyter-ioloop-alive-p ioloop) - (jupyter-ioloop-start ioloop comm)))) - -(cl-defmethod jupyter-comm-stop ((comm jupyter-ioloop-comm)) - (with-slots (ioloop) comm - (when (jupyter-ioloop-alive-p ioloop) - (jupyter-ioloop-stop ioloop)))) - -(cl-defmethod jupyter-comm-alive-p ((comm jupyter-ioloop-comm)) - (and (slot-boundp comm 'ioloop) - (jupyter-ioloop-alive-p (oref comm ioloop)))) - -;;; `jupyter-channel-ioloop-comm' - -(cl-defstruct jupyter-proxy-channel endpoint alive-p) - -(defclass jupyter-channel-ioloop-comm (jupyter-ioloop-comm - jupyter-hb-comm - jupyter-comm-autostop) - ((session :type jupyter-session) - (iopub :type jupyter-proxy-channel) - (shell :type jupyter-proxy-channel) - (stdin :type jupyter-proxy-channel))) - -(cl-defmethod initialize-instance ((comm jupyter-channel-ioloop-comm) &optional _slots) - (cl-call-next-method) - (oset comm ioloop (jupyter-channel-ioloop))) - -(cl-defmethod jupyter-comm-id ((comm jupyter-channel-ioloop-comm)) - (format "session=%s" (truncate-string-to-width - (jupyter-session-id (oref comm session)) - 9 nil nil "…"))) - -(cl-defmethod jupyter-initialize-connection ((comm jupyter-channel-ioloop-comm) - (session jupyter-session)) - (cl-call-next-method) - (let ((endpoints (jupyter-session-endpoints session))) - (oset comm session (copy-sequence session)) - (oset comm hb (make-instance - 'jupyter-hb-channel - :session (oref comm session) - :endpoint (plist-get endpoints :hb))) - (cl-loop - for channel in '(:stdin :shell :iopub) - do (setf (slot-value comm (jupyter-comm--channel channel)) - (make-jupyter-proxy-channel - :endpoint (plist-get endpoints channel) - :alive-p nil))))) - -(cl-defmethod jupyter-comm-start ((comm jupyter-channel-ioloop-comm)) - (with-slots (ioloop session) comm - (unless (jupyter-ioloop-alive-p ioloop) - (jupyter-ioloop-start ioloop session comm)) - (cl-loop - for channel in '(:hb :shell :iopub :stdin) - do (jupyter-start-channel comm channel)))) - -(cl-defmethod jupyter-comm-stop ((comm jupyter-channel-ioloop-comm)) - (cl-loop - for channel in '(:hb :shell :iopub :stdin) - do (jupyter-stop-channel comm channel)) - (cl-call-next-method)) - -;;;; `jupyter-channel-ioloop' events - -(cl-defmethod jupyter-ioloop-handler ((_ioloop jupyter-channel-ioloop) - (comm jupyter-channel-ioloop-comm) - (event (head stop-channel))) - (setf (jupyter-proxy-channel-alive-p - (slot-value comm (jupyter-comm--channel (cadr event)))) - nil)) - -(cl-defmethod jupyter-ioloop-handler ((_ioloop jupyter-channel-ioloop) - (comm jupyter-channel-ioloop-comm) - (event (head start-channel))) - (setf (jupyter-proxy-channel-alive-p - (slot-value comm (jupyter-comm--channel (cadr event)))) - t)) - -;;;; Channel querying methods - -(cl-defmethod jupyter-channel-alive-p ((comm jupyter-channel-ioloop-comm) channel) - (if (eq channel :hb) (jupyter-channel-alive-p (oref comm hb)) - (with-slots (ioloop) comm - (and ioloop (jupyter-ioloop-alive-p ioloop) - (jupyter-proxy-channel-alive-p - (slot-value comm (jupyter-comm--channel channel))))))) - -(cl-defmethod jupyter-channels-running-p ((comm jupyter-channel-ioloop-comm)) - "Are any channels of CLIENT running?" - (cl-loop - for channel in '(:shell :iopub :stdin :hb) - thereis (jupyter-channel-alive-p comm channel))) - -;;;; Channel start/stop methods - -(cl-defmethod jupyter-stop-channel ((comm jupyter-channel-ioloop-comm) channel) - (when (jupyter-channel-alive-p comm channel) - (if (eq channel :hb) (jupyter-stop-channel (oref comm hb)) - (with-slots (ioloop) comm - (jupyter-send ioloop 'stop-channel channel) - ;; Verify that the channel stops - (or (jupyter-ioloop-wait-until ioloop 'stop-channel - (lambda (_) (not (jupyter-channel-alive-p comm channel)))) - (error "Channel not stopped in ioloop subprocess (%s)" channel)))))) - -(cl-defmethod jupyter-start-channel ((comm jupyter-channel-ioloop-comm) channel) - (unless (jupyter-channel-alive-p comm channel) - (if (eq channel :hb) (jupyter-start-channel (oref comm hb)) - (let ((endpoint (jupyter-proxy-channel-endpoint - (slot-value comm (jupyter-comm--channel channel))))) - (with-slots (ioloop) comm - (jupyter-send ioloop 'start-channel channel endpoint) - ;; Verify that the channel starts - (or (jupyter-ioloop-wait-until ioloop 'start-channel - (lambda (_) (jupyter-channel-alive-p comm channel))) - (error "Channel not started in ioloop subprocess (%s)" channel))))))) - (provide 'jupyter-comm-layer) ;;; jupyter-comm-layer.el ends here diff --git a/jupyter-ioloop-comm.el b/jupyter-ioloop-comm.el new file mode 100644 index 0000000..d977f01 --- /dev/null +++ b/jupyter-ioloop-comm.el @@ -0,0 +1,67 @@ +;;; jupyter-ioloop-comm.el --- Communication layer using jupyter-ioloop -*- lexical-binding: t -*- + +;; Copyright (C) 2019 Nathaniel Nicandro + +;; Author: Nathaniel Nicandro +;; Created: 27 Jun 2019 +;; Version: 0.8.0 + +;; This program is free software; you can redistribute it and/or +;; modify it under the terms of the GNU General Public License as +;; published by the Free Software Foundation; either version 3, or (at +;; your option) any later version. + +;; This program is distributed in the hope that it will be useful, but +;; WITHOUT ANY WARRANTY; without even the implied warranty of +;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +;; General Public License for more details. + +;; You should have received a copy of the GNU General Public License +;; along with GNU Emacs; see the file COPYING. If not, write to the +;; Free Software Foundation, Inc., 59 Temple Place - Suite 330, +;; Boston, MA 02111-1307, USA. + +;;; Commentary: + +;; Implement the `jupyter-comm-layer' interface on-top of a `jupyter-ioloop'. +;; Note this class only implements a subset of the `jupyter-comm-layer' +;; interface needed for a `jupyter-kernel-client' and is usually sub-classed to +;; be usable by a `jupyter-kernel-client'. See `jupyter-channel-ioloop-comm'. + +;;; Code: + +(require 'jupyter-comm-layer) +(require 'jupyter-ioloop) + +(defclass jupyter-ioloop-comm (jupyter-comm-layer) + ((ioloop :type jupyter-ioloop)) + :abstract t) + +;; Fall back method that catches IOLoop events that have not been handled by +;; the communication layer already. +(cl-defmethod jupyter-ioloop-handler ((_ioloop jupyter-ioloop) + (comm jupyter-ioloop-comm) + event) + (unless (memq (car event) '(start quit)) + (jupyter-event-handler comm event))) + +(cl-defmethod jupyter-send ((comm jupyter-ioloop-comm) &rest event) + (apply #'jupyter-send (oref comm ioloop) event)) + +(cl-defmethod jupyter-comm-start ((comm jupyter-ioloop-comm)) + (with-slots (ioloop) comm + (unless (jupyter-ioloop-alive-p ioloop) + (jupyter-ioloop-start ioloop comm)))) + +(cl-defmethod jupyter-comm-stop ((comm jupyter-ioloop-comm)) + (with-slots (ioloop) comm + (when (jupyter-ioloop-alive-p ioloop) + (jupyter-ioloop-stop ioloop)))) + +(cl-defmethod jupyter-comm-alive-p ((comm jupyter-ioloop-comm)) + (and (slot-boundp comm 'ioloop) + (jupyter-ioloop-alive-p (oref comm ioloop)))) + +(provide 'jupyter-ioloop-comm) + +;;; jupyter-ioloop-comm.el ends here diff --git a/jupyter-ioloop.el b/jupyter-ioloop.el index dc55825..96b4403 100644 --- a/jupyter-ioloop.el +++ b/jupyter-ioloop.el @@ -53,6 +53,7 @@ ;;; Code: (require 'jupyter-base) +(require 'zmq) (eval-when-compile (require 'subr-x)) (defvar jupyter-ioloop-poller nil diff --git a/jupyter-kernel-manager.el b/jupyter-kernel-manager.el index 9b0dea1..6c33ce1 100644 --- a/jupyter-kernel-manager.el +++ b/jupyter-kernel-manager.el @@ -324,6 +324,7 @@ SLOTS are the slots used to initialize the client with.") (prog1 client (oset client manager manager)))) +;; FIXME: Do not hard-code the communication layer (cl-defmethod jupyter-make-client ((manager jupyter-kernel-manager) _class &rest _slots) "Make a new client from CLASS connected to MANAGER's kernel. CLASS should be a subclass of `jupyter-kernel-client', a new @@ -332,12 +333,13 @@ connect to MANAGER's kernel." (let ((client (cl-call-next-method))) (with-slots (kernel) manager (prog1 client + (require 'jupyter-channel-ioloop-comm) ;; TODO: We can also have the manager hold the kcomm object and just ;; pass a single kcomm object to all clients using this manager since the ;; kcomm broadcasts event to all connected clients. This is more ;; efficient as it only uses one subprocess for every client connected to ;; a kernel. - (oset client kcomm (jupyter-channel-ioloop-comm)) + (oset client kcomm (make-instance 'jupyter-channel-ioloop-comm)) (jupyter-initialize-connection client (oref kernel session)))))) (cl-defmethod jupyter-start-kernel ((manager jupyter-kernel-manager) &rest args) diff --git a/jupyter-messages.el b/jupyter-messages.el index db08fdc..4fb4a86 100644 --- a/jupyter-messages.el +++ b/jupyter-messages.el @@ -270,7 +270,6 @@ The returned object has the same form as the object returned by parts) buffers))) - (cl-defun jupyter-decode-message (session parts &key (signer #'jupyter-hmac-sha256)) "Use SESSION to decode message PARTS. PARTS should be a list of message parts in the order of a valid @@ -328,44 +327,6 @@ and `:msg_type'." :content (list 'message-part content nil) :buffers buffers)))) -;;; Sending/receiving - -(cl-defmethod jupyter-send ((session jupyter-session) - socket - type - message - &optional - msg-id - flags) - "For SESSION, send a message on SOCKET. -TYPE is message type of MESSAGE, one of the keys in -`jupyter-message-types'. MESSAGE is the message content. -Optionally supply a MSG-ID to the message, if this is nil a new -message ID will be generated. FLAGS has the same meaning as in -`zmq-send'. Return the message ID of the sent message." - (declare (indent 1)) - (cl-destructuring-bind (id . msg) - (jupyter-encode-message session type - :msg-id msg-id :content message) - (prog1 id - (zmq-send-multipart socket msg flags)))) - -(cl-defmethod jupyter-recv ((session jupyter-session) socket &optional flags) - "For SESSION, receive a message on SOCKET with FLAGS. -FLAGS is passed to SOCKET according to `zmq-recv'. Return a cons cell - - (IDENTS . MSG) - -where IDENTS are the ZMQ identities associated with MSG and MSG -is the message property list whose fields can be accessed through -calls to `jupyter-message-content', `jupyter-message-parent-id', -and other such functions." - (let ((msg (zmq-recv-multipart socket flags))) - (when msg - (cl-destructuring-bind (idents . parts) - (jupyter--split-identities msg) - (cons idents (jupyter-decode-message session parts)))))) - ;;; Control messages (cl-defun jupyter-message-interrupt-request () diff --git a/jupyter-repl.el b/jupyter-repl.el index fb4f96c..72c05de 100644 --- a/jupyter-repl.el +++ b/jupyter-repl.el @@ -2212,7 +2212,9 @@ interactively, DISPLAY the new REPL buffer as well." (or client-class (setq client-class 'jupyter-repl-client)) (jupyter-error-if-not-client-class-p client-class 'jupyter-repl-client) (let ((client (make-instance client-class))) - (oset client kcomm (jupyter-channel-ioloop-comm)) + ;; FIXME: See note in `jupyter-make-client' + (require 'jupyter-channel-ioloop-comm) + (oset client kcomm (make-instance 'jupyter-channel-ioloop-comm)) (jupyter-initialize-connection client file-or-plist) (jupyter-start-channels client) (jupyter-hb-unpause client) diff --git a/test/jupyter-test.el b/test/jupyter-test.el index 7358c6a..7fea302 100644 --- a/test/jupyter-test.el +++ b/test/jupyter-test.el @@ -825,6 +825,15 @@ (should-not (jupyter-line-count-greater-p "\n\n" 2)) (should-not (jupyter-line-count-greater-p "\n\n" 3))) +(ert-deftest jupyter-available-local-ports () + :tags '(client) + (let ((ports (jupyter-available-local-ports 5))) + (should (= (length ports) 5)) + (dolist (p ports) (should (integerp p))) + (dolist (proc (process-list)) + (should-not (string-match-p "jupyter-available-local-ports" + (process-name proc)))))) + ;;; IOloop (ert-deftest jupyter-ioloop-lifetime () diff --git a/test/test-helper.el b/test/test-helper.el index 1633e18..3b9b7b2 100644 --- a/test/test-helper.el +++ b/test/test-helper.el @@ -30,7 +30,7 @@ (require 'zmq) (require 'jupyter-client) (require 'jupyter-repl) -(require 'jupyter-comm-layer) +(require 'jupyter-channel-ioloop-comm) (require 'jupyter-org-client) (require 'jupyter-kernel-manager) (require 'cl-lib)