use homebrew process monitoring instead of systemd

This commit is contained in:
Valentin Boettcher 2020-08-14 18:00:53 +02:00
parent f0bc616919
commit ecce9b573b
7 changed files with 90 additions and 729 deletions

View file

@ -1,6 +1,5 @@
(ns stream.commander.api
(:require [stream.commander.systemd :as sys]
[stream.commander.journal :as journal]
(:require [stream.commander.processes :as procs]
[clojure.string :as string]
[clojure.java.io :as io]
[slingshot.slingshot :refer [throw+]]
@ -26,7 +25,9 @@
;; TODO: sanitize custom flags
(defrecord process
[id process-name unit-name monitor supervisor ffmpeg-config problems])
[id process-name monitor status control ffmpeg-config problems]
procs/ControllableProcess
(control [this] control))
(defn process=
"Tests `proc-1` and `proc-2` for equality."
@ -39,7 +40,12 @@
true)))
(def ^:private processes (ref {}))
(def ^:private dbus-monitor (chan))
(defn get-process!
"Get the process with the id."
[id]
(get @processes id))
(def ^:private master-monitor (chan))
(def monitor (a/pub master-monitor :type))
@ -58,6 +64,9 @@
"Path must exist, be writable, be readable."]}
processdb-directory "./processes")
(defconfig ^{:validate [integer? "Must be an integer."]}
default-buffer-size 50)
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
; Utilities ;
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
@ -117,162 +126,49 @@
; Monitoring ;
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(defn- parse-dbus-event
"Parses a dbus state `update` into an event."
[update id]
(condp = (:ActiveState update)
"failed" {:event :failed
:reason (keyword (:SubState update))
;; we take the last message because the journal does not
;; distinct between message types from the executable
:error
(let [process (get @processes id)]
(:message
(first
(journal/get-logs!
(:unit-name process)
:number 1
:executable (:ffmpeg-path (:ffmpeg-config process))))))}
"active" {:event :active :reason (keyword (:SubState update))}
"activating" {:event :activating :reason (keyword (:SubState update))}
"deactivating" {:event :deactivating :reason (keyword (:SubState update))}
"inactive" {:event :inactive :reason (keyword (:SubState update))}
nil))
(defn- put-process-event!
[id data]
(a/put! master-monitor {:type :process-event
:id id
:data data}))
:event data}))
(defn- handle-monitor-event!
"Handles a service monitor event in the supervisor thread."
[id event queue]
(let [{type :type
data :data} event]
(condp = type
:dbus
(if-let [parsed (parse-dbus-event data id)]
(do (trace "Dbus Event" id parsed)
(put-process-event! id parsed)
(reduce (fn [queue element]
(if (or (if-let [watch-for (:event element)]
(= (:event parsed) watch-for)
false)
(if-let [matcher (:matcher element)]
(matcher parsed)
false))
(do
(trace id
"Delivering event:" parsed)
(deliver (:promise element) parsed)
queue)
(conj queue element))) [] queue))
queue)
queue)))
(defn- handle-control-event!
"Handles a control message in the supervisor thread.
Unknown events are a noop."
[event queue]
(condp = (:command event)
:wait-for
(do (trace "Waiting for" event)
(conj queue
{:timeout (timeout (:timeout event))
:event (:event event)
:matcher (:matcher event)
:promise (:promise event)}))
queue))
(defn- handle-timeout-event!
"Handles a timed-out waiter in the supervisor thread.
Removes that waiter from the queue and resolves its promise with a
timeout message."
[channel queue]
(trace "Handling timeout.")
(first
(reduce
(fn [[queue found] element]
(if (and (not found) (= (:timeout element) channel))
(do (deliver (:promise element)
:timeout)
[queue true])
[(conj queue element) found]))
[[] false] queue)))
(defn- handle-status-event!
"Handles status `event`s from the process with the `id`."
[id event]
(case (:type event)
:error (when (= :nonzero-exit (:detail-type :nonzero-exit))
(when-let [proc (get-process! id)]
(diagnose-error (:stderr (:details event)) proc))))
(put-process-event! id event))
;; TODO: specs
;; TODO: document event types
(defn- attach-supervisor!
"Attaches a monitor consumer to the monitor channel of a process.
"Attaches a monitor consumer to the `status` channel of a process with the `id`.
It processes all status messages from the process and forwards them
to the main monitor if necessary."
[id monitor]
to the main monitor if necessary.
Returns a control channel, whose sole purpose it is to stop the supervisor thread."
[id status]
(let [control (chan)]
(thread
(trace "Monitoring" id)
(loop [queue []]
(loop []
(let [[event channel]
(alts!! (conj (map :timeout queue) monitor control))]
(alts!! [status control])]
(condp = channel
monitor
(recur (handle-monitor-event! id event queue))
status
(do
(handle-status-event! id event)
(recur))
control
(if event
(recur (handle-control-event! event queue))
(do ; channel closed -> stopp
(trace "Stopping monitoring of " id)
(put-process-event! id {:event :monitor-deleted})
(doseq [waiter queue]
(deliver (:promise waiter) :timeout))))
(recur (handle-timeout-event! channel queue))))))
(if (not event)
(do (trace "Stopping monitoring of" id)
(put-process-event! id {:event :monitor-deleted}))
(recur))))))
control))
(defn wait-for!
"Installs a waiter in the supervisor thread for a process.
Takes the `supervisor` and some keyword arguments described
below. Returns a promise.
The promise resolves to `:timeout` in case of a timeout and to the
event data in case the event occurs.
:event
: The event to wait for. For example `:active`
:matcher
: A predicate that takes a parsed event returned
by [[parse-dbus-event]]. If that predicate returns true,
the promise will be fulfilled.
:timeout
: Timeout in msec to wait. If the timeout is triggered, the
promise will be fulfilled with the value `:timeout`. Defaults to one
second.
Either `:event` or `:matcher` or both have to be given. `:event`
takes precedence.
"
[process & {:keys [event timeout matcher]
:or {timeout default-timeout}}]
(let [prom (promise)
supervisor (:supervisor process)]
(if (and (not event) (not matcher))
(throw+ {:type ::commander-error
:detail-type ::create-watch-error
:message "Either event or matcher have to be specified!"})
(a/put! supervisor
{:command :wait-for :matcher matcher
:event event :promise prom :timeout timeout}))
prom))
(defn multi-event-matcher
"Matches any of the given events."
[& events]
#(some #{(:event %1)} [:inactive :failed]))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
; Process Management ;
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
@ -295,35 +191,30 @@
audio-sampling-rate
rtsp-transport
buffer-size]} config]
(str ffmpeg-path
" -rtsp_transport " rtsp-transport
" -i " (input-string rtsp-user rtsp-password cam-ip cam-rtsp-port profile)
" -vcodec copy -b:a " audio-bitrate
" -ac " audio-channels
" -ar " audio-sampling-rate
" -f flv -bufsize " buffer-size
" -tune film " stream-server "/" stream-key
" -loglevel error")))
[ffmpeg-path
(str "-rtsp_transport" rtsp-transport)
(str "-i" (input-string rtsp-user rtsp-password cam-ip cam-rtsp-port profile))
"-vcodec copy"
(str "-b:a" audio-bitrate)
(str "-ac" audio-channels)
(str "-ar" audio-sampling-rate)
"-f flv"
(str "-bufsize" buffer-size)
(str "-tune film")
"-loglevel error" (str stream-server "/" stream-key)]))
(defn- sanitize-process-name [name]
(string/lower-case (string/replace name #"\W" "-")))
(defn get-process!
"Get the process with the id."
[id]
(get @processes id))
(declare delete-processdb-file!)
(defn delete-process!
"Deletes a process from the process map, stops it and deletes the unit file.
Returns `true` on success, `false` otherwise."
"Deletes a process from the process map, stops it and stops monitoring it."
[proc]
(info "Removing process with ID:" (:process-name proc))
(let [{:keys [unit-name monitor]} proc
[monitor close] monitor]
(close! (:supervisor proc))
(sys/remove-service! unit-name)
(close)
(let [{:keys [monitor]} proc]
(close! monitor)
(procs/stop! proc)
(procs/stop-monitor! proc)
(dosync (commute processes dissoc (:id proc)))
(delete-processdb-file! (:id proc))
true))
@ -342,15 +233,11 @@
(delete-process! proc))
(info "Creating process " process-name))
(let [ffmpeg-config (merge default-ffmpeg-config ffmpeg-config)
unit-name (str (sanitize-process-name process-name)
"-" id)
path (sys/create-service! unit-name
(ffmpeg-command ffmpeg-config)
(get ffmpeg-config :description
"FFMPEG streaming process, created by `stream`."))
monitor (sys/create-monitor! unit-name)
supervisor (attach-supervisor! id (first monitor))
process (->process id process-name unit-name monitor supervisor ffmpeg-config #{})]
[status control] (procs/launch!
(ffmpeg-command ffmpeg-config)
:stderr-buffer-size default-buffer-size)
monitor (attach-supervisor! id status)
process (->process id process-name monitor status control ffmpeg-config #{})]
(dosync
(commute processes assoc id process))
(save-process! process)
@ -363,63 +250,6 @@
(doseq [[_ proc] @processes]
(delete-process! proc)))
(defn get-process-state!
"Queries wether a process is running."
[proc]
(sys/get-service-state! (:unit-name proc)))
;; These control functions do not wait until a "final state" is
;; reached but resolve once a new, maybe unstable, state has been reached.
(defn start-process!
"Starts the service associated to the process `proc`. Returns a
promise that resolves to event `:failed` or `:active` or times out
after `timeout` ms."
([proc timeout]
(let [prom
(wait-for! proc :matcher (multi-event-matcher [:active :failed]))]
(sys/start-service! (:unit-name proc))
prom))
([proc]
(start-process! proc default-timeout)))
(defn stop-process!
"Stops the service associated to the process `proc`. Returns a promise
that resolves to event `:failed` or `:inactive` or times out after
`timeout` ms."
([proc timeout]
(let [prom
(wait-for! proc :matcher (multi-event-matcher [:inactive :failed]))]
(sys/stop-service! (:unit-name proc))
prom))
([proc]
(start-process! proc default-timeout)))
(defn restart-process!
"Restarts a process `proc` and wait for stop and start to happen
within `timeout`."
([proc timeout]
@(stop-process! proc timeout)
(start-process! proc timeout))
([proc]
(restart-process! proc default-timeout)))
(defn process-running?
"Queries wether a process is running."
[proc]
(= (get-process-state! proc) "active"))
(defn enable-process!
"Enables a process."
[proc]
(sys/enable-service! (:unit-name proc)))
(defn process-enabled?
"Enables a process."
[proc]
(= (sys/get-service-file-state! (:unit-name proc))
:enabled))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
; Serialization ;
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
@ -509,12 +339,3 @@
(throw+ {:type ::commander-error
:detail-type :deleting-failed
:error (:throwable &throw-context)}))))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
; Init ;
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(defn init! []
"Initialize the systemd connection and makes the commander API
operable. Processess will have to be loaded manually."
(sys/init!))

View file

@ -1,84 +0,0 @@
(ns stream.commander.journal
(:require [taoensso.timbre :as timbre
:refer [log trace debug info warn error fatal report
logf tracef debugf infof warnf errorf fatalf reportf
spy get-env]]
[clojure.java.shell :refer [sh]]
[clojure.set :refer [map-invert]]
[clojure.data.json :as json]
[slingshot.slingshot :refer [throw+]]
[clojure.core.async
:as a
:refer [>! <! >!! <!! go chan buffer close! thread
alts! alts!! timeout]]
[clojure.string :as str]))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
; Parameters and Constants ;
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(def ^{:doc "The journal loglevels."}
levels {"0" :emerg,
"1" :alert,
"2" :crit,
"3" :err,
"4" :warning,
"5" :notice,
"6" :info,
"7" :debug})
(def ^:private level-strings
(map-invert levels))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
; Journal Commands ;
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(defn- map->flags
"Converts a hashmap {:a 1 :b 2} to command line flags."
[map]
(let [flags (reduce (fn [options [flag value]]
(if (and value (not (= flag :verbatim)))
(conj options
(let [name (name flag)]
(if (> (count name) 1)
(str "--" name "=" value)
(str "-" name value))))
options))
[] map)
verb (:verbatim map)]
(if verb
(conj flags verb)
flags)))
(defn- run-journalctl-command!
"Boilerplate to run a journalctl command over the shell.
Returns the parsed json output as a lazy seq of maps.
This need not be performant."
[& {:as options}]
(let [result
(apply sh "journalctl" "-a" "--user" "-ojson" (map->flags options))]
(if (= (:exit result) 0)
(if (> (count (:out result)) 0)
(map #(json/read-str % :key-fn (comp keyword str/lower-case))
(str/split-lines (:out result)))
[])
(throw+ {:type ::journalctl-error
:detail-type ::log-read-error
:message (:err result)}))))
(defn get-logs!
"Reads the logs from a systemd user unit specified by its name: `unit`.
:number
How many of the latest log entries to read.
Defaults to 10."
[unit & {:keys [number priority executable] :or {number 10 executable false}}]
(let [logs (run-journalctl-command!
:u unit :p (get level-strings priority)
:n number :verbatim executable)]
(map
(fn [entry]
{:level (get levels (:priority entry))
:message (:message entry)})
logs)))

View file

@ -32,13 +32,15 @@
:detail-type detail-type
:details details})
(def ^:const started-message {:type :event
:event :started})
(def ^:const started-message
{:type :event
:event :started})
(defn- stopped-message
[event stderr]
{:type :event
:event :stopped :code event :stderr
:event :stopped
:code event :stderr
(loop [lines []]
(if-let [line (<!! stderr)]
(recur (cons line lines))
@ -115,7 +117,9 @@
{:proc process :exit-chan exit-chan
:stderr (make-stderr-channel! process stderr-buffer-size)})
(do
(a/put! status (error-status err :start-failed))
(a/put! status (error-status err
:start-failed
:stderr err))
(deliver prom {:success false :error err})
nil)))))
@ -168,6 +172,7 @@
(error-status
(str "Nonzero exit code! (" code ")")
:nonzero-exit
:stderr stderr
:code code))))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
@ -217,7 +222,10 @@
:stop-monitor (handle-stop-monitor! status control prom)
;; default -> ignore
(recur state)))
(do
(when prom
(resolve prom :not-implemented))
(recur state))))
exit-chan
(do
@ -249,6 +257,9 @@
(encapsulate-command
start! "Starts the process through the control channel." :start)
(encapsulate-command
restart! "Restarts the process through the control channel." :restart)
(encapsulate-command
stop! "Stops the process through the control channel." :stop)
@ -270,6 +281,7 @@
For the `options` see [[monitor-and-control-proc!]]."
[command & options]
(println command)
(let [status (chan)
control (chan)]
(let [command-list (into-array String command)

View file

@ -1,217 +0,0 @@
(ns stream.commander.systemd
(:require [cljstache.core :as template]
[clojure.java.io :as io]
[taoensso.timbre :as timbre
:refer [log trace debug info warn error fatal report
logf tracef debugf infof warnf errorf fatalf reportf
spy get-env]]
[clojure.java.shell :refer [sh]]
[slingshot.slingshot :refer [throw+]]
[clojure.core.async
:as a
:refer [>! <! >!! <!! go chan buffer close! thread
alts! alts!! timeout]])
(:import [de.thjom.java.systemd Systemd Manager Systemd$InstanceType
UnitStateListener]))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
; Systemd Instances ;
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; These are instanciated toplevel and are allowed to crash the
;; program if not successfull.
(def ^:private systemd (promise))
(def ^:private manager (promise))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
; Unit file Management ;
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(defn- render-unit-file
"Renders the unit file from a template.
The template can be found at:
resources/commander/templates/unit.mustache"
[command description target]
(template/render-resource "commander/templates/unit.mustache"
{:description description
:command command
:target target}))
(defn get-unit-path [name]
(str (System/getProperty "user.home") "/.config/systemd/user/" name ".service"))
(defn create-unit-file!
"Creates or overwrites a service unit file in the appropriate
directory and returns the file path."
([name command description]
(create-unit-file! name command description "default.target"))
([name command description target]
(let [unit-contents (render-unit-file command description target)
path (get-unit-path name)
file (io/as-file path)]
(.mkdirs (.getParentFile file))
(spit path unit-contents)
(debug "Wrote a unit file to:" path)
path)))
(defn remove-unit-file!
"Removes the unit file with the given name. Returns true if file was
deleted."
[name]
(let [path (get-unit-path name)]
(debug "Deleting a unit file:" path)
(.delete (io/as-file path))))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
; Systemd Handling ;
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; For now these work with the service name. May be generalized to a
;; record some time.
;; The control commands are implemented through shell calls, because
;; the dbus library is broken and the performance is not important.
;; The commands are implemented synchronousily.
(defn- run-systemd-command! [& commands]
(let [result (apply sh "systemctl" "--no-block"
"--job-mode=replace" "--user" "--job-mode=replace"
commands)]
(if (= (:exit result) 0)
true
(throw+ {:type ::systemd-error
:detail-type ::command-error
:message (:err result)}))))
(defn reload-systemd!
"Reloads the systemd user instance."
[]
(. @manager reload))
(defn- get-service! [name]
(. @manager getService name))
(defn start-service!
"Starts the userspace service with the name."
[name]
(run-systemd-command! "start" name))
(defn restart-service!
"Restarts the userspace service with the name."
[name]
(run-systemd-command! "restart" name))
(defn stop-service!
"Stops the userspace service with the name."
[name]
(run-systemd-command! "stop" name))
(defn get-service-state!
"Gets the ActiveState for the service of the name.
Refer to
[the systemd docs](https://www.freedesktop.org/wiki/Software/systemd/dbus/)
for further information."
[name]
(keyword (. (get-service! name) getActiveState)))
(defn get-service-load-state!
"Gets the LoadState for the process of the name.
Refer to
[the systemd docs](https://www.freedesktop.org/wiki/Software/systemd/dbus/)
for further information."
[name]
(keyword (. (get-service! name) getLoadState)))
(defn get-service-file-state!
"Gets the UnitFileState for the process of the name.
Returns false if the service file isn't found.
Refer to
[the systemd docs](https://www.freedesktop.org/wiki/Software/systemd/dbus/)
for further information."
[name]
(let [state (. (get-service! name) getUnitFileState)]
(if (> (count state) 0)
(keyword state)
false)))
(defn enable-service!
"Enables the service with the name. Returns true on success."
[name]
(run-systemd-command! "enable" name))
(defn disable-service!
"Disables the service with the name."
[name]
(run-systemd-command! "disable" name))
(defn create-service!
"Creates a unit file and reloads systemd. See `create-unit-file`."
([name command description]
(create-service! name command description "default.target"))
([name command description target]
(if (re-find #"[0-9]" (str (first name)))
(throw+ {:type ::systemd-error
:detail-type ::create-error
:message "Service name can't start with a digit."}))
(create-unit-file! name command description target)
(reload-systemd!)))
(defn remove-service!
"Stops the service, removes the unit file and reloads systemd."
[name]
(stop-service! name)
(disable-service! name)
(remove-unit-file! name)
(reload-systemd!))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
; Process Management ;
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(defn- DBusMap->map [map]
(reduce (fn [result [key value]]
(assoc result (keyword key) (. value getValue)))
{} map))
;; TODO: implement removal
(defn create-monitor!
"Creates a monitoring channel for the service with the name.
Returns a vector with the channel and a function to close it."
[name]
(debug "Creating monitor for service:" name)
(let [chan (chan)
listener
(reify
UnitStateListener
(stateChanged [this unit properties]
(a/put! chan {:data (DBusMap->map properties)
:type :dbus})))]
(. (get-service! name) addListener listener)
[chan (fn []
(close! chan)
(. (get-service! name) removeListener listener))]))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
; Init ;
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(defn init!
"Initialize systemd manager etc..."
[]
(info "starting systemd interface")
(deliver systemd (. Systemd get Systemd$InstanceType/USER))
(deliver manager (. @systemd getManager)))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
; Graveyard ;
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; (comment
;; (. manager addConsumer ManagerInterface$JobRemoved
;; (reify DBusSigHandler (handle [this sig] (println sig)))))

View file

@ -6,8 +6,7 @@
(defn init! []
"Initialize all the modules."
(logging/init!)
(commander/init!))
(logging/init!))
(defn -main
"I don't do a whole lot ... yet."

View file

@ -1,6 +1,5 @@
(ns stream.commander-test
(:require [stream.commander.systemd :as systemd]
[stream.commander.journal :as journal]
(:require [stream.commander.processes :as p]
[stream.commander.api :as api]
[stream.core :as core]
[stream.util.logging :as logging]
@ -12,130 +11,6 @@
(core/init!)
(deftest unit-files
(testing "The rendering."
(is (#'systemd/render-unit-file "test" "test" "test")))
(let [name (str (java.util.UUID/randomUUID))]
(testing "Writing a unit file."
(let [file (io/as-file
(systemd/create-unit-file! name
"test" "test" "test"))]
(is (.exists file))))
(testing "Deleting a unit file"
(systemd/remove-unit-file! name)
(is (not (.exists (io/as-file (systemd/get-unit-path name))))))))
(deftest journal-utilities
(testing "map to flags"
(is (= (#'journal/map->flags {:t 1 :u 2})
["-t1" "-u2"]))
(is (= (#'journal/map->flags {:t 1 :u 2 :test 11})
["-t1" "-u2" "--test=11"]))
(is (= (#'journal/map->flags {:t 1 :u 2 :test 11 :verbatim "hi"})
["-t1" "-u2" "--test=11" "hi"]))))
(deftest journal
(testing "returns empty seq if there are no logs"
(is (empty? (journal/get-logs! "nonexistent"))))
(testing "throws upon error in journalctl"
(try+
(#'journal/run-journalctl-command! :uuuu "bad")
(is false "No exception thrown.")
(catch [:type :stream.commander.journal/journalctl-error] _
(is true))
(catch Object _
(is false "Wrong exception thrown.")))))
(def script
(str "/bin/bash -c \""
"echo test; "
">&2 echo error;"
"cat /dev/zero"
"\""))
(deftest systemd-services
(let [name (str "testing-" (java.util.UUID/randomUUID))
unit-path
(systemd/create-unit-file! name
script "test service")]
(testing "loading the service"
(systemd/reload-systemd!)
(is (= :loaded (systemd/get-service-load-state! name))))
(testing "starting the service"
(systemd/start-service! name)
(while (= :activating (systemd/get-service-state! name))
(a/<!! (a/timeout 1000)))
(is (= :active (systemd/get-service-state! name))))
(testing "stopping the service"
(systemd/stop-service! name)
(while (= :deactivating (systemd/get-service-state! name))
(a/<!! (a/timeout 1000)))
(is (= :inactive (systemd/get-service-state! name))))
(testing "reading the logs"
(let [logs (journal/get-logs! name)]
(is (= (:message (second logs)) "test"))
(is (= (:message (nth logs 2)) "error"))))
(testing "restarting the service"
(systemd/start-service! name)
(systemd/restart-service! name)
(while (some #(= % (systemd/get-service-state! name))
[:activating :deactivating :inactive])
(a/<!! (a/timeout 1000)))
(is (= :active (systemd/get-service-state! name))))
(testing "enabling the service"
(systemd/enable-service! name)
(is (= :enabled (systemd/get-service-file-state! name))))
(testing "disable the service"
(systemd/disable-service! name)
(is (= :disabled (systemd/get-service-file-state! name))))
(testing "removing the service"
(systemd/remove-service! name)
(is (= :not-found (systemd/get-service-load-state! name))))
(testing "creating a service automatically"
(systemd/create-service! name script "test service")
(is (= :loaded (systemd/get-service-load-state! name))))
(let [[channel close] (systemd/create-monitor! name)]
(testing "detecting activity"
(systemd/restart-service! name)
(is (a/<!! channel) "No value in channel!")
(while (a/poll! channel))
(close)
(is (not (a/put! channel "hi")))))
(testing "failing systemd command"
(systemd/remove-service! name)
(try+
(systemd/start-service! "non-existend")
(is false "No exception thrown.")
(catch [:type :stream.commander.systemd/systemd-error] _
(is true))
(catch Object _
(is false "Wrong exception thrown."))))
(testing "creating service with digit as first char"
(try+
(systemd/create-service! "1234" script "test service")
(catch [:type :stream.commander.systemd/systemd-error] _
(is true))
(catch Object _
(is false "Wrong exception thrown."))))
(testing "getting file state state of nonexisting service"
(is (not (systemd/get-service-file-state!
(str "does_not_exist"
(java.util.UUID/randomUUID))))))))
(deftest auxiliary-api
(testing "ffmpeg input string"
(is (= "rtsp://192.168.1.205:554/axis-media/media.amp"
@ -160,75 +35,27 @@
(let [proc (api/create-process!
"tester" config)]
(testing "creating ffmpeg process the high-level way"
(is (= :loaded (systemd/get-service-load-state!
(:unit-name proc)))))
(testing "getting the newly created process state"
(is :loaded (api/get-process-state! proc)))
(is (not @(p/alive? proc))))
(testing "starting a process"
(is (not (= :timeout (:event @(api/start-process! proc))))))
(testing "waiting for the process to start"
(let [prom (api/wait-for!
proc
:event :active :timeout 10000)]
(api/start-process! proc)
(is (not (= :timeout @prom)))))
(testing "waiting for the process to fail"
(let [prom (api/wait-for!
proc
:event :failed
:timeout 100000)]
(api/start-process! proc)
(is (not (= :timeout @prom)))))
(testing "waiting for the process to activate or fail"
(let [prom (api/wait-for!
proc
:matcher #(or (= (:event %1) :active)
(= (:event %1) :failed))
:timeout 1000)]
(api/start-process! proc)
(is (not (= :timeout @prom)))))
(testing "spilling junk into the control channel"
(a/>!! (:supervisor proc) "junk"))
(testing "starting a process (which will fail)"
(is (not (:success @(p/start! proc)))))
(testing "spilling junk into the monitor channel"
(a/>!! (first (:monitor proc)) "junk"))
(testing "waiting for a timeout"
(let [prom (api/wait-for!
proc
:event :one
:timeout 100)
prom1 (api/wait-for!
proc
:matcher #(and % false)
:timeout 100)]
(is (= :timeout @prom))
(is (= :timeout @prom1))))
(a/>!! (:monitor proc) "junk"))
(testing "stopping the process"
@(api/start-process! proc)
(let [prom (api/stop-process! proc)]
(is (not (= :timeout @prom)))
(is (not (api/process-running? proc)))))
(let [prom (p/stop! proc)]
(is (= :not-running (:message @prom)))
(is (not @(p/alive? proc)))))
(testing "re starting a process"
(is (not (= :timeout (:event @(api/restart-process! proc))))))
(testing "enabling the process"
(api/enable-process! proc)
(is (api/process-enabled? proc)))
(is (not (:success @(p/restart! proc)))))
(testing "the subscription to the master monitor"
(let [c (a/chan)]
(a/sub api/monitor :process-event c)
(api/start-process! proc)
(p/start! proc)
(is (= (:id proc) (:id (a/<!! c))))))
(testing "deleting the process"
@ -255,7 +82,7 @@
(api/delete-all-processes!)
(is (= 0 (count @@#'api/processes))))
(testing "serialize and load process"
(testing "serialize and load a process"
(let [proc (api/create-process! "test" config)
id (:id proc)
saved (api/save-process! proc)

View file

@ -36,6 +36,9 @@
(is (= :error (:type err)))
(is (= 11 (-> err :details :code))))))
(testing "spilling junk into the control channel"
(>!! control "junk"))
(testing "destroying the monitor"
(is @(stop-monitor! control))
(is (not (>!! status 0)))