implement serialization

This commit is contained in:
hiro98 2020-08-07 15:49:25 +02:00
parent 652b0d38be
commit 9cc0717601
2 changed files with 128 additions and 15 deletions

View file

@ -11,8 +11,12 @@
[slingshot "0.12.2"] [slingshot "0.12.2"]
;; [com.github.hypfvieh/dbus-java "3.2.1"] ;; [com.github.hypfvieh/dbus-java "3.2.1"]
[org.clojure/data.json "1.0.0"] [org.clojure/data.json "1.0.0"]
[com.taoensso/encore "2.122.0"]] [com.taoensso/encore "2.122.0"]
[com.outpace/config "0.13.2"]]
:main ^:skip-aot stream.core :main ^:skip-aot stream.core
:aliases {"config" ["run" "-m" "outpace.config.generate"]}
:profiles {:test {:jvm-opts ["-Dconfig.edn=resources/test/config.edn"]}
:prod {:jvm-opts ["-Dconfig.edn=config.edn"]}
:uberjar {:aot :all}}
:plugins [[lein-cloverage "1.1.2"]] :plugins [[lein-cloverage "1.1.2"]]
:target-path "target/%s" :target-path "target/%s")
:profiles {:uberjar {:aot :all}})

View file

@ -2,7 +2,9 @@
(:require [stream.commander.systemd :as sys] (:require [stream.commander.systemd :as sys]
[stream.commander.journal :as journal] [stream.commander.journal :as journal]
[clojure.string :as string] [clojure.string :as string]
[clojure.java.io :as io]
[slingshot.slingshot :refer [throw+]] [slingshot.slingshot :refer [throw+]]
[outpace.config :refer [defconfig]]
[taoensso.timbre :as timbre [taoensso.timbre :as timbre
:refer [log trace debug info warn error fatal report :refer [log trace debug info warn error fatal report
logf tracef debugf infof warnf errorf fatalf reportf logf tracef debugf infof warnf errorf fatalf reportf
@ -10,7 +12,10 @@
[clojure.core.async [clojure.core.async
:as a :as a
:refer [>! <! >!! <!! go chan buffer close! thread :refer [>! <! >!! <!! go chan buffer close! thread
alts! alts!! timeout]])) alts! alts!! timeout]]
[slingshot.slingshot :refer [throw+ try+]])
(:import [java.io File]
[java.nio.file Files LinkOption CopyOption StandardCopyOption Paths]))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
@ -29,13 +34,19 @@
(def monitor (a/pub master-monitor :type)) (def monitor (a/pub master-monitor :type))
;; TODO: load from config (defconfig default-ffmpeg-config
(def default-ffmpeg-config
{:ffmpeg-path "/usr/bin/ffmpeg" :rtsp-transport "tcp" :custom-flags "" {:ffmpeg-path "/usr/bin/ffmpeg" :rtsp-transport "tcp" :custom-flags ""
:audio-bitrate "128k" :audio-channels 1 :audio-sampling-rate 44100 :audio-bitrate "128k" :audio-channels 1 :audio-sampling-rate 44100
:rtsp-user nil :rtsp-password nil :buffer-size "100M"}) :rtsp-user nil :rtsp-password nil :buffer-size "100M"})
(def ^:const +default-timeout+ 1000) (defconfig ^{:validate [number? "Must be a number."]} default-timeout 1000)
(defconfig ^{:validate [#(let [path (Paths/get % (into-array String []))]
(Files/exists path
(into-array LinkOption []))
(Files/isWritable path)
(Files/isReadable path))
"Path must exist, be writable, be readable."]}
processdb-directory "./processes")
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
; Utilities ; ; Utilities ;
@ -199,7 +210,7 @@
takes precedence. takes precedence.
" "
[process & {:keys [event timeout matcher] [process & {:keys [event timeout matcher]
:or {timeout +default-timeout+}}] :or {timeout default-timeout}}]
(let [prom (promise) (let [prom (promise)
supervisor (:supervisor process)] supervisor (:supervisor process)]
(if (and (not event) (not matcher)) (if (and (not event) (not matcher))
@ -256,6 +267,7 @@
[id] [id]
(get @processes id)) (get @processes id))
(declare delete-processdb-file!)
(defn delete-process! (defn delete-process!
"Deletes a process from the process map, stops it and deletes the unit file. "Deletes a process from the process map, stops it and deletes the unit file.
Returns `true` on success, `false` otherwise." Returns `true` on success, `false` otherwise."
@ -267,8 +279,10 @@
(sys/remove-service! unit-name) (sys/remove-service! unit-name)
(close) (close)
(dosync (commute processes dissoc (:id proc))) (dosync (commute processes dissoc (:id proc)))
(delete-processdb-file! (:id proc))
true)) true))
(declare save-process!)
(defn create-process! (defn create-process!
"Creates a process with the name `process-name` and the "Creates a process with the name `process-name` and the
`ffmpeg-config`, adds it to the registry and assigns a monitoring `ffmpeg-config`, adds it to the registry and assigns a monitoring
@ -278,9 +292,9 @@
Returns the process." Returns the process."
([process-name ffmpeg-config id] ([process-name ffmpeg-config id]
(if-let [proc (get-process! id)] (if-let [proc (get-process! id)]
(do (info "Replacing process with ID:" id) (do (info "Replacing process with ID:" id)
(delete-process! proc)) (delete-process! proc))
(info "Creating process with ID:" id)) (info "Creating process with ID:" id))
(let [ffmpeg-config (merge default-ffmpeg-config ffmpeg-config) (let [ffmpeg-config (merge default-ffmpeg-config ffmpeg-config)
unit-name (str (sanitize-process-name process-name) unit-name (str (sanitize-process-name process-name)
"-" id) "-" id)
@ -293,6 +307,8 @@
process (->process id process-name unit-name monitor supervisor ffmpeg-config #{})] process (->process id process-name unit-name monitor supervisor ffmpeg-config #{})]
(dosync (dosync
(commute processes assoc id process)) (commute processes assoc id process))
(go
(save-process! process))
process)) process))
([process-name ffmpeg-config] ([process-name ffmpeg-config]
(create-process! process-name ffmpeg-config (generate-process-id)))) (create-process! process-name ffmpeg-config (generate-process-id))))
@ -320,7 +336,7 @@
(sys/start-service! (:unit-name proc)) (sys/start-service! (:unit-name proc))
prom)) prom))
([proc] ([proc]
(start-process! proc +default-timeout+))) (start-process! proc default-timeout)))
(defn stop-process! (defn stop-process!
"Stops the service associated to the process `proc`. Returns a promise "Stops the service associated to the process `proc`. Returns a promise
@ -332,7 +348,7 @@
(sys/stop-service! (:unit-name proc)) (sys/stop-service! (:unit-name proc))
prom)) prom))
([proc] ([proc]
(start-process! proc +default-timeout+))) (start-process! proc default-timeout)))
(defn restart-process! (defn restart-process!
"Restarts a process `proc` and wait for stop and start to happen "Restarts a process `proc` and wait for stop and start to happen
@ -341,7 +357,7 @@
@(stop-process! proc timeout) @(stop-process! proc timeout)
(start-process! proc timeout)) (start-process! proc timeout))
([proc] ([proc]
(restart-process! proc +default-timeout+))) (restart-process! proc default-timeout)))
(defn process-running? (defn process-running?
"Queries wether a process is running." "Queries wether a process is running."
@ -359,10 +375,103 @@
(= (sys/get-service-file-state! (:unit-name proc)) (= (sys/get-service-file-state! (:unit-name proc))
:enabled)) :enabled))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
; Serialization ;
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(defn process->edn
"Serializes the process `proc` into edn."
[^process proc]
(prn-str {:id (:id proc) :name (:process-name proc)
:config (:ffmpeg-config proc)
:version 1}))
(defn edn->process!
"Creates a process from its edn serialized version."
[{:keys [name config id]}]
(create-process! name config id))
(defn processdb-filename
"Computes the filename of the serialization file for the given process
`proc`."
[id]
(str processdb-directory "/" id ".edn"))
;; TODO: error hangling
(defn save-process!
"Serializes process `proc` information to a file in the
[[processdb-directory]]. Returns `true` on success.
It writes a temporary file first and then move it."
[proc]
(info "Writing processdb file for process:" (:process-name proc))
(let [tempfile
(File/createTempFile (:process-name proc) "_stream" (File. "/tmp"))]
(spit tempfile (process->edn proc))
(trace "Tepmfile is:" tempfile)
(try+
(Files/move (.toPath tempfile)
(Paths/get (processdb-filename (:id proc))
(into-array String []))
(into-array CopyOption
[(StandardCopyOption/REPLACE_EXISTING)]))
true
(catch Object _
(error (:throwable &throw-context) "could not write process file")
false))))
;; TODO: Global warnings
(defn load-process!
"Loads a process with the `id` from the processdb directory.
Returns `false` if the db file is not found or can't be read and
the loaded process otherwise."
[id]
(if-let [proc-data
(try+
(clojure.edn/read-string
(slurp (processdb-filename id)))
(catch Object _
(error (:throwable &throw-context) "could not read process file")
false))]
(edn->process! proc-data)
false))
(defn load-processes!
"Loads the serialized processes from the processdb directory by
iterating over it. Returns an array of processes."
[]
(reduce (fn [procs file]
(if-let [id (second (re-matches #"(.*)\.edn" file))]
(if-let [proc (load-process! id)]
(conj procs proc)
)
procs))
[] (.list (File. processdb-directory))))
(defn save-processes!
"Saves the process registry to files via [[save-process!]]"
[]
(info "saving processes")
(doseq [[_ proc] @processes]
(save-process! proc)))
(defn delete-processdb-file!
"Deletes the processdb file of a process.
Returns `false` if the file is not found."
[id]
(try+
(io/delete-file (processdb-filename id))
true
(catch Object _
(error (:throwable &throw-context) "could not delete process file")
false)))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
; Init ; ; Init ;
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(defn init! [] (defn init! []
"Initialize the systemd connection." "Initialize the systemd connection and makes the commander API
operable. Processess will have to be loaded manually."
(sys/init!)) (sys/init!))