fixed bumber of issues in transport for share and solo modes

This commit is contained in:
dencoded 2020-04-08 00:10:30 -04:00
parent e081abaf9b
commit 9f6d72843e
3 changed files with 166 additions and 57 deletions

View file

@ -4,8 +4,8 @@ const (
GRPCMaxRecvMsgSize = 10 * 1024 * 1024 GRPCMaxRecvMsgSize = 10 * 1024 * 1024
GRPCMaxSendMsgSize = 10 * 1024 * 1024 GRPCMaxSendMsgSize = 10 * 1024 * 1024
INDIServerMaxRecvMsgSize = GRPCMaxRecvMsgSize INDIServerMaxRecvMsgSize = 49152
INDIServerMaxSendMsgSize = GRPCMaxSendMsgSize INDIServerMaxSendMsgSize = 2048
ModeSolo = "solo" ModeSolo = "solo"
ModeShare = "share" ModeShare = "share"

View file

@ -12,6 +12,8 @@ import (
"github.com/indihub-space/agent/proto/indihub" "github.com/indihub-space/agent/proto/indihub"
) )
const queueSize = 4096
type INDIHubTunnel interface { type INDIHubTunnel interface {
Send(*indihub.Response) error Send(*indihub.Response) error
Recv() (*indihub.Request, error) Recv() (*indihub.Request, error)
@ -27,6 +29,8 @@ type TcpProxy struct {
connMap map[uint32]net.Conn connMap map[uint32]net.Conn
filter *hostutils.INDIFilter filter *hostutils.INDIFilter
respPool *sync.Pool
} }
type PublicServerAddr struct { type PublicServerAddr struct {
@ -41,6 +45,13 @@ func New(name string, addr string, tunnel INDIHubTunnel, filter *hostutils.INDIF
Tunnel: tunnel, Tunnel: tunnel,
connMap: map[uint32]net.Conn{}, connMap: map[uint32]net.Conn{},
filter: filter, filter: filter,
respPool: &sync.Pool{
New: func() interface{} {
return &indihub.Response{
Data: make([]byte, lib.INDIServerMaxRecvMsgSize),
}
},
},
} }
} }
@ -100,6 +111,12 @@ func (p *TcpProxy) close(cNum uint32) {
func (p *TcpProxy) Start(pubAddrChan chan PublicServerAddr, sessionID uint64, sessionToken string) { func (p *TcpProxy) Start(pubAddrChan chan PublicServerAddr, sessionID uint64, sessionToken string) {
wg := sync.WaitGroup{} wg := sync.WaitGroup{}
// run response sending queue
respCh := make(chan *indihub.Response, queueSize)
defer close(respCh)
go p.sendResponses(respCh)
addrReceived := false addrReceived := false
xmlFlattener := map[uint32]*lib.XmlFlattener{} xmlFlattener := map[uint32]*lib.XmlFlattener{}
for { for {
@ -156,7 +173,7 @@ func (p *TcpProxy) Start(pubAddrChan chan PublicServerAddr, sessionID uint64, se
if isNewConn { if isNewConn {
// INDI Server responses // INDI Server responses
wg.Add(1) wg.Add(1)
go func(conn net.Conn, cNum uint32, sessID uint64, sessToken string) { go func(conn net.Conn, cNum uint32, sessID uint64, sessToken string, ch chan *indihub.Response) {
defer wg.Done() defer wg.Done()
readBuf := make([]byte, lib.INDIServerMaxRecvMsgSize) readBuf := make([]byte, lib.INDIServerMaxRecvMsgSize)
for { for {
@ -176,19 +193,16 @@ func (p *TcpProxy) Start(pubAddrChan chan PublicServerAddr, sessionID uint64, se
return return
} }
// send request to tunnel // send response to tunnel
resp := &indihub.Response{ resp := p.respPool.Get().(*indihub.Response)
Data: readBuf[:n], resp.Conn = cNum
Conn: cNum, resp.SessionToken = sessionToken
SessionID: sessID, resp.SessionID = sessionID
SessionToken: sessToken, resp.Data = resp.Data[:n]
copy(resp.Data, readBuf[:n])
ch <- resp
} }
if err := p.Tunnel.Send(resp); err != nil { }(c, in.Conn, sessionID, sessionToken, respCh)
log.Printf("Failed to send a response to %s tunnel: %v", p.Name, err)
return
}
}
}(c, in.Conn, sessionID, sessionToken)
} }
if len(xmlCommands) == 0 { if len(xmlCommands) == 0 {
@ -217,3 +231,12 @@ func (p *TcpProxy) Start(pubAddrChan chan PublicServerAddr, sessionID uint64, se
} }
wg.Wait() wg.Wait()
} }
func (p *TcpProxy) sendResponses(respCh chan *indihub.Response) {
for resp := range respCh {
if err := p.Tunnel.Send(resp); err != nil {
log.Printf("Failed to send a response to %s tunnel: %v", p.Name, err)
}
p.respPool.Put(resp)
}
}

View file

@ -3,6 +3,7 @@ package solo
import ( import (
"bytes" "bytes"
"fmt" "fmt"
"io"
"log" "log"
"net" "net"
"sync" "sync"
@ -14,10 +15,12 @@ import (
"github.com/indihub-space/agent/proto/indihub" "github.com/indihub-space/agent/proto/indihub"
) )
const queueSize = 4096
var ( var (
getProperties = []byte("<getProperties version='1.7'/>") getProperties = []byte("<getProperties version='1.7'/>")
getCCDProperties = "<getProperties device='%s' version='1.7'/>" getCCDProperties = "<getProperties device='%s' version='1.7'/>"
enableBLOBNever = "<enableBLOB device='%s'>Also</enableBLOB>" enableBLOBNever = "<enableBLOB device='%s'>Never</enableBLOB>"
enableBLOBOnly = "<enableBLOB device='%s'>Only</enableBLOB>" enableBLOBOnly = "<enableBLOB device='%s'>Only</enableBLOB>"
setBLOBVector = []byte("<setBLOBVector") setBLOBVector = []byte("<setBLOBVector")
@ -40,6 +43,8 @@ type Agent struct {
sessionID uint64 sessionID uint64
sessionToken string sessionToken string
respPool *sync.Pool
} }
func New(indiServerAddr string, tunnel INDIHubSoloTunnel) *Agent { func New(indiServerAddr string, tunnel INDIHubSoloTunnel) *Agent {
@ -47,6 +52,13 @@ func New(indiServerAddr string, tunnel INDIHubSoloTunnel) *Agent {
indiServerAddr: indiServerAddr, indiServerAddr: indiServerAddr,
tunnel: tunnel, tunnel: tunnel,
ccdConnMap: make(map[string]net.Conn), ccdConnMap: make(map[string]net.Conn),
respPool: &sync.Pool{
New: func() interface{} {
return &indihub.Response{
Data: make([]byte, lib.INDIServerMaxRecvMsgSize),
}
},
},
} }
} }
@ -56,21 +68,20 @@ func (p *Agent) Start(sessionID uint64, sessionToken string) error {
// open connection to real INDI-server // open connection to real INDI-server
var err error var err error
p.indiConn, err = net.Dial("tcp", p.indiServerAddr) p.indiConn, err = p.connectToINDI()
if err != nil { if err != nil {
log.Printf("could not connect to INDI-server in solo-mode: %s\n", err) log.Printf("could not connect to INDI-server in solo-mode: %s\n", err)
return err return err
} }
defer p.indiConn.Close() defer p.indiConn.Close()
// set connection to receive data // run response sending queue
if _, err := p.indiConn.Write(getProperties); err != nil { respCh := make(chan *indihub.Response, queueSize)
log.Printf("could not write to INDI-server in solo-mode: %s\n", err) defer close(respCh)
return err go p.sendResponses(respCh)
}
// listen INDI-server for data // listen INDI-server for data
buf := make([]byte, lib.INDIServerMaxSendMsgSize, lib.INDIServerMaxSendMsgSize) buf := make([]byte, lib.INDIServerMaxSendMsgSize)
xmlFlattener := lib.NewXmlFlattener() xmlFlattener := lib.NewXmlFlattener()
wg := sync.WaitGroup{} wg := sync.WaitGroup{}
var connNum uint32 var connNum uint32
@ -80,6 +91,15 @@ func (p *Agent) Start(sessionID uint64, sessionToken string) error {
} }
n, err := p.indiConn.Read(buf) n, err := p.indiConn.Read(buf)
if err == io.EOF {
// reconnect
if p.indiConn, err = p.connectToINDI(); err != nil {
log.Printf("Failed to re-connect to INDI-server is solo mode: %s", err)
break
} else {
n, err = p.indiConn.Read(buf)
}
}
if err != nil { if err != nil {
log.Println("could not read from INDI-server in solo-mode:", err) log.Println("could not read from INDI-server in solo-mode:", err)
break break
@ -115,10 +135,10 @@ func (p *Agent) Start(sessionID uint64, sessionToken string) error {
// launch Go-routine with connection per CCD and only BLOB enabled // launch Go-routine with connection per CCD and only BLOB enabled
connNum++ connNum++
wg.Add(1) wg.Add(1)
go func(ccdName string, cNum uint32) { go func(ccdName string, cNum uint32, ch chan *indihub.Response) {
defer wg.Done() defer wg.Done()
p.readFromCCD(ccdName, cNum) p.readFromCCD(ccdName, cNum, ch)
}(deviceStr, connNum) }(deviceStr, connNum, respCh)
} }
} }
} }
@ -147,6 +167,45 @@ func (p *Agent) Start(sessionID uint64, sessionToken string) error {
return nil return nil
} }
func (p *Agent) connectToINDI() (net.Conn, error) {
log.Println("Connecting to INDI-server in solo mode...")
conn, err := net.Dial("tcp", p.indiServerAddr)
if err != nil {
log.Printf("could not connect to INDI-server in solo-mode: %s\n", err)
return nil, err
}
// set connection to receive data
if _, err := conn.Write(getProperties); err != nil {
log.Printf("could not write to INDI-server in solo-mode: %s\n", err)
conn.Close()
return nil, err
}
// disable BLOBs for CCDs if any
names := p.getCurrCCD()
for _, ccdName := range names {
if _, err := conn.Write([]byte(fmt.Sprintf(enableBLOBNever, ccdName))); err != nil {
log.Printf("could not write to INDI-server in solo-mode: %s\n", err)
}
}
log.Println("...OK")
return conn, nil
}
func (p *Agent) getCurrCCD() []string {
p.ccdConnMapMu.Lock()
defer p.ccdConnMapMu.Unlock()
names := []string{}
for ccdName := range p.ccdConnMap {
names = append(names, ccdName)
}
return names
}
func (p *Agent) getConnCCD(ccdName string) net.Conn { func (p *Agent) getConnCCD(ccdName string) net.Conn {
p.ccdConnMapMu.Lock() p.ccdConnMapMu.Lock()
defer p.ccdConnMapMu.Unlock() defer p.ccdConnMapMu.Unlock()
@ -159,59 +218,86 @@ func (p *Agent) setConnCCD(ccdName string, conn net.Conn) {
p.ccdConnMap[ccdName] = conn p.ccdConnMap[ccdName] = conn
} }
func (p *Agent) readFromCCD(ccdName string, cNum uint32) { func (p *Agent) readFromCCD(ccdName string, cNum uint32, ch chan *indihub.Response) {
// open connection // open connection
log.Println("Connecting to INDI-device:", ccdName) conn, err := p.connectToCCD(ccdName)
conn, err := net.Dial("tcp", p.indiServerAddr)
if err != nil { if err != nil {
log.Printf("could not connect to INDI-server for CCD '%s' in solo-mode: %s\n",
ccdName, err)
return return
} }
defer conn.Close() defer conn.Close()
log.Println("...OK")
p.setConnCCD(ccdName, conn)
// set connection to receive data
if _, err := conn.Write([]byte(fmt.Sprintf(getCCDProperties, ccdName))); err != nil {
log.Printf("getProperties: could not write to INDI-server for %s in solo-mode: %s\n", ccdName, err)
return
}
// enable BLOBS only
if _, err := conn.Write([]byte(fmt.Sprintf(enableBLOBOnly, ccdName))); err != nil {
log.Printf("getProperties: could not write to INDI-server for %s in solo-mode: %s\n", ccdName, err)
return
}
// read data from INDI-server and send to tunnel // read data from INDI-server and send to tunnel
buf := make([]byte, lib.INDIServerMaxSendMsgSize, lib.INDIServerMaxSendMsgSize) buf := make([]byte, lib.INDIServerMaxSendMsgSize)
resp := &indihub.Response{
Conn: cNum,
SessionID: p.sessionID,
SessionToken: p.sessionToken,
}
for { for {
if p.shouldExit { if p.shouldExit {
break break
} }
n, err := conn.Read(buf) n, err := conn.Read(buf)
if err == io.EOF {
// reconnect
if conn, err = p.connectToCCD(ccdName); err != nil {
log.Printf("Failed to re-connect to INDI-server for %s is solo mode: %s\n", ccdName, err)
break
} else {
n, err = conn.Read(buf)
}
}
if err != nil { if err != nil {
log.Printf("could not read from INDI-server for %s in solo-mode: %s\n", ccdName, err) log.Printf("could not read from INDI-server for %s in solo-mode: %s\n", ccdName, err)
break break
} }
// send data to tunnel // send data to tunnel
resp.Data = buf[:n] resp := p.respPool.Get().(*indihub.Response)
resp.Conn = cNum
resp.SessionToken = p.sessionToken
resp.SessionID = p.sessionID
resp.Data = resp.Data[:n]
copy(resp.Data, buf[:n])
ch <- resp
}
}
func (p *Agent) sendResponses(respCh chan *indihub.Response) {
for resp := range respCh {
if err := p.tunnel.Send(resp); err != nil { if err := p.tunnel.Send(resp); err != nil {
log.Printf("could not send to tunnel for %s in solo-mode: %s\n", ccdName, err) log.Printf("Failed to send a response to tunnel in solo-mode: %s", err)
break
} }
p.respPool.Put(resp)
} }
} }
func (p *Agent) connectToCCD(ccdName string) (net.Conn, error) {
// open connection
log.Println("Connecting to INDI-device:", ccdName)
conn, err := net.Dial("tcp", p.indiServerAddr)
if err != nil {
log.Printf("could not connect to INDI-server for CCD '%s' in solo-mode: %s\n",
ccdName, err)
return nil, err
}
log.Println("...OK")
// set connection to receive data
if _, err := conn.Write([]byte(fmt.Sprintf(getCCDProperties, ccdName))); err != nil {
log.Printf("getProperties: could not write to INDI-server for %s in solo-mode: %s\n", ccdName, err)
conn.Close()
return nil, err
}
// enable BLOBS only
if _, err := conn.Write([]byte(fmt.Sprintf(enableBLOBOnly, ccdName))); err != nil {
log.Printf("getProperties: could not write to INDI-server for %s in solo-mode: %s\n", ccdName, err)
conn.Close()
return nil, err
}
p.setConnCCD(ccdName, conn)
return conn, nil
}
func (p *Agent) Close() { func (p *Agent) Close() {
// close connections to CCDs // close connections to CCDs
p.ccdConnMapMu.Lock() p.ccdConnMapMu.Lock()