emacs-jupyter/jupyter-server-kernel.el
2023-02-13 20:22:50 -06:00

534 lines
22 KiB
EmacsLisp

;;; jupyter-server-kernel.el --- Working with kernels behind a Jupyter server -*- lexical-binding: t -*-
;; Copyright (C) 2020 Nathaniel Nicandro
;; Author: Nathaniel Nicandro <nathanielnicandro@gmail.com>
;; Created: 23 Apr 2020
;; This program is free software; you can redistribute it and/or
;; modify it under the terms of the GNU General Public License as
;; published by the Free Software Foundation; either version 3, or (at
;; your option) any later version.
;; This program is distributed in the hope that it will be useful, but
;; WITHOUT ANY WARRANTY; without even the implied warranty of
;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
;; General Public License for more details.
;; You should have received a copy of the GNU General Public License
;; along with GNU Emacs; see the file COPYING. If not, write to the
;; Free Software Foundation, Inc., 59 Temple Place - Suite 330,
;; Boston, MA 02111-1307, USA.
;;; Commentary:
;; Holds the definitions of `jupyter-server', which communicates with
;; a Jupyter server using the REST API, and `jupyter-server-kernel', a
;; representation of a kernel on a server.
;;; Code:
(require 'jupyter-kernel)
(require 'jupyter-rest-api)
(require 'jupyter-server-ioloop)
(require 'jupyter-monads)
(defgroup jupyter-server-kernel nil
"Kernel behind a Jupyter server"
:group 'jupyter)
;;; `jupyter-server'
(defvar-local jupyter-current-server nil
"The `jupyter-server' associated with the current buffer.
Used in, e.g. a `jupyter-server-kernel-list-mode' buffer.")
;; Keep the buffer-local value when the major mode of the buffer
;; changes.
(put 'jupyter-current-server 'permanent-local t)
;; Table of `jupyter-server' instances keyed by their :url value.  It
;; is consulted by the `make-instance' method of `jupyter-server' so
;; that an existing instance for a URL is reused.  Values are held
;; weakly.
(defvar jupyter--servers-1 (make-hash-table :weakness 'value :test #'equal))
;; Tracking symbol of the `jupyter-server' class.  Its value is what
;; `jupyter-servers' returns.
(defvar jupyter--servers nil)
;; TODO: We should really rename `jupyter-server' to something like
;; `jupyter-server-client' since it isn't a representation of a server, but a
;; communication channel with one.
;; A REST client for a Jupyter server whose instances are tracked.
(defclass jupyter-server (jupyter-rest-client eieio-instance-tracker)
;; Instances are recorded in the list held by `jupyter--servers'.
((tracking-symbol :initform 'jupyter--servers)
(conn :type jupyter-connection)
(handlers :type list :initform nil)
;; Cache filled in by `jupyter-server-kernelspecs'.
(kernelspecs
:type json-plist
:initform nil
:documentation "Kernelspecs for the kernels available behind this gateway.
Access should be done through `jupyter-available-kernelspecs'.")))
(cl-defmethod make-instance ((class (subclass jupyter-server)) &rest slots)
  "Return the `jupyter-server' for the :url in SLOTS, creating it if needed.
SLOTS must contain a non-nil :url.  When `jupyter--servers-1'
already holds a server for that URL it is returned as is,
otherwise a new instance is created from SLOTS, stored in the
table and returned."
  (cl-assert (plist-get slots :url))
  (let* ((url (plist-get slots :url))
         (existing (gethash url jupyter--servers-1)))
    (if existing
        existing
      (puthash url (cl-call-next-method) jupyter--servers-1))))
(defun jupyter-server-ioloop-io (ioloop)
"Return a delayed list of I/O objects for communicating with IOLOOP.
The list has the form (ACTION-SUB CHANNELS-PUB EVENT-PUB).

ACTION-SUB is a subscriber that accepts the symbols `start' and
`stop', or a list whose first element is `send' or `event'.  A
`send' list is passed whole to `jupyter-send', an `event' list is
passed without its first element.  Both start IOLOOP first when
it is not alive.

CHANNELS-PUB publishes the connect-channels and
disconnect-channels events received from IOLOOP.

EVENT-PUB publishes every other event received from IOLOOP."
(let* ((ids '())
(event-pub (jupyter-publisher))
(channels-pub (jupyter-publisher))
;; Called with each event received from IOLOOP, see the call to
;; `jupyter-ioloop-start' below.
(event-handler
(lambda (event)
(if (not (memq (car event)
'(connect-channels disconnect-channels)))
(jupyter-run-with-io event-pub
(jupyter-publish event))
;; Keep IDS in sync with the kernel IDs whose channels are
;; connected.
(let ((id (cadr event)))
(pcase (car event)
('connect-channels
(cl-pushnew id ids :test #'string=))
('disconnect-channels
(cl-callf2 delete id ids))))
;; Notify subscribers of CHANNELS-PUB that the connected
;; kernels have changed.
(jupyter-run-with-io channels-pub
(jupyter-publish event)))))
;; Start IOLOOP when it is not alive and request a re-connection
;; of the channels recorded in IDS.  Always returns nil.
(start
(lambda ()
(unless (jupyter-ioloop-alive-p ioloop)
;; Write the cookies to file so that they can be read by
;; the subprocess.
(url-cookie-write-file)
(jupyter-ioloop-start ioloop event-handler)
(when ids
(let ((head ids))
;; Reset IDS since it will be updated after the
;; channels have been re-connected.
(setq ids nil)
(while head
(jupyter-send ioloop 'connect-channels (pop head))))))
nil))
(action-sub
(jupyter-subscriber
(lambda (action)
;; ACTION is either a bare symbol or a list headed by one.
(pcase (if (listp action) (car action) action)
('send
(funcall start)
(apply #'jupyter-send ioloop action))
('event
(funcall start)
(apply #'jupyter-send ioloop (cdr action)))
('start (funcall start))
('stop
(when (jupyter-ioloop-alive-p ioloop)
(jupyter-ioloop-stop ioloop))))))))
(jupyter-return-delayed
(list action-sub channels-pub event-pub))))
(defun jupyter-server-synced-channel-action-sub (action-sub channels-pub)
"Return a subscriber that connects/disconnects kernel channels.
The subscriber will wait until the channels have been
connected/disconnected before returning.
Content like '(connect-channels ID) or '(disconnect-channels ID)
can be submitted to connect or disconnect the WebSocket channels
of a kernel with ID, a string.
ACTION-SUB is a subscriber of content like
'(event connect-channels ID)
and does the actual connecting/disconnecting of kernel channels.
CHANNELS-PUB is a publisher of the status changes of a kernel's
channels and publishes content like '(connect-channels ID) when
the corresponding action has been completed."
(jupyter-subscriber
(lambda (content)
;; Only accept (connect-channels ID) or (disconnect-channels ID)
;; with ID a string.
(unless (and (memq (car-safe content)
'(connect-channels disconnect-channels))
(stringp (car (cdr-safe content))))
(error "Unknown value: %s" content))
(pcase-let ((`(,action ,id) content)
(done nil))
;; Subscribe to CHANNELS-PUB before requesting the action below
;; so that the completion event cannot be missed.
(jupyter-run-with-io channels-pub
;; TODO: (subscribe (subscriber ...)) -> (subscribe ...)
;;
;; Need to make a publisher struct type to distinguish
;; between publisher functions and regular functions
;; first.
(jupyter-subscribe
(jupyter-subscriber
(lambda (event)
;; Flag completion once the same action for the same ID is
;; published, then stop listening.
(when (and (eq (car event) action)
(string= id (cadr event)))
(setq done t)
(jupyter-unsubscribe))))))
;; Request the action.
(jupyter-run-with-io action-sub
(jupyter-publish (list 'event action id)))
;; TODO: Synchronization I/O actions?
;;
;; (jupyter-with-io pub
;; (jupyter-wait (lambda () cond)))
;;
;; Presumably waits until DONE is non-nil and signals the error
;; below after `jupyter-default-timeout' -- TODO confirm against
;; the definition of `jupyter-with-timeout'.
(jupyter-with-timeout
(nil jupyter-default-timeout
(error "Timeout when %sconnecting server channels"
(if (eq action 'connect-channels) "" "dis")))
done)))))
;; TODO: Figure out how to refresh the connection with new
;; auth-headers.  I think it's just a matter of calling this function
;; again.  Due to the functional design, all references to the old
;; objects should get cleaned up.
(defun jupyter-server-io (server)
"Return a delayed list of I/O objects for communicating with SERVER.
The list has the form (ACTION-SUB CHANNEL-ACTION-SUB EVENT-PUB).

ACTION-SUB and EVENT-PUB are those returned by
`jupyter-server-ioloop-io' for a new `jupyter-server-ioloop'
created from the url, ws-url and authentication headers of
SERVER.

CHANNEL-ACTION-SUB is the subscriber returned by
`jupyter-server-synced-channel-action-sub'.

A finalizer that publishes `stop' to ACTION-SUB is added to
SERVER."
(let ((ioloop (jupyter-server-ioloop
:url (oref server url)
:ws-url (oref server ws-url)
:ws-headers (jupyter-api-auth-headers server))))
;; TODO: Another instance where it would be great for mlet* to
;; support `pcase' patterns. Or should it be the other way round?
;; Make a `pcase' macro for I/O values.
;;
;; (pcase (jupyter-server-ioloop-io ioloop)
;; ((jupyter-io `(,action-sub ,kernel-channels-pub ,ioloop-event-pub))
;; ...))
(jupyter-mlet* ((value (jupyter-server-ioloop-io ioloop)))
(pcase-let ((`(,action-sub ,channels-pub ,event-pub) value))
;; Stop the ioloop via ACTION-SUB when the finalizer runs.
(jupyter-add-finalizer server
(lambda ()
(jupyter-run-with-io action-sub
(jupyter-publish 'stop))))
(jupyter-return-delayed
(list action-sub
(jupyter-server-synced-channel-action-sub
action-sub channels-pub)
event-pub))))))
(cl-defmethod jupyter-io ((server jupyter-server))
"Return the I/O object of SERVER as computed by `jupyter-server-io'."
(jupyter-server-io server))
(defun jupyter-servers ()
"Return a list of all `jupyter-server's.
The list is the value of `jupyter--servers', the tracking symbol
of the `jupyter-server' class."
jupyter--servers)
(defun jupyter-gc-servers ()
  "Drop every `jupyter-server' that is no longer accessible at its host.
For each server in `jupyter-servers' for which
`jupyter-api-server-exists-p' returns nil: stop its I/O, delete
the cookies stored for its URL and delete the instance."
  (cl-loop
   for server in (jupyter-servers)
   unless (jupyter-api-server-exists-p server)
   do
   ;; TODO: Stopping a connection, stops all subordinate
   ;; connections and disconnects all subordinate clients.
   (jupyter-stop (jupyter-io server))
   (jupyter-api-delete-cookies (oref server url))
   (delete-instance server)))
(cl-defmethod jupyter-api-request :around ((server jupyter-server) _method &rest _plist)
  "Retry a request to SERVER once when it fails as unauthenticated.
When the request signals `jupyter-api-unauthenticated' and
`jupyter-api-authentication-method' is one of `ask', `token' or
`password', set the auth slot of SERVER to that method, repeat
the request and re-authenticate the websockets of SERVER.
Return the result of the repeated request.  With any other
authentication method signal an error instead."
  (condition-case nil
      (cl-call-next-method)
    (jupyter-api-unauthenticated
     (unless (memq jupyter-api-authentication-method '(ask token password))
       (error "Unauthenticated request, can't attempt re-authentication \
with default `jupyter-api-authentication-method'"))
     (oset server auth jupyter-api-authentication-method)
     (let ((response (cl-call-next-method)))
       (jupyter-reauthenticate-websockets server)
       response))))
(cl-defmethod jupyter-server-kernelspecs ((server jupyter-server) &optional refresh)
  "Return the kernelspecs available on SERVER.
The result is cached in the kernelspecs slot of SERVER.  The
server is only queried when the cache is empty or when REFRESH
is non-nil.

The kernelspecs are returned in the same form as returned by
`jupyter-available-kernelspecs'."
  (when (or refresh (not (oref server kernelspecs)))
    (let ((response (jupyter-api-get-kernelspec server))
          (converted nil))
      (unless response
        (error "Can't retrieve kernelspecs from server @ %s" (oref server url)))
      (oset server kernelspecs response)
      ;; Walk the NAME SPEC pairs of the response's :kernelspecs
      ;; property list, making a `jupyter-kernelspec' out of each
      ;; SPEC.
      (let ((tail (plist-get response :kernelspecs)))
        (while (consp tail)
          (let ((spec (cadr tail)))
            (push (make-jupyter-kernelspec
                   :name (plist-get spec :name)
                   :plist (plist-get spec :spec))
                  converted))
          (setq tail (cddr tail))))
      ;; Replace the raw property list in the cache by the list of
      ;; structs.
      (plist-put (oref server kernelspecs) :kernelspecs
                 (nreverse converted))))
  (plist-get (oref server kernelspecs) :kernelspecs))
(cl-defmethod jupyter-server-has-kernelspec-p ((server jupyter-server) name)
  "Return non-nil if a kernelspec matching NAME is available on SERVER.
The value is whatever `jupyter-guess-kernelspec' returns for NAME
and the kernelspecs of SERVER."
  (let ((available (jupyter-server-kernelspecs server)))
    (jupyter-guess-kernelspec name available)))
;;; Kernel definition
;; A `jupyter-kernel' that lives behind a `jupyter-server'.
(cl-defstruct (jupyter-server-kernel
(:include jupyter-kernel))
;; When no :server is given to the constructor, the slot defaults to
;; the value of `jupyter-current-server' at construction time.
(server jupyter-current-server
:read-only t
:documentation "The kernel server.")
;; TODO: Make this read only by only allowing creating
;; representations of kernels that have already been launched and
;; have a connection to the kernel.
(id nil
:type (or null string)
:documentation "The kernel ID."))
(cl-defmethod jupyter-alive-p ((kernel jupyter-server-kernel))
  "Return non-nil if KERNEL is alive on its server.
KERNEL must have both an ID and a server, the server must return
a kernel model for that ID, and the next method must also
consider KERNEL alive.

A `file-error' (non-existent server) or a 404 HTTP error while
asking for the model means KERNEL is not alive.  Any other
`jupyter-api-http-error' is signaled again."
  (let ((server (jupyter-server-kernel-server kernel))
        (id (jupyter-server-kernel-id kernel)))
    (when (and id server)
      (when
          ;; TODO: Cache this call
          (condition-case err
              (jupyter-api-get-kernel server id)
            ;; Non-existent server
            (file-error nil)
            (jupyter-api-http-error
             (if (= (nth 1 err) 404)
                 ;; Not Found
                 nil
               (signal (car err) (cdr err)))))
        (cl-call-next-method)))))
(defun jupyter-server-kernel (&rest args)
"Return a `jupyter-server-kernel' initialized with ARGS.
ARGS are passed unchanged to `make-jupyter-server-kernel'."
(apply #'make-jupyter-server-kernel args))
(cl-defmethod jupyter-kernel :extra "server" (&rest args)
  "Return a representation of a kernel on a Jupyter server.
When ARGS contains a non-nil :server, which must be a
`jupyter-server', return a `jupyter-server-kernel' initialized
with ARGS.  When the value of :spec in ARGS is a string, it is
first replaced by the matching `jupyter-kernelspec' of the server
and an error is signaled when there is no match.

Call the next method if ARGS does not contain :server."
  (let ((server (plist-get args :server)))
    (cond
     ((null server)
      (cl-call-next-method))
     (t
      (cl-assert (object-of-class-p server 'jupyter-server))
      (let ((spec (plist-get args :spec)))
        (when (stringp spec)
          ;; TODO: (jupyter-server-kernelspec server "python3")
          ;; which returns an I/O action and then arrange
          ;; for that action to be bound by mlet* and set
          ;; as the spec value.  Or better yet, have
          ;; `jupyter-kernel' return a delayed kernel with
          ;; the server connection already open and
          ;; kernelspecs already retrieved.
          (let ((match (jupyter-guess-kernelspec
                        spec (jupyter-server-kernelspecs server))))
            (unless match
              ;; TODO: Return the error to the I/O context.
              (error "No kernelspec matching %s @ %s" spec
                     (oref server url)))
            ;; :spec is already a key of ARGS here, so its value is
            ;; replaced in place.
            (plist-put args :spec match))))
      (apply #'jupyter-server-kernel args)))))
;;; Client connection
(defun jupyter-server-kernel-io (kernel)
  "Return a delayed list of I/O objects for KERNEL.
KERNEL is a `jupyter-server-kernel'.  The delayed value is a
list (KERNEL-IO DISCARD).

KERNEL-IO is a publisher subscribed to the event publisher of
KERNEL's server.  It handles the following content:

  (message ID . REST)  When ID is the ID of KERNEL, return
                       (jupyter-content REST).
  (unsubscribe ID)     When ID is the ID of KERNEL, unsubscribe
                       from the event publisher.
  (send . ARGS)        Publish (send ID . ARGS), ID being the ID
                       of KERNEL, to the action subscriber of
                       the server.

Any other content, e.g. a message of another kernel, is ignored.

DISCARD is a function of no arguments.  It unsubscribes
KERNEL-IO, disconnects the channels of KERNEL and makes any
further use of KERNEL-IO signal an error.

Before the list is returned, the channels of KERNEL are connected
on the server."
  ;; TODO: What about disconnecting channels?  Do that at a later
  ;; stage.
  (pcase-let (((cl-struct jupyter-server-kernel server id) kernel)
              (discarded nil))
    (jupyter-mlet* ((server-io (jupyter-io server)))
      (pcase-let*
          ((`(,action-sub ,channel-action ,event-pub) server-io)
           (kernel-io
            (jupyter-publisher
              (lambda (event)
                (if discarded
                    (error "Kernel I/O no longer available")
                  (pcase event
                    ((and `(message ,kid . ,rest)
                          (guard (string= kid id)))
                     (jupyter-content rest))
                    (`(unsubscribe ,kid)
                     (when (string= kid id)
                       (jupyter-unsubscribe)))
                    ;; This used to be a catch-all clause that
                    ;; referenced an unbound variable ARGS.  Bind ARGS
                    ;; to the arguments of the send request and only
                    ;; forward send requests.
                    ;; NOTE(review): assumes send requests look like
                    ;; (send CHANNEL MSG-TYPE CONTENT MSG-ID) -- confirm
                    ;; against the callers.
                    (`(send . ,args)
                     (jupyter-run-with-io action-sub
                       (jupyter-publish (cl-list* 'send id args))))))))))
        (jupyter-do
          (jupyter-with-io channel-action
            (jupyter-publish (list 'connect-channels id)))
          (jupyter-with-io event-pub
            (jupyter-subscribe kernel-io))
          (jupyter-return-delayed
            (list kernel-io
                  ;; TODO: Bring this, as an action, into kernel-io
                  (lambda ()
                    (jupyter-run-with-io event-pub
                      ;; TODO: How can this be avoided?
                      (jupyter-publish (list 'unsubscribe id)))
                    (jupyter-run-with-io channel-action
                      (jupyter-publish (list 'disconnect-channels id)))
                    (setq discarded t)))))))))
(cl-defmethod jupyter-io ((kernel jupyter-server-kernel))
"Return the I/O object of KERNEL as computed by `jupyter-server-kernel-io'."
(jupyter-server-kernel-io kernel))
;;; Websocket IO
;; Maps a `jupyter-server' (weak key, compared with `eq') to the list
;; of websockets created for kernels on that server.
(defvar jupyter-server-websockets (make-hash-table :weakness 'key :test 'eq))
(defun jupyter-reauthenticate-websockets (server)
  "Re-open the open websockets of SERVER with fresh authentication headers.
Every websocket recorded for SERVER in `jupyter-server-websockets'
that is still open is closed and replaced by a new websocket to
the same URL.  The new websocket uses the same protocols,
extensions, callbacks and client data as the old one and the
headers returned by `jupyter-api-auth-headers' for SERVER.
Websockets that are not open are dropped from the table.

Return the new list of websockets of SERVER."
  (let ((headers (jupyter-api-auth-headers server)))
    (setf (gethash server jupyter-server-websockets)
          (delq nil
                (mapcar
                 (lambda (ws)
                   (when (websocket-openp ws)
                     (websocket-close ws)
                     ;; `websocket-open' requires the URL as its first
                     ;; argument and returns the new websocket.  It is
                     ;; the new websocket that has to be recorded, not
                     ;; the one that was just closed.
                     (let ((new (websocket-open
                                 (websocket-url ws)
                                 :protocols (websocket-protocols ws)
                                 :extensions (websocket-extensions ws)
                                 :on-open (websocket-on-open ws)
                                 :on-message (websocket-on-message ws)
                                 :on-close (websocket-on-close ws)
                                 :on-error (websocket-on-error ws)
                                 :custom-header-alist headers)))
                       (setf (websocket-client-data new)
                             (websocket-client-data ws))
                       ;; NOTE(review): closures that captured the old
                       ;; websocket object still refer to it -- confirm
                       ;; whether they need to be updated as well.
                       new)))
                 (gethash server jupyter-server-websockets))))))
(defun jupyter--websocket-io (kernel)
"Create a websocket for KERNEL and return (WS MSG-PUB STATUS-PUB).
WS is the websocket returned by `jupyter-api-kernel-websocket'
for the server and ID of KERNEL.  It is also pushed onto the list
of websockets of the server in `jupyter-server-websockets'.

MSG-PUB publishes (message . MSG) for every text or binary frame
received on WS, MSG being the property list read from the
payload of the frame.

STATUS-PUB publishes (error OPCODE) for a frame with any other
OPCODE."
(pcase-let
(((cl-struct jupyter-server-kernel server id) kernel)
(msg-pub (jupyter-publisher))
(status-pub (jupyter-publisher)))
(let ((ws (jupyter-api-kernel-websocket
server id
:custom-header-alist (jupyter-api-auth-headers server)
;; TODO: on-error publishes to status-pub
:on-message (lambda (_ws frame)
(pcase (websocket-frame-opcode frame)
((or 'text 'binary)
(let* ((msg (jupyter-read-plist-from-string
(websocket-frame-payload frame)))
(header (plist-get msg :header))
(pheader (plist-get msg :parent_header)))
;; Convert the message types of the message, its header and
;; its parent header with `jupyter-message-type-as-keyword'.
;;
;; TODO: Remove the need for this by getting rid of
;; message type keywords.
(plist-put msg :msg_type (jupyter-message-type-as-keyword
(plist-get msg :msg_type)))
(plist-put header :msg_type (jupyter-message-type-as-keyword
(plist-get header :msg_type)))
(plist-put pheader :msg_type (jupyter-message-type-as-keyword
(plist-get pheader :msg_type)))
(jupyter-run-with-io msg-pub
(jupyter-publish (cons 'message msg)))))
(_
(jupyter-run-with-io status-pub
(jupyter-publish
(list 'error (websocket-frame-opcode frame))))))))))
;; Record WS so that `jupyter-reauthenticate-websockets' can find
;; it.
(push ws (gethash server jupyter-server-websockets))
(list
ws
msg-pub
status-pub))))
(cl-defmethod jupyter-websocket-io :around (thing)
  "Memoize the websocket I/O object of THING in `jupyter-io-cache'.
Return the cached object when there is one, otherwise compute it
with the next method, store it in the cache and return it."
  (let ((cached (gethash thing jupyter-io-cache)))
    (if cached
        cached
      (puthash thing (cl-call-next-method) jupyter-io-cache))))
(cl-defmethod jupyter-websocket-io ((kernel jupyter-server-kernel))
"Return a delayed I/O connection to KERNEL over a websocket.
KERNEL is launched first with `jupyter-do-launch'.

The delayed value is a list (KERNEL-IO nil).  KERNEL-IO is a
publisher subscribed to the publisher of Jupyter messages
received from the websocket.  It handles the following content:

  (message . REST)   Return (jupyter-content REST).
  (send CHANNEL MSG-TYPE CONTENT MSG-ID)
                     Encode the message and send it as text on
                     the websocket.
  start              Ensure that the websocket is connected.
  stop               Close the websocket.

NOTE(review): this docstring used to describe the return value as
a list (ACTION-SUB MSG-PUB STATUS-PUB), which is not what the
body returns.  The status publisher of the websocket is not used
here.

TODO The form of content each sends/consumes."
(jupyter-do-launch kernel)
(pcase-let*
((`(,ws ,msg-pub ,status-pub) (jupyter--websocket-io kernel))
;; Never set to non-nil at the moment, see the commented out
;; finalizer at the end.
(discarded nil)
(kernel-io
(jupyter-publisher
(lambda (event)
(if discarded
;; TODO: What to do here?
(error "Kernel I/O discarded")
(pcase event
(`(message . ,rest) (jupyter-content rest))
(`(send ,channel ,msg-type ,content ,msg-id)
(websocket-send-text
ws (jupyter-encode-raw-message
;; The session is read from the client data of the websocket.
(plist-get (websocket-client-data ws) :session) msg-type
:channel channel
:msg-id msg-id
:content content)))
('start (websocket-ensure-connected ws))
('stop (websocket-close ws))))))))
(jupyter-do
(jupyter-with-io msg-pub
(jupyter-subscribe kernel-io))
(jupyter-return-delayed
(list kernel-io
nil
;; (make-finalizer
;; (lambda ()
;; (websocket-close ws)
;; (message "DISCARDED")
;; (setq discarded t)))
)))))
(cl-defmethod jupyter-new-repl
  ((kernel jupyter-kernel)
   &optional repl-name associate-buffer client-class
   (display t))
  "Return a new REPL connected to KERNEL, which must be alive.
REPL-NAME, ASSOCIATE-BUFFER and DISPLAY are passed on to
`jupyter-bootstrap-repl'.  DISPLAY defaults to t.  CLIENT-CLASS
is the class of the client created for KERNEL.  It defaults to
`jupyter-repl-client' and is checked with
`jupyter-error-if-not-client-class-p'."
  (cl-assert (jupyter-alive-p kernel))
  (unless client-class
    (setq client-class 'jupyter-repl-client))
  (jupyter-error-if-not-client-class-p client-class 'jupyter-repl-client)
  (let ((client (jupyter-client kernel client-class)))
    (jupyter-bootstrap-repl client repl-name associate-buffer display)))
;;; Kernel management
;; The KERNEL argument is optional here so that `jupyter-do-launch'
;; does not require more than one argument just to handle this case.
(cl-defmethod jupyter-do-launch ((server jupyter-server) &optional kernel)
  "Launch a kernel on SERVER and return its `jupyter-server-kernel'.
KERNEL is a string naming the kernelspec to launch.  It is
matched against the kernelspecs of SERVER with
`jupyter-guess-kernelspec'.  Although KERNEL is optional in the
argument list, a `wrong-type-argument' error is signaled when it
is not a string.

The returned kernel has its server, ID and spec set."
  ;; An optional argument cannot carry a type specializer.  Written as
  ;; (kernel string), `string' would be evaluated as the default value
  ;; of KERNEL, so the type is only checked here.
  (cl-check-type kernel string)
  (let* ((spec (jupyter-guess-kernelspec
                kernel (jupyter-server-kernelspecs server)))
         (plist (jupyter-api-start-kernel
                 server (jupyter-kernelspec-name spec))))
    (jupyter-kernel :server server :id (plist-get plist :id) :spec spec)))
;; FIXME: Don't allow creating kernels without them being launched.
(cl-defmethod jupyter-do-launch ((kernel jupyter-server-kernel))
"Launch KERNEL based on its kernelspec.
When KERNEL does not have an ID yet, launch KERNEL on SERVER
using its SPEC."
(pcase-let (((cl-struct jupyter-server-kernel server id spec session) kernel))
;; A KERNEL that already has a session is left alone.
(unless session
;; Give `jupyter-server-kernel-id-from-name' a chance to replace ID,
;; falling back to ID itself when it returns nil.
;; NOTE(review): only the local variable ID is updated here, the
;; id slot of KERNEL keeps its original value -- confirm that this
;; is intended.
(and id (setq id (or (jupyter-server-kernel-id-from-name server id) id)))
(if id
;; When KERNEL already has an ID before it has a session,
;; assume we are connecting to an already launched kernel. In
;; this case, make sure the KERNEL's SPEC is the same as the
;; one being connected to.
;;
;; Note, this also has the side effect of raising an error
;; when the ID does not match one on the server.
(unless spec
(let ((model (jupyter-api-get-kernel server id)))
(setf (jupyter-kernel-spec kernel)
(jupyter-guess-kernelspec
(plist-get model :name)
(jupyter-server-kernelspecs server)))))
;; No ID, start a new kernel on the server from SPEC and record
;; the ID of the started kernel.
;; NOTE(review): SPEC is passed to `jupyter-kernelspec-name'
;; without a check for nil -- confirm callers always set it.
(let ((plist (jupyter-api-start-kernel
server (jupyter-kernelspec-name spec))))
(setf (jupyter-server-kernel-id kernel) (plist-get plist :id))
;; Wait for up to a second after the kernel was started.
(sit-for 1)))
;; TODO: Replace with the real session object
(setf (jupyter-kernel-session kernel) (jupyter-session))))
(cl-call-next-method))
(cl-defmethod jupyter-do-shutdown ((kernel jupyter-server-kernel))
  "Shut down KERNEL on its server.
The next method is called first.  Afterwards, when KERNEL had a
session before the next method was called, the server is asked
to shut down the kernel with the ID of KERNEL."
  ;; Read the slots before calling the next method so that the values
  ;; used below are the ones KERNEL had on entry.
  (let ((server (jupyter-server-kernel-server kernel))
        (id (jupyter-server-kernel-id kernel))
        (had-session (jupyter-kernel-session kernel)))
    (cl-call-next-method)
    (and had-session
         (jupyter-api-shutdown-kernel server id))))
(cl-defmethod jupyter-do-interrupt ((kernel jupyter-server-kernel))
  "Ask the server of KERNEL to interrupt the kernel with the ID of KERNEL."
  (jupyter-api-interrupt-kernel
   (jupyter-server-kernel-server kernel)
   (jupyter-server-kernel-id kernel)))
(provide 'jupyter-server-kernel)
;;; jupyter-server-kernel.el ends here