implement higher level interface

This commit is contained in:
hiro98 2020-07-08 21:08:59 +02:00
parent 7a7c134f6b
commit b413c05d6f
3 changed files with 244 additions and 17 deletions

View file

@ -1,4 +1,14 @@
(ns stream.commander.api)
(ns stream.commander.api
(:require [stream.commander.systemd :as sys]
[clojure.string :as string]
[taoensso.timbre :as timbre
:refer [log trace debug info warn error fatal report
logf tracef debugf infof warnf errorf fatalf reportf
spy get-env]]
[clojure.core.async
:as a
:refer [>! <! >!! <!! go chan buffer close! thread
alts! alts!! timeout]]))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
@ -6,9 +16,102 @@
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; TODO: specs
;; TODO: sanitize custom flags
(defrecord process
[unit-name monitor-channel ffmpeg-config problems])
[id process-name unit-name monitor ffmpeg-config problems])
(def ^:private processes (ref {}))
(def processes (ref {}))
(defn- generate-process-id
[]
(loop []
(let [id (str (java.util.UUID/randomUUID))]
(if (contains? @processes id)
(recur)
id))))
;; TODO: load from config
(def default-ffmpeg-config
{:ffmpeg-path "ffmpeg" :rtsp-transport "tcp" :custom-flags ""
:audio-bitrate "128k" :audio-channels 1 :audio-sampling-rate 44100
:rtsp-user nil :rtsp-password nil :buffer-size "100M"})
(defn- input-string
[user pass ip port profile]
(str "rtsp://"
(if (and user pass) (str user ":" pass "@"))
ip ":" port "/" profile))
(defn ffmpeg-command
"Generate the ffmpeg command from the config."
[config]
(let [{:keys
[cam-ip
cam-rtsp-port
rtsp-user
rtsp-password
profile
stream-server
stream-key
ffmpeg-path
custom-flags
audio-bitrate
audio-channels
audio-sampling-rate
rtsp-transport
buffer-size]} (merge default-ffmpeg-config config)]
(str ffmpeg-path
" -rtsp_transport " rtsp-transport
" -i " (input-string rtsp-user rtsp-password cam-ip cam-rtsp-port profile)
" -vcodec copy -b:a " audio-bitrate
" -ac " audio-channels
" -ar " audio-sampling-rate
" -f flv -bufsize " buffer-size
" -tune film " stream-server "/" stream-key)))
(defn- sanitize-process-name [name]
(string/lower-case (string/replace name #"\W" "-")))
(defn create-process!
"Creates a process, adds it to the registry and assigns a monitoring
channel to it. Returns the process."
[process-name ffmpeg-config]
(let [id (generate-process-id)
unit-name (str (sanitize-process-name process-name)
"-" id)
path (sys/create-service! unit-name
(ffmpeg-command ffmpeg-config)
(get ffmpeg-config :description
"FFMPEG streaming process, created by `stream`."))
monitor (sys/create-monitor! unit-name)
process (->process id process-name unit-name monitor ffmpeg-config #{})]
(debug "Creating process with ID:" id)
(dosync
(commute processes assoc id process))
process))
(defn delete-process!
"Deletes a process from the process map, stops it and deletes the unit file.
Returns `true` on success, `false` otherwise."
[id]
(debug "Removing process with ID:" id)
(if-let [proc (get @processes id)]
(let [{:keys [unit-name monitor]} proc
[_ close] monitor]
(sys/remove-service! unit-name)
(println close)
(close)
(dosync (commute processes dissoc id))
true)
false))
(defn delete-all-processes! []
"Deletes all processes."
(doseq [[id _] @processes]
(delete-process! id)))
(defn get-process!
"Get the process with the id."
[id]
(get @processes id))

View file

@ -6,7 +6,11 @@
logf tracef debugf infof warnf errorf fatalf reportf
spy get-env]]
[clojure.java.shell :refer [sh]]
[slingshot.slingshot :refer [throw+]])
[slingshot.slingshot :refer [throw+]]
[clojure.core.async
:as a
:refer [>! <! >!! <!! go chan buffer close! thread
alts! alts!! timeout]])
(:import [de.thjom.java.systemd Systemd Manager Systemd$InstanceType
UnitStateListener]))
@ -76,7 +80,8 @@
(if (= (:exit result) 0)
true
(throw+ {:type ::systemd-error
:name name :message (:err result)}))))
:detail-type ::command-error
:message (:err result)}))))
(defn reload-systemd!
"Reloads the systemd user instance."
@ -136,11 +141,7 @@
(defn enable-service!
"Enables the service with the name. Returns true on success."
[name]
(let [result (sh "systemctl" "--user" "enable" name)]
(if (= (:exit result) 0)
true
(throw+ {:type ::systemd-error :message "Service can't be enabled."
:name name :err (:err result)}))))
(run-systemd-command! "enable" name))
(defn disable-service!
"Disables the service with the name."
@ -149,9 +150,15 @@
(defn create-service!
"Creates a unit file and reloads systemd. See `create-unit-file`."
[name command description target]
(create-unit-file! name command description target)
(reload-systemd!))
([name command description]
(create-service! name command description "default.target"))
([name command description target]
(if (re-find #"[0-9]" (str (first name)))
(throw+ {:type ::systemd-error
:detail-type ::create-error
:message "Service name can't start with a digit."}))
(create-unit-file! name command description target)
(reload-systemd!)))
(defn remove-service!
"Stops the service, removes the unit file and reloads systemd."
@ -160,6 +167,32 @@
(remove-unit-file! name)
(reload-systemd!))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
; Process Management ;
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(defn- DBusMap->map [map]
(reduce (fn [result [key value]]
(assoc result (keyword key) (. value getValue)))
{} map))
;; TODO: implement removal
(defn create-monitor!
"Creates a monitoring channel for the service with the name.
Returns a vector with the channel and a function to close it."
[name]
(debug "Creating monitor for service:" name)
(let [chan (chan)
listener
(reify
UnitStateListener
(stateChanged [this unit properties]
(>!! chan (DBusMap->map properties))))]
(. (get-service! name) addListener listener)
[chan (fn []
(close! chan)
(. (get-service! name) removeListener listener))]))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
; Graveyard ;
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;

View file

@ -2,7 +2,10 @@
(:require [stream.commander.systemd :as systemd]
[stream.commander.api :as api]
[clojure.test :refer :all]
[clojure.java.io :as io]))
[clojure.java.io :as io]
[slingshot.slingshot :refer [try+]]
[clojure.core.async
:as a]))
(deftest unit-files
(testing "The rendering."
@ -20,9 +23,9 @@
(is (not (.exists (io/as-file (systemd/get-unit-path name))))))))
(deftest systemd-services
(let [name (str (java.util.UUID/randomUUID))
(let [name (str "testing-" (java.util.UUID/randomUUID))
unit-path (systemd/create-unit-file! name
"cat /dev/zero" "test service")]
"cat /dev/zero" "test service")]
(testing "loading the service"
(systemd/reload-systemd!)
(is (= :loaded (systemd/get-service-load-state! name))))
@ -35,6 +38,11 @@
(systemd/stop-service! name)
(is (= :inactive (systemd/get-service-state! name))))
(testing "restarting the service"
(systemd/start-service! name)
(systemd/restart-service! name)
(is (= :active (systemd/get-service-state! name))))
(testing "enabling the service"
(systemd/enable-service! name)
(is (= :enabled (systemd/get-service-file-state! name))))
@ -45,4 +53,87 @@
(testing "removing the service"
(systemd/remove-service! name)
(is (= :not-found (systemd/get-service-load-state! name))))))
(is (= :not-found (systemd/get-service-load-state! name))))
(testing "creating a service automatically"
(systemd/create-service! name "cat /dev/zero" "test service")
(is (= :loaded (systemd/get-service-load-state! name))))
(let [[channel close] (systemd/create-monitor! name)]
(testing "detecting activity"
(systemd/restart-service! name)
(is (a/<!! channel) "No value in channel!")
(while (a/poll! channel))
(close)
(is (= nil (a/<!! channel)))))
(testing "failing systemd command"
(systemd/remove-service! name)
(try+
(systemd/start-service! "non-existend")
(is false "No exception thrown.")
(catch [:type :stream.commander.systemd/systemd-error] _
(is true))
(catch Object _
(is false "Wrong exception thrown."))))
(testing "creating service with digit as first char"
(try+
(systemd/create-service! "1234" "cat /dev/zero" "test service")
(catch [:type :stream.commander.systemd/systemd-error] _
(is true))
(catch Object _
(is false "Wrong exception thrown."))))
(testing "getting file state state of nonexisting service"
(is (not (systemd/get-service-file-state!
(str "does_not_exist"
(java.util.UUID/randomUUID))))))))
(deftest auxiliary-api
(testing "ffmpeg input string"
(is (= "rtsp://192.168.1.205:554/axis-media/media.amp"
(#'api/input-string nil nil "192.168.1.205" 554 "axis-media/media.amp")))
(is (= "rtsp://cam:pass@192.168.1.205:554/axis-media/media.amp"
(#'api/input-string "cam" "pass" "192.168.1.205" 554 "axis-media/media.amp"))))
(testing "unit name sanitizer"
(is (= "a-b-c-d-" (#'api/sanitize-process-name "a*b C?d.")))))
(deftest ffmpeg-process-management
(let [config {:cam-ip "localhost"
:cam-rtsp-port "554"
:profile "bla"
:stream-server "server"
:stream-key "key"}]
;; TODO: proper spec testing
(testing "getting an ffmpeg command"
(api/ffmpeg-command config))
(let [proc (api/create-process!
"tester" config)]
(testing "creating ffmpeg process the high-level way"
(is (= :loaded (systemd/get-service-load-state!
(:unit-name proc)))))
(testing "deleting the process"
(is (api/delete-process! (:id proc)))
(is (not (api/get-process! (:id proc))))))
(testing "creating two processes"
(api/create-process!
"tester" config)
(api/create-process!
"tester" config)
(is (= 2 (count @@#'api/processes))))
(testing "generated ids do not collide"
(doseq [i (range 100)]
(is (not (api/get-process! (#'api/generate-process-id))))))
(testing "deleting all processes"
(api/delete-all-processes!)
(is (= 0 (count @@#'api/processes))))
(testing "deleting non-existent process"
(is (not (api/delete-process! "nonexistent"))))))