mirror of
https://github.com/vale981/agent
synced 2025-03-05 09:31:39 -05:00
Merge pull request #6 from indihub-space/stream-images-in-solo
streaming for solo, INDI messages buffer changed to 10Mb
This commit is contained in:
commit
19b20b5736
3 changed files with 108 additions and 31 deletions
|
@ -4,8 +4,8 @@ const (
|
||||||
GRPCMaxRecvMsgSize = 10 * 1024 * 1024
|
GRPCMaxRecvMsgSize = 10 * 1024 * 1024
|
||||||
GRPCMaxSendMsgSize = 10 * 1024 * 1024
|
GRPCMaxSendMsgSize = 10 * 1024 * 1024
|
||||||
|
|
||||||
INDIServerMaxRecvMsgSize = 49152
|
INDIServerMaxRecvMsgSize = GRPCMaxRecvMsgSize
|
||||||
INDIServerMaxSendMsgSize = 2048
|
INDIServerMaxSendMsgSize = GRPCMaxSendMsgSize
|
||||||
|
|
||||||
ModeSolo = "solo"
|
ModeSolo = "solo"
|
||||||
ModeShare = "share"
|
ModeShare = "share"
|
||||||
|
|
|
@ -3,6 +3,7 @@ package solo
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"log"
|
"log"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/indihub-space/agent/proto/indihub"
|
"github.com/indihub-space/agent/proto/indihub"
|
||||||
)
|
)
|
||||||
|
@ -58,6 +59,7 @@ func (s *Mode) Start() {
|
||||||
func (s *Mode) Stop() {
|
func (s *Mode) Stop() {
|
||||||
s.status = "stopped"
|
s.status = "stopped"
|
||||||
s.stopCh <- struct{}{}
|
s.stopCh <- struct{}{}
|
||||||
|
time.Sleep(3 * time.Second) // give some time to get and display solo-session summary
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Mode) GetStatus() map[string]interface{} {
|
func (s *Mode) GetStatus() map[string]interface{} {
|
||||||
|
|
133
solo/solo.go
133
solo/solo.go
|
@ -5,9 +5,9 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"log"
|
"log"
|
||||||
"net"
|
"net"
|
||||||
|
"sync"
|
||||||
|
|
||||||
"github.com/clbanning/mxj"
|
"github.com/clbanning/mxj"
|
||||||
|
|
||||||
"github.com/fatih/color"
|
"github.com/fatih/color"
|
||||||
|
|
||||||
"github.com/indihub-space/agent/lib"
|
"github.com/indihub-space/agent/lib"
|
||||||
|
@ -15,8 +15,10 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
getProperties = []byte("<getProperties version='1.7'/>")
|
getProperties = []byte("<getProperties version='1.7'/>")
|
||||||
enableBLOB = "<enableBLOB device='%s'>Also</enableBLOB>"
|
getCCDProperties = "<getProperties device='%s' version='1.7'/>"
|
||||||
|
enableBLOBNever = "<enableBLOB device='%s'>Also</enableBLOB>"
|
||||||
|
enableBLOBOnly = "<enableBLOB device='%s'>Only</enableBLOB>"
|
||||||
|
|
||||||
setBLOBVector = []byte("<setBLOBVector")
|
setBLOBVector = []byte("<setBLOBVector")
|
||||||
defNumberVector = []byte("<defNumberVector")
|
defNumberVector = []byte("<defNumberVector")
|
||||||
|
@ -32,16 +34,26 @@ type Agent struct {
|
||||||
indiConn net.Conn
|
indiConn net.Conn
|
||||||
tunnel INDIHubSoloTunnel
|
tunnel INDIHubSoloTunnel
|
||||||
shouldExit bool
|
shouldExit bool
|
||||||
|
|
||||||
|
ccdConnMap map[string]net.Conn
|
||||||
|
ccdConnMapMu sync.Mutex
|
||||||
|
|
||||||
|
sessionID uint64
|
||||||
|
sessionToken string
|
||||||
}
|
}
|
||||||
|
|
||||||
func New(indiServerAddr string, tunnel INDIHubSoloTunnel) *Agent {
|
func New(indiServerAddr string, tunnel INDIHubSoloTunnel) *Agent {
|
||||||
return &Agent{
|
return &Agent{
|
||||||
indiServerAddr: indiServerAddr,
|
indiServerAddr: indiServerAddr,
|
||||||
tunnel: tunnel,
|
tunnel: tunnel,
|
||||||
|
ccdConnMap: make(map[string]net.Conn),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *Agent) Start(sessionID uint64, sessionToken string) error {
|
func (p *Agent) Start(sessionID uint64, sessionToken string) error {
|
||||||
|
p.sessionID = sessionID
|
||||||
|
p.sessionToken = sessionToken
|
||||||
|
|
||||||
// open connection to real INDI-server
|
// open connection to real INDI-server
|
||||||
var err error
|
var err error
|
||||||
p.indiConn, err = net.Dial("tcp", p.indiServerAddr)
|
p.indiConn, err = net.Dial("tcp", p.indiServerAddr)
|
||||||
|
@ -60,7 +72,8 @@ func (p *Agent) Start(sessionID uint64, sessionToken string) error {
|
||||||
// listen INDI-server for data
|
// listen INDI-server for data
|
||||||
buf := make([]byte, lib.INDIServerMaxSendMsgSize, lib.INDIServerMaxSendMsgSize)
|
buf := make([]byte, lib.INDIServerMaxSendMsgSize, lib.INDIServerMaxSendMsgSize)
|
||||||
xmlFlattener := lib.NewXmlFlattener()
|
xmlFlattener := lib.NewXmlFlattener()
|
||||||
subscribedCCDs := map[string]bool{}
|
wg := sync.WaitGroup{}
|
||||||
|
var connNum uint32
|
||||||
for {
|
for {
|
||||||
if p.shouldExit {
|
if p.shouldExit {
|
||||||
break
|
break
|
||||||
|
@ -76,15 +89,6 @@ func (p *Agent) Start(sessionID uint64, sessionToken string) error {
|
||||||
xmlCommands := xmlFlattener.FeedChunk(buf[:n])
|
xmlCommands := xmlFlattener.FeedChunk(buf[:n])
|
||||||
|
|
||||||
for _, xmlCmd := range xmlCommands {
|
for _, xmlCmd := range xmlCommands {
|
||||||
// catch images
|
|
||||||
if bytes.HasPrefix(xmlCmd, setBLOBVector) {
|
|
||||||
err := p.sendImages(xmlCmd, 1, sessionID, sessionToken)
|
|
||||||
if err != nil {
|
|
||||||
log.Println("could not send image to INDIHUB in solo-mode:", err)
|
|
||||||
}
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
// subscribe to BLOBs from CCDs by catching defNumberVector property with name="CCD_EXPOSURE"
|
// subscribe to BLOBs from CCDs by catching defNumberVector property with name="CCD_EXPOSURE"
|
||||||
if !bytes.HasPrefix(xmlCmd, defNumberVector) {
|
if !bytes.HasPrefix(xmlCmd, defNumberVector) {
|
||||||
continue
|
continue
|
||||||
|
@ -102,18 +106,26 @@ func (p *Agent) Start(sessionID uint64, sessionToken string) error {
|
||||||
defNumberVectorVal := defNumberVectorMap.(map[string]interface{})
|
defNumberVectorVal := defNumberVectorMap.(map[string]interface{})
|
||||||
|
|
||||||
if nameStr, ok := defNumberVectorVal["attr_name"].(string); ok && nameStr == "CCD_EXPOSURE" {
|
if nameStr, ok := defNumberVectorVal["attr_name"].(string); ok && nameStr == "CCD_EXPOSURE" {
|
||||||
if deviceStr, ok := defNumberVectorVal["attr_device"].(string); ok && !subscribedCCDs[deviceStr] {
|
if deviceStr, ok := defNumberVectorVal["attr_device"].(string); ok && p.getConnCCD(deviceStr) == nil {
|
||||||
_, err := p.indiConn.Write([]byte(fmt.Sprintf(enableBLOB, deviceStr)))
|
// disable receiving BLOBs on main connection
|
||||||
if err != nil {
|
if _, err := p.indiConn.Write([]byte(fmt.Sprintf(enableBLOBNever, deviceStr))); err != nil {
|
||||||
log.Printf("could not write to INDI-server in solo-mode: %s\n", err)
|
log.Printf("could not write to INDI-server in solo-mode: %s\n", err)
|
||||||
}
|
}
|
||||||
subscribedCCDs[deviceStr] = true
|
|
||||||
break
|
// launch Go-routine with connection per CCD and only BLOB enabled
|
||||||
|
connNum++
|
||||||
|
wg.Add(1)
|
||||||
|
go func(ccdName string, cNum uint32) {
|
||||||
|
defer wg.Done()
|
||||||
|
p.readFromCCD(ccdName, cNum)
|
||||||
|
}(deviceStr, connNum)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
wg.Wait()
|
||||||
|
|
||||||
// close connections to tunnel
|
// close connections to tunnel
|
||||||
if summary, err := p.tunnel.CloseAndRecv(); err == nil {
|
if summary, err := p.tunnel.CloseAndRecv(); err == nil {
|
||||||
gc := color.New(color.FgGreen)
|
gc := color.New(color.FgGreen)
|
||||||
|
@ -135,18 +147,81 @@ func (p *Agent) Start(sessionID uint64, sessionToken string) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (p *Agent) getConnCCD(ccdName string) net.Conn {
|
||||||
|
p.ccdConnMapMu.Lock()
|
||||||
|
defer p.ccdConnMapMu.Unlock()
|
||||||
|
return p.ccdConnMap[ccdName]
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *Agent) setConnCCD(ccdName string, conn net.Conn) {
|
||||||
|
p.ccdConnMapMu.Lock()
|
||||||
|
defer p.ccdConnMapMu.Unlock()
|
||||||
|
p.ccdConnMap[ccdName] = conn
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *Agent) readFromCCD(ccdName string, cNum uint32) {
|
||||||
|
// open connection
|
||||||
|
log.Println("Connecting to INDI-device:", ccdName)
|
||||||
|
conn, err := net.Dial("tcp", p.indiServerAddr)
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("could not connect to INDI-server for CCD '%s' in solo-mode: %s\n",
|
||||||
|
ccdName, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
defer conn.Close()
|
||||||
|
log.Println("...OK")
|
||||||
|
|
||||||
|
p.setConnCCD(ccdName, conn)
|
||||||
|
|
||||||
|
// set connection to receive data
|
||||||
|
if _, err := conn.Write([]byte(fmt.Sprintf(getCCDProperties, ccdName))); err != nil {
|
||||||
|
log.Printf("getProperties: could not write to INDI-server for %s in solo-mode: %s\n", ccdName, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// enable BLOBS only
|
||||||
|
if _, err := conn.Write([]byte(fmt.Sprintf(enableBLOBOnly, ccdName))); err != nil {
|
||||||
|
log.Printf("getProperties: could not write to INDI-server for %s in solo-mode: %s\n", ccdName, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// read data from INDI-server and send to tunnel
|
||||||
|
buf := make([]byte, lib.INDIServerMaxSendMsgSize, lib.INDIServerMaxSendMsgSize)
|
||||||
|
resp := &indihub.Response{
|
||||||
|
Conn: cNum,
|
||||||
|
SessionID: p.sessionID,
|
||||||
|
SessionToken: p.sessionToken,
|
||||||
|
}
|
||||||
|
for {
|
||||||
|
if p.shouldExit {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
n, err := conn.Read(buf)
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("could not read from INDI-server for %s in solo-mode: %s\n", ccdName, err)
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
// send data to tunnel
|
||||||
|
resp.Data = buf[:n]
|
||||||
|
if err := p.tunnel.Send(resp); err != nil {
|
||||||
|
log.Printf("could not send to tunnel for %s in solo-mode: %s\n", ccdName, err)
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func (p *Agent) Close() {
|
func (p *Agent) Close() {
|
||||||
|
// close connections to CCDs
|
||||||
|
p.ccdConnMapMu.Lock()
|
||||||
|
defer p.ccdConnMapMu.Unlock()
|
||||||
|
for ccdName, conn := range p.ccdConnMap {
|
||||||
|
log.Println("Closing connection to:", ccdName)
|
||||||
|
conn.Close()
|
||||||
|
}
|
||||||
|
|
||||||
|
// close main connection
|
||||||
p.indiConn.Close()
|
p.indiConn.Close()
|
||||||
p.shouldExit = true
|
p.shouldExit = true
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *Agent) sendImages(imagesData []byte, cNum uint32, sessID uint64, sessToken string) error {
|
|
||||||
resp := &indihub.Response{
|
|
||||||
Data: imagesData,
|
|
||||||
Conn: cNum,
|
|
||||||
SessionID: sessID,
|
|
||||||
SessionToken: sessToken,
|
|
||||||
}
|
|
||||||
|
|
||||||
return p.tunnel.Send(resp)
|
|
||||||
}
|
|
||||||
|
|
Loading…
Add table
Reference in a new issue