/////////////////////////////////////////////////////////////////////////////////// // Main Module - Communicates with the Server and Orchestrates the other Moules. // /////////////////////////////////////////////////////////////////////////////////// const sock = require('socket.io-client'); const ffmpeg = require('fluent-ffmpeg'); const http = require('http'); const path = require('path'); const fs = require('fs'); const WMStrm = require(__dirname + '/lib/memWrite.js'); const exec = require('child_process').exec; const execSync = require('child_process').execSync; const spawnP = require('child_process').spawn; /** * Custom Modules * Yet to be initialized. */ const logger = require('src/logger.js'); /////////////////////////////////////////////////////////////////////////////// // Declarations // /////////////////////////////////////////////////////////////////////////////// // Force a Stop. let mustBe = false; // Restart the stream after it stopped. let restart = false; // Central State Variable // NOTE: Could be done with redux! let status = { status: 0, error: -1 } // Minor declarations. let config, source, snapSource, stopTimeout; let spawn = function() { source = 'rtsp://' + config.camIP + ':' + config.camPort + '/' + config.camProfile; ffmpeg.setFfmpegPath(config.ffmpegPath); delete cmd; cmd = ffmpeg({ source: source, stdoutLines: 20 }); if (config.customOutputOptions !== "") cmd.outputOptions(config.customOutputOptions.replace(/\s+\,\s+/g, ',').replace(/\s+\-/g, ',-').split(',')); if (config.customAudioOptions !== "") cmd.outputOptions(config.customAudioOptions.replace(/\s+\,\s+/g, ',').replace(/\s+\-/g, ',-').split(',')); else cmd.AudioCodec('copy'); if (config.customVideoOptions !== "") cmd.outputOptions(config.customVideoOptions.replace(/\s+\,\s+/g, ',').replace(/\s+\-/g, ',-').split(',')); else cmd.videoCodec('copy'); cmd.on('start', function(commandLine) { status.running = 0; logger.log(logger.importance[4], 'Spawned Ffmpeg with command: ' + commandLine); }) .on('end', function(o, e) { imDead('Normal Stop.', e); }) .on('error', function(err, o, e) { if (err.message.indexOf(source) > -1) criticalProblem(0, e, handleDisc, config.camIP, config.camPort); else if (err.message.indexOf(source + 'Input/output error') > -1 || err.message.indexOf('rtmp://a.rtmp.youtube.com/live2/' + config.key) > -1) criticalProblem(1, e, handleDisc, 'a.rtmp.youtube.com/live2/', 1935); else if (err.message.indexOf('spawn') > -1 || err.message.indexOf('niceness') > -1) criticalProblem(2, e, function() {}); else if (err.message.indexOf('SIGINT') > -1 || err.message.indexOf('SIGKILL') > -1) imDead('Normal Stop.', e); else imDead(err.message, e); }) .outputFormat('flv') .outputOptions(['-bufsize 50000k', '-tune film']) .output('rtmp://a.rtmp.youtube.com/live2/' + config.key); status.error = -1; socket.emit('change', { type: 'startStop', change: { running: 0, error: -1 } }); cmd.run(); }; let getSnap = function(cb) { snapSource = 'rtsp://' + config.camIP + ':' + config.camPort + '/' + config.snapProfile; let picBuff = new WMStrm(); recCmd = ffmpeg(snapSource) .on('start', function(commandLine) { logger.log(logger.importance[4], 'Snapshot ' + commandLine); }) .on('error', function(err, o, e) {}) .outputFormat('mjpeg') .frames(1) .stream(picBuff, { end: true }); picBuff.on('finish', function() { try { cb(picBuff.memStore.toString('base64')); delete pickBuff; } catch (e) { cb(false); } }); } function imDead(why, e = '') { if (stopTimeout) { clearTimeout(stopTimeout); stopTimeout = false; } status.running = 1; socket.emit('change', { type: 'startStop', change: { running: 1 } }); if (restart) { spawn(); restart = false; } if (!mustBe) { logger.log(logger.importance[2], 'Crash! ' + e); setTimeout(function() { spawn(); }, 1000); } mustBe = false; } function criticalProblem(err, e, handler, ...args) { if (!mustBe) { setTimeout(function() { status.running = 2; status.error = err; logger.log(logger.importance[3], 'Critical Problem: ' + errors[err] + '\n' + e); socket.emit('change', { type: 'error', change: { running: 2, error: err } }); handler(args) }, 1000); } mustBe = false; } function handleDisc(info) { let [host, port] = info; isReachable(host, port, is => { if (is) { spawn(); } else { setTimeout(function() { handleDisc(info); }, 10000); } }); } function isReachable(host, port, callback) { http.get({ host: host.split('/')[0], port: port }, function(res) { callback(true); }).on("error", function(e) { if (e.message == "socket hang up") { setTimeout(function() { callback(true); }, 1000); } else callback(false); }); } function stopFFMPEG() { cmd.kill('SIGINT'); stopTimeout = setTimeout(() => { logger.log(logger.importance[3], "Force Stop!"); cmd.kill(); }, 3000); } var commandHandlers = function commandHandlers(command, cb) { var handlers = { startStop: function() { if (restart) return; if (status.running !== 2) if (status.running === 0) { logger.log(logger.importance[1], "Stop Command!"); mustBe = true; stopFFMPEG(); socket.emit('data', { type: 'message', data: { title: 'Success', type: 'success', text: 'Stopped!' } }, command.sender); } else { logger.log(logger.importance[1], "Start Command!"); spawn(); socket.emit('data', { type: 'message', data: { title: 'Success', type: 'success', text: 'Started!' } }, command.sender); } }, snap: function() { getSnap(snap => { socket.emit('data', { type: 'snap', data: snap, }, command.sender); }); }, config: function() { socket.emit('data', { type: 'config', data: config, }, command.sender); }, changeSettings: function() { for (let set in command.data) { if (typeof config[set] !== 'undefined') config[set] = command.data[set]; } let oldConfigured; if (config.configured === true) oldConfigured = true; config.configured = true; fs.writeFile(__dirname + '/config.js', JSON.stringify(config, undefined, 2), function(err) { if (err) { socket.emit('data', { type: 'message', data: { title: 'Error', type: 'error', text: 'Can\'t save the Settings!\n' + err.message } }, command.sender); } else { restartSSH(() => { socket.emit('data', { type: 'message', data: { title: 'Success', type: 'success', text: 'Settings Saved!' } }, command.sender); if (oldConfigured) { socket.emit('change', { type: 'settings', change: { config: command.data } }); stopFFMPEG(); spawn(); } else { socket.disconnect(); init(); } }); } }); }, restart: function() { if (status.running === 0) { logger.log(logger.importance[1], "Restart Command!"); mustBe = true; restart = true; stopFFMPEG(); } else { logger.log(logger.importance[1], "Start Command!"); spawn(); } socket.emit('data', { type: 'message', data: { title: 'Success', type: 'success', text: 'Restarted!' } }, command.sender); }, restartSSH: function() { restartSSH(() => { socket.emit('data', { type: 'message', data: { title: 'Success', type: 'success', text: 'Restarted SSH Tunnels!' } }, command.sender); }); }, getLogs: function() { logger.query({ limit: 100 }, function(err, data) { data = data.file; if (err) { data = []; } else if (data.length === 0) data = []; socket.emit('data', { type: 'logs', data: data }, command.sender); }); } }; //call the handler var call = handlers[command.command]; if (call) call(); } function restartSSH(cb) { exec('forever stop SSH-Serv', () => { connectSSH(() => { socket.emit('change', { change: { ssh: { port: status.ssh.port, camForwardPort: status.ssh.camForwardPort } } }); cb(); }); }); } function handleKill() { process.stdin.resume(); logger.log(logger.importance[0], "Received Shutdown Command"); mustBe = true; cmd.kill(); process.exit(0); } process.on('SIGTERM', function() { handleKill(); }); //let's go function init() { config = readConfig(); if (config.configured) { socket = sock(config.master + '/pi'); initSocket(); status.name = config.name; if (!cmd) spawn(); } else { socket = sock(config.master + '/pi', { query: "unconfigured=true" }); status.running = 2; initSocket(); } } function initSSH(cb) { status.ssh = { // that Could come from the Master server! user: config['ssh-user'], localUser: config['ssh-local-user'], masterPort: config['ssh-port'] }; checkSSH((alive) => { if (alive) cb(); else connectSSH(cb); }); } function connectSSH(cb = function() { socket.emit('change', { change: { ssh: { port: status.ssh.port, camForwardPort: status.ssh.camForwardPort } } }); }) { socket.emit('getSSHPort', ports => { [status.ssh.port, status.ssh.camForwardPort] = ports; let ssh = exec(`forever start -a --killSignal=SIGINT --uid SSH-Serv sshManager.js ${status.ssh.port} ${status.ssh.camForwardPort} ${config.camPanelPort}`, { detached: true, shell: true, cwd: __dirname }); ssh.on('error', (err) => { socket.emit('data', { type: 'message', data: { title: 'Error', type: 'error', text: 'Could not start SSH tunnels!' } }, command.sender); }); cb(); }); } function checkSSH(cb) { let m, pid, alive; let re = /SSH-Serv.*?sshManager\.js\s([0-9]+)\s([0-9]+).*log.*[^STOPPED]+/g; exec('forever list', (error, stdout, stderr) => { if (error) throw error; let alive = false; while ((m = re.exec(stdout)) !== null) { if (m.index === re.lastIndex) { re.lastIndex++; } if (alive) { exec('forever stop SSH-Serv'); cb(false) return; } else { [, status.ssh.port, status.ssh.camForwardPort] = m; alive = true; } } cb(alive); }); } function initSocket() { socket.on('connect', function() { logger.log(logger.importance[0], 'Connected to Master: ' + config.master + '.'); if (config['ssh-user']) initSSH(err => { if (err) throw err; socket.emit('meta', status); }); else { socket.emit('meta', status); } }); socket.on('disconnect', function() { socket.disconnect(); init(); }); socket.on('command', (command, cb) => { commandHandlers(command, cb); }); } function readConfig() { return JSON.parse(fs.readFileSync(__dirname + '/config.js')); } init();