From ecce9b573bd45d91c91f8e7399fae36590387eda Mon Sep 17 00:00:00 2001 From: Valentin Boettcher Date: Fri, 14 Aug 2020 18:00:53 +0200 Subject: [PATCH] use homebrew process monitoring instead of systemd --- src/stream/commander/api.clj | 295 ++++++----------------------- src/stream/commander/journal.clj | 84 -------- src/stream/commander/processes.clj | 22 ++- src/stream/commander/systemd.clj | 217 --------------------- src/stream/core.clj | 3 +- test/stream/commander_test.clj | 195 ++----------------- test/stream/processes_test.clj | 3 + 7 files changed, 90 insertions(+), 729 deletions(-) delete mode 100644 src/stream/commander/journal.clj delete mode 100644 src/stream/commander/systemd.clj diff --git a/src/stream/commander/api.clj b/src/stream/commander/api.clj index 5bc863c..f7c04b0 100644 --- a/src/stream/commander/api.clj +++ b/src/stream/commander/api.clj @@ -1,6 +1,5 @@ (ns stream.commander.api - (:require [stream.commander.systemd :as sys] - [stream.commander.journal :as journal] + (:require [stream.commander.processes :as procs] [clojure.string :as string] [clojure.java.io :as io] [slingshot.slingshot :refer [throw+]] @@ -26,7 +25,9 @@ ;; TODO: sanitize custom flags (defrecord process - [id process-name unit-name monitor supervisor ffmpeg-config problems]) + [id process-name monitor status control ffmpeg-config problems] + procs/ControllableProcess + (control [this] control)) (defn process= "Tests `proc-1` and `proc-2` for equality." @@ -39,7 +40,12 @@ true))) (def ^:private processes (ref {})) -(def ^:private dbus-monitor (chan)) + +(defn get-process! + "Get the process with the id." + [id] + (get @processes id)) + (def ^:private master-monitor (chan)) (def monitor (a/pub master-monitor :type)) @@ -58,6 +64,9 @@ "Path must exist, be writable, be readable."]} processdb-directory "./processes") +(defconfig ^{:validate [integer? "Must be an integer."]} + default-buffer-size 50) + ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ; Utilities ; ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; @@ -117,162 +126,49 @@ ; Monitoring ; ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; -(defn- parse-dbus-event - "Parses a dbus state `update` into an event." - [update id] - (condp = (:ActiveState update) - "failed" {:event :failed - :reason (keyword (:SubState update)) - ;; we take the last message because the journal does not - ;; distinct between message types from the executable - :error - (let [process (get @processes id)] - (:message - (first - (journal/get-logs! - (:unit-name process) - :number 1 - :executable (:ffmpeg-path (:ffmpeg-config process))))))} - "active" {:event :active :reason (keyword (:SubState update))} - "activating" {:event :activating :reason (keyword (:SubState update))} - "deactivating" {:event :deactivating :reason (keyword (:SubState update))} - "inactive" {:event :inactive :reason (keyword (:SubState update))} - nil)) - (defn- put-process-event! [id data] (a/put! master-monitor {:type :process-event :id id - :data data})) + :event data})) -(defn- handle-monitor-event! - "Handles a service monitor event in the supervisor thread." - [id event queue] - (let [{type :type - data :data} event] - (condp = type - :dbus - (if-let [parsed (parse-dbus-event data id)] - (do (trace "Dbus Event" id parsed) - (put-process-event! id parsed) - (reduce (fn [queue element] - (if (or (if-let [watch-for (:event element)] - (= (:event parsed) watch-for) - false) - (if-let [matcher (:matcher element)] - (matcher parsed) - false)) - (do - (trace id - "Delivering event:" parsed) - (deliver (:promise element) parsed) - queue) - (conj queue element))) [] queue)) - queue) - queue))) - -(defn- handle-control-event! - "Handles a control message in the supervisor thread. - - Unknown events are a noop." - [event queue] - (condp = (:command event) - :wait-for - (do (trace "Waiting for" event) - (conj queue - {:timeout (timeout (:timeout event)) - :event (:event event) - :matcher (:matcher event) - :promise (:promise event)})) - queue)) - -(defn- handle-timeout-event! - "Handles a timed-out waiter in the supervisor thread. - Removes that waiter from the queue and resolves its promise with a - timeout message." - [channel queue] - (trace "Handling timeout.") - (first - (reduce - (fn [[queue found] element] - (if (and (not found) (= (:timeout element) channel)) - (do (deliver (:promise element) - :timeout) - [queue true]) - [(conj queue element) found])) - [[] false] queue))) +(defn- handle-status-event! + "Handles status `event`s from the process with the `id`." + [id event] + (case (:type event) + :error (when (= :nonzero-exit (:detail-type :nonzero-exit)) + (when-let [proc (get-process! id)] + (diagnose-error (:stderr (:details event)) proc)))) + (put-process-event! id event)) ;; TODO: specs ;; TODO: document event types (defn- attach-supervisor! - "Attaches a monitor consumer to the monitor channel of a process. + "Attaches a monitor consumer to the `status` channel of a process with the `id`. It processes all status messages from the process and forwards them - to the main monitor if necessary." - [id monitor] + to the main monitor if necessary. + + Returns a control channel, whose sole purpose it is to stop the supervisor thread." + [id status] (let [control (chan)] (thread (trace "Monitoring" id) - (loop [queue []] + (loop [] (let [[event channel] - (alts!! (conj (map :timeout queue) monitor control))] + (alts!! [status control])] (condp = channel - monitor - (recur (handle-monitor-event! id event queue)) + status + (do + (handle-status-event! id event) + (recur)) control - (if event - (recur (handle-control-event! event queue)) - (do ; channel closed -> stopp - (trace "Stopping monitoring of " id) - (put-process-event! id {:event :monitor-deleted}) - (doseq [waiter queue] - (deliver (:promise waiter) :timeout)))) - - (recur (handle-timeout-event! channel queue)))))) + (if (not event) + (do (trace "Stopping monitoring of" id) + (put-process-event! id {:event :monitor-deleted})) + (recur)))))) control)) -(defn wait-for! - "Installs a waiter in the supervisor thread for a process. - Takes the `supervisor` and some keyword arguments described - below. Returns a promise. - - The promise resolves to `:timeout` in case of a timeout and to the - event data in case the event occurs. - - :event - : The event to wait for. For example `:active` - - :matcher - : A predicate that takes a parsed event returned - by [[parse-dbus-event]]. If that predicate returns true, - the promise will be fulfilled. - - :timeout - : Timeout in msec to wait. If the timeout is triggered, the - promise will be fulfilled with the value `:timeout`. Defaults to one - second. - - Either `:event` or `:matcher` or both have to be given. `:event` - takes precedence. - " - [process & {:keys [event timeout matcher] - :or {timeout default-timeout}}] - (let [prom (promise) - supervisor (:supervisor process)] - (if (and (not event) (not matcher)) - (throw+ {:type ::commander-error - :detail-type ::create-watch-error - :message "Either event or matcher have to be specified!"}) - (a/put! supervisor - {:command :wait-for :matcher matcher - :event event :promise prom :timeout timeout})) - prom)) - -(defn multi-event-matcher - "Matches any of the given events." - [& events] - #(some #{(:event %1)} [:inactive :failed])) - ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ; Process Management ; ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; @@ -295,35 +191,30 @@ audio-sampling-rate rtsp-transport buffer-size]} config] - (str ffmpeg-path - " -rtsp_transport " rtsp-transport - " -i " (input-string rtsp-user rtsp-password cam-ip cam-rtsp-port profile) - " -vcodec copy -b:a " audio-bitrate - " -ac " audio-channels - " -ar " audio-sampling-rate - " -f flv -bufsize " buffer-size - " -tune film " stream-server "/" stream-key - " -loglevel error"))) + [ffmpeg-path + (str "-rtsp_transport" rtsp-transport) + (str "-i" (input-string rtsp-user rtsp-password cam-ip cam-rtsp-port profile)) + "-vcodec copy" + (str "-b:a" audio-bitrate) + (str "-ac" audio-channels) + (str "-ar" audio-sampling-rate) + "-f flv" + (str "-bufsize" buffer-size) + (str "-tune film") + "-loglevel error" (str stream-server "/" stream-key)])) (defn- sanitize-process-name [name] (string/lower-case (string/replace name #"\W" "-"))) -(defn get-process! - "Get the process with the id." - [id] - (get @processes id)) - (declare delete-processdb-file!) (defn delete-process! - "Deletes a process from the process map, stops it and deletes the unit file. - Returns `true` on success, `false` otherwise." + "Deletes a process from the process map, stops it and stops monitoring it." [proc] (info "Removing process with ID:" (:process-name proc)) - (let [{:keys [unit-name monitor]} proc - [monitor close] monitor] - (close! (:supervisor proc)) - (sys/remove-service! unit-name) - (close) + (let [{:keys [monitor]} proc] + (close! monitor) + (procs/stop! proc) + (procs/stop-monitor! proc) (dosync (commute processes dissoc (:id proc))) (delete-processdb-file! (:id proc)) true)) @@ -342,15 +233,11 @@ (delete-process! proc)) (info "Creating process " process-name)) (let [ffmpeg-config (merge default-ffmpeg-config ffmpeg-config) - unit-name (str (sanitize-process-name process-name) - "-" id) - path (sys/create-service! unit-name - (ffmpeg-command ffmpeg-config) - (get ffmpeg-config :description - "FFMPEG streaming process, created by `stream`.")) - monitor (sys/create-monitor! unit-name) - supervisor (attach-supervisor! id (first monitor)) - process (->process id process-name unit-name monitor supervisor ffmpeg-config #{})] + [status control] (procs/launch! + (ffmpeg-command ffmpeg-config) + :stderr-buffer-size default-buffer-size) + monitor (attach-supervisor! id status) + process (->process id process-name monitor status control ffmpeg-config #{})] (dosync (commute processes assoc id process)) (save-process! process) @@ -363,63 +250,6 @@ (doseq [[_ proc] @processes] (delete-process! proc))) -(defn get-process-state! - "Queries wether a process is running." - [proc] - (sys/get-service-state! (:unit-name proc))) - -;; These control functions do not wait until a "final state" is -;; reached but resolve once a new, maybe unstable, state has been reached. - -(defn start-process! - "Starts the service associated to the process `proc`. Returns a - promise that resolves to event `:failed` or `:active` or times out - after `timeout` ms." - ([proc timeout] - (let [prom - (wait-for! proc :matcher (multi-event-matcher [:active :failed]))] - (sys/start-service! (:unit-name proc)) - prom)) - ([proc] - (start-process! proc default-timeout))) - -(defn stop-process! - "Stops the service associated to the process `proc`. Returns a promise - that resolves to event `:failed` or `:inactive` or times out after - `timeout` ms." - ([proc timeout] - (let [prom - (wait-for! proc :matcher (multi-event-matcher [:inactive :failed]))] - (sys/stop-service! (:unit-name proc)) - prom)) - ([proc] - (start-process! proc default-timeout))) - -(defn restart-process! - "Restarts a process `proc` and wait for stop and start to happen - within `timeout`." - ([proc timeout] - @(stop-process! proc timeout) - (start-process! proc timeout)) - ([proc] - (restart-process! proc default-timeout))) - -(defn process-running? - "Queries wether a process is running." - [proc] - (= (get-process-state! proc) "active")) - -(defn enable-process! - "Enables a process." - [proc] - (sys/enable-service! (:unit-name proc))) - -(defn process-enabled? - "Enables a process." - [proc] - (= (sys/get-service-file-state! (:unit-name proc)) - :enabled)) - ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ; Serialization ; ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; @@ -509,12 +339,3 @@ (throw+ {:type ::commander-error :detail-type :deleting-failed :error (:throwable &throw-context)})))) - -;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; - ; Init ; -;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; - -(defn init! [] - "Initialize the systemd connection and makes the commander API - operable. Processess will have to be loaded manually." - (sys/init!)) diff --git a/src/stream/commander/journal.clj b/src/stream/commander/journal.clj deleted file mode 100644 index 1938acf..0000000 --- a/src/stream/commander/journal.clj +++ /dev/null @@ -1,84 +0,0 @@ -(ns stream.commander.journal - (:require [taoensso.timbre :as timbre - :refer [log trace debug info warn error fatal report - logf tracef debugf infof warnf errorf fatalf reportf - spy get-env]] - [clojure.java.shell :refer [sh]] - [clojure.set :refer [map-invert]] - [clojure.data.json :as json] - [slingshot.slingshot :refer [throw+]] - [clojure.core.async - :as a - :refer [>! !! flags - "Converts a hashmap {:a 1 :b 2} to command line flags." - [map] - (let [flags (reduce (fn [options [flag value]] - (if (and value (not (= flag :verbatim))) - (conj options - (let [name (name flag)] - (if (> (count name) 1) - (str "--" name "=" value) - (str "-" name value)))) - options)) - [] map) - verb (:verbatim map)] - (if verb - (conj flags verb) - flags))) - -(defn- run-journalctl-command! - "Boilerplate to run a journalctl command over the shell. - Returns the parsed json output as a lazy seq of maps. - This need not be performant." - [& {:as options}] - (let [result - (apply sh "journalctl" "-a" "--user" "-ojson" (map->flags options))] - (if (= (:exit result) 0) - (if (> (count (:out result)) 0) - (map #(json/read-str % :key-fn (comp keyword str/lower-case)) - (str/split-lines (:out result))) - []) - (throw+ {:type ::journalctl-error - :detail-type ::log-read-error - :message (:err result)})))) - -(defn get-logs! - "Reads the logs from a systemd user unit specified by its name: `unit`. - - :number - How many of the latest log entries to read. - Defaults to 10." - [unit & {:keys [number priority executable] :or {number 10 executable false}}] - (let [logs (run-journalctl-command! - :u unit :p (get level-strings priority) - :n number :verbatim executable)] - (map - (fn [entry] - {:level (get levels (:priority entry)) - :message (:message entry)}) - logs))) diff --git a/src/stream/commander/processes.clj b/src/stream/commander/processes.clj index aa07cbe..c45a404 100644 --- a/src/stream/commander/processes.clj +++ b/src/stream/commander/processes.clj @@ -32,13 +32,15 @@ :detail-type detail-type :details details}) -(def ^:const started-message {:type :event - :event :started}) +(def ^:const started-message + {:type :event + :event :started}) (defn- stopped-message [event stderr] {:type :event - :event :stopped :code event :stderr + :event :stopped + :code event :stderr (loop [lines []] (if-let [line ( ignore - (recur state))) + (do + (when prom + (resolve prom :not-implemented)) + (recur state)))) exit-chan (do @@ -249,6 +257,9 @@ (encapsulate-command start! "Starts the process through the control channel." :start) +(encapsulate-command + restart! "Restarts the process through the control channel." :restart) + (encapsulate-command stop! "Stops the process through the control channel." :stop) @@ -270,6 +281,7 @@ For the `options` see [[monitor-and-control-proc!]]." [command & options] + (println command) (let [status (chan) control (chan)] (let [command-list (into-array String command) diff --git a/src/stream/commander/systemd.clj b/src/stream/commander/systemd.clj deleted file mode 100644 index 8b4d9ca..0000000 --- a/src/stream/commander/systemd.clj +++ /dev/null @@ -1,217 +0,0 @@ -(ns stream.commander.systemd - (:require [cljstache.core :as template] - [clojure.java.io :as io] - [taoensso.timbre :as timbre - :refer [log trace debug info warn error fatal report - logf tracef debugf infof warnf errorf fatalf reportf - spy get-env]] - [clojure.java.shell :refer [sh]] - [slingshot.slingshot :refer [throw+]] - [clojure.core.async - :as a - :refer [>! !! (count state) 0) - (keyword state) - false))) - -(defn enable-service! - "Enables the service with the name. Returns true on success." - [name] - (run-systemd-command! "enable" name)) - -(defn disable-service! - "Disables the service with the name." - [name] - (run-systemd-command! "disable" name)) - -(defn create-service! - "Creates a unit file and reloads systemd. See `create-unit-file`." - ([name command description] - (create-service! name command description "default.target")) - ([name command description target] - (if (re-find #"[0-9]" (str (first name))) - (throw+ {:type ::systemd-error - :detail-type ::create-error - :message "Service name can't start with a digit."})) - (create-unit-file! name command description target) - (reload-systemd!))) - -(defn remove-service! - "Stops the service, removes the unit file and reloads systemd." - [name] - (stop-service! name) - (disable-service! name) - (remove-unit-file! name) - (reload-systemd!)) - -;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; - ; Process Management ; -;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; - -(defn- DBusMap->map [map] - (reduce (fn [result [key value]] - (assoc result (keyword key) (. value getValue))) - {} map)) - -;; TODO: implement removal -(defn create-monitor! - "Creates a monitoring channel for the service with the name. - Returns a vector with the channel and a function to close it." - [name] - (debug "Creating monitor for service:" name) - (let [chan (chan) - listener - (reify - UnitStateListener - (stateChanged [this unit properties] - (a/put! chan {:data (DBusMap->map properties) - :type :dbus})))] - (. (get-service! name) addListener listener) - [chan (fn [] - (close! chan) - (. (get-service! name) removeListener listener))])) - - -;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; - ; Init ; -;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; - -(defn init! - "Initialize systemd manager etc..." - [] - (info "starting systemd interface") - (deliver systemd (. Systemd get Systemd$InstanceType/USER)) - (deliver manager (. @systemd getManager))) - -;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; - ; Graveyard ; -;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; - -;; (comment -;; (. manager addConsumer ManagerInterface$JobRemoved -;; (reify DBusSigHandler (handle [this sig] (println sig))))) diff --git a/src/stream/core.clj b/src/stream/core.clj index 54406da..a2a8711 100644 --- a/src/stream/core.clj +++ b/src/stream/core.clj @@ -6,8 +6,7 @@ (defn init! [] "Initialize all the modules." - (logging/init!) - (commander/init!)) + (logging/init!)) (defn -main "I don't do a whole lot ... yet." diff --git a/test/stream/commander_test.clj b/test/stream/commander_test.clj index e1d736a..282449a 100644 --- a/test/stream/commander_test.clj +++ b/test/stream/commander_test.clj @@ -1,6 +1,5 @@ (ns stream.commander-test - (:require [stream.commander.systemd :as systemd] - [stream.commander.journal :as journal] + (:require [stream.commander.processes :as p] [stream.commander.api :as api] [stream.core :as core] [stream.util.logging :as logging] @@ -12,130 +11,6 @@ (core/init!) -(deftest unit-files - (testing "The rendering." - (is (#'systemd/render-unit-file "test" "test" "test"))) - - (let [name (str (java.util.UUID/randomUUID))] - (testing "Writing a unit file." - (let [file (io/as-file - (systemd/create-unit-file! name - "test" "test" "test"))] - (is (.exists file)))) - - (testing "Deleting a unit file" - (systemd/remove-unit-file! name) - (is (not (.exists (io/as-file (systemd/get-unit-path name)))))))) - -(deftest journal-utilities - (testing "map to flags" - (is (= (#'journal/map->flags {:t 1 :u 2}) - ["-t1" "-u2"])) - (is (= (#'journal/map->flags {:t 1 :u 2 :test 11}) - ["-t1" "-u2" "--test=11"])) - (is (= (#'journal/map->flags {:t 1 :u 2 :test 11 :verbatim "hi"}) - ["-t1" "-u2" "--test=11" "hi"])))) - -(deftest journal - (testing "returns empty seq if there are no logs" - (is (empty? (journal/get-logs! "nonexistent")))) - (testing "throws upon error in journalctl" - (try+ - (#'journal/run-journalctl-command! :uuuu "bad") - (is false "No exception thrown.") - (catch [:type :stream.commander.journal/journalctl-error] _ - (is true)) - (catch Object _ - (is false "Wrong exception thrown."))))) - -(def script - (str "/bin/bash -c \"" - "echo test; " - ">&2 echo error;" - "cat /dev/zero" - "\"")) - -(deftest systemd-services - (let [name (str "testing-" (java.util.UUID/randomUUID)) - unit-path - (systemd/create-unit-file! name - script "test service")] - (testing "loading the service" - (systemd/reload-systemd!) - (is (= :loaded (systemd/get-service-load-state! name)))) - - (testing "starting the service" - (systemd/start-service! name) - (while (= :activating (systemd/get-service-state! name)) - (a/!! (:supervisor proc) "junk")) + (testing "starting a process (which will fail)" + (is (not (:success @(p/start! proc))))) (testing "spilling junk into the monitor channel" - (a/>!! (first (:monitor proc)) "junk")) - - (testing "waiting for a timeout" - (let [prom (api/wait-for! - proc - :event :one - :timeout 100) - prom1 (api/wait-for! - proc - :matcher #(and % false) - :timeout 100)] - (is (= :timeout @prom)) - (is (= :timeout @prom1)))) + (a/>!! (:monitor proc) "junk")) (testing "stopping the process" - @(api/start-process! proc) - (let [prom (api/stop-process! proc)] - (is (not (= :timeout @prom))) - (is (not (api/process-running? proc))))) + (let [prom (p/stop! proc)] + (is (= :not-running (:message @prom))) + (is (not @(p/alive? proc))))) (testing "re starting a process" - (is (not (= :timeout (:event @(api/restart-process! proc)))))) - - (testing "enabling the process" - (api/enable-process! proc) - (is (api/process-enabled? proc))) + (is (not (:success @(p/restart! proc))))) (testing "the subscription to the master monitor" (let [c (a/chan)] (a/sub api/monitor :process-event c) - (api/start-process! proc) + (p/start! proc) (is (= (:id proc) (:id (a/ err :details :code)))))) + (testing "spilling junk into the control channel" + (>!! control "junk")) + (testing "destroying the monitor" (is @(stop-monitor! control)) (is (not (>!! status 0)))