From 50972f57372081195d7cfd9141276f4c440cd629 Mon Sep 17 00:00:00 2001 From: dencoded <33698537+dencoded@users.noreply.github.com> Date: Mon, 24 Feb 2020 23:40:55 -0500 Subject: [PATCH] open sourcing agent --- .gitignore | 4 + Makefile | 44 ++ README.md | 68 +++ broadcast/broadcast.go | 189 +++++++ config/config.go | 37 ++ go.mod | 14 + go.sum | 81 +++ hostutils/indifilter.go | 30 ++ lib/const.go | 9 + lib/type.go | 28 + lib/xml.go | 136 +++++ logutil/logutil.go | 20 + main.go | 578 +++++++++++++++++++++ manager/client.go | 124 +++++ proto/indihub/indihub.pb.go | 999 ++++++++++++++++++++++++++++++++++++ proxy/proxy.go | 218 ++++++++ solo/solo.go | 166 ++++++ version/version.go | 3 + websockets/server.go | 221 ++++++++ websockets/tls.go | 185 +++++++ 20 files changed, 3154 insertions(+) create mode 100644 .gitignore create mode 100644 Makefile create mode 100644 README.md create mode 100644 broadcast/broadcast.go create mode 100644 config/config.go create mode 100644 go.mod create mode 100644 go.sum create mode 100644 hostutils/indifilter.go create mode 100644 lib/const.go create mode 100644 lib/type.go create mode 100644 lib/xml.go create mode 100644 logutil/logutil.go create mode 100644 main.go create mode 100644 manager/client.go create mode 100644 proto/indihub/indihub.pb.go create mode 100644 proxy/proxy.go create mode 100644 solo/solo.go create mode 100644 version/version.go create mode 100644 websockets/server.go create mode 100644 websockets/tls.go diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..dfeb799 --- /dev/null +++ b/.gitignore @@ -0,0 +1,4 @@ +.idea +.indihub-agent +indihub.json +bin/ diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..3bf5d13 --- /dev/null +++ b/Makefile @@ -0,0 +1,44 @@ +build-macos64: + mkdir -p ./bin/indihub-agent-macos64 + GOOS=darwin GOARCH=amd64 go build -v -o ./bin/indihub-agent-macos64/indihub-agent ./ + +build-linux64: + mkdir -p ./bin/indihub-agent-linux64 + GOOS=linux GOARCH=amd64 go build -v -o ./bin/indihub-agent-linux64/indihub-agent ./ + +build-unix64: + mkdir -p ./bin/indihub-agent-unix64 + GOOS=freebsd GOARCH=amd64 go build -v -o ./bin/indihub-agent-unix64/indihub-agent ./ + +build-win64: + mkdir -p ./bin/indihub-agent-win64 + GOOS=windows GOARCH=amd64 go build -v -o ./bin/indihub-agent-win64/indihub-agent.exe ./ + +build-win32: + mkdir -p ./bin/indihub-agent-win32 + GOOS=windows GOARCH=386 go build -v -o ./bin/indihub-agent-win32/indihub-agent.exe ./ + +build-raspberrypi: + mkdir -p ./bin/indihub-agent-raspberrypi + GOOS=linux GOARCH=arm GOARM=5 go build -v -o ./bin/indihub-agent-raspberrypi/indihub-agent ./ + +build-all: build-macos64 build-linux64 build-unix64 build-win64 build-win32 build-raspberrypi + +release: build-all + zip -r ./bin/indihub-agent-macos64.zip ./bin/indihub-agent-macos64 + openssl dgst -sha256 ./bin/indihub-agent-macos64.zip > ./bin/indihub-agent-macos64.sha256 + + tar czf ./bin/indihub-agent-linux64.tar.gz ./bin/indihub-agent-linux64 + openssl dgst -sha256 ./bin/indihub-agent-linux64.tar.gz > ./bin/indihub-agent-linux64.sha256 + + tar czf ./bin/indihub-agent-unix64.tar.gz ./bin/indihub-agent-unix64 + openssl dgst -sha256 ./bin/indihub-agent-unix64.tar.gz > ./bin/indihub-agent-unix64.sha256 + + zip -r ./bin/indihub-agent-win64.zip ./bin/indihub-agent-win64 + openssl dgst -sha256 ./bin/indihub-agent-win64.zip > ./bin/indihub-agent-win64.sha256 + + zip -r ./bin/indihub-agent-win32.zip ./bin/indihub-agent-win32 + openssl dgst -sha256 ./bin/indihub-agent-win32.zip > ./bin/indihub-agent-win32.sha256 + + tar czf ./bin/indihub-agent-raspberrypi.tar.gz ./bin/indihub-agent-raspberrypi + openssl dgst -sha256 ./bin/indihub-agent-raspberrypi.tar.gz > ./bin/indihub-agent-raspberrypi.sha256 \ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 0000000..cf6308b --- /dev/null +++ b/README.md @@ -0,0 +1,68 @@ +# indihub-agent + +The `indihub-agent` is a command line interface tool to connect your astro-photography equipment to [INDIHUB](https://indihub.space) network. The network can be used to share you equipment with others, use others' equipment remotely or just use your equipment without sharing but still contribute your images to INDIHUB-network. + +NOTE: all astro-photos taken via INDIHUB-network (with auto-guiding or main imaging cameras) will be processed by INDIHUB cloud pipeline and used for scientific purposes. + +## Prerequisites + +0. You have a desire to contribute to Space exploration and sustainability projects. +1. You have motorized astro-photography equipment connected to your home network. +2. Your equipment is controlled by [IND-server](https://github.com/indilib/indi), manuals and docs can be found on [INDI-lib](http://indilib.org) Web-site. +3. INDI-server is controlled by [INDI Web Manager](https://github.com/knro/indiwebmanager). +4. You have ready to use INDI-profile created with INDI Web Manager. +5. Raspberry PI (or computer) where you run `indihub-agent` is connected to Internet so it can register your equipment on INDIHUB-network. + +## Registration on INDIHUB-network as a host + +Registration is very easy - you don't have to do anything. + +The `indihub-agent` doesn't require any signup or token to join INDIHUB. + +When you first run `indihub-agent` and it is connected to INDIHUB-network successfully - it receives token from network and saves in the same folder in the file `indihub.json`. Please keep this file there and don't loose it as it identifies you as a host on INDIHUB-network. Also, this file will be read and used automatically for all next runs of `indihub-agent`. + +## indihub-agent modes + +There are four modes available at the moment: + +1. `share` - open remote access to your equipment via INDIHUB-network of telescopes, so you can provide remote imaging sessions to your guests. +2. `solo` - use you equipment without opening remote access but equipment is still connected to INDIHUB-network and all images taken are contributed for scientific purposes. +3. `broadcast` - broadcast you imaging session to observers watching it via INDI-clients, in this case without any equipment remote access and sharing (experimental). +4. `robotic` - open remote access to your equipment to be controlled by scheduler running in INDIHUB-cloud (experimental). + +The mode is specified via `-mode` parameter, i.e. to run indihub-agent in a share-mode you will need run command: + +```bash +./indihub-agent -indi-profile=my-profile -mode=solo +``` + +The only mandatory parameter is `-indi-profile` where you specify profile name created with [INDI Web Manager](https://github.com/knro/indiwebmanager). All other parameters have default valyes. I.e. `-mode` default value is `solo`. + +To get usage of all parameters just run `indihub-agent -help`. + +The latest `indihub-agent` release can be downloaded from [releases](https://github.com/indihub-space/agent/releases) or [indihub.space](https://indihub.space) Web-site. + +## Building indihub-agent + +You will need to install [Golang](https://golang.org/dl/). + +There are `make` build-commands for different platforms available: + +- `make build-macos64` - build for macOS (64 bit) +- `make build-linux64` - build for Linux (64 bit) +- `make build-unix64` - build for Unix (64 bit) +- `make build-win64` - build for Windows (64 bit) +- `make build-win32` - build for Windows (32 bit) +- `make build-raspberrypi` - build for Raspberry Pi (ARM5) + +## Contributing + +PRs and issues are highly appreciated. TODO: `indihub-agent` speaks to the INDIHUB-cloud so special dev-server will be added for development purposes soon. + +## What is next + +The INDIHUB-network is in its beta release at the moment. We board new host on the network, collect data to most importantly - feedback from our first hosts. + +Also we work on partnerships. If you are interested please send us email at [info@indihub.space](mailto:info@indihub.space). + +And last but not least - don't forget to signup to our mailing list on [indihub.space](https://indihub.space) so you will know all the news first! \ No newline at end of file diff --git a/broadcast/broadcast.go b/broadcast/broadcast.go new file mode 100644 index 0000000..b40265e --- /dev/null +++ b/broadcast/broadcast.go @@ -0,0 +1,189 @@ +package broadcast + +import ( + "bytes" + "io" + "log" + "net" + "sync/atomic" + + "github.com/fatih/color" + + "github.com/indihub-space/agent/lib" + "github.com/indihub-space/agent/proto/indihub" +) + +var ( + getPropertiesStart = []byte("") +) + +// XmlFlattener reads XML from INDI-server by chunks and returns elements +type XmlFlattener struct { + buffer []byte + nextEndElement []byte +} + +func init() { + mxj.PrependAttrWithHyphen(false) + mxj.SetAttrPrefix("attr_") + mxj.DecodeSimpleValuesAsMap(false) +} + +// NewXMLReader returns XmlReader +func NewXmlFlattener() *XmlFlattener { + return &XmlFlattener{ + buffer: make([]byte, 0, INDIServerMaxRecvMsgSize), + } +} + +func (r *XmlFlattener) FeedChunk(chunk []byte) [][]byte { + if len(chunk) == 0 { + return nil + } + + elements := make([][]byte, 0, 10) + + r.buffer = append(r.buffer, chunk...) + + for { + if len(r.nextEndElement) == 0 { + // look for start of new entity + if n := bytes.IndexByte(r.buffer, '<'); n > 0 { + r.buffer = r.buffer[n:] + } + + if r.buffer[0] != '<' { + // something went wrong + return elements + } + + // special case of message-element without closing tag + if bytes.HasPrefix(r.buffer, elementMessage) || bytes.HasPrefix(r.buffer, elementDelProperty) || + bytes.HasPrefix(r.buffer, elementGetProperties) { + r.nextEndElement = elementSingleTagEnd + } else { + // get next end element value + if n := bytes.IndexByte(r.buffer, ' '); n < 1 { + // something went wrong + return elements + } else { + r.nextEndElement = []byte{'<', '/'} + if r.buffer[n-1] == '\n' { + n = n - 1 + } + r.nextEndElement = append(r.nextEndElement, r.buffer[1:n]...) + r.nextEndElement = append(r.nextEndElement, '>') + } + } + } + + if n := bytes.Index(r.buffer, r.nextEndElement); n == -1 { + // should read next chunk + return elements + } else { + // element closed, add it to return + end := n + len(r.nextEndElement) + 1 + if end > len(r.buffer) { + end = len(r.buffer) + } + elements = append(elements, r.buffer[:end]) + r.nextEndElement = r.nextEndElement[:0] + if len(r.buffer) == end { + r.buffer = r.buffer[:0] + break + } + + r.buffer = r.buffer[end:] + } + } + + return elements +} + +func (r *XmlFlattener) ConvertChunkToJSON(chunk []byte) [][]byte { + elements := r.FeedChunk(chunk) + if len(elements) == 0 { + return [][]byte{} + } + + jsonElements := make([][]byte, 0, len(elements)) + for _, el := range elements { + mapVal, err := mxj.NewMapXml(el, true) + if err != nil { + log.Println("could not parse XML chunk:", err) + continue + } + jsonEl, err := mapVal.Json() + if err != nil { + log.Println("could not convert map to JSON:", err) + continue + } + jsonElements = append(jsonElements, jsonEl) + } + + return jsonElements +} + +func (r *XmlFlattener) ConvertJSONToXML(jsonData []byte) ([]byte, error) { + mapVal, err := mxj.NewMapJson(jsonData) + if err != nil { + return nil, err + } + + xmlData, err := mapVal.Xml() + if err != nil { + return nil, err + } + return xmlData, nil +} diff --git a/logutil/logutil.go b/logutil/logutil.go new file mode 100644 index 0000000..68a7929 --- /dev/null +++ b/logutil/logutil.go @@ -0,0 +1,20 @@ +package logutil + +import ( + "log" + "os" +) + +var IsDev = false + +func init() { + IsDev = os.Getenv("INDIHUB_DEV") != "" +} + +func LogError(format string, args ...interface{}) { + if !IsDev { + return + } + + log.Printf(format, args...) +} diff --git a/main.go b/main.go new file mode 100644 index 0000000..2eb5c71 --- /dev/null +++ b/main.go @@ -0,0 +1,578 @@ +package main + +import ( + "context" + "crypto/tls" + "flag" + "fmt" + "log" + "net" + "net/http" + "os" + "os/signal" + "runtime" + "sync" + "time" + + "github.com/fatih/color" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials" + _ "google.golang.org/grpc/encoding/gzip" + + "github.com/indihub-space/agent/broadcast" + "github.com/indihub-space/agent/config" + "github.com/indihub-space/agent/hostutils" + "github.com/indihub-space/agent/lib" + "github.com/indihub-space/agent/logutil" + "github.com/indihub-space/agent/manager" + "github.com/indihub-space/agent/proto/indihub" + "github.com/indihub-space/agent/proxy" + "github.com/indihub-space/agent/solo" + "github.com/indihub-space/agent/version" + "github.com/indihub-space/agent/websockets" +) + +const ( + defaultWSPort uint64 = 2020 + + modeSolo = "solo" + modeBroadcast = "broadcast" + modeShare = "share" + modeRobotic = "robotic" +) + +var ( + flagINDIServerManagerAddr string + flagPHD2ServerAddr string + flagINDIProfile string + flagToken string + flagConfFile string + flagSoloINDIServerAddr string + flagBroadcastINDIServerAddr string + flagCompress bool + flagWSServer bool + flagWSIsTLS bool + flagWSPort uint64 + flagWSOrigins string + flagMode string + + indiServerAddr string + + httpClientSM = http.Client{} +) + +func init() { + flag.StringVar( + &flagINDIServerManagerAddr, + "indi-server-manager", + "raspberrypi.local:8624", + "INDI-server Manager address (host:port)", + ) + flag.StringVar( + &flagMode, + "mode", + modeSolo, + `indihub-agent mode (deafult value is "solo"), there four modes:\n +solo - equipment sharing is not possible, you are connected to INDIHUB and contributing images +sharing - you are sharing equipment with another INDIHUB user (agent will output connection info) +broadcast - equipment sharing is not possible, you are broadcasting your experience to any number of INDIHUB users +robotic - equipment sharing is not possible, your equipment is controlled by INDIHUB AI (you can still watch what it is doing!) +`, + ) + flag.BoolVar( + &flagCompress, + "compress", + true, + "Enable gzip-compression", + ) + flag.StringVar( + &flagSoloINDIServerAddr, + "solo-indi-server", + "localhost:7624", + "agent INDI-server address (host:port) for solo-mode", + ) + flag.StringVar( + &flagBroadcastINDIServerAddr, + "broadcast-indi-server", + "localhost:7624", + "agent INDI-server address (host:port) for broadcast-mode", + ) + flag.StringVar( + &flagPHD2ServerAddr, + "phd2-server", + "", + "PHD2-server address (host:port)", + ) + flag.StringVar( + &flagToken, + "token", + "", + "token - can be requested at https://indihub.space/token", + ) + flag.StringVar( + &flagConfFile, + "conf", + "indihub.json", + "INDIHub Agent config file path", + ) + flag.StringVar( + &flagINDIProfile, + "indi-profile", + "", + "Name of INDI-profile to share via indihub", + ) + flag.BoolVar( + &flagWSServer, + "ws-server", + true, + "launch Websocket server to control equipment via Websocket API", + ) + flag.BoolVar( + &flagWSIsTLS, + "ws-tls", + false, + "serve web-socket over TLS with self-signed certificate", + ) + flag.Uint64Var( + &flagWSPort, + "ws-port", + defaultWSPort, + "port to start web socket-server on", + ) + flag.StringVar( + &flagWSOrigins, + "ws-origins", + "", + "comma-separated list of origins allowed to connect to WS-server", + ) +} + +func main() { + flag.Parse() + + if flagMode != modeSolo && flagMode != modeShare && flagMode != modeBroadcast && flagMode != modeRobotic { + log.Fatalf("Unknown mode '%s' provided\n", flagMode) + } + + indiHubAddr := "relay.indihub.io:7668" // tls one + if logutil.IsDev { + indiHubAddr = "localhost:7667" // TODO: change this to optional DEV server + } + + if flagINDIServerManagerAddr == "" { + log.Fatal("'indi-server-manager' parameter is missing, the 'host:port' format is expected") + } + + indiHost, _, err := net.SplitHostPort(flagINDIServerManagerAddr) + if err != nil { + log.Fatal("Bad syntax for 'indi-server-manager' parameter, the 'host:port' format is expected") + } + + if flagINDIProfile == "" { + log.Fatal("'indi-profile' parameter is required") + } + + // read token from flag or from config file if exists + if flagToken == "" { + conf, err := config.Read(flagConfFile) + if err == nil { + flagToken = conf.Token + } + } + + // connect to INDI-server Manager + log.Printf("Connection to local INDI-Server Manager on %s...\n", flagINDIServerManagerAddr) + managerClient := manager.NewClient(flagINDIServerManagerAddr) + running, currINDIProfile, err := managerClient.GetStatus() + if err != nil { + log.Fatal(err) + } + log.Println("...OK") + + // start required profile if it is not active and running + if !running || currINDIProfile != flagINDIProfile { + log.Printf("Setting active INDI-profile to '%s'\n", flagINDIProfile) + if err := managerClient.StopServer(); err != nil { + log.Fatal(err) + } + if err := managerClient.StartProfile(flagINDIProfile); err != nil { + log.Fatal(err) + } + } else { + log.Printf("INDI-server is running with active INDI-profile '%s'\n", flagINDIProfile) + } + + // get profile connect data + indiProfile, err := managerClient.GetProfile(flagINDIProfile) + if err != nil { + log.Fatalf("could not get INDI-profile from INDI-server manager: %s", err) + } + indiServerAddr = fmt.Sprintf("%s:%d", indiHost, indiProfile.Port) + + // get profile drivers data + indiDrivers, err := managerClient.GetDrivers() + if err != nil { + log.Fatalf("could not get INDI-drivers info from INDI-server manager: %s", err) + } + log.Println("INDIDrivers:") + for _, d := range indiDrivers { + log.Printf("%+v", *d) + } + + // test connect to local INDI-server + log.Printf("Test connection to local INDI-Server on %s...\n", indiServerAddr) + indiConn, err := net.Dial("tcp", indiServerAddr) + if err != nil { + log.Fatal(err) + } + indiConn.Close() + log.Println("...OK") + + if flagPHD2ServerAddr != "" { + log.Printf("Test connection to local PHD2-Server on %s...\n", flagPHD2ServerAddr) + phd2Conn, err := net.Dial("tcp", flagPHD2ServerAddr) + if err != nil { + log.Fatal(err) + } + phd2Conn.Close() + log.Println("...OK") + } + + // prepare indihub-host data + indiHubHost := &indihub.INDIHubHost{ + Token: flagToken, + Profile: &indihub.INDIProfile{ + Id: indiProfile.ID, + Name: indiProfile.Name, + Port: indiProfile.Port, + Autostart: indiProfile.AutoStart, + Autoconnect: indiProfile.AutoConnect, + }, + Drivers: make([]*indihub.INDIDriver, len(indiDrivers)), + SoloMode: flagMode == modeSolo, + IsPHD2: flagPHD2ServerAddr != "", + IsRobotic: flagMode == modeRobotic, + IsBroadcast: flagMode == modeBroadcast, + AgentVersion: version.AgentVersion, + Os: runtime.GOOS, + Arch: runtime.GOARCH, + } + for i, driver := range indiDrivers { + indiHubHost.Drivers[i] = &indihub.INDIDriver{ + Binary: driver.Binary, + Family: driver.Family, + Label: driver.Label, + Version: driver.Version, + Role: driver.Role, + Custom: driver.Custom, + Name: driver.Name, + } + } + + log.Println("Connecting to the indihub.space cloud...") + opts := []grpc.DialOption{ + grpc.WithDefaultCallOptions(grpc.MaxCallSendMsgSize(lib.GRPCMaxSendMsgSize)), + grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(lib.GRPCMaxRecvMsgSize)), + } + if flagCompress { + opts = append(opts, grpc.WithDefaultCallOptions(grpc.UseCompressor("gzip"))) + } + + if logutil.IsDev { + opts = append(opts, grpc.WithInsecure()) + } else { + tlsConfig := &tls.Config{} + opts = append(opts, grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig))) + } + + conn, err := grpc.Dial( + indiHubAddr, + opts..., + ) + if err != nil { + log.Fatal(err) + } + log.Println("...OK") + + indiHubClient := indihub.NewINDIHubClient(conn) + + // register host + regInfo, err := indiHubClient.RegisterHost(context.Background(), indiHubHost) + if err != nil { + log.Fatal(err) + } + + log.Println("Current agent version:", version.AgentVersion) + log.Println("Latest agent version:", regInfo.AgentVersion) + + if version.AgentVersion < regInfo.AgentVersion { + yc := color.New(color.FgYellow) + yc.Println() + yc.Println(" ************************************************************") + yc.Println(" * WARNING: you version of agent is outdated! *") + yc.Println(" * *") + yc.Println(" * Please download the latest version from: *") + yc.Println(" * https://indihub.space/downloads *") + yc.Println(" * *") + yc.Println(" ************************************************************") + yc.Println(" ") + } + + log.Printf("Access token: %s\n", regInfo.Token) + log.Printf("Host session token: %s\n", regInfo.SessionIDPublic) + + // create config for new host if flag wasn't provided + if flagToken == "" { + conf := &config.Config{ + Token: regInfo.Token, + } + if err := config.Write(flagConfFile, conf); err != nil { + log.Printf("Could not create config file %s: %s", flagConfFile, err) + } + } + + // start WS-server + wsServer := websockets.NewWsServer( + regInfo.Token, + indiServerAddr, + flagPHD2ServerAddr, + flagWSPort, + flagWSIsTLS, + flagWSOrigins, + ) + go wsServer.Start() + + // start session + switch flagMode { + + case modeSolo: + // solo mode - equipment sharing is not available but host still sends all images to INDIHUB + log.Println("'solo' parameter was provided. Your session is in solo-mode: equipment sharing is not available") + log.Println("Starting INDIHUB agent in solo mode!") + + soloClient, err := indiHubClient.SoloMode(context.Background()) + if err != nil { + log.Fatalf("Could not start agent in solo mode: %v", err) + } + + soloProxy := solo.New( + "INDI-Server Solo-mode", + indiServerAddr, + soloClient, + ) + + go func() { + sigint := make(chan os.Signal, 1) + signal.Notify(sigint, os.Interrupt, os.Kill) + + <-sigint + + // stop WS-server + wsServer.Stop() + + log.Println("Closing INDIHUB solo-session") + + // close connections to local INDI-server and to INDI client + soloProxy.Close() + + time.Sleep(1 * time.Second) + + // close grpc client connection + conn.Close() + }() + + // start solo mode INDI-server tcp-proxy + wg := sync.WaitGroup{} + wg.Add(1) + go func() { + defer wg.Done() + soloProxy.Start(flagSoloINDIServerAddr, regInfo.SessionID, regInfo.SessionIDPublic) + }() + + wg.Wait() + + case modeShare, modeRobotic: + // main equipment sharing mode + if flagMode == modeRobotic { + log.Println("'robotic' parameter was provided. Your session is in robotic-mode: equipment sharing is not available") + } + // open INDI server tunnel + log.Println("Starting INDI-Server in the cloud...") + indiServTunnel, err := indiHubClient.INDIServer( + context.Background(), + ) + if err != nil { + log.Fatal(err) + } + log.Println("...OK") + + indiFilterConf := &hostutils.INDIFilterConfig{} // TODO: add reading config + indiFilter := hostutils.NewINDIFilter(indiFilterConf) + indiServerProxy := proxy.New("INDI-Server", indiServerAddr, indiServTunnel, indiFilter) + + // start PHD2 server proxy if specified + var phd2ServerProxy *proxy.TcpProxy + if flagPHD2ServerAddr != "" { + // open PHD2 server tunnel + log.Println("Starting PHD2-Server in the cloud...") + phd2ServTunnel, err := indiHubClient.PHD2Server( + context.Background(), + ) + if err != nil { + log.Fatal(err) + } + log.Println("...OK") + phd2ServerProxy = proxy.New("PHD2-Server", flagPHD2ServerAddr, phd2ServTunnel, nil) + } + + go func() { + sigint := make(chan os.Signal, 1) + signal.Notify(sigint, os.Interrupt, os.Kill) + + <-sigint + + // stop WS-server + wsServer.Stop() + + // close connections to tunnels + indiServTunnel.CloseSend() + if phd2ServerProxy != nil { + phd2ServerProxy.Tunnel.CloseSend() + } + + // close grpc client connection + conn.Close() + + // close connections to local INDI-server and PHD2-Server + indiServerProxy.Close() + if phd2ServerProxy != nil { + phd2ServerProxy.Close() + } + }() + + serverAddrChan := make(chan proxy.PublicServerAddr, 3) + + wg := sync.WaitGroup{} + + // INDI Server Proxy start + waitNum := 1 + wg.Add(1) + go func() { + defer wg.Done() + indiServerProxy.Start(serverAddrChan, regInfo.SessionID, regInfo.SessionIDPublic) + }() + + if flagPHD2ServerAddr != "" { + waitNum = 2 + wg.Add(1) + go func() { + defer wg.Done() + phd2ServerProxy.Start(serverAddrChan, regInfo.SessionID, regInfo.SessionIDPublic) + }() + } + + addrData := []proxy.PublicServerAddr{} + for i := 0; i < waitNum; i++ { + sAddr := <-serverAddrChan + addrData = append(addrData, sAddr) + } + + c := color.New(color.FgCyan) + gc := color.New(color.FgGreen) + yc := color.New(color.FgYellow) + rc := color.New(color.FgMagenta) + if flagMode != modeRobotic { + c.Println() + c.Println(" ************************************************************") + c.Println(" * INDIHUB public address list!! *") + c.Println(" ************************************************************") + c.Println(" ") + for _, sAddr := range addrData { + gc.Printf(" %s: %s\n", sAddr.Name, sAddr.Addr) + } + c.Println(" ") + c.Println(" ************************************************************") + c.Println() + c.Println(" Please provide your guest with this information:") + c.Println() + c.Println(" 1. Public address list from the above") + c.Println(" 2. Focal length and aperture of your main telescope") + c.Println(" 3. Focal length and aperture of your guiding telescope") + c.Println(" 4. Type of guiding you use: PHD2 or guiding via camera") + c.Println(" 5. Names of your imaging camera and guiding cameras") + c.Println() + yc.Println(" NOTE: These public addresses will be available ONLY until") + yc.Println(" agent is running! (Ctrl+C will stop the session)") + c.Println() + } else { + c.Println() + c.Println(" ************************************************************") + c.Println(" * INDIHUB robotic-session started!! *") + c.Println(" ************************************************************") + c.Println(" ") + } + + wg.Wait() + + c.Println() + c.Println(" ************************************************************") + c.Println(" * INDIHUB session finished!! *") + c.Println(" ************************************************************") + c.Println(" ") + if flagMode != modeRobotic { + for _, sAddr := range addrData { + rc.Printf(" %s: %s - CLOSED!!\n", sAddr.Name, sAddr.Addr) + } + } else { + c.Println(" * INDIHUB robotic-session finished. *") + c.Println(" * Thank you for your contribution! *") + } + c.Println(" ") + c.Println(" ************************************************************") + + case modeBroadcast: + // broadcast - broadcasting all replies from INDI-server to INDIHUB, equipment sharing is not available + log.Println("Starting INDIHUB agent in broadcast mode!") + + broadcastClient, err := indiHubClient.BroadcastINDIServer(context.Background()) + if err != nil { + log.Fatalf("Could not start agent in broadcast mode: %v", err) + } + + broadcastProxy := broadcast.New( + "INDI-Server Solo-mode", + indiServerAddr, + broadcastClient, + ) + + go func() { + sigint := make(chan os.Signal, 1) + signal.Notify(sigint, os.Interrupt, os.Kill) + + <-sigint + + // stop WS-server + wsServer.Stop() + + log.Println("Closing INDIHUB solo-session") + + // close connections to local INDI-server and to INDI client + broadcastProxy.Close() + + time.Sleep(1 * time.Second) + + // close grpc client connection + conn.Close() + }() + + // start broadcast mode INDI-server tcp-proxy + wg := sync.WaitGroup{} + wg.Add(1) + go func() { + defer wg.Done() + broadcastProxy.Start(regInfo.SessionID, regInfo.SessionIDPublic, flagBroadcastINDIServerAddr) + }() + + wg.Wait() + } +} diff --git a/manager/client.go b/manager/client.go new file mode 100644 index 0000000..886245a --- /dev/null +++ b/manager/client.go @@ -0,0 +1,124 @@ +package manager + +import ( + "encoding/json" + "fmt" + "io/ioutil" + "net/http" + + "github.com/indihub-space/agent/lib" +) + +const ( + False = "False" + True = "True" +) + +type client struct { + httpClient http.Client + addr string +} + +func NewClient(managerServerAddr string) *client { + return &client{ + httpClient: http.Client{}, + addr: managerServerAddr, + } +} + +func (c *client) GetStatus() (bool, string, error) { + resp, err := c.httpClient.Get(fmt.Sprintf("http://%s/api/server/status", c.addr)) + if err != nil { + return false, "", err + } + defer resp.Body.Close() + + respJson, err := ioutil.ReadAll(resp.Body) + if err != nil { + return false, "", err + } + + respData := []map[string]string{} + if err := json.Unmarshal(respJson, &respData); err != nil { + return false, "", err + } + + if len(respData) != 1 { + return false, "", fmt.Errorf("wrong slice length in INDI-server manager reply %v", respData) + } + + return respData[0]["status"] == True, respData[0]["active_profile"], nil +} + +func (c *client) StopServer() error { + resp, err := c.httpClient.Post(fmt.Sprintf("http://%s/api/server/stop", c.addr), + "application/json", nil) + if err != nil { + return err + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + return fmt.Errorf("could not stop INDI-server, response code %d", resp.StatusCode) + } + + return nil +} + +func (c *client) GetProfile(profile string) (*lib.INDIProfile, error) { + resp, err := c.httpClient.Get(fmt.Sprintf("http://%s/api/profiles/%s", c.addr, profile)) + if err != nil { + return nil, err + } + defer resp.Body.Close() + + respJson, err := ioutil.ReadAll(resp.Body) + if err != nil { + return nil, err + } + + indiProfile := &lib.INDIProfile{} + if err := json.Unmarshal(respJson, indiProfile); err != nil { + return nil, err + } + + return indiProfile, nil +} + +func (c *client) StartProfile(profile string) error { + resp, err := c.httpClient.Post(fmt.Sprintf("http://%s/api/server/start/%s", c.addr, profile), + "application/json", nil) + if err != nil { + return err + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + return fmt.Errorf("could not start INDI-server with profile %s, response code %d", + profile, + resp.StatusCode, + ) + } + + return nil +} + +func (c *client) GetDrivers() ([]*lib.INDIDriver, error) { + resp, err := c.httpClient.Get(fmt.Sprintf("http://%s/api/server/drivers", c.addr)) + if err != nil { + return nil, err + } + defer resp.Body.Close() + + respJson, err := ioutil.ReadAll(resp.Body) + if err != nil { + return nil, err + } + + drivers := []*lib.INDIDriver{} + if err := json.Unmarshal(respJson, &drivers); err != nil { + return nil, err + } + + return drivers, nil +} diff --git a/proto/indihub/indihub.pb.go b/proto/indihub/indihub.pb.go new file mode 100644 index 0000000..791e6ab --- /dev/null +++ b/proto/indihub/indihub.pb.go @@ -0,0 +1,999 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// source: indihub.proto + +package indihub + +import ( + context "context" + fmt "fmt" + proto "github.com/golang/protobuf/proto" + grpc "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" + math "math" +) + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. +const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package + +type Request struct { + Data []byte `protobuf:"bytes,1,opt,name=data,proto3" json:"data,omitempty"` + Conn uint32 `protobuf:"varint,2,opt,name=conn,proto3" json:"conn,omitempty"` + Closed bool `protobuf:"varint,3,opt,name=closed,proto3" json:"closed,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *Request) Reset() { *m = Request{} } +func (m *Request) String() string { return proto.CompactTextString(m) } +func (*Request) ProtoMessage() {} +func (*Request) Descriptor() ([]byte, []int) { + return fileDescriptor_84cfc05744c754a5, []int{0} +} + +func (m *Request) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_Request.Unmarshal(m, b) +} +func (m *Request) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_Request.Marshal(b, m, deterministic) +} +func (m *Request) XXX_Merge(src proto.Message) { + xxx_messageInfo_Request.Merge(m, src) +} +func (m *Request) XXX_Size() int { + return xxx_messageInfo_Request.Size(m) +} +func (m *Request) XXX_DiscardUnknown() { + xxx_messageInfo_Request.DiscardUnknown(m) +} + +var xxx_messageInfo_Request proto.InternalMessageInfo + +func (m *Request) GetData() []byte { + if m != nil { + return m.Data + } + return nil +} + +func (m *Request) GetConn() uint32 { + if m != nil { + return m.Conn + } + return 0 +} + +func (m *Request) GetClosed() bool { + if m != nil { + return m.Closed + } + return false +} + +type Response struct { + Data []byte `protobuf:"bytes,1,opt,name=data,proto3" json:"data,omitempty"` + Conn uint32 `protobuf:"varint,2,opt,name=conn,proto3" json:"conn,omitempty"` + SessionID uint64 `protobuf:"varint,3,opt,name=sessionID,proto3" json:"sessionID,omitempty"` + SessionToken string `protobuf:"bytes,4,opt,name=sessionToken,proto3" json:"sessionToken,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *Response) Reset() { *m = Response{} } +func (m *Response) String() string { return proto.CompactTextString(m) } +func (*Response) ProtoMessage() {} +func (*Response) Descriptor() ([]byte, []int) { + return fileDescriptor_84cfc05744c754a5, []int{1} +} + +func (m *Response) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_Response.Unmarshal(m, b) +} +func (m *Response) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_Response.Marshal(b, m, deterministic) +} +func (m *Response) XXX_Merge(src proto.Message) { + xxx_messageInfo_Response.Merge(m, src) +} +func (m *Response) XXX_Size() int { + return xxx_messageInfo_Response.Size(m) +} +func (m *Response) XXX_DiscardUnknown() { + xxx_messageInfo_Response.DiscardUnknown(m) +} + +var xxx_messageInfo_Response proto.InternalMessageInfo + +func (m *Response) GetData() []byte { + if m != nil { + return m.Data + } + return nil +} + +func (m *Response) GetConn() uint32 { + if m != nil { + return m.Conn + } + return 0 +} + +func (m *Response) GetSessionID() uint64 { + if m != nil { + return m.SessionID + } + return 0 +} + +func (m *Response) GetSessionToken() string { + if m != nil { + return m.SessionToken + } + return "" +} + +type INDIProfile struct { + Id uint32 `protobuf:"varint,1,opt,name=id,proto3" json:"id,omitempty"` + Name string `protobuf:"bytes,2,opt,name=name,proto3" json:"name,omitempty"` + Port uint32 `protobuf:"varint,3,opt,name=port,proto3" json:"port,omitempty"` + Autostart uint32 `protobuf:"varint,4,opt,name=autostart,proto3" json:"autostart,omitempty"` + Autoconnect uint32 `protobuf:"varint,5,opt,name=autoconnect,proto3" json:"autoconnect,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *INDIProfile) Reset() { *m = INDIProfile{} } +func (m *INDIProfile) String() string { return proto.CompactTextString(m) } +func (*INDIProfile) ProtoMessage() {} +func (*INDIProfile) Descriptor() ([]byte, []int) { + return fileDescriptor_84cfc05744c754a5, []int{2} +} + +func (m *INDIProfile) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_INDIProfile.Unmarshal(m, b) +} +func (m *INDIProfile) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_INDIProfile.Marshal(b, m, deterministic) +} +func (m *INDIProfile) XXX_Merge(src proto.Message) { + xxx_messageInfo_INDIProfile.Merge(m, src) +} +func (m *INDIProfile) XXX_Size() int { + return xxx_messageInfo_INDIProfile.Size(m) +} +func (m *INDIProfile) XXX_DiscardUnknown() { + xxx_messageInfo_INDIProfile.DiscardUnknown(m) +} + +var xxx_messageInfo_INDIProfile proto.InternalMessageInfo + +func (m *INDIProfile) GetId() uint32 { + if m != nil { + return m.Id + } + return 0 +} + +func (m *INDIProfile) GetName() string { + if m != nil { + return m.Name + } + return "" +} + +func (m *INDIProfile) GetPort() uint32 { + if m != nil { + return m.Port + } + return 0 +} + +func (m *INDIProfile) GetAutostart() uint32 { + if m != nil { + return m.Autostart + } + return 0 +} + +func (m *INDIProfile) GetAutoconnect() uint32 { + if m != nil { + return m.Autoconnect + } + return 0 +} + +type INDIDriver struct { + Binary string `protobuf:"bytes,1,opt,name=binary,proto3" json:"binary,omitempty"` + Family string `protobuf:"bytes,2,opt,name=family,proto3" json:"family,omitempty"` + Label string `protobuf:"bytes,3,opt,name=label,proto3" json:"label,omitempty"` + Version string `protobuf:"bytes,4,opt,name=version,proto3" json:"version,omitempty"` + Role string `protobuf:"bytes,5,opt,name=role,proto3" json:"role,omitempty"` + Custom bool `protobuf:"varint,6,opt,name=custom,proto3" json:"custom,omitempty"` + Name string `protobuf:"bytes,7,opt,name=name,proto3" json:"name,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *INDIDriver) Reset() { *m = INDIDriver{} } +func (m *INDIDriver) String() string { return proto.CompactTextString(m) } +func (*INDIDriver) ProtoMessage() {} +func (*INDIDriver) Descriptor() ([]byte, []int) { + return fileDescriptor_84cfc05744c754a5, []int{3} +} + +func (m *INDIDriver) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_INDIDriver.Unmarshal(m, b) +} +func (m *INDIDriver) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_INDIDriver.Marshal(b, m, deterministic) +} +func (m *INDIDriver) XXX_Merge(src proto.Message) { + xxx_messageInfo_INDIDriver.Merge(m, src) +} +func (m *INDIDriver) XXX_Size() int { + return xxx_messageInfo_INDIDriver.Size(m) +} +func (m *INDIDriver) XXX_DiscardUnknown() { + xxx_messageInfo_INDIDriver.DiscardUnknown(m) +} + +var xxx_messageInfo_INDIDriver proto.InternalMessageInfo + +func (m *INDIDriver) GetBinary() string { + if m != nil { + return m.Binary + } + return "" +} + +func (m *INDIDriver) GetFamily() string { + if m != nil { + return m.Family + } + return "" +} + +func (m *INDIDriver) GetLabel() string { + if m != nil { + return m.Label + } + return "" +} + +func (m *INDIDriver) GetVersion() string { + if m != nil { + return m.Version + } + return "" +} + +func (m *INDIDriver) GetRole() string { + if m != nil { + return m.Role + } + return "" +} + +func (m *INDIDriver) GetCustom() bool { + if m != nil { + return m.Custom + } + return false +} + +func (m *INDIDriver) GetName() string { + if m != nil { + return m.Name + } + return "" +} + +type INDIHubHost struct { + Token string `protobuf:"bytes,1,opt,name=token,proto3" json:"token,omitempty"` + Profile *INDIProfile `protobuf:"bytes,2,opt,name=profile,proto3" json:"profile,omitempty"` + Drivers []*INDIDriver `protobuf:"bytes,3,rep,name=drivers,proto3" json:"drivers,omitempty"` + SoloMode bool `protobuf:"varint,4,opt,name=soloMode,proto3" json:"soloMode,omitempty"` + IsPHD2 bool `protobuf:"varint,5,opt,name=isPHD2,proto3" json:"isPHD2,omitempty"` + IsRobotic bool `protobuf:"varint,6,opt,name=isRobotic,proto3" json:"isRobotic,omitempty"` + AgentVersion string `protobuf:"bytes,7,opt,name=agentVersion,proto3" json:"agentVersion,omitempty"` + Os string `protobuf:"bytes,8,opt,name=os,proto3" json:"os,omitempty"` + Arch string `protobuf:"bytes,9,opt,name=arch,proto3" json:"arch,omitempty"` + IsBroadcast bool `protobuf:"varint,10,opt,name=isBroadcast,proto3" json:"isBroadcast,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *INDIHubHost) Reset() { *m = INDIHubHost{} } +func (m *INDIHubHost) String() string { return proto.CompactTextString(m) } +func (*INDIHubHost) ProtoMessage() {} +func (*INDIHubHost) Descriptor() ([]byte, []int) { + return fileDescriptor_84cfc05744c754a5, []int{4} +} + +func (m *INDIHubHost) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_INDIHubHost.Unmarshal(m, b) +} +func (m *INDIHubHost) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_INDIHubHost.Marshal(b, m, deterministic) +} +func (m *INDIHubHost) XXX_Merge(src proto.Message) { + xxx_messageInfo_INDIHubHost.Merge(m, src) +} +func (m *INDIHubHost) XXX_Size() int { + return xxx_messageInfo_INDIHubHost.Size(m) +} +func (m *INDIHubHost) XXX_DiscardUnknown() { + xxx_messageInfo_INDIHubHost.DiscardUnknown(m) +} + +var xxx_messageInfo_INDIHubHost proto.InternalMessageInfo + +func (m *INDIHubHost) GetToken() string { + if m != nil { + return m.Token + } + return "" +} + +func (m *INDIHubHost) GetProfile() *INDIProfile { + if m != nil { + return m.Profile + } + return nil +} + +func (m *INDIHubHost) GetDrivers() []*INDIDriver { + if m != nil { + return m.Drivers + } + return nil +} + +func (m *INDIHubHost) GetSoloMode() bool { + if m != nil { + return m.SoloMode + } + return false +} + +func (m *INDIHubHost) GetIsPHD2() bool { + if m != nil { + return m.IsPHD2 + } + return false +} + +func (m *INDIHubHost) GetIsRobotic() bool { + if m != nil { + return m.IsRobotic + } + return false +} + +func (m *INDIHubHost) GetAgentVersion() string { + if m != nil { + return m.AgentVersion + } + return "" +} + +func (m *INDIHubHost) GetOs() string { + if m != nil { + return m.Os + } + return "" +} + +func (m *INDIHubHost) GetArch() string { + if m != nil { + return m.Arch + } + return "" +} + +func (m *INDIHubHost) GetIsBroadcast() bool { + if m != nil { + return m.IsBroadcast + } + return false +} + +type RegisterInfo struct { + Token string `protobuf:"bytes,1,opt,name=token,proto3" json:"token,omitempty"` + SessionID uint64 `protobuf:"varint,2,opt,name=sessionID,proto3" json:"sessionID,omitempty"` + SessionIDPublic string `protobuf:"bytes,3,opt,name=sessionIDPublic,proto3" json:"sessionIDPublic,omitempty"` + AgentVersion string `protobuf:"bytes,4,opt,name=agentVersion,proto3" json:"agentVersion,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *RegisterInfo) Reset() { *m = RegisterInfo{} } +func (m *RegisterInfo) String() string { return proto.CompactTextString(m) } +func (*RegisterInfo) ProtoMessage() {} +func (*RegisterInfo) Descriptor() ([]byte, []int) { + return fileDescriptor_84cfc05744c754a5, []int{5} +} + +func (m *RegisterInfo) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_RegisterInfo.Unmarshal(m, b) +} +func (m *RegisterInfo) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_RegisterInfo.Marshal(b, m, deterministic) +} +func (m *RegisterInfo) XXX_Merge(src proto.Message) { + xxx_messageInfo_RegisterInfo.Merge(m, src) +} +func (m *RegisterInfo) XXX_Size() int { + return xxx_messageInfo_RegisterInfo.Size(m) +} +func (m *RegisterInfo) XXX_DiscardUnknown() { + xxx_messageInfo_RegisterInfo.DiscardUnknown(m) +} + +var xxx_messageInfo_RegisterInfo proto.InternalMessageInfo + +func (m *RegisterInfo) GetToken() string { + if m != nil { + return m.Token + } + return "" +} + +func (m *RegisterInfo) GetSessionID() uint64 { + if m != nil { + return m.SessionID + } + return 0 +} + +func (m *RegisterInfo) GetSessionIDPublic() string { + if m != nil { + return m.SessionIDPublic + } + return "" +} + +func (m *RegisterInfo) GetAgentVersion() string { + if m != nil { + return m.AgentVersion + } + return "" +} + +type SoloSummary struct { + ImagesNum uint64 `protobuf:"varint,1,opt,name=imagesNum,proto3" json:"imagesNum,omitempty"` + Data []byte `protobuf:"bytes,2,opt,name=data,proto3" json:"data,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *SoloSummary) Reset() { *m = SoloSummary{} } +func (m *SoloSummary) String() string { return proto.CompactTextString(m) } +func (*SoloSummary) ProtoMessage() {} +func (*SoloSummary) Descriptor() ([]byte, []int) { + return fileDescriptor_84cfc05744c754a5, []int{6} +} + +func (m *SoloSummary) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_SoloSummary.Unmarshal(m, b) +} +func (m *SoloSummary) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_SoloSummary.Marshal(b, m, deterministic) +} +func (m *SoloSummary) XXX_Merge(src proto.Message) { + xxx_messageInfo_SoloSummary.Merge(m, src) +} +func (m *SoloSummary) XXX_Size() int { + return xxx_messageInfo_SoloSummary.Size(m) +} +func (m *SoloSummary) XXX_DiscardUnknown() { + xxx_messageInfo_SoloSummary.DiscardUnknown(m) +} + +var xxx_messageInfo_SoloSummary proto.InternalMessageInfo + +func (m *SoloSummary) GetImagesNum() uint64 { + if m != nil { + return m.ImagesNum + } + return 0 +} + +func (m *SoloSummary) GetData() []byte { + if m != nil { + return m.Data + } + return nil +} + +func init() { + proto.RegisterType((*Request)(nil), "Request") + proto.RegisterType((*Response)(nil), "Response") + proto.RegisterType((*INDIProfile)(nil), "INDIProfile") + proto.RegisterType((*INDIDriver)(nil), "INDIDriver") + proto.RegisterType((*INDIHubHost)(nil), "INDIHubHost") + proto.RegisterType((*RegisterInfo)(nil), "RegisterInfo") + proto.RegisterType((*SoloSummary)(nil), "SoloSummary") +} + +func init() { proto.RegisterFile("indihub.proto", fileDescriptor_84cfc05744c754a5) } + +var fileDescriptor_84cfc05744c754a5 = []byte{ + // 606 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x54, 0x51, 0x6b, 0xdb, 0x3c, + 0x14, 0xad, 0xdd, 0xb4, 0xb1, 0xe5, 0xf8, 0xfb, 0x40, 0x1b, 0xc3, 0x94, 0x3d, 0x18, 0xc3, 0x36, + 0xbf, 0xcc, 0x94, 0xec, 0x07, 0x0c, 0x46, 0x1e, 0x9a, 0x87, 0x95, 0xa2, 0x8c, 0xbd, 0xcb, 0xb6, + 0xda, 0x8a, 0xd9, 0x56, 0x26, 0xc9, 0x85, 0xfe, 0x80, 0xbd, 0xef, 0x6d, 0x7f, 0x62, 0xec, 0x37, + 0x8e, 0x7b, 0xad, 0xc4, 0x4e, 0xcb, 0x20, 0x6f, 0xf7, 0x1c, 0x29, 0xb9, 0xe7, 0x9c, 0x7b, 0x65, + 0x12, 0xcb, 0xae, 0x96, 0xf7, 0x7d, 0x59, 0x6c, 0xb5, 0xb2, 0x2a, 0x5b, 0x93, 0x39, 0x13, 0xdf, + 0x7b, 0x61, 0x2c, 0xa5, 0x64, 0x56, 0x73, 0xcb, 0x13, 0x2f, 0xf5, 0xf2, 0x05, 0xc3, 0x1a, 0xb8, + 0x4a, 0x75, 0x5d, 0xe2, 0xa7, 0x5e, 0x1e, 0x33, 0xac, 0xe9, 0x2b, 0x72, 0x5e, 0x35, 0xca, 0x88, + 0x3a, 0x39, 0x4d, 0xbd, 0x3c, 0x60, 0x0e, 0x65, 0x96, 0x04, 0x4c, 0x98, 0xad, 0xea, 0x8c, 0x38, + 0xfa, 0xbf, 0x5e, 0x93, 0xd0, 0x08, 0x63, 0xa4, 0xea, 0xd6, 0x2b, 0xfc, 0xbb, 0x19, 0x1b, 0x09, + 0x9a, 0x91, 0x85, 0x03, 0x5f, 0xd4, 0x37, 0xd1, 0x25, 0xb3, 0xd4, 0xcb, 0x43, 0x76, 0xc0, 0x65, + 0x3f, 0x3c, 0x12, 0xad, 0xaf, 0x57, 0xeb, 0x1b, 0xad, 0x6e, 0x65, 0x23, 0xe8, 0x7f, 0xc4, 0x97, + 0x35, 0xf6, 0x8d, 0x99, 0x2f, 0x6b, 0xe8, 0xda, 0xf1, 0x56, 0x60, 0xd7, 0x90, 0x61, 0x0d, 0xdc, + 0x56, 0x69, 0x8b, 0x0d, 0x63, 0x86, 0x35, 0x28, 0xe1, 0xbd, 0x55, 0xc6, 0x72, 0x6d, 0xb1, 0x51, + 0xcc, 0x46, 0x82, 0xa6, 0x24, 0x02, 0x00, 0x9a, 0x45, 0x65, 0x93, 0x33, 0x3c, 0x9f, 0x52, 0xd9, + 0x6f, 0x8f, 0x10, 0xd0, 0xb1, 0xd2, 0xf2, 0x41, 0x68, 0x08, 0xa9, 0x94, 0x1d, 0xd7, 0x8f, 0x28, + 0x25, 0x64, 0x0e, 0x01, 0x7f, 0xcb, 0x5b, 0xd9, 0x3c, 0x3a, 0x41, 0x0e, 0xd1, 0x97, 0xe4, 0xac, + 0xe1, 0xa5, 0x68, 0x50, 0x53, 0xc8, 0x06, 0x40, 0x13, 0x32, 0x7f, 0x10, 0x1a, 0xcc, 0x3a, 0xef, + 0x3b, 0x08, 0x16, 0xb4, 0x6a, 0x04, 0x2a, 0x09, 0x19, 0xd6, 0x38, 0x98, 0xde, 0x58, 0xd5, 0x26, + 0xe7, 0x6e, 0x30, 0x88, 0xf6, 0x11, 0xcc, 0xc7, 0x08, 0xb2, 0x3f, 0xfe, 0x10, 0xdb, 0x55, 0x5f, + 0x5e, 0x29, 0x63, 0xa1, 0xbf, 0xc5, 0x8c, 0x07, 0xb9, 0x03, 0xa0, 0x6f, 0xc9, 0x7c, 0x3b, 0xe4, + 0x8a, 0x72, 0xa3, 0xe5, 0xa2, 0x98, 0x64, 0xcd, 0x76, 0x87, 0xf4, 0x0d, 0x99, 0xd7, 0xe8, 0xdb, + 0x24, 0xa7, 0xe9, 0x69, 0x1e, 0x2d, 0xa3, 0x62, 0xcc, 0x82, 0xed, 0xce, 0xe8, 0x05, 0x09, 0x8c, + 0x6a, 0xd4, 0x67, 0x55, 0x0b, 0xf4, 0x13, 0xb0, 0x3d, 0x06, 0xf1, 0xd2, 0xdc, 0x5c, 0xad, 0x96, + 0x68, 0x29, 0x60, 0x0e, 0xc1, 0x5c, 0xa4, 0x61, 0xaa, 0x54, 0x56, 0x56, 0xce, 0xd7, 0x48, 0xc0, + 0x86, 0xf0, 0x3b, 0xd1, 0xd9, 0xaf, 0x2e, 0xa5, 0xc1, 0xe2, 0x01, 0x07, 0x1b, 0xa1, 0x4c, 0x12, + 0xe0, 0x89, 0xaf, 0x0c, 0xc4, 0xc1, 0x75, 0x75, 0x9f, 0x84, 0x43, 0x1c, 0x50, 0xc3, 0x7c, 0xa5, + 0xf9, 0xa4, 0x15, 0xaf, 0x2b, 0x6e, 0x6c, 0x42, 0xb0, 0xcf, 0x94, 0xca, 0x7e, 0x7a, 0x64, 0xc1, + 0xc4, 0x9d, 0x34, 0x56, 0xe8, 0x75, 0x77, 0xab, 0xfe, 0x91, 0xd8, 0xc1, 0x42, 0xfb, 0x4f, 0x17, + 0x3a, 0x27, 0xff, 0xef, 0xc1, 0x4d, 0x5f, 0x36, 0xb2, 0x72, 0xf3, 0x7e, 0x4a, 0x3f, 0x33, 0x36, + 0x7b, 0x6e, 0x2c, 0xfb, 0x48, 0xa2, 0x8d, 0x6a, 0xd4, 0xa6, 0x6f, 0x5b, 0x58, 0x2d, 0x48, 0xaa, + 0xe5, 0x77, 0xc2, 0x5c, 0xf7, 0x2d, 0x8a, 0x9a, 0xb1, 0x91, 0xd8, 0xbf, 0x48, 0x7f, 0x7c, 0x91, + 0xcb, 0x5f, 0x3e, 0x99, 0xbb, 0x25, 0xa0, 0xef, 0x47, 0x7b, 0xb8, 0x10, 0xc3, 0xa4, 0xdd, 0x7a, + 0x5c, 0xc4, 0xc5, 0xd4, 0x7b, 0x76, 0x42, 0xdf, 0x0d, 0xdb, 0xbe, 0x11, 0x1a, 0xb6, 0x3d, 0x2c, + 0x76, 0x2f, 0xff, 0x22, 0x28, 0xdc, 0xf7, 0x24, 0x3b, 0xc9, 0xbd, 0x4b, 0x0f, 0x2e, 0xc2, 0x1c, + 0x8f, 0xb9, 0x18, 0x6c, 0x76, 0xcb, 0x30, 0xb9, 0xb6, 0x28, 0x26, 0x1e, 0xe1, 0x2a, 0xbd, 0x24, + 0x2f, 0xf6, 0x63, 0x39, 0x4e, 0xc3, 0xf4, 0x17, 0x47, 0x89, 0x29, 0xcf, 0xf1, 0xeb, 0xf8, 0xe1, + 0x6f, 0x00, 0x00, 0x00, 0xff, 0xff, 0x0f, 0xfb, 0x67, 0x14, 0x2e, 0x05, 0x00, 0x00, +} + +// Reference imports to suppress errors if they are not otherwise used. +var _ context.Context +var _ grpc.ClientConn + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +const _ = grpc.SupportPackageIsVersion4 + +// INDIHubClient is the client API for INDIHub service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream. +type INDIHubClient interface { + RegisterHost(ctx context.Context, in *INDIHubHost, opts ...grpc.CallOption) (*RegisterInfo, error) + INDIServer(ctx context.Context, opts ...grpc.CallOption) (INDIHub_INDIServerClient, error) + PHD2Server(ctx context.Context, opts ...grpc.CallOption) (INDIHub_PHD2ServerClient, error) + SoloMode(ctx context.Context, opts ...grpc.CallOption) (INDIHub_SoloModeClient, error) + BroadcastINDIServer(ctx context.Context, opts ...grpc.CallOption) (INDIHub_BroadcastINDIServerClient, error) + BroadcastPHD2Server(ctx context.Context, opts ...grpc.CallOption) (INDIHub_BroadcastPHD2ServerClient, error) +} + +type iNDIHubClient struct { + cc *grpc.ClientConn +} + +func NewINDIHubClient(cc *grpc.ClientConn) INDIHubClient { + return &iNDIHubClient{cc} +} + +func (c *iNDIHubClient) RegisterHost(ctx context.Context, in *INDIHubHost, opts ...grpc.CallOption) (*RegisterInfo, error) { + out := new(RegisterInfo) + err := c.cc.Invoke(ctx, "/INDIHub/RegisterHost", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *iNDIHubClient) INDIServer(ctx context.Context, opts ...grpc.CallOption) (INDIHub_INDIServerClient, error) { + stream, err := c.cc.NewStream(ctx, &_INDIHub_serviceDesc.Streams[0], "/INDIHub/INDIServer", opts...) + if err != nil { + return nil, err + } + x := &iNDIHubINDIServerClient{stream} + return x, nil +} + +type INDIHub_INDIServerClient interface { + Send(*Response) error + Recv() (*Request, error) + grpc.ClientStream +} + +type iNDIHubINDIServerClient struct { + grpc.ClientStream +} + +func (x *iNDIHubINDIServerClient) Send(m *Response) error { + return x.ClientStream.SendMsg(m) +} + +func (x *iNDIHubINDIServerClient) Recv() (*Request, error) { + m := new(Request) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +func (c *iNDIHubClient) PHD2Server(ctx context.Context, opts ...grpc.CallOption) (INDIHub_PHD2ServerClient, error) { + stream, err := c.cc.NewStream(ctx, &_INDIHub_serviceDesc.Streams[1], "/INDIHub/PHD2Server", opts...) + if err != nil { + return nil, err + } + x := &iNDIHubPHD2ServerClient{stream} + return x, nil +} + +type INDIHub_PHD2ServerClient interface { + Send(*Response) error + Recv() (*Request, error) + grpc.ClientStream +} + +type iNDIHubPHD2ServerClient struct { + grpc.ClientStream +} + +func (x *iNDIHubPHD2ServerClient) Send(m *Response) error { + return x.ClientStream.SendMsg(m) +} + +func (x *iNDIHubPHD2ServerClient) Recv() (*Request, error) { + m := new(Request) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +func (c *iNDIHubClient) SoloMode(ctx context.Context, opts ...grpc.CallOption) (INDIHub_SoloModeClient, error) { + stream, err := c.cc.NewStream(ctx, &_INDIHub_serviceDesc.Streams[2], "/INDIHub/SoloMode", opts...) + if err != nil { + return nil, err + } + x := &iNDIHubSoloModeClient{stream} + return x, nil +} + +type INDIHub_SoloModeClient interface { + Send(*Response) error + CloseAndRecv() (*SoloSummary, error) + grpc.ClientStream +} + +type iNDIHubSoloModeClient struct { + grpc.ClientStream +} + +func (x *iNDIHubSoloModeClient) Send(m *Response) error { + return x.ClientStream.SendMsg(m) +} + +func (x *iNDIHubSoloModeClient) CloseAndRecv() (*SoloSummary, error) { + if err := x.ClientStream.CloseSend(); err != nil { + return nil, err + } + m := new(SoloSummary) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +func (c *iNDIHubClient) BroadcastINDIServer(ctx context.Context, opts ...grpc.CallOption) (INDIHub_BroadcastINDIServerClient, error) { + stream, err := c.cc.NewStream(ctx, &_INDIHub_serviceDesc.Streams[3], "/INDIHub/BroadcastINDIServer", opts...) + if err != nil { + return nil, err + } + x := &iNDIHubBroadcastINDIServerClient{stream} + return x, nil +} + +type INDIHub_BroadcastINDIServerClient interface { + Send(*Response) error + Recv() (*Request, error) + grpc.ClientStream +} + +type iNDIHubBroadcastINDIServerClient struct { + grpc.ClientStream +} + +func (x *iNDIHubBroadcastINDIServerClient) Send(m *Response) error { + return x.ClientStream.SendMsg(m) +} + +func (x *iNDIHubBroadcastINDIServerClient) Recv() (*Request, error) { + m := new(Request) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +func (c *iNDIHubClient) BroadcastPHD2Server(ctx context.Context, opts ...grpc.CallOption) (INDIHub_BroadcastPHD2ServerClient, error) { + stream, err := c.cc.NewStream(ctx, &_INDIHub_serviceDesc.Streams[4], "/INDIHub/BroadcastPHD2Server", opts...) + if err != nil { + return nil, err + } + x := &iNDIHubBroadcastPHD2ServerClient{stream} + return x, nil +} + +type INDIHub_BroadcastPHD2ServerClient interface { + Send(*Response) error + Recv() (*Request, error) + grpc.ClientStream +} + +type iNDIHubBroadcastPHD2ServerClient struct { + grpc.ClientStream +} + +func (x *iNDIHubBroadcastPHD2ServerClient) Send(m *Response) error { + return x.ClientStream.SendMsg(m) +} + +func (x *iNDIHubBroadcastPHD2ServerClient) Recv() (*Request, error) { + m := new(Request) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +// INDIHubServer is the server API for INDIHub service. +type INDIHubServer interface { + RegisterHost(context.Context, *INDIHubHost) (*RegisterInfo, error) + INDIServer(INDIHub_INDIServerServer) error + PHD2Server(INDIHub_PHD2ServerServer) error + SoloMode(INDIHub_SoloModeServer) error + BroadcastINDIServer(INDIHub_BroadcastINDIServerServer) error + BroadcastPHD2Server(INDIHub_BroadcastPHD2ServerServer) error +} + +// UnimplementedINDIHubServer can be embedded to have forward compatible implementations. +type UnimplementedINDIHubServer struct { +} + +func (*UnimplementedINDIHubServer) RegisterHost(ctx context.Context, req *INDIHubHost) (*RegisterInfo, error) { + return nil, status.Errorf(codes.Unimplemented, "method RegisterHost not implemented") +} +func (*UnimplementedINDIHubServer) INDIServer(srv INDIHub_INDIServerServer) error { + return status.Errorf(codes.Unimplemented, "method INDIServer not implemented") +} +func (*UnimplementedINDIHubServer) PHD2Server(srv INDIHub_PHD2ServerServer) error { + return status.Errorf(codes.Unimplemented, "method PHD2Server not implemented") +} +func (*UnimplementedINDIHubServer) SoloMode(srv INDIHub_SoloModeServer) error { + return status.Errorf(codes.Unimplemented, "method SoloMode not implemented") +} +func (*UnimplementedINDIHubServer) BroadcastINDIServer(srv INDIHub_BroadcastINDIServerServer) error { + return status.Errorf(codes.Unimplemented, "method BroadcastINDIServer not implemented") +} +func (*UnimplementedINDIHubServer) BroadcastPHD2Server(srv INDIHub_BroadcastPHD2ServerServer) error { + return status.Errorf(codes.Unimplemented, "method BroadcastPHD2Server not implemented") +} + +func RegisterINDIHubServer(s *grpc.Server, srv INDIHubServer) { + s.RegisterService(&_INDIHub_serviceDesc, srv) +} + +func _INDIHub_RegisterHost_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(INDIHubHost) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(INDIHubServer).RegisterHost(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/INDIHub/RegisterHost", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(INDIHubServer).RegisterHost(ctx, req.(*INDIHubHost)) + } + return interceptor(ctx, in, info, handler) +} + +func _INDIHub_INDIServer_Handler(srv interface{}, stream grpc.ServerStream) error { + return srv.(INDIHubServer).INDIServer(&iNDIHubINDIServerServer{stream}) +} + +type INDIHub_INDIServerServer interface { + Send(*Request) error + Recv() (*Response, error) + grpc.ServerStream +} + +type iNDIHubINDIServerServer struct { + grpc.ServerStream +} + +func (x *iNDIHubINDIServerServer) Send(m *Request) error { + return x.ServerStream.SendMsg(m) +} + +func (x *iNDIHubINDIServerServer) Recv() (*Response, error) { + m := new(Response) + if err := x.ServerStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +func _INDIHub_PHD2Server_Handler(srv interface{}, stream grpc.ServerStream) error { + return srv.(INDIHubServer).PHD2Server(&iNDIHubPHD2ServerServer{stream}) +} + +type INDIHub_PHD2ServerServer interface { + Send(*Request) error + Recv() (*Response, error) + grpc.ServerStream +} + +type iNDIHubPHD2ServerServer struct { + grpc.ServerStream +} + +func (x *iNDIHubPHD2ServerServer) Send(m *Request) error { + return x.ServerStream.SendMsg(m) +} + +func (x *iNDIHubPHD2ServerServer) Recv() (*Response, error) { + m := new(Response) + if err := x.ServerStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +func _INDIHub_SoloMode_Handler(srv interface{}, stream grpc.ServerStream) error { + return srv.(INDIHubServer).SoloMode(&iNDIHubSoloModeServer{stream}) +} + +type INDIHub_SoloModeServer interface { + SendAndClose(*SoloSummary) error + Recv() (*Response, error) + grpc.ServerStream +} + +type iNDIHubSoloModeServer struct { + grpc.ServerStream +} + +func (x *iNDIHubSoloModeServer) SendAndClose(m *SoloSummary) error { + return x.ServerStream.SendMsg(m) +} + +func (x *iNDIHubSoloModeServer) Recv() (*Response, error) { + m := new(Response) + if err := x.ServerStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +func _INDIHub_BroadcastINDIServer_Handler(srv interface{}, stream grpc.ServerStream) error { + return srv.(INDIHubServer).BroadcastINDIServer(&iNDIHubBroadcastINDIServerServer{stream}) +} + +type INDIHub_BroadcastINDIServerServer interface { + Send(*Request) error + Recv() (*Response, error) + grpc.ServerStream +} + +type iNDIHubBroadcastINDIServerServer struct { + grpc.ServerStream +} + +func (x *iNDIHubBroadcastINDIServerServer) Send(m *Request) error { + return x.ServerStream.SendMsg(m) +} + +func (x *iNDIHubBroadcastINDIServerServer) Recv() (*Response, error) { + m := new(Response) + if err := x.ServerStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +func _INDIHub_BroadcastPHD2Server_Handler(srv interface{}, stream grpc.ServerStream) error { + return srv.(INDIHubServer).BroadcastPHD2Server(&iNDIHubBroadcastPHD2ServerServer{stream}) +} + +type INDIHub_BroadcastPHD2ServerServer interface { + Send(*Request) error + Recv() (*Response, error) + grpc.ServerStream +} + +type iNDIHubBroadcastPHD2ServerServer struct { + grpc.ServerStream +} + +func (x *iNDIHubBroadcastPHD2ServerServer) Send(m *Request) error { + return x.ServerStream.SendMsg(m) +} + +func (x *iNDIHubBroadcastPHD2ServerServer) Recv() (*Response, error) { + m := new(Response) + if err := x.ServerStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +var _INDIHub_serviceDesc = grpc.ServiceDesc{ + ServiceName: "INDIHub", + HandlerType: (*INDIHubServer)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: "RegisterHost", + Handler: _INDIHub_RegisterHost_Handler, + }, + }, + Streams: []grpc.StreamDesc{ + { + StreamName: "INDIServer", + Handler: _INDIHub_INDIServer_Handler, + ServerStreams: true, + ClientStreams: true, + }, + { + StreamName: "PHD2Server", + Handler: _INDIHub_PHD2Server_Handler, + ServerStreams: true, + ClientStreams: true, + }, + { + StreamName: "SoloMode", + Handler: _INDIHub_SoloMode_Handler, + ClientStreams: true, + }, + { + StreamName: "BroadcastINDIServer", + Handler: _INDIHub_BroadcastINDIServer_Handler, + ServerStreams: true, + ClientStreams: true, + }, + { + StreamName: "BroadcastPHD2Server", + Handler: _INDIHub_BroadcastPHD2Server_Handler, + ServerStreams: true, + ClientStreams: true, + }, + }, + Metadata: "indihub.proto", +} diff --git a/proxy/proxy.go b/proxy/proxy.go new file mode 100644 index 0000000..c1e803a --- /dev/null +++ b/proxy/proxy.go @@ -0,0 +1,218 @@ +package proxy + +import ( + "io" + "log" + "net" + "sync" + "time" + + "github.com/indihub-space/agent/hostutils" + "github.com/indihub-space/agent/lib" + "github.com/indihub-space/agent/proto/indihub" +) + +type INDIHubTunnel interface { + Send(*indihub.Response) error + Recv() (*indihub.Request, error) + CloseSend() error +} + +type TcpProxy struct { + Name string + Addr string + Tunnel INDIHubTunnel + + connMu sync.Mutex + connMap map[uint32]net.Conn + + filter *hostutils.INDIFilter +} + +type PublicServerAddr struct { + Name string + Addr string +} + +func New(name string, addr string, tunnel INDIHubTunnel, filter *hostutils.INDIFilter) *TcpProxy { + return &TcpProxy{ + Name: name, + Addr: addr, + Tunnel: tunnel, + connMap: map[uint32]net.Conn{}, + filter: filter, + } +} + +func (p *TcpProxy) Close() { + p.connMu.Lock() + defer p.connMu.Unlock() + + for num, c := range p.connMap { + c.Close() + delete(p.connMap, num) + } +} + +func (p *TcpProxy) connect(cNum uint32) (net.Conn, bool, error) { + p.connMu.Lock() + defer p.connMu.Unlock() + + if c, ok := p.connMap[cNum]; ok { + return c, false, nil + } + + log.Printf("Connecting to local %s... on %s\n", p.Name, p.Addr) + c, err := net.Dial("tcp", p.Addr) + if err != nil { + log.Printf("Could not connect to %s: %s\n", p.Name, err) + return nil, false, err + } + log.Println("...OK") + p.connMap[cNum] = c + return c, true, err +} + +func (p *TcpProxy) reConnect(cNum uint32) (net.Conn, error) { + p.connMu.Lock() + defer p.connMu.Unlock() + + log.Println("Re-connecting to local %s... on %s\n", p.Name, p.Addr) + c, err := net.Dial("tcp", p.Addr) + if err != nil { + log.Printf("Could not connect to %s: %s\n", p.Name, err) + return nil, err + } + log.Println("...OK") + p.connMap[cNum] = c + return c, err +} + +func (p *TcpProxy) close(cNum uint32) { + p.connMu.Lock() + defer p.connMu.Unlock() + + if c, ok := p.connMap[cNum]; ok { + c.Close() + delete(p.connMap, cNum) + } +} + +func (p *TcpProxy) Start(pubAddrChan chan PublicServerAddr, sessionID uint64, sessionToken string) { + wg := sync.WaitGroup{} + addrReceived := false + xmlFlattener := map[uint32]*lib.XmlFlattener{} + for { + // receive request from tunnel + in, err := p.Tunnel.Recv() + if err == io.EOF { + // read done, server closed connection + log.Printf("Exiting. Got EOF from %s tunnel.\n", p.Name) + break + } + if err != nil { + log.Printf("Exiting. Failed to receive a request from %s tunnel: %v\n", p.Name, err) + break + } + + // 1st message always with server address + if !addrReceived && in.Conn == 0 { + pubAddrChan <- PublicServerAddr{ + Name: p.Name, + Addr: string(in.Data), + } + addrReceived = true + continue + } + + // Flatten XML data stream into elements + if xmlFlattener[in.Conn] == nil { + xmlFlattener[in.Conn] = lib.NewXmlFlattener() + } + + xmlCommands := xmlFlattener[in.Conn].FeedChunk(in.Data) + + // check if we need to filter traffic + if p.filter != nil { + xmlCommands = p.filter.FilterIncoming(xmlCommands) + } + + c, isNewConn, err := p.connect(in.Conn) + if err != nil { + if c, err = p.reConnect(in.Conn); err != nil { + log.Printf("Failed to send a request to %s: %v\n", p.Name, err) + time.Sleep(1 * time.Second) + continue + } + } + + if in.Closed { + log.Printf("Client closed connection %d to the cloud, so closing it to local %s too\n", + in.Conn, p.Name) + p.close(in.Conn) + continue + } + + if isNewConn { + // INDI Server responses + wg.Add(1) + go func(conn net.Conn, cNum uint32, sessID uint64) { + defer wg.Done() + readBuf := make([]byte, lib.INDIServerMaxRecvMsgSize) + for { + // receive response from server + n, err := conn.Read(readBuf) + if err == io.EOF { + if conn, err = p.reConnect(cNum); err != nil { + log.Printf("Failed to send a request to %s: %v", p.Name, err) + time.Sleep(1 * time.Second) + continue + } else { + n, err = conn.Read(readBuf) + } + } + if err != nil { + log.Printf("Failed to receive a response from %s: %v", p.Name, err) + return + } + + // send request to tunnel + resp := &indihub.Response{ + Data: readBuf[:n], + Conn: cNum, + SessionID: sessID, + } + if err := p.Tunnel.Send(resp); err != nil { + log.Printf("Failed to send a response to %s tunnel: %v", p.Name, err) + return + } + } + }(c, in.Conn, sessionID) + } + + if len(xmlCommands) == 0 { + continue + } + + for _, command := range xmlCommands { + if _, err = c.Write(command); err == io.EOF { + if c, err = p.reConnect(in.Conn); err != nil { + log.Printf("Failed to send a request to %s: %v", p.Name, err) + time.Sleep(1 * time.Second) + continue + } else { + _, err = c.Write(command) + } + } + if err != nil { + break + } + } + if err != nil { + log.Printf("Failed to send a request to %s: %v", p.Name, err) + time.Sleep(1 * time.Second) + continue + } + } + wg.Wait() +} diff --git a/solo/solo.go b/solo/solo.go new file mode 100644 index 0000000..96ae1fe --- /dev/null +++ b/solo/solo.go @@ -0,0 +1,166 @@ +package solo + +import ( + "bytes" + "io" + "log" + "net" + "sync/atomic" + + "github.com/fatih/color" + + "github.com/indihub-space/agent/lib" + "github.com/indihub-space/agent/proto/indihub" +) + +var ( + setBLOBVector = []byte("