From 652b0d38be19c9c35e437a0605daabe1345bd4b1 Mon Sep 17 00:00:00 2001 From: Valentin Boettcher Date: Mon, 3 Aug 2020 18:37:53 +0200 Subject: [PATCH] implement process replacement --- src/stream/commander/api.clj | 51 +++++++++++++++++++++------------- test/stream/commander_test.clj | 6 ++++ 2 files changed, 37 insertions(+), 20 deletions(-) diff --git a/src/stream/commander/api.clj b/src/stream/commander/api.clj index bc4ed39..b9d25f7 100644 --- a/src/stream/commander/api.clj +++ b/src/stream/commander/api.clj @@ -251,26 +251,6 @@ (defn- sanitize-process-name [name] (string/lower-case (string/replace name #"\W" "-"))) -(defn create-process! - "Creates a process, adds it to the registry and assigns a monitoring - channel to it. Returns the process." - [process-name ffmpeg-config] - (let [id (generate-process-id) - ffmpeg-config (merge default-ffmpeg-config ffmpeg-config) - unit-name (str (sanitize-process-name process-name) - "-" id) - path (sys/create-service! unit-name - (ffmpeg-command ffmpeg-config) - (get ffmpeg-config :description - "FFMPEG streaming process, created by `stream`.")) - monitor (sys/create-monitor! unit-name) - supervisor (attach-supervisor! id (first monitor)) - process (->process id process-name unit-name monitor supervisor ffmpeg-config #{})] - (info "Creating process with ID:" id) - (dosync - (commute processes assoc id process)) - process)) - (defn get-process! "Get the process with the id." [id] @@ -289,6 +269,34 @@ (dosync (commute processes dissoc (:id proc))) true)) +(defn create-process! + "Creates a process with the name `process-name` and the + `ffmpeg-config`, adds it to the registry and assigns a monitoring + channel to it. An `id` is generated if not specified. When the + specified `id` is already present in the registry it is replaced. + + Returns the process." + ([process-name ffmpeg-config id] + (if-let [proc (get-process! id)] + (do (info "Replacing process with ID:" id) + (delete-process! proc)) + (info "Creating process with ID:" id)) + (let [ffmpeg-config (merge default-ffmpeg-config ffmpeg-config) + unit-name (str (sanitize-process-name process-name) + "-" id) + path (sys/create-service! unit-name + (ffmpeg-command ffmpeg-config) + (get ffmpeg-config :description + "FFMPEG streaming process, created by `stream`.")) + monitor (sys/create-monitor! unit-name) + supervisor (attach-supervisor! id (first monitor)) + process (->process id process-name unit-name monitor supervisor ffmpeg-config #{})] + (dosync + (commute processes assoc id process)) + process)) + ([process-name ffmpeg-config] + (create-process! process-name ffmpeg-config (generate-process-id)))) + (defn delete-all-processes! [] "Deletes all processes." (doseq [[_ proc] @processes] @@ -299,6 +307,9 @@ [proc] (sys/get-service-state! (:unit-name proc))) +;; These control functions do not wait until a "final state" is +;; reached but resolve once a new, maybe unstable, state has been reached. + (defn start-process! "Starts the service associated to the process `proc`. Returns a promise that resolves to event `:failed` or `:active` or times out diff --git a/test/stream/commander_test.clj b/test/stream/commander_test.clj index 7c7d326..43d6df8 100644 --- a/test/stream/commander_test.clj +++ b/test/stream/commander_test.clj @@ -147,6 +147,7 @@ (is (= "a-b-c-d-" (#'api/sanitize-process-name "a*b C?d."))))) (deftest ffmpeg-process-management + ;; NOTE: This creates a failing process. (let [config {:cam-ip "0.0.0.0" :cam-rtsp-port "554" :profile "bla" @@ -240,6 +241,11 @@ "tester" config) (is (= 2 (count @@#'api/processes)))) + (testing "replace process" + (let [proc (api/create-process! "ghost" config) + proc-new (api/create-process! "ghost" config (:id proc))] + (is (not (= proc proc-new))))) + (testing "generated ids do not collide" (doseq [i (range 100)] (is (not (api/get-process! (#'api/generate-process-id))))))