diff --git a/lib/const.go b/lib/const.go index 11fe3d6..4a390f3 100644 --- a/lib/const.go +++ b/lib/const.go @@ -4,8 +4,8 @@ const ( GRPCMaxRecvMsgSize = 10 * 1024 * 1024 GRPCMaxSendMsgSize = 10 * 1024 * 1024 - INDIServerMaxRecvMsgSize = GRPCMaxRecvMsgSize - INDIServerMaxSendMsgSize = GRPCMaxSendMsgSize + INDIServerMaxRecvMsgSize = 49152 + INDIServerMaxSendMsgSize = 2048 ModeSolo = "solo" ModeShare = "share" diff --git a/proxy/proxy.go b/proxy/proxy.go index f141768..ed3d3e8 100644 --- a/proxy/proxy.go +++ b/proxy/proxy.go @@ -12,6 +12,8 @@ import ( "github.com/indihub-space/agent/proto/indihub" ) +const queueSize = 4096 + type INDIHubTunnel interface { Send(*indihub.Response) error Recv() (*indihub.Request, error) @@ -27,6 +29,8 @@ type TcpProxy struct { connMap map[uint32]net.Conn filter *hostutils.INDIFilter + + respPool *sync.Pool } type PublicServerAddr struct { @@ -41,6 +45,13 @@ func New(name string, addr string, tunnel INDIHubTunnel, filter *hostutils.INDIF Tunnel: tunnel, connMap: map[uint32]net.Conn{}, filter: filter, + respPool: &sync.Pool{ + New: func() interface{} { + return &indihub.Response{ + Data: make([]byte, lib.INDIServerMaxRecvMsgSize), + } + }, + }, } } @@ -100,6 +111,12 @@ func (p *TcpProxy) close(cNum uint32) { func (p *TcpProxy) Start(pubAddrChan chan PublicServerAddr, sessionID uint64, sessionToken string) { wg := sync.WaitGroup{} + + // run response sending queue + respCh := make(chan *indihub.Response, queueSize) + defer close(respCh) + go p.sendResponses(respCh) + addrReceived := false xmlFlattener := map[uint32]*lib.XmlFlattener{} for { @@ -156,7 +173,7 @@ func (p *TcpProxy) Start(pubAddrChan chan PublicServerAddr, sessionID uint64, se if isNewConn { // INDI Server responses wg.Add(1) - go func(conn net.Conn, cNum uint32, sessID uint64, sessToken string) { + go func(conn net.Conn, cNum uint32, sessID uint64, sessToken string, ch chan *indihub.Response) { defer wg.Done() readBuf := make([]byte, lib.INDIServerMaxRecvMsgSize) for { @@ -176,19 +193,16 @@ func (p *TcpProxy) Start(pubAddrChan chan PublicServerAddr, sessionID uint64, se return } - // send request to tunnel - resp := &indihub.Response{ - Data: readBuf[:n], - Conn: cNum, - SessionID: sessID, - SessionToken: sessToken, - } - if err := p.Tunnel.Send(resp); err != nil { - log.Printf("Failed to send a response to %s tunnel: %v", p.Name, err) - return - } + // send response to tunnel + resp := p.respPool.Get().(*indihub.Response) + resp.Conn = cNum + resp.SessionToken = sessionToken + resp.SessionID = sessionID + resp.Data = resp.Data[:n] + copy(resp.Data, readBuf[:n]) + ch <- resp } - }(c, in.Conn, sessionID, sessionToken) + }(c, in.Conn, sessionID, sessionToken, respCh) } if len(xmlCommands) == 0 { @@ -217,3 +231,12 @@ func (p *TcpProxy) Start(pubAddrChan chan PublicServerAddr, sessionID uint64, se } wg.Wait() } + +func (p *TcpProxy) sendResponses(respCh chan *indihub.Response) { + for resp := range respCh { + if err := p.Tunnel.Send(resp); err != nil { + log.Printf("Failed to send a response to %s tunnel: %v", p.Name, err) + } + p.respPool.Put(resp) + } +} diff --git a/solo/solo.go b/solo/solo.go index af31b96..7de6ccc 100644 --- a/solo/solo.go +++ b/solo/solo.go @@ -3,6 +3,7 @@ package solo import ( "bytes" "fmt" + "io" "log" "net" "sync" @@ -14,10 +15,12 @@ import ( "github.com/indihub-space/agent/proto/indihub" ) +const queueSize = 4096 + var ( getProperties = []byte("") getCCDProperties = "" - enableBLOBNever = "Also" + enableBLOBNever = "Never" enableBLOBOnly = "Only" setBLOBVector = []byte("