Support single websocket with multiple channels

With IPython 3.0 we now support multiplexed communication over a single
websocket. At the moment iopub and shell are supported; stdin doesn't
seem to be working yet (not sure if it needs to be, though).
This commit is contained in:
John Miller 2015-01-14 17:07:28 -06:00
parent e5728799ef
commit b1f4c017e7
2 changed files with 95 additions and 60 deletions

View file

@ -51,6 +51,7 @@
kernel-id
shell-channel
iopub-channel
channels ; For IPython 3.x+
base-url ; /api/kernels/
kernel-url ; /api/kernels/<KERNEL-ID>
ws-url ; ws://<URL>[:<PORT>]
@ -82,6 +83,7 @@
:api-version (or api-version 2)
:session-id (ein:utils-uuid)
:kernel-id nil
:channels nil
:shell-channel nil
:iopub-channel nil
:base-url base-url
@ -159,16 +161,21 @@
(setf (ein:$kernel-kernel-url kernel)
(concat (ein:$kernel-base-url kernel) "/" id)))
(ein:kernel-start-channels kernel)
(let ((shell-channel (ein:$kernel-shell-channel kernel))
(iopub-channel (ein:$kernel-iopub-channel kernel)))
;; FIXME: get rid of lexical-let
(if (= (ein:$kernel-api-version kernel) 2)
(let ((shell-channel (ein:$kernel-shell-channel kernel))
(iopub-channel (ein:$kernel-iopub-channel kernel)))
;; FIXME: get rid of lexical-let
(lexical-let ((kernel kernel))
(setf (ein:$websocket-onmessage shell-channel)
(lambda (packet)
(ein:kernel--handle-shell-reply kernel packet)))
(setf (ein:$websocket-onmessage iopub-channel)
(lambda (packet)
(ein:kernel--handle-iopub-reply kernel packet)))))
(lexical-let ((kernel kernel))
(setf (ein:$websocket-onmessage shell-channel)
(setf (ein:$websocket-onmessage (ein:$kernel-channels kernel))
(lambda (packet)
(ein:kernel--handle-shell-reply kernel packet)))
(setf (ein:$websocket-onmessage iopub-channel)
(lambda (packet)
(ein:kernel--handle-iopub-reply kernel packet)))))))
(ein:kernel--handle-channels-reply kernel packet)))))))
(defun ein:kernel--ws-url (kernel ws_url)
@ -211,60 +218,74 @@ See: https://github.com/ipython/ipython/pull/3307"
;; Use "event-was-clean" when it is implemented in websocket.el.
(ein:kernel--websocket-closed kernel ws-url early)))))
(defun ein:kernel-start-channels (kernel)
(ein:kernel-stop-channels kernel)
(let* ((api-version (ein:$kernel-api-version kernel))
(ws-url (concat (ein:$kernel-ws-url kernel)
(defun ein:start-channels-multiple-websocket (kernel)
"Start kernel channels for IPython notebook v2.x"
(let* ((ws-url (concat (ein:$kernel-ws-url kernel)
(ein:$kernel-kernel-url kernel)))
(shell-session-url (concat ws-url "/shell?session_id="
(ein:$kernel-session-id kernel)))
(iopub-session-url (concat ws-url "/iopub?session_id="
(ein:$kernel-session-id kernel)))
(onclose-arg (list :ws-url ws-url
:already-called-onclose nil
:early t)))
(ein:log 'info "Starting session WS: %S" shell-session-url)
(ein:log 'info "Starting iopub WS: %S" iopub-session-url)
(setf (ein:$kernel-shell-channel kernel)
(cond ((= api-version 3)
(ein:websocket shell-session-url))
(t
(ein:websocket (concat ws-url "/shell")))))
(setf (ein:$kernel-iopub-channel kernel)
(cond ((= api-version 3)
(ein:websocket iopub-session-url))
(t
(ein:websocket (concat ws-url "/iopub")))))
(ein:log 'info "Starting WS channels: %S" ws-url)
(setf (ein:$kernel-shell-channel kernel) (ein:websocket (concat ws-url "/shell")))
(setf (ein:$kernel-iopub-channel kernel) (ein:websocket (concat ws-url "/iopub")))
(loop for c in (list (ein:$kernel-shell-channel kernel)
(ein:$kernel-iopub-channel kernel))
do (setf (ein:$websocket-onclose-args c) (list kernel onclose-arg))
do (setf (ein:$websocket-onopen c)
(lexical-let ((channel c)
(kernel kernel)
(api-version api-version)
(host (let (url-or-port
(ein:$kernel-url-or-port kernel))
(if (stringp url-or-port)
url-or-port
ein:url-localhost))))
(lambda ()
(cond ((= api-version 2)
(ein:kernel-send-cookie channel host))
((= api-version 3)
(ein:kernel-connect-request kernel (list :kernel_connect_reply (cons 'ein:kernel-on-connect kernel))))
)
(ein:kernel-send-cookie channel host)
;; run `ein:$kernel-after-start-hook' if both
;; channels are ready.
(when (ein:kernel-live-p kernel)
(ein:kernel-run-after-start-hook kernel)))))
do (setf (ein:$websocket-onclose c)
#'ein:kernel--ws-closed-callback))
#'ein:kernel--ws-closed-callback))))
(defun ein:start-channels-single-websocket (kernel)
(let* ((ws-url (concat (ein:$kernel-ws-url kernel)
(ein:$kernel-kernel-url kernel)))
(channels-url (concat ws-url "/channels?session_id="
(ein:$kernel-session-id kernel)))
(onclose-arg (list :ws-url ws-url
:already-called-onclose nil
:early t)))
(ein:log 'info "Starting channels WS: %S" channels-url)
(setf (ein:$kernel-channels kernel) (ein:websocket channels-url))
(let ((c (ein:$kernel-channels kernel)))
(setf (ein:$websocket-onclose-args c) (list kernel onclose-arg))
(setf (ein:$websocket-onopen c)
(lexical-let ((kernel kernel))
(lambda ()
(ein:kernel-connect-request kernel (list :kernel_connect_reply (cons 'ein:kernel-on-connect kernel)))
;; run `ein:$kernel-after-start-hook' if both
;; channels are ready.
(when (ein:kernel-live-p kernel)
(ein:kernel-run-after-start-hook kernel)))))
(setf (ein:$websocket-onclose c)
#'ein:kernel--ws-closed-callback))))
(defun ein:kernel-start-channels (kernel)
(ein:kernel-stop-channels kernel)
(let* ((api-version (ein:$kernel-api-version kernel))
(ws-url (concat (ein:$kernel-ws-url kernel)
(ein:$kernel-kernel-url kernel)))
(onclose-arg (list :ws-url ws-url
:already-called-onclose nil
:early t)))
(cond ((= api-version 2)
(ein:start-channels-multiple-websocket kernel))
((= api-version 3)
(ein:start-channels-single-websocket kernel)))
;; switch from early-close to late-close message after 1s
(run-at-time
1 nil
2 nil
(lambda (onclose-arg)
(plist-put onclose-arg :early nil)
(ein:log 'debug "(via run-at-time) onclose-arg changed to: %S"
@ -284,6 +305,10 @@ See: https://github.com/ipython/ipython/pull/3307"
(defun ein:kernel-stop-channels (kernel)
(when (ein:$kernel-channels kernel)
(setf (ein:$websocket-onclose (ein:$kernel-channels kernel)) nil)
(ein:websocket-close (ein:$kernel-channels kernel))
(setf (ein:$kernel-channels kernel) nil))
(when (ein:$kernel-shell-channel kernel)
(setf (ein:$websocket-onclose (ein:$kernel-shell-channel kernel)) nil)
(ein:websocket-close (ein:$kernel-shell-channel kernel))
@ -297,8 +322,11 @@ See: https://github.com/ipython/ipython/pull/3307"
(defun ein:kernel-live-p (kernel)
(and
(ein:$kernel-p kernel)
(ein:aand (ein:$kernel-shell-channel kernel) (ein:websocket-open-p it))
(ein:aand (ein:$kernel-iopub-channel kernel) (ein:websocket-open-p it))))
(or
(ein:aand (ein:$kernel-channels kernel) (ein:websocket-open-p it))
(and
(ein:aand (ein:$kernel-shell-channel kernel) (ein:websocket-open-p it))
(ein:aand (ein:$kernel-iopub-channel kernel) (ein:websocket-open-p it))))))
(defmacro ein:kernel-if-ready (kernel &rest body)
@ -339,9 +367,7 @@ http://ipython.org/ipython-doc/dev/development/messaging.html#object-information
(let* ((content (list :oname (format "%s" objname)))
(msg (ein:kernel--get-msg kernel "object_info_request" content))
(msg-id (plist-get (plist-get msg :header) :msg_id)))
(ein:websocket-send
(ein:$kernel-shell-channel kernel)
(json-encode msg))
(ein:websocket-send-shell-channel kernel msg)
(ein:kernel-set-callbacks-for-msg kernel msg-id callbacks)
msg-id)))
@ -420,9 +446,7 @@ Sample implementations
:allow_stdin allow-stdin))
(msg (ein:kernel--get-msg kernel "execute_request" content))
(msg-id (plist-get (plist-get msg :header) :msg_id)))
(ein:websocket-send
(ein:$kernel-shell-channel kernel)
(json-encode msg))
(ein:websocket-send-shell-channel kernel msg)
(unless (plist-get callbacks :execute_reply)
(ein:log 'debug "code: %s" code))
(ein:kernel-set-callbacks-for-msg kernel msg-id callbacks)
@ -457,9 +481,7 @@ http://ipython.org/ipython-doc/dev/development/messaging.html#complete
:cursor_pos cursor-pos))
(msg (ein:kernel--get-msg kernel "complete_request" content))
(msg-id (plist-get (plist-get msg :header) :msg_id)))
(ein:websocket-send
(ein:$kernel-shell-channel kernel)
(json-encode msg))
(ein:websocket-send-shell-channel kernel msg)
(ein:kernel-set-callbacks-for-msg kernel msg-id callbacks)
msg-id))
@ -508,9 +530,7 @@ Relevant Python code:
:unique unique))
(msg (ein:kernel--get-msg kernel "history_request" content))
(msg-id (plist-get (plist-get msg :header) :msg_id)))
(ein:websocket-send
(ein:$kernel-shell-channel kernel)
(json-encode msg))
(ein:websocket-send-shell-channel kernel msg)
(ein:kernel-set-callbacks-for-msg kernel msg-id callbacks)
msg-id))
@ -536,12 +556,10 @@ Example::
(ein:get-kernel)
'(:kernel_connect_reply (message . \"CONTENT: %S\\nMETADATA: %S\")))
"
(assert (ein:kernel-live-p kernel) nil "connect_reply: Kernel is not active.")
;(assert (ein:kernel-live-p kernel) nil "connect_reply: Kernel is not active.")
(let* ((msg (ein:kernel--get-msg kernel "connect_request" (make-hash-table)))
(msg-id (plist-get (plist-get msg :header) :msg_id)))
(ein:websocket-send
(ein:$kernel-shell-channel kernel)
(json-encode msg))
(ein:websocket-send-shell-channel kernel msg)
(ein:kernel-set-callbacks-for-msg kernel msg-id callbacks)
msg-id))
@ -570,9 +588,7 @@ Example::
(assert (ein:kernel-live-p kernel) nil "kernel_info_reply: Kernel is not active.")
(let* ((msg (ein:kernel--get-msg kernel "kernel_info_request" nil))
(msg-id (plist-get (plist-get msg :header) :msg_id)))
(ein:websocket-send
(ein:$kernel-shell-channel kernel)
(json-encode msg))
(ein:websocket-send-shell-channel kernel msg)
(ein:kernel-set-callbacks-for-msg kernel msg-id callbacks)
msg-id))
@ -613,6 +629,15 @@ Example::
(defun ein:kernel-set-callbacks-for-msg (kernel msg-id callbacks)
(puthash msg-id callbacks (ein:$kernel-msg-callbacks kernel)))
(defun ein:kernel--handle-channels-reply (kernel packet)
(ein:log 'debug "KERNEL--HANDLE_CHANNELS-REPLY")
(let ((channel (plist-get (ein:json-read-from-string packet) :channel)))
(cond ((string-equal channel "iopub")
(ein:kernel--handle-iopub-reply kernel packet))
((string-equal channel "shell")
(ein:kernel--handle-shell-reply kernel packet))
(t (ein:log 'warn "Received reply from unkown channel %s" channel)))))
(defun ein:kernel--handle-shell-reply (kernel packet)
(ein:log 'debug "KERNEL--HANDLE-SHELL-REPLY")
(destructuring-bind

View file

@ -107,6 +107,16 @@
(websocket-close (ein:$websocket-ws websocket)))
(defun ein:websocket-send-shell-channel (kernel msg)
(cond ((= (ein:$kernel-api-version kernel) 2)
(ein:websocket-send
(ein:$kernel-shell-channel kernel)
(json-encode msg)))
((= (ein:$kernel-api-version kernel) 3)
(ein:websocket-send
(ein:$kernel-channels kernel)
(json-encode (plist-put msg :channel "shell"))))))
(provide 'ein-websocket)
;;; ein-websocket.el ends here