From 58041578f7e8846eddebc58bc8e18288a96d3f40 Mon Sep 17 00:00:00 2001 From: Valentin Boettcher Date: Tue, 15 Aug 2017 14:10:29 +0200 Subject: [PATCH] some error handling and snapshot fiexes --- extras/sshManager.js | 49 ++++++++-------- src/commander.js | 136 ++++++++++++++++++++++++++----------------- src/communicator.js | 26 +++++++-- src/errorHandler.js | 3 +- src/logger.js | 2 +- src/reducers.js | 4 +- 6 files changed, 132 insertions(+), 88 deletions(-) diff --git a/extras/sshManager.js b/extras/sshManager.js index b923084..dc7977f 100644 --- a/extras/sshManager.js +++ b/extras/sshManager.js @@ -47,37 +47,40 @@ server.on('create_tunnel', (data, socket) => { tunnel = tunnels[data.localPort]; // If the tunnel already runs: - if (tunnel && (tunnel.info.localHost === 'localhost' || tunnel.info.localHost === data.localHost)){ + if (tunnel && (tunnel.info.localHost === 'localhost' || tunnel.info.localHost === data.localHost)) { return replySuccess(tunnel.info.remotePort); } // Dummy for other request tunnels[data.localPort] = { - info: { - localHost: data.localHost || 'localhost' - } + info: { + localHost: data.localHost || 'localhost' + } }; // Let's create a tunnel! return createNewTunnel(data).then((tunnel) => { tunnels[tunnel.info.localPort] = tunnel; - replySuccess(tunnel.info.remotePort); - console.log("Created Tunnel:\n", JSON.stringify(tunnel)); + replySuccess(tunnel.info.remotePort); + console.log("Created Tunnel:\n", JSON.stringify(tunnel)); }, (error) => { - console.error("Tunnel Creation Failed:\n", error); - replyError(error); - delete tunnels[data.localPort]; + console.error("Tunnel Creation Failed:\n", error); + replyError(error); + delete tunnels[data.localPort]; }); }); /** * The request handler to close a tunnel. The reply is {error: [message]} in case of an error, otherwise {success: true}. */ -server.on('close_tunnel', ({port, id}, socket) => { +server.on('close_tunnel', ({ + port, + id +}, socket) => { let replySuccess = generateRelyFunction('success', socket, id); let replyError = generateRelyFunction('error', socket, id); let error, tunnel; - + tunnel = tunnels[port]; error = !tunnel; @@ -100,21 +103,21 @@ function createNewTunnel(options) { return new Promise((resolve, reject) => { try { let tunnel = autossh(options); - // TODO: BETTER ERROR HANDLING + // TODO: BETTER ERROR HANDLING tunnel.on('connect', connection => { - // Wait for Exit // TODO: Nicer - setTimeout(() => resolve(tunnel), 1000); + // Wait for Exit // TODO: Nicer + setTimeout(() => resolve(tunnel), 1000); }); - - tunnel.on('error', error => { - // If auto SSH or SSH itself issue an error. // TODO: Investigate - if(typeof error === 'string' || error.message && (error.message.indexOf("failed for listen port") > -1 || error.message.indexOf("refused") > -1)){ - tunnel.kill(); - reject(error.message || error); - } - return; - }); + tunnel.on('error', error => { + // If auto SSH or SSH itself issue an error. // TODO: Investigate + if (typeof error === 'string' || error.message && (error.message.indexOf("failed for listen port") > -1 || error.message.indexOf("refused") > -1)) { + tunnel.kill(); + reject(error.message || error); + } + + return; + }); } catch (error) { reject(error); } diff --git a/src/commander.js b/src/commander.js index 5f68bde..0b31d07 100644 --- a/src/commander.js +++ b/src/commander.js @@ -20,20 +20,7 @@ let self = false; * The FFMPEG command. * @member */ -let cmd = ffmpeg({ - stdoutLines: 20 -}); - -/** - * The command to take a snapshot. - * @member - */ -let snapCmd = ffmpeg({ - stdoutLines: 20 -}); - -// Memory Buffer to hold 'em all... (The Snapshot) -let snapBuff; +let cmd; // The Config, Logger and a handle for the kill timeout. let _stopHandle = false; @@ -110,10 +97,6 @@ class Commander { config = _getConfig; dispatch = _dispatch; - // Register events. - cmd.on('error', crashed); - cmd.on('end', crashed); // Can posibly be an error. - return this; } } @@ -124,7 +107,7 @@ module.exports = Commander; * Action creators. */ -// NOTE: Maybe better error resolving strategy. +// NOTE: Maybe better error resolving strategy. // Start streaming. /** * Starts the streaming process if possible. @@ -149,7 +132,7 @@ Commander.prototype.start = function() { dispatch(requestStart()); // Create the FFMPEG-Command initially. - this.createCommands(); + self.createCommands(); new Promise((resolve, reject) => { cmd.once('start', resolve); @@ -179,7 +162,10 @@ Commander.prototype.restart = function() { dispatch(self.stop()).then(() => { dispatch(self.start()).then(() => { resolve("Successfully restarted."); - }).catch((message) => reject({ message: "Could not restart!", details: message })); // TODO: CD + }).catch((message) => reject({ + message: "Could not restart!", + details: message + })); // TODO: CD }); }); }); @@ -213,6 +199,7 @@ Commander.prototype.stop = function() { // Ok let's force it then... logger.log(logger.importance[3], "Force Stop!"); cmd.kill(); + resolve(); }, 3000); }).then(() => { clearTimeout(_stopHandle); @@ -234,6 +221,7 @@ Commander.prototype.takeSnapshot = function() { // TODO: CD // Init memstream. + let snapBuff; try { snapBuff = new WMStrm(); } catch (e) { @@ -241,11 +229,22 @@ Commander.prototype.takeSnapshot = function() { return; } + let snapCmd = ffmpeg({ + stdoutLines: 20 + }); + + snapCmd.input(source + config().snapProfile) + .outputFormat('mjpeg') + .frames(1) + .stream(snapBuff, { + end: true + }); + // Reject on Error. snapCmd.once('error', () => reject('An error occured while taking the snapshot')); // Send data on Finish. - snapBuff.once('finnish', () => { + snapCmd.once('end', () => { try { resolve(snapBuff.memStore.toString('base64')); } catch (e) { @@ -253,7 +252,7 @@ Commander.prototype.takeSnapshot = function() { } }); - snapBuff.run(); + snapCmd.run(); }).then(snap => { { dispatch(setSnapshotTaken()); @@ -272,8 +271,9 @@ Commander.prototype.takeSnapshot = function() { * @returns { Object } The fluent-ffmpeg command object. */ Commander.prototype.createCommands = function() { - // Clear inputs - cmd._inputs = []; + cmd = ffmpeg({ + stdoutLines: 20 + }); // TODO: Multi Protocol source = 'rtsp://' + config().camIP + ':' + config().camPort + '/'; @@ -296,11 +296,21 @@ Commander.prototype.createCommands = function() { // Output Options. cmd.outputFormat('flv') .outputOptions(['-bufsize 50000k', '-tune film']) - .output('rtmp://a.rtmp.youtube.com/live2/' + config().key); + .output('rtmp://' + config()['stream-server'] + '/' + config().key); - // Clear inputs. - snapCmd._inputs = []; + // Register events. + cmd.on('error', (error, stdout, stderr) => crashed({ + error, + stdout, + stderr + })); + // Can posibly be an error. + cmd.on('end', (stdout, stderr) => crashed({ + stdout, + stderr + })); + //snapCmd._outputs = []; // Snap Profile. /*snapCmd.input(source + config().snapProfile) .outputFormat('mjpeg') @@ -327,39 +337,53 @@ Commander.prototype.createCommands = function() { * @param { String } stdout * @param { String } stderr */ -function crashed(error, stdout, stderr) { +function crashed({ + error, + stdout, + stderr +}) { let errorCode, handler; - // We stopped... - if (getState().stream.running === 'STOPPED' || getState().stream.running === 'STOPPING') - return; + // Finished + if (!error) { + dispatch(setStopped()); + } else { + // We stopped... + if (getState().stream.running === 'STOPPED' || getState().stream.running === 'STOPPING') + return; - // Can't connect to the Camera - if (error.message.indexOf(source) > -1) { - errorCode = 0; - handler = errorHandling.handlers.tryReconnect(config().camIP, config().camPort, - () => dispatch(self.start())); - } + // Can't connect to the Camera + if (error.message.indexOf(source) > -1) { + errorCode = 0; + handler = errorHandling.handlers.tryReconnect(config().camIP, config().camPort, + () => dispatch(self.start()).catch(() => {})); + } - // Can't connect to the Internet / YouTube - else if (error.message.indexOf(source + 'Input/output error') > -1 || error.message.indexOf('rtmp://a.rtmp.youtube.com/live2/' + config().key) > -1) { - errorCode = 1; + // Invalid Stream Key etc... + else if (error.message.indexOf('Operation not permitted') > -1 || (error.message.indexOf('Input/output') > -1 && error.message.indexOf('rtmp://' + config()['stream-server'] + '/' + config().key) > -1)) { + errorCode = 4; + } - handler = errorHandling.handlers.tryReconnect('a.rtmp.youtube.com/live2/', 1935, - () => dispatch(self.start())); - } + // Can't connect to the Internet / YouTube + else if (error.message.indexOf(source + ' Input/output error') > -1 || error.message.indexOf('rtmp://' + config()['stream-server'] + '/' + config().key) > -1) { + errorCode = 1; - // Wrong FFMPEG Executable - else if (error.message.indexOf('spawn') > -1 || error.message.indexOf('niceness') > -1) - errorCode = 2; + handler = errorHandling.handlers.tryReconnect(config()['stream-server'], 1935, + () => dispatch(self.start()).catch(() => {})); // TODO: Better Solution + } - // Stopped by us - SIGINT Shouldn't lead to a crash. - else if (error.message.indexOf('SIGINT') > -1 || error.message.indexOf('SIGKILL') > -1) { - return; + // Wrong FFMPEG Executable + else if (error.message.indexOf('spawn') > -1 || error.message.indexOf('niceness') > -1) + errorCode = 2; + + // Stopped by us - SIGINT Shouldn't lead to a crash. + else if (error.message.indexOf('SIGINT') > -1 || error.message.indexOf('SIGKILL') > -1) { + return; + } } // Some unknown Problem, just try to restart. - else { + if (!errorCode && errorCode !== 0) { errorCode = 3; // Just restart in a Second. @@ -387,16 +411,18 @@ Commander.middleware = store => next => action => { let result = next(action); // If sth. has changed, we restart. - if (self && getState().stream.running === 'RUNNING' - && action.type === UPDATE_CONFIG) { + if (self && (getState().stream.error !== false || getState().stream.running === 'RUNNING') && + action.type === UPDATE_CONFIG) { if (action.data.key || action.data.ffmpegPath || action.data.customOutputOptions || action.data.customVideoOptions || action.data.customAudioOptions || action.data.camIP || - action.data.camPort) { - dispatch(self.restart()).catch(()=>{}); //TODO: error Handling + action.data.camPort || + action.data['stream-server'] || + action.key) { + dispatch(self.restart()).catch(() => {}); //TODO: error Handling } } return result; diff --git a/src/communicator.js b/src/communicator.js index 3ba26c4..cfadb17 100644 --- a/src/communicator.js +++ b/src/communicator.js @@ -138,6 +138,17 @@ Communicator.prototype.sendAction = function(action) { let type, change, state = getState(); + let convertError = () => { + return state.stream.error === false ? -1 : state.stream.error; + }; + + let convertRunning = () => { + if(state.stream.error !== false) + return 2; + + return state.stream.running == 'RUNNING' ? 0 : 1; // TODO: CD + }; + // All Legacy Stuff switch (action.type) { case SET_STARTED: @@ -145,8 +156,8 @@ Communicator.prototype.sendAction = function(action) { case SET_ERROR_RESOLVED: type = 'startStop'; change = { - running: state.stream.running == 'RUNNING' ? 0 : 1, // TODO: CD - error: state.stream.error || -1 + running: convertRunning(), + error: convertError() }; break; @@ -175,11 +186,14 @@ Communicator.prototype.sendAction = function(action) { case HYDRATE: socket.emit('meta', { - running: state.stream.running == 'RUNNING' ? 0 : 1, - error: state.stream.error || -1, + running: convertRunning(), + error: convertError(), name: state.name, config: state.config, - haveSettings: true // LEGACY + haveSettings: true, // LEGACY + ssh: { + port: state.ssh.sshForwardPort + } }); return; @@ -359,7 +373,7 @@ function handleCommand(command, callback) { dispatch(SSHMan.restartTunnels()).then(msg => answerSuccess(msg)).catch(err => answerError(err)); break; case commands.SNAPSHOT: - dispatch(commander.takeSnapshot()).then(snap => self.sendSnapshot(snap)) + dispatch(commander.takeSnapshot()).then(snap => self.sendSnapshot(snap, command.sender)) .catch(error => self.sendMessage('Snapshot Failed', 'error', error, command.sender)); break; case commands.GET_LOGS: diff --git a/src/errorHandler.js b/src/errorHandler.js index 29db7c8..de0dde4 100644 --- a/src/errorHandler.js +++ b/src/errorHandler.js @@ -49,13 +49,14 @@ module.exports.stopHandling = function() { let stopper, handler = handlers[getState().stream.handleError]; + dispatch(stopErrorHandling()); + if (!getState().stream.handleError) return Promise.resolve(); if (!handler) return Promise.resolve(); - dispatch(stopErrorHandling()); stopper = handler.stop(); if (!stopper.then) diff --git a/src/logger.js b/src/logger.js index f996b5b..0d64147 100644 --- a/src/logger.js +++ b/src/logger.js @@ -30,7 +30,7 @@ const customLevels = { }; // Error Messages -const errors = ['Camera Disconnected', 'YoutTube Disconnected', 'Wrong ffmpeg executable.', 'Unknown error.']; +const errors = ['Camera Disconnected', 'Stream-Server Disconnected', 'Wrong ffmpeg executable.', 'Unknown error.', 'Invalid Stream Key or the stream server doesn\'t accept connections.']; const { UPDATE_CONFIG, diff --git a/src/reducers.js b/src/reducers.js index 63af338..e29b807 100644 --- a/src/reducers.js +++ b/src/reducers.js @@ -41,8 +41,8 @@ function error(state = false, action) { case SET_ERROR: return action.data; case SET_STARTED: - case SET_STOPPED: - case SET_ERROR_RESOLVED: + case SET_STOPPED: + case SET_ERROR_RESOLVED: return false; default: return state;