WIP commit

This commit is contained in:
Matthew R Kasun 2022-01-12 16:23:34 -05:00
parent 2f47c351d8
commit 9be97b3bc8
9 changed files with 293 additions and 22 deletions

1
.gitignore vendored
View file

@ -17,3 +17,4 @@ config/dnsconfig/
data/
.vscode/
.idea/
.vscode/

View file

@ -43,12 +43,14 @@ type ServerConfig struct {
GRPCHost string `yaml:"grpchost"`
GRPCPort string `yaml:"grpcport"`
GRPCSecure string `yaml:"grpcsecure"`
MQHOST string `yaml:"mqhost"`
MasterKey string `yaml:"masterkey"`
DNSKey string `yaml:"dnskey"`
AllowedOrigin string `yaml:"allowedorigin"`
NodeID string `yaml:"nodeid"`
RestBackend string `yaml:"restbackend"`
AgentBackend string `yaml:"agentbackend"`
MessageQueueBackend string `yaml:"messagequeuebackend"`
ClientMode string `yaml:"clientmode"`
DNSMode string `yaml:"dnsmode"`
SplitDNS string `yaml:"splitdns"`

6
go.mod
View file

@ -3,6 +3,7 @@ module github.com/gravitl/netmaker
go 1.17
require (
github.com/eclipse/paho.mqtt.golang v1.3.5
github.com/go-playground/validator/v10 v10.10.0
github.com/golang-jwt/jwt/v4 v4.2.0
github.com/golang/protobuf v1.5.2 // indirect
@ -25,6 +26,7 @@ require (
google.golang.org/genproto v0.0.0-20210201151548-94839c025ad4 // indirect
google.golang.org/grpc v1.43.0
google.golang.org/protobuf v1.27.1
gopkg.in/ini.v1 v1.66.2
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b
)
@ -32,9 +34,7 @@ require (
cloud.google.com/go v0.34.0 // indirect
github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/eclipse/paho.mqtt.golang v1.3.5 // indirect
github.com/felixge/httpsnoop v1.0.1 // indirect
github.com/go-ping/ping v0.0.0-20211130115550-779d1e919534 // indirect
github.com/go-playground/locales v0.14.0 // indirect
github.com/go-playground/universal-translator v0.18.0 // indirect
github.com/google/go-cmp v0.5.5 // indirect
@ -46,7 +46,5 @@ require (
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/russross/blackfriday/v2 v2.0.1 // indirect
github.com/shurcooL/sanitized_anchor_name v1.0.0 // indirect
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c // indirect
google.golang.org/appengine v1.4.0 // indirect
gopkg.in/ini.v1 v1.66.2 // indirect
)

6
go.sum
View file

@ -36,8 +36,6 @@ github.com/felixge/httpsnoop v1.0.1 h1:lvB5Jl89CsZtGIWuTcDM1E/vkVs49/Ml7JJe07l8S
github.com/felixge/httpsnoop v1.0.1/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
github.com/go-ping/ping v0.0.0-20211130115550-779d1e919534 h1:dhy9OQKGBh4zVXbjwbxxHjRxMJtLXj3zfgpBYQaR4Q4=
github.com/go-ping/ping v0.0.0-20211130115550-779d1e919534/go.mod h1:xIFjORFzTxqIV/tDVGO4eDy/bLuSyawEeojSm3GfRGk=
github.com/go-playground/assert/v2 v2.0.1 h1:MsBgLAaY856+nPRTKrp3/OZK38U/wa0CcBYNjji3q3A=
github.com/go-playground/assert/v2 v2.0.1/go.mod h1:VDjEfimB/XKnb+ZQfWdccd7VUvScMdVu0Titje2rxJ4=
github.com/go-playground/locales v0.14.0 h1:u50s323jtVGugKlcYeyzC0etD1HifMjqmJqb8WugfUU=
@ -194,7 +192,6 @@ golang.org/x/net v0.0.0-20201216054612-986b41b23924/go.mod h1:m0MpNAwzfU5UDzcl9v
golang.org/x/net v0.0.0-20201224014010-6772e930b67b/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/net v0.0.0-20210119194325-5f4716e94777/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/net v0.0.0-20210316092652-d523dce5a7f4/go.mod h1:RBQZq4jEuRlivfhVLdyRGr576XBO4/greRjx4P4O3yc=
golang.org/x/net v0.0.0-20210504132125-bbd867fde50d/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20210726213435-c6fcb2dbf985 h1:4CSI6oo7cOjJKajidEljs9h+uP0rRZBPPPhcCbj5mw8=
golang.org/x/net v0.0.0-20210726213435-c6fcb2dbf985/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
@ -205,8 +202,6 @@ golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cOontH8FOep7tGV86Y7SQ=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20181205085412-a5c9d58dba9a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
@ -228,7 +223,6 @@ golang.org/x/sys v0.0.0-20210123111255-9b0068b26619/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210216163648-f7da38b97c65/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210309040221-94ec62e08169/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210315160823-c6e025ad8005/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210403161142-5e06dd20ab57/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210503173754-0981d6026fa6/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=

57
main.go
View file

@ -11,6 +11,7 @@ import (
"sync"
"time"
mqtt "github.com/eclipse/paho.mqtt.golang"
"github.com/gravitl/netmaker/auth"
controller "github.com/gravitl/netmaker/controllers"
"github.com/gravitl/netmaker/database"
@ -19,6 +20,7 @@ import (
"github.com/gravitl/netmaker/logger"
"github.com/gravitl/netmaker/logic"
"github.com/gravitl/netmaker/models"
"github.com/gravitl/netmaker/mq"
"github.com/gravitl/netmaker/netclient/ncutils"
"github.com/gravitl/netmaker/servercfg"
"github.com/gravitl/netmaker/serverctl"
@ -106,8 +108,14 @@ func startControllers() {
go controller.HandleRESTRequests(&waitnetwork)
}
if !servercfg.IsAgentBackend() && !servercfg.IsRestBackend() {
logger.Log(0, "No Server Mode selected, so nothing is being served! Set either Agent mode (AGENT_BACKEND) or Rest mode (REST_BACKEND) to 'true'.")
//Run MessageQueue
if servercfg.IsMessageQueueBackend() {
waitnetwork.Add(1)
go runMessageQueue(&waitnetwork)
}
if !servercfg.IsAgentBackend() && !servercfg.IsRestBackend() && !servercfg.IsMessageQueueBackend() {
logger.Log(0, "No Server Mode selected, so nothing is being served! Set Agent mode (AGENT_BACKEND) or Rest mode (REST_BACKEND) or MessageQueue (MESSAGEQUEUE_BACKEND) to 'true'.")
}
if servercfg.IsClientMode() == "on" {
@ -175,6 +183,51 @@ func runGRPC(wg *sync.WaitGroup) {
logger.Log(0, "Closed DB connection.")
}
// Should we be using a context vice a waitgroup????????????
func runMessageQueue(wg *sync.WaitGroup) {
defer wg.Done()
//refactor netclient.functions.SetupMQTT so can be called from here
//setupMQTT
opts := mqtt.NewClientOptions()
opts.AddBroker(servercfg.GetMessageQueueEndpoint())
logger.Log(0, "setting broker "+servercfg.GetMessageQueueEndpoint())
opts.SetDefaultPublishHandler(mq.DefaultHandler)
client := mqtt.NewClient(opts)
if token := client.Connect(); token.Wait() && token.Error() != nil {
logger.Log(0, "unable to connect to message queue broker, closing down")
return
}
//Set up Subscriptions
if token := client.Subscribe("#", 0, nil); token.Wait() && token.Error() != nil {
//should make constant for disconnect wait period
client.Disconnect(250)
logger.Log(0, "could not subscribe to message queue ...")
return
}
if token := client.Subscribe("ping/#", 0, mq.Ping); token.Wait() && token.Error() != nil {
client.Disconnect(240)
logger.Log(0, "ping sub failed")
}
if token := client.Subscribe("metrics/#", 0, mq.Metrics); token.Wait() && token.Error() != nil {
client.Disconnect(240)
logger.Log(0, "metrics sub failed")
}
if token := client.Subscribe("update/localaddress/#", 0, mq.LocalAddressUpdate); token.Wait() && token.Error() != nil {
client.Disconnect(240)
logger.Log(0, "metrics sub failed")
}
if token := client.Subscribe("update/ip/#", 0, mq.IPUpdate); token.Wait() && token.Error() != nil {
client.Disconnect(240)
logger.Log(0, "metrics sub failed")
}
if token := client.Subscribe("update/publickey/#", 0, mq.PublicKeyUpdate); token.Wait() && token.Error() != nil {
client.Disconnect(240)
logger.Log(0, "metrics sub failed")
}
for {
}
}
func authServerUnaryInterceptor() grpc.ServerOption {
return grpc.UnaryInterceptor(controller.AuthServerUnaryInterceptor)
}

View file

@ -3,9 +3,8 @@ package models
import "golang.zx2c4.com/wireguard/wgctrl/wgtypes"
type PeerUpdate struct {
Network string
Interface string
Peers []wgtypes.Peer
Network string
Peers []wgtypes.Peer
}
type KeyUpdate struct {

197
mq/mq.go Normal file
View file

@ -0,0 +1,197 @@
package mq
import (
"encoding/json"
"errors"
"log"
"net"
"strings"
"time"
mqtt "github.com/eclipse/paho.mqtt.golang"
"github.com/gravitl/netmaker/database"
"github.com/gravitl/netmaker/logger"
"github.com/gravitl/netmaker/logic"
"github.com/gravitl/netmaker/models"
"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
)
var DefaultHandler mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
logger.Log(0, "MQTT Message: Topic: "+string(msg.Topic())+" Message: "+string(msg.Payload()))
}
var Metrics mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
logger.Log(0, "Metrics Handler")
}
var Ping mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
logger.Log(0, "Ping Handler")
//test code --- create a node if it doesn't exit for testing only
createnode := models.Node{PublicKey: "DM5qhLAE20PG9BbfBCger+Ac9D2NDOwCtY1rbYDLf34=", Name: "testnode",
Endpoint: "10.0.0.1", MacAddress: "01:02:03:04:05:06", Password: "password", Network: "skynet"}
if _, err := logic.GetNode("01:02:03:04:05:06", "skynet"); err != nil {
err := logic.CreateNode(&createnode)
if err != nil {
log.Println(err)
}
}
//end of test code
go func() {
mac, net, err := GetMacNetwork(msg.Topic())
if err != nil {
logger.Log(0, "error getting node.ID sent on ping topic ")
return
}
logger.Log(0, "ping recieved from "+mac+" on net "+net)
node, err := logic.GetNodeByMacAddress(net, mac)
if err != nil {
logger.Log(0, "mq-ping error getting node: "+err.Error())
record, err := database.FetchRecord(database.NODES_TABLE_NAME, mac+"###"+net)
if err != nil {
logger.Log(0, "error reading database ", err.Error())
return
}
logger.Log(0, "record from database")
logger.Log(0, record)
return
}
node.SetLastCheckIn()
// --TODO --set client version once feature is implemented.
//node.SetClientVersion(msg.Payload())
}()
}
var PublicKeyUpdate mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
logger.Log(0, "PublicKey Handler")
go func() {
logger.Log(0, "public key update "+msg.Topic())
key := string(msg.Payload())
mac, network, err := GetMacNetwork(msg.Topic())
if err != nil {
logger.Log(0, "error getting node.ID sent on "+msg.Topic()+" "+err.Error())
}
node, err := logic.GetNode(mac, network)
if err != nil {
logger.Log(0, "error retrieving node "+msg.Topic()+" "+err.Error())
}
node.PublicKey = key
node.SetLastCheckIn()
UpdatePeers(&node, client)
}()
}
var IPUpdate mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
go func() {
ip := string(msg.Payload())
logger.Log(0, "IPUpdate Handler")
mac, network, err := GetMacNetwork(msg.Topic())
logger.Log(0, "ipUpdate recieved from "+mac+" on net "+network)
if err != nil {
logger.Log(0, "error getting node.ID sent on update/ip topic ")
return
}
node, err := logic.GetNode(mac, network)
if err != nil {
logger.Log(0, "invalid ID recieved on update/ip topic: "+err.Error())
return
}
node.Endpoint = ip
node.SetLastCheckIn()
UpdatePeers(&node, client)
}()
}
func UpdatePeers(node *models.Node, client mqtt.Client) {
peersToUpdate, err := logic.GetPeers(node)
if err != nil {
logger.Log(0, "error retrieving peers")
return
}
for _, peerToUpdate := range peersToUpdate {
var peerUpdate models.PeerUpdate
peerUpdate.Network = node.Network
myPeers, err := logic.GetPeers(&peerToUpdate)
if err != nil {
logger.Log(0, "uable to get peers "+err.Error())
continue
}
for i, myPeer := range myPeers {
var allowedIPs []net.IPNet
var allowedIP net.IPNet
endpoint, err := net.ResolveUDPAddr("udp", myPeer.Address+":"+string(myPeer.ListenPort))
if err != nil {
logger.Log(0, "error setting endpoint for peer "+err.Error())
}
for _, ipString := range myPeer.AllowedIPs {
_, ipNet, _ := net.ParseCIDR(ipString)
allowedIP = *ipNet
allowedIPs = append(allowedIPs, allowedIP)
}
key, err := wgtypes.ParseKey(myPeer.PublicKey)
if err != nil {
logger.Log(0, "err parsing publickey")
continue
}
peerUpdate.Peers[i].PublicKey = key
peerUpdate.Peers[i].Endpoint = endpoint
peerUpdate.Peers[i].PersistentKeepaliveInterval = time.Duration(myPeer.PersistentKeepalive)
peerUpdate.Peers[i].AllowedIPs = allowedIPs
peerUpdate.Peers[i].ProtocolVersion = 0
}
//PublishPeerUpdate(my)
data, err := json.Marshal(peerUpdate)
if err != nil {
logger.Log(0, "err marshalling data for peer update "+err.Error())
}
if token := client.Publish("update/peers/"+peerToUpdate.ID, 0, false, data); token.Wait() && token.Error() != nil {
logger.Log(0, "error publishing peer update "+token.Error().Error())
}
client.Disconnect(250)
}
}
var LocalAddressUpdate mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
logger.Log(0, "LocalAddressUpdate Handler")
go func() {
logger.Log(0, "LocalAddressUpdate handler")
mac, net, err := GetMacNetwork(msg.Topic())
if err != nil {
logger.Log(0, "error getting node.ID "+msg.Topic())
return
}
node, err := logic.GetNode(mac, net)
if err != nil {
logger.Log(0, "error get node "+msg.Topic())
return
}
node.LocalAddress = string(msg.Payload())
node.SetLastCheckIn()
}()
}
func GetMacNetwork(topic string) (string, string, error) {
parts := strings.Split(topic, "/")
count := len(parts)
if count == 1 {
return "", "", errors.New("invalid topic")
}
macnet := strings.Split(parts[count-1], "---")
if len(macnet) != 2 {
return "", "", errors.New("topic id not in mac---network format")
}
return macnet[0], macnet[1], nil
}
func GetID(topic string) (string, error) {
parts := strings.Split(topic, "/")
count := len(parts)
if count == 1 {
return "", errors.New("invalid topic")
}
macnet := strings.Split(parts[count-1], "---")
if len(macnet) != 2 {
return "", errors.New("topic id not in mac---network format")
}
return macnet[0] + "###" + macnet[1], nil
}

View file

@ -100,7 +100,7 @@ var NodeUpdate mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message)
//check if interface name has changed if so delete.
if cfg.Node.Interface != newNode.Interface {
if err = wireguard.RemoveConf(cfg.Node.Interface, true); err != nil {
ncutils.PrintLog("could not delete old interface "+cfg.Node.Interface+": ", err.Error(), 1)
ncutils.PrintLog("could not delete old interface "+cfg.Node.Interface+": "+err.Error(), 1)
}
}
newNode.PullChanges = "no"
@ -109,12 +109,12 @@ var NodeUpdate mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message)
cfg.Node = newNode
switch newNode.Action {
case models.NODE_DELETE:
if err := RemoveLocalInstance(cfg, cfg.Network); err != nil {
ncutils.Printlog("error deleting local instance: "+err.Error(), 1)
if err := RemoveLocalInstance(&cfg, cfg.Network); err != nil {
ncutils.PrintLog("error deleting local instance: "+err.Error(), 1)
return
}
case models.NODE_UPDATE_KEY:
UpdateKeys(cfg)
UpdateKeys(&cfg, client)
case models.NODE_NOOP:
default:
}
@ -123,12 +123,12 @@ var NodeUpdate mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message)
ncutils.PrintLog("error updating node configuration: "+err.Error(), 1)
}
nameserver := cfg.Server.CoreDNSAddr
privateKey, err := wireguard.RetrievePrivKey(data.Network)
privateKey, err := wireguard.RetrievePrivKey(newNode.Network)
if err != nil {
ncutils.Log("error reading PrivateKey " + err.Error())
return
}
if err := wireguard.UpdateWgInterface(cfg.Node.Interface, privateKey, nameserver, data); err != nil {
if err := wireguard.UpdateWgInterface(cfg.Node.Interface, privateKey, nameserver, newNode); err != nil {
ncutils.Log("error updating wireguard config " + err.Error())
return
}
@ -177,7 +177,7 @@ func UpdateKeys(cfg *config.ClientConfig, client mqtt.Client) (*config.ClientCon
ncutils.Log("error generating privatekey " + err.Error())
return cfg, err
}
if err := wireguard.UpdatePrivateKey(data.Interface, key.String()); err != nil {
if err := wireguard.UpdatePrivateKey(cfg.Node.Interface, key.String()); err != nil {
ncutils.Log("error updating wireguard key " + err.Error())
return cfg, err
}

View file

@ -244,6 +244,18 @@ func GetGRPCPort() string {
return grpcport
}
// GetMessageQueueEndpoint - gets the message queue endpoint
func GetMessageQueueEndpoint() string {
host, _ := GetPublicIP()
if os.Getenv("MQ_HOST") != "" {
host = os.Getenv("MQ_HOST")
} else if config.Config.Server.MQHOST != "" {
host = config.Config.Server.MQHOST
}
//Do we want MQ port configurable???
return host + ":1883"
}
// GetMasterKey - gets the configured master key of server
func GetMasterKey() string {
key := "secretkey"
@ -307,6 +319,21 @@ func IsAgentBackend() bool {
return isagent
}
// IsMessageQueueBackend - checks if message queue is on or off
func IsMessageQueueBackend() bool {
ismessagequeue := true
if os.Getenv("MESSAGEQUEUE_BACKEND") != "" {
if os.Getenv("MESSAGEQUEUE_BACKEND") == "off" {
ismessagequeue = false
}
} else if config.Config.Server.MessageQueueBackend != "" {
if config.Config.Server.MessageQueueBackend == "off" {
ismessagequeue = false
}
}
return ismessagequeue
}
// IsClientMode - checks if it should run in client mode
func IsClientMode() string {
isclient := "on"