From 23b9d03f3e43af7c5a8be134e19a8a4251a0fd97 Mon Sep 17 00:00:00 2001 From: Nathaniel Nicandro Date: Sat, 6 Feb 2021 13:37:11 -0600 Subject: [PATCH] Partially revert "Remove no longer used files" This partially reverts commit 472d6bf3224911fefbbad5da4aefef6f3510d824. --- jupyter-channel-ioloop.el | 187 +++++++++++++ jupyter-channel.el | 71 +++++ jupyter-ioloop.el | 507 ++++++++++++++++++++++++++++++++++ jupyter-kernel-process.el | 349 +++++++++++++++++++++++ jupyter-zmq-channel-ioloop.el | 82 ++++++ jupyter-zmq-channel.el | 248 +++++++++++++++++ 6 files changed, 1444 insertions(+) create mode 100644 jupyter-channel-ioloop.el create mode 100644 jupyter-channel.el create mode 100644 jupyter-ioloop.el create mode 100644 jupyter-kernel-process.el create mode 100644 jupyter-zmq-channel-ioloop.el create mode 100644 jupyter-zmq-channel.el diff --git a/jupyter-channel-ioloop.el b/jupyter-channel-ioloop.el new file mode 100644 index 0000000..4005d5f --- /dev/null +++ b/jupyter-channel-ioloop.el @@ -0,0 +1,187 @@ +;;; jupyter-channel-ioloop.el --- Abstract class to communicate with a jupyter-channel in a subprocess -*- lexical-binding: t -*- + +;; Copyright (C) 2019-2020 Nathaniel Nicandro + +;; Author: Nathaniel Nicandro +;; Created: 27 Jun 2019 + +;; This program is free software; you can redistribute it and/or +;; modify it under the terms of the GNU General Public License as +;; published by the Free Software Foundation; either version 3, or (at +;; your option) any later version. + +;; This program is distributed in the hope that it will be useful, but +;; WITHOUT ANY WARRANTY; without even the implied warranty of +;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +;; General Public License for more details. + +;; You should have received a copy of the GNU General Public License +;; along with GNU Emacs; see the file COPYING. If not, write to the +;; Free Software Foundation, Inc., 59 Temple Place - Suite 330, +;; Boston, MA 02111-1307, USA. + +;;; Commentary: + +;; Define a `jupyter-ioloop' that can be sent events to start, stop, or send a +;; message on a set of `jupyter-channel' objects. For example to start a +;; `jupyter-channel' in the subprocess environment you would do something like +;; +;; (jupyter-send ioloop 'start-channel TYPE ENDPOINT) +;; +;; where TYPE and ENDPOINT have the same meaning as in `jupyter-channel'. +;; +;; Note by default, no channels are available in the subprocess environment. +;; You initialize channels by setting the `jupyter-channel-ioloop-channels' +;; variable in the subprocess environment, e.g. using +;; `jupyter-ioloop-add-setup', before starting the `jupyter-ioloop'. +;; +;; When you call `jupyter-ioloop-start' a `jupyter-session' object needs to +;; passed as the second argument with whatever object you would like to receive +;; events as the third. The `jupyter-session-id' will be used as the value of +;; the :identity key in the call to `jupyter-start' when starting a +;; channel. +;; +;; Each event sent to the subprocess will send back a corresponding +;; confirmation event, the three events that can be sent and their +;; corresponding confirmation events are: +;; +;; (start-channel TYPE ENDPOINT) -> (start-channel TYPE) +;; (stop-channel TYPE) -> (stop-channel TYPE) +;; (send TYPE MSG-TYPE MSG MSG-ID) -> (sent MSG-ID) +;; +;; For the send event, the MSG-TYPE, MSG, and MSG-ID have the same meaning as +;; the TYPE, MSG, and MSG-ID arguments of the `jupyter-send' method of a +;; `jupyter-channel'. +;; +;; Ex. +;; +;; (let ((ioloop (jupyter-channel-ioloop)) +;; (session (jupyter-session :id ...))) +;; (jupyter-start-ioloop ioloop session ...) +;; ... +;; (jupyter-send ioloop 'start-channel ...) +;; ...) + +;;; Code: + +(require 'jupyter-ioloop) + +(defvar jupyter-channel-ioloop-session nil + "The `jupyter-session' used when initializing Jupyter channels.") + +(defvar jupyter-channel-ioloop-channels nil + "A list of synchronous channels in an ioloop controlling Jupyter channels.") + +(jupyter-ioloop-add-arg-type jupyter-channel + (lambda (arg) + `(or (object-assoc ,arg :type jupyter-channel-ioloop-channels) + (error "Channel not alive (%s)" ,arg)))) + +(defclass jupyter-channel-ioloop (jupyter-ioloop) + () + :abstract t) + +(cl-defmethod initialize-instance ((ioloop jupyter-channel-ioloop) &optional _slots) + (cl-call-next-method) + (jupyter-ioloop-add-setup ioloop + (require 'jupyter-channel-ioloop)) + (jupyter-channel-ioloop-add-start-channel-event ioloop) + (jupyter-channel-ioloop-add-stop-channel-event ioloop) + (jupyter-channel-ioloop-add-send-event ioloop) + (jupyter-ioloop-add-teardown ioloop + (mapc #'jupyter-stop jupyter-channel-ioloop-channels))) + +(defun jupyter-channel-ioloop-set-session (ioloop session) + "In the IOLOOP, set SESSION as the `jupyter-channel-ioloop-session'. +Add a form to IOLOOP's setup that sets the variable +`jupyter-channel-ioloop-session' to a `jupyter-session' based on +SESSION's id and key. Remove any top level form in the setup that +sets `jupyter-channel-ioloop-session' via `setq' before doing so." + (cl-callf (lambda (setup) + (cons `(setq jupyter-channel-ioloop-session + (jupyter-session + :id ,(jupyter-session-id session) + :key ,(jupyter-session-key session))) + (cl-remove-if + (lambda (f) (and (eq (car f) 'setq) + (eq (cadr f) 'jupyter-channel-ioloop-session))) + setup))) + (oref ioloop setup))) + +;;; Channel events + +(defun jupyter-channel-ioloop-add-start-channel-event (ioloop) + "Add a start-channel event handler to IOLOOP. +The event fires when the IOLOOP receives a list with the form + + (start-channel CHANNEL-TYPE ENDPOINT) + +and shall stop any existing channel with CHANNEL-TYPE and start a +new channel with CHANNEL-TYPE connected to ENDPOINT. The +underlying socket IDENTITY is derived from +`jupyter-channel-ioloop-session' in the IOLOOP environment. The +channel will be added to the variable +`jupyter-channel-ioloop-channels' in the IOLOOP environment. + +Note, before sending this event to IOLOOP, the corresponding +channel needs to be available in the +`jupyer-channel-ioloop-channels' variable. You can initialize +this variable in the setup form of IOLOOP. + +A list with the form + + (start-channel CHANNEL-TYPE) + +is returned to the parent process." + (jupyter-ioloop-add-event + ioloop start-channel ((channel jupyter-channel) endpoint) + ;; Stop the channel if it is already alive + (when (jupyter-alive-p channel) + (jupyter-stop channel)) + ;; Start the channel + (oset channel endpoint endpoint) + (let ((identity (jupyter-session-id jupyter-channel-ioloop-session))) + (jupyter-start channel :identity identity)) + (list 'start-channel (oref channel type)))) + +(defun jupyter-channel-ioloop-add-stop-channel-event (ioloop) + "Add a stop-channel event handler to IOLOOP. +The event fires when the IOLOOP receives a list with the form + + (stop-channel CHANNEL-TYPE) + +If a channel with CHANNEL-TYPE exists and is alive, it is stopped. + +A list with the form + + (stop-channel CHANNEL-TYPE) + +is returned to the parent process." + (jupyter-ioloop-add-event ioloop stop-channel (type) + (let ((channel (object-assoc type :type jupyter-channel-ioloop-channels))) + (when (and channel (jupyter-alive-p channel)) + (jupyter-stop channel)) + (list 'stop-channel type)))) + +(defun jupyter-channel-ioloop-add-send-event (ioloop) + "Add a send event handler to IOLOOP. +The event fires when the IOLOOP receives a list of the form + + (send CHANNEL-TYPE MSG-TYPE MSG MSG-ID) + +and calls (jupyter-send CHANNEL MSG-TYPE MSG MSG-ID) using the +channel corresponding to CHANNEL-TYPE in the IOLOOP environment. + +A list of the form + + (sent CHANNEL-TYPE MSG-ID) + +is returned to the parent process." + (jupyter-ioloop-add-event + ioloop send ((channel jupyter-channel) msg-type msg msg-id) + (list 'sent (oref channel type) + (jupyter-send channel msg-type msg msg-id)))) + +(provide 'jupyter-channel-ioloop) + +;;; jupyter-channel-ioloop.el ends here diff --git a/jupyter-channel.el b/jupyter-channel.el new file mode 100644 index 0000000..1d1719f --- /dev/null +++ b/jupyter-channel.el @@ -0,0 +1,71 @@ +;;; jupyter-channel.el --- Jupyter channel interface -*- lexical-binding: t -*- + +;; Copyright (C) 2019-2020 Nathaniel Nicandro + +;; Author: Nathaniel Nicandro +;; Created: 27 Jun 2019 + +;; This program is free software; you can redistribute it and/or +;; modify it under the terms of the GNU General Public License as +;; published by the Free Software Foundation; either version 3, or (at +;; your option) any later version. + +;; This program is distributed in the hope that it will be useful, but +;; WITHOUT ANY WARRANTY; without even the implied warranty of +;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +;; General Public License for more details. + +;; You should have received a copy of the GNU General Public License +;; along with GNU Emacs; see the file COPYING. If not, write to the +;; Free Software Foundation, Inc., 59 Temple Place - Suite 330, +;; Boston, MA 02111-1307, USA. + +;;; Commentary: + +;; Defines the `jupyter-channel' interface. + +;;; Code: + +(defclass jupyter-channel () + ((type + :type keyword + :initarg :type + :documentation "The type of this channel.") + (session + :type jupyter-session + :initarg :session + :documentation "The session object used to sign and send/receive messages.") + (endpoint + :type string + :initarg :endpoint + :documentation "The endpoint this channel is connected to. + Typical endpoints look like \"tcp://127.0.0.1:5555\".")) + :abstract t) + +(cl-defmethod jupyter-start ((channel jupyter-channel) &key identity) + "Start a Jupyter CHANNEL using IDENTITY as the routing ID. +If CHANNEL is already alive, do nothing." + (cl-call-next-method)) + +(cl-defmethod jupyter-stop ((channel jupyter-channel)) + "Stop a Jupyter CHANNEL. +If CHANNEL is already stopped, do nothing." + (cl-call-next-method)) + +(cl-defmethod jupyter-alive-p ((channel jupyter-channel)) + "Return non-nil if a CHANNEL is alive." + (cl-call-next-method)) + +(cl-defmethod jupyter-send (channel type message &optional msg-id) + "On CHANNEL send MESSAGE which has message TYPE and optionally a MSG-ID." + (cl-call-next-method)) + +(cl-defmethod jupyter-recv (channel &optional dont-wait) + "Receive a message on CHANNEL. +If DONT-WAIT is non-nil, return nil immediately if there is no +message available to receive." + (cl-call-next-method)) + +(provide 'jupyter-channel) + +;;; jupyter-channel.el ends here diff --git a/jupyter-ioloop.el b/jupyter-ioloop.el new file mode 100644 index 0000000..0e6a660 --- /dev/null +++ b/jupyter-ioloop.el @@ -0,0 +1,507 @@ +;;; jupyter-ioloop.el --- Jupyter channel subprocess -*- lexical-binding: t -*- + +;; Copyright (C) 2018-2020 Nathaniel Nicandro + +;; Author: Nathaniel Nicandro +;; Created: 03 Nov 2018 + +;; This program is free software; you can redistribute it and/or +;; modify it under the terms of the GNU General Public License as +;; published by the Free Software Foundation; either version 3, or (at +;; your option) any later version. + +;; This program is distributed in the hope that it will be useful, but +;; WITHOUT ANY WARRANTY; without even the implied warranty of +;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +;; General Public License for more details. + +;; You should have received a copy of the GNU General Public License +;; along with GNU Emacs; see the file COPYING. If not, write to the +;; Free Software Foundation, Inc., 59 Temple Place - Suite 330, +;; Boston, MA 02111-1307, USA. + +;;; Commentary: + +;; An ioloop encapsulates a subprocess that communicates with its parent +;; process in a pre-defined way. The parent process sends events (lists with a +;; head element tagging the type of event and the rest of the elements being +;; the arguments), via a call to the `jupyter-send' method of a +;; `jupyter-ioloop'. The ioloop subprocess then handles the event in its +;; environment. You add an event that can be handled in the ioloop environment +;; by calling `jupyter-ioloop-add-event' before calling `jupyter-ioloop-start'. +;; +;; When one of the events added through `jupyter-ioloop-add-event' +;; returns something other than nil, it is sent back to the parent +;; process and the handler function passed to `jupyter-ioloop-start' +;; is called. +;; +;; An example that will echo back what was sent to the ioloop as a message in +;; the parent process: +;; +;; (let ((ioloop (jupyter-ioloop)) +;; (jupyter-ioloop-add-event ioloop echo (data) +;; "Return DATA back to the parent process." +;; (list 'echo data)) +;; (jupyter-ioloop-start ioloop (lambda (event) (message "%s" (cadr event)))) +;; (jupyter-send ioloop 'echo "Message") +;; (jupyter-ioloop-stop ioloop)) + +;;; Code: + +(require 'jupyter-base) +(require 'zmq) +(eval-when-compile (require 'subr-x)) + +(defvar jupyter-ioloop-poller nil + "The polling object being used to poll for events in an ioloop.") + +(defvar jupyter-ioloop-stdin nil + "A file descriptor or ZMQ socket used to receive events in an ioloop.") + +(defvar jupyter-ioloop-nsockets 1 + "The number of sockets being polled by `jupyter-ioloop-poller'.") + +(defvar jupyter-ioloop-pre-hook nil + "A hook called at the start of every polling loop. +The hook is called with no arguments.") + +(defvar jupyter-ioloop-post-hook nil + "A hook called at the end of every polling loop. +The hook is called with a single argument, the list of polling +events that occurred for this iteration or nil. The polling +events have the same value as the return value of +`zmq-poller-wait-all'.") + +(defvar jupyter-ioloop-timers nil) + +(defvar jupyter-ioloop-timeout 200 + "Maximum time (in ms) to wait for polling events on `jupyter-ioloop-poller'.") + +(defvar jupyter-ioloop--argument-types nil + "Argument types added via `jupyter-ioloop-add-arg-type'.") + +(defun jupyter-ioloop-environment-p () + "Return non-nil if this Emacs instance is an IOLoop subprocess." + (and noninteractive jupyter-ioloop-stdin jupyter-ioloop-poller)) + +(defclass jupyter-ioloop () + ((process :type (or null process) :initform nil) + (callbacks :type list :initform nil) + (events :type list :initform nil) + (setup :type list :initform nil) + (teardown :type list :initform nil)) + :documentation "An interface for sending asynchronous messages via a subprocess. + +An ioloop starts an Emacs subprocess setup to send events back +and forth between the parent Emacs process and the ioloop +asynchronously. The ioloop subprocess is essentially a polling +loop that polls its stdin and any sockets that may have been +created in the ioloop environment and performs pre-defined +actions when stdin sends an event. The structure of the +subprocess is the following + +\(progn + (let ((jupyter-ioloop-poller (zmq-poller))) + + + (condition-case nil + (while t + (run-hook 'jupyter-ioloop-pre-hook) + + (run-hook 'jupyter-ioloop-post-hook)) + (quit + + )))) + + is replaced by the form in the setup slot +of an ioloop and can be conveniently added to using +`jupyter-ioloop-add-setup'. + + is replaced with the teardown slot and +can be added to using `jupyter-ioloop-add-teardown'. + + is replaced by code that will +listen for stdin/socket events using `jupyter-ioloop-poller'. + +You add events to be handled by the subprocess using +`jupyter-ioloop-add-event', the return value of any event added +is what is sent to the parent Emacs process and what will +eventually be the sole argument to the handler function passed to +`jupyter-ioloop-start'. To suppress the subprocess from sending +anything back to the parent, ensure nil is returned by the form +created by `jupyter-ioloop-add-event'. + +See `jupyter-channel-ioloop' for an example of its usage.") + +(cl-defmethod initialize-instance ((ioloop jupyter-ioloop) &optional _slots) + (cl-call-next-method) + (jupyter-add-finalizer ioloop + (lambda () + (with-slots (process) ioloop + (when (process-live-p process) + (delete-process process)))))) + +(defun jupyter-ioloop-wait-until (ioloop event cb &optional timeout progress-msg) + "Wait until EVENT occurs on IOLOOP. +If EVENT occurs, call CB and return its value if non-nil. CB is +called with a single argument, an event list whose first element +is EVENT. If CB returns nil, continue waiting until EVENT occurs +again or until TIMEOUT seconds elapses, TIMEOUT defaults to +`jupyter-default-timeout'. If TIMEOUT is reached, return nil. + +If PROGRESS-MSG is non-nil, a progress reporter will be displayed +while waiting using PROGRESS-MSG as the message." + (declare (indent 2)) + (cl-check-type ioloop jupyter-ioloop) + (jupyter-with-timeout + (progress-msg (or timeout jupyter-default-timeout)) + (let ((e (jupyter-ioloop-last-event ioloop))) + (when (eq (car-safe e) event) (funcall cb e))))) + +(defun jupyter-ioloop-last-event (ioloop) + "Return the last event received on IOLOOP." + (cl-check-type ioloop jupyter-ioloop) + (and (oref ioloop process) + (process-get (oref ioloop process) :last-event))) + +(defmacro jupyter-ioloop-add-setup (ioloop &rest body) + "Set IOLOOP's `jupyter-ioloop-setup' slot to BODY. +BODY is the code that will be evaluated before the IOLOOP sends a +start event to the parent process." + (declare (indent 1)) + `(setf (oref ,ioloop setup) + (append (oref ,ioloop setup) + (quote ,body)))) + +(defmacro jupyter-ioloop-add-teardown (ioloop &rest body) + "Set IOLOOP's `jupyter-ioloop-teardown' slot to BODY. +BODY is the code that will be evaluated just before the IOLOOP +sends a quit event to the parent process." + (declare (indent 1)) + `(setf (oref ,ioloop teardown) + (append (oref ,ioloop teardown) + (quote ,body)))) + +(defmacro jupyter-ioloop-add-arg-type (tag fun) + "Add a new argument type for arguments in `jupyter-ioloop-add-event'. +If an argument has the form (arg TAG), where TAG is a symbol, in +the ARGS argument of `jupyter-ioloop-add-event', replace it with +the result of evaluating the form returned by FUN on arg in the +IOLOOP environment. + +For example suppose we define an argument type, jupyter-channel: + + (jupyter-ioloop-add-arg-type jupyter-channel + (lambda (arg) + `(or (object-assoc ,arg :type jupyter-channel-ioloop-channels) + (error \"Channel not alive (%s)\" ,arg)))) + +and define an event like + + (jupyter-ioloop-add-event ioloop stop-channel ((channel jupyter-channel)) + (jupyter-stop channel)) + +Finally after adding other events and starting the ioloop we send +an event like + + (jupyter-send ioloop 'stop-channel :shell) + +Then before the stop-channel event defined by +`jupyter-ioloop-add-event' is called in the IOLOOP environment, +the value for the channel argument passed by the `jupyter-send' +call is replaced by the form returned by the function specified +in the `jupyter-ioloop-add-arg-type' call." + (declare (indent 1)) + `(progn + (setf (alist-get ',tag jupyter-ioloop--argument-types nil 'remove) nil) + ;; NOTE: FUN is quoted to ensure lexical closures aren't created + (push (cons ',tag ,(list '\` fun)) jupyter-ioloop--argument-types))) + +(defun jupyter-ioloop--replace-args (args) + "Convert special arguments in ARGS. +Map over ARGS, converting its elements into + + ,arg or ,(app (lambda (x) BODY) arg) + +for use in a `pcase' form. The latter form occurs when one of +ARGS is of the form (arg TAG) where TAG is one of the keys in +`jupyter-ioloop--argument-types'. BODY will be replaced with the +result of calling the function associated with TAG in +`jupyter-ioloop--argument-types'. + +Return the list of converted arguments." + (mapcar (lambda (arg) + (pcase arg + (`(,val ,tag) + (let ((form (alist-get tag jupyter-ioloop--argument-types))) + (list '\, (list 'app `(lambda (x) ,(funcall form 'x)) val)))) + (_ (list '\, arg)))) + args)) + +(defmacro jupyter-ioloop-add-event (ioloop event args &optional doc &rest body) + "For IOLOOP, add an EVENT handler. +ARGS is a list of arguments that are bound when EVENT occurs. DOC +is an optional documentation string describing what BODY, the +expression which will be evaluated when EVENT occurs, does. If +BODY evaluates to any non-nil value, it will be sent to the +parent Emacs process. A nil value for BODY means don't send +anything. + +Some arguments are treated specially: + +If one of ARGS is a list ( tag) where is any symbol, +then the parent process that sends EVENT to IOLOOP is expected to +send a value that will be bound to and be handled by an +argument handler associated with tag before BODY is evaluated in +the IOLOOP process, see `jupyter-ioloop-add-arg-type'." + (declare (indent 3) (doc-string 4) (debug t)) + (unless (stringp doc) + (when doc + (setq body (cons doc body)))) + `(setf (oref ,ioloop events) + (cons (list (quote ,event) (quote ,args) (quote ,body)) + (cl-remove-if (lambda (x) (eq (car x) (quote ,event))) + (oref ,ioloop events))))) + +(defun jupyter-ioloop--event-dispatcher (ioloop exp) + "For IOLOOP return a form suitable for matching against EXP. +That is, return an expression which will cause an event to be +fired if EXP matches any event types handled by IOLOOP. + +TODO: Explain these +By default this adds the events quit, callback, and timer." + (let ((user-events + (cl-loop + for (event args body) in (oref ioloop events) + for cond = (list '\` (cl-list* + event (jupyter-ioloop--replace-args args))) + if (memq event '(quit callback timer)) + do (error "Event can't be one of quit, callback, or, timer") + ;; cond = `(event ,arg1 ,arg2 ...) + else collect `(,cond ,@body)))) + `(let* ((cmd ,exp) + (res (pcase cmd + ,@user-events + ;; Default events + (`(timer ,id ,period ,cb) + ;; Ensure we don't send anything back to the parent process + (prog1 nil + (let ((timer (run-at-time 0.0 period (byte-compile cb)))) + (puthash id timer jupyter-ioloop-timers)))) + (`(callback ,cb) + ;; Ensure we don't send anything back to the parent process + (prog1 nil + (setq jupyter-ioloop-timeout 0) + (add-hook 'jupyter-ioloop-pre-hook (byte-compile cb) 'append))) + ('(quit) (signal 'quit nil)) + (_ (error "Unhandled command %s" cmd))))) + ;; Can only send lists at the moment + (when (and res (listp res)) (zmq-prin1 res))))) + +(cl-defgeneric jupyter-ioloop-add-callback ((ioloop jupyter-ioloop) cb) + "In IOLOOP, add CB to be run in the IOLOOP environment. +CB is run at the start of every polling loop. Callbacks are +called in the order they are added. + +WARNING: A function added as a callback should be quoted to avoid +sending closures to the IOLOOP. An example: + + (jupyter-ioloop-add-callback ioloop + `(lambda () (zmq-prin1 'foo \"bar\")))" + (declare (indent 1)) + (cl-assert (functionp cb)) + (cl-callf append (oref ioloop callbacks) (list cb)) + (when (process-live-p (oref ioloop process)) + (jupyter-send ioloop 'callback (macroexpand-all cb)))) + +(defun jupyter-ioloop-poller-add (socket events) + "Add SOCKET to be polled using the `jupyter-ioloop-poller'. +EVENTS are the polling events that should be listened for on +SOCKET. If `jupyter-ioloop-poller' is not a `zmq-poller' object +do nothing." + (when (zmq-poller-p jupyter-ioloop-poller) + (zmq-poller-add jupyter-ioloop-poller socket events) + (cl-incf jupyter-ioloop-nsockets))) + +(defun jupyter-ioloop-poller-remove (socket) + "Remove SOCKET from the `jupyter-ioloop-poller'. +If `jupyter-ioloop-poller' is not a `zmq-poller' object do +nothing." + (when (zmq-poller-p jupyter-ioloop-poller) + (zmq-poller-remove jupyter-ioloop-poller socket) + (cl-decf jupyter-ioloop-nsockets))) + +(defun jupyter-ioloop--body (ioloop on-stdin) + `(let (events) + (condition-case nil + (progn + ,@(oref ioloop setup) + ;; Initialize any callbacks that were added before the ioloop was + ;; started + (setq jupyter-ioloop-pre-hook + (mapcar (lambda (f) + (when (symbolp f) + (setq f (symbol-function f))) + (unless (byte-code-function-p f) + (byte-compile f))) + (append jupyter-ioloop-pre-hook + (quote ,(mapcar #'macroexpand-all + (oref ioloop callbacks)))))) + ;; Notify the parent process we are ready to do something + (zmq-prin1 '(start)) + (let ((on-stdin (byte-compile (lambda () ,on-stdin)))) + (while t + (run-hooks 'jupyter-ioloop-pre-hook) + (setq events + (condition-case nil + (zmq-poller-wait-all + jupyter-ioloop-poller + jupyter-ioloop-nsockets + jupyter-ioloop-timeout) + ((zmq-EAGAIN zmq-EINTR zmq-ETIMEDOUT) nil))) + (let ((stdin-event (zmq-assoc jupyter-ioloop-stdin events))) + (when stdin-event + (setq events (delq stdin-event events)) + (funcall on-stdin))) + (run-hook-with-args 'jupyter-ioloop-post-hook events)))) + (quit + ,@(oref ioloop teardown) + (zmq-prin1 '(quit)))))) + +(defun jupyter-ioloop--function (ioloop port) + "Return the function that does the work of IOLOOP. +The returned function is suitable to send to a ZMQ subprocess for +evaluation using `zmq-start-process'. + +If PORT is non-nil the returned function will create a ZMQ PULL +socket to receive events from the parent process on the PORT of +the local host, otherwise events are expected to be received on +STDIN. This is useful on Windows systems which don't allow +polling the STDIN file handle." + `(lambda (ctx) + (push ,(file-name-directory (locate-library "jupyter-base")) load-path) + (require 'jupyter-ioloop) + (setq jupyter-ioloop-poller (zmq-poller)) + (setq jupyter-ioloop-stdin + ,(if port + `(let ((sock (zmq-socket ctx zmq-PAIR))) + (prog1 sock + (zmq-connect sock (format "tcp://127.0.0.1:%s" ,port)))) + 0)) + (zmq-poller-add jupyter-ioloop-poller jupyter-ioloop-stdin zmq-POLLIN) + ,(jupyter-ioloop--body + ioloop (jupyter-ioloop--event-dispatcher + ioloop (if port '(read (zmq-recv-decoded jupyter-ioloop-stdin)) + '(zmq-subprocess-read)))))) + +(defun jupyter-ioloop-alive-p (ioloop) + "Return non-nil if IOLOOP is ready to receive/send events." + (cl-check-type ioloop jupyter-ioloop) + (with-slots (process) ioloop + (and (process-live-p process) (process-get process :start)))) + +(defun jupyter-ioloop--make-filter (ioloop handler) + (lambda (event) + (let ((process (oref ioloop process))) + (process-put process :last-event event) + (cond + ((eq (car-safe event) 'start) + (process-put process :start t)) + ((eq (car-safe event) 'quit) + (process-put process :quit t)) + (t + (funcall handler event)))))) + +(cl-defgeneric jupyter-ioloop-start ((ioloop jupyter-ioloop) + handler + &key buffer) + "Start an IOLOOP. +HANDLER is a function of one argument and will be passed an event +received by the subprocess that IOLOOP represents, an event is +just a list. + +If IOLOOP was previously running, it is stopped first. + +If BUFFER is non-nil it should be a buffer that will be used as +the IOLOOP subprocess buffer, see `zmq-start-process'." + (jupyter-ioloop-stop ioloop) + (let (stdin port) + ;; NOTE: A socket is used to read input from the parent process to avoid + ;; the stdin buffering done when using `read-from-minibuffer' in the + ;; subprocess. When `noninteractive', `read-from-minibuffer' uses + ;; `getc_unlocked' internally and `getc_unlocked' reads from the stdin FILE + ;; object as opposed to reading directly from STDIN_FILENO. The problem is + ;; that FILE objects are buffered streams which means that every message + ;; the parent process sends does not necessarily correspond to a POLLIN + ;; event on STDIN_FILENO in the subprocess. Since we only call + ;; `read-from-minibuffer' when there is a POLLIN event on STDIN_FILENO + ;; there is the potential that a message is waiting to be handled in the + ;; buffer used by stdin which will only get handled if we send more + ;; messages to the subprocess thereby creating more POLLIN events. + (when (or t (memq system-type '(windows-nt ms-dos cygwin))) + (setq stdin (zmq-socket (zmq-current-context) zmq-PAIR)) + (setq port (zmq-bind-to-random-port stdin "tcp://127.0.0.1"))) + (let ((process (zmq-start-process + (jupyter-ioloop--function ioloop (when stdin port)) + ;; We go through this Emacs-fu, brought to you by Chris + ;; Wellons, https://nullprogram.com/blog/2014/01/27/, + ;; because we want OBJECT to be the final say in when + ;; everything gets garbage collected. If OBJECT loses + ;; scope, the ioloop process should be killed off. This + ;; wouldn't happen if we hold a strong reference to + ;; OBJECT. + :filter (jupyter-ioloop--make-filter ioloop handler) + :buffer buffer))) + (oset ioloop process process) + (when stdin + (process-put process :stdin stdin)) + (jupyter-ioloop-wait-until ioloop 'start #'identity)))) + +(cl-defgeneric jupyter-ioloop-stop ((ioloop jupyter-ioloop)) + "Stop IOLOOP. +Send a quit event to IOLOOP, wait until it actually quits before +returning." + (with-slots (process) ioloop + (when (process-live-p process) + (jupyter-send ioloop 'quit) + (unless (jupyter-ioloop-wait-until ioloop 'quit #'identity) + (delete-process process)) + (when-let* ((stdin (process-get process :stdin)) + (socket-p (zmq-socket-p stdin))) + (zmq-unbind stdin (zmq-get-option stdin zmq-LAST-ENDPOINT)))))) + +(defvar jupyter-ioloop--send-buffer nil) + +(defun jupyter-ioloop--dump-message (plist) + (with-current-buffer + (if (buffer-live-p jupyter-ioloop--send-buffer) + jupyter-ioloop--send-buffer + (setq jupyter-ioloop--send-buffer + (get-buffer-create " *jupyter-ioloop-send*"))) + (erase-buffer) + (let (print-level print-length) + (prin1 plist (current-buffer))) + (buffer-string))) + +(cl-defmethod jupyter-send ((ioloop jupyter-ioloop) &rest args) + "Using IOLOOP, send ARGS to its process. + +All arguments passed to this function are sent as a list to the +process unchanged. This means that all arguments should be +serializable." + (with-slots (process) ioloop + (cl-assert (process-live-p process)) + (let ((stdin (process-get process :stdin))) + (if stdin + (let ((msg (jupyter-ioloop--dump-message args)) sent) + (while (not sent) + (condition-case nil + (progn + (zmq-send-encoded stdin msg nil zmq-DONTWAIT) + (setq sent t)) + (zmq-EAGAIN (accept-process-output nil 0))))) + (zmq-subprocess-send process args))))) + +(provide 'jupyter-ioloop) + +;;; jupyter-ioloop.el ends here diff --git a/jupyter-kernel-process.el b/jupyter-kernel-process.el new file mode 100644 index 0000000..978e6e6 --- /dev/null +++ b/jupyter-kernel-process.el @@ -0,0 +1,349 @@ +;;; jupyter-kernel-process.el --- Jupyter kernels as Emacs processes -*- lexical-binding: t -*- + +;; Copyright (C) 2020 Nathaniel Nicandro + +;; Author: Nathaniel Nicandro +;; Created: 25 Apr 2020 + +;; This program is free software; you can redistribute it and/or +;; modify it under the terms of the GNU General Public License as +;; published by the Free Software Foundation; either version 3, or (at +;; your option) any later version. + +;; This program is distributed in the hope that it will be useful, but +;; WITHOUT ANY WARRANTY; without even the implied warranty of +;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +;; General Public License for more details. + +;; You should have received a copy of the GNU General Public License +;; along with GNU Emacs; see the file COPYING. If not, write to the +;; Free Software Foundation, Inc., 59 Temple Place - Suite 330, +;; Boston, MA 02111-1307, USA. + +;;; Commentary: + +;; Jupyter kernels as Emacs processes. + +;;; Code: + +(require 'jupyter-kernel) +(require 'jupyter-monads) + +(defgroup jupyter-kernel-process nil + "Jupyter kernels as Emacs processes" + :group 'jupyter) + +(declare-function jupyter-channel-ioloop-set-session "jupyter-channel-ioloop") + +(defvar jupyter--kernel-processes '() + "The list of kernel processes launched. +Elements look like (PROCESS CONN-FILE) where PROCESS is a kernel +process and CONN-FILE the associated connection file. + +Cleaning up the list removes elements whose PROCESS is no longer +live. When removing an element, CONN-FILE will be deleted and +PROCESS's buffer killed. + +The list is periodically cleaned up when a new process is +launched. + +Also, just before Emacs exits any connection files that still +exist are deleted.") + +;;; Kernel definition + +(cl-defstruct (jupyter-kernel-process + (:include jupyter-kernel))) + +(cl-defmethod jupyter-process ((kernel jupyter-kernel-process)) + "Return the process of KERNEL. +Return nil if KERNEL does not have an associated process." + (car (cl-find-if (lambda (x) (and (processp (car x)) + (eq (process-get (car x) :kernel) kernel))) + jupyter--kernel-processes))) + +(cl-defmethod jupyter-alive-p ((kernel jupyter-kernel-process)) + (let ((process (jupyter-process kernel))) + (and (process-live-p process) + (cl-call-next-method)))) + +(defun jupyter-kernel-process (&rest args) + "Return a `jupyter-kernel-process' initialized with ARGS." + (apply #'make-jupyter-kernel-process args)) + +(cl-defmethod jupyter-kernel :extra "process" (&rest args) + "Return a representation of a kernel based on an Emacs process. +If ARGS contains a :spec key, return a `jupyter-kernel-process' +initialized using ARGS. If the value is the name of a +kernelspec, the returned kernel's spec slot will be set to the +corresponding `jupyter-kernelspec'. The session of the returned +kernel will be initialized with the return value of +`jupyter-session-with-random-ports'. + +Call the next method if ARGS does not contain :spec." + (let ((spec (plist-get args :spec))) + (if (not spec) (cl-call-next-method) + (when (stringp spec) + (plist-put args :spec + (or (jupyter-guess-kernelspec spec) + (error "No kernelspec matching name (%s)" spec)))) + (apply #'jupyter-kernel-process args)))) + +;;; Client connection + +(defun jupyter-kernel-process-io (session) + (let* ((channels '(:shell :iopub :stdin)) + (ch-group (let ((endpoints (jupyter-session-endpoints session))) + (cl-loop + for ch in channels + collect ch + collect (list :endpoint (plist-get endpoints ch) + :alive-p nil)))) + (hb nil) + (discarded nil) + (kernel-io nil) + (ioloop nil)) + (cl-macrolet ((continue-after + (cond on-timeout) + `(jupyter-with-timeout + (nil jupyter-default-timeout ,on-timeout) + ,cond))) + (cl-labels ((ch-put + (ch prop value) + (plist-put (plist-get ch-group ch) prop value)) + (ch-get + (ch prop) + (plist-get (plist-get ch-group ch) prop)) + (ch-alive-p + (ch) + (and ioloop (jupyter-ioloop-alive-p ioloop) + (ch-get ch :alive-p))) + (ch-start + (ch) + (unless (ch-alive-p ch) + (jupyter-send ioloop 'start-channel ch + (ch-get ch :endpoint)) + (continue-after + (ch-alive-p ch) + (error "Channel failed to start: %s" ch)))) + (ch-stop + (ch) + (when (ch-alive-p ch) + (jupyter-send ioloop 'stop-channel ch) + (continue-after + (not (ch-alive-p ch)) + (error "Channel failed to stop: %s" ch)))) + (start + () + (unless ioloop + (require 'jupyter-zmq-channel-ioloop) + (setq ioloop (make-instance 'jupyter-zmq-channel-ioloop)) + (jupyter-channel-ioloop-set-session ioloop session)) + (unless (jupyter-ioloop-alive-p ioloop) + (jupyter-ioloop-start + ioloop + (lambda (event) + (pcase (car event) + ((and 'start-channel (let ch (cadr event))) + (ch-put ch :alive-p t)) + ((and 'stop-channel (let ch (cadr event))) + (ch-put ch :alive-p nil)) + ;; TODO: Get rid of this + ('sent nil) + (_ + ;; FIXME: Turn into a function not a macro, + ;; there is no need. + (jupyter-run-with-io kernel-io + (jupyter-publish event)))))) + (condition-case err + (cl-loop + for ch in channels + do (ch-start ch)) + (error + (jupyter-ioloop-stop ioloop) + (signal (car err) (cdr err))))) + + ioloop) + (stop + () + (and ioloop + (jupyter-ioloop-alive-p ioloop) + (jupyter-ioloop-stop ioloop)))) + (setq kernel-io + ;; TODO: (jupyter-publisher :name "Session I/O" :fn ...) + ;; + ;; so that on error in a subscriber, the name can be + ;; displayed to know where to look. This requires a + ;; `jupyter-publisher' struct type. + (jupyter-publisher + (lambda (content) + (if discarded + (error "Kernel I/O no longer available: %s" + (cl-prin1-to-string session)) + (pcase (car content) + ;; ('message channel idents . msg) + ('message + (pop content) + ;; TODO: Get rid of this. Have the ioloop do + ;; this. + (plist-put + (cddr content) :channel + (substring (symbol-name (car content)) 1)) + (jupyter-content (cddr content))) + ('send (apply #'jupyter-send (start) content)) + ('hb + (unless hb + (setq hb + (let ((endpoints + (jupyter-session-endpoints session))) + (make-instance + 'jupyter-hb-channel + :session session + :endpoint (plist-get endpoints :hb))))) + (jupyter-run-with-io (cadr content) + (jupyter-publish hb))) + (_ (error "Unhandled I/O: %s" content))))))) + (jupyter-return-delayed + (list kernel-io + (lambda () + (and hb (jupyter-hb-pause hb)) + (stop) + (setq hb nil ioloop nil discarded t)))))))) + +(cl-defmethod jupyter-io ((kernel jupyter-kernel-process)) + "Return an I/O connection to KERNEL's session." + (jupyter-kernel-process-io (jupyter-kernel-session kernel))) + +;;; Kernel management + +(defun jupyter--gc-kernel-processes () + (setq jupyter--kernel-processes + (cl-loop for (p conn-file) in jupyter--kernel-processes + if (process-live-p p) collect (list p conn-file) + else do (delete-process p) + (when (file-exists-p conn-file) + (delete-file conn-file)) + and when (buffer-live-p (process-buffer p)) + do (kill-buffer (process-buffer p))))) + +(defun jupyter-delete-connection-files () + "Delete all connection files created by Emacs." + ;; Ensure Emacs can be killed on error + (ignore-errors + (cl-loop for (_ conn-file) in jupyter--kernel-processes + do (when (file-exists-p conn-file) + (delete-file conn-file))))) + +(add-hook 'kill-emacs-hook #'jupyter-delete-connection-files) + +(defun jupyter--start-kernel-process (name kernelspec conn-file) + (let* ((process-name (format "jupyter-kernel-%s" name)) + (buffer-name (format " *jupyter-kernel[%s]*" name)) + (process-environment + (append (jupyter-process-environment kernelspec) + process-environment)) + (args (jupyter-kernel-argv kernelspec conn-file)) + (atime (nth 4 (file-attributes conn-file))) + (process (apply #'start-file-process process-name + (generate-new-buffer buffer-name) + (car args) (cdr args)))) + (set-process-query-on-exit-flag process jupyter--debug) + ;; Wait until the connection file has been read before returning. + ;; This is to give the kernel a chance to setup before sending it + ;; messages. + ;; + ;; TODO: Replace with a check of the heartbeat channel. + (jupyter-with-timeout + ((format "Starting %s kernel process..." name) + jupyter-long-timeout + (unless (process-live-p process) + (error "Kernel process exited:\n%s" + (with-current-buffer (process-buffer process) + (ansi-color-apply (buffer-string)))))) + ;; Windows systems may not have good time resolution when retrieving + ;; the last access time of a file so we don't bother with checking that + ;; the kernel has read the connection file and leave it to the + ;; downstream initialization to ensure that we can communicate with a + ;; kernel. + (or (memq system-type '(ms-dos windows-nt cygwin)) + (let ((attribs (file-attributes conn-file))) + ;; `file-attributes' can potentially return nil, in this case + ;; just assume it has read the connection file so that we can + ;; know for sure it is not connected if it fails to respond to + ;; any messages we send it. + (or (null attribs) + (not (equal atime (nth 4 attribs))))))) + (jupyter--gc-kernel-processes) + (push (list process conn-file) jupyter--kernel-processes) + process)) + +(cl-defmethod jupyter-do-launch :before ((kernel jupyter-kernel-process)) + "Ensure KERNEL has a non-nil SESSION slot. +A `jupyter-session' with random port numbers for the channels and +a newly generated message signing key will be set as the value of +KERNEL's SESSION slot if it is nil." + (pcase-let (((cl-struct jupyter-kernel-process session) kernel)) + (unless session + (setf (jupyter-kernel-session kernel) (jupyter-session-with-random-ports)) + ;; This is here for stability when running the tests. Sometimes + ;; the kernel ports are not set up fast enough due to the hack + ;; done in `jupyter-session-with-random-ports'. The effect + ;; seems to be messages that are sent but never received by the + ;; kernel. + (sit-for 0.2)))) + +(cl-defmethod jupyter-do-launch ((kernel jupyter-kernel-process)) + "Start KERNEL's process. +Do nothing if KERNEL's process is already live. + +The process arguments are constructed from KERNEL's SPEC. The +connection file passed as argument to the process is first +written to file, its contents are generated from KERNEL's SESSION +slot. + +See also https://jupyter-client.readthedocs.io/en/stable/kernels.html#kernel-specs" + (let ((process (jupyter-process kernel))) + (unless (process-live-p process) + (pcase-let (((cl-struct jupyter-kernel-process spec session) kernel)) + (setq process (jupyter--start-kernel-process + (jupyter-kernel-name kernel) spec + (jupyter-write-connection-file session)))) + (setf (process-get process :kernel) kernel) + (setf (process-sentinel process) + (lambda (process _) + (pcase (process-status process) + ('signal + (jupyter-kernel-died (process-get process :kernel)))))))) + (cl-call-next-method)) + +;; TODO: Add restart argument +(cl-defmethod jupyter-do-shutdown ((kernel jupyter-kernel-process)) + "Shutdown KERNEL by killing its process unconditionally." + (let ((process (jupyter-process kernel))) + (when process + (delete-process process) + (setf (process-get process :kernel) nil)) + (cl-call-next-method))) + +(cl-defmethod jupyter-do-interrupt ((kernel jupyter-kernel-process)) + "Interrupt KERNEL's process. +The process can be interrupted when the interrupt mode of +KERNEL's SPEC is \"signal\" or not specified, otherwise the +KERNEL is interrupted by sending an :interrupt-request on +KERNEL's control channel. + +See also https://jupyter-client.readthedocs.io/en/stable/kernels.html#kernel-specs" + (pcase-let* ((process (jupyter-process kernel)) + ((cl-struct jupyter-kernel-process spec) kernel) + ((cl-struct jupyter-kernelspec plist) spec) + (imode (plist-get plist :interrupt_mode))) + (if (or (null imode) (string= imode "signal")) + (when process + (interrupt-process process t)) + (cl-call-next-method)))) + +(provide 'jupyter-kernel-process) + +;;; jupyter-kernel-process.el ends here + + diff --git a/jupyter-zmq-channel-ioloop.el b/jupyter-zmq-channel-ioloop.el new file mode 100644 index 0000000..f46c1eb --- /dev/null +++ b/jupyter-zmq-channel-ioloop.el @@ -0,0 +1,82 @@ +;;; jupyter-zmq-channel-ioloop.el --- IOLoop functions for Jupyter channels -*- lexical-binding: t -*- + +;; Copyright (C) 2018-2020 Nathaniel Nicandro + +;; Author: Nathaniel Nicandro +;; Created: 08 Nov 2018 + +;; This program is free software; you can redistribute it and/or +;; modify it under the terms of the GNU General Public License as +;; published by the Free Software Foundation; either version 3, or (at +;; your option) any later version. + +;; This program is distributed in the hope that it will be useful, but +;; WITHOUT ANY WARRANTY; without even the implied warranty of +;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +;; General Public License for more details. + +;; You should have received a copy of the GNU General Public License +;; along with GNU Emacs; see the file COPYING. If not, write to the +;; Free Software Foundation, Inc., 59 Temple Place - Suite 330, +;; Boston, MA 02111-1307, USA. + +;;; Commentary: + +;; A `jupyter-channel-ioloop' using `jupyter-zmq-channel' to send and receive +;; messages. Whenever a message is received on a channel an event that looks +;; like the following will be sent back to the parent process +;; +;; (message CHANNEL-TYPE IDENTS . MSG) +;; +;; where CHANNEL-TYPE is the channel on which the message was received (one of +;; `jupyter-socket-types'), IDENTS are ZMQ identities, typically ignored, and +;; MSG is the message plist. + +;;; Code: + +(require 'jupyter-base) +(require 'jupyter-channel-ioloop) +(require 'jupyter-zmq-channel) + +(defclass jupyter-zmq-channel-ioloop (jupyter-channel-ioloop) + () + :documentation "A `jupyter-ioloop' configured for Jupyter channels.") + +(cl-defmethod initialize-instance ((ioloop jupyter-zmq-channel-ioloop) &optional _slots) + (cl-call-next-method) + (jupyter-ioloop-add-setup ioloop + (require 'jupyter-zmq-channel-ioloop) + (push 'jupyter-zmq-channel-ioloop--recv-messages jupyter-ioloop-post-hook) + (cl-loop + for channel in '(:shell :stdin :iopub) + unless (object-assoc channel :type jupyter-channel-ioloop-channels) + do (push (jupyter-zmq-channel + :session jupyter-channel-ioloop-session + :type channel) + jupyter-channel-ioloop-channels)))) + +(defun jupyter-zmq-channel-ioloop--recv-messages (events) + "Print the received messages described in EVENTS. +EVENTS is a list of socket events as returned by +`zmq-poller-wait-all'. If any of the sockets in EVENTS matches +one of the sockets in `jupyter-channel-ioloop-channels', receive a +message on the channel and print a list with the form + + (message CHANNEL-TYPE . MSG...) + +to stdout. CHANNEL-TYPE is the channel on which MSG was received, +either :shell, :stdin, or :iopub. MSG is a list as returned by +`jupyter-recv'." + (let (messages) + (dolist (channel jupyter-channel-ioloop-channels) + (with-slots (type socket) channel + (when (zmq-assoc socket events) + (push (cons type (jupyter-recv channel)) messages)))) + (when messages + ;; Send messages + (mapc (lambda (msg) (prin1 (cons 'message msg))) (nreverse messages)) + (zmq-flush 'stdout)))) + +(provide 'jupyter-zmq-channel-ioloop) + +;;; jupyter-zmq-channel-ioloop.el ends here diff --git a/jupyter-zmq-channel.el b/jupyter-zmq-channel.el new file mode 100644 index 0000000..0cdd1ac --- /dev/null +++ b/jupyter-zmq-channel.el @@ -0,0 +1,248 @@ +;;; jupyter-zmq-channel.el --- A Jupyter channel implementation using ZMQ sockets -*- lexical-binding: t -*- + +;; Copyright (C) 2019-2020 Nathaniel Nicandro + +;; Author: Nathaniel Nicandro +;; Created: 27 Jun 2019 + +;; This program is free software; you can redistribute it and/or +;; modify it under the terms of the GNU General Public License as +;; published by the Free Software Foundation; either version 3, or (at +;; your option) any later version. + +;; This program is distributed in the hope that it will be useful, but +;; WITHOUT ANY WARRANTY; without even the implied warranty of +;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +;; General Public License for more details. + +;; You should have received a copy of the GNU General Public License +;; along with GNU Emacs; see the file COPYING. If not, write to the +;; Free Software Foundation, Inc., 59 Temple Place - Suite 330, +;; Boston, MA 02111-1307, USA. + +;;; Commentary: + +;; Implements synchronous channel types using ZMQ sockets. Each channel is +;; essentially a wrapper around a `zmq-socket' constrained to a socket type by +;; the type of the channel and with an associated `zmq-IDENTITY' obtained from +;; the `jupyter-session' that must be associated with the channel. A heartbeat +;; channel is distinct from the other channels in that it is implemented using +;; a timer which periodically pings the kernel depending on how its configured. +;; In order for communication to occur on the other channels, one of +;; `jupyter-send' or `jupyter-recv' must be called after starting the channel +;; with `jupyter-start'. + +;;; Code: + +(require 'jupyter-messages) +(require 'zmq) +(require 'jupyter-channel) +(eval-when-compile (require 'subr-x)) + +(declare-function jupyter-ioloop-poller-remove "jupyter-ioloop") +(declare-function jupyter-ioloop-poller-add "jupyter-ioloop") + +(defconst jupyter-socket-types + (list :hb zmq-REQ + :shell zmq-DEALER + :iopub zmq-SUB + :stdin zmq-DEALER + :control zmq-DEALER) + "The socket types for the various channels used by `jupyter'.") + +(defclass jupyter-zmq-channel (jupyter-channel) + ((socket + :type (or null zmq-socket) + :initform nil + :documentation "The socket used for communicating with the kernel."))) + +(defun jupyter-connect-endpoint (type endpoint &optional identity) + "Create socket with TYPE and connect to ENDPOINT. +If IDENTITY is non-nil, it will be set as the ROUTING-ID of the +socket. Return the created socket." + (let ((sock (zmq-socket (zmq-current-context) type))) + (prog1 sock + (zmq-socket-set sock zmq-LINGER 1000) + (when identity + (zmq-socket-set sock zmq-ROUTING-ID identity)) + (zmq-connect sock endpoint)))) + +(defun jupyter-connect-channel (ctype endpoint &optional identity) + "Create a socket based on a Jupyter channel type. +CTYPE is one of the symbols `:hb', `:stdin', `:shell', +`:control', or `:iopub' and represents the type of channel to +connect to ENDPOINT. If IDENTITY is non-nil, it will be set as +the ROUTING-ID of the socket. Return the created socket." + (let ((sock-type (plist-get jupyter-socket-types ctype))) + (unless sock-type + (error "Invalid channel type (%s)" ctype)) + (jupyter-connect-endpoint sock-type endpoint identity))) + +(cl-defmethod jupyter-start ((channel jupyter-zmq-channel) + &key (identity (jupyter-session-id + (oref channel session)))) + (unless (jupyter-alive-p channel) + (let ((socket (jupyter-connect-channel + (oref channel type) (oref channel endpoint) identity))) + (oset channel socket socket) + (cl-case (oref channel type) + (:iopub + (zmq-socket-set socket zmq-SUBSCRIBE "")))) + (when (and (functionp 'jupyter-ioloop-environment-p) + (jupyter-ioloop-environment-p)) + (jupyter-ioloop-poller-add (oref channel socket) zmq-POLLIN)))) + +(cl-defmethod jupyter-stop ((channel jupyter-zmq-channel)) + (when (jupyter-alive-p channel) + (when (and (functionp 'jupyter-ioloop-environment-p) + (jupyter-ioloop-environment-p)) + (jupyter-ioloop-poller-remove (oref channel socket))) + (with-slots (socket) channel + (zmq-disconnect socket (zmq-socket-get socket zmq-LAST-ENDPOINT))) + (oset channel socket nil))) + +(cl-defmethod jupyter-alive-p ((channel jupyter-zmq-channel)) + (not (null (oref channel socket)))) + +(cl-defmethod jupyter-send ((channel jupyter-zmq-channel) type message &optional msg-id) + "Send a message on a ZMQ based Jupyter channel. +CHANNEL is the channel to send MESSAGE on. TYPE is a Jupyter +message type, like :kernel-info-request. Return the message ID +of the sent message." + (cl-destructuring-bind (id . msg) + (jupyter-encode-message (oref channel session) type + :msg-id msg-id + :content message) + (prog1 id + (zmq-send-multipart (oref channel socket) msg)))) + +(cl-defmethod jupyter-recv ((channel jupyter-zmq-channel) &optional dont-wait) + "Receive a message on CHANNEL. +Return a cons cell (IDENTS . MSG) where IDENTS are the ZMQ +message identities, as a list, and MSG is the received message. + +If DONT-WAIT is non-nil, return immediately without waiting for a +message if one isn't already available." + (condition-case nil + (let ((session (oref channel session)) + (msg (zmq-recv-multipart (oref channel socket) + (and dont-wait zmq-DONTWAIT)))) + (when msg + (cl-destructuring-bind (idents . parts) + (jupyter--split-identities msg) + (cons idents (jupyter-decode-message session parts))))) + (zmq-EAGAIN nil))) + +;;; Heartbeat channel + +(defvar jupyter-hb-max-failures 3 + "Number of heartbeat failures until the kernel is considered unreachable. +A ping is sent to the kernel on a heartbeat channel and waits +until `time-to-dead' seconds to see if the kernel sent a ping +back. If the kernel doesn't send a ping back after +`jupyter-hb-max-failures', the callback associated with the +heartbeat channel is called. See `jupyter-hb-on-kernel-dead'.") + +(defclass jupyter-hb-channel (jupyter-zmq-channel) + ((type + :type keyword + :initform :hb + :documentation "The type of this channel is `:hb'.") + (time-to-dead + :type number + :initform 10 + :documentation "The time in seconds to wait for a response +from the kernel until the connection is assumed to be dead. Note +that this slot only takes effect when starting the channel.") + (dead-cb + :type function + :initform #'ignore + :documentation "A callback function that takes 0 arguments +and is called when the kernel has not responded for +\(* `jupyter-hb-max-failures' `time-to-dead'\) seconds.") + (beating + :type (or boolean symbol) + :initform t + :documentation "A flag variable indicating that the heartbeat +channel is communicating with the kernel.") + (paused + :type boolean + :initform t + :documentation "A flag variable indicating that the heartbeat +channel is paused and not communicating with the kernel. To +pause the heartbeat channel use `jupyter-hb-pause', to unpause +use `jupyter-hb-unpause'.")) + :documentation "A base class for heartbeat channels.") + +(cl-defmethod jupyter-alive-p ((channel jupyter-hb-channel)) + "Return non-nil if CHANNEL is alive." + (zmq-socket-p (oref channel socket))) + +(defun jupyter-hb--pingable-p (channel) + (and (not (oref channel paused)) + (jupyter-alive-p channel))) + +(cl-defmethod jupyter-hb-beating-p ((channel jupyter-hb-channel)) + "Return non-nil if CHANNEL is reachable." + (and (jupyter-hb--pingable-p channel) + (oref channel beating))) + +(cl-defmethod jupyter-hb-pause ((channel jupyter-hb-channel)) + "Pause checking for heartbeat events on CHANNEL." + (oset channel paused t)) + +(cl-defmethod jupyter-hb-unpause ((channel jupyter-hb-channel)) + "Un-pause checking for heatbeat events on CHANNEL." + (when (oref channel paused) + (if (jupyter-alive-p channel) + ;; Consume a pending message from the kernel if there is one. We send a + ;; ping and then schedule a timer which fires TIME-TO-DEAD seconds + ;; later to receive the ping back from the kernel and start the process + ;; all over again. If the channel is paused before TIME-TO-DEAD + ;; seconds, there may still be a ping from the kernel waiting. + (ignore-errors (zmq-recv (oref channel socket) zmq-DONTWAIT)) + (jupyter-start channel)) + (oset channel paused nil) + (jupyter-hb--send-ping channel))) + +(cl-defmethod jupyter-hb-on-kernel-dead ((channel jupyter-hb-channel) fun) + "When the kernel connected to CHANNEL dies, call FUN. +A kernel is considered dead when CHANNEL does not receive a +response after \(* `jupyter-hb-max-failures' `time-to-dead'\) +seconds has elapsed without the kernel sending a ping back." + (declare (indent 1)) + (oset channel dead-cb fun)) + +(defun jupyter-hb--send-ping (channel &optional failed-count) + (when (jupyter-hb--pingable-p channel) + (condition-case nil + (progn + (zmq-send (oref channel socket) "ping") + (run-with-timer + (oref channel time-to-dead) nil + (lambda () + (when-let* ((sock (and (jupyter-hb--pingable-p channel) + (oref channel socket)))) + (oset channel beating + (condition-case nil + (and (zmq-recv sock zmq-DONTWAIT) t) + ((zmq-EINTR zmq-EAGAIN) nil))) + (if (oref channel beating) + (jupyter-hb--send-ping channel) + ;; Reset the socket + (jupyter-stop channel) + (jupyter-start channel) + (or failed-count (setq failed-count 0)) + (if (< failed-count jupyter-hb-max-failures) + (jupyter-hb--send-ping channel (1+ failed-count)) + (oset channel paused t) + (when (functionp (oref channel dead-cb)) + (funcall (oref channel dead-cb))))))))) + ;; FIXME: Should be a part of `jupyter-hb--pingable-p' + (zmq-ENOTSOCK + (jupyter-hb-pause channel) + (oset channel socket nil))))) + +(provide 'jupyter-zmq-channel) + +;;; jupyter-zmq-channel.el ends here