diff --git a/.gitignore b/.gitignore index 3f09bc2e..18da4b52 100644 --- a/.gitignore +++ b/.gitignore @@ -17,3 +17,4 @@ config/dnsconfig/ data/ .vscode/ .idea/ +.vscode/ diff --git a/config/config.go b/config/config.go index fc1bcbf2..649ef1b9 100644 --- a/config/config.go +++ b/config/config.go @@ -43,12 +43,14 @@ type ServerConfig struct { GRPCHost string `yaml:"grpchost"` GRPCPort string `yaml:"grpcport"` GRPCSecure string `yaml:"grpcsecure"` + MQHOST string `yaml:"mqhost"` MasterKey string `yaml:"masterkey"` DNSKey string `yaml:"dnskey"` AllowedOrigin string `yaml:"allowedorigin"` NodeID string `yaml:"nodeid"` RestBackend string `yaml:"restbackend"` AgentBackend string `yaml:"agentbackend"` + MessageQueueBackend string `yaml:"messagequeuebackend"` ClientMode string `yaml:"clientmode"` DNSMode string `yaml:"dnsmode"` SplitDNS string `yaml:"splitdns"` diff --git a/go.mod b/go.mod index b8b169d9..b4df5780 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,7 @@ module github.com/gravitl/netmaker go 1.17 require ( + github.com/eclipse/paho.mqtt.golang v1.3.5 github.com/go-playground/validator/v10 v10.10.0 github.com/golang-jwt/jwt/v4 v4.2.0 github.com/golang/protobuf v1.5.2 // indirect @@ -25,6 +26,7 @@ require ( google.golang.org/genproto v0.0.0-20210201151548-94839c025ad4 // indirect google.golang.org/grpc v1.43.0 google.golang.org/protobuf v1.27.1 + gopkg.in/ini.v1 v1.66.2 gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b ) @@ -32,9 +34,7 @@ require ( cloud.google.com/go v0.34.0 // indirect github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d // indirect github.com/davecgh/go-spew v1.1.1 // indirect - github.com/eclipse/paho.mqtt.golang v1.3.5 // indirect github.com/felixge/httpsnoop v1.0.1 // indirect - github.com/go-ping/ping v0.0.0-20211130115550-779d1e919534 // indirect github.com/go-playground/locales v0.14.0 // indirect github.com/go-playground/universal-translator v0.18.0 // indirect github.com/google/go-cmp v0.5.5 // indirect @@ -46,7 +46,5 @@ require ( github.com/pmezard/go-difflib v1.0.0 // indirect github.com/russross/blackfriday/v2 v2.0.1 // indirect github.com/shurcooL/sanitized_anchor_name v1.0.0 // indirect - golang.org/x/sync v0.0.0-20210220032951-036812b2e83c // indirect google.golang.org/appengine v1.4.0 // indirect - gopkg.in/ini.v1 v1.66.2 // indirect ) diff --git a/go.sum b/go.sum index 84dbc91f..e36273a2 100644 --- a/go.sum +++ b/go.sum @@ -36,8 +36,6 @@ github.com/felixge/httpsnoop v1.0.1 h1:lvB5Jl89CsZtGIWuTcDM1E/vkVs49/Ml7JJe07l8S github.com/felixge/httpsnoop v1.0.1/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= -github.com/go-ping/ping v0.0.0-20211130115550-779d1e919534 h1:dhy9OQKGBh4zVXbjwbxxHjRxMJtLXj3zfgpBYQaR4Q4= -github.com/go-ping/ping v0.0.0-20211130115550-779d1e919534/go.mod h1:xIFjORFzTxqIV/tDVGO4eDy/bLuSyawEeojSm3GfRGk= github.com/go-playground/assert/v2 v2.0.1 h1:MsBgLAaY856+nPRTKrp3/OZK38U/wa0CcBYNjji3q3A= github.com/go-playground/assert/v2 v2.0.1/go.mod h1:VDjEfimB/XKnb+ZQfWdccd7VUvScMdVu0Titje2rxJ4= github.com/go-playground/locales v0.14.0 h1:u50s323jtVGugKlcYeyzC0etD1HifMjqmJqb8WugfUU= @@ -194,7 +192,6 @@ golang.org/x/net v0.0.0-20201216054612-986b41b23924/go.mod h1:m0MpNAwzfU5UDzcl9v golang.org/x/net v0.0.0-20201224014010-6772e930b67b/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/net v0.0.0-20210119194325-5f4716e94777/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= -golang.org/x/net v0.0.0-20210316092652-d523dce5a7f4/go.mod h1:RBQZq4jEuRlivfhVLdyRGr576XBO4/greRjx4P4O3yc= golang.org/x/net v0.0.0-20210504132125-bbd867fde50d/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20210726213435-c6fcb2dbf985 h1:4CSI6oo7cOjJKajidEljs9h+uP0rRZBPPPhcCbj5mw8= golang.org/x/net v0.0.0-20210726213435-c6fcb2dbf985/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= @@ -205,8 +202,6 @@ golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cOontH8FOep7tGV86Y7SQ= -golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20181205085412-a5c9d58dba9a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -228,7 +223,6 @@ golang.org/x/sys v0.0.0-20210123111255-9b0068b26619/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210216163648-f7da38b97c65/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210309040221-94ec62e08169/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20210315160823-c6e025ad8005/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210403161142-5e06dd20ab57/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210503173754-0981d6026fa6/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= diff --git a/main.go b/main.go index 4e3b71ca..eb17c38b 100644 --- a/main.go +++ b/main.go @@ -10,6 +10,7 @@ import ( "strconv" "sync" + mqtt "github.com/eclipse/paho.mqtt.golang" "github.com/gravitl/netmaker/auth" controller "github.com/gravitl/netmaker/controllers" "github.com/gravitl/netmaker/database" @@ -18,6 +19,7 @@ import ( "github.com/gravitl/netmaker/logger" "github.com/gravitl/netmaker/logic" "github.com/gravitl/netmaker/models" + "github.com/gravitl/netmaker/mq" "github.com/gravitl/netmaker/netclient/ncutils" "github.com/gravitl/netmaker/servercfg" "github.com/gravitl/netmaker/serverctl" @@ -105,8 +107,14 @@ func startControllers() { go controller.HandleRESTRequests(&waitnetwork) } - if !servercfg.IsAgentBackend() && !servercfg.IsRestBackend() { - logger.Log(0, "No Server Mode selected, so nothing is being served! Set either Agent mode (AGENT_BACKEND) or Rest mode (REST_BACKEND) to 'true'.") + //Run MessageQueue + if servercfg.IsMessageQueueBackend() { + waitnetwork.Add(1) + go runMessageQueue(&waitnetwork) + } + + if !servercfg.IsAgentBackend() && !servercfg.IsRestBackend() && !servercfg.IsMessageQueueBackend() { + logger.Log(0, "No Server Mode selected, so nothing is being served! Set Agent mode (AGENT_BACKEND) or Rest mode (REST_BACKEND) or MessageQueue (MESSAGEQUEUE_BACKEND) to 'true'.") } waitnetwork.Wait() @@ -158,6 +166,51 @@ func runGRPC(wg *sync.WaitGroup) { logger.Log(0, "Closed DB connection.") } +// Should we be using a context vice a waitgroup???????????? +func runMessageQueue(wg *sync.WaitGroup) { + defer wg.Done() + //refactor netclient.functions.SetupMQTT so can be called from here + //setupMQTT + opts := mqtt.NewClientOptions() + opts.AddBroker(servercfg.GetMessageQueueEndpoint()) + logger.Log(0, "setting broker "+servercfg.GetMessageQueueEndpoint()) + opts.SetDefaultPublishHandler(mq.DefaultHandler) + client := mqtt.NewClient(opts) + if token := client.Connect(); token.Wait() && token.Error() != nil { + logger.Log(0, "unable to connect to message queue broker, closing down") + return + } + //Set up Subscriptions + if token := client.Subscribe("#", 0, nil); token.Wait() && token.Error() != nil { + //should make constant for disconnect wait period + client.Disconnect(250) + logger.Log(0, "could not subscribe to message queue ...") + return + } + if token := client.Subscribe("ping/#", 0, mq.Ping); token.Wait() && token.Error() != nil { + client.Disconnect(240) + logger.Log(0, "ping sub failed") + } + if token := client.Subscribe("metrics/#", 0, mq.Metrics); token.Wait() && token.Error() != nil { + client.Disconnect(240) + logger.Log(0, "metrics sub failed") + } + if token := client.Subscribe("update/localaddress/#", 0, mq.LocalAddressUpdate); token.Wait() && token.Error() != nil { + client.Disconnect(240) + logger.Log(0, "metrics sub failed") + } + if token := client.Subscribe("update/ip/#", 0, mq.IPUpdate); token.Wait() && token.Error() != nil { + client.Disconnect(240) + logger.Log(0, "metrics sub failed") + } + if token := client.Subscribe("update/publickey/#", 0, mq.PublicKeyUpdate); token.Wait() && token.Error() != nil { + client.Disconnect(240) + logger.Log(0, "metrics sub failed") + } + for { + } +} + func authServerUnaryInterceptor() grpc.ServerOption { return grpc.UnaryInterceptor(controller.AuthServerUnaryInterceptor) } diff --git a/models/mqtt.go b/models/mqtt.go index a7e87e1f..da7265f3 100644 --- a/models/mqtt.go +++ b/models/mqtt.go @@ -3,9 +3,8 @@ package models import "golang.zx2c4.com/wireguard/wgctrl/wgtypes" type PeerUpdate struct { - Network string - Interface string - Peers []wgtypes.Peer + Network string + Peers []wgtypes.Peer } type KeyUpdate struct { diff --git a/mq/mq.go b/mq/mq.go new file mode 100644 index 00000000..3620b215 --- /dev/null +++ b/mq/mq.go @@ -0,0 +1,197 @@ +package mq + +import ( + "encoding/json" + "errors" + "log" + "net" + "strings" + "time" + + mqtt "github.com/eclipse/paho.mqtt.golang" + "github.com/gravitl/netmaker/database" + "github.com/gravitl/netmaker/logger" + "github.com/gravitl/netmaker/logic" + "github.com/gravitl/netmaker/models" + "golang.zx2c4.com/wireguard/wgctrl/wgtypes" +) + +var DefaultHandler mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) { + logger.Log(0, "MQTT Message: Topic: "+string(msg.Topic())+" Message: "+string(msg.Payload())) +} + +var Metrics mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) { + logger.Log(0, "Metrics Handler") +} + +var Ping mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) { + logger.Log(0, "Ping Handler") + //test code --- create a node if it doesn't exit for testing only + createnode := models.Node{PublicKey: "DM5qhLAE20PG9BbfBCger+Ac9D2NDOwCtY1rbYDLf34=", Name: "testnode", + Endpoint: "10.0.0.1", MacAddress: "01:02:03:04:05:06", Password: "password", Network: "skynet"} + if _, err := logic.GetNode("01:02:03:04:05:06", "skynet"); err != nil { + err := logic.CreateNode(&createnode) + if err != nil { + log.Println(err) + } + } + //end of test code + go func() { + mac, net, err := GetMacNetwork(msg.Topic()) + if err != nil { + logger.Log(0, "error getting node.ID sent on ping topic ") + return + } + logger.Log(0, "ping recieved from "+mac+" on net "+net) + node, err := logic.GetNodeByMacAddress(net, mac) + if err != nil { + logger.Log(0, "mq-ping error getting node: "+err.Error()) + record, err := database.FetchRecord(database.NODES_TABLE_NAME, mac+"###"+net) + if err != nil { + logger.Log(0, "error reading database ", err.Error()) + return + } + logger.Log(0, "record from database") + logger.Log(0, record) + return + } + node.SetLastCheckIn() + // --TODO --set client version once feature is implemented. + //node.SetClientVersion(msg.Payload()) + }() +} + +var PublicKeyUpdate mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) { + logger.Log(0, "PublicKey Handler") + go func() { + logger.Log(0, "public key update "+msg.Topic()) + key := string(msg.Payload()) + mac, network, err := GetMacNetwork(msg.Topic()) + if err != nil { + logger.Log(0, "error getting node.ID sent on "+msg.Topic()+" "+err.Error()) + } + node, err := logic.GetNode(mac, network) + if err != nil { + logger.Log(0, "error retrieving node "+msg.Topic()+" "+err.Error()) + } + node.PublicKey = key + node.SetLastCheckIn() + UpdatePeers(&node, client) + }() +} + +var IPUpdate mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) { + go func() { + ip := string(msg.Payload()) + logger.Log(0, "IPUpdate Handler") + mac, network, err := GetMacNetwork(msg.Topic()) + logger.Log(0, "ipUpdate recieved from "+mac+" on net "+network) + if err != nil { + logger.Log(0, "error getting node.ID sent on update/ip topic ") + return + } + node, err := logic.GetNode(mac, network) + if err != nil { + logger.Log(0, "invalid ID recieved on update/ip topic: "+err.Error()) + return + } + node.Endpoint = ip + node.SetLastCheckIn() + UpdatePeers(&node, client) + }() +} + +func UpdatePeers(node *models.Node, client mqtt.Client) { + peersToUpdate, err := logic.GetPeers(node) + if err != nil { + logger.Log(0, "error retrieving peers") + return + } + for _, peerToUpdate := range peersToUpdate { + var peerUpdate models.PeerUpdate + peerUpdate.Network = node.Network + + myPeers, err := logic.GetPeers(&peerToUpdate) + if err != nil { + logger.Log(0, "uable to get peers "+err.Error()) + continue + } + for i, myPeer := range myPeers { + var allowedIPs []net.IPNet + var allowedIP net.IPNet + endpoint, err := net.ResolveUDPAddr("udp", myPeer.Address+":"+string(myPeer.ListenPort)) + if err != nil { + logger.Log(0, "error setting endpoint for peer "+err.Error()) + } + for _, ipString := range myPeer.AllowedIPs { + _, ipNet, _ := net.ParseCIDR(ipString) + allowedIP = *ipNet + allowedIPs = append(allowedIPs, allowedIP) + } + key, err := wgtypes.ParseKey(myPeer.PublicKey) + if err != nil { + logger.Log(0, "err parsing publickey") + continue + } + peerUpdate.Peers[i].PublicKey = key + peerUpdate.Peers[i].Endpoint = endpoint + peerUpdate.Peers[i].PersistentKeepaliveInterval = time.Duration(myPeer.PersistentKeepalive) + peerUpdate.Peers[i].AllowedIPs = allowedIPs + peerUpdate.Peers[i].ProtocolVersion = 0 + } + //PublishPeerUpdate(my) + data, err := json.Marshal(peerUpdate) + if err != nil { + logger.Log(0, "err marshalling data for peer update "+err.Error()) + } + if token := client.Publish("update/peers/"+peerToUpdate.ID, 0, false, data); token.Wait() && token.Error() != nil { + logger.Log(0, "error publishing peer update "+token.Error().Error()) + } + client.Disconnect(250) + } +} + +var LocalAddressUpdate mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) { + logger.Log(0, "LocalAddressUpdate Handler") + go func() { + logger.Log(0, "LocalAddressUpdate handler") + mac, net, err := GetMacNetwork(msg.Topic()) + if err != nil { + logger.Log(0, "error getting node.ID "+msg.Topic()) + return + } + node, err := logic.GetNode(mac, net) + if err != nil { + logger.Log(0, "error get node "+msg.Topic()) + return + } + node.LocalAddress = string(msg.Payload()) + node.SetLastCheckIn() + }() +} + +func GetMacNetwork(topic string) (string, string, error) { + parts := strings.Split(topic, "/") + count := len(parts) + if count == 1 { + return "", "", errors.New("invalid topic") + } + macnet := strings.Split(parts[count-1], "---") + if len(macnet) != 2 { + return "", "", errors.New("topic id not in mac---network format") + } + return macnet[0], macnet[1], nil +} + +func GetID(topic string) (string, error) { + parts := strings.Split(topic, "/") + count := len(parts) + if count == 1 { + return "", errors.New("invalid topic") + } + macnet := strings.Split(parts[count-1], "---") + if len(macnet) != 2 { + return "", errors.New("topic id not in mac---network format") + } + return macnet[0] + "###" + macnet[1], nil +} diff --git a/netclient/functions/daemon.go b/netclient/functions/daemon.go index 639436ae..a5360319 100644 --- a/netclient/functions/daemon.go +++ b/netclient/functions/daemon.go @@ -100,7 +100,7 @@ var NodeUpdate mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) //check if interface name has changed if so delete. if cfg.Node.Interface != newNode.Interface { if err = wireguard.RemoveConf(cfg.Node.Interface, true); err != nil { - ncutils.PrintLog("could not delete old interface "+cfg.Node.Interface+": ", err.Error(), 1) + ncutils.PrintLog("could not delete old interface "+cfg.Node.Interface+": "+err.Error(), 1) } } newNode.PullChanges = "no" @@ -109,12 +109,12 @@ var NodeUpdate mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) cfg.Node = newNode switch newNode.Action { case models.NODE_DELETE: - if err := RemoveLocalInstance(cfg, cfg.Network); err != nil { - ncutils.Printlog("error deleting local instance: "+err.Error(), 1) + if err := RemoveLocalInstance(&cfg, cfg.Network); err != nil { + ncutils.PrintLog("error deleting local instance: "+err.Error(), 1) return } case models.NODE_UPDATE_KEY: - UpdateKeys(cfg) + UpdateKeys(&cfg, client) case models.NODE_NOOP: default: } @@ -123,12 +123,12 @@ var NodeUpdate mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) ncutils.PrintLog("error updating node configuration: "+err.Error(), 1) } nameserver := cfg.Server.CoreDNSAddr - privateKey, err := wireguard.RetrievePrivKey(data.Network) + privateKey, err := wireguard.RetrievePrivKey(newNode.Network) if err != nil { ncutils.Log("error reading PrivateKey " + err.Error()) return } - if err := wireguard.UpdateWgInterface(cfg.Node.Interface, privateKey, nameserver, data); err != nil { + if err := wireguard.UpdateWgInterface(cfg.Node.Interface, privateKey, nameserver, newNode); err != nil { ncutils.Log("error updating wireguard config " + err.Error()) return } @@ -177,7 +177,7 @@ func UpdateKeys(cfg *config.ClientConfig, client mqtt.Client) (*config.ClientCon ncutils.Log("error generating privatekey " + err.Error()) return cfg, err } - if err := wireguard.UpdatePrivateKey(data.Interface, key.String()); err != nil { + if err := wireguard.UpdatePrivateKey(cfg.Node.Interface, key.String()); err != nil { ncutils.Log("error updating wireguard key " + err.Error()) return cfg, err } diff --git a/servercfg/serverconf.go b/servercfg/serverconf.go index 224d1862..52fc6503 100644 --- a/servercfg/serverconf.go +++ b/servercfg/serverconf.go @@ -244,6 +244,18 @@ func GetGRPCPort() string { return grpcport } +// GetMessageQueueEndpoint - gets the message queue endpoint +func GetMessageQueueEndpoint() string { + host, _ := GetPublicIP() + if os.Getenv("MQ_HOST") != "" { + host = os.Getenv("MQ_HOST") + } else if config.Config.Server.MQHOST != "" { + host = config.Config.Server.MQHOST + } + //Do we want MQ port configurable??? + return host + ":1883" +} + // GetMasterKey - gets the configured master key of server func GetMasterKey() string { key := "secretkey" @@ -307,6 +319,21 @@ func IsAgentBackend() bool { return isagent } +// IsMessageQueueBackend - checks if message queue is on or off +func IsMessageQueueBackend() bool { + ismessagequeue := true + if os.Getenv("MESSAGEQUEUE_BACKEND") != "" { + if os.Getenv("MESSAGEQUEUE_BACKEND") == "off" { + ismessagequeue = false + } + } else if config.Config.Server.MessageQueueBackend != "" { + if config.Config.Server.MessageQueueBackend == "off" { + ismessagequeue = false + } + } + return ismessagequeue +} + // IsClientMode - checks if it should run in client mode func IsClientMode() string { isclient := "on"