From 9cc07176017e7d66ec7dc1444dad6f7a370c0f9f Mon Sep 17 00:00:00 2001 From: hiro98 Date: Fri, 7 Aug 2020 15:49:25 +0200 Subject: [PATCH] implement serialization --- project.clj | 10 ++- src/stream/commander/api.clj | 133 +++++++++++++++++++++++++++++++---- 2 files changed, 128 insertions(+), 15 deletions(-) diff --git a/project.clj b/project.clj index a12e864..d1b066b 100644 --- a/project.clj +++ b/project.clj @@ -11,8 +11,12 @@ [slingshot "0.12.2"] ;; [com.github.hypfvieh/dbus-java "3.2.1"] [org.clojure/data.json "1.0.0"] - [com.taoensso/encore "2.122.0"]] + [com.taoensso/encore "2.122.0"] + [com.outpace/config "0.13.2"]] :main ^:skip-aot stream.core + :aliases {"config" ["run" "-m" "outpace.config.generate"]} + :profiles {:test {:jvm-opts ["-Dconfig.edn=resources/test/config.edn"]} + :prod {:jvm-opts ["-Dconfig.edn=config.edn"]} + :uberjar {:aot :all}} :plugins [[lein-cloverage "1.1.2"]] - :target-path "target/%s" - :profiles {:uberjar {:aot :all}}) + :target-path "target/%s") diff --git a/src/stream/commander/api.clj b/src/stream/commander/api.clj index b9d25f7..9969459 100644 --- a/src/stream/commander/api.clj +++ b/src/stream/commander/api.clj @@ -2,7 +2,9 @@ (:require [stream.commander.systemd :as sys] [stream.commander.journal :as journal] [clojure.string :as string] + [clojure.java.io :as io] [slingshot.slingshot :refer [throw+]] + [outpace.config :refer [defconfig]] [taoensso.timbre :as timbre :refer [log trace debug info warn error fatal report logf tracef debugf infof warnf errorf fatalf reportf @@ -10,7 +12,10 @@ [clojure.core.async :as a :refer [>! !! process id process-name unit-name monitor supervisor ffmpeg-config #{})] (dosync (commute processes assoc id process)) + (go + (save-process! process)) process)) ([process-name ffmpeg-config] (create-process! process-name ffmpeg-config (generate-process-id)))) @@ -320,7 +336,7 @@ (sys/start-service! (:unit-name proc)) prom)) ([proc] - (start-process! proc +default-timeout+))) + (start-process! proc default-timeout))) (defn stop-process! "Stops the service associated to the process `proc`. Returns a promise @@ -332,7 +348,7 @@ (sys/stop-service! (:unit-name proc)) prom)) ([proc] - (start-process! proc +default-timeout+))) + (start-process! proc default-timeout))) (defn restart-process! "Restarts a process `proc` and wait for stop and start to happen @@ -341,7 +357,7 @@ @(stop-process! proc timeout) (start-process! proc timeout)) ([proc] - (restart-process! proc +default-timeout+))) + (restart-process! proc default-timeout))) (defn process-running? "Queries wether a process is running." @@ -359,10 +375,103 @@ (= (sys/get-service-file-state! (:unit-name proc)) :enabled)) +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; + ; Serialization ; +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; + +(defn process->edn + "Serializes the process `proc` into edn." + [^process proc] + (prn-str {:id (:id proc) :name (:process-name proc) + :config (:ffmpeg-config proc) + :version 1})) + +(defn edn->process! + "Creates a process from its edn serialized version." + [{:keys [name config id]}] + (create-process! name config id)) + +(defn processdb-filename + "Computes the filename of the serialization file for the given process + `proc`." + [id] + (str processdb-directory "/" id ".edn")) + +;; TODO: error hangling +(defn save-process! + "Serializes process `proc` information to a file in the + [[processdb-directory]]. Returns `true` on success. + + It writes a temporary file first and then move it." + [proc] + (info "Writing processdb file for process:" (:process-name proc)) + (let [tempfile + (File/createTempFile (:process-name proc) "_stream" (File. "/tmp"))] + (spit tempfile (process->edn proc)) + (trace "Tepmfile is:" tempfile) + (try+ + (Files/move (.toPath tempfile) + (Paths/get (processdb-filename (:id proc)) + (into-array String [])) + (into-array CopyOption + [(StandardCopyOption/REPLACE_EXISTING)])) + true + (catch Object _ + (error (:throwable &throw-context) "could not write process file") + false)))) + +;; TODO: Global warnings +(defn load-process! + "Loads a process with the `id` from the processdb directory. + Returns `false` if the db file is not found or can't be read and + the loaded process otherwise." + [id] + (if-let [proc-data + (try+ + (clojure.edn/read-string + (slurp (processdb-filename id))) + (catch Object _ + (error (:throwable &throw-context) "could not read process file") + false))] + (edn->process! proc-data) + false)) + +(defn load-processes! + "Loads the serialized processes from the processdb directory by + iterating over it. Returns an array of processes." + [] + (reduce (fn [procs file] + (if-let [id (second (re-matches #"(.*)\.edn" file))] + (if-let [proc (load-process! id)] + (conj procs proc) + ) + procs)) + [] (.list (File. processdb-directory)))) + + +(defn save-processes! + "Saves the process registry to files via [[save-process!]]" + [] + (info "saving processes") + (doseq [[_ proc] @processes] + (save-process! proc))) + +(defn delete-processdb-file! + "Deletes the processdb file of a process. + Returns `false` if the file is not found." + [id] + (try+ + (io/delete-file (processdb-filename id)) + true + (catch Object _ + (error (:throwable &throw-context) "could not delete process file") + false))) + ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ; Init ; ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; (defn init! [] - "Initialize the systemd connection." + "Initialize the systemd connection and makes the commander API + operable. Processess will have to be loaded manually." (sys/init!))