some error handling and snapshot fiexes

This commit is contained in:
Valentin Boettcher 2017-08-15 14:10:29 +02:00
parent 5f19649fc8
commit 58041578f7
6 changed files with 132 additions and 88 deletions

View file

@ -47,37 +47,40 @@ server.on('create_tunnel', (data, socket) => {
tunnel = tunnels[data.localPort]; tunnel = tunnels[data.localPort];
// If the tunnel already runs: // If the tunnel already runs:
if (tunnel && (tunnel.info.localHost === 'localhost' || tunnel.info.localHost === data.localHost)){ if (tunnel && (tunnel.info.localHost === 'localhost' || tunnel.info.localHost === data.localHost)) {
return replySuccess(tunnel.info.remotePort); return replySuccess(tunnel.info.remotePort);
} }
// Dummy for other request // Dummy for other request
tunnels[data.localPort] = { tunnels[data.localPort] = {
info: { info: {
localHost: data.localHost || 'localhost' localHost: data.localHost || 'localhost'
} }
}; };
// Let's create a tunnel! // Let's create a tunnel!
return createNewTunnel(data).then((tunnel) => { return createNewTunnel(data).then((tunnel) => {
tunnels[tunnel.info.localPort] = tunnel; tunnels[tunnel.info.localPort] = tunnel;
replySuccess(tunnel.info.remotePort); replySuccess(tunnel.info.remotePort);
console.log("Created Tunnel:\n", JSON.stringify(tunnel)); console.log("Created Tunnel:\n", JSON.stringify(tunnel));
}, (error) => { }, (error) => {
console.error("Tunnel Creation Failed:\n", error); console.error("Tunnel Creation Failed:\n", error);
replyError(error); replyError(error);
delete tunnels[data.localPort]; delete tunnels[data.localPort];
}); });
}); });
/** /**
* The request handler to close a tunnel. The reply is {error: [message]} in case of an error, otherwise {success: true}. * The request handler to close a tunnel. The reply is {error: [message]} in case of an error, otherwise {success: true}.
*/ */
server.on('close_tunnel', ({port, id}, socket) => { server.on('close_tunnel', ({
port,
id
}, socket) => {
let replySuccess = generateRelyFunction('success', socket, id); let replySuccess = generateRelyFunction('success', socket, id);
let replyError = generateRelyFunction('error', socket, id); let replyError = generateRelyFunction('error', socket, id);
let error, tunnel; let error, tunnel;
tunnel = tunnels[port]; tunnel = tunnels[port];
error = !tunnel; error = !tunnel;
@ -100,21 +103,21 @@ function createNewTunnel(options) {
return new Promise((resolve, reject) => { return new Promise((resolve, reject) => {
try { try {
let tunnel = autossh(options); let tunnel = autossh(options);
// TODO: BETTER ERROR HANDLING // TODO: BETTER ERROR HANDLING
tunnel.on('connect', connection => { tunnel.on('connect', connection => {
// Wait for Exit // TODO: Nicer // Wait for Exit // TODO: Nicer
setTimeout(() => resolve(tunnel), 1000); setTimeout(() => resolve(tunnel), 1000);
}); });
tunnel.on('error', error => {
// If auto SSH or SSH itself issue an error. // TODO: Investigate
if(typeof error === 'string' || error.message && (error.message.indexOf("failed for listen port") > -1 || error.message.indexOf("refused") > -1)){
tunnel.kill();
reject(error.message || error);
}
return; tunnel.on('error', error => {
}); // If auto SSH or SSH itself issue an error. // TODO: Investigate
if (typeof error === 'string' || error.message && (error.message.indexOf("failed for listen port") > -1 || error.message.indexOf("refused") > -1)) {
tunnel.kill();
reject(error.message || error);
}
return;
});
} catch (error) { } catch (error) {
reject(error); reject(error);
} }

View file

@ -20,20 +20,7 @@ let self = false;
* The FFMPEG command. * The FFMPEG command.
* @member * @member
*/ */
let cmd = ffmpeg({ let cmd;
stdoutLines: 20
});
/**
* The command to take a snapshot.
* @member
*/
let snapCmd = ffmpeg({
stdoutLines: 20
});
// Memory Buffer to hold 'em all... (The Snapshot)
let snapBuff;
// The Config, Logger and a handle for the kill timeout. // The Config, Logger and a handle for the kill timeout.
let _stopHandle = false; let _stopHandle = false;
@ -110,10 +97,6 @@ class Commander {
config = _getConfig; config = _getConfig;
dispatch = _dispatch; dispatch = _dispatch;
// Register events.
cmd.on('error', crashed);
cmd.on('end', crashed); // Can posibly be an error.
return this; return this;
} }
} }
@ -124,7 +107,7 @@ module.exports = Commander;
* Action creators. * Action creators.
*/ */
// NOTE: Maybe better error resolving strategy. // NOTE: Maybe better error resolving strategy.
// Start streaming. // Start streaming.
/** /**
* Starts the streaming process if possible. * Starts the streaming process if possible.
@ -149,7 +132,7 @@ Commander.prototype.start = function() {
dispatch(requestStart()); dispatch(requestStart());
// Create the FFMPEG-Command initially. // Create the FFMPEG-Command initially.
this.createCommands(); self.createCommands();
new Promise((resolve, reject) => { new Promise((resolve, reject) => {
cmd.once('start', resolve); cmd.once('start', resolve);
@ -179,7 +162,10 @@ Commander.prototype.restart = function() {
dispatch(self.stop()).then(() => { dispatch(self.stop()).then(() => {
dispatch(self.start()).then(() => { dispatch(self.start()).then(() => {
resolve("Successfully restarted."); resolve("Successfully restarted.");
}).catch((message) => reject({ message: "Could not restart!", details: message })); // TODO: CD }).catch((message) => reject({
message: "Could not restart!",
details: message
})); // TODO: CD
}); });
}); });
}); });
@ -213,6 +199,7 @@ Commander.prototype.stop = function() {
// Ok let's force it then... // Ok let's force it then...
logger.log(logger.importance[3], "Force Stop!"); logger.log(logger.importance[3], "Force Stop!");
cmd.kill(); cmd.kill();
resolve();
}, 3000); }, 3000);
}).then(() => { }).then(() => {
clearTimeout(_stopHandle); clearTimeout(_stopHandle);
@ -234,6 +221,7 @@ Commander.prototype.takeSnapshot = function() {
// TODO: CD // TODO: CD
// Init memstream. // Init memstream.
let snapBuff;
try { try {
snapBuff = new WMStrm(); snapBuff = new WMStrm();
} catch (e) { } catch (e) {
@ -241,11 +229,22 @@ Commander.prototype.takeSnapshot = function() {
return; return;
} }
let snapCmd = ffmpeg({
stdoutLines: 20
});
snapCmd.input(source + config().snapProfile)
.outputFormat('mjpeg')
.frames(1)
.stream(snapBuff, {
end: true
});
// Reject on Error. // Reject on Error.
snapCmd.once('error', () => reject('An error occured while taking the snapshot')); snapCmd.once('error', () => reject('An error occured while taking the snapshot'));
// Send data on Finish. // Send data on Finish.
snapBuff.once('finnish', () => { snapCmd.once('end', () => {
try { try {
resolve(snapBuff.memStore.toString('base64')); resolve(snapBuff.memStore.toString('base64'));
} catch (e) { } catch (e) {
@ -253,7 +252,7 @@ Commander.prototype.takeSnapshot = function() {
} }
}); });
snapBuff.run(); snapCmd.run();
}).then(snap => { }).then(snap => {
{ {
dispatch(setSnapshotTaken()); dispatch(setSnapshotTaken());
@ -272,8 +271,9 @@ Commander.prototype.takeSnapshot = function() {
* @returns { Object } The fluent-ffmpeg command object. * @returns { Object } The fluent-ffmpeg command object.
*/ */
Commander.prototype.createCommands = function() { Commander.prototype.createCommands = function() {
// Clear inputs cmd = ffmpeg({
cmd._inputs = []; stdoutLines: 20
});
// TODO: Multi Protocol // TODO: Multi Protocol
source = 'rtsp://' + config().camIP + ':' + config().camPort + '/'; source = 'rtsp://' + config().camIP + ':' + config().camPort + '/';
@ -296,11 +296,21 @@ Commander.prototype.createCommands = function() {
// Output Options. // Output Options.
cmd.outputFormat('flv') cmd.outputFormat('flv')
.outputOptions(['-bufsize 50000k', '-tune film']) .outputOptions(['-bufsize 50000k', '-tune film'])
.output('rtmp://a.rtmp.youtube.com/live2/' + config().key); .output('rtmp://' + config()['stream-server'] + '/' + config().key);
// Clear inputs. // Register events.
snapCmd._inputs = []; cmd.on('error', (error, stdout, stderr) => crashed({
error,
stdout,
stderr
}));
// Can posibly be an error.
cmd.on('end', (stdout, stderr) => crashed({
stdout,
stderr
}));
//snapCmd._outputs = [];
// Snap Profile. // Snap Profile.
/*snapCmd.input(source + config().snapProfile) /*snapCmd.input(source + config().snapProfile)
.outputFormat('mjpeg') .outputFormat('mjpeg')
@ -327,39 +337,53 @@ Commander.prototype.createCommands = function() {
* @param { String } stdout * @param { String } stdout
* @param { String } stderr * @param { String } stderr
*/ */
function crashed(error, stdout, stderr) { function crashed({
error,
stdout,
stderr
}) {
let errorCode, handler; let errorCode, handler;
// We stopped... // Finished
if (getState().stream.running === 'STOPPED' || getState().stream.running === 'STOPPING') if (!error) {
return; dispatch(setStopped());
} else {
// We stopped...
if (getState().stream.running === 'STOPPED' || getState().stream.running === 'STOPPING')
return;
// Can't connect to the Camera // Can't connect to the Camera
if (error.message.indexOf(source) > -1) { if (error.message.indexOf(source) > -1) {
errorCode = 0; errorCode = 0;
handler = errorHandling.handlers.tryReconnect(config().camIP, config().camPort, handler = errorHandling.handlers.tryReconnect(config().camIP, config().camPort,
() => dispatch(self.start())); () => dispatch(self.start()).catch(() => {}));
} }
// Can't connect to the Internet / YouTube // Invalid Stream Key etc...
else if (error.message.indexOf(source + 'Input/output error') > -1 || error.message.indexOf('rtmp://a.rtmp.youtube.com/live2/' + config().key) > -1) { else if (error.message.indexOf('Operation not permitted') > -1 || (error.message.indexOf('Input/output') > -1 && error.message.indexOf('rtmp://' + config()['stream-server'] + '/' + config().key) > -1)) {
errorCode = 1; errorCode = 4;
}
handler = errorHandling.handlers.tryReconnect('a.rtmp.youtube.com/live2/', 1935, // Can't connect to the Internet / YouTube
() => dispatch(self.start())); else if (error.message.indexOf(source + ' Input/output error') > -1 || error.message.indexOf('rtmp://' + config()['stream-server'] + '/' + config().key) > -1) {
} errorCode = 1;
// Wrong FFMPEG Executable handler = errorHandling.handlers.tryReconnect(config()['stream-server'], 1935,
else if (error.message.indexOf('spawn') > -1 || error.message.indexOf('niceness') > -1) () => dispatch(self.start()).catch(() => {})); // TODO: Better Solution
errorCode = 2; }
// Stopped by us - SIGINT Shouldn't lead to a crash. // Wrong FFMPEG Executable
else if (error.message.indexOf('SIGINT') > -1 || error.message.indexOf('SIGKILL') > -1) { else if (error.message.indexOf('spawn') > -1 || error.message.indexOf('niceness') > -1)
return; errorCode = 2;
// Stopped by us - SIGINT Shouldn't lead to a crash.
else if (error.message.indexOf('SIGINT') > -1 || error.message.indexOf('SIGKILL') > -1) {
return;
}
} }
// Some unknown Problem, just try to restart. // Some unknown Problem, just try to restart.
else { if (!errorCode && errorCode !== 0) {
errorCode = 3; errorCode = 3;
// Just restart in a Second. // Just restart in a Second.
@ -387,16 +411,18 @@ Commander.middleware = store => next => action => {
let result = next(action); let result = next(action);
// If sth. has changed, we restart. // If sth. has changed, we restart.
if (self && getState().stream.running === 'RUNNING' if (self && (getState().stream.error !== false || getState().stream.running === 'RUNNING') &&
&& action.type === UPDATE_CONFIG) { action.type === UPDATE_CONFIG) {
if (action.data.key || if (action.data.key ||
action.data.ffmpegPath || action.data.ffmpegPath ||
action.data.customOutputOptions || action.data.customOutputOptions ||
action.data.customVideoOptions || action.data.customVideoOptions ||
action.data.customAudioOptions || action.data.customAudioOptions ||
action.data.camIP || action.data.camIP ||
action.data.camPort) { action.data.camPort ||
dispatch(self.restart()).catch(()=>{}); //TODO: error Handling action.data['stream-server'] ||
action.key) {
dispatch(self.restart()).catch(() => {}); //TODO: error Handling
} }
} }
return result; return result;

View file

@ -138,6 +138,17 @@ Communicator.prototype.sendAction = function(action) {
let type, change, state = getState(); let type, change, state = getState();
let convertError = () => {
return state.stream.error === false ? -1 : state.stream.error;
};
let convertRunning = () => {
if(state.stream.error !== false)
return 2;
return state.stream.running == 'RUNNING' ? 0 : 1; // TODO: CD
};
// All Legacy Stuff // All Legacy Stuff
switch (action.type) { switch (action.type) {
case SET_STARTED: case SET_STARTED:
@ -145,8 +156,8 @@ Communicator.prototype.sendAction = function(action) {
case SET_ERROR_RESOLVED: case SET_ERROR_RESOLVED:
type = 'startStop'; type = 'startStop';
change = { change = {
running: state.stream.running == 'RUNNING' ? 0 : 1, // TODO: CD running: convertRunning(),
error: state.stream.error || -1 error: convertError()
}; };
break; break;
@ -175,11 +186,14 @@ Communicator.prototype.sendAction = function(action) {
case HYDRATE: case HYDRATE:
socket.emit('meta', { socket.emit('meta', {
running: state.stream.running == 'RUNNING' ? 0 : 1, running: convertRunning(),
error: state.stream.error || -1, error: convertError(),
name: state.name, name: state.name,
config: state.config, config: state.config,
haveSettings: true // LEGACY haveSettings: true, // LEGACY
ssh: {
port: state.ssh.sshForwardPort
}
}); });
return; return;
@ -359,7 +373,7 @@ function handleCommand(command, callback) {
dispatch(SSHMan.restartTunnels()).then(msg => answerSuccess(msg)).catch(err => answerError(err)); dispatch(SSHMan.restartTunnels()).then(msg => answerSuccess(msg)).catch(err => answerError(err));
break; break;
case commands.SNAPSHOT: case commands.SNAPSHOT:
dispatch(commander.takeSnapshot()).then(snap => self.sendSnapshot(snap)) dispatch(commander.takeSnapshot()).then(snap => self.sendSnapshot(snap, command.sender))
.catch(error => self.sendMessage('Snapshot Failed', 'error', error, command.sender)); .catch(error => self.sendMessage('Snapshot Failed', 'error', error, command.sender));
break; break;
case commands.GET_LOGS: case commands.GET_LOGS:

View file

@ -49,13 +49,14 @@ module.exports.stopHandling = function() {
let stopper, let stopper,
handler = handlers[getState().stream.handleError]; handler = handlers[getState().stream.handleError];
dispatch(stopErrorHandling());
if (!getState().stream.handleError) if (!getState().stream.handleError)
return Promise.resolve(); return Promise.resolve();
if (!handler) if (!handler)
return Promise.resolve(); return Promise.resolve();
dispatch(stopErrorHandling());
stopper = handler.stop(); stopper = handler.stop();
if (!stopper.then) if (!stopper.then)

View file

@ -30,7 +30,7 @@ const customLevels = {
}; };
// Error Messages // Error Messages
const errors = ['Camera Disconnected', 'YoutTube Disconnected', 'Wrong ffmpeg executable.', 'Unknown error.']; const errors = ['Camera Disconnected', 'Stream-Server Disconnected', 'Wrong ffmpeg executable.', 'Unknown error.', 'Invalid Stream Key or the stream server doesn\'t accept connections.'];
const { const {
UPDATE_CONFIG, UPDATE_CONFIG,

View file

@ -41,8 +41,8 @@ function error(state = false, action) {
case SET_ERROR: case SET_ERROR:
return action.data; return action.data;
case SET_STARTED: case SET_STARTED:
case SET_STOPPED: case SET_STOPPED:
case SET_ERROR_RESOLVED: case SET_ERROR_RESOLVED:
return false; return false;
default: default:
return state; return state;