streamin for solo, INDI messages buffer changed to 10Mb

This commit is contained in:
dencoded 2020-04-01 15:32:45 -04:00
parent 800eb33b3a
commit a93cdb71be
3 changed files with 108 additions and 31 deletions

View file

@ -4,8 +4,8 @@ const (
GRPCMaxRecvMsgSize = 10 * 1024 * 1024
GRPCMaxSendMsgSize = 10 * 1024 * 1024
INDIServerMaxRecvMsgSize = 49152
INDIServerMaxSendMsgSize = 2048
INDIServerMaxRecvMsgSize = GRPCMaxRecvMsgSize
INDIServerMaxSendMsgSize = GRPCMaxSendMsgSize
ModeSolo = "solo"
ModeShare = "share"

View file

@ -3,6 +3,7 @@ package solo
import (
"context"
"log"
"time"
"github.com/indihub-space/agent/proto/indihub"
)
@ -58,6 +59,7 @@ func (s *Mode) Start() {
func (s *Mode) Stop() {
s.status = "stopped"
s.stopCh <- struct{}{}
time.Sleep(3 * time.Second) // give some time to get and display solo-session summary
}
func (s *Mode) GetStatus() map[string]interface{} {

View file

@ -5,9 +5,9 @@ import (
"fmt"
"log"
"net"
"sync"
"github.com/clbanning/mxj"
"github.com/fatih/color"
"github.com/indihub-space/agent/lib"
@ -16,7 +16,9 @@ import (
var (
getProperties = []byte("<getProperties version='1.7'/>")
enableBLOB = "<enableBLOB device='%s'>Also</enableBLOB>"
getCCDProperties = "<getProperties device='%s' version='1.7'/>"
enableBLOBNever = "<enableBLOB device='%s'>Also</enableBLOB>"
enableBLOBOnly = "<enableBLOB device='%s'>Only</enableBLOB>"
setBLOBVector = []byte("<setBLOBVector")
defNumberVector = []byte("<defNumberVector")
@ -32,16 +34,26 @@ type Agent struct {
indiConn net.Conn
tunnel INDIHubSoloTunnel
shouldExit bool
ccdConnMap map[string]net.Conn
ccdConnMapMu sync.Mutex
sessionID uint64
sessionToken string
}
func New(indiServerAddr string, tunnel INDIHubSoloTunnel) *Agent {
return &Agent{
indiServerAddr: indiServerAddr,
tunnel: tunnel,
ccdConnMap: make(map[string]net.Conn),
}
}
func (p *Agent) Start(sessionID uint64, sessionToken string) error {
p.sessionID = sessionID
p.sessionToken = sessionToken
// open connection to real INDI-server
var err error
p.indiConn, err = net.Dial("tcp", p.indiServerAddr)
@ -60,7 +72,8 @@ func (p *Agent) Start(sessionID uint64, sessionToken string) error {
// listen INDI-server for data
buf := make([]byte, lib.INDIServerMaxSendMsgSize, lib.INDIServerMaxSendMsgSize)
xmlFlattener := lib.NewXmlFlattener()
subscribedCCDs := map[string]bool{}
wg := sync.WaitGroup{}
var connNum uint32
for {
if p.shouldExit {
break
@ -76,15 +89,6 @@ func (p *Agent) Start(sessionID uint64, sessionToken string) error {
xmlCommands := xmlFlattener.FeedChunk(buf[:n])
for _, xmlCmd := range xmlCommands {
// catch images
if bytes.HasPrefix(xmlCmd, setBLOBVector) {
err := p.sendImages(xmlCmd, 1, sessionID, sessionToken)
if err != nil {
log.Println("could not send image to INDIHUB in solo-mode:", err)
}
continue
}
// subscribe to BLOBs from CCDs by catching defNumberVector property with name="CCD_EXPOSURE"
if !bytes.HasPrefix(xmlCmd, defNumberVector) {
continue
@ -102,18 +106,26 @@ func (p *Agent) Start(sessionID uint64, sessionToken string) error {
defNumberVectorVal := defNumberVectorMap.(map[string]interface{})
if nameStr, ok := defNumberVectorVal["attr_name"].(string); ok && nameStr == "CCD_EXPOSURE" {
if deviceStr, ok := defNumberVectorVal["attr_device"].(string); ok && !subscribedCCDs[deviceStr] {
_, err := p.indiConn.Write([]byte(fmt.Sprintf(enableBLOB, deviceStr)))
if err != nil {
if deviceStr, ok := defNumberVectorVal["attr_device"].(string); ok && p.getConnCCD(deviceStr) == nil {
// disable receiving BLOBs on main connection
if _, err := p.indiConn.Write([]byte(fmt.Sprintf(enableBLOBNever, deviceStr))); err != nil {
log.Printf("could not write to INDI-server in solo-mode: %s\n", err)
}
subscribedCCDs[deviceStr] = true
break
// launch Go-routine with connection per CCD and only BLOB enabled
connNum++
wg.Add(1)
go func(ccdName string, cNum uint32) {
defer wg.Done()
p.readFromCCD(ccdName, cNum)
}(deviceStr, connNum)
}
}
}
}
wg.Wait()
// close connections to tunnel
if summary, err := p.tunnel.CloseAndRecv(); err == nil {
gc := color.New(color.FgGreen)
@ -135,18 +147,81 @@ func (p *Agent) Start(sessionID uint64, sessionToken string) error {
return nil
}
func (p *Agent) getConnCCD(ccdName string) net.Conn {
p.ccdConnMapMu.Lock()
defer p.ccdConnMapMu.Unlock()
return p.ccdConnMap[ccdName]
}
func (p *Agent) setConnCCD(ccdName string, conn net.Conn) {
p.ccdConnMapMu.Lock()
defer p.ccdConnMapMu.Unlock()
p.ccdConnMap[ccdName] = conn
}
func (p *Agent) readFromCCD(ccdName string, cNum uint32) {
// open connection
log.Println("Connecting to INDI-device:", ccdName)
conn, err := net.Dial("tcp", p.indiServerAddr)
if err != nil {
log.Printf("could not connect to INDI-server for CCD '%s' in solo-mode: %s\n",
ccdName, err)
return
}
defer conn.Close()
log.Println("...OK")
p.setConnCCD(ccdName, conn)
// set connection to receive data
if _, err := conn.Write([]byte(fmt.Sprintf(getCCDProperties, ccdName))); err != nil {
log.Printf("getProperties: could not write to INDI-server for %s in solo-mode: %s\n", ccdName, err)
return
}
// enable BLOBS only
if _, err := conn.Write([]byte(fmt.Sprintf(enableBLOBOnly, ccdName))); err != nil {
log.Printf("getProperties: could not write to INDI-server for %s in solo-mode: %s\n", ccdName, err)
return
}
// read data from INDI-server and send to tunnel
buf := make([]byte, lib.INDIServerMaxSendMsgSize, lib.INDIServerMaxSendMsgSize)
resp := &indihub.Response{
Conn: cNum,
SessionID: p.sessionID,
SessionToken: p.sessionToken,
}
for {
if p.shouldExit {
break
}
n, err := conn.Read(buf)
if err != nil {
log.Printf("could not read from INDI-server for %s in solo-mode: %s\n", ccdName, err)
break
}
// send data to tunnel
resp.Data = buf[:n]
if err := p.tunnel.Send(resp); err != nil {
log.Printf("could not send to tunnel for %s in solo-mode: %s\n", ccdName, err)
break
}
}
}
func (p *Agent) Close() {
// close connections to CCDs
p.ccdConnMapMu.Lock()
defer p.ccdConnMapMu.Unlock()
for ccdName, conn := range p.ccdConnMap {
log.Println("Closing connection to:", ccdName)
conn.Close()
}
// close main connection
p.indiConn.Close()
p.shouldExit = true
}
func (p *Agent) sendImages(imagesData []byte, cNum uint32, sessID uint64, sessToken string) error {
resp := &indihub.Response{
Data: imagesData,
Conn: cNum,
SessionID: sessID,
SessionToken: sessToken,
}
return p.tunnel.Send(resp)
}