mirror of
synced 2025-03-05 10:01:39 -05:00
beef up the api to allow for process supervision
This commit is contained in:
2 changed files with 211 additions and 26 deletions
@ -2,6 +2,7 @@
(:require [stream.commander.systemd :as sys]
(:require [stream.commander.systemd :as sys]
[stream.commander.journal :as journal]
[stream.commander.journal :as journal]
[clojure.string :as string]
[clojure.string :as string]
[clojure.core.match :refer [match]]
[taoensso.timbre :as timbre
[taoensso.timbre :as timbre
:refer [log trace debug info warn error fatal report
:refer [log trace debug info warn error fatal report
logf tracef debugf infof warnf errorf fatalf reportf
logf tracef debugf infof warnf errorf fatalf reportf
@ -20,9 +21,29 @@
;; TODO: sanitize custom flags
;; TODO: sanitize custom flags
(defrecord process
(defrecord process
[id process-name unit-name monitor ffmpeg-config problems])
[id process-name unit-name monitor supervisor ffmpeg-config problems])
(def ^:private processes (ref {}))
(def ^:private processes (ref {}))
(def ^:private dbus-monitor (chan))
(def ^:private master-monitor (chan))
(def monitor (a/pub master-monitor :type))
;; TODO: load from config
(def default-ffmpeg-config
{:ffmpeg-path "/usr/bin/ffmpeg" :rtsp-transport "tcp" :custom-flags ""
:audio-bitrate "128k" :audio-channels 1 :audio-sampling-rate 44100
:rtsp-user nil :rtsp-password nil :buffer-size "100M"})
; Utilities ;
(defn- input-string
[user pass ip port profile]
(str "rtsp://"
(if (and user pass) (str user ":" pass "@"))
ip ":" port "/" profile))
(defn- generate-process-id
(defn- generate-process-id
"Generates a guaranteed unique ID for the process."
"Generates a guaranteed unique ID for the process."
@ -33,17 +54,127 @@
;; TODO: load from config
(def default-ffmpeg-config
; Monitoring ;
{:ffmpeg-path "/usr/bin/ffmpeg" :rtsp-transport "tcp" :custom-flags ""
:audio-bitrate "128k" :audio-channels 1 :audio-sampling-rate 44100
:rtsp-user nil :rtsp-password nil :buffer-size "100M"})
(defn- input-string
(defn- parse-dbus-event
[user pass ip port profile]
"Parses a dbus state `update` into an event."
(str "rtsp://"
[update process]
(if (and user pass) (str user ":" pass "@"))
(condp = (:ActiveState update)
ip ":" port "/" profile))
"failed" {:event :failed
;; we take the last message because the journal does not
;; distinct between message types from the executable
:error (:message
(:unit-name process)
:number 1
:executable (:ffmpeg-path (:ffmpeg-config process)))))}
"active" {:event :started}
(:event :unknown)))
(defn- put-process-event!
[id data]
(a/put! master-monitor {:type :process-event
:id id
:data data}))
(defn- handle-monitor-event!
"Handles a service monitor event in the supervisor thread."
[id event queue]
(let [{type :type
data :data} event]
(condp = type
(let [parsed (parse-dbus-event data (get @processes id))]
(trace "Dbus Event" id parsed)
(put-process-event! id parsed)
(reduce (fn [queue element]
(println (:event parsed) (:wait-for element))
(if (= (:event parsed) (:wait-for element))
(trace id
"Delivering event:" parsed)
(deliver (:promise element) parsed)
(conj queue element))) [] queue))
(defn- handle-control-event!
"Handles a control message in the supervisor thread."
[event queue]
(condp = (:command event)
(do (trace "Waiting for" event)
(conj queue
{:timeout (timeout (:timeout event))
:wait-for (:event event)
:promise (:promise event)}))
(defn- handle-timeout-event!
"Handles a timed-out waiter in the supervisor thread.
Removes that waiter from the queue and resolves its promise with a
timeout message."
[channel queue]
(trace "Handling timeout.")
(fn [[queue found] element]
(if (and (not found) (= (:timeout element) channel))
(do (deliver (:promise element)
[queue true])
[(conj queue element) found]))
[[] false] queue)))
;; TODO: specs
;; TODO: document event types
(defn- attach-supervisor!
"Attaches a monitor consumer to the monitor channel of a process.
It processes all status messages from the process and forwards them
to the main monitor if necessary."
[id monitor]
(let [control (chan)]
(trace "Monitoring" id)
(go (a/into [] monitor)) ; flush the channel
(loop [queue []]
(println queue)
(let [[event channel]
(alts!! (conj (map :timeout queue) monitor control))]
(condp = channel
(recur (handle-monitor-event! id event queue))
(if event
(recur (handle-control-event! event queue))
(do ; channel closed -> stopp
(trace "Stopping monitoring of " id)
(put-process-event! id {:event :monitor-deleted})
(doseq [waiter queue]
(deliver (:promise waiter) :timeout))))
(recur (handle-timeout-event! channel queue))))))
(defn wait-for!
"Installs a waiter in the supervisor thread for a process.
Takes the `supervisor` channel, the `event` keyword and a `timeout` (in msec)
to wait for. Returns a promise."
[supervisor event timeout]
(let [prom (promise)]
(a/put! supervisor
{:command :wait-for :event event :promise prom :timeout timeout})
; Process Management ;
(defn ffmpeg-command
(defn ffmpeg-command
"Generate the ffmpeg command from the config."
"Generate the ffmpeg command from the config."
@ -62,7 +193,7 @@
buffer-size]} (merge default-ffmpeg-config config)]
buffer-size]} config]
(str ffmpeg-path
(str ffmpeg-path
" -rtsp_transport " rtsp-transport
" -rtsp_transport " rtsp-transport
" -i " (input-string rtsp-user rtsp-password cam-ip cam-rtsp-port profile)
" -i " (input-string rtsp-user rtsp-password cam-ip cam-rtsp-port profile)
@ -70,7 +201,8 @@
" -ac " audio-channels
" -ac " audio-channels
" -ar " audio-sampling-rate
" -ar " audio-sampling-rate
" -f flv -bufsize " buffer-size
" -f flv -bufsize " buffer-size
" -tune film " stream-server "/" stream-key)))
" -tune film " stream-server "/" stream-key
" -loglevel error")))
(defn- sanitize-process-name [name]
(defn- sanitize-process-name [name]
(string/lower-case (string/replace name #"\W" "-")))
(string/lower-case (string/replace name #"\W" "-")))
@ -80,6 +212,7 @@
channel to it. Returns the process."
channel to it. Returns the process."
[process-name ffmpeg-config]
[process-name ffmpeg-config]
(let [id (generate-process-id)
(let [id (generate-process-id)
ffmpeg-config (merge default-ffmpeg-config ffmpeg-config)
unit-name (str (sanitize-process-name process-name)
unit-name (str (sanitize-process-name process-name)
"-" id)
"-" id)
path (sys/create-service! unit-name
path (sys/create-service! unit-name
@ -87,35 +220,87 @@
(get ffmpeg-config :description
(get ffmpeg-config :description
"FFMPEG streaming process, created by `stream`."))
"FFMPEG streaming process, created by `stream`."))
monitor (sys/create-monitor! unit-name)
monitor (sys/create-monitor! unit-name)
process (->process id process-name unit-name monitor ffmpeg-config #{})]
supervisor (attach-supervisor! id (first monitor))
process (->process id process-name unit-name monitor supervisor ffmpeg-config #{})]
(debug "Creating process with ID:" id)
(debug "Creating process with ID:" id)
(commute processes assoc id process))
(commute processes assoc id process))
(defn get-process!
"Get the process with the id."
(get @processes id))
;; TODO: defmulti
(defmacro with-process
"A wrapper to access a process either by id or by the process itself."
[proc proc-var & body]
`(if-let [~proc-var
(string? proc)
(get-process! ~proc)
(instance? process proc)
:default proc)]
(do ~@body)
(defn delete-process!
(defn delete-process!
"Deletes a process from the process map, stops it and deletes the unit file.
"Deletes a process from the process map, stops it and deletes the unit file.
Returns `true` on success, `false` otherwise."
Returns `true` on success, `false` otherwise."
(debug "Removing process with ID:" id)
(with-process process proc
(if-let [proc (get @processes id)]
(debug "Removing process with ID:" (:id proc))
(let [{:keys [unit-name monitor]} proc
(let [{:keys [unit-name monitor]} proc
[_ close] monitor]
[monitor close] monitor]
(close! (:supervisor proc))
(sys/remove-service! unit-name)
(sys/remove-service! unit-name)
(dosync (commute processes dissoc id))
(dosync (commute processes dissoc (:id process)))
(defn delete-all-processes! []
(defn delete-all-processes! []
"Deletes all processes."
"Deletes all processes."
(doseq [[id _] @processes]
(doseq [[id _] @processes]
(delete-process! id)))
(delete-process! id)))
(defn get-process!
(defn get-process-state!
"Get the process with the id."
"Queries wether a process is running."
(get @processes id))
(with-process process proc
(sys/get-service-state! (:unit-name proc))))
(defn start-process!
"Starts the service associated to the process."
(with-process process proc
(sys/start-service! (:unit-name proc))))
(defn stop-process!
"Stops the service associated to the process."
(with-process process proc
(sys/stop-service! (:unit-name proc))))
(defn process-running?
"Queries wether a process is running."
(= (get-process-state! process) "active"))
(defn enable-process!
"Enables a process."
(with-process process proc
(sys/enable-service! (:unit-name proc))))
(defn process-enabled?
"Enables a process."
(with-process process proc
(= (sys/get-service-file-state! (:unit-name proc))
; Init ;
; Init ;
@ -84,7 +84,7 @@
(systemd/create-service! name script "test service")
(systemd/create-service! name script "test service")
(is (= :loaded (systemd/get-service-load-state! name))))
(is (= :loaded (systemd/get-service-load-state! name))))
(let [[channel close] (systemd/create-monitor! name)]
(let [[channel close] (systemd/create-monitor! name "0")]
(testing "detecting activity"
(testing "detecting activity"
(systemd/restart-service! name)
(systemd/restart-service! name)
(is (a/<!! channel) "No value in channel!")
(is (a/<!! channel) "No value in channel!")
Add table
Reference in a new issue